/** * Tests deleting items using a query */ @Override public void assertTestDeleteByQueryDataStore(){ try { log.info("test method: TestDeleteByQuery using DynamoDB store."); DynamoDBKey<String, String> dKey = new DynamoDBKey<String, String>(); dKey.setHashKey("NOWHERE"); dKey.setRangeKey("10/10/1880"); person p1 = buildPerson(dKey.getHashKey().toString(), dKey.getRangeKey().toString(), "John", "Doe", "Peru", "Brazil", "Ecuador"); dataStore.put(dKey, p1); dKey.setRangeKey("11/10/1707"); person p2 = buildPerson(dKey.getHashKey().toString(), dKey.getRangeKey().toString(), "Juan", "Perez", "Germany", "USA", "Scotland"); dataStore.put(dKey, p2); DynamoDBQuery.setScanCompOp(ComparisonOperator.LE); DynamoDBQuery.setType(DynamoDBQuery.SCAN_QUERY); Query<DynamoDBKey, person> query = new DynamoDBQuery<DynamoDBKey, person>(); query.setKey(dKey); log.info("Number of records deleted: "+ dataStore.deleteByQuery(query)); } catch (Exception e) { log.error("Error while running test: TestDeleteByQuery"); e.printStackTrace(); } }
/** * Method to query the data store */ @Override public void assertTestQueryDataStore(){ log.info("test method: testQuery using DynamoDB store."); try { DynamoDBKey<String, String> dKey = new DynamoDBKey<String, String>(); dKey.setHashKey("Peru"); DynamoDBQuery.setScanCompOp(ComparisonOperator.LE); DynamoDBQuery.setType(DynamoDBQuery.SCAN_QUERY); Query<DynamoDBKey, person> query = new DynamoDBQuery<DynamoDBKey, person>(); query.setKey(dKey); Result<DynamoDBKey, person> queryResult = dataStore.execute(query); processQueryResult(queryResult); } catch (Exception e) { log.error("error in test method: testQuery."); e.printStackTrace(); } }
/** * Method to query items into the data store */ @Override public void assertTestQueryKeyRange(){ log.info("test method: testQueryKeyRange using specific data store."); try { DynamoDBKey<String, String> dKey = new DynamoDBKey<String, String>(); DynamoDBKey<String, String> startKey = new DynamoDBKey<String, String>(); DynamoDBKey<String, String> endKey = new DynamoDBKey<String, String>(); dKey.setHashKey("Peru"); startKey.setRangeKey("01/01/1700"); endKey.setRangeKey("31/12/1900"); DynamoDBQuery.setRangeCompOp(ComparisonOperator.BETWEEN); DynamoDBQuery.setType(DynamoDBQuery.RANGE_QUERY); Query<DynamoDBKey, person> query = new DynamoDBQuery<DynamoDBKey, person>(); query.setKey(dKey); query.setStartKey(startKey); query.setEndKey(endKey); Result<DynamoDBKey, person> queryResult = dataStore.execute(query); processQueryResult(queryResult); } catch (Exception e) { log.error("error in test method: testQueryKeyRange."); e.printStackTrace(); } }
/** * Gets scan comparator operator * @return */ public static ComparisonOperator getScanCompOp() { if (scanCompOp == null) scanCompOp = ComparisonOperator.GE; return scanCompOp; }
/** * Gets range query comparator operator * @return */ public static ComparisonOperator getRangeCompOp(){ if (rangeCompOp == null) rangeCompOp = ComparisonOperator.BETWEEN; return rangeCompOp; }
@Override public Object call() throws Exception { running = true; long deleteEpoch = System.currentTimeMillis() - age; Map<String, Condition> filter = new HashMap<String, Condition>(); AttributeValue value = new AttributeValue(); value.setN(deleteEpoch + ""); Condition c = new Condition(); c.setComparisonOperator(ComparisonOperator.LT); c.setAttributeValueList(Collections.singletonList(value)); filter.put("epoch", c); ScanRequest scan = new ScanRequest(MetastoreJanitor.tableName); scan.setScanFilter(filter); scan.setLimit( (int) limiter.getRate()); ScanResult result; int scanTotal = 0; int matched = 0; do { //Can't set a hard limit on the queue since paths can be resubmitted by delete task synchronized (deleteQueue) { while (deleteQueue.size() >= queueSize) { deleteQueue.wait(); } } if(!running) { break; } result = db.scan(scan); scanTotal += result.getScannedCount(); matched += result.getCount(); log.info(String.format("Total scanned: %d, matched: %d, added: %d, queue size: %d, consumed capacity: %f, max rate: %f", scanTotal, matched, result.getCount(), deleteQueue.size(), result.getConsumedCapacityUnits(), limiter.getRate())); for (Map<String, AttributeValue> i : result.getItems()) { if (!i.containsKey("epoch")) { continue; } deleteQueue.put(new Key(i.get(DynamoDBMetastore.HASH_KEY), i.get(DynamoDBMetastore.RANGE_KEY))); } limiter.acquire(result.getConsumedCapacityUnits().intValue()); scan.setExclusiveStartKey(result.getLastEvaluatedKey()); } while (running && result.getLastEvaluatedKey() != null); return Boolean.TRUE; }
/** * Sets scan query comparator operator * @param scanCompOp */ public static void setScanCompOp(ComparisonOperator scanCompOp) { DynamoDBQuery.scanCompOp = scanCompOp; }
/** * Sets range query comparator operator * @param pRangeCompOp */ public static void setRangeCompOp(ComparisonOperator pRangeCompOp){ rangeCompOp = pRangeCompOp; }