Java 类com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity 实例源码

项目:MasterStats    文件:DataManager.java   
/**
 * executes a page scan with scanExpression, limited by limiter for objects of the type T with the class clazz
 * and calls action for each object found
 *
 * @param clazz          class of the objects
 * @param scanExpression expression for the scan
 * @param limiter        the read limiter limiting the amount of requests
 * @param action         the function to call for each object
 * @param <T>            the type of the objects
 */
private static <T> void scanPages(Class<T> clazz, DynamoDBScanExpression scanExpression, RateLimiter limiter, Consumer<? super T> action) {
    // define pageScan and add consumed capacity to scan expression
    ScanResultPage<T> pageScan;
    scanExpression.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    // initialize counter, permits and mapper
    int permitsToConsume = 1;
    DynamoDBMapper dynamoDBMapper = DBConnector.getInstance().getDynamoDBMapper();
    int scanned = 0;
    int count = 0;
    do {
        // acquire permits and scan
        limiter.acquire(permitsToConsume);
        pageScan = dynamoDBMapper.scanPage(clazz, scanExpression);

        // update page scan
        scanExpression.setExclusiveStartKey(pageScan.getLastEvaluatedKey());

        // update stats variables
        scanned += pageScan.getScannedCount();
        count += pageScan.getCount();

        // call the action on each result
        pageScan.getResults().forEach(action);

        // calculate permits for next scan
        Double capacityUnits = pageScan.getConsumedCapacity().getCapacityUnits();
        permitsToConsume = (int) (capacityUnits - 1);
        if (permitsToConsume <= 0) permitsToConsume = 1;

        log.info(String.format("Scanned a page for class %s. Results: %d/%d (%d/%d total). Capacity units consumed: %f",
                clazz.getSimpleName(), pageScan.getCount(), pageScan.getScannedCount(), count, scanned, capacityUnits));
    } while (pageScan.getLastEvaluatedKey() != null);
}
项目:emr-dynamodb-connector    文件:DynamoDBClient.java   
public RetryResult<ScanResult> scanTable(
    String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer
    totalSegments, Map<String, AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
  final ScanRequest scanRequest = new ScanRequest(tableName)
      .withExclusiveStartKey(exclusiveStartKey)
      .withLimit(Ints.checkedCast(limit))
      .withSegment(segment)
      .withTotalSegments(totalSegments)
      .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

  if (dynamoDBQueryFilter != null) {
    Map<String, Condition> scanFilter = dynamoDBQueryFilter.getScanFilter();
    if (!scanFilter.isEmpty()) {
      scanRequest.setScanFilter(scanFilter);
    }
  }

  RetryResult<ScanResult> retryResult = getRetryDriver().runWithRetry(new Callable<ScanResult>() {
    @Override
    public ScanResult call() {
      log.debug("Executing DynamoDB scan: " + scanRequest);
      return dynamoDB.scan(scanRequest);
    }
  }, reporter, PrintCounter.DynamoDBReadThrottle);
  return retryResult;
}
项目:emr-dynamodb-connector    文件:DynamoDBClient.java   
public RetryResult<QueryResult> queryTable(
    String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Map<String, AttributeValue>
    exclusiveStartKey, long limit, Reporter reporter) {
  final QueryRequest queryRequest = new QueryRequest()
      .withTableName(tableName)
      .withExclusiveStartKey(exclusiveStartKey)
      .withKeyConditions(dynamoDBQueryFilter.getKeyConditions())
      .withLimit(Ints.checkedCast(limit))
      .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

  RetryResult<QueryResult> retryResult = getRetryDriver().runWithRetry(
      new Callable<QueryResult>() {
        @Override
        public QueryResult call() {
          log.debug("Executing DynamoDB query: " + queryRequest);
          return dynamoDB.query(queryRequest);
        }
      }, reporter, PrintCounter.DynamoDBReadThrottle);
  return retryResult;
}
项目:dynamodb-import-export-tool    文件:DynamoDBBootstrapWorker.java   
/**
 * Begins to pipe the log results by parallel scanning the table and the
 * consumer writing the results.
 */
public void pipe(final AbstractLogConsumer consumer)
        throws ExecutionException, InterruptedException {
    final DynamoDBTableScan scanner = new DynamoDBTableScan(rateLimit,
            client);

    final ScanRequest request = new ScanRequest().withTableName(tableName)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withLimit(BootstrapConstants.SCAN_LIMIT)
            .withConsistentRead(consistentScan);

    final ParallelScanExecutor scanService = scanner
            .getParallelScanCompletionService(request, numSegments,
                    threadPool, section, totalSections);

    while (!scanService.finished()) {
        SegmentedScanResult result = scanService.grab();
        consumer.writeResult(result);
    }

    shutdown(true);
    consumer.shutdown(true);
}
项目:dynamodb-online-index-violation-detector    文件:TableWriter.java   
/**
 * Sends an update request to the service and returns true if the request is successful.
 */
public boolean sendUpdateRequest(Map<String, AttributeValue> primaryKey, Map<String, AttributeValueUpdate> updateItems,
        Map<String, ExpectedAttributeValue> expectedItems) throws Exception {
    if (updateItems.isEmpty()) {
        return false; // No update, return false
    }
    UpdateItemRequest updateItemRequest = new UpdateItemRequest().withTableName(tableName).withKey(primaryKey).withReturnValues(ReturnValue.UPDATED_NEW)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withAttributeUpdates(updateItems);
    if (expectedItems != null) {
        updateItemRequest.withExpected(expectedItems);
    }

    UpdateItemResult result = dynamoDBClient.updateItem(updateItemRequest);
    if(!isRunningOnDDBLocal) {
        // DDB Local does not support rate limiting
        tableWriteRateLimiter.adjustRateWithConsumedCapacity(result.getConsumedCapacity());
    }
    return true;
}
项目:para    文件:AWSDynamoUtils.java   
/**
 * Writes multiple items in batch.
 * @param items a map of tables->write requests
 */
protected static void batchWrite(Map<String, List<WriteRequest>> items) {
    if (items == null || items.isEmpty()) {
        return;
    }
    try {
        BatchWriteItemResult result = getClient().batchWriteItem(new BatchWriteItemRequest().
                withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withRequestItems(items));
        if (result == null) {
            return;
        }
        logger.debug("batchWrite(): total {}, cc {}", items.size(), result.getConsumedCapacity());

        if (result.getUnprocessedItems() != null && !result.getUnprocessedItems().isEmpty()) {
            Thread.sleep(1000);
            logger.warn("{} UNPROCESSED write requests!", result.getUnprocessedItems().size());
            batchWrite(result.getUnprocessedItems());
        }
    } catch (Exception e) {
        logger.error(null, e);
    }
}
项目:dynamodb-import-export-tool    文件:DynamoDBConsumer.java   
/**
 * Splits up a ScanResult into a list of BatchWriteItemRequests of size 25
 * items or less each.
 */
public static List<BatchWriteItemRequest> splitResultIntoBatches(
        ScanResult result, String tableName) {
    List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
    Iterator<Map<String, AttributeValue>> it = result.getItems().iterator();

    BatchWriteItemRequest req = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    List<WriteRequest> writeRequests = new LinkedList<WriteRequest>();
    int i = 0;
    while (it.hasNext()) {
        PutRequest put = new PutRequest(it.next());
        writeRequests.add(new WriteRequest(put));

        i++;
        if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) {
            req.addRequestItemsEntry(tableName, writeRequests);
            batches.add(req);
            req = new BatchWriteItemRequest()
                    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
            writeRequests = new LinkedList<WriteRequest>();
            i = 0;
        }
    }
    if (i > 0) {
        req.addRequestItemsEntry(tableName, writeRequests);
        batches.add(req);
    }
    return batches;
}
项目:dynamodb-online-index-violation-detector    文件:TableReader.java   
@Override
public void run() {
    Map<String, AttributeValue> exclusiveStartKey = null;
    ScanRequest scanRequest = new ScanRequest().withTableName(tableName).withAttributesToGet(attributesToGet)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withTotalSegments(numOfSegments).withSegment(segmentNum);
    boolean scanNumLimitReached = false;
    while (!scanNumLimitReached) {
        scanRequest.withExclusiveStartKey(exclusiveStartKey);
        ScanResult scanResult = dynamoDBClient.scan(scanRequest);
        if(!isRunningOnDDBLocal) {
            // DDB Local does not support rate limiting
            tableReadRateLimiter.adjustRateWithConsumedCapacity(scanResult.getConsumedCapacity());
        }

        for (Map<String, AttributeValue> item : scanResult.getItems()) {
            checkItemViolationAndAddDeleteRequest(item);
            itemsScanned.addAndGet(1);
            itemScannedByThread += 1;
            scanNumLimitReached = isScanNumberLimitReached();
            if(scanNumLimitReached) {
                break;
            }
        }

        if (deleteViolationAfterFound) {
            sendDeleteViolations();
        }
        PrintHelper.printScanProgress(itemsScanned.get(), itemScannedByThread, violationFoundByThread, violationDeleteByThread);

        if (null == (exclusiveStartKey = scanResult.getLastEvaluatedKey())) {
            break;
        }
    }
    return;
}
项目:para    文件:AWSDynamoUtils.java   
/**
 * Reads multiple items from DynamoDB, in batch.
 * @param <P> type of object
 * @param kna a map of row key->data
 * @param results a map of ID->ParaObject
 */
protected static <P extends ParaObject> void batchGet(Map<String, KeysAndAttributes> kna, Map<String, P> results) {
    if (kna == null || kna.isEmpty() || results == null) {
        return;
    }
    try {
        BatchGetItemResult result = getClient().batchGetItem(new BatchGetItemRequest().
                withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withRequestItems(kna));
        if (result == null) {
            return;
        }

        List<Map<String, AttributeValue>> res = result.getResponses().get(kna.keySet().iterator().next());

        for (Map<String, AttributeValue> item : res) {
            P obj = fromRow(item);
            if (obj != null) {
                results.put(obj.getId(), obj);
            }
        }
        logger.debug("batchGet(): total {}, cc {}", res.size(), result.getConsumedCapacity());

        if (result.getUnprocessedKeys() != null && !result.getUnprocessedKeys().isEmpty()) {
            Thread.sleep(1000);
            logger.warn("{} UNPROCESSED read requests!", result.getUnprocessedKeys().size());
            batchGet(result.getUnprocessedKeys(), results);
        }
    } catch (Exception e) {
        logger.error(null, e);
    }
}
项目:para    文件:AWSDynamoUtils.java   
/**
 * Reads a page from a standard DynamoDB table.
 * @param <P> type of object
 * @param appid the app identifier (name)
 * @param p a {@link Pager}
 * @return the last row key of the page, or null.
 */
public static <P extends ParaObject> List<P> readPageFromTable(String appid, Pager p) {
    Pager pager = (p != null) ? p : new Pager();
    ScanRequest scanRequest = new ScanRequest().
            withTableName(getTableNameForAppid(appid)).
            withLimit(pager.getLimit()).
            withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    if (!StringUtils.isBlank(pager.getLastKey())) {
        scanRequest = scanRequest.withExclusiveStartKey(Collections.
                singletonMap(Config._KEY, new AttributeValue(pager.getLastKey())));
    }

    ScanResult result = getClient().scan(scanRequest);
    LinkedList<P> results = new LinkedList<>();
    for (Map<String, AttributeValue> item : result.getItems()) {
        P obj = fromRow(item);
        if (obj != null) {
            results.add(obj);
        }
    }

    if (result.getLastEvaluatedKey() != null) {
        pager.setLastKey(result.getLastEvaluatedKey().get(Config._KEY).getS());
    } else if (!results.isEmpty()) {
        // set last key to be equal to the last result - end reached.
        pager.setLastKey(results.peekLast().getId());
    }
    return results;
}
项目:jcabi-dynamo    文件:AwsItem.java   
@Override
public Map<String, AttributeValue> put(
    final Map<String, AttributeValueUpdate> attrs) throws IOException {
    final AmazonDynamoDB aws = this.credentials.aws();
    final Attributes expected = this.attributes.only(this.keys);
    try {
        final UpdateItemRequest request = new UpdateItemRequest()
            .withTableName(this.name)
            .withExpected(expected.asKeys())
            .withKey(expected)
            .withAttributeUpdates(attrs)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withReturnValues(ReturnValue.UPDATED_NEW);
        final long start = System.currentTimeMillis();
        final UpdateItemResult result = aws.updateItem(request);
        Logger.info(
            this, "#put('%s'): updated item to DynamoDB, %s, in %[ms]s",
            attrs,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
        return result.getAttributes();
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to put %s into \"%s\" with %s",
                attrs, this.name, this.keys
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}
项目:jcabi-dynamo    文件:AwsItem.java   
/**
 * Makes a GetItemRequest for a given attribute.
 * @param attr Attribute name
 * @return GetItemRequest
 */
private GetItemRequest makeItemRequestFor(final String attr) {
    final GetItemRequest request = new GetItemRequest();
    request.setTableName(this.name);
    request.setAttributesToGet(Collections.singletonList(attr));
    request.setKey(this.attributes.only(this.keys));
    request.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    request.setConsistentRead(true);
    return request;
}
项目:jcabi-dynamo    文件:AwsIterator.java   
@Override
@SuppressWarnings("PMD.UseConcurrentHashMap")
public void remove() {
    synchronized (this.dosage) {
        final AmazonDynamoDB aws = this.credentials.aws();
        try {
            final Dosage prev = this.dosage.get();
            final List<Map<String, AttributeValue>> items =
                new ArrayList<Map<String, AttributeValue>>(prev.items());
            final Map<String, AttributeValue> item =
                items.remove(this.position);
            final long start = System.currentTimeMillis();
            final DeleteItemResult res = aws.deleteItem(
                new DeleteItemRequest()
                    .withTableName(this.name)
                    .withKey(new Attributes(item).only(this.keys))
                    .withReturnConsumedCapacity(
                        ReturnConsumedCapacity.TOTAL
                    )
                    .withExpected(
                        new Attributes(item).only(this.keys).asKeys()
                    )
            );
            this.dosage.set(new AwsIterator.Fixed(prev, items));
            --this.position;
            Logger.info(
                this,
                "#remove(): item #%d removed from DynamoDB, %s, in %[ms]s",
                this.position,
                new PrintableConsumedCapacity(
                    res.getConsumedCapacity()
                ).print(),
                System.currentTimeMillis() - start
            );
        } finally {
            aws.shutdown();
        }
    }
}
项目:jcabi-dynamo    文件:AwsTable.java   
@Override
public Item put(final Map<String, AttributeValue> attributes)
    throws IOException {
    final AmazonDynamoDB aws = this.credentials.aws();
    try {
        final PutItemRequest request = new PutItemRequest();
        request.setTableName(this.self);
        request.setItem(attributes);
        request.setReturnValues(ReturnValue.NONE);
        request.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        final PutItemResult result = aws.putItem(request);
        final long start = System.currentTimeMillis();
        Logger.info(
            this, "#put('%[text]s'): created item in '%s', %s, in %[ms]s",
            attributes, this.self,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
        return new AwsItem(
            this.credentials,
            this.frame(),
            this.self,
            new Attributes(attributes).only(this.keys()),
            new Array<String>(this.keys())
        );
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to put into \"%s\" with %s",
                this.self, attributes
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}
项目:jcabi-dynamo    文件:AwsTable.java   
@Override
public void delete(final Map<String, AttributeValue> attributes)
    throws IOException {
    final AmazonDynamoDB aws = this.credentials.aws();
    try {
        final DeleteItemRequest request = new DeleteItemRequest();
        request.setTableName(this.self);
        request.setKey(attributes);
        request.setReturnValues(ReturnValue.NONE);
        request.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        final DeleteItemResult result = aws.deleteItem(request);
        final long start = System.currentTimeMillis();
        Logger.info(
            this,
            "#delete('%[text]s'): deleted item in '%s', %s, in %[ms]s",
            attributes, this.self,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to delete at \"%s\" by keys %s",
                this.self, attributes
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}
项目:jcabi-dynamo    文件:ScanValve.java   
@Override
public Dosage fetch(final Credentials credentials,
    final String table, final Map<String, Condition> conditions,
    final Collection<String> keys) throws IOException {
    final AmazonDynamoDB aws = credentials.aws();
    try {
        final Collection<String> attrs = new HashSet<String>(
            Arrays.asList(this.attributes)
        );
        attrs.addAll(keys);
        final ScanRequest request = new ScanRequest()
            .withTableName(table)
            .withAttributesToGet(attrs)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withScanFilter(conditions)
            .withLimit(this.limit);
        final long start = System.currentTimeMillis();
        final ScanResult result = aws.scan(request);
        Logger.info(
            this,
            "#items(): loaded %d item(s) from '%s' using %s, %s, in %[ms]s",
            result.getCount(), table, conditions,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
        return new ScanValve.NextDosage(credentials, request, result);
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to fetch from \"%s\" by %s and %s",
                table, conditions, keys
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}
项目:dynamodb-geo    文件:DynamoDBManager.java   
/**
 * Query Amazon DynamoDB
 * 
 * @param hashKey
 *            Hash key for the query request.
 * 
 * @param range
 *            The range of geohashs to query.
 * 
 * @return The query result.
 */
public List<QueryResult> queryGeohash(QueryRequest queryRequest, long hashKey, GeohashRange range) {
    List<QueryResult> queryResults = new ArrayList<QueryResult>();
    Map<String, AttributeValue> lastEvaluatedKey = null;

    do {
        Map<String, Condition> keyConditions = new HashMap<String, Condition>();

        Condition hashKeyCondition = new Condition().withComparisonOperator(ComparisonOperator.EQ)
                .withAttributeValueList(new AttributeValue().withN(String.valueOf(hashKey)));
        keyConditions.put(config.getHashKeyAttributeName(), hashKeyCondition);

        AttributeValue minRange = new AttributeValue().withN(Long.toString(range.getRangeMin()));
        AttributeValue maxRange = new AttributeValue().withN(Long.toString(range.getRangeMax()));

        Condition geohashCondition = new Condition().withComparisonOperator(ComparisonOperator.BETWEEN)
                .withAttributeValueList(minRange, maxRange);
        keyConditions.put(config.getGeohashAttributeName(), geohashCondition);

        queryRequest.withTableName(config.getTableName()).withKeyConditions(keyConditions)
                .withIndexName(config.getGeohashIndexName()).withConsistentRead(true)
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withExclusiveStartKey(lastEvaluatedKey);

        QueryResult queryResult = config.getDynamoDBClient().query(queryRequest);
        queryResults.add(queryResult);

        lastEvaluatedKey = queryResult.getLastEvaluatedKey();

    } while (lastEvaluatedKey != null);

    return queryResults;
}
项目:dynamodb-janusgraph-storage-backend    文件:AbstractDynamoDbStore.java   
protected UpdateItemRequest createUpdateItemRequest() {
    return new UpdateItemRequest()
            .withTableName(tableName)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
项目:dynamodb-janusgraph-storage-backend    文件:AbstractDynamoDbStore.java   
protected GetItemRequest createGetItemRequest() {
    return new GetItemRequest()
            .withTableName(tableName)
            .withConsistentRead(forceConsistentRead)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
项目:dynamodb-janusgraph-storage-backend    文件:AbstractDynamoDbStore.java   
protected DeleteItemRequest createDeleteItemRequest() {
    return new DeleteItemRequest()
            .withTableName(tableName)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
项目:dynamodb-janusgraph-storage-backend    文件:AbstractDynamoDbStore.java   
protected QueryRequest createQueryRequest() {
    return new QueryRequest()
            .withTableName(tableName)
            .withConsistentRead(forceConsistentRead)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
项目:dynamodb-janusgraph-storage-backend    文件:AbstractDynamoDbStore.java   
protected ScanRequest createScanRequest() {
    return new ScanRequest().withTableName(tableName)
            .withConsistentRead(forceConsistentRead)
            .withLimit(client.scanLimit(tableName))
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
项目:dynamodb-online-index-violation-detector    文件:TableWriter.java   
protected BatchWriteItemRequest genBatchWriteItemRequest() {
    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    requestItems.put(tableName, batchDeleteRequests);
    return new BatchWriteItemRequest().withRequestItems(requestItems).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
项目:reinvent2015-practicaldynamodb    文件:SegmentScannerFunctionHandler.java   
@Override
  public Object handleRequest(SegmentScannerInput input, Context context) {
      context.getLogger().log("Input: " + input.toJson() + "\n");
      context.getLogger().log("Start scanning segment " + input.getSegment() + "\n");

      DynamoDB dynamodb = new DynamoDB(Regions.US_WEST_2);

      // update tracking table in DynamoDB stating that we're in progress
dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem(
        new Item().withPrimaryKey(SEGMENT, input.getSegment())
        .withString(STATUS, STATUS_IN_PROGRESS));

      ScanSpec scanSpec = new ScanSpec()
              .withMaxPageSize(MAX_PAGE_SIZE)
              .withSegment(input.getSegment())
              .withTotalSegments(input.getTotalSegments())
              .withConsistentRead(true)
              .withMaxResultSize(MAX_RESULT_SIZE)
              .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

      // if resuming an in-progress segment, specify the start key here
if (input.getStartScore() != null) {
        scanSpec.withExclusiveStartKey(SCORE_ID, input.getStartScore());
      }

      RateLimiter rateLimiter = RateLimiter.create(input.getMaxConsumedCapacity());

      Map<String, AttributeValue> lastEvaluatedKey = null;
      Table scoresTable = dynamodb.getTable(SCORE_TABLE_NAME);

for (Page<Item, ScanOutcome> scanResultPage : scoresTable.scan(scanSpec).pages()) {
          // process items
          for (Item item : scanResultPage) {
              DataTransformer.HIGH_SCORES_BY_DATE_TRANSFORMER.transform(item, dynamodb);
          }

          /*
     * After reading each page, we acquire the consumed capacity from
     * the RateLimiter.
     *
     * For more information on using RateLimiter with DynamoDB scans,
     * see "Rate Limited Scans in Amazon DynamoDB"
     * on the AWS Java Development Blog:
     * https://java.awsblog.com/post/Tx3VAYQIZ3Q0ZVW
     */
          ScanResult scanResult = scanResultPage.getLowLevelResult().getScanResult();
          lastEvaluatedKey = scanResult.getLastEvaluatedKey();
          double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits();
          rateLimiter.acquire((int)Math.round(consumedCapacity));

    // forego processing additional pages if we're running out of time
    if (context.getRemainingTimeInMillis() < REMAINING_TIME_CUTOFF) {
            break;
        }
      }

if (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty()) {
    Entry<String, AttributeValue> entry = lastEvaluatedKey.entrySet()
            .iterator().next();
    String lastScoreId = entry.getValue().getS();

    dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem(
            new Item()
                    .withPrimaryKey(SEGMENT, input.getSegment())
                    .withString(STATUS, STATUS_INCOMPLETE)
                    .withString(LAST_SCORE_ID, lastScoreId));
    return false;
}

      // update tracking table in DynamoDB stating that we're done
dynamodb.getTable(FUNCTION_TRACKER_TABLE_NAME).putItem(
        new Item().withPrimaryKey(SEGMENT, input.getSegment())
        .withString(STATUS, STATUS_DONE));

      context.getLogger().log("Finish scanning segment " + input.getSegment() + "\n");
      return true;
  }
项目:widow    文件:IndexWorker.java   
private void writeToDynamo() throws RetryFailedException, InterruptedException {
    Map<String, AttributeValue> attributeValueMap = indexInput.getAttributes().entrySet()
            .stream()
            .filter((entry) -> entry.getValue() != null)
            .filter((entry) -> StringUtils.isNotBlank(entry.getValue().toString()))
            .collect(Collectors.toMap(
                    e -> e.getKey().toString(),
                    e -> {
                        try {
                            switch (e.getKey().getType()) {
                                case LONG:
                                case DOUBLE:
                                    return new AttributeValue().withN(e.getValue().toString());

                                default:
                                    String serializedData = null;

                                    // use objectMapper for everything but strings because it
                                    // adds quotes to raw strings
                                    if (e.getValue().getClass() != String.class) {
                                        serializedData = objectMapper.writeValueAsString(e.getValue());
                                    } else {
                                        serializedData = (String) e.getValue();
                                    }

                                    return new AttributeValue().withS(serializedData);
                            }
                        } catch (Exception ex) {
                            logger.error("Couldn't serialize data to index.", ex);
                            return null;
                        }
                    }
            ));

    String tableName = config.getString(TABLE_NAME_CONFIG_KEY);

    PutItemRequest putItemRequest = new PutItemRequest()
            .withTableName(tableName)
            .withItem(attributeValueMap)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    final PutItemResult putItemResult = retry(() -> dynamoDBClient.putItem(putItemRequest));

    logger.info("Consumed table capacity: " + putItemResult.getConsumedCapacity().getCapacityUnits());
}
项目:aws-dynamodb-examples    文件:LowLevelBatchWrite.java   
private static void writeMultipleItemsBatchWrite() {
    try {                    

       // Create a map for the requests in the batch
       Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

       // Create a PutRequest for a new Forum item
       Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
       forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
       forumItem.put("Threads", new AttributeValue().withN("0"));

       List<WriteRequest> forumList = new ArrayList<WriteRequest>();
       forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
       requestItems.put(table1Name, forumList);

       // Create a PutRequest for a new Thread item
       Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
       threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
       threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
       threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message"));
       threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));

       List<WriteRequest> threadList = new ArrayList<WriteRequest>();
       threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

       // Create a DeleteRequest for a Thread item
       Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>(); 
       threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
       threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));

       threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
       requestItems.put(table2Name, threadList);

       BatchWriteItemResult result;
       BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
           .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

       do {
           System.out.println("Making the request.");

           batchWriteItemRequest.withRequestItems(requestItems);
           result = client.batchWriteItem(batchWriteItemRequest);

           // Print consumed capacity units
           for(ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
           }

           // Check for unprocessed keys which could happen if you exceed provisioned throughput
           System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
           requestItems = result.getUnprocessedItems();
       } while (result.getUnprocessedItems().size() > 0);

    }  catch (AmazonServiceException ase) {
        System.err.println("Failed to retrieve items: ");
        ase.printStackTrace(System.err);
    }  

}
项目:jcabi-dynamo    文件:QueryValve.java   
@Override
public Dosage fetch(final Credentials credentials, final String table,
    final Map<String, Condition> conditions, final Collection<String> keys)
    throws IOException {
    final AmazonDynamoDB aws = credentials.aws();
    try {
        final Collection<String> attrs = new HashSet<String>(
            Arrays.asList(this.attributes)
        );
        attrs.addAll(keys);
        QueryRequest request = new QueryRequest()
            .withTableName(table)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withKeyConditions(conditions)
            .withConsistentRead(this.consistent)
            .withScanIndexForward(this.forward)
            .withSelect(this.select)
            .withLimit(this.limit);
        if (this.select.equals(Select.SPECIFIC_ATTRIBUTES.toString())) {
            request = request.withAttributesToGet(attrs);
        }
        if (!this.index.isEmpty()) {
            request = request.withIndexName(this.index);
        }
        final long start = System.currentTimeMillis();
        final QueryResult result = aws.query(request);
        Logger.info(
            this,
            "#items(): loaded %d item(s) from '%s' using %s, %s, in %[ms]s",
            result.getCount(), table, conditions,
            new PrintableConsumedCapacity(
                result.getConsumedCapacity()
            ).print(),
            System.currentTimeMillis() - start
        );
        return new QueryValve.NextDosage(credentials, request, result);
    } catch (final AmazonClientException ex) {
        throw new IOException(
            String.format(
                "failed to fetch from \"%s\" by %s and %s",
                table, conditions, keys
            ),
            ex
        );
    } finally {
        aws.shutdown();
    }
}