/**
 * Executes a rate-limited page scan for objects of type T, calling action on each object found.
 *
 * @param clazz the class of the objects
 * @param scanExpression the expression for the scan
 * @param limiter the read limiter capping the request rate
 * @param action the function to call for each object
 * @param <T> the type of the objects
 */
private static <T> void scanPages(Class<T> clazz, DynamoDBScanExpression scanExpression, RateLimiter limiter,
        Consumer<? super T> action) {
    // declare the page holder and ask DynamoDB to report consumed capacity
    ScanResultPage<T> pageScan;
    scanExpression.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    // initialize counters, permits and mapper
    int permitsToConsume = 1;
    DynamoDBMapper dynamoDBMapper = DBConnector.getInstance().getDynamoDBMapper();
    int scanned = 0;
    int count = 0;
    do {
        // acquire permits and scan one page
        limiter.acquire(permitsToConsume);
        pageScan = dynamoDBMapper.scanPage(clazz, scanExpression);

        // continue the next scan from where this page ended
        scanExpression.setExclusiveStartKey(pageScan.getLastEvaluatedKey());

        // update stats variables
        scanned += pageScan.getScannedCount();
        count += pageScan.getCount();

        // call the action on each result
        pageScan.getResults().forEach(action);

        // base the permits for the next page on the capacity this page consumed
        Double capacityUnits = pageScan.getConsumedCapacity().getCapacityUnits();
        permitsToConsume = (int) (capacityUnits - 1);
        if (permitsToConsume <= 0) {
            permitsToConsume = 1;
        }

        log.info(String.format("Scanned a page for class %s. Results: %d/%d (%d/%d total). Capacity units consumed: %f",
                clazz.getSimpleName(), pageScan.getCount(), pageScan.getScannedCount(), count, scanned, capacityUnits));
    } while (pageScan.getLastEvaluatedKey() != null);
}
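// Hypothetical usage of scanPages above (a sketch, not from the original source):
// assumes a DynamoDBMapper-annotated entity class "User" and a Guava RateLimiter
// capped at 25 read capacity units per second. All names here are illustrative.
private static void scanAllUsers() {
    RateLimiter limiter = RateLimiter.create(25.0);
    DynamoDBScanExpression scanExpression = new DynamoDBScanExpression();
    scanPages(User.class, scanExpression, limiter,
            user -> log.info("Scanned user: " + user));
}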
public RetryResult<ScanResult> scanTable(
        String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer totalSegments,
        Map<String, AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
    final ScanRequest scanRequest = new ScanRequest(tableName)
            .withExclusiveStartKey(exclusiveStartKey)
            .withLimit(Ints.checkedCast(limit))
            .withSegment(segment)
            .withTotalSegments(totalSegments)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    if (dynamoDBQueryFilter != null) {
        Map<String, Condition> scanFilter = dynamoDBQueryFilter.getScanFilter();
        if (!scanFilter.isEmpty()) {
            scanRequest.setScanFilter(scanFilter);
        }
    }

    RetryResult<ScanResult> retryResult = getRetryDriver().runWithRetry(new Callable<ScanResult>() {
        @Override
        public ScanResult call() {
            log.debug("Executing DynamoDB scan: " + scanRequest);
            return dynamoDB.scan(scanRequest);
        }
    }, reporter, PrintCounter.DynamoDBReadThrottle);
    return retryResult;
}
public RetryResult<QueryResult> queryTable(
        String tableName, DynamoDBQueryFilter dynamoDBQueryFilter,
        Map<String, AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
    final QueryRequest queryRequest = new QueryRequest()
            .withTableName(tableName)
            .withExclusiveStartKey(exclusiveStartKey)
            .withKeyConditions(dynamoDBQueryFilter.getKeyConditions())
            .withLimit(Ints.checkedCast(limit))
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    RetryResult<QueryResult> retryResult = getRetryDriver().runWithRetry(
            new Callable<QueryResult>() {
                @Override
                public QueryResult call() {
                    log.debug("Executing DynamoDB query: " + queryRequest);
                    return dynamoDB.query(queryRequest);
                }
            }, reporter, PrintCounter.DynamoDBReadThrottle);
    return retryResult;
}
/**
 * Pipes the log results to the consumer: the table is scanned in parallel
 * while the consumer writes out each result as it arrives.
 */
public void pipe(final AbstractLogConsumer consumer) throws ExecutionException, InterruptedException {
    final DynamoDBTableScan scanner = new DynamoDBTableScan(rateLimit, client);

    final ScanRequest request = new ScanRequest().withTableName(tableName)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withLimit(BootstrapConstants.SCAN_LIMIT)
            .withConsistentRead(consistentScan);

    final ParallelScanExecutor scanService = scanner
            .getParallelScanCompletionService(request, numSegments, threadPool, section, totalSections);

    while (!scanService.finished()) {
        SegmentedScanResult result = scanService.grab();
        consumer.writeResult(result);
    }

    shutdown(true);
    consumer.shutdown(true);
}
/**
 * Sends an update request to the service.
 *
 * @return true if the update was sent, false if there was nothing to update
 */
public boolean sendUpdateRequest(Map<String, AttributeValue> primaryKey, Map<String, AttributeValueUpdate> updateItems,
        Map<String, ExpectedAttributeValue> expectedItems) throws Exception {
    if (updateItems.isEmpty()) {
        return false; // no updates to send
    }
    UpdateItemRequest updateItemRequest = new UpdateItemRequest()
            .withTableName(tableName)
            .withKey(primaryKey)
            .withReturnValues(ReturnValue.UPDATED_NEW)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withAttributeUpdates(updateItems);
    if (expectedItems != null) {
        updateItemRequest.withExpected(expectedItems);
    }

    UpdateItemResult result = dynamoDBClient.updateItem(updateItemRequest);
    if (!isRunningOnDDBLocal) {
        // DDB Local does not support rate limiting
        tableWriteRateLimiter.adjustRateWithConsumedCapacity(result.getConsumedCapacity());
    }
    return true;
}
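// A minimal sketch (not from the original source) of what a consumed-capacity
// based limiter like tableWriteRateLimiter.adjustRateWithConsumedCapacity might
// do internally, assuming a Guava RateLimiter field named "rateLimiter":
private void adjustRateWithConsumedCapacity(ConsumedCapacity consumedCapacity) {
    if (consumedCapacity != null && consumedCapacity.getCapacityUnits() != null) {
        // block until permits covering the units just consumed are available,
        // so sustained throughput stays near the limiter's configured rate
        int permits = Math.max(1, (int) Math.round(consumedCapacity.getCapacityUnits()));
        rateLimiter.acquire(permits);
    }
}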
/**
 * Writes multiple items in batch.
 * @param items a map of tables->write requests
 */
protected static void batchWrite(Map<String, List<WriteRequest>> items) {
    if (items == null || items.isEmpty()) {
        return;
    }
    try {
        BatchWriteItemResult result = getClient().batchWriteItem(new BatchWriteItemRequest().
                withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withRequestItems(items));
        if (result == null) {
            return;
        }
        logger.debug("batchWrite(): total {}, cc {}", items.size(), result.getConsumedCapacity());

        if (result.getUnprocessedItems() != null && !result.getUnprocessedItems().isEmpty()) {
            Thread.sleep(1000);
            logger.warn("{} UNPROCESSED write requests!", result.getUnprocessedItems().size());
            batchWrite(result.getUnprocessedItems());
        }
    } catch (Exception e) {
        logger.error(null, e);
    }
}
/**
 * Splits up a ScanResult into a list of BatchWriteItemRequests of size 25
 * items or less each.
 */
public static List<BatchWriteItemRequest> splitResultIntoBatches(ScanResult result, String tableName) {
    List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
    Iterator<Map<String, AttributeValue>> it = result.getItems().iterator();

    BatchWriteItemRequest req = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    List<WriteRequest> writeRequests = new LinkedList<WriteRequest>();
    int i = 0;
    while (it.hasNext()) {
        PutRequest put = new PutRequest(it.next());
        writeRequests.add(new WriteRequest(put));

        i++;
        if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) {
            req.addRequestItemsEntry(tableName, writeRequests);
            batches.add(req);
            req = new BatchWriteItemRequest()
                    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
            writeRequests = new LinkedList<WriteRequest>();
            i = 0;
        }
    }
    if (i > 0) {
        req.addRequestItemsEntry(tableName, writeRequests);
        batches.add(req);
    }
    return batches;
}
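// Hypothetical driver for splitResultIntoBatches (a sketch, not from the
// original source): splits one scanned page into 25-item batches, writes each
// batch to a destination table, and re-submits anything DynamoDB left
// unprocessed. The "client" field and table name are assumed.
public static void copyScanPage(ScanResult result, String destTableName) {
    for (BatchWriteItemRequest batch : splitResultIntoBatches(result, destTableName)) {
        BatchWriteItemResult batchResult = client.batchWriteItem(batch);
        // retry until the service has accepted every write request
        while (!batchResult.getUnprocessedItems().isEmpty()) {
            batchResult = client.batchWriteItem(new BatchWriteItemRequest()
                    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
                    .withRequestItems(batchResult.getUnprocessedItems()));
        }
    }
}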
@Override
public void run() {
    Map<String, AttributeValue> exclusiveStartKey = null;
    ScanRequest scanRequest = new ScanRequest()
            .withTableName(tableName)
            .withAttributesToGet(attributesToGet)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withTotalSegments(numOfSegments)
            .withSegment(segmentNum);

    boolean scanNumLimitReached = false;
    while (!scanNumLimitReached) {
        scanRequest.withExclusiveStartKey(exclusiveStartKey);
        ScanResult scanResult = dynamoDBClient.scan(scanRequest);
        if (!isRunningOnDDBLocal) {
            // DDB Local does not support rate limiting
            tableReadRateLimiter.adjustRateWithConsumedCapacity(scanResult.getConsumedCapacity());
        }
        for (Map<String, AttributeValue> item : scanResult.getItems()) {
            checkItemViolationAndAddDeleteRequest(item);
            itemsScanned.addAndGet(1);
            itemScannedByThread += 1;
            scanNumLimitReached = isScanNumberLimitReached();
            if (scanNumLimitReached) {
                break;
            }
        }
        if (deleteViolationAfterFound) {
            sendDeleteViolations();
        }
        PrintHelper.printScanProgress(itemsScanned.get(), itemScannedByThread, violationFoundByThread,
                violationDeleteByThread);

        if (null == (exclusiveStartKey = scanResult.getLastEvaluatedKey())) {
            break;
        }
    }
}
/**
 * Reads multiple items from DynamoDB, in batch.
 * @param <P> type of object
 * @param kna a map of table name->keys and attributes to read
 * @param results a map of ID->ParaObject
 */
protected static <P extends ParaObject> void batchGet(Map<String, KeysAndAttributes> kna, Map<String, P> results) {
    if (kna == null || kna.isEmpty() || results == null) {
        return;
    }
    try {
        BatchGetItemResult result = getClient().batchGetItem(new BatchGetItemRequest().
                withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withRequestItems(kna));
        if (result == null) {
            return;
        }

        List<Map<String, AttributeValue>> res = result.getResponses().get(kna.keySet().iterator().next());
        for (Map<String, AttributeValue> item : res) {
            P obj = fromRow(item);
            if (obj != null) {
                results.put(obj.getId(), obj);
            }
        }
        logger.debug("batchGet(): total {}, cc {}", res.size(), result.getConsumedCapacity());

        if (result.getUnprocessedKeys() != null && !result.getUnprocessedKeys().isEmpty()) {
            Thread.sleep(1000);
            logger.warn("{} UNPROCESSED read requests!", result.getUnprocessedKeys().size());
            batchGet(result.getUnprocessedKeys(), results);
        }
    } catch (Exception e) {
        logger.error(null, e);
    }
}
/**
 * Reads a page from a standard DynamoDB table.
 * @param <P> type of object
 * @param appid the app identifier (name)
 * @param p a {@link Pager}
 * @return the objects on this page; the pager's last key is advanced to the
 *         last evaluated key, or to the id of the last result when the end
 *         of the table is reached
 */
public static <P extends ParaObject> List<P> readPageFromTable(String appid, Pager p) {
    Pager pager = (p != null) ? p : new Pager();
    ScanRequest scanRequest = new ScanRequest().
            withTableName(getTableNameForAppid(appid)).
            withLimit(pager.getLimit()).
            withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    if (!StringUtils.isBlank(pager.getLastKey())) {
        scanRequest = scanRequest.withExclusiveStartKey(Collections.
                singletonMap(Config._KEY, new AttributeValue(pager.getLastKey())));
    }

    ScanResult result = getClient().scan(scanRequest);
    LinkedList<P> results = new LinkedList<>();
    for (Map<String, AttributeValue> item : result.getItems()) {
        P obj = fromRow(item);
        if (obj != null) {
            results.add(obj);
        }
    }

    if (result.getLastEvaluatedKey() != null) {
        pager.setLastKey(result.getLastEvaluatedKey().get(Config._KEY).getS());
    } else if (!results.isEmpty()) {
        // set last key to be equal to the last result - end reached.
        pager.setLastKey(results.peekLast().getId());
    }
    return results;
}
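// Hypothetical usage of readPageFromTable (a sketch, not from the original
// source): walks a table page by page, reusing a single Pager so that each
// call resumes from the previous page's last key. "Sysprop" stands in for any
// ParaObject subtype; names are illustrative.
Pager pager = new Pager();
List<Sysprop> page;
do {
    page = readPageFromTable("my-app", pager);
    page.forEach(obj -> logger.info("read {}", obj.getId()));
} while (!page.isEmpty());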
@Override
public Map<String, AttributeValue> put(final Map<String, AttributeValueUpdate> attrs) throws IOException {
    final AmazonDynamoDB aws = this.credentials.aws();
    final Attributes expected = this.attributes.only(this.keys);
    try {
        final UpdateItemRequest request = new UpdateItemRequest()
            .withTableName(this.name)
            .withExpected(expected.asKeys())
            .withKey(expected)
            .withAttributeUpdates(attrs)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withReturnValues(ReturnValue.UPDATED_NEW);
        final long start = System.currentTimeMillis();
        final UpdateItemResult result = aws.updateItem(request);
        Logger.info(
            this,
            "#put('%s'): updated item to DynamoDB, %s, in %[ms]s",
            attrs,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
        return result.getAttributes();
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to put %s into \"%s\" with %s",
                attrs, this.name, this.keys
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}
/**
 * Makes a GetItemRequest for a given attribute.
 * @param attr Attribute name
 * @return GetItemRequest
 */
private GetItemRequest makeItemRequestFor(final String attr) {
    final GetItemRequest request = new GetItemRequest();
    request.setTableName(this.name);
    request.setAttributesToGet(Collections.singletonList(attr));
    request.setKey(this.attributes.only(this.keys));
    request.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    request.setConsistentRead(true);
    return request;
}
@Override
@SuppressWarnings("PMD.UseConcurrentHashMap")
public void remove() {
    synchronized (this.dosage) {
        final AmazonDynamoDB aws = this.credentials.aws();
        try {
            final Dosage prev = this.dosage.get();
            final List<Map<String, AttributeValue>> items =
                new ArrayList<Map<String, AttributeValue>>(prev.items());
            final Map<String, AttributeValue> item = items.remove(this.position);
            final long start = System.currentTimeMillis();
            final DeleteItemResult res = aws.deleteItem(
                new DeleteItemRequest()
                    .withTableName(this.name)
                    .withKey(new Attributes(item).only(this.keys))
                    .withReturnConsumedCapacity(
                        ReturnConsumedCapacity.TOTAL
                    )
                    .withExpected(
                        new Attributes(item).only(this.keys).asKeys()
                    )
            );
            this.dosage.set(new AwsIterator.Fixed(prev, items));
            --this.position;
            Logger.info(
                this,
                "#remove(): item #%d removed from DynamoDB, %s, in %[ms]s",
                this.position,
                new PrintableConsumedCapacity(
                    res.getConsumedCapacity()
                ).print(),
                System.currentTimeMillis() - start
            );
        } finally {
            aws.shutdown();
        }
    }
}
@Override
public Item put(final Map<String, AttributeValue> attributes) throws IOException {
    final AmazonDynamoDB aws = this.credentials.aws();
    try {
        final PutItemRequest request = new PutItemRequest();
        request.setTableName(this.self);
        request.setItem(attributes);
        request.setReturnValues(ReturnValue.NONE);
        request.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        // start the clock before the call, so the logged duration covers it
        final long start = System.currentTimeMillis();
        final PutItemResult result = aws.putItem(request);
        Logger.info(
            this,
            "#put('%[text]s'): created item in '%s', %s, in %[ms]s",
            attributes,
            this.self,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
        return new AwsItem(
            this.credentials,
            this.frame(),
            this.self,
            new Attributes(attributes).only(this.keys()),
            new Array<String>(this.keys())
        );
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to put into \"%s\" with %s",
                this.self, attributes
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}
@Override
public void delete(final Map<String, AttributeValue> attributes) throws IOException {
    final AmazonDynamoDB aws = this.credentials.aws();
    try {
        final DeleteItemRequest request = new DeleteItemRequest();
        request.setTableName(this.self);
        request.setKey(attributes);
        request.setReturnValues(ReturnValue.NONE);
        request.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        // start the clock before the call, so the logged duration covers it
        final long start = System.currentTimeMillis();
        final DeleteItemResult result = aws.deleteItem(request);
        Logger.info(
            this,
            "#delete('%[text]s'): deleted item in '%s', %s, in %[ms]s",
            attributes,
            this.self,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to delete at \"%s\" by keys %s",
                this.self, attributes
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}
@Override
public Dosage fetch(final Credentials credentials, final String table,
    final Map<String, Condition> conditions, final Collection<String> keys) throws IOException {
    final AmazonDynamoDB aws = credentials.aws();
    try {
        final Collection<String> attrs = new HashSet<String>(
            Arrays.asList(this.attributes)
        );
        attrs.addAll(keys);
        final ScanRequest request = new ScanRequest()
            .withTableName(table)
            .withAttributesToGet(attrs)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withScanFilter(conditions)
            .withLimit(this.limit);
        final long start = System.currentTimeMillis();
        final ScanResult result = aws.scan(request);
        Logger.info(
            this,
            "#items(): loaded %d item(s) from '%s' using %s, %s, in %[ms]s",
            result.getCount(),
            table,
            conditions,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
        return new ScanValve.NextDosage(credentials, request, result);
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to fetch from \"%s\" by %s and %s",
                table, conditions, keys
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}
/**
 * Queries Amazon DynamoDB for all geohashes in a range, following the
 * pagination cursor until the result set is exhausted.
 *
 * @param queryRequest the request to reuse across pages
 * @param hashKey hash key for the query request
 * @param range the range of geohashes to query
 * @return the query results, one per page
 */
public List<QueryResult> queryGeohash(QueryRequest queryRequest, long hashKey, GeohashRange range) {
    List<QueryResult> queryResults = new ArrayList<QueryResult>();
    Map<String, AttributeValue> lastEvaluatedKey = null;

    do {
        Map<String, Condition> keyConditions = new HashMap<String, Condition>();

        Condition hashKeyCondition = new Condition().withComparisonOperator(ComparisonOperator.EQ)
                .withAttributeValueList(new AttributeValue().withN(String.valueOf(hashKey)));
        keyConditions.put(config.getHashKeyAttributeName(), hashKeyCondition);

        AttributeValue minRange = new AttributeValue().withN(Long.toString(range.getRangeMin()));
        AttributeValue maxRange = new AttributeValue().withN(Long.toString(range.getRangeMax()));

        Condition geohashCondition = new Condition().withComparisonOperator(ComparisonOperator.BETWEEN)
                .withAttributeValueList(minRange, maxRange);
        keyConditions.put(config.getGeohashAttributeName(), geohashCondition);

        queryRequest.withTableName(config.getTableName()).withKeyConditions(keyConditions)
                .withIndexName(config.getGeohashIndexName()).withConsistentRead(true)
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withExclusiveStartKey(lastEvaluatedKey);

        QueryResult queryResult = config.getDynamoDBClient().query(queryRequest);
        queryResults.add(queryResult);

        lastEvaluatedKey = queryResult.getLastEvaluatedKey();
    } while (lastEvaluatedKey != null);

    return queryResults;
}
protected UpdateItemRequest createUpdateItemRequest() {
    return new UpdateItemRequest()
        .withTableName(tableName)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
protected GetItemRequest createGetItemRequest() {
    return new GetItemRequest()
        .withTableName(tableName)
        .withConsistentRead(forceConsistentRead)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
protected DeleteItemRequest createDeleteItemRequest() {
    return new DeleteItemRequest()
        .withTableName(tableName)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
protected QueryRequest createQueryRequest() {
    return new QueryRequest()
        .withTableName(tableName)
        .withConsistentRead(forceConsistentRead)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
protected ScanRequest createScanRequest() {
    return new ScanRequest().withTableName(tableName)
        .withConsistentRead(forceConsistentRead)
        .withLimit(client.scanLimit(tableName))
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
protected BatchWriteItemRequest genBatchWriteItemRequest() {
    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    requestItems.put(tableName, batchDeleteRequests);
    return new BatchWriteItemRequest()
        .withRequestItems(requestItems)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
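// Hypothetical illustration (a sketch, not from the original source) of how the
// request factories above compose: each one pre-fills the table name, read
// consistency and consumed-capacity reporting, and the caller adds only the
// call-specific pieces. The low-level "dynamo" client and "hashKeyCondition"
// are assumed to exist in the surrounding class.
QueryRequest queryRequest = createQueryRequest()
        .withKeyConditions(Collections.singletonMap("id", hashKeyCondition));
QueryResult queryResult = dynamo.query(queryRequest);

ScanRequest scanRequest = createScanRequest().withSegment(0).withTotalSegments(4);
ScanResult scanResult = dynamo.scan(scanRequest);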
@Override
public Object handleRequest(SegmentScannerInput input, Context context) {
    context.getLogger().log("Input: " + input.toJson() + "\n");
    context.getLogger().log("Start scanning segment " + input.getSegment() + "\n");

    DynamoDB dynamodb = new DynamoDB(Regions.US_WEST_2);

    // update tracking table in DynamoDB stating that we're in progress
    dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem(
            new Item().withPrimaryKey(SEGMENT, input.getSegment())
                    .withString(STATUS, STATUS_IN_PROGRESS));

    ScanSpec scanSpec = new ScanSpec()
            .withMaxPageSize(MAX_PAGE_SIZE)
            .withSegment(input.getSegment())
            .withTotalSegments(input.getTotalSegments())
            .withConsistentRead(true)
            .withMaxResultSize(MAX_RESULT_SIZE)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    // if resuming an in-progress segment, specify the start key here
    if (input.getStartScore() != null) {
        scanSpec.withExclusiveStartKey(SCORE_ID, input.getStartScore());
    }

    RateLimiter rateLimiter = RateLimiter.create(input.getMaxConsumedCapacity());

    Map<String, AttributeValue> lastEvaluatedKey = null;
    Table scoresTable = dynamodb.getTable(SCORE_TABLE_NAME);

    for (Page<Item, ScanOutcome> scanResultPage : scoresTable.scan(scanSpec).pages()) {
        // process items
        for (Item item : scanResultPage) {
            DataTransformer.HIGH_SCORES_BY_DATE_TRANSFORMER.transform(item, dynamodb);
        }

        /*
         * After reading each page, we acquire the consumed capacity from
         * the RateLimiter.
         *
         * For more information on using RateLimiter with DynamoDB scans,
         * see "Rate Limited Scans in Amazon DynamoDB"
         * on the AWS Java Development Blog:
         * https://java.awsblog.com/post/Tx3VAYQIZ3Q0ZVW
         */
        ScanResult scanResult = scanResultPage.getLowLevelResult().getScanResult();
        lastEvaluatedKey = scanResult.getLastEvaluatedKey();
        double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits();
        rateLimiter.acquire((int) Math.round(consumedCapacity));

        // forego processing additional pages if we're running out of time
        if (context.getRemainingTimeInMillis() < REMAINING_TIME_CUTOFF) {
            break;
        }
    }

    if (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty()) {
        Entry<String, AttributeValue> entry = lastEvaluatedKey.entrySet().iterator().next();
        String lastScoreId = entry.getValue().getS();
        dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem(
                new Item()
                        .withPrimaryKey(SEGMENT, input.getSegment())
                        .withString(STATUS, STATUS_INCOMPLETE)
                        .withString(LAST_SCORE_ID, lastScoreId));
        return false;
    }

    // update tracking table in DynamoDB stating that we're done
    dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem(
            new Item().withPrimaryKey(SEGMENT, input.getSegment())
                    .withString(STATUS, STATUS_DONE));
    context.getLogger().log("Finish scanning segment " + input.getSegment() + "\n");
    return true;
}
private void writeToDynamo() throws RetryFailedException, InterruptedException {
    Map<String, AttributeValue> attributeValueMap = indexInput.getAttributes().entrySet()
            .stream()
            .filter((entry) -> entry.getValue() != null)
            .filter((entry) -> StringUtils.isNotBlank(entry.getValue().toString()))
            .collect(Collectors.toMap(
                    e -> e.getKey().toString(),
                    e -> {
                        try {
                            switch (e.getKey().getType()) {
                                case LONG:
                                case DOUBLE:
                                    return new AttributeValue().withN(e.getValue().toString());
                                default:
                                    String serializedData = null;
                                    // use objectMapper for everything but strings because it
                                    // adds quotes to raw strings
                                    if (e.getValue().getClass() != String.class) {
                                        serializedData = objectMapper.writeValueAsString(e.getValue());
                                    } else {
                                        serializedData = (String) e.getValue();
                                    }
                                    return new AttributeValue().withS(serializedData);
                            }
                        } catch (Exception ex) {
                            logger.error("Couldn't serialize data to index.", ex);
                            return null;
                        }
                    }
            ));

    String tableName = config.getString(TABLE_NAME_CONFIG_KEY);
    PutItemRequest putItemRequest = new PutItemRequest()
            .withTableName(tableName)
            .withItem(attributeValueMap)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    final PutItemResult putItemResult = retry(() -> dynamoDBClient.putItem(putItemRequest));
    logger.info("Consumed table capacity: " + putItemResult.getConsumedCapacity().getCapacityUnits());
}
private static void writeMultipleItemsBatchWrite() {
    try {
        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));

        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put(table1Name, forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
        threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message"));
        threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));

        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));

        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put(table2Name, threadList);

        BatchWriteItemResult result;
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

        do {
            System.out.println("Making the request.");

            batchWriteItemRequest.withRequestItems(requestItems);
            result = client.batchWriteItem(batchWriteItemRequest);

            // Print consumed capacity units
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
            }

            // Check for unprocessed keys which could happen if you exceed provisioned throughput
            System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
            requestItems = result.getUnprocessedItems();
        } while (result.getUnprocessedItems().size() > 0);

    } catch (AmazonServiceException ase) {
        System.err.println("Failed to retrieve items: ");
        ase.printStackTrace(System.err);
    }
}
@Override
public Dosage fetch(final Credentials credentials, final String table,
    final Map<String, Condition> conditions, final Collection<String> keys) throws IOException {
    final AmazonDynamoDB aws = credentials.aws();
    try {
        final Collection<String> attrs = new HashSet<String>(
            Arrays.asList(this.attributes)
        );
        attrs.addAll(keys);
        QueryRequest request = new QueryRequest()
            .withTableName(table)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withKeyConditions(conditions)
            .withConsistentRead(this.consistent)
            .withScanIndexForward(this.forward)
            .withSelect(this.select)
            .withLimit(this.limit);
        if (this.select.equals(Select.SPECIFIC_ATTRIBUTES.toString())) {
            request = request.withAttributesToGet(attrs);
        }
        if (!this.index.isEmpty()) {
            request = request.withIndexName(this.index);
        }
        final long start = System.currentTimeMillis();
        final QueryResult result = aws.query(request);
        Logger.info(
            this,
            "#items(): loaded %d item(s) from '%s' using %s, %s, in %[ms]s",
            result.getCount(),
            table,
            conditions,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
        return new QueryValve.NextDosage(credentials, request, result);
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to fetch from \"%s\" by %s and %s",
                table, conditions, keys
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}