查看原文
其他

一篇文带你快速起步Apache Storm(附教程+赠书)

2017-07-11 杜亦舒 DBAplus社群


作者介绍

杜亦舒创业中,技术合伙人,喜欢研究分享技术。个人订阅号:性能与架构。


本文介绍了 Apache Storm 的基本原理和开发方法,包括一个 PDF 和 2 个示例的源码。


内容大纲:

  1. Storm是什么

  2. 应用场景

  3. Storm与Hadoop的关系

  4. Storm怎么用

  5. 示例1:统计单词出现的次数

  6. 核心概念

  7. 集群架构

  8. 示例2:统计通话记录


1

Storm是什么


Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。


Storm 是流式框架,有很高的数据吞吐能力,Strom 本身是无状态的,通过 ZooKeeper 管理分布式集群环境和集群状态。


Strom 的安装和使用都很简单,但功能强大,可以并行地对实时数据流进行各种处理。


2

应用场景


应用 Storm 的场景例如:


  • 日志处理


监控系统中的事件日志,使用 Storm 检查每条日志信息,把符合匹配规则的消息保存到数据库。


  • 电商商品推荐


后台需要维护每个用户的兴趣点,主要基于用户的历史行为、查询、点击、地理信息等信息获得,其中有很多实时数据,可以使用 Storm 进行处理,在此基础上进行精准的商品推荐和放置广告。


3

Storm与Hadoop的关系


Storm 与 Hadoop 都用来处理大数据,那么它们的关系是怎样的呢?


Hadoop 是强大的大数据处理系统,但是在实时计算方面不够擅长;Storm的核心功能就是提供强大的实时处理能力,但没有涉及存储;所以 Storm 与 Hadoop 即不同也互补。


它们的最主要的区别例如:


  1. Storm 是实时流处理模式,Hadoop 是批处理模式;

  2. Storm 就像一条川流不息的河流,只要不是意外或者人为停止,它就会一直运行,Hadoop 是在需要时执行 MapReduce 任务,执行完成后停止;

  3. 在处理时间上,Storm 每秒可以处理数万条消息,HDFS+MapReduce 处理大量数据时通常需要几分钟到几小时。


4

Storm 怎么用


Storm 非常简洁,为了便于理解,先看 2 个最核心的概念:


  1. 数据源头 Spout

  2. 数据处理单元 Bolt


先把 Spout 与外部数据进行对接,这样就可以把外面的数据量引到 Storm 中了。Spout 接收到数据后就会发给 Bolt,这就需要告诉 Bolt 如何处理,处理完成后把数据放到哪儿,例如数据库。


这就是一个最简单的模型:



而其中 Bolt 可以有多个,这就使得 Storm 强大而灵活。



可以看出 Storm 就是一个有拓扑图,点是 Spout 或者 Bolt,边是数据流向。



所以,使用 Storm 时需要做的就是把整个拓扑图构造出来:


  1. 定义数据从哪儿来

  2. 定义数据流向和处理单元的逻辑

  3. 定义数据到哪儿去


5

示例1 统计单词出现的次数


以大数据中的helloworld “词频统计” 为例来学习 Storm 的开发方法。


1、实现思路


构造拓扑结构:


  • Spout 读取一行文本,向下游发送;

  • 把句子分割为单词的 Bolt,从 Spout 发出的数据中取得一行文本,然后分割为一个个的单词,把每个单词向下游发送;

  • 单词计数的 Bolt,接收分词 Bolt 发出的单词,对每个单词出现的次数进行统计,然后把单词及其次数作为一个数据单元向下游发送;

  • 结果输出的 Bolt,接收计数 Bolt 发出的数据单元,取出单词及其次数进行汇总,最后输出统计结果;

  • 创建拓扑对象,把 Spout 和各个 Bolt 连接起来,然后提交到 Storm 中执行。


2、代码实现


创建项目目录 storm-wordcount


项目目录下新建 maven 工程文件 pom.xml



项目根目录下执行 maven 命令,安装依赖

mvn package

mvn dependency:tree


项目根目录下创建源码目录 src/main/java

然后在 java 目录下创建包目录 com/storm/test


现在项目的目录结构如下:



具体代码如下:


SpoutWords.java



BoltSplit.java



BoltCount.java



BoltReport.java



WordCountTopology.java



编译运行



执行后会输出大量日志信息,之后会循环输出单词统计信息,例如:



6

核心概念


Spouts - 数据流的源头


Storm 从外部接收数据,例如 Twitter Streaming API、Kafka ……,通过 Spouts 从这些数据源读取数据。


Bolts - 逻辑处理单元


Spouts 向 Bolts 传递数据,Bolts 接收数据进行处理操作,然后把结果再发射出去。


Bolts 的常见操作例如:过滤、聚合、连接、与数据源数据库交互。


Tuple - 数据单元


Storm 数据流中的数据单元,例如水流中是一滴滴的水滴,Storm 数据流中流淌的就是一个个的 tuple,里面包含着数据。


Stream - 数据流


是一个无序的 Tuple 序列。


Topology - 拓扑


Spouts 和 Bolts 连接起来之后形成了 Topology,其中定义了整个应用的实时处理逻辑。


Topology 是一个有向图,定点是计算单元,边是数据流。


Topology 始于 Spout,Spout 向一个或者多个 Bolt 发射数据,Bolt 拥有处理逻辑,Bolt 的输出可以发射给其它 Bolt 作为它们的输入。Storm 会保持 Topology 一直运行,除非杀掉他。


Tasks - 任务


一个 Task 就是一个 Spout 的执行,或者一个 Bolt 的执行。


Workers - 工作进程


Worker 负责实际运行 Task,Topology 运行在一个分布式环境中的多个工作节点上,Storm 会把 Tasks 均匀的分布在所有 Worker 上。


每个 Worker 都是一个物理 JVM,执行着 Topology 中所有 Task 的一个子集。


Stream Grouping - 流分组


简单理解就是控制 Tuple 的路由,定义 Tuple 在 Topology 中如何流动。每个 Spout 或 Bolt 都会在集群中执行多个任务,每个任务都对应为一个线程的执行,Stream Grouping 定义的就是如何从一个 Task 集合 向其他 Task 集合 发送 tuple。


现在 Storm 中已经有 8 种分组策略,下面看下其中 4 种常用的:


(1)Shuffle Grouping


随机分配,可以让每个 Bolt 获得数量均衡的 tuple。



(2)Field Grouping


field 名字相同的 tuples 会被组织在一起,例如,如果根据 "user-id" 这个 field 分组,相同 "user-id" 的 tuples 将总是去向同一个 Task,其他的 tuples 则去向不同的 Task。



(3)Global Grouping


全局统一分组,所有数据流都流向同一个 Bolt。



(4)All Grouping


向每个实例都发送一次,主要用于发送信号。



回顾一下 wordcount 示例中使用的 grouping 方法



设置第一个 Bolt bolt-split 时使用的是 shuffleGrouping,因为 Spout 发送一行文字,给谁都行,不关心分组,所以使用 shuffleGrouping 随机即可。


设置第二个 Bolt bolt-count 时使用的是 fieldsGrouping,因为 bolt-split 是按单词发射的,所以需要让同一个单词被同一个Task处理,就要使用按字段分组方式。


设置第三个 Bolt bolt-report 时使用的是 globalGrouping 统一分组,因为到这儿就要汇总了,需要接收所有的统计结果。


7

集群架构


Storm cluster 中有2种类型节点:master node、worker nodes。


master node 中运行着一个守护进程,名为 Nimbus,负责向集群中分布代码、分配任务、监控失败状况。每个 worker node 中运行着一个守护进程,名为 Supervisor,负责接收工作任务,根据 Nimbus 的指令来启动或者停止工作进程。


每个工作进程执行一个拓扑的子集,一个拓扑包含多个工作进程,这些工作进程散布在集群里的多台机器中。



Nimbus 与 Supervisors 的协作是通过 ZooKeeper 集群 完成的,Nimbus 与 Supervisors 都是无状态的,状态信息保存在 ZooKeeper 或者本地磁盘中。


Nimbus 与 Supervisors 具有高可靠性,即使通过 kill -9 杀掉他们,也会快速重新启动起来,这使得 Storm Cluster 极其稳定。


上图是 Storm Cluster 的全景图,下面我们看一下细节:



概念总结:

  • Nimbus - Storm Cluster 的主节点,其他节点都是工作节点 worker node,主节点负责任务分配、监控失败等管理工作。

  • Supervisor - 负责接受 Nimbus 分配的任务,一个 Supervisor 会有多个工作进程 worker process,并管理这些工作进程来完成接收到的任务。

  • Worker process - 工作进程,一个工作进程会执行某个特定 Topology 的相关任务,它并不直接自己执行任务,而是创建executors 来执行,一个工作进程可以有多个 executor。

  • Executor - 执行器,就是工作进程创建的一个线程,一个执行器会执行一个或多个任务。

  • Task - 执行实际的数据处理,也就是一个 Spout 或 Bolt。


简单总结一下:

  1. 一个 Storm cluster 中有一个Nimbus 和多个Supervisor;

  2. 一个 Supervisor 下有多个 Worker process;

  3. 一个 Worker process 有多个 Executor;

  4. 一个 Executor 下有多个 Task。


8

示例2: 统计通话记录


1、需求描述


处理通话记录,统计相同呼叫人和被呼叫人的通话次数、通话总时长。


例如通话记录:

# 呼叫者号码,接收者号码,持续时长

1234123402, 1234123401,20

...


统计结果例如:

1234123402-1234123401 : 87,2523

1234123402-1234123404 : 95,2919

...


2、实现思路


构造拓扑图:


  • Spout 读取通话日志,发送给 Bolt,数据包括 from, to, duration

  • Bolt1 接收 Spout 的数据,重新组织数据格式为 from-to, duration

  • Bolt2 接收 Bolt1 的数据,对 from-to 相同的数据进行汇总,累计次数、总时长,在最后输出统计结果

  • Topology 中连接 Spout 与 Bolt1、Bolt2,并提交 Storm 执行


3、具体实现


(1)创建项目目录


在合适的位置创建项目目录 storm-mobile


(2)新建 pom.xml


项目根目录下新建maven工程文件 pom.xml,内容:



项目根目录下创建源码目录 src/main/java

然后在 java 目录下创建包目录 com/storm/test


现在项目的目录结构如下:



(3)安装依赖


项目根目录下执行maven命令

mvn package

mvn dependency:tree


(4)代码


下面是具体代码:


FakeCallLogReaderSpout.java



CallLogCreatorBolt.java



CallLogCounterBolt.java



LogAnalyserStorm.java



(5)编译运行



会输出大量的 log 信息,执行完成后,会在底部附近输出程序的执行结果,类似如下信息:



9

小结


Storm 开发的基本思路:

  • Spout 中与外部数据源对接,然后发送给内部的 Bolt

    Spout 中的主要方法 nextTuple() 被循环调用,在这里面处理数据的接收、发射

  • Bolt 中定义数据处理逻辑,对接收到的数据进行处理

    其主要方法 execute() 每次收到数据时被调用,在这里定义处理逻辑

  • Topology 中把 Spout 和 Bolt 串起来,定义好上下游关系,然后提交到 storm 执行


先把这个最简单的思路理解好,然后在此基础上进行扩充,学习更多的用法就简单了,希望本文可以帮助您快速认识 Storm。


教程下载:如需下载本文,可点击文末【阅读原文】或登录云盘 http://pan.baidu.com/s/1geHiLY7进行下载。


相关专题:


赠书来了

在本文微信订阅号(dbaplus)评论区留下足以引起共鸣的真知灼见,并在本文发布后的隔天中午12点成为点赞数最多的1名可获得以下新书一本~


特别鸣谢博文视点为活动提供图书赞助。


精选专题(官网:dbaplus.cn)

◆  近期热文  ◆  

从0到1,蘑菇街怎样打破应用运维自动化的技术藩篱?

一张思维导图学会如何构建高性能MySQL系统!

承载新美大3万台服务器的云计算基础运维

数据架构选型必读 (第4期) : 6月数据库产品技术解析

一张思维导图纵观MySQL数据安全体系!

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存