private JavaPairRDD<String, String> parallizeData(SparkDriver spark, List<Tuple2<String, String>> datasetContent) {
  JavaRDD<Tuple2<String, String>> datasetContentRDD = spark.sc.parallelize(datasetContent);
  return datasetContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, String> call(Tuple2<String, String> term) throws Exception {
      return term;
    }
  });
}
public JavaPairRDD<String, List<String>> tokenizeData(JavaPairRDD<String, String> datasetsContentRDD, String splitter) throws Exception {
  return datasetsContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, List<String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(Tuple2<String, String> arg) throws Exception {
      String content = arg._2;
      List<String> tokens = getTokens(content, splitter);
      return new Tuple2<>(arg._1, tokens);
    }
  });
}
private void addNewElement(JavaPairRDD newPair, JavaPairRDD timeStamp) {
  item2ReadCount = item2ReadCount
      .union(newPair)
      .coalesce(numPartitions, false)
      .reduceByKey((v1, v2) -> (Long) v1 + (Long) v2, numPartitions)
      .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap)
      .sortByKey(false, numPartitions)
      .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap);

  item2timeStampData = item2timeStampData
      .union(timeStamp)
      .coalesce(numPartitions, false)
      .reduceByKey(replaceValues)
      .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap)
      .sortByKey(true, numPartitions)
      .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap);
}
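The swap / sortByKey / swap chain above is the usual idiom for ordering a pair RDD by value, since Spark only sorts by key. A minimal standalone sketch of that idiom; jsc and the sample counts are made up for illustration:

// Hypothetical illustration: sort an (item, count) RDD by count, descending.
JavaPairRDD<Long, Long> counts = jsc.parallelizePairs(
    Arrays.asList(new Tuple2<>(1L, 5L), new Tuple2<>(2L, 9L), new Tuple2<>(3L, 2L)));

JavaPairRDD<Long, Long> sortedByCount = counts
    .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap)  // (count, item)
    .sortByKey(false)                                                        // order by count, descending
    .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap); // back to (item, count)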
/**
 * A utility method that adapts a {@link PairFunction} to a {@link PairFlatMapFunction} with an
 * {@link Iterator} input. This is particularly useful because it allows functions written for
 * mapToPair to be reused in flatMapToPair / mapPartitionsToPair transformations.
 *
 * @param pairFunction the {@link PairFunction} to adapt.
 * @param <T> the input type.
 * @param <K> the output key type.
 * @param <V> the output value type.
 * @return a {@link PairFlatMapFunction} that accepts an {@link Iterator} as an input and applies
 *     the {@link PairFunction} on every element.
 */
public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction(
    final PairFunction<T, K, V> pairFunction) {
  return new PairFlatMapFunction<Iterator<T>, K, V>() {
    @Override
    public Iterator<Tuple2<K, V>> call(Iterator<T> itr) throws Exception {
      final Iterator<Tuple2<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<T, Tuple2<K, V>>() {
                @Override
                public Tuple2<K, V> apply(T t) {
                  try {
                    return pairFunction.call(t);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              });
      return outputItr;
    }
  };
}
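A minimal usage sketch of the adapter, assuming a JavaSparkContext named jsc; keyByFirstToken is a made-up function, not part of the original code:

// Hypothetical example: reuse one per-element PairFunction at partition granularity.
JavaRDD<String> lines = jsc.parallelize(Arrays.asList("spark makes pairs", "beam adapts functions"));

PairFunction<String, String, Integer> keyByFirstToken =
    s -> new Tuple2<>(s.split(" ")[0], s.length());

// Per-element form.
JavaPairRDD<String, Integer> viaMapToPair = lines.mapToPair(keyByFirstToken);

// Per-partition form, reusing the same function through the adapter.
JavaPairRDD<String, Integer> viaPartitions =
    lines.mapPartitionsToPair(pairFunctionToPairFlatMapFunction(keyByFirstToken));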
/**
 * A function wrapper for converting a byte array pair to a key-value pair, where
 * values are {@link Iterable}.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K> The type of the key being deserialized.
 * @param <V> The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
public static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
    fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
    @Override
    public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
          Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
            @Override
            public V apply(byte[] bytes) {
              return fromByteArray(bytes, valueCoder);
            }
          }));
    }
  };
}
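A sketch of where this wrapper typically fits, assuming the companion toByteFunction shown further down, a JavaSparkContext named jsc, and Beam's StringUtf8Coder / VarIntCoder; the pipeline and data are illustrative only:

// Hypothetical pipeline: encode, group by serialized key, then decode the groups.
JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(
    Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));

JavaPairRDD<ByteArray, byte[]> encoded =
    pairs.mapToPair(toByteFunction(StringUtf8Coder.of(), VarIntCoder.of()));

JavaPairRDD<ByteArray, Iterable<byte[]>> grouped = encoded.groupByKey();

JavaPairRDD<String, Iterable<Integer>> decoded =
    grouped.mapToPair(fromByteFunctionIterable(StringUtf8Coder.of(), VarIntCoder.of()));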
protected void publishToNats(final String subject1, final String subject2, final int partitionsNb) {
  final JavaDStream<String> lines =
      ssc.textFileStream(tempDir.getAbsolutePath()).repartition(partitionsNb);

  JavaPairDStream<String, String> stream1 =
      lines.mapToPair((PairFunction<String, String, String>) str -> new Tuple2<String, String>(subject1, str));
  JavaPairDStream<String, String> stream2 =
      lines.mapToPair((PairFunction<String, String, String>) str -> new Tuple2<String, String>(subject2, str));
  final JavaPairDStream<String, String> stream = stream1.union(stream2);

  if (logger.isDebugEnabled()) {
    stream.print();
  }

  SparkToNatsConnectorPool
      .newPool()
      .withNatsURL(NATS_SERVER_URL)
      .withConnectionTimeout(Duration.ofSeconds(2))
      .publishToNatsAsKeyValue(stream);
}
private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() {
  return new TransformEvaluator<TextIO.Write.Bound<T>>() {
    @Override
    public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
      @SuppressWarnings("unchecked")
      JavaPairRDD<T, Void> last =
          ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
              .map(WindowingHelpers.<T>unwindowFunction())
              .mapToPair(new PairFunction<T, T, Void>() {
                @Override
                public Tuple2<T, Void> call(T t) throws Exception {
                  return new Tuple2<>(t, null);
                }
              });
      ShardTemplateInformation shardTemplateInfo = new ShardTemplateInformation(
          transform.getNumShards(), transform.getShardTemplate(), transform.getFilenamePrefix(),
          transform.getFilenameSuffix());
      writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class,
          NullWritable.class, TemplatedTextOutputFormat.class);
    }
  };
}
/**
 * A function wrapper for converting a byte array pair to a key-value pair, where
 * values are {@link Iterable}.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K> The type of the key being deserialized.
 * @param <V> The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
    fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
    @Override
    public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
          Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
            @Override
            public V apply(byte[] bytes) {
              return fromByteArray(bytes, valueCoder);
            }
          }));
    }
  };
}
@SuppressWarnings("serial") @Override public SortedCounts<String> execute(final JavaSparkContext spark) { final JavaRDD<String> textFile = spark.textFile(inputFile); final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(final String rawJSON) throws TwitterException { final Status tweet = TwitterObjectFactory.createStatus(rawJSON); String text = tweet.getText(); return Arrays.asList(text.split(" ")); } }); final JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(final String s) { return new Tuple2<String, Integer>(s.toLowerCase(), 1); } }); final JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(final Integer a, final Integer b) { return a + b; } }); return SortedCounts.create(counts); }
/**
 * Spatial partitioning without duplicates.
 *
 * @return true, if successful
 * @throws Exception the exception
 */
private boolean spatialPartitioningWithoutDuplicates() throws Exception {
  this.distributedRasterColorMatrix =
      this.distributedRasterColorMatrix.mapToPair(new PairFunction<Tuple2<Pixel, Integer>, Pixel, Integer>() {
        @Override
        public Tuple2<Pixel, Integer> call(Tuple2<Pixel, Integer> pixelDoubleTuple2) throws Exception {
          Pixel newPixel = new Pixel(pixelDoubleTuple2._1().getX(), pixelDoubleTuple2._1().getY(),
              resolutionX, resolutionY);
          newPixel.setDuplicate(false);
          newPixel.setCurrentPartitionId(VisualizationPartitioner.CalculatePartitionId(
              resolutionX, resolutionY, partitionX, partitionY,
              pixelDoubleTuple2._1.getX(), pixelDoubleTuple2._1.getY()));
          Tuple2<Pixel, Integer> newPixelDoubleTuple2 = new Tuple2<Pixel, Integer>(newPixel, pixelDoubleTuple2._2());
          return newPixelDoubleTuple2;
        }
      });
  this.distributedRasterColorMatrix = this.distributedRasterColorMatrix.partitionBy(
      new VisualizationPartitioner(this.resolutionX, this.resolutionY, this.partitionX, this.partitionY));
  return true;
}
public static void main(String[] args) {
  SparkConf sparkConf = new SparkConf();
  sparkConf.setMaster("local");
  sparkConf.setAppName("TestSpark");
  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  JavaRDD<String> input = sc.parallelize(data);
  JavaPairRDD<String, String> inputPair = input.mapToPair(
      new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String x) throws Exception {
          return new Tuple2<String, String>(x.split(" ")[0], x);
        }
      });
  System.out.println(inputPair.take(100));
}
@Before
public void before() throws Exception {
  queryExecutor = new QueryExecutor(deepContext, deepConnectionHandler);

  // Stubs
  when(deepConnectionHandler.getConnection(CLUSTERNAME_CONSTANT.getName())).thenReturn(deepConnection);
  when(deepConnection.getExtractorConfig()).thenReturn(extractorConfig);
  when(extractorConfig.clone()).thenReturn(extractorConfig);
  when(deepContext.createJavaRDD(any(ExtractorConfig.class))).thenReturn(singleRdd);
  when(deepContext.createHDFSRDD(any(ExtractorConfig.class))).thenReturn(rdd);
  when(rdd.toJavaRDD()).thenReturn(singleRdd);
  when(singleRdd.collect()).thenReturn(generateListOfCells(3));
  when(singleRdd.filter(any(Function.class))).thenReturn(singleRdd);
  when(singleRdd.map(any(FilterColumns.class))).thenReturn(singleRdd);
  when(singleRdd.mapToPair(any(PairFunction.class))).thenReturn(pairRdd);
  when(singleRdd.keyBy(any(Function.class))).thenReturn(pairRdd);
  when(pairRdd.join(pairRdd)).thenReturn(joinedRdd);
  when(pairRdd.reduceByKey(any(Function2.class))).thenReturn(pairRdd);
  when(pairRdd.map(any(Function.class))).thenReturn(singleRdd);
  when(joinedRdd.map(any(JoinCells.class))).thenReturn(singleRdd);
}
@SuppressWarnings("serial") private static final PairFunction<Integer, Integer, Integer> convertToKeyValue() { /** * Convert to key-value [key (integer) : value (integer * integer)] */ return new PairFunction<Integer, Integer, Integer>() { @Override public final Tuple2<Integer, Integer> call(final Integer integer) throws Exception { /* Tuple : key (integer) : value (integer * integer) */ return new Tuple2<Integer, Integer>(integer, integer * integer); } }; }
@SuppressWarnings("serial") private static final JavaPairRDD<Integer, String> userDataAsKeyValue(final JavaRDD<String> userInputFile) { /* Left Outer Join of transactions on users */ return userInputFile.mapToPair(new PairFunction<String, Integer, String>() { public Tuple2<Integer, String> call(final String user) { logger.debug("User : " + user); final String[] userSplit = user.split("\t"); /* Tuple : key (user-id) : value (country) */ return new Tuple2<Integer, String>(Integer.valueOf(userSplit[0]), userSplit[3]); } }); }
public void run() throws IOException {
  SparkConf conf = new SparkConf();
  conf.setAppName(getAppName());
  conf.set(SPARK_SERIALIZER, ORG_APACHE_SPARK_SERIALIZER_KRYO_SERIALIZER);
  JavaSparkUtil.packProjectJars(conf);
  setupSparkConf(conf);

  JavaStreamingContext ssc = new JavaStreamingContext(conf, getDuration());
  List<JavaDStream<T>> streamsList = getStreamsList(ssc);

  // Union all the streams if there is more than 1 stream
  JavaDStream<T> streams = unionStreams(ssc, streamsList);

  JavaPairDStream<String, RowMutation> pairDStream =
      streams.mapToPair(new PairFunction<T, String, RowMutation>() {
        public Tuple2<String, RowMutation> call(T t) {
          RowMutation rowMutation = convert(t);
          return new Tuple2<String, RowMutation>(rowMutation.getRowId(), rowMutation);
        }
      });

  pairDStream.foreachRDD(getFunction());
  ssc.start();
  ssc.awaitTermination();
}
public static void main(String[] args) throws Exception {
  System.setProperty("hadoop.home.dir", "E:\\hadoop");

  SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
  JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  Logger rootLogger = LogManager.getRootLogger();
  rootLogger.setLevel(Level.WARN);

  List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 10));
  JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

  JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream(
      "10.0.75.1", Integer.parseInt("9000"), StorageLevels.MEMORY_AND_DISK_SER);

  JavaDStream<String> words = StreamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());

  JavaPairDStream<String, Integer> wordCounts =
      words.mapToPair(str -> new Tuple2<>(str, 1)).reduceByKey((count1, count2) -> count1 + count2);
  wordCounts.print();

  JavaPairDStream<String, Integer> joinedDstream = wordCounts
      .transformToPair(new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
        @Override
        public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
          JavaPairRDD<String, Integer> modRDD = rdd.join(initialRDD).mapToPair(
              new PairFunction<Tuple2<String, Tuple2<Integer, Integer>>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<String, Tuple2<Integer, Integer>> joinedTuple)
                    throws Exception {
                  return new Tuple2<>(joinedTuple._1(), (joinedTuple._2()._1() + joinedTuple._2()._2()));
                }
              });
          return modRDD;
        }
      });

  joinedDstream.print();

  streamingContext.start();
  streamingContext.awaitTermination();
}
public static void main(String[] args) throws Exception {
  System.setProperty("hadoop.home.dir", "E:\\hadoop");

  SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
  JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 10));
  JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

  JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream(
      "10.0.75.1", Integer.parseInt("9000"), StorageLevels.MEMORY_AND_DISK_SER);

  JavaDStream<String> words = StreamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());

  JavaPairDStream<String, Integer> wordCounts =
      words.mapToPair(str -> new Tuple2<>(str, 1)).reduceByKey((count1, count2) -> count1 + count2);
  wordCounts.print();

  JavaPairDStream<String, Integer> joinedDstream = wordCounts.transformToPair(
      new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
        @Override
        public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
          // Join the per-batch counts with the static initialRDD and sum the two counts.
          return rdd.join(initialRDD).mapToPair(
              new PairFunction<Tuple2<String, Tuple2<Integer, Integer>>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<String, Tuple2<Integer, Integer>> joinedTuple)
                    throws Exception {
                  return new Tuple2<>(joinedTuple._1(), (joinedTuple._2()._1() + joinedTuple._2()._2()));
                }
              });
        }
      });

  joinedDstream.print();

  streamingContext.start();
  streamingContext.awaitTermination();
}
/**
 * Filter out-of-date metadata.
 *
 * @param es
 *          the Elasticsearch driver
 * @param userDatasetsRDD
 *          dataset extracted from session
 * @return filtered session datasets
 */
public JavaPairRDD<String, List<String>> removeRetiredDataset(ESDriver es, JavaPairRDD<String, List<String>> userDatasetsRDD) {
  Map<String, String> nameMap = this.getOnServiceMetadata(es);

  return userDatasetsRDD.mapToPair(new PairFunction<Tuple2<String, List<String>>, String, List<String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(Tuple2<String, List<String>> arg0) throws Exception {
      List<String> oriDatasets = arg0._2;
      List<String> newDatasets = new ArrayList<>();
      int size = oriDatasets.size();
      for (int i = 0; i < size; i++) {
        String name = oriDatasets.get(i);
        if (nameMap.containsKey(name)) {
          newDatasets.add(nameMap.get(name));
        }
      }
      return new Tuple2<>(arg0._1, newDatasets);
    }
  });
}
/**
 * bulidDataQueryRDD: convert a click stream list into (dataset, queries) pairs.
 *
 * @param clickstreamRDD
 *          click stream data
 * @param downloadWeight
 *          weight of download behavior
 * @return JavaPairRDD whose key is the short name of a data set and whose values are queries
 */
public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(ClickStream click) throws Exception {
      List<String> query = new ArrayList<>();
      // Important: download behavior is given a higher weight than viewing behavior.
      boolean download = click.isDownload();
      int weight = 1;
      if (download) {
        weight = downloadWeight;
      }
      for (int i = 0; i < weight; i++) {
        query.add(click.getKeyWords());
      }
      return new Tuple2<>(click.getViewDataset(), query);
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });
}
public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Double> call(ClickStream click) throws Exception {
      double rate = 1;
      boolean download = click.isDownload();
      if (download) {
        rate = 2;
      }
      String sessionID = click.getSessionID();
      String user = sessionID.split("@")[0];
      return new Tuple2<>(user + "," + click.getViewDataset(), rate);
    }
  }).reduceByKey(new Function2<Double, Double, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Double call(Double v1, Double v2) throws Exception {
      return v1 >= v2 ? v1 : v2;
    }
  });
}
/**
 * buildMetadataRDD: convert a metadata list to a JavaPairRDD.
 *
 * @param es an Elasticsearch client node instance
 * @param sc spark context
 * @param index index name of log processing application
 * @param metadatas metadata list
 * @return PairRDD in which each key is a metadata short name and each value is the term
 *         list extracted from the metadata variables.
 */
protected JavaPairRDD<String, List<String>> buildMetadataRDD(ESDriver es, JavaSparkContext sc, String index, List<PODAACMetadata> metadatas) {
  JavaRDD<PODAACMetadata> metadataRDD = sc.parallelize(metadatas);
  JavaPairRDD<String, List<String>> metadataTermsRDD =
      metadataRDD.mapToPair(new PairFunction<PODAACMetadata, String, List<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, List<String>> call(PODAACMetadata metadata) throws Exception {
          return new Tuple2<String, List<String>>(metadata.getShortName(), metadata.getAllTermList());
        }
      }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public List<String> call(List<String> v1, List<String> v2) throws Exception {
          List<String> list = new ArrayList<String>();
          list.addAll(v1);
          list.addAll(v2);
          return list;
        }
      });
  return metadataTermsRDD;
}
private static PairFunction<Tuple2<String, String>, String, ExampleXML> parseXml() {
  ParseXML parser = new ParseXML();
  return tuple -> {
    try {
      return new Tuple2<>(tuple._1(), parser.call(tuple._2()));
    } catch (JAXBException badXML) {
      System.err.printf("Bad XML at %s\n", tuple._1());
      badXML.printStackTrace();
      return null;
    }
  };
}
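Because the function returns null for records whose XML fails to unmarshal, a caller would normally drop those entries before further processing. A minimal sketch, assuming a JavaPairRDD<String, String> named rawXml that holds (id, xml) pairs:

// Hypothetical caller: parse, then discard entries whose XML could not be unmarshalled.
JavaPairRDD<String, ExampleXML> parsed =
    rawXml.mapToPair(parseXml())
          .filter(tuple -> tuple != null);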
private static PairFunction<Tuple2<String, ExampleXML>, Object, JsonObject> prepToBq() {
  return tuple -> {
    JsonObject json = new JsonObject();
    json.addProperty("property1", tuple._2().getProperty1());
    json.addProperty("insertId", tuple._1());
    return new Tuple2<>(null, json);
  };
}
public static void main(String[] args) throws Exception {
  if (args.length < 2) {
    System.err.println("Usage: JavaTeraSort <HDFS_INPUT> <HDFS_OUTPUT>");
    System.exit(1);
  }
  SparkConf sparkConf = new SparkConf().setAppName("JavaTeraSort");
  JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  JavaRDD<String> lines = ctx.textFile(args[0], 1);
  Integer parallel = sparkConf.getInt("spark.default.parallelism", ctx.defaultParallelism());
  Integer reducer = Integer.parseInt(IOCommon.getProperty("hibench.default.shuffle.parallelism").get());

  JavaPairRDD<String, String> words = lines.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String s) throws Exception {
      return new Tuple2<String, String>(s.substring(0, 10), s.substring(10));
    }
  });

  JavaPairRDD<String, String> sorted = words.sortByKey(true, reducer);

  JavaRDD<String> result = sorted.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> e) throws Exception {
      return e._1() + e._2();
    }
  });

  result.saveAsTextFile(args[1]);
  ctx.stop();
}
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs); JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() { @Override public Tuple2<Tuple3<String, String, String>, Stats> call(String s) { return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s)); } }); JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() { @Override public Stats call(Stats stats, Stats stats2) { return stats.merge(stats2); } }); List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect(); for (Tuple2<?,?> t : output) { System.out.println(t._1() + "\t" + t._2()); } jsc.stop(); }
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR"); JavaSparkContext sc = new JavaSparkContext(sparkConf); Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2; JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache(); // Linear transitive closure: each round grows paths by one edge, // by joining the graph's edges with the already-discovered paths. // e.g. join the path (y, z) from the TC with the edge (x, y) from // the graph to obtain the path (x, z). // Because join() joins on keys, the edges are stored in reversed order. JavaPairRDD<Integer, Integer> edges = tc.mapToPair( new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) { return new Tuple2<Integer, Integer>(e._2(), e._1()); } }); long oldCount; long nextCount = tc.count(); do { oldCount = nextCount; // Perform the join, obtaining an RDD of (y, (z, x)) pairs, // then project the result to obtain the new (x, z) paths. tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache(); nextCount = tc.count(); } while (nextCount != oldCount); System.out.println("TC has " + tc.count() + " edges."); sc.stop(); }
@Override
public PairFunction<?, ?, ?> getPrepareFunction() {
  return new PairFunction<Long, Void, Row>() {
    @Override
    public Tuple2<Void, Row> call(Long aLong) throws Exception {
      return new Tuple2<>(null, (Row) new RowWithSchema(schema, aLong));
    }
  };
}
/** {@link KV} to pair function. */
public static <K, V> PairFunction<KV<K, V>, K, V> toPairFunction() {
  return new PairFunction<KV<K, V>, K, V>() {
    @Override
    public Tuple2<K, V> call(KV<K, V> kv) {
      return new Tuple2<>(kv.getKey(), kv.getValue());
    }
  };
}
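A small usage sketch, assuming the helper above is in scope (same class or static import) and a JavaSparkContext named jsc; the KV records are illustrative:

// Hypothetical conversion from Beam-style KV records to a Spark JavaPairRDD.
JavaRDD<KV<String, Long>> kvs =
    jsc.parallelize(Arrays.asList(KV.of("a", 1L), KV.of("b", 2L)));

PairFunction<KV<String, Long>, String, Long> toPair = toPairFunction();
JavaPairRDD<String, Long> pairs = kvs.mapToPair(toPair);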
/** Extract key from a {@link WindowedValue} {@link KV} into a pair. */
public static <K, V> PairFunction<WindowedValue<KV<K, V>>, K, WindowedValue<KV<K, V>>>
    toPairByKeyInWindowedValue() {
  return new PairFunction<WindowedValue<KV<K, V>>, K, WindowedValue<KV<K, V>>>() {
    @Override
    public Tuple2<K, WindowedValue<KV<K, V>>> call(WindowedValue<KV<K, V>> windowedKv) throws Exception {
      return new Tuple2<>(windowedKv.getValue().getKey(), windowedKv);
    }
  };
}
/**
 * Returns a pair function to convert value to bytes via coder.
 *
 * @param coderMap - mapping between TupleTag and a coder
 * @return a pair function to convert value to bytes via coder
 */
public static PairFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, TupleTag<?>, byte[]>
    getTupleTagEncodeFunction(final Map<TupleTag<?>, Coder<WindowedValue<?>>> coderMap) {
  return new PairFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, TupleTag<?>, byte[]>() {
    @Override
    public Tuple2<TupleTag<?>, byte[]> call(Tuple2<TupleTag<?>, WindowedValue<?>> tuple2) throws Exception {
      TupleTag<?> tupleTag = tuple2._1;
      WindowedValue<?> windowedValue = tuple2._2;
      return new Tuple2<TupleTag<?>, byte[]>(
          tupleTag, CoderHelpers.toByteArray(windowedValue, coderMap.get(tupleTag)));
    }
  };
}
/**
 * Returns a pair function to convert bytes to value via coder.
 *
 * @param coderMap - mapping between TupleTag and a coder
 * @return a pair function to convert bytes to value via coder
 */
public static PairFunction<Tuple2<TupleTag<?>, byte[]>, TupleTag<?>, WindowedValue<?>>
    getTupleTagDecodeFunction(final Map<TupleTag<?>, Coder<WindowedValue<?>>> coderMap) {
  return new PairFunction<Tuple2<TupleTag<?>, byte[]>, TupleTag<?>, WindowedValue<?>>() {
    @Override
    public Tuple2<TupleTag<?>, WindowedValue<?>> call(Tuple2<TupleTag<?>, byte[]> tuple2) throws Exception {
      TupleTag<?> tupleTag = tuple2._1;
      byte[] windowedByteValue = tuple2._2;
      return new Tuple2<TupleTag<?>, WindowedValue<?>>(
          tupleTag, CoderHelpers.fromByteArray(windowedByteValue, coderMap.get(tupleTag)));
    }
  };
}
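The encode and decode functions above are mirror images; a sketch of how they might bracket a serialization boundary. Here taggedRdd (a JavaPairRDD<TupleTag<?>, WindowedValue<?>>) and coderMap are assumed to exist already; this is not the runner's actual call site:

// Hypothetical round trip: serialize tagged windowed values to bytes, move them across a
// shuffle or checkpoint as plain byte arrays, then deserialize with the same coder map.
JavaPairRDD<TupleTag<?>, byte[]> asBytes =
    taggedRdd.mapToPair(getTupleTagEncodeFunction(coderMap));

JavaPairRDD<TupleTag<?>, WindowedValue<?>> restored =
    asBytes.mapToPair(getTupleTagDecodeFunction(coderMap));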
/**
 * A function wrapper for converting a key-value pair to a byte array pair.
 *
 * @param keyCoder Coder to serialize keys.
 * @param valueCoder Coder to serialize values.
 * @param <K> The type of the key being serialized.
 * @param <V> The type of the value being serialized.
 * @return A function that accepts a key-value pair and returns a pair of byte arrays.
 */
public static <K, V> PairFunction<Tuple2<K, V>, ByteArray, byte[]> toByteFunction(
    final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<K, V>, ByteArray, byte[]>() {
    @Override
    public Tuple2<ByteArray, byte[]> call(Tuple2<K, V> kv) {
      return new Tuple2<>(new ByteArray(toByteArray(kv._1(), keyCoder)), toByteArray(kv._2(), valueCoder));
    }
  };
}
/**
 * A function wrapper for converting a byte array pair to a key-value pair.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K> The type of the key being deserialized.
 * @param <V> The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
public static <K, V> PairFunction<Tuple2<ByteArray, byte[]>, K, V> fromByteFunction(
    final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, byte[]>, K, V>() {
    @Override
    public Tuple2<K, V> call(Tuple2<ByteArray, byte[]> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder), fromByteArray(tuple._2(), valueCoder));
    }
  };
}
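toByteFunction and fromByteFunction are inverses under the same coders. A sketch of a coder-aware round trip; pairs (a JavaPairRDD<String, Long>), the StringUtf8Coder/VarLongCoder choice, and the partition count are assumptions for illustration:

// Hypothetical round trip: shuffle on serialized keys, then decode back to objects.
JavaPairRDD<ByteArray, byte[]> encoded =
    pairs.mapToPair(toByteFunction(StringUtf8Coder.of(), VarLongCoder.of()));

// ByteArray wraps byte[] so the serialized keys hash and compare by content during the shuffle.
JavaPairRDD<ByteArray, byte[]> shuffled = encoded.partitionBy(new HashPartitioner(4));

JavaPairRDD<String, Long> decoded =
    shuffled.mapToPair(fromByteFunction(StringUtf8Coder.of(), VarLongCoder.of()));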
public static JavaPairDStream<String, String> getJavaPairDStream(final File tempDir, final JavaStreamingContext ssc, final String subject1) {
  final JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
  JavaPairDStream<String, String> keyValues =
      lines.mapToPair((PairFunction<String, String, String>) str -> new Tuple2<String, String>(subject1 + "." + str, str));
  return keyValues;
}