private void indexJSONTableDocuments(TransportClient client, String indexName, String typeName, String tablePath, String... fields) { loginTestUser(TEST_USER_NAME, TEST_USER_GROUP); // Create an OJAI connection to MapR cluster Connection connection = DriverManager.getConnection(CONNECTION_URL); // Get an instance of OJAI DocumentStore final DocumentStore store = connection.getStore(tablePath); DocumentStream documentStream = store.find(fields); for (Document document : documentStream) { IndexResponse response = client.prepareIndex(indexName, typeName, document.getId().getString()) .setSource(document.asJsonString(), XContentType.JSON) .get(); log.info("Elasticsearch Index Response: '{}'", response); } // Close this instance of OJAI DocumentStore store.close(); // Close the OJAI connection and release any resources held by the connection connection.close(); }
/**
 * Batch delete: removes documents "1", "2" and "3" of type {@code product} from
 * {@code product_index} in one bulk request and logs the version reported for each item.
 *
 * @param transportClient client used to build and execute the bulk request
 * @throws IOException declared for callers; nothing in this method throws it directly
 */
private static void batchDelete(TransportClient transportClient) throws IOException {
    final String[] documentIds = {"1", "2", "3"};

    BulkRequestBuilder bulk = transportClient.prepareBulk();
    for (String documentId : documentIds) {
        bulk.add(transportClient.prepareDelete("product_index", "product", documentId));
    }

    BulkItemResponse[] items = bulk.get().getItems();
    for (int i = 0; i < items.length; i++) {
        logger.info("--------------------------------version= " + items[i].getVersion());
    }
}
private TransportClient getElasticClient() { try { // un-command this, if you have multiple node // TransportClient client1 = new PreBuiltTransportClient(Settings.EMPTY) // .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300)) // .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300)); Settings setting = Settings.builder() .put("cluster.name", elasticPro.getProperty("cluster")) .put("client.transport.sniff", Boolean.valueOf(elasticPro.getProperty("transport.sniff"))).build(); client = new PreBuiltTransportClient(setting) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticPro.getProperty("host")), Integer.valueOf(elasticPro.getProperty("port")))); } catch (UnknownHostException ex) { log.error("Exception occurred while getting Client : " + ex, ex); } return client; }
/**
 * Creates a transport client for the cluster named by {@code elastic.cluster-name} and
 * registers every {@code host:port} entry listed under {@code elastic.servers}.
 *
 * <p>An entry that cannot be parsed or resolved is logged and skipped; the remaining
 * entries are still added.
 *
 * @return the client, possibly with fewer addresses than configured
 */
private TransportClient createClient() {
    final Config typeCfg = getTypeCfg();
    final String clusterName = typeCfg.getString("elastic.cluster-name");
    final TransportClient transport =
            new PreBuiltTransportClient(Settings.builder().put("cluster.name", clusterName).build());

    final List<String> serverAddresses = typeCfg.getStringList("elastic.servers");
    logger.debug(marker, "Elastic Servers: {}", serverAddresses);

    for (String serverAddress : serverAddresses) {
        try {
            final String[] hostAndPort = serverAddress.split(":");
            final String hostName = hostAndPort[0];
            final int portNumber = Integer.parseInt(hostAndPort[1]);
            final InetAddress resolved = InetAddress.getByName(hostName);
            transport.addTransportAddress(new InetSocketTransportAddress(resolved, portNumber));
        } catch (Exception e) {
            logger.error(marker, "Transport client creation failed for '{}'", serverAddress, e);
        }
    }
    return transport;
}
/**
 * One-time fixture: points an in-memory configuration at the Elasticsearch test
 * container, overrides the module's {@code Configuration} binding with it, and pulls
 * the DAO and transport client out of the resulting injector.
 */
@BeforeClass
public static void setUpClass() throws Exception {
    faker = new Faker();

    // In-memory replacement for the real configuration source
    final MapConfiguration memoryParams = new MapConfiguration(new HashMap<>());
    memoryParams.setProperty(CONFIG_ES_CLUSTER_HOST, esContainer.getContainerIpAddress());
    memoryParams.setProperty(CONFIG_ES_CLUSTER_PORT, esContainer.getMappedPort(9300));
    memoryParams.setProperty(CONFIG_ES_CLUSTER_NAME, "elasticsearch");

    injector = Guice.createInjector(
            Modules.override(new ElasticSearchModule())
                    .with(binder -> binder.bind(Configuration.class).toProvider(() -> memoryParams)));

    productDao = injector.getInstance(ProductDao.class);
    esClient = injector.getInstance(TransportClient.class);
}
public static TransportClient getClient() { try { if (tclient == null) { String EsHosts = "192.168.1.41:9300,192.168.1.42:9300,192.168.1.43:9300"; Settings settings = Settings.settingsBuilder() .put("cluster.name", "dkes")//设置集群名称 .put("tclient.transport.sniff", true).build();//自动嗅探整个集群的状态,把集群中其它机器的ip地址加到客户端中 tclient = TransportClient.builder().settings(settings).build(); String[] nodes = EsHosts.split(","); for (String node : nodes) { if (node.length() > 0) {//跳过为空的node(当开头、结尾有逗号或多个连续逗号时会出现空node) String[] hostPort = node.split(":"); tclient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); } } }//if } catch (Exception e) { e.printStackTrace(); } return tclient; }
/**
 * Multi-field query: runs a bool query with two {@code should} clauses (a term query on
 * {@code id} and a prefix query on {@code content}) against the indices
 * {@code service2} and {@code clients}, then prints the total hit count.
 *
 * <p>The client created here is closed before the method returns, including when the
 * search fails. Exceptions are printed, not propagated.
 */
public static void multisearch() {
    TransportClient transportClient = null;
    try {
        Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch1").build();
        transportClient = TransportClient.builder().settings(settings).build();
        transportClient.addTransportAddress(
                new InetSocketTransportAddress(InetAddress.getByName("172.16.2.93"), 9300));

        SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch("service2", "clients");
        SearchResponse searchResponse = searchRequestBuilder
                .setQuery(QueryBuilders.boolQuery()
                        .should(QueryBuilders.termQuery("id", "5"))
                        .should(QueryBuilders.prefixQuery("content", "oracle")))
                .setFrom(0).setSize(100).setExplain(true)
                .execute().actionGet();

        SearchHits searchHits = searchResponse.getHits();
        System.out.println();
        System.out.println("Total Hits is " + searchHits.totalHits());
        System.out.println();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // The client owns threads and connections; release them on every path.
        if (transportClient != null) {
            transportClient.close();
        }
    }
}
/**
 * Fetches several documents by id (ids "1" to "5" of type {@code product} in
 * {@code product_index}) with one multi-get request and logs the source of every
 * document that exists.
 *
 * <p>Items whose lookup failed are skipped instead of causing a
 * {@code NullPointerException}, and each found document is logged (previously only the
 * last one survived the loop).
 *
 * @param transportClient client used to execute the multi-get
 * @throws IOException declared for callers; nothing in this method throws it directly
 */
private static void queryByMultiGet(TransportClient transportClient) throws IOException {
    MultiGetResponse multiGetItemResponses = transportClient.prepareMultiGet()
            .add("product_index", "product", "1")
            .add("product_index", "product", "2")
            .add("product_index", "product", "3")
            .add("product_index", "product", "4")
            .add("product_index", "product", "5")
            .get();

    for (MultiGetItemResponse multiGetItemResponse : multiGetItemResponses) {
        GetResponse getResponse = multiGetItemResponse.getResponse();
        // A failed item carries no response (getResponse() returns null per the
        // Elasticsearch Java API), so guard before dereferencing.
        if (getResponse != null && getResponse.isExists()) {
            logger.info("--------------------------------:" + getResponse.getSourceAsString());
        }
    }
}
/** * Scroll 获取多个对象 * * @param transportClient * @throws IOException */ private static void queryByScroll(TransportClient transportClient) throws IOException { //setSize 是设置每批查询多少条数据 SearchResponse searchResponse = transportClient.prepareSearch("product_index").setTypes("product") .setQuery(QueryBuilders.termQuery("product_name", "飞利浦")) .setScroll(new TimeValue(60000)) .setSize(3) .get(); int count = 0; do { for (SearchHit searchHit : searchResponse.getHits().getHits()) { //打印查询结果,或者做其他处理 logger.info("count=" + ++count); logger.info(searchHit.getSourceAsString()); } searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId()).setScroll(new TimeValue(60000)) .execute() .actionGet(); } while (searchResponse.getHits().getHits().length != 0); }
/**
 * Loads the bulk-processor tuning values from {@code context} into this builder's
 * fields, then delegates to {@code build(client)}.
 *
 * <p>Values read: bulk actions, processor name, bulk size (number plus unit),
 * concurrent requests, flush interval, and the backoff policy's interval and retries.
 * Defaults are applied where the corresponding {@code DEFAULT_*} constant is passed.
 *
 * @param context configuration source
 * @param client  client handed to {@code build}
 * @return the result of {@code build(client)}
 */
public BulkProcessor buildBulkProcessor(Context context, TransportClient client) {
    // Batch trigger thresholds and identity
    this.bulkActions = context.getInteger(ES_BULK_ACTIONS, DEFAULT_ES_BULK_ACTIONS);
    this.bulkProcessorName = context.getString(ES_BULK_PROCESSOR_NAME, DEFAULT_ES_BULK_PROCESSOR_NAME);
    this.bulkSize = Util.getByteSizeValue(
            context.getInteger(ES_BULK_SIZE),
            context.getString(ES_BULK_SIZE_UNIT));

    // Concurrency and time-based flushing
    this.concurrentRequest = context.getInteger(ES_CONCURRENT_REQUEST, DEFAULT_ES_CONCURRENT_REQUEST);
    this.flushIntervalTime = Util.getTimeValue(
            context.getString(ES_FLUSH_INTERVAL_TIME),
            DEFAULT_ES_FLUSH_INTERVAL_TIME);

    // Retry behaviour
    this.backoffPolicyTimeInterval = context.getString(
            ES_BACKOFF_POLICY_TIME_INTERVAL,
            DEFAULT_ES_BACKOFF_POLICY_START_DELAY);
    this.backoffPolicyRetries = context.getInteger(
            ES_BACKOFF_POLICY_RETRIES,
            DEFAULT_ES_BACKOFF_POLICY_RETRIES);

    return build(client);
}
/**
 * Configures this component from {@code context}: builds the transport client, the
 * index builder, the serializer and finally the bulk processor.
 *
 * <p>If {@code getHosts(context)} yields no hosts, an error is logged and nothing is
 * built.
 *
 * @param context configuration source
 */
@Override
public void configure(Context context) {
    final String[] hosts = getHosts(context);

    // Nothing can be built without at least one host
    if (!ArrayUtils.isNotEmpty(hosts)) {
        logger.error("Could not create transport client, No host exist");
        return;
    }

    final String clusterName = context.getString(PREFIX + ES_CLUSTER_NAME, DEFAULT_ES_CLUSTER_NAME);
    final TransportClient client = new ElasticsearchClientBuilder(clusterName, hosts)
            .setTransportSniff(
                    context.getBoolean(PREFIX + ES_TRANSPORT_SNIFF, false))
            .setIgnoreClusterName(
                    context.getBoolean(PREFIX + ES_IGNORE_CLUSTER_NAME, false))
            .setTransportPingTimeout(
                    Util.getTimeValue(context.getString(PREFIX + ES_TRANSPORT_PING_TIMEOUT), DEFAULT_ES_TIME))
            .setNodeSamplerInterval(
                    Util.getTimeValue(context.getString(PREFIX + ES_TRANSPORT_NODE_SAMPLER_INTERVAL), DEFAULT_ES_TIME))
            .build();

    buildIndexBuilder(context);
    buildSerializer(context);
    bulkProcessor = new BulkProcessorBuilder().buildBulkProcessor(context, client);
}
public TransportClient getClient() { if (esClient == null) { synchronized (this) { if (esClient == null) { try { //判断配置 Preconditions.checkNotNull(clusterName, "es 服务clusterName未配置"); Preconditions.checkNotNull(addresses, "es 服务ip未配置"); //Preconditions.checkArgument(esPort > 0, "es 服务服务port未配置"); //设置集群的名字 Settings settings = Settings.settingsBuilder().put("client.node", true).put("cluster.name", clusterName).put("client.transport.sniff", sniff).build(); //Settings settings = Settings.settingsBuilder().put("client.transport.sniff", sniff).build(); //创建集群client并添加集群节点地址 esClient = TransportClient.builder().settings(settings).build(); for (String address : addresses) { String[] inetAddress = address.split(":"); esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(inetAddress[0]), new Integer(inetAddress[1]))); } }catch (Exception e){ LOGGER.error("客户端连接初始化异常",e); } } } } return esClient; }
@Test public void failureHandlerExecutesFailoverForEachBatchItemSeparately() { // given Builder builder = createTestObjectFactoryBuilder(); ClientObjectFactory<TransportClient, BulkRequest> config = builder.build(); FailoverPolicy failoverPolicy = spy(new NoopFailoverPolicy()); String payload1 = "test1"; String payload2 = "test2"; BulkRequest bulk = new BulkRequest() .add(spy(new IndexRequest().source(payload1, XContentType.CBOR))) .add(spy(new IndexRequest().source(payload2, XContentType.CBOR))); // when config.createFailureHandler(failoverPolicy).apply(bulk); // then ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class); verify(failoverPolicy, times(2)).deliver((String) captor.capture()); assertTrue(captor.getAllValues().contains(payload1)); assertTrue(captor.getAllValues().contains(payload2)); }
@Test public void failureHandlerExecutesFailoverForEachBatchItemSeparately() { // given Builder builder = createTestObjectFactoryBuilder(); ClientObjectFactory<TransportClient, BulkRequest> config = builder.build(); FailoverPolicy failoverPolicy = spy(new NoopFailoverPolicy()); String payload1 = "test1"; String payload2 = "test2"; BulkRequest bulk = new BulkRequest() .add(spy(new IndexRequest().source(payload1))) .add(spy(new IndexRequest().source(payload2))); // when config.createFailureHandler(failoverPolicy).apply(bulk); // then ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class); verify(failoverPolicy, times(2)).deliver((String) captor.capture()); assertTrue(captor.getAllValues().contains(payload1)); assertTrue(captor.getAllValues().contains(payload2)); }
/** * create a elasticsearch transport client (remote elasticsearch) * @param addresses an array of host:port addresses * @param clusterName */ public ElasticsearchClient(final String[] addresses, final String clusterName) { // create default settings and add cluster name Settings.Builder settings = Settings.builder() .put("cluster.routing.allocation.enable", "all") .put("cluster.routing.allocation.allow_rebalance", "always"); if (clusterName != null) settings.put("cluster.name", clusterName); // create a client TransportClient tc = new PreBuiltTransportClient(settings.build()); for (String address: addresses) { String a = address.trim(); int p = a.indexOf(':'); if (p >= 0) try { InetAddress i = InetAddress.getByName(a.substring(0, p)); int port = Integer.parseInt(a.substring(p + 1)); tc.addTransportAddress(new InetSocketTransportAddress(i, port)); } catch (UnknownHostException e) { Data.logger.warn("", e); } } this.elasticsearchClient = tc; }
/**
 * TransportClient provider.
 *
 * <p>Reads a comma-separated host list from {@code CONFIG_ES_CLUSTER_HOST} and a single
 * transport port from {@code CONFIG_ES_CLUSTER_PORT} (default 9300), then registers
 * every host on that port.
 *
 * <p>Host entries are trimmed and empty entries dropped. Without that, the emptiness
 * check could never fail (splitting always yields at least one element) and a value
 * such as {@code "a, b"} produced a host name with a leading space.
 *
 * @return TransportClient connected to the configured hosts
 * @throws IllegalStateException if no host is configured
 */
public TransportClient get() {
    final String hostCsv = configuration.getString(CONFIG_ES_CLUSTER_HOST);
    Preconditions.checkState(hostCsv != null, "no Elasticsearch host configured");

    final List<String> hosts = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(hostCsv);
    Preconditions.checkState(!hosts.isEmpty(), "no Elasticsearch host configured");

    // Validation is done before the client exists, so a bad configuration cannot leak it.
    final TransportClient transportClient = new PreBuiltTransportClient(esSettings());
    final Integer esTransportPort = configuration.getInteger(CONFIG_ES_CLUSTER_PORT, 9300);
    log.info("connect to elastic search {} on port {} ", hostCsv, esTransportPort);

    hosts.forEach(
            host -> transportClient.addTransportAddress(
                    new InetSocketTransportAddress(new InetSocketAddress(host, esTransportPort))
            )
    );

    return transportClient;
}
/**
 * Builds a sniffing transport client for cluster {@code elasticsearch} with the two
 * hard-coded test hosts and stores it in the {@code client} field.
 *
 * <p>Both hosts are resolved before the client is constructed, so a failed lookup no
 * longer leaves a freshly built, unreferenced client behind. The failure is logged with
 * its stack trace.
 *
 * @return the value of the {@code client} field; unchanged from before the call when a
 *         host cannot be resolved
 */
@Override
public TransportClient connect() {
    Settings settings = Settings.builder()
            .put("cluster.name", "elasticsearch")
            .put("client.transport.sniff", true)
            .build();
    try {
        InetAddress firstHost = InetAddress.getByName("event-apptst01.as.it.ubc.ca");
        InetAddress secondHost = InetAddress.getByName("event-apptst02.as.it.ubc.ca");

        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(firstHost, 9300))
                .addTransportAddress(new InetSocketTransportAddress(secondHost, 9300));
    } catch (UnknownHostException uhe) {
        logger.error(uhe.toString(), uhe);
    }
    return client;
}
/**
 * Assembles the {@link BulkProcessor} from the values currently held in this builder's
 * fields, after tracing them.
 *
 * @param client client the processor sends its bulk requests through
 * @return the configured bulk processor, with an exponential backoff policy
 */
private BulkProcessor build(TransportClient client) {
    final Object[] traceArgs = new Object[]{
        bulkProcessorName, bulkActions, bulkSize, flushIntervalTime,
        concurrentRequest, backoffPolicyTimeInterval, backoffPolicyRetries};
    logger.trace("Bulk processor name: [{}] bulkActions: [{}], bulkSize: [{}], flush interval time: [{}],"
            + " concurrent Request: [{}], backoffPolicyTimeInterval: [{}], backoffPolicyRetries: [{}] ",
            traceArgs);

    final BulkProcessor.Builder processorBuilder = BulkProcessor.builder(client, getListener());
    processorBuilder.setName(bulkProcessorName);
    processorBuilder.setBulkActions(bulkActions);
    processorBuilder.setBulkSize(bulkSize);
    processorBuilder.setFlushInterval(flushIntervalTime);
    processorBuilder.setConcurrentRequests(concurrentRequest);

    // Retry with exponentially growing delays, starting from the configured interval
    final BackoffPolicy retryPolicy = BackoffPolicy.exponentialBackoff(
            Util.getTimeValue(backoffPolicyTimeInterval, DEFAULT_ES_BACKOFF_POLICY_START_DELAY),
            backoffPolicyRetries);
    processorBuilder.setBackoffPolicy(retryPolicy);

    return processorBuilder.build();
}
@Test public void configReturnsACopyOfServerUrisList() { // given Builder builder = createTestObjectFactoryBuilder(); builder.withServerUris("http://localhost:9200;http://localhost:9201;http://localhost:9202"); ClientObjectFactory<TransportClient, BulkRequest> config = builder.build(); // when Collection<String> serverUrisList = config.getServerList(); serverUrisList.add("test"); // then assertNotEquals(serverUrisList.size(), config.getServerList().size()); }
public static void main(String[] args) throws IOException { Log4jESLoggerFactory.getRootLogger().setLevel("ERROR"); Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", "ciphergateway").build(); //Use one and only one Client in your JVM. It's threadsafe. Client client = new TransportClient(settings) .addTransportAddress(new InetSocketTransportAddress("localhost", 9300)); Long start = System.currentTimeMillis(); // exportES(client); // deleteES(client); // importES(client); // searchES(client); exportAndDeleteES(client); client.close(); Long end = System.currentTimeMillis(); System.out.println("用时: " + (end - start) + " ms"); }
/**
 * Opens a transport client to the hosts named in a JDBC-style URL.
 *
 * <p>The third {@code '/'}-separated segment of {@code jdbcUrl} is read as a
 * comma-separated list of {@code host:port} pairs. The client ignores the cluster name.
 *
 * <p>If a host cannot be resolved, the partially configured client is closed, the error
 * is printed and the {@code client} field is left unassigned, as before.
 *
 * @param jdbcUrl URL whose third segment lists the {@code host:port} pairs
 */
public ElasticSearchConnection(String jdbcUrl) {
    Settings settings = Settings.builder().put("client.transport.ignore_cluster_name", true).build();

    TransportClient transportClient = new PreBuiltTransportClient(settings);
    try {
        String hostAndPortArrayStr = jdbcUrl.split("/")[2];
        String[] hostAndPortArray = hostAndPortArrayStr.split(",");
        for (String hostAndPort : hostAndPortArray) {
            // Split once and reuse both halves
            String[] hostPortPair = hostAndPort.split(":");
            String host = hostPortPair[0];
            String port = hostPortPair[1];
            transportClient.addTransportAddress(
                    new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port)));
        }
        client = transportClient;
    } catch (UnknownHostException e) {
        // The client is not going to be used; release its resources.
        transportClient.close();
        e.printStackTrace();
    }
}
/** * Builds the {@link MaprElasticSearchService} according to the specified properties. * * @return instence of {@link MaprElasticSearchService}, which can be started via {@link MaprElasticSearchService#start()}. * @throws IllegalStateException in case when some of the required properties are missed. */ public MaprElasticSearchService build() { ensureFieldNonNull("port", this.port); ensureFieldNonNull("hostname", this.inetAddress); ensureFieldNonNull("indexName", this.indexName); ensureFieldNonNull("typeName", this.typeName); ensureFieldNonNull("changelog", this.changelog); ensureFieldNonNull("fields", this.fields); return () -> { // Create ES Client TransportClient client = new PreBuiltTransportClient(Settings.EMPTY) .addTransportAddress(new InetSocketTransportAddress(inetAddress, port)); // Create CDC Listener ChangelogListener listener = ChangelogListenerImpl.forChangelog(changelog); // Set 'onInsert' callback listener.onInsert(new SaveIndexCDCCallback(client)); // Set 'onUpdate' callback listener.onUpdate(new SaveIndexCDCCallback(client)); // Define and set 'onDelete' callback listener.onDelete((id) -> client.prepareDelete(indexName, typeName, id).get()); listener.listen(); }; }
/**
 * Creates a transport client for the locations listed in {@code config}.
 *
 * <p>Settings come from {@code tryReadSettingsFromFile(config)}; when that returns
 * {@code null} they are built from the configured cluster name and extra settings.
 * Each location is either {@code host:port} or {@code host}, in which case
 * {@code config.getPort()} is used.
 *
 * <p>If any location is invalid or cannot be resolved, the client created so far is
 * closed before the exception propagates, so a failed call does not leak it.
 *
 * @param config search-index configuration
 * @return a client with every configured location registered
 * @throws MemgraphException if a location is malformed or its host cannot be resolved
 * @throws NumberFormatException if a location's port is not a valid integer
 */
private static TransportClient createTransportClient(ElasticsearchSearchIndexConfiguration config) {
    Settings settings = tryReadSettingsFromFile(config);
    if (settings == null) {
        Settings.Builder settingsBuilder = Settings.builder();
        if (config.getClusterName() != null) {
            settingsBuilder.put("cluster.name", config.getClusterName());
        }
        for (Map.Entry<String, String> esSetting : config.getEsSettings().entrySet()) {
            settingsBuilder.put(esSetting.getKey(), esSetting.getValue());
        }
        settings = settingsBuilder.build();
    }

    TransportClient transportClient = new PreBuiltTransportClient(settings);
    try {
        for (String esLocation : config.getEsLocations()) {
            String[] locationSocket = esLocation.split(":");
            String hostname;
            int port;
            if (locationSocket.length == 2) {
                hostname = locationSocket[0];
                port = Integer.parseInt(locationSocket[1]);
            } else if (locationSocket.length == 1) {
                hostname = locationSocket[0];
                port = config.getPort();
            } else {
                throw new MemgraphException("Invalid elastic search location: " + esLocation);
            }

            InetAddress host;
            try {
                host = InetAddress.getByName(hostname);
            } catch (UnknownHostException ex) {
                throw new MemgraphException("Could not resolve host: " + hostname, ex);
            }
            transportClient.addTransportAddress(new InetSocketTransportAddress(host, port));
        }
    } catch (RuntimeException ex) {
        // The caller never receives this client, so nobody else could close it.
        transportClient.close();
        throw ex;
    }
    return transportClient;
}
/**
 * Creates the Elasticsearch transport client bean from the {@code spring.es.ip},
 * {@code spring.es.port} and {@code spring.es.cluster_name} properties, with sniffing
 * enabled.
 *
 * <p>The address is resolved and the port parsed before the client is constructed, so
 * invalid configuration fails without leaving an unclosed client behind.
 *
 * @return the configured client
 * @throws NumberFormatException if the port property is not a valid integer
 * @throws UnknownHostException  if the configured host cannot be resolved
 */
@Bean
public TransportClient initESClient() throws NumberFormatException, UnknownHostException {
    String ip = env.getProperty("spring.es.ip");
    String port = env.getProperty("spring.es.port");
    String clusterName = env.getProperty("spring.es.cluster_name");

    // Everything that can throw happens before the client exists.
    InetSocketTransportAddress address =
            new InetSocketTransportAddress(InetAddress.getByName(ip), Integer.parseInt(port));

    Settings esSettings = Settings.builder()
            .put("cluster.name", clusterName)
            .put("client.transport.sniff", true)
            .build();

    TransportClient client = new PreBuiltTransportClient(esSettings);
    client.addTransportAddress(address);

    logger.info("ES Client 初始化成功, ip : {}, port : {}, cluster_name : {}", ip, port, clusterName);

    return client;
}
/**
 * Declares this module's bindings: {@code Configuration} and {@code TransportClient}
 * come from their provider classes in singleton scope, and the protobuf JSON printer
 * and parser are bound to instances created here.
 */
@Override
protected void configure() {
    // Provider-backed singletons
    bind(Configuration.class)
            .toProvider(ConfigurationProvider.class)
            .in(Singleton.class);
    bind(TransportClient.class)
            .toProvider(TransportClientProvider.class)
            .in(Singleton.class);

    // Protobuf JSON helpers, bound to concrete instances
    final JsonFormat.Printer jsonPrinter = JsonFormat.printer();
    bind(JsonFormat.Printer.class).toInstance(jsonPrinter);
    final JsonFormat.Parser jsonParser = JsonFormat.parser();
    bind(JsonFormat.Parser.class).toInstance(jsonParser);
}
/**
 * Connects a Netty4-based transport client to {@code 127.0.0.1:randomPort} and asserts
 * that the cluster health it reports is green.
 */
public void testThatTransportClientCanConnect() throws Exception {
    final Settings.Builder clientSettings = Settings.builder();
    clientSettings.put("cluster.name", internalCluster().getClusterName());
    clientSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME);
    clientSettings.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString());
    final Settings settings = clientSettings.build();

    try (TransportClient transportClient = new MockTransportClient(settings, Netty4Plugin.class)) {
        final TransportAddress loopback = new TransportAddress(InetAddress.getByName("127.0.0.1"), randomPort);
        transportClient.addTransportAddress(loopback);

        final ClusterHealthResponse health = transportClient.admin().cluster().prepareHealth().get();
        assertThat(health.getStatus(), is(ClusterHealthStatus.GREEN));
    }
}
/**
 * Asserts that a pre-built transport client created with empty settings reports the
 * Netty4 transport name as both its default HTTP type and its default transport type.
 */
@Test
public void testPluginInstalled() {
    try (TransportClient prebuilt = new PreBuiltTransportClient(Settings.EMPTY)) {
        final Settings effective = prebuilt.settings();

        final String defaultHttpType = NetworkModule.HTTP_DEFAULT_TYPE_SETTING.get(effective);
        assertEquals(Netty4Plugin.NETTY_TRANSPORT_NAME, defaultHttpType);

        final String defaultTransportType = NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING.get(effective);
        assertEquals(Netty4Plugin.NETTY_TRANSPORT_NAME, defaultTransportType);
    }
}
/**
 * Starts a transport client against the given addresses and verifies connectivity with
 * a cluster-health request.
 *
 * <p>The transport implementation is chosen at random between the mock TCP transport
 * and the default. If the health request fails, the client is closed and the test is
 * skipped through a failed assumption; previously the client was left open in that case.
 *
 * @param tempDir            directory used as the client's home path
 * @param transportAddresses addresses of the cluster nodes
 * @return a connected client
 */
private static Client startClient(Path tempDir, TransportAddress... transportAddresses) {
    Settings.Builder builder = Settings.builder()
            .put("node.name", "qa_smoke_client_" + counter.getAndIncrement())
            .put("client.transport.ignore_cluster_name", true)
            .put(Environment.PATH_HOME_SETTING.getKey(), tempDir);

    final Collection<Class<? extends Plugin>> plugins;
    if (random().nextBoolean()) {
        builder.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME);
        plugins = Collections.singleton(MockTcpTransportPlugin.class);
    } else {
        plugins = Collections.emptyList();
    }

    TransportClient client = new PreBuiltTransportClient(builder.build(), plugins)
            .addTransportAddresses(transportAddresses);
    logger.info("--> Elasticsearch Java TransportClient started");

    Exception clientException = null;
    try {
        ClusterHealthResponse health = client.admin().cluster().prepareHealth().get();
        logger.info("--> connected to [{}] cluster which is running [{}] node(s).",
                health.getClusterName(), health.getNumberOfNodes());
    } catch (Exception e) {
        clientException = e;
        // The assumption below aborts the test, so nobody else would close this client.
        client.close();
    }

    assumeNoException("Sounds like your cluster is not running at " + clusterAddresses, clientException);

    return client;
}
public Wrapper(Logger logger, Settings settings, ThreadPool threadPool) { this.logger = logger; this.threadPool = threadPool; // Should the action listener be threaded or not by default. Action listeners are automatically threaded for // the transport client in order to make sure client side code is not executed on IO threads. this.threadedListener = TransportClient.CLIENT_TYPE.equals(Client.CLIENT_TYPE_SETTING_S.get(settings)); }
public void testThreadedListeners() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<Throwable> failure = new AtomicReference<>(); final AtomicReference<String> threadName = new AtomicReference<>(); Client client = client(); IndexRequest request = new IndexRequest("test", "type", "1"); if (randomBoolean()) { // set the source, without it, we will have a verification failure request.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); } client.index(request, new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { threadName.set(Thread.currentThread().getName()); latch.countDown(); } @Override public void onFailure(Exception e) { threadName.set(Thread.currentThread().getName()); failure.set(e); latch.countDown(); } }); latch.await(); boolean shouldBeThreaded = TransportClient.CLIENT_TYPE.equals(Client.CLIENT_TYPE_SETTING_S.get(client.settings())); if (shouldBeThreaded) { assertTrue(threadName.get().contains("listener")); } else { assertFalse(threadName.get().contains("listener")); } }
/**
 * Creates the transport client bean for cluster {@code slacklistener}, connecting to
 * the host named by the {@code elasticsearch.host} system property on port 9300.
 *
 * <p>The host is resolved before the client is constructed, so a failed lookup does not
 * leave an unclosed client behind.
 *
 * @return the configured client
 * @throws UnknownHostException if the configured host cannot be resolved
 */
@Bean
TransportClient transportClient() throws UnknownHostException {
    InetAddress host = InetAddress.getByName(System.getProperty("elasticsearch.host"));

    Settings settings = Settings.builder()
            .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "slacklistener")
            .build();

    return new PreBuiltTransportClient(settings)
            .addTransportAddress(new InetSocketTransportAddress(host, 9300));
}
/**
 * Opens a client to the Elasticsearch cluster and makes it the current one.
 *
 * <p>A new transport client is built for {@code clusterName} with every address in
 * {@code serverAddresses}; the previously held client, if any, is closed before the
 * new one is stored in {@code client}.
 *
 * @param clusterName name of the cluster to connect to
 */
private void openClient(String clusterName) {
    logger.info("Using ElasticSearch hostnames: {} ", Arrays.toString(serverAddresses));

    final TransportClient replacement = TransportClient.builder()
            .settings(Settings.builder().put("cluster.name", clusterName).build())
            .build();

    for (int i = 0; i < serverAddresses.length; i++) {
        replacement.addTransportAddress(serverAddresses[i]);
    }

    // Swap: retire the old client only once the new one is ready
    if (client != null) {
        client.close();
    }
    client = replacement;
}
/**
 * Creates a store backed by a transport client connected to {@code host:port}.
 *
 * <p>The cluster name comes from the {@code es_cluster_name} property (default
 * {@code soundwave}); sniffing is disabled.
 *
 * @param host Elasticsearch host
 * @param port Elasticsearch transport port
 */
public EsStore(String host, int port) {
    final String clusterName = Configuration.getProperties().getString("es_cluster_name", "soundwave");

    final Settings settings = ImmutableSettings.settingsBuilder()
            .put("cluster.name", clusterName)
            .put("client.transport.sniff", false)
            .build();

    final InetSocketTransportAddress nodeAddress = new InetSocketTransportAddress(host, port);
    esClient = new TransportClient(settings).addTransportAddress(nodeAddress);
}
/** * wildcard查询/or条件/and条件 */ public static void wildcardQuery() { try { Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch1").build(); TransportClient transportClient = TransportClient.builder(). settings(settings).build().addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName("172.16.2.94"), 9300)); SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch("sqd.es_start"); // {"query": {"bool": {"must": [{"or": [{"wildcard": {"content": "*oracle*"}},{"wildcard": {"content": "*mysql*"}}]}],"must_not": [],"should": []}},"from": 0, "size": 10, "sort": [],"aggs": {}} SearchResponse searchResponse = searchRequestBuilder. setQuery(QueryBuilders.boolQuery() .must(QueryBuilders.orQuery(QueryBuilders.wildcardQuery("content","*mysql*"), QueryBuilders.wildcardQuery("content","*oracle*"))) .must(QueryBuilders.termQuery("tbool","false"))) .setFrom(0).setSize(100).setExplain(true).execute().actionGet(); SearchHits searchHits = searchResponse.getHits(); System.out.println(); System.out.println("Total Hits is " + searchHits.totalHits()); System.out.println(); for (int i = 0; i < searchHits.getHits().length; ++i) { System.out.println("content is " + searchHits.getHits()[i].getSource().get("content")); } } catch (Exception e) { e.printStackTrace(); } }
/**
 * JSON query: sends a raw JSON search body (a bool query with one prefix clause on
 * {@code content}) to index {@code service2}, then prints the hit count and each hit's
 * {@code content}.
 *
 * <p>The client created here is closed before the method returns, including when the
 * search fails. Exceptions are printed, not propagated.
 */
public static void jsonquery() {
    TransportClient transportClient = null;
    try {
        Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch1").build();
        transportClient = TransportClient.builder().settings(settings).build();
        transportClient.addTransportAddress(
                new InetSocketTransportAddress(InetAddress.getByName("172.16.2.93"), 9300));

        SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch("service2");
        SearchResponse searchResponse = searchRequestBuilder.setSource("{\n" +
                "\"query\": {\n" +
                "\"bool\": {\n" +
                "\"must\": [\n" +
                "{\n" +
                "\"prefix\": {\n" +
                "\"content\": \"oracle\"\n" +
                "}\n" +
                "}\n" +
                "],\n" +
                "\"must_not\": [ ],\n" +
                "\"should\": [ ]\n" +
                "}\n" +
                "},\n" +
                "\"from\": 0,\n" +
                "\"size\": 10,\n" +
                "\"sort\": [ ],\n" +
                "\"aggs\": { }\n" +
                "}")
                .get();

        SearchHits searchHits = searchResponse.getHits();
        System.out.println();
        System.out.println("Total Hits is " + searchHits.totalHits());
        System.out.println();
        for (int i = 0; i < searchHits.getHits().length; ++i) {
            System.out.println("content is " + searchHits.getHits()[i].getSource().get("content"));
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // The client owns threads and connections; release them on every path.
        if (transportClient != null) {
            transportClient.close();
        }
    }
}
/**
 * Creates an index from JSON: creates index {@code t2} with 3 shards and 2 replicas,
 * then puts a mapping for type {@code general} that declares {@code message} as a
 * not-analyzed string and {@code source} as a string.
 *
 * <p>The client created here is closed before the method returns, including when a
 * request fails. Exceptions are printed, not propagated.
 */
public static void createByJson() {
    TransportClient client = null;
    try {
        Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch1").build();
        client = TransportClient.builder().settings(settings).build();
        client.addTransportAddress(
                new InetSocketTransportAddress(InetAddress.getByName("172.16.2.93"), 9300));

        XContentBuilder mapping = jsonBuilder()
                .startObject()
                    .startObject("general")
                        .startObject("properties")
                            .startObject("message")
                                .field("type", "string")
                                .field("index", "not_analyzed")
                            .endObject()
                            .startObject("source")
                                .field("type", "string")
                            .endObject()
                        .endObject()
                    .endObject()
                .endObject();

        client.admin().indices().prepareCreate("t2")
                .setSettings(Settings.builder()
                        .put("index.number_of_shards", 3)
                        .put("index.number_of_replicas", 2)
                )
                .get();

        client.admin().indices().preparePutMapping("t2")
                .setType("general")
                .setSource(mapping)
                .execute().actionGet();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // The client owns threads and connections; release them on every path.
        if (client != null) {
            client.close();
        }
    }
}
@SuppressWarnings({"unchecked", "resource"}) public static void main(String[] args) throws IOException { // 先构建client,两个参数分别是:cluster.name 固定参数代表后面参数的含义,集群名称 // client.transport.sniff 表示设置自动探查集群的集群节点 Settings settings = Settings.builder() .put("cluster.name", "youmeek-cluster") .put("client.transport.sniff", true) .build(); //单个节点的写法 TransportClient transportClient = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.127"), 9300)); //====================================================== create(transportClient); batchCreate(transportClient); batchUpdate(transportClient); batchDelete(transportClient); update(transportClient); query(transportClient); queryByMatchOneParam(transportClient); queryByMatchMoreParam(transportClient); queryByTerm(transportClient); queryByPrefix(transportClient); queryByBool(transportClient); queryMore(transportClient); queryByMultiGet(transportClient); queryByScroll(transportClient); queryByTemplate(transportClient); delete(transportClient); aggregate(transportClient); //====================================================== transportClient.close(); }
/**
 * Batch update: sets a new {@code product_name} on documents "1", "2" and "3" of type
 * {@code product} in {@code product_index} with one bulk request, then logs the version
 * reported for each item.
 *
 * @param transportClient client used to build and execute the bulk request
 * @throws IOException if building an update document fails
 */
private static void batchUpdate(TransportClient transportClient) throws IOException {
    // {document id, new product name}
    final String[][] updates = {
        {"1", "更新后的商品名称1"},
        {"2", "更新后的商品名称2"},
        {"3", "更新后的商品名称3"},
    };

    BulkRequestBuilder bulk = transportClient.prepareBulk();
    for (String[] update : updates) {
        UpdateRequestBuilder updateRequest = transportClient.prepareUpdate("product_index", "product", update[0])
                .setDoc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("product_name", update[1])
                        .endObject());
        bulk.add(updateRequest);
    }

    BulkItemResponse[] items = bulk.get().getItems();
    for (int i = 0; i < items.length; i++) {
        logger.info("--------------------------------version= " + items[i].getVersion());
    }
}
/**
 * Match query on a single field: searches {@code product} documents in
 * {@code product_index} whose {@code product_name} matches the query text and logs the
 * source of every returned hit.
 *
 * @param transportClient client used to execute the search
 * @throws IOException declared for callers; nothing in this method throws it directly
 */
private static void queryByMatchOneParam(TransportClient transportClient) throws IOException {
    final SearchHit[] hits = transportClient.prepareSearch("product_index")
            .setTypes("product")
            .setQuery(QueryBuilders.matchQuery("product_name", "飞利浦"))
            .get()
            .getHits()
            .getHits();

    for (int i = 0; i < hits.length; i++) {
        logger.info("--------------------------------:" + hits[i].getSourceAsString());
    }
}