/**
 * Creates an instance by storing every argument in the field of the same name.
 * No validation or copying of the arguments is performed here.
 *
 * @param tablename      value for the {@code tablename} field
 * @param keys           value for the {@code keys} field
 * @param whereClauses   value for the {@code whereClauses} field
 * @param columnsToFetch value for the {@code columnsToFetch} field
 * @param limit          value for the {@code limit} field
 * @param allowFiltering value for the {@code allowFiltering} field
 * @param fetchSize      value for the {@code fetchSize} field
 * @param distinct       value for the {@code distinct} field
 * @param pagingState    value for the {@code pagingState} field
 */
private ReadQueryDataImpl(Tablename tablename,
                          ImmutableMap<String, ImmutableList<Object>> keys,
                          ImmutableSet<Clause> whereClauses,
                          ImmutableMap<String, Boolean> columnsToFetch,
                          Integer limit,
                          Boolean allowFiltering,
                          Integer fetchSize,
                          Boolean distinct,
                          PagingState pagingState) {
    // what to read
    this.tablename = tablename;
    this.columnsToFetch = columnsToFetch;

    // which rows to select
    this.keys = keys;
    this.whereClauses = whereClauses;

    // query modifiers
    this.distinct = distinct;
    this.allowFiltering = allowFiltering;
    this.limit = limit;

    // paging
    this.fetchSize = fetchSize;
    this.pagingState = pagingState;
}
private TimelinePageVM getPage(Select select, String page, int limit) { //If we have a 'next' page set we deserialise it and add it to the select //statement if (page != null) { select.setPagingState(PagingState.fromString(page)); } //Execute the query ResultSet resultSet = session.execute(select); //Get the next paging state PagingState newPagingState = resultSet.getExecutionInfo().getPagingState(); //The number of rows that can be read without fetching int remaining = resultSet.getAvailableWithoutFetching(); List<XmTimeline> timelines = new ArrayList<>(limit); for (Row row : resultSet) { XmTimeline timeline = TimelineMapper.createTimeline(row); timelines.add(timeline); //If we can't move to the next row without fetching we break if (--remaining == 0) { break; } } //Serialise the next paging state String serializedNewPagingState = newPagingState != null ? newPagingState.toString() : null; //Return an object with a list of timelines and the next paging state return new TimelinePageVM(timelines, serializedNewPagingState); }
/** * Reduces the fetch size and retries the query. Returns true if the query succeeded, false if the root cause * of the exception does not indicate a frame size issue, if the frame size cannot be adjusted down any further, * or if the retried query fails for an unrelated reason. */ private boolean reduceFetchSize(Throwable reason) { if (!isAdaptiveException(reason) || --_remainingAdaptations == 0) { return false; } ExecutionInfo executionInfo = _delegate.getExecutionInfo(); Statement statement = executionInfo.getStatement(); PagingState pagingState = executionInfo.getPagingState(); int fetchSize = statement.getFetchSize(); while (fetchSize > MIN_FETCH_SIZE) { fetchSize = Math.max(fetchSize / 2, MIN_FETCH_SIZE); _log.debug("Retrying query at next page with fetch size {} due to {}", fetchSize, reason.getMessage()); statement.setFetchSize(fetchSize); statement.setPagingState(pagingState); try { _delegate = _session.execute(statement); return true; } catch (Throwable t) { // Exit the adaptation loop if the exception isn't one where adapting further may help if (!isAdaptiveException(t) || --_remainingAdaptations == 0) { return false; } } } return false; }
@Test public void testFetchInvitesPageOfRecords() { PagingState pagingState = null; // page #, page size, # of expected results in the page pagingState = fetchAndAssert(1, 30, 30, pagingState); pagingState = fetchAndAssert(2, 30, 30, pagingState); pagingState = fetchAndAssert(3, 30, 30, pagingState); pagingState = fetchAndAssert(4, 30, 10, pagingState); // Last page results in empty paging state again assertNull(pagingState); }
@Test public void testFetchInvitesPageOfEntities() { PagingState pagingState = null; // page #, page size, # of expected results in the page pagingState = fetchEntityAndAssert(1, 30, 30, pagingState); pagingState = fetchEntityAndAssert(2, 30, 30, pagingState); pagingState = fetchEntityAndAssert(3, 30, 30, pagingState); pagingState = fetchEntityAndAssert(4, 30, 10, pagingState); // Last page results in empty paging state again assertNull(pagingState); }
/**
 * Reads one page of {@code InvitesByMonthAndInviteDate} entities for key
 * {@code group_id = group_1}, verifies the sort order and the number of entities in the page.
 *
 * @param pageNumber   number of the page being read (not used by the assertions)
 * @param pageSize     fetch size to request
 * @param expectedSize number of entities the page must contain
 * @param pagingState  paging state to continue from, or {@code null} for the first page
 * @return the paging state reported by the execution info of the fetched page
 */
private PagingState fetchEntityAndAssert(int pageNumber, int pageSize, int expectedSize, PagingState pagingState) {
    ResultList<InvitesByMonthAndInviteDate> page =
            new DaoImpl(cassandra.getSession(), TABLE_NAME)
                    .readSequenceWithKey("group_id", "group_1")
                    .asEntity(InvitesByMonthAndInviteDate.class)
                    .withFetchSize(pageSize)
                    .withPagingState(pagingState)
                    .execute();

    // assertSortOrder walks the page and reports how many entries it saw
    int actualSize = assertSortOrder(page);
    assertEquals("Size should be "+expectedSize, expectedSize, actualSize);

    PagingState nextPagingState = page.getExecutionInfo().getPagingState();
    return nextPagingState;
}
@Override public Result<AuditLog> getAuditLogs( UUID messageId ) { Statement query = QueryBuilder.select().all().from(TABLE_AUDIT_LOG) .where( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ) ); ResultSet rs = cassandraClient.getApplicationSession().execute( query ); final List<AuditLog> auditLogs = rs.all().stream().map( row -> new AuditLog( AuditLog.Action.valueOf( row.getString( COLUMN_ACTION )), AuditLog.Status.valueOf( row.getString( COLUMN_STATUS )), row.getString( COLUMN_QUEUE_NAME ), row.getString( COLUMN_REGION ), row.getUUID( COLUMN_MESSAGE_ID ), row.getUUID( COLUMN_QUEUE_MESSAGE_ID ), row.getLong( COLUMN_TRANSFER_TIME ) ) ).collect( Collectors.toList() ); return new Result<AuditLog>() { @Override public PagingState getPagingState() { return null; // no paging } @Override public List<AuditLog> getEntities() { return auditLogs; } }; }
/**
 * Reads a single page of transfer logs.
 *
 * <p>Only the rows that are available without fetching are converted, so one call never reads
 * more than the page that the query execution returned.
 *
 * @param pagingState paging state to continue from, or {@code null} to start from the beginning
 * @param fetchSize   fetch size set on the statement
 * @return the transfer logs of this page and the paging state reported by the execution info
 */
@Override
public Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize ) {

    Statement select = QueryBuilder.select().all().from(TABLE_TRANSFER_LOG);
    select.setFetchSize( fetchSize );
    if ( pagingState != null ) {
        select.setPagingState( pagingState );
    }

    ResultSet page = cassandraClient.getApplicationSession().execute( select );
    final PagingState nextPagingState = page.getExecutionInfo().getPagingState();

    final List<TransferLog> pageEntities = new ArrayList<>();
    // Count down the rows of the current page; one() is called exactly that many times
    for ( int remaining = page.getAvailableWithoutFetching(); remaining > 0; remaining-- ) {
        Row row = page.one();
        pageEntities.add( new TransferLog(
                row.getString( COLUMN_QUEUE_NAME ),
                row.getString( COLUMN_SOURCE_REGION ),
                row.getString( COLUMN_DEST_REGION ),
                row.getUUID( COLUMN_MESSAGE_ID ),
                row.getLong( COLUMN_TRANSFER_TIME ) ) );
    }

    return new Result<TransferLog>() {

        @Override
        public List<TransferLog> getEntities() {
            return pageEntities;
        }

        @Override
        public PagingState getPagingState() {
            return nextPagingState;
        }
    };
}
@Test public void recordTransferLog() throws Exception { TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class ); CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 ); String source = RandomStringUtils.randomAlphanumeric( 15 ); String dest = RandomStringUtils.randomAlphanumeric( 15 ); int numLogs = 100; for ( int i=0; i<numLogs; i++ ) { logSerialization.recordTransferLog( queueName, source, dest, UUIDGen.getTimeUUID()); } int count = 0; int fetchCount = 0; PagingState pagingState = null; while ( true ) { Result<TransferLog> all = logSerialization.getAllTransferLogs( pagingState, 10 ); // we only want entities for our queue List<TransferLog> logs = all.getEntities().stream() .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() ); count += logs.size(); fetchCount++; if ( all.getPagingState() == null ) { break; } pagingState = all.getPagingState(); } Assert.assertEquals( numLogs, count ); }
/**
 * Collects every transfer log by reading page after page (fetch size 100) until a page comes
 * back without a paging state.
 *
 * @param logSerialization serialization to read the transfer logs from
 * @return all transfer logs, in the order the pages returned them
 */
private List<TransferLog> getTransferLogs(TransferLogSerialization logSerialization) {
    List<TransferLog> collected = new ArrayList<>();

    PagingState pagingState = null;
    do {
        Result<TransferLog> page = logSerialization.getAllTransferLogs( pagingState, 100 );
        collected.addAll( page.getEntities() );
        // a null paging state marks the last page
        pagingState = page.getPagingState();
    } while ( pagingState != null );

    return collected;
}
/**
 * Searches videos by tag and streams a single response back to the caller.
 *
 * <p>The request is validated first; when validation fails the method returns immediately.
 * Otherwise the query runs asynchronously with the requested page size and, when the request
 * carries a non-blank paging state, resumes from it. On success the response carries the query,
 * the matching video previews and, when present, the next paging state. On failure the error is
 * logged and reported to the observer as {@code Status.INTERNAL}.
 *
 * @param request          the search request (query tag, page size, optional paging state)
 * @param responseObserver observer that receives the response or the error
 */
@Override
public void searchVideos(SearchVideosRequest request, StreamObserver<SearchVideosResponse> responseObserver) {

    LOGGER.debug("Start searching video by tag");

    if (!validator.isValid(request, responseObserver)) {
        return;
    }

    // A blank paging state is treated the same as an absent one
    final Optional<String> pagingState = Optional
            .ofNullable(request.getPagingState())
            .filter(StringUtils::isNotBlank);

    videoByTagManager
            .dsl()
            .select()
            .allColumns_FromBaseTable()
            .where()
            .tag().Eq(request.getQuery())
            .withFetchSize(request.getPageSize())
            .withOptionalPagingStateString(pagingState)
            .getListAsyncWithStats()
            .handle((tuple2, ex) -> {
                if (tuple2 == null) {
                    // No result: report the failure, if there is one
                    if (ex != null) {
                        LOGGER.error("Exception when searching video by tag : " + mergeStackTrace(ex));
                        responseObserver.onError(Status.INTERNAL.withCause(ex).asRuntimeException());
                    }
                    return tuple2;
                }

                final SearchVideosResponse.Builder response = SearchVideosResponse.newBuilder();
                response.setQuery(request.getQuery());
                tuple2._1().stream().forEach(video -> response.addVideos(video.toResultVideoPreview()));

                // Only expose a paging state when the execution info reports one
                Optional.ofNullable(tuple2._2().getPagingState())
                        .map(PagingState::toString)
                        .ifPresent(response::setPagingState);

                responseObserver.onNext(response.build());
                responseObserver.onCompleted();

                LOGGER.debug("End searching video by tag");
                return tuple2;
            });
}
/**
 * Applies the given paging state to the wrapped {@code query} and hands the resulting query to
 * {@code newQuery}, returning whatever {@code newQuery} produces.
 *
 * @param pagingState the paging state forwarded to {@code query.withPagingState}
 * @return the read built by {@code newQuery} from the paging-state-configured query
 */
@Override public ListRead<ResultList<Record>, Record> withPagingState( PagingState pagingState) { return newQuery(query.withPagingState(pagingState)); }
/**
 * Applies the given paging state to the wrapped {@code query} and wraps the resulting query in
 * a new {@code ListEntityReadQueryAdapter} that shares this adapter's context.
 *
 * @param pagingState the paging state forwarded to {@code query.withPagingState}
 * @return a new adapter around the paging-state-configured query
 */
@Override public ListRead<ResultList<E>, E> withPagingState( PagingState pagingState) { return new ListEntityReadQueryAdapter<>(getContext(), query.withPagingState(pagingState)); }
/**
 * Not supported for count reads: this implementation never returns and always throws.
 *
 * @param pagingState ignored
 * @return never returns normally
 * @throws IllegalArgumentException always, because count readers cannot be configured with a
 *         paging state
 */
@Override public ListRead<Count, Count> withPagingState(PagingState pagingState) { throw new IllegalArgumentException("Count readers cannot be configured with paging state."); }
private PagingState fetchAndAssert(int pageNumber, int pageSize, int expectedSize, PagingState pagingState) { Dao dao = new DaoImpl(cassandra.getSession(), TABLE_NAME); ListReadWithUnit<ResultList<Record>, Record> listReadUnit = dao.readSequenceWithKey("group_id", "group_1"); // Pagination requires both: fetchSize and pagingState ListRead<ResultList<Record>, Record> listRead = listReadUnit.all() .withFetchSize(pageSize) .withPagingState(pagingState); ResultList<Record> resultList = listRead.execute(); Iterator<Record> i = resultList.iterator(); int numRecords = assertSortOrder(i); assertEquals("Size should be "+expectedSize, expectedSize, numRecords); return resultList.getExecutionInfo().getPagingState(); }
/**
 * Returns the value of the {@code pagingState} field as it is, without copying.
 *
 * @return the stored paging state
 */
@Override public PagingState getPagingState() { return pagingState; }
/**
 * Builds a query via {@code newQuery} from this query's {@code data} with the given paging
 * state applied through {@code data.pagingState(...)}.
 *
 * @param pagingState the paging state to apply
 * @return the query produced by {@code newQuery} for the updated data
 */
@Override public ListReadQuery withPagingState(PagingState pagingState) { return newQuery(data.pagingState(pagingState)); }
/**
 * Applies the given paging state to the wrapped {@code query} and converts the result back into
 * an entity read for {@code clazz} via {@code asEntity}.
 *
 * @param pagingState the paging state forwarded to {@code query.withPagingState}
 * @return the entity read query obtained from the paging-state-configured query
 */
@Override public ListEntityReadQuery<E> withPagingState(PagingState pagingState) { return query.withPagingState(pagingState).asEntity(clazz); }
/**
 * Returns a copy of this query that has the given paging state set.
 *
 * @param pagingState paging state to set on the driver Statement, or null, if none
 * @return a cloned query instance with paging state set
 */
ListRead<T, R> withPagingState(PagingState pagingState);
/**
 * Returns the paging state, if one is set.
 *
 * @return paging state or null, if none
 */
PagingState getPagingState();
/**
 * Returns a copy of this query configured with the given paging state.
 *
 * @param pagingState the paging state for the cloned query
 * @return a cloned query instance which allows paging
 */
ListRead<T, R> withPagingState(PagingState pagingState);
/**
 * Get all transfer logs (for testing purposes)
 *
 * @param pagingState Paging state (or null if none)
 * @param fetchSize Number of rows to be fetched per page (or -1 for default)
 * @return the transfer logs that were read, together with the paging state of the result
 */
Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize);
/**
 * Returns the paging state associated with this object.
 * NOTE(review): whether this may be {@code null} is not visible from this declaration — confirm
 * against the implementations.
 *
 * @return the paging state
 */
PagingState getPagingState();