查看原文
其他

干货 | 大数据处理技术的总结与分析


一 、数据分析处理需求分类



1、事务型处理


在我们实际生活中,事务型数据处理需求非常常见,例如:淘宝网站交易系统、12306网站火车票交易系统、超市POS系统等都属于事务型数据处理系统。

这类系统数据处理特点包括以下几点:

一是事务处理型操作都是细粒度操作,每次事务处理涉及数据量都很小。

二是计算相对简单,一般只有少数几步操作组成,比如修改某行的某列;

三是事务型处理操作涉及数据的增、删、改、查,对事务完整性和数据一致性要求非常高。

四是事务性操作都是实时交互式操作,至少能在几秒内执行完成;

五是基于以上特点,索引是支撑事务型处理一个非常重要的技术。

在数据量和并发交易量不大情况下,一般依托单机版关系型数据库,例如ORACLE、MYSQL、SQLSERVER,再加数据复制(DataGurad、 RMAN、MySQL数据复制等)等高可用措施即可满足业务需求。

在数据量和并发交易量增加情况下,一般可以采用ORALCE RAC集群方式或者是通过硬件升级(采用小型机、大型机等,如银行系统、运营商计费系统、证卷系统)来支撑。

事务型操作在淘宝、12306等互联网企业中,由于数据量大、访问并发量高,必然采用分布式技术来应对,这样就带来了分布式事务处理问题,而分布式事务处理很难做到高效,因此一般采用根据业务应用特点来开发专用的系统来解决本问题。


2、 数据统计分析


数据统计主要是被各类企业通过分析自己的销售记录等企业日常的运营数据,以辅助企业管理层来进行运营决策。典型的使用场景有:周报表、月报表等固定时间提供给领导的各类统计报表;市场营销部门,通过各种维度组合进行统计分析,以制定相应的营销策略等。

数据统计分析特点包括以下几点:

一是数据统计一般涉及大量数据的聚合运算,每次统计涉及数据量会比较大。

二是数据统计分析计算相对复杂,例如会涉及大量goupby、 子查询、嵌套查询、窗口函数、聚合函数、排序等;有些复杂统计可能需要编写SQL脚本才能实现。

三是数据统计分析实时性相对没有事务型操作要求高。但除固定报表外,目前越来越多的用户希望能做做到交互式实时统计;

传统的数据统计分析主要采用基于MPP并行数据库的数据仓库技术。主要采用维度模型,通过预计算等方法,把数据整理成适合统计分析的结构来实现高性能的数据统计分析,以支持可以通过下钻和上卷操作,实现各种维度组合以及各种粒度的统计分析。

另外目前在数据统计分析领域,为了满足交互式统计分析需求,基于内存计算的数据库仓库系统也成为一个发展趋势,例如SAP的HANA平台。


3、 数据挖掘


数据挖掘主要是根据商业目标,采用数据挖掘算法自动从海量数据中发现隐含在海量数据中的规律和知识。

数据挖掘主要过程是:根据分析挖掘目标,从数据库中把数据提取出来,然后经过ETL组织成适合分析挖掘算法使用宽表,然后利用数据挖掘软件进行挖掘。传统的数据挖掘软件,一般只能支持在单机上进行小规模数据处理,受此限制传统数据分析挖掘一般会采用抽样方式来减少数据分析规模。

数据挖掘的计算复杂度和灵活度远远超过前两类需求。一是由于数据挖掘问题开放性,导致数据挖掘会涉及大量衍生变量计算,衍生变量多变导致数据预处理计算复杂性;二是很多数据挖掘算法本身就比较复杂,计算量就很大,特别是大量机器学习算法,都是迭代计算,需要通过多次迭代来求最优解,例如K-means聚类算法、PageRank算法等。

因此总体来讲,数据分析挖掘的特点是:

1、数据挖掘的整个计算更复杂,一般是由多个步骤组成计算流,多个计算步骤之间存在数据交换,也就是会产生大量中间结果,难以用一条sql语句来表达。

2、计算应该能够非常灵活表达,很多需要利用高级语言编程实现。



二、 大数据背景下事务型处理系统相关技术



在google、facebook、taobao等大互联网公司出现之后,这些公司注册和在线用户数量都非长大,因此该公司交易系统需要解决“海量数据+高并发+数据一致性+高可用性”的问题。

为了解决该问题,从目前资料来看,其实没有一个通用的解决方案,各大公司都会根据自己业务特点定制开发相应的系统,但是常用的思路主要包括以下几点:

(1)数据库分片,结合业务和数据特点将数据分布在多台机器上。

(2)利用缓存等机制,尽量利用内存,解决高并发时遇到的随机IO效率问题。

(3)结合数据复制等技术实现读写分离,以及提高系统可用性。

(4)大量采用异步处理机制,对应高并发冲击。

(5)根据实际业务需求,尽量避免分布式事务。


1、相关系统介绍


1) 阿里CORBAR系统

阿里COBAR系统是一个基于MYSQL数据库的分布式数据库系统,属于基于分布式数据库中间件的分布式数据库系统。该系统是前身是陈思儒开发的“变形虫”系统(以前调研过),由于陈思儒离开阿里去了盛大,阿里当心“变形虫”稳定性等问题,重新开发该项目。

该系统主要采用数据库分片思路,实现了:数据拆分、读写分离、复制等功能。由于此系统由于只需要满足事务型操作即可,因此相对真正并行数据库集群(例如TeraData等),此类系统提供操作没有也不需要提供一些复杂跨库处理,因此该系统存在以下限制:

(1)不支持跨库的join、分页、排序、子查询。

(2)insert等变更语句必须包括拆分字段等。

(3)应该不支持跨机事务(以前变形虫不支持)。

说白了此类系统不具备并行计算能力,基本上相当于数据库路由器!

另外此类系统的在实际应用的关键问题是,根据什么对数据进行切分,因为切分不好会导致分布式的事务问题。

2) 阿里OceanBase系统

该系统也是淘宝为了解决高并发、大数据环境下事务型处理而定制开发的一个系统。该系统主要思路和特点如下:

(1)他们发现在实际生成环境中,每天更新的数据只占总体数据的1%不到,因此他们把数据分为:基线数据和增量更新数据。

(2)基线数据是静态数据,采用分布式存储方式进行存储。

(3)只在一台服务器上存储和处理增量更新数据,并且是在内存中存储和处理更新数据。

(4)在系统负载轻的时候,把增量更新批量合并到基线数据中。

(5)数据访问时同时访问基线数据和增量更新数据并合并。

因此这样好处是:

(1)读事务和写事务分离
(2)通过牺牲一点扩展性(写是一个单点),来避免分布式事务处理。

说明:该系统虽然能处理高并发的事务型处理,号称很牛逼,但其实也只是根据电商的事务处理来定制开发的专用系统,个人认为其技术难度小于oracle等通用型的数据库。该系统无法应用到银行或者12306等,因为其事务处理的逻辑远远比电商商品买卖处理逻辑复杂。

在目前的大数据时代,一定是基于应用定制才能找到好的解决方案!

3) 基于Hbase的交易系统

在hadoop平台下,HBASE数据库是一个分布式KV数据库,属于实时数据库范畴。支付宝目前支付记录就是存储在HBASE数据库中。

HBASE数据库接口是非SQL接口,而是KV操作接口(基于Key的访问和基于key范围的scan操作),因此HBASE数据库虽然可扩展性非常好,但是由于其接口限制导致该数据库能支持上层应用很窄。基于HBASE应用的设计中,关键点是key的设计,要根据需要支持的应用来设计key的组成。

可以认为HBASE数据库只支持作为KEY的这一列的索引。虽然目前HBASE有支持二级索引的方案,二级索引维护将会比较麻烦。


2、并发和并行区别


并发是指同时执行通常不相关的各种任务,例如交易型系统典型属于高并发系统。

并行是通过将一个很大的计算任务,划分为多个小的计算任务,然后多个小计算任务的并行执行,来缩短该计算任务计算时间。

两者主要区别在于:

(1)通讯与协调方面:在并行计算中,由于多个小任务同属一个大的计算任务,因此小任务之间存在依赖关系,小任务之间需要大量通讯和协调;相反,并发中的多个任务之间基本相互独立,任务与任务之间相关性很小。

(2)容错处理方面:由于并发任务之间相互独立,某个任务执行失败并不会影响其它的任务。但是并行计算中的多个任务属于一个大任务,因此某个子任务的失败,如果不能恢复(粗粒度容错与细粒度容错),则整个任务都会失败。


3、本章总结


数据量大不一定需要并行计算,虽然数据量大,数据是分布存储,但是如果每次操作基本上还是针对少量数据,因此每次操作基本上都是在一台服务器上完成,不涉及并行计算。只是需要通过数据复制、数据缓存、异步处理等方式来支撑高并发访问量



三、 大数据背景下数据统计分析技术介绍



随数据量变大,和事务处理不同的是,单个统计分析涉及数据量会非常大,单个统计分析任务涉及数据会分散在多台服务器上,且由于计算量大,采用单台服务器进行计算,会导致计算时间非常长,单个统计分析任务必须采用并行计算方式来加快单个统计分析任务执行速度。


1、并行查询与并行计算技术介绍


在大数据背景下的数据统计分析技术门类很多,常见的有:

•  MPP并行数据库 : TeraData、GreenPlum、Vertica等。
•  基于MapReduce并行计算框架的数据仓库:HIVE(Hadoop平台) 、Tenzing(Google公司)
•  基于Hbase的Phoenix系统
•   HadoopDB系统
•  EMC公司的hapt系统
•   MPP分布式查询引擎: Dremel、Impala、Presto、Shard query、Citusdb。
•  基于SPARK的Shark、基于Dryad的SCOPE、基于Tez的stinger。
•  基于hadoop+index的JethroData系统
•  基于内存计算的Druid系统

这些系统都解决了海量数据下的数据统计分析的问题,并且这些系统另外一个共同特点是都提供了SQL或者类SQL接口。

为了能够较好研究这些系统,我们需要对并行查询与并行计算的相关技术做一个简要的介绍。



首先所有的系统都可以分为三个层次: 语义层、并行计算引擎层、分布式存储层。语义层提供一个编程接口让用户表达所需要计算,并负责把该计算翻译成底层并行计算引擎可以执行的执行计划,并由并行计算引擎来执行,最下面一层是分布式存储层。

对于提供类SQL接口并行计算系统,语义层可以认为是SQL解析层。

1) 语义层

SQL语言是一种声名式语言,SQL只是表达了要做什么,而没有表达怎么做。为此,SQL解析层主要作用是:将用户提交的基于SQL的统计分析请求,转化为底层计算引擎层可以执行的执行计划。也就是解决“怎么做”的问题。

SQL解析层工作主要包括两个大方面:

(1) 通过语法分析技术来理解要做什么。在关系数据库中,一般会把SQL语言分析后,形成树型结构的执行计划。

(2) 在语法分析技术上,利用各种优化技术和算法,找出一种最经济物理执行计划。



优化可以分为两个方面:一是逻辑层面优化、二是物理执行层面优化。

(1) 逻辑层优化

逻辑层面个人认为主要是因为同样表达一个分析请求,有的人SQL写的好,有的人SQL写的烂,因此在逻辑层面可以通过一些等价关系代数变换,实现查询重写,将写的比较烂的sql变换为好的写法。



比较典型优化是:“把投影和过滤下沉,先执行过滤和投影操作”,减少中间结果。



(2) 物理层优化

物理层面优化是在逻辑优化后,结合实际物理执行过程,找出最优的物理执行计划。生成物理查询计划的工作包括:

•  增加一些操作符: 包括扫描和排序等。

•  确定各个操作符实现算法。例如扫描是全表扫描还是利用索引;Join是采用HASH连接、索引连接、合并排序等实现算法中的那一种。

•   确定操作符之间的数据流转方法:物化还是流水线方式。

•  采用基于代价估算方法确定最优的物理执行计划,目前代价估算主要是以估算该物理计划需要的IO量。另外对于并行数据库,则还要考虑通讯代价,即尽量减少数据在各个机器之间的传递。

在物理层优化的代价估算过程中,代价估算需要依靠很多统计信息,如表有多大,表中相关列的值分布是什么样子等。传统数据库在数据Load过程中会事先计算好这些统计信息。并行计算中还需要考虑通讯代价。

需要指出是,由于imapla、Presto、HIVE等系统只是一个查询引擎,它们可以直接查询以普通文件方式存储在HDFS系统上的文件,因此这些系统一般无法使用索引和各种统计信息来进行物理执行计划的优化,这些系统一般只能在逻辑层进行一些基于规则静态优化。根据SHARK论文,SHARK系统支持根据前面一些节点计算获得的信息,来动态优化后面执行计划。

(3) 物化与流水线执行方法

一条SQL语句对开发人员而言,感觉只是一次调用,但是实际上在数据库内部,一条SQL语句执行其实是有多个操作符组合而成的的树型结构计算流。如下图:





针对该计算流有两种执行方式:一是基于物化或者是实体化执行方式,另外一种是基于数据流的执行方式。

第一种方法的过程是: 把各个操作运算排序,并把每个操作运算的输出的中间结果存储在磁盘上,直到被另外一个操作运算所读取。

另外一种方法是同时交错进行多个运算,由一个运算产生每个元组直接传递给下一个运算,而不将中间结果存储到磁盘,也不用等到前一个运算全部运算完毕。

例如: 两个表连接后,再进行投影操作。如果采用第一种方法,则需要

把两表连接中间结果临时写入磁盘,然后再读取该结果执行投影操作。而如果采用第二种方法,则连接操作一旦产生一个元组就可以立刻送到投影操作去进行投影操作。

流水线方法可以极大避免大量的中间结果磁盘IO。因此数据库一般会采取流水线方法来执行。流水执行方法有两种模式:一种是需求驱动流水线,也就是从上层主动向下层要求元组,另外一种是生产者驱动流水线执行方式,由低层主动产生元组,由下层向上层推。

目前大部分数据库引擎采用的是需求驱动流水线,实现方式采用基于Graefe提出的迭代器模型。该模型把每个操作都表达为由三个接口: open() , getnext(), close()。每个操作被调用open() 进行准备工作,然后通过反复迭代被调用getnext来获取下一个元组,最后被调用close来进行清理工作。 通过构建迭代器网络,也就是迭代器之间的互相调用,就可以实现需求驱动流水线。

当然不是任何操作都可以流水执行,流水执行条件是:操作要满足在接收输入元组时可以输出元组。例如排序操作就无法进行流水操作,在执行排序操作前都必须进行实体化。

(4) SQL解析层与并行计算引擎层

由于不同并行计算引擎层的执行计划表达不同,因此不同系统需要将SQL解析成不同的形式物理执行计划,例如:

•  MPP关系数据库一般是把SQL解析成树状结构的物理执行计划。

•  HIVE、Tezning数据库是把SQL解析成DAG结构的多个MAPREDUCE组合。

•  DRemel等则类似MPP关系数据库,把SQL解析成一个树状结构执行计划。

•  微软SCOPE则需要把类SQL解析成DAG结构的Dryad可执行的执行计划。

•  SHARK则需要把SQL解析成基于scala语言的DAG结构执行计划。



并行

2) 并行计算引擎层

(1) 并行计算形式

并行化可以分为水平并行(无依赖并行)与垂直并行(流水线并行)两类。如下图:





如果两个操作OP1、OP2 无相互依赖关系,则称这两个操作相互独立。水平并行化指的是互相独立的多个操作或者一个操作内互相独立的多个子操作分别由不同的处理机并行执行的形式。例如,排序操作、扫描操作由不同处理机并行执行就是水平并行化的实例。

水平并行中一个非常常见的就是基于数据划分的并行,例如MAPREDUCE,就是通过将数据划分到多台服务器上,并行执行MAP和Reduce来进行并行运算。也有人把这种基于数据划分并行与操作独立并行区分开。

垂直并行化则是指存在流水线方式依赖关系的操作分别由不同处理机并行执行的形式。流水线方式依赖:如果OP2无需等待OP1执行完毕即可在另一处理机上开始执行。由于一般情况下,流水的级数远小于处理的数据条目,因此流水并行主要意义是在可以避免中间结果磁盘IO操作,对并行度的贡献相对较小。

(2) 并行计算面临的问题与并行计算框架

并行计算需要解决的问题主要包括几下几个方面:自动并行化、通讯、任务调度、并发控制、容错、资源管理。由于并行计算面向上述一系列问题,因为业界为了简化并行程序开发,提供了一系列的并行计算底层库或者框架。

在高性能计算领域,最常用于并行计算编程的库是MPI库,但是该库主要只是解决通讯问题。这导致容错、资源管理、任务调度、并行化等方面问题需要程序员来解决,因此利用MPI开发并行程序相对比较困难。

最近一些年,各大型互联网公司开发开发了一系列的通用并行计算框架。包括谷歌公司的MAPREDUCE框架、微软公司的Dryad框架(目前微软已经停止该项目开发,转而支持hadoop)、谷歌公司基于BSP模型的Pregel框架、Twitter公司的Storm框架、Yahoo公司S4框架、HortonWorks公司的Tez框架、Berkeley大学的spark框架等通用并行计算框架。

有了这些框架了,程序开发时只需要编写串行执行程序即可,而且也不用考虑任务与任务之间的并发控制以及通讯等问题,其它所有问题都有框架来解决 ,这样就大大简化并行程序开发难度。例如采用MAPREDUCE框架,我们只需要提供MAP函数和Reduce函数,这些函数对程序员而言,都只是对本地数据操作。

目前虽然并行计算框架很多,但是可以把它们分成几个大类(基于BSP并行图计算引擎请参考第四章):

•  流数据并行计算框架

Storm、S4是属于流数据并行计算框架,适合对流数据实时处理,也就是在数据写入磁盘前对数据进行实时并发运算。这类特点是计算不变,数据一直在变化。在上一个文档中,对此框架做过详细介绍,这里不再详细介绍。

•  基于DAG通用批处理并行计算框架

MapReduce、Tez、Dryad、Spark等属于基于DAG(有向无环图)的通用批处理并行计算框架。这类框架是针对存储在存储设备上的一批数据进行分析处理,而且把分析处理流程利用DAG模型来表达。

在这些框架中MAPREDUCE是最早出现的框架,而后面出现的一系列框架都为了改进MR框架不足而出现的升级版本。

•  MR框架主要不足是两个方面:

一是编程接口太简单,表现在单个MAPREDUCE无法表达复杂运算,所以在实际应用环境中都是通过多个MR作业组合来完成一个任务。为了简化MR作业组合,在早期出现了一系列项目来执行组和式MR作业,例如Cascading项目。另外一个方面所有问题都必须转换为MAP和REDUCE模式,导致程序编写比较麻烦。

二是MR只支持基于数据分区并行方式,不支持流水线并行,采用是步步物化策略来提高可靠性,当是这种导致大量中间结果物化,IO开销非常大。

因此Tez、Dryad、Spark等后续框架改进主要针对以下两点进行改进:

一是直接支持基于DAG结构表达方法,DAG使得用户能够非常清晰地写出非常复杂的业务逻辑;

二是通过支持流水线并性方式或者是尽量将中间结果放内存等方式,解决中间结果物化导致的IO开销问题。Dryad和Spark框架在执行运算时,都会自动识别可以采取流水线方式执行的计算步骤,并尽量采用流水线执行方式来执行。

容错:由于支持流水线并行或者采取把中间结果放内存的方式,因此要必须考虑容错的问题。由于这些框架都采用的是DAG结构,DAG中一个节点所代表计算的执行是不会对输入进行修改(所谓函数式编程),因此可以多次重复执行不会影响计算。因此如果某个节点计算失败,它可以根据输入重复计算,而如果输入数据也消失了,则让前一个节点重新计算。所有这一切都是由框架自动执行。

当然需要指出的是对一些流水线执行的多个计算步骤,如果某个计算节点失败,则只能整个流水线整体失败。











•  基于Tree结构的MPP并行查询引擎

MPP并行数据库与Dremel、impala、Presto、Shard query、Citusdb都采用的是基于Tree结构并行查询引擎。此类并行计算引擎共同特点是:

一是针对SQL专用并行计算引擎,只支持SQL或者类SQL语义。

二是执行计划都是树状结构;

三是以流水线或者将中间结果放入内存方式来实现快速计算。

四是粗粒度容错机制。

它们之间不同点:

一 MPP并行数据库中并行查询引擎与底层存储是紧耦合的,导致如果采用MPP并行数据库,则只能通过SQL来访问数据,无法采用其他计算引擎直接处理存储在数据库中的数据。

二 Impala、Presto都只是一个并行查询引擎,它们可以直接查询以文件方式存储在HDFS上的数据,这样同一份数据既可以利用这些引擎来实现交互式查询,也可以支持利用其他计算框架进行更深入分析。

三 Dremel 只支持Google自己的基于嵌套结构列式存储(Column IO)。该引擎也主要适合于聚合型计算,不支持join操作。

四 上述引擎中只有MPP并行数据库可以利用索引以及各种统计信息来优化物理执行过程,因此该系统执行效率应该是最高。

五 Dremel、impala都只适合中间结果越来越小的查询,因为这些系统都是把中间结果放在内存,一旦某个中间节点输出结果超过内存,则整个任务会失败,例如大表之间Join。

六 shard query和citusdb 都是在单机版本关系数据库基础上,采用增加一层中间件方式来支持并行查询。

•  基于Tree并行计算引擎与基于DAG并行计算引擎本质区别

基于Tree结构并行计算引擎与基于DAG并行计算引擎从表面上看,它们之间的主要区别是在于语义层面:前者主要专用与SQL类,而后者更通用。

但是MPP并行关系数据库引擎、Imapla等都会支持通过UDF来扩展和解决标准SQL语言表达能力,另外SQL语言本身可以通过嵌套查询、子查询、union等各种方法表达很复杂的计算过程,因此从语义表达层面来讲他们之间不存在本质区别。

这两者之间主要区别还是在于表达执行计划结构方面:树结构是一个逐步汇聚的一个计算过程,无法表达split结构,因此基于DAG表达结构更灵活和通用。个人认为:树型结构可能更加适合采用迭代器模型来实现流水线式的操作(只有树结构才有上下层的关系,因此方便实现上层操作符嵌套调用下层操作符)。

所以不是所有计算都可以通过一个复杂SQL语句来表达!

(5) 自动并行化、数据重分布、本地调度

并行计算引擎最重要的一个职责是自动并行。根据前面的并行计算基础知识,并行计算的形式主要包括:基于数据划分水平并行、基于流水线垂直并行、基于无依赖水平并行三种方式。

大数据属于数据密集型计算,数据数量远远超过计算步骤数量。因此基于数据划分并行方式是最有效的一种并行计算方法。在整个并行计算过程中,基于数据划分中涉及数据可以分为两大类:原始数据与中间结果数据。

•   原始数据划分以及SN、SD架构讨论

原始数据则可能存在两种情况:一是在Shared-nothing架构中,原始数据本身就已经划分好了,例如HDFS或者SN架构 MPP数据库;另外一种情况如shared-disk结构中,原始数据没有划分。

第一种情况下针对原始数据划分并行计算,就要受该划分的限制。例如在MAPREDUCE中,map输入是存储在HDFS上的数据文件,因此MAP实例个数一是不能少于该数据文件分片数,二是MAP实例最好运行在该数据文件所在机器,也就是要求任务调度时,能把该任务调度到特定机器上,即所谓“本地调度”,将计算尽量移动到数据。

第二种情况下,由于所有计算节点都可以看到所有数据,因此此时可以根据计算特点灵活选择:数据划分粒度、并行度、参与计算的节点。例如在ORALCE并性机制中,ORALCE可以针对某张表,按block或者partition 为单位进行划分。

根据上述分析我们可以发现SD架构相对SN架构,在针对原始数据第一级并性计算时,SD架构更灵活,SN架构面临的一个缺陷就是如果原始数据分布不均衡,则存在计算倾斜问题。

但是现在大部分大的数据库厂商的MPP数据库还是采用了SN架构。根据网上所查资料来看,主要原因有两点:

一是SD架构下,磁盘是一个共享资源,计算节点越多磁盘争抢概率越大(和RAID随机IO冲突道理一样),导致该架构可扩展性不够好,也就是可能计算节点越多,效率相反不会提高。

二是从缓存角度来看,SD架构下每个机器缓存都要面向全数据库,会导致命中概率底下;目前ORACLE-RAC开发一个fusion cache技术,实现了一个全局共享缓存来解决上述问题,但是可想而知这会影响系统可扩展性。

因此超过一定规模数据分析系统,都是采用SN架构。



中间结果数据划分与数据重分布

中间结果是由各个计算节点产生的,因此中间结果生成是就是分布在各个参与计算节点之上的,因此:

一 :SD架构下数据共享好处,对中间结果无效。

二 :如果由于计算任务之间需要,需要在任务之间传递中间结果,则即使是SD架构也存在数据重分布的问题,主要是中间结果重分布,也就是中间结果传输。

另外从该过程我们还可以得出另外一个结论:

一: 对于复杂的数据处理,索引只能影响第一级计算,对于中间结果,由于只使用一次,因此没有必要去针对中间结果建立索引。也就是即使我们将数据存储在关系型数据库中,也只有第一级计算能有效利用数据库索引。

二:即使采用并行数据库,如果我们的整个计算过程不能用一个SQL语句来表达,则我们必须自己解决中间结果的划分与并性计算的问题。



(6)并行计算引擎架构与资源管理

所有并行计算引擎实现基本上都是主从结构,即一个MASTER + 多个slave节点的结构。由client向MASTER提交一个job,然后由Master负责将逻辑执行计划变成实际执行计划,并由Master负责将各个任务分发到各个slave中,并负责各个任务的调度。

MPP数据库查询引擎架构







MAPREDUCE架构和该架构缺点

Mapreduce框架中,JobTracker承当MASTER的职责,一般和HDFS中的NadeNode节点安装在一个服务器上。TaskTracker安装在各个DataNode上,承担Slave的角色。



流程如下:

(1)首先用户程序(Client Program)提交了一个job,job的信息会发送到Job Tracker中,Job Tracker是Map-reduce框架的中心,他需要与集群中的机器定时通信(heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有job失败、重启等操作。

(2)TaskTracker是Map-reduce集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况(资源的表示是“本机还能起多少个map-task,多少个reduce-task”,每台机器起map/reduce task的上限是在建立集群的时候配置的),另外TaskTracker也会监视当前机器的tasks运行状况。

(3)TaskTracker需要把这些信息通过heartbeat发送给JobTracker,JobTracker会搜集这些信息以给新提交的job分配运行在哪些机器上。

MAPREDUCE结构存在以下缺点:

(1) jobtracker只能安装在一台服务器上,集中式作业控制导致可扩展性不好,另外JobTracker负责事情太多,容易成为性能瓶颈。

(2) 资源调度与编程模型紧耦合,只支持MAPREDUCE一种编程模型。

(3) 资源划分太简单,每个TaskTracker只是简单把整个机器资源按map task slot和reduce task slot来划分,而没有考虑不通任务所需的内存和CPU等的资源不同。

针对上述特点,hadoop平台开发通用的资源管理器yarn,只负责资源管理和分配,即通过把jobtrack中的资源管理分配自和并行应用程序调度与控制分离,从而实现双层调度框架:由yarn把资源分配给各计算引擎MASTER,再由MASTER分配给各个TASK。



资源管理器YARN





流程如下:

1) client 通过一个CLC (container launch context )向ResourceManager提交一个应用

2)RM 启动该应用的 AplicationMaster。 AplicationMaster启动后先向ResourceManager注册,并利用心跳信息,定期向ResourceManager报告自己存活性和资源分配请求

3)ResourceManager分配一个container(container包括CPU个数和所需内存数量)时, AplicationMaster构造一个CLC,并在该container对应机器上Nodemanager上启动该container。AplicationMaster 监控该container的运行状态,并且该资源需要被回收时,由AplicationMaster停止该container。 监控container内部的作业的执行进度是AplicationMaster的职责。

4)一旦整个运行完毕,AM从RM中解除注册,并且干净退出。

这种架构优点是:

优点一:减小了JobTracker(也就是现在的ResourceManager)的资源消耗,并且让监测每一个Job子任务(tasks)状态的程序分布式化了,更安全、更优美。也就是ApplicationMaster是每个应用一个,并且不通应用对应的ApplicationMaster的实例可以运行在不同服务器上。

优点二:能够支持不同的编程模型ApplicationMaster是一个可变更的部分,用户可以对不同的编程模型写自己的ApplicationMaster,让更多类型的编程模型能够跑在Hadoop集群中。

优点三:对于资源的表示比之前以剩余slot数目更合理。

3) 存储层

数据存储层主要包括以下几类:

一类是基于MPP数据库集群,这类系统特点是存储层与上层并型计算引擎是紧耦合,属于封闭性的系统。

二是采用分布式文件系统,例如SharK、Stinger、HIVE、Impala、Scope等。Shark、Stinger、Hive、Imapla都采用HDFS文件系统作为存储层,Scope采用微软自己开发的分布式文件系统。此类系统特点是存储层与上层计算引擎层之间是松耦合关系。

三是存储层基于单机版本关系数据库,例如CitusDB采用PostSQL数据库系统、shardquery采用Mysql数据库系统。此类系统类似于一个中间件,也可以认为上层和底层存储层属于松耦合关系。

四是可以支持各种异构的存储系统,例如Presto、Tenzing。Presto设计即支持HDFS也支持存储在Mysql中的数据,但是目前只支持HDFS;Tenzing底层支持:Google File System、MySQL、Bigtable。

不同存储系统对上层计算有一些影响,典型如Tenzing系统会利用底层存储系统的一些特性:

(1)例如如果低层是mysql数据库,则可以直接利用mysql索引来过滤

(2)如果底层是bigtable数据库,则可以直接利用bigtable 范围scan来过滤

(3)如果底层是列存储系统,则可以只扫描需要扫描的列。

(4)如果底层是列存储系统,且头文件里面有该列最大值和最小值,则可以利用该信息直接跳过某些文件的扫描。

另外需要指出的是,目前已上所有系统都有一个趋势就是采用列式存储。例如HIVE开发了行列混合的RCFILE文件格式(先按行划分,保证每行的数据不会垮机器存储,然后再按劣存储),shark系统开发了内存中的列式存储格式,citusDB开发了专用postSQL数据库的列式存储引擎。


3、Druid等专用系统简单介绍


1) JethroData系统

JethroData的特点是hadoop+index。该系统对存储在HDFS上的结构化数据建立索引,并把索引文件也以普通文件方式存储在HDFS系统,并在查询处理时采取以下过程:

(1) 查询主节点负责分析SQL语句后,针对sql中的where条件部分,利用索引文件来得到符合where过滤条件后的rowid集合。

(2) 该rowid集合涉及各datanode节点,采用并发方式来读取数据。

(3) 所有数据汇总到查询主节点,进行汇总与计算,并将最终结果返回给客户端。

可以看出,由于该系统设计思路是希望通过索引来加速数据选择,因此只适合每次查询处理只涉及少量一部分数据。

2) Druid系统

本系统是美国metamarket公司开发的面向海量数据的实时统计分析系统,以实现针对上亿级别海量数据统计分析的延迟在1秒以内。该系统于2012年10月开源。该系统可以认为是一个分布式的内存OLAP系统。

该系统主要分析的数据为交易记录,每条交易记录包括三个部分:交易发生的时间点、多个维度属性、多个数值型度量属性。例如:



该系统设计用来可以回答以下问题“有多少个针对Justin Bieber的编辑来自San Francisco? ”、“一个月内来自Calgary的增加编辑字数的平均数是多少?”。而且要求:能够在高并发环境下,在1秒以内完成任意维度组合的统计,且保证系统高可用;还系统还要能够具备实时数据分析能力,也就是能够查询分析到最新的数据,延时时间为秒级。

为了达到上述目标,该公司先后通过测试发现关系数据库技术和NOSQL数据库都无法满足其需求。关系型数据库由于磁盘io瓶颈导致性能无法满足需求,而NOSQL数据库虽然可以采用预计算方法来达到高性能,但是预计算无法满足分析需求灵活多变。

为解决该问题,该公司自己开发DRUID系统,主要技术思路如下:

(1)将原始数据(alpha数据)进行一定粒度合并,合并成beta数据。

(2)将beta数据全部放入内存,并通过分布式内存方式解决单台服务器内存

上限问题。

(3) 针对纬度属性建立索引,以加速数据的选取。

(4) 采用分布式方式进行并行统计,为了保证分布式统计高效,该系统不支持join,而且对聚合计算不支持中位数等无法分布计算的聚合计算函数。

(5) 利用数据复制解决系统高可靠性问题。


4、本章总结


1) MPP并行数据库得益于流水线的执行以及基于统计优化等方面,使得MPP并行数据库的执行效率是最高的。但缺点包括:

•  数据导入时间长,导入时要做各种预处理,例如一些统计信息;

•  执行引擎和存储紧耦合导致数据难以被其他分析引擎进行分析;

•  基于树型结构执行计划,导致MPP并行数据库表达能力有限,更适合做统计与查询,而不适合数据分析处理;

•  容错性差,特别是一个任务涉及数据量越大,该缺陷越明显。

2)HIVE、Tenzing、Shark、SCOPE、Stinger等系统可以认为基本属于同一类系统。这类系统共同特点是:”通用并行计算引擎框架+SQL解析层”。并且可以将HIVE、Tenzing看成是基于第一代系统,而Shark、Scope、Stinger是第二代系统。这一类系统特点如下:

•  存储层、执行引擎层、SQL解析层三者分离,可以方便替换执行引擎,对使用者而言,同一份数据可以采用不同并行执行引擎来分析。

•  在执行效率方面,由于存储和上层分离因此一半只能具备逻辑优化能力,另外由于Tree结构执行计划更容易采用流水线执行方式,因此这类系统执行效率总体来讲不如MPP关系数据库,它们之间排序是MPP数据库 > 第二代系统 > 第一代系统。

•   在执行效率方面,另外一点是这类系统一般内置对索引的支持不是太好或者不支持。

•  在大规模计算容错方面,这类系统要优于MPP关系数据库。

3)Impala、Dremel等可以认为属于同一类系统,此类系统介于前两者系统之间。这类系统特点是:

•   和MPP数据库类似,基于Tree结构执行计划,专注于查询统计,因此效率高于第二类系统,但是可能和第二类系统的第二代相当。

•   与MPP数据库不同的是这类系统只是一个引擎,与存储系统松耦合。也就是SQL解析层与执行层紧偶合,然后和存储层松藕合。

•  只适合做中间结果越来越小查询分析,中间结果都放内存,对内存要求较高,例如无法实现大表之间的join。

因此,在大型互联网企业中,数据量太大,就会出现所谓“高价值、低密度”情况,反映到数据处理上,互联网企业不会长期存储原始数据,而是会把原始数据先经过一部分预处理,经过部分提炼后,把提炼后数据进行长期存储和分析。也就是如下流程:



例如淘宝,把每天数据直接写入Hadoop平台,然后通过每天运行相对固定mapreduce作业来做ETL,然后在计算结果基础上为提供各种分析功能。其中海量原始数据经过固定ETL后被删除,由于只使用一次,因此没有必要花很大精力把这些数据整理成适合分析与挖掘格式。例如在这种场景下,索引也没有太大的价值,因此没有必要花费大量代价来建立索引。

MPP并行数据库,适合存储高密度价值数据,并且是长期存储和多次使用,所以MPP并行数据库会花大量经历在Load阶段,把数据处理成适合分析格式 。

通过上述系统地介绍与比较,我们可以得出一个这样结论:在大数据领域,没有一个通用的解决方案,而需要根据具体业务场景,选择合适的技术!

4)通过上述系统研究,我们可以发现一点就是Join操作,特别是大表之间join操作是最消耗资源,也是最优化难度较高的操作,特别是在并行join的实现难度较大。例如Druid和Dremel等都基本放弃了join操作。

因此个人认为应该从业务上和从数据预处理方面,通过适当数据冗余来尽量避免在分析过程过程中执行join操作。



四、大数据背景下数据分析挖掘技术介绍



1、Mahout与MLlib项目


数据分析挖掘主要涉及两个方面:一是数据预处理;二是数据挖掘。

在数据预处理方面,根据掌握资料来看,大型互联网公司主要以MapReduce、Storm等计算框架为主,这些平台可以较好解决大数据预处理面临并行计算和处理灵活性的问题。但是个人认为spark、tez等属于MapReduce升级版本,因此后面这些计算框架在这方面的应用会越来越广泛。

在数据挖掘算法执行方面,主要问题解决数据挖掘算法并行计算问题。早期在数据挖掘算法并行化方面项目主要是Mahout项目,该项目基于MAPREDUC 并行计算框架实现了推荐、分类等常用数据挖掘算法的并行化。

但由于数据挖掘算法存在以下两个方面特点导致基于MAPREDUCE框架来做数据数据挖掘算法执行引擎效率不高:一是机器学习算法一般比较复杂,通常需要多次迭代计算,而MapReduce框架的步步物化导致中间结果会反复的序列化和反序列化导致效率不高;二是数据与数据之间依赖特别多,在计算过程中机器与机器之间的通讯非常多,而MapReduce框架下Map与Reduce之间存在路障同步, 导致大量时间被消耗在同步等待上面,效率不高。

因此目前Mahout项目在2014年1月份在0.9版本发布后,该项目抛弃了MAPREDUCE框架,转而采用SPARK作为底层计算框架。



除Mahout项目外,SPARK自己采用SPARK专门针对机器学习领域开发MLlib项目。但是MLlib项目出现时间比较晚,因此在成熟度方面不如Mahout。

Mahout项目目前支持的数据挖掘算法如下:



MLLib支持的数据挖掘算法包括:



2、图数据处理处理概述


在数据分析处理领域,随社交网络兴起,对图数据处理的需求越来越多。例如像Facebook和Twitter这样的社交网络,其数据天生就适合于图表示法。对图数据的处理和传统数据库处理一样,也可以分为两种类型的需求:

4 本章总结OLTP工作负载,能够快速低延迟访问小部分图数据。

4 本章总结OLAP工作负载,能够对图对象中的大部分数据进行批量分析与处理。

1) 图数据OLTP处理

(1) 图数据库分类

适合图书据OLTP处理的系统,主要是各种图数据库。从目前来看图数据库主要可以分为两类:

一是基于图存储模型的专用图数据库,如Neo4j、OrientDB、Infinite Graph等;

二是以通用KV存储系统或者关系数据库系统开发的图数据库,例如Titan系统(2013年推出)可以后端存储可以基于HBASE或者是Cassandra,Twitter公司的FlockDB图形数据库和facebook公司Tao图形数据库是基于mysql来进行开发。根据报道美国NSA就是利用2011年开源的Apache Accumulo(属于分布式KV数据库)来存储社会关系网络数据。

(2) 图数据查询

图数据查询其实就是”遍历”图(Traverse)。图数据库查询语言可以使用Gremlin、Cypher等查询语言来查询图。例如Neo4j就支持Cypher查询语言。

Cyper查询语言需要以一个节点来启动(START)查询,然后使用MATCH关键词以WHERE关键字过滤节点或者关系的属性,最后以RETRUN关键词来指定查询所返回的数据是节点、关系还是节点或者关系的属性字段。例如:

START barbara = node:nodeindex(name=”Barbara”);
MATCH(barbara)—(connected_node)
RETURNconnected_node.

(3) 两类图数据库区别

第一类与第二类图数据库区别在于以下几点:

查询功能方面

第一类图数据库可以以非常高效率方式支持复杂查询,既支持从指定起点开始,以任意深度来遍历图,并且还可以支持各种过滤。这样就可以很方便的执行各种图专用查询任务,例如“查找两个节点间所有路径或者最短路径”等。相反第二类数据库则只能支持较为简单查询,如FlockDB就只支持深度为1的关系遍历(个人认为也可以实现,只是效率不高)。

可扩展性方面

大部分第一种图形数据库都不支持分布,个人认为可能分布后这种复杂查询难以做到高效,因此可扩展性不好。而第二种由于只支持简单的图便历,一般通过采取按“边”切分的方法来进行分布存储,因此可扩展性较好。

2) 图数据OLAP处理

对图数据进行复杂分析,就需要分布式的批处理框架。例如大规模的PageRank计算。在这个领域出现并行图计算框架常见有Apache Giraph、Apache Hama、GraphLab、Pregel、GraphX等。

Pregel是Google根据BSP并行计算模型开发的图计算引擎,目前该系统没有开源。GraphX是Spark项目组基于Spark框架开发的图计算引擎;而GraphLab则是直接在MPI框架基础上开发的专用图计算引擎。

下面简单介绍几种主流并行图计算引擎。


3、并行图计算引擎


1) 基于BSP模型的Pregel引擎

简介

Pregel是Google公司开发的并行图计算引擎,主要用于实现各种机器学习算法。Pregel的输入是一个有向图,该有向图每一个顶点都有一个相应由String描述的顶点标识符。每一个顶点都有一个与之对应可修改用户自定义值。每一条有向边都和其源顶点关联,并且也拥有一个可修改的用户自定义值,并同时还记录了其目标顶点的标识符。

Pregel可以采用多种文件格式进行图的保存,比如可以用text文件、关系数据库、Bigtable。为了避免规定死一种特定文件格式,Pregel将从输入中解析出图结构的任务从图的计算过程中进行了分离。计算结果可以以任何一种格式输出并根据应用程序选择最适合的存储方式。Pregel library本身提供了很多常用文件格式的readers和writers,但是用户可以通过继承Reader和Writer类来定义他们自己的读写方式。

编写一个Pregel程序需要继承Pregel中已预定义好的一个基类——Vertex类。



用户覆写Vertex类的虚函数Compute(),该函数会在每一个超级步中对每一个顶点进行调用。预定义的Vertex类方法允许Compute()方法查询当前顶点及其边的信息,以及发送消息到其他的顶点。Compute()方法可以通过调用GetValue()方法来得到当前顶点的值,或者通过调用MutableValue()方法来修改当前顶点的值。同时还可以通过由出边的迭代器提供的方法来查看修改出边对应的值。

基于BSP的执行模型

读取输入初始化该图,当图被初始化好后,运行一系列的超级步直到整个计算结束,这些超级步之间通过一些全局的同步点分隔,输出结果结束计算。

在每个超级步中,顶点的计算都是并行的,每个顶点执行相同的用于表达给定算法逻辑的用户自定义函数。每个顶点可以修改其自身及其出边的状态,接收前一个超级步(S-1)中发送给它的消息,并发送消息给其他顶点(这些消息将会在下一个超级步中被接收),甚至是修改整个图的拓扑结构。边,在这种计算模式中并不是核心对象,没有相应的计算运行在其上。

算法是否能够结束取决于是否所有的顶点都已经“vote”标识其自身已经达到“halt”状态了。在第0个超级步,所有顶点都处于active状态,所有的active顶点都会参与所有对应superstep中的计算。顶点通过将其自身的status设置成“halt”来表示它已经不再active。这就表示该顶点没有进一步的计算需要执行,除非被再次被外部触发,而Pregel框架将不会在接下来的superstep中执行该顶点,除非该顶点收到其它顶点传送的消息。如果顶点接收到消息被唤醒进入active状态,那么在随后的计算中该顶点必须显式的deactive。整个计算在所有顶点都达到“inactive”状态,并且没有message在传送的时候宣告结束。





2) graphLab

(1) 简介

GraphLab一套基于c++的开源图计算库,提供了在共享内存情况下的异步、动态和并行图计算的高层抽象API。该库采用MPI和TCPIP来实现进程间通讯,采用Pthreads实现进程内的多线程并发计算,支持从HDFS和标准文件系统中读取数据。GraphLab定义了多种用于存储图的文件格式,包括"tsv","snap", "adj" "bintsv4"。



(2) 与Pregel的不同

GraphLab不是采用BSP的严格执行模型,GraphLab的基于BSP的Pregel的典型的改进是在更好的“异步迭代计算”和“动态计算”。因此该框架计算效率比Pregel更好。

异步计算:很多重要的MLDM算法迭代更新一大批参数,图结构导致参数更新依赖其它的参数。同步系统会以上一次更新的参数基础上一次更新所有的参数(BSP模型中超级步之间市全局路障同步),而异步系统则以最近的参数作为输入来更新参数。异步迭代更新可以极大加 快MLDM算法的计算速度。因为如果采用同步计算,则存在木桶效应,整体速度取决于最慢的那台机器。在大规模云计算环境下,负载不均衡、网络不均衡、硬件差异和多租户等会导致不同 机器之间的速度存在差异。另外由于图分割不均衡,以及计算复杂性等导致各个节点计算量也不均衡。

动态计算:很多MLDM算法的迭代计算收敛都不对称,例如在参数优化是,通常很多参数在很少几次迭代中就会快速收敛,而剩下少数参数则即使经过多次迭代也会收敛很慢。因此如果我们等同更新所有的参数,则会浪费大量的时间在重复计算那些已近收敛的参数上。最近的一些计算框架部分支持动态计算,例如Pregel可以通过让某些节点跳过一些超级步来部分支持动态计算。

(3) GraphLab的计算模型

graphLab包括三个部分:数据图、更新函数、同步操作。数据图表达用户可修改 的程序状态,存储可变的用户自定义数据和计算之间依赖。更新函数通过一个scope的数据变换来表达用户对数据图的计算和操作。同步操作并发维护全局汇总。

一个点的scope代表存储在这个点上的数据 和所有与这个点相邻的点和边上的所有数据。update f (v ,s(v) ) ---> (s(v) , 边集合) 。经过一个更新函数后,新计算出 的s(v) 会被写回图,并返回一个定点集合,针对该集合的每个点再执行 f(u ,s(u))



为了更高效的并行执行,GraphLab容许GraphLab框架动态的选择执行顺序,即RemoveNext(T)的返回值。因为很多MLDM算法需要执行优先级别,因此也可以指定点的优先级,这样GraphLab会综合考虑优先级以及网络情况来调度。

(3) GraphLab的并行计算

根据领域知识,将图分割为K份,K值远大于机器数量。每个分区被称为atom, 以一个文件形式存储类似HDFS的分布式文件系统上。Atom中存储的是增加点和变的操作记录,可以通过回放的方式来重构图。

采取把点着色的方法,先保证每个点和相邻点之间的颜色都不相同。通过一个颜色一个颜色的并发执行,来实现边一致性。把这种成为颜色步,与BSP的超步模型相对应。该引擎保证在执行下一个颜色步之前,所有的修改都被传递,实现颜色步之间的路障同步。

由Master根据atom索引来计算atom的位置,并负责机器与atom之间的分配关系。然后每个机器读取atom文件来加载图。每个机器上有一个调度器负责调度属于自己的子图的点的计算。调度器负责把每个需要执行update 函数之前所需要的数据和锁准备好后,放入一个流水处理队列中,再由一个worker线程池来执行,通过一个分布式算法来确定所有机器上的调度器中的T为空,也就是整个计算结束。

3)graphX



基于SPARK图形计算引擎,GraphX提供的API可以很方便的表达各种针对的转换、过滤和查询操作,但是GraphX不能直接实现迭代并行图计算算法,但是可以基于这些API用来实现各种并行图计算算法。在GraphX论文中描述了利用GraphX来实现Pregel、PowerGraph的方法。

GraphX的优势是可以很方便的与shark等进行集成,例如直接对shark查询后的结果进行图计算。



4) 总结

(1)上述计算引擎都可以以灵活方式来存储图,基本上都可以以文件方式来存储图数据,实现计算引擎与存储分离。

(2)图计算引擎都根据MDML算法特点采用专用计算模型,以提高效率。

(3) 所有图计算引擎在计算时,基本都是需要把数据都加载到内存中。(来自preglel论文:当前整个的计算状态都是驻留在内存中的。我们已经开始将一些数据存到本地磁盘,同时我们会继续在这个方向进行深入的研究,希望可以支持规模太大以至于内存无法完全存下的情况。



往期精彩回顾大数据公司挖掘数据价值的49个典型案例24个终极数据科学项目(免费获取资源)

详细解读:大数据应用及其解决方案

世界杯开幕:AI早已模拟上万场比赛 你猜谁夺冠?


 感谢观看!

end



网络大数据

 (ID:raincent_com)


网络大数据 www.raincent.com

致力于打造中国最专业的网络大数据科学门户网站。


识别二维码,关注网络大数据




    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存