/**
 * Writes the credentials contained in {@code result} to {@code TableName} in one batch.
 *
 * <p>Three items are written: the user's token, the bot's token, and the mapping from the
 * team to its bot user. Keys read from {@code result} are {@code user_id},
 * {@code access_token}, {@code team_id} and a nested {@code bot} map that supplies
 * {@code bot_access_token} and {@code bot_user_id}.
 *
 * @param result response map to persist; the {@code bot} entry is dereferenced without a
 *               null check, so it must be present
 */
private void storeResult(final Map<String, Object> result) {
    // Values are read in the same order as before so a malformed map fails at the same point.
    final String userId = (String) result.get("user_id");
    final String userToken = (String) result.get("access_token");
    final String teamId = (String) result.get("team_id");

    @SuppressWarnings("unchecked")
    final Map<String, Object> botSection = (Map<String, Object>) result.get("bot");
    final String botToken = (String) botSection.get("bot_access_token");
    final String botId = (String) botSection.get("bot_user_id");

    final PutRequest userTokenPut = item("user:" + userId + ":token", userToken);
    final PutRequest botTokenPut = item("user:" + botId + ":token", botToken);
    final PutRequest teamBotPut = item("team:" + teamId + ":botuser", botId);

    final List<WriteRequest> writes = new ArrayList<>();
    writes.add(new WriteRequest(userTokenPut));
    writes.add(new WriteRequest(botTokenPut));
    writes.add(new WriteRequest(teamBotPut));

    final HashMap<String, List<WriteRequest>> writesByTable = new HashMap<>();
    writesByTable.put(TableName, writes);

    // NOTE(review): the result (including any unprocessed items) is not inspected.
    ddb.batchWriteItem(new BatchWriteItemRequest().withRequestItems(writesByTable));
}
/**
 * Converts {@code value} to an {@code AttributeValue} and stores it in {@code put}.
 *
 * <p>When {@code valueSource} has a non-empty top attribute name in the configuration, the
 * converted value is added under that name. Otherwise the converted value must be a map,
 * which then replaces the item of {@code put} wholesale.
 *
 * @param valueSource which part of the record the value came from
 * @param schema      schema of {@code value}, or {@code null} for schemaless conversion
 * @param value       the value to convert
 * @param put         the request being populated
 * @throws DataException    rethrown after logging when the conversion fails
 * @throws ConnectException when no top attribute name is configured and the converted value
 *                          is not a map
 */
private void insert(ValueSource valueSource, Schema schema, Object value, PutRequest put) {
    final AttributeValue converted;
    try {
        if (schema != null) {
            converted = AttributeValueConverter.toAttributeValue(schema, value);
        } else {
            converted = AttributeValueConverter.toAttributeValueSchemaless(value);
        }
    } catch (DataException e) {
        log.error("Failed to convert record with schema={} value={}", schema, value, e);
        throw e;
    }

    final String attributeName = valueSource.topAttributeName(config);
    if (!attributeName.isEmpty()) {
        put.addItemEntry(attributeName, converted);
        return;
    }

    // No wrapping attribute: the converted value itself has to be the item map.
    if (converted.getM() == null) {
        throw new ConnectException("No top attribute name configured for " + valueSource + ", and it could not be converted to Map: " + converted);
    }
    put.setItem(converted.getM());
}
public BatchWriteItemResult putBatch(String tableName, Map<String, AttributeValue> item, long maxItemsPerBatch, Reporter reporter) throws UnsupportedEncodingException { int itemSizeBytes = DynamoDBUtil.getItemSizeBytes(item); if (itemSizeBytes > maxItemByteSize) { throw new RuntimeException("Cannot pass items with size greater than " + maxItemByteSize + ". Item with size of " + itemSizeBytes + " was given."); } maxItemsPerBatch = DynamoDBUtil.getBoundedBatchLimit(config, maxItemsPerBatch); BatchWriteItemResult result = null; if (writeBatchMap.containsKey(tableName)) { boolean writeRequestsForTableAtLimit = writeBatchMap.get(tableName).size() >= maxItemsPerBatch; boolean totalSizeOfWriteBatchesOverLimit = writeBatchMapSizeBytes + itemSizeBytes > maxBatchSize; if (writeRequestsForTableAtLimit || totalSizeOfWriteBatchesOverLimit) { result = writeBatch(reporter, itemSizeBytes); } } // writeBatchMap could be cleared from writeBatch() List<WriteRequest> writeBatchList; if (!writeBatchMap.containsKey(tableName)) { writeBatchList = new ArrayList<>((int) maxItemsPerBatch); writeBatchMap.put(tableName, writeBatchList); } else { writeBatchList = writeBatchMap.get(tableName); } writeBatchList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item))); writeBatchMapSizeBytes += itemSizeBytes; return result; }
/**
 * Builds the {@code PutRequest} for a sink record.
 *
 * <p>The record value is inserted first (unless {@code config.ignoreRecordValue}), then the
 * record key (unless {@code config.ignoreRecordKey}). When coordinate names are configured,
 * the record's topic, partition and offset are added under those names.
 *
 * @param record the record to convert
 * @return the populated put request
 */
private PutRequest toPutRequest(SinkRecord record) {
    final PutRequest request = new PutRequest();

    if (!config.ignoreRecordValue) {
        insert(ValueSource.RECORD_VALUE, record.valueSchema(), record.value(), request);
    }
    if (!config.ignoreRecordKey) {
        insert(ValueSource.RECORD_KEY, record.keySchema(), record.key(), request);
    }

    if (config.kafkaCoordinateNames == null) {
        return request;
    }

    final AttributeValue topic = new AttributeValue().withS(record.topic());
    final AttributeValue partition = new AttributeValue().withN(String.valueOf(record.kafkaPartition()));
    final AttributeValue offset = new AttributeValue().withN(String.valueOf(record.kafkaOffset()));

    request.addItemEntry(config.kafkaCoordinateNames.topic, topic);
    request.addItemEntry(config.kafkaCoordinateNames.partition, partition);
    request.addItemEntry(config.kafkaCoordinateNames.offset, offset);
    return request;
}
/**
 * {@inheritDoc}
 *
 * <p>Does nothing for an empty list. Otherwise each result is turned into a put request and
 * the whole list is handed to {@code sendBatch}, either on {@code EXECUTOR} (when
 * {@code async} is true) or on the calling thread.
 */
@Override
public void addTimingResults(final @Nonnull String tableName, final @Nonnull List<TankResult> results, boolean async) {
    if (results.isEmpty()) {
        return;
    }

    // Kept as an anonymous class: this.getClass() below refers to this Runnable.
    final Runnable writer = new Runnable() {
        public void run() {
            MethodTimer timer = new MethodTimer(logger, this.getClass(), "addTimingResults (" + results + ")");
            List<WriteRequest> batch = new ArrayList<WriteRequest>();
            try {
                for (TankResult timing : results) {
                    Map<String, AttributeValue> attributes = getTimingAttributes(timing);
                    batch.add(new WriteRequest().withPutRequest(new PutRequest().withItem(attributes)));
                }
                sendBatch(tableName, batch);
            } catch (Exception e) {
                logger.error("Error adding results: " + e.getMessage(), e);
                throw new RuntimeException(e);
            }
            // Only reached on success; a failure propagates before the timer is logged.
            timer.endAndLog();
        }
    };

    if (async) {
        EXECUTOR.execute(writer);
    } else {
        writer.run();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Does nothing for an empty list. Otherwise a copy of {@code itemList} is taken, each item
 * is turned into a put request, and the whole list is handed to {@code sendBatch}, either on
 * {@code EXECUTOR} (when {@code asynch} is true) or on the calling thread.
 */
@Override
public void addItems(final String tableName, List<Item> itemList, final boolean asynch) {
    if (itemList.isEmpty()) {
        return;
    }

    // Copy taken up front so the task works on its own list rather than the caller's.
    final List<Item> items = new ArrayList<Item>(itemList);

    // Kept as an anonymous class: this.getClass() below refers to this Runnable.
    final Runnable writer = new Runnable() {
        public void run() {
            MethodTimer timer = new MethodTimer(logger, this.getClass(), "addItems (" + items + ")");
            List<WriteRequest> batch = new ArrayList<WriteRequest>();
            try {
                for (Item current : items) {
                    Map<String, AttributeValue> attributes = itemToMap(current);
                    batch.add(new WriteRequest().withPutRequest(new PutRequest().withItem(attributes)));
                }
                sendBatch(tableName, batch);
            } catch (Exception e) {
                logger.error("Error adding results: " + e.getMessage(), e);
                throw new RuntimeException(e);
            }
            // Only reached on success; a failure propagates before the timer is logged.
            timer.endAndLog();
        }
    };

    if (asynch) {
        EXECUTOR.execute(writer);
    } else {
        writer.run();
    }
}
/**
 * Splits the items of a ScanResult into BatchWriteItemRequests, each holding at most
 * {@code BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM} put requests for {@code tableName}.
 * Every request asks for total consumed capacity to be returned.
 *
 * @param result    scan result whose items are to be written
 * @param tableName table all put requests are addressed to
 * @return the batches in item order; empty when the result has no items
 */
public static List<BatchWriteItemRequest> splitResultIntoBatches(
        ScanResult result, String tableName) {
    List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
    List<WriteRequest> pending = new LinkedList<WriteRequest>();

    for (Map<String, AttributeValue> item : result.getItems()) {
        pending.add(new WriteRequest(new PutRequest(item)));

        if (pending.size() == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) {
            BatchWriteItemRequest full = new BatchWriteItemRequest()
                    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
            full.addRequestItemsEntry(tableName, pending);
            batches.add(full);
            // Start a fresh list; the full one now belongs to the request just added.
            pending = new LinkedList<WriteRequest>();
        }
    }

    // Remainder that did not fill a whole batch.
    if (!pending.isEmpty()) {
        BatchWriteItemRequest last = new BatchWriteItemRequest()
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
        last.addRequestItemsEntry(tableName, pending);
        batches.add(last);
    }
    return batches;
}
/**
 * Writes two single-attribute items to the test table in one batch and checks that the
 * number of consumed-capacity entries in the result equals the number of write requests.
 */
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
    createTable();

    final String secondAttribute = "Attribute2";
    final String secondAttributeValue = "AttributeValue2";

    Map<String, AttributeValue> firstItem = new HashMap<String, AttributeValue>();
    firstItem.put(TEST_ATTRIBUTE, new AttributeValue().withS(TEST_ATTRIBUTE_VALUE));

    Map<String, AttributeValue> secondItem = new HashMap<String, AttributeValue>();
    secondItem.put(secondAttribute, new AttributeValue().withS(secondAttributeValue));

    List<WriteRequest> writes = new ArrayList<WriteRequest>();
    writes.add(new WriteRequest().withPutRequest(new PutRequest().withItem(firstItem)));
    writes.add(new WriteRequest().withPutRequest(new PutRequest().withItem(secondItem)));

    Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
    requestItems.put(TEST_TABLE_NAME, writes);

    BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);

    assertThat(result.getConsumedCapacity().size(), equalTo(writes.size()));
}
@Override public <P extends ParaObject> void createAll(String appid, List<P> objects) { if (objects == null || objects.isEmpty() || StringUtils.isBlank(appid)) { return; } List<WriteRequest> reqs = new ArrayList<>(objects.size()); int batchSteps = 1; if ((objects.size() > MAX_ITEMS_PER_WRITE)) { batchSteps = (objects.size() / MAX_ITEMS_PER_WRITE) + ((objects.size() % MAX_ITEMS_PER_WRITE > 0) ? 1 : 0); } Iterator<P> it = objects.iterator(); String tableName = getTableNameForAppid(appid); int j = 0; for (int i = 0; i < batchSteps; i++) { while (it.hasNext() && j < MAX_ITEMS_PER_WRITE) { ParaObject object = it.next(); if (StringUtils.isBlank(object.getId())) { object.setId(Utils.getNewId()); } if (object.getTimestamp() == null) { object.setTimestamp(Utils.timestamp()); } //if (updateOp) object.setUpdated(Utils.timestamp()); object.setAppid(appid); Map<String, AttributeValue> row = toRow(object, null); setRowKey(getKeyForAppid(object.getId(), appid), row); reqs.add(new WriteRequest().withPutRequest(new PutRequest().withItem(row))); j++; } batchWrite(Collections.singletonMap(tableName, reqs)); reqs.clear(); j = 0; } logger.debug("DAO.createAll() {}->{}", appid, (objects == null) ? 0 : objects.size()); }
/**
 * Creates a request for the given point. Both the underlying {@code PutItemRequest} and
 * {@code PutRequest} start with their own empty, mutable item map.
 *
 * @param geoPoint      the point to store
 * @param rangeKeyValue the range key value for the point
 */
public PutPointRequest(GeoPoint geoPoint, AttributeValue rangeKeyValue) {
    this.geoPoint = geoPoint;
    this.rangeKeyValue = rangeKeyValue;

    this.putItemRequest = new PutItemRequest().withItem(new HashMap<String, AttributeValue>());
    this.putRequest = new PutRequest().withItem(new HashMap<String, AttributeValue>());
}
/**
 * Writes the given points to the configured table with a single BatchWriteItem call.
 *
 * <p>For each request the hash key, range key, geohash and GeoJSON attributes are added to
 * the item of its {@code PutRequest} under the attribute names from {@code config}.
 *
 * @param putPointRequests the points to write; their put requests are modified in place
 * @return the batch write result wrapped in a {@code BatchWritePointResult}
 */
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
    List<WriteRequest> writes = new ArrayList<WriteRequest>();

    for (PutPointRequest pointRequest : putPointRequests) {
        // Derive everything first so the item is only touched once all values exist.
        long geohash = S2Manager.generateGeohash(pointRequest.getGeoPoint());
        long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
        String geoJson = GeoJsonMapper.stringFromGeoObject(pointRequest.getGeoPoint());

        PutRequest put = pointRequest.getPutRequest();
        Map<String, AttributeValue> item = put.getItem();
        item.put(config.getHashKeyAttributeName(), new AttributeValue().withN(String.valueOf(hashKey)));
        item.put(config.getRangeKeyAttributeName(), pointRequest.getRangeKeyValue());
        item.put(config.getGeohashAttributeName(), new AttributeValue().withN(Long.toString(geohash)));
        item.put(config.getGeoJsonAttributeName(), new AttributeValue().withS(geoJson));

        writes.add(new WriteRequest(put));
    }

    Map<String, List<WriteRequest>> writesByTable = new HashMap<String, List<WriteRequest>>();
    writesByTable.put(config.getTableName(), writes);

    BatchWriteItemRequest batchRequest = new BatchWriteItemRequest();
    batchRequest.setRequestItems(writesByTable);

    return new BatchWritePointResult(config.getDynamoDBClient().batchWriteItem(batchRequest));
}
private static void writeMultipleItemsBatchWrite() { try { // Begin syntax extract // Create a map for the requests in the batch Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>(); // Create a PutRequest for a new Forum item Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>(); forumItem.put("Name", new AttributeValue().withS("Amazon RDS")); forumItem.put("Threads", new AttributeValue().withN("0")); List<WriteRequest> forumList = new ArrayList<WriteRequest>(); forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem))); requestItems.put("Forum", forumList); // Create a PutRequest for a new Thread item Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>(); threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS")); threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1")); List<WriteRequest> threadList = new ArrayList<WriteRequest>(); threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem))); // Create a DeleteRequest for a Thread item Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>(); threadDeleteKey.put("ForumName", new AttributeValue().withS("Some hash attribute value")); threadDeleteKey.put("Subject", new AttributeValue().withS("Some range attribute value")); threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey))); requestItems.put("Thread", threadList); BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest(); System.out.println("Making the request."); batchWriteItemRequest.withRequestItems(requestItems); client.batchWriteItem(batchWriteItemRequest); // End syntax extract } catch (AmazonServiceException ase) { System.err.println("Failed to retrieve items: "); ase.printStackTrace(System.err); } }
private static void writeMultipleItemsBatchWrite() { try { // Create a map for the requests in the batch Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>(); // Create a PutRequest for a new Forum item Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>(); forumItem.put("Name", new AttributeValue().withS("Amazon RDS")); forumItem.put("Threads", new AttributeValue().withN("0")); List<WriteRequest> forumList = new ArrayList<WriteRequest>(); forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem))); requestItems.put(table1Name, forumList); // Create a PutRequest for a new Thread item Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>(); threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS")); threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1")); threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message")); threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory"))); List<WriteRequest> threadList = new ArrayList<WriteRequest>(); threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem))); // Create a DeleteRequest for a Thread item Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>(); threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3")); threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100")); threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey))); requestItems.put(table2Name, threadList); BatchWriteItemResult result; BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest() .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); do { System.out.println("Making the request."); batchWriteItemRequest.withRequestItems(requestItems); result = client.batchWriteItem(batchWriteItemRequest); // Print 
consumed capacity units for(ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) { String tableName = consumedCapacity.getTableName(); Double consumedCapacityUnits = consumedCapacity.getCapacityUnits(); System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits); } // Check for unprocessed keys which could happen if you exceed provisioned throughput System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems()); requestItems = result.getUnprocessedItems(); } while (result.getUnprocessedItems().size() > 0); } catch (AmazonServiceException ase) { System.err.println("Failed to retrieve items: "); ase.printStackTrace(System.err); } }
/**
 * Returns the put request held by this object. The same instance is returned on every
 * call, not a copy.
 *
 * @return the held {@code PutRequest}
 */
public PutRequest getPutRequest() {
    return this.putRequest;
}