public KinesisTestBinder(AmazonKinesisAsync amazonKinesis, KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) { this.amazonKinesis = amazonKinesis; KinesisStreamProvisioner provisioningProvider = new KinesisStreamProvisioner(amazonKinesis, kinesisBinderConfigurationProperties); KinesisMessageChannelBinder binder = new KinesisMessageChannelBinder(amazonKinesis, kinesisBinderConfigurationProperties, provisioningProvider); GenericApplicationContext context = new GenericApplicationContext(); context.refresh(); binder.setApplicationContext(context); setBinder(binder); }
@Test @Ignore("Kinesalite doesn't support updateShardCount. Test only against real AWS Kinesis") public void testPartitionCountIncreasedIfAutoAddPartitionsSet() { KinesisBinderConfigurationProperties configurationProperties = new KinesisBinderConfigurationProperties(); String stream = "existing" + System.currentTimeMillis(); AmazonKinesisAsync amazonKinesis = localKinesisResource.getResource(); amazonKinesis.createStream(stream, 1); List<Shard> shards = describeStream(stream); assertThat(shards.size()).isEqualTo(1); configurationProperties.setMinShardCount(6); configurationProperties.setAutoAddShards(true); KinesisTestBinder binder = getBinder(configurationProperties); ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties(); Binding<?> binding = binder.bindConsumer(stream, "test", new NullChannel(), consumerProperties); binding.unbind(); shards = describeStream(stream); assertThat(shards.size()).isEqualTo(6); }
private AmazonKinesisAsync getAsyncClient(MapWrap configuration) { Optional<ClientConfiguration> kinesisConfig = configuration.exists("kinesis_config") ? Optional.of(configuration.getObject("kinesis_config")) : Optional.empty(); if (kinesisConfig.isPresent()) { return createClient(kinesisConfig.get(), configuration); } ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxConnections(configuration.asInt("max_connections", 10)) .withRequestTimeout(configuration.asInt("request_timeout", 60000)); if (System.getenv("https_proxy") != null) { URI proxy = URI.create(System.getenv("https_proxy")); LOGGER.info("Using proxy {}", proxy); clientConfiguration.withProxyHost(proxy.getHost()) .withProxyPort(proxy.getPort()); } return createClient(clientConfiguration, configuration); }
SimpleRetryableKinesisClient(AmazonKinesisAsync amazonKinesisClient, String stream, Optional<String> partitionKey) { this.amazonKinesisClient = amazonKinesisClient; this.stream = stream; if (partitionKey.isPresent()) { this.partitionKeySupplier = Optional.of(new Supplier<StringTemplate>() { final StringTemplate partitionKeyTemplate = StringTemplate.compile(partitionKey.get()); @Override public StringTemplate get() { return partitionKeyTemplate; } }); } else { this.partitionKeySupplier = Optional.empty(); } }
@Bean public AmazonKinesisAsync amazonKinesis(AWSCredentialsProvider awsCredentialsProvider, RegionProvider regionProvider) { return AmazonKinesisAsyncClientBuilder.standard() .withCredentials(awsCredentialsProvider) .withRegion( regionProvider.getRegion() .getName()) .build(); }
public KinesisMessageChannelBinder(AmazonKinesisAsync amazonKinesis, KinesisBinderConfigurationProperties configurationProperties, KinesisStreamProvisioner provisioningProvider) { super(headersToMap(configurationProperties), provisioningProvider); Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null"); this.configurationProperties = configurationProperties; this.amazonKinesis = amazonKinesis; }
private List<Shard> describeStream(String stream) { AmazonKinesisAsync amazonKinesis = localKinesisResource.getResource(); String exclusiveStartShardId = null; DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest() .withStreamName(stream); List<Shard> shardList = new ArrayList<>(); while (true) { DescribeStreamResult describeStreamResult = null; describeStreamRequest.withExclusiveStartShardId(exclusiveStartShardId); describeStreamResult = amazonKinesis.describeStream(describeStreamRequest); StreamDescription streamDescription = describeStreamResult.getStreamDescription(); if (StreamStatus.ACTIVE.toString().equals(streamDescription.getStreamStatus())) { shardList.addAll(streamDescription.getShards()); if (streamDescription.getHasMoreShards()) { exclusiveStartShardId = shardList.get(shardList.size() - 1).getShardId(); continue; } else { return shardList; } } try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } } }
@Memoized AmazonKinesisAsync get() { AmazonKinesisAsyncClientBuilder builder = AmazonKinesisAsyncClientBuilder.standard(); if (credentialsProvider() != null) { builder.withCredentials(credentialsProvider()); } if (endpointConfiguration() != null) { builder.withEndpointConfiguration(endpointConfiguration()); } if (region() != null) { builder.withRegion(region()); } AmazonKinesisAsync result = builder.build(); provisioned = true; return result; }
@Bean public KinesisStreamProvisioner provisioningProvider(AmazonKinesisAsync amazonKinesis) { return new KinesisStreamProvisioner(amazonKinesis, this.configurationProperties); }
@Test @SuppressWarnings("unchecked") public void testProducerErrorChannel() throws Exception { KinesisTestBinder binder = getBinder(); final RuntimeException putRecordException = new RuntimeException("putRecordRequestEx"); final AtomicReference<Object> sent = new AtomicReference<>(); AmazonKinesisAsync amazonKinesisMock = mock(AmazonKinesisAsync.class); BDDMockito.given(amazonKinesisMock.putRecordAsync(any(PutRecordRequest.class), any(AsyncHandler.class))) .willAnswer((Answer<Future<PutRecordResult>>) invocation -> { PutRecordRequest request = invocation.getArgument(0); sent.set(request.getData()); AsyncHandler<?, ?> handler = invocation.getArgument(1); handler.onError(putRecordException); return mock(Future.class); }); new DirectFieldAccessor(binder.getBinder()).setPropertyValue("amazonKinesis", amazonKinesisMock); ExtendedProducerProperties<KinesisProducerProperties> producerProps = createProducerProperties(); producerProps.setErrorChannelEnabled(true); DirectChannel moduleOutputChannel = createBindableChannel("output", createProducerBindingProperties(producerProps)); Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0", moduleOutputChannel, producerProps); ApplicationContext applicationContext = TestUtils.getPropertyValue(binder.getBinder(), "applicationContext", ApplicationContext.class); SubscribableChannel ec = applicationContext.getBean("ec.0.errors", SubscribableChannel.class); final AtomicReference<Message<?>> errorMessage = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); ec.subscribe(message -> { errorMessage.set(message); latch.countDown(); }); String messagePayload = "oops"; moduleOutputChannel.send(new GenericMessage<>(messagePayload.getBytes())); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(errorMessage.get()).isInstanceOf(ErrorMessage.class); assertThat(errorMessage.get().getPayload()).isInstanceOf(AwsRequestFailureException.class); AwsRequestFailureException exception = (AwsRequestFailureException) errorMessage.get().getPayload(); assertThat(exception.getCause()).isSameAs(putRecordException); assertThat(((PutRecordRequest) exception.getRequest()).getData()).isSameAs(sent.get()); producerBinding.unbind(); }
private AmazonKinesisAsync createClient(ClientConfiguration config, MapWrap configuration) { AmazonKinesisAsync kinesisClient = new AmazonKinesisAsyncClient(getAwsCredentialsProvider( configuration, config), config, Executors.newFixedThreadPool(config.getMaxConnections(), new ThreadFactoryBuilder().setNameFormat("lumbermill-async-kinesis-%d").build())); Regions region = Regions.fromName(configuration.asString("region", "eu-west-1")); kinesisClient.setRegion(Region.getRegion(region)); if (configuration.exists("endpoint")) { String endpoint = configuration.asString("endpoint"); if (endpoint.length() > 0) { kinesisClient.setEndpoint(endpoint); } } return kinesisClient; }