定时采集文件时用CountDownLatch同步控制案例

背景

一个应用,需要定时通过 「SFTP/FTP/本地文件」等方式,采集指定目录及其子目录下所有满足文件名称规则的文件。

定时任务用 Quartz 触发,为了提高效率,为每个目录启动一个独立线程去采集。采集程序 Quartz 的 Job 流程如下:

  1. 搜集目录:递归目录,统计包含文件的目录,并收集到所有需要采集的目录列表。
  2. 启动采集线程:对每个目录,启动一个线程去遍历文件。

这个思路没问题,但是问题在于,我们不知道这些采集线程什么时候结束。Quartz 的定时周期是用户设定的,可大可小。怎么保证,在上一轮任务未执行完成时,新一轮的采集任务不会对上次的任务产生干扰呢?

本文将介绍对这个问题的优化过程,答案很简单,就是并发控制,两个技术点:

  1. Quartz 提供了 @DisallowConcurrentExecution 注解,可以禁止任务并行。
  2. JUC 包提供了丰富的同步控制工具,例如 CyclicBarrierCountDownLatch 都可以实现该目标。

简单易用的话,就选 CountDownLatch

顾名思义,减数锁,它就是一个减数器。打个比方,N 个同时工作的人,每个人都有使用它的权利。谁完成了工作就按下它,计数器减一,当计数器减少到 0 时,说明所有人都完成了工作,那么总管就可以拿汇总结果来继续其他的事情了。

旧的实现思路及问题

代码前任实现的思路是这样的:

  1. 为每个目录开启一个线程,并全局维护已经创建的线程名称。
  2. Quartz 的 Job 执行时,判断当前目录是否存在工作线程,如果不存在,直接为当前目录创建一个工作线程,线程名称为目录名称,并加入缓存。
  3. 如果存在,则利用 Thread.currentThread().getThreadGroup() 获取正在运行的线程,找到名称为当前目录的线程后,再调用它的 stop() 方式,停止它。
  4. Quartz 的 Job 默认任务是并行。并行的含义就是,调度周期一到,不管上一轮的任务是否结束,都会开启本轮任务。

这个流程存在的问题是,没有考虑文件 IO 操作阻塞的情况。当定时任务周期过短、采集文件数量过多时,新一轮任务在检测到某目录存在正在执行的线程时,调用线程的 stop() 方法、试图停止该线程,而目标线程可能处于IO阻塞状态没有响应。

随着时间的推移,产生了大量的采集线程,导致文件采集程序运行一段时间后,就不采了,要重启程序才行。有一次分析堆栈发现,线程总数上千,都处于 BLOCKED 状态,为什么都处于了阻塞状态呢?我也没有分析出是什么原因。猜测可能跟这个 stop() 线程没停掉有关。

重构后的流程

对于祖传代码,一直秉承一个原则:「只要功能正常,不动;代码再烂,忍着看看就好」。但这个功能处于并发环境中,而且用到地方很多,SFTP 文件采集过程中,经常运行一段时间后就不采集了,总让项目重启也不是办法。

于是在不动核心流程的情况下,优化了并发控制过程,至少可以保证流程清晰、由 JUC 工具控制多线程的同步,不会出现一个目录存在大量工作线程的情况。

具体思路:

  1. 调整 Quartz 的采集 Job 的类型,使用 @DisallowConcurrentExecution 注解,禁止任务并行。
  2. Quartz 的 Job 作为总控制线程,它维护一个 CountDownLatch 同步控制锁,锁的总数是开启的采集线程个数。它开启 N 个目录的采集线程后,就用 latch.await() 阻塞自己,等着所有采集线程执行完成。
  3. 采集线程也维护相同的 CountDownLatch 对象,它的 run() 方法执行完成时,计数器减一。

Quartz 的 Job 流程伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public void execute(JobExecutionContext context) throws JobExecutionException {
long start = System.currentTimeMillis();

// 递归收集采集目录
String[] dirsToRead = "/dirA,/dirB,/dirC,/dirD".split(",");
Set<String> finalDir = new HashSet<>(dirsToRead.length);
for (String curDir : dirsToRead) {
if (StringUtils.isEmpty(curDir)) {
continue;
}

// TODO 递归收集需要采集的目录名称到finalDir中
recursiveSubDir(curDir);
}

if (finalDir.isEmpty()) {
logger.info("No dir ,over.");
return;
}

// 准备同步控制锁,总数数目录个数
int size = finalDir.size();
CountDownLatch latch = new CountDownLatch(size);

// 每个目录创建一个采集线程,并传入同步锁
int threadNum = 0;
for(String curDir: finalDir) {
threadNum ++;
String theadName = "MyTask" + threadNum;
new Thread(new MyRunnable(curDir,threadNum, latch), theadName).start();
}

// 等待采集任务结束
try {
latch.await();
} catch (InterruptedException e) {
// NON-OP
}

long end = System.currentTimeMillis();
logger.info("Over job,takes {} s.", (end -start)/ 1000);
}

采集线程 MyRunnable 的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码private class MyRunnable implements Runnable {
private String dirPath; // 待采目录
private CountDownLatch latch; // 同步控制锁,总数为目标目录个数

public MyRunnable(String dirPath, Integer threadNum, CountDownLatch latch) {
this.dirPath = dirPath;
this.threadNum = threadNum;
this.latch = latch;
}

@Override
public void run() {
try{
// TODO 执行目标目录下的文件采集流程
……
} catch(Execption e) {
// TODO ERROR
} finally {
// 同步锁计数器减一 ,必须放 finally 中
latch.countDown();
}
}
}

启示录

第一,任何改动都可能存在问题,即使只改一行代码,也可能因为粗心大意都存在问题,所以代码测试很重要。

第二,思考一个问题,Thread 类的 stop() 方法,到底能不能停止一个线程呢?从现场发回的堆栈分析看,大量线程都处于 BLOCKED 状态,强制停止线程代码前后,线程数量并没有少,可以肯定 stop() 方法在那个场景下并没有生效。

验证:模拟 IO 阻塞时,强制调用线程的 stop() ,测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
int port = 1234; // 选择一个端口号
// 模拟IO阻塞
ServerSocket serverSocket = null;
Socket clientSocket = null;
try {
serverSocket = new ServerSocket(port);
System.out.println("服务器在端口 " + port + " 上等待连接...");

// 接受客户端连接
clientSocket = serverSocket.accept();
System.out.println("客户端已连接。");

// 接收数据之前暂停
System.out.println("暂停接收数据之前...");
Thread.sleep(10000); // 暂停10秒钟

// 接收数据
InputStream inputStream = clientSocket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String data = reader.readLine();
System.out.println("接收到数据: " + data);

// 接收数据之后暂停
System.out.println("暂停接收数据之后...");
Thread.sleep(1000000); // 暂停10秒钟
} catch (Exception e) {
e.printStackTrace();
}
}, "thread1-1");
thread1.start();

try {
System.out.println("主线程休眠3秒后调用 thread.stop() 方法。");
Thread.sleep(3000);
thread1.stop();
System.out.println("thread1 status :" + thread1.getState().name());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

这里通过 ServerSocket 可以模拟 IO 阻塞,验证结果:
在这里插入图片描述
确定两点:

  1. 调用 stop() 方法后,那个线程还在,程序没有退出。
  2. ServerSocket.accept() 方法调用后,线程虽然阻塞了,但是主线程中打印的状态仍然是 Runnable 。

结论:对于因 IO 阻塞的线程,stop() 方法无效。

第三,回顾一下线程状态图:

image.png
注:此图来自博客园某博主的图片,原图链接

根据这个图的状态,在非 IO 阻塞状态下,比如因 sleep()、latch.wait() 等操作而进入阻塞状态时,Thread.stop() 方法是可以停止线程的,前面的测试方法 Socket 接收操作换成 sleep() 是可以结束的。

第四,Quartz 定时调度,感觉大部分情况下,任务都不需要并行,所以使用的时候一定要知道禁止任务并行的技术点。对于耗时长的任务,有必要禁止并行。再手动敲一遍这个注解 @DisallowConcurrentExecution,字面意思是「不允许并发执行」。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%