/** * Asynchronously puts records to kinesis. * * @param events - List of events to send * @return - Observable with same list as parameter */ public Observable<List<T>> putRecords(List<T> events) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("putRecords() with {} events", events.size()); } RequestContext request = new RequestContext(events, new PutRecordsRequest() .withRecords(events.stream() .map(this::toRecordEntries) .collect(toList())) .withStreamName(stream)); putRecordsAsync(request); return request.subject; }
private void generateRecords() { // Create dummy message int recordNo = 1; while (recordNo <= sendCount) { String dataStr = "Record_" + recordNo; PutRecordsRequestEntry putRecordsEntry = new PutRecordsRequestEntry(); putRecordsEntry.setData(ByteBuffer.wrap(dataStr.getBytes())); putRecordsEntry.setPartitionKey(dataStr); putRecordsRequestEntryList.add(putRecordsEntry); if ( (putRecordsRequestEntryList.size() == batchSize) || (recordNo == sendCount )) { PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); putRecordsRequest.setRecords(putRecordsRequestEntryList); client.putRecords(putRecordsRequest); putRecordsRequestEntryList.clear(); } recordNo++; } }
private void createDummyMessages(String streamName, int count) throws Exception { PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < count; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes())); putRecordsRequestEntry.setPartitionKey(Long.toString(i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); mockClient.putRecords(putRecordsRequest); }
private void createJsonMessages(String streamName, int count, int idStart) throws Exception { String jsonFormat = "{\"id\" : %d, \"name\" : \"%s\"}"; PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < count; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); long id = idStart + i; String name = UUID.randomUUID().toString(); String jsonVal = String.format(jsonFormat, id, name); // ? with StandardCharsets.UTF_8 putRecordsRequestEntry.setData(ByteBuffer.wrap(jsonVal.getBytes())); putRecordsRequestEntry.setPartitionKey(Long.toString(id)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); mockClient.putRecords(putRecordsRequest); }
@Override public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) throws AmazonServiceException, AmazonClientException { // Setup method to add a batch of new records: InternalStream theStream = this.getStream(putRecordsRequest.getStreamName()); if (theStream != null) { PutRecordsResult result = new PutRecordsResult(); ArrayList<PutRecordsResultEntry> resultList = new ArrayList<PutRecordsResultEntry>(); for (PutRecordsRequestEntry entry : putRecordsRequest.getRecords()) { PutRecordResult putResult = theStream.putRecord(entry.getData(), entry.getPartitionKey()); resultList.add((new PutRecordsResultEntry()).withShardId(putResult.getShardId()).withSequenceNumber(putResult.getSequenceNumber())); } result.setRecords(resultList); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
private void createMessages(String streamName, int count) throws Exception { PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < count; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes())); putRecordsRequestEntry.setPartitionKey(Long.toString(i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); embeddedKinesisStream.getKinesisClient().putRecords(putRecordsRequest); }
@Test(enabled = false) public void testKinesisPutRecords() throws Exception { List<LogEvent> logEventList = Collections.singletonList( Log4jLogEvent.createEvent("foobar", null, "foo.bar", Level.ERROR, null, null, null, null, null, "tname", null, 12345)); Collection<PutRecordsRequestEntry> records = new ArrayList<>(); for (LogEvent logEvent : logEventList) { String toJson = objectMapper.writeValueAsString(logEvent); PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry() .withData(ByteBuffer.wrap(toJson.getBytes())) .withPartitionKey("testKinesisPutRecords"); records.add(putRecordsRequestEntry); } PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setRecords(records); putRecordsRequest.setStreamName(awsStreamName); PutRecordsResult putRecordsResult = client.putRecords(putRecordsRequest); System.out.println(putRecordsResult.toString()); }
/** * Based on the request and the result, returns a new request * containing the records that failed. * @param result is the last PutRecordsResult * @return a new PutRecordsRequest with failing records */ private PutRecordsRequest failedRecords(PutRecordsResult result) { List<PutRecordsRequestEntry> newRecords = new ArrayList<>(); List<PutRecordsResultEntry> records = result.getRecords(); for (int i = 0; i < records.size(); i++) { if (records.get(i).getErrorCode() != null) { newRecords.add(putRecordsRequest.getRecords().get(i)); } } return new PutRecordsRequest() .withRecords(newRecords) .withStreamName(putRecordsRequest.getStreamName()); }
private void flushRecords() { try { PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); putRecordsRequest.setRecords(putRecordsRequestEntryList); client.putRecords(putRecordsRequest); putRecordsRequestEntryList.clear(); logger.debug( "Records flushed." ); } catch (AmazonClientException e) { logger.warn( "PutRecordsRequest exception.", e ); throw new RuntimeException(e); } }
public Optional<PutRecordsRequest> put(PutRecordsRequestEntry entry) { int newRequestSize = requestSize + entry.getData().remaining() + entry.getPartitionKey().length(); if (entries.size() < maxCount && newRequestSize <= maxSize) { requestSize = newRequestSize; entries.add(entry); return Optional.empty(); } else { Optional<PutRecordsRequest> ret = flush(); put(entry); return ret; } }
public Optional<PutRecordsRequest> flush() { if (entries.size() > 0) { PutRecordsRequest r = new PutRecordsRequest(); r.setRecords(entries); entries = new ArrayList<>(); requestSize = 0; return Optional.of(r); } else { return Optional.empty(); } }
@Override public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) { throw new RuntimeException("Not implemented"); }
public RequestContext(List<E> events, PutRecordsRequest putRecordsRequest) { this.events = events; this.putRecordsRequest = putRecordsRequest; }
protected void flush() { kinesis.putRecords(new PutRecordsRequest() .withStreamName(STREAM_NAME) .withRecords(entries)); entries.clear(); }