Hive UDAF Example


Problem: given a login log table (uid, login date), find the longest streak of consecutive login days for each uid.
Approach: write a custom UDAF that reduces the problem to finding the longest run of consecutive dates.
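The heart of the solution is plain Java, independent of Hive: sort the dates, then count the longest run in which adjacent dates differ by exactly one day. Below is a minimal standalone sketch of that logic (class and method names are hypothetical; it uses java.time instead of the SimpleDateFormat approach used in the UDAF further down):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LongestRun {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Length of the longest run of consecutive calendar days in a list of yyyyMMdd strings.
    static long longestConsecutiveDays(List<String> dates) {
        if (dates.isEmpty()) {
            return 0;
        }
        List<String> sorted = new ArrayList<String>(dates);
        Collections.sort(sorted);
        long best = 1;
        long current = 1;
        for (int i = 1; i < sorted.size(); i++) {
            LocalDate prev = LocalDate.parse(sorted.get(i - 1), FMT);
            LocalDate cur = LocalDate.parse(sorted.get(i), FMT);
            if (ChronoUnit.DAYS.between(prev, cur) == 1) {
                current++;              // still consecutive
            } else {
                current = 1;            // a gap (or duplicate date) starts a new run
            }
            best = Math.max(best, current);
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> dates = Arrays.asList("20180401", "20180402", "20180403",
                "20180404", "20180406", "20180408", "20180409");
        System.out.println(longestConsecutiveDays(dates));   // prints 4
    }
}

The UDAF below implements the same idea, except that the dates arrive one row at a time through iterate() and possibly from several mappers through merge().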

package com.zjs.udaf;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;

// Old-style UDAF bridge API: Hive resolves the inner Evaluator class by reflection.
public class ContinuousDate extends UDAF {

    public static class Evaluator implements UDAFEvaluator {
        private boolean mEmpty;
        private double mSum;                    // length of the current run of consecutive dates
        private ArrayList<String> dateList;     // all dates seen so far (yyyyMMdd strings)
        private ArrayList<Double> sumList;      // lengths of all completed runs

        public Evaluator() {
            super();
            init();
        }

        // Reset the aggregation state.
        public void init() {
            mSum = 0;
            mEmpty = true;
            dateList = new ArrayList<String>();
            sumList = new ArrayList<Double>();
        }

        // Called once for each new input value (one row).
        public boolean iterate(String o) {
            if (o != null) {
                dateList.add(o);
                mEmpty = false;
            }
            return true;
        }

        // Merge a partial aggregation state into this one.
        public boolean merge(ArrayList<String> o) {
            if (o != null) {
                for (int i = 0; i < o.size(); i++) {
                    dateList.add(o.get(i));
                }
                mEmpty = false;
            }
            return true;
        }

        // Return the current partial state (handed from the map side to the reduce side).
        public ArrayList<String> terminatePartial() {
            return dateList;
        }

        // Return the final result: the length of the longest run of consecutive dates.
        public DoubleWritable terminate() {
            Collections.sort(dateList);
            for (int i = 0; i < dateList.size(); i++) {
                if (i == 0) {
                    mSum = mSum + 1;
                } else {
                    // getQuot(prev, cur) == -1 means the previous date is exactly one day earlier,
                    // so the run continues; any gap (or a duplicate date) starts a new run.
                    if (new ContinuousDate().getQuot(dateList.get(i - 1), dateList.get(i)) == -1) {
                        mSum = mSum + 1;
                    } else {
                        sumList.add(mSum);
                        mSum = 1;
                    }
                }
            }
            sumList.add(mSum);
            return new DoubleWritable(Collections.max(sumList));
        }
    }

    // Difference in whole days between two yyyyMMdd strings (time1 minus time2).
    public long getQuot(String time1, String time2) {
        long quot = 0;
        SimpleDateFormat ft = new SimpleDateFormat("yyyyMMdd");
        try {
            Date date1 = ft.parse(time1);
            Date date2 = ft.parse(time2);
            quot = date1.getTime() - date2.getTime();
            quot = quot / 1000 / 60 / 60 / 24;
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return quot;
    }
}
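Hive drives this old-style UDAF through a fixed evaluator life cycle: init() resets the state, iterate() is called once per input row, terminatePartial() ships the partial state from the map side, merge() folds partial states together on the reduce side, and terminate() produces the final value. The following is a rough local harness for eyeballing that flow outside of Hive (a sketch only; the harness class is hypothetical, it assumes hive-exec is on the classpath for DoubleWritable, and Hive itself invokes these methods via reflection rather than like this):

import java.util.Arrays;
import com.zjs.udaf.ContinuousDate;

public class EvaluatorHarness {
    public static void main(String[] args) {
        // Two "map-side" evaluators, each seeing part of the input rows.
        ContinuousDate.Evaluator part1 = new ContinuousDate.Evaluator();
        for (String d : Arrays.asList("20180401", "20180402", "20180403")) {
            part1.iterate(d);
        }
        ContinuousDate.Evaluator part2 = new ContinuousDate.Evaluator();
        for (String d : Arrays.asList("20180404", "20180406", "20180408", "20180409")) {
            part2.iterate(d);
        }

        // A "reduce-side" evaluator merges the partial states and finishes the aggregation.
        ContinuousDate.Evaluator reducer = new ContinuousDate.Evaluator();
        reducer.merge(part1.terminatePartial());
        reducer.merge(part2.terminatePartial());
        System.out.println(reducer.terminate());   // prints the final result (4.0 for this data)
    }
}

For the original per-uid problem you would aggregate per key, e.g. something like select uid, condate1(login_date) from login_log group by uid (table and column names here are assumptions); the test below simply applies the function to a single column of dates.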

Test:

hive> add jar /home/inf/zhangjishuai/udf/condate.jar;
Added [/home/inf/zhangjishuai/udf/condate.jar] to class path
Added resources: [/home/inf/zhangjishuai/udf/condate.jar]
hive> create temporary function condate1 as 'com.zjs.udaf.ContinuousDate';
OK
Time taken: 0.004 seconds
hive> select condate1(date1) from tmp.zjs_0502_2;
Query ID = inf_20180502175757_35eafd2b-08f5-474b-bd40-b69d2607b523
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1515552670335_720089, Tracking URL = http://namenode02:8088/proxy/application_1515552670335_720089/
Kill Command = /opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/lib/hadoop/bin/hadoop job -kill job_1515552670335_720089
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2018-05-02 17:57:18,720 Stage-1 map = 0%, reduce = 0%
2018-05-02 17:57:23,854 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.91 sec
2018-05-02 17:57:29,993 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 7.26 sec
MapReduce Total cumulative CPU time: 7 seconds 260 msec
Ended Job = job_1515552670335_720089
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1  Cumulative CPU: 7.26 sec  HDFS Read: 6986  HDFS Write: 4  SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 260 msec
OK
4.0
Time taken: 17.806 seconds, Fetched: 1 row(s)

Data:

hive> select * from tmp.zjs_0502_2;
OK
20180401
20180402
20180403
20180404
20180406
20180408
20180409

The longest run of consecutive dates is 20180401 through 20180404 (4 days), which matches the 4.0 returned by the query above.
