@Override public ResultSet<CassandraDBContext> execute(Query<CassandraDBContext> query) throws QueryExecutionException { try (Cluster cassandraConnection = buildConnection()) { final Metadata metadata = cassandraConnection.getMetadata(); System.out.printf("Connected to cluster: %s", metadata.getClusterName()); for (final Host host : metadata.getAllHosts()) { System.out.printf("Datacenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack()); } try (Session session = cassandraConnection.connect()) { String queryToExecute = query.getQuery(); System.out.println(queryToExecute); com.datastax.driver.core.ResultSet resultSet = session.execute(queryToExecute); printResultSet(resultSet); ExecutionInfo executionInfo = resultSet.getExecutionInfo(); System.out.println(executionInfo); } } // There isn't any resultset for these use-case return new CassandraResultSet(); }
/**
 * Renders every entry of {@code getAllExecutionInfo()} as text.
 *
 * <p>For each entry the queried host and the comma-joined tried hosts are written; the
 * achieved consistency level and the query trace (trace id plus one event per line) are
 * appended only when they are non-null. Fields are separated by {@code "\r\n"}; nothing is
 * inserted between consecutive entries.
 *
 * @return the concatenated description, or an empty string when there are no entries
 */
@Override
public String toString() {
    final StringBuilder out = new StringBuilder();

    for (ExecutionInfo info : getAllExecutionInfo()) {
        out.append("queried=").append(info.getQueriedHost());
        out.append("\r\ntried=").append(Joiner.on(",").join(info.getTriedHosts()));

        if (info.getAchievedConsistencyLevel() != null) {
            out.append("\r\nachievedConsistencyLevel=").append(info.getAchievedConsistencyLevel());
        }

        if (info.getQueryTrace() != null) {
            out.append("\r\ntraceid=").append(info.getQueryTrace().getTraceId());
            out.append("\r\nevents:\r\n").append(Joiner.on("\r\n").join(info.getQueryTrace().getEvents()));
        }
    }

    return out.toString();
}
/**
 * Logs a human-readable report for one query execution.
 *
 * <p>The report starts with {@code prefix}, followed by the queried host and every tried
 * host. When a query trace is available, the trace id and a table of trace events are
 * appended and the whole report is logged at INFO. When the trace is {@code null}, the
 * partial report is logged at WARN instead. A {@code null} {@code executionInfo} produces a
 * single WARN line.
 *
 * @param prefix        text placed at the top of the report
 * @param executionInfo execution info to describe; may be {@code null}
 */
public void logExecutionInfo(String prefix, ExecutionInfo executionInfo) {
    if (executionInfo == null) {
        LOG.warn("Null execution info");
        return;
    }

    final StringBuilder report = new StringBuilder("\n" + prefix);
    report.append(String.format("\nHost (queried): %s\n", executionInfo.getQueriedHost().toString()));
    for (Host tried : executionInfo.getTriedHosts()) {
        report.append(String.format("Host (tried): %s\n", tried.toString()));
    }

    final QueryTrace trace = executionInfo.getQueryTrace();
    if (trace == null) {
        LOG.warn("Query Trace is null\n" + report);
        return;
    }

    report.append(String.format("Trace id: %s\n\n", trace.getTraceId()));
    report.append(String.format("%-80s | %-12s | %-20s | %-12s\n",
            "activity", "timestamp", "source", "source_elapsed"));
    // The separator contains no format specifiers, so it is appended as a plain literal.
    report.append("---------------------------------------------------------------------------------+--------------+----------------------+--------------\n");
    for (QueryTrace.Event step : trace.getEvents()) {
        report.append(String.format("%80s | %12s | %20s | %12s\n",
                step.getDescription(),
                format.format(step.getTimestamp()),
                step.getSource(),
                step.getSourceElapsedMicros()));
    }
    LOG.info(report.toString());
}
/**
 * Executes a write statement and logs diagnostics at DEBUG level.
 *
 * <p>The prepared statement is looked up in {@code writeStatementCache} by its query text,
 * bound to {@code values} and executed on {@code session}. The queried host is always
 * logged; the query trace is logged only when {@code tracingEnabled} is set, a trace is
 * present and DEBUG logging is enabled.
 *
 * @param query  query text, also used as the cache key for the prepared statement
 * @param values values bound to the prepared statement
 */
private void write(String query, Object... values) {
    logger.debug("query = {} : values = {}", query, values);

    final BoundStatement bound = writeStatementCache.getUnchecked(query).bind(values);
    final ExecutionInfo info = session.execute(bound).getExecutionInfo();
    logger.debug("queried host = {}", info.getQueriedHost());

    if (!tracingEnabled) {
        return;
    }
    final QueryTrace trace = info.getQueryTrace();
    if (trace != null && logger.isDebugEnabled()) {
        logger.debug("{}", toString(trace));
    }
}
/** * Reduces the fetch size and retries the query. Returns true if the query succeeded, false if the root cause * of the exception does not indicate a frame size issue, if the frame size cannot be adjusted down any further, * or if the retried query fails for an unrelated reason. */ private boolean reduceFetchSize(Throwable reason) { if (!isAdaptiveException(reason) || --_remainingAdaptations == 0) { return false; } ExecutionInfo executionInfo = _delegate.getExecutionInfo(); Statement statement = executionInfo.getStatement(); PagingState pagingState = executionInfo.getPagingState(); int fetchSize = statement.getFetchSize(); while (fetchSize > MIN_FETCH_SIZE) { fetchSize = Math.max(fetchSize / 2, MIN_FETCH_SIZE); _log.debug("Retrying query at next page with fetch size {} due to {}", fetchSize, reason.getMessage()); statement.setFetchSize(fetchSize); statement.setPagingState(pagingState); try { _delegate = _session.execute(statement); return true; } catch (Throwable t) { // Exit the adaptation loop if the exception isn't one where adapting further may help if (!isAdaptiveException(t) || --_remainingAdaptations == 0) { return false; } } } return false; }
/**
 * Builds a mocked {@code ResultSet} whose execution info reports a queried host that
 * prints as {@code hostName}.
 *
 * @param hostName value returned by {@code toString()} of the mocked queried host
 * @return the mocked result set
 */
private ResultSet createPositiveResultSet(String hostName) {
    final Host queriedHost = mock(Host.class);
    when(queriedHost.toString()).thenReturn(hostName);

    final ExecutionInfo info = mock(ExecutionInfo.class);
    when(info.getQueriedHost()).thenReturn(queriedHost);

    final ResultSet stubbedResult = mock(ResultSet.class);
    when(stubbedResult.getExecutionInfo()).thenReturn(info);
    return stubbedResult;
}
/**
 * Runs a traced health-check query at consistency level ALL and reports the outcome as a
 * status code.
 *
 * <p>When the retry policy recorded a decision, the trace events (if a trace is available)
 * are used to work out which of the known {@code hosts} never appeared as an event source;
 * those are logged at ERROR.
 *
 * @return {@code 2} if the query failed with {@code NoHostAvailableException}; {@code 1} if
 *         {@code retryPolicy.getLastDecision()} is non-null after the query; {@code 0}
 *         otherwise
 */
public int healthCheck() {
    final Statement health = QueryBuilder.select().all().from(HEALTHCHECK_KEYSPACE_NAME, "healthcheck")
            .where(eq("healthkey", "healthy"));
    health.setConsistencyLevel(ConsistencyLevel.ALL);
    health.enableTracing();

    // NOTE(review): a new LoggingLatencyTracker is registered on every call — confirm intended.
    cluster.register(new LoggingLatencyTracker());

    final QueryTrace trace;
    try {
        trace = session.execute(health).getExecutionInfo().getQueryTrace();
    } catch (NoHostAvailableException e) {
        LOG.error("No hosts available", e);
        return 2;
    }

    if (retryPolicy.getLastDecision() == null) {
        return 0;
    }

    LOG.warn("Could not query all hosts");
    if (trace == null) {
        return 1;
    }

    // Start from every known host address and strike out each one seen as an event source.
    final Set<InetAddress> unseen = new HashSet<>(hosts.size());
    for (Host known : hosts) {
        unseen.add(known.getSocketAddress().getAddress());
    }
    for (QueryTrace.Event step : trace.getEvents()) {
        unseen.remove(step.getSource());
        LOG.debug("description={} elapsed={} source={} micros={}", step.getDescription(),
                millis2Date(step.getTimestamp()), step.getSource(), step.getSourceElapsedMicros());
    }
    if (!unseen.isEmpty()) {
        LOG.error("Missing log entries from these hosts: {}", unseen);
    }
    return 1;
}
/**
 * Builds a {@code QueryTrace} tree for a request from the driver traces attached to the
 * given execution infos.
 *
 * <p>Each execution info that carries a driver trace contributes one segment, named
 * {@code <queried host>[<trace id>]}, whose children are the trace events fetched through
 * {@code getEvents}. Driver durations are reported in microseconds and converted to
 * nanoseconds. Execution infos without a trace are skipped with a warning.
 *
 * @param c       connection passed through to {@code getEvents}
 * @param what    identifier of the root trace
 * @param elapsed elapsed time recorded on the root trace
 * @param info    execution infos to inspect
 * @return a future resolving to the root trace with one child per available driver trace
 */
private AsyncFuture<QueryTrace> buildTrace(
    final Connection c, final QueryTrace.Identifier what, final long elapsed,
    List<ExecutionInfo> info
) {
    final ImmutableList.Builder<AsyncFuture<QueryTrace>> segments = ImmutableList.builder();

    for (final ExecutionInfo execution : info) {
        final com.datastax.driver.core.QueryTrace driverTrace = execution.getQueryTrace();

        if (driverTrace != null) {
            segments.add(getEvents(c, driverTrace.getTraceId()).directTransform(events -> {
                final ImmutableList.Builder<QueryTrace> eventTraces = ImmutableList.builder();

                for (final Event e : events) {
                    final long eventNanos = TimeUnit.MICROSECONDS.toNanos(e.getSourceElapsed());
                    eventTraces.add(QueryTrace.of(QueryTrace.identifier(e.getName()), eventNanos));
                }

                final QueryTrace.Identifier segmentId = QueryTrace.identifier(
                    execution.getQueriedHost().toString() + "[" + driverTrace.getTraceId().toString() + "]");
                final long segmentNanos = TimeUnit.MICROSECONDS.toNanos(driverTrace.getDurationMicros());

                return QueryTrace.of(segmentId, segmentNanos, eventTraces.build());
            }));
        } else {
            log.warn("Query trace requested, but is not available");
        }
    }

    return async
        .collect(segments.build())
        .directTransform(collected -> QueryTrace.of(what, elapsed, ImmutableList.copyOf(collected)));
}
/**
 * Selects the row with the given key with tracing enabled and prints the trace to
 * standard output.
 *
 * <p>The output lists the queried host, every tried host, the trace id and a table of trace
 * events. Tracing is disabled on the statement again before returning.
 *
 * @param keyspace keyspace containing the table
 * @param table    table to read from
 * @param key      value matched against the {@code KEY} column
 * @param level    consistency level applied to the query
 * @return the result set produced by the query
 */
public ResultSet getRowWithTracing(String keyspace, String table, String key, ConsistencyLevel level) {
    Query select = QueryBuilder.select().all().from(keyspace, table)
            .where(QueryBuilder.eq(KEY, key)).setConsistencyLevel(level)
            .enableTracing();

    ResultSet rows = session.execute(select);
    ExecutionInfo info = rows.getExecutionInfo();

    // Coordinator first, then every host the driver attempted.
    System.out.printf("Host (queried): %s\n", info.getQueriedHost().toString());
    for (Host tried : info.getTriedHosts()) {
        System.out.printf("Host (tried): %s\n", tried.toString());
    }

    QueryTrace trace = info.getQueryTrace();
    System.out.printf("Trace id: %s\n\n", trace.getTraceId());
    System.out.printf("%-38s | %-12s | %-10s | %-12s\n",
            "activity", "timestamp", "source", "source_elapsed");
    System.out.println("---------------------------------------+--------------+------------+--------------");
    for (QueryTrace.Event step : trace.getEvents()) {
        System.out.printf("%38s | %12s | %10s | %12s\n",
                step.getDescription(),
                new Date(step.getTimestamp()),
                step.getSource(),
                step.getSourceElapsedMicros());
    }

    select.disableTracing();
    return rows;
}
/**
 * Executes a read statement and wraps the driver result.
 *
 * <p>The prepared statement is looked up in {@code readStatementCache} by its query text
 * and bound to {@code values}. The fetch size is {@code statementFetchSize} when that is
 * positive and {@code Integer.MAX_VALUE} otherwise. The time spent in
 * {@code session.execute} is measured with {@code System.nanoTime()} and passed on to the
 * returned result set. The queried host, and the query trace when tracing is enabled, are
 * logged at DEBUG level.
 *
 * @param query      query text, also used as the cache key for the prepared statement
 * @param keyType    key type; when {@code null} the instance's {@code keyType} is used
 * @param columnType column type passed through to the result set
 * @param valueTypes value types passed through to the result set
 * @param values     values bound to the prepared statement
 * @return a {@code DataStaxCassandraResultSet} wrapping the driver result
 */
private CassandraResultSet<K, String> read(String query, DataType keyType, DataType columnType,
        Map<String, DataType> valueTypes, Object... values) {
    logger.debug("query = {} : values = {}", query, values);

    final BoundStatement bound = readStatementCache.getUnchecked(query).bind(values);
    bound.setFetchSize(statementFetchSize > 0 ? statementFetchSize : Integer.MAX_VALUE);

    // Only the execute call itself is timed.
    final long startedAt = System.nanoTime();
    final ResultSet rs = session.execute(bound);
    final long durationNanos = System.nanoTime() - startedAt;

    final ExecutionInfo info = rs.getExecutionInfo();
    logger.debug("queried host = {}", info.getQueriedHost());

    if (tracingEnabled) {
        final QueryTrace trace = info.getQueryTrace();
        if (trace != null && logger.isDebugEnabled()) {
            logger.debug("{}", toString(trace));
        }
    }

    return new DataStaxCassandraResultSet<K>(rs, ObjectUtils.defaultIfNull(keyType, this.keyType),
            columnType, valueTypes, durationNanos);
}
/**
 * Always returns {@code null}: this implementation does not supply execution info.
 *
 * @return {@code null}
 */
@Override public ExecutionInfo getExecutionInfo() { return null; }
/**
 * Returns the execution infos associated with this result set.
 *
 * <p>This implementation has none to report. It returns an empty, immutable list rather
 * than {@code null} so that callers can iterate over the result without a null check.
 *
 * @return an empty list, never {@code null}
 */
@Override
public List<ExecutionInfo> getAllExecutionInfo() {
    return java.util.Collections.<ExecutionInfo>emptyList();
}
/**
 * Returns the execution info of the wrapped {@code _delegate}.
 *
 * @return whatever {@code _delegate.getExecutionInfo()} returns
 */
@Override public ExecutionInfo getExecutionInfo() { return _delegate.getExecutionInfo(); }
/**
 * Returns all execution infos of the wrapped {@code _delegate}.
 *
 * @return whatever {@code _delegate.getAllExecutionInfo()} returns
 */
@Override public List<ExecutionInfo> getAllExecutionInfo() { return _delegate.getAllExecutionInfo(); }
public ExecutionInfo getExecutionInfo() { // TODO Auto-generated method stub return null; }
public List<ExecutionInfo> getAllExecutionInfo() { // TODO Auto-generated method stub return null; }
/**
 * Inserts one row with tracing enabled and prints the trace to standard output.
 *
 * <p>The INSERT statement is assembled as text: the {@code KEY}, {@code VERSION} and
 * {@code DELETED} columns come first, followed by one column per entry of {@code values}.
 * The {@code DELETED} column is always written as {@code false}. The statement is wrapped
 * in a batch, executed at the requested consistency level, and tracing is disabled on the
 * statement again before returning.
 *
 * <p>Column names and column values are appended verbatim.
 * NOTE(review): presumably callers pass values that are already valid CQL literals — confirm.
 *
 * @param keyspace  keyspace to write to; when {@code null} the field {@code keyspaceName} is used
 * @param tableName table to write to
 * @param rowKey    row key, written as a quoted CQL string literal
 * @param version   value written to the {@code VERSION} column
 * @param values    additional column/value pairs; must contain at least one entry
 * @param level     consistency level applied to the write
 * @throws IndexOutOfBoundsException if {@code values} is empty
 */
public void putRowWithTracing(String keyspace, String tableName, String rowKey, Integer version,
        List<ColumnData> values, ConsistencyLevel level) {
    StringBuilder command = new StringBuilder();
    StringBuilder vals = new StringBuilder();

    command.append("INSERT INTO ")
            .append((keyspace == null) ? keyspaceName : keyspace)
            .append(".").append(tableName).append(" (").append(KEY)
            .append(",").append(VERSION).append(",").append(DELETED)
            .append(",");

    // The first pair is written without a leading comma; the rest are comma-prefixed.
    ColumnData pair = values.get(0);
    command.append(pair.getColumn());
    vals.append(pair.getValue());
    for (int i = 1; i < values.size(); i++) {
        pair = values.get(i);
        command.append(",").append(pair.getColumn());
        vals.append(",").append(pair.getValue());
    }

    // A single quote inside a CQL string literal must be doubled; without this a key such as
    // "o'brien" would end the literal early and break (or alter) the statement.
    // String.valueOf keeps the previous rendering of a null key as the text "null".
    String escapedKey = String.valueOf(rowKey).replace("'", "''");

    command.append(") VALUES (").append("'" + escapedKey + "',")
            .append(version + ",").append("false,").append(vals.toString())
            .append(");");
    LOG.debug(command.toString());

    SimpleStatement ss = new SimpleStatement(command.toString());
    Query insert = QueryBuilder.batch(ss).setConsistencyLevel(level)
            .enableTracing();
    ResultSet results = session.execute(insert);
    ExecutionInfo executionInfo = results.getExecutionInfo();

    System.out.printf("Host (queried): %s\n", executionInfo
            .getQueriedHost().toString());
    for (Host host : executionInfo.getTriedHosts()) {
        System.out.printf("Host (tried): %s\n", host.toString());
    }

    QueryTrace queryTrace = executionInfo.getQueryTrace();
    System.out.printf("Trace id: %s\n\n", queryTrace.getTraceId());
    System.out.printf("%-38s | %-12s | %-10s | %-12s\n", "activity",
            "timestamp", "source", "source_elapsed");
    System.out
            .println("---------------------------------------+--------------+------------+--------------");
    for (QueryTrace.Event event : queryTrace.getEvents()) {
        System.out.printf("%38s | %12s | %10s | %12s\n",
                event.getDescription(), new Date(event.getTimestamp()),
                event.getSource(), event.getSourceElapsedMicros());
    }

    insert.disableTracing();
}
/**
 * Returns the execution info of the wrapped {@code list}.
 *
 * @return whatever {@code list.getExecutionInfo()} returns
 */
@Override public ExecutionInfo getExecutionInfo() { return list.getExecutionInfo(); }
/**
 * Returns all execution infos of the wrapped {@code list}.
 *
 * @return whatever {@code list.getAllExecutionInfo()} returns
 */
@Override public ImmutableList<ExecutionInfo> getAllExecutionInfo() { return list.getAllExecutionInfo(); }
/**
 * Returns the execution info of the wrapped {@code record}.
 *
 * @return whatever {@code record.getExecutionInfo()} returns
 */
@Override public ExecutionInfo getExecutionInfo() { return record.getExecutionInfo(); }
/**
 * Returns all execution infos of the wrapped {@code record}.
 *
 * @return whatever {@code record.getAllExecutionInfo()} returns
 */
@Override public ImmutableList<ExecutionInfo> getAllExecutionInfo() { return record.getAllExecutionInfo(); }
/**
 * Returns the execution info of the wrapped {@code recordList}.
 *
 * @return whatever {@code recordList.getExecutionInfo()} returns
 */
@Override public ExecutionInfo getExecutionInfo() { return recordList.getExecutionInfo(); }
/**
 * Returns all execution infos of the wrapped {@code recordList}.
 *
 * @return whatever {@code recordList.getAllExecutionInfo()} returns
 */
@Override public ImmutableList<ExecutionInfo> getAllExecutionInfo() { return recordList.getAllExecutionInfo(); }
/**
 * Wraps a {@code ResultList<Record>} in the {@code java7} flavour of the same interface.
 *
 * <p>Every call on the returned view is forwarded to {@code recordList}. Records handed out
 * by the iterator are converted with {@code RecordAdapter.convertToJava7}, and the
 * asynchronous fetch result is converted to a {@code ListenableFuture} with
 * {@code CompletableFutures.toListenableFuture}.
 *
 * @param recordList the result list to adapt
 * @return a view of {@code recordList} exposing the java7 API
 */
static net.oneandone.troilus.java7.ResultList<net.oneandone.troilus.java7.Record> convertToJava7(ResultList<Record> recordList) {
    return new net.oneandone.troilus.java7.ResultList<net.oneandone.troilus.java7.Record>() {

        @Override
        public boolean wasApplied() {
            return recordList.wasApplied();
        }

        @Override
        public ExecutionInfo getExecutionInfo() {
            return recordList.getExecutionInfo();
        }

        @Override
        public ImmutableList<ExecutionInfo> getAllExecutionInfo() {
            return recordList.getAllExecutionInfo();
        }

        public net.oneandone.troilus.java7.FetchingIterator<net.oneandone.troilus.java7.Record> iterator() {
            return new net.oneandone.troilus.java7.FetchingIterator<net.oneandone.troilus.java7.Record>() {
                // Obtained once, when this adapter iterator is created.
                private final FetchingIterator<Record> source = recordList.iterator();

                @Override
                public boolean hasNext() {
                    return source.hasNext();
                }

                @Override
                public net.oneandone.troilus.java7.Record next() {
                    return RecordAdapter.convertToJava7(source.next());
                }

                @Override
                public int getAvailableWithoutFetching() {
                    return source.getAvailableWithoutFetching();
                }

                @Override
                public ListenableFuture<ResultSet> fetchMoreResultsAsync() {
                    return CompletableFutures.toListenableFuture(source.fetchMoreResultsAsync());
                }

                @Override
                public boolean isFullyFetched() {
                    return source.isFullyFetched();
                }
            };
        }
    };
}
/**
 * Returns the execution infos associated with this result.
 *
 * <p>This implementation has none to report. It returns an empty immutable list rather than
 * {@code null} so that callers can iterate over the result without a null check.
 *
 * @return an empty list, never {@code null}
 */
@Override
public ImmutableList<ExecutionInfo> getAllExecutionInfo() {
    return ImmutableList.<ExecutionInfo>of();
}
/**
 * Returns the execution info of the wrapped {@code result}.
 *
 * @return whatever {@code result.getExecutionInfo()} returns
 */
@Override public ExecutionInfo getExecutionInfo() { return result.getExecutionInfo(); }
/**
 * Returns all execution infos of the wrapped {@code result}.
 *
 * @return whatever {@code result.getAllExecutionInfo()} returns
 */
@Override public ImmutableList<ExecutionInfo> getAllExecutionInfo() { return result.getAllExecutionInfo(); }
/**
 * Returns the execution info of the wrapped {@code rs}.
 *
 * @return whatever {@code rs.getExecutionInfo()} returns
 */
@Override public ExecutionInfo getExecutionInfo() { return rs.getExecutionInfo(); }
/**
 * Returns all execution infos of the wrapped {@code rs} as an immutable list.
 *
 * @return an {@code ImmutableList} built with {@code ImmutableList.copyOf} from
 *         {@code rs.getAllExecutionInfo()}
 */
@Override public ImmutableList<ExecutionInfo> getAllExecutionInfo() { return ImmutableList.copyOf(rs.getAllExecutionInfo()); }