/** * Private interface for creating tables which handles any instances of * Throttling of the API * * @param dynamoClient * @param dynamoTable * @return * @throws Exception */ public static CreateTableResult safeCreateTable( final AmazonDynamoDB dynamoClient, final CreateTableRequest createTableRequest) throws Exception { CreateTableResult res = null; final int tryMax = 10; int tries = 0; while (true) { try { res = dynamoClient.createTable(createTableRequest); return res; } catch (LimitExceededException le) { if (tries < tryMax) { // back off for 1 second Thread.sleep(1000); tries++; } else { throw le; } } catch (ResourceInUseException rie) { // someone else is trying to create the table while we are, so // return ok return null; } } }
@Override protected void createTable(TableDescription table) { List<GlobalSecondaryIndex> gsis = new ArrayList<>(); List<LocalSecondaryIndex> lsis = new ArrayList<>(); ProvisionedThroughput mainThroughtput = null; List<KeySchemaElement> mainKeySchema = null; Map<String, AttrType> attrTypes = new HashMap<>(); for ( IndexDescription index : table.getIndexes() ) { addAttrType(table.getTableName(), index.getIndexName(), attrTypes, index.getHashKey()); addAttrType(table.getTableName(), index.getIndexName(), attrTypes, index.getRangeKey()); ProvisionedThroughput throughput = new ProvisionedThroughput() .withReadCapacityUnits(index.getReadCapacity()) .withWriteCapacityUnits(index.getWriteCapacity()); switch ( index.getIndexType() ) { case MAIN_INDEX: mainThroughtput = throughput; mainKeySchema = toKeySchema(table.getTableName(), index); break; case LOCAL_SECONDARY_INDEX: lsis.add( new LocalSecondaryIndex() .withProjection( new Projection().withProjectionType(ProjectionType.ALL)) .withIndexName(index.getIndexName()) .withKeySchema(toKeySchema(table.getTableName(), index))); break; case GLOBAL_SECONDARY_INDEX: gsis.add( new GlobalSecondaryIndex() .withIndexName(index.getIndexName()) .withKeySchema(toKeySchema(table.getTableName(), index)) .withProjection( new Projection().withProjectionType(ProjectionType.ALL)) .withProvisionedThroughput(throughput)); break; default: throw new UnsupportedOperationException( "Unsupported indexType="+index.getIndexType()+" for table name "+ table.getTableName()+" index="+index.getIndexName()); } } String tableName = getTableName(table.getTableName()); for ( int retry=0;; retry++ ) { try { _dynamodb.createTable( new CreateTableRequest() .withKeySchema(mainKeySchema) .withProvisionedThroughput(mainThroughtput) .withAttributeDefinitions(toAttributeDefinitions(attrTypes)) .withLocalSecondaryIndexes(lsis.size() == 0 ? null : lsis) .withGlobalSecondaryIndexes(gsis.size() == 0 ? null : gsis) .withTableName(tableName)); break; } catch ( LimitExceededException ex ) { long secs = (retry < 6) ? (long)Math.pow(2, retry) : 60L; LOG.info("Waiting {} seconds to create {} due to: {}", secs, tableName, ex.getMessage()); try { Thread.sleep(1000*secs); } catch ( InterruptedException interruptedEx ) { Thread.currentThread().interrupt(); return; } } } }