网站首页 > 博客文章 正文
Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在最新版Flink1.10版本,标志着对 Blink的整合宣告完成,达到了对 Hive 的生产级别集成,Hive作为数据仓库系统的绝对核心,承担着绝大多数的离线数据ETL计算和数据管理,期待Flink未来对Hive的完美支持。
而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。
添加依赖
要与Hive集成,需要在Flink的lib目录下添加额外的依赖jar包,以使集成在Table API程序或SQL Client中的SQL中起作用。或者,可以将这些依赖项放在文件夹中,并分别使用Table API程序或SQL Client 的-C
或-l
选项将它们添加到classpath中。本文使用第一种方式,即将jar包直接复制到$FLINK_HOME/lib目录下。本文使用的Hive版本为2.3.4(对于不同版本的Hive,可以参照官网选择不同的jar包依赖),总共需要3个jar包,如下:
flink-connector-hive_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
hive-exec-2.3.4.jar
其中hive-exec-2.3.4.jar在hive的lib文件夹下,另外两个需要自行下载,下载地址:flink-connector-hive_2.11-1.10.0.jar
[https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.10.0/]
flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
[https://maven.aliyun.com/mvn/search]
切莫拔剑四顾心茫然,话不多说,直接上代码。
构建程序
添加Maven依赖
<!--?Flink?Dependency?-->
<dependency>
??<groupId>org.apache.flink</groupId>
??<artifactId>flink-connector-hive_2.11</artifactId>
??<version>1.10.0</version>
??<scope>provided</scope>
</dependency>
<dependency>
??<groupId>org.apache.flink</groupId>
??<artifactId>flink-table-api-java-bridge_2.11</artifactId>
??<version>1.10.0</version>
??<scope>provided</scope>
</dependency>
<!--?Hive?Dependency?-->
<dependency>
????<groupId>org.apache.hive</groupId>
????<artifactId>hive-exec</artifactId>
????<version>${hive.version}</version>
????<scope>provided</scope>
</dependency>??
实例代码
package?com.flink.sql.hiveintegration;
import?org.apache.flink.table.api.EnvironmentSettings;
import?org.apache.flink.table.api.TableEnvironment;
import?org.apache.flink.table.catalog.hive.HiveCatalog;
/**
?*??@Created?with?IntelliJ?IDEA.
?*??@author?:?jmx
?*??@Date:?2020/3/31
?*??@Time:?13:22
?*??
?*/
public?class?FlinkHiveIntegration?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????EnvironmentSettings?settings?=?EnvironmentSettings
????????????????.newInstance()
????????????????.useBlinkPlanner()?//?使用BlinkPlanner
????????????????.inBatchMode()?//?Batch模式,默认为StreamingMode
????????????????.build();
????????//使用StreamingMode
???????/*?EnvironmentSettings?settings?=?EnvironmentSettings
????????????????.newInstance()
????????????????.useBlinkPlanner()?//?使用BlinkPlanner
????????????????.inStreamingMode()?//?StreamingMode
????????????????.build();*/
????????TableEnvironment?tableEnv?=?TableEnvironment.create(settings);
????????String?name?=?"myhive";??????//?Catalog名称,定义一个唯一的名称表示
????????String?defaultDatabase?=?"qfbap_ods";??//?默认数据库名称
????????String?hiveConfDir?=?"/opt/modules/apache-hive-2.3.4-bin/conf";??//?hive-site.xml路径
????????String?version?=?"2.3.4";???????//?Hive版本号
????????HiveCatalog?hive?=?new?HiveCatalog(name,?defaultDatabase,?hiveConfDir,?version);
????????tableEnv.registerCatalog("myhive",?hive);
????????tableEnv.useCatalog("myhive");
????????//?创建数据库,目前不支持创建hive表
????????String?createDbSql?=?"CREATE?DATABASE?IF?NOT?EXISTS?myhive.test123";
????????tableEnv.sqlUpdate(createDbSql);??
????}
}
Flink SQL Client集成Hive
Flink的表和SQL API可以处理用SQL语言编写的查询,但是这些查询需要嵌入到用Java或Scala编写的程序中。此外,这些程序在提交到集群之前需要与构建工具打包。这或多或少地限制了Java/Scala程序员对Flink的使用。
SQL客户端旨在提供一种简单的方式,无需一行Java或Scala代码,即可将表程序编写、调试和提交到Flink集群。Flink SQL客户端CLI允许通过命令行的形式运行分布式程序。使用Flink SQL cli访问Hive,需要配置sql-client-defaults.yaml文件。
sql-client-defaults.yaml配置
目前 HiveTableSink 不支持流式写入(未实现 AppendStreamTableSink)。需要将执行模式改成 batch
模式,否则会报如下错误:
org.apache.flink.table.api.TableException:?Stream?Tables?can?only?be?emitted?by?AppendStreamTableSink,?RetractStreamTableSink,?or?UpsertStreamTableSink.
需要修改的配置内容如下:
#...省略的配置项...
#==============================================================================
#?Catalogs
#==============================================================================
#?配置catalogs,可以配置多个.
catalogs:?#?empty?list
??-?name:?myhive
????type:?hive
????hive-conf-dir:?/opt/modules/apache-hive-2.3.4-bin/conf
????hive-version:?2.3.4
????default-database:?qfbap_ods
#...省略的配置项...
#==============================================================================
#?Execution?properties
#==============================================================================
#?Properties?that?change?the?fundamental?execution?behavior?of?a?table?program.
execution:
??#?select?the?implementation?responsible?for?planning?table?programs
??#?possible?values?are?'blink'?(used?by?default)?or?'old'
??planner:?blink
??#?'batch'?or?'streaming'?execution
??type:?batch
启动Flink SQL Cli
bin/sql-client.sh??embedded
在启动之前,确保Hive的metastore已经开启了,否则会报Failed to create Hive Metastore client异常。启动成功,如下图:
启动之后,就可以在此Cli下执行SQL命令访问Hive的表了,基本的操作如下:
--?命令行帮助
Flink?SQL>?help
--?查看当前会话的catalog,其中myhive为自己配置的,default_catalog为默认的
Flink?SQL>?show?catalogs;
default_catalog
myhive
--?使用catalog
Flink?SQL>?use?catalog?myhive;
--?查看当前catalog的数据库
Flink?SQL>?show?databases;
--?创建数据库
Flink?SQL>?create?database?testdb;
--?删除数据库
Flink?SQL>?drop?database?testdb;
--?创建表
Flink?SQL>?create?table?tbl(id?int,name?string);
--?删除表
Flink?SQL>?drop?table?tbl;
--?查询表
Flink?SQL>?select?*?from??code_city;
--?插入数据
Flink?SQL>?insert?overwrite?code_city?select?id,city,province,event_time?from?code_city_delta?;
Flink?SQL>?INSERT?into?code_city?values(1,'南京','江苏','');
小结
本文以最新版本的Flink为例,对Flink集成Hive进行了实操。首先通过代码的方式与Hive进行集成,然后介绍了如何使用Flink SQL 客户端访问Hive,并对其中会遇到的坑进行了描述,最后给出了Flink SQL Cli的详细使用。相信在未来的版本中Flink SQL会越来越完善,期待Flink未来对Hive的完美支持。
猜你喜欢
- 2024-10-07 Spark源码阅读:SparkSession类之spark对象的使用
- 2024-10-07 存储过程转hivesql有哪些注意事项
- 2024-10-07 0277-Impala并发查询缓慢问题解决方案
- 2024-10-07 画像笔记25-用户画像应用(10)-用户行为分析
- 2024-10-07 CASE … WHEN和cast类型转换(日志案例分析应用)
- 2024-10-07 hbase和hive集成映射(hive与hbase集成)
- 2024-10-07 Hive 导数据的两种方案(hive导入)
- 2024-10-07 Hive SQL常用命令总结,大数据开发人员按需收藏
- 2024-10-07 手撕数据仓库之「HQL规范篇」(数据仓库 sql)
- 2024-10-07 hive 之前操作脚本汇总(hive shell脚本)
你 发表评论:
欢迎- 07-08Google Cloud Platform 加入支持 Docker 的容器引擎
- 07-08日本KDDI与Google Cloud 签署合作备忘录,共探AI未来
- 07-08美国Infoblox与Google Cloud合作推出云原生网络和安全解决方案
- 07-08GoogleCloud为Spanner数据库引入HDD层,将冷存储成本降低80%
- 07-08谷歌推出Cloud Dataproc,缩短集群启动时间
- 07-08Infovista与Google Cloud携手推进射频网络规划革新
- 07-08比利时Odoo与Google Cloud建立增强合作,扩大全球影响力
- 07-08BT 和 Google Cloud 通过 Global Fabric 加速 AI 网络
- 最近发表
-
- Google Cloud Platform 加入支持 Docker 的容器引擎
- 日本KDDI与Google Cloud 签署合作备忘录,共探AI未来
- 美国Infoblox与Google Cloud合作推出云原生网络和安全解决方案
- GoogleCloud为Spanner数据库引入HDD层,将冷存储成本降低80%
- 谷歌推出Cloud Dataproc,缩短集群启动时间
- Infovista与Google Cloud携手推进射频网络规划革新
- 比利时Odoo与Google Cloud建立增强合作,扩大全球影响力
- BT 和 Google Cloud 通过 Global Fabric 加速 AI 网络
- NCSA和Google Cloud合作开发AI驱动的网络防御系统,加强泰国网络空间的安全性
- SAP将在沙特阿拉伯 Google Cloud 上推出BTP服务
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- messagesource (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)