Java class org.apache.spark.api.java.function.PairFunction — example source code
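PairFunction<T, K, V> is the functional interface the Spark Java API uses for mapToPair-style transformations: its single method call(T t) returns a scala.Tuple2<K, V>, turning a JavaRDD<T> or JavaDStream<T> into a JavaPairRDD<K, V> or JavaPairDStream<K, V>. The project snippets below show it implemented both as anonymous inner classes and as lambdas or method references. As a minimal, self-contained sketch (not taken from any project below; the master URL and sample data are made up):

// Minimal sketch: map each line to a (firstWord, line) pair.
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("PairFunctionSketch");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.parallelize(Arrays.asList("spark streaming", "hadoop hdfs"));
JavaPairRDD<String, String> byFirstWord = lines.mapToPair(
    (PairFunction<String, String, String>) line -> new Tuple2<>(line.split(" ")[0], line));
System.out.println(byFirstWord.collect());   // [(spark,spark streaming), (hadoop,hadoop hdfs)]
sc.stop();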

Project: incubator-sdap-mudrod    File: MetadataOpt.java
private JavaPairRDD<String, String> parallizeData(SparkDriver spark, List<Tuple2<String, String>> datasetContent) {

    JavaRDD<Tuple2<String, String>> datasetContentRDD = spark.sc.parallelize(datasetContent);

    return datasetContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
      /**
       * 
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, String> call(Tuple2<String, String> term) throws Exception {
        return term;
      }
    });

  }
Project: incubator-sdap-mudrod    File: MetadataOpt.java
public JavaPairRDD<String, List<String>> tokenizeData(JavaPairRDD<String, String> datasetsContentRDD, String splitter) throws Exception {

    return datasetsContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, List<String>>() {
      /**
       * 
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, List<String>> call(Tuple2<String, String> arg) throws Exception {
        String content = arg._2;
        List<String> tokens = getTokens(content, splitter);

        return new Tuple2<>(arg._1, tokens);
      }
    });

  }
Project: mudrod    File: MetadataOpt.java
private JavaPairRDD<String, String> parallizeData(SparkDriver spark, List<Tuple2<String, String>> datasetContent) {

    JavaRDD<Tuple2<String, String>> datasetContentRDD = spark.sc.parallelize(datasetContent);

    return datasetContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
      /**
       * 
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, String> call(Tuple2<String, String> term) throws Exception {
        return term;
      }
    });

  }
Project: mudrod    File: MetadataOpt.java
public JavaPairRDD<String, List<String>> tokenizeData(JavaPairRDD<String, String> datasetsContentRDD, String splitter) throws Exception {

    return datasetsContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, List<String>>() {
      /**
       * 
       */
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, List<String>> call(Tuple2<String, String> arg) throws Exception {
        String content = arg._2;
        List<String> tokens = getTokens(content, splitter);

        return new Tuple2<>(arg._1, tokens);
      }
    });

  }
Project: spark-newsreel-recommender    File: ConcurrentSparkList.java
private void addNewElement(JavaPairRDD newPair, JavaPairRDD timeStamp) {
    item2ReadCount = item2ReadCount
            .union(newPair)
            .coalesce(numPartitions, false)
            .reduceByKey((v1, v2) ->  (Long) v1 +  (Long) v2, numPartitions)
            .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap)
            .sortByKey(false, numPartitions)
            .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap);
    item2timeStampData = item2timeStampData
            .union(timeStamp)
            .coalesce(numPartitions, false)
            .reduceByKey(replaceValues)
            .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap)
            .sortByKey(true, numPartitions)
            .mapToPair((PairFunction<Tuple2<Long, Long>, Long, Long>) Tuple2::swap);
}
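
The swap → sortByKey → swap chain above is the standard Java-API idiom for ordering a pair RDD by value, since JavaPairRDD exposes sortByKey but no sortByValue. A minimal sketch of the same idiom on made-up data (the JavaSparkContext "sc" is assumed):

// Sort (word, count) pairs by count, descending, using swap / sortByKey / swap.
JavaPairRDD<String, Long> counts = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>("a", 3L), new Tuple2<>("b", 7L), new Tuple2<>("c", 1L)));
JavaPairRDD<String, Long> byValueDesc = counts
    .mapToPair((PairFunction<Tuple2<String, Long>, Long, String>) Tuple2::swap)   // (count, word)
    .sortByKey(false)                                                             // descending by count
    .mapToPair((PairFunction<Tuple2<Long, String>, String, Long>) Tuple2::swap);  // back to (word, count)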
Project: beam    File: TranslationUtils.java
/**
 * A utility method that adapts {@link PairFunction} to a {@link PairFlatMapFunction} with an
 * {@link Iterator} input. This is particularly useful because it allows functions written
 * for mapToPair to be reused in flatMapToPair transformations.
 *
 * @param pairFunction the {@link PairFunction} to adapt.
 * @param <T> the input type.
 * @param <K> the output key type.
 * @param <V> the output value type.
 * @return a {@link PairFlatMapFunction} that accepts an {@link Iterator} as an input and applies
 *     the {@link PairFunction} on every element.
 */
public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction(
    final PairFunction<T, K, V> pairFunction) {
  return new PairFlatMapFunction<Iterator<T>, K, V>() {

    @Override
    public Iterator<Tuple2<K, V>> call(Iterator<T> itr) throws Exception {
      final Iterator<Tuple2<K, V>> outputItr =
          Iterators.transform(
              itr,
              new com.google.common.base.Function<T, Tuple2<K, V>>() {

                @Override
                public Tuple2<K, V> apply(T t) {
                  try {
                    return pairFunction.call(t);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              });
      return outputItr;
    }
  };
}
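
A hedged usage sketch for the adapter above: wrapping an element-level PairFunction so it can be applied per partition with mapPartitionsToPair (the RDD and function names here are illustrative, not taken from the Beam runner):

// Illustrative only: adapt an element-level PairFunction for a partition-level transform.
PairFunction<String, String, Integer> lengthOf = s -> new Tuple2<>(s, s.length());
JavaPairRDD<String, Integer> lengths =
    wordsRdd.mapPartitionsToPair(pairFunctionToPairFlatMapFunction(lengthOf));   // wordsRdd: an assumed JavaRDD<String>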
Project: beam    File: CoderHelpers.java
/**
 * A function wrapper for converting a byte array pair to a key-value pair, where
 * values are {@link Iterable}.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K>   The type of the key being deserialized.
 * @param <V>   The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
public static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
    fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
    @Override
    public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
        Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
          @Override
          public V apply(byte[] bytes) {
            return fromByteArray(bytes, valueCoder);
          }
        }));
    }
  };
}
Project: nats-connector-spark    File: KeyValueSparkToStandardNatsConnectorLifecycleTest.java
protected void publishToNats(final String subject1, final String subject2, final int partitionsNb) {
    final JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath()).repartition(partitionsNb);      

    JavaPairDStream<String, String> stream1 = 
            lines.mapToPair((PairFunction<String, String, String>) str -> {
                                return new Tuple2<String, String>(subject1, str);
                            });
    JavaPairDStream<String, String> stream2 = 
            lines.mapToPair((PairFunction<String, String, String>) str -> {
                                return new Tuple2<String, String>(subject2, str);
                            });
    final JavaPairDStream<String, String> stream = stream1.union(stream2);

    if (logger.isDebugEnabled()) {
        stream.print();
    }       

    SparkToNatsConnectorPool
        .newPool()
        .withNatsURL(NATS_SERVER_URL)
        .withConnectionTimeout(Duration.ofSeconds(2))
        .publishToNatsAsKeyValue(stream);
}
Project: spark-dataflow    File: TransformTranslator.java
private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() {
  return new TransformEvaluator<TextIO.Write.Bound<T>>() {
    @Override
    public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
      @SuppressWarnings("unchecked")
      JavaPairRDD<T, Void> last =
          ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
          .map(WindowingHelpers.<T>unwindowFunction())
          .mapToPair(new PairFunction<T, T,
                  Void>() {
            @Override
            public Tuple2<T, Void> call(T t) throws Exception {
              return new Tuple2<>(t, null);
            }
          });
      ShardTemplateInformation shardTemplateInfo =
          new ShardTemplateInformation(transform.getNumShards(),
              transform.getShardTemplate(), transform.getFilenamePrefix(),
              transform.getFilenameSuffix());
      writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class,
          NullWritable.class, TemplatedTextOutputFormat.class);
    }
  };
}
Project: spark-dataflow    File: CoderHelpers.java
/**
 * A function wrapper for converting a byte array pair to a key-value pair, where
 * values are {@link Iterable}.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K>   The type of the key being deserialized.
 * @param <V>   The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
    fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
    @Override
    public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
        Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
          @Override
          public V apply(byte[] bytes) {
            return fromByteArray(bytes, valueCoder);
          }
        }));
    }
  };
}
Project: SHMACK    File: WordCount.java
@SuppressWarnings("serial")
@Override
public SortedCounts<String> execute(final JavaSparkContext spark) {
    final JavaRDD<String> textFile = spark.textFile(inputFile);
    final JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(final String rawJSON) throws TwitterException {
            final Status tweet = TwitterObjectFactory.createStatus(rawJSON);
            String text = tweet.getText();
            return Arrays.asList(text.split(" "));
        }
    });
    final JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(final String s) {
            return new Tuple2<String, Integer>(s.toLowerCase(), 1);
        }
    });
    final JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(final Integer a, final Integer b) {
            return a + b;
        }
    });
    return SortedCounts.create(counts);
}
Project: GeoSpark    File: VisualizationOperator.java
/**
 * Spatial partitioning without duplicates.
 *
 * @return true, if successful
 * @throws Exception the exception
 */
private boolean spatialPartitioningWithoutDuplicates() throws Exception
{
    this.distributedRasterColorMatrix = this.distributedRasterColorMatrix.mapToPair(new PairFunction<Tuple2<Pixel, Integer>, Pixel, Integer>() {
        @Override
        public Tuple2<Pixel, Integer> call(Tuple2<Pixel, Integer> pixelDoubleTuple2) throws Exception {
            Pixel newPixel = new Pixel(pixelDoubleTuple2._1().getX(),pixelDoubleTuple2._1().getY(),resolutionX,resolutionY);
            newPixel.setDuplicate(false);
            newPixel.setCurrentPartitionId(VisualizationPartitioner.CalculatePartitionId(resolutionX,resolutionY,partitionX, partitionY, pixelDoubleTuple2._1.getX(), pixelDoubleTuple2._1.getY()));
            Tuple2<Pixel,Integer> newPixelDoubleTuple2 = new Tuple2<Pixel, Integer>(newPixel, pixelDoubleTuple2._2());
            return newPixelDoubleTuple2;
        }
    });
    this.distributedRasterColorMatrix = this.distributedRasterColorMatrix.partitionBy(new VisualizationPartitioner(this.resolutionX,this.resolutionY,this.partitionX,this.partitionY));
    return true;
}
Project: Test_Projects    File: Spark301.java
public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local");
    sparkConf.setAppName("TestSpark");

    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaRDD<String> input = sc.parallelize(data);

    JavaPairRDD<String, String> inputPair = input.mapToPair(
            new PairFunction<String, String, String>() {
                @Override
                public Tuple2<String, String> call(String x)    throws Exception {
                    return new Tuple2<String, String>(x.split(" ")[0], x);
                }
            }
    );

    System.out.println(inputPair.take(100));


}
Project: stratio-connector-deep    File: QueryExecutorTest.java
@Before
public void before() throws Exception {

    queryExecutor = new QueryExecutor(deepContext, deepConnectionHandler);

    // Stubs
    when(deepConnectionHandler.getConnection(CLUSTERNAME_CONSTANT.getName())).thenReturn(deepConnection);
    when(deepConnection.getExtractorConfig()).thenReturn(extractorConfig);
    when(extractorConfig.clone()).thenReturn(extractorConfig);
    when(deepContext.createJavaRDD(any(ExtractorConfig.class))).thenReturn(singleRdd);
    when(deepContext.createHDFSRDD(any(ExtractorConfig.class))).thenReturn(rdd);
    when(rdd.toJavaRDD()).thenReturn(singleRdd);
    when(singleRdd.collect()).thenReturn(generateListOfCells(3));
    when(singleRdd.filter(any(Function.class))).thenReturn(singleRdd);
    when(singleRdd.map(any(FilterColumns.class))).thenReturn(singleRdd);
    when(singleRdd.mapToPair(any(PairFunction.class))).thenReturn(pairRdd);
    when(singleRdd.keyBy(any(Function.class))).thenReturn(pairRdd);
    when(pairRdd.join(pairRdd)).thenReturn(joinedRdd);
    when(pairRdd.reduceByKey(any(Function2.class))).thenReturn(pairRdd);
    when(pairRdd.map(any(Function.class))).thenReturn(singleRdd);
    when(joinedRdd.map(any(JoinCells.class))).thenReturn(singleRdd);


}
Project: java-feature-set    File: SparkKeyValueEx.java
@SuppressWarnings("serial")
private static final PairFunction<Integer, Integer, Integer> convertToKeyValue() {

  /**
   * Convert to key-value [key (integer) : value (integer * integer)]
   */
  return new PairFunction<Integer, Integer, Integer>() {

    @Override
    public final Tuple2<Integer, Integer> call(final Integer integer) throws Exception {

      /* Tuple : key (integer) : value (integer * integer) */
      return new Tuple2<Integer, Integer>(integer, integer * integer);
    }
  };
}
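
A short usage sketch for the factory above; the input data and the "sc" context are made up for illustration:

// Illustrative usage: pair each integer with its square.
JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4));            // "sc" is an assumed JavaSparkContext
JavaPairRDD<Integer, Integer> squares = numbers.mapToPair(convertToKeyValue());  // (n, n * n)
System.out.println(squares.collect());                                           // [(1,1), (2,4), (3,9), (4,16)]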
Project: java-feature-set    File: ProductsCountryCount.java
@SuppressWarnings("serial")
private static final JavaPairRDD<Integer, String> userDataAsKeyValue(final JavaRDD<String> userInputFile) {

  /* Left Outer Join of transactions on users */
  return userInputFile.mapToPair(new PairFunction<String, Integer, String>() {

    public Tuple2<Integer, String> call(final String user) {

      logger.debug("User : " + user);
      final String[] userSplit = user.split("\t");

      /* Tuple : key (user-id) : value (country) */
      return new Tuple2<Integer, String>(Integer.valueOf(userSplit[0]), userSplit[3]);
    }
  });
}
Project: incubator-blur    File: BlurLoadSparkProcessor.java
public void run() throws IOException {
  SparkConf conf = new SparkConf();
  conf.setAppName(getAppName());
  conf.set(SPARK_SERIALIZER, ORG_APACHE_SPARK_SERIALIZER_KRYO_SERIALIZER);
  JavaSparkUtil.packProjectJars(conf);
  setupSparkConf(conf);

  JavaStreamingContext ssc = new JavaStreamingContext(conf, getDuration());
  List<JavaDStream<T>> streamsList = getStreamsList(ssc);

  // Union all the streams if there is more than 1 stream
  JavaDStream<T> streams = unionStreams(ssc, streamsList);

  JavaPairDStream<String, RowMutation> pairDStream = streams.mapToPair(new PairFunction<T, String, RowMutation>() {
    public Tuple2<String, RowMutation> call(T t) {
      RowMutation rowMutation = convert(t);
      return new Tuple2<String, RowMutation>(rowMutation.getRowId(), rowMutation);
    }
  });

  pairDStream.foreachRDD(getFunction());

  ssc.start();
  ssc.awaitTermination();
}
Project: Apache-Spark-2x-for-Java-Developers    File: WordCountTransformOpEx.java
public static void main(String[] args) throws Exception {

      System.setProperty("hadoop.home.dir", "E:\\hadoop");

   SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
   JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
   Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.WARN); 
   List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 10));
   JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);


   JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream( "10.0.75.1", Integer.parseInt("9000"), StorageLevels.MEMORY_AND_DISK_SER);

   JavaDStream<String> words = StreamingLines.flatMap( str -> Arrays.asList(str.split(" ")).iterator() );

   JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str-> new Tuple2<>(str, 1)).reduceByKey((count1,count2) ->count1+count2 );

   wordCounts.print();

JavaPairDStream<String, Integer> joinedDstream = wordCounts
        .transformToPair(new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
            @Override
            public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
                JavaPairRDD<String, Integer> modRDD = rdd.join(initialRDD).mapToPair(
                        new PairFunction<Tuple2<String, Tuple2<Integer, Integer>>, String, Integer>() {
                            @Override
                            public Tuple2<String, Integer> call(
                                    Tuple2<String, Tuple2<Integer, Integer>> joinedTuple) throws Exception {
                                return new Tuple2<>(joinedTuple._1(),(joinedTuple._2()._1() + joinedTuple._2()._2()));
                            }
                        });
                return modRDD;
            }
        });

   joinedDstream.print();
   streamingContext.start();
   streamingContext.awaitTermination();
 }
Project: Apache-Spark-2x-for-Java-Developers    File: WordCountSocketJava8Ex.java
public static void main(String[] args) throws Exception {

     System.setProperty("hadoop.home.dir", "E:\\hadoop");

  SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
  JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 10), new Tuple2<>("world", 10));
  JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);


  JavaReceiverInputDStream<String> StreamingLines = streamingContext.socketTextStream( "10.0.75.1", Integer.parseInt("9000"), StorageLevels.MEMORY_AND_DISK_SER);

  JavaDStream<String> words = StreamingLines.flatMap( str -> Arrays.asList(str.split(" ")).iterator() );

  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str-> new Tuple2<>(str, 1)).reduceByKey((count1,count2) ->count1+count2 );

  wordCounts.print();

JavaPairDStream<String, Integer> joinedDstream = wordCounts.transformToPair(
   new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
        @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
            // Join the per-batch counts with the initial RDD, sum the paired counts, and return the result.
            return rdd.join(initialRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<Integer, Integer>>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<String, Tuple2<Integer, Integer>> joinedTuple)
                        throws Exception {
                    return new Tuple2<>(joinedTuple._1(), joinedTuple._2()._1() + joinedTuple._2()._2());
                }
            });
        }
      });

joinedDstream.print();
  streamingContext.start();
  streamingContext.awaitTermination();
}
Project: incubator-sdap-mudrod    File: SessionCooccurence.java
/**
 * Filter out out-of-date (retired) metadata.
 *
 * @param es
 *          the Elasticsearch driver
 * @param userDatasetsRDD
 *          datasets extracted from sessions
 * @return filtered session datasets
 */
public JavaPairRDD<String, List<String>> removeRetiredDataset(ESDriver es, JavaPairRDD<String, List<String>> userDatasetsRDD) {

  Map<String, String> nameMap = this.getOnServiceMetadata(es);

  return userDatasetsRDD.mapToPair(new PairFunction<Tuple2<String, List<String>>, String, List<String>>() {
    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(Tuple2<String, List<String>> arg0) throws Exception {
      List<String> oriDatasets = arg0._2;
      List<String> newDatasets = new ArrayList<>();
      int size = oriDatasets.size();
      for (int i = 0; i < size; i++) {
        String name = oriDatasets.get(i);
        if (nameMap.containsKey(name)) {
          newDatasets.add(nameMap.get(name));
        }
      }
      return new Tuple2<>(arg0._1, newDatasets);
    }
  });

}
Project: incubator-sdap-mudrod    File: SessionExtractor.java
/**
 * bulidDataQueryRDD: converts a click-stream list into dataset/query pairs.
 *
 * @param clickstreamRDD:
 *          click stream data
 * @param downloadWeight:
 *          weight of the download behavior
 * @return JavaPairRDD whose keys are dataset short names and whose values are query lists
 */
public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(ClickStream click) throws Exception {
      List<String> query = new ArrayList<>();
      // important! download behavior is given higher weights
      // than viewing
      // behavior
      boolean download = click.isDownload();
      int weight = 1;
      if (download) {
        weight = downloadWeight;
      }
      for (int i = 0; i < weight; i++) {
        query.add(click.getKeyWords());
      }

      return new Tuple2<>(click.getViewDataset(), query);
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });
}
Project: incubator-sdap-mudrod    File: SessionExtractor.java
public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Double> call(ClickStream click) throws Exception {
      double rate = 1;
      boolean download = click.isDownload();
      if (download) {
        rate = 2;
      }

      String sessionID = click.getSessionID();
      String user = sessionID.split("@")[0];

      return new Tuple2<>(user + "," + click.getViewDataset(), rate);
    }
  }).reduceByKey(new Function2<Double, Double, Double>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Double call(Double v1, Double v2) throws Exception {
      return v1 >= v2 ? v1 : v2;

    }
  });
}
Project: incubator-sdap-mudrod    File: MetadataExtractor.java
/**
 * buildMetadataRDD: Convert metadata list to JavaPairRDD
 *
 * @param es        an Elasticsearch client node instance
 * @param sc        spark context
 * @param index     index name of log processing application
 * @param metadatas metadata list
 * @return PairRDD in which each key is a metadata short name and each value is the
 * term list extracted from the metadata variables.
 */
protected JavaPairRDD<String, List<String>> buildMetadataRDD(ESDriver es, JavaSparkContext sc, String index, List<PODAACMetadata> metadatas) {
  JavaRDD<PODAACMetadata> metadataRDD = sc.parallelize(metadatas);
  JavaPairRDD<String, List<String>> metadataTermsRDD = metadataRDD.mapToPair(new PairFunction<PODAACMetadata, String, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(PODAACMetadata metadata) throws Exception {
      return new Tuple2<String, List<String>>(metadata.getShortName(), metadata.getAllTermList());
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<String>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });

  return metadataTermsRDD;
}
Project: gcp    File: Spark8Organized.java
private static PairFunction<Tuple2<String, String>, String, ExampleXML> parseXml() {
  ParseXML parser = new ParseXML();
  return tuple -> {
    try {
      return new Tuple2<>(tuple._1(), parser.call(tuple._2()));
    } catch(JAXBException badXML) {
      System.err.printf("Bad XML at %s\n", tuple._1());
      badXML.printStackTrace();
      return null;
    }
  };
}
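
Because the function above returns null for records whose XML fails to parse, the surrounding pipeline presumably drops those nulls before any key-based operation; a hedged sketch of that pattern (the RDD name is illustrative, not from the project):

// Illustrative: apply the parser, then drop records that failed to parse.
JavaPairRDD<String, ExampleXML> parsed = rawXmlPairs   // rawXmlPairs: an assumed JavaPairRDD<String, String>
    .mapToPair(parseXml())
    .filter(tuple -> tuple != null);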
Project: gcp    File: Spark8Organized.java
private static PairFunction<Tuple2<String, ExampleXML>, Object, JsonObject> prepToBq() {
  return tuple -> {
    JsonObject json = new JsonObject();
    json.addProperty("property1", tuple._2().getProperty1());
    json.addProperty("insertId", tuple._1());
    return new Tuple2<>(null, json);
  };
}
Project: fst-bench    File: JavaTeraSort.java
public static void main(String[] args) throws Exception {

    if (args.length < 2) {
      System.err.println("Usage: JavaTeraSort <HDFS_INPUT> <HDFS_OUTPUT>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaTeraSort");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = ctx.textFile(args[0], 1);
    Integer parallel = sparkConf.getInt("spark.default.parallelism", ctx.defaultParallelism());
    Integer reducer  = Integer.parseInt(IOCommon.getProperty("hibench.default.shuffle.parallelism").get());
    JavaPairRDD<String, String> words = lines.mapToPair(new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String s) throws Exception {
            return new Tuple2<String, String>(s.substring(0, 10), s.substring(10));
        }
    });


    JavaPairRDD<String, String> sorted = words.sortByKey(true, reducer);

    JavaRDD<String> result = sorted.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> e) throws Exception {
            return e._1() + e._2();
        }
    });

    result.saveAsTextFile(args[1]);

    ctx.stop();
  }
Project: gspark    File: JavaLogQuery.java
public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);

    JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
      @Override
      public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
        return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
      }
    });

    JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
      @Override
      public Stats call(Stats stats, Stats stats2) {
        return stats.merge(stats2);
      }
    });

    List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
    for (Tuple2<?,?> t : output) {
      System.out.println(t._1() + "\t" + t._2());
    }
    jsc.stop();
  }
Project: gspark    File: JavaTC.java
public static void main(String[] args) {
  SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
  JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();

  // Linear transitive closure: each round grows paths by one edge,
  // by joining the graph's edges with the already-discovered paths.
  // e.g. join the path (y, z) from the TC with the edge (x, y) from
  // the graph to obtain the path (x, z).

  // Because join() joins on keys, the edges are stored in reversed order.
  JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
    new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
      @Override
      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
        return new Tuple2<Integer, Integer>(e._2(), e._1());
      }
  });

  long oldCount;
  long nextCount = tc.count();
  do {
    oldCount = nextCount;
    // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
    // then project the result to obtain the new (x, z) paths.
    tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache();
    nextCount = tc.count();
  } while (nextCount != oldCount);

  System.out.println("TC has " + tc.count() + " edges.");
  sc.stop();
}
Project: mudrod    File: SessionCooccurence.java
/**
 * Filter out out-of-date (retired) metadata.
 *
 * @param es
 *          the Elasticsearch driver
 * @param userDatasetsRDD
 *          datasets extracted from sessions
 * @return filtered session datasets
 */
public JavaPairRDD<String, List<String>> removeRetiredDataset(ESDriver es, JavaPairRDD<String, List<String>> userDatasetsRDD) {

  Map<String, String> nameMap = this.getOnServiceMetadata(es);

  return userDatasetsRDD.mapToPair(new PairFunction<Tuple2<String, List<String>>, String, List<String>>() {
    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(Tuple2<String, List<String>> arg0) throws Exception {
      List<String> oriDatasets = arg0._2;
      List<String> newDatasets = new ArrayList<>();
      int size = oriDatasets.size();
      for (int i = 0; i < size; i++) {
        String name = oriDatasets.get(i);
        if (nameMap.containsKey(name)) {
          newDatasets.add(nameMap.get(name));
        }
      }
      return new Tuple2<>(arg0._1, newDatasets);
    }
  });

}
Project: mudrod    File: SessionExtractor.java
/**
 * bulidDataQueryRDD: converts a click-stream list into dataset/query pairs.
 *
 * @param clickstreamRDD:
 *          click stream data
 * @param downloadWeight:
 *          weight of the download behavior
 * @return JavaPairRDD whose keys are dataset short names and whose values are query lists
 */
public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(ClickStream click) throws Exception {
      List<String> query = new ArrayList<>();
      // important! download behavior is given higher weights
      // than viewing
      // behavior
      boolean download = click.isDownload();
      int weight = 1;
      if (download) {
        weight = downloadWeight;
      }
      for (int i = 0; i < weight; i++) {
        query.add(click.getKeyWords());
      }

      return new Tuple2<>(click.getViewDataset(), query);
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });
}
Project: mudrod    File: SessionExtractor.java
public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
  return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Double> call(ClickStream click) throws Exception {
      double rate = 1;
      boolean download = click.isDownload();
      if (download) {
        rate = 2;
      }

      String sessionID = click.getSessionID();
      String user = sessionID.split("@")[0];

      return new Tuple2<>(user + "," + click.getViewDataset(), rate);
    }
  }).reduceByKey(new Function2<Double, Double, Double>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Double call(Double v1, Double v2) throws Exception {
      return v1 >= v2 ? v1 : v2;

    }
  });
}
Project: mudrod    File: MetadataExtractor.java
/**
 * buildMetadataRDD: Convert metadata list to JavaPairRDD
 *
 * @param es        an Elasticsearch client node instance
 * @param sc        spark context
 * @param index     index name of log processing application
 * @param metadatas metadata list
 * @return PairRDD in which each key is a metadata short name and each value is the
 * term list extracted from the metadata variables.
 */
protected JavaPairRDD<String, List<String>> buildMetadataRDD(ESDriver es, JavaSparkContext sc, String index, List<PODAACMetadata> metadatas) {
  JavaRDD<PODAACMetadata> metadataRDD = sc.parallelize(metadatas);
  JavaPairRDD<String, List<String>> metadataTermsRDD = metadataRDD.mapToPair(new PairFunction<PODAACMetadata, String, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, List<String>> call(PODAACMetadata metadata) throws Exception {
      return new Tuple2<String, List<String>>(metadata.getShortName(), metadata.getAllTermList());
    }
  }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
      List<String> list = new ArrayList<String>();
      list.addAll(v1);
      list.addAll(v2);
      return list;
    }
  });

  return metadataTermsRDD;
}
Project: envelope    File: DummyStreamInput.java
@Override
public PairFunction<?, ?, ?> getPrepareFunction() {
  return new PairFunction<Long, Void, Row>() {
    @Override
    public Tuple2<Void, Row> call(Long aLong) throws Exception {
      return new Tuple2<>(null, (Row)new RowWithSchema(schema, aLong));
    }
  };
}
Project: beam    File: TranslationUtils.java
/** {@link KV} to pair function. */
public static <K, V> PairFunction<KV<K, V>, K, V> toPairFunction() {
  return new PairFunction<KV<K, V>, K, V>() {
    @Override
    public Tuple2<K, V> call(KV<K, V> kv) {
      return new Tuple2<>(kv.getKey(), kv.getValue());
    }
  };
}
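
A one-line usage sketch (the input RDD is illustrative): converting a JavaRDD of Beam KV elements into a Spark pair RDD.

JavaPairRDD<String, Integer> pairs =
    kvRdd.mapToPair(TranslationUtils.<String, Integer>toPairFunction());   // kvRdd: an assumed JavaRDD<KV<String, Integer>>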
Project: beam    File: TranslationUtils.java
/** Extract key from a {@link WindowedValue} {@link KV} into a pair. */
public static <K, V>
    PairFunction<WindowedValue<KV<K, V>>, K, WindowedValue<KV<K, V>>>
        toPairByKeyInWindowedValue() {
  return new PairFunction<WindowedValue<KV<K, V>>, K, WindowedValue<KV<K, V>>>() {
    @Override
    public Tuple2<K, WindowedValue<KV<K, V>>> call(WindowedValue<KV<K, V>> windowedKv)
        throws Exception {
      return new Tuple2<>(windowedKv.getValue().getKey(), windowedKv);
    }
  };
}
Project: beam    File: TranslationUtils.java
/**
 * Returns a pair function to convert value to bytes via coder.
 * @param coderMap - mapping between TupleTag and a coder
 * @return a pair function to convert value to bytes via coder
 */
public static PairFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, TupleTag<?>, byte[]>
    getTupleTagEncodeFunction(final Map<TupleTag<?>, Coder<WindowedValue<?>>> coderMap) {
  return new PairFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, TupleTag<?>, byte[]>() {

    @Override public Tuple2<TupleTag<?>, byte[]>
    call(Tuple2<TupleTag<?>, WindowedValue<?>> tuple2) throws Exception {
      TupleTag<?> tupleTag = tuple2._1;
      WindowedValue<?> windowedValue = tuple2._2;
      return new Tuple2<TupleTag<?>, byte[]>
          (tupleTag, CoderHelpers.toByteArray(windowedValue, coderMap.get(tupleTag)));
    }
  };
}
Project: beam    File: TranslationUtils.java
/**
 * Returns a pair function to convert bytes to value via coder.
 * @param coderMap - mapping between TupleTag and a coder
 * @return a pair function to convert bytes to value via coder
 * */
public static PairFunction<Tuple2<TupleTag<?>, byte[]>, TupleTag<?>, WindowedValue<?>>
    getTupleTagDecodeFunction(final Map<TupleTag<?>, Coder<WindowedValue<?>>> coderMap) {
  return new PairFunction<Tuple2<TupleTag<?>, byte[]>, TupleTag<?>, WindowedValue<?>>() {

    @Override public Tuple2<TupleTag<?>, WindowedValue<?>>
    call(Tuple2<TupleTag<?>, byte[]> tuple2) throws Exception {
      TupleTag<?> tupleTag = tuple2._1;
      byte[] windowedByteValue = tuple2._2;
      return new Tuple2<TupleTag<?>, WindowedValue<?>>
          (tupleTag, CoderHelpers.fromByteArray(windowedByteValue, coderMap.get(tupleTag)));
    }
  };
}
Project: beam    File: CoderHelpers.java
/**
 * A function wrapper for converting a key-value pair to a byte array pair.
 *
 * @param keyCoder Coder to serialize keys.
 * @param valueCoder Coder to serialize values.
 * @param <K>   The type of the key being serialized.
 * @param <V>   The type of the value being serialized.
 * @return A function that accepts a key-value pair and returns a pair of byte arrays.
 */
public static <K, V> PairFunction<Tuple2<K, V>, ByteArray, byte[]> toByteFunction(
    final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<K, V>, ByteArray, byte[]>() {
    @Override
    public Tuple2<ByteArray, byte[]> call(Tuple2<K, V> kv) {
      return new Tuple2<>(new ByteArray(toByteArray(kv._1(), keyCoder)), toByteArray(kv._2(),
          valueCoder));
    }
  };
}
Project: beam    File: CoderHelpers.java
/**
 * A function wrapper for converting a byte array pair to a key-value pair.
 *
 * @param keyCoder Coder to deserialize keys.
 * @param valueCoder Coder to deserialize values.
 * @param <K>   The type of the key being deserialized.
 * @param <V>   The type of the value being deserialized.
 * @return A function that accepts a pair of byte arrays and returns a key-value pair.
 */
public static <K, V> PairFunction<Tuple2<ByteArray, byte[]>, K, V> fromByteFunction(
    final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, byte[]>, K, V>() {
    @Override
    public Tuple2<K, V> call(Tuple2<ByteArray, byte[]> tuple) {
      return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
          fromByteArray(tuple._2(), valueCoder));
    }
  };
}
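
A hedged sketch of how toByteFunction and fromByteFunction are typically combined: encode a pair RDD so Spark shuffles opaque byte arrays, run a key-based operation, then decode. The coders and RDD names below are illustrative choices, not taken from the runner's translation code:

// Illustrative round trip: encode, do a key-based operation on encoded bytes, decode.
Coder<String> keyCoder = StringUtf8Coder.of();
Coder<Integer> valueCoder = VarIntCoder.of();
JavaPairRDD<ByteArray, byte[]> encoded =
    kvPairs.mapToPair(toByteFunction(keyCoder, valueCoder));      // kvPairs: an assumed JavaPairRDD<String, Integer>
JavaPairRDD<String, Integer> decoded = encoded
    .reduceByKey((a, b) -> a)                                     // any key-based op; here keep one value per key
    .mapToPair(fromByteFunction(keyCoder, valueCoder));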
Project: nats-connector-spark    File: UnitTestUtilities.java
public static JavaPairDStream<String, String> getJavaPairDStream(final File tempDir, final JavaStreamingContext ssc, final String subject1) {
    final JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
    JavaPairDStream<String, String> keyValues = lines.mapToPair((PairFunction<String, String, String>) str -> {
                        return new Tuple2<String, String>(subject1 + "." + str, str);
                    });
    return keyValues;
}