开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

这才是图文并茂:我写了1万多字,就是为了让你了解AQS是怎么

发表于 2020-11-18

前言

如果你想深入研究Java并发的话,那么AQS一定是绕不开的一块知识点,Java并发包很多的同步工具类底层都是基于AQS来实现的,比如我们工作中经常用的Lock工具ReentrantLock、栅栏CountDownLatch、信号量Semaphore等,而且关于AQS的知识点也是面试中经常考察的内容,所以,无论是为了更好的使用还是为了应付面试,深入学习AQS都很有必要。

CAS

学习AQS之前,我们有必要了解一个知识点,就是AQS底层中大量使用的CAS,关于CAS,大家应该都不陌生,如果还有哪位同学不清楚的话,可以看看我之前的文章《面试必备知识点:悲观锁和乐观锁的那些事儿》 ,这里不多复述,哈哈,给自己旧文章加了阅读量

此时,好几块搬砖朝我飞了过来。。。。。

好吧,开个玩笑,还是大概讲解一下吧,了解的同学可以跳过这一段。

CAS是乐观锁的一种思想,它假设线程对资源的访问是没有冲突的,同时所有的线程执行都不需要等待,可以持续执行。 如果有冲突的话,就用比较+交换的方式来检测冲突,有冲突就不断重试。

CAS的全称是Compare-and-Swap,也就是比较并交换,它包含了三个参数:V,A,B,V表示要读写的内存位置,A表示旧的预期值,B表示新值,当执行CAS时,只有当V的值等于预期值A时,才会把V的值改为B,这样的方式可以让多个线程同时去修改,但也会因为线程操作失败而不断重试,对CPU有一定程序上的开销。

AQS简介

本文主角正式登场。

AQS,全名AbstractQueuedSynchronizer,是一个抽象类的队列式同步器,它的内部通过维护一个状态volatile int state(共享资源),一个FIFO线程等待队列来实现同步功能。

state用关键字volatile修饰,代表着该共享资源的状态一更改就能被所有线程可见,而AQS的加锁方式本质上就是多个线程在竞争state,当state为0时代表线程可以竞争锁,不为0时代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,这些线程会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

而这个等待队列其实就相当于一个CLH队列,用一张原理图来表示大致如下:

基础定义

AQS支持两种资源分享的方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

自定义的同步器继承AQS后,只需要实现共享资源state的获取和释放方式即可,其他如线程队列的维护(如获取资源失败入队/唤醒出队等)等操作,AQS在顶层已经实现了,

AQS代码内部提供了一系列操作锁和线程队列的方法,主要操作锁的方法包含以下几个:

  • compareAndSetState():利用CAS的操作来设置state的值
  • tryAcquire(int):独占方式获取锁。成功则返回true,失败则返回false。
  • tryRelease(int):独占方式释放锁。成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式释放锁。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式释放锁。如果释放后允许唤醒后续等待结点返回true,否则返回false。

像ReentrantLock就是实现了自定义的tryAcquire-tryRelease,从而操作state的值来实现同步效果。

除此之外,AQS内部还定义了一个静态类Node,表示CLH队列的每一个结点,该结点的作用是对每一个等待获取资源做了封装,包含了需要同步的线程本身、线程等待状态…..

我们可以看下该类的一些重点变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
arduino复制代码static final class Node {
/** 表示共享模式下等待的Node */
static final Node SHARED = new Node();
/** 表示独占模式下等待的mode */
static final Node EXCLUSIVE = null;

/** 下面几个为waitStatus的具体值 */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;

/** 表示前面的结点 */
volatile Node prev;
/** 表示后面的结点 */
volatile Node next;
/**当前结点装载的线程,初始化时被创建,使用后会置空*/
volatile Thread thread;
/**链接到下一个节点的等待条件,用到Condition的时候会使用到*/
Node nextWaiter;

}

代码里面定义了一个表示当前Node结点等待状态的字段waitStatus,该字段的取值包含了CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,这五个值代表了不同的特定场景:

  • CANCELLED:表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL:表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL(记住这个-1的值,因为后面我们讲的时候经常会提到)
  • CONDITION:表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。(注:Condition是AQS的一个组件,后面会细说)
  • PROPAGATE:共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

也就是说,当waitStatus为负值表示结点处于有效等待状态,为正值的时候表示结点已被取消。

在AQS内部中还维护了两个Node对象head和tail,一开始默认都为null

1
2
java复制代码private transient volatile Node head;
private transient volatile Node tail;

讲完了AQS的一些基础定义,我们就可以开始学习同步的具体运行机制了,为了更好的演示,我们用ReentrantLock作为使用入口,一步步跟进源码探究AQS底层是如何运作的,这里说明一下,因为ReentrantLock底层调用的AQS是独占模式,所以下文讲解的AQS源码也是针对独占模式的操作

好了,热身正式结束,来吧。

独占模式

加锁过程

我们都知道,ReentrantLock的加锁和解锁方法分别为lock()和unLock(),我们先来看获取锁的方法,

1
2
3
4
5
6
scss复制代码final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

逻辑很简单,线程进来后直接利用CAS尝试抢占锁,如果抢占成功state值回被改为1,且设置对象独占锁线程为当前线程,否则就调用acquire(1)再次尝试获取锁。

我们假定有两个线程A和B同时竞争锁,A进来先抢占到锁,此时的AQS模型图就类似这样:

继续走下面的方法,

1
2
3
4
5
scss复制代码public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

acquire包含了几个函数的调用,

tryAcquire:尝试直接获取锁,如果成功就直接返回;

addWaiter:将该线程加入等待队列FIFO的尾部,并标记为独占模式;

acquireQueued:线程阻塞在等待队列中获取锁,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

selfInterrupt:自我中断,就是既拿不到锁,又在等待时被中断了,线程就会进行自我中断selfInterrupt(),将中断补上。

我们一个个来看源码,并结合上面的两个线程来做场景分析。

tryAcquire

不用多说,就是为了再次尝试获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

当线程B进来后,nonfairTryAcquire方法首先会获取state的值,如果为0,则正常获取该锁,不为0的话判断是否是当前线程占用了,是的话就累加state的值,这里的累加也是为了配合释放锁时候的次数,从而实现可重入锁的效果。

当然,因为之前锁已经被线程A占领了,所以这时候tryAcquire会返回false,继续下面的流程。

addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ini复制代码private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

这段代码首先会创建一个和当前线程绑定的Node节点,Node为双向链表。此时等待队列中的tail指针为空,直接调用enq(node)方法将当前线程加入等待队列尾部,然后返回当前结点的前驱结点,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ini复制代码private Node enq(final Node node) {
// CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) {
// 队列为空,初始化一个Node结点作为Head结点,并将tail结点也指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 把当前结点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

第一遍循环时,tail指针为空,初始化一个Node结点,并把head和tail结点都指向它,然后第二次循环进来之后,tail结点不为空了,就将当前的结点加入到tail结点后面,也就是这样:

todo 如果此时有另一个线程C进来的话,发现锁已经被A拿走了,然后队列里已经有了线程B,那么线程C就只能乖乖排到线程B的后面去,

acquireQueued

接着解读方法,通过tryAcquire()和addWaiter(),我们的线程还是没有拿到资源,并且还被排到了队列的尾部,如果让你来设计的话,这个时候你会怎么处理线程呢?其实答案也很简单,能做的事无非两个:

1、循环让线程再抢资源。但仔细一推敲就知道不合理,因为如果有多个线程都参与的话,你抢我也抢只会降低系统性能

2、进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源

毫无疑问,选择2更加靠谱,acquireQueued方法做的也是这样的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
ini复制代码final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 标记是否会被中断
boolean interrupted = false;
// CAS自旋
for (;;) {
// 获取当前结点的前结点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 获取锁失败,则将此线程对应的node的waitStatus改为CANCEL
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驱结点等待状态为"SIGNAL",那么自己就可以安心等待被唤醒了
return true;
if (ws > 0) {
/*
* 前驱结点被取消了,通过循环一直往前找,直到找到等待状态有效的结点(等待状态值小于等于0) ,
* 然后排在他们的后边,至于那些被当前Node强制"靠后"的结点,因为已经被取消了,也没有引用链,
* 就等着被GC了
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驱正常,那就把前驱的状态设置成SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

acquireQueued方法的流程是这样的:

1、CAS自旋,先判断当前传入的Node的前结点是否为head结点,是的话就尝试获取锁,获取锁成功的话就把当前结点置为head,之前的head置为null(方便GC),然后返回

2、如果前驱结点不是head或者加锁失败的话,就调用shouldParkAfterFailedAcquire,将前驱节点的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt方法,调用LockSupport.park()挂起当前线程,parkAndCheckInterrupt在挂起线程后会判断线程是否被中断,如果被中断的话,就会重新跑acquireQueued方法的CAS自旋操作,直到获取资源。

ps:LockSupport.park方法会让当前线程进入waitting状态,在这种状态下,线程被唤醒的情况有两种,一是被unpark(),二是被interrupt(),所以,如果是第二种情况的话,需要返回被中断的标志,然后在acquire顶层方法的窗口那里自我中断补上

此时,因为线程A还未释放锁,所以线程B状态都是被挂起的,

到这里,加锁的流程就分析完了,其实整体来说也并不复杂,而且当你理解了独占模式加锁的过程,后面释放锁和共享模式的运行机制也没什么难懂的了,所以整个加锁的过程还是有必要多消化下的,也是AQS的重中之重。

为了方便你们更加清晰理解,我加多一张流程图吧(这个作者也太暖了吧,哈哈)

释放锁

说完了加锁,我们来看看释放锁是怎么做的,AQS中释放锁的方法是release(),当调用该方法时会释放指定量的资源 (也就是锁) ,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

还是一步步看源码吧,

1
2
3
4
5
6
7
8
9
java复制代码public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease

代码上可以看出,核心的逻辑都在tryRelease方法中,该方法的作用是释放资源,AQS里该方法没有具体的实现,需要由自定义的同步器去实现,我们看下ReentrantLock代码中对应方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

tryRelease方法会减去state对应的值,如果state为0,也就是已经彻底释放资源,就返回true,并且把独占的线程置为null,否则返回false。

此时AQS中的数据就会变成这样:

完全释放资源后,当前线程要做的就是唤醒CLH队列中第一个在等待资源的线程,也就是head结点后面的线程,此时调用的方法是unparkSuccessor(),

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ini复制代码private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//将head结点的状态置为0
compareAndSetWaitStatus(node, ws, 0);
//找到下一个需要唤醒的结点s
Node s = node.next;
//如果为空或已取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从后向前,直到找到等待状态小于0的结点,前面说了,结点waitStatus小于0时才有效
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到有效的结点,直接唤醒
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}

方法的逻辑很简单,就是先将head的结点状态置为0,避免下面找结点的时候再找到head,然后找到队列中最前面的有效结点,然后唤醒,我们假设这个时候线程A已经释放锁,那么此时队列中排最前边竞争锁的线程B就会被唤醒,

然后被唤醒的线程B就会尝试用CAS获取锁,回到acquireQueued方法的逻辑,

1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码for (;;) {
// 获取当前结点的前结点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}

当线程B获取锁之后,会把当前结点赋值给head,然后原先的前驱结点 (也就是原来的head结点) 去掉引用链,方便回收,这样一来,线程B获取锁的整个过程就完成了,此时AQS的数据就会变成这样:

到这里,我们已经分析完了AQS独占模式下加锁和释放锁的过程,也就是tryAccquire->tryRelease这一链条的逻辑,除此之外,AQS中还支持共享模式的同步,这种模式下关于锁的操作核心其实就是tryAcquireShared->tryReleaseShared这两个方法,我们可以简单看下

共享模式

获取锁

AQS中,共享模式获取锁的顶层入口方法是acquireShared,该方法会获取指定数量的资源,成功的话就直接返回,失败的话就进入等待队列,直到获取资源,

1
2
3
4
arduino复制代码public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

该方法里包含了两个方法的调用,

tryAcquireShared:尝试获取一定资源的锁,返回的值代表获取锁的状态。

doAcquireShared:进入等待队列,并循环尝试获取锁,直到成功。

tryAcquireShared

tryAcquireShared在AQS里没有实现,同样由自定义的同步器去完成具体的逻辑,像一些较为常见的并发工具Semaphore、CountDownLatch里就有对该方法的自定义实现,虽然实现的逻辑不同,但方法的作用是一样的,就是获取一定资源的资源,然后根据返回值判断是否还有剩余资源,从而决定下一步的操作。

返回值有三种定义:

  • 负值代表获取失败;
  • 0代表获取成功,但没有剩余的资源,也就是state已经为0;
  • 正值代表获取成功,而且state还有剩余,其他线程可以继续领取

当返回值小于0时,证明此次获取一定数量的锁失败了,然后就会走doAcquireShared方法

doAcquireShared

此方法的作用是将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回,这是它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
ini复制代码private void doAcquireShared(int arg) {
// 加入队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// CAS自旋
for (;;) {
final Node p = node.predecessor();
// 判断前驱结点是否是head
if (p == head) {
// 尝试获取一定数量的锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取锁成功,而且还有剩余资源,就设置当前结点为head,并继续唤醒下一个线程
setHeadAndPropagate(node, r);
// 让前驱结点去掉引用链,方便被GC
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 跟独占模式一样,改前驱结点waitStatus为-1,并且当前线程挂起,等待被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// head指向自己
setHead(node);
// 如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

看到这里,你会不会一点熟悉的感觉,这个方法的逻辑怎么跟上面那个acquireQueued() 那么类似啊?对的,其实两个流程并没有太大的差别。只是doAcquireShared()比起独占模式下的获取锁上多了一步唤醒后继线程的操作,当获取完一定的资源后,发现还有剩余的资源,就继续唤醒下一个邻居线程,这才符合”共享”的思想嘛。

这里我们可以提出一个疑问,共享模式下,当前线程释放了一定数量的资源,但这部分资源满足不了下一个等待结点的需要的话,那么会怎么样?

按照正常的思维,共享模式是可以多个线程同时执行的才对,所以,多个线程的情况下,如果老大释放完资源,但这部分资源满足不了老二,但能满足老三,那么老三就可以拿到资源。可事实是,从源码设计中可以看出,如果真的发生了这种情况,老三是拿不到资源的,因为等待队列是按顺序排列的,老二的资源需求量大,会把后面量小的老三以及老四、老五等都给卡住。从这一个角度来看,虽然AQS严格保证了顺序,但也降低了并发能力

接着往下说吧,唤醒下一个邻居线程的逻辑在doReleaseShared()中,我们放到下面的释放锁来解析。

释放锁

共享模式释放锁的顶层方法是releaseShared,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

1
2
3
4
5
6
7
arduino复制代码public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

该方法同样包含两部分的逻辑:

tryReleaseShared:释放资源。

doAcquireShared:唤醒后继结点。

跟tryAcquireShared方法一样,tryReleaseShared在AQS中没有具体的实现,由子同步器自己去定义,但功能都一样,就是释放一定数量的资源。

释放完资源后,线程不会马上就收工,而是唤醒等待队列里最前排的等待结点。

doAcquireShared

唤醒后继结点的工作在doReleaseShared()方法中完成,我们可以看下它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码private void doReleaseShared() {
for (;;) {
// 获取等待队列中的head结点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// head结点waitStatus = -1,唤醒下一个结点对应的线程
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继结点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

代码没什么特别的,就是如果等待队列head结点的waitStatus为-1的话,就直接唤醒后继结点,唤醒的方法unparkSuccessor()在上面已经讲过了,这里也没必要再复述。

总的来看,AQS共享模式的运作流程和独占模式很相似,只要掌握了独占模式的流程运转,共享模式什么的不就那样吗,没难度。这也是我为什么共享模式讲解中不画流程图的原因,没必要嘛。

Condition

介绍完了AQS的核心功能,我们再扩展一个知识点,在AQS中,除了提供独占/共享模式的加锁/解锁功能,它还对外提供了关于Condition的一些操作方法。

Condition是个接口,在jdk1.5版本后设计的,基本的方法就是await()和signal()方法,功能大概就对应Object的wait()和notify(),Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现 ,AQS中就定义了一个类ConditionObject来实现了这个接口,

那么它应该怎么用呢?我们可以简单写个demo来看下效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
csharp复制代码public class ConditionDemo {

public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread tA = new Thread(() -> {
lock.lock();
try {
System.out.println("线程A加锁成功");
System.out.println("线程A执行await被挂起");
condition.await();
System.out.println("线程A被唤醒成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("线程A释放锁成功");
}
});

Thread tB = new Thread(() -> {
lock.lock();
try {
System.out.println("线程B加锁成功");
condition.signal();
System.out.println("线程B唤醒线程A");
} finally {
lock.unlock();
System.out.println("线程B释放锁成功");
}
});
tA.start();
tB.start();
}
}

执行main函数后结果输出为:

1
2
3
4
5
6
7
css复制代码线程A加锁成功
线程A执行await被挂起
线程B加锁成功
线程B唤醒线程A
线程B释放锁成功
线程A被唤醒成功
线程A释放锁成功

代码执行的结果很容易理解,线程A先获取锁,然后调用await()方法挂起当前线程并释放锁,线程B这时候拿到锁,然后调用signal唤醒线程A。

毫无疑问,这两个方法让线程的状态发生了变化,我们仔细来研究一下,

翻看AQS的源码,我们会发现Condition中定义了两个属性firstWaiter和lastWaiter,前面说了,AQS中包含了一个FIFO的CLH等待队列,每个Conditon对象就包含这样一个等待队列,而这两个属性分别表示的是等待队列中的首尾结点,

1
2
3
4
arduino复制代码/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

注意:Condition当中的等待队列和AQS主体的同步等待队列是分开的,两个队列虽然结构体相同,但是作用域是分开的

await

先看await()的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scss复制代码public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到等待队列中
Node node = addConditionWaiter();
// 完全释放占有的资源,并返回资源数
int savedState = fullyRelease(node);
int interruptMode = 0;
// 循环判断当前结点是不是在Condition的队列中,是的话挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

当一个线程调用Condition.await()方法,将会以当前线程构造结点,这个结点的waitStatus赋值为Node.CONDITION,也就是-2,并将结点从尾部加入等待队列,然后尾部结点就会指向这个新增的结点,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ini复制代码private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

我们依然用上面的demo来演示,此时,线程A获取锁并调用**Condition.await()**方法后,AQS内部的数据结构会变成这样:

在Condition队列中插入对应的结点后,线程A会释放所持有的资源,走到while循环那层逻辑,

1
2
3
4
5
kotlin复制代码while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

isOnSyncQueue方法的会判断当前的线程节点是不是在同步队列中,这个时候此结点还在Condition队列中,所以该方法返回false,这样的话循环会一直持续下去,线程被挂起,等待被唤醒,此时,线程A的流程暂时停止了。

当线程A调用await()方法挂起的时候,线程B获取到了线程A释放的资源,然后执行signal()方法:

signal

1
2
3
4
5
6
7
java复制代码public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

先判断当前线程是否为获取锁的线程,如果不是则直接抛出异常。 接着调用doSignal()方法来唤醒线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码private void doSignal(Node first) {
// 循环,从队列一直往后找不为空的首结点
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
// CAS循环,将结点的waitStatus改为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 上面已经分析过,此方法会把当前结点加入到等待队列中,并返回前驱结点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

从doSignal的代码中可以看出,这时候程序寻找的是Condition等待队列中首结点firstWaiter的结点,此时该结点指向的是线程A的结点,所以之后的流程作用的都是线程A的结点。

这里分析下transferForSignal方法,先通过CAS自旋将结点waitStatus改为0,然后就把结点放入到同步队列 (此队列不是Condition的等待队列) 中,然后再用CAS将同步队列中该结点的前驱结点waitStatus改为Node.SIGNAL,也就是-1,此时AQS的数据结构大概如下 (额…..少画了个箭头,大家就当head结点是线程A结点的前驱结点就好):

回到await()方法,当线程A的结点被加入同步队列中时,isOnSyncQueue()会返回true,跳出循环,

1
2
3
4
5
6
7
8
9
10
11
scss复制代码while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);

接着执行acquireQueued()方法,这里就不用多说了吧,尝试重新获取锁,如果获取锁失败继续会被挂起,直到另外线程释放锁才被唤醒。

所以,当线程B释放完锁后,线程A被唤醒,继续尝试获取锁,至此流程结束。

对于这整个通信过程,我们可以画一张流程图展示下:

总结

说完了Condition的使用和底层运行机制,我们再来总结下它跟普通 wait/notify 的比较,一般这也是问的比较多的,Condition大概有以下两点优势:

  • Condition 需要结合 Lock 进行控制,使用的时候要注意一定要对应的unlock(),可以对多个不同条件进行控制,只要new 多个 Condition对象就可以为多个线程控制通信,wait/notify 只能和 synchronized 关键字一起使用,并且只能唤醒一个或者全部的等待队列;
  • Condition 有类似于 await 的机制,因此不会产生加锁方式而产生的死锁出现,同时底层实现的是 park/unpark 的机制,因此也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,但是 wait/notify 会产生先唤醒再挂起的死锁。

最后

对AQS的源码分析到这里就全部结束了,虽然还有很多知识点没讲解,比如公平锁/非公平锁下AQS是怎么作用的,篇幅所限,部分知识点没有扩展还请见谅,尽管如此,如果您能看完文章的话,相信对AQS也算是有足够的了解了。

回顾本篇文章,我们不难发现,无论是独占还是共享模式,或者结合是Condition工具使用,AQS本质上的同步功能都是通过对锁和队列中结点的操作来实现的,从设计上讲,AQS的组成结构并不算复杂,底层的运转机制也不会很绕,所以,大家如果看源码的时候觉得有些困难的话也不用灰心,多看几遍,顺便画个图之类的,理清下流程还是没什么问题的。

当然,自己看得懂是一回事,写出来让别人看懂又是另一回事了,就像这篇文章,我花了好长的时间来准备,又是画图又是理流程的,期间还参考了不少网上大神的博文,肝了几天才算是成文了。虽然我知道本文不算什么高质文,但我也算是费尽心力了,写技术文真是挺累的,大家看的觉得不错的话还请帮忙转发下或点个赞吧!这也是对我最好的鼓励了


作者:鄙人薛某,一个不拘于技术的互联网人,技术三流,吹水一流,想看更多精彩文章可以关注我的公众号哦~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java异步编程指南

发表于 2020-11-17

在我们平时开发中或多或少都会遇到需要调用接口来完成一个功能的需求,这个接口可以是内部系统也可以是外部的,然后等到接口返回数据了才能继续其他的业务流程,这就是传统的同步模式。

同步模式虽然简单但缺点也很明显,如果对方服务处理缓慢迟迟未能返回数据,或网络问题导致响应变长,就会阻塞我们调用方的线程,导致我们主流程的耗时latency延长,传统的解决方式是增加接口的超时timeout设置,防止无限期等待。但即使这样还是会占用CPU资源。

在我们做rpc远程调用,redis,数据库访问等比较耗时的网络请求时经常要面对这样的问题,这种业务场景我们可以引入异步的编程思想,即主流程不需要阻塞等待接口返回数据,而是继续往下执行,当真正需要这个接口返回结果时再通过回调或阻塞的方式获取,此时我们的主流程和异步任务是并行执行的。

Java中实现异步主要是通过Future,CompletableFuture,Guava ListenableFuture以及一些异步响应式框架如RxJava实现。

下面我们主要看下这几种组件适用的业务场景和需要注意的地方,避免踩坑。

一. Future

java.util.concurrent.Future是JDK5引入的,用来获取一个异步计算的结果。你可以使用isDone方法检查计算是否完成,也可以使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

Future的api说明

实际开发中我们一般会结合线程池的submit配合使用,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码package com.javakk;

import java.util.concurrent.*;

public class FutureTest {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool(); // 线程池
Future<String> future = executor.submit(() ->{
Thread.sleep(200); // 模拟接口调用,耗时200ms
return "hello world";
});
// 在输出下面异步结果时主线程可以不阻塞的做其他事情
// TODO 其他业务逻辑

System.out.println("异步结果:"+future.get()); //主线程获取异步结果
// 或者通过下面轮询的方式
// while(!future.isDone());
}
}

// 输出结果:
异步结果:hello world

简单的说我有一个任务,提交给了Future,Future替我完成这个任务,这期间我可以去做别的事情。一段时间之后,我再从Future取出结果。

上面的代码有2个地方需要注意,在15行不建议使用future.get()方式,而应该使用future.get(long timeout, TimeUnit unit), 尤其是在生产环境一定要设置合理的超时时间,防止程序无限期等待下去。另外就是要考虑异步任务执行过程中报错抛出异常的情况,需要捕获future的异常信息。

通过代码可以看出一些简单的异步场景可以使用Future解决,但是对于结果的获取却不是很方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式相当于把异步变成了同步,显然和异步编程的初衷相违背,轮询的方式又会浪费CPU资源。

Future没有提供通知的机制,就是回调,我们无法知道它什么时间完成任务。

而且在复杂一点的情况下,比如多个异步任务的场景,一个异步任务依赖上一个异步任务的执行结果,异步任务合并等,Future无法满足需求。

二.ListenableFuture

Google并发包下的listenableFuture对Java原生的future做了扩展,顾名思义就是使用监听器模式实现的回调,所以叫可监听的future。

在我们公司早期的项目里(jdk8之前的版本)都是使用listenableFuture来实现异步编程。

要使用listenableFuture还要结合MoreExecutor线程池,MoreExecutor是对Java原生线程池的封装,比如常用的MoreExecutors.listeningDecorator(threadPool); 修改Java原生线程池的submit方法,封装了future返回listenableFuture。

代码示例如下:

1
2
3
4
5
6
ini复制代码// ListeningExecutorService继承jdk的ExecutorService接口,重写了submit方法,修改返回值类型为ListenableFuture
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
[ListenableFuture](http://javakk.com/tag/listenablefuture "查看更多关于 ListenableFuture 的文章")<String> listenableFuture = executor.submit(() -> {
Thread.sleep(200); // 模拟接口调用,耗时200ms
return "hello world";
});

上面的代码是构造了一个ListenableFuture的异步任务,调用它的结果一般有两种方式:

基于addListener:

1
2
3
4
5
6
7
8
9
10
csharp复制代码listenableFuture.addListener(() -> {
try {
System.out.println("异步结果:" + listenableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}, executor);

// 输出结果:
异步结果:hello world

基于addCallback:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typescript复制代码Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("异步结果:" + result);
}

@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, executor);

// 输出结果:
异步结果:hello world

其实两种方式都是基于回调,具体使用哪种看业务场景。

  • addListener需要自己代码里捕获处理异常情况,最好设置超时时间
  • addCallback把正常返回和异常情况做了分离,方便我们针对不同情况做处理

另外Futures里还有很多其他的api,可以满足我们负责场景,比如transform()可以处理异步任务之间的依赖情况,allAsList()将多个ListenableFuture合并成一个。

三. CompletableFuture

如果你们公司的jdk是8或以上的版本,那可以直接使用CompletableFuture类来实现异步编程。

Java8新增的CompletableFuture类借鉴了Google Guava的ListenableFuture,它包含50多个方法,默认使用forkJoinPool线程池,提供了非常强大的Future扩展功能,可以帮助我们简化异步编程的复杂性,结合函数式编程,通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的多种方法,可以满足大部分异步回调场景。

CompletableFuture的api

虽然方法很多但有个特征:

  • 以Async结尾的方法签名表示是在异步线程里执行,没有以Async结尾的方法则是由主线程调用
  • 如果参数里有Runnable类型,则没有返回结果,即纯消费的方法
  • 如果参数里没有指定executor则默认使用forkJoinPool线程池,指定了则以指定的线程池来执行任务

下面就来看下常用的几种api代码示例:

转换 : thenApplyAsync

1
2
3
4
5
6
7
8
9
10
11
rust复制代码CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() ->
"hello"
);
// f2依赖f1的结果做转换
CompletableFuture<String> f2 = f1.thenApplyAsync(t ->
t + " world"
);
System.out.println("异步结果:" + f2.get());

// 输出结果:
异步结果:hello world

这里先说明一下,示例代码只关注核心功能,如果要实际使用需要考虑超时和异常情况,大家需要注意。

在上面的代码中异步任务f2需要异步任务f1的结果才能执行,但对于我们的主线程来说,无须等到f1返回结果后再调用函数f2,即不会阻塞主流程,而是告诉CompletableFuture当执行完了f1的方法再去执行f2,只有当需要最后的结果时再获取。

组合 : thenComposeAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rust复制代码CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() ->
"hello"
);
// f2虽然依赖f1的结果,但不会等待f1结果返回,而是再包装成一个future返回
CompletableFuture<String> f2 = f1.thenComposeAsync(t ->
CompletableFuture.supplyAsync(() ->
t + " world"
)
);
// 等到真正调用的时候再执行f2里的逻辑
System.out.println("异步结果:" + f2.get());

// 输出结果:
异步结果:hello world

通过代码注释能看出thenCompose相当于flatMap,避免CompletableFuture<CompletableFuture<String>>这种写法。

这也是thenCompose和thenApply的区别,通过查看api也能看出:

thenApply:

1
2
3
4
typescript复制代码public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}

thenCompose:

1
2
3
4
typescript复制代码public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(screenExecutor(executor), fn);
}

合并 : thenCombineAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ini复制代码CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return " world";
});
CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (t1, t2) ->
t1 + t2
);
long time = System.currentTimeMillis();
System.out.println("异步结果:" + f3.get());
System.out.println("耗时:" + (System.currentTimeMillis() - time));

// 输出结果:
异步结果:hello world
耗时:1002

从代码输出结果可以看到两个异步任务f1、f2是并行执行,彼此无先后依赖顺序,thenCombineAsync适合将两个并行执行的异步任务的结果合并返回成一个新的future。

还有一个类似的方法thenAcceptBoth也是合并两个future的结果,但是不会返回新的值,内部消费掉了。

二选一 : applyToEitherAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码Random rand = new Random();
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});
CompletableFuture<String> f3 = f1.applyToEitherAsync(f2, t -> t);
long time = System.currentTimeMillis();
System.out.println("异步结果:" + f3.get());
System.out.println("耗时:" + (System.currentTimeMillis() - time));

输出的结果有时候是hello 有时候是world,哪个future先执行完就根据它的结果计算,取两个future最先返回的。

这里要说明一点,如果两个future是同时返回结果,那么applyToEitherAsync永远以第一个future的结果为准,大家可以把上面代码的Thread.sleep注释掉测试下。

另外acceptEither方法和这个类似,但是没有返回值。

allOf / anyOf

前面讲的compose,combine,either都是处理两个future的方法,如果是超过2个的可以使用allOf或anyOf

allOf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
ini复制代码CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "java老k";
});
List<CompletableFuture<String>> list = new ArrayList<>();
list.add(f1);
list.add(f2);
list.add(f3);

CompletableFuture<Void> f4 = CompletableFuture.allOf(list.toArray(new CompletableFuture[]{}));
long time = System.currentTimeMillis();
f4.thenRunAsync(() ->
list.forEach(f -> {
try {
System.out.println("异步结果:" + f.get());
} catch (Exception e) {
e.printStackTrace();
}
})
);
f4.get();
System.out.println("耗时:" + (System.currentTimeMillis() - time));
// 输出结果:
耗时:1004
异步结果:hello
异步结果:world
异步结果:java老k

allOf方法是当所有的CompletableFuture都执行完后执行计算,无返回值。

anyOf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ini复制代码Random rand = new Random(); // 随机数
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000)); // 模拟接口调用耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "java老k";
});

CompletableFuture<Object> f4 = CompletableFuture.anyOf(f1, f2, f3);
long time = System.currentTimeMillis();
System.out.println("异步结果:" + f4.get());
System.out.println("耗时:" + (System.currentTimeMillis() - time));

// 输出结果:
异步结果:java老k
耗时:1075

多次执行输出的结果不一样,anyOf方法当任意一个CompletableFuture执行完后就会执行计算。

虽然说CompletableFuture更适合I/O场景,但使用时一定要结合具体业务,比如说有些公共方法处理异步任务时需要考虑异常情况,这时候使用CompletableFuture.handle(BiFunction<? super T, Throwable, ? extends U> fn)更合适,handle方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。

CompletableFuture还有一个坑需要注意:如果线上流量比较大的情况下会出现响应缓慢的问题。

因为CompletableFuture默认使用的线程池是forkJoinPool,当时对一台使用了CompletableFuture实现异步回调功能的接口做压测,通过监控系统发现有大量的ForkJoinPool.commonPool-worker-* 线程处于等待状态,进一步分析dump信息发现是forkJoinPool的makeCommonPool问题,如下图:

看到这大家应该清楚了,如果在项目里没有设置java.util.concurrent.ForkJoinPool.common.parallelism的值,那么forkJoinPool线程池的线程数就是(cpu-1),我们测试环境的机器是2核,这样实际执行任务的线程数只有1个,当有大量请求过来时,如果有耗时高的io操作,势必会造成更多的线程等待,进而拖累服务响应时间。

解决方案一个是设置java.util.concurrent.ForkJoinPool.common.parallelism这个值(要在项目启动时指定),或者指定线程池不使用默认的forkJoinPool。

forkJoinPoll线程池不了解的可以看下这篇文章:线程池ForkJoinPool简介

线程数如何设置可以参考《Java并发编程实战》这本书给出的建议,如下图:

线程池设置线程数公式:

threads = N CPU

U CPU
(1 + W/C)

其中:

  • N CPU 是处理器的核数
  • U CPU 是期望的CPU利用率(介于0和1之间)
  • W/C是等待时间与计算时间的比率

网上也有这么区分的:

如果服务是cpu密集型的,设置为电脑的核数

如果服务是io密集型的,设置为电脑的核数*2

其实我觉得并不严谨,尤其是io密集型的还要参考QPS和web服务器的配置。

线程池使用不当造成的后果和分析可以在推荐阅读里了解。

今天主要讲了java实现异步编程的几种方式,大家可以结合自己的实际情况参考,下次有时间会跟大家分享下我们另外一个项目如何使用RxJava实现的全异步化服务。

文章来源:javakk.com/225.html

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

带你解惑大厂必会使用的 Lambda表达式、函数式接口 第一

发表于 2020-11-17

前言:

应广大读者的需要,霈哥给大家带来新一期的干货啦!

带你解惑大厂必会使用的 Lambda表达式、函数式接口

[带你解惑大厂必会使用的 Stream流、方法引用]

若对你和身边的朋友有帮助, 抓紧关注 IT霈哥 点赞! 点赞! 点赞! 评论!收藏! 分享给更多的朋友共同学习交流, 每天持续更新离不开你的支持!


第一章 Lambda表达式

1.1 函数式编程思想概述

在数学中,函数就是有输入量、输出量的一套计算方案,也就是“拿什么东西做什么事情”。相对而言,面向对象过分强调“必须通过对象的形式来做事情”,而函数式思想则尽量忽略面向对象的复杂语法——强调做什么,而不是以什么形式做。

做什么,而不是怎么做

我们真的希望创建一个匿名内部类对象吗?不。我们只是为了做这件事情而不得不创建一个对象。我们真正希望做的事情是:将run方法体内的代码传递给Thread类知晓。

传递一段代码——这才是我们真正的目的。而创建对象只是受限于面向对象语法而不得不采取的一种手段方式。那,有没有更加简单的办法?如果我们将关注点从“怎么做”回归到“做什么”的本质上,就会发现只要能够更好地达到目的,过程与形式其实并不重要。

1.2 Lambda的优化

当需要启动一个线程去完成任务时,通常会通过java.lang.Runnable接口来定义任务内容,并使用java.lang.Thread类来启动该线程。

传统写法,代码如下:

1
2
3
4
5
6
7
8
9
10
java复制代码public class Demo03Thread {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("多线程任务执行!");
}
}).start();
}
}

本着“一切皆对象”的思想,这种做法是无可厚非的:首先创建一个Runnable接口的匿名内部类对象来指定任务内容,再将其交给一个线程来启动。

代码分析:

对于Runnable的匿名内部类用法,可以分析出几点内容:

  • Thread类需要Runnable接口作为参数,其中的抽象run方法是用来指定线程任务内容的核心;
  • 为了指定run的方法体,不得不需要Runnable接口的实现类;
  • 为了省去定义一个RunnableImpl实现类的麻烦,不得不使用匿名内部类;
  • 必须覆盖重写抽象run方法,所以方法名称、方法参数、方法返回值不得不再写一遍,且不能写错;
  • 而实际上,似乎只有方法体才是关键所在。

Lambda表达式写法,代码如下:

借助Java 8的全新语法,上述Runnable接口的匿名内部类写法可以通过更简单的Lambda表达式达到等效:

1
2
3
4
5
java复制代码public class Demo04LambdaRunnable {
public static void main(String[] args) {
new Thread(() -> System.out.println("多线程任务执行!")).start(); // 启动线程
}
}

这段代码和刚才的执行效果是完全一样的,可以在1.8或更高的编译级别下通过。从代码的语义中可以看出:我们启动了一个线程,而线程任务的内容以一种更加简洁的形式被指定。

不再有“不得不创建接口对象”的束缚,不再有“抽象方法覆盖重写”的负担,就是这么简单!

1.3 Lambda的格式

标准格式:

Lambda省去面向对象的条条框框,格式由3个部分组成:

  • 一些参数
  • 一个箭头
  • 一段代码

Lambda表达式的标准格式为:

1
rust复制代码(参数类型 参数名称) -> { 代码语句 }

格式说明:

  • 小括号内的语法与传统方法参数列表一致:无参数则留空;多个参数则用逗号分隔。
  • ->是新引入的语法格式,代表指向动作。
  • 大括号内的语法与传统方法体要求基本一致。

匿名内部类与lambda对比:

1
2
3
4
5
6
java复制代码new Thread(new Runnable() {
@Override
public void run() {
System.out.println("多线程任务执行!");
}
}).start();

仔细分析该代码中,Runnable接口只有一个run方法的定义:

  • public abstract void run();

即制定了一种做事情的方案(其实就是一个方法):

  • 无参数:不需要任何条件即可执行该方案。
  • 无返回值:该方案不产生任何结果。
  • 代码块(方法体):该方案的具体执行步骤。

同样的语义体现在Lambda语法中,要更加简单:

1
java复制代码() -> System.out.println("多线程任务执行!")
  • 前面的一对小括号即run方法的参数(无),代表不需要任何条件;
  • 中间的一个箭头代表将前面的参数传递给后面的代码;
  • 后面的输出语句即业务逻辑代码。

参数和返回值:

下面举例演示java.util.Comparator<T>接口的使用场景代码,其中的抽象方法定义为:

  • public abstract int compare(T o1, T o2);

当需要对一个对象数组进行排序时,Arrays.sort方法需要一个Comparator接口实例来指定排序的规则。假设有一个Person类,含有String name和int age两个成员变量:

1
2
3
4
5
6
java复制代码public class Person { 
private String name;
private int age;

// 省略构造器、toString方法与Getter Setter
}

传统写法

如果使用传统的代码对Person[]数组进行排序,写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class Demo05Comparator {
public static void main(String[] args) {
// 本来年龄乱序的对象数组
Person[] array = {
new Person("古力娜扎", 19),
new Person("迪丽热巴", 18),
new Person("马尔扎哈", 20)
};

// 匿名内部类
Comparator<Person> comp = new Comparator<Person>() {
@Override
public int compare(Person o1, Person o2) {
return o1.getAge() - o2.getAge();
}
};
Arrays.sort(array, comp); // 第二个参数为排序规则,即Comparator接口实例

for (Person person : array) {
System.out.println(person);
}
}
}

这种做法在面向对象的思想中,似乎也是“理所当然”的。其中Comparator接口的实例(使用了匿名内部类)代表了“按照年龄从小到大”的排序规则。

代码分析

下面我们来搞清楚上述代码真正要做什么事情。

  • 为了排序,Arrays.sort方法需要排序规则,即Comparator接口的实例,抽象方法compare是关键;
  • 为了指定compare的方法体,不得不需要Comparator接口的实现类;
  • 为了省去定义一个ComparatorImpl实现类的麻烦,不得不使用匿名内部类;
  • 必须覆盖重写抽象compare方法,所以方法名称、方法参数、方法返回值不得不再写一遍,且不能写错;
  • 实际上,只有参数和方法体才是关键。

Lambda写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class Demo06ComparatorLambda {
public static void main(String[] args) {
Person[] array = {
new Person("古力娜扎", 19),
new Person("迪丽热巴", 18),
new Person("马尔扎哈", 20) };

Arrays.sort(array, (Person a, Person b) -> {
return a.getAge() - b.getAge();
});

for (Person person : array) {
System.out.println(person);
}
}
}

省略格式:

省略规则

在Lambda标准格式的基础上,使用省略写法的规则为:

  1. 小括号内参数的类型可以省略;
  2. 如果小括号内有且仅有一个参,则小括号可以省略;
  3. 如果大括号内有且仅有一个语句,则无论是否有返回值,都可以省略大括号、return关键字及语句分号。

备注:掌握这些省略规则后,请对应地回顾本章开头的多线程案例。

可推导即可省略

Lambda强调的是“做什么”而不是“怎么做”,所以凡是可以推导得知的信息,都可以省略。例如上例还可以使用Lambda的省略写法:

1
2
3
4
java复制代码Runnable接口简化:
1. () -> System.out.println("多线程任务执行!")
Comparator接口简化:
2. Arrays.sort(array, (a, b) -> a.getAge() - b.getAge());

1.4 Lambda的前提条件

Lambda的语法非常简洁,完全没有面向对象复杂的束缚。但是使用时有几个问题需要特别注意:

  1. 使用Lambda必须具有接口,且要求接口中有且仅有一个抽象方法。
    无论是JDK内置的Runnable、Comparator接口还是自定义的接口,只有当接口中的抽象方法存在且唯一时,才可以使用Lambda。
  2. 使用Lambda必须具有接口作为方法参数。
    也就是方法的参数或局部变量类型必须为Lambda对应的接口类型,才能使用Lambda作为该接口的实例。

备注:有且仅有一个抽象方法的接口,称为“函数式接口”。

第二章 函数式接口

2.1 概述

函数式接口在Java中是指:有且仅有一个抽象方法的接口。

函数式接口,即适用于函数式编程场景的接口。而Java中的函数式编程体现就是Lambda,所以函数式接口就是可以适用于Lambda使用的接口。只有确保接口中有且仅有一个抽象方法,Java中的Lambda才能顺利地进行推导。

备注:从应用层面来讲,Java中的Lambda可以看做是匿名内部类的简化格式,但是二者在原理上不同。

格式

只要确保接口中有且仅有一个抽象方法即可:

1
2
3
4
java复制代码修饰符 interface 接口名称 {
public abstract 返回值类型 方法名称(可选参数信息);
// 其他非抽象方法内容
}

由于接口当中抽象方法的public abstract是可以省略的,所以定义一个函数式接口很简单:

1
2
3
java复制代码public interface MyFunctionalInterface {	
void myMethod();
}

FunctionalInterface注解

与@Override注解的作用类似,Java 8中专门为函数式接口引入了一个新的注解:@FunctionalInterface。该注解可用于一个接口的定义上:

1
2
3
4
java复制代码@FunctionalInterface
public interface MyFunctionalInterface {
void myMethod();
}

一旦使用该注解来定义接口,编译器将会强制检查该接口是否确实有且仅有一个抽象方法,否则将会报错。不过,即使不使用该注解,只要满足函数式接口的定义,这仍然是一个函数式接口,使用起来都一样。

2.2 常用函数式接口

JDK提供了大量常用的函数式接口以丰富Lambda的典型使用场景,它们主要在java.util.function包中被提供。前文的MySupplier接口就是在模拟一个函数式接口:java.util.function.Supplier<T>。其实还有很多,下面是最简单的几个接口及使用示例。

Supplier接口

java.util.function.Supplier<T>接口,它意味着”供给” , 对应的Lambda表达式需要“对外提供”一个符合泛型类型的对象数据。

抽象方法 : get

仅包含一个无参的方法:T get()。用来获取一个泛型参数指定类型的对象数据。

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class Demo08Supplier {
private static String getString(Supplier<String> function) {
return function.get();
}

public static void main(String[] args) {
String msgA = "Hello";
String msgB = "World";
System.out.println(getString(() -> msgA + msgB));
}
}

求数组元素最大值

使用Supplier接口作为方法参数类型,通过Lambda表达式求出int数组中的最大值。提示:接口的泛型请使用java.lang.Integer类。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class DemoIntArray {
public static void main(String[] args) {
int[] array = { 10, 20, 100, 30, 40, 50 };
printMax(() -> {
int max = array[0];
for (int i = 1; i < array.length; i++) {
if (array[i] > max) {
max = array[i];
}
}
return max;
});
}

private static void printMax(Supplier<Integer> supplier) {
int max = supplier.get();
System.out.println(max);
}
}

Consumer接口

java.util.function.Consumer<T>接口则正好相反,它不是生产一个数据,而是消费一个数据,其数据类型由泛型参数决定。

抽象方法:accept

Consumer接口中包含抽象方法void accept(T t),意为消费一个指定泛型的数据。基本使用如:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码import java.util.function.Consumer;

public class Demo09Consumer {
private static void consumeString(Consumer<String> function , String str) {
function.accept(str);
}

public static void main(String[] args) {
consumeString(s -> System.out.println(s), "后端跟我学");

}
}

Function接口

java.util.function.Function<T,R>接口用来根据一个类型的数据得到另一个类型的数据,前者称为前置条件,后者称为后置条件。有进有出,所以称为“函数Function”。

抽象方法:apply

Function接口中最主要的抽象方法为:R apply(T t),根据类型T的参数获取类型R的结果。使用的场景例如:将String类型转换为Integer类型。

1
2
3
4
5
6
7
8
9
10
java复制代码public class Demo11FunctionApply {
private static void method(Function<String, Integer> function, Str str) {
int num = function.apply(str);
System.out.println(num + 20);
}

public static void main(String[] args) {
method(s -> Integer.parseInt(s) , "10");
}
}

Predicate接口

有时候我们需要对某种类型的数据进行判断,从而得到一个boolean值结果。这时可以使用java.util.function.Predicate<T>接口。

抽象方法:test

Predicate接口中包含一个抽象方法:boolean test(T t)。用于条件判断的场景,条件判断的标准是传入的Lambda表达式逻辑,只要字符串长度大于5则认为很长。

1
2
3
4
5
6
7
8
9
10
java复制代码public class Demo15PredicateTest {
private static void method(Predicate<String> predicate,String str) {
boolean veryLong = predicate.test(str);
System.out.println("字符串很长吗:" + veryLong);
}

public static void main(String[] args) {
method(s -> s.length() > 5, "HelloWorld");
}
}

后续连载文章, 敬请观看:

  • 带你解惑大厂必会使用的 Stream流,方法引用
    观看更多文章,请移步至 如何学习Java技术生态(分布式微服务),霈哥有话说!

若对你和身边的朋友有帮助, 抓紧关注 IT霈哥 点赞! 点赞! 点赞! 评论!收藏! 分享给更多的朋友共同学习交流, 每天持续更新离不开你的支持!

欢迎关注我的B站,将来会发布文章同步视频

1
2
![](https://gitee.com/songjianzaina/juejin_p6/raw/master/img/cfdb24b01fa6729fd734bf363337e1e645254641a86cc83a6ac024e2af3f5fbe)
欢迎关注我的公众号,获取更多资料

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Android 用力过猛!为了组件化改造学习十几家大厂的

发表于 2020-11-17

请点赞关注,你的支持对我意义重大。

🔥 Hi,我是小彭。本文已收录到 GitHub · AndroidFamily 中。这里有 Android 进阶成长知识体系,有志同道合的朋友,关注公众号 [彭旭锐] 带你建立核心竞争力。


这篇文章是 组件化系列文章第 1 篇,相关 Android 工程化专栏完整文章列表:

一、Gradle 基础:

  • 1、Gradle 基础 :Wrapper、Groovy、生命周期、Project、Task、增量
  • 2、Gradle 插件:Plugin、Extension 扩展、NamedDomainObjectContainer、调试
  • 3、Gradle 依赖管理
  • 4、Maven 发布:SHAPSHOT 快照、uploadArchives、Nexus、AAR
  • 5、Gradle 插件案例:EasyPrivacy、so 文件适配 64 位架构、ABI

二、AGP 插件:

  • 1、AGP 构建过程
  • 2、AGP 常用配置项:Manifest、BuildConfig、buildTypes、壳工程、环境切换
  • 3、APG Transform:AOP、TransformTask、增量、字节码、Dex
  • 4、AGP 代码混淆:ProGuard、R8、Optimize、Keep、组件化
  • 5、APK 签名:认证、完整性、v1、v2、v3、Zip、Wallet
  • 6、AGP 案例:多渠道打包

三、组件化开发:

  • 1、方案积累:有赞、蘑菇街、得到、携程、支付宝、手淘、爱奇艺、微信、美团
  • 2、组件化架构基础
  • 3、ARouter 源码分析
  • 4、组件化案例:通用方案
  • 5、组件化案例:组件化事件总线框架
  • 6、组件化案例:组件化 Key-Value 框架

四、AOP 面向切面编程:

  • 1、AOP 基础
  • 2、Java 注解
  • 3、Java 注解处理器:APT、javac
  • 4、Java 动态代理:代理模式、Proxy、字节码
  • 5、Java ServiceLoader:服务发现、SPI、META-INF
  • 6、AspectJ 框架:Transform
  • 7、Javassist 框架
  • 8、ASM 框架
  • 9、AspectJ 案例:限制按钮点击抖动

五、相关计算机基础

  • 1、Base64 编码
  • 2、安全传输:加密、摘要、签名、CA 证书、防窃听、完整性、认证

前言

  • 组件化是 保持整个 App 可持续地进行高质量开发的基础,近年来也是业界一直在积极探索和实践的方向,在深入理解组件化架构的过程中,将不断考验你的技术深度与广度;
  • 实践中我还参考了十几家技术团队的解决方案(例如:美团、有赞、阿里等等),在这个系列里,我将总结我对于组件化的思考和实践。如果能帮上忙,请务必点赞加关注,这真的对我非常重要。

大厂解决方案参考

1 有赞微商城 IOS 端

  • 指出了业务模块化的基本演进:“混乱” -> “中介者模式”->“去中心化”
  • 梳理了模块间通信需求:UI 页面跳转、动作执行及复杂数据传输、一对多的通知广播;
  • 分享了“复杂数据传输”的处理办法:“复制粘贴代码” -> “下沉到common”

2. 有赞微商城 Android 端

  • 提出了 5 个调整方向:抽象基础模块、公共服务去中心化、业务模块服务化、抽象基础组件、单/多模块打包
  • 介绍了基于 3 个基础组件依赖和 1 个 Gradle 插件的落地方案

3. 蘑菇街 App

  • 提到了组件间通信的实现:URL 统跳、协议下沉(需要有一个公共的地方来容纳这些 public protocl)、隐式 Intent
  • 提到了组件生命周期管理:监听系统通知、ModuleManager 中手动遍历调用
  • 提到了组件版本管理与持续集成经验

4. 得到 App

  • 提到了组件生命周期、服务注册的实现
  • 提到了公共层定义组件服务、base层定义通用资源
  • 提到了 implementation 与 runtimeOnly 的代码 / 资源隔离效果;
  • 提到了 JIMU 插件的调试切换、智能配置功能;
  • 提到了 2 种调用组件声明周期的方法: javassist 和反射;
  • 提到了有序初始化组件的解决方案:StartUp、DAG

5. 携程 App

  • 提到了使用数据总线或者 URL 总线实现页面和功能跳转
  • 提到了定制 Gradle 插件来定制化打包

6. 支付宝 App

  • 介绍了基于 Bundle 和 mPaaS 打包插件的 Quinox 客户端框架,用于解决模块化和动态化两大问题;
  • 提到了解决资源 id 重复的解法:改造 aapt,为每个 Bundle 指定不同 packageId;

7. 手淘 App

  • 提到了手淘 All In (聚划算、天猫、彩票)方案的隐患:性能、复用、稳定;
  • 提到了容器框架 Atlas 的工作原理;

8. 爱奇艺 App

  • 提到了使用 Service 进行跨进程组件通信的缺点;
  • 介绍了组件化跨进程通信框架 Andromeda 的功能与核心原理;

9. 微信 App

  • 提到了微信架构演进:简单分层架构 -> 多进程架构 -> 多子工程并行开发架构;
  • 提到了基础工程存在中心化的问题:越来越多的代码/事件很“自然的”被下沉到基础工程;
  • 提出了重塑模块化的 3 个目标:改变通信方式、重新设计模块、约束代码边界
  • 提出了服务注册的通信方式
  • 提出了新颖的接口暴露方法:将文件后缀修改为.api(需配合include_with_api 命令)
  • 设计了模块生命周期:dependency()、configure()、execute()
  • 设计了 pins 工程结构,用于约束代码边界
  • 提出了组件化的考量问题:动态性、隔离性

10. 美团 App

  • 设计了组件消息总线 modular-event
  • 设计了组件化路由框架 WMRouter

参考资料

《Android组件化架构》 —— 苍王 著

有赞技术团队:

《有赞移动应用如何给页面安上“任意门”》 —— qiezi 著

《有赞微商城-Android组件化方案》 —— qiezi 著

《有赞移动 iOS 组件化(模块化)架构设计实践》 —— jackie 著

腾讯技术团队:

《微信Android模块化架构重构实践》 —— carlguo 著

阿里巴巴技术团队:

《你知道支付宝容器化架构是怎么搭建的吗?》 —— 奶盖

《Atlas-手淘组件化框架的前世今生和未来的路》 —— 手淘团队 著

美团技术团队:

《外卖客户端容器化架构的演进》 —— 郭赛 同同 徐宏 著

《Android组件化方案及组件消息总线modular-event实战》 —— 海亮 著

《Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus》 —— 海亮 著

《WMRouter:美团外卖Android开源路由框架》 —— 子健 渊博 云驰 著

《美团猫眼android模块化实战-可能是最详细的模块化实战》 —— happylion_heart 著

爱奇艺技术团队:

《Android组件化跨进程通信框架Andromeda解析》 —— wanderingguy 著

得到技术团队:

《Android彻底组件化demo发布》 —— 格竹子 著

《Android彻底组件化—代码和资源隔离》 —— 格竹子 著

《组件化:代码隔离也难不倒组件的按序初始化》 —— leobert-lan 著

《浅谈Android组件化》 —— 张明庆 著

携程技术团队:

《携程移动App架构优化之旅》 —— 陈浩然 著

蘑菇街技术团队:

《蘑菇街 App 的组件化之路》 —— limboy 著

《蘑菇街 App 的组件化之路·续》 —— limboy 著

其他:

《Android 组件化/模块化 的理解!》 —— 前行的乌龟 著

《Android组件化开发思想与实践》 —— popular_linda 著

《关于Android模块化我有一些话不知当讲不当讲》 —— 流水 著

《Android组件化方案》 —— 张华洋 著

《蘑菇街、滴滴、淘宝、微信的组件化架构解析》 —— 刘小杜 著

《Android业务组件化开发实践》 —— kymjs张涛 著

《Android实现模块 api 化》 —— Tyhj 著


我是小彭,带你构建 Android 知识体系。技术和职场问题,请关注公众号 [彭旭锐] 私信我提问。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

优雅快速的统计千万级别uv

发表于 2020-11-17

定义

PV是page view的缩写,即页面浏览量,通常是衡量一个网络新闻频道或网站甚至一条网络新闻的主要指标。网页浏览数是评价网站流量最常用的指标之一,简称为PV

UV是unique visitor的简写,是指通过互联网访问、浏览这个网页的自然人。

通过以上的概念,可以清晰的看出pv是比较好设计的,网站的每一次被访问,pv都会增加,但是uv就不一定会增加了,uv本质上记录的是按照某个标准划分的自然人,这个标准其实我们可以自己去定义,比如:可以定义同一个IP的访问者为同一个UV,这也是最常见的uv定义之一,另外还有根据cookie定义等等。无论是pv还是uv,都需要一个时间段来加以描述,平时我们所说的pv,uv数量指的都是24小时之内(一个自然日)的数据。

pv相比较uv来说,技术上比较容易一些,今天咱们就来说一说uv的统计,为什么说uv的统计相对来说比较难呢,因为uv涉及到同一个标准下的自然人的去重,尤其是一个uv千万级别的网站,设计一个好的uv统计系统也许并非想象的那么容易。

那我们就来设计一个以一个自然日为时间段的uv统计系统,一个自然人(uv)的定义为同一个来源IP(当然你也可以自定义其他标准),数据量级别假设为每日千万uv的量级。

注意:今天我们讨论的重点是获取到自然人定义的信息之后如何设计uv统计系统,并非是如何获取自然人的定义。uv系统的设计并非想象的那么简单,因为uv可能随着网站的营销策略会出现瞬间大流量,比如网站举办了一个秒杀活动。

基于DB方案

服务端编程有一句名言曰:没有一个表解决不了的功能,如果有那就两个表三个表。一个uv统计系统确实可以基于数据库来实现,而且也不复杂,uv统计的记录表可以类似如下(不要太纠结以下表设计是否合理):

字段 类型 描述
IP varchar(30) 客户端来源ip
DayID int 时间的简写,例如 20190629
其他字段 int 其他字段描述

当一个请求到达服务器,服务端每次需要查询一次数据库是否有当前IP和当前时间的访问记录,如果有,则说明是同一个uv,如果没有,则说明是新的uv记录,插入数据库。当然以上两步也可以写到一个sql语句中:

1
2
3
4
5
6
7
8
sql复制代码if exists( select 1 from table where ip='ip' and dayid=dayid )
  Begin
    return 0
  End
else
  Begin
    insert into table .......
  End

所有基于数据库的解决方案,在数据量大的情况下几乎都更容易出现瓶颈。面对每天千万级别的uv统计,基于数据库的这种方案也许并不是最优的。

优化方案

面对每一个系统的设计,我们都应该沉下心来思考具体的业务。至于uv统计这个业务有几个特点:

  1. 每次请求都需要判断是否已经存在相同的uv记录
  2. 持久化uv数据不能影响正常的业务
  3. uv数据的准确性可以忍受一定程度的误差
哈希表

基于数据库的方案中,在大数据量的情况下,性能的瓶颈引发原因之一就是:判断是否已经存在相同记录,所以要优化这个系统,肯定首先是要优化这个步骤。根据菜菜以前的文章,是否可以想到解决这个问题的数据结构,对,就是哈希表。哈希表根据key来查找value的时间复杂度为O(1)常数级别,可以完美的解决查找相同记录的操作瓶颈。
也许在uv数据量比较小的时候,哈希表也许是个不错的选择,但是面对千万级别的uv数据量,哈希表的哈希冲突和扩容,以及哈希表占用的内存也许并不是好的选择了。假设哈希表的每个key和value 占用10字节,1千万的uv数据大约占用 100M,对于现代计算机来说,100M其实不算大,但是有没有更好的方案呢?

优化哈希表

基于哈希表的方案,在千万级别数据量的情况下,只能算是勉强应付,如果是10亿的数据量呢?那有没有更好的办法搞定10亿级数据量的uv统计呢?这里抛开持久化数据,因为持久化设计到数据库的分表分库等优化策略了,咱们以后再谈。有没有更好的办法去快速判断在10亿级别的uv中某条记录是否存在呢?
为了尽量缩小使用的内存,我们可以这样设计,可以预先分配bit类型的数组,数组的大小是统计的最大数据量的一个倍数,这个倍数可以自定义调整。现在假设系统的uv最大数据量为1千万,系统可以预先分配一个长度为5千万的bit数组,bit占用的内存最小,只占用一位。按照一个哈希冲突比较小的哈希函数计算每一个数据的哈希值,并设置bit数组相应哈希值位置的值为1。由于哈希函数都有冲突,有可能不同的数据会出现相同的哈希值,出现误判,但是我们可以用多个不同的哈希函数来计算同一个数据,来产生不同的哈希值,同时把这多个哈希值的数组位置都设置为1,从而大大减少了误判率,刚才新建的数组为最大数据量的一个倍数也是为了减小冲突的一种方式(容量越大,冲突越小)。当一个1千万的uv数据量级,5千万的bit数组占用内存才几十M而已,比哈希表要小很多,在10亿级别下内存占用差距将会更大。

以下为代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
ini复制代码 class BloomFilter
{
BitArray container = null;
public BloomFilter(int length)
{
container = new BitArray(length);
}

public void Set(string key)
{
var h1 = Hash1(key);
var h2 = Hash2(key);
var h3 = Hash3(key);
var h4 = Hash4(key);
container[h1] = true;
container[h2] = true;
container[h3] = true;
container[h4] = true;

}
public bool Get(string key)
{
var h1 = Hash1(key);
var h2 = Hash2(key);
var h3 = Hash3(key);
var h4 = Hash4(key);

return container[h1] && container[h2] && container[h3] && container[h4];
}

//模拟哈希函数1
int Hash1(string key)
{
int hash = 5381;
int i;
int count;
char[] bitarray = key.ToCharArray();
count = bitarray.Length;
while (count > 0)
{
hash += (hash << 5) + (bitarray[bitarray.Length - count]);
count--;
}
return (hash & 0x7FFFFFFF) % container.Length;

}
int Hash2(string key)
{
int seed = 131; // 31 131 1313 13131 131313 etc..
int hash = 0;
int count;
char[] bitarray = (key+"key2").ToCharArray();
count = bitarray.Length;
while (count > 0)
{
hash = hash * seed + (bitarray[bitarray.Length - count]);
count--;
}

return (hash & 0x7FFFFFFF)% container.Length;
}
int Hash3(string key)
{
int hash = 0;
int i;
int count;
char[] bitarray = (key + "keykey3").ToCharArray();
count = bitarray.Length;
for (i = 0; i < count; i++)
{
if ((i & 1) == 0)
{
hash ^= ((hash << 7) ^ (bitarray[i]) ^ (hash >> 3));
}
else
{
hash ^= (~((hash << 11) ^ (bitarray[i]) ^ (hash >> 5)));
}
count--;
}

return (hash & 0x7FFFFFFF) % container.Length;

}
int Hash4(string key)
{
int hash = 5381;
int i;
int count;
char[] bitarray = (key + "keykeyke4").ToCharArray();
count = bitarray.Length;
while (count > 0)
{
hash += (hash << 5) + (bitarray[bitarray.Length - count]);
count--;
}
return (hash & 0x7FFFFFFF) % container.Length;
}
}

测试程序为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ini复制代码 BloomFilter bf = new BloomFilter(200000000);
int exsitNumber = 0;
int noExsitNumber = 0;

for (int i=0;i < 10000000; i++)
{
string key = $"ip_{i}";
var isExsit= bf.Get(key);
if (isExsit)
{
exsitNumber += 1;
}
else
{
bf.Set(key);
noExsitNumber += 1;
}
}
Console.WriteLine($"判断存在的数据量:{exsitNumber}");
Console.WriteLine($"判断不存在的数据量:{noExsitNumber}");

测试结果:

1
2
复制代码判断存在的数据量:7017
判断不存在的数据量:9992983

image

占用内存40M,误判率不到 千分之1,在这个业务场景下在可接受范围之内。在真正的业务当中,系统并不会在启动之初就分配这么大的bit数组,而是随着冲突增多慢慢扩容到一定容量的。

异步优化

当判断一个数据是否已经存在这个过程解决之后,下一个步骤就是把数据持久化到DB,如果数据量较大或者瞬间数据量较大,可以考虑使用mq或者读写IO比较大的NOSql来代替直接插入关系型数据库。

image

思路一转,整个的uv流程其实也都可以异步化,而且也推荐这么做。
image

更多精彩文章

  • 分布式大并发系列
  • 架构设计系列
  • 趣学算法和数据结构系列
  • 设计模式系列

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SQL 查找"存在",别再用 count 了,很耗费时间的!

发表于 2020-11-17

根据某一条件从数据库表中查询 『有』与『没有』,只有两种状态,那为什么在写SQL的时候,还要SELECT count(*) 呢?

无论是刚入道的程序员新星,还是精湛沙场多年的程序员老白,都是一如既往的count

目前多数人的写法

多次REVIEW代码时,发现如现现象:

业务代码中,需要根据一个或多个条件,查询是否存在记录,不关心有多少条记录。普遍的SQL及代码写法如下

SQL写法:

SELECT count(*) FROM table WHERE a = 1 AND b = 2

Java写法:

int nums = xxDao.countXxxxByXxx(params); if ( nums > 0 ) { //当存在时,执行这里的代码 } else { //当不存在时,执行这里的代码 }

是不是感觉很OK,没有什么问题

优化方案

推荐写法如下:

SQL写法:

SELECT 1 FROM table WHERE a = 1 AND b = 2 LIMIT 1

Java写法:

Integer exist = xxDao.existXxxxByXxx(params); if ( exist != NULL ) { //当存在时,执行这里的代码 } else { //当不存在时,执行这里的代码 }

SQL不再使用count,而是改用LIMIT 1,让数据库查询时遇到一条就返回,不要再继续查找还有多少条了

业务代码中直接判断是否非空即可

总结

根据查询条件查出来的条数越多,性能提升的越明显,在某些情况下,还可以减少联合索引的创建

Java 的知识面非常广,面试问的涉及也非常广泛,重点包括:Java 基础、Java 并发,JVM、MySQL、数据结构、算法、Spring、微服务、MQ 等等,涉及的知识点何其庞大,所以我们在复习的时候也往往无从下手,今天小编给大家带来一套 Java 面试题,题库非常全面,包括 Java 基础、Java 集合、JVM、Java 并发、Spring全家桶、Redis、MySQL、Dubbo、Netty、MQ 等等,包含 Java 后端知识点 2000 + ,部分如下:

资料获取方式:关注公众号:“程序员白楠楠”获取上述资料

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

缓冲池 (buffer pool),这次彻底懂了!!!

发表于 2020-11-17

应用系统分层架构,为了加速数据访问,会把最常访问的数据,放在缓存 (cache) 里,避免每次都去访问数据库。

操作系统,会有缓冲池 (buffer pool) 机制,避免每次访问磁盘,以加速数据的访问。

MySQL 作为一个存储系统,同样具有缓冲池 (buffer pool) 机制,以避免每次查询数据都进行磁盘 IO。

今天,和大家聊一聊 InnoDB 的缓冲池。

InnoDB 的缓冲池缓存什么?有什么用?

缓存表数据与索引数据,把磁盘上的数据加载到缓冲池,避免每次访问都进行磁盘 IO,起到加速访问的作用。

速度快,那为啥不把所有数据都放到缓冲池里?

凡事都具备两面性,抛开数据易失性不说,访问快速的反面是存储容量小:

(1)缓存访问快,但容量小,数据库存储了 200G 数据,缓存容量可能只有 64G;

(2)内存访问快,但容量小,买一台笔记本磁盘有 2T,内存可能只有 16G;

因此,只能把 “最热” 的数据放到 “最近” 的地方,以 “最大限度” 的降低磁盘访问。

如何管理与淘汰缓冲池,使得性能最大化呢?

在介绍具体细节之前,先介绍下 “预读” 的概念。

什么是预读?

磁盘读写,并不是按需读取,而是按页读取,一次至少读一页数据(一般是 4K),如果未来要读取的数据就在页中,就能够省去后续的磁盘 IO,提高效率。

预读为什么有效?

数据访问,通常都遵循 “集中读写” 的原则,使用一些数据,大概率会使用附近的数据,这就是所谓的“局部性原理”,它表明提前加载是有效的,确实能够减少磁盘 IO。

按页 (4K) 读取,和 InnoDB 的缓冲池设计有啥关系?

(1)磁盘访问按页读取能够提高性能,所以缓冲池一般也是按页缓存数据;

(2)预读机制启示了我们,能把一些 “可能要访问” 的页提前加入缓冲池,避免未来的磁盘 IO 操作;

InnoDB 是以什么算法,来管理这些缓冲页呢?

最容易想到的,就是 LRU(Least recently used)。

画外音:memcache,OS 都会用 LRU 来进行页置换管理,但 MySQL 的玩法并不一样。

传统的 LRU 是如何进行缓冲页管理?

最常见的玩法是,把入缓冲池的页放到 LRU 的头部,作为最近访问的元素,从而最晚被淘汰。这里又分两种情况:

(1)页已经在缓冲池里,那就只做 “移至”LRU 头部的动作,而没有页被淘汰;

(2)页不在缓冲池里,除了做 “放入”LRU 头部的动作,还要做 “淘汰”LRU 尾部页的动作;

如上图,假如管理缓冲池的 LRU 长度为 10,缓冲了页号为 1,3,5…,40,7 的页。

假如,接下来要访问的数据在页号为 4 的页中:

(1)页号为 4 的页,本来就在缓冲池里;

(2)把页号为 4 的页,放到 LRU 的头部即可,没有页被淘汰;

画外音:为了减少数据移动,LRU 一般用链表实现。

假如,再接下来要访问的数据在页号为 50 的页中:

(1)页号为 50 的页,原来不在缓冲池里;

(2)把页号为 50 的页,放到 LRU 头部,同时淘汰尾部页号为 7 的页;

传统的 LRU 缓冲池算法十分直观,OS,memcache 等很多软件都在用,MySQL 为啥这么矫情,不能直接用呢?

这里有两个问题:

(1)预读失效;

(2)缓冲池污染;

什么是预读失效?

由于预读 (Read-Ahead),提前把页放入了缓冲池,但最终 MySQL 并没有从页中读取数据,称为预读失效。

如何对预读失效进行优化?

要优化预读失效,思路是:

(1)让预读失败的页,停留在缓冲池 LRU 里的时间尽可能短;

(2)让真正被读取的页,才挪到缓冲池 LRU 的头部;

以保证,真正被读取的热数据留在缓冲池里的时间尽可能长。

具体方法是:

(1)将 LRU 分为两个部分:

  • 新生代 (new sublist)
  • 老生代 (old sublist)

(2)新老生代收尾相连,即:新生代的尾 (tail) 连接着老生代的头 (head);

(3)新页(例如被预读的页)加入缓冲池时,只加入到老生代头部:

  • 如果数据真正被读取(预读成功),才会加入到新生代的头部
  • 如果数据没有被读取,则会比新生代里的 “热数据页” 更早被淘汰出缓冲池

举个例子,整个缓冲池 LRU 如上图:

(1)整个 LRU 长度是 10;

(2)前 70% 是新生代;

(3)后 30% 是老生代;

(4)新老生代首尾相连;

假如有一个页号为 50 的新页被预读加入缓冲池:

(1)50 只会从老生代头部插入,老生代尾部(也是整体尾部)的页会被淘汰掉;

(2)假设 50 这一页不会被真正读取,即预读失败,它将比新生代的数据更早淘汰出缓冲池;

假如 50 这一页立刻被读取到,例如 SQL 访问了页内的行 row 数据:

(1)它会被立刻加入到新生代的头部;

(2)新生代的页会被挤到老生代,此时并不会有页面被真正淘汰;

改进版缓冲池 LRU 能够很好的解决 “预读失败” 的问题。

画外音:但也不要因噎废食,因为害怕预读失败而取消预读策略,大部分情况下,局部性原理是成立的,预读是有效的。

新老生代改进版 LRU 仍然解决不了缓冲池污染的问题。

什么是 MySQL 缓冲池污染?

当某一个 SQL 语句,要批量扫描大量数据时,可能导致把缓冲池的所有页都替换出去,导致大量热数据被换出,MySQL 性能急剧下降,这种情况叫缓冲池污染。

例如,有一个数据量较大的用户表,当执行:

select * from user where name like “%shenjian%”;

虽然结果集可能只有少量数据,但这类 like 不能命中索引,必须全表扫描,就需要访问大量的页:

(1)把页加到缓冲池(插入老生代头部);

(2)从页里读出相关的 row(插入新生代头部);

(3)row 里的 name 字段和字符串 shenjian 进行比较,如果符合条件,加入到结果集中;

(4)… 直到扫描完所有页中的所有 row…

如此一来,所有的数据页都会被加载到新生代的头部,但只会访问一次,真正的热数据被大量换出。

怎么这类扫码大量数据导致的缓冲池污染问题呢?

MySQL 缓冲池加入了一个 “老生代停留时间窗口” 的机制:

(1)假设 T = 老生代停留时间窗口;

(2)插入老生代头部的页,即使立刻被访问,并不会立刻放入新生代头部;

(3)只有满足 “被访问” 并且 “在老生代停留时间” 大于 T,才会被放入新生代头部;

继续举例,假如批量数据扫描,有 51,52,53,54,55 等五个页面将要依次被访问。

如果没有 “老生代停留时间窗口” 的策略,这些批量被访问的页面,会换出大量热数据。

加入 “老生代停留时间窗口” 策略后,短时间内被大量加载的页,并不会立刻插入新生代头部,而是优先淘汰那些,短期内仅仅访问了一次的页。

而只有在老生代呆的时间足够久,停留时间大于 T,才会被插入新生代头部。

上述原理,对应 InnoDB 里哪些参数?

有三个比较重要的参数。

参数:innodb_buffer_pool_size

介绍:配置缓冲池的大小,在内存允许的情况下,DBA 往往会建议调大这个参数,越多数据和索引放到内存里,数据库的性能会越好。

参数:innodb_old_blocks_pct

介绍:老生代占整个 LRU 链长度的比例,默认是 37,即整个 LRU 中新生代与老生代长度比例是 63:37。

画外音:如果把这个参数设为 100,就退化为普通 LRU 了。

参数:innodb_old_blocks_time

介绍:老生代停留时间窗口,单位是毫秒,默认是 1000,即同时满足 “被访问” 与“在老生代停留时间超过 1 秒”两个条件,才会被插入到新生代头部。

总结

(1)缓冲池 (buffer pool) 是一种常见的降低磁盘访问的机制;

(2)缓冲池通常以页 (page) 为单位缓存数据;

(3)缓冲池的常见管理算法是 LRU,memcache,OS,InnoDB 都使用了这种算法;

(4)InnoDB 对普通 LRU 进行了优化:

  • 将缓冲池分为老生代和新生代,入缓冲池的页,优先进入老生代,页被访问,才进入新生代,以解决预读失效的问题
  • 页被访问,且在老生代停留时间超过配置阈值的,才进入新生代,以解决批量数据访问,大量热数据淘汰的问题

思路,比结论重要。

解决了什么问题,比方案重要。

**架构师之路 - 分享技术思路**

相关推荐:

《写一个 cache,要掌握哪些技术点》

《6 条 shell 小技巧,让脚本更专业 | 1 分钟系列》

《MyISAM 与 InnoDB 的索引差异 | 1 分钟系列》

《两个小工具,分析 MySQL 死锁》

调研:缓冲池是缓存的差别是啥?

画外音:长文阅读和转发低,为啥?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Swagger界面丑、功能弱怎么破?用knife4j增强下就

发表于 2020-11-17

在做CRMEB-JAVA开源商城系统时,我们团队用到了uni-app,也是时下比较流行的移动端开发技术,这里边就牵扯到了前后端全部分离的问题,一般在使用java开发前后端分离项目的时候,都会用到Swagger,Swagger 是一个用于生成、描述、调用和可视化 RESTful 风格的 Web api 服务的框架,Swagger 让部署管理和使用功能强大的 API变的简单,但这仅仅是在小型项目里,API接口不多的情况下。

随着,CRMEB-JAVA开源商城系统的不断优化,功能的不断增加,并且前后台都做到了前后端分离,接口数量一度超过了200多个,这就使得系统中一些使用体验变的越来越差,例如:
提交参数为JSON没法格式化,参数错了查找麻烦,返回结果没法折叠,太长了没法看,真是非常的痛苦呀!

但是,作为程序员一定要知道,你遇到的问题别人也一定遇到过,也肯定有热心的大牛解决了这个问题,所以,一个叫knife4j的增强版本Swagger诞生了,他帮我们很好的解决了以上问题,引用一下原作者的话来简单了解下knife4j:

希望knife4j像一把匕首一样小巧,轻量,并且功能强悍,也希望把她做成一个为Swagger接口文档服务的通用性解决方案,而不仅仅只是专注于前端Ui前端.

先上一张项目刚上的knife4j优化过的Swagger的剧照,先睹为快:

看上图是不是一目了然,最重要的是搜索框可以搜索相关api接口快速查询,并且可以在线调试接口,再来张图展示!

knife4j开源项目说明:

结构说明如下:

模块名称 说明
knife4j 为Java MVC框架集成Swagger的增强解决方案,明细请参考上面的介绍说明
knife4j-admin 云端Swagger接口文档注册管理中心,集成gateway网关对任意微服务文档进行组合集成,该模块尚未启动开发
knife4j-extension chrome浏览器的增强swagger接口文档ui,快速渲染swagger资源,该模块尚未启动开发
knife4j-service 为swagger服务的一系列接口服务程序 ,该模块尚未启动开发
knife4j-front knife4j-spring-ui的纯前端静态版本,用于集成非Java语言使用,该模块尚未启动开发
knife4j-vue 该模块是knife4j前端的源码模块,基于Vue框架开发,knife4j-spring-ui中的文件是使用该模块进行打包构建的
swagger-bootstrap-ui knife4j的前身,最后发布版本是1.9.6
swagger-bootstrap-ui-front swagger-bootstrap-ui的纯前端版本,基于1.9.3的分支改造而成,其他开发语言体系可以使用,改造后一直未更新

Spring Boot 项目使用knife4j

  • 官方给的DOM:knife4j-spring-boot-demo
  • 研究CRMEB_JAVA开源商城系统,学习看在实际开发当中,如何集成进项目!

maven引用

第一步: 是在项目的pom.xml文件中引入knife4j的依赖,如下:

1
2
3
4
5
6
7
8
java复制代码<dependencies>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-spring-boot-starter</artifactId>
<!--在引用时请在maven中央仓库搜索最新版本号-->
<version>2.0.2</version>
</dependency>
</dependencies>

如果你想使用bom的方式引入,请参考Maven Bom方式引用

创建Swagger配置文件

新建Swagger的配置文件SwaggerConfiguration.java文件,创建springfox提供的Docket分组对象,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
less复制代码@Configuration
@EnableSwagger2
@EnableKnife4j
@Import(BeanValidatorPluginsConfiguration.class)
public class SwaggerConfiguration {


@Bean(value = "defaultApi2")
public Docket defaultApi2() {
Docket docket=new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
//分组名称
.groupName("2.X版本")
.select()
//这里指定Controller扫描包路径
.apis(RequestHandlerSelectors.basePackage("com.swagger.bootstrap.ui.demo.new2"))
.paths(PathSelectors.any())
.build();
return docket;
}

}

以上有两个注解需要特别说明,如下表:

注解 说明
@EnableSwagger2 该注解是Springfox-swagger框架提供的使用Swagger注解,该注解必须加
@EnableKnife4j 该注解是knife4j提供的增强注解,Ui提供了例如动态参数、参数过滤、接口排序等增强功能,如果你想使用这些增强功能就必须加该注解,否则可以不用加

访问

  • 在浏览器输入地址:http://host:port/doc.html
  • 点击进入CRMEB-JAVA开源商城项目学习查看:Gitee项目地址

感谢您的阅读,如果对您有帮助,欢迎关注CRMEB掘金号。码云上有我们开源的商城项目,知识付费项目,均是基于PHP+vue+mysql开发,学习研究欢迎使用,大家可以动动发财的小手点点Start哦,关注我不迷路!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

深坑啊!同一个Spring AOP的坑,我一天踩了两次!

发表于 2020-11-17

GitHub 18k Star 的Java工程师成神之路,不来了解一下吗!

GitHub 18k Star 的Java工程师成神之路,真的不来了解一下吗!

GitHub 18k Star 的Java工程师成神之路,真的真的不来了解一下吗!

前几天,我刚刚发布过一篇文章《自定义注解!绝对是程序员装逼的利器!!》,介绍过如何使用Spring AOP + 自定义注解来提升代码的优雅性。

很多读者看完之后表示用起来很爽,但是后台也有人留言说自己配置了Spring的AOP之后,发现切面不生效。

其实,这个问题我在用的过程中也遇到过,而且还是同一个问题一天之内遇到了两次。

说明这个问题很容易被忽略,并且这个问题带来的后果可能是极其严重的。那么,我们就来简单回顾一下问题是怎么样的。

问题重现

最初我定义了一个注解,希望可以方便统一的对一些数据库操作做缓存。于是就有了以下代码:

首先,定义一个注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
less复制代码@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Cacheable {

/**
* 策略名称,需要保证唯一
*
* @return
*/
public String keyName();

/**
* 超时时长,单位:秒
*
* @return
*/
public int expireTime();

}

然后自定义一个切面,对所有使用了该注解的方法进行切面处理:

1
2
3
4
5
6
7
8
9
less复制代码@Aspect
@Component
public class CacheableAspect {
private static final Logger LOGGER = LoggerFactory.getLogger(FacadeAspect.class);
@Around("@annotation(com.hollis.cache.Cacheable)")
public Object cache(ProceedingJoinPoint pjp) throws Throwable {
// 先查缓存,如果缓存中有值,直接返回。如果缓存中没有,先执行方法,再将返回值存储到缓存中。
}
}

然后就可以使用该注解了,使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typescript复制代码@Component
public class StrategyService extends BaseStrategyService {

public PricingResponse getFactor(Map<String, String> pricingParams) {
// 做一些参数校验,以及异常捕获相关的事情
return this.loadFactor(tieredPricingParams);
}

@Override
@Cacheable(keyName = "key0001", expireTime = 60 * 60 * 2)
private PricingResponse loadFactor(Map<String, String> pricingParams) {
//代码执行
}
}

以上,对loadFactor方法增加了切面,为了方便使用,我们还定义了一个getFactor方法,设置为public,方便外部调用。

但是,在调试过程中,我发现我们设置在loadFactor方法上面的切面并没有成功,无法执行切面类。

于是开始排查问题具体是什么。

问题排查

为了排查这个问题,首先是把所有的代码检查一遍,看看切面的代码是不是有问题,有没有可能有手误打错了字之类的。

但是发现都没有。于是就想办法找找问题。

接下来我把loadFactor的访问权限从private改成public,发现没有效果。

然后我尝试着在方法外直接调用loadFactor而不是getFactor。

发现这样做就可以成功的执行到切面里面了。

发现这一现象的时候,我突然恍然大悟,直捶大腿。原来如此,原来如此,就应该是这样的。

我突然就想到了问题的原因。其实原因挺简单的,也是我之前了解到过的原理,但是在问题刚刚发生的时候我并没有想到这里,而是通过debug,发现这个现象之后我才突然想到这个原理。

那么,就来说说为什么会发生这样的问题。

代理的调用方式

我们发现上面的问题关键在于loadFactor方法被调用的方式不同。我们知道,方法的调用通常有以下几种方式:

1、在类内部,通过this进行自调用:

1
2
3
4
5
6
7
8
9
10
11
typescript复制代码public class SimplePojo implements Pojo {

public void foo() {
// this next method invocation is a direct call on the 'this' reference
this.bar();
}

public void bar() {
// some logic...
}
}

2、在类外部,通过该类的对象进行调用

1
2
3
4
5
6
7
8
typescript复制代码public class Main {

public static void main(String[] args) {
Pojo pojo = new SimplePojo();
// this is a direct method call on the 'pojo' reference
pojo.foo();
}
}

类关系及调用过程中如下图:

-w485

如果是静态方法,也可以通过类直接调用。

3、在类外部,通过该类的代理对象进行调用:

1
2
3
4
5
6
7
8
9
10
11
12
dart复制代码public class Main {

public static void main(String[] args) {
ProxyFactory factory = new ProxyFactory(new SimplePojo());
factory.addInterface(Pojo.class);
factory.addAdvice(new RetryAdvice());

Pojo pojo = (Pojo) factory.getProxy();
// this is a method call on the proxy!
pojo.foo();
}
}

类关系及调用过程中如下图:

-w613

那么,Spring的AOP其实是第三种调用方式,就是通过代理对象调用,只有这种调用方式,才能够在真正的对象的执行前后,能够让代理对象也执行相关代码,才能起到切面的作用。

而对于使用this的方式调用,这种只是自调用,并不会使用代理对象进行调用,也就无法执行切面类。

问题解决

那么,我们知道了,想要真正的执行代理,那么就需要通过代理对象进行调用而不是使用this调用的方式。

那么,这个问题的解决办法也就是想办法通过代理对象来调用目标方法即可。

这种问题的解决网上有很多种办法,这里介绍一个相对简单的。其他的更多的办法大家可以在网上找到一些案例。搜索关键词”AOP 自调用”即可。

获取代理对象进行调用

我们需要修改一下前面的StrategyService的代码,修改成以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typescript复制代码@Component
public class StrategyService{

public PricingResponse getFactor(Map<String, String> pricingParams) {
// 做一些参数校验,以及异常捕获相关的事情
// 这里不使用this.loadFactor而是使用AopContext.currentProxy()调用,目的是解决AOP代理不支持方法自调用的问题
if (AopContext.currentProxy() instanceof StrategyService) {
return ((StrategyService)AopContext.currentProxy()).loadFactor(tieredPricingParams);
} else {
// 部分实现没有被代理过,则直接进行自调用即可
return loadFactor(tieredPricingParams);
}
}

@Override
@StrategyCache(keyName = "key0001", expireTime = 60 * 60 * 2)
private PricingResponse loadFactor(Map<String, String> oricingParams) {
//代码执行
}
}

即使用AopContext.currentProxy()获取到代理对象,然后通过代理对象调用对应的方法。

还有个地方需要注意,以上方式还需要将Aspect的expose-proxy设置成true。如果是配置文件修改:

1
ini复制代码 <aop:aspectj-autoproxy proxy-target-class="true" expose-proxy="true"/>

如果是SpringBoot,则修改应用启动入口类的注解:

1
2
3
4
kotlin复制代码@EnableAspectJAutoProxy(exposeProxy = true)
public class Application {

}

总结

以上,我们分析并解决了一个Spring AOP不支持方法自调用的问题。

AOP失败这个问题,其实还是很严重的,因为如果发生非预期的失效,那么直接问题就是没有执行切面方法,更严重的后果可能是诸如事务未生效、日志未打印、缓存未查询等各种问题。

所以,还是建议大家看完此文之后,统查一下自己的代码,是否存在方法自调用的情况。这种情况下,任何切面都是无法生效的!

关于作者:Hollis,一个对Coding有着独特追求的人,阿里巴巴技术专家,《程序员的三门课》联合作者,《Java工程师成神之路》系列文章作者。

如果您有任何意见、建议,或者想与作者交流,都可以关注公众号【Hollis】,直接后台给我留言。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Java劝退师】Redis 知识脑图 - 分布式缓存 Re

发表于 2020-11-17

Redis

Redis

分布式缓存

一、使用场景

1. 数据库缓存

缓存 - 原始数据的复制级,用于快速访问

将访问过的内容进行缓存,再次访问先找缓存,缓存命中返回数据,不命中找数据库,回填缓存

2. 提高系统响应

Redis数据是放在内存中,数据库数据大多放在硬盘中,内存的访问速度远大于硬盘,相较于数据库可以瞬间处理大量读/写请求

3. 做 Session 分离

当架构使用 Tomcat 集群,并且使用 Tomcat 做 Session 复制,将产生 (1) 复制时性能损耗 (2) 无法保证实时同步 问题,因此可以采取将 Session 统一放在 Redis 中,这样多个 Tomcat 就可以共享 Session 消息

4. 乐观锁

使用 Redis 的 watch + incr 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码jedis1.watch(redisKey);
String redisValue = jedis1.get(redisKey);
int valInteger = Integer.valueOf(redisValue);
String userInfo = UUID.randomUUID().toString();

// 没有秒完
if (valInteger < 20) {
Transaction tx = jedis1.multi();
tx.incr(redisKey);
List list = tx.exec();
// 秒成功 失败返回空list而不是空
if (list != null && list.size() > 0) {
System.out.println("用户:" + userInfo + ",秒杀成功!当前成功人数:" + (valInteger + 1));
}
// 版本变化,被别人抢了
else {
System.out.println("用户:" + userInfo + ",秒杀失败");
}
}
// 秒完了
else {
System.out.println("已经有20人秒杀成功,秒杀结束");
}

5. 分布式锁 (悲观锁)

若需要控制多个进程( JVM ) 并发的时序性(串型化),可以采用 Redis 的 setnx 实现

当 value 不存在时则赋值,属于原子操作

1
2
3
4
5
6
shell复制代码127.0.0.1:6379> setnx name zhangf # 如果name不存在赋值
(integer) 1
127.0.0.1:6379> setnx name zhaoyun # 再次赋值失败
(integer) 0
127.0.0.1:6379> get name
"zhangf"
1
2
3
4
5
6
7
8
9
10
shell复制代码127.0.0.1:6379> set age 18 NX PX 10000 # 如果不存在赋值 有效期10秒
OK
127.0.0.1:6379> set age 20 NX # 赋值失败
(nil)
127.0.0.1:6379> get age # age失效
(nil)
127.0.0.1:6379> set age 30 NX PX 10000 # 赋值成功
OK
127.0.0.1:6379> get age
"30"

自己实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码/**
* 使用redis的set命令实现获取分布式锁
* @param lockKey 可以就是锁
* @param requestId 请求ID,保证同一性 uuid+threadID
* @param expireTime 过期时间,避免死锁
* @return
*/
public boolean getLock(String lockKey,String requestId,int expireTime) {
// NX: 保证互斥性
// hset 原子性操作 只要lockKey有效 则说明有进程在使用分布式锁
String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime);
if("OK".equals(result)) {
return true;
}
return false;
}
1
2
3
4
5
6
7
8
java复制代码public static boolean releaseLock(String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey),Collections.singletonList(requestId));
if (result.equals(1L)) {
return true;
}
return false;
}

使用 Redission 实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public class DistributedRedisLock {

//从配置类中获取redisson对象
private static Redisson redisson = RedissonManager.getRedisson();
private static final String LOCK_TITLE = "redisLock_";

//加锁
public static boolean acquire(String lockName){
//声明key对象
String key = LOCK_TITLE + lockName;
//获取锁对象
RLock mylock = redisson.getLock(key);
//加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId
mylock.lock(2,3,TimeUtil.SECOND);
//加锁成功
return true;
}

//锁的释放
public static void release(String lockName){
//必须是和加锁时的同一个key
String key = LOCK_TITLE + lockName;
//获取所对象
RLock mylock = redisson.getLock(key);
//释放锁(解锁)
mylock.unlock();
}

}

6. MyBatis 的二级缓存

二、锁的解释

  1. 悲观锁 - 我认为会出问题,所以先上锁 - 性能差
* synchronized
* 数据库中的行锁、表锁
  1. 乐观锁 - 谁都可以来,但是我成功了,你就成功不了 - 性能高
* 秒杀场景

三、缓存的读写模式

1. Cache Aside Pattern - Redis

  • 读: 先读缓存,缓存没有读数据库,取出数据放数缓存,并返回响应
  • 更: 先更新数据库,再删除缓存

不更新缓存的原因 :

  • 缓存如果是一个复杂结构(hash、list),会需要遍历,增加复杂度
  • 只更新不删除有机率出现脏读情况

2. Read/Write Through Pattern - Guava Cache

应用进程只操作缓存,缓存操作数据库

3. Write Behind Caching Pattern - EVCache

应用进程只更新缓存,缓存”异步”批量更新到数据库 - 不能实时同步,甚至可能会丢数据

四、 数据类型

1. String 字符串

  1. 字符串
  2. 数字
  3. 浮点数
  4. 乐观锁 - watch + incr
  5. 分布式锁 - setnx

2. List 列表

存储有序、可重复元素,获取头部、尾部纪录极快

  1. 栈、队列
  2. 用户列表、商品列表、评论列表

3. Set 集合

无序、唯一元素

  1. 关注的用户
  2. 随机抽奖 ( spop命令 )

4. SortedSet 有序集合

元素唯一,可按分数排序

底层实现: 跳跃表

  1. 点击排行榜
  2. 销量排行榜
  3. 关注排行榜

5. Hash 散列表

String 类型的 field、value 映射表

  1. 对象
  2. 表数据映射

6. Bitmap 位图

value 只能是 0 或 1

  1. 用户签到
  2. 统计活跃用户
  3. 查找用户在线状态

7. Geo 地理位置

使用Z阶曲线、Base32编码、GeoHash算法,保存经纬度

  1. 纪录地理位置
  2. 计算距离
  3. 附近的人

8. Stream 数据流

持久化消息队列

五、过期、淘汰策略

1. maxmemory

  1. 【默认】禁止驱逐 - 可作为DB使用 - 数据太多可能导致崩溃
  2. 【推荐】设置为物理内存的 3/4

如果设置了 maxmemory 则 maxmemory-policy 要配置

2. maxmemory-policy (删除策略)

  1. 定时删除
* 使用定时器删除过期时间的 Key - 不推荐
  1. 惰性删除
* 访问 Key 发现已过期,则删除
  1. 主动删除 - 随机挑选键值对,使用遍历太耗时
* no-enviction - 不删除 (默认)
* allkeys-lru - 使用时间最远 (看使用的时间) - 通常采用
* volatile-lru - 从已设置过期时间的数据,挑选使用时间最远
* allkeys-lfu - 最近最少使用 (看使用次数)
* volatile-lfu - 从已设置过期时间的数据,挑选最近最少使用
* allkeys-random - 随机 - 希望请求压力平均分布时采用
* volatile-random - 从已设置过期时间的数据,随机挑选
* volatile-ttl - 挑选 TTL 值最小

Redis 默认采用 惰性删除 + 主动删除

3. expire (TTL)

数据存活时间

六、持久化

目的是为了快速恢复数据而非存储数据

AOF 记录过程,RDB只管结果

1. RDB 快照 (默认)

流程 : 父进程 fork 子进程 (此时父进程阻塞),子进程创建 RDB 文档,根据父进程内存生成快照文档,并对原有文档进行原子替换

优点:

  • 使用二进制压缩,空间小,方便传输

缺点:

  • 无法保证数据完整,将丢失快照以后的所有数据
  • fork 子进程过程阻塞,若数据太大,将导致短时间无法响应请求 (如需避免须关闭RDB,启用AOF)

2. AOF 操作日志

将运行的命令记录到 AOF 文档中

优点:

  • 数据安全,不丢失数据

缺点:

  • 性能低

保存模式 (硬盘)

  1. AOF_FSYNC_NO : 不保存

只有当 (1) Redis 关闭 (2) AOF 功能关闭 (3) 操作系统写缓存刷新 才会将AOF存到硬盘
2. AOF_FSYNC_EVERYSEC : 每秒保存一次【默认】

最多丢失2秒钟数据
3. AOF_FSYNC_ALWAYS : 一条命令保存一次 (不推荐)

最多丢失一条命令数据

重写

将 AOF 文件内命令进行删除与合并,对文件进行瘦身,且整个过程绝对安全

七、Redis 弱事务

开启事务后

(1) 语法错误 - 命令队列清除

(2) 运行错误[类型错误..等] - 正确的命令 运行,不回滚 【性能考量】

1. Redis 的 ACID

  • Atomicity (原子性) : 一个队列中的命令,要么运行,要么不运行
  • Consistency (一致性): 事务运行前、后状态必须是一致的。Redis 是 AP 模型,集群中不能保证实时一致,只能保证最终一致
  • Isolation (隔离性) : 命令是顺序运行,但在一个事务中,有可能运行其他客户端的命令
  • Durability (持久性) : 有持久化,但不保证数据完整性

2. 事务命令

当被监视的字段被其他客户端更动之后,监视后开启的命令队列将被清空

  1. watch : 监视key (客户端内共享)
  2. multi : 开启事务,后续命令将放入命令队列中
  3. exec : 运行命令队列
  4. discard : 清除命令队列
  5. unwatch : 清除监视key (客户端内共享)
1
2
3
4
5
6
7
8
9
10
11
12
shell复制代码127.0.0.1:6379> watch s1 # 监视key(客户端内共享) 当被监视的字段被其他客户端更动之后,监视后开启的命令队列将被清空
OK
127.0.0.1:6379> multi # 开启事务,后续命令将放入命令队列中
OK
127.0.0.1:6379> set s1 555 # 此时在没有exec之前,通过另一个命令窗口对监控的s1字段进行修改
QUEUED
127.0.0.1:6379> exec # 运行命令
(nil)
127.0.0.1:6379> get s1 # 因为命令队列被清空,导致命令运行失败,此处只能查找到其他客户端修改后的结果
222
127.0.0.1:6379> unwatch # 清除监视key
OK

3. Lua 脚本

具备强原子性,脚本运行过程,不允许插入新的命令,故运行时间应尽量短

八、优化

  1. 避免大key ( value > 100K ) → 拆为小 key
  2. 避免使用 key*、hgetall … 等全量操作
  3. RDB改为AOF,甚至关闭
  4. 添加多条数据时使用管道 pipeline
  5. 使用 Hash 存储
  6. 限制内存大小,避免出现 swap 或 OOM 错误

九、高可用

1. 主从复制 (读写分离)

  1. 主可写,从不可写
  2. 主挂了,从不能为主
  3. 从服务器使用 replicaof 命令开启

2. 哨兵

当主服务器下线,Sentinel 将从服务器升级为主服务器

初始化

  1. Sentinel 向 Master 发送 info 命令,取得 Slave 服务器地址,之后向 Master、Slave 发送 info 命令,获得 Redis 状态
  2. Sentinel 向 Master、Slave 订阅 :hello 频道,并向频道发送自身消息,让 Sentinel 之间可以互相感知

Sentinel Leader 选举

  1. Sentinel 每秒向 Redis 发送心跳连接,若无回应则视为【主观下线】,并向其他 Sentinel 发送查找命令,若有 quorum 数量的 Sentinel 都认为该 Redis 下线,将被判定为【客观下线】
  2. 当 Master 客观下线,将使用 Raft 协议选出 Leader Sentinel 运行 Redis 的【故障转移】
Raft

选举开始,所有节点都是 Follower。如果收到 RequestVote (投票给我) 、AppendEntries (已选出Leader) 的请求,则保持 Follower 状态

一段时间(随机)内没收到请求,则将身分转换为 Candidate 开始竞选 Leader,如果获得过半票数则成为 Leader。

如果最后未选出 Leader,则 Term + 1,开启下一轮选举

故障转移

  1. 选出 Slave 取代原 Master ,并让其他 Slave 复制新 Master

选择标准 (1) slave-priority 最高 (2) 复制偏移量最大 (3) run_id 最小[重启最少次]
2. 向客户端返回新 Master 地址
3. 更新所有 Redis 的 redis.conf、sentinel.conf

3. Codis (Proxy)

优点:

  • 客户端透明,和 Codis 交互与和 Redis 交互相同
  • 支持在线数据迁移
  • 支持高可用 (Redis、Proxy)
  • 数据自动均衡分配
  • 支持 1024 个 Redis 实例

缺点:

  • 某些命令不支持
  • 只有一个 Codis,性能将下降20%
  • 采用自有的 Redis 分支,与原版不同步

4. Redis Cluster

优势

  1. 高性能
* 多主节点、负载均衡、读写分离
  1. 高可用
* 主从复制、Raft选举
  1. 易扩展
* 添加、移除节点,不须停机
* 数据分片
  1. 原生
* 不需要其他代理或工具,和单机 Redis 完全兼容

失效判定

  • 半数以上主节点当机 (无法投票)
  • 某个分区的主、从节点同时当机 (slot槽不连续)

副本飘移

集群中拥有最多从机的节点组,漂移到单点的主从节点组

十、高并发问题

1. 缓存穿透

查找缓存中 Key 不存在的数据,会穿透缓存去查数据库,导致DB压力过大

解决方案:

  1. 结果为空也进行缓存,TTL 设短一些,且在对数据库 insert 数据后清除缓存

问题: 缓存空值将占用空间
2. 使用布隆过滤器先使用布隆过滤器查找 Key 是否存在,不存在则返回,存在再查缓存与DB

2. 缓存雪崩

大量缓存在同一时刻失效,导致客户端直接查找DB,造成DB压力过大

解决方案:

  1. 让 Key 的失效期分散
  2. 设置二级缓存(本地缓存) - 可能存在数据不一致问题
  3. 高可用(读写分离)

3. 缓存击穿

热Key失效,导致DB某个纪录瞬间被大量访问

解决方案:

  1. 使用分布式锁 setnx,让其他线程处于等待状态,来保证DB安全
  2. Key 不设置超时时间,过期策略使用 validate-lru

4. 数据一致性 - 延迟双删

  1. 更新DB后删除缓存,读数据再填充缓存
  2. 2 秒后再删除一次缓存
  3. 设置缓存过期时间 10秒 or 1小时
  4. 若缓存删除失败,则记录到日志,并用脚本提取后删除 (1天)

5. 大Key

  1. Value 是 String 类型,可以存到 MongoDB 或 CDN 上,如果必须用 Redis,则单独存储,并且采一主多从架构
  2. hash、set、zset、list 类型,元素过多,可以将 Key 进行 Hash 取模后生成新 Key,将 Key 进行分拆
  3. 删除使用 unlink 而非 del 命令

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…765766767…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%