sparksql結(jié)合hive最佳實踐
一、Spark SQL快速上手
1、Spark SQL是什么
Spark SQL 是一個用來處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個分布式的SQL查詢引擎。
2、Spark SQL的基礎(chǔ)數(shù)據(jù)模型-----DataFrames
DataFrame是由“命名列”(類似關(guān)系表的字段定義)所組織起來的一個分布式數(shù)據(jù)集合。你可以把它看成是一個關(guān)系型數(shù)據(jù)庫的表。
DataFrame可以通過多種來源創(chuàng)建:結(jié)構(gòu)化數(shù)據(jù)文件,hive的表,外部數(shù)據(jù)庫,或者RDDs
3、Spark SQL如何使用
首先,利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame
然后,利用DataFrame上豐富的api進行查詢、轉(zhuǎn)換
最后,將結(jié)果進行展現(xiàn)或存儲為各種外部數(shù)據(jù)形式
如圖所示:
4、Spark SQL代碼示例
? 加載數(shù)據(jù)
sqlContext支持從各種各樣的數(shù)據(jù)源中創(chuàng)建DataFrame,內(nèi)置支持的數(shù)據(jù)源有parquetFile,jsonFile,外部數(shù)據(jù)庫,hive表,RDD等,另外,hbase等數(shù)據(jù)源的支持也在社區(qū)不斷涌現(xiàn)
# 從Hive中的users表構(gòu)造DataFrame
users = sqlContext.table("users")
# 加載S3上的JSON文件
logs = sqlContext.load("s3n://path/to/data.json", "json")
# 加載HDFS上的Parquet文件
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")
# 通過JDBC訪問MySQL
comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")
# 將普通RDD轉(zhuǎn)變?yōu)镈ataFrame
rdd = sparkContext.textFile("article.txt") \
.flatMap(_.split(" ")) \
.map((_, 1)) \
.reduceByKey(_+_) \
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])
# 將本地數(shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame
data = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(data, ["name", "age"])
? 使用DataFrame
Spark DataFrame提供了一整套用于操縱數(shù)據(jù)的DSL。這些DSL在語義上與SQL關(guān)系查詢非常相近(這也是Spark SQL能夠為DataFrame提供無縫支持的重要原因之一)。以下是一組用戶數(shù)據(jù)分析示例:
# 創(chuàng)建一個只包含年齡小于21歲用戶的DataFrame
young = users.filter(users.age < 21)
# 也可以使用Pandas風(fēng)格的語法
young = users[users.age < 21]
# 將所有人的年齡加1
young.select(young.name, young.age + 1)
# 統(tǒng)計年輕用戶中各性別人數(shù)
young.groupBy("gender").count()
# 將所有年輕用戶與另一個名為logs的DataFrame聯(lián)接起來
young.join(logs, logs.userId == users.userId, "left_outer")
除DSL以外,我們當(dāng)然也可以使用熟悉的SQL來處理DataFrame:
young.registerTempTable("young")
sqlContext.sql("SELECT count(*) FROM young")
? 保存結(jié)果
對數(shù)據(jù)的分析完成之后,可以將結(jié)果保存在多種形式的外部存儲中
# 追加至HDFS上的Parquet文件
young.save(path="hdfs://path/to/data.parquet", source="parquet", mode="append")
# 覆寫S3上的JSON文件
young.save(path="s3n://path/to/data.json", source="json",mode="append")
# 保存為Hive的內(nèi)部表
young.saveAsTable(tableName="young", source="parquet" mode="overwrite")
# 轉(zhuǎn)換為Pandas DataFrame(Python API特有功能)
pandasDF = young.toPandas()
# 以表格形式打印輸出
young.show()
二、SparkSQL操作Hive中的表數(shù)據(jù)
spark可以通過讀取hive的元數(shù)據(jù)來兼容hive,讀取hive的表數(shù)據(jù),然后在spark引擎中進行sql統(tǒng)計分析,從而,通過sparksql與hive結(jié)合實現(xiàn)數(shù)據(jù)分析將成為一種最佳實踐。詳細實現(xiàn)步驟如下:
1、啟動hive的元數(shù)據(jù)服務(wù)
hive可以通過服務(wù)的形式對外提供元數(shù)據(jù)讀寫操作,通過簡單的配置即可
? 編輯 $HIVE_HOME/conf/hive-site.xml,增加如下內(nèi)容:
<property>
<name>hive.metastore.uris</name>
<value>thrift:// hdp-node-01:9083</value>
</property>
? 啟動hive metastore
[hadoop@hdp-node-01 ~]${HIVE_HOME}/bin/hive --service metastore 1>/dev/null 2>&1 &
? 查看 metastore:
[hadoop@hdp-node-01 ~] jobs
[1]+ Running hive --service metastore &
2、spark配置
? 將hive的配置文件拷貝給spark
將 $HIVE_HOME/conf/hive-site.xml copy或者軟鏈 到 $SPARK_HOME/conf/
? 將mysql的jdbc驅(qū)動包拷貝給spark
將 $HIVE_HOME/lib/mysql-connector-java-5.1.12.jar copy或者軟鏈到$SPARK_HOME/lib/
3、啟動spark-sql的shell交互界面
spark-sql已經(jīng)集成在spark-shell中,因此,只要啟動spark-shell,就可以使用spakr-sql的shell交互接口:
[hadoop@hdp-node-01 spark] bin/spark-shell --master spark://hdp-node-01:7077
或者,可以啟動spark-sql界面,使用起來更方便
[hadoop@hdp-node-01 spark] bin/spark-sql --master spark://hdp-node-01:7077
4、在交互界面輸入sql進行查詢
注:以下所用到的庫和表,都是已經(jīng)在hive中存在的庫和表
? 如果在spark-shell中執(zhí)行sql查詢,使用sqlContext對象調(diào)用sql()方法
scala> sqlContext.sql("select remote_addr from dw_weblog.t_ods_detail group by remote_addr").collect.foreach(println)
? 如果是在spark-sql中執(zhí)行sql查詢,則可以直接輸入sql語句
scala> show databases
scala> use dw_weblog
scala> select remote_addr from dw_weblog.t_ods_detail group by remote_addr
5、在IDEA中編寫代碼使用hive-sql
如下所示:
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
import hiveContext.sql
//指定庫
sql("use dw_weblog")
//執(zhí)行標(biāo)準(zhǔn)sql語句
sql("create table sparksql as select remote_addr,count(*) from t_ods_detail group by remote_addr")
……
綜上所述,sparksql類似于hive,可以支持sql語法來對海量數(shù)據(jù)進行分析查詢,跟hive不同的是,hive執(zhí)行sql任務(wù)的底層運算引擎采用mapreduce運算框架,而sparksql執(zhí)行sql任務(wù)的運算引擎是spark core,從而充分利用spark內(nèi)存計算及DAG模型的優(yōu)勢,大幅提升海量數(shù)據(jù)的分析查詢速度
源碼
sparksql結(jié)合hive最佳實踐<br />
一、Spark SQL快速上手<br />
1、Spark SQL是什么<br />
Spark SQL 是一個用來處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個分布式的SQL查詢引擎。<br />
<br />
2、Spark SQL的基礎(chǔ)數(shù)據(jù)模型-----DataFrames<br />
DataFrame是由“命名列”(類似關(guān)系表的字段定義)所組織起來的一個分布式數(shù)據(jù)集合。你可以把它看成是一個關(guān)系型數(shù)據(jù)庫的表。<br />
DataFrame可以通過多種來源創(chuàng)建:結(jié)構(gòu)化數(shù)據(jù)文件,hive的表,外部數(shù)據(jù)庫,或者RDDs<br />
<br />
3、Spark SQL如何使用<br />
首先,利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame<br />
然后,利用DataFrame上豐富的api進行查詢、轉(zhuǎn)換<br />
最后,將結(jié)果進行展現(xiàn)或存儲為各種外部數(shù)據(jù)形式<br />
如圖所示:
<div style="text-align: center;"><img alt="" src="/files/image/201512/20151229153449294.jpg" style="width: 400px; height: 253px;" /></div>
<br />
4、Spark SQL代碼示例<br />
? 加載數(shù)據(jù)<br />
sqlContext支持從各種各樣的數(shù)據(jù)源中創(chuàng)建DataFrame,內(nèi)置支持的數(shù)據(jù)源有parquetFile,jsonFile,外部數(shù)據(jù)庫,hive表,RDD等,另外,hbase等數(shù)據(jù)源的支持也在社區(qū)不斷涌現(xiàn)<br />
# 從Hive中的users表構(gòu)造DataFrame<br />
users = sqlContext.table("users")<br />
# 加載S3上的JSON文件<br />
logs = sqlContext.load("s3n://path/to/data.json", "json")<br />
# 加載HDFS上的Parquet文件<br />
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")<br />
# 通過JDBC訪問MySQL<br />
comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")<br />
# 將普通RDD轉(zhuǎn)變?yōu)镈ataFrame<br />
rdd = sparkContext.textFile("article.txt") \<br />
.flatMap(_.split(" ")) \<br />
.map((_, 1)) \<br />
.reduceByKey(_+_) \<br />
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])<br />
<br />
# 將本地數(shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame<br />
data = [("Alice", 21), ("Bob", 24)]<br />
people = sqlContext.createDataFrame(data, ["name", "age"])<br />
<br />
? 使用DataFrame<br />
Spark DataFrame提供了一整套用于操縱數(shù)據(jù)的DSL。這些DSL在語義上與SQL關(guān)系查詢非常相近(這也是Spark SQL能夠為DataFrame提供無縫支持的重要原因之一)。以下是一組用戶數(shù)據(jù)分析示例:<br />
# 創(chuàng)建一個只包含年齡小于21歲用戶的DataFrame<br />
young = users.filter(users.age < 21)<br />
<br />
# 也可以使用Pandas風(fēng)格的語法<br />
young = users[users.age < 21]<br />
# 將所有人的年齡加1<br />
young.select(young.name, young.age + 1)<br />
# 統(tǒng)計年輕用戶中各性別人數(shù)<br />
young.groupBy("gender").count()<br />
# 將所有年輕用戶與另一個名為logs的DataFrame聯(lián)接起來<br />
young.join(logs, logs.userId == users.userId, "left_outer")<br />
除DSL以外,我們當(dāng)然也可以使用熟悉的SQL來處理DataFrame:<br />
young.registerTempTable("young")<br />
sqlContext.sql("SELECT count(*) FROM young")<br />
<br />
? 保存結(jié)果<br />
對數(shù)據(jù)的分析完成之后,可以將結(jié)果保存在多種形式的外部存儲中<br />
# 追加至HDFS上的Parquet文件<br />
young.save(path="hdfs://path/to/data.parquet", source="parquet", mode="append")<br />
<br />
# 覆寫S3上的JSON文件<br />
young.save(path="s3n://path/to/data.json", source="json",mode="append")<br />
<br />
# 保存為Hive的內(nèi)部表<br />
young.saveAsTable(tableName="young", source="parquet" mode="overwrite")<br />
<br />
# 轉(zhuǎn)換為Pandas DataFrame(Python API特有功能)<br />
pandasDF = young.toPandas()<br />
<br />
# 以表格形式打印輸出<br />
young.show()<br />
<br />
<br />
二、SparkSQL操作Hive中的表數(shù)據(jù)<br />
spark可以通過讀取hive的元數(shù)據(jù)來兼容hive,讀取hive的表數(shù)據(jù),然后在spark引擎中進行sql統(tǒng)計分析,從而,通過sparksql與hive結(jié)合實現(xiàn)數(shù)據(jù)分析將成為一種最佳實踐。詳細實現(xiàn)步驟如下:<br />
<br />
1、啟動hive的元數(shù)據(jù)服務(wù)<br />
hive可以通過服務(wù)的形式對外提供元數(shù)據(jù)讀寫操作,通過簡單的配置即可<br />
? 編輯 $HIVE_HOME/conf/hive-site.xml,增加如下內(nèi)容:<br />
<property><br />
<name>hive.metastore.uris</name><br />
<value>thrift:// hdp-node-01:9083</value><br />
</property><br />
<br />
? 啟動hive metastore<br />
[hadoop@hdp-node-01 ~]${HIVE_HOME}/bin/hive --service metastore 1>/dev/null 2>&1 &<br />
<br />
? 查看 metastore:<br />
[hadoop@hdp-node-01 ~] jobs<br />
[1]+ Running hive --service metastore &<br />
<br />
<br />
2、spark配置<br />
? 將hive的配置文件拷貝給spark<br />
將 $HIVE_HOME/conf/hive-site.xml copy或者軟鏈 到 $SPARK_HOME/conf/<br />
<br />
? 將mysql的jdbc驅(qū)動包拷貝給spark<br />
將 $HIVE_HOME/lib/mysql-connector-java-5.1.12.jar copy或者軟鏈到$SPARK_HOME/lib/<br />
<br />
3、啟動spark-sql的shell交互界面<br />
spark-sql已經(jīng)集成在spark-shell中,因此,只要啟動spark-shell,就可以使用spakr-sql的shell交互接口:<br />
[hadoop@hdp-node-01 spark] bin/spark-shell --master spark://hdp-node-01:7077
<div style="text-align: center;"><img alt="" src="/files/image/201512/20151229153518583.png" style="width: 400px; height: 170px;" /> </div>
<br />
或者,可以啟動spark-sql界面,使用起來更方便<br />
[hadoop@hdp-node-01 spark] bin/spark-sql --master spark://hdp-node-01:7077<br />
<br />
4、在交互界面輸入sql進行查詢<br />
注:以下所用到的庫和表,都是已經(jīng)在hive中存在的庫和表<br />
<br />
? 如果在spark-shell中執(zhí)行sql查詢,使用sqlContext對象調(diào)用sql()方法<br />
scala> sqlContext.sql("select remote_addr from dw_weblog.t_ods_detail group by remote_addr").collect.foreach(println)<br />
<br />
? 如果是在spark-sql中執(zhí)行sql查詢,則可以直接輸入sql語句<br />
scala> show databases<br />
scala> use dw_weblog<br />
scala> select remote_addr from dw_weblog.t_ods_detail group by remote_addr<br />
<br />
<br />
<br />
5、在IDEA中編寫代碼使用hive-sql<br />
如下所示:<br />
val hiveContext = new HiveContext(sc)<br />
import hiveContext.implicits._<br />
import hiveContext.sql<br />
//指定庫<br />
sql("use dw_weblog")<br />
//執(zhí)行標(biāo)準(zhǔn)sql語句<br />
sql("create table sparksql as select remote_addr,count(*) from t_ods_detail group by remote_addr")<br />
……<br />
<br />
<br />
<br />
<br />
綜上所述,sparksql類似于hive,可以支持sql語法來對海量數(shù)據(jù)進行分析查詢,跟hive不同的是,hive執(zhí)行sql任務(wù)的底層運算引擎采用mapreduce運算框架,而sparksql執(zhí)行sql任務(wù)的運算引擎是spark core,從而充分利用spark內(nèi)存計算及DAG模型的優(yōu)勢,大幅提升海量數(shù)據(jù)的分析查詢速度<br />