Data preparation

JavaRDD

    JavaRDD<String> javaRdd = spark.read().textFile("/home/source.csv").javaRDD();
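
`textFile` returns a `Dataset<String>` with one element per line of the file, and `javaRDD()` turns it into the `JavaRDD<String>` used below. To unit-test the parsing step without a cluster, a plain-Java stand-in can produce the same shape of data. This is only a sketch: the class name `LocalLines` and the sample data are hypothetical, and unlike Spark, `Files.readAllLines` loads the whole file into memory.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical local stand-in for spark.read().textFile(path).javaRDD():
// one String per line of the file, with the line terminator removed.
public class LocalLines {
    public static List<String> readLines(Path path) throws IOException {
        return Files.readAllLines(path, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Made-up sample data written to a temporary file
        Path tmp = Files.createTempFile("source", ".csv");
        Files.write(tmp, Arrays.asList("a,b,c,d", "e,f,g,h"), StandardCharsets.UTF_8);
        List<String> lines = readLines(tmp);
        System.out.println(lines.size() + " lines, first: " + lines.get(0));
        Files.delete(tmp);
    }
}
```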

Converting the JavaRDD to a Dataset requires defining an entity class, Table, for the rows:

import java.io.Serializable;

// JavaBean for one CSV row: Spark reads the columns through the getters,
// so every field needs a public getter/setter pair and the class needs a
// public no-arg constructor (the implicit default one here).
public class Table implements Serializable {
    private String field1;
    private String field2;
    private String field3;
    private String field4;
    public String getField1() {
        return field1;
    }
    public void setField1(String field1) {
        this.field1 = field1;
    }
    public String getField2() {
        return field2;
    }
    public void setField2(String field2) {
        this.field2 = field2;
    }
    public String getField3() {
        return field3;
    }
    public void setField3(String field3) {
        this.field3 = field3;
    }
    public String getField4() {
        return field4;
    }
    public void setField4(String field4) {
        this.field4 = field4;
    }
    @Override
    public String toString() {
        return "Table { field1=" + field1 + ", field2=" + field2 + ", field3=" + field3 + ", field4=" + field4 + " }";
    }
}
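
Because the conversion works from the bean's getter-based properties rather than from the private fields, a copy-paste slip such as a missing or duplicated getter silently changes which columns exist (or breaks compilation). One quick check is the JDK's `java.beans.Introspector`, which, to my knowledge, is also what Spark's bean schema inference relies on. The sketch below lists the readable properties of a class; `BeanCheck` and its nested two-field `Pair` bean (deliberately missing `getB`) are hypothetical stand-ins, not part of the original code.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BeanCheck {
    // Hypothetical bean: "b" has a setter but no getter, on purpose
    public static class Pair implements Serializable {
        private String a;
        private String b;
        public String getA() { return a; }
        public void setA(String a) { this.a = a; }
        public void setB(String b) { this.b = b; }
    }

    // Sorted names of properties that have a getter, excluding getClass()'s "class"
    public static List<String> readableProperties(Class<?> beanClass) throws IntrospectionException {
        BeanInfo info = Introspector.getBeanInfo(beanClass);
        List<String> names = new ArrayList<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                names.add(pd.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws IntrospectionException {
        System.out.println(readableProperties(Pair.class)); // prints [a]: "b" has no getter
    }
}
```

Running the same check against `Table.class` should list `field1` through `field4`; a shorter list points at a missing getter.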

Generate the Dataset via reflection (Spark infers the schema from the Table bean):

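import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Map each CSV line to a Table bean (assumes four comma-separated cells per line)
JavaRDD<Table> tablesRDD = javaRdd.map(new Function<String, Table>() {
    @Override
    public Table call(String eachLine) throws Exception {
        String[] cells = eachLine.split(",");
        Table tableLine = new Table();
        tableLine.setField1(cells[0]);
        tableLine.setField2(cells[1]);
        tableLine.setField3(cells[2]);
        tableLine.setField4(cells[3]);
        return tableLine;
    }
});
// Spark builds the columns (field1..field4) from the Table bean's getters
Dataset<Row> ds = spark.createDataFrame(tablesRDD, Table.class);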
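
One caveat with `eachLine.split(",")`: Java's `String.split` drops trailing empty strings, so a line such as `a,b,c,` yields only three cells and `cells[3]` throws `ArrayIndexOutOfBoundsException`. Passing a negative limit keeps the empty trailing cell. The helper below is a sketch (`CsvCells.parse` is a hypothetical name) that shows the difference and rejects lines with the wrong number of columns; it does not handle quoted fields containing commas, which need a real CSV parser.

```java
import java.util.Arrays;

public class CsvCells {
    // Split one CSV line into exactly `expected` cells.
    // limit -1 keeps trailing empty strings: "a,b,c," -> 4 cells, the last one "".
    public static String[] parse(String line, int expected) {
        String[] cells = line.split(",", -1);
        if (cells.length != expected) {
            throw new IllegalArgumentException(
                "expected " + expected + " cells but got " + cells.length + ": " + line);
        }
        return cells;
    }

    public static void main(String[] args) {
        String line = "a,b,c,";
        System.out.println(line.split(",").length);          // 3: trailing empty cell dropped
        System.out.println(line.split(",", -1).length);      // 4
        System.out.println(Arrays.toString(parse(line, 4))); // [a, b, c, ]
    }
}
```

Inside `call`, `CsvCells.parse(eachLine, 4)` could replace the bare `split`, so that a malformed line fails with a readable message instead of an index error.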
