Java class com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream usage examples
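
The snippets below are collected from open-source projects. As a primer, here is a minimal, self-contained sketch (not taken from any of the projects on this page) showing the three starting positions the enum offers; the application name, stream name, and region are placeholder values.

import java.util.Date;
import java.util.UUID;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;

public class InitialPositionSketch {
    public static void main(String[] args) {
        // LATEST reads only records produced after the worker starts;
        // TRIM_HORIZON starts from the oldest records the stream still retains.
        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "example-app",    // application (and lease table) name -- placeholder
                "example-stream", // stream name -- placeholder
                new DefaultAWSCredentialsProviderChain(),
                "worker-" + UUID.randomUUID())
                .withRegionName("us-east-1")
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        // AT_TIMESTAMP is selected indirectly by supplying the timestamp itself,
        // as the datacollector KinesisSource snippet below does:
        config = config.withTimestampAtInitialPositionInStream(
                new Date(System.currentTimeMillis() - 3_600_000L)); // one hour ago
        System.out.println(config.getInitialPositionInStream()); // AT_TIMESTAMP
    }
}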

Project: beam    File: KinesisMockReadTest.java
@Test
public void readsDataFromMockKinesis() {
  int noOfShards = 3;
  int noOfEventsPerShard = 100;
  List<List<AmazonKinesisMock.TestData>> testData =
      provideTestData(noOfShards, noOfEventsPerShard);

  PCollection<AmazonKinesisMock.TestData> result = p
      .apply(
          KinesisIO.read()
              .withStreamName("stream")
              .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
              .withAWSClientsProvider(new AmazonKinesisMock.Provider(testData, 10))
              .withMaxNumRecords(noOfShards * noOfEventsPerShard))
      .apply(ParDo.of(new KinesisRecordToTestData()));
  PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
  p.run();
}
Project: ingestion-service    File: KafkaKinesisIntegrationTest.java
private void startKinesisConsumer() throws Exception {
    AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();

    String region = "eu-west-1";
    logger.info("Starting in Region " + region);

    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

    KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
            this.getClass().getName(), TestConstants.stream, credentialsProvider, workerId)
            .withInitialPositionInStream(InitialPositionInStream.LATEST).withRegionName(region);

    IRecordProcessorFactory recordProcessorFactory = new RecordFactory();
    worker = new Worker(recordProcessorFactory,
            kinesisClientLibConfiguration);

    es = Executors.newSingleThreadExecutor();
    es.execute(worker);
}
Project: spark-cstar-canaries    File: JKinesisReceiver.java
public JKinesisReceiver(String applicationName, String streamName,
                        String endpointUrl, String regionName,
                        Duration checkpoint, InitialPositionInStream position) {
    super(StorageLevel.MEMORY_ONLY_SER());

    this.workerId = getHostname() + ":" + String.valueOf(UUID.randomUUID());
    this.checkpointInterval = checkpoint;
    this.initialPosition = position;

    Region region = RegionUtils.getRegion(regionName);

    try {
        this.kclConfig = new KinesisClientLibConfiguration(applicationName, streamName,
                                                           getCredsProvider(), workerId)
                .withCommonClientConfig(CLIENT_CONF)
                .withRegionName(region.getName())
                .withKinesisEndpoint(endpointUrl)
                .withInitialPositionInStream(position) // honor the constructor's requested position instead of hardcoding LATEST
                .withTaskBackoffTimeMillis(500);
    } catch (Exception ex) {
        // Swallowed here to keep the demo receiver running; a production
        // implementation should log and handle this failure.
    }
}
Project: samza    File: KinesisConfig.java
/**
 * Get KCL config for a given system stream.
 * @param system name of the system
 * @param stream name of the stream
 * @param appName name of the application
 * @return the stream-scoped {@link KinesisClientLibConfiguration} built from
 *         the system- and stream-level configs
 */
public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) {
  ClientConfiguration clientConfig = getAWSClientConfig(system);
  String workerId = appName + "-" + UUID.randomUUID();
  InitialPositionInStream startPos = InitialPositionInStream.LATEST;
  AWSCredentialsProvider provider = credentialsProviderForStream(system, stream);
  KinesisClientLibConfiguration kinesisClientLibConfiguration =
      new KinesisClientLibConfiguration(appName, stream, provider, workerId)
          .withRegionName(getRegion(system, stream).getName())
          .withKinesisClientConfig(clientConfig)
          .withCloudWatchClientConfig(clientConfig)
          .withDynamoDBClientConfig(clientConfig)
          .withInitialPositionInStream(startPos)
          .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics.
  // First, get system scoped configs for KCL and override with configs set at stream scope.
  setKinesisClientLibConfigs(
      subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system)), kinesisClientLibConfiguration);
  setKinesisClientLibConfigs(subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream)),
      kinesisClientLibConfiguration);
  return kinesisClientLibConfiguration;
}
Project: datacollector    File: KinesisSourceUpgrader.java
private void upgradeToConsumerConfigBeanV1(List<Config> configs) {
  for (Config config : configs) {
    // Migrate existing configs that were moved into the Kinesis Consumer config bean
    switch (config.getName()) {
      case "applicationName":
        // fall through
      case "maxBatchSize":
        // fall through
      case "idleTimeBetweenReads":
        // fall through
      case "maxWaitTime":
        // fall through
      case "previewWaitTime":
        moveConfigToBean(config, KINESIS_CONFIG_BEAN);
        break;
      default:
        // no-op
    }
  }
  commitMove(configs);

  configs.add(new Config(KINESIS_CONFIG_BEAN + ".initialPositionInStream", InitialPositionInStream.LATEST));
}
Project: datacollector    File: KinesisSourceIT.java
private KinesisConsumerConfigBean getKinesisConsumerConfig(String streamName) {
  KinesisConsumerConfigBean conf = new KinesisConsumerConfigBean();
  conf.dataFormatConfig = new DataParserFormatConfig();
  conf.awsConfig = new AWSConfig();
  conf.awsConfig.awsAccessKeyId = () -> "foo";
  conf.awsConfig.awsSecretAccessKey = () -> "boo";

  conf.region = AWSRegions.OTHER;
  conf.endpoint = getKinesisEndpoint();
  conf.streamName = streamName;

  conf.dataFormat = DataFormat.JSON;
  conf.dataFormatConfig.jsonContent = JsonMode.MULTIPLE_OBJECTS;
  conf.dataFormatConfig.charset = "UTF-8";
  conf.dataFormatConfig.jsonMaxObjectLen = 1024;

  conf.applicationName = UUID.randomUUID().toString();
  conf.idleTimeBetweenReads = 250;
  conf.initialPositionInStream = InitialPositionInStream.TRIM_HORIZON;
  conf.maxBatchSize = 1000;
  conf.maxRecordProcessors = 2; // Must be at least 1

  return conf;
}
Project: aws-kinesis-zombies    File: Consumer.java
private void initKinesis() {
    String pid = ManagementFactory.getRuntimeMXBean().getName();
    pid = pid.indexOf('@') == -1 ? pid : pid.substring(0, pid.indexOf('@'));
    log.info("Creating kinesis consumer with pid {}.", pid);
    KinesisClientLibConfiguration config
            = new KinesisClientLibConfiguration(
                    "Zombies" /* application name */,
                    streamName,
                    new DefaultAWSCredentialsProviderChain(),
                    "ZombieConsumer_" + pid /* worker id */)
            .withRegionName(region)
            .withFailoverTimeMillis(1000 * 30) // a worker that fails to renew its lease for 30 seconds is considered dead
            .withMaxLeasesForWorker(2) // cap this worker at two leases (shards) for demo purposes
            .withMaxRecords(500) // max records per GetRecords call
            .withCallProcessRecordsEvenForEmptyRecordList(false) // no records -> no processing
            .withInitialLeaseTableWriteCapacity(10) // DynamoDB lease table capacity
            .withInitialLeaseTableReadCapacity(10)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    final Worker worker
            = new Worker.Builder()
            .recordProcessorFactory(zombieRecordFactory)
            .config(config)
            .build();

    new Thread(worker).start(); // Worker implements Runnable
}
Project: beam    File: DynamicCheckpointGeneratorTest.java
@Test
public void shouldMapAllShardsToCheckpoints() throws Exception {
  given(shard1.getShardId()).willReturn("shard-01");
  given(shard2.getShardId()).willReturn("shard-02");
  given(shard3.getShardId()).willReturn("shard-03");
  given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));

  StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
  DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
      startingPoint);

  KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);

  assertThat(checkpoint).hasSize(3);
}
Project: spark-cstar-canaries    File: Consumer.java
public void start() {
    final JavaStreamingContext context = new JavaStreamingContext(conf, checkpointInterval);

    // for graceful shutdown of the application ...
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            System.out.println("Shutting down streaming app...");
            context.stop(true, true);
            System.out.println("Shutdown of streaming app complete.");
        }
    });

    JKinesisReceiver receiver = new JKinesisReceiver(appName, streamName,
                                                     endpointUrl, regionName,
                                                     checkpointInterval,
                                                     InitialPositionInStream.LATEST);

    JavaDStream<String> dstream = context.receiverStream(receiver);

    JavaDStream<EventRecord> recs = dstream.map(new EventRecordMapFunc());

    recs.print();

    // persist the DStream to Cassandra
    javaFunctions(recs)
        .writerBuilder("canary", "eventrecord", mapToRow(EventRecord.class))
        .saveToCassandra();

    System.out.println("Start Spark Stream Processing...");

    context.start();
    context.awaitTermination();
}
Project: samza    File: KinesisConfig.java
private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) {
  for (Entry<String, String> entry : config.entrySet()) {
    boolean found = false;
    String key = entry.getKey();
    String value = entry.getValue();
    if (StringUtils.isEmpty(value)) {
      continue;
    }
    for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
      if (method.getName().equals("with" + key)) {
        found = true;
        Class<?> type = method.getParameterTypes()[0];
        try {
          if (type == long.class) {
            method.invoke(kinesisLibConfig, Long.valueOf(value));
          } else if (type == int.class) {
            method.invoke(kinesisLibConfig, Integer.valueOf(value));
          } else if (type == boolean.class) {
            method.invoke(kinesisLibConfig, Boolean.valueOf(value));
          } else if (type == String.class) {
            method.invoke(kinesisLibConfig, value);
          } else if (type == InitialPositionInStream.class) {
            method.invoke(kinesisLibConfig, InitialPositionInStream.valueOf(value.toUpperCase()));
          }
          LOG.info("Loaded property " + key + " = " + value);
          break;
        } catch (Exception e) {
          throw new IllegalArgumentException(
              String.format("Error trying to set field %s with the value '%s'", key, value), e);
        }
      }
    }
    if (!found) {
      LOG.warn("Property " + key + " ignored as there is no corresponding set method");
    }
  }
}
Project: samza    File: TestKinesisConfig.java
@Test
public void testKclConfigs() {
  Map<String, String> kv = new HashMap<>();
  String system = "kinesis";
  String stream = "kinesis-stream";
  String systemConfigPrefix = String.format("systems.%s.", system);

  // region config is required for setting kcl config.
  kv.put(systemConfigPrefix + "aws.region", "us-east-1");

  // Kcl Configs
  kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table");
  kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100");
  kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true");
  kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON");
  // override one of the Kcl configs for kinesis-stream1
  kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST");

  Config config = new MapConfig(kv);
  KinesisConfig kConfig = new KinesisConfig(config);
  KinesisClientLibConfiguration kclConfig = kConfig.getKinesisClientLibConfig(system, stream, "sample-app");

  assertEquals("sample-table", kclConfig.getTableName());
  assertEquals(100, kclConfig.getMaxRecords());
  assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList());
  assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream());

  // verify that the overridden config is applied for kinesis-stream1
  kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app");
  assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream());
}
Project: koupler    File: KinesisEventConsumer.java
public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) {
    KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);

    InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition);

    KinesisClientLibConfiguration clientConfig = new KinesisClientLibConfiguration(appName, streamName,
            new DefaultAWSCredentialsProviderChain(), appName)
                    .withRegionName(config.getRegion())
                    .withInitialPositionInStream(position);

    this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig);
}
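
Enum.valueOf is case-sensitive and throws IllegalArgumentException for unrecognized names, which is why the samza setKinesisClientLibConfigs snippet above upper-cases the value before converting it. A tolerant variant (a hypothetical helper, not part of koupler) might look like:

private static InitialPositionInStream parseInitialPosition(String raw) {
    // Hypothetical helper: trim, normalize case, and fall back to LATEST
    // when the value is missing or not a valid enum constant.
    if (raw == null || raw.trim().isEmpty()) {
        return InitialPositionInStream.LATEST;
    }
    try {
        return InitialPositionInStream.valueOf(raw.trim().toUpperCase());
    } catch (IllegalArgumentException e) {
        return InitialPositionInStream.LATEST;
    }
}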
Project: datacollector    File: KinesisSource.java
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
  KinesisClientLibConfiguration kclConfig =
      new KinesisClientLibConfiguration(
          conf.applicationName,
          conf.streamName,
          credentials,
          getWorkerId()
      );

  kclConfig
      .withMaxRecords(maxBatchSize)
      .withCallProcessRecordsEvenForEmptyRecordList(false)
      .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
      .withInitialPositionInStream(conf.initialPositionInStream)
      .withKinesisClientConfig(clientConfiguration);

  if (conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP) {
    kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
  }

  if (conf.region == AWSRegions.OTHER) {
    kclConfig.withKinesisEndpoint(conf.endpoint);
  } else {
    kclConfig.withRegionName(conf.region.getLabel());
  }

  return new Worker.Builder()
      .recordProcessorFactory(recordProcessorFactory)
      .metricsFactory(metricsFactory)
      .dynamoDBClient(dynamoDBClient)
      .cloudWatchClient(cloudWatchClient)
      .execService(executor)
      .config(kclConfig)
      .build();
}
Project: components    File: KinesisInputRuntime.java
private InitialPositionInStream convertToPosition(KinesisInputProperties.OffsetType offsetType) {
    switch (offsetType) {
    case LATEST:
        return InitialPositionInStream.LATEST;
    case EARLIEST:
        return InitialPositionInStream.TRIM_HORIZON;
    default:
        TalendRuntimeException.build(CommonErrorCodes.UNEXPECTED_ARGUMENT).setAndThrow(
                String.format("Unsupported OffsetType %s", offsetType));
        return null;
    }
}
Project: tweetamo    File: TweetamoServer.java
/**
 * @param propertiesFile path to the properties file to load
 * @throws IOException Thrown when we run into issues reading properties
 */
private static void loadProperties(String propertiesFile) throws IOException {
    FileInputStream inputStream = new FileInputStream(propertiesFile);
    Properties properties = new Properties();
    try {
        properties.load(inputStream);
    } finally {
        inputStream.close();
    }

    String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY);
    if (appNameOverride != null) {
        applicationName = appNameOverride;
    }
    LOG.info("Using application name " + applicationName);

    String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY);
    if (streamNameOverride != null) {
        streamName = streamNameOverride;
    }
    LOG.info("Using stream name " + streamName);

    String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY);
    if (kinesisEndpointOverride != null) {
        kinesisEndpoint = kinesisEndpointOverride;
    }
    String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
    if (initialPositionOverride != null) {
        initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride);
    }
    LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found).");
}
Project: aws-kinesis-beanstalk-workers    File: ManagedConsumer.java
public void configure() throws Exception {
    if (!isConfigured) {
        validateConfig();

        try {
            String userAgent = "AWSKinesisManagedConsumer/" + this.version;

            if (this.positionInStream != null) {
                streamPosition = InitialPositionInStream.valueOf(this.positionInStream);
            } else {
                streamPosition = InitialPositionInStream.LATEST;
            }

            // append the environment name to the application name
            if (environmentName != null) {
                appName = String.format("%s-%s", appName, environmentName);
            }

            // ensure the JVM will refresh the cached IP values of AWS resources
            // (e.g. service endpoints)
            java.security.Security.setProperty("networkaddress.cache.ttl", "60");

            String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID();
            LOG.info("Using Worker ID: " + workerId);

            // obtain credentials using the default provider chain or the
            // credentials provider supplied
            AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null
                    ? new DefaultAWSCredentialsProviderChain()
                    : this.credentialsProvider;

            LOG.info("Using credentials with Access Key ID: "
                    + credentialsProvider.getCredentials().getAWSAccessKeyId());

            config = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId)
                    .withInitialPositionInStream(streamPosition)
                    .withKinesisEndpoint(kinesisEndpoint);

            config.getKinesisClientConfiguration().setUserAgent(userAgent);

            if (regionName != null) {
                Region region = Region.getRegion(Regions.fromName(regionName));
                config.withRegionName(region.getName());
            }

            if (this.maxRecords != -1) {
                config.withMaxRecords(maxRecords);
            }

            if (this.positionInStream != null) {
                config.withInitialPositionInStream(InitialPositionInStream.valueOf(this.positionInStream));
            }

            LOG.info(String.format(
                    "Amazon Kinesis Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                    config.getApplicationName(), config.getStreamName(),
                    config.getRegionName(), config.getWorkerIdentifier(),
                    config.getMaxRecords()));

            isConfigured = true;
        } catch (Exception e) {
            throw new InvalidConfigurationException(e);
        }
    }
}
Project: zipkin-aws    File: KinesisStreamFactory.java
public static Builder newBuilder() {
  return new Builder()
      .checkpointIntervalMillis(2000)
      .initialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2());
}
Project: beam    File: KinesisIO.java
/**
 * Specify the initial position in the stream from which to start reading.
 */
public Read withInitialPositionInStream(InitialPositionInStream initialPosition) {
  return toBuilder()
      .setInitialPosition(new StartingPoint(initialPosition))
      .build();
}
Project: beam    File: StartingPoint.java
public StartingPoint(InitialPositionInStream position) {
  this.position = checkNotNull(position, "position");
  this.timestamp = null;
}
Project: beam    File: StartingPoint.java
public InitialPositionInStream getPosition() {
  return position;
}
Project: dynamodb-streams-kafka    File: KafkaDynamoStreamAdapter.java
public InitialPositionInStream getInitialStreamPosition() {
    return initialStreamPosition;
}
Project: dynamodb-streams-kafka    File: KafkaDynamoStreamAdapter.java
public void setInitialStreamPosition(InitialPositionInStream initialPosition) {
    initialStreamPosition = initialPosition;
}
Project: datacollector    File: InitialPositionInStreamChooserValues.java
public InitialPositionInStreamChooserValues() {
  super(InitialPositionInStream.values());
}
Project: amazon-kinesis-aggregators    File: AggregatorConsumer.java
public void configure() throws Exception {
    if (!isConfigured) {
        validateConfig();

        if (this.positionInStream != null) {
            streamPosition = InitialPositionInStream.valueOf(this.positionInStream);
        } else {
            streamPosition = InitialPositionInStream.LATEST;
        }

        // append the environment name to the application name
        if (environmentName != null) {
            appName = String.format("%s-%s", appName, environmentName);
        }

        // ensure the JVM will refresh the cached IP values of AWS resources
        // (e.g. service endpoints)
        java.security.Security.setProperty("networkaddress.cache.ttl", "60");

        String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID();
        LOG.info("Using Worker ID: " + workerId);

        // obtain credentials using the default provider chain or the
        // credentials provider supplied
        AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null
                ? new DefaultAWSCredentialsProviderChain()
                : this.credentialsProvider;

        LOG.info("Using credentials with Access Key ID: "
                + credentialsProvider.getCredentials().getAWSAccessKeyId());

        config = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId)
                .withInitialPositionInStream(streamPosition)
                .withKinesisEndpoint(kinesisEndpoint);

        config.getKinesisClientConfiguration().setUserAgent(StreamAggregator.AWSApplication);

        if (regionName != null) {
            Region region = Region.getRegion(Regions.fromName(regionName));
            config.withRegionName(region.getName());
        }

        if (maxRecords != -1) {
            config.withMaxRecords(maxRecords);
        }

        // initialise the Aggregators
        aggGroup = buildAggregatorsFromConfig();

        LOG.info(String
                .format("Amazon Kinesis Aggregators Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                        config.getApplicationName(),
                        config.getStreamName(), config.getRegionName(),
                        config.getWorkerIdentifier(),
                        config.getMaxRecords()));

        isConfigured = true;
    }
}
Project: aws-big-data-blog    File: ManagedConsumer.java
public void configure() throws Exception {
    if (!isConfigured) {
        validateConfig();

        try {
            String userAgent = "AWSKinesisManagedConsumer/" + this.version;

            if (this.positionInStream != null) {
                streamPosition = InitialPositionInStream.valueOf(this.positionInStream);
            } else {
                streamPosition = InitialPositionInStream.LATEST;
            }

            // append the environment name to the application name
            if (environmentName != null) {
                appName = String.format("%s-%s", appName, environmentName);
            }

            // ensure the JVM will refresh the cached IP values of AWS resources
            // (e.g. service endpoints)
            java.security.Security.setProperty("networkaddress.cache.ttl", "60");

            String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID();
            LOG.info("Using Worker ID: " + workerId);

            // obtain credentials using the default provider chain or the
            // credentials provider supplied
            AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null
                    ? new DefaultAWSCredentialsProviderChain()
                    : this.credentialsProvider;

            LOG.info("Using credentials with Access Key ID: "
                    + credentialsProvider.getCredentials().getAWSAccessKeyId());

            config = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId)
                    .withInitialPositionInStream(streamPosition)
                    .withKinesisEndpoint(kinesisEndpoint);

            config.getKinesisClientConfiguration().setUserAgent(userAgent);

            if (regionName != null) {
                Region region = Region.getRegion(Regions.fromName(regionName));
                config.withRegionName(region.getName());
            }

            if (this.maxRecords != -1) {
                config.withMaxRecords(maxRecords);
            }

            if (this.positionInStream != null) {
                config.withInitialPositionInStream(InitialPositionInStream.valueOf(this.positionInStream));
            }

            LOG.info(String.format(
                    "Amazon Kinesis Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                    config.getApplicationName(), config.getStreamName(),
                    config.getRegionName(), config.getWorkerIdentifier(),
                    config.getMaxRecords()));

            isConfigured = true;
        } catch (Exception e) {
            throw new InvalidConfigurationException(e);
        }
    }
}
Project: aws-big-data-blog    File: KinesisApplication.java
/**
 * @param propertiesFile path to the properties file to load
 * @throws IOException Thrown when we run into issues reading properties
 */
private static void loadProperties(String propertiesFile) throws IOException {
    FileInputStream inputStream = new FileInputStream(propertiesFile);
    Properties properties = new Properties();
    try {
        properties.load(inputStream);
    } finally {
        inputStream.close();
    }

    String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY);
    if (appNameOverride != null) {
        applicationName = appNameOverride;
    }
    LOG.info("Using application name " + applicationName);

    String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY);
    if (streamNameOverride != null) {
        streamName = streamNameOverride;
    }
    LOG.info("Using stream name " + streamName);

    String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY);
    if (kinesisEndpointOverride != null) {
        kinesisEndpoint = kinesisEndpointOverride;
    }
    LOG.info("Using Kinesis endpoint " + kinesisEndpoint);

    String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
    if (initialPositionOverride != null) {
        initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride);
    }
    LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found).");

    String redisEndpointOverride = properties.getProperty(ConfigKeys.REDIS_ENDPOINT);
    if (redisEndpointOverride != null) {
        redisEndpoint = redisEndpointOverride;
    }
    LOG.info("Using Redis endpoint " + redisEndpoint);

    String redisPortOverride = properties.getProperty(ConfigKeys.REDIS_PORT);
    if (redisPortOverride != null) {
        try {
            redisPort = Integer.parseInt(redisPortOverride);
        } catch (Exception e) {
            // ignore a malformed port value and keep the default
        }
    }
    LOG.info("Using Redis port " + redisPort);

}
Project: aws-big-data-blog    File: AmazonDynamoDBStreamstoIgnite.java
public void run() throws Exception {
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);
    dynamoDBClient = new AmazonDynamoDBClient(new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    List<String> hostList = Arrays.asList(Properties.getString("hostList").split(","));
    ipFinder.setAddresses(hostList);
    spi.setIpFinder(ipFinder);
    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(spi);
    cfg.setClientMode(true);
    cfg.setPeerClassLoadingEnabled(true);

    @SuppressWarnings("unused")
    Ignite ignite = Ignition.start(cfg);
    cache = Ignition.ignite().cache(Properties.getString("cacheName"));
    LOG.info(">>> cache acquired");

    recordProcessorFactory = new StreamsRecordProcessorFactory(cache);
    workerConfig = new KinesisClientLibConfiguration(Properties.getString("applicationName"), streamArn,
            streamsCredentials, "ddbstreamsworker")
                    .withMaxRecords(Integer.parseInt(Properties.getString("maxRecords")))
                    .withInitialPositionInStream(
                            InitialPositionInStream.valueOf(Properties.getString("initialPositionInStream")));

    LOG.info("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    LOG.info("Starting worker...");

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);
}
Project: aws-dynamodb-examples    File: StreamsAdapterDemo.java
/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    System.out.println("Starting demo...");

    String srcTable = tablePrefix + "-src";
    String destTable = tablePrefix + "-dest";
    streamsCredentials = new ProfileCredentialsProvider();
    dynamoDBCredentials = new ProfileCredentialsProvider();
    recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint, serviceName, destTable);

    /* ===== REQUIRED =====
     * Users will have to explicitly instantiate and configure the adapter, then pass it to
     * the KCL worker.
     */
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);

    dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    setUpTables();

    workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo",
            streamArn, streamsCredentials, "streams-demo-worker")
        .withMaxRecords(1)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    System.out.println("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    System.out.println("Starting worker...");
    Thread t = new Thread(worker);
    t.start();

    Thread.sleep(25000);
    worker.shutdown();
    t.join();

    if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
            .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
        System.out.println("Scan result is equal.");
    } else {
        System.out.println("Tables are different!");
    }

    System.out.println("Done.");
    cleanupAndExit(0);
}