private void storeResult(final Map<String, Object> result) {
    final String userId = (String) result.get("user_id");
    final String userToken = (String) result.get("access_token");
    final String teamId = (String) result.get("team_id");
    @SuppressWarnings("unchecked")
    final Map<String, Object> bot = (Map<String, Object>) result.get("bot");
    final String botToken = (String) bot.get("bot_access_token");
    final String botId = (String) bot.get("bot_user_id");

    final ArrayList<PutRequest> putRequests = new ArrayList<>();
    putRequests.add(item("user:" + userId + ":token", userToken));
    putRequests.add(item("user:" + botId + ":token", botToken));
    putRequests.add(item("team:" + teamId + ":botuser", botId));

    final List<WriteRequest> writeRequests = putRequests.stream()
            .map(WriteRequest::new)
            .collect(Collectors.toList());
    final HashMap<String, List<WriteRequest>> requestItems = new HashMap<>();
    requestItems.put(TableName, writeRequests);

    final BatchWriteItemRequest batchWriteItemRequest =
            new BatchWriteItemRequest().withRequestItems(requestItems);
    ddb.batchWriteItem(batchWriteItemRequest);
}
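// The item(key, value) helper used above is not shown. A minimal sketch of what it
// might look like, assuming a table whose string attributes are named "key" and
// "value" (both attribute names are assumptions, not taken from the original code):
private PutRequest item(final String key, final String value) {
    final Map<String, AttributeValue> attributes = new HashMap<>();
    attributes.put("key", new AttributeValue().withS(key));     // assumed hash key attribute
    attributes.put("value", new AttributeValue().withS(value)); // assumed value attribute
    return new PutRequest().withItem(attributes);
}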
/**
 * Calls splitResultIntoBatches to turn the SegmentedScanResult into several
 * BatchWriteItemRequests and then submits them as individual jobs to the
 * ExecutorService.
 */
@Override
public Future<Void> writeResult(SegmentedScanResult result) {
    Future<Void> jobSubmission = null;
    List<BatchWriteItemRequest> batches = splitResultIntoBatches(
            result.getScanResult(), tableName);
    Iterator<BatchWriteItemRequest> batchesIterator = batches.iterator();
    while (batchesIterator.hasNext()) {
        try {
            jobSubmission = exec
                    .submit(new DynamoDBConsumerWorker(batchesIterator
                            .next(), client, rateLimiter, tableName));
        } catch (NullPointerException npe) {
            throw new NullPointerException(
                    "Thread pool not initialized for LogStashExecutor");
        }
    }
    return jobSubmission;
}
public int sendDeleteRequests() throws IllegalArgumentException {
    if (batchDeleteRequests.isEmpty()) {
        return 0;
    }

    BatchWriteItemRequest batchWriteItemRequest = genBatchWriteItemRequest();
    BatchWriteItemResult batchWriteResult = sendBatchWriteRequest(batchWriteItemRequest);
    int undeletedItemNum = countAndPrintUndeletedItems(batchWriteResult);
    if (!isRunningOnDDBLocal) {
        // DDB Local does not support rate limiting
        tableWriteRateLimiter.adjustRateWithConsumedCapacity(batchWriteResult.getConsumedCapacity());
    }
    int deletedRequests = batchDeleteRequests.size() - undeletedItemNum;
    totalNumOfItemsDeleted += deletedRequests;
    PrintHelper.printDeleteProgressInfo(deletedRequests, totalNumOfItemsDeleted);
    batchDeleteRequests = new ArrayList<WriteRequest>();
    return deletedRequests;
}
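// countAndPrintUndeletedItems is referenced above but not shown. A minimal sketch,
// assuming it only counts the write requests DynamoDB returned as unprocessed for
// this table (the warning message is illustrative):
protected int countAndPrintUndeletedItems(BatchWriteItemResult batchWriteResult) {
    Map<String, List<WriteRequest>> unprocessed = batchWriteResult.getUnprocessedItems();
    if (unprocessed == null || !unprocessed.containsKey(tableName)) {
        return 0;
    }
    int undeleted = unprocessed.get(tableName).size();
    System.out.println("WARNING: " + undeleted + " items were not deleted in this batch");
    return undeleted;
}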
/**
 * Writes multiple items in batch.
 * @param items a map of tables -> write requests
 */
protected static void batchWrite(Map<String, List<WriteRequest>> items) {
    if (items == null || items.isEmpty()) {
        return;
    }
    try {
        BatchWriteItemResult result = getClient().batchWriteItem(new BatchWriteItemRequest()
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
                .withRequestItems(items));
        if (result == null) {
            return;
        }
        logger.debug("batchWrite(): total {}, cc {}", items.size(), result.getConsumedCapacity());

        if (result.getUnprocessedItems() != null && !result.getUnprocessedItems().isEmpty()) {
            Thread.sleep(1000);
            logger.warn("{} UNPROCESSED write requests!", result.getUnprocessedItems().size());
            batchWrite(result.getUnprocessedItems());
        }
    } catch (Exception e) {
        logger.error(null, e);
    }
}
private void commitPartial(List<WriteRequest> list) {
    Timer t = new Timer();
    Map<String, List<WriteRequest>> map = new HashMap<>();
    map.put(getTenant().getName(), list);
    BatchWriteItemResult result = m_client.batchWriteItem(new BatchWriteItemRequest(map));
    int retry = 0;
    while (result.getUnprocessedItems().size() > 0) {
        if (retry == RETRY_SLEEPS.length) {
            throw new RuntimeException("All retries failed");
        }
        m_logger.debug("Committing {} unprocessed items, retry: {}",
                result.getUnprocessedItems().size(), retry + 1);
        try {
            Thread.sleep(RETRY_SLEEPS[retry++]);
        } catch (InterruptedException e) {
            // Restore the interrupt status rather than swallowing the exception
            Thread.currentThread().interrupt();
        }
        result = m_client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems()));
    }
    m_logger.debug("Committed {} writes in {}", list.size(), t);
    list.clear();
}
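// RETRY_SLEEPS is not shown above. As a sketch, it could be a simple escalating
// delay schedule; the concrete values below (milliseconds) are an assumption:
private static final int[] RETRY_SLEEPS = { 100, 500, 1000, 2000, 5000 };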
public CompletableFuture<BatchWriteItemResult> batchWriteItem(final BatchWriteItemRequest batchWriteItemRequest) {
    return asyncExecutor.execute(new Callable<BatchWriteItemResult>() {
        @Override
        public BatchWriteItemResult call() throws Exception {
            return dbExecutor.batchWriteItem(batchWriteItemRequest);
        }
    });
}
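// Usage sketch for the asynchronous wrapper above. The "dynamoDbAsync" instance and
// the pre-built "request" are assumptions; the callback uses the standard
// CompletableFuture API to inspect unprocessed items once the write completes:
dynamoDbAsync.batchWriteItem(request)
        .thenAccept(result -> System.out.println(
                "Unprocessed items: " + result.getUnprocessedItems().size()));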
/**
 * @param tableName the table the batched write requests are sent to
 * @param requests the full list of write requests, split into batches of at most BATCH_SIZE
 */
private void sendBatch(final String tableName, List<WriteRequest> requests) {
    int numBatches = (int) Math.ceil(requests.size() / (BATCH_SIZE * 1D));
    for (int i = 0; i < numBatches; i++) {
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
        List<WriteRequest> batch = requests.subList(i * BATCH_SIZE,
                Math.min(i * BATCH_SIZE + BATCH_SIZE, requests.size()));
        requestItems.put(tableName, batch);
        addItemsToTable(tableName, new BatchWriteItemRequest().withRequestItems(requestItems));
    }
}
/**
 * Callable class that when called will try to write a batch to a DynamoDB
 * table. If the write returns unprocessed items it will exponentially back
 * off until it succeeds.
 */
public DynamoDBConsumerWorker(BatchWriteItemRequest batchWriteItemRequest,
        AmazonDynamoDBClient client, RateLimiter rateLimiter, String tableName) {
    this.batch = batchWriteItemRequest;
    this.client = client;
    this.rateLimiter = rateLimiter;
    this.tableName = tableName;
    this.exponentialBackoffTime = BootstrapConstants.INITIAL_RETRY_TIME_MILLISECONDS;
}
/**
 * Writes to a DynamoDB table using an exponential backoff. If
 * batchWriteItem returns unprocessed items, it backs off exponentially
 * and retries the unprocessed items.
 */
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
    BatchWriteItemResult writeItemResult = null;
    List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
    Map<String, List<WriteRequest>> unprocessedItems = null;
    boolean interrupted = false;
    try {
        do {
            writeItemResult = client.batchWriteItem(req);
            unprocessedItems = writeItemResult.getUnprocessedItems();
            consumedCapacities.addAll(writeItemResult.getConsumedCapacity());

            if (unprocessedItems != null) {
                req.setRequestItems(unprocessedItems);
                try {
                    Thread.sleep(exponentialBackoffTime);
                } catch (InterruptedException ie) {
                    interrupted = true;
                } finally {
                    exponentialBackoffTime *= 2;
                    if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
                        exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
                    }
                }
            }
        } while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
        return consumedCapacities;
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
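// The BootstrapConstants referenced above are not shown. The values below are
// assumptions for illustration only; the 25-item limit, however, is DynamoDB's
// documented maximum batch size for BatchWriteItem:
public class BootstrapConstants {
    public static final int MAX_BATCH_SIZE_WRITE_ITEM = 25;            // documented DynamoDB limit
    public static final long INITIAL_RETRY_TIME_MILLISECONDS = 128;    // assumed starting backoff
    public static final long MAX_EXPONENTIAL_BACKOFF_TIME = 64 * 1000; // assumed backoff cap
}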
/**
 * Splits up a ScanResult into a list of BatchWriteItemRequests of size 25
 * items or less each.
 */
public static List<BatchWriteItemRequest> splitResultIntoBatches(
        ScanResult result, String tableName) {
    List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
    Iterator<Map<String, AttributeValue>> it = result.getItems().iterator();

    BatchWriteItemRequest req = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    List<WriteRequest> writeRequests = new LinkedList<WriteRequest>();
    int i = 0;
    while (it.hasNext()) {
        PutRequest put = new PutRequest(it.next());
        writeRequests.add(new WriteRequest(put));

        i++;
        if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) {
            req.addRequestItemsEntry(tableName, writeRequests);
            batches.add(req);
            req = new BatchWriteItemRequest()
                    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
            writeRequests = new LinkedList<WriteRequest>();
            i = 0;
        }
    }
    if (i > 0) {
        req.addRequestItemsEntry(tableName, writeRequests);
        batches.add(req);
    }
    return batches;
}
/**
 * Test that a ScanResult splits into the correct number of batches.
 */
@Test
public void splitResultIntoBatchesTest() {
    final double numItems = 111.0;
    String tableName = "test tableName";

    ScanResult scanResult = new ScanResult();
    List<Map<String, AttributeValue>> items = new LinkedList<Map<String, AttributeValue>>();
    for (int i = 0; i < numItems; i++) {
        Map<String, AttributeValue> sampleScanResult = new HashMap<String, AttributeValue>();
        sampleScanResult.put("key", new AttributeValue("attribute value " + i));
        items.add(sampleScanResult);
    }
    scanResult.setItems(items);
    SegmentedScanResult result = new SegmentedScanResult(scanResult, 0);

    replayAll();
    List<BatchWriteItemRequest> batches = DynamoDBConsumer
            .splitResultIntoBatches(result.getScanResult(), tableName);
    assertEquals(Math.ceil(numItems / BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM),
            batches.size(), 0.0);
    verifyAll();
}
protected BatchWriteItemResult sendBatchWriteRequest(BatchWriteItemRequest batchWriteItemRequest) {
    try {
        return dynamoDBClient.batchWriteItem(batchWriteItemRequest);
    } catch (AmazonServiceException ase) {
        throw new IllegalArgumentException("Error: Failed to delete " + ase.getMessage());
    } catch (IllegalArgumentException iae) {
        throw new IllegalArgumentException("Error: Invalid argument: " + iae.getMessage());
    }
}
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
    BatchWriteItemRequest batchItemRequest = new BatchWriteItemRequest();
    List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
    for (PutPointRequest putPointRequest : putPointRequests) {
        long geohash = S2Manager.generateGeohash(putPointRequest.getGeoPoint());
        long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
        String geoJson = GeoJsonMapper.stringFromGeoObject(putPointRequest.getGeoPoint());

        PutRequest putRequest = putPointRequest.getPutRequest();
        AttributeValue hashKeyValue = new AttributeValue().withN(String.valueOf(hashKey));
        putRequest.getItem().put(config.getHashKeyAttributeName(), hashKeyValue);
        putRequest.getItem().put(config.getRangeKeyAttributeName(), putPointRequest.getRangeKeyValue());
        AttributeValue geohashValue = new AttributeValue().withN(Long.toString(geohash));
        putRequest.getItem().put(config.getGeohashAttributeName(), geohashValue);
        AttributeValue geoJsonValue = new AttributeValue().withS(geoJson);
        putRequest.getItem().put(config.getGeoJsonAttributeName(), geoJsonValue);

        WriteRequest writeRequest = new WriteRequest(putRequest);
        writeRequests.add(writeRequest);
    }
    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    requestItems.put(config.getTableName(), writeRequests);
    batchItemRequest.setRequestItems(requestItems);
    BatchWriteItemResult batchWriteItemResult = config.getDynamoDBClient().batchWriteItem(batchItemRequest);
    BatchWritePointResult batchWritePointResult = new BatchWritePointResult(batchWriteItemResult);
    return batchWritePointResult;
}
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchWriteItemRequest) {
    return dynamoDB.batchWriteItem(batchWriteItemRequest);
}
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchRequest) throws BackendException {
    int count = 0;
    for (Entry<String, List<WriteRequest>> entry : batchRequest.getRequestItems().entrySet()) {
        final String tableName = entry.getKey();
        final List<WriteRequest> requests = entry.getValue();
        count += requests.size();
        if (count > BATCH_WRITE_MAX_NUMBER_OF_ITEMS) {
            throw new IllegalArgumentException("can't have more than 25 requests in a batch write");
        }
        for (final WriteRequest request : requests) {
            if ((request.getPutRequest() != null) == (request.getDeleteRequest() != null)) {
                throw new IllegalArgumentException(
                        "Exactly one of PutRequest or DeleteRequest must be set in each WriteRequest in a batch write operation");
            }
            final int wcu;
            final String apiName;
            if (request.getPutRequest() != null) {
                apiName = PUT_ITEM;
                final int bytes = calculateItemSizeInBytes(request.getPutRequest().getItem());
                wcu = computeWcu(bytes);
            } else { // DeleteRequest
                apiName = DELETE_ITEM;
                wcu = estimateCapacityUnits(apiName, tableName);
            }
            timedWriteThrottle(apiName, tableName, wcu);
        }
    }

    BatchWriteItemResult result;
    setUserAgent(batchRequest);
    final Timer.Context apiTimerContext = getTimerContext(BATCH_WRITE_ITEM, null /*tableName*/);
    try {
        result = client.batchWriteItem(batchRequest);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, BATCH_WRITE_ITEM, null /*tableName*/);
    } finally {
        apiTimerContext.stop();
    }
    if (result.getConsumedCapacity() != null) {
        for (ConsumedCapacity ccu : result.getConsumedCapacity()) {
            meterConsumedCapacity(BATCH_WRITE_ITEM, ccu);
        }
    }
    return result;
}
protected BatchWriteItemRequest genBatchWriteItemRequest() {
    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    requestItems.put(tableName, batchDeleteRequests);
    return new BatchWriteItemRequest()
            .withRequestItems(requestItems)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
private static void writeMultipleItemsBatchWrite() {
    try {
        // Begin syntax extract

        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));

        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put("Forum", forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));

        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Some hash attribute value"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("Some range attribute value"));
        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put("Thread", threadList);

        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();

        System.out.println("Making the request.");
        batchWriteItemRequest.withRequestItems(requestItems);
        client.batchWriteItem(batchWriteItemRequest);

        // End syntax extract
    } catch (AmazonServiceException ase) {
        System.err.println("Failed to retrieve items: ");
        ase.printStackTrace(System.err);
    }
}
private static void writeMultipleItemsBatchWrite() {
    try {
        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));

        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put(table1Name, forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
        threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message"));
        threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));

        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put(table2Name, threadList);

        BatchWriteItemResult result;
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

        do {
            System.out.println("Making the request.");

            batchWriteItemRequest.withRequestItems(requestItems);
            result = client.batchWriteItem(batchWriteItemRequest);

            // Print consumed capacity units
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": "
                        + consumedCapacityUnits);
            }

            // Check for unprocessed keys, which could happen if you exceed provisioned throughput
            System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
            requestItems = result.getUnprocessedItems();
        } while (result.getUnprocessedItems().size() > 0);
    } catch (AmazonServiceException ase) {
        System.err.println("Failed to retrieve items: ");
        ase.printStackTrace(System.err);
    }
}