SQLContext
val sc: SparkContext // 假设已经有一个 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 用于包含RDD到DataFrame隐式转换操作
import sqlContext.implicits._
RDD
val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// 将DataFrame内容打印到stdout
df.show()
DataFrame提供了结构化数据的领域专用语言支持,包括Scala, Java, Python and R。
这里我们给出一个结构化数据处理的基本示例:
val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 创建一个 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// 展示 DataFrame 的内容
df.show()
// age name
// null Michael
// 30 Andy
// 19 Justin
// 打印数据树形结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin
// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// 筛选出年龄大于21的人
df.filter(df("age") > 21).show()
// age name
// 30 Andy
// 计算各个年龄的人数
df.groupBy("age").count().show()
// age count
// null 1
// 19 1
// 30 1
val sqlContext = ... // 已有一个 SQLContext 对象
val df = sqlContext.sql("SELECT * FROM table")
// 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)
// 以下这行不仅定义了case class,同时也自动为其创建了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()
// DataFrame 只需提供一个和数据schema对应的class即可转换为 Dataset。Spark会根据字段名进行映射。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]
// sc 是已有的 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._
// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
case class Person(name: String, age: Int)
// 创建一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// sqlContext.sql方法可以直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
// 查询结果中每行的字段可以按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)
// sc 是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 创建一个RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// 数据的schema被编码与一个字符串中
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL 各个数据类型
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// 基于前面的字符串生成schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// 将schema应用到包含Row对象的RDD上,得到一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")
// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")
// SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
// 并且其字段可以以索引访问,也可以用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)