大数据平台的数据同步服务实践

1 minute read

大数据平台的数据同步服务实践

引言

在大数据系统中,我们往往无法直接对在线系统中的数据直接进行检索和计算。在线系统所使用关系型数据库,缓存存储数据的方式都非常不同,很多系统不适合 OLAP 式的查询,也不允许 OLAP 查询影响到在线业务的稳定性。从数仓建设的角度思考,稳定规范的数仓必定依赖于稳定和规范的数据源,数据需要经过采集加工后才能真正被数仓所使用。推动数据同步服务的平台化,才有可能从源头规范数据的产出。数据同步服务不像数据挖掘一样可以直接产生价值,但它更像是连接不同存储的高速公路,好的同步工具可以很大程度上提升数据开发的效率。

本文主要介绍知乎在数据同步这方面的建设,工具选型和平台化的实践。

业务场景及架构

在线业务的数据库在知乎内部还是以 MySQL 和 HBase 为主,所以数据源方面主要考虑 MySQL 和 Hive 的互相同步,后续可以支持 HBase。早期数据同步使用 Oozie + Sqoop 来完成,基本满足业务需求。但是随着数仓任务不断变多,出现了很多重复同步的例子,对同步任务的负载管理也是空白。凌晨同步数据导致 MySQL 不断报警,DBA 苦不堪言。对于业务来说,哪些表已经被同步了,哪些表还没有也是一个黑盒子,依赖其他业务方的数据都只能靠口头的约定。为了解决这些问题,决定对数据同步做一个统一的平台,简化同步任务的配置,调度平衡负载,管理元信息等等。

技术选型

数据同步工具市面上有很多解决方案,面向批的主要有 Apache Sqoop 和阿里开源的 DataX,下面主要对比这两种数据同步工具。

Sqoop

Pros:

  • 基于 MapReduce 实现,容易并行和利用现有集群的计算资源
  • 和 Hive 兼容性好,支持 Parquet,ORC 等格式
  • 支持自动迁移 Schema
  • 社区强大,遇到的问题容易解决

Cons:

  • 支持的数据源不算太丰富(比如 ES),扩展难度大
  • 不支持限速,容易对 MySQL 造成压力
DataX

Pros:

  • 支持的数据源丰富尤其是支持从非关系型数据库到关系型数据库的同步
  • 支持限速
  • 扩展方便,插件开发难度低

Cons:

  • 需要额外的运行资源,当任务比较多的时候费机器
  • 没有原生支持导出到 Hive,需要做很多额外的工作才能满足需求

考虑到同步本身要消耗不少的计算和带宽资源,Sqoop 可以更好的利用 Hadoop 集群的资源,而且和 Hive 适配的更好,最终选择了 Sqoop 作为数据同步的工具。

平台化及实践

平台化的目标是构建一个相对通用的数据同步平台,更好的支持新业务的接入,和公司内部的系统集成,满足业务需求。平台初期设计的目标有以下几个:

  • 简单的任务配置界面,方便新的任务接入
  • 监控和报警
  • 屏蔽 MySQL DDL 造成的影响
  • 可扩展新数据源

简化任务接入

平台不应该要求每个用户都理解底层数据同步的原理,对用户而言,应该是描述数据源 (Source) 和目标存储(Sink),还有同步周期等配置。所有提供的同步任务应该经过审核,防止未经许可的数据被同步,或者同步配置不合理,增加平台负担。最后暴露给用户的 UI 大概如下图。

15421169494010-w600

增量同步

对于数据量非常大的数据源,如果每次同步都是全量,对于 MySQL 的压力会特别大,同步需要的时间也会很长。因此需要一种可以每次只同步新增数据的机制,减少对于 MySQL 端的压力。但是增量同步不是没有代价的,它要求业务在设计表结构的时候,满足一些约束:

  • 业务对数据没有物理的删除操作,而是采用类似标记删除的机制
  • 数据没有 UPDATE (类似日志) 或者有 UPDATE 但是提供 updated_at 来标记每一行最后一次更新的时间

对于满足上面条件,数据量比较大的表就可以采用增量同步的方式拉取。小数据量的表不需要考虑增量同步,因为数据和合并也需要时间,如果收益不大就不应该引入额外的复杂性。一个经验值是行数 <= 2000w 的都属于数据量比较小的表,具体还取决于存储的数据内容(比如有很多 Text 类型的字段)。

处理 Schema 变更

做数据同步永远回避不掉的一个问题就是 Schema 的变更,对 MySQL 来说,Schema 变更就是数据库的 DDL 操作。数据同步平台应该尽可能屏蔽 MySQL DDL 对同步任务的影响,并且对兼容的变更,及时变更推送到目标存储。

数据同步平台会定时的扫描每个同步任务上游的数据源,保存当前 Schema 的快照,如果发现 Schema 发生变化,就通知下游做出一样的变更。绝大部分的 DDL 还是增加字段,对于这种情况数据同步平台可以很好屏蔽变更对数仓的影响。对于删除字段的操作原则上禁止的,如果一定要做,需要走变更流程,通知到依赖该表的业务方,进行 Schema 同步的调整。

存储格式

Hive 默认的格式是 Textfile,这是一种类似 CSV 的存储方式,但是对于 OLAP 查询来说性能并不友好。通常我们会选择一些列式存储提高存储和检索的效率。Hive 中比较成熟的列式存储格式有 Parquet 和 ORC。这两个存储的查询性能相差不大,但是 ORC 和 Hive 集成更好而且对于非嵌套数据结构查询性能是优于 Parquet 的。但是知乎内部因为也用了 Impala,早期的 Impala 版本不支持 ORC 格式的文件,为了兼容 Impala 最终选择了 Parquet 作为默认的存储格式。

关于列式存储的原理和 Benchmark,可以参考这个 Slide

负载管理

当同步任务越来越多时,单纯的按照任务启动时间来触发同步任务已经不能满足需求。数据同步应该保证对于线上业务没有影响,在此基础上速度越快越好。本质上是让 Sqoop 充分利用 MySQL 节点的 iops。要避免对线上服务的影响,对于需要数据同步的库单独建立一个从节点,隔离线上流量。初次之外,需要一个调度策略来决定一个任务何时执行。由于任务的总数量并不多,但是每个任务可能会执行非常长的时间,最终决定采用一个中央式的调度器,调度器的状态都持久化在数据库中,方便重启或者故障恢复。最终架构图如下

数据同步调度器-w336

最终任务的调度流程如下:

  1. 每个 MySQL 实例是调度器的一个队列,根据同步的元信息决定该任务属于哪个队列
  2. 根据要同步数据量预估资源消耗,向调度器申请资源
  3. 调度器将任务提交到执行队列,没有意外的话会立刻开始执行
  4. Monitor 定时向调度器汇报 MySQL 节点的负载,如果负载过高就停止向该队列提交新的任务
  5. 任务结束后从调度器归还资源

性能优化

针对不同的数据源选择合适的并发数

Sqoop 是基于 MapReduce 实现的,提交任务前先会生成 MapReduce 代码,然后提交到 Hadoop 集群。Job 整体的并发度就取决于 Mapper 的个数。Sqoop 默认的并发数是 4,对于数据量比较大的表的同步显然是不够的,对于数据量比较小的任务又太多了,这个参数一定要在运行时根据数据源的元信息去动态决定。

优化 Distributed Cache 避免任务启动对 HDFS 的压力

在平台上线后,随着任务越来越多,发现如果 HDFS 的性能出现抖动,对同步任务整体的执行时间影响非常大,导致夜间的很多后继任务受到影响。开始推测是数据写入 HDFS 性能慢导致同步出现延时,但是任务大多数会卡在提交阶段。随着进一步排查,发现 MapReduce 为了解决不同作业依赖问题,引入了 Distributed Cache 机制可以将 Job 依赖的 lib 上传到 HDFS,然后再启动作业。Sqoop 也使用了类似的机制,会依赖 Hive 的相关 lib,这些依赖加起来有好几十个文件,总大小接近 150MB,虽然对于 HDFS 来说是很小数字,但是当同步任务非常多的时候,集群一点点的性能抖动都会导致调度器的吞吐大幅度下降,最终同步的产出会有严重延时。最后的解决方法是将 Sqoop 安装到集群中,然后通过 Sqoop 的参数 --skip-distcache 避免在任务提交阶段上传依赖的 jar。

关闭推测执行(Speculative Execution)

所谓推测执行是这样一种机制:在集群环境下运行 MapReduce,一个 job 下的多个 task 执行速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,这些任务将会成为整个 job 的短板。推测执行会对运行慢的 task 启动备份任务,然后以先运行完成的 task 的结果为准,kill 掉另外一个 task。这个策略可以提升 job 的稳定性,在一些极端情况下加快 job 的执行速度。

Sqoop 默认的分片策略是按照数据库的主键和 Mapper 数量来决定每个分片拉取的数据量。如果主键不是单调递增或者递增的步长有大幅波动,分片就会出现数据倾斜。对于一个数据量较大的表来说,适度的数据倾斜是一定会存在的情况,当 Mapper 结束时间不均而触发推测执行机制时,MySQL 的数据被重复且并发的读取,占用了大量 io 资源,也会影响到其他同步的任务。在一个 Hadoop 集群中,我们仍然认为一个节点不可用导致整个 MapReduce 失败仍然是小概率事件,对这种错误,在调度器上增加重试就可以很好的解决问题而不是依赖推测执行机制。

监控和报警

根据 USE 原则,大概整理出下面几个需要监控的指标:

  • MySQL 机器的负载,IOPS,出入带宽
  • 调度队列长度,Yarn 提交队列长度
  • 任务执行错误数

报警更多是针对队列饱和度和同步错误进行的

和离线作业调度器集成

数据同步平台有一个重要的问题就是处理数据依赖问题。数据同步的数据源一般是整个批计算 DAG 的最上游节点。数据同步平台和调度器之间需要一个方式来通知对方某个数据源已经同步完成,并进行下游任务的拉起。在批计算中,一般是通过对某个分区打标记的方式实现的,比如 Hive 和 Spark 中常用的 _SUCCESS 文件。但是这种方式的问题非常明显:

  • 检查依赖时需要频繁的 polling HDFS 目录的状态,当任务非常多的时候,对 HDFS 集群很不友好
  • _SUCCESS 文件是基于分区的,对没有分区的数据源不适用。同样难以适用于时刻变化的数据源,比如通过 Flink Sink 的实时表
  • 难以被 API 化,和更多其他系统集成

由于数据同步平台还承载了一些实时数据源的同步需求,我们希望对于数据源依赖的描述在流和批上是一致的。因此我们借鉴了 Flink 中 LOW Watermark 的概念,将依赖的检查转变为对数据源 watermark 的检查。并通过 API 暴露了统一的依赖检查接口。

比如 2018-10-10 凌晨一个天级别的同步任务完成,我们就更新 watermark 标识:

watermark(2018-10-09 00:00:00) -> watermark(2018-10-10 00:00:00) 

对依赖的检查也从对标志的检查转变为 watermark 检查。比如一个计算任务依赖 dt=2018-10-10 的数据,只要确保依赖分区代表的时间戳超过了被依赖数据源的 watermark 即可。这种表达方式适用于天和小时级别的表,也可以轻松扩展到实时数据源上。

展望

数据同步发展到比较多的任务后,新增的同步任务越来越多,删除的速度远远跟不上新增的速度,总体来说同步的压力会越来越大,需要一个更好的机制去发现无用的同步任务并通知业务删除,减轻平台的压力。

另外就是数据源的支持不够,Hive 和 HBase、ElasticSearch 互通已经成了一个呼声很强烈的需求。Hive 虽然可以通过挂外部表用 SQL 的方式写入数据,但是效率不高有很难控制并发,很容易影响到线上集群,需要有更好的实现方案才能在生产环境真正的运行起来。

另外这里没有谈到的一个话题就是流式数据如何做同步,一个典型的场景就是 Kafka 的日志实时落地然后实时进行 OLAP 的查询,或者通过 MySQL binlog 实时更新 ElasticSearch 的索引。关于这块的基础设置知乎也在快速建设中,非常欢迎感兴趣同学投简历到 ck@zhihu.com ,加入知乎大数据计算平台组。

参考资料

Updated: