The underlying implementation of LockSupport.park

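In the JDK delay queue covered in the previous article, a waiting thread is ultimately blocked through LockSupport.park. So how are plain waiting and timed waiting implemented underneath?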

The park and unpark methods of LockSupport

public static void park() {
    UNSAFE.park(false, 0L);
}

public static void parkNanos(long nanos) {
    if (nanos > 0)
        UNSAFE.park(false, nanos);
}

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

As the code above shows, LockSupport.park delegates to the park method of Unsafe, and the declaration below shows that this is a native method.

/**
 * Blocks current thread, returning when a balancing
 * {@code unpark} occurs, or a balancing {@code unpark} has
 * already occurred, or the thread is interrupted, or, if not
 * absolute and time is not zero, the given time nanoseconds have
 * elapsed, or if absolute, the given deadline in milliseconds
 * since Epoch has passed, or spuriously (i.e., returning for no
 * "reason"). Note: This operation is in the Unsafe class only
 * because {@code unpark} is, so it would be strange to place it
 * elsewhere.
 */
public native void park(boolean isAbsolute, long time);

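The park method of Unsafe inside the JVM

The HotSpot code below shows that Unsafe_Park performs the wait by calling the park method of the current thread's Parker object.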

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
  HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
  EventThreadPark event;

  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
  if (event.should_commit()) {
    const oop obj = thread->current_park_blocker();
    if (time == 0) {
      post_thread_park_event(&event, obj, min_jlong, min_jlong);
    } else {
      if (isAbsolute != 0) {
        post_thread_park_event(&event, obj, min_jlong, time);
      } else {
        post_thread_park_event(&event, obj, time, min_jlong);
      }
    }
  }
  HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END

The Parker object declared inside the thread class in thread.hpp

 private:
  Parker _parker;
 public:
  Parker* parker() { return &_parker; }

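Below is Parker::park from os_posix.cpp, the implementation used on Linux. It works as follows:

  1. Atomically exchange _counter with 0 (Atomic::xchg, an unconditional swap rather than a compare-and-swap) and look at the old value. If it was greater than 0, a permit was available, so return without blocking.
  2. Get the current thread.
  3. If the thread has a pending interrupt, return immediately. In other words, park on an interrupted thread does not block.
  4. If time is less than 0, or the time is absolute and equal to 0, return immediately. (LockSupport.park() passes false and 0, so this case does not apply to it.)
  5. If time is greater than 0, convert it into an absolute deadline stored in absTime.
  6. Construct a ThreadBlockInVM object, then call pthread_mutex_trylock on the Parker's mutex. If the lock cannot be acquired, return immediately, because contention here means another thread is in the middle of an unpark.
  7. If _counter is greater than 0, reset it to 0, release the mutex, and return.
  8. On that early-return path, OrderAccess::fence() is called after the unlock. It is a full memory barrier that keeps the locked and lock-free paths consistent with each other and with Java-level accesses.
  9. Construct an OSThreadWaitState object.
  10. If time is 0, call pthread_cond_wait to wait without a timeout. Otherwise call pthread_cond_timedwait with absTime as the deadline.
  11. After waking up, reset _counter to 0 and call pthread_mutex_unlock to release _mutex.
  12. Call OrderAccess::fence() once more.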
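Step 5 is the only arithmetic in the list, so it may help to see it written out. The sketch below is a minimal illustration in C, not HotSpot's to_abstime, which does more than this (for example, it also decides which clock the deadline is measured against). The helper names deadline_after and deadline_from_epoch_millis are hypothetical, and both assume a positive input. The units follow the Unsafe.park comment quoted earlier: relative timeouts are in nanoseconds, absolute deadlines are in milliseconds since the Epoch.

```c
#include <stdint.h>
#include <time.h>

#define NANOS_PER_SEC 1000000000LL

/* Hypothetical helper: *out = *now + rel_ns, keeping tv_nsec in
   [0, 999999999]. Assumes rel_ns > 0 and a normalized *now. */
void deadline_after(struct timespec *out, const struct timespec *now,
                    int64_t rel_ns) {
    int64_t nsec = (int64_t)now->tv_nsec + rel_ns % NANOS_PER_SEC;
    out->tv_sec = now->tv_sec + (time_t)(rel_ns / NANOS_PER_SEC);
    if (nsec >= NANOS_PER_SEC) {        /* carry into the seconds field */
        out->tv_sec += 1;
        nsec -= NANOS_PER_SEC;
    }
    out->tv_nsec = (long)nsec;
}

/* Hypothetical helper: converts an absolute deadline given in
   milliseconds since the Epoch into a timespec. Assumes millis > 0. */
void deadline_from_epoch_millis(struct timespec *out, int64_t millis) {
    out->tv_sec  = (time_t)(millis / 1000);
    out->tv_nsec = (long)((millis % 1000) * 1000000LL);
}
```

A caller would read the current time with clock_gettime, pass it as now, and hand the result to pthread_cond_timedwait, which always expects an absolute time.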
// Parker::park decrements count if > 0, else does a condvar wait.  Unpark
// sets count to 1 and signals condvar. Only one thread ever waits
// on the condvar. Contention seen when trying to park implies that someone
// is unparking you, so don't wait. And spurious returns are fine, so there
// is no need to track notifications.

void Parker::park(bool isAbsolute, jlong time) {

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(&_counter, 0) > 0) return;

  JavaThread *jt = JavaThread::current();

  // Optional optimization -- avoid state transitions if there's
  // an interrupt pending.
  if (jt->is_interrupted(false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  struct timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
    return;
  }
  if (time > 0) {
    to_abstime(&absTime, time, isAbsolute, false);
  }

  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex. If safepoints are pending both
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Can't access interrupt state now that we are _thread_blocked. If we've
  // been interrupted since we checked above then _counter will be > 0.

  // Don't wait if cannot get lock since interference arises from
  // unparking.
  if (pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status;
  if (_counter > 0) { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "invariant");
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }
  OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);
  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait(&_cond[_cur_index], _mutex);
    assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
                  status, "cond_wait");
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
    assert_status(status == 0 || status == ETIMEDOUT,
                  status, "cond_timedwait");
  }
  _cur_index = -1;

  _counter = 0;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();
}
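The listing only covers the parking side. To see the permit protocol as a whole, here is a heavily simplified model in plain C with pthreads. It is a sketch under assumptions, not the JVM's code: the type mini_parker and its three functions are hypothetical, the unpark side is inferred from the comment at the top of the listing ("Unpark sets count to 1 and signals condvar"), and it uses a blocking pthread_mutex_lock and a single CLOCK_REALTIME condition variable where HotSpot uses trylock, a lock-free fast path, and two condition variables.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <time.h>

/* Hypothetical, simplified stand-in for HotSpot's Parker: one permit
   guarded by one mutex and one condition variable. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             counter;  /* 1 = permit available, 0 = none */
} mini_parker;

void mini_parker_init(mini_parker *p) {
    int status = pthread_mutex_init(&p->mutex, NULL);
    assert(status == 0);
    status = pthread_cond_init(&p->cond, NULL);  /* default clock: CLOCK_REALTIME */
    assert(status == 0);
    (void)status;
    p->counter = 0;
}

/* Consumes the permit if there is one; otherwise waits once, either
   without a timeout (timeout_ns == 0) or for at most timeout_ns
   nanoseconds. A negative timeout never waits. Returns 1 if a permit
   was consumed, 0 otherwise. */
int mini_park(mini_parker *p, int64_t timeout_ns) {
    struct timespec deadline = { 0, 0 };
    if (timeout_ns > 0) {
        clock_gettime(CLOCK_REALTIME, &deadline);
        deadline.tv_sec  += (time_t)(timeout_ns / 1000000000LL);
        deadline.tv_nsec += (long)(timeout_ns % 1000000000LL);
        if (deadline.tv_nsec >= 1000000000L) {
            deadline.tv_sec  += 1;
            deadline.tv_nsec -= 1000000000L;
        }
    }

    pthread_mutex_lock(&p->mutex);
    if (p->counter == 0 && timeout_ns >= 0) {
        /* One wait only, as in the listing: a spurious wakeup simply
           makes park return early, which callers must tolerate. */
        if (timeout_ns == 0)
            pthread_cond_wait(&p->cond, &p->mutex);
        else
            pthread_cond_timedwait(&p->cond, &p->mutex, &deadline);
    }
    int had_permit = p->counter > 0;
    p->counter = 0;
    pthread_mutex_unlock(&p->mutex);
    return had_permit;
}

void mini_unpark(mini_parker *p) {
    pthread_mutex_lock(&p->mutex);
    p->counter = 1;  /* set, not incremented: permits do not accumulate */
    pthread_mutex_unlock(&p->mutex);
    pthread_cond_signal(&p->cond);
}
```

Calling mini_unpark before mini_park makes the park return at once, and two unparks in a row still grant only one permit. That matches the documented LockSupport behaviour and explains why callers re-check their own condition in a loop after park returns.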

How Linux implements the timed wait in pthread_cond_timedwait

The pthread_cond_timedwait function lives in pthread_cond_wait.c in glibc. As the code below shows, it is implemented by calling __pthread_cond_wait_common.

/* See __pthread_cond_wait_common.  */
int
___pthread_cond_timedwait64 (pthread_cond_t *cond, pthread_mutex_t *mutex,
                             const struct __timespec64 *abstime)
{
  /* Check parameter validity.  This should also tell the compiler that
     it can assume that abstime is not NULL.  */
  if (! valid_nanoseconds (abstime->tv_nsec))
    return EINVAL;

  /* Relaxed MO is sufficient because clock ID bit is only modified
     in condition creation.  */
  unsigned int flags = atomic_load_relaxed (&cond->__data.__wrefs);
  clockid_t clockid = (flags & __PTHREAD_COND_CLOCK_MONOTONIC_MASK)
                      ? CLOCK_MONOTONIC : CLOCK_REALTIME;
  return __pthread_cond_wait_common (cond, mutex, clockid, abstime);
}
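The clock bit that this function reads is chosen when the condition variable is created. From the caller's side that choice is made with the POSIX call pthread_condattr_setclock. The sketch below, with the hypothetical helper name wait_with_monotonic_deadline, creates a CLOCK_MONOTONIC condition variable that nobody ever signals and waits on it until an absolute deadline. The predicate loop guards against spurious wakeups, so the function only returns once the deadline has passed.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <time.h>

/* Waits for timeout_ms milliseconds, measured on CLOCK_MONOTONIC, on a
   condition variable that is never signalled. Returns the final status
   of pthread_cond_timedwait and stores the elapsed time in *elapsed_ns. */
int wait_with_monotonic_deadline(long timeout_ms, int64_t *elapsed_ns) {
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_condattr_t attr;
    pthread_cond_t cond;
    struct timespec start, deadline, end;
    int ready = 0;  /* the predicate; never set in this sketch */
    int rc = 0;

    /* Bind the condition variable to the monotonic clock. */
    pthread_condattr_init(&attr);
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    pthread_cond_init(&cond, &attr);
    pthread_condattr_destroy(&attr);

    /* The deadline is absolute and must be on the same clock. */
    clock_gettime(CLOCK_MONOTONIC, &start);
    deadline = start;
    deadline.tv_sec  += timeout_ms / 1000;
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec  += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&mutex);
    while (!ready && rc == 0)
        rc = pthread_cond_timedwait(&cond, &mutex, &deadline);
    pthread_mutex_unlock(&mutex);

    clock_gettime(CLOCK_MONOTONIC, &end);
    *elapsed_ns = (end.tv_sec - start.tv_sec) * 1000000000LL
                  + (end.tv_nsec - start.tv_nsec);

    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    return rc;
}
```

Using the monotonic clock for relative timeouts means that adjusting the wall clock while a thread is waiting does not lengthen or shorten the wait.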

__pthread_cond_wait_common in turn performs the timed wait through __futex_abstimed_wait_cancelable64:

static __always_inline int
__pthread_cond_wait_common (pthread_cond_t *cond, pthread_mutex_t *mutex,
                            clockid_t clockid, const struct __timespec64 *abstime)
{
  /* ... omitted ... */
  err = __futex_abstimed_wait_cancelable64 (
    cond->__data.__g_signals + g, 0, clockid, abstime, private);
  /* ... omitted ... */
}

__futex_abstimed_wait_cancelable64 simply calls __futex_abstimed_wait_common:

int
__futex_abstimed_wait_cancelable64 (unsigned int* futex_word,
                                    unsigned int expected, clockid_t clockid,
                                    const struct __timespec64* abstime,
                                    int private)
{
  return __futex_abstimed_wait_common (futex_word, expected, clockid,
                                       abstime, private, true);
}

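__futex_abstimed_wait_common then chooses between __futex_abstimed_wait_common64 and __futex_abstimed_wait_common32. The choice is not made on the word size of the platform as such: if 64-bit time_t system calls can be assumed (__ASSUME_TIME64_SYSCALLS), the 64-bit variant is always used. Otherwise the 64-bit variant is used only when the deadline does not fit in a 32-bit time_t, and the 32-bit variant handles the rest.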

static int
__futex_abstimed_wait_common (unsigned int* futex_word,
                              unsigned int expected, clockid_t clockid,
                              const struct __timespec64* abstime,
                              int private, bool cancel)
{
  int err;
  unsigned int clockbit;

  /* Work around the fact that the kernel rejects negative timeout values
     despite them being valid.  */
  if (__glibc_unlikely ((abstime != NULL) && (abstime->tv_sec < 0)))
    return ETIMEDOUT;

  if (! lll_futex_supported_clockid (clockid))
    return EINVAL;

  clockbit = (clockid == CLOCK_REALTIME) ? FUTEX_CLOCK_REALTIME : 0;
  int op = __lll_private_flag (FUTEX_WAIT_BITSET | clockbit, private);

#ifdef __ASSUME_TIME64_SYSCALLS
  err = __futex_abstimed_wait_common64 (futex_word, expected, op, abstime,
                                        private, cancel);
#else
  bool need_time64 = abstime != NULL && !in_time_t_range (abstime->tv_sec);
  if (need_time64)
    {
      err = __futex_abstimed_wait_common64 (futex_word, expected, op, abstime,
                                            private, cancel);
      if (err == -ENOSYS)
        err = -EOVERFLOW;
    }
  else
    err = __futex_abstimed_wait_common32 (futex_word, expected, op, abstime,
                                          private, cancel);
#endif

  switch (err)
    {
    case 0:
    case -EAGAIN:
    case -EINTR:
    case -ETIMEDOUT:
    case -EINVAL:
    case -EOVERFLOW:  /* Passed absolute timeout uses 64 bit time_t type, but
                         underlying kernel does not support 64 bit time_t futex
                         syscalls.  */
      return -err;

    case -EFAULT: /* Must have been caused by a glibc or application bug.  */
    case -ENOSYS: /* Must have been caused by a glibc bug.  */
    /* No other errors are documented at this time.  */
    default:
      futex_fatal_error ();
    }
}
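Two of the argument checks in this call chain can be observed from ordinary user code: the valid_nanoseconds check in ___pthread_cond_timedwait64 and the handling of deadlines that lie in the past, including the negative tv_sec workaround above. The sketch below uses a hypothetical helper, timedwait_with_deadline, on a default (CLOCK_REALTIME) condition variable. The expected statuses follow from the glibc code quoted in this article and from POSIX; other C libraries may differ in details.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Calls pthread_cond_timedwait once with the given absolute deadline
   (seconds and nanoseconds since the Epoch) on a condition variable
   that nobody signals, and returns the status code. */
int timedwait_with_deadline(time_t sec, long nsec) {
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    struct timespec deadline = { .tv_sec = sec, .tv_nsec = nsec };

    pthread_mutex_lock(&mutex);
    int rc = pthread_cond_timedwait(&cond, &mutex, &deadline);
    pthread_mutex_unlock(&mutex);

    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    return rc;
}
```

With these inputs the call does not block for any noticeable time: a deadline of (-1, 0) or (0, 0) is already in the past and yields ETIMEDOUT, while an out-of-range nanosecond field such as (0, 1000000000) is rejected with EINVAL before any waiting happens.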

__futex_abstimed_wait_common64 issues the system call through the INTERNAL_SYSCALL_CANCEL macro:

static int
__futex_abstimed_wait_common64 (unsigned int* futex_word,
                                unsigned int expected, int op,
                                const struct __timespec64* abstime,
                                int private, bool cancel)
{
  if (cancel)
    return INTERNAL_SYSCALL_CANCEL (futex_time64, futex_word, op, expected,
                                    abstime, NULL /* Unused.  */,
                                    FUTEX_BITSET_MATCH_ANY);
  else
    return INTERNAL_SYSCALL_CALL (futex_time64, futex_word, op, expected,
                                  abstime, NULL /* Unused.  */,
                                  FUTEX_BITSET_MATCH_ANY);
}

The macro definition for the system call

/* Issue a syscall defined by syscall number plus any other argument
   required.  Any error will be returned unmodified (including errno).  */
#define INTERNAL_SYSCALL_CANCEL(...) \
  ({                                                         \
    long int sc_ret;                                         \
    if (NO_SYSCALL_CANCEL_CHECKING)                          \
      sc_ret = INTERNAL_SYSCALL_CALL (__VA_ARGS__);          \
    else                                                     \
      {                                                      \
        int sc_cancel_oldtype = LIBC_CANCEL_ASYNC ();        \
        sc_ret = INTERNAL_SYSCALL_CALL (__VA_ARGS__);        \
        LIBC_CANCEL_RESET (sc_cancel_oldtype);               \
      }                                                      \
    sc_ret;                                                  \
  })
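What the macro finally issues is the Linux futex system call with the FUTEX_WAIT_BITSET operation, the futex word, the expected value, the absolute deadline, and FUTEX_BITSET_MATCH_ANY. The same call can be made directly through syscall(2), which makes the kernel-level behaviour easy to observe. This is a minimal sketch assuming a 64-bit Linux system, where SYS_futex accepts the struct timespec from <time.h>. The helper names futex_wait_until and futex_wait_for_ms are hypothetical.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <linux/futex.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

/* Sleeps while *word == expected, until an absolute CLOCK_MONOTONIC
   deadline (FUTEX_WAIT_BITSET without FUTEX_CLOCK_REALTIME measures
   the deadline on the monotonic clock). Returns 0 when woken by a
   FUTEX_WAKE, otherwise the errno value set by the kernel. */
int futex_wait_until(uint32_t *word, uint32_t expected,
                     const struct timespec *deadline) {
    long rc = syscall(SYS_futex, word,
                      FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG,
                      expected, deadline, NULL, FUTEX_BITSET_MATCH_ANY);
    return rc == 0 ? 0 : errno;
}

/* Convenience wrapper: deadline = now + timeout_ms on CLOCK_MONOTONIC. */
int futex_wait_for_ms(uint32_t *word, uint32_t expected, long timeout_ms) {
    struct timespec deadline;
    clock_gettime(CLOCK_MONOTONIC, &deadline);
    deadline.tv_sec  += timeout_ms / 1000;
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec  += 1;
        deadline.tv_nsec -= 1000000000L;
    }
    return futex_wait_until(word, expected, &deadline);
}
```

If the word still holds the expected value and nobody wakes the caller, the call returns ETIMEDOUT once the deadline passes. If the word already differs from the expected value, the kernel refuses to sleep and returns EAGAIN at once. These are two of the error codes that __futex_abstimed_wait_common passes back to its caller.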

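Summary

This article is a brief look at how the waiting behind LockSupport.park is implemented, from the Java API down through HotSpot and glibc. I have not yet found the kernel source behind the Linux system call and will keep tracing it. If any reader knows where it lives, I would appreciate a pointer. Thank you.

This article is reposted from: Juejin (掘金)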
