private void storeResult(final Map<String, Object> result) {
    final String userId = (String) result.get("user_id");
    final String userToken = (String) result.get("access_token");
    final String teamId = (String) result.get("team_id");
    @SuppressWarnings("unchecked")
    final Map<String, Object> bot = (Map<String, Object>) result.get("bot");
    final String botToken = (String) bot.get("bot_access_token");
    final String botId = (String) bot.get("bot_user_id");

    final ArrayList<PutRequest> putRequests = new ArrayList<>();
    putRequests.add(item("user:" + userId + ":token", userToken));
    putRequests.add(item("user:" + botId + ":token", botToken));
    putRequests.add(item("team:" + teamId + ":botuser", botId));

    final List<WriteRequest> writeRequests = putRequests.stream()
            .map(WriteRequest::new)
            .collect(Collectors.toList());

    final HashMap<String, List<WriteRequest>> requestItems = new HashMap<>();
    requestItems.put(TableName, writeRequests);

    final BatchWriteItemRequest batchWriteItemRequest =
            new BatchWriteItemRequest().withRequestItems(requestItems);
    ddb.batchWriteItem(batchWriteItemRequest);
}
public int sendDeleteRequests() throws IllegalArgumentException {
    if (batchDeleteRequests.isEmpty()) {
        return 0;
    }

    BatchWriteItemRequest batchWriteItemRequest = genBatchWriteItemRequest();
    BatchWriteItemResult batchWriteResult = sendBatchWriteRequest(batchWriteItemRequest);
    int undeletedItemNum = countAndPrintUndeletedItems(batchWriteResult);
    if (!isRunningOnDDBLocal) {
        // DDB Local does not support rate limiting
        tableWriteRateLimiter.adjustRateWithConsumedCapacity(batchWriteResult.getConsumedCapacity());
    }

    int deletedRequests = batchDeleteRequests.size() - undeletedItemNum;
    totalNumOfItemsDeleted += deletedRequests;
    PrintHelper.printDeleteProgressInfo(deletedRequests, totalNumOfItemsDeleted);

    batchDeleteRequests = new ArrayList<WriteRequest>();
    return deletedRequests;
}
/**
 * Writes multiple items in batch.
 * @param items a map of table name -> write requests
 */
protected static void batchWrite(Map<String, List<WriteRequest>> items) {
    if (items == null || items.isEmpty()) {
        return;
    }
    try {
        BatchWriteItemResult result = getClient().batchWriteItem(new BatchWriteItemRequest().
                withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withRequestItems(items));
        if (result == null) {
            return;
        }
        logger.debug("batchWrite(): total {}, cc {}", items.size(), result.getConsumedCapacity());
        if (result.getUnprocessedItems() != null && !result.getUnprocessedItems().isEmpty()) {
            Thread.sleep(1000);
            logger.warn("{} UNPROCESSED write requests!", result.getUnprocessedItems().size());
            batchWrite(result.getUnprocessedItems());
        }
    } catch (Exception e) {
        logger.error(null, e);
    }
}
@Override
public <P extends ParaObject> void deleteAll(String appid, List<P> objects) {
    if (objects == null || objects.isEmpty() || StringUtils.isBlank(appid)) {
        return;
    }
    List<WriteRequest> reqs = new ArrayList<>(objects.size());
    for (ParaObject object : objects) {
        if (object != null) {
            reqs.add(new WriteRequest().withDeleteRequest(new DeleteRequest().
                    withKey(Collections.singletonMap(Config._KEY,
                    new AttributeValue(getKeyForAppid(object.getId(), appid))))));
        }
    }
    batchWrite(Collections.singletonMap(getTableNameForAppid(appid), reqs));
    logger.debug("DAO.deleteAll() {}", objects.size());
}
private void commitPartial(List<WriteRequest> list) {
    Timer t = new Timer();
    Map<String, List<WriteRequest>> map = new HashMap<>();
    map.put(getTenant().getName(), list);
    BatchWriteItemResult result = m_client.batchWriteItem(new BatchWriteItemRequest(map));
    int retry = 0;
    while (result.getUnprocessedItems().size() > 0) {
        if (retry == RETRY_SLEEPS.length) {
            throw new RuntimeException("All retries failed");
        }
        m_logger.debug("Committing {} unprocessed items, retry: {}",
                result.getUnprocessedItems().size(), retry + 1);
        try {
            Thread.sleep(RETRY_SLEEPS[retry++]);
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of swallowing the exception
            Thread.currentThread().interrupt();
        }
        result = m_client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems()));
    }
    m_logger.debug("Committed {} writes in {}", list.size(), t);
    list.clear();
}
public CompletableFuture<BatchWriteItemResult> batchWriteItem(
        final Map<String, List<WriteRequest>> requestItems) {
    return asyncExecutor.execute(new Callable<BatchWriteItemResult>() {
        @Override
        public BatchWriteItemResult call() throws Exception {
            return dbExecutor.batchWriteItem(requestItems);
        }
    });
}
public BatchWriteItemResult putBatch(String tableName, Map<String, AttributeValue> item,
        long maxItemsPerBatch, Reporter reporter) throws UnsupportedEncodingException {
    int itemSizeBytes = DynamoDBUtil.getItemSizeBytes(item);
    if (itemSizeBytes > maxItemByteSize) {
        throw new RuntimeException("Cannot pass items with size greater than " + maxItemByteSize
                + ". Item with size of " + itemSizeBytes + " was given.");
    }
    maxItemsPerBatch = DynamoDBUtil.getBoundedBatchLimit(config, maxItemsPerBatch);
    BatchWriteItemResult result = null;
    if (writeBatchMap.containsKey(tableName)) {
        boolean writeRequestsForTableAtLimit =
                writeBatchMap.get(tableName).size() >= maxItemsPerBatch;
        boolean totalSizeOfWriteBatchesOverLimit =
                writeBatchMapSizeBytes + itemSizeBytes > maxBatchSize;
        if (writeRequestsForTableAtLimit || totalSizeOfWriteBatchesOverLimit) {
            result = writeBatch(reporter, itemSizeBytes);
        }
    }
    // writeBatchMap could be cleared from writeBatch()
    List<WriteRequest> writeBatchList;
    if (!writeBatchMap.containsKey(tableName)) {
        writeBatchList = new ArrayList<>((int) maxItemsPerBatch);
        writeBatchMap.put(tableName, writeBatchList);
    } else {
        writeBatchList = writeBatchMap.get(tableName);
    }
    writeBatchList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
    writeBatchMapSizeBytes += itemSizeBytes;
    return result;
}
@Override
public void write(K key, V value) throws IOException {
    if (value == null) {
        throw new RuntimeException("Null record encountered. At least the key columns must be "
                + "specified.");
    }
    verifyInterval();
    if (progressable != null) {
        progressable.progress();
    }
    DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
    BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
            permissibleWritesPerSecond - writesPerSecond, reporter);
    batchSize++;
    totalItemsWritten++;
    if (result != null) {
        if (result.getConsumedCapacity() != null) {
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                double consumedUnits = consumedCapacity.getCapacityUnits();
                totalIOPSConsumed += consumedUnits;
            }
        }
        int unprocessedItems = 0;
        for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
            unprocessedItems += requests.size();
        }
        writesPerSecond += batchSize - unprocessedItems;
        batchSize = unprocessedItems;
    }
}
private Map<String, List<WriteRequest>> toWritesByTable(Iterator<SinkRecord> recordIterator) {
    final Map<String, List<WriteRequest>> writesByTable = new HashMap<>();
    for (int count = 0; recordIterator.hasNext() && count < config.batchSize; count++) {
        final SinkRecord record = recordIterator.next();
        final WriteRequest writeRequest = new WriteRequest(toPutRequest(record));
        writesByTable.computeIfAbsent(tableName(record),
                k -> new ArrayList<>(config.batchSize)).add(writeRequest);
    }
    return writesByTable;
}
private static String makeMessage(Map<String, List<WriteRequest>> unprocessedItems) {
    final StringBuilder msg = new StringBuilder("Unprocessed writes: {");
    for (Map.Entry<String, List<WriteRequest>> e : unprocessedItems.entrySet()) {
        msg.append(" ").append(e.getKey())
                .append("(").append(e.getValue().size()).append(")").append(" ");
    }
    msg.append("}");
    return msg.toString();
}
@Override
public void putItems(T... items) {
    if (null == items || 0 == items.length) return;
    for (int chunk = 0; chunk < items.length; chunk += DDB_MAX_BATCH_WRITE_ITEM) {
        TableWriteItems request = new TableWriteItems(_tableName);
        int max = Math.min(items.length - chunk, DDB_MAX_BATCH_WRITE_ITEM);
        for (int idx = 0; idx < max; idx++) {
            request.addItemToPut(_encryption.encrypt(toItem(items[chunk + idx])));
        }
        BatchWriteItemOutcome response = maybeBackoff(false, () -> _dynamodb.batchWriteItem(request));
        while (true) {
            if (null == response.getUnprocessedItems()) break;
            List<WriteRequest> unprocessed = response.getUnprocessedItems().get(_tableName);
            if (null == unprocessed || unprocessed.size() == 0) {
                resetPTE(null);
                break;
            }
            if (LOG.isDebugEnabled()) LOG.debug("putItems() unprocessed: " + unprocessed.size());
            gotPTE(false);
            try {
                Thread.sleep(backoffSleep(false));
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new AbortedException(ex);
            }
            Map<String, List<WriteRequest>> unproc = response.getUnprocessedItems();
            response = maybeBackoff(false, () -> _dynamodb.batchWriteItemUnprocessed(unproc));
        }
    }
}
@Override
public void deleteItems(IndexKey... keys) {
    if (null == keys || 0 == keys.length) return;
    for (int chunk = 0; chunk < keys.length; chunk += DDB_MAX_BATCH_WRITE_ITEM) {
        TableWriteItems request = new TableWriteItems(_tableName);
        int max = Math.min(keys.length - chunk, DDB_MAX_BATCH_WRITE_ITEM);
        for (int idx = 0; idx < max; idx++) {
            IndexKey key = keys[chunk + idx];
            if (null == key) continue;
            request.addPrimaryKeyToDelete(toPrimaryKey(key));
        }
        BatchWriteItemOutcome response = maybeBackoff(false, () -> _dynamodb.batchWriteItem(request));
        while (true) {
            if (null == response.getUnprocessedItems()) break;
            List<WriteRequest> unprocessed = response.getUnprocessedItems().get(_tableName);
            if (null == unprocessed || unprocessed.size() == 0) {
                resetPTE(null);
                break;
            }
            if (LOG.isDebugEnabled()) LOG.debug("deleteItems() unprocessed: " + unprocessed.size());
            gotPTE(false);
            try {
                Thread.sleep(backoffSleep(false));
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new AbortedException(ex);
            }
            Map<String, List<WriteRequest>> unproc = response.getUnprocessedItems();
            response = maybeBackoff(false, () -> _dynamodb.batchWriteItemUnprocessed(unproc));
        }
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void addTimingResults(final @Nonnull String tableName,
        final @Nonnull List<TankResult> results, boolean async) {
    if (!results.isEmpty()) {
        Runnable task = new Runnable() {
            public void run() {
                MethodTimer mt = new MethodTimer(logger, this.getClass(),
                        "addTimingResults (" + results + ")");
                List<WriteRequest> requests = new ArrayList<WriteRequest>();
                try {
                    for (TankResult result : results) {
                        Map<String, AttributeValue> item = getTimingAttributes(result);
                        PutRequest putRequest = new PutRequest().withItem(item);
                        WriteRequest writeRequest = new WriteRequest().withPutRequest(putRequest);
                        requests.add(writeRequest);
                    }
                    sendBatch(tableName, requests);
                } catch (Exception t) {
                    logger.error("Error adding results: " + t.getMessage(), t);
                    throw new RuntimeException(t);
                }
                mt.endAndLog();
            }
        };
        if (async) {
            EXECUTOR.execute(task);
        } else {
            task.run();
        }
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void addItems(final String tableName, List<Item> itemList, final boolean asynch) {
    if (!itemList.isEmpty()) {
        final List<Item> items = new ArrayList<Item>(itemList);
        Runnable task = new Runnable() {
            public void run() {
                MethodTimer mt = new MethodTimer(logger, this.getClass(), "addItems (" + items + ")");
                List<WriteRequest> requests = new ArrayList<WriteRequest>();
                try {
                    for (Item item : items) {
                        Map<String, AttributeValue> toInsert = itemToMap(item);
                        PutRequest putRequest = new PutRequest().withItem(toInsert);
                        WriteRequest writeRequest = new WriteRequest().withPutRequest(putRequest);
                        requests.add(writeRequest);
                    }
                    sendBatch(tableName, requests);
                } catch (Exception t) {
                    logger.error("Error adding results: " + t.getMessage(), t);
                    throw new RuntimeException(t);
                }
                mt.endAndLog();
            }
        };
        if (asynch) {
            EXECUTOR.execute(task);
        } else {
            task.run();
        }
    }
}
/**
 * Splits the requests into batches of at most BATCH_SIZE and sends each batch to the table.
 * @param tableName the target table
 * @param requests the write requests to send
 */
private void sendBatch(final String tableName, List<WriteRequest> requests) {
    int numBatches = (int) Math.ceil(requests.size() / (BATCH_SIZE * 1D));
    for (int i = 0; i < numBatches; i++) {
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
        List<WriteRequest> batch = requests.subList(i * BATCH_SIZE,
                Math.min(i * BATCH_SIZE + BATCH_SIZE, requests.size()));
        requestItems.put(tableName, batch);
        addItemsToTable(tableName, new BatchWriteItemRequest().withRequestItems(requestItems));
    }
}
/**
 * Writes to the DynamoDB table using an exponential backoff. If batchWriteItem returns
 * unprocessed items, it backs off exponentially and retries the unprocessed items.
 */
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
    BatchWriteItemResult writeItemResult = null;
    List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
    Map<String, List<WriteRequest>> unprocessedItems = null;
    boolean interrupted = false;
    try {
        do {
            writeItemResult = client.batchWriteItem(req);
            unprocessedItems = writeItemResult.getUnprocessedItems();
            consumedCapacities.addAll(writeItemResult.getConsumedCapacity());
            if (unprocessedItems != null) {
                req.setRequestItems(unprocessedItems);
                try {
                    Thread.sleep(exponentialBackoffTime);
                } catch (InterruptedException ie) {
                    interrupted = true;
                } finally {
                    exponentialBackoffTime *= 2;
                    if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
                        exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
                    }
                }
            }
        } while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
        return consumedCapacities;
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
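/*
 * Illustrative only: a minimal sketch of how a caller might drive runWithBackoff() above over
 * the batches produced by splitResultIntoBatches() below, copying every scanned item into a
 * destination table. The client field, the table name parameters, and the surrounding scan loop
 * are assumptions made for this sketch and are not part of the original code.
 */
public void copyAllItems(String sourceTable, String destinationTable) {
    ScanRequest scan = new ScanRequest().withTableName(sourceTable);
    ScanResult result;
    do {
        result = client.scan(scan);
        // split the scanned page into batches of at most 25 items and write each with backoff/retry
        for (BatchWriteItemRequest batch : splitResultIntoBatches(result, destinationTable)) {
            runWithBackoff(batch);
        }
        // continue scanning from where the previous page stopped
        scan.setExclusiveStartKey(result.getLastEvaluatedKey());
    } while (result.getLastEvaluatedKey() != null);
}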
/**
 * Splits up a ScanResult into a list of BatchWriteItemRequests of size 25 items or less each.
 */
public static List<BatchWriteItemRequest> splitResultIntoBatches(ScanResult result, String tableName) {
    List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
    Iterator<Map<String, AttributeValue>> it = result.getItems().iterator();
    BatchWriteItemRequest req = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    List<WriteRequest> writeRequests = new LinkedList<WriteRequest>();
    int i = 0;
    while (it.hasNext()) {
        PutRequest put = new PutRequest(it.next());
        writeRequests.add(new WriteRequest(put));
        i++;
        if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) {
            req.addRequestItemsEntry(tableName, writeRequests);
            batches.add(req);
            req = new BatchWriteItemRequest()
                    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
            writeRequests = new LinkedList<WriteRequest>();
            i = 0;
        }
    }
    if (i > 0) {
        req.addRequestItemsEntry(tableName, writeRequests);
        batches.add(req);
    }
    return batches;
}
/**
 * Constructor for unit test.
 */
public TableWriter(String tableName, String tableHashKeyName, String tableRangeKeyName,
        AmazonDynamoDBClient dynamoDBClient, RateLimiter rateLimiter,
        ArrayList<WriteRequest> batchDeleteRequests) {
    this.tableName = tableName;
    this.tableHashKeyName = tableHashKeyName;
    this.tableRangeKeyName = tableRangeKeyName;
    this.dynamoDBClient = dynamoDBClient;
    this.batchDeleteRequests = batchDeleteRequests;
}
public TableWriter(Options options, TableHelper tableHelper, AmazonDynamoDBClient dynamoDBClient,
        int numOfTasks, boolean isRunningOnDDBLocal) {
    this.tableName = options.getTableName();
    this.tableHashKeyName = tableHelper.getTableHashKeyName();
    this.tableRangeKeyName = tableHelper.getTableRangeKeyName();
    this.dynamoDBClient = dynamoDBClient;
    this.totalNumOfItemsDeleted = 0;
    batchDeleteRequests = new ArrayList<WriteRequest>();
    tableWriteRateLimiter = new TableRWRateLimiter(tableHelper.getWriteCapacityUnits(),
            options.getReadWriteIOPSPercent(), numOfTasks);
    TableWriter.isRunningOnDDBLocal = isRunningOnDDBLocal;
}
protected int addDeleteRequest(AttributeValue tableHashKey, AttributeValue tableRangeKey) {
    Map<String, AttributeValue> primaryKey = genTablePrimaryKey(tableHashKey, tableRangeKey);
    batchDeleteRequests.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(primaryKey)));
    int deletedItems = 0;
    if (batchDeleteRequests.size() == MAX_BATCH_WRITE_REQUEST_NUM) {
        deletedItems = sendDeleteRequests();
    }
    return deletedItems;
}
protected int countAndPrintUndeletedItems(BatchWriteItemResult batchWriteResult) {
    if (!batchWriteResult.getUnprocessedItems().isEmpty()) {
        logger.warn("WARNING: UNPROCESSED ITEMS:");
        List<WriteRequest> unprocessedRequests = batchWriteResult.getUnprocessedItems().get(tableName);
        int count = 0;
        for (WriteRequest w : unprocessedRequests) {
            PrintHelper.printItem(0, count++, w.getDeleteRequest().getKey());
        }
        return unprocessedRequests.size();
    }
    return 0;
}
@Test
public void testCountAndPrintUndeletedItems() {
    BatchWriteItemResult mockBatchWriteResult = Mockito.mock(BatchWriteItemResult.class);
    HashMap<String, List<WriteRequest>> mockUnProcessedItems = Mockito.mock(HashMap.class);
    Mockito.when(mockBatchWriteResult.getUnprocessedItems()).thenReturn(mockUnProcessedItems);
    Mockito.when(mockUnProcessedItems.isEmpty()).thenReturn(true);

    tableWriter.countAndPrintUndeletedItems(mockBatchWriteResult);
    Mockito.verify(mockUnProcessedItems, Mockito.times(1)).isEmpty();
    Mockito.verify(mockUnProcessedItems, Mockito.times(0)).get(Mockito.anyString());
}
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
    createTable();
    String TEST_ATTRIBUTE_2 = "Attribute2";
    String TEST_ATTRIBUTE_VALUE_2 = "AttributeValue2";

    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();

    Map<String, AttributeValue> item1 = new HashMap<String, AttributeValue>();
    item1.put(TEST_ATTRIBUTE, new AttributeValue().withS(TEST_ATTRIBUTE_VALUE));
    WriteRequest writeRequest1 = new WriteRequest().withPutRequest(new PutRequest().withItem(item1));
    writeRequests.add(writeRequest1);

    Map<String, AttributeValue> item2 = new HashMap<String, AttributeValue>();
    item2.put(TEST_ATTRIBUTE_2, new AttributeValue().withS(TEST_ATTRIBUTE_VALUE_2));
    WriteRequest writeRequest2 = new WriteRequest().withPutRequest(new PutRequest().withItem(item2));
    writeRequests.add(writeRequest2);

    requestItems.put(TEST_TABLE_NAME, writeRequests);

    BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);
    List<ConsumedCapacity> consumedCapacities = result.getConsumedCapacity();
    assertThat(consumedCapacities.size(), equalTo(writeRequests.size()));
}
@Override
public <P extends ParaObject> void createAll(String appid, List<P> objects) {
    if (objects == null || objects.isEmpty() || StringUtils.isBlank(appid)) {
        return;
    }
    List<WriteRequest> reqs = new ArrayList<>(objects.size());
    int batchSteps = 1;
    if ((objects.size() > MAX_ITEMS_PER_WRITE)) {
        batchSteps = (objects.size() / MAX_ITEMS_PER_WRITE)
                + ((objects.size() % MAX_ITEMS_PER_WRITE > 0) ? 1 : 0);
    }
    Iterator<P> it = objects.iterator();
    String tableName = getTableNameForAppid(appid);
    int j = 0;
    for (int i = 0; i < batchSteps; i++) {
        while (it.hasNext() && j < MAX_ITEMS_PER_WRITE) {
            ParaObject object = it.next();
            if (StringUtils.isBlank(object.getId())) {
                object.setId(Utils.getNewId());
            }
            if (object.getTimestamp() == null) {
                object.setTimestamp(Utils.timestamp());
            }
            //if (updateOp) object.setUpdated(Utils.timestamp());
            object.setAppid(appid);
            Map<String, AttributeValue> row = toRow(object, null);
            setRowKey(getKeyForAppid(object.getId(), appid), row);
            reqs.add(new WriteRequest().withPutRequest(new PutRequest().withItem(row)));
            j++;
        }
        batchWrite(Collections.singletonMap(tableName, reqs));
        reqs.clear();
        j = 0;
    }
    logger.debug("DAO.createAll() {}->{}", appid, (objects == null) ? 0 : objects.size());
}
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
    BatchWriteItemRequest batchItemRequest = new BatchWriteItemRequest();
    List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
    for (PutPointRequest putPointRequest : putPointRequests) {
        long geohash = S2Manager.generateGeohash(putPointRequest.getGeoPoint());
        long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
        String geoJson = GeoJsonMapper.stringFromGeoObject(putPointRequest.getGeoPoint());

        PutRequest putRequest = putPointRequest.getPutRequest();
        AttributeValue hashKeyValue = new AttributeValue().withN(String.valueOf(hashKey));
        putRequest.getItem().put(config.getHashKeyAttributeName(), hashKeyValue);
        putRequest.getItem().put(config.getRangeKeyAttributeName(), putPointRequest.getRangeKeyValue());
        AttributeValue geohashValue = new AttributeValue().withN(Long.toString(geohash));
        putRequest.getItem().put(config.getGeohashAttributeName(), geohashValue);
        AttributeValue geoJsonValue = new AttributeValue().withS(geoJson);
        putRequest.getItem().put(config.getGeoJsonAttributeName(), geoJsonValue);

        WriteRequest writeRequest = new WriteRequest(putRequest);
        writeRequests.add(writeRequest);
    }
    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    requestItems.put(config.getTableName(), writeRequests);
    batchItemRequest.setRequestItems(requestItems);
    BatchWriteItemResult batchWriteItemResult = config.getDynamoDBClient().batchWriteItem(batchItemRequest);
    BatchWritePointResult batchWritePointResult = new BatchWritePointResult(batchWriteItemResult);
    return batchWritePointResult;
}
public BatchWriteItemResult batchWriteItem(final Map<String, List<WriteRequest>> requestItems) {
    return dynamoDB.batchWriteItem(requestItems);
}
public UnprocessedItemsException(Map<String, List<WriteRequest>> unprocessedItems) {
    super(makeMessage(unprocessedItems));
    this.unprocessedItems = unprocessedItems;
}
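/*
 * Illustrative only: one way a caller might use UnprocessedItemsException above, retrying a
 * batch a few times and surfacing whatever DynamoDB still reports as unprocessed. The client
 * parameter, retry count, and sleep times are assumptions for this sketch, not part of the
 * original code.
 */
private void writeWithRetries(AmazonDynamoDB client, Map<String, List<WriteRequest>> items)
        throws UnprocessedItemsException, InterruptedException {
    Map<String, List<WriteRequest>> remaining = items;
    for (int attempt = 0; attempt < 3 && !remaining.isEmpty(); attempt++) {
        BatchWriteItemResult result =
                client.batchWriteItem(new BatchWriteItemRequest().withRequestItems(remaining));
        remaining = result.getUnprocessedItems();
        if (!remaining.isEmpty()) {
            Thread.sleep(100L << attempt); // simple exponential backoff between attempts
        }
    }
    if (!remaining.isEmpty()) {
        throw new UnprocessedItemsException(remaining);
    }
}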
/**
 * {@inheritDoc}
 */
@Override
public void deleteForJob(final String tableName, final String jobId, final boolean asynch) {
    Runnable task = new Runnable() {
        public void run() {
            MethodTimer mt = new MethodTimer(logger, this.getClass(), "deleteForJob (" + jobId + ")");
            List<Item> items = getItems(tableName, null, null, null, jobId);
            if (!items.isEmpty()) {
                List<WriteRequest> requests = new ArrayList<WriteRequest>();
                try {
                    for (Item item : items) {
                        String id = null;
                        for (Attribute attr : item.getAttributes()) {
                            if (DatabaseKeys.REQUEST_NAME_KEY.getShortKey().equals(attr.getName())) {
                                id = attr.getValue();
                                break;
                            }
                        }
                        if (id != null) {
                            Map<String, AttributeValue> keyMap = new HashMap<String, AttributeValue>();
                            keyMap.put(DatabaseKeys.REQUEST_NAME_KEY.getShortKey(),
                                    new AttributeValue().withS(id));
                            keyMap.put(DatabaseKeys.JOB_ID_KEY.getShortKey(),
                                    new AttributeValue().withS(jobId));
                            DeleteRequest deleteRequest = new DeleteRequest().withKey(keyMap);
                            WriteRequest writeRequest = new WriteRequest().withDeleteRequest(deleteRequest);
                            requests.add(writeRequest);
                        }
                    }
                    sendBatch(tableName, requests);
                } catch (Exception t) {
                    logger.error("Error deleting job items: " + t.getMessage(), t);
                    throw new RuntimeException(t);
                }
            }
            mt.endAndLog();
        }
    };
    if (asynch) {
        EXECUTOR.execute(task);
    } else {
        task.run();
    }
}
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchRequest) throws BackendException {
    int count = 0;
    for (Entry<String, List<WriteRequest>> entry : batchRequest.getRequestItems().entrySet()) {
        final String tableName = entry.getKey();
        final List<WriteRequest> requests = entry.getValue();
        count += requests.size();
        if (count > BATCH_WRITE_MAX_NUMBER_OF_ITEMS) {
            throw new IllegalArgumentException("cant have more than 25 requests in a batchwrite");
        }
        for (final WriteRequest request : requests) {
            if ((request.getPutRequest() != null) == (request.getDeleteRequest() != null)) {
                throw new IllegalArgumentException(
                        "Exactly one of PutRequest or DeleteRequest must be set in each WriteRequest in a batch write operation");
            }
            final int wcu;
            final String apiName;
            if (request.getPutRequest() != null) {
                apiName = PUT_ITEM;
                final int bytes = calculateItemSizeInBytes(request.getPutRequest().getItem());
                wcu = computeWcu(bytes);
            } else { // delete request
                apiName = DELETE_ITEM;
                wcu = estimateCapacityUnits(apiName, tableName);
            }
            timedWriteThrottle(apiName, tableName, wcu);
        }
    }

    BatchWriteItemResult result;
    setUserAgent(batchRequest);
    final Timer.Context apiTimerContext = getTimerContext(BATCH_WRITE_ITEM, null /*tableName*/);
    try {
        result = client.batchWriteItem(batchRequest);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, BATCH_WRITE_ITEM, null /*tableName*/);
    } finally {
        apiTimerContext.stop();
    }
    if (result.getConsumedCapacity() != null) {
        for (ConsumedCapacity ccu : result.getConsumedCapacity()) {
            meterConsumedCapacity(BATCH_WRITE_ITEM, ccu);
        }
    }
    return result;
}
protected BatchWriteItemRequest genBatchWriteItemRequest() {
    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    requestItems.put(tableName, batchDeleteRequests);
    return new BatchWriteItemRequest()
            .withRequestItems(requestItems)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
}
/**
 * Deletes all objects in a shared table, which belong to a given appid, by scanning the GSI.
 * @param appid app id
 */
public static void deleteAllFromSharedTable(String appid) {
    if (StringUtils.isBlank(appid) || !isSharedAppid(appid)) {
        return;
    }
    Pager pager = new Pager(50);
    List<WriteRequest> allDeletes = new LinkedList<>();
    Page<Item, QueryOutcome> items;
    // read all phase
    do {
        items = queryGSI(appid, pager);
        if (items == null) {
            break;
        }
        for (Item item : items) {
            String key = item.getString(Config._KEY);
            // only delete rows which belong to the given appid
            if (StringUtils.startsWith(key, appid.trim())) {
                logger.debug("Preparing to delete '{}' from shared table, appid: '{}'.", key, appid);
                pager.setLastKey(item.getString(Config._ID));
                allDeletes.add(new WriteRequest().withDeleteRequest(new DeleteRequest().
                        withKey(Collections.singletonMap(Config._KEY, new AttributeValue(key)))));
            }
        }
    } while (items.iterator().hasNext());
    // delete all phase
    final int maxItems = 20;
    int batchSteps = (allDeletes.size() > maxItems) ? (allDeletes.size() / maxItems) + 1 : 1;
    List<WriteRequest> reqs = new LinkedList<>();
    Iterator<WriteRequest> it = allDeletes.iterator();
    String tableName = getTableNameForAppid(appid);
    for (int i = 0; i < batchSteps; i++) {
        while (it.hasNext() && reqs.size() < maxItems) {
            reqs.add(it.next());
        }
        if (reqs.size() > 0) {
            logger.info("Deleting {} items belonging to app '{}', from shared table (page {}/{})...",
                    reqs.size(), appid, i + 1, batchSteps);
            batchWrite(Collections.singletonMap(tableName, reqs));
        }
        reqs.clear();
    }
}
/**
 * Only works in DEBUG level.
 * Prints the loaded .jar files at the start of Cygnus run.
 */
public static void printLoadedJars() {
    // trace the file containing the httpclient library
    URL myClassURL = PoolingClientConnectionManager.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading httpclient from " + myClassURL.toExternalForm());
    // trace the file containing the httpcore library
    myClassURL = DefaultBHttpServerConnection.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading httpcore from " + myClassURL.toExternalForm());
    // trace the file containing the junit library
    myClassURL = ErrorCollector.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading junit from " + myClassURL.toExternalForm());
    // trace the file containing the flume-ng-node library
    myClassURL = RegexExtractorInterceptorMillisSerializer.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading flume-ng-node from " + myClassURL.toExternalForm());
    // trace the file containing the libthrift library
    myClassURL = ListMetaData.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading libthrift from " + myClassURL.toExternalForm());
    // trace the file containing the gson library
    myClassURL = JsonPrimitive.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading gson from " + myClassURL.toExternalForm());
    // trace the file containing the json-simple library
    myClassURL = Yytoken.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading json-simple from " + myClassURL.toExternalForm());
    // trace the file containing the mysql-connector-java library
    myClassURL = Driver.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading mysql-connector-java from " + myClassURL.toExternalForm());
    // trace the file containing the postgresql library
    myClassURL = BlobOutputStream.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading postgresql from " + myClassURL.toExternalForm());
    // trace the file containing the log4j library
    myClassURL = SequenceNumberPatternConverter.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading log4j from " + myClassURL.toExternalForm());
    // trace the file containing the hadoop-core library
    myClassURL = AbstractMetricsContext.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading hadoop-core from " + myClassURL.toExternalForm());
    // trace the file containing the hive-exec library
    myClassURL = AbstractMapJoinOperator.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading hive-exec from " + myClassURL.toExternalForm());
    // trace the file containing the hive-jdbc library
    myClassURL = HivePreparedStatement.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading hive-jdbc from " + myClassURL.toExternalForm());
    // trace the file containing the mongodb-driver library
    myClassURL = AsyncReadWriteBinding.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading mongodb-driver from " + myClassURL.toExternalForm());
    // trace the file containing the kafka-clients library
    myClassURL = OffsetOutOfRangeException.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading kafka-clients from " + myClassURL.toExternalForm());
    // trace the file containing the zkclient library
    myClassURL = ZkNoNodeException.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading zkclient from " + myClassURL.toExternalForm());
    // trace the file containing the kafka_2.11 library
    myClassURL = KafkaMigrationTool.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading kafka_2.11 from " + myClassURL.toExternalForm());
    // trace the file containing the aws-java-sdk-dynamodb library
    myClassURL = WriteRequest.class.getProtectionDomain().getCodeSource().getLocation();
    LOGGER.debug("Loading aws-java-sdk-dynamodb from " + myClassURL.toExternalForm());
}
private static void writeMultipleItemsBatchWrite() {
    try {
        // Begin syntax extract

        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));

        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put("Forum", forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));

        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Some hash attribute value"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("Some range attribute value"));
        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put("Thread", threadList);

        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();

        System.out.println("Making the request.");
        batchWriteItemRequest.withRequestItems(requestItems);
        client.batchWriteItem(batchWriteItemRequest);

        // End syntax extract
    } catch (AmazonServiceException ase) {
        System.err.println("Failed to retrieve items: ");
        ase.printStackTrace(System.err);
    }
}
private static void writeMultipleItemsBatchWrite() {
    try {
        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));

        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put(table1Name, forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
        threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message"));
        threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));

        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put(table2Name, threadList);

        BatchWriteItemResult result;
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

        do {
            System.out.println("Making the request.");
            batchWriteItemRequest.withRequestItems(requestItems);
            result = client.batchWriteItem(batchWriteItemRequest);

            // Print consumed capacity units
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": "
                        + consumedCapacityUnits);
            }

            // Check for unprocessed keys which could happen if you exceed provisioned throughput
            System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
            requestItems = result.getUnprocessedItems();
        } while (result.getUnprocessedItems().size() > 0);
    } catch (AmazonServiceException ase) {
        System.err.println("Failed to retrieve items: ");
        ase.printStackTrace(System.err);
    }
}
private static void writeMultipleItemsBatchWrite() {
    try {
        // Add a new item to Forum
        TableWriteItems forumTableWriteItems = new TableWriteItems(forumTableName) // Forum
                .withItemsToPut(new Item()
                        .withPrimaryKey("Name", "Amazon RDS")
                        .withNumber("Threads", 0));

        // Add a new item, and delete an existing item, from Thread
        TableWriteItems threadTableWriteItems = new TableWriteItems(threadTableName)
                .withItemsToPut(new Item()
                        .withPrimaryKey("ForumName", "Amazon RDS", "Subject", "Amazon RDS Thread 1")
                        .withString("Message", "ElasticCache Thread 1 message")
                        .withStringSet("Tags", new HashSet<String>(Arrays.asList("cache", "in-memory"))))
                .withHashAndRangeKeysToDelete("ForumName", "Subject", "Amazon S3", "S3 Thread 100");

        System.out.println("Making the request.");
        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(forumTableWriteItems, threadTableWriteItems);

        do {
            // Check for unprocessed keys which could happen if you exceed provisioned throughput
            Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems();
            if (outcome.getUnprocessedItems().size() == 0) {
                System.out.println("No unprocessed items found");
            } else {
                System.out.println("Retrieving the unprocessed items");
                outcome = dynamoDB.batchWriteItemUnprocessed(unprocessedItems);
            }
        } while (outcome.getUnprocessedItems().size() > 0);
    } catch (Exception e) {
        System.err.println("Failed to retrieve items: ");
        e.printStackTrace(System.err);
    }
}