/** * Builds query expression depending on query type (range or scan) */ public void buildExpression(){ AttributeValue hashAttrValue = buildKeyHashAttribute(); if (hashAttrValue == null) throw new IllegalStateException("There is not a key schema defined."); if (DynamoDBQuery.getType().equals(RANGE_QUERY)){ Condition newCondition = buildRangeCondition(); buildQueryExpression(newCondition, hashAttrValue); } if (DynamoDBQuery.getType().equals(SCAN_QUERY)) buildScanExpression(hashAttrValue); }
/** * Builds range query expression * @param pNewConditionCondition for querying * @param pHashAttrValueHash attribute value where to start */ public void buildQueryExpression(Condition pNewCondition, AttributeValue pHashAttrValue) { DynamoDBQueryExpression newQueryExpression = new DynamoDBQueryExpression(pHashAttrValue); newQueryExpression.setConsistentRead(getConsistencyReadLevel()); newQueryExpression.setRangeKeyCondition(pNewCondition); dynamoDBExpression = newQueryExpression; }
/** * Builds key scan condition using scan comparator, and hash key attribute * @return */ private Condition buildKeyScanCondition(){ Condition scanKeyCondition = new Condition(); scanKeyCondition.setComparisonOperator(getScanCompOp()); scanKeyCondition.withAttributeValueList(buildKeyHashAttribute()); return scanKeyCondition; }
@Override public Object call() throws Exception { running = true; long deleteEpoch = System.currentTimeMillis() - age; Map<String, Condition> filter = new HashMap<String, Condition>(); AttributeValue value = new AttributeValue(); value.setN(deleteEpoch + ""); Condition c = new Condition(); c.setComparisonOperator(ComparisonOperator.LT); c.setAttributeValueList(Collections.singletonList(value)); filter.put("epoch", c); ScanRequest scan = new ScanRequest(MetastoreJanitor.tableName); scan.setScanFilter(filter); scan.setLimit( (int) limiter.getRate()); ScanResult result; int scanTotal = 0; int matched = 0; do { //Can't set a hard limit on the queue since paths can be resubmitted by delete task synchronized (deleteQueue) { while (deleteQueue.size() >= queueSize) { deleteQueue.wait(); } } if(!running) { break; } result = db.scan(scan); scanTotal += result.getScannedCount(); matched += result.getCount(); log.info(String.format("Total scanned: %d, matched: %d, added: %d, queue size: %d, consumed capacity: %f, max rate: %f", scanTotal, matched, result.getCount(), deleteQueue.size(), result.getConsumedCapacityUnits(), limiter.getRate())); for (Map<String, AttributeValue> i : result.getItems()) { if (!i.containsKey("epoch")) { continue; } deleteQueue.put(new Key(i.get(DynamoDBMetastore.HASH_KEY), i.get(DynamoDBMetastore.RANGE_KEY))); } limiter.acquire(result.getConsumedCapacityUnits().intValue()); scan.setExclusiveStartKey(result.getLastEvaluatedKey()); } while (running && result.getLastEvaluatedKey() != null); return Boolean.TRUE; }