Ansible 是一个 IT 自动化软件,类似于 Puppet 和 Chef,是实现 Infra-as-a-code 的一种工具。

QuickStart

理解 Ansible 如何控制远程服务器

Ansible 的所有操作是基于 ssh 协议的,我们可以通过 ssh 命令在远程服务器上执行命令:

$ ssh root@ip 'ping baidu.com'
PING baidu.com (39.156.69.79) 56(84) bytes of data.
64 bytes from 39.156.69.79 (39.156.69.79): icmp_seq=1 ttl=48 time=7.83 ms
64 bytes from 39.156.69.79 (39.156.69.79): icmp_seq=2 ttl=48 time=3.43 ms
64 bytes from 39.156.69.79 (39.156.69.79): icmp_seq=3 ttl=48 time=3.45 ms
64 bytes from 39.156.69.79 (39.156.69.79): icmp_seq=4 ttl=48 time=3.46 ms

在 Ansible 中编写的脚本最终也会被翻译成类似上面命令的方式被执行。

在 Ansible 执行的过程中,需要两种角色的节点,分别对应 ssh 的 client 和 server:

  • 管理节点(执行 ansible 命令的机器)
  • 托管节点(被 ansible 的管理的机器)

Ansible 的管理节点目前支持类 unix 系统,也就是 linux 和 macos。暂时不支持 windows 作为管理节点。管理节点可以是自己本地的电脑,但是由于私有化部署时我们面对的通常都是客户的内网环境,一般都无法在自己的开发机直接访问。为了简化使用方式,我们选择客户的一台服务器及作为管理节点也同时作为托管节点,所有 ansible 的操作总是从管理节点发起。

既然 Ansible 是基于 ssh 协议实现的,我们必须决定以哪个用户的身份登陆,以何种方式进行 ssh 认证,以及确保登陆用户有 sudo 的权限。ssh 常用的认证方式支持密码和证书两种,由于我们希望自动化部署,不想在部署过程中频繁的输入密码,因此配置证书认证是比较好的选择。另外很多客户环境下 sudo 也需要额外的密码,因此需要在初始化环境时配置 ssh 的登陆用户可以免密的进行 sudo 操作。

总结一下,我们需要:

  • 选择一台客户内网的服务器作为管理节点,该节点通常也是我们的托管节点
  • 对托管节点配置基于证书的 ssh 免密登陆
  • 确保 ssh 登陆用户可以免密进行 sudo 操作

Inventory File

Ansible 可同时操作属于一个组的多台主机,组和主机之间的关系通过 inventory 文件配置。默认的文件路径为 /etc/ansible/hosts 。不过我们通常不把 inventory file 放在系统目录下,而是放在单独的配置目录。

我们试着写一个 inventory file :

[webserver]
host1
host2

保存为 hosts ,然后通过 ansible 的命令行执行 adhoc 命令

$ ansible all -i hosts -m ping
host1 | UNREACHABLE! => {
    "changed": false,
    "msg": "Failed to connect to the host via ssh: ssh: Could not resolve hostname host1: nodename nor servname provided, or not known",
    "unreachable": true
}
host2 | UNREACHABLE! => {
    "changed": false,
    "msg": "Failed to connect to the host via ssh: ssh: Could not resolve hostname host2: nodename nor servname provided, or not known",
    "unreachable": true
}

其中 all<host-pattern> , all 是一个特殊的 pattern,匹配所有 hosts。提示报错是预期的结果,因为 host1, host2 并不存在。

Playbook,Role,Task

上一节展示了如何通过 ansible 执行 adhoc 命令,但是大部分时候我们用的更多的是 ansible-playbook 命令。Playbook 是 ansible 的任务编排语言,通过 YAML 来描述。关于 Playbook 和 role, task 的关系,一句话就可以说清楚:剧本 (playbook) 开始,角色们(role) 依次登上舞台,完成自己的任务 (task)。

playbook 一般是 ansible 执行的入口;role 更像是模块,是我们复用代码的基本单位;task 是一个个具体的实现。

先从一个不考虑复用的 playbook 开始:

- hosts: all
  tasks:
    - name: install vim
      yum:
        name: vim
        state: present

    - name: install jdk
      yum:
        name: openjdk
        state: present

这个 playbook 会在所有机器上安装 vim 和 openjdk。yum 是 ansible 提供的一个模块,ansible 拥有非常强大的生态,我们需要几乎所有操作都被 ansible 很好的封装了。所有模块的文档可以参考:https://docs.ansible.com/ansible/latest/modules/modules_by_category.html

随着 task 越来越多,playbook 会变得非常长而且不容易维护,中间如果有重复部分也难以实现代码复用。这个时候就轮到角色(role) 登场了。

编写可复用的脚本

role 从代码上看就是一个特定结构的目录,典型的 role 目录结构如下:

site.yml
webservers.yml
fooservers.yml
roles/
   common/
     files/
     templates/
     tasks/
     handlers/
     vars/
     defaults/
     meta/
   webservers/
     files/
     templates/
     tasks/
     handlers/
     vars/
     defaults/
     meta/

每个目录下默认的入口都是 main.yml 这个文件。然后我们可以在 playbook 中引入这个 role:

---
- hosts: webservers
  roles:
     - common
     - webservers

role 和 role 之间可以设置依赖关系,通过依赖我们可以隐式的执行某些 role。角色依赖需要定义在 meta/main.yml 文件中:

---
dependencies:
  - { role: common, some_parameter: 3 }
  - { role: apache, port: 80 }
  - { role: postgres, dbname: blarg, other_parameter: 12 }

当一个 playbook 包含多个 role,并且 role 依赖了共同的 role 时,可能会有重复执行的情况。ansible 有一些机制避免重复无意义的执行,但是该规则不是很容易直观理解。

变量

变量的定义

Inventory File

[all:vars]
foo1=bar
foo2=bar2

[group1]
host1
host2

[group1:vars]
foo3=bar3

PlayBook

---
- hosts: all
  vars:
    http_port: 80
  vars_prompt:
    - name: service_name
      prompt: "Which service do you want to remove?"
      private: no
  vars_files:
    - /vars/external_vars.yml

Role

有些变量只用于当前 role,可能会定义在 role 当中。一般位于 defaults/mian.yml 或者 vars/ 目录下。

具体的使用参见:https://docs.ansible.com/ansible/latest/user_guide/playbooks_reuse_roles.html

Ansible 预定义

部分变量是 Ansible 自己定义的变量,比如我们要获取机器的 hostname:

{{ ansible_facts['nodename'] }}

这部分预定义变量非常多,可以通过下面命令查看:

$ ansible hostname -m setup
{
    "ansible_all_ipv4_addresses": [
        "REDACTED IP ADDRESS"
    ],
    "ansible_all_ipv6_addresses": [
        "REDACTED IPV6 ADDRESS"
    ],
    "ansible_apparmor": {
        "status": "disabled"
    },
    "ansible_architecture": "x86_64",
    "ansible_bios_date": "11/28/2013"
        ......
}

Command Line

ansible-playbook site.yml --extra-vars='{"foo": "bar"}'

https://docs.ansible.com/ansible/latest/user_guide/playbooks_variables.html

变量的作用域

ansible 的变量作用域主要有 3 种:

  • Global:ansible 配置文件,环境变量,命令行
  • Play:playbook vars, vars_prompt, vars_files; role defaults, vars
  • Host: Inventory 中定义的的 host vars; facts

变量使用

ansible 允许你在各处通过 jinja2 的语法使用变量。jinja2 是一个用 Python 开发的模版引擎,本身并不复杂,核心东西就 3 个:变量的输出,控制流,filter

两个大括号包起来表示输出变量:

I'm {{ name }}

变量输出时可以用过 filter 控制格式,类似 bash 里的 pipeline。filter 本质上是一个 Python 的函数,有一个入参和一个返回结果:

I'm {{ name | trim | title }}

控制流需要区别于输出,用 {% %} 表示。比如一个 for 循环

{% for name in names %}
I'm {{ name | title }}
{% endfor %}

条件判断

{% for name in names %}
{% if ignore_case %}
I'm {{ name | lower }}
{% else %}
I'm {{ name }}
{% endif %}
{% endif %}

上面就是 jinja2 的介绍,更多细节需要去看文档。

需要注意的是,在 Ansible 中如果你要使用 jinja2 的语法去引用一个变量,必须用双引号内使用。

- hosts: all
  vars:
    deploy_path: "{{ home_dir }}/apps"

比如我们想根据各种上下文生成 nginx 的配置文件,可以通过 template 命令来渲染。首先定一个模版文件 nginx.conf.j2

server {
  server_name {{ server_name }};
  listen 80;
  
  location / {
    try_files $uri $uri/ /index.html;
  }
}

我们希望这个配置文件可以覆盖默认的 nginx 配置:

- hosts: nginx
  vars:
    server_name: gio.com
    nginx_user: nginx
    nginx_group: "{{ nginx_user }}"
  tasks:
    - name: generate nginx config file
      template:
        src: nginx.conf.j2
        dest: /etc/nginx/nginx.conf
        owner: "{{ nginx_user }}"
        group: "{{ nginx_group }}"
      become: yes

最佳实践

保持操作的幂等

shell 脚本在执行时,很多命令的执行结果不是确定的。很多时候我们无法避免的需要反复重试某些脚本:

  • 脚本自身有 bug,修完以后需要重试
  • 机器状态不确定。比如脚本执行过程中机器重启了;免密 sudo 忘记配置结果执行到某个需要 sudo 的 task 时跪了。
  • 命令本身执行结果就是不确定的,比如通过 systemd 启动一个进程,systemctl start 命令返回成功并不代表真的启动成功了。

ansible 的写法都是声明式而不是命令式。命令描述过程,声明式描述意图。明确的意图可以让 ansible 可以更好的帮你决定是否要重试某个任务,安全的隐藏细节。比如我们删除某个文件,命令式描述这样的:

- name: delete file
  command: rm /tmp/xxx

但是当我们重复跑这个 task 时,已经被删除文件无法再次被删除,需要在每次删除前检查目标文件是否已经被删除。但是用声明式的写法就很容易实现:

- name: delete file
  file:
    src: /tmp/xxx
    state: absent

编写 ansible 脚本时,要始终记得一件事:不要想着你要做什么操作,而是描述你期望某个对象的状态是怎么样的。

谨慎处理非 root 用户运行时的逻辑

ansible 的 task 有两个属性 becomebecome_user ,分别代表是否要使用 sudo 以及 sudo 的用户。比如创建一个目录并指定目录的 owner 和 group

- name: create dir
  file:
    path: /etc/foo
    state: directory
    owner: foo
    group: foo

如果我们以 root 用户的身份执行 ansible 脚本,上面的脚本没有任何问题。但是如果是一个有 sudo 权限的普通用户,如果没有显式使用 sudo 的话,没有权限在 /etc 目录下创建任何东西。这是时候就需要使用 become

- name: create dir
  file:
    path: /etc/foo
    state: directory
    owner: foo
    group: foo
  become: yes

测试脚本

检查语法:

ansible-playbook --syntax-check <playbook>

Dry-run:

ansible-playbook --check <playbook>

真实执行脚本:

由于测试可能需要反复执行,每次都申请服务器显然不现实,推荐本地用 VirtualBox + Vagrant 来进行测试。

首先定义 Vargrantfile,来创建 3 个虚拟机,hostname 是 hadoop[1-3]

$ cat Vagrantfile                                                                                                                                                    
# -*- mode: ruby -*-
# vi: set ft=ruby :

# All Vagrant configuration is done below. The "2" in Vagrant.configure
# configures the configuration version (we support older styles for
# backwards compatibility). Please don't change it unless you know what
# you're doing.
Vagrant.configure("2") do |config|
  config.vm.define 'hadoop01' do |hadoop01|
    hadoop01.vm.box = 'centos/7'
    hadoop01.vm.hostname = 'hadoop01'
    hadoop01.vm.network "forwarded_port", guest: 80, host: 8000
    hadoop01.vm.network :private_network, :ip => '192.168.10.192'
    hadoop01.vm.provision :hosts, :sync_hosts => true
    hadoop01.vm.provision :hosts, :add_localhost_hostnames => false
    hadoop01.vm.provider :virtualbox do |v|
      v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"]
      v.customize ["modifyvm", :id, "--memory", 2048]
      v.customize ["modifyvm", :id, "--name", "hadoop01"]
    end
  end

  config.vm.define 'hadoop02' do |hadoop02|
    hadoop02.vm.box = 'centos/7'
    hadoop02.vm.hostname = 'hadoop02'
    hadoop02.vm.network "forwarded_port", guest: 80, host: 8001
    hadoop02.vm.network :private_network, :ip => '192.168.10.193'
    hadoop02.vm.provision :hosts, :sync_hosts => true
    hadoop02.vm.provision :hosts, :add_localhost_hostnames => false
    hadoop02.vm.provider :virtualbox do |v|
      v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"]
      v.customize ["modifyvm", :id, "--memory", 2048]
      v.customize ["modifyvm", :id, "--name", "hadoop02"]
    end
  end

  config.vm.define 'hadoop03' do |hadoop03|
    hadoop03.vm.box = 'centos/7'
    hadoop03.vm.hostname = 'hadoop03'
    hadoop03.vm.network "forwarded_port", guest: 80, host: 8002
    hadoop03.vm.network :private_network, :ip => '192.168.10.194'
    hadoop03.vm.provision :hosts, :sync_hosts => true
    hadoop03.vm.provision :hosts, :add_localhost_hostnames => false
    hadoop03.vm.provider :virtualbox do |v|
      v.customize ["modifyvm", :id, "--natdnshostresolver1", "on"]
      v.customize ["modifyvm", :id, "--memory", 2048]
      v.customize ["modifyvm", :id, "--name", "hadoop03"]
    end
  end
end

启动服务器并配置 ssh

$ vagrant up

$ vagrat ssh-config

创建一个用于测试的 inventory file,然后执行 playbook:

ansible-playbook <playbook> -i <inventory_file>

单元测试:

ansible 的单元测试比较接近集成测试。有第一个第三方的框架可以支持:

https://molecule.readthedocs.io/en/latest/

细节比较多这里不单独介绍,执行也需要依赖 docker 或者 vagrant

Reference

背景

Hive 锁机制是为了让 Hive 支持并发读写而设计的 feature,另外要解决并发读写的情况下”脏读“ (Read uncommited)的问题。脏读的问题本身通过实现了原子的 reader/writer 已经得到解决(https://issues.apache.org/jira/browse/HIVE-829)和锁机制并不绑定。

锁机制

Hive 内部定义了两种类型的锁:

  • 共享锁(Share)
  • 互斥锁(Exclusive)

不同锁之间的兼容性入下面表格:

Lock Compatibility Existing Lock(S) Existing Lock(X)
Requested Lock(S) True False
Requested Lock(X) False False

锁的基本机制是:

  • 元信息和数据的变更需要互斥锁
  • 数据的读取需要共享锁

根据这个机制,Hive 的一些场景操作对应的锁级别如下:

Hive command Locks Acquired
select .. T1 partition P1 S on T1, T1.P1
insert into T2(partition P2) select .. T1 partition P1 S on T2, T1, T1.P1 and X on T2.P2
insert into T2(partition P.Q) select .. T1 partition P1 S on T2, T2.P, T1, T1.P1 and X on T2.P.Q
alter table T1 rename T2 X on T1
alter table T1 add cols X on T1
alter table T1 replace cols X on T1
alter table T1 change cols X on T1
alter table T1 concatenate X on T1
alter table T1 add partition P1 S on T1, X on T1.P1
alter table T1 drop partition P1 S on T1, X on T1.P1
alter table T1 touch partition P1 S on T1, X on T1.P1
alter table T1 set serdeproperties S on T1
alter table T1 set serializer S on T1
alter table T1 set file format S on T1
alter table T1 set tblproperties X on T1
alter table T1 partition P1 concatenate X on T1.P1
drop table T1 X on T1

Hive 锁在 zookeeper 上会对应 ephemeral 的节点,避免释放锁失败导致死锁

调试锁🔐

可以通过下面命令查看某个表或者分区的锁

  • SHOW LOCKS ;
  • SHOW LOCKS EXTENDED;
  • SHOW LOCKS PARTITION ();
  • SHOW LOCKS PARTITION () EXTENDED;

See also EXPLAIN LOCKS.

关闭锁机制

可以通过设置 hive.support.concurrency=fasle 来解决

关闭锁机制会造成下面影响:

  • 并发读写同一份数据时,读操作可能会随机失败
  • 并发写操作的结果在随机出现,后完成的任务覆盖之前完成任务的结果
  • SHOW LOCKS, UNLOCK TABLE 会报错

HiveLockManager 的实现

在关闭 Hive 锁的过程中,发现粗暴的禁用 concurrency 会导致 UNLOCK TABLE 语法报错。一些遗留系统已经依赖这个语法来确保自身任务不被阻塞,这样的修改会导致这些程序出现问题。于是转而研究有没有其他简单锁的实现可以达到类似的效果。粗看 Hive 的代码找到这 3 种实现:

  • DbLockManager 配合 DbTxnManager 用于在 Hive 中实现事务,不能单独使用
  • EmbeddedLockManager HiveServer 级别基于内存实现的锁
  • ZooKeeperHiveLockManager 默认的 LockManager 实现,基于 zookeeper 实现的分布式协调锁

Hive Zookeeper 锁泄露问题

在 cancel Hive 查询时,有概率发生 Zookeeper 锁释放失败的问题。因为 Hive 的锁在Zookeeper 是持久节点,累计的锁释放失败可能造成 Zookeeper 的 Node 数量过多,影响 Zookeeper 的性能。社区有对应的 ISSUE,该问题在 2.3.0 版本才被 FIX: https://issues.apache.org/jira/browse/HIVE-15997

HiveServer 上可以发现类似日志,就是锁释放失败的标志:

2019-03-06T07:41:56,556 ERROR [HiveServer2-Background-Pool: Thread-45399] ZooKeeperHiveLockManager: Failed to release ZooKeeper lock:
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method) ~[?:1.8.0_45]
        at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_45]
        at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[zookeeper-3.4.5-cdh5.5.0.jar:3.4.5-cdh5.5.0--1]
        at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:871) ~[zookeeper-3.4.5-cdh5.5.0.jar:3.4.5-cdh5.5.0--1]
        at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:239) ~[curator-framework-2.6.0.jar:?]
        at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:234) ~[curator-framework-2.6.0.jar:?]
        at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[curator-client-2.6.0.jar:?]
        at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:230) ~[curator-framework-2.6.0.jar:?]
        at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:215) ~[curator-framework-2.6.0.jar:?]
        at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:42) ~[curator-framework-2.6.0.jar:?]
        at org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager.unlockPrimitive(ZooKeeperHiveLockManager.java:474) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager.unlockWithRetry(ZooKeeperHiveLockManager.java:452) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager.unlock(ZooKeeperHiveLockManager.java:440) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager.releaseLocks(ZooKeeperHiveLockManager.java:222) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager.releaseLocks(DummyTxnManager.java:188) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.Driver.releaseLocksAndCommitOrRollback(Driver.java:1136) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.Driver.rollback(Driver.java:1516) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1456) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1171) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1166) [hive-exec-2.1.1.jar:2.1.1]
        at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:242) [hive-service-2.1.1.jar:2.1.1]
        at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91) [hive-service-2.1.1.jar:2.1.1]
        at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:334) [hive-service-2.1.1.jar:2.1.1]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_45]
        at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_45]
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) [hadoop-common-2.6.0-cdh5.5.0.jar:?]
        at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:347) [hive-service-2.1.1.jar:2.1.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_45]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]

锁泄露除了修复这个 ISSUE 以外比较难处理。在公司中,如果有成熟的调度器协调任务的依赖关系,那么非常建议禁用掉 Hive 的锁机制。在表数量众多,分区众多的场景下,使用 Zookeeper 的代价也是非常高的。

参考资料

引言

在大数据系统中,我们往往无法直接对在线系统中的数据直接进行检索和计算。在线系统所使用关系型数据库,缓存存储数据的方式都非常不同,很多系统不适合 OLAP 式的查询,也不允许 OLAP 查询影响到在线业务的稳定性。从数仓建设的角度思考,稳定规范的数仓必定依赖于稳定和规范的数据源,数据需要经过采集加工后才能真正被数仓所使用。推动数据同步服务的平台化,才有可能从源头规范数据的产出。数据同步服务不像数据挖掘一样可以直接产生价值,但它更像是连接不同存储的高速公路,好的同步工具可以很大程度上提升数据开发的效率。

本文主要介绍知乎在数据同步这方面的建设,工具选型和平台化的实践。

业务场景及架构

在线业务的数据库在知乎内部还是以 MySQL 和 HBase 为主,所以数据源方面主要考虑 MySQL 和 Hive 的互相同步,后续可以支持 HBase。早期数据同步使用 Oozie + Sqoop 来完成,基本满足业务需求。但是随着数仓任务不断变多,出现了很多重复同步的例子,对同步任务的负载管理也是空白。凌晨同步数据导致 MySQL 不断报警,DBA 苦不堪言。对于业务来说,哪些表已经被同步了,哪些表还没有也是一个黑盒子,依赖其他业务方的数据都只能靠口头的约定。为了解决这些问题,决定对数据同步做一个统一的平台,简化同步任务的配置,调度平衡负载,管理元信息等等。

技术选型

数据同步工具市面上有很多解决方案,面向批的主要有 Apache Sqoop 和阿里开源的 DataX,下面主要对比这两种数据同步工具。

Sqoop

Pros:

  • 基于 MapReduce 实现,容易并行和利用现有集群的计算资源
  • 和 Hive 兼容性好,支持 Parquet,ORC 等格式
  • 支持自动迁移 Schema
  • 社区强大,遇到的问题容易解决

Cons:

  • 支持的数据源不算太丰富(比如 ES),扩展难度大
  • 不支持限速,容易对 MySQL 造成压力
DataX

Pros:

  • 支持的数据源丰富尤其是支持从非关系型数据库到关系型数据库的同步
  • 支持限速
  • 扩展方便,插件开发难度低

Cons:

  • 需要额外的运行资源,当任务比较多的时候费机器
  • 没有原生支持导出到 Hive,需要做很多额外的工作才能满足需求

考虑到同步本身要消耗不少的计算和带宽资源,Sqoop 可以更好的利用 Hadoop 集群的资源,而且和 Hive 适配的更好,最终选择了 Sqoop 作为数据同步的工具。

平台化及实践

平台化的目标是构建一个相对通用的数据同步平台,更好的支持新业务的接入,和公司内部的系统集成,满足业务需求。平台初期设计的目标有以下几个:

  • 简单的任务配置界面,方便新的任务接入
  • 监控和报警
  • 屏蔽 MySQL DDL 造成的影响
  • 可扩展新数据源

简化任务接入

平台不应该要求每个用户都理解底层数据同步的原理,对用户而言,应该是描述数据源 (Source) 和目标存储(Sink),还有同步周期等配置。所有提供的同步任务应该经过审核,防止未经许可的数据被同步,或者同步配置不合理,增加平台负担。最后暴露给用户的 UI 大概如下图。

15421169494010

增量同步

对于数据量非常大的数据源,如果每次同步都是全量,对于 MySQL 的压力会特别大,同步需要的时间也会很长。因此需要一种可以每次只同步新增数据的机制,减少对于 MySQL 端的压力。但是增量同步不是没有代价的,它要求业务在设计表结构的时候,满足一些约束:

  • 业务对数据没有物理的删除操作,而是采用类似标记删除的机制
  • 数据没有 UPDATE (类似日志) 或者有 UPDATE 但是提供 updated_at 来标记每一行最后一次更新的时间

对于满足上面条件,数据量比较大的表就可以采用增量同步的方式拉取。小数据量的表不需要考虑增量同步,因为数据和合并也需要时间,如果收益不大就不应该引入额外的复杂性。一个经验值是行数 <= 2000w 的都属于数据量比较小的表,具体还取决于存储的数据内容(比如有很多 Text 类型的字段)。

处理 Schema 变更

做数据同步永远回避不掉的一个问题就是 Schema 的变更,对 MySQL 来说,Schema 变更就是数据库的 DDL 操作。数据同步平台应该尽可能屏蔽 MySQL DDL 对同步任务的影响,并且对兼容的变更,及时变更推送到目标存储。

数据同步平台会定时的扫描每个同步任务上游的数据源,保存当前 Schema 的快照,如果发现 Schema 发生变化,就通知下游做出一样的变更。绝大部分的 DDL 还是增加字段,对于这种情况数据同步平台可以很好屏蔽变更对数仓的影响。对于删除字段的操作原则上禁止的,如果一定要做,需要走变更流程,通知到依赖该表的业务方,进行 Schema 同步的调整。

存储格式

Hive 默认的格式是 Textfile,这是一种类似 CSV 的存储方式,但是对于 OLAP 查询来说性能并不友好。通常我们会选择一些列式存储提高存储和检索的效率。Hive 中比较成熟的列式存储格式有 Parquet 和 ORC。这两个存储的查询性能相差不大,但是 ORC 和 Hive 集成更好而且对于非嵌套数据结构查询性能是优于 Parquet 的。但是知乎内部因为也用了 Impala,早期的 Impala 版本不支持 ORC 格式的文件,为了兼容 Impala 最终选择了 Parquet 作为默认的存储格式。

关于列式存储的原理和 Benchmark,可以参考这个 Slide

负载管理

当同步任务越来越多时,单纯的按照任务启动时间来触发同步任务已经不能满足需求。数据同步应该保证对于线上业务没有影响,在此基础上速度越快越好。本质上是让 Sqoop 充分利用 MySQL 节点的 iops。要避免对线上服务的影响,对于需要数据同步的库单独建立一个从节点,隔离线上流量。初次之外,需要一个调度策略来决定一个任务何时执行。由于任务的总数量并不多,但是每个任务可能会执行非常长的时间,最终决定采用一个中央式的调度器,调度器的状态都持久化在数据库中,方便重启或者故障恢复。最终架构图如下

数据同步调度器

最终任务的调度流程如下:

  1. 每个 MySQL 实例是调度器的一个队列,根据同步的元信息决定该任务属于哪个队列
  2. 根据要同步数据量预估资源消耗,向调度器申请资源
  3. 调度器将任务提交到执行队列,没有意外的话会立刻开始执行
  4. Monitor 定时向调度器汇报 MySQL 节点的负载,如果负载过高就停止向该队列提交新的任务
  5. 任务结束后从调度器归还资源

性能优化

针对不同的数据源选择合适的并发数

Sqoop 是基于 MapReduce 实现的,提交任务前先会生成 MapReduce 代码,然后提交到 Hadoop 集群。Job 整体的并发度就取决于 Mapper 的个数。Sqoop 默认的并发数是 4,对于数据量比较大的表的同步显然是不够的,对于数据量比较小的任务又太多了,这个参数一定要在运行时根据数据源的元信息去动态决定。

优化 Distributed Cache 避免任务启动对 HDFS 的压力

在平台上线后,随着任务越来越多,发现如果 HDFS 的性能出现抖动,对同步任务整体的执行时间影响非常大,导致夜间的很多后继任务受到影响。开始推测是数据写入 HDFS 性能慢导致同步出现延时,但是任务大多数会卡在提交阶段。随着进一步排查,发现 MapReduce 为了解决不同作业依赖问题,引入了 Distributed Cache 机制可以将 Job 依赖的 lib 上传到 HDFS,然后再启动作业。Sqoop 也使用了类似的机制,会依赖 Hive 的相关 lib,这些依赖加起来有好几十个文件,总大小接近 150MB,虽然对于 HDFS 来说是很小数字,但是当同步任务非常多的时候,集群一点点的性能抖动都会导致调度器的吞吐大幅度下降,最终同步的产出会有严重延时。最后的解决方法是将 Sqoop 安装到集群中,然后通过 Sqoop 的参数 --skip-distcache 避免在任务提交阶段上传依赖的 jar。

关闭推测执行(Speculative Execution)

所谓推测执行是这样一种机制:在集群环境下运行 MapReduce,一个 job 下的多个 task 执行速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,这些任务将会成为整个 job 的短板。推测执行会对运行慢的 task 启动备份任务,然后以先运行完成的 task 的结果为准,kill 掉另外一个 task。这个策略可以提升 job 的稳定性,在一些极端情况下加快 job 的执行速度。

Sqoop 默认的分片策略是按照数据库的主键和 Mapper 数量来决定每个分片拉取的数据量。如果主键不是单调递增或者递增的步长有大幅波动,分片就会出现数据倾斜。对于一个数据量较大的表来说,适度的数据倾斜是一定会存在的情况,当 Mapper 结束时间不均而触发推测执行机制时,MySQL 的数据被重复且并发的读取,占用了大量 io 资源,也会影响到其他同步的任务。在一个 Hadoop 集群中,我们仍然认为一个节点不可用导致整个 MapReduce 失败仍然是小概率事件,对这种错误,在调度器上增加重试就可以很好的解决问题而不是依赖推测执行机制。

监控和报警

根据 USE 原则,大概整理出下面几个需要监控的指标:

  • MySQL 机器的负载,IOPS,出入带宽
  • 调度队列长度,Yarn 提交队列长度
  • 任务执行错误数

报警更多是针对队列饱和度和同步错误进行的

和离线作业调度器集成

展望

数据同步发展到比较多的任务后,新增的同步任务越来越多,删除的速度远远跟不上新增的速度,总体来说同步的压力会越来越大,需要一个更好的机制去发现无用的同步任务并通知业务删除,减轻平台的压力。

另外就是数据源的支持不够,Hive 和 HBase、ElasticSearch 互通已经成了一个呼声很强烈的需求。Hive 虽然可以通过挂外部表用 SQL 的方式写入数据,但是效率不高有很难控制并发,很容易影响到线上集群,需要有更好的实现方案才能在生产环境真正的运行起来。

另外这里没有谈到的一个话题就是流式数据如何做同步,一个典型的场景就是 Kafka 的日志实时落地然后实时进行 OLAP 的查询,或者通过 MySQL binlog 实时更新 ElasticSearch 的索引。关于这块的基础设置知乎也在快速建设中,非常欢迎感兴趣同学投简历到 ck@zhihu.com ,加入知乎大数据计算平台组。

参考资料

Sqoop 是一个数据同步工具,用于关系型数据库和各种大数据存储比如 Hive 之间的数据相互同步。Sqoop 因为它的使用便利得到了广泛使用。类似的工具还有阿里开源的 DataX 和其他商业工具。

Sqoop 2.0 主要解决 Sqoop 1.x 扩展难的问题,提出的 Server-Client 模型,具体用的不是特别多。本文主要介绍的还是 Sqoop 1.x,最新的 Sqoop 版本是 1.4.7

安装

Sqoop 安装需要依赖 Hadoop 和 Hive,以 Debain 为例,安装 Sqoop 也比较简单。

apt-get install hadoop hive hive-hbase hive-hcatalog sqoop

除此之外,针对不同的数据源,需要不同的 JDBC Driver,这个是 Sqoop 默认没有自带的库,需要自行安装。比如 MySQL 的 Driver 是 mysql-connector-java-5.1.13-bin.jar ,确保 Jar 包在 Sqoop 的 classpath 内就行。

数据源

Sqoop 支持非常多的数据源,理论上所有支持 JDBC 的数据源都可以作为 Sqoop 的数据源。最常见的场景还是从关系型数据(RDBMS)导入到 Hive, HBase 或者 HDFS。

Sqoop 的扩展性没有想象中的那么好,但是因为大部分企业的数据仓库还是构建在传统的 Hive 和 HBase 之上的,Sqoop 还是可以满足 80% 的数据同步需求的。

一个简单以 MySQL 作为上游数据源的同步:

sqoop import --connect jdbc:mysql://database.example.com/employees \
  --username dbuser --password "" 

Sqoop 支持将数据同步到 HDFS 或者直接到 Hive:

sqoop import --connect jdbc:mysql://database.example.com/employees \
  --username dbuser --password "" --table employees \
  --hive-import --hive-overwrite \
  --hive-database employees --hive-table employees

存储格式

存储格式主要是 Hive 的概念,但是对于数据同步来讲,格式的选择会影响同步数据,类型系统的兼容性等等,我们必须予以关注。参考下面的表格:

压缩比 预计算 类型兼容性
TextFile 一般
SequenceFile 一般
Parquet 是(sqoop 依赖的版本 feature 不完整)
ORC

Hive 默认的存储格式是 TextFile,TextFile 类似一个 CSV 文件,使用不可见服务分割列,同步后的数据可读性比较好。但是因为所有数据都是按文本存储的,对于某些类型(比如 blob/bit )无法支持。

Parquet/ORC 都是列式存储格式,这里不多介绍。在生产环境中更倾向于选择 Parquet/ORC ,节省空间的同时在 Hive 上的查询速度也更快。

同步为 Parquet 格式:

sqoop import --connect jdbc:mysql://database.example.com/employees \
 --username dbuser --password "" --table employees \
 --hive-import --hive-overwrite \
 --hive-database employees --hive-table employees \
 --as-parquetfile

如果要导出为 ORC 格式,需要借助 Hive 提供的一个组件 HCatalog,同步语法也稍稍不太一样

sqoop import --connect jdbc:mysql://database.example.com/employees \
 --username dbuser --password "" --table employees \
 --drop-and-create-hcatalog-table \
 --hcatalog-database employees --hcatalog-table employees \
 --hcatalog-storage-stanza "STORED AS ORC"

Parquet 理论上也可以通过这种方式同步,不过实测当前 Sqoop 版本 (1.4.7) 还是有 BUG,还是等等吧。

类型的兼容性

由于数据源支持的类型和 Hive 本身可能不太一样,所以必然存在类型转换的问题。实际在使用过程中也是非常头疼的一件事。对于 Hive 来说,支持的类型取决于采用的存储格式。以 MySQL 为例,当存储格式为 Hive 时,基本的类型映射如下:

MySQL(bigint) --> Hive(bigint) 
MySQL(tinyint) --> Hive(tinyint) 
MySQL(int) --> Hive(int) 
MySQL(double) --> Hive(double) 
MySQL(bit) --> Hive(boolean) 
MySQL(varchar) --> Hive(string) 
MySQL(decimal) --> Hive(double) 
MySQL(date/timestamp) --> Hive(string)

这里的类型映射并不完全准确,因为还取决于目标存储格式支持的类型。

由于 Text 格式非常类似 CSV,使用文本存储所有数据,对于 Binary/Blob 这样的类型就无法支持。Parquet/ORC/Avro 因为引入了序列化协议,本身存储是基于二进制的,所以可以支持绝大部分类型。

如果你在使用 TextFile 需要注意下面的问题:

  • 上游数据源中的 NULL 会被转化为字符串的 NULL, Hive 中的 NULL\N 表示
  • 如果内容中含有换行符,同步到 Hive 中会被当做独立的两行来处理,造成查询结果和实际数据不相符

处理方法比较简单

如果在使用 Parquet,要注意 Sqoop 自带的 Parquet 库版本比较旧,不支持 DateTime/Timestamp 类型的数据,而是会用一个表示 ms 的 BIGINT 来代替,分析数据的时候应该注意这点。

数据校验

Sqoop 内建有 validate 机制,只能验证单表的 row count: Sqoop User Guide (v1.4.3)

增量导入

对于数据量很大的库,全量同步会非常痛,但是如果可以选择还是尽可能的选择全量同步,这种同步模式对数据一致性的保证最好,没有状态。如果不得不进行增量同步,可以继续往后看。

增量导入对业务是有一定侵入的,Schema 的设计和数据写入模式需要遵守一定的规范:

  • 增量同步表,最好有一个 Primary Key ,最好是单调递增的 ID
  • 数据的写入模式满足下面两种情形之一
    • (Append)表的内容类似日志,一次写入不做修改和删除
    • (LastModified)表的内容有修改和删除,但是删除操作是逻辑删除,比如用 is_deleted 字段标识,并且有一个最后更新的时间戳比如 updated_atupdated_at 上有索引。

增量的数据同步大致分为 2 个阶段:读取增量数据和合并数据。对 Sqoop 来说,增量同步需要 sqoop-metastore 的支持,用于保存上次同步的位置。

比如对于 Append 模式,假设我们有一张表叫 employees,Primary Key 是 id,上一次同步到 id <= 10000 的数据:

sqoop import --connect jdbc:mysql://database.example.com/employees \
  --username dbuser --password "" --table employees \
  --target-dir <path/to/hive/table/location> \
  --incremental append --check-column id --last-value 10000

我们直接将数据 load 到了 Hive 的表空间里,Hive 可以直接查询到最新增量的数据。
对 LastModified 模式会稍微复杂一些,除了加载增量数据,还涉及数据合并的问题,这里唯一的主键就特别重要了。

sqoop import --connect jdbc:mysql://database.example.com/employees \
  --username dbuser --password "" --table employees \
  --target-dir <path/to/hive/table/location> \
  --incremental lastmodified --check-column updated_at --last-value '2018-07-05 00:00:00'

Sqoop 会在同步结束后再启动一个 merge 任务对数据去重,如果表太小,可能 merge 的代价比全量同步的还要高,我们就要慎重考虑全量同步是不是值得了。

由于 HDFS 不支持修改文件,sqoop 的 --incremental--hive-import 不能同时使用

Sqoop 也提供了单独的 sqoop merge 工具,我们也可以分开进行 import 和 merge 这两个步骤。

加速同步

这个小节讨论一下如何加快 Sqoop 的同步速度,Sqoop 同步速度大致取决于下面的几个因素:

  • 数据源的读取速度
  • HDFS 写入速度
  • 数据倾斜程度
数据源的读取速度

如果上游数据源是 MySQL,可以考虑更换 SSD,保证 MySQL 实例的负载不要太高。除此之外,Sqoop 可以通过参数控制并发读取的 Mapper 个数加快读取速度。

sqoop -m <mapper_num> ......

注意 -m 并不是越大越高,并发数过高会把数据库实例打死,同步速度反而变慢。

Sqoop 默认会通过 jdbc 的 API 来读取数据,但是可以通过参数控制使用 MySQL 自己的 mysqldump 来导出数据,这种方式比 jdbc 快一些,缺点是你不能选择要同步的列。另外只能支持目标格式为 Textfile。比较局限但是特定情况下还是很好使的。

HDFS 写入速度

这个除了刚刚提供的控制并发数,还需要保证 Yarn 分配给 Sqoop 的资源充足,不要让资源成为同步的瓶颈。另外,当我们选择 Parquet/ORC 作为存储格式时,数据在写入的时候需要做大量的预计算,这个过程是比较消耗 CPU 和内存的,我们可以控制 MapReduce 参数,适当提高 Sqoop 的资源配额。

sqoop -Dmapreduce.map.cpu.vcores=4 -Dmapreduce.map.memory.mb=8192 ...
数据倾斜

Sqoop 默认的导入策略是根据主键进行分区导入的,具体的并发粒度取决于 -m 参数。如果主键不连续出现大幅度跳跃,就会导致 Sqoop 导入的时候出现严重的数据倾斜。比如某张表的主键分布是这样的:

1
2
3
...
1000
1001
100000
100001

Sqoop 计算每个 Mapper 读取的数据范围的时候,会遵循很简单的公式计算:

range = (max(pk) - min(pk)) / mapper

几乎出现所有的数据 load 都集中在第一个 mapper 上,整体同步相当于没有并发。

参考阅读:

导出

Sqoop 支持将 Hive 的数据导出到 MySQL,方便在线系统调用。

sqoop export --connect jdbc:mysql://database.example.com/employees --table employees --username dbuser --password "" --relaxed-isolation --update-key id --update-mode allowinsert --hcatalog-database employees --hcatalog-table employees

借助 HCatalog 可以比较轻松的将 Hive 表的数据直接导出到 MySQL。更多的详情参考官方文档,这里不多介绍。

更进一步

如果我们要同步的数据非常多,管理同步任务本身就变成了一件复杂的事情。我们不仅要考虑源数据库的负载,安全性。还要考虑同步任务的启动时间,Schema 变更等等问题。实际使用的时候,我们在内部自研了一个平台,管理 MySQL 和 Hive 的数据源并对 Sqoop 任务做了调度。有一部分功能在 Sqoop 2.0 已经实现了。在大规模使用 sqoop 一定要想清楚运维的问题。

Reference

使用 Hadoop 软件好像难免会自己改下代码做些定制,或者在部分组件的版本选择上激进,其他的版本( 比如 HDFS) 上保守。最近在公司升级 Hive 到 2.1.1 ,也对代码做了一些调整确保对业务的兼容性,之前公司使用的是 hive-1.2.2-cdh-5.5.0 。cdh 的发布节奏太慢跟不上社区的节奏,而且截止到现在,社区版本的 BUG 数量和稳定性都可以接受而不是必须选择商业公司给我们提供的发行版。

公司用的服务器是 Debian 7/8,为了方便的把定制过的 Hive 部署到服务器,需要将 Hive 打包成 deb,一直没找到特别好的打包方法。要做到 Cloudera 那样规范的 deb 非常繁琐,要处理启动脚本,环境变量,配置文件的 alternatives 等等。顺着这个思路找到了 Cloudera 官方的打包工具 cdh-package ,但是这个库已经太长时间没有维护了,里面依赖的版本信息非常老旧,而且自己测试也没运行成功。但是 cdh-package 是基于 BigTop 的,BigTop 本身的维护还不错。

Bigtop 非常有野心,它的主要目标就是构建一个 Apache Hadoop 生态系统的包和交互式测试的社区。这个包括对各类不同级别工程进行测试(包,平台,运行时间,升级等...),它由社区以关注系统作为一个整体开发而来。BigTop 官方除了介绍怎么安装之外没有任何使用文档,不过研究以后发现还算简单,不需要太多的说明。

准备 BigTop 环境

可以根据官方的说明来安装,我这里是直接从 Github 拉了代码:

git clone https://github.com/apache/bigtop.git
cd bigtop
git checkout release-1.2.1
./gradlew

然后我们可以运行 ./gradlew tasks 看下 BigTop 给我们提供的命令,命令遵循下面的格式 ./gradlew <package>-<dist>

$ ./gradlew tasks
# hide some output
hive-clean - Removing hive component build and output directories
hive-deb - Building DEB for hive artifacts
hive-download - Download hive artifacts
hive-help - List of available tasks for hive
hive-info - Info about hive component build
hive-pkg - Invoking a native binary packaging target deb
hive-relnotes - Preparing release notes for hive. No yet implemented!!!
hive-rpm - Building RPM for hive artifacts
hive-sdeb - Building SDEB for hive artifacts
hive-spkg - Invoking a native binary packaging target sdeb
hive-srpm - Building SRPM for hive artifacts
hive-tar - Preparing a tarball for hive artifacts
hive-version - Show version of hive component

然后编辑 bigtop.bom 将依赖的版本改成自己需要的版本,注意 BigTop 这里会优先使用 bigtop.bom 中定义的版本号覆盖源代码的版本号。

'hive' {
      name    = 'hive'
      relNotes = 'Apache Hive'
      version { base = '1.2.1'; pkg = base; release = 1 }
      tarball { destination = "apache-${name}-${version.base}-src.tar.gz"
                source      = destination }
      url     { download_path = "/$name/$name-${version.base}/"
                site = "${apache.APACHE_MIRROR}/${download_path}"
                archive = "${apache.APACHE_ARCHIVE}/${download_path}" }
    }

下面将介绍如何用 BigTop 打包 Hive

用 BigTop 打包 Hive

我们的目标是将一份修改过的 Hive 代码打包成 deb 包分发到集群,首先在编辑机器上准备一些必要的依赖:

sudo apt-get update
sudo apt-get install devscripts
sudo apt-get install dh-make

接下来准备 Hive 的代码,Bigtop 默认根据 bom 文件里指定的版本号从上游下载 Hive 的代码,解压然后编译。但是由于我们要使用自己修改过的版本,可以修改 bigtop.bom 从内部 git 仓库下载代码。

  'hive' {
      name    = 'hive'
      relNotes = 'Apache Hive'
      version { base = '2.1.1'; pkg = base; release = 1 }
      tarball { destination = "apache-${name}-${version.base}-src.tar.gz"
                source      = destination }
      git     { repo = "https://exmaple.com:hive/hive"
                ref = "release-2.1.1"
                dir  = "${name}-${version.base}" }
    }

然后就可以开始打包了:

./gradlew hive-deb

注意 BigTop 在它的仓库里包含了对 Hive 的几个 patch 文件,我这边测试的时候会导致编译失败,建议删除:

rm -f /bigtop-packages/src/common/hive/*

然后清理构建环境,重新打包:

./gradlew hive-clean
./gradlew hive-deb

如果需要更新包,可以提升 Release number,默认是 1 ,这个 BigTop 在文档里没有提及:

BIGTOP_BUILD_STAMP=<release> ./gradlew hive-deb

发布 deb 包

看看我们构建的结果,产生了很多 deb 包,这些包都需要上传到内部的 mirror

$ ls -l output/hive/
total 138516
-rw-r--r-- 1 ck ck 78645700 Feb  1 18:32 hive_2.1.1-1_all.deb
-rw-r--r-- 1 ck ck   314946 Feb  1 18:33 hive_2.1.1-1_amd64.build
-rw-r--r-- 1 ck ck     3461 Feb  1 18:32 hive_2.1.1-1_amd64.changes
-rw-r--r-- 1 ck ck    12500 Feb  1 18:18 hive_2.1.1-1.debian.tar.xz
-rw-r--r-- 1 ck ck     1227 Feb  1 18:18 hive_2.1.1-1.dsc
-rw-r--r-- 1 ck ck     1829 Feb  1 18:18 hive_2.1.1-1_source.changes
-rw-r--r-- 1 ck ck 20999949 Feb  1 18:18 hive_2.1.1.orig.tar.gz
-rw-r--r-- 1 ck ck   107906 Feb  1 18:32 hive-hbase_2.1.1-1_all.deb
-rw-r--r-- 1 ck ck   452862 Feb  1 18:32 hive-hcatalog_2.1.1-1_all.deb
-rw-r--r-- 1 ck ck     3632 Feb  1 18:32 hive-hcatalog-server_2.1.1-1_all.deb
-rw-r--r-- 1 ck ck 39029552 Feb  1 18:32 hive-jdbc_2.1.1-1_all.deb
-rw-r--r-- 1 ck ck     3734 Feb  1 18:32 hive-metastore_2.1.1-1_all.deb
-rw-r--r-- 1 ck ck     3738 Feb  1 18:32 hive-server2_2.1.1-1_all.deb
-rw-r--r-- 1 ck ck  2068240 Feb  1 18:32 hive-webhcat_2.1.1-1_all.deb
-rw-r--r-- 1 ck ck     3608 Feb  1 18:32 hive-webhcat-server_2.1.1-1_all.deb