【编者按】作者Michael G. Noll是的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中。Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略:通过Spark Contributor、Spark布道者陈超我们了解到,在Spark 1.2版本中,Spark Streaming开始支持fully HA模式(选择使用),通过添加一层WAL(Write Ahead Log),每次收到数据后都会存在HDFS上,从而避免了以前版本中的数据丢失情况,但是不可避免的造成了一定的开销,需要开发者自行衡量。
作为一个实时大数据处理工具,Spark Sreaming近日一直被广泛关注,与Apache Storm的对比也经常出现。但是依我说,缺少与Kafka整合,任何实时大数据处理工具都是不完整的,因此我将一个示例Spark Streaming应用程序添加到kafka-storm-starter,并且示范如何从Kafka读取,以及如何写入到Kafka。在这个过程中,我还使用Avro作为数据格式,以及Twitter Bijection进行数据序列化。
在本篇文章,我将详细地这个Spark Streaming示例;同时,我还会穿插当下Spark Streaming与Kafka整合的一些焦点话题。免责声明:这是我首次试验Spark Streaming,仅作为参考。
这里,我也提供了一个非常简短的对比:对比Spark Streaming,Storm的产业采用更高,生产应用也更稳定。但是从另一方面来说,对比Storm,Spark拥有更清晰、等级更高的API,因此Spark使用起来也更加愉快,最起码是在使用Scala编写Spark应用程序的情况(毫无疑问,我更喜欢Spark中的API)。但是,请别这么直接的相信我的话,多看看的和讲义。
不管是Spark还是Storm,它们都是Apache的项目,当下许多大数据平台提供商也已经开始整合这两个框架(或者其中一个)到其商业产品中,比如Hortonworks就同时整合了Spark和Storm,而Cloudera也整合了Spark。
本文的后续部分将讲述许多Spark和Kafka中的parallelism问题,因此,你需要掌握一些Spark中的术语以弄懂这些环节。
一个Spark集群必然包含了1个以上的工者作节点,又称为从主机(为了简化架构,这里我们先抛弃开集群管理者不谈)。
Executor是一个用于应用程序或者工作者节点的进程,它们负责处理tasks,并将数据保存到内存或者磁盘中。每个应用程序都有属于自己的executors,一个executor则包含了一定数量的cores(也被称为slots)来运行分配给它的任务。
Task是一个工作单元,它将被传送给executor。也就是说,task将是你应用程序的计算内容(或者是一部分)。SparkContext将把这些tasks发送到executors进行执行。每个task都会占用父executor中的一个core(slot)。
,文档)将作为一个长期运行的task跑在一个executor上。每个receiver都会负责一个所谓的input DStream(比如从Kafka中读取的一个输入流),同时每个receiver( input DStream)占用一个core/slot。
input DStream:input DStream是DStream的一个类型,它负责将Spark Streaming连接到外部的数据源,用于读取数据。对于每个外部数据源(比如Kafka)你都需要配置一个input DStream。一个Spark Streaming会通过一个input DStream与一个外部数据源进行连接,任何后续的DStream都会建立标准的DStreams。
在Spark的执行模型,每个应用程序都会获得自己的executors,它们会支撑应用程序的整个流程,并以多线个以上的tasks,这种隔离途径非常类似Storm的执行模型。一旦引入类似YARN或者Mesos这样的集群管理器,整个架构将会变得异常复杂,因此这里将不会引入。你可以通过Spark文档中的Cluster Overview了解更多细节。
Spark代码库中的KafkaWordCount对于我们来说是个非常好的起点,但是这里仍然存在一些式问题。
在完成这些操作时,我同样碰到了Spark Streaming和/或Kafka中一些已知的问题,这些问题大部分都已经在Spark mailing list中列出。在下面,我将详细总结Kafka集成到Spark的现状以及一些常见问题。
Kafka将数据存储在话题中,每个话题都包含了一些可配置数量的分区。话题的分区数量对于性能来说非常重要,而这个值一般是消费者parallelism的最大数量:如果一个话题拥有N个分区,那么你的应用程序最大程度上只能进行N个线程的并行,最起码在使用Kafka内置Scala/Java消费者API时是这样的。
与其说应用程序,不如说Kafka术语中的消费者群(consumer group)。消费者群,通过你选择的字符串识别,它是逻辑消费者应用程序集群范围的识别符。同一个消费者群中的所有消费者将分担从一个指定Kafka话题中的读取任务,同时,同一个消费组中所有消费者从话题中读取的线程数最大值即是N(等同于分区的数量),多余的线程将会闲置。
多个不同的Kafka消费者群可以并行的运行:毫无疑问,对同一个Kafka话题,你可以运行多个的逻辑消费者应用程序。这里,每个逻辑应用程序都会运行自己的消费者线程,使用一个唯一的消费者群id。而每个应用程序通常可以使用不同的read parallelisms(见下文)。当在下文我描述不同的方式配置read parallelisms时,我指的是如何完成这些逻辑消费者应用程序中的一个设置。
这里我们不妨看一下现实应用中的复杂性——Kafka中的再平衡事件。在Kafka中,再平衡是个生命周期事件(lifecycle event),在消费者加入或者离开消费者群时都会触发再平衡事件。这里我们不会进行详述,更多再平衡详情可参见我的Kafka training deck一文。
你的应用程序使用消费者群id“terran”,并且从1个线程开始,这个线个分区中进行读取。在运行时,你逐渐将线个。也就是说,在同一个消费者群中,parallelism突然发生了变化。毫无疑问,这将造成Kafka中的再平衡。一旦在平衡结束,你的14个线个线个分区的读取工作,剩余的4个将会被闲置。因此如你想象的一样,初始线程以后只会读取一个分区中的内容,将不会再读取其他分区中的数据。
现在,我们终于对话题、分区有了一定的理解,而分区的数量将作为从Kafka读取时parallelism的上限。但是对于一个应用程序来说,这种机制会产生一个什么样的影响,比如一个Spark Streaming job或者 Storm topology从Kafka中读取数据作为输入。
1. Read parallelism:通常情况下,你期望使用N个线程并行读取Kafka话题中的N个分区。同时,鉴于数据的体积,你期望这些线程跨不同的NIC,也就是跨不同的主机。在Storm中,这可以通过TopologyBuilder#setSpout()设置Kafka spout的parallelism为N来实现。在Spark中,你则需要做更多的事情,在下文我将详述如何实现这一点。
2. Downstream processing parallelism:一旦使用Kafka,你希望对数据进行并行处理。鉴于你的用例,这种等级的parallelism必然与read parallelism有所区别。如果你的用例是计算密集型的,举个例子,对比读取线程,你期望拥有更多的处理线程;这可以通过从多个读取线程shuffling或者网“nning out”数据到处理线程实现。因此,你通过增长网络通信、序列化开销等将访问交付给更多的cores。在Storm中,你通过shuffle grouping将Kafka spout shuffling到下游的bolt中。在Spark中,你需要通过DStreams上的repartition转换来实现。
通常情况下,大家都渴望去耦从Kafka的parallelisms读取,并立即处理读取来的数据。在下一节,我将详述使用 Spark Streaming从Kafka中的读取和写入。
2. Input DStreams上的消费者线程数量。这里,相同的receiver(=task)将运行多个读取线程。这也就是说,读取操作在每个core/machine/NIC上将并行的进行。
为什么会这样?首先以及最重要的,从Kafka中读取通常情况下会受到网络/NIC,也就是说,在同一个主机上你运行多个线程不会增加读的吞吐量。另一方面来讲,虽然不经常,但是有时候从Kafka中读取也会CPU瓶颈。其次,如果你选择第二个选项,多个读取线程在将数据推送到blocks时会出现锁竞争(在block生产者实例上,BlockGenerator的“+=”方法真正使用的是“synchronized”方式)。
在这个例子中,我没有提到每个input DSream会建立多少个线程。在这里,线程的数量可以通过KafkaUtils.createStream方法的参数设置(同时,input topic的数量也可以通过这个方法的参数指定)。在下一节中,我们将通过实际操作展示。
但是在开始之前,在这个步骤我先解释几个Spark Streaming中常见的几个问题,其中有些因为当下Spark中存在的一些引起,另一方面则是由于当下Kafka input DSreams的一些设置造成:
然后,你将会碰到另一个坑——如果你的receiver宕机(OOM,亦或是硬件故障),你将停止从Kafka接收消息。
这里,我们需要对“停止从Kafka中接收”问题做一些解释。当下,当你通过ssc.start()你的streams应用程序后,处理会开始并一直进行,即使是输入数据源(比如Kafka)变得不可用。也就是说,流不能检测出是否与上游数据源失去链接,因此也不会对丢失做出任何反应,举个例子来说也就是重连或者结束执行。类似的,如果你丢失这个数据源的一个receiver,那么你的流应用程序可能就会生成一些空的RDDs。
这是一个非常糟糕的情况。最简单也是最粗糙的方法就是,在与上游数据源断开连接或者一个receiver失败时,重启你的流应用程序。但是,这种解决方案可能并不会产生实际效果,即使你的应用程序需要将Kafka配置选项auto.offset.reset设置到最小——因为Spark Streaming中一些已知的bug,可能导致你的流应用程序发生一些你意想不到的问题,在下文Spark Streaming中常见问题一节我们将详细的进行介绍。
KafkaUtils.createStream方法被重载,因此这里有一些不同方法的特征。在这里,我们会选择Scala派生以获得最佳的控制。
我们建立了5个input DStreams,它们每个都会运行一个消费者线程。如果“zerg.hydra”topic拥有5个分区(或者更少),那么这将是进行并行读取的最佳途径,如果你在意系统最大吞吐量的话。
在之前的章节中,我们覆盖了从Kafka的并行化读取,那么我们就可以在Spark中进行并行化处理。那么这里,你必须弄清楚Spark本身是如何进行并行化处理的。类似Kafka,Spark将parallelism设置的与(RDD)分区数量有关,通过在每个RDD分区上运行task进行。在有些文档中,分区仍然被称为“slices”。
1. input DStreams的数量,也就是说,我们在之前章节中read parallelism的数量作为结果。这是我们的立足点,这样一来,我们在下一个步骤中既可以保持原样,也可以进行修改。
一个DStream转换相关是union。这个方法同样在StreamingContext中,它将从多个DStream中返回一个统一的DStream,它将拥有相同的类型和滑动时间。通常情况下,你更愿意用StreamingContext的派生。一个union将返回一个由Union RDD支撑的UnionDStream。Union RDD由RDDs统一后的所有分区组成,也就是说,如果10个分区都联合了3个RDDs,那么你的联合RDD实例将包含30个分区。换句话说,union会将多个 DStreams压缩到一个 DStreams或者RDD中,但是需要注意的是,这里的parallelism并不会发生改变。你是否使用union依赖于你的用例是否需要从所有Kafka分区进行“in one place”信息获取决定,因此这里大部分都是基于语义需求决定。举个例子,当你需要执行一个不用元素上的(全局)计数。
你的用例将决定需要使用的方法,以及你需要使用哪个。如果你的用例是CPU密集型的,你希望对zerg.hydra topic进行5 read parallelism读取。也就是说,每个消费者进程使用5个receiver,但是却可以将processing parallelism提升到20。
通用的输出操作者都包含了一个功能(函数),让每个RDD都由Stream生成。这个函数需要将每个RDD中的数据推送到一个外部系统,比如将RDD保存到文件,或者通过网络将它写入到一个数据库。需要注意的是,这里的功能函数将在驱动中执行,同时其中通常会伴随RDD行为,它将会促使流RDDs的计算。
在这里,大家去阅读Spark文档中的Design Patterns for using foreachRDD一节,它将详细使用foreachRDD读外部系统中的一些常用推荐模式,以及经常出现的一些陷阱。
在我们这个例子里,我们将按照推荐来重用Kafka生产者实例,通过生产者池跨多个RDDs/batches。 我通过Apache Commons Pool实现了这样一个工具,已经上传到GitHub。这个生产者池本身通过broadcast variable提供给tasks。
需要注意的是, Spark Streaming每分钟都会建立多个RDDs,每个都会包含多个分区,因此你无需为Kafka生产者实例建立新的Kafka生产者,更不用说每个Kafka消息。的步骤将最小化Kafka生产者实例的建立数量,同时也会最小化TCP连接的数量(通常由Kafka集群确定)。你可以使用这个池设置来精确地控制对流应用程序可用的Kafka生产者实例数量。如果存在疑惑,尽量用更少的。
下面的代码是示例Spark Streaming应用程序的要旨(所有代码参见这里)。这里,我做一些解释:
就我自己而言,我非常喜欢 Spark Streaming代码的简洁和表述。在Bobby Evans和 Tom Graves讲话中没有提到的是,Storm中这个功能的等价代码常繁琐和低等级的:kafka-storm-starter中的KafkaStormSpec会运行一个Stormtopology来执行相同的计算。同时,规范文件本身只有非常少的代码,当然是除下说明语言,它们能更好的帮助理解;同时,需要注意的是,在Storm的Java API中,你不能使用上文Spark Streaming 示例中所使用的匿名函数,比如map和foreach步骤。取而代之的是,你必须编写完整的类来获得相同的功能,你可以查看AvroDecoderBolt。这感觉是将Spark的API转换到Java,在这里使用匿名函数常痛苦的。
最后,我同样也非常喜欢Spark的说档,它非常适合初学者查看,甚至还包含了一些进阶使用。关于Kafka整合到Spark,上文已经基本介绍完成,但是我们仍然需要浏览mailing list和深挖源代码。这里,我不得不说,帮助文档的同学做的实在是太棒了。
另一方面,Spark Streaming中一些问题是因为Spark本身的固有问题导致,特别是故障发生时的数据丢失问题。换句话说,这些问题让你不想在生产中使用Spark。
在Spark 1.1版本的驱动中,Spark并不会考虑那些已经接收却因为种种原因没有进行处理的元数据(
)。因此,在某些情况下,你的Spark可能会丢失数据。Tathagata Das指出驱动恢复问题会在Spark的1.2版本中解决,当下已经。
1.1版本中的Kafka连接器是基于Kafka的高等级消费者API。这样就会造成一个问题,Spark Streaming不可以依赖其自身的KafkaInputDStream将数据从Kafka中重新发送,从而无决下游数据丢失问题(比如Spark服务器发生故障)。
有些人甚至认为这个版本中的Kafka连接器不应该投入生产使用,因为它是基于Kafka的高等级消费者API。取而代之,Spark应该使用简单的消费者API(就像Storm中的Kafka spout),它将允许你控制便宜和分区分配确定性。
但是当下Spark社区已经在致力这些方面的改善,比如Dibyendu Bhattacharya的Kafka连接器。后者是Apache Storm Kafka spout的一个端口,它基于Kafka所谓的简单消费者API,它包含了故障发生情景下一个更好的重放机制。
即使拥有如此多志愿者的努力,Spark团队更愿意非特殊情况下的Kafka故障恢复策略,他们的目标是“在所有转换中提供强,通用的策略”,这一点非常难以理解。从另一个角度来说,这是浪费Kafka本身的故障恢复策略。这里确实难以抉择。
Spark的Kafka消费者参数auto.offset.reset的使用同样与Kafka的策略不同。在Kafka中,将auto.offset.reset设置为最小是消费者将自动的将offset设置为最小offset,这通常会发生在两个情况:第一,在ZooKeeper中不存在已有offsets;第二,已存在offset,但是不在范围内。而在Spark中,它会始终删除所有的offsets,并从头开始。这样就代表着,当你使用auto.offset.reset = smallest重启你的应用程序时,你的应用程序将完全重新处理你的Kafka应用程序。更多详情可以在下面的两个讨论中发现:12。
Spark-1341:用于控制Spark Streaming中的数据传输速度。这个能力可以用于很多情况,当你已Kafka引起问题所烦恼时(比如auto.offset.reset所造成的),然后可能让你的应用程序重新处理一些旧数据。但是鉴于这里并没有内置的传输速率控制,这个功能可能会导致你的应用程序过载或者内存不足。
在这些故障处理策略和Kafka聚焦的问题之外之外,扩展性和稳定性上的关注同样不可忽视。再一次,仔细观看Bobby和Tom的视频以获得更多细节。在Spark使用经验上,他们都永远比我更丰富。
当然,我也有我的评论,在 G1 garbage(在Java 1.7.0u4+中) 上可能也会存在问题。但是,我从来都没碰到这个问题。
在我实现这个示例的代码时,我做了一些重要的笔记。虽然这不是一个全面的指南,但是在你开始Kafka整合时可能发挥一定的作用。它包含了Spark Streaming programming guide中的一些信息,也有一些是来自Spark用户的mailing list。
当你建立你的Spark时,对Spark使用的cores数量配置需要特别投入精力。你必须为Spark配置receiver足够使用的cores(见下文),当然实际数据处理所需要的cores的数量也要进行配置。在Spark中,每个receiver都负责一个input DStream。同时,每个receiver(以及每个input DStream) occies一个core,这样做是服务于每个文件流中的读取(详见文档)。举个例子,你的作业需要从两个input streams中读取数据,但是只访问两个cores,这样一来,所有数据都只会被读取而不会被处理。
你可以使用 broadcast variables在不同主机上共享标准、只读参数,相关细节见下文的优化指导。在示例作业中,我使用了broadcast variables共享了两个参数:第一,Kafka生产者池(作业通过它将输出写入Kafka);第二,encoding/decoding Avro数据的注入(从Twitter Bijection中)。Passing functions to Spark。
你可以使用累加器参数来流作业上的所有全局“计数器”,这里可以对照Hadoop作业计数器。在示例作业中,我使用累加器分别计数所有消费的Kafka消息,以及所有对Kafka的写入。如果你对累加器进行命名,它们同样可以在Spark UI上展示。
确定你理解作业中的运行时影响,如果你需要与外部系统通信,比如Kafka。在使用foreachRDD时,你应该阅读中
中的Design Patterns一节。举个例子,我的用例中使用Kafka产生者池来优化 Spark Streaming到Kafka的写入。在这里,优化意味着在多个task享同一个生产者,这个操作可以显著地减少由Kafka集群建立的新TCP连接数。
。我的例子就使用了Kryo和注册器,举个例子,使用Kryo生成的Avro-generated Java类(见
通过将spark.streaming.unpersist设置为true将Spark Streaming 作业设置到明确持续的RDDs。这可以显著地减少Spark在RDD上的内存使用,同时也可以改善GC行为。(点击访问
通过MEMORY_ONLY_SER开始你的储存级别P&S测试(在这里,RDD被存储到序列化对象,每个分区一个字节)。取代反序列化,这样做更有空间效率,特别是使用Kryo这样的高速序列化工具时,但是会增加读取上的CPU密集操作。这个优化对 Spark Streaming作业也非常有效。对于本地测试来说,你可能并不想使用*_2派生(2=复制因子)。
总体来说,我对我的初次Spark Streaming体验非常满意。当然,在Spark/Spark Streaming也存在一些需要特别指明的问题,但是我肯定Spark社区终将解决这些问题。在这个过程中,得到了Spark社区积极和热情的帮助,同时我也非常期待Spark 1.2版本的新特性。
在大型生产中,基于Spark还需要一些TLC才能达到Storm能力,这种情况我可能将它投入生产中么?大部分情况下应该不会,更准确的说应该是现在不会。那么在当下,我又会使用Spark Streaming做什么样的处理?这里有两个想法,我认为肯定存在更多:
它可以非常快的原型数据流。如果你因为数据流太大而扩展性问题,你可以运行 Spark Streaming,在一些样本数据或者一部分数据中。
搭配使用Storm和Spark Streaming。举个例子,你可以使用Storm将原始、大规模输入数据处理到易管理等级,然后使用Spark Streaming来做下一步的分析,因为后者可以开箱即用大量有趣的算法、计算指令和用例。
推荐:
网友评论 ()条 查看