/** * Set the reconnection policy to define how often and in what interval to retry setup connections. * * @param policy * The reconnection policy as string "constant" {@link ConstantReconnectionPolicy} or "exponential" * {@link ExponentialReconnectionPolicy} * @param delay * The initial or constant delay * @param max * The maximum delay (only required for {@link ExponentialReconnectionPolicy}) */ public void setReconnectionPolicy(String policy, int delay, int... max) { switch(policy) { case "constant": setReconnectionPolicy(new ConstantReconnectionPolicy(delay)); break; case "exponential": setReconnectionPolicy(new ExponentialReconnectionPolicy(delay, max[0])); break; default: setReconnectionPolicy(Policies.defaultReconnectionPolicy()); break; } }
@Provides @Singleton Cluster provideCluster() { try { Cluster cluster = Cluster.builder() .addContactPointsWithPorts(Arrays.asList( // new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9042) // mvn cassandra:run + nodetool enablebinary new InetSocketAddress(InetAddress.getByName("localhost"), 9142) // cassandraunit )) .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 5000L)) .build(); Metadata metadata = cluster.getMetadata(); LOGGER.info("Connected to cluster: '{}'", metadata.getClusterName()); metadata.getAllHosts() .forEach(host -> LOGGER.info("Datacenter: '{}'; Host: '{}'; Rack: '{}'", new Object[] { host.getDatacenter(), host.getAddress(), host.getRack() }) ); return cluster; } catch (UnknownHostException e) { LOGGER.error("Can't connect to Cassandra", e); return null; } }
/** * Currently we connect just once and then reuse the connection. * We do not bother with closing the connection. * * It is normal to use one Session per DB. The Session is thread safe. */ private void connect() { if (cluster == null) { log.info("Connecting to Cassandra server on " + this.dbHost + " at port " + this.dbPort); // allow fetching as much data as present in the DB QueryOptions queryOptions = new QueryOptions(); queryOptions.setFetchSize(Integer.MAX_VALUE); queryOptions.setConsistencyLevel(ConsistencyLevel.ONE); cluster = Cluster.builder() .addContactPoint(this.dbHost) .withPort(this.dbPort) .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())) .withReconnectionPolicy(new ExponentialReconnectionPolicy(500, 30000)) .withQueryOptions(queryOptions) .withCredentials(this.dbUser, this.dbPassword) .build(); } if (session == null) { log.info("Connecting to Cassandra DB with name " + this.dbName); session = cluster.connect(dbName); } }
@Test public void testReconnectionPolicyParsing() throws Exception { String retryPolicyStr = "ConstantReconnectionPolicy((long)10)"; System.out.println(retryPolicyStr); assertTrue(Utils.parseReconnectionPolicy(retryPolicyStr) instanceof ConstantReconnectionPolicy); System.out.println("===================="); retryPolicyStr = "ExponentialReconnectionPolicy((long)10,(Long)100)"; System.out.println(retryPolicyStr); assertTrue(Utils.parseReconnectionPolicy(retryPolicyStr) instanceof ExponentialReconnectionPolicy); System.out.println("===================="); }
public MetaStoreClient(String... contactPoints) { if (contactPoints.length == 0) throw new RuntimeException("No contact points specified"); cluster = Cluster.builder() .addContactPoints(contactPoints) .withClusterName(Schema.CLUSTER) .withLoadBalancingPolicy(new RoundRobinPolicy()) .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) .withReconnectionPolicy(new ExponentialReconnectionPolicy(100, 10000)) .withoutMetrics() .build(); }
private Cluster getNewCluster(String cassandraNodes) { return Cluster.builder() .withoutJMXReporting() .withoutMetrics() .addContactPoints(cassandraNodes.split(",")) .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(5))) .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())) .build(); }
@Test public void buildsPolicyWithDelayAndMaxInMillis() throws Exception { final ExponentialReconnectionPolicyFactory factory = new ExponentialReconnectionPolicyFactory(); factory.setBaseDelay(Duration.seconds(4)); factory.setMaxDelay(Duration.seconds(7)); final ExponentialReconnectionPolicy policy = (ExponentialReconnectionPolicy) factory.build(); assertThat(policy.getBaseDelayMs()).isEqualTo(4000L); assertThat(policy.getMaxDelayMs()).isEqualTo(7000L); }
@Inject public CassandraSessionImpl(@Named("cassandra.keyspace") String keyspace, @Named("cassandra.hostname") String hostname, @Named("cassandra.port") int port, @Named("cassandra.compression") String compression, @Named("cassandra.username") String username, @Named("cassandra.password") String password, @Named("cassandra.ssl") boolean ssl) { checkNotNull(keyspace, "keyspace argument"); checkNotNull(hostname, "hostname argument"); checkArgument(port > 0 && port < 65535, "not a valid port number: %d", port); checkNotNull(compression, "compression argument"); LOG.info("Setting up session with {}:{} using compression {}", hostname, port, compression.toUpperCase()); Builder builder = Cluster .builder() .withPort(port) .addContactPoints(hostname.split(",")) .withReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 2 * 60 * 1000)) .withCompression(Compression.valueOf(compression.toUpperCase())); if (username != null && password != null) { LOG.info("Using username: {} and password: XXXXXXXX", username); builder.withCredentials(username, password); } if (ssl) { LOG.info("Enabling SSL."); builder.withSSL(); } m_session = builder.build().connect(keyspace); }
@Provides @Singleton Cluster provideCluster() { Cluster cluster = Cluster.builder() .addContactPoints("127.0.0.1") .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 5000L)) .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())) .build(); Metadata metadata = cluster.getMetadata(); LOGGER.info("Connected to cluster: '{}'", metadata.getClusterName()); metadata.getAllHosts() .forEach(host -> LOGGER.info("Datacenter: '{}'; Host: '{}'; Rack: '{}'", new Object[] { host.getDatacenter(), host.getAddress(), host.getRack() }) ); return cluster; }
public CassandraSessionManaged build(Environment environment, String localDc) { PoolingOptions poolingOptions = new PoolingOptions(); poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, 3, 5) .setConnectionsPerHost(HostDistance.REMOTE, 1, 2); final DCAwareRoundRobinPolicy.Builder builder = DCAwareRoundRobinPolicy.builder(); if (localDc != null) { builder.withLocalDc(localDc); } QueryOptions queryOptions = new QueryOptions(); queryOptions.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL); final Cluster cluster = Cluster .builder() .withRetryPolicy(DefaultRetryPolicy.INSTANCE) .withReconnectionPolicy(new ExponentialReconnectionPolicy(10L, 1000L)) .withQueryOptions(queryOptions) .withLoadBalancingPolicy(new TokenAwarePolicy(builder.build())) .addContactPoints(getContactPoints().stream().toArray(String[]::new)) .withPort(getPort()) .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(1000, 2)) .withPoolingOptions(poolingOptions) .build(); cluster.getConfiguration().getCodecRegistry() .register(InstantCodec.instance); Session session = cluster.connect(getKeySpace()); CassandraSessionManaged cassandraSessionManaged = new CassandraSessionManaged(cluster, session); environment.lifecycle().manage(cassandraSessionManaged); return cassandraSessionManaged; }
@Singleton @Provides public static CassandraSession createCassandraSession( CassandraConnectorId connectorId, CassandraClientConfig config, JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec) { requireNonNull(config, "config is null"); requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null"); Cluster.Builder clusterBuilder = Cluster.builder(); List<String> contactPoints = requireNonNull(config.getContactPoints(), "contactPoints is null"); checkArgument(!contactPoints.isEmpty(), "empty contactPoints"); clusterBuilder.addContactPoints(contactPoints.toArray(new String[contactPoints.size()])); clusterBuilder.withPort(config.getNativeProtocolPort()); clusterBuilder.withReconnectionPolicy(new ExponentialReconnectionPolicy(500, 10000)); clusterBuilder.withRetryPolicy(config.getRetryPolicy().getPolicy()); SocketOptions socketOptions = new SocketOptions(); socketOptions.setReadTimeoutMillis(Ints.checkedCast(config.getClientReadTimeout().toMillis())); socketOptions.setConnectTimeoutMillis(Ints.checkedCast(config.getClientConnectTimeout().toMillis())); if (config.getClientSoLinger() != null) { socketOptions.setSoLinger(config.getClientSoLinger()); } clusterBuilder.withSocketOptions(socketOptions); if (config.getUsername() != null && config.getPassword() != null) { clusterBuilder.withCredentials(config.getUsername(), config.getPassword()); } QueryOptions options = new QueryOptions(); options.setFetchSize(config.getFetchSize()); options.setConsistencyLevel(config.getConsistencyLevel()); clusterBuilder.withQueryOptions(options); return new CassandraSession( connectorId.toString(), clusterBuilder, config.getFetchSizeForPartitionKeySelect(), config.getLimitForPartitionKeySelect(), extraColumnMetadataCodec); }
@Override public ReconnectionPolicy build() { return new ExponentialReconnectionPolicy(baseDelay.toMilliseconds(), maxDelay.toMilliseconds()); }