Java 类com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration 实例源码

项目:zipkin-aws    文件:KinesisCollector.java   
@Override
public KinesisCollector start() {
  String workerId = null;
  try {
    workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
  } catch (UnknownHostException e) {
    workerId = UUID.randomUUID().toString();
  }
  KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId);
  processor = new KinesisRecordProcessorFactory(collector);
  worker = new Worker.Builder()
      .recordProcessorFactory(processor)
      .config(config)
      .build();

  executor.execute(worker);
  return this;
}
项目:lumber-mill    文件:KinesisConsumerBootstrap.java   
public KinesisConsumerBootstrap(KinesisClientLibConfiguration kinesisCfg,
                                UnitOfWorkListener unitOfWorkListener,
                                ExceptionStrategy exceptionStrategy,
                                Metrics metricsCallback,
                                boolean dry) {
    this.kinesisCfg = kinesisCfg;
    this.unitOfWorkListener = unitOfWorkListener;
    this.exceptionStrategy = exceptionStrategy;
    this.metricsCallback = metricsCallback;
    this.dry = dry;
    String httpsProxy = System.getenv("https_proxy");
    if (StringUtils.isNotEmpty(httpsProxy)) {
        URI proxy = URI.create(httpsProxy);
        kinesisCfg.getKinesisClientConfiguration().setProxyHost(proxy.getHost());
        kinesisCfg.getKinesisClientConfiguration().setProxyPort(proxy.getPort());
        kinesisCfg.getDynamoDBClientConfiguration().setProxyHost(proxy.getHost());
        kinesisCfg.getDynamoDBClientConfiguration().setProxyPort(proxy.getPort());
        kinesisCfg.getCloudWatchClientConfiguration().setProxyHost(proxy.getHost());
        kinesisCfg.getCloudWatchClientConfiguration().setProxyPort(proxy.getPort());
    }
}
项目:ingestion-service    文件:KafkaKinesisIntegrationTest.java   
private void startKinesisConsumer() throws Exception {
    AWSCredentialsProvider credentialsProvider = new
            DefaultAWSCredentialsProviderChain();

    String region = "eu-west-1";
    logger.info("Starting in Region " + region);

    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

    KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
            this.getClass().getName(), TestConstants.stream, credentialsProvider, workerId)
            .withInitialPositionInStream(InitialPositionInStream.LATEST).withRegionName(region);

    IRecordProcessorFactory recordProcessorFactory = new
            RecordFactory();
    worker = new Worker(recordProcessorFactory,
            kinesisClientLibConfiguration);

    es = Executors.newSingleThreadExecutor();
    es.execute(worker);
}
项目:spark-cstar-canaries    文件:JKinesisReceiver.java   
public JKinesisReceiver(String applicationName, String streamName,
                        String endpointUrl, String regionName,
                        Duration checkpoint, InitialPositionInStream position) {
    super(StorageLevel.MEMORY_ONLY_SER());

    this.workerId = getHostname() + ":" + String.valueOf(UUID.randomUUID());
    this.checkpointInterval = checkpoint;
    this.initialPosition = position;

    Region region = RegionUtils.getRegion(regionName);

    try {
        this.kclConfig = new KinesisClientLibConfiguration(applicationName, streamName,
                                                  getCredsProvider(),
                                                  workerId)
                        .withCommonClientConfig(CLIENT_CONF)
                        .withRegionName(region.getName())
                        .withKinesisEndpoint(endpointUrl)
                        .withInitialPositionInStream(InitialPositionInStream.LATEST)
                        .withTaskBackoffTimeMillis(500);
    } catch (Exception ex) {
        // do absolutely nothing - and feel good about it!
        // but ...
        // we'd do something meaningful in a PROD context
    }
}
项目:samza    文件:KinesisConfig.java   
/**
 * Get KCL config for a given system stream.
 * @param system name of the system
 * @param stream name of the stream
 * @param appName name of the application
 * @return Stream scoped KCL configs required to build
 *         {@link KinesisClientLibConfiguration}
 */
public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) {
  ClientConfiguration clientConfig = getAWSClientConfig(system);
  String workerId = appName + "-" + UUID.randomUUID();
  InitialPositionInStream startPos = InitialPositionInStream.LATEST;
  AWSCredentialsProvider provider = credentialsProviderForStream(system, stream);
  KinesisClientLibConfiguration kinesisClientLibConfiguration =
      new KinesisClientLibConfiguration(appName, stream, provider, workerId)
          .withRegionName(getRegion(system, stream).getName())
          .withKinesisClientConfig(clientConfig)
          .withCloudWatchClientConfig(clientConfig)
          .withDynamoDBClientConfig(clientConfig)
          .withInitialPositionInStream(startPos)
          .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics.
  // First, get system scoped configs for KCL and override with configs set at stream scope.
  setKinesisClientLibConfigs(
      subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system)), kinesisClientLibConfiguration);
  setKinesisClientLibConfigs(subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream)),
      kinesisClientLibConfiguration);
  return kinesisClientLibConfiguration;
}
项目:amazon-kinesis-aggregators    文件:CSVAggregatorFactory.java   
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, List<TimeHorizon> timeHorizons,
        AggregatorType aggregatorType, String delimiter, List<Integer> labelIndicies,
        String labelAttributeAlias, int dateIndex, String dateFormat, String dateAlias,
        List<Object> summaryIndicies) throws Exception {
    StringDataExtractor dataExtractor = new CsvDataExtractor(labelIndicies).withDelimiter(
            delimiter).withDateValueIndex(dateIndex).withDateFormat(dateFormat).withSummaryIndicies(
            summaryIndicies);
    dataExtractor.setAggregatorType(aggregatorType);
    if (labelAttributeAlias != null && !labelAttributeAlias.equals("")) {
        dataExtractor.withLabelAttributeAlias(labelAttributeAlias);
    }
    if (dateAlias != null && !dateAlias.equals("")) {
        dataExtractor.withDateAttributeAlias(dateAlias);
    }
    return new StreamAggregator(streamName, appName, namespace, config, dataExtractor).withTimeHorizon(
            timeHorizons).withAggregatorType(aggregatorType);
}
项目:micro-genie    文件:KinesisConsumer.java   
/***
 * 
 * All input parameters are required 
    * 
 * @param topic
 * @param config
 * @param client
 * @param dynamoClient
 * @param cloudwatchClient
 */
public KinesisConsumer(final String topic, 
        final KinesisClientLibConfiguration config, 
        final AmazonKinesisClient client, 
        final AmazonDynamoDBClient dynamoClient, 
        final AmazonCloudWatchClient cloudwatchClient, 
        final ObjectMapper mapper){

    this.topic =  Preconditions.checkNotNull(topic, "A valid kinesis topic is required");
    this.config = Preconditions.checkNotNull(config, "KinesisClientLibConfiguration is required");
    this.client = Preconditions.checkNotNull(client, "AmazonKinesisClient is required");
    this.dynamoClient = Preconditions.checkNotNull(dynamoClient, "AmazonDynamoDBClient is required");
    this.cloudwatchClient = Preconditions.checkNotNull(cloudwatchClient, "AmazonCloudWatchClient is required");

    this.mapper = Preconditions.checkNotNull(mapper, "ObjectMapper is required");
}
项目:Surf    文件:Util.java   
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{
    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);

    disruptor.handleEventsWith(handler);
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    Properties props = new Properties();
    props.load(new FileReader(conf));
    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    String accessid = props.getProperty("aws-access-key-id");
    String secretkey = props.getProperty("aws-secret-key");
    String streamname = props.getProperty("aws-kinesis-stream-name");
    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname,  credprovider, workerId);

    Worker worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
    return worker;
}
项目:aws-kinesis-zombies    文件:Consumer.java   
private void initKinesis() {
    String pid = ManagementFactory.getRuntimeMXBean().getName();
    pid = pid.indexOf('@') == -1 ? pid : pid.substring(0, pid.indexOf('@'));
    log.info("Creating kinesis consumer with pid {}.", pid);
    KinesisClientLibConfiguration config
            = new KinesisClientLibConfiguration(
                    "Zombies" /* aplication name */,
                    streamName,
                    new DefaultAWSCredentialsProviderChain(),
                    "ZombieConsumer_" + pid /* worker id*/)
            .withRegionName(region)
            .withFailoverTimeMillis(1000 * 30) // after 30 seconds this worker is considered ko                        
            .withMaxLeasesForWorker(2) // forced to read only 1 shard for demo reasons.
            .withMaxRecords(500) // max records per GetRecords
            .withCallProcessRecordsEvenForEmptyRecordList(false) // no records -> no processing
            .withInitialLeaseTableWriteCapacity(10) // Dynamodb lease table capacity
            .withInitialLeaseTableReadCapacity(10)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    final Worker worker
            = new Worker.Builder()
            .recordProcessorFactory(zombieRecordFactory)
            .config(config)
            .build();

    new Thread() {
        @Override
        public void run() {
            worker.run();
        }
    }.start();
}
项目:dynamodb-streams-kafka    文件:KafkaDynamoStreamAdapter.java   
public void run() {
    streamId = enableStreamForTable(dynamoDBClient, StreamViewType.NEW_IMAGE, sourceTable);
    workerConfig = new KinesisClientLibConfiguration(KCL_APP_NAME, streamId, credentialsProvider, KCL_WORKER_NAME)
        .withMaxRecords(maxRecordsPerRead)
        .withIdleTimeBetweenReadsInMillis(idleTimeBetweenReads)
        .withInitialPositionInStream(initialStreamPosition );
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    workerThread = new Thread(worker);
    workerThread.start();
}
项目:samza    文件:KinesisConfig.java   
private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) {
  for (Entry<String, String> entry : config.entrySet()) {
    boolean found = false;
    String key = entry.getKey();
    String value = entry.getValue();
    if (StringUtils.isEmpty(value)) {
      continue;
    }
    for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
      if (method.getName().equals("with" + key)) {
        found = true;
        Class<?> type = method.getParameterTypes()[0];
        try {
          if (type == long.class) {
            method.invoke(kinesisLibConfig, Long.valueOf(value));
          } else if (type == int.class) {
            method.invoke(kinesisLibConfig, Integer.valueOf(value));
          } else if (type == boolean.class) {
            method.invoke(kinesisLibConfig, Boolean.valueOf(value));
          } else if (type == String.class) {
            method.invoke(kinesisLibConfig, value);
          } else if (type == InitialPositionInStream.class) {
            method.invoke(kinesisLibConfig, InitialPositionInStream.valueOf(value.toUpperCase()));
          }
          LOG.info("Loaded property " + key + " = " + value);
          break;
        } catch (Exception e) {
          throw new IllegalArgumentException(
              String.format("Error trying to set field %s with the value '%s'", key, value), e);
        }
      }
    }
    if (!found) {
      LOG.warn("Property " + key + " ignored as there is no corresponding set method");
    }
  }
}
项目:samza    文件:TestKinesisConfig.java   
@Test
public void testKclConfigs() {
  Map<String, String> kv = new HashMap<>();
  String system = "kinesis";
  String stream = "kinesis-stream";
  String systemConfigPrefix = String.format("systems.%s.", system);

  // region config is required for setting kcl config.
  kv.put(systemConfigPrefix + "aws.region", "us-east-1");

  // Kcl Configs
  kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table");
  kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100");
  kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true");
  kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON");
  // override one of the Kcl configs for kinesis-stream1
  kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST");

  Config config = new MapConfig(kv);
  KinesisConfig kConfig = new KinesisConfig(config);
  KinesisClientLibConfiguration kclConfig = kConfig.getKinesisClientLibConfig(system, stream, "sample-app");

  assertEquals("sample-table", kclConfig.getTableName());
  assertEquals(100, kclConfig.getMaxRecords());
  assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList());
  assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream());

  // verify if the overriden config is applied for kinesis-stream1
  kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app");
  assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream());
}
项目:koupler    文件:KinesisEventConsumer.java   
public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) {
    KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);

    InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition);

    KinesisClientLibConfiguration clientConfig = new KinesisClientLibConfiguration(appName, streamName,
            new DefaultAWSCredentialsProviderChain(), appName)
                    .withRegionName(config.getRegion())
                    .withInitialPositionInStream(position);

    this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig);
}
项目:datacollector    文件:KinesisSource.java   
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
  KinesisClientLibConfiguration kclConfig =
      new KinesisClientLibConfiguration(
          conf.applicationName,
          conf.streamName,
          credentials,
          getWorkerId()
      );

  kclConfig
      .withMaxRecords(maxBatchSize)
      .withCallProcessRecordsEvenForEmptyRecordList(false)
      .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
      .withInitialPositionInStream(conf.initialPositionInStream)
      .withKinesisClientConfig(clientConfiguration);

  if (conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP) {
    kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
  }

  if (conf.region == AWSRegions.OTHER) {
    kclConfig.withKinesisEndpoint(conf.endpoint);
  } else {
    kclConfig.withRegionName(conf.region.getLabel());
  }

  return new Worker.Builder()
      .recordProcessorFactory(recordProcessorFactory)
      .metricsFactory(metricsFactory)
      .dynamoDBClient(dynamoDBClient)
      .cloudWatchClient(cloudWatchClient)
      .execService(executor)
      .config(kclConfig)
      .build();
}
项目:awsbigdata    文件:KinesisServer.java   
public static void main(String[] args) throws Exception {
       java.security.Security.setProperty("networkaddress.cache.ttl" , "60");

       Server server = new Server();
       ServerConnector connector = new ServerConnector(server);
       connector.setPort(80);
       server.addConnector(connector);

       ResourceHandler resource_handler = new ResourceHandler();
       resource_handler.setResourceBase(".");
       ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
       context.setContextPath("/");
       ServletHolder holderEvents = new ServletHolder("ws-events", MessageProxyServlet.class);
       context.addServlet(holderEvents, "/kinesisapp/*");

       HandlerList handlers = new HandlerList();
       handlers.setHandlers(new Handler[] { resource_handler, context, new DefaultHandler() });
       server.setHandler(handlers);        
       server.start();

       AWSCredentialsProvider credentialsProvider = new EnvironmentVariableCredentialsProvider();
       KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration(
            System.getProperty("kinesisapp.name"), 
            System.getProperty("kinesisapp.stream"), 
            credentialsProvider, Long.toString(System.currentTimeMillis()))
            .withKinesisEndpoint(System.getProperty("kinesisapp.endpoint"));

       IRecordProcessorFactory factory = new Factory();
       Worker worker = new Worker(factory, kinesisConfig);
       worker.run();

       /*
    Processor p = new Processor();
    p.test();
    */
}
项目:amazon-kinesis-aggregators    文件:ObjectAggregatorFactory.java   
/**
 * Create a new Aggregator for Object Serialised Data based upon a Class
 * which is configured using Annotations from the base class.
 * 
 * @param streamName The Stream Name that the Aggregator is receiving data
 *        from.
 * @param appName The Application Name that an Aggregator is part of.
 * @param config The Kinesis Client Library Configuration to inherit
 *        credentials and connectivity to the database from.
 * @param clazz The annotated class to use for configuration of the
 *        aggregator
 * @return A Stream Aggregator which can process object serialised data
 * @throws Exception
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, Class clazz) throws Exception {
    AnnotationProcessor p = new AnnotationProcessor(clazz);
    ObjectExtractor dataExtractor = new ObjectExtractor(p.getLabelMethodNames(), clazz).withDateMethod(p.getDateMethodName());

    dataExtractor.withSummaryConfig(p.getSummaryConfig());
    //dataExtractor.withSummaryMethods(new ArrayList<>(p.getSummaryMethods().keySet()));

    StreamAggregator agg = new StreamAggregator(streamName, appName, p.getNamespace(), config,
            dataExtractor).withTimeHorizon(p.getTimeHorizon()).withAggregatorType(p.getType()).withRaiseExceptionOnDataExtractionErrors(
            p.shouldFailOnDataExtractionErrors());

    // configure metrics service on the aggregator if it's been
    // configured
    if (p.shouldEmitMetrics()
            || (p.getMetricsEmitter() != null && !p.getMetricsEmitter().equals(
                    CloudWatchMetricsEmitter.class))) {
        if (p.getMetricsEmitter() != null) {
            agg.withMetricsEmitter(p.getMetricsEmitter().newInstance());
        } else {
            agg.withCloudWatchMetrics();
        }
    }

    // create a new instance of the Data Store if one has been
    // configured. Currently we only support pluggable data stores that
    // are configured via their environment or have self defined
    // configuration models: only no args public constructors can be
    // called
    if (p.getDataStore() != null && !p.getDataStore().equals(DynamoDataStore.class)) {
        agg.withDataStore((IDataStore) p.getDataStore().newInstance());
    }

    return agg;
}
项目:amazon-kinesis-aggregators    文件:StreamAggregator.java   
public StreamAggregator(String streamName, String applicationName,
        String namespace, KinesisClientLibConfiguration config,
        IDataExtractor dataExtractor) {
    this.streamName = streamName;
    this.applicationName = applicationName;
    this.namespace = namespace;
    this.config = config;
    this.dataExtractor = dataExtractor;
}
项目:micro-genie    文件:KinesisEventFactory.java   
/****
 * Create subscriber if it has not already been created
 */
@Override
public synchronized  Subscriber createSubscriber(final String clientId, final String topic) {

    final String clientIdToUse = Strings.isNullOrEmpty(clientId)? DEFAULT_CLIENT_ID : clientId;
    Subscriber subscriber = this.subscribers.get(topic);
    if(subscriber == null){
        LOGGER.debug("creating kinsis subscriber for topic {} - clientId: {}", topic, clientIdToUse);
        final KinesisClientLibConfiguration config = createConsumerConfig(clientIdToUse, topic);
        subscriber = new KinesisConsumer(topic, config, this.kinesisClient, this.dynamoDbClient, this.cloudwatchClient, this.mapper);
        this.subscribers.put(topic, subscriber);
    }
    return subscriber;
}
项目:micro-genie    文件:KinesisEventFactory.java   
/***
 * Create consumer configurations
 */
private KinesisClientLibConfiguration createConsumerConfig(final String clientId, final String topic) {

    final String kinesisApplication = String.format(WORKER_ID_TEMPLATE, topic, clientId);
    final KinesisClientLibConfiguration clientConfig = 
            new KinesisClientLibConfiguration(kinesisApplication, topic, new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
    return clientConfig;        
}
项目:Surf    文件:KinesisSource.java   
@Override
public void open(VDSConfiguration ctx) throws Exception {
    _logger.info("Initializing Kinesis Source");
    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);
    disruptor.handleEventsWith(new EventHandler<KinesisEvent>() {
        @Override
        public void onEvent(KinesisEvent kinesisEvent, long l, boolean b) throws Exception {
            _logger.debug("Received a Kinesis Event");
            _eventList.put(kinesisEvent);
            _logger.debug("eventlist size is now {}", _eventList.size());
        }
    });
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    String accessid = ctx.getString("aws-access-key-id");
    String secretkey = ctx.getString("aws-secret-key");
    String streamname = ctx.getString("aws-kinesis-stream-name");
    String appName = ctx.optString("application-name", System.getProperty("nodename"));
    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname,  credprovider, workerId)
            .withMaxRecords(MAX_EVENTS);

    _worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
    new Thread(_worker).start();
}
项目:aws-kinesis-beanstalk-workers    文件:ManagedConsumer.java   
public void configure() throws Exception {
    if (!isConfigured) {
        validateConfig();

        try {
            String userAgent = "AWSKinesisManagedConsumer/" + this.version;

            if (this.positionInStream != null) {
                streamPosition = InitialPositionInStream.valueOf(this.positionInStream);
            } else {
                streamPosition = InitialPositionInStream.LATEST;
            }

            // append the environment name to the application name
            if (environmentName != null) {
                appName = String.format("%s-%s", appName, environmentName);
            }

            // ensure the JVM will refresh the cached IP values of AWS
            // resources
            // (e.g. service endpoints).
            java.security.Security.setProperty("networkaddress.cache.ttl", "60");

            String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID();
            LOG.info("Using Worker ID: " + workerId);

            // obtain credentials using the default provider chain or the
            // credentials provider supplied
            AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null ? new DefaultAWSCredentialsProviderChain()
                    : this.credentialsProvider;

            LOG.info("Using credentials with Access Key ID: "
                    + credentialsProvider.getCredentials().getAWSAccessKeyId());

            config = new KinesisClientLibConfiguration(appName, streamName,
                    credentialsProvider, workerId).withInitialPositionInStream(streamPosition).withKinesisEndpoint(
                    kinesisEndpoint);

            config.getKinesisClientConfiguration().setUserAgent(userAgent);

            if (regionName != null) {
                Region region = Region.getRegion(Regions.fromName(regionName));
                config.withRegionName(region.getName());
            }

            if (this.maxRecords != -1)
                config.withMaxRecords(maxRecords);

            if (this.positionInStream != null)
                config.withInitialPositionInStream(InitialPositionInStream.valueOf(this.positionInStream));

            LOG.info(String.format(
                    "Amazon Kinesis Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                    config.getApplicationName(), config.getStreamName(),
                    config.getRegionName(), config.getWorkerIdentifier(),
                    config.getMaxRecords()));

            isConfigured = true;
        } catch (Exception e) {
            throw new InvalidConfigurationException(e);
        }
    }
}
项目:lumber-mill    文件:KCL.java   
public static KCL create (KinesisClientLibConfiguration configuration) {
    return new KCL (configuration);
}
项目:lumber-mill    文件:KCL.java   
private KCL (KinesisClientLibConfiguration configuration) {
    this.config = configuration;
}
项目:amazon-kinesis-aggregators    文件:AggregatorConsumer.java   
public void configure() throws Exception {
    if (!isConfigured) {
        validateConfig();

        if (this.positionInStream != null) {
            streamPosition = InitialPositionInStream
                    .valueOf(this.positionInStream);
        } else {
            streamPosition = InitialPositionInStream.LATEST;
        }

        // append the environment name to the application name
        if (environmentName != null) {
            appName = String.format("%s-%s", appName, environmentName);
        }

        // ensure the JVM will refresh the cached IP values of AWS resources
        // (e.g. service endpoints).
        java.security.Security
                .setProperty("networkaddress.cache.ttl", "60");

        String workerId = NetworkInterface.getNetworkInterfaces() + ":"
                + UUID.randomUUID();
        LOG.info("Using Worker ID: " + workerId);

        // obtain credentials using the default provider chain or the
        // credentials provider supplied
        AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null ? new DefaultAWSCredentialsProviderChain()
                : this.credentialsProvider;

        LOG.info("Using credentials with Access Key ID: "
                + credentialsProvider.getCredentials().getAWSAccessKeyId());

        config = new KinesisClientLibConfiguration(appName, streamName,
                credentialsProvider, workerId).withInitialPositionInStream(
                streamPosition).withKinesisEndpoint(kinesisEndpoint);

        config.getKinesisClientConfiguration().setUserAgent(
                StreamAggregator.AWSApplication);

        if (regionName != null) {
            Region region = Region.getRegion(Regions.fromName(regionName));
            config.withRegionName(region.getName());
        }

        if (maxRecords != -1)
            config.withMaxRecords(maxRecords);

        // initialise the Aggregators
        aggGroup = buildAggregatorsFromConfig();

        LOG.info(String
                .format("Amazon Kinesis Aggregators Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                        config.getApplicationName(),
                        config.getStreamName(), config.getRegionName(),
                        config.getWorkerIdentifier(),
                        config.getMaxRecords()));

        isConfigured = true;
    }
}
项目:amazon-kinesis-aggregators    文件:RegexAggregatorFactory.java   
/**
 * Factory Method which generates a Regular Expression based Aggregator for
 * a number of Time Horizons
 * 
 * @param streamName The name of the Stream to aggregate against.
 * @param appName The application name to associate with the aggregator.
 * @param config The Kinesis Configuration used for the containing worker.
 * @param namespace The namespace to associate with the aggregated data.
 * @param timeHorizon The time horizons on which to aggregate data.
 * @param aggregatorType The type of aggregator to create.
 * @param regularExpression The regular expression used to extract data from
 *        the Kinesis Stream via Character Classes
 * @param labelIndicies The index of the extracted data to be used as the
 *        aggregation label
 * @param dateIndex The index of the extracted data to be used as the time
 *        value
 * @param dateFormat The format of the data which represents the event time
 *        when shipped as a String
 * @param summaryIndicies The indicies or Summary Expressions on indicies
 *        which contain summary values to be aggregated
 * @return
 * @throws Exception
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, List<TimeHorizon> timeHorizons,
        AggregatorType aggregatorType, String regularExpression, List<Integer> labelIndicies,
        String labelAttributeAlias, int dateIndex, String dateFormat, String dateAlias,
        List<Object> summaryIndicies) throws Exception {
    StringDataExtractor dataExtractor = new RegexDataExtractor(regularExpression, labelIndicies).withDateValueIndex(
            dateIndex).withDateFormat(dateFormat).withSummaryIndicies(summaryIndicies);
    dataExtractor.setAggregatorType(aggregatorType);

    if (labelAttributeAlias != null && !labelAttributeAlias.equals("")) {
        dataExtractor.withLabelAttributeAlias(labelAttributeAlias);
    }
    if (dateAlias != null && !dateAlias.equals("")) {
        dataExtractor.withDateAttributeAlias(dateAlias);
    }
    return new StreamAggregator(streamName, appName, namespace, config, dataExtractor).withTimeHorizon(
            timeHorizons).withAggregatorType(aggregatorType);
}
项目:aws-big-data-blog    文件:ManagedConsumer.java   
public void configure() throws Exception {
    if (!isConfigured) {
        validateConfig();

        try {
            String userAgent = "AWSKinesisManagedConsumer/" + this.version;

            if (this.positionInStream != null) {
                streamPosition = InitialPositionInStream.valueOf(this.positionInStream);
            } else {
                streamPosition = InitialPositionInStream.LATEST;
            }

            // append the environment name to the application name
            if (environmentName != null) {
                appName = String.format("%s-%s", appName, environmentName);
            }

            // ensure the JVM will refresh the cached IP values of AWS
            // resources
            // (e.g. service endpoints).
            java.security.Security.setProperty("networkaddress.cache.ttl", "60");

            String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID();
            LOG.info("Using Worker ID: " + workerId);

            // obtain credentials using the default provider chain or the
            // credentials provider supplied
            AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null ? new DefaultAWSCredentialsProviderChain()
                    : this.credentialsProvider;

            LOG.info("Using credentials with Access Key ID: "
                    + credentialsProvider.getCredentials().getAWSAccessKeyId());

            config = new KinesisClientLibConfiguration(appName, streamName,
                    credentialsProvider, workerId).withInitialPositionInStream(streamPosition).withKinesisEndpoint(
                    kinesisEndpoint);

            config.getKinesisClientConfiguration().setUserAgent(userAgent);

            if (regionName != null) {
                Region region = Region.getRegion(Regions.fromName(regionName));
                config.withRegionName(region.getName());
            }

            if (this.maxRecords != -1)
                config.withMaxRecords(maxRecords);

            if (this.positionInStream != null)
                config.withInitialPositionInStream(InitialPositionInStream.valueOf(this.positionInStream));

            LOG.info(String.format(
                    "Amazon Kinesis Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                    config.getApplicationName(), config.getStreamName(),
                    config.getRegionName(), config.getWorkerIdentifier(),
                    config.getMaxRecords()));

            isConfigured = true;
        } catch (Exception e) {
            throw new InvalidConfigurationException(e);
        }
    }
}
项目:aws-big-data-blog    文件:AmazonDynamoDBStreamstoIgnite.java   
public void run() throws Exception {
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);
    dynamoDBClient = new AmazonDynamoDBClient(new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    List<String> hostList = Arrays.asList(Properties.getString("hostList").split(","));
    ipFinder.setAddresses(hostList);
    spi.setIpFinder(ipFinder);
    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(spi);
    cfg.setClientMode(true);
    cfg.setPeerClassLoadingEnabled(true);

    @SuppressWarnings("unused")
    Ignite ignite = Ignition.start(cfg);
    cache = Ignition.ignite().cache(Properties.getString("cacheName"));
    LOG.info(">>> cache acquired");

    recordProcessorFactory = new StreamsRecordProcessorFactory(cache);
    workerConfig = new KinesisClientLibConfiguration(Properties.getString("applicationName"), streamArn,
            streamsCredentials, "ddbstreamsworker")
                    .withMaxRecords(Integer.parseInt(Properties.getString("maxRecords")))
                    .withInitialPositionInStream(
                            InitialPositionInStream.valueOf(Properties.getString("initialPositionInStream")));

    LOG.info("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    LOG.info("Starting worker...");

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.");
        t.printStackTrace();
        exitCode = 1;
    }
    System.exit(exitCode);
}
项目:aws-dynamodb-examples    文件:StreamsAdapterDemo.java   
/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    System.out.println("Starting demo...");

    String srcTable = tablePrefix + "-src";
    String destTable = tablePrefix + "-dest";
    streamsCredentials = new ProfileCredentialsProvider();
    dynamoDBCredentials = new ProfileCredentialsProvider();
    recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint, serviceName, destTable);


    /* ===== REQUIRED =====
     * Users will have to explicitly instantiate and configure the adapter, then pass it to
     * the KCL worker.
     */
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);

    dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    setUpTables();

    workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo",
            streamArn, streamsCredentials, "streams-demo-worker")
        .withMaxRecords(1)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    System.out.println("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    System.out.println("Starting worker...");
    Thread t = new Thread(worker);
    t.start();

    Thread.sleep(25000);
    worker.shutdown();
    t.join();

    if(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems().equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
        System.out.println("Scan result is equal.");
    } else {
        System.out.println("Tables are different!");
    }

    System.out.println("Done.");
    cleanupAndExit(0);
}
项目:amazon-kinesis-aggregators    文件:JsonAggregatorFactory.java   
/**
 * Creates an Aggregator for data that is formatted as JSON Strings on the
 * Kinesis Stream.
 * 
 * @param streamName The Stream Name that the Aggregator is receiving data
 *        from.
 * @param appName The Application Name that an Aggregator is part of.
 * @param config The Kinesis Client Library Configuration to inherit
 *        credentials and connectivity to the database from.
 * @param namespace The namespace used to separate this Aggregator's output
 *        data from other Aggregated data
 * @param timeHorizon The Time Horizon value to use for the granularity of
 *        the Aggregated data
 * @param aggregatorType The type of Aggregator to create. Default is COUNT.
 * @param labelAttributes The attribute name in the JSON document which
 *        should be used as the label value for Aggregation
 * @param dateAttribute The attribute name in the JSON document which should
 *        be used for the time element of the Aggregation. If NULL then the
 *        client receive time will be used.
 * @param dateFormat The format of the dateAttribute, if String based dates
 *        are used. This should follow {@link java.text.SimpleDateFormat}
 *        convention.
 * @param summaryAttributes List of attributes or expressions on attributes
 *        which should be used for summary aggregation.
 * @return A Stream Aggregator which can process JSON data containing the
 *         indicated attributes.
 * @throws Exception
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, TimeHorizon timeHorizon,
        AggregatorType aggregatorType, List<String> labelAttributes, String dateAttribute,
        String dateFormat, List<String> summaryAttributes) throws Exception {
    return newInstance(streamName, appName, config, namespace,
            Arrays.asList(new TimeHorizon[] { timeHorizon }), aggregatorType, labelAttributes,
            dateAttribute, dateFormat, summaryAttributes);
}
项目:amazon-kinesis-aggregators    文件:JsonAggregatorFactory.java   
/**
 * Creates an Aggregator for data that is formatted as JSON Strings on the
 * Kinesis Stream.
 * 
 * @param streamName The Stream Name that the Aggregator is receiving data
 *        from.
 * @param appName The Application Name that an Aggregator is part of.
 * @param workerId The worker ID hosting the Aggregator.
 * @param config The Kinesis Client Library Configuration to inherit
 *        credentials and connectivity to the database from.
 * @param namespace The namespace used to separate this Aggregator's output
 *        data from other Aggregated data.
 * @param timeHorizons The list of Time Horizon values to use the
 *        aggregator. Data will be automatically managed at ALL of the
 *        requested granularities using a prefixed namespace on dates.
 * @param aggregatorType The type of Aggregator to create. Default is COUNT.
 * @param labelAttributes The attribute name in the JSON document which
 *        should be used as the label value for Aggregation.
 * @param dateAttribute The attribute name in the JSON document which should
 *        be used for the time element of the Aggregation. If NULL then the
 *        client receive time will be used.
 * @param dateFormat The format of the dateAttribute, if String based dates
 *        are used. This should follow {@link java.text.SimpleDateFormat}
 *        convention.
 * @param summaryAttributes List of attributes or expressions on attributes
 *        which should be used for summary aggregation.
 * @return A Stream Aggregator which can process JSON data containing the
 *         indicated attributes.
 * @throws Exception
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, List<TimeHorizon> timeHorizons,
        AggregatorType aggregatorType, List<String> labelAttributes, String dateAttribute,
        String dateFormat, List<String> summaryAttributes) throws Exception {
    IDataExtractor dataExtractor = new JsonDataExtractor(labelAttributes).withDateValueAttribute(
            dateAttribute).withSummaryAttributes(summaryAttributes).withDateFormat(dateFormat);
    dataExtractor.setAggregatorType(aggregatorType);
    return new StreamAggregator(streamName, appName, namespace, config, dataExtractor).withTimeHorizon(
            timeHorizons).withAggregatorType(aggregatorType);
}
项目:amazon-kinesis-aggregators    文件:RegexAggregatorFactory.java   
/**
 * Factory Method which generates a Regular Expression based Aggregator for
 * a single Time Horizon
 * 
 * @param streamName
 * @param appName
 * @param config
 * @param namespace
 * @param timeHorizon
 * @param aggregatorType
 * @param regularExpression
 * @param labelIndicies
 * @param dateIndex
 * @param dateFormat
 * @param summaryIndicies
 * @return
 * @throws Exception
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, TimeHorizon timeHorizon,
        AggregatorType aggregatorType, String regularExpression, List<Integer> labelIndicies,
        String labelAttributeAlias, int dateIndex, String dateFormat, String dateAlias,
        List<Object> summaryIndicies) throws Exception {
    return newInstance(streamName, appName, config, namespace,
            Arrays.asList(new TimeHorizon[] { timeHorizon }), aggregatorType,
            regularExpression, labelIndicies, labelAttributeAlias, dateIndex, dateFormat,
            dateAlias, summaryIndicies);
}
项目:amazon-kinesis-aggregators    文件:CSVAggregatorFactory.java   
/**
 * Factory Method to generate a new Aggregator for CSV Data.
 * 
 * @param streamName The name of the Stream to aggregate against.
 * @param appName The application name to associate with the aggregator.
 * @param config The Kinesis Configuration used for the containing worker.
 * @param namespace The namespace to associate with the aggregated data.
 * @param timeHorizon The time horizons on which to aggregate data.
 * @param aggregatorType The type of aggregator to create.
 * @param delimiter The character delimiter for data on the stream.
 * @param labelIndicies The position of the field in the stream data which
 *        should be used to aggregate data.
 * @param dateIndex The position of the field which includes a date item
 *        used to aggregate data by the timeHorizon. Values can be in String
 *        format if dateFormat is supplied, or in epoch seconds.
 * @param dateFormat The format of the date item, if provided as a String
 * @param summaryIndicies The list of field positions, or expressions using
 *        a {@link SummaryCalculation} against the field positions. For
 *        example, simple summaries might have a list of '0,1,2' or when
 *        expressions are used, a list of 'min(0),sum(1),max(2)'.
 * @return Returns a new CSV Aggregator.
 * @throws Exception
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, TimeHorizon timeHorizon,
        AggregatorType aggregatorType, String delimiter, List<Integer> labelIndicies,
        String labelAttributeAlias, int dateIndex, String dateFormat, String dateAlias,
        List<Object> summaryIndicies) throws Exception {
    return newInstance(streamName, appName, config, namespace,
            Arrays.asList(new TimeHorizon[] { timeHorizon }), aggregatorType, delimiter,
            labelIndicies, labelAttributeAlias, dateIndex, dateFormat, dateAlias,
            summaryIndicies);
}
项目:amazon-kinesis-aggregators    文件:ObjectAggregatorFactory.java   
/**
 * Create a new Aggregator for data which is object serialised on the stream
 * using Jackson JSON Serialisation.
 * 
 * @param streamName The Stream Name that the Aggregator is receiving data
 *        from.
 * @param appName The Application Name that an Aggregator is part of.
 * @param config The Kinesis Client Library Configuration to inherit
 *        credentials and connectivity to the database from.
 * @param namespace The namespace used to separate this Aggregator's output
 *        data from other Aggregated data
 * @param timeHorizon The Time Horizon value to use for the granularity of
 *        the Aggregated data
 * @param aggregatorType The type of Aggregator to create. Default is COUNT.
 * @param clazz The base class to use as a Transfer Object for the data
 *        stream.
 * @param labelMethods The method on the base class to use to obtain the
 *        label for aggregation.
 * @param dateMethod The method on the object which should be used to
 *        establish the time. If NULL then the client receive time will be
 *        used.
 * @param summaryMethods List of summary method names or expressions to be
 *        used when the AggregatorType is SUM, as secondary aggregated data
 *        points
 * @return A Stream Aggregator which can process object serialised data
 * @throws Exception
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, TimeHorizon timeHorizon,
        AggregatorType aggregatorType, Class clazz, List<String> labelMethods,
        String dateMethod, List<String> summaryMethods) throws Exception {
    return newInstance(streamName, appName, config, namespace,
            Arrays.asList(new TimeHorizon[] { timeHorizon }), aggregatorType, clazz,
            labelMethods, dateMethod, summaryMethods);
}
项目:amazon-kinesis-aggregators    文件:ObjectAggregatorFactory.java   
/**
 * Create a new Aggregator for data which is object serialised on the stream
 * using Jackson JSON Serialisation.
 * 
 * @param streamName The Stream Name that the Aggregator is receiving data
 *        from.
 * @param appName The Application Name that an Aggregator is part of.
 * @param config The Kinesis Client Library Configuration to inherit
 *        credentials and connectivity to the database from.
 * @param namespace The namespace used to separate this Aggregator's output
 *        data from other Aggregated data.
 * @param timeHorizons The list of Time Horizon values to use the
 *        aggregator. Data will be automatically managed at ALL of the
 *        requested granularities using a prefixed namespace on dates.
 * @param aggregatorType The type of Aggregator to create. Default is COUNT.
 * @param clazz The base class to use as a Transfer Object for the data
 *        stream.
 * @param labelMethods The methods on the base class to use to obtain the
 *        label for aggregation.
 * @param dateMethod The method on the object which should be used to
 *        establish the time. If NULL then the client receive time will be
 *        used.
 * @param summaryMethods List of summary method names or expressions to be
 *        used when the AggregatorType is SUM, as secondary aggregated data
 *        points.
 * @return A Stream Aggregator which can process object serialised data.
 * @return
 * @throws Exception
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, List<TimeHorizon> timeHorizons,
        AggregatorType aggregatorType, Class clazz, List<String> labelMethods,
        String dateMethod, List<String> summaryMethods) throws Exception {
    IDataExtractor dataExtractor = new ObjectExtractor(labelMethods, clazz).withDateMethod(
            dateMethod).withSummaryMethods(summaryMethods);
    dataExtractor.setAggregatorType(aggregatorType);
    return new StreamAggregator(streamName, appName, namespace, config, dataExtractor).withTimeHorizon(
            timeHorizons).withAggregatorType(aggregatorType);
}