/** * Returns a Cassandra session object. * @return a connection session to Cassandra */ public static Session getClient() { if (session != null) { return session; } try { Builder builder = Cluster.builder().addContactPoints(DBHOSTS.split(",")). withPort(DBPORT).withCredentials(DBUSER, DBPASS); if (SSL) { builder.withSSL(); } cluster = builder.build(); session = cluster.connect(); if (!existsTable(Config.getRootAppIdentifier())) { createTable(session, Config.getRootAppIdentifier()); } else { session.execute("USE " + DBNAME + ";"); } logger.debug("Cassandra host: " + DBHOSTS + ":" + DBPORT + ", keyspace: " + DBNAME); } catch (Exception e) { logger.error("Failed to connect ot Cassandra: {}.", e.getMessage()); } Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { shutdownClient(); } }); return session; }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection); CassandraSink.addSink(source) .setQuery(INSERT) .setClusterBuilder(new ClusterBuilder() { @Override protected Cluster buildCluster(Builder builder) { return builder.addContactPoint("127.0.0.1").build(); } }) .build(); env.execute("WriteTupleIntoCassandra"); }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Message> source = env.fromCollection(messages); CassandraSink.addSink(source) .setClusterBuilder(new ClusterBuilder() { @Override protected Cluster buildCluster(Builder builder) { return builder.addContactPoint("127.0.0.1").build(); } }) .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)}) .build(); env.execute("Cassandra Sink example"); }
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Message> source = env.fromCollection(messages); CassandraSink.addSink(source) .setClusterBuilder(new ClusterBuilder() { @Override protected Cluster buildCluster(Builder builder) { return builder.addContactPoint("127.0.0.1").build(); } }) .build(); env.execute("Cassandra Sink example"); }
public CassandraSession(String connectorId, final Builder clusterBuilder, int fetchSizeForPartitionKeySelect, int limitForPartitionKeySelect, JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec) { this.connectorId = connectorId; this.fetchSizeForPartitionKeySelect = fetchSizeForPartitionKeySelect; this.limitForPartitionKeySelect = limitForPartitionKeySelect; this.extraColumnMetadataCodec = extraColumnMetadataCodec; sessionBySchema = CacheBuilder.newBuilder() .build(new CacheLoader<String, Session>() { @Override public Session load(String key) throws Exception { return clusterBuilder.build().connect(); } }); }
private void copyPoolingOptions(Builder builder) { PoolingOptions opts = new PoolingOptions(); opts.setCoreConnectionsPerHost(HostDistance.REMOTE, remoteCoreConnectionsPerHost); opts.setCoreConnectionsPerHost(HostDistance.LOCAL, localCoreConnectionsPerHost); opts.setMaxConnectionsPerHost(HostDistance.REMOTE, remoteMaxConnectionsPerHost); opts.setMaxConnectionsPerHost(HostDistance.LOCAL, localMaxConnectionsPerHost); opts.setMaxSimultaneousRequestsPerConnectionThreshold( HostDistance.REMOTE, remoteMaxSimultaneousRequestsPerConnectionThreshold); opts.setMaxSimultaneousRequestsPerConnectionThreshold( HostDistance.LOCAL, localMaxSimultaneousRequestsPerConnectionThreshold); opts.setMinSimultaneousRequestsPerConnectionThreshold( HostDistance.REMOTE, remoteMinSimultaneousRequestsPerConnectionThreshold); opts.setMinSimultaneousRequestsPerConnectionThreshold( HostDistance.LOCAL, localMinSimultaneousRequestsPerConnectionThreshold); builder.withPoolingOptions(opts); }
public void createCluster() { erroredOut = false; schemaCreated = false; cassandraCluster = CCMBridge.create("test", 1); try { Builder builder = Cluster.builder(); builder = configure(builder); cluster = builder.addContactPoints(IP_PREFIX + '1').build(); session = cluster.connect(); } catch (NoHostAvailableException e) { erroredOut = true; for (Map.Entry<InetSocketAddress, Throwable> entry : e.getErrors().entrySet()) logger.info("Error connecting to " + entry.getKey() + ": " + entry.getValue()); throw new RuntimeException(e); } }
@Inject public SchemaManager(@Named("cassandra.keyspace") String keyspace, @Named("cassandra.host") String host, @Named("cassandra.port") int port, @Named("cassandra.username") String username, @Named("cassandra.password") String password, @Named("cassandra.ssl") boolean ssl) { m_keyspace = keyspace; Builder builder = Cluster.builder() .withPort(port) .addContactPoints(host.split(",")); if (username != null && password != null) { LOG.info("Using username: {} and password: XXXXXXXX", username); builder.withCredentials(username, password); } if (ssl) { LOG.info("Using SSL."); builder.withSSL(); } m_cluster= builder.build(); m_session = m_cluster.connect(); }
public CassandraConfig(DataService dataService, String configId, Map<String, String> properties, boolean odataEnable) throws DataServiceFault { super(dataService, configId, DataSourceTypes.CASSANDRA, properties, odataEnable); Builder builder = Cluster.builder(); this.populateSettings(builder, properties); String keyspace = properties.get(DBConstants.Cassandra.KEYSPACE); this.cluster = builder.build(); try { if (keyspace != null && keyspace.trim().length() > 0) { this.session = this.cluster.connect(keyspace); } else { this.session = this.cluster.connect(); } this.nativeBatchRequestsSupported = this.session.getCluster(). getConfiguration().getProtocolOptions().getProtocolVersion().toInt() > 1; } catch (NoHostAvailableException e) { throw new DataServiceFault(e, DBConstants.FaultCodes.CONNECTION_UNAVAILABLE_ERROR, e.getMessage()); } }
private Builder populateQueryOptions(Map<String, String> properties, Builder builder) { String consistencyLevelProp = properties.get(DBConstants.Cassandra.CONSISTENCY_LEVEL); String serialConsistencyLevelProp = properties.get(DBConstants.Cassandra.SERIAL_CONSISTENCY_LEVEL); String fetchSize = properties.get(DBConstants.Cassandra.FETCH_SIZE); QueryOptions options = new QueryOptions(); if (consistencyLevelProp != null) { options.setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevelProp)); } if (serialConsistencyLevelProp != null) { options.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevelProp)); } if (fetchSize != null) { options.setFetchSize(Integer.parseInt(fetchSize)); } return builder.withQueryOptions(options); }
private Builder populateRetrytPolicy(Map<String, String> properties, Builder builder) throws DataServiceFault { String retryPolicy = properties.get(DBConstants.Cassandra.RETRY_POLICY); if (retryPolicy != null) { if ("DefaultRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); } else if ("DowngradingConsistencyRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); } else if ("FallthroughRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE); } else if ("LoggingDefaultRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); } else if ("LoggingDowngradingConsistencyRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); } else if ("LoggingFallthroughRetryPolicy".equals(retryPolicy)) { builder = builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE)); } else { throw new DataServiceFault("Invalid Cassandra retry policy: " + retryPolicy); } } return builder; }
@Test public void connectViaConnectionString() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra("cassandra://localhost/beers") .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void connectViaConnectionStringSupplier() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilderProvider) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra("cassandra://localhost/beers", () -> unit.get(Cluster.Builder.class)) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@BeforeClass public synchronized static void setUpBeforeClass() throws Exception { if (factory == null) { // logger.info("Starting embedded cassandra..."); // EmbeddedCassandraServerHelper.startEmbeddedCassandra("unittest-cassandra.yaml", "./build/cassandra"); // Thread.sleep(100); // logger.info("Successfully started embedded cassandra."); final Cluster cluster = new Builder().addContactPoint("localhost").withPort(CASSANDRA_PORT).build(); // final Session session = cluster.newSession(); // session.execute("CREATE KEYSPACE copper WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"); factory = new UnitTestCassandraEngineFactory(false); factory.setCassandraPort(CASSANDRA_PORT); factory.getEngine().startup(); } }
private CassandraSessionProxy(Consumer<Builder> clusterBuilderSetter, String keyspaceName, String user, String password, String schemaCQL) { this.keyspaceName = keyspaceName; setCluster(clusterBuilderSetter, user, password); createSchema(schemaCQL, keyspaceName, true); try { session = createSessionForKeyspace(keyspaceName); } catch (Exception ex) { ex.printStackTrace(); } }
protected Cluster getCluster() { Builder cb = Cluster.builder(); cb.addContactPoints(contactPoints); cb.withPort(getPort()); if (getDataCenter() != null) { cb.withLoadBalancingPolicy(new DCAwareRoundRobinPolicy(getDataCenter())); } enrichCluster(cb); return cb.build(); }
/** * Constructor, creates Cassandra session * @param contactPoints Cassandra cluster contact points * @param keyspace Keyspace for `lock_leases` */ public LockFactory(String contactPoints, String keyspace) { Builder builder = Cluster.builder(); for (String point : contactPoints.split(",")) { builder.addContactPoint(point.trim()); } Cluster cluster = builder.build(); session = cluster.connect(); session.execute("USE " + keyspace); generalInit(); }
public List<String> getAllSchemas() { ImmutableList.Builder<String> builder = ImmutableList.builder(); List<KeyspaceMetadata> keyspaces = executeWithSession("", new SessionCallable<List<KeyspaceMetadata>>() { @Override public List<KeyspaceMetadata> executeWithSession(Session session) { return session.getCluster().getMetadata().getKeyspaces(); } }); for (KeyspaceMetadata meta : keyspaces) { builder.add(meta.getName()); } return builder.build(); }
public List<String> getAllTables(String schema) throws SchemaNotFoundException { KeyspaceMetadata meta = getCheckedKeyspaceMetadata(schema); ImmutableList.Builder<String> builder = ImmutableList.builder(); for (TableMetadata tableMeta : meta.getTables()) { builder.add(tableMeta.getName()); } return builder.build(); }
public static boolean isClusterActive(){ try{ Builder builder = Cluster.builder().withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)); cluster = builder.addContactPoint("127.0.0.1").build(); session = cluster.connect(); return true; } catch(Exception e){ return false; } }
public void open() { if ( cluster == null || session == null ) { Builder builder = Cluster.builder(); builder.addContactPoints( hosts ); if ( user != null && pass != null && !user.isEmpty() && !pass.isEmpty() ) { builder = builder.withCredentials( user, pass ); } cluster = builder.build(); session = cluster.connect( keyspace ); } }
@Inject public CassandraSessionImpl(@Named("cassandra.keyspace") String keyspace, @Named("cassandra.hostname") String hostname, @Named("cassandra.port") int port, @Named("cassandra.compression") String compression, @Named("cassandra.username") String username, @Named("cassandra.password") String password, @Named("cassandra.ssl") boolean ssl) { checkNotNull(keyspace, "keyspace argument"); checkNotNull(hostname, "hostname argument"); checkArgument(port > 0 && port < 65535, "not a valid port number: %d", port); checkNotNull(compression, "compression argument"); LOG.info("Setting up session with {}:{} using compression {}", hostname, port, compression.toUpperCase()); Builder builder = Cluster .builder() .withPort(port) .addContactPoints(hostname.split(",")) .withReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 2 * 60 * 1000)) .withCompression(Compression.valueOf(compression.toUpperCase())); if (username != null && password != null) { LOG.info("Using username: {} and password: XXXXXXXX", username); builder.withCredentials(username, password); } if (ssl) { LOG.info("Enabling SSL."); builder.withSSL(); } m_session = builder.build().connect(keyspace); }
private Builder populateCredentials(Map<String, String> properties, Builder builder) { String usernameProp = properties.get(DBConstants.Cassandra.USERNAME); String passwordProp = properties.get(DBConstants.Cassandra.PASSWORD); if (usernameProp != null) { builder = builder.withCredentials(usernameProp, passwordProp); } return builder; }
private Builder populatePoolingSettings(Map<String, String> properties, Builder builder) { String localCoreConnectionsPerHost = properties.get(DBConstants.Cassandra.LOCAL_CORE_CONNECTIONS_PER_HOST); String remoteCoreConnectionsPerHost = properties.get(DBConstants.Cassandra.REMOTE_CORE_CONNECTIONS_PER_HOST); String localMaxConnectionsPerHost = properties.get(DBConstants.Cassandra.LOCAL_MAX_CONNECTIONS_PER_HOST); String remoteMaxConnectionsPerHost = properties.get(DBConstants.Cassandra.REMOTE_MAX_CONNECTIONS_PER_HOST); String localNewConnectionThreshold = properties.get(DBConstants.Cassandra.LOCAL_NEW_CONNECTION_THRESHOLD); String remoteNewConnectionThreshold = properties.get(DBConstants.Cassandra.REMOTE_NEW_CONNECTION_THRESHOLD); String localMaxRequestsPerConnection = properties.get(DBConstants.Cassandra.LOCAL_MAX_REQUESTS_PER_CONNECTION); String remoteMaxRequestsPerConnection = properties.get(DBConstants.Cassandra.REMOTE_MAX_REQUESTS_PER_CONNECTION); PoolingOptions options = new PoolingOptions(); if (localCoreConnectionsPerHost != null) { options.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localCoreConnectionsPerHost)); } if (remoteCoreConnectionsPerHost != null) { options.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteCoreConnectionsPerHost)); } if (localMaxConnectionsPerHost != null) { options.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localMaxConnectionsPerHost)); } if (remoteMaxConnectionsPerHost != null) { options.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteMaxConnectionsPerHost)); } if (localNewConnectionThreshold != null) { options.setNewConnectionThreshold(HostDistance.LOCAL, Integer.parseInt(localNewConnectionThreshold)); } if (remoteNewConnectionThreshold != null) { options.setNewConnectionThreshold(HostDistance.REMOTE, Integer.parseInt(remoteNewConnectionThreshold)); } if (localMaxRequestsPerConnection != null) { options.setMaxRequestsPerConnection(HostDistance.LOCAL, Integer.parseInt(localMaxRequestsPerConnection)); } if (remoteMaxRequestsPerConnection != null) { options.setMaxRequestsPerConnection(HostDistance.REMOTE, Integer.parseInt(remoteMaxRequestsPerConnection)); } builder = builder.withPoolingOptions(options); return builder; }
private Builder populateSocketOptions(Map<String, String> properties, Builder builder) throws DataServiceFault { String connectionTimeoutMillisProp = properties.get(DBConstants.Cassandra.CONNECTION_TIMEOUT_MILLIS); String keepAliveProp = properties.get(DBConstants.Cassandra.KEEP_ALIVE); String readTimeoutMillisProp = properties.get(DBConstants.Cassandra.READ_TIMEOUT_MILLIS); String receiveBufferSizeProp = properties.get(DBConstants.Cassandra.RECEIVER_BUFFER_SIZE); String reuseAddress = properties.get(DBConstants.Cassandra.REUSE_ADDRESS); String sendBufferSize = properties.get(DBConstants.Cassandra.SEND_BUFFER_SIZE); String soLinger = properties.get(DBConstants.Cassandra.SO_LINGER); String tcpNoDelay = properties.get(DBConstants.Cassandra.TCP_NODELAY); SocketOptions options = new SocketOptions(); if (connectionTimeoutMillisProp != null) { options.setConnectTimeoutMillis(Integer.parseInt(connectionTimeoutMillisProp)); } if (keepAliveProp != null) { options.setKeepAlive(Boolean.parseBoolean(keepAliveProp)); } if (readTimeoutMillisProp != null) { options.setReadTimeoutMillis(Integer.parseInt(readTimeoutMillisProp)); } if (receiveBufferSizeProp != null) { options.setReceiveBufferSize(Integer.parseInt(receiveBufferSizeProp)); } if (reuseAddress != null) { options.setReuseAddress(Boolean.parseBoolean(reuseAddress)); } if (sendBufferSize != null) { options.setSendBufferSize(Integer.parseInt(sendBufferSize)); } if (soLinger != null) { options.setSoLinger(Integer.parseInt(soLinger)); } if (tcpNoDelay != null) { options.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelay)); } return builder.withSocketOptions(options); }
private void connect(String nodes, String dataCenter, String username, String password) { Builder builder = Cluster.builder(); if (nodes == null || nodes.isEmpty()) { throw new RuntimeException(Const.CASS_NODES + " is not defined"); } if (dataCenter != null && !dataCenter.isEmpty()) { DCAwareRoundRobinPolicy policy = DCAwareRoundRobinPolicy.builder() .withLocalDc(dataCenter) .build(); builder.withLoadBalancingPolicy(policy); } String[] nodeParts = nodes.split(","); for (String node : nodeParts) { node = node.trim(); if (!node.isEmpty()) { LOGGER.info("Adding Cassandra node {}", node); builder.addContactPoint(node); } } if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { builder.withCredentials(username, password); } cluster = builder.build(); Metadata metadata = cluster.getMetadata(); LOGGER.info("Connected to cluster: {}", metadata.getClusterName()); for (Host host : metadata.getAllHosts()) { LOGGER.info("Datacenter: {} Host: {} Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack()); } }
@Test public void connectViaProperty() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(unit -> { Config conf = unit.get(Config.class); expect(conf.getString("db")).andReturn("cassandra://localhost/beers"); }) .expect(serviceKey(new Env.ServiceKey())) .expect(clusterBuilder) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra() .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void connectViaPropertySupplier() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(unit -> { Config conf = unit.get(Config.class); expect(conf.getString("db")).andReturn("cassandra://localhost/beers"); }) .expect(serviceKey(new Env.ServiceKey())) .expect(clusterBuilderProvider) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra(() -> unit.get(Cluster.Builder.class)) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void onStop() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .expect(unit -> { Session session = unit.get(Session.class); session.close(); Cluster cluster = unit.get(Cluster.class); cluster.close(); }) .run(unit -> { new Cassandra("cassandra://localhost/beers") .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }, unit -> { unit.captured(Throwing.Runnable.class).iterator().next().run(); }); }
@Test public void onStopSessionerr() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .expect(unit -> { Session session = unit.get(Session.class); session.close(); expectLastCall().andThrow(new IllegalStateException("intentional err")); Cluster cluster = unit.get(Cluster.class); cluster.close(); }) .run(unit -> { new Cassandra("cassandra://localhost/beers") .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }, unit -> { unit.captured(Throwing.Runnable.class).iterator().next().run(); }); }
@SuppressWarnings("unchecked") @Test public void withAccessor() throws Exception { Object value = new Object(); new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .expect(unit -> { MappingManager manager = unit.get(MappingManager.class); expect(manager.createAccessor(Object.class)).andReturn(value); AnnotatedBindingBuilder<Object> abb = unit.mock(AnnotatedBindingBuilder.class); abb.toInstance(value); Binder binder = unit.get(Binder.class); expect(binder.bind(Object.class)).andReturn(abb); }) .run(unit -> { new Cassandra("cassandra://localhost/beers") .accesor(Object.class) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void doWithCluster() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class, StateListener.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(routeMapper).expect(onStop) .expect(unit -> { Cluster cluster = unit.get(Cluster.class); expect(cluster.register(unit.get(StateListener.class))).andReturn(cluster); }) .run(unit -> { new Cassandra("cassandra://localhost/beers") .doWithCluster(c -> c.register(unit.get(StateListener.class))) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Test public void doWithClusterBuilder() throws Exception { new MockUnit(Env.class, Config.class, Binder.class, Cluster.class, Cluster.Builder.class, Configuration.class, Session.class) .expect(clusterBuilder) .expect(serviceKey(new Env.ServiceKey())) .expect(contactPoints("localhost")) .expect(port(9042)) .expect(codecRegistry) .expect(bind("beers", Cluster.class)) .expect(bind(null, Cluster.class)) .expect(bind("beers", Session.class)) .expect(bind(null, Session.class)) .expect(connect("beers")) .expect(mapper) .expect(bind("beers", MappingManager.class)) .expect(bind(null, MappingManager.class)) .expect(datastore) .expect(bind("beers", Datastore.class)) .expect(bind(null, Datastore.class)) .expect(unit -> { Builder builder = unit.get(Cluster.Builder.class); expect(builder.withClusterName("mycluster")).andReturn(builder); }) .expect(routeMapper).expect(onStop) .run(unit -> { new Cassandra("cassandra://localhost/beers") .doWithClusterBuilder(b -> { b.withClusterName("mycluster"); }) .configure(unit.get(Env.class), unit.get(Config.class), unit.get(Binder.class)); }); }
@Override public Cluster create(CassandraServiceInfo serviceInfo, ServiceConnectorConfig serviceConnectorConfig) { Builder builder = Cluster.builder() .addContactPoints(serviceInfo.getContactPoints().toArray(new String[0])) .withPort(serviceInfo.getPort()); if (StringUtils.hasText(serviceInfo.getUsername())) { builder.withCredentials(serviceInfo.getUsername(), serviceInfo.getPassword()); } if (serviceConnectorConfig instanceof CassandraClusterConfig) { CassandraClusterConfig config = (CassandraClusterConfig) serviceConnectorConfig; if (config.getCompression() != null) { builder.withCompression(config.getCompression()); } builder.withPoolingOptions(config.getPoolingOptions()); builder.withSocketOptions(config.getSocketOptions()); builder.withQueryOptions(config.getQueryOptions()); builder.withNettyOptions(config.getNettyOptions()); builder.withLoadBalancingPolicy(config.getLoadBalancingPolicy()); builder.withReconnectionPolicy(config.getReconnectionPolicy()); builder.withRetryPolicy(config.getRetryPolicy()); builder.withProtocolVersion(config.getProtocolVersion()); if (!config.isMetricsEnabled()) { builder.withoutMetrics(); } if (!config.isJmxReportingEnabled()) { builder.withoutJMXReporting(); } } return builder.build(); }
@Override public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) throws UnsupportedDataContextPropertiesException, ConnectionException { final Map<String, Object> map = properties.toMap(); final Builder clusterBuilder = Cluster.builder(); final String hostname = properties.getHostname(); if (!Strings.isNullOrEmpty(hostname)) { clusterBuilder.addContactPoints(hostname.split(",")); } if (properties.getPort() != null) { clusterBuilder.withPort(properties.getPort()); } if (map.containsKey("cluster-name")) { clusterBuilder.withClusterName((String) map.get("cluster-name")); } if (properties.getUsername() != null && properties.getPassword() != null) { clusterBuilder.withCredentials(properties.getUsername(), properties.getPassword()); } final Cluster cluster = clusterBuilder.build(); final String keySpace = getString(map.get("keyspace"), properties.getDatabaseName()); return new CassandraDataContext(cluster, keySpace, properties.getTableDefs()); }
@Override public void testStarted() { Builder builder = Cluster.builder().withClusterName(clusterId).addContactPoint(contactPoint); if (StringUtils.isNotBlank(user)) { builder = builder.withCredentials(user, password); } builder = builder.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistency))); ClusterHolder.putBuilder(getClusterId(), builder); }
/** * 描述: 初始化配置 * 时间: 2017年11月15日 上午11:25:07 * @author yi.zhang * @param servers 服务地址 * @param keyspace 命名空间 * @param username 账号 * @param password 密码 */ public void init(String servers,String keyspace,String username,String password) { try { // socket 链接配置 SocketOptions socket = new SocketOptions(); socket.setKeepAlive(true); socket.setReceiveBufferSize(1024* 1024); socket.setSendBufferSize(1024* 1024); socket.setConnectTimeoutMillis(5 * 1000); socket.setReadTimeoutMillis(1000); //设置连接池 PoolingOptions pool = new PoolingOptions(); // pool.setMaxRequestsPerConnection(HostDistance.LOCAL, 32); // pool.setMaxRequestsPerConnection(HostDistance.REMOTE, 32); // pool.setCoreConnectionsPerHost(HostDistance.LOCAL, 2); // pool.setCoreConnectionsPerHost(HostDistance.REMOTE, 2); // pool.setMaxConnectionsPerHost(HostDistance.LOCAL, 4); // pool.setMaxConnectionsPerHost(HostDistance.REMOTE, 4); pool.setHeartbeatIntervalSeconds(60); pool.setIdleTimeoutSeconds(120); pool.setPoolTimeoutMillis(5 * 1000); List<InetSocketAddress> saddress = new ArrayList<InetSocketAddress>(); if (servers != null && !"".equals(servers)) { for (String server : servers.split(",")) { String[] address = server.split(":"); String ip = address[0]; int port = 9042; if (address != null && address.length > 1) { port = Integer.valueOf(address[1]); } saddress.add(new InetSocketAddress(ip, port)); } } InetSocketAddress[] addresses = new InetSocketAddress[saddress.size()]; saddress.toArray(addresses); Builder builder = Cluster.builder(); builder.withSocketOptions(socket); // 设置压缩方式 builder.withCompression(ProtocolOptions.Compression.LZ4); // 负载策略 // DCAwareRoundRobinPolicy loadBalance = DCAwareRoundRobinPolicy.builder().withLocalDc("localDc").withUsedHostsPerRemoteDc(2).allowRemoteDCsForLocalConsistencyLevel().build(); // builder.withLoadBalancingPolicy(loadBalance); // 重试策略 builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); builder.withPoolingOptions(pool); builder.addContactPointsWithPorts(addresses); builder.withCredentials(username, password); Cluster cluster = builder.build(); if (keyspace != null && !"".equals(keyspace)) { session = cluster.connect(keyspace); } else { session = cluster.connect(); } mapping = new MappingManager(session); } catch (Exception e) { logger.error("-----Cassandra Config init Error-----", e); } }
@Override public void accept(Builder builder) { }