private ServiceInfo getServiceInfoByVertx(Consumer<ServiceInfoResult> consumer, Function<ServiceInfo,Boolean> criteria) { // TODO add caching mechanism with TTL to reduce vertx.eventBus().send(GlobalKeyHolder.SERVICE_REGISTRY_GET, "xyz", (AsyncResultHandler<Message<byte[]>>) h -> { if (h.succeeded()) { final List<ServiceInfo> serviceInfos = getServiceInfoFromMessage(h).filter(info -> criteria.apply(info)).collect(Collectors.toList()); if(!serviceInfos.isEmpty()){ consumer.accept(new ServiceInfoResult(serviceInfos.stream(),h.succeeded(),h.cause())); }else { consumer.accept(new ServiceInfoResult(serviceInfos.stream(),false,new ServiceNotFoundException("selected service not found"))); } } else { consumer.accept(new ServiceInfoResult(Stream.<ServiceInfo>empty(),h.succeeded(),h.cause())); } } ); return null; }
@Override public void run(StreamMux mux, AsyncResultHandler<MessageStream> messageStreamAsyncResultHandler) { final MessageInjectorImpl messageInjector = new MessageInjectorImpl(mux, messageMatcher); filterPlugin.run(messageInjector, asyncResult -> { if (asyncResult.failed()) { final Throwable cause = asyncResult.cause(); logger.error(String.format("Unable to obtain output plugin duplex stream for %s. Cause: %s", pluginName, cause.getMessage()), cause); run(mux, messageStreamAsyncResultHandler); return; } final MessageStream messageStream = asyncResult.result(); messageStream.setMessageMatcher(messageMatcher); messageStream.setErrorStrategy(errorStrategy); messageStreamAsyncResultHandler.handle(Future.succeededFuture(messageStream)); }); }
@Override public void run(AsyncResultHandler<Void> startupHandler, Handler<DuplexStream<Buffer, Buffer>> duplexStreamHandler) { netServer.connectHandler(socket -> { final DuplexStream<Buffer, Buffer> duplexStream = new DuplexStream<>(socket, new SocketWriteStream(socket)); duplexStream .closeHandler(v -> { socket.close(); }) .packDecorator(pack -> { final Message message = pack.getMessage(); message.setRemoteAddress(socket.remoteAddress()); message.setLocalAddress(socket.localAddress()); }); duplexStreamHandler.handle(duplexStream); }); netServer.listen(netServerAsyncResult -> { if (netServerAsyncResult.failed()) { startupHandler.handle(Future.failedFuture(netServerAsyncResult.cause())); return; } logger.info("Tcp Server started: " + configObj); startupHandler.handle(Future.succeededFuture()); }); }
@Override protected void createObject(AsyncResultHandler<ClosableWriteStream<Buffer>> readyHandler) { netClient.connect(port, host, connectHandler -> { if (connectHandler.failed()) { final Throwable cause = connectHandler.cause(); logger.error(cause.getMessage(), cause); readyHandler.handle(Future.failedFuture(cause)); return; } readyHandler.handle(Future.succeededFuture(new SocketWriteStream(connectHandler.result()))); }); }
@Override public void run(AsyncResultHandler<DuplexStream<Buffer, Buffer>> duplexStreamAsyncResultHandler) { if (keepAlive) { createKeepAliveStream(duplexStreamAsyncResultHandler); return; } createStream(duplexStreamAsyncResultHandler); }
private void createKeepAliveStream(AsyncResultHandler<DuplexStream<Buffer, Buffer>> duplexStreamAsyncResultHandler) { socketPool.borrowObject(asyncResult -> { if (asyncResult.failed()) { final Throwable cause = asyncResult.cause(); logger.error(String.format("Unable to obtain socket for %s. Cause: %s", configObj.toString(), cause.getMessage()), cause); run(duplexStreamAsyncResultHandler); return; } final ClosableWriteStream<Buffer> writeStream = asyncResult.result(); SocketWriteStream socketWriteStream = (SocketWriteStream) writeStream; NetSocket socket = socketWriteStream.getSocket(); final DuplexStream<Buffer, Buffer> duplexStream = new DuplexStream<>(socket, writeStream) .closeHandler(v -> { socket.close(); }) .packDecorator(pack -> { final Message message = pack.getMessage(); message.setRemoteAddress(socket.remoteAddress()); message.setLocalAddress(socket.localAddress()); }); duplexStreamAsyncResultHandler.handle(Future.succeededFuture(duplexStream)); }); }
@Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { final MethodMeta methodData = this.metaData.getMeta(method); final JsonObject message = convertToRequest(methodData, args); if(logger.isDebugEnabled()) { logger.debug("invoke: " + message); } final ServiceMetaData.Argument returnType = methodData.getReturnType(); ServiceHandler sh = (ServiceHandler)args[args.length-1]; vertx.eventBus().send(address, message, this.options, new AsyncResultHandler<Message<JsonObject>>() { @Override public void handle(final AsyncResult<Message<JsonObject>> event) { if( event.succeeded() ) { if( returnType.getType().isAssignableFrom(Void.class) || event.result().body() == null) { sh.ok(null); } else { final ProxyJsonSerializer serializer = returnType.getSerializer(); try { sh.ok(serializer == null ? event.result().body().getValue(RETURN) : serializer.fromJson(event.result().body().getJsonObject(RETURN), returnType.getType())); }catch (Throwable t) { // Bad conversion sh.fail(FailHandler.BAD_DESERIALIZATION, new JsonObject().put("error", t.getMessage())); } } } else { ReplyException re = (ReplyException) event.cause(); switch (re.failureType()) { case NO_HANDLERS: case TIMEOUT: sh.fail(re.failureCode(), new JsonObject().put("error", re.getMessage())); break; case RECIPIENT_FAILURE: default: sh.fail(re.failureCode(), new JsonObject(re.getMessage())); break; } } } }); return null; }
@Override public void stop(AsyncResultHandler<Void> stopHandler) { inputPlugin.stop(stopHandler::handle); }
@Override public void stop(AsyncResultHandler<Void> stopHandler) { netServer.close(stopHandler); }
private void createStream(AsyncResultHandler<DuplexStream<Buffer, Buffer>> duplexStreamAsyncResultHandler) { final DelayedSocketStream stream = new DelayedSocketStream(socketPool); final DuplexStream<Buffer, Buffer> duplexStream = new DuplexStream<>(NullReadStream.getInstance(), stream); duplexStreamAsyncResultHandler.handle(Future.succeededFuture(duplexStream)); }
@Override public void run(AsyncResultHandler<DuplexStream<Buffer, Buffer>> duplexStreamAsyncResultHandler) { final KafkaProducerStream kafkaProducerStream = new KafkaProducerStream(vertx, recordFactory); final DuplexStream<Buffer, Buffer> duplexStream = new DuplexStream<>(NullReadStream.getInstance(), kafkaProducerStream); duplexStreamAsyncResultHandler.handle(Future.succeededFuture(duplexStream)); }
@Override protected void createObject(AsyncResultHandler<Fake> readyHandler) { readyHandler.handle(Future.succeededFuture(new Fake())); }
/** * The way the code works now it's expected that the duplexStreamHandler not be invoked immediately. Usually this * isn't a problem since IO is async and creating the server stream requires async callbacks. However, if you * are writing a plugin that can create the streams during the same "tick" as the run call, please invoke * the handler using vertx.runOnContext() to get the next "tick". * @param startupHandler * @param duplexStreamHandler */ void run(AsyncResultHandler<Void> startupHandler, Handler<DuplexStream<Buffer, Buffer>> duplexStreamHandler);
void run(AsyncResultHandler<DuplexStream<Buffer, Buffer>> duplexStreamAsyncResultHandler);
void run(StreamMux mux, AsyncResultHandler<MessageStream> messageStreamAsyncResultHandler);
void stop(AsyncResultHandler<Void> stopHandler);
void run(MessageInjector messageInjector, AsyncResultHandler<MessageStream> messageStreamAsyncResultHandler);
public void start(AsyncResultHandler<Void> startupHandler, Handler<MessageStream> messageParsingStreamHandler);