/**
 * Registers an event bus consumer that resolves a single configuration key.
 * <p>
 * The message body is the key (String) and the reply is the configured value (String). A {@code null} key is
 * rejected with a BAD_REQUEST failure; a key without a value is rejected with a NOT_FOUND failure.
 *
 * @param address the event bus address to register.
 * @param eventBus the event bus.
 * @return the consumer registered.
 */
public static MessageConsumer<String> registerSingleConfigEntryProvider(String address, EventBus eventBus){
    MessageConsumer<String> consumer = eventBus.consumer(address);
    consumer.handler(request -> {
        String key = request.body();
        if (key == null) {
            request.fail(HttpResponseStatus.BAD_REQUEST.code(), "Missing config key.");
            return;
        }
        String value = ConfigurationProvider.getConfiguration().getOrDefault(key, null);
        if (value == null) {
            request.fail(HttpResponseStatus.NOT_FOUND.code(), "Config key not found: " + key);
            return;
        }
        request.reply(value);
    });
    return consumer;
}
/**
 * Registers an event bus consumer that returns the entries of several configuration sections at once.
 * <p>
 * The message body is a JSON encoded {@code String[]} of section names; the reply is a JSON encoded
 * {@code Map<String,String>} holding the merged properties of those sections. When the body is
 * {@code null} all properties of the configuration are returned.
 *
 * @param address the event bus address to register.
 * @param eventBus the event bus.
 * @return the consumer registered.
 */
public static MessageConsumer<String> registerMultiConfigEntryProvider(String address, EventBus eventBus){
    MessageConsumer<String> consumer = eventBus.consumer(address);
    consumer.handler(request -> {
        String body = request.body();
        Configuration config = ConfigurationProvider.getConfiguration();
        Map<String,String> entries = new TreeMap<>();
        if (body == null) {
            // No section list given: expose everything.
            entries.putAll(config.getProperties());
        } else {
            for (String section : Json.decodeValue(body, String[].class)) {
                if (section == null) {
                    continue;
                }
                entries.putAll(config.with(ConfigurationFunctions.section(section)).getProperties());
            }
        }
        request.reply(Json.encode(entries));
    });
    return consumer;
}
@Override public void start() throws Exception { super.start(); client = JDBCClient.create(vertx, dataSource); eventBus.registerDefaultCodec(Book.class, new BookEncoder()); eventBus.registerDefaultCodec(Recipe.class, new RecipeEncoder()); eventBus.registerDefaultCodec((Class<ArrayList<Book>>) (Class<?>) ArrayList.class, new ListOfBookEncoder()); initDb(); MessageConsumer<Object> read = eventBus.consumer("de.nierbeck.vertx.jdbc.read"); //TODO: move those addresses to a "global" dict. MessageConsumer<Object> write = eventBus.consumer("de.nierbeck.vertx.jdbc.write.add"); MessageConsumer<Object> update = eventBus.consumer("de.nierbeck.vertx.jdbc.write.update"); MessageConsumer<Object> delete = eventBus.consumer("de.nierbeck.vertx.jdbc.delete"); read.handler(this::handleRead); write.handler(this::handleWrite); update.handler(this::handleUpdate); delete.handler(this::handleDelete); }
public void example2(ServiceDiscovery discovery) { // Get the record discovery.getRecord(new JsonObject().put("name", "some-message-source-service"), ar -> { if (ar.succeeded() && ar.result() != null) { // Retrieve the service reference ServiceReference reference = discovery.getReference(ar.result()); // Retrieve the service object MessageConsumer<JsonObject> consumer = reference.getAs(MessageConsumer.class); // Attach a message handler on it consumer.handler(message -> { // message handler JsonObject payload = message.body(); }); } }); }
/**
 * Looks up the "shopping-order-message-source" message source and, once it is available, dispatches
 * every received order. The start future is completed after the handler is attached, or failed with
 * the lookup cause.
 *
 * @param future the future signalling the end of the start sequence
 */
@Override
public void start(Future<Void> future) throws Exception {
  super.start();
  JsonObject filter = new JsonObject().put("name", "shopping-order-message-source");
  MessageSource.<JsonObject>getConsumer(discovery, filter, lookup -> {
    if (!lookup.succeeded()) {
      future.fail(lookup.cause());
      return;
    }
    MessageConsumer<JsonObject> orderConsumer = lookup.result();
    orderConsumer.handler(message -> dispatchOrder(wrapRawOrder(message.body()), message));
    future.complete();
  });
}
/** * Registers a schema definition created by the * {@link GraphQLService}. * <p> * The provided registration is cloned, completed with publisher-related information, registered and then returned. * * @param partialRegistration the partially completed schema registration * @param options the service discovery options to add * @param publishedHandler the event handler to invoke on schema published events * @param unpublishedHandler the event handler to invoke on schema unpublished events * @return the completed schema registration */ protected SchemaRegistration register( SchemaRegistration partialRegistration, ServiceDiscoveryOptions options, SchemaPublishedHandler<SchemaRegistration> publishedHandler, SchemaUnpublishedHandler<SchemaRegistration> unpublishedHandler) { // First start listening to schema events. registerSchemaEventConsumers(options, publishedHandler, unpublishedHandler); // Then register service consumer created from schema definition, if it was not registered yet. MessageConsumer<JsonObject> serviceConsumer = registerSchemaServiceConsumer( partialRegistration.getRecord(), partialRegistration.getSchemaDefinition()); // Complete the schema registration SchemaRegistration fullRegistration = SchemaRegistration.create(partialRegistration.getDiscovery(), options, partialRegistration.getRecord(), partialRegistration.getSchemaDefinition(), serviceConsumer); return super.register(options.getName(), fullRegistration); }
/** * Handles a un-subscription request to the current {@link Destination}. * * @param connection the connection * @param frame the {@code UNSUBSCRIBE} frame * @return {@code true} if the un-subscription has been handled, {@code false} otherwise. */ @Override public synchronized boolean unsubscribe(StompServerConnection connection, Frame frame) { for (Subscription subscription : new ArrayList<>(subscriptions)) { if (subscription.connection.equals(connection) && subscription.id.equals(frame.getId())) { boolean r = subscriptions.remove(subscription); Optional<Subscription> any = subscriptions.stream().filter(s -> s.destination.equals(subscription.destination)).findAny(); // We unregister the event bus consumer if there are no subscription on this address anymore. if (!any.isPresent()) { MessageConsumer<?> consumer = registry.remove(subscription.destination); if (consumer != null) { consumer.unregister(); } } return r; } } return false; }
/** * Removes all subscriptions of the given connection * * @param connection the connection * @return the current instance of {@link Destination} */ @Override public synchronized Destination unsubscribeConnection(StompServerConnection connection) { new ArrayList<>(subscriptions) .stream() .filter(subscription -> subscription.connection.equals(connection)) .forEach(s -> { subscriptions.remove(s); Optional<Subscription> any = subscriptions.stream().filter(s2 -> s2.destination.equals(s.destination)) .findAny(); // We unregister the event bus consumer if there are no subscription on this address anymore. if (!any.isPresent()) { MessageConsumer<?> consumer = registry.remove(s.destination); if (consumer != null) { consumer.unregister(); } } }); return this; }
/**
 * Register the proxy handle on the event bus.
 * <p>
 * Each interceptor wraps the handler built so far, so the last interceptor of the list is the first
 * one applied to an incoming message. A message only reaches the wrapped handler when the
 * interceptor's future succeeds; otherwise the message is failed. A {@link ReplyException} cause is
 * propagated with its own failure code, any other cause is reported with failure code 500.
 *
 * @param eventBus     the event bus
 * @param address      the proxy address
 * @param interceptors the interceptors to apply before the proxy handler, may be {@code null}
 * @return the registered message consumer
 */
public MessageConsumer<JsonObject> register(EventBus eventBus, String address, List<Function<Message<JsonObject>, Future<Message<JsonObject>>>> interceptors) {
  Handler<Message<JsonObject>> handler = this::handle;
  if (interceptors != null) {
    for (Function<Message<JsonObject>, Future<Message<JsonObject>>> interceptor : interceptors) {
      Handler<Message<JsonObject>> prev = handler;
      handler = msg -> {
        Future<Message<JsonObject>> fut = interceptor.apply(msg);
        fut.setHandler(ar -> {
          if (ar.succeeded()) {
            prev.handle(msg);
            return;
          }
          Throwable cause = ar.cause();
          if (cause instanceof ReplyException) {
            ReplyException exception = (ReplyException) cause;
            msg.fail(exception.failureCode(), exception.getMessage());
          } else {
            // Interceptors are free to fail with any exception type; do not assume ReplyException.
            msg.fail(500, cause == null ? null : cause.getMessage());
          }
        });
      };
    }
  }
  consumer = eventBus.consumer(address, handler);
  return consumer;
}
/**
 * Starts the verticle and routes every message received on
 * {@code CommonConstants.VERTX_EVENT_BUS_HE_RSS_JDG_PUT} to {@code processEntries}.
 */
@Override
public void start() throws Exception {
  super.start();
  vertx.eventBus()
      .<JsonArray>consumer(CommonConstants.VERTX_EVENT_BUS_HE_RSS_JDG_PUT)
      .handler(this::processEntries);
}
/**
 * Starts the generated receiver class: creates the public type builder named after the interface plus
 * the receiver suffix, declares the {@code vertx}, {@code eventBusAddress} and {@code consumers}
 * fields and adds two public constructors (one defaulting the address to the interface's qualified
 * name, one taking it as a parameter).
 *
 * @return this generator
 */
ReceiverGenerator generateInitializing() {
    String receiverName = MessageFormat.format("{0}{1}", interfaceElement.getSimpleName(), VXRIFA_RECEIVER_SUFFIX);
    tsb = TypeSpec.classBuilder(receiverName).addModifiers(Modifier.PUBLIC);

    // implements VxRifaReceiver<TheInterface>
    ParameterizedTypeName receiverInterface =
            ParameterizedTypeName.get(ClassName.get(VxRifaReceiver.class), TypeName.get(interfaceElement.asType()));
    tsb.addSuperinterface(receiverInterface);

    vertxField = FieldSpec.builder(io.vertx.core.Vertx.class, "vertx", Modifier.PRIVATE, Modifier.FINAL).build();
    tsb.addField(vertxField);

    eventBusAddressField = FieldSpec.builder(String.class, "eventBusAddress", Modifier.PRIVATE, Modifier.FINAL).build();
    tsb.addField(eventBusAddressField);

    // List<MessageConsumer<?>> consumers
    ParameterizedTypeName anyConsumer =
            ParameterizedTypeName.get(ClassName.get(MessageConsumer.class), WildcardTypeName.subtypeOf(Object.class));
    ParameterizedTypeName consumerList = ParameterizedTypeName.get(ClassName.get(List.class), anyConsumer);
    consumersField = FieldSpec.builder(consumerList, "consumers", Modifier.PRIVATE).build();
    tsb.addField(consumersField);

    // Constructor using the interface's qualified name as event bus address.
    MethodSpec defaultAddressConstructor = MethodSpec.constructorBuilder()
            .addModifiers(Modifier.PUBLIC)
            .addParameter(io.vertx.core.Vertx.class, vertxField.name)
            .addStatement("this.$N = $N", vertxField, vertxField)
            .addStatement("this.$N = $S", eventBusAddressField, interfaceElement.getQualifiedName().toString())
            .build();
    tsb.addMethod(defaultAddressConstructor);

    // Constructor with an explicit event bus address.
    MethodSpec explicitAddressConstructor = MethodSpec.constructorBuilder()
            .addModifiers(Modifier.PUBLIC)
            .addParameter(io.vertx.core.Vertx.class, vertxField.name)
            .addParameter(String.class, eventBusAddressField.name)
            .addStatement("this.$N = $N", vertxField, vertxField)
            .addStatement("this.$N = $N", eventBusAddressField, eventBusAddressField)
            .build();
    tsb.addMethod(explicitAddressConstructor);

    return this;
}
static void scheduling(Vertx vertx) { EventBus eventBus = vertx.eventBus(); // Consumer of the timer events MessageConsumer<JsonObject> consumer = eventBus.consumer("scheduler:timer"); // Listens and prints timer events. When timer completes stops the Vertx consumer.handler ( message -> { JsonObject event = message.body(); if (event.getString("event").equals("complete")) { System.out.println("completed"); vertx.close(); } else { System.out.println(event); } } ); // Create new timer eventBus.send ( "chime", (new JsonObject()).put("operation", "create").put("name", "scheduler:timer") .put("publish", false).put("max count", 3) .put("description", (new JsonObject()).put("type", "interval").put("delay", 1)), ar -> { if (ar.succeeded()) { System.out.println("Scheduling started: " + ar.result().body()); } else { System.out.println("Message failed: " + ar.cause()); vertx.close(); } } ); }
/**
 * Looks up the message source whose record name equals "market-data" (ignoring case).
 *
 * @param discovery the service discovery instance
 * @return a future that receives the outcome of the lookup
 */
private Future<MessageConsumer<JsonObject>> getMarketSource(ServiceDiscovery discovery) {
  Future<MessageConsumer<JsonObject>> marketSource = Future.future();
  MessageSource.getConsumer(
      discovery,
      candidate -> candidate.getName().equalsIgnoreCase("market-data"),
      marketSource);
  return marketSource;
}
/**
 * Finishes the start sequence once the composite lookup is done: fails {@code done} with the
 * composite's cause, or attaches the trading logic to the market consumer and completes {@code done}.
 *
 * @param done                        the future signalling the end of the start sequence
 * @param company                     the company to trade
 * @param numberOfShares              the number of shares to trade
 * @param retrieveThePortfolioService the future holding the portfolio service
 * @param retrieveTheMarket           the future holding the market data consumer
 * @param ar                          the result of the composite future
 */
private void initialize(Future<Void> done, String company, int numberOfShares, Future<PortfolioService> retrieveThePortfolioService, Future<MessageConsumer<JsonObject>> retrieveTheMarket, AsyncResult<CompositeFuture> ar) {
  if (ar.failed()) {
    done.fail(ar.cause());
    return;
  }

  PortfolioService portfolio = retrieveThePortfolioService.result();
  MessageConsumer<JsonObject> marketConsumer = retrieveTheMarket.result();
  marketConsumer.handler(quote ->
      TraderUtils.dumbTradingLogic(company, numberOfShares, portfolio, quote.body()));
  done.complete();
}
@Override public void start(Future<Void> done) throws Exception { String company = TraderUtils.pickACompany(); int numberOfShares = TraderUtils.pickANumber(); System.out.println("Java-Callback compulsive trader configured for company " + company + " and shares: " + numberOfShares); // TODO Complete the code to apply the trading _logic_ on each message received from the "market-data" message // source // ---- // Retrieve service discovery Future<ServiceDiscovery> retrieveServiceDiscovery = getServiceDiscovery(vertx); // When the service discovery is retrieved, retrieve the portfolio service and market data retrieveServiceDiscovery.setHandler(discovery -> { // TODO 1 - Get the Future objects for the portfolio and market services. Just use the method given below Future<PortfolioService> retrieveThePortfolioService = getPortfolioService(discovery.result()); Future<MessageConsumer<JsonObject>> retrieveTheMarket = getMarketSource(discovery.result()); // TODO 2 - Use CompositeFuture.all to "wait" until both future are completed. // TODO 3 - Attach a handler on the composite future, and call "initialize" // When both are completed, register the message handler to execute the trading logic CompositeFuture.all(retrieveServiceDiscovery, retrieveTheMarket) .setHandler(x -> initialize(done, company, numberOfShares, retrieveThePortfolioService, retrieveTheMarket, x)); }); // ---- }
/**
 * Registers an event bus consumer that configures the object sent as message body using Tamaya's
 * injection API and replies with "OK". A {@code null} body is rejected with a BAD_REQUEST failure.
 *
 * @param address the event bus address to register.
 * @param eventBus the event bus.
 * @return the consumer registered.
 */
public static MessageConsumer<Object> registerConfigurationInjector(String address, EventBus eventBus){
    MessageConsumer<Object> consumer = eventBus.consumer(address);
    consumer.handler(request -> {
        Object target = request.body();
        if (target == null) {
            request.fail(HttpResponseStatus.BAD_REQUEST.code(), "Required object to configure is missing.");
            return;
        }
        ConfigurationInjection.getConfigurationInjector().configure(target);
        request.reply("OK");
    });
    return consumer;
}
/**
 * Starts the application. After a successful start it either imports a file (when more than one
 * argument is given) or waits for a message on {@code ES_STATUS}, tries to open the website in the
 * desktop browser and then pauses that consumer. A failed start is logged and Vert.x is closed.
 *
 * @param args the command line arguments
 */
public ApplicationLauncher(String[] args) {
    this.args = args;

    logger.info(String.format("Starting excelastic %s..", VERSION));
    logger.info("to import files without the web interface use: <filename> <index> <mapping>");

    Future<Void> startup = Future.future();
    startup.setHandler(done -> {
        if (!done.succeeded()) {
            logger.log(Level.SEVERE, "Failed to start application", done.cause());
            vertx.close();
            return;
        }

        logger.info("Successfully started application");
        if (args.length > 1) {
            importFile(getFileName(), getIndexName());
            return;
        }

        MessageConsumer<?> consumer = vertx.eventBus().consumer(ES_STATUS);
        consumer.handler(message -> {
            logger.info(String.format("Attempting to open browser.. [ES connected=%s]", message.body().toString()));
            try {
                Desktop.getDesktop().browse(new URI(Configuration.getWebsiteURL()));
            } catch (IOException | URISyntaxException e) {
                logger.warning(e.getMessage());
            }
            // Stop delivery after the first status message has been handled.
            consumer.pause();
        });
    });
    start(startup);
}
/** * Starts the verticle asynchronously. The the initialization is completed, it calls * `complete()` on the given {@link Future} object. If something wrong happens, * `fail` is called. * * @param future the future to indicate the completion */ @Override public void start(Future<Void> future) throws ClassNotFoundException { super.start(); // Get configuration config = ConfigFactory.load(); // creates the jdbc client. JsonObject jdbcConfig = new JsonObject(config.getObject("jdbc").render(ConfigRenderOptions.concise())); jdbc = JDBCClient.createNonShared(vertx, jdbcConfig); Class.forName(jdbcConfig.getString("driverclass")); // Start HTTP server and listen for portfolio events EventBus eventBus = vertx.eventBus(); Future<HttpServer> httpEndpointReady = configureTheHTTPServer(); httpEndpointReady.setHandler(ar -> { if (ar.succeeded()) { MessageConsumer<JsonObject> portfolioConsumer = eventBus.consumer(config.getString("portfolio.address")); portfolioConsumer.handler(message -> { storeInDatabase(message.body()); }); future.complete(); } else { future.fail(ar.cause()); } }); publishHttpEndpoint("audit", config.getString("http.host"), config.getInt("http.public.port"), config.getString("http.root"), ar -> { if (ar.failed()) { ar.cause().printStackTrace(); } else { System.out.println("Audit (Rest endpoint) service published : " + ar.succeeded()); } }); }
/**
 * Looks up the message source named "portfolio-events".
 *
 * @return a future that receives the outcome of the lookup
 */
private Future<MessageConsumer<JsonObject>> retrieveThePortfolioMessageSource() {
  Future<MessageConsumer<JsonObject>> portfolioSource = Future.future();
  JsonObject byName = new JsonObject().put("name", "portfolio-events");
  MessageSource.getConsumer(discovery, byName, portfolioSource.completer());
  return portfolioSource;
}
@Override public void start(Future<Void> future) { super.start(); // Get configuration config = ConfigFactory.load(); String company = TraderUtils.pickACompany(); int numberOfShares = TraderUtils.pickANumber(); EventBus eventBus = vertx.eventBus(); EventBusService.getProxy(discovery, PortfolioService.class, ar -> { if (ar.failed()) { System.out.println("Portfolio service could not be retrieved: " + ar.cause()); } else { // Our services: PortfolioService portfolio = ar.result(); MessageConsumer<JsonObject> marketConsumer = eventBus.consumer(config.getString("market.address")); // Listen to the market... marketConsumer.handler(message -> { JsonObject quote = message.body(); TraderUtils.dumbTradingLogic(company, numberOfShares, portfolio, quote); }); } }); }
public void example3(ServiceDiscovery discovery) { MessageSource.<JsonObject>getConsumer(discovery, new JsonObject().put("name", "some-message-source-service"), ar -> { if (ar.succeeded()) { MessageConsumer<JsonObject> consumer = ar.result(); // Attach a message handler on it consumer.handler(message -> { // message handler JsonObject payload = message.body(); }); // ... } }); }
public void example5(ServiceDiscovery discovery, Record record1, Record record2) { ServiceReference reference1 = discovery.getReference(record1); ServiceReference reference2 = discovery.getReference(record2); // Then, gets the service object, the returned type depends on the service type: // For http endpoint: HttpClient client = reference1.getAs(HttpClient.class); // For message source MessageConsumer consumer = reference2.getAs(MessageConsumer.class); // When done with the service reference1.release(); reference2.release(); }
/**
 * Convenient method that looks for a message source and provides the configured
 * {@link MessageConsumer}. The async result is marked as failed ("No matching record") if there is no
 * matching service or if the lookup fails.
 *
 * @param discovery     The service discovery instance
 * @param filter        The filter, optional
 * @param resultHandler The result handler
 * @param <T>           The class of the message
 */
static <T> void getConsumer(ServiceDiscovery discovery, JsonObject filter,
                            Handler<AsyncResult<MessageConsumer<T>>> resultHandler) {
  discovery.getRecord(filter, lookup -> {
    if (lookup.succeeded() && lookup.result() != null) {
      MessageConsumer<T> consumer = discovery.<MessageConsumer<T>>getReference(lookup.result()).get();
      resultHandler.handle(Future.succeededFuture(consumer));
    } else {
      resultHandler.handle(Future.failedFuture("No matching record"));
    }
  });
}
/**
 * Convenient method that looks for a message source and provides the configured
 * {@link MessageConsumer}. The async result is marked as failed ("No matching record") if there is no
 * matching service or if the lookup fails.
 *
 * @param discovery     The service discovery instance
 * @param filter        The filter, must not be {@code null}
 * @param resultHandler The result handler
 * @param <T>           The class of the message
 */
static <T> void getConsumer(ServiceDiscovery discovery, Function<Record, Boolean> filter,
                            Handler<AsyncResult<MessageConsumer<T>>> resultHandler) {
  discovery.getRecord(filter, lookup -> {
    if (lookup.succeeded() && lookup.result() != null) {
      MessageConsumer<T> consumer = discovery.<MessageConsumer<T>>getReference(lookup.result()).get();
      resultHandler.handle(Future.succeededFuture(consumer));
    } else {
      resultHandler.handle(Future.failedFuture("No matching record"));
    }
  });
}
@Test public void test() throws InterruptedException { Random random = new Random(); vertx.setPeriodic(10, l -> { vertx.eventBus().publish("data", random.nextDouble()); }); Record record = MessageSource.createRecord("Hello", "data"); discovery.publish(record, (r) -> { }); await().until(() -> record.getRegistration() != null); AtomicReference<Record> found = new AtomicReference<>(); discovery.getRecord(new JsonObject().put("name", "Hello"), ar -> { found.set(ar.result()); }); await().until(() -> found.get() != null); ServiceReference service = discovery.getReference(found.get()); MessageConsumer<Double> consumer = service.get(); List<Double> data = new ArrayList<>(); consumer.handler(message -> { data.add(message.body()); }); await().until(() -> !data.isEmpty()); service.release(); int size = data.size(); Thread.sleep(500); assertThat(data.size()).isEqualTo(size); // Just there to be sure we can call it twice service.release(); }
/**
 * Tries to connect to the server. On success a {@code RedisCommandHandler} is registered as event
 * bus consumer and torn down again when the socket fails or closes (a close also schedules a
 * reconnect). On failure the delay is doubled, capped at {@code MAXIMUM_DELAY}, and a retry is
 * scheduled.
 *
 * @param time the timer value passed by the caller (not used)
 */
public void handle(Long time) {
    final Handler<Long> retry = this;
    netClient.connect(port, host, socket -> {
        if (!socket.succeeded()) {
            if (socket.result() != null) {
                log.warn("establishSocket", "closeSocket");
                socket.result().close();
            }
            currentDelay = Math.min(currentDelay * 2, MAXIMUM_DELAY);
            log.warn("establishSocket", "failed", new String[] {"eventBusAddress", "server"}, eventBusAddress, host);
            vertx.setTimer(currentDelay, retry);
            return;
        }

        log.trace("establishSocket", "success");
        // Connected: start over with the base delay.
        currentDelay = delayFactor;

        final NetSocket netSocket = socket.result();
        final RedisCommandHandler redisHandler = new RedisCommandHandler(netSocket);
        final MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer(eventBusAddress, redisHandler);

        netSocket.exceptionHandler(ex -> {
            log.error("establishSocket", "exception", "unknown", ex);
            consumer.unregister();
            redisHandler.finish();
        });
        netSocket.closeHandler(closed -> {
            log.warn("establishSocket", "socketClosed");
            consumer.unregister();
            redisHandler.finish();
            vertx.setTimer(currentDelay, retry);
        });
    });
}
@Override public void start(Future<Void> fut) throws Exception { // Sensors sensors.put(1, null); sensors.put(2, null); // W1 Bus init W1Master w1Master = new W1Master(); // Get DS18B20 Temp device objects for (TemperatureSensor device : w1Master.getDevices(TemperatureSensor.class)) { if (device.getName().contains("28-0000062d006a")) sensor1 = device; if (device.getName().contains("28-0000062d1425")) sensor2 = device; } // Read the 2 temp sensors "immediately" readTemp(event -> fut.complete()); // .. and continue refreshing the 2 temp. sensors every minute vertx.setPeriodic(2000, event -> { readTemp(event1 -> log.info("Temp sensor reading ok : " + sensors.get(1) + " / " + sensors.get(2))); }); // Handler to serve the sensors values on the vertx event loop MessageConsumer<String> dhwConsumer = vertx.eventBus().consumer("sensor-temp"); dhwConsumer.handler(event -> { String sensorId = event.body(); if (sensorId.equals("1")) { event.reply(sensors.get(1)); } else if (sensorId.equals("2")) { event.reply(sensors.get(2)); } else { event.fail(-1, "Bad sensor Id"); } }); }
/**
 * Configure and check the sender link of the endpoint.
 * <p>
 * The sender link carries the responses to received requests and is driven by the vertx event bus:
 * a consumer is registered on the reply-to address, and every message it receives is converted by
 * {@link #getAmqpReply(io.vertx.core.eventbus.Message)} (implemented by the subclass) and sent over
 * the link. The consumer is unregistered when the client closes the link. A reply-to address without
 * a resource id is rejected and the link is closed with an {@code INVALID_FIELD} condition.
 *
 * @param con The AMQP connection that the link is part of.
 * @param sender The ProtonSender that has already been created for this endpoint.
 * @param replyToAddress The resource identifier for the responses of this endpoint (see {@link ResourceIdentifier} for details).
 *                       Note that the reply address is different for each client and is passed in during link creation.
 */
@Override
public final void onLinkAttach(final ProtonConnection con, final ProtonSender sender, final ResourceIdentifier replyToAddress) {

    if (replyToAddress.getResourceId() == null) {
        logger.debug("client [{}] provided invalid reply-to address", sender.getName());
        final String description = String.format(
                "reply-to address must have the following format %s/<tenant>/<reply-address>", getName());
        sender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD, description));
        sender.close();
        return;
    }

    logger.debug("establishing sender link with client [{}]", sender.getName());
    final MessageConsumer<JsonObject> replyConsumer = vertx.eventBus().consumer(replyToAddress.toString(), message -> {
        // TODO check for correct session here...?
        logger.trace("forwarding reply to client [{}]: {}", sender.getName(), message.body());
        sender.send(getAmqpReply(message));
    });

    sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
    sender.closeHandler(senderClosed -> {
        logger.debug("client [{}] closed sender link, removing associated event bus consumer [{}]",
                sender.getName(), replyConsumer.address());
        replyConsumer.unregister();
        if (senderClosed.succeeded()) {
            senderClosed.result().close();
        }
    });
    sender.open();
}
/**
 * Registers an event bus consumer for every configured address and blocks the calling thread until
 * all registrations have completed successfully or the timeout elapses.
 * <p>
 * The timeout is read from the context configuration ({@code CONSUMER_REGISTRATION_TIMEOUT_KEY}) when
 * a context is available, otherwise {@code DEFAULT_CONSUMER_REGISTRATION_TIMEOUT} is used. A failed
 * registration is logged and never counted, so it ends in the timeout failure.
 *
 * @param vertx the Vert.x instance
 * @param event the event passed to the handler created for each address
 * @throws IllegalStateException if not all consumers were registered within the timeout
 * @throws RuntimeException      if the waiting thread is interrupted (the interrupt flag is restored)
 */
public void registerConsumers(Vertx vertx, Event<Object> event) {
    CountDownLatch latch = new CountDownLatch(consumerAddresses.size());
    for (String address : consumerAddresses) {
        MessageConsumer<?> consumer = vertx.eventBus().consumer(address, VertxHandler.from(vertx, event, address));
        consumer.completionHandler(ar -> {
            if (ar.succeeded()) {
                LOGGER.debug("Sucessfully registered event consumer for {0}", address);
                latch.countDown();
            } else {
                LOGGER.error("Cannot register event consumer for {0}", ar.cause(), address);
            }
        });
    }
    Context context = this.context;
    if (context == null && vertx != null) {
        context = vertx.getOrCreateContext();
    }
    long timeout = context != null ? context.config().getLong(CONSUMER_REGISTRATION_TIMEOUT_KEY, DEFAULT_CONSUMER_REGISTRATION_TIMEOUT)
            : DEFAULT_CONSUMER_REGISTRATION_TIMEOUT;
    try {
        if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
            // The latch counts the registrations still pending, so "registered" is total minus the count.
            long registered = consumerAddresses.size() - latch.getCount();
            throw new IllegalStateException(String.format("Message consumers not registered within %s ms [registered: %s, total: %s]", timeout,
                    registered, consumerAddresses.size()));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Returns the service consumer for the given address, registering the {@link Queryable} service
 * proxy first when no consumer is stored for that address yet. The address is added to the consumer
 * registrations on every call.
 *
 * @param address        the event bus address of the service
 * @param implementation the service implementation to expose
 * @param <T>            the implementation type
 * @return the message consumer stored for the address
 */
public <T extends Queryable> MessageConsumer<JsonObject> registerServiceConsumer(String address, T implementation) {
    if (!messageConsumers.containsKey(address)) {
        messageConsumers.put(address,
                ProxyHelper.registerService(Queryable.class, vertx, implementation, address));
    }
    consumerRegistrations.add(address);
    return messageConsumers.get(address);
}
/**
 * Unregisters every consumer that is still registered, removes all stored consumers and clears the
 * consumer registrations.
 */
public void close() {
    Iterator<MessageConsumer<JsonObject>> remaining = messageConsumers.values().iterator();
    while (remaining.hasNext()) {
        MessageConsumer<JsonObject> consumer = remaining.next();
        if (consumer.isRegistered()) {
            consumer.unregister();
        }
        // Removing through the values view drops the map entry as well.
        remaining.remove();
    }
    consumerRegistrations.clear();
}
/**
 * Creates a schema registration.
 *
 * @param discovery        the service discovery, passed to the superclass
 * @param options          the service discovery options, passed to the superclass
 * @param record           the service record, must not be {@code null}
 * @param schemaDefinition the schema definition, must not be {@code null}
 * @param serviceConsumer  the service consumer; stored as given, without a null check
 */
private SchemaRegistration(ServiceDiscovery discovery, ServiceDiscoveryOptions options, Record record,
                           SchemaDefinition schemaDefinition, MessageConsumer<JsonObject> serviceConsumer) {
    super(discovery, options);
    this.record = Objects.requireNonNull(record, "Service record cannot be null");
    this.schemaDefinition = Objects.requireNonNull(schemaDefinition, "Schema definition cannot be null");
    this.serviceConsumer = serviceConsumer;
}
/** * Publish a GraphQL schema for querying. * <p> * On success a {@link SchemaRegistration} is returned. It contains the message consumer of the * {@link Queryable} service proxy that supplies the published {@link SchemaDefinition}, the published service * discovery record, and the {@link ServiceDiscovery} it was published to. * <p> * Note that unless invoked from a {@link SchemaPublisher} a * client needs to keep hold of the returned {@link Record} as long as it is published. * * @param vertx the vert.x instance * @param discovery the service discovery instance * @param definition the service proxy instance exposing the graphql schema * @param resultHandler the result handler that returns the registration */ static void publish(Vertx vertx, ServiceDiscovery discovery, SchemaDefinition definition, Handler<AsyncResult<SchemaRegistration>> resultHandler) { Objects.requireNonNull(vertx, "Vertx cannot be null"); Objects.requireNonNull(discovery, "Service discovery cannot be null"); Objects.requireNonNull(definition, "GraphQL queryable cannot be null"); Objects.requireNonNull(resultHandler, "Publication result handler cannot be null"); // TODO Caching proxy ok? final MessageConsumer<JsonObject> serviceConsumer; if (definition.metadata().get("publisherId") == null) { serviceConsumer = ProxyHelper.registerService( Queryable.class, vertx, definition, definition.serviceAddress()); } else { // Publisher handles service instantiation, manages consumer. serviceConsumer = null; } Record record = new Record() .setType(SERVICE_TYPE) .setName(definition.schemaName()) .setMetadata(definition.metadata().toJson()) .setLocation(new JsonObject().put(Record.ENDPOINT, definition.serviceAddress())); discovery.publish(record, rh -> { if (rh.succeeded()) { resultHandler.handle(Future.succeededFuture( SchemaRegistration.create(discovery, null, rh.result(), definition, serviceConsumer))); } else { resultHandler.handle(Future.failedFuture(rh.cause())); } }); }
/**
 * Handle back-pressure on component.
 * <p>
 * Registers a {@code PressureHandler} on the "&lt;parentEndpoint&gt;.pressure" address. The callback
 * given to that handler invokes the end handler (when one is supplied) and then unregisters the
 * consumer.
 *
 * @param stream     stream in read.
 * @param endHandler end handler to call, may be {@code null}.
 */
public void handlePressure(ReadStream stream, Handler<Void> endHandler) {
    String pressureAddress = parentEndpoint + ".pressure";
    MessageConsumer<String> consumer = eventBus.consumer(pressureAddress);
    consumer.handler(new PressureHandler(stream, parentEndpoint, finished -> {
        if (endHandler != null) {
            endHandler.handle(null);
        }
        consumer.unregister();
    }));
}
@Override public void start(Future<Void> future) { super.start(); //---- // Initialize the trader String company = TraderUtils.pickACompany(); int numberOfShares = TraderUtils.pickANumber(); System.out.println("Java compulsive trader configured for company " + company + " and shares: " + numberOfShares); // We need to retrieve two services, create two futures object that will get the services Future<MessageConsumer<JsonObject>> marketFuture = Future.future(); Future<PortfolioService> portfolioFuture = Future.future(); // Retrieve the services, use the "special" completed to assign the future MessageSource.getConsumer(discovery, new JsonObject().put("name", "market-data"), marketFuture); EventBusService.getProxy(discovery, PortfolioService.class, portfolioFuture); // When done (both services retrieved), execute the handler CompositeFuture.all(marketFuture, portfolioFuture).setHandler(ar -> { if (ar.failed()) { future.fail("One of the required service cannot " + "be retrieved: " + ar.cause()); } else { // Our services: PortfolioService portfolio = portfolioFuture.result(); MessageConsumer<JsonObject> marketConsumer = marketFuture.result(); // Listen the market... marketConsumer.handler(message -> { JsonObject quote = message.body(); TraderUtils.dumbTradingLogic(company, numberOfShares, portfolio, quote); }); future.complete(); } }); // ---- }
/**
 * Publishes a message source record, retrieves the service through {@code DiscoveryService},
 * consumes at least one message and then checks that no further messages are collected once the
 * service is released.
 */
@Test
public void test() throws InterruptedException {
  Random random = new Random();
  // Continuously feed the "data" address.
  vertx.setPeriodic(10, tick -> vertx.eventBus().publish("data", random.nextDouble()));

  Record record = MessageSource.createRecord("Hello", "data");
  discovery.publish(record, published -> { });
  await().until(() -> record.getRegistration() != null);

  AtomicReference<Record> found = new AtomicReference<>();
  discovery.getRecord(new JsonObject().put("name", "Hello"), lookup -> found.set(lookup.result()));
  await().until(() -> found.get() != null);

  Service service = DiscoveryService.getService(vertx, found.get());
  MessageConsumer<Double> consumer = service.get();

  List<Double> data = new ArrayList<>();
  consumer.handler(message -> data.add(message.body()));
  await().until(() -> !data.isEmpty());

  service.release();
  int sizeAfterRelease = data.size();
  Thread.sleep(200);
  assertThat(data.size()).isEqualTo(sizeAfterRelease);
}
/**
 * Registers two message consumers for the given configuration: one on the mapped address and one on
 * the process specific address. Both are local consumers when the configuration says so, and both get
 * the same completion, end, exception and message handlers. The resulting registration is added to
 * {@code messageConsumerRegistrations}.
 *
 * @param <REQ>  the request message type
 * @param <RESP> the response message type
 * @param config the message consumer configuration
 * @return the registration holding both consumers and the configuration
 * @throws IllegalStateException if a registration already exists for the mapped address
 */
protected <REQ extends Message, RESP extends Message> MessageConsumerRegistration<REQ, RESP> registerMessageConsumer(@NonNull final MessageConsumerConfig<REQ, RESP> config) {
    final String address = config.getAddressMessageMapping().getAddress();
    Preconditions.checkState(!messageConsumerRegistrations.containsKey(address));

    final EventBus eventBus = vertx.eventBus();
    registerMessageCodecs(config);

    // Consumer on the mapped address. The completion/end handlers are set before the message handler.
    final MessageConsumer<REQ> consumer;
    if (config.isLocal()) {
        consumer = eventBus.localConsumer(address);
    } else {
        consumer = eventBus.consumer(address);
    }
    consumer.completionHandler(config.getCompletionHandler()
            .map(custom -> messageConsumerCompletionHandler(address, Optional.of(custom), config))
            .orElseGet(() -> messageConsumerCompletionHandler(address, Optional.empty(), config)));
    consumer.endHandler(config.getEndHandler()
            .map(custom -> messageConsumerEndHandler(address, Optional.of(custom), config))
            .orElseGet(() -> messageConsumerEndHandler(address, Optional.empty(), config)));
    config.getExceptionHandler().ifPresent(consumer::exceptionHandler);
    consumer.handler(messageConsumerHandler(config));

    // Consumer on the process specific address, configured the same way.
    final String processSpecificAddress = config.getAddressMessageMapping().getProcessSpecificAddress();
    final MessageConsumer<REQ> processSpecificConsumer;
    if (config.isLocal()) {
        processSpecificConsumer = eventBus.localConsumer(processSpecificAddress);
    } else {
        processSpecificConsumer = eventBus.consumer(processSpecificAddress);
    }
    processSpecificConsumer.completionHandler(config.getCompletionHandler()
            .map(custom -> messageConsumerCompletionHandler(processSpecificAddress, Optional.of(custom), config))
            .orElseGet(() -> messageConsumerCompletionHandler(processSpecificAddress, Optional.empty(), config)));
    processSpecificConsumer.endHandler(config.getEndHandler()
            .map(custom -> messageConsumerEndHandler(processSpecificAddress, Optional.of(custom), config))
            .orElseGet(() -> messageConsumerEndHandler(processSpecificAddress, Optional.empty(), config)));
    config.getExceptionHandler().ifPresent(processSpecificConsumer::exceptionHandler);
    processSpecificConsumer.handler(messageConsumerHandler(config));

    final MessageConsumerRegistration<REQ, RESP> registration = MessageConsumerRegistration.<REQ, RESP>builder()
            .messageConsumer(consumer)
            .processSpecificMessageConsumer(processSpecificConsumer)
            .config(config)
            .build();

    // The registrations map is immutable; replace it with a copy that contains the new entry.
    messageConsumerRegistrations = ImmutableMap.<String, MessageConsumerRegistration<?, ?>>builder()
            .putAll(messageConsumerRegistrations)
            .put(config.address(), registration)
            .build();
    return registration;
}
/**
 * Closes all clients, unregisters all consumers, then closes the server and Vert.x, waiting for each
 * close to succeed.
 */
@After
public void tearDown() {
  for (StompClient client : clients) {
    client.close();
  }
  clients.clear();

  for (MessageConsumer<?> consumer : consumers) {
    consumer.unregister();
  }
  consumers.clear();

  AsyncLock<Void> serverClosed = new AsyncLock<>();
  server.close(serverClosed.handler());
  serverClosed.waitForSuccess();

  AsyncLock<Void> vertxClosed = new AsyncLock<>();
  vertx.close(vertxClosed.handler());
  vertxClosed.waitForSuccess();
}