The following 9 code examples, extracted from open-source Python projects, illustrate how to use elasticsearch.helpers.parallel_bulk().
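Before the project examples, here is a minimal, self-contained sketch of the basic call pattern. The local host URL, the "my-index" index name, and the generated documents are placeholder assumptions for illustration, not taken from any of the examples below.

# Minimal sketch of calling elasticsearch.helpers.parallel_bulk() directly.
# The host, index name ("my-index") and sample documents are placeholders.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

es = Elasticsearch(["http://localhost:9200"])

# Each action is a plain dict: _index/_id route the document, the rest is the body.
actions = (
    {"_index": "my-index", "_id": i, "value": i}
    for i in range(1000)
)

# parallel_bulk() yields one (ok, item) pair per document.
for ok, item in parallel_bulk(es, actions, thread_count=4, chunk_size=500):
    if not ok:
        print("Failed to index document:", item)

Note that parallel_bulk() returns a lazy generator: no requests are sent until it is iterated, which is why every example below loops over its (ok, item) pairs.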
def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
                  max_chunk_bytes=100 * 1024 * 1024,
                  expand_action_callback=es_helpers.expand_action, **kwargs):
    """
    es_helpers.parallel_bulk rewritten with imap_fixed_output_buffer instead
    of Pool.imap, which consumed unbounded memory if the generator outruns
    the upload (which usually happens).
    """
    actions = map(expand_action_callback, actions)

    for result in imap_fixed_output_buffer(
            lambda chunk: list(
                es_helpers._process_bulk_chunk(client, chunk, **kwargs)),
            es_helpers._chunk_actions(actions, chunk_size, max_chunk_bytes,
                                      client.transport.serializer),
            threads=thread_count,
    ):
        for item in result:
            yield item
def bulk(self, client, actions, stats_only=False, **kwargs):
    success, failed = 0, 0

    # list of errors to be collected if not stats_only
    errors = []

    for ok, item in parallel_bulk(client, actions, **kwargs):
        # go through request-response pairs and detect failures
        if not ok:
            if not stats_only:
                errors.append(item)
            failed += 1
        else:
            success += 1

    return success, failed if stats_only else errors
def parallel_bulk(self, the_iter, *args, **kw):
    if self.use_custom_parallel_bulk:
        return self._non_parallel_bulk(the_iter, *args, **kw)
    else:
        return es_parallel_bulk(self.es, the_iter, *args, **kw)
def _perform_index_sync(self, sql_table_cls, es_doc_cls, id_logger):
    es_doc = es_doc_cls()

    elasticsearch_conn = connections.get_connection()

    sync_timestamp = current_server_timestamp()

    pending_insertions = self._compute_dirty_documents(
        sql_table_cls, es_doc.doc_type)

    bulk_op = self._synchronisation_op(es_doc, pending_insertions)

    self._logging(logging.INFO, 'Performing synchronization.')

    for ok, info in parallel_bulk(elasticsearch_conn, bulk_op):
        obj_id = info['index']['_id'] \
            if 'index' in info else info['update']['_id']

        if ok:
            # Mark the task as handled so we don't re-process it next time
            self._logging(logging.INFO,
                          'Document %s has been synced successfully.'
                          % obj_id)
            sql_table_cls.update_last_sync(obj_id, sync_timestamp)
        else:
            id_logger(obj_id, logging.ERROR,
                      'Error while syncing document %s index.' % obj_id)

    # Refresh indices to increase search speed
    elasticsearch_dsl.Index(es_doc.index).refresh()
def _geocomplete_index_batch(self, elasticsearch_conn, to_index):
    log_msg = 'Indexing documents.'
    self._logging(logging.INFO, log_msg)

    for ok, info in parallel_bulk(elasticsearch_conn, to_index):
        if not ok:
            doc_id = info['create']['_id']
            doc_type = info['create']['_type']
            doc_index = info['create']['_index']

            logging_level = logging.ERROR
            err_msg = "Couldn't index document: '%s', of type: %s, " \
                      "under index: %s." % (doc_id, doc_type, doc_index)

            self._logging(logging_level, err_msg)
def run(self):
    # Check whether the index exists; if not, create it
    if not self.elastic.indices.exists(settings.TEST_INDEX):
        self.elastic.indices.create(settings.TEST_INDEX,
                                    body=INDEX_CREATION_BODY)

    for interval in COLLAPSE_INTERVALS:
        self.init_series(interval)

    for success, info in parallel_bulk(self.elastic, self.bulk_items):
        if not success:
            print("ERROR:", info)
def run(self, distributions=None):
    """Index into Elasticsearch all the data of the distributions stored
    in the database, or of the ones specified by the 'distributions'
    iterable.
    """
    self.init_index()

    # Optimization: disable index refresh while indexing
    self.elastic.indices.put_settings(
        index=self.index,
        body=constants.DEACTIVATE_REFRESH_BODY
    )

    logger.info(strings.INDEX_START)
    for distribution in distributions:
        fields = distribution.field_set.all()
        fields = {field.title: field.series_id for field in fields}
        df = self.init_df(distribution, fields)

        self.generate_properties(df, fields)

    logger.info(strings.BULK_REQUEST_START)
    for success, info in parallel_bulk(self.elastic, self.bulk_actions):
        if not success:
            logger.warn(strings.BULK_REQUEST_ERROR, info)
    logger.info(strings.BULK_REQUEST_END)

    # Re-enable the refresh process once indexing has finished
    self.elastic.indices.put_settings(
        index=self.index,
        body=constants.REACTIVATE_REFRESH_BODY
    )
    segments = constants.FORCE_MERGE_SEGMENTS
    self.elastic.indices.forcemerge(index=self.index,
                                    max_num_segments=segments)

    logger.info(strings.INDEX_END)
def pump_it(es, bulk_accumulator):
    rows_pumped = 0
    # TODO: make threads and chunks configurable
    for success, info in parallel_bulk(es, bulk_accumulator,
                                       thread_count=16, chunk_size=300):
        if success:
            rows_pumped += 1
        else:
            logger.warning('Pumping documents failed: {}'.format(info))
    return rows_pumped
def flush_cache(self):
    if len(self.cache) == 0:
        return True
    retry = 2
    for i in range(retry):
        try:
            to_upload = helpers.parallel_bulk(
                self.es, self.cache_insertable_iterable())
            counter = 0
            num_items = len(self.cache)
            for item in to_upload:
                self.logger.debug(
                    "{} of {} Elastic objects uploaded".format(counter,
                                                               num_items))
                counter = counter + 1
            output = "Pushed {} items to Elasticsearch to index {}".format(
                num_items, self.index)
            output += " and browbeat UUID {}".format(str(browbeat_uuid))
            self.logger.info(output)
            self.cache = deque()
            self.last_upload = datetime.datetime.utcnow()
            return True
        except Exception as Err:
            self.logger.error(
                "Error pushing data to Elasticsearch, going to retry"
                " in 10 seconds")
            self.logger.error("Exception: {}".format(Err))
            time.sleep(10)
            if i == (retry - 1):
                self.logger.error(
                    "Pushing Data to Elasticsearch failed in spite of retry,"
                    " dumping JSON for {} cached items".format(
                        len(self.cache)))
                for item in self.cache:
                    filename = item['test_name'] + '-' + item['identifier']
                    filename += '-elastic' + '.' + 'json'
                    elastic_file = os.path.join(item['result_dir'], filename)
                    with open(elastic_file, 'w') as result_file:
                        json.dump(item['result'], result_file,
                                  indent=4, sort_keys=True)
                    self.logger.info(
                        "Saved Elasticsearch consumable result JSON to {}"
                        .format(elastic_file))
                self.cache = deque()
                self.last_upload = datetime.datetime.utcnow()
                return False