唯品会亿级数据服务平台落地实践

作者 唯品会数据中台团队

作者|邓刚、陈晨、周飞强、冯广远、严旭东、朱寒婷、史修磊、金一丹

数据服务是数据中台体系中的关键组成部分。作为数仓对接上层应用的统一出入口,数据服务将数仓当作一个统一的DB来访问,提供统一的API接口控制数据的流入及流出,能够满足用户对不同类型数据的访问需求。

电商平台唯品会的数据服务自2019年开始建设,在公司内经历了从无到有落地,再到为超过30+业务方提供to B、to C的数据服务的过程。本文主要介绍唯品会自研数据服务Hera的相关背景、架构设计和核心功能。

背景

在统一数仓数据服务之前,数仓提供的访问接入方式往往存在效率问题低、数据指标难统一等问题,具体而言有以下几个比较突出的情况:

● 广告人群USP、DMP系统每天需要通过Hive Server以流的方式从数仓导出数据到本地,每个人群的数据量从几十万到几个亿,人群数量2w+,每个人群运行时间在30min +,部分大人群的运行直接超过1h,在资源紧张的情况下,人群延迟情况严重

● 数仓的数据在被数据产品使用时,需要为每个表新生成一个单独的接口,应用端需要为每一种访问方式(如Presto、Click House)区分使用不同的接口,导致数据产品接口暴涨,不方便维护,影响开发及维护效率。数据在不同的存储时,需要包含clickhouse-client,presto-client等等第三方jar包。

● 不同数据产品中都需要使用一些常用的数据指标,如销售额、订单数、PV、UV等,而这些数据在不同数据产品的实现口径、实现方式都不一样,无法形成数据共享,每个数据产品都重复进行相同的指标建设。因此,在不同数据产品查看相同指标却发现数值不同的情况下,难以判断哪个数据产品提供的数据是准确的。

图1. 在统一数仓数据服务之前,数据流入流出方式

为解决以上问题,数据服务应运而生。目前数据服务的主要优势有:屏蔽底层的存储引擎、计算引擎,使用同一个API(one service),数仓数据分层存储,不同engine的SQL生成能力,自适应SQL执行以及统一缓存架构保障业务SLA,支持数据注册并授权给任何调用方进行使用,提高数据交付效率。

通过唯一的ID标识,数据产品可通过ID查阅数据,而非直接访问对应的数仓表。一方面,指标服务统一了指标的口径,同时也支持快速构建新的数据产品。

架构设计

数据服务能给业务带来运营和商业价值,核心在于给用户提供自助分析数据能力。Hera整体架构基于典型的Master/slave模型,数据流与控制流单独链路,从而保障系统的高可用性。数据服务系统主要分为三层:

1. 应用接入层:业务申请接入时,可以根据业务要求选择数据服务API(TCP Cli-ent),HTTP以及OSP服务接口(公司内部RPC框架)。

2. 数据服务层:主要执行业务提交的任务,并返回结果。主要功能点包括:路由策略,多引擎支持,引擎资源配置,引擎参数动态组装,SQLLispengine生成,SQL自适应执行,统一数据查询缓存,Free Maker SQL动态生成等功能。

3. 数据层:业务查询的数据无论在数仓、Clickhouse、My SQL还是Redis中,都可以很好地得到支持,用户都使用同一套API。

图2. 数据服务整体架构图

调度系统的整体流程大致包含以下模块:

● Master:负责管理所有的Worker、Transfer Server、Adhoc Worker节点,同时负责调度分发作业;

● Worker:负责执行ETL和数据文件导出类型的作业,拉起Adhoc Worker进程(Adhoc任务在Adhoc Worker进程中的线程池中执行),ETL类型的作业通过子进程的方式完成;

● Client:客户端,用于编程式地提交SQL作业;

● Config Center:负责向集群推送统一配置信息及其它运行时相关的配置和SQL-Parser(根据给定的规则解析、替换、生成改写SQL语句,以支持不同计算引擎的执行);

● Transfer Server:文件传输服务。

图3. 数据服务调度流程图

主要功能

Hera数据服务的主要功能有:多队列调度策略、多引擎查询、多任务类型、文件导出、资源隔离、引擎参数动态组装、自适应Engine执行和SQL构建。

多队列调度策略

数据服务支持按照不同用户、不同任务类型并根据权重划分不同调度队列,以满足不同任务类型的SLA。

多引擎查询

数据服务支持目前公司内部所有OLAP和数据库类型,包括Spark、Presto、Clickhouse、Hive、My SQL、Redis。会根据业务具体场景和要求,选择当前最佳的查询引擎。

多任务类型

数据服务支持的任务类型有:ETL、Adhoc、文件导出、数据导入。加上多引擎功能,实现多种功能组合,如Spark adhoc和Presto adhoc。

文件导出

主要是支持大量的数据从数据仓库中导出,便于业务分析和处理,比如供应商发券和信息推送等。

具体执行过程如下:用户提交需要导出数据的SQL,通过分布式engine执行完成后,落地文件到hdfs/alluxio.客户端通过TCP拉取文件到本地。千万亿级的数据导出耗时最多10min。数据导出在人群数据导出上性能由原来的30min+,提升到最多不超过3min,性能提升10~30倍。具体流程如下:

图4. 数据服务文件下载流程图

资源隔离(Worker资源和计算资源)

业务一般分为核心和非核心,在资源分配和调度上也不同。主要是从执行任务Worker和引擎资源,都可以实现物理级别的隔离,最大化减少不同业务之间相互影响。

引擎参数动态组装

线上业务执行需要根据业务情况进行调优,动态限制用户资源使用,集群整体切换等操作,这个时候就需要对用户作业参数动态修改,如OLAP引擎执行任务时,经常都要根据任务调优,设置不同参数。针对这类问题,数据服务提供了根据引擎类型自动组装引擎参数,并且引擎参数支持动态调整,也可以针对特定任务、执行账号、业务类型来设定OLAP引擎执行参数。

自适应Engine执行

业务方在查询时,有可能因为引擎资源不足或者查询条件数据类型不匹配从而导致执行失败。为了提高查询成功率和服务SLA保障,设计了Ad Hoc自适应引擎执行,当一个引擎执行报错后,会切换到另外一个引擎继续执行。具体自适应执行逻辑如下图所示:

图5. 自适应Engine执行

SQL构建

数据服务SQL构建基于维度事实建模,支持单表模型、星型模型和雪花模型。

● 单表模型:一张事实表,一般为DWS或者ADS的汇总事实表。

● 星型模型:1张事实表(如DWD明细事实表)+ N张维表,例如订单明细表(事实表FK=商品ID)+商品维表(维度表PK=商品ID)。

● 雪花模型:1张事实表(如DWD明细事实表)+ N张维表+M张没有直接连接到事实表的维表,例如订单明细表(事实表FK=商品ID)+商品维表(维度表PK=商品ID,FK=品类ID)+品类维表(维度表PK=品类ID)。

图6. SQL维度模型

自定义语法(Lisp)描述指标的计算公式

Lisp是一套自定义的语法,用户可以使用Lisp来描述指标的计算公式。其目标是为了构建统一的指标计算公式处理范式,屏蔽底层的执行引擎的语法细节,最大化优化业务配置和生成指标的效率。

Lisp总体格式(oprator param1 param2 ...)param可以是一个参数,也可以是一个Lisp表达式。目前已经实现的功能:

● 聚合表达式

(count x [y,z...]), count distinct x over (partition by y,z);

在Presto中的实现是approx distinct(x,e) over (partition by y,z),在Spark中的实现是ap-prox count distinct(x,e) over (partition by y,z)。y,z只在开窗函数模式下才生效。目前也支持嵌套的聚合表达式(sum (sum (max x)))。

● 条件表达式

case when实现when1为条件bool或者被比较值then1为对应输出else X为最后的else输出

简单模式(case value val1 then1 [val2 then2] ... [else Val])

eg:(case subject id (int 2) (int 1))-> case subject id when 2 then 1 end)

查找模式(case when1 then1 [when2 then2] ... [else Val])

eg: (case (= subject id (string goods base)) (int 2) (int 1)) -> case when subject id = 'goods base' then 2 else 1 end

● 类型标识表达式

(int xx) xx标识成数字型

(string xx) xx标识成字符串类型

null直接返回null

● 类型转换表达式

(cast bigint xx)

(cast double xx)

(cast string xx)

● 聚合通用表达式

(func a b c ...) 通用Lisp表达式a为函数名后续字段为表达式元素如(func bar 1 2 3)解析为bar(1, 2, 3)

● 非聚合通用表达式

(func none a b c ...) 通用Lisp表达式a为函数名后续字段为表达式元素如(func none bar 1 2 3) 解析为bar(1, 2, 3),设置Lisp对象的aggregation属性为false

例如:(func none json extract scalar 40079 '$.m name' )

Lisp语法的解析

Lisp的解析和翻译是基于antlr4来实现的,处理流程如下:

7. Lisp处理流程图

● 将Lisp(count x y)表达式通过antlr翻译成语法树,如下图所示:

图8. 语法树

● 通过自定义的Listener遍历语法树

● 在遍历语法树的过程中,结合指标的query engine(presto/spark/clickhouse/mysql)元数据生成对应的查询引擎的SQL代码(approx distinct(x,e) over (partition by y))

任务调度

基于Netty库收发集群消息,系统仅仅使用同一个线程池对象Event Loop Group来收发消息,而用户的业务逻辑,则交由一个单独的线程池。

选用Netty的另外一个原因是“零拷贝”的能力,在大量数据返回时,通过文件的形式直接将结果送给调用者。

多队列+多用户调度

业务需求通常包含时间敏感与不敏感作业,为了提高作业的稳定性和系统的可配置性,Hera提供了多队列作业调度的功能。

用户在提交作业时可以显式地指定一个作业队列名,当这个作业在提交到集群时,如果相应的队列有空闲,则就会被添加进相应的队列中,否则返回具体的错误给客户端,如任务队列满、队列名不存在、队列已经关闭等,客户端可以选择“是否重试提交”。

当一个作业被添加进队列之后,Master就会立即尝试调度这个队列中的作业,基于以下条件选择合适的作业运行:

1. 每个队列都有自己的权重,同时会设置占用整个集群的资源总量,如最多使用多少内存、最多运行的任务数量等。

2. 队列中的任务也有自己的权重,同时会记录这个作业入队的时间,在排序当前队列的作业时,利用入队的时间偏移量和总的超时时间,计算得到一个最终的评分。

3. 除了调度系统本身的调度策略外,还需要考虑外部计算集群的负载,在从某个队列中拿出一个作业后,再进行一次过滤,或者是先过滤,再进行作业的评分计算。

一个可用的计算作业评分模型如下:

队列动态因子=队列大小/队列容量* (1-作业运行数/队列并行度)

这个等式表示的意义是:如果某个队列正在等待的作业的占比比较大,同时并行运行的作业数占比也比较大时,这个队列的作业就拥有一个更大的因子,也就意味着在队列权重相同时,这个队列中的作业应该被优先调度。

作业权重=1- (当前时间-入队时间) /超时时间

这个等式表示的意义是:在同一个队列中,如果一个作业的剩余超时时间越少,则意味着此作业将更快达到超时,因此它应该获得更大的选择机会。

score=作业权重+队列动态因子+队列权重

这个等式表示的意义是:对于所有的队列中的所有任务,首先决定一个作业是否优先被调度的因子是设置的队列权重,例如权重为10的队列的作业,应该比权重为1的队列中的作业被优先调度,而不管作业本身的权重(是否会有很大的机率超时);其次影响作业调度优先级的因子是队列动态因子,例如有两个相同权重的队列时,如果一个队列的动态因子为0.5,另外一个队列的动态因子是0.3,那么应该优先选择动态因子为0.5的队列作业进行调度,而不管作业本身的权重;最后影响作业调度优先级的因子是作业权重,例如在同一个队列中,有两个权重分别为0.2和0.5的作业,那么为了避免更多的作业超时,权重为0.2的作业应该被优先调度。

简单描述作业的排序过程就是,首先按队列权重排序所有的队列;对于有重复的队列,则会计算每个队列的动态因子,并按此因子排序;对于每一个队列,作业的排序规则按作业的超时比率来排序;最终依次按序遍历每一个队列,尝试从中选择足够多的作业运行,直到作业都被运行或是达到集群限制条件。这里说足够多,是指每一个队列都会有一个最大的并行度和最大资源占比,这两个限制队列的参数组合,是为了避免因某一个队列的容量和并行度被设置的过大,可能超过了整个集群,导致其它队列被“饿死”的情况。

SQL作业流程

用户通过Client提交原始SQL,这里以Presto SQL为例,Client在提交作业时,指定了SQL路由,则会首先通过访问SQLParser服务,在发送给Master之前,会首先提交SQL语句到SQLParser服务器,将SQL解析成后端计算集群可以支持的SQL语句,如Spark、Presto、Click House等,为了能够减少RPC交互次数,SQLParser会一次返回所有可能被改写的SQL语句。

在接收到SQLParser服务返回的多个可能SQL语句后,就会填充当前的作业对象,真正开始向Master提交运行。

Master在收到用户提交的作业后,会根据一定的调度策略,最终将任务分发到合适的Worker上,开始执行。Worker会首先采用SQL作业默认的执行引擎,比如Presto,提交到对应的计算集群运行,但如果因为某种原因不能得到结果,则会尝试使用其它的计算引擎进行计算。当然这里也可以同时向多个计算集群提交作业,一旦某个集群首先返回结果时,就取消所有其它的作业,不过这需要其它计算集群的入口能够支持取消操作。

当SQL作业完成后,将结果返回到Worker端,为了能够更加高效地将查询结果返回给Client端,Worker会从Master发送的任务对象中提取Client侧信息,并将结果直接发送给Client,直到收到确认信息,至此整个任务才算执行完毕。

在整个作业的流转过程中,会以任务的概念在调度系统中进行传播,并经历几个状态的更新,分别标识new、waiting、running、succeed、failed阶段。

图9. SQL作业处理流程

Metrics采集

数据服务搜集两类metrics,一类静态的,用于描述master/worker/client的基本信息;一类是动态的,描述master/worker的运行时信息。这里主要说明一下有关集群动态信息的采集过程及作用。以worker为例,当worker成功注册到master时,就会开启定时心跳汇报动作,并借道心跳请求,将自己的运行时信息汇报给master。这里主要是内存使用情况,例如当前worker通过估算方法,统计目前运行的任务占据了多少内存,以便master能够在后续的任务分发过程中,能够根据内存信息进行决策。master会统计它所管理的集群整个情况,例如每个任务队列的快照信息、worker的快照信息、集群的运行时配置信息等,并通过参数控制是否打印这些信息,以便调试。

调用情况

目前数据服务每天调用量:

to C: 9000W+/每天。

to B:150W+/每天(透传到执行engine端调用量)。

● ETL任务执行时间基本在3分钟左右完成;

● adhoc查询目前主要有Spark Thrift Server,Presto,Clickhouse 3种引擎,大部分SQL 90% 2s左右完成,Clickhouse查询99%在1s左右完成,Presto调用量50W+/日,Clickhouse调用量44W+/日。

解决的性能问题

数据服务主要解决SLA方面的问题。如人群计算、数据无缝迁移、数据产品SLA(数据产品SLA部分详情的参见《Clickhouse在唯品会数据产品SLA问题的探索》),这里用人群举例说明如下:

人群计算遇到的问题:

● 人群计算任务的数据本地性不好;

● HDFS存在数据热点问题;

● HDFS读写本身存在长尾现象。

数据服务改造新的架构方案:

● 计算与存储同置,这样数据就不需通过网络反复读取,造成网络流量浪费。

● 减少HDFS读写长尾对人群计算造成的额外影响,同时减少人群计算对于HDFS稳定性的影响。

● 广告人群计算介于线上生产任务跟离线任务之间的任务类型。这里我们希望能保证这类应用的可靠性和稳定性,从而更好地为公司业务赋能。

● *过数据服务执行人群计算。

图10. Alluxio和Spark集群混部

基于Alluxio的缓存表同步

将Hive表的location从HDFS路径替换为Alluxio路径,即表示该表的数据存储于Alluxio中。我们使用的方案不是直接写通过ETL任务写Alluxio表的数据,而是由Alluxio主动去拉取同样Hive表结构的HDFS中的数据,即我们创建了一个HDFS表的Alluxio缓存表。基于HDFS的人群计算底表的表结构如下:

    CREATE TABLE `hdfs.ads_tags_table`(
      `oaid_md5` string,
      `mid` string,
      `user_id` bigint,
      .........
      )
    PARTITIONED BY (
      `dt` string)
    LOCATION
      'hdfs://xxxx/hdfs.db/ads_tags_table'

基于Alluxio的人群计算底表的表结构如下:

    CREATE TABLE `alluxio.ads_tags_table`(
      `oaid_md5` string,
      `mid` string,
      `user_id` bigint,
      .........
      )
    PARTITIONED BY (
      `dt` string COMMENT '????')
    LOCATION
      'alluxio://zk@IP1:2181,IP2:2181/alluxio.db/ads_tags_table'

两个表结构的字段和分区定义完全相同。只有两处不同点:通过不同的库名区分了是HDFS的表还是Alluxio的表;location具体确认了数据存储的路径是HDFS还是Alluxio。

由于Alluxio不能感知到分区表的变化,我们开发了一个定时任务去自动感知源表的分区变化,使得Hive表的数据能够同步到Alluxio中。

具体步骤如下:

1. 定时任务发起轮询,检测源表是否有新增分区。

2. 发起一个SYN2ALLUXIO的任务由数据服务执行。

3. 任务执行脚本为将Alluxio表添加与HDFS表相同的分区。

4. 分区添加完成之后,Alluxio会自动从mount的HDFS路径完成数据同步。

图11. Alluxio缓存表同步

人群计算任务

上小节介绍了如何让Alluxio和HDFS的Hive表保持数据同步,接下来需要做的就是让任务计算的Spark任务跑在Spark与Alluxio混部的集群上,充分利用数据的本地性以及计算资源的隔离性,提高人群计算效率。

人群标签计算的SQL样例如下:

    INSERT INTO hive_advert.cd0000127760_full
    SELECT result_id, '20210703'
    FROM
          (SELECT oaid_md5 AS result_id
          FROM hdfs.ads_tags_table AS ta
          WHERE ta.dt = '20210702' and xxxxxxx) AS t

上面是一个Spark SQL的ETL,此处的hdfs.ads tags table即为人群计算依赖的底表,此表为一个HDFS location的表。

人群服务通过调用数据服务执行。数据服务根据底表分区是否同步到Alluxio决定是否需要下推是用Alluxio表来完成计算。如果底表数据已经同步到Alluxio,则使用Alluxio表来做为底表计算人群。

下推逻辑是用Alluxio的表名替换原表,假设此处缓存的Alluxio表名为alluxio.ads tags table,那么原SQL就会被改写成如下:

    INSERT INTO hive_advert.cd0000127760_full
    SELECT result_id, '20210703'
    FROM
          (SELECT oaid_md5 AS result_id
          FROM alluxio.ads_tags_table AS ta
          WHERE ta.dt = '20210702' and xxxxxxx) AS t

依靠数据服务调度系统,通过用户SQL改写以及Alluxio和Spark计算结点混部模式,人群计算任务提速了10%~30%,详见《唯品会实践案例:基于Alluxio优化电商平台热点数据访问性能》

小结

虽然截至今天,Hera数据服务已经支持了很多生产业务,但目前仍有很多需要完善的地方:

● 不同engine存在同一个含义函数写法不一致的情况。这种情况在Presto跟Click House的函数比较时尤为突出,如Presto的strpos(string,substring)函数,在Clickhouse中为position(haystack, needle[, start pos]),且这些函数的参数顺序存在不一致的情况,如何更优雅地支持不同engine的差异情况还需要进一步思考。

● 人群计算采用业界通用的Click House Bit Map解决方案落地,提升人群的计算效率同时扩展数据服务的业务边界。

● 数据服务支持调度的HA和灾备完善,更好地在K8s上进行部署。