教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

多種方法創(chuàng)建DataFrame【大數(shù)據(jù)技術文章】

更新時間:2021年03月23日14時14分 來源:傳智教育 瀏覽次數(shù):

在Spark2.0版本之前,Spark SQL中的SQLContext是創(chuàng)建DataFrame和執(zhí)行SQL的入口,我們可以利用HiveContext接口,通過HiveQL語句操作Hive表數(shù)據(jù),實現(xiàn)數(shù)據(jù)查詢功能。而在Spark2.0之后,Spark使用全新的SparkSession接口替代SQLContext及HiveContext接口完成數(shù)據(jù)的加載、轉(zhuǎn)換、處理等功能。

創(chuàng)建SparkSession對象可以通過“SparkSession.builder().getOrCreate()”方法獲取,但當我們使用Spark-Shell編寫程序時,Spark-Shell客戶端會默認提供了一個名為“sc”的SparkContext對象和一個名為“spark”的SparkSession對象,因此我們可以直接使用這兩個對象,不需要自行創(chuàng)建。啟動Spark-Shell命令如下所示。

$ spark-shell --master local[2]

在啟動Spark-Shell完成后,效果如圖1所示。

DataFrame的創(chuàng)建方法【大數(shù)據(jù)文章】

圖1 啟動Spark-Shell

在圖1中可以看出,SparkContext、SparkSession對象已創(chuàng)建完成。創(chuàng)建DataFrame有多種方式,最基本的方式是從一個已經(jīng)存在的RDD調(diào)用toDF()方法進行轉(zhuǎn)換得到DataFrame,或者通過Spark讀取數(shù)據(jù)源直接創(chuàng)建。

在創(chuàng)建DataFrame之前,為了支持RDD轉(zhuǎn)換成DataFrame及后續(xù)的SQL操作,需要導入spark.implicits._包啟用隱式轉(zhuǎn)換。若使用SparkSession方式創(chuàng)建DataFrame,可以使用spark.read操作,從不同類型的文件中加載數(shù)據(jù)創(chuàng)建DataFrame,具體操作API如表1所示。

表1 spark.read操作

代碼示例 描述
spark.read.text("people.txt") 讀取txt格式的文本文件,創(chuàng)建DataFrame                                 
spark.read.csv ("people.csv") 讀取csv格式的文本文件,創(chuàng)建DataFrame
spark.read.json("people.json") 讀取json格式的文本文件,創(chuàng)建DataFrame
spark.read.parquet("people.parquet") 讀取parquet格式的文本文件,創(chuàng)建DataFrame

1.數(shù)據(jù)準備

在HDFS文件系統(tǒng)中的/spark目錄中有一個person.txt文件,內(nèi)容如文件1所示。

文件1 person.txt

 zhangsan 20
 lisi 29
 wangwu 25
 zhaoliu 30
 tianqi 35
 jerry 40

2.通過文件直接創(chuàng)建DataFrame

我們通過Spark讀取數(shù)據(jù)源的方式進行創(chuàng)建DataFrame,在Spark-Shell輸入下列代碼:

scala > val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: String]
scala > personDF.printSchema()
root
 |-- value: String (Nullable = true)

從上述返回結(jié)果personDF的屬性可以看出,創(chuàng)建DataFrame對象完成,之后調(diào)用DataFrame的printSchema()方法可以打印當前對象的Schema元數(shù)據(jù)信息。從返回結(jié)果可以看出,當前value字段是String數(shù)據(jù)類型,并且還可以為Null。

使用DataFrame的show()方法可以查看當前DataFrame的結(jié)果數(shù)據(jù),具體代碼和返回結(jié)果如下所示。

scala > personDF.show()
+-------------+                          
|   value   |
+-------------+
|1 zhangsan 20|
|2 lisi    29|
|3 wangwu  25|
|4 zhaoliu 30|
|5 tianqi  35|
|6 jerry  40|
+-------------+

從上述返回結(jié)果看出,當前personDF對象中的6條記錄就對應了person.txt文本文件中的數(shù)據(jù)。

3.RDD轉(zhuǎn)換DataFrame

調(diào)用RDD的toDF()方法,可以將RDD轉(zhuǎn)換為DataFrame對象,具體代碼如下所示。

   scala > val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
   lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at
   map at <console>:24
   scala > case class Person(id:Int,name:String,age:Int)
   defined class Person
   scala > val personRDD = 
lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
   personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map
   at <console>:27
   scala > val personDF = personRDD.toDF()
  personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more
  field]
  scala > personDF.show
  +----+--------+----+
  | id |  name | age|
  +----+--------+----+
  | 1 |zhangsan | 20|
  | 2 |lisi   |  29|
  | 3 |wangwu  |  25|
  | 4 |zhaoliu |  30|
  | 5 |tianqi  |  35|
  | 6 |jerry  |  40|
  +----+--------+----+
  scala > personDF.printSchema
  root
   |-- id: integer (nullable = false)
   |-- name: string (nullable = true)
   |-- age: integer (nullable = false)

在上述代碼中,第1行代碼將文本文件轉(zhuǎn)換成RDD,第4行代碼定義Person樣例類,相當于定義表的Schema元數(shù)據(jù)信息,第6行代碼表示使RDD中的數(shù)組數(shù)據(jù)與樣例類進行關聯(lián),最終會將RDD[Array[String]]更改為RDD[Person],第9行代碼表示調(diào)用RDD的toDF()方法,就可以把RDD轉(zhuǎn)換成了DataFrame了。第12-27行代碼表示調(diào)用DataFrame方法并從返回結(jié)果可以看出,RDD對象成功轉(zhuǎn)換DataFrame。




猜你喜歡:

Redis、傳統(tǒng)數(shù)據(jù)庫、HBase以及Hive的區(qū)別

怎樣安裝和配置Sqoop?

DataFrame是什么意思?與RDD相比有哪些優(yōu)點?

大數(shù)據(jù)Hadoop生態(tài)圈包含哪些子系統(tǒng)?

傳智教育大數(shù)據(jù)項目開發(fā)培訓

0 分享到:
和我們在線交談!