/** * Method to perform PutRecordBatch operation with the given record list. * * @param recordList * the collection of records * @return the output of PutRecordBatch */ private PutRecordBatchResult putRecordBatch(List<Record> recordList) { PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest(); putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName); putRecordBatchRequest.setRecords(recordList); // Put Record Batch records. Max No.Of Records we can put in a // single put record batch request is 500 and total size < 4MB PutRecordBatchResult putRecordBatchResult = null; try { putRecordBatchResult = firehoseClient.putRecordBatch(putRecordBatchRequest); }catch(AmazonKinesisFirehoseException akfe){ System.out.println("Amazon Kinesis Firehose Exception:" + akfe.getLocalizedMessage()); }catch(Exception e){ System.out.println("Connector Exception" + e.getLocalizedMessage()); } return putRecordBatchResult; }
/**
 * Sends the contents of the given transport buffer to the delivery stream
 * as one PutRecordBatch request.
 *
 * @param buffer
 *            the buffer to send; it is cast to
 *            {@code FirehoseTransportBuffer}, so any other implementation
 *            fails with a {@code ClassCastException}
 */
public void sendBatch(TransportBuffer buffer) {
    final FirehoseTransportBuffer firehoseBuffer = (FirehoseTransportBuffer) buffer;

    // Build one batch request from everything currently held in the buffer.
    final PutRecordBatchRequest request = new PutRecordBatchRequest()
            .withDeliveryStreamName(deliveryStreamName)
            .withRecords(firehoseBuffer.getInternalBuffer());

    // Put the records.
    // NOTE(review): the returned result is discarded, so per-record
    // failures reported in it are not detected here.
    client.putRecordBatch(request);
}
/**
 * Sends the accumulated Firehose records in one PutRecordBatch request,
 * reports every entry the service marked as failed, then empties both lists.
 *
 * <p>The two lists are read in parallel: the response entry at position
 * {@code i} is reported against {@code sdcRecords.get(i)}.
 *
 * @param records
 *            the Firehose records to send; cleared before returning
 * @param sdcRecords
 *            the originating records, used for error reporting; cleared
 *            before returning
 * @throws StageException
 *             declared by this method; presumably raised by the error
 *             record handler (verify against its declaration)
 */
private void flush(List<com.amazonaws.services.kinesisfirehose.model.Record> records, List<Record> sdcRecords)
    throws StageException {
  // Nothing buffered, nothing to send.
  if (records.isEmpty()) {
    return;
  }

  PutRecordBatchRequest request = new PutRecordBatchRequest();
  request = request.withDeliveryStreamName(conf.streamName);
  request = request.withRecords(records);

  PutRecordBatchResult outcome = firehoseClient.putRecordBatch(request);

  int failedCount = outcome.getFailedPutCount();
  if (failedCount != 0) {
    // Walk the per-record responses; an entry with an error code failed.
    int position = 0;
    for (PutRecordBatchResponseEntry entry : outcome.getRequestResponses()) {
      if (entry.getErrorCode() != null) {
        Record failedRecord = sdcRecords.get(position);
        String reason = entry.getErrorMessage();
        LOG.error(Errors.KINESIS_05.getMessage(), failedRecord, reason);
        errorRecordHandler.onError(
            new OnRecordErrorException(failedRecord, Errors.KINESIS_05, failedRecord, reason)
        );
      }
      position++;
    }
  }

  // The counter advances by the full batch size, including failed entries.
  recordCounter += records.size();
  records.clear();
  sdcRecords.clear();
}