protected CLIENT_POOL findByContext() { Context currentContext = Vertx.currentContext(); if (currentContext != null && currentContext.owner() == vertx && currentContext.isEventLoopContext()) { // standard reactive mode CLIENT_POOL clientPool = currentContext.get(id); if (clientPool != null) { return clientPool; } // this will make "client.thread-count" bigger than which in microservice.yaml // maybe it's better to remove "client.thread-count", just use "rest/highway.thread-count" return createClientPool(); } // not in correct context: // 1.normal thread // 2.vertx worker thread // 3.other vertx thread // select a existing context return nextPool(); }
@Test public void start(@Mocked Context context) throws Exception { AtomicInteger count = new AtomicInteger(); ClientPoolManager<HttpClientWithContext> clientMgr = new MockUp<ClientPoolManager<HttpClientWithContext>>() { @Mock HttpClientWithContext createClientPool() { count.incrementAndGet(); return null; } }.getMockInstance(); clientVerticle.init(null, context); JsonObject config = new SimpleJsonObject(); config.put(ClientVerticle.CLIENT_MGR, clientMgr); new Expectations() { { context.config(); result = config; } }; clientVerticle.start(); Assert.assertEquals(1, count.get()); }
@Test public void send_inDisconnectedStatus(@Mocked AbstractTcpClientPackage tcpClientPackage, @Mocked TcpOutputStream tcpOutputStream) { long msgId = 1; new Expectations(tcpClientConnection) { { tcpClientPackage.getMsgId(); result = msgId; } }; new MockUp<Context>(context) { @Mock void runOnContext(Handler<Void> action) { action.handle(null); } }; tcpClientConnection.send(tcpClientPackage, ar -> { }); Assert.assertSame(tcpClientPackage, packageQueue.poll()); Assert.assertNull(packageQueue.poll()); Assert.assertEquals(Status.CONNECTING, Deencapsulation.getField(tcpClientConnection, "status")); }
@Test public void send_disconnectedToTryLogin(@Mocked AbstractTcpClientPackage tcpClientPackage, @Mocked TcpOutputStream tcpOutputStream) { long msgId = 1; new Expectations(tcpClientConnection) { { tcpClientPackage.getMsgId(); result = msgId; } }; new MockUp<Context>(context) { @Mock void runOnContext(Handler<Void> action) { Deencapsulation.setField(tcpClientConnection, "status", Status.TRY_LOGIN); action.handle(null); } }; tcpClientConnection.send(tcpClientPackage, ar -> { }); Assert.assertSame(tcpClientPackage, packageQueue.poll()); Assert.assertNull(packageQueue.poll()); Assert.assertEquals(Status.TRY_LOGIN, Deencapsulation.getField(tcpClientConnection, "status")); }
@Test public void send_disconnectedToWorking(@Mocked AbstractTcpClientPackage tcpClientPackage, @Mocked TcpOutputStream tcpOutputStream) { long msgId = 1; new Expectations(tcpClientConnection) { { tcpClientPackage.getMsgId(); result = msgId; } }; new MockUp<Context>(context) { @Mock void runOnContext(Handler<Void> action) { Deencapsulation.setField(tcpClientConnection, "status", Status.WORKING); action.handle(null); } }; tcpClientConnection.send(tcpClientPackage, ar -> { }); Assert.assertNull(writeQueue.poll()); Assert.assertNull(packageQueue.poll()); Assert.assertEquals(Status.WORKING, Deencapsulation.getField(tcpClientConnection, "status")); }
@Test public void createClientPool(@Mocked Vertx vertx, @Mocked Context context, @Mocked HttpClient httpClient) { new Expectations(VertxImpl.class) { { VertxImpl.context(); result = context; context.owner(); result = vertx; vertx.createHttpClient(httpClientOptions); result = httpClient; } }; HttpClientWithContext pool = factory.createClientPool(); Assert.assertSame(context, pool.context()); Assert.assertSame(httpClient, pool.getHttpClient()); }
@Test public void findByContext_otherVertx(@Mocked Vertx otherVertx, @Mocked Context otherContext) { HttpClientWithContext pool = new HttpClientWithContext(null, null); pools.add(pool); new Expectations(VertxImpl.class) { { VertxImpl.context(); result = otherContext; otherContext.owner(); result = otherVertx; } }; Assert.assertSame(pool, poolMgr.findByContext()); }
@Test public void findByContext_woker(@Mocked Context workerContext) { HttpClientWithContext pool = new HttpClientWithContext(null, null); pools.add(pool); new Expectations(VertxImpl.class) { { VertxImpl.context(); result = workerContext; workerContext.owner(); result = vertx; workerContext.isEventLoopContext(); result = false; } }; Assert.assertSame(pool, poolMgr.findByContext()); }
@Test public void testRestServerVerticleWithRouter(@Mocked Transport transport, @Mocked Vertx vertx, @Mocked Context context, @Mocked JsonObject jsonObject, @Mocked Future<Void> startFuture) throws Exception { URIEndpointObject endpointObject = new URIEndpointObject("http://127.0.0.1:8080"); new Expectations() { { transport.parseAddress("http://127.0.0.1:8080"); result = endpointObject; } }; Endpoint endpiont = new Endpoint(transport, "http://127.0.0.1:8080"); new Expectations() { { context.config(); result = jsonObject; jsonObject.getValue(AbstractTransport.ENDPOINT_KEY); result = endpiont; } }; RestServerVerticle server = new RestServerVerticle(); // process stuff done by Expectations server.init(vertx, context); server.start(startFuture); }
@Test public void testRestServerVerticleWithRouterSSL(@Mocked Transport transport, @Mocked Vertx vertx, @Mocked Context context, @Mocked JsonObject jsonObject, @Mocked Future<Void> startFuture) throws Exception { URIEndpointObject endpointObject = new URIEndpointObject("http://127.0.0.1:8080?sslEnabled=true"); new Expectations() { { transport.parseAddress("http://127.0.0.1:8080?sslEnabled=true"); result = endpointObject; } }; Endpoint endpiont = new Endpoint(transport, "http://127.0.0.1:8080?sslEnabled=true"); new Expectations() { { context.config(); result = jsonObject; jsonObject.getValue(AbstractTransport.ENDPOINT_KEY); result = endpiont; } }; RestServerVerticle server = new RestServerVerticle(); // process stuff done by Expectations server.init(vertx, context); server.start(startFuture); }
@Test public void testDoMethodNullPointerException(@Mocked HttpClient httpClient) throws Exception { Context context = new MockUp<Context>() { @Mock public void runOnContext(Handler<Void> action) { action.handle(null); } }.getMockInstance(); HttpClientWithContext httpClientWithContext = new HttpClientWithContext(httpClient, context); Invocation invocation = mock(Invocation.class); AsyncResponse asyncResp = mock(AsyncResponse.class); try { this.doMethod(httpClientWithContext, invocation, asyncResp); fail("Expect to throw NullPointerException, but got none"); } catch (NullPointerException e) { } }
@Override public void init(Vertx vertx, Context context) { super.init(vertx, context); String host = System.getenv(CommonConstants.JDG_SERVICE_HOST_ENV) != null ? System.getenv(CommonConstants.JDG_SERVICE_HOST_ENV) : CommonConstants.JDG_SERVICE_HOST_DEFAULT; String port = System.getenv(CommonConstants.JDG_SERVICE_PORT_ENV) != null ? System.getenv(CommonConstants.JDG_SERVICE_PORT_ENV) : CommonConstants.JDG_SERVICE_PORT_DEFAULT; ConfigurationBuilder builder = new ConfigurationBuilder(); builder.addServers(String.format(JDG_CONNECTION_STRING_FORMAT, host, port)); builder.nearCache().mode(NearCacheMode.INVALIDATED).maxEntries(25); builder.marshaller(new ProtoStreamMarshaller()); cacheManager = new RemoteCacheManager(builder.build()); this.registerProtoBufSchema(); }
@Override protected void schedule(CommandBase<?> cmd) { Context current = Vertx.currentContext(); if (current == context) { pool.acquire(new CommandWaiter() { @Override protected void onSuccess(Connection conn) { // Work around stack over flow context.runOnContext(v -> { conn.schedule(cmd); conn.close(this); }); } @Override protected void onFailure(Throwable cause) { cmd.fail(cause); } }); } else { context.runOnContext(v -> schedule(cmd)); } }
public VertxAsyncLogoutHandler(final Vertx vertx, final Context context, final AsyncConfig<Void, CommonProfile, VertxAsyncWebContext> config, final LogoutHandlerOptions options) { DefaultAsyncLogoutLogic<Void, CommonProfile, VertxAsyncWebContext> defaultApplicationLogoutLogic = new DefaultAsyncLogoutLogic<>(config, httpActionAdapter, options.getDefaultUrl(), options.getLogoutUrlPattern(), options.isLocalLogout(), options.isDestroySession(), options.isCentralLogout()); defaultApplicationLogoutLogic.setProfileManagerFactory(c -> new VertxAsyncProfileManager(c)); this.logoutLogic = defaultApplicationLogoutLogic; this.config = config; this.asynchronousComputationAdapter = new VertxAsynchronousComputationAdapter(vertx, context); }
public VertxAsyncSecurityHandler(final Vertx vertx, final Context context, final AsyncConfig config, final Pac4jAuthProvider authProvider, final SecurityHandlerOptions options) { super(authProvider); CommonHelper.assertNotNull("vertx", vertx); CommonHelper.assertNotNull("context", context); CommonHelper.assertNotNull("config", config); CommonHelper.assertNotNull("config.getClients()", config.getClients()); CommonHelper.assertNotNull("authProvider", authProvider); CommonHelper.assertNotNull("options", options); clientNames = options.getClients(); authorizerName = options.getAuthorizers(); matcherName = options.getMatchers(); multiProfile = options.isMultiProfile(); this.vertx = vertx; this.asynchronousComputationAdapter = new VertxAsynchronousComputationAdapter(vertx, context); this.context = context; this.config = config; final DefaultAsyncSecurityLogic<Void, U , VertxAsyncWebContext> securityLogic = new DefaultAsyncSecurityLogic<Void, U, VertxAsyncWebContext>(options.isSaveProfileInSession(), options.isMultiProfile(), config, httpActionAdapter); securityLogic.setProfileManagerFactory(c -> new VertxAsyncProfileManager(c)); this.securityLogic = securityLogic; }
/** * Test for failing non-blocking synchronous computation with unchecked exception * @param testContext */ @Test(timeout = 1000, expected=IntentionalException.class) public void testFromNonBlockingSynchronousFailureUncheckedException(final TestContext testContext) { final Context context = rule.vertx().getOrCreateContext(); final Async async = testContext.async(); final int input = 1; AsynchronousComputationAdapter.fromNonBlocking(() -> IntentionalException.throwException(input)) .thenAccept(i -> { context.runOnContext(x -> { assertThat(i, is(input + 1)); async.complete(); }); }); }
/** * Test for failing non-blocking synchronous computation with checked exception * @param testContext */ @Test(timeout = 1000, expected=CheckedIntentionalException.class) public void testFromNonBlockingSynchronousFailureCheckedException(final TestContext testContext) { final Context context = rule.vertx().getOrCreateContext(); final Async async = testContext.async(); final int input = 1; AsynchronousComputationAdapter.fromNonBlocking(ExceptionSoftener.softenSupplier(() -> CheckedIntentionalException.throwException())) .thenAccept(i -> { context.runOnContext(x -> { assertThat(i, is(input + 1)); async.complete(); }); }); }
@Test(timeout = 1000) public void testConvertFromBlockingSynchronous(final TestContext testContext) { final Context context = rule.vertx().getOrCreateContext(); Async async = testContext.async(); final int input = 1; new VertxAsynchronousComputationAdapter(rule.vertx(), context) .fromBlocking(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return incrementNow(input); }) .thenAccept(i -> context.runOnContext(x -> { assertThat(i, is(input + 1)); async.complete(); })); }
@Test(timeout = 1000) public void testConvertFromBlockingSynchronousRunnable(final TestContext testContext) { final Context context = rule.vertx().getOrCreateContext(); final Async async = testContext.async(); final AtomicInteger mutable = new AtomicInteger(1); new VertxAsynchronousComputationAdapter(rule.vertx(), context) .fromBlocking(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } mutable.set(10); }) .thenRun(() -> context.runOnContext(v -> { assertThat(mutable.get(), is(10)); async.complete(); })); }
@Test(timeout = 1000) public void testFromNonBlockingSupplierOnContext(final TestContext testContext) { final Context context = rule.vertx().getOrCreateContext(); Async async = testContext.async(); final List<Integer> ints = Arrays.asList(1); final CompletableFuture<Integer> future = new VertxAsynchronousComputationAdapter(rule.vertx(), context) .fromNonBlockingOnContext(() -> { ints.set(0, 2); return 1; }); future.thenAccept(i -> { assertThat(i, is(1)); assertThat(ints, is(Arrays.asList(2))); async.complete(); }); }
@Override public void init(final Vertx vertx, final Context context) { super.init(vertx, context); this.flow = CachingGoogleAuthCodeFlow.create( AUTH_CACHE_TTL, CLIENT_ID, CLIENT_SECRET, ORGANIZATION_ID, REDIRECT_URL ); this.jwtAuth = JwtAuth.create( vertx, KEYSTORE_PATH, KEYSTORE_PASS, ImmutableList.of(nexusDockerHost, nexusHttpHost) ); }
@Validate @Override public void putAdminLoglevel(Level level, String javaPackage, java.util.Map<String, String>okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) throws Exception { try { JsonObject updatedLoggers = LogUtil.updateLogConfiguration(javaPackage, level.name()); OutStream os = new OutStream(); os.setData(updatedLoggers); asyncResultHandler.handle(io.vertx.core.Future.succeededFuture(PutAdminLoglevelResponse.withJsonOK(os))); } catch (Exception e) { asyncResultHandler.handle(io.vertx.core.Future.succeededFuture(PutAdminLoglevelResponse.withPlainInternalServerError("ERROR" + e.getMessage()))); log.error(e.getMessage(), e); } }
@Test public void testBatchHandler(TestContext ctx) throws Exception { String topicName = "testBatchHandler"; String consumerId = topicName; Async batch1 = ctx.async(); AtomicInteger index = new AtomicInteger(); int numMessages = 500; kafkaCluster.useTo().produceStrings(numMessages, batch1::complete, () -> new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement())); batch1.awaitSuccess(10000); Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); Context context = vertx.getOrCreateContext(); consumer = createConsumer(context, config); Async batchHandler = ctx.async(); consumer.batchHandler(records -> { ctx.assertEquals(numMessages, records.count()); batchHandler.complete(); }); consumer.exceptionHandler(ctx::fail); consumer.handler(rec -> {}); consumer.subscribe(Collections.singleton(topicName)); }
/** * Creates a new event consumer for a tenant. * * @param context The vert.x context to run all interactions with the server on. * @param clientConfig The configuration properties to use. * @param con The AMQP connection to the server. * @param tenantId The tenant to consumer events for. * @param pathSeparator The address path separator character used by the server. * @param eventConsumer The consumer to invoke with each event received. * @param creationHandler The handler to invoke with the outcome of the creation attempt. * @throws NullPointerException if any of the parameters is {@code null}. */ public static void create( final Context context, final ClientConfigProperties clientConfig, final ProtonConnection con, final String tenantId, final String pathSeparator, final BiConsumer<ProtonDelivery, Message> eventConsumer, final Handler<AsyncResult<MessageConsumer>> creationHandler) { Objects.requireNonNull(context); Objects.requireNonNull(clientConfig); Objects.requireNonNull(con); Objects.requireNonNull(tenantId); Objects.requireNonNull(pathSeparator); Objects.requireNonNull(eventConsumer); Objects.requireNonNull(creationHandler); createConsumer(context, clientConfig, con, tenantId, pathSeparator, EVENT_ADDRESS_TEMPLATE, ProtonQoS.AT_LEAST_ONCE, eventConsumer).setHandler(created -> { if (created.succeeded()) { creationHandler.handle(Future.succeededFuture( new EventConsumerImpl(context, clientConfig, created.result()))); } else { creationHandler.handle(Future.failedFuture(created.cause())); } }); }
/** * Creates a new credentials client for a tenant. * * @param context The vert.x context to run all interactions with the server on. * @param clientConfig The configuration properties to use. * @param con The AMQP connection to the server. * @param tenantId The tenant for which credentials are handled. * @param senderCloseHook A handler to invoke if the peer closes the sender link unexpectedly. * @param receiverCloseHook A handler to invoke if the peer closes the receiver link unexpectedly. * @param creationHandler The handler to invoke with the outcome of the creation attempt. * @throws NullPointerException if any of the parameters is {@code null}. */ public static void create( final Context context, final ClientConfigProperties clientConfig, final ProtonConnection con, final String tenantId, final Handler<String> senderCloseHook, final Handler<String> receiverCloseHook, final Handler<AsyncResult<CredentialsClient>> creationHandler) { LOG.debug("creating new credentials client for [{}]", tenantId); final CredentialsClientImpl client = new CredentialsClientImpl(context, clientConfig, tenantId); client.createLinks(con, senderCloseHook, receiverCloseHook).setHandler(s -> { if (s.succeeded()) { LOG.debug("successfully created credentials client for [{}]", tenantId); creationHandler.handle(Future.succeededFuture(client)); } else { LOG.debug("failed to create credentials client for [{}]", tenantId, s.cause()); creationHandler.handle(Future.failedFuture(s.cause())); } }); }
public Observable<Void> close(SfsVertx vertx) { Context context = vertx.getOrCreateContext(); return aVoid() .doOnNext(aVoid -> checkState(status.compareAndSet(STARTED, STOPPING))) .doOnNext(aVoid -> readOnly.compareAndSet(false, true)) .flatMap(new WaitForActiveWriters(vertx, activeWriters)) .flatMap(new WaitForEmptyWriteQueue(vertx, writeQueueSupport)) .flatMap(aVoid -> RxHelper.executeBlocking(context, vertx.getBackgroundPool(), () -> { try { channel.close(); return (Void) null; } catch (IOException e) { throw new RuntimeException(e); } })) .doOnNext(aVoid -> checkState(status.compareAndSet(STOPPING, STOPPED))); }
public Observable<Void> close(SfsVertx vertx) { Context context = vertx.getOrCreateContext(); return aVoid() .doOnNext(aVoid -> checkState(status.compareAndSet(STARTED, STOPPING) || status.compareAndSet(START_FAILED, STOPPING), "Status was %s expected %s or %s", status.get(), STARTED, START_FAILED)) .doOnNext(aVoid -> readOnly.compareAndSet(false, true)) .flatMap(new WaitForActiveWriters(vertx, activeWriters)) .flatMap(new WaitForEmptyWriteQueue(vertx, writeQueueSupport)) .doOnNext(aVoid -> periodics.forEach(vertx::cancelTimer)) .flatMap(aVoid -> RxHelper.executeBlocking(context, vertx.getBackgroundPool(), () -> { try { if (channel != null) { channel.close(); } return (Void) null; } catch (IOException e) { throw new RuntimeException(e); } })) .doOnNext(aVoid -> checkState(status.compareAndSet(STOPPING, STOPPED))); }
/** * Returns a new CompletableFuture that is asynchronously completed by a action running in the worker thread pool of * Vert.x * <p> * This method is different from {@link CompletableFuture#runAsync(Runnable)} as it does not use a fork join * executor, but the worker thread pool. * * @param context the Vert.x context * @param runnable the action, when its execution completes, it completes the returned CompletableFuture. If the * execution throws an exception, the returned CompletableFuture is completed exceptionally. * @return the new CompletableFuture */ public static VertxCompletableFuture<Void> runBlockingAsync(Context context, Runnable runnable) { Objects.requireNonNull(runnable); VertxCompletableFuture<Void> future = new VertxCompletableFuture<>(Objects.requireNonNull(context)); context.executeBlocking( fut -> { try { runnable.run(); future.complete(null); } catch (Throwable e) { future.completeExceptionally(e); } }, null ); return future; }
@Validate @Override public void getAdminPostgresActiveSessions(String dbname, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) throws Exception { PostgresClient.getInstance(vertxContext.owner(), "public").select("SELECT pid , usename, " + "application_name, client_addr, client_hostname, " + "query, state from pg_stat_activity where datname='"+dbname+"'", reply -> { if(reply.succeeded()){ OutStream stream = new OutStream(); stream.setData(reply.result().getRows()); asyncResultHandler.handle(io.vertx.core.Future.succeededFuture(GetAdminPostgresActiveSessionsResponse. withJsonOK(stream))); } else{ log.error(reply.cause().getMessage(), reply.cause()); asyncResultHandler.handle(io.vertx.core.Future.failedFuture(reply.cause().getMessage())); } }); }
@Override public Observable<Encrypted> encrypt(VertxContext<Server> vertxContext, byte[] plainBytes) { SfsVertx sfsVertx = vertxContext.vertx(); Context context = sfsVertx.getOrCreateContext(); return defer(() -> RxHelper.executeBlocking(context, sfsVertx.getBackgroundPool(), () -> { String algorithm = AlgorithmName; Future<KeyOperationResult> encrypted = kms.encryptAsync(azureKeyIdentifier, algorithm, plainBytes); try { KeyOperationResult result = encrypted.get(60, SECONDS); CipherText instance = newBuilder() .setAlgorithm(algorithm) .setKeyIdentifier(result.getKid()) .setData(copyFrom(result.getResult())) .build(); return new Encrypted(instance.toByteArray(), format("xppsazure:%s", azureKeyIdentifier)); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new RuntimeException(e); } })); }
public Observable<Void> open(VertxContext<Server> vertxContext, Path workingDirectory) { this.workingDirectory = workingDirectory; this.tmpDirectory = Paths.get(workingDirectory.toString(), "tmp"); this.backupDirectory = Paths.get(workingDirectory.toString(), "backup"); SfsVertx vertx = vertxContext.vertx(); Context context = vertx.getOrCreateContext(); return RxHelper.executeBlocking(context, vertx.getBackgroundPool(), () -> { try { Files.createDirectories(workingDirectory); Files.createDirectories(tmpDirectory); Files.createDirectories(backupDirectory); } catch (IOException e) { throw new RuntimeException(e); } return null; }); }
public Observable<Void> stop(VertxContext<Server> vertxContext) { SfsVertx vertx = vertxContext.vertx(); Context context = vertx.getOrCreateContext(); return Defer.aVoid() .filter(aVoid -> status.compareAndSet(Status.STARTED, Status.STOPPING) || status.compareAndSet(Status.STARTING, Status.STOPPING)) .flatMap(aVoid -> RxHelper.executeBlocking(context, vertx.getBackgroundPool(), (() -> { LOGGER.debug("Stopping Elasticsearch"); if (elasticSearchClient != null) { try { elasticSearchClient.close(); } catch (Throwable e) { LOGGER.warn(e.getLocalizedMessage(), e); } elasticSearchClient = null; } LOGGER.debug("Stopped Elasticsearch"); return (Void) null; })) .doOnNext(aVoid1 -> Preconditions.checkState(status.compareAndSet(Status.STOPPING, Status.STOPPED)))); }
void tenantExists(Context context, String tenantId, Handler<AsyncResult<Boolean>> handler){ /* connect as user in postgres-conf.json file (super user) - so that all commands will be available */ PostgresClient.getInstance(context.owner()).select( "SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = '"+ PostgresClient.convertToPsqlStandard(tenantId) +"');", reply -> { try { if(reply.succeeded()){ handler.handle(io.vertx.core.Future.succeededFuture(reply.result().getResults().get(0).getBoolean(0))); } else { log.error(reply.cause().getMessage(), reply.cause()); handler.handle(io.vertx.core.Future.failedFuture(reply.cause().getMessage())); } } catch (Exception e) { log.error(e.getMessage(), e); handler.handle(io.vertx.core.Future.failedFuture(e.getMessage())); } }); }
/** * Creates a new sender for publishing events to a Hono server. * * @param context The vertx context to run all interactions with the server on. * @param clientConfig The configuration properties to use. * @param con The connection to the Hono server. * @param tenantId The tenant that the events will be published for. * @param deviceId The device that the events will be published for or {@code null} * if the events are going to be be produced by arbitrary devices of the * tenant. * @param closeHook The handler to invoke when the Hono server closes the sender. The sender's * target address is provided as an argument to the handler. * @param creationHandler The handler to invoke with the result of the creation attempt. * @throws NullPointerException if any of context, connection, tenant or handler is {@code null}. * @throws IllegalArgumentException if waitForInitialCredits is {@code < 1}. */ public static void create( final Context context, final ClientConfigProperties clientConfig, final ProtonConnection con, final String tenantId, final String deviceId, final Handler<String> closeHook, final Handler<AsyncResult<MessageSender>> creationHandler) { Objects.requireNonNull(context); Objects.requireNonNull(con); Objects.requireNonNull(tenantId); Objects.requireNonNull(creationHandler); final String targetAddress = getTargetAddress(tenantId, deviceId); createSender(context, clientConfig, con, targetAddress, ProtonQoS.AT_LEAST_ONCE, closeHook).compose(sender -> { return Future.<MessageSender> succeededFuture( new EventSenderImpl(clientConfig, sender, tenantId, targetAddress, context)); }).setHandler(creationHandler); }
@Override public CLIENT_POOL createClientPool() { Context context = Vertx.currentContext(); Vertx vertx = context.owner(); NetClientWrapper netClientWrapper = new NetClientWrapper(vertx, normalClientConfig, sslClientConfig); return doCreateClientPool(context, netClientWrapper); }
public TcpClientConnection(Context context, NetClientWrapper netClientWrapper, String strEndpoint) { this.setContext(context); this.netClientWrapper = netClientWrapper; endpoint = new URIEndpointObject(strEndpoint); this.socketAddress = endpoint.getSocketAddress(); this.remoteSupportLogin = Boolean.parseBoolean(endpoint.getFirst(TcpConst.LOGIN)); this.clientConfig = netClientWrapper.getClientConfig(endpoint.isSslEnabled()); }
public AbstractTcpClientConnectionPool(Context context, NetClientWrapper netClientWrapper) { this.context = context; this.netClientWrapper = netClientWrapper; startCheckTimeout(context); }
@Override public HttpClientWithContext createClientPool() { Context context = Vertx.currentContext(); HttpClient httpClient = context.owner().createHttpClient(httpClientOptions); return new HttpClientWithContext(httpClient, context); }
public static Vertx currentVertx() { Context context = Vertx.currentContext(); if (context == null) { throw new RuntimeException("get currentVertx error, currentContext is null."); } return context.owner(); }
public static <T> void runInContext(Context context, AsyncResultCallback<T> callback, T result, Throwable e) { if (context == Vertx.currentContext()) { complete(callback, result, e); } else { context.runOnContext(v -> complete(callback, result, e)); } }