Hive实现时间拉链功能

xiaoxiao2021-02-28  94

背景:

在数据仓库的数据模型设计过程中,经常会遇到如下的业务需求: 1. 表的数据量很大,大几千万或上亿; 2. 表中的部分字段会被update更新操作,如用户的上级领导,产品的描述信息,订单的状态等等; 3. 需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态; 4. 变化的比例和频率不是很大,比如,总共有8000万的用户,每天新增和发生变化的有30万左右; 5. 如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费; 6. 时间拉链历史表,既能满足反应数据的历史状态,又可以最大程度的节省存储空间。

一. 演示的数据

2015-08-21 以及之前的订单表数据:

1|2015-08-18|2015-08-18|创建

2|2015-08-18|2015-08-18|创建

3|2015-08-19|2015-08-21|支付

4|2015-08-19|2015-08-21|完成

5|2015-08-19|2015-08-20|支付

6|2015-08-20|2015-08-20|创建

7|2015-08-20|2015-08-21|支付

8|2015-08-21|2015-08-21|创建

2015-08-22 订单表数据:

1|2015-08-18|2015-08-22|支付

2|2015-08-18|2015-08-22|完成

6|2015-08-20|2015-08-22|支付

8|2015-08-21|2015-08-22|支付

9|2015-08-22|2015-08-22|创建

10|2015-08-22|2015-08-22|支付

2015-08-23 订单表数据:

1|2015-08-18|2015-08-23|完成

3|2015-08-19|2015-08-23|完成

5|2015-08-19|2015-08-23|完成

8|2015-08-21|2015-08-23|完成

11|2015-08-23|2015-08-23|创建

12|2015-08-23|2015-08-23|创建

13|2015-08-23|2015-08-23|支付

将上面所有的数据全部保存到如下文件中:

/home/Hadoop/hivetestdata/Time_zipper/orders.txt

二. 表结构

源系统中订单表结构:

[sql]  view plain  copy use timezipper;   create table orders (       orderid int,       createtime string,       modifiedtime string,       status string   ) row format delimited fields terminated by '|';      load data local inpath "file:///home/hadoop/hivetestdata/Time_zipper/orders.txt" into table orders;  

在数据仓库的ODS层,有一张订单的增量数据表,按天分区,存放每天的增量数据:

[sql]  view plain  copy create table t_ods_orders_inc (       orderid int,       createtime string,       modifiedtime string,       status string   ) partitioned by (day string) row format delimited fields terminated by '|';  

在数据仓库的DW层,有一张订单的历史数据拉链表,存放订单的历史状态数据:

[sql]  view plain  copy create table t_dw_orders_his (       orderid int,       createtime string,       modifiedtime string,       status string,       dw_start_date string,       dw_end_date string   );  

三. 全量数据初始化

在数据从源业务系统每天正常抽取和刷新到DW订单历史表之前,

需要做一次全量的初始化,就是从源订单表中指定某一天以前的数据全部抽取到ODW,并刷新到DW。

以上面的数据为例,比如在2015-08-22这天做全量初始化,那么我需要将2015-08-21以及之前的所有的数据都抽取并刷新到DW,步骤如下:

第一步:抽取全量数据到ODS:

[sql]  view plain  copy use timezipper;   INSERT overwrite TABLE t_ods_orders_inc PARTITION (day='2015-08-21')   SELECT orderid,createtime,modifiedtime,status   FROM orders   WHERE modifiedtime <= '2015-08-21';  

第二步:从ODS刷新到DW:

[sql]  view plain  copy use timezipper;   INSERT overwrite TABLE t_dw_orders_his   SELECT orderid,createtime,modifiedtime,status,   createtime AS dw_start_date,   '9999-12-31' AS dw_end_date   FROM t_ods_orders_inc   WHERE day = '2015-08-21';  

四. 增量抽取历史数据并计算生成时间拉链结果表

从2015-08-23开始,需要每天正常刷新前一天(2015-08-22)的增量数据到历史表,步骤如下:

第一步:通过增量抽取,将2015-08-22的数据抽取到ODS:

[sql]  view plain  copy INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = '2015-08-22')   SELECT orderid,createtime,modifiedtime,status   FROM orders   WHERE modifiedtime = '2015-08-22';  

第二步:通过DW历史数据(数据日期为2015-08-21(包含2015-08-21以及之前的数据)),和ODS增量数据(2015-08-22),刷新历史表:

先把数据放到一张临时表中:

[sql]  view plain  copy drop table if exists t_dw_orders_his_tmp;   create table t_dw_orders_his_tmp as    select  oo.orderid,          oo.createtime,          oo.modifiedtime,          oo.status,          oo.dw_start_date,          oo.dw_end_date   from (select           tt.orderid,          tt.createtime,          tt.modifiedtime,          tt.status,          tt.dw_start_date,          tt.dw_end_date,          row_number() over(distribute by tt.orderid,tt.createtime,tt.modifiedtime,status sort by tt.dw_end_date desc) rn       from (            select a.orderid,                   a.createtime,                   a.modifiedtime,                   a.status,                   a.dw_start_date,                   -- 2015-08-21保存的都是历史数据                   case when b.orderid is not null and a.dw_end_date > '2015-08-22' then '2015-08-21'                         else a.dw_end_date end as dw_end_date                    from t_dw_orders_his a                    left outer join (select * from t_ods_orders_inc where day = '2015-08-22') b                    on (a.orderid = b.orderid)                      union all                    select orderid,                  createtime,                  modifiedtime,                  status,                  modifiedtime as dw_start_date,                  '9999-12-31' as dw_end_date              from t_ods_orders_inc            where day = '2015-08-22') tt    ) oo where oo.rn =1;  

说明:

UNION ALL的两个结果集中,

第一个是用历史表left outer join 日期为 ${yyy-MM-dd} 的增量表,能关联上的,并且dw_end_date > ${yyy-MM-dd},说明状态有变化,则把原来的dw_end_date置为(${yyy-MM-dd} – 1), 

关联不上的,说明状态无变化,dw_end_date无变化。

第二个结果集是直接将增量数据插入历史表,并将dw_end_date设为9999-12-31

最后把临时表中数据插入历史表:

[sql]  view plain  copy INSERT overwrite TABLE t_dw_orders_his   SELECT * FROM t_dw_orders_his_tmp;  

五. 同上面的步骤一样,增量抽取历史数据并计算生成时间拉链结果表

再看将2015-08-23的增量数据刷新到历史表:

[sql]  view plain  copy insert overwrite table t_ods_orders_inc partition (day = '2015-08-23')    select orderid,createtime,modifiedtime,status    from orders    where modifiedtime = '2015-08-23';       drop table if exists t_dw_orders_his_tmp;   create table t_dw_orders_his_tmp as    select  oo.orderid,          oo.createtime,          oo.modifiedtime,          oo.status,          oo.dw_start_date,          oo.dw_end_date   from (select           tt.orderid,          tt.createtime,          tt.modifiedtime,          tt.status,          tt.dw_start_date,          tt.dw_end_date,          row_number() over(distribute by tt.orderid,tt.createtime,tt.modifiedtime,status sort by tt.dw_end_date desc) rn   from (        select a.orderid,               a.createtime,               a.modifiedtime,               a.status,               a.dw_start_date,               -- 2015-08-20保存的都是历史数据               case when b.orderid is not null and a.dw_end_date > '2015-08-23' then '2015-08-22'                     else a.dw_end_date end as dw_end_date                from t_dw_orders_his a                left outer join (select * from t_ods_orders_inc where day = '2015-08-23') b                on (a.orderid = b.orderid)              union all            select orderid,              createtime,              modifiedtime,              status,              modifiedtime as dw_start_date,              '9999-12-31' as dw_end_date          from t_ods_orders_inc        where day = '2015-08-23') tt    ) oo where oo.rn =1;           insert overwrite table t_dw_orders_his    select * from t_dw_orders_his_tmp;  

六. 查看上面步骤生成的时间拉链结果表

按照上面的方法刷新完后,生成的时间拉链的历史表数据如下:

1 2015-08-18 2015-08-18 创建 2015-08-182015-08-21 1 2015-08-18 2015-08-22 支付 2015-08-222015-08-22 1 2015-08-18 2015-08-23 完成 2015-08-239999-12-31 2 2015-08-18 2015-08-18 创建 2015-08-182015-08-21 2 2015-08-18 2015-08-22 完成 2015-08-229999-12-31 3 2015-08-19 2015-08-21 支付 2015-08-192015-08-22 3 2015-08-19 2015-08-23 完成 2015-08-239999-12-31 4 2015-08-19 2015-08-21 完成 2015-08-199999-12-31 5 2015-08-19 2015-08-20 支付 2015-08-192015-08-22 5 2015-08-19 2015-08-23 完成 2015-08-239999-12-31 6 2015-08-20 2015-08-20 创建 2015-08-202015-08-21 6 2015-08-20 2015-08-22 支付 2015-08-229999-12-31 7 2015-08-20 2015-08-21 支付 2015-08-209999-12-31 8 2015-08-21 2015-08-21 创建 2015-08-21 2015-08-21 8 2015-08-21 2015-08-22 支付 2015-08-22 2015-08-22 8 2015-08-21 2015-08-23 完成 2015-08-23 9999-12-31 9 2015-08-22 2015-08-22 创建 2015-08-229999-12-31 10 2015-08-22 2015-08-22 支付 2015-08-229999-12-31 11 2015-08-23 2015-08-23 创建 2015-08-239999-12-31 12 2015-08-23 2015-08-23 创建 2015-08-239999-12-31 13 2015-08-23 2015-08-23 支付 2015-08-239999-12-31

比如我们查看订单8, 可以发现订单8从2015-08-21-2015-08-23号,状态变化了三次(创建->支付->完成),因此历史表中有三条记录。

转载请注明原文地址: https://www.6miu.com/read-70469.html

最新回复(0)