private byte[] readMessage() throws Exception { final CountDownLatch countDown = new CountDownLatch(1); final AtomicReference<byte[]> result = new AtomicReference<>(); Channel channel = sender.get().createChannel(); try { channel.basicConsume(sender.queue(), true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { result.set(body); countDown.countDown(); } }); countDown.await(5, TimeUnit.SECONDS); } finally { channel.close(); } return result.get(); }
SourceRecord sourceRecord(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) { Struct key = MessageConverter.key(basicProperties); Struct value = MessageConverter.value(consumerTag, envelope, basicProperties, bytes); final String topic = this.config.kafkaTopic.execute(RabbitMQSourceConnectorConfig.KAFKA_TOPIC_TEMPLATE, value); return new SourceRecord( ImmutableMap.of("routingKey", envelope.getRoutingKey()), ImmutableMap.of("deliveryTag", envelope.getDeliveryTag()), topic, null, key.schema(), key, value.schema(), value, null == basicProperties.getTimestamp() ? this.time.milliseconds() : basicProperties.getTimestamp().getTime() ); }
@Test public void envelope() { final Envelope input = new Envelope( 13246312L, true, "exchange", "routingKey" ); final Struct actual = MessageConverter.envelope(input); assertNotNull(actual, "actual should not be null."); assertField(input.getDeliveryTag(), actual, MessageConverter.FIELD_ENVELOPE_DELIVERYTAG); assertField(input.getExchange(), actual, MessageConverter.FIELD_ENVELOPE_EXCHANGE); assertField(input.getRoutingKey(), actual, MessageConverter.FIELD_ENVELOPE_ROUTINGKEY); assertField(input.isRedeliver(), actual, MessageConverter.FIELD_ENVELOPE_ISREDELIVER); }
@Override @SuppressWarnings("unchecked") public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // decode internal message InternalMessage msg = JSONs.decodeInternalMessage(body); // notify listener if (msg != null) { logger.debug("Communicator received: Received {} message for client {} user {}", msg.getMessageType(), msg.getClientId(), msg.getUserName()); switch (msg.getMessageType()) { case PUBLISH: listener.onPublish((InternalMessage<Publish>) msg); break; case DISCONNECT: listener.onDisconnect((InternalMessage<Disconnect>) msg); break; default: logger.warn("Communicator error: Communicator received unexpected message type {}", msg.getMessageType()); } } }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午2:57:32 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys2) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity); System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName + ", BindRoutingKey:" + severity); } System.out.println("ReceiveLogsDirect2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:53:18 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行绑定 for (String routingKey : routingKeys1) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey); System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsDirect1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:32:45 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ地址 factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received messages"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { //envelope主要存放生产者相关信息(比如交换机、路由key等) //body是消息实体 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; // 自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:40:52 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定 System.out.println("ReceiveLogs1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs1 Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);// 队列会自动删除 }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:08:40 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; // 绑定路由关键字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey); } System.out.println("ReceiveLogsTopic2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午3:06:20 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[] { "*.orange.*" }; // 绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey); System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsTopic1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * Configures the rabbitmq queues and the {@code shard-<id>-receive} consumer. * * Subclasses that want to change how messages are sent/received can override this method to disable the default implementation. * * @throws IOException if there's an error on {@link Channel#queueDeclare(String, boolean, boolean, boolean, Map)} or {@link Channel#basicConsume(String, boolean, Consumer)}. */ protected void configure() throws IOException { channel.queueDeclare("shard-" + shardId + "-send", true, false, false, null); channel.queueDeclare("shard-" + shardId + "-receive", true, false, false, null); channel.queueDeclare("shard-" + shardId + "-available", true, false, false, null); channel.queueDeclare("shard-" + shardId + "-unavailable", true, false, false, null); channel.basicConsume("shard-" + shardId + "-receive", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ClientWebSocketClient c = client.get(); if(c == null) return; JSONObject obj = new JSONObject(new String(body, StandardCharsets.UTF_8)); if(obj.has("t") && obj.getString("t").equals("gateway-ping-update")) { c.getJDA().setPing(obj.getJSONObject("d").getLong("ping")); return; } if(enableRawGatewayEvent) c.getJDA().getEventManager().handle(new RawGatewayEvent(c.getJDA(), new JSONObject(obj.toString()))); c.handleEvent(obj); } }); }
/** * 接受消息 */ public void receiveQuickstartMessage() throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); AMQP.Queue.DeclareOk declareOk = channel.getChannel().queueDeclare(QUICKSTART_QUEUE_NAME, false, false, false, null); System.out.println("等待接受队列【" + QUICKSTART_QUEUE_NAME + "】消息"); //建立一个消费者 监听消息的接受 Consumer consumer = new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息:" + message); } }; channel.getChannel().basicConsume(QUICKSTART_QUEUE_NAME, true, consumer); //channel.close(); }
/** * 调用接口 * * @param message 消息内容 不能为空 */ public void client(String message) throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); String replyQueueName = channel.getChannel().queueDeclare().getQueue(); String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); System.out.println("rpc客户端发送消息:" + message); channel.getChannel().basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8")); channel.getChannel().basicConsume(replyQueueName, true, new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { System.out.println("rpc客户端收到结果:" + new String(body, "UTF-8") + "\n"); } } }); }
/** * 接受消息 */ public void receivePubsubMessage() throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); channel.getChannel().exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.getChannel().queueDeclare().getQueue(); channel.getChannel().queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接受pusub订阅【" + EXCHANGE_NAME + "】消息"); System.out.println("选择队列:"+queueName); Consumer consumer = new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息:" + message + "'"); } }; channel.getChannel().basicConsume(queueName, true, consumer); }
/** * 接受主题消息 */ public void receiveTopicMessage() throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); channel.getChannel().exchangeDeclare(TOPIC, "topic"); String queueName = channel.getChannel().queueDeclare().getQueue(); channel.getChannel().queueBind(queueName, TOPIC, "bindingKey"); System.out.println("等待接受topic主题【" + TOPIC + "】消息"); System.out.println("选择队列:" + queueName); Consumer consumer = new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息:" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.getChannel().basicConsume(queueName, true, consumer); }
@Test public void filterMessage() throws Exception { RabbitMqMessageScheme rabbitMqMessageScheme = new SingleStreamRabbitMqMessageScheme() { @Override public void prepare(Map config, TopologyContext context) { // no operation } @Override public List<Object> convertToTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws Exception { return null; } @Override public Fields getOutputFields() { return new Fields("stringField"); } @Override public void cleanup() { // no operation } }; StreamedTuple streamedTuple = rabbitMqMessageScheme.convertToStreamedTuple(null, null, null); assertNull(streamedTuple); }
@Test public void messageFiltered() throws Exception { RabbitMqSpout rabbitMqSpout = new RabbitMqSpout(rabbitMqChannelProvider, new EmptyRabbitMqMessageScheme()); rabbitMqSpout.open(MINIMUM_CONF, mockTopologyContext, mockSpoutOutputCollector); rabbitMqSpout.activate(); long messageId = 435; Envelope envelope = new Envelope(messageId, false, null, null); AutorecoverableQueueingConsumer mockQueueingConsumer = mock(AutorecoverableQueueingConsumer.class); RabbitMqMessage message = new RabbitMqMessage(envelope, null, null); when(mockQueueingConsumer.nextMessage(anyLong())).thenReturn(message, new RabbitMqMessage[]{null}); rabbitMqSpout.queueingConsumer = mockQueueingConsumer; rabbitMqSpout.nextTuple(); rabbitMqSpout.close(); verify(mockChannel, times(1)).basicAck(messageId, false); verify(rabbitMqChannelProvider, times(1)).cleanup(); }
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Serializer serializer = subscriber.getSerializer(); // Check if we have a serializer, otherwise just ignore the message if (serializer != null) { try { Message message = serializer.deserialize(body); this.getChannel().basicAck(envelope.getDeliveryTag(), false); subscriber.receive(message); } catch (ClassNotFoundException e) { // Reject the message since we cannot do anything useful with it this.getChannel().basicNack(envelope.getDeliveryTag(), false, false); subscriber.getLogger().warn("Received message of unknown type, message dropped", e); } } }
@POST @Path("rabbitmqRecv") public void send() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(properties.getHeaders()); String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); }
GabrielGateway(int shardId, Channel channel) throws IOException { super(shardId, channel); channel.queueDeclare("shard-" + shardId + "-getping", false, false, false, null); channel.queueDeclare("shard-" + shardId + "-getping-response", false, false, false, null); channel.basicConsume("shard-" + shardId + "-getping", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { channel.basicPublish("", "shard-" + shardId + "-getping-response", null, body); } }); }
public static synchronized void init(Channel channel) throws IOException, TimeoutException { if(current != null) { throw new IllegalStateException("Already started"); } current = new GatewayInfo("unknown", "unknown", -1, -1, -1, -1, -1, -1, -1); channel.queueDeclare("gateway-info", false, false, false, null); channel.basicConsume("gateway-info", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { JSONObject object = new JSONObject(new String(body, StandardCharsets.UTF_8)); JSONObject ram = object.getJSONObject("ram"); try { current = new GatewayInfo( object.getString("version"), object.getString("jda-version"), object.getDouble("cpu-usage"), object.getInt("thread-count"), object.getLong("uptime"), ram.getLong("used"), ram.getLong("free"), ram.getLong("total"), ram.getLong("max") ); } catch(JSONException e) { GabrielBot.LOGGER.error("Error creating GatewayInfo: " + e.getMessage()); } } }); }
public GabrielGatewayClient(int shardId, Channel channel) throws IOException { super(shardId, channel, true); channel.queueDeclare("shard-" + shardId + "-getping", false, false, false, null); channel.queueDeclare("shard-" + shardId + "-getping-response", false, false, false, null); channel.basicConsume("shard-" + shardId + "-getping-response", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long now = System.currentTimeMillis(); ping = now - Longs.fromByteArray(body); } }); calculatePing(); PING_CALCULATOR.scheduleAtFixedRate(this::calculatePing, 30, 30, TimeUnit.SECONDS); }
protected void connect() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); channel = connection.createChannel(); String queueName = "flowing-retail-" + name; channel.queueDeclare(queueName, true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // publish/subscribe model channel.queueBind(queueName, EXCHANGE_NAME, "*"); System.out.println(" [*] Waiting for messages."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); eventHandler.handleEvent(message); } }; channel.basicConsume(queueName, true, consumer); }
@Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { if(body != null) { try { // resolve the returned message long deliveryTag = envelope.getDeliveryTag(); String message = new String(body, "UTF-8"); LOG.info("Task received: consumerTag=" + consumerTag + ", deliveryTag=" + deliveryTag + ", message=" + message); JSONObject taskReq = JSONObject.parseObject(message); if(!taskReq.isEmpty()) { long id = messageIdGenerator.incrementAndGet(); WaitingTask task = new WaitingTask(id, taskReq, getChannel(), deliveryTag); waitingTasks.putLast(task); LOG.info("Add task to waiting queue: " + task); } } catch (Exception e) { e.printStackTrace(); } } }
private void handleDelivery(final String queueName, final Envelope envelope, final byte[] body) { final Ternary<String, Channel, EventSubscriber> queueDetails = s_subscribers.get(queueName); if (queueDetails != null) { final EventSubscriber subscriber = queueDetails.third(); final String routingKey = envelope.getRoutingKey(); final String eventSource = getEventSourceFromRoutingKey(routingKey); final String eventCategory = getEventCategoryFromRoutingKey(routingKey); final String eventType = getEventTypeFromRoutingKey(routingKey); final String resourceType = getResourceTypeFromRoutingKey(routingKey); final String resourceUUID = getResourceUUIDFromRoutingKey(routingKey); final Event event = new Event(eventSource, eventCategory, eventType, resourceType, resourceUUID); event.setDescription(new String(body)); // deliver the event to call back object provided by subscriber subscriber.onEvent(event); } }
private void consume(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) { final Message rawMessage = Message.builder() .body(body) .contentType(properties.getContentType()) .encoding(properties.getContentEncoding()) .type(properties.getType()) .build(); final T message = deserialize(rawMessage); final Consumer<Void> onAck = ackAction(envelope.getDeliveryTag()); final MessageContext<T> messageContext = new RabbitMessageContext<>( message, consumerTag, envelope.getExchange(), envelope.getRoutingKey(), properties, onAck); logDelivery(messageContext); consumer.accept(messageContext); }
@Test public void testConnectionFactory() throws Exception { Assert.assertNotNull(connectionFactory1); Assert.assertNotNull(queue); RabbitmqConnection connection = connectionFactory1.getConnection(); Assert.assertNotNull(connection); String queueName = "testing"; Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); String message = "Hello World!"; final CountDownLatch counter = new CountDownLatch(1); Consumer consume = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { Assert.assertEquals("Hello World!", new String(body)); counter.countDown(); } }; channel.basicConsume(queueName, true, consume); channel.basicPublish("", queueName, null, message.getBytes()); counter.await(10, TimeUnit.SECONDS); Assert.assertEquals(0, counter.getCount()); channel.close(); }
public void setup() throws Exception { if (log.isTraceEnabled()) { log.trace("setup()"); } this.consumer = new DefaultConsumer(this.channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { MessageEndpoint localEndpoint; try { localEndpoint = endpointFactory.createEndpoint(null); RabbitmqBytesMessage m = new RabbitmqBytesMessage(consumerTag,envelope,properties,body); onMessage(localEndpoint, m); } catch (UnavailableException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; if ("javax.jms.Queue".equals(this.spec.getDestinationType())) { RabbitmqAdminQueueImpl queue = Util.lookup(new InitialContext(), this.spec.getDestination(), RabbitmqAdminQueueImpl.class); this.channel.basicConsume(queue.getDestinationAddress(),true, consumer); } }
@Override public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long tag = envelope.getDeliveryTag(); if (envelope.isRedeliver() && (recoveredTags.contains(tag) || pendingAck.contains(tag))) { if (recoveredTags.contains(tag)) { pendingAck.add(tag); } return; } // Acknowledgements are sent at the end of the window after adding to idempotency manager pendingAck.add(tag); holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body)); logger.debug("Received Async message: {} buffersize: {} ", new String(body), holdingBuffer.size()); }
/** * Handles a message delivery from the broker. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { LOG.debug("Consumer: {} Received handle delivery", consumerTag); Message message = new Message(properties, body, envelope.getExchange(), envelope.getRoutingKey(), envelope.getDeliveryTag()); LOG.info("Consumer: {} Received message: {}", consumerTag, envelope.getDeliveryTag()); handleMessage(message); _channel.basicAck(envelope.getDeliveryTag(), true); }
@Test public void testMessageCannotBeProcessedBeforeSubmit() throws IOException { long deliveryTag = 1234L; Envelope envelope = mock(Envelope.class); when(envelope.getDeliveryTag()).thenReturn(deliveryTag); when(mockBrokerConfig.getCharset()).thenThrow( new RuntimeException("Something really unexpected happened even before task submit attempt")); try { amqpMessageConsumer.handleDelivery("consumer tag", envelope, null, "some message".getBytes()); verify(amqpAcknowledgementHandler, times(1)).autoReject(); } catch (Exception e) { fail(); } }
@Test public void testRejectFailed() throws IOException { long deliveryTag = 1234L; Envelope envelope = mock(Envelope.class); when(envelope.getDeliveryTag()).thenReturn(deliveryTag); doThrow(new RejectedExecutionException()).when(mockMessageHandler).onMessage(anyString(), any()); doThrow(new RuntimeException()).when(mockChannel).basicReject(eq(deliveryTag), anyBoolean()); try { amqpMessageConsumer.handleDelivery("consumer tag", envelope, null, "some message".getBytes()); verify(amqpAcknowledgementHandler, times(1)).autoReject(); } catch (Exception e) { fail(); } }
@Override public void beginSubscriptionThread() throws InterruptedException, IOException { DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); LOG.info("[RabbitMQEventConsumer {}] Received Message: {}", Thread.currentThread().getId(), new String(body)); Gson gson = new Gson(); @SuppressWarnings("unchecked") final CoordinationEntryEvent<K> event = gson.fromJson(new String(body), CoordinationEntryEvent.class); routeEventToListeners(event); channel.basicAck(deliveryTag, false); } }; consumerTag = channel.basicConsume(queueName, false, consumer); }
@Test public void testConsumerSingleMessage() throws Exception { TransferQueue<RabbitMessage> messages = new LinkedTransferQueue<>(); Channel channel = mock(Channel.class); final Consumer consumer = new StreamSetsMessageConsumer(channel, messages); final Envelope envelope = new Envelope(1L, false, EXCHANGE_NAME, QUEUE_NAME); executor.submit(new Runnable() { @Override public void run() { try { consumer.handleDelivery("consumerTag", envelope, null, TEST_MESSAGE_1.getBytes()); } catch (IOException ignored) { // no op } } }); RabbitMessage message = messages.take(); assertEquals(TEST_MESSAGE_1, new String(message.getBody(), StandardCharsets.UTF_8)); }
public Channel addMsmbHandler(String routingKey, final MsmbMessageHandler handler) { Channel channel; try { channel = this.connection.createChannel(); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, MSMB_EXCHANGE, routingKey); channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); MsmbMessage msmbMessage = MessageBusClient.this.adaptor.buildResource(message, MsmbMessage.class); handler.handleMessage(msmbMessage); } }); } catch (IOException e) { throw new SDKMessageBusException(SDKErrorEnum.messageBusConnectionError, "Could not subscribe to Metric Streaming Message Bus", e); } return channel; }
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties props, final byte[] body) throws IOException { String message = "InvalidMessage:none"; //TODO Requires cleanup try { message = new String(body, "UTF-8"); handler.post(new PostMessageTask(LibretalkMessageData.deserialize(message))); } catch (UnsupportedEncodingException ex) { ex.printStackTrace(); } connection.getChannel().basicAck(envelope.getDeliveryTag(), false); Log.d("libretalk::LibretalkMessageReceiver::MessageConsumer", "Received Msg" + message ); }
@Override public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body ) throws IOException { try { Message message = SerializationUtils.deserializeObject( body ); this.logger.finer( this.sourceName + " received a message " + message.getClass().getSimpleName() + " on routing key '" + envelope.getRoutingKey() + "'."); this.messageQueue.add( message ); } catch( ClassNotFoundException | IOException e ) { this.logger.severe( this.sourceName + ": a message could not be deserialized. => " + e.getClass().getSimpleName()); Utils.logException( this.logger, e ); this.messageQueue.errorWhileReceivingMessage(); } }
@Override protected void doInBeforeTrace(SpanRecorder recorder, Object target, Object[] args) { AMQP.BasicProperties properties = (AMQP.BasicProperties) args[2]; Map<String, Object> headers = properties.getHeaders(); Envelope envelope = (Envelope) args[1]; String exchange=envelope.getExchange(); if (exchange == null || exchange.equals("")) exchange = "unknown"; recorder.recordServiceType(RabbitMQConstants.RABBITMQ_SERVICE_TYPE); recorder.recordEndPoint("exchange:"+exchange); if (headers != null) { Object parentApplicationName = headers.get(RabbitMQConstants.META_PARENT_APPLICATION_NAME); Object parentApplicationType = headers.get(RabbitMQConstants.META_PARENT_APPLICATION_TYPE); if (parentApplicationName != null) { recorder.recordParentApplication(parentApplicationName.toString(), NumberUtils.parseShort(parentApplicationType.toString(), ServiceType.UNDEFINED.getCode())); } } recorder.recordRpcName("rabbitmq://exchange="+exchange); recorder.recordAcceptorHost("exchange-" + exchange); if (isDebug) logger.debug("endPoint=" + envelope.getExchange() + ",=" + exchange); }
@Override protected void doInAfterTrace(SpanRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) { DefaultConsumer consumer = (DefaultConsumer) target; Connection connection = consumer.getChannel().getConnection(); Envelope envelope = (Envelope) args[1]; AMQP.BasicProperties properties = (AMQP.BasicProperties) args[2]; byte[] body = (byte[]) args[3]; String exchange=envelope.getExchange(); if (exchange == null || exchange.equals("")) exchange = "unknown"; recorder.recordApi(methodDescriptor); recorder.recordAttribute(RabbitMQConstants.RABBITMQ_ROUTINGKEY_ANNOTATION_KEY, envelope.getRoutingKey()); recorder.recordRemoteAddress(connection.getAddress().getHostAddress() + ":" + connection.getPort()); if (throwable != null) { recorder.recordException(throwable); } }