public void send(String event) throws UnsupportedEncodingException {
    byte[] bytes = event.getBytes("UTF-8");
    this.metrics.queueEvent(bytes.length);
    ByteBuffer data = ByteBuffer.wrap(bytes);
    String partitionKey = getPartitionKey(event);
    if (partitionKey != null) {
        ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, partitionKey, data);
        Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
            @Override
            public void onFailure(Throwable t) {
                if (t instanceof UserRecordFailedException) {
                    Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
                    LOGGER.error(String.format("Record failed to put - %s : %s", last.getErrorCode(), last.getErrorMessage()));
                }
                LOGGER.error("Exception during put", t);
            }

            @Override
            public void onSuccess(UserRecordResult result) {
                metrics.ackEvent();
            }
        });
    }
}
@Override
public void put(Collection<SinkRecord> sinkRecords) {
    // If KinesisProducers cannot write to Kinesis Streams (because of connectivity issues,
    // access issues, or misconfigured shards), pause consumption of messages until the
    // backlog is cleared.
    validateOutStandingRecords();

    String partitionKey;
    for (SinkRecord sinkRecord : sinkRecords) {
        ListenableFuture<UserRecordResult> f;
        // Kinesis does not allow an empty partition key
        if (sinkRecord.key() != null && !sinkRecord.key().toString().trim().equals("")) {
            partitionKey = sinkRecord.key().toString().trim();
        } else {
            partitionKey = Integer.toString(sinkRecord.kafkaPartition());
        }

        if (singleKinesisProducerPerPartition) {
            f = addUserRecord(producerMap.get(sinkRecord.kafkaPartition() + "@" + sinkRecord.topic()),
                    streamName, partitionKey, usePartitionAsHashKey, sinkRecord);
        } else {
            f = addUserRecord(kinesisProducer, streamName, partitionKey, usePartitionAsHashKey, sinkRecord);
        }

        Futures.addCallback(f, callback);
    }
}
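// validateOutStandingRecords() is referenced above but not shown. A minimal sketch of the
// backpressure idea, assuming a single producer and an outstandingRecordsThreshold setting
// (these names are illustrative, not the connector's actual fields):
private void validateOutStandingRecords() {
    // KinesisProducer.getOutstandingRecordsCount() reports records buffered but not yet acknowledged
    while (kinesisProducer.getOutstandingRecordsCount() > outstandingRecordsThreshold) {
        try {
            // back off until the KPL drains its backlog
            Thread.sleep(sleepPeriodMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while waiting for the KPL backlog to drain", e);
        }
    }
}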
private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName,
        String partitionKey, boolean usePartitionAsHashKey, SinkRecord sinkRecord) {
    // If configured, use the Kafka partition as the explicit hash key.
    // This is useful for sending data from the same partition into the same shard.
    if (usePartitionAsHashKey) {
        return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
    } else {
        return kp.addUserRecord(streamName, partitionKey,
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
    }
}
@Override
public void onSuccess(UserRecordResult result) {
    // incrementAndGet returns the updated value atomically, avoiding a race between
    // the increment and a separate read
    long completed = recordsCompleted.incrementAndGet();
    if (completed % NUMBER_OF_ZOMBIES == 0) {
        log.info(format("Records completed: %s; Shard: %s; SequenceNumber: %s.",
                completed, result.getShardId(), result.getSequenceNumber()));
    }
}
@SneakyThrows
public void putNewRecord(Zombie zombie) {
    CoordinateUTM utm = zombie.getCurrentPosition();
    CoordinateLatLon latLon = Datum.WGS84.utmToLatLon(utm);
    ZombieLecture lect = new ZombieLecture(id, zombie.getId(), new Date(), latLon.getLat(), latLon.getLon());
    utm.setAccuracy(RADIOUS);
    String partitionKey = utm.getShortForm();
    String json = mapper.writeValueAsString(lect);
    ByteBuffer data = ByteBuffer.wrap(json.getBytes("UTF-8"));
    ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, partitionKey, data);
    Futures.addCallback(f, this.recordSentCallback);
}
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    // check and pass the configuration properties
    KinesisProducerConfiguration producerConfig =
            KinesisConfigUtil.getValidatedProducerConfiguration(configProps);

    producer = getKinesisProducer(producerConfig);

    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (!result.isSuccessful()) {
                if (failOnError) {
                    // only remember the first thrown exception
                    if (thrownException == null) {
                        thrownException = new RuntimeException("Record was not sent successful");
                    }
                } else {
                    LOG.warn("Record was not sent successful");
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };

    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
@Override
public void invoke(OUT value, Context context) throws Exception {
    if (this.producer == null) {
        throw new RuntimeException("Kinesis producer has been closed");
    }

    checkAndPropagateAsyncError();

    String stream = defaultStream;
    String partition = defaultPartition;

    ByteBuffer serialized = schema.serialize(value);

    // maybe set custom stream
    String customStream = schema.getTargetStream(value);
    if (customStream != null) {
        stream = customStream;
    }

    String explicitHashkey = null;
    // maybe set custom partition
    if (customPartitioner != null) {
        partition = customPartitioner.getPartitionId(value);
        explicitHashkey = customPartitioner.getExplicitHashKey(value);
    }

    if (stream == null) {
        if (failOnError) {
            throw new RuntimeException("No target stream set");
        } else {
            LOG.warn("No target stream set. Skipping record");
            return;
        }
    }

    ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
    Futures.addCallback(cb, callback);
}
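// checkAndPropagateAsyncError() is not shown in this snippet. A minimal sketch, consistent
// with the older inline version further below (the actual Flink implementation may differ):
private void checkAndPropagateAsyncError() throws Exception {
    if (thrownException != null) {
        String errorMessages = "";
        if (thrownException instanceof UserRecordFailedException) {
            // collect the per-attempt error messages reported by the KPL
            List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
            for (Attempt attempt : attempts) {
                if (attempt.getErrorMessage() != null) {
                    errorMessages += attempt.getErrorMessage() + "\n";
                }
            }
        }
        if (failOnError) {
            throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
        } else {
            LOG.warn("An exception was thrown while processing a record: {}", errorMessages, thrownException);
            thrownException = null; // reset so later records are not rejected
        }
    }
}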
private boolean isAllRecordFuturesCompleted() {
    for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
        if (!future.isDone()) {
            return false;
        }
    }
    return true;
}
private int getNumPendingRecordFutures() {
    int numPending = 0;
    for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
        if (!future.isDone()) {
            numPending++;
        }
    }
    return numPending;
}
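// Helpers like these typically gate a blocking flush during checkpointing: the snapshot call
// must not return while records are still pending (see the tests below). A minimal sketch of
// such a flush loop, assuming a KPL `producer` field; the polling interval is illustrative:
private void flushSync() throws InterruptedException {
    while (!isAllRecordFuturesCompleted()) {
        // flush() asks the KinesisProducer to send buffered records immediately
        producer.flush();
        Thread.sleep(500);
    }
}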
private void getAndCheck(Future<UserRecordResult> future) throws StageException {
    try {
        UserRecordResult result = future.get();
        if (!result.isSuccessful()) {
            for (Attempt attempt : result.getAttempts()) {
                LOG.error("Failed to put record: {}", attempt.getErrorMessage());
            }
            throw new StageException(Errors.KINESIS_00, result.getAttempts().get(0).getErrorMessage());
        }
    } catch (InterruptedException | ExecutionException e) {
        LOG.error("Pipeline is shutting down.", e);
        // We should flush if we encounter an error.
        kinesisProducer.flushSync();
    }
}
@SuppressWarnings("unchecked") @Test public void testRecordTooLarge() throws Exception { KinesisProducerConfigBean config = getKinesisTargetConfig(); KinesisTarget target = new KinesisTarget(config); TargetRunner targetRunner = new TargetRunner.Builder( KinesisDTarget.class, target ).setOnRecordError(OnRecordError.TO_ERROR).build(); KinesisTestUtil.mockKinesisUtil(1); KinesisProducer producer = mock(KinesisProducer.class); Whitebox.setInternalState(target, "kinesisProducer", producer); targetRunner.runInit(); ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class); UserRecordResult result = mock(UserRecordResult.class); when(result.isSuccessful()).thenReturn(true); when(future.get()).thenReturn(result); when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class))) .thenReturn(future); List<Record> records = new ArrayList<>(4); records.add(KinesisTestUtil.getTooLargeRecord()); records.addAll(KinesisTestUtil.getProducerTestRecords(3)); targetRunner.runWrite(records); // Verify we added 3 good records at the end of the batch but not the bad one verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class)); assertEquals(1, targetRunner.getErrorRecords().size()); targetRunner.runDestroy(); }
/**
 * Track the timestamp of the event for determining watermark values until it has been sent or dropped.
 */
public void trackTimestamp(ListenableFuture<UserRecordResult> f, TripEvent event) {
    Futures.addCallback(f, new RemoveTimestampCallback(event));
}
@Override
public void onSuccess(UserRecordResult result) {
    removeEvent();
}
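// The RemoveTimestampCallback class itself is not shown. Since trackTimestamp() tracks the
// event "until it has been sent or dropped", a plausible sketch is below; the watermarkTracker
// field and TripEvent.getTimestamp() accessor are assumptions for illustration:
private class RemoveTimestampCallback implements FutureCallback<UserRecordResult> {
    private final TripEvent event;

    RemoveTimestampCallback(TripEvent event) {
        this.event = event;
    }

    private void removeEvent() {
        // stop tracking this event's timestamp for watermark computation
        watermarkTracker.remove(event.getTimestamp());
    }

    @Override
    public void onSuccess(UserRecordResult result) {
        removeEvent();
    }

    @Override
    public void onFailure(Throwable t) {
        // a dropped record must no longer hold back the watermark
        removeEvent();
    }
}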
/**
 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
 *
 * <p>Note that this test does not verify that the snapshot method is blocked correctly when there
 * are pending records. The test for that is covered in testAtLeastOnceProducer.
 */
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test(timeout = 10000)
public void testAsyncErrorRethrownAfterFlush() throws Throwable {
    final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());

    OneInputStreamOperatorTestHarness<String, Object> testHarness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));

    testHarness.open();

    testHarness.processElement(new StreamRecord<>("msg-1"));
    testHarness.processElement(new StreamRecord<>("msg-2"));
    testHarness.processElement(new StreamRecord<>("msg-3"));

    // only let the first record succeed for now
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    producer.getPendingRecordFutures().get(0).set(result);

    CheckedThread snapshotThread = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // this should block at first, since there are still two pending records that need to be flushed
            testHarness.snapshot(123L, 123L);
        }
    };
    snapshotThread.start();

    // let the 2nd message fail with an async exception
    producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message"));
    producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));

    try {
        snapshotThread.sync();
    } catch (Exception e) {
        // after the flush, the async exception should have been rethrown
        Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async failure for 2nd message").isPresent());

        // test succeeded
        return;
    }

    Assert.fail();
}
/**
 * Test ensuring that the producer is not dropping buffered records;
 * we set a timeout because the test will not finish if the logic is broken.
 */
@SuppressWarnings({"unchecked", "ResultOfMethodCallIgnored"})
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
    final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());

    OneInputStreamOperatorTestHarness<String, Object> testHarness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));

    testHarness.open();

    testHarness.processElement(new StreamRecord<>("msg-1"));
    testHarness.processElement(new StreamRecord<>("msg-2"));
    testHarness.processElement(new StreamRecord<>("msg-3"));

    // start a thread to perform checkpointing
    CheckedThread snapshotThread = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // this should block until all records are flushed; if the snapshot
            // implementation returns before pending records are flushed, the test fails
            testHarness.snapshot(123L, 123L);
        }
    };
    snapshotThread.start();

    // before proceeding, make sure that flushing has started and that the snapshot is still blocked;
    // this would block forever if the snapshot didn't perform a flush
    producer.waitUntilFlushStarted();
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

    // now, complete the callbacks
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);

    producer.getPendingRecordFutures().get(0).set(result);
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

    producer.getPendingRecordFutures().get(1).set(result);
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

    producer.getPendingRecordFutures().get(2).set(result);

    // this would fail with an exception if flushing wasn't completed before the snapshot method returned
    snapshotThread.sync();

    testHarness.close();
}
List<SettableFuture<UserRecordResult>> getPendingRecordFutures() {
    return pendingRecordFutures;
}
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
    producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));

    if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
        producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
    }

    if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
        producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
    }

    producer = new KinesisProducer(producerConfig);

    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (!result.isSuccessful()) {
                if (failOnError) {
                    thrownException = new RuntimeException("Record was not sent successful");
                } else {
                    LOG.warn("Record was not sent successful");
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };

    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
@Override
public void invoke(OUT value) throws Exception {
    if (this.producer == null) {
        throw new RuntimeException("Kinesis producer has been closed");
    }

    if (thrownException != null) {
        String errorMessages = "";
        if (thrownException instanceof UserRecordFailedException) {
            List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
            for (Attempt attempt : attempts) {
                if (attempt.getErrorMessage() != null) {
                    errorMessages += attempt.getErrorMessage() + "\n";
                }
            }
        }
        if (failOnError) {
            throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
        } else {
            // the message placeholder takes errorMessages; the throwable goes last so SLF4J logs the stack trace
            LOG.warn("An exception was thrown while processing a record: {}", errorMessages, thrownException);
            thrownException = null; // reset
        }
    }

    String stream = defaultStream;
    String partition = defaultPartition;

    ByteBuffer serialized = schema.serialize(value);

    // maybe set custom stream
    String customStream = schema.getTargetStream(value);
    if (customStream != null) {
        stream = customStream;
    }

    String explicitHashkey = null;
    // maybe set custom partition
    if (customPartitioner != null) {
        partition = customPartitioner.getPartitionId(value);
        explicitHashkey = customPartitioner.getExplicitHashKey(value);
    }

    if (stream == null) {
        if (failOnError) {
            throw new RuntimeException("No target stream set");
        } else {
            LOG.warn("No target stream set. Skipping record");
            return;
        }
    }

    ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
    Futures.addCallback(cb, callback);
}
@SuppressWarnings("unchecked") @Test public void testInOrderProduce() throws Exception { KinesisProducerConfigBean config = getKinesisTargetConfig(); config.preserveOrdering = true; KinesisTarget target = new KinesisTarget(config); TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build(); PowerMockito.mockStatic(KinesisUtil.class); when(KinesisUtil.checkStreamExists( any(ClientConfiguration.class), any(KinesisConfigBean.class), any(String.class), any(List.class), any(Stage.Context.class) ) ).thenReturn(1L); KinesisProducer producer = mock(KinesisProducer.class); Whitebox.setInternalState(target, "kinesisProducer", producer); targetRunner.runInit(); ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class); UserRecordResult result = mock(UserRecordResult.class); when(result.isSuccessful()).thenReturn(true); when(result.getShardId()).thenReturn("shardId-000000000000"); when(future.get()).thenReturn(result); when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class))) .thenReturn(future); targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3)); // Verify we added 3 records to stream test verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class)); // With preserveOrdering we should call flushSync for each record, plus once more for the batch. // The last invocation has no effect as no records should be pending. verify(producer, times(4)).flushSync(); targetRunner.runDestroy(); }
@SuppressWarnings("unchecked") @Test public void testDefaultProduce() throws Exception { KinesisProducerConfigBean config = getKinesisTargetConfig(); KinesisTarget target = new KinesisTarget(config); TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build(); KinesisTestUtil.mockKinesisUtil(1); KinesisProducer producer = mock(KinesisProducer.class); Whitebox.setInternalState(target, "kinesisProducer", producer); targetRunner.runInit(); ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class); UserRecordResult result = mock(UserRecordResult.class); when(result.isSuccessful()).thenReturn(true); when(future.get()).thenReturn(result); when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class))) .thenReturn(future); targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3)); // Verify we added 3 records verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class)); // By default we should only call flushSync one time per batch. verify(producer, times(1)).flushSync(); targetRunner.runDestroy(); }
@Override
public void onSuccess(UserRecordResult result) {
    // no-op: a successful put requires no further action
}