/**
 * Applies the supplied configuration to this transport.
 *
 * <p>The configuration is cast to {@link FirehoseTransportConfig} and stored,
 * a serializer is created from its append-newline setting, and a new Firehose
 * client is built with the gzip option switched on. When the configuration
 * carries a region, that region is applied to the freshly built client.
 *
 * @param config the configuration to apply; must be a {@code FirehoseTransportConfig}
 * @throws ClassCastException if {@code config} is not a {@code FirehoseTransportConfig}
 */
@Override
public void setConf(AbstractConfig config) {
    FirehoseTransportConfig firehoseConfig = (FirehoseTransportConfig) config;
    this.config = firehoseConfig;

    this.serializer = new FirehoseTransportSerializer(firehoseConfig.getAppendNewline());

    ClientConfiguration clientConfiguration = new ClientConfiguration().withGzip(true);
    this.client = new AmazonKinesisFirehoseClient(clientConfiguration);

    // Region is optional: leave the client as constructed when none is configured.
    if (firehoseConfig.getRegion() == null) {
        return;
    }
    this.client.withRegion(firehoseConfig.getRegion());
}
@Override public void start(Map<String, String> props) { batch = Boolean.parseBoolean(props.get(FirehoseSinkConnector.BATCH)); batchSize = Integer.parseInt(props.get(FirehoseSinkConnector.BATCH_SIZE)); batchSizeInBytes = Integer.parseInt(props.get(FirehoseSinkConnector.BATCH_SIZE_IN_BYTES)); deliveryStreamName = props.get(FirehoseSinkConnector.DELIVERY_STREAM); firehoseClient = new AmazonKinesisFirehoseClient(new DefaultAWSCredentialsProviderChain()); firehoseClient.setRegion(RegionUtils.getRegion(props.get(FirehoseSinkConnector.REGION))); // Validate delivery stream validateDeliveryStream(); }
/**
 * Creates a transport that uses an already constructed Firehose client.
 *
 * @param client the Firehose client this transport will use
 * @param deliveryStreamName the name of the delivery stream to associate with this transport
 */
protected FirehoseTransport(AmazonKinesisFirehoseClient client, String deliveryStreamName) {
    this.deliveryStreamName = deliveryStreamName;
    this.client = client;
}
/**
 * Creates the Firehose client, directs it at {@code firehoseEndpointURL},
 * and then runs {@code checkHoseStatus()}.
 */
private void setup() {
    // The field is assigned first so it is populated even if a later step throws.
    firehoseClient = new AmazonKinesisFirehoseClient();
    firehoseClient.setEndpoint(firehoseEndpointURL);

    checkHoseStatus();
}