/**
 * Fetches one batch of records for a shard iterator and expands any aggregated records into
 * individual user records via {@code UserRecord.deaggregate}.
 *
 * @param shardIterator iterator identifying the read position; sent in the GetRecords request
 * @param streamName stream name; not sent to Kinesis, only attached to the returned result
 * @param shardId shard id; not sent to Kinesis, only attached to the returned result
 * @param limit value passed to {@code GetRecordsRequest#withLimit}
 * @return the deaggregated records, the next shard iterator and the {@code millisBehindLatest}
 *     value reported in the response
 * @throws TransientKinesisException in case of a recoverable situation, as decided by
 *     {@code wrapExceptions}
 */
public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
    final String shardId, final Integer limit) throws TransientKinesisException {
  return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
    @Override
    public GetKinesisRecordsResult call() throws Exception {
      GetRecordsRequest request = new GetRecordsRequest()
          .withShardIterator(shardIterator)
          .withLimit(limit);
      GetRecordsResult fetched = kinesis.getRecords(request);
      return new GetKinesisRecordsResult(
          UserRecord.deaggregate(fetched.getRecords()),
          fetched.getNextShardIterator(),
          fetched.getMillisBehindLatest(),
          streamName,
          shardId);
    }
  });
}
/**
 * Creates a {@code KinesisRecord} from a KCL {@link UserRecord}.
 *
 * <p>Data, sequence number, sub-sequence number and partition key are copied from
 * {@code record}. The approximate arrival timestamp is converted to an {@link Instant}, and
 * {@link Instant#now()} is passed as the following timestamp argument (presumably the read time;
 * confirm against the delegated constructor).
 *
 * @param record the user record to copy from
 * @param streamName stream the record belongs to
 * @param shardId shard the record belongs to
 */
public KinesisRecord(UserRecord record, String streamName, String shardId) {
  this(
      record.getData(),
      record.getSequenceNumber(),
      record.getSubSequenceNumber(),
      record.getPartitionKey(),
      new Instant(record.getApproximateArrivalTimestamp()),
      Instant.now(),
      streamName,
      shardId);
}
/**
 * Holds the outcome of a single GetRecords call.
 *
 * <p>Each {@link UserRecord} is converted into a {@link KinesisRecord} exactly once, here in the
 * constructor. A lazy view (such as Guava's {@code Lists.transform}) would re-run the conversion
 * on every element access. Every {@code get} or iteration would then produce a new
 * {@code KinesisRecord} instance with a fresh {@code Instant.now()} timestamp, and the view would
 * keep the source list reachable.
 *
 * @param records user records to wrap; must not be {@code null}
 * @param nextShardIterator iterator to use for the next GetRecords call
 * @param millisBehindLatest lag value reported by Kinesis for this batch
 * @param streamName stream the records were read from, attached to every converted record
 * @param shardId shard the records were read from, attached to every converted record
 */
public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
    long millisBehindLatest, final String streamName, final String shardId) {
  // Materialize eagerly so repeated reads see the same KinesisRecord instances.
  List<KinesisRecord> converted = new java.util.ArrayList<>(records.size());
  for (UserRecord record : records) {
    converted.add(new KinesisRecord(record, streamName, shardId));
  }
  this.records = converted;
  this.nextShardIterator = nextShardIterator;
  this.millisBehindLatest = millisBehindLatest;
}
/**
 * Deserializes one record, emits the value to the fetcher together with its sequence number, and
 * remembers that sequence number as the last one collected by this shard consumer.
 *
 * <p>Remembering the last collected sequence number lets
 * {@link ShardConsumer#getRecords(String, int)} use the correct sequence number when it needs to
 * refresh a shard iterator.
 *
 * <p>The record's server-side approximate arrival timestamp is passed along with the value. When
 * the user program uses {@link TimeCharacteristic#EventTime}, this timestamp is used by default.
 *
 * @param record the record to deserialize and collect
 * @throws IOException propagated from the calls made here (e.g. the deserializer)
 */
private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
    throws IOException {
  // Copy the payload out of the record's buffer; the relative bulk get advances its position.
  final ByteBuffer payloadBuffer = record.getData();
  final byte[] payload = new byte[payloadBuffer.remaining()];
  payloadBuffer.get(payload);

  final long arrivalMillis = record.getApproximateArrivalTimestamp().getTime();

  final T deserialized = deserializer.deserialize(
      payload,
      record.getPartitionKey(),
      record.getSequenceNumber(),
      arrivalMillis,
      subscribedShard.getStreamName(),
      subscribedShard.getShard().getShardId());

  // The sub-sequence number is only part of the sequence number for aggregated records.
  final SequenceNumber sequenceNumber;
  if (record.isAggregated()) {
    sequenceNumber = new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber());
  } else {
    sequenceNumber = new SequenceNumber(record.getSequenceNumber());
  }

  fetcherRef.emitRecordAndUpdateState(
      deserialized, arrivalMillis, subscribedShardStateIndex, sequenceNumber);

  lastSequenceNum = sequenceNumber;
}
/**
 * Deserializes one record, emits the value to the fetcher together with its sequence number, and
 * remembers that sequence number as the last one collected by this shard consumer.
 *
 * <p>Remembering the last collected sequence number lets
 * {@link ShardConsumer#getRecords(String, int)} use the correct sequence number when it needs to
 * refresh a shard iterator.
 *
 * <p>The record's server-side approximate arrival timestamp is passed along with the value. When
 * the user program uses {@link TimeCharacteristic#EventTime}, this timestamp is used by default.
 *
 * @param record the record to deserialize and collect
 * @throws IOException propagated from the calls made here (e.g. the deserializer)
 */
private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
    throws IOException {
  // Copy the payload out of the record's buffer; the relative bulk get advances its position.
  final ByteBuffer payloadBuffer = record.getData();
  final byte[] payload = new byte[payloadBuffer.remaining()];
  payloadBuffer.get(payload);

  final long arrivalMillis = record.getApproximateArrivalTimestamp().getTime();

  final T deserialized = deserializer.deserialize(
      payload,
      record.getPartitionKey(),
      record.getSequenceNumber(),
      arrivalMillis,
      subscribedShard.getStreamName(),
      subscribedShard.getShard().getShardId());

  // The sub-sequence number is only part of the sequence number for aggregated records.
  final SequenceNumber sequenceNumber;
  if (record.isAggregated()) {
    sequenceNumber = new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber());
  } else {
    sequenceNumber = new SequenceNumber(record.getSequenceNumber());
  }

  fetcherRef.emitRecordAndUpdateState(
      deserialized, arrivalMillis, subscribedShardStateIndex, sequenceNumber);

  lastSequenceNum = sequenceNumber;
}
/**
 * Produces a preview batch. Records are read from the shard returned by
 * {@code KinesisUtil.getLastShardId}, starting at the configured initial position. Each record is
 * parsed and every resulting batch record is added to {@code batchMaker}.
 *
 * @param maxBatchSize caller's upper bound on the number of Kinesis records processed; the fetch
 *     is additionally capped by {@code conf.maxBatchSize}
 * @param batchMaker receives the parsed records
 * @throws IOException propagated from the helpers called here
 * @throws StageException propagated from the helpers called here
 */
private void previewProcess(
    int maxBatchSize,
    BatchMaker batchMaker
) throws IOException, StageException {
  final ClientConfiguration clientConfig = AWSUtil.getClientConfiguration(conf.proxyConfig);
  final String targetShardId = KinesisUtil.getLastShardId(clientConfig, conf, conf.streamName);

  final GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
  iteratorRequest.setStreamName(conf.streamName);
  iteratorRequest.setShardId(targetShardId);
  iteratorRequest.setShardIteratorType(conf.initialPositionInStream.name());

  final int fetchLimit = Math.min(conf.maxBatchSize, maxBatchSize);
  final List<com.amazonaws.services.kinesis.model.Record> fetched =
      KinesisUtil.getPreviewRecords(clientConfig, conf, fetchLimit, iteratorRequest);

  // Never process more than maxBatchSize of the fetched records.
  final int recordCount = Math.min(fetched.size(), maxBatchSize);
  for (com.amazonaws.services.kinesis.model.Record raw : fetched.subList(0, recordCount)) {
    KinesisUtil.processKinesisRecord(
        iteratorRequest.getShardId(),
        new UserRecord(raw),
        parserFactory
    ).forEach(batchMaker::addRecord);
  }
}
/**
 * Creates {@code i} well-formed consumer test records.
 *
 * <p>Record {@code j} (0-based) has the UTF-8 encoded JSON payload {@code {"seq": j}}, a
 * partition key of 19 {@code '0'} characters, sequence number {@code j}, and the current time as
 * its approximate arrival timestamp. Each record is wrapped in a {@link UserRecord}.
 *
 * @param i number of records to create
 * @return a new mutable list containing the {@code i} records
 */
public static List<com.amazonaws.services.kinesis.model.Record> getConsumerTestRecords(int i) {
  List<com.amazonaws.services.kinesis.model.Record> records = new ArrayList<>(i);
  for (int j = 0; j < i; j++) {
    // Encode explicitly: String#getBytes() without a charset uses the platform default.
    byte[] payload = String.format("{\"seq\": %s}", j)
        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    com.amazonaws.services.kinesis.model.Record record =
        new com.amazonaws.services.kinesis.model.Record()
            .withData(ByteBuffer.wrap(payload))
            .withPartitionKey(StringUtils.repeat("0", 19))
            .withSequenceNumber(String.valueOf(j))
            .withApproximateArrivalTimestamp(Calendar.getInstance().getTime());
    records.add(new UserRecord(record));
  }
  return records;
}
/**
 * Creates a consumer test record whose payload is deliberately malformed JSON: the object
 * {@code "seq": seqNo} is opened with a brace but never closed. Use it to exercise
 * parse-error handling.
 *
 * <p>The record has a partition key of 19 {@code '0'} characters, sequence number
 * {@code seqNo}, and the current time as its approximate arrival timestamp. It is wrapped in a
 * {@link UserRecord}.
 *
 * @param seqNo value embedded in the payload and used as the sequence number
 * @return the malformed test record
 */
public static com.amazonaws.services.kinesis.model.Record getBadConsumerTestRecord(int seqNo) {
  // The missing closing brace is intentional. Encode explicitly because String#getBytes()
  // without a charset uses the platform default.
  byte[] payload = String.format("{\"seq\": %s", seqNo)
      .getBytes(java.nio.charset.StandardCharsets.UTF_8);
  com.amazonaws.services.kinesis.model.Record record =
      new com.amazonaws.services.kinesis.model.Record()
          .withData(ByteBuffer.wrap(payload))
          .withPartitionKey(StringUtils.repeat("0", 19))
          .withSequenceNumber(String.valueOf(seqNo))
          .withApproximateArrivalTimestamp(Calendar.getInstance().getTime());
  return new UserRecord(record);
}
/**
 * Creates a single well-formed consumer test record with the UTF-8 encoded JSON payload
 * {@code {"seq": seqNo}}.
 *
 * <p>The record has a partition key of 19 {@code '0'} characters, sequence number
 * {@code seqNo}, and the current time as its approximate arrival timestamp. It is wrapped in a
 * {@link UserRecord}.
 *
 * @param seqNo value embedded in the payload and used as the sequence number
 * @return the test record
 */
public static com.amazonaws.services.kinesis.model.Record getConsumerTestRecord(int seqNo) {
  // Encode explicitly: String#getBytes() without a charset uses the platform default.
  byte[] payload = String.format("{\"seq\": %s}", seqNo)
      .getBytes(java.nio.charset.StandardCharsets.UTF_8);
  com.amazonaws.services.kinesis.model.Record record =
      new com.amazonaws.services.kinesis.model.Record()
          .withData(ByteBuffer.wrap(payload))
          .withPartitionKey(StringUtils.repeat("0", 19))
          .withSequenceNumber(String.valueOf(seqNo))
          .withApproximateArrivalTimestamp(Calendar.getInstance().getTime());
  return new UserRecord(record);
}
/**
 * Deaggregates {@code records} with {@code UserRecord.deaggregate}, passing the given hash-key
 * range as bounds (presumably the owning shard's range, so sub-records outside it are dropped;
 * confirm against the KCL documentation).
 *
 * @param records raw Kinesis records, possibly aggregated
 * @param startingHashKey lower hash-key bound as a base-10 integer string
 * @param endingHashKey upper hash-key bound as a base-10 integer string
 * @return the deaggregated user records
 * @throws NumberFormatException if either bound is not a valid base-10 integer
 */
@SuppressWarnings("unchecked")
protected static List<UserRecord> deaggregateRecords(List<Record> records,
    String startingHashKey, String endingHashKey) {
  final BigInteger lowerBound = new BigInteger(startingHashKey);
  final BigInteger upperBound = new BigInteger(endingHashKey);
  return UserRecord.deaggregate(records, lowerBound, upperBound);
}
/**
 * Builds a record id of the form
 * {@code shardId::partitionKey::sequenceNumber::subSequenceNumber}.
 *
 * <p>The parameter is declared as a plain Kinesis {@code Record}. A record that is not already a
 * {@link UserRecord} is therefore wrapped with {@code new UserRecord(record)} to obtain its
 * sub-sequence number, instead of failing with a {@link ClassCastException}.
 *
 * @param shardId id of the shard the record came from
 * @param record the record to identify; must not be {@code null}
 * @return the composite record id
 * @throws NullPointerException if {@code record} is {@code null}
 */
public static String createKinesisRecordId(String shardId,
    com.amazonaws.services.kinesis.model.Record record) {
  java.util.Objects.requireNonNull(record, "record");
  final UserRecord userRecord = (record instanceof UserRecord)
      ? (UserRecord) record
      : new UserRecord(record);
  return shardId + "::" + record.getPartitionKey() + "::" + record.getSequenceNumber() + "::"
      + userRecord.getSubSequenceNumber();
}