/**
 * Requests the next page of the scroll identified by the {@code scrollId} field.
 *
 * <p>The work is deferred until subscription. If {@code scrollId} is {@code null} at that
 * point, a single {@code null} item is emitted and no request is sent. Otherwise a scroll
 * request is built with the default scroll timeout, executed with the default search
 * timeout, and the unwrapped response is emitted. Request and response are logged at
 * debug level.
 *
 * @return an observable emitting the scroll response, or {@code null} when there is no
 *         scroll id
 */
protected Observable<SearchResponse> scroll() {
    return defer(() -> {
        // Nothing to continue: emit a single null item instead of calling Elasticsearch.
        if (scrollId == null) {
            return just(null);
        }

        SearchScrollRequestBuilder scrollRequest =
                elasticsearch.get()
                        .prepareSearchScroll(scrollId)
                        .setScroll(timeValueMillis(elasticsearch.getDefaultScrollTimeout()));

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Request = " + Jsonify.toString(scrollRequest));
        }

        return elasticsearch.execute(vertxContext, scrollRequest, elasticsearch.getDefaultSearchTimeout())
                .map(Optional::get)
                .map(response -> {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Response = " + Jsonify.toString(response));
                    }
                    return response;
                });
    });
}
protected <E extends EsDocument> ScrollableResponse<List<E>> scrollNext( String scrollId, ThrowingFunction<String, E> createFunc) throws Exception { SearchScrollRequestBuilder builder = esClient.prepareSearchScroll(scrollId) .setScroll(TimeValue.timeValueMillis(SCROLLDEFAULTTIMEOUT)); SearchResponse response = builder.execute().actionGet(); ArrayList<E> list = new ArrayList<>(); ScrollableResponse<List<E>> ret = new ScrollableResponse<>(); ret.setValue(list); ret.setContinousToken(response.getScrollId()); if (response.getHits().totalHits() == 0) { //Clear the scroll as early as possible to save resource ClearScrollRequestBuilder clearRequestBuilder = esClient.prepareClearScroll().addScrollId(scrollId); clearRequestBuilder.execute(); ret.setScrollToEnd(true); } else { for (int i = 0; i < response.getHits().getHits().length; i++) { String str = response.getHits().getAt(i).getSourceAsString(); E element = createFunc.apply(str); element.setId(response.getHits().getAt(i).getId()); list.add(element); } } return ret; }
public void emit() { // 构建查询 Scroll esScroll = new Scroll(new TimeValue(scroll, TimeUnit.MINUTES)); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setScroll(esScroll).setQuery(query).setSize(size); if (StringUtils.isNotEmpty(type)) { searchRequestBuilder.setTypes(type); } if (logger.isDebugEnabled()) { logger.debug(searchRequestBuilder.toString()); } String scrollId = null; try { SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); scrollId = searchResponse.getScrollId(); long total = searchResponse.getHits().getTotalHits(); // 处理第一次search查询 int messageCount = submitMessage(searchResponse); // 计算剩余的批次 long remainTotal = total - messageCount; long batchNum = remainTotal % size == 0 ? remainTotal / size : (remainTotal / size) + 1; for (long i = 0; !stop && i < batchNum; i++) { // 按批查询数据 SearchScrollRequestBuilder scrollRequestBuilder = client.prepareSearchScroll(scrollId).setScroll(esScroll); searchResponse = scrollRequestBuilder.execute().actionGet(); submitMessage(searchResponse); } } catch (Exception e) { logger.error("query error", e); } finally { if (StringUtils.isNotEmpty(scrollId)) { client.prepareClearScroll().addScrollId(scrollId).execute().actionGet(); } } }
/**
 * Creates a scroll request builder by delegating to the wrapped {@code client}.
 *
 * @param scrollId scroll id handed through unchanged to the client
 * @return the builder produced by the client
 */
public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) {
    final SearchScrollRequestBuilder scrollRequestBuilder = client.prepareSearchScroll(scrollId);
    return scrollRequestBuilder;
}
/**
 * Creates a new scroll request builder bound to this instance and
 * {@code SearchScrollAction.INSTANCE}.
 *
 * @param scrollId scroll id passed to the builder's constructor
 * @return a freshly constructed builder
 */
@Override
public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) {
    final SearchScrollRequestBuilder scrollRequestBuilder =
            new SearchScrollRequestBuilder(this, SearchScrollAction.INSTANCE, scrollId);
    return scrollRequestBuilder;
}
/**
 * Walks a scroll page by page, folding the containers found on each page into
 * {@code listedContainers}.
 *
 * <p>For every container on a page: if its name is not yet in the map it is added;
 * otherwise its byte count and object count are added to the existing entry. After each
 * container, if the map holds more than {@code limit} entries, the last entry is removed.
 * Once a page has been processed the method recurses with the scroll id from that
 * response. An empty page ends the recursion and clears the scroll.
 *
 * @param account          passed through to {@code toIterable}
 * @param prefix           passed through to {@code toIterable}
 * @param delimiter        passed through to {@code toIterable}
 * @param marker           passed through to {@code toIterable}
 * @param endMarker        passed through to {@code toIterable}
 * @param limit            maximum number of entries kept in {@code listedContainers}
 * @param elasticsearch    client wrapper used to build and execute the scroll request
 * @param scrollId         id of the scroll page to fetch
 * @param listedContainers accumulator, mutated in place
 * @return an observable that completes once the scroll has been consumed and cleared
 */
protected Observable<Void> scroll(
        final PersistentAccount account,
        final String prefix,
        final String delimiter,
        final String marker,
        final String endMarker,
        final int limit,
        final Elasticsearch elasticsearch,
        final String scrollId,
        final NavigableMap<String, SparseContainer> listedContainers) {

    SearchScrollRequestBuilder request =
            elasticsearch.get()
                    .prepareSearchScroll(scrollId)
                    .setScroll(timeValueMillis(elasticsearch.getDefaultScrollTimeout()));

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(format("Search Request = %s", Jsonify.toString(request)));
    }

    return elasticsearch.execute(vertxContext, request, elasticsearch.getDefaultSearchTimeout())
            .map(Optional::get)
            .flatMap(response -> {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(format("Search Response = %s", Jsonify.toString(response)));
                }

                SearchHits pageHits = response.getHits();

                // An empty page means the scroll is exhausted: release it and stop.
                if (pageHits.getHits().length == 0) {
                    return clearScroll(elasticsearch, response.getScrollId());
                }

                return toIterable(account, prefix, delimiter, marker, endMarker, pageHits)
                        .doOnNext(candidate -> {
                            String name = candidate.getContainerName();
                            SparseContainer known = listedContainers.get(name);
                            if (known != null) {
                                // Same container seen again: accumulate its counters.
                                known.setByteCount(known.getByteCount() + candidate.getByteCount());
                                known.setObjectCount(known.getObjectCount() + candidate.getObjectCount());
                            } else {
                                listedContainers.put(name, candidate);
                            }
                            // Keep at most `limit` entries by dropping the last one.
                            if (listedContainers.size() > limit) {
                                listedContainers.pollLastEntry();
                            }
                        })
                        .count()
                        .map(new ToVoid<>())
                        .flatMap(ignored ->
                                scroll(account, prefix, delimiter, marker, endMarker, limit,
                                        elasticsearch, response.getScrollId(), listedContainers));
            });
}
/**
 * Walks a scroll page by page, folding the objects found on each page into
 * {@code listedObjects}.
 *
 * <p>For every object on a page: if its name is not yet in the map it is added; otherwise
 * its length is added to the existing entry. After each object, if the map holds more
 * than {@code limit} entries, the last entry is removed. Once a page has been processed
 * the method recurses with the scroll id from that response. An empty page ends the
 * recursion and clears the scroll.
 *
 * @param container     passed through to {@code toIterable}
 * @param prefix        passed through to {@code toIterable}
 * @param delimiter     passed through to {@code toIterable}
 * @param marker        passed through to {@code toIterable}
 * @param endMarker     passed through to {@code toIterable}
 * @param limit         maximum number of entries kept in {@code listedObjects}
 * @param elasticsearch client wrapper used to build and execute the scroll request
 * @param scrollId      id of the scroll page to fetch
 * @param listedObjects accumulator, mutated in place
 * @return an observable that completes once the scroll has been consumed and cleared
 */
protected Observable<Void> scroll(
        final PersistentContainer container,
        final String prefix,
        final String delimiter,
        final String marker,
        final String endMarker,
        final int limit,
        final Elasticsearch elasticsearch,
        final String scrollId,
        final NavigableMap<String, ListedObject> listedObjects) {

    SearchScrollRequestBuilder request =
            elasticsearch.get()
                    .prepareSearchScroll(scrollId)
                    .setScroll(timeValueMillis(elasticsearch.getDefaultScrollTimeout()));

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(format("Search Request = %s", Jsonify.toString(request)));
    }

    return elasticsearch.execute(vertxContext, request, elasticsearch.getDefaultSearchTimeout())
            .map(Optional::get)
            .flatMap(response -> {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(format("Search Response = %s", Jsonify.toString(response)));
                }

                SearchHits pageHits = response.getHits();

                // An empty page means the scroll is exhausted: release it and stop.
                if (pageHits.getHits().length == 0) {
                    return clearScroll(elasticsearch, response.getScrollId());
                }

                for (ListedObject candidate
                        : toIterable(container, prefix, delimiter, marker, endMarker, pageHits)) {
                    String name = candidate.getName();
                    ListedObject known = listedObjects.get(name);
                    if (known != null) {
                        // Same object seen again: accumulate its length.
                        known.setLength(known.getLength() + candidate.getLength());
                    } else {
                        listedObjects.put(name, candidate);
                    }
                    // Keep at most `limit` entries by dropping the last one.
                    if (listedObjects.size() > limit) {
                        listedObjects.pollLastEntry();
                    }
                }

                return scroll(container, prefix, delimiter, marker, endMarker, limit,
                        elasticsearch, response.getScrollId(), listedObjects);
            });
}
@Override public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) { // TODO Auto-generated method stub return null; }
/**
 * Creates a scroll request builder using the client obtained from
 * {@code client.getClient()}.
 *
 * @param scrollId scroll id handed through unchanged
 * @return the builder produced by the underlying client
 */
public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) {
    final SearchScrollRequestBuilder scrollRequestBuilder =
            client.getClient().prepareSearchScroll(scrollId);
    return scrollRequestBuilder;
}
/**
 * Creates a scroll request builder by delegating to {@code internalClient}.
 *
 * @param scrollId scroll id handed through unchanged
 * @return the builder produced by the internal client
 */
public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) {
    final SearchScrollRequestBuilder scrollRequestBuilder =
            internalClient.prepareSearchScroll(scrollId);
    return scrollRequestBuilder;
}
/**
 * Creates a scroll request builder using the client supplied by {@code esProvider}.
 *
 * @param scrollId scroll id handed through unchanged
 * @return the builder produced by the provider's client
 */
public SearchScrollRequestBuilder getScrollBuilder(String scrollId) {
    final SearchScrollRequestBuilder scrollRequestBuilder =
            esProvider.getClient().prepareSearchScroll(scrollId);
    return scrollRequestBuilder;
}
/**
 * Prepares a search scroll request that continues a previous scrollable search request.
 *
 * @param scrollId identifies the scroll to continue; presumably the id returned by the
 *                 previous search or scroll response (confirm against implementations)
 * @return a builder for the scroll request
 */
SearchScrollRequestBuilder prepareSearchScroll(String scrollId);