Java 类io.vertx.core.eventbus.MessageProducer 实例源码

项目:vertx-service-flow    文件:ServiceFlowBase.java   
/**
 * Looks up a service reference for {@code filter} and, when the lookup succeeds, sends
 * {@code msg} through the {@link MessageProducer} obtained from that reference.
 * A failed lookup is ignored.
 *
 * @param filter lookup filter; its {@code "name"} entry is also passed to the lookup
 * @param msg    the message to send
 */
@Override
public void sendMessage(JsonObject filter, Object msg) {
  String serviceName = filter.getString("name");
  flow.getServiceRef(filter, serviceName, lookup -> {
    if (!lookup.succeeded()) {
      return;
    }
    LookupResult found = lookup.result();
    MessageProducer producer = found.ref.getAs(MessageProducer.class);
    producer.send(msg);
  });
}
项目:Karaf-Vertx    文件:VertxBusProducerVerticle.java   
/**
 * Starts the verticle: creates an event-bus publisher for the address {@code "localhost"} and
 * schedules a periodic timer (every 2000 ms) that sends a numbered message through it.
 * The timer id is kept in {@code timerId}.
 */
@Override
public void start() throws Exception {
    LOGGER.info("starting VertxBusProducerVerticle");
    MessageProducer<Object> publisher = getVertx().eventBus().publisher("localhost");

    Handler<Long> tick = new Handler<Long>() {
        // Number of messages sent so far; used as the message suffix.
        int sent = 0;

        @Override
        public void handle(Long timer) {
            String text = "Message " + sent;
            sent++;
            publisher.send(text);
        }
    };
    timerId = getVertx().setPeriodic(2000, tick);
}
项目:vaadin-vertx-samples    文件:LocalSessionStoreAdapter.java   
/**
 * Builds a {@code LocalSessionStoreAdapter} from an existing {@code LocalSessionStoreImpl}.
 * The delegate's {@code localMap}, {@code vertx} and {@code reaperInterval} fields (and the
 * map's {@code name} field) are read reflectively, the delegate is closed, and a new adapter
 * is created from the values read.
 *
 * @param sessionExpiredProducer producer handed to the new adapter
 * @param delegate               the store whose settings are copied; closed by this call
 * @return the new adapter
 */
@SuppressWarnings("unchecked")
static LocalSessionStoreAdapter of(MessageProducer<String> sessionExpiredProducer, LocalSessionStoreImpl delegate) {
    // Read everything needed from the delegate before it is closed.
    LocalMap<String, Session> sessions = Reflection.field("localMap").ofType(LocalMap.class).in(delegate).get();
    String mapName = Reflection.field("name").ofType(String.class).in(sessions).get();
    Vertx owner = Reflection.field("vertx").ofType(Vertx.class).in(delegate).get();
    long interval = Reflection.field("reaperInterval").ofType(long.class).in(delegate).get();

    delegate.close();

    return new LocalSessionStoreAdapter(owner, mapName, interval, sessionExpiredProducer);
}
项目:vaadin-vertx-samples    文件:SessionStoreAdapter.java   
/**
 * Wraps a session store in the adapter matching its implementation type. The adapter receives
 * an event-bus sender bound to {@code VAADIN_SESSION_EXPIRED_ADDRESS}.
 *
 * @param vertx        the Vert.x instance whose event bus provides the sender
 * @param sessionStore the store to adapt
 * @return an adapter around {@code sessionStore}
 * @throws VertxException if the store is neither a local nor a clustered implementation
 */
public static SessionStore adapt(Vertx vertx, SessionStore sessionStore) {
    MessageProducer<String> expiredNotifier = vertx.eventBus().sender(VAADIN_SESSION_EXPIRED_ADDRESS);
    if (sessionStore instanceof LocalSessionStoreImpl) {
        LocalSessionStoreImpl local = (LocalSessionStoreImpl) sessionStore;
        return LocalSessionStoreAdapter.of(expiredNotifier, local);
    } else if (sessionStore instanceof ClusteredSessionStoreImpl) {
        ClusteredSessionStoreImpl clustered = (ClusteredSessionStoreImpl) sessionStore;
        return new ClusteredSessionStoreAdapter(expiredNotifier, clustered);
    }
    throw new VertxException("Cannot adapt session store of type " + sessionStore.getClass().getName());
}
项目:vertx-amqp-bridge    文件:VertxAmqpBridgeExamples.java   
/**
 * Example: start a bridge against {@code localhost:5672} and, from the start callback,
 * create a producer and send one message whose {@code "body"} is a string.
 */
public void example1(Vertx vertx) {
  AmqpBridge bridge = AmqpBridge.create(vertx);
  // Start the bridge, then use the event loop thread to process things thereafter.
  bridge.start("localhost", 5672, res -> {
    // Set up a producer using the bridge, send a message with it.
    MessageProducer<JsonObject> producer = bridge.createProducer("myAmqpAddress");

    JsonObject amqpMsgPayload = new JsonObject().put("body", "myStringContent");
    producer.send(amqpMsgPayload);
  });
}
项目:vertx-amqp-bridge    文件:VertxAmqpBridgeExamples.java   
/**
 * Example: send a message that carries an {@code "application_properties"} object
 * containing a single {@code name -> value} entry.
 */
public void example3(MessageProducer<JsonObject> producer) {
  JsonObject applicationProperties = new JsonObject().put("name", "value");
  JsonObject amqpMsgPayload = new JsonObject().put("application_properties", applicationProperties);

  producer.send(amqpMsgPayload);
}
项目:vertx-amqp-bridge    文件:VertxAmqpBridgeExamples.java   
/**
 * Example: send a request with a reply handler and read the body of the reply message.
 */
@SuppressWarnings("unused")
public void example10(MessageProducer<JsonObject> producer) {
  JsonObject request = new JsonObject().put("body", "myRequest");

  producer.<JsonObject> send(request, res -> {
    JsonObject amqpReplyMessagePayload = res.result().body();
    // ...do something with reply message...
  });
}
项目:vertx-amqp-bridge    文件:AmqpProducerImpl.java   
/**
 * Converts {@code messageBody} to an AMQP message and schedules it to be sent on the bridge
 * context, keeping the locally tracked {@code remoteCredit} up to date.
 *
 * @param messageBody  the JSON payload to translate into an AMQP message
 * @param replyHandler optional reply handler; when non-null the bridge is first asked to verify
 *                     that a reply-to address is available, and the handler is registered for
 *                     the message before it is sent
 * @param toAddress    optional address to set on the message; left untouched when {@code null}
 * @return this producer
 */
protected <R> MessageProducer<JsonObject> doSend(JsonObject messageBody,
                                                 Handler<AsyncResult<Message<R>>> replyHandler, String toAddress) {
  if (replyHandler != null) {
    bridge.verifyReplyToAddressAvailable();
  }

  org.apache.qpid.proton.message.Message msg = translator.convertToAmqpMessage(messageBody);

  if (toAddress != null) {
    msg.setAddress(toAddress);
  }

  synchronized (AmqpProducerImpl.this) {
    // Update the credit tracking. We only need to adjust this here because the sends etc may not be on the context
    // thread and if that is the case we can't use the ProtonSender sendQueueFull method to check that credit has been
    // exhausted following this doSend call since we will have only scheduled the actual send for later.
    remoteCredit--;
  }

  // NOTE(review): the meaning of the 'true' flag is defined by bridge.runOnContext, not visible here.
  bridge.runOnContext(true, v -> {
    if (replyHandler != null) {
      bridge.registerReplyToHandler(msg, replyHandler);
    }

    sender.send(msg);

    synchronized (AmqpProducerImpl.this) {
      // Update the credit tracking *again*. We need to reinitialise it here in case the doSend call was performed on
      // a thread other than the bridge context, to ensure we didn't fall foul of a race between the above pre-send
      // update on that thread, the above send on the context thread, and the sendQueueDrainHandler based updates on
      // the context thread.
      remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
    }
  });

  return this;
}
项目:vertx-amqp-bridge    文件:AmqpBridgeImpl.java   
/**
 * Creates a producer for the given AMQP address.
 *
 * @param amqpAddress the address to produce to
 * @return a new producer bound to this bridge and its connection
 * @throws IllegalStateException if the bridge has not been started
 */
@SuppressWarnings("unchecked")
@Override
public MessageProducer<JsonObject> createProducer(String amqpAddress) {
  boolean running = started.get();
  if (running) {
    return new AmqpProducerImpl(this, connection, amqpAddress);
  }

  throw new IllegalStateException("Bridge was not successfully started");
}
项目:vaadin-vertx-samples    文件:ClusteredSessionStoreAdapter.java   
/**
 * Creates an adapter around a clustered session store.
 *
 * @param sessionExpiredProducer producer kept by this adapter
 * @param sessionStore           the clustered store being adapted
 */
public ClusteredSessionStoreAdapter(MessageProducer<String> sessionExpiredProducer, ClusteredSessionStoreImpl sessionStore) {
    this.sessionStore = sessionStore;
    this.sessionExpiredProducer = sessionExpiredProducer;
}
项目:vaadin-vertx-samples    文件:LocalSessionStoreAdapter.java   
/**
 * Creates a local session store adapter.
 *
 * @param vertx                  passed to the superclass constructor
 * @param sessionMapName         passed to the superclass constructor
 * @param reaperInterval         passed to the superclass constructor
 * @param sessionExpiredProducer producer kept by this adapter; must not be {@code null}
 * @throws NullPointerException if {@code sessionExpiredProducer} is {@code null}
 */
public LocalSessionStoreAdapter(Vertx vertx, String sessionMapName, long reaperInterval, MessageProducer<String> sessionExpiredProducer) {
    super(vertx, sessionMapName, reaperInterval);
    Objects.requireNonNull(sessionExpiredProducer);
    this.sessionExpiredProducer = sessionExpiredProducer;
}
项目:vertx-amqp-bridge    文件:VertxAmqpBridgeExamples.java   
/**
 * Example: query whether the producer's write queue is full.
 * The returned value is discarded here; the snippet only shows the call.
 */
public void example5(MessageProducer<JsonObject> producer) {
  producer.writeQueueFull();
}
项目:vertx-amqp-bridge    文件:VertxAmqpBridgeExamples.java   
/**
 * Example: register a drain handler on the producer. The handler body is a placeholder.
 */
public void example6(MessageProducer<JsonObject> producer) {
  producer.drainHandler(v -> {
    // ...do stuff and send...
  });
}
项目:vertx-amqp-bridge    文件:AmqpProducerImpl.java   
/**
 * Sends a message without a reply handler by delegating to
 * {@code send(messageBody, null)}.
 *
 * @param messageBody the JSON payload to send
 * @return the value returned by the delegated call
 */
@Override
public MessageProducer<JsonObject> send(JsonObject messageBody) {
  return send(messageBody, null);
}
项目:vertx-amqp-bridge    文件:AmqpProducerImpl.java   
/**
 * Sends a message with an optional reply handler by delegating to {@code doSend} with no
 * explicit destination address ({@code null}).
 *
 * @param messageBody  the JSON payload to send
 * @param replyHandler handler for the reply, passed through to {@code doSend}
 * @return the value returned by {@code doSend}
 */
@Override
public <R> MessageProducer<JsonObject> send(JsonObject messageBody, Handler<AsyncResult<Message<R>>> replyHandler) {
  return doSend(messageBody, replyHandler, null);
}
项目:vertx-amqp-bridge    文件:AmqpProducerImpl.java   
/**
 * Stores the given handler in the {@code exceptionHandler} field. The method is
 * {@code synchronized} on this producer.
 *
 * @param handler the handler to store
 * @return this producer
 */
@Override
public synchronized MessageProducer<JsonObject> exceptionHandler(Handler<Throwable> handler) {
  exceptionHandler = handler;

  return this;
}
项目:vertx-amqp-bridge    文件:AmqpProducerImpl.java   
/**
 * Writes a message by delegating to {@code send(data, null)}, i.e. a send with no reply handler.
 *
 * @param data the JSON payload to send
 * @return the value returned by the delegated call
 */
@Override
public MessageProducer<JsonObject> write(JsonObject data) {
  return send(data, null);
}
项目:vertx-amqp-bridge    文件:AmqpProducerImpl.java   
/**
 * Does nothing; the requested size is ignored.
 *
 * @param maxSize ignored
 * @return this producer
 */
@Override
public MessageProducer<JsonObject> setWriteQueueMaxSize(int maxSize) {
  // No-op, available sending credit is controlled by recipient peer in AMQP 1.0.
  return this;
}
项目:vertx-amqp-bridge    文件:AmqpProducerImpl.java   
/**
 * Stores the given handler in the {@code drainHandler} field. The method is
 * {@code synchronized} on this producer.
 *
 * @param handler the handler to store
 * @return this producer
 */
@Override
public synchronized MessageProducer<JsonObject> drainHandler(Handler<Void> handler) {
  drainHandler = handler;
  return this;
}
项目:vertx-amqp-bridge    文件:AmqpProducerImpl.java   
/**
 * Not supported by this producer.
 *
 * @param options ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public MessageProducer<JsonObject> deliveryOptions(DeliveryOptions options) {
  throw new UnsupportedOperationException("DeliveryOptions are not supported by this producer");
}
项目:vertx-amqp-bridge    文件:Main.java   
/**
 * Demo entry point: starts an embedded ActiveMQ broker with an AMQP connector on {@code PORT},
 * then starts two bridges against it. The first consumes from {@code ADDRESS} and prints each
 * message body; the second sends a message to {@code ADDRESS} every second. The method then
 * blocks reading from standard input.
 *
 * @param args unused
 * @throws Exception if the temporary directory cannot be created or the broker fails to start
 */
public static void main(String[] args) throws Exception {

    // Create the broker data directory atomically. The previous createTempFile/delete/mkdirs
    // sequence left a window in which another process could claim the path, and it ignored
    // the results of delete() and mkdirs().
    File tmp = java.nio.file.Files.createTempDirectory("activemq").toFile();
    tmp.deleteOnExit();

    BrokerService brokerService = new BrokerService();
    brokerService.setBrokerName("localhost");
    brokerService.setDeleteAllMessagesOnStartup(true);
    brokerService.setUseJmx(true);
    brokerService.getManagementContext().setCreateConnector(false);
    brokerService.setDataDirectory(tmp.getAbsolutePath());
    brokerService.setPersistent(false);
    brokerService.setSchedulerSupport(false);
    brokerService.setAdvisorySupport(false);

    TransportConnector connector = brokerService
        .addConnector("amqp://0.0.0.0:" + PORT + "?transport.transformer=jms"
            + "&transport.socketBufferSize=65536&ioBufferSize=8192");
    connector.setName("amqp");

    brokerService.start();
    brokerService.waitUntilStarted();


    Vertx vertx = Vertx.vertx();

    // Consumer side: print the body of every message received on ADDRESS.
    AmqpBridge bridge = AmqpBridge.create(vertx);
    bridge.start("localhost", PORT, res -> {
      if (res.succeeded()) {
        MessageConsumer<JsonObject> consumer = bridge.createConsumer(ADDRESS);
        consumer.handler(vertxMsg -> {
          System.out.println("Received message: " + vertxMsg.body().getValue("body"));
        });
      } else {
        // Report the failure instead of leaving the demo silently idle.
        System.err.println("Failed to start consumer bridge: " + res.cause());
      }
    });

    // Producer side: send one message per second to ADDRESS.
    AmqpBridge bridge2 = AmqpBridge.create(vertx);
    bridge2.start("localhost", PORT, res -> {
      if (res.succeeded()) {
        MessageProducer<JsonObject> producer = bridge2.createProducer(ADDRESS);
        vertx.setPeriodic(1000, id -> {
          producer.send(new JsonObject().put("body", "the-content"));
          System.out.println("Sent message");
        });
      } else {
        // Report the failure instead of leaving the demo silently idle.
        System.err.println("Failed to start producer bridge: " + res.cause());
      }
    });

    // Keep the JVM alive until something arrives on standard input.
    System.in.read();
}
项目:vertx-amqp-bridge    文件:AmqpBridgeTest.java   
/**
 * Verifies a basic request/reply round trip through a single bridge: a producer sends a request
 * with a reply handler, a consumer on the same destination replies, and the sender checks the
 * reply content, that the reply has an address, and that it carries no reply address. The bridge
 * is closed from the reply handler.
 */
@Test(timeout = 20000)
public void testBasicRequestReply(TestContext context) {
  Async asyncRequest = context.async();
  Async asyncShutdown = context.async();

  String destinationName = getTestName();
  String content = "myStringContent";
  String replyContent = "myStringReply";

  AmqpBridge bridge = AmqpBridge.create(vertx);
  bridge.start("localhost", getBrokerAmqpConnectorPort(), startResult -> {
    context.assertTrue(startResult.succeeded());

    MessageProducer<JsonObject> producer = bridge.createProducer(destinationName);

    JsonObject body = new JsonObject();
    body.put(AmqpConstants.BODY, content);

    // Send the request; the handler runs when the consumer below replies.
    producer.<JsonObject> send(body, reply -> {
      LOG.trace("Sender got reply");
      context.assertEquals(replyContent, reply.result().body().getValue(AmqpConstants.BODY),
          "unexpected reply msg content");
      context.assertNotNull(reply.result().address(), "address was not set on reply");
      context.assertNull(reply.result().replyAddress(), "reply address was unexpectedly set on the reply");

      LOG.trace("Shutting down");
      bridge.close(shutdownRes -> {
        LOG.trace("Shutdown complete");
        context.assertTrue(shutdownRes.succeeded());
        asyncShutdown.complete();
      });
    });
    LOG.trace("Client sent msg");

    // Consumer side: validate the request and answer it.
    MessageConsumer<JsonObject> consumer = bridge.createConsumer(destinationName);
    consumer.handler(msg -> {
      JsonObject receivedMsgBody = msg.body();
      LOG.trace("Consumer got request msg: " + receivedMsgBody);

      context.assertNotNull(receivedMsgBody, "expected msg body but none found");
      context.assertEquals(content, receivedMsgBody.getValue(AmqpConstants.BODY), "unexpected msg content");
      context.assertNotNull(msg.replyAddress(), "reply address was not set on the request");

      JsonObject replyBody = new JsonObject();
      replyBody.put(AmqpConstants.BODY, replyContent);

      msg.reply(replyBody);

      asyncRequest.complete();
    });
  });

  asyncRequest.awaitSuccess();
  asyncShutdown.awaitSuccess();
}
项目:vertx-amqp-bridge    文件:AmqpBridgeTest.java   
/**
 * Verifies producer behaviour when the bridge is created with reply handling support disabled:
 * a send with a reply handler is expected to throw {@link IllegalStateException}, while a plain
 * send is expected to go through. The bridge is closed afterwards.
 */
@Test(timeout = 20000)
public void testReplyHandlingDisabledProducer(TestContext context) throws Exception {
  Async asyncSend = context.async();
  Async asyncShutdown = context.async();

  String destinationName = getTestName();
  String content = "myStringContent" + destinationName;

  AmqpBridgeOptions options = new AmqpBridgeOptions().setReplyHandlingSupport(false);
  AmqpBridge bridge = AmqpBridge.create(vertx, options);
  bridge.start("localhost", getBrokerAmqpConnectorPort(), startResult -> {
    context.assertTrue(startResult.succeeded());

    MessageProducer<JsonObject> producer = bridge.createProducer(destinationName);

    JsonObject body = new JsonObject();
    body.put(AmqpConstants.BODY, content);

    // Try send with a reply handler, expect to fail.
    try {
      producer.<JsonObject> send(body, reply -> {
      });
      context.fail("Expected exception to be thrown");
    } catch (IllegalStateException ise) {
      // Expected.
    }

    // Try send without reply handler.
    producer.send(body);
    LOG.trace("Client sent msg");
    asyncSend.complete();

    LOG.trace("Shutting down");
    bridge.close(shutdownRes -> {
      LOG.trace("Shutdown complete");
      context.assertTrue(shutdownRes.succeeded());
      asyncShutdown.complete();
    });
  });

  asyncSend.awaitSuccess();
  asyncShutdown.awaitSuccess();
}
项目:vertx-amqp-bridge    文件:AmqpBridgeTest.java   
/**
 * Verifies a three-step exchange through a single bridge: request, reply, and reply-to-the-reply.
 * The sender checks that the first reply carries both an address and a reply address, then
 * answers it; the consumer checks the content of that second reply and that it carries no reply
 * address, then closes the bridge.
 */
@Test(timeout = 20000)
public void testReplyToOriginalReply(TestContext context) {
  Async requestReceivedAsync = context.async();
  Async replyRecievedAsync = context.async();
  Async shutdownAsync = context.async();

  String destinationName = getTestName();
  String content = "myStringContent";
  String replyContent = "myStringReply";
  String replyToReplyContent = "myStringReplyToReply";

  AmqpBridge bridge = AmqpBridge.create(vertx);
  bridge.start("localhost", getBrokerAmqpConnectorPort(), startResult -> {
    context.assertTrue(startResult.succeeded());

    MessageProducer<JsonObject> producer = bridge.createProducer(destinationName);

    JsonObject body = new JsonObject();
    body.put(AmqpConstants.BODY, content);

    // Sender side: on the first reply, validate it and answer it in turn.
    producer.<JsonObject> send(body, reply -> {
      LOG.trace("Sender got first reply");
      Message<JsonObject> replyMessage = reply.result();
      context.assertEquals(replyContent, replyMessage.body().getValue(AmqpConstants.BODY),
          "unexpected reply msg content");
      context.assertNotNull(replyMessage.address(), "address was not set on the reply");
      context.assertNotNull(replyMessage.replyAddress(), "reply address was not set on the reply");

      replyRecievedAsync.complete();

      JsonObject replyToReplyBody = new JsonObject();
      replyToReplyBody.put(AmqpConstants.BODY, replyToReplyContent);

      replyMessage.reply(replyToReplyBody);
    });
    LOG.trace("Client sent msg");

    // Receiver side: validate the request, reply with a handler for the follow-up reply.
    MessageConsumer<JsonObject> consumer = bridge.createConsumer(destinationName);
    consumer.handler(msg -> {
      JsonObject receivedMsgBody = msg.body();
      LOG.trace("Receiver got request: " + receivedMsgBody);

      context.assertNotNull(receivedMsgBody, "expected msg body but none found");
      context.assertEquals(content, receivedMsgBody.getValue(AmqpConstants.BODY), "unexpected msg content");
      context.assertNotNull(msg.replyAddress(), "reply address was not set on the request");

      JsonObject replyBody = new JsonObject();
      replyBody.put(AmqpConstants.BODY, replyContent);

      msg.<JsonObject> reply(replyBody, replyToReply -> {
        LOG.trace("Receiver got reply to reply");
        Message<JsonObject> replyToReplyMessage = replyToReply.result();
        context.assertEquals(replyToReplyContent, replyToReplyMessage.body().getValue(AmqpConstants.BODY),
            "unexpected 2nd reply msg content");
        context.assertNull(replyToReplyMessage.replyAddress(), "reply address was unexpectedly set on 2nd reply");

        LOG.trace("Shutting down");
        bridge.close(shutdownRes -> {
          LOG.trace("Shutdown complete");
          context.assertTrue(shutdownRes.succeeded());
          shutdownAsync.complete();
        });
      });

      requestReceivedAsync.complete();
    });
  });

  requestReceivedAsync.awaitSuccess();
  replyRecievedAsync.awaitSuccess();
  shutdownAsync.awaitSuccess();
}
项目:usher    文件:EventBusInput.java   
/**
 * Creates a stream backed by the given event-bus consumer and publisher.
 *
 * @param inputConsumer   consumer stored in {@code inputConsumer}
 * @param outputPublisher producer stored in {@code outputPublisher}
 */
public EventBusClosableWriteStream(MessageConsumer<Buffer> inputConsumer, MessageProducer<Buffer> outputPublisher) {
    this.outputPublisher = outputPublisher;
    this.inputConsumer = inputConsumer;
}
项目:vertx-amqp-bridge    文件:AmqpBridge.java   
/**
 * Creates a producer to the given AMQP address.
 *
 * This method MUST be called from the bridge Context thread, as used in the result handler callback from the start
 * methods. The bridge MUST be successfully started before the method is called.
 *
 * @param <T>
 *          the type parameter of the returned {@link MessageProducer}
 * @param amqpAddress
 *          the address to produce to
 * @return the producer
 * @throws IllegalStateException
 *           if the bridge was not started or the method is invoked on a thread other than the bridge Context thread,
 *           as used in the result handler callback from the start methods.
 */
<T> MessageProducer<T> createProducer(String amqpAddress) throws IllegalStateException;