Flink1.7 Table API&SQL源码系列之TableEnvironment
一、TableEnvironment的示例使用
为了更好的直观的理解TableEnvironment,下面我给出了Java和Scala代码使用示例。
Java代码示例
// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建一个TableEnvironment
// 对于批处理程序来说使用 BatchTableEnvironment 替换 StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 注册一个 Table
tableEnv.registerTable("table1", ...) // 或者
tableEnv.registerTableSource("table2", ...); // 或者
tableEnv.registerExternalCatalog("extCat", ...); //注册外部的Catalog目录
//将datastream注册成一个 Table
tableEnv.registerDataStream("stream1",...)
//注册一个 udf
tableEnv.registerFunction("addFun1",...)
// 从Table API的查询中创建一个Table
Table tabResult = tableEnv.scan("table1").select(...);
// 从SQL查询中创建一个Table
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// 用来更新执行Insert语句 目前值支持Insert
tableEnv.sqlUpdate("SELECT ... FROM table2 ... ");
// 将Table API中的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.insertInto(...);
// 执行
env.execute();
Scala代码示例
// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册一个 Table
tableEnv.registerTable("table1", ...) // 或者
tableEnv.registerTableSource("table2", ...) // 或者
tableEnv.registerExternalCatalog("extCat", ...)//注册外部的Catalog目录
//将datastream注册成一个 Table
tableEnv.registerDataStream("stream1",...)
//注册一个 udf
tableEnv.registerFunction("addFun1",...)
// 从Table API的查询中创建一个Table
val tabResult = tableEnv.scan("table1").select(...)
// 从SQL查询中创建一个Table
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// 用来更新执行Insert语句 目前值支持Insert
tableEnv.sqlUpdate("SELECT ... FROM table2 ... ")
// 将Table API中的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.insertInto(...)
// 执行
env.execute()
二、理解TableEnvironment
TableEnvironment是Table API和SQL集成的核心概念,从上面示例看出它主要负责:
- 在内部目录中注册一个Table
- 注册一个外部目录
- 执行SQL查询或执行Insert语句更新
- 注册一个用户自定义函数(标量、表及聚合)
- 将DataStream或者DataSet转换成Table
- 将Table转换为DataSetDataStream或者DataSet
- 在已注册的表上指定SQL查询以获取Table
持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
注意:一个Table总是会绑定到一个指定的
TableEnvironment
中,相同的查询不同的TableEnvironment
目前是无法通过join、union合并在一起。
目前Flink1.7版本TableEnvironment
从Java和Scala编程语言,流和批计算模型来来分为以下3个抽象类和4个实体类:
维度 | 类名 |
---|---|
抽象类 | TableEnvironment(基类),StreamTableEnvironment和BatchTableEnvironment |
Java | StreamTableEnvironment和BatchTableEnvironment(实体类) |
Scala | StreamTableEnvironment和BatchTableEnvironment(实体类) |
TableEnvironment
的继承关系如下图所示(图片来源) :
三、TableEnvironment的抽象与实现
1)TableEnvironment抽象类
TableEnvironment基类在源码org.apache.flink.table.api
包中,StreamTableEnvironment和BatchTableEnvironment这两个抽象类继承于基类TableEnvironment。接下来先介绍TableEnvironment基类。
1. TableEnvironment抽象基类
/**
* The abstract base class for batch and stream TableEnvironments.
*
* @param config The configuration of the TableEnvironment
*/
abstract class TableEnvironment(val config: TableConfig) {
// 用于保存所有已注册和已转换表的目录
// 这里禁用缓存以防止副作用
private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
private val rootSchema: SchemaPlus = internalSchema.plus()
// Table API/SQL 函数目录
private[flink] val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
// 用于创建Calcite计划程序的配置
private lazy val frameworkConfig: FrameworkConfig = Frameworks
.newConfigBuilder
.defaultSchema(rootSchema)
.parserConfig(getSqlParserConfig)
.costFactory(new DataSetCostFactory)
.typeSystem(new FlinkTypeSystem)
.operatorTable(getSqlOperatorTable)
.sqlToRelConverterConfig(getSqlToRelConverterConfig)
//设定executor用来执行常量表达式
.executor(new ExpressionReducer(config))
.build
// 构建Calcite的RelNodes(相关结点),可以用来获取和表示Calcite的一个关系表达式树.
protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
// 用于优化此TableEnvironment查询的planner实例
private lazy val planner: RelOptPlanner = relBuilder.getPlanner
private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
// 一个属性名称唯一的计数器
private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
// 注册外部catalog名到catalog中
private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]
......
}
从上面源码中可以看出TableEnvironment
抽象基类主要通过传入的TableConfig
创建了一个FlinkRelBuilder
,熟悉Calcite
的人对这个RelBuilder
应该很熟悉,后面会写文章专门讲解Calcite
。这个RelBuilder
可以用来获取关系表达式树,获取查询优化RelOptPlanner
,获取typeFactory
。除此之外还定义了fromTableSource
,scan
,registerTable
,sqlQuery
,registerTableSink
等操作方法。
2. StreamTableEnvironment抽象类
3. BatchTableEnvironment抽象类
2)TableEnvironment实体类
从之前示例和上面源码中可以知道,TableEnvironment通过调用带有参数StreamExecutionEnvironment或者ExecutionEnvironment和一个可选参数TableConfig的静态方法TableEnvironment.getTableEnvironment()来创建。TableConfig可以用来配置TableEnvironment或者自定义查询优化器和转换过程(参考查询优化器)。
四、正在讨论及改进
目前发现因为TableEnvironment
很多版本,这就造成了用户在选择使用TableEnvironment
不知道选取哪一个,为了统一和简化对用户的操作目前社区正在讨论,详细内容请看下面连接: 讨论及改进。