作者 | Jaskaran S. Puri
译者 | 火火酱 责编 | 徐威龙
封图| CSDN 下载于视觉中国
电子商务市场中典型的一天是这样的:每分钟发生1万个事件流,并且要选择合适的工具对其进行处理。本文将帮助你重新创建一个场景,其中有大量的数据流入,你不仅需要存储这些数据,还需要对其进行一些实时分析。这只是系统设计(System Design)的例子之一,你必须在其中开发高度可用且可扩展的数据管道。虽然在电子商务这样的用例中,可能有n个需要考虑的其他因素,但在本文中,我们会将其分成3个主要部分:简而言之,几乎所有系统设计都从这个角度进行分析的,同样,这也是最容易出问题的地方。
在介绍我们的工具之前,先退一步,看看这里要解决的是什么样的用例或问题。要想了解输入或提取层(Input or Ingestion layer),首先要先了解一下输出层(Output layer)。一般来讲,输出层可以通过两种方式实现:1. 批处理(Batch Processing):如果你只进行一次分析,或者只是要更新一下每日报告,又或者只是在团队中进行随机演示,那么你可以选择成批地提取数据。这意味着可能要从你的DB数据库中取出小部分数据转储,并对其进行分析。2. 实时处理(Real-Time Processing):也被称为流数据(Streaming Data),这种方式在进行十分重要的数据分析的情况下经常使用。这在B2C场景中最为常见。批处理的好处是,它减少了构建实时管道的开销,而且你永远不用处理完整的数据集。尽管这在B2C环境中并不适用,尤其是在电子商务环境中,你必须推荐新产品、跟踪用户行为或设计实时仪表板。现在,在了解了输出层的实时特性之后,我们就要选择相应的提取工具(Ingestion Tools)了。当然,要想从各种用例中获取数据,有很多工具可供选择。但是根据流行程度、社区实力和在各种用例中的实现情况来看,Kafka和Spark Streaming是很好的选择。(Kafka:https://kafka.apache.org/)(Spark Streaming:https://spark.apache.org/streaming/)同样,要重视了解业务需求,以便确定执行同一工作的几个不同工具。在电子商务这样的场景中,我们知道需要实时输出数据,但是怎样才算实时呢?1-2秒就算相当实时了!这点没错,但对于电子商务网站来说却并非如此,因为用户并不会等待一秒之后再执行下次点击。这就引出了延迟(latency)的概念。这是我们用来选择提取工具的标准。这两个工具之间有很多不同之处,但Kafka能够提供毫秒级的延迟!
在此用例中,我们将分别讨论Spark和Kafka的处理机制。我们将会看到spark如何处理底层硬件本不应该持有的数据。另一方面,我们也将看到使用Kafka来消费数据是多么的容易,以及其是如何处理百万级规模的数据。我将使用以下来自Kaggle的数据集,该数据集行数超过1亿。https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store(Kaggle:https://kaggle.com/)除非你有一台非常高端的计算机,否则不可能将整个数据集加载到本地机器的内存中,甚至不可能将其拆分为多批进行处理,当然,除非你对每批传入都执行处理,这就是为什么我们要使用Spark。(Spark:https://spark.apache.org/)
设置spark有其复杂性,因此,为了加快速度,我们将在Databricks上启动一个spark集群,使你能够在AWS S3(数据驻留的地方)的数据支持下快速启动集群。(Databricks:http://databricks.com/try-databricks)(AWS S3:https://aws.amazon.com/s3/)Spark遵循典型的Master-Slave架构,该体系概括而言即主服务器(Master Server)负责所有的作业调度及一些其他工作,从服务器(Slave Server)负责执行实际操作或将数据保存在内存中。 Spark 架构, 1 主节点(Master Node) + 2 工作/从节点(Worker/Slave Nodes)当我们在数据上实现Spark时,会再对其进行详细讨论。就目前而言,我在数据块上构建了一个1个工作节点(Worker Node)+ 1个主节点(Master Node)集群,总共有2个核和8 GB内存,尽管完整的配置应该是4个核和16 GB内存。之所以是2核8GB内存是因为我们所有的spark操作都只在工作节点上进行,而我们只有一个工作节点。内核数量(也就是2)将在这里扮演一个十分关键的角色,因为所有的并行化工作都将在这里发生。
内存中只能存储小部分数据,因此spark所做的是:它只在你要对数据执行某些操作时,才将数据加载到内存中。例如,下面这行代码将并行读取我们的数据集,即利用2个内核来读取我们的数据。ecomm_df = sparkSession.read.csv("/mnt/%s/ecomm.csv" % MOUNT_NAME, header=True
我们将以大约112个小块的形式提取14 GB的文件,由于我们有2个内核,所以每次取2个小块,每个小块128MB。尽管如此,spark不会在你提交该命令时立即开始读取文件,因为还有另一个延迟计算(lazy evaluation)的概念,这使得它不能按照传统的python方式进行读取!但是我们仍然可以通过快速转换为RDD来检查该文件的分区/块数量。
ecomm_df.rdd.getNumPartitions()
OUTPUT: 110 #Number of partitions
(延迟计算:https://bit.ly/2xxt2Br)这与我们计算的十分接近。查看以下链接,了解我是如何从14 GB的文件大小中计算出112个分区的。(链接地址:https://bit.ly/2IR77Yh)现在,在不涉及太多技术信息的情况下,我们来快速浏览一下数据:# A SAMPLE RECORD OF OUR DATARow(event_time='2019-11-01 00:00:00 UTC', event_type='view', product_id='1003461', category_id='2053013555631882655', category_code='electronics.smartphone', brand='xiaomi', price='489.07', user_id='520088904', user_session='4d3b30da-a5e4-49df-b1a8-ba5943f1dd33')
筛选出那些仅购买了小米智能手机的人,然后执行LEFT JOIN。看看每个命令是如何被分成110个任务和2个始终并行运行的任务的。按照品牌,分析有百分之多少的用户仅查看、添加到购物车还是购买了特定的商品,现在你已经了解了spark的功能,这是一种可以在有限的资源集上训练/分析几乎任何大小的数据的可扩展的方法。
我们之前已经讨论了不少关于Kafka的问题,所以就不在这里进行深入探讨了,让我们看看在真实场景中摄取这类数据的Kafka管道是什么样子的!无论何时,当我们要讨论的是1亿数量级事件时,可扩展性就是要优先考虑的重中之重,对分区(partitions)和消费者组(consumer groups)的理解也是如此。在如此大的提取量下,这两个要素可以破坏我们的系统。看看下面这个架构,大致了解一下Kafka系统。 你的邮箱/信箱就是此模型在现实生活中的副本。
1. 邮递员:这个人是生产者,他的工作是挑选数据并将其放入你的邮箱中。2. 邮箱/信箱:这是你的代理商,如果没有人来收信的话,信就会在这里不断堆积。3. 你的地址:这是你的主题(topic),邮递员是如何知道要把数据发送到哪里的呢?4. 你:你就是消费者,你有责任收集这些数据并对其进行进一步处理。这是对Kafka数据流机制的非常简单的解释,能够帮助我们进一步理解本文,而分区和消费者组的概念也能够帮助你更好地理解代码片段。 主题容纳着数据,可以被分成n个分区以供并行使用。在此种规模下,你需要并行处理数据。为此,可以将传入的数据分成不同的分区,此时,我们可以设置消费者组,这意味着有多个消费者希望从同一个源读取数据。参照上面的体系结构,两个消费者从同一个源读取数据,因此,会在同一时间读取更多的数据,但读取的数据却不相同。如果消费者1(Consumer 1)已经读取了第1行和第2行,那么消费者2(Consumer 2)将永远不会看到这些数据,因为这种分离在分区级上就已经发生了。下面是我使用分区和消费者组大规模提取此类数据的一个实现:# 4 Partitions Made
# Topic Name : ecomm_test./kafka_2.11-2.3.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic ecomm_test# Send data to ecomm_test topic
producer.send(topic='ecomm_test', value=line)# Start 2 consumers and assign it to the group "ecommGroup"
consumer = KafkaConsumer('ecomm_test', group_id='ecommGroup')
consumer = KafkaConsumer('ecomm_test', group_id='ecommGroup')# Output of how consumer 1 reads data, only reading from 2 partitions i.e. 0 & 1ConsumerRecord(topic=u'ecomm_test', partition=1, value='2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387\n')ConsumerRecord(topic=u'ecomm_test', partition=0, value='2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f\n')# Output of how consumer 2 reads data, only reading from 2 partitions i.e. 2 & 3ConsumerRecord(topic=u'ecomm_test', partition=3, value='2019-11-01 00:00:05 UTC,view,4600658,2053013563944993659,appliances.kitchen.dishwasher,samsung,411.83,526595547,aab33a9a-29c3-4d50-84c1-8a2bc9256104\n')ConsumerRecord(topic=u'ecomm_test', partition=2, value='2019-11-01 00:00:01 UTC,view,1306421,2053013558920217191,computers.notebook,hp,514.56,514028527,df8184cc-3694-4549-8c8c-6b5171877376\n')
1. KISS:保持简单愚蠢:保持架构尽可能的简单。3. CAP定理:一致性、可用性、分区容错性。选择两个对你来说最重要的。最后,我们将介绍可以在生产系统中实现的最终架构,尽管它们还涉及许多其他组件(如可用区、存储系统和故障转移计划)但本文只是对生产中最终处理层的概述。
具有数据流的完整架构图如你所见,图中描述得非常明白,没有针对所有用例的正确的架构/系统设计。你只需要根据给定资源构建出可行的方案。原文:https://towardsdatascience.com/knowing-pyspark-and-kafka-a-100-million-events-use-case-5910159d08d7