/**
 * Drains every remaining page from {@code resultIter}, handing each attribute of each item to
 * {@code handleTopField} under the item's zero-based row index.
 *
 * @return the number of items read during this call
 */
@Override
public int next() {
    final Stopwatch timer = Stopwatch.createStarted();
    topLevelState.reset();

    int rows = 0;
    while (resultIter.hasNext()) {
        final Page<Item, ?> current = resultIter.next();
        for (Item record : current) {
            // the row index is the count *before* this item is tallied
            final int rowIndex = rows;
            rows++;
            for (Map.Entry<String, Object> field : record.attributes()) {
                final String fieldName = field.getKey();
                final Object fieldValue = field.getValue();
                topLevelState.setColumn(getSchemaPath(fieldName));
                handleTopField(rowIndex, fieldName, fieldValue, topLevelState);
            }
        }
    }

    topLevelState.setRowCount(rows);
    LOG.debug("Took {} ms to get {} records", timer.elapsed(TimeUnit.MILLISECONDS), rows);
    return rows;
}
/**
 * Reads a page from a "shared" DynamoDB table. Shared tables are tables that have global secondary indexes
 * and can contain the objects of multiple apps.
 * <p>
 * When at least one object is read and {@code pager} is not null, the pager's last key is set to the id
 * of the last object on the page.
 *
 * @param <P> type of object
 * @param appid the app identifier (name)
 * @param pager a {@link Pager}
 * @return the objects on the page; an empty list if {@code appid} is blank or nothing was read
 */
public static <P extends ParaObject> List<P> readPageFromSharedTable(String appid, Pager pager) {
    LinkedList<P> pageObjects = new LinkedList<>();
    if (!StringUtils.isBlank(appid)) {
        Page<Item, QueryOutcome> rows = queryGSI(appid, pager);
        if (rows != null) {
            for (Item row : rows) {
                P parsed = ParaObjectUtils.setAnnotatedFields(row.asMap());
                if (parsed == null) {
                    continue;
                }
                pageObjects.add(parsed);
            }
        }
        if (pager != null && !pageObjects.isEmpty()) {
            pager.setLastKey(pageObjects.peekLast().getId());
        }
    }
    return pageObjects;
}
private static Page<Item, QueryOutcome> queryGSI(String appid, Pager p) { Pager pager = (p != null) ? p : new Pager(); Index index = getSharedIndex(); QuerySpec spec = new QuerySpec(). withMaxPageSize(pager.getLimit()). withMaxResultSize(pager.getLimit()). withKeyConditionExpression(Config._APPID + " = :aid"). withValueMap(new ValueMap().withString(":aid", appid)); if (!StringUtils.isBlank(pager.getLastKey())) { spec = spec.withExclusiveStartKey(new KeyAttribute(Config._APPID, appid), // HASH/PARTITION KEY new KeyAttribute(Config._ID, pager.getLastKey()), // RANGE/SORT KEY new KeyAttribute(Config._KEY, getKeyForAppid(pager.getLastKey(), appid))); // TABLE PRIMARY KEY } return index != null ? index.query(spec).firstPage() : null; }
/**
 * Loads every feature stored under the given group.
 * <p>
 * Runs a consistent-read query on the feature table with {@code HASH_KEY} equal to {@code group},
 * wrapped in a {@code DynamoDbCommand}, and converts the {@code "json"} attribute of each item
 * into a {@link Feature}.
 *
 * @param group value matched against the table's hash key
 * @return the features found for the group; empty when the query yields no items
 * @throws RuntimeException from the command's fallback supplier (exactly when the fallback is
 *         invoked is decided by {@code DynamoDbCommand}, which is not visible here)
 */
@Override
public List<Feature> loadFeatures(String group) {
    logger.info("{}", kvp("op", "loadFeatures", "group", group));
    List<Feature> features = Lists.newArrayList();

    Table table = dynamoDB.getTable(featureTableName);

    QuerySpec querySpec = new QuerySpec()
            .withKeyConditionExpression(HASH_KEY + " = :k_" + HASH_KEY)
            .withValueMap(new ValueMap().withString(":k_" + HASH_KEY, group))
            .withConsistentRead(true);

    DynamoDbCommand<ItemCollection<QueryOutcome>> cmd = new DynamoDbCommand<>("loadFeatures",
            () -> queryTable(table, querySpec),
            () -> {
                // the message names this operation so a fallback failure is not
                // mistaken for one raised by a different command
                throw new RuntimeException("loadFeatures");
            },
            hystrixReadConfiguration,
            metrics);

    ItemCollection<QueryOutcome> items = cmd.execute();

    for (Page<Item, QueryOutcome> page : items.pages()) {
        page.forEach(item -> features.add(FeatureSupport.toFeature(item.getString("json"))));
    }

    return features;
}
public Iterator<Page<Item, ?>> query() { QuerySpec query = new QuerySpec(); query.withConsistentRead(consistentRead); query.withMaxPageSize(scanProps.getLimit()); if (!isStarQuery) { query.withProjectionExpression(projection); } NameMapper mapper = new NameMapper(); DynamoQueryFilterSpec filter = (DynamoQueryFilterSpec) slice.getFilter(); // key space DynamoFilterSpec key = filter.getKey(); String keyFilter = asFilterExpression(mapper, key); assert keyFilter != null : "Got a null key filter for query! Spec: " + slice; query.withKeyConditionExpression(keyFilter); // attributes, if we have them DynamoFilterSpec attribute = filter.getAttributeFilter(); String attrFilterExpr = asFilterExpression(mapper, attribute); if (attrFilterExpr != null) { query.withFilterExpression(attrFilterExpr); } query.withNameMap(mapper.nameMap); query.withValueMap(mapper.valueMap); Iterator iter = table.query(query).pages().iterator(); return iter; }
/**
 * Runs one scan and returns the low-level outcome of its first page, updating the marker on
 * {@code pageIterator} from the page's last evaluated key.
 * <p>
 * The marker is cleared when there is no last evaluated key or when no page is available. A
 * non-positive page size returns an empty outcome without scanning or touching the marker.
 *
 * @param scanSpec the scan to run (passed through {@code withMarker} first)
 * @param pageIterator supplies the page size and receives the next marker
 * @return the first page's outcome, or an outcome wrapping an empty {@link ScanResult}
 * @throws IllegalStateException if {@code _convertMarker} has not been set
 */
private ScanOutcome scan(ScanSpec scanSpec, PageIterator pageIterator) {
    if (_convertMarker == null) {
        throw new IllegalStateException("Index must first be initialized with ConvertMarker");
    }
    if (pageIterator.getPageSize() <= 0) {
        return new ScanOutcome(new ScanResult());
    }

    ItemCollection<ScanOutcome> collection =
            maybeBackoff(true, () -> _scan.scan(withMarker(scanSpec, pageIterator)));
    Iterator<Page<Item, ScanOutcome>> pages =
            (collection == null) ? null : collection.pages().iterator();

    if (pages == null || !pages.hasNext()) {
        pageIterator.setMarker(null);
        return new ScanOutcome(new ScanResult());
    }

    ScanOutcome outcome = maybeBackoff(true, () -> pages.next().getLowLevelResult());
    Map<String, AttributeValue> lastKey = outcome.getScanResult().getLastEvaluatedKey();
    if (lastKey == null || lastKey.isEmpty()) {
        pageIterator.setMarker(null);
    } else {
        pageIterator.setMarker(toMarker(lastKey, false));
    }
    return outcome;
}
/**
 * Runs one query for hash key {@code hk} and returns the low-level outcome of its first page,
 * updating both markers on {@code pageIterator}.
 * <p>
 * The previous-marker is derived from the first returned item, but only when the iterator already
 * carried a marker and the page has items; otherwise it is cleared. The next marker is derived
 * from the last evaluated key, or cleared when there is none. When no page is available both
 * markers are cleared. A non-positive page size returns an empty outcome without querying or
 * touching the markers.
 *
 * @param hk hash key value applied to {@code querySpec} under {@code _hkName}
 * @param querySpec the query to run (passed through {@code withMarker} first)
 * @param pageIterator supplies the page size / current marker and receives the new markers
 * @return the first page's outcome, or an outcome wrapping an empty {@link QueryResult}
 * @throws IllegalStateException if {@code _convertMarker} has not been set
 */
private QueryOutcome query(Object hk, QuerySpec querySpec, PageIterator pageIterator) {
    if (_convertMarker == null) {
        throw new IllegalStateException("Index must first be initialized with ConvertMarker");
    }
    if (pageIterator.getPageSize() <= 0) {
        return new QueryOutcome(new QueryResult());
    }

    ItemCollection<QueryOutcome> collection = maybeBackoff(true, () ->
            _query.query(withMarker(querySpec.withHashKey(_hkName, hk), pageIterator, hk)));
    Iterator<Page<Item, QueryOutcome>> pages =
            (collection == null) ? null : collection.pages().iterator();

    if (pages == null || !pages.hasNext()) {
        pageIterator.setPrevMarker(null);
        pageIterator.setMarker(null);
        return new QueryOutcome(new QueryResult());
    }

    QueryOutcome outcome = maybeBackoff(true, () -> pages.next().getLowLevelResult());
    QueryResult result = outcome.getQueryResult();

    boolean canStepBack = pageIterator.getMarker() != null
            && result.getItems() != null
            && !result.getItems().isEmpty();
    if (canStepBack) {
        pageIterator.setPrevMarker(toMarker(result.getItems().get(0), true));
    } else {
        pageIterator.setPrevMarker(null);
    }

    Map<String, AttributeValue> lastKey = result.getLastEvaluatedKey();
    if (lastKey == null || lastKey.isEmpty()) {
        pageIterator.setMarker(null);
    } else {
        pageIterator.setMarker(toMarker(lastKey, true));
    }
    return outcome;
}
private static void findRepliesForAThreadSpecifyOptionalLimit(String forumName, String threadSubject) { Table table = dynamoDB.getTable(tableName); String replyId = forumName + "#" + threadSubject; QuerySpec spec = new QuerySpec() .withKeyConditionExpression("Id = :v_id") .withValueMap(new ValueMap() .withString(":v_id", replyId)) .withMaxPageSize(1); ItemCollection<QueryOutcome> items = table.query(spec); System.out.println("\nfindRepliesForAThreadSpecifyOptionalLimit results:"); // Process each page of results int pageNum = 0; for (Page<Item, QueryOutcome> page : items.pages()) { System.out.println("\nPage: " + ++pageNum); // Process each item on the current page Iterator<Item> item = page.iterator(); while (item.hasNext()) { System.out.println(item.next().toJSONPretty()); } } }
/**
 * Applies {@code scanProps} to the builder, builds it against {@code client}, and returns the
 * page iterator produced by the built object's {@code scan()}.
 *
 * @param builder query builder to configure and build
 * @param client DynamoDB client handed to {@code build}
 * @return iterator over the result pages
 */
@Override protected Iterator<Page<Item, ?>> buildQuery(DynamoQueryBuilder builder, AmazonDynamoDBAsyncClient client) { return builder.withProps(scanProps).build(client).scan(); }
/**
 * Applies {@code scanProps} to the builder, builds it against {@code client}, and returns the
 * page iterator produced by the built object's {@code query()}.
 *
 * @param builder query builder to configure and build
 * @param client DynamoDB client handed to {@code build}
 * @return iterator over the result pages
 */
@Override protected Iterator<Page<Item, ?>> buildQuery(DynamoQueryBuilder builder, AmazonDynamoDBAsyncClient client) { return builder.withProps(scanProps).build(client).query(); }
/**
 * Builds the builder against {@code client} (no scan properties are applied here) and returns
 * the page iterator produced by the built object's {@code get()}.
 *
 * @param builder query builder to build
 * @param client DynamoDB client handed to {@code build}
 * @return iterator over the result pages
 */
@Override protected Iterator<Page<Item, ?>> buildQuery(DynamoQueryBuilder builder, AmazonDynamoDBAsyncClient client) { return builder.build(client).get(); }
public Iterator<Page<Item, ?>> get() { GetItemSpec query = new GetItemSpec(); query.withConsistentRead(consistentRead); if (!isStarQuery) { query.withProjectionExpression(projection); } DynamoReadFilterSpec filter = slice.getFilter(); // key space PrimaryKey pk = new PrimaryKey(); DynamoFilterSpec key = filter.getKey(); FilterTree tree = key.getTree(); assert tree != null; tree.visit(new FilterTree.FilterNodeVisitor<Void>() { @Override public Void visitInnerNode(FilterNodeInner inner) { assert inner.getCondition().equals("AND"); inner.getLeft().visit(this); inner.getRight().visit(this); return null; } @Override public Void visitLeafNode(FilterLeaf leaf) { assert leaf.getOperand().equals("=") : "Gets must use '=' for keys. Got: " + leaf.getOperand(); pk.addComponent(leaf.getKey(), leaf.getValues()[0]); return null; } }); query.withPrimaryKey(pk); return new AbstractIterator<Page<Item, ?>>() { private boolean ran = false; @Override protected Page<Item, ?> computeNext() { if (ran) { endOfData(); return null; } try { Item i = table.getItem(query); return new GetItemPage(i); } finally { ran = true; } } }; }
/**
 * Always returns {@code null}.
 * NOTE(review): presumably this page type never has a successor; confirm that callers
 * tolerate a null next page.
 *
 * @return {@code null}
 */
@Override public Page<Item, Item> nextPage() { return null; }
/**
 * Produces the iterator of result pages that this reader consumes, using the supplied builder
 * and client. Which request is issued is left to the implementation.
 *
 * @param builder query builder the implementation may configure and build
 * @param client DynamoDB client to build the request against
 * @return iterator over the result pages
 */
protected abstract Iterator<Page<Item, ?>> buildQuery(DynamoQueryBuilder builder, AmazonDynamoDBAsyncClient client);
@Override public Object handleRequest(SegmentScannerInput input, Context context) { context.getLogger().log("Input: " + input.toJson() + "\n"); context.getLogger().log("Start scanning segment " + input.getSegment() + "\n"); DynamoDB dynamodb = new DynamoDB(Regions.US_WEST_2); // update tracking table in DynamoDB stating that we're in progress dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item().withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_IN_PROGRESS)); ScanSpec scanSpec = new ScanSpec() .withMaxPageSize(MAX_PAGE_SIZE) .withSegment(input.getSegment()) .withTotalSegments(input.getTotalSegments()) .withConsistentRead(true) .withMaxResultSize(MAX_RESULT_SIZE) .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); // if resuming an in-progress segment, specify the start key here if (input.getStartScore() != null) { scanSpec.withExclusiveStartKey(SCORE_ID, input.getStartScore()); } RateLimiter rateLimiter = RateLimiter.create(input.getMaxConsumedCapacity()); Map<String, AttributeValue> lastEvaluatedKey = null; Table scoresTable = dynamodb.getTable(SCORE_TABLE_NAME); for (Page<Item, ScanOutcome> scanResultPage : scoresTable.scan(scanSpec).pages()) { // process items for (Item item : scanResultPage) { DataTransformer.HIGH_SCORES_BY_DATE_TRANSFORMER.transform(item, dynamodb); } /* * After reading each page, we acquire the consumed capacity from * the RateLimiter. 
* * For more information on using RateLimiter with DynamoDB scans, * see "Rate Limited Scans in Amazon DynamoDB" * on the AWS Java Development Blog: * https://java.awsblog.com/post/Tx3VAYQIZ3Q0ZVW */ ScanResult scanResult = scanResultPage.getLowLevelResult().getScanResult(); lastEvaluatedKey = scanResult.getLastEvaluatedKey(); double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits(); rateLimiter.acquire((int)Math.round(consumedCapacity)); // forego processing additional pages if we're running out of time if (context.getRemainingTimeInMillis() < REMAINING_TIME_CUTOFF) { break; } } if (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty()) { Entry<String, AttributeValue> entry = lastEvaluatedKey.entrySet() .iterator().next(); String lastScoreId = entry.getValue().getS(); dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item() .withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_INCOMPLETE) .withString(LAST_SCORE_ID, lastScoreId)); return false; } // update tracking table in DynamoDB stating that we're done dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem( new Item().withPrimaryKey(SEGMENT, input.getSegment()) .withString(STATUS, STATUS_DONE)); context.getLogger().log("Finish scanning segment " + input.getSegment() + "\n"); return true; }
/** * Deletes all objects in a shared table, which belong to a given appid, by scanning the GSI. * @param appid app id */ public static void deleteAllFromSharedTable(String appid) { if (StringUtils.isBlank(appid) || !isSharedAppid(appid)) { return; } Pager pager = new Pager(50); List<WriteRequest> allDeletes = new LinkedList<>(); Page<Item, QueryOutcome> items; // read all phase do { items = queryGSI(appid, pager); if (items == null) { break; } for (Item item : items) { String key = item.getString(Config._KEY); // only delete rows which belong to the given appid if (StringUtils.startsWith(key, appid.trim())) { logger.debug("Preparing to delete '{}' from shared table, appid: '{}'.", key, appid); pager.setLastKey(item.getString(Config._ID)); allDeletes.add(new WriteRequest().withDeleteRequest(new DeleteRequest(). withKey(Collections.singletonMap(Config._KEY, new AttributeValue(key))))); } } } while (items.iterator().hasNext()); // delete all phase final int maxItems = 20; int batchSteps = (allDeletes.size() > maxItems) ? (allDeletes.size() / maxItems) + 1 : 1; List<WriteRequest> reqs = new LinkedList<>(); Iterator<WriteRequest> it = allDeletes.iterator(); String tableName = getTableNameForAppid(appid); for (int i = 0; i < batchSteps; i++) { while (it.hasNext() && reqs.size() < maxItems) { reqs.add(it.next()); } if (reqs.size() > 0) { logger.info("Deleting {} items belonging to app '{}', from shared table (page {}/{})...", reqs.size(), appid, i + 1, batchSteps); batchWrite(Collections.singletonMap(tableName, reqs)); } reqs.clear(); } }