hive拉链工具

xiaoxiao2021-02-28  129

大家好 最近由于公司业务需要,小白写了一篇hive拉链工具,下边对工具进行简单的介绍 工具名为zipperu(意思是拉链工具),由bin,conf,historys,logs,tmp组成 实现原理 小编的实现原理是根据业务表(你每天更新的表),你所关注的字段(比如phonenumber发生了变化你就认为这条数据发生了变化,然后更改其历史状态)进行MD5加密,比较该字段的MD5值是否发生变化,则跟新该条数据,否则不跟新! bin下边只有一个简单的脚本 zipperu.sh 用来处理所有的任务,以及业务逻辑 conf下边有个zipperu.conf文件,打开文件由 tableN=xxx(需要处理的业务表,由库名.表名组成) rowkeys={customerid} 中括号里边是业务表的主键,如果是多个用逗号隔开,比如{id,di2,id3}最后一列不加逗号 tableMD5=xxxx  tableMD5要生成加md5表的名字 column={birthday} birthday是你业务表需要关注的维度,如果这个字段有变化,就认为这天数据已经更新 zipperu.conf文件每行代表一个需要处理的表,字段之间用tab或则空格隔开 historys 就是每天拉链自动生成的sql脚本 logs 每天运行的任务记录 tmp 脚本执行生成的临时文件,请不要将任何文件放在tmp目录下,以为脚本启动会清空tmp目录 目前还不支持删除,只支持新增和更改,由于小编水平,有限,多多包涵! 若有问题,请联系我  微信号 dym012345678 邮箱  18233005414@163.com 代码如下 #!/bin/bash . /etc/profile cd `dirname $0` logs_data=`date +%F` confFile=../conf/zipperu.conf mkdir -p ../logs/$logs_data mkdir -p ../historys/$logs_data cat $confFile | while read linet;  do rm -rf ../tmp/* echo "--------------------------------------------------------------------------------------------------正在读取配置文件$confFile----------------------------------------------------------------------------------------------------------------------" if [[ "$confFile" = "" ]] ; then          echo "-------------------------------------------------------------------------您输入的配置文件为空,请输入有效配置文件!-------------------------------------------------------------------------------"          exit 1    else                         echo "----------------------------------------------------------------------------本次拉链,您输入的配置文件为:$confFile-----------------------------------------------------------------"    fi tableN=`echo $linet | awk '{print $1}'|awk -F '=' '{print $2}'` if [[ "$tableN" = "" ]] ; then                                echo "您的配置文件$linet hive表配置为空,请重新配置"                         exit 1 else                             echo "----------------------------------------------------------本次拉链,您配置的hive表为:$tableN-------------------------------------------------------------------"                 fi rowkeys=`echo $linet | awk '{print $2}'|awk -F '=' '{print $2}' |sed 's/}//g'|sed 's/{//g'`                 if [[ "$rowkeys" = "" ]] ; then                         echo "您的配置文件$linet hive表主键配置为空,请重新配置"                         exit 1                 else echo $rowkeys >> ../tmp/$tableN.rowkeys_tmp1 cat ../tmp/$tableN.rowkeys_tmp1 | tr -s "\","\" "\"\012"\" | sed s/[[:space:]]//g >> ../tmp/$tableN.rowkeys_tmp2 rowkey=`sed -n '1p' ../tmp/$tableN.rowkeys_tmp2` rowkeysn=`cat ../tmp/$tableN.rowkeys_tmp2 |wc -l`                         echo "----------------------------------------------------------本次拉链,您配置的hive表主键为:$rowkey------------------------------------------------------------------"                 fi tableMD5=`echo $linet | awk '{print $3}'|awk -F '=' '{print $2}'`                 if [[ "$tableMD5" = "" ]] ; then                         echo "您的配置文件$linet hiveMD5表配置为空,请重新配置"                         exit 1                 else                         echo "----------------------------------------------------------本次拉链,您配置的hiveMD5表为:$tableMD5-------------------------------------------------------------------"                 fi column=`echo $linet | awk '{print $4}'|awk -F '=' '{print $2}'|sed 's/}//g'|sed 's/{//g'` #获取配置文件中的列 if [[ "$column" = "" ]] ; then                         echo "您的配置文件$linet下列为空"                         exit 1                 else echo $rowkey start_time=`date "+%Y%m%d%H%M%S"` start_date=`date +%F` end_date=`date +%F` etl_time=`date '+%Y-%m-%d %H:%M:%S'` tableMD5_Y="${tableMD5}"_Y"" tableN_his="${tableN}"_his"" tableN_tmp_h="${tableN}"_tmp_h"" tableN_tmp_c="${tableN}"_tmp_c"" #rm -rf ../tmp/*                         echo "----------------------------------------------------------本次拉链,您配置的列为:$column-------------------------------------------------------------------" echo $column >> ../tmp/$tableN.tmp cat ../tmp/$tableN.tmp |tr -s "\","\" "\"\012"\" | sed s/[[:space:]]//g > ../tmp/$tableN.tmp2 rm -rf ../tmp/$tableN.tmp ln=`cat ../tmp/$tableN.tmp2 | wc -l` if [[ "$ln" -gt  "1" ]] ; then var=0 for line in `cat ../tmp/$tableN.tmp2`; do linenum=`awk '{print NR}' ../tmp/$tableN.tmp2 |tail -n1` linenum1=`echo $[linenum-1]` if [ $linenum1 -eq $var ] ; then                                  echo "coalesce($line,''),','" >> ../tmp/$tableN.tmp3 #是最后一个字段处理                          else echo "coalesce($line,''),','," >> ../tmp/$tableN.tmp3 #最后一个字段处理                          fi                                   ((var+=1)) done rm -rf ../tmp/$tableN.tmp2 column2=`cat ../tmp/$tableN.tmp3` echo $column2 >> ../tmp/$tableN.tmp4 cat ../tmp/$tableN.tmp4 | sed s/[[:space:]]//g > ../tmp/$tableN.tmp5 column2=`cat ../tmp/$tableN.tmp5` ############################################################################################### #获取当前表的字段tableN(业务表的所有字段字段,用来见分区表) hive -e "desc $tableN;" >> ../tmp/$tableN.colsinfo_tmp1 expand ../tmp/$tableN.colsinfo_tmp1 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2 rm -rf ../tmp/$tableN.colsinfo_tmp1 tableNcolsn=`cat ../tmp/$tableN.colsinfo_tmp2 |wc -l` echo $tableNcolsn sed -i 's/$/,/' ../tmp/$tableN.colsinfo_tmp2 tableNcols=`cat ../tmp/$tableN.colsinfo_tmp2` sql0="create table $tableN_his($tableNcols etl_time string , versions int , start_date string) partitioned by (end_date string);"  echo $sql0 >> ../historys/$logs_data/$start_time$tableN_his.create.sql sql1="drop table if exists $tableN_tmp_h;create table $tableN_tmp_h as select *,md5(concat($column2)) as md5_str from $tableN_his where end_date = '3000-12-31';" echo $sql1 >> ../historys/$logs_data/$start_time$tableN_his.create.sql sql2="drop table if exists $tableN_tmp_c;create table $tableN_tmp_c as select *,md5(concat($column2)) as md5_str from $tableN;" echo $sql2 >> ../historys/$logs_data/$start_time$tableN_his.create.sql awk '{print $1}' ../tmp/$tableN.colsinfo_tmp2 > ../tmp/$tableN.colsinfo_tmp3 echo "etl_time" >>../tmp/$tableN.colsinfo_tmp3 echo "versions" >>../tmp/$tableN.colsinfo_tmp3 echo "start_date" >>../tmp/$tableN.colsinfo_tmp3 cat ../tmp/$tableN.colsinfo_tmp3 | while read fiel; do echo "h.$fiel," >> ../tmp/$tableN.colsinfo_tmp4 done echo "'$end_date' as end_date" >> ../tmp/$tableN.colsinfo_tmp4 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp4 >> ../tmp/$tableN.colsinfo_tmp5 hall=`cat ../tmp/$tableN.colsinfo_tmp5` echo "$hall" expand ../tmp/$tableN.colsinfo_tmp2 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2_1 cat ../tmp/$tableN.colsinfo_tmp2_1 | awk  '{print $1}'| while read fiel2; do echo "case when c.$rowkey is not null then c.$fiel2 else h.$fiel2 end as $fiel2," >>../tmp/$tableN.colsinfo_tmp2_2 done awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp2_2 >> ../tmp/$tableN.colsinfo_tmp2_3 allcase=`cat ../tmp/$tableN.colsinfo_tmp2_2` if [[ "$rowkeysn" -eq  "1" ]] ; then sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;" echo $sql3 >>../historys/$logs_data/$start_time$tableN_his.create.sql else  sed -i '1d' ../tmp/$tableN.rowkeys_tmp2 cat ../tmp/$tableN.rowkeys_tmp2 | while read fiel3; do echo "and h.$fiel3 = c.$fiel3" >>../tmp/$tableN.rowkeys_tmp3 done  rowksys=`cat ../tmp/$tableN.rowkeys_tmp3` sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey $rowksys insert overwrite table $tableN_his partition(end_date='$end_date') select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;" echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql fi nohup hive -e "$sql1 $sql2 $sql3" >> ../logs/$logs_data/$start_time$tableN_his.log else ###################################加密md5字段数目为1的情况 column2="$column" #获取当前表的字段tableN(业务表的所有字段字段,用来见分区表) hive -e "desc $tableN;" >> ../tmp/$tableN.colsinfo_tmp1 expand ../tmp/$tableN.colsinfo_tmp1 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2 rm -rf ../tmp/$tableN.colsinfo_tmp1 tableNcolsn=`cat ../tmp/$tableN.colsinfo_tmp2 |wc -l` sed -i 's/$/,/' ../tmp/$tableN.colsinfo_tmp2 tableNcols=`cat ../tmp/$tableN.colsinfo_tmp2` sql0="create table $tableN_his($tableNcols etl_time string , versions int , start_date string) partitioned by (end_date string);"  echo $sql0 >> ../historys/$logs_data/$start_time$tableN_his.create.sql sql1="drop table if exists $tableN_tmp_h;create table $tableN_tmp_h as select *,md5(concat($column2)) as md5_str from $tableN_his where end_date = '3000-12-31';" sql2="drop table if exists $tableN_tmp_c;create table $tableN_tmp_c as select *,md5(concat($column2)) as md5_str from $tableN;" echo $sql1 >> ../historys/$logs_data/$start_time$tableN_his.create.sql echo $sql2 >> ../historys/$logs_data/$start_time$tableN_his.create.sql awk '{print $1}' ../tmp/$tableN.colsinfo_tmp2 > ../tmp/$tableN.colsinfo_tmp3 echo "etl_time" >>../tmp/$tableN.colsinfo_tmp3 echo "versions" >>../tmp/$tableN.colsinfo_tmp3 echo "start_date" >>../tmp/$tableN.colsinfo_tmp3 cat ../tmp/$tableN.colsinfo_tmp3 | while read fiel; do echo "h.$fiel," >> ../tmp/$tableN.colsinfo_tmp4 done echo "'$end_date' as end_date" >> ../tmp/$tableN.colsinfo_tmp4 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp4 >> ../tmp/$tableN.colsinfo_tmp5 hall=`cat ../tmp/$tableN.colsinfo_tmp5` echo "$hall" expand ../tmp/$tableN.colsinfo_tmp2 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2_1 cat ../tmp/$tableN.colsinfo_tmp2_1 | awk  '{print $1}'| while read fiel2; do echo "case when c.$rowkey is not null then c.$fiel2 else h.$fiel2 end as $fiel2," >>../tmp/$tableN.colsinfo_tmp2_2 done awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp2_2 >> ../tmp/$tableN.colsinfo_tmp2_3 allcase=`cat ../tmp/$tableN.colsinfo_tmp2_2` if [[ "$rowkeysn" -eq  "1" ]] ; then sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;" echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql else  sed -i '1d' ../tmp/$tableN.rowkeys_tmp2 cat ../tmp/$tableN.rowkeys_tmp2 | while read fiel3; do echo "and h.$fiel3 = c.$fiel3" >>../tmp/$tableN.rowkeys_tmp3 done  rowksys=`cat ../tmp/$tableN.rowkeys_tmp3` sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey $rowksys insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;" echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql fi # hive -e "$sql0" nohup hive -e "$sql1 $sql2 $sql3"  >> ../logs/$logs_data/$start_time$tableN_his.log fi fi rm -rf ../tmp/* done
转载请注明原文地址: https://www.6miu.com/read-21640.html

最新回复(0)