Flink1.7 Table API&SQL源码系列之TableEnvironment

Flink1.7 Table API&SQL源码系列之TableEnvironment

一、TableEnvironment的示例使用

为了更好的直观的理解TableEnvironment,下面我给出了Java和Scala代码使用示例。

Java代码示例

// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 创建一个TableEnvironment
// 对于批处理程序来说使用 BatchTableEnvironment 替换 StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 注册一个 Table
tableEnv.registerTable("table1", ...)            // 或者
tableEnv.registerTableSource("table2", ...);     // 或者
tableEnv.registerExternalCatalog("extCat", ...); //注册外部的Catalog目录
//将datastream注册成一个 Table
tableEnv.registerDataStream("stream1",...)
//注册一个 udf
tableEnv.registerFunction("addFun1",...)

// 从Table API的查询中创建一个Table
Table tabResult = tableEnv.scan("table1").select(...);
// 从SQL查询中创建一个Table
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// 用来更新执行Insert语句 目前值支持Insert
tableEnv.sqlUpdate("SELECT ... FROM table2 ... ");

// 将Table API中的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.insertInto(...);

// 执行
env.execute();

Scala代码示例

// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注册一个 Table
tableEnv.registerTable("table1", ...)           // 或者
tableEnv.registerTableSource("table2", ...)     // 或者
tableEnv.registerExternalCatalog("extCat", ...)//注册外部的Catalog目录
//将datastream注册成一个 Table
tableEnv.registerDataStream("stream1",...)
//注册一个 udf
tableEnv.registerFunction("addFun1",...)

// 从Table API的查询中创建一个Table
val tabResult = tableEnv.scan("table1").select(...)
// 从SQL查询中创建一个Table
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// 用来更新执行Insert语句 目前值支持Insert
tableEnv.sqlUpdate("SELECT ... FROM table2 ... ")

// 将Table API中的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.insertInto(...)

// 执行
env.execute()

二、理解TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念,从上面示例看出它主要负责:

  1. 在内部目录中注册一个Table
  2. 注册一个外部目录
  3. 执行SQL查询或执行Insert语句更新
  4. 注册一个用户自定义函数(标量、表及聚合)
  5. 将DataStream或者DataSet转换成Table
  6. 将Table转换为DataSetDataStream或者DataSet
  7. 在已注册的表上指定SQL查询以获取Table
  8. 持有ExecutionEnvironment或者StreamExecutionEnvironment的引用

    注意:一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment目前是无法通过join、union合并在一起。

目前Flink1.7版本TableEnvironment从Java和Scala编程语言,流和批计算模型来来分为以下3个抽象类和4个实体类:

维度 类名
抽象类 TableEnvironment(基类),StreamTableEnvironment和BatchTableEnvironment
Java StreamTableEnvironment和BatchTableEnvironment(实体类)
Scala StreamTableEnvironment和BatchTableEnvironment(实体类)

TableEnvironment的继承关系如下图所示(图片来源) :
img

三、TableEnvironment的抽象与实现

1)TableEnvironment抽象类

TableEnvironment基类在源码org.apache.flink.table.api包中,StreamTableEnvironment和BatchTableEnvironment这两个抽象类继承于基类TableEnvironment。接下来先介绍TableEnvironment基类。

1. TableEnvironment抽象基类

/**
  * The abstract base class for batch and stream TableEnvironments.
  *
  * @param config The configuration of the TableEnvironment
  */
abstract class TableEnvironment(val config: TableConfig) {

  // 用于保存所有已注册和已转换表的目录
  // 这里禁用缓存以防止副作用
  private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
  private val rootSchema: SchemaPlus = internalSchema.plus()

  // Table API/SQL 函数目录
  private[flink] val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns

  // 用于创建Calcite计划程序的配置
  private lazy val frameworkConfig: FrameworkConfig = Frameworks
    .newConfigBuilder
    .defaultSchema(rootSchema)
    .parserConfig(getSqlParserConfig)
    .costFactory(new DataSetCostFactory)
    .typeSystem(new FlinkTypeSystem)
    .operatorTable(getSqlOperatorTable)
    .sqlToRelConverterConfig(getSqlToRelConverterConfig)
    //设定executor用来执行常量表达式
    .executor(new ExpressionReducer(config))
    .build

  // 构建Calcite的RelNodes(相关结点),可以用来获取和表示Calcite的一个关系表达式树.
  protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)

  // 用于优化此TableEnvironment查询的planner实例
  private lazy val planner: RelOptPlanner = relBuilder.getPlanner

  private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory

  // 一个属性名称唯一的计数器
  private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)

  // 注册外部catalog名到catalog中
  private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]
  ......
}

从上面源码中可以看出TableEnvironment抽象基类主要通过传入的TableConfig创建了一个FlinkRelBuilder,熟悉Calcite的人对这个RelBuilder应该很熟悉,后面会写文章专门讲解Calcite。这个RelBuilder可以用来获取关系表达式树,获取查询优化RelOptPlanner,获取typeFactory。除此之外还定义了fromTableSourcescanregisterTablesqlQueryregisterTableSink等操作方法。

2. StreamTableEnvironment抽象类

3. BatchTableEnvironment抽象类

2)TableEnvironment实体类

从之前示例和上面源码中可以知道,TableEnvironment通过调用带有参数StreamExecutionEnvironment或者ExecutionEnvironment和一个可选参数TableConfig的静态方法TableEnvironment.getTableEnvironment()来创建。TableConfig可以用来配置TableEnvironment或者自定义查询优化器和转换过程(参考查询优化器)。

参考1

四、正在讨论及改进

目前发现因为TableEnvironment很多版本,这就造成了用户在选择使用TableEnvironment不知道选取哪一个,为了统一和简化对用户的操作目前社区正在讨论,详细内容请看下面连接: 讨论及改进。


 上一篇
十大经典排序算法动画与解析(配代码完全版) 十大经典排序算法动画与解析(配代码完全版)
十大经典排序算法动画与解析(配代码完全版)排序算法是《数据结构与算法》中最基本的算法之一。 排序算法可以分为内部排序和外部排序。 内部排序是数据记录在内存中进行排序。 而外部排序是因排序的数据很大,一次不能容纳全部的排序记录,在排序过程中需
2019-01-01
下一篇 
面试系列-Jvm看这篇就够了 面试系列-Jvm看这篇就够了
面试系列-Jvm看这篇就够了一. JVM基础知识1)Java 是如何实现跨平台的?注意:跨平台的是 Java 程序,而不是 JVM。JVM 是用 C/C++ 开发的,是编译后的机器码,不能跨平台,不同平台下需要安装不同版本的 JVM 答:
2018-12-29
  目录