/**
 * Applies the retry policy named in the given properties to the builder.
 *
 * <p>The policy name is read from {@code DBConstants.Cassandra.RETRY_POLICY}. When that
 * property is absent the builder is returned untouched. Names starting with
 * {@code Logging} wrap the corresponding policy in a {@link LoggingRetryPolicy}.
 *
 * @param properties configuration properties holding the policy name
 * @param builder the builder to configure
 * @return the result of {@code withRetryPolicy}, or {@code builder} itself when no policy
 *     name is configured
 * @throws DataServiceFault if the configured name is not one of the supported policies
 */
private Builder populateRetrytPolicy(Map<String, String> properties, Builder builder) throws DataServiceFault {
    String policyName = properties.get(DBConstants.Cassandra.RETRY_POLICY);
    if (policyName == null) {
        // Nothing configured: leave the builder as it is.
        return builder;
    }
    switch (policyName) {
        case "DefaultRetryPolicy":
            return builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
        case "DowngradingConsistencyRetryPolicy":
            return builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
        case "FallthroughRetryPolicy":
            return builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE);
        case "LoggingDefaultRetryPolicy":
            return builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE));
        case "LoggingDowngradingConsistencyRetryPolicy":
            return builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
        case "LoggingFallthroughRetryPolicy":
            return builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE));
        default:
            throw new DataServiceFault("Invalid Cassandra retry policy: " + policyName);
    }
}
/**
 * Connects to Cassandra and prepares the statements used for the {@code person} table.
 *
 * <p>The cluster is built against {@code localhost} on {@code port}, with a 500 ms socket
 * read timeout and a {@link LoggingRetryPolicy} wrapping a new {@code RetryReads}. The
 * session is then opened with {@code "scassandra"} and the insert and select statements are
 * prepared on it.
 */
@Override
public void connect() {
    final int readTimeoutMillis = 500;
    final String keyspaceName = "scassandra";
    final String insertPersonCql =
            "insert into person(first_name, last_name, age, interesting_dates) values (?,?,?,?)";
    final String selectPersonCql =
            "select * from person where first_name = ? and last_name = ?";

    SocketOptions readOptions = new SocketOptions();
    readOptions.setReadTimeoutMillis(readTimeoutMillis);

    cluster = Cluster.builder()
            .addContactPoint("localhost")
            .withPort(port)
            .withRetryPolicy(new LoggingRetryPolicy(new RetryReads()))
            .withSocketOptions(readOptions)
            .build();
    session = cluster.connect(keyspaceName);

    storeStatement = session.prepare(insertPersonCql);
    retrieveStatement = session.prepare(selectPersonCql);
}
/**
 * Configures the cluster builder with the retry policy named in {@code properties}.
 *
 * <p>The name is read from {@code CassandraStoreParameters.RETRY_POLICY}. If the property
 * is not set the builder is returned as is. A name that is not supported is logged as an
 * error and the builder is likewise returned without a retry policy being applied.
 *
 * @param properties configuration properties holding the policy name
 * @param builder the cluster builder to configure
 * @return the result of {@code withRetryPolicy}, or {@code builder} itself when the name is
 *     missing or unsupported
 */
private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) {
    final String policyName = properties.getProperty(CassandraStoreParameters.RETRY_POLICY);
    if (policyName == null) {
        return builder;
    }
    if ("DefaultRetryPolicy".equals(policyName)) {
        return builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
    }
    if ("DowngradingConsistencyRetryPolicy".equals(policyName)) {
        return builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
    }
    if ("FallthroughRetryPolicy".equals(policyName)) {
        return builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE);
    }
    if ("LoggingDefaultRetryPolicy".equals(policyName)) {
        return builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE));
    }
    if ("LoggingDowngradingConsistencyRetryPolicy".equals(policyName)) {
        return builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
    }
    if ("LoggingFallthroughRetryPolicy".equals(policyName)) {
        return builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE));
    }
    // Unknown names are reported but do not abort configuration.
    LOG.error("Unsupported retry policy : {} ", policyName);
    return builder;
}
/** * Creates a {@link SessionManager} instance. Sub-class my override this * method to customized its own {@link SessionManager}. * * @return */ protected SessionManager createSessionManager() { SessionManager sm = new SessionManager(); // sm.setRetryPolicy(new // LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); sm.setRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); sm.setSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(10000, 3)); sm.init(); return sm; }
/**
 * Builds the Cassandra cluster from the environment and opens the session.
 *
 * <p>Properties read from {@code env}, with their defaults:
 * <ul>
 *   <li>{@code ea.cassandra.hosts} - comma-delimited contact points
 *       ({@code localhost:9042}); a {@code :port} suffix on an entry is discarded and
 *       {@code ea.cassandra.port} is used for every contact point</li>
 *   <li>{@code ea.cassandra.cluster} - cluster name ({@code ElasticActorsCluster})</li>
 *   <li>{@code ea.cassandra.keyspace} - keyspace to connect to
 *       ({@code "ElasticActors"}, including the quotes)</li>
 *   <li>{@code ea.cassandra.port} - port ({@code 9042})</li>
 *   <li>{@code ea.cassandra.maxActive} - upper bound passed to
 *       {@code setConnectionsPerHost} for {@code HostDistance.LOCAL}
 *       (3 x available processors)</li>
 *   <li>{@code ea.cassandra.retryDownedHostsDelayInSeconds} - constant reconnection delay
 *       in seconds ({@code 1})</li>
 * </ul>
 *
 * <p>The cluster uses round-robin load balancing, a logging wrapper around the default
 * retry policy and {@code QUORUM} consistency. If opening the session fails, the cluster
 * is closed before the exception is rethrown.
 *
 * @throws RuntimeException whatever {@code Cluster.connect} throws when the session cannot
 *     be opened
 */
@PostConstruct
public void initialize() {
    String cassandraHosts = env.getProperty("ea.cassandra.hosts","localhost:9042");
    String cassandraClusterName = env.getProperty("ea.cassandra.cluster","ElasticActorsCluster");
    String cassandraKeyspaceName = env.getProperty("ea.cassandra.keyspace","\"ElasticActors\"");
    Integer cassandraPort = env.getProperty("ea.cassandra.port", Integer.class, 9042);

    Set<String> hostSet = StringUtils.commaDelimitedListToSet(cassandraHosts);
    String[] contactPoints = toContactPoints(hostSet);

    PoolingOptions poolingOptions = new PoolingOptions();
    poolingOptions.setHeartbeatIntervalSeconds(60);
    poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, 2,
            env.getProperty("ea.cassandra.maxActive", Integer.class, Runtime.getRuntime().availableProcessors() * 3));
    poolingOptions.setPoolTimeoutMillis(2000);

    Cluster cassandraCluster = Cluster.builder().withClusterName(cassandraClusterName)
            .addContactPoints(contactPoints)
            .withPort(cassandraPort)
            .withLoadBalancingPolicy(new RoundRobinPolicy())
            .withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
            .withPoolingOptions(poolingOptions)
            // The property is expressed in seconds; the policy is given milliseconds.
            .withReconnectionPolicy(new ConstantReconnectionPolicy(
                    env.getProperty("ea.cassandra.retryDownedHostsDelayInSeconds", Integer.class, 1) * 1000))
            .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM)).build();

    try {
        this.cassandraSession = cassandraCluster.connect(cassandraKeyspaceName);
    } catch (RuntimeException e) {
        // The cluster is only reachable through this local variable, so nobody else can
        // release it once we leave this method: close it here before propagating.
        try {
            cassandraCluster.close();
        } catch (RuntimeException ignored) {
            // Keep the original connect failure; it is the more useful diagnosis.
        }
        throw e;
    }
}

/** Converts {@code host[:port]} entries into bare, trimmed host names, preserving order. */
private String[] toContactPoints(Set<String> hostEntries) {
    String[] contactPoints = new String[hostEntries.size()];
    int i = 0;
    for (String entry : hostEntries) {
        int portSeparator = entry.indexOf(':');
        String host = portSeparator >= 0 ? entry.substring(0, portSeparator) : entry;
        // Tolerate lists written as "a:9042, b:9042".
        contactPoints[i++] = host.trim();
    }
    return contactPoints;
}
@BeforeClass() public static void setup() throws ConfigurationException, IOException { Schema.instance.clear(); cassandra = new EmbeddedCassandraService(); cassandra.start(); cluster = Cluster.builder().addContactPoint("127.0.0.1") .withRetryPolicy(new LoggingRetryPolicy(Policies.defaultRetryPolicy())) .withPort(DatabaseDescriptor.getNativeTransportPort()).build(); session = cluster.connect(); session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +" WITH replication " + "= {'class':'SimpleStrategy', 'replication_factor':1};"); session.execute("USE " + KEYSPACE); session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (" + "key blob," + "value blob," + "PRIMARY KEY (key));"); // Prepared statements getStatement = session.prepare("SELECT value FROM " + TABLE + " WHERE key = ?;"); getStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); putStatement = session.prepare("INSERT INTO " + TABLE + " (key, value) VALUES (?, ?);"); putStatement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); StringBuilder s = new StringBuilder(); char a='a'; char z='z'; for (int i = 0; i < 500*1024; i++) { char x = (char)((i%((z-a)+1))+a); if (x == 'a') { x = '\n'; } s.append(x); } VALUE = s.toString(); }
/**
 * Builds the policy produced by {@code subPolicy} and wraps it in a
 * {@link LoggingRetryPolicy}.
 *
 * @return a new {@link LoggingRetryPolicy} around the result of {@code subPolicy.build()}
 */
@Override
public RetryPolicy build() {
    final RetryPolicy wrapped = subPolicy.build();
    return new LoggingRetryPolicy(wrapped);
}