public int sendDeleteRequests() throws IllegalArgumentException {
    if (batchDeleteRequests.isEmpty()) {
        return 0;
    }
    BatchWriteItemRequest batchWriteItemRequest = genBatchWriteItemRequest();
    BatchWriteItemResult batchWriteResult = sendBatchWriteRequest(batchWriteItemRequest);
    int undeletedItemNum = countAndPrintUndeletedItems(batchWriteResult);
    if (!isRunningOnDDBLocal) {
        // DDB Local does not support rate limiting
        tableWriteRateLimiter.adjustRateWithConsumedCapacity(batchWriteResult.getConsumedCapacity());
    }
    int deletedRequests = batchDeleteRequests.size() - undeletedItemNum;
    totalNumOfItemsDeleted += deletedRequests;
    PrintHelper.printDeleteProgressInfo(deletedRequests, totalNumOfItemsDeleted);
    batchDeleteRequests = new ArrayList<WriteRequest>();
    return deletedRequests;
}
/**
 * Writes multiple items in batch.
 * @param items a map of table names to write requests
 */
protected static void batchWrite(Map<String, List<WriteRequest>> items) {
    if (items == null || items.isEmpty()) {
        return;
    }
    try {
        BatchWriteItemResult result = getClient().batchWriteItem(new BatchWriteItemRequest()
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
                .withRequestItems(items));
        if (result == null) {
            return;
        }
        logger.debug("batchWrite(): total {}, cc {}", items.size(), result.getConsumedCapacity());
        if (result.getUnprocessedItems() != null && !result.getUnprocessedItems().isEmpty()) {
            Thread.sleep(1000);
            logger.warn("{} UNPROCESSED write requests!", result.getUnprocessedItems().size());
            batchWrite(result.getUnprocessedItems());
        }
    } catch (Exception e) {
        logger.error(null, e);
    }
}
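A minimal caller sketch for the helper above; the table name "my-table" and the item attributes are illustrative assumptions, not from the original source:

// Illustrative usage of batchWrite(); the table name and attributes are made up.
Map<String, List<WriteRequest>> items = new HashMap<>();
List<WriteRequest> requests = new ArrayList<>();
Map<String, AttributeValue> item = new HashMap<>();
item.put("Id", new AttributeValue().withN("101"));
item.put("Name", new AttributeValue().withS("example"));
requests.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
items.put("my-table", requests);
batchWrite(items); // unprocessed items are retried recursively, as defined above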
private void commitPartial(List<WriteRequest> list) {
    Timer t = new Timer();
    Map<String, List<WriteRequest>> map = new HashMap<>();
    map.put(getTenant().getName(), list);
    BatchWriteItemResult result = m_client.batchWriteItem(new BatchWriteItemRequest(map));
    int retry = 0;
    while (result.getUnprocessedItems().size() > 0) {
        if (retry == RETRY_SLEEPS.length) {
            throw new RuntimeException("All retries failed");
        }
        m_logger.debug("Committing {} unprocessed items, retry: {}",
                result.getUnprocessedItems().size(), retry + 1);
        try {
            Thread.sleep(RETRY_SLEEPS[retry++]);
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the interruption
            Thread.currentThread().interrupt();
        }
        result = m_client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems()));
    }
    m_logger.debug("Committed {} writes in {}", list.size(), t);
    list.clear();
}
public CompletableFuture<BatchWriteItemResult> batchWriteItem(final Map<String, List<WriteRequest>> requestItems) {
    return asyncExecutor.execute(new Callable<BatchWriteItemResult>() {
        @Override
        public BatchWriteItemResult call() throws Exception {
            return dbExecutor.batchWriteItem(requestItems);
        }
    });
}
public CompletableFuture<BatchWriteItemResult> batchWriteItem(final BatchWriteItemRequest batchWriteItemRequest) {
    return asyncExecutor.execute(new Callable<BatchWriteItemResult>() {
        @Override
        public BatchWriteItemResult call() throws Exception {
            return dbExecutor.batchWriteItem(batchWriteItemRequest);
        }
    });
}
public BatchWriteItemResult putBatch(String tableName, Map<String, AttributeValue> item,
        long maxItemsPerBatch, Reporter reporter) throws UnsupportedEncodingException {
    int itemSizeBytes = DynamoDBUtil.getItemSizeBytes(item);
    if (itemSizeBytes > maxItemByteSize) {
        throw new RuntimeException("Cannot pass items with size greater than " + maxItemByteSize
                + ". Item with size of " + itemSizeBytes + " was given.");
    }
    maxItemsPerBatch = DynamoDBUtil.getBoundedBatchLimit(config, maxItemsPerBatch);
    BatchWriteItemResult result = null;
    if (writeBatchMap.containsKey(tableName)) {
        boolean writeRequestsForTableAtLimit =
                writeBatchMap.get(tableName).size() >= maxItemsPerBatch;
        boolean totalSizeOfWriteBatchesOverLimit =
                writeBatchMapSizeBytes + itemSizeBytes > maxBatchSize;
        if (writeRequestsForTableAtLimit || totalSizeOfWriteBatchesOverLimit) {
            result = writeBatch(reporter, itemSizeBytes);
        }
    }
    // writeBatchMap could be cleared from writeBatch()
    List<WriteRequest> writeBatchList;
    if (!writeBatchMap.containsKey(tableName)) {
        writeBatchList = new ArrayList<>((int) maxItemsPerBatch);
        writeBatchMap.put(tableName, writeBatchList);
    } else {
        writeBatchList = writeBatchMap.get(tableName);
    }
    writeBatchList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
    writeBatchMapSizeBytes += itemSizeBytes;
    return result;
}
@Override
public void write(K key, V value) throws IOException {
    if (value == null) {
        throw new RuntimeException("Null record encountered. At least the key columns must be "
                + "specified.");
    }
    verifyInterval();
    if (progressable != null) {
        progressable.progress();
    }
    DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
    BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
            permissibleWritesPerSecond - writesPerSecond, reporter);
    batchSize++;
    totalItemsWritten++;
    if (result != null) {
        if (result.getConsumedCapacity() != null) {
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                double consumedUnits = consumedCapacity.getCapacityUnits();
                totalIOPSConsumed += consumedUnits;
            }
        }
        int unprocessedItems = 0;
        for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
            unprocessedItems += requests.size();
        }
        writesPerSecond += batchSize - unprocessedItems;
        batchSize = unprocessedItems;
    }
}
/**
 * Writes to a DynamoDB table using exponential backoff. If
 * batchWriteItem returns unprocessed items, it backs off
 * exponentially and retries the unprocessed items.
 */
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
    BatchWriteItemResult writeItemResult = null;
    List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
    Map<String, List<WriteRequest>> unprocessedItems = null;
    boolean interrupted = false;
    try {
        do {
            writeItemResult = client.batchWriteItem(req);
            unprocessedItems = writeItemResult.getUnprocessedItems();
            consumedCapacities.addAll(writeItemResult.getConsumedCapacity());
            if (unprocessedItems != null) {
                req.setRequestItems(unprocessedItems);
                try {
                    Thread.sleep(exponentialBackoffTime);
                } catch (InterruptedException ie) {
                    interrupted = true;
                } finally {
                    exponentialBackoffTime *= 2;
                    if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
                        exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
                    }
                }
            }
        } while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
        return consumedCapacities;
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
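Note that runWithBackoff() unconditionally calls consumedCapacities.addAll(writeItemResult.getConsumedCapacity()), so the request must set ReturnConsumedCapacity; otherwise getConsumedCapacity() is null and addAll throws a NullPointerException. A hedged usage sketch (requestItems and the WCU total are illustrative, not part of the original class):

// Illustrative only: requestItems is assumed to hold at most 25 WriteRequests per table.
BatchWriteItemRequest req = new BatchWriteItemRequest()
        .withRequestItems(requestItems)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); // required, see note above
double totalWcu = 0;
for (ConsumedCapacity cc : runWithBackoff(req)) {
    totalWcu += cc.getCapacityUnits();
}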
protected BatchWriteItemResult sendBatchWriteRequest(BatchWriteItemRequest batchWriteItemRequest) {
    try {
        return dynamoDBClient.batchWriteItem(batchWriteItemRequest);
    } catch (AmazonServiceException ase) {
        throw new IllegalArgumentException("Error: Failed to delete " + ase.getMessage());
    } catch (IllegalArgumentException iae) {
        throw new IllegalArgumentException("Error: Invalid argument: " + iae.getMessage());
    }
}
protected int countAndPrintUndeletedItems(BatchWriteItemResult batchWriteResult) {
    if (!batchWriteResult.getUnprocessedItems().isEmpty()) {
        logger.warn("WARNING: UNPROCESSED ITEMS:");
        List<WriteRequest> unprocessedRequests = batchWriteResult.getUnprocessedItems().get(tableName);
        int count = 0;
        for (WriteRequest w : unprocessedRequests) {
            PrintHelper.printItem(0, count++, w.getDeleteRequest().getKey());
        }
        return unprocessedRequests.size();
    }
    return 0;
}
@Test
public void testCountAndPrintUndeletedItems() {
    BatchWriteItemResult mockBatchWriteResult = Mockito.mock(BatchWriteItemResult.class);
    HashMap<String, List<WriteRequest>> mockUnProcessedItems = Mockito.mock(HashMap.class);
    Mockito.when(mockBatchWriteResult.getUnprocessedItems()).thenReturn(mockUnProcessedItems);
    Mockito.when(mockUnProcessedItems.isEmpty()).thenReturn(true);
    tableWriter.countAndPrintUndeletedItems(mockBatchWriteResult);
    Mockito.verify(mockUnProcessedItems, Mockito.times(1)).isEmpty();
    Mockito.verify(mockUnProcessedItems, Mockito.times(0)).get(Mockito.anyString());
}
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
    createTable();
    String TEST_ATTRIBUTE_2 = "Attribute2";
    String TEST_ATTRIBUTE_VALUE_2 = "AttributeValue2";

    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();

    Map<String, AttributeValue> item1 = new HashMap<String, AttributeValue>();
    item1.put(TEST_ATTRIBUTE, new AttributeValue().withS(TEST_ATTRIBUTE_VALUE));
    WriteRequest writeRequest1 = new WriteRequest().withPutRequest(new PutRequest().withItem(item1));
    writeRequests.add(writeRequest1);

    Map<String, AttributeValue> item2 = new HashMap<String, AttributeValue>();
    item2.put(TEST_ATTRIBUTE_2, new AttributeValue().withS(TEST_ATTRIBUTE_VALUE_2));
    WriteRequest writeRequest2 = new WriteRequest().withPutRequest(new PutRequest().withItem(item2));
    writeRequests.add(writeRequest2);

    requestItems.put(TEST_TABLE_NAME, writeRequests);

    BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);
    List<ConsumedCapacity> consumedCapacities = result.getConsumedCapacity();
    assertThat(consumedCapacities.size(), equalTo(writeRequests.size()));
}
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
    BatchWriteItemRequest batchItemRequest = new BatchWriteItemRequest();
    List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
    for (PutPointRequest putPointRequest : putPointRequests) {
        long geohash = S2Manager.generateGeohash(putPointRequest.getGeoPoint());
        long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
        String geoJson = GeoJsonMapper.stringFromGeoObject(putPointRequest.getGeoPoint());

        PutRequest putRequest = putPointRequest.getPutRequest();
        AttributeValue hashKeyValue = new AttributeValue().withN(String.valueOf(hashKey));
        putRequest.getItem().put(config.getHashKeyAttributeName(), hashKeyValue);
        putRequest.getItem().put(config.getRangeKeyAttributeName(), putPointRequest.getRangeKeyValue());
        AttributeValue geohashValue = new AttributeValue().withN(Long.toString(geohash));
        putRequest.getItem().put(config.getGeohashAttributeName(), geohashValue);
        AttributeValue geoJsonValue = new AttributeValue().withS(geoJson);
        putRequest.getItem().put(config.getGeoJsonAttributeName(), geoJsonValue);

        writeRequests.add(new WriteRequest(putRequest));
    }
    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    requestItems.put(config.getTableName(), writeRequests);
    batchItemRequest.setRequestItems(requestItems);
    BatchWriteItemResult batchWriteItemResult = config.getDynamoDBClient().batchWriteItem(batchItemRequest);
    return new BatchWritePointResult(batchWriteItemResult);
}
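batchWritePoints() issues a single batchWriteItem call and does not retry unprocessed items itself. A sketch of how a caller might check for leftovers through the BatchWritePointResult wrapper (its accessor appears at the end of this section); the resubmission step is an assumption, not part of the library:

// Illustrative: inspect and resubmit unprocessed writes from a batchWritePoints() call.
BatchWritePointResult result = batchWritePoints(putPointRequests);
Map<String, List<WriteRequest>> unprocessed =
        result.getBatchWriteItemResult().getUnprocessedItems();
if (unprocessed != null && !unprocessed.isEmpty()) {
    // Assumed retry: resubmit once; production code should back off and loop.
    config.getDynamoDBClient().batchWriteItem(new BatchWriteItemRequest().withRequestItems(unprocessed));
}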
public BatchWriteItemResult batchWriteItem(final Map<String, List<WriteRequest>> requestItems) {
    return dynamoDB.batchWriteItem(requestItems);
}
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchWriteItemRequest) {
    return dynamoDB.batchWriteItem(batchWriteItemRequest);
}
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchRequest) throws BackendException {
    int count = 0;
    for (Entry<String, List<WriteRequest>> entry : batchRequest.getRequestItems().entrySet()) {
        final String tableName = entry.getKey();
        final List<WriteRequest> requests = entry.getValue();
        count += requests.size();
        if (count > BATCH_WRITE_MAX_NUMBER_OF_ITEMS) {
            throw new IllegalArgumentException("Cannot have more than 25 requests in a batch write");
        }
        for (final WriteRequest request : requests) {
            if ((request.getPutRequest() != null) == (request.getDeleteRequest() != null)) {
                throw new IllegalArgumentException(
                        "Exactly one of PutRequest or DeleteRequest must be set in each WriteRequest in a batch write operation");
            }
            final int wcu;
            final String apiName;
            if (request.getPutRequest() != null) {
                apiName = PUT_ITEM;
                final int bytes = calculateItemSizeInBytes(request.getPutRequest().getItem());
                wcu = computeWcu(bytes);
            } else { // delete request
                apiName = DELETE_ITEM;
                wcu = estimateCapacityUnits(apiName, tableName);
            }
            timedWriteThrottle(apiName, tableName, wcu);
        }
    }

    BatchWriteItemResult result;
    setUserAgent(batchRequest);
    final Timer.Context apiTimerContext = getTimerContext(BATCH_WRITE_ITEM, null /*tableName*/);
    try {
        result = client.batchWriteItem(batchRequest);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, BATCH_WRITE_ITEM, null /*tableName*/);
    } finally {
        apiTimerContext.stop();
    }
    if (result.getConsumedCapacity() != null) {
        for (ConsumedCapacity ccu : result.getConsumedCapacity()) {
            meterConsumedCapacity(BATCH_WRITE_ITEM, ccu);
        }
    }
    return result;
}
private static void writeMultipleItemsBatchWrite() {
    try {
        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));
        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put(table1Name, forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
        threadItem.put("Message", new AttributeValue().withS("ElastiCache Thread 1 message"));
        threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));
        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put(table2Name, threadList);

        BatchWriteItemResult result;
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        do {
            System.out.println("Making the request.");
            batchWriteItemRequest.withRequestItems(requestItems);
            result = client.batchWriteItem(batchWriteItemRequest);

            // Print consumed capacity units
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": "
                        + consumedCapacityUnits);
            }

            // Check for unprocessed keys, which can happen if provisioned throughput is exceeded
            System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
            requestItems = result.getUnprocessedItems();
        } while (result.getUnprocessedItems().size() > 0);
    } catch (AmazonServiceException ase) {
        System.err.println("Failed to write items: ");
        ase.printStackTrace(System.err);
    }
}
public BatchWritePointResult(BatchWriteItemResult batchWriteItemResult) {
    this.batchWriteItemResult = batchWriteItemResult;
}
public BatchWriteItemResult getBatchWriteItemResult() {
    return batchWriteItemResult;
}