/** * Creates an n-gram data frame from text lines. * @param lines an RDD of text lines * @return an n-gram data frame. */ DataFrame createNGramDataFrame(JavaRDD<String> lines) { JavaRDD<Row> rows = lines.map(new Function<String, Row>(){ private static final long serialVersionUID = -4332903997027358601L; @Override public Row call(String line) throws Exception { return RowFactory.create(Arrays.asList(line.split("\\s+"))); } }); StructType schema = new StructType(new StructField[] { new StructField("words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty()) }); DataFrame wordDF = new SQLContext(jsc).createDataFrame(rows, schema); // generate bigrams (n = 2) NGram transformer = new NGram().setInputCol("words").setOutputCol("ngrams").setN(2); DataFrame ngramDF = transformer.transform(wordDF); ngramDF.show(10, false); return ngramDF; }
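A minimal usage sketch for createNGramDataFrame (hypothetical driver code; it assumes the call happens inside the same class, whose jsc field holds the JavaSparkContext used to build the SQLContext above):

// Hypothetical driver: build bigrams from two toy sentences (Spark 1.x DataFrame API, local mode).
JavaRDD<String> lines = jsc.parallelize(Arrays.asList("the quick brown fox", "jumps over the lazy dog"));
DataFrame ngrams = createNGramDataFrame(lines); // also prints up to 10 rows of the "ngrams" column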
/** * Parses a list of PoS-tagged sentences, each on a line, and writes the result to an output * file in a specified output format. * @param jsc * @param sentences * @param outputFileName * @param outputFormat */ public void parse(JavaSparkContext jsc, List<String> sentences, String outputFileName, OutputFormat outputFormat) { JavaRDD<String> input = jsc.parallelize(sentences); JavaRDD<Sentence> sents = input.map(new TaggedLineToSentenceFunction()); JavaRDD<DependencyGraph> graphs = sents.map(new ParsingFunction()); JavaRDD<Row> rows = graphs.map(new Function<DependencyGraph, Row>() { private static final long serialVersionUID = -812004521983071103L; public Row call(DependencyGraph graph) { return RowFactory.create(graph.getSentence().toString(), graph.dependencies()); } }); StructType schema = new StructType(new StructField[]{ new StructField("sentence", DataTypes.StringType, false, Metadata.empty()), new StructField("dependency", DataTypes.StringType, false, Metadata.empty()) }); SQLContext sqlContext = new SQLContext(jsc); DataFrame df = sqlContext.createDataFrame(rows, schema); if (outputFormat == OutputFormat.TEXT) df.select("dependency").write().text(outputFileName); else df.repartition(1).write().json(outputFileName); }
public static void main( String[] args ){ SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("K-means Example"); JavaSparkContext sc = new JavaSparkContext(conf); // Load and parse data String path = "data/km-data.txt"; JavaRDD<String> data = sc.textFile(path); JavaRDD<Vector> parsedData = data.map( new Function<String, Vector>() { public Vector call(String s) { String[] sarray = s.split(" "); double[] values = new double[sarray.length]; for (int i = 0; i < sarray.length; i++) values[i] = Double.parseDouble(sarray[i]); return Vectors.dense(values); } } ); parsedData.cache(); // Cluster the data into two classes using KMeans int numClusters = 2; int numIterations = 20; KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations); // Evaluate clustering by computing Within Set Sum of Squared Errors double WSSSE = clusters.computeCost(parsedData.rdd()); System.out.println("Within Set Sum of Squared Errors = " + WSSSE); }
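As a hedged follow-up inside the same main method (not part of the original example), the trained model can assign a cluster index to a new observation; the vector values here are purely illustrative and must match the dimensionality of the training data:

// Illustrative only: predict the cluster of a new point with the trained model.
Vector newPoint = Vectors.dense(0.5, 0.5);
int clusterIndex = clusters.predict(newPoint);
System.out.println("New point assigned to cluster " + clusterIndex);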
public static JavaRDD<Tuple2<Object, Object>> predictForOutput_LogisticRegressionModel(LogisticRegressionModel model, JavaRDD<LabeledPoint> data){ JavaRDD<Tuple2<Object, Object>> FeaturesAndPrediction = data.map( new Function<LabeledPoint, Tuple2<Object, Object>>() { private static final long serialVersionUID = 1L; public Tuple2<Object, Object> call(LabeledPoint p) { Double prediction = model.predict(p.features()); return new Tuple2<Object, Object>(p.features(), prediction); } } ); return FeaturesAndPrediction; }
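A hedged usage sketch for the helper above; model and test are placeholders for a trained LogisticRegressionModel and a JavaRDD<LabeledPoint> of held-out data:

// Hypothetical caller: print each feature vector next to its predicted label.
JavaRDD<Tuple2<Object, Object>> featuresAndPredictions = predictForOutput_LogisticRegressionModel(model, test);
for (Tuple2<Object, Object> pair : featuresAndPredictions.collect()) {
    System.out.println(pair._1() + " -> " + pair._2());
}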
public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println("Usage: JavaSleep <seconds>"); System.exit(1); } SparkConf sparkConf = new SparkConf().setAppName("JavaSleep"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); Integer parallel = sparkConf.getInt("spark.default.parallelism", ctx.defaultParallelism()); Integer seconds = Integer.parseInt(args[0]); Integer[] init_val = new Integer[parallel]; Arrays.fill(init_val, seconds); JavaRDD<Integer> workload = ctx.parallelize(Arrays.asList(init_val), parallel).map(new Function<Integer, Integer>() { @Override public Integer call(Integer s) throws InterruptedException { Thread.sleep(s * 1000); return 0; } }); List<Integer> output = workload.collect(); ctx.stop(); }
static List<StreamVectors> performJavaStream(String appName, List<StreamVectors> input, int noIters) { JavaRDD<StreamVectors> streamVectorsJavaRDD = ExampleUtils.getSparkContext(appName).parallelize(input); for (int i = 0; i < noIters; i++) { streamVectorsJavaRDD = streamVectorsJavaRDD.map(new Function<StreamVectors, StreamVectors>() { @Override public StreamVectors call(StreamVectors streamVectors) throws Exception { streamVectors.setStartRun(System.nanoTime()); for (int idx = 0; idx < streamVectors.A.length; idx++) { streamVectors.C[idx] = streamVectors.A[idx]; } for (int idx = 0; idx < streamVectors.A.length; idx++) { streamVectors.B[idx] = streamVectors.scaling_constant * streamVectors.C[idx]; } for (int idx = 0; idx < streamVectors.A.length; idx++) { streamVectors.C[idx] = streamVectors.A[idx] + streamVectors.B[idx]; } for (int idx = 0; idx < streamVectors.A.length; idx++) { streamVectors.A[idx] = streamVectors.B[idx] + streamVectors.scaling_constant * streamVectors.C[idx]; } streamVectors.setEndRun(System.nanoTime()); return streamVectors; } }); } return streamVectorsJavaRDD.collect(); }
private List<StreamVectors> performJavaStream(String appName, List<StreamVectors> input) { return ExampleUtils.getSparkContext(appName).parallelize(input).map(new Function<StreamVectors, StreamVectors>() { @Override public StreamVectors call(StreamVectors streamVectors) throws Exception { streamVectors.setStartRun(System.nanoTime()); for(int idx = 0; idx < streamVectors.A.length; idx++){ streamVectors.C[idx] = streamVectors.A[idx]; } for(int idx = 0; idx < streamVectors.A.length; idx++){ streamVectors.B[idx] = streamVectors.scaling_constant * streamVectors.C[idx]; } for(int idx = 0; idx < streamVectors.A.length; idx++){ streamVectors.C[idx] = streamVectors.A[idx] + streamVectors.B[idx]; } for(int idx = 0; idx < streamVectors.A.length; idx++){ streamVectors.A[idx] = streamVectors.B[idx] + streamVectors.scaling_constant * streamVectors.C[idx]; } streamVectors.setEndRun(System.nanoTime()); return streamVectors; } }).collect(); }
@Override public void call(JavaPairRDD<PublisherGeoKey, AggregationLog> logsRDD) throws Exception { if (logsRDD != null) { LOG.info(" Data to process in RDD:" + logsRDD.count()); JavaRDD<AggregationResult> aggResRDD = logsRDD.map(new Function<Tuple2<PublisherGeoKey, AggregationLog>, AggregationResult>() { @Override public AggregationResult call( Tuple2<PublisherGeoKey, AggregationLog> arg0) throws Exception { PublisherGeoKey p = arg0._1; AggregationLog a = arg0._2; return new AggregationResult(new Timestamp(a.getTimestamp()), p.getPublisher(), p.getGeo(), a.getImps(), (int) a.getUniquesHll().estimatedSize(), a.getSumBids() / a.getImps()); } }); LOG.info(" Call Data Process Partition"); aggResRDD.foreachPartition(new SaveLogAggPartition()); } else LOG.error("Data to process:" + 0); }
public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Big Apple").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); class GetLength implements Function<String, Integer> { public Integer call(String s) { return s.length(); } } class Sum implements Function2<Integer, Integer, Integer> { public Integer call(Integer a, Integer b) { return a + b; } } JavaRDD<String> lines = sc.textFile("src/main/resources/compressed.gz"); JavaRDD<Integer> lineLengths = lines.map(new GetLength()); // Printing an RDD lineLengths.foreach(x-> System.out.println(x)); int totalLength = lineLengths.reduce(new Sum()); System.out.println(totalLength); }
/** * Create an appropriate {@link Function}-based predicate for deploying the given {@link PredicateDescriptor} * on Apache Spark. * * @param predicateDescriptor describes the function * @param operator that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction} * @param operatorContext contains optimization information for the {@code operator} * @param inputs that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction} */ public <Type> Function<Type, Boolean> compile( PredicateDescriptor<Type> predicateDescriptor, SparkExecutionOperator operator, OptimizationContext.OperatorContext operatorContext, ChannelInstance[] inputs) { final Predicate<Type> javaImplementation = predicateDescriptor.getJavaImplementation(); if (javaImplementation instanceof PredicateDescriptor.ExtendedSerializablePredicate) { return new ExtendedPredicateAdapater<>( (PredicateDescriptor.ExtendedSerializablePredicate<Type>) javaImplementation, new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber()) ); } else { return new PredicateAdapter<>(javaImplementation); } }
private JavaRDD<String> toTaggedSentence(DataFrame output) { return output.javaRDD().map(new Function<Row, String>() { private static final long serialVersionUID = 4208643510231783579L; @Override public String call(Row row) throws Exception { String[] tokens = row.getString(0).trim().split("\\s+"); String[] tags = row.getString(1).trim().split("\\s+"); if (tokens.length != tags.length) { System.err.println("Incompatible lengths!"); return null; } StringBuilder sb = new StringBuilder(64); for (int j = 0; j < tokens.length; j++) { sb.append(tokens[j]); sb.append('/'); sb.append(tags[j]); sb.append(' '); } return sb.toString().trim(); } }); }
/** * Counts the number of non-space, non-underscore characters in this data set. This utility method * is used to check the tokenization result. * @param lines * @return number of characters */ int numCharacters(JavaRDD<String> lines) { JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() { private static final long serialVersionUID = -2189399343462982586L; @Override public Integer call(String line) throws Exception { line = line.replaceAll("[\\s_]+", ""); return line.length(); } }); return lengths.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = -8438072946884289401L; @Override public Integer call(Integer e0, Integer e1) throws Exception { return e0 + e1; } }); }
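A small sanity-check sketch for numCharacters, assuming the enclosing class has access to a JavaSparkContext named jsc (names and strings are illustrative):

// Illustrative check: "Hello world" and "Xin chao" contain 10 + 7 = 17 non-space characters.
JavaRDD<String> lines = jsc.parallelize(Arrays.asList("Hello world", "Xin chao"));
System.out.println("Non-space characters: " + numCharacters(lines)); // expected: 17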
/** * Parses all sentences in an input file, each on a line, and writes the flattened dependency * tuples to the console. * @param jsc * @param inputFileName */ public void parse(JavaSparkContext jsc, String inputFileName) { List<String> sentences = jsc.textFile(inputFileName).collect(); JavaRDD<String> input = jsc.parallelize(sentences); JavaRDD<Sentence> sents = input.map(new TaggedLineToSentenceFunction()); JavaRDD<DependencyGraph> graphs = sents.map(new ParsingFunction()); JavaRDD<String> rows = graphs.map(new Function<DependencyGraph, String>() { private static final long serialVersionUID = -6021310762521034121L; public String call(DependencyGraph graph) { return graph.dependencies(); } }); for (String s : rows.collect()) { System.out.println(s); } }
public static void main(String[] args) throws IOException { Flags.setFromCommandLineArgs(THE_OPTIONS, args); // Initialize the Spark conf. SparkConf conf = new SparkConf().setAppName("A SECTONG Application: Apache Log Analysis with Spark"); JavaSparkContext sc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval()); SQLContext sqlContext = new SQLContext(sc); // Initialize parameters HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(Flags.getInstance().getKafka_topic().split(","))); HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", Flags.getInstance().getKafka_broker()); // Receive data from the Kafka stream JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { private static final long serialVersionUID = 5266880065425088203L; public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); JavaDStream<ApacheAccessLog> accessLogsDStream = lines.flatMap(line -> { List<ApacheAccessLog> list = new ArrayList<>(); try { // Parse each line list.add(ApacheAccessLog.parseFromLogLine(line)); return list; } catch (RuntimeException e) { return list; } }).cache(); accessLogsDStream.foreachRDD(rdd -> { // rdd to DataFrame DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class); // Write to a Parquet file df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile()); return null; }); // Start the streaming server jssc.start(); // start the computation jssc.awaitTermination(); // wait for termination }
/** {@link KV} to pair flatmap function. */ public static <K, V> PairFlatMapFunction<Iterator<KV<K, V>>, K, V> toPairFlatMapFunction() { return new PairFlatMapFunction<Iterator<KV<K, V>>, K, V>() { @Override public Iterator<Tuple2<K, V>> call(final Iterator<KV<K, V>> itr) { final Iterator<Tuple2<K, V>> outputItr = Iterators.transform( itr, new com.google.common.base.Function<KV<K, V>, Tuple2<K, V>>() { @Override public Tuple2<K, V> apply(KV<K, V> kv) { return new Tuple2<>(kv.getKey(), kv.getValue()); } }); return outputItr; } }; }
/** A pair to {@link KV} flatmap function. */ static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() { return new FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>() { @Override public Iterator<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) { final Iterator<KV<K, V>> outputItr = Iterators.transform( itr, new com.google.common.base.Function<Tuple2<K, V>, KV<K, V>>() { @Override public KV<K, V> apply(Tuple2<K, V> t2) { return KV.of(t2._1(), t2._2()); } }); return outputItr; } }; }
/** * A utility method that adapts {@link PairFunction} to a {@link PairFlatMapFunction} with an * {@link Iterator} input. This is particularly useful because it allows functions written * for mapToPair to be reused in flatMapToPair. * * @param pairFunction the {@link PairFunction} to adapt. * @param <T> the input type. * @param <K> the output key type. * @param <V> the output value type. * @return a {@link PairFlatMapFunction} that accepts an {@link Iterator} as an input and applies * the {@link PairFunction} to every element. */ public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction( final PairFunction<T, K, V> pairFunction) { return new PairFlatMapFunction<Iterator<T>, K, V>() { @Override public Iterator<Tuple2<K, V>> call(Iterator<T> itr) throws Exception { final Iterator<Tuple2<K, V>> outputItr = Iterators.transform( itr, new com.google.common.base.Function<T, Tuple2<K, V>>() { @Override public Tuple2<K, V> apply(T t) { try { return pairFunction.call(t); } catch (Exception e) { throw new RuntimeException(e); } } }); return outputItr; } }; }
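A hedged usage sketch for the adapter above: it lets a PairFunction written for mapToPair be applied per partition via mapPartitionsToPair; words is assumed to be a JavaRDD<String>, and the word-length pairing is purely illustrative:

// Illustrative: reuse a mapToPair-style function inside mapPartitionsToPair.
PairFunction<String, String, Integer> wordLength = new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String w) {
        return new Tuple2<>(w, w.length());
    }
};
JavaPairRDD<String, Integer> lengths = words.mapPartitionsToPair(pairFunctionToPairFlatMapFunction(wordLength));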
/** * A utility method that adapts {@link Function} to a {@link FlatMapFunction} with an {@link * Iterator} input. This is particularly useful because it allows functions written for map * to be reused in flatMap. * * @param func the {@link Function} to adapt. * @param <InputT> the input type. * @param <OutputT> the output type. * @return a {@link FlatMapFunction} that accepts an {@link Iterator} as an input and applies the * {@link Function} to every element. */ public static <InputT, OutputT> FlatMapFunction<Iterator<InputT>, OutputT> functionToFlatMapFunction( final Function<InputT, OutputT> func) { return new FlatMapFunction<Iterator<InputT>, OutputT>() { @Override public Iterator<OutputT> call(Iterator<InputT> itr) throws Exception { final Iterator<OutputT> outputItr = Iterators.transform( itr, new com.google.common.base.Function<InputT, OutputT>() { @Override public OutputT apply(InputT t) { try { return func.call(t); } catch (Exception e) { throw new RuntimeException(e); } } }); return outputItr; } }; }
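Similarly, a hedged sketch for the map-to-flatMap adapter above; lines is assumed to be a JavaRDD<String>:

// Illustrative: reuse a map-style Function inside mapPartitions.
Function<String, Integer> length = new Function<String, Integer>() {
    @Override
    public Integer call(String s) {
        return s.length();
    }
};
JavaRDD<Integer> lengths = lines.mapPartitions(functionToFlatMapFunction(length));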
/** * A function wrapper for converting a byte array pair to a key-value pair, where * values are {@link Iterable}. * * @param keyCoder Coder to deserialize keys. * @param valueCoder Coder to deserialize values. * @param <K> The type of the key being deserialized. * @param <V> The type of the value being deserialized. * @return A function that accepts a pair of byte arrays and returns a key-value pair. */ public static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>> fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) { return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() { @Override public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) { return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder), Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() { @Override public V apply(byte[] bytes) { return fromByteArray(bytes, valueCoder); } })); } }; }
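A hedged usage sketch for fromByteFunctionIterable; it assumes grouped is a JavaPairRDD<ByteArray, Iterable<byte[]>> produced upstream (e.g. by a byte-level groupByKey) and uses StringUtf8Coder purely as an illustrative coder for both keys and values:

// Hedged sketch: decode a grouped byte-array RDD back into typed keys and values.
PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, String, Iterable<String>> decode =
        fromByteFunctionIterable(StringUtf8Coder.of(), StringUtf8Coder.of());
JavaPairRDD<String, Iterable<String>> typed = grouped.mapToPair(decode);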
@Override public void call(JavaRDD<String> rdd) throws Exception { JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() { private static final long serialVersionUID = 5167089361335095997L; @Override public Row call(String msg) { Row row = RowFactory.create(msg); return row; } }); // Create Schema StructType schema = DataTypes.createStructType( new StructField[] { DataTypes.createStructField("Message", DataTypes.StringType, true) }); // Get Spark 2.0 session SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema); msgDataFrame.show(); }
public void testEsRDDZReadJson() throws Exception { String target = "spark-test/java-basic-json-read"; RestUtils.touch("spark-test"); RestUtils.postData(target, "{\"message\" : \"Hello World\",\"message_date\" : \"2014-05-25\"}".getBytes()); RestUtils.postData(target, "{\"message\" : \"Goodbye World\",\"message_date\" : \"2014-05-25\"}".getBytes()); RestUtils.refresh("spark-test"); JavaRDD<String> esRDD = JavaEsSpark.esJsonRDD(sc, target).values(); System.out.println(esRDD.collect()); JavaRDD<String> messages = esRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String string) throws Exception { return string.contains("message"); } }); // jdk8 //esRDD.filter(m -> m.contains("message"))); assertThat((int) messages.count(), is(2)); System.out.println(messages.take(10)); System.out.println(messages); }
public DataFrame createDF(JavaRDD<Tuple2<Vector, String>> rdd) { // Generate the schema based on the string of schema List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("vectorized_count", new VectorUDT(), true)); fields.add(DataTypes.createStructField("product_title", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows. JavaRDD<Row> rowRDD = rdd.map( new Function<Tuple2<Vector, String>, Row>() { public Row call(Tuple2<Vector, String> record) { return RowFactory.create(record._1(), record._2()); } }); return sqlContext.createDataFrame(rowRDD, schema); }
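A hedged construction sketch for the input of createDF above; sc and sqlContext are assumed to be fields of the enclosing class, and the vector values and product title are made up:

// Illustrative input: a single (count vector, product title) pair.
JavaRDD<Tuple2<Vector, String>> rdd = sc.parallelize(Arrays.asList(
        new Tuple2<Vector, String>(Vectors.dense(1.0, 0.0, 2.0), "acme coffee mug")));
DataFrame df = createDF(rdd);
df.show();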
private static JavaDStream<String> createDStream(JavaStreamingContext javaStreamingContext, String hostName, int port) { JavaReceiverInputDStream<SparkFlumeEvent> flumeEventStream = FlumeUtils.createStream(javaStreamingContext, hostName, port); // Set different storage level // flumeEventStream.persist(StorageLevel.MEMORY_AND_DISK_SER()); JavaDStream<String> dStream = flumeEventStream.map(new Function<SparkFlumeEvent, String>() { @Override public String call(SparkFlumeEvent sparkFlumeEvent) throws Exception { byte[] bodyArray = sparkFlumeEvent.event().getBody().array(); String logTxt = new String(bodyArray, "UTF-8"); logger.info(logTxt); return logTxt; } }); // dStream.print(); return dStream; }
public void accessTableWitRDD(JavaSparkContext sc){ JavaRDD<String> cassandraRDD = javaFunctions(sc).cassandraTable("todolist", "todolisttable") .map(new Function<CassandraRow, String>() { @Override public String call(CassandraRow cassandraRow) throws Exception { return cassandraRow.toString(); } }); System.out.println("\nReading Data from todolisttable in Cassandra with a RDD: \n" + StringUtils.join(cassandraRDD.toArray(), "\n")); // javaFunctions(cassandraRDD).writerBuilder("todolist", "todolisttable", mapToRow(String.class)).saveToCassandra(); }
/** * Transformation: Joins the time series according to their identity. * * @return the joined time series */ public ChronixRDD joinChunks() { JavaPairRDD<MetricTimeSeriesKey, Iterable<MetricTimeSeries>> groupRdd = this.groupBy(MetricTimeSeriesKey::new); JavaPairRDD<MetricTimeSeriesKey, MetricTimeSeries> joinedRdd = groupRdd.mapValues((Function<Iterable<MetricTimeSeries>, MetricTimeSeries>) mtsIt -> { MetricTimeSeriesOrdering ordering = new MetricTimeSeriesOrdering(); List<MetricTimeSeries> orderedChunks = ordering.immutableSortedCopy(mtsIt); MetricTimeSeries result = null; for (MetricTimeSeries mts : orderedChunks) { if (result == null) { result = new MetricTimeSeries .Builder(mts.getMetric()) .attributes(mts.attributes()).build(); } result.addAll(mts.getTimestampsAsArray(), mts.getValuesAsArray()); } return result; }); JavaRDD<MetricTimeSeries> resultJavaRdd = joinedRdd.map((Tuple2<MetricTimeSeriesKey, MetricTimeSeries> mtTuple) -> mtTuple._2); return new ChronixRDD(resultJavaRdd); }
/** * Writes the content of the stream to the Kafka topic * behind this producer. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="SE_INNER_CLASS", justification="Uses state from outer class.") public void write (JavaDStream<T> stream) { stream.foreachRDD(new Function<JavaRDD<T>, Void>() { @Override public Void call(JavaRDD<T> rdd) throws Exception { write(rdd); return null; } }); }
public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("Line Count With Filtering"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); // Read the source file JavaRDD<String> input = sparkContext.textFile(args[0]); // RDD is immutable, let's create a new RDD which doesn't contain empty lines // the function needs to return true for the records to be kept JavaRDD<String> nonEmptyLines = input.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { if(s == null || s.trim().length() < 1) { return false; } return true; } }); long count = nonEmptyLines.count(); System.out.println(String.format("Total lines in %s is %d",args[0],count)); }
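As an aside, the same non-empty-line filter can be written as a Java 8 lambda (a sketch equivalent to the anonymous class above):

// Lambda form of the non-empty-line filter.
JavaRDD<String> nonEmptyLines = input.filter(s -> s != null && s.trim().length() >= 1);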
private void assertMapFunction(Function<Tuple2<AvroKey<Country>, NullWritable>, Country> function) throws Exception { Country country = Country.newBuilder() .setId(3) .setIso("PL") .setName("Poland") .build(); AvroKey<Country> avroKey = new AvroKey<Country>(country); Tuple2<AvroKey<Country>, NullWritable> pair = new Tuple2<>(avroKey, NullWritable.get()); Country retCountry = function.call(pair); assertEquals(Integer.valueOf(3), retCountry.getId()); assertEquals("PL", retCountry.getIso()); assertEquals("Poland", retCountry.getName()); }
@Override public JavaPairDStream<ImageFeature, ImageFeature> queryFeaturesStreaming( ReplicaConnection conn, IndexingParams params, JavaPairDStream<ImageInfo, ImageFeature> sketches) { final ReplicaConnection connF = conn; final IndexingParams paramsF = params; return sketches.transformToPair(new Function<JavaPairRDD<ImageInfo, ImageFeature>, JavaPairRDD<ImageFeature, ImageFeature>>() { /** * */ private static final long serialVersionUID = 1L; @Override public JavaPairRDD<ImageFeature, ImageFeature> call(JavaPairRDD<ImageInfo, ImageFeature> v1) throws Exception { return queryFeatures(connF, paramsF, v1); } }); }
private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() { return new TransformEvaluator<AvroIO.Read.Bound<T>>() { @Override public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) { String pattern = transform.getFilepattern(); JavaSparkContext jsc = context.getSparkContext(); @SuppressWarnings("unchecked") JavaRDD<AvroKey<T>> avroFile = (JavaRDD<AvroKey<T>>) (JavaRDD<?>) jsc.newAPIHadoopFile(pattern, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, new Configuration()).keys(); JavaRDD<WindowedValue<T>> rdd = avroFile.map( new Function<AvroKey<T>, T>() { @Override public T call(AvroKey<T> key) { return key.datum(); } }).map(WindowingHelpers.<T>windowFunction()); context.setOutputRDD(transform, rdd); } }; }
private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() { return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() { @Override public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) { String pattern = transform.getFilepattern(); JavaSparkContext jsc = context.getSparkContext(); @SuppressWarnings ("unchecked") JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern, transform.getFormatClass(), transform.getKeyClass(), transform.getValueClass(), new Configuration()); JavaRDD<WindowedValue<KV<K, V>>> rdd = file.map(new Function<Tuple2<K, V>, KV<K, V>>() { @Override public KV<K, V> call(Tuple2<K, V> t2) throws Exception { return KV.of(t2._1(), t2._2()); } }).map(WindowingHelpers.<KV<K, V>>windowFunction()); context.setOutputRDD(transform, rdd); } }; }
private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() { return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() { @Override public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) { StreamingEvaluationContext sec = (StreamingEvaluationContext) context; JavaStreamingContext jssc = sec.getStreamingContext(); Class<K> keyClazz = transform.getKeyClass(); Class<V> valueClazz = transform.getValueClass(); Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass(); Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass(); Map<String, String> kafkaParams = transform.getKafkaParams(); Set<String> topics = transform.getTopics(); JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz, valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics); JavaDStream<WindowedValue<KV<K, V>>> inputStream = inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() { @Override public KV<K, V> call(Tuple2<K, V> t2) throws Exception { return KV.of(t2._1(), t2._2()); } }).map(WindowingHelpers.<KV<K, V>>windowFunction()); sec.setStream(transform, inputStream); } }; }
/** * A function wrapper for converting a byte array pair to a key-value pair, where * values are {@link Iterable}. * * @param keyCoder Coder to deserialize keys. * @param valueCoder Coder to deserialize values. * @param <K> The type of the key being deserialized. * @param <V> The type of the value being deserialized. * @return A function that accepts a pair of byte arrays and returns a key-value pair. */ static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>> fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) { return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() { @Override public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) { return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder), Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() { @Override public V apply(byte[] bytes) { return fromByteArray(bytes, valueCoder); } })); } }; }
private List<ItemScore> similarItems(final List<double[]> recentProductFeatures, Model model, Query query) { JavaRDD<ItemScore> itemScores = model.getIndexItemFeatures().map(new Function<Tuple2<Integer, Tuple2<String, double[]>>, ItemScore>() { @Override public ItemScore call(Tuple2<Integer, Tuple2<String, double[]>> element) throws Exception { double similarity = 0.0; for (double[] recentFeature : recentProductFeatures) { similarity += cosineSimilarity(element._2()._2(), recentFeature); } return new ItemScore(element._2()._1(), similarity); } }); itemScores = validScores(itemScores, query.getWhitelist(), query.getBlacklist(), query.getCategories(), model.getItems(), query.getUserEntityId()); return sortAndTake(itemScores, query.getNumber()); }
private JavaRDD<ItemScore> validScores(JavaRDD<ItemScore> all, final Set<String> whitelist, final Set<String> blacklist, final Set<String> categories, final Map<String, Item> items, String userEntityId) { final Set<String> seenItemEntityIds = seenItemEntityIds(userEntityId); final Set<String> unavailableItemEntityIds = unavailableItemEntityIds(); return all.filter(new Function<ItemScore, Boolean>() { @Override public Boolean call(ItemScore itemScore) throws Exception { Item item = items.get(itemScore.getItemEntityId()); return (item != null && passWhitelistCriteria(whitelist, item.getEntityId()) && passBlacklistCriteria(blacklist, item.getEntityId()) && passCategoryCriteria(categories, item) && passUnseenCriteria(seenItemEntityIds, item.getEntityId()) && passAvailabilityCriteria(unavailableItemEntityIds, item.getEntityId())); } }); }
public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkStreamsSampleTrainingApplication"); JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("data/random_2d_training.csv"); JavaRDD<Vector> parsedData = lines.map( new Function<String, Vector>() { @Override public Vector call(String s) { String[] sarray = s.split(","); double[] values = new double[sarray.length]; for (int i = 0; i < sarray.length; i++) { values[i] = Double.parseDouble(sarray[i]); } return Vectors.dense(values); } } ); parsedData.cache(); int numClusters = 10; int numIterations = 20; KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations); clusters.save(jsc.sc(), "etc/kmeans_model"); jsc.close(); }
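A short follow-up sketch (not in the original): the persisted model can be reloaded from the same path with MLlib's static KMeansModel.load, e.g. in a later run with a fresh JavaSparkContext named jsc:

// Illustrative: reload the saved model and inspect its centroids.
KMeansModel reloaded = KMeansModel.load(jsc.sc(), "etc/kmeans_model");
for (Vector center : reloaded.clusterCenters()) {
    System.out.println(center);
}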
public static void main(String[] args) { DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD); // Load data from MySQL JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(sc.sc(), dbConnection, "select * from employees where emp_no >= ? and emp_no <= ?", 10001, 499999, 10, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(Object[].class)); // Convert to JavaRDD JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class)); // Join first name and last name List<String> employeeFullNameList = javaRDD.map(new Function<Object[], String>() { @Override public String call(final Object[] record) throws Exception { return record[2] + " " + record[3]; } }).collect(); for (String fullName : employeeFullNameList) { LOGGER.info(fullName); } }
private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) { final ByteArray ONE = new ByteArray(); Long count = rdd.mapValues(new Function<Object[], Long>() { @Override public Long call(Object[] objects) throws Exception { return (Long) objects[countMeasureIndex]; } }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() { @Override public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22) throws Exception { return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2()); } })._2(); return count; }
private static ArrayList<Tuple2<Integer, Integer>> getResult(List<Tuple2<Integer, Integer>> list) { return Lists.newArrayList(IteratorUtils.merge(list.iterator(), new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o1 - o2; } }, new Function<Iterable<Integer>, Integer>() { @Override public Integer call(Iterable<Integer> v1) throws Exception { int sum = 0; for (Integer integer : v1) { sum += integer; } return sum; } })); }
private void assertFilterDocumentProjectFunction(Function<Tuple2<String, AffMatchDocumentProject>, Boolean> function, Float confidenceLevelThreshold) throws Exception { if (confidenceLevelThreshold != null) { // given float greaterConfidenceLevel = confidenceLevelThreshold+0.1f; // execute & assert assertTrue(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, greaterConfidenceLevel)))); // given float smallerConfidenceLevel = confidenceLevelThreshold-0.1f; // execute & assert assertFalse(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, smallerConfidenceLevel)))); } else { // execute & assert assertTrue(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, 0.0001f)))); } }