背景
一个应用,需要定时通过 「SFTP/FTP/本地文件」等方式,采集指定目录及其子目录下所有满足文件名称规则的文件。
定时任务用 Quartz 触发,为了提高效率,为每个目录启动一个独立线程去采集。采集程序 Quartz 的 Job 流程如下:
- 搜集目录:递归目录,统计包含文件的目录,并收集到所有需要采集的目录列表。
- 启动采集线程:对每个目录,启动一个线程去遍历文件。
这个思路没问题,但是问题在于,我们不知道这些采集线程什么时候结束。Quartz 的定时周期是用户设定的,可大可小。怎么保证,在上一轮任务未执行完成时,新一轮的采集任务不会对上次的任务产生干扰呢?
本文将介绍对这个问题的优化过程,答案很简单,就是并发控制,两个技术点:
- Quartz 提供了
@DisallowConcurrentExecution
注解,可以禁止任务并行。 - JUC 包提供了丰富的同步控制工具,例如
CyclicBarrier
和CountDownLatch
都可以实现该目标。
简单易用的话,就选 CountDownLatch
。
顾名思义,减数锁,它就是一个减数器。打个比方,N 个同时工作的人,每个人都有使用它的权利。谁完成了工作就按下它,计数器减一,当计数器减少到 0 时,说明所有人都完成了工作,那么总管就可以拿汇总结果来继续其他的事情了。
旧的实现思路及问题
代码前任实现的思路是这样的:
- 为每个目录开启一个线程,并全局维护已经创建的线程名称。
- Quartz 的 Job 执行时,判断当前目录是否存在工作线程,如果不存在,直接为当前目录创建一个工作线程,线程名称为目录名称,并加入缓存。
- 如果存在,则利用
Thread.currentThread().getThreadGroup()
获取正在运行的线程,找到名称为当前目录的线程后,再调用它的stop()
方式,停止它。 - Quartz 的 Job 默认任务是并行。并行的含义就是,调度周期一到,不管上一轮的任务是否结束,都会开启本轮任务。
这个流程存在的问题是,没有考虑文件 IO 操作阻塞的情况。当定时任务周期过短、采集文件数量过多时,新一轮任务在检测到某目录存在正在执行的线程时,调用线程的 stop()
方法、试图停止该线程,而目标线程可能处于IO阻塞状态没有响应。
随着时间的推移,产生了大量的采集线程,导致文件采集程序运行一段时间后,就不采了,要重启程序才行。有一次分析堆栈发现,线程总数上千,都处于 BLOCKED
状态,为什么都处于了阻塞状态呢?我也没有分析出是什么原因。猜测可能跟这个 stop()
线程没停掉有关。
重构后的流程
对于祖传代码,一直秉承一个原则:「只要功能正常,不动;代码再烂,忍着看看就好」。但这个功能处于并发环境中,而且用到地方很多,SFTP 文件采集过程中,经常运行一段时间后就不采集了,总让项目重启也不是办法。
于是在不动核心流程的情况下,优化了并发控制过程,至少可以保证流程清晰、由 JUC 工具控制多线程的同步,不会出现一个目录存在大量工作线程的情况。
具体思路:
- 调整 Quartz 的采集 Job 的类型,使用
@DisallowConcurrentExecution
注解,禁止任务并行。 - Quartz 的 Job 作为总控制线程,它维护一个
CountDownLatch
同步控制锁,锁的总数是开启的采集线程个数。它开启 N 个目录的采集线程后,就用latch.await()
阻塞自己,等着所有采集线程执行完成。 - 采集线程也维护相同的
CountDownLatch
对象,它的run()
方法执行完成时,计数器减一。
Quartz 的 Job 流程伪代码如下:
1 | java复制代码public void execute(JobExecutionContext context) throws JobExecutionException { |
采集线程 MyRunnable
的逻辑:
1 | java复制代码private class MyRunnable implements Runnable { |
启示录
第一,任何改动都可能存在问题,即使只改一行代码,也可能因为粗心大意都存在问题,所以代码测试很重要。
第二,思考一个问题,Thread
类的 stop()
方法,到底能不能停止一个线程呢?从现场发回的堆栈分析看,大量线程都处于 BLOCKED
状态,强制停止线程代码前后,线程数量并没有少,可以肯定 stop()
方法在那个场景下并没有生效。
验证:模拟 IO 阻塞时,强制调用线程的 stop()
,测试代码如下:
1 | java复制代码public static void main(String[] args) { |
这里通过 ServerSocket 可以模拟 IO 阻塞,验证结果:
确定两点:
- 调用
stop()
方法后,那个线程还在,程序没有退出。 ServerSocket.accept()
方法调用后,线程虽然阻塞了,但是主线程中打印的状态仍然是 Runnable 。
结论:对于因 IO 阻塞的线程,stop() 方法无效。
第三,回顾一下线程状态图:
注:此图来自博客园某博主的图片,原图链接
根据这个图的状态,在非 IO 阻塞状态下,比如因 sleep()、latch.wait()
等操作而进入阻塞状态时,Thread.stop()
方法是可以停止线程的,前面的测试方法 Socket 接收操作换成 sleep()
是可以结束的。
第四,Quartz 定时调度,感觉大部分情况下,任务都不需要并行,所以使用的时候一定要知道禁止任务并行的技术点。对于耗时长的任务,有必要禁止并行。再手动敲一遍这个注解 @DisallowConcurrentExecution
,字面意思是「不允许并发执行」。
本文转载自: 掘金