/**
 * {@inheritDoc}
 *
 * <p>Each {@code Item} is converted with {@code itemToAWSItem} and the results are written
 * through {@code addItemsToTable} in batches of at most 25 items per
 * {@code BatchPutAttributesRequest}. No request is issued for an empty batch, so an empty
 * {@code items} list (or one whose size is an exact multiple of 25) does not produce a
 * trailing request without items.
 *
 * @param tableName name of the table (domain) the items are written to
 * @param items     items to store
 * @param asynch    if {@code true} the work is handed to {@code EXECUTOR}; otherwise it runs
 *                  on the calling thread before this method returns
 */
@Override
public void addItems(final String tableName, final List<Item> items, boolean asynch) {
    Runnable task = new Runnable() {
        public void run() {
            final int maxBatchSize = 25;
            List<ReplaceableItem> batch = new ArrayList<ReplaceableItem>(maxBatchSize);
            for (Item item : items) {
                batch.add(itemToAWSItem(item));
                if (batch.size() == maxBatchSize) {
                    addItemsToTable(new BatchPutAttributesRequest(tableName, batch));
                    // Start a fresh list instead of clearing the one just handed to the
                    // request, so a submitted request never shares state with later batches.
                    batch = new ArrayList<ReplaceableItem>(maxBatchSize);
                }
            }
            // Only send the remainder when there is one.
            if (!batch.isEmpty()) {
                addItemsToTable(new BatchPutAttributesRequest(tableName, batch));
            }
        }
    };
    if (asynch) {
        EXECUTOR.execute(task);
    } else {
        task.run();
    }
}
/**
 * Writes the given items to a table through {@code addItemsToTable}, in batches of at most
 * 25 items per {@code BatchPutAttributesRequest}.
 *
 * <p>No request is issued for an empty batch, so an empty {@code items} list (or one whose
 * size is an exact multiple of 25) does not produce a trailing request without items. Any
 * exception raised while batching or sending is logged and not rethrown.
 *
 * @param tableName name of the table (domain) the items are written to
 * @param items     items to store
 */
public void addItems(String tableName, List<ReplaceableItem> items) {
    try {
        final int maxBatchSize = 25;
        List<ReplaceableItem> batch = new ArrayList<ReplaceableItem>(maxBatchSize);
        for (ReplaceableItem item : items) {
            batch.add(item);
            if (batch.size() == maxBatchSize) {
                addItemsToTable(new BatchPutAttributesRequest(tableName, batch));
                // Start a fresh list instead of clearing the one just handed to the
                // request, so a submitted request never shares state with later batches.
                batch = new ArrayList<ReplaceableItem>(maxBatchSize);
            }
        }
        // Only send the remainder when there is one.
        if (!batch.isEmpty()) {
            addItemsToTable(new BatchPutAttributesRequest(tableName, batch));
        }
    } catch (Exception t) {
        logger.error("Error adding result: " + t, t);
    }
}
/**
 * Sends an exchange to {@code direct:start} with the operation header set to
 * {@code SdbOperations.BatchPutAttributes} and a single replaceable item named "ITEM1".
 * The test contains no assertions.
 */
@Test
public void batchPutAttributes() {
    final List<ReplaceableItem> itemsToPut = Arrays.asList(new ReplaceableItem("ITEM1"));

    // Populates the headers that select the operation and carry its payload.
    Processor headerSetter = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setHeader(SdbConstants.OPERATION, SdbOperations.BatchPutAttributes);
            exchange.getIn().setHeader(SdbConstants.REPLACEABLE_ITEMS, itemsToPut);
        }
    };

    template.send("direct:start", headerSetter);
}
/**
 * Executes the command with one replaceable item in the
 * {@code SdbConstants.REPLACEABLE_ITEMS} header and checks that the request recorded by
 * {@code sdbClient} targets "DOMAIN1" and carries exactly those items.
 */
@Test
public void execute() {
    ReplaceableItem onlyItem = new ReplaceableItem("ITEM1");
    List<ReplaceableItem> expectedItems = new ArrayList<ReplaceableItem>();
    expectedItems.add(onlyItem);
    exchange.getIn().setHeader(SdbConstants.REPLACEABLE_ITEMS, expectedItems);

    command.execute();

    assertEquals("DOMAIN1", sdbClient.batchPutAttributesRequest.getDomainName());
    assertEquals(expectedItems, sdbClient.batchPutAttributesRequest.getItems());
}
/**
 * Checks that {@code determineReplaceableItems()} yields {@code null} before the
 * {@code SdbConstants.REPLACEABLE_ITEMS} header is set, and the header's value afterwards.
 */
@Test
public void determineReplaceableItems() {
    // Nothing has been put in the header yet.
    assertNull(command.determineReplaceableItems());

    ReplaceableItem onlyItem = new ReplaceableItem("ITEM1");
    List<ReplaceableItem> expectedItems = new ArrayList<ReplaceableItem>();
    expectedItems.add(onlyItem);
    exchange.getIn().setHeader(SdbConstants.REPLACEABLE_ITEMS, expectedItems);

    assertEquals(expectedItems, command.determineReplaceableItems());
}
/**
 * Sends a {@code SdbOperations.BatchPutAttributes} exchange with one replaceable item to
 * {@code direct:start} and checks that the request recorded by {@code amazonSDBClient}
 * targets "TestDomain" and carries exactly that item list.
 */
@Test
public void batchPutAttributes() {
    final List<ReplaceableItem> itemsToPut = Arrays.asList(new ReplaceableItem("ITEM1"));

    // Populates the headers that select the operation and carry its payload.
    Processor headerSetter = new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setHeader(SdbConstants.OPERATION, SdbOperations.BatchPutAttributes);
            exchange.getIn().setHeader(SdbConstants.REPLACEABLE_ITEMS, itemsToPut);
        }
    };
    template.send("direct:start", headerSetter);

    assertEquals("TestDomain", amazonSDBClient.batchPutAttributesRequest.getDomainName());
    assertEquals(itemsToPut, amazonSDBClient.batchPutAttributesRequest.getItems());
}
/** * * @{inheritDoc */ public void addTimingResults(final @Nonnull String tableName, final @Nonnull List<TankResult> messages, boolean asynch) { if (!messages.isEmpty()) { Runnable task = new Runnable() { public void run() { List<ReplaceableItem> items = new ArrayList<ReplaceableItem>(); try { for (TankResult result : messages) { ReplaceableItem item = new ReplaceableItem(); item.setAttributes(getTimingAttributes(result)); item.setName(UUID.randomUUID().toString()); items.add(item); if (items.size() == 25) { addItemsToTable(new BatchPutAttributesRequest(tableName, new ArrayList<ReplaceableItem>(items))); // logger.info("Sending " + items.size() + " // results to table " + tableName); items.clear(); } } if (items.size() > 0) { addItemsToTable(new BatchPutAttributesRequest(tableName, items)); logger.info("Sending " + items.size() + " results to table " + tableName); } } catch (Exception t) { logger.error("Error adding results: " + t.getMessage(), t); throw new RuntimeException(t); } } }; if (asynch) { EXECUTOR.execute(task); } else { task.run(); } } }
/**
 * Converts an {@code Item} into a {@code ReplaceableItem} with the same name, passing each of
 * the item's attributes (name and value) through {@code addAttribute}.
 *
 * @param item the item to convert
 * @return a new {@code ReplaceableItem} built from the item's name and the collected attributes
 */
private ReplaceableItem itemToAWSItem(Item item) {
    List<ReplaceableAttribute> converted = new ArrayList<ReplaceableAttribute>();
    for (com.intuit.tank.reporting.databases.Attribute source : item.getAttributes()) {
        addAttribute(converted, source.getName(), source.getValue());
    }
    return new ReplaceableItem(item.getName(), converted);
}
/**
 * Reads the {@code SdbConstants.REPLACEABLE_ITEMS} header from the exchange's in message.
 *
 * @return the header value requested as a {@code Collection}; the element type is not
 *         checked at runtime, hence the suppressed unchecked warning
 */
@SuppressWarnings("unchecked")
protected Collection<ReplaceableItem> determineReplaceableItems() {
    Collection<ReplaceableItem> headerItems =
            exchange.getIn().getHeader(SdbConstants.REPLACEABLE_ITEMS, Collection.class);
    return headerItems;
}
/**
 * Writes every cached entry to the configured domain in a single
 * {@code BatchPutAttributesRequest} and then empties the cache.
 *
 * <p>Each cache entry becomes one {@code ReplaceableItem} named after the cache key; each of
 * its data entries becomes a {@code ReplaceableAttribute} created with the flag {@code false}.
 * The request is attempted at least once and up to {@code MAX_TRIES} times, pausing one second
 * between attempts. If the thread is interrupted during a pause, its interrupt status is
 * restored and no further attempts are made.
 *
 * @throws RuntimeException if the request did not succeed; the cache is cleared before the
 *         exception is thrown, and the last failure is attached as the cause
 */
@Override
public void flush() {
    if (cache.size() < 1) {
        return;
    }

    List<ReplaceableItem> items = new ArrayList<ReplaceableItem>(CACHE_SIZE);
    for (Map.Entry<String, Map<String, String>> cacheEntry : cache.entrySet()) {
        List<ReplaceableAttribute> attributes =
                new ArrayList<ReplaceableAttribute>(cacheEntry.getValue().size());
        for (Map.Entry<String, String> dataEntry : cacheEntry.getValue().entrySet()) {
            attributes.add(new ReplaceableAttribute(dataEntry.getKey(), dataEntry.getValue(), false));
        }
        ReplaceableItem entry = new ReplaceableItem(cacheEntry.getKey());
        entry.setAttributes(attributes);
        items.add(entry);
    }

    BatchPutAttributesRequest req = new BatchPutAttributesRequest();
    req.setDomainName(domain);
    req.setItems(items);

    Exception lastFailure = null;
    int tries = 0;
    boolean retry = true;
    while (retry) {
        tries++;
        try {
            client.batchPutAttributes(req);
            cache.clear();
            return;
        } catch (Exception ase) {
            lastFailure = ase;
            log.warn(ase);
        }

        if (tries >= MAX_TRIES) {
            // No attempt follows, so there is nothing to wait for.
            retry = false;
        } else {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop retrying.
                Thread.currentThread().interrupt();
                retry = false;
            }
        }
    }

    cache.clear();
    throw new RuntimeException("Unable to connect to SDB " + domain, lastFailure);
}