您的位置:网站首页 > php源码 > 正文

是时候学习真正的 spark 技术了

类别:php源码 日期:2018-12-6 4:26:37 人气: 来源:

  spark sql 可以说是 spark 中的精华部分了,我感觉整体复杂度是 spark streaming 的 5 倍以上,现在 spark 主推 structed streaming, spark streaming 的也不积极了, 我们基于 spark 来构建大数据计算任务,重心也要向 DataSet 转移,原来基于 RDD 写的代码迁移过来,好处常大的,尤其是在性能方面,有质的提升, spark sql 中的各种内嵌的性能优化是比人裸写 RDD 遵守各种所谓的最佳实践更靠谱的,尤其对新手来讲, 比如有些最佳实践讲到先 filter 操作再 map 操作,这种 spark sql 中会自动进行谓词下推,比如尽量避免使用 shuffle 操作,spark sql 中如果你了相关的配置,会自动使用 broadcast join 来小表,把 shuffle join 为 map join 等等,真的能让我们省很多心。

  spark sql 的代码复杂度是问题的本质复杂度带来的,spark sql 中的 Catalyst 框架大部分逻辑是在一个 Tree 类型的数据结构上做各种,基于 scala 来实现还是很优雅的,scala 的偏函数和强大的 Case 正则匹配,让整个代码看起来还是清晰的, 这篇文章简单的描述下 spark sql 中的一些机制和概念。

  提到的 sessionState 是一个很关键的东西,了当前 session 使用的所有的状态数据,有以下各种需要的东西:

  在 Dataset 上进行 transformations 操作就会生成一个元素为 LogicalPlan 类型的树形结构, 我们来举个例子,假如我有一张学生表,一张分数表,需求是统计所有大于 11 岁的学生的总分。

  这个 queryExecution 就是整个执行计划的执行引擎, 里面有执行过程中,各个中间过程变量,整个执行流程如下

  那么我们例子中的 sql 语句经过 Parser 解析后就会变成一个抽象语法树,对应解析后的逻辑计划 AST 为

  我们可以看到过滤条件变为了 Filter 节点,这个节点是 UnaryNode 类型, 也就是只有一个孩子,两个表中的数据变为了 UnresolvedRelation 节点,这个节点是 LeafNode 类型, 顾名思义,叶子节点, JOIN 操作就表位了 Join 节点, 这个是一个 BinaryNode 节点,有两个孩子。

  这些 operator 组成的抽象语法树就是整个 Catatyst 优化的基础,Catatyst 优化器会在这个树进行各种,把树的节点挪来挪去来进行优化。

  现在经过 Parser 有了抽象语法树,但是并不知道 score,sum 这些东西是啥,所以就需要 analyer 来定位, analyzer 会把 AST 上所有 Unresolved 的东西都转变为 resolved 状态,sparksql 有很多resolve 规则,都很好理解,例如 ResolverRelations 就是解析表(列)的基本类型等信息,ResolveFuncions 就是解析出来函数的基本信息,比如例子中的sum 函数,ResolveReferences 可能不太好理解,我们在 sql 语句中使用的字段比如 Select name 中的 name 对应一个变量, 这个变量在解析表的时候就作为一个变量(Attribute 类型)存在了,那么 Select 对应的 Project 节点中对应的相同的变量就变成了一个引用,他们有相同的 ID,所以经过 ResolveReferences 处理后,就变成了 AttributeReference 类型 ,在最后真正加载数据的时候他们被赋予相同的值,就跟我们写代码的时候定义一个变量一样,这些 Rule 就反复作用在节点上,指定树世界上最龌龊的种族节点趋于稳定,当然优化的次数多了会浪费性能,所以有的 rule 作用 Once, 有的 rule 作用 FixedPoint, 这都是要取舍的。好了, 不说废话,我们做个小实验。

  我们再使用 ResolveReferences 来搞一下,你会发现上层节点中的相同的字段都变成了拥有相同 ID 的引用,他们的类型都是 AttibuteReference。最终所有的 rule 都应用后,整个 AST 就变为了

  sparksql 中的逻辑优化种类繁多,spark sql 中的 Catalyst 框架大部分逻辑是在一个 Tree 类型的数据结构上做各种,基于 scala 来实现还是很优雅的,scala 的偏函数 和 强大的 Case 正则匹配,让整个代码看起来还是清晰的,废话少说,我们来搞个小实验。

  使用 PushPredicateThroughJoin 把一个单单对 stu 表做过滤的 Filter 给下推到 Join 之前了,会少加载很多数据,性能得到了优化,我们来看下最终的样子。

  做完逻辑优化,毕竟只是抽象的逻辑层,还需要先转换为物理执行计划,将逻辑上可行的执行计划变为 Spark 可以真正执行的计划。

  spark sql 把逻辑节点转换为了相应的物理节点, 比如 Join 算子,Spark 根据不同场景为该算子制定了不同的算法策略,有BroadcastHashJoin、ShuffleHashJoin 以及 SortMergeJoin 等, 当然这里面有很多优化的点,spark 在转换的时候会根据一些统计数据来智能选择,这就涉及到基于代价的优化,这也是很大的一块,后面可以开一篇文章单讲, 我们例子中的由于数据量小于 10M, 自动就转为了 BroadcastHashJoin,眼尖的同学可以看到好像多了一些节点,我们来解释下, BroadcastExchange 节点继承 Exchage 类,用来在节点间交换数据,这里的BroadcastExchange 就是会把 LocalTableScan出来的数据 broadcast 到每个 executor 节点,用来做 map-side join。最后的 Aggregate 操作被分为了两步,第一步先进行并行聚合,然后对聚合后的结果,再进行 Final 聚合,这个就类似域名 map-reduce 里面的 combine 和最后的 reduce, 中间加上了一个 Exchange hashpartitioning, 这个是为了相同的 key shuffle 到相同的分区,当前物理计划的 Child 输出数据的 Distribution 达不到要求的时候需要进行Shuffle,这个是在最后的 EnsureRequirement 阶段插入的交换数据节点,在数据库领域里面,有那么一句话,叫得 join 者得天下,我们重点讲一些 spark sql 在 join 操作的时候做的一些取舍。

  Join 操作基本上能上会把两张 Join 的表分为大表和小表,大表作为流式遍历表,小表作为查找表,然后对大表中的每一条记录,根据 Key 来取查找表中取相同 Key 的记录。

  提到 AST 的节点已经转换为了物理节点,这些物理节点最终从头节点递归调用 execute 方法,里面会在 child 生成的 RDD 上调用 transform操作就会产生一个串起来的 RDD 链, 就跟在 spark stremaing 里面在 DStream 递归调用那样。最后执行出来的图如下:

  可以想象到,数据在一个一个的 plan 中流转,然后每个 plan 里面表达式都会对数据进行处理,就相当于经过了一个个小函数的调用处理,这里面就有大量的函数调用开销,那么我们是不是可以把这些小函数内联一下,当成一个大函数,WholeStageCodegen 就是干这事的。

  可以看到最终执行计划每个节点前面有个 * 号,说明整段代码生成被启用,在我们的例子中,Filter, Project,BroadcastHashJoin,Project,HashAggregate 这一段都启用了整段代码生成,级联为了两个大函数,有兴趣可以使用egen 看成后的代码长什么样子。然而 Exchange 算子并没有实现整段代码生成,因为它需要通过网络发送数据。

  我今天的分享就到这里,其实 spark sql 里面有很多有意思的东西,但是因为问题的本质复杂度,导致需要高度抽象才能把这一切理顺,这样就给代码阅读者带来了理解困难, 但是你如果真正看进去了,就会有很多收获。如果对本文有任何见解,欢迎在文末留言说出你的想法。返回搜狐,查看更多

  本文由来源于325棋牌 325游戏中心唯一官方网站

关键词:spark源码分析
0
0
0
0
0
0
0
0
下一篇:没有资料

相关阅读

网友评论 ()条 查看

姓名: 验证码: 看不清楚,换一个

推荐文章更多

热门图文更多

最新文章更多

关于联系我们 - 广告服务 - 友情链接 - 网站地图 - 版权声明 - 人才招聘 - 帮助

CopyRight 2002-2012 技术支持 源码吧 FXT All Rights Reserved

赞助合作: