Java 类com.amazonaws.services.kinesis.clientlibrary.types.UserRecord 实例源码

项目:beam    文件:SimplifiedKinesisClient.java   
/**
 * Gets records from Kinesis and deaggregates them if needed.
 *
 * @return list of deaggregated records
 * @throws TransientKinesisException - in case of recoverable situation
 */
public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
    final String shardId, final Integer limit)
    throws
    TransientKinesisException {
  return wrapExceptions(new Callable<GetKinesisRecordsResult>() {

    @Override
    public GetKinesisRecordsResult call() throws Exception {
      GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
          .withShardIterator(shardIterator)
          .withLimit(limit));
      return new GetKinesisRecordsResult(
          UserRecord.deaggregate(response.getRecords()),
          response.getNextShardIterator(),
          response.getMillisBehindLatest(),
          streamName, shardId);
    }
  });
}
项目:beam    文件:KinesisRecord.java   
public KinesisRecord(UserRecord record, String streamName, String shardId) {
  this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
      record.getPartitionKey(),
      new Instant(record.getApproximateArrivalTimestamp()),
      Instant.now(),
      streamName, shardId);
}
项目:beam    文件:GetKinesisRecordsResult.java   
public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
    long millisBehindLatest, final String streamName, final String shardId) {
  this.records = transform(records, new Function<UserRecord, KinesisRecord>() {

    @Nullable
    @Override
    public KinesisRecord apply(@Nullable UserRecord input) {
      assert input != null;  // to make FindBugs happy
      return new KinesisRecord(input, streamName, shardId);
    }
  });
  this.nextShardIterator = nextShardIterator;
  this.millisBehindLatest = millisBehindLatest;
}
项目:flink    文件:ShardConsumer.java   
/**
 * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
 * successfully collected sequence number in this shard consumer is also updated so that
 * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
 * iterators if necessary.
 *
 * <p>Note that the server-side Kinesis timestamp is attached to the record when collected. When the
 * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
 *
 * @param record record to deserialize and collect
 * @throws IOException
 */
private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
    throws IOException {
    ByteBuffer recordData = record.getData();

    byte[] dataBytes = new byte[recordData.remaining()];
    recordData.get(dataBytes);

    final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();

    final T value = deserializer.deserialize(
        dataBytes,
        record.getPartitionKey(),
        record.getSequenceNumber(),
        approxArrivalTimestamp,
        subscribedShard.getStreamName(),
        subscribedShard.getShard().getShardId());

    SequenceNumber collectedSequenceNumber = (record.isAggregated())
        ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
        : new SequenceNumber(record.getSequenceNumber());

    fetcherRef.emitRecordAndUpdateState(
        value,
        approxArrivalTimestamp,
        subscribedShardStateIndex,
        collectedSequenceNumber);

    lastSequenceNum = collectedSequenceNumber;
}
项目:flink    文件:ShardConsumer.java   
/**
 * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
 * successfully collected sequence number in this shard consumer is also updated so that
 * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
 * iterators if necessary.
 *
 * Note that the server-side Kinesis timestamp is attached to the record when collected. When the
 * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
 *
 * @param record record to deserialize and collect
 * @throws IOException
 */
private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
    throws IOException {
    ByteBuffer recordData = record.getData();

    byte[] dataBytes = new byte[recordData.remaining()];
    recordData.get(dataBytes);

    final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();

    final T value = deserializer.deserialize(
        dataBytes,
        record.getPartitionKey(),
        record.getSequenceNumber(),
        approxArrivalTimestamp,
        subscribedShard.getStreamName(),
        subscribedShard.getShard().getShardId());

    SequenceNumber collectedSequenceNumber = (record.isAggregated())
        ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
        : new SequenceNumber(record.getSequenceNumber());

    fetcherRef.emitRecordAndUpdateState(
        value,
        approxArrivalTimestamp,
        subscribedShardStateIndex,
        collectedSequenceNumber);

    lastSequenceNum = collectedSequenceNumber;
}
项目:datacollector    文件:KinesisSource.java   
private void previewProcess(
  int maxBatchSize,
  BatchMaker batchMaker
) throws IOException, StageException {
  ClientConfiguration awsClientConfig = AWSUtil.getClientConfiguration(conf.proxyConfig);

  String shardId = KinesisUtil.getLastShardId(awsClientConfig, conf, conf.streamName);

  GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
  getShardIteratorRequest.setStreamName(conf.streamName);
  getShardIteratorRequest.setShardId(shardId);
  getShardIteratorRequest.setShardIteratorType(conf.initialPositionInStream.name());

  List<com.amazonaws.services.kinesis.model.Record> results = KinesisUtil.getPreviewRecords(
      awsClientConfig,
      conf,
      Math.min(conf.maxBatchSize, maxBatchSize),
      getShardIteratorRequest
  );

  int batchSize = results.size() > maxBatchSize ? maxBatchSize : results.size();

  for (int index = 0; index < batchSize; index++) {
    com.amazonaws.services.kinesis.model.Record record = results.get(index);
    UserRecord userRecord = new UserRecord(record);
    KinesisUtil.processKinesisRecord(
        getShardIteratorRequest.getShardId(),
        userRecord,
        parserFactory
    ).forEach(batchMaker::addRecord);
  }
}
项目:datacollector    文件:KinesisTestUtil.java   
public static List<com.amazonaws.services.kinesis.model.Record> getConsumerTestRecords(int i) {
  List<com.amazonaws.services.kinesis.model.Record> records = new ArrayList<>(i);

  for (int j = 0; j < i; j++) {
    com.amazonaws.services.kinesis.model.Record record = new com.amazonaws.services.kinesis.model.Record()
        .withData(ByteBuffer.wrap(String.format("{\"seq\": %s}", j).getBytes()))
        .withPartitionKey(StringUtils.repeat("0", 19))
        .withSequenceNumber(String.valueOf(j))
        .withApproximateArrivalTimestamp(Calendar.getInstance().getTime());
    records.add(new UserRecord(record));
  }

  return records;
}
项目:datacollector    文件:KinesisTestUtil.java   
public static com.amazonaws.services.kinesis.model.Record getBadConsumerTestRecord(int seqNo) {
  com.amazonaws.services.kinesis.model.Record record = new com.amazonaws.services.kinesis.model.Record()
      .withData(ByteBuffer.wrap(String.format("{\"seq\": %s", seqNo).getBytes()))
      .withPartitionKey(StringUtils.repeat("0", 19))
      .withSequenceNumber(String.valueOf(seqNo))
      .withApproximateArrivalTimestamp(Calendar.getInstance().getTime());

  return new UserRecord(record);
}
项目:datacollector    文件:KinesisTestUtil.java   
public static com.amazonaws.services.kinesis.model.Record getConsumerTestRecord(int seqNo) {
  com.amazonaws.services.kinesis.model.Record record = new com.amazonaws.services.kinesis.model.Record()
      .withData(ByteBuffer.wrap(String.format("{\"seq\": %s}", seqNo).getBytes()))
      .withPartitionKey(StringUtils.repeat("0", 19))
      .withSequenceNumber(String.valueOf(seqNo))
      .withApproximateArrivalTimestamp(Calendar.getInstance().getTime());

  return new UserRecord(record);
}
项目:flink    文件:ShardConsumer.java   
@SuppressWarnings("unchecked")
protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
    return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
项目:flink    文件:ShardConsumer.java   
@SuppressWarnings("unchecked")
protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
    return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
项目:datacollector    文件:KinesisUtil.java   
public static String createKinesisRecordId(String shardId, com.amazonaws.services.kinesis.model.Record record) {
  return shardId + "::" + record.getPartitionKey() + "::" + record.getSequenceNumber() + "::" + ((UserRecord)
      record).getSubSequenceNumber();
}