protected void processOneResult(final JsonObject dataChange) { final JsonObject data = dataChange.getJsonObject("data"); final JsonObject payload = data.getJsonObject("payload"); // We send it off to the eventbus and in any case have the // final destination header set - just in case final EventBus eb = this.getVertx().eventBus(); final DeliveryOptions opts = new DeliveryOptions(); this.getListenerConfig().getEventBusAddresses().forEach(destination -> { opts.addHeader(Constants.BUS_FINAL_DESTINATION, destination); }); // Intermediate step for deduplication of messages if (this.useDedupService()) { eb.publish(this.getListenerConfig().getEventBusDedupAddress(), payload, opts); } else { this.getListenerConfig().getEventBusAddresses().forEach(destination -> { try { eb.publish(destination, payload, opts); this.logger.info("Sending to [" + destination + "]:" + payload.toString()); } catch (final Throwable t) { this.logger.error(t.getMessage(), t); } }); } }
/**
 * Builds the message handlers for the user entity flow and registers each one
 * as an event bus consumer on its own address.
 *
 * @param module the module system used to look up the event bus and the handler builder
 * @return the same {@code module} instance that was passed in
 */
private static ModuleSystem createMessageHandlers(ModuleSystem module) {
    EventBus bus = module.require(EventBus.class);
    MessageHandlersBuilder handlersBuilder = module.require(MessageHandlersBuilder.class);

    handlersBuilder
        .build(
            MessageHandlersBuilder.BuildParams.builder()
                .module(module)
                .flowParamss(ImmutableList.of(
                    new MessageHandlersBuilder.FlowParams(
                        Entities.USER_ENTITY,
                        new FieldExpressionImpl("r.id")
                    )
                ))
                .build()
        )
        .forEach(entry -> bus.consumer(entry.getAddress(), entry.getMessageHandler()));

    return module;
}
/**
 * Provides the authentication info, requesting it over the event bus when needed.
 *
 * <p>A request is only sent when no {@code authInfo} is cached yet and the
 * consumer configuration names an auth provider. In every other case the
 * current {@code authInfo} value (possibly {@code null}) is returned right away.
 *
 * @return a future that completes with the AuthInfo, or fails with the cause
 *         of the failed event bus request
 */
private Future<AuthInfo> getAuthInfo() {
    final String authName = this.getConsumerConfig().getAuthName();

    if ((this.authInfo != null) || (authName == null)) {
        return Future.succeededFuture(this.authInfo);
    }

    final Future<AuthInfo> pending = Future.future();
    final EventBus bus = this.getVertx().eventBus();
    bus.send(Constants.BUS_AUTHREQUEST + authName, null, reply -> {
        if (reply.failed()) {
            pending.fail(reply.cause());
            return;
        }
        // Cache the answer so later calls can skip the request
        this.authInfo = (AuthInfo) reply.result().body();
        pending.complete(this.authInfo);
    });
    return pending;
}
/** * @see net.wissel.salesforce.vertx.listener.CometD#processOneResult(io.vertx.core.json.JsonObject) */ @Override protected void processOneResult(JsonObject dataChange) { final JsonObject data = dataChange.getJsonObject("data"); final JsonObject payload = data.getJsonObject("payload"); final String objectType = payload.getString("ObjectType__c"); // We send it off to the eventbus final EventBus eb = this.getVertx().eventBus(); this.getListenerConfig().getEventBusAddresses().forEach(destination -> { try { eb.publish(destination+objectType, payload); this.logger.info("Sending to [" + destination+objectType + "]:" + payload.toString()); } catch (final Throwable t) { this.logger.error(t.getMessage(), t); } }); }
/**
 * Provides the authentication info, requesting it over the event bus on first use.
 *
 * <p>When {@code authInfo} is already cached it is returned immediately.
 * Otherwise a request is sent to the address built from
 * {@code Constants.BUS_AUTHREQUEST} and the listener's auth name, and the
 * reply body is cached.
 *
 * @return a future that completes with the AuthInfo, or fails with the cause
 *         of the failed event bus request
 */
private Future<AuthInfo> getAuthInfo() {
    if (this.authInfo != null) {
        return Future.succeededFuture(this.authInfo);
    }

    final Future<AuthInfo> pending = Future.future();
    final EventBus bus = this.getVertx().eventBus();
    final String address = Constants.BUS_AUTHREQUEST + this.getListenerConfig().getAuthName();
    bus.send(address, null, reply -> {
        if (reply.failed()) {
            pending.fail(reply.cause());
            return;
        }
        this.authInfo = (AuthInfo) reply.result().body();
        pending.complete(this.authInfo);
    });
    return pending;
}
@Override public void start() { final EventBus bus = this.vertx.eventBus(); bus.<JsonObject>consumer(ID.Addr.REGISTRY_START, result -> { final JsonObject data = result.body(); final String name = data.getString(Registry.NAME); final HttpServerOptions options = new HttpServerOptions(data.getJsonObject(Registry.OPTIONS)); final String[] uris = data.getString(Registry.URIS).split(Strings.COMMA); final Set<String> uriData = new TreeSet<>(Arrays.asList(uris)); // Write the data to registry. this.registry.registryHttp(name, options, Etat.RUNNING); this.registry.registryRoute(name, options, uriData); LOGGER.info(Info.MICRO_REGISTRY_CONSUME, getClass().getSimpleName(), name, ID.Addr.REGISTRY_START); }); }
@Override public Handler<RoutingContext> attack(final Event event) { return Fn.get(() -> (context) -> Responser.exec(() -> { // 1. Build Envelop final Envelop request = this.invoke(context, event); // 2. Build event bus final Vertx vertx = context.vertx(); final EventBus bus = vertx.eventBus(); // 3. Send message final String address = this.address(event); bus.<Envelop>send(address, request, handler -> { final Envelop response; if (handler.succeeded()) { // Request - Response message response = this.success(address, handler); } else { response = this.failure(address, handler); } Answer.reply(context, response, event); }); }, context, event), event); }
private void registerSynonyms() { EventBus eventBus = vertx.eventBus(); eventBus.consumer(WordnetAddresses.SYNONYMS.getAddress(), (Handler<Message<String>>) message -> { String body = message.body(); IDictionary dictionary = dictionaryCache.getDictionary(); IIndexWord idxWord = dictionary.getIndexWord(body, POS.NOUN); IWordID wordID = idxWord.getWordIDs().get(0); // 1st meaning IWord word = dictionary.getWord(wordID); ISynset synset = word.getSynset(); List<String> synonyms = synset.getWords().stream().map(IWord::getLemma).collect(Collectors.toList()); message.reply(new JsonArray(synonyms)); }); }
/**
 * Exercise skeleton: obtains the event bus and schedules a periodic task
 * whose body is still to be implemented (see the TODO steps below).
 */
@Override
public void start() throws Exception {
    // Retrieve the event bus
    EventBus eventBus = vertx.eventBus();

    // Execute the given handler every 2000 ms
    vertx.setPeriodic(2000, l -> {
        // Use the eventBus() method to retrieve the event bus and send a {"message":"hello"} JSON message on the
        // "greetings" address.

        // 1 - Create the JSON object using the JsonObject class, and `put` the 'message':'hello' entry
        // TODO

        // 2 - Use the `send` method of the event bus to _send_ the message. Messages sent with the `send` method
        // are received by a single consumer. Messages sent with the `publish` method are received by all
        // registered consumers.
        // TODO
    });
}
/**
 * Creates the application: exports the tracker components into a new module
 * system, builds it, and wires up the message handlers.
 *
 * @param config the application configuration; must not be {@code null}
 * @throws NullPointerException if {@code config} is {@code null}
 */
public AppImpl(Config config) {
    Objects.requireNonNull(config);

    final ModuleSystemBuilder moduleBuilder = ModuleSystem.builder();
    TrackerExporter.exportTo(
        TrackerExporter.ExportToParams.builder()
            .builder(moduleBuilder)
            .config(config)
            .build()
    );

    this.module = moduleBuilder.build();
    this.eventBus = this.module.require(EventBus.class);

    createMessageHandlers(this.module);

    // The result is not used here; the lookup itself is kept as in the original
    this.module.require(MessageBus.class);
}
/**
 * Registers a consumer that resolves a single configuration key.
 *
 * <p>The message body is the key (String) and the reply is its value (String).
 * A missing key in the request is answered with a BAD_REQUEST failure; a key
 * without a configured value is answered with a NOT_FOUND failure.
 *
 * @param address the event bus address to register.
 * @param eventBus the event bus.
 * @return the consumer registered.
 */
public static MessageConsumer<String> registerSingleConfigEntryProvider(String address, EventBus eventBus){
    MessageConsumer<String> consumer = eventBus.consumer(address);
    consumer.handler(request -> {
        String key = request.body();
        if (key == null) {
            request.fail(HttpResponseStatus.BAD_REQUEST.code(), "Missing config key.");
            return;
        }
        String value = ConfigurationProvider.getConfiguration().getOrDefault(key, null);
        if (value == null) {
            request.fail(HttpResponseStatus.NOT_FOUND.code(), "Config key not found: " + key);
            return;
        }
        request.reply(value);
    });
    return consumer;
}
/**
 * Registers a consumer that resolves several configuration sections at once.
 *
 * <p>The message body is a JSON-encoded {@code String[]} of section names; the
 * reply is a JSON-encoded {@code Map<String,String>} holding the properties of
 * all requested sections. {@code null} entries in the array are skipped. When
 * the body itself is {@code null}, all configuration properties are returned.
 *
 * @param address the event bus address to register.
 * @param eventBus the event bus.
 * @return the consumer registered.
 */
public static MessageConsumer<String> registerMultiConfigEntryProvider(String address, EventBus eventBus){
    MessageConsumer<String> consumer = eventBus.consumer(address);
    consumer.handler(request -> {
        String encodedSections = request.body();
        Configuration config = ConfigurationProvider.getConfiguration();
        Map<String,String> entries = new TreeMap<>();
        if (encodedSections == null) {
            entries.putAll(config.getProperties());
        } else {
            for (String section : Json.decodeValue(encodedSections, String[].class)) {
                if (section == null) {
                    continue;
                }
                entries.putAll(config.with(ConfigurationFunctions.section(section)).getProperties());
            }
        }
        request.reply(Json.encode(entries));
    });
    return consumer;
}
/**
 * Configures a route for one operation: declares its consumed and produced
 * content types (when present) and installs a handler that forwards the
 * request parameters over the event bus.
 *
 * <p>The handler collects every declared parameter into a JSON object, sends
 * it to the address named by the operation id and ends the HTTP response with
 * the encoded reply body. A failed reply ends the response through
 * {@code internalServerErrorEnd}; a {@link RuntimeException} raised while
 * building or sending the message ends it through {@code badRequestEnd}.
 *
 * @param route the route to configure
 * @param operation the operation description the route is derived from
 * @param eventBus the event bus used to dispatch the operation
 */
private static void configureRoute(Route route, Operation operation, EventBus eventBus) {
    Optional.ofNullable(operation.getConsumes())
        .ifPresent(mimeTypes -> mimeTypes.forEach(route::consumes));
    Optional.ofNullable(operation.getProduces())
        .ifPresent(mimeTypes -> mimeTypes.forEach(route::produces));

    route.handler(context -> {
        try {
            JsonObject message = new JsonObject();
            operation.getParameters().forEach(parameter -> {
                String name = parameter.getName();
                message.put(name, PARAMETER_EXTRACTORS.get(parameter.getIn())
                    .extract(name, parameter.getRequired(), context.request()));
            });
            eventBus.<JsonObject>send(operation.getOperationId(), message, reply -> {
                if (reply.failed()) {
                    internalServerErrorEnd(context.response());
                    return;
                }
                context.response().end(reply.result().body().encode());
            });
        } catch (RuntimeException e) {
            badRequestEnd(context.response());
        }
    });
}
@Override public void initialize(Vertx vertx, JsonObject config, Future<Void> complete) { //eventbus consumer EventBus eb = vertx.eventBus(); Lists.newArrayList(ServiceLoader.load(ApiCmdFactory.class)) .stream() .map(f -> f.create(vertx, config)) .forEach(cmd -> { String address = cmdAddress(cmd.cmd()); Log.create(LOGGER) .setEvent("api.cmd.register") .addData("address", address) .addData("cmd", cmd.cmd()) .info(); eb.<JsonObject>consumer(address, msg -> { try { consumer(cmd, msg); } catch (Exception e) { EventbusUtils.onFailure(msg, 0, e); } }); }); complete.complete(); }
/**
 * Starts the Gremlin server and client, then registers an event bus consumer
 * that executes incoming Gremlin scripts.
 *
 * <p>Each message body is expected to be a JSON-encoded {@code GremlinMessage}.
 * The script result is JSON-encoded and sent back as the reply. If the body
 * cannot be decoded or the script fails, the error is logged and the message
 * is failed, so the sender gets an answer instead of waiting for a timeout.
 *
 * @throws Exception if the server or client cannot be started
 */
@Override
public void start() throws Exception {
    JsonObject config = config();
    String gremlinServerConfigPath = config.getString("gremlinServerConfigPath");
    Server.single.init(gremlinServerConfigPath).start();
    client = Client.single.start();
    eventBusAddr = config.getString("eventBusAddress");

    EventBus eventBus = vertx.eventBus();
    eventBus.consumer(eventBusAddr, event -> {
        String body = (String) event.body();
        logger.info("received from the eventbus of {} : {}", eventBusAddr, body);

        final GremlinMessage gremlinMessage;
        try {
            gremlinMessage = Json.decodeValue(body, GremlinMessage.class);
        } catch (Exception e) {
            logger.error("can't decode gremlin message because: " + e.getMessage(), e);
            // 400: the request itself is malformed
            event.fail(400, "can't decode gremlin message: " + e.getMessage());
            return;
        }

        try {
            List<Object> submit =
                client.submit(gremlinMessage.getGremlinScript(), null, gremlinMessage.getParambindings());
            event.reply(Json.encode(submit));
        } catch (Exception e) {
            logger.error("can't execute script : " + gremlinMessage.getGremlinScript()
                + " because: " + e.getMessage(), e);
            // 500: the script could not be executed
            event.fail(500, "can't execute script: " + e.getMessage());
        }
    });
}
/**
 * Sets up the fixture: a mocked Vert.x instance exposing a mocked event bus
 * and file system, and a file based registration service initialised with it.
 */
@Before
public void setUp() {
    // Collaborators
    fileSystem = mock(FileSystem.class);
    final Context context = mock(Context.class);
    eventBus = mock(EventBus.class);
    vertx = mock(Vertx.class);
    when(vertx.eventBus()).thenReturn(eventBus);
    when(vertx.fileSystem()).thenReturn(fileSystem);

    // Configuration
    props = new FileBasedRegistrationConfigProperties();
    props.setFilename(FILE_NAME);

    // Service under test
    registrationService = new FileBasedRegistrationService();
    registrationService.setConfig(props);
    registrationService.init(vertx, context);
}
@Test public void testRegisterMessageCodecNotFoundAbortOnFailure() { config.put("messageCodecs", new JsonArray("[\"com.groupon.vertx.utils.MainVerticleTest$NonExistentCodec\"]")); final EventBus eventBus = Mockito.mock(EventBus.class); Mockito.doReturn(eventBus).when(vertx).eventBus(); try { MainVerticle.registerMessageCodecs(vertx, config, true); fail("Expected exception not thrown"); } catch (final MainVerticle.CodecRegistrationException e) { // Expected exception } Mockito.verify(eventBus, Mockito.never()).registerCodec(Mockito.any(MyMessageCodec.class)); latch.countDown(); }
/**
 * Creates a producer for the given address that collects metrics on the
 * messages it sends and publishes.
 *
 * <h3>Meters</h3>
 * Two meters are created, named {@code <metricName>::<address>}:
 * <ol>
 * <li>{@code MESSAGE_SENT}
 * <li>{@code MESSAGE_PUBLISHED}
 * </ol>
 *
 * <h3>Gauges</h3>
 * Two gauges are registered, named {@code <metricName>::<address>}. Each
 * reports its timestamp formatted with {@link DateTimeFormatter#ISO_INSTANT},
 * or {@code null} while no timestamp has been recorded:
 * <ol>
 * <li>{@code MESSAGE_LAST_SENT_TS}
 * <li>{@code MESSAGE_LAST_PUBLISHED_TS}
 * </ol>
 *
 * @param eventBus the event bus the producer works with
 * @param address the target address; must not be blank
 * @param codec the message codec, passed to {@code registerMessageCodec}
 * @param metricRegistry the registry that receives the meters and gauges described above
 */
public ProtobufMessageProducer(
        @NonNull final EventBus eventBus,
        final String address,
        @NonNull final ProtobufMessageCodec<A> codec,
        @NonNull final MetricRegistry metricRegistry) {
    checkArgument(isNotBlank(address));
    this.eventBus = eventBus;
    this.address = address;
    registerMessageCodec(codec);

    this.messageSent =
        metricRegistry.meter(String.format("%s::%s", MESSAGE_SENT.metricName, address));
    this.messagePublished =
        metricRegistry.meter(String.format("%s::%s", MESSAGE_PUBLISHED.metricName, address));

    metricRegistry.register(
        String.format("%s::%s", MESSAGE_LAST_SENT_TS.metricName, address),
        (Gauge<String>) () -> messageLastSent != null
            ? DateTimeFormatter.ISO_INSTANT.format(messageLastSent)
            : null);

    metricRegistry.register(
        String.format("%s::%s", MESSAGE_LAST_PUBLISHED_TS.metricName, address),
        (Gauge<String>) () -> messageLastPublished != null
            ? DateTimeFormatter.ISO_INSTANT.format(messageLastPublished)
            : null);
}
/**
 * Registers the event bus consumers that map player commands to the MPD client.
 *
 * <p>{@code GET_CURRENT_TRACK} replies with the current song as JSON, or
 * {@code null} when there is none. The remaining commands only trigger the
 * corresponding client call and send no reply. A failure of any client call
 * is passed to {@code logException} with a command specific message.
 *
 * @param eventBus the event bus to register the consumers on
 * @param mpdClient the client that executes the commands
 */
private static void initializeConsumers(final EventBus eventBus, final MpdClient mpdClient) {
    eventBus.consumer(GET_CURRENT_TRACK).handler(message -> {
        mpdClient.getCurrent()
            .thenAccept(track -> message.reply(track.map(Song::toJson).orElse(null)))
            .exceptionally(logException("Could not get current track"));
    });

    eventBus.consumer(PLAY).handler(message -> {
        mpdClient.play().exceptionally(logException("Could not play"));
    });

    eventBus.consumer(STOP).handler(message -> {
        mpdClient.stop().exceptionally(logException("Could not stop"));
    });

    eventBus.consumer(PAUSE).handler(message -> {
        mpdClient.pause().exceptionally(logException("Could not pause"));
    });

    eventBus.consumer(PREV).handler(message -> {
        mpdClient.previous().exceptionally(logException("Could not go to previous"));
    });

    eventBus.consumer(NEXT).handler(message -> {
        mpdClient.next().exceptionally(logException("Could not go to next"));
    });
}
/**
 * Invokes a handler method reflectively, resolving its arguments by parameter type.
 *
 * <p>Supported parameter types are {@code BridgeEvent} (the given event),
 * {@code EventBus} and {@code Vertx} (both taken from the configuration).
 * Parameters of any other type contribute no argument. If the invocation
 * throws, the error is logged and the bridge event is failed unless it has
 * already been completed.
 *
 * @param instance the object to invoke the method on
 * @param method the handler method
 * @param be the bridge event being handled
 */
private void tryToInvoke(Object instance, Method method, BridgeEvent be) {
    final List<Object> arguments = new ArrayList<>();
    for (final Class<?> type : method.getParameterTypes()) {
        final Vertx vertx = config.getVertx();
        if (BridgeEvent.class.equals(type)) {
            arguments.add(be);
        } else if (EventBus.class.equals(type)) {
            arguments.add(vertx.eventBus());
        } else if (Vertx.class.equals(type)) {
            arguments.add(vertx);
        }
    }

    try {
        method.invoke(instance, arguments.toArray());
    } catch (Exception e) {
        LOG.error("Error while handling websocket", e);
        final boolean alreadyCompleted = be.failed() || be.succeeded();
        if (!alreadyCompleted) {
            be.fail(e);
        }
    }
}
/**
 * Resolves the arguments for a socket handler method by parameter type.
 *
 * <p>Supported parameter types are {@code SockJSSocket} (the given socket),
 * {@code Buffer} or a subtype of it (the given message), {@code EventBus}
 * and {@code Vertx} (both taken from the configuration). Parameters of any
 * other type contribute no argument.
 *
 * @param method the handler method whose parameters are inspected
 * @param socket the socket the message arrived on
 * @param msg the received message
 * @return the resolved arguments in parameter order
 */
private Object[] getParamValues(Method method, SockJSSocket socket, Buffer msg) {
    final Vertx vertx = config.getVertx();
    final List<Object> arguments = new ArrayList<>();
    for (final Class<?> type : method.getParameterTypes()) {
        if (SockJSSocket.class.equals(type)) {
            arguments.add(socket);
        } else if (Buffer.class.isAssignableFrom(type)) {
            arguments.add(msg);
        } else if (EventBus.class.equals(type)) {
            arguments.add(vertx.eventBus());
        } else if (Vertx.class.equals(type)) {
            arguments.add(vertx);
        }
    }
    return arguments.toArray();
}
/** * Register a service route * * @param message the eventbus message for registering the service */ private void serviceRegisterHandler(Message<ServiceInfo> message) { final ServiceInfo info = message.body(); if(info.getPort()<=0) { final EventBus eventBus = vertx.eventBus(); Stream.of(info.getOperations()).forEach(operation -> { final String type = operation.getType(); // TODO use "better" key than a simple relative url final String url = operation.getUrl(); final String[] mimes = operation.getProduces() != null ? operation.getProduces() : new String[]{""}; // TODO specify timeout in service info object, so that every Service can specify his own timeout // defaultServiceTimeout = operation.getInteger("timeout"); if (!registeredRoutes.contains(url)) { registeredRoutes.add(url); handleServiceType(eventBus, type, url, mimes); } } ); } }
/**
 * Dispatches a route registration according to the service type.
 *
 * <p>Only {@code REST_GET} and {@code REST_POST} lead to a registration;
 * every other type is accepted without any action.
 *
 * @param eventBus the event bus handed on to the REST handler
 * @param type the name of a {@code Type} constant
 * @param url the route url
 * @param mimes the mime types produced by the route
 * @throws IllegalArgumentException if {@code type} does not name a {@code Type} constant
 */
private void handleServiceType(EventBus eventBus, String type, String url, String[] mimes) {
    final Type serviceType = Type.valueOf(type);
    if (serviceType == Type.REST_GET) {
        restHandler.handleRESTGetRegistration(eventBus, url, mimes);
    } else if (serviceType == Type.REST_POST) {
        restHandler.handleRESTPostRegistration(eventBus, url, mimes);
    }
    // EVENTBUS, WEBSOCKET and any other type: nothing to register here
}
/**
 * Registers a POST route for the given url. Once a request has been received
 * completely, its form attributes are handed to {@code handleRestRequest}.
 *
 * @param eventBus the event bus handed on to {@code handleRestRequest}
 * @param url the route url
 * @param mimes the mime types of the route
 */
public void handleRESTPostRegistration(final EventBus eventBus, final String url, final String[] mimes) {
    routeMatcher.matchMethod(HttpMethod.POST, url, request -> {
        // Form attributes are only available when multipart handling is enabled
        request.setExpectMultipart(true);
        request.endHandler(new VoidHandler() {
            @Override
            public void handle() {
                final MultiMap formAttributes = request.formAttributes();
                handleRestRequest(
                    eventBus,
                    request,
                    url,
                    getParameterEntity(formAttributes),
                    Arrays.asList(mimes),
                    defaultServiceTimeout);
            }
        });
    });
}
private void sendToWSService(final ServerWebSocket serverSocket, final String path, final WSEndpoint endpoint) { final EventBus eventBus = vertx.eventBus(); serverSocket.handler(handler -> { try { log("send WS:+ " + endpoint.getUrl()); eventBus.send(path, Serializer.serialize(new WSDataWrapper(endpoint, handler.getBytes())), new DeliveryOptions().setSendTimeout(GlobalKeyHolder.DEFAULT_SERVICE_TIMEOUT)); } catch (IOException e) { e.printStackTrace(); } } ); serverSocket.resume(); //TODO set close handler!! }
/**
 * Wires a duplex stream onto the event bus and hands it to the given handler.
 *
 * <p>The input consumer and the output, complete and close publishers are
 * created from the addresses in the configuration. A message is written to
 * the close publisher when the duplex stream closes and to the complete
 * publisher when a write completes. The stream handler is invoked on the
 * Vert.x context; the startup handler is called with a succeeded result.
 *
 * @param startupHandler called with a succeeded result once setup is done
 * @param duplexStreamHandler receives the duplex stream
 */
@Override
public void run(AsyncResultHandler<Void> startupHandler, Handler<DuplexStream<Buffer, Buffer>> duplexStreamHandler) {
    final EventBus bus = vertx.eventBus();

    inputConsumer = bus.consumer(configObj.getString("input.address"));
    outputPublisher = bus.publisher(configObj.getString("output.address"));
    completePublisher = bus.publisher(configObj.getString("complete.address"));
    closePublisher = bus.publisher(configObj.getString("close.address"));

    final EventBusClosableWriteStream writeSide =
        new EventBusClosableWriteStream(inputConsumer, outputPublisher);
    final DuplexStream<Buffer, Buffer> duplexStream =
        new DuplexStream<>(inputConsumer.bodyStream(), writeSide);

    duplexStream
        .closeHandler(v -> {
            closePublisher.write(Buffer.buffer("Closed"));
        })
        .writeCompleteHandler(pack -> {
            completePublisher.write(Buffer.buffer("Ok"));
        });

    vertx.runOnContext(v -> duplexStreamHandler.handle(duplexStream));

    startupHandler.handle(Future.succeededFuture());
}
/**
 * Register the proxy handle on the event bus.
 *
 * <p>Each interceptor wraps the handler built so far, so the last interceptor
 * in the list is the first one to see an incoming message. A message is only
 * passed on when the interceptor's future succeeds. When it fails with a
 * {@link ReplyException}, the message is failed with that exception's failure
 * code and message; any other failure cause is reported with failure code
 * {@code 500} and the cause's message.
 *
 * @param eventBus the event bus
 * @param address the proxy address
 * @param interceptors the interceptors to run before the proxy handler; may be {@code null}
 * @return the registered consumer
 */
public MessageConsumer<JsonObject> register(EventBus eventBus, String address, List<Function<Message<JsonObject>, Future<Message<JsonObject>>>> interceptors) {
    Handler<Message<JsonObject>> handler = this::handle;
    if (interceptors != null) {
        for (Function<Message<JsonObject>, Future<Message<JsonObject>>> interceptor : interceptors) {
            Handler<Message<JsonObject>> prev = handler;
            handler = msg -> {
                Future<Message<JsonObject>> fut = interceptor.apply(msg);
                fut.setHandler(ar -> {
                    if (ar.succeeded()) {
                        prev.handle(msg);
                        return;
                    }
                    Throwable cause = ar.cause();
                    if (cause instanceof ReplyException) {
                        ReplyException exception = (ReplyException) cause;
                        msg.fail(exception.failureCode(), exception.getMessage());
                    } else {
                        // An unconditional cast would throw here and leave the sender without a reply
                        msg.fail(500, cause != null ? cause.getMessage() : null);
                    }
                });
            };
        }
    }
    consumer = eventBus.consumer(address, handler);
    return consumer;
}
@Override public SFDCVerticle startListening() { this.logger.info("Start listening:" + this.getClass().getName()); // Listen on the event bus final EventBus eb = this.vertx.eventBus(); this.consumer = eb.consumer(this.getConsumerConfig().getEventBusAddress()); this.logger.info(this.getClass().getName() + " listening on " + this.consumer.address()); this.consumer.handler(this::processIncoming); // Done this.listening = true; return this; }
@Override public SFDCVerticle startListening() { this.logger.info("Start listening:" + this.getClass().getName()); // Listen on the event bus final EventBus eb = this.vertx.eventBus(); this.dedupConsumer = eb.consumer(Constants.BUS_DEDUP_PREFIX + this.getDedupConfig().getInstanceName()); this.logger.info(this.getClass().getName() + " listening on " + this.dedupConsumer.address()); this.dedupConsumer.handler(this::processIncoming); // Done this.listening = true; return this; }
/** * Check incoming messages for final destinations and then forward them if they are not * duplicates * * @param incomingData received from event bus */ protected void processIncoming(final Message<JsonObject> incomingData) { final MultiMap headers = incomingData.headers(); final List<String> finalDestination = headers.getAll(Constants.BUS_FINAL_DESTINATION); if ((finalDestination != null) && !finalDestination.isEmpty()) { final JsonObject j = incomingData.body(); // Forwarding the original headers final DeliveryOptions dOpts = new DeliveryOptions(); dOpts.setHeaders(headers); final Future<Void> duplicate = Future.future(f -> { if (f.succeeded()) { // Forwarding it to where it should go final EventBus eb = this.getVertx().eventBus(); finalDestination.forEach(d -> { eb.send(d, j, dOpts); }); } else { this.logger.info("Dropped duplicate Object:" + String.valueOf(j)); } }); // The duplicate check happens here! this.checkForDuplicate(duplicate, j); } else { this.logger.fatal(new Exception("Incoming message without final destination" + incomingData.toString())); } }
@Override public void start() { // 1. Get event bus final EventBus bus = this.vertx.eventBus(); // 2. Consume address for (final Receipt receipt : RECEIPTS) { // 3. Deploy for each type final String address = receipt.getAddress(); // 4. Get target reference and method final Object reference = receipt.getProxy(); final Method method = receipt.getMethod(); // 5. Verify verify(method); try { Fn.safeNull(() -> bus.<Envelop>consumer(address, message -> { if (isVoid(method)) { // void Message<Envelop> Instance.invoke(reference, method.getName(), message); } else { // Envelop ( Envelop ) syncReply(message, reference, method.getName()); } }), address, reference, method); } catch (final Throwable ex) { ex.printStackTrace(); } } }
@Override public void start() { final EventBus bus = this.vertx.eventBus(); bus.<JsonObject>consumer(ID.Addr.IPC_START, result -> { final JsonObject data = result.body(); final ServidorOptions options = new ServidorOptions(data); // Write the data to registry. this.registry.registryRpc(options, Etat.RUNNING); this.registry.registryIpcs(options, Tunnel.IPCS.keySet()); LOGGER.info(Info.MICRO_REGISTRY_CONSUME, getClass().getSimpleName(), options.getName(), ID.Addr.IPC_START); }); }
private void startRegistry(final ServidorOptions options) { // Rpc Agent is only valid in Micro mode final EventBus bus = this.vertx.eventBus(); final String address = ID.Addr.IPC_START; LOGGER.info(Info.IPC_REGISTRY_SEND, getClass().getSimpleName(), options.getName(), address); bus.publish(address, options.toJson()); }
private void startRegistry(final String name, final HttpServerOptions options, final Set<String> tree) { // Enabled micro mode. if (EtcdData.enabled()) { final JsonObject data = getMessage(name, options, tree); // Send Data to Event Bus final EventBus bus = this.vertx.eventBus(); final String address = ID.Addr.REGISTRY_START; LOGGER.info(Info.MICRO_REGISTRY_SEND, getClass().getSimpleName(), name, address); bus.publish(address, data); } }
@Override public Handler<RoutingContext> attack(final Event event) { return Fn.get(() -> (context) -> Responser.exec(() -> { // 1. Build Arguments final Object[] arguments = buildArgs(context, event); // 2. Method callxx final Object returnValue = invoke(event, arguments); final Envelop request = Flower.continuous(context, returnValue); // 3. Build event bus final Vertx vertx = context.vertx(); final EventBus bus = vertx.eventBus(); // 4. Send message final String address = address(event); bus.<Envelop>send(address, request, handler -> { final Envelop response; if (handler.succeeded()) { // One Way message response = Envelop.ok(); } else { response = failure(address, handler); } Answer.reply(context, response, event); }); }, context, event), event); }
static void scheduling(Vertx vertx) { EventBus eventBus = vertx.eventBus(); // Consumer of the timer events MessageConsumer<JsonObject> consumer = eventBus.consumer("scheduler:timer"); // Listens and prints timer events. When timer completes stops the Vertx consumer.handler ( message -> { JsonObject event = message.body(); if (event.getString("event").equals("complete")) { System.out.println("completed"); vertx.close(); } else { System.out.println(event); } } ); // Create new timer eventBus.send ( "chime", (new JsonObject()).put("operation", "create").put("name", "scheduler:timer") .put("publish", false).put("max count", 3) .put("description", (new JsonObject()).put("type", "interval").put("delay", 1)), ar -> { if (ar.succeeded()) { System.out.println("Scheduling started: " + ar.result().body()); } else { System.out.println("Message failed: " + ar.cause()); vertx.close(); } } ); }
/**
 * Creates the embedded metrics server when it is enabled in the options.
 *
 * @param bus the initialized event bus (not used by this implementation)
 */
@Override
public void eventBusInitialized(@NotNull EventBus bus) {
    if (!options.isEmbeddedServerEnabled()) {
        return;
    }
    server = MetricsServer
        .create(vertx)
        .apply(options.getRegistry(), options.getFormat())
        .apply(options.getAddress());
}
/**
 * Registers the event bus consumer that finds person names in a token array.
 *
 * <p>For every span reported by the name finder a {@code PersonName} is
 * created from a display name, the tokens covered by the span and the span's
 * probability. The list of names is sent back as the reply.
 */
private void registerNameFinder() {
    EventBus eventBus = vertx.eventBus();
    eventBus.consumer(NlpAddresses.PERSONNAME.getAddress(), (Handler<Message<String[]>>) message -> {
        String[] tokens = message.body();
        Span[] spans = nameFinder.find(tokens);
        double[] probabilities = nameFinder.probs(spans);

        List<PersonName> names = new ArrayList<>();
        for (int i = 0; i < spans.length; i++) {
            int first = spans[i].getStart();
            int last = spans[i].getEnd() - 1; // the span end is exclusive
            // NOTE(review): for spans of three or more tokens only the first
            // and the last token end up in the display name - confirm that
            // dropping the middle tokens is intended.
            String name = (first == last)
                ? tokens[first]
                : tokens[first] + " " + tokens[last];
            String[] nameTokens = Arrays.copyOfRange(tokens, first, last + 1);
            names.add(new PersonName(name, nameTokens, probabilities[i]));
        }
        message.reply(names);
    });
}
/**
 * Exercise skeleton: obtains the event bus; registering the replying consumer
 * is still to be implemented (see the TODO below).
 */
@Override
public void start() throws Exception {
    EventBus eventBus = vertx.eventBus();

    // Register a consumer and call the `reply` method with a JSON object containing the greeting message.
    // The parameter is passed in the incoming message body (a name). For example, if the incoming message is the
    // String "vert.x", the reply contains: `{"message" : "hello vert.x"}`.
    // Unlike the previous exercise, the incoming message has a `String` body.
    // TODO
}