Java 类com.amazonaws.services.kinesis.model.StreamStatus 实例源码

项目:kinesis-logback-appender    文件:KinesisAppender.java   
/**
 * Verifies that the given Kinesis stream can be used by this appender.
 *
 * <p>The stream is described and its status inspected. If the status is neither
 * {@code ACTIVE} nor {@code UPDATING}, or the stream does not exist, the appender is
 * flagged as failed to initialize and an error is recorded through {@code addError}.
 *
 * @param client     Kinesis client handed in by the caller; this implementation does not
 *                   read it and performs the lookup through {@code getClient()} instead
 * @param streamName name of the stream to check
 */
@Override
protected void validateStreamName(AmazonKinesisAsyncClient client, String streamName) {
  try {
    String status = getClient().describeStream(streamName).getStreamDescription().getStreamStatus();
    // Constant-first equals keeps the comparison safe when the reported status is null.
    boolean usable = StreamStatus.ACTIVE.name().equals(status)
        || StreamStatus.UPDATING.name().equals(status);
    if(!usable) {
      setInitializationFailed(true);
      addError("Stream " + streamName + " is not ready (in active/updating status) for appender: " + name);
    }
  }
  catch(ResourceNotFoundException missingStream) {
    setInitializationFailed(true);
    addError("Stream " + streamName + " doesn't exist for appender: " + name, missingStream);
  }
}
项目:spring-cloud-stream-binder-aws-kinesis    文件:KinesisBinderTests.java   
/**
 * Binds a consumer to a stream name that does not exist yet, unbinds it, and then checks
 * the stream reported by the local Kinesis resource: its name, its shard count
 * (instance count multiplied by concurrency), and that it is ACTIVE. Finally verifies that
 * the binding's initial shard offset reflects the AT_TIMESTAMP iterator configured below.
 */
@Test
public void testAutoCreateStreamForNonExistingStream() throws Exception {
    KinesisTestBinder kinesisBinder = getBinder();
    DirectChannel outputChannel = createBindableChannel("output", new BindingProperties());
    ExtendedConsumerProperties<KinesisConsumerProperties> props = createConsumerProperties();

    // Iterator type is passed as "<TYPE>:<epoch millis>".
    Date startTimestamp = new Date();
    String iteratorSpec = ShardIteratorType.AT_TIMESTAMP.name() + ":" + startTimestamp.getTime();
    props.getExtension().setShardIteratorType(iteratorSpec);

    // Timestamp suffix keeps the stream name unique between runs.
    String streamName = "nonexisting" + System.currentTimeMillis();
    Binding<?> consumerBinding = kinesisBinder.bindConsumer(streamName, "test", outputChannel, props);
    consumerBinding.unbind();

    DescribeStreamResult described = localKinesisResource.getResource().describeStream(streamName);

    assertThat(described.getStreamDescription().getStreamName()).isEqualTo(streamName);
    assertThat(described.getStreamDescription().getShards().size())
            .isEqualTo(props.getInstanceCount() * props.getConcurrency());
    assertThat(described.getStreamDescription().getStreamStatus())
            .isEqualTo(StreamStatus.ACTIVE.toString());

    KinesisShardOffset initialOffset = TestUtils.getPropertyValue(
            consumerBinding, "lifecycle.streamInitialSequence", KinesisShardOffset.class);
    assertThat(initialOffset.getIteratorType()).isEqualTo(ShardIteratorType.AT_TIMESTAMP);
    assertThat(initialOffset.getTimestamp()).isEqualTo(startTimestamp);
}
项目:spring-cloud-stream-binder-aws-kinesis    文件:KinesisBinderTests.java   
/**
 * Collects the shards of the given stream, polling until the stream reports ACTIVE.
 *
 * <p>While the stream is not ACTIVE the method sleeps 100 ms and asks again. Once ACTIVE,
 * shards are accumulated page by page, using the id of the last collected shard as the
 * exclusive start for the next request, until no more shards are reported.
 *
 * @param stream name of the stream to describe
 * @return every shard collected across all pages
 * @throws IllegalStateException if the thread is interrupted while waiting; the interrupt
 *                               flag is restored before throwing
 */
private List<Shard> describeStream(String stream) {
    AmazonKinesisAsync kinesis = localKinesisResource.getResource();
    DescribeStreamRequest request = new DescribeStreamRequest().withStreamName(stream);
    List<Shard> collected = new ArrayList<>();
    String lastShardId = null;

    for (;;) {
        request.withExclusiveStartShardId(lastShardId);
        StreamDescription description = kinesis.describeStream(request).getStreamDescription();

        if (!StreamStatus.ACTIVE.toString().equals(description.getStreamStatus())) {
            // Stream not usable yet: pause briefly, then poll again.
            try {
                Thread.sleep(100);
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(interrupted);
            }
            continue;
        }

        collected.addAll(description.getShards());
        if (!description.getHasMoreShards()) {
            return collected;
        }
        // More pages remain: resume after the last shard seen so far.
        lastShardId = collected.get(collected.size() - 1).getShardId();
    }
}
项目:spring-cloud-stream-binder-aws-kinesis    文件:KinesisStreamProvisionerTests.java   
/**
 * Builds a describe-stream result whose description carries the given shards, has status
 * ACTIVE, and reports that no further shards follow.
 *
 * @param shards shards to place in the stream description
 * @return the assembled result
 */
private static DescribeStreamResult describeStreamResultWithShards(List<Shard> shards) {
    StreamDescription description = new StreamDescription()
            .withShards(shards)
            .withStreamStatus(StreamStatus.ACTIVE)
            .withHasMoreShards(Boolean.FALSE);
    return new DescribeStreamResult().withStreamDescription(description);
}
项目:flink    文件:KinesisProxy.java   
/**
 * Describes a Kinesis stream, returning the metadata that lists the stream's shards.
 *
 * <p>Calls that fail with {@code LimitExceededException} are retried after a "full jitter"
 * backoff, as described in the AWS article
 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
 * All parallel subtask fetchers issue this call concurrently, and the jitter spreads their
 * requests over time. There is no upper bound on the number of retries.
 *
 * <p>When a start shard id is supplied, any returned shard whose id compares less than or
 * equal to it is removed from the result's shard list before returning.
 *
 * @param streamName the stream to describe
 * @param startShardId exclusive start shard for this describe operation; may be null
 * @return the result of the describe stream operation
 * @throws InterruptedException if interrupted while backing off
 */
private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    final DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);
    request.setExclusiveStartShardId(startShardId);

    // Keep calling DescribeStream until it succeeds; only rate limiting is retried.
    DescribeStreamResult result = null;
    int retries = 0;
    do {
        try {
            result = kinesisClient.describeStream(request);
        } catch (LimitExceededException le) {
            long sleepMillis = fullJitterBackoff(
                describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, retries++);
            LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
                + sleepMillis + " millis.");
            Thread.sleep(sleepMillis);
        } catch (ResourceNotFoundException re) {
            throw new RuntimeException("Error while getting stream details", re);
        }
    } while (result == null);

    String streamStatus = result.getStreamDescription().getStreamStatus();
    boolean activeOrUpdating = streamStatus.equals(StreamStatus.ACTIVE.toString())
        || streamStatus.equals(StreamStatus.UPDATING.toString());
    if (!activeOrUpdating && LOG.isWarnEnabled()) {
        LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
            "describeStream operation will not contain any shard information.");
    }

    if (startShardId == null) {
        return result;
    }

    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the
    // exclusive start shard id, so drop any such shards from the returned list here.
    List<Shard> shards = result.getStreamDescription().getShards();
    for (Iterator<Shard> it = shards.iterator(); it.hasNext();) {
        Shard shard = it.next();
        if (StreamShardHandle.compareShardIds(shard.getShardId(), startShardId) <= 0) {
            it.remove();
        }
    }

    return result;
}
项目:flink    文件:KinesisProxy.java   
/**
 * Fetches the description of a Kinesis stream, which includes the list of its shards.
 *
 * <p>If the service answers with {@code LimitExceededException}, the call is repeated after
 * sleeping for a "full jitter" backoff interval (see the AWS article
 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>).
 * Every parallel subtask's fetcher makes this call, so randomised backoff helps distribute
 * the calls over time. The loop continues until a result is obtained.
 *
 * <p>If {@code startShardId} is non-null, shards in the result whose id is at or before it
 * (per {@code KinesisStreamShard.compareShardIds}) are stripped from the returned list.
 *
 * @param streamName the stream to describe
 * @param startShardId shard id after which results should begin, or null to start from the first shard
 * @return the result of the describe stream operation
 * @throws InterruptedException if the backoff sleep is interrupted
 */
private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    final DescribeStreamRequest describeRequest = new DescribeStreamRequest();
    describeRequest.setStreamName(streamName);
    describeRequest.setExclusiveStartShardId(startShardId);

    DescribeStreamResult response = null;

    // Retry only on LimitExceededException, sleeping a jittered interval between attempts.
    for (int attempt = 0; response == null; ) {
        try {
            response = kinesisClient.describeStream(describeRequest);
        } catch (LimitExceededException le) {
            long waitMillis = fullJitterBackoff(
                describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attempt++);
            LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
                + waitMillis + " millis.");
            Thread.sleep(waitMillis);
        } catch (ResourceNotFoundException re) {
            throw new RuntimeException("Error while getting stream details", re);
        }
    }

    String streamStatus = response.getStreamDescription().getStreamStatus();
    if (!streamStatus.equals(StreamStatus.ACTIVE.toString())
            && !streamStatus.equals(StreamStatus.UPDATING.toString())
            && LOG.isWarnEnabled()) {
        LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
            "describeStream operation will not contain any shard information.");
    }

    // Kinesalite (mock implementation of Kinesis) may return shards that precede the exclusive
    // start shard id; filter those out so callers only see shards after it.
    if (startShardId != null) {
        Iterator<Shard> remaining = response.getStreamDescription().getShards().iterator();
        while (remaining.hasNext()) {
            String shardId = remaining.next().getShardId();
            if (KinesisStreamShard.compareShardIds(shardId, startShardId) <= 0) {
                remaining.remove();
            }
        }
    }

    return response;
}
项目:kinesis-log4j-appender    文件:KinesisAppender.java   
/**
 * Configures this appender instance and makes it ready for use.
 *
 * <p>Steps, in order: check that a stream name and a layout are configured; build the AWS
 * client configuration (proxy settings, retry policy, user agent); create a bounded task
 * queue and a fixed-size thread pool for the asynchronous Kinesis client; apply either the
 * configured endpoint or the region (falling back to the default region when none is set);
 * confirm that the stream exists and is ACTIVE or UPDATING; and finally create the async
 * call stats reporter.
 *
 * <p>Failures set {@code initializationFailed} and are reported through {@code error(...)},
 * which is declared outside this method.
 *
 * @throws IllegalStateException
 *           if we encounter issues configuring this appender instance (presumably raised
 *           by {@code error(...)}; confirm against its definition)
 */
@Override
public void activateOptions() {
  if (streamName == null) {
    initializationFailed = true;
    error("Invalid configuration - streamName cannot be null for appender: " + name);
  }
  if (layout == null) {
    initializationFailed = true;
    error("Invalid configuration - No layout for appender: " + name);
  }

  // AWS client configuration: proxy, retries and user agent.
  ClientConfiguration awsConfig = setProxySettingsFromSystemProperties(new ClientConfiguration());
  awsConfig.setMaxErrorRetry(maxRetries);
  awsConfig.setRetryPolicy(new RetryPolicy(
      PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
      PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY,
      maxRetries,
      true));
  awsConfig.setUserAgent(AppenderConstants.USER_AGENT_STRING);

  // Bounded queue plus fixed-size pool backing the asynchronous client.
  BlockingQueue<Runnable> pendingTasks = new LinkedBlockingDeque<Runnable>(bufferSize);
  ThreadPoolExecutor senderPool = new ThreadPoolExecutor(
      threadCount,
      threadCount,
      AppenderConstants.DEFAULT_THREAD_KEEP_ALIVE_SEC,
      TimeUnit.SECONDS,
      pendingTasks,
      new BlockFastProducerPolicy());
  senderPool.prestartAllCoreThreads();
  kinesisClient = new AmazonKinesisAsyncClient(new CustomCredentialsProviderChain(), awsConfig, senderPool);

  boolean regionMissing = Validator.isBlank(region);
  if (regionMissing) {
    region = AppenderConstants.DEFAULT_REGION;
  }

  if (Validator.isBlank(endpoint)) {
    kinesisClient.setRegion(Region.getRegion(Regions.fromName(region)));
  } else {
    if (!regionMissing) {
      // Both were configured explicitly: the endpoint takes precedence.
      LOGGER.warn("Received configuration for both region as well as Amazon Kinesis endpoint. ("
          + endpoint
          + ") will be used as endpoint instead of default endpoint for region ("
          + region + ")");
    }
    kinesisClient.setEndpoint(endpoint, AppenderConstants.DEFAULT_SERVICE_NAME, region);
  }

  try {
    String status = kinesisClient.describeStream(streamName).getStreamDescription().getStreamStatus();
    boolean ready = StreamStatus.ACTIVE.name().equals(status)
        || StreamStatus.UPDATING.name().equals(status);
    if (!ready) {
      initializationFailed = true;
      error("Stream " + streamName + " is not ready (in active/updating status) for appender: " + name);
    }
  } catch (ResourceNotFoundException missingStream) {
    initializationFailed = true;
    error("Stream " + streamName + " doesn't exist for appender: " + name, missingStream);
  }

  asyncCallHander = new AsyncPutCallStatsReporter(name);
}