导读:本次分享的题目为京东零售大数据 OLAP 应用与实践,主要介绍京东零售的数据背景,为应对增量数据所开发的一些工具,以及针对高并发大规模数据的业务场景所进行的查询架构的升级和改造,最后会总结常见问题并介绍未来规划。
全文目录:
- 背景介绍
- 数据工具
- 查询架构
- 问题和规划
分享嘉宾|陈洪健 京东 大数据架构师
编辑整理|刘鹏鹏 北京嘀嘀无限科技有限公司
出品社区|DataFun
01
背景介绍
1. 京东黄金眼商智流量业务场景
首先以流量这一业务场景来介绍一下我们现在的业务形态。
- 流量实时
实时流量数据的全量化分析,实时的流量概览,实时流量 PV/UV,流量的来源去向。
- 流量概览
以天/周/月粒度任意维度组合进行流量分析的离线数据,如有效用户数、成交用户数,我们都会进行全维度的分析。全维度的分析,不可能对所有的场景都做预计算,必然有大量的场景是进行极速的 OLAP 来计算。
- 流量的来源去向
我们假定从其他方面购买的流量,包括首次来源、末次来源,流向的分析,如用户从一个店铺跳转到另外一个店铺的趋向分析,流量的衰减,以及当中的一些购买的一些数据的分析,进行更精细的维度分析。
- 流量归因
假定用户在网站上的行为是连续性的,我们把这种区间性的划分为各个独立的访问区间,这里的区间我们认为存在因果关系。从而可以将用户的点击和购买行为进行关联计算的平台。
2. 流量分析特征
流量数据的特征:
- 海量
万亿流量数据,每日更新数据量达千亿量级,目前火热的 ClickHouse 等开源社区工具也满足不了每天更新千亿量级数据量的处理,因此我们也做了一些改造。
- 多变性
流量分析在电商领域中是以商品 SPU 为主,随着组织架构的变化、采销的岗位变化、以及商品、品牌、品类等属性的变化,做及时的调整来制定更及时准确的经营策略。
- 复杂性
首先流量有相应的品类,包括渠道、平台,关联的维度很多且层级也很深,渠道有 1~4 级甚至到 6 级,品类也到 4 级,这种层级的累加就造成了数据分析的困难性和复杂性。其次是数据的随机性,用户的点击行为是任意的,广告的推荐或图片展示都可能吸引用户的点击行为。
- 实效性
也就是我们常说的 SLA,没有及时性的变化,也就没有太大的意义了。同时,查询的时效要求高并发,大量的采销可能在同一个时间点,如双 11 零点就是一个大家都看的时间点,就会造成那个区间段的高并发,另外,还要求查询的低延时。
3. 京东黄金眼解决思路
基于上面的特点,我们主要从三个方向来解决。
- 时效性
首先每天的数据加工离线预计算要求在 8 点前出数;其次是推数时效,需要达到 10 亿量级的更新速度;最后是版本化更新数据无感知,每天更新的量级比较大,假设有张表每天要更新 100 亿数据,当更新了 20 亿的时候,用户访问了此时 PV 是 1000 万,过了一会儿再看又变成 2000 万,就会导致用户的困惑,显然不可能一会儿就增加了 100% 的数据量。因此我们实行了版本化的更新数据,在数据没有完成之前,用户看不到进行中的数据,当他能看到的时候就是正确的数据。这对用户的经营策略的抉择、相应的方案制定都更加精准。
- 精准性
每日更新的数据维度信息通常有两种方案,直接关联 OLAP 表或者字典刷卡。另外也会对特定的场景进行物化视图,如实时从 Kafka 写到 ClickHouse 的场景中如果有去重的要求,用到的引擎是
ReplicatedReplacingMergeTree,如果不去重直接用 ReplicateMergeTree 的话,需要建立物化视图进行去重。需要注意的是物化视图的 group by 和 order by 要尽量跟原表保持一致,否则会产生数据差异性。
- 高并发
ClickHouse 一般只有 20~50 的 QPS,如果要满足 200 甚至 2000 的 QPS 就需要进行升级,这里我们用到了其他引擎的特性。单个的 ClickHouse 集群无法满足业务查询的需求,因此我们进行了多活集群的负载来同时满足业务查询,如 a 到 A 集群查询,b 到 B 集群查询,此时就需要做集群的负载,这必然涉及到集群的状态管控,来实时监控集群 CPU、内存等资源的状态,以及读取 ClickHouse 本身的系统表信息,如 select query_log、system.parts 里的数据合并情况。
--
02
数据工具
第一部分解决了数据从其他数据源到 ClickHouse 的过程,接下来介绍如何避免关联维表或者关联刷数,以及 ClickHouse 的预计算框架。
1. ClickHouse推数工具
我们采用 Spark 作为 ClickHouse 推数的执行引擎。
- 集群检查部分
第一、元数据的补全,也就是我们常说的建表。我们是基于 local 表的建表语句,如果 Cluster 某个节点挂了导致建表失败时会进行回滚。第二、集群负载的管控,主要是内存和 CPU。当查询压力过大时,推数进程会自动进入等待队列。当集群压力得到缓解,如有 60% 的 CPU 空闲时,会继续推数。第三、删除数据检查,更新数据之前会对删除数据进行检查,Zookeeper 的部分节点可能删除数据时已经成功执行了,但是查询的时候还有部分节点没有刷新,必然会导致这次推数会有很大的问题。第四、元数据的更新,我们推数的语句,如 select 一张表会自动在 ClickHouse 里建表及选择引擎,我们提供引擎选择以及可配制化的参数。假设原来有四个字段,并且已经开启元数据更新的配置,当变成了五个字段时,相应的会在 ClickHouse 里面增加一个字段,来补全这个字段的信息。
- 数据传输部分
第一、数据切分。数据切分主要是 Distributed 的数据分布情况,我们从插入数据的环节提前把小文件合并,那么 ClickHouse 的 compaction 压力会变得很小,就不会出现 too many parts 这种情况。
第二、并发推数。推数的逻辑是基于 local 表的形式,因为分布式表会导致单个节点压力过大,无法满足高并发的需求。同时运用到了多副本机制,ClickHouse 本身有一种机制是取任意一个副本,其他的副本数据会自动进行 copy。我们在推数时,会对副本压力最小的机器进行推数,保证在一个集群的情况下读写的相对分离。对于一些像 Array、Map 嵌套这种复杂的数据类型都涉及到推数工具的数据格式转换,这里我们也在推动社区改造成根据列式推数。目前推数还是以行的这种形式,先处理数据的每一行,然后处理每一列,需要判断很多次的数据类型。而列式推数只需要判断一次,数据的转换会变得很快,亿级的数据可能只要秒级就推数完成了。这部分我们已经做了一些开发,但是社区目前还没有这样的接口。
第三、异常数据的处理,Hive 和 ClickHouse 的 String 长度是不一样的,可能会因为超长或者字段类型不匹配,以及重复数据、空值、默认值的处理,都属于异常处理。我们把对应的异常做了分类,这样用户会可以快速的定位到相应的问题,修改脚本提高开发效率。
- 数据验证部分
首先最直观的就是数据推了多少条,在 ClickHouse 里面校验影响多少条。其次是指标的合理值,如对于流量数据,某部门 pv 是 100,推数完成后发现 pv 是零,这显然不合理,就需要进行报警,以此类推也可以进行多个指标的校验。如果每天的偏移量不是很大的话(偏移量可以设置),就可以认为数据是完整的。
2. 技术特性
数据验证完成后就到了版本化的更新。对于小批量数据来说,几秒钟就推数完了,对用户没有感知。但如果是几百亿数据的话,必然会造成数据在用户展示的时候不全,此时就需要进行版本化的更新。推数的时候先放到缓存表,在数据校验完成后再 attach 到正式表,因为缓存表和正式表的存储结构是相等的,因此可以实现秒级的推数。
在数据传输部分,数据是以 Shard 的方式切分,我们知道数据是在哪个分片上,假设集群有 60 个分片,当某一分片所在的节点挂了时,其他的分片不需要重新推数,在替换节点或重启节点之后,只需要对这一个分片进行重新导数。而如果进行回滚的话,会把其他所有的节点的数据全部删除。
3. ClickHouse 刷数工具
为什么会有刷数工具呢?上面提到的电商领域的 SKU 会经常变更,传统的方式是从最下层的流量明细表开始关联维表,把这个 SKU 的 ID 关联最新的维表得到对应部门名称、类目等等,这种方式集群的压力会比较大。因为数据已经按照 SKU 进行分布了,因此我们的方案是先对流量明细进行聚合,如聚合到十分钟级粒度,数据量会减少 30%~70%,可以看到 browser_uniq_id 字段变成了 AggregateFunction,数据量的减少也解决了集群压力的问题。
接下来是刷岗,关联维表会使得并发性很差,还有一种方式是直接关联字典,SKU直接用这种字典方法并发性会差些。那么我们的方案是直接更新数据周期的所有明细数据。因为字典、数据表已经是聚合的状态,都是以同一个 SKU 进行分布的。这时候维表的量级因为是拆分到 60 个分片以上,相当于每个分片上的数据只有 1/60,那么就可以认为是一个 Map Join,因为本身字典要加载到内存,所以刷岗的效率就会非常高。
最终就变成了流量明细刷岗表,所有的数据都是最新的状态,直接查询不需要关联任何的表。
4. 技术特性
- 数据分布
分布式表和字典表一样,明细表、聚合表以 sku_id 分布,此时相对应的用户id都根据同一种规则进行分布。
- LocaltoLocal
聚合的逻辑跟刷岗一样都是 local to local 的形式,本地表减少了分布式表的网络路由和开销。
- 各分片字典不同动态加载
各个分片根据不同的状态进行数据加载,减少内存的使用。
- 多主题字典
如果是大宽表的形式,可以拆分成交易、流量、用户、营销等主题,减少刷数的数据量,当然也可以分集群。我们所有的数据都是最新的状态不需要关联维表,只有一些业务逻辑如交易和流量,其实可以不用把它们合并,直接下发两个集群查交易和流量数据,最后在服务层把数据进行合并。
- 多副本的刷数
如A副本刷 11 月的数据,B 副本刷十月的数据。多个副本同时在工作,此时只需要保证有一个副本是可以供查询的状态。如果是在深夜,可以让所有的副本同时刷数,这样的话刷数周期就会变得非常长,刷完数都会有数据校验和版本化,跟刚才推数是一样的,不管是刷数还是推数的流程,都是先进行了数据校验之后再推到正式表。
5. ClickHouse 预计算
预计算,也就是在 ClickHouse 里聚合计算,这可能是以后 ClickHouse 的一个方向。因为现在大多数的逻辑做维度计算时,还是通过离线在 Hive 里用 Spark 进行计算,然后再推到 ClickHouse 里面。对于一些数据量不是很大但是更新周期非常长的业务,Spark 就不能解决运算数据周期长的问题。
接下来说下整体的逻辑,第一个是 Grouping_ID 路径生成,ClickHouse 没有这种语法,因此我们做了语法的生成。第二个是数据的智能切分,有些数据的分布可以进行介入,然后进行聚合结算,如查询 select xx group by xx 时超内存了,那么这时候就会一个 MPP 的协助器介入进行数据的切分,然后这时候有一个中间缓存,缓存完成后再进行计算,这里面会有并行调节器和集群状态的管控,因为这种进行大批量的预计算,瞬时的下发了很多集群压力会非常大。
6. 技术特性
我们先看一下右边的这个代码,做这种 grouping sets 的时候,如果直接 group by所有的 month 可能会超内存,因为一年只有 12 个月。那么如果转换成 month=1 计算一次、month=2 也计算一次,然后进行数据合并,其实和 grouping sets 是一个原理,不同的这种 sql 进行 union 时,MPP 架构可以进行数据的智能切分。当有这种多维度的时候,把每个维度进行切分,以小维度进行计算后再合并。
技术特性主要有以下几点:
- grouping sets id 的生成,我们要分析生成的所有维度级其实就只有四类,指定的维度、必须要有的维度、有级联关系的维度、需要剔除的维度。那么这时候把一些如这一次我不需要对平台进行 grouping sets id 不区分平台,或者不区分经营类型,这些都可以剔除。那么所有的这种其实就是一个字符串的运算,那么我有或者差合并。就可以把所有这种 grouping sets id 全部生成出来。
- 智能裁剪数据,当超内存的时候,把算不出来的进行拆分然后再 union,从小到多。这样就解决了在 ClickHouse 的内存限制,因为 ClickHouse 可以把单台机子的性能发挥到极致,如果我们不进行介入的话,很多的维度没法在 ClickHouse 里面进行运算。
- 数据缓存和数据校验。在做这种预计算时,必然有很多结果集会经常用到,那么这时候可以把压缩 cache 打开。
- 集群状态管控。我们可以控制 query_log、读取 cpu 的负载情况。
--
03
查询架构
前面解决了数据到 ClickHouse 以及在 ClickHouse 里面的加工,已经能满足大多数的 OLAP 需求,那么接下来的问题就是高并发和低延时,因为数据再怎么取的厉害,从 1000 亿压缩到了最后只成两三百亿,也还是不能避免 ClickHouse 本身并发性以及低时延的限制。
1. 高可用查询架构
这时候用到了其他组件的特性来弥补 ClickHouse 的缺陷。
第一个是从数据服务层下发查询,如果查询路由开了,那么我们可以用 Es 存储大的且变动较小的数据量集合。
第二个是用 Redis 提前缓存,有些用户访问频率非常高,就提前把这些数据进行缓存。对于实时的数据也是一样可以进行预热,一开始可能会有些数据延迟,数据偏移量可以放小一点或者放长一点。这时候数据量可能会有一些差异,但是我可以满足那一个点。其实主要内容算是爆发点。把那个点扛过去,后面整个都是一个平稳的运行。
第三个是 ClickHouse 的存储,对明细数据做预计算;每天离线出数的时间可能要到三四点钟才开始出数,那么 1~4 点钟可以进行提前的预计算;对于流量明细数据,可以满足任意交叉维度的 OLAP 计算,因为其他地方原来也都是 OLAP,但是提前进行预计算就保证了 ClickHouse 从 0 点到 24 点每时每刻要么就是在预计算,要么就是用户查询,CPU 基本上都维持在很高的状态,这样的话用户的体验就很好。
最后是负载均衡,Quota 控制、容灾、多活集群控制。
2. 多存储引擎 Combo 方案
- ClickHouse 多活集群。对重要的业务部署多活(主备)集群,并且为满足容灾尽量使备集群负载控制在 30% 以下,并且需要跨机房部署,因为如果部署在同一个机房的话,一个机房挂掉可能整个集群都不可用了,也就没有太大的意义。
- 分布式 Redis。通过提前预热经常访问的用户,那么这时候他打开数据,打开页面就缓存了,其实并不是走的一个 CK 的 OLAP,它只是我提前进行了 OLAP,只是用了空闲的时间把这个数据提前缓存出来了。
- ES。把大的且更新少的数据集合预计算放在Es里面。
- Spark 预计算。主体的预计算通过spark计算,然后再推到 ClickHouse 里面。
3. 查询优化
① 去重计算
- 一些查询如经常用的 countDistinct,如果对精度要求不是特别高的话,可以用 uniqCombine64 或 count group by,性能要比 countDistinct 快一些。
- 但凡能转成 int 型的,都转成 int 型。int 型可以用 array 函数性能更好。
- 实时有个很大的问题是数据并不是最新的状态,需要用 optimize+final 获取最新的状态数据。
- 实时的话可以用 ReplacingMergeTree 进行不停的状态变更,如果没有状态变更且需要去重的话,可以用 AggregateMergeTree 并基于分钟粒度建立物化视图。再次强调,这里物化视图的聚合 key 索引和 group by、order by 要保持一致,否则会有数据差异,这也是我们踩的最多的坑。然后我们可以用就是 argMax 在多个节点进行去重。
- 如果有些业务需要用这种 bitmap,比方说用户 ID 在部门之间的流程、有效成交,直接用 Uint64 性能会很高。
② 数据裁剪
- Doris 有一个比较好的特性是可以建立 rollup 小表,以物理的代价来换取查询的性能,但 ClickHouse 创建 rollup 表要手工创建会很麻烦,因此我们做了一些工具可以指定需要多少列、对应的索引。
- Name 尽量不要参与计算,能用 ID 的尽量 ID。
③ Join 优化
- join 的话 ClickHouse 稍微不是那么强,可以做一些本地的 in 取代 join,把过滤条件放在 join 的子查询,同时 join 尽量用 int 型。
- global join 会把数据推到所有的节点,比较适合小的维度表。
- join 在本地节点的操作,适合相同分片的两张表关联。
- 关联 join 的顺序是大表在左,小表在右。
--
04
问题和规划
1. 常见问题
在实践过程中我们也遇到了一些问题,如节点掉盘、Zookeeper 节点不一致、关联字典无法高并发查询、CPU 打满无法获取状态、以及多集群结果不一致等等。那么这时候有专业的团队,来对这些问题进行修复。我们的稳定和顺畅,也离不开这些团队的支持。
在导数部分,如果多个任务同时导数,因为经常没有办法获取集群的状态,拼命地往一个节点写导致集群宕机。脏数据的处理,然后副本集合,这时候我们也提了副本集合,就是我们推同一个副本的话可能会不太好。那么我们可以跨副本的进行这种插入。只要保证最终的数据是正确的就行了。
分片级的回滚,导数期间如果删除数据,页面会没有显示。那么这时候我们版本表的出现,然后还有一个就是多实例集群,像我们这边有一些集群就是一台机子性能过于强大,比方像有几百核 CPU 或者是几百 G 内存,那么这时候我们觉得它用一台的话,其实我们往往查询是没有办法打到那么多了,比方说建了个 60 个分片 20 个,三个副本的话,对集群资源是很大的浪费。这时候可以把单台物理机虚拟出三个或者是四个,然后可以尽量的把整台的所有 CPU 跟内存打满,这样的话,可以降本增效的一个过程。然后还有一个在 ClickHouse 里面离线明细按天(时间维度)聚合的导数,其中一些字段需要去重,这些字段我们设置成AggregateFunction(uniq, String)。
插入的时候使用 uniqState,查询的时候使用 uniqMerge,那么还有一些情况,我们需要实时聚合数据,只是需要去重的话,那么这时候就直接用就是 ReplicatedMergeTree,然后建一个物化视图
ReplicatedAggregatingMergeTree 实时聚合。就是物化 DDL,我们还是比较推荐的方式,DDL 尽量是在 Local 层面上进去,因为本身 ZK 的节点或者这种协调性其实并不是可控的。
因为我个人认为 CK 使用的这种 ZK 的这种主从设计,其实可能对于未来是一个隐患,因为可能会遇到在 znode 过多或者是删除数据不及时创建表没有过程之类的,都可能是在 ZK 的这个层面上面出现了问题。Doris 这部分做得就比较好。但是目前它既然是一种情况的话,那么他既然没有办法保证它操作的原子完成这种一致性的话,那么我们可以进行人工的加一些判断,来让集群更加的稳定。
2. 未来规划
- 多集群的联合查询
对于我们来说查询是可以分成 IO 型、内存型或者 CPU 型,不同集群创建时的硬件配置不一样,那么就可以把它下发到对应的性能上更加有优势的集群,可以整体管控查询能力。还有就是一些非常重要的查询,如 VP 的查询优先级相对会更高。比方说在一段时间内销售数据可能不尽如人意,这时候大的 VP 可能要拍板做一些新的经营策略的调整,那这时候他下发了一个查询,我们要及时的返回。
- ClickHouse 预计算的通用性
现在大家更多的只是把 ClickHouse 想像成就是 OLAP。但是 ClickHouse 有很多的时间,如凌晨其实是可以做预计算的,这种可以认为是一个提前的 OLAP。但是我是把数据进行更加合理的调整,让用户的查询变得更加性能化,更加满足用户的需求,更加及时地返回用户数据。还有一个就是 ClickHouse 的与计算 SQL 语义向 Hive 靠近,因为我们原来大部分开发的离线脚本,语义上比较像 Hive 或者 Kafka 的 JDBC。那么这时候如果语义更加靠近的话,我们原来的脚本是可以往上面靠的。这样的话我们改动的工作量相对可能会大一点,但是对于开发人员的体感很好,大部分脚本的迁移,也会提升整体的性能,是一个降本增效的好事情。
- 不同存储数据库的同步
ClickHouse 的预计算结果可以同步到 Es,Redis。
3. 查询资源容器化
上图是我们整体的框架图。
最上层是业务侧的查询请求,我们把业务请求按优先级进行了划分,同时根据请求所耗的资源类型如内存型、IO 型、CPU 型,分成不同的优先级,同时根据 SQL 语义的推断以及每天上报的资源使用情况对用户的价值进行量化,进而区分用户可以划分到哪个类型下。
进行查表策略的调整,以匹配到最小的元数据。当然刚才我们做的存储引擎的优化,combo 的组合如 Es、HBase、Redis 等都是可以用的。
用这种 KV 的形式进行最小的匹配,比方说匹配到最上面一层,当然如果做到最上面 ClickHouse 这一层的话,相对应的会识别出你是内存型、CPU 型还是 IO 型的,然后会找集群压力相对较小的机器下发查询,或者是刚刚查过的相同类型,这时候又发了一个查询,有数据缓存,那么还会下发到这个集群。
以上就是我们整个的未来规划。
--
05
问答环节
Q:刷岗的 SKU 维度表很小,那么 Local 表最大能放多大的表?这样关联速度也会很快吗?
A:第一个是最大能放多大的表?我们这边用一台 64G 内存的机器测试结果是存储的SKU 在 3000 万左右,最高可以放到 5000 万。大概 1G 的话,能放 100 万左右的数据。第二个是为什么不能全用这种直接来查询?主要还是并发性的问题。因为如果大家都在这个时间点加载到内存,并发性可能就只有 10~20,但如果没有这步操作的话,并发可以达到 40~50,是一个倍级的差异。
今天的分享就到这里,谢谢大家。
|分享嘉宾|
陈洪健|京东 大数据架构师
曾负责中兴、烽火通信数仓建设和大数据处理,在京东主要负责海量数据olap,性能优化和架构设计。
|DataFun新媒体矩阵|
|关于DataFun|
专注于大数据、人工智能技术应用的分享与交流。发起于2017年,在北京、上海、深圳、杭州等城市举办超过100+线下和100+线上沙龙、论坛及峰会,已邀请超过2000位专家和学者参与分享。其公众号 DataFunTalk 累计生产原创文章800+,百万+阅读,15万+精准粉丝。
本文暂时没有评论,来添加一个吧(●'◡'●)