1 概述
SparkSQL可以根据DataFrame对各种数据源进行操作。DataFrame可以像一个普通的RDD一样运行,或者把它注册为临时表以进行查询。在将DataFrame注册为table后,您可以根据table执行SQL语句。这一节描述一些装入和保存数据的一般方法,包括不同的Spark数据源,然后深入讨论可用于内部构建数据源的选项。
2 通用加载/保存函数
在最简单的情况下,所有操作都会以默认类型数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置)。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
1、手动指定选项
您还可以手工指定资料来源,并设定一些其他选项参数。可以通过完整的名称指定数据源(例如,org.apache.spark.sql.parquet),而对于内置的受支持的数据源,可以使用简写名(json,parquet,jdbc)。通过以下语法,由任意类型数据源创建的DataFrame可以转换为其他类型的数据格式。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
2、直接对文件使用SQL
Spark SQL还支持直接对文件使用SQL查询,不需要用read方法把文件加载进来。
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
3、保存模式
Save操作有一个可选参数SaveMode,用这个参数可以指定如何处理数据已经存在的情况。很重要的一点是,这些保存模式都没有加锁,所以其操作也不是原子性的。另外,如果使用Overwrite模式,实际操作是,先删除数据,再写新数据。
仅Scala/Java
|
所有支持的语言
|
含义
|
SaveMode.ErrorIfExists (default)
|
“error” (default)
|
(默认模式)从DataFrame向数据源保存数据时,如果数据已经存在,则抛异常。
|
SaveMode.Append
|
“append”
|
如果数据或表已经存在,则将DataFrame的数据追加到已有数据的尾部。
|
SaveMode.Overwrite
|
“overwrite”
|
如果数据或表已经存在,则用DataFrame数据覆盖之。
|
SaveMode.Ignore
|
“ignore”
|
如果数据已经存在,那就放弃保存DataFrame数据。这和SQL里CREATE TABLE IF NOT EXISTS有点类似。
|
4、保存到持久化表
当使用HiveContext时,DataFrame可以使用saveAsTable方法将数据保存为一个持久的表。不像registerTempTable,saveAsTable保存了DataFrame的实际数据内容,并在HiveMetastore中创建一个游标指针。持久表保持不变,即使Spark程序重新启动也没有影响,只要你连接到相同的metastore来读取它。当读取持久表时,只需将表名作为参数,调用SQLContext.table方法就可以得到相应的DataFrame。
saveAsTable缺省情况下创建一个“managedtable”,也就是说,该表数据的位置由metastore控制。如果删除一个表,那么它的数据将同步删除。
3 Parquet文件
Parquet 是一种流行的列式存储格式。Spark SQL提供对Parquet文件的读写支持,而且Parquet文件能够自动保存原始数据的schema。写Parquet文件的时候,所有的字段都会自动转成nullable,以便向后兼容。
1、编程方式加载数据
仍然使用上面例子中的数据:
// 我们继续沿用之前例子中的sqlContext对象
// 为了支持RDD隐式转成DataFrame
import sqlContext.implicits._
val people: RDD[Person] = ... // 和上面例子中相同,一个包含case class对象的RDD
// 该RDD将隐式转成DataFrame,然后保存为parquet文件
people.write.parquet("people.parquet")
// 读取上面保存的Parquet文件(多个文件 - Parquet保存完其实是很多个文件)。Parquet文件是自描述的,文件中保存了schema信息
// 加载Parquet文件,并返回DataFrame结果
val parquetFile = sqlContext.read.parquet("people.parquet")
// Parquet文件(多个)可以注册为临时表,然后在SQL语句中直接查询
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
2、分区发现
对于Hive等系统,最常用的优化方法之一是表分区。对于支持分区的表,数据保存在不同的目录下,分区键被以编码方式保存在单个分区目录路径中。ParquetDataSource现在还支持自动发现和提取分区信息。举例来说,我们可以将以前使用的人口数据存入一个分区表,它的目录结构如下所示,它有2个附加字段,即gender和country,作为分区键:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
在这个例子中,如果需要读取Parquet文件数据,我们只需要把 path/to/table 作为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL能够自动的从路径中提取出分区信息,随后返回的DataFrame的schema如下:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
注意,分区键的数据类型将是自动推导出来的。目前,只支持数值类型和字符串类型数据作为分区键。
有的用户可能不想要自动推导出来的分区键数据类型。这种情况下,你可以通过 spark.sql.sources.partitionColumnTypeInference.enabled (默认是true)来禁用分区键类型推导。禁用之后,分区键总是被当成字符串类型。
从Spark-1.6.0开始,分区发现默认只在指定目录的子目录中进行。以上面的例子来说,如果用户把 path/to/table/gender=male 作为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load,那么gender就不会被作为分区键。如果用户想要指定分区发现的基础目录,可以通过basePath选项指定。例如,如果把 path/to/table/gender=male作为数据目录,并且将basePath设为 path/to/table,那么gender仍然会最为分区键。
3、Schema合并
像ProtoBuffer、Avro和Thrift一样,Parquet也支持schema演变。用户从一个简单的schema开始,逐渐增加所需的新字段。这样的话,用户最终会得到多个schema不同但互相兼容的Parquet文件。目前,Parquet数据源已经支持自动检测这种情况,并合并所有文件的schema。
因为schema合并相对代价比较大,并且在多数情况下不是必要的,所以从Spark-1.5.0之后,默认是被禁用的。你可以这样启用这一功能:
- 读取Parquet文件时,将选项mergeSchema设为true(见下面的示例代码)
- 或者,将全局选项spark.sql.parquet.mergeSchema设为true
// 继续沿用之前的sqlContext对象
// 为了支持RDD隐式转换为DataFrame
import sqlContext.implicits._
// 创建一个简单的DataFrame,存到一个分区目录中
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// 创建另一个DataFrame放到新的分区目录中,
// 并增加一个新字段,丢弃一个老字段
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// 读取分区表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// 最终的schema将由3个字段组成(single,double,triple)
// 并且分区键出现在目录路径中
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
4、Hive metastore Parquet table转换
在读写Hive metastore Parquet 表时,Spark SQL用的是内部的Parquet支持库,而不是Hive SerDe,因为这样性能更好。这一行为是由spark.sql.hive.convertMetastoreParquet 配置项来控制的,而且默认是启用的。
1)Hive/Parquet schema调和
Hive和Parquet在表结构处理上主要有2个不同点:
- Hive大小写敏感,而Parquet不是
- Hive所有字段都是nullable的,而Parquet需要显示设置
由于以上原因,我们必须在Hive metastore Parquet table转Spark SQL Parquet table的时候,对Hive metastore schema做调整,调整规则如下:
- 两种schema中字段名和字段类型必须一致(不考虑nullable)。调和后的字段类型必须在Parquet格式中有相对应的数据类型,所以nullable是也是需要考虑的。
- 调和后Spark SQL Parquet table schema将包含以下字段:
-
- 只出现在Parquet schema中的字段将被丢弃
- 只出现在Hive metastore schema中的字段将被添加进来,并显式地设为nullable。
2)刷新元数据
Spark SQL会缓存Parquet元数据以提高性能。如果Hive metastore Parquet table转换被启用的话,那么转换过来的schema也会被缓存。这时候,如果这些表由Hive或其他外部工具更新了,你必须手动刷新元数据。
// 注意,这里sqlContext是一个HiveContext
sqlContext.refreshTable("my_table")
5、配置
Parquet配置可以通过 SQLContext.setConf 或者 SQL语句中 SET key=value来指定。
属性名
|
默认值
|
含义
|
spark.sql.parquet.binaryAsString
|
false
|
有些老系统,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不区分二进制数据和字符串类型数据。这个标志的意思是,让Spark SQL把二进制数据当字符串处理,以兼容老系统。
|
spark.sql.parquet.int96AsTimestamp
|
true
|
有些老系统,如:特定版本的Impala,Hive,把时间戳存成INT96。这个配置的作用是,让Spark SQL把这些INT96解释为timestamp,以兼容老系统。
|
spark.sql.parquet.cacheMetadata
|
true
|
缓存Parquet schema元数据。可以提升查询静态数据的速度。
|
spark.sql.parquet.compression.codec
|
gzip
|
设置Parquet文件的压缩编码格式。可接受的值有:uncompressed, snappy, gzip(默认), lzo
|
spark.sql.parquet.filterPushdown
|
true
|
启用过滤器下推优化,可以讲过滤条件尽量推导最下层,已取得性能提升
|
spark.sql.hive.convertMetastoreParquet
|
true
|
如果禁用,Spark SQL将使用Hive SerDe,而不是内建的对Parquet tables的支持
|
spark.sql.parquet.output.committer.class
|
org.apache.parquet.hadoop.
ParquetOutputCommitter
|
Parquet使用的数据输出类。这个类必须是 org.apache.hadoop.mapreduce.OutputCommitter的子类。一般来说,它也应该是 org.apache.parquet.hadoop.ParquetOutputCommitter的子类。注意:1. 如果启用spark.speculation, 这个选项将被自动忽略
2. 这个选项必须用hadoop configuration设置,而不是Spark SQLConf
3. 这个选项会覆盖 spark.sql.sources.outputCommitterClass
Spark SQL有一个内建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 这个类的在输出到S3的时候比默认的ParquetOutputCommitter类效率高。
|
spark.sql.parquet.mergeSchema
|
false
|
如果设为true,那么Parquet数据源将会merge 所有数据文件的schema,否则,schema是从summary file获取的(如果summary file没有设置,则随机选一个)
|
4 JSON数据集
Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换。
注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。因此,一个常规的多行json文件经常会加载失败。
// sc是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 数据集是由路径指定的
// 路径既可以是单个文件,也可以还是存储文本文件的目录
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// 推导出来的schema,可由printSchema打印出来
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// 将DataFrame注册为table
people.registerTempTable("people")
// 跑SQL语句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 另一种方法是,用一个包含JSON字符串的RDD来创建DataFrame
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
5 Hive表
SparkSQL支持ApacheHive的读取和写入数据。但是,由于Hive的依赖性太大,所以没有将Hive包含在默认的Spark版本包中。要支持Hive,需要在编译spark时添加-Phive和-Phive-thriftserver标志。因此,当编译打包时,Hive将被包含进去。请注意,hivejar包还必须显示在所有工人节点上,当访问Hive数据时将使用hive(例如:使用hive的序列化和反序列化SerDes)。
Hive配置文件hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件。注意,如果以YARNcluster(yarn-clustermode)模式执行查询,为datanucleuslib_mananged/jar/下面的jar包,以及conf/下的hive-site.xml必须可用于驱动器(driver)和所有执行器(executor)。一个简单的方法是通过spark-submit命令的–jars和–file选项提交它们。
若您使用Hive,您必须构建HiveContext,HiveContext派生于SQLContext,并添加HiveMetastore中的查询表格和HiveQL支持。该用户不具备Hive部署,或者您可以创建HiveContext。若hive-site.xml中未配置HiveContext将在当前目录中自动创建一个metastore_db目录,在HiveConf设置的基础上创建一个warehouse目录(默认的/user/hive/warehourse)。因此要注意,您必须将/user/hive/warehouse的写入权限授予启动spark应用程序的用户。
// sc是一个已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 这里用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
和不同版本的Hive Metastore交互
Spark SQL对Hive最重要的支持之一就是和Hive metastore进行交互,这使得Spark SQL可以访问Hive表的元数据。从Spark-1.4.0开始,Spark SQL有专门单独的二进制build版本,可以用来访问不同版本的Hive metastore,其配置表如下。注意,不管所访问的hive是什么版本,Spark SQL内部都是以Hive 1.2.1编译的,而且内部使用的Hive类也是基于这个版本(serdes,UDFs,UDAFs等)
以下选项可用来配置Hive版本以便访问其元数据:
属性名
|
默认值
|
含义
|
spark.sql.hive.metastore.version
|
1.2.1
|
Hive metastore版本,可选的值为0.12.0 到 1.2.1
|
spark.sql.hive.metastore.jars
|
builtin
|
初始化HiveMetastoreClient的jar包。这个属性可以是以下三者之一:
- builtin
目前内建为使用Hive-1.2.1,编译的时候启用-Phive,则会和spark一起打包。如果没有-Phive,那么spark.sql.hive.metastore.version要么是1.2.1,要就是未定义
- maven
使用maven仓库下载的jar包版本。这个选项建议不要再生产环境中使用
- JVM格式的classpath。这个classpath必须包含所有Hive及其依赖的jar包,且包含正确版本的hadoop。这些jar包必须部署在driver节点上,如果你使用yarn-cluster模式,那么必须确保这些jar包也随你的应用程序一起打包
|
spark.sql.hive.metastore.sharedPrefixes
|
com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc
|
一个逗号分隔的类名前缀列表,这些类使用classloader加载,且可以在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就需要这种共享。其他需要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender
|
spark.sql.hive.metastore.barrierPrefixes
|
(empty)
|
一个逗号分隔的类名前缀列表,这些类在每个Spark SQL所访问的Hive版本中都会被显式的reload。例如,某些在共享前缀列表(spark.sql.hive.metastore.sharedPrefixes)中声明为共享的Hive UD函数
|