public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("localhost"); factory.setPort(5672); Connection newConnection = factory.newConnection(); Channel channel = newConnection.createChannel(); Scanner scanner = new Scanner(; String message = ""; while(!message.equals("exit")){ System.out.println("Enter your message"); message =; channel.queueDeclare("flink-test", true, false, false, null); channel.basicPublish("", "flink-test", new BasicProperties.Builder() .correlationId(java.util.UUID.randomUUID().toString()).build(), message.getBytes()); } scanner.close(); channel.close(); newConnection.close(); }
/**
 * Publishes a single log message to the fanout exchange {@code EXCHANGE_NAME}.
 *
 * <p>The exchange is declared first; the message text comes from {@code getMessage(argv)}
 * and is sent UTF-8 encoded with an empty routing key.
 *
 * @param argv command-line arguments, forwarded to {@code getMessage}
 * @throws Exception if connecting, declaring the exchange, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String message = getMessage(argv);
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
    } finally {
        // Closing the connection also closes its channels when a step above failed.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Publishes one timestamped log message to the fanout exchange {@code EXCHANGE_NAME}.
 *
 * <p>The text returned by {@code getMessage(argv)} is prefixed with the current time formatted
 * as {@code dd MMM yyyy @ HH:mm:ss}, then sent UTF-8 encoded with an empty routing key.
 *
 * @param argv command-line arguments, forwarded to {@code getMessage}
 * @throws Exception if connecting, declaring the exchange, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection conn = factory.newConnection();
    try {
        Channel channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String msg = getMessage(argv);
        SimpleDateFormat sdf = new SimpleDateFormat("dd MMM yyyy @ HH:mm:ss");
        String finalMsg = sdf.format(new Date()) + ": " + msg;

        channel.basicPublish(EXCHANGE_NAME, "", null, finalMsg.getBytes("UTF-8"));
        System.out.println("Emmited message: " + finalMsg);

        channel.close();
    } finally {
        // Closing the connection also closes its channels when a step above failed.
        if (conn.isOpen()) {
            conn.close();
        }
    }
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月11日 下午5:53:02 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 分发信息 for (int i = 0; i < 20; i++) { String message = "Hello RabbitMQ" + i; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("NewTask send '" + message + "'"); } channel.close(); connection.close(); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:49:49 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct");// 注意是direct // 发送信息 for (String routingKey : routingKeys) { String message = "RoutingSendDirect Send the message level:" + routingKey; channel.basicPublish(EXCHANGE_NAME_ROUTING, routingKey, null, message.getBytes()); System.out.println("RoutingSendDirect Send" + routingKey + "':'" + message); } channel.close(); connection.close(); }
private void close(Connection connection, Channel channel) { try { if (channel != null && channel.isOpen()) { if (this.consumerTag != null) { channel.basicCancel(this.consumerTag); this.consumerTag = null; }"Closing RabbitMQ Channel - " + this.serverIP); channel.close(); = null; } if (connection != null && connection.isOpen()) {"Closing RabbitMQ Connection - " + this.serverIP); connection.close(CLOSE_CONNECTION_TIMEOUT); this.connection = null; } } catch (Exception e) { log.error("Failed to close RabbitMQ connections " + this.serverIP, e); } }
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // factory.setHost(""); factory.setUri("amqp://"); factory.setUsername("admin"); // factory.setPassword("admin123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
/**
 * Publishes a single log message to the direct exchange {@code EXCHANGE_NAME}, routed by
 * severity.
 *
 * <p>Severity and message text are obtained from {@code getSeverity(argv)} and
 * {@code getMessage(argv)}; the message is sent UTF-8 encoded.
 *
 * @param argv command-line arguments, forwarded to {@code getSeverity} and {@code getMessage}
 * @throws Exception if connecting, declaring the exchange, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String severity = getSeverity(argv);
        String message = getMessage(argv);
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
    } finally {
        // Closing the connection also closes its channels when a step above failed.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Publishes one persistent text message to the durable queue {@code TASK_QUEUE_NAME} through
 * the default exchange.
 *
 * <p>The message text comes from {@code getMessage(argv)} and is sent UTF-8 encoded.
 *
 * @param argv command-line arguments, forwarded to {@code getMessage}
 * @throws Exception if connecting, declaring the queue, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = getMessage(argv);
        channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
    } finally {
        // Closing the connection also closes its channels when a step above failed.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:37:37 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// fanout表示分发,所有的消费者得到同样的队列信息 // 分发信息 for (int i = 0; i < 5; i++) { String message = "Hello World" + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("EmitLog Sent '" + message + "'"); } channel.close(); connection.close(); }
private void updateTaskStatus(TaskStatus status) {"[Study = " + taskStudy + "] [Unit = "+ unitId + "] Sending task update to server. Task id = [" + task.getId() + "] status = ["+status.toString()+"]"); final String QUEUE_NAME = SystemConstants.UBONGO_SERVER_TASKS_STATUS_QUEUE; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(serverAddress); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); task.setStatus(status); RabbitData message = new RabbitData(task, MachineConstants.UPDATE_TASK_REQUEST); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); if (logger.isDebugEnabled()) { logger.debug(" [!] Sent '" + message.getMessage() + "'"); } channel.close(); connection.close(); } catch (Exception e){ logger.error("[Study = " + taskStudy + "] [Unit = "+ unitId + "] Failed sending task status to server. Task id = [" + task.getId() + "] Status = [" + status.toString() + "] error: " + e.getMessage(), e); } }
/**
 * Consumes messages from the queue {@code TestQueue} with automatic acknowledgement and
 * prints each one, looping until the consumer fails or the thread is interrupted.
 *
 * @param args unused
 * @throws Exception if connecting, declaring the queue or consuming fails
 */
public static void main(String[] args) throws Exception {
    String queueName = "TestQueue";
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(queueName, false, false, false, null);
        System.out.println(" [*] Waiting for messages...");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Explicit charset: decoding must not depend on the platform default.
            String message = new String(delivery.getBody(),
                    java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        }
    } finally {
        // The loop only ends through an exception; release the connection in that case.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Consumes messages from the queue {@code trade.eq.q} one at a time (prefetch 1) while
 * {@code active} is {@code true}.
 *
 * <p>Each message is printed, followed by a one-second pause, and then acknowledged
 * individually. Exceptions are printed and end the loop; the channel is closed in every case.
 * If the thread is interrupted, its interrupt flag is restored before returning.
 *
 * @param connection open connection on which the consuming channel is created
 */
public void start(Connection connection) {
    Channel channel = null;
    try {
        active = true;
        channel = connection.createChannel();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicQos(1);
        channel.basicConsume("trade.eq.q", false, consumer);
        while (active) {
            QueueingConsumer.Delivery msg = consumer.nextDelivery();
            // Explicit charset: decoding must not depend on the platform default.
            System.out.println("message received: "
                    + new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            Thread.sleep(1000);
            channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
        }
    } catch (InterruptedException e) {
        // Preserve the interruption for the caller instead of swallowing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Fetches at most one message from the queue {@code WithoutClientCertQueue} over a TLS
 * connection to {@code localhost:5671}, using automatic acknowledgement.
 *
 * @return the message body decoded as UTF-8, or an empty string when the queue has no message
 * @throws Exception if connecting, fetching or closing fails
 */
private static String consumeWithoutCertificate() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5671);
    // NOTE(review): the no-argument useSslProtocol() does not configure certificate
    // verification here - confirm this is only used against a local test broker.
    factory.useSslProtocol();

    Connection conn = factory.newConnection();
    try {
        Channel channel = conn.createChannel();
        GetResponse chResponse = channel.basicGet("WithoutClientCertQueue", true);

        String result = "";
        if (chResponse != null) {
            // Explicit charset: decoding must not depend on the platform default.
            result = new String(chResponse.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        }

        channel.close();
        return result;
    } finally {
        // Closing the connection also closes its channels when a step above failed.
        if (conn.isOpen()) {
            conn.close();
        }
    }
}
public static void main(String[] args) throws Exception { //建立连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置连接地址 factory.setHost(""); factory.setPort(31084); //获取连接 Connection connection = factory.newConnection(); //获取渠道 Channel channel = connection.createChannel(); //声明交换机类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //产生随机数字 String message = RandomStringUtils.randomNumeric(8); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); channel.close(); connection.close(); }
public static void main(String[] args) throws IOException, TimeoutException { //建立连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置连接地址 factory.setHost(""); factory.setPort(31084); //获取连接 Connection connection = factory.newConnection(); //获取渠道 Channel channel = connection.createChannel(); //声明队列,如果不存在就新建 //参数1队列名称;参数2是否持久化;参数3排他性队列,连接断开自动删除;参数4是否自动删除;参数5.参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送的消息 String message = Thread.currentThread().getName() + "Hello "; //参数1 交换机;参数2 路由键;参数3 基础属性;参数4 消息体 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(Thread.currentThread().getName() + "[send]" + message); channel.close(); connection.close(); }
/** * Open channel */ private Channel openChannel(Connection conn) throws IOException { log.trace("Creating channel..."); Channel channel = conn.createChannel(); log.debug("Created channel: {}", channel); // setup the basicQos if (consumer.getEndpoint().isPrefetchEnabled()) { channel.basicQos(consumer.getEndpoint().getPrefetchSize(), consumer.getEndpoint().getPrefetchCount(), consumer.getEndpoint().isPrefetchGlobal()); } // This really only needs to be called on the first consumer or on // reconnections. if (consumer.getEndpoint().isDeclare()) { consumer.getEndpoint().declareExchangeAndQueue(channel); } return channel; }
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // setRemoteConnectionFactory(factory); setLocalConnectionFactory(factory); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } }
/**
 * RPC server: consumes requests from {@code RPC_QUEUE_NAME} one at a time (prefetch 1),
 * parses each body as an integer {@code n}, and publishes {@code fib(n)} to the request's
 * reply-to queue with the request's correlation id before acknowledging the request.
 *
 * <p>Requests are decoded and replies encoded as UTF-8. The loop only ends through an
 * exception, in which case the connection is closed.
 *
 * @param args unused
 * @throws IOException if connecting, consuming, publishing or closing fails
 * @throws TimeoutException if a broker operation times out
 * @throws ShutdownSignalException if the connection is shut down while waiting for a request
 * @throws ConsumerCancelledException if the consumer is cancelled while waiting for a request
 * @throws InterruptedException if the thread is interrupted while waiting for a request
 */
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
        ConsumerCancelledException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setPort(port);
    factory.setVirtualHost(virtualHost);

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println("RPCServer Awating RPC request");

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String message = new String(delivery.getBody(), "UTF-8");
            int n = Integer.parseInt(message);
            System.out.println("RPCServer fib(" + message + ")");

            String response = "" + fib(n);
            // Encode the reply with the same charset that is used to decode requests.
            channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    } finally {
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午2:57:32 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行多重绑定 for (String severity : routingKeys2) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, severity); System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME_ROUTING + ", queue:" + queueName + ", BindRoutingKey:" + severity); } System.out.println("ReceiveLogsDirect2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:53:18 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE_NAME_ROUTING, "direct"); // 获取匿名队列名称 String queueName = channel.queueDeclare().getQueue(); // 根据路由关键字进行绑定 for (String routingKey : routingKeys1) { channel.queueBind(queueName, EXCHANGE_NAME_ROUTING, routingKey); System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME_ROUTING + "," + " queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsDirect1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsDirect1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:21:46 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = null; Channel channel = null; try { // 创建一个新的连接 connection = factory.newConnection(); // 创建一个通道 channel = connection.createChannel(); // 声明一个队列 // queueDeclare第一个参数表示队列名称 //第二个参数为是否持久化(true表示是,队列将在服务器重启时生存) //第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除) //第四个参数为当所有消费者客户端连接断开时是否自动删除队列 //第五个参数为队列的其他参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "{\"temperature\":100}"; // 发送消息到队列中 //basicPublish第一个参数为交换机名称 //第二个参数为队列映射的路由key //第三个参数为消息的其他属性 //第四个参数为发送信息的主体 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send +'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭通道和连接 channel.close(); connection.close(); } }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月11日 下午5:32:45 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ地址 factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received messages"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery Consumer consumer = new DefaultConsumer(channel) { //envelope主要存放生产者相关信息(比如交换机、路由key等) //body是消息实体 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; // 自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午2:40:52 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");// 对队列进行绑定 System.out.println("ReceiveLogs1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs1 Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);// 队列会自动删除 }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:03:24 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); connection = factory.newConnection(); channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); // 待发送的消息 String[] routingKeys = new String[] { "", "", "", "", "", "", "" }; // 发送消息 for (String severity : routingKeys) { String message = "From " + severity + " routingKey' s message!"; channel.basicPublish(EXCHANGE_NAME_TOPIC, severity, null, message.getBytes()); System.out.println("TopicSend Sent '" + severity + "':'" + message + "'"); } } catch (Exception e) { e.printStackTrace(); if (connection != null) { channel.close(); connection.close(); } } finally { if (connection != null) { channel.close(); connection.close(); } } }
/** * @param args * @throws TimeoutException * @throws IOException * @date 2017年7月13日 下午3:08:40 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换器 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; // 绑定路由关键字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, bindingKey); System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME_TOPIC+", queue:"+queueName+", BindRoutingKey:" + bindingKey); } System.out.println("ReceiveLogsTopic2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/** * @param args * @throws IOException * @throws TimeoutException * @date 2017年7月13日 下午3:06:20 * @writer junehappylove */ public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由关键字 String[] routingKeys = new String[] { "*.orange.*" }; // 绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME_TOPIC, routingKey); System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME_TOPIC + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("ReceiveLogsTopic1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
/**
 * REST endpoint ({@code POST rabbitmqRecv}) that opens a connection as user {@code guest},
 * declares {@code QUEUE_NAME} and subscribes a consumer with automatic acknowledgement. The
 * consumer prints the headers and the UTF-8 decoded body of every message it receives.
 *
 * <p>NOTE(review): each call opens a new connection that is not closed in this method -
 * confirm this is intended.
 *
 * @throws Exception if connecting, declaring the queue or subscribing fails
 */
@POST
@Path("rabbitmqRecv")
public void send() throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setHost("");

    Connection brokerConnection = connectionFactory.newConnection();
    final Channel ch = brokerConnection.createChannel();
    ch.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DefaultConsumer printer = new DefaultConsumer(ch) {
        @Override
        public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props, byte[] payload)
                throws IOException {
            System.out.println(props.getHeaders());
            String text = new String(payload, "UTF-8");
            System.out.println(" [x] Received '" + text + "'");
        }
    };
    ch.basicConsume(QUEUE_NAME, true, printer);
}
/**
 * Publishes a single message to the topic exchange {@code EXCHANGE_NAME}.
 *
 * <p>Routing key and message text are obtained from {@code getRouting(argv)} and
 * {@code getMessage(argv)}. Any failure is printed and not propagated; the connection is
 * closed afterwards on a best-effort basis.
 *
 * @param argv command-line arguments, forwarded to {@code getRouting} and {@code getMessage}
 */
public static void main(String[] argv) {
    Connection conn = null;
    try {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String key = getRouting(argv);
        String text = getMessage(argv);
        byte[] payload = text.getBytes("UTF-8");

        ch.basicPublish(EXCHANGE_NAME, key, null, payload);
        System.out.println(" [x] Sent '" + key + "':'" + text + "'");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (conn != null) {
            try {
                conn.close();
            } catch (Exception ignore) {
                // Best-effort cleanup: a failure while closing is deliberately ignored.
            }
        }
    }
}
/**
 * Publishes the message {@code "Hello World!"}, UTF-8 encoded, to the queue
 * {@code QUEUE_NAME} through the default exchange on {@code localhost}.
 *
 * @param argv unused
 * @throws Exception if connecting, declaring the queue, publishing or closing fails
 */
public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
    } finally {
        // Closing the connection also closes its channels when a step above failed.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Returns the shared RabbitMQ connection, creating it on first use from the settings
 * returned by {@code config()}.
 *
 * <p>On first use the general-purpose channel is created and {@code GatewayInfo} is
 * initialised with it. The shared {@code connection} field is assigned only after both steps
 * succeeded, so callers that pass the unsynchronised null check never observe a connection
 * whose channel is still missing. If initialisation fails, the new connection is aborted and
 * nothing is cached, so a later call starts from scratch.
 *
 * <p>NOTE(review): the unsynchronised first check is only reliable if the {@code connection}
 * field is declared {@code volatile}; the declaration is outside this block - confirm.
 * NOTE(review): this assumes {@code GatewayInfo.init} does not call back into
 * {@code connection()} - confirm.
 *
 * @return the shared, initialised connection
 * @throws IOException if connecting or creating the channel fails
 * @throws TimeoutException if connecting times out
 */
public static Connection connection() throws IOException, TimeoutException {
    if (connection == null) {
        synchronized (GabrielData.class) {
            if (connection != null) {
                return connection;
            }

            Config config = config();
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(config.rabbitMQHost);
            connectionFactory.setPort(config.rabbitMQPort);
            connectionFactory.setUsername(config.rabbitMQUsername);
            connectionFactory.setPassword(config.rabbitMQPassword);

            Connection created = connectionFactory.newConnection();
            boolean initialised = false;
            try {
                generalPurposeChannel = created.createChannel();
                GatewayInfo.init(generalPurposeChannel);
                initialised = true;
            } finally {
                if (!initialised) {
                    // Do not keep a half-initialised connection around.
                    generalPurposeChannel = null;
                    created.abort();
                }
            }
            // Publish last: everything the connection depends on is ready at this point.
            connection = created;
        }
    }
    return connection;
}
protected void connect() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); channel = connection.createChannel(); String queueName = "flowing-retail-" + name; channel.queueDeclare(queueName, true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); // publish/subscribe model channel.queueBind(queueName, EXCHANGE_NAME, "*"); System.out.println(" [*] Waiting for messages."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); eventHandler.handleEvent(message); } }; channel.basicConsume(queueName, true, consumer); }
/**
 * Publishes the message {@code "Hello World!"} to the queue {@code TestQueue} through the
 * default exchange.
 *
 * @param args unused
 * @throws Exception if connecting, declaring the queue, publishing or closing fails
 */
public static void main(String[] args) throws Exception {
    String queueName = "TestQueue";
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("");

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.queueDeclare(queueName, false, false, false, null);

        String message = "Hello World!";
        // Explicit charset: the payload must not depend on the platform default.
        channel.basicPublish("", queueName, null,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
    } finally {
        // Closing the connection also closes its channels when a step above failed.
        if (connection.isOpen()) {
            connection.close();
        }
    }
}
/**
 * Creates a RabbitMQ connection from an AMQP URI found in the process environment.
 *
 * <p>The URI is looked up in this order, taking the first non-blank value: environment
 * variable {@code AMQP_URL}, environment variable {@code RABBITMQ_URL}, system property
 * {@code env.AMQP_URL}, system property {@code env.RABBITMQ_URL}. The URI is applied to a new
 * {@code ConnectionFactory}, which then opens the connection.
 *
 * <p>NOTE(review): the statement at the very start of the body has lost its leading call
 * (only the message string and the closing parenthesis remain); it looks like a logging
 * call - restore it from the original source.
 *
 * @return a new {@code Connection} opened with the URI that was found
 * @throws RuntimeException if no URI is configured, or wrapping any failure raised while
 *         applying the URI or opening the connection
 */
public Connection getConnection() {"Executing lookup of RabbitMQ configuration from environment"); String uri = System.getenv("AMQP_URL"); uri = StringUtils.isBlank(uri) ? System.getenv("RABBITMQ_URL") : uri; uri = StringUtils.isBlank(uri) ? System.getProperty("env.AMQP_URL") : uri; uri = StringUtils.isBlank(uri) ? System.getProperty("env.RABBITMQ_URL") : uri; if (StringUtils.isBlank(uri)) { throw new RuntimeException("Unable to find RabbitMQ configuration."); } // create and return the connection try { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(uri); return factory.newConnection(); } catch (IOException | NoSuchAlgorithmException | URISyntaxException | TimeoutException | KeyManagementException e) { throw new RuntimeException(e.getMessage(), e); } }
/**
 * Health check that verifies the configured queue exists on the broker.
 *
 * <p>A new connection and channel are opened and the queue is declared passively. Any failure
 * is wrapped in a {@code ContextedRuntimeException} carrying the queue name and a reflective
 * dump of the connection factory, logged, and reported as unhealthy. The channel and the
 * connection are handed to {@code closeChannel} / {@code closeConnection} in every case.
 *
 * @return a healthy result when the passive declaration succeeds, otherwise an unhealthy
 *         result wrapping the failure
 * @throws Exception declared by the overridden method
 */
@Override
protected Result check() throws Exception {
    Connection probeConnection = null;
    Channel probeChannel = null;
    try {
        probeConnection = connectionFactory.newConnection();
        probeChannel = probeConnection.createChannel();
        probeChannel.queueDeclarePassive(queueName);
        return Result.healthy();
    } catch (Exception e) {
        ContextedRuntimeException contexted = new ContextedRuntimeException(e);
        contexted.addContextValue("queueName", queueName);
        contexted.addContextValue("connectionFactory",
                ToStringBuilder.reflectionToString(connectionFactory));
        Exception wrappedException = contexted;
        logger.error("Healthcheck Failure", wrappedException);
        return Result.unhealthy(wrappedException);
    } finally {
        closeChannel(probeChannel);
        closeConnection(probeConnection);
    }
}
default Connection createFromRabbitMqConfig(RabbitMqConfig config){"Trying to connect to RabbitMq '{}:{}'.",, config.port()); ConnectionFactory factory = new ConnectionFactory(); factory.setHost(; factory.setPort(config.port()); if (StringUtils.isNotBlank(config.login())) { factory.setUsername(config.login()); factory.setPassword(config.password()); LOGGER.debug("Using login : {} [{}]", config.login(), config.password()); } factory.setVirtualHost(config.virtualHost()); LOGGER.debug("Use virtualhost {}", config.virtualHost()); try { //ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); return factory.newConnection(); } catch (IOException e) { throw new RuntimeException("Unable to create a consumeConnection to Rabbit " + + ":" + config.port(), e); } }
/**
 * Registers a callback that is invoked through a {@code ConnectHandler}.
 *
 * <p>Registration is rejected with a {@code RuntimeException("Already connected")} once
 * {@code connection} is non-null. Otherwise a handler list for {@code ConnectHandler.class}
 * is created in {@code handlers} if none exists yet, and a new {@code ConnectHandler} that
 * forwards its argument to {@code _handler.accept} is appended to it.
 *
 * <p>NOTE(review): the type argument of {@code Consumer<>} and the parameter type of
 * {@code handle(...)} are missing from this snippet - restore them from the original source.
 *
 * @param _handler callback receiving the value passed to the connect handler
 * @return this client, for call chaining
 * @throws RuntimeException if a connection already exists
 */
@Override public Client onConnected(Consumer<> _handler) { if (connection != null) { throw new RuntimeException("Already connected"); } if (!handlers.containsKey(ConnectHandler.class)) { handlers.put(ConnectHandler.class, new LinkedList<>()); } handlers.get(ConnectHandler.class) .add(new ConnectHandler() { @Override public void handle( _client) { _handler.accept(_client); } }); return this; }