public static void main(String[] args) {
    if (args.length != 2) {
        System.err.println("Usage:");
        System.err.println(" SparkWordCount <sourceFile> <targetFile>");
        System.exit(1);
    }
    SparkConf conf = new SparkConf().setAppName("Word Count");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> textFile = sc.textFile(args[0]);
    JavaRDD<String> words = textFile.flatMap(LineIterator::new);
    JavaPairRDD<String, Long> pairs = words.mapToPair(s -> new Tuple2<>(s, 1L));
    JavaPairRDD<String, Long> counts = pairs.reduceByKey((Function2<Long, Long, Long>) (a, b) -> a + b);
    System.out.println("Starting task..");
    long t = System.currentTimeMillis();
    counts.saveAsTextFile(args[1] + "_" + t);
    System.out.println("Time=" + (System.currentTimeMillis() - t));
}
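// Hedged sketch (not from the original program): the same word count written with plain lambdas and a
// whitespace split instead of the custom LineIterator, assuming the Spark 2.x Java API where
// FlatMapFunction.call returns an Iterator. It shows the Function2 shape reduceByKey expects when
// written inline; the method and parameter names are illustrative assumptions.
static JavaPairRDD<String, Long> countTokens(JavaRDD<String> textFile) {
    return textFile
        .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
        .mapToPair(token -> new Tuple2<>(token, 1L))
        .reduceByKey((a, b) -> a + b);
}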
void checkByRateInParallel() throws InterruptedException, IOException {
    JavaRDD<String> userRDD = getUserRDD(this.httpType);
    LOG.info("Original User count: {}", userRDD.count());

    int userCount = 0;
    userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> realUserNums = new ArrayList<>();
        while (iterator.hasNext()) {
            String s = iterator.next();
            Integer realUser = checkByRate(tmpES, s);
            realUserNums.add(realUser);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return realUserNums.iterator();
    }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);

    LOG.info("User count: {}", Integer.toString(userCount));
}
public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Big Apple").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); class GetLength implements Function<String, Integer> { public Integer call(String s) { return s.length(); } } class Sum implements Function2<Integer, Integer, Integer> { public Integer call(Integer a, Integer b) { return a + b; } } JavaRDD<String> lines = sc.textFile("src/main/resources/compressed.gz"); JavaRDD<Integer> lineLengths = lines.map(new GetLength()); // Printing an RDD lineLengths.foreach(x-> System.out.println(x)); int totalLength = lineLengths.reduce(new Sum()); System.out.println(totalLength); }
/**
 * Starts the spark context given a valid configuration.
 * Starts a test map-reduce such that all spark workers can fetch dependencies in advance.
 */
private static void startContext(int numOfWorkers) {
    JavaSparkContext sc = SharedService.getContext();
    for (int i = 0; i < numOfWorkers; i++) {
        final int threadnumber = i;
        new Thread() {
            @Override
            public void run() {
                ImmutableList<Integer> range =
                    ContiguousSet.create(Range.closed(1, 5), DiscreteDomain.integers()).asList();
                JavaRDD<Integer> data = sc.parallelize(range).repartition(numOfWorkers);
                Integer result = data.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
                if (result == 15)
                    log.info("successfully tested worker" + threadnumber);
                else
                    log.warn("worker " + threadnumber + " yielded a false result: " + result + " (should be 15)");
            }
        }.start();
    }
}
/**
 * Counts the number of non-whitespace (and non-underscore) characters in this data set.
 * This utility method is used to check the tokenization result.
 * @param lines
 * @return number of characters
 */
int numCharacters(JavaRDD<String> lines) {
    JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
        private static final long serialVersionUID = -2189399343462982586L;

        @Override
        public Integer call(String line) throws Exception {
            // strip whitespace and underscores before measuring the length
            line = line.replaceAll("[\\s_]+", "");
            return line.length();
        }
    });
    return lengths.reduce(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = -8438072946884289401L;

        @Override
        public Integer call(Integer e0, Integer e1) throws Exception {
            return e0 + e1;
        }
    });
}
private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) {
    final ByteArray ONE = new ByteArray();
    Long count = rdd.mapValues(new Function<Object[], Long>() {
        @Override
        public Long call(Object[] objects) throws Exception {
            return (Long) objects[countMeasureIndex];
        }
    }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
        @Override
        public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
                throws Exception {
            return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
        }
    })._2();
    return count;
}
@SuppressWarnings("serial") @Override public SortedCounts<String> execute(final JavaSparkContext spark) { final JavaRDD<String> textFile = spark.textFile(inputFile); final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(final String rawJSON) throws TwitterException { final Status tweet = TwitterObjectFactory.createStatus(rawJSON); String text = tweet.getText(); return Arrays.asList(text.split(" ")); } }); final JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(final String s) { return new Tuple2<String, Integer>(s.toLowerCase(), 1); } }); final JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(final Integer a, final Integer b) { return a + b; } }); return SortedCounts.create(counts); }
/**
 * Polygon union.
 *
 * @return the union polygon
 */
public Polygon PolygonUnion() {
    Polygon result = this.rawSpatialRDD.reduce(new Function2<Polygon, Polygon, Polygon>() {
        public Polygon call(Polygon v1, Polygon v2) {
            // Reduce precision in JTS to avoid TopologyException
            PrecisionModel pModel = new PrecisionModel();
            GeometryPrecisionReducer pReducer = new GeometryPrecisionReducer(pModel);
            Geometry p1 = pReducer.reduce(v1);
            Geometry p2 = pReducer.reduce(v2);
            // Union the two polygons
            Geometry polygonGeom = p1.union(p2);
            Coordinate[] coordinates = polygonGeom.getCoordinates();
            ArrayList<Coordinate> coordinateList = new ArrayList<Coordinate>(Arrays.asList(coordinates));
            // Append the first coordinate again so the ring is closed
            Coordinate firstCoordinate = coordinateList.get(0);
            coordinateList.add(firstCoordinate);
            Coordinate[] coordinatesClosed = new Coordinate[coordinateList.size()];
            coordinatesClosed = coordinateList.toArray(coordinatesClosed);
            GeometryFactory fact = new GeometryFactory();
            LinearRing linear = fact.createLinearRing(coordinatesClosed);
            Polygon polygon = new Polygon(linear, null, fact);
            // Return the union of the two polygons
            return polygon;
        }
    });
    return result;
}
private static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> countGeometriesByKey(JavaPairRDD<U, T> input) {
    return input.aggregateByKey(
        0L,
        new Function2<Long, T, Long>() {
            @Override
            public Long call(Long count, T t) throws Exception {
                return count + 1;
            }
        },
        new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long count1, Long count2) throws Exception {
                return count1 + count2;
            }
        });
}
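// Hedged sketch: the same per-key count with lambdas, to make the aggregateByKey contract explicit:
// a zero value, a within-partition Function2<Long, T, Long>, and a cross-partition
// Function2<Long, Long, Long>. The method name is an illustrative assumption.
private static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> countGeometriesByKeyWithLambdas(
        JavaPairRDD<U, T> input) {
    return input.aggregateByKey(0L, (count, geometry) -> count + 1, (count1, count2) -> count1 + count2);
}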
/**
 * Reads and merges the bounding boxes of all input shapefiles; returns null if none are found.
 */
public BoundBox getBoundBox(JavaSparkContext sc, String inputPath) {
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox> bounds = sc.newAPIHadoopFile(
        inputPath,
        BoundaryInputFormat.class,
        Long.class,
        BoundBox.class,
        sc.hadoopConfiguration()
    );
    // merge all bound boxes into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>() {
        @Override
        public BoundBox call(BoundBox box1, BoundBox box2) throws Exception {
            return BoundBox.mergeBoundBox(box1, box2);
        }
    });
    // if there is a result, return it as the bound box
    if (bounds.count() > 0) {
        return new BoundBox(bounds.collect().get(0)._2());
    } else {
        return null;
    }
}
/**
 * Reads and merges the bounding boxes of all input shapefiles; returns null if none are found.
 */
public static BoundBox readBoundBox(JavaSparkContext sc, String inputPath) {
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox> bounds = sc.newAPIHadoopFile(
        inputPath,
        BoundaryInputFormat.class,
        Long.class,
        BoundBox.class,
        sc.hadoopConfiguration()
    );
    // merge all bound boxes into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>() {
        @Override
        public BoundBox call(BoundBox box1, BoundBox box2) throws Exception {
            return BoundBox.mergeBoundBox(box1, box2);
        }
    });
    // if there is a result, return it as the bound box
    if (bounds.count() > 0) {
        return new BoundBox(bounds.collect().get(0)._2());
    } else {
        return null;
    }
}
@Before
public void before() throws Exception {
    queryExecutor = new QueryExecutor(deepContext, deepConnectionHandler);

    // Stubs
    when(deepConnectionHandler.getConnection(CLUSTERNAME_CONSTANT.getName())).thenReturn(deepConnection);
    when(deepConnection.getExtractorConfig()).thenReturn(extractorConfig);
    when(extractorConfig.clone()).thenReturn(extractorConfig);
    when(deepContext.createJavaRDD(any(ExtractorConfig.class))).thenReturn(singleRdd);
    when(deepContext.createHDFSRDD(any(ExtractorConfig.class))).thenReturn(rdd);
    when(rdd.toJavaRDD()).thenReturn(singleRdd);
    when(singleRdd.collect()).thenReturn(generateListOfCells(3));
    when(singleRdd.filter(any(Function.class))).thenReturn(singleRdd);
    when(singleRdd.map(any(FilterColumns.class))).thenReturn(singleRdd);
    when(singleRdd.mapToPair(any(PairFunction.class))).thenReturn(pairRdd);
    when(singleRdd.keyBy(any(Function.class))).thenReturn(pairRdd);
    when(pairRdd.join(pairRdd)).thenReturn(joinedRdd);
    when(pairRdd.reduceByKey(any(Function2.class))).thenReturn(pairRdd);
    when(pairRdd.map(any(Function.class))).thenReturn(singleRdd);
    when(joinedRdd.map(any(JoinCells.class))).thenReturn(singleRdd);
}
@Override
protected Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction() {
    return new Function2<JavaPairRDD<String, RowMutation>, Time, Void>() {
        // Blur Thrift Client
        @Override
        public Void call(JavaPairRDD<String, RowMutation> rdd, Time time) throws Exception {
            Iface client = getBlurClient();
            for (Tuple2<String, RowMutation> tuple : rdd.collect()) {
                if (tuple != null) {
                    try {
                        RowMutation rm = tuple._2;
                        // Index using enqueue mutate call
                        client.enqueueMutate(rm);
                    } catch (Exception ex) {
                        LOG.error("Unknown error while trying to call enqueueMutate.", ex);
                        throw ex;
                    }
                }
            }
            return null;
        }
    };
}
@Override
public Rectangle2D bounds() {
    final JavaRDD<Rectangle2D> rects;
    if (partitions) {
        rects = base.mapPartitions(new FlatMapFunction<Iterator<Glyph<G, I>>, Rectangle2D>() {
            public Iterable<Rectangle2D> call(Iterator<Glyph<G, I>> glyphs) throws Exception {
                ArrayList<Glyph<G, I>> glyphList = Lists.newArrayList(new IterableIterator<>(glyphs));
                return Arrays.asList(Util.bounds(glyphList));
            }
        });
    } else {
        rects = base.map(new Function<Glyph<G, I>, Rectangle2D>() {
            public Rectangle2D call(Glyph<G, I> glyph) throws Exception {
                return Util.boundOne(glyph.shape());
            }
        });
    }
    return rects.reduce(new Function2<Rectangle2D, Rectangle2D, Rectangle2D>() {
        public Rectangle2D call(Rectangle2D left, Rectangle2D right) throws Exception {
            return Util.bounds(left, right);
        }
    });
}
/**
 * bulidDataQueryRDD: converts a click stream list into (dataset, queries) pairs.
 *
 * @param clickstreamRDD: click stream data
 * @param downloadWeight: weight of download behavior
 * @return JavaPairRDD whose key is the short name of a data set and whose values are queries
 */
public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
    return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, List<String>> call(ClickStream click) throws Exception {
            List<String> query = new ArrayList<>();
            // important! download behavior is given higher weight than viewing behavior
            boolean download = click.isDownload();
            int weight = 1;
            if (download) {
                weight = downloadWeight;
            }
            for (int i = 0; i < weight; i++) {
                query.add(click.getKeyWords());
            }
            return new Tuple2<>(click.getViewDataset(), query);
        }
    }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public List<String> call(List<String> v1, List<String> v2) throws Exception {
            List<String> list = new ArrayList<>();
            list.addAll(v1);
            list.addAll(v2);
            return list;
        }
    });
}
public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
    return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Double> call(ClickStream click) throws Exception {
            // downloads are rated higher than views
            double rate = 1;
            boolean download = click.isDownload();
            if (download) {
                rate = 2;
            }
            String sessionID = click.getSessionID();
            String user = sessionID.split("@")[0];
            return new Tuple2<>(user + "," + click.getViewDataset(), rate);
        }
    }).reduceByKey(new Function2<Double, Double, Double>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Double call(Double v1, Double v2) throws Exception {
            // keep the highest rating observed for this (user, dataset) pair
            return v1 >= v2 ? v1 : v2;
        }
    });
}
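// Hedged sketch: the same "keep the higher rating" reduction expressed with a lambda and Math.max;
// the method name is an illustrative assumption.
static JavaPairRDD<String, Double> maxRatingPerUserItem(JavaPairRDD<String, Double> ratings) {
    return ratings.reduceByKey((v1, v2) -> Math.max(v1, v2));
}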
public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {
    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);

    int sessionCount = 0;
    sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
            ESDriver tmpES = new ESDriver(props);
            tmpES.createBulkProcessor();
            List<Integer> sessionNums = new ArrayList<>();
            while (arg0.hasNext()) {
                String s = arg0.next();
                Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
                sessionNums.add(sessionNum);
            }
            tmpES.destroyBulkProcessor();
            tmpES.close();
            return sessionNums.iterator();
        }
    }).reduce(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    });

    LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
}
public void processSessionInParallel() throws InterruptedException, IOException {
    List<String> sessions = this.getSessions();
    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);

    int sessionCount = 0;
    sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
        @Override
        public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
            ESDriver tmpES = new ESDriver(props);
            tmpES.createBulkProcessor();
            List<Integer> sessionNums = new ArrayList<Integer>();
            sessionNums.add(0);
            while (arg0.hasNext()) {
                String s = arg0.next();
                Integer sessionNum = processSession(tmpES, s);
                sessionNums.add(sessionNum);
            }
            tmpES.destroyBulkProcessor();
            tmpES.close();
            return sessionNums.iterator();
        }
    }).reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    });

    LOG.info("Final Session count: {}", Integer.toString(sessionCount));
}
/**
 * buildMetadataRDD: Convert metadata list to JavaPairRDD
 *
 * @param es an Elasticsearch client node instance
 * @param sc spark context
 * @param index index name of log processing application
 * @param metadatas metadata list
 * @return PairRDD, in each pair key is metadata short name and value is term
 *         list extracted from metadata variables.
 */
protected JavaPairRDD<String, List<String>> buildMetadataRDD(ESDriver es, JavaSparkContext sc, String index,
        List<PODAACMetadata> metadatas) {
    JavaRDD<PODAACMetadata> metadataRDD = sc.parallelize(metadatas);
    JavaPairRDD<String, List<String>> metadataTermsRDD = metadataRDD
        .mapToPair(new PairFunction<PODAACMetadata, String, List<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, List<String>> call(PODAACMetadata metadata) throws Exception {
                return new Tuple2<String, List<String>>(metadata.getShortName(), metadata.getAllTermList());
            }
        }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public List<String> call(List<String> v1, List<String> v2) throws Exception {
                List<String> list = new ArrayList<String>();
                list.addAll(v1);
                list.addAll(v2);
                return list;
            }
        });
    return metadataTermsRDD;
}
/**
 * @return a function that returns the second of two values
 * @param <T> element type
 */
public static <T> Function2<T, T, T> last() {
    return new Function2<T, T, T>() {
        @Override
        public T call(T current, T next) {
            return next;
        }
    };
}
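// Hedged usage sketch for last(); the enclosing utility class name "Reducers" and the String value
// type are assumptions for illustration only.
static JavaPairRDD<String, String> keepLatestValue(JavaPairRDD<String, String> updates) {
    // reduceByKey with last() keeps the value seen last in the reduction order for each key
    // (that order is not guaranteed across partitions)
    return updates.reduceByKey(Reducers.<String>last());
}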
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs); JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() { @Override public Tuple2<Tuple3<String, String, String>, Stats> call(String s) { return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s)); } }); JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() { @Override public Stats call(Stats stats, Stats stats2) { return stats.merge(stats2); } }); List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect(); for (Tuple2<?,?> t : output) { System.out.println(t._1() + "\t" + t._2()); } jsc.stop(); }
public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    ClientConfiguration clientConf = new ClientConfiguration();
    ConsumerConfiguration consConf = new ConsumerConfiguration();
    String url = "pulsar://localhost:6650/";
    String topic = "persistent://sample/standalone/ns1/topic1";
    String subs = "sub1";

    JavaReceiverInputDStream<byte[]> msgs = jssc
        .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));

    JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new FlatMapFunction<byte[], Integer>() {
        @Override
        public Iterator<Integer> call(byte[] msg) {
            return Arrays.asList(((new String(msg)).indexOf("Pulsar") != -1) ? 1 : 0).iterator();
        }
    });

    JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });

    numOfPulsar.print();

    jssc.start();
    jssc.awaitTermination();
}
/**
 * Create an appropriate {@link Function} for deploying the given {@link ReduceDescriptor}
 * on Apache Spark.
 */
public <T> Function2<T, T, T> compile(ReduceDescriptor<T> descriptor, SparkExecutionOperator operator,
        OptimizationContext.OperatorContext operatorContext, ChannelInstance[] inputs) {
    final BinaryOperator<T> javaImplementation = descriptor.getJavaImplementation();
    if (javaImplementation instanceof FunctionDescriptor.ExtendedSerializableBinaryOperator) {
        return new ExtendedBinaryOperatorAdapter<>(
            (FunctionDescriptor.ExtendedSerializableBinaryOperator<T>) javaImplementation,
            new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
        );
    } else {
        return new BinaryOperatorAdapter<>(javaImplementation);
    }
}
/**
 * Save all RDDs in the given DStream to the given view.
 * @param dstream
 * @param view
 */
public static <T> void save(JavaDStream<T> dstream, final View<T> view) {
    final String uri = view.getUri().toString();
    dstream.foreachRDD(new Function2<JavaRDD<T>, Time, Void>() {
        @Override
        public Void call(JavaRDD<T> rdd, Time time) throws Exception {
            save(rdd, uri);
            return null;
        }
    });
}
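// Hedged sketch, assuming a newer Spark Streaming API in which foreachRDD takes a VoidFunction2
// rather than a Function2<..., Void>; the surrounding names mirror the method above and the
// helper name is an illustrative assumption.
public static <T> void saveWithVoidFunction(JavaDStream<T> dstream, final View<T> view) {
    final String uri = view.getUri().toString();
    // the lambda replaces the anonymous Function2 and no longer needs to return null
    dstream.foreachRDD((VoidFunction2<JavaRDD<T>, Time>) (rdd, time) -> save(rdd, uri));
}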
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("Ad Provider Aggregation"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); // Read the source file JavaRDD<String> adInput = sparkContext.textFile(args[0]); // Now we have non-empty lines, lets split them into words JavaPairRDD<String, Integer> adsRDD = adInput.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { CSVReader csvReader = new CSVReader(new StringReader(s)); // lets skip error handling here for simplicity try { String[] adDetails = csvReader.readNext(); return new Tuple2<String, Integer>(adDetails[1], 1); } catch (IOException e) { e.printStackTrace(); // noop } // Need to explore more on error handling return new Tuple2<String, Integer>("-1", 1); } }); JavaPairRDD<String, Integer> adsAggregated = adsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }); adsAggregated.saveAsTextFile("./output/ads-aggregated-provider"); }
@Override
public JavaRDD<Tuple2<String, Integer>> run(final JavaADAMContext ac, final JavaRDD<AlignmentRecord> recs, final String args) {
    JavaRDD<String> contigNames = recs.map(new Function<AlignmentRecord, String>() {
        @Override
        public String call(final AlignmentRecord rec) {
            return rec.getReadMapped() ? rec.getContigName() : "unmapped";
        }
    });

    JavaPairRDD<String, Integer> counts = contigNames.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(final String contigName) {
            return new Tuple2<String, Integer>(contigName, Integer.valueOf(1));
        }
    });

    JavaPairRDD<String, Integer> reducedCounts = counts.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(final Integer value0, final Integer value1) {
            return Integer.valueOf(value0.intValue() + value1.intValue());
        }
    });

    // todo: seems like there should be a more direct way
    return JavaRDD.fromRDD(reducedCounts.rdd(), null);
}
@Override
public JavaRDD<Tuple2<String, Integer>> run(final JavaADAMContext ac, final JavaRDD<AlignmentRecord> recs, final String args) {
    // note: unlike the variant above, this counts read names, not contig names
    JavaRDD<String> readNames = recs.map(new Function<AlignmentRecord, String>() {
        @Override
        public String call(final AlignmentRecord rec) {
            return rec.getReadMapped() ? rec.getReadName() : "unmapped";
        }
    });

    JavaPairRDD<String, Integer> counts = readNames.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(final String readName) {
            return new Tuple2<String, Integer>(readName, Integer.valueOf(1));
        }
    });

    JavaPairRDD<String, Integer> reducedCounts = counts.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(final Integer value0, final Integer value1) {
            return Integer.valueOf(value0.intValue() + value1.intValue());
        }
    });

    // todo: seems like there should be a more direct way
    return JavaRDD.fromRDD(reducedCounts.rdd(), null);
}
@Override
public long[][] getConfusionMatrix(IModel model, double threshold) throws DDFException {
    SparkDDF ddf = (SparkDDF) this.getDDF();
    SparkDDF predictions = (SparkDDF) ddf.ML.applyModel(model, true, false);

    // Now get the underlying RDD to compute
    JavaRDD<double[]> yTrueYPred = (JavaRDD<double[]>) predictions.getJavaRDD(double[].class);
    final double threshold1 = threshold;
    long[] cm = yTrueYPred.map(new Function<double[], long[]>() {
        @Override
        public long[] call(double[] params) {
            byte isPos = toByte(params[0] > threshold1);
            byte predPos = toByte(params[1] > threshold1);
            // encode (actual, predicted) into an index 0..3 and mark that single cell
            long[] result = new long[] { 0L, 0L, 0L, 0L };
            result[isPos << 1 | predPos] = 1L;
            return result;
        }
    }).reduce(new Function2<long[], long[], long[]>() {
        @Override
        public long[] call(long[] a, long[] b) {
            // element-wise sum of the per-record confusion matrix cells
            return new long[] { a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3] };
        }
    });

    return new long[][] { new long[] { cm[3], cm[2] }, new long[] { cm[1], cm[0] } };
}
public static void main(String[] args) throws Exception {
    String master;
    if (args.length > 0) {
        master = args[0];
    } else {
        master = "local";
    }
    JavaSparkContext sc = new JavaSparkContext(
        master, "basicmap", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    Integer result = rdd.fold(0, new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer x, Integer y) {
            return x + y;
        }
    });
    System.out.println(result);
}
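// Hedged sketch: the same sum with a lambda. fold applies the zero element once per partition and
// once for the final merge, so it must be the identity of the operation (0 for addition here).
// The method name is an illustrative assumption.
static Integer sumWithFold(JavaSparkContext sc) {
    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    return numbers.fold(0, (x, y) -> x + y);
}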
public static void main(String[] args) {
    // Create the context with a 10 second batch interval
    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming102");
    //SparkConf sparkConf = new SparkConf().setMaster("spark://10.204.100.206:7077").setAppName("Streaming102");
    sparkConf.setJars(new String[] { "target\\original-TestProjects-1.0-SNAPSHOT.jar" });
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    String folder = "./stream/";
    if (args.length == 1) {
        folder = args[0];
    }

    JavaDStream<String> lines = ssc.textFileStream(folder);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
            System.out.println(x);
            return Lists.newArrayList(SPACE.split(x));
        }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });

    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
}
public static void main(String[] args) {
    // Create the context with a 10 second batch interval
    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming101");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999, StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
            System.out.println(x);
            return Lists.newArrayList(SPACE.split(x));
        }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });

    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
}
@Test
public void simpleProjectAndSelectWithGroupByQueryTest() throws UnsupportedException, ExecutionException {
    // Input data
    List<LogicalStep> stepList = new ArrayList<>();
    Project project = createProject(CLUSTERNAME_CONSTANT, TABLE1_CONSTANT);
    GroupBy groupBy = createGroupBy();
    project.setNextStep(groupBy);
    groupBy.setNextStep(createSelect());

    // One single initial step
    stepList.add(project);
    LogicalWorkflow logicalWorkflow = new LogicalWorkflow(stepList);

    // Execution
    queryExecutor.executeWorkFlow(logicalWorkflow);

    // Assertions
    verify(deepContext, times(1)).createJavaRDD(any(ExtractorConfig.class));
    verify(singleRdd, times(0)).filter(any(Function.class));
    verify(singleRdd, times(0)).mapToPair(any(MapKeyForJoin.class));
    verify(pairRdd, times(0)).join(pairRdd);
    verify(joinedRdd, times(0)).map(any(JoinCells.class));
    verify(singleRdd, times(1)).map(any(Function.class));
    verify(joinedRdd, times(0)).map(any(Function.class));
    verify(singleRdd, times(1)).keyBy(any(Function.class));
    verify(pairRdd, times(1)).reduceByKey(any(Function2.class));
    verify(pairRdd, times(1)).map(any(Function.class));
}
public static void main(String[] args) throws Exception {
    if (args.length == 0) {
        System.err.println("Usage: JavaLogQuery <master> [logFile]");
        System.exit(1);
    }

    JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));

    JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);

    // mapToPair is required to build a JavaPairRDD from a PairFunction
    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted =
        dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
            @Override
            public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
                return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
            }
        });

    JavaPairRDD<Tuple3<String, String, String>, Stats> counts =
        extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
            @Override
            public Stats call(Stats stats, Stats stats2) throws Exception {
                return stats.merge(stats2);
            }
        });

    List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
    for (Tuple2<?, ?> t : output) {
        System.out.println(t._1() + "\t" + t._2());
    }
    System.exit(0);
}
public static void main(String[] args) throws Exception {
    String zkQuorum = "localhost:2181";
    String groupName = "stream";
    int numThreads = 3;
    String topicsName = "test1";

    SparkConf sparkConf = new SparkConf().setAppName("WordCountKafkaStream");
    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000));

    Map<String, Integer> topicToBeUsedBySpark = new HashMap<>();
    String[] topics = topicsName.split(",");
    for (String topic : topics) {
        topicToBeUsedBySpark.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> streamMessages =
        KafkaUtils.createStream(javaStreamingContext, zkQuorum, groupName, topicToBeUsedBySpark);

    JavaDStream<String> lines = streamMessages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String x) {
            return Arrays.asList(WORD_DELIMETER.split(x)).iterator();
        }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });

    wordCounts.print();
    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
}