ReceiverGenerator generateUnregisterMethod() { MethodSpec.Builder unregisterMB = MethodSpec.methodBuilder("unregisterReceiver"); // Generates cosumers waiting Future for success handler unregistration unregisterMB.addAnnotation(Override.class) .addModifiers(Modifier.PUBLIC) .addStatement("return $T.all($N.stream().map((consumer) -> {" + "$T future = $T.future();" + "consumer.unregister(future);" + "return future;" + "}).collect($T.toList()))", CompositeFuture.class, consumersField, ParameterizedTypeName.get(ClassName.get(Future.class), TypeName.get(Void.class)), TypeName.get(Future.class), TypeName.get(Collectors.class) ) .returns(ParameterizedTypeName.get(ClassName.get(Future.class), WildcardTypeName.subtypeOf(Object.class))); tsb.addMethod(unregisterMB.build()); return this; }
private void setupCompletionListeners() { List<Future> futures = new ArrayList<>(); if (config.isTxImport()) { futures.add(txImport); } if (config.isBlockImport()) { futures.add(blockImport); } CompositeFuture.all(futures).setHandler(done -> { if (done.succeeded()) { finishWithSuccess(); } else { cancelImport(null); Form.showAlertFromError(done.cause()); Async.setScene(SETTINGS_FXML); } }); }
private Future<SQLConnection> createSomeDataIfNone(SQLConnection connection) { Future<SQLConnection> future = Future.future(); connection.query("SELECT * FROM Articles", select -> { if (select.failed()) { future.fail(select.cause()); } else { if (select.result().getResults().isEmpty()) { Article article1 = new Article("Fallacies of distributed computing", "https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing"); Article article2 = new Article("Reactive Manifesto", "https://www.reactivemanifesto.org/"); Future<Article> insertion1 = insert(connection, article1, false); Future<Article> insertion2 = insert(connection, article2, false); CompositeFuture.all(insertion1, insertion2) .setHandler(r -> future.handle(r.map(connection))); } else { future.complete(connection); } } }); return future; }
@Test public void fetchOneByConditionWithMultipleMatchesShouldFail() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Future<Integer> insertFuture1 = Future.future(); Future<Integer> insertFuture2 = Future.future(); Something someNewObject = createSomething(); dao.insertReturningPrimaryAsync(someNewObject,insertFuture1); dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(someNewObject.getSomehugenumber()),insertFuture2); CompositeFuture.all(insertFuture1,insertFuture2). setHandler(consumeOrFailHandler(v->{ dao.fetchOneAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(someNewObject.getSomehugenumber()),h->{ Assert.assertNotNull(h.cause()); //cursor fetched more than one row Assert.assertEquals(TooManyRowsException.class, h.cause().getClass()); dao.deleteExecAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(someNewObject.getSomehugenumber()),countdownLatchHandler(latch)); }); })); await(latch); }
@Test public void fetchByConditionWithMultipleMatchesShouldSucceed() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Future<Integer> insertFuture1 = Future.future(); Future<Integer> insertFuture2 = Future.future(); Something someNewObject = createSomething(); dao.insertReturningPrimaryAsync(someNewObject,insertFuture1); dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(someNewObject.getSomehugenumber()),insertFuture2); CompositeFuture.all(insertFuture1, insertFuture2). setHandler(consumeOrFailHandler(v -> { dao.fetchAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(someNewObject.getSomehugenumber()), h -> { Assert.assertNotNull(h.result()); //cursor fetched more than one row Assert.assertEquals(2, h.result().size()); dao.deleteExecAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(someNewObject.getSomehugenumber()), countdownLatchHandler(latch)); }); })); await(latch); }
public void generate(Message<JsonObject> message) { JsonObject metadata = message.body(); String build = metadata.getString("build", "maven"); String language = metadata.getString("language", "java"); //Act as a activation flags in .gitignore metadata.put(build, true); metadata.put(language, true); String baseDir = metadata.getString("baseDir"); CompositeFuture.all( generateFile(metadata, baseDir, BUILD.get(build)), generateFile(metadata, baseDir, LANGUAGES.get(language)), generateFile(metadata, baseDir, ".gitignore"), generateFile(metadata, baseDir, ".editorconfig") ).setHandler(ar -> { if (ar.failed()) { log.error("Impossible to generate project {} : {}", metadata, ar.cause().getMessage()); message.fail(500, ar.cause().getMessage()); } else { message.reply(null); } }); }
private void startInstances(Future<Void> future, RealmContext context) { List<Future> futures = new ArrayList<>(); for (InstanceSettings instance : context.instances()) { Future deploy = Future.future(); futures.add(deploy); context.handler(() -> new InstanceHandler(new InstanceContext(context, instance))).setHandler((done) -> { if (done.succeeded()) { deploy.complete(); } else { context.onInstanceFailed(instance.getName(), done.cause()); deploy.fail(done.cause()); } }); } CompositeFuture.all(futures).setHandler(done -> { if (done.succeeded()) { future.complete(); } else { future.fail(done.cause()); } }); }
@Test public void testWithFailedCommands(TestContext tc) { breaker = CircuitBreaker.create("some-circuit-breaker", vertx); Async async = tc.async(); Future<Void> command1 = breaker.execute(commandThatFails()); Future<Void> command2 = breaker.execute(commandThatWorks()); Future<Void> command3 = breaker.execute(commandThatWorks()); Future<Void> command4 = breaker.execute(commandThatFails()); CompositeFuture.join(command1, command2, command3, command4) .setHandler(ar -> { assertThat(metrics()) .contains("name", "some-circuit-breaker") .contains("state", CircuitBreakerState.CLOSED.name()) .contains("totalErrorCount", 2) // Failure + Timeout + Exception .contains("totalSuccessCount", 2) .contains("totalTimeoutCount", 0) .contains("totalExceptionCount", 0) .contains("totalFailureCount", 2) .contains("totalOperationCount", 4) .contains("totalSuccessPercentage", 50) .contains("totalErrorPercentage", 50); async.complete(); }); }
@Test public void testEviction(TestContext tc) { breaker = CircuitBreaker.create("some-circuit-breaker", vertx, new CircuitBreakerOptions().setMetricsRollingWindow(10)); Async async = tc.async(); int count = 1000; List<Future> list = new ArrayList<>(); for (int i = 0; i < count; i++) { list.add(breaker.execute(commandThatWorks())); } CompositeFuture.all(list) .setHandler(ar -> { assertThat(ar).succeeded(); assertThat(metrics().getInteger("totalOperationCount")).isEqualTo(1000); assertThat(metrics().getInteger("rollingOperationCount")).isLessThan(1000); async.complete(); }); }
private void doSubCmd(ApiDiscovery discovery, List<ApiDefinition> definitions, ApiSubCmd subCmd, JsonObject jsonObject, Future<JsonObject> complete) { List<Future> futures = new ArrayList<>(); for (ApiDefinition definition : definitions) { Future<Void> future = Future.future(); futures.add(future); subCmd.handle(definition, jsonObject); discovery.publish(definition, ar -> { if (ar.succeeded()) { future.complete(); } else { future.fail(ar.cause()); } }); } CompositeFuture.all(futures) .setHandler(ar -> { if (ar.succeeded()) { complete.complete(succeedResult()); } else { complete.fail(ar.cause()); } }); }
private void deleteByName(ApiDiscovery discovery, List<String> names, Future<JsonObject> complete) { List<Future> futures = new ArrayList<>(); for (String name : names) { Future<Void> future = Future.future(); futures.add(future); discovery.unpublish(name, ar -> { if (ar.succeeded()) { future.complete(); } else { future.fail(ar.cause()); } }); } CompositeFuture.all(futures) .setHandler(ar -> { if (ar.succeeded()) { complete.complete(succeedResult()); } else { complete.fail(ar.cause()); } }); }
private synchronized void unregisterAllServices(Future<Void> completed) { List<Future> list = new ArrayList<>(); imports.forEach(svc -> { Future<Void> unreg = Future.future(); svc.unregister(publisher, unreg); list.add(unreg); }); CompositeFuture.all(list).setHandler(x -> { if (x.failed()) { completed.fail(x.cause()); } else { completed.complete(); } }); }
public void restartModules(Handler<ExtendedAsyncResult<Void>> fut) { deploymentStore.getAll(res1 -> { if (res1.failed()) { fut.handle(new Failure<>(res1.getType(), res1.cause())); } else { List<Future> futures = new LinkedList<>(); for (DeploymentDescriptor dd : res1.result()) { Future<DeploymentDescriptor> f = Future.future(); addAndDeploy1(dd, f::handle); futures.add(f); } CompositeFuture.all(futures).setHandler(res2 -> { if (res2.failed()) { fut.handle(new Failure<>(INTERNAL, res2.cause())); } else { fut.handle(new Success<>()); } }); } }); }
private void getKeys2(String val, Handler<ExtendedAsyncResult<Collection<String>>> fut) { Collection<String> result = new TreeSet<>(); if (val == null || val.isEmpty()) { fut.handle(new Success<>(result)); } else { KeyList keys = Json.decodeValue(val, KeyList.class); List<Future> futures = new LinkedList<>(); for (String k : keys.keys) { Future<Void> f = Future.future(); list.get(k, res -> { if (res.failed()) { f.handle(Future.failedFuture(res.cause())); } else { String v = res.result(); if (v != null) { result.add(k); } f.handle(Future.succeededFuture()); } }); futures.add(f); } CompositeFuture.all(futures).setHandler(res -> { if (res.failed()) { fut.handle(new Failure<>(INTERNAL, res.cause())); } else { fut.handle(new Success<>(result)); } }); } }
private synchronized void unregisterAllServices(Future<Void> done) { List<Future> list = new ArrayList<>(); new HashSet<>(registrations).forEach(reg -> { Future<Void> unreg = Future.future(); publisher.unpublish(reg.record().getRegistration(), unreg.completer()); }); registrations.clear(); CompositeFuture.all(list).setHandler(x -> { if (x.failed()) { done.fail(x.cause()); } else { done.complete(); } }); }
@Override public void close(Handler<Void> completionHandler) { List<Future> list = new ArrayList<>(); for (Record record : records) { publisher.unpublish(record.getRegistration(), v -> list.add(v.succeeded() ? Future.succeededFuture() : Future.failedFuture(v.cause()))); } CompositeFuture.all(list).setHandler(ar -> { if (ar.succeeded()) { LOGGER.info("Successfully closed the service importer " + this); } else { LOGGER.error("A failure has been caught while stopping " + this, ar.cause()); } if (completionHandler != null) { completionHandler.handle(null); } } ); }
@Override public void start(Future<Void> startFuture) throws Exception { DeploymentOptionsParser.parseVerticleDeploymentOptionsJsonFile(); // We can safely deploy Command, EventStore, and Query in parallel. // Facade must be the last one. Future<Void> startEventStoreFuture = startVerticle(this.vertx, Verticle.EventStore); Future<Void> startCommandFuture = startVerticle(this.vertx, Verticle.Command); Future<Void> startQueryFuture = startVerticle(this.vertx, Verticle.Query); CompositeFuture.all(startCommandFuture, startEventStoreFuture, startQueryFuture).setHandler(ar -> { if (ar.succeeded()) { startFuture.setHandler(startVerticle(this.vertx, Verticle.Facade).completer()); } else { startFuture.fail(ar.cause()); }} ); }
private void closeAllDeployments() { LOG.info("Undeploying verticles"); List<Future> futures = new LinkedList<>(); this.verticleDeployments.forEach((verticleName, deploymentID) -> { if (deploymentID != null && vertx.deploymentIDs().contains(deploymentID)) { LOG.info("Undeploying {} with ID: {}", verticleName, deploymentID); Future<Void> future = Future.future(); vertx.undeploy(deploymentID, future.completer()); futures.add(future); } }); CompositeFuture.all(futures).setHandler(ar -> { if (ar.succeeded()) { LOG.info("Undeployed all verticles"); } else { LOG.error("Failed to undeploy some verticles", ar.cause()); } }); }
/** * Get both Hono clients and connect them to Hono's microservices. * * @return The result of the creation and connection of the Hono clients. */ private Future<Void> getHonoClients() { // we need two clients to get it working, define futures for them final Future<RegistrationClient> registrationClientTracker = getRegistrationClient(); final Future<MessageSender> messageSenderTracker = getMessageSender(); final Future<Void> result = Future.future(); CompositeFuture.all(registrationClientTracker, messageSenderTracker).setHandler(s -> { if (result.failed()) { System.err.println( "hono clients could not be created : " + s.cause().getMessage()); result.fail(s.cause()); } else { registrationClient = registrationClientTracker.result(); messageSender = messageSenderTracker.result(); result.complete(); } }); return result; }
@Override protected final Future<Void> deployRequiredVerticles(int maxInstances) { Future<Void> result = Future.future(); CompositeFuture.all( deployAuthenticationService(), // we only need 1 authentication service deployRegistrationService(), deployCredentialsService()).setHandler(ar -> { if (ar.succeeded()) { result.complete(); } else { result.fail(ar.cause()); } }); return result; }
/** * Deploys the server to vert.x. * * @param context The vert.x test context. */ @BeforeClass public static void setUp(final TestContext context) { final Future<String> restServerDeploymentTracker = Future.future(); final Future<String> credentialsServiceDeploymentTracker = Future.future(); credentialsService = new FileBasedCredentialsService(); credentialsService.setConfig(new FileBasedCredentialsConfigProperties()); vertx.deployVerticle(credentialsService, credentialsServiceDeploymentTracker.completer()); final ServiceConfigProperties restServerProps = new ServiceConfigProperties(); restServerProps.setInsecurePortEnabled(true); restServerProps.setInsecurePort(0); restServerProps.setInsecurePortBindAddress(HOST); deviceRegistryRestServer = new DeviceRegistryRestServer(); deviceRegistryRestServer.addEndpoint(new CredentialsHttpEndpoint(vertx)); deviceRegistryRestServer.setConfig(restServerProps); vertx.deployVerticle(deviceRegistryRestServer, restServerDeploymentTracker.completer()); CompositeFuture.all(restServerDeploymentTracker, credentialsServiceDeploymentTracker) .setHandler(context.asyncAssertSuccess()); }
private static Future<Integer> addMultipleCredentials(final List<JsonObject> credentialsList) { final Future<Integer> result = Future.future(); @SuppressWarnings("rawtypes") final List<Future> addTrackers = new ArrayList<>(); for (JsonObject creds : credentialsList) { addTrackers.add(addCredentials(creds)); } CompositeFuture.all(addTrackers).setHandler(r -> { if (r.succeeded()) { result.complete(HttpURLConnection.HTTP_CREATED); } else { result.fail(r.cause()); } }); return result; }
@Override public void doStop(final Future<Void> stopFuture) { Future<Void> serverTracker = Future.future(); if (this.server != null) { this.server.close(serverTracker.completer()); } else { serverTracker.complete(); } Future<Void> insecureServerTracker = Future.future(); if (this.insecureServer != null) { this.insecureServer.close(insecureServerTracker.completer()); } else { insecureServerTracker.complete(); } CompositeFuture.all(serverTracker, insecureServerTracker) .compose(d -> stopFuture.complete(), stopFuture); }
@Override public final void doStart(final Future<Void> startFuture) { checkPortConfiguration() .compose(s -> preStartup()) .compose(s -> { Router router = createRouter(); if (router == null) { return Future.failedFuture("no router configured"); } else { addRoutes(router); return CompositeFuture.all(bindSecureHttpServer(router), bindInsecureHttpServer(router)); } }) .compose(s -> { try { onStartupSuccess(); startFuture.complete(); } catch (Exception e) { LOG.error("error in onStartupSuccess", e); startFuture.fail(e); } }, startFuture); }
private Future<Router> startEndpoints() { final Future<Router> startFuture = Future.future(); final Router router = createRouter(); if (router == null) { startFuture.fail("no router configured"); } else { addEndpointRoutes(router); addCustomRoutes(router); @SuppressWarnings("rawtypes") List<Future> endpointFutures = new ArrayList<>(endpoints.size()); for (HttpEndpoint ep : endpoints) { LOG.info("starting endpoint [name: {}, class: {}]", ep.getName(), ep.getClass().getName()); endpointFutures.add(ep.start()); } CompositeFuture.all(endpointFutures).setHandler(startup -> { if (startup.succeeded()) { startFuture.complete(router); } else { startFuture.fail(startup.cause()); } }); } return startFuture; }
private Future<Void> stopEndpoints() { final Future<Void> stopFuture = Future.future(); @SuppressWarnings("rawtypes") List<Future> endpointFutures = new ArrayList<>(endpoints.size()); for (HttpEndpoint ep : endpoints) { LOG.info("stopping endpoint [name: {}, class: {}]", ep.getName(), ep.getClass().getName()); endpointFutures.add(ep.stop()); } CompositeFuture.all(endpointFutures).setHandler(shutdown -> { if (shutdown.succeeded()) { stopFuture.complete(); } else { stopFuture.fail(shutdown.cause()); } }); return stopFuture; }
private CompositeFuture closeServiceClients() { Future<Void> messagingTracker = Future.future(); if (messaging == null) { messagingTracker.complete(); } else { messaging.shutdown(messagingTracker.completer()); } Future<Void> registrationTracker = Future.future(); if (registration == null) { registrationTracker.complete(); } else { registration.shutdown(registrationTracker.completer()); } Future<Void> credentialsTracker = Future.future(); if (credentialsAuthProvider == null) { credentialsTracker.complete(); } else { credentialsTracker = credentialsAuthProvider.stop(); } return CompositeFuture.all(messagingTracker, registrationTracker, credentialsTracker); }
private Future<Void> startEndpoints() { @SuppressWarnings("rawtypes") List<Future> endpointFutures = new ArrayList<>(endpoints.size()); for (AmqpEndpoint ep : endpoints.values()) { LOG.info("starting endpoint [name: {}, class: {}]", ep.getName(), ep.getClass().getName()); endpointFutures.add(ep.start()); } final Future<Void> startFuture = Future.future(); CompositeFuture.all(endpointFutures).setHandler(startup -> { if (startup.succeeded()) { startFuture.complete(); } else { startFuture.fail(startup.cause()); } }); return startFuture; }
private Future<Void> stopEndpoints() { @SuppressWarnings("rawtypes") List<Future> endpointFutures = new ArrayList<>(endpoints.size()); for (AmqpEndpoint ep : endpoints.values()) { LOG.info("stopping endpoint [name: {}, class: {}]", ep.getName(), ep.getClass().getName()); endpointFutures.add(ep.stop()); } final Future<Void> stopFuture = Future.future(); CompositeFuture.all(endpointFutures).setHandler(shutdown -> { if (shutdown.succeeded()) { stopFuture.complete(); } else { stopFuture.fail(shutdown.cause()); } }); return stopFuture; }
/** * Requests to load the data with the specified key asynchronously, and returns a future of the resulting value. * <p> * If batching is enabled (the default), you'll have to call {@link DataLoader#dispatch()} at a later stage to * start batch execution. If you forget this call the future will never be completed (unless already completed, * and returned from cache). * * @param key the key to load * @return the future of the value */ public Future<V> load(K key) { Objects.requireNonNull(key, "Key cannot be null"); Object cacheKey = getCacheKey(key); if (loaderOptions.cachingEnabled() && futureCache.containsKey(cacheKey)) { return futureCache.get(cacheKey); } Future<V> future = Future.future(); if (loaderOptions.batchingEnabled()) { loaderQueue.put(key, future); } else { CompositeFuture compositeFuture = batchLoadFunction.load(Collections.singleton(key)); if (compositeFuture.succeeded()) { future.complete(compositeFuture.result().resultAt(0)); } else { future.fail(compositeFuture.cause()); } } if (loaderOptions.cachingEnabled()) { futureCache.set(cacheKey, future); } return future; }
/** * Dispatches the queued load requests to the batch execution function and returns a composite future of the result. * <p> * If batching is disabled, or there are no queued requests, then a succeeded composite future is returned. * * @return the composite future of the queued load requests */ public CompositeFuture dispatch() { if (!loaderOptions.batchingEnabled() || loaderQueue.size() == 0) { return CompositeFuture.join(Collections.emptyList()); } CompositeFuture batch = batchLoadFunction.load(loaderQueue.keySet()); dispatchedQueues.put(batch, new LinkedHashMap<>(loaderQueue)); batch.setHandler(rh -> { AtomicInteger index = new AtomicInteger(0); dispatchedQueues.get(batch).forEach((key, future) -> { if (batch.succeeded(index.get())) { future.complete(batch.resultAt(index.get())); } else { future.fail(batch.cause(index.get())); } index.incrementAndGet(); }); dispatchedQueues.remove(batch); }); loaderQueue.clear(); return batch; }
@Test public void should_Build_a_really_really_simple_data_loader() { AtomicBoolean success = new AtomicBoolean(); DataLoader<Integer, Integer> identityLoader = new DataLoader<>(keys -> CompositeFuture.join(keys.stream() .map(Future::succeededFuture) .collect(Collectors.toCollection(ArrayList::new)))); Future<Integer> future1 = identityLoader.load(1); future1.setHandler(rh -> { assertThat(rh.result(), equalTo(1)); success.set(rh.succeeded()); }); identityLoader.dispatch(); await().untilAtomic(success, is(true)); }
@Test public void should_Support_loading_multiple_keys_in_one_call() { AtomicBoolean success = new AtomicBoolean(); DataLoader<Integer, Integer> identityLoader = new DataLoader<>(keys -> CompositeFuture.join(keys.stream() .map(Future::succeededFuture) .collect(Collectors.toCollection(ArrayList::new)))); CompositeFuture futureAll = identityLoader.loadMany(asList(1, 2)); futureAll.setHandler(rh -> { assertThat(rh.result().size(), is(2)); success.set(rh.succeeded()); }); identityLoader.dispatch(); await().untilAtomic(success, is(true)); assertThat(futureAll.list(), equalTo(asList(1, 2))); }
@Test public void should_Cache_on_redispatch() { ArrayList<Collection> loadCalls = new ArrayList<>(); DataLoader<String, String> identityLoader = idLoader(new DataLoaderOptions(), loadCalls); Future<String> future1 = identityLoader.load("A"); identityLoader.dispatch(); CompositeFuture future2 = identityLoader.loadMany(asList("A", "B")); identityLoader.dispatch(); await().until(() -> future1.isComplete() && future2.isComplete()); assertThat(future1.result(), equalTo("A")); assertThat(future2.list(), equalTo(asList("A", "B"))); assertThat(loadCalls, equalTo(asList(asList("A"), asList("B")))); }
@Test public void should_Represent_failures_and_successes_simultaneously() { AtomicBoolean success = new AtomicBoolean(); ArrayList<Collection> loadCalls = new ArrayList<>(); DataLoader<Integer, Integer> evenLoader = idLoaderWithErrors(new DataLoaderOptions(), loadCalls); Future<Integer> future1 = evenLoader.load(1); Future<Integer> future2 = evenLoader.load(2); Future<Integer> future3 = evenLoader.load(3); Future<Integer> future4 = evenLoader.load(4); CompositeFuture result = evenLoader.dispatch(); result.setHandler(rh -> success.set(true)); await().untilAtomic(success, is(true)); assertThat(future1.failed(), is(true)); assertThat(future1.cause(), instanceOf(IllegalStateException.class)); assertThat(future2.result(), equalTo(2)); assertThat(future3.failed(), is(true)); assertThat(future4.result(), equalTo(4)); assertThat(loadCalls, equalTo(Collections.singletonList(asList(1, 2, 3, 4)))); }
@Test public void should_Batch_loads_occurring_within_futures() { ArrayList<Collection> loadCalls = new ArrayList<>(); DataLoader<String, String> identityLoader = idLoader(DataLoaderOptions.create(), loadCalls); Future.<String>future().setHandler(rh -> { identityLoader.load("a"); Future.future().setHandler(rh2 -> { identityLoader.load("b"); Future.future().setHandler(rh3 -> { identityLoader.load("c"); Future.future().setHandler(rh4 -> identityLoader.load("d")).complete(); }).complete(); }).complete(); }).complete(); CompositeFuture composite = identityLoader.dispatch(); await().until((Callable<Boolean>) composite::isComplete); assertThat(loadCalls, equalTo( Collections.singletonList(asList("a", "b", "c", "d")))); }
@Override public void stop(Future<Void> future) throws Exception { // In current design, the publisher is responsible for removing the service List<Future> futures = new ArrayList<>(); registeredRecords.forEach(record -> { Future<Void> cleanupFuture = Future.future(); futures.add(cleanupFuture); discovery.unpublish(record.getRegistration(), cleanupFuture.completer()); }); if (futures.isEmpty()) { discovery.close(); future.complete(); } else { CompositeFuture.all(futures) .setHandler(ar -> { discovery.close(); if (ar.failed()) { future.fail(ar.cause()); } else { future.complete(); } }); } }
private void deployVerticles(Future<Void> startPromise, Deployment ... deployments) { List<Future> futures = new ArrayList<>(); for (Deployment deployment : deployments) { Future<Void> promise = Future.future(); futures.add(promise); vertx.deployVerticle(deployment.verticle, deployment.options, result -> { if (result.succeeded()) { promise.complete(); } else { promise.fail(result.cause()); } }); } CompositeFuture.all(futures).setHandler(result -> { if (result.succeeded()) { startPromise.complete(); } else { startPromise.fail(result.cause()); } }); }
@Test public void fetchOneByConditionWithMultipleMatchesShouldFail() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Future<Integer> insertFuture1 = Future.future(); Future<Integer> insertFuture2 = Future.future(); dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(1L),insertFuture1); dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(1L),insertFuture2); CompositeFuture.all(insertFuture1,insertFuture2). setHandler(consumeOrFailHandler(v->{ dao.fetchOneAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(1L),h->{ Assert.assertNotNull(h.cause()); //cursor fetched more than one row Assert.assertEquals(TooManyRowsException.class, h.cause().getClass()); dao.deleteExecAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(1L),countdownLatchHandler(latch)); }); })); await(latch); }
@Test public void fetchByConditionWithMultipleMatchesShouldSucceed() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Future<Integer> insertFuture1 = Future.future(); Future<Integer> insertFuture2 = Future.future(); dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(1L),insertFuture1); dao.insertReturningPrimaryAsync(createSomething().setSomehugenumber(1L),insertFuture2); CompositeFuture.all(insertFuture1, insertFuture2). setHandler(consumeOrFailHandler(v -> { dao.fetchAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(1L), h -> { Assert.assertNotNull(h.result()); //cursor fetched more than one row Assert.assertEquals(2, h.result().size()); dao.deleteExecAsync(Tables.SOMETHING.SOMEHUGENUMBER.eq(1L), countdownLatchHandler(latch)); }); })); await(latch); }