Java 类 org.apache.spark.api.java.Optional 实例源码

项目:Apache-Spark-2x-for-Java-Developers    文件:WordCountSocketStateful.java   
/**
 * Stateful word count over a text socket.
 *
 * <p>Lines arriving on 10.0.75.1:9000 are split on single spaces and counted per 1-second
 * batch. {@code mapWithState} then adds each batch count to a per-word running total. The
 * state is seeded so that "hello" and "world" start at 1. Each batch's (word, total) pairs
 * are printed via {@code print()}.
 *
 * <p>NOTE(review): the Hadoop home and checkpoint paths are hard-coded Windows paths.
 *
 * @param args unused
 * @throws Exception propagated from starting or awaiting the streaming context
 */
public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir", "E:\\hadoop");

    SparkConf conf = new SparkConf()
            .setAppName("WordCountSocketEx")
            .setMaster("local[*]");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
    // mapWithState needs a checkpoint directory to persist its state.
    ssc.checkpoint("E:\\hadoop\\checkpoint");

    // Initial state handed to mapWithState: these words start with a running total of 1.
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> seedPairs =
            Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> seedState = ssc.sparkContext().parallelizePairs(seedPairs);

    JavaReceiverInputDStream<String> lines =
            ssc.socketTextStream("10.0.75.1", 9000, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> tokens = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

    // Occurrences of each token within the current batch.
    JavaPairDStream<String, Integer> batchCounts = tokens
            .mapToPair(token -> new Tuple2<>(token, 1))
            .reduceByKey((a, b) -> a + b);

    // Add this batch's count (0 if absent) to the stored total, store it, and emit it.
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> accumulate =
            (word, batchCount, runningTotal) -> {
                int previous = runningTotal.exists() ? runningTotal.get() : 0;
                int updated = previous + batchCount.orElse(0);
                runningTotal.update(updated);
                return new Tuple2<>(word, updated);
            };

    // Running totals, refreshed for the words seen in each batch.
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> totals =
            batchCounts.mapWithState(StateSpec.function(accumulate).initialState(seedState));

    totals.print();
    ssc.start();
    ssc.awaitTermination();
}
项目:Apache-Spark-2x-for-Java-Developers    文件:StateFulProcessingExample.java   
/**
 * Stateful per-flight aggregation over a socket stream of JSON {@code FlightDetails} records.
 *
 * <p>Each record is deserialized with Jackson and keyed by {@code getFlightId()}. Per flight,
 * {@code mapWithState} keeps the list of records received so far. It emits
 * (flightId, mean of {@code getTemperature()} over that list) whenever the function runs
 * for the flight.
 *
 * <p>State for a flight is removed when a record with {@code isLanded()} arrives. Otherwise
 * it is kept until the 5-minute state timeout fires. On timeout, Spark forbids
 * {@code update()}/{@code remove()}, so the function only emits the final average and lets
 * Spark discard the state.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while awaiting termination
 */
public static void main(String[] args) throws InterruptedException {

    System.setProperty("hadoop.home.dir", "C:\\softwares\\Winutils");

    SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Stateful Streaming Example")
            .config("spark.sql.warehouse.dir", "file:////C:/Users/sgulati/spark-warehouse").getOrCreate();

    JavaStreamingContext jssc = new JavaStreamingContext(new JavaSparkContext(sparkSession.sparkContext()),
            Durations.milliseconds(1000));
    JavaReceiverInputDStream<String> inStream = jssc.socketTextStream("10.204.136.223", 9999);
    // mapWithState needs a checkpoint directory to persist its state.
    jssc.checkpoint("C:\\Users\\sgulati\\spark-checkpoint");

    // NOTE(review): this builds a new ObjectMapper for every record, which is relatively
    // expensive; consider one mapper per partition (mapPartitions) if throughput matters.
    JavaDStream<FlightDetails> flightDetailsStream = inStream.map(x -> {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(x, FlightDetails.class);
    });

    JavaPairDStream<String, FlightDetails> flightDetailsPairStream = flightDetailsStream
            .mapToPair(f -> new Tuple2<String, FlightDetails>(f.getFlightId(), f));

    Function3<String, Optional<FlightDetails>, State<List<FlightDetails>>, Tuple2<String, Double>> mappingFunc = (
            flightId, curFlightDetail, state) -> {
        // Work on a copy rather than appending to the list held in state. Spark may still
        // reference that object from earlier state snapshots (e.g. on recomputation).
        List<FlightDetails> details = state.exists() ? new ArrayList<>(state.get()) : new ArrayList<>();

        boolean isLanded = false;
        if (curFlightDetail.isPresent()) {
            FlightDetails current = curFlightDetail.get();
            details.add(current);
            isLanded = current.isLanded();
        }

        // NOTE(review): this averages getTemperature(); if an average *speed* was intended,
        // the getter is wrong - confirm against FlightDetails.
        double avgTemperature = details.stream().mapToDouble(f -> f.getTemperature()).average().orElse(0.0);

        // When the 5-minute timeout fires, the function is called without a new record and
        // Spark rejects both update() and remove() on the timing-out state. Spark discards
        // that state itself, so only the result is emitted in that case.
        if (!state.isTimingOut()) {
            if (isLanded) {
                state.remove();
            } else {
                state.update(details);
            }
        }
        return new Tuple2<String, Double>(flightId, avgTemperature);
    };

    JavaMapWithStateDStream<String, FlightDetails, List<FlightDetails>, Tuple2<String, Double>> streamWithState = flightDetailsPairStream
            .mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(5)));

    streamWithState.print();
    jssc.start();
    jssc.awaitTermination();
}
项目:Apache-Spark-2x-for-Java-Developers    文件:WordCountRecoverableEx.java   
/**
 * Builds, without starting, a local streaming context that runs a stateful socket word count.
 *
 * <p>Text lines from {@code ip:port} are split on single spaces and counted per 1-second
 * batch. {@code mapWithState} adds those counts to per-word running totals, seeded so that
 * "hello" and "world" start at 1. The updated totals are printed each batch via
 * {@code print()}. The context's checkpoint directory is set to
 * {@code checkpointDirectory}, which mapWithState requires.
 *
 * @param ip host to read text lines from
 * @param port port on {@code ip}
 * @param checkpointDirectory directory used for streaming checkpoints
 * @return the configured, not-yet-started streaming context
 */
protected static JavaStreamingContext createContext(String ip, int port, String checkpointDirectory) {
    JavaStreamingContext context = new JavaStreamingContext(
            new SparkConf().setAppName("WordCountRecoverableEx").setMaster("local[*]"),
            Durations.seconds(1));
    context.checkpoint(checkpointDirectory);

    // Initial state handed to mapWithState.
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> startingCounts = Arrays.asList(
            new Tuple2<>("hello", 1),
            new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> startingState = context.sparkContext().parallelizePairs(startingCounts);

    JavaReceiverInputDStream<String> socketLines =
            context.socketTextStream(ip, port, StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> tokens = socketLines.flatMap(text -> Arrays.asList(text.split(" ")).iterator());

    // Occurrences of each token within the current batch.
    JavaPairDStream<String, Integer> perBatch = tokens
            .mapToPair(token -> new Tuple2<>(token, 1))
            .reduceByKey((left, right) -> left + right);

    // Fold the batch count (0 if absent) into the stored total, then emit the new total.
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> addToTotal =
            (word, increment, total) -> {
                int newTotal = (total.exists() ? total.get() : 0) + increment.orElse(0);
                total.update(newTotal);
                return new Tuple2<>(word, newTotal);
            };

    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> runningTotals =
            perBatch.mapWithState(StateSpec.function(addToTotal).initialState(startingState));
    runningTotals.print();
    return context;
}