/**
 * getAllWordsInDoc: Extracts all unique terms from all docs.
 *
 * @param docwordRDD pair RDD; each key is a doc, and the value is the term list extracted from
 *                   that doc
 * @return unique term list
 */
public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
  JavaRDD<String> wordRDD = docwordRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<String> call(List<String> list) {
      return list.iterator();
    }
  }).distinct();

  return wordRDD;
}
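// Usage sketch (hypothetical input and variable names, not part of the original source):
// build a small doc-to-terms pair RDD and extract its distinct vocabulary with getAllWordsInDoc.
JavaSparkContext sc = new JavaSparkContext("local", "vocabulary-example");
List<Tuple2<String, List<String>>> docs = Arrays.asList(
    new Tuple2<String, List<String>>("doc1", Arrays.asList("ocean", "temperature")),
    new Tuple2<String, List<String>>("doc2", Arrays.asList("ocean", "salinity")));
JavaPairRDD<String, List<String>> docwordRDD = sc.parallelizePairs(docs);
JavaRDD<String> vocabulary = getAllWordsInDoc(docwordRDD);
// vocabulary.collect() contains "ocean", "temperature" and "salinity" exactly once each.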
/**
 * loadClickStremFromTxt: Load click stream from a txt file.
 *
 * @param clickthroughFile txt file
 * @param sc the spark context
 * @return clickstream list in JavaRDD format {@link ClickStream}
 */
public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile, JavaSparkContext sc) {
  return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String, ClickStream>() {
    private static final long serialVersionUID = 1L;

    @SuppressWarnings("unchecked")
    @Override
    public Iterator<ClickStream> call(String line) throws Exception {
      List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line);
      // Return an iterator over the parsed list; casting the List itself to Iterator
      // would fail at runtime with a ClassCastException.
      return clickthroughs.iterator();
    }
  });
}
void checkByRateInParallel() throws InterruptedException, IOException {
  JavaRDD<String> userRDD = getUserRDD(this.httpType);
  LOG.info("Original User count: {}", userRDD.count());

  int userCount = 0;
  userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
    ESDriver tmpES = new ESDriver(props);
    tmpES.createBulkProcessor();
    List<Integer> realUserNums = new ArrayList<>();
    while (iterator.hasNext()) {
      String s = iterator.next();
      Integer realUser = checkByRate(tmpES, s);
      realUserNums.add(realUser);
    }
    tmpES.destroyBulkProcessor();
    tmpES.close();
    return realUserNums.iterator();
  }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);

  LOG.info("User count: {}", Integer.toString(userCount));
}
/** A pair to {@link KV} flatmap function. */
static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() {
  return new FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>() {
    @Override
    public Iterator<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) {
      final Iterator<KV<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<Tuple2<K, V>, KV<K, V>>() {
                @Override
                public KV<K, V> apply(Tuple2<K, V> t2) {
                  return KV.of(t2._1(), t2._2());
                }
              });
      return outputItr;
    }
  };
}
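// Usage sketch (hypothetical; 'jsc' is an existing JavaSparkContext and is not part of the
// original source): convert a Spark pair RDD of Tuple2 records into Beam-style KV records,
// one partition at a time.
JavaPairRDD<String, Integer> pairRdd = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<String, Integer>("a", 1),
    new Tuple2<String, Integer>("b", 2)));
FlatMapFunction<Iterator<Tuple2<String, Integer>>, KV<String, Integer>> toKv =
    fromPairFlatMapFunction();
JavaRDD<KV<String, Integer>> kvRdd = pairRdd.mapPartitions(toKv);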
/**
 * A utility method that adapts {@link Function} to a {@link FlatMapFunction} with an {@link
 * Iterator} input. This is particularly useful because it allows functions written for map
 * operations to be reused in flatmap operations.
 *
 * @param func the {@link Function} to adapt.
 * @param <InputT> the input type.
 * @param <OutputT> the output type.
 * @return a {@link FlatMapFunction} that accepts an {@link Iterator} as an input and applies the
 *     {@link Function} on every element.
 */
public static <InputT, OutputT>
    FlatMapFunction<Iterator<InputT>, OutputT> functionToFlatMapFunction(
        final Function<InputT, OutputT> func) {
  return new FlatMapFunction<Iterator<InputT>, OutputT>() {
    @Override
    public Iterator<OutputT> call(Iterator<InputT> itr) throws Exception {
      final Iterator<OutputT> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<InputT, OutputT>() {
                @Override
                public OutputT apply(InputT t) {
                  try {
                    return func.call(t);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              });
      return outputItr;
    }
  };
}
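// Usage sketch (hypothetical; 'lines' is an existing JavaRDD<String> and is not part of the
// original source): reuse a plain map-style Function inside mapPartitions by adapting it
// with functionToFlatMapFunction.
Function<String, Integer> lengthFn = s -> s.length();
JavaRDD<Integer> lengths = lines.mapPartitions(functionToFlatMapFunction(lengthFn));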
/**
 * Low-level chunked query.
 *
 * @param query Solr query
 * @param zkHost Zookeeper host
 * @param collection the Solr collection of chronix time series data
 * @param chronixStorage a ChronixSolrCloudStorage instance
 * @return ChronixRDD of time series (chunks)
 * @throws SolrServerException
 */
public ChronixRDD queryChronixChunks(
    final SolrQuery query,
    final String zkHost,
    final String collection,
    final ChronixSolrCloudStorage chronixStorage) throws SolrServerException, IOException {

  // first get a list of replicas to query for this collection
  List<String> shards = chronixStorage.getShardList(zkHost, collection);

  // parallelize the requests to the shards
  JavaRDD<MetricTimeSeries> docs = jsc.parallelize(shards, shards.size()).flatMap(
      (FlatMapFunction<String, MetricTimeSeries>) shardUrl -> chronixStorage.streamFromSingleNode(
          zkHost, collection, shardUrl, query, new MetricTimeSeriesConverter()).iterator());
  return new ChronixRDD(docs);
}
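// Usage sketch (hypothetical; 'csc', 'zkHost' and 'chronixStorage' are assumed to already
// exist and are not part of the original source): issue a chunked query against a Chronix
// collection and count the returned time-series chunks.
SolrQuery query = new SolrQuery("metric:*load*");
ChronixRDD chunks = csc.queryChronixChunks(query, zkHost, "chronix", chronixStorage);
long numberOfChunks = chunks.count();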
/**
 * Transformation: Transforms the ChronixRDD into an RDD of MetricObservations (pairs of
 * timestamp and value plus dimensions).
 *
 * @return RDD of MetricObservations
 */
public JavaRDD<MetricObservation> toObservations() {
  return this.flatMap((FlatMapFunction<MetricTimeSeries, MetricObservation>) ts -> ts.points().map(point -> {
    // null-safe read of dimensional values
    String host = ts.attributes().get(MetricDimension.HOST) == null
        ? null : ts.attributes().get(MetricDimension.HOST).toString();
    String series = ts.attributes().get(MetricDimension.MEASUREMENT_SERIES) == null
        ? null : ts.attributes().get(MetricDimension.MEASUREMENT_SERIES).toString();
    String process = ts.attributes().get(MetricDimension.PROCESS) == null
        ? null : ts.attributes().get(MetricDimension.PROCESS).toString();
    String group = ts.attributes().get(MetricDimension.METRIC_GROUP) == null
        ? null : ts.attributes().get(MetricDimension.METRIC_GROUP).toString();
    String ag = ts.attributes().get(MetricDimension.AGGREGATION_LEVEL) == null
        ? null : ts.attributes().get(MetricDimension.AGGREGATION_LEVEL).toString();
    // convert Point/MetricTimeSeries to MetricObservation
    return new MetricObservation(
        ts.getMetric(),
        host, series, process, group, ag,
        point.getTimestamp(),
        point.getValue()
    );
  }).iterator());
}
public static void main(String[] args) {
  JavaSparkContext sc = new JavaSparkContext("local", "Hello Spark");

  JavaRDD<String> lines = sc.textFile("data/test1.txt");

  // JavaRDD<String> tokens = lines.flatMap(line -> tokenize(line));
  // worked for mllib version 1.5, not for version 2.0
  JavaRDD<String> tokens = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) {
      return tokenize(s).iterator();
    }
  });

  JavaPairRDD<String, Integer> counts = tokens
      .mapToPair(token -> new Tuple2<String, Integer>(token.toLowerCase(), 1))
      .reduceByKey((count1, count2) -> count1 + count2);

  Map<String, Integer> countMap = counts.collectAsMap();
  System.out.println(countMap);

  List<Tuple2<String, Integer>> collection = counts.collect();
  System.out.println(collection);
}
@SuppressWarnings("serial") @Override public SortedCounts<String> execute(final JavaSparkContext spark) { final JavaRDD<String> textFile = spark.textFile(inputFile); final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(final String rawJSON) throws TwitterException { final Status tweet = TwitterObjectFactory.createStatus(rawJSON); String text = tweet.getText(); return Arrays.asList(text.split(" ")); } }); final JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(final String s) { return new Tuple2<String, Integer>(s.toLowerCase(), 1); } }); final JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(final Integer a, final Integer b) { return a + b; } }); return SortedCounts.create(counts); }
private JavaRDD<T> partition(final SpatialPartitioner partitioner) {
  return this.rawSpatialRDD.flatMapToPair(
      new PairFlatMapFunction<T, Integer, T>() {
        @Override
        public Iterator<Tuple2<Integer, T>> call(T spatialObject) throws Exception {
          return partitioner.placeObject(spatialObject);
        }
      }
  ).partitionBy(partitioner)
      .mapPartitions(new FlatMapFunction<Iterator<Tuple2<Integer, T>>, T>() {
        @Override
        public Iterator<T> call(final Iterator<Tuple2<Integer, T>> tuple2Iterator) throws Exception {
          return new Iterator<T>() {
            @Override
            public boolean hasNext() {
              return tuple2Iterator.hasNext();
            }

            @Override
            public T next() {
              return tuple2Iterator.next()._2();
            }

            @Override
            public void remove() {
              throw new UnsupportedOperationException();
            }
          };
        }
      }, true);
}
public static void main(String[] args) throws Exception {
  if (args.length != 2) {
    throw new Exception("Usage BasicFlatMap sparkMaster inputFile");
  }

  JavaSparkContext sc = new JavaSparkContext(
      args[0], "basicflatmap", System.getenv("SPARK_HOME"), System.getenv("JARS"));
  JavaRDD<String> rdd = sc.textFile(args[1]);
  JavaRDD<String> words = rdd.flatMap(
      new FlatMapFunction<String, String>() {
        public Iterable<String> call(String x) {
          return Arrays.asList(x.split(" "));
        }
      });

  Map<String, Long> result = words.countByValue();
  for (Entry<String, Long> entry : result.entrySet()) {
    System.out.println(entry.getKey() + ":" + entry.getValue());
  }
}
private static FlatMapFunction<Shard<GATKRead>, ReadWalkerContext> getReadsFunction(
    Broadcast<ReferenceMultiSource> bReferenceSource, Broadcast<FeatureManager> bFeatureManager,
    SAMSequenceDictionary sequenceDictionary, int readShardPadding) {
  return (FlatMapFunction<Shard<GATKRead>, ReadWalkerContext>) shard -> {
    // get reference bases for this shard (padded)
    SimpleInterval paddedInterval = shard.getInterval().expandWithinContig(readShardPadding, sequenceDictionary);
    ReferenceDataSource reference = bReferenceSource == null ? null :
        new ReferenceMemorySource(bReferenceSource.getValue().getReferenceBases(paddedInterval), sequenceDictionary);
    FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();
    return StreamSupport.stream(shard.spliterator(), false)
        .map(r -> {
          final SimpleInterval readInterval = getReadInterval(r);
          return new ReadWalkerContext(r, new ReferenceContext(reference, readInterval), new FeatureContext(features, readInterval));
        }).iterator();
  };
}
private static FlatMapFunction<Shard<VariantContext>, VariantWalkerContext> getVariantsFunction(
    final Broadcast<ReferenceMultiSource> bReferenceSource,
    final Broadcast<FeatureManager> bFeatureManager,
    final SAMSequenceDictionary sequenceDictionary, final int variantShardPadding) {
  return (FlatMapFunction<Shard<VariantContext>, VariantWalkerContext>) shard -> {
    // get reference bases for this shard (padded)
    SimpleInterval paddedInterval = shard.getInterval().expandWithinContig(variantShardPadding, sequenceDictionary);
    ReferenceDataSource reference = bReferenceSource == null ? null :
        new ReferenceMemorySource(bReferenceSource.getValue().getReferenceBases(paddedInterval), sequenceDictionary);
    FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();
    return StreamSupport.stream(shard.spliterator(), false)
        .filter(v -> v.getStart() >= shard.getStart() && v.getStart() <= shard.getEnd()) // only include variants that start in the shard
        .map(v -> {
          final SimpleInterval variantInterval = new SimpleInterval(v);
          return new VariantWalkerContext(v,
              new ReadsContext(), // empty
              new ReferenceContext(reference, variantInterval),
              new FeatureContext(features, variantInterval));
        }).iterator();
  };
}
/**
 * @return an RDD of {@link Tuple2<AssemblyRegion, SimpleInterval>} which pairs each AssemblyRegion with the
 *         interval it was generated in
 */
private static FlatMapFunction<Iterator<Shard<GATKRead>>, Tuple2<AssemblyRegion, SimpleInterval>> shardsToAssemblyRegions(
    final Broadcast<ReferenceMultiSource> reference,
    final Broadcast<HaplotypeCallerArgumentCollection> hcArgsBroadcast,
    final ShardingArgumentCollection assemblyArgs,
    final SAMFileHeader header,
    final Broadcast<VariantAnnotatorEngine> annotatorEngineBroadcast) {
  return shards -> {
    final ReferenceMultiSource referenceMultiSource = reference.value();
    final ReferenceMultiSourceAdapter referenceSource = new ReferenceMultiSourceAdapter(referenceMultiSource);
    final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), false, false, header,
        referenceSource, annotatorEngineBroadcast.getValue());

    final ReadsDownsampler readsDownsampler = assemblyArgs.maxReadsPerAlignmentStart > 0
        ? new PositionalDownsampler(assemblyArgs.maxReadsPerAlignmentStart, header)
        : null;
    return Utils.stream(shards)
        // TODO we've hacked multi interval shards here with a shim, but we should investigate a smarter approach
        //      https://github.com/broadinstitute/gatk/issues/4299
        .map(shard -> new ShardToMultiIntervalShardAdapter<>(
            new DownsampleableSparkReadShard(new ShardBoundary(shard.getInterval(), shard.getPaddedInterval()), shard, readsDownsampler)))
        .flatMap(shardToRegion(assemblyArgs, header, referenceSource, hcEngine)).iterator();
  };
}
@Test(dataProvider = "readPairsAndPartitions")
public void testPutPairsInSamePartition(int numPairs, int numPartitions, int[] expectedReadsPerPartition) throws IOException {
  JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
  SAMFileHeader header = ArtificialReadUtils.createArtificialSamHeader();
  header.setSortOrder(SAMFileHeader.SortOrder.queryname);
  JavaRDD<GATKRead> reads = createPairedReads(ctx, header, numPairs, numPartitions);
  ReadsSparkSource readsSparkSource = new ReadsSparkSource(ctx);
  JavaRDD<GATKRead> pairedReads = readsSparkSource.putPairsInSamePartition(header, reads);
  List<List<GATKRead>> partitions = pairedReads.mapPartitions(
      (FlatMapFunction<Iterator<GATKRead>, List<GATKRead>>) it ->
          Iterators.singletonIterator(Lists.newArrayList(it))).collect();
  assertEquals(partitions.size(), numPartitions);
  for (int i = 0; i < numPartitions; i++) {
    assertEquals(partitions.get(i).size(), expectedReadsPerPartition[i]);
  }
  assertEquals(Arrays.stream(expectedReadsPerPartition).sum(), numPairs * 2);
}
protected void doIterationPathsMDS(SparkComputationGraph graph, JavaRDD<String> split, int splitNum, int numSplits,
                                   int dataSetObjectNumExamples) {
  log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
      splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
  if (collectTrainingStats)
    stats.logMapPartitionsStart();

  JavaRDD<String> splitData = split;

  if (collectTrainingStats)
    stats.logRepartitionStart();
  splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
      numObjectsEachWorker(dataSetObjectNumExamples), numWorkers);
  int nPartitions = splitData.partitions().size();
  if (collectTrainingStats && repartition != Repartition.Never)
    stats.logRepartitionEnd();

  FlatMapFunction<Iterator<String>, ParameterAveragingTrainingResult> function =
      new ExecuteWorkerPathMDSFlatMap<>(getWorkerInstance(graph));

  JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
  processResults(null, graph, result, splitNum, numSplits);

  if (collectTrainingStats)
    stats.logMapPartitionsEnd(nPartitions);
}
protected void doIteration(SparkComputationGraph graph, JavaRDD<MultiDataSet> split, int splitNum, int numSplits) {
  log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
      splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
  if (collectTrainingStats)
    stats.logMapPartitionsStart();

  JavaRDD<MultiDataSet> splitData = split;
  splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
      numObjectsEachWorker(rddDataSetNumExamples), numWorkers);
  int nPartitions = split.partitions().size();

  FlatMapFunction<Iterator<MultiDataSet>, ParameterAveragingTrainingResult> function =
      new ExecuteWorkerMultiDataSetFlatMap<>(getWorkerInstance(graph));
  JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
  processResults(null, graph, result, splitNum, numSplits);

  if (collectTrainingStats)
    stats.logMapPartitionsEnd(nPartitions);
}
protected void doIterationPDS_MDS(SparkComputationGraph graph, JavaRDD<PortableDataStream> split, int splitNum,
                                  int numSplits) {
  log.info("Starting training of split {} of {}. workerMiniBatchSize={}, averagingFreq={}, Configured for {} workers",
      splitNum, numSplits, batchSizePerWorker, averagingFrequency, numWorkers);
  if (collectTrainingStats)
    stats.logMapPartitionsStart();

  JavaRDD<PortableDataStream> splitData = split;
  if (collectTrainingStats)
    stats.logRepartitionStart();
  splitData = SparkUtils.repartition(splitData, repartition, repartitionStrategy,
      numObjectsEachWorker(rddDataSetNumExamples), numWorkers);
  int nPartitions = splitData.partitions().size();
  if (collectTrainingStats && repartition != Repartition.Never)
    stats.logRepartitionEnd();

  FlatMapFunction<Iterator<PortableDataStream>, ParameterAveragingTrainingResult> function =
      new ExecuteWorkerPDSMDSFlatMap<>(getWorkerInstance(graph));

  JavaRDD<ParameterAveragingTrainingResult> result = splitData.mapPartitions(function);
  processResults(null, graph, result, splitNum, numSplits);

  if (collectTrainingStats)
    stats.logMapPartitionsEnd(nPartitions);
}
@Override
public Rectangle2D bounds() {
  final JavaRDD<Rectangle2D> rects;
  if (partitions) {
    rects = base.mapPartitions(
        new FlatMapFunction<Iterator<Glyph<G, I>>, Rectangle2D>() {
          public Iterable<Rectangle2D> call(Iterator<Glyph<G, I>> glyphs) throws Exception {
            ArrayList<Glyph<G, I>> glyphList = Lists.newArrayList(new IterableIterator<>(glyphs));
            return Arrays.asList(Util.bounds(glyphList));
          }
        });
  } else {
    rects = base.map(new Function<Glyph<G, I>, Rectangle2D>() {
      public Rectangle2D call(Glyph<G, I> glyph) throws Exception {
        return Util.boundOne(glyph.shape());
      }
    });
  }

  return rects.reduce(new Function2<Rectangle2D, Rectangle2D, Rectangle2D>() {
    public Rectangle2D call(Rectangle2D left, Rectangle2D right) throws Exception {
      return Util.bounds(left, right);
    }
  });
}
private static CoordinateMatrix GetLU_COORD(CoordinateMatrix A) {
  JavaRDD<MatrixEntry> rows = A.entries().toJavaRDD().cache();

  JavaRDD<MatrixEntry> LUEntries = rows.mapPartitions(new FlatMapFunction<Iterator<MatrixEntry>, MatrixEntry>() {
    @Override
    public Iterator<MatrixEntry> call(Iterator<MatrixEntry> matrixEntryIterator) throws Exception {
      List<MatrixEntry> newLowerEntries = new ArrayList<MatrixEntry>();

      while (matrixEntryIterator.hasNext()) {
        MatrixEntry currentEntry = matrixEntryIterator.next();
        if (currentEntry.i() != currentEntry.j()) {
          // keep off-diagonal entries as they are
          newLowerEntries.add(currentEntry);
        } else {
          // zero out the diagonal
          newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 0.0));
        }
      }
      return newLowerEntries.iterator();
    }
  });

  CoordinateMatrix newMatrix = new CoordinateMatrix(LUEntries.rdd());
  return newMatrix;
}
private static CoordinateMatrix GetD_COORD(CoordinateMatrix A, boolean inverseValues, JavaSparkContext jsc) {
  JavaRDD<MatrixEntry> rows = A.entries().toJavaRDD().cache();
  final Broadcast<Boolean> inverseValuesBC = jsc.broadcast(inverseValues);

  JavaRDD<MatrixEntry> LUEntries = rows.mapPartitions(new FlatMapFunction<Iterator<MatrixEntry>, MatrixEntry>() {
    @Override
    public Iterator<MatrixEntry> call(Iterator<MatrixEntry> matrixEntryIterator) throws Exception {
      List<MatrixEntry> newLowerEntries = new ArrayList<MatrixEntry>();
      boolean inverseValuesValue = inverseValuesBC.getValue().booleanValue();

      while (matrixEntryIterator.hasNext()) {
        MatrixEntry currentEntry = matrixEntryIterator.next();
        if (currentEntry.i() == currentEntry.j()) {
          // diagonal entry: keep it, optionally inverted
          if (inverseValuesValue) {
            newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 1.0 / currentEntry.value()));
          } else {
            newLowerEntries.add(currentEntry);
          }
        } else {
          // zero out everything off the diagonal
          newLowerEntries.add(new MatrixEntry(currentEntry.i(), currentEntry.j(), 0.0));
        }
      }
      return newLowerEntries.iterator();
    }
  });

  CoordinateMatrix newMatrix = new CoordinateMatrix(LUEntries.rdd());
  return newMatrix;
}
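// Usage sketch (hypothetical; 'A' and 'jsc' are assumed to already exist): split a
// coordinate matrix into its off-diagonal part (L + U) and its inverted diagonal part,
// a splitting that resembles the one used by Jacobi-style iterative solvers.
CoordinateMatrix offDiagonal = GetLU_COORD(A);
CoordinateMatrix inverseDiagonal = GetD_COORD(A, true, jsc);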
public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {
  JavaRDD<String> userRDD = getUserRDD(this.cleanupType);

  int sessionCount = 0;
  sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
      ESDriver tmpES = new ESDriver(props);
      tmpES.createBulkProcessor();
      List<Integer> sessionNums = new ArrayList<>();
      while (arg0.hasNext()) {
        String s = arg0.next();
        Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
        sessionNums.add(sessionNum);
      }
      tmpES.destroyBulkProcessor();
      tmpES.close();
      return sessionNums.iterator();
    }
  }).reduce(new Function2<Integer, Integer, Integer>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  });

  LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
}
public void processSessionInParallel() throws InterruptedException, IOException {
  List<String> sessions = this.getSessions();
  JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);

  int sessionCount = 0;
  sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
    @Override
    public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
      ESDriver tmpES = new ESDriver(props);
      tmpES.createBulkProcessor();
      List<Integer> sessionNums = new ArrayList<Integer>();
      sessionNums.add(0);
      while (arg0.hasNext()) {
        String s = arg0.next();
        Integer sessionNum = processSession(tmpES, s);
        sessionNums.add(sessionNum);
      }
      tmpES.destroyBulkProcessor();
      tmpES.close();
      return sessionNums.iterator();
    }
  }).reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  });

  LOG.info("Final Session count: {}", Integer.toString(sessionCount));
}
public static void main(String args[]) throws StreamingQueryException {
  SparkSession spark = SparkSession
      .builder()
      .appName("JavaStructuredNetworkWordCount")
      .master("local")
      .config("spark.sql.shuffle.partitions", 8)
      .getOrCreate();

  // Create DataFrame representing the stream of input lines from connection to localhost:9999
  Dataset<Row> lines = spark
      .readStream()
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load();

  // Split the lines into words
  Dataset<String> words = lines
      .as(Encoders.STRING())
      .flatMap(
          new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
              return Arrays.asList(x.split(" ")).iterator();
            }
          }, Encoders.STRING());

  // Generate running word count
  Dataset<Row> wordCounts = words.groupBy("value").count();

  // Start running the query that prints the running counts to the console
  StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .start();

  query.awaitTermination();
}
public static void main(String[] args) throws InterruptedException {
  SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
  JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

  ClientConfiguration clientConf = new ClientConfiguration();
  ConsumerConfiguration consConf = new ConsumerConfiguration();
  String url = "pulsar://localhost:6650/";
  String topic = "persistent://sample/standalone/ns1/topic1";
  String subs = "sub1";

  JavaReceiverInputDStream<byte[]> msgs = jssc
      .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));

  JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new FlatMapFunction<byte[], Integer>() {
    @Override
    public Iterator<Integer> call(byte[] msg) {
      return Arrays.asList(((new String(msg)).indexOf("Pulsar") != -1) ? 1 : 0).iterator();
    }
  });

  JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

  numOfPulsar.print();

  jssc.start();
  jssc.awaitTermination();
}
/**
 * Create an appropriate {@link FlatMapFunction} for deploying the given {@link MapPartitionsDescriptor}
 * on Apache Spark's {@link JavaRDD#mapPartitions(FlatMapFunction)}.
 *
 * @param descriptor describes the function
 * @param operator that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 * @param operatorContext contains optimization information for the {@code operator}
 * @param inputs that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 */
public <I, O> FlatMapFunction<Iterator<I>, O> compile(MapPartitionsDescriptor<I, O> descriptor,
                                                      SparkExecutionOperator operator,
                                                      OptimizationContext.OperatorContext operatorContext,
                                                      ChannelInstance[] inputs) {
  final java.util.function.Function<Iterable<I>, Iterable<O>> javaImplementation = descriptor.getJavaImplementation();
  if (javaImplementation instanceof FunctionDescriptor.ExtendedSerializableFunction) {
    return new ExtendedMapPartitionsFunctionAdapter<>(
        (FunctionDescriptor.ExtendedSerializableFunction<Iterable<I>, Iterable<O>>) javaImplementation,
        new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
    );
  } else {
    return new MapPartitionsFunctionAdapter<>(javaImplementation);
  }
}
/**
 * Create an appropriate {@link FlatMapFunction} for deploying the given {@link FlatMapDescriptor}
 * on Apache Spark.
 *
 * @param descriptor describes the function
 * @param operator that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 * @param operatorContext contains optimization information for the {@code operator}
 * @param inputs that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 */
public <I, O> FlatMapFunction<I, O> compile(FlatMapDescriptor<I, O> descriptor,
                                            SparkExecutionOperator operator,
                                            OptimizationContext.OperatorContext operatorContext,
                                            ChannelInstance[] inputs) {
  final java.util.function.Function<I, Iterable<O>> javaImplementation = descriptor.getJavaImplementation();
  if (javaImplementation instanceof FunctionDescriptor.ExtendedSerializableFunction) {
    return new ExtendedFlatMapFunctionAdapter<>(
        (FunctionDescriptor.ExtendedSerializableFunction<I, Iterable<O>>) javaImplementation,
        new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
    );
  } else {
    return new FlatMapFunctionAdapter<>(javaImplementation);
  }
}
@SuppressWarnings("serial") public static FlatMapFunction<Row, Row> morphlineMapper(final String morphlineFile, final String morphlineId, final StructType outputSchema) { return new FlatMapFunction<Row, Row>() { @Override public Iterator<Row> call(Row row) throws Exception { // Retrieve the Command pipeline via ThreadLocal Pipeline pipeline = MorphlineUtils.getPipeline(morphlineFile, morphlineId); if (null == pipeline) { pipeline = MorphlineUtils.setPipeline(morphlineFile, morphlineId, new Collector(), true); } // Convert each Row into a Record StructType inputSchema = row.schema(); if (null == inputSchema) { throw new RuntimeException("Row does not have an associated StructType schema"); } Record inputRecord = new Record(); String[] fieldNames = inputSchema.fieldNames(); // TODO : Confirm nested object conversion for (int i = 0; i < fieldNames.length; i++) { inputRecord.put(fieldNames[i], row.get(i)); } // Process each Record via the Command pipeline List<Record> outputRecords = MorphlineUtils.executePipeline(pipeline, inputRecord); // Convert each Record into a new Row List<Row> outputRows = Lists.newArrayListWithCapacity(outputRecords.size()); for (Record record : outputRecords) { outputRows.add(MorphlineUtils.convertToRow(outputSchema, record)); } return outputRows.iterator(); } }; }
@Test
public void morphlineMapper(
    final @Mocked MorphlineUtils.Pipeline pipeline,
    final @Mocked Row row,
    final @Mocked StructType schema
) throws Exception {

  new Expectations(MorphlineUtils.class) {{
    MorphlineUtils.getPipeline("file", "id"); result = pipeline; times = 1;
    MorphlineUtils.executePipeline(pipeline, (Record) any); result = Lists.newArrayList(); times = 1;
    row.schema(); result = schema;
    row.get(anyInt); returns("val1", "val2"); times = 2;
    schema.fieldNames(); result = new String[] { "one", "two" };
  }};

  FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
  Iterator<Row> results = function.call(row);

  assertEquals("Invalid number of Rows returned", 0, Lists.newArrayList(results).size());

  new Verifications() {{
    Record record;
    MorphlineUtils.executePipeline(pipeline, record = withCapture());
    assertEquals(2, record.getFields().size());
    assertEquals("val1", record.get("one").get(0));
  }};
}
@Test
public void morphlineMapperNoPipeline(
    final @Mocked MorphlineUtils.Pipeline pipeline,
    final @Mocked Row row,
    final @Mocked StructType schema
) throws Exception {

  new Expectations(MorphlineUtils.class) {{
    MorphlineUtils.getPipeline("file", "id"); result = null; times = 1;
    MorphlineUtils.setPipeline("file", "id", (MorphlineUtils.Collector) any, true); result = pipeline; times = 1;
    MorphlineUtils.executePipeline(pipeline, (Record) any); result = Lists.newArrayList(); times = 1;
    row.schema(); result = schema;
    row.get(anyInt); returns("val1", "val2"); times = 2;
    schema.fieldNames(); result = new String[] { "one", "two" };
  }};

  FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
  Iterator<Row> results = function.call(row);

  assertEquals("Invalid number of Rows returned", 0, Lists.newArrayList(results).size());

  new Verifications() {{
    Record record;
    MorphlineUtils.executePipeline(pipeline, record = withCapture());
    assertEquals(2, record.getFields().size());
    assertEquals("val1", record.get("one").get(0));
  }};
}
@Test(expected = RuntimeException.class)
public void morphlineMapperNoSchema(
    final @Mocked MorphlineUtils.Pipeline pipeline,
    final @Mocked Row row,
    final @Mocked StructType schema
) throws Exception {

  new Expectations(MorphlineUtils.class) {{
    MorphlineUtils.getPipeline("file", "id"); result = pipeline; times = 1;
    row.schema(); result = null;
  }};

  FlatMapFunction<Row, Row> function = MorphlineUtils.morphlineMapper("file", "id", schema);
  function.call(row);
}
@Override
public String insert(Entity entity, final Set<Value> values) throws ParseException, Exception {
  JavaDStream<Value> cache = lines.flatMap(new FlatMapFunction<String, Value>() {
    @Override
    public Iterable<Value> call(String x) {
      // Every input line is mapped to the same externally supplied set of values.
      return values;
    }
  });
  cache.persist();
  return entity.getId();
}
public static void main(String[] args) throws Exception {
  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkJoinTest");
  JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
  ssc.checkpoint("checkpoint");

  JavaReceiverInputDStream<String> lines = ssc.socketTextStream("127.0.0.1", 9999);
  JavaPairDStream<String, Long> nameStream = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String l) throws Exception {
      return Arrays.asList(l.split(" "));
    }
  }).mapToPair(new PairFunction<String, String, Long>() {
    public Tuple2<String, Long> call(String w) throws Exception {
      return new Tuple2<>(w, 1L);
    }
  }).window(Durations.seconds(30), Durations.seconds(10));

  JavaReceiverInputDStream<String> lines2 = ssc.socketTextStream("127.0.0.1", 9998);
  JavaPairDStream<String, Long> nameAgeStream = lines2.mapToPair(new PairFunction<String, String, Long>() {
    @Override
    public Tuple2<String, Long> call(String s) throws Exception {
      String[] list = s.split(" ");
      String name = list[0];
      long age = 0L;
      if (list.length > 1)
        age = Long.parseLong(list[1]);
      return new Tuple2<String, Long>(name, age);
    }
  }).window(Durations.seconds(11), Durations.seconds(11));

  // nameStream.print();
  // nameAgeStream.print();

  JavaPairDStream<String, Tuple2<Long, Long>> joinedStream = nameStream.join(nameAgeStream);
  joinedStream.print();

  ssc.start();
  ssc.awaitTermination();
}
/**
 * Call variants from Tuples of AssemblyRegion and SimpleInterval.
 * The interval should be the non-padded shard boundary for the shard that the corresponding AssemblyRegion was
 * created in; it is used to eliminate redundant variant calls at the edge of shard boundaries.
 */
private static FlatMapFunction<Iterator<Tuple2<AssemblyRegion, SimpleInterval>>, VariantContext> callVariantsFromAssemblyRegions(
    final AuthHolder authHolder,
    final SAMFileHeader header,
    final Broadcast<ReferenceMultiSource> referenceBroadcast,
    final Broadcast<HaplotypeCallerArgumentCollection> hcArgsBroadcast) {
  return regionAndIntervals -> {
    // HaplotypeCallerEngine isn't serializable but is expensive to instantiate, so construct and reuse one for every partition
    final ReferenceMultiSourceAdapter referenceReader = new ReferenceMultiSourceAdapter(referenceBroadcast.getValue(), authHolder);
    final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), header, referenceReader);
    return iteratorToStream(regionAndIntervals).flatMap(regionToVariants(hcEngine)).iterator();
  };
}