网络知识 娱乐 数据治理(十):Atlas 案例演示

数据治理(十):Atlas 案例演示

#头条创作挑战赛#

Atlas 案例演示

由于 Atlas 目前版本对 Hive 元数据监控比较好,这里我们改写了数仓“商户营收业务”业务,只使用 Hive Shell 脚本实现,后期来演示 Atlas 对元数据的管理。


“商户营收业务”数仓分层图:


一、创建所有 Hive 表

在 node3 上执行数仓“商户营收业务”创建所有表的 SQL 脚本:



CREATE EXTERNAL TABLE `TO_YCAK_MAC_D`(n `MID` int, n `SRL_ID` string, n `HARD_ID` string, n `SONG_WHSE_VER` string, n `EXEC_VER` string, n `UI_VER` string, n `IS_ONLINE` string, n `STS` int, n `CUR_LOGIN_TM` string, n `PAY_SW` string, n `LANG` int, n `SONG_WHSE_TYPE` int, n `SCR_TYPE` int)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCAK_MAC_D';nnnCREATE EXTERNAL TABLE `TO_YCAK_MAC_LOC_D`(n `MID` int, n `PRVC_ID` int, n `CTY_ID` int, n `PRVC` string, n `CTY` string, n `MAP_CLSS` string, n `LON` string, n `LAT` string, n `ADDR` string, n `ADDR_FMT` string, n `REV_TM` string, n `SALE_TM` string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCAK_MAC_LOC_D';nnnCREATE EXTERNAL TABLE `TO_YCBK_MAC_ADMIN_MAP_D`(n `MID` int, n `MAC_NM` string, n `PKG_NUM` int, n `PKG_NM` string, n `INV_RATE` double, n `AGE_RATE` double, n `COM_RATE` double, n `PAR_RATE` double, n `DEPOSIT` double, n `SCENE_PRVC_ID` string, n `SCENE_CTY_ID` string, n `SCENE_AREA_ID` string, n `SCENE_ADDR` string, n `PRDCT_TYPE` string, n `SERIAL_NUM` string, n `HAD_MPAY_FUNC` int, n `IS_ACTV` int, n `ACTV_TM` string,n `ORDER_TM` string,n `GROUND_NM` string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_MAC_ADMIN_MAP_D';nnnCREATE EXTERNAL TABLE `TO_YCBK_MAC_STORE_MAP_D`(n `STORE_ID` int, n `MID` int, n `PRDCT_TYPE` int, n `ADMINID` int, n `CREAT_TM` stringn)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_MAC_STORE_MAP_D';nnnnnCREATE EXTERNAL TABLE `TO_YCBK_STORE_D`(n `ID` int, n `STORE_NM` string, n `TAG_ID` string, n `TAG_NM` string, n `SUB_TAG_ID` string,n `SUB_TAG_NM` string,n `PRVC_ID` string,n `CTY_ID` string,n `AREA_ID` string,n `ADDR` string,n `GROUND_NM` string,n `BUS_TM` string,n `CLOS_TM` string,n `SUB_SCENE_CATGY_ID` string,n `SUB_SCENE_CATGY_NM` string,n `SUB_SCENE_ID` string,n `SUB_SCENE_NM` string,n `BRND_ID` string,n `BRND_NM` string,n `SUB_BRND_ID` string,n `SUB_BRND_NM` stringn)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_STORE_D';nnnCREATE EXTERNAL TABLE `TO_YCBK_PRVC_D`(n `PRVC_ID` int, n `PRVC` stringn)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_PRVC_D';nnnCREATE EXTERNAL TABLE `TO_YCBK_CITY_D`(n `PRVC_ID` int, n `CTY_ID` int,n `CTY` stringn)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_CITY_D';nnnCREATE EXTERNAL TABLE `TO_YCBK_AREA_D`(n `CTY_ID` int, n `AREA_ID` int,n `AREA` stringn)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_AREA_D';nnnCREATE EXTERNAL TABLE `TW_MAC_BASEINFO_D`(n `MID` int, n `MAC_NM` string,n `SONG_WHSE_VER` string, n `EXEC_VER` string, n `UI_VER` string, n `HARD_ID` string, n `SALE_TM` string, n `REV_TM` string, n `OPER_NM` string, n `PRVC` string, n `CTY` string, n `AREA` string, n `ADDR` string,n `STORE_NM` string,n `SCENCE_CATGY` string, n `SUB_SCENCE_CATGY` string, n `SCENE` string, n `SUB_SCENE` string, n `BRND` string, n `SUB_BRND` string, n `PRDCT_NM` string, n `PRDCT_TYP` int, n `BUS_MODE` string, n `INV_RATE` double, n `AGE_RATE` double, n `COM_RATE` double, n `PAR_RATE` double, n `IS_ACTV` int, n `ACTV_TM` string,n `PAY_SW` int,n `PRTN_NM` string,n `CUR_LOGIN_TM` stringn )nPARTITIONED BY (data_dt string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_BASEINFO_D';nnnCREATE EXTERNAL TABLE `TO_YCAK_USR_D`(n `UID` int, n `REG_MID` int, n `GDR` string, n `BIRTHDAY` string,n `MSISDN` string,n `LOC_ID` int,n `LOG_MDE` int,n `REG_TM` string,n `USR_EXP` string,n `SCORE` int,n `LEVEL` int,n `WX_ID` stringn )nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_D';nnnCREATE EXTERNAL TABLE `TO_YCAK_USR_ALI_D`(n `UID` int, n `REG_MID` int, n `GDR` string, n `BIRTHDAY` string,n `MSISDN` string,n `LOC_ID` int,n `LOG_MDE` int,n `REG_TM` string,n `USR_EXP` string,n `SCORE` int,n `LEVEL` int,n `USR_TYPE` string,n `IS_CERT` string,n `IS_STDNT` string,n `ALY_ID` string n )nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_ALI_D';nnnCREATE EXTERNAL TABLE `TO_YCAK_USR_QQ_D`(n `UID` int, n `REG_MID` int, n `GDR` string, n `BIRTHDAY` string,n `MSISDN` string,n `LOC_ID` int,n `LOG_MDE` int,n `REG_TM` string,n `USR_EXP` string,n `SCORE` int,n `LEVEL` int,n `QQID` string n )nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_QQ_D';nnnCREATE EXTERNAL TABLE `TO_YCAK_USR_APP_D`(n `UID` int, n `REG_MID` int, n `GDR` string, n `BIRTHDAY` string,n `MSISDN` string,n `LOC_ID` int,n `REG_TM` string,n `USR_EXP` string,n `LEVEL` int,n `APP_ID` string n )nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_APP_D';nnnCREATE EXTERNAL TABLE `TO_YCAK_USR_LOGIN_D`(n `ID` int, n `UID` int, n `MID` int, n `LOGIN_TM` string,n `LOGOUT_TM` string,n `MODE_TYPE` intn )nPARTITIONED BY (`data_dt` string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_LOGIN_D';nnnCREATE EXTERNAL TABLE `TW_USR_BASEINFO_D`(n `UID` int, n `REG_MID` int, n `REG_CHNL` string, n `REF_UID` string,n `GDR` string,n `BIRTHDAY` string,n `MSISDN` string,n `LOC_ID` int,n `LOG_MDE` string,n `REG_DT` string,n `REG_TM` string,n `USR_EXP` string,n `SCORE` int,n `LEVEL` int,n `USR_TYPE` string,n `IS_CERT` string,n `IS_STDNT` stringn )nPARTITIONED BY (`data_dt` string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TW_USR_BASEINFO_D';nnnCREATE EXTERNAL TABLE `TO_YCAK_USR_LOC_D`(n `ID` int, n `UID` int, n `LAT` string, n `LNG` string,n `DATETIME` string,n `MID` stringn )nPARTITIONED BY (data_dt string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_LOC_D';nnnCREATE EXTERNAL TABLE `TW_MAC_LOC_D`(n `MID` int, n `X` string, n `Y` string, n `CNT` int,n `ADDER` string,n `PRVC` string,n `CTY` string,n `CTY_CD` string,n `DISTRICT` string,n `AD_CD` string,n `TOWN_SHIP` string,n `TOWN_CD` string,n `NB_NM` string,n `NB_TP` string,n `BD_NM` string,n `BD_TP` string,n `STREET` string,n `STREET_NB` string,n `STREET_LOC` string,n `STREET_DRCTION` string,n `STREET_DSTANCE` string,n `BUS_INFO` stringn )nPARTITIONED BY (data_dt string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_LOC_D';nnnnnCREATE EXTERNAL TABLE `TO_YCAK_CNSM_D`(n `ID` int, n `MID` int, n `PRDCD_TYPE` int, n `PAY_TYPE` int,n `PKG_ID` int,n `PKG_NM` string,n `AMT` int,n `CNSM_ID` string,n `ORDR_ID` string,n `TRD_ID` string,n `ACT_TM` string,n `UID` int,n `NICK_NM` string,n `ACTV_ID` int,n `ACTV_NM` string,n `CPN_TYPE` int,n `CPN_TYPE_NM` string,n `PKG_PRC` int,n `PKG_DSCNT` int,n `ORDR_TYPE` int,n `BILL_DT` intn )nPARTITIONED BY (data_dt string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_CNSM_D';nnnCREATE EXTERNAL TABLE `TW_CNSM_BRIEF_D`(n `ID` int, n `TRD_ID` string, n `UID` string, n `MID` int,n `PRDCD_TYPE` int,n `PAY_TYPE` int,n `ACT_TM` string,n `PKG_ID` int,n `COIN_PRC` int,n `COIN_CNT` int,n `UPDATE_TM` string,n `ORDR_ID` string,n `ACTV_NM` string,n `PKG_PRC` int,n `PKG_DSCNT` int,n `CPN_TYPE` int,n `ABN_TYP` intn )nPARTITIONED BY (data_dt string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TW_CNSM_BRIEF_D';nnnCREATE EXTERNAL TABLE `TW_MAC_STAT_D`(n `MID` int, n `MAC_NM` string, n `PRDCT_TYPE` string, n `STORE_NM` int,n `BUS_MODE` string,n `PAY_SW` string,n `SCENCE_CATGY` string,n `SUB_SCENCE_CATGY` string,n `SCENE` string,n `SUB_SCENE` string,n `BRND` string,n `SUB_BRND` string,n `PRVC` string,n `CTY` string,n `AREA` string,n `AGE_ID` string,n `INV_RATE` string,n `AGE_RATE` string,n `COM_RATE` string,n `PAR_RATE` string,n `PKG_ID` string,n `PAY_TYPE` string,n `CNSM_USR_CNT` string,n `REF_USR_CNT` string,n `NEW_USR_CNT` string,n `REV_ORDR_CNT` string,n `REF_ORDR_CNT` string,n `TOT_REV` string,n `TOT_REF` stringn )nPARTITIONED BY (data_dt string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_STAT_D';nnnCREATE EXTERNAL TABLE `TM_USR_MRCHNT_STAT_D`(n `ADMIN_ID` string, n `PAY_TYPE` int, n `REV_ORDR_CNT` int, n `REF_ORDR_CNT` int,n `TOT_REV` double,n `TOT_REF` double,n `TOT_INV_REV` DECIMAL(10,2),n `TOT_AGE_REV` DECIMAL(10,2),n `TOT_COM_REV` DECIMAL(10,2),n `TOT_PAR_REV` DECIMAL(10,2)n )nPARTITIONED BY (DATA_DT string)nROW FORMAT DELIMITED FIELDS TERMINATED BY 't' nLOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TM_USR_MRCHNT_STAT_D';nn



[root@node3 test]# hive -f ./CreateAllHiveTables.sql nn



执行如下命令,将 mysql 数据导入到 ODS 层中,注意输入时间:


mysql 数据导入到 ODS 所有表的脚本附件:


[root@node3 ~]# cd /root/testnnn[root@node3 test]# sh all_mysql_to_ods.sh 20220413nn




#!/bin/bashn###################################################################n### 将所有mysql中的数据导入到ODS中 ###n###################################################################nif [ x"$1" = x ]; thenn echo "====没有导入数据的日期,输入日期===="n exitnelsen echo "====使用传入的日期 ===="n currentDate=$1nfinecho "日期为 : $currentDate"nsh /root/test/ods_mysqltohive_to_ycak_cnsm_d.sh $1nsh /root/test/ods_mysqltohive_to_ycak_mac_d.sh $1nsh /root/test/ods_mysqltohive_to_ycak_mac_loc_d.sh $1nsh /root/test/ods_mysqltohive_to_ycak_usr_ali_d.sh $1nsh /root/test/ods_mysqltohive_to_ycak_usr_app_d.sh $1nsh /root/test/ods_mysqltohive_to_ycak_usr_d.sh $1nsh /root/test/ods_mysqltohive_to_ycak_usr_loc_d.sh $1nsh /root/test/ods_mysqltohive_to_ycak_usr_login_d.sh $1nsh /root/test/ods_mysqltohive_to_ycak_usr_qq_d.sh $1nsh /root/test/ods_mysqltohive_to_ycbk_area_d.sh $1nsh /root/test/ods_mysqltohive_to_ycbk_city_d.sh $1nsh /root/test/ods_mysqltohive_to_ycbk_mac_admin_map_d.sh $1nsh /root/test/ods_mysqltohive_to_ycbk_mac_store_map_d.sh $1nsh /root/test/ods_mysqltohive_to_ycbk_prvc_d.sh $1nsh /root/test/ods_mysqltohive_to_ycbk_store_d.sh $1nn



查看 Atlas 中监控到的创建 Hive 表


二、编写处理业务 Shell 脚本

以下脚本包含处理“商户营收业务”所有脚本,这些脚本都是 Hive + Shell 的脚本,调用时需要传入参数,也可以使用 Azkaban 进行调度。

1、ODS 层数据表获取 EDS 层 TW_MAC_BASEINFO_D 机器的基本信息表脚本附件:


#!/bin/bashn#######################################################################n### 根据ODS层: ###n### TO_YCAK_MAC_D 机器基本信息日全量表 ###n### TO_YCAK_MAC_LOC_D 机器位置信息日全量表 ###n### TO_YCBK_MAC_ADMIN_MAP_D 机器客户映射关系资料日全量表 ###n### TO_YCBK_MAC_STORE_MAP_D 机器门店映射关系日全量表 ###n### TO_YCBK_STORE_D 门店信息日全量表 ###n### TO_YCBK_PRVC_D 机器省份日全量表 ###n### TO_YCBK_CITY_D 机器城市日全量表 ###n### TO_YCBK_AREA_D 机器区县日全量表 ###n### 获取EDS层表 TW_MAC_BASEINFO_D 机器的基本信息 ###n#######################################################################nif [ x"$1" = x ]; thenn echo "====没有导入数据的日期,输入日期===="n exitnelsen echo "====使用传入的日期 ===="n currentDate=$1nfinecho "日期为 : $currentDate"nnn`hive -e "set hive.exec.mode.local.auto=true"`nnn`hive -e "insert overwrite table tw_mac_baseinfo_d partition(data_dt='${currentDate}') nselectn YCAK.MID, --机器IDn YCBK.MAC_NM, --机器名称n YCAK.SONG_WHSE_VER, --歌曲版本n YCAK.EXEC_VER, --系统版本号n YCAK.UI_VER, --歌曲UI版本号n YCAK.HARD_ID, --硬件IDn YCAK.SALE_TM, --销售时间n YCAK.REV_TM, --运营时间n YCBK.STORE_NM as OPER_NM, --运营商名称n if (YCAK.PRVC is null,YCBK.PRVC,YCAK.PRVC) as PRVC, --机器所在省n if (YCAK.CTY is null,YCBK.CTY,YCAK.CTY) as CTY, --机器所在市n YCBK.AREA, --机器所在区域n if (YCAK.ADDR_FMT is null,YCBK.ADDR,YCAK.ADDR_FMT) as ADDR, --机器详细地址n YCBK.STORE_NM, --门店名称n YCBK.TAG_NM as SCENCE_CATGY, --主场景名称n YCBK.SUB_SCENE_CATGY_NM as SUB_SCENCE_CATGY, --子场景分类名称n YCBK.SUB_TAG_NM as SCENE , --主场景分类名称n YCBK.SUB_SCENE_NM as SUB_SCENE , --子场景名称n YCBK.BRND_NM as BRND, --主场景品牌n YCBK.SUB_BRND_NM as SUB_BRND, --子场景品牌n YCBK.PKG_NM as PRDCT_NM, --产品名称n 2 as PRDCT_TYP, --产品类型n case when YCBK.PKG_NM = '联营版' then '联营'n when YCBK.INV_RATE < 100 then '联营'n else '卖断' end BUS_MODE, --运营模式n YCBK.INV_RATE, --投资人分成比例n YCBK.AGE_RATE, --代理人、联盟人分成比例n YCBK.COM_RATE, --公司分成比例n YCBK.PAR_RATE, --合作方分成比例n if (YCAK.STS is null ,YCBK.IS_ACTV,YCAK.STS) as IS_ACTV, --是否激活n YCBK.ACTV_TM, --激活时间n if (YCAK.PAY_SW is null ,YCBK.PAY_SW,YCAK.PAY_SW) as PAY_SW, --是否开通移动支付n YCBK.STORE_NM as PRTN_NM, --代理人姓名,这里获取门店名称n YCAK.CUR_LOGIN_TM --最近登录时间nFROM (nSELECTn TEMP.MID, --机器IDn MAC.SRL_ID, --序列号n MAC.HARD_ID, --硬件IDn MAC.SONG_WHSE_VER, --歌库版本号n MAC.EXEC_VER, --系统版本号n MAC.UI_VER, --歌库UI版本号n MAC.STS, --激活状态n MAC.CUR_LOGIN_TM, --最近登录时间n MAC.PAY_SW, --支付开关是否打开n MAC.IS_ONLINE, --是否在线n 2 as PRDCT_TYPE, --产品类型,2n LOC.PRVC , --机器所在省份n LOC.CTY , --机器所在城市n LOC.ADDR_FMT, --详细地址n LOC.REV_TM, --运营时间n LOC.SALE_TM --销售时间n from (select MID from TO_YCAK_MAC_D union select MID from TO_YCBK_MAC_ADMIN_MAP_D) as TEMPn left join TO_YCAK_MAC_D as MAC on TEMP.MID = MAC.MIDn left join TO_YCAK_MAC_LOC_D as LOC on TEMP.MID = LOC.MIDn) as YCAKnLEFT JOIN (nnn selectn TEMP.MID, --机器IDn MA.MAC_NM, --机器名称n MA.PKG_NM, --套餐名称n MA.INV_RATE, --投资人分成比例n MA.AGE_RATE, --承接方分成比例n MA.COM_RATE, --公司分成比例n MA.PAR_RATE, --合作方分成比例n MA.IS_ACTV, --是否激活n MA.ACTV_TM, --激活时间n MA.HAD_MPAY_FUNC as PAY_SW, --支付开关是否打开n PRVC.PRVC, --省份n CTY.CTY, --城市n AREA.AREA, --区、县n CONCAT(MA.SCENE_ADDR,MA.GROUND_NM) as ADDR, --场景地址,场地名称,n STORE.GROUND_NM as STORE_NM, --门店名称,这里的store_nm都是数字n STORE.TAG_NM, --主场景名称n STORE.SUB_TAG_NM,--主场景分类n STORE.SUB_SCENE_CATGY_NM, --子场景分类名称n STORE.SUB_SCENE_NM, --子场景名称n STORE.BRND_NM, --品牌名称n STORE.SUB_BRND_NM --子品牌名称n from (select MID from TO_YCAK_MAC_D union select MID from TO_YCBK_MAC_ADMIN_MAP_D) as TEMPn left join TO_YCBK_MAC_ADMIN_MAP_D as MA on TEMP.MID = MA.MIDn left join TO_YCBK_PRVC_D as PRVC on MA.SCENE_PRVC_ID = PRVC.PRVC_IDn left join TO_YCBK_CITY_D as CTY on MA.SCENE_CTY_ID = CTY.CTY_IDn left join TO_YCBK_AREA_D as AREA on MA.SCENE_AREA_ID = AREA.AREA_IDn left join TO_YCBK_MAC_STORE_MAP_D as SMA on TEMP.MID = SMA.MIDn left join TO_YCBK_STORE_D as STORE on SMA.STORE_ID = STORE.IDn) as YCBKnON YCAK.MID = YCBK.MID"`nn


2、ODS 层数据表获取 EDS 层 TW_USR_BASEINFO_D 活跃用户信息数据表脚本附件:


#!/bin/bashn###################################################################n### 根据 YCAK 库中所有用户信息获取表 TW_USR_BASEINFO_D 用户信息 ###n###################################################################nif [ x"$1" = x ]; thenn echo "====没有导入数据的日期,输入日期===="n exitnelsen echo "====使用传入的日期 ===="n currentDate=$1nfinecho "日期为 : $currentDate"nnn`hive -e "insert overwrite table TW_USR_BASEINFO_D partition (data_dt = ${currentDate})nSELECTn UID, --用户IDn REG_MID, --机器IDn '1' AS REG_CHNL, -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道n WX_ID AS REF_UID, --微信账号n GDR, --性别n BIRTHDAY, --生日n MSISDN, --手机号码n LOC_ID, --地区IDn LOG_MDE, --注册登录方式n substring(REG_TM,1,8) AS REG_DT, --注册日期n substring(REG_TM,9,6) AS REG_TM, --注册时间n USR_EXP, --用户当前经验值n SCORE, --累计积分n LEVEL, --用户等级n '2' AS USR_TYPE, --用户类型 1-企业 2-个人n NULL AS IS_CERT, --实名认证n NULL AS IS_STDNT --是否是学生nFROM TO_YCAK_USR_DnUNIONnSELECTn UID, --用户IDn REG_MID, --机器IDn '2' AS REG_CHNL, -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道n ALY_ID AS REF_UID, --支付宝账号n GDR, --性别n BIRTHDAY, --生日n MSISDN, --手机号码n LOC_ID, --地区IDn LOG_MDE, --注册登录方式n substring(REG_TM,1,8) AS REG_DT, --注册日期n substring(REG_TM,9,6) AS REG_TM, --注册时间n USR_EXP, --用户当前经验值n SCORE, --累计积分n LEVEL, --用户等级n NVL(USR_TYPE,'2') AS USR_TYPE, --用户类型 1-企业 2-个人n IS_CERT , --实名认证n IS_STDNT --是否是学生nFROM TO_YCAK_USR_ALI_DnUNIONnSELECTn UID, --用户IDn REG_MID, --机器IDn '3' AS REG_CHNL, -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道n QQID AS REF_UID, --QQ账号n GDR, --性别n BIRTHDAY, --生日n MSISDN, --手机号码n LOC_ID, --地区IDn LOG_MDE, --注册登录方式n substring(REG_TM,1,8) AS REG_DT, --注册日期n substring(REG_TM,9,6) AS REG_TM, --注册时间n USR_EXP, --用户当前经验值n SCORE, --累计积分n LEVEL, --用户等级n '2' AS USR_TYPE, --用户类型 1-企业 2-个人n NULL AS IS_CERT, --实名认证n NULL AS IS_STDNT --是否是学生nFROM TO_YCAK_USR_QQ_DnUNIONnSELECTn UID, --用户IDn REG_MID, --机器IDn '4' AS REG_CHNL, -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道n APP_ID AS REF_UID, --APP账号n GDR, --性别n BIRTHDAY, --生日n MSISDN, --手机号码n LOC_ID, --地区IDn NULL AS LOG_MDE, --注册登录方式n substring(REG_TM,1,8) AS REG_DT, --注册日期n substring(REG_TM,9,6) AS REG_TM, --注册时间n USR_EXP, --用户当前经验值n 0 AS SCORE, --累计积分n LEVEL, --用户等级n '2' AS USR_TYPE, --用户类型 1-企业 2-个人n NULL AS IS_CERT, --实名认证n NULL AS IS_STDNT --是否是学生nFROM TO_YCAK_USR_APP_D"`nn


3、ODS 层数据表获取 EDS 层 TW_CNSM_BRIEF_D 消费退款订单流水日增量表脚本附件:


#!/bin/bashn###################################################################n### 根据 YCAK 库中用户消费订单明细表 TO_YCAK_CNSM_D ###n### 获取 EDS 层 TW_CNSM_BRIEF_D 消费退款订单流水日增量表 ###n###################################################################nif [ x"$1" = x ]; thenn echo "====没有导入数据的日期,输入日期===="n exitnelsen echo "====使用传入的日期 ===="n currentDate=$1nfinecho "日期为 : $currentDate"nnn`hive -e "insert overwrite table TW_CNSM_BRIEF_D partition (data_dt=${currentDate})nselectnID, --IDnTRD_ID, --第三方交易编号ncast(UID as string) AS UID, --用户IDnMID, --机器IDnPRDCD_TYPE, --产品类型nPAY_TYPE, --支付类型nACT_TM, --消费时间nPKG_ID, --套餐IDncase when AMT<0 then AMT*-1 else AMT end AS COIN_PRC, --币值n1 AS COIN_CNT, --币数 ,单位分nACT_TM as UPDATE_TM, --状态更新时间nORDR_ID, --订单IDnACTV_NM, --优惠活动名称nPKG_PRC, --套餐原价nPKG_DSCNT, --套餐优惠价nCPN_TYPE, --优惠券类型nCASE WHEN ORDR_TYPE = 1 THEN 0n WHEN ORDR_TYPE = 2 THEN 1n WHEN ORDR_TYPE = 3 THEN 2n WHEN ORDR_TYPE = 4 THEN 2 END AS ABN_TYP --异常类型:0-无异常 1-异常订单 2-商家退款nFROM TO_YCAK_CNSM_DnWHERE DATA_DT = ${currentDate} "`nn


4、EDS-DWD 层数据获取 EDS-DWS 层 TW_MAC_STAT_D 机器日营收情况统计表脚本附件:


#!/bin/bashn###################################################################n### 根据 EDS-DWD 层中: ###n### TW_MAC_BASEINFO_D 机器基础信息日全量表 ###n### TW_USR_BASEINFO_D 活跃用户基础信息日增量表 ###n### TW_CNSM_BRIEF_D 消费退款订单流水日增量表 ###n### 获取 EDS-DWS 层 TW_MAC_STAT_D 机器日营收情况统计表 ###n###################################################################nif [ x"$1" = x ]; thenn echo "====没有导入数据的日期,输入日期===="n exitnelsen echo "====使用传入的日期 ===="n currentDate=$1nfinecho "日期为 : $currentDate"nnn`hive -e "insert overwrite table TW_MAC_STAT_D partition (data_dt = ${currentDate})nSELECTn A.MID, --机器IDn A.MAC_NM, --机器名称n A.PRDCT_TYP, --产品类型n A.STORE_NM, --门店名称n A.BUS_MODE, --运营模式n A.PAY_SW, --是否开通移动支付n A.SCENCE_CATGY, --主场景分类n A.SUB_SCENCE_CATGY, --子场景分类n A.SCENE, --主场景n A.SUB_SCENE, --子场景n A.BRND, --主场景品牌n A.SUB_BRND, --子场景品牌n A.PRVC, --省份n A.CTY, --城市n A.AREA, --区县n A.PRTN_NM as AGE_ID, --代理人IDn A.INV_RATE, --投资人分成比例n A.AGE_RATE, --代理人、联盟人分成比例n A.COM_RATE, --公司分成比例n A.PAR_RATE, --合作方分成比例n C.PKG_ID, --套餐IDn C.PAY_TYPE, --支付类型n NVL(C.CNSM_USR_CNT,0) AS CNSM_USR_CNT, --总消费用户数n NVL(D.REF_USR_CNT,0) AS REF_USR_CNT, --总退款用户数n NVL(E.NEW_USR_CNT,0) AS NEW_USR_CNT, --总新增用户数n NVL(C.REV_ORDR_CNT,0) AS REV_ORDR_CNT, --总营收订单数n NVL(D.REF_ORDR_CNT,0) AS REF_ORDR_CNT, --总退款订单数n NVL(C.TOT_REV,0) AS TOT_REV, --总营收n NVL(D.TOT_REF,0) AS TOT_REF --总退款nFROM (SELECT * FROM TW_MAC_BASEINFO_D WHERE DATA_DT = ${currentDate}) A --机器基础信息nLEFT JOIN (n selectn MID, --机器IDn PKG_ID, --套餐IDn PAY_TYPE, --支付类型n COUNT(DISTINCT UID) as CNSM_USR_CNT, --总消费用户数n SUM(COIN_CNT * COIN_PRC) as TOT_REV, --总营收n COUNT(ORDR_ID) as REV_ORDR_CNT --总营收订单数n from TW_CNSM_BRIEF_Dn where ABN_TYP = 0 AND DATA_DT = ${currentDate}n group by MID,PKG_ID,PAY_TYPEn) C on A.MID = C.MID --机器当日营收信息nLEFT JOIN (n selectn MID, --机器IDn PKG_ID, --套餐IDn PAY_TYPE, --支付类型n COUNT(DISTINCT UID) as REF_USR_CNT, --总退款用户数n SUM(COIN_CNT * COIN_PRC) as TOT_REF, --总退款n COUNT(ORDR_ID) as REF_ORDR_CNT --总退款订单数n from TW_CNSM_BRIEF_Dn where ABN_TYP = 2n group by MID,PKG_ID,PAY_TYPEn) D on A.MID = D.MIDn AND C.MID = D.MIDn AND C.PKG_ID = D.PKG_IDn AND C.PAY_TYPE = D.PAY_TYPE --机器当日退款信息nLEFT JOIN (n selectn REG_MID as MID, --机器IDn count(UID) as NEW_USR_CNT --新增用户个数n from TW_USR_BASEINFO_Dn where REG_DT = ${currentDate}n group by REG_MIDn) E on A.MID = E.MID --机器当日新增用户信息n"`nn


5、EDS-DWS 层数据获取 DM 层 TM_USR_MRCHNT_STAT_D 商户日营收统计表脚本附件:


#!/bin/bashn###################################################################n### 根据 EDS-DWS 层中: ###n### TW_MAC_STAT_D 机器日营收情况统计表 ###n### 获取DM层 TM_USR_MRCHNT_STAT_D 商户日营收统计表 ###n###################################################################nif [ x"$1" = x ]; thenn echo "====没有导入数据的日期,输入日期===="n exitnelsen echo "====使用传入的日期 ===="n currentDate=$1nfinecho "日期为 : $currentDate"nnn`hive -e "insert overwrite table TM_USR_MRCHNT_STAT_D partition (data_dt=${currentDate})nselectn AGE_ID AS ADMIN_ID, --代理人n PAY_TYPE,n SUM(REV_ORDR_CNT) AS REV_ORDR_CNT, --总营收订单数n SUM(REF_ORDR_CNT) AS REF_ORDR_CNT, --总退款订单数n CAST(SUM(TOT_REV) AS DECIMAL(10,2)) AS TOT_REV, --总营收n CAST(SUM(TOT_REF) AS DECIMAL(10,2)) AS TOT_REF, --总退款n CAST(SUM(TOT_REV * NVL(INV_RATE,0)) AS DECIMAL(10,2)) AS TOT_INV_REV, --投资人营收n CAST(SUM(TOT_REV * NVL(AGE_RATE,0)) AS DECIMAL(10,2)) AS TOT_AGE_REV, --代理人营收n CAST(SUM(TOT_REV * NVL(COM_RATE,0)) AS DECIMAL(10,2)) AS TOT_COM_REV, --公司营收n CAST(SUM(TOT_REV * NVL(PAR_RATE,0)) AS DECIMAL(10,2)) AS TOT_PAR_REV --合伙人营收nfrom TW_MAC_STAT_DnWHERE DATA_DT = ${currentDate}nGROUP BY AGE_ID,PAY_TYPEn"`nn


三、手动执行脚本

注意:执行脚本时需要传入时间:


[root@node3 test]# sh ProduceShell1.sh 20220413n[root@node3 test]# sh ProduceShell2.sh 20220413n[root@node3 test]# sh ProduceShell3.sh 20220413n[root@node3 test]# sh ProduceShell4.sh 20220413n[root@node3 test]# sh ProduceShell5.sh 20220413nn


四、Atlas 中查看表元数据

查看 EDS 层表 TW_MAC_BASEINFO_D 机器的基本信息表血缘关系:



查看 EDS 层表 TW_USR_BASEINFO_D 活跃用户信息数据表血缘关系:



查看 EDS 层表 TW_CNSM_BRIEF_D 消费退款订单流水日增量表血缘关系:



查看 EDS-DWS 层 TW_MAC_STAT_D 机器日营收情况统计表血缘关系:



查看 DM 层 TM_USR_MRCHNT_STAT_D 商户日营收统计表血缘关系:



以上除了可以查看表之间的血缘关系还可以查看字段的血缘关系,以 EDS-DWS 层表 TW_MAC_STAT_D 机器日营收情况统计表中的“机器-MID”字段为例,查看字段的血缘关系如下:



我们可以根据 Atlas 提供的表、字段的血缘关系及时定位问题,加快数据分析效率。