private ScanResult getHashNumberRangeKeyItems(String[] hashKeys, String hashType) {
    List<Map<String, AttributeValue>> items = new ArrayList<>();
    for (String key : hashKeys) {
        for (int i = 0; i < NUM_RANGE_KEYS_PER_HASH_KEY; i++) {
            Map<String, AttributeValue> item = new HashMap<>();
            if (hashType.equals("S")) {
                item.put("hashKey", new AttributeValue(key));
            } else {
                item.put("hashKey", new AttributeValue().withN(key));
            }
            item.put("rangeKey", new AttributeValue().withN("0" + i));
            items.add(item);
        }
    }
    return new ScanResult().withScannedCount(items.size())
        .withItems(items)
        .withConsumedCapacity(new ConsumedCapacity().withCapacityUnits(1d));
}
@Test
public void execute() {
    Map<String, Condition> scanFilter = new HashMap<String, Condition>();
    Condition condition = new Condition()
        .withComparisonOperator(ComparisonOperator.GT.toString())
        .withAttributeValueList(new AttributeValue().withN("1985"));
    scanFilter.put("year", condition);
    exchange.getIn().setHeader(DdbConstants.SCAN_FILTER, scanFilter);

    command.execute();

    Map<String, AttributeValue> mapAssert = new HashMap<String, AttributeValue>();
    mapAssert.put("1", new AttributeValue("LAST_KEY"));
    ConsumedCapacity consumed = (ConsumedCapacity) exchange.getIn()
        .getHeader(DdbConstants.CONSUMED_CAPACITY);
    assertEquals(scanFilter, ddbClient.scanRequest.getScanFilter());
    assertEquals(Integer.valueOf(10), exchange.getIn().getHeader(DdbConstants.SCANNED_COUNT, Integer.class));
    assertEquals(Integer.valueOf(1), exchange.getIn().getHeader(DdbConstants.COUNT, Integer.class));
    assertEquals(Double.valueOf(1.0), consumed.getCapacityUnits());
    assertEquals(mapAssert, exchange.getIn().getHeader(DdbConstants.LAST_EVALUATED_KEY, Map.class));
    Map<?, ?> items = (Map<?, ?>) exchange.getIn().getHeader(DdbConstants.ITEMS, List.class).get(0);
    assertEquals(new AttributeValue("attrValue"), items.get("attrName"));
}
@Override
public QueryResultWrapper next() throws BackendException {
    final Query backoff = new ExponentialBackoff.Query(request, delegate, permitsToConsume);
    final QueryResult result = backoff.runWithBackoff();

    final ConsumedCapacity consumedCapacity = result.getConsumedCapacity();
    if (null != consumedCapacity) {
        // Base the permits for the next page on what this page actually consumed
        permitsToConsume = Math.max((int) (consumedCapacity.getCapacityUnits() - 1.0), 1);
        totalCapacityUnits += consumedCapacity.getCapacityUnits();
    }

    if (result.getLastEvaluatedKey() != null && !result.getLastEvaluatedKey().isEmpty()) {
        request.setExclusiveStartKey(result.getLastEvaluatedKey());
    } else {
        markComplete();
    }

    // Update the returned and scanned counts, then collect this page's items
    returnedCount += result.getCount();
    scannedCount += result.getScannedCount();
    finalItemList.addAll(result.getItems());

    return new QueryResultWrapper(titanKey, result);
}
@Override
public void write(K key, V value) throws IOException {
    if (value == null) {
        throw new RuntimeException("Null record encountered. At least the key columns must be specified.");
    }

    verifyInterval();
    if (progressable != null) {
        progressable.progress();
    }

    DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
    BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
        permissibleWritesPerSecond - writesPerSecond, reporter);

    batchSize++;
    totalItemsWritten++;

    if (result != null) {
        if (result.getConsumedCapacity() != null) {
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                double consumedUnits = consumedCapacity.getCapacityUnits();
                totalIOPSConsumed += consumedUnits;
            }
        }

        int unprocessedItems = 0;
        for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
            unprocessedItems += requests.size();
        }
        // Only items that actually went through count against the write rate;
        // unprocessed items stay in the batch for the next attempt.
        writesPerSecond += batchSize - unprocessedItems;
        batchSize = unprocessedItems;
    }
}
private ScanResult getHashKeyItems(String[] hashKeys, String type) {
    List<Map<String, AttributeValue>> items = new ArrayList<>();
    for (String key : hashKeys) {
        Map<String, AttributeValue> item = new HashMap<>();
        if (type.equals("S")) {
            item.put("hashKey", new AttributeValue(key));
        } else {
            item.put("hashKey", new AttributeValue().withN(key));
        }
        items.add(item);
    }
    return new ScanResult().withScannedCount(items.size())
        .withItems(items)
        .withConsumedCapacity(new ConsumedCapacity().withCapacityUnits(1d));
}
@Test
public void execute() {
    Map<String, AttributeValue> startKey = new HashMap<String, AttributeValue>();
    startKey.put("1", new AttributeValue("startKey"));

    List<String> attributeNames = Arrays.asList("attrNameOne", "attrNameTwo");
    exchange.getIn().setHeader(DdbConstants.ATTRIBUTE_NAMES, attributeNames);
    exchange.getIn().setHeader(DdbConstants.CONSISTENT_READ, true);
    exchange.getIn().setHeader(DdbConstants.START_KEY, startKey);
    exchange.getIn().setHeader(DdbConstants.LIMIT, 10);
    exchange.getIn().setHeader(DdbConstants.SCAN_INDEX_FORWARD, true);

    Map<String, Condition> keyConditions = new HashMap<String, Condition>();
    Condition condition = new Condition()
        .withComparisonOperator(ComparisonOperator.GT.toString())
        .withAttributeValueList(new AttributeValue().withN("1985"));
    keyConditions.put("1", condition);
    exchange.getIn().setHeader(DdbConstants.KEY_CONDITIONS, keyConditions);

    command.execute();

    Map<String, AttributeValue> mapAssert = new HashMap<String, AttributeValue>();
    mapAssert.put("1", new AttributeValue("LAST_KEY"));
    ConsumedCapacity consumed = (ConsumedCapacity) exchange.getIn()
        .getHeader(DdbConstants.CONSUMED_CAPACITY);
    assertEquals(Integer.valueOf(1), exchange.getIn().getHeader(DdbConstants.COUNT, Integer.class));
    assertEquals(Double.valueOf(1.0), consumed.getCapacityUnits());
    assertEquals(mapAssert, exchange.getIn().getHeader(DdbConstants.LAST_EVALUATED_KEY, Map.class));
    assertEquals(keyConditions, exchange.getIn().getHeader(DdbConstants.KEY_CONDITIONS, Map.class));
    Map<?, ?> items = (Map<?, ?>) exchange.getIn().getHeader(DdbConstants.ITEMS, List.class).get(0);
    assertEquals(new AttributeValue("attrValue"), items.get("attrName"));
}
@SuppressWarnings("unchecked") @Override public ScanResult scan(ScanRequest scanRequest) { this.scanRequest = scanRequest; ConsumedCapacity consumed = new ConsumedCapacity(); consumed.setCapacityUnits(1.0); Map<String, AttributeValue> lastEvaluatedKey = new HashMap<String, AttributeValue>(); lastEvaluatedKey.put("1", new AttributeValue("LAST_KEY")); return new ScanResult() .withConsumedCapacity(consumed) .withCount(1) .withItems(getAttributes()) .withScannedCount(10) .withLastEvaluatedKey(lastEvaluatedKey); }
@SuppressWarnings("unchecked") @Override public QueryResult query(QueryRequest queryRequest) { this.queryRequest = queryRequest; ConsumedCapacity consumed = new ConsumedCapacity(); consumed.setCapacityUnits(1.0); Map<String, AttributeValue> lastEvaluatedKey = new HashMap<String, AttributeValue>(); lastEvaluatedKey.put("1", new AttributeValue("LAST_KEY")); return new QueryResult() .withConsumedCapacity(consumed) .withCount(1) .withItems(getAttributes()) .withLastEvaluatedKey(lastEvaluatedKey); }
@Override
protected QueryResultWrapper getMergedPages() {
    final QueryResult mergedDynamoResult = new QueryResult()
        .withItems(getFinalItemList())
        .withCount(returnedCount)
        .withScannedCount(scannedCount)
        .withConsumedCapacity(new ConsumedCapacity()
            .withTableName(request.getTableName())
            .withCapacityUnits(totalCapacityUnits));
    return new QueryResultWrapper(titanKey, mergedDynamoResult);
}
@Override
public SegmentedScanResult call() {
    ScanResult result = runWithBackoff();

    final ConsumedCapacity cc = result.getConsumedCapacity();
    if (cc != null && cc.getCapacityUnits() != null) {
        lastConsumedCapacity = cc.getCapacityUnits().intValue();
    } else if (result.getScannedCount() != null && result.getCount() != null) {
        // The service did not report consumed capacity, so estimate it from
        // the scanned-to-returned ratio and the byte size of the returned page.
        final boolean isConsistent = request.getConsistentRead();
        int itemSize = isConsistent
            ? BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE
            : BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE;
        lastConsumedCapacity = (result.getScannedCount() / (int) Math.max(1.0, result.getCount()))
            * (ItemSizeCalculator.calculateScanResultSizeInBytes(result) / itemSize);
    }

    if (result.getLastEvaluatedKey() != null && !result.getLastEvaluatedKey().isEmpty()) {
        hasNext = true;
        request.setExclusiveStartKey(result.getLastEvaluatedKey());
    } else {
        hasNext = false;
    }

    if (lastConsumedCapacity > 0) {
        rateLimiter.acquire(lastConsumedCapacity);
    }
    return new SegmentedScanResult(result, request.getSegment());
}
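/*
 * A worked instance of the fallback estimate above (illustrative only; the
 * exact BootstrapConstants byte values are assumptions, not confirmed here):
 * with scannedCount = 100, count = 50, a page size of 409,600 bytes, and a
 * strongly consistent item size of 4,096 bytes per read unit, the worker
 * acquires (100 / 50) * (409600 / 4096) = 2 * 100 = 200 permits for this page.
 */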
/**
 * Batch writes the write request to the DynamoDB endpoint and THEN acquires
 * permits equal to the consumed capacity of the write.
 */
@Override
public Void call() {
    List<ConsumedCapacity> batchResult = runWithBackoff(batch);
    Iterator<ConsumedCapacity> it = batchResult.iterator();
    int consumedCapacity = 0;
    while (it.hasNext()) {
        consumedCapacity += it.next().getCapacityUnits().intValue();
    }
    rateLimiter.acquire(consumedCapacity);
    return null;
}
/**
 * Writes to a DynamoDB table using exponential backoff. If the
 * batchWriteItem call returns unprocessed items, it retries them after an
 * exponentially growing delay, capped at MAX_EXPONENTIAL_BACKOFF_TIME.
 */
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
    BatchWriteItemResult writeItemResult = null;
    List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
    Map<String, List<WriteRequest>> unprocessedItems = null;
    boolean interrupted = false;
    try {
        do {
            writeItemResult = client.batchWriteItem(req);
            unprocessedItems = writeItemResult.getUnprocessedItems();
            // Consumed capacity is only returned when the request asks for it,
            // so guard against a null list before accumulating.
            if (writeItemResult.getConsumedCapacity() != null) {
                consumedCapacities.addAll(writeItemResult.getConsumedCapacity());
            }

            // Only back off when something was actually left unprocessed;
            // an empty map means the whole batch went through.
            if (unprocessedItems != null && !unprocessedItems.isEmpty()) {
                req.setRequestItems(unprocessedItems);
                try {
                    Thread.sleep(exponentialBackoffTime);
                } catch (InterruptedException ie) {
                    interrupted = true;
                } finally {
                    exponentialBackoffTime *= 2;
                    if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
                        exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
                    }
                }
            }
        } while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
        return consumedCapacities;
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
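/*
 * Minimal usage sketch (an illustration, not part of the original code). The
 * null-guard above matters because DynamoDB only populates consumed capacity
 * when the request opts in via ReturnConsumedCapacity; "my-table" and
 * writeRequests below are placeholders:
 */
Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
requestItems.put("my-table", writeRequests);
BatchWriteItemRequest req = new BatchWriteItemRequest()
    .withRequestItems(requestItems)
    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
List<ConsumedCapacity> consumed = runWithBackoff(req);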
public void adjustRateWithConsumedCapacity(ConsumedCapacity consumedCapacity) {
    accumulatedReadWritePermits += consumedCapacity.getCapacityUnits();
    if (accumulatedReadWritePermits > 1.0) {
        int intValueOfPermits = (int) accumulatedReadWritePermits;
        rateLimiter.acquire(intValueOfPermits);
        accumulatedReadWritePermits -= intValueOfPermits;
    }
}
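/*
 * A minimal wiring sketch, assuming the rateLimiter field is a Guava
 * RateLimiter (the 100-unit-per-second budget is an illustrative assumption):
 */
// Cap this worker at roughly 100 capacity units per second.
RateLimiter rateLimiter = RateLimiter.create(100.0);
// After each DynamoDB call, feed back what the service reported; fractional
// units accumulate until at least one whole permit can be acquired:
// adjustRateWithConsumedCapacity(result.getConsumedCapacity());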
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
    createTable();
    String TEST_ATTRIBUTE_2 = "Attribute2";
    String TEST_ATTRIBUTE_VALUE_2 = "AttributeValue2";

    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();

    Map<String, AttributeValue> item1 = new HashMap<String, AttributeValue>();
    item1.put(TEST_ATTRIBUTE, new AttributeValue().withS(TEST_ATTRIBUTE_VALUE));
    WriteRequest writeRequest1 = new WriteRequest().withPutRequest(new PutRequest().withItem(item1));
    writeRequests.add(writeRequest1);

    Map<String, AttributeValue> item2 = new HashMap<String, AttributeValue>();
    item2.put(TEST_ATTRIBUTE_2, new AttributeValue().withS(TEST_ATTRIBUTE_VALUE_2));
    WriteRequest writeRequest2 = new WriteRequest().withPutRequest(new PutRequest().withItem(item2));
    writeRequests.add(writeRequest2);

    requestItems.put(TEST_TABLE_NAME, writeRequests);

    BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);
    List<ConsumedCapacity> consumedCapacities = result.getConsumedCapacity();
    assertThat(consumedCapacities.size(), equalTo(writeRequests.size()));
}
/**
 * QueryValve can fetch data from AWS.
 * @throws Exception If some problem inside
 */
@Test
@SuppressWarnings("unchecked")
public void fetchesData() throws Exception {
    final Valve valve = new QueryValve();
    final Credentials credentials = Mockito.mock(Credentials.class);
    final ImmutableMap<String, AttributeValue> item =
        new ImmutableMap.Builder<String, AttributeValue>().build();
    final AmazonDynamoDB aws = Mockito.mock(AmazonDynamoDB.class);
    Mockito.doReturn(aws).when(credentials).aws();
    Mockito.doReturn(
        new QueryResult()
            .withItems(
                Collections.<Map<String, AttributeValue>>singletonList(item)
            )
            .withConsumedCapacity(
                new ConsumedCapacity().withCapacityUnits(1.0d)
            )
    ).when(aws).query(Mockito.any(QueryRequest.class));
    final Dosage dosage = valve.fetch(
        credentials, "table",
        new Conditions(), new ArrayList<String>(0)
    );
    MatcherAssert.assertThat(dosage.hasNext(), Matchers.is(false));
    MatcherAssert.assertThat(dosage.items(), Matchers.hasItem(item));
}
/**
 * ScanValve can fetch data from AWS.
 * @throws Exception If some problem inside
 */
@Test
@SuppressWarnings("unchecked")
public void fetchesData() throws Exception {
    final Valve valve = new ScanValve();
    final Credentials credentials = Mockito.mock(Credentials.class);
    final ImmutableMap<String, AttributeValue> item =
        new ImmutableMap.Builder<String, AttributeValue>().build();
    final AmazonDynamoDB aws = Mockito.mock(AmazonDynamoDB.class);
    Mockito.doReturn(aws).when(credentials).aws();
    Mockito.doReturn(
        new ScanResult()
            .withItems(
                Collections.<Map<String, AttributeValue>>singletonList(item)
            )
            .withConsumedCapacity(
                new ConsumedCapacity().withCapacityUnits(1d)
            )
    ).when(aws).scan(Mockito.any(ScanRequest.class));
    final Dosage dosage = valve.fetch(
        credentials, "table",
        new Conditions(), new ArrayList<String>(0)
    );
    MatcherAssert.assertThat(dosage.hasNext(), Matchers.is(false));
    MatcherAssert.assertThat(dosage.items(), Matchers.hasItem(item));
}
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchRequest) throws BackendException {
    int count = 0;
    for (Entry<String, List<WriteRequest>> entry : batchRequest.getRequestItems().entrySet()) {
        final String tableName = entry.getKey();
        final List<WriteRequest> requests = entry.getValue();
        count += requests.size();
        if (count > BATCH_WRITE_MAX_NUMBER_OF_ITEMS) {
            throw new IllegalArgumentException(
                "Cannot have more than " + BATCH_WRITE_MAX_NUMBER_OF_ITEMS + " requests in a batch write");
        }
        for (final WriteRequest request : requests) {
            if ((request.getPutRequest() != null) == (request.getDeleteRequest() != null)) {
                throw new IllegalArgumentException(
                    "Exactly one of PutRequest or DeleteRequest must be set in each WriteRequest in a batch write operation");
            }
            final int wcu;
            final String apiName;
            if (request.getPutRequest() != null) {
                apiName = PUT_ITEM;
                final int bytes = calculateItemSizeInBytes(request.getPutRequest().getItem());
                wcu = computeWcu(bytes);
            } else { // DeleteRequest
                apiName = DELETE_ITEM;
                wcu = estimateCapacityUnits(apiName, tableName);
            }
            timedWriteThrottle(apiName, tableName, wcu);
        }
    }

    BatchWriteItemResult result;
    setUserAgent(batchRequest);
    final Timer.Context apiTimerContext = getTimerContext(BATCH_WRITE_ITEM, null /*tableName*/);
    try {
        result = client.batchWriteItem(batchRequest);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, BATCH_WRITE_ITEM, null /*tableName*/);
    } finally {
        apiTimerContext.stop();
    }
    if (result.getConsumedCapacity() != null) {
        for (ConsumedCapacity ccu : result.getConsumedCapacity()) {
            meterConsumedCapacity(BATCH_WRITE_ITEM, ccu);
        }
    }
    return result;
}
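/*
 * computeWcu is not shown in this excerpt. A plausible sketch, based on the
 * published DynamoDB rule that one write capacity unit covers an item of up
 * to 1 KB, rounded up (an assumption about this codebase, not a confirmed
 * detail of it):
 */
private static int computeWcu(final int bytes) {
    // One WCU per started kilobyte, and at least one unit per write.
    return Math.max(1, (int) Math.ceil(bytes / 1024.0));
}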
private void meterConsumedCapacity(final String apiName, final ConsumedCapacity ccu) {
    if (ccu != null) {
        getConsumedCapacityMeter(apiName, ccu.getTableName()).mark(Math.round(ccu.getCapacityUnits()));
    }
}
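/*
 * getConsumedCapacityMeter is not shown here; a minimal sketch of what it
 * might look like with Dropwizard Metrics (the metricRegistry field and the
 * naming scheme are assumptions for illustration):
 */
private Meter getConsumedCapacityMeter(final String apiName, final String tableName) {
    // MetricRegistry.meter() creates the meter on first use and caches it afterwards.
    return metricRegistry.meter(MetricRegistry.name(apiName, tableName, "ConsumedCapacity"));
}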
public void adjustRateWithConsumedCapacity(List<ConsumedCapacity> batchWriteConsumedCapacity) {
    for (ConsumedCapacity consumedCapacity : batchWriteConsumedCapacity) {
        adjustRateWithConsumedCapacity(consumedCapacity);
    }
}
private static void writeMultipleItemsBatchWrite() {
    try {
        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));

        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put(table1Name, forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
        threadItem.put("Message", new AttributeValue().withS("ElastiCache Thread 1 message"));
        threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));

        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put(table2Name, threadList);

        BatchWriteItemResult result;
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

        do {
            System.out.println("Making the request.");

            batchWriteItemRequest.withRequestItems(requestItems);
            result = client.batchWriteItem(batchWriteItemRequest);

            // Print consumed capacity units
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
            }

            // Check for unprocessed keys, which can happen if provisioned throughput is exceeded
            System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
            requestItems = result.getUnprocessedItems();
        } while (result.getUnprocessedItems().size() > 0);
    } catch (AmazonServiceException ase) {
        System.err.println("Failed to write items: ");
        ase.printStackTrace(System.err);
    }
}
/**
 * AwsTable can save an item.
 * @throws Exception If some problem inside
 */
@Test
public void savesItemToDynamo() throws Exception {
    final Credentials credentials = Mockito.mock(Credentials.class);
    final AmazonDynamoDB aws = Mockito.mock(AmazonDynamoDB.class);
    Mockito.doReturn(aws).when(credentials).aws();
    Mockito.doReturn(
        new PutItemResult().withConsumedCapacity(
            new ConsumedCapacity().withCapacityUnits(1.0d)
        )
    ).when(aws).putItem(Mockito.any(PutItemRequest.class));
    Mockito.doReturn(
        new DescribeTableResult().withTable(
            new TableDescription().withKeySchema(
                new KeySchemaElement().withAttributeName(AwsTableTest.KEY)
            )
        )
    ).when(aws).describeTable(Mockito.any(DescribeTableRequest.class));
    final String attr = "attribute-1";
    final AttributeValue value = new AttributeValue("value-1");
    final String name = "table-name";
    final Table table = new AwsTable(
        credentials, Mockito.mock(Region.class), name
    );
    table.put(new Attributes().with(attr, value));
    Mockito.verify(aws).putItem(
        PutItemRequest.class.cast(
            MockitoHamcrest.argThat(
                Matchers.allOf(
                    Matchers.hasProperty(
                        AwsTableTest.TABLE_NAME,
                        Matchers.equalTo(name)
                    ),
                    Matchers.hasProperty(
                        "item",
                        Matchers.hasEntry(
                            Matchers.equalTo(attr),
                            Matchers.equalTo(value)
                        )
                    )
                )
            )
        )
    );
}
/**
 * AwsTable can delete an item.
 * @throws Exception If some problem inside
 */
@Test
public void deletesItemFromDynamo() throws Exception {
    final Credentials credentials = Mockito.mock(Credentials.class);
    final AmazonDynamoDB aws = Mockito.mock(AmazonDynamoDB.class);
    Mockito.doReturn(aws).when(credentials).aws();
    Mockito.doReturn(
        new DeleteItemResult().withConsumedCapacity(
            new ConsumedCapacity().withCapacityUnits(1.0d)
        )
    ).when(aws).deleteItem(Mockito.any(DeleteItemRequest.class));
    Mockito.doReturn(
        new DescribeTableResult().withTable(
            new TableDescription().withKeySchema(
                new KeySchemaElement().withAttributeName(AwsTableTest.KEY)
            )
        )
    ).when(aws).describeTable(Mockito.any(DescribeTableRequest.class));
    final String attr = "attribute-2";
    final AttributeValue value = new AttributeValue("value-2");
    final String name = "table-name-2";
    final Table table = new AwsTable(
        credentials, Mockito.mock(Region.class), name
    );
    table.delete(new Attributes().with(attr, value));
    Mockito.verify(aws).deleteItem(
        DeleteItemRequest.class.cast(
            MockitoHamcrest.argThat(
                Matchers.allOf(
                    Matchers.hasProperty(
                        AwsTableTest.TABLE_NAME,
                        Matchers.equalTo(name)
                    ),
                    Matchers.hasProperty(
                        AwsTableTest.KEY,
                        Matchers.hasEntry(
                            Matchers.equalTo(attr),
                            Matchers.equalTo(value)
                        )
                    )
                )
            )
        )
    );
}
/**
 * Default ctor.
 * @param capacity Consumed capacity
 */
PrintableConsumedCapacity(final ConsumedCapacity capacity) {
    this.capacity = capacity;
}
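/*
 * A hypothetical companion method showing the intent of this wrapper: render
 * the capacity as a log-friendly string while tolerating a null value, which
 * occurs when the request did not ask for ReturnConsumedCapacity. This sketch
 * is an illustration, not part of the original class:
 */
public String print() {
    // Fall back to an empty string when no capacity was returned.
    return this.capacity == null
        ? ""
        : String.format("%.1f units", this.capacity.getCapacityUnits());
}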