错误容忍机制
在使用Spark大数据处理的过程中,有时候会出现一些软硬件故障导致任务执行失败和数据丢失,这时候就需要设计一个容错机制来解决以下问题:
- 硬盘网络问题、节点挂了、内容不够等问题从而IO异常、响应超时等导致任务执行失败
- 节点挂了导致中间数据丢失
Spark容错就两个办法,错了重算机制和checkpoint
重算机制
需要注意的从哪里开始重算?
- 对于下游stage的task重算,由于spark有”延迟删除策略“,上游stage的Shuffle Write的数据不会删除,可以直接Shuffle Read,不用重新计算
- 对于已经缓存的RDD不需要再次计算,如果缓存丢了就要全部重算了
checkpoint
为什么要设计checkpoint?
checkpoint大家都知道是spark中的重中之重,当一个spark任务非常复杂,上百个RDD要运行个几个小时。即使使用在数据缓存机制中学过的cache和persist持久化,还是有可能因为一些原因比如节点故障导致丢失
对什么数据使用checkpoint?
试想一下,如果一个关键的RDD关联了很多数据,并且他的计算链很长,数据丢失后需要重新计算,这种情况就需要对这个RDD设置checkpoint
也就是说,关联数据很多、计算链很长、多次复用的RDD
checkpoint原理:
为了搞清楚原理,首先要知道数据存到哪里
- 我们使用checkpoint的目的是为了持久化数据,为了在节点挂掉的时候恢复数据对吧,所以数据必须要被可靠的存储,所以一般我们存到hdfs中
那么设计一个checkpoint机制,我们很容易想到之前我们在数据缓存机制中学过的数据缓存方法:对于每个需要缓存的RDD,每计算出一个record,就把它缓存到内存或者磁盘中
但是,数据是要存到hdfs中的,并且一般要复制三份跨节点存储,写入时延又高,就太影响效率了
Spark也没有很好的解决方案,就是先过一遍job,等完成job后再重启一次job,算到要持久化的rdd把数据写入hdfs。这里可以在checkpoint之前先对rdd缓存,这样checkpoint就可以直接把缓存数据写入hdfs,就不用重新算rdd了
使用checkpoint的方法
- 首先设置checkpoint路径,一般是hdfs
sc.setCheckpointDir("HDFS://...")
- 对需要checkpoint的rdd先缓存,这一步可省略,
rdd.cache()
- 然后对需要的rdd设置即可,
rdd.checkpoint()
checkpoint和数据缓存机制的区别
为了更好的学习吸收,这里对比一下checkpoint和之前学到的数据缓存机制的区别
- 目的不同。缓存是为了加速计算,加速后续的job;而checkpoint是为了job失败后快速恢复
- 存储不同。缓存主要用内存,偶尔用磁盘。checkpoint为了可靠存储,主要用hdfs
- 速度不同。缓存速度较快,checkpoint写入速度较慢,为了不影响当前job,在job完成后会另起一个job
- lineage不同。缓存后的rdd丢失了还能找回来;checkpoint后会直接切断该RDD的lineage,因为已经可靠存储了
- 场景不同。缓存用于多次读取、占用空间不是特别大的rdd;checkpoint用于关联数据多、计算链长、多次复用的rdd