/**
 * Looks up a service reference for {@code filter} and, if the lookup succeeds, sends
 * {@code msg} through the {@link MessageProducer} obtained from that reference.
 *
 * <p>The filter's {@code "name"} entry is passed to the lookup alongside the filter itself.
 * A failed lookup is ignored: nothing is sent and nothing is reported.
 *
 * @param filter lookup filter; its {@code "name"} string is also handed to the lookup
 * @param msg    payload passed to the producer's {@code send}
 */
@Override
public void sendMessage(JsonObject filter, Object msg) {
  String serviceName = filter.getString("name");
  flow.getServiceRef(filter, serviceName, lookup -> {
    if (!lookup.succeeded()) {
      // Lookup failed: there is no producer to send through.
      return;
    }
    LookupResult found = lookup.result();
    MessageProducer producer = found.ref.getAs(MessageProducer.class);
    producer.send(msg);
  });
}
/**
 * Logs the start-up, creates an event-bus publisher for the address {@code "localhost"} and
 * registers a periodic timer (2000 ms) whose handler sends {@code "Message <n>"} with an
 * increasing counter. The timer id is stored in {@code timerId}.
 *
 * @throws Exception declared by the overridden method; nothing in this body throws a checked exception
 */
@Override
public void start() throws Exception {
  LOGGER.info("starting VertxBusProducerVerticle");
  MessageProducer<Object> publisher = getVertx().eventBus().publisher("localhost");

  Handler<Long> tick = new Handler<Long>() {
    // Number of messages sent so far; also the suffix of the next message.
    int count = 0;

    @Override
    public void handle(Long event) {
      String text = "Message " + count;
      count++;
      publisher.send(text);
    }
  };

  timerId = getVertx().setPeriodic(2000, tick);
}
/**
 * Builds a {@link LocalSessionStoreAdapter} from an existing {@link LocalSessionStoreImpl}.
 *
 * <p>The delegate's {@code localMap}, {@code vertx} and {@code reaperInterval} fields (and the
 * {@code name} field of its local map) are read through the {@code Reflection} helper, the
 * delegate is then closed, and a new adapter is created from the extracted values.
 *
 * @param sessionExpiredProducer producer handed to the new adapter
 * @param delegate               store whose settings are copied; closed before this method returns
 * @return a new adapter configured with the delegate's Vert.x instance, map name and reaper interval
 */
@SuppressWarnings("unchecked")
static LocalSessionStoreAdapter of(MessageProducer<String> sessionExpiredProducer, LocalSessionStoreImpl delegate) {
  // All reads must happen before the delegate is closed below.
  LocalMap<String, Session> sessions = Reflection.field("localMap")
    .ofType(LocalMap.class)
    .in(delegate)
    .get();
  String mapName = Reflection.field("name")
    .ofType(String.class)
    .in(sessions)
    .get();
  Vertx owner = Reflection.field("vertx")
    .ofType(Vertx.class)
    .in(delegate)
    .get();
  long reaperMillis = Reflection.field("reaperInterval")
    .ofType(long.class)
    .in(delegate)
    .get();

  delegate.close();
  return new LocalSessionStoreAdapter(owner, mapName, reaperMillis, sessionExpiredProducer);
}
/**
 * Wraps a supported {@link SessionStore} implementation in the matching adapter.
 *
 * <p>An event-bus sender for {@code VAADIN_SESSION_EXPIRED_ADDRESS} is created first and handed
 * to the adapter. {@link LocalSessionStoreImpl} and {@link ClusteredSessionStoreImpl} are the
 * only store types handled.
 *
 * @param vertx        Vert.x instance whose event bus provides the sender
 * @param sessionStore store to adapt
 * @return the adapter for {@code sessionStore}
 * @throws VertxException if {@code sessionStore} is neither of the two supported implementations
 */
public static SessionStore adapt(Vertx vertx, SessionStore sessionStore) {
  MessageProducer<String> expiredNotifier = vertx.eventBus().sender(VAADIN_SESSION_EXPIRED_ADDRESS);

  final SessionStore adapted;
  if (sessionStore instanceof LocalSessionStoreImpl) {
    adapted = LocalSessionStoreAdapter.of(expiredNotifier, (LocalSessionStoreImpl) sessionStore);
  } else if (sessionStore instanceof ClusteredSessionStoreImpl) {
    adapted = new ClusteredSessionStoreAdapter(expiredNotifier, (ClusteredSessionStoreImpl) sessionStore);
  } else {
    throw new VertxException("Cannot adapt session store of type " + sessionStore.getClass().getName());
  }
  return adapted;
}
public void example1(Vertx vertx) { AmqpBridge bridge = AmqpBridge.create(vertx); // Start the bridge, then use the event loop thread to process things thereafter. bridge.start("localhost", 5672, res -> { // Set up a producer using the bridge, send a message with it. MessageProducer<JsonObject> producer = bridge.createProducer("myAmqpAddress"); JsonObject amqpMsgPayload = new JsonObject(); amqpMsgPayload.put("body", "myStringContent"); producer.send(amqpMsgPayload); }); }
/**
 * Example: send a message whose payload carries an {@code "application_properties"} object
 * containing a single {@code "name"} / {@code "value"} entry.
 *
 * @param producer producer used to send the payload
 */
public void example3(MessageProducer<JsonObject> producer) {
  JsonObject amqpMsgPayload = new JsonObject()
    .put("application_properties", new JsonObject().put("name", "value"));

  producer.send(amqpMsgPayload);
}
@SuppressWarnings("unused") public void example10(MessageProducer<JsonObject> producer) { JsonObject amqpMsgPayload = new JsonObject(); amqpMsgPayload.put("body", "myRequest"); producer.<JsonObject> send(amqpMsgPayload, res -> { JsonObject amqpReplyMessagePayload = res.result().body(); // ...do something with reply message... }); }
protected <R> MessageProducer<JsonObject> doSend(JsonObject messageBody, Handler<AsyncResult<Message<R>>> replyHandler, String toAddress) { if (replyHandler != null) { bridge.verifyReplyToAddressAvailable(); } org.apache.qpid.proton.message.Message msg = translator.convertToAmqpMessage(messageBody); if (toAddress != null) { msg.setAddress(toAddress); } synchronized (AmqpProducerImpl.this) { // Update the credit tracking. We only need to adjust this here because the sends etc may not be on the context // thread and if that is the case we can't use the ProtonSender sendQueueFull method to check that credit has been // exhausted following this doSend call since we will have only scheduled the actual send for later. remoteCredit--; } bridge.runOnContext(true, v -> { if (replyHandler != null) { bridge.registerReplyToHandler(msg, replyHandler); } sender.send(msg); synchronized (AmqpProducerImpl.this) { // Update the credit tracking *again*. We need to reinitialise it here in case the doSend call was performed on // a thread other than the bridge context, to ensure we didn't fall foul of a race between the above pre-send // update on that thread, the above send on the context thread, and the sendQueueDrainHandler based updates on // the context thread. remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit(); } }); return this; }
/**
 * Creates a producer for the given AMQP address on this bridge's connection.
 *
 * @param amqpAddress the address to produce to
 * @return a new {@code AmqpProducerImpl} bound to this bridge
 * @throws IllegalStateException if the {@code started} flag is not set
 */
@SuppressWarnings("unchecked")
@Override
public MessageProducer<JsonObject> createProducer(String amqpAddress) {
  if (started.get()) {
    return new AmqpProducerImpl(this, connection, amqpAddress);
  }
  throw new IllegalStateException("Bridge was not successfully started");
}
/**
 * Creates an adapter around a clustered session store.
 *
 * <p>Both collaborators are required. They are checked here so that a missing one is reported
 * at construction time rather than on first use.
 *
 * @param sessionExpiredProducer producer stored for later use by this adapter; must not be {@code null}
 * @param sessionStore           the clustered store being adapted; must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
public ClusteredSessionStoreAdapter(MessageProducer<String> sessionExpiredProducer, ClusteredSessionStoreImpl sessionStore) {
  // Fully-qualified to avoid depending on an import that may not be present in this file.
  this.sessionExpiredProducer = java.util.Objects.requireNonNull(sessionExpiredProducer, "sessionExpiredProducer");
  this.sessionStore = java.util.Objects.requireNonNull(sessionStore, "sessionStore");
}
public LocalSessionStoreAdapter(Vertx vertx, String sessionMapName, long reaperInterval, MessageProducer<String> sessionExpiredProducer) { super(vertx, sessionMapName, reaperInterval); this.sessionExpiredProducer = Objects.requireNonNull(sessionExpiredProducer); //this.delegate = delegate; }
/**
 * Example: query whether the producer's write queue is full. The result is not used here.
 *
 * @param producer producer to query
 */
public void example5(MessageProducer<JsonObject> producer) {
  producer.writeQueueFull();
}
public void example6(MessageProducer<JsonObject> producer) { producer.drainHandler(v -> { // ...do stuff and send... }); }
/**
 * Sends a message without a reply handler by delegating to the two-argument {@code send}
 * with a {@code null} handler.
 *
 * @param messageBody payload to send
 * @return whatever the two-argument {@code send} returns
 */
@Override
public MessageProducer<JsonObject> send(JsonObject messageBody) {
  return this.send(messageBody, null);
}
/**
 * Sends a message with an optional reply handler by delegating to {@code doSend} with no
 * explicit destination address.
 *
 * @param messageBody  payload to send
 * @param replyHandler handler for the reply; forwarded as-is
 * @param <R>          body type of the expected reply
 * @return whatever {@code doSend} returns
 */
@Override
public <R> MessageProducer<JsonObject> send(JsonObject messageBody, Handler<AsyncResult<Message<R>>> replyHandler) {
  final String noExplicitAddress = null;
  return doSend(messageBody, replyHandler, noExplicitAddress);
}
/**
 * Stores the exception handler. Runs while holding this object's monitor.
 *
 * @param handler handler to store; replaces any previously stored handler
 * @return this producer
 */
@Override
public synchronized MessageProducer<JsonObject> exceptionHandler(Handler<Throwable> handler) {
  this.exceptionHandler = handler;
  return this;
}
/**
 * Writes {@code data} by delegating to the two-argument {@code send} with a {@code null}
 * reply handler.
 *
 * @param data payload to send
 * @return whatever the two-argument {@code send} returns
 */
@Override
public MessageProducer<JsonObject> write(JsonObject data) {
  return this.send(data, null);
}
@Override public MessageProducer<JsonObject> setWriteQueueMaxSize(int maxSize) { // No-op, available sending credit is controlled by recipient peer in AMQP 1.0. return this; }
/**
 * Stores the drain handler. Runs while holding this object's monitor.
 *
 * @param handler handler to store; replaces any previously stored handler
 * @return this producer
 */
@Override
public synchronized MessageProducer<JsonObject> drainHandler(Handler<Void> handler) {
  this.drainHandler = handler;
  return this;
}
/**
 * Not supported by this producer.
 *
 * @param options ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public MessageProducer<JsonObject> deliveryOptions(DeliveryOptions options) {
  final String reason = "DeliveryOptions are not supported by this producer";
  throw new UnsupportedOperationException(reason);
}
/**
 * Demo entry point: starts an embedded ActiveMQ broker with an AMQP connector on {@code PORT},
 * then starts two bridges against it. The first consumes from {@code ADDRESS} and prints each
 * received body; the second sends a fixed message to {@code ADDRESS} every second. The method
 * then blocks reading from standard input.
 *
 * @param args unused
 * @throws Exception if the temporary directory cannot be created or the broker fails to start
 */
public static void main(String[] args) throws Exception {
  // Create the data directory directly. The createTempFile/delete/mkdirs sequence leaves a
  // window in which the path does not exist and ignores the result of every step.
  File tmp = java.nio.file.Files.createTempDirectory("activemq").toFile();
  tmp.deleteOnExit();

  BrokerService brokerService = new BrokerService();
  brokerService.setBrokerName("localhost");
  brokerService.setDeleteAllMessagesOnStartup(true);
  brokerService.setUseJmx(true);
  brokerService.getManagementContext().setCreateConnector(false);
  brokerService.setDataDirectory(tmp.getAbsolutePath());
  brokerService.setPersistent(false);
  brokerService.setSchedulerSupport(false);
  brokerService.setAdvisorySupport(false);

  TransportConnector connector = brokerService
    .addConnector("amqp://0.0.0.0:" + PORT + "?transport.transformer=jms" + "&transport.socketBufferSize=65536&ioBufferSize=8192");
  connector.setName("amqp");

  brokerService.start();
  brokerService.waitUntilStarted();

  Vertx vertx = Vertx.vertx();

  // Consumer side.
  AmqpBridge bridge = AmqpBridge.create(vertx);
  bridge.start("localhost", PORT, res -> {
    if (res.succeeded()) {
      MessageConsumer<JsonObject> consumer = bridge.createConsumer(ADDRESS);
      consumer.handler(vertxMsg -> {
        System.out.println("Received message: " + vertxMsg.body().getValue("body"));
      });
    } else {
      // Report the failure instead of leaving the demo silently idle.
      System.err.println("Consumer bridge failed to start: " + res.cause());
    }
  });

  // Producer side.
  AmqpBridge bridge2 = AmqpBridge.create(vertx);
  bridge2.start("localhost", PORT, res -> {
    if (res.succeeded()) {
      MessageProducer<JsonObject> producer = bridge2.createProducer(ADDRESS);
      vertx.setPeriodic(1000, id -> {
        producer.send(new JsonObject().put("body", "the-content"));
        System.out.println("Sent message");
      });
    } else {
      System.err.println("Producer bridge failed to start: " + res.cause());
    }
  });

  // Keep the JVM alive until something arrives on standard input.
  System.in.read();
}
/**
 * Sends a request through the bridge, replies to it from a consumer on the same destination,
 * and checks the reply seen by the sender: expected body, an address set, and no reply address.
 * The bridge is closed from within the reply handler.
 */
@Test(timeout = 20000)
public void testBasicRequestReply(TestContext context) {
  Async asyncRequest = context.async();
  Async asyncShutdown = context.async();

  String destinationName = getTestName();
  String content = "myStringContent";
  String replyContent = "myStringReply";

  AmqpBridge bridge = AmqpBridge.create(vertx);
  bridge.start("localhost", getBrokerAmqpConnectorPort(), startResult -> {
    context.assertTrue(startResult.succeeded());

    // Requesting side.
    MessageProducer<JsonObject> producer = bridge.createProducer(destinationName);
    JsonObject requestBody = new JsonObject().put(AmqpConstants.BODY, content);

    producer.<JsonObject> send(requestBody, reply -> {
      LOG.trace("Sender got reply");
      context.assertEquals(replyContent, reply.result().body().getValue(AmqpConstants.BODY),
          "unexpected reply msg content");
      context.assertNotNull(reply.result().address(), "address was not set on reply");
      context.assertNull(reply.result().replyAddress(), "reply address was unexpectedly set on the reply");

      LOG.trace("Shutting down");
      bridge.close(shutdownRes -> {
        LOG.trace("Shutdown complete");
        context.assertTrue(shutdownRes.succeeded());
        asyncShutdown.complete();
      });
    });
    LOG.trace("Client sent msg");

    // Responding side.
    MessageConsumer<JsonObject> consumer = bridge.createConsumer(destinationName);
    consumer.handler(request -> {
      JsonObject receivedMsgBody = request.body();
      LOG.trace("Consumer got request msg: " + receivedMsgBody);

      context.assertNotNull(receivedMsgBody, "expected msg body but none found");
      context.assertEquals(content, receivedMsgBody.getValue(AmqpConstants.BODY), "unexpected msg content");
      context.assertNotNull(request.replyAddress(), "reply address was not set on the request");

      JsonObject replyBody = new JsonObject().put(AmqpConstants.BODY, replyContent);
      request.reply(replyBody);

      asyncRequest.complete();
    });
  });

  asyncRequest.awaitSuccess();
  asyncShutdown.awaitSuccess();
}
@Test(timeout = 20000) public void testReplyHandlingDisabledProducer(TestContext context) throws Exception { Async asyncSend = context.async(); Async asyncShutdown = context.async(); String destinationName = getTestName(); String content = "myStringContent" + destinationName; AmqpBridgeOptions options = new AmqpBridgeOptions().setReplyHandlingSupport(false); AmqpBridge bridge = AmqpBridge.create(vertx, options); bridge.start("localhost", getBrokerAmqpConnectorPort(), startResult -> { context.assertTrue(startResult.succeeded()); MessageProducer<JsonObject> producer = bridge.createProducer(destinationName); JsonObject body = new JsonObject(); body.put(AmqpConstants.BODY, content); // Try send with a reply handler, expect to fail. try { producer.<JsonObject> send(body, reply -> { }); context.fail("Expected exception to be thrown"); } catch (IllegalStateException ise) { // Expected. } // Try send without reply handler. producer.send(body); LOG.trace("Client sent msg"); asyncSend.complete(); LOG.trace("Shutting down"); bridge.close(shutdownRes -> { LOG.trace("Shutdown complete"); context.assertTrue(shutdownRes.succeeded()); asyncShutdown.complete(); }); }); asyncSend.awaitSuccess(); asyncShutdown.awaitSuccess(); }
/**
 * Exercises a three-step exchange over the bridge: request, reply, and a reply to that reply.
 * Checks the content and reply-address presence at each step, and closes the bridge once the
 * consumer has received the second reply.
 */
@Test(timeout = 20000)
public void testReplyToOriginalReply(TestContext context) {
  Async requestReceivedAsync = context.async();
  Async replyRecievedAsync = context.async();
  Async shutdownAsync = context.async();

  String destinationName = getTestName();
  String content = "myStringContent";
  String replyContent = "myStringReply";
  String replyToReplyContent = "myStringReplyToReply";

  AmqpBridge bridge = AmqpBridge.create(vertx);
  bridge.start("localhost", getBrokerAmqpConnectorPort(), startResult -> {
    context.assertTrue(startResult.succeeded());

    // Requesting side: send the request, then answer the first reply.
    MessageProducer<JsonObject> producer = bridge.createProducer(destinationName);
    JsonObject requestBody = new JsonObject().put(AmqpConstants.BODY, content);

    producer.<JsonObject> send(requestBody, reply -> {
      LOG.trace("Sender got first reply");
      Message<JsonObject> firstReply = reply.result();

      context.assertEquals(replyContent, firstReply.body().getValue(AmqpConstants.BODY),
          "unexpected reply msg content");
      context.assertNotNull(firstReply.address(), "address was not set on the reply");
      context.assertNotNull(firstReply.replyAddress(), "reply address was not set on the reply");
      replyRecievedAsync.complete();

      JsonObject secondReplyBody = new JsonObject().put(AmqpConstants.BODY, replyToReplyContent);
      firstReply.reply(secondReplyBody);
    });
    LOG.trace("Client sent msg");

    // Responding side: reply to the request and wait for the reply to that reply.
    MessageConsumer<JsonObject> consumer = bridge.createConsumer(destinationName);
    consumer.handler(request -> {
      JsonObject receivedMsgBody = request.body();
      LOG.trace("Receiver got request: " + receivedMsgBody);

      context.assertNotNull(receivedMsgBody, "expected msg body but none found");
      context.assertEquals(content, receivedMsgBody.getValue(AmqpConstants.BODY), "unexpected msg content");
      context.assertNotNull(request.replyAddress(), "reply address was not set on the request");

      JsonObject replyBody = new JsonObject().put(AmqpConstants.BODY, replyContent);

      request.<JsonObject> reply(replyBody, replyToReply -> {
        LOG.trace("Receiver got reply to reply");
        Message<JsonObject> secondReply = replyToReply.result();

        context.assertEquals(replyToReplyContent, secondReply.body().getValue(AmqpConstants.BODY),
            "unexpected 2nd reply msg content");
        context.assertNull(secondReply.replyAddress(), "reply address was unexpectedly set on 2nd reply");

        LOG.trace("Shutting down");
        bridge.close(shutdownRes -> {
          LOG.trace("Shutdown complete");
          context.assertTrue(shutdownRes.succeeded());
          shutdownAsync.complete();
        });
      });

      requestReceivedAsync.complete();
    });
  });

  requestReceivedAsync.awaitSuccess();
  replyRecievedAsync.awaitSuccess();
  shutdownAsync.awaitSuccess();
}
/**
 * Creates the stream from an input consumer and an output publisher. Both references are
 * stored as given; no validation is performed here.
 *
 * @param inputConsumer   consumer stored in {@code inputConsumer}
 * @param outputPublisher producer stored in {@code outputPublisher}
 */
public EventBusClosableWriteStream(MessageConsumer<Buffer> inputConsumer,
                                   MessageProducer<Buffer> outputPublisher) {
  this.outputPublisher = outputPublisher;
  this.inputConsumer = inputConsumer;
}
/**
 * Creates a producer to the given AMQP address.
 *
 * <p>This method MUST be called from the bridge Context thread, as used in the result handler
 * callback from the start methods. The bridge MUST be successfully started before the method
 * is called.
 *
 * @param amqpAddress
 *          the address to produce to
 * @param <T>
 *          the type parameter of the returned producer (presumably the message body type;
 *          confirm against implementations)
 * @return the producer
 * @throws IllegalStateException
 *           if the bridge was not started or the method is invoked on a thread other than the
 *           bridge Context thread, as used in the result handler callback from the start methods.
 */
<T> MessageProducer<T> createProducer(String amqpAddress) throws IllegalStateException;