@Override public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) throws AmazonServiceException, AmazonClientException { // Setup method to add a batch of new records: InternalStream theStream = this.getStream(putRecordsRequest.getStreamName()); if (theStream != null) { PutRecordsResult result = new PutRecordsResult(); ArrayList<PutRecordsResultEntry> resultList = new ArrayList<PutRecordsResultEntry>(); for (PutRecordsRequestEntry entry : putRecordsRequest.getRecords()) { PutRecordResult putResult = theStream.putRecord(entry.getData(), entry.getPartitionKey()); resultList.add((new PutRecordsResultEntry()).withShardId(putResult.getShardId()).withSequenceNumber(putResult.getSequenceNumber())); } result.setRecords(resultList); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
/**
 * Builds a follow-up request containing only the entries that failed in the last attempt.
 *
 * <p>Result entries are matched to request entries by position: the request entry at
 * index {@code i} is carried over whenever the result entry at index {@code i} has a
 * non-null error code. The new request reuses the stream name of the original request.
 *
 * <p>NOTE(review): {@code putRecordsRequest} is not declared in this method; it is
 * presumably a field of the enclosing class holding the request that produced
 * {@code result} — confirm against the class definition.
 *
 * @param result the last {@code PutRecordsResult}
 * @return a new {@code PutRecordsRequest} with the failing records (empty if none failed)
 */
private PutRecordsRequest failedRecords(PutRecordsResult result) {
    List<PutRecordsResultEntry> outcomes = result.getRecords();
    List<PutRecordsRequestEntry> retryEntries = new ArrayList<>();

    for (int index = 0; index < outcomes.size(); index++) {
        boolean succeeded = outcomes.get(index).getErrorCode() == null;
        if (succeeded) {
            continue;
        }
        // Same position in the original request is the entry that failed.
        retryEntries.add(putRecordsRequest.getRecords().get(index));
    }

    PutRecordsRequest retryRequest = new PutRecordsRequest().withRecords(retryEntries);
    return retryRequest.withStreamName(putRecordsRequest.getStreamName());
}