Java 类com.datastax.driver.core.policies.TokenAwarePolicy 实例源码

项目:act-platform    文件:ClusterManager.java   
@Override
public void startComponent() {
  if (cluster == null) {
    // Configure and build up the Cassandra cluster.
    cluster = Cluster.builder()
            .withClusterName(clusterName)
            .withPort(port)
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
            // TokenAware requires query has routing info (e.g. BoundStatement with all PK value bound).
            .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
            .addContactPoints(contactPoints.toArray(new String[contactPoints.size()]))
            .build();

    // Register any codecs.
    cluster.getConfiguration().getCodecRegistry()
            .register(new CassandraEnumCodec<>(AccessMode.class, AccessMode.getValueMap()))
            .register(new CassandraEnumCodec<>(Direction.class, Direction.getValueMap()))
            .register(new CassandraEnumCodec<>(SourceEntity.Type.class, SourceEntity.Type.getValueMap()));

    // Create a session.
    manager = new MappingManager(cluster.connect());
  }
}
项目:cassandra-java-driver-examples    文件:LoadBalancingPolicyExample.java   
static Session connect() {
    String contactPoint = "localhost";
    String keySpace = "ks1";

    if(session == null) {

        DCAwareRoundRobinPolicy dcAwarePolicy = new DCAwareRoundRobinPolicy.Builder().build();
        LoadBalancingPolicy policy = new TokenAwarePolicy(dcAwarePolicy);

        cluster = Cluster.builder().addContactPoint(contactPoint)
                .withLoadBalancingPolicy(policy).build();
        cluster.init();
        for (Host host : cluster.getMetadata().getAllHosts()) {
            System.out.printf("Address: %s, Rack: %s, Datacenter: %s, Tokens: %s\n", host.getAddress(),
                    host.getDatacenter(), host.getRack(), host.getTokens());
        }
    }
    return session;
}
项目:cassandra-count    文件:CqlCount.java   
private void setup()
throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
              CertificateException, UnrecoverableKeyException  {
// Connect to Cassandra
Cluster.Builder clusterBuilder = Cluster.builder()
    .addContactPoint(host)
    .withPort(port)
    .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()));
if (null != username)
    clusterBuilder = clusterBuilder.withCredentials(username, password);
       if (null != truststorePath)
           clusterBuilder = clusterBuilder.withSSL(createSSLOptions());

cluster = clusterBuilder.build();
       if (null == cluster) {
           throw new IOException("Could not create cluster");
       }
session = cluster.connect();
   }
项目:zipkin    文件:DefaultSessionFactory.java   
static Cluster buildCluster(Cassandra3Storage cassandra) {
  Cluster.Builder builder = Cluster.builder();
  List<InetSocketAddress> contactPoints = parseContactPoints(cassandra);
  int defaultPort = findConnectPort(contactPoints);
  builder.addContactPointsWithPorts(contactPoints);
  builder.withPort(defaultPort); // This ends up protocolOptions.port
  if (cassandra.username != null && cassandra.password != null) {
    builder.withCredentials(cassandra.username, cassandra.password);
  }
  builder.withRetryPolicy(ZipkinRetryPolicy.INSTANCE);
  builder.withLoadBalancingPolicy(new TokenAwarePolicy(new LatencyAwarePolicy.Builder(
      cassandra.localDc != null
          ? DCAwareRoundRobinPolicy.builder().withLocalDc(cassandra.localDc).build()
          : new RoundRobinPolicy()
      // This can select remote, but LatencyAwarePolicy will prefer local
  ).build()));
  builder.withPoolingOptions(new PoolingOptions().setMaxConnectionsPerHost(
      HostDistance.LOCAL, cassandra.maxConnections
  ));
  return builder.build();
}
项目:zipkin    文件:SessionFactory.java   
static Cluster buildCluster(CassandraStorage cassandra) {
  Cluster.Builder builder = Cluster.builder();
  List<InetSocketAddress> contactPoints = parseContactPoints(cassandra);
  int defaultPort = findConnectPort(contactPoints);
  builder.addContactPointsWithPorts(contactPoints);
  builder.withPort(defaultPort); // This ends up protocolOptions.port
  if (cassandra.username != null && cassandra.password != null) {
    builder.withCredentials(cassandra.username, cassandra.password);
  }
  builder.withRetryPolicy(ZipkinRetryPolicy.INSTANCE);
  builder.withLoadBalancingPolicy(new TokenAwarePolicy(new LatencyAwarePolicy.Builder(
      cassandra.localDc != null
          ? DCAwareRoundRobinPolicy.builder().withLocalDc(cassandra.localDc).build()
          : new RoundRobinPolicy()
      // This can select remote, but LatencyAwarePolicy will prefer local
  ).build()));
  builder.withPoolingOptions(new PoolingOptions().setMaxConnectionsPerHost(
      HostDistance.LOCAL, cassandra.maxConnections
  ));
  return builder.build();
}
项目:beam    文件:CassandraServiceImpl.java   
/**
 * Get a Cassandra cluster using hosts and port.
 */
private Cluster getCluster(List<String> hosts, int port, String username, String password,
                           String localDc, String consistencyLevel) {
  Cluster.Builder builder = Cluster.builder()
      .addContactPoints(hosts.toArray(new String[0]))
      .withPort(port);

  if (username != null) {
    builder.withAuthProvider(new PlainTextAuthProvider(username, password));
  }

  if (localDc != null) {
    builder.withLoadBalancingPolicy(
        new TokenAwarePolicy(new DCAwareRoundRobinPolicy.Builder().withLocalDc(localDc).build()));
  } else {
    builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
  }

  if (consistencyLevel != null) {
    builder.withQueryOptions(
        new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel)));
  }

  return builder.build();
}
项目:cassandra-loader    文件:CqlDelimUnload.java   
private void setup()
    throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
           CertificateException, UnrecoverableKeyException  {
    // Connect to Cassandra
    PoolingOptions pOpts = new PoolingOptions();
    pOpts.setCoreConnectionsPerHost(HostDistance.LOCAL, 4);
    pOpts.setMaxConnectionsPerHost(HostDistance.LOCAL, 4);
    Cluster.Builder clusterBuilder = Cluster.builder()
        .addContactPoint(host)
        .withPort(port)
        .withPoolingOptions(pOpts)
        .withLoadBalancingPolicy(new TokenAwarePolicy( DCAwareRoundRobinPolicy.builder().build()));
    if (null != username)
        clusterBuilder = clusterBuilder.withCredentials(username, password);
    if (null != truststorePath)
        clusterBuilder = clusterBuilder.withSSL(createSSLOptions());

    cluster = clusterBuilder.build();
    if (null == cluster) {
        throw new IOException("Could not create cluster");
    }
    session = cluster.connect();
}
项目:Mache    文件:MacheAbstractCassandraKafkaSamplerClient.java   
protected void createCache(Map<String, String> mapParams) throws Exception {
    final Cluster.Builder bluePrint = Cluster.builder().withClusterName("BluePrint")
            .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
            .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy()))
            .addContactPoint(mapParams.get("cassandra.server.ip.address")).withPort(9042);

    cache1 = mache(String.class, CassandraTestEntity.class)
            .cachedBy(guava())
            .storedIn(cassandra()
                    .withCluster(bluePrint)
                    .withKeyspace(mapParams.get("keyspace.name"))
                    .withSchemaOptions(SchemaOptions.CREATE_SCHEMA_IF_NEEDED)
                    .build())
            .withMessaging(kafka()
                    .withKafkaMqConfig(KafkaMqConfigBuilder.builder()
                            .withZkHost(mapParams.get("kafka.connection"))
                            .build())
                    .withTopic(mapParams.get("kafka.topic"))
                    .build())
            .macheUp();
}
项目:FastCSVLoader    文件:CqlFrameLoader.java   
public Cluster cluster() {

        if (cluster != null) return cluster;
        String[] entryPoints = System.getProperty("cassandra.servers", "localhost").split(",");
        String clusterName = System.getProperty("cassandra.cluster-name", "Test Cluster");
        int port = Integer.getInteger("cassandra.port", 9042);
        log.info("Connecting the cluster {} via hosts {} with port {}", clusterName, Arrays.toString(entryPoints), port);
        Cluster.Builder builder = Cluster.builder()
                .addContactPoints(entryPoints)
                .withClusterName(clusterName)
                .withPort(port)
                .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
        cluster = builder.build();
        return cluster;

    }
项目:ats-framework    文件:CassandraDbProvider.java   
/**
 * Currently we connect just once and then reuse the connection.
 * We do not bother with closing the connection.
 *
 * It is normal to use one Session per DB. The Session is thread safe.
 */
private void connect() {

    if (cluster == null) {

        log.info("Connecting to Cassandra server on " + this.dbHost + " at port " + this.dbPort);

        // allow fetching as much data as present in the DB
        QueryOptions queryOptions = new QueryOptions();
        queryOptions.setFetchSize(Integer.MAX_VALUE);
        queryOptions.setConsistencyLevel(ConsistencyLevel.ONE);

        cluster = Cluster.builder()
                         .addContactPoint(this.dbHost)
                         .withPort(this.dbPort)
                         .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()))
                         .withReconnectionPolicy(new ExponentialReconnectionPolicy(500, 30000))
                         .withQueryOptions(queryOptions)
                         .withCredentials(this.dbUser, this.dbPassword)
                         .build();

    }

    if (session == null) {

        log.info("Connecting to Cassandra DB with name " + this.dbName);
        session = cluster.connect(dbName);
    }
}
项目:cassandra-java-driver-examples    文件:TracingExample.java   
public static void main(String[] args){
    Cluster cluster;
    Session session;
    cluster = Cluster
            .builder()
            .addContactPoint("127.0.0.1")
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE) //Other option: DowngradingConsistencyRetryPolicy
            .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
            .build();
    session = cluster.connect("demo");


    PreparedStatement statement = session.prepare("INSERT INTO user (id, name) VALUES (?, ?)");
    Statement boundStatement = statement
            .bind(1, "user 1")
            .enableTracing();

    long startTime = System.currentTimeMillis();
    ResultSet resultSet = session.execute(boundStatement);
    long duration = System.currentTimeMillis() - startTime;
    System.out.format("Time taken: %d", duration);

    ExecutionInfo executionInfo = resultSet.getExecutionInfo();
    printQueryTrace(executionInfo.getQueryTrace());
    cluster.close();

}
项目:zipkin    文件:SessionFactoryTest.java   
RoundRobinPolicy toRoundRobinPolicy(Cassandra3Storage storage) {
  return (RoundRobinPolicy) ((LatencyAwarePolicy) ((TokenAwarePolicy) buildCluster(storage)
      .getConfiguration()
      .getPolicies()
      .getLoadBalancingPolicy())
      .getChildPolicy()).getChildPolicy();
}
项目:zipkin    文件:SessionFactoryTest.java   
DCAwareRoundRobinPolicy toDCAwareRoundRobinPolicy(Cassandra3Storage storage) {
  return (DCAwareRoundRobinPolicy) ((LatencyAwarePolicy) ((TokenAwarePolicy) buildCluster(storage)
      .getConfiguration()
      .getPolicies()
      .getLoadBalancingPolicy())
      .getChildPolicy()).getChildPolicy();
}
项目:zipkin    文件:SessionFactoryTest.java   
RoundRobinPolicy toRoundRobinPolicy(CassandraStorage storage) {
  return (RoundRobinPolicy) ((LatencyAwarePolicy) ((TokenAwarePolicy) buildCluster(storage)
      .getConfiguration()
      .getPolicies()
      .getLoadBalancingPolicy())
      .getChildPolicy()).getChildPolicy();
}
项目:zipkin    文件:SessionFactoryTest.java   
DCAwareRoundRobinPolicy toDCAwareRoundRobinPolicy(CassandraStorage storage) {
  return (DCAwareRoundRobinPolicy) ((LatencyAwarePolicy) ((TokenAwarePolicy) buildCluster(storage)
      .getConfiguration()
      .getPolicies()
      .getLoadBalancingPolicy())
      .getChildPolicy()).getChildPolicy();
}
项目:heroic    文件:ManagedSetupConnection.java   
public AsyncFuture<Connection> construct() {
    AsyncFuture<Session> session = async.call(() -> {
        // @formatter:off
        final PoolingOptions pooling = new PoolingOptions();

        final QueryOptions queryOptions = new QueryOptions()
            .setFetchSize(fetchSize)
            .setConsistencyLevel(consistencyLevel);

        final SocketOptions socketOptions = new SocketOptions()
            .setReadTimeoutMillis((int) readTimeout.toMilliseconds());

        final Cluster.Builder cluster = Cluster.builder()
            .addContactPointsWithPorts(seeds)
            .withRetryPolicy(retryPolicy)
            .withPoolingOptions(pooling)
            .withQueryOptions(queryOptions)
            .withSocketOptions(socketOptions)
            .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
        // @formatter:on

        authentication.accept(cluster);
        return cluster.build().connect();
    });

    if (configure) {
        session = session.lazyTransform(s -> {
            return schema.configure(s).directTransform(i -> s);
        });
    }

    return session.lazyTransform(s -> {
        return schema.instance(s).directTransform(schema -> {
            return new Connection(s, schema);
        });
    });
}
项目:cassandra-jdbc-wrapper    文件:UtilsUnitTest.java   
@Test
public void testLoadBalancingPolicyParsing() throws Exception
{
    String lbPolicyStr = "RoundRobinPolicy()";
    System.out.println(lbPolicyStr);
    assertTrue(Utils.parseLbPolicy(lbPolicyStr) instanceof RoundRobinPolicy);
    System.out.println("====================");
    lbPolicyStr = "TokenAwarePolicy(RoundRobinPolicy())";
    System.out.println(lbPolicyStr);
    assertTrue(Utils.parseLbPolicy(lbPolicyStr) instanceof TokenAwarePolicy);
    System.out.println("====================");
    lbPolicyStr = "DCAwareRoundRobinPolicy(\"dc1\")";
    System.out.println(lbPolicyStr);
    assertTrue(Utils.parseLbPolicy(lbPolicyStr) instanceof DCAwareRoundRobinPolicy);
    System.out.println("====================");
    lbPolicyStr = "TokenAwarePolicy(DCAwareRoundRobinPolicy(\"dc1\"))";
    System.out.println(lbPolicyStr);
    assertTrue(Utils.parseLbPolicy(lbPolicyStr) instanceof TokenAwarePolicy);       
    System.out.println("====================");
    lbPolicyStr = "TokenAwarePolicy";
    System.out.println(lbPolicyStr);
    assertTrue(Utils.parseLbPolicy(lbPolicyStr)==null);
    System.out.println("====================");
    lbPolicyStr = "LatencyAwarePolicy(TokenAwarePolicy(RoundRobinPolicy()),(double) 10.5,(long) 1,(long) 10,(long)1,10)";       
    System.out.println(lbPolicyStr);
    assertTrue(Utils.parseLbPolicy(lbPolicyStr) instanceof LatencyAwarePolicy);
    System.out.println("====================");             

}
项目:cassandra-etl    文件:CassandraConnection.java   
private Cluster getNewCluster(String cassandraNodes) {
    return Cluster.builder()
            .withoutJMXReporting()
            .withoutMetrics()
            .addContactPoints(cassandraNodes.split(","))
            .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(5)))
            .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()))
            .build();
}
项目:dropwizard-cassandra    文件:TokenAwarePolicyFactoryTest.java   
@Test
public void buildsPolicyWithChildPolicy() throws Exception {
    final TokenAwarePolicyFactory factory = new TokenAwarePolicyFactory();
    factory.setSubPolicy(subPolicyFactory);

    final TokenAwarePolicy policy = (TokenAwarePolicy) factory.build();

    assertThat(policy.getChildPolicy()).isSameAs(subPolicy);
}
项目:monasca-persister    文件:CassandraMetricBatch.java   
public CassandraMetricBatch(Metadata metadata, ProtocolOptions protocol, CodecRegistry codec,
    TokenAwarePolicy lbPolicy, int batchLimit) {
  this.protocol = protocol;
  this.codec = codec;
  this.metadata = metadata;
  this.policy = lbPolicy;
  metricQueries = new HashMap<>();
  this.batchLimit = batchLimit;

  metricQueries = new HashMap<>();
  dimensionQueries = new HashMap<>();
  dimensionMetricQueries = new HashMap<>();
  metricDimensionQueries = new HashMap<>();
  measurementQueries = new HashMap<>();
}
项目:kha    文件:CassandraModule.java   
@Provides @Singleton Cluster provideCluster() {
    Cluster cluster = Cluster.builder()
            .addContactPoints("127.0.0.1")
            .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 5000L))
            .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()))
            .build();
    Metadata metadata = cluster.getMetadata();
    LOGGER.info("Connected to cluster: '{}'", metadata.getClusterName());
    metadata.getAllHosts()
            .forEach(host -> LOGGER.info("Datacenter: '{}'; Host: '{}'; Rack: '{}'",
                    new Object[] { host.getDatacenter(), host.getAddress(), host.getRack() })
            );
    return cluster;
}
项目:izettle-toolbox    文件:CassandraSessionFactory.java   
public CassandraSessionManaged build(Environment environment, String localDc) {
    PoolingOptions poolingOptions = new PoolingOptions();
    poolingOptions.setConnectionsPerHost(HostDistance.LOCAL,  3, 5)
        .setConnectionsPerHost(HostDistance.REMOTE, 1, 2);
    final DCAwareRoundRobinPolicy.Builder builder = DCAwareRoundRobinPolicy.builder();
    if (localDc != null) {
        builder.withLocalDc(localDc);
    }
    QueryOptions queryOptions = new QueryOptions();
    queryOptions.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
    final Cluster cluster = Cluster
        .builder()
        .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
        .withReconnectionPolicy(new ExponentialReconnectionPolicy(10L, 1000L))
        .withQueryOptions(queryOptions)
        .withLoadBalancingPolicy(new TokenAwarePolicy(builder.build()))
        .addContactPoints(getContactPoints().stream().toArray(String[]::new))
        .withPort(getPort())
        .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(1000, 2))
        .withPoolingOptions(poolingOptions)
        .build();

    cluster.getConfiguration().getCodecRegistry()
        .register(InstantCodec.instance);

    Session session = cluster.connect(getKeySpace());

    CassandraSessionManaged cassandraSessionManaged = new CassandraSessionManaged(cluster, session);
    environment.lifecycle().manage(cassandraSessionManaged);

    return cassandraSessionManaged;
}
项目:staash    文件:PaasPropertiesModule.java   
@Provides
@Named("pooledmetacluster")
Cluster providePooledCluster(@Named("staash.cassclient") String clientType,@Named("staash.metacluster") String clustername) {
    if (clientType.equals("cql")) {
    Cluster cluster = Cluster.builder().withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())).addContactPoint(clustername).build();
    return cluster;
    }else {
        return null;
    }
}
项目:staash    文件:TestPaasPropertiesModule.java   
@Provides
    @Named("pooledmetacluster")
    Cluster providePooledCluster(@Named("paas.cassclient") String clientType,@Named("paas.metacluster") String clustername) {
        if (clientType.equals("cql")) {
        Cluster cluster = Cluster.builder().withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())).addContactPoint(clustername).build();
//        Cluster cluster = Cluster.builder().addContactPoint(clustername).build();
        return cluster;
        }else {
            return null;
        }
    }
项目:blueflood    文件:DatastaxIO.java   
private static void connect() {
    Set<InetSocketAddress> dbHosts = ioconfig.getUniqueBinaryTransportHostsAsInetSocketAddresses();

    int readTimeoutMaxRetries = ioconfig.getReadTimeoutMaxRetries();
    int writeTimeoutMaxRetries = ioconfig.getWriteTimeoutMaxRetries();
    int unavailableMaxRetries = ioconfig.getUnavailableMaxRetries();

    CodecRegistry codecRegistry = new CodecRegistry();

    cluster = Cluster.builder()
            .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(ioconfig.getDatacenterName()).build(), false))
            .withPoolingOptions(getPoolingOptions())
            .withRetryPolicy(new RetryNTimes(readTimeoutMaxRetries, writeTimeoutMaxRetries, unavailableMaxRetries))
            .withCodecRegistry(codecRegistry)
            .withSocketOptions(getSocketOptions())
            .addContactPointsWithPorts(dbHosts)
            .build();

    QueryLogger queryLogger = QueryLogger.builder()
            .withConstantThreshold(5000)
            .build();

    cluster.register(queryLogger);

    if ( LOG.isDebugEnabled() ) {
        logDebugConnectionInfo();
    }

    try {
        session = cluster.connect( CassandraModel.QUOTED_KEYSPACE );
    }
    catch (NoHostAvailableException e){
        // TODO: figure out how to bubble this up
        throw new RuntimeException(e);
    }
}
项目:apache-ignite-cassandra1    文件:CassandraConfig.java   
public static LoadBalancingPolicy getLoadBalancingPolicy() {

        LoadBalancingPolicy loadBalancingPolicy = new TokenAwarePolicy(new RoundRobinPolicy());
        return loadBalancingPolicy;
    }
项目:dropwizard-cassandra    文件:TokenAwarePolicyFactory.java   
@Override
public LoadBalancingPolicy build() {
    return (shuffleReplicas == null)
            ? new TokenAwarePolicy(subPolicy.build())
            : new TokenAwarePolicy(subPolicy.build(), shuffleReplicas);
}
项目:monasca-persister    文件:CassandraCluster.java   
@Inject
public CassandraCluster(final PersisterConfig config) {

  this.dbConfig = config.getCassandraDbConfiguration();

  QueryOptions qo = new QueryOptions();
  qo.setConsistencyLevel(ConsistencyLevel.valueOf(dbConfig.getConsistencyLevel()));
  qo.setDefaultIdempotence(true);

  String[] contactPoints = dbConfig.getContactPoints();
  int retries = dbConfig.getMaxWriteRetries();
  Builder builder = Cluster.builder().addContactPoints(contactPoints).withPort(dbConfig.getPort());
  builder
      .withSocketOptions(new SocketOptions().setConnectTimeoutMillis(dbConfig.getConnectionTimeout())
          .setReadTimeoutMillis(dbConfig.getReadTimeout()));
  builder.withQueryOptions(qo).withRetryPolicy(new MonascaRetryPolicy(retries, retries, retries));

  lbPolicy = new TokenAwarePolicy(
      DCAwareRoundRobinPolicy.builder().withLocalDc(dbConfig.getLocalDataCenter()).build());
  builder.withLoadBalancingPolicy(lbPolicy);

  String user = dbConfig.getUser();
  if (user != null && !user.isEmpty()) {
    builder.withAuthProvider(new PlainTextAuthProvider(dbConfig.getUser(), dbConfig.getPassword()));
  }
  cluster = builder.build();

  PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();

  poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, dbConfig.getMaxConnections(),
      dbConfig.getMaxConnections()).setConnectionsPerHost(HostDistance.REMOTE,
          dbConfig.getMaxConnections(), dbConfig.getMaxConnections());

  poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, dbConfig.getMaxRequests())
      .setMaxRequestsPerConnection(HostDistance.REMOTE, dbConfig.getMaxRequests());

  metricsSession = cluster.connect(dbConfig.getKeySpace());

  measurementInsertStmt = metricsSession.prepare(MEASUREMENT_INSERT_CQL).setIdempotent(true);
  measurementUpdateStmt = metricsSession.prepare(MEASUREMENT_UPDATE_CQL).setIdempotent(true);
  metricInsertStmt = metricsSession.prepare(METRICS_INSERT_CQL).setIdempotent(true);
  metricUpdateStmt = metricsSession.prepare(METRICS_UPDATE_CQL).setIdempotent(true);
  dimensionStmt = metricsSession.prepare(DIMENSION_INSERT_CQL).setIdempotent(true);
  dimensionMetricStmt = metricsSession.prepare(DIMENSION_METRIC_INSERT_CQL).setIdempotent(true);
  metricDimensionStmt = metricsSession.prepare(METRIC_DIMENSION_INSERT_CQL).setIdempotent(true);

  retrieveMetricIdStmt = metricsSession.prepare(RETRIEVE_METRIC_ID_CQL).setIdempotent(true);
  retrieveMetricDimensionStmt = metricsSession.prepare(RETRIEVE_METRIC_DIMENSION_CQL)
      .setIdempotent(true);

  alarmsSession = cluster.connect(dbConfig.getKeySpace());

  alarmHistoryInsertStmt = alarmsSession.prepare(INSERT_ALARM_STATE_HISTORY_SQL).setIdempotent(true);

  metricIdCache = CacheBuilder.newBuilder()
      .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();

  dimensionCache = CacheBuilder.newBuilder()
      .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();

  metricDimensionCache = CacheBuilder.newBuilder()
      .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();

  logger.info("loading cached definitions from db");

  ExecutorService executor = Executors.newFixedThreadPool(250);

  //a majority of the ids are for metrics not actively receiving msgs anymore
  //loadMetricIdCache(executor);

  loadDimensionCache();

  loadMetricDimensionCache(executor);

  executor.shutdown();
}
项目:monasca-persister    文件:CassandraCluster.java   
public TokenAwarePolicy getLoadBalancePolicy() {
  return lbPolicy;
}