/**
 * Collects, for one origin key, the carriers with the 10 highest performance values and hands
 * them to {@link #persist}.
 *
 * <p>Each incoming value is read as an array whose element 0 is the carrier name and whose
 * element 1 is a performance figure parsed with {@link Double#valueOf(String)}.
 *
 * <p>NOTE(review): {@code TreeBidiMap} keeps both keys and values unique, so two carriers with
 * the same performance value (or one carrier seen twice) replace each other instead of both
 * being kept.
 */
@Override
public void reduce(Text key, Iterable<TextArrayWritable> values, Context context)
        throws IOException, InterruptedException {
    final String origin = key.toString();
    final TreeBidiMap<Double, String> leaders = new TreeBidiMap<>();
    for (TextArrayWritable row : values) {
        final Text[] fields = (Text[]) row.toArray();
        final String carrier = fields[0].toString();
        final Double score = Double.valueOf(fields[1].toString());
        leaders.put(score, carrier);
        // Keys are kept in ascending order, so the first key is the weakest entry;
        // evict it as soon as the map grows past 10 entries.
        final boolean overCapacity = leaders.size() > 10;
        if (overCapacity) {
            leaders.remove(leaders.firstKey());
        }
    }
    persist(origin, leaders);
}
/**
 * Feeds five (score, id) pairs into a cache capped at three entries and checks that only the
 * three highest scores survive, each still paired with its id (the re-scored "1111" included).
 */
@Test
public void givenkeysAndValues_whenUpdateCache_thenUpdatedAndSorted() {
    reducer.setMaxElements(3);

    final int[] scores = {40, 10, 20, 30, 100};
    final String[] ids = {"4444", "1111", "2222", "3333", "1111"};
    for (int step = 0; step < scores.length; step++) {
        reducer.updateCache(scores[step], ids[step]);
    }

    TreeBidiMap<Double, String> cache = reducer.getCache();
    assertTrue(cache.size() == 3);

    final double[] survivingScores = {100., 40., 30.};
    final String[] survivingIds = {"1111", "4444", "3333"};
    for (int step = 0; step < survivingScores.length; step++) {
        assertTrue(cache.containsKey(survivingScores[step]));
        assertTrue(cache.containsValue(survivingIds[step]));
    }
}
/**
 * Pushes five (count, word) pairs through a {@code SumReducer} cache limited to three entries
 * and checks that the three largest counts remain, each mapped to its word (including the
 * re-counted "test2").
 */
@Test
public void givenkeysAndValues_whenUpdateCache_thenUpdatedAndSorted() {
    SumReducer reducer = new SumReducer();
    reducer.setMaxElements(3);

    final int[] counts = {10, 2, 3, 4, 5};
    final String[] words = {"test", "test2", "test3", "test4", "test2"};
    for (int idx = 0; idx < counts.length; idx++) {
        reducer.updateCache(counts[idx], words[idx]);
    }

    TreeBidiMap<Integer, String> cache = reducer.getCache();
    assertTrue(cache.size() == 3);

    final int[] keptCounts = {5, 10, 4};
    final String[] keptWords = {"test2", "test", "test4"};
    for (int idx = 0; idx < keptCounts.length; idx++) {
        assertTrue(cache.containsKey(keptCounts[idx]));
        assertTrue(cache.containsValue(keptWords[idx]));
    }
}
/**
 * Smoke test: builds a small carrier map and sends it through {@code TopDestReducer.persist}.
 * It makes no assertions; it only checks that the call completes without throwing.
 */
@Test
public void testCassandraInsert() {
    TopDestReducer reducer = new TopDestReducer();
    TreeBidiMap<Double, String> carriers = new TreeBidiMap<>();
    carriers.put(1., "AW");
    // The same (2.0, "LA") entry is written six times, as in the original test.
    for (int repeat = 0; repeat < 6; repeat++) {
        carriers.put(2., "LA");
    }
    reducer.persist("NYC", carriers);
}
/**
 * Asynchronously writes one airport row: {@code origin} as the code and the carrier names as
 * {@code top_carriers}, ordered from the highest performance key to the lowest.
 *
 * <p>NOTE(review): the INSERT is re-prepared on every call, and the future returned by
 * {@code executeAsync} is discarded, so write failures are not observed here.
 */
void persist(String origin, TreeBidiMap<Double, String> topCarriers) {
    PreparedStatement insert =
            connect.prepare("INSERT INTO capstone.airport (code, top_carriers) VALUES (?,?)");
    // values() follows ascending key order; flip it so the best performer comes first.
    Object[] carriersBestFirst = topCarriers.values().toArray();
    CollectionUtils.reverseArray(carriersBestFirst);
    connect.executeAsync(insert.bind(origin, newArrayList(carriersBestFirst)));
}
/**
 * Smoke test: builds a small carrier map and sends it through
 * {@code TopCarriersReducer.persist}. It makes no assertions; it only checks that the call
 * completes without throwing.
 */
@Test
public void testCassandraInsert() {
    TopCarriersReducer reducer = new TopCarriersReducer();
    TreeBidiMap<Double, String> carriers = new TreeBidiMap<>();
    carriers.put(1., "AW");
    // The same (2.0, "LA") entry is written six times, as in the original test.
    for (int repeat = 0; repeat < 6; repeat++) {
        carriers.put(2., "LA");
    }
    reducer.persist("NYC", carriers);
}
/** * make RData object from Renjin * renjin-script-engine-*-with dependencies.jar required * https://nexus.bedatadriven.com/content/groups/public/org/renjin/renjin-script-engine/ **/ protected static void makeRMatrix(String in_rf, String out_Rmat) { // TODO Auto-generated method stub ScriptEngineManager manager = new ScriptEngineManager(); ScriptEngine engine = manager.getEngineByName("Renjin"); if(engine == null) { throw new RuntimeException("Renjin not found!!!"); } try { BufferedReader br = Utils.getBufferedReader(in_rf); final BidiMap<Integer, String> scaffs = new TreeBidiMap<Integer, String>(); String line; String s[]; int w = 0; while( (line=br.readLine())!=null && line.startsWith("##")) { scaffs.put(w++, line.replaceAll("^##", "")); } int n = scaffs.size(); int A = w*(w-1)/2; DoubleMatrixBuilder dMat = new DoubleMatrixBuilder(n,n); DoubleMatrixBuilder iMat = new DoubleMatrixBuilder(n,n); DoubleMatrixBuilder dAllMat = new DoubleMatrixBuilder(A*2,4); w = 0; while( line!=null ) { s = line.split("\\s+"); int i=scaffs.getKey(s[5]), j=scaffs.getKey(s[6]); double d = Double.parseDouble(s[0]); dMat.set(i,j,d); dMat.set(j,i,d); iMat.set(i,j,w+1); iMat.set(j,i,w+1+A); for(int k=0; k<4; k++) { d = Double.parseDouble(s[k+1]); dAllMat.set(w, k, d); dAllMat.set(w+A, (k==0||k==3)?k:(3-k), d); } w++; line = br.readLine(); } br.close(); StringVector scf = new StringArrayVector(scaffs.values()); dMat.setRowNames(scf); dMat.setColNames(scf); iMat.setRowNames(scf); iMat.setColNames(scf); Context context = Context.newTopLevelContext(); FileOutputStream fos = new FileOutputStream(out_Rmat); GZIPOutputStream zos = new GZIPOutputStream(fos); RDataWriter writer = new RDataWriter(context, zos); ListVector.NamedBuilder Rdat = new ListVector.NamedBuilder(); Rdat.add("scaffs", scf); Rdat.add("n", n); Rdat.add("A", A); Rdat.add("distanceAll", dAllMat.build()); Rdat.add("indexMat", iMat.build()); Rdat.add("distanceMat", dMat.build()); writer.save(Rdat.build()); writer.close(); } catch 
(IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
/**
 * Asynchronously binds {@code origin} and the carrier names to the shared prepared statement
 * and executes it.
 *
 * <p>Carrier names are passed in the map's value iteration order, which follows ascending key
 * order (lowest performance first); no reordering is applied here. The future returned by
 * {@code executeAsync} is discarded, so write failures are not observed by this method.
 */
void persist(String origin, TreeBidiMap<Double, String> topCarriers) {
    connect.executeAsync(preparedStatement.bind(origin, newArrayList(topCarriers.values())));
}
/**
 * Exposes the cache map itself (not a copy); changes made by the caller affect this object.
 *
 * @return the live cache instance
 */
public TreeBidiMap<Integer, String> getCache() {
    return this.cache;
}
/**
 * Exposes the cache map itself (not a copy); changes made by the caller affect this object.
 *
 * @return the live cache instance
 */
public TreeBidiMap<Double, String> getCache() {
    return this.cache;
}