Usage examples for the Java class com.amazonaws.services.kinesis.producer.UserRecordResult

Project: koupler    File: KinesisEventProducer.java
public void send(String event) throws UnsupportedEncodingException {
    byte[] bytes = event.getBytes("UTF-8");
    this.metrics.queueEvent(bytes.length);
    ByteBuffer data = ByteBuffer.wrap(bytes);
    String partitionKey = getPartitionKey(event);
    if (partitionKey != null) {
        ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, partitionKey, data);
        Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
            @Override
            public void onFailure(Throwable t) {
                if (t instanceof UserRecordFailedException) {
                    Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
                    LOGGER.error(String.format("Record failed to put - %s : %s", last.getErrorCode(), last.getErrorMessage()));
                }
                LOGGER.error("Exception during put", t);
            }

            @Override
            public void onSuccess(UserRecordResult result) {
                metrics.ackEvent();
            }
        });
    }
}
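For orientation, here is a minimal, self-contained sketch of how a sender like the one above is typically wired up with the Kinesis Producer Library. The region, stream name, and payload are illustrative assumptions, not values from the koupler project:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.ListenableFuture;

public class ProducerSketch {
    public static void main(String[] args) {
        // Region and stream name are illustrative only.
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRegion("us-east-1");
        KinesisProducer producer = new KinesisProducer(config);

        ByteBuffer data = ByteBuffer.wrap("example-event".getBytes(StandardCharsets.UTF_8));
        ListenableFuture<UserRecordResult> f =
                producer.addUserRecord("example-stream", "example-key", data);

        // flushSync blocks until all buffered records have been sent.
        producer.flushSync();
        producer.destroy();
    }
}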
Project: kinesis-kafka-connector    File: AmazonKinesisSinkTask.java
@Override
public void put(Collection<SinkRecord> sinkRecords) {

    // If the KinesisProducer cannot write to Kinesis Streams (because of
    // connectivity issues, access issues, or misconfigured shards), we pause
    // consumption of messages until the backlog is cleared.

    validateOutStandingRecords();

    String partitionKey;
    for (SinkRecord sinkRecord : sinkRecords) {

        ListenableFuture<UserRecordResult> f;
        // Kinesis does not allow an empty partition key
        if (sinkRecord.key() != null && !sinkRecord.key().toString().trim().equals("")) {
            partitionKey = sinkRecord.key().toString().trim();
        } else {
            partitionKey = Integer.toString(sinkRecord.kafkaPartition());
        }

        if (singleKinesisProducerPerPartition)
            f = addUserRecord(producerMap.get(sinkRecord.kafkaPartition() + "@" + sinkRecord.topic()), streamName,
                    partitionKey, usePartitionAsHashKey, sinkRecord);
        else
            f = addUserRecord(kinesisProducer, streamName, partitionKey, usePartitionAsHashKey, sinkRecord);

        Futures.addCallback(f, callback);

    }
}
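validateOutStandingRecords() is not shown in this excerpt. Below is a hedged sketch of what such a backpressure guard might look like; getOutstandingRecordsCount() is a real KPL method, but the maxOutstandingRecords limit and the sleep interval are assumptions, not taken from the connector:

// Hypothetical sketch: block until the KPL's internal buffer drains below a limit.
private void validateOutStandingRecords() {
    // maxOutstandingRecords is an assumed configuration value.
    while (kinesisProducer.getOutstandingRecordsCount() > maxOutstandingRecords) {
        try {
            Thread.sleep(100); // back off until the backlog drains
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}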
Project: kinesis-kafka-connector    File: AmazonKinesisSinkTask.java
private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey,
        boolean usePartitionAsHashKey, SinkRecord sinkRecord) {

    // If configured, use the Kafka partition number as the explicit hash key.
    // This is useful for routing records from the same Kafka partition to
    // the same shard.
    if (usePartitionAsHashKey)
        return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
    else
        return kp.addUserRecord(streamName, partitionKey,
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));

}
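As an aside: Kinesis interprets an explicit hash key as a decimal string in the 128-bit hash range, so raw partition numbers like "0" or "7", as passed above, all land at the very bottom of that range. If an even spread over shards were desired instead, the partition number could be scaled across the hash space. A hedged, illustrative helper (numPartitions is an assumed value; this is not what the connector does):

import java.math.BigInteger;

// Illustrative only: map Kafka partition i of numPartitions onto an evenly
// spaced point in Kinesis's 128-bit explicit-hash-key space.
static String spreadHashKey(int kafkaPartition, int numPartitions) {
    BigInteger hashSpace = BigInteger.ONE.shiftLeft(128); // 2^128
    return hashSpace.multiply(BigInteger.valueOf(kafkaPartition))
            .divide(BigInteger.valueOf(numPartitions))
            .toString();
}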
Project: aws-kinesis-zombies    File: Drone.java
@Override
public void onSuccess(UserRecordResult result) {
    recordsCompleted.getAndIncrement();
    if (recordsCompleted.get() % NUMBER_OF_ZOMBIES == 0) {
        log.info(format("Records completed: %s; Shard: %s; SequenceNumber: %s.",
                     recordsCompleted.get(), result.getShardId(), result.getSequenceNumber()));

    }
}
Project: aws-kinesis-zombies    File: Drone.java
@SneakyThrows
public void putNewRecord(Zombie zombie) {        
    CoordinateUTM utm = zombie.getCurrentPosition();
    CoordinateLatLon latLon = Datum.WGS84.utmToLatLon(utm);
    ZombieLecture lect = new ZombieLecture(id, zombie.getId(), new Date(), latLon.getLat(), latLon.getLon());
    utm.setAccuracy(RADIOUS);
    String partitionKey = utm.getShortForm();
    String json = mapper.writeValueAsString(lect);
    ByteBuffer data = ByteBuffer.wrap(json.getBytes("UTF-8"));
    ListenableFuture<UserRecordResult> f
            = producer.addUserRecord(streamName, partitionKey, data);
    Futures.addCallback(f, this.recordSentCallback);
}
Project: flink    File: FlinkKinesisProducer.java
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    // check and pass the configuration properties
    KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);

    producer = getKinesisProducer(producerConfig);
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (!result.isSuccessful()) {
                if (failOnError) {
                    // only remember the first thrown exception
                    if (thrownException == null) {
                        thrownException = new RuntimeException("Record was not sent successfully");
                    }
                } else {
                    LOG.warn("Record was not sent successfully");
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };

    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
Project: flink    File: FlinkKinesisProducer.java
@Override
public void invoke(OUT value, Context context) throws Exception {
    if (this.producer == null) {
        throw new RuntimeException("Kinesis producer has been closed");
    }

    checkAndPropagateAsyncError();

    String stream = defaultStream;
    String partition = defaultPartition;

    ByteBuffer serialized = schema.serialize(value);

    // maybe set custom stream
    String customStream = schema.getTargetStream(value);
    if (customStream != null) {
        stream = customStream;
    }

    String explicitHashkey = null;
    // maybe set custom partition
    if (customPartitioner != null) {
        partition = customPartitioner.getPartitionId(value);
        explicitHashkey = customPartitioner.getExplicitHashKey(value);
    }

    if (stream == null) {
        if (failOnError) {
            throw new RuntimeException("No target stream set");
        } else {
            LOG.warn("No target stream set. Skipping record");
            return;
        }
    }

    ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
    Futures.addCallback(cb, callback);
}
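checkAndPropagateAsyncError() is not shown in this excerpt. A plausible sketch, reconstructed from how thrownException is handled in the older invoke() variant further below (this is not the verbatim Flink implementation):

// Sketch only: surface an async error recorded by the send callback.
private void checkAndPropagateAsyncError() throws Exception {
    if (thrownException != null) {
        if (failOnError) {
            throw new RuntimeException(
                    "An exception was thrown while processing a record", thrownException);
        }
        LOG.warn("An exception was thrown while processing a record", thrownException);
        thrownException = null; // reset so the error is reported only once
    }
}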
Project: flink    File: FlinkKinesisProducerTest.java
private boolean isAllRecordFuturesCompleted() {
    for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
        if (!future.isDone()) {
            return false;
        }
    }

    return true;
}
Project: flink    File: FlinkKinesisProducerTest.java
private int getNumPendingRecordFutures() {
    int numPending = 0;

    for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
        if (!future.isDone()) {
            numPending++;
        }
    }

    return numPending;
}
Project: datacollector    File: KinesisTarget.java
private void getAndCheck(Future<UserRecordResult> future) throws StageException {
  try {
    UserRecordResult result = future.get();
    if (!result.isSuccessful()) {
      for (Attempt attempt : result.getAttempts()) {
        LOG.error("Failed to put record: {}", attempt.getErrorMessage());
      }
      throw new StageException(Errors.KINESIS_00, result.getAttempts().get(0).getErrorMessage());
    }
  } catch (InterruptedException | ExecutionException e) {
    LOG.error("Pipeline is shutting down.", e);
    // We should flush if we encounter an error.
    kinesisProducer.flushSync();
  }
}
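A hypothetical caller for getAndCheck, assuming the target collects one future per record and verifies them all after the batch is dispatched (batch, partitionKeyFor, and serialize are illustrative helpers, not from datacollector):

// Hypothetical usage: dispatch a batch, then verify every put.
List<Future<UserRecordResult>> futures = new ArrayList<>();
for (Record record : batch) {
    futures.add(kinesisProducer.addUserRecord(
            streamName, partitionKeyFor(record), serialize(record)));
}
for (Future<UserRecordResult> f : futures) {
    getAndCheck(f); // throws StageException on a failed put
}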
Project: datacollector    File: TestKinesisTarget.java
@SuppressWarnings("unchecked")
@Test
public void testRecordTooLarge() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(
      KinesisDTarget.class,
      target
  ).setOnRecordError(OnRecordError.TO_ERROR).build();

  KinesisTestUtil.mockKinesisUtil(1);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  List<Record> records = new ArrayList<>(4);
  records.add(KinesisTestUtil.getTooLargeRecord());
  records.addAll(KinesisTestUtil.getProducerTestRecords(3));
  targetRunner.runWrite(records);

  // Verify we added 3 good records at the end of the batch but not the bad one
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));

  assertEquals(1, targetRunner.getErrorRecords().size());
  targetRunner.runDestroy();
}
Project: flink-stream-processing-refarch    File: WatermarkTracker.java
/** Track the timestamp of the event for determining watermark values until it has been sent or dropped. */
public void trackTimestamp(ListenableFuture<UserRecordResult> f, TripEvent event) {
  Futures.addCallback(f, new RemoveTimestampCallback(event));
}
Project: flink-stream-processing-refarch    File: WatermarkTracker.java
@Override
public void onSuccess(UserRecordResult result) {
  removeEvent();
}
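Only onSuccess of RemoveTimestampCallback appears above; presumably the timestamp must also be released on failure, so that a dropped event cannot hold the watermark back. A hedged sketch of the whole callback, where everything beyond the onSuccess shown is an assumption:

// Sketch: release the tracked timestamp whether the put succeeded or failed.
private class RemoveTimestampCallback implements FutureCallback<UserRecordResult> {
    private final TripEvent event;

    RemoveTimestampCallback(TripEvent event) {
        this.event = event;
    }

    private void removeEvent() {
        // assumed: remove event's timestamp from the structure
        // that backs the watermark computation
    }

    @Override
    public void onSuccess(UserRecordResult result) {
        removeEvent();
    }

    @Override
    public void onFailure(Throwable t) {
        removeEvent(); // assumed: treat dropped events like sent ones
    }
}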
Project: flink    File: FlinkKinesisProducerTest.java
/**
 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
 *
 * <p>Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
 * That case is covered by testAtLeastOnceProducer.
 */
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test(timeout = 10000)
public void testAsyncErrorRethrownAfterFlush() throws Throwable {
    final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());

    OneInputStreamOperatorTestHarness<String, Object> testHarness =
        new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));

    testHarness.open();

    testHarness.processElement(new StreamRecord<>("msg-1"));
    testHarness.processElement(new StreamRecord<>("msg-2"));
    testHarness.processElement(new StreamRecord<>("msg-3"));

    // only let the first record succeed for now
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    producer.getPendingRecordFutures().get(0).set(result);

    CheckedThread snapshotThread = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // this should block at first, since there are still two pending records that need to be flushed
            testHarness.snapshot(123L, 123L);
        }
    };
    snapshotThread.start();

    // let the 2nd message fail with an async exception
    producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message"));
    producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));

    try {
        snapshotThread.sync();
    } catch (Exception e) {
        // after the flush, the async exception should have been rethrown
        Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async failure for 2nd message").isPresent());

        // test succeeded
        return;
    }

    Assert.fail();
}
Project: flink    File: FlinkKinesisProducerTest.java
/**
 * Test ensuring that the producer is not dropping buffered records;
 * we set a timeout because the test will not finish if the logic is broken.
 */
@SuppressWarnings({"unchecked", "ResultOfMethodCallIgnored"})
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
    final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());

    OneInputStreamOperatorTestHarness<String, Object> testHarness =
        new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));

    testHarness.open();

    testHarness.processElement(new StreamRecord<>("msg-1"));
    testHarness.processElement(new StreamRecord<>("msg-2"));
    testHarness.processElement(new StreamRecord<>("msg-3"));

    // start a thread to perform checkpointing
    CheckedThread snapshotThread = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // this should block until all records are flushed;
            // if the snapshot implementation returns before pending records are flushed, the assertions below will fail
            testHarness.snapshot(123L, 123L);
        }
    };
    snapshotThread.start();

    // before proceeding, make sure that flushing has started and that the snapshot is still blocked;
    // this would block forever if the snapshot didn't perform a flush
    producer.waitUntilFlushStarted();
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

    // now, complete the callbacks
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);

    producer.getPendingRecordFutures().get(0).set(result);
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

    producer.getPendingRecordFutures().get(1).set(result);
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

    producer.getPendingRecordFutures().get(2).set(result);

    // this would fail with an exception if flushing wasn't completed before the snapshot method returned
    snapshotThread.sync();

    testHarness.close();
}
Project: flink    File: FlinkKinesisProducerTest.java
List<SettableFuture<UserRecordResult>> getPendingRecordFutures() {
    return pendingRecordFutures;
}
Project: flink    File: FlinkKinesisProducer.java
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();

    producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
        producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
    }
    if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
        producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
    }

    producer = new KinesisProducer(producerConfig);
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (!result.isSuccessful()) {
                if (failOnError) {
                    thrownException = new RuntimeException("Record was not sent successfully");
                } else {
                    LOG.warn("Record was not sent successfully");
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };

    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
Project: flink    File: FlinkKinesisProducer.java
@Override
public void invoke(OUT value) throws Exception {
    if (this.producer == null) {
        throw new RuntimeException("Kinesis producer has been closed");
    }
    if (thrownException != null) {
        String errorMessages = "";
        if (thrownException instanceof UserRecordFailedException) {
            List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
            for (Attempt attempt: attempts) {
                if (attempt.getErrorMessage() != null) {
                    errorMessages += attempt.getErrorMessage() + "\n";
                }
            }
        }
        if (failOnError) {
            throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
        } else {
            LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);
            thrownException = null; // reset
        }
    }

    String stream = defaultStream;
    String partition = defaultPartition;

    ByteBuffer serialized = schema.serialize(value);

    // maybe set custom stream
    String customStream = schema.getTargetStream(value);
    if (customStream != null) {
        stream = customStream;
    }

    String explicitHashkey = null;
    // maybe set custom partition
    if (customPartitioner != null) {
        partition = customPartitioner.getPartitionId(value);
        explicitHashkey = customPartitioner.getExplicitHashKey(value);
    }

    if (stream == null) {
        if (failOnError) {
            throw new RuntimeException("No target stream set");
        } else {
            LOG.warn("No target stream set. Skipping record");
            return;
        }
    }

    ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
    Futures.addCallback(cb, callback);
}
Project: datacollector    File: TestKinesisTarget.java
@SuppressWarnings("unchecked")
@Test
public void testInOrderProduce() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();
  config.preserveOrdering = true;

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();

  PowerMockito.mockStatic(KinesisUtil.class);

  when(KinesisUtil.checkStreamExists(
      any(ClientConfiguration.class),
      any(KinesisConfigBean.class),
      any(String.class),
      any(List.class),
      any(Stage.Context.class)
      )
  ).thenReturn(1L);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);
  when(result.getShardId()).thenReturn("shardId-000000000000");

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));

  // Verify we added 3 records to the test stream
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
  // With preserveOrdering we should call flushSync for each record, plus once more for the batch.
  // The last invocation has no effect as no records should be pending.
  verify(producer, times(4)).flushSync();

  targetRunner.runDestroy();
}
Project: datacollector    File: TestKinesisTarget.java
@SuppressWarnings("unchecked")
@Test
public void testDefaultProduce() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();

  KinesisTestUtil.mockKinesisUtil(1);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));

  // Verify we added 3 records
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
  // By default we should only call flushSync one time per batch.
  verify(producer, times(1)).flushSync();

  targetRunner.runDestroy();
}
Project: kinesis-kafka-connector    File: AmazonKinesisSinkTask.java
@Override
public void onSuccess(UserRecordResult result) {
    // no-op: a successful put requires no further action here
}
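The connector leaves the success path empty. For comparison, a sketch of a more observable variant that counts acknowledged puts; the recordsAcked field is hypothetical and not part of kinesis-kafka-connector, an SLF4J logger named log is assumed, and java.util.concurrent.atomic.AtomicLong must be imported:

// Hypothetical variant: count successful puts for monitoring.
private final AtomicLong recordsAcked = new AtomicLong();

@Override
public void onSuccess(UserRecordResult result) {
    recordsAcked.incrementAndGet();
    log.debug("Put succeeded: shard={}, sequenceNumber={}",
            result.getShardId(), result.getSequenceNumber());
}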