《Designing Data-intensive Application》读书笔记 – 批处理

三种不同类型的系统:

  1. 在线服务 —— 关注响应时间和可用性
  2. 批处理系统(离线系统) —— 性能的衡量标准是吞吐量
  3. 流处理系统(准实时系统) —— 和批处理一样消费输入并产生输出,但延迟更低

Unix的批处理


Unix的哲学

unix哲学可以总结为:

(i) Make each program do one thing well. To do a new job, build afresh rather than complicate old programs by adding new features.

(ii) Expect the output of every program to become the input to another, as yet unknown, program. Don’t clutter output with extraneous information. Avoid stringently columnar or binary input formats. Don’t insist on interactive input.

(iii) Design and build software, even operating systems, to be tried early, ideally within weeks. Don’t hesitate to throw away the clumsy parts and rebuild them.

(iv) Use tools in preference to unskilled help to lighten a programming task, even if you have to detour to build the tools and expect to throw some of them out after you’ve finished using them.

或者可以总结为:

Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.

统一的接口

Unix的程序,都会使用相同的I/O接口,那就是File descriptor。可以用它来表示:

  • 文件系统上的真实文件
  • 到另一个进程(stdin,stdout)的通信通道
  • 设备驱动程序(/dev/audio,/dev/lpo)
  • TCP/IP socket

逻辑分离

Unix工具使用stdin和stdout。默认的话,stdin指向键盘输入,stdout指向屏幕输出。但是可以通过重定向,将stdin和stdout指向文件。而管道可以将一个程序的输入作为另一个程序的输出,从而让中间数据不需要写入磁盘。

因此可以编写自己的程序,读取stdin,并将输出写入到stdout,这样就可以和操作系统提供的工具相结合,加入到数据处理的管道中。

但是stdin和stdout能够做的事情还是有限的。比如如果需要多个输入或者多个输出,处理起来会比较棘手。再比如,没法将输出写入到网络连接中

透明度和实验

Unix工具如此成功的原因:

  • Unix命令的输入被视为不可变的,因此可以安全地随意运行,而不用担心损坏文件
  • 可以随时结束整个管道,并将输出定向到less,从而很方便程序的调试
  • 可以将一个阶段的输出写入到文件,并将此文件作为下一段处理的输入,方便重跑

Unix工具最大的局限是只能运行在单机,所以需要Hadoop这样的工具。

MapReduce和分布式文件系统


MapReduce作业执行

Mapper

Mapper对于每条输入记录会执行一次,并生成任意数量的键值对。因为不会保留一条输入到下一条记录的状态,所以每条记录都是独立处理的。

Reducer

MapReduce框架会拉取Mapper生成的键值对,并将同一个键的值收集到一起,并在这组记录上迭代调用Reducer。

分布式执行MapReduce

MapReduce与Unix管道处理的区别在于,MapReduce可以在多台机器上并行执行。只需要考虑Mapper和Reducer的逻辑,而不用关注并行处理的实现。框架会处理数据移动的复杂问题,其处理的步骤大致如下:

  1. MapReduce框架将代码复制到指定的机器上,并启动Map任务进行处理。读取输入文件,一次将一条记录传给Mapper函数,从而生成输出的键值对。Map任务的数量一般由输入文件的大小决定。
  2. 因为Reducer也可能是多个(由用户参数决定),为了将具有相同键的键值对最终交给指定的Reducer处理,框架会根据键的hash值(或者自定义partition函数)来确定哪个Reducer应该处理对应的键值对。
  3. 每个Mapper都会根据Reducer对输出进行分区,而且输出都是按键值对进行排序。输出文件会写入到本地磁盘(写入hdfs的成本太高,而且可以随时重新运行Map任务)。
  4. Mapper输出结束后,MapReduce调度器会通知Reducer可以从该Mapper读取文件。Reducer通过网络从每个Mapper中下载指定分区的键值对文件。按Reducer进行分区,输出有序文件,然后从Mapper向Reducer复制分区数据,这整个过程被称为shuffle
  5. Reducer从Mapper获取文件,并将文件合并到一起。由于Mapper的输出已经排好序,所以Reducer在处理进行归并排序即可。
  6. Reducer处理的时候,会通过迭代器顺序扫描某个键的所有记录,并生成任意数量的输出。Reducer的输出会写入到hdfs上。

MapReduce工作流

多个MapReduce作业经常需要连接到一起,一个作业的输出作为下一个作业的输入。但是并没有像Unix那样的命令管道。

因此需要有相应的工作流调度系统来管理这些工作流,比如Oozie,Azkaban,Luigi,Airflow等。

Reduce端的Join和Group

Sort-merge JOIN

如果由两张表A和B需要根据User进行JOIN,当Reducer在处理的时候,同一个User的记录需要在一起。而且需要先处理A的记录,再处理B的记录,这种技术也被称为二次排序(secondary sort)。

在Reducer处理的时候,先处理A的记录,并可以保存在局部变量中,再处理剩余的B表记录。这个算法也被称为sort-merge join。

处理倾斜

如果存在热点键,那么会有某个Reducer处理特别多的记录,导致总体完成时间被拖慢。

Pig的skewed join做法,是先通过抽样确定哪些键是热点。当执行join的时候,将被关联的表记录随机分发到Reducer上,而热点键会被广播到这些Reducer中。这样相当于将热点键的关联工作分散到多个Reducer上,通过并行减少总体的执行时间。

Hive的skewed join采取了另一种做法,需要显式地指定热点键,并且这些热点值的记录会单独存放在文件中。当执行join的时候,热点键的关联是通过Map Join。

如果需要做Group By聚合运算,比如group by shop_id,可以将任务分解为两个MapReduce作业,第一个作业运行group by shop_id, random_value,第二个作业再根据上一个聚合结果进行再次聚合。

Map端的Join

broadcast hash joins

MapJoin适合的场景,就是大数据集和小数据集做Join。而且小数据集要足够小,能够全部放入Mapper的内存中,载入一个hash表中。这种方式被称为broadcast hash join。

除了将小表放入内存的hash表中,另一个种做法是将小表放入本地磁盘上的只读索引中。

bucketed hash joins

如果MapJoin的两张表,有同样的分区规则,那么在每个分区内都可以独立地进行hash join。这种方式只适用于join的两张表的分区数相同,且具有相同的分区规则和分区键。

map-side merge joins

如果Join的两张表,不仅具有相同的分区,还基于相同的键做了排序。那么,输入是否能够放入内存就不重要了,可以通过归并操作完成两个表的Join。如果merge join的输入,是前一个MapReduce作业产出,那么这个前提是很容易实现的。

MapReduce工作流上的Map Join

当下游需要使用MapReduce作业的输出时,那么作业使用Reduce端的Join还是Map端的Join,就会影响输出的结构。如果是Reduce端,就会根据关联键进行分区和排序。如果是Map端,输出的排序和分区,会和较大表的方式保持一致。

批处理工作流的输出

建立搜索索引

Google最一开始就是使用MapReduce来构建索引。如果对一组文档执行全文检索,那么批处理是比较一种高效的方式:Mapper负责对文档进行分区,每个Reducer构建该分区的索引,并将索引文件写入到分布式文件系统。

由于查询都是只读的,所以索引文件创建完后就是不可变的。索引更新的时候,可以等待批处理生成新的索引文件,并用新索引文件替换旧索引文件。

输出到KV存储

批处理输出的另一个用途,就是用于Web应用,响应用户的实时查询。而这些Web应用的底层存储一般是单独的KV存储,这就需要通过某种方式将批处理的数据导入到KV存储中。可以在Mapper或者Reducer中,通过客户端逐条写入(假如网络可以打通)。虽然可行,但是并不建议这么做:

  • 每条记录都要发起一次网络请求,吞吐量比batch的方式要慢几个数量级。即使支持批量插入,性能也不会太好。
  • MapReduce的作业会并发执行,很容易就将目标数据库压垮,影响其性能。
  • 通常情况下,MapReduce作业的输出是“All or nothing”,作业成功就会有输出,如果任务失败则不会有输出。中途出错,会进行各种重试操作,不会有任何的副作用。但是如果和外部系统有交互的话,重试就会产生一些副作用,用户就需要感知这个重试,从而做一些适配(比如操作幂等)。

更好的解决方法,是通过MapReduce作业生成目标库兼容的数据文件,再将文件一次性导入到目标库中,替换旧的数据。比如导入HBase的时候,可以先生成HFile,再导入到region server中。

批处理输出的哲学

  • 代码错误不会影响输出数据,因为输入数据是只读的,重新运行会生成新的输出。
  • 由于回滚代码没有副作用,功能开发会快很多,有利于敏捷开发。
  • MapReduce框架会自动重试失败的任务,从而避免短暂的系统问题造成任务失败。
  • 逻辑与配置(任务配置,输出配置等)相分离,让团队更专注于代码逻辑的开发。

Hadoop和MPP的对比

MPP数据库专注于在一批机器上并行执行分析SQL的查询,而MapReduce和HDFS的组合更像是一个可以运行任意程序的通用操作系统。

存储多样性

MPP数据库一般是schema on write,写入的时候就要确定好schema格式。而Hadoop则是schema on read的模式。

处理模型多样性

MPP数据库是紧密结合的单体数据库,在磁盘存储、查询计划、调度执行等方面,都进行了特定需求的定制,从而获取最优的性能。而且通过SQL查询语言访问数据。

但是,并非所有类型的需求都可以转换成SQL,MapReduce不仅支持SQL方式(Hive),也支持别的处理模型,用于机器学习、推荐系统等领域。而且这些不同的处理模型,可以共享同一组集群,并且集群上的文件也不用移动到别的系统。

针对频繁故障的设计

MPP数据库作查询的时候,如果某个节点执行的时候崩溃,那么数据库会中止整个查询,让用户进行重试或者自动重试。由于查询一般都很快,这个重试的代价是可以接受的。MPP数据库还倾向于在内存中多保留数据,从而避免磁盘读取的开销。

MapReduce则允许单个任务的失败,单个任务失败时只会重试这个任务,不会影响整体任务的执行。而MapReduce任务一般都是将数据第一时间写入磁盘,一方面是为了容错,另一方面内存也无法容纳那么大的数据集。

MapReduce作业适用于较大数据集的作业,作业一般运行时间都比较长,如果出错就整体重试的话,会带来很大的浪费。此外,如果MapReduce作业和其他作业一起部署在一个集群,为了提高集群的资源利用率,低优先级的作业随时有可能被抢占资源,这个失败的概率会比硬件故障高一个数量级。

MapReduce之后


物化中间状态

如果一个作业的输出,会被多个作业作为输入,那么将结果写入HDFS是有意义的,让多个作业之间形成松耦合。但是如果多个作业之间是紧密耦合的,那么中间状态的物化就会带来一定的成本,相比于Unix管道会存在某些补足

  • MapReduce作业只有在上游作业全部完成后才能启动,而Unix管道连接的进程会同时启动,只有有输出就会立即被消费。同时,不同机器上的子任务有可能发生倾斜,而下游必须等待长尾任务的完成。
  • 下游任务的Mapper通常是多余的,如果它和上游Reducer的分区和排序保持完全一致的话,那么Reducer可以直接和Reducer串接在一起。
  • 将中间状态存储在HDFS,会被复制多份,对于临时数据来说代价太大了。

数据流引擎

为了解决MapReduce存在的这些问题,产生了一些新的分布式执行引擎,比如Spark、Tez、Flink等,他们都是将整个工作流当作一个作业,而不是拆分成若干个独立子任务。

与MapReduce不同的是,数据流引擎中会定义一些函数,称为算子(operators)。这些函数并没有严格区分出Mapper和Reducer这样的角色,可以更灵活地组合。数据流引擎提供了几种选项,将一个算子的输出连接到另一个算子的输入:

  • 对记录按键进行分区和排序,用于sort-merge join和group by,类似于MapReduce中的shuffle。
  • 可以接受多个输入,并以相同的方式进行分区,但是可以跳过排序。比如hash join的时候,分区很重要,但是顺序不重要。
  • 对于broadcast hash join,需要将一个算子的输出,广播到另一个算子所有分区。

与MapReduce模型相比,有如下几个优点:

  • 排序这类昂贵的工作只会出现在需要的地方,而不是默认都做。
  • 没有不必要的Mapper,Mapper的工作都可以合并到前一个Reducer算子中。
  • 调度可以总揽全局,优化数据的分布,尽可能地本地读取。
  • 算子的输出一般都会写入内存或者本地磁盘,相比于HDFS的IO会少很多。
  • MapReduce中的每个子任务都会新建一个JVM,而这里的JVM可以重新运行新的算子。

容错

完全物化中间状态到HDFS的一个优点,就是具有持久性。即使任务出错,也可以重新启动新的任务,读取相同的输入文件。

而数据流引擎采用了另一种方法来容错:如果一台机器故障,则该机器上的中间状态会丢失,需要从其上游去读取数据重新运算(如果有中间状态的话最好,否则就要一直追溯到原始来源)。

为了实现这种重新运算,框架必须跟踪一个给定的数据是如何计算的:使用了什么输入、应用了什么算子。Spark使用RDD的抽象来跟踪数据的血缘,而Flink可以对算子状态进行存档。

图与迭代处理

MapReduce无法处理PageRank这样的图迭代算法,它的迭代方式如下所述:

  1. 调度程序调用批处理来完成计算的某个步骤
  2. 当批处理完成后,调度会基于某些条件检查是否完成
  3. 如果尚未完成,返回步骤1继续执行

Pregel处理模型

Pregel的背后思想,就是一个顶点可以向另一个顶点发送消息,消息是沿着图的边进行发送。

在每次迭代中,在每个顶点调用函数,函数的入参就是所有发给这个顶点的消息,就像调用Reducer一样。与MapReduce不同的是,顶点会记录状态,所以每次迭代只会处理新的消息。

容错

顶点之间通过消息来进行通信,可以批量发送消息从而提高性能。每次迭代之前,需要等待上轮迭代的消息全部送达。即使底层网络导致消息的重复、丢失、延迟,Pregel的实现也保证了消息在每个顶点只会处理一次。

并行执行

顶点之间发送消息,只知道对方的顶点ID,并不知道其具体的物理机器,所有的分区都是由框架决定的。

但是实践中,很难找到一个最优的分区算法,让顶点之间的跨网络通信尽可能地少。出于这个原因,如果可以将图放入到内存中,那么单机的算法很有可能比分布式的还要快。即使内存中放不下,也可以放入单台机器的磁盘。图的并行执行算法,仍然是一个进行中的研究领域。

发表评论