APP下载

基于XML描述的可程式设计函式式ETL实现

消息来源:baojiabao.com 作者: 发布时间:2024-05-20

报价宝综合消息基于XML描述的可程式设计函式式ETL实现

转载本文需注明出处:微信公众号EAWorld,违者必究。

引言:

传统 ETL 主要以 SQL 为主要技术手段,把资料经抽取、清洗转换之后载入到资料仓库。但是在如今移动互联网大力发展的场景下,产生大量碎片化和不规则的资料。-,公安等行业,传统数据库已经远远无法满足需求。资料原始档案通过档案汇入到基础库,再通过大资料 HQL等技术手段提取出二级库,这中间的资料汇入和 SQL ETL 的提取的过程,大量消耗 IO 效能和计算资源,在很多场景下已经是资料处理的瓶颈所在。

普元在实施公安专案过程中开发了一种基于 XML 描述的可程式设计的函式 ETL 转换方法。主要用于大资料档案处理领域,能从原始资料档案直接、快速载入到专题库的技术手段。技术方案主要解决了用 XML 的技术手段描述资料档案的格式,包含档案字段切分、字段型别、预设值、异常值校验、时间格式校验。在处理时可新增自行开发的 JAVA UDF 函式,函式实参支援变数、常量、表示式、函式和运算子过载。同时函式支援多层巢状,即内部函式的返回值最为外部函式的实参。该方案实现了 XML 内函式体的语法解析并在执行过程中直接编译为 Java 字节码的技术。有效的解决了-、公安、电信行业巨量的资料处理需要的大量计算资源和 IO 效能瓶颈,有效的提高了资料处理效率和降低了资料处理开发难度。

目录:

一、基于 XML 控制档案解析资料档案方案介绍

二、XML 控制档案结构和语法

三、函式和多层巢状函式传参

四、UDF 函式编写方法

五、资料测试工具

六、FlumeOnYarn 架构和分散式部署

一、基于 XML 控制档案解析资料档案方案介绍

对于资料开发专案,我们常常会面临众多的资料对接,部分场景不仅资料量大,且资料种类多,资料解析开发工作量巨大。对于大量资料对接,一般设计的 RPC 界面和 WebService 一般都达不到资料效能要求的。并且他们都是点对点的服务,一旦上下游系统故障,都会造成整个资料对接异常。因此大部分都会选择使用档案的方式进行资料对接。

对于非实时资料对接需求,这种方式的优点:

在资料量大的情况下,可以通过档案传输,上游只写入,无需关心资料业务和故障;方案简单,避免了网络协议相关的概念;维护简单,只需保证磁盘写入稳定性即可;我们常常会面临基于此架构的资料对接。但基于此架构资料处理工作都在下游(即资料使用方)。

面对大量资料对接和众多的资料型别,我们对于每种资料档案解析、解码、清洗消耗大量的人力,并且基于编码的方式对于较多资料型别的场景程式码量大,且难以管理。因此经过多次资料开发实践,我们开发了一种基于 XML 描述的方式来解析和清洗资料档案的实现。

本架构实现适合以下几个方面:

基于档案的资料对接;档案无法直接汇入到目标数据库,需要做转换,清洗为目标格式;如上资料对接架构图,Flume 基本实现了基于档案系统的自动扫描和读取,因此架构实现了基于 Flume Sink 的模组。本架构也可作为SDK 作为框架整合到现有资料处理方案中。

二、XML资料控制档案结构和语法

JD_TYPE_V1

textfile

,

exp_flag

sender_id

sender_num

sender_address

receiver_num

expect_time

receiver_num_origin

is_sender_num_null

number_connect

all_num_null

sender_province_code

如上 XML 描述了一种资料档案型别及该型别的切分方法,资料每行经过切分后,产生的多个数据列的转换方法。

理论上,每种资料型别应该对应一个控制档案,意味着控制档案来描述该种资料型别如何解析和转换。

Key 主要标注该控制档案处理的型别ID;Delimiter 为档案列切割字元;Fields 中包含每列的字段描述;资料型别支援Java基本型别和date型别;Skip为资料对齐语法,控制在列中忽略某列的值;Default = true 属性为资料对齐语法,给某列提供预设值,提供预设值的列在资料列中不移动位移;Value 提供了给该字段提供当列中无值时提供预设值;value=null则指定列值为null;Date 型别需 pattern 属性;

三、函式和多层巢状函式传参

预设值

词法分析时字段field 的value 属性值没有以英文小括号闭合的实体。如下示例中的primeton:

data_vendor

函式

函式是由一组字串、数字、下划线组成的合法函式名和0 到多个形式引数组成。在词法分析时字段field 的 value 属性值由英文小括号闭合的实体。如下示例中的:

location(),yn(),concat();

curr_time

receiver_num_origin

is_sender_num_null

number_connect

函式名

函式体小括号前面的部分。一般由字串、数字、下划线组成的一组特定的名称。如location(receiver_tel),location 即为该函式的函式名称。

函式的形式引数:

1.无引数

词法分析时value的值满足函式条件且函式体内无引数。如下示例中:unix_timestamp() 获得当前系统内的 Unix 时间戳;

curr_time

2.常量型形参

词法分析时函式体内以英文单引号引用的值为函式体的常量型形参。如’100’,函式示例为:random_int(‘100’),生成 0-100 以内的随机整形数值;

rand_num

3.变数型形参

词法分析时函式体内参数没有英文单引号引用并且不以英文小括号闭合的为函式体的变数型形参。如下示例中的receiver_tel;

r_num_loc

4.函式型形参

词法分析时函式体内没有英文单引号并且以英文小括号闭合的引数型别引数为函式体的函式型引数。如下示例中的:none(sender_num)和none(receiver_num);

all_num_null

词法分析获得到函式体的同时,使用函式名呼叫UdfRegistors.getUdf(udfName) 函式,以检验当前系统必要存在该函式,否则则丢掷无法识别的函式异常。

5.型别校验

词法分析阶段获得了字段 field 的取值是预设值或者函式,下一步需校验其预设值或函式的返回值是否能和定义的字段型别相匹配。如果是函式同时校验函式的形参和实参型别是否相匹配。

data_vendor

call_flag

如上示例中的primeton 需能转换为 string 型别,call_flag 需能转换为 int 型别。如果型别不能转换,则会丢掷型别无法转换异常。对于函式,通过 returnType 返回型别和字段型别进行校验,可匹配或者是该型别的子型别则型别验证通过。

四、UDF 函式编写方法

编写一个UDF函式的步骤:

继承 UDF 类,实现 eval 方法;Eval 方法传入的是一个数组引数;判断引数长度是否和预期的一致;判断位置引数型别是否和预期的一致;实现函式体;返回eval函式执行的返回值,理论上该返回值的型别应该一致,不应该同一函式返回多种型别值;函式编写者应该保证函式体内是执行绪安全的;UDF 实现如下:

public abstract class UDF {

/**

* 是否支援该组引数型别,不支援丢掷UnsupportedTypeException异常。预设返回 true

*/

public void support(Class>... paramsClass)throws UnsupportedTypeException;

/**

* 该 UDF 返回值型别,用于校验巢状函式型别是否匹配。可返回简单型别,map,array,record 等型别.预设返回 String 型别

*/

public Class> returnType();

/**

* UDF 执行函式,当输入不符合预期时,向外丢掷异常

* @param params 函式的输入实参

* @return 函式输出结果,简单型别或者复杂型别,支援简单型别,map,array,record 型别

*/

public abstract Object eval(Object... params);

}

一个判断是否包含子串的UDF 写法:

所有的UDF都通过一个核心注册类(这点类似 Hive 的FunctionRegistry)

public final class UdfRegistors {

/**

* UDF 函式对映

*/

static final Map UDF_CACHED = new HashMap();

static {

UDF_CACHED.put("copy", new CopyUDF()); // 复制一个变数的值

UDF_CACHED.put("eq", new EqUDF()); // 判断两个变数是否相等

UDF_CACHED.put("yn", new YnUDF()); // 根据输入true,false 转换为 Y、N

UDF_CACHED.put("null", new NullUDF()); // 判断变数是否为null

// add udf method

UDF_CACHED.put("location", new LocationUDF()); // 获得手机号码的归属地

UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根据国家名称获取国家程式码

UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根据省名称获取省程式码

UDF_CACHED.put("city_code", new CityCodeUDF()); // 根据城市名称获取城市程式码

UDF_CACHED.put("phone_num", new PhoneNumUDF()); // 校验是否是手机号或者固话

UDF_CACHED.put("number_format", new NumberFormatUDF()); //校验是否可以转化成数字

}

/**

* 新增一个UDF函式

* @param key UDF 函式

* @param value UDF 函式 eval 应执行绪安全

* @return

*/

public static boolean addUdf(String key, UDF value) {

return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) != null;

}

/**

* 获得内建的 udf 函式

*/

public static UDF getUdf(String udfName) {

return UDF_CACHED.get(udfName.toLowerCase());

}

}

UDF 函式注册时期:

可在编译期系结内建的 UDF 函式;可在系统启动时配置自载入的 UDF 函式;可在执行期动态注入UDF 函式;

五、资料测试工具

资料对接过程,面对资料是否能转换为目标结果常常无从所知。基于XML 控制档案的资料解析,可实现一个测试工具。该工具通过上传资料档案和上传 XML 控制档案,可对资料档案随机的读取行进行匹配测试,只要资料列和目标 XML档案能通过列匹配测试,则资料可通过 ETL 解析清洗。否则继续修改 XML 控制档案,直到顺利通过匹配。

六、FlumeOnYarn 架构和分散式部署

本架构适合以档案作为资料对接的方案,另一方面,通过扩充套件 Flume 即可实现拿来主义。Flume 内部实现对 Channel 的 Transaction,对于每个以档案构造的 Event 物件是原子操作,要么全部成功,要么失败。flume依赖事务来保证event的可靠性。Flume 预设没有分散式实现,因此开发了 FlumeOnYarn 的架构,用于支援 Flume 的分散式部署。

FlumeOnYarn优势:

无需每个节点安装 Flume,可一键启动和停止;配置档案在客户端节点修改,自动复制到 Yarn 上各例项,无需每个节点修改;基于 CDH或HDP的发行版,即使实现了 Web 视觉化化的配置和分散式部署,但是对于 Flume 只能实现单配置档案例项,无法实现多配置例项;丛集的规模可以根据资料量大小进行实时的调整(增减节点),实现弹性处理。通过命令或者 api 即可控制(CDH 等需要在页面新增 host,繁琐且不易动态调整);多个租户或者同一租户多个处理例项互不影响,且能隔离(Yarn Container);FlumeOnYarn 架构

上图所示,提交FlumeOnYarn 需要客户端,该客户端没有太多和Flume安装包结构特殊的地方,只是在 lib 下添加了 flume-yarn 的架构支援和 bin 下 flume-on-yarn 的启动指令码。

Flume OnYarn 客户端程式

通过 bin/flume-on-yarn 即可提交 FlumeOnYarn Application 丛集。如下的命令即可一次性申请多个 Yarn 资源节点,实现一键部署:

bin/flume-on-yarn yarn -s --name agent_name –conf conf/flume-hdfs.conf --num-instances 5

总结

关于作者:震秦,普元资深开发工程师,专注于大资料开发 8 年,擅长 Hadoop 生态内各工具的使用和优化。参与某公关广告(上市)公司DMP 建设,负责资料分层设计和批处理,排程实现,完成交付使用;参与国内多省市公安社交网络专案部署,负责产品开发(Spark 分析应用);参与资料清洗加工为我方主题库并部署上层应用。

关于EAWorld:微服务,DevOps,资料治理,移动架构原创技术分享。

2019-08-15 08:48:00

相关文章