Downloaders - Elastic Search

class MordinezNLP.downloaders.ElasticSearchDownloader.ElasticSearchDownloader(ip: str, port: int, api_key_1: Optional[str] = None, api_key_2: Optional[str] = None, timeout: int = 100)

Class used to download data from a specified Elasticsearch index using multithreading. TODO: add tests.

get_all_available_indexes() → List[str]

Get all available elastic search indexes.

Returns

A list of strings, where each element is an Elasticsearch index name.

Return type

List[str]
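Because scroll_data is documented to return an empty list when the index does not exist, it can be useful to check for the index first. The following is a minimal sketch, not part of the library: index_exists and FakeDownloader are hypothetical names introduced here, and the helper only assumes an object exposing get_all_available_indexes() as documented above.

```python
from typing import List, Protocol


class SupportsIndexListing(Protocol):
    """Anything exposing get_all_available_indexes(), as documented above."""

    def get_all_available_indexes(self) -> List[str]: ...


def index_exists(downloader: SupportsIndexListing, index_name: str) -> bool:
    """Hypothetical helper: True if index_name is among the available indexes."""
    return index_name in downloader.get_all_available_indexes()


class FakeDownloader:
    """Stand-in used only for illustration; it does not contact a server."""

    def get_all_available_indexes(self) -> List[str]:
        return ["articles", "comments"]


print(index_exists(FakeDownloader(), "articles"))
print(index_exists(FakeDownloader(), "missing_index"))
```

With a real ElasticSearchDownloader instance, the same helper could be called as index_exists(es, 'my_index_name') before scrolling.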

scroll_data(index_name: str, query: dict, processing_function: Callable[[dict], Any], threads: int = 6, scroll: str = '2m', scroll_size: int = 100) → List[any]

Scrolls through the data in an Elasticsearch index and processes each item with processing_function across multiple threads. Returns a list of the values returned by processing_function.

Parameters
  • index_name (str) – Name of the index to scroll through and download data from

  • query (dict) – An Elasticsearch query

  • processing_function (Callable[[dict], Any]) – A function that processes a single item from the Elasticsearch index

  • threads (int) – Number of threads to run the processing on

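  • scroll (str) – A scroll value such as '2m' (in Elasticsearch, the scroll parameter sets how long the search context is kept alive between requests)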

  • scroll_size (int) – Number of items fetched per scroll request

Returns

A list of processed items, typed according to processing_function, or an empty list if the index doesn't exist.

Return type

List[any]
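To make the role of processing_function and threads more concrete, here is a minimal sketch of the general pattern of mapping a per-item function over a batch of items with a thread pool. It is an illustration only and is not claimed to be how scroll_data is implemented; process_batch and the sample items are hypothetical, and no Elasticsearch server is involved.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def process_batch(
    items: Iterable[dict],
    processing_function: Callable[[dict], Any],
    threads: int = 6,
) -> List[Any]:
    """Apply processing_function to every item using a pool of worker threads."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Executor.map yields results in the same order as the input items.
        return list(pool.map(processing_function, items))


def extract_text(item: dict) -> str:
    """Example per-item function: normalize Windows line endings."""
    return item["text"].replace("\r\n", "\n")


# Hypothetical items standing in for documents fetched from an index.
sample_items = [{"text": "first\r\nline"}, {"text": "second"}]
processed = process_batch(sample_items, extract_text, threads=2)
print(processed)
```

The result list holds one value per input item, which matches the documented behaviour of returning a list of whatever processing_function returns.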

Example usage:

from MordinezNLP.downloaders import ElasticSearchDownloader

es = ElasticSearchDownloader(
    ip='',
    port=9200,
    timeout=10
)

body = {}  # <- use your own elastic search query

# Your own processing function for a single element
def processing_func(data: dict) -> str:
    return data['my_key']['my_next_key'].replace("\r\n", "\n")


# Scroll the data
downloaded_elastic_search_data = es.scroll_data(
    'my_index_name',
    body,
    processing_func,
    threads=8
)

print(len(downloaded_elastic_search_data))
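The processing function in the example above raises KeyError if an item lacks 'my_key' or 'my_next_key'. How scroll_data reacts to an exception raised inside processing_function is not stated here, so a defensive variant may be safer. The sketch below is one possible approach; safe_processing_func is a hypothetical name, and the key names are the placeholders from the example.

```python
from typing import Optional


def safe_processing_func(data: dict) -> Optional[str]:
    """Return the normalized text, or None when the expected keys are absent."""
    inner = data.get('my_key')
    if not isinstance(inner, dict):
        return None
    value = inner.get('my_next_key')
    if not isinstance(value, str):
        return None
    # Same normalization as the example above: Windows to Unix line endings.
    return value.replace("\r\n", "\n")


# Hypothetical items: one well-formed, one missing the nested key.
results = [
    safe_processing_func({'my_key': {'my_next_key': 'a\r\nb'}}),
    safe_processing_func({'other_key': 1}),
]
# Drop the items that could not be processed.
cleaned = [r for r in results if r is not None]
print(cleaned)
```

If this function is passed to scroll_data, the returned list may contain None entries, which can be filtered out afterwards in the same way.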