突发!员工跳楼!只拿低保工资!央企设计院集体罢工!

突发!北京某院集体罢工!

淄博向东,惠泊向西:在人民与人民币之间,惠泊停车选择了人民币

【少儿禁】马建《亮出你的舌苔或空空荡荡》

10部适合女性看的唯美情色电影

生成图片,分享到微信朋友圈

自由微信安卓APP发布,立即下载! | 提交文章网址
查看原文

一文入门流处理开发

数据社 数据社 2022-07-01

一、Flink介绍

Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。

详细介绍可以参考之前的《flink简介》。

二、部署环境

操作系统环境:

flink支持Linux, Mac OS X, 和 Windows环境部署,本次部署选择Linux环境部署。

JDK:要求Java 7或者更高

三、下载软件

  • jdk1.8.0_144

  • flink-1.4.2-bin-hadoop26-scala_2.11.tgz

四、部署步骤

1、JDK安装步骤此处省略,安装后验证下JDK环境

  1. $ java -version

  2. openjdk version "1.8.0_144"

  3. OpenJDKRuntimeEnvironment(build 1.8.0_144-b01)

  4. OpenJDK64-BitServer VM (build 25.144-b01, mixed mode)

2、安装部署flink 本文介绍flink部署分为两种模式:local,standalone。下面依次介绍这两种模式的部署方式。

找到下载的flink压缩包,进行解压

  1. $ tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz

首先是local模式,最为简单。

  1. $ cd flink-1.4.2

  2. $ bin/start-local.sh

  3. Starting job manager

我们可以通过查看日志确认是否启动成功

  1. $ tailf flink-csap-taskmanager-0-XXXX.log

  2. 2018-05-0310:07:53,718 INFO org.apache.flink.runtime.filecache.FileCache- User file cache uses directory /tmp/flink-dist-cache-4c371de9-0f85-4889-b4d9-4a522641549c

  3. 2018-05-0310:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager- StartingTaskManager actor at akka://flink/user/taskmanager#-524742300.

  4. 2018-05-0310:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager- TaskManager data connection information: 2c358d6f38949f9aae31c5bddb0cc1dc@ LY1F-R021707-VM14.local(dataPort=55234)

  5. 2018-05-0310:07:53,726 INFO org.apache.flink.runtime.taskmanager.TaskManager- TaskManager has 1 task slot(s).

  6. 2018-05-0310:07:53,727 INFO org.apache.flink.runtime.taskmanager.TaskManager- Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]

  7. 2018-05-0310:07:53,730 INFO org.apache.flink.runtime.taskmanager.TaskManager- Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)

  8. 2018-05-0310:07:53,848 INFO org.apache.flink.runtime.taskmanager.TaskManager- Successful registration at JobManager(akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.

  9. 2018-05-0310:07:53,851 INFO org.apache.flink.runtime.taskmanager.TaskManager- Determined BLOB server address to be localhost/127.0.0.1:52382.Starting BLOB cache.

  10. 2018-05-0310:07:53,858 INFO org.apache.flink.runtime.blob.PermanentBlobCache- Created BLOB cache storage directory /tmp/blobStore-c07b9e80-41f0-490f-8126-7008144c4b0b

  11. 2018-05-0310:07:53,861 INFO org.apache.flink.runtime.blob.TransientBlobCache- Created BLOB cache storage directory /tmp/blobStore-e0d1b687-1c47-41c4-b5bc-10ceaa39e778

JobManager进程将会在8081端口上启动一个WEB页面,我们可以通过浏览器到hostname:8081中查看相关的信息。可以打开页面查看到相关信息,说明local模式部署是没问题的。

下面来看一下standlone部署方式。

安装JDK,解压压缩包,都是一样的。不一样的是我们要修改解压后的flink配置文件。然后在集群主机间做免密,免密操作方法。

修改conf/flink-conf.yaml,我们将jobmanager.rpc.address的值设置成你master节点的IP地址。此外,我们通过jobmanager.heap.mb和taskmanager.heap.mb配置参数来设置每个节点的JVM能够分配的最大内存。从配置参数名字可以看出,这个参数的单位是MB,如果某些节点拥有比你之前设置的值更多的内存时,我们可以在那个节通过FLINKTMHEAP参数类覆盖值钱的设置。

我们需要把所有将要作为worker节点的IP地址存放在conf/slaves文件中,在conf/slaves文件中,每个IP地址必须放在一行,如下:

  1. 192.168.0.100

  2. 192.168.0.101

  3. .

  4. .

  5. .

  6. 192.168.0.150

然后将修改好的flink包整理复制到集群各个节点。每个节点flink路径保持一致。然后启动集群

  1. $ bin/start-cluster.sh

查看日志是否成功。

以上是部署方法,部署成功后,我们来跑一个demo程序,验证一下Flink的流处理功能,对其有个初步的了解。

flink为了更好的让大家理解,已经给大家提供了一些demo代码,demo的jar包可以在/examples/streaming首先看一下demo代码:

  1. objectSocketWindowWordCount{


  2. def main(args: Array[String]) : Unit= {


  3. // the port to connect to

  4. val port: Int= try{

  5. ParameterTool.fromArgs(args).getInt("port")

  6. } catch{

  7. case e: Exception=> {

  8. System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")

  9. return

  10. }

  11. }


  12. // get the execution environment

  13. val env: StreamExecutionEnvironment= StreamExecutionEnvironment.getExecutionEnvironment


  14. // get input data by connecting to the socket

  15. val text = env.socketTextStream("localhost", port, '\n')


  16. // parse the data, group it, window it, and aggregate the counts

  17. val windowCounts = text

  18. .flatMap { w => w.split("\\s") }

  19. .map { w => WordWithCount(w, 1) }

  20. .keyBy("word")

  21. .timeWindow(Time.seconds(5), Time.seconds(1))

  22. .sum("count")


  23. // print the results with a single thread, rather than in parallel

  24. windowCounts.print().setParallelism(1)


  25. env.execute("Socket Window WordCount")

  26. }


  27. // Data type for words with count

  28. caseclassWordWithCount(word: String, count: Long)

  29. }

这个demo是监控端口,然后对端口输入单子进行wordcount的程序。

运行demo,首先打开一个窗口进行端口数据输入:

  1. $ nc -l 9001

  2. hello

  3. hello

  4. word

  5. world

然后运行demo监控端口单词输入统计:

  1. $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001

运行后可以看到结果统计:

  1. $ more flink-csap-taskmanager-0-XXX.out.1

  2. hello : 1

  3. hello : 1

  4. word : 1

  5. world : 1


五、IEDA开发环境搭建

1、安装java环境

此处略去,这个你已经会了~

2、安装maven

参考Maven安装与配置(https://www.jianshu.com/p/183d4f229103)

3、配置IDEA

参考如何使用IntelliJ IDEA 配置Maven(https://www.jianshu.com/p/467bd544998d)

4、pom文件设置

  1. </properties>

  2. <dependencies>

  3. <dependency>

  4. <groupId>org.scala-lang</groupId>

  5. <artifactId>scala-library</artifactId>

  6. <version>${scala.version}</version>

  7. </dependency>

  8. <dependency>

  9. <groupId>org.apache.flink</groupId>

  10. <artifactId>flink-java</artifactId>

  11. <version>${flink.version}</version>

  12. </dependency>

  13. <dependency>

  14. <groupId>org.apache.flink</groupId>

  15. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

  16. <version>${flink.version}</version>

  17. </dependency>

  18. <dependency>

  19. <groupId>org.apache.flink</groupId>

  20. <artifactId>flink-scala_${scala.binary.version}</artifactId>

  21. <version>${flink.version}</version>

  22. </dependency>

  23. <dependency>

  24. <groupId>org.apache.flink</groupId>

  25. <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>

  26. <version>${flink.version}</version>

  27. </dependency>

  28. <dependency>

  29. <groupId>org.apache.flink</groupId>

  30. <artifactId>flink-table_${scala.binary.version}</artifactId>

  31. <version>${flink.version}</version>

  32. </dependency>

  33. <dependency>

  34. <groupId>org.apache.flink</groupId>

  35. <artifactId>flink-clients_${scala.binary.version}</artifactId>

  36. <version>${flink.version}</version>

  37. </dependency>

  38. <dependency>

  39. <groupId>org.apache.flink</groupId>

  40. <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>

  41. <version>${flink.version}</version>

  42. </dependency>

  43. <dependency>


5、代码示例

  1. import org.apache.flink.api.common.functions.FlatMapFunction;

  2. import org.apache.flink.api.java.utils.ParameterTool;

  3. import org.apache.flink.streaming.api.datastream.DataStream;

  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;

  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  6. import org.apache.flink.streaming.api.windowing.time.Time;

  7. import org.apache.flink.util.Collector;


  8. /**

  9. * Author: qincf

  10. * Date: 2018/11/02

  11. * Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来

  12. * 先在目标主机1.1.1.1机器上执行nc -l 9000

  13. */

  14. publicclassStreamingWindowWordCount{

  15. publicstaticvoid main(String[] args) throwsException{

  16. //定义socket的端口号

  17. int port;

  18. try{

  19. ParameterTool parameterTool = ParameterTool.fromArgs(args);

  20. port = parameterTool.getInt("port");

  21. }catch(Exception e){

  22. System.err.println("没有指定port参数,使用默认值9000");

  23. port = 9000;

  24. }

  25. //获取运行环境

  26. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  27. //连接socket获取输入的数据

  28. DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");

  29. //计算数据

  30. DataStream<WordWithCount> windowCount = text.flatMap(newFlatMapFunction<String, WordWithCount>() {

  31. publicvoid flatMap(String value, Collector<WordWithCount> out) throwsException{

  32. String[] splits = value.split("\\s");

  33. for(String word:splits) {

  34. out.collect(newWordWithCount(word,1L));

  35. }

  36. }

  37. })//打平操作,把每行的单词转为<word,count>类型的数据

  38. //针对相同的word数据进行分组

  39. .keyBy("word")

  40. //指定计算数据的窗口大小和滑动窗口大小

  41. .timeWindow(Time.seconds(2),Time.seconds(1))

  42. .sum("count");

  43. //获取可视化JSON

  44. System.out.println(env.getExecutionPlan());

  45. //把数据打印到控制台,使用一个并行度

  46. windowCount.print().setParallelism(1);

  47. //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行

  48. env.execute("streaming word count");




  49. }


  50. /**

  51. * 主要为了存储单词以及单词出现的次数

  52. */

  53. publicstaticclassWordWithCount{

  54. publicString word;

  55. publiclong count;

  56. publicWordWithCount(){}

  57. publicWordWithCount(String word, long count) {

  58. this.word = word;

  59. this.count = count;

  60. }


  61. @Override

  62. publicString toString() {

  63. return"WordWithCount{"+

  64. "word='"+ word + '\''+

  65. ", count="+ count +

  66. '}';

  67. }

  68. }


  69. }

6、测试步骤

首先在1.1.1.1机器上使用nc命令模拟数据发送

  1. nc -l 9000

然后在IEDA中运营StreamingWindowWordCount程序 在主机上输入字符

  1. [root@data01]# nc -l 9000

  2. a

  3. a

  4. b

  5. c

  6. d

  7. d

此时运行程序后,IDEA中会打印处结果

  1. WordWithCount{word='a', count=1}

  2. WordWithCount{word='a', count=2}

  3. WordWithCount{word='b', count=1}

  4. WordWithCount{word='d', count=1}

  5. WordWithCount{word='c', count=1}

  6. WordWithCount{word='c', count=1}

  7. WordWithCount{word='a', count=1}

  8. WordWithCount{word='d', count=1}

  9. WordWithCount{word='b', count=1}

大家会看到,wordcount的结果。仔细看还有一串json输出,这部分是什么呢?代码中加了一个打印执行计划的部分:

  1. /获取可视化JSON

  2. System.out.println(env.getExecutionPlan());

Flink提供了一个可视化执行计划的结果,类似Spark的DAG图,把json粘贴到Flink Plan Visualizer可以看到执行计划图: 


完整代码demo代码请关注公众号,回复【flink资料】获取更多Flink介绍。

现在你已经搭建好Flink开发环境了,可以开启你的流处理旅程了,更多教程可以参考Flink官网。


历史好文推荐
  1. Flink简介

  2. 数据湖vs数据仓库vs数据集市

  3. 一文了解数据湖引擎

  4. kafka实战宝典:手动修改消费偏移量的两种方式

  5. Kafka实战宝典:如何跨机房传输数据

文章有问题?点此查看未经处理的缓存