Downloaders - Elastic Search
-
class MordinezNLP.downloaders.ElasticSearchDownloader.ElasticSearchDownloader(ip: str, port: int, api_key_1: Optional[str] = None, api_key_2: Optional[str] = None, timeout: int = 100)
Class used to download Elasticsearch data from a specified index using multithreading.
-
get_all_available_indexes() → List[str]
Get all available Elasticsearch indexes.
- Returns
A list of strings, where each element is an Elasticsearch index name.
- Return type
List[str]
-
scroll_data(index_name: str, query: dict, processing_function: Callable[[dict], Any], threads: int = 6, scroll: str = '2m', scroll_size: int = 100) → List[any]
Scrolls through the data in an Elasticsearch index and processes each item with processing_function across multiple threads. Returns a list of the values returned by processing_function.
- Parameters
index_name (str) – Name of the index to scroll through and download data from
query (dict) – An Elasticsearch query
processing_function (Callable[[dict], Any]) – A function that processes a single item from the Elasticsearch index
threads (int) – Number of threads to run processing on
scroll (str) – A scroll value, for example '2m' (in Elasticsearch, the scroll parameter sets how long the search context is kept alive)
scroll_size (int) – Number of items to scroll at once
- Returns
A list of processed items, typed according to processing_function, or an empty list if the index doesn't exist.
- Return type
List[any]
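The pattern described for scroll_data, where each scrolled item is handed to processing_function on a pool of worker threads, can be tried out without a running cluster. The sketch below uses only the standard library and is not MordinezNLP's implementation: process_batch, extract_title and sample_hits are hypothetical names, and the shape of each item (a dict with a _source key) is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def process_batch(
    items: List[dict],
    processing_function: Callable[[dict], Any],
    threads: int = 6,
) -> List[Any]:
    """Apply processing_function to every item using a pool of threads.

    Hypothetical helper for illustration only; results keep input order.
    """
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(processing_function, items))


# Hypothetical stand-ins for documents fetched by one scroll request.
sample_hits = [
    {"_source": {"title": "first"}},
    {"_source": {"title": "second"}},
]


def extract_title(hit: dict) -> str:
    # Assumed item shape: the document body lives under "_source".
    return hit["_source"]["title"].upper()


titles = process_batch(sample_hits, extract_title, threads=2)
print(titles)
```

Because the function is called from several threads at once, a processing function of this kind is easiest to reason about when it only reads its argument and returns a value, without touching shared state.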
-
Example usage:
from MordinezNLP.downloaders import ElasticSearchDownloader

es = ElasticSearchDownloader(
    ip='',
    port=9200,
    timeout=10
)

body = {}  # <- use your own Elasticsearch query


# Your own processing function for a single element
def processing_func(data: dict) -> str:
    return data['my_key']['my_next_key'].replace("\r\n", "\n")


# Scroll the data
downloaded_elastic_search_data = es.scroll_data(
    'my_index_name',
    body,
    processing_func,
    threads=8
)

print(len(downloaded_elastic_search_data))
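A processing function that indexes nested keys directly raises KeyError when a document lacks one of them. A more defensive variant might look like the sketch below. Here safe_processing_func is a hypothetical name, my_key and my_next_key are the placeholder keys from the example, and returning an empty string for incomplete documents is an assumed policy rather than anything the library prescribes.

```python
def safe_processing_func(data: dict) -> str:
    """Return the nested text with normalized newlines, or '' if it is missing."""
    inner = data.get("my_key")
    if not isinstance(inner, dict):
        return ""
    text = inner.get("my_next_key")
    if not isinstance(text, str):
        return ""
    # Normalize Windows line endings, as in the example processing function.
    return text.replace("\r\n", "\n")


# Hypothetical documents: one complete, one missing the nested key.
complete = {"my_key": {"my_next_key": "line one\r\nline two"}}
incomplete = {"my_key": {}}

print(repr(safe_processing_func(complete)))
print(repr(safe_processing_func(incomplete)))
```

Such a function can be passed to scroll_data in place of processing_func; empty strings can then be filtered out of the returned list afterwards.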