@Test public void testVnodeSupport() throws Exception { // Validate that peers as appropriately discovered when connecting to a node and vnodes are // assigned. try (BoundCluster boundCluster = server.register(ClusterSpec.builder().withNumberOfTokens(256).withNodes(3, 3, 3)); Cluster driverCluster = defaultBuilder(boundCluster).build()) { driverCluster.init(); // Should be 9 hosts assertThat(driverCluster.getMetadata().getAllHosts()).hasSize(9); Set<Token> allTokens = new HashSet<>(); for (Host host : driverCluster.getMetadata().getAllHosts()) { assertThat(host.getTokens()).hasSize(256); allTokens.addAll(host.getTokens()); } // Should be 256*9 unique tokens. assertThat(allTokens).hasSize(256 * 9); } }
/**
 * Writes a one-line debug summary of a token-to-batches map: the given name, the number of
 * entries, and for each token the sizes of its queued batch statements.
 *
 * <p>Nothing is built or logged unless debug logging is enabled.
 *
 * @param name label placed at the start of the log line
 * @param map  batches keyed by token
 */
private void logTokenBatchMap(String name, Map<Token, Deque<BatchStatement>> map) {
    if (!logger.isDebugEnabled()) {
        return;
    }

    StringBuilder summary = new StringBuilder(name)
            .append(": Size: ")
            .append(map.size())
            .append("; Tokens: |");

    map.forEach((token, batches) -> {
        summary.append(token.toString()).append(":");
        // One comma-terminated size per batch queued for this token.
        batches.forEach(batch -> summary.append(batch.size()).append(","));
        summary.append("|.");
    });

    logger.debug(summary.toString());
}
/**
 * Builds a transformer that groups bound statements by the token range containing their
 * routing-key token, passes each group through a {@code BoundBatchStatementTransformer},
 * executes every resulting batch and emits the size of each executed batch.
 *
 * <p>If no token range contains a statement's token, a warning is logged and the first range
 * returned by the cluster metadata is used as the grouping key instead.
 *
 * @return a transformer from bound statements to executed batch sizes
 */
private Observable.Transformer<BoundStatement, Integer> applyMicroBatching() {
    return statements -> statements
            .groupBy(statement -> {
                ByteBuffer partitionKey =
                        statement.getRoutingKey(ProtocolVersion.NEWEST_SUPPORTED, codecRegistry);
                Token token = metadata.newToken(partitionKey);

                // Look for the range that contains the statement's token.
                TokenRange owner = null;
                for (TokenRange candidate : session.getCluster().getMetadata().getTokenRanges()) {
                    if (candidate.contains(token)) {
                        owner = candidate;
                        break;
                    }
                }

                if (owner == null) {
                    // No match: fall back to the first known range so the statement is still grouped.
                    log.warn("Unable to find any Cassandra node to insert token " + token.toString());
                    owner = session.getCluster().getMetadata().getTokenRanges().iterator().next();
                }
                return owner;
            })
            .flatMap(group -> group.compose(new BoundBatchStatementTransformer()))
            .flatMap(batch ->
                    rxSession.execute(batch)
                            .compose(applyInsertRetryPolicy())
                            .map(resultSet -> batch.size()));
}
private String buildQuery(TokenRange tokenRange) { Token start = tokenRange.getStart(); Token end = tokenRange.getEnd(); List<String> pkColumns = tableMetadata.getPartitionKey().stream().map(ColumnMetadata::getName).collect(Collectors.toList()); String tokenStatement = String.format("token(%s)", String.join(", ", pkColumns)); StringBuilder ret = new StringBuilder(); ret.append("SELECT "); ret.append(tokenStatement); // add the token(pk) statement so that we can count partitions ret.append(", "); ret.append(columns); ret.append(" FROM "); ret.append(tableMetadata.getName()); if (start != null || end != null) ret.append(" WHERE "); if (start != null) { ret.append(tokenStatement); ret.append(" > "); ret.append(start.toString()); } if (start != null && end != null) ret.append(" AND "); if (end != null) { ret.append(tokenStatement); ret.append(" <= "); ret.append(end.toString()); } return ret.toString(); }
/**
 * Returns the token obtained from the wrapped {@code row} for the given index.
 *
 * @param i index passed unchanged to {@code row.getToken(int)}
 * @return whatever {@code row.getToken(i)} returns
 */
@Override public Token getToken(int i) { return row.getToken(i); }
/**
 * Returns the token obtained from the wrapped {@code row} for the given name.
 *
 * @param name name passed unchanged to {@code row.getToken(String)}
 * @return whatever {@code row.getToken(name)} returns
 */
@Override public Token getToken(String name) { return row.getToken(name); }
/**
 * Returns the partition key token reported by the wrapped {@code row}.
 *
 * @return whatever {@code row.getPartitionKeyToken()} returns
 */
@Override public Token getPartitionKeyToken() { return row.getPartitionKeyToken(); }
public boolean run() throws Exception { State state = currentState.get(); if (state == null) { // start processing a new token range TokenRange range = tokenRangeIterator.next(); if (range == null) return true; // no more token ranges to process state = new State(range, buildQuery(range)); currentState.set(state); } ResultSet results; Statement statement = new SimpleStatement(state.query); statement.setFetchSize(pageSize); if (state.pagingState != null) statement.setPagingState(state.pagingState); results = client.getSession().execute(statement); state.pagingState = results.getExecutionInfo().getPagingState(); int remaining = results.getAvailableWithoutFetching(); rowCount += remaining; for (Row row : results) { // this call will only succeed if we've added token(partition keys) to the query Token partition = row.getPartitionKeyToken(); if (!state.partitions.contains(partition)) { partitionCount += 1; state.partitions.add(partition); } if (--remaining == 0) break; } if (results.isExhausted() || isWarmup) { // no more pages to fetch or just warming up, ready to move on to another token range currentState.set(null); } return true; }
/**
 * Not supported by this implementation.
 *
 * @param i ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override public Token getToken(int i) { throw new UnsupportedOperationException(); }
/**
 * Not supported by this implementation.
 *
 * @param name ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override public Token getToken(String name) { throw new UnsupportedOperationException(); }
/**
 * Not supported by this implementation.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override public Token getPartitionKeyToken() { throw new UnsupportedOperationException(); }