Example source code for the Java class org.apache.spark.api.java.function.Function

Project: vn.vitk    File: NGramBuilder.java
/**
 * Creates an n-gram data frame from text lines.
 * @param lines the input text lines
 * @return an n-gram data frame.
 */
DataFrame createNGramDataFrame(JavaRDD<String> lines) {
    JavaRDD<Row> rows = lines.map(new Function<String, Row>(){
        private static final long serialVersionUID = -4332903997027358601L;

        @Override
        public Row call(String line) throws Exception {
            return RowFactory.create(Arrays.asList(line.split("\\s+")));
        }
    });
    StructType schema = new StructType(new StructField[] {
            new StructField("words",
                    DataTypes.createArrayType(DataTypes.StringType), false,
                    Metadata.empty()) });
    DataFrame wordDF = new SQLContext(jsc).createDataFrame(rows, schema);
    // build a bigram language model
    NGram transformer = new NGram().setInputCol("words")
            .setOutputCol("ngrams").setN(2);
    DataFrame ngramDF = transformer.transform(wordDF);
    ngramDF.show(10, false);
    return ngramDF;
}
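A hypothetical caller sketch for the method above; the input path is made up, and the JavaSparkContext field jsc is assumed to be the same field the method already uses:

// Sketch only: how createNGramDataFrame might be invoked (path is hypothetical).
JavaRDD<String> lines = jsc.textFile("data/sentences.txt");
DataFrame bigrams = createNGramDataFrame(lines);
// Inspect a few of the generated bigrams without truncation.
bigrams.select("ngrams").show(5, false);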
Project: vn.vitk    File: DependencyParser.java
/**
 * Parses a list of PoS-tagged sentences, one per line, and writes the result to an output
 * file in a specified output format.
 * @param jsc
 * @param sentences
 * @param outputFileName
 * @param outputFormat
 */
public void parse(JavaSparkContext jsc, List<String> sentences, String outputFileName, OutputFormat outputFormat) {
    JavaRDD<String> input = jsc.parallelize(sentences);
    JavaRDD<Sentence> sents = input.map(new TaggedLineToSentenceFunction());
    JavaRDD<DependencyGraph> graphs = sents.map(new ParsingFunction());
    JavaRDD<Row> rows = graphs.map(new Function<DependencyGraph, Row>() {
        private static final long serialVersionUID = -812004521983071103L;
        public Row call(DependencyGraph graph) {
            return RowFactory.create(graph.getSentence().toString(), graph.dependencies());
        }
    });
    StructType schema = new StructType(new StructField[]{
        new StructField("sentence", DataTypes.StringType, false, Metadata.empty()), 
        new StructField("dependency", DataTypes.StringType, false, Metadata.empty())
    });
    SQLContext sqlContext = new SQLContext(jsc);
    DataFrame df = sqlContext.createDataFrame(rows, schema);

    if (outputFormat == OutputFormat.TEXT)  
        df.select("dependency").write().text(outputFileName);
    else 
        df.repartition(1).write().json(outputFileName);
}
Project: Java-Data-Science-Cookbook    File: KMeansClusteringMlib.java
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("K-means Example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse data
    String path = "data/km-data.txt";
    JavaRDD<String> data = sc.textFile(path);
    JavaRDD<Vector> parsedData = data.map(
      new Function<String, Vector>() {
        public Vector call(String s) {
          String[] sarray = s.split(" ");
          double[] values = new double[sarray.length];
          for (int i = 0; i < sarray.length; i++)
            values[i] = Double.parseDouble(sarray[i]);
          return Vectors.dense(values);
        }
      }
    );
    parsedData.cache();

    // Cluster the data into two classes using KMeans
    int numClusters = 2;
    int numIterations = 20;
    KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

    // Evaluate clustering by computing Within Set Sum of Squared Errors
    double WSSSE = clusters.computeCost(parsedData.rdd());
    System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
}
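As a follow-up sketch, the trained model can also assign new points to clusters; the values below are illustrative and must match the dimensionality of km-data.txt:

// Sketch: assign a new point to its nearest learned cluster (point values are illustrative).
Vector newPoint = Vectors.dense(0.3, 0.9);
int clusterIndex = clusters.predict(newPoint);
System.out.println("Nearest cluster index: " + clusterIndex
    + ", center: " + clusters.clusterCenters()[clusterIndex]);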
Project: Spark-Machine-Learning-Modules    File: PredictUnit.java
public static JavaRDD<Tuple2<Object, Object>> predictForOutput_LogisticRegressionModel(LogisticRegressionModel model, JavaRDD<LabeledPoint> data){
    JavaRDD<Tuple2<Object, Object>> FeaturesAndPrediction = data.map(
      new Function<LabeledPoint, Tuple2<Object, Object>>() {
        private static final long serialVersionUID = 1L;
        public Tuple2<Object, Object> call(LabeledPoint p) {
          Double prediction = model.predict(p.features());
          return new Tuple2<Object, Object>(p.features(), prediction);
        }
      }
    );
    return FeaturesAndPrediction;    
}
Project: fst-bench    File: JavaSleep.java
public static void main(String[] args) throws Exception {

    if (args.length != 1) {
      System.err.println("Usage: JavaSleep <seconds>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaSleep");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Integer parallel = sparkConf.getInt("spark.default.parallelism", ctx.defaultParallelism());
    Integer seconds = Integer.parseInt(args[0]);

    Integer[] init_val = new Integer[parallel];
    Arrays.fill(init_val, seconds);

    JavaRDD<Integer> workload = ctx.parallelize(Arrays.asList(init_val), parallel).map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer s) throws InterruptedException {
        Thread.sleep(s * 1000);
        return 0;
      }
    });

    List<Integer> output = workload.collect();
    ctx.stop();
  }
Project: SparkJNI    File: StreamUtils.java
static List<StreamVectors> performJavaStream(String appName, List<StreamVectors> input, int noIters) {
    JavaRDD<StreamVectors> streamVectorsJavaRDD = ExampleUtils.getSparkContext(appName).parallelize(input);
    for (int i = 0; i < noIters; i++) {
        streamVectorsJavaRDD = streamVectorsJavaRDD.map(new Function<StreamVectors, StreamVectors>() {
            @Override
            public StreamVectors call(StreamVectors streamVectors) throws Exception {
                streamVectors.setStartRun(System.nanoTime());
                for (int idx = 0; idx < streamVectors.A.length; idx++) {
                    streamVectors.C[idx] = streamVectors.A[idx];
                }
                for (int idx = 0; idx < streamVectors.A.length; idx++) {
                    streamVectors.B[idx] = streamVectors.scaling_constant * streamVectors.C[idx];
                }
                for (int idx = 0; idx < streamVectors.A.length; idx++) {
                    streamVectors.C[idx] = streamVectors.A[idx] + streamVectors.B[idx];
                }
                for (int idx = 0; idx < streamVectors.A.length; idx++) {
                    streamVectors.A[idx] = streamVectors.B[idx] + streamVectors.scaling_constant * streamVectors.C[idx];
                }
                streamVectors.setEndRun(System.nanoTime());
                return streamVectors;
            }
        });
    }
    return streamVectorsJavaRDD.collect();
}
Project: SparkJNI    File: StreamMain.java
private List<StreamVectors> performJavaStream(String appName, List<StreamVectors> input) {
    return ExampleUtils.getSparkContext(appName).parallelize(input).map(new Function<StreamVectors, StreamVectors>() {
        @Override
        public StreamVectors call(StreamVectors streamVectors) throws Exception {
            streamVectors.setStartRun(System.nanoTime());
            for(int idx = 0; idx < streamVectors.A.length; idx++){
                streamVectors.C[idx] = streamVectors.A[idx];
            }
            for(int idx = 0; idx < streamVectors.A.length; idx++){
                streamVectors.B[idx] = streamVectors.scaling_constant * streamVectors.C[idx];
            }
            for(int idx = 0; idx < streamVectors.A.length; idx++){
                streamVectors.C[idx] = streamVectors.A[idx] + streamVectors.B[idx];
            }
            for(int idx = 0; idx < streamVectors.A.length; idx++){
                streamVectors.A[idx] = streamVectors.B[idx] + streamVectors.scaling_constant * streamVectors.C[idx];
            }
            streamVectors.setEndRun(System.nanoTime());
            return streamVectors;
        }
    }).collect();
}
Project: splice-community-sample-code    File: SaveLogAggRDD.java
@Override
public void call(JavaPairRDD<PublisherGeoKey, AggregationLog> logsRDD) throws Exception {

    if (logsRDD != null) {
        LOG.info(" Data to process in RDD:" + logsRDD.count());

        JavaRDD<AggregationResult> aggResRDD = logsRDD.map(new Function<Tuple2<PublisherGeoKey, AggregationLog>, AggregationResult>() {
            @Override
            public AggregationResult call(
                    Tuple2<PublisherGeoKey, AggregationLog> arg0)
                    throws Exception {
                PublisherGeoKey p = arg0._1;
                AggregationLog a = arg0._2;
                return new AggregationResult(new Timestamp(a.getTimestamp()),
                        p.getPublisher(), p.getGeo(), a.getImps(),
                        (int) a.getUniquesHll().estimatedSize(),
                        a.getSumBids() / a.getImps());
            }
        });
        LOG.info(" Call Data Process Partition");
        aggResRDD.foreachPartition(new SaveLogAggPartition());
    } else
        LOG.error("Data to process:" + 0);
}
Project: Sparkathon    File: PassingFunctions.java
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Big Apple").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }

    class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }

    JavaRDD<String> lines = sc.textFile("src/main/resources/compressed.gz");
    JavaRDD<Integer> lineLengths = lines.map(new GetLength());
    // Printing an RDD
    lineLengths.foreach(x-> System.out.println(x));

    int totalLength = lineLengths.reduce(new Sum());

    System.out.println(totalLength);
}
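For comparison, the same pipeline can be written with Java 8 lambdas instead of named Function classes; this is a sketch that reuses the sc context from the example above:

// Lambda version of the same computation (sketch; reuses the JavaSparkContext sc).
JavaRDD<String> lambdaLines = sc.textFile("src/main/resources/compressed.gz");
int total = lambdaLines.map(s -> s.length()).reduce((a, b) -> a + b);
System.out.println(total);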
Project: rheem    File: FunctionCompiler.java
/**
 * Create an appropriate {@link Function}-based predicate for deploying the given {@link PredicateDescriptor}
 * on Apache Spark.
 *
 * @param predicateDescriptor describes the function
 * @param operator            that executes the {@link Function}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 * @param operatorContext     contains optimization information for the {@code operator}
 * @param inputs              that feed the {@code operator}; only required if the {@code descriptor} describes an {@link ExtendedFunction}
 */
public <Type> Function<Type, Boolean> compile(
        PredicateDescriptor<Type> predicateDescriptor,
        SparkExecutionOperator operator,
        OptimizationContext.OperatorContext operatorContext,
        ChannelInstance[] inputs) {
    final Predicate<Type> javaImplementation = predicateDescriptor.getJavaImplementation();
    if (javaImplementation instanceof PredicateDescriptor.ExtendedSerializablePredicate) {
        return new ExtendedPredicateAdapater<>(
                (PredicateDescriptor.ExtendedSerializablePredicate<Type>) javaImplementation,
                new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
        );
    } else {
        return new PredicateAdapter<>(javaImplementation);
    }
}
Project: vn.vitk    File: Tagger.java
private JavaRDD<String> toTaggedSentence(DataFrame output) {
    return output.javaRDD().map(new Function<Row, String>() {
        private static final long serialVersionUID = 4208643510231783579L;
        @Override
        public String call(Row row) throws Exception {
            String[] tokens = row.getString(0).trim().split("\\s+");
            String[] tags = row.getString(1).trim().split("\\s+");
            if (tokens.length != tags.length) {
                System.err.println("Incompatible lengths!");
                return null;
            }
            StringBuilder sb = new StringBuilder(64);
            for (int j = 0; j < tokens.length; j++) {
                sb.append(tokens[j]);
                sb.append('/');
                sb.append(tags[j]);
                sb.append(' ');
            }
            return sb.toString().trim();
        }
    });
}
Project: vn.vitk    File: Tokenizer.java
/**
 * Counts the number of non-space characters in this data set. This utility method 
 * is used to check the tokenization result.
 * @param lines
 * @return number of characters
 */
int numCharacters(JavaRDD<String> lines) {
    JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
        private static final long serialVersionUID = -2189399343462982586L;
        @Override
        public Integer call(String line) throws Exception {
            line = line.replaceAll("[\\s_]+", "");
            return line.length();
        }
    });
    return lengths.reduce(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = -8438072946884289401L;

        @Override
        public Integer call(Integer e0, Integer e1) throws Exception {
            return e0 + e1;
        }
    });
}
Project: vn.vitk    File: DependencyParser.java
/**
 * Parses all sentences in an input file, one per line, and writes the flattened
 * dependency tuples to the console.
 * @param jsc
 * @param inputFileName
 */
public void parse(JavaSparkContext jsc, String inputFileName) {
    List<String> sentences = jsc.textFile(inputFileName).collect();
    JavaRDD<String> input = jsc.parallelize(sentences);
    JavaRDD<Sentence> sents = input.map(new TaggedLineToSentenceFunction());
    JavaRDD<DependencyGraph> graphs = sents.map(new ParsingFunction());
    JavaRDD<String> rows = graphs.map(new Function<DependencyGraph, String>() {
        private static final long serialVersionUID = -6021310762521034121L;

        public String call(DependencyGraph graph) {
            return graph.dependencies();
        }
    });
    for (String s : rows.collect()) {
        System.out.println(s);
    }
}
Project: SparkToParquet    File: AppMain.java
public static void main(String[] args) throws IOException {
    Flags.setFromCommandLineArgs(THE_OPTIONS, args);

    // Initialize the Spark configuration.
    SparkConf conf = new SparkConf().setAppName("A SECTONG Application: Apache Log Analysis with Spark");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());
    SQLContext sqlContext = new SQLContext(sc);

    // Initialize parameters
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(Flags.getInstance().getKafka_topic().split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", Flags.getInstance().getKafka_broker());

    // Fetch data from the Kafka stream
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
            StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        private static final long serialVersionUID = 5266880065425088203L;

        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    JavaDStream<ApacheAccessLog> accessLogsDStream = lines.flatMap(line -> {
        List<ApacheAccessLog> list = new ArrayList<>();
        try {
            // Parse each line
            list.add(ApacheAccessLog.parseFromLogLine(line));
            return list;
        } catch (RuntimeException e) {
            return list;
        }
    }).cache();

    accessLogsDStream.foreachRDD(rdd -> {

        // rdd to DataFrame
        DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
        // Write out as Parquet files
        df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());

        return null;
    });

    // Start the streaming application
    jssc.start(); // start the computation
    jssc.awaitTermination(); // wait for termination
}
Project: beam    File: TranslationUtils.java
/** {@link KV} to pair flatmap function. */
public static <K, V> PairFlatMapFunction<Iterator<KV<K, V>>, K, V> toPairFlatMapFunction() {
  return new PairFlatMapFunction<Iterator<KV<K, V>>, K, V>() {
    @Override
    public Iterator<Tuple2<K, V>> call(final Iterator<KV<K, V>> itr) {
      final Iterator<Tuple2<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<KV<K, V>, Tuple2<K, V>>() {

                @Override
                public Tuple2<K, V> apply(KV<K, V> kv) {
                  return new Tuple2<>(kv.getKey(), kv.getValue());
                }
              });
      return outputItr;
    }
  };
}
Project: beam    File: TranslationUtils.java
/** A pair to {@link KV} flatmap function. */
static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() {
  return new FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>>() {
    @Override
    public Iterator<KV<K, V>> call(Iterator<Tuple2<K, V>> itr) {
      final Iterator<KV<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<Tuple2<K, V>, KV<K, V>>() {
                @Override
                public KV<K, V> apply(Tuple2<K, V> t2) {
                  return KV.of(t2._1(), t2._2());
                }
              });
      return outputItr;
    }
  };
}
Project: beam    File: TranslationUtils.java
/**
 * A utility method that adapts {@link PairFunction} to a {@link PairFlatMapFunction} with an
 * {@link Iterator} input. This is particularly useful because it allows functions written
 * for mapToPair to be reused in flatMapToPair functions.
 *
 * @param pairFunction the {@link PairFunction} to adapt.
 * @param <T> the input type.
 * @param <K> the output key type.
 * @param <V> the output value type.
 * @return a {@link PairFlatMapFunction} that accepts an {@link Iterator} as an input and applies
 *     the {@link PairFunction} on every element.
 */
public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction(
    final PairFunction<T, K, V> pairFunction) {
  return new PairFlatMapFunction<Iterator<T>, K, V>() {

    @Override
    public Iterator<Tuple2<K, V>> call(Iterator<T> itr) throws Exception {
      final Iterator<Tuple2<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<T, Tuple2<K, V>>() {

                @Override
                public Tuple2<K, V> apply(T t) {
                  try {
                    return pairFunction.call(t);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              });
      return outputItr;
    }
  };
}
Project: beam    File: TranslationUtils.java
/**
 * A utility method that adapts {@link Function} to a {@link FlatMapFunction} with an {@link
 * Iterator} input. This is particularly useful because it allows functions written for map
 * functions to be reused in flatMap functions.
 *
 * @param func the {@link Function} to adapt.
 * @param <InputT> the input type.
 * @param <OutputT> the output type.
 * @return a {@link FlatMapFunction} that accepts an {@link Iterator} as an input and applies the
 *     {@link Function} on every element.
 */
public static <InputT, OutputT>
    FlatMapFunction<Iterator<InputT>, OutputT> functionToFlatMapFunction(
        final Function<InputT, OutputT> func) {
  return new FlatMapFunction<Iterator<InputT>, OutputT>() {

    @Override
    public Iterator<OutputT> call(Iterator<InputT> itr) throws Exception {
      final Iterator<OutputT> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<InputT, OutputT>() {

                @Override
                public OutputT apply(InputT t) {
                  try {
                    return func.call(t);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              });
      return outputItr;
    }
  };
}
Project: beam    File: CoderHelpers.java
/**
 * A function wrapper for converting a byte array pair to a key-value pair, where
 * values are {@link Iterable}.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K>   The type of the key being deserialized.
 * @param <V>   The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
public static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
    fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
    @Override
    public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
        Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
          @Override
          public V apply(byte[] bytes) {
            return fromByteArray(bytes, valueCoder);
          }
        }));
    }
  };
}
Project: net.jgp.labs.spark    File: RowProcessor.java
@Override
public void call(JavaRDD<String> rdd) throws Exception {

    JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
        private static final long serialVersionUID = 5167089361335095997L;

        @Override
        public Row call(String msg) {
            Row row = RowFactory.create(msg);
            return row;
        }
    });
    // Create Schema
    StructType schema = DataTypes.createStructType(
            new StructField[] { DataTypes.createStructField("Message", DataTypes.StringType, true) });

    // Get Spark 2.0 session
    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
    msgDataFrame.show();
}
Project: es-hadoop-v2.2.0    File: AbstractJavaEsSparkTest.java
public void testEsRDDZReadJson() throws Exception {
    String target = "spark-test/java-basic-json-read";

    RestUtils.touch("spark-test");
    RestUtils.postData(target, "{\"message\" : \"Hello World\",\"message_date\" : \"2014-05-25\"}".getBytes());
    RestUtils.postData(target, "{\"message\" : \"Goodbye World\",\"message_date\" : \"2014-05-25\"}".getBytes());
    RestUtils.refresh("spark-test");

    JavaRDD<String> esRDD = JavaEsSpark.esJsonRDD(sc, target).values();
    System.out.println(esRDD.collect());
    JavaRDD<String> messages = esRDD.filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String string) throws Exception {
            return string.contains("message");
        }
    });

    // jdk8
    // esRDD.filter(m -> m.contains("message"));

    assertThat((int) messages.count(), is(2));
    System.out.println(messages.take(10));
    System.out.println(messages);
}
Project: spark-transformers    File: IfZeroVectorBridgeTest.java
public DataFrame createDF(JavaRDD<Tuple2<Vector, String>> rdd) {

        // Generate the schema programmatically
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("vectorized_count", new VectorUDT(), true));
        fields.add(DataTypes.createStructField("product_title", DataTypes.StringType, true));

        StructType schema = DataTypes.createStructType(fields);
        // Convert records of the RDD to Rows.
        JavaRDD<Row> rowRDD = rdd.map(
                new Function<Tuple2<Vector, String>, Row>() {
                    public Row call(Tuple2<Vector, String> record) {
                        return RowFactory.create(record._1(), record._2());
                    }
                });

        return sqlContext.createDataFrame(rowRDD, schema);
    }
Project: spark_log_data    File: LogDataWebinar.java
private static JavaDStream<String> createDStream(JavaStreamingContext javaStreamingContext, String hostName, int port) {

        JavaReceiverInputDStream<SparkFlumeEvent> flumeEventStream = FlumeUtils.createStream(javaStreamingContext, hostName, port);

        // Set different storage level 
//        flumeEventStream.persist(StorageLevel.MEMORY_AND_DISK_SER());

        JavaDStream<String> dStream = flumeEventStream.map(new Function<SparkFlumeEvent, String>() {

            @Override
            public String call(SparkFlumeEvent sparkFlumeEvent) throws Exception {

                byte[] bodyArray = sparkFlumeEvent.event().getBody().array();
                String logTxt = new String(bodyArray, "UTF-8");
                logger.info(logTxt);

                return logTxt;
            }
        });
        // dStream.print();

        return dStream;
    }
Project: Spark-Course    File: SparkApp.java
public  void accessTableWitRDD(JavaSparkContext sc){

        JavaRDD<String> cassandraRDD = javaFunctions(sc).cassandraTable("todolist", "todolisttable")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("\nReading Data from todolisttable in Cassandra with a RDD: \n" + StringUtils.join(cassandraRDD.toArray(), "\n"));


        // javaFunctions(cassandraRDD).writerBuilder("todolist", "todolisttable", mapToRow(String.class)).saveToCassandra();
    }
Project: chronix.spark    File: ChronixRDD.java
/**
 * Transformation: Joins the time series according to their identity.
 *
 * @return joined time series
 */
public ChronixRDD joinChunks() {
    JavaPairRDD<MetricTimeSeriesKey, Iterable<MetricTimeSeries>> groupRdd
            = this.groupBy(MetricTimeSeriesKey::new);

    JavaPairRDD<MetricTimeSeriesKey, MetricTimeSeries> joinedRdd
            = groupRdd.mapValues((Function<Iterable<MetricTimeSeries>, MetricTimeSeries>) mtsIt -> {
        MetricTimeSeriesOrdering ordering = new MetricTimeSeriesOrdering();
        List<MetricTimeSeries> orderedChunks = ordering.immutableSortedCopy(mtsIt);
        MetricTimeSeries result = null;
        for (MetricTimeSeries mts : orderedChunks) {
            if (result == null) {
                result = new MetricTimeSeries
                        .Builder(mts.getMetric())
                        .attributes(mts.attributes()).build();
            }
            result.addAll(mts.getTimestampsAsArray(), mts.getValuesAsArray());
        }
        return result;
    });

    JavaRDD<MetricTimeSeries> resultJavaRdd =
            joinedRdd.map((Tuple2<MetricTimeSeriesKey, MetricTimeSeries> mtTuple) -> mtTuple._2);

    return new ChronixRDD(resultJavaRdd);
}
Project: kite-apps    File: KafkaOutput.java
/**
 * Writes the content of the stream to the Kafka topic
 * behind this producer.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
    value="SE_INNER_CLASS", justification="Uses state from outer class.")
public void write (JavaDStream<T> stream) {

  stream.foreachRDD(new Function<JavaRDD<T>, Void>() {
    @Override
    public Void call(JavaRDD<T> rdd) throws Exception {

      write(rdd);

      return null;
    }
  });
}
Project: learning-spark    File: LineCountWithFiltering.java
public static void main(String[] args) {
  SparkConf sparkConf = new SparkConf().setAppName("Line Count With Filtering");
  JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);


  // Read the source file
  JavaRDD<String> input = sparkContext.textFile(args[0]);

  // RDD is immutable, let's create a new RDD which doesn't contain empty lines
  // the function needs to return true for the records to be kept
  JavaRDD<String> nonEmptyLines = input.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) throws Exception {
      if(s == null || s.trim().length() < 1) {
        return false;
      }
      return true;
    }
  });

  long count = nonEmptyLines.count();

  System.out.println(String.format("Total lines in %s is %d",args[0],count));
}
Project: spark-utils    File: SparkAvroLoaderTest.java
private void assertMapFunction(Function<Tuple2<AvroKey<Country>, NullWritable>, Country> function) throws Exception {
    Country country = Country.newBuilder()
            .setId(3)
            .setIso("PL")
            .setName("Poland")
            .build();
    AvroKey<Country> avroKey = new AvroKey<Country>(country);

    Tuple2<AvroKey<Country>, NullWritable> pair = new Tuple2<>(avroKey, NullWritable.get());

    Country retCountry = function.call(pair);

    assertEquals(Integer.valueOf(3), retCountry.getId());
    assertEquals("PL", retCountry.getIso());
    assertEquals("Poland", retCountry.getName());
}
Project: near-image-replica-detection    File: MemoryPersistenceSystem.java
@Override
public JavaPairDStream<ImageFeature, ImageFeature> queryFeaturesStreaming(
        ReplicaConnection conn, IndexingParams params, JavaPairDStream<ImageInfo, ImageFeature> sketches) {
    final ReplicaConnection connF = conn;
    final IndexingParams paramsF = params;
    return sketches.transformToPair(new Function<JavaPairRDD<ImageInfo, ImageFeature>, JavaPairRDD<ImageFeature, ImageFeature>>() {
        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        @Override
        public JavaPairRDD<ImageFeature, ImageFeature> call(JavaPairRDD<ImageInfo, ImageFeature> v1) throws Exception {
            return queryFeatures(connF, paramsF, v1);
        }
    });
}
Project: spark-dataflow    File: TransformTranslator.java
private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() {
  return new TransformEvaluator<AvroIO.Read.Bound<T>>() {
    @Override
    public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) {
      String pattern = transform.getFilepattern();
      JavaSparkContext jsc = context.getSparkContext();
      @SuppressWarnings("unchecked")
      JavaRDD<AvroKey<T>> avroFile = (JavaRDD<AvroKey<T>>) (JavaRDD<?>)
          jsc.newAPIHadoopFile(pattern,
                               AvroKeyInputFormat.class,
                               AvroKey.class, NullWritable.class,
                               new Configuration()).keys();
      JavaRDD<WindowedValue<T>> rdd = avroFile.map(
          new Function<AvroKey<T>, T>() {
            @Override
            public T call(AvroKey<T> key) {
              return key.datum();
            }
          }).map(WindowingHelpers.<T>windowFunction());
      context.setOutputRDD(transform, rdd);
    }
  };
}
Project: spark-dataflow    File: TransformTranslator.java
private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
  return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() {
    @Override
    public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
      String pattern = transform.getFilepattern();
      JavaSparkContext jsc = context.getSparkContext();
      @SuppressWarnings ("unchecked")
      JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern,
          transform.getFormatClass(),
          transform.getKeyClass(), transform.getValueClass(),
          new Configuration());
      JavaRDD<WindowedValue<KV<K, V>>> rdd =
          file.map(new Function<Tuple2<K, V>, KV<K, V>>() {
        @Override
        public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
          return KV.of(t2._1(), t2._2());
        }
      }).map(WindowingHelpers.<KV<K, V>>windowFunction());
      context.setOutputRDD(transform, rdd);
    }
  };
}
Project: spark-dataflow    File: StreamingTransformTranslator.java
private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
  return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
    @Override
    public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
      StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
      JavaStreamingContext jssc = sec.getStreamingContext();
      Class<K> keyClazz = transform.getKeyClass();
      Class<V> valueClazz = transform.getValueClass();
      Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
      Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
      Map<String, String> kafkaParams = transform.getKafkaParams();
      Set<String> topics = transform.getTopics();
      JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
              valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
      JavaDStream<WindowedValue<KV<K, V>>> inputStream =
          inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
        @Override
        public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
          return KV.of(t2._1(), t2._2());
        }
      }).map(WindowingHelpers.<KV<K, V>>windowFunction());
      sec.setStream(transform, inputStream);
    }
  };
}
Project: spark-dataflow    File: CoderHelpers.java
/**
 * A function wrapper for converting a byte array pair to a key-value pair, where
 * values are {@link Iterable}.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K>   The type of the key being deserialized.
 * @param <V>   The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
    fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
    @Override
    public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
        Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
          @Override
          public V apply(byte[] bytes) {
            return fromByteArray(bytes, valueCoder);
          }
        }));
    }
  };
}
Project: predictionio-template-java-ecom-recommender    File: Algorithm.java
private List<ItemScore> similarItems(final List<double[]> recentProductFeatures, Model model, Query query) {
    JavaRDD<ItemScore> itemScores = model.getIndexItemFeatures().map(new Function<Tuple2<Integer, Tuple2<String, double[]>>, ItemScore>() {
        @Override
        public ItemScore call(Tuple2<Integer, Tuple2<String, double[]>> element) throws Exception {
            double similarity = 0.0;
            for (double[] recentFeature : recentProductFeatures) {
                similarity += cosineSimilarity(element._2()._2(), recentFeature);
            }

            return new ItemScore(element._2()._1(), similarity);
        }
    });

    itemScores = validScores(itemScores, query.getWhitelist(), query.getBlacklist(), query.getCategories(), model.getItems(), query.getUserEntityId());
    return sortAndTake(itemScores, query.getNumber());
}
Project: predictionio-template-java-ecom-recommender    File: Algorithm.java
private JavaRDD<ItemScore> validScores(JavaRDD<ItemScore> all, final Set<String> whitelist, final Set<String> blacklist, final Set<String> categories, final Map<String, Item> items, String userEntityId) {
    final Set<String> seenItemEntityIds = seenItemEntityIds(userEntityId);
    final Set<String> unavailableItemEntityIds = unavailableItemEntityIds();

    return all.filter(new Function<ItemScore, Boolean>() {
        @Override
        public Boolean call(ItemScore itemScore) throws Exception {
            Item item = items.get(itemScore.getItemEntityId());

            return (item != null
                    && passWhitelistCriteria(whitelist, item.getEntityId())
                    && passBlacklistCriteria(blacklist, item.getEntityId())
                    && passCategoryCriteria(categories, item)
                    && passUnseenCriteria(seenItemEntityIds, item.getEntityId())
                    && passAvailabilityCriteria(unavailableItemEntityIds, item.getEntityId()));
        }
    });
}
Project: streamsx.sparkMLLib    File: JavaTrainingApplication.java
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkStreamsSampleTrainingApplication");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    JavaRDD<String> lines = jsc.textFile("data/random_2d_training.csv");
    JavaRDD<Vector> parsedData = lines.map(
      new Function<String, Vector>() {
        @Override
        public Vector call(String s) {
            String[] sarray = s.split(",");
            double[] values = new double[sarray.length];
            for (int i = 0; i < sarray.length; i++) {
                values[i] = Double.parseDouble(sarray[i]);
            }
            return Vectors.dense(values);
        }
      }
    );
    parsedData.cache();

    int numClusters = 10;
    int numIterations = 20;
    KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
    clusters.save(jsc.sc(), "etc/kmeans_model");
    jsc.close();
}
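A minimal sketch of the scoring side, assuming a fresh JavaSparkContext jsc and the same save path used above; the scored point is illustrative and matches the 2-D training data:

// Sketch: reload the persisted model and score a point (the point values are illustrative).
KMeansModel loaded = KMeansModel.load(jsc.sc(), "etc/kmeans_model");
int cluster = loaded.predict(Vectors.dense(0.5, 0.5));
System.out.println("Assigned cluster: " + cluster);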
Project: SparkApps    File: Main.java
public static void main(String[] args) {
    DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);

    // Load data from MySQL
    JdbcRDD<Object[]> jdbcRDD =
            new JdbcRDD<>(sc.sc(), dbConnection, "select * from employees where emp_no >= ? and emp_no <= ?", 10001,
                          499999, 10, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));

    // Convert to JavaRDD
    JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));

    // Join first name and last name
    List<String> employeeFullNameList = javaRDD.map(new Function<Object[], String>() {
        @Override
        public String call(final Object[] record) throws Exception {
            return record[2] + " " + record[3];
        }
    }).collect();

    for (String fullName : employeeFullNameList) {
        LOGGER.info(fullName);
    }
}
Project: kylin    File: SparkCubingByLayer.java
private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) {
    final ByteArray ONE = new ByteArray();
    Long count = rdd.mapValues(new Function<Object[], Long>() {
        @Override
        public Long call(Object[] objects) throws Exception {
            return (Long) objects[countMeasureIndex];
        }
    }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
        @Override
        public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
                throws Exception {
            return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
        }
    })._2();
    return count;
}
Project: kylin    File: IteratorUtilsTest.java
private static ArrayList<Tuple2<Integer, Integer>> getResult(List<Tuple2<Integer, Integer>> list) {

        return Lists.newArrayList(IteratorUtils.merge(list.iterator(), new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                return o1 - o2;
            }
        }, new Function<Iterable<Integer>, Integer>() {
            @Override
            public Integer call(Iterable<Integer> v1) throws Exception {
                int sum = 0;
                for (Integer integer : v1) {
                    sum += integer;
                }
                return sum;
            }
        }));
    }
Project: iis    File: DocumentOrganizationCombinerTest.java
private void assertFilterDocumentProjectFunction(Function<Tuple2<String, AffMatchDocumentProject>, Boolean> function,
        Float confidenceLevelThreshold) throws Exception {
    if (confidenceLevelThreshold != null) {
        // given
        float greaterConfidenceLevel = confidenceLevelThreshold+0.1f;
        // execute & assert
        assertTrue(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, greaterConfidenceLevel))));
        // given
        float smallerConfidenceLevel = confidenceLevelThreshold-0.1f;
        // execute & assert
        assertFalse(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, smallerConfidenceLevel))));
    } else {
        // execute & assert
        assertTrue(function.call(new Tuple2<String, AffMatchDocumentProject>(projectId, new AffMatchDocumentProject(documentId, projectId, 0.0001f))));
    }
}