/**
 * Builds a {@link PairFlatMapFunction} that turns each {@link KV} of an input iterator into a
 * {@link Tuple2} holding the same key and value.
 *
 * <p>The returned iterator is a lazy view produced by {@code Iterators.transform}: elements are
 * converted only as the caller advances it.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a function mapping an {@link Iterator} of {@link KV} to an {@link Iterator} of
 *     {@link Tuple2}.
 */
public static <K, V> PairFlatMapFunction<Iterator<KV<K, V>>, K, V> toPairFlatMapFunction() {
    return new PairFlatMapFunction<Iterator<KV<K, V>>, K, V>() {
        @Override
        public Iterator<Tuple2<K, V>> call(final Iterator<KV<K, V>> kvs) {
            final com.google.common.base.Function<KV<K, V>, Tuple2<K, V>> toTuple =
                new com.google.common.base.Function<KV<K, V>, Tuple2<K, V>>() {
                    @Override
                    public Tuple2<K, V> apply(KV<K, V> entry) {
                        final K key = entry.getKey();
                        final V value = entry.getValue();
                        return new Tuple2<>(key, value);
                    }
                };
            return Iterators.transform(kvs, toTuple);
        }
    };
}
/**
 * Adapts a {@link PairFunction} to a {@link PairFlatMapFunction} that takes an {@link Iterator} as
 * input. This makes it possible to reuse functions written for {@code mapToPair} in
 * transformations whose function receives an {@link Iterator} of elements (for instance
 * {@code mapPartitionsToPair}).
 *
 * <p>The returned iterator is lazy: {@code pairFunction} is applied to an element only when the
 * output iterator reaches it. Unchecked exceptions thrown by {@code pairFunction} propagate
 * unchanged. Checked exceptions are wrapped in a {@link RuntimeException} whose cause is the
 * original exception, because Guava's {@code Function#apply} cannot declare checked exceptions.
 *
 * @param pairFunction the {@link PairFunction} to adapt.
 * @param <T> the input type.
 * @param <K> the output key type.
 * @param <V> the output value type.
 * @return a {@link PairFlatMapFunction} that accepts an {@link Iterator} as input and applies
 *     {@code pairFunction} to every element.
 */
public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction(
        final PairFunction<T, K, V> pairFunction) {
    return new PairFlatMapFunction<Iterator<T>, K, V>() {
        @Override
        public Iterator<Tuple2<K, V>> call(Iterator<T> itr) throws Exception {
            return Iterators.transform(
                itr,
                new com.google.common.base.Function<T, Tuple2<K, V>>() {
                    @Override
                    public Tuple2<K, V> apply(T t) {
                        try {
                            return pairFunction.call(t);
                        } catch (RuntimeException e) {
                            // Already unchecked: rethrow as-is rather than hiding its type
                            // behind an extra RuntimeException wrapper.
                            throw e;
                        } catch (Exception e) {
                            // Checked exception: wrap it, keeping the original as the cause.
                            throw new RuntimeException(e);
                        }
                    }
                });
        }
    };
}
/**
 * Redistributes {@code rawSpatialRDD} according to {@code partitioner}.
 *
 * <p>The method runs in three steps:
 * <ol>
 *   <li>Each object is expanded into the {@code (partitionId, object)} pairs returned by
 *       {@code partitioner.placeObject}.</li>
 *   <li>The pairs are shuffled with {@code partitionBy(partitioner)}.</li>
 *   <li>The keys are stripped so that only the objects remain.</li>
 * </ol>
 * The last step passes {@code preservesPartitioning = true} to {@code mapPartitions}.
 *
 * @param partitioner the spatial partitioner used both to place objects and to shuffle them
 * @return the objects of {@code rawSpatialRDD}, laid out by {@code partitioner}
 */
private JavaRDD<T> partition(final SpatialPartitioner partitioner) {
    final PairFlatMapFunction<T, Integer, T> assignToPartitions =
        new PairFlatMapFunction<T, Integer, T>() {
            @Override
            public Iterator<Tuple2<Integer, T>> call(T geometry) throws Exception {
                return partitioner.placeObject(geometry);
            }
        };

    final FlatMapFunction<Iterator<Tuple2<Integer, T>>, T> dropKeys =
        new FlatMapFunction<Iterator<Tuple2<Integer, T>>, T>() {
            @Override
            public Iterator<T> call(final Iterator<Tuple2<Integer, T>> keyed) throws Exception {
                // Lazy view over the shuffled pairs that yields only the values.
                return new Iterator<T>() {
                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }

                    @Override
                    public T next() {
                        final Tuple2<Integer, T> pair = keyed.next();
                        return pair._2();
                    }

                    @Override
                    public boolean hasNext() {
                        return keyed.hasNext();
                    }
                };
            }
        };

    return this.rawSpatialRDD
        .flatMapToPair(assignToPartitions)
        .partitionBy(partitioner)
        .mapPartitions(dropKeys, true);
}
/**
 * Applies {@code fun} to every element of the underlying {@code dStream} and flattens the pairs
 * it returns into a new pair operator.
 *
 * @param fun the function producing key/value pairs for each element
 * @param componentId component identifier; not used by this implementation
 * @return a {@code SparkPairWorkloadOperator} wrapping the resulting pair DStream, created with
 *     this operator's {@code parallelism}
 */
@Override
public <K, V> PairWorkloadOperator<K, V> flatMapToPair(final FlatMapPairFunction<T, K, V> fun, String componentId) {
    final PairFlatMapFunction<T, K, V> adapter = new PairFlatMapFunction<T, K, V>() {
        @Override
        public Iterable<Tuple2<K, V>> call(T element) throws Exception {
            return fun.flatMapToPair(element);
        }
    };
    return new SparkPairWorkloadOperator<>(dStream.flatMapToPair(adapter), parallelism);
}
/**
 * Checks that {@code function} emits one citation per reference of a document. Each citation
 * must be keyed {@code "cit_<docId>_<position>"}, appear in reference order, and carry the very
 * same {@link ReferenceMetadata} instance.
 */
private void assertDocToCitationsFunction(PairFlatMapFunction<DocumentMetadata, String, ReferenceMetadata> function) throws Exception {
    // given: a document with references at positions 3, 5 and 6
    ReferenceMetadata firstRef = ReferenceMetadata.newBuilder().setPosition(3).setBasicMetadata(new BasicMetadata()).build();
    ReferenceMetadata secondRef = ReferenceMetadata.newBuilder().setPosition(5).setBasicMetadata(new BasicMetadata()).build();
    ReferenceMetadata thirdRef = ReferenceMetadata.newBuilder().setPosition(6).setBasicMetadata(new BasicMetadata()).build();
    DocumentMetadata document = DocumentMetadata.newBuilder()
            .setId("someId")
            .setBasicMetadata(new BasicMetadata())
            .setReferences(Lists.newArrayList(firstRef, secondRef, thirdRef))
            .build();

    // when
    List<Tuple2<String, ReferenceMetadata>> citations = Lists.newArrayList(function.call(document));

    // then
    assertEquals(3, citations.size());
    String[] expectedIds = {"cit_someId_3", "cit_someId_5", "cit_someId_6"};
    ReferenceMetadata[] expectedRefs = {firstRef, secondRef, thirdRef};
    for (int i = 0; i < expectedIds.length; i++) {
        assertEquals(expectedIds[i], citations.get(i)._1);
        // identity check: the function must pass the reference object through untouched
        assertTrue(citations.get(i)._2 == expectedRefs[i]);
    }
}
/**
 * Re-partitions {@code distributedRasterCountMatrix} for visualization, allowing a pixel to be
 * emitted into more than one partition.
 *
 * <p>First, every {@code (pixel, count)} entry is expanded via
 * {@code VisualizationPartitioner.assignPartitionIDs(entry, photoFilterRadius)}. Presumably this
 * replicates pixels near partition borders; confirm in {@code VisualizationPartitioner}. The
 * expanded matrix is then shuffled with a {@code VisualizationPartitioner} built from the current
 * resolution and partition counts. The field {@code distributedRasterCountMatrix} is overwritten
 * with the result.
 *
 * @return always {@code true}; failures surface as exceptions instead
 * @throws Exception if the underlying Spark transformations throw
 */
private boolean spatialPartitioningWithDuplicates() throws Exception {
    final PairFlatMapFunction<Tuple2<Pixel, Double>, Pixel, Double> expandToPartitions =
        new PairFlatMapFunction<Tuple2<Pixel, Double>, Pixel, Double>() {
            @Override
            public Iterator<Tuple2<Pixel, Double>> call(Tuple2<Pixel, Double> pixelWithCount) throws Exception {
                // NOTE(review): a new partitioner is constructed for every element. It could be
                // hoisted out of call() if assignPartitionIDs is stateless; confirm before changing.
                VisualizationPartitioner perPixelPartitioner =
                    new VisualizationPartitioner(resolutionX, resolutionY, partitionX, partitionY);
                return perPixelPartitioner.assignPartitionIDs(pixelWithCount, photoFilterRadius).iterator();
            }
        };
    this.distributedRasterCountMatrix = this.distributedRasterCountMatrix.flatMapToPair(expandToPartitions);

    final VisualizationPartitioner shufflePartitioner =
        new VisualizationPartitioner(this.resolutionX, this.resolutionY, this.partitionX, this.partitionY);
    this.distributedRasterCountMatrix = this.distributedRasterCountMatrix.partitionBy(shufflePartitioner);
    return true;
}
/**
 * Checks that {@code function} converts a {@code (citationId, reference)} pair into exactly one
 * {@code (citationId, entity)} pair. The entity must be the instance returned by
 * {@code converter.convertToMatchableEntity}.
 */
private void assertConvertCitationFunction(PairFlatMapFunction<Tuple2<String, ReferenceMetadata>, String, MatchableEntity> function) throws Exception {
    // given
    final String citationId = "cit_id_3";
    ReferenceMetadata reference = mock(ReferenceMetadata.class);
    MatchableEntity entity = mock(MatchableEntity.class);
    when(converter.convertToMatchableEntity(citationId, reference)).thenReturn(entity);
    doReturn(Option.apply("some raw text")).when(entity).rawText();

    // when
    List<Tuple2<String, MatchableEntity>> converted =
            Lists.newArrayList(function.call(new Tuple2<>(citationId, reference)));

    // then
    assertEquals(1, converted.size());
    Tuple2<String, MatchableEntity> onlyResult = converted.get(0);
    assertTrue(onlyResult._2 == entity);
    assertEquals(citationId, onlyResult._1);
}
public static void main(String[] args) { long start = System.currentTimeMillis(); SparkConf sparkConf = new SparkConf(); sparkConf.setMaster("local"); sparkConf.setAppName("TestSpark"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD<String> input = sc.parallelize(data); // represent the input data as a key value JavaPairRDD<String, Iterable<String>> links = input.mapToPair( new PairFunction<String, String, String>() { @Override public Tuple2<String, String> call(String x) throws Exception { String[] array = x.split(" "); return new Tuple2(array[0], array[1]); } }).groupByKey(); links.partitionBy(new HashPartitioner(5)); // initialize ranks JavaPairRDD<String, Double> ranks = links.mapValues(x -> 1.0); // Calculates and updates URL ranks continuously using PageRank algorithm. for (int current = 0; current < 10; current++) { // Calculates URL contributions to the rank of other URLs. JavaRDD<Tuple2<Iterable<String>, Double>> values = links.join(ranks).values(); JavaPairRDD<String, Double> contribs = values .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() { @Override public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) { int urlCount = Iterables.size(s._1); List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>(); for (String n : s._1) { results.add(new Tuple2<String, Double>(n, s._2() / urlCount)); } return results; } }); // Re-calculates URL ranks based on neighbor contributions. ranks = contribs .reduceByKey((a, b) -> a + b) .mapValues(x -> 0.15 + x * 0.85); //print(ranks); } print(ranks); sc.stop(); System.out.println((System.currentTimeMillis() - start)/1000 + " seconds"); }
/**
 * Checks that {@code function} emits citations only for references that carry external ids.
 * For each such reference it must emit one {@code (someIdType value, Citation(docId, position,
 * null))} pair, in reference order.
 */
private void assertFlatMapCitationsFunction(PairFlatMapFunction<DocumentMetadata, String, Citation> function) throws Exception {
    // given: the second constructor argument appears to be the external-id map
    // (null, empty, or populated).
    ReferenceMetadata refWithNullIds = new ReferenceMetadata(1, null);
    ReferenceMetadata refWithEmptyIds = new ReferenceMetadata(2, Maps.newHashMap());

    ReferenceMetadata refWithIdsA = new ReferenceMetadata(3, Maps.newHashMap());
    refWithIdsA.getExternalIds().put("someIdType", "ref.id1");
    refWithIdsA.getExternalIds().put("someOtherIdType", "ref.other.id1");

    ReferenceMetadata refWithIdsB = new ReferenceMetadata(4, Maps.newHashMap());
    refWithIdsB.getExternalIds().put("someIdType", "ref.id2");
    refWithIdsB.getExternalIds().put("someOtherIdType", "ref.other.id2");

    // then: documents without usable external ids produce nothing
    assertThat(function.call(new DocumentMetadata("id-1", null, null, Lists.newArrayList())), iterableWithSize(0));
    assertThat(function.call(new DocumentMetadata("id-1", null, null, Lists.newArrayList(refWithNullIds))), iterableWithSize(0));
    assertThat(function.call(new DocumentMetadata("id-1", null, null, Lists.newArrayList(refWithEmptyIds))), iterableWithSize(0));

    // and: each reference with external ids yields exactly one citation keyed by its someIdType id
    Iterable<Tuple2<String, Citation>> citations =
            function.call(new DocumentMetadata("id-1", null, null, Lists.newArrayList(refWithIdsA, refWithIdsB)));
    assertThat(citations, iterableWithSize(2));

    Iterator<Tuple2<String, Citation>> cursor = citations.iterator();
    assertThat(cursor.next(), equalTo(new Tuple2<String, Citation>("ref.id1", new Citation("id-1", 3, null))));
    assertThat(cursor.next(), equalTo(new Tuple2<String, Citation>("ref.id2", new Citation("id-1", 4, null))));
}
/**
 * Checks that {@code function} emits nothing when the converted entity's raw text is 10001
 * characters long.
 */
private void assertConvertCitationFunction_TOO_LONG_RAW_TEXT(PairFlatMapFunction<Tuple2<String, ReferenceMetadata>, String, MatchableEntity> function) throws Exception {
    // given
    ReferenceMetadata reference = mock(ReferenceMetadata.class);
    MatchableEntity entity = mock(MatchableEntity.class);
    when(converter.convertToMatchableEntity("cit_id_3", reference)).thenReturn(entity);
    String tooLongRawText = StringUtils.repeat('a', 10001);
    doReturn(Option.apply(tooLongRawText)).when(entity).rawText();

    // when
    Iterable<Tuple2<String, MatchableEntity>> converted = function.call(new Tuple2<>("cit_id_3", reference));

    // then: the entity is dropped from the output
    List<Tuple2<String, MatchableEntity>> convertedList = Lists.newArrayList(converted);
    assertEquals(0, convertedList.size());
}