The following nine code examples, extracted from open source Python projects, illustrate how to use elasticsearch.helpers.streaming_bulk().
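Before the individual examples, here is a minimal sketch of the pattern they all share: a generator yields action dictionaries, and streaming_bulk() yields one (ok, result) tuple per action. The connection URL, the index name "my-index", and the sample documents are placeholders for illustration, not taken from the projects below; raise_on_error=False is used here so failures show up as ok=False instead of raising BulkIndexError.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

# Assumed connection details and index name, for illustration only.
es = Elasticsearch("http://localhost:9200")

def generate_actions(docs):
    # Each action is a plain dict; '_op_type' defaults to 'index'.
    for i, doc in enumerate(docs):
        yield {'_index': 'my-index', '_id': i, '_source': doc}

docs = [{'title': 'first'}, {'title': 'second'}]

# With raise_on_error=False, failed items are reported as ok=False
# rather than raising BulkIndexError.
for ok, result in streaming_bulk(es, generate_actions(docs),
                                 chunk_size=500, raise_on_error=False):
    if not ok:
        print('Failed to index document: %r' % result)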
def index_all(self, index_name):
    """Index all available documents, using streaming_bulk for speed

    Args:
        index_name (string): The index
    """
    oks = 0
    notoks = 0
    for ok, item in streaming_bulk(
        self.es_client,
        self._iter_documents(index_name)
    ):
        if ok:
            oks += 1
        else:
            notoks += 1
    logging.info(
        "Import results: %d ok, %d not ok",
        oks,
        notoks
    )
def write(self, bucket, doc_type, rows, primary_key, update=False, as_generator=False):
    # Bulk index (or upsert) rows, optionally yielding each row back as it is written.
    if primary_key is None or len(primary_key) == 0:
        raise ValueError('primary_key cannot be an empty list')

    def actions(rows_, doc_type_, primary_key_, update_):
        if update_:
            for row_ in rows_:
                yield {
                    '_op_type': 'update',
                    '_index': bucket,
                    '_type': doc_type_,
                    '_id': self.generate_doc_id(row_, primary_key_),
                    '_source': {
                        'doc': row_,
                        'doc_as_upsert': True
                    }
                }
        else:
            for row_ in rows_:
                yield {
                    '_op_type': 'index',
                    '_index': bucket,
                    '_type': doc_type_,
                    '_id': self.generate_doc_id(row_, primary_key_),
                    '_source': row_
                }

    # Duplicate the rows iterator so rows can be re-yielded alongside the bulk results.
    iterables = itertools.tee(rows)
    actions_iterable = actions(iterables[0], doc_type, primary_key, update)
    iter = zip(streaming_bulk(self.__es, actions=actions_iterable), iterables[1])

    if as_generator:
        for result, row in iter:
            yield row
    else:
        collections.deque(iter, maxlen=0)

    self.__es.indices.flush(bucket)
def record_events(self, events):
    def _build_bulk_index(event_list):
        for ev in event_list:
            traits = {t.name: t.value for t in ev.traits}
            yield {'_op_type': 'create',
                   '_index': '%s_%s' % (self.index_name,
                                        ev.generated.date().isoformat()),
                   '_type': ev.event_type,
                   '_id': ev.message_id,
                   '_source': {'timestamp': ev.generated.isoformat(),
                               'traits': traits,
                               'raw': ev.raw}}

    error = None
    for ok, result in helpers.streaming_bulk(
            self.conn, _build_bulk_index(events)):
        if not ok:
            __, result = result.popitem()
            if result['status'] == 409:
                LOG.info('Duplicate event detected, skipping it: %s',
                         result)
            else:
                LOG.exception('Failed to record event: %s', result)
                error = storage.StorageUnknownWriteError(result)

    if self._refresh_on_write:
        self.conn.indices.refresh(index='%s_*' % self.index_name)
        while self.conn.cluster.pending_tasks(local=True)['tasks']:
            pass
    if error:
        raise error
def index(self, points):
    for p in points:
        p['_index'] = self.config['indexer']['idx_name']
        p['_type'] = self.es_type

    results = helpers.streaming_bulk(self.client, points)
    for status, r in results:
        if not status:
            log.debug("index err result %s", r)
def index_events(client, events):
    results = helpers.streaming_bulk(client, events)
    for status, r in results:
        if not status:
            log.debug("index err result %s", r)
def handle_command(self, doc, namespace, timestamp):
    # Flush the buffer before handling the command
    self.commit()
    db = namespace.split('.', 1)[0]

    if doc.get('dropDatabase'):
        dbs = self.command_helper.map_db(db)
        for _db in dbs:
            self.elastic.indices.delete(index=_db.lower())

    if doc.get('renameCollection'):
        raise errors.OperationFailed(
            "elastic_doc_manager does not support renaming a mapping.")

    if doc.get('create'):
        db, coll = self.command_helper.map_collection(db, doc['create'])
        if db and coll:
            self.elastic.indices.put_mapping(
                index=db.lower(), doc_type=coll,
                body={"_source": {"enabled": True}})

    if doc.get('drop'):
        db, coll = self.command_helper.map_collection(db, doc['drop'])
        if db and coll:
            # This will delete the items in coll, but not get rid of the
            # mapping.
            warnings.warn("Deleting all documents of type %s on index %s. "
                          "The mapping definition will persist and must be "
                          "removed manually." % (coll, db))
            responses = streaming_bulk(
                self.elastic,
                (dict(result, _op_type='delete') for result in scan(
                    self.elastic, index=db.lower(), doc_type=coll)))
            for ok, resp in responses:
                if not ok:
                    LOG.error(
                        "Error occurred while deleting ElasticSearch "
                        "document during handling of 'drop' command: %r"
                        % resp)
def bulk_upsert(self, docs, namespace, timestamp):
    """Insert multiple documents into Elasticsearch."""
    def docs_to_upsert():
        doc = None
        for doc in docs:
            # Remove metadata and redundant _id
            index, doc_type = self._index_and_mapping(namespace)
            doc_id = u(doc.pop("_id"))
            document_action = {
                '_index': index,
                '_type': doc_type,
                '_id': doc_id,
                '_source': self._formatter.format_document(doc)
            }
            document_meta = {
                '_index': self.meta_index_name,
                '_type': self.meta_type,
                '_id': doc_id,
                '_source': {
                    'ns': namespace,
                    '_ts': timestamp
                }
            }
            yield document_action
            yield document_meta
        if doc is None:
            raise errors.EmptyDocsError(
                "Cannot upsert an empty sequence of "
                "documents into Elastic Search")

    try:
        kw = {}
        if self.chunk_size > 0:
            kw['chunk_size'] = self.chunk_size

        responses = streaming_bulk(client=self.elastic,
                                   actions=docs_to_upsert(),
                                   **kw)

        for ok, resp in responses:
            if not ok:
                LOG.error(
                    "Could not bulk-upsert document "
                    "into ElasticSearch: %r" % resp)
        if self.auto_commit_interval == 0:
            self.commit()
    except errors.EmptyDocsError:
        # This can happen when mongo-connector starts up, there is no
        # config file, but nothing to dump
        pass
def record_events(self, events):
    datetime_trait_fields = ['audit_period_beginning',
                             'audit_period_ending',
                             'deleted_at',
                             'created_at',
                             'launched_at',
                             'modify_at']

    def _build_bulk_index(event_list):
        for ev in event_list:
            traits = {}
            for t in ev.traits:
                name = t.name
                value = t.value
                if name in datetime_trait_fields:
                    try:
                        ts = timeutils.parse_isotime(value)
                        ts = timeutils.normalize_time(ts)
                        value = timeutils.strtime(ts)
                    except ValueError:
                        LOG.exception(
                            _LE('Could not parse timestamp [%s] from '
                                '[%s] traits field' % (value, name)))
                        value = t.value
                traits[name] = value
            yield {'_op_type': 'create',
                   '_index': '%s_%s' % (self.index_name,
                                        ev.generated.date().isoformat()),
                   '_type': ev.event_type,
                   '_id': ev.message_id,
                   '_source': {'timestamp': ev.generated.isoformat(),
                               'traits': traits,
                               'raw': ev.raw}}

    error = None
    for ok, result in helpers.streaming_bulk(
            self.conn, _build_bulk_index(events)):
        if not ok:
            __, result = result.popitem()
            if result['status'] == 409:
                LOG.info(_LI('Duplicate event detected, '
                             'skipping it: %s') % result)
            else:
                LOG.exception(_LE('Failed to record event: %s') % result)
                error = storage.StorageUnknownWriteError(result)

    if self._refresh_on_write:
        self.conn.indices.refresh(index='%s_*' % self.index_name)
        while self.conn.cluster.pending_tasks(local=True)['tasks']:
            pass
    if error:
        raise error