专业的编程技术博客社区

网站首页 > 博客文章 正文

Spark源码阅读:SparkSession类之spark对象的使用

baijin 2024-10-07 06:07:52 博客文章 5 ℃ 0 评论

之前我们聊过 spark-shell 的代码实现,它是我们了解、学习或调试 Spark作业 的重要工具。这个系列我们了解一下SparkSession类的代码实现。本篇作为起始,先介绍下 spark 对象常见的使用方式,包括 Dataset API 和 Spark SQL;后续会详细介绍 SparkSession 对象创建以及功能相关的代码逻辑。

前置条件

本地构建时,增加对 hive 和 JDBC的支持

./build/mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean package

启动 bin/spark-shell,并做一些简单的测试

./bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/10 11:58:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.0.109:4040
Spark context available as 'sc' (master = local[*], app id = local-1649563140564).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/
         
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_301)
Type in expressions to have them evaluated.
Type :help for more information.

在启动日志里,我们可以看到 Spark context Web UI available at http://192.168.0.109:4040 的字样,Command+双击 在浏览器里打开该链接。切换到 Environment Tab 可以看到application配置,如下图所示:

SparkSession类的成员变量 sessionState 存储了 spark 对象的各种状态,比如 SQL 配置参数、创建的临时表、注册的UDF、catalog (HiveSessionCatalog)、sqlParser、plan 的 analyzerBuilder、optimizerBuilder和planner 等。对于配置信息,我们可以在 shell 里执行 Scala 代码进行查看:

scala> spark.sessionState.conf.getAllConfs.foreach(println(_))
(spark.sql.warehouse.dir,file:/Users/user/go/src/github.com/apache/spark/spark-warehouse)
(spark.driver.host,192.168.0.109)
(spark.driver.port,57611)
(spark.repl.class.uri,spark://192.168.0.109:57611/classes)
(spark.jars,)
(spark.repl.class.outputDir,/private/var/folders/f4/cq91c9wj297fqb4c42tllrd80000gp/T/spark-462c360a-1b35-4b0f-a466-577d4c885f85/repl-11347ed9-e8d9-4db6-8d10-978de2bbe811)
(spark.app.name,Spark shell)
(spark.submit.pyFiles,)
(spark.ui.showConsoleProgress,true)
(spark.app.startTime,1649558999861)
(spark.executor.id,driver)
(spark.submit.deployMode,client)
(spark.master,local[*])
(spark.home,/Users/user/go/src/github.com/apache/spark)
(spark.sql.catalogImplementation,hive)
(spark.app.id,local-1649559001343)


一个案例:读取本地文件写入Hive。

为了便于后面读代码,我们先看一个例子:读取本地的 examples/src/main/resources/people.txt 的文件,并写入Hive表 poeple_t 里。

people.txt 里只有三行,两列分别是 name 和 age:

Michael, 29
Andy, 30
Justin, 19

下面我们将文件读取到一个结构化的 DataFrame 里:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

// 创建schema
val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

// 
val people =
  sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val df = spark.createDataFrame(people, schema)
df.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

注意:这里 sc 和 spark.sparkContext 是同一个对象,它是Spark的功能入口,代表Spark集群的一个连接,可以在集群上创建RDD、计数器、广播变量等。

除了 textFile 之外,SparkContext还有很多读取文件的成员函数:

创建一个临时view,以便使用SQL进行查询

// 创建一个临时表
df.createOrReplaceTempView("people_view")

// 查询临时表
spark.sql("select name from people_view").collect.foreach(println)
[Michael]
[Andy]
[Justin]

创建一个带时间分区的hive表,使用 PARQUET 格式存储

// 创建hive表
spark.sql("""
create table people_t (name STRING, age INT) 
using PARQUET 
PARTITIONED BY (`date` STRING COMMENT 'format: yyyyMMdd')
""")

注意:Scala语法里,可以通过一对三个双引号创建一个多行字符串。

通过 Spark SQL 将 DataFrame 中的数据写入Hive表特定分区

// 写入hive表 date=20220409
spark.sql("""
insert overwrite people_t partition(date='20220409')
select name, age from people_view
""")

写入以后,我们可以通过 SELECT 语句查询该表中的数据:

spark.sql("""select name, age from people_t where date = '20220409'""").show
+-------+---+
|   name|age|
+-------+---+
| Justin| 19|
|Michael| 29|
|   Andy| 30|
+-------+---+


细心的朋友可能会发现,前面我们用 collect 后面用 show,两个方法都会触发Spark提交Job,并将作业的执行结果回收到 driver 端。不同的是,collect 会回收全部结果存放到一个对象里,而show只取前20行,格式化以后打印到console。

初步了解 SparkSession 的成员函数和成员变量

我们打开一个 spark-shell 或 Jupyter Notebook后,第一条命令通常是 spark,用来查看 SparkSession 是否可用。然后可以通过 spark.xxx 访问对应函数或变量。常用的几个有:

  • 读取文件

上面的例子中,我们通过 sc.textFile 读取文件,但 spark.read.xxx 提供了更丰富的功能,比如读取 text/parquet/orc/json/csv/jdbc 等,然后返回DataFrame。spark.read 其实是一个 DataFrameReader 对象,需要了解其使用方式和实现的话,可以去看它的代码。


  /**
   * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
   * `DataFrame`.
   * {{{
   *   sparkSession.read.parquet("/path/to/file.parquet")
   *   sparkSession.read.schema(schema).json("/path/to/file.json")
   * }}}
   *
   * @since 2.0.0
   */
  def read: DataFrameReader = new DataFrameReader(self)

  /**
   * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
   * {{{
   *   sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
   *   sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
   * }}}
   *
   * @since 2.0.0
   */
  def readStream: DataStreamReader = new DataStreamReader(self)
  • 执行 SQL query,通常带上 collect/show/count 等触发执行的动作
def sql(sqlText: String): DataFrame

该函数返回一个 DataFrame。需要注意的是:如果是纯query,比如 select 语句,需要配合 collect/show/count 等动作才会触发执行;如果是 DDL/DML 命令,则会立即触发执行。

  • 创建 DataFrame

先说一下 DataFrame 和 Dataset,Dataset 是一个范型类,而 DataFrame 是Dataset 的一种去范型化类,Row 表示 Spark 运算符(map/flatMap/etc) 的一行输出。

# DataFrame 是 Dataset
type DataFrame = Dataset[Row]

Dataset 定义了很多关系型运算符,比如 map、flatMap、select、filter、where、groupBy、reduceBy、agg、union 等,我们可以使用这些声明时的语法实现自己的 Spark Application。关于 DataFrame 我们就介绍这么多,下面继续看创建 DataFrame 的方法。

由于Scala支持函数重载,createDataFrame 函数有八种实现,分别支持不同的参数类型:

第一类:接收 Product 子类的 RDD 或 数组作为参数,比如 case class,tuple。对于 Scala case class 或 tuple

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

对于上面的例子,我们可以简化为:

case class People(name: String, age: Int)
val people =
  sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => People(p(0), p(1).trim.toInt))
val df = spark.createDataFrame(people)
df.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

在这个例子中,val people 是一个RDD[People]。由于 case class People 实现了 trait Product 和 trait TypeTag,Encoders通过类型参数A可以找到对应的解码器,从而简化使用。

第二类:指定数据的 schema 创建 DataFrame

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

这个上面已经给出了一些例子,比较容易理解。

第三类:制定数据的 bean class 创建 DataFrame

def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame
def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame

Java Bean 是一种有很多限制的 Java class,比如必须有无参数的构造函数、禁止有private 成员变量、所有成员变量均有getter/setter方法等。写 Java Spark App 时,可能用到这三个方法,有兴趣的同学可以 Google 搜索 "POJO vs Java Beans - GeeksforGeeks" 了解更多信息。

关于 pyspark

我们在命令行启动 pyspark 时,它的使用方式与 spark-shell 基本一致,这是因为在 pyspark 里使用 Python 重新定义了 SparkSession、SparkContext、DataFrame类,字段名与 Scala 里几乎一样,行为通过 java-gateway 转发到 Scala SparkSession 执行。这里的技术细节,以后有时间再讲。

单纯从使用角度来说,pyspark 的 SparkSession 没有那么多的成员函数可以使用,在写代码时可参考 spark/python/pyspark/sql/session.py 下的代码,读取文件可参考 spark/python/pyspark/sql/readwriter.py 下的例子。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表