Java class com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome code examples

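All of the snippets below use the DynamoDB Document API from the AWS SDK for Java 1.x, where batchWriteItem(...) returns a BatchWriteItemOutcome whose getUnprocessedItems() map should be checked and resubmitted. As a minimal sketch of the setup these examples assume (the table name "ExampleTable" and key attribute "Id" are hypothetical, not taken from the projects below):

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;

public class BatchWriteItemOutcomeSketch {
    public static void main(String[] args) {
        // Low-level client wrapped by the Document API client; this plays the
        // role of "dynamoDB" / "_dynamodb" in the examples below.
        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build();
        DynamoDB dynamoDB = new DynamoDB(client);

        // "ExampleTable" with hash key "Id" is a hypothetical table.
        TableWriteItems writeItems = new TableWriteItems("ExampleTable")
                .withItemsToPut(new Item().withPrimaryKey("Id", "item-1").withNumber("Count", 1));

        // batchWriteItem returns a BatchWriteItemOutcome; anything DynamoDB
        // could not write ends up in getUnprocessedItems() and should be retried.
        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(writeItems);
        System.out.println("Tables with unprocessed items: " + outcome.getUnprocessedItems().size());

        client.shutdown();
    }
}
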
Project: java-persistence    File: DdbIndex.java
@Override
public void putItems(T... items) {
    if ( null == items || 0 == items.length ) return;
    // Write in chunks of DDB_MAX_BATCH_WRITE_ITEM (DynamoDB caps BatchWriteItem at 25 items per request).
    for ( int chunk=0; chunk < items.length; chunk += DDB_MAX_BATCH_WRITE_ITEM ) {
        TableWriteItems request = new TableWriteItems(_tableName);
        int max = Math.min(items.length-chunk, DDB_MAX_BATCH_WRITE_ITEM);
        for ( int idx=0; idx < max; idx++ ) {
            request.addItemToPut(_encryption.encrypt(toItem(items[chunk+idx])));
        }
        BatchWriteItemOutcome response = maybeBackoff(false, () ->
                                                      _dynamodb.batchWriteItem(request));
        // Resubmit anything DynamoDB left unprocessed, backing off between attempts.
        while ( true ) {
            if ( null == response.getUnprocessedItems() ) break;
            List<WriteRequest> unprocessed = response.getUnprocessedItems().get(_tableName);
            if ( null == unprocessed || unprocessed.size() == 0 ) {
                resetPTE(null);
                break;
            }
            if ( LOG.isDebugEnabled() )
                LOG.debug("putItems() unprocessed: "+unprocessed.size());
            gotPTE(false);
            try {
                Thread.sleep(backoffSleep(false));
            } catch ( InterruptedException ex ) {
                Thread.currentThread().interrupt();
                throw new AbortedException(ex);
            }
            Map<String,List<WriteRequest>> unproc = response.getUnprocessedItems();
            response = maybeBackoff(false, () ->
                                    _dynamodb.batchWriteItemUnprocessed(unproc));
        }
    }
}
Project: java-persistence    File: DdbIndex.java
@Override
public void deleteItems(IndexKey... keys) {
    if ( null == keys || 0 == keys.length ) return;
    // Same chunk-and-retry pattern as putItems(), but batching primary-key deletes.
    for ( int chunk=0; chunk < keys.length; chunk += DDB_MAX_BATCH_WRITE_ITEM ) {
        TableWriteItems request = new TableWriteItems(_tableName);
        int max = Math.min(keys.length-chunk, DDB_MAX_BATCH_WRITE_ITEM);
        for ( int idx=0; idx < max; idx++ ) {
            IndexKey key = keys[chunk+idx];
            if ( null == key ) continue;
            request.addPrimaryKeyToDelete(toPrimaryKey(key));
        }
        BatchWriteItemOutcome response = maybeBackoff(false, () ->
                                                      _dynamodb.batchWriteItem(request));
        while ( true ) {
            if ( null == response.getUnprocessedItems() ) break;
            List<WriteRequest> unprocessed = response.getUnprocessedItems().get(_tableName);
            if ( null == unprocessed || unprocessed.size() == 0 ) {
                resetPTE(null);
                break;
            }
            if ( LOG.isDebugEnabled() )
                LOG.debug("deleteItems() unprocessed: "+unprocessed.size());
            gotPTE(false);
            try {
                Thread.sleep(backoffSleep(false));
            } catch ( InterruptedException ex ) {
                Thread.currentThread().interrupt();
                throw new AbortedException(ex);
            }
            Map<String,List<WriteRequest>> unproc = response.getUnprocessedItems();
            response = maybeBackoff(false, () ->
                                    _dynamodb.batchWriteItemUnprocessed(unproc));
        }
    }
}
Project: fiware-cygnus    File: DynamoDBBackendImpl.java
@Override
public void putItems(String tableName, ArrayList<Item> aggregation) throws Exception {
    try {
        TableWriteItems tableWriteItems = new TableWriteItems(tableName);
        tableWriteItems.withItemsToPut(aggregation);
        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems);
    } catch (Exception e) {
        LOGGER.error("Error while putting a batch of items in the table " + tableName
                + ". Details=" + e.getMessage());
    } // try catch
}
Project: aws-dynamodb-examples    File: DocumentAPIBatchWrite.java
private static void writeMultipleItemsBatchWrite() {
    try {                    

        // Add a new item to Forum
        TableWriteItems forumTableWriteItems = new TableWriteItems(forumTableName) //Forum
            .withItemsToPut(new Item()
                .withPrimaryKey("Name", "Amazon RDS")
                .withNumber("Threads", 0));

        // Add a new item, and delete an existing item, from Thread
        TableWriteItems threadTableWriteItems = new TableWriteItems(threadTableName)
        .withItemsToPut(new Item()
            .withPrimaryKey("ForumName","Amazon RDS","Subject","Amazon RDS Thread 1")
            .withString("Message", "ElasticCache Thread 1 message")
            .withStringSet("Tags", new HashSet<String>(
                Arrays.asList("cache", "in-memory"))))
        .withHashAndRangeKeysToDelete("ForumName","Subject", "Amazon S3", "S3 Thread 100");

        System.out.println("Making the request.");
        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(forumTableWriteItems, threadTableWriteItems);

        do {

            // Check for unprocessed keys which could happen if you exceed provisioned throughput

            Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems();

            if (outcome.getUnprocessedItems().size() == 0) {
                System.out.println("No unprocessed items found");
            } else {
                System.out.println("Retrieving the unprocessed items");
                outcome = dynamoDB.batchWriteItemUnprocessed(unprocessedItems);
            }

        } while (outcome.getUnprocessedItems().size() > 0);

    }  catch (Exception e) {
        System.err.println("Failed to retrieve items: ");
        e.printStackTrace(System.err);
    }  

}
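
For reference, a minimal sketch (not taken from any of the projects above) that combines the retry loop of the last example with a simple backoff like the first two snippets use: unprocessed items are resubmitted through batchWriteItemUnprocessed(...) until none remain. The delay values are arbitrary example numbers, not part of any project shown here.

import java.util.List;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;

public final class BatchWriteRetry {
    private BatchWriteRetry() {}

    public static void writeWithRetry(DynamoDB dynamoDB, TableWriteItems... requests)
            throws InterruptedException {
        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(requests);
        long delayMillis = 100; // hypothetical starting delay

        Map<String, List<WriteRequest>> unprocessed = outcome.getUnprocessedItems();
        while (unprocessed != null && !unprocessed.isEmpty()) {
            // Back off before resubmitting; unprocessed items usually mean
            // the provisioned throughput was exceeded.
            Thread.sleep(delayMillis);
            delayMillis = Math.min(delayMillis * 2, 5000);

            outcome = dynamoDB.batchWriteItemUnprocessed(unprocessed);
            unprocessed = outcome.getUnprocessedItems();
        }
    }
}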