网站首页 > 博客文章 正文
序
本文主要研究一下flink TableEnvironment的scan操作
实例
//Scanning a directly registered table val tab: Table = tableEnv.scan("tableName") ? //Scanning a table from a registered catalog val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
- scan操作用于从schema读取指定的table,也可以传入catalogName及dbName从指定的catalog及db读取
TableEnvironment.scan
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala
abstract class TableEnvironment(val config: TableConfig) { ? private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false) private val rootSchema: SchemaPlus = internalSchema.plus() ? //...... ? @throws[TableException] @varargs def scan(tablePath: String*): Table = { scanInternal(tablePath.toArray) match { case Some(table) => table case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.") } } ? private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = { require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.") val schemaPaths = tablePath.slice(0, tablePath.length - 1) val schema = getSchema(schemaPaths) if (schema != null) { val tableName = tablePath(tablePath.length - 1) val table = schema.getTable(tableName) if (table != null) { return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))) } } None } ? private def getSchema(schemaPath: Array[String]): SchemaPlus = { var schema = rootSchema for (schemaName <- schemaPath) { schema = schema.getSubSchema(schemaName) if (schema == null) { return schema } } schema } ? //...... }
- scan方法内部调用的是scanInternal,scanInternal首先读取catalog及db信息,然后调用getSchema方法来获取schema
- getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
- 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table
小结
- TableEnvironment的scan操作就是从Schema中查找Table,可以使用tableName,或者额外指定catalog及db来查找
- getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
- 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table
doc
- Table API
- 上一篇: 5 年迭代 5 次,抖音推荐系统演进历程
- 下一篇: Flink架构、原理(flinkr)
猜你喜欢
- 2024-10-03 FlinkSQL全面指南(flinksql udf)
- 2024-10-03 Apache Flink学习笔记(六)Table API
- 2024-10-03 Flink Table API & SQL 聚合性能调优
- 2024-10-03 美团点评基于 Flink 的实时数仓平台实践
- 2024-10-03 Flink SQL 动态表 & 连续查询详解(建议收藏)
- 2024-10-03 Flink 1.11 与 Hive 批流一体数仓实践
- 2024-10-03 Flink SQL中的动态表和临时表(flink sql动态查询)
- 2024-10-03 三分钟速懂大数据Flink | 窗口操作
- 2024-10-03 大数据_Flink_Java版_Table API 和 Flink SQL(1)_基本介绍和简单示例
- 2024-10-03 新一代大数据计算引擎 Flink从入门到实战
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- messagesource (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)