private static UpdateRequest createUpdateRequest(Object document, Exchange exchange) { UpdateRequest updateRequest = new UpdateRequest(); if (document instanceof byte[]) { updateRequest.doc((byte[]) document); } else if (document instanceof Map) { updateRequest.doc((Map<String, Object>) document); } else if (document instanceof String) { updateRequest.doc((String) document); } else if (document instanceof XContentBuilder) { updateRequest.doc((XContentBuilder) document); } else { return null; } return updateRequest .consistencyLevel(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class)) .parent(exchange.getIn().getHeader( ElasticsearchConstants.PARENT, String.class)) .index(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .type(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)) .id(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_ID, String.class)); }
private static IndexRequest createIndexRequest(Object document, Exchange exchange) { IndexRequest indexRequest = new IndexRequest(); if (document instanceof byte[]) { indexRequest.source((byte[]) document); } else if (document instanceof Map) { indexRequest.source((Map<String, Object>) document); } else if (document instanceof String) { indexRequest.source((String) document); } else if (document instanceof XContentBuilder) { indexRequest.source((XContentBuilder) document); } else { return null; } return indexRequest .consistencyLevel(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class)) .parent(exchange.getIn().getHeader( ElasticsearchConstants.PARENT, String.class)) .index(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .type(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)); }
@Converter public static SearchRequest toSearchRequest(Object queryObject, Exchange exchange) { SearchRequest searchRequest = new SearchRequest(exchange.getIn() .getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .types(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)); // Setup the query object into the search request if (queryObject instanceof byte[]) { searchRequest.source((byte[]) queryObject); } else if (queryObject instanceof Map) { searchRequest.source((Map<String, Object>) queryObject); } else if (queryObject instanceof String) { searchRequest.source((String) queryObject); } else if (queryObject instanceof XContentBuilder) { searchRequest.source((XContentBuilder) queryObject); } else { // Cannot convert the queryObject into SearchRequest return null; } return searchRequest; }
@Converter public static GetRequest toGetRequest(String id, Exchange exchange) { return new GetRequest(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .type(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)).id(id); }
@Converter public static DeleteRequest toDeleteRequest(String id, Exchange exchange) { return new DeleteRequest() .index(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .type(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)).id(id); }
@Test public void testIndexContentUsingHeaders() throws Exception { CamelContext camelctx = new DefaultCamelContext(); camelctx.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start").to("elasticsearch://local"); } }); camelctx.getComponent("elasticsearch", ElasticsearchComponent.class).setClient(client); camelctx.start(); try { Map<String, String> indexedData = new HashMap<>(); indexedData.put("content", "test"); Map<String, Object> headers = new HashMap<>(); headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); ProducerTemplate template = camelctx.createProducerTemplate(); String indexId = template.requestBodyAndHeaders("direct:start", indexedData, headers, String.class); Assert.assertNotNull("Index id should not be null", indexId); } finally { camelctx.stop(); } }
@Converter public static IndexRequest toIndexRequest(Object document, Exchange exchange) { return createIndexRequest(document, exchange) .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class)); }
@Converter public static UpdateRequest toUpdateRequest(Object document, Exchange exchange) { return createUpdateRequest(document, exchange) .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class)); }
@Converter public static ExistsRequest toExistsRequest(String id, Exchange exchange) { return new ExistsRequest(exchange.getIn().getHeader( ElasticsearchConstants.PARAM_INDEX_NAME, String.class)); }