近些年,大数据背后的价值也开始得到关注和重视,越来越多的企业开始保存和分析数据,希望从中挖掘大数据的价值。大数据产生的根本还是增量数据,单纯的用户数据不足以构成大数据,然而用户的行为或行为相关的日志的数据量,加之随着物联网的发力,产生的增量数据将不可预估,存储和查询增量数据尤为关键。所以,在笔者的工作经历中,本着以下的目标,寻找更优的大数据存储和查询方案:
数据无损:数据分析挖掘都依赖于我们保存的数据,只有做到数据的无损,才有可能任意的定义指标,满足各种业务需求。
保证数据实时性:数据的实时性越来越重要,实时的数据能够更好的运维产品和调整策略,价值更高。单进程每秒接入3.5万数据以上,数据从产生到能够查询到结果这个间隔不会超过5秒。
业务需求快速响应:随着越来越快的业务发展和数据应用要求的提高,数据的查询需要更灵活,快速响应不同且多变的需求。最好是任意定义指标后能够实时查询出结果。
数据灵活探索性:探索性数据分析在对数据进行概括性描述,发现变量之间的相关性以及引导出新的假设。到了大数据时代,海量的无结构、半结构数据从多种渠道源源不断地积累,不受分析模型和研究假设的限制,如何从中找出规律并产生分析模型和研究假设成为新挑战。因此,探索性数据分析成为大数据分析中不可缺少的一步并且走向前台。
超大数据集,统计分析秒级响应:万亿数据量级,千级维度(非稀疏)的统计分析秒级响应。
目前大数据存储查询方案大概可以分为:Hbase系、Dremel系、预聚合系、Lucene系,笔者就自身的使用经验说说这几个系的优缺点,如有纰漏,欢迎一起探讨。
数据查询包括大体可以分为两步,首先根据某一个或几个字段筛选出符合条件的数据,然后根据关联填充其他所需字段信息或者聚合其他字段信息,本文中提到的大数据技术,都将围绕这两方面。
笔者认为Hbase系的解决方案(例如Opentsdb和Kylin)适合相对固定的业务报表类需求,只需要统计少量维度即可满足业务报表需求,对于单值查询有优势,但很难满足灵活聚合数据的场景。
Hbase的表包含的的概念有rowkey、列簇、列限定符、版本(timestamp)和值,对应实际Hdfs的存储结构可以用下图做一个简单总结:
Hbase表中的每一个列簇会对应一个实际的文件,某种层面来说,Hbase并非真正意义的列式存储方案,只是列簇存储。每个文件有若干个DataBlock(数据块默认64k),DataBlock是HBase中数据存储的最小单元,DataBlock中以KeyValue的方式存储用户数据(KeyValue后面有timestamp,图中未标注),其他信息主要包含索引、元数据等信息,在此不做深入探讨。每个KeyValue都由4个部分构成,分别为key length,value length,key和value。其中key的结构相对复杂,包括rowkey、列、KeyType等信息,而value值对应具体列值的二进制数据。为了便于查询,对key做了一个简单的倒排索引,直接使用了java的ConcurrentSkipListMap。
Hbase管理的核心思想是分级分块,存储时根据Rowkey的范围不同,分散到不同的Region,Region又按照列簇分为不同的Store,每个Store实际上又包括StoreFile(对应Hfile)和MemStore,然后由RegionServer管理不同的Region,RegionServer即对应具体的进程,分散不同的机器,提供分布式的存储和查询。查询时,首先获取meta表(一种特殊的Region)所在的RegionServer,通过meta表查找表rowkey相对应的Region和RegionServer信息,最后连接数据所在的RegionServer,查找到相应的数据。
Hbase的这种结构,特别适合根据rowkey做单值查询,不适合scan的场景,因为大部分Scan的情况基本上需要扫描所有数据,性能会非常差。虽然也有扩展的Hbase二级索引方案,但基本上都是通过协处理器,需要另外建立一份rowkey的对应关系,Scan的时候先通过二级索引查找rowkey,然后在根据rowkey查找相应的数据。
这种方式一定程度上能加快数据扫描,但那对于一些识别度不高的列,如性别这样的字段,对应的rowkey相当之多,这样的字段在查找二级索引时的作用很小,另外二级索引所带来的IO性能的开销都会随之增加。而在需要聚合的场景,对于Hbase而言恰恰需要大量scan数据,会非常影响性能。Hbase只有一个简单rowkey的倒排索引,缺少列索引,所有的查询和聚合只能依赖于rowkey,很难解决聚合的性能问题。
随着Hbase的发展,基于Hbase做数据存储包括Opentsdb和Kylin也随之产生,例如Kylin也是一种预聚合方案,因其底层存储使用Hbase,故笔者将其归为Hbase系。在笔者看来,Opentsdb和Kylin的数据结构极其相似,都是将各种维度值组合,结合时间戳拼成rowkey,利用字典的原理将维度值标签化,达到压缩的目的。如此,可以满足快速查询数据的需要,但同时也会受限于Hbase索引,聚合需要大量scan,并不能提升数据聚合的速度。
为了避免查询数据时的聚合,Kylin可以通过cube的方式定制数据结构,在数据接入时通过指定metric来提前聚合数据。这样虽然在一定程度上解决了数据聚合慢的情况,但这是一种典型的空间换时间的方案,组合在维度多、或者有高基数维度的情况,数据膨胀会非常严重,笔者曾遇到存储后的数据比原始数据大90倍的情况。另外,业务的变化会导致重建cube,难以灵活的满足业务需要。
Parquet作为Dremel系的代表,相对Hbase的方案,Scan的性能更好,也避免了存储索引和生成索引的开销。但对于数据还原和聚合,相对直接使用正向索引来说成本会很高,而且以离线处理为主,很难提高数据写的实时性。
Google的Dremel,其最早用于网页文档数据分析,所以设计为嵌套的数据结构,当然它也可以用于扁平的二维表数据存储。开源技术中,Parquet算是Dremel系的代表,各种查询引擎(Hive/Impala/Drill)、计算框架甚至一些序列化结构数据(如ProtoBuf)都对其进行了支持,甚至Spark还专门针对Parquet的数据格式进行了优化,前途一片光明,本文主要结合Parquet来展开论述。
可以用下图简单表示Parquet的文件格式:
Parquet的数据水平切分为多个Row Group,Row Group为数据读写的缓存单元,每个Row Group包含各个的数据列(Column Chunk),数据列有若干Page,Page是压缩和编码的单元,其相应存储的信息包括元数据信息(PageHeader)、重复深度(Repetition Levels)、定义深度(Definition Levels)和列值(Values)信息。
Page实际有三种类型:数据Page、字典Page和索引Page。数据Page用于存储当前行组中该列的值,字典Page存储该列值的编码字典,每一个列块中最多包含一个字典Page,索引Page目前还不支持,未来可能会引入Bloom Filter,能够判断列值是否存在,更有利于判断搜索条件,提升查询速度。
从Parquet的存储结构来看,Parquet没有严格意义上的索引,在查询的过程中需要直接对Row Group的列数据进行扫描,有两方面来保证查询优化,一个是映射下推(Project PushDown),另外一个是谓词下推(Predicate PushDown)。
映射下推主要是利用列式存储的优势,查询数据时只需要扫描查询中需要的列,由于每一列的所有值都是连续存储的,所以分区取出每一列的所有值就可以实现TableScan算子,而避免扫描整个文件内容。
谓词下推在数据库之类的查询系统中最常用的优化手段之一,通过将一些过滤条件尽可能的在最底层执行,减少上层交互的数据量,从而提升性能。另外,针对谓词下推Parquet做了更进一步的优化,优化的方法是对每一个Row Group的每一个Column Chunk在存储的时候都计算对应的统计信息,包括该Column Chunk的最大值、最小值和空值个数。通过这些统计值和该列的过滤条件可以判断该Row Group是否需要扫描。未来还会增加诸如Bloom Filter和Index等优化数据,更加有效的完成谓词下推。
通过这两方面的优化,Parquet的查询时扫描数据的性能能够得到大幅度提升。那Parquet如果填充数据(不同的列拼成一行记录)和聚合数据呢?
主要是使用了Striping/Assembly算法实现的,该算法的思想是将数据的值分为三部分:重复深度(Repetition Levels)、定义深度(Definition Levels)和列值(Values)。通过重复深度可以在读取的时候结合Schema的定义可以知道需要在哪一层创建一个新的repeated节点(如第一层的的为0,表示是新记录,否则则表示repeat的数据),然后通过定义深度知道该值的路径上第几层开始是未定义,从而还原出数据的嵌套结构,如此便能清楚的把一行数据还原出来。由于缺少行号对应的列正向索引,没有办法直接寻址,单纯的依赖于Striping/Assembly算法还原数据或者聚合处理,相对来说成本会高很多。
另外,Parquet的实时写方面是硬伤,基于Parquet的方案基本上都是批量写。一般情况,都是定期生成Parquet文件,所以数据延迟比较严重。为了提高数据的实时性,还需要其他解决方案来解决数据实时的查询,Parquet只能作为历史数据查询的补充。
Parquet存储是相对索引的存储来说,是一种折中处理,没有倒排索引,而是通过Row Group水平分割数据,然后再根据Column垂直分割,保证数据IO不高,直接Scan数据进行查询,相对Hbase的方案,Scan的性能更好。这种方式,避免了存储索引和生成索引的开销,随着索引Page的完善,相信查询性能值得信赖。而对于数据还原和聚合也没有利用正向索引,而是通过Striping/Assembly算法来解决,这种方式更好能够很取巧的解决数据嵌套填充的问题, 但是相对直接使用正向索引来说成本会很高。
另外,由于是基于Row Group为读写的基本单元,属于粗粒度的数据写入,数据生成应该还是以离线处理为主,很难提高数据写的实时性,而引入其他的解决方案又会带来存储架构的复杂性,维护成本都会相应增加。
最近几年,随着OLAP场景的需要,预聚合的解决方案越来越多。其中比较典型的有Kylin、Druid和Pinot。预聚合的方案,笔者不想做过多介绍,其本身只是单纯的为了满足OLAP查询的场景,需要指定预聚合的指标,在数据接入的时候根据指定的指标进行聚合运算,数据在聚合的过程中会丢失metric对应的列值信息。
笔者认为,这种方式需要以有损数据为代价,虽然能够满足短期的OLAP需求,但是对于数据存储是非常不利的,会丢掉数据本身存在的潜在价值。另外,查询的指标也相对固定,没有办法灵活的自由定义所需的指标,只能查询提前聚合好的指标。
Lucene算是java中最先进的开源全文检索工具,基于它有两个很不错的全文检索产品ElasticSearch和Solr。Lucene经过多年的发展,整个索引体系已经非常完善,能够满足的的查询场景远多于传统的数据库存储,这都归功于其强大的索引。但对于日志、行为类时序数据,所有的搜索请求都也必须搜索所有的分片,另外,对于聚合分析场景的支持也是软肋。
Lucene中把一条数据对应为一个Document,数据中的字段对应Lucene的Field,Field的信息可以拆分为多个Term,同时Term中会包含其所属的Field信息,在Lucene中每一个Document都会分配一个行号。然后在数据接入时建立Term和行号的对应关系,就能够根据字段的信息快速的搜索出相应的行号,而Term与行号的对应关系我们称之为字典。大部分时候查询是多个条件的组合,于是Lucene引入了跳表的思想,来加快行号的求交和求并。字典和跳表就共同组成了Lucene的倒排索引。Lucene从4开始使用了FST的数据结构,即得到了很高的字典压缩率,又加快了字典的检索。
为了快速的还原数据信息和聚合数据,Lucene还引入了列正向索引和行正向索引。列正向索引主要是行号和Term的对应关系,行正向主要是行号和Document的对应关系。这两种索引都是可以根据需要配置使用,例如只有单纯的查询,只是用行正向索引就可以,为了实现数据的聚合则必须列正向索引。
有了这些索引后,就可以通过Term来查询出行号,利用正向索引根据行号还原数据信息,或者对数据进行聚合。
另外,为了满足全文检索的需求,Lucene还引入了分词、词向量、高亮以及打分的机制等等。总的来看,Lucene的整个索引体系比较臃肿,其设计的根本还是搜索引擎的思想,满足全文检索的需求。
Lucene本身是单机版的,没有办法分布式,也就以为着其能处理的还是小数据量。ElasticSearch提供了Lucene的分布式处理的解决方案,其核心思想是将Lucene的索引分片。
在写入场景中,对于同一个index的数据,会按照设定的分片数分别建立分片索引,这些分片索引可能位于同一台服务器,也可能不同。同时,各分片索引还需要为自己对应的副本进行同步,直到副本写入成功,一次写入才算完整的完成。当然,单个文档的写入请求只会涉及到一个分片的写入。搜索场景则大致是逆过程,接受请求的节点将请求分发至所有承担该分片查询请求的节点,然后汇总查询请求。这里值得注意的是,任意一个搜索请求均需要在该index的所有分片上执行。
由于ElasticSearch是一个搜索框架,对于所有的搜索请求,都必须搜索所有的分片。对于一个针对内容的搜索应用来说,这显然没有什么问题,因为对应的内容会被存储到哪一个分片往往是不可知的。然而对于日志、行为类数据则不然,因为很多时候我们关注的是某一个特定时间段的数据,这时如果我们可以针对性的搜索这一部分数据,那么搜索性能显然会得到明显的提升。
同时,这类数据往往具有另一个非常重要的特征,即时效性。很多时候我们的需求往往是这样的:对于最近一段时间的热数据,其查询频率往往要比失去时效的冷数据高得多,而ElasticSearch这样不加区分的分片方式显然不足以支持这样的需求。
而另外一方面,ElasticSearch对于聚合分析场景的支持也是软肋,典型的问题是,使用Hyperloglog这类求基数的聚合函数时,非常容易发生oom。这固然跟这类聚合算法的内存消耗相对高有关(事实上,hll在基数估计领域是以内存消耗低著称的,高是相对count,sum这类简单聚合而言)。
数果智能根据开源的方案自研了一套数据存储的解决方案,该方案的索引层通过改造Lucene实现,数据查询和索引写入框架通过扩展Druid实现。既保证了数据的实时性和指标自由定义的问题,又能满足大数据量秒级查询的需求,系统架构如下图,基本实现了文章开头提出的几个目标。
Tindex-Segment,负责文件存储格式,包括数据的索引和存储,查询优化,以及段内数据搜索与实时聚合等。Tindex是基于Lucene的思想重构实现的,由于Lucene索引内容过于复杂,但是其索引的性能在开源方案中比较完善,在数据的压缩和性能之间做了很好的平衡。我们通过改造,主要保留了其必要的索引信息,比原有的Lucene节省了更多的存储空间,同时也加快了查询速度。主要改进有以下几点:
对于海量行为数据的存储来说,存储容量无疑是一个不容忽视的问题。对于使用索引的方案来说,索引后的数据容量通常相对原有数据会有一定程度的膨胀。针对这类情况,Tindex针对索引的不同部分,分别使用了不同形式的压缩技术,保障了能够支持高效查询的同时仅仅需要较少的容量。对于数据内容部分,使用字典的方式编码存储,每条记录仅仅存储文档编号。对于字典本身的存储,使用了前缀压缩的方式,从而降低高基数维度的空间消耗。实际情况下,使用 Tindex 压缩后的数据占用的存储容量仅仅为原始数据的1/5左右。
由于实际使用中,往往需要同时支持搜索和聚合两种场景,而这两种方式对于索引结构的需求是完全相反的。针对这两种情况,Tindex结合了倒排索引和列正向索引这两种不同类型的索引。对于倒排索引部分,使用字典和跳表等技术,实现了数据的快速检索,而对于正向部分,则通过高效的压缩技术,实现了对于海量行下指定列的快速读取。同时,根据不同的情况,可以选择性的只建立其中一种索引(默认情况对于每一列均会同时建两种索引),从而节省大约一般的存储空间和索引时间。
Tindex-Druid,负责分布式查询引擎、指标定义引擎、数据的实时导入、实时数据和元数据管理以及数据缓存。之所以选择Druid是因为我们发现其框架扩展性、查询引擎设计的非常好,很多性能细节都考虑在内。例如:
堆外内存的复用,避免GC问题;
根据查询数据的粒度,以Sequence的方式构建小批量的数据,内存利用率更高;
查询有bySegment级别的缓存,可以做到大范围固定模式的查询;
多种query,最大化提升查询性能,例如topN、timeSeries等查询等等。
框架可灵活的扩展,也是我们考虑的一个很重要的元素,在我们重写了索引后,Druid社区针对高基数维度的查询上线了groupByV2,我们很快就完成了groupByV2也可见其框架非常灵活。
在我们看来,Druid的查询引擎很强大,但是索引层还是针对OLAP查询的场景,这就是我们选择Druid框架进行索引扩展的根本原因。 另外其充分考虑分布式的稳定性,HA策略,针对不同的机器设备情况和应用场景,灵活的配置最大化利用硬件性能来满足场景需要也是我们所看重的。
在开源的Druid版本上自研,继承了Druid所有优点的同时,对查询部分代码全部重新实现,从而在以下几个方面做了较大改进:
对于数据接入来说,不必区分维度和指标,只需要定义数据类型即可,数据使用原始数据的方式进行存储。当需要聚合时,在查询时定义指标即可。假设我们要接入一条包含数字的数据,我们现在只需要定义一个float类型的普通维度。
不同于原生的Druid只支持string类型维度的情况,我们改进后的版本可以支持string, int, long, float、时间等多种维度类型。在原生的Druid中,如果我们需要一个数值型的维度,那么我们只能通过string来实现,这样会带来一个很大的问题,即基于范围的过滤不能利用有序的倒排表,只能通过逐个比较来实现(因为我们不能把字符串大小当成数值大小,这样会导致这样的结果‘12’ < ’2’),从而性能会非常差,因为数值类型维度很容易出现高基维。对于改进后的版本,这样的问题就简单多了,将维度定义为对应的类型即可。
原有的Druid线上的数据,需要在启动时,全部加载才可以提供查询服务。我们通过改造,实现了LRU策略,启动的时候只需要加载段的元数据信息和少量的段信息即可。一方面提升了服务的启动时间,另外一方面,由于索引文件的读取基本都是MMap,当有大量数据段需要加载,在内存不足的情况,会直接使用磁盘swap Cache换页,严重影响查询性能。数据动态加载的很好的避免了使用磁盘swap Cache换页,查询都尽量使用内存,可以通过配置,最大限度的通过硬件环境提供最好的查询环境。
HDFS,大数据发展这么多年,HDFS已经成为PB级、ZB级甚至更多数据的分布式存储标准,很成熟了,所以数果也选用HDFS,不必重新造轮子。Tindex与HDFS可以完美结合,可以作为一个高压缩、自带索引的文件格式,兼容Hive,Spark的所有操作。
Kafka/MetaQ,消息队列,目前Tindex支持kafka、MetaQ等消息队列,由于Tindex对外扩展接口都是基于SPI机制实现,所以如有需要也可以扩展支持更多的消息队列。
Ecosystem Tools,负责Tindex的生态工具支持,目前主要支持Spark、Hive,计划扩展支持Impala、Drill等大数据查询引擎。
支持冷数据下线,通过离线方式(spark/Hive)查询,对于时序数据库普遍存在的一个问题是,对于失去时效性的数据,我们往往不希望它们继续占据宝贵的查询资源。然后我们往往需要在某些时候对他们查询。对于Tindex而言,可以通过将超过一定时间的数据定义为冷数据,这样对应的索引数据会从查询节点下线。当我们需要再次查询时,只需要调用对应的离线接口进行查询即可。
SQL Engine,负责SQL语义转换及表达式定义。
Zookeeper,负责集群状态管理。
未来还会持续优化改造后的Lucene索引,来得到更高的查询性能。优化指标聚合方式,包括:小批量的处理数据,充分利用CPU向量化并行计算的能力;利用code compile避免聚合虚函数频繁调用;与大数据生态对接的持续完善等等。
后续笔者还会深入讲解每一部分的详细实现原理及实践经验,敬请关注!
博客地址:http://blog.yoqi.me/?p=2703
这篇文章还没有评论