博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于自定义日志打印的UDAF调试
阅读量:6275 次
发布时间:2019-06-22

本文共 8896 字,大约阅读时间需要 29 分钟。

看到最近有一些用户,代码在本地IDE环境里调试成功了后,到线上调试出现结果不符合预期的情况。因为IDE里无法模拟多个worker进行分布式调试UDAF的场景,所以有一些BUG可能需要到线上用一些简单的测试数据进行调试。这里用最简单的手工打印日志的方法,针对代码调试中最麻烦的UDAF的例子做一次调试。通过问题的定位和解决,希望能给大家在面对UDF的线上调试的时候提供一些思路。

初始化

首先,线上的真实数据可能非常多,千万不要直接对着上亿条数据直接调试,否则很难定位到原因。面对线上的问题,最好先根据数据情况,简化计算场景。比如我这里,就先把测试数据简化成:

drop table if exists testUDAF;create table testUDAF(    str string) partitioned by (ds string);--dual表是我早前已经创建好的就一条数据的表insert overwrite table testUDAF partition (ds)select str,ds from (    select 'a' as str,1 as ds from dual union all    select 'a' as str,1 as ds from dual union all    select 'b' as str,1 as ds from dual union all    select 'a' as str,2 as ds from dual union all    select 'c' as str,2 as ds from dual union all    select 'c' as str,2 as ds from dual) a;select * from testUDAF;

可以看到模拟数据是

screenshot
这样一共6一条记录,分布在2个不同的分区里。
我们希望UDAF的计算结果能类似:

SELECT wm_concat(',', concat(str, ':', cnt)) AS retFROM (    SELECT str, COUNT(*) AS cnt    FROM testUDAF    GROUP BY str) a;

screenshot

代码编写

在本地已经调试好的JAVA代码如下:

package com.aliyun.odps.udaf;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.util.*;import java.util.Map.Entry;import com.aliyun.odps.io.NullWritable;import com.aliyun.odps.io.Text;import com.aliyun.odps.io.Writable;import com.aliyun.odps.udf.UDFException;import com.aliyun.odps.udf.Aggregator;import com.aliyun.odps.udf.annotation.Resolve;@Resolve({"string->string"})public class MySum extends Aggregator {    private static final String rd = ":";    private static final String fd = ",";    private static class SumBuffer implements Writable {        private HashMap
dict = new HashMap<>(); @Override public void write(DataOutput out) throws IOException { String dictStr = dict.toString(); out.writeUTF(dictStr); } /* * 做了个简单的反序列化 * */ @Override public void readFields(DataInput in) throws IOException { String dictStr = in.readUTF(); String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(","); for(int i=0;i
entry : p.dict.entrySet()) { Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue(); buf.dict.put(entry.getKey(), count); } } @Override public Writable terminate(Writable buffer) throws UDFException { SumBuffer buf = (SumBuffer) buffer; StringBuilder sb = new StringBuilder(); for (Entry
entry : buf.dict.entrySet()) { sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd); } Text resault = new Text(); resault.set(sb.substring(0,sb.length()-1)); return resault; }}

因为逻辑不复杂,所以也没有添加更多的注释。可以看到用一个Map来存放中间数据,并用toString来做序列化,然后写了段简单的代码进行反序列化。到了terminate后,拼成需要的结果再返回。

打包后,注册一下函数并测试一下结果:

odps@ >add jar D:\\mysum.jar -f;OK: Resource 'mysum.jar' have been updated.odps@ >create function mysum as com.aliyun.odps.udaf.MySum using mysum.jar;Success: Function 'mysum' have been created.odps@ >select mysum(str) from testUDAF;+-----+| _c0 |+-----+| a:3,b:1,c:4 |+-----+

排查思路

可以看到,这里c的值不知道为什么变成了4,这个是在本地没有发现的问题。还好我们的数据量比较小,所以定位起来比较方便。目前的思路是,我们已经明确输入的数据是什么,也知道我们期望的结果是什么。那么我们首先需要知道,在中间数据的一步步流转的过程中,从哪里开始和我们预期的不一样。定位到是哪里开始数据和预期不符合后,再结合上下文的代码逻辑,定位到问题的原因。

首先我们给代码加上一些异常打印,看看流转过程中的数据分别是什么。通过System.err.println,我们把我们想要的信息打印到stderr里。

@Override    public void iterate(Writable buffer, Writable[] args) throws UDFException {        SumBuffer iterateDictBuffer = (SumBuffer) buffer;        String content;        if (args[0] instanceof NullWritable) {            content = "Null";        } else {            content = args[0].toString();        }        Long count = iterateDictBuffer.dict.containsKey(content) ? iterateDictBuffer.dict.get(content) : 0L;        System.err.println("input in iterate:" + content+"\tdict:"+iterateDictBuffer.dict);    //拿到原始的输入和当前的状态        iterateDictBuffer.dict.put(content, count + 1);        System.err.println("output in iterate:" + iterateDictBuffer.dict);                    //打印iterate输出的内容    }    @Override    public void merge(Writable buffer, Writable partial) throws UDFException {        SumBuffer buf = (SumBuffer) buffer;        SumBuffer p = (SumBuffer) partial;        System.err.println("buffer in merge:" + buf.dict);                    //打印merge里的buffer的内容        System.err.println("partial in merge:" + p.dict);                    //打印merge里的partial的内容        for (Entry
entry : p.dict.entrySet()) { Long count = buf.dict.containsKey(entry.getKey()) ? buf.dict.get(entry.getKey()) + entry.getValue() : entry.getValue(); buf.dict.put(entry.getKey(), count); } System.err.println("output in merge:" + buf.dict); //打印merge里的输出的内容 } @Override public Writable terminate(Writable buffer) throws UDFException { SumBuffer buf = (SumBuffer) buffer; System.err.println("output in terminate:" + buf.dict); //打印terminate里的输入的内容 StringBuilder sb = new StringBuilder(); for (Entry
entry : buf.dict.entrySet()) { sb.append(entry.getKey()).append(rd).append(entry.getValue()).append(fd); } System.err.println(sb.substring(0,sb.length()-1)); //打印terminate里的输出的内容 Text resault = new Text(); resault.set(sb.substring(0,sb.length()-1)); return resault; }

先打印了这么几个方法里。这样打印的思路主要是,看看每次调用的时候的数据输入输出是什么。从而定位到是从哪里开始出现的问题。

打包,替换掉jar包,然后重新调用一下函数,可以看到

odps@ >add jar D:\\mysum.jar -f;OK: Resource 'mysum.jar' have been updated.odps@ >select mysum(str) from testUDAF;+-----+| _c0 |+-----+| a:3,b:1,c:4 |+-----+

结果数据是不变的,但是我们可以看下日志。打开里面的logview,可以看到:

screenshot
里面的日志,2个Map里的日志分别是:

Heap Size: 1024Minput in iterate:a    dict:{}output in iterate:{a=1}input in iterate:c    dict:{a=1}output in iterate:{a=1, c=1}input in iterate:c    dict:{a=1, c=1}output in iterate:{a=1, c=2}Heap Size: 1024Minput in iterate:a    dict:{}output in iterate:{a=1}input in iterate:a    dict:{a=1}output in iterate:{a=2}input in iterate:b    dict:{a=2}output in iterate:{a=2, b=1}

看到都是对的,然后看下Reduce里的结果:

Heap Size: 1024Mbuffer in merge:{}partial in merge:{a=1, c=2}output in merge:{a=1, c=2}buffer in merge:{a=1, c=2}partial in merge:{a=2, b=1, c=2}output in merge:{a=3, b=1, c=4}output in terminate:{a=3, b=1, c=4}a:3,b:1,c:4

看一下,partial in merge:{a=2, b=1, c=2} 这条数据不符合预期。照道理说,我们前面输出的是output in iterate:{a=2, b=1},怎么到这里就变成了{a=2, b=1, c=2}了呢。

这种的变化,是在多个worker之间进行传递的时候,我们做了序列号和反序列化,于是我们在这里又打了一些日志:

@Override        public void write(DataOutput out) throws IOException {            String dictStr = dict.toString();            out.writeUTF(dictStr);            System.err.println("dict in write:" + dictStr);                    //打印序列化输出        }        /*         * 做了个简单的反序列化         * */        @Override        public void readFields(DataInput in) throws IOException {            String dictStr = in.readUTF();            System.err.println("dictStr in readFields:" + dictStr);                    //打印反序列化输出            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");            for(int i=0;i

重新打包跑一次,这次看到的日志是这样:

--map阶段:dict in write:{a=1, c=2}dict in write:{a=2, b=1}--reduce阶段:dictStr in readFields:{a=1, c=2}dict in readFields:{a=1, c=2}dictStr in readFields:{a=2, b=1}dict in readFields:{a=2, b=1, c=2}

果然反序列化的时候输出的结果就有问题了。但是从这里还没有明确的证据说明是哪行代码出的问题。看到dict输出的结果不符合预期,我们先看看输入的时候是什么。于是再加一行日志:

@Override        public void readFields(DataInput in) throws IOException {            String dictStr = in.readUTF();            System.err.println("dictStr in readFields:" + dictStr);                    //打印反序列化输出            System.err.println("dict in readFields before put:" + dict);                    //打印反序列化输出            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");            for(int i=0;i

看到这会的reduce阶段日志

dictStr in readFields:{a=1, c=2}dict in readFields before put:{}dict in readFields:{a=1, c=2}dictStr in readFields:{a=2, b=1}dict in readFields before put:{a=1, c=2}dict in readFields:{a=2, b=1, c=2}

这下真相大白了。我们第二次调用readFields序列化{a=2, b=1}这个字符串的时候,发现本来应该为空的dict的内容竟然是上次计算后的结果。实际上,在readFields里,相同worker里的SumBuffer被复用了。这种情况下,为了保证计算的准确性,我们可以自己清空一下dict的内容

@Override        public void readFields(DataInput in) throws IOException {            String dictStr = in.readUTF();            dict = new HashMap<>();            String[] tokens = dictStr.replace("{", "").replace("}", "").replace(" ","").split(",");            for(int i=0;i

这下终于对了

odps@ >add jar D:\\mysum.jar -f;OK: Resource 'mysum.jar' have been updated.odps@ >select mysum(str) from testUDAF;+-----+| _c0 |+-----+| a:3,b:1,c:2 |+-----+

总结

代码还有其他更多可以优化的地方。不过这次为了能简单说明调试的过程,简化代码逻辑,就没在这方面再多下功夫。实际的业务代码里还需要考虑到性能和异常捕捉等问题。

System.err.println这个办法虽然很笨,但是很有效,不是吗?

转载地址:http://fvwva.baihongyu.com/

你可能感兴趣的文章
html5 聊天机器人
查看>>
第三章:Shiro的配置——深入浅出学Shiro细粒度权限开发框架
查看>>
openstack虚拟机修改IP地址
查看>>
80后创业的经验谈(转,朴实但实用!推荐)
查看>>
初识 lex
查看>>
让Windows图片查看器和windows资源管理器显示WebP格式
查看>>
我的友情链接
查看>>
TCP and UDP Small Servers
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
Linux的dd命令
查看>>
从服务器下载一个离线包,格式为gz的压缩包,怎么解压呢。
查看>>
vim使用点滴
查看>>
embedded linux学习中几个需要明确的概念
查看>>
mysql常用语法
查看>>
Morris ajax
查看>>
【Docker学习笔记(四)】通过Nginx镜像快速搭建静态网站
查看>>
ORA-12514: TNS: 监听程序当前无法识别连接描述符中请求的服务
查看>>
<转>云主机配置OpenStack使用spice的方法
查看>>
java jvm GC 各个区内存参数设置
查看>>