/**
 * Writes the given items to {@code _tableName} in batches of at most
 * {@code DDB_MAX_BATCH_WRITE_ITEM} entries.
 *
 * <p>Each item is converted with {@code toItem} and passed through
 * {@code _encryption.encrypt} before being queued. After every batch call, any
 * writes reported as unprocessed for this table are re-submitted (after sleeping
 * for {@code backoffSleep(false)} milliseconds) until none remain.
 *
 * @param items the items to store; a {@code null} or empty array is a no-op
 * @throws AbortedException if the thread is interrupted while sleeping between
 *         retries (the interrupt flag is restored first)
 */
@Override
public void putItems(T... items) {
    if (items == null || items.length == 0) {
        return;
    }

    for (int offset = 0; offset < items.length; offset += DDB_MAX_BATCH_WRITE_ITEM) {
        final TableWriteItems batch = new TableWriteItems(_tableName);
        final int batchSize = Math.min(items.length - offset, DDB_MAX_BATCH_WRITE_ITEM);
        for (int i = offset; i < offset + batchSize; i++) {
            batch.addItemToPut(_encryption.encrypt(toItem(items[i])));
        }

        BatchWriteItemOutcome outcome = maybeBackoff(false, () -> _dynamodb.batchWriteItem(batch));

        // Keep re-submitting until the service reports nothing left for this table.
        Map<String, List<WriteRequest>> pending = outcome.getUnprocessedItems();
        while (pending != null) {
            List<WriteRequest> leftover = pending.get(_tableName);
            if (leftover == null || leftover.isEmpty()) {
                // NOTE(review): resetPTE/gotPTE look like throughput-exceeded
                // bookkeeping; their exact semantics are defined elsewhere.
                resetPTE(null);
                break;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("putItems() unprocessed: "+leftover.size());
            }
            gotPTE(false);

            try {
                Thread.sleep(backoffSleep(false));
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new AbortedException(interrupted);
            }

            // The lambda needs an effectively-final reference to the current map.
            final Map<String, List<WriteRequest>> retry = pending;
            outcome = maybeBackoff(false, () -> _dynamodb.batchWriteItemUnprocessed(retry));
            pending = outcome.getUnprocessedItems();
        }
    }
}
/**
 * Deletes the items identified by the given keys from {@code _tableName} in
 * batches of at most {@code DDB_MAX_BATCH_WRITE_ITEM} entries.
 *
 * <p>{@code null} entries in {@code keys} are skipped. A chunk that contains only
 * {@code null} entries is skipped entirely, because a batch write with no write
 * requests is not a valid request. After every batch call, any writes reported as
 * unprocessed for this table are re-submitted (after sleeping for
 * {@code backoffSleep(false)} milliseconds) until none remain.
 *
 * @param keys the keys of the items to delete; a {@code null} or empty array is a
 *        no-op
 * @throws AbortedException if the thread is interrupted while sleeping between
 *         retries (the interrupt flag is restored first)
 */
@Override
public void deleteItems(IndexKey... keys) {
    if (null == keys || 0 == keys.length) {
        return;
    }

    for (int chunk = 0; chunk < keys.length; chunk += DDB_MAX_BATCH_WRITE_ITEM) {
        TableWriteItems request = new TableWriteItems(_tableName);
        int max = Math.min(keys.length - chunk, DDB_MAX_BATCH_WRITE_ITEM);
        int queued = 0;
        for (int idx = 0; idx < max; idx++) {
            IndexKey key = keys[chunk + idx];
            if (null == key) {
                continue;
            }
            request.addPrimaryKeyToDelete(toPrimaryKey(key));
            queued++;
        }

        // Every key in this chunk was null: sending an empty batch would be
        // rejected by the service, so there is nothing to do for this chunk.
        if (queued == 0) {
            continue;
        }

        BatchWriteItemOutcome response = maybeBackoff(false, () -> _dynamodb.batchWriteItem(request));
        while (true) {
            if (null == response.getUnprocessedItems()) {
                break;
            }
            List<WriteRequest> unprocessed = response.getUnprocessedItems().get(_tableName);
            if (null == unprocessed || unprocessed.size() == 0) {
                // NOTE(review): resetPTE/gotPTE look like throughput-exceeded
                // bookkeeping; their exact semantics are defined elsewhere.
                resetPTE(null);
                break;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("deleteItems() unprocessed: "+unprocessed.size());
            }
            gotPTE(false);
            try {
                Thread.sleep(backoffSleep(false));
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new AbortedException(ex);
            }
            Map<String, List<WriteRequest>> unproc = response.getUnprocessedItems();
            response = maybeBackoff(false, () -> _dynamodb.batchWriteItemUnprocessed(unproc));
        }
    }
}
@Override public void putItems(String tableName, ArrayList<Item> aggregation) throws Exception { try { TableWriteItems tableWriteItems = new TableWriteItems(tableName); tableWriteItems.withItemsToPut(aggregation); BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(tableWriteItems); } catch (Exception e) { LOGGER.error("Error while putting a batch of items in the table " + tableName + ". Details=" + e.getMessage()); } // try catch }
private static void writeMultipleItemsBatchWrite() { try { // Add a new item to Forum TableWriteItems forumTableWriteItems = new TableWriteItems(forumTableName) //Forum .withItemsToPut(new Item() .withPrimaryKey("Name", "Amazon RDS") .withNumber("Threads", 0)); // Add a new item, and delete an existing item, from Thread TableWriteItems threadTableWriteItems = new TableWriteItems(threadTableName) .withItemsToPut(new Item() .withPrimaryKey("ForumName","Amazon RDS","Subject","Amazon RDS Thread 1") .withString("Message", "ElasticCache Thread 1 message") .withStringSet("Tags", new HashSet<String>( Arrays.asList("cache", "in-memory")))) .withHashAndRangeKeysToDelete("ForumName","Subject", "Amazon S3", "S3 Thread 100"); System.out.println("Making the request."); BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(forumTableWriteItems, threadTableWriteItems); do { // Check for unprocessed keys which could happen if you exceed provisioned throughput Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems(); if (outcome.getUnprocessedItems().size() == 0) { System.out.println("No unprocessed items found"); } else { System.out.println("Retrieving the unprocessed items"); outcome = dynamoDB.batchWriteItemUnprocessed(unprocessedItems); } } while (outcome.getUnprocessedItems().size() > 0); } catch (Exception e) { System.err.println("Failed to retrieve items: "); e.printStackTrace(System.err); } }