Magnet: Push-based Shuffle Service for Large-scale Data Processing

本文是阅读 LinkedIn 公司2020年发表的论文 Magnet: Push-based Shuffle Service for Large-scale Data Processing 一点笔记 。
什么是Shuffle

Magnet: Push-based Shuffle Service for Large-scale Data Processing

文章插图
以上图为例,在一个DAG的执行图中,节点与节点之间的数据交换就是Shuffle的过程 。虽然Shuffle的过程很简单,但是不同的引擎有不同的实现 。以shuffle数据传输的介质来看
  • 有基于磁盘的shuffle,例如Map/Reduce,Spark,Flink Batch中,上下游之前的数据都是需要落盘后来进行传输,这类通常是离线处理框架,对延迟不敏感,基于磁盘更加可靠稳定 。
  • 有基于内存的pipeline模式的shuffle方案,例如Presto/Flink Streaming中,主要是对时延比较敏感的场景,基于内存Shuffle,通过网络rpc直接传输内存数据
而基于本地磁盘的Shuffle实现中又有很多种不同的实现
  • 有基于Hash的方案,每个map端的task为每个reduce task 产生一个 shuffle文件
  • 有基于Sort方案,每个map端的task按照 partitionId + hash(key) 排序,并最终merge成一个文件以及一个index文件,在reduce端读取时根据每个task的index文件来读取相应segment的数据
以部署方式来看
  • 有基于worker的本地shuffle的方案,直接通过worker来提供读写的功能
  • 有基于external shuffle的实现,通常托管于资源管理框架,在Yarn框架中就可以实现这种辅助服务,这样就可以及时的释放worker计算资源
  • 有基于Remote shuffle的实现,在云计算时代逐渐成为主流,因为其存算分离的架构往往能带来更好的可扩展性并且网络带宽的提高使得co-locate_也许_不再那么重要 。
Spark Shuffle实现
Magnet: Push-based Shuffle Service for Large-scale Data Processing

文章插图
这里再大致介绍下spark原生的external sort shuffle的详细流程
  1. 每个spark executor启动后和本地节点的external shuffle service注册,同一个机器的多个executor会共享这个机器上的shuffle service服务 。
  2. map stage处理完数据之后会产出两个文件 shuffle data 和 index文件,map task会按照partition key 来进行排序,属于同一个reduce 的数据作为一个Shuffle Block,而index文件中则会记录不同的Shuffle Block 之间的边界offset,辅助下游读取
  3. 当下游reduce task开始运行,首先会查询Spark driver 得到input shuffle blocks的位置信息,然后开始和spark ESS建立链接开始读取数据,读取数据时就会根据index文件来skip读取自己task那个shuffle blocks

    经验总结扩展阅读