private void connect() throws IOException { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-queue-mode", "lazy"); // we want to minimize memory usage; see boolean lazys = lazy.get(); try {, true, false, false, lazys ? arguments : null); } catch (AlreadyClosedException e) { lazys = !lazys; try { channel = connection.createChannel(); // may happen if a queue was previously not declared "lazy". So we try non-lazy queue setting now., true, false, false, lazys ? arguments : null); // if this is successfull, set the new lazy value lazy.set(lazys); } catch (AlreadyClosedException ee) { throw new IOException(ee.getMessage()); } } }
@Test public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception { AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class); RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3)); Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1); Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); Mockito.when(conn.createChannel()).thenReturn(channel); Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG"); Mockito.when(channel.isOpen()).thenReturn(false); Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG"); Mockito.doThrow(alreadyClosedException).when(channel).close(); consumer.doStart(); consumer.doStop(); Mockito.verify(conn).close(30 * 1000); }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:rabbitMQ") .id("producingRoute") .onException(AlreadyClosedException.class, ConnectException.class) .maximumRedeliveries(10) .redeliveryDelay(500L) .end() .log("Sending message") .inOnly(rabbitMQEndpoint) .to(producingMockEndpoint); from(rabbitMQEndpoint) .id("consumingRoute") .log("Receiving message") .to(consumingMockEndpoint); } }; }
/** * Helper method to close connection in RMQChannel * * @throws ShutdownSignalException * @throws IOException * @throws AlreadyClosedException */ public void closeConnection() throws ShutdownSignalException, IOException, AlreadyClosedException{ if (connection != null) { try { channel.close(); } catch (Exception e) { //ignore as the connection gets closed anyway } channel = null; try { connection.close(); } finally { connection = null; } } }
/** * Returns an answer that fails n times for each thread, throwing t for the first n invocations * and returning {@code returnValue} thereafter. Prior to throwing t, the connection handler's * shutdown listener is completed if t is a connection shutdown signal. */ protected <T> Answer<T> failNTimes(final int n, final Throwable t, final T returnValue, final RetryableResource resource) { return new Answer<T>() { AtomicInteger failures = new AtomicInteger(); @Override public T answer(InvocationOnMock invocation) throws Throwable { if (failures.getAndIncrement() >= n) return returnValue; if (t instanceof ShutdownSignalException) callShutdownListener(resource, (ShutdownSignalException) t); if (t instanceof ShutdownSignalException && !(t instanceof AlreadyClosedException)) throw new IOException(t); else throw t; } }; }
@Override public void publish(final String routingKey, final String message) throws IOException { try { if(connected) { channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); } else { logger.warn("Not connected! Ignoring message..."); } } catch (AlreadyClosedException ex) { logger.debug(ex.getMessage(), ex); } }
@Override protected void closeImpl() throws IOException { //! We are going to assume that closing an already closed // connection is considered success. if (connection != null && connection.isOpen()) { try { connection.close(factory.getConnectionTimeout()); } catch (AlreadyClosedException ignored) {} } }
@Override protected void stopImpl() throws IOException { //! As with closing the connection, closing an already // closed channel is considered success. if (channel != null && channel.isOpen()) { try { channel.close(); } catch (AlreadyClosedException ignored) {} } }
@Test(expectedExceptions = AlreadyClosedException.class) public void shouldThrowOnAlreadyClosedChannelInvocation() throws Throwable { mockConnection(); Channel channel = mockChannel().proxy; when(channel.getCloseReason()).thenReturn(channelShutdownSignal()); channel.close(); channel.abort(); }
@Override public Object invoke(Object ignored, final Method method, final Object[] args) throws Throwable { if (closed && method.getDeclaringClass().isAssignableFrom(Channel.class)) throw new AlreadyClosedException(delegate.getCloseReason()); Callable<Object> callable = new Callable<Object>() { @Override public Object call() throws Exception { if (method.getDeclaringClass().isAssignableFrom(ChannelConfig.class)) return Reflection.invoke(config, method, args); String methodName = method.getName(); if ("basicAck".equals(methodName) || "basicNack".equals(methodName) || "basicReject".equals(methodName)) { long deliveryTag = (Long) args[0] - previousMaxDeliveryTag; if (deliveryTag > 0) args[0] = deliveryTag; else return null; } else if ("basicConsume".equals(methodName)) return handleConsumerDeclare(method, args); else if ("basicCancel".equals(methodName) && args[0] != null) consumerDeclarations.remove((String) args[0]); else if ("exchangeDelete".equals(methodName) && args[0] != null) connectionHandler.exchangeDeclarations.remove((String) args[0]); else if ("exchangeUnbind".equals(methodName) && args[0] != null) connectionHandler.exchangeBindings.remove((String) args[0], new Binding(args)); else if ("queueDelete".equals(methodName) && args[0] != null) connectionHandler.queueDeclarations.remove((String) args[0]); else if ("queueUnbind".equals(methodName) && args[0] != null) connectionHandler.queueBindings.remove((String) args[0], new Binding(args)); Object result = Reflection.invoke(delegate, method, args); if ("exchangeDeclare".equals(methodName)) handleExchangeDeclare(method, args); else if ("exchangeBind".equals(methodName)) handleExchangeBind(args); else if ("queueDeclare".equals(methodName)) handleQueueDeclare(((Queue.DeclareOk) result).getQueue(), method, args); else if ("queueBind".equals(methodName)) handleQueueBind(method, args); else if ("flowBlocked".equals(methodName)) flowBlocked = true; else if ("basicQos".equals(methodName)) { // Store non-global Qos if (args.length < 3 || !(Boolean) args[2]) basicQos = new ResourceDeclaration(method, args); } else if ("confirmSelect".equals(methodName)) confirmSelect = true; else if ("txSelect".equals(methodName)) txSelect = true; else if (methodName.startsWith("add")) handleAdd(methodName, args[0]); else if (methodName.startsWith("remove")) handleRemove(methodName, args[0]); else if (methodName.startsWith("clear")) handleClear(methodName); return result; } @Override public String toString() { return Reflection.toString(method); } }; return handleCommonMethods(delegate, method, args) ? null : callWithRetries(callable, config.getChannelRetryPolicy(), null, config.getRetryableExceptions(), canRecover(), true); }
/** * Reliably returns whether the shutdown signal represents a connection closure. */ public static boolean isConnectionClosure(ShutdownSignalException e) { return e instanceof AlreadyClosedException ? e.getReference() instanceof Connection : e.isHardError(); }