网站首页 > 博客文章 正文
之前我们聊过 spark-shell 的代码实现,它是我们了解、学习或调试 Spark作业 的重要工具。这个系列我们了解一下SparkSession类的代码实现。本篇作为起始,先介绍下 spark 对象常见的使用方式,包括 Dataset API 和 Spark SQL;后续会详细介绍 SparkSession 对象创建以及功能相关的代码逻辑。
前置条件
本地构建时,增加对 hive 和 JDBC的支持
./build/mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean package
启动 bin/spark-shell,并做一些简单的测试
./bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/10 11:58:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.0.109:4040
Spark context available as 'sc' (master = local[*], app id = local-1649563140564).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_301)
Type in expressions to have them evaluated.
Type :help for more information.
在启动日志里,我们可以看到 Spark context Web UI available at http://192.168.0.109:4040 的字样,Command+双击 在浏览器里打开该链接。切换到 Environment Tab 可以看到application配置,如下图所示:
SparkSession类的成员变量 sessionState 存储了 spark 对象的各种状态,比如 SQL 配置参数、创建的临时表、注册的UDF、catalog (HiveSessionCatalog)、sqlParser、plan 的 analyzerBuilder、optimizerBuilder和planner 等。对于配置信息,我们可以在 shell 里执行 Scala 代码进行查看:
scala> spark.sessionState.conf.getAllConfs.foreach(println(_))
(spark.sql.warehouse.dir,file:/Users/user/go/src/github.com/apache/spark/spark-warehouse)
(spark.driver.host,192.168.0.109)
(spark.driver.port,57611)
(spark.repl.class.uri,spark://192.168.0.109:57611/classes)
(spark.jars,)
(spark.repl.class.outputDir,/private/var/folders/f4/cq91c9wj297fqb4c42tllrd80000gp/T/spark-462c360a-1b35-4b0f-a466-577d4c885f85/repl-11347ed9-e8d9-4db6-8d10-978de2bbe811)
(spark.app.name,Spark shell)
(spark.submit.pyFiles,)
(spark.ui.showConsoleProgress,true)
(spark.app.startTime,1649558999861)
(spark.executor.id,driver)
(spark.submit.deployMode,client)
(spark.master,local[*])
(spark.home,/Users/user/go/src/github.com/apache/spark)
(spark.sql.catalogImplementation,hive)
(spark.app.id,local-1649559001343)
一个案例:读取本地文件写入Hive。
为了便于后面读代码,我们先看一个例子:读取本地的 examples/src/main/resources/people.txt 的文件,并写入Hive表 poeple_t 里。
people.txt 里只有三行,两列分别是 name 和 age:
Michael, 29
Andy, 30
Justin, 19
下面我们将文件读取到一个结构化的 DataFrame 里:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// 创建schema
val schema =
StructType(
StructField("name", StringType, false) ::
StructField("age", IntegerType, true) :: Nil)
//
val people =
sc.textFile("examples/src/main/resources/people.txt").map(
_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val df = spark.createDataFrame(people, schema)
df.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)
注意:这里 sc 和 spark.sparkContext 是同一个对象,它是Spark的功能入口,代表Spark集群的一个连接,可以在集群上创建RDD、计数器、广播变量等。
除了 textFile 之外,SparkContext还有很多读取文件的成员函数:
创建一个临时view,以便使用SQL进行查询
// 创建一个临时表
df.createOrReplaceTempView("people_view")
// 查询临时表
spark.sql("select name from people_view").collect.foreach(println)
[Michael]
[Andy]
[Justin]
创建一个带时间分区的hive表,使用 PARQUET 格式存储
// 创建hive表
spark.sql("""
create table people_t (name STRING, age INT)
using PARQUET
PARTITIONED BY (`date` STRING COMMENT 'format: yyyyMMdd')
""")
注意:Scala语法里,可以通过一对三个双引号创建一个多行字符串。
通过 Spark SQL 将 DataFrame 中的数据写入Hive表特定分区
// 写入hive表 date=20220409
spark.sql("""
insert overwrite people_t partition(date='20220409')
select name, age from people_view
""")
写入以后,我们可以通过 SELECT 语句查询该表中的数据:
spark.sql("""select name, age from people_t where date = '20220409'""").show
+-------+---+
| name|age|
+-------+---+
| Justin| 19|
|Michael| 29|
| Andy| 30|
+-------+---+
细心的朋友可能会发现,前面我们用 collect 后面用 show,两个方法都会触发Spark提交Job,并将作业的执行结果回收到 driver 端。不同的是,collect 会回收全部结果存放到一个对象里,而show只取前20行,格式化以后打印到console。
初步了解 SparkSession 的成员函数和成员变量
我们打开一个 spark-shell 或 Jupyter Notebook后,第一条命令通常是 spark,用来查看 SparkSession 是否可用。然后可以通过 spark.xxx 访问对应函数或变量。常用的几个有:
- 读取文件
上面的例子中,我们通过 sc.textFile 读取文件,但 spark.read.xxx 提供了更丰富的功能,比如读取 text/parquet/orc/json/csv/jdbc 等,然后返回DataFrame。spark.read 其实是一个 DataFrameReader 对象,需要了解其使用方式和实现的话,可以去看它的代码。
/**
* Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
* `DataFrame`.
* {{{
* sparkSession.read.parquet("/path/to/file.parquet")
* sparkSession.read.schema(schema).json("/path/to/file.json")
* }}}
*
* @since 2.0.0
*/
def read: DataFrameReader = new DataFrameReader(self)
/**
* Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
* {{{
* sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
* sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
* }}}
*
* @since 2.0.0
*/
def readStream: DataStreamReader = new DataStreamReader(self)
- 执行 SQL query,通常带上 collect/show/count 等触发执行的动作
def sql(sqlText: String): DataFrame
该函数返回一个 DataFrame。需要注意的是:如果是纯query,比如 select 语句,需要配合 collect/show/count 等动作才会触发执行;如果是 DDL/DML 命令,则会立即触发执行。
- 创建 DataFrame
先说一下 DataFrame 和 Dataset,Dataset 是一个范型类,而 DataFrame 是Dataset 的一种去范型化类,Row 表示 Spark 运算符(map/flatMap/etc) 的一行输出。
# DataFrame 是 Dataset
type DataFrame = Dataset[Row]
Dataset 定义了很多关系型运算符,比如 map、flatMap、select、filter、where、groupBy、reduceBy、agg、union 等,我们可以使用这些声明时的语法实现自己的 Spark Application。关于 DataFrame 我们就介绍这么多,下面继续看创建 DataFrame 的方法。
由于Scala支持函数重载,createDataFrame 函数有八种实现,分别支持不同的参数类型:
第一类:接收 Product 子类的 RDD 或 数组作为参数,比如 case class,tuple。对于 Scala case class 或 tuple
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
对于上面的例子,我们可以简化为:
case class People(name: String, age: Int)
val people =
sc.textFile("examples/src/main/resources/people.txt").map(
_.split(",")).map(p => People(p(0), p(1).trim.toInt))
val df = spark.createDataFrame(people)
df.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
在这个例子中,val people 是一个RDD[People]。由于 case class People 实现了 trait Product 和 trait TypeTag,Encoders通过类型参数A可以找到对应的解码器,从而简化使用。
第二类:指定数据的 schema 创建 DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
这个上面已经给出了一些例子,比较容易理解。
第三类:制定数据的 bean class 创建 DataFrame
def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame
def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame
Java Bean 是一种有很多限制的 Java class,比如必须有无参数的构造函数、禁止有private 成员变量、所有成员变量均有getter/setter方法等。写 Java Spark App 时,可能用到这三个方法,有兴趣的同学可以 Google 搜索 "POJO vs Java Beans - GeeksforGeeks" 了解更多信息。
关于 pyspark
我们在命令行启动 pyspark 时,它的使用方式与 spark-shell 基本一致,这是因为在 pyspark 里使用 Python 重新定义了 SparkSession、SparkContext、DataFrame类,字段名与 Scala 里几乎一样,行为通过 java-gateway 转发到 Scala SparkSession 执行。这里的技术细节,以后有时间再讲。
单纯从使用角度来说,pyspark 的 SparkSession 没有那么多的成员函数可以使用,在写代码时可参考 spark/python/pyspark/sql/session.py 下的代码,读取文件可参考 spark/python/pyspark/sql/readwriter.py 下的例子。
猜你喜欢
- 2024-10-07 Flink1.10集成Hive快速入门(flink集群)
- 2024-10-07 存储过程转hivesql有哪些注意事项
- 2024-10-07 0277-Impala并发查询缓慢问题解决方案
- 2024-10-07 画像笔记25-用户画像应用(10)-用户行为分析
- 2024-10-07 CASE … WHEN和cast类型转换(日志案例分析应用)
- 2024-10-07 hbase和hive集成映射(hive与hbase集成)
- 2024-10-07 Hive 导数据的两种方案(hive导入)
- 2024-10-07 Hive SQL常用命令总结,大数据开发人员按需收藏
- 2024-10-07 手撕数据仓库之「HQL规范篇」(数据仓库 sql)
- 2024-10-07 hive 之前操作脚本汇总(hive shell脚本)
你 发表评论:
欢迎- 07-08Google Cloud Platform 加入支持 Docker 的容器引擎
- 07-08日本KDDI与Google Cloud 签署合作备忘录,共探AI未来
- 07-08美国Infoblox与Google Cloud合作推出云原生网络和安全解决方案
- 07-08GoogleCloud为Spanner数据库引入HDD层,将冷存储成本降低80%
- 07-08谷歌推出Cloud Dataproc,缩短集群启动时间
- 07-08Infovista与Google Cloud携手推进射频网络规划革新
- 07-08比利时Odoo与Google Cloud建立增强合作,扩大全球影响力
- 07-08BT 和 Google Cloud 通过 Global Fabric 加速 AI 网络
- 最近发表
-
- Google Cloud Platform 加入支持 Docker 的容器引擎
- 日本KDDI与Google Cloud 签署合作备忘录,共探AI未来
- 美国Infoblox与Google Cloud合作推出云原生网络和安全解决方案
- GoogleCloud为Spanner数据库引入HDD层,将冷存储成本降低80%
- 谷歌推出Cloud Dataproc,缩短集群启动时间
- Infovista与Google Cloud携手推进射频网络规划革新
- 比利时Odoo与Google Cloud建立增强合作,扩大全球影响力
- BT 和 Google Cloud 通过 Global Fabric 加速 AI 网络
- NCSA和Google Cloud合作开发AI驱动的网络防御系统,加强泰国网络空间的安全性
- SAP将在沙特阿拉伯 Google Cloud 上推出BTP服务
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- messagesource (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)