
Two ways to convert an RDD to a DataFrame

There are two approaches: the programmatic approach and the reflection approach.
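
For illustration, the input file info.txt referenced below is assumed to contain comma-separated id,name,age records, for example (made-up sample rows matching the parsing logic in the code):

1,zhangsan,20
2,lisi,30
3,wangwu,25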

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Enclosing object added for completeness; the name is assumed, since the
// original excerpt only shows the two methods and the closing brace.
object RDDToDFApp {

  /**
   * The programmatic approach
   */
  def programmatically(spark: SparkSession): Unit = {
    // Create the source RDD
    val rdd = spark.sparkContext.textFile("ruozedata-spark-sql/data/info.txt")

    // STEP 1: RDD[String] ==> RDD[Row]
    val infoRDD: RDD[Row] = rdd.map(x => {
      val splits = x.split(",")
      val id = splits(0).trim.toInt
      val name = splits(1).trim
      val age = splits(2).trim.toInt
      Row(id, name, age)
    })

    // STEP 2: define the schema explicitly
    val schema = StructType(
      StructField("id", IntegerType, true) ::
        StructField("name", StringType, false) ::
        StructField("age", IntegerType, false) :: Nil)

    // STEP 3: createDataFrame from the Row RDD plus the schema
    val df = spark.createDataFrame(infoRDD, schema)
    df.printSchema()
    df.show()
  }
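  // For reference, the schema printed by the programmatic version follows the
  // StructField nullability flags defined above:
  //
  //   root
  //    |-- id: integer (nullable = true)
  //    |-- name: string (nullable = false)
  //    |-- age: integer (nullable = false)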

  /**
   * The reflection approach: convert the RDD to a DataFrame via a case class
   */
  def reflection(spark: SparkSession): Unit = {
    // Implicit conversions required for the RDD => DF conversion (toDF)
    import spark.implicits._

    // Create the source RDD
    val rdd = spark.sparkContext.textFile("ruozedata-spark-sql/data/info.txt")

    // RDD[String] ==> RDD[Info] (case class), then toDF()
    val infoDF = rdd.map(x => {
      val splits = x.split(",")
      val id = splits(0).trim.toInt
      val name = splits(1).trim
      val age = splits(2).trim.toInt
      Info(id, name, age)
    }).toDF() // the final conversion to a DataFrame

    infoDF.printSchema()
    infoDF.show()
  }
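
  // A minimal driver sketch (not part of the original excerpt) showing how the
  // two methods might be invoked; the app name and master URL are placeholders.
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RDDToDFApp") // placeholder app name
      .master("local[2]")    // placeholder master; drop or adjust on a cluster
      .getOrCreate()

    programmatically(spark)
    reflection(spark)

    spark.stop()
  }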

  // Case class whose fields define the schema inferred via reflection
  case class Info(id: Int, name: String, age: Int)
}
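
In practice, the reflection approach is the more concise of the two when the schema is fixed and known at compile time, while the programmatic approach is the better fit when the columns are only known at runtime (for example, when the schema comes from a configuration file).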