Java 类org.apache.spark.api.java.function.PairFlatMapFunction 实例源码

项目:beam    文件:TranslationUtils.java   
/**
 * Builds a function that turns an {@link Iterator} of {@link KV} elements into an
 * {@link Iterator} of {@link Tuple2} pairs carrying the same key and value.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a {@link PairFlatMapFunction} that maps every {@link KV} of its input iterator to a
 *     {@link Tuple2}.
 */
public static <K, V> PairFlatMapFunction<Iterator<KV<K, V>>, K, V> toPairFlatMapFunction() {
  return new PairFlatMapFunction<Iterator<KV<K, V>>, K, V>() {
    @Override
    public Iterator<Tuple2<K, V>> call(final Iterator<KV<K, V>> kvs) {
      // Element-wise conversion; fully qualified to avoid clashing with Spark's Function types.
      final com.google.common.base.Function<KV<K, V>, Tuple2<K, V>> kvToTuple =
          new com.google.common.base.Function<KV<K, V>, Tuple2<K, V>>() {
            @Override
            public Tuple2<K, V> apply(KV<K, V> element) {
              final K key = element.getKey();
              final V value = element.getValue();
              return new Tuple2<>(key, value);
            }
          };
      // The returned iterator is a view over the input: elements are converted as they are pulled.
      return Iterators.transform(kvs, kvToTuple);
    }
  };
}
项目:beam    文件:TranslationUtils.java   
/**
 * Lifts an element-wise {@link PairFunction} into a {@link PairFlatMapFunction} that consumes an
 * {@link Iterator} of inputs. This lets a function written for mapToPair be reused where a
 * flatMapToPair-style function over an iterator is required.
 *
 * <p>Any {@link Exception} thrown by the wrapped function while an element is being converted is
 * rethrown wrapped in a {@link RuntimeException}.
 *
 * @param pairFunction the {@link PairFunction} to adapt.
 * @param <T> the input type.
 * @param <K> the output key type.
 * @param <V> the output value type.
 * @return a {@link PairFlatMapFunction} that accepts an {@link Iterator} as an input and applies
 *     the {@link PairFunction} on every element.
 */
public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction(
    final PairFunction<T, K, V> pairFunction) {
  return new PairFlatMapFunction<Iterator<T>, K, V>() {
    @Override
    public Iterator<Tuple2<K, V>> call(Iterator<T> inputs) throws Exception {
      final com.google.common.base.Function<T, Tuple2<K, V>> applyPairFunction =
          new com.google.common.base.Function<T, Tuple2<K, V>>() {
            @Override
            public Tuple2<K, V> apply(T input) {
              try {
                return pairFunction.call(input);
              } catch (Exception cause) {
                // apply() declares no checked exceptions, so the failure is rethrown unchecked
                // with the original exception preserved as the cause.
                throw new RuntimeException(cause);
              }
            }
          };
      return Iterators.transform(inputs, applyPairFunction);
    }
  };
}
项目:GeoSpark    文件:SpatialRDD.java   
/**
 * Redistributes the raw spatial RDD according to the given partitioner.
 *
 * <p>Each object is first expanded into the (key, object) pairs produced by
 * {@code partitioner.placeObject}, the pairs are shuffled with {@code partitionBy}, and the
 * integer keys are then dropped again.
 *
 * @param partitioner supplies the key assignment for every object and the partitioning itself.
 * @return an RDD holding the objects of the second component of each partitioned pair.
 */
private JavaRDD<T> partition(final SpatialPartitioner partitioner) {
    // Step 1: key every object with the id(s) the partitioner assigns to it.
    final PairFlatMapFunction<T, Integer, T> assignKeys =
        new PairFlatMapFunction<T, Integer, T>() {
            @Override
            public Iterator<Tuple2<Integer, T>> call(T geometry) throws Exception {
                return partitioner.placeObject(geometry);
            }
        };

    // Step 2: strip the keys while streaming through each partition, without buffering it.
    final FlatMapFunction<Iterator<Tuple2<Integer, T>>, T> dropKeys =
        new FlatMapFunction<Iterator<Tuple2<Integer, T>>, T>() {
            @Override
            public Iterator<T> call(final Iterator<Tuple2<Integer, T>> keyedObjects) throws Exception {
                return new Iterator<T>() {
                    @Override
                    public boolean hasNext() {
                        return keyedObjects.hasNext();
                    }

                    @Override
                    public T next() {
                        final Tuple2<Integer, T> keyed = keyedObjects.next();
                        return keyed._2();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };

    // NOTE(review): the trailing 'true' is presumably Spark's preservesPartitioning flag — confirm.
    return this.rawSpatialRDD
        .flatMapToPair(assignKeys)
        .partitionBy(partitioner)
        .mapPartitions(dropKeys, true);
}
项目:StreamBench    文件:SparkWorkloadOperator.java   
/**
 * Applies a workload-level flat-map-to-pair function to the underlying DStream.
 *
 * @param fun the function producing zero or more key/value pairs per input element.
 * @param componentId not used by this implementation.
 * @return a pair operator wrapping the resulting pair DStream with this operator's parallelism.
 */
@Override
public <K, V> PairWorkloadOperator<K, V> flatMapToPair(final FlatMapPairFunction<T, K, V> fun,
                                                       String componentId) {
    // Bridge the workload function type to the Spark function type by plain delegation.
    final PairFlatMapFunction<T, K, V> sparkFunction = new PairFlatMapFunction<T, K, V>() {
        @Override
        public Iterable<Tuple2<K, V>> call(T element) throws Exception {
            return fun.flatMapToPair(element);
        }
    };

    JavaPairDStream<K, V> keyedStream = dStream.flatMapToPair(sparkFunction);
    return new SparkPairWorkloadOperator<>(keyedStream, parallelism);
}
项目:iis    文件:ReferenceMetadataInputReaderTest.java   
/**
 * Verifies that the given function maps a document with three references to three
 * (citation id, reference) pairs, in order, with ids of the form
 * {@code cit_<documentId>_<position>} and the very same reference instances as values.
 */
private void assertDocToCitationsFunction(PairFlatMapFunction<DocumentMetadata, String, ReferenceMetadata> function) throws Exception {

    // given
    ReferenceMetadata referenceAt3 = ReferenceMetadata.newBuilder()
            .setPosition(3)
            .setBasicMetadata(new BasicMetadata())
            .build();
    ReferenceMetadata referenceAt5 = ReferenceMetadata.newBuilder()
            .setPosition(5)
            .setBasicMetadata(new BasicMetadata())
            .build();
    ReferenceMetadata referenceAt6 = ReferenceMetadata.newBuilder()
            .setPosition(6)
            .setBasicMetadata(new BasicMetadata())
            .build();

    DocumentMetadata document = DocumentMetadata.newBuilder()
            .setId("someId")
            .setBasicMetadata(new BasicMetadata())
            .setReferences(Lists.newArrayList(referenceAt3, referenceAt5, referenceAt6))
            .build();

    // execute
    Iterable<Tuple2<String, ReferenceMetadata>> produced = function.call(document);

    // assert
    List<Tuple2<String, ReferenceMetadata>> citations = Lists.newArrayList(produced);
    assertEquals(3, citations.size());

    Tuple2<String, ReferenceMetadata> first = citations.get(0);
    assertEquals("cit_someId_3", first._1);
    assertTrue(first._2 == referenceAt3);

    Tuple2<String, ReferenceMetadata> second = citations.get(1);
    assertEquals("cit_someId_5", second._1);
    assertTrue(second._2 == referenceAt5);

    Tuple2<String, ReferenceMetadata> third = citations.get(2);
    assertEquals("cit_someId_6", third._1);
    assertTrue(third._2 == referenceAt6);
}
项目:GeoSpark    文件:VisualizationOperator.java   
/**
 * Re-keys the raster count matrix through {@code VisualizationPartitioner.assignPartitionIDs}
 * (called with the photo filter radius) and then partitions the result with a
 * {@code VisualizationPartitioner} built from the same resolution and partition settings.
 *
 * @return always {@code true}; failures surface as exceptions
 * @throws Exception the exception
 */
private boolean spatialPartitioningWithDuplicates() throws Exception
{
    // A fresh partitioner is built for every pixel, exactly as many times as the function is called.
    final PairFlatMapFunction<Tuple2<Pixel, Double>, Pixel, Double> assignPartitionIds =
            new PairFlatMapFunction<Tuple2<Pixel, Double>, Pixel, Double>() {
        @Override
        public Iterator<Tuple2<Pixel, Double>> call(Tuple2<Pixel, Double> pixelCount) throws Exception {
            return new VisualizationPartitioner(resolutionX,resolutionY,partitionX,partitionY)
                    .assignPartitionIDs(pixelCount, photoFilterRadius)
                    .iterator();
        }
    };
    this.distributedRasterCountMatrix = this.distributedRasterCountMatrix.flatMapToPair(assignPartitionIds);

    final VisualizationPartitioner rasterPartitioner =
            new VisualizationPartitioner(this.resolutionX,this.resolutionY,this.partitionX,this.partitionY);
    this.distributedRasterCountMatrix = this.distributedRasterCountMatrix.partitionBy(rasterPartitioner);

    return true;
}
项目:iis    文件:ReferenceMetadataInputConverterTest.java   
/**
 * Verifies that the given function emits exactly one pair for a citation whose converted entity
 * has a short raw text: the key is the incoming citation id and the value is the entity returned
 * by the mocked converter.
 */
private void assertConvertCitationFunction(PairFlatMapFunction<Tuple2<String, ReferenceMetadata>, String, MatchableEntity> function) throws Exception {

    // given
    ReferenceMetadata citation = mock(ReferenceMetadata.class);
    MatchableEntity convertedEntity = mock(MatchableEntity.class);

    when(converter.convertToMatchableEntity("cit_id_3", citation)).thenReturn(convertedEntity);
    doReturn(Option.apply("some raw text")).when(convertedEntity).rawText();

    // execute
    Tuple2<String, ReferenceMetadata> input = new Tuple2<>("cit_id_3", citation);
    Iterable<Tuple2<String, MatchableEntity>> produced = function.call(input);

    // assert
    List<Tuple2<String, MatchableEntity>> converted = Lists.newArrayList(produced);
    assertEquals(1, converted.size());

    Tuple2<String, MatchableEntity> only = converted.get(0);
    assertTrue(only._2 == convertedEntity);
    assertEquals("cit_id_3", only._1);
}
项目:Test_Projects    文件:PageRank.java   
/**
 * Runs ten iterations of PageRank over the edge list held in {@code data} on a local Spark
 * master, prints the final ranks and then the elapsed wall-clock time in seconds.
 *
 * <p>Each input line is split on a single space; the first token is the source URL and the
 * second the target URL.
 *
 * @param args ignored.
 */
public static void main(String[] args) {
    long start = System.currentTimeMillis();
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local");
    sparkConf.setAppName("TestSpark");

    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    try {
        // NOTE(review): 'data' is defined elsewhere in the file; presumably a List<String> of
        // "source target" lines — confirm.
        JavaRDD<String> input = sc.parallelize(data);

        // Represent the input data as source -> targets. RDDs are immutable, so the partitioner
        // has to be part of the expression that defines 'links'; calling partitionBy on the
        // finished RDD and discarding the result (as before) had no effect. Passing the
        // partitioner to groupByKey also avoids a second shuffle.
        JavaPairRDD<String, Iterable<String>> links = input.mapToPair(
                new PairFunction<String, String, String>() {
                    @Override
                    public Tuple2<String, String> call(String x)
                            throws Exception {
                        String[] array = x.split(" ");
                        return new Tuple2<>(array[0], array[1]);
                    }
                }).groupByKey(new HashPartitioner(5));

        // initialize ranks
        JavaPairRDD<String, Double> ranks = links.mapValues(x -> 1.0);

        // Calculates and updates URL ranks continuously using PageRank algorithm.
        for (int current = 0; current < 10; current++) {
            // Calculates URL contributions to the rank of other URLs.
            JavaRDD<Tuple2<Iterable<String>, Double>> values = links.join(ranks).values();

            JavaPairRDD<String, Double> contribs = values
                    .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
                        @Override
                        public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
                            // Every outgoing link receives an equal share of the page's rank.
                            int urlCount = Iterables.size(s._1);
                            List<Tuple2<String, Double>> results = new ArrayList<>();
                            for (String n : s._1) {
                                results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
                            }
                            return results;
                        }
                    });

            // Re-calculates URL ranks based on neighbor contributions.
            ranks = contribs
                        .reduceByKey((a, b) -> a + b)
                        .mapValues(x -> 0.15 + x * 0.85);
        }

        print(ranks);
    } finally {
        // Release the Spark context even when the job fails part-way through.
        sc.stop();
    }

    System.out.println((System.currentTimeMillis() - start)/1000 + " seconds");
}
项目:iis    文件:ExternalIdReferenceExtractorTest.java   
/**
 * Verifies that the given function yields nothing for documents without references or whose
 * references carry no external ids, and that for references with external ids it yields one
 * pair per reference, keyed by the "someIdType" external id and holding a citation built from
 * the document id and the reference position.
 */
private void assertFlatMapCitationsFunction(PairFlatMapFunction<DocumentMetadata, String, Citation> function) throws Exception {

    // given: one reference with null external ids, one with an empty map, two with real ids
    ReferenceMetadata nullIdsReference = new ReferenceMetadata(1, null);
    ReferenceMetadata emptyIdsReference = new ReferenceMetadata(2, Maps.newHashMap());

    ReferenceMetadata firstIdentifiedReference = new ReferenceMetadata(3, Maps.newHashMap());
    firstIdentifiedReference.getExternalIds().put("someIdType", "ref.id1");
    firstIdentifiedReference.getExternalIds().put("someOtherIdType", "ref.other.id1");

    ReferenceMetadata secondIdentifiedReference = new ReferenceMetadata(4, Maps.newHashMap());
    secondIdentifiedReference.getExternalIds().put("someIdType", "ref.id2");
    secondIdentifiedReference.getExternalIds().put("someOtherIdType", "ref.other.id2");

    // execute & assert: inputs that must not produce any citation
    assertThat(function.call(new DocumentMetadata("id-1", null, null, Lists.newArrayList())), iterableWithSize(0));
    assertThat(function.call(new DocumentMetadata("id-1", null, null, Lists.newArrayList(nullIdsReference))), iterableWithSize(0));
    assertThat(function.call(new DocumentMetadata("id-1", null, null, Lists.newArrayList(emptyIdsReference))), iterableWithSize(0));

    // execute & assert: references carrying external ids
    Iterable<Tuple2<String, Citation>> produced = function.call(
            new DocumentMetadata("id-1", null, null, Lists.newArrayList(firstIdentifiedReference, secondIdentifiedReference)));

    assertThat(produced, iterableWithSize(2));
    Iterator<Tuple2<String, Citation>> cursor = produced.iterator();
    Tuple2<String, Citation> firstActual = cursor.next();
    Tuple2<String, Citation> secondActual = cursor.next();

    Tuple2<String, Citation> firstExpected = new Tuple2<String, Citation>("ref.id1", new Citation("id-1", 3, null));
    assertThat(firstActual, equalTo(firstExpected));

    Tuple2<String, Citation> secondExpected = new Tuple2<String, Citation>("ref.id2", new Citation("id-1", 4, null));
    assertThat(secondActual, equalTo(secondExpected));
}
项目:iis    文件:ReferenceMetadataInputConverterTest.java   
/**
 * Verifies that the given function emits nothing when the converted entity reports a raw text
 * of 10001 characters.
 */
private void assertConvertCitationFunction_TOO_LONG_RAW_TEXT(PairFlatMapFunction<Tuple2<String, ReferenceMetadata>, String, MatchableEntity> function) throws Exception {

    // given
    ReferenceMetadata citation = mock(ReferenceMetadata.class);
    MatchableEntity convertedEntity = mock(MatchableEntity.class);

    when(converter.convertToMatchableEntity("cit_id_3", citation)).thenReturn(convertedEntity);
    doReturn(Option.apply(StringUtils.repeat('a', 10001))).when(convertedEntity).rawText();

    // execute
    Tuple2<String, ReferenceMetadata> input = new Tuple2<>("cit_id_3", citation);
    Iterable<Tuple2<String, MatchableEntity>> produced = function.call(input);

    // assert
    List<Tuple2<String, MatchableEntity>> converted = Lists.newArrayList(produced);
    assertEquals(0, converted.size());
}