private void sentWatermark() { try { //refresh the list of available shards, if current state is too old if (System.currentTimeMillis() - lastShardRefreshTime >= SHARD_REFRESH_MILLIES) { refreshShards(); lastShardRefreshTime = System.currentTimeMillis(); } //send a watermark to every shard of the Kinesis stream shards.parallelStream() .map(shard -> new PutRecordRequest() .withStreamName(streamName) .withData(new WatermarkEvent(currentWatermark).payload) .withPartitionKey("23") .withExplicitHashKey(shard.getHashKeyRange().getStartingHashKey())) .map(kinesisClient::putRecord) .forEach(putRecordResult -> LOG.trace("send watermark {} to shard {}", new DateTime(currentWatermark), putRecordResult.getShardId())); LOG.debug("send watermark {}", new DateTime(currentWatermark)); } catch (LimitExceededException | ProvisionedThroughputExceededException e) { //if any request is throttled, just wait for the next iteration to submit another watermark LOG.warn("skipping watermark due to limit exceeded exception"); } }
@Before public void setup() throws Exception { when(kinesisEndpoint.getClient()).thenReturn(kinesisClient); when(kinesisEndpoint.getEndpointUri()).thenReturn("kinesis://etl"); when(kinesisEndpoint.getStreamName()).thenReturn(STREAM_NAME); when(exchange.getOut()).thenReturn(outMessage); when(exchange.getIn()).thenReturn(inMessage); when(exchange.getPattern()).thenReturn(ExchangePattern.InOut); when(inMessage.getBody(ByteBuffer.class)).thenReturn(SAMPLE_BUFFER); when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(PARTITION_KEY); when(putRecordResult.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER); when(putRecordResult.getShardId()).thenReturn(SHARD_ID); when(kinesisClient.putRecord(any(PutRecordRequest.class))).thenReturn(putRecordResult); kinesisProducer = new KinesisProducer(kinesisEndpoint); }
@Test public void shouldHaveProperHeadersWhenSending() throws Exception { String seqNoForOrdering = "1851"; when(inMessage.getHeader(KinesisConstants.SEQUENCE_NUMBER)).thenReturn(seqNoForOrdering); kinesisProducer.process(exchange); ArgumentCaptor<PutRecordRequest> capture = ArgumentCaptor.forClass(PutRecordRequest.class); verify(kinesisClient).putRecord(capture.capture()); PutRecordRequest request = capture.getValue(); assertEquals(PARTITION_KEY, request.getPartitionKey()); assertEquals(seqNoForOrdering, request.getSequenceNumberForOrdering()); verify(outMessage).setHeader(KinesisConstants.SEQUENCE_NUMBER, SEQUENCE_NUMBER); verify(outMessage).setHeader(KinesisConstants.SHARD_ID, SHARD_ID); }
public void processTuple(T tuple) { // Send out single data try { if (isBatchProcessing) { if (putRecordsRequestEntryList.size() == batchSize) { flushRecords(); logger.debug( "flushed {} records.", batchSize ); } addRecord(tuple); } else { Pair<String, V> keyValue = tupleToKeyValue(tuple); PutRecordRequest requestRecord = new PutRecordRequest(); requestRecord.setStreamName(streamName); requestRecord.setPartitionKey(keyValue.first); requestRecord.setData(ByteBuffer.wrap(getRecord(keyValue.second))); client.putRecord(requestRecord); } sendCount++; } catch (AmazonClientException e) { throw new RuntimeException(e); } }
/** * Process the input file and send PutRecordRequests to Amazon Kinesis. * * This function serves to Isolate StreamSource logic so subclasses * can process input files differently. * * @param inputStream * the input stream to process * @param iteration * the iteration if looping over file * @throws IOException * throw exception if error processing inputStream. */ protected void processInputStream(InputStream inputStream, int iteration) throws IOException { try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) { String line; int lines = 0; while ((line = br.readLine()) != null) { SimpleKinesisMessageModel kinesisMessageModel = new SimpleKinesisMessageModel(line); //SimpleKinesisMessageModel kinesisMessageModel = objectMapper.readValue(line, SimpleKinesisMessageModel.class); PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(config.KINESIS_INPUT_STREAM); putRecordRequest.setData(ByteBuffer.wrap(line.getBytes())); putRecordRequest.setPartitionKey(Integer.toString(kinesisMessageModel.getId())); kinesisClient.putRecord(putRecordRequest); lines++; } LOG.info("Added " + lines + " records to stream source."); } }
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); setupHosebirdClient(); hosebirdClient.connect(); while (!hosebirdClient.isDone()) { try { String tweetText = msgQueue.take(); // Add Data to a Stream PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); putRecordRequest.setData(ByteBuffer.wrap(tweetText.getBytes())); putRecordRequest.setPartitionKey(String.format("partitionKey-%s", "tweets")); PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest); System.out.println(String.format("Seq No: %s - %s", putRecordResult.getSequenceNumber(), tweetText)); } catch (InterruptedException e) { e.printStackTrace(); } } }
private void sendUsingAsyncClient(final PutRecordRequest putRecordRequest, Message<JsonObject> event) { if (retryCounter == 3) { sendError(event, "Failed sending message to Kinesis"); } final Context ctx = vertx.currentContext(); kinesisAsyncClient.putRecordAsync(putRecordRequest, new AsyncHandler<PutRecordRequest,PutRecordResult>() { public void onSuccess(PutRecordRequest request, final PutRecordResult recordResult) { ctx.runOnContext(v -> { logger.debug("Sent message to Kinesis: " + recordResult.toString()); sendOK(event); }); } public void onError(final java.lang.Exception iexc) { ctx.runOnContext(v -> { retryCounter++; kinesisAsyncClient = createClient(); logger.info("Failed sending message to Kinesis, retry: " + retryCounter + " ... ", iexc); vertx.setTimer(500, timerID -> sendUsingAsyncClient(putRecordRequest, event)); }); } }); }
/** * This method is called whenever a logging happens via logger.log(..) API * calls. Implementation for this appender will take in log events instantly * as long as the buffer is not full (as per user configuration). This call * will block if internal buffer is full until internal threads create some * space by publishing some of the records. * * If there is any error in parsing logevents, those logevents would be * dropped. */ @Override public void append(LoggingEvent logEvent) { if (initializationFailed) { error("Check the configuration and whether the configured stream " + streamName + " exists and is active. Failed to initialize kinesis log4j appender: " + name); return; } try { String message = layout.format(logEvent); ByteBuffer data = ByteBuffer.wrap(message.getBytes(encoding)); kinesisClient.putRecordAsync(new PutRecordRequest().withPartitionKey(UUID.randomUUID().toString()) .withStreamName(streamName).withData(data), asyncCallHander); } catch (Exception e) { LOGGER.error("Failed to schedule log entry for publishing into Kinesis stream: " + streamName); errorHandler.error("Failed to schedule log entry for publishing into Kinesis stream: " + streamName, e, ErrorCode.WRITE_FAILURE, logEvent); } }
private void run(final int events, final OutputFormat format, final String streamName, final String region) throws Exception { AmazonKinesis kinesisClient = new AmazonKinesisClient( new DefaultAWSCredentialsProviderChain()); kinesisClient.setRegion(Region.getRegion(Regions.fromName(region))); int count = 0; SensorReading r = null; do { r = nextSensorReading(format); try { PutRecordRequest req = new PutRecordRequest() .withPartitionKey("" + rand.nextLong()) .withStreamName(streamName) .withData(ByteBuffer.wrap(r.toString().getBytes())); kinesisClient.putRecord(req); } catch (ProvisionedThroughputExceededException e) { Thread.sleep(BACKOFF); } System.out.println(r); count++; } while (count < events); }
/** * Process the input file and send PutRecordRequests to Amazon Kinesis. * * This function serves to Isolate StreamSource logic so subclasses * can process input files differently. * * @param inputStream * the input stream to process * @param iteration * the iteration if looping over file * @throws IOException * throw exception if error processing inputStream. */ protected void processInputStream(InputStream inputStream, int iteration) throws IOException { try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) { String line; int lines = 0; while ((line = br.readLine()) != null) { KinesisMessageModel kinesisMessageModel = objectMapper.readValue(line, KinesisMessageModel.class); PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(config.KINESIS_INPUT_STREAM); putRecordRequest.setData(ByteBuffer.wrap(line.getBytes())); putRecordRequest.setPartitionKey(Integer.toString(kinesisMessageModel.getUserid())); kinesisClient.putRecord(putRecordRequest); lines++; } LOG.info("Added " + lines + " records to stream source."); } }
public void run() { while (true) { try { // get message from queue - blocking so code will wait here for work to do Event event = eventsQueue.take(); PutRecordRequest put = new PutRecordRequest(); put.setStreamName(this.streamName); put.setData(event.getData()); put.setPartitionKey(event.getPartitionKey()); PutRecordResult result = kinesisClient.putRecord(put); logger.info(result.getSequenceNumber() + ": {}", this); } catch (Exception e) { // didn't get record - move on to next\ e.printStackTrace(); } } }
@Override public void write(VDSEvent event) throws Exception { Map <String, String> headers = event.getEventInfo(); PutRecordRequest req = new PutRecordRequest(); req.setStreamName(_streamName); req.setData(ByteBuffer.wrap(event.getBuffer().array(), 0, event.getBufferLen())); if(headers != null){ if(headers.containsKey(PARTITION_KEY)){ _logger.debug("Using partition key from header"); req.setPartitionKey(headers.get(PARTITION_KEY)); } else{ _logger.debug("Using random partition key"); req.setPartitionKey(String.valueOf(_random.nextLong())); } if(headers.containsKey(SEQUENCE_NUMBER)){ req.setSequenceNumberForOrdering(headers.get(SEQUENCE_NUMBER)); } } if(_logger.isDebugEnabled()){ _logger.debug("Record content: {}", new String(event.getBuffer().array(), 0, event.getBufferLen())); _logger.debug("Record length: {}", event.getBufferLen()); } _client.putRecordAsync(req, _callback); }
@Override public Event save(Event event, String tenantId) { if (event == null || tenantId == null) { log.error(event.toString()); throw new IllegalArgumentException("Event or Tenant cannot be null"); } PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(stream); putRecordRequest.setPartitionKey(tenantId); putRecordRequest.setData(ByteBuffer.wrap(event.toJSON().getBytes())); PutRecordResult result = kinesisClient.putRecord(putRecordRequest); log.debug(String.format("Successfully putrecord, partition key : %s, ShardID: %s, Sequence Number: %s", putRecordRequest.getPartitionKey(),result.getShardId(),result.getSequenceNumber())); return event; }
public com.amazonaws.services.kinesis.AmazonKinesis build(AmazonKinesis kinesisProperties) { return new AbstractAmazonKinesis() { public PutRecordResult putRecord(PutRecordRequest request) { // do nothing return new PutRecordResult(); } }; }
@Override public Call<Void> sendSpans(List<byte[]> list) { if (closeCalled) throw new IllegalStateException("closed"); ByteBuffer message = ByteBuffer.wrap(BytesMessageEncoder.forEncoding(encoding()).encode(list)); PutRecordRequest request = new PutRecordRequest(); request.setStreamName(streamName()); request.setData(message); request.setPartitionKey(getPartitionKey()); return new KinesisCall(request); }
@Override protected void doEnqueue(Callback<Void> callback) { future = get().putRecordAsync(message, new AsyncHandler<PutRecordRequest, PutRecordResult>() { @Override public void onError(Exception e) { callback.onError(e); } @Override public void onSuccess(PutRecordRequest request, PutRecordResult result) { callback.onSuccess(null); } }); if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); }
@Override public void process(Exchange exchange) throws Exception { PutRecordRequest request = createRequest(exchange); PutRecordResult putRecordResult = getEndpoint().getClient().putRecord(request); Message message = getMessageForResponse(exchange); message.setHeader(KinesisConstants.SEQUENCE_NUMBER, putRecordResult.getSequenceNumber()); message.setHeader(KinesisConstants.SHARD_ID, putRecordResult.getShardId()); }
private PutRecordRequest createRequest(Exchange exchange) { ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class); Object partitionKey = exchange.getIn().getHeader(KinesisConstants.PARTITION_KEY); Object sequenceNumber = exchange.getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER); PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setData(body); putRecordRequest.setStreamName(getEndpoint().getStreamName()); putRecordRequest.setPartitionKey(partitionKey.toString()); if (sequenceNumber != null) { putRecordRequest.setSequenceNumberForOrdering(sequenceNumber.toString()); } return putRecordRequest; }
@Test public void shouldPutRecordInRightStreamWhenProcessingExchange() throws Exception { kinesisProducer.process(exchange); ArgumentCaptor<PutRecordRequest> capture = ArgumentCaptor.forClass(PutRecordRequest.class); verify(kinesisClient).putRecord(capture.capture()); PutRecordRequest request = capture.getValue(); ByteBuffer byteBuffer = request.getData(); byte[] actualArray = byteBuffer.array(); byte[] sampleArray = SAMPLE_BUFFER.array(); assertEquals(sampleArray, actualArray); assertEquals(STREAM_NAME, request.getStreamName()); }
@Override public void run() { if (records == null) { generateRecords(); } else { for (String msg : records) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(streamName); putRecordRequest.setData(ByteBuffer.wrap(msg.getBytes())); putRecordRequest.setPartitionKey(msg); client.putRecord(putRecordRequest); } } }
protected void sendMessageToKinesis(ByteBuffer payload, String partitionKey) throws KinesisException { if (kinesisAsyncClient == null) { throw new KinesisException("AmazonKinesisAsyncClient is not initialized"); } PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(Constants.STREAM_NAME); putRecordRequest.setPartitionKey(partitionKey); LOGGER.info("Writing to streamName " + Constants.STREAM_NAME + " using partitionkey " + partitionKey); putRecordRequest.setData(payload); Future<PutRecordResult> futureResult = kinesisAsyncClient.putRecordAsync(putRecordRequest); try { PutRecordResult recordResult = futureResult.get(); LOGGER.info("Sent message to Kinesis: " + recordResult.toString()); } catch (InterruptedException iexc) { LOGGER.error(iexc); } catch (ExecutionException eexc) { LOGGER.error(eexc); } }
public static void putEventRecord(AmazonKinesis client, String stream) throws Exception { String eventRecord = getEventRecord(); PutRecordRequest put = new PutRecordRequest(); put.setStreamName(stream); put.setPartitionKey("test:650"); put.setData(ByteBuffer.wrap(eventRecord.getBytes("UTF-8"))); try { PutRecordResult result = client.putRecord(put); System.out.println(result); } catch (AmazonClientException ex) { System.out.println("PutRecord failed."); } }
private void flushBuffer() throws IOException { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(config.KINESIS_INPUT_STREAM); putRecordRequest.setData(ByteBuffer.wrap(bufferToBytes())); putRecordRequest.setPartitionKey(String.valueOf(UUID.randomUUID())); kinesisClient.putRecord(putRecordRequest); buffer.clear(); }
@Override public PutRecordResult putRecord(PutRecordRequest putRecordRequest) throws AmazonServiceException, AmazonClientException { // Setup method to add a new record: InternalStream theStream = this.getStream(putRecordRequest.getStreamName()); if (theStream != null) { PutRecordResult result = theStream.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey()); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
protected void sendMessageToKinesis(final Message<JsonObject> event) throws KinesisException { if (kinesisAsyncClient == null) { throw new KinesisException("AmazonKinesisAsyncClient is not initialized"); } if (!isValid(event.body().getString(PAYLOAD))) { logger.error("Invalid message provided."); return; } JsonObject object = event.body(); logger.debug(" --- Got event " + event.toString()); logger.debug(" --- Got body + " + object.toString()); byte[] payload = object.getBinary(PAYLOAD); if (payload == null) { logger.debug(" --- Payload is null, trying to get the payload as String"); payload = object.getString(PAYLOAD).getBytes(); } logger.debug("Binary payload size: " + payload.length); String msgPartitionKey = object.getString(PARTITION_KEY); String requestPartitionKey = msgPartitionKey != null ? msgPartitionKey : partitionKey; PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(streamName); putRecordRequest.setPartitionKey(requestPartitionKey); logger.debug("Writing to streamName " + streamName + " using partitionkey " + requestPartitionKey); putRecordRequest.setData(ByteBuffer.wrap(payload)); retryCounter = 0; this.sendUsingAsyncClient(putRecordRequest, event); }
@Test public void simpleProcessTest() throws EventDeliveryException { Channel channel = mock(Channel.class); Transaction transactionMock = mock(Transaction.class); AmazonKinesisClient kinesisClient = mock(AmazonKinesisClient.class); PutRecordResult putRecordResult = mock(PutRecordResult.class); when(channel.getTransaction()).thenReturn(transactionMock); Event testEvent = new SimpleEvent(); byte[] testBody = new byte[]{'b', 'o', 'd', 'y'}; testEvent.setBody(testBody); when(channel.take()).thenReturn(testEvent); when(kinesisClient.putRecord(any(PutRecordRequest.class))).thenReturn(putRecordResult); KinesisSink kinesisSink = new KinesisSink(kinesisClient); kinesisSink.setChannel(channel); Context context = new Context(); context.put(KinesisSinkConfigurationConstant.ACCESS_KEY, "default"); context.put(KinesisSinkConfigurationConstant.ACCESS_SECRET_KEY, "default"); context.put(KinesisSinkConfigurationConstant.STREAM_NAME, "default"); kinesisSink.configure(context); kinesisSink.start(); kinesisSink.process(); verify(channel, times(1)).getTransaction(); verify(channel, times(1)).take(); verify(transactionMock, times(1)).begin(); verify(transactionMock, times(1)).close(); verify(transactionMock, times(1)).commit(); verify(transactionMock, times(0)).rollback(); }
public static void main(String[] args) throws SQLException { java.security.Security.setProperty("networkaddress.cache.ttl" , "60"); query = System.getProperty("kinesisapp.query"); conn = DriverManager.getConnection( System.getProperty("kinesisapp.jdbcurl"), System.getProperty("kinesisapp.dbuser"), System.getProperty("kinesisapp.dbpassword")); conn.setAutoCommit(true); AmazonKinesisClient client = new AmazonKinesisClient(); client.setEndpoint("https://kinesis.us-east-1.amazonaws.com"); String stream = "test"; int iteration = 100; int threashold = 1000; String data = new String("{\"user\":\"10125\",\"line\":\"aaa\",\"station\":\"bbb\",\"latitude\":35."); Random rand = new Random(); try { long start = System.currentTimeMillis(); String myKey = Long.toString(Thread.currentThread().getId()); for (int i = 0; i < iteration; i++) { try { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(stream); putRecordRequest.setData(ByteBuffer.wrap((data+Integer.toString(rand.nextInt(19)+52)+",\"longitude\":139."+Integer.toString(rand.nextInt(39)+51)+"}").getBytes())); putRecordRequest.setPartitionKey(myKey); PutRecordResult putRecordResult = client.putRecord(putRecordRequest); } catch(Exception iex) { } } System.out.println("Elapsed time(ms) for task " + Thread.currentThread().getId() + " : " + (System.currentTimeMillis() - start)); } catch(Exception ex) { ex.printStackTrace(); } }
public static void main(String[] args) { java.security.Security.setProperty("networkaddress.cache.ttl" , "60"); AmazonKinesisClient client = new AmazonKinesisClient(); client.setEndpoint("https://kinesis.us-east-1.amazonaws.com"); String stream = "test"; int iteration = 100; int threashold = 1000; String data = new String("{\"user\":\"10125\",\"line\":\"aaa\",\"station\":\"bbb\",\"latitude\":35."); Random rand = new Random(); try { long start = System.currentTimeMillis(); String myKey = Long.toString(Thread.currentThread().getId()); for (int i = 0; i < iteration; i++) { try { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(stream); putRecordRequest.setData(ByteBuffer.wrap((data+Integer.toString(rand.nextInt(19)+52)+",\"longitude\":139."+Integer.toString(rand.nextInt(39)+51)+"}").getBytes())); putRecordRequest.setPartitionKey(myKey); PutRecordResult putRecordResult = client.putRecord(putRecordRequest); } catch(Exception iex) { } } System.out.println("Elapsed time(ms) for task " + Thread.currentThread().getId() + " : " + (System.currentTimeMillis() - start)); } catch(Exception ex) { ex.printStackTrace(); } }
/** * This method is invoked when a log record is successfully sent to Kinesis. * Though this is not too useful for production use cases, it provides a good * debugging tool while tweaking parameters for the appender. */ @Override public void onSuccess(PutRecordRequest request, PutRecordResult result) { successfulRequestCount++; if (logger.isDebugEnabled() && (successfulRequestCount + failedRequestCount) % 3000 == 0) { logger.debug("Appender (" + appenderName + ") made " + successfulRequestCount + " successful put requests out of total " + (successfulRequestCount + failedRequestCount) + " in " + PeriodFormat.getDefault().print(new Period(startTime, DateTime.now())) + " since start"); } }
@Override protected void handle(E event, byte[] encoded) throws Exception { ByteBuffer buffer = ByteBuffer.wrap(encoded); PutRecordRequest request = new PutRecordRequest() .withPartitionKey(getPartitionKey(event)) .withStreamName(stream) .withData(buffer); String errorMessage = format("Appender '%s' failed to send logging event '%s' to Kinesis stream '%s'", getName(), event, stream); CountDownLatch latch = new CountDownLatch(isAsyncParent() ? 0 : 1); kinesis.putRecordAsync(request, new LoggingEventHandler<PutRecordRequest, PutRecordResult>(this, latch, errorMessage)); AppenderExecutors.awaitLatch(this, latch, getMaxFlushTime()); }
/** * publish a single message */ @Override public void submit(final Event event) { try { Preconditions.checkNotNull(event, "event cannot be null"); Preconditions.checkNotNull(event.getEventData(), "event data cannot be null"); Preconditions.checkArgument(!Strings.isNullOrEmpty(event.getTopic()), "Topic is required"); Preconditions.checkNotNull(event.getPartitionKey(), "Partition key cannot be null"); Preconditions.checkArgument(!Strings.isNullOrEmpty(event.getPartitionKey().toString()), "PartitionKey is required"); /** serialize the event **/ byte[] bytes = this.mapper.writeValueAsBytes(event); final PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(event.getTopic()); putRecordRequest.setPartitionKey(event.getPartitionKey().toString()); putRecordRequest.setData(ByteBuffer.wrap(bytes)); //putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); final PutRecordResult putRecordResult = client.putRecord(putRecordRequest); LOGGER.trace("published message to stream: {} partitionKey: {}, sequenceNumberForOrdering: {}, returnedSequenceNumber:{}", putRecordRequest.getStreamName(), putRecordRequest.getPartitionKey(), putRecordRequest.getSequenceNumberForOrdering(), putRecordResult.getSequenceNumber()); } catch (JsonProcessingException e) { LOGGER.error(e.getMessage(), e); } }
@Test @SuppressWarnings("unchecked") public void testProducerErrorChannel() throws Exception { KinesisTestBinder binder = getBinder(); final RuntimeException putRecordException = new RuntimeException("putRecordRequestEx"); final AtomicReference<Object> sent = new AtomicReference<>(); AmazonKinesisAsync amazonKinesisMock = mock(AmazonKinesisAsync.class); BDDMockito.given(amazonKinesisMock.putRecordAsync(any(PutRecordRequest.class), any(AsyncHandler.class))) .willAnswer((Answer<Future<PutRecordResult>>) invocation -> { PutRecordRequest request = invocation.getArgument(0); sent.set(request.getData()); AsyncHandler<?, ?> handler = invocation.getArgument(1); handler.onError(putRecordException); return mock(Future.class); }); new DirectFieldAccessor(binder.getBinder()).setPropertyValue("amazonKinesis", amazonKinesisMock); ExtendedProducerProperties<KinesisProducerProperties> producerProps = createProducerProperties(); producerProps.setErrorChannelEnabled(true); DirectChannel moduleOutputChannel = createBindableChannel("output", createProducerBindingProperties(producerProps)); Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0", moduleOutputChannel, producerProps); ApplicationContext applicationContext = TestUtils.getPropertyValue(binder.getBinder(), "applicationContext", ApplicationContext.class); SubscribableChannel ec = applicationContext.getBean("ec.0.errors", SubscribableChannel.class); final AtomicReference<Message<?>> errorMessage = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); ec.subscribe(message -> { errorMessage.set(message); latch.countDown(); }); String messagePayload = "oops"; moduleOutputChannel.send(new GenericMessage<>(messagePayload.getBytes())); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(errorMessage.get()).isInstanceOf(ErrorMessage.class); assertThat(errorMessage.get().getPayload()).isInstanceOf(AwsRequestFailureException.class); AwsRequestFailureException exception = (AwsRequestFailureException) errorMessage.get().getPayload(); assertThat(exception.getCause()).isSameAs(putRecordException); assertThat(((PutRecordRequest) exception.getRequest()).getData()).isSameAs(sent.get()); producerBinding.unbind(); }
KinesisCall(PutRecordRequest message) { this.message = message; }
@Override public PutRecordResult putRecord(PutRecordRequest putRecordRequest) { throw new RuntimeException("Not implemented"); }
@Override protected void putMessage(String message) throws Exception { ByteBuffer data = ByteBuffer.wrap(message.getBytes(getEncoding())); getClient().putRecordAsync(new PutRecordRequest().withPartitionKey(UUID.randomUUID().toString()) .withStreamName(getStreamName()).withData(data), asyncCallHander); }
@Override public void onSuccess(PutRecordRequest rqst, PutRecordResult result) { _logger.debug("PutRecord completed successfully. Sequence number = {}", result.getSequenceNumber()); count.incrementAndGet(); }
/** * This method is invoked when a log record is successfully sent to Kinesis. * Though this is not too useful for production use cases, it provides a good * debugging tool while tweaking parameters for the appender. */ @Override public void onSuccess(PutRecordRequest request, PutRecordResult result) { successfulRequestCount++; }