
动态管理定时任务:摆脱@Scheduled的束缚
动态管理定时任务:摆脱@Scheduled的束缚
在Spring Boot项目开发中,我们通常使用@Scheduled
注解来实现定时任务。然而,这种方式存在一些局限性:
- 定时任务的配置是固定的,修改需要重启应用
- 无法在运行时动态管理任务(启动、停止、修改)
- 缺乏对任务执行情况的监控和管理
本文将介绍一种改造Spring Boot定时任务的方案,通过数据库存储任务配置,实现任务的动态管理。这种方案可以通过后台管理界面对定时任务进行增删改查,无需重启应用即可生效。
核心设计思路
- 使用数据库存储定时任务配置信息
- 使用Spring的
ThreadPoolTaskScheduler
替代默认的定时任务执行器 - 实现任务的动态注册、启动、停止、重启等管理功能
- 应用启动时初始化所有需要自启的任务
数据库设计
首先,我们需要创建一个存储定时任务的表:
CREATE TABLE `t_scheduled_task` (
`id` varchar(36) NOT NULL COMMENT '任务ID',
`task_key` varchar(50) NOT NULL COMMENT '任务key',
`task_name` varchar(255) DEFAULT NULL COMMENT '任务名称',
`task_type` varchar(10) DEFAULT NULL COMMENT '任务类型',
`task_cron` varchar(255) DEFAULT NULL COMMENT 'cron表达式',
`init_start_flag` varchar(10) DEFAULT NULL COMMENT '是否初始化启动 0:否 1:是',
`start_flag` varchar(10) DEFAULT NULL COMMENT '是否启动 0:否 1:是',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定时任务表';
核心实现类
1. 定时任务配置类
首先,我们需要一个配置类来创建线程池和初始化任务列表:
@Configuration
@Slf4j
public class ScheduledTaskConfig {
private final ScheduledTaskMapper scheduledTaskMapper;
private final ApplicationContext applicationContext;
public ScheduledTaskConfig(ScheduledTaskMapper scheduledTaskMapper,
ApplicationContext applicationContext) {
this.scheduledTaskMapper = scheduledTaskMapper;
this.applicationContext = applicationContext;
}
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
log.info("创建定时任务调度线程池 start");
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(20);
threadPoolTaskScheduler.setThreadNamePrefix("taskExecutor-");
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
log.info("创建定时任务调度线程池 end");
return threadPoolTaskScheduler;
}
/**
* 初始化定时任务Map
* key :任务key
* value : 执行接口实现
*/
@Bean(name = "scheduledTaskJobMap")
public Map<String, Runnable> scheduledTaskJobMap() {
return initScheduledTask();
}
public Map<String, Runnable> initScheduledTask() {
List<ScheduledTask> scheduledTaskList = scheduledTaskMapper.getAllTask();
if (scheduledTaskList.size() == 0) {
return new ConcurrentHashMap<>();
}
Map<String, Runnable> scheduledTaskJobMap = new ConcurrentHashMap<>();
for (ScheduledTask scheduledTask : scheduledTaskList) {
TaskRunner taskRunner = applicationContext.getBean(TaskRunner.class, scheduledTask);
scheduledTaskJobMap.put(scheduledTask.getId(), taskRunner);
}
return scheduledTaskJobMap;
}
}
2. 定时任务服务实现
接下来,我们实现定时任务的核心管理逻辑:
@Service
@Slf4j
public class ScheduledTaskServiceImpl implements ScheduledTaskService {
private final ScheduledTaskMapper taskMapper;
private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
// 注入所有任务实现
@Autowired
@Qualifier(value = "scheduledTaskJobMap")
private Map<String, Runnable> scheduledTaskJobMap;
// 存放已启动任务的容器
private Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();
// 可重入锁,保证任务启动的线程安全
private ReentrantLock lock = new ReentrantLock();
public ScheduledTaskServiceImpl(ScheduledTaskMapper taskMapper,
ThreadPoolTaskScheduler threadPoolTaskScheduler) {
this.taskMapper = taskMapper;
this.threadPoolTaskScheduler = threadPoolTaskScheduler;
}
/**
* 获取所有任务列表
*/
@Override
public List<ScheduledTask> taskList() {
log.info(">>>>>> 获取任务列表开始 >>>>>> ");
List<ScheduledTask> taskBeanList = taskMapper.getAllTask();
if (CollectionUtils.isEmpty(taskBeanList)) {
return new ArrayList<>();
}
for (ScheduledTask taskBean : taskBeanList) {
String id = taskBean.getId();
//是否启动标记处理
taskBean.setStartFlag(this.isStart(id));
}
log.info(">>>>>> 获取任务列表结束 >>>>>> ");
return taskBeanList;
}
/**
* 根据任务id启动任务
*/
@Override
public Boolean start(String id) {
log.info(">>>>>> 启动任务 {} 开始 >>>>>>", id);
//添加锁放一个线程启动,防止多人启动多次
lock.lock();
log.info(">>>>>> 添加任务启动锁完毕");
try {
//校验是否已经启动
if ("1".equals(this.isStart(id))) {
log.info(">>>>>> 当前任务已经启动,无需重复启动!");
return false;
}
//校验任务是否存在
if (!scheduledTaskJobMap.containsKey(id)) {
return false;
}
//根据key获取任务配置信息
ScheduledTask scheduledTask = taskMapper.getById(id);
//启动任务
this.doStartTask(scheduledTask);
} finally {
// 释放锁
lock.unlock();
log.info(">>>>>> 释放任务启动锁完毕");
}
log.info(">>>>>> 启动任务 {} 结束 >>>>>>", id);
return true;
}
/**
* 根据任务id停止任务
*/
@Override
public Boolean stop(String id) {
log.info(">>>>>> 进入停止任务 {} >>>>>>", id);
//当前任务实例是否存在
boolean taskStartFlag = scheduledFutureMap.containsKey(id);
log.info(">>>>>> 当前任务实例是否存在 {}", taskStartFlag);
if (taskStartFlag) {
//获取任务实例
ScheduledFuture scheduledFuture = scheduledFutureMap.get(id);
//关闭实例
scheduledFuture.cancel(true);
}
log.info(">>>>>> 结束停止任务 {} >>>>>>", id);
return taskStartFlag;
}
/**
* 根据任务id重启任务
*/
@Override
public Boolean restart(String id) {
log.info(">>>>>> 进入重启任务 {} >>>>>>", id);
//先停止
this.stop(id);
//再启动
return this.start(id);
}
/**
* 移除已执行的任务
*/
@Override
public void removeTask(String id) {
log.info(">>>>>> 移除任务 {} 开始 >>>>>>", id);
scheduledFutureMap.remove(id);
log.info(">>>>>> 移除任务 {} 结束 >>>>>>", id);
}
/**
* 程序启动时初始化所有需要自启的任务
*/
@Override
public void initAllTask(List<ScheduledTask> scheduledTaskBeanList) {
log.info("程序启动 ==> 初始化所有任务开始 !size={}", scheduledTaskBeanList.size());
if (CollectionUtils.isEmpty(scheduledTaskBeanList)) {
return;
}
for (ScheduledTask scheduledTask : scheduledTaskBeanList) {
//任务id
String id = scheduledTask.getId();
//校验是否已经启动
if ("1".equals(this.isStart(id))) {
continue;
}
//启动任务
this.doStartTask(scheduledTask);
}
log.info("程序启动 ==> 初始化所有任务结束 !size={}", scheduledTaskBeanList.size());
}
/**
* 执行启动任务
*/
private void doStartTask(ScheduledTask scheduledTask) {
//任务id
String id = scheduledTask.getId();
//定时表达式
String taskCron = scheduledTask.getTaskCron();
//获取需要定时调度的接口
Runnable taskRunner = scheduledTaskJobMap.get(id);
log.info(">>>>>> 任务 [ {} ] ,cron={}", scheduledTask.getTaskName(), taskCron);
ScheduledFuture scheduledFuture = threadPoolTaskScheduler.schedule(taskRunner,
new Trigger() {
@Override
public Instant nextExecution(TriggerContext triggerContext) {
CronTrigger cronTrigger = new CronTrigger(taskCron);
return Instant.ofEpochMilli(cronTrigger.nextExecutionTime(triggerContext).getTime());
}
});
//将启动的任务放入map
scheduledFutureMap.put(id, scheduledFuture);
}
/**
* 任务是否已经启动
*/
private String isStart(String id) {
//校验是否已经启动
if (scheduledFutureMap.containsKey(id)) {
if (!scheduledFutureMap.get(id).isCancelled()) {
return "1";
}
}
return "0";
}
}
3. 任务执行器
任务执行器是实际执行业务逻辑的地方:
@Component
@Slf4j
public class TaskRunner implements Runnable {
private final ScheduledTask scheduledTask;
// 注入相关服务和工具
@Autowired
private UserService userService;
@Autowired
private MessageService messageService;
@Autowired
private ScheduledTaskService taskService;
public TaskRunner(ScheduledTask scheduledTask) {
this.scheduledTask = scheduledTask;
}
// 根据任务类型执行相应业务逻辑
@Override
public void run() {
log.info("ScheduledTask => 当前任务id {} ", scheduledTask.getId());
// 根据任务类型执行不同的业务逻辑
if ("TASK_TYPE_1".equals(scheduledTask.getTaskKey())) {
// 执行类型1的业务逻辑
executeTaskType1();
} else if ("TASK_TYPE_2".equals(scheduledTask.getTaskKey())) {
// 执行类型2的业务逻辑
executeTaskType2();
// 任务执行完成后,如果是一次性任务,可以停止并移除
taskService.stop(scheduledTask.getId());
taskService.removeTask(scheduledTask.getId());
}
}
private void executeTaskType1() {
// 实现具体业务逻辑
log.info("执行任务类型1的业务逻辑");
}
private void executeTaskType2() {
// 实现具体业务逻辑
log.info("执行任务类型2的业务逻辑");
}
}
4. 应用启动自动初始化任务
在应用启动时,我们需要自动初始化和启动所有标记为自启的任务:
@Component
@Order(value = 1)
public class ScheduledTaskRunner implements ApplicationRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTaskRunner.class);
@Autowired
private ScheduledTaskMapper taskMapper;
@Autowired
private ScheduledTaskService scheduledTaskService;
/**
* 程序启动完毕后,初始化需要自启的任务
*/
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
LOGGER.info(" >>>>>> 项目启动完毕, 开启 => 需要自启的任务 开始!");
List<ScheduledTask> scheduledTaskBeanList = taskMapper.getAllNeedStartTask();
scheduledTaskService.initAllTask(scheduledTaskBeanList);
LOGGER.info(" >>>>>> 项目启动完毕, 开启 => 需要自启的任务 结束!");
}
}
5. Cron表达式工具类
为了便于生成Cron表达式,我们可以提供一个工具类:
public class CronUtil {
private static final SimpleDateFormat sdf = new SimpleDateFormat("ss mm HH dd MM ? yyyy");
/**
* 日期转换cron表达式
* @param date 时间点
* @return cron表达式
*/
public static String getCron(Date date) {
String formatTimeStr = null;
if (Objects.nonNull(date)) {
formatTimeStr = sdf.format(date);
}
return formatTimeStr;
}
/**
* 获取指定日期的cron表达式
*/
public static String getCron(String year, String week, String month, String day, String hour, String minutes, String seconds) {
return seconds + " " + minutes + " " + hour + " " + day + " " + month + " " + week + " " + year;
}
// 其他重载方法略
}
使用示例
下面展示如何在业务代码中使用这套动态定时任务框架:
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private ScheduledTaskMapper scheduledTaskMapper;
@Autowired
private ScheduledTaskService scheduledTaskService;
@Autowired
private ScheduledTaskConfig scheduledTaskConfig;
/**
* 创建用户并设置定期修改密码的定时任务
*/
@Transactional(rollbackFor = Exception.class)
public Result<Object> createUser(UserDTO userDTO) {
// 用户创建逻辑
User user = new User();
user.setId(UUID.randomUUID().toString());
user.setUserName(userDTO.getUserName());
// 设置其他用户信息
// 保存用户
userMapper.insert(user);
// 创建定时任务 - 定期提醒修改密码
Calendar cal = Calendar.getInstance();
cal.setTime(new Date());
// 设置3个月后提醒修改密码
cal.add(Calendar.MONTH, 3);
// 生成Cron表达式
String cron = CronUtil.getCron(cal.getTime());
// 创建并保存定时任务
ScheduledTask scheduledTask = ScheduledTask.builder()
.id(user.getId())
.taskKey("UPDATE_PASSWORD")
.taskName("提醒用户修改密码")
.taskType("0")
.taskCron(cron)
.initStartFlag("1") // 需要自启
.startFlag("1") // 立即启动
.build();
scheduledTaskMapper.insert(scheduledTask);
// 重新加载任务映射并启动任务
scheduledTaskConfig.scheduledTaskJobMap();
scheduledTaskService.start(scheduledTask.getId());
return Result.success();
}
}
管理接口
你还可以开发一套管理接口,用于可视化管理定时任务:
@RestController
@RequestMapping("/api/task")
public class ScheduledTaskController {
@Autowired
private ScheduledTaskService scheduledTaskService;
@Autowired
private ScheduledTaskMapper scheduledTaskMapper;
/**
* 获取任务列表
*/
@GetMapping("/list")
public Result<List<ScheduledTask>> list() {
List<ScheduledTask> list = scheduledTaskService.taskList();
return Result.success(list);
}
/**
* 启动任务
*/
@PostMapping("/start/{id}")
public Result<Boolean> start(@PathVariable("id") String id) {
Boolean result = scheduledTaskService.start(id);
return Result.success(result);
}
/**
* 停止任务
*/
@PostMapping("/stop/{id}")
public Result<Boolean> stop(@PathVariable("id") String id) {
Boolean result = scheduledTaskService.stop(id);
return Result.success(result);
}
/**
* 重启任务
*/
@PostMapping("/restart/{id}")
public Result<Boolean> restart(@PathVariable("id") String id) {
Boolean result = scheduledTaskService.restart(id);
return Result.success(result);
}
/**
* 保存任务
*/
@PostMapping("/save")
public Result<Boolean> saveTask(@RequestBody ScheduledTask scheduledTask) {
scheduledTaskMapper.insert(scheduledTask);
return Result.success(true);
}
/**
* 更新任务
*/
@PostMapping("/update")
public Result<Boolean> updateTask(@RequestBody ScheduledTask scheduledTask) {
scheduledTaskMapper.update(scheduledTask);
return Result.success(true);
}
}
总结
通过这套方案,我们实现了定时任务的动态管理,主要优势包括:
- 动态管理:可以在运行时动态添加、修改、启动、停止任务,无需重启应用
- 可视化管理:可以结合前端页面,实现任务的可视化管理
- 高可靠性:使用数据库存储任务配置,提高了可靠性
- 灵活性:可以根据业务需求,定制不同类型的任务处理逻辑
- 监控友好:可以轻松实现任务执行情况的监控和记录
相比Spring Boot自带的@Scheduled
注解,这种方案更加灵活和强大,特别适合对定时任务有动态管理需求的系统。
注意事项:
- 需要合理设计数据库表结构,保存任务的关键信息
- 在高并发情况下,要注意任务的并发控制
- 对于长时间运行的任务,考虑添加超时处理机制
- 建议增加任务执行日志,便于问题排查
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员橙子
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果