/**
 * {@inheritDoc}
 * <p>
 * Each {@link Item} is converted with {@code itemToAWSItem} and written through
 * {@code addItemsToTable} in batches of at most 25 items. The work runs on
 * {@code EXECUTOR} when {@code asynch} is true, otherwise on the calling thread.
 *
 * @param tableName name passed to every {@link BatchPutAttributesRequest}
 * @param items     items to convert and store
 * @param asynch    {@code true} to hand the work to {@code EXECUTOR}, {@code false} to run inline
 */
@Override
public void addItems(final String tableName, final List<Item> items, boolean asynch) {
    Runnable task = new Runnable() {
        public void run() {
            List<ReplaceableItem> tmpItems = new ArrayList<ReplaceableItem>();
            for (Item item : items) {
                tmpItems.add(itemToAWSItem(item));
                if (tmpItems.size() == 25) {
                    addItemsToTable(new BatchPutAttributesRequest(tableName, tmpItems));
                    tmpItems.clear();
                }
            }
            // Only send the remainder when there is one: an empty input, or a size that is
            // an exact multiple of 25, would otherwise issue a pointless (and potentially
            // rejected) request carrying no items.
            if (!tmpItems.isEmpty()) {
                addItemsToTable(new BatchPutAttributesRequest(tableName, tmpItems));
            }
        }
    };
    if (asynch) {
        EXECUTOR.execute(task);
    } else {
        task.run();
    }
}
/**
 * Writes the given items through {@code addItemsToTable} in batches of at most 25.
 * <p>
 * Any exception raised while batching or writing is logged and not propagated.
 *
 * @param tableName name passed to every {@link BatchPutAttributesRequest}
 * @param items     items to store
 */
public void addItems(String tableName, List<ReplaceableItem> items) {
    try {
        List<ReplaceableItem> tmpItems = new ArrayList<ReplaceableItem>();
        for (ReplaceableItem item : items) {
            tmpItems.add(item);
            if (tmpItems.size() == 25) {
                addItemsToTable(new BatchPutAttributesRequest(tableName, tmpItems));
                tmpItems.clear();
            }
        }
        // Only send the remainder when there is one: an empty input, or a size that is
        // an exact multiple of 25, would otherwise issue a request carrying no items.
        if (!tmpItems.isEmpty()) {
            addItemsToTable(new BatchPutAttributesRequest(tableName, tmpItems));
        }
    } catch (Exception t) {
        logger.error("Error adding result: " + t, t);
    }
}
/**
 * Builds a {@link BatchPutAttributesRequest} from {@code determineDomainName()} and
 * {@code determineReplaceableItems()} and submits it via {@code sdbClient},
 * emitting a trace log entry before and after the call.
 */
public void execute() {
    BatchPutAttributesRequest request = new BatchPutAttributesRequest();
    // Populate in the same order as before: domain name first, then the items.
    request.withDomainName(determineDomainName());
    request.withItems(determineReplaceableItems());

    log.trace("Sending request [{}] for exchange [{}]...", request, exchange);

    this.sdbClient.batchPutAttributes(request);

    log.trace("Request sent");
}
/** * * @{inheritDoc */ public void addTimingResults(final @Nonnull String tableName, final @Nonnull List<TankResult> messages, boolean asynch) { if (!messages.isEmpty()) { Runnable task = new Runnable() { public void run() { List<ReplaceableItem> items = new ArrayList<ReplaceableItem>(); try { for (TankResult result : messages) { ReplaceableItem item = new ReplaceableItem(); item.setAttributes(getTimingAttributes(result)); item.setName(UUID.randomUUID().toString()); items.add(item); if (items.size() == 25) { addItemsToTable(new BatchPutAttributesRequest(tableName, new ArrayList<ReplaceableItem>(items))); // logger.info("Sending " + items.size() + " // results to table " + tableName); items.clear(); } } if (items.size() > 0) { addItemsToTable(new BatchPutAttributesRequest(tableName, items)); logger.info("Sending " + items.size() + " results to table " + tableName); } } catch (Exception t) { logger.error("Error adding results: " + t.getMessage(), t); throw new RuntimeException(t); } } }; if (asynch) { EXECUTOR.execute(task); } else { task.run(); } } }
private void addItemsToTable(final BatchPutAttributesRequest request) { boolean shouldRetry; int retries = 0; do { shouldRetry = false; try { db.batchPutAttributes(request); } catch (AmazonServiceException e) { int status = e.getStatusCode(); if (status == HttpStatus.SC_INTERNAL_SERVER_ERROR || status == HttpStatus.SC_SERVICE_UNAVAILABLE) { shouldRetry = true; long delay = (long) (Math.random() * (Math.pow(4, retries++) * 100L)); try { Thread.sleep(delay); } catch (InterruptedException iex) { logger.error("Caught InterruptedException exception", iex); } } else if ("DuplicateItemName".equals(e.getErrorCode())) { // ignore. } else { logger.error("Error writing to DB: " + e.getMessage()); } } } while (shouldRetry && retries < MAX_NUMBER_OF_RETRIES); }
/**
 * Records the supplied request in the {@code batchPutAttributesRequest} field and
 * returns a freshly created, unpopulated {@link BatchPutAttributesResult}.
 *
 * @param batchPutAttributesRequest the request to remember
 * @return a new empty result
 */
@Override
public BatchPutAttributesResult batchPutAttributes(BatchPutAttributesRequest batchPutAttributesRequest)
        throws AmazonServiceException, AmazonClientException {
    this.batchPutAttributesRequest = batchPutAttributesRequest;
    BatchPutAttributesResult emptyResult = new BatchPutAttributesResult();
    return emptyResult;
}
/**
 * Writes every cached entry to {@code domain} in a single batch-put request and
 * empties the cache.
 * <p>
 * Returns immediately when the cache is empty. The request is attempted up to
 * {@code MAX_TRIES} times (at least once), pausing one second between attempts. If the
 * thread is interrupted during a pause, the interrupt flag is restored and no further
 * attempts are made.
 *
 * @throws RuntimeException if no attempt succeeded; the cache is cleared first and the
 *                          last failure is attached as the cause
 */
@Override
public void flush() {
    if (cache.size() < 1) {
        return;
    }

    BatchPutAttributesRequest req = new BatchPutAttributesRequest();
    List<ReplaceableItem> items = new ArrayList<ReplaceableItem>(CACHE_SIZE);
    for (Map.Entry<String, Map<String, String>> cacheEntry : cache.entrySet()) {
        ReplaceableItem entry = new ReplaceableItem(cacheEntry.getKey());
        List<ReplaceableAttribute> attributes =
                new ArrayList<ReplaceableAttribute>(cacheEntry.getValue().size());
        for (Map.Entry<String, String> dataEntry : cacheEntry.getValue().entrySet()) {
            attributes.add(new ReplaceableAttribute(dataEntry.getKey(), dataEntry.getValue(), false));
        }
        entry.setAttributes(attributes);
        items.add(entry);
    }
    req.setDomainName(domain);
    req.setItems(items);

    Exception lastFailure = null;
    int tries = 0;
    while (true) {
        tries++;
        try {
            client.batchPutAttributes(req);
            cache.clear();
            return;
        } catch (Exception ase) {
            log.warn(ase);
            lastFailure = ase;
        }
        if (tries >= MAX_TRIES) {
            // No attempt follows, so do not sleep before reporting the failure.
            break;
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers and stop retrying.
            Thread.currentThread().interrupt();
            break;
        }
    }

    cache.clear();
    throw new RuntimeException("Unable to connect to SDB " + domain, lastFailure);
}