public void createStream(int shardCount, String streamName) { CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName(streamName); createStreamRequest.setShardCount(shardCount); amazonKinesisClient.createStream(createStreamRequest); try { while (checkStreamStatus(streamName).equals("ACTIVE") == false) { MILLISECONDS.sleep(1000); } } catch (Exception e) { } streamsCreated.add(streamName); }
/** * Create the specified topic with the specified number of partitions */ public void createTopic(String topicName, int partitions) { LOGGER.info("Determining if Kinesis topic: {} already exists...", topicName); try{ final DescribeStreamRequest describeRequest = new DescribeStreamRequest(); describeRequest.withStreamName(topicName); this.client.describeStream(describeRequest); }catch(ResourceNotFoundException rnf){ LOGGER.info("Kinesis stream for topic: {} does not exist, creating now with shard count: {}",topicName, partitions); final CreateStreamRequest request = new CreateStreamRequest(); request.withStreamName(topicName); request.withShardCount(partitions); this.client.createStream(request); this.waitForStreamToBecomeAvailable(topicName, DEFAULT_WAIT_TIME_MINUTES); LOGGER.info("Create topic completed for topic: {}", topicName); } }
@Override public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) throws AmazonServiceException, AmazonClientException { // Setup method to create a new stream: InternalStream stream = new InternalStream(createStreamRequest.getStreamName(), createStreamRequest.getShardCount(), true); this.streams.add(stream); return new CreateStreamResult(); }
@Override public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) { throw new RuntimeException("Not implemented"); }
@Override public CreateStreamResult createStream(String s, Integer integer) throws AmazonServiceException, AmazonClientException { return this.createStream((new CreateStreamRequest()).withStreamName(s).withShardCount(integer)); }