/**
 * Scans the "Movies" table on a local DynamoDB endpoint and prints every
 * item whose year lies between 1950 and 1959 (bounds as given to the
 * {@code between} filter), projecting only year, title and info.rating.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final AmazonDynamoDBClient localClient = new AmazonDynamoDBClient();
    localClient.setEndpoint("http://localhost:8000");
    final Table movies = new DynamoDB(localClient).getTable("Movies");

    // "#yr" is an expression placeholder that is substituted with the attribute name "year".
    final NameMap attributeNames = new NameMap().with("#yr", "year");
    final ValueMap yearBounds = new ValueMap()
            .withNumber(":start_yr", 1950)
            .withNumber(":end_yr", 1959);

    final ScanSpec fiftiesOnly = new ScanSpec()
            .withProjectionExpression("#yr, title, info.rating")
            .withFilterExpression("#yr between :start_yr and :end_yr")
            .withNameMap(attributeNames)
            .withValueMap(yearBounds);

    for (Item movie : movies.scan(fiftiesOnly)) {
        System.out.println(movie.toString());
    }
}
@Override public List<T> getAll() { //ToDo: implement batching to counter large tables and Dynamo request limits return StreamSupport.stream(table.scan(new ScanSpec()).spliterator(), true) .map(i -> converter.fromDynamo(i.asMap())) .collect(Collectors.toList()); }
/**
 * Fetches one page of scan results and moves the page iterator's marker forward.
 *
 * The marker is set from the page's last evaluated key when that key is
 * present and non-empty, and cleared otherwise. When the requested page size
 * is not positive an empty outcome is returned and the marker is left alone.
 *
 * @param scanSpec     scan specification; it is passed through {@code withMarker} before use
 * @param pageIterator supplies the page size and start marker and receives the next marker
 * @return the low-level outcome of the first page, or an outcome wrapping an
 *         empty {@code ScanResult} when there is nothing to return
 * @throws IllegalStateException if {@code _convertMarker} has not been set
 */
private ScanOutcome scan(ScanSpec scanSpec, PageIterator pageIterator) {
    if (_convertMarker == null) {
        throw new IllegalStateException("Index must first be initialized with ConvertMarker");
    }
    if (pageIterator.getPageSize() <= 0) {
        return new ScanOutcome(new ScanResult());
    }

    // NOTE(review): maybeBackoff presumably retries the call with back-off — confirm against its definition.
    final ItemCollection<ScanOutcome> scanned =
        maybeBackoff(true, () -> _scan.scan(withMarker(scanSpec, pageIterator)));
    if (scanned == null) {
        pageIterator.setMarker(null);
        return new ScanOutcome(new ScanResult());
    }

    final Iterator<Page<Item, ScanOutcome>> pages = scanned.pages().iterator();
    if (!pages.hasNext()) {
        pageIterator.setMarker(null);
        return new ScanOutcome(new ScanResult());
    }

    // Only the first page is consumed; the marker tells the caller where to resume.
    final ScanOutcome firstPage = maybeBackoff(true, () -> pages.next().getLowLevelResult());
    final ScanResult lowLevel = firstPage.getScanResult();
    final Map<String, AttributeValue> resumeKey = lowLevel.getLastEvaluatedKey();
    if (resumeKey == null || resumeKey.isEmpty()) {
        pageIterator.setMarker(null);
    } else {
        pageIterator.setMarker(toMarker(resumeKey, false));
    }
    return firstPage;
}
/**
 * Applies the page iterator's start marker and page size to the given scan
 * specification. The specification is modified in place and returned.
 *
 * @param spec         scan specification to configure
 * @param pageIterator source of the direction, marker and page size
 * @return the same {@code spec} instance
 * @throws IllegalArgumentException if the page iterator is not a forward iterator
 */
private ScanSpec withMarker(ScanSpec spec, PageIterator pageIterator) {
    if (!pageIterator.isForward()) {
        throw new IllegalArgumentException("Backward scans are not supported");
    }

    // Resume after the marker only when one is present.
    if (pageIterator.getMarker() != null) {
        spec.withExclusiveStartKey(fromMarker(null, pageIterator.getMarker()));
    }

    // Integer.MAX_VALUE is treated as "no page size limit".
    final int requestedPageSize = pageIterator.getPageSize();
    if (requestedPageSize != Integer.MAX_VALUE) {
        spec.withMaxPageSize(requestedPageSize);
    }
    return spec;
}
/**
 * Scans one segment of {@code tableName} and prints every item found.
 *
 * Any exception raised during the scan is reported on stderr and ends the
 * scan of this segment; the number of items actually retrieved is always
 * printed at the end.
 */
@Override
public void run() {
    System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments
            + " segments " + itemLimit + " items at a time...");
    int totalScannedItemCount = 0;
    Table table = dynamoDB.getTable(tableName);
    try {
        // NOTE(review): withMaxResultSize is documented as a cap on the total number of
        // items retrieved, whereas the log line above says "items at a time" — confirm
        // whether withMaxPageSize was intended.
        ScanSpec spec = new ScanSpec()
                .withMaxResultSize(itemLimit)
                .withTotalSegments(totalSegments)
                .withSegment(segment);

        ItemCollection<ScanOutcome> items = table.scan(spec);
        Iterator<Item> iterator = items.iterator();
        while (iterator.hasNext()) {
            Item currentItem = iterator.next();
            // Count only after the item was fetched successfully, so a failing
            // next() does not inflate the total reported in the finally block.
            totalScannedItemCount++;
            System.out.println(currentItem.toString());
        }
    } catch (Exception e) {
        System.err.println(e.getMessage());
    } finally {
        System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment
                + " out of " + totalSegments + " of " + tableName);
    }
}
/**
 * Scans one page of items using an unrestricted {@code ScanSpec}.
 *
 * @param pageIterator paging state handed to {@code scan}
 * @return the scan outcome converted by {@code toList}
 */
@Override
public List<T> scanItems(PageIterator pageIterator) {
    final ScanSpec unfiltered = new ScanSpec();
    return toList(scan(unfiltered, pageIterator));
}
/**
 * Runs a scan with {@code Select.COUNT} for one page.
 *
 * @param pageIterator paging state handed to {@code scan}
 * @return the scan outcome converted by {@code toCount}
 */
@Override
public int countItems(PageIterator pageIterator) {
    final ScanSpec countOnly = new ScanSpec().withSelect(Select.COUNT);
    return toCount(scan(countOnly, pageIterator));
}
@Override public Object handleRequest(SegmentScannerInput input, Context context) { context.getLogger().log("Input: " + input.toJson() + "\n"); context.getLogger().log("Start scanning segment " + input.getSegment() + "\n"); DynamoDB dynamodb = new DynamoDB(Regions.US_WEST_2); // update tracking table in DynamoDB stating that we're in progress dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item().withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_IN_PROGRESS)); ScanSpec scanSpec = new ScanSpec() .withMaxPageSize(MAX_PAGE_SIZE) .withSegment(input.getSegment()) .withTotalSegments(input.getTotalSegments()) .withConsistentRead(true) .withMaxResultSize(MAX_RESULT_SIZE) .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); // if resuming an in-progress segment, specify the start key here if (input.getStartScore() != null) { scanSpec.withExclusiveStartKey(SCORE_ID, input.getStartScore()); } RateLimiter rateLimiter = RateLimiter.create(input.getMaxConsumedCapacity()); Map<String, AttributeValue> lastEvaluatedKey = null; Table scoresTable = dynamodb.getTable(SCORE_TABLE_NAME); for (Page<Item, ScanOutcome> scanResultPage : scoresTable.scan(scanSpec).pages()) { // process items for (Item item : scanResultPage) { DataTransformer.HIGH_SCORES_BY_DATE_TRANSFORMER.transform(item, dynamodb); } /* * After reading each page, we acquire the consumed capacity from * the RateLimiter. 
* * For more information on using RateLimiter with DynamoDB scans, * see "Rate Limited Scans in Amazon DynamoDB" * on the AWS Java Development Blog: * https://java.awsblog.com/post/Tx3VAYQIZ3Q0ZVW */ ScanResult scanResult = scanResultPage.getLowLevelResult().getScanResult(); lastEvaluatedKey = scanResult.getLastEvaluatedKey(); double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits(); rateLimiter.acquire((int)Math.round(consumedCapacity)); // forego processing additional pages if we're running out of time if (context.getRemainingTimeInMillis() < REMAINING_TIME_CUTOFF) { break; } } if (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty()) { Entry<String, AttributeValue> entry = lastEvaluatedKey.entrySet() .iterator().next(); String lastScoreId = entry.getValue().getS(); dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item() .withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_INCOMPLETE) .withString(LAST_SCORE_ID, lastScoreId)); return false; } // update tracking table in DynamoDB stating that we're done dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item().withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_DONE)); context.getLogger().log("Finish scanning segment " + input.getSegment() + "\n"); return true; }