Spring Batch的使用和定时器Quartz的使用

xiaoxiao2021-02-28  86

Spring Batch是一个基于Spring的企业级批处理框架,它通过配合定时器Quartz可以轻松实现大批量数据的读取或插入,并且全程自动化,无需人工干预。

在使用spring batch之前,得对spring batch的流程有一个基本了解

每个batch都包含一个job,而一个job中可能包含多个step,整个batch中真正干活的是step。batch主要用来对数据进行操作,所以step中有三个操作数据的组件:ItemReader用来读取数据,ItemProcessor用来处理数据,ItemWriter用来写数据(可以是写文件,也可以是执行插入的sql语句)。JobLauncher用来启动Job;JobRepository为上述处理提供一种持久化机制,它为JobLauncher、Job和Step实例提供CRUD操作。

pom.xml  三个batch的jar包

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>2.1.8.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-batch-infrastructure</artifactId>
    <version>2.1.8.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-batch-test</artifactId>
    <version>2.1.8.RELEASE</version>
</dependency>

batch.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!-- Launches jobs; it records executions through the jobRepository bean below. -->
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <!-- Map-based job repository. NOTE(review): presumably in-memory only, so job
         metadata would not survive a restart; confirm this is acceptable. -->
    <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="validateTransactionState" value="false" />
    </bean>

    <!-- A single job with one chunk-oriented step: read, process, write, committing every 10 items. -->
    <batch:job id="writerteacherInterview">
        <batch:step id="teacherInterview">
            <batch:tasklet>
                <batch:chunk reader="jdbcItemReaderTeacherInterview"
                    writer="teacherInterviewItemWriter"
                    processor="teacherInterviewProcessor"
                    commit-interval="10">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- Reader for the job: counts interview records per teacher within a date window.
         The comparison operators are written as &gt; / &lt; because a raw '<' is not
         allowed inside an XML attribute value.
         NOTE(review): ${detail_startime} / ${detail_endtime} are assumed to be resolved by a
         property placeholder configured elsewhere; the dataSource and teacherInterviewMapper
         beans are also defined outside this file. -->
    <bean id="jdbcItemReaderTeacherInterview"
        class="org.springframework.batch.item.database.JdbcCursorItemReader"
        scope="step">
        <property name="dataSource" ref="dataSource" />
        <property name="sql"
            value="select distinct teacherName ,count(teacherName) as num from examininterviewrecord   where pdate &gt;'${detail_startime}' and pdate &lt; '${detail_endtime}'  GROUP BY teacherName " />
        <property name="rowMapper" ref="teacherInterviewMapper">
        </property>
    </bean>
</beans>

读取数据    teacherInterviewMapper

package com.yc.batch;

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import com.yc.vo.TeacherInterviewdetail;
import com.yc.vo.TeacherWorkdetail;
import com.yc.vo.Workdetail;

/**
 * Maps one row of the teacher-interview query to a {@link TeacherInterviewdetail}.
 *
 * <p>Registered under the bean name {@code teacherInterviewMapper}. The interface is
 * parameterized so callers get a typed result instead of a raw {@code Object}.
 */
@Component("teacherInterviewMapper")
public class TeacherInterviewMapper implements RowMapper<TeacherInterviewdetail> {

    /**
     * Builds a detail object from the current row.
     *
     * @param rs     result set positioned on the row to map; must expose the columns
     *               {@code teacherName} and {@code num}
     * @param rowNum index of the current row (unused)
     * @return a new detail object holding the teacher name and the count
     * @throws SQLException if a column cannot be read
     */
    @Override
    public TeacherInterviewdetail mapRow(ResultSet rs, int rowNum) throws SQLException {
        TeacherInterviewdetail detail = new TeacherInterviewdetail();
        detail.setTeacherName(rs.getString("teacherName"));
        detail.setNum(rs.getInt("num"));
        return detail;
    }
}

处理数据 teacherInterviewProcessor:这个处理数据的方法,一般用来对数据做一些加工,比如补充读取阶段没有读到的数据。不过这些加工既可以写在这个方法里,也可以写在后面那个写入数据的类里,所以这个类里你可以什么都不干,直接把数据传给后面,让后面的写数据类来处理。我这里处理数据的这个类就什么都没写,但是最好还是按它的规则来!

[java] view plain copy print ? package com.yc.batch;    import org.hibernate.engine.transaction.jta.platform.internal.SynchronizationRegistryBasedSynchronizationStrategy;  import org.springframework.batch.item.ItemProcessor;  import org.springframework.stereotype.Component;  import org.springframework.stereotype.Service;    import com.yc.vo.TeacherInterviewdetail;  import com.yc.vo.TeacherWorkdetail;  import com.yc.vo.Workdetail;      //业务层  @Component("teacherInterviewProcessor")  public class TeacherInterviewProcessor implements ItemProcessor<TeacherInterviewdetail, TeacherInterviewdetail> {        @Override      public TeacherInterviewdetail process(TeacherInterviewdetail teacherInterviewdetail) throws Exception {                     return teacherInterviewdetail;      }  }   package com.yc.batch; import org.hibernate.engine.transaction.jta.platform.internal.SynchronizationRegistryBasedSynchronizationStrategy; import org.springframework.batch.item.ItemProcessor; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import com.yc.vo.TeacherInterviewdetail; import com.yc.vo.TeacherWorkdetail; import com.yc.vo.Workdetail; //业务层 @Component("teacherInterviewProcessor") public class TeacherInterviewProcessor implements ItemProcessor<TeacherInterviewdetail, TeacherInterviewdetail> { @Override public TeacherInterviewdetail process(TeacherInterviewdetail teacherInterviewdetail) throws Exception { return teacherInterviewdetail; } }

写数据 teacherInterviewItemWriter:这个类主要是把数据写进一个文件里,同时我在这个类里还做了一些数据处理。

package com.yc.batch;

import java.io.IOException;
import java.io.InputStream;

import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.annotation.Resource;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import com.yc.biz.ExamineeClassBiz;
import com.yc.biz.WorkBiz;
import com.yc.utils.CsvUtils;
import com.yc.vo.TeacherInterviewdetail;
import com.yc.vo.TeacherWorkdetail;
import com.yc.vo.Workdetail;
import net.sf.ehcache.util.PropertyUtil;

/**
 * Writer step: dumps each chunk of teacher interview items into a CSV file.
 *
 * <p>Registered under the bean name {@code teacherInterviewItemWriter}. The target file is
 * {@code <base>/csv/teacherInterview_<detail_time>.csv}, where {@code <base>} is the
 * classpath root with its last four path segments removed and {@code detail_time} is read
 * from {@code connectionConfig.properties}.
 */
@Component("teacherInterviewItemWriter")
public class TeacherInterviewItemWriter implements ItemWriter<TeacherInterviewdetail> {

    /** Classpath resource that holds the {@code detail_time} setting. */
    private static final String CONFIG_RESOURCE = "connectionConfig.properties";

    /** Number of trailing path segments stripped from the classpath root. */
    private static final int PARENT_LEVELS = 4;

    /**
     * Writes the given chunk to the CSV file.
     *
     * <p>NOTE(review): this method is called once per chunk, and the CSV helper appears to
     * open the target file without append mode. If so, each chunk overwrites the previous
     * one and only the last chunk survives; confirm against CsvUtils / CsvWriter.
     *
     * @param teacherInterviewdetails the items of the current chunk
     * @throws Exception if the configuration cannot be loaded or the file cannot be written
     */
    @Override
    public void write(List<? extends TeacherInterviewdetail> teacherInterviewdetails)
            throws Exception {
        String time = loadConfig().getProperty("detail_time");

        CsvUtils csvUtils = new CsvUtils();
        List<Object> works = new ArrayList<Object>(teacherInterviewdetails);

        // Walk up from the classpath root by dropping one trailing segment per iteration.
        String path = this.getClass().getResource("/").getPath();
        for (int level = 0; level < PARENT_LEVELS; level++) {
            path = path.substring(0, path.lastIndexOf("/"));
        }
        csvUtils.writeCsv(path + "/csv/teacherInterview_" + time + ".csv", works);
    }

    /**
     * Loads {@code connectionConfig.properties} from the classpath and always closes the stream.
     *
     * @return the loaded properties
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws IOException if the resource cannot be read
     */
    private Properties loadConfig() throws IOException {
        InputStream in = PropertyUtil.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE);
        if (in == null) {
            throw new IllegalStateException(CONFIG_RESOURCE + " not found on the classpath");
        }
        try {
            Properties props = new Properties();
            props.load(in);
            return props;
        } finally {
            in.close();
        }
    }
}

// This writer relies on a third-party jar that writes data to CSV files.

<dependency>
    <groupId>net.sourceforge.javacsv</groupId>
    <artifactId>javacsv</artifactId>
    <version>2.0</version>
</dependency>

CsvUtils帮助类的写入CSV文件方法

[cpp] view plain copy print ? /**       * 写入CSV文件       * @throws IOException       */        public void writeCsv(String path,List<Object> t) throws IOException{            String csvFilePath = path;          String filepath=path.substring(0,path.lastIndexOf("/"));          File f=new File(filepath);          if(!f.exists()){              f.mkdirs();          }          File file=new File(path);          if(!file.exists()){              file.createNewFile();          }          CsvWriter wr =new CsvWriter(csvFilePath,',',Charset.forName("GBK"));          try {               for(Object obj:t){                  String[] contents=obj.toString().split(",");                  wr.writeRecord(contents);                }              wr.close();            } catch (IOException e) {                e.printStackTrace();            }        }     /** * 写入CSV文件 * @throws IOException */ public void writeCsv(String path,List<Object> t) throws IOException{ String csvFilePath = path; String filepath=path.substring(0,path.lastIndexOf("/")); File f=new File(filepath); if(!f.exists()){ f.mkdirs(); } File file=new File(path); if(!file.exists()){ file.createNewFile(); } CsvWriter wr =new CsvWriter(csvFilePath,',',Charset.forName("GBK")); try { for(Object obj:t){ String[] contents=obj.toString().split(","); wr.writeRecord(contents); } wr.close(); } catch (IOException e) { e.printStackTrace(); } } 就这样一个基本的batch流程就跑起来了,它通过从数据里读取一些数据,然后经过处理后,被存进服务器下的一个文件里面,之后像这种数据的读取就不需要去数据库里面

查询了,而是可以直接通过读取CSV文件来处理这个业务。一般使用这个的都会配一个定时器,让它们每隔一段时间跑一次,从而获得较新的数据

下面是定时器的配置

定时器的配置非常简单,我是使用注解方式来配置的

定时器任务类

[java] view plain copy print ? package com.yc.task.impl;  import javax.transaction.Transactional;  import org.springframework.batch.core.JobParametersInvalidException;  import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;  import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;  import org.springframework.batch.core.repository.JobRestartException;  import org.springframework.batch.item.ItemProcessor;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.scheduling.annotation.Scheduled;  import org.springframework.stereotype.Component;  import org.springframework.stereotype.Service;    import com.yc.batch.ClassBatch;  import com.yc.batch.MessageItemBatch;  import com.yc.batch.TeacherInterviewBatch;  import com.yc.batch.TearcherBatch;  import com.yc.po.Work;  import com.yc.task.WorkTask;  import com.yc.vo.Workdetail;  @Service  public class WorkTaskImpl implements WorkTask{        @Autowired      private TeacherInterviewBatch teacherInterviewBatch;//教师访谈记录      public void setTeacherInterviewBatch(TeacherInterviewBatch teacherInterviewBatch) {          this.teacherInterviewBatch = teacherInterviewBatch;      }            @Scheduled(cron= "0 30 22 * * ?")   //每天晚上十点30执行一次  这个注解会让框架会自动把这个方法看成任务启动方法       @Override      public void task() {          try {              teacherInterviewBatch.test();//教师访谈          } catch (Exception e) {              e.printStackTrace();          }                }    }   package com.yc.task.impl; import javax.transaction.Transactional; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.batch.item.ItemProcessor; import 
org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import com.yc.batch.ClassBatch; import com.yc.batch.MessageItemBatch; import com.yc.batch.TeacherInterviewBatch; import com.yc.batch.TearcherBatch; import com.yc.po.Work; import com.yc.task.WorkTask; import com.yc.vo.Workdetail; @Service public class WorkTaskImpl implements WorkTask{ @Autowired private TeacherInterviewBatch teacherInterviewBatch;//教师访谈记录 public void setTeacherInterviewBatch(TeacherInterviewBatch teacherInterviewBatch) { this.teacherInterviewBatch = teacherInterviewBatch; } @Scheduled(cron= "0 30 22 * * ?") //每天晚上十点30执行一次 这个注解会让框架会自动把这个方法看成任务启动方法 @Override public void task() { try { teacherInterviewBatch.test();//教师访谈 } catch (Exception e) { e.printStackTrace(); } } }定时器所真正要执行的方法

[java] view plain copy print ? package com.yc.batch;    import javax.annotation.Resource;  import org.apache.commons.jexl2.Main;  import org.springframework.batch.core.Job;  import org.springframework.batch.core.JobExecution;  import org.springframework.batch.core.JobParameters;  import org.springframework.batch.core.JobParametersBuilder;  import org.springframework.batch.core.JobParametersInvalidException;  import org.springframework.batch.core.launch.JobLauncher;  import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;  import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;  import org.springframework.batch.core.repository.JobRestartException;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.stereotype.Component;    @Component  public class TeacherInterviewBatch {        private Job job;      private JobLauncher launcher;        @Resource(name="writerteacherInterview")      public void setJob(Job job) {          this.job = job;      }        @Autowired       public void setLauncher(JobLauncher launcher) {          this.launcher = launcher;      }          public void test() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException{            JobParameters jobParameters =                  new JobParametersBuilder()                  .addLong("time",System.currentTimeMillis()).toJobParameters();            JobExecution result = launcher.run(job, jobParameters);      }    }   package com.yc.batch; import javax.annotation.Resource; import org.apache.commons.jexl2.Main; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import 
org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TeacherInterviewBatch { private Job job; private JobLauncher launcher; @Resource(name="writerteacherInterview") public void setJob(Job job) { this.job = job; } @Autowired public void setLauncher(JobLauncher launcher) { this.launcher = launcher; } public void test() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException{ JobParameters jobParameters = new JobParametersBuilder() .addLong("time",System.currentTimeMillis()).toJobParameters(); JobExecution result = launcher.run(job, jobParameters); } } 就这样batch就被定时器调度起来了,每天十点准时使用batch来操作数据,将需要展示的数据写入文件,到时候项目运行直接读取文件来展示数据,当数据量达到一定量时,这会给用户提高体验感
转载请注明原文地址: https://www.6miu.com/read-77389.html

最新回复(0)