Java class org.apache.spark.api.java.function.FlatMapFunction example source code

Project: incubator-sdap-mudrod    File: RDDUtil.java
/**
 * getAllWordsInDoc: extracts all unique terms from all docs.
 *
 * @param docwordRDD pair RDD in which each key is a doc and the value is the term list extracted from
 *                   that doc.
 * @return unique term list
 */
public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
  JavaRDD<String> wordRDD = docwordRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<String> call(List<String> list) {
      return list.iterator();
    }
  }).distinct();

  return wordRDD;
}
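As a usage sketch (not part of the mudrod source; the doc-to-terms pair RDD below is hypothetical), the utility is handed a pair RDD keyed by document and returns the deduplicated vocabulary:

// Hypothetical input: each document ID mapped to the terms extracted from it.
JavaSparkContext sc = new JavaSparkContext("local", "getAllWordsInDocDemo");
JavaPairRDD<String, List<String>> docTerms = sc.parallelizePairs(Arrays.asList(
    new Tuple2<String, List<String>>("doc1", Arrays.asList("ocean", "salinity")),
    new Tuple2<String, List<String>>("doc2", Arrays.asList("salinity", "temperature"))));
JavaRDD<String> vocabulary = RDDUtil.getAllWordsInDoc(docTerms);
// vocabulary.collect() contains "ocean", "salinity", "temperature" exactly once each.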
Project: incubator-sdap-mudrod    File: SessionExtractor.java
/**
 * loadClickStremFromTxt: loads a click stream from a txt file
 *
 * @param clickthroughFile
 *          txt file
 * @param sc
 *          the spark context
 * @return clickstream list in JavaRDD format {@link ClickStream}
 */
public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile, JavaSparkContext sc) {
  return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String, ClickStream>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @SuppressWarnings("unchecked")
    @Override
    public Iterator<ClickStream> call(String line) throws Exception {
      List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line);
      // Return an iterator over the parsed list (a List cannot simply be cast to an Iterator).
      return clickthroughs.iterator();
    }
  });
}
Project: incubator-sdap-mudrod    File: CrawlerDetection.java
void checkByRateInParallel() throws InterruptedException, IOException {

    JavaRDD<String> userRDD = getUserRDD(this.httpType);
    LOG.info("Original User count: {}", userRDD.count());

    int userCount = 0;
    userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
      ESDriver tmpES = new ESDriver(props);
      tmpES.createBulkProcessor();
      List<Integer> realUserNums = new ArrayList<>();
      while (iterator.hasNext()) {
        String s = iterator.next();
        Integer realUser = checkByRate(tmpES, s);
        realUserNums.add(realUser);
      }
      tmpES.destroyBulkProcessor();
      tmpES.close();
      return realUserNums.iterator();
    }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);

    LOG.info("User count: {}", Integer.toString(userCount));
  }
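The method above follows a common mapPartitions idiom: open a heavyweight client once per partition, process every element, close the client, then sum the per-partition counts with reduce. A stripped-down sketch of the same idiom, with a hypothetical check standing in for the project's ESDriver/checkByRate:

JavaSparkContext sc = new JavaSparkContext("local[2]", "mapPartitionsIdiomDemo");
JavaRDD<String> users = sc.parallelize(Arrays.asList("userA", "userB", "crawler1"), 2);
int realUsers = users.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
  // A per-partition resource would be created here (ESDriver in the code above).
  List<Integer> flags = new ArrayList<>();
  while (iterator.hasNext()) {
    // Hypothetical stand-in for checkByRate(tmpES, s).
    flags.add(iterator.next().startsWith("crawler") ? 0 : 1);
  }
  // ...and the resource would be closed here before returning the partition's results.
  return flags.iterator();
}).reduce((a, b) -> a + b);
// realUsers == 2 for the sample input above.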
Project: mudrod    File: RDDUtil.java
/**
 * getAllWordsInDoc: extracts all unique terms from all docs.
 *
 * @param docwordRDD pair RDD in which each key is a doc and the value is the term list extracted from
 *                   that doc.
 * @return unique term list
 */
public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
  JavaRDD<String> wordRDD = docwordRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<String> call(List<String> list) {
      return list.iterator();
    }
  }).distinct();

  return wordRDD;
}
Project: mudrod    File: SessionExtractor.java
/**
 * loadClickStremFromTxt: loads a click stream from a txt file
 *
 * @param clickthroughFile
 *          txt file
 * @param sc
 *          the spark context
 * @return clickstream list in JavaRDD format {@link ClickStream}
 */
public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile, JavaSparkContext sc) {
  return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String, ClickStream>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @SuppressWarnings("unchecked")
    @Override
    public Iterator<ClickStream> call(String line) throws Exception {
      List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line);
      // Return an iterator over the parsed list (a List cannot simply be cast to an Iterator).
      return clickthroughs.iterator();
    }
  });
}
Project: mudrod    File: CrawlerDetection.java
void checkByRateInParallel() throws InterruptedException, IOException {

    JavaRDD<String> userRDD = getUserRDD(this.httpType);
    LOG.info("Original User count: {}", userRDD.count());

    int userCount = 0;
    userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
      ESDriver tmpES = new ESDriver(props);
      tmpES.createBulkProcessor();
      List<Integer> realUserNums = new ArrayList<>();
      while (iterator.hasNext()) {
        String s = iterator.next();
        Integer realUser = checkByRate(tmpES, s);
        realUserNums.add(realUser);
      }
      tmpES.destroyBulkProcessor();
      tmpES.close();
      return realUserNums.iterator();
    }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);

    LOG.info("User count: {}", Integer.toString(userCount));
  }
Project: beam    File: TranslationUtils.java
/** A {@link Tuple2} pair to {@link KV} flatmap function. */
static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() {
  return new FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>() {
    @Override
    public Iterator<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) {
      final Iterator<KV<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<Tuple2<K, V>, KV<K, V>>() {
                @Override
                public KV<K, V> apply(Tuple2<K, V> t2) {
                  return KV.of(t2._1(), t2._2());
                }
              });
      return outputItr;
    }
  };
}
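A brief usage sketch (the pair RDD is hypothetical, and this only works from within the same Beam Spark-runner package because the factory method is package-private): the returned function is intended for JavaRDD.mapPartitions, converting Scala Tuple2 pairs into Beam KV values per partition.

JavaSparkContext jsc = new JavaSparkContext("local", "fromPairFlatMapFunctionDemo");
JavaRDD<Tuple2<String, Integer>> pairs = jsc.parallelize(Arrays.asList(
    new Tuple2<String, Integer>("a", 1),
    new Tuple2<String, Integer>("b", 2)));
// Each partition's Tuple2 iterator is lazily transformed into KV values.
JavaRDD<KV<String, Integer>> kvs =
    pairs.mapPartitions(TranslationUtils.<String, Integer>fromPairFlatMapFunction());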
Project: beam    File: TranslationUtils.java
/**
 * A utility method that adapts a {@link Function} to a {@link FlatMapFunction} with an {@link
 * Iterator} input. This is particularly useful because it allows functions written for map
 * operations to be reused in flatmap operations.
 *
 * @param func the {@link Function} to adapt.
 * @param <InputT> the input type.
 * @param <OutputT> the output type.
 * @return a {@link FlatMapFunction} that accepts an {@link Iterator} as an input and applies the
 *     {@link Function} on every element.
 */
public static <InputT, OutputT>
    FlatMapFunction<Iterator<InputT>, OutputT> functionToFlatMapFunction(
        final Function<InputT, OutputT> func) {
  return new FlatMapFunction<Iterator<InputT>, OutputT>() {

    @Override
    public Iterator<OutputT> call(Iterator<InputT> itr) throws Exception {
      final Iterator<OutputT> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<InputT, OutputT>() {

                @Override
                public OutputT apply(InputT t) {
                  try {
                    return func.call(t);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              });
      return outputItr;
    }
  };
}
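A short sketch of how the adapter is typically applied (the RDD and the element-wise Spark Function below are hypothetical): an existing per-element function is lifted so it can be run per partition via mapPartitions.

JavaSparkContext jsc = new JavaSparkContext("local", "functionToFlatMapFunctionDemo");
// Hypothetical element-wise Spark Function to adapt.
Function<String, Integer> lengthFn = s -> s.length();
JavaRDD<String> lines = jsc.parallelize(Arrays.asList("beam", "spark"));
// The adapter applies lengthFn to every element of each partition.
JavaRDD<Integer> lengths = lines.mapPartitions(TranslationUtils.functionToFlatMapFunction(lengthFn));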
Project: chronix.spark    File: ChronixSparkContext.java
/**
 * Low-level chunked query.
 *
 * @param query Solr query
 * @param zkHost Zookeeper host
 * @param collection     the Solr collection of chronix time series data
 * @param chronixStorage a ChronixSolrCloudStorage instance
 * @return ChronixRDD of time series (chunks)
 * @throws SolrServerException
 */
public ChronixRDD queryChronixChunks(
        final SolrQuery query,
        final String zkHost,
        final String collection,
        final ChronixSolrCloudStorage chronixStorage) throws SolrServerException, IOException {

    // first get a list of replicas to query for this collection
    List<String> shards = chronixStorage.getShardList(zkHost, collection);

    // parallelize the requests to the shards
    JavaRDD<MetricTimeSeries> docs = jsc.parallelize(shards, shards.size()).flatMap(
            (FlatMapFunction<String, MetricTimeSeries>) shardUrl -> chronixStorage.streamFromSingleNode(
                    zkHost, collection, shardUrl, query, new MetricTimeSeriesConverter()).iterator());
    return new ChronixRDD(docs);
}
Project: chronix.spark    File: ChronixRDD.java
/**
 * Transformation: transforms the ChronixRDD into an RDD of MetricObservations (pairs of timestamp and value, plus dimensions).
 *
 * @return RDD of MetricObservations
 */
public JavaRDD<MetricObservation> toObservations() {
    return this.flatMap((FlatMapFunction<MetricTimeSeries, MetricObservation>) ts -> ts.points().map(point -> {
        //null-safe read of dimensional values
        String host = ts.attributes().get(MetricDimension.HOST) == null ? null
                : ts.attributes().get(MetricDimension.HOST).toString();
        String series = ts.attributes().get(MetricDimension.MEASUREMENT_SERIES) == null ? null
                : ts.attributes().get(MetricDimension.MEASUREMENT_SERIES).toString();
        String process = ts.attributes().get(MetricDimension.PROCESS) == null ? null
                : ts.attributes().get(MetricDimension.PROCESS).toString();
        String group = ts.attributes().get(MetricDimension.METRIC_GROUP) == null ? null
                : ts.attributes().get(MetricDimension.METRIC_GROUP).toString();
        String ag = ts.attributes().get(MetricDimension.AGGREGATION_LEVEL) == null ? null
                : ts.attributes().get(MetricDimension.AGGREGATION_LEVEL).toString();
        //convert Point/MetricTimeSeries to MetricObservation
        return new MetricObservation(
                ts.getMetric(),
                host, series, process, group, ag,
                point.getTimestamp(),
                point.getValue()
        );
    }).iterator());
}
Project: power-java    File: HelloSpark.java
static public void main(String[] args) {
  JavaSparkContext sc = new JavaSparkContext("local", "Hello Spark");

  JavaRDD<String> lines = sc.textFile("data/test1.txt");
  //JavaRDD<String> tokens = lines.flatMap(line -> tokenize(line)); // worked for mllib version 1.5, not for version 2.0
  JavaRDD<String> tokens = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) {
      return tokenize(s).iterator();
    }
  });
  JavaPairRDD<String, Integer> counts =
      tokens.mapToPair(
          token ->
              new Tuple2<String, Integer>(token.toLowerCase(), 1))
          .reduceByKey((count1, count2) -> count1 + count2);
  Map<String, Integer> countMap = counts.collectAsMap();
  System.out.println(countMap);
  List<Tuple2<String, Integer>> collection = counts.collect();
  System.out.println(collection);
}
Project: SHMACK    File: WordCount.java
@SuppressWarnings("serial")
@Override
public SortedCounts<String> execute(final JavaSparkContext spark) {
    final JavaRDD<String> textFile = spark.textFile(inputFile);
    final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(final String rawJSON) throws TwitterException {
            final Status tweet = TwitterObjectFactory.createStatus(rawJSON);
            String text = tweet.getText();
            return Arrays.asList(text.split(" "));
        }
    });
    final JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(final String s) {
            return new Tuple2<String, Integer>(s.toLowerCase(), 1);
        }
    });
    final JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(final Integer a, final Integer b) {
            return a + b;
        }
    });
    return SortedCounts.create(counts);
}
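Note that this example returns an Iterable, i.e. the Spark 1.x FlatMapFunction contract. Against the Spark 2.x API used by most of the other snippets on this page, the same tweet-splitting flatMap would return an Iterator instead, roughly as follows (a sketch, not code from the SHMACK project):

    final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(final String rawJSON) throws TwitterException {
            final Status tweet = TwitterObjectFactory.createStatus(rawJSON);
            // Spark 2.x expects an Iterator rather than an Iterable.
            return Arrays.asList(tweet.getText().split(" ")).iterator();
        }
    });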
Project: GeoSpark    File: SpatialRDD.java
private JavaRDD<T> partition(final SpatialPartitioner partitioner) {
    return this.rawSpatialRDD.flatMapToPair(
        new PairFlatMapFunction<T, Integer, T>() {
            @Override
            public Iterator<Tuple2<Integer, T>> call(T spatialObject) throws Exception {
                return partitioner.placeObject(spatialObject);
            }
        }
    ).partitionBy(partitioner)
        .mapPartitions(new FlatMapFunction<Iterator<Tuple2<Integer, T>>, T>() {
            @Override
            public Iterator<T> call(final Iterator<Tuple2<Integer, T>> tuple2Iterator) throws Exception {
                return new Iterator<T>() {
                    @Override
                    public boolean hasNext() {
                        return tuple2Iterator.hasNext();
                    }

                    @Override
                    public T next() {
                        return tuple2Iterator.next()._2();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        }, true);
}
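The hand-rolled Iterator in the second stage only strips the partition key from each tuple. A more compact, hypothetical formulation of that stage, assuming Guava's Iterators is on the classpath (as in the Beam snippets above), could look like this:

    // Hypothetical helper with the same behaviour as the mapPartitions stage above:
    // drop the Integer partition key and keep the partitioning (preservesPartitioning = true).
    private JavaRDD<T> stripPartitionKeys(JavaPairRDD<Integer, T> partitioned) {
        return partitioned.mapPartitions(
            (FlatMapFunction<Iterator<Tuple2<Integer, T>>, T>) tuples ->
                Iterators.transform(tuples, tuple -> tuple._2()),
            true);
    }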
Project: learning-spark-examples    File: BasicFlatMap.java
public static void main(String[] args) throws Exception {

    if (args.length != 2) {
      throw new Exception("Usage BasicFlatMap sparkMaster inputFile");
    }

    JavaSparkContext sc = new JavaSparkContext(
      args[0], "basicflatmap", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    JavaRDD<String> rdd = sc.textFile(args[1]);
    JavaRDD<String> words = rdd.flatMap(
      new FlatMapFunction<String, String>() {
        public Iterable<String> call(String x) {
          return Arrays.asList(x.split(" "));
        }
      });
    Map<String, Long> result = words.countByValue();
    for (Entry<String, Long> entry: result.entrySet()) {
      System.out.println(entry.getKey() + ":" + entry.getValue());
    }
  }
Project: gatk    File: ReadWalkerSpark.java
private static FlatMapFunction<Shard<GATKRead>, ReadWalkerContext> getReadsFunction(
        Broadcast<ReferenceMultiSource> bReferenceSource, Broadcast<FeatureManager> bFeatureManager,
        SAMSequenceDictionary sequenceDictionary, int readShardPadding) {
    return (FlatMapFunction<Shard<GATKRead>, ReadWalkerContext>) shard -> {
        // get reference bases for this shard (padded)
        SimpleInterval paddedInterval = shard.getInterval().expandWithinContig(readShardPadding, sequenceDictionary);
        ReferenceDataSource reference = bReferenceSource == null ? null :
                new ReferenceMemorySource(bReferenceSource.getValue().getReferenceBases(paddedInterval), sequenceDictionary);
        FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();

        return StreamSupport.stream(shard.spliterator(), false)
                .map(r -> {
                    final SimpleInterval readInterval = getReadInterval(r);
                    return new ReadWalkerContext(r, new ReferenceContext(reference, readInterval), new FeatureContext(features, readInterval));
                }).iterator();
    };
}
Project: gatk    File: VariantWalkerSpark.java
private static FlatMapFunction<Shard<VariantContext>, VariantWalkerContext> getVariantsFunction(
        final Broadcast<ReferenceMultiSource> bReferenceSource,
        final Broadcast<FeatureManager> bFeatureManager,
        final SAMSequenceDictionary sequenceDictionary, final int variantShardPadding) {
    return (FlatMapFunction<Shard<VariantContext>, VariantWalkerContext>) shard -> {
        // get reference bases for this shard (padded)
        SimpleInterval paddedInterval = shard.getInterval().expandWithinContig(variantShardPadding, sequenceDictionary);
        ReferenceDataSource reference = bReferenceSource == null ? null :
                new ReferenceMemorySource(bReferenceSource.getValue().getReferenceBases(paddedInterval), sequenceDictionary);
        FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();

        return StreamSupport.stream(shard.spliterator(), false)
                .filter(v -> v.getStart() >= shard.getStart() && v.getStart() <= shard.getEnd()) // only include variants that start in the shard
                .map(v -> {
                    final SimpleInterval variantInterval = new SimpleInterval(v);
                    return new VariantWalkerContext(v,
                            new ReadsContext(), // empty
                            new ReferenceContext(reference, variantInterval),
                            new FeatureContext(features, variantInterval));
                }).iterator();
    };
}
Project: gatk    File: HaplotypeCallerSpark.java
/**
 * @return an RDD of {@link Tuple2<AssemblyRegion, SimpleInterval>} which pairs each AssemblyRegion with the
 * interval it was generated in
 */
private static FlatMapFunction<Iterator<Shard<GATKRead>>, Tuple2<AssemblyRegion, SimpleInterval>> shardsToAssemblyRegions(
        final Broadcast<ReferenceMultiSource> reference,
        final Broadcast<HaplotypeCallerArgumentCollection> hcArgsBroadcast,
        final ShardingArgumentCollection assemblyArgs,
        final SAMFileHeader header,
        final Broadcast<VariantAnnotatorEngine> annotatorEngineBroadcast) {
    return shards -> {
        final ReferenceMultiSource referenceMultiSource = reference.value();
        final ReferenceMultiSourceAdapter referenceSource = new ReferenceMultiSourceAdapter(referenceMultiSource);
        final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), false, false, header, referenceSource, annotatorEngineBroadcast.getValue());

        final ReadsDownsampler readsDownsampler = assemblyArgs.maxReadsPerAlignmentStart > 0 ?
            new PositionalDownsampler(assemblyArgs.maxReadsPerAlignmentStart, header) : null;
        return Utils.stream(shards)
                //TODO we've hacked multi interval shards here with a shim, but we should investigate as smarter approach https://github.com/broadinstitute/gatk/issues/4299
            .map(shard -> new ShardToMultiIntervalShardAdapter<>(
                    new DownsampleableSparkReadShard(new ShardBoundary(shard.getInterval(), shard.getPaddedInterval()), shard, readsDownsampler)))
            .flatMap(shardToRegion(assemblyArgs, header, referenceSource, hcEngine)).iterator();
    };
}
Project: gatk    File: ReadsSparkSourceUnitTest.java
@Test(dataProvider = "readPairsAndPartitions")
public void testPutPairsInSamePartition(int numPairs, int numPartitions, int[] expectedReadsPerPartition) throws IOException {
    JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
    SAMFileHeader header = ArtificialReadUtils.createArtificialSamHeader();
    header.setSortOrder(SAMFileHeader.SortOrder.queryname);
    JavaRDD<GATKRead> reads = createPairedReads(ctx, header, numPairs, numPartitions);
    ReadsSparkSource readsSparkSource = new ReadsSparkSource(ctx);
    JavaRDD<GATKRead> pairedReads = readsSparkSource.putPairsInSamePartition(header, reads);
    List<List<GATKRead>> partitions = pairedReads.mapPartitions((FlatMapFunction<Iterator<GATKRead>, List<GATKRead>>) it ->
            Iterators.singletonIterator(Lists.newArrayList(it))).collect();
    assertEquals(partitions.size(), numPartitions);
    for (int i = 0; i < numPartitions; i++) {
        assertEquals(partitions.get(i).size(), expectedReadsPerPartition[i]);
    }
    assertEquals(Arrays.stream(expectedReadsPerPartition).sum(), numPairs * 2);
}
Project: deeplearning4j    File: ParameterAveragingTrainingMaster.java
protected void doIterationPathsMDS(SparkComputationGraph graph, JavaRDD<String> split, int splitNum, int numSplits,
                int dataSetObjectNumExamples) {
    log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
                    splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
    if (collectTrainingStats)
        stats.logMapPartitionsStart();

    JavaRDD<String> splitData = split;
    if (collectTrainingStats)
        stats.logRepartitionStart();
    splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
                    numObjectsEachWorker(dataSetObjectNumExamples), numWorkers);
    int nPartitions = splitData.partitions().size();
    if (collectTrainingStats && repartition != Repartition.Never)
        stats.logRepartitionEnd();


    FlatMapFunction<Iterator<String>, ParameterAveragingTrainingResult> function =
                    new ExecuteWorkerPathMDSFlatMap<>(getWorkerInstance(graph));

    JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
    processResults(null, graph, result, splitNum, numSplits);

    if (collectTrainingStats)
        stats.logMapPartitionsEnd(nPartitions);
}
Project: deeplearning4j    File: ParameterAveragingTrainingMaster.java
protected void doIteration(SparkComputationGraph graph, JavaRDD<MultiDataSet> split, int splitNum, int numSplits) {
    log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
                    splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
    if (collectTrainingStats)
        stats.logMapPartitionsStart();

    JavaRDD<MultiDataSet> splitData = split;

    splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
                    numObjectsEachWorker(rddDataSetNumExamples), numWorkers);
    int nPartitions = split.partitions().size();

    FlatMapFunction<Iterator<MultiDataSet>, ParameterAveragingTrainingResult> function =
                    new ExecuteWorkerMultiDataSetFlatMap<>(getWorkerInstance(graph));
    JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
    processResults(null, graph, result, splitNum, numSplits);

    if (collectTrainingStats)
        stats.logMapPartitionsEnd(nPartitions);
}
Project: deeplearning4j    File: ParameterAveragingTrainingMaster.java
protected void doIterationPDS_MDS(SparkComputationGraph graph, JavaRDD<PortableDataStream> split, int splitNum,
                int numSplits) {
    log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
                    splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
    if (collectTrainingStats)
        stats.logMapPartitionsStart();

    JavaRDD<PortableDataStream> splitData = split;
    if (collectTrainingStats)
        stats.logRepartitionStart();
    splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
                    numObjectsEachWorker(rddDataSetNumExamples), numWorkers);
    int nPartitions = splitData.partitions().size();
    if (collectTrainingStats && repartition != Repartition.Never)
        stats.logRepartitionEnd();

    FlatMapFunction<Iterator<PortableDataStream>, ParameterAveragingTrainingResult> function =
                    new ExecuteWorkerPDSMDSFlatMap<>(getWorkerInstance(graph));

    JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
    processResults(null, graph, result, splitNum, numSplits);

    if (collectTrainingStats)
        stats.logMapPartitionsEnd(nPartitions);
}
Project: AbstractRendering    File: GlyphsetRDD.java
@Override public Rectangle2D bounds() {
    final JavaRDD<Rectangle2D> rects;
    if (partitions) {
        rects = base.mapPartitions(
            new FlatMapFunction<Iterator<Glyph<G,I>>,Rectangle2D>() {
                public Iterable<Rectangle2D> call(Iterator<Glyph<G, I>> glyphs) throws Exception {
                    ArrayList<Glyph<G,I>> glyphList = Lists.newArrayList(new IterableIterator<>(glyphs));
                    return Arrays.asList(Util.bounds(glyphList));
                }});
    } else {
        rects = base.map(new Function<Glyph<G,I>,Rectangle2D>() {
            public Rectangle2D call(Glyph<G,I> glyph) throws Exception {
                return Util.boundOne(glyph.shape());
            }});
    }

    return rects.reduce(new Function2<Rectangle2D, Rectangle2D,Rectangle2D>() {
        public Rectangle2D call(Rectangle2D left, Rectangle2D right) throws Exception {
            return Util.bounds(left, right);
        }
    });

}
Project: BLASpark    File: OtherOperations.java
private static CoordinateMatrix GetLU_COORD(CoordinateMatrix A) {

        JavaRDD<MatrixEntry> rows = A.entries().toJavaRDD().cache();

        JavaRDD<MatrixEntry> LUEntries = rows.mapPartitions(new FlatMapFunction<Iterator<MatrixEntry>, MatrixEntry>() {
            @Override
            public Iterator<MatrixEntry> call(Iterator<MatrixEntry> matrixEntryIterator) throws Exception {
                List<MatrixEntry> newLowerEntries = new ArrayList<MatrixEntry>();


                while(matrixEntryIterator.hasNext()) {
                    MatrixEntry currentEntry = matrixEntryIterator.next();

                    if(currentEntry.i() != currentEntry.j()) {
                        newLowerEntries.add(currentEntry);
                    }
                    else {
                        newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 0.0));
                    }

                }

                return newLowerEntries.iterator();
            }
        });

        CoordinateMatrix newMatrix = new CoordinateMatrix(LUEntries.rdd());

        return newMatrix;
    }
Project: BLASpark    File: OtherOperations.java
private static CoordinateMatrix GetD_COORD(CoordinateMatrix A, boolean inverseValues, JavaSparkContext jsc) {

        JavaRDD<MatrixEntry> rows = A.entries().toJavaRDD().cache();

        final Broadcast<Boolean> inverseValuesBC = jsc.broadcast(inverseValues);

        JavaRDD<MatrixEntry> LUEntries = rows.mapPartitions(new FlatMapFunction<Iterator<MatrixEntry>, MatrixEntry>() {
            @Override
            public Iterator<MatrixEntry> call(Iterator<MatrixEntry> matrixEntryIterator) throws Exception {
                List<MatrixEntry> newLowerEntries = new ArrayList<MatrixEntry>();

                boolean inverseValuesValue = inverseValuesBC.getValue().booleanValue();

                while(matrixEntryIterator.hasNext()) {
                    MatrixEntry currentEntry = matrixEntryIterator.next();

                    if(currentEntry.i() == currentEntry.j()) {
                        if(inverseValuesValue) {
                            newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 1.0/currentEntry.value()));
                        }
                        else {
                            newLowerEntries.add(currentEntry);
                        }

                    }
                    else {
                        newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 0.0));
                    }

                }

                return newLowerEntries.iterator();
            }
        });

        CoordinateMatrix newMatrix = new CoordinateMatrix(LUEntries.rdd());

        return newMatrix;
    }
Project: incubator-sdap-mudrod    File: SessionGenerator.java
public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {

    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);

    int sessionCount = 0;
    sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      /**
       *
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> sessionNums = new ArrayList<>();
        while (arg0.hasNext()) {
          String s = arg0.next();
          Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
          sessionNums.add(sessionNum);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return sessionNums.iterator();
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      /**
       *
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
  }
Project: incubator-sdap-mudrod    File: SessionStatistic.java
public void processSessionInParallel() throws InterruptedException, IOException {

    List<String> sessions = this.getSessions();
    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);

    int sessionCount = 0;
    sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      @Override
      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> sessionNums = new ArrayList<Integer>();
        sessionNums.add(0);
        while (arg0.hasNext()) {
          String s = arg0.next();
          Integer sessionNum = processSession(tmpES, s);
          sessionNums.add(sessionNum);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return sessionNums.iterator();
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    LOG.info("Final Session count: {}", Integer.toString(sessionCount));
  }
Project: Sparkathon    File: JavaStructuredNetworkWordCount.java
public static void main(String args[]) throws StreamingQueryException {
    SparkSession spark = SparkSession
            .builder()
            .appName("JavaStructuredNetworkWordCount")
            .master("local")
            .config("spark.sql.shuffle.partitions", 8)
            .getOrCreate();

    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    Dataset<Row> lines = spark
            .readStream()
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load();

   // Split the lines into words
    Dataset<String> words = lines
            .as(Encoders.STRING())
            .flatMap(
                    new FlatMapFunction<String, String>() {
                        @Override
                        public Iterator<String> call(String x) {
                            return Arrays.asList(x.split(" ")).iterator();
                        }
                    }, Encoders.STRING());

    // Generate running word count
    Dataset<Row> wordCounts = words.groupBy("value").count();

    // Start running the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();

    query.awaitTermination();
}
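On the Spark 2.x Dataset API, the anonymous class above can also be written as a lambda; the cast to FlatMapFunction disambiguates the Java overload of flatMap from the Scala one (a sketch of the equivalent call):

    Dataset<String> words = lines
            .as(Encoders.STRING())
            .flatMap(
                    (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
                    Encoders.STRING());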
Project: mudrod    File: SessionGenerator.java
public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {

    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);

    int sessionCount = 0;
    sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      /**
       *
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> sessionNums = new ArrayList<>();
        while (arg0.hasNext()) {
          String s = arg0.next();
          Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
          sessionNums.add(sessionNum);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return sessionNums.iterator();
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      /**
       *
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
  }
Project: mudrod    File: SessionStatistic.java
public void processSessionInParallel() throws InterruptedException, IOException {

    List<String> sessions = this.getSessions();
    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);

    int sessionCount = 0;
    sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      @Override
      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> sessionNums = new ArrayList<Integer>();
        sessionNums.add(0);
        while (arg0.hasNext()) {
          String s = arg0.next();
          Integer sessionNum = processSession(tmpES, s);
          sessionNums.add(sessionNum);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return sessionNums.iterator();
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    LOG.info("Final Session count: {}", Integer.toString(sessionCount));
  }
Project: incubator-pulsar    File: SparkStreamingPulsarReceiverExample.java
public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    ClientConfiguration clientConf = new ClientConfiguration();
    ConsumerConfiguration consConf = new ConsumerConfiguration();
    String url = "pulsar://localhost:6650/";
    String topic = "persistent://sample/standalone/ns1/topic1";
    String subs = "sub1";

    JavaReceiverInputDStream<byte[]> msgs = jssc
            .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));

    JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new FlatMapFunction<byte[], Integer>() {
        @Override
        public Iterator<Integer> call(byte[] msg) {
            return Arrays.asList(((new String(msg)).indexOf("Pulsar") != -1) ? 1 : 0).iterator();
        }
    });

    JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });

    numOfPulsar.print();

    jssc.start();
    jssc.awaitTermination();
}
Project: rheem    File: FunctionCompiler.java
/**
 * Create an appropriate {@link Function} for deploying the given {@link MapPartitionsDescriptor}
 * on Apache Spark's {@link JavaRDD#mapPartitions(FlatMapFunction)}.
 *
 * @param descriptor      describes the function
 * @param operator        that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 * @param operatorContext contains optimization information for the {@code operator}
 * @param inputs          that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 */
public <I, O> FlatMapFunction<Iterator<I>, O> compile(MapPartitionsDescriptor<I, O> descriptor,
                                                      SparkExecutionOperator operator,
                                                      OptimizationContext.OperatorContext operatorContext,
                                                      ChannelInstance[] inputs) {
    final java.util.function.Function<Iterable<I>, Iterable<O>> javaImplementation = descriptor.getJavaImplementation();
    if (javaImplementation instanceof FunctionDescriptor.ExtendedSerializableFunction) {
        return new ExtendedMapPartitionsFunctionAdapter<>(
                (FunctionDescriptor.ExtendedSerializableFunction<Iterable<I>, Iterable<O>>) javaImplementation,
                new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
        );
    } else {
        return new MapPartitionsFunctionAdapter<>(javaImplementation);
    }
}
Project: rheem    File: FunctionCompiler.java
/**
 * Create an appropriate {@link FlatMapFunction} for deploying the given {@link FlatMapDescriptor}
 * on Apache Spark.
 *
 * @param descriptor      describes the function
 * @param operator        that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 * @param operatorContext contains optimization information for the {@code operator}
 * @param inputs          that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 */
public <I, O> FlatMapFunction<I, O> compile(FlatMapDescriptor<I, O> descriptor,
                                            SparkExecutionOperator operator,
                                            OptimizationContext.OperatorContext operatorContext,
                                            ChannelInstance[] inputs) {
    final java.util.function.Function<I, Iterable<O>> javaImplementation = descriptor.getJavaImplementation();
    if (javaImplementation instanceof FunctionDescriptor.ExtendedSerializableFunction) {
        return new ExtendedFlatMapFunctionAdapter<>(
                (FunctionDescriptor.ExtendedSerializableFunction<I, Iterable<O>>) javaImplementation,
                new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
        );
    } else {
        return new FlatMapFunctionAdapter<>(javaImplementation);
    }
}
Project: envelope    File: MorphlineUtils.java
@SuppressWarnings("serial")
public static FlatMapFunction<Row, Row> morphlineMapper(final String morphlineFile, final String morphlineId,
                                                        final StructType outputSchema) {
  return new FlatMapFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Row row) throws Exception {
      // Retrieve the Command pipeline via ThreadLocal
      Pipeline pipeline = MorphlineUtils.getPipeline(morphlineFile, morphlineId);

      if (null == pipeline) {
        pipeline = MorphlineUtils.setPipeline(morphlineFile, morphlineId, new Collector(), true);
      }

      // Convert each Row into a Record
      StructType inputSchema = row.schema();
      if (null == inputSchema) {
        throw new RuntimeException("Row does not have an associated StructType schema");
      }

      Record inputRecord = new Record();
      String[] fieldNames = inputSchema.fieldNames();

      // TODO : Confirm nested object conversion
      for (int i = 0; i < fieldNames.length; i++) {
        inputRecord.put(fieldNames[i], row.get(i));
      }

      // Process each Record via the Command pipeline
      List<Record> outputRecords = MorphlineUtils.executePipeline(pipeline, inputRecord);

      // Convert each Record into a new Row
      List<Row> outputRows = Lists.newArrayListWithCapacity(outputRecords.size());
      for (Record record : outputRecords) {
        outputRows.add(MorphlineUtils.convertToRow(outputSchema, record));
      }

      return outputRows.iterator();
    }
  };
}
Project: envelope    File: TestMorphlineUtils.java
@Test
public void morphlineMapper(
    final @Mocked MorphlineUtils.Pipeline pipeline,
    final @Mocked Row row,
    final @Mocked StructType schema
) throws Exception {

  new Expectations(MorphlineUtils.class) {{
    MorphlineUtils.getPipeline("file", "id"); result = pipeline; times = 1;
    MorphlineUtils.executePipeline(pipeline, (Record) any); result = Lists.newArrayList(); times = 1;
    row.schema(); result = schema;
    row.get(anyInt); returns("val1", "val2"); times = 2;
    schema.fieldNames(); result = new String[] { "one", "two"};
  }};

  FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
  Iterator<Row> results = function.call(row);

  assertEquals("Invalid number of Rows returned", 0, Lists.newArrayList(results).size());

  new Verifications() {{
    Record record;
    MorphlineUtils.executePipeline(pipeline, record = withCapture());
    assertEquals(2, record.getFields().size());
    assertEquals("val1", record.get("one").get(0));
  }};
}
Project: envelope    File: TestMorphlineUtils.java
@Test
public void morphlineMapperNoPipeline(
    final @Mocked MorphlineUtils.Pipeline pipeline,
    final @Mocked Row row,
    final @Mocked StructType schema
) throws Exception {

  new Expectations(MorphlineUtils.class) {{
    MorphlineUtils.getPipeline("file", "id"); result = null; times = 1;
    MorphlineUtils.setPipeline("file", "id", (MorphlineUtils.Collector) any, true); result = pipeline; times = 1;
    MorphlineUtils.executePipeline(pipeline, (Record) any); result = Lists.newArrayList(); times = 1;
    row.schema(); result = schema;
    row.get(anyInt); returns("val1", "val2"); times = 2;
    schema.fieldNames(); result = new String[] { "one", "two"};
  }};

  FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
  Iterator<Row> results = function.call(row);

  assertEquals("Invalid number of Rows returned", 0, Lists.newArrayList(results).size());

  new Verifications() {{
    Record record;
    MorphlineUtils.executePipeline(pipeline, record = withCapture());
    assertEquals(2, record.getFields().size());
    assertEquals("val1", record.get("one").get(0));
  }};
}
Project: envelope    File: TestMorphlineUtils.java
@Test (expected = RuntimeException.class)
public void morphlineMapperNoSchema(
    final @Mocked MorphlineUtils.Pipeline pipeline,
    final @Mocked Row row,
    final @Mocked StructType schema
) throws Exception {

  new Expectations(MorphlineUtils.class) {{
    MorphlineUtils.getPipeline("file", "id"); result = pipeline; times = 1;
    row.schema(); result = null;
  }};

  FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
  function.call(row);
}
Project: federator    File: SparkStreaming.java
@Override
public String insert(Entity entity, final Set<Value> values) throws ParseException, Exception {

    JavaDStream<Value> cache = lines.flatMap(new FlatMapFunction<String, Value>() {

        @Override
        public Iterable<Value> call(String x) {
            return values;
        }
    });

    cache.persist();

    return entity.getId();
}
Project: federator    File: SparkRedisStreaming.java
@Override
public String insert(Entity entity, final Set<Value> values) throws ParseException, Exception {

    JavaDStream<Value> cache = lines.flatMap(new FlatMapFunction<String, Value>() {

        @Override
        public Iterable<Value> call(String x) {
            return values;
        }
    });

    cache.persist();

    return entity.getId();
}
Project: StreamBench    File: SparkJoinTest.java
public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkJoinTest");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
        ssc.checkpoint("checkpoint");

        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("127.0.0.1", 9999);
        JavaPairDStream<String, Long> nameStream = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String l) throws Exception {
                return Arrays.asList(l.split(" "));
            }
        }).mapToPair(new PairFunction<String, String, Long>() {
            public Tuple2<String, Long> call(String w) throws Exception {
                return new Tuple2<>(w, 1L);
            }
        }).window(Durations.seconds(30), Durations.seconds(10));

        JavaReceiverInputDStream<String> lines2 = ssc.socketTextStream("127.0.0.1", 9998);
        JavaPairDStream<String, Long> nameAgeStream = lines2.mapToPair(new PairFunction<String, String, Long>() {
            @Override
            public Tuple2<String, Long> call(String s) throws Exception {
                String[] list = s.split(" ");
                String name = list[0];
                long age = 0L;
                if (list.length > 1)
                    age = Long.parseLong(list[1]);
                return new Tuple2<String, Long>(name, age);
            }
        }).window(Durations.seconds(11), Durations.seconds(11));

//        nameStream.print();
//        nameAgeStream.print();
        JavaPairDStream<String, Tuple2<Long, Long>> joinedStream = nameStream.join(nameAgeStream);
        joinedStream.print();

        ssc.start();
        ssc.awaitTermination();
    }
Project: gatk-protected    File: HaplotypeCallerSpark.java
/**
 * Call variants from tuples of AssemblyRegion and SimpleInterval.
 * The interval should be the non-padded shard boundary for the shard that the corresponding AssemblyRegion was
 * created in; it is used to eliminate redundant variant calls at the edges of shard boundaries.
 */
private static FlatMapFunction<Iterator<Tuple2<AssemblyRegion, SimpleInterval>>, VariantContext> callVariantsFromAssemblyRegions(
        final AuthHolder authHolder,
        final SAMFileHeader header,
        final Broadcast<ReferenceMultiSource> referenceBroadcast,
        final Broadcast<HaplotypeCallerArgumentCollection> hcArgsBroadcast) {
    return regionAndIntervals -> {
        //HaplotypeCallerEngine isn't serializable but is expensive to instantiate, so construct and reuse one for every partition
        final ReferenceMultiSourceAdapter referenceReader = new ReferenceMultiSourceAdapter(referenceBroadcast.getValue(), authHolder);
        final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), header, referenceReader);
        return iteratorToStream(regionAndIntervals).flatMap(regionToVariants(hcEngine)).iterator();
    };
}