Java 类com.amazonaws.services.kinesis.model.Record 实例源码

项目:aws-kinesis-beanstalk-workers    文件:MyRecordProcessor.java   
@Override
public void processRecords(List<Record> records,
        IRecordProcessorCheckpointer checkpointer) {
    LOG.info(String.format("Received %s Records", records.size()));

    // add a call to your business logic here!
    //
    // myLinkedClasses.doSomething(records)
    //
    //
    try {
        checkpointer.checkpoint();
    } catch (KinesisClientLibDependencyException | InvalidStateException
            | ThrottlingException | ShutdownException e) {
        e.printStackTrace();
        super.shutdown(checkpointer, ShutdownReason.ZOMBIE);
    }
}
项目:bender    文件:KinesisWrapper.java   
private KinesisWrapper(final InternalEvent internal) {
  KinesisEventRecord eventRecord = ((KinesisInternalEvent) internal).getRecord();
  Record record = eventRecord.getKinesis();

  this.partitionKey = record.getPartitionKey();
  this.sequenceNumber = record.getSequenceNumber();
  this.eventSource = eventRecord.getEventSource();
  this.sourceArn = eventRecord.getEventSourceARN();
  this.functionName = internal.getCtx().getFunctionName();
  this.functionVersion = internal.getCtx().getFunctionVersion();
  this.processingTime = System.currentTimeMillis();
  this.arrivalTime = record.getApproximateArrivalTimestamp().getTime();
  this.timestamp = internal.getEventTime();
  this.processingDelay = processingTime - timestamp;

  if (internal.getEventObj() != null) {
    this.payload = internal.getEventObj().getPayload();
  } else {
    this.payload = null;
  }
}
项目:aws-kinesis-zombies    文件:ZombieRecordProcessor.java   
@SneakyThrows
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    // Used to update the last processed record
    IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();
    log.info("Recovering records from kinesis.");
    for (Record r : records) {
        try {
            int len = r.getData().remaining();
            byte[] buffer = new byte[len];
            r.getData().get(buffer);
            String json = new String(buffer, "UTF-8");
            ZombieLecture lecture = mapper.readValue(json, ZombieLecture.class);
            this.processZombieLecture(lecture);
            log.debug(processedRecords++ + ": " + json);
            if (processedRecords % 1000 == 999) {
                // Uncomment next line to keep track of the processed lectures. 
                checkpointer.checkpoint();
            }
        } catch (UnsupportedEncodingException | MessagingException ex) {
            log.warn(ex.getMessage());
        }
    }
}
项目:beam    文件:AmazonKinesisMock.java   
@Override
public AmazonKinesis getKinesisClient() {
  return new AmazonKinesisMock(transform(shardedData,
      new Function<List<TestData>, List<Record>>() {

        @Override
        public List<Record> apply(@Nullable List<TestData> testDatas) {
          return transform(testDatas, new Function<TestData, Record>() {

            @Override
            public Record apply(@Nullable TestData testData) {
              return testData.convertToRecord();
            }
          });
        }
      }), numberOfRecordsPerGet);

}
项目:Camel    文件:KinesisConsumerTest.java   
@Test
public void recordsAreSentToTheProcessor() throws Exception {
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
            .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2"))
        );

    int messageCount = undertest.poll();

    assertThat(messageCount, is(2));
    final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);

    verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
    assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getSequenceNumber(), is("1"));
    assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getSequenceNumber(), is("2"));
}
项目:Camel    文件:KinesisConsumerTest.java   
@Test
public void exchangePropertiesAreSet() throws Exception {
    String partitionKey = "partitionKey";
    String sequenceNumber = "1";
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
            .withRecords(new Record()
                .withSequenceNumber(sequenceNumber)
                .withApproximateArrivalTimestamp(new Date(42))
                .withPartitionKey(partitionKey)
            )
        );

    undertest.poll();

    final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);

    verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber));
}
项目:apex-malhar    文件:AbstractKinesisInputOperator.java   
/**
 * Implement InputOperator Interface.
 */
@Override
public void emitTuples()
{
  if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
    return;
  }
  int count = consumer.getQueueSize();
  if (maxTuplesPerWindow > 0) {
    count = Math.min(count, maxTuplesPerWindow - emitCount);
  }
  for (int i = 0; i < count; i++) {
    Pair<String, Record> data = consumer.pollRecord();
    String shardId = data.getFirst();
    String recordId = data.getSecond().getSequenceNumber();
    emitTuple(data);
    MutablePair<String, Integer> shardOffsetAndCount = currentWindowRecoveryState.get(shardId);
    if (shardOffsetAndCount == null) {
      currentWindowRecoveryState.put(shardId, new MutablePair<String, Integer>(recordId, 1));
    } else {
      shardOffsetAndCount.setRight(shardOffsetAndCount.right + 1);
    }
    shardPosition.put(shardId, recordId);
  }
  emitCount += count;
}
项目:apex-malhar    文件:KinesisTestConsumer.java   
public String processNextIterator(String iterator)
{
  GetRecordsRequest getRequest = new GetRecordsRequest();
  getRequest.setLimit(1000);

  getRequest.setShardIterator(iterator);
  // call "get" operation and get everything in this shard range
  GetRecordsResult getResponse = client.getRecords(getRequest);

  iterator = getResponse.getNextShardIterator();

  List<Record> records = getResponse.getRecords();
  processResponseRecords(records);

  return iterator;
}
项目:apex-malhar    文件:KinesisTestConsumer.java   
protected void processResponseRecords( List<Record> records )
{
  if ( records == null || records.isEmpty() ) {
    return;
  }
  receiveCount += records.size();
  logger.debug("ReceiveCount= {}", receiveCount);

  for ( Record record : records ) {
    holdingBuffer.add(record);
    if ( shouldProcessRecord ) {
      processRecord( record );
    }

    if ( doneLatch != null ) {
      doneLatch.countDown();
    }
  }
}
项目:apex-malhar    文件:KinesisByteArrayOutputOperatorTest.java   
@Override
protected void processRecord(Record record)
{
  String partitionKey = record.getPartitionKey();
  ByteBuffer data = record.getData();
  logger.info("partitionKey={} ", partitionKey);
  byte[] dataBytes = new byte[data.remaining()];
  data.get(dataBytes, 0, dataBytes.length);

  long key = Long.valueOf(partitionKey);
  TestPOJO expected = new TestPOJO(key);

  TestPOJO read = (TestPOJO)fieldValueGenerator.deserializeObject(dataBytes);

  if (!read.outputFieldsEquals(expected)) {
    logger.error("read is not same as expected. read={}, expected={}", read, expected);
    Assert.assertTrue(false);
  } else {
    logger.info("read is same as expected. read={}, expected={}", read, expected);
  }
}
项目:ingestion-service    文件:RecordProcessor.java   
/**
 * Process a single record.
 *
 * @param record The record to be processed.
 */
private void processSingleRecord(Record record) {

    if (producer == null) {
        producer = createProducer();
    }

    String kafkaServers = System.getenv().get("KAFKA_SERVERS");
    if (null == kafkaServers) {
        kafkaServers = Constants.KAFKA_SERVERS;
    }

    ByteBuffer data = record.getData();


    retryCounter = 0;
    // sending event to Queue
    ProducerRecord<String, byte[]> keyedMessage = new ProducerRecord<>(
            TestConstants.topic,
            TestConstants.partition, data.array());
    this.sendMessage(keyedMessage);
}
项目:samza    文件:TestKinesisSystemConsumer.java   
private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
  Iterator outputRecordsIter = outputRecords.iterator();
  inputRecords.forEach(record -> {
      IncomingMessageEnvelope envelope = (IncomingMessageEnvelope) outputRecordsIter.next();
      String outputKey = (String) envelope.getKey();
      KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
      Assert.assertEquals(outputKey, record.getPartitionKey());
      Assert.assertEquals(kinesisMessageEnvelope.getSequenceNumber(), record.getSequenceNumber());
      Assert.assertEquals(kinesisMessageEnvelope.getApproximateArrivalTimestamp(),
          record.getApproximateArrivalTimestamp());
      Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
      ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
      record.getData().rewind();
      Assert.assertTrue(outputData.equals(record.getData()));
      verifyOffset(envelope.getOffset(), record, shardId);
    });
}
项目:samza    文件:TestKinesisRecordProcessor.java   
static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
    List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
  Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
  processors.forEach(processor -> {
      try {
        // Create records and call process records
        IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
        doNothing().when(checkpointer).checkpoint(anyString());
        doNothing().when(checkpointer).checkpoint();
        ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
        when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
        when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
        List<Record> inputRecords = createRecords(numRecordsPerShard);
        processorRecordMap.put(processor, inputRecords);
        when(processRecordsInput.getRecords()).thenReturn(inputRecords);
        processor.processRecords(processRecordsInput);
      } catch (ShutdownException | InvalidStateException ex) {
        throw new RuntimeException(ex);
      }
    });
  return processorRecordMap;
}
项目:samza    文件:TestKinesisRecordProcessor.java   
private static List<Record> createRecords(int numRecords) {
  List<Record> records = new ArrayList<>(numRecords);
  Random rand = new Random();

  for (int i = 0; i < numRecords; i++) {
    String dataStr = "testData-" + System.currentTimeMillis();
    ByteBuffer data = ByteBuffer.wrap(dataStr.getBytes(StandardCharsets.UTF_8));
    String key = String.format("partitionKey-%d", rand.nextLong());
    String seqNum = String.format("%04d", 5 * i + 1);
    Record record = new Record()
        .withData(data)
        .withPartitionKey(key)
        .withSequenceNumber(seqNum)
        .withApproximateArrivalTimestamp(new Date());
    records.add(record);
  }
  return records;
}
项目:sumologic-kinesis-connector    文件:CloudWatchMessageModelSumologicTransformerTest.java   
@Test
public void theTransformerShouldFailGracefullyWhenUnableToCompress () {
  CloudWatchMessageModelSumologicTransformer transfomer = new CloudWatchMessageModelSumologicTransformer();

  String randomData = "Some random string without GZIP compression";
  ByteBuffer bufferedData = null;
  try {
    bufferedData = encoder.encode(CharBuffer.wrap(randomData));
  } catch (Exception e) {
    Assert.fail("Getting error: "+e.getMessage());
  }

  Record mockedRecord = new Record();
  mockedRecord.setData(bufferedData);

  CloudWatchLogsMessageModel messageModel = transfomer.toClass(mockedRecord);


  Assert.assertNull(messageModel);
}
项目:streamline    文件:KinesisRecordToTupleMapper.java   
@Override
public List<Object> getTuple(Record record) {
    CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
    List<Object> tuple = new ArrayList<>();
    tuple.add(record.getPartitionKey());
    tuple.add(record.getSequenceNumber());
    try {
        String data = decoder.decode(record.getData()).toString();
        tuple.add(data);
    } catch (CharacterCodingException e) {
        e.printStackTrace();
        LOG.warn("Exception occured. Emitting tuple with empty string data", e);
        tuple.add("");
    }
    return tuple;
}
项目:datacollector    文件:KinesisUtil.java   
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords(
    ClientConfiguration awsClientConfig,
    KinesisConfigBean conf,
    int maxBatchSize,
    GetShardIteratorRequest getShardIteratorRequest
) throws StageException {
  AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf);

  GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
  String shardIterator = getShardIteratorResult.getShardIterator();

  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(maxBatchSize);

  GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
  return getRecordsResult.getRecords();
}
项目:datacollector    文件:KinesisUtil.java   
public static List<com.streamsets.pipeline.api.Record> processKinesisRecord(
    String shardId,
    Record kRecord,
    DataParserFactory parserFactory
) throws DataParserException, IOException {
  final String recordId = createKinesisRecordId(shardId, kRecord);
  DataParser parser = parserFactory.getParser(recordId, kRecord.getData().array());

  List<com.streamsets.pipeline.api.Record> records = new ArrayList<>();
  com.streamsets.pipeline.api.Record r;
  while ((r = parser.parse()) != null) {
    records.add(r);
  }
  parser.close();
  return records;
}
项目:presto-kinesis    文件:MockKinesisClient.java   
public PutRecordResult putRecord(ByteBuffer data, String partitionKey)
{
    // Create record and insert into the shards.  Initially just do it
    // on a round robin basis.
    long ts = System.currentTimeMillis() - 50000;
    Record rec = new Record();
    rec = rec.withData(data).withPartitionKey(partitionKey).withSequenceNumber(String.valueOf(sequenceNo));
    rec.setApproximateArrivalTimestamp(new Date(ts));

    if (nextShard == shards.size()) {
        nextShard = 0;
    }
    InternalShard shard = shards.get(nextShard);
    shard.addRecord(rec);

    PutRecordResult result = new PutRecordResult();
    result.setSequenceNumber(String.valueOf(sequenceNo));
    result.setShardId(shard.getShardId());

    nextShard++;
    sequenceNo++;

    return result;
}
项目:awsbigdata    文件:Processor.java   
@Override
public void processRecords(List<Record> arg0, IRecordProcessorCheckpointer arg1) {
    counter += arg0.size();
    if (counter > target) {
        System.out.println("Received : " + counter + " records");
        target += target;
    }
    Record rec;
    for(int i = 0; i < arg0.size(); i++) {
        rec = arg0.get(i);
        try {
            verifyRecord(rec.getData());
        } 
        catch (JSONException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
项目:amazon-kinesis-aggregators    文件:AggregatorProcessor.java   
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(List<Record> records,
        IRecordProcessorCheckpointer checkpointer) {
    LOG.info("Aggregating " + records.size()
            + " records for Kinesis Shard " + kinesisShardId);
    try {
        // run data into the aggregator
        agg.aggregate(records);

        // checkpoint the aggregator and kcl
        agg.checkpoint();
        checkpointer.checkpoint(records.get(records.size() - 1));

        LOG.debug("Kinesis Checkpoint for Shard " + kinesisShardId
                + " Complete");
    } catch (Exception e) {
        e.printStackTrace();
        LOG.error(e);
        shutdown(checkpointer, ShutdownReason.ZOMBIE);
    }
}
项目:aws-big-data-blog    文件:ParseReferrerBolt.java   
@Override
public void execute(Tuple input,  BasicOutputCollector collector) {
    Record record = (Record)input.getValueByField(DefaultKinesisRecordScheme.FIELD_RECORD);
    ByteBuffer buffer = record.getData();
    String data = null; 
    try {
        data = decoder.decode(buffer).toString();
        JSONObject jsonObject = new JSONObject(data);

        String referrer = jsonObject.getString("referrer");

        int firstIndex = referrer.indexOf('.');
        int nextIndex = referrer.indexOf('.',firstIndex+1);
        collector.emit(new Values(referrer.substring(firstIndex+1,nextIndex)));

    } catch (CharacterCodingException|JSONException|IllegalStateException e) {
        LOG.error("Exception when decoding record ", e);
    }
}
项目:aws-big-data-blog    文件:MyRecordProcessor.java   
@Override
public void processRecords(List<Record> records,
        IRecordProcessorCheckpointer checkpointer) {
    LOG.info(String.format("Received %s Records", records.size()));

    // add a call to your business logic here!
    //
    // myLinkedClasses.doSomething(records)
    //
    //
    try {
        checkpointer.checkpoint();
    } catch (KinesisClientLibDependencyException | InvalidStateException
            | ThrottlingException | ShutdownException e) {
        e.printStackTrace();
        super.shutdown(checkpointer, ShutdownReason.ZOMBIE);
    }
}
项目:aws-big-data-blog    文件:StreamsRecordProcessor.java   
private void processRecordsWithRetries(List<Record> records) throws Exception {
    for (Record record : records) {
        int tryCount = 0;
        boolean processedOk = false;
        while (tryCount < NUM_RETRIES) {
            try {
                try {
                    processSingleRecord(record);
                    processedOk = true;
                } catch (Throwable t) {
                    System.out.println("Caught throwable " + t + " while processing record " + record);
                    // exponential backoff
                    Thread.sleep(new Double(Math.pow(2, tryCount) * BACKOFF_TIME_IN_MILLIS).longValue());
                }
            } catch (InterruptedException e) {
                throw e;
            }
        }

        if (!processedOk) {
            throw new Exception("Unable to process record " + record.getPartitionKey() + " after " + NUM_RETRIES);
        }
    }
}
项目:micro-genie    文件:KinesisRawEventRecordProcessor.java   
/** Process records performing retries as needed. Skip "poison pill" records.
 * @param records
 */
private void processRecordsWithRetries(List<Record> records) {
 for(Record record : records) {
  boolean processedSuccessfully = false;
        for (int i = 0; i < NUM_RETRIES; i++) {
           processedSuccessfully = this.processRecord(record); 
           if(processedSuccessfully){
               break;   
           }
           /** If here then a failure occurred, let's back off **/
           this.backOff();
        }
        if (!processedSuccessfully){
            LOGGER.error("Couldn't process record {} - skipping record", record);
            /** TODO add an optional failure handler here, such as a dead letter queue **/
        }
    }
}
项目:Surf    文件:RecordProcessor.java   
@Override
public void processRecords(List<Record> list, IRecordProcessorCheckpointer irpc) {        
    _logger.info("Processing {} records", list.size());
    for(Record r: list){
        String data = new String(r.getData().array());
        long seq = _buffer.next();
        KinesisEvent evt = _buffer.get(seq);
        evt.setData(data);
        _buffer.publish(seq);
    }
    try{
        irpc.checkpoint();
    }
    catch(InvalidStateException | KinesisClientLibDependencyException | ShutdownException | ThrottlingException ex){
        _logger.warn("Exception while checkpointing", ex);
    }
}
项目:Surf    文件:RecordProcessor.java   
@Override
public void processRecords(List<Record> list, IRecordProcessorCheckpointer irpc) {        
    _logger.info("Processing {} records", list.size());
    for(Record r: list){
        String data = new String(r.getData().array());
        long seq = _buffer.next();
        KinesisEvent evt = _buffer.get(seq);
        evt.setData(data);
        _buffer.publish(seq);
    }
    try{
        irpc.checkpoint();
    }
    catch(InvalidStateException | KinesisClientLibDependencyException | ShutdownException | ThrottlingException ex){
        _logger.warn("Exception while checkpointing", ex);
    }
}
项目:stail    文件:RawRecordProcessor.java   
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    records.forEach(record -> {
        byte[] bytes = new byte[record.getData().remaining()];
        record.getData().get(bytes);
        System.out.write(bytes, 0, bytes.length);
        System.out.println();
    });
}
项目:stail    文件:JSONRecordProcessor.java   
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    records.forEach(record -> {
        try {
            byte[] bytes = new byte[record.getData().remaining()];
            record.getData().get(bytes);
            System.out.println(mapper.writeValueAsString(mapper.readTree(bytes)));
        } catch (IOException e) {
            logger.error("", e);
        }
    });
}
项目:kafka-connect-kinesis    文件:KinesisSourceTask.java   
@Override
public List<SourceRecord> poll() throws InterruptedException {
  List<SourceRecord> records;

  try {
    GetRecordsResult recordsResult = this.kinesisClient.getRecords(this.recordsRequest);
    records = new ArrayList<>(recordsResult.getRecords().size());
    log.trace("poll() - {} record(s) returned from shard {}.", this.config.kinesisShardId);

    for (Record record : recordsResult.getRecords()) {
      SourceRecord sourceRecord = this.recordConverter.sourceRecord(this.config.kinesisStreamName, this.config.kinesisShardId, record);
      records.add(sourceRecord);
    }

    log.trace("poll() - Changing shard iterator to {}", recordsResult.getNextShardIterator());
    this.recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
  } catch (ProvisionedThroughputExceededException ex) {
    log.warn("poll() - Throughput exceeded sleeping {} ms", this.config.kinesisThroughputExceededBackoffMs, ex);
    this.time.sleep(this.config.kinesisThroughputExceededBackoffMs);
    return new ArrayList<>();
  }

  if (records.isEmpty()) {
    log.trace("poll() - No records returned. Sleeping {} ms.", this.config.kinesisEmptyRecordsBackoffMs);
    this.time.sleep(this.config.kinesisEmptyRecordsBackoffMs);
  }

  return records;
}
项目:kafka-connect-kinesis    文件:TestData.java   
public static Record record() {
  return new Record()
      .withApproximateArrivalTimestamp(EXPECTED_APPROXIMATE_ARRIVAL_TIMESTAMP)
      .withData(ByteBuffer.wrap(EXPECTED_DATA))
      .withPartitionKey(EXPECTED_PARTITION_KEY)
      .withSequenceNumber(EXPECTED_SEQUENCE_NUMBER);
}
项目:zipkin-aws    文件:KinesisSpanProcessorTest.java   
@Test
public void collectorFailsWhenRecordEncodedAsSingleSpan() {
  Span span = TestObjects.LOTS_OF_SPANS[0];
  byte[] encodedSpan = Codec.THRIFT.writeSpan(span);
  Record kinesisRecord = new Record().withData(ByteBuffer.wrap(encodedSpan));
  ProcessRecordsInput kinesisInput = new ProcessRecordsInput().withRecords(Collections.singletonList(kinesisRecord));

  kinesisSpanProcessor.processRecords(kinesisInput);

  assertThat(storage.spanStore().getTraces().size()).isEqualTo(0);

  assertThat(metrics.messagesDropped()).isEqualTo(1);
  assertThat(metrics.bytes()).isEqualTo(encodedSpan.length);
}
项目:zipkin-aws    文件:KinesisSpanProcessorTest.java   
private ProcessRecordsInput createTestData(int count) {
  List<Record> records = new ArrayList<>();

  Span[] spans = Arrays.copyOfRange(TestObjects.LOTS_OF_SPANS, 0, count);

  Arrays.stream(spans)
      .map(s -> ByteBuffer.wrap(Codec.THRIFT.writeSpans(Collections.singletonList(s))))
      .map(b -> new Record().withData(b))
      .forEach(records::add);

  return new ProcessRecordsInput().withRecords(records);
}
项目:beam    文件:AmazonKinesisMock.java   
public Record convertToRecord() {
  return new Record().
      withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
      withData(ByteBuffer.wrap(data.getBytes())).
      withSequenceNumber(sequenceNumber).
      withPartitionKey("");
}
项目:beam    文件:AmazonKinesisMock.java   
@Override
public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
  String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
  int shardId = parseInt(shardIteratorParts[0]);
  int startingRecord = parseInt(shardIteratorParts[1]);
  List<Record> shardData = shardedData.get(shardId);

  int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
  int fromIndex = min(startingRecord, toIndex);
  return new GetRecordsResult()
      .withRecords(shardData.subList(fromIndex, toIndex))
      .withNextShardIterator(String.format("%s:%s", shardId, toIndex))
      .withMillisBehindLatest(0L);
}
项目:flink    文件:FakeKinesisBehavioursFactory.java   
public static List<Record> createRecordBatchWithRange(int min, int max) {
    List<Record> batch = new LinkedList<>();
    for (int i = min; i < max; i++) {
        batch.add(
            new Record()
                .withData(ByteBuffer.wrap(String.valueOf(i).getBytes(ConfigConstants.DEFAULT_CHARSET)))
                .withPartitionKey(UUID.randomUUID().toString())
                .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
                .withSequenceNumber(String.valueOf(i)));
    }
    return batch;
}
项目:flink    文件:FakeKinesisBehavioursFactory.java   
public static List<Record> createRecordBatchWithRange(int min, int max) {
    List<Record> batch = new LinkedList<>();
    for (int i = min; i < max; i++) {
        batch.add(
            new Record()
                .withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
                .withPartitionKey(UUID.randomUUID().toString())
                .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
                .withSequenceNumber(String.valueOf(i)));
    }
    return batch;
}
项目:lumber-mill    文件:RecordProcessor.java   
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
   try {
       List<Record> records = processRecordsInput.getRecords();
       Thread.currentThread().setName(kinesisShardId);
       int bytes = calculateSize(records);

       LOG.debug("Got {} records ({} bytes) and is behind latest with {}",
               records.size(), bytes, printTextBehindLatest(processRecordsInput));

       metricsCallback.shardBehindMs (kinesisShardId, processRecordsInput.getMillisBehindLatest());

       Observable observable = Observable.create(subscriber -> {
           try {
               for (Record record : records) {
                   subscriber.onNext(Codecs.BYTES.from(record.getData().array())
                           .put("_shardId", kinesisShardId));
               }
               subscriber.onCompleted();
               metricsCallback.recordsProcessed (kinesisShardId, records.size());
               metricsCallback.bytesProcessed (kinesisShardId,bytes);
           } catch (RuntimeException e) {
               subscriber.onError(e);
           }
       });

       unitOfWorkListener.apply(observable).toBlocking().subscribe();
       transaction.checkpoint(processRecordsInput.getCheckpointer());
   } catch (RuntimeException t) {
       doOnError(t);
   }
}
项目:lumber-mill    文件:RecordProcessor.java   
private int calculateSize(List<Record> records) {
    int bytes = 0;

    for (Record r : records) {
        bytes += r.getData().remaining();
    }

    // We get it in binary, but it's actually sent as Base64
    return bytes * 3 / 2;
}
项目:Camel    文件:KinesisEndpoint.java   
public Exchange createExchange(Record record) {
    Exchange exchange = super.createExchange();
    exchange.getIn().setBody(record);
    exchange.getIn().setHeader(KinesisConstants.APPROX_ARRIVAL_TIME, record.getApproximateArrivalTimestamp());
    exchange.getIn().setHeader(KinesisConstants.PARTITION_KEY, record.getPartitionKey());
    exchange.getIn().setHeader(KinesisConstants.SEQUENCE_NUMBER, record.getSequenceNumber());
    return exchange;
}