社会养老保险新政策

usdt充币教程(www.6allbet.com):Flink SQL 性能优化:multiple input 详解

来源:三公开船 发布时间:2021-02-26 浏览次数:

USDT自动充值

菜宝钱包(caibao.it)是使用TRC-20协议的Usdt第三方支付平台,Usdt收款平台、Usdt自动充提平台、usdt跑分平台。免费提供入金通道、Usdt钱包支付接口、Usdt自动充值接口、Usdt无需实名寄售回收。菜宝Usdt钱包一键生成Usdt钱包、一键调用API接口、一键无实名出售Usdt。

原题目:Flink SQL 性能优化:multiple input 详解

简介: 在 Flink 1.12 中,针对现在 operator chaining 无法笼罩的场景,推出了 multiple input operator 与 source chaining 优化。该优化将消除 Flink 作业中大多数冗余 shuffle,进一步提高作业的执行效率。本文将以一个 SQL 作业为例先容上述优化,并展示 Flink 1.12 在 TPC-DS 测试集上取得的功效。

执行效率的优化一直是 Flink 追寻的目的。在大多数作业,特别是批作业中,数据通过网络在 task 之间通报(称为数据 shuffle)的价值较大。正常情形下一条数据经由网络需要经由序列化、磁盘读写、socket 读写与反序列化等艰难险阻,才气从上游 task 传输到下游;而相同数据在内存中的传输,仅需要花费几个 CPU 周期传输一个八字节指针即可。

Flink 在早期版本中已经通过 operator chaining 机制,将并发相同的相邻单输入算子整合进统一个 task 中,消除了单输入算子之间不需要的网络传输。然而,join 等多输入算子之间同样存在分外的数据 shuffle 问题,shuffle 数据量最大的 source 节点与多输入算子之间的数据传输也无法行使 operator chaining 机制举行优化。

在 Flink 1.12 中,我们针对现在 operator chaining 无法笼罩的场景,推出了 multiple input operator 与 source chaining 优化。该优化将消除 Flink 作业中大多数冗余 shuffle,进一步提高作业的执行效率。本文将以一个 SQL 作业为例先容上述优化,并展示 Flink 1.12 在 TPC-DS 测试集上取得的功效。

我们将以 TPC-DS q96 为例子详细先容若何消除冗余 shuffle,该 SQL 意在通过多路 join 筛选并统计相符特定条件的订单量。

select count(*)

from store_sales

,household_demographics

,time_dim, store

where ss_sold_time_sk = time_dim.t_time_sk

and ss_hdemo_sk = household_demographics.hd_demo_sk

and ss_store_sk = s_store_sk

and time_dim.t_hour = 8

and time_dim.t_minute >= 30

and household_demographics.hd_dep_count = 5

and store.s_store_name = 'ese'

图 1 - 初始执行设计

冗余 Shuffle 是若何发生的?

由于部门算子对输入数据的漫衍有要求(如 hash join 算子要求统一并发内数据 join key 的 hash 值相同),数据在算子之间通报时可能需要经由重新排布与整理。与 map-reduce 的 shuffle 历程类似,Flink shuffle 将上游 task 发生的中心效果举行整理,并按需发送给需要这些中心效果的下游 task。但在一部门情形下,上游产出的数据已经知足了数据漫衍要求(如延续多个 join key 相同的 hash join 算子),此时对数据的整理便不再需要,由此发生的 shuffle 也就成为了冗余 shuffle,在执行设计中以 forward shuffle 示意。

图 1 中的 hash join 算子是一种称为 broadcast hash join 的特殊算子。以 store_sales join time_dim 为例,由于 time_dim 表数据量很小,此时通过 broadcast shuffle 将该表的全量数据发送给 hash join 的每个并发,就能让任何并发接受 store_sales 表的随便数据而不影响 join 效果的正确性,同时提高 hash join 的执行效率。此时 store_sales 表向 join 算子的网络传输也成为了冗余 shuffle。同理几个 join 之间的 shuffle 也是不需要的。

图 2 - 冗余的shuffle(红框符号)

除 hash join 与 broadcast hash join 外,发生冗余 shuffle 的场景另有许多,例如 group key 与 join key 相同的 hash aggregate hash join、group key 具有包罗关系的多个 hash aggregate 等等,这里不再睁开形貌。

Operator Chaining 能解决吗?

对 Flink 优化历程有一定领会的读者可能会知道,为了消除不需要的 forward shuffle,Flink 在早期就已经引入了 operator chaining 机制。该机制将并发相同的相邻单输入算子整合进统一个 task 中,并在统一个线程中一起运算。Operator chaining 机制在图 1 中实在已经在发挥作用,若是没有它,做 broadcast shuffle 的三个 Source 节点名称中被“->”分开的算子将会被拆分至多个差别的 task,发生冗余的数据 shuffle。图 3 为 Operator chaining 关闭是的执行设计。

图 3 - Operator chaining关闭后的执行设计

削减数据在 TM 之间通过网络和文件传输并将算子链接合并入 task 是异常有用的优化:它能削减线程之间的切换,削减新闻的序列化与反序列化,削减数据在缓冲区的交流,并削减延迟的同时提高整体吞吐量。然而,operator chaining 对算子的整合有异常严酷的限制,其中一条就是“下游算子的入度为 1”,也就是说下游算子只能有一起输入。这就将多路输入的算子(如 join)清扫在外。

,

usdt收款平台

菜宝钱包(caibao.it)是使用TRC-20协议的Usdt第三方支付平台,Usdt收款平台、Usdt自动充提平台、usdt跑分平台。免费提供入金通道、Usdt钱包支付接口、Usdt自动充值接口、Usdt无需实名寄售回收。菜宝Usdt钱包一键生成Usdt钱包、一键调用API接口、一键无实名出售Usdt。

,

多输入算子的解决方案:Multiple Input Operator

若是我们能模仿 operator chaining 的优化思绪,引入新的优化机制并知足以下条件:

  1. 该机制支持多路输入(为被组合的算子提供输入)

我们就可以将用 forward shuffle 毗邻的的多输入算子放到一个 task 里执行,从而消除不需要的 shuffle。Flink 社区很早就关注到了 operator chaining 的不足,在 Flink 1.11 中引入了 streaming api 层的

MultipleInputTransformation 以及对应的 MultipleInputStreamTask。这些 api 知足了上述条件 2,而 Flink 1.12 在此基础上在 SQL 层中实现了知足条件 1 的新算子——multiple input operator,可以参考 FLIP 文档[1]。

Multiple input operator 是 table 层一个可插拔的优化。它位于 table 层优化的最后一步,遍历天生的执行设计并将不被 exchange 阻隔的相邻算子整合进一个 multiple input operator 中。图 4 展示了该优化对原本 SQL 优化步骤的修改。

图 4 - 加入 multiple input operator 后的优化步骤

读者可能会有疑问:为什么不在现有的 operator chaining 上举行修改,而要重新努力别辟门户呢?实际上,multiple input operator 除了要完成 operator chaining 的事情之外,还需要对各个输入的优先级举行排序。这是由于一部门多输入算子(如 hash join 与 nested loop join)对输入有严酷的顺序限制,若输入优先级排序欠妥很可能造成死锁。由于算子输入优先级的信息仅在 table 层的算子中有形貌,加倍自然的方式是在 table 层引入该优化机制。

值得注意的是,multiple input operator 差别于治理多个 operator 的 operator chaining,其自己就是一整个大 operator,而其内部运算在外界看来就是一个黑盒。Multiple input operator 的内部结构在 operator name 中完全体现,读者在运行包罗该 operator 的作业时,可以从 operator name 看到哪些算子以怎样的拓扑结构被组合进了 multiple input operator 中。

图 5 展示了经由 multiple input 优化后的算子的拓扑图以及 multiple input operator 的透视图。图中三个 hash join 算子之间的冗余的 shuffle 被移除后,它们可以在一个 task 里执行,只不外 operator chaining 没法处置这种多输入的情形,将它们放到 multiple input operator 里执行,由 multiple input operator 治理各个算子的输入顺序和算子之间的挪用关系。

图 5 - 经由 multiple input 优化后的算子拓扑图

Multiple input operator 的构建和运行历程较为庞大,对此细节有兴趣的读者可以参考设计文档[2]。

Source 也不能遗漏:Source Chaining

经由 multiple input operator 的优化,我们将图 1 中的执行设计优化为图 6,图 3 经由 operator chaining 优化后就变为图 6 的执行图。

图 6 - 经由 multiple input operator 优化后的执行设计

图 6 中从 store_sales 表发生的 forward shuffle(如红框所示)示意我们仍有优化空间。正如序言中所说,在大部门作业中,从 source 直接发生的数据由于没有经由 join 等算子的筛选和加工,shuffle 的数据量是最大的。以 10T 数据下的 TPC-DS q96 为例,若是不举行进一步优化,包罗 store_sales 源表的 task 将向网络中传输 1.03T 的数据,而经由一次 join 的筛选后,数据量急速下降至 16.5G。若是我们能将源表的 forward shuffle 省去,作业整体执行效率又能前进一大步。

惋惜的是,multiple input operator 也不能笼罩 source shuffle 的场景,这是由于 source 差别于其它任何算子,它没有任何输入。Flink 1.12 为此给 operator chaining 新增了 source chaining 功效,将不被 shuffle 阻隔的 source 合并到 operator chaining 中,省去了 source 与下游算子之间的 forward shuffle。

现在仅有 FLIP-27 source 以及 multiple input operator 可以行使 source chaining 功效,不外这已经足够解决本文中的优化场景。

连系 multiple input operator 与 source chaining 之后,图 7 展示了本文优化案例的最终执行方案。

图 7 - 优化后的执行方案

TPC-DS 测试效果

Multiple input operator 与 source chaining 对大部门作业,特别是批作业有显著的优化效果。我们行使 TPC-DS 测试集对 Flink 1.12 的整体性能举行了测试,与 Flink 1.10 宣布的 12267s 总用时相比,Flink 1.12 的总用时仅为 8708s,缩短了近 30% 的运行时间!

图 8 - TPC-DS 测试集总用时对比

图 9 - TPC-DS 部门测试点用时对比

未来设计

通过 TPC-DS 的测试效果看到,source chaining multiple input 能够给我们带来很大的性能提升。现在整体框架已完成,常用批算子已支持消除冗余 exchange 的推导逻辑,后续我们将支持更多的批算子和更精致的推导算法。

流作业的数据 shuffle 虽然不需要像批作业一样将数据写入磁盘,但将网络传输变为内存传输带来的性能提升也是异常可观的,因此流作业支持 source chaining multiple input 也是一个异常令人期待的优化。同时,在流作业上支持该优化还需要许多事情,例如流算子上消除冗余 exchange 的推导逻辑暂未支持,一些算子需要重构以消除输入数据是 binary 的要求等等,这也是为什么 Flink 1.12 暂未在流作业中推出推出该优化的缘故原由。后续版本我们将逐步完成这些事情,也希望更多社区的气力加入我们一起尽早的将更多的优化落地。

作者:贺小令、翁才智

发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片