public PutRecordResult putRecord(ByteBuffer data, String partitionKey) { // Create record and insert into the shards. Initially just do it // on a round robin basis. long ts = System.currentTimeMillis() - 50000; Record rec = new Record(); rec = rec.withData(data).withPartitionKey(partitionKey).withSequenceNumber(String.valueOf(sequenceNo)); rec.setApproximateArrivalTimestamp(new Date(ts)); if (nextShard == shards.size()) { nextShard = 0; } InternalShard shard = shards.get(nextShard); shard.addRecord(rec); PutRecordResult result = new PutRecordResult(); result.setSequenceNumber(String.valueOf(sequenceNo)); result.setShardId(shard.getShardId()); nextShard++; sequenceNo++; return result; }
@Override public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) throws AmazonServiceException, AmazonClientException { // Setup method to add a batch of new records: InternalStream theStream = this.getStream(putRecordsRequest.getStreamName()); if (theStream != null) { PutRecordsResult result = new PutRecordsResult(); ArrayList<PutRecordsResultEntry> resultList = new ArrayList<PutRecordsResultEntry>(); for (PutRecordsRequestEntry entry : putRecordsRequest.getRecords()) { PutRecordResult putResult = theStream.putRecord(entry.getData(), entry.getPartitionKey()); resultList.add((new PutRecordsResultEntry()).withShardId(putResult.getShardId()).withSequenceNumber(putResult.getSequenceNumber())); } result.setRecords(resultList); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); setupHosebirdClient(); hosebirdClient.connect(); while (!hosebirdClient.isDone()) { try { String tweetText = msgQueue.take(); // Add Data to a Stream PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); putRecordRequest.setData(ByteBuffer.wrap(tweetText.getBytes())); putRecordRequest.setPartitionKey(String.format("partitionKey-%s", "tweets")); PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest); System.out.println(String.format("Seq No: %s - %s", putRecordResult.getSequenceNumber(), tweetText)); } catch (InterruptedException e) { e.printStackTrace(); } } }
/**
 * Sends one record through the async Kinesis client, retrying on failure.
 *
 * <p>On success an OK reply is sent for {@code event}. On error the retry counter is
 * incremented, the client is recreated, and the send is rescheduled after 500 ms. Once
 * the counter reaches 3, an error reply is sent and the record is abandoned.
 *
 * <p>The counter is reset both when giving up and on success, so earlier failures do not
 * use up the retry budget of later records.
 *
 * @param putRecordRequest the record to send
 * @param event            the message to reply to with the outcome
 */
private void sendUsingAsyncClient(final PutRecordRequest putRecordRequest, Message<JsonObject> event) {
    if (retryCounter >= 3) {
        retryCounter = 0;
        sendError(event, "Failed sending message to Kinesis");
        // Stop here: without this return the record would be sent again after the
        // error reply had already gone out.
        return;
    }

    final Context ctx = vertx.currentContext();
    kinesisAsyncClient.putRecordAsync(putRecordRequest, new AsyncHandler<PutRecordRequest, PutRecordResult>() {
        @Override
        public void onSuccess(PutRecordRequest request, final PutRecordResult recordResult) {
            ctx.runOnContext(v -> {
                retryCounter = 0;
                logger.debug("Sent message to Kinesis: " + recordResult.toString());
                sendOK(event);
            });
        }

        @Override
        public void onError(final java.lang.Exception iexc) {
            ctx.runOnContext(v -> {
                retryCounter++;
                kinesisAsyncClient = createClient();
                logger.info("Failed sending message to Kinesis, retry: " + retryCounter + " ... ", iexc);
                vertx.setTimer(500, timerID -> sendUsingAsyncClient(putRecordRequest, event));
            });
        }
    });
}
public void run() { while (true) { try { // get message from queue - blocking so code will wait here for work to do Event event = eventsQueue.take(); PutRecordRequest put = new PutRecordRequest(); put.setStreamName(this.streamName); put.setData(event.getData()); put.setPartitionKey(event.getPartitionKey()); PutRecordResult result = kinesisClient.putRecord(put); logger.info(result.getSequenceNumber() + ": {}", this); } catch (Exception e) { // didn't get record - move on to next\ e.printStackTrace(); } } }
/**
 * Takes one click event from the input queue, puts it to Kinesis and publishes a
 * {@code RecordsPut} metric for it.
 *
 * <p>The session id is used as partition key and the payload is sent as UTF-8 bytes.
 * The {@code recordsPut} counter is incremented before the put is attempted. The metric
 * has value 1.0 and the dimensions StreamName, ShardId and Host, and is published under
 * the {@code MySampleProducer} namespace.
 *
 * @throws Exception whatever the queue, encoding, Kinesis, host lookup or metric calls throw
 */
@Override
protected void runOnce() throws Exception {
    ClickEvent click = inputQueue.take();
    String sessionKey = click.getSessionId();
    byte[] payloadBytes = click.getPayload().getBytes("UTF-8");
    ByteBuffer payload = ByteBuffer.wrap(payloadBytes);

    recordsPut.getAndIncrement();
    PutRecordResult putResult = kinesis.putRecord(STREAM_NAME, payload, sessionKey);

    Dimension streamDimension = new Dimension().withName("StreamName").withValue(STREAM_NAME);
    Dimension shardDimension = new Dimension().withName("ShardId").withValue(putResult.getShardId());
    Dimension hostDimension =
            new Dimension().withName("Host").withValue(InetAddress.getLocalHost().toString());

    MetricDatum datum = new MetricDatum()
            .withDimensions(streamDimension, shardDimension, hostDimension)
            .withValue(1.0)
            .withMetricName("RecordsPut");

    PutMetricDataRequest metricRequest = new PutMetricDataRequest()
            .withMetricData(datum)
            .withNamespace("MySampleProducer");
    cw.putMetricData(metricRequest);
}
/**
 * Publishes the event to the Kinesis stream as JSON, using the tenant id as partition key.
 *
 * @param event    event to publish; must not be null
 * @param tenantId tenant id used as the partition key; must not be null
 * @return the same {@code event} instance that was passed in
 * @throws IllegalArgumentException if {@code event} or {@code tenantId} is null
 */
@Override
public Event save(Event event, String tenantId) {
    if (event == null || tenantId == null) {
        // String concatenation is null-safe; calling event.toString() here would throw a
        // NullPointerException instead of the intended IllegalArgumentException.
        log.error("Cannot save: event=" + event + ", tenantId=" + tenantId);
        throw new IllegalArgumentException("Event or Tenant cannot be null");
    }

    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName(stream);
    putRecordRequest.setPartitionKey(tenantId);
    // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
    putRecordRequest.setData(
            ByteBuffer.wrap(event.toJSON().getBytes(java.nio.charset.StandardCharsets.UTF_8)));

    PutRecordResult result = kinesisClient.putRecord(putRecordRequest);
    log.debug(String.format("Successfully putrecord, partition key : %s, ShardID: %s, Sequence Number: %s",
            putRecordRequest.getPartitionKey(), result.getShardId(), result.getSequenceNumber()));
    return event;
}
public com.amazonaws.services.kinesis.AmazonKinesis build(AmazonKinesis kinesisProperties) { return new AbstractAmazonKinesis() { public PutRecordResult putRecord(PutRecordRequest request) { // do nothing return new PutRecordResult(); } }; }
/**
 * Starts the asynchronous put of {@code message} and relays its outcome to the callback.
 *
 * <p>The returned future is stored in {@code future}.
 *
 * @param callback receives {@code onSuccess(null)} when the put succeeds, or
 *                 {@code onError} with the exception when it fails
 * @throws IllegalStateException if the future is already cancelled right after submission
 */
@Override
protected void doEnqueue(Callback<Void> callback) {
    AsyncHandler<PutRecordRequest, PutRecordResult> relay =
            new AsyncHandler<PutRecordRequest, PutRecordResult>() {
                @Override
                public void onSuccess(PutRecordRequest request, PutRecordResult result) {
                    callback.onSuccess(null);
                }

                @Override
                public void onError(Exception e) {
                    callback.onError(e);
                }
            };

    future = get().putRecordAsync(message, relay);
    if (future.isCancelled()) {
        throw new IllegalStateException("cancelled sending spans");
    }
}
/**
 * Puts one record to Kinesis for the exchange and copies the outcome to the response headers.
 *
 * <p>The sequence number and shard id of the result are stored under
 * {@code KinesisConstants.SEQUENCE_NUMBER} and {@code KinesisConstants.SHARD_ID}.
 *
 * @param exchange the exchange the request is built from and the response message belongs to
 * @throws Exception whatever building the request or the put call throws
 */
@Override
public void process(Exchange exchange) throws Exception {
    PutRecordRequest putRequest = createRequest(exchange);
    PutRecordResult outcome = getEndpoint().getClient().putRecord(putRequest);

    Message response = getMessageForResponse(exchange);
    response.setHeader(KinesisConstants.SEQUENCE_NUMBER, outcome.getSequenceNumber());
    response.setHeader(KinesisConstants.SHARD_ID, outcome.getShardId());
}
/**
 * Sends one record to the stream {@code Constants.STREAM_NAME} and waits for the result.
 *
 * <p>Delivery is best effort: a failed or interrupted send is logged and the method
 * returns normally. If the wait is interrupted, the thread's interrupt flag is restored
 * so callers can still see the interruption.
 *
 * @param payload      record data
 * @param partitionKey partition key for the record
 * @throws KinesisException if the async Kinesis client has not been initialized
 */
protected void sendMessageToKinesis(ByteBuffer payload, String partitionKey) throws KinesisException {
    if (kinesisAsyncClient == null) {
        throw new KinesisException("AmazonKinesisAsyncClient is not initialized");
    }

    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName(Constants.STREAM_NAME);
    putRecordRequest.setPartitionKey(partitionKey);
    LOGGER.info("Writing to streamName " + Constants.STREAM_NAME + " using partitionkey " + partitionKey);
    putRecordRequest.setData(payload);

    Future<PutRecordResult> futureResult = kinesisAsyncClient.putRecordAsync(putRecordRequest);
    try {
        PutRecordResult recordResult = futureResult.get();
        LOGGER.info("Sent message to Kinesis: " + recordResult.toString());
    } catch (InterruptedException iexc) {
        // Catching InterruptedException clears the flag; set it again for the caller.
        Thread.currentThread().interrupt();
        LOGGER.error("Interrupted while waiting for Kinesis to acknowledge the record", iexc);
    } catch (ExecutionException eexc) {
        LOGGER.error("Failed sending message to Kinesis", eexc);
    }
}
/**
 * Puts the record returned by {@code getEventRecord()} to the given stream and prints the result.
 *
 * <p>The record is sent as UTF-8 bytes under the fixed partition key {@code test:650}.
 * A client failure is reported on standard output, including its message, and is not rethrown.
 *
 * @param client Kinesis client used for the put
 * @param stream name of the target stream
 * @throws Exception if building the record fails
 */
public static void putEventRecord(AmazonKinesis client, String stream) throws Exception {
    String eventRecord = getEventRecord();

    PutRecordRequest put = new PutRecordRequest();
    put.setStreamName(stream);
    put.setPartitionKey("test:650");
    put.setData(ByteBuffer.wrap(eventRecord.getBytes("UTF-8")));

    try {
        PutRecordResult result = client.putRecord(put);
        System.out.println(result);
    } catch (AmazonClientException ex) {
        // Include the reason; a bare "failed" line gives nothing to diagnose with.
        System.out.println("PutRecord failed: " + ex.getMessage());
    }
}
@Override public PutRecordResult putRecord(PutRecordRequest putRecordRequest) throws AmazonServiceException, AmazonClientException { // Setup method to add a new record: InternalStream theStream = this.getStream(putRecordRequest.getStreamName()); if (theStream != null) { PutRecordResult result = theStream.putRecord(putRecordRequest.getData(), putRecordRequest.getPartitionKey()); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
/**
 * Runs one {@code process()} call on a sink backed by mocks and checks the transaction
 * sequence: one transaction obtained, one event taken, begin/commit/close each called
 * once, and no rollback.
 */
@Test
public void simpleProcessTest() throws EventDeliveryException {
    // Collaborators, all mocked.
    Channel channel = mock(Channel.class);
    Transaction transaction = mock(Transaction.class);
    AmazonKinesisClient kinesis = mock(AmazonKinesisClient.class);
    PutRecordResult cannedResult = mock(PutRecordResult.class);

    // The single event the channel will hand out.
    Event event = new SimpleEvent();
    byte[] body = {'b', 'o', 'd', 'y'};
    event.setBody(body);

    when(channel.getTransaction()).thenReturn(transaction);
    when(channel.take()).thenReturn(event);
    when(kinesis.putRecord(any(PutRecordRequest.class))).thenReturn(cannedResult);

    KinesisSink sink = new KinesisSink(kinesis);
    sink.setChannel(channel);

    Context settings = new Context();
    settings.put(KinesisSinkConfigurationConstant.ACCESS_KEY, "default");
    settings.put(KinesisSinkConfigurationConstant.ACCESS_SECRET_KEY, "default");
    settings.put(KinesisSinkConfigurationConstant.STREAM_NAME, "default");
    sink.configure(settings);

    sink.start();
    sink.process();

    verify(channel, times(1)).getTransaction();
    verify(channel, times(1)).take();
    verify(transaction, times(1)).begin();
    verify(transaction, times(1)).close();
    verify(transaction, times(1)).commit();
    verify(transaction, times(0)).rollback();
}
/**
 * Opens the JDBC connection configured through system properties, then puts 100 sample
 * location records to the Kinesis stream {@code test} and prints the elapsed time.
 *
 * <p>The query text and connection are read from the {@code kinesisapp.query},
 * {@code kinesisapp.jdbcurl}, {@code kinesisapp.dbuser} and {@code kinesisapp.dbpassword}
 * system properties and stored in the {@code query} and {@code conn} fields. A failed put
 * is reported on standard error and counted; it does not stop the run.
 *
 * @param args unused
 * @throws SQLException if the JDBC connection cannot be opened or configured
 */
public static void main(String[] args) throws SQLException {
    java.security.Security.setProperty("networkaddress.cache.ttl", "60");

    query = System.getProperty("kinesisapp.query");
    conn = DriverManager.getConnection(
            System.getProperty("kinesisapp.jdbcurl"),
            System.getProperty("kinesisapp.dbuser"),
            System.getProperty("kinesisapp.dbpassword"));
    conn.setAutoCommit(true);

    AmazonKinesisClient client = new AmazonKinesisClient();
    client.setEndpoint("https://kinesis.us-east-1.amazonaws.com");

    String stream = "test";
    int iteration = 100;
    String data = "{\"user\":\"10125\",\"line\":\"aaa\",\"station\":\"bbb\",\"latitude\":35.";
    Random rand = new Random();

    try {
        long start = System.currentTimeMillis();
        String myKey = Long.toString(Thread.currentThread().getId());
        int failures = 0;

        for (int i = 0; i < iteration; i++) {
            try {
                PutRecordRequest putRecordRequest = new PutRecordRequest();
                putRecordRequest.setStreamName(stream);
                putRecordRequest.setData(ByteBuffer.wrap((data + Integer.toString(rand.nextInt(19) + 52)
                        + ",\"longitude\":139." + Integer.toString(rand.nextInt(39) + 51) + "}").getBytes()));
                putRecordRequest.setPartitionKey(myKey);
                client.putRecord(putRecordRequest);
            } catch (Exception iex) {
                // Keep going, but report it: silently dropped records would make the
                // elapsed-time figure below misleading.
                failures++;
                System.err.println("putRecord failed on iteration " + i + ": " + iex);
            }
        }

        System.out.println("Elapsed time(ms) for task " + Thread.currentThread().getId() + " : "
                + (System.currentTimeMillis() - start));
        if (failures > 0) {
            System.err.println(failures + " of " + iteration + " put requests failed");
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
/**
 * Puts 100 sample location records to the Kinesis stream {@code test} and prints the
 * elapsed time.
 *
 * <p>Each record is a small JSON document with randomized latitude/longitude digits,
 * sent under the current thread id as partition key. A failed put is reported on
 * standard error and counted; it does not stop the run.
 *
 * @param args unused
 */
public static void main(String[] args) {
    java.security.Security.setProperty("networkaddress.cache.ttl", "60");

    AmazonKinesisClient client = new AmazonKinesisClient();
    client.setEndpoint("https://kinesis.us-east-1.amazonaws.com");

    String stream = "test";
    int iteration = 100;
    String data = "{\"user\":\"10125\",\"line\":\"aaa\",\"station\":\"bbb\",\"latitude\":35.";
    Random rand = new Random();

    try {
        long start = System.currentTimeMillis();
        String myKey = Long.toString(Thread.currentThread().getId());
        int failures = 0;

        for (int i = 0; i < iteration; i++) {
            try {
                PutRecordRequest putRecordRequest = new PutRecordRequest();
                putRecordRequest.setStreamName(stream);
                putRecordRequest.setData(ByteBuffer.wrap((data + Integer.toString(rand.nextInt(19) + 52)
                        + ",\"longitude\":139." + Integer.toString(rand.nextInt(39) + 51) + "}").getBytes()));
                putRecordRequest.setPartitionKey(myKey);
                client.putRecord(putRecordRequest);
            } catch (Exception iex) {
                // Keep going, but report it: silently dropped records would make the
                // elapsed-time figure below misleading.
                failures++;
                System.err.println("putRecord failed on iteration " + i + ": " + iex);
            }
        }

        System.out.println("Elapsed time(ms) for task " + Thread.currentThread().getId() + " : "
                + (System.currentTimeMillis() - start));
        if (failures > 0) {
            System.err.println(failures + " of " + iteration + " put requests failed");
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
/**
 * Callback for a log record that was sent to Kinesis successfully.
 *
 * <p>Increments the success counter. When debug logging is enabled and the combined
 * count of successful and failed requests is a multiple of 3000, a progress summary is
 * logged with the counts and the time elapsed since {@code startTime}.
 *
 * @param request the request that succeeded (not inspected)
 * @param result  the result returned for it (not inspected)
 */
@Override
public void onSuccess(PutRecordRequest request, PutRecordResult result) {
    successfulRequestCount++;

    if (!logger.isDebugEnabled()) {
        return;
    }
    if ((successfulRequestCount + failedRequestCount) % 3000 != 0) {
        return;
    }

    StringBuilder summary = new StringBuilder();
    summary.append("Appender (").append(appenderName).append(") made ");
    summary.append(successfulRequestCount);
    summary.append(" successful put requests out of total ");
    summary.append(successfulRequestCount + failedRequestCount);
    summary.append(" in ");
    summary.append(PeriodFormat.getDefault().print(new Period(startTime, DateTime.now())));
    summary.append(" since start");
    logger.debug(summary.toString());
}
/**
 * Sends one encoded logging event to the Kinesis stream and waits on a latch.
 *
 * <p>The latch starts at 0 when {@code isAsyncParent()} is true and at 1 otherwise. It is
 * handed to the {@code LoggingEventHandler} together with the error message, and then
 * awaited for up to {@code getMaxFlushTime()}.
 *
 * @param event   logging event; used for the partition key and in the error message
 * @param encoded encoded bytes of the event, sent as the record data
 * @throws Exception whatever the partition key lookup, the put or the latch wait throws
 */
@Override
protected void handle(E event, byte[] encoded) throws Exception {
    ByteBuffer payload = ByteBuffer.wrap(encoded);

    PutRecordRequest putRequest = new PutRecordRequest();
    putRequest.setPartitionKey(getPartitionKey(event));
    putRequest.setStreamName(stream);
    putRequest.setData(payload);

    String failureText = format("Appender '%s' failed to send logging event '%s' to Kinesis stream '%s'",
            getName(), event, stream);

    final CountDownLatch done;
    if (isAsyncParent()) {
        done = new CountDownLatch(0);
    } else {
        done = new CountDownLatch(1);
    }

    LoggingEventHandler<PutRecordRequest, PutRecordResult> handler =
            new LoggingEventHandler<PutRecordRequest, PutRecordResult>(this, done, failureText);
    kinesis.putRecordAsync(putRequest, handler);

    AppenderExecutors.awaitLatch(this, done, getMaxFlushTime());
}
/** * publish a single message */ @Override public void submit(final Event event) { try { Preconditions.checkNotNull(event, "event cannot be null"); Preconditions.checkNotNull(event.getEventData(), "event data cannot be null"); Preconditions.checkArgument(!Strings.isNullOrEmpty(event.getTopic()), "Topic is required"); Preconditions.checkNotNull(event.getPartitionKey(), "Partition key cannot be null"); Preconditions.checkArgument(!Strings.isNullOrEmpty(event.getPartitionKey().toString()), "PartitionKey is required"); /** serialize the event **/ byte[] bytes = this.mapper.writeValueAsBytes(event); final PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(event.getTopic()); putRecordRequest.setPartitionKey(event.getPartitionKey().toString()); putRecordRequest.setData(ByteBuffer.wrap(bytes)); //putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); final PutRecordResult putRecordResult = client.putRecord(putRecordRequest); LOGGER.trace("published message to stream: {} partitionKey: {}, sequenceNumberForOrdering: {}, returnedSequenceNumber:{}", putRecordRequest.getStreamName(), putRecordRequest.getPartitionKey(), putRecordRequest.getSequenceNumberForOrdering(), putRecordResult.getSequenceNumber()); } catch (JsonProcessingException e) { LOGGER.error(e.getMessage(), e); } }
/**
 * Checks that a failed async put is delivered to the producer's error channel.
 *
 * <p>The Kinesis mock records the data of the request it receives and immediately fails
 * the handler with a fixed exception. The test then expects an {@code ErrorMessage} on
 * the {@code ec.0.errors} channel whose payload is an {@code AwsRequestFailureException}
 * with that exception as its cause and a request holding the same data instance.
 */
@Test
@SuppressWarnings("unchecked")
public void testProducerErrorChannel() throws Exception {
    KinesisTestBinder binder = getBinder();

    final RuntimeException simulatedFailure = new RuntimeException("putRecordRequestEx");
    final AtomicReference<Object> capturedData = new AtomicReference<>();

    // Remember what was sent, then fail the async handler straight away.
    Answer<Future<PutRecordResult>> failingAnswer = invocation -> {
        PutRecordRequest request = invocation.getArgument(0);
        capturedData.set(request.getData());
        AsyncHandler<?, ?> handler = invocation.getArgument(1);
        handler.onError(simulatedFailure);
        return mock(Future.class);
    };

    AmazonKinesisAsync kinesisMock = mock(AmazonKinesisAsync.class);
    BDDMockito.given(kinesisMock.putRecordAsync(any(PutRecordRequest.class), any(AsyncHandler.class)))
            .willAnswer(failingAnswer);

    DirectFieldAccessor binderFields = new DirectFieldAccessor(binder.getBinder());
    binderFields.setPropertyValue("amazonKinesis", kinesisMock);

    ExtendedProducerProperties<KinesisProducerProperties> producerProps = createProducerProperties();
    producerProps.setErrorChannelEnabled(true);
    DirectChannel outputChannel =
            createBindableChannel("output", createProducerBindingProperties(producerProps));
    Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0", outputChannel, producerProps);

    ApplicationContext context =
            TestUtils.getPropertyValue(binder.getBinder(), "applicationContext", ApplicationContext.class);
    SubscribableChannel errorChannel = context.getBean("ec.0.errors", SubscribableChannel.class);

    final AtomicReference<Message<?>> received = new AtomicReference<>();
    final CountDownLatch errorArrived = new CountDownLatch(1);
    errorChannel.subscribe(message -> {
        received.set(message);
        errorArrived.countDown();
    });

    String messagePayload = "oops";
    outputChannel.send(new GenericMessage<>(messagePayload.getBytes()));

    assertThat(errorArrived.await(10, TimeUnit.SECONDS)).isTrue();
    assertThat(received.get()).isInstanceOf(ErrorMessage.class);
    assertThat(received.get().getPayload()).isInstanceOf(AwsRequestFailureException.class);

    AwsRequestFailureException failure = (AwsRequestFailureException) received.get().getPayload();
    assertThat(failure.getCause()).isSameAs(simulatedFailure);
    assertThat(((PutRecordRequest) failure.getRequest()).getData()).isSameAs(capturedData.get());

    producerBinding.unbind();
}
/**
 * Cancels the pending put, if one has been started.
 *
 * <p>The {@code future} field is read once into a local; when it is non-null the future
 * is cancelled with interruption allowed.
 */
@Override
protected void doCancel() {
    Future<PutRecordResult> pending = future;
    if (pending == null) {
        return;
    }
    pending.cancel(true);
}
/**
 * Reports whether the pending put has been cancelled.
 *
 * @return {@code false} when no future has been stored yet, otherwise the future's
 *         cancelled state
 */
@Override
protected boolean doIsCanceled() {
    Future<PutRecordResult> pending = future;
    if (pending == null) {
        return false;
    }
    return pending.isCancelled();
}
/**
 * Not supported by this implementation.
 *
 * @param putRecordRequest ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always; it is a {@code RuntimeException}, so
 *         callers that catch {@code RuntimeException} are unaffected
 */
@Override
public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
    throw new UnsupportedOperationException("Not implemented");
}
/**
 * Not supported by this implementation.
 *
 * @param streamName   ignored
 * @param data         ignored
 * @param partitionKey ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always; it is a {@code RuntimeException}, so
 *         callers that catch {@code RuntimeException} are unaffected
 */
@Override
public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
    throw new UnsupportedOperationException("Not implemented");
}
/**
 * Not supported by this implementation.
 *
 * @param streamName                ignored
 * @param data                      ignored
 * @param partitionKey              ignored
 * @param sequenceNumberForOrdering ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always; it is a {@code RuntimeException}, so
 *         callers that catch {@code RuntimeException} are unaffected
 */
@Override
public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey,
        String sequenceNumberForOrdering) {
    throw new UnsupportedOperationException("Not implemented");
}
/**
 * Unsupported overload: always rejects the call.
 *
 * @param s          ignored
 * @param byteBuffer ignored
 * @param s1         ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public PutRecordResult putRecord(String s, ByteBuffer byteBuffer, String s1)
        throws AmazonServiceException, AmazonClientException {
    final String reason = "MockKinesisClient doesn't support this.";
    throw new UnsupportedOperationException(reason);
}
/**
 * Unsupported overload: always rejects the call.
 *
 * @param s          ignored
 * @param byteBuffer ignored
 * @param s1         ignored
 * @param s2         ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public PutRecordResult putRecord(String s, ByteBuffer byteBuffer, String s1, String s2)
        throws AmazonServiceException, AmazonClientException {
    final String reason = "MockKinesisClient doesn't support this.";
    throw new UnsupportedOperationException(reason);
}
/**
 * Callback for a completed put: logs the returned sequence number at debug level and
 * then increments {@code count}.
 *
 * @param rqst   the request that completed (not inspected)
 * @param result the result whose sequence number is logged
 */
@Override
public void onSuccess(PutRecordRequest rqst, PutRecordResult result) {
    String sequenceNumber = result.getSequenceNumber();
    _logger.debug("PutRecord completed successfully. Sequence number = {}", sequenceNumber);
    count.incrementAndGet();
}
/**
 * Callback for a log record that was sent to Kinesis successfully.
 *
 * <p>Only increments the success counter; neither argument is inspected.
 *
 * @param request the request that succeeded
 * @param result  the result returned for it
 */
@Override
public void onSuccess(PutRecordRequest request, PutRecordResult result) {
    successfulRequestCount += 1;
}