Java 类org.apache.spark.api.java.StorageLevels 实例源码

项目:searchanalytics-bigdata    文件:SparkStreamServiceImpl.java   
/**
 * Creates a Flume receiver stream on localhost:41111, registers the
 * top-query-strings job on it and starts the streaming context.
 */
@Override
public void startFlumeStream() {
    final JavaDStream<SparkFlumeEvent> events =
            FlumeUtils.createStream(jssc, "localhost", 41111, StorageLevels.MEMORY_AND_DISK);
    final QueryStringJDStreams streams = new QueryStringJDStreams();

    // Top search query strings over the last hour.
    streams.topQueryStringsCountInLastOneHourUsingSparkFlumeEvent(events);

    // Top product views over the last hour.
    // TODO: enable the call below to collect both statistics.
    // streams.topProductViewsCountInLastOneHourUsingSparkFlumeEvent(events);

    jssc.start();
}
项目:Apache-Spark-2x-for-Java-Developers    文件:WordCountTransformOpEx.java   
/**
 * Reads lines from a socket, counts words per one-second batch and prints them,
 * then uses {@code transformToPair} to join every batch with a fixed seed RDD
 * and prints the per-word sum of batch count and seed count.
 *
 * @param args unused
 * @throws Exception if awaiting termination of the streaming context fails
 */
public static void main(String[] args) throws Exception {
    // Point hadoop.home.dir at a local Hadoop directory.
    System.setProperty("hadoop.home.dir", "E:\\hadoop");

    SparkConf conf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
    LogManager.getRootLogger().setLevel(Level.WARN);

    // Seed counts that each batch is joined against.
    List<Tuple2<String, Integer>> seed =
            Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 10));
    JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(seed);

    JavaReceiverInputDStream<String> lines =
            ssc.socketTextStream("10.0.75.1", 9000, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

    JavaPairDStream<String, Integer> wordCounts = words
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((left, right) -> left + right);

    wordCounts.print();

    // Inner join keeps only words present in the seed RDD; the value is batch count + seed count.
    Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>> addSeedCounts =
            batch -> batch.join(initialRDD).mapToPair(
                    joined -> new Tuple2<>(joined._1(), joined._2()._1() + joined._2()._2()));

    JavaPairDStream<String, Integer> joinedDstream = wordCounts.transformToPair(addSeedCounts);

    joinedDstream.print();
    ssc.start();
    ssc.awaitTermination();
}
项目:Apache-Spark-2x-for-Java-Developers    文件:WordCountSocketStateful.java   
/**
 * Stateful socket word count: keeps a running total per word across batches
 * with {@code mapWithState}, seeded from a small initial RDD, and prints the
 * updated totals for every batch.
 *
 * @param args unused
 * @throws Exception if awaiting termination of the streaming context fails
 */
public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir", "E:\\hadoop");

    SparkConf conf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
    ssc.checkpoint("E:\\hadoop\\checkpoint");

    // Initial state handed to mapWithState.
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> seed =
            Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(seed);

    JavaReceiverInputDStream<String> lines =
            ssc.socketTextStream("10.0.75.1", 9000, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

    JavaPairDStream<String, Integer> wordCounts = words
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((left, right) -> left + right);

    // Adds the batch count (0 when absent) to the stored total and stores the new total.
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> accumulate =
            (word, batchCount, state) -> {
                int total = batchCount.orElse(0) + (state.exists() ? state.get() : 0);
                Tuple2<String, Integer> updated = new Tuple2<>(word, total);
                state.update(total);
                return updated;
            };

    // Stream of cumulative counts, refreshed on every batch.
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
            wordCounts.mapWithState(StateSpec.function(accumulate).initialState(initialRDD));

    stateDstream.print();
    ssc.start();
    ssc.awaitTermination();
}
项目:Apache-Spark-2x-for-Java-Developers    文件:WordCountSocketJava8Ex.java   
/**
 * Reads lines from a socket, counts words per one-second batch and prints them,
 * then joins every batch with a fixed seed RDD via {@code transformToPair} and
 * prints the per-word sum of batch count and seed count.
 *
 * <p>The previous version built the joined RDD but discarded it and returned the
 * untouched input RDD, so the "joined" stream merely repeated {@code wordCounts}.
 * RDD transformations return new RDDs; the result must be returned to take effect.
 *
 * @param args unused
 * @throws Exception if awaiting termination of the streaming context fails
 */
public static void main(String[] args) throws Exception {
    // Point hadoop.home.dir at a local Hadoop directory.
    System.setProperty("hadoop.home.dir", "E:\\hadoop");

    SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    // Seed counts that each batch is joined against.
    List<Tuple2<String, Integer>> tuples =
            Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 10));
    JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

    JavaReceiverInputDStream<String> streamingLines =
            streamingContext.socketTextStream("10.0.75.1", 9000, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = streamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());

    JavaPairDStream<String, Integer> wordCounts = words
            .mapToPair(str -> new Tuple2<>(str, 1))
            .reduceByKey((count1, count2) -> count1 + count2);

    wordCounts.print();

    // Inner join keeps only words present in the seed RDD; the value is batch count + seed count.
    // The joined RDD is what the transform returns, not the input RDD.
    Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>> joinWithSeed =
            rdd -> rdd.join(initialRDD).mapToPair(
                    joinedTuple -> new Tuple2<>(
                            joinedTuple._1(), joinedTuple._2()._1() + joinedTuple._2()._2()));

    JavaPairDStream<String, Integer> joinedDstream = wordCounts.transformToPair(joinWithSeed);

    joinedDstream.print();
    streamingContext.start();
    streamingContext.awaitTermination();
}
项目:Apache-Spark-2x-for-Java-Developers    文件:WordCountRecoverableEx.java   
/**
 * Builds (but does not start) a streaming context that keeps a running word
 * count over lines read from the given socket, using {@code mapWithState}
 * seeded with a small initial RDD, and prints the totals for every batch.
 *
 * @param ip                  host of the socket text source
 * @param port                port of the socket text source
 * @param checkpointDirectory directory passed to {@code checkpoint}
 * @return the configured, not yet started, streaming context
 */
protected static JavaStreamingContext createContext(String ip, int port, String checkpointDirectory) {
    SparkConf conf = new SparkConf().setAppName("WordCountRecoverableEx").setMaster("local[*]");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
    ssc.checkpoint(checkpointDirectory);

    // Initial state handed to mapWithState.
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> seed =
            Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(seed);

    JavaReceiverInputDStream<String> lines =
            ssc.socketTextStream(ip, port, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

    JavaPairDStream<String, Integer> wordCounts = words
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((left, right) -> left + right);

    // Adds the batch count (0 when absent) to the stored total and stores the new total.
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> accumulate =
            (word, batchCount, state) -> {
                int total = batchCount.orElse(0) + (state.exists() ? state.get() : 0);
                Tuple2<String, Integer> updated = new Tuple2<>(word, total);
                state.update(total);
                return updated;
            };

    // Stream of cumulative counts, refreshed on every batch.
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
            wordCounts.mapWithState(StateSpec.function(accumulate).initialState(initialRDD));

    stateDstream.print();
    return ssc;
}
项目:Test_Projects    文件:Streaming102.java   
/**
 * Socket word count: reads lines from localhost:9999, echoes each line to
 * stdout, splits it with {@code SPACE} and prints the per-batch word counts.
 *
 * @param args unused
 */
public static void main(String[] args) {

    // Create the context with a 10 second batch interval.
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Streaming101");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));

    // Echoes every received line, then breaks it into words.
    FlatMapFunction<String, String> echoAndSplit = new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String line) {
            System.out.println(line);
            return Lists.newArrayList(SPACE.split(line));
        }
    };

    // Pairs every word with a count of one.
    PairFunction<String, String, Integer> toPair = new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String word) {
            return new Tuple2<String, Integer>(word, 1);
        }
    };

    // Sums the counts of equal words.
    Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer left, Integer right) {
            return left + right;
        }
    };

    JavaReceiverInputDStream<String> lines =
            ssc.socketTextStream("localhost", 9999, StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(echoAndSplit);
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(toPair).reduceByKey(sum);

    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
}
项目:Apache-Spark-2x-for-Java-Developers    文件:WindowBatchInterval.java   
/**
 * Demonstrates how window length and slide interval relate to the batch
 * interval. Word counts are computed over a socket stream with a two-minute
 * batch interval, and several windowed views of those counts are printed.
 *
 * <p>The last two windows are intentionally invalid: Spark Streaming requires
 * window length and slide interval to be multiples of the batch interval, and
 * 5 minutes / 1 minute are not multiples of 2 minutes. Comment them out to run.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Windows-specific property for when Hadoop is not installed or HADOOP_HOME is not set.
    System.setProperty("hadoop.home.dir", "E:\\hadoop");
    SparkConf conf = new SparkConf().setAppName("KafkaExample").setMaster("local[*]");

    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.minutes(2));
    streamingContext.checkpoint("E:\\hadoop\\checkpoint");
    Logger rootLogger = LogManager.getRootLogger();
    rootLogger.setLevel(Level.WARN);

    JavaReceiverInputDStream<String> streamingLines =
            streamingContext.socketTextStream("10.0.75.1", 9000, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = streamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());

    JavaPairDStream<String, Integer> wordCounts = words
            .mapToPair(str -> new Tuple2<>(str, 1))
            .reduceByKey((count1, count2) -> count1 + count2);

    wordCounts.print();
    printWindowCounts(wordCounts.window(Durations.minutes(8)));
    printWindowCounts(wordCounts.window(Durations.minutes(8), Durations.minutes(2)));
    printWindowCounts(wordCounts.window(Durations.minutes(12), Durations.minutes(8)));
    printWindowCounts(wordCounts.window(Durations.minutes(2), Durations.minutes(2)));
    printWindowCounts(wordCounts.window(Durations.minutes(12), Durations.minutes(12)));

    // Comment out these two operations to make the job run: their window/slide
    // durations are not multiples of the two-minute batch interval.
    printWindowCounts(wordCounts.window(Durations.minutes(5), Durations.minutes(2)));
    printWindowCounts(wordCounts.window(Durations.minutes(10), Durations.minutes(1)));

    streamingContext.start();
    try {
        streamingContext.awaitTermination();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers/thread owners can still observe it.
        Thread.currentThread().interrupt();
        rootLogger.warn("Interrupted while awaiting termination of the streaming context", e);
    }
}

/**
 * Counts the distinct (word, count) pairs of a windowed stream and prints each
 * pair with its number of occurrences, prefixed by the current date.
 *
 * @param windowed windowed view of the per-batch word counts
 */
private static void printWindowCounts(JavaPairDStream<String, Integer> windowed) {
    windowed.countByValue().foreachRDD(rdd -> rdd.foreach(entry -> System.out.println(
            new Date() + " ::The window count tag is ::" + entry._1() + " and the val is ::" + entry._2())));
}
项目:lecture-bigdata    文件:StreamDriver.java   
/**
 * Connects to a socket server on localhost:1234 that streams JSON objects,
 * extracts each object's "text" attribute, splits it with {@code SPACE} and
 * prints, per one-second batch, how many of the resulting words contain
 * "worldcup" (case-insensitively).
 *
 * @param args unused
 */
public static void main(String[] args) {
    // One-second batch interval.
    SparkConf conf = new SparkConf().setAppName("TweetServerCount");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

    // Simple server streaming tweets (or any JSON object), one per line.
    JavaReceiverInputDStream<String> lines =
            ssc.socketTextStream("localhost", 1234, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String json) {
            try {
                // "text" holds the tweet body; split it into words.
                String text = new JsonParser().parse(json).getAsJsonObject().get("text").getAsString();
                return Lists.newArrayList(SPACE.split(text));
            } catch (Exception ignored) {
                // The server sometimes sends malformed data (special characters);
                // such records are deliberately replaced by a single empty word.
                return Lists.newArrayList("");
            }
        }
    });

    // Every word maps to the single key "worldcup": 1 when the word contains it, 0 otherwise.
    PairFunction<String, String, Integer> flagWorldcup = new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String word) {
            int hit = word.toLowerCase().contains("worldcup") ? 1 : 0;
            return new Tuple2<String, Integer>("worldcup", hit);
        }
    };

    // Adds up the flags within the one-second batch.
    Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer left, Integer right) {
            return left + right;
        }
    };

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(flagWorldcup).reduceByKey(sum);

    // Print the count for every batch.
    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
}
项目:hbase-downstreamer    文件:JavaNetworkWordCountStoreInHBase.java   
/**
 * Network word count that hands the counts of every one-second batch to a
 * {@code StoreCountsToHBase} instance, partition by partition.
 *
 * @param args {@code <hostname> <port>} of the socket text source; the program
 *             prints a usage message and exits with status 1 when fewer are given
 */
public static void main(String[] args) {
  if (args.length < 2) {
    System.err.println("Usage: JavaNetworkWordCountStoreInHBase <hostname> <port>");
    System.exit(1);
  }
  final String host = args[0];
  final int port = Integer.parseInt(args[1]);

  // One-second batch interval.
  SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCountStoreInHBase");
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  // Ship the keytab named by spark.yarn.keytab to the executors.
  ssc.sparkContext().addFile(sparkConf.get("spark.yarn.keytab"));

  // Splits a line of input into words.
  FlatMapFunction<String, String> split = new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String line) {
      return Lists.newArrayList(SPACE.split(line));
    }
  };

  // Pairs every word with a count of one.
  PairFunction<String, String, Integer> toPair = new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String word) {
      return new Tuple2<String, Integer>(word, 1);
    }
  };

  // Sums the counts of equal words.
  Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer left, Integer right) {
      return left + right;
    }
  };

  // Receive \n-delimited text (e.g. generated by 'nc') from host:port.
  // The storage level used here does not replicate received data, which is only
  // appropriate when running locally; a distributed deployment needs replication
  // for fault tolerance.
  JavaReceiverInputDStream<String> lines =
      ssc.socketTextStream(host, port, StorageLevels.MEMORY_AND_DISK_SER);
  JavaDStream<String> words = lines.flatMap(split);
  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(toPair).reduceByKey(sum);

  final StoreCountsToHBase store = new StoreCountsToHBase(sparkConf);

  // For every batch: record the batch time on the store, then let it consume each partition.
  wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, Integer> batch, Time batchTime) throws IOException {
      store.setTime(batchTime);
      batch.foreachPartition(store);
      return null;
    }
  });

  ssc.start();
  ssc.awaitTermination();
}
项目:hbase-downstreamer    文件:JavaNetworkWordCountStoreInHBase.java   
/**
 * Network word count that hands the counts of every one-second batch to a
 * {@code StoreCountsToHBase} instance, partition by partition.
 *
 * @param args {@code <hostname> <port>} of the socket text source; the program
 *             prints a usage message and exits with status 1 when fewer are given
 */
public static void main(String[] args) {
  if (args.length < 2) {
    System.err.println("Usage: JavaNetworkWordCountStoreInHBase <hostname> <port>");
    System.exit(1);
  }
  final String hostname = args[0];
  final int portNumber = Integer.parseInt(args[1]);

  // One-second batch interval.
  SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCountStoreInHBase");
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  // Ship the keytab named by spark.yarn.keytab to the executors.
  ssc.sparkContext().addFile(sparkConf.get("spark.yarn.keytab"));

  // Splits a line of input into words.
  FlatMapFunction<String, String> tokenizer = new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String text) {
      return Lists.newArrayList(SPACE.split(text));
    }
  };

  // Pairs every word with a count of one.
  PairFunction<String, String, Integer> countOne = new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String token) {
      return new Tuple2<String, Integer>(token, 1);
    }
  };

  // Sums the counts of equal words.
  Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  };

  // Receive \n-delimited text (e.g. generated by 'nc') from hostname:portNumber.
  // The storage level used here does not replicate received data, which is only
  // appropriate when running locally; a distributed deployment needs replication
  // for fault tolerance.
  JavaReceiverInputDStream<String> lines =
      ssc.socketTextStream(hostname, portNumber, StorageLevels.MEMORY_AND_DISK_SER);
  JavaDStream<String> words = lines.flatMap(tokenizer);
  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(countOne).reduceByKey(add);

  final StoreCountsToHBase store = new StoreCountsToHBase(sparkConf);

  // For every batch: record the batch time on the store, then let it consume each partition.
  wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, Integer> counts, Time when) throws IOException {
      store.setTime(when);
      counts.foreachPartition(store);
      return null;
    }
  });

  ssc.start();
  ssc.awaitTermination();
}
项目:MELA    文件:JavaNetworkWordCount.java   
/**
 * Network word count: reads \n-delimited text from the given socket, splits
 * each line with {@code SPACE} and prints the word counts of every batch.
 *
 * @param args {@code <hostname> <port>} of the socket text source; the program
 *             prints a usage message and exits with status 1 when fewer are given
 */
public static void main(String[] args) {
    if (args.length < 2) {
        System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
        System.exit(1);
    }
    final String host = args[0];
    final int port = Integer.parseInt(args[1]);

    // StreamingExamples.setStreamingLogLevels();
    // Five-second batch interval.
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

    // Prints a marker for every received line, then returns its words as a new list.
    FlatMapFunction<String, String> split = new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String line) {
            System.out.println("CHECK POINT ###############");
            List<String> wordsList = new ArrayList<>(Arrays.asList(SPACE.split(line)));
            return wordsList;
        }
    };

    // Pairs every word with a count of one.
    PairFunction<String, String, Integer> toPair = new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String word) {
            return new Tuple2<String, Integer>(word, 1);
        }
    };

    // Sums the counts of equal words.
    Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer left, Integer right) {
            return left + right;
        }
    };

    // The storage level used here does not replicate received data, which is only
    // appropriate when running locally; a distributed deployment needs replication
    // for fault tolerance.
    JavaReceiverInputDStream<String> lines =
            ssc.socketTextStream(host, port, StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(split);
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(toPair).reduceByKey(sum);

    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();

    System.out.println("Task done");
}