Reading and Writing MySQL Data with Spark SQL over JDBC
Spark SQL can create a DataFrame by loading data from a relational database over JDBC. After a series of computations on the DataFrame, the results can be written back to the relational database.
I. Loading Data from MySQL (via the Spark Shell)
1. Start the Spark Shell; the MySQL connector driver JAR must be specified
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell \
--master spark://node1.itcast.cn:7077 \
--jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
--driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar
2. Load data from MySQL
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:mysql://192.168.10.1:3306/bigdata",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "person",
    "user" -> "root",
    "password" -> "123456"
  )
).load()
3. Run a query
jdbcDF.show()
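Once loaded, the DataFrame can be queried through SQL (after registering it as a temporary table) or through the DataFrame API, and simple filters are pushed down into the generated JDBC query. A minimal sketch, assuming the person table has an age column as in the write example later in this article:

```scala
// Register the DataFrame as a temporary table so it can be queried with SQL
jdbcDF.registerTempTable("person")
sqlContext.sql("SELECT id, name, age FROM person WHERE age >= 5").show()

// Equivalent query using the DataFrame API; the filter can be
// pushed down into the SQL that Spark sends to MySQL
jdbcDF.filter(jdbcDF("age") >= 5).show()
```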
II. Writing Data to MySQL (by Submitting a JAR)
1. Write the Spark SQL program
package cn.itcast.spark.sql

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MySQL-Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD by parallelizing a local collection
    val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the string-array RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD to obtain a DataFrame
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Store the database connection properties in a Properties object
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    // Append the data to the database table
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.10.1:3306/bigdata", "bigdata.person", prop)
    // Stop the SparkContext
    sc.stop()
  }
}
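The write above uses the "append" save mode. The DataFrameWriter supports a few other modes; a minimal sketch, reusing the same personDataFrame, URL, and prop from the program above:

```scala
// "append"    adds rows to the existing table (used above)
// "overwrite" drops and recreates the table with the DataFrame's data
// "ignore"    silently skips the write if the table already exists
// "error"     (the default) throws an exception if the table already exists
personDataFrame.write
  .mode("overwrite")
  .jdbc("jdbc:mysql://192.168.10.1:3306/bigdata", "bigdata.person", prop)
```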
2. Package the program with Maven
3. Submit the JAR to the Spark cluster
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
--class cn.itcast.spark.sql.JdbcRDD \
--master spark://node1.itcast.cn:7077 \
--jars /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
--driver-class-path /usr/local/spark-1.5.2-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
/root/spark-mvn-1.0-SNAPSHOT.jar