/**
 * Provisions a producer destination for a stream whose first
 * {@code describeStream} lookup fails with {@link ResourceNotFoundException}.
 * Expects exactly one {@code createStream(name, shards)} call, two
 * {@code describeStream} calls in total, and a destination carrying the
 * requested name.
 */
@Test
public void testProvisionProducerSuccessfulWithNewStream() {
    String name = "test-stream";
    Integer shards = 1;

    AmazonKinesis kinesis = mock(AmazonKinesis.class);
    KinesisStreamProvisioner provisioner =
            new KinesisStreamProvisioner(kinesis, new KinesisBinderConfigurationProperties());

    ExtendedProducerProperties<KinesisProducerProperties> producerProperties =
            new ExtendedProducerProperties<>(new KinesisProducerProperties());

    // First lookup throws, second lookup returns the description built from a single Shard.
    DescribeStreamResult describedStream =
            describeStreamResultWithShards(Collections.singletonList(new Shard()));
    ResourceNotFoundException notFound = new ResourceNotFoundException("I got nothing");
    when(kinesis.describeStream(any(DescribeStreamRequest.class)))
            .thenThrow(notFound)
            .thenReturn(describedStream);

    CreateStreamResult created = new CreateStreamResult();
    when(kinesis.createStream(name, shards)).thenReturn(created);

    ProducerDestination destination =
            provisioner.provisionProducerDestination(name, producerProperties);

    verify(kinesis, times(2)).describeStream(any(DescribeStreamRequest.class));
    verify(kinesis).createStream(name, shards);
    assertThat(destination.getName()).isEqualTo(name);
}
/**
 * Provisions a consumer destination for a stream whose first
 * {@code describeStream} lookup fails with {@link ResourceNotFoundException}.
 * Expects exactly one {@code createStream} call with
 * {@code instanceCount * concurrency} as the shard argument, two
 * {@code describeStream} calls in total, and a destination carrying the
 * requested name.
 */
@Test
public void testProvisionConsumerSuccessfulWithNewStream() {
    AmazonKinesis kinesis = mock(AmazonKinesis.class);
    KinesisStreamProvisioner provisioner =
            new KinesisStreamProvisioner(kinesis, new KinesisBinderConfigurationProperties());

    int instanceCount = 1;
    int concurrency = 1;
    int expectedShards = instanceCount * concurrency;

    ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties =
            new ExtendedConsumerProperties<>(new KinesisConsumerProperties());
    consumerProperties.setInstanceCount(instanceCount);
    consumerProperties.setConcurrency(concurrency);

    String name = "test-stream";
    String group = "test-group";

    // First lookup throws, second lookup returns the description built from a single Shard.
    DescribeStreamResult describedStream =
            describeStreamResultWithShards(Collections.singletonList(new Shard()));
    ResourceNotFoundException notFound = new ResourceNotFoundException("I got nothing");
    when(kinesis.describeStream(any(DescribeStreamRequest.class)))
            .thenThrow(notFound)
            .thenReturn(describedStream);

    CreateStreamResult created = new CreateStreamResult();
    when(kinesis.createStream(name, expectedShards)).thenReturn(created);

    ConsumerDestination destination =
            provisioner.provisionConsumerDestination(name, group, consumerProperties);

    verify(kinesis, times(2)).describeStream(any(DescribeStreamRequest.class));
    verify(kinesis).createStream(name, expectedShards);
    assertThat(destination.getName()).isEqualTo(name);
}
@Override public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) throws AmazonServiceException, AmazonClientException { // Setup method to create a new stream: InternalStream stream = new InternalStream(createStreamRequest.getStreamName(), createStreamRequest.getShardCount(), true); this.streams.add(stream); return new CreateStreamResult(); }
/**
 * Not supported by this implementation; every call fails.
 *
 * @param createStreamRequest ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always (a {@link RuntimeException}
 *         subclass, so callers catching {@code RuntimeException} are unaffected)
 */
@Override
public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
    throw new UnsupportedOperationException("Not implemented");
}
/**
 * Not supported by this implementation; every call fails.
 *
 * @param streamName ignored
 * @param shardCount ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always (a {@link RuntimeException}
 *         subclass, so callers catching {@code RuntimeException} are unaffected)
 */
@Override
public CreateStreamResult createStream(String streamName, Integer shardCount) {
    throw new UnsupportedOperationException("Not implemented");
}
/**
 * Convenience overload: wraps the two arguments in a {@link CreateStreamRequest}
 * and delegates to {@link #createStream(CreateStreamRequest)}.
 *
 * @param s stream name placed on the request
 * @param integer shard count placed on the request
 * @return whatever the request-based overload returns
 */
@Override
public CreateStreamResult createStream(String s, Integer integer)
        throws AmazonServiceException, AmazonClientException {
    final CreateStreamRequest request = new CreateStreamRequest()
            .withStreamName(s)
            .withShardCount(integer);
    return this.createStream(request);
}