Python elasticsearch.helpers module: streaming_bulk() example source code

The following 9 code examples, extracted from open-source Python projects, illustrate how to use elasticsearch.helpers.streaming_bulk().
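
Before the project examples, here is a minimal sketch of the pattern they all share: build a generator of action dicts and iterate over the (ok, result) tuples that streaming_bulk() yields. The client URL, index name and gen_actions() helper below are illustrative placeholders, not part of any of the projects.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

# Placeholder client; adjust the URL for your cluster.
es = Elasticsearch("http://localhost:9200")

def gen_actions():
    # Each action dict carries routing metadata (_op_type, _index, _id)
    # next to the document body in _source.
    for i in range(100):
        yield {
            "_op_type": "index",
            "_index": "my-index",
            "_id": i,
            "_source": {"value": i},
        }

# streaming_bulk() batches the actions lazily and yields one
# (ok, result) tuple per action.
for ok, result in streaming_bulk(es, gen_actions()):
    if not ok:
        print("failed:", result)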

Project: skills-ml | Author: workforce-data-initiative
def index_all(self, index_name):
        """Index all available documents, using streaming_bulk for speed.

        Args:
            index_name (string): The index
        """
        oks = 0
        notoks = 0
        for ok, item in streaming_bulk(
            self.es_client,
            self._iter_documents(index_name)
        ):
            if ok:
                oks += 1
            else:
                notoks += 1
        logging.info(
            "Import results: %d ok, %d not ok",
            oks,
            notoks
        )
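
One caveat about the ok/not-ok tally above: with current elasticsearch-py clients, streaming_bulk() defaults to raise_on_error=True and raises a BulkIndexError instead of yielding (False, item) for failed actions. A hedged variant of the same counter that actually exercises the failure branch (the client and the iterable of actions are assumed to exist):

from elasticsearch.helpers import streaming_bulk

def count_results(es_client, actions):
    # raise_on_error=False makes failed actions come back as
    # (False, info) tuples instead of raising BulkIndexError.
    oks = notoks = 0
    for ok, item in streaming_bulk(es_client, actions, raise_on_error=False):
        if ok:
            oks += 1
        else:
            notoks += 1
    return oks, notoks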
Project: tableschema-elasticsearch-py | Author: frictionlessdata
def write(self, bucket, doc_type, rows, primary_key, update=False, as_generator=False):

        if primary_key is None or len(primary_key) == 0:
            raise ValueError('primary_key cannot be an empty list')

        def actions(rows_, doc_type_, primary_key_, update_):
            if update_:
                for row_ in rows_:
                    yield {
                        '_op_type': 'update',
                        '_index': bucket,
                        '_type': doc_type_,
                        '_id': self.generate_doc_id(row_, primary_key_),
                        '_source': {
                            'doc': row_,
                            'doc_as_upsert': True
                        }
                    }
            else:
                for row_ in rows_:
                    yield {
                        '_op_type': 'index',
                        '_index': bucket,
                        '_type': doc_type_,
                        '_id': self.generate_doc_id(row_, primary_key_),
                        '_source': row_
                    }

        iterables = itertools.tee(rows)
        actions_iterable = actions(iterables[0], doc_type, primary_key, update)

        paired = zip(streaming_bulk(self.__es, actions=actions_iterable), iterables[1])

        if as_generator:
            for result, row in paired:
                yield row
        else:
            # Drain the iterator without accumulating results in memory.
            collections.deque(paired, maxlen=0)

        self.__es.indices.flush(bucket)
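
The itertools.tee() / zip() combination above pairs every row with the streaming_bulk() result it produced, so the writer can re-yield rows as they are confirmed. A stripped-down sketch of that pairing, with the client, index name and rows treated as placeholders:

import itertools

from elasticsearch.helpers import streaming_bulk

def write_rows(es, index_name, rows):
    # One copy of the iterator feeds the bulk actions, the other is
    # zipped back against the per-action results in the same order.
    to_index, to_pair = itertools.tee(rows)
    actions = (
        {"_op_type": "index", "_index": index_name, "_source": row}
        for row in to_index
    )
    for (ok, result), row in zip(streaming_bulk(es, actions), to_pair):
        if ok:
            yield row

Consuming the generator with collections.deque(write_rows(...), maxlen=0) drains it without holding rows in memory, which is what the non-generator branch in the project code does.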
Project: panko | Author: openstack
def record_events(self, events):

        def _build_bulk_index(event_list):
            for ev in event_list:
                traits = {t.name: t.value for t in ev.traits}
                yield {'_op_type': 'create',
                       '_index': '%s_%s' % (self.index_name,
                                            ev.generated.date().isoformat()),
                       '_type': ev.event_type,
                       '_id': ev.message_id,
                       '_source': {'timestamp': ev.generated.isoformat(),
                                   'traits': traits,
                                   'raw': ev.raw}}

        error = None
        for ok, result in helpers.streaming_bulk(
                self.conn, _build_bulk_index(events)):
            if not ok:
                __, result = result.popitem()
                if result['status'] == 409:
                    LOG.info('Duplicate event detected, skipping it: %s',
                             result)
                else:
                    LOG.exception('Failed to record event: %s', result)
                    error = storage.StorageUnknownWriteError(result)

        if self._refresh_on_write:
            self.conn.indices.refresh(index='%s_*' % self.index_name)
            while self.conn.cluster.pending_tasks(local=True)['tasks']:
                pass
        if error:
            raise error
Project: cloud-custodian | Author: capitalone
def index(self, points):
        for p in points:
            p['_index'] = self.config['indexer']['idx_name']
            p['_type'] = self.es_type

        results = helpers.streaming_bulk(self.client, points)
        for status, r in results:
            if not status:
                log.debug("index err result %s", r)
Project: cloud-custodian | Author: capitalone
def index_events(client, events):
    results = helpers.streaming_bulk(client, events)
    for status, r in results:
        if not status:
            log.debug("index err result %s", r)
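
In both cloud-custodian helpers the documents themselves double as bulk actions: metadata keys such as _index and _type are set directly on each point, the helper lifts the recognized metadata keys out, and the remaining fields are sent as the document body. A minimal sketch of the same idea (the client and index name are placeholders, _type only matters on pre-7.x clusters, and points is assumed to be a list because it is iterated twice):

from elasticsearch.helpers import streaming_bulk

def index_points(client, index_name, points):
    # Attach routing metadata in place; the remaining keys of each
    # dict become the indexed document body.
    for p in points:
        p["_index"] = index_name
        p["_op_type"] = "index"

    for ok, result in streaming_bulk(client, points, raise_on_error=False):
        if not ok:
            print("index err result %s" % (result,))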
Project: elastic2-doc-manager | Author: mongodb-labs
def handle_command(self, doc, namespace, timestamp):
        # Flush buffer before handle command
        self.commit()
        db = namespace.split('.', 1)[0]
        if doc.get('dropDatabase'):
            dbs = self.command_helper.map_db(db)
            for _db in dbs:
                self.elastic.indices.delete(index=_db.lower())

        if doc.get('renameCollection'):
            raise errors.OperationFailed(
                "elastic_doc_manager does not support renaming a mapping.")

        if doc.get('create'):
            db, coll = self.command_helper.map_collection(db, doc['create'])
            if db and coll:
                self.elastic.indices.put_mapping(
                    index=db.lower(), doc_type=coll,
                    body={
                        "_source": {"enabled": True}
                    })

        if doc.get('drop'):
            db, coll = self.command_helper.map_collection(db, doc['drop'])
            if db and coll:
                # This will delete the items in coll, but not get rid of the
                # mapping.
                warnings.warn("Deleting all documents of type %s on index %s. "
                              "The mapping definition will persist and must be "
                              "removed manually." % (coll, db))
                responses = streaming_bulk(
                    self.elastic,
                    (dict(result, _op_type='delete') for result in scan(
                        self.elastic, index=db.lower(), doc_type=coll)))
                for ok, resp in responses:
                    if not ok:
                        LOG.error(
                            "Error occurred while deleting ElasticSearch "
                            "document during handling of 'drop' command: %r"
                            % resp)
Project: elastic2-doc-manager | Author: mongodb-labs
def bulk_upsert(self, docs, namespace, timestamp):
        """Insert multiple documents into Elasticsearch."""
        def docs_to_upsert():
            doc = None
            for doc in docs:
                # Remove metadata and redundant _id
                index, doc_type = self._index_and_mapping(namespace)
                doc_id = u(doc.pop("_id"))
                document_action = {
                    '_index': index,
                    '_type': doc_type,
                    '_id': doc_id,
                    '_source': self._formatter.format_document(doc)
                }
                document_meta = {
                    '_index': self.meta_index_name,
                    '_type': self.meta_type,
                    '_id': doc_id,
                    '_source': {
                        'ns': namespace,
                        '_ts': timestamp
                    }
                }
                yield document_action
                yield document_meta
            if doc is None:
                raise errors.EmptyDocsError(
                    "Cannot upsert an empty sequence of "
                    "documents into Elastic Search")
        try:
            kw = {}
            if self.chunk_size > 0:
                kw['chunk_size'] = self.chunk_size

            responses = streaming_bulk(client=self.elastic,
                                       actions=docs_to_upsert(),
                                       **kw)

            for ok, resp in responses:
                if not ok:
                    LOG.error(
                        "Could not bulk-upsert document "
                        "into ElasticSearch: %r" % resp)
            if self.auto_commit_interval == 0:
                self.commit()
        except errors.EmptyDocsError:
            # This can happen when mongo-connector starts up with no
            # config file and there is nothing to dump.
            pass
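
The kw dict above exists only to forward a positive chunk_size to streaming_bulk(); when it is omitted the helper falls back to its default batch size (500 actions per bulk request in current clients). A hedged sketch of the same switch in isolation (the client and the iterable of actions are assumed to exist):

from elasticsearch.helpers import streaming_bulk

def upsert_stream(es, actions, chunk_size=0):
    # Only override the helper's default batch size when a positive
    # value was configured, mirroring the pattern above.
    kw = {}
    if chunk_size > 0:
        kw["chunk_size"] = chunk_size
    for ok, resp in streaming_bulk(client=es, actions=actions, **kw):
        if not ok:
            print("Could not bulk-upsert document: %r" % (resp,))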
Project: elastic-doc-manager | Author: mongodb-labs
def bulk_upsert(self, docs, namespace, timestamp):
        """Insert multiple documents into Elasticsearch."""
        def docs_to_upsert():
            doc = None
            for doc in docs:
                # Remove metadata and redundant _id
                index, doc_type = self._index_and_mapping(namespace)
                doc_id = u(doc.pop("_id"))
                document_action = {
                    '_index': index,
                    '_type': doc_type,
                    '_id': doc_id,
                    '_source': self._formatter.format_document(doc)
                }
                document_meta = {
                    '_index': self.meta_index_name,
                    '_type': self.meta_type,
                    '_id': doc_id,
                    '_source': {
                        'ns': namespace,
                        '_ts': timestamp
                    }
                }
                yield document_action
                yield document_meta
            if doc is None:
                raise errors.EmptyDocsError(
                    "Cannot upsert an empty sequence of "
                    "documents into Elastic Search")
        try:
            kw = {}
            if self.chunk_size > 0:
                kw['chunk_size'] = self.chunk_size

            responses = streaming_bulk(client=self.elastic,
                                       actions=docs_to_upsert(),
                                       **kw)

            for ok, resp in responses:
                if not ok:
                    LOG.error(
                        "Could not bulk-upsert document "
                        "into ElasticSearch: %r" % resp)
            if self.auto_commit_interval == 0:
                self.commit()
        except errors.EmptyDocsError:
            # This can happen when mongo-connector starts up with no
            # config file and there is nothing to dump.
            pass
Project: fuel-plugin-openstack-telemetry | Author: openstack
def record_events(self, events):
        datetime_trait_fields = [
            'audit_period_beginning',
            'audit_period_ending',
            'deleted_at',
            'created_at',
            'launched_at',
            'modify_at'
        ]

        def _build_bulk_index(event_list):
            for ev in event_list:
                traits = {}
                for t in ev.traits:
                    name = t.name
                    value = t.value
                    if name in datetime_trait_fields:
                        try:
                            ts = timeutils.parse_isotime(value)
                            ts = timeutils.normalize_time(ts)
                            value = timeutils.strtime(ts)
                        except ValueError:
                            LOG.exception(
                                _LE('Could not parse timestamp [%s] from [%s] traits field' % (value, name))
                            )
                            value = t.value
                    traits[name] = value
                yield {'_op_type': 'create',
                       '_index': '%s_%s' % (self.index_name,
                                            ev.generated.date().isoformat()),
                       '_type': ev.event_type,
                       '_id': ev.message_id,
                       '_source': {'timestamp': ev.generated.isoformat(),
                                   'traits': traits,
                                   'raw': ev.raw}}

        error = None
        for ok, result in helpers.streaming_bulk(
                self.conn, _build_bulk_index(events)):
            if not ok:
                __, result = result.popitem()
                if result['status'] == 409:
                    LOG.info(_LI('Duplicate event detected, skipping it: %s')
                             % result)
                else:
                    LOG.exception(_LE('Failed to record event: %s') % result)
                    error = storage.StorageUnknownWriteError(result)

        if self._refresh_on_write:
            self.conn.indices.refresh(index='%s_*' % self.index_name)
            while self.conn.cluster.pending_tasks(local=True)['tasks']:
                pass
        if error:
            raise error