小编典典

Parsing CSV into a DataFrame / DataSet with Apache Spark and Java

java

I am new to Spark, and I want to use group-by & reduce to derive the following from a CSV (one row per employee):

      Department, Designation, costToCompany, State
      Sales, Trainee, 12000, UP
      Sales, Lead, 32000, AP
      Sales, Lead, 32000, LA
      Sales, Lead, 32000, TN
      Sales, Lead, 32000, AP
      Sales, Lead, 32000, TN 
      Sales, Lead, 32000, LA
      Sales, Lead, 32000, LA
      Marketing, Associate, 18000, TN
      Marketing, Associate, 18000, TN
      HR, Manager, 58000, TN

I would like to simplify the CSV with a group-by on Department, Designation, State, with the additional columns sum(costToCompany) and TotalEmployeeCount.

The result should look like:

      Dept, Desg, state, empCount, totalCost
      Sales,Lead,AP,2,64000
      Sales,Lead,LA,3,96000  
      Sales,Lead,TN,2,64000

Is there any way to achieve this using transformations and actions? Or should we go for RDD operations?


2020-09-28

1 Answer

小编典典

Procedure

  • Create a class (schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you use Java)

        public class Record implements Serializable {
            String department;
            String designation;
            long costToCompany;
            String state;
            // constructor, getters and setters
        }

  • Load the CSV (or JSON) file

        JavaSparkContext sc;
        JavaRDD<String> data = sc.textFile("path/input.csv");
        //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
        SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified

        JavaRDD<Record> rdd_records = data.map(
            new Function<String, Record>() {
                public Record call(String line) throws Exception {
                    // Here you could use JSON instead, e.g.
                    // Gson gson = new Gson();
                    // gson.fromJson(line, Record.class);
                    String[] fields = line.split(",");
                    return new Record(fields[0].trim(), fields[1].trim(),
                            Long.parseLong(fields[2].trim()), fields[3].trim());
                }
            });
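The map step above boils down to splitting each line on commas and parsing the numeric field. A minimal, Spark-free sketch of that parsing logic (the `Record` constructor order and the class/method names here are illustrative assumptions):

```java
import java.io.Serializable;

public class ParseDemo {
    // Plain-Java version of the Record class from the answer (names assumed)
    static class Record implements Serializable {
        final String department, designation, state;
        final long costToCompany;
        Record(String department, String designation, long costToCompany, String state) {
            this.department = department;
            this.designation = designation;
            this.costToCompany = costToCompany;
            this.state = state;
        }
    }

    // Same logic as the call() body: split on commas, trim, parse the salary
    static Record parse(String line) {
        String[] f = line.split(",");
        return new Record(f[0].trim(), f[1].trim(), Long.parseLong(f[2].trim()), f[3].trim());
    }

    public static void main(String[] args) {
        Record r = parse("Sales, Lead, 32000, AP");
        System.out.println(r.department + "/" + r.designation + "/" + r.costToCompany + "/" + r.state);
        // prints Sales/Lead/32000/AP
    }
}
```

Trimming each field matters here because the sample CSV has a space after every comma; `Long.parseLong(" 32000")` would throw a `NumberFormatException`.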

At this point, you have two approaches:

A. SparkSQL

  • Register a table (using the schema class you defined)

        JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
        table.registerAsTable("record_table");
        table.printSchema();
  • Query the table with the required grouping

        JavaSchemaRDD res = sqlContext.sql(
            "select department, designation, state, sum(costToCompany), count(*) "
          + "from record_table "
          + "group by department, designation, state");
  • Here you would also be able to run any other queries you need, using the SQL approach
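The group by in that SQL (one sum and one count per distinct key) can be sketched in plain Java with `java.util.stream` collectors, independent of Spark, to illustrate what the query computes (class and method names here are illustrative):

```java
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByDemo {
    // sum(costToCompany) and count(*) grouped by (department, designation, state)
    static Map<String, LongSummaryStatistics> aggregate(List<String[]> rows) {
        return rows.stream().collect(
            Collectors.groupingBy(
                r -> r[0] + "," + r[1] + "," + r[3],               // composite grouping key
                Collectors.summarizingLong(r -> Long.parseLong(r[2]))));
    }

    public static void main(String[] args) {
        Map<String, LongSummaryStatistics> agg = aggregate(List.of(
            new String[]{"Sales", "Lead", "32000", "AP"},
            new String[]{"Sales", "Lead", "32000", "AP"},
            new String[]{"Sales", "Lead", "32000", "LA"}));
        LongSummaryStatistics ap = agg.get("Sales,Lead,AP");
        // getCount() plays count(*), getSum() plays sum(costToCompany)
        System.out.println(ap.getCount() + "," + ap.getSum()); // prints 2,64000
    }
}
```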

B. Spark

  • Map with a composite key: Department, Designation, State
        JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
            rdd_records.mapToPair(
                new PairFunction<Record, String, Tuple2<Long, Integer>>() {
                    public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
                        return new Tuple2<String, Tuple2<Long, Integer>>(
                            record.department + record.designation + record.state,
                            new Tuple2<Long, Integer>(record.costToCompany, 1));
                    }
                });

  • reduceByKey on the composite key, summing the costToCompany column and accumulating the number of records per key
        JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
            records_JPRDD.reduceByKey(
                new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>,
                              Tuple2<Long, Integer>>() {
                    public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                                                      Tuple2<Long, Integer> v2) throws Exception {
                        return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
                    }
                });
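The mapToPair + reduceByKey pair amounts to per-key accumulation of a (sum, count) pair. A Spark-free sketch of that merge semantics, using a long[2] in place of Tuple2<Long, Integer> (the class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceByKeyDemo {
    // Per-key accumulation: value[0] = sum(costToCompany), value[1] = record count
    static Map<String, long[]> reduce(List<String[]> rows) {
        Map<String, long[]> acc = new HashMap<>();
        for (String[] r : rows) {
            String key = r[0] + "," + r[1] + "," + r[3];          // composite key
            long cost = Long.parseLong(r[2]);
            // merge() plays the role of Spark's Function2: add sums, add counts
            acc.merge(key, new long[]{cost, 1},
                      (a, b) -> new long[]{a[0] + b[0], a[1] + b[1]});
        }
        return acc;
    }

    public static void main(String[] args) {
        Map<String, long[]> res = reduce(List.of(
            new String[]{"Sales", "Lead", "32000", "LA"},
            new String[]{"Sales", "Lead", "32000", "LA"},
            new String[]{"Sales", "Lead", "32000", "LA"}));
        long[] la = res.get("Sales,Lead,LA");
        System.out.println(la[0] + "," + la[1]); // prints 96000,3
    }
}
```

Note that concatenating the key fields without a separator (as the Spark snippet does) risks collisions, e.g. "AB" + "C" and "A" + "BC" map to the same key; inserting a delimiter, as here, avoids that.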