The following 9 code examples, extracted from open source Python projects, illustrate how to use the elasticsearch.helpers module.
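Before the project examples, here is a minimal, self-contained sketch of the two helpers most of them rely on: helpers.bulk() for batched indexing and helpers.scan() for iterating over large result sets. The host, the "demo" index, and the documents are placeholders, not taken from any of the projects below.

import elasticsearch
import elasticsearch.helpers

es = elasticsearch.Elasticsearch(["http://localhost:9200"])  # placeholder host

# helpers.bulk() indexes a batch of documents in one round trip.
actions = [{"_index": "demo", "_source": {"n": n}}  # "demo" is a placeholder index
           for n in range(100)]
ok, failures = elasticsearch.helpers.bulk(es, actions, raise_on_error=False)
print("indexed %d docs, %d failures" % (ok, len(failures)))

# helpers.scan() streams every matching document, paging via the scroll API,
# so result sets larger than one search response can be iterated lazily.
for hit in elasticsearch.helpers.scan(es, index="demo",
                                      query={"query": {"match_all": {}}}):
    print(hit["_source"])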

import elasticsearch.helpers
from flask import current_app  # assuming this runs inside a Flask app context


def get_linkify_items(collections, html_lower):
    items = elasticsearch.helpers.scan(current_app.es,
                                       index=current_app.es_data_db_index_name,
                                       doc_type=collections,
                                       scroll=u"3h")
    for item in items:
        collection = item["_type"]
        item = item["_source"]
        for lang in ["He", "En"]:
            itemHeader = item.get("Header")
            title = itemHeader[lang] if itemHeader and lang in itemHeader else ""
            itemSlug = item.get("Slug")
            slug = itemSlug[lang] if itemSlug and lang in itemSlug else ""
            if slug and title and len(title) > 2 and title.lower() in html_lower:
                slug = slug.decode("utf-8")
                slug = slug.replace(u"_", u"/")
                # Hebrew pages live under the "he/" URL prefix
                url = u"http://dbs.bh.org.il/{}{}".format(
                    u"he/" if lang == "He" else u"", slug)
                yield {"collection": collection,
                       "item": {"title": title, "url": url}}

def _bulk_delete(self, ops):
    errors = []
    success_count, delete_failures = elasticsearch.helpers.bulk(
        self._es_conn, ops, raise_on_error=False, raise_on_exception=False)
    for op in delete_failures:
        op_info = op['delete']
        if op_info['status'] == 404:
            if op_info.get('result') == 'not_found':
                continue
            # Elasticsearch versions < 5.x do not return "result"
            if op_info.get('found') is False:
                continue
        if 'exception' in op_info:
            errors.append(op_info['exception'])
        else:
            errors.append("%s: %s" % (op_info['_id'],
                                      self._extract_error(op_info)))
    return errors
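
For reference: with raise_on_error=False and raise_on_exception=False, helpers.bulk() returns a (success_count, failures) tuple instead of raising BulkIndexError, which is what the loop above unpacks. A minimal sketch, where es is any Elasticsearch client and the index and ids are placeholders:

ops = [{'_op_type': 'delete', '_index': 'myindex', '_id': doc_id}
       for doc_id in ('a', 'b', 'c')]  # placeholder ids
ok, failures = elasticsearch.helpers.bulk(es, ops, raise_on_error=False,
                                          raise_on_exception=False)
# each failure dict is keyed by the op type, e.g.:
# {'delete': {'_index': 'myindex', '_id': 'b', 'status': 404, ...}}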

def emit(self, body, **kwargs):
    if not self._elasticsearch:
        self._elasticsearch = elasticsearch.Elasticsearch(self._hosts,
                                                          **self._config)
    if self._bulk:
        if not isinstance(body, list):
            raise InvalidPayloadError('invalid payload for bulk indexing. '
                                      'list expected but %s found' % type(body))
    else:
        if not isinstance(body, dict):
            raise InvalidPayloadError('invalid payload for document. '
                                      'dict expected but %s found' % type(body))

    self._index_name = kwargs.pop('index', self._index_name)
    # build index name for daily index types
    if 'daily' in self._index_type:
        self._index_name = '%s-%s' % (
            self._index_name,
            datetime.now().strftime(self._daily_index_format))

    if self._bulk:
        actions = [dict(_index=self._index_name,
                        _type=self._document_type,
                        _source=b) for b in body]
        elasticsearch.helpers.bulk(self._elasticsearch, actions)
    else:
        self._elasticsearch.index(self._index_name, self._document_type,
                                  body=body)
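
When self._index_type is a daily type, emit() targets a date-suffixed index: an index name of logs becomes something like logs-2017.01.01, with the exact suffix controlled by self._daily_index_format, which this snippet does not show.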

from elasticsearch import helpers  # module-level import implied by the docstring


def get_elasticsearch_helper(self):
    """
    Get helpers module for Elasticsearch. Used to bulk index documents.

    :returns: package ``elasticsearch.helpers``
    """
    return helpers

def bulk(self):
    bulk = BulkClient(self)
    yield bulk
    # flush everything the caller queued on the BulkClient in one request
    elasticsearch.helpers.bulk(self.client, bulk.operations,
                               refresh=self.force_refresh)
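
Because this is a single-yield generator, it is presumably meant to be wrapped with contextlib.contextmanager (the decorator is not shown in the snippet), so callers can queue operations inside a with block and have them flushed on exit. A hypothetical usage sketch, where BulkClient's queueing method is assumed rather than taken from the project:

with client.bulk() as batch:        # client is the object defining bulk()
    batch.index('demo', {'n': 1})   # hypothetical BulkClient method that
    batch.index('demo', {'n': 2})   # appends to batch.operations
# on exit, every queued op in batch.operations is sent in one bulk request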

import json

import elasticsearch
import elasticsearch.helpers


def upgrade(elastic, dry_run=False):
    elastic = elasticsearch.Elasticsearch(elastic)
    print(json.dumps(elastic.info(), indent=2))
    if dry_run:
        print("Exit from dry mode")
        return

    body = []
    for hit in elasticsearch.helpers.scan(elastic, index="netmet_catalog",
                                          doc_type="config"):
        config = json.loads(hit["_source"]["config"])
        if "static" in config:
            print("Updating record %s" % hit["_id"])
            new_config = json.dumps({
                "deployment": config,
                "mesher": {"full_mesh": {}},
                "external": []
            })
            body.append(json.dumps({"update": {"_id": hit["_id"]}}))
            body.append(json.dumps({"doc": {"config": new_config}}))

    if body:
        elastic.bulk(index="netmet_catalog", doc_type="config",
                     body="\n".join(body))
        # two body lines (action + doc) per updated record
        print("Upgrade finished. %s records changed" % (len(body) // 2))
    else:
        print("Everything is up to date.")
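
Unlike most of the other examples, the final write here bypasses helpers.bulk() and calls the low-level bulk endpoint directly, so the body must be newline-delimited JSON: one action line ({"update": ...}) immediately followed by one payload line ({"doc": ...}) per record, which is also why the record count is len(body) // 2.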

import time

import elasticsearch.helpers


def do_index(index):
    now = time.time()
    print("Index wildcard:", index)
    client = get_server_handle()
    fields = ['CMSPrimaryPrimaryDataset', 'CMSPrimaryProcessedDataset',
              'CMSPrimaryDataTier', 'CRAB_UserHN', 'Workflow', 'CoreHr',
              'CpuTimeHr', 'CommittedCoreHr', 'ChirpCMSSWReadBytes',
              'ChirpCMSSWEvents', 'ExitCode']
    body = {}
    body['fields'] = fields
    body['query'] = {"term": {"Type": "analysis"}}
    conn = create_db()
    count = 0
    retry = True
    while retry:
        retry = False
        try:
            for result in elasticsearch.helpers.scan(client, query=body,
                                                     doc_type="job",
                                                     index=index,
                                                     scroll="10s"):
                record_usage(result, conn)
                count += 1
                if count % 10000 == 0:
                    print("%s: Indexed %i records" % (time.ctime(), count))
                    record_db(conn)
        except Exception as ex:
            print(ex)
            retry = True
            time.sleep(10)
    print("Modifications took %.1f minutes" % ((time.time() - now) / 60.))
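
One caveat visible in the code: any exception (for example an expired 10-second scroll window) restarts the entire scan from the first document after the sleep, so record_usage() presumably tolerates seeing the same record more than once.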

import itertools

import elasticsearch.helpers


def index_records(es, records_generator):
    n_ok, n_errors = 0, 0
    # groupby() only groups consecutive records, so records_generator is
    # expected to arrive already sorted by date
    for date, records in itertools.groupby(
            records_generator, key=lambda rec: rec["datetime"]["date"]):
        index = config.log_index(date)
        print("Indexing records ({})".format(index))
        ok, errors = elasticsearch.helpers.bulk(
            es, _build_actions_from(index, records))
        n_ok += ok
        if errors:
            print("Errors: {}".format(errors))
            n_errors += len(errors)
    print("Indexed successfully: {:d}, unsuccessfully: {:d}".format(
        n_ok, n_errors))
    return n_ok, n_errors

def handle(self, rows, internal_client):
    self.logger.debug("Handling rows: %s" % repr(rows))
    if not rows:
        return []

    errors = []
    bulk_delete_ops = []
    mget_map = {}
    for row in rows:
        if row['deleted']:
            bulk_delete_ops.append({'_op_type': 'delete',
                                    '_id': self._get_document_id(row),
                                    '_index': self._index,
                                    '_type': self.DOC_TYPE})
            continue
        mget_map[self._get_document_id(row)] = row

    if bulk_delete_ops:
        errors = self._bulk_delete(bulk_delete_ops)
    if not mget_map:
        self._check_errors(errors)
        return

    self.logger.debug("multiple get map: %s" % repr(mget_map))
    stale_rows, mget_errors = self._get_stale_rows(mget_map)
    errors += mget_errors
    update_ops = [self._create_index_op(doc_id, row, internal_client)
                  for doc_id, row in stale_rows]
    _, update_failures = elasticsearch.helpers.bulk(
        self._es_conn, update_ops,
        raise_on_error=False, raise_on_exception=False)
    self.logger.debug("Index operations: %s" % repr(update_ops))
    for op in update_failures:
        op_info = op['index']
        if 'exception' in op_info:
            errors.append(op_info['exception'])
        else:
            errors.append("%s: %s" % (op_info['_id'],
                                      self._extract_error(op_info)))
    self._check_errors(errors)
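
This handler ties the earlier pieces together: deletions are routed through the _bulk_delete() method shown above, and index failures are unpacked with the same raise_on_error=False pattern, accumulating everything into a single errors list for _check_errors().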