/**
 * Uploads the given documents to the named CloudSearch domain, blocking until
 * every batch request has been submitted.
 *
 * @param domainName name of a previously registered CloudSearch domain
 * @param docs documents to upload; an empty list is a no-op
 * @return one result per submitted batch (empty when {@code docs} is empty)
 * @throws IllegalArgumentException if {@code domainName} is not a known domain
 */
public List<UploadDocumentsResult> uploadDocuments(String domainName, List<Document> docs) {
    if (docs.isEmpty()) {
        return Collections.emptyList();
    }
    AmazonCloudSearchDomainAsyncClient domainClient = cloudSearchDomainAsyncClients.get(domainName);
    if (domainClient == null) {
        throw new IllegalArgumentException(domainName + " not known");
    }
    // The documents may be split into several requests (see createUploadDocumentsRequest).
    List<UploadDocumentsRequest> uploadDocumentsRequests = createUploadDocumentsRequest(docs);
    List<UploadDocumentsResult> uploadDocumentsResults = new ArrayList<>(uploadDocumentsRequests.size());
    for (UploadDocumentsRequest uploadDocumentsRequest : uploadDocumentsRequests) {
        uploadDocumentsResults.add(domainClient.uploadDocuments(uploadDocumentsRequest));
    }
    return uploadDocumentsResults;
}
/**
 * Uploads the given documents to the named CloudSearch domain asynchronously.
 *
 * @param domainName name of a previously registered CloudSearch domain
 * @param docs documents to upload; an empty list is a no-op
 * @return one {@link Future} per submitted batch (empty when {@code docs} is empty)
 * @throws IllegalArgumentException if {@code domainName} is not a known domain
 */
public List<Future<UploadDocumentsResult>> uploadDocumentsAsync(String domainName, List<Document> docs) {
    if (docs.isEmpty()) {
        return Collections.emptyList();
    }
    AmazonCloudSearchDomainAsyncClient domainClient = cloudSearchDomainAsyncClients.get(domainName);
    if (domainClient == null) {
        throw new IllegalArgumentException(domainName + " not known");
    }
    // The documents may be split into several requests (see createUploadDocumentsRequest).
    List<UploadDocumentsRequest> uploadDocumentsRequests = createUploadDocumentsRequest(docs);
    List<Future<UploadDocumentsResult>> uploadDocumentsResults = new ArrayList<>(uploadDocumentsRequests.size());
    for (UploadDocumentsRequest uploadDocumentsRequest : uploadDocumentsRequests) {
        uploadDocumentsResults.add(domainClient.uploadDocumentsAsync(uploadDocumentsRequest));
    }
    return uploadDocumentsResults;
}
private List<UploadDocumentsRequest> createUploadDocumentsRequest(List<Document> docs) { List<String> parts = chunkedJson(docs); List<UploadDocumentsRequest> uploadDocumentRequests = new ArrayList<>(parts.size()); for (String part : parts) { try (StringInputStream documents = new StringInputStream(part)) { UploadDocumentsRequest uploadDocumentsRequest = new UploadDocumentsRequest(). // withDocuments(documents). // withContentLength((long) part.length()). // withContentType(Applicationjson); if (progressListener != null) { uploadDocumentsRequest.setGeneralProgressListener(progressListener); } uploadDocumentRequests.add(uploadDocumentsRequest); } catch (IOException e) { log.warn("this should never happen", e); } } return uploadDocumentRequests; }
/**
 * Serializes the labels for the given path into a batch document and pushes it
 * to the search client, logging the returned status.
 */
@Override
public void process(List<Label> labels, String path) {
    final LabelInsertDoc insertDoc = new LabelInsertDoc(labels, path);
    Logger.Debug("Json to push: \n%s", insertDoc.asJson());
    final byte[] payload = insertDoc.asJsonBytes();
    final UploadDocumentsRequest request = getUploadReq(payload);
    final UploadDocumentsResult result = searchClient.uploadDocuments(request);
    Logger.Debug("Indexed %s, %s", path, result.getStatus());
}
private static UploadDocumentsRequest getUploadReq(byte[] doc) { return new UploadDocumentsRequest() .withDocuments(new ByteArrayInputStream(doc)) .withContentLength((long) doc.length) // CS returns a HTML error if not set (and breaks the sdk json parser) .withContentType("application/json"); }
@Override public void commit() throws IOException { // nothing to do if (numDocsInBatch == 0) { return; } // close the array buffer.append(']'); LOG.info("Sending {} docs to CloudSearch", numDocsInBatch); byte[] bb = buffer.toString().getBytes(StandardCharsets.UTF_8); if (dumpBatchFilesToTemp) { try { File temp = File.createTempFile("CloudSearch_", ".json"); FileUtils.writeByteArrayToFile(temp, bb); LOG.info("Wrote batch file {}", temp.getName()); } catch (IOException e1) { LOG.error("Exception while generating batch file", e1); } finally { // reset buffer and doc counter buffer = new StringBuffer(MAX_SIZE_BATCH_BYTES).append('['); numDocsInBatch = 0; } return; } // not in debug mode try (InputStream inputStream = new ByteArrayInputStream(bb)) { UploadDocumentsRequest batch = new UploadDocumentsRequest(); batch.setContentLength((long) bb.length); batch.setContentType(ContentType.Applicationjson); batch.setDocuments(inputStream); UploadDocumentsResult result = client.uploadDocuments(batch); } catch (Exception e) { LOG.error("Exception while sending batch", e); LOG.error(buffer.toString()); } finally { // reset buffer and doc counter buffer = new StringBuffer(MAX_SIZE_BATCH_BYTES).append('['); numDocsInBatch = 0; } }
// Verifies that indexing a Model submits an "add" batch for the model's guid
// to the CloudSearch domain client via the executor.
@Test public void testIndex() throws Exception {
    // Stub domain discovery so the indexer resolves a (fake) search endpoint.
    when(amazonCloudSearch.describeDomains(any())).thenReturn(new DescribeDomainsResult()
            .withDomainStatusList(Lists.newArrayList(new DomainStatus().withSearchService(new ServiceEndpoint().withEndpoint("http://localhost")))));
    objectMapper = new ObjectMapper();
    Model model = new Model();
    model.setGuid("00000000-0000-0000-0000-000000000000");
    // index() hands the upload work to the executor; capture the Runnable and run it inline.
    ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
    getService(ModelIndexer.class).index(model);
    verify(executor).submit(runnableCaptor.capture());
    runnableCaptor.getValue().run();
    // Capture the request that the runnable sent to the domain client.
    ArgumentCaptor<UploadDocumentsRequest> requestCaptor = ArgumentCaptor.forClass(UploadDocumentsRequest.class);
    verify(domain).uploadDocuments(requestCaptor.capture());
    // Slurp the whole request body (delimiter "\\A" reads the stream in one token)
    // and inspect the first entry of the JSON batch array.
    Scanner scanner = new Scanner(requestCaptor.getValue().getDocuments());
    JsonNode json = new ObjectMapper().readTree(scanner.useDelimiter("\\A").next()).get(0);
    assertEquals("add", json.get("type").asText());
    assertEquals("00000000-0000-0000-0000-000000000000", json.get("id").asText());
    assertEquals("00000000-0000-0000-0000-000000000000", json.get("fields").get("guid").asText());
    scanner.close();
}