朋友写bug导致公司资损几百万,被裁一脚踢飞,提着破包滚蛋!

  • 👏作者简介:大家好,我是爱敲代码的小黄,阿里巴巴淘天Java开发工程师,CSDN博客专家
  • 📕系列专栏:Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2024计划中:以梦为马,扬帆起航,2024追梦人
  • 📝联系方式:smallyellow521,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

一、背景

大家好呀,好久没有更新博文了

最近,我有一个朋友写了一个 bug,导致线上损失了几百万

说是bug,也不算bug

罪魁祸首就是 @Scheduler 这个注解

如果你的项目中用到了这个注解,我真的建议你花几分钟看一下这个问题

说不定下一个踩坑被裁的就是你

废话不多说,我们一起来看看这个注解有啥隐藏的问题

二、架构

朋友线上的架构如下:

简单介绍下业务逻辑:

  • 调度引擎将一些SQL封装成SQL任务提交至大数据平台
  • 大数据平台执行任务返回结果于调度引擎执行后续逻辑
  • 监控系统启动定时任务定时扫描SQL Task,若超过90分钟还未返回结果,重新提交该任务

三、问题

当天朋友正乐呵呵的工作,想着下班后去哪搞点好吃的

突然,下游找上门来,问:你们怎么回事,离线表已经好久没有产出了,客户都投诉了!

前面还风淡云轻,听到投诉,立马跳了起来:我靠,怎么回事

朋友排查之后发现:线上SQL Task全部超时导致离线表未产出

奇怪,按照上述的描述,我们的监控系统应该能识别到超时的任务并将其重新启动

当然,这种危机关头也不是查询原因的时候,赶紧先重新将这些任务启动再说

后面排查之后发现:监控系统的超时扫描定时任务未启动,从而导致线上超时离线任务未重新提交

这里先埋一个伏笔:监控系统不止一个定时任务

四、原理

OK~终于到了解密了时刻了

1、业务逻辑

我们先看下业务代码如何写的

1
2
3
4
5
6
7
8
9
java复制代码@SpringBootApplication
@EnableScheduling
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}

}

定时任务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Component
public class SchedulerTest {

// 初始化5秒,之后每隔5秒执行一次
@Scheduled(initialDelay = 5000, fixedRate = 5000)
public void scheduled1() {
System.out.println("I am scheduled1, thread = " + Thread.currentThread());
}

@Scheduled(initialDelay = 5000, fixedRate = 5000)
public void scheduled2() {
System.out.println("I am scheduled2, thread = " + Thread.currentThread());
}

@Scheduled(initialDelay = 5000, fixedRate = 5000)
public void scheduled3() {
System.out.println("I am scheduled3, thread = " + Thread.currentThread());
}
}

启动程序输出:

1
2
3
java复制代码I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled2, thread = Thread[scheduling-1,5,main]
I am scheduled3, thread = Thread[scheduling-1,5,main]

通过输出你能看出来什么原因嘛?

2、报错逻辑

我们将上述的 SchedulerTest稍微修改一下:将第一个定时任务改为死循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Component
public class SchedulerTest {

// 初始化5秒,之后每隔5秒执行一次
@Scheduled(initialDelay = 5000, fixedRate = 5000)
public void scheduled1() throws InterruptedException {
while (true) {
System.out.println("I am scheduled1, thread = " + Thread.currentThread());
Thread.sleep(5000);
}
}

@Scheduled(initialDelay = 5000, fixedRate = 5000)
public void scheduled2() throws InterruptedException {
while (true) {
System.out.println("I am scheduled2, thread = " + Thread.currentThread());
Thread.sleep(5000);
}
}

@Scheduled(initialDelay = 5000, fixedRate = 5000)
public void scheduled3() throws InterruptedException {
while (true) {
System.out.println("I am scheduled3, thread = " + Thread.currentThread());
Thread.sleep(5000);
}

}
}

输出数据:

1
2
3
4
5
6
7
8
9
java复制代码I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]

有没有发现什么问题?

3、原理揭露

通过上面两个案例,我们很明显的可以看出来:**我们 **@Scheduled是单线程的!!!

所以,当我们存在三个定时任务时,其中一个定时任务卡死,就会导致其余定时任务无法启动
从而造成线上故障

那问题来了,整个 Spring是如何对 @Scheduled解析和运行的呢,底层又是如何结合实现的

黄哥主打的就是源码,接下来开始源码揭露

4、源码揭露

我们可以直接看其源码:

4.1 扫描注解

我们先从 ScheduledAnnotationBeanPostProcessor这个类看起

相信看过之前Spring源码解析系列的人应该认识这个这个类吧

对的,经典的后置处理器

主要的作用:Bean初始化过程中扫描目标Bean中的@Scheduled注解,为其创建相应的调度任务

浅挖下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public Object postProcessAfterInitialization(Object bean, String beanName) {

// 获取当前类描述
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);

// 检查目标类targetClass是否已经存在于nonAnnotatedClasses集合中
// targetClass是否是Scheduled.class或Schedules.class的候选类
if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {

// 获取当前带有Scheduled注解的方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
});

// 如果注解方法为空的,则将类加入Set中,避免下一次判断
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
}else {
// 反之遍历其带有注解的方法,封装成
annotatedMethods.forEach((method, scheduledAnnotations) ->
scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
}

比如上述例子:
image.png

4.2 封装任务

在上面的 processScheduled(scheduled, method, bean)方法里面,封装了我们的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// 将当前的bean和方法封装成Runable
Runnable runnable = createRunnable(bean, method);

// 获取初始化延时时间
long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
// 获取cron命令
String cron = scheduled.cron();

// 根据模式不同注册不同的定时任务

long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
if (fixedDelay >= 0) {
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}

long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
if (fixedRate >= 0) {
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}

所以,我们的重点就在这一句了:tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
go复制代码public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
// 等到后续应用事件初始化后,才会正式运行该任务
if (this.taskScheduler != null) {
if (task.getInitialDelay() > 0) {
Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());
}
else {
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
}
}
else {
// 第一次进入会走这里添加对应的定时任务
addFixedRateTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}

我们在第一次运行时,该函数只会向fixedRateTasks中添加对应的task

4.3 任务运行

我们继续回到 ScheduledAnnotationBeanPostProcessor中,找到 onApplicationEvent这个方法

onApplicationEvent是Spring框架中的一个方法,用于处理应用程序事件。它是ApplicationListener接口的方法之一,用于响应特定类型的应用程序事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go复制代码public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
// Running in an ApplicationContext -> register tasks this late...
// giving other ContextRefreshedEvent listeners a chance to perform
// their work at the same time (e.g. Spring Batch's job registration).
finishRegistration();
}
}
private void finishRegistration() {
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}

// ........
this.registrar.afterPropertiesSet();
}

OK,这一句也就是我们的重点:this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));

1
2
3
4
go复制代码// 这里将我们的taskScheduler进行初始化
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}

最后执行 afterPropertiesSet方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go复制代码public void afterPropertiesSet() {
scheduleTasks();
}

protected void scheduleTasks() {
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
}

public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
// 如果当前的taskScheduler不为null的话
if (this.taskScheduler != null) {
if (task.getInitialDelay() > 0) {
Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());
// 提交我们的定时任务
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());
}
else {
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
}
}
}

那这里是如何提交任务运行的呢

image.png

继续往下追代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
// 获取ScheduledExecutorService执行类
ScheduledExecutorService executor = getScheduledExecutor();
long initialDelay = startTime.getTime() - this.clock.millis();
try {
// 调用JUC包进行任务运行
return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

OK,到这里我们基本梳理清楚了

但还有最后一个问题,那就是 ThreadPoolTaskScheduler是如何来的?

回到我们一开始的this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));

我们可以看到 resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)中使用 TaskScheduler.class

继续往下看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码private <T> T resolveSchedulerBean(BeanFactory beanFactory, Class<T> schedulerType, boolean byName) {
if (beanFactory instanceof AutowireCapableBeanFactory) {
// 这里使用TaskScheduler.class获取bean
NamedBeanHolder<T> holder = ((AutowireCapableBeanFactory) beanFactory).resolveNamedBean(schedulerType);
if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory) {
((ConfigurableBeanFactory) beanFactory).registerDependentBean(holder.getBeanName(), this.beanName);
}
return holder.getBeanInstance();
}
else {
return beanFactory.getBean(schedulerType);
}
}

在往下追的话:

image.png

这个类应该不用介绍了吧,讲了几百遍了

那为什么我们通过 TaskScheduler.class可以得到 ThreadPoolTaskScheduler呢?

很简单,因为这哥们实现了TaskScheduler接口

1
2
java复制代码public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {

但这哥们有个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
private volatile int poolSize = 1;

/**
* Set the ScheduledExecutorService's pool size.
* Default is 1.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setPoolSize(int poolSize) {
Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
((ScheduledThreadPoolExecutor) this.scheduledExecutor).setCorePoolSize(poolSize);
}
this.poolSize = poolSize;
}
}

是的,这坑爹的ThreadPoolTaskScheduler线程池默认为1

然后所有使用 @Scheduled的定时任务共有这一个注解

所以,我们任务如果一个运行不完,其余任务都在阻塞着

这就是问题的源码根因

4、解决方式

有两种解决方式:

  • 增加配置类
  • 配置文件新增配置

4.1 增加配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Configuration
public class ScheduleConfig {
/**
* 修复同一时间无法执行多个定时任务问题。
* @Scheduled默认是单线程的
*/
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
//核心线程池数量,方法: 返回可用处理器的Java虚拟机的数量。
taskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors() * 2);
return taskScheduler;
}
}

4.2 配置文件

1
properties复制代码spring.task.scheduling.pool.size=10

五. 总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

如果你也对 后端架构中间件源码 有兴趣,欢迎添加博主微信:smallyellow521,一起学习,一起成长

我是爱敲代码的小黄,阿里巴巴淘天集团核心事业部Java高级开发工程师,双非二本,培训班出身

通过两年努力,成功拿下阿里、百度、美团、滴滴、快手、拼多多等大厂,想通过自己的事迹告诉大家,努力是会有收获的!

双非本两年经验,我是如何拿下阿里、百度、美团、滴滴、快手、拼多多等大厂offer的?

我们下期再见。

从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%