/**
 * Describes this channel as {@code {channelType=..., channelNo=..., localPort=...}}.
 *
 * <p>The channel number is read from the delegate, and the local port from the
 * delegate's connection, which is cast to {@link AMQConnection}.
 *
 * @return the textual description of this channel
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("{");
    text.append("channelType=").append(channelType);
    text.append(", channelNo=").append(delegate.getChannelNumber());
    text.append(", localPort=").append(((AMQConnection) delegate.getConnection()).getLocalPort());
    text.append('}');
    return text.toString();
}
private synchronized Connection createConnection(ChannelType connectionType) throws ConnectionFailureException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date startTime = new Date(); sdf.setTimeZone(TimeZone.getTimeZone("UTC")); settings.getClient_properties().put("connection_type", connectionType.toString()); settings.getClient_properties().put("connect_time", sdf.format(startTime)+"Z"); ConnectionFactory cf = new ConnectionFactory(); cf.setRequestedHeartbeat(settings.getHeartbeat()); cf.setConnectionTimeout(settings.getConnection_timeout_millis()); cf.setShutdownTimeout(settings.getShutdown_timeout_millis()); cf.setRequestedFrameMax(settings.getFrame_max()); cf.setHandshakeTimeout(settings.getHandshake_timeout_millis()); cf.setClientProperties((Map)settings.getClient_properties()); //cf.setSocketConfigurator(); NOTE is this worth investigating?? cf.setRequestedChannelMax(0);//Hard coded .. cf.setAutomaticRecoveryEnabled(false);//Hard coded .. cf.setTopologyRecoveryEnabled(false);//Hard coded .. Exception lastException = null; Connection connection = null; for (BrokerAddresses.BrokerAddress address : addresses) { cf.setPassword(address.password); cf.setUsername(address.username); cf.setPort(address.port); cf.setHost(address.host); cf.setVirtualHost(address.virtualHost); try { if(address.scheme.toLowerCase().equals("amqps")){ cf.useSslProtocol(); cf.setSocketFactory(SSLSocketFactory.getDefault()); //Because rabbit uses NoopTrustStore by default... 
} log.infoWithParams("Creating "+connectionType+" connection to broker ...", "address", address.toString(), "settings", settings.toString()); connection = cf.newConnection(); boolean isOpen = connection.isOpen(); if(!isOpen){ continue; } break; } catch (Exception e) { log.debugWithParams("Failed to createConnection to broker", "address", address.toString()); lastException = e; } } if(connection == null){ throw new ConnectionFailureException(cf, lastException); } conToChannel.put(connectionType, new ConnectionInfo( connection, new ArrayList<ChannelImpl>(), settings.getClient_properties(), connectionType) ); log.infoWithParams("Successfully created "+connectionType+" connection to broker.", "address", addresses.get(0).toString(), "localPort", ((AMQConnection) connection).getLocalPort(), "settings", settings.toString()); return connection; }