专业的编程技术博客社区

网站首页 > 博客文章 正文

聊聊flink TableEnvironment的scan操作

baijin 2024-10-03 17:37:03 博客文章 4 ℃ 0 评论

本文主要研究一下flink TableEnvironment的scan操作

实例

//Scanning a directly registered table
val tab: Table = tableEnv.scan("tableName")
?
//Scanning a table from a registered catalog
val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
  • scan操作用于从schema读取指定的table,也可以传入catalogName及dbName从指定的catalog及db读取

TableEnvironment.scan

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala

abstract class TableEnvironment(val config: TableConfig) {
?
 private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
 private val rootSchema: SchemaPlus = internalSchema.plus()
?
 //......
?
 @throws[TableException]
 @varargs
 def scan(tablePath: String*): Table = {
 scanInternal(tablePath.toArray) match {
 case Some(table) => table
 case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.")
 }
 }
?
 private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
 require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
 val schemaPaths = tablePath.slice(0, tablePath.length - 1)
 val schema = getSchema(schemaPaths)
 if (schema != null) {
 val tableName = tablePath(tablePath.length - 1)
 val table = schema.getTable(tableName)
 if (table != null) {
 return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory))))
 }
 }
 None
 }
?
 private def getSchema(schemaPath: Array[String]): SchemaPlus = {
 var schema = rootSchema
 for (schemaName <- schemaPath) {
 schema = schema.getSubSchema(schemaName)
 if (schema == null) {
 return schema
 }
 }
 schema
 }
?
 //......
}
  • scan方法内部调用的是scanInternal,scanInternal首先读取catalog及db信息,然后调用getSchema方法来获取schema
  • getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
  • 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table

小结

  • TableEnvironment的scan操作就是从Schema中查找Table,可以使用tableName,或者额外指定catalog及db来查找
  • getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
  • 获取到schema之后,就可以从tablePath数组获取tableName(数组最后一个元素),调用SchemaPlus的getTable方法查找Table

doc

  • Table API

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表