The following 41 code examples, drawn from open source Python projects, illustrate how to use elasticsearch.helpers.scan().
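Before the project examples, a minimal sketch of the basic pattern they all share may help; the connection URL and the index name "my-index" are placeholders, not taken from any of the quoted projects:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

# Placeholder connection and index; adjust for your cluster.
es = Elasticsearch(["http://localhost:9200"])

query = {"query": {"match_all": {}}}

# scan() wraps the scroll API and yields one hit dict at a time, so large
# result sets can be iterated without managing scroll ids or pages manually.
for hit in scan(es, index="my-index", query=query, scroll="5m", size=1000):
    print(hit["_id"], hit["_source"])
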
def tag_by_phrase(self, tag_query, source):
    print('query=%s source=%s' % (json.dumps(tag_query), source))
    resp = self.es.search(index='fcc-comments', body=tag_query, size=0)
    total = resp['hits']['total']
    print('tagging %s / %s matches' % (self.limit, total))
    docs = []
    for doc in scan(self.es, index='fcc-comments', query=tag_query, size=1000):
        docs.append(lib.bulk_update_doc(doc['_id'], {'source': source}))
        if not len(docs) % 1000:
            print('\tfetched %s\n%s\t%s' % (
                len(docs), doc['_id'], doc['_source']['text_data'][:400]))
        if len(docs) >= self.limit:
            break
    print('indexing %s' % (len(docs)))
    tagged = lib.bulk_update(self.es, docs)
    print('tagged %s / %s matches' % (tagged, total))
    return tagged

def preview(self, fraction=0.1):
    fetched = 0
    scores = []
    mod_print = int(1 / fraction)
    while fetched < self.limit:
        '''
        use search instead of scan because keeping an ordered scan cursor
        open negates the performance benefits
        '''
        resp = self.es.search(index='fcc-comments', body=self.query, size=self.limit)
        print('total=%s mod_print=%s' % (resp['hits']['total'], mod_print))
        for doc in resp['hits']['hits']:
            fetched += 1
            scores.append(doc['_score'])
            if not fetched % mod_print:
                print('\n--- comment %s\t%s\t%s\t%s' % (
                    fetched, doc['_id'], doc['_score'], doc['_source']['text_data'][:1000]))

def delete_dataset(dataset_name):
    """Delete all entries from a particular dataset."""
    q = {'query': {'term': {'dataset': dataset_name}}, '_source': False}

    def deletes():
        for i, res in enumerate(scan(es, query=q, index=es_index)):
            yield {
                '_op_type': 'delete',
                '_index': str(es_index),
                '_type': res.get('_type'),
                '_id': res.get('_id')
            }
            if i > 0 and i % 10000 == 0:
                log.info("Delete %s: %s", dataset_name, i)

    es.indices.refresh(index=es_index)
    bulk(es, deletes(), stats_only=True, chunk_size=DATA_PAGE)
    optimize_search()

def _scan_fingerprints(dataset_name=None):
    if dataset_name:
        q = {'term': {'dataset': dataset_name}}
    else:
        q = {'match_all': {}}
    q = {
        'query': q,
        '_source': ['fingerprints', 'dataset']
    }
    scan_iter = scan(es, query=q, index=es_index, doc_type=Schema.ENTITY)
    for i, doc in enumerate(scan_iter):
        source = doc.get('_source')
        fps = source.get('fingerprints')
        if fps is None:
            continue
        for fp in fps:
            if fp is None:
                continue
            yield fp, source.get('dataset')
        if i != 0 and i % 10000 == 0:
            log.info("Crossref: %s entities...", i)

def find_matching_subscriptions(self, index_name: str) -> set:
    percolate_document = {
        'query': {
            'percolate': {
                'field': "query",
                'document_type': ESDocType.doc.name,
                'document': self
            }
        }
    }
    subscription_ids = set()
    for hit in scan(ElasticsearchClient.get(self.logger),
                    index=index_name,
                    query=percolate_document):
        subscription_ids.add(hit["_id"])
    self.logger.debug("Found matching subscription count: %i", len(subscription_ids))
    return subscription_ids

def scan_index(index, model):
    """
    Yield all documents of model type in an index.

    This function calls the elasticsearch.helpers.scan function, and yields
    all the documents in the index that match the doc_type produced by a
    specific Django model.

    Args:
        index: string, the name of the index to scan, must be a configured
            index as returned from settings.get_index_names.
        model: a Django model type, used to filter the documents that are
            scanned.

    Yields each document of type model in index, one at a time.

    """
    # see https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-type-query.html
    query = {"query": {"type": {"value": model._meta.model_name}}}
    client = get_client()
    for hit in helpers.scan(client, index=index, query=query):
        yield hit

def get_rev_links(self, model, rel, *item_types):
    filter_ = {'term': {'links.' + rel: str(model.uuid)}}
    if item_types:
        filter_ = [
            filter_,
            {'terms': {'item_type': item_types}},
        ]
    query = {
        'stored_fields': [],
        'query': {
            'bool': {
                'filter': filter_,
            }
        }
    }
    return [
        hit['_id']
        for hit in scan(self.es, query=query)
    ]

def run(self):
    '''
    get documents without a sentiment tag that match significant terms:
    - significant terms from positive regex tagged vs others
    - extra multi match clause for stronger terms (in multiple term sets:
      positive vs negative, untagged, and all)
    - phrase match net neutrality since both terms score high
    '''
    index_queue = multiprocessing.Queue()
    bulk_index_process = multiprocessing.Process(
        target=self.bulk_index, args=(index_queue,),
    )
    bulk_index_process.start()
    fetched = 0
    try:
        while fetched < self.limit:
            '''
            use search instead of scan because keeping an ordered scan cursor
            open negates the performance benefits
            '''
            resp = self.es.search(index='fcc-comments', body=self.query, size=self.limit)
            for doc in resp['hits']['hits']:
                index_queue.put(doc['_id'])
                fetched += 1
                if not fetched % 100:
                    print('%s\t%s\t%s' % (fetched, doc['_score'], doc['_source']['text_data']))
    except ConnectionTimeout:
        print('error fetching: connection timeout')
    index_queue.put(None)
    bulk_index_process.join()

def scan(self, index, query, limit=None, id_only=False):
    size = self.bulk_size
    max_records = None
    cnt = 0
    if isinstance(limit, int):
        if limit > 0:
            size = min(limit, size)
            max_records = limit
    kw = dict(
        index=index,
        query=query,
        size=size
    )
    if id_only:
        kw['_source'] = ['_id']
    log.debug('Scanning for %s (size = %d, index = %s)', query, size, index)
    for hit in helpers.scan(self.client, **kw):
        if max_records:
            if cnt >= max_records:
                log.debug('Stopping after pulling %d records'
                          ' as requested', cnt)
                # PEP 479: raising StopIteration inside a generator becomes a
                # RuntimeError on Python 3.7+, so return instead.
                return
        log.debug('Yielding %s', hit['_id'])
        cnt += 1
        if id_only:
            yield hit.get('_id')
        else:
            yield hit

def get_hits(self, start_time, stop_time):
    # print "in get hits"
    time_filter_query = {
        "query": {
            "bool": {
                "filter": {
                    "range": {
                        "@timestamp": {
                            "gte": start_time,
                            "lte": stop_time,
                            "format": "epoch_millis"
                        }
                    }
                }
            }
        }
    }
    scan_generator = helpers.scan(self.es, query=time_filter_query, index=self.es_index)
    newHits = []
    for item in scan_generator:
        utc_record = item['_source']['@timestamp'][:-1]
        new_value = calendar.timegm(datetime.datetime.strptime(
            utc_record, "%Y-%m-%dT%H:%M:%S.%f").timetuple()) * 1000
        item['@timestamp'] = new_value
        newHits.append(item)
    return newHits

def doc_feeder(self, index_type=None, index_name=None, step=10000, verbose=True,
               query=None, scroll='10m', **kwargs):
    conn = self.conn
    index_name = index_name or self.ES_INDEX_NAME
    doc_type = index_type or self.ES_INDEX_TYPE
    n = self.count(query=query)['count']
    cnt = 0
    t0 = time.time()
    if verbose:
        print('\ttotal docs: {}'.format(n))

    _kwargs = kwargs.copy()
    _kwargs.update(dict(size=step, index=index_name, doc_type=doc_type))
    res = helpers.scan(conn, query=query, scroll=scroll, **_kwargs)
    t1 = time.time()
    for doc in res:
        if verbose and cnt % step == 0:
            if cnt != 0:
                print('done.[%.1f%%,%s]' % (cnt*100./n, timesofar(t1)))
            print('\t{}-{}...'.format(cnt+1, min(cnt+step, n)), end='')
            t1 = time.time()
        yield doc
        cnt += 1
    if verbose:
        print('done.[%.1f%%,%s]' % (cnt*100./n, timesofar(t1)))
        print("Finished! [{}]".format(timesofar(t0)))

def get_venues_by_location(location, radius, hour_for_cache):
    """Return a list of venues oid that are in location ('latitude,longitude')
    in the given radius. Cache results for one hour.
    """
    lat_lon = location.split(',')
    body = {
        "query": {
            "filtered": {
                "query": {"match_all": {}},
                "filter": {
                    "geo_distance": {
                        "distance": str(radius) + 'km',
                        "location": {
                            "lat": float(lat_lon[0]),
                            "lon": float(lat_lon[1])
                        }
                    }
                }
            }
        },
        "_source": {
            "include": ["oid"]
        }
    }
    try:
        result = scan(
            es, index='lac', doc_type='geo_location', query=body, size=500)
        return [v['_source']['oid'] for v in result]
    except Exception as e:
        log.exception(e)
        return []

def scan(self, **kwargs):
    es = get_elasticsearch(self)
    return scan(es, index=es._index, doc_type=self.doc_type, **kwargs)

def get_scroll(query_dsl, es_client, index_name=None, keep_alive='1m'):
    """Returns an iterator for results matching query_dsl."""
    if index_name is None:
        index_name = es_client.index_name
    scroll = scan(
        es_client, query=query_dsl, scroll=keep_alive, index=index_name)
    return scroll

def get_tags(self, repos, fromdate=None, todate=None):
    filter = {
        "bool": {
            "must": [],
            "should": [],
        }
    }

    for repo in repos:
        should_repo_clause = {
            "bool": {
                "must": []
            }
        }
        should_repo_clause["bool"]["must"].append(
            {"term": {"repo": repo}}
        )
        filter["bool"]["should"].append(should_repo_clause)

    body = {
        "filter": filter
    }

    body["filter"]["bool"]["must"].append(
        {
            "range": {
                "date": {
                    "gte": fromdate,
                    "lt": todate,
                }
            }
        }
    )

    return [t for t in scanner(self.es, query=body,
                               index=self.index, doc_type=self.dbname)]

def scan_all(self,
             scroll = '5m',  # TODO - hard coded timeout.
             ):
    """
    Most efficient way to scan all documents.
    """

    rr = es_scan(client = self.es,
                 index = self.index_name,
                 doc_type = self.doc_type,
                 scroll = scroll,
                 query = {"query": {'match_all': {}}},
                 )

    return rr

def typeahead_generate():
    """
    Re-generate typeahead search.

    This consists of a weighted set of completions for every possible query.

    Weighing ideas:
    - query frequency.
    - query results quality / count.
    - language model.

    TODO: Consider having the `NearestNeighborsBase` storage create this
    incrementally? Is that approach really better in a clustered setup?
    """

    assert False, 'WIP'

    if mc_config.LOW_LEVEL:
        es = mc_neighbors.low_level_es_connect()

        res = scan(client = es,
                   index = index_name,
                   doc_type = doc_type,
                   scroll = '5m',  # TODO - hard coded.
                   query = {"query": {'match_all': {}},
                            # 'from':0,
                            # 'size':1,
                            },
                   )
    else:
        nes = mc_neighbors.high_level_connect(index_name = index_name,
                                              doc_type = doc_type,
                                              )

        res = nes.scan_all()

def list_designs(self):
    """
    Return a list of designs in the corpus
    :return: a list of all design ids
    """
    result = {}
    s = scan(self.es, index=self.index_name)
    for r in s:
        result.update({r['_source']['stl_id']: True})
    return result.keys()

def clear(self, models=None, commit=True):
    """
    Clears the backend of all documents/objects for a collection of models.

    :param models: List or tuple of models to clear.
    :param commit: Not used.
    """
    if models is not None:
        assert isinstance(models, (list, tuple))

    try:
        if models is None:
            self.conn.indices.delete(index=self.index_name, ignore=404)
            self.setup_complete = False
            self.existing_mapping = {}
            self.content_field_name = None
        else:
            models_to_delete = []

            for model in models:
                models_to_delete.append("%s:%s" % (DJANGO_CT, get_model_ct(model)))

            # Delete using scroll API
            query = {'query': {'query_string': {'query': " OR ".join(models_to_delete)}}}
            generator = scan(self.conn, query=query, index=self.index_name,
                             doc_type='modelresult')
            actions = ({
                '_op_type': 'delete',
                '_id': doc['_id'],
            } for doc in generator)
            bulk(self.conn, actions=actions, index=self.index_name,
                 doc_type='modelresult')
            self.conn.indices.refresh(index=self.index_name)

    except elasticsearch.TransportError as e:
        if not self.silently_fail:
            raise

        if models is not None:
            self.log.error("Failed to clear Elasticsearch index of models '%s': %s",
                           ','.join(models_to_delete), e, exc_info=True)
        else:
            self.log.error("Failed to clear Elasticsearch index: %s", e, exc_info=True)

def _refresh_percolate_queries(self, index_name: str) -> None:
    # When dynamic templates are used and queries for percolation have been added
    # to an index before the index contains mappings of fields referenced by those queries,
    # the queries must be reloaded when the mappings are present for the queries to match.
    # See: https://github.com/elastic/elasticsearch/issues/5750
    subscription_index_name = Config.get_es_index_name(ESIndexType.subscriptions, self.replica)
    es_client = ElasticsearchClient.get(self.logger)
    if not es_client.indices.exists(subscription_index_name):
        return
    subscription_queries = [{'_index': index_name,
                             '_type': ESDocType.query.name,
                             '_id': hit['_id'],
                             '_source': hit['_source']['es_query']
                             }
                            for hit in scan(es_client,
                                            index=subscription_index_name,
                                            doc_type=ESDocType.subscription.name,
                                            query={'query': {'match_all': {}}})
                            ]
    if subscription_queries:
        try:
            bulk(es_client, iter(subscription_queries), refresh=True)
        except BulkIndexError as ex:
            self.logger.error("Error occurred when adding subscription queries "
                              "to index %s Errors: %s", index_name, ex.errors)

def _remove(self):
    bulk_deletes = []
    for result in scan(self.elastic_conn, index="test", doc_type="test"):
        result['_op_type'] = 'delete'
        bulk_deletes.append(result)
    bulk(self.elastic_conn, bulk_deletes)

def handle_command(self, doc, namespace, timestamp):
    # Flush buffer before handle command
    self.commit()
    db = namespace.split('.', 1)[0]
    if doc.get('dropDatabase'):
        dbs = self.command_helper.map_db(db)
        for _db in dbs:
            self.elastic.indices.delete(index=_db.lower())

    if doc.get('renameCollection'):
        raise errors.OperationFailed(
            "elastic_doc_manager does not support renaming a mapping.")

    if doc.get('create'):
        db, coll = self.command_helper.map_collection(db, doc['create'])
        if db and coll:
            self.elastic.indices.put_mapping(
                index=db.lower(), doc_type=coll,
                body={
                    "_source": {"enabled": True}
                })

    if doc.get('drop'):
        db, coll = self.command_helper.map_collection(db, doc['drop'])
        if db and coll:
            # This will delete the items in coll, but not get rid of the
            # mapping.
            warnings.warn("Deleting all documents of type %s on index %s."
                          "The mapping definition will persist and must be"
                          "removed manually." % (coll, db))
            responses = streaming_bulk(
                self.elastic,
                (dict(result, _op_type='delete') for result in scan(
                    self.elastic, index=db.lower(), doc_type=coll)))

            for ok, resp in responses:
                if not ok:
                    LOG.error(
                        "Error occurred while deleting ElasticSearch docum"
                        "ent during handling of 'drop' command: %r" % resp)

def _stream_search(self, *args, **kwargs):
    """Helper method for iterating over ES search results."""
    for hit in scan(self.elastic, query=kwargs.pop('body', None),
                    scroll='10m', **kwargs):
        hit['_source']['_id'] = hit['_id']
        yield hit['_source']

def scan(index=None, doc_type=None, **kwargs):
    es = get_client(env.elasticsearch_alias)
    docs = helpers.scan(es, index=index, doc_type=doc_type, ignore=IGNORE, **kwargs)
    for doc in docs:
        jsonprint(doc)

def get_all_urls(self, limit=None):
    '''get all urls in the index
    '''
    res = scan(
        self.conn, index=self.index, doc_type=RECORD_TYPE,
        _source_include=[],
        query={'query': {'match_all': {}}})
    for item in islice(res, limit):
        yield item['_id']

def get_children(self, node):
    '''get child URLs of `url`
    '''
    assert node.replica is not None
    res = scan(
        self.conn, index=self.index, doc_type=UNION_FIND_TYPE,
        _source_include=[],
        query={'query': {'term': {'parent': node.get_id()}}})
    for item in res:
        yield AKANode.from_record(item['_source']['child'])

def get_all_unions(self):
    '''
    '''
    res = scan(
        self.conn, index=self.index, doc_type=UNION_FIND_TYPE)
    for item in res:
        yield item['_source']

def get_all_plugins(self):
    index = ElasticConfig.INDEX_VULTASKS["index"]
    doc_type = ElasticConfig.INDEX_VULTASKS["type"]
    scroll = "2m"
    size = 30
    body = {"query": {"match_all": {}}}
    data = helpers.scan(client=self.es, query=body, index=index,
                        doc_type=doc_type, scroll=scroll, size=size)
    return data

# search assets

def get_assets_doc(self, searchGroup):
    index = self.assetIndex
    doc_type = self.assetType
    scroll = "5m"
    size = 100
    body = {"query": {"bool": {"must": searchGroup}}}
    data = helpers.scan(client=self.es, query=body, index=index,
                        doc_type=doc_type, scroll=scroll, size=size)
    return data

def prune_index(index):
    """Remove all orphaned documents from an index.

    This function works by scanning the remote index, and in each returned
    batch of documents looking up whether they appear in the default index
    queryset. If they don't (they've been deleted, or no longer fit the qs
    filters) then they are deleted from the index. The deletion is done in
    one hit after the entire remote index has been scanned.

    The elasticsearch.helpers.scan function returns each document one at a
    time, so this function can swamp the database with SELECT requests.

    Please use sparingly.

    Returns a list of ids of all the objects deleted.

    """
    logger.info("Pruning missing objects from index '%s'", index)
    prunes = []
    responses = []
    client = get_client()
    for model in get_index_models(index):
        for hit in scan_index(index, model):
            obj = _prune_hit(hit, model)
            if obj:
                prunes.append(obj)
        logger.info(
            "Found %s objects of type '%s' for deletion from '%s'.",
            len(prunes), model, index
        )

    if len(prunes) > 0:
        actions = bulk_actions(prunes, index, 'delete')
        response = helpers.bulk(client, actions, chunk_size=get_setting('chunk_size'))
        responses.append(response)

    return responses

def reindex(client, source_index, target_index, target_client=None,
            chunk_size=500, scroll='5m', transformation_callable=None):
    """
    Reindex all documents from one index to another, potentially (if
    `target_client` is specified) on a different cluster.

    .. note::

        This helper doesn't transfer mappings, just the data.

    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use (for
        read if `target_client` is specified as well)
    :arg source_index: index (or list of indices) to read documents from
    :arg target_index: name of the index in the target cluster to populate
    :arg target_client: optional, if specified will be used for writing (thus
        enabling reindex between clusters)
    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
    :arg scroll: Specify how long a consistent view of the index should be
        maintained for scrolled search
    """
    target_client = client if target_client is None else target_client

    docs = scan(client, index=source_index, scroll=scroll, _source_include=['*'])

    def _change_doc_index(hits, index):
        for h in hits:
            h['_index'] = index
            if transformation_callable is not None:
                h = transformation_callable(h)
            yield h

    return bulk(target_client, _change_doc_index(docs, target_index),
                chunk_size=chunk_size, stats_only=True)

def includeme(config):
    config.add_route('search', '/search{slash:/?}')
    config.add_route('report', '/report{slash:/?}')
    config.scan(__name__)

def __iter__(self, *item_types):
    query = {
        'stored_fields': [],
        'query': {
            'bool': {
                'filter': {'terms': {'item_type': item_types}} if item_types else {'match_all': {}}
            }
        }
    }
    for hit in scan(self.es, query=query):
        yield hit['_id']

def paginate(self, index, q='*', limit=None, size=None, id_only=True):
    if not size:
        size = self.bulk_size
    log.info('Limit %s, size %s (q = "%s")', limit, size, q)
    s = Search(
        using=self.client,
        index=index,
        doc_type=self.doc_type)
    s = s.query(Q('query_string', query=q))
    if limit:
        size = min(size, limit)
        s = s.extra(size=size)
    s = s.params(
        scroll='20m',
        size=size)
    if id_only:
        s = s.source(False)
    log.debug('Query: %s', simplejson.dumps(s.to_dict(), indent=2))
    hits = []
    overall = 0
    for h in s.scan():
        if limit is not None and overall >= limit:
            # PEP 479: return instead of raising StopIteration inside a generator.
            return
        log.debug('Hit: %s (progress: %d)', h.meta.id, overall)
        # Check `not limit` first so a None limit does not raise a TypeError
        # on Python 3 when compared against an int.
        if not limit or overall < limit:
            if id_only:
                hits.append(h.meta.id)
            else:
                hits.append(h.to_dict())
        if len(hits) == size:
            yield iter(hits)
            hits = []
        overall += size
    if len(hits):
        yield iter(hits)
    else:
        return

def analyze_gerrit(es_read, es_write, es_read_index, es_write_index, key):
    # Retrieve projects info
    projects = openstack_projects()
    # Retrieve uuids info
    uuids = Uuid(pandas.DataFrame(), file_path='openstack_uuids.csv')
    # Retrieve gender cached data
    enriched_gender = Gender(pandas.DataFrame(), key, "gerrit_gender.csv")

    es_write.indices.delete(es_write_index, ignore=[400, 404])
    es_write.indices.create(es_write_index, body=MAPPING_GERRIT)

    query = {"query": {"match_all": {}}}
    items = []
    cont = 1
    uniq_id = 1
    first = True
    for item in helpers.scan(es_read, query, scroll='300m', index=es_read_index):
        items.append(item["_source"])
        if cont % 15000 == 0:
            # Eventizing the first 7500 changesets
            gerrit_events = events.Gerrit(items)
            events_df = gerrit_events.eventize(2)

            print(cont)
            print(len(events_df))

            # Adding projects information
            events_df = pandas.merge(events_df, projects, how='left', on='repository')

            # Adding gender info
            enriched_gender.data = events_df
            events_df = enriched_gender.enrich("owner")
            events_df = events_df.fillna("Unknown")
            print(len(events_df))
            print(events_df.keys())

            # Add author uuid
            uuids.data = events_df
            events_df["user"] = events_df["owner"]
            events_df = uuids.enrich(['user', 'email'])
            print(len(events_df))
            print(events_df.keys())

            # Uploading info to the new ES
            uniq_id = upload_data(events_df, es_write_index, es_write, uniq_id)
            items = []

        cont = cont + 1

    #helpers.bulk(es_write, docs)
    uniq_id = upload_data(events_df, es_write_index, es_write, uniq_id)

def clear(self, models=None, commit=True):
    # We actually don't want to do this here, as mappings could be
    # very different.
    # if not self.setup_complete:
    #     self.setup()
    if models is not None:
        assert isinstance(models, (list, tuple))

    try:
        if models is None:
            self.conn.indices.delete(index=self.index_name, ignore=404)
            self.setup_complete = False
            self.existing_mapping = {}
        else:
            models_to_delete = []

            for model in models:
                models_to_delete.append("%s:%s" % (DJANGO_CT, get_model_ct(model)))

            # Delete by query in Elasticsearch assumes you're dealing with
            # a ``query`` root object. :/
            query = {"query": {"query_string": {"query": " OR ".join(models_to_delete)}}}

            if elasticsearch.__version__[0] != 2:
                self.conn.delete_by_query(index=self.index_name,
                                          doc_type='modelresult', body=query)
            else:
                for result in scan(
                        self.conn, query=query, index=self.index_name,
                        doc_type='modelresult', _source=False, scroll='1m'):
                    self.conn.delete(
                        index=result['_index'], doc_type=result['_type'],
                        id=result['_id'], refresh=True, ignore=404
                    )
    except elasticsearch.TransportError as e:
        if not self.silently_fail:
            raise

        if models is not None:
            self.log.error("Failed to clear Elasticsearch index of models '%s': %s",
                           ','.join(models_to_delete), e, exc_info=True)
        else:
            self.log.error("Failed to clear Elasticsearch index: %s", e, exc_info=True)

def get_commits(self, mails=[], repos=[], fromdate=None, todate=None,
                start=0, limit=100, sort='desc', scan=False,
                merge_commit=None, metadata=[], mails_neg=False,
                domains=None):
    """ Return the list of commits for authors and/or repos.
    """
    params = {'index': self.index, 'doc_type': self.dbname}

    body = {
        "filter": self.get_filter(mails, repos, metadata, mails_neg, domains),
    }

    # If None both are returned. If you expect to skip merge commits
    # then set merge_commit to False
    if merge_commit is not None:
        body["filter"]["bool"]["must"].append(
            {"term": {"merge_commit": merge_commit}})

    body["filter"]["bool"]["must"].append(
        {
            "range": {
                "committer_date": {
                    "gte": fromdate,
                    "lt": todate,
                }
            }
        }
    )

    if scan:
        return scanner(self.es, query=body,
                       index=self.index, doc_type=self.dbname)

    params['body'] = body
    params['size'] = limit
    params['from_'] = start
    params['sort'] = "committer_date:%s,author_date:%s" % (sort, sort)
    res = self.es.search(**params)
    took = res['took']
    hits = res['hits']['total']
    commits = [r['_source'] for r in res['hits']['hits']]
    return took, hits, commits

def execute(self, quals, columns, _sortkeys=None):
    must_list, must_not_list = self._make_match_lists(quals)
    if must_list or must_not_list:
        query = get_filtered_query(
            must_list=must_list, must_not_list=must_not_list)
    else:
        query = {}

    # It's not clear if we should be using `fields` or `_source` here.
    # `fields` is useful for "stored" fields, which are stored separately
    # from the main _source JSON document. The idea is that the entire document
    # does not need to be reparsed when loading only a subset of the fields.
    # When fields aren't "stored" but are doc_values, they still seem to be
    # stored independently.
    # Tests suggest that at least in some cases when dealing with doc_values,
    # `fields` performs 1.16 times better than `_source`.
    query['fields'] = [self._column_to_es_field(column) for column in columns]

    # When using fields, the values always come back in an array, to make for
    # more consistent treatment of any actual array fields that we may have
    # requested. If the field is not truly an array field, the value comes back
    # in an array of one element.
    default_value = [None]

    log_to_postgres('query: %s' % query, logging.DEBUG)

    for result in scan(
            self.esclient,
            query=query,
            index=self.get_index(quals),
            doc_type=self._doc_type,
            size=self._SCROLL_SIZE,
            scroll=self._SCROLL_LENGTH):
        obs = result.get('fields', {})

        def _massage_value(value, column):
            if column == '_id':
                # `_id` is special in that it's always present in the top-level
                # result, not under `fields`.
                return result['_id']
            # If the column type is an array, return the list.
            # Otherwise, return the first element of the array.
            if self._columns[column].type_name.endswith('[]'):
                return value
            return value[0]

        row = {
            column: _massage_value(
                obs.get(self._column_to_es_field(column), default_value),
                column)
            for column in columns}
        yield row

def _search(self, esclient, config):
    """Search
    Generate events to Splunk from an Elasticsearch search"""

    # Search body
    # query-string-syntax
    # www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html
    body = {
        "sort": [{config[KEY_CONFIG_TIMESTAMP]: {"order": "asc"}}],
        "query": {
            "bool": {
                "must": [
                    {"range": {
                        config[KEY_CONFIG_TIMESTAMP]: {
                            "gte": config[KEY_CONFIG_EARLIEST],
                            "lte": config[KEY_CONFIG_LATEST],
                            "format": "epoch_second",
                        }
                    }},
                    {"query_string": {
                        "query": config[KEY_CONFIG_QUERY],
                    }}
                ]
            }
        }
    }

    # Execute search
    if self.scan:
        res = helpers.scan(esclient,
                           size=config[KEY_CONFIG_LIMIT],
                           index=config[KEY_CONFIG_INDEX],
                           _source_include=config[KEY_CONFIG_FIELDS],
                           doc_type=config[KEY_CONFIG_SOURCE_TYPE],
                           query=body)
        for hit in res:
            yield self._parse_hit(config, hit)
    else:
        res = esclient.search(index=config[KEY_CONFIG_INDEX],
                              size=config[KEY_CONFIG_LIMIT],
                              _source_include=config[KEY_CONFIG_FIELDS],
                              doc_type=config[KEY_CONFIG_SOURCE_TYPE],
                              body=body)
        for hit in res['hits']['hits']:
            yield self._parse_hit(config, hit)