/**
 * Starts an active consumer-side {@code "receive"} span that follows from the context resolved
 * for a delivered message.
 *
 * @param props properties of the delivered message, passed to {@code TracingUtils.extract}
 * @param tracer tracer used both to resolve the parent context and to build the span
 * @return the scope of the newly started span, or {@code null} when no parent context was found
 */
static Scope buildChildSpan(AMQP.BasicProperties props, Tracer tracer) {
  SpanContext parentContext = TracingUtils.extract(props, tracer);
  if (parentContext == null) {
    // Nothing to follow from: no span is started.
    return null;
  }

  Scope childScope = tracer.buildSpan("receive")
      .ignoreActiveSpan()
      .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
      .addReference(References.FOLLOWS_FROM, parentContext)
      .startActive(true);
  SpanDecorator.onResponse(childScope.span());
  return childScope;
}
private AMQP.BasicProperties inject(AMQP.BasicProperties properties, Span span) { // Headers of AMQP.BasicProperties is unmodifiableMap therefore we build new AMQP.BasicProperties // with injected span context into headers Map<String, Object> headers = new HashMap<>(); tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new HeadersMapInjectAdapter(headers)); if (properties == null) { return new AMQP.BasicProperties().builder().headers(headers).build(); } if (properties.getHeaders() != null) { headers.putAll(properties.getHeaders()); } return properties.builder() .headers(headers) .build(); }
public ListenableFuture<?> publishAsync(final Exchange exchange, final Message message, final @Nullable BasicProperties properties, final @Nullable Publish publish) { // NOTE: Serialization must happen synchronously, because getter methods may not be thread-safe final String payload = gson.toJson(message); final AMQP.BasicProperties finalProperties = getProperties(message, properties); final Publish finalPublish = Publish.forMessage(message, publish); if(this.executorService == null) throw new IllegalStateException("Not connected"); return this.executorService.submit(new Runnable() { @Override public void run() { try { publish(exchange, payload, finalProperties, finalPublish); } catch(Throwable e) { logger.log(Level.SEVERE, "Unhandled exception publishing message type " + finalProperties.getType(), e); } } }); }
/**
 * Converts one RabbitMQ delivery into a Kafka Connect {@code SourceRecord}.
 *
 * <p>The source partition is keyed by the routing key and the source offset by the delivery tag.
 * The record timestamp is the message timestamp when present, otherwise the current time from
 * {@code this.time}.
 *
 * @param consumerTag consumer tag of the delivery
 * @param envelope delivery envelope
 * @param basicProperties AMQP properties of the message
 * @param bytes message body
 * @return the record for the topic computed from the configured topic template
 */
SourceRecord sourceRecord(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) {
  final Struct recordKey = MessageConverter.key(basicProperties);
  final Struct recordValue = MessageConverter.value(consumerTag, envelope, basicProperties, bytes);
  final String topic = this.config.kafkaTopic.execute(RabbitMQSourceConnectorConfig.KAFKA_TOPIC_TEMPLATE, recordValue);

  final ImmutableMap<String, String> sourcePartition = ImmutableMap.of("routingKey", envelope.getRoutingKey());
  final ImmutableMap<String, Long> sourceOffset = ImmutableMap.of("deliveryTag", envelope.getDeliveryTag());

  final long timestamp;
  if (null == basicProperties.getTimestamp()) {
    timestamp = this.time.milliseconds();
  } else {
    timestamp = basicProperties.getTimestamp().getTime();
  }

  return new SourceRecord(
      sourcePartition,
      sourceOffset,
      topic,
      null,
      recordKey.schema(),
      recordKey,
      recordValue.schema(),
      recordValue,
      timestamp
  );
}
/**
 * Creates the gateway for a shard and wires up a ping echo: every body received on
 * {@code shard-<id>-getping} is published unchanged to {@code shard-<id>-getping-response}.
 *
 * @param shardId id of the shard, used to build the queue names
 * @param channel channel on which the queues are declared and the consumer is registered
 * @throws IOException if declaring a queue or registering the consumer fails
 */
GabrielGateway(int shardId, Channel channel) throws IOException {
    super(shardId, channel);

    final String pingQueue = "shard-" + shardId + "-getping";
    final String pingResponseQueue = "shard-" + shardId + "-getping-response";

    channel.queueDeclare(pingQueue, false, false, false, null);
    channel.queueDeclare(pingResponseQueue, false, false, false, null);

    DefaultConsumer echoConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            // Echo the request body back through the default exchange.
            channel.basicPublish("", pingResponseQueue, null, body);
        }
    };
    channel.basicConsume(pingQueue, true, echoConsumer);
}
@Override @SuppressWarnings("unchecked") public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // decode internal message InternalMessage msg = JSONs.decodeInternalMessage(body); // notify listener if (msg != null) { logger.debug("Communicator received: Received {} message for client {} user {}", msg.getMessageType(), msg.getClientId(), msg.getUserName()); switch (msg.getMessageType()) { case PUBLISH: listener.onPublish((InternalMessage<Publish>) msg); break; case DISCONNECT: listener.onDisconnect((InternalMessage<Disconnect>) msg); break; default: logger.warn("Communicator error: Communicator received unexpected message type {}", msg.getMessageType()); } } }
/**
 * Runs the RPC server: consumes requests from {@code RPC_QUEUE_NAME}, computes {@code fib(n)} for
 * the integer in each request body, publishes the result to the request's reply-to queue with the
 * request's correlation id, and then acknowledges the request. Loops until an exception is thrown.
 *
 * @param args unused
 * @throws IOException if a broker operation fails
 * @throws TimeoutException if opening the connection times out
 * @throws ShutdownSignalException declared for {@code consumer.nextDelivery()}
 * @throws ConsumerCancelledException declared for {@code consumer.nextDelivery()}
 * @throws InterruptedException if interrupted while waiting for the next delivery
 */
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    // Prefetch of one: the next request arrives only after the current one is acknowledged.
    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    System.out.println("RPCServer Awating RPC request");

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        BasicProperties props = delivery.getProperties();
        BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(props.getCorrelationId())
                .build();

        String message = new String(delivery.getBody(), "UTF-8");
        int n = Integer.parseInt(message);
        System.out.println("RPCServer fib(" + message + ")");
        String response = "" + fib(n);

        // Encode the reply with the same charset used to decode the request; the no-argument
        // getBytes() would fall back to the platform default charset.
        channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午2:57:32 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys2) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity); System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName + ", BindRoutingKey:" + severity); } System.out.println("ReceiveLogsDirect2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:53:18 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行绑定 for (String routingKey : routingKeys1) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey); System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsDirect1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/**
 * Starts tracking gateway information published on the {@code gateway-info} queue.
 *
 * <p>{@code current} is first set to a placeholder ("unknown" strings, -1 numbers) and is then
 * replaced each time a well-formed JSON message arrives. Messages that are not valid JSON or
 * lack an expected field are logged and leave the previous value in place.
 *
 * @param channel channel used to declare the queue and register the consumer
 * @throws IOException if declaring the queue or registering the consumer fails
 * @throws TimeoutException declared by the signature; nothing in this method visibly throws it
 * @throws IllegalStateException if {@code current} is already set
 */
public static synchronized void init(Channel channel) throws IOException, TimeoutException {
    if(current != null) {
        throw new IllegalStateException("Already started");
    }
    current = new GatewayInfo("unknown", "unknown", -1, -1, -1, -1, -1, -1, -1);
    channel.queueDeclare("gateway-info", false, false, false, null);
    channel.basicConsume("gateway-info", true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            try {
                // Parsing happens inside the try so that a malformed payload or a missing
                // "ram" object is reported by the handler below instead of escaping it.
                JSONObject object = new JSONObject(new String(body, StandardCharsets.UTF_8));
                JSONObject ram = object.getJSONObject("ram");
                current = new GatewayInfo(
                        object.getString("version"),
                        object.getString("jda-version"),
                        object.getDouble("cpu-usage"),
                        object.getInt("thread-count"),
                        object.getLong("uptime"),
                        ram.getLong("used"),
                        ram.getLong("free"),
                        ram.getLong("total"),
                        ram.getLong("max")
                );
            } catch(JSONException e) {
                GabrielBot.LOGGER.error("Error creating GatewayInfo: " + e.getMessage());
            }
        }
    });
}
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:40:52 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定 System.out.println("ReceiveLogs1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs1 Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);// 队列会自动删除 }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:08:40 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; // 绑定路由关键字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey); } System.out.println("ReceiveLogsTopic2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午3:06:20 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[] { "*.orange.*" }; // 绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey); System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsTopic1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/**
 * Declares the per-shard rabbitmq queues and registers the {@code shard-<id>-receive} consumer.
 *
 * Subclasses that want to change how messages are sent/received can override this method to disable the default implementation.
 *
 * @throws IOException if there's an error on {@link Channel#queueDeclare(String, boolean, boolean, boolean, Map)} or {@link Channel#basicConsume(String, boolean, Consumer)}.
 */
protected void configure() throws IOException {
    final String queuePrefix = "shard-" + shardId;
    final String[] queueSuffixes = {"-send", "-receive", "-available", "-unavailable"};
    for (String suffix : queueSuffixes) {
        channel.queueDeclare(queuePrefix + suffix, true, false, false, null);
    }

    DefaultConsumer receiveConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            ClientWebSocketClient socket = client.get();
            if (socket == null) {
                // No client is available yet; the delivery is dropped.
                return;
            }

            JSONObject payload = new JSONObject(new String(body, StandardCharsets.UTF_8));

            boolean isPingUpdate = payload.has("t") && payload.getString("t").equals("gateway-ping-update");
            if (isPingUpdate) {
                // Ping updates only refresh the stored ping; they are not dispatched as events.
                socket.getJDA().setPing(payload.getJSONObject("d").getLong("ping"));
                return;
            }

            if (enableRawGatewayEvent) {
                // The raw event is given its own copy of the payload.
                socket.getJDA().getEventManager().handle(new RawGatewayEvent(socket.getJDA(), new JSONObject(payload.toString())));
            }
            socket.handleEvent(payload);
        }
    };
    channel.basicConsume(queuePrefix + "-receive", true, receiveConsumer);
}
/** * 接受消息 */ public void receiveQuickstartMessage() throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); AMQP.Queue.DeclareOk declareOk = channel.getChannel().queueDeclare(QUICKSTART_QUEUE_NAME, false, false, false, null); System.out.println("等待接受队列【" + QUICKSTART_QUEUE_NAME + "】消息"); //建立一个消费者 监听消息的接受 Consumer consumer = new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息:" + message); } }; channel.getChannel().basicConsume(QUICKSTART_QUEUE_NAME, true, consumer); //channel.close(); }
/**
 * Sends an RPC request and prints the reply whose correlation id matches the request.
 *
 * <p>A server-named reply queue is declared, the request is published to {@code RPC_QUEUE_NAME}
 * with that queue as reply-to, and a consumer is registered on the reply queue.
 *
 * @param message message content; must not be null
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared by the signature; presumably raised while opening the channel
 */
public void client(String message) throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    String replyQueueName = channel.getChannel().queueDeclare().getQueue();
    String corrId = UUID.randomUUID().toString();
    AMQP.BasicProperties props = new AMQP.BasicProperties
            .Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();
    System.out.println("rpc客户端发送消息:" + message);
    channel.getChannel().basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
    channel.getChannel().basicConsume(replyQueueName, true, new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            // Compare from the known non-null side: a reply without a correlation id is
            // ignored instead of raising a NullPointerException.
            if (corrId.equals(properties.getCorrelationId())) {
                System.out.println("rpc客户端收到结果:" + new String(body, "UTF-8") + "\n");
            }
        }
    });
}
/**
 * Receives messages published to the fanout exchange {@code EXCHANGE_NAME} through a
 * server-named queue and prints each one.
 *
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared by the signature; presumably raised while opening the channel
 */
public void receivePubsubMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().exchangeDeclare(EXCHANGE_NAME, "fanout");

    String boundQueue = channel.getChannel().queueDeclare().getQueue();
    channel.getChannel().queueBind(boundQueue, EXCHANGE_NAME, "");

    System.out.println("等待接受pusub订阅【" + EXCHANGE_NAME + "】消息");
    System.out.println("选择队列:"+boundQueue);

    Consumer printingConsumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String text = new String(body, "UTF-8");
            System.out.println("接受消息:" + text + "'");
        }
    };
    channel.getChannel().basicConsume(boundQueue, true, printingConsumer);
}
/**
 * Receives messages from the topic exchange {@code TOPIC} through a server-named queue bound
 * with the key {@code "bindingKey"} and prints each one with its routing key.
 *
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared by the signature; presumably raised while opening the channel
 */
public void receiveTopicMessage() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().exchangeDeclare(TOPIC, "topic");

    String boundQueue = channel.getChannel().queueDeclare().getQueue();
    channel.getChannel().queueBind(boundQueue, TOPIC, "bindingKey");

    System.out.println("等待接受topic主题【" + TOPIC + "】消息");
    System.out.println("选择队列:" + boundQueue);

    Consumer printingConsumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String text = new String(body, "UTF-8");
            System.out.println("接受消息:" + envelope.getRoutingKey() + "':'" + text + "'");
        }
    };
    channel.getChannel().basicConsume(boundQueue, true, printingConsumer);
}
@Test public void filterMessage() throws Exception { RabbitMqMessageScheme rabbitMqMessageScheme = new SingleStreamRabbitMqMessageScheme() { @Override public void prepare(Map config, TopologyContext context) { // no operation } @Override public List<Object> convertToTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws Exception { return null; } @Override public Fields getOutputFields() { return new Fields("stringField"); } @Override public void cleanup() { // no operation } }; StreamedTuple streamedTuple = rabbitMqMessageScheme.convertToStreamedTuple(null, null, null); assertNull(streamedTuple); }
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Serializer serializer = subscriber.getSerializer(); // Check if we have a serializer, otherwise just ignore the message if (serializer != null) { try { Message message = serializer.deserialize(body); this.getChannel().basicAck(envelope.getDeliveryTag(), false); subscriber.receive(message); } catch (ClassNotFoundException e) { // Reject the message since we cannot do anything useful with it this.getChannel().basicNack(envelope.getDeliveryTag(), false, false); subscriber.getLogger().warn("Received message of unknown type, message dropped", e); } } }
/**
 * Sends a new message to the given queue through the default exchange.
 *
 * <p>Connects to the broker first when no channel exists. Side effect: on success the reply
 * queue name is stored in {@code replyMqName} on this object.
 *
 * @param mqName name of the target queue, used as the routing key
 * @param message JSON message; its string form is sent UTF-8 encoded
 * @param replyMq name of the queue set as reply-to
 * @return 1 on success, 0 if any exception occurred (the stack trace is printed)
 */
public int send(String mqName, JSONObject message, String replyMq){
    try {
        if (channel == null) {
            connectToBroker();
        }

        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                .replyTo(replyMq)
                .build();
        byte[] payload = message.toString().getBytes("UTF-8");

        channel.basicPublish(defaultExchange, mqName, replyProps, payload);
        replyMqName = replyMq;
        return 1;
    } catch(Exception e){
        e.printStackTrace();
        return 0;
    }
}
/**
 * Sends a new message with custom headers to the given queue through the default exchange.
 *
 * <p>Connects to the broker first when no channel exists. Side effect: on success the reply
 * queue name is stored in {@code replyMqName} on this object.
 *
 * @param mqName name of the target queue, used as the routing key
 * @param message JSON message; its string form is sent UTF-8 encoded
 * @param replyMq name of the queue set as reply-to
 * @param headersMap header fields to set on the message
 * @return 1 on success, 0 if any exception occurred (the stack trace is printed)
 */
public int sendWithHeader(String mqName, JSONObject message, String replyMq, Map<String, Object> headersMap){
    try {
        if (channel == null) {
            connectToBroker();
        }

        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                .replyTo(replyMq)
                .headers(headersMap)
                .build();
        byte[] payload = message.toString().getBytes("UTF-8");

        channel.basicPublish(defaultExchange, mqName, replyProps, payload);
        replyMqName = replyMq;
        return 1;
    } catch(Exception e){
        e.printStackTrace();
        return 0;
    }
}
/**
 * Handles POST {@code rabbitmqRecv}: opens a connection to the local broker with the guest
 * account and registers an auto-ack consumer on {@code QUEUE_NAME} that prints the headers and
 * body of each message.
 *
 * <p>NOTE(review): every request opens a new connection and consumer, and none is closed in
 * this method — confirm this is intended.
 *
 * @throws Exception if connecting or any broker operation fails
 */
@POST
@Path("rabbitmqRecv")
public void send() throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setHost("127.0.0.1");

    Connection conn = connectionFactory.newConnection();
    Channel ch = conn.createChannel();
    ch.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DefaultConsumer printingConsumer = new DefaultConsumer(ch) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(properties.getHeaders());
            String text = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + text + "'");
        }
    };
    ch.basicConsume(QUEUE_NAME, true, printingConsumer);
}
/**
 * Delegates the delivery to the wrapped consumer inside a child span built from the message
 * properties.
 *
 * <p>{@code TracingUtils.buildChildSpan} may return {@code null}; try-with-resources skips
 * closing a null resource, so the delegate still runs and nothing is closed in that case.
 */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  try (Scope childScope = TracingUtils.buildChildSpan(properties, tracer)) {
    consumer.handleDelivery(consumerTag, envelope, properties, body);
  }
}
/**
 * Resolves the span context to use as parent for a received message.
 *
 * <p>The context carried in the message headers is preferred; when none can be extracted, the
 * context of the tracer's active span is used.
 *
 * @param props message properties; may be {@code null}, in which case the adapter receives
 *     {@code null} headers exactly as it would for properties without headers
 * @param tracer tracer used for extraction and active-span lookup
 * @return the resolved context, or {@code null} if neither source provides one
 */
public static SpanContext extract(AMQP.BasicProperties props, Tracer tracer) {
  // Guard against null properties, as buildSpan does on the publishing side.
  SpanContext headerContext = tracer
      .extract(Format.Builtin.TEXT_MAP,
          new HeadersMapExtractAdapter(props == null ? null : props.getHeaders()));
  if (headerContext != null) {
    return headerContext;
  }

  Span activeSpan = tracer.activeSpan();
  if (activeSpan != null) {
    return activeSpan.context();
  }
  return null;
}
/**
 * Publishes through the wrapped channel inside a span built by {@code buildSpan}, sending
 * properties that carry that span's context instead of the caller's properties.
 *
 * <p>The scope is closed when the publish returns or throws.
 */
@Override
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException {
  try (Scope sendScope = buildSpan(exchange, props)) {
    Span sendSpan = sendScope.span();
    AMQP.BasicProperties tracedProps = inject(props, sendSpan);
    channel.basicPublish(exchange, routingKey, mandatory, immediate, tracedProps, body);
  }
}
private Scope buildSpan(String exchange, AMQP.BasicProperties props) { Tracer.SpanBuilder spanBuilder = tracer.buildSpan("send") .ignoreActiveSpan() .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER); SpanContext spanContext = null; if (props != null && props.getHeaders() != null) { // just in case if span context was injected manually to props in basicPublish spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new HeadersMapExtractAdapter(props.getHeaders())); } if (spanContext == null) { Span parentSpan = tracer.activeSpan(); if (parentSpan != null) { spanContext = parentSpan.context(); } } if (spanContext != null) { spanBuilder.asChildOf(spanContext); } Scope scope = spanBuilder.startActive(true); SpanDecorator.onRequest(exchange, scope.span()); return scope; }
/**
 * Builds the value struct for a delivery from its consumer tag, envelope, properties and body.
 *
 * @param consumerTag consumer tag of the delivery
 * @param envelope delivery envelope, converted with {@code envelope(...)}
 * @param basicProperties message properties, converted with {@code basicProperties(...)}
 * @param body raw message body
 * @return a struct using {@code SCHEMA_VALUE}
 */
static Struct value(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) {
  final Struct message = new Struct(SCHEMA_VALUE);
  message.put(FIELD_MESSAGE_CONSUMERTAG, consumerTag);
  message.put(FIELD_MESSAGE_ENVELOPE, envelope(envelope));
  message.put(FIELD_MESSAGE_BASICPROPERTIES, basicProperties(basicProperties));
  message.put(FIELD_MESSAGE_BODY, body);
  return message;
}
@Override @SuppressWarnings("unchecked") public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // decode internal message InternalMessage msg = JSONs.decodeInternalMessage(body); // notify listener if (msg != null) { logger.debug("Communicator received: Received {} message from broker {} for client {} user {}", msg.getMessageType(), msg.getBrokerId(), msg.getClientId(), msg.getUserName()); switch (msg.getMessageType()) { case CONNECT: this.listener.onConnect((InternalMessage<Connect>) msg); break; case PUBLISH: this.listener.onPublish((InternalMessage<Publish>) msg); break; case SUBSCRIBE: this.listener.onSubscribe((InternalMessage<Subscribe>) msg); break; case UNSUBSCRIBE: this.listener.onUnsubscribe((InternalMessage<Unsubscribe>) msg); break; case DISCONNECT: this.listener.onDisconnect((InternalMessage<Disconnect>) msg); break; default: logger.warn("Communicator error: Communicator received unexpected message type {}", msg.getMessageType()); } } }
/**
 * Sends an RPC request and blocks until the reply with the matching correlation id arrives.
 *
 * <p>Replies with a different or missing correlation id are skipped.
 *
 * @param message request text, sent UTF-8 encoded to {@code requestQueueName}
 * @return the body of the matching reply decoded as UTF-8
 * @throws IOException if publishing fails
 * @throws InterruptedException if interrupted while waiting for a delivery
 */
public String call(String message) throws IOException, InterruptedException {
    String corrID = UUID.randomUUID().toString();
    // Use the builder directly rather than instantiating throwaway properties first.
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .correlationId(corrID)
            .replyTo(replyQueueName)
            .build();
    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // Compare from the known non-null side so a reply without a correlation id is
        // skipped instead of raising a NullPointerException.
        if (corrID.equals(delivery.getProperties().getCorrelationId())) {
            return new String(delivery.getBody(), "UTF-8");
        }
    }
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:55:38 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Worker1 Waiting for messages"); // 每次从队列获取的数量 //channel.basicQos(1);保证一次只分发一个 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Worker1 Received '" + message + "'"); try { //throw new Exception(); doWork(message); } catch (Exception e) { channel.abort(); } finally { System.out.println("Worker1 Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //autoAck是否自动回复, //如果为true的话,每次生产者只要发送信息就会从内存中删除, //那么如果消费者程序异常退出,那么就无法获取数据, //我们当然是不希望出现这样的情况,所以才去手动回复, //每当消费者收到并处理信息然后在通知生成者。 //最后从队列中删除这条信息。 //如果消费者异常退出,如果还有其他消费者, //那么就会把队列中的消息发送给其他消费者, //如果没有,等消费者启动时候再次发送。 boolean autoAck = false; // 消息消费完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:55:38 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Worker2 Waiting for messages"); // 每次从队列获取的数量 //channel.basicQos(1);保证一次只分发一个 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Worker2 Received '" + message + "'"); try { //throw new Exception(); doWork(message); } catch (Exception e) { channel.abort(); } finally { System.out.println("Worker2 Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //autoAck是否自动回复, //如果为true的话,每次生产者只要发送信息就会从内存中删除, //那么如果消费者程序异常退出,那么就无法获取数据, //我们当然是不希望出现这样的情况,所以才去手动回复, //每当消费者收到并处理信息然后在通知生成者。 //最后从队列中删除这条信息。 //如果消费者异常退出,如果还有其他消费者, //那么就会把队列中的消息发送给其他消费者, //如果没有,等消费者启动时候再次发送。 boolean autoAck = false; // 消息消费完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); }
/**
 * Publishes a request to another service and blocks until a reply is available.
 *
 * <p>A fresh correlation id is generated per call and also stored in {@code this.corrId}. The
 * consumer registered on the reply queue for this call is cancelled before returning, so
 * repeated calls do not accumulate consumers on that queue.
 *
 * @param data request body
 * @param routingKey routing key used when publishing to {@code this.exchange}
 * @return the reply taken from {@code response}, or {@code null} if an exception occurred or
 *     the thread was interrupted (the failure is logged)
 */
public String rabbitRPCRoutingKeyExchange(byte[] data, String routingKey){
    // A final local copy pins this call's id for the consumer below, independent of later
    // reassignments of the field.
    final String requestId = UUID.randomUUID().toString();
    this.corrId = requestId;
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(requestId).replyTo(replyQueueName).build();

    String consumerTag = null;
    try {
        channel.basicPublish(this.exchange, routingKey, props, data);
        consumerTag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Null-safe: replies without a correlation id are ignored.
                if (requestId.equals(properties.getCorrelationId())) {
                    boolean b = response.offer(new String(body, ENCODE));
                    Log
                            .forContext("responseStatus",b)
                            .forContext("Service", "web-service")
                            .information("rabbit message handled status ");
                }
            }
        });
        return response.take();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
        Log
                .forContext("MemberName", "rabbitRPCRoutingKeyExchange")
                .forContext("Service", "web-service")
                .error(e,"Exception");
    } catch (Exception e) {
        Log
                .forContext("MemberName", "rabbitRPCRoutingKeyExchange")
                .forContext("Service", "web-service")
                .error(e,"Exception");
    } finally {
        if (consumerTag != null) {
            try {
                channel.basicCancel(consumerTag);
            } catch (Exception e) {
                // Best-effort cleanup: a failed cancel must not replace the call's result.
                Log
                        .forContext("MemberName", "rabbitRPCRoutingKeyExchange")
                        .forContext("Service", "web-service")
                        .error(e,"Exception");
            }
        }
    }
    return null;
}
/**
 * Sends a message to the quickstart queue through the default exchange.
 *
 * <p>The body is encoded as UTF-8, the charset {@code receiveQuickstartMessage} uses to decode
 * it. The channel is closed even when declaring or publishing fails.
 *
 * @param message message content; must not be null
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared by the signature; presumably raised while opening or
 *     closing the channel
 */
public void sendQuickstartMessage(String message) throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    try {
        System.out.println(channel);
        channel.getChannel().queueDeclare(QUICKSTART_QUEUE_NAME, false, false, false, null);
        // Explicit charset: the no-argument getBytes() would use the platform default.
        channel.getChannel().basicPublish("", QUICKSTART_QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("消息发送成功:" + message);
    } finally {
        channel.close();
    }
}
/**
 * Starts the RPC server side: consumes requests from {@code RPC_QUEUE_NAME} one at a time and
 * replies to each request's reply-to queue with the request text followed by a random UUID.
 *
 * <p>The reply is published and the request acknowledged in a {@code finally} block, so a reply
 * (possibly empty) is sent even when processing raises a runtime exception.
 *
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared by the signature; presumably raised while opening the channel
 */
public void service() throws IOException, TimeoutException {
    RabbitMQChannel channel = new RabbitMQChannel().channel();
    channel.getChannel().queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    channel.getChannel().basicQos(1);
    System.out.println("等待rpc客户端连接...");

    Consumer requestConsumer = new DefaultConsumer(channel.getChannel()) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            // The reply carries the request's correlation id.
            AMQP.BasicProperties replyProperties = new AMQP.BasicProperties.Builder()
                    .correlationId(properties.getCorrelationId())
                    .build();

            String reply = "";
            try {
                String request = new String(body, "UTF-8");
                System.out.println("服务端接受到消息:" + request);
                reply = request + UUID.randomUUID().toString();
            } catch (RuntimeException e) {
                e.printStackTrace();
            } finally {
                channel.getChannel().basicPublish("", properties.getReplyTo(), replyProperties, reply.getBytes("UTF-8"));
                channel.getChannel().basicAck(envelope.getDeliveryTag(), false);
                System.out.println("服务端将处理结果:" + reply + ",返回客户单\n");
            }
        }
    };
    channel.getChannel().basicConsume(RPC_QUEUE_NAME, false, requestConsumer);
}
/** * 接受工作队列消息 */ public void receiveWorkQueueMessage() throws IOException, TimeoutException { RabbitMQChannel channel = new RabbitMQChannel().channel(); channel.getChannel().queueDeclare(WORK_QUEUE_NAME, true, false, false, null); channel.getChannel().basicQos(1); System.out.println("等待接受workQueue队列【"+WORK_QUEUE_NAME+"】消息"); //建立一个消费者 监听消息的接受 DefaultConsumer consumer = new DefaultConsumer(channel.getChannel()) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受workQueue消息:" + message); try { for (char ch : message.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } finally { getChannel().basicAck(envelope.getDeliveryTag(), false); } } }; channel.getChannel().basicConsume(WORK_QUEUE_NAME, false, consumer); }