@Override public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) throws AmazonServiceException, AmazonClientException { // Setup method to add a batch of new records: InternalStream theStream = this.getStream(putRecordsRequest.getStreamName()); if (theStream != null) { PutRecordsResult result = new PutRecordsResult(); ArrayList<PutRecordsResultEntry> resultList = new ArrayList<PutRecordsResultEntry>(); for (PutRecordsRequestEntry entry : putRecordsRequest.getRecords()) { PutRecordResult putResult = theStream.putRecord(entry.getData(), entry.getPartitionKey()); resultList.add((new PutRecordsResultEntry()).withShardId(putResult.getShardId()).withSequenceNumber(putResult.getSequenceNumber())); } result.setRecords(resultList); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
@Test(enabled = false) public void testKinesisPutRecords() throws Exception { List<LogEvent> logEventList = Collections.singletonList( Log4jLogEvent.createEvent("foobar", null, "foo.bar", Level.ERROR, null, null, null, null, null, "tname", null, 12345)); Collection<PutRecordsRequestEntry> records = new ArrayList<>(); for (LogEvent logEvent : logEventList) { String toJson = objectMapper.writeValueAsString(logEvent); PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry() .withData(ByteBuffer.wrap(toJson.getBytes())) .withPartitionKey("testKinesisPutRecords"); records.add(putRecordsRequestEntry); } PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setRecords(records); putRecordsRequest.setStreamName(awsStreamName); PutRecordsResult putRecordsResult = client.putRecords(putRecordsRequest); System.out.println(putRecordsResult.toString()); }
/** * Based on non successful records, returns a RequestContext with a correct PutRecordsRequest * * @param result is the last result returned from Kinesis * @return RequestContext IF there are more attempts, Optional.empty() otherwise */ public Observable<Optional<RequestContext>> nextAttempt(PutRecordsResult result) { this.attempt.incrementAndGet(); if (!hasNextAttempt()) { return Observable.just(Optional.empty()); } this.putRecordsRequest = failedRecords(result); return Observables.just(Optional.of(this)).withDelay(timer); }
/** * Based on the request and the result, returns a new request * containing the records that failed. * @param result is the last PutRecordsResult * @return a new PutRecordsRequest with failing records */ private PutRecordsRequest failedRecords(PutRecordsResult result) { List<PutRecordsRequestEntry> newRecords = new ArrayList<>(); List<PutRecordsResultEntry> records = result.getRecords(); for (int i = 0; i < records.size(); i++) { if (records.get(i).getErrorCode() != null) { newRecords.add(putRecordsRequest.getRecords().get(i)); } } return new PutRecordsRequest() .withRecords(newRecords) .withStreamName(putRecordsRequest.getStreamName()); }
@Override public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) { throw new RuntimeException("Not implemented"); }