I'm a newbie, and I want to use group-by and reduce to compute the following from a CSV file (one line per employee):
    Department, Designation, costToCompany, State
    Sales, Trainee, 12000, UP
    Sales, Lead, 32000, AP
    Sales, Lead, 32000, LA
    Sales, Lead, 32000, TN
    Sales, Lead, 32000, AP
    Sales, Lead, 32000, TN
    Sales, Lead, 32000, LA
    Sales, Lead, 32000, LA
    Marketing, Associate, 18000, TN
    Marketing, Associate, 18000, TN
    HR, Manager, 58000, TN
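I would like to simplify the CSV by grouping on Department, Designation and State, with additional columns for sum(costToCompany) and TotalEmployeeCount.
The result should look like this: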
    Dept, Desg, state, empCount, totalCost
    Sales,Lead,AP,2,64000
    Sales,Lead,LA,3,96000
    Sales,Lead,TN,2,64000
Is there any way to achieve this using transformations and actions? Or should we go for RDD operations?
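Create a class (schema) to encapsulate your structure. It is not required for approach B, but if you are using Java it will make your code easier to read.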
    import java.io.Serializable;

    public class Record implements Serializable {
        String department;
        String designation;
        long costToCompany;
        String state;
        // constructor, getters and setters
    }
Load the CSV (or JSON) file:
    JavaSparkContext sc; // assumed to be initialized elsewhere
    JavaRDD<String> data = sc.textFile("path/input.csv");
    // JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified

    // Map the lines already loaded in `data`; do not call sc.textFile again
    JavaRDD<Record> rdd_records = data.map(
        new Function<String, Record>() {
            public Record call(String line) throws Exception {
                // Here you can use JSON instead:
                // Gson gson = new Gson();
                // return gson.fromJson(line, Record.class);
                String[] fields = line.split(",");
                // costToCompany is a long, so parse it; a header row would fail here
                // and has to be filtered out beforehand
                Record sd = new Record(fields[0].trim(), fields[1].trim(),
                        Long.parseLong(fields[2].trim()), fields[3].trim());
                return sd;
            }
        });
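The parsing step can be tried outside Spark first. The following is a minimal plain-Java sketch, assuming the four-column layout from the question; `RecordParser`, `parse` and `isHeader` are hypothetical names, and the nested `Record` is a simplified stand-in for the schema class above (public final fields instead of getters and setters).

```java
import java.io.Serializable;

final class RecordParser {
    /** Simplified stand-in for the Record schema class (hypothetical layout). */
    static final class Record implements Serializable {
        final String department;
        final String designation;
        final long costToCompany;
        final String state;

        Record(String department, String designation, long costToCompany, String state) {
            this.department = department;
            this.designation = designation;
            this.costToCompany = costToCompany;
            this.state = state;
        }
    }

    /** Returns true for the header line, which must not be parsed as data. */
    static boolean isHeader(String line) {
        return line.trim().startsWith("Department");
    }

    /** Parses one data line of the form "Department, Designation, costToCompany, State". */
    static Record parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got: " + line);
        }
        // Trim every field because the sample file has a space after each comma
        return new Record(
                fields[0].trim(),
                fields[1].trim(),
                Long.parseLong(fields[2].trim()),
                fields[3].trim());
    }
}
```

In the Spark job, the same check could be applied with a `filter` on the lines before the `map`, so that the header never reaches `Long.parseLong`.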
At this point you have two approaches:
A. Spark SQL: register the records as a table using the schema class, then query it with a group-by.

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();

    JavaSchemaRDD res = sqlContext.sql(
        "select department, designation, state, sum(costToCompany), count(*) "
      + "from record_table "
      + "group by department, designation, state");
B. Plain Spark: map each record to a pair keyed by the composite of Department, Designation and State:
    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
        rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
            public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
                // Composite key; the separator keeps keys such as "ab"+"c" and "a"+"bc" distinct
                String key = record.department + "," + record.designation + "," + record.state;
                // Value is (costToCompany, 1) so cost and count can be summed in one pass
                return new Tuple2<String, Tuple2<Long, Integer>>(
                    key, new Tuple2<Long, Integer>(record.costToCompany, 1));
            }
        });
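Then call reduceByKey on that composite key, summing the costToCompany column and accumulating the number of records per key: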
    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
        records_JPRDD.reduceByKey(
            new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
                public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                                                  Tuple2<Long, Integer> v2) throws Exception {
                    // Add up the costs and the counts of two partial results for the same key
                    return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
                }
            });
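To sanity-check the composite-key logic without a cluster, the map and reduceByKey steps can be mirrored in plain Java with `Map.merge`. This is only a sketch of the same aggregation, not Spark code; `GroupBySketch`, `aggregate` and `sampleRows` are hypothetical names, and the rows are the sample data from the question with the header removed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupBySketch {
    /** The data rows from the question, without the header line. */
    static List<String> sampleRows() {
        return Arrays.asList(
                "Sales, Trainee, 12000, UP",
                "Sales, Lead, 32000, AP",
                "Sales, Lead, 32000, LA",
                "Sales, Lead, 32000, TN",
                "Sales, Lead, 32000, AP",
                "Sales, Lead, 32000, TN",
                "Sales, Lead, 32000, LA",
                "Sales, Lead, 32000, LA",
                "Marketing, Associate, 18000, TN",
                "Marketing, Associate, 18000, TN",
                "HR, Manager, 58000, TN");
    }

    /** Maps "department,designation,state" to {totalCost, empCount}. */
    static Map<String, long[]> aggregate(List<String> lines) {
        Map<String, long[]> totals = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            // "Map" step: composite key and the value pair (cost, 1)
            String key = f[0].trim() + "," + f[1].trim() + "," + f[3].trim();
            long[] value = {Long.parseLong(f[2].trim()), 1L};
            // "Reduce" step: merge combines values sharing a key, like reduceByKey
            totals.merge(key, value, (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]});
        }
        return totals;
    }

    public static void main(String[] args) {
        // Prints one line per group in the order: key, empCount, totalCost
        aggregate(sampleRows()).forEach(
                (key, v) -> System.out.println(key + "," + v[1] + "," + v[0]));
    }
}
```

Because the merge function is associative and commutative, the order in which partial results are combined does not matter, which is the same property reduceByKey relies on when it combines values across partitions.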