APP下载

Spring Batch(1)——资料批处理概念

消息来源:baojiabao.com 作者: 发布时间:2024-05-02

报价宝综合消息Spring Batch(1)——资料批处理概念

批处理的核心场景

从某个位置读取大量的记录,位置可以是数据库、档案或者外部推送伫列(MQ)。根据业务需要实时处理读取的资料。将处理后的资料写入某个位置,可以是数据库、档案或者推送到伫列。Spring Batch能解决的批处理场景

Spring Batch为批处理提供了一个轻量化的解决方案,它根据批处理的需要迭代处理各种记录,提供事物功能。但是Spring Batch仅仅适用于"离线"场景,在处理的过程中不能和外部进行任何互动,也不允许有任何输入。

Spring Batch的目标

开发人员仅关注业务逻辑,底层框架的互动交由Spring Batch去处理。能够清晰分离业务与框架,框架已经限定了批处理的业务切入点,业务开发只需关注这些切入点(Read、Process、Write)。提供开箱即用的通用界面。快速轻松的融入Spring 框架,基于Spring Framework能够快速扩充套件各种功能。所有现有核心服务都应易于更换或扩充套件,而不会对基础架构层产生任何影响。Spring Batch结构

如上图,通常情况下一个独立的JVM程式就是仅仅用于处理批处理,而不要和其他功能重叠。 在最后一层基础设定(Infrastructure)部分主要分为3个部分。JobLauncher、Job以及Step。每一个Step又细分为ItemReader、ItemProcessor、ItemWirte。使用Spring Batch主要就是知道每一个基础设定负责的内容,然后在对应的设施中实现对应的业务。

Spring Batch 批处理原则与建议

当我们构建一个批处理的过程时,必须注意以下原则:

通常情况下,批处理的过程对系统和架构的设计要够要求比较高,因此尽可能的使用通用架构来处理批量资料处理,降低问题发生的可能性。Spring Batch是一个是一个轻量级的框架,适用于处理一些灵活并没有到海量的资料。批处理应该尽可能的简单,尽量避免在单个批处理中去执行过于复杂的任务。我们可以将任务分成多个批处理或者多个步骤去实现。保证资料处理和物理资料紧密相连。笼统的说就是我们在处理资料的过程中有很多步骤,在某些步骤执行完时应该就写入资料,而不是等所有都处理完。尽可能减少系统资源的使用、尤其是耗费大量资源的IO以及跨服务器引用,尽量分配好资料处理的批次。定期分析系统的IO使用情况、SQL语句的执行情况等,尽可能的减少不必要的IO操作。优化的原则有:尽量在一次事物中对同一资料进行读取或写快取。一次事物中,尽可能在开始就读取所有需要使用的资料。优化索引,观察SQL的执行情况,尽量使用主键索引,尽量避免全表扫描或过多的索引扫描。SQL中的where尽可能通过主键查询。不要在批处理中对相同的资料执行2次相同的操作。对于批处理程式而言应该在批处理启动之前就分配足够的内存,以免处理的过程中去重新申请新的内存页。对资料的完整性应该从最差的角度来考虑,每一步的处理都应该建立完备的资料校验。对于资料的总量我们应该有一个和资料记录在资料结构的某个字段 上。所有的批处理系统都需要进行压力测试。如果整个批处理的过程是基于档案系统,在处理的过程中请切记完成档案的备份以及档案内容的校验。批处理的通用策略

和软件开发的设计模式一样,批处理也有各种各样的现成模式可供参考。当一个开发(设计)人员开始执行批处理任务时,应该将业务逻辑拆分为一下的步骤或者板块分批执行:

资料转换:某个(某些)批处理的外部资料可能来自不同的外部系统或者外部提供者,这些资料的结构千差万别。在统一进行批量资料处理之前需要对这些资料进行转换,合并为一个统一的结构。因此在资料开始真正的执行业务处理之前,先要使用其他的方法或者一些批处理任务将这些资料转换为统一的格式。资料校验:批处理是对大量资料进行处理,并且资料的来源千差万别,所以批处理的输入资料需要对资料的完整性性进行校验(比如校验字段资料是否缺失)。另外批处理输出的资料也需要进行合适的校验(例如处理了100条资料,校验100条资料是否校验成功)提取资料:批处理的工作是逐条从数据库或目标档案读取记录(records),提取时可以通过一些规则从资料来源中进行资料筛选。资料实时更新处理:根据业务要求,对实时资料进行处理。某些时候一行资料记录的处理需要系结在一个事物之下。输出记录到标准的文件格式:资料处理完成之后需要根据格式写入到对应的外部资料系统中。以上五个步骤是一个标准的资料批处理过程,Spring batch框架为业务实现提供了以上几个功能入口。

资料额外处理

某些情况需要实现对资料进行额外处理,在进入批处理之前通过其他方式将资料进行处理。主要内容有:

排序:由于批处理是以独立的行资料(record)进行处理的,在处理的时候并不知道记录前后关系。因此如果需要对整体资料进行排序,最好事先使用其他方式完成。分割:资料拆分也建议使用独立的任务来完成。理由类似排序,因为批处理的过程都是以行记录为基本处理单位的,无法再对分割之后的资料进行扩充套件处理。合并:理由如上。常规资料来源

批处理的资料来源通常包括:

数据库驱动连结(连结到数据库)对资料进行逐条提取。档案驱动连结,对档案资料进行提取讯息驱动连结,从MQ、kafka等讯息系统提取资料。典型的处理过程

在业务停止的视窗期进行批资料处理,例如银行对账、清结算都是在12点日切到黎明之间。简称为离线处理。线上或并发批处理,但是需要对实际业务或使用者的响应进行考量。并行处理多种不同的批处理作业。分割槽处理:将相同的资料分为不同的区块,然后按照相同的步骤分为许多独立的批处理任务对不同的区块进行处理。以上处理过程进行组合。在执行2,3点批处理时需要注意事物隔离等级。

Spring Batch批处理的核心概念

下图是批处理的核心流程图。

Spring Batch同样按照批处理的标准实现了各个层级的元件。并且在框架级别保证资料的完整性和事物性。

如图所示,在一个标准的批处理任务中涵盖的核心概念有JobLauncher、Job、Step,一个Job可以涵盖多个Step,一个Job对应一个启动的JobLauncher。一个Step中分为ItemReader、ItemProcessor、ItemWriter,根据字面意思它们分别对应资料提取、资料处理和资料写入。此外JobLauncher、Job、Step会产生元资料(Metadata),它们会被储存到JobRepository中。

Job

简单的说Job是封装一个批处理过程的实体,与其他的Spring专案类似,Job可以通过XML或Java类配置,称为“Job Configuration”。如下图Job是单个批处理的最顶层。

为了便于理解,可以简单的将Job理解为是每一步(Step)例项的容器。他结合了多个Step,为它们提供统一的服务同时也为Step提供个性化的服务,比如步骤重启。通常情况下Job的配置包含以下内容:

Job的名称定义和排序Step执行例项。标记每个Step是否可以重启。Spring Batch为Job界面提供了预设的实现——SimpleJob,其中实现了一些标准的批处理方法。下面的程式码展示了如可注入一个Job。

@Bean

public Job footballJob() {

return this.jobBuilderFactory.get("footballJob") //get中命名了Job的名称

.start(playerLoad()) //playerLoad、gameLoad、playerSummarization都是Step

.next(gameLoad())

.next(playerSummarization())

.end()

.build();

}

JobInstance

JobInstance是指批处理作业执行的例项。例如一个批处理必须在每天执行一次,系统在2019年5月1日执行了一次我们称之为2019-05-01的例项,类似的还会有2019-05-02、2019-05-03例项。通常情况下,一个JobInstance对应一个JobParameters,对应多个JobExecution。(JobParameters、JobExecution见后文)。同一个JobInstance具有相同的上下文(ExecutionContext内容见后文)。

JobParameters

前面讨论了JobInstance与Job的区别,但是具体的区别内容都是通过JobParameters体现的。一个JobParameters物件中包含了一系列Job执行相关的引数,这些引数可以用于参考或者用于实际的业务使用。对应的关系如下图:

当我们执行2个不同的JobInstance时JobParameters中的属性都会有差异。可以简单的认为一个JobInstance的标识就是Job+JobParameters。

JobExecution

JobExecution可以理解为单次执行Job的容器。一次JobInstance执行的结果可能是成功、也可能是失败。但是对于Spring Batch框架而言,只有返回执行成功才会视为完成一次批处理。例如2019-05-01执行了一次JobInstance,但是执行的过程失败,因此第二次还会有一个“相同的”的JobInstance被执行。

Job用于定义批处理如何执行,JobInstance纯粹的就是一个处理物件,把所有的执行内容和资讯组织在一起,主要是为了当面临问题时定义正确的重启引数。而JobExecution是执行时的“容器”,记录动态执行时的各种属性和上线文。他包括的资讯有:

以上这些内容Spring Batch都会通过JobRepository进行持久化(这些资讯官方文成称之为MetaData),因此在对应的资料来源中可以看到下列资讯:

BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

当某个Job批处理任务失败之后会在对应的数据库表中路对应的状态。假设1月1号执行的任务失败,技术团队花费了大量的时间解决这个问题到了第二天21才继续执行这个任务。

BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

从资料上看好似JobInstance是一个接一个顺序执行的,但是对于Spring Batch并没有进行任何控制。不同的JobInstance很有可能是同时在执行(相同的JobInstance同时执行会丢掷JobExecutionAlreadyRunningException异常)。

Step

Step是批处理重复执行的最小单元,它按照顺序定义了一次执行的必要过程。因此每个Job可以视作由一个或多个多个Step组成。一个Step包含了所有所有进行批处理的必要资讯,这些资讯的内容是由开发人员决定的并没有统一的标准。一个Step可以很简单,也可以很复杂。他可以是复杂业务的组合,也有可能仅仅用于迁移资料。与JobExecution的概念类似,Step也有特定的StepExecution,关系结构如下:

StepExecution

StepExecution表示单次执行Step的容器,每次Step执行时都会有一个新的StepExecution被建立。与JobExecution不同的是,当某个Step执行失败后预设并不会重新执行。StepExecution包含以下属性:

ExecutionContext

前文已经多次提到ExecutionContext。可以简单的认为ExecutionContext提供了一个Key/Value机制,在StepExecution和JobExecution物件的任何位置都可以获取到ExecutionContext中的任何资料。最有价值的作用是记录资料的执行位置,以便发生重启时候从对应的位置继续执行:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition())

比如在任务中有一个名为“loadData”的Step,他的作用是从档案中读取资料写入到数据库,当第一次执行失败后,数据库中有如下资料:

BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

BATCH_STEP_EXECUTION:

BATCH_STEP_EXECUTION_CONTEXT: |STEP_EXEC_ID|SHORT_CONTEXT| |---|---| |1|{piece.count=40321}|

在上面的例子中,Step执行30分钟处理了40321个“pieces”,我们姑且认为“pieces”表示行间的行数(实际就是每个Step完成循环处理的个数)。这个值会在每个commit之前被更新记录在ExecutionContext中(更新需要用到StepListener后文会详细说明)。当我们再次重启这个Job时并记录在BATCH_STEP_EXECUTION_CONTEXT中的资料会载入到ExecutionContext中,这样当我们继续执行批处理任务时可以从上一次中断的位置继续处理。例如下面的程式码在ItemReader中检查上次执行的结果,并从中断的位置继续执行:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {

log.debug("Initializing for restart. Restart data is: " + executionContext);

long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

LineReader reader = getReader();

Object record = "";

while (reader.getPosition() record = readLine();

}

}

ExecutionContext是根据JobInstance进行管理的,因此只要是相同的例项都会具备相同的ExecutionContext(无论是否停止)。此外通过以下方法都可以获得一个ExecutionContext:

ExecutionContext ecStep = stepExecution.getExecutionContext();

ExecutionContext ecJob = jobExecution.getExecutionContext();

但是这2个ExecutionContext并不相同,前者是在一个Step中每次Commit资料之间共享,后者是在Step与Step之间共享。

JobRepository

JobRepository是所有前面介绍的物件例项的持久化机制。他为JobLauncher、Job、Step的实现提供了CRUD操作。当一个Job第一次被启动时,一个JobExecution会从资料来源中获取到,同时在执行的过程中StepExecution、JobExecution的实现都会记录到资料来源中。使用@EnableBatchProcessing注解后JobRepository会进行自动化配置。

JobLauncher

JobLauncher为Job的启动执行提供了一个边界的入口,在启动Job的同时还可以定制JobParameters:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)

throws JobExecutionAlreadyRunningException, JobRestartException,

JobInstanceAlreadyCompleteException, JobParametersInvalidException;

}

竟然都看到最后了,给小编点个关注吧,小编还会持续更新的,只收藏不点关注的都是在耍流氓!

2019-07-12 12:52:00

相关文章