private String enableStreamForTable(AmazonDynamoDBClient client, StreamViewType viewType, String tableName) { DescribeTableRequest describeTableRequest = new DescribeTableRequest() .withTableName(tableName); DescribeTableResult describeResult = client.describeTable(describeTableRequest); if (describeResult.getTable().getStreamSpecification().isStreamEnabled()) { //TODO: what if the viewtype doesn't match return describeResult.getTable().getLatestStreamId(); } StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(viewType); UpdateTableRequest updateTableRequest = new UpdateTableRequest() .withTableName(tableName) .withStreamSpecification(streamSpecification); UpdateTableResult result = client.updateTable(updateTableRequest); return result.getTableDescription().getLatestStreamId(); }
public static String enableStreamForTable(AmazonDynamoDBClient client, StreamViewType viewType, String tableName) { DescribeTableRequest describeTableRequest = new DescribeTableRequest() .withTableName(tableName); DescribeTableResult describeResult = client.describeTable(describeTableRequest); if (describeResult.getTable().getStreamSpecification().isStreamEnabled()) { //TODO: what if the viewtype doesn't match return describeResult.getTable().getLatestStreamId(); } StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(viewType); UpdateTableRequest updateTableRequest = new UpdateTableRequest() .withTableName(tableName) .withStreamSpecification(streamSpecification); UpdateTableResult result = client.updateTable(updateTableRequest); return result.getTableDescription().getLatestStreamId(); }
/**
 * Enables a NEW_IMAGE stream on the source table, builds the KCL worker
 * configuration for that stream, and starts the worker on a new thread.
 */
public void run() {
    streamId = enableStreamForTable(dynamoDBClient, StreamViewType.NEW_IMAGE, sourceTable);

    // Build the base configuration first, then layer the read tuning on top of it.
    final KinesisClientLibConfiguration baseConfig =
            new KinesisClientLibConfiguration(KCL_APP_NAME, streamId, credentialsProvider, KCL_WORKER_NAME);
    workerConfig = baseConfig
            .withMaxRecords(maxRecordsPerRead)
            .withIdleTimeBetweenReadsInMillis(idleTimeBetweenReads)
            .withInitialPositionInStream(initialStreamPosition);

    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);

    workerThread = new Thread(worker);
    workerThread.start();
}
/**
 * Creates a table with a numeric "Id" hash key and a NEW_IMAGE stream enabled.
 *
 * <p>If the create call fails with {@code ResourceInUseException}, the table is
 * described instead and the stream id from that description is returned.
 *
 * @param client    DynamoDB client used to issue the request
 * @param tableName name of the table to create
 * @return the latest stream id reported for the table
 */
public static String createTable(AmazonDynamoDBClient client, String tableName) {
    final AttributeDefinition idAttribute = new AttributeDefinition()
            .withAttributeName("Id")
            .withAttributeType("N");
    final KeySchemaElement hashKey = new KeySchemaElement()
            .withAttributeName("Id")
            .withKeyType(KeyType.HASH);
    final ProvisionedThroughput throughput = new ProvisionedThroughput()
            .withReadCapacityUnits(2L)
            .withWriteCapacityUnits(2L);
    final StreamSpecification newImageStream = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(StreamViewType.NEW_IMAGE);

    final CreateTableRequest request = new CreateTableRequest()
            .withTableName(tableName)
            .withAttributeDefinitions(idAttribute)
            .withKeySchema(hashKey)
            .withProvisionedThroughput(throughput)
            .withStreamSpecification(newImageStream);

    System.out.println("Creating table " + tableName);
    try {
        return client.createTable(request).getTableDescription().getLatestStreamId();
    } catch (ResourceInUseException alreadyExists) {
        System.out.println("Table already exists.");
        return describeTable(client, tableName).getTable().getLatestStreamId();
    }
}
/**
 * Creates the score table with a string hash key and a NEW_IMAGE stream enabled.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final StreamSpecification newImageStream = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(StreamViewType.NEW_IMAGE);
    final KeySchemaElement hashKey = new KeySchemaElement(SCORE_ID, KeyType.HASH);
    final AttributeDefinition idAttribute = new AttributeDefinition(SCORE_ID, ScalarAttributeType.S);
    final ProvisionedThroughput throughput = new ProvisionedThroughput(20L, 20L);

    final CreateTableRequest scoreTableRequest = new CreateTableRequest()
            .withTableName(SCORE_TABLE_NAME)
            .withKeySchema(hashKey)
            .withAttributeDefinitions(idAttribute)
            .withProvisionedThroughput(throughput)
            .withStreamSpecification(newImageStream);

    dynamodb.createTable(scoreTableRequest);
}
/**
 * Creates a table with a numeric "Id" hash key and a NEW_IMAGE stream enabled.
 *
 * <p>If the create call fails with {@code ResourceInUseException}, the table is
 * described instead and the stream ARN from that description is returned.
 *
 * @param client    DynamoDB client used to issue the request
 * @param tableName name of the table to create
 * @return the latest stream ARN reported for the table
 */
public static String createTable(AmazonDynamoDBClient client, String tableName) {
    final StreamSpecification streamSettings = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(StreamViewType.NEW_IMAGE);
    final ProvisionedThroughput capacity = new ProvisionedThroughput()
            .withReadCapacityUnits(2L)
            .withWriteCapacityUnits(2L);

    final CreateTableRequest request = new CreateTableRequest()
            .withTableName(tableName)
            .withAttributeDefinitions(
                    new AttributeDefinition().withAttributeName("Id").withAttributeType("N"))
            .withKeySchema(
                    new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH))
            .withProvisionedThroughput(capacity)
            .withStreamSpecification(streamSettings);

    System.out.println("Creating table " + tableName);
    try {
        final CreateTableResult created = client.createTable(request);
        return created.getTableDescription().getLatestStreamArn();
    } catch (ResourceInUseException alreadyExists) {
        System.out.println("Table already exists.");
        return describeTable(client, tableName).getTable().getLatestStreamArn();
    }
}
/**
 * Creates a table with a numeric "Id" hash key and a NEW_AND_OLD_IMAGES stream,
 * then returns the description produced by {@code waitForActive()}.
 *
 * @param client    DynamoDB client wrapped by the document API
 * @param tableName name of the table to create
 * @return the table description returned by {@code Table.waitForActive()}
 * @throws InterruptedException propagated from {@code waitForActive()}
 */
public static TableDescription createTable(AmazonDynamoDB client, String tableName) throws InterruptedException {
    final KeySchemaElement hashKey = new KeySchemaElement("Id", KeyType.HASH);
    final AttributeDefinition idAttribute = new AttributeDefinition("Id", ScalarAttributeType.N);
    final ProvisionedThroughput throughput = new ProvisionedThroughput(10L, 10L);
    final StreamSpecification streamSettings = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);

    final CreateTableRequest request = new CreateTableRequest()
            .withTableName(tableName)
            .withKeySchema(hashKey)
            .withAttributeDefinitions(idAttribute)
            .withProvisionedThroughput(throughput)
            .withStreamSpecification(streamSettings);

    return new DynamoDB(client).createTable(request).waitForActive();
}