1 业务背景
随着大资料的快速发展,业务场景越来越复杂,离线式的批处理框架MapReduce已经不能满足业务,大量的场景需要实时的资料处理结果来进行分析、决策。Spark Streaming是一种分散式的大资料实时计算框架,他提供了动态的,高吞吐量的,可容错的流式资料处理,不仅可以实现使用者行为分析,还能在金融、舆情分析、网络监控等方面发挥作用。个推开发者服务——讯息推送“应景推送”正是应用了Spark Streaming技术,基于大资料分析人群属性,同时利用LBS地理围栏技术,实时触发精准讯息推送,实现使用者的精细化运营。此外,个推在应用Spark Streaming做实时处理kafka资料时,采用Direct模式代替Receiver模式的手段,实现了资源优化和程式稳定性提升。本文将从Spark Streaming获取kafka资料的两种模式入手,结合个推实践,带你解读Receiver和Direct模式的原理和特点,以及从Receiver模式到Direct模式的优化对比。
2 两种模式的原理和区别
Receiver模式1. Receiver模式下的执行架构

InputDStream: 从流资料来源接收的输入资料。Receiver:负责接收资料流,并将资料写到本地。Streaming Context:代表SparkStreaming,负责Streaming层面的任务排程,生成jobs传送到Spark engine处理。Spark Context: 代表Spark Core,负责批处理层面的任务排程,真正执行job的Spark engine。2. Receiver从kafka拉取资料的过程

该模式下:
在executor上会有receiver从kafka接收资料并存储在Spark executor中,在到了batch时间后触发job去处理接收到的资料,1个receiver占用1个core;为了不丢资料需要开启WAL机制,这会将receiver接收到的资料写一份备份到第三方系统上(如:HDFS);receiver内部使用kafka High Level API去消费资料及自动更新offset。Direct模式
1. Direct模式下的执行架构
与receiver模式类似,不同在于executor中没有receiver元件,从kafka拉去资料的方式不同。
2. Direct从kafka拉取资料的过程

该模式下:
没有receiver,无需额外的core用于不停地接收资料,而是定期查询kafka中的每个partition的最新的offset,每个批次拉取上次处理的offset和当前查询的offset的范围的资料进行处理;为了不丢资料,无需将资料备份落地,而只需要手动储存offset即可;内部使用kafka simple Level API去消费资料, 需要手动维护offset,kafka zk上不会自动更新offset。Receiver与Direct模式的区别
前者在executor中有Receiver接受资料,并且1个Receiver占用一个core;而后者无Receiver,所以不会暂用core;前者InputDStream的分割槽是 num_receiver *batchInterval/blockInteral,后者的分割槽数是kafka topic partition的数量。Receiver模式下num_receiver的设定不合理会影响效能或造成资源浪费;如果设定太小,并行度不够,整个链路上接收资料将是瓶颈;如果设定太多,则会浪费资源;前者使用zookeeper来维护consumer的偏移量,而后者需要自己维护偏移量;为了保证不丢失资料,前者需要开启WAL机制,而后者不需要,只需要在程式中成功消费完资料后再更新偏移量即可。
3 Receiver改造成Direct模式
个推使用Spark Streaming做实时处理kafka资料,先前使用的是receiver模式;receiver有以下特点:
receiver模式下,每个receiver需要单独占用一个core;为了保证不丢失资料,需要开启WAL机制,使用checkpoint储存状态;当receiver接受资料速率大于处理资料速率,导致资料积压,最终可能会导致程式挂掉。由于以上特点,receiver模式下会造成一定的资源浪费;使用checkpoint储存状态, 如果需要升级程式,则会导致checkpoint无法使用;第3点receiver模式下会导致程式不太稳定;并且如果设定receiver数量不合理也会造成效能瓶颈在receiver。为了优化资源和程式稳定性,应将receiver模式改造成direct模式。
修改方式如下:
1. 修改InputDStream的建立
将receiver的:
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
改成direct的:
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
2. 手动维护offset
receiver模式程式码:
(receiver模式不需要手动维护offset,而是内部通过kafka consumer high level API 提交到kafka/zk储存)
kafkaStream.map {
...
}.foreachRDD { rdd =>
// 资料处理
doCompute(rdd)
}
direct模式程式码:
directKafkaStream.map {
...
}.foreachRDD { rdd =>
// 获取当前rdd资料对应的offset
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 资料处理
doCompute(rdd)
// 自己实现储存offset
commitOffsets(offsetRanges)
}
4 其他优化点
1. 在receiver模式下:拆分InputDStream,增加Receiver,从而增加接收资料的并行度;调整blockInterval,适当减小,增加task数量,从而增加并行度(在core的数量>task数量的情况下);如果开启了WAL机制,资料的储存级别设定为MOMERY_AND_DISK_SER。资料序列化使用Kryoserializationl,相比Java serializationl 更快,序列化后的资料更小;建议使用CMS垃圾回收器降低GC开销;选择高效能的算子(mapPartitions, foreachPartitions, aggregateByKey等);repartition的使用:在streaming程式中因为batch时间特别短,所以资料量一般较小,所以repartition的时间短,可以解决一些因为topicpartition中资料分配不均匀导致的资料倾斜问题;因为SparkStreaming生产的job最终都是在sparkcore上执行的,所以sparkCore的优化也很重要;BackPressure流控为什么引入Backpressure?当batch processing time>batchinterval 这种情况持续过长的时间,会造成资料在内存中堆积,导致Receiver所在Executor内存溢位等问题;Backpressure:根据JobScheduler反馈作业的执行资讯来动态调整资料接收率;配置使用:spark.streaming.backpressure.enabled
含义: 是否启用 SparkStreaming内部的backpressure机制,
预设值:false ,表示禁用
spark.streaming.backpressure.initialRate
含义: receiver 为第一个batch接收资料时的比率
spark.streaming.receiver.maxRate
含义: receiver接收资料的最大比率,如果设定值spark.streaming.kafka.maxRatePerPartition
含义: 从每个kafka partition中读取资料的最大比率
8. speculation机制
spark内建speculation机制,推测job中的执行特别慢的task,将这些task kill,并重新排程这些task执行。 预设speculation机制是关闭的,通过以下配置引数开启:
spark.speculation=true
注意:在有些情况下,开启speculation反而效果不好,比如:streaming程式消费多个topic时,从kafka读取资料直接处理,没有重新分割槽,这时如果多个topic的partition的资料量相差较大那么可能会导致正常执行更大资料量的task会被认为执行缓慢,而被中途kill掉,这种情况下可能导致batch的处理时间反而变长;可以通过repartition来解决这个问题,但是要衡量repartition的时间;而在streaming程式中因为batch时间特别短,所以资料量一般较小,所以repartition的时间短,不像spark_batch一次处理大量资料一旦repartition则会特别久,所以最终还是要根据具体情况测试来决定。





























