开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Java 多线程 阻塞队列 没啥好说的

发表于 2021-03-14

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

如题 , 阻塞队列真没啥好说的 , 工具类 , 了解功能感觉就可以了

一 . 阻塞队列简述

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

  • 在队列为空时,获取元素的线程会等待队列变为非空。
  • 当队列满时,存储元素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景:

  • 生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程
  • 阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

通用概念

  • implements Queue : 基本上都是Queue的实现类 ,即实现了 Queue 的方法
  • 可以通过构造方法初始化容量和排序
  • 构造方法可以传入整个集合

队列类型

  • ArrayBlockingQueue :一个由 数组 结构组成的 有界 阻塞队列。
  • LinkedBlockingQueue :一个由 链表 结构组成的 无界 阻塞队列。
  • PriorityBlockingQueue :一个 支持优先级排序 的 无界 阻塞队列。
  • DelayQueue:一个使用 优先级队列 实现的 无界 阻塞队列。
  • SynchronousQueue:一个 不存储元素 的阻塞队列。
  • LinkedTransferQueue:一个由 链表 结构组成的 无界 阻塞队列。
  • LinkedBlockingDeque:一个由 链表 结构组成的双向阻塞队列。
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
// 按照类型分类
• 无锁非阻塞并发队列:ConcurrentLinkedQueue和ConcurrentLinkedDeque
• 普通阻塞队列:基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue和LinkedBlockingDeque
• 优先级阻塞队列:PriorityBlockingQueue
• 延时阻塞队列:DelayQueue
• 其他阻塞队列:SynchronousQueue和LinkedTransferQueue

处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
检查方法 element() peek() 不可用 不可用

性能对比

  • 1、ArrayBlockingQueue 性能优于LinkedBlockingQueue,但是LinkedBlockingQueue是无界的。
  • 2、ArrayBlockingQueue 和 LinkedBlockingQueue 的 poll方法总是比offer方法快,并发越高,差距越大
  • 3、ArrayBlockingQueue 和 LinkedBlockingQueue 的 性能远高于PriorityBlockingQueue,显然优先队列在比较优先级上的操作上耗费太多
  • 4、PriorityBlockingQueue的 offer方法与 poll方法的性能差距很小,基本维持在近似1:1
线程数 20 50 100 200 500 1000
LinkedBlockingQueue 15,0 31,15 32,16 63,32 203,47 563,110
ArrayBlockingQueue 15,0 16,15 31,15 47,16 125,47 364,68
PriorityBlockingQueue 78,78 172,188 360,422 813,969 3094,2641 6547,5453

二. ArrayBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
JAVA复制代码> 一个由数组实现的有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加的 
> ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了
> ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略
- 但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。
- 公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。

// 构造器 :
MC- ArrayBlockingQueue(int capacity)
MC- ArrayBlockingQueue(int capacity, boolean fair)

// 抽象类和接口
I- BlockingQueue<E> : 提供了在多线程环境下的出列、入列操作
?- 内部使用可重入锁 ReentrantLock + Condition 来完成多线程环境的并发操作

// 变量
• items 变量,一个定长数组,维护 ArrayBlockingQueue 的元素。
• takeIndex 变量,int ,为 ArrayBlockingQueue 队首位置。
• putIndex 变量,int ,ArrayBlockingQueue 队尾位置。
• count 变量,元素个数。
• lock 变量,ReentrantLock ,ArrayBlockingQueue 出列入列都必须获取该锁,两个步骤共用一个锁。
• notEmpty 变量,非空,即出列条件。
• notFull 变量,未满,即入列条件。

// 入队
M- add(E e) 方法 : 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true , 满了抛出异常
M- offer(E e) 方法 : 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true , 满了返回false
M- offer(E e, long timeout, TimeUnit unit) 方法 : 将指定的元素插入此队列的尾部 , 已满在设定时间内等待
M- put(E e) 方法 : 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间
M- enqueue :
- 正常添加元素 , 到达队尾的时候重定向到队头
- 总数 + 1
- 通知阻塞线程

// 出列
M- poll() 方法:获取并移除此队列的头,如果此队列为空,则返回 null 。
M- poll(long timeout, TimeUnit unit) 方法:获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
M- take() 方法:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
M- remove(Object o) 方法:从此队列中移除指定元素的单个实例(如果存在)。

// 核心总结 :
M- offer : 通过 ReentrantLock 上锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- finally -> lock.unlock();

// 关键点 :
1 创建后,容量将无法更改
2 尝试将元素放入满队列将导致操作阻塞
3 尝试从空队列中取出一个元素]也会类似地被阻塞
4 支持可选的公平性策略

三 . DelayQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素
- 如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。
- 也就是说只有在延迟期到时才能够从队列中取元素。

// 作用 :
• 缓存:清掉缓存中超时的缓存数据
• 任务超时处理

// 关键 :
1. 可重入锁ReentrantLock
2. 用于阻塞和通知的Condition对象
3. 根据Delay时间排序的优先级队列:PriorityQueue
4. 用于优化阻塞通知的线程元素leader

// 结构 :
E- AbstractQueue
I- BlockingQueue
M- offer() : 往PriorityQueue中添加元素
- 向 PriorityQueue中插入元素
- 判断当前元素是否为对首元素,如果是的话则设置leader=null , 唤醒所有线程
M- take()
- 获取队首 --- q.peek
IF- 队首为空 , 阻塞 ,等待off 唤醒
ELSE-
获取队首的超时时间 , 已过期则出对
- 如果存在其他线程操作 ,阻塞 , 不存在其他线程 , 独占
- 超时阻塞 --- available.awaitNanos(delay);
- 唤醒阻塞线程

// 使用方式 :
// Step 1 : new 一个
DelayQueue queue = new DelayQueue();

// Step 2 : 加东西
queue.offer(createUserDelayQueueTO());

四 . SynchronousQueue

  1. SynchronousQueue没有容量。
  • 与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
  1. 因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。
  • 例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
  1. SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
  2. 若使用 TransferQueue, 则队列中永远会存在一个 dummy node(这点后面详细阐述)。

SynchronousQueue非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。

1
2
3
4
5
java复制代码C- SynchronousQueue
E- AbstractQueue
I- BlockingQueue
C- TransferQueue
?- 实现公平性策略的核心类,其节点为QNode

五 . LinkedBlockingDeque

  • 一个有链表组成的双向阻塞队列,与前面的阻塞队列相比它支持从两端插入和移出元素。
    • 以first结尾的表示从对头操作,以last结尾的表示从对尾操作。
  • 支持FIFO、FILO两种操作方式

LinkedBlockingQueue是一个阻塞队列

  • 内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。
    • 基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。
    • 头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据
      • 当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。
      • 这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码

// 简介
是先进先出队列FIFO。
采用ReentrantLock保证线程安全

// 操作结果
增加 : 队列满 >
put -> 一直阻塞
add -> 抛出异常
offer -> 返回false

删除 : 队列为空
remove -> NoSuchElementException
poll -> 返回false
take -> 阻塞



// 源码分析
LinkedBlockingQueue
C- static class Node<E> : 核心静态内部类 , 表示一个节点
|- E item : 节点原始
|- Node<E> next : 下一节点
F- int capacity : 容量界限
F- AtomicInteger count : 当前元素个数
F- Node<E> head :头节点
F- Node<E> last : 尾节点
F- ReentrantLock takeLock : take,poll等获取锁
F- Condition notEmpty : 等待任务的等待队列
F- ReentrantLock putLock : put,offer等插入锁
F- Condition notFull : 等待插入的等待队列
MC- LinkedBlockingQueue() : 最大数量
MC- LinkedBlockingQueue(int capacity) : 指定数量
MC- LinkedBlockingQueue(Collection<? extends E> c) : 指定集合
M- signalNotEmpty : 表示等待take。put/offer调用,否则通常不会锁定takeLock
|- 获取 tackLock : this.takeLock
|- 锁定takeLock -> takeLock.lock();
|- 唤醒take 线程等待队列 -> notEmpty.signal();
|- 释放锁 -> takeLock.unlock();
M- signalNotFull : 表示等待put,take/poll 调用
|- 获取putLock : this.putLock;
|- 锁定putLock -> putLock.lock();
|- 唤醒插入线程等待队列 -> notFull.signal();
|- 释放锁
M- enqueue : 在队列尾部插入
|- last = last.next = node;
M- E dequeue():移除队列头
|- 保留头指针
|- 获取当前链表的第一个元素
|- 头指针指向第一个元素
|- 获取第一个元素的值并且移除第一个
|- 返回第一个元素的值
M- fullyLock : 锁定putLock和takeLock
|- putLock.lock();
|- takeLock.lock();
M- fullyUnlock : 先解锁takeLock,再解锁putLock
|- putLock.unlock();
M- offer: 将给定的元素设置到队列中,如果设置成功返回true
|- 非空判断 , 获取计数器
|- 判断队列是否已满 -> 返回 Boolean
|- 新建节点
|- 获取插入锁 , 并且锁定
|- 队列未满 -> 插入 -> 计数
|- 如果未满 ,继续唤醒插入线程
|- 解锁
|- 如果对了为空 ,获取线程锁阻塞
M- offer(E e, long timeout, TimeUnit unit) :给定的时间内设置到队列中
M- put(E e) : 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞 , 直到队列中有多余的空间
|- 核心1 : putLock.lockInterruptibly();
-> 设置前加锁
|- 核心2 : notFull.await();
-> 队列满时等待
M- take() : 从队列中获取值,如果队列中没有值
M- peek() : 非阻塞的获取队列中的第一个元素,不出队列
M- poll() : 非阻塞的获取队列中的值,未获取到返回null。
M- poll(long timeout, TimeUnit unit) :在给定的时间里,从队列中获取值
M- remove(Object o):从队列中移除指定的值。将两把锁都锁定。
M- clear():清空队列。
M- drainTo(Collection c):将队列中值,全部移除,并发设置到给定的集合中。

六 . LinkedTransferQueue

  • LinkedTransferQueue是一个由链表组成的的无界阻塞队列
  • 它是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、无界的LinkedBlockingQueues等的超集。

与其他BlockingQueue相比,他多实现了一个接口TransferQueue, 该接口是对BlockingQueue的一种补充,多了tryTranfer()和transfer()两类方法:

  • tranfer():若当前存在一个正在等待获取的消费者线程,即立刻移交之。
    • 否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素
  • tryTranfer(): 若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;
    • 若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作

七 . PriorityBlockingQueue

1
2
3
4
java复制代码- PriorityBlockingQueue是支持优先级的无界队列。
- 默认情况下采用自然顺序排序,当然也可以通过自定义Comparator来指定元素的排序顺序。
- PriorityBlockingQueue内部采用二叉堆的实现方式,整个处理过程并不是特别复杂。
- 添加操作则是不断“上冒”,而删除操作则是不断“下掉”。

八 . ArrayBlockingQueue 与 LinkedBlockingQueue 的区别

Queue 阻塞与否 是否有界 线程安全保障 适用场景 注意事项
ArrayBlockingQueue 阻塞 有界 一把全局锁 生产消费模型,平衡两边处理速度 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
LinkedBlockingQueue 阻塞 可配置 存取采用 2 把锁 生产消费模型,平衡两边处理速度 无界的时候注意内存溢出问题,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加 JVM 垃圾回收的负担。

九 . 双端队列

而 ArrayDeque、LinkedBlockingDeque 就是双端队列,类名以 Deque 结尾

正如阻塞队列适用于生产者消费者模式,双端队列同样适用与另一种模式,即工作密取。

  • 在生产者-消费者设计中,所有消费者共享一个工作队列,而在工作密取中,每个消费者都有各自的双端队列。
  • 如果一个消费者完成了自己双端队列中的全部工作,那么他就可以从其他消费者的双端队列末尾秘密的获取工作。
    • 具有更好的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。
  • 在大多数时候,他们都只是访问自己的双端队列,从而极大的减少了竞争。
  • 当工作者线程需要访问另一个队列时,它会从队列的尾部而不是头部获取工作,因此进一步降低了队列上的竞争。

十 . 队列对象

1
2
3
4
5
6
7
java复制代码> 阻塞队列 : 阻塞队列有普通的先进先出队列,

> 包括基于数组的ArrayBlockingQueue
> 基于链表的LinkedBlockingQueue/LinkedBlockingDeque
> 基于堆的优先级阻塞队列PriorityBlockingQueue
> 可用于定时任务的延时阻塞队列DelayQueue
> 用于特殊场景的阻塞队列SynchronousQueue和LinkedTransferQueue

十一 . CopyOnWriteArrayList

CopyOnWrite容器即写时复制的容器。
当我们往容器添加元素的时候,先将当前容器进行Copy,复制出一个新的容器,
然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
java复制代码
优缺点
|- 优点:
读操作性能很高,比较适用于读多写少的并发场景。
Java的list在遍历时,若中途有别的线程对list容器进行修改,则会抛出ConcurrentModificationException异常。
而CopyOnWriteArrayList由于其"读写分离"的思想,遍历和修改操作分别作用在不同的list容器
?- 所以在使用迭代器进行遍历时候,也就不会抛出ConcurrentModificationException异常。

|- 缺点:
内存占用问题,执行写操作时会发生数组拷贝
无法保证实时性,Vector对于读写操作均加锁同步,可以保证读和写的强一致性。
而CopyOnWriteArrayList由于其实现策略的原因,写和读分别作用在新老不同容器上
?- 在写操作执行过程中,读不会阻塞但读取到的却是老容器的数据。

|- 使用场景 :
CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景。

CopyOnWriteArrayList
F- ReentrantLock lock = new ReentrantLock() --> 重入锁
F- volatile Object[] array; --> 只能通过 getArray/setArray 访问的数组
M- Object[] getArray() --> 获取数组,非私有方法以便于CopyOnWriteArraySet类的访问
M- setArray(Object[] a) --> 设置数组
M- CopyOnWriteArrayList -- 创建一个空数组
M- CopyOnWriteArrayList(Collection<? extends E> c)
?- 创建一个包含指定集合的数组
B- 如果c的类类型为CopyOnWriteArrayList
|- 直接获取其数组
E- 如果不是
|- 通过 toArray 转数组
|- 如果c.toArray返回的不是 Object[]类型,则通过数组拷贝
|- 设置数组 : setArray(elements);
M- CopyOnWriteArrayList(E[] toCopyIn) : 创建包含给定数组副本的列表
M- size() : 获取数量
M- isEmpty() : 判断列表元素是否为空
M- eq(Object o1, Object o2) : 判断o1 o2是否相等
M- indexOf(Object o, Object[] elements,int index, int fence)
B- 为null , for 循环迭代第一个 null
E- 不为 null ,for 循环 eq
M- lastIndexOf :索引倒叙
M- contains : IndexOf 判断
M- clone : 浅拷贝
|- 重置锁定
|- 返回clone 属性
M- toArray
M- get : 获取原数组中元素
M- set:用指定的元素替换列表中指定位置的元素
|- 获取当前锁并且锁定
|- 获取元素数组
|- 获取老的值
B- 如果老的值和给定值不相等
|- 原数组拷贝 , 将新数组中的索引位置修改为新值
|- 将原数组替换为新数组
E- 否则
|- setArray(elements);
|- 返回老的值
M- add(E e) : 将指定的元素追加到此列表的末尾
|- 获取重入锁 ,锁定
|- 获取原数组
|- 原数组拷贝 并增加一个空位
|- 将指定元素增加到新数组新增的空位中
|- 新数组替换原数组
M- remove :
|- 获取锁并且锁定
|- 获取原数组
|- 获取要删除的元素值 , 获取要移动的值
B- 如果为0,则删除的是最后一个元素
-> setArray(Arrays.copyOf(elements, len - 1));
E- 否则 复制拷贝
|- 新建数组
|- 将原数组中,索引index之前的所有数据,拷贝到新数组中
|- 将元素组,索引index+1 之后的numMoved个元素,复制到新数组,索引index之后
|- 替换原数组
|- 返回老的值 ,最后释放锁
C- COWSubList : 内部视图类

更新记录

  • 20210727 : 修改格局

致谢

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
芋道源码 : http://www.iocoder.cn/JUC/sike/aqs-3/

https://mp.weixin.qq.com/s?__biz=MzIxOTI1NTk5Nw==&mid=2650047475&idx=1&sn=4349ee6ac5e882c536238ed1237e5ba2&chksm=8fde2621b8a9af37df7d0c0a7ef3178d0253abf1e76a682134fce2f2218c93337c7de57835b7&scene=21#wechat_redirect

https://blog.csdn.net/javazejian/article/details/70768369

死磕系列 , 我的多线程导师
http://cmsblogs.com/?cat=151

// JVM 源码 C
https://www.jianshu.com/p/a604f1a9f875

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 多线程 真想聊清楚线程池

发表于 2021-03-14

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

线程池这东西 , 用很简单 , 想用好 , 不容易啊~~

一 . 线程池简介

1 线程池的元素
线程池主要由两个概念组成,一个是任务队列,另一个是工作者线程。

  • 任务队列是一个阻塞队列,保存待执行的任务。
  • 工作者线程主体就是一个循环,循环从队列中接受任务并执行。

2 为什么要用线程池

  • 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

3 线程池中的核心概念

  • BlockingQueue workQueue : 用于保留任务并移交给工作线程的队列
  • HashSet workers : 线程池中所有的工作线程

4 线程池的原理定义 :
线程池通过一个叫 ctl 的 AtomicInteger 决定运行情况 , 通过 ThreadFactory 创建线程 , 并且把等待的线程放入 workQueue , 等待移交给工作线程

二. 常见的线程池

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码// 基本对象
ThreadPoolExecutor

// 可重用固定线程数的线程池
FixedThreadPool
- ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

// 使用单个worker线程的Executor
SingleThreadExecutor

// 会根据需要创建新线程的线程池
CachedThreadPool

三. 线程池的创建

线程池创建可以通过 ThreadPoolExecutor 和 工具类 Executors 实现

3.1 通过构造方法实现(推荐)

通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则

3.2 通过Executor 框架的工具类Executors来实现 (个人demo 可以考虑)

3.2.1 FixedThreadPool

1
java复制代码return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
  • 该方法返回一个固定线程数量的线程池。 (corePoolSize == maximumPoolSize)
  • 使用LinkedBlockingQuene作为阻塞队列
  • 当线程池没有可执行任务时,也不会释放线程
  • 该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。

3.2.2 SingleThreadExecutor

1
2
java复制代码return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
  • 方法返回一个只有一个线程的线程池。
  • 若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。

3.2.3 CachedThreadPool:

1
java复制代码return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
  • 该方法返回一个可根据实际情况调整线程数量的线程池。(默认缓存60s , 线程池的线程数可达到Integer.MAX_VALUE,即2147483647)
  • 内部使用SynchronousQueue作为阻塞队列
  • 线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。
  • 所有线程在当前任务执行完毕后,将返回线程池进行复用。
  • 在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源

3.2.4 ScheduledExecutorService :

1
java复制代码return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
  • 初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据

四. Fork/Join

1
2
3
4
5
6
7
8
9
10
java复制代码1 >  Fork / Join 的核心是 ForkJoinPool , 用于来管理工作线程 
: 工作线程一次只能执行一个任务 ,
: 不会根据任务创建线程,而是将任务存储到工作线程的双端队列中
2 > Fork / join 的思路是分而治之
- Fork 递归的将任务分为较小的子任务
- Join : 将子任务递归的串联成单个结果

3 > 工作窃取算法 : 空闲的线程试图从繁忙的线程(他们的双端队列)中窃取工作

// Fork/Join 依赖于 ForkJoinPool , 此处仅简单介绍 , 详情参考十六章

五. ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码ThreadPoolExecutor实现了生产者/消费者模式,
- 工作者线程就是消费者
- 任务提交者就是生产者,线程池自己维护任务队列。

> ThreadPoolExecutor
- AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
: 此变量 记录了 “线程池中的任务数量”和“线程池的状态”两个信息
: 高3位表示"线程池状态",低29位表示"线程池中的任务数量"
- RUNNING : 111 : 该线程池能接收新任务 ,且能对新任务进行处理
- SHUTDOWN : 000 : 不能接收新任务 ,但是可以对任务进行处理
- STOP : 001 : 不添加新任务 , 不对任务进行处理 , 会中断正在执行的任务
- TIDYING : 010 : 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
- 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
- TERMINATED : 011 : 线程池彻底终止的状态

----------------------------------------

> ThreadPoolExecutor 的参数
- corePoolSize : 线程池中核心线程的数量
- maximumPoolSize : 线程池中允许的最大线程数
- keepAliveTime : 线程空闲的时间
- unit : keepAliveTime的单位
- workQueue : 用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口
- threadFactory : 用于设置创建线程的工厂
- allowCoreThreadTimeOut : 允许核心线程过期
- Handler : 处理器
- defaultHandler : 任务拒绝处理器

六 . 线程池的饱和和动态调整

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码// 线程池的饱和策略 , 当线程池满了.  会通过对应的策略
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;


// ThreadPoolExecutor 提供了动态调整线程池容量大小的方法:
• setCorePoolSize:设置核心池大小。
• setMaximumPoolSize:设置线程池最大能创建的线程数目大小。
当上述参数从小变大时,ThreadPoolExecutor 进行线程赋值,还可能立即创建新的线程来执行任务。

// 动态调整源码核心 :

七. 线程池执行任务的过程

刚创建时,里面没有线程调用 execute() 方法,添加任务时:

  • 如果正在运行的线程数量小于核心参数 corePoolSize ,继续创建线程运行这个任务
+ 否则,如果正在运行的线程数量大于或等于 corePoolSize ,将任务加入到阻塞队列中。
+ 否则,如果队列已满,同时正在运行的线程数量小于核心参数 maximumPoolSize ,继续创建线程运行这个任务。
+ 否则,如果队列已满,同时正在运行的线程数量大于或等于 maximumPoolSize ,根据设置的拒绝策略处理。
  • 完成一个任务,继续取下一个任务处理。
+ 没有任务继续处理,线程被中断或者线程池被关闭时,线程退出执行,如果线程池被关闭,线程结束。
+ 否则,判断线程池正在运行的线程数量是否大于核心线程数,如果是,线程结束,否则线程阻塞。因此线程池任务全部执行完成后,继续留存的线程池大小为 corePoolSize 。

八. 线程池中 submit 和 execute 方法有什么区别

两个方法都可以向线程池提交任务。

  • #execute(…) 方法,返回类型是 void ,它定义在 Executor 接口中 , 必须实现Runnable接口 。
  • #submit(…) 方法,可以返回持有计算结果的 Future 对象,它定义在 ExecutorService 接口中,它扩展了 Executor 接口,其它线程池类像 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 都有这些方法。

九 . 如果你提交任务时,线程池队列已满,这时会发生什么

重点在于线程池的队列是有界还是无界的。

如果你使用的 LinkedBlockingQueue,也就是无界队列的话,没关系,继续添加任务到阻塞队列中等待执行,因为 LinkedBlockingQueue 可以近乎认为是一个无穷大的队列,可以无限存放任务。

如果你使用的是有界队列比方说 ArrayBlockingQueue 的话,任务首先会被添加到 ArrayBlockingQueue 中,ArrayBlockingQueue满了,则会使用拒绝策略 RejectedExecutionHandler 处理满了的任务,默认是 AbortPolicy 。

十 . 线程池的底层逻辑

要想弄清楚这一部分 , 首先得理解 Queue , Worker , Task , Thread 等多个概念

  • Queue :
  • Worker :
  • Task :
  • Thread :

10.1 线程池的物理结构

Worker 对象

Worker 对象是 ThreadPoolExecutor 中的一个内部类 , 他是一个包装类 , 是一个线程单元 , 同时提供线程的中断等功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码// 问题一 : Worker 结构
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

// 被封装的线程
final Thread thread;
// 初始任务
Runnable firstTask;
// 线程任务计数器
volatile long completedTasks;

// 可以看到 , 把 worker 都行构建成了 Thread
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}


public void run() {
runWorker(this);
}

protected boolean isHeldExclusively() {....}
// 获取同步状态
protected boolean tryAcquire(int unused) {....}
// 释放同步状态
protected boolean tryRelease(int unused) {....}

//锁操作
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

// 暂停操作
void interruptIfStarted(){....}
}

// 问题二 : ThreadPoolExecutor中的线程包装
- 线程被封装成一个对象Worker
- 通过调用 runWorker(Worker w) 获取任务并执行的死循环
- 如果任务的运行出了什么问题 ,调用 processWorkerExit() 处理

C- ThreadPoolExecutor
PVC- Worker
M- run : public void run() {runWorker(this); }

拒绝策略部分 Policy

ThreadPoolExecutor 中提供了4 个拒绝策略内部类 , 具体的类型详见上文 , 这里来看一下结构 :

1
2
3
java复制代码public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

10.2 主要流程

Step 1 : 进入的起点 - 线程的封装

1
2
3
4
5
6
java复制代码// task 的构建 : 匿名传进来的线程会构建成一个 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

Step 2 : 运行的起点 - execute

ThreadPoolExecutor 中 excutor 方法是执行的起点 , 其中会进行三种操作

  1. 当线程池未满时 , 直接 addWorker 运行
  2. 当线程池满了且正在运行时 , 将线程加入 workQueue 中
  3. 当上述均失败后 , 就会调用 reject 来处理异常情况 (RejectedExecutionHandler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码// 问题一 : execute 中线程池处理任务的逻辑
1 int c = ctl.get();
2 if (workerCountOf(c) < corePoolSize) {
3 if (addWorker(command, true))
4 return;
5 c = ctl.get();
6 }
7 if (isRunning(c) && workQueue.offer(command)) {
8 int recheck = ctl.get();
9 if (! isRunning(recheck) && remove(command))
10 reject(command);
11 else if (workerCountOf(recheck) == 0)
12 addWorker(null, false);
13 }
14 else if (!addWorker(command, false))
15 reject(command);

// 2 : workerCountOf 判断当前线程数是否小于corePoolSize 从而决定是否通过 addWorker 创建线程
// 7 : 如果线程池已满 ,且状态为 running , 尝试把任务添加到 workQueue
// 14 : 如果 7 步处理失败 , 尝试 addWorker , 失败则通过 reject 处理


//补充 : addWork 作用
- 检查是否可以根据当前池状态和给定边界(核心或最大)添加新的工作者
- 创建并启动新的worker,运行firstTask作为它的第一个任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码// 问题二 : 线程池运行 Work 详情 (简述一下就是核心的四步)
1 addWorker(Runnable firstTask, boolean core) : 可以看到addWorker 添加的是一个 Runnable
2 new Worker(firstTask) :如果状态符合 ,会创建一个 Worker 对象
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
3 final Thread t = w.thread;
?- 这里将Thread 取了出来

4 后文将会 t.start()运行

// 补充 : 期间还会进行锁的处理 , 省略一些的主要流程如下
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 核心一 : 状态判断
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

// 核心二 : 容量满了的处理 , 退出或者重试 (可以看到 c 语言的影子)
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 核心三 : 处理开始
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 核心四 : 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Step 3 : 工厂的创建

1
2
java复制代码// 问题四 : 创建工厂
从上文说到 , 线程池通过 ThreadFactory 创建线程 (newThread()) ,

Step 4 : 线程的复用

在上文 Step 1 问题一 中 , 将 线程加入到 workQueue 中了isRunning(c) && workQueue.offer(command) , 这里就是取出来的步骤 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码// 这个问题涉及到的方法主要包括 getTask () 
M- runWorker
while (task != null || (task = getTask()) != null) : 死循环 , 只要还有 task 就会执行
- task.run() : 获取到 task 后 通过 task run 执行

M- getTask()
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

// 补充 : 这里可以看到 , worker 可以理解为一个工作线程 ,他通过 while 不停的从 queue 中获取 task 执行

// 这里很有趣 , worker 更像一个加工工厂 , 我一开始还以为迭代的是 worker , 现在发现是在 worker 上锁后在里面迭代
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task 实际上不是 worker 的内部属性
while (task != null || (task = getTask()) != null) {
// 上锁
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
// 线程执行
task.run();
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

Step 5 : 拒绝策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码// 瞅一瞅拒绝策略 : 
当你的线程池满了后 , 通常这个异常就爆出来了
java.util.concurrent.RejectedExecutionException:
Task java.util.concurrent.FutureTask@523424b5 rejected from
java.util.concurrent.ThreadPoolExecutor@319dead1
[Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]

- 尽管我们可以通过拒绝策略有很多种 ,但是超高并发的时候哪一种都不靠谱 , 所以我们先看下 , 这个拒绝策略怎么来的
1 从问题三代码的第七行我们就能看到 workQueue.offer(command) , queue 已经 offer 失败了 , 说明Queue 也满了
2 到14行 , 再次通过 addWork 直接运行 , 失败了
3 执行了 reject 方法 , handler.rejectedExecution(command, this);
?- handler 是接口 ,他有四个实现类 , 具体含义可以见上文拒绝策略
4 例如 AbortPolicy 就是 throw new RejectedExecutionException , CallerRunsPolicy 就是再次run
(主线程慢慢跑 , 肯定慢的)

- 所以部分业务我们要改 , 怎么改 ?
1 spring 里面可以自定义你的拒绝策略 , 可以参考这一篇的用法
@ https://blog.csdn.net/tanglei6636/article/details/90721801
2 ThreadPoolExecutor 构造器里面也有

- 改的思路 ?
前提一是集群已经无法解决 (基本上现阶段集群都能满足) ,且你无法节流
1 放到消息队列
2 入库
3 写盘
4 放集合 , 单独一个线程 , 用一个取一个

Step 6 : 线程的关闭

1
2
3
4
java复制代码1 checkShutdownAccess 校验是否可以关闭
2 RunState 改为 STOP
3 ReentrantLock and isInterrupted
4 drainQueue : remove queue 队列

Step 7 : 如何实现回调 ?

1
2
3
4
java复制代码submit 回调
- <T> Future<T> submit(Callable<T> task)
?- 很明显 , submit 返回的是 Future , 这就意味着主线程能阻塞等待
- RunnableFuture<T> ftask = newTaskFor(task);

10.2 底层复杂分析

ctl 到底怎么玩的 ?

1
2
3
4
5
6
7
8
java复制代码> private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
?- 线程池里面通过 ctl 来判断线程的状态 , 前面说了线程池高3位表示 "线程池状态",低29位表示线程池中的任务数量-
?- 以 STOP 状态为例 , 在运行的时候 ,他的十进制值为 536870912
- 首先 ,我们将他转换为二进制 -> 10000 00000 00000 00000 00000 00000
- 获取后面的29位 ,然后前面补齐 , 最后的高三位即为 001
?- 而 TIDYING 对应的就是 00001 00000 00000 00000 00000 00000 00000 -> 010
?- RUNNING 为 -1 , 按照为数不多的一点残留知识 , 这里说成111是因为负数按照补码表示的原因
?- 众所周知 , 二进制处理的效率最高 ,所以这么玩合情合理

线程池公式

1
2
3
4
5
6
7
8
java复制代码> 计算密集型 :Ncpu + 1

> 包含了 I/O和其他阻塞操作的任务 : Nthreads = Ncpu x Ucpu x (1 + W/C)
  Ncpu = CPU的数量
  Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1
  W/C = 等待时间与计算时间的比率

> IO密集型 = 2Ncpu

比较当前线程容量的方法 workerCountOf(c) 为什么要 & 一个 CAPACITY ?

1
2
3
4
5
6
7
java复制代码1 > public static final int SIZE = 32;
?- 用二进制补码形式表示int值的位数
2 > private static final int COUNT_BITS = Integer.SIZE - 3;
3 > private static final int CAPACITY = (1 << COUNT_BITS) - 1;
4 > private static int workerCountOf(int c) { return c & CAPACITY; }

// 原因一 : 还是状态的原因 , 低 29 位才是 线程数量 , 加上这个参数才能包装低 29 二进制时最开始为 0

十一. 线程池使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码
// 构造一个线程池 , 推荐用法
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));


// 除了测试 , 尽量避免使用以下方法构建线程池
// 线程创建
// 1 CachedThreadPool
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {}

// 2 FixedThreadPool
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {}

// 3 SingleThreadExecutor
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {}

// 4 SingleThreadScheduledExecutor
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleWithFixedDelay(() -> {}


// 线程池的关闭
executor.shutdown();
executor.shutdownNow();

// 信息获取
executor.isTerminated() : 是否关闭
executor.getPoolSize()
executor.getQueue().size()

十二 . 线程池的想法

1
2
3
4
5
6
7
8
9
10
java复制代码// 使用线程池时有一些规约和建议是需要注意的 : 
- 创建线程或线程池时请指定有意义的线程名称
- 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式
- 并且不建议创建无界线程 , 避免 OOM
- 必须回收自定义的 ThreadLocal 变量,尤其在线程池场景下

// 注意点 :
- 注意线程池的拒绝策略 , 当线程池满了时 , 可能会因为策略带来系统崩溃
- newCachedThreadPool 也有可能出现 OOM , 其最大值为 newCachedThreadPool
-

总结

看完并发编程的艺术再来改 ,给我等着 , 还不信搞不定你了

更新记录

  • 20210721 : 优化内容 , 重新布局
  • 20210820 : 完善细节 , 使内容更流畅
  • 20210830 : 优化源码细节和逻辑

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

GitHub 近两万 Star,无需编码,可一键生成前后端代

发表于 2021-03-14

公众号:Java小咖秀,网站:javaxks.com

作者 : zhangdaiscott, 链接: github.com/zhangdaisco…

项目介绍:

JeecgBoot 是一款基于代码生成器的低代码开发平台!前后端分离架构 SpringBoot2.x,SpringCloud,Ant Design&Vue,Mybatis-plus,Shiro,JWT,支持微服务。强大的代码生成器让前后端代码一键生成,实现低代码开发! JeecgBoot 引领新的低代码开发模式(OnlineCoding-> 代码生成器-> 手工MERGE), 帮助解决Java项目70%的重复工作,让开发更多关注业务。既能快速提高效率,节省研发成本,同时又不失灵活性!

JeecgBoot 提供了一系列低代码模块,实现在线开发真正的零代码:Online表单开发、Online报表、报表配置能力、在线图表设计、大屏设计、移动配置能力、表单设计器、在线设计流程、流程自动化配置、插件能力(可插拔)等等!

JEECG宗旨是: 简单功能由OnlineCoding配置实现,做到零代码开发;复杂功能由代码生成器生成进行手工Merge 实现低代码开发,既保证了智能又兼顾灵活;实现了低代码开发的同时又支持灵活编码,解决了当前低代码产品普遍不灵活的弊端!

JEECG业务流程: 采用工作流来实现、扩展出任务接口,供开发编写业务逻辑,表单提供多种解决方案:表单设计器、online配置表单、编码表单。同时实现了流程与表单的分离设计(松耦合)、并支持任务节点灵活配置,既保证了公司流程的保密性,又减少了开发人员的工作量。

适用项目

Jeecg-Boot低代码开发平台,可以应用在任何J2EE项目的开发中,尤其适合SAAS项目、企业信息管理系统(MIS)、内部办公系统(OA)、企业资源计划系统(ERP)、客户关系管理系统(CRM)等,其半智能手工Merge的开发方式,可以显著提高开发效率70%以上,极大降低开发成本。

技术架构:

开发环境

  • 语言:Java 8
  • IDE(JAVA):IDEA / Eclipse安装lombok插件
  • IDE(前端):WebStorm 或者 IDEA
  • 依赖管理:Maven
  • 数据库:MySQL5.7+ & Oracle 11g & Sqlserver2017
  • 缓存:Redis

后端

  • 基础框架:Spring Boot 2.3.5.RELEASE
  • 微服务框架:Spring Cloud Alibaba 2.2.3.RELEASE
  • 持久层框架:Mybatis-plus 3.4.1
  • 安全框架:Apache Shiro 1.7.0,Jwt 3.11.0
  • 微服务技术栈:Spring Cloud Alibaba、Nacos、Gateway、Sentinel、Skywarking
  • 数据库连接池:阿里巴巴Druid 1.1.22
  • 缓存框架:redis
  • 日志打印:logback
  • 其他:fastjson,poi,Swagger-ui,quartz, lombok(简化代码)等。

前端

  • Vue 2.6.10
  • Axios
  • ant-design-vue
  • webpack,
  • vue-cropper- 头像裁剪组件
  • @antv/g2 - Alipay AntV 数据可视化图表
  • Viser-vue - antv/g2 封装实现
  • eslint,@vue/cli 3.2.1
  • vue-print-nb - 打印

功能模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
java复制代码├─系统管理
│ ├─用户管理
│ ├─角色管理
│ ├─菜单管理
│ ├─权限设置(支持按钮权限、数据权限)
│ ├─表单权限(控制字段禁用、隐藏)
│ ├─部门管理
│ ├─我的部门(二级管理员)
│ └─字典管理
│ └─分类字典
│ └─系统公告
│ └─职务管理
│ └─通讯录
│ └─多租户管理
├─消息中心
│ ├─消息管理
│ ├─模板管理
├─代码生成器(低代码)
│ ├─代码生成器功能(一键生成前后端代码,生成后无需修改直接用,绝对是后端开发福音)
│ ├─代码生成器模板(提供4套模板,分别支持单表和一对多模型,不同风格选择)
│ ├─代码生成器模板(生成代码,自带excel导入导出)
│ ├─查询过滤器(查询逻辑无需编码,系统根据页面配置自动生成)
│ ├─高级查询器(弹窗自动组合查询条件)
│ ├─Excel导入导出工具集成(支持单表,一对多 导入导出)
│ ├─平台移动自适应支持
├─系统监控
│ ├─Gateway路由网关
│ ├─性能扫描监控
│ │ ├─监控 Redis
│ │ ├─Tomcat
│ │ ├─jvm
│ │ ├─服务器信息
│ │ ├─请求追踪
│ │ ├─磁盘监控
│ ├─定时任务
│ ├─系统日志
│ ├─消息中心(支持短信、邮件、微信推送等等)
│ ├─数据日志(记录数据快照,可对比快照,查看数据变更情况)
│ ├─系统通知
│ ├─SQL监控
│ ├─swagger-ui(在线接口文档)
│─报表示例
│ ├─曲线图
│ └─饼状图
│ └─柱状图
│ └─折线图
│ └─面积图
│ └─雷达图
│ └─仪表图
│ └─进度条
│ └─排名列表
│ └─等等
│─大屏模板
│ ├─作战指挥中心大屏
│ └─物流服务中心大屏
│─常用示例
│ ├─自定义组件
│ ├─对象存储(对接阿里云)
│ ├─JVXETable示例(各种复杂ERP布局示例)
│ ├─单表模型例子
│ └─一对多模型例子
│ └─打印例子
│ └─一对多TAB例子
│ └─内嵌table例子
│ └─常用选择组件
│ └─异步树table
│ └─接口模拟测试
│ └─表格合计示例
│ └─异步树列表示例
│ └─一对多JEditable
│ └─JEditable组件示例
│ └─图片拖拽排序
│ └─图片翻页
│ └─图片预览
│ └─PDF预览
│ └─分屏功能
│─封装通用组件
│ ├─行编辑表格JEditableTable
│ └─省略显示组件
│ └─时间控件
│ └─高级查询
│ └─用户选择组件
│ └─报表组件封装
│ └─字典组件
│ └─下拉多选组件
│ └─选人组件
│ └─选部门组件
│ └─通过部门选人组件
│ └─封装曲线、柱状图、饼状图、折线图等等报表的组件(经过封装,使用简单)
│ └─在线code编辑器
│ └─上传文件组件
│ └─验证码组件
│ └─树列表组件
│ └─表单禁用组件
│ └─等等
│─更多页面模板
│ ├─各种高级表单
│ ├─各种列表效果
│ └─结果页面
│ └─异常页面
│ └─个人页面
├─高级功能
│ ├─系统编码规则
│ ├─提供单点登录CAS集成方案
│ ├─提供APP发布方案
│ ├─集成Websocket消息通知机制
├─Online在线开发(低代码)
│ ├─Online在线表单 - 功能已开放
│ ├─Online代码生成器 - 功能已开放
│ ├─Online在线报表 - 功能已开放
│ ├─Online在线图表(暂不开源)
│ ├─Online图表模板配置(暂不开源)
│ ├─Online布局设计(暂不开源)
│ ├─多数据源管理 - 功能已开放
├─积木报表设计器(低代码)
│ ├─打印设计器
│ ├─数据报表设计
│ ├─图形报表设计(支持echart)
│ ├─大屏设计器(暂不开源)
│─流程模块功能 (暂不开源)
│ ├─流程设计器
│ ├─在线表单设计
│ └─我的任务
│ └─历史流程
│ └─历史流程
│ └─流程实例管理
│ └─流程监听管理
│ └─流程表达式
│ └─我发起的流程
│ └─我的抄送
│ └─流程委派、抄送、跳转
│ └─。。。
└─其他模块
└─更多功能开发中。。

微服务整体解决方案

微服务架构图

image.png

Jeecg Boot 产品功能蓝图

image.png

项目下载和运行

  • 拉取项目代码
1
2
bash复制代码git clone https://github.com/zhangdaiscott/jeecg-boot.git
cd jeecg-boot/ant-design-jeecg-vue

1.安装node.js

2.切换到ant-design-jeecg-vue文件夹下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码# 安装yarn
npm install -g yarn

# 下载依赖
yarn install

# 启动
yarn run serve

# 编译项目
yarn run build

# Lints and fixes files
yarn run lint

系统效果

大屏模板

image.png

image.png

PC端

image.png

image.png

image.png

image.png

image.png

在线接口文档

image.png

image.png

报表

image.png

image.png

image.png

流程

image.png

image.png

image.png

image.png

手机端

image.png

PAD端

image.png

image.png

其他说明

  • 项目使用的 vue-cli3, 请更新您的 cli
  • 关闭 Eslint (不推荐) 移除 package.json 中 eslintConfig 整个节点代码
  • 修改 Ant Design 配色,在文件 vue.config.js 中,其他 less 变量覆盖参考 ant design 官方说明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
css复制代码  css: {
loaderOptions: {
less: {
modifyVars: {
/* less 变量覆盖,用于自定义 ant design 主题 */

'primary-color': '#F5222D',
'link-color': '#F5222D',
'border-radius-base': '4px',
},
javascriptEnabled: true,
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【SpringBoot】 基于 OpenSAML 30 搭

发表于 2021-03-13

认证场景

一个简单的场景,用户小明访问了 通过了用户域 A认证的,访问了 客户端 应用 A,此时客户端应用 A 中挂了一个 属于用户域 B 认证的 web 应用 B. 这种情况下,用户小明打开 应用 B 后,必然要重新在用户域 B 下重新登录认证,才能访问应用 B.

image-20210312084258273.png

为解决用户小明两次登录的困扰,这种情况下就要实现用户域 A 和用户域 B 的认证互信。于是用户域 A 和用户域 B 的决定双方基于 openSAML 3.0 协议实现联邦认证。

OpenSAML 3.0 认证过程

image-20210310211852653.png

用户发起请求

用户向 Service Provider(SP) 请求资源,SP 从而可以判断,用户的这次请求是否需要进行认证。可以建立起用户的访问会话。

SP 将用户请求重定向到 Identify Provider(Idp)

如果 SP 判断 用户发起的请求需要被认证,SP 会创建一个 AuthnRequest 对象,用来指定用户应如何身份验证要求的对象。AuthRequest 对象被编码,作为 HTTP URL 参数, 通过浏览器的重定向发送到 Idp.

用户被授权

Idp 解码 AuthnRequest 并且基于此对用户进行授权。

已被授权的用户被送往 SP

如果认证成功,Idp 会将授权信息关联到 SAML Artifact 具有唯一标识的对象中。Artifact 仍然是被编码,作为 URL 参数,并且通过 HTTP Redirect 被发送回 SP.

SP 请求授权信息

SP 创建一个 含有 Artifact 对象 ArtifactResolve 的对象。ArtifactResolve 通过使用 SOAP web service 被送到 Idp.

Idp 返回授权信息

Idp 接收到 ArtifactResolve 对象 并且解压出 Artifact. Idp 创建 内含授权信息的 ArtifactResponse 对象来响应 SOAP 请求,授权信息存在

SAML Assertion 中。

基于 OpenSAML 认证的方案

用户域 A 和 用户域 B 基于 SAML 3.0 协议实现联邦认证

当用户域 A 和 用户域 B 实现联邦认证后,当小明再次从应用 A 访问,应用 B 时,用户域 A 和 用户域 B 进行联邦认证,认证成功后,小明就可以实现免密登录 应用 B.

image-20210312091329967.png

用户域 A 和 用户域 B 联邦认证具体过程

image-20210312144449544.png

  1. 应用 A 携带用户域 A 颁发的令牌及 访问应用 B 的地址,发起对 用户域 A SP 服务进行访问。
  2. 用户域 A SP 服务,接收到应用 A 的请求后,到用户域 A 的认证服务进行令牌校验。
  3. 用户域 A 认证服务认证完成后,携带 SP 的签名及用户域 A 用户信息,访问 用户域 B Idp 服务。
  4. Idp 服务验证 SP 签名通过后,获取到 用户域 A 的 SP 服务携带的用户信息,到用户域 B 的认证服务进行认证。
  5. 认证完成之后,将用户域 B 的认证信息,携带用户域 B 的签名返回到 SP 服务。
  6. SP 验签成功后,根据 步骤一携带的 目标地址进行跳转,访问应用 B

基于 OpenSAML SpringBoot 实现

实现逻辑基于 open saml 认证过程
image-20210310211852653.png

Step 1: 用户请求的拦截

定义一个 AccessFilter, 拦截用户访并且根据缓存来判断是否需要开启 SAML 认证

1
2
java复制代码@WebFilter(urlPatterns = "/api/*")
public class AccessFilter implements Filter

关键逻辑为:

doFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) request;
HttpServletResponse httpServletResponse = (HttpServletResponse) response;
String userAccount = request.getParameter(Constants.USER_ACCOUNT_KEY);

//校验用户token 是否存在缓存
if (spConsumerCacheService.checkUserIdmTokenCache(userAccount)) {
chain.doFilter(request, response);
} else {
// 建立用户 session
setGotoURLOnSession(httpServletRequest);
// 重定向用户请求进行 SAML 认证
// 构建 SAML 认证请求 AuthnRequest
AuthnRequest authnRequest = buildAuthnRequest(request);
// 重定向用户请求到 SAML IDP Server
redirectUserWithRequest(httpServletResponse, authnRequest);
}
}

Step 2: 身份验证请求

构建 AuthnRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码private AuthnRequest buildAuthnRequest(HttpServletRequest request) {
//构建 AuthnRequest
AuthnRequest authnRequest = OpenSAMLUtils.buildSAMLObject(AuthnRequest.class);
//设置请求时间
authnRequest.setIssueInstant(new DateTime());
//The binding required to transmit the resulting SAML Assertion
//绑定协议为 urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact
authnRequest.setProtocolBinding(SAMLConstants.SAML2_ARTIFACT_BINDING_URI);
//IDP server 认证地址
authnRequest.setDestination(idpEndpoint.getSSOEndPoint());
//IDP 验证成功后 接收 SAML Assertion 地址
authnRequest.setAssertionConsumerServiceURL(idpEndpoint.getSpConsumer());
//Setting ID of the request
authnRequest.setID(OpenSAMLUtils.generateSecureRandomId());
//This is the identification of the sender
//发起人身份,SP ID
authnRequest.setIssuer(buildIssuer(request));
//NameID is the IDP identifier for the user
//the requested authentication context is the SP's requirements for the authentication,
// which includes; how the SP wants the IDP to authenticate the user
authnRequest.setRequestedAuthnContext(buildRequestedAuthnContext());
LOGGER.info("AuthnRequest Object is {}", authnRequest.toString());
return authnRequest;
}

Redirect Request 到 SAML IDP Server

创建 BasicSAMLMessageContext 对象,包含

  • AuthnRequest 对象
  • SubContext 对象:
+ BindingContext 对象 —— 设置中继状态值,SP 签名材料
+ PeerEntityContext 对象 —— EndPointContext 包含 idp endpoint 信息
+ SecurityParametersContext 对象 —— SignatureSigningParameters 包含 SP 的请求签名

通过 saml2 提供的 banding encoding 实现 HTTPRedirectDeflateEncoder,进行向 Idp 重定向

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码private void redirectUserWithRequest(HttpServletResponse httpServletResponse, AuthnRequest authnRequest) {

//BasicSAMLMessageContext was used to contain all the information about the SAML message
MessageContext context = new MessageContext();
// 存放 SAML authnRequest 请求
context.setMessage(authnRequest);

SAMLBindingContext bindingContext = context.getSubcontext(SAMLBindingContext.class, true);
// 中继状态值,进行制作 SP 访问的签名
bindingContext.setRelayState(Constants.IDENTIFY_RELAY_STATE);

//A context containing information about a peer entity
// in other words, the IdP for the SP and vice versa
SAMLPeerEntityContext peerEntityContext = context.getSubcontext(SAMLPeerEntityContext.class, true);
// peer entity itself does not contain much information, but usually contain one or more sub-context,
// such as SAMLEndpointContext,SecurityParametersContext,SAMLMessageInfoContext
// SAMLEndpointContext one of sub-context - this context contains information about a specific endpoint of the peer entity
SAMLEndpointContext endpointContext = peerEntityContext.getSubcontext(SAMLEndpointContext.class, true);
endpointContext.setEndpoint(getIPDEndpoint());

//携带签名
SignatureSigningParameters signatureSigningParameters = new SignatureSigningParameters();
signatureSigningParameters.setSigningCredential(SPCredentials.getCredential());
signatureSigningParameters.setSignatureAlgorithm(SignatureConstants.ALGO_ID_SIGNATURE_RSA_SHA256);
context.getSubcontext(SecurityParametersContext.class, true).setSignatureSigningParameters(signatureSigningParameters);

// SAML 2.0 HTTP Redirect encoder using the DEFLATE encoding method.
// This encoder only supports DEFLATE compression and DSA-SHA1 and RSA-SHA1 signatures.
HTTPRedirectDeflateEncoder encoder = new HTTPRedirectDeflateEncoder();
encoder.setMessageContext(context);
encoder.setHttpServletResponse(httpServletResponse);

try {
encoder.initialize();
} catch (ComponentInitializationException e) {
throw new RuntimeException(e);
}

LOGGER.info("AuthnRequest: ");
OpenSAMLUtils.logSAMLObject(authnRequest);

LOGGER.info("Redirecting to IDP");
try {
encoder.encode();
} catch (MessageEncodingException e) {
throw new RuntimeException(e);
}
}

Step 3: Idp 身份验证

Idp server 定义一个接口,接受 Sp 发起的请求,进行用户身份校验。

1
2
3
4
5
6
7
8
java复制代码/**
* 验证签名的逻辑在 SignatureVerifyFilter 中
*
* @param request
* @param response
*/
@GetMapping("/saml/idp/sso")
public void ssoSaml(HttpServletRequest request, HttpServletResponse response)

校验签名

定义 SignatureVerifyFilter 进行 Sp 服务签名的校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码/**
* <功能描述> idp sso 签名验证过滤器
*
* @date 2021/2/27 18:04
*/
@WebFilter(urlPatterns = "/saml/idp/sso")
public class SignatureVerifyFilter implements Filter {
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) request;
//获取 SP 签名
String authRequest = request.getParameter("SAMLRequest");
String signature = request.getParameter("Signature");
String signatureMethod = request.getParameter("SigAlg");
String relayState = request.getParameter("RelayState");
byte[] signatureBytes = Base64Support.decode(signature);

//获取 sigMaterial
URLBuilder urlBuilder = new URLBuilder(httpServletRequest.getRequestURL().toString());
List<Pair<String, String>> queryParams = urlBuilder.getQueryParams();
queryParams.clear();
queryParams.add(new Pair<>("SAMLRequest", authRequest));
queryParams.add(new Pair<>("RelayState", relayState));
queryParams.add(new Pair<>("SigAlg", signatureMethod));
String sigMaterial = urlBuilder.buildQueryString();

//进行签名验证
try {
boolean verified = XMLSigningUtil.verifyWithURI(SPCredentials.getCredential(), signatureMethod, signatureBytes, sigMaterial.getBytes(StandardCharsets.UTF_8));
if (!verified) {
throw new BizBaseException(ExceptionCode.SIGNATURE_VERIFIED_ERROR);
}
} catch (SecurityException e) {
throw new BizBaseException(ExceptionCode.SIGNATURE_VERIFIED_ERROR);
}
chain.doFilter(request, response);
}
}

解码 SP 发起的请求进行校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码// SAML 2.0 HTTP Redirect decoder using the DEFLATE encoding method. 
// This decoder only supports DEFLATE compression.
HTTPRedirectDeflateDecoder decoder = new HTTPRedirectDeflateDecoder();
BasicParserPool parserPool = new BasicParserPool();
try {
parserPool.initialize();
decoder.setParserPool(parserPool);
decoder.setHttpServletRequest(request);
decoder.initialize();
decoder.decode();
} catch (ComponentInitializationException | MessageDecodingException e) {
LOGGER.error("HTTP DECODE ERROR:{}", e.getMessage());
throw new BizBaseException(ExceptionCode.HTTP_DECODER);
}

//获取 SP 请求信息
MessageContext<SAMLObject> messageContext = decoder.getMessageContext();
AuthnRequest authnRequest = (AuthnRequest) messageContext.getMessage();

//获取 SP 发起者信息
IdentifyIssuerDTO issuerDTO = JacksonUtil.parse(authnRequest.getIssuer().getValue(), new TypeReference<IdentifyIssuerDTO>() {
});

//TODO 校验账号是否存在

重定向 Sp Consumer

指定 Artifact 重定向到 Sp Consumer

1
java复制代码resp.sendRedirect(SPConstants.ASSERTION_CONSUMER_SERVICE + "?SAMLart=AAQAAMFbLinlXaCM%2BFIxiDwGOLAy2T71gbpO7ZhNzAgEANlB90ECfpNEVLg%3D");

Step 4 & Step5: The Artifact and Artifact Resolution

定义 SP Consumer 接口

1
2
3
4
5
6
7
8
9
java复制代码/**
* Artifact 和 Artifact Resolution 处理逻辑在 ConsumerFilter
*
* @param userAccount
* @param request
* @return
*/
@GetMapping("/consumer")
public JsonResult<IdmToken> spAccessConsumer(@RequestParam(value = "userAccount") String userAccount, HttpServletRequest request)

**SP Consumer Filter **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {

HttpServletRequest req = (HttpServletRequest) request;
HttpServletResponse resp = (HttpServletResponse) response;

// SP 创建一个 ArtifactResolve 对象
// 附上 从 IDP Server 带上的 Artifact 对象
LOGGER.info("Artifact received");
Artifact artifact = buildArtifactFromRequest(req);
LOGGER.info("Artifact: " + artifact.getArtifact());

//从request session 上下文中获取 userAccount
String userAccount = (String) req.getSession().getAttribute(Constants.USER_ACCOUNT_KEY);
//构建 artifactResolve 对象
ArtifactResolve artifactResolve = buildArtifactResolve(artifact, userAccount);
LOGGER.info("Sending ArtifactResolve");
LOGGER.info("ArtifactResolve: ");
OpenSAMLUtils.logSAMLObject(artifactResolve);

//向 IDP 发送 ArtifactResolve 获取 Artifact 对象
//通过 web service
ArtifactResponse artifactResponse = sendAndReceiveArtifactResolve(artifactResolve, resp);
LOGGER.info("ArtifactResponse received");
LOGGER.info("ArtifactResponse: ");
OpenSAMLUtils.logSAMLObject(artifactResponse);

// 验证返回 ArtifactResponse 请求地址和请求时间
validateDestinationAndLifetime(artifactResponse, req);

// 获取存有用户信息的 Assertion 断言
EncryptedAssertion encryptedAssertion = getEncryptedAssertion(artifactResponse);
Assertion assertion = decryptAssertion(encryptedAssertion);

// 验证 IDP 签名
verifyAssertionSignature(assertion);
LOGGER.info("Decrypted Assertion: ");
OpenSAMLUtils.logSAMLObject(assertion);

//获取Assertion 用户属性
getAssertionAttributes(assertion, req);

chain.doFilter(req, resp);
}

Idp ArtifactResolution 关键逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码public void resolveArtifact(HttpServletRequest req, HttpServletResponse resp) {
LOGGER.info("recieved artifactResolve:");
//初始化 SOAP decoder
HTTPSOAP11Decoder decoder = new HTTPSOAP11Decoder();
decoder.setHttpServletRequest(req);
try {
BasicParserPool parserPool = new BasicParserPool();
parserPool.initialize();
decoder.setParserPool(parserPool);
decoder.initialize();
decoder.decode();
} catch (MessageDecodingException | ComponentInitializationException e) {
throw new BizBaseException(ExceptionCode.HTTP_SOAP11DECODER_ERROR);
}

//获取 artifactResolve 对象
ArtifactResolve artifactResolve = (ArtifactResolve) decoder.getMessageContext().getMessage();
LOGGER.info("ArtifactResolve:");
OpenSAMLUtils.logSAMLObject(artifactResolve);

//获取 SP 发起人身份
Issuer issuer = artifactResolve.getIssuer();
IdentifyIssuerDTO issuerDTO = JacksonUtil.parse(issuer.getValue(), new TypeReference<IdentifyIssuerDTO>() {
});
String userAccount = issuerDTO.getUserAccount();
String spConsumerUrl = issuerDTO.getConsumerUrl();

//构建 Artifact Response 对象
ArtifactResponse artifactResponse = buildArtifactResponse(userAccount, spConsumerUrl);
MessageContext<SAMLObject> context = new MessageContext<SAMLObject>();
context.setMessage(artifactResponse);

//发送 response 到 SP
HTTPSOAP11Encoder encoder = new HTTPSOAP11Encoder();
encoder.setMessageContext(context);
encoder.setHttpServletResponse(resp);
try {
encoder.prepareContext();
encoder.initialize();
encoder.encode();
} catch (MessageEncodingException | ComponentInitializationException e) {
throw new RuntimeException(e);
}
}

Step6: 处理结果

当整个 SAML 协议认证完成,SP 会允许用户向其访问的资源进行重定向。

1
2
3
4
5
6
7
8
9
10
11
java复制代码// 从上下文中获取用户访问的 target 地址
// 进行重定向
private void redirectToGotoURL(HttpServletRequest req, HttpServletResponse resp) {
String gotoURL = (String)req.getSession().getAttribute(SPConstants.GOTO_URL_SESSION_ATTRIBUTE);
logger.info("Redirecting to requested URL: " + gotoURL);
try {
resp.sendRedirect(gotoURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

参考资料

《A Guide To OpenSAML V3》 —— Stefan Rasmusson

[OpenSAML Sample Code](

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

为什么在做微服务设计的时候需要 DDD? 微服务的缺陷 DD

发表于 2021-03-13

记得之前在规划和设计微服务架构的时候,给了我一个至今依然记忆深刻的提示:『你的设计蓝图里为什么没有看到DDD的影子呢?』

随着对充血模型的领域认知的加深,我越加感觉到DDD的重要性。于是网上一顿海找,并做了学习笔记。

DDD内容繁多,个人浅见,它不同于传统贫血的最核心的一点就是把原先传统的贫血模型里的业务逻辑层拎出来,融入到Domain层,这样面对复杂业务的规模化变更,我们只需要专注于Domain即可。

回到主题,我们要了解的是微服务和DDD到底有什么关系呢?

因为在互联网时代,软件所面临的问题域比以往要复杂得多,这种复杂性来源于不断扩展的问题域自身,也来源于创新变化,以及这种规模性增长所带来的挑战。

然而一个人一个团队,他对复杂的事物的认知是有极限的,面对这种复杂问题唯一的方法就是分而治之。分主要考虑的是如何去分;治意味着分出来的每一个部分要能够独立的运行,能够互相的协作,完成整体的目标,能够一来应对外部变化所带来的冲击。

微服务的缺陷

微服务架构在分和治两个方面都给出了很好的理论指导和最佳实践,那微服务是不是解决复杂问题的银弹呢?其实不然,很多团队在应用了微服务架构来构建他们的系统以后,发现并没有完全解决这种复杂性问题,甚至还带来了一些其他的问题。比如:

  • 服务并没有解决复杂系统如何应对需求变化这个问题,甚至还加剧了这个问题。
  • 当一个需求变化了,需要花大量的精力去识别这个变化影响到了哪些微服务,这些服务的多个团队之间,需要通过无休止的扯皮去决定哪个服务多一些,哪些服务少改一些。
  • 然后测试团队还需要做昂贵的这种联调测试
  • 即便如此呢,开发团队依然不放心,还要通过一系列的开关控制,小心翼翼的去做切流,去做灰度发布。

从业务层面来看,微服务架构没有避免这种散弹式的修改。甚至反而加重了他,这是为什么呢?一个重要的原因是微服务架构在分的这个纬度考虑的并不全面。

DDD功用

当我们去做分的这种工作的时候,需要考虑哪些维度呢?我觉得我们至少要考虑三个维度:

  • 功能纬度
  • 质量纬度,比如性能,可用性
  • 工程纬度

微服务对第2个给出了很好的指导,对第3个也给出了一些建议。但是,对第1个功能纬度只给出来非常有限的指导,就是为什么随着微服务的流行,领域驱动设计(DDD)又被重新重视起来的原因。

DDD弥补了微服务在功能划分方面没有给出很好指导的缺陷。所以他们在面对复杂问题和构建系统时候是一种互补的关系,在系统拆分的时候可以很好的协作。

只是他们看待系统拆分这个角度是不同的。微服务当中的服务所关注的范围正是DDD所推崇的六边形架构中的领域层。

拆分案例

接下来结合DDD和微服务来拆分一个复杂系统。

关于领域

我们称企业的业务范围和在这个范围里进行的活动为领域,和软件系统无关。领域会分成多个子域,比如我们一个电商系统,会有:

  • 商品子域
  • 订单子域
  • 库存子域等等。

在不同的子域里,不同的概念有不同的含义。所以我们在进行领域建模的时候,必须要有一个明确的领域边界,也就是DDD里称做的限界上下文,它是系统内部的一个架构边界,决定了这个系统架构。

划分系统内部架构边界

架构简洁之道这本书里边就说过:『系统架构是由系统的内部架构边界以及边界之间的依赖关系所决定的,与系统中各个组件之间的通信和调用的方式是无关的』。我们常说的微服务的服务调用本身只是一种比函数调用方式成本稍高的,分割应用程序行为的一种形式,系统架构无关。

所以,复杂系统划分的第一重要的是要划分内部的架构边界,即划分清楚这个上下文,以及明确他们之间的关系,这对应于我们之前说的功能的维度。这正是DDD用武之处。其次我们才考虑基于非功能的维度如何划分,这是微服务能够发挥其优势的地方。

举个例子,我们把系统分成ABC三个个上下文,三个上下文的代码可以在一个部署单元里运行,通过进程内调用来完成操作,这就是典型的单体架构;

也可以各自在一个独立的部署单元里运行,通过远程调用来完成操作,这就是现在流行的微服务架构。

边界清晰的好处

我们更多的是两种架构模式的一个混合,比如A和B一起是一个部署单元,C是另外一个独立的部署单元,这种情况往往是因为C非常重要,他并发的访问量非常大,或者它的需求变更比较频繁。将C拆分出来的有以下几个好处:

  • 资源倾斜
  • 使用弹力设计模式:比如重试,熔断,降级
  • 使用特殊技术:比如Go语言
  • 具备独立代码库:有独立团队和运维人员,和A和B的运行期做到隔离不互相影响

这四点正是服务架构所关注的,它是基于非功能纬度的视角来看待拆分这件事情的,他关注的不是系统架构的逻辑边界,更多的关注的是应用程序行为的分隔。

那为什么不把A和B都拆成一个独立的部署单元?

这会带来更多的好处,也会带来额外的成本,架构应该是可以演进的,在业务发展的早期,应该关注系统架构的逻辑边界,保持逻辑边界的清晰和关系的正确,随着业务量的增加,逐步在做拆分,这是组合应用DDD和微服务架构带来的最大的好处。

在单体架构中,保持架构逻辑边界不被突破是有一定难度。如果逻辑边界不清晰,在需要服务器拆分的时候,就未必能拆得出来了。另外没有人一下子就可以把逻辑边界定义正确,即使这个上下文定义的不太正确,在DDD聚合根这个概念可以保障我们能够演进出更适合的上下文。

DDD界限上下文内部通过实体和值对象来对领域概念进行建模,一组实体和值子对象归属于一个聚合根。那按DDD要求

  • 聚合根用来保证内部实体规则的正确性和数据的一致性
  • 外部对象只能通过ID来引用聚合根,不能引用聚合根内部的实体
  • 聚合根之间不能共享一个数据库事务,它们之间的数据一致性需要通过最终的一致性来保障

有了聚合根,基于这些约束,未来可以根据需要把聚合根升级为上下文,甚至拆分成微服务都是比较容易的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 多线程 漫谈 CAS

发表于 2021-03-13

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . CAS 简介

什么是 CAS ?

CAS操作 —— Compare & Set ,或是 Compare & Swap


CAS 的操作步骤是什么 ? -> 先比较 , 再设置

jdk5增加了并发包java.util.concurrent.*,其下面的类使用CAS算法实现了区别于synchronouse同步锁的一种乐观锁。

  1. CAS是一种无锁算法,CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。
  2. 当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做

CAS 的效率 ?

  • CAS(比较并交换)是CPU指令级的操作,只有一步原子操作,所以非常快
  • CAS避免了请求操作系统来裁定锁的问题

CAS 的消耗?

一个8核CPU计算机系统,每个CPU有cache(CPU内部的高速缓存,寄存器),管芯内还带有一个互联模块,使管芯内的两个核可以互相通信


当存在 cache 和 数据不在一个域中时 ?

“最好情况”是指对某一个变量执行 CAS 操作的 CPU 正好是最后一个操作该变量的CPU,所以对应的缓存线已经在 CPU 的高速缓存中了


算法假想

1
2
3
4
JAVA复制代码do{   
       备份旧数据;  
       基于旧数据构造新数据;  
}while(!CAS( 内存地址,备份的旧数据,新数据 ))

二 . CAS 的缺陷

问题一 : ABA 问题

  1. 一个线程 one 从内存位置 V 中取出 A
  2. 另一个线程 two 也从内存中取出 A ,并且 two 进行了一些操作变成了 B
  3. two 又将 V 位置的数据变成 A ,这时候线程 one 进行 CAS 操作发现内存中仍然是 A
  4. one 操作成功。
  • 尽管线程 one 的 CAS 操作成功,但可能存在潜藏的问题。
  • 从 Java5 开始 JDK 的 atomic包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。
+ AtomicStampedReference 通过包装 [E,Integer] 的元组,来对对象标记版本戳 stamp

问题二 : 循环时间长开销大

对于资源竞争严重(线程冲突严重)的情况,CAS 自旋的概率会比较大,从而浪费更多的 CPU 资源,效率低于 synchronized。

问题三 : 只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。

三 . CAS 深入分析

在 CAS 中有三个参数:内存值 V、旧的预期值 A、要更新的值 B ,当且仅当内存值 V 的值等于旧的预期值 A 时,才会将内存值V的值修改为 B ,否则什么都不干

主要类 : Unsafe

Unsafe 是 CAS 的核心类 , 他提供了硬件级别得原子操作 (其他情况下Java 需要通过本地 Native 方法访问底层操作系统)

  • unsafe.objectFieldOffset
  • getAndAddInt -> compareAndSwapInt(Object var1, long var2, int var4, int var5)
    • 对以下四个值进行了比较判断 : 对象、对象的地址、预期值、修改值

CPU 的原子操作: CPU 提供了两种方法来实现多处理器的原子操作:总线加锁或者缓存加锁

总线加锁:

总线加锁就是就是使用处理器提供的一个 LOCK# 信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占使用共享内存。但是这种处理方式显得有点儿霸道,不厚道,他把CPU和内存之间的通信锁住了,在锁定期间,其他处理器都不能其他内存地址的数据,其开销有点儿大。所以就有了缓存加锁。

缓存加锁:

其实针对于上面那种情况,我们只需要保证在同一时刻,对某个内存地址的操作是原子性的即可。缓存加锁,就是缓存在内存区域的数据如果在加锁期间,当它执行锁操作写回内存时,处理器不再输出LOCK# 信号,而是修改内部的内存地址,利用缓存一致性协议来保证原子性。缓存一致性机制可以保证同一个内存区域的数据仅能被一个处理器修改,也就是说当 CPU1 修改缓存行中的 i 时使用缓存锁定,那么 CPU2 就不能同时缓存了 i 的缓存行。

CAS 主要实现得方式 :

  • AtomicInteger
    • addAndGet()

四 . CAS CPU 的查询操作

@ blog.csdn.net/youanyyou/a…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
JAVA复制代码     
// CPU 结构简述 :
- 每个CPU有cache(CPU内部的高速缓存,寄存器)
- 管芯内还带有一个互联模块,使管芯内的两个核可以互相通信
- 系统互联模块可以让四个管芯相互通信,并且将管芯与主存连接起来

// 数据流动
- 数据以“缓存线”为单位在系统中传输,“缓存线”对应于内存中一个 2 的幂大小的字节块,大小通常为 32 到 256 字节之间
- 当 CPU 从内存中读取一个变量到它的寄存器中时,必须首先将包含了该变量的缓存线读取到 CPU 高速缓存
- CPU 将寄存器中的一个值存储到内存时,不仅必须将包含了该值的缓存线读到 CPU 高速缓存,
还必须确保没有其他 CPU 拥有该缓存线的拷贝

// 转变流程
• CPU0 检查本地高速缓存,没有找到缓存线。
• 请求被转发到 CPU0 和 CPU1 的互联模块,检查 CPU1 的本地高速缓存,没有找到缓存线。
• 请求被转发到系统互联模块,检查其他三个管芯,得知缓存线被 CPU6和 CPU7 所在的管芯持有。
• 请求被转发到 CPU6 和 CPU7 的互联模块,检查这两个 CPU 的高速缓存,在 CPU7 的高速缓存中找到缓存线。
• CPU7 将缓存线发送给所属的互联模块,并且刷新自己高速缓存中的缓存线。
• CPU6 和 CPU7 的互联模块将缓存线发送给系统互联模块。
• 系统互联模块将缓存线发送给 CPU0 和 CPU1 的互联模块。
• CPU0 和 CPU1 的互联模块将缓存线发送给 CPU0 的高速缓存。
• CPU0 现在可以对高速缓存中的变量执行 CAS 操作了

//效率问题
最好情况 : 某一个变量执行 CAS 操作的 CPU 正好是最后一个操作该变量的CPU

五 . CAS 初级原理

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码// Java 里面 CAS 操作主要通过 Native 方法完成 , 主要的操作对象有 : Unsafe 

C- Unsafe
- 提供了硬件级别的原子操作
- 对于Unsafe类的使用都是受限制的,只有授信的代码才能获得该类的实例
M- public native long allocateMemory(long paramLong) :
M- public native long reallocateMemory(long paramLong1, long paramLong2) : 扩充内存
M- public native void freeMemory(long paramLong) : 释放内存


// 例如 AQS 里面 :
> AbstractQueuedSynchronizer
return unsafe.compareAndSwapObject(this, headOffset, null, update);

六 . CAS 深入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@ https://blog.csdn.net/qq_37113604/article/details/81582784

// .c 文件 sun.misc.Unsafe
public final native boolean compareAndSwapInt(Object o, long offset,int expected, int x);

// C 源码
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}

总结 :

  1. 程序会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。
  2. 如果程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(lock cmpxchg)。
  3. 反之,如果程序是在单处理器上运行,就省略lock前缀
    ?- (单处理器自身会维护单处理器内的顺序一致性,不需要lock前缀提供的内存屏障效果)。

附录 : Git 源码

源码

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 多线程 细说线程状态

发表于 2021-03-12

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

前言

这一篇说一说线程状态

一. 线程等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// 等待具体时间
> sleep(time)
// 该方式不释放锁 ,低优先级有机会执行
// sleep 后转入 阻塞(blocked)
> wait(time)
> join(time)
> LockSupport.parkNanos()
> LockSupport.parkUnit()
> yield
// 该方式同样不会释放锁 ,同优先级及高优先级执行
// 执行后转入ready

// 仅 进入等待
> wait()
> join()
> LockSuppot.park()

二. 线程通知

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码// 对于设定具体等待时间的 timeout 后自动转入就绪

// 其他等待
> notify()
> notifyAll()

> 不同线程之间采用字符串作为监视器锁,会唤醒别的线程
> 不同线程之间的信号没有共享,等待线程被唤醒后继续进入wait状态:



> 下图为不同线程的等待与唤醒

notify.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码> 执行wait () 时释放锁 , 否则等待的线程如果继续持有锁 , 其他线程就没办法获取锁 , 会陷入死锁

// Wait - Notify 深入知识点
// 一 : Wait 等待知识点
- 当前线程必须拥有这个对象的监视器


// 二 : Wait 等待后
- 执行等待后 , 当前线程将自己放在该对象的等待集中,然后放弃该对象上的所有同步声明
- 如果当前线程在等待之前或等待期间被任何线程中断 , 会抛出 InterruptedException 异常


// 三 : 唤醒时注意点
- Notify 唤醒一个正在等待这个对象的线程监控 (monitor)
- 执行 wait 时会释放锁 , 同时执行 notify 后也会释放锁 (如下图)
- notify 会任意选择一个等待对象来提醒

// 四 : 唤醒后知识点
- 线程唤醒后 , 仍然要等待该对象的锁被释放
- 线程唤醒后 , 将会与任何竞争该锁的对象公平竞争


// 假醒 :
线程也可以在不被通知、中断或超时的情况下被唤醒,这就是所谓的伪唤醒。

notify2.jpg

三. 线程中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码> interrupt() 
// 方法,用于中断线程。调用该方法的线程的状态为将被置为”中断”状态。

> interrupted ()
// 查询当前线程的中断状态,并且清除原状态。

> isInterrupted ()
// 查询指定线程的中断状态,不会清除原状态+

// interrupt() 方法干了什么 ?
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}

// 1 checkAccess() : 其中涉及到 SecurityManager , 所以我们先看看这个类干什么的
- SecurityManager security = System.getSecurityManager();
- security.checkAccess(this);

C- SecurityManager :
?- 这是 Java.lang 底下的一个类

四. 线程死锁

死锁简介 : 当多个进程竞争资源时互相等待对方的资源

死锁的条件 :

  • 互斥条件 : 一个资源每次只能被一个进程使用,即在一段时间内某 资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。
  • 请求与保持条件 :进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源 已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。
  • 不可剥夺条件 : 进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能 由获得该资源的进程自己来释放(只能是主动释放)。
  • 循环等待条件 : 若干进程间形成首尾相接循环等待资源的关系
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码// 资源的分类
- 可抢占资源 : 可抢占资源指某进程在获得这类资源后,该资源可以再被其他进程或系统抢占 , 例如 CPU 资源
- 不可抢占资源

// 死锁的常见原因 :
- 竞争不可抢占资源引起死锁 (共享文件)
- 竞争可消耗资源引起死 (程通信时)
- 进程推进顺序不当引起死锁

// 死锁的预防
- 通过系统中尽量破坏死锁的条件 , 当四大条件有一个不符合时 , 死锁就不会发生
- 通过加锁顺序处理(线程按照一定的顺序加锁)
- 加锁时限(线程尝试获取锁的时候加上一定的时限,超过时限则放弃对该锁的请求,并释放自己占有的锁)
- 死锁检测

// 死锁的解除
- 资源剥离 , 挂起死锁进程且强制对应资源 , 分配进程
- 撤销进程
- 回退进程

五. 线程热锁

1
2
java复制代码> 热锁不算一个物理概念 , 它表示线程在频繁的竞争资源并且资源在频繁的切换\
- 循环等待 :

六. 线程的状态及转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
JAVA复制代码> 线程有以下状态
- NEW : 尚未启动的线程的hread状态
- RUNNABLE : 可运行 , 从虚拟机的角度 , 已经执行 ,但是可能正在等待资源
- BLOCKED : 阻塞 , 此时等待 monitor锁 , 以读取 synchronized 代码
- WAITING : 等待状态 , 处于等待状态的线程正在等待另一个线程执行特定操作
- wait()
- join()
- LockSupport#park()
- TIMED_WAITING : 指定等待时间的等待
- Thread.sleep
- wait(long)
- join(long)
- LockSupport#parkNanos
- LockSupport#parkUntil
- TERMINATED : 终止线程

// 线程间状态改变的方式
• 还没起床:sleeping 。
• 起床收拾好了,随时可以坐地铁出发:Runnable 。
• 等地铁来:Waiting 。
• 地铁来了,但要排队上地铁:I/O 阻塞 。
• 上了地铁,发现暂时没座位:synchronized 阻塞。
• 地铁上找到座位:Running 。
• 到达目的地:Dead 。

状态的转换.jpg.png

xianc.png

七. 状态转换的原理

7.1 wait 与 notify 原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
java复制代码// 节点一 : 你是否发现 , wait 和 notify 是 object 的方法
点开 wait 和 notify 方法就能发现 , 这两个方法是基于 Object 对象的 , 所以我们要理解 ,通知不是通知的线程 ,而是通知的对象
这也就是为什么 , 不要用常量作为通知对象


// 节点二 : java.lang.IllegalMonitorStateException
当我们 wait/notify 时 , 如果未获取到对应对象的 Monitor , 实际上我们会抛出 IllegalMonitorStateException
所以你先要获得监视器 , 有三种方式 :
- 通过执行该对象的同步实例方法。
- 通过执行在对象上同步语句体。
- 对于类型为Class的对象,可以执行该类的同步静态方法。


// 节点三 : 如何进行转换 ?
Step 1 : 首先看 Object 对象 Native 方法 , bative src 中搜索名字即可

static JNINativeMethod methods[] = {
{"hashCode", "()I", (void *)&JVM_IHashCode},
{"wait", "(J)V", (void *)&JVM_MonitorWait},
{"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};

//可以看到这里分别调用了 JVM_MonitorWait , JVM_MonitorNotify , JVM_MonitorNotifyAll ,
//从名字就能看到 , 这里是和Monitor 有关的

Step 2 : 进入全路径了解 : \openjdk\hotspot\src\share\vm\prims
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
JVMWrapper("JVM_MonitorWait");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
JavaThreadInObjectWaitState jtiows(thread, ms != 0);
if (JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
// 当前线程已经拥有监视器,并且还没有添加到等待队列中,因此当前线程不能成为后续线程
}
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END

Step 3 : ObjectSynchronizer::wait(obj, ms, CHECK);
// TODO : 看不懂了呀....先留个坑
// 总得来说就是 ObjectMonitor通过一个双向链表来保存等待该锁的线程

Step End : link by @ https://www.jianshu.com/p/a604f1a9f875
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
...................
// 创建ObjectWaiter,添加到_WaitSet队列中
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag

//WaitSetLock保护等待队列。通常只锁的拥有着才能访问等待队列
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
//加入等待队列,等待队列是循环双链表
AddWaiter (&node) ;
//使用的是一个简单的自旋锁
Thread::SpinRelease (&_WaitSetLock) ;
.....................
}

7.2 Thread run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
java复制代码// 节点一 : 区别 run 和 start 
run 是通过方法栈直接调用对象的方法 , 而 Start 才是开启线程 , 这一点我们可以从源码中发现 :
- start 方法是同步的
- start0 是一个 native 方法
- group 是线程组 (ThreadGroup) , 线程可以访问关于它自己线程组的信息
?- 线程组主要是为了管理线程 , 将一个大线程分成多个小线程 (盲猜 fork 用到了 , 后面验证一下)
?- 线程组也可以通过关闭组来关闭所有的线程

public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);

boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}

// Step 1 : Thread.c 结构 -> openjdk\src\native\java\lang
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

// \openjdk\hotspot\src\share\vm\prims\jvm.cpp
// Step 2 : JVM_StartThread , 翻译了一下 , 大概可以看到那一句 native_thread = new JavaThread(&thread_entry, sz);
// 以及最后的 Thread::start(native_thread);
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;

//由于排序问题,在抛出异常时不能持有Threads_lock。示例:在构造异常时,可能需要获取heap_lock。
bool throw_illegal_thread_state = false;

// 我们必须释放Threads_lock才能在Thread::start中post jvmti事件
{
// 确保c++ Thread和OSThread结构体在操作之前没有被释放
MutexLocker mu(Threads_lock);

//从JDK 5开始threadStatus用于防止重新启动一个已经启动的线程,所以我们通常会发现javthread是null。然而,对于JNI附加的线程,在创建的线程对象(带有javthread集)和更新其线程状态之间有一个小窗口,因此我们必须检查这一点
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
//我们也可以检查stillborn标志来查看这个线程是否已经停止,但是由于历史原因,我们让线程在开始运行时检测它自己
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
//分配c++线程结构并创建原生线程。
//从java检索到的堆栈大小是有符号的,但是构造函数接受size_t(无符号类型),因此避免传递负值,因为这会导致非常大的堆栈。
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);
// 此时可能由于缺少内存而没有为javthread创建osthread。检查这种情况并在必要时抛出异常。
// 最终,我们可能想要更改这一点,以便只在线程成功创建时才获取锁——然后我们也可以执行这个检查并在javthread构造函数中抛出异常。
if (native_thread->osthread() != NULL) {
// 注意:当前线程没有在“prepare”中使用。
native_thread->prepare(jthread);
}
}
}

if (throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}

assert(native_thread != NULL, "Starting null thread?");

if (native_thread->osthread() == NULL) {
// No one should hold a reference to the 'native_thread'.
delete native_thread;
if (JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted(
JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
"unable to create new native thread");
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}

Thread::start(native_thread);

JVM_END

7.3 Thread yield

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码C- Thread
M- yield() : 可以看到 , yield 同样是一个 native 方法

// Step 1: \openjdk\hotspot\src\share\vm\prims\jvm.cpp
// 主要是2句 : os::sleep(thread, MinSleepInterval, false);
// os::yield();
JVM_ENTRY(void, JVM_Yield(JNIEnv *env, jclass threadClass))
JVMWrapper("JVM_Yield");
if (os::dont_yield()) return;
#ifndef USDT2
HS_DTRACE_PROBE0(hotspot, thread__yield);
#else /* USDT2 */
HOTSPOT_THREAD_YIELD();
#endif /* USDT2 */
// 当ConvertYieldToSleep关闭(默认)时,这与传统VM的yield使用相匹配。对于类似的线程行为至关重要
if (ConvertYieldToSleep) {
os::sleep(thread, MinSleepInterval, false);
} else {
os::yield();
}
JVM_END


// TODO : 主要的其实还没有看懂 , 毕竟 C基础有限

总结

很多东西还是没完全说清楚 , 后面还得继续完善

更新记录

  • 20210727 : 完善代码 , 优化格局

致谢

1
2
3
4
5
6
7
8
9
10
11
java复制代码芋道源码 : http://www.iocoder.cn/JUC/sike/aqs-3/

https://mp.weixin.qq.com/s?__biz=MzIxOTI1NTk5Nw==&mid=2650047475&idx=1&sn=4349ee6ac5e882c536238ed1237e5ba2&chksm=8fde2621b8a9af37df7d0c0a7ef3178d0253abf1e76a682134fce2f2218c93337c7de57835b7&scene=21#wechat_redirect

https://blog.csdn.net/javazejian/article/details/70768369

死磕系列 , 我的多线程导师
http://cmsblogs.com/?cat=151

// JVM 源码 C
https://www.jianshu.com/p/a604f1a9f875

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【翻译】Reactor Netty参考指南 - 4TCP客

发表于 2021-03-12

Reactor Netty参考指南目录


原文地址

Reactor Netty提供了易于使用、易于配置的TcpClient。它隐藏了创建TCP客户端所需的大部分Netty的功能,并增加了Reactive Streams背压。

4.1.连接和断开

要将TCP客户端连接到给定的端点,您必须创建并且配置一个TcpClient实例。默认情况下,host为localhost,post为12012。下面是创建一个TcpClient的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create() //<1>
> .connectNow(); //<2>
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 创建一个TcpClient实例用来进行的配置操作。

<2> 用阻塞的方式进行连接操作,并且等待它初始化完成。

返回的Connection对象提供了简单的连接相关的API,包括disposeNow(),调用这个方法会以阻塞的方式关闭客户端。

4.1.1.Host和Port

想要连接特定的host和port,您可以使用以下方式来配置TCP客户端。示例如下:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com") //<1>
> .port(80) //<2>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 配置TCP的host

<2> 配置TCP的port

4.2.预先初始化

默认情况下,TcpClient初始化资源的操作在需要使用的时候才进行。这意味着初始化加载的时候connect operation会占用额外的时间:

  • 事件循环组
  • 主机名解析器
  • native传输库(当使用了native传输的时候)
  • 用于安全性的native库(使用了OpenSsl的时候)

当您需要预加载这些资源的时候,您可以按照以下方式来配置TcpClient:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
> java复制代码import reactor.core.publisher.Mono;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> TcpClient tcpClient =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello")));
>
> tcpClient.warmup() //<1>
> .block();
>
> Connection connection = tcpClient.connectNow(); //<2>
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 初始化和加载事件循环组,主机名解析器,native传输库和用于安全性的native库

<2> 在连接远程节点的时候会进行主机名解析

4.3.写出数据

如果要发送数据到一个已有的端点,您必须添加一个I/O处理器。这个I/O处理器可以通过NettyOutbound来写出数据。

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> java复制代码import reactor.core.publisher.Mono;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 发送hello字符串给这个端点。

4.4.消费数据

如果要接收从已有端点发过来的数据,您必须添加一个I/O处理器。这个I/O处理器可以通过NettyInbound来读取数据。示例如下:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .handle((inbound, outbound) -> inbound.receive().then()) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 接收从已有端点发送过来的数据

4.5.生命周期回调

下面的生命周期回调用参数是提供给您用来扩展TcpClient的:

Callback Description
doAfterResolve 在成功解析远程地址之后调用。
doOnChannelInit 在初始化channel的时候调用。
doOnConnect 当channel将要连接的时候调用。
doOnConnected 当channel已经连接上的时候调用。
doOnDisconnected 当channel断开的时候被调用。
doOnResolve 当远程地址将要被解析的时候被调用。
doOnResolveError 在远程地址解析失败的情况下被调用。

下面是使用doOnConnected和doOnChannelInit回调的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
> java复制代码import io.netty.handler.logging.LoggingHandler;
> import io.netty.handler.timeout.ReadTimeoutHandler;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
> import java.util.concurrent.TimeUnit;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .doOnConnected(conn ->
> conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) //<1>
> .doOnChannelInit((observer, channel, remoteAddress) ->
> channel.pipeline()
> .addFirst(new LoggingHandler("reactor.netty.examples")))//<2>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 当一个channel连接上之后添加了一个ReadTimeoutHandler到Netty pipeline。

<2> 当初始化channel的时候添加了一个LoggingHandler到Netty pipeline。

4.6.TCP层的配置

这一章节描述了三种TCP层的配置方式:

  • Channel Options
  • Wire Logger
  • Event Loop Group

4.6.1.Channel Options

默认情况下,TCP客户端配置了以下options:

./../../reactor-netty-core/src/main/java/reactor/netty/tcp/TcpClientConnect.java

1
2
3
4
5
6
7
8
> java复制代码TcpClientConnect(ConnectionProvider provider) {
> this.config = new TcpClientConfig(
> provider,
> Collections.singletonMap(ChannelOption.AUTO_READ, false),
> () -> AddressUtils.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
> }
>
>

如果需要添加新的option或者修改已有的option,您可以使用如下的方式:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> java复制代码import io.netty.channel.ChannelOption;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

您可以通过以下的链接找到更多关于Nettychannel options的信息:

  • ChannelOption
  • Socket Options

4.6.2.Wire Logger

Reactor Netty提供了线路记录(wire logging)用来检查点对点的流量。默认情况下,线路记录是关闭的。如果想要开启它,您必须将日志reactor.netty.tcp.TcpClient的设置为DEBUG等级并且按如下方式进行配置:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .wiretap(true) //<1>
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 开启线路记录

默认情况下,线路记录在输出内容的时候会使用AdvancedByteBufFormat#HEX_DUMP。您也可以通过配置TcpClient改为AdvancedByteBufFormat#SIMPLE或者AdvancedByteBufFormat#TEXTUAL:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
> java复制代码import io.netty.handler.logging.LogLevel;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
> import reactor.netty.transport.logging.AdvancedByteBufFormat;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) //<1>
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 开启线路记录并使用AdvancedByteBufFormat#TEXTUAL来输出内容。

4.6.3.Event Loop Group

默认情况下,TCP客户端使用一个”Event Loop Group”,工作线程数等于初始化的时候可以用的处理器数量(但最小是4)。您也可以使用LoopResource#create其中的一个方法来修改配置。

默认的Event Loop Group配置如下:

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
> java复制代码/**
> * Default worker thread count, fallback to available processor
> * (but with a minimum value of 4)
> */
> public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
> /**
> * Default selector thread count, fallback to -1 (no selector thread)
> */
> public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
> /**
> * Default worker thread count for UDP, fallback to available processor
> * (but with a minimum value of 4)
> */
> public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
> /**
> * Default quiet period that guarantees that the disposal of the underlying LoopResources
> * will not happen, fallback to 2 seconds.
> */
> public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
> /**
> * Default maximum amount of time to wait until the disposal of the underlying LoopResources
> * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
> */
> public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
>
> /**
> * Default value whether the native transport (epoll, kqueue) will be preferred,
> * fallback it will be preferred when available
> */
> public static final String NATIVE = "reactor.netty.native";
>
>

如果需要修改这些设置,您也可以通过如下方式进行配置:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> java复制代码import reactor.netty.Connection;
> import reactor.netty.resources.LoopResources;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
>
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .runOn(loop)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

4.7.连接池

默认情况下,TCP客户端使用一个”固定的”的连接池,最大的channel数量是500,待处理队列中最大的获取注册请求数为1000(其余配置请查看下面的系统属性)。这意味着,如果有人尝试从池中获取一个channel,但是池中没有可用的channel则会创建一个新的channel。当池中的channel数量达到了最大值时,新的获取channel的操作会被延迟,直到一个可用的channel再次返回到池中。

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
> java复制代码/**
> * Default max connections. Fallback to
> * available number of processors (but with a minimum value of 16)
> */
> public static final String POOL_MAX_CONNECTIONS = "reactor.netty.pool.maxConnections";
> /**
> * Default acquisition timeout (milliseconds) before error. If -1 will never wait to
> * acquire before opening a new
> * connection in an unbounded fashion. Fallback 45 seconds
> */
> public static final String POOL_ACQUIRE_TIMEOUT = "reactor.netty.pool.acquireTimeout";
> /**
> * Default max idle time, fallback - max idle time is not specified.
> */
> public static final String POOL_MAX_IDLE_TIME = "reactor.netty.pool.maxIdleTime";
> /**
> * Default max life time, fallback - max life time is not specified.
> */
> public static final String POOL_MAX_LIFE_TIME = "reactor.netty.pool.maxLifeTime";
> /**
> * Default leasing strategy (fifo, lifo), fallback to fifo.
> * <ul>
> * <li>fifo - The connection selection is first in, first out</li>
> * <li>lifo - The connection selection is last in, first out</li>
> * </ul>
> */
> public static final String POOL_LEASING_STRATEGY = "reactor.netty.pool.leasingStrategy";
> /**
> * Default {@code getPermitsSamplingRate} (between 0d and 1d (percentage))
> * to be used with a {@link SamplingAllocationStrategy}.
> * This strategy wraps a {@link PoolBuilder#sizeBetween(int, int) sizeBetween} {@link AllocationStrategy}
> * and samples calls to {@link AllocationStrategy#getPermits(int)}.
> * Fallback - sampling is not enabled.
> */
> public static final String POOL_GET_PERMITS_SAMPLING_RATE = "reactor.netty.pool.getPermitsSamplingRate";
> /**
> * Default {@code returnPermitsSamplingRate} (between 0d and 1d (percentage))
> * to be used with a {@link SamplingAllocationStrategy}.
> * This strategy wraps a {@link PoolBuilder#sizeBetween(int, int) sizeBetween} {@link AllocationStrategy}
> * and samples calls to {@link AllocationStrategy#returnPermits(int)}.
> * Fallback - sampling is not enabled.
> */
> public static final String POOL_RETURN_PERMITS_SAMPLING_RATE = "reactor.netty.pool.returnPermitsSamplingRate";
>
>

如果您需要禁用连接池,您可以使用如下配置:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.newConnection()
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

如果您需要为连接池中的channel设置一个特定的空闲时间,您可以使用如下配置:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
> java复制代码import reactor.netty.Connection;
> import reactor.netty.resources.ConnectionProvider;
> import reactor.netty.tcp.TcpClient;
> import java.time.Duration;
>
> public class Application {
>
> public static void main(String[] args) {
> ConnectionProvider provider =
> ConnectionProvider.builder("fixed")
> .maxConnections(50)
> .pendingAcquireTimeout(Duration.ofMillis(30000))
> .maxIdleTime(Duration.ofMillis(60))
> .build();
>
> Connection connection =
> TcpClient.create(provider)
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

当您期望负载很高时,需要谨慎使用一个最大连接数很高的连接池。因为您可以会遇到reactor.netty.http.client.PrematureCloseException异常,其根本原因是太多并发连接的opened/acquired操作导致的”Connect Timeout”。

4.7.1.度量

池化的ConnectionProvider提供了与Micrometer的内建集成。它暴露了所有前缀为reactor.netty.connection.provider的度量。

池化的ConnectionProvider度量

度量名称 类型 描述
reactor.netty.connection.provider.total.connections Gauge 所有连接的数,包括活跃的和空闲的
reactor.netty.connection.provider.active.connections Gauge 已经被成功获取了并且正在使用的连接数
reactor.netty.connection.provider.idle.connections Gauge 空闲的连接数
reactor.netty.connection.provider.pending.connections Gauge 正在等待可用连接的请求数

下面是开启集成的度量的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
> java复制代码import reactor.netty.Connection;
> import reactor.netty.resources.ConnectionProvider;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> ConnectionProvider provider =
> ConnectionProvider.builder("fixed")
> .maxConnections(50)
> .metrics(true) //<1>
> .build();
>
> Connection connection =
> TcpClient.create(provider)
> .host("example.com")
> .port(80)
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 开启内建集成的Micrometer

4.8.SSL和TLS

当您需要使用SSL或者TLS的时候,可以使用下面列出来方式进行配置。默认情况,如果OpenSSL可用的话,则使用SslProvider.OPENSSL。否则使用SslProvider.JDK。可以通过SslContextBuilder或者设置-Dio.netty.handler.ssl.noOpenSsl=true来进行切换。

下面的是使用SslContextBuilder的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> java复制代码import io.netty.handler.ssl.SslContextBuilder;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
>
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(443)
> .secure(spec -> spec.sslContext(sslContextBuilder))
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

4.8.1.服务器名称标识

默认情况下,TCP客户端将远程主机名作为SNI服务器名发送。当您需要修改默认设置的时候,您可以通过如下方式配置TCP客户端:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
> java复制代码import io.netty.handler.ssl.SslContext;
> import io.netty.handler.ssl.SslContextBuilder;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> import javax.net.ssl.SNIHostName;
>
> public class Application {
>
> public static void main(String[] args) throws Exception {
> SslContext sslContext = SslContextBuilder.forClient().build();
>
> Connection connection =
> TcpClient.create()
> .host("127.0.0.1")
> .port(8080)
> .secure(spec -> spec.sslContext(sslContext)
> .serverNames(new SNIHostName("test.com")))
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

4.9.代理支持

TCP客户端支持Netty提供的代理功能,并通过ProxyProvider构建器提供了一种特定的”非代理主机”的方式。下面是使用ProxyProvider的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
> java复制代码import reactor.netty.Connection;
> import reactor.netty.transport.ProxyProvider;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .proxy(spec -> spec.type(ProxyProvider.Proxy.SOCKS4)
> .host("proxy")
> .port(8080)
> .nonProxyHosts("localhost"))
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

4.10.度量

TCP客户端支持与Micrometer的内置集成。它暴露了所有前缀为reactor.netty.tcp.client的度量。

下面的表格提供了TCP客户端度量的相关信息:

度量名称 类型 描述
reactor.netty.tcp.client.data.received DistributionSummary 收到的数据量,以字节为单位
reactor.netty.tcp.client.data.sent DistributionSummary 发送的数据量,以字节为单位
reactor.netty.tcp.client.errors Counter 发生的错误数量
reactor.netty.tcp.client.tls.handshake.time Timer TLS握手所花费的时间
reactor.netty.tcp.client.connect.time Timer 连接远程地址所花费的时间
reactor.netty.tcp.client.address.resolver Timer 解析远程地址花费的时间

下面额外的度量也是可用的:

池化的ConnectionProvider度量

度量名称 类型 描述
reactor.netty.connection.provider.total.connections Gauge 所有连接的数,包括活跃的和空闲的
reactor.netty.connection.provider.active.connections Gauge 已经被成功获取了并且正在使用的连接数
reactor.netty.connection.provider.idle.connections Gauge 空闲的连接数
reactor.netty.connection.provider.pending.connections Gauge 正在等待可用连接的请求数

ByteBufAllocator度量

度量名称 类型 描述
reactor.netty.bytebuf.allocator.used.heap.memory Gauge 堆内存的字节数
reactor.netty.bytebuf.allocator.used.direct.memory Gauge 堆外内存的字节数
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge 堆内存的个数(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge 堆外内存的个数(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge threadlocal的缓存数量(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge 微小缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge 小缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge 一般缓存的大小(当使用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge 一个区域的块大小(当使用PooledByteBufAllocator的时候)

下面是开启集成的度量的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .metrics(true) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 开启内建集成的Micrometer

如果您想让TCP客户端度量与除了Micrometer之外的系统集成或者想提供自己与Micrometer的集成来添加自己的度量记录器,您可以按如下方式实现:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
> java复制代码import reactor.netty.Connection;
> import reactor.netty.channel.ChannelMetricsRecorder;
> import reactor.netty.tcp.TcpClient;
>
> import java.net.SocketAddress;
> import java.time.Duration;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .metrics(true, CustomChannelMetricsRecorder::new) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
>
> }
>
>

<1> 开启TCP客户端度量并且提供ChannelMetricsRecorder的实现。

4.11.Unix域套接字

当使用本地传输时,TCP客户端支持Unix域套接字(UDS)。

下面是使用UDS的例子:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> java复制代码import io.netty.channel.unix.DomainSocketAddress;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .remoteAddress(() -> new DomainSocketAddress("/tmp/test.sock")) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 指定将使用的DomainSocketAddress

4.12.主机名解析

默认情况下,TcpClient使用Netty的域名查询机制来异步解析域名。用来替代JVM内置阻塞解析器。

当您需要修改默认设置的时候,您可以像如下通过配置TcpClient来实现:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
> java复制代码import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> import java.time.Duration;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .resolver(spec -> spec.queryTimeout(Duration.ofMillis(500))) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 解析器每次执行DNS查询的超时时间设置为500ms。

下面的列表展示了可用的配置:

配置名称 描述
cacheMaxTimeToLive DNS资源记录缓存的最大存活时间(单位:秒)。如果DNS服务器返回的DNS资源记录的存活时间大于这个最大存活时间。该解析器将忽略来自DNS服务器的存活时间,并使用这个最大存活时间。默认为Integer.MAX_VALUE
cacheMinTimeToLive DNS资源记录缓存的最小存活时间(单位:秒)。如果 DNS 服务器返回的 DNS 资源记录的存活时间小于这个最小存活时间,则此解析器将忽略来自DNS服务器的存活时间并使用这个最小存活时间。默认:0。
cacheNegativeTimeToLive DNS查询失败的缓存时间(单位:秒)。默认:0。
disableOptionalRecord 禁用自动包含的可选设置,该设置会试图告诉远程DNS服务器解析器每次响应可以读取多少数据。默认情况下,该设置为启用状态。
disableRecursionDesired 用于指定该解析器在查询DNS的时候会不会带上期望递归查询(RD)的标志,默认情况下,该设置为启用状态。
maxPayloadSize 设置数据报包缓冲区的容量(单位:字节)。 默认:4096
maxQueriesPerResolve 设置解析主机名允许发送的最大DNS查询次数。默认:16
ndots 设置在进行初始化绝对查询的时候,名称中必须出现的点的数量。默认值:-1(用于判断Unix操作系统的值,否则使用1)。
queryTimeout 设置该解析器每次DNS查询的超时时间(单位:毫秒)。默认:5000
resolvedAddressTypes 解析地址的协议族列表。
roundRobinSelection 启用DnsNameResolver的AddressResolverGroup,用于当命名服务器提供了多个地址的时候支持随机选择目标地址。参见RoundRobinDnsAddressResolverGroup。默认:DnsAddressResolverGroup。
runOn 在给定的LoopResources上执行与DNS服务器的通信。默认情况下,LoopResources只在客户端上被使用。
searchDomains 解析器的搜索域列表。默认情况下,有效搜索域列表是使用的系统DNS搜索域。
trace 在解析器在解析失败时生成详细的跟踪信息时使用的日志记录器和日志级别。

有时候,您或许想切换为JVM内建的解析器。您可以通过如下配置TcpClient的方式来实现:

github.com/reactor/rea…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> java复制代码import io.netty.resolver.DefaultAddressResolverGroup;
> import reactor.netty.Connection;
> import reactor.netty.tcp.TcpClient;
>
> public class Application {
>
> public static void main(String[] args) {
> Connection connection =
> TcpClient.create()
> .host("example.com")
> .port(80)
> .resolver(DefaultAddressResolverGroup.INSTANCE) //<1>
> .connectNow();
>
> connection.onDispose()
> .block();
> }
> }
>
>

<1> 设置为JVM内建的解析器。

Suggest Edit to “TCP Client“


Reactor Netty参考指南目录


版权声明:如需转载,请带上本文链接、注明来源和本声明。否则将追究法律责任。https://www.immuthex.com/posts/reactor-netty-reference-guide/tcp-client

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官:Netty的线程模型可不只是主从多Reactor这么

发表于 2021-03-12

笔者看来Netty的内核主要包括如下图三个部分:

在这里插入图片描述

其各个核心模块主要的职责如下:

  • 内存管理

主要提高高效的内存管理,包含内存分配,内存回收。

  • 网通通道

复制网络通信,例如实现对NIO、OIO等底层JAVA API 的封装,简化网络编程模型。

  • 线程模型

提供高效的线程协作模型。

大家不妨回想一下在以往的面试的过程中,面试官通常会问:Netty 的线程模型是什么?

主从多 Reactor 模型,相信大家都能脱口而出,然后呢?就没有然后了?

线程模型在网络通信中主要解决什么样的问题?在 Netty 中又是如何解决的,Netty 的线程模型为什么如此高效?请容我慢慢道来。

温馨提示:为了保证文章观点的严谨性,将探究领域锁定在:Netty NIO 相关。

1、主从多 Reactor 模型

主从多 Reactor 模型是业界一种非常经典的线程编程模型,其原理图如下所示:

在这里插入图片描述

我们首先简单介绍一下上图中涉及的几个重要角色:

  • Acceptor

请求接收者,在实践时其职责类似服务器,并不真正负责连接请求的建立,而只将其请求委托 Main Reactor 线程池来实现,起到一个转发的作用。

  • Main Reactor

主 Reactor 线程组,主要负责连接事件,并将IO读写请求转发到 SubReactor 线程池。当然在一些需要对客户端进行权限控制等场景下,权限校验的职责可以放到 Main Reactor 线程池,即 Main Reactor 也可以注册通道的读写事件,读取客户端权限校验相关的数据包,执行权限验证,权限验证通过后再将2通道注册到IO线程。

  • Sub Reactor

Main Reactor 通常监听客户端连接后会将通道的读写转发到 Sub Reactor 线程池中一个线程(负载均衡),负责数据的读写。在 NIO 中 通常注册通道的读(OP_READ)、写事件(OP_WRITE)。

为了更加深刻的理解主从 Reactor 模型,我们来看一下网络通讯一般会包含哪些关键动作:

在这里插入图片描述

一个网络交互通常的几个步骤如下:

  • 服务端启动,并在特定端口上监听,例如 web 应用的 80端口。
  • 客户端发起TCP的三次握手,与服务端建立连接,这里以 NIO 为例,连接成功建立后会创建NioSocketChannel对象。
  • 服务端通过 NioSocketChannel 从网卡中读取数据。
  • 服务端根据通信协议从二进制流中解码出一个个请求。
  • 根据请求,执行对应的业务操作,例如 Dubbo 服务端接受一个查询用户ID为1的用户信息。
  • 将业务执行结果返回到客户端,通常涉及到协议编码、压缩等。

线程模型需要解决的问题:连接监听、网络读写、编码、解码、业务执行这些操作步骤如何运用多线程编程,提升性能。

主从多Reactor模型是如何解决上面的问题呢?

  1. 连接建立(OP_ACCEPT)由 Main Reactor 线程池负责,创建NioSocketChannel后,将其转发给SubReactor。
  2. SubReactor 线程池主要负责网络的读写(从网络中读字节流、将字节流发送到网络中),即注册OP_READ、OP_WRITE,并且同一个通道会绑定一个SubReactor线程。
  3. 编码、解码、业务执行,则具体情况具体分析

通常编码、解码会放在IO线程中执行,而业务逻辑的执行通常会采用额外的线程池,但不是绝对的,一个好的框架通常会使用参数来进行定制化选择,例如 ping、pong 这种心跳包,直接在 IO 线程中执行,无需再转发到业务线程池,避免线程切换开销。

温馨提示:在网络编程中,通常将用于网络读写的线程称为IO线程。

2、Netty 的线程模型

Netty的线程模型是基于主从多Reactor模型。

在这里插入图片描述

Netty 中网络的连接事件(OP_ACCEPT)由Main Reactor 线程组实现,即 Boss Group,通常只需设置一个线程。

网络的读写操作由 Work Group ( Sub Reactor) 线程组来实现,线程的个数默认为 2 * CPU Core,一个 Channel 绑定到其中一个 Work 线程,一个 Work 线程中可以绑定多个 Channel。

在 Netty 中编码、解码等操作会被封装成一个一个事件处理器(ChannelHandler),那这些 Handler 是在IO线程池中执行?

默认情况下ChannelHandler 是在 IO 线程中执行,那如何改变默认行为呢?其关键代码如下:

在这里插入图片描述

关键点:在将事件处理器添加到事件链时可以指定在哪个线程池中执行,如果不指定则为IO线程中执行。

面试官:通常业务操作会专门开辟一个线程池,那业务处理完成之后,如何将响应结果通过 IO 线程写入到网卡中呢?

在这里插入图片描述

业务线程调用 Channel 对象的 write 方法并不会立即写入网络,只是将数据放入一个待写入队列(缓存区),然后IO线程每次执行事件选择后,会从待写入缓存区中获取写入任务,将数据真正写入到网络中,数据到达网卡之前会经过一系列的 Channel Handler(Netty事件传播机制),最终写入网卡。

最后再来介绍一下 Netty 中 IO 线程的大体工作流程。

在这里插入图片描述

IO线程处理的关键点:

  • 每一IO线程在执行上述操作时是串行执行的,即注册在一个 Selector(事件选择器)中的所有通道,**同一时间只有一个通道的事件被处理。**这也是为什么NIO应对大文件传输时不具备优势的根本原因。
  • IO 线程在处理完所有就绪事件后,还会从任务队列(Task Queue)获取任务,例如上文中提到的业务线程在执行完业务后需要将返回结果写入网络,Netty 中所有的网络读写操作只能在IO线程中真正获得运行,故业务线程需要将带写入的响应结果封装成 Task,放入到 IO 线程任务队列中。

3、总结

回到到主题,如果我们在面试过程中碰到面试官提问“Netty 的线程模型是什么?”时,我们应该可以从容应对了。

我觉得可以从如下几个方面进行展开。

  1. Netty 的线程模型基于主从多Reactor模型。通常由一个线程负责处理OP_ACCEPT事件,拥有 CPU 核数的两倍的IO线程处理读写事件。
  2. 一个通道的IO操作会绑定在一个IO线程中,而一个IO线程可以注册多个通道。
  3. 在一个网络通信中通常会包含网络数据读写,编码、解码、业务处理。默认情况下编码、解码等操作会在IO线程中运行,但也可以指定其他线程池。
  4. 通常业务处理会单独开启业务线程池,但也可以进一步细化,例如心跳包可以直接在IO线程中处理,而需要再转发给业务线程池,避免线程切换。
  5. 在一个IO线程中所有通道的事件是串行处理的。

欢迎加笔者微信号(dingwpmz),拉您如技术交流加群探讨,关注『中间件兴趣圈』回复**【PDF】**可获取海量学习资料。\

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

总算讲清DDD领域驱动设计中的领域事件(Domain Eve

发表于 2021-03-12

学习什么是领域事件.什么时候并且为什么要使用领域事件。
•学习如何将领域事件建模成对象,何时应该为领域事件创建唯一的身份标识。
•学习一个轻量级的发布-订阅[Gamma et al]模式。
•学习哪些组件用于发布事件,哪些组件用于订阅事件。
•学习为什么我们需要一个事件存储.如何实现事件存储、如何使用事件存储。
•学习S aaSOvation团队是如何通过不同的方式将领域事件发布给自治系统

1 何时、为什么使用领域事件?

1.1 定义

使用领域事件来建模发生在领域中的一些事情。这是一个功能强大的建模工具,让人爱不释手。
使用领域事件时,首先就是要对不同事件进行定义。

《领域驱动设计》一书中并未给出领域事件的定义。因为该模型是在该书出版后才被提出。
当前对领域事件的定义:领域专家所关心的发生在领域中的一些事件。
将领域中所发生的活动建模成一系列的离散事件。每个事件都用领域对象来表 示……领域事件是领域模型的组成部分,表示领域中所发生的事情。[Evans, Ref, P-20]
一

个领域事件将导致进一步的业务操作,在实现业务解耦的同时,还有助于形成完整的业务闭环。

领域事件可以是业务流程的一个步骤,比如一个事件发生后触发的后续动作,比如密码连续输错三次,触发锁定账户的动作。

1.2 识别领域事件

  • “如果发生……,则……”
  • “当做完……的时候,请通知……”(这里的通知本身并不能构成一个事件,而只是表明我们需要向外界发出通知)

在这些场景中,如果发生某种事件后,会触发进一步的操作,那么这个事件很可能就是领域事件。由于领域事件需要发布到外部系统,比如发布到另一个限界上下文。由于这样的事件由订阅方处理,它将对本地和远程上下文产生深远的影响。

那领域事件为什么要用最终一致性,而不是传统SOA的直接调用?

聚合的一个原则:一个事务中最多只能更改一个聚合实例。所以

  • 本地限界上下文中的其他聚合实例便可以通过领域事件的方式同步
  • 用于使远程依赖系统与本地系统保持一致。解耦本地系统和远程系统还有助于提高双方协作服务的可伸缩性。


聚合创建并发布事件。订阅方可以先存储事件,然后再将其转发到远程订阅方,或不经存 储,直接转发。除非MQ共享了模型的数据存储,不然即时转发需要XA(两阶段提交)。

考虑在系统非高峰时期,批处理过程通常进行一些系统维护工作,比如删除过期对象、创建新对象以支持新业务需求或通知用户所发生的重要事件。这样的批处理过程通常需复杂 查询且需庞大事务支持。若这些批处理过程存在冗余会怎么样?
系统中发生的每一件事情,我们都用事件形式捕获,然后将事件发布给订阅方处理,能简化系统吗?肯定的!它可消除先前批处理过程中的复杂查询,因为我们能够准确知道在何时发生何事,限界上下文也由此知道接下来应该做啥。在接收到领域事件时,系统可立即处理。原本批量集中处理的过程可以分散成许多粒度较小的处理单元,业务需求也由此更快满足,用户也可及时进行下一步操作。

领域事件驱动设计可切断领域模型之间的强依赖。
事件发布完成后,发布方不必关心后续订阅方事件处理是否成功,即可实现领域模型的解耦,维护领域模型的独立性和数据一致性。
在领域模型映射到微服务架构时,领域事件可解耦微服务,微服务间的数据不必要求强一致性,而是基于事件的最终一致性。

触发领域事件

领域事件由外部命令触发。触发命令可以是领域服务,也可以是实体的某一个方法或者行为。

触发事件的用法

走canal增量同步数据库数据,通过监听特定表的数据变更来触发生成事件的调用。如此有利于主流业务的解耦,提高维护和可读性。(具体生成事件的操作当然还是放在对应领域的微服务中,canal监听消费端可以理解为一个任务调度平台)。这样的实现逻辑相对简单。

那不同领域事件,如何处理呢?

3 处理领域事件

3.1 微服务内

领域事件发生在微服务内的聚合间,领域事件发生后完成事件实体的构建和事件数据持久化,发布方聚合将事件发布到事件总线,订阅方接收事件数据完成后续业务操作。

微服务内大部分事件的集成,都发生在同一进程,进程自身即可控制事务。但一个事件若同时更新多个聚合,按一次事务只更新一个聚合原则,可考虑引入事件总线。

微服务内应用服务,可通过跨聚合的服务编排和组合,以服务调用方式完成跨聚合访问,这种方式通常应用于实时性和数据一致性要求高的场景。这个过程会用到分布式事务,以保证发布方和订阅方的数据同时更新成功。

在微服务内,不是说少用领域事件,而是推荐少用事件总线。DDD是以聚合为单位进行数据管理,若一次操作会修改同一微服务内的多个聚合的数据,就需保证多个聚合的数据一致性。
为了解耦不同聚合,需采用分布式事务或事件总线,而事件总线不太方便管理服务和数据的关系,可用类似saga之类的分布式事务技术。总之需确保不同聚合的业务规则和数据一致性。

3.2 微服务间

跨微服务的领域事件会在不同限界上下文或领域模型间实现业务协作,主要为解耦,减轻微服务间实时服务访问压力。

领域事件发生在微服务间较多,事件处理机制也更复杂。跨微服务事件可推动业务流程或数据在不同子域或微服务间直接流转。

跨微服务的事件机制要总体考虑事件构建、发布和订阅、事件数据持久化、MQ,甚至事件数据持久化时还可能需考虑引入分布式事务。

微服务间访问也可采用应用服务直接调用,实现数据和服务的实时访问,弊端就是跨微服务的数据同时变更需要引入分布式事务。分布式事务会影响系统性能,增加微服务间耦合,尽量避免使用。

5 领域事件设计

5.1 构建和发布

基本属性

至少包括如下:

  • 事件唯一标识(全局唯一,事件能够无歧义在多个限界上下文中传递)
  • 发生时间
  • 事件类型
  • 事件源

即主要记录事件本身以及事件发生背景的数据。

业务属性

记录事件发生那刻的业务数据,这些数据会随事件传输到订阅方,以开展后续业务操作。

事件基本属性和业务属性一起构成事件实体,事件实体依赖聚合根。领域事件发生后,事件中的业务数据不再修改,因此业务数据可以以序列化值对象的形式保存,这种存储格式在消息中间件中也比较容易解析和获取。

为保证事件结构的统一,通常创建事件的基类,子类可自行继承扩展。由于事件没有太多业务行为,实现一般比较简单。

事件发布前需先构建事件实体并持久化。
事件实体的业务数据推荐按需发布,避免泄露不必要业务信息。

事件发布方式

  • 可通过应用服务或者领域服务发布到事件总线或MQ
  • 也可从事件表中利用定时程序或数据库日志捕获技术获取增量事件数据,发布到MQ

5.2 事件数据持久化

意义

  • 系统之间数据对账
  • 实现发布方和订阅方事件数据的审计

当遇到MQ、订阅方系统宕机或网络中断,在问题解决后仍可继续后续业务流转,保证数据一致性。
毕竟虽然MQ都有持久化功能,但中间过程或在订阅到数据后,在处理之前出问题,需要进行数据对账,这样就没法找到发布时和处理后的数据版本。关键的业务数据推荐还是落库。

实现方案

  • 持久化到本地业务DB的事件表,利用本地事务保证业务和事件数据的一致性
  • 持久化到共享的事件DB。业务、事件DB不在同一DB,它们的数据持久化操作会跨DB,因此需分布式事务保证业务和事件数据强一致性,对系统性能有影响

5.3 事件总线(EventBus)

意义

实现同一微服务内的聚合之间的领域事件,提供事件分发和接收等服务。
是进程内模型,会在微服务内聚合之间遍历订阅者列表,采取同步或异步传递数据。

因为在微服务内部在同一个进程,事件总线相对好配置,它可以配置为异步的也可以配置为同步的。如果是同步就不需要落库。推荐少用微服务内聚合之间的领域事件,它会增加开发复杂度。
而微服务之间的事件,在事件数据落库后,通过应用服务直接发布到MQ。

事件分发流程

  • 若是微服务内的订阅者(其它聚合),则直接分发到指定订阅者
  • 微服务外的订阅者,将事件数据保存到事件库(表)并异步发送到MQ
  • 同时存在微服务内和外订阅者,则先分发到内部订阅者,将事件消息保存到事件库(表),再异步发送到MQ

5.4 MQ

跨微服务的领域事件大多会用到MQ,实现跨微服务的事件发布和订阅。
虽然MQ自身有持久化功能,但中间过程或在订阅到数据后,在处理之前出问题,需要进行数据对账,这样就没法找到发布时和处理后的数据版本。关键的业务数据推荐还是落库。

5.5 接收&&处理

微服务订阅方在应用层采用监听机制,接收MQ中的事件数据,完成事件数据的持久化后,就可以开始进一步的业务处理。领域事件处理可在领域服务中实现。

  • 事件是否被消费成功(消费端成功拿到消息或消费端业务处理成功),如何通知消息生产端?
    因为事件发布方有事件实体的原始的持久化数据,事件订阅方也有自己接收的持久化数据。一般可以通过定期对账的方式检查数据的一致性。
  • 在采取最终一致性的情况下,事件消费端如果出现错误,消费失败,但之前的业务都成功了,虽然记录了event dB,但后续如何处理,人工介入吗?如果人工介入再解决,前端用户会不会看到数据不一致,体验不好?
    失败的情况应该比例是很少的。失败的信息可采用多次重试,如果这个还解决不了,只能将有问题的数据放到一个问题数据区,人工解决。当然要确保一个前提,要保证数据的时序性,不能覆盖已产生的数据。

一般发布方不会等待订阅方反馈结果。发布方有发布的事件表,订阅方有消费事件表,可采用对账方式发现问题数据。

管理

大型系统的领域事件有很多:

  • 做好源端和目的端数据的对账处理,发现并识别处理过程中的异常数据
    异步的方式一般都有源端和目的端定期对账的机制。比如采用类似财务冲正的方式。如果在发布和订阅之间事件表的数据发现异步数据有问题,需要回退,会有相应的代码进行数据处理,不过不同的场景,业务逻辑会不一样,处理的方式会不一样。有的甚至还需要转人工处理。
  • 发现异常数据后,要有相应的处理机制
  • 选择适合自己场景的技术,保证数据正确传输

6 总结

领域事件在设计时我们要重点关注领域事件,用领域事件来驱动业务的流转,尽量采用基于事件的最终一致,降低微服务之间直接访问的压力,实现微服务之间的解耦,维护领域模型的独立性和数据一致性。

领域事件驱动机制可实现一个发布方N个订阅方的模式,这在传统的直接服务调用设计中基本是不可能做到的。

领域事件 V.S CQRS

CQRS主要是想读写分离,将没有领域模型的查询功能,从命令中分离出来。领域事件主要目的还是为了微服务解耦,在连续的业务处理过程中,以异步化的方式完成下一步的业务处理,降低微服务之间的直连。
它们的共同点就是通过消息中间件实现从源端数据到目的端数据的交互和分离。

如果你就是不想用领域事件,聚合之间还可以通过应用层来协调和交互。应用服务是所有聚合之上的服务,负责服务的组合和编排,以及聚合之间的协调。

参考

  • 《实现领域驱动设计》
  • 《领域驱动设计》

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…706707708…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%