Java class org.apache.spark.api.java.function.Function2: example source code
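
org.apache.spark.api.java.function.Function2<T1, T2, R> is the two-argument function interface of Spark's Java API: it extends Serializable and declares a single method, R call(T1 v1, T2 v2) throws Exception. It is the argument type of aggregations such as reduce, fold, reduceByKey, and aggregateByKey. A minimal, self-contained sketch (the class name, app name, and sample data are illustrative, not taken from the projects quoted below):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

public class Function2Demo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Function2Demo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        // Anonymous Function2 summing two values; a plain lambda works too (Java 8+).
        Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        };
        System.out.println(numbers.reduce(sum)); // prints 15
        sc.stop();
    }
}

The project snippets below show this interface in real code.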

Project: big-data-benchmark    File: SparkWordCount.java
public static void main(String[] args) {
    if (args.length != 2) {
        System.err.println("Usage:");
        System.err.println("  SparkWordCount <sourceFile> <targetFile>");
        System.exit(1);
    }

    SparkConf conf = new SparkConf()
            .setAppName("Word Count");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> textFile = sc.textFile(args[0]);
    JavaRDD<String> words = textFile.flatMap(LineIterator::new);
    JavaPairRDD<String, Long> pairs =
            words.mapToPair(s -> new Tuple2<>(s, 1L));
    JavaPairRDD<String, Long> counts =
            pairs.reduceByKey((Function2<Long, Long, Long>) (a, b) -> a + b);

    System.out.println("Starting task..");
    long t = System.currentTimeMillis();
    counts.saveAsTextFile(args[1] + "_" + t);
    System.out.println("Time=" + (System.currentTimeMillis() - t));
}
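Note: the (Function2<Long, Long, Long>) cast above is redundant, since reduceByKey already expects a Function2; the same step compiles with a bare lambda (a sketch against the same pairs RDD):

    JavaPairRDD<String, Long> counts = pairs.reduceByKey((a, b) -> a + b);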
Project: incubator-sdap-mudrod    File: CrawlerDetection.java
void checkByRateInParallel() throws InterruptedException, IOException {

    JavaRDD<String> userRDD = getUserRDD(this.httpType);
    LOG.info("Original User count: {}", userRDD.count());

    int userCount = 0;
    userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
      ESDriver tmpES = new ESDriver(props);
      tmpES.createBulkProcessor();
      List<Integer> realUserNums = new ArrayList<>();
      while (iterator.hasNext()) {
        String s = iterator.next();
        Integer realUser = checkByRate(tmpES, s);
        realUserNums.add(realUser);
      }
      tmpES.destroyBulkProcessor();
      tmpES.close();
      return realUserNums.iterator();
    }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);

    LOG.info("User count: {}", Integer.toString(userCount));
  }
Project: Sparkathon    File: PassingFunctions.java
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Big Apple").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }

    class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }

    JavaRDD<String> lines = sc.textFile("src/main/resources/compressed.gz");
    JavaRDD<Integer> lineLengths = lines.map(new GetLength());
    // Printing an RDD
    lineLengths.foreach(x-> System.out.println(x));

    int totalLength = lineLengths.reduce(new Sum());

    System.out.println(totalLength);
}
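Because each interface in org.apache.spark.api.java.function declares a single abstract method, the named GetLength and Sum classes above can be replaced by a method reference and a lambda (a sketch over the same lines RDD):

    JavaRDD<Integer> lineLengths = lines.map(String::length);
    int totalLength = lineLengths.reduce((a, b) -> a + b);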
Project: mudrod    File: CrawlerDetection.java
void checkByRateInParallel() throws InterruptedException, IOException {

    JavaRDD<String> userRDD = getUserRDD(this.httpType);
    LOG.info("Original User count: {}", userRDD.count());

    int userCount = 0;
    userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
      ESDriver tmpES = new ESDriver(props);
      tmpES.createBulkProcessor();
      List<Integer> realUserNums = new ArrayList<>();
      while (iterator.hasNext()) {
        String s = iterator.next();
        Integer realUser = checkByRate(tmpES, s);
        realUserNums.add(realUser);
      }
      tmpES.destroyBulkProcessor();
      tmpES.close();
      return realUserNums.iterator();
    }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);

    LOG.info("User count: {}", Integer.toString(userCount));
  }
Project: spark-newsreel-recommender    File: JavaMain.java
/**
 * Starts the Spark context given a valid configuration, then runs a test
 * map-reduce job so that all Spark workers can fetch dependencies in advance.
 */
private static void startContext(int numOfWorkers) {
    JavaSparkContext sc = SharedService.getContext();

    for (int i=0; i<numOfWorkers;i++) {
        final int threadnumber = i;
        new Thread(){
            @Override
            public void run() {
                ImmutableList<Integer> range =
                        ContiguousSet.create(Range.closed(1, 5), DiscreteDomain.integers()).asList();

                JavaRDD<Integer> data = sc.parallelize(range).repartition(numOfWorkers);
                Integer result = data.reduce((Function2<Integer, Integer, Integer>)
                        (v1, v2) -> v1 + v2);
                if (result == 15)
                    log.info("successfully tested worker"+threadnumber);
                else
                    log.warn("worker "+threadnumber+" yielded a false result: "
                            +result+" (should be 15)");
            }
        }.start();
    }
}
Project: vn.vitk    File: Tokenizer.java
/**
 * Counts the number of non-space characters in this data set, where underscores
 * are treated as spaces. This utility method is used to check the tokenization result.
 * @param lines an RDD of text lines
 * @return the number of non-space characters
 */
int numCharacters(JavaRDD<String> lines) {
    JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
        private static final long serialVersionUID = -2189399343462982586L;
        @Override
        public Integer call(String line) throws Exception {
            line = line.replaceAll("[\\s_]+", "");
            return line.length();
        }
    });
    return lengths.reduce(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = -8438072946884289401L;

        @Override
        public Integer call(Integer e0, Integer e1) throws Exception {
            return e0 + e1;
        }
    });
}
Project: kylin    File: SparkCubingByLayer.java
private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) {
    final ByteArray ONE = new ByteArray();
    Long count = rdd.mapValues(new Function<Object[], Long>() {
        @Override
        public Long call(Object[] objects) throws Exception {
            return (Long) objects[countMeasureIndex];
        }
    }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
        @Override
        public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22)
                throws Exception {
            return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2());
        }
    })._2();
    return count;
}
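Note: the dummy ONE key exists only because reduce must return the same Tuple2 type it consumes. A sketch of an equivalent formulation that sums the measure values directly (assuming the same rdd and countMeasureIndex):

    Long count = rdd.values()
            .map(objects -> (Long) objects[countMeasureIndex])
            .reduce((a, b) -> a + b);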
Project: SHMACK    File: WordCount.java
@SuppressWarnings("serial")
@Override
public SortedCounts<String> execute(final JavaSparkContext spark) {
    final JavaRDD<String> textFile = spark.textFile(inputFile);
    final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(final String rawJSON) throws TwitterException {
            final Status tweet = TwitterObjectFactory.createStatus(rawJSON);
            String text = tweet.getText();
            return Arrays.asList(text.split(" "));
        }
    });
    final JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(final String s) {
            return new Tuple2<String, Integer>(s.toLowerCase(), 1);
        }
    });
    final JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(final Integer a, final Integer b) {
            return a + b;
        }
    });
    return SortedCounts.create(counts);
}
Project: GeoSpark    File: PolygonRDD.java
/**
 * Computes the union of all polygons in this RDD.
 *
 * @return the union polygon
 */
public Polygon PolygonUnion() {
    Polygon result = this.rawSpatialRDD.reduce(new Function2<Polygon, Polygon, Polygon>() {
        public Polygon call(Polygon v1, Polygon v2) {
            //Reduce precision in JTS to avoid TopologyException
            PrecisionModel pModel = new PrecisionModel();
            GeometryPrecisionReducer pReducer = new GeometryPrecisionReducer(pModel);
            Geometry p1 = pReducer.reduce(v1);
            Geometry p2 = pReducer.reduce(v2);
            //Union two polygons
            Geometry polygonGeom = p1.union(p2);
            Coordinate[] coordinates = polygonGeom.getCoordinates();
            ArrayList<Coordinate> coordinateList = new ArrayList<Coordinate>(Arrays.asList(coordinates));
            // Close the ring by appending the first coordinate again
            Coordinate firstCoordinate = coordinateList.get(0);
            coordinateList.add(firstCoordinate);
            Coordinate[] coordinatesClosed = new Coordinate[coordinateList.size()];
            coordinatesClosed = coordinateList.toArray(coordinatesClosed);
            GeometryFactory fact = new GeometryFactory();
            LinearRing linear = new GeometryFactory().createLinearRing(coordinatesClosed);
            Polygon polygon = new Polygon(linear, null, fact);
            //Return the two polygon union result
            return polygon;
        }
    });
    return result;
}
Project: GeoSpark    File: JoinQuery.java
private static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> countGeometriesByKey(JavaPairRDD<U, T> input) {
    return input.aggregateByKey(
        0L,
        new Function2<Long, T, Long>() {

            @Override
            public Long call(Long count, T t) throws Exception {
                return count + 1;
            }
        },
        new Function2<Long, Long, Long>() {

            @Override
            public Long call(Long count1, Long count2) throws Exception {
                return count1 + count2;
            }
        });
}
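Here aggregateByKey takes a zero value and two Function2 instances: the first folds one geometry into a per-partition count, the second merges partial counts across partitions. A sketch of the same call with lambdas (Java 8+):

    return input.aggregateByKey(0L, (count, t) -> count + 1, (c1, c2) -> c1 + c2);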
Project: GeoSpark    File: ShapefileRDD.java
/**
 * Reads and merges the bounding boxes of all shapefiles under the user's input path;
 * returns null if there are none.
 */
public BoundBox getBoundBox(JavaSparkContext sc, String inputPath){
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox>  bounds = sc.newAPIHadoopFile(
            inputPath,
            BoundaryInputFormat.class,
            Long.class,
            BoundBox.class,
            sc.hadoopConfiguration()
    );
    // merge all into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>(){
        @Override
        public BoundBox call(BoundBox box1, BoundBox box2) throws Exception {
            return BoundBox.mergeBoundBox(box1, box2);
        }
    });
    // if there is a result assign it to variable : boundBox
    if(bounds.count() > 0){
        return new BoundBox(bounds.collect().get(0)._2());
    }else return null;
}
Project: GeoSpark    File: ShapefileReader.java
/**
 * Reads and merges the bounding boxes of all shapefiles under the user's input path;
 * returns null if there are none.
 */
public static BoundBox readBoundBox(JavaSparkContext sc, String inputPath){
    // read bound boxes into memory
    JavaPairRDD<Long, BoundBox>  bounds = sc.newAPIHadoopFile(
            inputPath,
            BoundaryInputFormat.class,
            Long.class,
            BoundBox.class,
            sc.hadoopConfiguration()
    );
    // merge all into one
    bounds = bounds.reduceByKey(new Function2<BoundBox, BoundBox, BoundBox>(){
        @Override
        public BoundBox call(BoundBox box1, BoundBox box2) throws Exception {
            return BoundBox.mergeBoundBox(box1, box2);
        }
    });
    // if there is a result assign it to variable : boundBox
    if(bounds.count() > 0){
        return new BoundBox(bounds.collect().get(0)._2());
    }else return null;
}
Project: stratio-connector-deep    File: QueryExecutorTest.java
@Before
public void before() throws Exception {

    queryExecutor = new QueryExecutor(deepContext, deepConnectionHandler);

    // Stubs
    when(deepConnectionHandler.getConnection(CLUSTERNAME_CONSTANT.getName())).thenReturn(deepConnection);
    when(deepConnection.getExtractorConfig()).thenReturn(extractorConfig);
    when(extractorConfig.clone()).thenReturn(extractorConfig);
    when(deepContext.createJavaRDD(any(ExtractorConfig.class))).thenReturn(singleRdd);
    when(deepContext.createHDFSRDD(any(ExtractorConfig.class))).thenReturn(rdd);
    when(rdd.toJavaRDD()).thenReturn(singleRdd);
    when(singleRdd.collect()).thenReturn(generateListOfCells(3));
    when(singleRdd.filter(any(Function.class))).thenReturn(singleRdd);
    when(singleRdd.map(any(FilterColumns.class))).thenReturn(singleRdd);
    when(singleRdd.mapToPair(any(PairFunction.class))).thenReturn(pairRdd);
    when(singleRdd.keyBy(any(Function.class))).thenReturn(pairRdd);
    when(pairRdd.join(pairRdd)).thenReturn(joinedRdd);
    when(pairRdd.reduceByKey(any(Function2.class))).thenReturn(pairRdd);
    when(pairRdd.map(any(Function.class))).thenReturn(singleRdd);
    when(joinedRdd.map(any(JoinCells.class))).thenReturn(singleRdd);
}
Project: incubator-blur    File: BlurBulkLoadSparkProcessor.java
@Override
protected Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction() {
  return new Function2<JavaPairRDD<String, RowMutation>, Time, Void>() {
    // Blur Thrift Client
    @Override
    public Void call(JavaPairRDD<String, RowMutation> rdd, Time time) throws Exception {
      Iface client = getBlurClient();
      for (Tuple2<String, RowMutation> tuple : rdd.collect()) {
        if (tuple != null) {
          try {
            RowMutation rm = tuple._2;
            // Index using enqueue mutate call
            client.enqueueMutate(rm);
          } catch (Exception ex) {
            LOG.error("Unknown error while trying to call enqueueMutate.", ex);
            throw ex;
          }
        }
      }
      return null;
    }
  };
}
Project: AbstractRendering    File: GlyphsetRDD.java
@Override public Rectangle2D bounds() {
    final JavaRDD<Rectangle2D> rects;
    if (partitions) {
        rects = base.mapPartitions(
            new FlatMapFunction<Iterator<Glyph<G,I>>,Rectangle2D>() {
                public Iterable<Rectangle2D> call(Iterator<Glyph<G, I>> glyphs) throws Exception {
                    ArrayList<Glyph<G,I>> glyphList = Lists.newArrayList(new IterableIterator<>(glyphs));
                    return Arrays.asList(Util.bounds(glyphList));
                }});
    } else {
        rects = base.map(new Function<Glyph<G,I>,Rectangle2D>() {
            public Rectangle2D call(Glyph<G,I> glyph) throws Exception {
                return Util.boundOne(glyph.shape());
            }});
    }

    return rects.reduce(new Function2<Rectangle2D, Rectangle2D,Rectangle2D>() {
        public Rectangle2D call(Rectangle2D left, Rectangle2D right) throws Exception {
            return Util.bounds(left, right);
        }
    });

}
Project: incubator-sdap-mudrod    File: SessionExtractor.java
/**
 * bulidDataQueryRDD: converts a click-stream list into dataset/queries pairs.
 *
 * @param clickstreamRDD
 *          click stream data
 * @param downloadWeight
 *          weight of download behavior
 * @return JavaPairRDD whose keys are dataset short names and whose values are query lists
 */
public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(ClickStream click) throws Exception {
      List<String> query = new ArrayList<>();
      // important! download behavior is given a higher weight than viewing behavior
      boolean download = click.isDownload();
      int weight = 1;
      if (download) {
        weight = downloadWeight;
      }
      for (int i = 0; i < weight; i++) {
        query.add(click.getKeyWords());
      }

      return new Tuple2<>(click.getViewDataset(), query);
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });
}
Project: incubator-sdap-mudrod    File: SessionExtractor.java
public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Double> call(ClickStream click) throws Exception {
      double rate = 1;
      boolean download = click.isDownload();
      if (download) {
        rate = 2;
      }

      String sessionID = click.getSessionID();
      String user = sessionID.split("@")[0];

      return new Tuple2<>(user + "," + click.getViewDataset(), rate);
    }
  }).reduceByKey(new Function2<Double, Double, Double>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Double call(Double v1, Double v2) throws Exception {
      return v1 >= v2 ? v1 : v2;
    }
  });
}
Project: incubator-sdap-mudrod    File: SessionGenerator.java
public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {

    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);

    int sessionCount = 0;
    sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      /**
       *
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> sessionNums = new ArrayList<>();
        while (arg0.hasNext()) {
          String s = arg0.next();
          Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
          sessionNums.add(sessionNum);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return sessionNums.iterator();
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      /**
       *
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
  }
Project: incubator-sdap-mudrod    File: SessionStatistic.java
public void processSessionInParallel() throws InterruptedException, IOException {

    List<String> sessions = this.getSessions();
    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);

    int sessionCount = 0;
    sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      @Override
      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> sessionNums = new ArrayList<Integer>();
        sessionNums.add(0);
        while (arg0.hasNext()) {
          String s = arg0.next();
          Integer sessionNum = processSession(tmpES, s);
          sessionNums.add(sessionNum);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return sessionNums.iterator();
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    LOG.info("Final Session count: {}", Integer.toString(sessionCount));
  }
Project: incubator-sdap-mudrod    File: MetadataExtractor.java
/**
 * buildMetadataRDD: converts a metadata list to a JavaPairRDD
 *
 * @param es        an Elasticsearch client node instance
 * @param sc        spark context
 * @param index     index name of the log processing application
 * @param metadatas metadata list
 * @return PairRDD in which each key is a metadata short name and each value is the
 * term list extracted from the metadata variables
 */
protected JavaPairRDD<String, List<String>> buildMetadataRDD(ESDriver es, JavaSparkContext sc, String index, List<PODAACMetadata> metadatas) {
  JavaRDD<PODAACMetadata> metadataRDD = sc.parallelize(metadatas);
  JavaPairRDD<String, List<String>> metadataTermsRDD = metadataRDD.mapToPair(new PairFunction<PODAACMetadata, String, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(PODAACMetadata metadata) throws Exception {
      return new Tuple2<String, List<String>>(metadata.getShortName(), metadata.getAllTermList());
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<String>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });

  return metadataTermsRDD;
}
Project: spark-streaming-direct-kafka    File: Functions.java
/**
 * @param <T> element type
 * @return a function that returns the second of two values
 */
public static <T> Function2<T,T,T> last() {
    return new Function2<T,T,T>() {
        @Override
        public T call(T current, T next) {
            return next;
        }
    };
}
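A typical use is collapsing duplicate records per key while keeping whichever value the reduction sees last (a sketch assuming a hypothetical JavaPairRDD<String, String> named updates; note that reduceByKey makes no ordering guarantee, so "last" means last in reduction order, not in arrival order):

    JavaPairRDD<String, String> latest = updates.reduceByKey(Functions.<String>last());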
Project: gspark    File: JavaLogQuery.java
public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);

    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
      @Override
      public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
        return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
      }
    });

    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
      @Override
      public Stats call(Stats stats, Stats stats2) {
        return stats.merge(stats2);
      }
    });

    List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
    for (Tuple2<?,?> t : output) {
      System.out.println(t._1() + "\t" + t._2());
    }
    jsc.stop();
  }
Project: mudrod    File: SessionExtractor.java
/**
 * bulidDataQueryRDD: converts a click-stream list into dataset/queries pairs.
 *
 * @param clickstreamRDD
 *          click stream data
 * @param downloadWeight
 *          weight of download behavior
 * @return JavaPairRDD whose keys are dataset short names and whose values are query lists
 */
public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(ClickStream click) throws Exception {
      List<String> query = new ArrayList<>();
      // important! download behavior is given a higher weight than viewing behavior
      boolean download = click.isDownload();
      int weight = 1;
      if (download) {
        weight = downloadWeight;
      }
      for (int i = 0; i < weight; i++) {
        query.add(click.getKeyWords());
      }

      return new Tuple2<>(click.getViewDataset(), query);
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });
}
Project: mudrod    File: SessionExtractor.java
public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Double> call(ClickStream click) throws Exception {
      double rate = 1;
      boolean download = click.isDownload();
      if (download) {
        rate = 2;
      }

      String sessionID = click.getSessionID();
      String user = sessionID.split("@")[0];

      return new Tuple2<>(user + "," + click.getViewDataset(), rate);
    }
  }).reduceByKey(new Function2<Double, Double, Double>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Double call(Double v1, Double v2) throws Exception {
      return v1 >= v2 ? v1 : v2;
    }
  });
}
Project: mudrod    File: SessionGenerator.java
public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {

    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);

    int sessionCount = 0;
    sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      /**
       *
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> sessionNums = new ArrayList<>();
        while (arg0.hasNext()) {
          String s = arg0.next();
          Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
          sessionNums.add(sessionNum);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return sessionNums.iterator();
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      /**
       *
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
  }
Project: mudrod    File: SessionStatistic.java
public void processSessionInParallel() throws InterruptedException, IOException {

    List<String> sessions = this.getSessions();
    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);

    int sessionCount = 0;
    sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
      @Override
      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
        ESDriver tmpES = new ESDriver(props);
        tmpES.createBulkProcessor();
        List<Integer> sessionNums = new ArrayList<Integer>();
        sessionNums.add(0);
        while (arg0.hasNext()) {
          String s = arg0.next();
          Integer sessionNum = processSession(tmpES, s);
          sessionNums.add(sessionNum);
        }
        tmpES.destroyBulkProcessor();
        tmpES.close();
        return sessionNums.iterator();
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });

    LOG.info("Final Session count: {}", Integer.toString(sessionCount));
  }
Project: mudrod    File: MetadataExtractor.java
/**
 * buildMetadataRDD: converts a metadata list to a JavaPairRDD
 *
 * @param es        an Elasticsearch client node instance
 * @param sc        spark context
 * @param index     index name of the log processing application
 * @param metadatas metadata list
 * @return PairRDD in which each key is a metadata short name and each value is the
 * term list extracted from the metadata variables
 */
protected JavaPairRDD<String, List<String>> buildMetadataRDD(ESDriver es, JavaSparkContext sc, String index, List<PODAACMetadata> metadatas) {
  JavaRDD<PODAACMetadata> metadataRDD = sc.parallelize(metadatas);
  JavaPairRDD<String, List<String>> metadataTermsRDD = metadataRDD.mapToPair(new PairFunction<PODAACMetadata, String, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(PODAACMetadata metadata) throws Exception {
      return new Tuple2<String, List<String>>(metadata.getShortName(), metadata.getAllTermList());
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<String>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });

  return metadataTermsRDD;
}
Project: incubator-pulsar    File: SparkStreamingPulsarReceiverExample.java
public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("pulsar-spark");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    ClientConfiguration clientConf = new ClientConfiguration();
    ConsumerConfiguration consConf = new ConsumerConfiguration();
    String url = "pulsar://localhost:6650/";
    String topic = "persistent://sample/standalone/ns1/topic1";
    String subs = "sub1";

    JavaReceiverInputDStream<byte[]> msgs = jssc
            .receiverStream(new SparkStreamingPulsarReceiver(clientConf, consConf, url, topic, subs));

    JavaDStream<Integer> isContainingPulsar = msgs.flatMap(new FlatMapFunction<byte[], Integer>() {
        @Override
        public Iterator<Integer> call(byte[] msg) {
            return Arrays.asList(((new String(msg)).indexOf("Pulsar") != -1) ? 1 : 0).iterator();
        }
    });

    JavaDStream<Integer> numOfPulsar = isContainingPulsar.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });

    numOfPulsar.print();

    jssc.start();
    jssc.awaitTermination();
}
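The flatMap-plus-reduce pair above counts, per batch, the messages containing "Pulsar". A slightly more direct sketch of the same computation (assuming the same msgs stream and Java 8+):

    JavaDStream<Integer> numOfPulsar = msgs
            .map(msg -> new String(msg).contains("Pulsar") ? 1 : 0)
            .reduce((i1, i2) -> i1 + i2);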
Project: rheem    File: FunctionCompiler.java
/**
 * Creates an appropriate {@link Function2} for deploying the given {@link ReduceDescriptor}
 * on Apache Spark.
 */
public <T> Function2<T, T, T> compile(ReduceDescriptor<T> descriptor,
                                      SparkExecutionOperator operator,
                                      OptimizationContext.OperatorContext operatorContext,
                                      ChannelInstance[] inputs) {
    final BinaryOperator<T> javaImplementation = descriptor.getJavaImplementation();
    if (javaImplementation instanceof FunctionDescriptor.ExtendedSerializableBinaryOperator) {
        return new ExtendedBinaryOperatorAdapter<>(
                (FunctionDescriptor.ExtendedSerializableBinaryOperator<T>) javaImplementation,
                new SparkExecutionContext(operator, inputs, operatorContext.getOptimizationContext().getIterationNumber())
        );
    } else {
        return new BinaryOperatorAdapter<>(javaImplementation);
    }
}
Project: kite-apps    File: SparkDatasets.java
/**
 * Saves all RDDs in the given DStream to the given view.
 * @param dstream the stream whose RDDs will be saved
 * @param view the target view
 */
public static <T> void save(JavaDStream<T> dstream, final View<T> view) {

  final String uri = view.getUri().toString();

  dstream.foreachRDD(new Function2<JavaRDD<T>, Time, Void>() {
    @Override
    public Void call(JavaRDD<T> rdd, Time time) throws Exception {

      save(rdd, uri);

      return null;
    }
  });
}
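Note: the Function2<JavaRDD<T>, Time, Void> overload of foreachRDD comes from the older Spark Streaming Java API; newer versions replace it with VoidFunction2, which drops the dummy Void return. A sketch of the same save under that assumption:

    dstream.foreachRDD((rdd, time) -> save(rdd, uri));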
Project: learning-spark    File: SimpleAggregation.java
public static void main(String[] args) {
  SparkConf sparkConf = new SparkConf().setAppName("Ad Provider Aggregation");
  JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);


  // Read the source file
  JavaRDD<String> adInput = sparkContext.textFile(args[0]);

  // Parse each CSV line and emit an (ad provider, 1) pair
  JavaPairRDD<String, Integer> adsRDD = adInput.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
      CSVReader csvReader = new CSVReader(new StringReader(s));
      // let's skip error handling here for simplicity
      try {
        String[] adDetails = csvReader.readNext();
        return new Tuple2<String, Integer>(adDetails[1], 1);
      } catch (IOException e) {
        e.printStackTrace();
        // noop
      }
      // Need to explore more on error handling
      return new Tuple2<String, Integer>("-1", 1);
    }
  });

  JavaPairRDD<String, Integer> adsAggregated = adsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
      return integer + integer2;
    }
  });

  adsAggregated.saveAsTextFile("./output/ads-aggregated-provider");
}
Project: adam-plugins    File: JavaCountAlignments.java
@Override
public JavaRDD<Tuple2<String, Integer>> run(final JavaADAMContext ac, final JavaRDD<AlignmentRecord> recs, final String args) {

    JavaRDD<String> contigNames = recs.map(new Function<AlignmentRecord, String>() {
            @Override
            public String call(final AlignmentRecord rec) {
                return rec.getReadMapped() ? rec.getContigName() : "unmapped";
            }
        });

    JavaPairRDD<String, Integer> counts = contigNames.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(final String contigName) {
                return new Tuple2<String, Integer>(contigName, Integer.valueOf(1));
            }
        });

    JavaPairRDD<String, Integer> reducedCounts = counts.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(final Integer value0, final Integer value1) {
                return Integer.valueOf(value0.intValue() + value1.intValue());
            }
        });

    // todo:  seems like there should be a more direct way
    return JavaRDD.fromRDD(reducedCounts.rdd(), null);
}
Project: adam-plugins    File: JavaCountAlignmentsPerRead.java
@Override
public JavaRDD<Tuple2<String, Integer>> run(final JavaADAMContext ac, final JavaRDD<AlignmentRecord> recs, final String args) {

    JavaRDD<String> contigNames = recs.map(new Function<AlignmentRecord, String>() {
            @Override
            public String call(final AlignmentRecord rec) {
                return rec.getReadMapped() ? rec.getReadName() : "unmapped";
            }
        });

    JavaPairRDD<String, Integer> counts = contigNames.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(final String readName) {
                return new Tuple2<String, Integer>(readName, Integer.valueOf(1));
            }
        });

    JavaPairRDD<String, Integer> reducedCounts = counts.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(final Integer value0, final Integer value1) {
                return Integer.valueOf(value0.intValue() + value1.intValue());
            }
        });

    // todo:  seems like there should be a more direct way
    return JavaRDD.fromRDD(reducedCounts.rdd(), null);
}
Project: DDF    File: MLSupporter.java
@Override
public long[][] getConfusionMatrix(IModel model, double threshold) throws DDFException {
  SparkDDF ddf = (SparkDDF) this.getDDF();
  SparkDDF predictions = (SparkDDF) ddf.ML.applyModel(model, true, false);

  // Now get the underlying RDD to compute
  JavaRDD<double[]> yTrueYPred = (JavaRDD<double[]>) predictions.getJavaRDD(double[].class);
  final double threshold1 = threshold;
  long[] cm = yTrueYPred.map(new Function<double[], long[]>() {
    @Override
    public long[] call(double[] params) {
      byte isPos = toByte(params[0] > threshold1);
      byte predPos = toByte(params[1] > threshold1);

      long[] result = new long[] { 0L, 0L, 0L, 0L };
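      // The index encodes (actual, predicted): 0 = TN, 1 = FP, 2 = FN, 3 = TP.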
      result[isPos << 1 | predPos] = 1L;
      return result;
    }
  }).reduce(new Function2<long[], long[], long[]>() {
    @Override
    public long[] call(long[] a, long[] b) {
      return new long[] { a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3] };
    }
  });

  return new long[][] { new long[] { cm[3], cm[2] }, new long[] { cm[1], cm[0] } };
}
Project: learning-spark-examples    File: BasicSum.java
public static void main(String[] args) throws Exception {
    String master;
    if (args.length > 0) {
        master = args[0];
    } else {
        master = "local";
    }
    JavaSparkContext sc = new JavaSparkContext(
        master, "basicmap", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    Integer result = rdd.fold(0, new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer x, Integer y) { return x + y; }
    });
    System.out.println(result);
}
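Unlike reduce, fold takes an explicit zero value, and Spark applies that zero once per partition as well as when merging partition results, so it must be the identity of the operation (0 for addition). A sketch with the same rdd, using 1 as the identity for multiplication:

    Integer product = rdd.fold(1, (x, y) -> x * y);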
Project: Test_Projects    File: Streaming101.java
public static void main(String[] args) {

    // Create the context with a 10 second batch size
    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming102");
    //SparkConf sparkConf = new SparkConf().setMaster("spark://10.204.100.206:7077").setAppName("Streaming102");
    sparkConf.setJars(new String[] { "target\\original-TestProjects-1.0-SNAPSHOT.jar" });
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    String folder = "./stream/";
    if(args.length == 1){
        folder = args[0];
    }

    JavaDStream<String> lines = ssc.textFileStream(folder);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
          System.out.println(x);
          return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
    });

    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
  }
Project: Test_Projects    File: Streaming102.java
public static void main(String[] args) {

    // Create the context with a 10 second batch size
    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming101");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));


    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost",9999, StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
          System.out.println(x);
          return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
    });

    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
  }
Project: stratio-connector-deep    File: QueryExecutorTest.java
@Test
public void simpleProjectAndSelectWithGroupByQueryTest() throws UnsupportedException, ExecutionException {

    // Input data
    List<LogicalStep> stepList = new ArrayList<>();
    Project project = createProject(CLUSTERNAME_CONSTANT, TABLE1_CONSTANT);

    GroupBy groupBy = createGroupBy();
    project.setNextStep(groupBy);

    groupBy.setNextStep(createSelect());

    // One single initial step
    stepList.add(project);

    LogicalWorkflow logicalWorkflow = new LogicalWorkflow(stepList);

    // Execution
    queryExecutor.executeWorkFlow(logicalWorkflow);

    // Assertions
    verify(deepContext, times(1)).createJavaRDD(any(ExtractorConfig.class));
    verify(singleRdd, times(0)).filter(any(Function.class));
    verify(singleRdd, times(0)).mapToPair(any(MapKeyForJoin.class));
    verify(singleRdd, times(0)).mapToPair(any(MapKeyForJoin.class));
    verify(pairRdd, times(0)).join(pairRdd);
    verify(joinedRdd, times(0)).map(any(JoinCells.class));
    verify(singleRdd, times(1)).map(any(Function.class));
    verify(joinedRdd, times(0)).map(any(Function.class));
    verify(singleRdd, times(1)).keyBy(any(Function.class));
    verify(pairRdd, times(1)).reduceByKey(any(Function2.class));
    verify(pairRdd, times(1)).map(any(Function.class));

}
Project: MFIBlocking    File: JavaLogQuery.java
public static void main(String[] args) throws Exception {
  if (args.length == 0) {
    System.err.println("Usage: JavaLogQuery <master> [logFile]");
    System.exit(1);
  }

  JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
    System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));

  JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);

  JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
    @Override
    public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
      return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
    }
  });

  JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
    @Override
    public Stats call(Stats stats, Stats stats2) throws Exception {
      return stats.merge(stats2);
    }
  });

  List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
  for (Tuple2<?, ?> t : output) {
    System.out.println(t._1 + "\t" + t._2);
  }
  System.exit(0);
}
Project: Building-Data-Streaming-Applications-with-Apache-Kafka    File: KafkaReceiverWordCountJava.java
public static void main(String[] args) throws Exception {
    String zkQuorum = "localhost:2181";
    String groupName = "stream";
    int numThreads = 3;
    String topicsName = "test1";
    SparkConf sparkConf = new SparkConf().setAppName("WordCountKafkaStream");

    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000));

    Map<String, Integer> topicToBeUsedBySpark = new HashMap<>();
    String[] topics = topicsName.split(",");
    for (String topic : topics) {
        topicToBeUsedBySpark.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> streamMessages =
            KafkaUtils.createStream(javaStreamingContext, zkQuorum, groupName, topicToBeUsedBySpark);

    JavaDStream<String> lines = streamMessages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String x) {
            return Arrays.asList(WORD_DELIMETER.split(x)).iterator();
        }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });

    wordCounts.print();
    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
}