网络知识 娱乐 基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活

基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活

因为项目仅仅是不停的抓数据,没有弄成分布式的,但依然需要系统双活来保证系统稳定.


这个时候,我的思路是:这个在另一篇博文中有些的详细思路(在Timer定时任务中_基于Redis自己实现一套双机互备_双活系统_基于抢占原则),可以搜索双活查到,但是实际


实现起来,碰到了一些问题,已经解决,记录下来.


来看一下定时任务Timer_Scheduled_Quartz_如何在SpringBoot中整合


首先根据上面这个博文,已经实现了,一个简单的quartz定时:


1.自己定义一个QuartzTask普通的接口,然后实现几个这个接口,做为不同的定时任务.


2.但这里需要的是,动态添加任务,因为系统启动后,不同的定时器,会做不同的工作,


并且,采集系统需要连接不同的IP的设置进行接口数据采集,这就要求,每个IP的设备


都需要有一个与之对应的定时器,而且,这个IP设备,是动态的,有可能将来,会动态的再添加


一个IP设备,那么这个时候,系统就会自动检测,自动为这个IP设备,生成一个定时任务.


3.所以可以看到,我们这个系统中的定时任务是动态添加的,但是应该是有2个定时任务是固定的.


4.一个定时任务是,用来检测系统,双活的,这个定时任务做的事情是,实时检测自身状态,没问题,就


更新最新时间,这样别的单点系统,在检测是时候发现,有一台机器上活着的,并且没有超时,就不会


上线,让出定时任务执行的机会,给主机,所有的从机属于待命状态.我们这样做,为了保证,集群


中的所有机器中,每个定时任务应该是只执行一份,而,我们这里还要求,不应该是每个定时任务只有


一份在集群中执行,而是集群中的,所有实例中,同时只有一个实例中的定时任务在执行.


5.然后还需要一个定时任务,是检测IP设备变化的,当新添加了新的IP设备以后,就需要动态,给


这个IP设备,添加一个定时任务,而添加的这定时任务就是需要为这个新的IP设备,添加,


6.然后还需要一个定时任务,就是用来获取数据的,周期获取IP设备的数据,而这个定时任务,是需要


为每个IP设备都生成一个定时任务的,所以这里定时任务是需要动态的生成的.


7.然后还需要一个定时任务,用来发送ping命令的,定时给每个IP设备,发送ping命令,这样如果能正常


得到回应,那么就更新一下时间表,{type:ping,time:2022-06-20 15:59:34,IP:192.168.1.110},这样


就说明该IP设备,一直都是活动的.而某些IP设备是,连接上以后,它会自动的推送,ping过来,同时推送


心跳数据过来,这里ping过来指的是连接没有问题了,而心跳过来,才指的是IP设备的接口可以正常获取数据了.


8.还需要一个定时任务,来动态的循环所有的IP设备,然后去连接上所有的IP设备,这个是定时执行,


如果定时执行发现设备掉线超时了他就会再去连接一下.


9.这些定时任务,只有一个定时任务是在启动的时候,利用quartz的配置类,走的,其他的都是动态创建的.


10.关于分布式,本来想通过quartz的分布式功能,来实现的,因为quartz本身自带分布式功能,通过


结合数据库和锁实现的,这个时候,先在数据库表中创建quartz的表,然后在QuartzScheduleConfig中,去配置上数据库dataSource,就可以了,但是这个时候发现有个问题,就是任务是我自己定义的任务,


这样的话,比较方便实现自己的业务,因为我都每个定时任务,都是需要有参数的,这样,我自己定义的任务,里面写上自己定义的参数,比如每个任务创建的时候,都需要把它对应的IP设备的IP,传入进去.


而,自己定义的任务需要用:


MethodInvokingJobDetailFactoryBean factoryBean = new MethodInvokingJobDetailFactoryBean();n factoryBean.setTargetObject(context.getBean(name));n factoryBean.setName(name);n factoryBean.setTargetMethod(QuartzScheduleTask.class.getMethods()[0].getName());n factoryBean.afterPropertiesSet();n这种方式添加任务,而这里factoryBean.setTargetObject(context.getBean(name)); ,setTargetObject这个就是设置,定时器对象的,这个对象是自己写的一个接口,定义了一个execute方法,但是MethodInvokingJobDetailFactoryBean ,这个类是不支持序列化的,也就是说,没办法配合quartz去实现分布式,不用quartz的分布式,就没办法实现系统的多活,因为quartz,提供的分布式功能,本身就可以保证,当一个集群去运行定时器的时候,可以保证一个定时器,只在同一个服务器实例上运行.而MethodInvokingJobDetailFactoryBean 不能序列化,就会导致,对应的task,没办法存入数据库,也就没办法实现分布式多活了.


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


也就是下面的方法就不行了,就考虑使用Job来创建JobDetail,但是这样的话,我们自己的任务


是需要继承Job这个quartz提供的类的. 本来是用下面的这个方法来做配置的.但是有问题


@Configurationn@ConfigurationProperties("quartz")n@Gettern@Setternpublic class QuartzScheduleConfig {n n private String schedulerName;n private String threadCount;n private String threadNamePrefix;n private String tasks;n private final ApplicationContext context;n n @Autowiredn public QuartzScheduleConfig(ApplicationContext context) {n this.context = context;n }n n @Beann public SchedulerFactoryBean schedulerFactoryBean() {n Properties properties = new Properties();n properties.setProperty("org.quartz.threadPool.threadCount", threadCount);n properties.setProperty("org.quartz.threadPool.threadNamePrefix", threadNamePrefix);n SchedulerFactoryBean factory = new SchedulerFactoryBean();n factory.setSchedulerName(schedulerName);n factory.setQuartzProperties(properties);n return factory;n }n n @Beann public Scheduler scheduler() throws Exception {n Scheduler scheduler = schedulerFactoryBean().getScheduler();n scheduler.scheduleJobs(createJobDetails(), true);n return scheduler;n }n n /**n * 创建JobDetailn * 使用是Spring的MethodInvokingJobDetailFactoryBean来创建JobDetailn * 使用Spring的ronTriggerFactoryBean来创建CronTriggern *n * @return Map<JobDetail, Set<CronTrigger>>n */n private Map<JobDetail, Set<? extends Trigger>> createJobDetails(){n Set<String> taskSet = StringUtils.commaDelimitedListToSet(tasks);n Map<JobDetail, Set<? extends Trigger>> map = new HashMap<>(taskSet.size());n for (String task : taskSet) {n String[] nameAndCron = task.split(":");n String name = StringUtils.uncapitalize(nameAndCron[0]);n String cron = nameAndCron[1];n MethodInvokingJobDetailFactoryBean factoryBean = new MethodInvokingJobDetailFactoryBean();n factoryBean.setTargetObject(context.getBean(name));n factoryBean.setName(name);n factoryBean.setTargetMethod(QuartzScheduleTask.class.getMethods()[0].getName());n factoryBean.afterPropertiesSet();n CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();n cronTriggerFactoryBean.setCronExpression(cron);n cronTriggerFactoryBean.setJobDetail(factoryBean.getObject());n cronTriggerFactoryBean.setName(name);n cronTriggerFactoryBean.afterPropertiesSet();n Set<CronTrigger> cronTriggerSet = new HashSet<>(1);n cronTriggerSet.add(cronTriggerFactoryBean.getObject());n map.put(factoryBean.getObject(), cronTriggerSet);n }n return map;n }n}n


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


Spring的MethodInvokingJobDetailFactoryBean是不支持序列化的就导致,quartz无法存入数据库了,


会报错,提示MethodInvokingJobDetailFactoryBean无法序列化.


11.这个时候怎么办? 就想到了,使用quartz中支持分布式的JobDetail来创建Job,这个时候我们,实现的自定义的任务,就需要继承Job这个类了,不能自己写一个接口了,这时候因为继承Job的这个定时任务,需要传入上下文context,而且,这个是系统调用的,不知道怎么传入初始化的参数,所以没能实现,


但是后来查找资料,发现可以通过jobMapData来给新创建的定时任务传递参数,如果这样可以的话


好像也行,这样的话就可以实现quartz的分布式了,也就是能保证集群中同时只有一个定时任务在执行.这样也就相当于实现了,系统的多活了,因为我们的系统中大部分都是定时任务,而非定时任务部分,在spingboot中做的接口,可以通过nginx这样来做负载均衡就行.


12.也就是使用:jobDataMap 用来给 job 传递数据;如果你的Job不是自己定义的接口,而是用


extends Job来实现的话,那么就可以使用这个jobDataMap,来在创建这个job的时候,给这个job传递


参数,只要有这个功能,你就可以在比如,有新的IP设备来的时候,就把参数先记录在某个HashMap中,


然后,在创建job的时候,获取参数放到对应的job中去.


JobDataMap属于JobDetail的一部分
可以在构建JobDataMap时传递参数


首先是在创建JobDetail的时候把参数数据填上:


//创建JobDetailn JobDetail jobDetail = JobBuilder.newJob(MyJob.class)n //唯一标识n .withIdentity("jobDetail1", "group1")n //添加参数通过usingJobData方法,传递给jobn .usingJobData("name","yx")n .build();n


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


然后就是用的时候在继承Job的任务类的execute方法中,从上下文中获取这个参数:


package com.gblfy.job;nnimport org.quartz.Job;nimport org.quartz.JobDataMap;nimport org.quartz.JobExecutionContext;nimport org.quartz.JobExecutionException;nnimport java.time.LocalTime;nnpublic class MyJob implements Job {nn @Overriden public void execute(JobExecutionContext context) throws JobExecutionException {nn LocalTime localTime = LocalTime.now();n /*n 1.从上下文中先获取getJobDetailn 2.再从getJobDetail中获取JobDataMapn 3.最后,从JobDataMap中,根据key获取对应属性的值n */n JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();n String name = jobDataMap.getString("name");nn System.out.println("hello " + name + "我正在执行" + localTime);n }n}nn


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


这样的话,也就是说,我们之前构想的利用quartz实现分布式,多活是可以的,但是,缺不满足我们的另一个想法,我们需要,对系统进行实时的监控,这部分还是需要我们自己来做,所以,后来就没有在用


quartz来做多活了.


12.来看一下,我们做的quartz任务管理的部分代码:


首先QuartzSchduleConfig这个是配置类:


系统启动会自动加载这个类,如果,系统配置了,任务的话会自动在这个地方进行加载:


import lombok.Getter;nimport lombok.Setter;nimport org.quartz.*;nimport org.springframework.beans.factory.annotation.Autowired;nimport org.springframework.beans.factory.annotation.Qualifier;nimport org.springframework.beans.factory.annotation.Value;nimport org.springframework.context.ApplicationContext;nimport org.springframework.context.annotation.Bean;nimport org.springframework.context.annotation.Configuration;nimport org.springframework.scheduling.quartz.CronTriggerFactoryBean;nimport org.springframework.scheduling.quartz.JobDetailFactoryBean;nimport org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;nimport org.springframework.scheduling.quartz.SchedulerFactoryBean;nimport org.springframework.util.StringUtils;nnimport javax.sql.DataSource;nimport java.text.ParseException;nimport java.util.*;nn@Configurationnpublic class QuartzScheduleConfig {nn @Value("${quartz.scheduler-name}")n private String schedulerName;n @Value("${quartz.thread-count}")n private String threadCount;n @Value("${quartz.thread-name-prefix}")n private String threadNamePrefix;n @Value("${quartz.tasks}")n private String tasks;nn static final String jobGroupName = "jobGroup";n static final String triggerGroupName = "triggerGroup";nn// @Autowiredn// @Qualifier(value = "dataSource")n// private DataSource dataSource;n// 注意这里如果打开的话,quartz在添加任务的时候就会自动的吧任务添加到数据库中的数据表去,n//nn private final ApplicationContext context;nnn @Autowiredn public QuartzScheduleConfig(ApplicationContext context) {n this.context = context;n }nn @Beann public SchedulerFactoryBean schedulerFactoryBean(){n Properties properties = new Properties();n properties.setProperty("org.quartz.threadPool.threadCount",threadCount);n properties.setProperty("org.quartz.threadPool.threadNamePrefix",threadNamePrefix);n SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();n //factoryBean.setDataSource(dataSource);n factoryBean.setSchedulerName(schedulerName);n factoryBean.setQuartzProperties(properties);n return factoryBean;n }nn @Beann public Scheduler scheduler() throws Exception{n Scheduler scheduler = schedulerFactoryBean().getScheduler();n scheduler.scheduleJobs(createJobDetails(),true);n return scheduler;n }nn /**n * 创建JobDetailn * 使用Spring的MethodInvokingJobDetailFactoryBean来创建Detailn * @returnn * @throws NoSuchMethodExceptionn * @throws ClassNotFoundExceptionn * @throws ParseExceptionn */n private Map<JobDetail,Set<? extends Trigger>> createJobDetails() throws NoSuchMethodException, ClassNotFoundException, ParseException {n Set<String> taskSet = StringUtils.commaDelimitedListToSet(tasks);n Map<JobDetail,Set<? extends Trigger>> map = new HashMap<>(taskSet.size());n for (String task : taskSet) {n String[] nameAndCron = task.split(":");n String name = StringUtils.uncapitalize(nameAndCron[0]);n String cron = nameAndCron[1];nn //集群模式下不能使用 MethodInvokingJobDetailFactoryBean 不支持序列化n MethodInvokingJobDetailFactoryBean factoryBean = new MethodInvokingJobDetailFactoryBean();n factoryBean.setTargetObject(context.getBean(name));n factoryBean.setName(name);n factoryBean.setTargetMethod(QuartzScheduleTask.class.getMethods()[0].getName());n factoryBean.afterPropertiesSet();nn //JobDetail jobDetail = org.quartz.JobBuilder.newJob(ReportTimeTask.class).withDescription(name).withIdentity(name,jobGroupName).build();nn CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();n cronTriggerFactoryBean.setCronExpression(cron);n cronTriggerFactoryBean.setJobDetail(factoryBean.getObject());n cronTriggerFactoryBean.setName(name);n cronTriggerFactoryBean.afterPropertiesSet();nn Set<CronTrigger> cronTriggerSet = new HashSet<>();n cronTriggerSet.add(cronTriggerFactoryBean.getObject());n map.put(factoryBean.getObject(),cronTriggerSet);nn }nn return map;n }n}n


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


13.然后我们再来看,如何写一个quartz的管理类,来实现任务的添加,删除,修改.


nnimport org.quartz.*;nimport org.quartz.impl.StdSchedulerFactory;nimport org.springframework.beans.factory.annotation.Autowired;nimport org.springframework.context.ApplicationContext;nimport org.springframework.scheduling.quartz.CronTriggerFactoryBean;nimport org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;nnimport java.util.concurrent.ConcurrentHashMap;nn/**n * 定时任务管理器 管理任务n */nnpublic class QuartzManager {n private static SchedulerFactory schedulerFactory = new StdSchedulerFactory();n static final String jobGroupName = "jobGroup";n static final String triggerGroupName = "triggerGroup";n // 保存心跳定时任务管理表对象n public static ConcurrentHashMap<String, QuartzTaskEntity> AllTasksManager = new ConcurrentHashMap<>();nn private final ApplicationContext context;nn @Autowiredn public QuartzManager(ApplicationContext context) {n this.context = context;n }nn /**n * 添加任务n * @param name 任务名称n * @param cron 任务cron表达式n * @param clazz 任务类n */n public static void addTask(String name, String cron, QuartzScheduleTask clazz){n try{n //添加到任务管理对象中n QuartzTaskEntity quartzTaskEntity = new QuartzTaskEntity();n quartzTaskEntity.setName(name);n quartzTaskEntity.setCron(cron);n quartzTaskEntity.setQuartzScheduleTask(clazz);n AllTasksManager.put(name,quartzTaskEntity);nn Scheduler scheduler = schedulerFactory.getScheduler();n //任务名 任务组 任务执行类n MethodInvokingJobDetailFactoryBean factoryBean = new MethodInvokingJobDetailFactoryBean();n factoryBean.setTargetObject(clazz);n factoryBean.setName(name);n factoryBean.setTargetMethod(QuartzScheduleTask.class.getMethods()[0].getName());n factoryBean.afterPropertiesSet();n factoryBean.setGroup(jobGroupName);nn //JobDetail jobDetail = org.quartz.JobBuilder.newJob(clazz.getClass()).n // withDescription(name).withIdentity(name,jobGroupName).build();nn CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();n cronTriggerFactoryBean.setCronExpression(cron);n cronTriggerFactoryBean.setJobDetail(factoryBean.getObject());n cronTriggerFactoryBean.setName(name);n cronTriggerFactoryBean.afterPropertiesSet();n cronTriggerFactoryBean.setGroup(triggerGroupName);nn //JobDetail jobDetail = new JobDetailImpl(name,jobGroupName,clazz);n //scheduler.scheduleJob(factoryBean.getObject(),cronTriggerFactoryBean.getObject());nn scheduler.scheduleJob(factoryBean.getObject(),cronTriggerFactoryBean.getObject());nn if(!scheduler.isShutdown()){n scheduler.start();n }nn }catch (Exception e){n e.printStackTrace();n }n }nn /**n * 修改任务n * @returnn */n public static void modifyTask(String name,String cron){n try{n Scheduler scheduler = schedulerFactory.getScheduler();n JobKey jobKey = new JobKey(name);n JobDetail jobDetail = scheduler.getJobDetail(jobKey);n if(jobDetail!=null){n removeJob(name);n QuartzScheduleTask quartzScheduleTask = AllTasksManager.get(name).getQuartzScheduleTask();n addTask(name,cron,quartzScheduleTask);n }n }catch (Exception e){n e.printStackTrace();n }n }nn /**n * 获取任务实例n * @param namen * @returnn */n public static QuartzScheduleTask getTask(String name){n QuartzScheduleTask quartzScheduleTask = AllTasksManager.get(name).getQuartzScheduleTask();n return quartzScheduleTask;n }nn /**n * 移除任务n * @param namen */n public static void removeJob(String name){n try{n Scheduler scheduler = schedulerFactory.getScheduler();n //停止触发器n TriggerKey triggerKey = new TriggerKey(name,triggerGroupName);n scheduler.pauseTrigger(triggerKey);n //移除触发器n scheduler.unscheduleJob(triggerKey);n //删除任务n JobKey jobKey = new JobKey(name,jobGroupName);n scheduler.deleteJob(jobKey);n }catch (Exception e){n e.printStackTrace();n }n }nnn}n


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


上面就实现了动态的任务的,添加,删除,修改,但是注意上面这个做法是不支持序列化的,因为用到了,


MethodInvokingJobDetailFactoryBean ,这个不支持序列化的bean来实现创建jobDetail.


并且,我这里的job,是我自己定义的,可以看看


14.定义task,这里是自定义的task,不是继承自job的.


定义一个顶层抽象的task:


/**n * 定时任务接口n * @author hcteln */npublic abstract class QuartzScheduleTask {nn /**n * 该方法定义某个具体的定时任务n */n public abstract void execute();nn}n


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


15.然后再去定义一个具体的任务:


import com.alibaba.fastjson.JSON;nimport com.alibaba.fastjson.JSONObject;nimport com.sgcc.lnsoft.common.collector.constans.CommonDefine;nimport com.sgcc.lnsoft.common.collector.util.CommonUtil;nimport com.sgcc.lnsoft.common.core.redis.RedisCache;nimport com.sgcc.lnsoft.common.core.utils.DateUtil;nimport com.sgcc.lnsoft.common.core.utils.WebUtil;nimport lombok.extern.slf4j.Slf4j;nimport org.apache.commons.lang3.StringUtils;nimport org.springframework.beans.factory.annotation.Autowired;nimport org.springframework.stereotype.Component;nnimport java.util.Date;nn@Slf4jn@Component //这里注意,如果启动的时候就希望加载到内存中去,并通过上面的n//我们说的配置类,加载的,可以加上这个@Component,加上以后,凡是在yml,或propertiesn//文件中配置的任务就会自动加载到内存中,如果不加这个@Component,那么后面我们可以n//通过我们那个管理类中的addTask方法,动态的添加也可以的.n@AllArgsConstructornpublic class ReportTimeTask extends QuartzScheduleTask {nn Private String IPStr;nn @Overriden public void execute() {nn n }n}n


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


16.然后我们再看一下去利用我们的管理类,去添加一个任务:


ReportTimeTask reportTimeTask = new ReportTimeTask(IPStr);n n QuartzManager.addTask(IPStr+"_reportTimeTask","cron表达式",reportTimeTask);


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


这样就添加了一个任务,任务添加以后,就会根据cron表达式自动执行了.


17.删除一个任务和修改一个任务就非常简单,这里不说了.


18.上面说的,如果想使用分布式的话,quartz,需要借助数据库,需要先把quartz的mysql表都创建出来,


然后在配置类中去,加上dataSource,然后再,创建jobDetail的时候一定要注意,这个时候,需要用:


JobDetail jobDetail = org.quartz.JobBuilder.newJob(ReportTimeTask.class)n .withDescription(name)n .withIdentity(name,jobGroupName)n .setJobData(jobDataMap).build();n


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活


这种方式来创建jobDetail,注意这里的ReportTimeTask,这个类就必须是继承了quartz提供的Job的


这个类才行.同时可以看到,里面有个setJobData() 这里,有个jobDataMap,这个就可以给我们的Job,


传递参数了,我们就可以通过这种方式来,动态的给我们的Job传递参数,这样既可以实现分布式,也可以给我们的Job动态传递参数了.


基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活

基于Quartz设计采集系统并实现系统双活机制_自己设计系统双活

​编辑