/**
 * Performs a PutRecordBatch operation with the given record list.
 *
 * @param recordList
 *            the collection of records
 * @return the output of PutRecordBatch, or null if the call failed
 */
private PutRecordBatchResult putRecordBatch(List<Record> recordList) {
    PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
    putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
    putRecordBatchRequest.setRecords(recordList);

    // A single PutRecordBatch request accepts at most 500 records
    // with a total payload size of less than 4 MB.
    PutRecordBatchResult putRecordBatchResult = null;
    try {
        putRecordBatchResult = firehoseClient.putRecordBatch(putRecordBatchRequest);
    } catch (AmazonKinesisFirehoseException akfe) {
        System.out.println("Amazon Kinesis Firehose Exception: " + akfe.getLocalizedMessage());
    } catch (Exception e) {
        System.out.println("Connector Exception: " + e.getLocalizedMessage());
    }
    return putRecordBatchResult;
}
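/*
 * PutRecordBatch can return success while individual records fail, so the
 * result above is worth inspecting. The helper below is a minimal sketch of
 * that pattern, assuming the same fields and putRecordBatch(...) as above;
 * retryFailedRecords is a hypothetical name, not part of the source.
 */
private void retryFailedRecords(List<Record> recordList, PutRecordBatchResult result) {
    if (result == null || result.getFailedPutCount() == 0) {
        return;
    }
    List<Record> retries = new ArrayList<Record>();
    List<PutRecordBatchResponseEntry> responses = result.getRequestResponses();
    // Response entries correspond positionally to the submitted records;
    // an entry with a non-null error code marks a failed record.
    for (int i = 0; i < responses.size(); i++) {
        if (responses.get(i).getErrorCode() != null) {
            retries.add(recordList.get(i));
        }
    }
    if (!retries.isEmpty()) {
        putRecordBatch(retries);
    }
}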
/**
 * Batches the given sink records and sends them to Firehose, flushing
 * whenever the batch reaches the configured record count or exceeds the
 * configured byte size.
 *
 * @param sinkRecords
 *            the Kafka records to deliver
 */
private void putRecordsInBatch(Collection<SinkRecord> sinkRecords) {
    List<Record> recordList = new ArrayList<Record>();
    int recordsInBatch = 0;
    int recordsSizeInBytes = 0;

    for (SinkRecord sinkRecord : sinkRecords) {
        Record record = DataUtility.createRecord(sinkRecord);
        recordList.add(record);
        recordsInBatch++;
        recordsSizeInBytes += record.getData().capacity();

        // Flush once the batch is full by count, or once the running byte
        // total passes the limit. Note the size check runs after adding,
        // so a flushed batch can exceed batchSizeInBytes by one record.
        if (recordsInBatch == batchSize || recordsSizeInBytes > batchSizeInBytes) {
            putRecordBatch(recordList);
            recordList.clear();
            recordsInBatch = 0;
            recordsSizeInBytes = 0;
        }
    }

    // Send any remaining records in a final, partial batch.
    if (recordsInBatch > 0) {
        putRecordBatch(recordList);
    }
}
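/*
 * A sketch of the limits the batching above assumes. The field names match
 * the method's usage, but these declarations and values are assumptions
 * based on the documented Firehose limits (500 records and 4 MB per
 * PutRecordBatch request), not the source's actual configuration, which is
 * likely user-configurable.
 */
private static final int FIREHOSE_MAX_BATCH_RECORDS = 500;            // records per request
private static final int FIREHOSE_MAX_BATCH_BYTES = 4 * 1024 * 1024;  // 4 MB per request

private final int batchSize = FIREHOSE_MAX_BATCH_RECORDS;
private final int batchSizeInBytes = FIREHOSE_MAX_BATCH_BYTES;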
@Override
public boolean add(InternalEvent ievent) throws IllegalStateException, IOException {
    // Refuse new records once the buffer already holds the maximum allowed.
    if (dataRecords.size() >= MAX_RECORDS) {
        logger.trace("hit record index max");
        throw new IllegalStateException("reached max payload size");
    }

    byte[] record = this.serializer.serialize(ievent);

    /*
     * Restrict size of individual record
     */
    if (record.length > MAX_RECORD_SIZE) {
        throw new IOException("serialized event is " + record.length
            + " bytes, larger than max of " + MAX_RECORD_SIZE);
    }

    ByteBuffer data = ByteBuffer.wrap(record);
    dataRecords.add(new Record().withData(data));
    return true;
}
@Override
public void close() {
    // Flush whatever remains in the byte buffer as a final Record,
    // provided there is still room for one.
    if (this.cos.getByteCount() != 0 && this.dataRecords.size() < MAX_RECORDS) {
        logger.trace("flushing remainder of buffer");
        ByteBuffer data = ByteBuffer.wrap(baos.toByteArray());
        this.dataRecords.add(new Record().withData(data));
    }

    try {
        this.baos.close();
    } catch (IOException e) {
        // Closing a ByteArrayOutputStream has no effect and cannot fail; ignore.
    }
}
@Override
public ArrayList<Record> getInternalBuffer() {
    return this.dataRecords;
}
@Override
public boolean add(InternalEvent ievent) throws IllegalStateException, IOException {
    byte[] record = serializer.serialize(ievent);

    /*
     * Restrict size of individual record
     */
    if (record.length > MAX_RECORD_SIZE) {
        throw new IOException("serialized event is " + record.length
            + " bytes, larger than max of " + MAX_RECORD_SIZE);
    }

    /*
     * Write record if there's room in buffer
     */
    if (dataRecords.size() >= MAX_RECORDS) {
        logger.trace("hit record index max");
        throw new IllegalStateException("reached max payload size");
    } else {
        if (cos.getByteCount() + record.length < MAX_RECORD_SIZE) {
            cos.write(record);
            return true;
        }

        /*
         * If the current Record buffer is full, flush it to a Firehose Record
         * and start a new buffer.
         */
        logger.trace("creating new datarecord");
        ByteBuffer data = ByteBuffer.wrap(baos.toByteArray());
        this.dataRecords.add(new Record().withData(data));
        baos.reset();
        cos.resetByteCount();
        cos.resetCount();

        /*
         * If we hit the max number of Firehose Records (4), notify the IPC
         * service that this buffer needs to be sent.
         */
        if (dataRecords.size() >= MAX_RECORDS) {
            logger.trace("hit record index max");
            throw new IllegalStateException("reached max payload size");
        }

        /*
         * Otherwise write the record to the now-empty internal buffer.
         */
        cos.write(record);
    }
    return true;
}
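/*
 * A sketch of the fields the two add() variants and close() above rely on.
 * The names match their usage, and the values follow the "(4)" in the comment
 * above and the per-Record Firehose limit, but the declarations themselves
 * are assumptions, not copied from the source. CountingOutputStream is
 * org.apache.commons.io.output.CountingOutputStream, which provides the
 * getByteCount(), resetByteCount(), and resetCount() methods used above.
 */
private static final int MAX_RECORDS = 4;                 // Firehose Records per buffer
private static final int MAX_RECORD_SIZE = 1000 * 1024;   // max bytes per Firehose Record

private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
private final CountingOutputStream cos = new CountingOutputStream(baos);
private final ArrayList<Record> dataRecords = new ArrayList<Record>();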
@Override
protected void putMessage(String message) throws Exception {
    ByteBuffer data = ByteBuffer.wrap(message.getBytes(getEncoding()));
    getClient().putRecordAsync(
        new PutRecordRequest().withDeliveryStreamName(getStreamName())
            .withRecord(new Record().withData(data)),
        asyncCallHandler);
}
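/*
 * A minimal sketch of the asyncCallHandler used above, assuming the AWS SDK
 * for Java v1 callback interface com.amazonaws.handlers.AsyncHandler; the
 * source's actual handler may log, meter, or retry differently.
 */
private final AsyncHandler<PutRecordRequest, PutRecordResult> asyncCallHandler =
        new AsyncHandler<PutRecordRequest, PutRecordResult>() {
            @Override
            public void onError(Exception exception) {
                // Invoked when the asynchronous PutRecord call fails.
                System.err.println("PutRecord failed: " + exception.getMessage());
            }

            @Override
            public void onSuccess(PutRecordRequest request, PutRecordResult result) {
                // result.getRecordId() identifies the accepted record.
            }
        };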
/**
 * Converts a Kafka record into a Kinesis record.
 *
 * @param sinkRecord
 *            Kafka unit of message
 * @return Kinesis unit of message
 */
public static Record createRecord(SinkRecord sinkRecord) {
    return new Record().withData(parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
}
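/*
 * parseValue is not shown in this section. The sketch below is a simplified
 * stand-in covering only byte[] and String-like payloads; the real
 * DataUtility.parseValue handles the full range of Connect schema types.
 * Schema is org.apache.kafka.connect.data.Schema and StandardCharsets is
 * java.nio.charset.StandardCharsets.
 */
public static ByteBuffer parseValue(Schema schema, Object value) {
    if (value instanceof byte[]) {
        return ByteBuffer.wrap((byte[]) value);  // pass raw bytes through
    }
    // Fall back to the UTF-8 encoding of the value's string form.
    return ByteBuffer.wrap(String.valueOf(value).getBytes(StandardCharsets.UTF_8));
}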
/**
 * Returns the internal buffer of Firehose Records accumulated so far.
 */
public abstract ArrayList<Record> getInternalBuffer();