开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

python 进程(池)、线程(池)

发表于 2020-09-24

进程、多进程、进程池

进程总概述

进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
python3复制代码from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

多进程(进程池创建)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
python3复制代码from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(3)
for i in range(4):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

解析:
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
Parent process 87461.
Waiting for all subprocesses done…
Run task 0 (87462)…
Run task 1 (87463)…
Run task 2 (87464)…
Task 1 runs 1.66 seconds.
Run task 3 (87463)… —————–> task3在某个进程结束时,在创建
Task 2 runs 2.33 seconds.
Task 0 runs 2.54 seconds.
Task 3 runs 2.83 seconds.
All subprocesses done.

进程之间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
python3复制代码from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

线程总概述

线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python3复制代码import time, threading

# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

线程锁-线程安全(操作同一个变量)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
python3复制代码balance = 0
lock = threading.Lock()

def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
global balance
balance = balance + n
balance = balance - n
finally:
# 改完了一定要释放锁:
lock.release()

线程池创建

ThreadPoolExecutor实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
python复制代码from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
'''
Handle a client connection
'''
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')
sock.close()

def echo_server(addr):
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

手动创建你自己的线程池, 通常可以使用一个Queue来轻松实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
python3复制代码from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
'''
Handle a client connection
'''
sock, client_addr = q.get()
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')

sock.close()

def echo_server(addr, nworkers):
# Launch the client workers
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()

# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))
echo_server(('',15000), 128)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Deprecated】数据结构 并查集 & 联合 -

发表于 2020-09-24

提示

这篇文章是在 2020 年写的,最近我对整篇文章重写并增加了新的内容,你直接看 如何使用并查集解决朋友圈问题?

⭐️ 本文已收录到 AndroidFamily,技术和职场问题,请关注公众号 [彭旭锐] 和 [BaguTree Pro] 知识星球提问。

前言

  • 并查集是一种非常适用于处理 动态连通 问题的数据结构,在面试中比较冷门,建议应试者合理安排学习时间;
  • 在这篇文章里,我将梳理并查集的基本知识 & 常考题型。如果能帮上忙,请务必点赞加关注,这真的对我非常重要。

系列文章

  • 《算法面试题 | 链表问题总结》
  • 《算法面试题 | 链表相交 & 成环问题》
  • 《算法面试题 | 回溯算法解题框架》
  • 《数据结构面试题 | 并查集 & 联合 - 查找算法》
  • 《数据结构面试题 | 二叉树基础》

延伸文章

  • 算法面试题 | 高楼丢鸡蛋(源自谷歌面试题)
题目 提示 & 题解
990. 等式方程的可满足性 Satisfiability of Equality Equations 【题解】
547. 朋友圈 Friend Circles 【题解】
684. 冗余连接 Redundant Connection 【题解】
685. 冗余连接 II Redundant Connection II
1319. 连通网络的操作次数 Number of Operations to Make Network Connected 【题解】
399.除法求值 Evaluate Division
952. 按公因数计算最大组件大小Largest Component Size by Common Factor
130. 被围绕的区域 Surrounded Regions
128. 最长连续序列Longest Consecutive Sequence
721. 账户合并 Accounts Merge
765. 情侣牵手Couples Holding Hands

目录

  1. 定义

并查集还有多个相同的名字:不相交集合(Disjoint Sets)、合并-查找集合(Merge-find Set)、联合-查询数据结构(Union-find Data Structure)、联合-查询算法(Union-find algorithm),从这么多的相似概念中,我们可以体会到并查集解决的问题,即:处理不相交集合的合并和查询。

具体来说,用于一对元素是否相连。因为元素之间的关系并不是一开始就可以确定的,所以需要一边查询一边连接(合并),这种问题叫做动态连通性问题。举个例子,990. Satisfiability of Equality Equations 等式方程的可满足性

引用自 LeetCode

  1. 并查集的实现

并查集在底层实现可以是数组,也可以是散列表,不管使用什么底层实现,关键在于能表示一个对应关系,即:key 或下标表示了节点本身,而 value 表示顶点的父亲节点,初始化时指向自己。

一般来说,当节点总个数固定不变 / 已知时,使用数组,否则使用散列表。例如在 990. Satisfiability of Equality Equations 等式方程的可满足性 这道题中,节点是已知的 26 个字母,此时使用数组即可;在 684. Redundant Connection 冗余连接 这道题中,节点个数是未知的,此时使用散列表更合适。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
kotlin复制代码基于数组
class UnionFind(n: Int) {
1. 初始时,节点指向自己
val parent = IntArray(n) { it }
...
}
-------------------------------------------------------
基于散列表
class UnionFind() {
val parent = HashMap<Int, Int>()

fun find(x: Int): Int {
1. 首次查询时,添加到散列表,并指向自己
if (null == parent[x]) {
parent[x] = x
}
var key = x
while (parent[key] != key) {
parent[key] = parent[parent[key]]!!
key = parent[key]!!
}
return key
}
}

提示
这里为不熟悉 Kotlin 的同学解释一下,IntArray(n) { it }其实是 IntArray(n){ index -> index },即:创建了一个数组,数组每个位置上的值为下标的值。

并查集的物理与逻辑实现
可以看到,并查集在物理实现上基于数组或散列表,从逻辑上就是若干棵树组成的森林,每个节点都持有父节点的引用。

  1. 并查集的两个操作

并查集使用根节点来代表一个集合,这种方法叫做 代表元法,根节点就是代表元。在此基础上,并查集实现了两个基本操作:合并 & 查询

  • 合并操作
    合并操作就是将一个节点的的父节点指向另一个的根节点
  • 查询操作
    查询操作就是查询节点的根节点,如果两个节点的根节点相同,那么表示在一个集合中(相连)。

例如,使用Union(x,y)来合并两个节点,而Find(x)查询元素的根节点(代表元)。下面是一个最基本的并查集实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
kotlin复制代码private class UnionFind(n: Int) {

1. 初始时,节点指向自己
val parent = IntArray(n) { it }

2. 查询
fun find(x: Int): Int {
var key = x
while (key != parent[key]) {
key = parent[key]
}
return key
}

3. 合并
fun union(x: Int, y: Int) {
val rootX = find(x)
val rootY = find(y)
parent[rootY] = rootX
}

4. 判断连通性
fun isConnected(x: Int, y: Int): Boolean {
return find(x) == find(y)
}
}

并查集的合并操作

  1. 连通问题 & 路径问题

并查集专注于解决连通问题,即两个元素是否相连,不关心元素之间的路径 & 距离;而路径问题往往需要找出连接两个元素的路径甚至是最短路径,这就超出了并查集的处理范围。

举个例子,给定一系列航班信息,北京——上海,上海——苏州,苏州——杭州,杭州——北京,问是否存在北京到苏州的路径,这就是连通问题,而问北京到苏州的最短路径,这就是路径问题。

关于并查集的 连通分量,表示的是整个并查集中独立的集合(或树)的个数。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码private class UnionFind(n: Int) {

1. 初始时,节点指向自己
val parent = IntArray(n) { it }

2. 连通分量计数
var count = n

3. 合并
fun union(x: Int, y: Int) {
val rootX = find(x)
val rootY = find(y)
if(rootX == rootY){
return
}
// 合并后,连通分量减一
parent[rootY] = rootX
count --
}
...
}

连通分量就是集合个数

  1. 并查集的优化

前面说到并查集逻辑上是树的数据结构,既然是树的结构,性能就与树的高度有关。极端情况下,会出现树的高度为 n 的情况(例如 union(6,7).union(5,6)、union(4,5,)、..),此时,查询的复杂度将退化到O(n)

在并查集里,有两类针对树高度的优化:路径压缩 & 按秩合并:

5.1 路径压缩(Path compression)

指在查询的过程中,更改父节点的引用使得树的高度更低,具体分为 隔代压缩 & 完全压缩。

  • 隔代压缩:将父节点指向父节点的父节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kotlin复制代码1. 非递归写法
fun find(x: Int): Int {
var key = x
while (parent[key] != key) {
parent[key] = parent[parent[key]] // 关键
key = parent[key]
}
return key
}
-------------------------------------------------------
2. 递归写法
fun find(x: Int): Int {
var key = x
if (parent[key] != key) {
parent[key] = find(parent[key])
}
return parent[key] // 关键
}

提示
这里为不熟悉 Kotlin 的同学解释一下,为什么要加 var key = x 呢?因为 Kotlin 中函数形参是不可变变量。

  • 完全压缩:直接将父节点指向根节点

隔代压缩 & 完全压缩

5.2 按秩(Rank)合并

指在合并的过程中,将高度更低的树根节点指向高度更高的根节点,避免合并以后树的高度增加。为了表示树的高度,使用 rank 数组,表示以第 i 个节点为根的树的高度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
kotlin复制代码class UnionFind(n: Int) {
1. 根节点的高度
private val rank = IntArray(n) { 1 }
2. 查询(未使用路径压缩)
fun find(x: Int): Int {
var key = x
while (key != parent[key]) {
key = parent[key]
}
return key
}
3. 按秩合并
fun union(key1: Int, key2: Int) {
val root1 = find(key1)
val root2 = find(key2)

if(rank[root1] > rank[root2]){
parent[root2] = root1
}else if(rank[root2] > rank[root1]){
parent[root1] = root2
}else{
parent[root1] = root2
rank[root1] ++
// 或
// parent[root2] = root1
// rank[root2] ++
}
}
}

按秩合并

5.3 小结

需要记住的是:只使用一种优化方式,查询的时间复杂度是O(lgn)O(lgn)O(lgn),如果同时使用路径压缩和按秩合并,查询的时间复杂度接近O(1)O(1)O(1)(反阿克德函数),可以说是相当优秀了。通常情况下,路径压缩和按秩合并使用一个即可。

  1. 总结

  • 应试建议
    1、并查集比较冷门,建议应试者合理安排学习时间,在优化并查集时,一般路径压缩和按秩合并使用一个即可。隔代压缩实现简单,也很好记,建议使用。
    2、着重理解集合的 代表元法的含义、合并 & 查询操作、优化方法。

参考资料

  • 《并查集》 —— LeetCode 出品
  • 《第 12 章 并查集》 —— liweiwei 著
  • 《等式方程的可满足性》 —— LeetCode 出品

推荐阅读

  • 密码学 | Base64是加密算法吗?
  • Java | 带你理解 ServiceLoader 的原理与设计思想
  • Java | 这是一篇全面的注解使用攻略(含 Kotlin)
  • Java | 反射:在运行时访问类型信息(含 Kotlin)
  • Android | 面试必问的 Handler,你确定不看看?
  • Android | 带你理解 NativeAllocationRegistry 的原理与设计思想
  • 计算机组成原理 | Unicode 和 UTF-8是什么关系?
  • 计算机组成原理 | 为什么浮点数运算不精确?(阿里笔试)

我是小彭,带你构建 Android 知识体系。技术和职场问题,请关注公众号 [彭旭锐] 私信我提问。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

想要搭建个论坛?Guide哥调研了100来个 Java 开源

发表于 2020-09-23

大家好!我是 Guide 哥,Java 后端开发。一个会一点前端,喜欢烹饪的自由少年。

最近有点小忙。但是,由于前几天答应了一位读者自己会推荐一些开源的论坛系统,所以,昨晚就简单地熬了个夜,对比了很多个开源论坛系统之后,总结成了这篇文章。

这篇文章我一共推荐了 5 个论坛类开源项目,除了有 1 个是基于 PHP 开发之外,其他都是基于 Java ,并且大部分都是基于 Spring Boot 这个主流框架来做的。

欢迎小伙伴们在评论区补充啊!ღ( ´・ᴗ・` )比心

  1. NiterForum

  • Github 地址:github.com/yourkevin/N…
  • 官网地址:niter.cn/forum
  • Star : 0.5k
  • 简介:尼特社区-NiterForum-一个论坛程序,几乎具有一个论坛/社区所应该有的全部功能-后端 Springboot/MyBatis/Maven/MySQL-前端 Thymeleaf/Layui-可供初学者,学习、交流使用。
  • 技术栈: 后端 Springboot + MyBatis + Maven + MySQL 前端 Thymeleaf + Layui
  • 推荐等级 :⭐⭐⭐⭐⭐
  • 评价:可以说 NiterForum 提供了一个论坛所能提供的所有功能,功能特性覆盖的非常全面。但这并不是这次推荐他的主要原因。作为本次论坛项目中第一个推荐的项目,NiterForum 的 NB 之处就是:他提供 NiterApp,完美适配了 NiterForum,支持 app 端扫码登录!

  1. Symphony

  • Github 地址:github.com/88250/symph…
  • 官网地址:ld246.com/
  • Star : 0.7k
  • 简介:🎶 一款用 Java 实现的现代化社区(论坛/问答/BBS/社交网络/博客)系统平台。
  • 技术栈: Latke (作者自研的以 JSON 为主的 Java Web 框架)+jsoup + Jodd
  • 推荐等级 :⭐⭐⭐⭐
  • 评价:讲真,Symphony 是笔者目前见过的论坛项目中功能最齐全的一款(没有之一),满足多维需求:面向内容、面向知识问答、面向用户分享、交友、游戏等。而且 Symphony 风格时尚,充满创新、好玩的特性。交互体验一级棒。这个项目的缺点也很明显,那就是项目使用的技术栈不是主流,比较小众(不过,作者自研 Java Web 框架的精神还是非常值得赞赏的!)。

sym

  1. 码问社区

  • Github 地址:github.com/codedrinker…
  • 官网地址:www.mawen.co/?sort=hot
  • Star : 1.1k
  • 简介:开源论坛、问答系统,现有功能提问、回复、通知、最新、最热、消除零回复功能。
  • 技术栈:SpringBoot + MyBatis+MySQL/H2+Flyway
  • 推荐等级 :⭐⭐⭐⭐⭐
  • 评价:码问社区的作者是阿里巴巴的一位大佬,开源了很多有意思的项目,码问社区就是其中一款,采用 SpringBoot + Vue 等主流技术栈打造,并配有整个开发过程的视频讲解。实战项目首推。

  1. MDclub

  • Github 地址:github.com/zdhxiong/md…
  • 官网地址:community.mdclub.org/
  • Star : 0.5k
  • 简介:MDClub 漂亮、轻量且好用,它能让在线讨论变得更加轻松愉悦
  • 技术栈:PHP+MySQL
  • 推荐等级 :⭐⭐⭐⭐
  • 评价 :MDclub 是一款简约风格的论坛项目。漂亮、轻量且容易上手。代码实现基于 MDUI 框架,分层分明。网站适配多种终端,从手机、ipad 到大屏显示器,均能自动适配,并且提供根据操作系统的主题,自动切换亮色主题和暗色主题。这个特性真的超赞的~

mdclub.png

  1. 朋也社区

  • Github 地址:github.com/tomoya92/py…
  • 官网地址:tomoya92.github.io/pybbs/
  • Star : 1.1 k
  • 简介:更实用的 Java 开发的社区(论坛)
  • 技术栈:Spring-Boot + Mybatis-Plus + MySQL
  • 推荐等级 :⭐⭐⭐⭐
  • 评价:朋也社区基于 Java 语言,采用主流的 Java Web 开发框架(SpringBoot)进行开发。个人觉得朋也社区最大的亮点是在设计层面上支持高度的可定制化。要实现这点很不容易,需要有很强的设计能力,并且朋也社区在实现过程对于各种集成的服务支持配置化(可随意开启或关闭)。

我是 Guide 哥,一 Java 后端开发,会一点前端,自由的少年。我们下期再见!微信搜“JavaGuide”回复“面试突击”领取我整理的 4 本原创PDF

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用Python爬取了全国近5000家旅游景点,分析国庆去哪玩

发表于 2020-09-23

2020 国庆马上就要到了

我想今年大家在家都憋坏了

今年国庆和中秋刚好又是同一天,加起来有 8 天假

这么长的假期,当然是出去 玩玩玩!

image

但是每次长假期间,你有没有想起被人山人海支配的恐惧呢?

image

那么该去哪些地方呢?

下面我用 Python 爬取了全国近 **5000 **个旅游景点,并结合 **pyecharts **来做分析

技能包

  • 爬虫
  • Mysql
  • sqlalchemy
  • pyecharts

数据爬取

旅游景点的数据是从网上爬取的,该数据包含以下维度:景点名称,所属省市区,景点简介,门票价格,评分,近期销量,景点评级等。

爬虫比较简单,多分析下就可以,直接贴代码了

image

爬虫代码

基本上没什么反爬,加点延时别爬太快就是了…

爬取下来之后直接入库,入库部分代码如下:

image

入库

当然,每个省份我只爬了前 10 页,因为后面的基本都是些不知名的旅游景点

爬完之后看了下数据库有 4572 条数据,我想应该够了吧…

image

有了数据之后,是不是就能搞事情了…,简单做几个分析

数据分析

接下来我们使用 pyecharts 来对爬取的数据做一个简单的分析

1. 景点销量排行榜

直接从数据库查出数据,丢进去就行。

image

结果展示:

image

2. 景点评级排行榜

假期这么长,想玩的地方有很多!如何选择一个景点多的地方并且评级高的

这样就能在一个地方尽情的玩耍,而不用四处奔波

我们不妨来看看各省市能排的上号的景点有多少吧!

代码如下:

image

将数据查询出来后作一个简单的清洗,统计各省份 4A 级以上景点数量

一起来看看吧

image.gif

再将这些数据放到地图上看看..更加直观

image

3.各省销量排行榜

通过该数据,可以看出哪些省市区的人会比较多…

这样就能避免去人山人海的地方看人人人人.。。

image

综合分析

最终考虑去哪儿,可能需要多维度的分析。

所有,我把价格,评分,销量这三个维度放在一起

当然,你也可以选择你关心的维度来..最好自己动手试试

这样印象深刻!

image

来看看效果把:

image

重点关注下销量高,评分高,价格合适的…

提前祝大家国庆有个美好的假期!

wx.jpg

相关文章可点击查看:

​使用Pyecharts对猫眼电影票房可视化分析

利用 Flask 动态展示 Pyecharts 图表数据的几种方法

pyecharts可视化和微信的结合

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ServiceMesh的关键:边车模式(sidecar);又

发表于 2020-09-23

不羡鸳鸯不羡仙,一行代码调半天。原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。

哎,又堵车了。

记性好的同学,一定记得我们那辆敞快明亮的JMC 。拥有一辆JMC,任嘶吼的狂风穿过车窗打在脸上,是一件无比畅快的事情。

这次的车不一样。有点高级。开的猛的时候,狂风能够​掀掉头盔。​

仔细观察上面的这辆车,它有三个轮子。其中左边多出来的轮子和座位,就叫做边车。它是可拆卸的,加上之后,就可以带人兜风了。

边车模式(sidecar),就像是梅超风对于陈玄风,莫邪对于干将。由于和比较前沿的ServiceMesh概念息息相关,所以很容易让人望而却步。你到网上去随便一搜,都是晦涩难懂的概念。要了解下一代微服务,提前布局,需要啃一些枯燥的知识。

随着我的介绍,你会发现sidecar模式,是一个高度抽象的模式。但是不要着急,这辆车形状怪异的交通工具,我们依然能够驾驭。它的概念很简单,只不过有很多使用限制。

一步步升级

注意:下面都是边车模式,只不过有的边车实在是简陋。

<1>

大家都知道,微服务是复杂的,引入了一系列的问题,服务治理显得尤关重要。比如日志收集、服务监控、服务治理等。

比如上面这张图,我们在一个Linux服务器上,部署了四个进程。其中,web服务是最主要的进程,其他进程只是作为一些附加功能部署上去的。

其实,这三个圆圈,就是边车的功能。只要把它给挂载上,上面的服务就拥有了这些功能。

但对于这三个组件的配置,是相当复杂的。我们需要很多重复的工作。

<2>


上面这张图,通过将Web应用与我们的辅助应用打包在一块,进一步增强了 不可变性。拥有了容器的加持,我们就能够靠约定来简化打包和发布操作。比如,上面的各个组件就可以通过localhost直接通信。

但可惜的是,我们的这些辅助程序,都是作为docker容器里的进程去启动的,这种 富容器模式 有诸多缺陷,不符合不可变基础设施的理念,所以并不值得推荐。

<3>

k8s的Pod,在容器的基础上,进一步抽象。一个Pod中可以包含多个容器。如下图,基础服务和Web服务可以分别独自构建,最后以Pod作为载体,搭上便车就可以了。

为了更加显著的看到这个过程,下面这张图以日志收集为例,介绍了两个pod,相同日志收集docker容器的拓扑图。

从上面的演进过程我们可以看到。边车,就是辅助或者基础程序而已。但如何方便的管理这些附加的程序,我们有不同的组织方式。只有高度的抽象层次,才能进行方便的组装与设计。

<4>

到此为止,我们可以看一下ServiceMesh经典的两张图了。

我们把Web应用(业务服务),抽象成绿色的方块。然后把辅助组件(sidecar),抽象成蓝色方块。在一个相对简单的环境中,我们的部署方式如下所示。


由于辅助组件并不能单独存在,所以它们都依附在绿色的服务上面。

我们抽调服务集群的血肉(Web服务),只留下它的筋骨(sidecar),就可以获得下面这张图,这就是ServiceMesh。可以看到里面的连接线条是非常复杂的,人工不可能完成,只能依靠平台去管理。

任何东西只要一上规模了,就体现了它的复杂度。这还仅仅是只有36个服务节点的拓扑图。

不要小看这一个小小的蓝色方块。它不仅仅可以是一个辅助程序,而且可以成为基础设施。现在典型的service mesh,分为数据平面和控制平面,大多数落地的企业使用proxy方式实现了数据平面,对控制平面的实现有限。

像比较流行的Istio,通过负载均衡、服务间的身份验证、监控等方法,它可以轻松地创建一个已经部署了服务的网络,而服务的代码只需很少更改甚至无需更改。通过在整个环境中部署一个特殊的 sidecar代理,为服务添加 Istio 的支持,而代理会拦截微服务之间的所有网络通信,然后使用其控制平面的功能来配置和管理 Istio。

我们看下它官方的功能描述:

  1. 为 HTTP、gRPC、WebSocket 和 TCP 流量自动负载均衡。
  2. 通过丰富的路由规则、重试、故障转移和故障注入对流量行为进行细粒度控制。
  3. 可插拔的策略层和配置 API,支持访问控制、速率限制和配额。
  4. 集群内(包括集群的入口和出口)所有流量的自动化度量、日志记录和追踪。
  5. 在具有强大的基于身份验证和授权的集群中实现安全的服务间通信。

可以说,ServiceMesh将业务属性剥离了出去,只剩下一张大网,涵盖了所有运维和基础服务的工作。

要用它,不能说是没有代价的。其中有两点比较重要:

  1. 网络包通过层层的代理和转发(Ambassador模式),效率会降低,排错会变困难。
  2. 需要按照这个网格的规范进行改造,也就是写一堆适配器(Adapter模式)。

SpringCloud的Sidecar

说到适配器,就不禁想起了SpringCloud的Sidecar。

Java里要说玩新概念,怎么能少的了Spring家族?SpringCloud同样有一个sidecar的组件,它的maven坐标如下。

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-sidecar</artifactId>
</dependency>

它做的事情,更加像一个适配器。它能把一个普通的php或者nodejs服务,伪装成一个正常的SpringCloud服务。

通过简单的配置,我们就可以让一些其他语言开发的Web应用,加入到SpringCloud体系中来。

它的使用比较简单,在此不过多介绍。

End

可以看到,我们今天的这辆车,虽然简陋,但是很高级,甚至和最前沿的ServiceMesh挂钩了。在这里,我实在是佩服计算机界的名词创造能力和抽象能力。一个简单的生产者消费者,玩出了响应式编程;一个简单的边车模式,玩出了ServicemMesh。

虽然这个东西比较新,但比起什么中台概念来,能够落地不务虚,是更加有技术说服力的。因为中台搞不死​程序员,但会搞死公司。 ​

未来还会有什么奇形怪状的交通工具呢?这是个未知数。请搭上xjjdog的轻便列车,一块探索吧。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于CDC技术的ElasticSearch索引同步机制

发表于 2020-09-23

概述

ElasticSearch作为一个基于Lucene的搜索引擎被广泛应用于各种应用系统,比如电商、新闻类、咨询类网站。在使用ElasticSearch开发应用的过程中,一个非常重要的过程是将数据导入到ElasticSearch索引中建立文档。在一开始系统规模比较小时,我们可以使用logstash来同步索引。logstash的好处是开方量少,只要进行编写简单的索引模板和同步sql,就能快速搭建索引同步程序。但是随着应用数据规模的变大,索引变化变得非常频繁。logstash的缺点也随着暴露,包括(1)不支持删除,只能通过修改字段属性软删除,随着应用使用时间的增长,ElasticSearch中会留存大量的无用数据,拖慢搜索速度。(2)sql分页效率低,sql查询慢。logstash的分页逻辑是先有一个大的子查询,然后再从子查询中分页获取数据,因此效率低下,当数据库数据量大时,一个分页查询就需要几百秒。同步几千万数据可能需要1天时间。因此我们决定放弃使用logstash,而改用使用canal来搭建基于CDC技术的ElasticSearch索引同步机制。

系统架构设计

CDC索引同步模型

如图所示,索引同步系统由几个部分组成,下面分点介绍。

(1)数据库

原始数据数据库

(2)Canal

Canal是阿里云开源的MySql数据库增量数据订阅和消费工具。它的实现原理是将自己伪装为一个MySQL slave,向MySql master发送dump协议;MySQL master收到dump请求,开始推送binary log给slave,canal解析binary log对象。

(3)Canal Client

Canal Client是自己实现的程序,通过从Canal Server中获取经过Canal解析之后的数据库binlog日志,做相应的业务逻辑处理。在本文介绍的基于CDC的索引同步系统中,Canal Client订阅搜索相关的数据库表的binlog日志,如果跟数据搜索相关的数据发生变化时,就向Rabbit发一条消息,表明数据发生变化了,通知同步Worker从MySQL同步数据到ES。

(4)RabbitMQ

消息队列,也可以选用Kafaka等其他消息队列,根据具体业务确定。

(5)索引同步Worker

Worker从消息队列中消费数据,根据消息从MySQL获取相应的数据并同步到ElasticSearch中。

Canal Client实现

Canal Client从Canal Server中获取binlog日志,并根据业务需求进行处理。以下通过一些关键代码介绍Canal Client的实现。

(1)在pom中添加Canal client的依赖。

1
2
3
4
5
xml复制代码      <dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>

(2)初始化Canal连接

CanalConfig包含了Canal的配置信息。CanalConnector为canal-client包中的类,我们通过这个类来连接server,获取binlog,关闭server。该服务基于SpringBoot。因此init会在CanalClientInitializer bean被创建时被调用,preDestory会在服务关闭,CanClientInitializer被销毁时被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码@Component
@Slf4j
public class CanalClientInitializer {


CanalConfig canalConfig;

CanalConnector connector;

CanalDataProcessor canalDataProcessor;


public CanalClientInitializer(@Autowired CanalConfig canalConfig, @Autowired CanalDataProcessor canalDataProcessor) {
this.canalConfig = canalConfig;
this.canalDataProcessor = canalDataProcessor;
}


@PostConstruct
public void init() throws InterruptedException {
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()), canalConfig.getDestination(), "", "");
//建立连接
connector.connect();
//订阅相关的表
connector.subscribe(canalConfig.getSyncTable());
canalDataProcessor.process(connector);
}


@PreDestroy
public void preDestroy() {
log.info("stop the canal client");
canalDataProcessor.stopProcess();
}

}

(3)CanalDataProcessor获取并处理binlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码@Component
@Slf4j
public class CanalDataProcessor {


boolean isRunning;

RabbitTemplate rabbitTemplate;

TableChangeProcessor tableChangeProcessor;

public CanalDataProcessor(@Autowired RabbitTemplate rabbitTemplate, @Autowired TableChangeProcessor processor) {
this.rabbitTemplate = rabbitTemplate;
this.tableChangeProcessor = processor;
}

@Async
public void process(CanalConnector connector) throws InterruptedException {
isRunning = true;
while (isRunning) {
try {
//获取消息
Message message = connector.getWithoutAck(100, 10L, TimeUnit.SECONDS);
//业务处理逻辑
processMessage(message);
//消息被成功执行,向Canal Server发送ack消息通知server该message已经被处理完成
connector.ack(message.getId());
} catch (Exception e) {
log.error("wtf", e);
//当消息没被成功处理完成时进行回滚,下次能够重新获取该Message
connector.rollback();
Thread.sleep(1000);
}
}
connector.disconnect();
}


public void stopProcess() {
isRunning = false;
}


private void processMessage(Message message) {
for(Entry entry : message.getEntries()) {
try {
tableChangeProcessor.process(entry);
} catch (Exception e) {
log.error("wtf", e);
continue;
}
}
}
}

(4)TableChangeProcessor

TableChangeProcessor中为具体的业务逻辑,处理Message,获取跟搜索相关的数据变化,发送相应的消息到消息队列中。

注意点

(1)忽略搜索无关的数据字段变化,避免不必要的索引更新,降低服务器压力。如Products表中有一个product_weight表示商品重量发生了变化,但其实商品重量跟搜索无关,那就不要关心这个变化。

(2)对于搜索中不会出现的数据,不要写入到ES中,比如电商商品中的下架商品,另外,如果商品被下架,则要进行监听通知索引同步Worker从es中删除索引文档。这样能够降低ES中总的索引文档数量,提升搜索效率。

(3)要考虑Rabbit挂掉或者队列写满,消息无法写入的情况;首先应该在Rabbit发送消息时添加重试,其次应该在重试几次还是失败的情况下抛出异常,canal消息流回滚,下次还是能够获取到这个数据变化的Canal消息,避免数据变动的丢失。

(4)注意目前Canal只支持单Client。如果要实现高可用,则需要依赖于ZooKeeper,一个Client作为工作Client,其余Client作为冷备,当工作Client挂掉时,冷备Client监听到ZooKeeper数据变化,抢占锁成为工作Client。

Canal Worker实现

索引同步Worker从消息队列中获取Canal Client发送的跟搜索相关的数据库变化消息。举个例子,比如商品表中跟搜索相关的字段发生了变化,Canal Client会发送以下一条数据:

1
2
3
4
5
json复制代码{
"change_id": "694212527059369984",
"change_type": 1, //商品发生变化
"change_time": "1600741397"
}

在Worker中监听队列消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码@Component
@Slf4j
public class ProductChangeQueueListener {

@Autowired
@Qualifier("snake")
ObjectMapper om;

@Autowired
ChangeEventHandlerFactory changeEventHandlerFactory;

@RabbitListener(queues = RabbitConfig.PRODUCT_QUEUE_NAME, containerFactory = "customRabbitListenerContainerFactory")
public void onChange(Message message) {

ChangeEvent event = parse(message);
if(event == null) {
return;
}

changeEventHandlerFactory.handle(event);
}

private ChangeEvent parse(Message message) {
ChangeEvent event = null;
try {
event = om.readValue(new String(message.getBody()), ChangeEvent.class);
} catch (Exception e) {
log.error("同步失败,解析失败", e);
}
return event;
}


}

ChangeEventHandlerFactory为事件处理器的工厂类。以下为一个事件处理器的实现。它监听changeType为CHANGE_TYPE_OUT_PRODUCT的事件,从数据库中获取到变动的数据,构建ES的IndexRequest,并将Request存入到RequestBulkBuffer中,等待批量同步到ES中。有些同学可能会有疑问,为何不直接从Canal中获取数据,主要原因是Canal中只包含了单表数据,但是索引文档可能包含了多表的数据,因此还需要从MySQL获取数据。如果索引文档中只包含单表数据,可以考虑在ChangeEvent中包含修改之后的数据,索引同步Woker就不用再从MySql中再获取一遍数据,提升Worker工作效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码@Component
@Slf4j
public class OutProductEventHandler implements ChangeEventHandler {

@Autowired
ProductDao productDao;

@Autowired
RequestBulkBuffer buffer;


@Autowired
OutProductChangeRequestBuilder builder;

@Override
@Retryable
public boolean handle(ChangeEvent changeEvent) {
if (!match(changeEvent)) {
return false;
}

Tuple dataTuple = productDao.getProductWithStore(changeEvent.getChangeId());
if (dataTuple == null) {
return true;
}
Product product = dataTuple.get(QProduct.product);
Store store = dataTuple.get(QStore.store);

IndexRequest request = null;
try {
request = builder.convertToUpdateQuery(getTimestampNow(), product, store);
} catch (Exception e) {
log.error("wtf", e);
}
if (request == null) {
return true;
}
buffer.add(request);
return true;

}

@Override
public boolean match(ChangeEvent changeEvent) {
return ChangeEvent.CHANGE_TYPE_OUT_PRODUCT == changeEvent.getChangeType();
}
}

在上面的OutProductEventHandler类中,我们并不直接在该类中使用RestHighLevelClient将文档更新到ES索引,而是将IndexRequest暂存到RequestBulkBuffer中。RestBulkBuffer使用CircularFifoBuffer作为存储数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Component
public class RequestBulkBuffer {

CircularFifoBuffer buffer;

public RequestBulkBuffer(CircularFifoBuffer buffer) {
this.buffer = buffer;
}


public void add(DocWriteRequest<?> request) {
buffer.add(request);
}

}

CircularFifoBuffer是一个经过改造的环形队列实现。允许多线程写,在我们这个应用场景中只支持也只需支持单线程读->处理->移除处理完的数据。当环形队列缓存满时,借助于semaphore,写入线程将会被阻塞,在后面的Worker如何防止数据丢失中,我们来阐述为什么要这么做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
java复制代码/**
* 允许多线程写
* 只允许单线程->读->处理->移除
*/
public class CircularFifoBuffer {

private Logger logger = LoggerFactory.getLogger(CircularFifoBuffer.class.getName());


private transient Object[] elements;

private transient int start = 0;
private transient int end = 0;

private transient boolean full = false;

private final int maxElements;

private ReentrantLock addLock;

private Semaphore semaphore;

public CircularFifoBuffer(int size) {
if (size <= 0) {
throw new IllegalArgumentException("The size must be greater than 0");
}
elements = new Object[size];
maxElements = elements.length;
addLock = new ReentrantLock();
semaphore = new Semaphore(size);
}


public int size() {
int size = 0;

if (end < start) {
size = maxElements - start + end;
} else if (end == start) {
size = (full ? maxElements : 0);
} else {
size = end - start;
}

return size;
}

public boolean isEmpty() {
return size() == 0;
}

public boolean isFull() {
return size() == maxElements;
}

public int maxSize() {
return maxElements;
}

public void clear() {
full = false;
start = 0;
end = 0;
Arrays.fill(elements, null);
}

public boolean add(Object element) {
if (null == element) {
throw new NullPointerException("Attempted to add null object to buffer");
}

addLock.lock();
try {
semaphore.acquire();
} catch (Exception e) {
logger.error("RingBuffer", "线程退出,添加失败");
return false;
}

elements[end++] = element;


if (end >= maxElements) {
end = 0;
}

if (end == start) {
full = true;
}

addLock.unlock();

return true;

}

public Object get() {
if (isEmpty()) {
return null;
}

return elements[start];
}


public Object remove() {
if (isEmpty()) {
return null;
}

Object element = elements[start];
if(null != element) {
elements[start++] = null;
if (start >= maxElements) {
start = 0;
}
full = false;
semaphore.release();
}
return element;
}


/**
* @param size the max size of elements will return
*/
public Object[] get(int size) {
int queueSize = size();
if (queueSize == 0) { //empty
return new Object[0];
}
int realFetchSize = queueSize >= size ? size : queueSize;
if (end > start) {
return Arrays.copyOfRange(elements, start, start + realFetchSize);
} else {
if (maxElements - start >= realFetchSize) {
return Arrays.copyOfRange(elements, start, start + realFetchSize);
} else {
return ArrayUtils.addAll(
Arrays.copyOfRange(elements, start, maxElements),
Arrays.copyOfRange(elements, 0, realFetchSize - (maxElements - start))
);
}
}
}


public Object[] getAll() {
return get(size());
}



public Object[] remove(int size) {
if(isEmpty()) {
return new Object[0];
}
int queueSize = size();
int realFetchSize = queueSize >= size ? size : queueSize;
Object [] retArr = new Object[realFetchSize];
for(int i=0;i<realFetchSize;i++) {
retArr[i] = remove();
}

return retArr;
}

}

下面这个类为缓存的消费者,它循环从buffer中获取一定数据的数据,并使用RestHighLevelClient将数据批量同步到ES。在Worker启动时,会创建一个线程调用startConsume,在服务关闭时该线程结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
java复制代码@Slf4j
public class RequestBulkConsumer {
private static final int DEFAULT_BULK_SIZE = 2000;

private CircularFifoBuffer buffer;
private EsBulkRequestService service;

private boolean isRunning = false;
private int bulkSize = DEFAULT_BULK_SIZE;

public RequestBulkConsumer(CircularFifoBuffer buffer, RestHighLevelClient client) {
this.buffer = buffer;
this.service = new EsBulkRequestService(client);
}

public void setBulkSize(int size) {
this.bulkSize = size;
}

public int getBulkSize() {
return bulkSize;
}

public boolean isRunning() {
return isRunning;
}


public void startConsume() {
if(isRunning) {
return;
}
isRunning = true;
while(true) {
if(!isRunning) {
break;
}

Object [] items = buffer.get(bulkSize);
if(items.length == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
} else {
List<DocWriteRequest<?>> requests = convert(items);
try {
BulkResponse response = service.request(requests);
processResponse(response);
buffer.remove(items.length);
if (items.length < bulkSize) {
Thread.sleep(3000);
}
} catch (InterruptedException e) {
break;
} catch (IOException e) {
log.error("wtf", e);
} catch (Exception e) {
log.error("wtf", e);
buffer.remove(items.length);
}
}
}
}


private List<DocWriteRequest<?>> convert(Object [] items) {
return Stream.of(items)
.map(i -> {
if(i instanceof DocWriteRequest) {
return (DocWriteRequest<?>) i;
} else {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

public void stop() {
isRunning = false;
}


private void processResponse(BulkResponse bulkResponse) {
BulkItemResponse [] itemResponseArr = bulkResponse.getItems();
for(BulkItemResponse resp : itemResponseArr) {
DocWriteResponse docWriteResponse = resp.getResponse();
if(docWriteResponse instanceof IndexResponse) {
IndexResponse indexResponse = (IndexResponse) docWriteResponse;
if(indexResponse.getResult() != Result.CREATED && indexResponse.getResult() != Result.UPDATED) {
if(indexResponse.status() == RestStatus.CONFLICT) {
continue;
} else {
log.error("索引更新失败: {}, {}", indexResponse.getId(), resp.getFailureMessage());
}
}
} else if(docWriteResponse instanceof DeleteResponse) {
DeleteResponse deleteResponse = (DeleteResponse) docWriteResponse;
if(deleteResponse.getResult() != Result.DELETED) {
log.error("索引删除失败: {}, {}", deleteResponse.getId(), resp.getFailureMessage());
}
}
}
}
}

以下为Worker的主要几个类的代码。在索引同步系统中,高可用并不是最重要的,因为我们的搜索本身是一个准实时系统,只需要保证最终一致性就可以了,我们主要需要避免的是数据变更的丢失。以下说明在Worker中是如何避免数据丢失的。

避免数据丢失

(1)如果Rabbit挂掉,没关系,Canal Client那边在Rabbit挂掉期间无法消费binlog,会等待Rabbit重启之后再处理数据变化。Worker只要能做到Rabbit重启之后重连就行。

(2)如果MySQL挂掉,则Worker无法从数据库中获取数据,则消息处理失败,消息会堆积在Rabbit中。等MySQL重新上线之后,消息重新开始处理,数据也不会丢失。

(3)如果ES挂掉,则批量处理线程消费buffer中的数据时会失败,buffer会被生产者填满,由于CircularFifoBuffer在被填满时使用了信号量阻塞生产者线程,消息又会被堆积在Rabbit中,等待ES重新上线之后,消息重新开始处理,数据也不会丢失。

(4)如果Rabbit队列被写满,emmm,设置好在内存被占满时将消息写入硬盘然后搞一个大一点的硬盘吧,Rabbit默认应该就是这么做的。然后做好预警,当消息达到一定量时抓紧处理,一般来说可能性不是很大。

(5)版本冲突,如果商品表中某一条数据如商品A在同一秒内变化了两次,消息队列中有连续两条消息,又由于这两条消息可能在两个线程中被消费,由于网络,计算机性能等原因,先变的数据后被写入ES中,导致ES中数据和MySql数据不一致。因此我们在更新索引时使用ES的外部版本号。使用从MySQL中取数据时的时间戳作为版本号,只有当时间戳比当前版本号大或相等时才能变更文档,否则ES会报版本冲突错误。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    private IndexRequest convertToUpdateQuery(Long timestamp, OutStoreProduct outStoreProduct) throws JsonProcessingException {
IndexRequest indexRequest = new IndexRequest(indexName, "doc", outStoreProduct.getId());
if(StringUtils.isEmpty(outStoreProduct.getTooEbaoProductId())) {
log.error("商品 {} 的ebaoProductId为空,无法同步", outStoreProduct.getId());
return null;
}
indexRequest.source(om.writeValueAsString(outStoreProduct), XContentType.JSON)
.versionType(VersionType.EXTERNAL_GTE)
.version(timestamp)
.routing(outStoreProduct.getTooEbaoProductId());
return indexRequest;
}

关于全量同步

以上只是实现了增量同步,在索引初始化时,我们需要做全量同步操作,将数据从数据库初始化到ES索引中。我们可以在Worker中写一个接口,该接口实现逻辑分批将数据同步任务发到消息队列中,其它worker收到消息后完成对应任务。比如我们可以发布每一个门店的数据同步任务,worker每收到一个消息,同步一个门店的数据。

总结

综上,本系统是一个近实时的能够保证ES和MySQL数据一致性的高效索引同步系统。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot 系列教程 第一百一篇:Sprin

发表于 2020-09-23

前言

过去的Web项目开发里,我们使用最多的两种权限框架有Shiro和Spring Security,Shiro轻量级、入门门槛低功能相对也强大;Spring Security背景强大,功能丰富,入门门槛相对高,配置稍微复杂,由Spring团队开源。而现如今将介绍一款轻量级、功能强大且几乎零配置的sa-token权限框架, sa-token是一个JavaWeb权限认证框架,强大、简单、好用。

一、核心知识

  1. 什么是sa-token

sa-token是一个JavaWeb权限认证框架,强大、简单、好用

登录验证、权限验证、自定义session会话、踢人下线、持久层扩展、无cookie模式、模拟他人账号、多账号体系、注解式鉴权、Spring集成…

零配置开箱即用,覆盖所有应用场景,你所需要的功能,这里都有

与其它权限认证框架相比,sa-token有以下两点优点:

  • 上手简单:能自动化的配置全部自动化,不让你费脑子
  • 功能强大:能涵盖的功能全部涵盖,不让你用个框架还要自己给框架打各种补丁
  1. sa-token 😋

在线文档:sa-token.dev33.cn/

  1. 框架优点

与其它权限认证框架相比,sa-token尽力保证两点:

上手简单:能自动化的配置全部自动化,不让你费脑子
功能强大:能涵盖的功能全部涵盖,不让你用个框架还要自己给框架打各种补丁


二、涵盖功能

  • 登录验证
  • 权限验证
  • 自定义session会话
  • 踢人下线
  • 模拟他人账号
  • 持久层扩展(集成redis)
  • 多账号认证体系(比如一个商城项目的user表和admin表)
  • 无cookie模式(APP、小程序等前后台分离场景)
  • 注解式鉴权(优雅的将鉴权与业务代码分离)
  • 零配置与Spring等框架集成
  • …

三、集成

  1. maven依赖

在项目中直接通过 pom.xml 导入 sa-token 的依赖即可

1
2
3
4
5
xml复制代码<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token</artifactId>
<version>1.4.0</version>
</dependency>
  1. 获取源码

  • github地址: github.com/click33/sa-…
  • gitee地址: gitee.com/sz6/sa-toke…
  1. jar包下载

点击下载:sa-token-1.4.0.jar

四、准备工作

本篇将带你从零开始集成sa-token,从而让你快速熟悉sa-token的使用姿势,以maven + springboot为例


五、工程搭建

  1. 创建SpringBoot项目

在这里插入图片描述

1.2 引入sa-token的依赖

1
2
3
4
5
6
xml复制代码<!-- sa-token 权限认证, 在线文档:http://sa-token.dev33.cn/ -->
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token</artifactId>
<version>1.4.0</version>
</dependency>
  1. 引入jar包依赖

  • 在 pom.xml 中添加依赖,完整pom如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
xml复制代码<!-- web应用依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- lombok代码简化工具 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- sa-token 权限认证, 在线文档:http://sa-token.dev33.cn/ -->
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token</artifactId>
<version>1.4.0</version>
</dependency>
  1. 配置文件

  • 你可以零配置启动项目
  • 但同时你也可以在application.yml中增加如下配置,定制性使用框架:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
yaml复制代码server:
port: 8070

spring:
# sa-token配置
sa-token:
# token名称(同时也是cookie名称)
token-name: satoken
# token有效期,单位s 默认30天
timeout: 2592000
# 在多人登录同一账号时,是否共享会话(为true时共用一个,为false时新登录挤掉旧登录)
is-share: true
# 是否在cookie读取不到token时,继续从请求header里继续尝试读取
is-read-head: true
# 是否在header读取不到token时,继续从请求题参数里继续尝试读取
is-read-body: true
# 是否在初始化配置时打印版本字符画
is-v: true
  • 如果你习惯于 application.properties 类型的配置文件,那也很好办:
  • 百度: springboot properties与yml 配置文件的区别
  1. 创建主类

在项目中新建包 com.pj ,在此包内新建主类 SaTokenDemoApplication.java,输入以下代码:

1
2
3
4
5
6
7
8
java复制代码@SaTokenSetup // 标注启动 sa-token
@SpringBootApplication
public class SaTokenDemoApplication {
public static void main(String[] args) throws JsonProcessingException {
SpringApplication.run(SaTokenDemoApplication.class, args); // run-->
System.out.println("启动成功:sa-token配置如下:" + SaTokenManager.getConfig());
}
}
  1. 运行

运行代码,当你从控制台看到类似下面的内容时,就代表框架已经成功集成了
运行结果
在这里插入图片描述


六、普通spring环境

普通spring环境与springboot环境大体无异,只不过需要在项目根目录下手动创建配置文件sa-token.properties来完成配置即可。

七、sa-token常用API介绍

官方提供了一系列的常用API接口,这里只简单介绍一下登录相关的API接口: sa-token.dev33.cn/doc/#/use/l…

  1. 登录验证

1.1 核心思想

  • 所谓登录验证,说白了就是限制某些接口只有登录后才能访问(如:查询我的账号资料)
  • 如何判断你有没有登录?当然是登录成功后我给你做个标记
  • 在需要鉴权的接口里检查标记,有标记者视为已登录,无标记者视为未登录
  • 根据以上思路,我们很容易想到以下api:

1.2 具体API

StpUtil.setLoginId(Object loginId)

  • 标记当前会话登录的账号id
  • 建议的参数类型:long | int | String, 不可以传入复杂类型,如:User、Admin等等

StpUtil.logout()

  • 当前会话注销登录

StpUtil.isLogin()

  • 获取当前会话是否已经登录,返回true=已登录,false=未登录

StpUtil.checkLogin()

  • 检验当前会话是否已经登录, 如果未登录,则抛出异常:NotLoginException
  • 扩展:NotLoginException 对象可通过 getLoginKey() 方法获取具体是哪个 StpLogic 抛出的异常

StpUtil.getLoginId()

获取当前会话登录id, 如果未登录,则抛出异常:NotLoginException
类似API还有:

StpUtil.getLoginIdAsString() 获取当前会话登录id, 并转化为String类型

StpUtil.getLoginIdAsInt() 获取当前会话登录id, 并转化为int类型

StpUtil.getLoginIdAsLong() 获取当前会话登录id, 并转化为long类型

StpUtil.getLoginId(T defaultValue)

  • 获取当前会话登录id, 如果未登录,则返回默认值 (defaultValue可以为任意类型)
  • 类似API还有:

StpUtil.getLoginId_defaultNull() 获取当前会话登录id, 如果未登录,则返回null

getLoginIdByToken(String tokenValue)

  • 获取指定token对应的登录id,如果未登录,则返回 null

八、项目测试

  1. 启动SpringBoot项目

启动SpringBoot示例项目,控制天打印sa-token信息如下:

在这里插入图片描述


九、源码下载

  1. 源码

github.com/Thinkingcao…


  1. 参考

官方文档: sa-token.dev33.cn/


总结

每一种Java框架在每个不同时期都有对应的优缺点,相比Shiro、Spring Security 这些权限框架,sa-token显得相对轻量级,因为sa-token仅仅封装了一些常用的登录验证、权限验证、自定义session会话、踢人下线等功能,是一款强大、简单、好用的权限框架。

长按下图二维码,关注公众号「Thinking曹」,在通往Java架构的路上我想与你一同前行,共同进步!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

工作总结:一次代码优化的实践(模板方法+策略+工厂方法模式)

发表于 2020-09-22

前言

好久没分享工作总结啦,今天来一份代码优化总结。用模板方法+策略+工厂方法模式优化了代码,耐心点看完,应该对大家有帮助的~

本文已经收录到github

github.com/whx123/Java…

公众号:捡田螺的小男孩

优化代码前

先来了解一下类似的业务场景,简言之,就是:多个商户接入我们系统,都是走一个类似的流程通过http请求出去的。

优化前,每个公司对应一个句柄服务,伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
scss复制代码// 商户A处理句柄
CompanyAHandler implements RequestHandler {
Resp hander(req){
//查询商户信息
queryMerchantInfo();
//加签
signature();
// http请求(走代理)
httpRequestbyProxy()
// 验签
verify();
}
}
// 商户B处理句柄
CompanyBHandler implements RequestHandler {
Resp hander(Rreq){
//查询商户信息
queryMerchantInfo();
//加签
signature();
// http请求(不走代理)
httpRequestbyDirect();
// 验签
verify();
}
}
// 商户C处理句柄
CompanyBHandler implements RequestHandler {
Resp hander(Rreq){
//查询商户信息
queryMerchantInfo();
// webservice 方式调用
requestByWebservice();
}
}

优化代码思路

我的优化代码思路,是有重复代码,先把它抽出来,或者公用变量,或者公用方法,伸着公用类。所以呢,查询商户信息呀,加签呀,http请求呀先全部各抽成一个公用方法。你细心点会发现,连每个Handler处理过程都很类似的,大概都是查询商户信息+加签+http请求+验签,于是呢,可以直接把它们抽象成一个公用类呀~在这里就要引入模板方法模式咯

模板方法模式

1
2
sql复制代码在模板模式(Template Pattern)中,一个抽象类公开定义了执行它的方法的方式/模板。它的子类可以按需要重写方法实现,但调用将以抽象类中定义的方式进行。
这种类型的设计模式属于行为型模式。

既然每个Handler处理,都是类似的流程,那定义一个抽象类,把查询商户信息,加签,http请求,验签什么的,都放到里面去,俨然一个模板一样。然后,因为有些商户走http代理,有些又没走代理,怎么办呢? 定义一个抽象方法,给子类实现嘛,因为能共用就放到父类(当前的抽象类),不能共用就放到子类嘛~代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scss复制代码abstract class AbstractCompanyCommonService implements ICompanyCommonService { 
//模板方法
Resp handlerTempPlate(req){
//查询商户信息
queryMerchantInfo();
// 加签
signature();
//http 请求
if(isRequestByProxy()){
httpProxy();
}else{
httpDirect();
}
// 验签
verifySinature();
}
// Http是否走代理
abstract boolean isRequestByProxy();
}

子类商户A实现:

1
2
3
4
5
6
7
8
typescript复制代码CompanyAServiceImpl extends AbstractCompanyCommonService{
Resp hander(req){
return handlerTempPlate(req);
}
//公司A是走代理的
boolean isRequestByProxy(){
return true;
}

子类商户B实现:

1
2
3
4
5
6
7
8
typescript复制代码CompanyBServiceImpl extends AbstractCompanyCommonService{
Resp hander(req){
return handlerTempPlate(req);
}
//公司B是不走代理的
boolean isRequestByProxy(){
return false;
}

策略模式

心细的读者会发现,甚至提出疑问,你的商户C的服务实现跟你定义的公用模板,不太一样呢,那当然,实际开发中,不跟你定义的模板一样太常见了,需求是产品提的嘛,又不是根据你模板提的,是代码服务于需求的。好了,不多说啦,我使用了策略模式,来优化这个问题。

1
sql复制代码在策略模式(Strategy Pattern)中,一个类的行为或其算法可以在运行时更改。这种类型的设计模式属于行为型模式。

策略模式理解起来其好抽象对不对?我个人理解,其实策略模式就是定义一个方法(所谓算法),给子类自己去实现。实际上就是定义个方法/接口,让子类自己去实现。看代码吧:

1
2
3
4
csharp复制代码// 定义一个方法,把策略交给子类去实现。
interface ICompanyCommonService{
Resp hander(req);
}

前面商户A和商户B还是不变,使用抽象类AbstractCompanyCommonService的模板,模板不满足商户C,商户C只能自己去实现咯,各个子类自己去实现的行为,就是策略模式的体现呢,如下:

1
2
3
4
5
6
7
8
9
10
scss复制代码CompanyCServiceImpl extends AbstractCompanyCommonService{
Res hander(req){
//查询商户信息
queryMerchantInfo();
requestByWebservice();
}
//随意了,你都不走模板了
boolean isRequestByProxy(){
return false;
}

工厂方法模式

商户A、B、C服务怎么被管理呢,之前分别给A,B,C服务实现handler的,现在好了,都不知道怎么管理了,怎么知道调用哪个呢?别慌,解决方案是工厂方法模式。

1
复制代码在工厂模式中,我们在创建对象时不会对客户端暴露创建逻辑,并且是通过使用一个共同的接口来指向新创建的对象。

工厂方法模式具体实现就是:接口定义一个枚举,每个服务实现都重新实现枚举,设置唯一标志枚举,再交给spring容器管理。看代码咯:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
typescript复制代码interface ICompanyCommonService{
Resp hander(req);
CompanyEnum getCompanyEnum();
}

CompanyAServiceImpl extends AbstractCompanyCommonService{
Resp hander(req){
return handlerTempPlate(req);
}
//公司A是走代理的
boolean isRequestByProxy(){
return true;
}
CompanyEnum getCompanyEnum(){
return CompanyEnum.A;
}

CompanyBServiceImpl extends AbstractCompanyCommonService{
Resp hander(req){
return handlerTempPlate(req);
}
//公司B是不走代理的
boolean isRequestByProxy(){
return false;
}
CompanyEnum getCompanyEnum(){
return CompanyEnum.B;
}

来来来,工厂方法模式出炉咯:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码@Component
public class CompanyServiceFactory implements ApplicationContextAware {

private static Map<CompanyEnum, ICompanyCommonService> map = new HashMap<>();

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, ICompanyCommonService> tempMap = applicationContext.getBeansOfType(ICompanyCommonService.class);
tempMap.values().forEach(iCompanyCommonService ->
map.put(iCompanyCommonService.getCompanyEnum(), iCompanyCommonService));
}

public Resp handler(req) {
return map.get(CompanyEnum.getCompanyEnum(req.getCompanyFlag()).hander(req);
}
}

最后建议

最后,不要为了使用设计模式生搬硬套,而是优化代码过程中,发现这个设计模式刚好适用,才去用的哈。附上最后的代码咯:

1
2
3
4
5
6
7
8
9
typescript复制代码@Service
public class CompanyHandler implements RequestHandler {
@Autowire
private CompanyServiceFactory companyServiceFactory;

Resp hander(req){
return companyServiceFactory.handler(req);
}
}

个人公众号

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【建议收藏】阿里P7总结的Spring注解笔记,把组件注册讲

发表于 2020-09-22

环境搭建

注解的方式是通过配置类的方式来注入组件,注解注入要比XML注入的方式简单,注解注入也需要在前者的基础上,添加一个spring-context的包,也是实际开发中常用的方式。

准备所需Jar包

Spring注解之组件注册

Spring提供了许多的注解配置,这样我们就可以通过注解的方式实现组件的注册,下图就是Spring中经常使用到的注解。

@ComponentScan和@Configurable

原先xml的方式

1
2
3
4
5
6
7
8
9
10
11
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 要扫描的包 -->
<context:component-scan base-package="model"></context:component-scan>
</beans>

使用配置类
@Configurable来标注该类为Spring中的配置类,@ComponentScan(“model”)是为该配置类指定要去扫描的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码package config;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.ComponentScan;
import model.Product;

/**
* @Configurable: 该注解是标注该类是配置类
* @ComponentScan:配置要扫描的包
* @author GaoYang
*/
@Configurable
@ComponentScan("model")
public class MainConfig {

}

@Component

使用该注解就可以将Java对象@Component注册到Ioc容器中,@Component注解要是给属性赋值要配合@Value注解为属性赋值。

1
2
3
4
5
6
7
8
9
10
11
12
kotlin复制代码/**
@Componnt可以指定该对象的id,也可以不用指定
默认id为该类的类名首字母小写
*/
@Component("students")
public class Student {
@Value("01")
private int sid;
@Value("侯宁宁")
private String name;
@Value("男")
private String sex;

配置类

1
2
3
4
5
6
7
8
9
10
less复制代码/**
* @Configurable: 该注解是标注该类是配置类
* @ComponentScan:配置要扫描的包
* @author GaoYang
*/
@Configurable
@ComponentScan("model")
public class MainConfig {

}

使用@Configuration注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
typescript复制代码@Component("students")
public class Student {
@Value("01")
private int sid;
@Value("侯宁宁")
private String name;
@Value("男")
private String sex;

public Student() {
super();
}
public Student(int sid, String name, String sex) {
super();
this.sid = sid;
this.name = name;
this.sex = sex;
}
public int getSid() {
return sid;
}
public void setSid(int sid) {
this.sid = sid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
@Override
public String toString() {
return "Student [sid=" + sid + ", name=" + name + ", sex=" + sex + "]";
}

}

测试

@Bean

使用@Bean注解该可以在我们的spring注册类里标注,创建对象的方法,可以通过一个返回值为该对象的方法去创建该对象,并通过构造器为该对象的属性进行赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码// 配置类
@Configurable
@ComponentScan("model")
public class MainConfig {
// 默认id为方法名
@Bean
public Product product1() {
return new Product("张三","hashd",1);
}
// 可以指定id
@Bean("product2")
public Product product2() {
return new Product("张三","hashd",1);
}

}

Java-Bean对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
typescript复制代码public class Product {
private String name;
private String price;
private int num;
public Product() {
super();
}
public Product(String name, String price, int num) {
super();
this.name = name;
this.price = price;
this.num = num;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPrice() {
return price;
}
public void setPrice(String price) {
this.price = price;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return "Product [name=" + name + ", price=" + price + ", num=" + num + "]";
}

}

测试

@TypeFilter

@TypeFilter注解
是通过设置条件来过滤一些资源,我们可以过滤一些资源不让它加载到ioc容器中。它的使用要在@ComponentScan这个注解中国去使用,通过excludeFilters参数传值,excludeFilters是一个数组,可以设定多个@TypeFilter。

@TypeFilter语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
less复制代码@Configurable
@ComponentScan(value = "model",excludeFilters = {
// FilterType.ANNOTATION是通过注解的形式进行过滤
@Filter(type = FilterType.ANNOTATION,classes = {Controller.class}),

// FilterType.ASSIGNABLE_TYPE 是通过给定的类型
@Filter(type = FilterType.ASSIGNABLE_TYPE,classes = {Product.class}),

// FilterType.ASPECTJ 根据正则表达式
@Filter(type = FilterType.ASPECTJ,classes = {""}),

// FilterType.CUSTOM 使用自定义规则
@Filter(type = FilterType.CUSTOM,classes = {TypeFilterImp.class})

})
public class MainConfig {
// @Bean == <bean></bean>

}

@FilterType.CUSTOM自定义规则

使用自定义规则,我们必须给它创建一个制定规则的类,这个类要去实现TypeFilter这个接口,并实现match这个方法,过滤器就会根据match方法的返回值加载,如果去ture就去过滤不满足条件的,如果为false则不会去加载!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码/**
* MetadataReader: 读取到的当前正在扫描的信息
* MetadataReaderFactory:可以获取到其他任何类的信息
*/
@Override
public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
// 获取当前类注解的信息
AnnotationMetadata mr = metadataReader.getAnnotationMetadata();
// 获取当前正在扫描的类的信息
ClassMetadata classMetadata = metadataReader.getClassMetadata();
// 获取当前类的资源信息
Resource resource = metadataReader.getResource();
// 获取当前类的名字
String className = classMetadata.getClassName();
System.out.println("----"+className);
// contains包含“er”
if(className.contains("er")) {
return true;
}
return false;
}

}

@Scope

Spring创建对象默认是单例的,使用@Scope来描述也就是scope=“singleton”,另外scope还有prototype、request、session、global session作用域。

各作用域的的作用

  • singleton单例模式,全局有且仅有一个实例。(默认值)
  • prototype原型模式,每次获取Bean的时候会有一个新的实例。
  • request表示该针对每一次HTTP请求都会产生一个新的bean,同时该bean仅在当前HTTP request内有效,配置实例:
    request、session、global session使用的时候首先要在初始化web的web.xml中做如下配置:
    如果你使用的是Servlet 2.4及以上的web容器,那么你仅需要在web应用的XML声明文件web.xml中增加下述ContextListener即可:
1
2
3
4
5
6
7
8
9
10
> xml复制代码<web-app>
> ...
> <listener>
> <listener-class>org.springframework.web.context.request.RequestContextListener</listener-class>
> </listener>
> ...
> </web-app>
>
>
>
  • session作用域表示该针对每一次HTTP请求都会产生一个新的bean,同时该bean仅在当前HTTP session内有效
  • global session作用域类似于标准的HTTP Session作用域,不过它仅仅在基于portlet的web应用中才有意义。Portlet规范定义了全局Session的概念,它被所有构成某个 portlet web应用的各种不同的portlet所共享。在global session作用域中定义的bean被限定于全局portlet Session的生命周期范围内。如果你在web中使用global session作用域来标识bean,那么web会自动当成session类型来使用。

案例演示

singleton

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
less复制代码@Configurable
@ComponentScan("model")
public class MainConfig {
/**
* @Scope
* prototype: 多实例的 @Scope("prototype")
* singleton: 单实例的 @Scope("person")
* request: 一次请求创建一个实例
* session: 同一个session创建一个实例
* @return
*/
@Scope("singleton")
@Bean
public Product product() {
System.out.println("该实例已被创建");
return new Product("张三","hashd",1);
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
ini复制代码public class text {
public static void main(String[] args) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(MainConfig.class);
System.out.println("Ioc容器已创建完成!");
Product bean1 = applicationContext.getBean(Product.class);
Product bean2 = applicationContext.getBean(Product.class);
System.out.println(bean1== bean2);

}
}

从下图可以看到,bean1 == bean2

Layz-bean

@Layz赖加载主要是针对的是单例模式下,单例模式下ioc容器初始化时,就将bean对象注入到了容器中,@Layz注解可以让容器创建时不去注册容器,而是等到第一次调用时才去注册bean对象。此时,创建的对象依然是单例模式!

使用语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
less复制代码// 配置类
@Configurable
@ComponentScan("model")
public class MainConfig {
/**
* 懒加载:
* 针对的是单实例的bean,默认在容器启动的时候创建对象
* 赖加载:容器启动时不创建对象,当第一次被调用时被创建
*
*/
@Lazy
@Bean
public Product product() {
System.out.println("该实例已被创建");
return new Product("张三","hashd",1);
}

测试

@Conditional

@Conditional注解是根据制定条件来进行注册,需要我创建配置条件的配置类,如果条件满足就进行注册,不满足就不去注册。

语法

配置类

1
2
3
4
5
6
7
8
9
less复制代码@Configurable
public class MainConfig {

@Conditional({winCondition.class})
@Bean("wind")
public Product wind() {
System.out.println("该实例已被创建");
return new Product("张三","wind",1);
}

条件类必须去实现Condition接口,并添加为实现的方法!

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class winCondition implements Condition{

@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata arg1) {
Environment environment = context.getEnvironment();
// 获取当前操作系统的名字
String property = environment.getProperty("os.name");
if(property.contains("Windows")) {
return true;
}
return false;
}
}

案例

需求根据当前操作系统去注册组件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
less复制代码// 配置类
@Configurable
@Import(Hero.class)
public class MainConfig {
// Windows系统
@Conditional({winCondition.class})
@Bean("wind")
public Product wind() {
System.out.println("该实例已被创建");
return new Product("张三","wind",1);
}
// Linux系统
@Conditional({linuxCondition.class})
@Bean("linux")
public Product linux() {
return new Product("李四","linux",2);

}

}

条件配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class winCondition implements Condition{
// Windows系统,返回true
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata arg1) {
Environment environment = context.getEnvironment();
String property = environment.getProperty("os.name");
if(property.contains("Windows")) {
return true;
}
return false;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class linuxCondition implements Condition{
/**
* ConditionContext: 判断条件能使用上下文环境
* AnnotatedTypeMetadata: 注释信息
*/

@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
// 是否Linux系统
// 1、能获取到ioc使用的bean工厂
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
// 2、获取类加载器
ClassLoader clLoader = context.getClassLoader();
// 3、获取当前环境信息
Environment environment = context.getEnvironment();
String property = environment.getProperty("os.name");

// 5、bean注册类
BeanDefinitionRegistry registry = context.getRegistry();
if(property.contains("Linux")) {
return true;
}

return false;
}

测试…

@import

  • @Import只能用在类上 ,@Import通过快速导入的方式实现把实例加入spring的IOC容器中
  • 加入IOC容器的方式有很多种,@Import注解就相对很牛皮了,@Import注解可以用于导入第三方包 ,当然@Bean注解也可以,但是@Import注解快速导入的方式更加便捷
  • @Import注解有三种用法

第一种用法:直接填class数组

直接填对应的class数组,class数组可以有0到多个。对应的import的bean都将加入到spring容器中,这些在容器中bean名称是该类的全类名 ,比如com.yc.类名

1
2
3
4
ruby复制代码@Import({ 类名.class , 类名.class... })
public class TestDemo {

}

第二种用法:ImportSelector方式【重点】

这种方式的前提就是一个类要实现ImportSelector接口,假如我要用这种方法,目标对象是Myclass这个类,分析具体如下:
创建Myclass类并实现ImportSelector接口

1
2
3
4
5
6
7
8
9
10
11
12
typescript复制代码public class Myclass implements ImportSelector {
//既然是接口肯定要实现这个接口的方法
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
return new String[0];
}
}
// 分析实现接口的selectImports方法中的:

// 1、返回值: 就是我们实际上要导入到容器中的组件全类名【重点 】
// 2、参数: AnnotationMetadata表示当前被@Import注解给标注的所有注解信息【不是重点】
// 需要注意的是selectImports方法可以返回空数组但是不能返回null,否则会报空指针异常!

以上分析完毕之后,具体用法步骤如下:
第一步:创建Myclass类并实现ImportSelector接口,这里用于演示就添加一个全类名给其返回值

1
2
3
4
5
6
typescript复制代码public class Myclass implements ImportSelector {
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
return new String[]{"com.yc.Test.TestDemo3"};
}
}

第二步:编写TestDemo 类,并标注上使用ImportSelector方式的Myclass类

1
2
3
4
5
6
7
8
csharp复制代码@Import({TestDemo2.class,Myclass.class})
public class TestDemo {
@Bean
public AccountDao2 accountDao2(){
return new AccountDao2();
}

}

第三步:编写打印容器中的组件测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
arduino复制代码**
* 打印容器中的组件测试
*/
public class AnnotationTestDemo {
public static void main(String[] args) {
AnnotationConfigApplicationContext applicationContext=new AnnotationConfigApplicationContext(TestDemo.class); //这里的参数代表要做操作的类

String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();
for (String name : beanDefinitionNames){
System.out.println(name);
}

}
}

第三种用法:ImportBeanDefinitionRegistrar方式

同样是一个接口,类似于第二种ImportSelector用法,相似度80%,只不过这种用法比较自定义化注册,具体如下:

1
2
3
4
5
6
7
8
9
10
11
typescript复制代码public class Myclass2 implements ImportBeanDefinitionRegistrar {
//该实现方法默认为空
@Override
public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {

}
}
// 参数分析:

// 第一个参数:annotationMetadata 和之前的ImportSelector参数一样都是表示当前被@Import注解给标注的所有注解信息
// 第二个参数表示用于注册定义一个bean

第二步:编写代码,自定义注册bean

1
2
3
4
5
6
7
8
9
java复制代码public class Myclass2 implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
//指定bean定义信息(包括bean的类型、作用域...)
RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(TestDemo4.class);
//注册一个bean指定bean名字(id)
beanDefinitionRegistry.registerBeanDefinition("TestDemo4444",rootBeanDefinition);
}
}

第三步:编写TestDemo 类,并标注上使用ImportBeanDefinitionRegistrar方式的Myclass2类

1
2
3
4
5
6
7
8
9
csharp复制代码@Import({TestDemo2.class,Myclass.class,Myclass2.class})
public class TestDemo {

@Bean
public AccountDao2 accountDao222(){
return new AccountDao2();
}

}

@FactoryBean

编写配置类

1
2
3
4
5
6
7
8
9
kotlin复制代码// 标记这是一个Spring配置类
@Configuration
public class SpringConfiguration {
// 如果没有@Bean注解,则注入到容器中的id就是方法名(也就是myFactoryBean),但是如果显示的给了值,那么注入到容器中的就是factoryBean
@Bean("factoryBean")
public MyFactoryBean myFactoryBean(){
return new MyFactoryBean();
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ini复制代码public class SpringDemo {

@Test
public void springTest01() throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(SpringConfiguration.class);
// 容器中获取的Bean,实际上就是工厂Bean(MyFactoryBean通过getObject()方法返回的对象)
Object factoryBean01 = context.getBean("factoryBean");
System.out.println("实际上注入到容器中的类型是:" + factoryBean01.getClass());

Object factoryBean02 = context.getBean("factoryBean");
System.out.println("注入到容器内的对象是否是单例:" + (factoryBean01 == factoryBean02));

Object factoryBean03 = context.getBean("&factoryBean");
System.out.println("如果想获取到MyFactoryBean的对象,使用&前缀:" + factoryBean03);

// 输出打印Spring中的所有Bean名称
String[] beanDefinitionNames = context.getBeanDefinitionNames();
for (String beanDefinitionName : beanDefinitionNames) {
System.out.println(beanDefinitionName);
}
}
}

最后

感谢你看到这里,文章有什么不足还请指正,觉得文章对你有帮助的话记得给我点个赞,每天都会分享java相关技术文章或行业资讯,欢迎大家关注和转发文章!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Springcloud+Mybatis使用多数据源的四种方式

发表于 2020-09-22

前段时间在做会员中心和中间件系统开发时,多次碰到多数据源配置问题,主要用到分包方式、参数化切换、注解+AOP、动态添加 这四种方式。这里做一下总结,分享下使用心得以及踩过的坑。

四种方式对比

文章比较长,首先给出四种实现方式的对比,大家可以根据自身需要,选择阅读。

分包方式 参数化切换 注解方式 动态添加方式
适用场景 编码时便知道用哪个数据源 运行时才能确定用哪个数据源 编码时便知道用哪个数据源 运行时动态添加新数据源
切换模式 自动 手动 自动 手动
mapper路径 需要分包 无要求 无要求 无要求
增加数据源是否需要修改配置类 需要 不需要 不需要 \
实现复杂度 简单 复杂 复杂 复杂

分包方式

数据源配置文件

在yml中,配置两个数据源,id分别为master和s1。

1
2
3
4
5
6
7
8
9
10
11
12
yaml复制代码spring:
datasource:
master:
jdbcUrl: jdbc:mysql://192.168.xxx.xxx:xxxx/db1?.........
username: xxx
password: xxx
driverClassName: com.mysql.cj.jdbc.Driver
s1:
jdbcUrl: jdbc:mysql://192.168.xxx.xxx:xxxx/db2?........
username: xxx
password: xxx
driverClassName: com.mysql.cj.jdbc.Driver

数据源配置类

master数据源配置类

注意点:

需要用@Primary注解指定默认数据源,否则spring不知道哪个是主数据源;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码@Configuration
@MapperScan(basePackages = "com.hosjoy.xxx.xxx.xxx.xxx.mapper.master", sqlSessionFactoryRef = "masterSqlSessionFactory")
public class MasterDataSourceConfig {

//默认数据源
@Bean(name = "masterDataSource")
@Primary
@ConfigurationProperties(prefix = "spring.datasource.master")
public HikariDataSource masterDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

@Bean(name = "masterSqlSessionFactory")
@Primary
public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource datasource, PaginationInterceptor paginationInterceptor)
throws Exception {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(datasource);
bean.setMapperLocations(
// 设置mybatis的xml所在位置
new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/master/**/**.xml"));
bean.setPlugins(new Interceptor[]{paginationInterceptor});
return bean.getObject();
}

@Bean(name = "masterSqlSessionTemplate")
@Primary
public SqlSessionTemplate masterSqlSessionTemplate(
@Qualifier("masterSqlSessionFactory") SqlSessionFactory sessionfactory) {
return new SqlSessionTemplate(sessionfactory);
}
}

s1数据源配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Configuration
@MapperScan(basePackages = "com.hosjoy.xxx.xxx.xxx.xxx.mapper.s1", sqlSessionFactoryRef = "s1SqlSessionFactory")
public class S1DataSourceConfig {

@Bean(name = "s1DataSource")
@ConfigurationProperties(prefix = "spring.datasource.s1")
public HikariDataSource s1DataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

@Bean(name = "s1SqlSessionFactory")
public SqlSessionFactory s1SqlSessionFactory(@Qualifier("s1DataSource") DataSource datasource
, PaginationInterceptor paginationInterceptor)
throws Exception {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(datasource);
bean.setMapperLocations(
// 设置mybatis的xml所在位置
new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/s1/**/**.xml"));
bean.setPlugins(new Interceptor[]{paginationInterceptor});
return bean.getObject();
}

@Bean(name = "s1SqlSessionTemplate")
public SqlSessionTemplate s1SqlSessionTemplate(
@Qualifier("s1SqlSessionFactory") SqlSessionFactory sessionfactory) {
return new SqlSessionTemplate(sessionfactory);
}
}

使用

可以看出,mapper接口、xml文件,需要按照不同的数据源分包。在操作数据库时,根据需要在service类中注入dao层。

特点分析

优点

实现起来简单,只需要编写数据源配置文件和配置类,mapper接口和xml文件注意分包即可。

缺点

很明显,如果后面要增加或删除数据源,不仅要修改数据源配置文件,还需要修改配置类。

例如增加一个数据源,同时还需要新写一个该数据源的配置类,同时还要考虑新建mapper接口包、xml包等,没有实现 “热插拔” 效果。

参数化切换方式

思想

参数化切换数据源,意思是,业务侧需要根据当前业务参数,动态的切换到不同的数据源。

这与分包思想不同。分包的前提是在编写代码的时候,就已经知道当前需要用哪个数据源,而参数化切换数据源需要根据业务参数决定用哪个数据源。

例如,请求参数userType值为1时,需要切换到数据源slave1;请求参数userType值为2时,需要切换到数据源slave2。

1
2
3
4
5
6
7
8
9
java复制代码/**伪代码**/
int userType = reqUser.getType();
if (userType == 1){
//切换到数据源slave1
//数据库操作
} else if(userType == 2){
//切换到数据源slave2
//数据库操作
}

设计思路

数据源注册

数据源配置类创建datasource时,从yml配置文件中读取所有数据源配置,自动创建每个数据源,并注册至bean工厂和AbstractRoutingDatasource(后面聊聊这个),同时返回默认的数据源master。

image-20200701112808482

数据源切换

(1)通过线程池处理请求,每个请求独占一个线程,这样每个线程切换数据源时互不影响。

(2)根据业务参数获取应切换的数据源ID,根据ID从数据源缓存池获取数据源bean;

(3)生成当前线程数据源key;

(4)将key设置到threadLocal;

(5)将key和数据源bean放入数据源缓存池;

(6)在执行mapper方法前,spring会调用determineCurrentLookupKey方法获取key,然后根据key去数据源缓存池取出数据源,然后getConnection获取该数据源连接;

(7)使用该数据源执行数据库操作;

(8)释放当前线程数据源。

image-20200701112808482
AbstractRoutingDataSource源码分析


spring为我们提供了AbstractRoutingDataSource抽象类,该类就是实现动态切换数据源的关键。

我们看下该类的类图,其实现了DataSource接口。

image-20200701151212528

我们看下它的getConnection方法的逻辑,其首先调用determineTargetDataSource来获取数据源,再获取数据库连接。很容易猜想到就是这里来决定具体使用哪个数据源的。

image-20200701151449823

进入到determineTargetDataSource方法,我们可以看到它先是调用determineCurrentLookupKey获取到一个lookupKey,然后根据这个key去resolvedDataSources里去找相应的数据源。

image-20200701151728497

看下该类定义的几个对象,defaultTargetDataSource是默认数据源,resolvedDataSources是一个map对象,存储所有主从数据源。

image-20200701151932020

所以,关键就是这个lookupKey的获取逻辑,决定了当前获取的是哪个数据源,然后执行getConnection等一系列操作。determineCurrentLookupKey是AbstractRoutingDataSource类中的一个抽象方法,而它的返回值是你所要用的数据源dataSource的key值,有了这个key值,resolvedDataSource(这是个map,由配置文件中设置好后存入的)就从中取出对应的DataSource,如果找不到,就用配置默认的数据源。

所以,通过扩展AbstractRoutingDataSource类,并重写其中的determineCurrentLookupKey()方法,可以实现数据源的切换。

代码实现

下面贴出关键代码实现。

数据源配置文件

这里配了3个数据源,其中主数据源是MySQL,两个从数据源是sqlserver。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
yaml复制代码spring:
datasource:
master:
jdbcUrl: jdbc:mysql://192.168.xx.xxx:xxx/db1?........
username: xxx
password: xxx
driverClassName: com.mysql.cj.jdbc.Driver
slave1:
jdbcUrl: jdbc:sqlserver://192.168.xx.xxx:xxx;DatabaseName=db2
username: xxx
password: xxx
driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver
slave2:
jdbcUrl: jdbc:sqlserver://192.168.xx.xxx:xxx;DatabaseName=db3
username: xxx
password: xxx
driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver

定义动态数据源

主要是继承AbstractRoutingDataSource,实现determineCurrentLookupKey方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class DynamicDataSource extends AbstractRoutingDataSource {
/*存储所有数据源*/
private Map<Object, Object> backupTargetDataSources;

public Map<Object, Object> getBackupTargetDataSources() {
return backupTargetDataSources;
}
/*defaultDataSource为默认数据源*/
public DynamicDataSource(DataSource defaultDataSource, Map<Object, Object> targetDataSource) {
backupTargetDataSources = targetDataSource;
super.setDefaultTargetDataSource(defaultDataSource);
super.setTargetDataSources(backupTargetDataSources);
super.afterPropertiesSet();
}
public void addDataSource(String key, DataSource dataSource) {
this.backupTargetDataSources.put(key, dataSource);
super.setTargetDataSources(this.backupTargetDataSources);
super.afterPropertiesSet();
}
/*返回当前线程的数据源的key*/
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceContextHolder.getContextKey();
}
}

定义数据源key线程变量持有

定义一个ThreadLocal静态变量,该变量持有了线程和线程的数据源key之间的关系。当我们要切换数据源时,首先要自己生成一个key,将这个key存入threadLocal线程变量中;同时还应该从DynamicDataSource对象中的backupTargetDataSources属性中获取到数据源对象, 然后将key和数据源对象再put到backupTargetDataSources中。 这样,spring就能根据determineCurrentLookupKey方法返回的key,从backupTargetDataSources中取出我们刚刚设置的数据源对象,进行getConnection等一系列操作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public class DynamicDataSourceContextHolder {
/**
* 存储线程和数据源key的映射关系
*/
private static final ThreadLocal<String> DATASOURCE_CONTEXT_KEY_HOLDER = new ThreadLocal<>();

/***
* 设置当前线程数据源key
*/
public static void setContextKey(String key) {
DATASOURCE_CONTEXT_KEY_HOLDER.set(key);
}
/***
* 获取当前线程数据源key
*/
public static String getContextKey() {
String key = DATASOURCE_CONTEXT_KEY_HOLDER.get();
return key == null ? DataSourceConstants.DS_KEY_MASTER : key;
}
/***
* 删除当前线程数据源key
*/
public static void removeContextKey() {
DynamicDataSource dynamicDataSource = RequestHandleMethodRegistry.getContext().getBean(DynamicDataSource.class);
String currentKey = DATASOURCE_CONTEXT_KEY_HOLDER.get();
if (StringUtils.isNotBlank(currentKey) && !"master".equals(currentKey)) {
dynamicDataSource.getBackupTargetDataSources().remove(currentKey);
}
DATASOURCE_CONTEXT_KEY_HOLDER.remove();
}
}

多数据源自动配置类

这里通过读取yml配置文件中所有数据源的配置,自动为每个数据源创建datasource 对象并注册至bean工厂。同时将这些数据源对象,设置到AbstractRoutingDataSource中。

通过这种方式,后面如果需要添加或修改数据源,都无需新增或修改java配置类,只需去配置中心修改yml文件即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码@Configuration
@MapperScan(basePackages = "com.hosjoy.xxx.xxx.modules.xxx.mapper")
public class DynamicDataSourceConfig {
@Autowired
private BeanFactory beanFactory;
@Autowired
private DynamicDataSourceProperty dynamicDataSourceProperty;
/**
* 功能描述: <br>
* 〈动态数据源bean 自动配置注册所有数据源〉
*
* @param
* @return javax.sql.DataSource
* @Author li.he
* @Date 2020/6/4 16:47
* @Modifier
*/
@Bean
@Primary
public DataSource dynamicDataSource() {
DefaultListableBeanFactory listableBeanFactory = (DefaultListableBeanFactory) beanFactory;
/*获取yml所有数据源配置*/
Map<String, Object> datasource = dynamicDataSourceProperty.getDatasource();
Map<Object, Object> dataSourceMap = new HashMap<>(5);
Optional.ofNullable(datasource).ifPresent(map -> {
for (Map.Entry<String, Object> entry : map.entrySet()) {
//创建数据源对象
HikariDataSource dataSource = (HikariDataSource) DataSourceBuilder.create().build();
String dataSourceId = entry.getKey();
configeDataSource(entry, dataSource);
/*bean工厂注册每个数据源bean*/
listableBeanFactory.registerSingleton(dataSourceId, dataSource);
dataSourceMap.put(dataSourceId, dataSource);
}
});
//AbstractRoutingDataSource设置主从数据源
return new DynamicDataSource(beanFactory.getBean("master", DataSource.class), dataSourceMap);
}

private void configeDataSource(Map.Entry<String, Object> entry, HikariDataSource dataSource) {
Map<String, Object> dataSourceConfig = (Map<String, Object>) entry.getValue();
dataSource.setJdbcUrl(MapUtils.getString(dataSourceConfig, "jdbcUrl"));
dataSource.setDriverClassName(MapUtils.getString(dataSourceConfig, "driverClassName"));
dataSource.setUsername(MapUtils.getString(dataSourceConfig, "username"));
dataSource.setPassword(MapUtils.getString(dataSourceConfig, "password"));
}

}

数据源切换工具类

切换逻辑:

(1)生成当前线程数据源key

(2)根据业务条件,获取应切换的数据源ID;

(3)根据ID从数据源缓存池中获取数据源对象,并再次添加到backupTargetDataSources缓存池中;

(4)threadLocal设置当前线程对应的数据源key;

(5)在执行数据库操作前,spring会调用determineCurrentLookupKey方法获取key,然后根据key去数据源缓存池取出数据源,然后getConnection获取该数据源连接;

(6)使用该数据源执行数据库操作;

(7)释放缓存:threadLocal清理当前线程数据源信息、数据源缓存池清理当前线程数据源key和数据源对象,目的是防止内存泄漏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码@Slf4j
@Component
public class DataSourceUtil {
@Autowired
private DataSourceConfiger dataSourceConfiger;

/*根据业务条件切换数据源*/
public void switchDataSource(String key, Predicate<? super Map<String, Object>> predicate) {
try {
//生成当前线程数据源key
String newDsKey = System.currentTimeMillis() + "";
List<Map<String, Object>> configValues = dataSourceConfiger.getConfigValues(key);
Map<String, Object> db = configValues.stream().filter(predicate)
.findFirst().get();
String id = MapUtils.getString(db, "id");
//根据ID从数据源缓存池中获取数据源对象,并再次添加到backupTargetDataSources
addDataSource(newDsKey, id);
//设置当前线程对应的数据源key
DynamicDataSourceContextHolder.setContextKey(newDsKey);
log.info("当前线程数据源切换成功,当前数据源ID:{}", id);

}
catch (Exception e) {
log.error("切换数据源失败,请检查数据源配置文件。key:{}, 条件:{}", key, predicate.toString());
throw new ClientException("切换数据源失败,请检查数据源配置", e);
}
}

/*将数据源添加至多数据源缓存池中*/
public static void addDataSource(String key, String dataSourceId) {
DynamicDataSource dynamicDataSource = RequestHandleMethodRegistry.getContext().getBean(DynamicDataSource.class);
DataSource dataSource = (DataSource) dynamicDataSource.getBackupTargetDataSources().get(dataSourceId);
dynamicDataSource.addDataSource(key, dataSource);
}
}

使用

1
2
3
4
5
6
7
8
9
10
java复制代码public void doExecute(ReqTestParams reqTestParams){
//构造条件
Predicate<? super Map<String, Object>> predicate =.........;
//切换数据源
dataSourceUtil.switchDataSource("testKey", predicate);
//数据库操作
mapper.testQuery();
//清理缓存,避免内存泄漏
DynamicDataSourceContextHolder.removeContextKey();
}

每次数据源使用后,都要调用removeContextKey方法清理缓存,避免内存泄漏,这里可以考虑用AOP拦截特定方法,利用后置通知为执行方法代理执行缓存清理工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Aspect
@Component
@Slf4j
public class RequestHandleMethodAspect {
@After("xxxxxxxxxxxxxxExecution表达式xxxxxxxxxxxxxxxxxx")
public void afterRunning(JoinPoint joinPoint){
String name = joinPoint.getSignature().toString();
long id = Thread.currentThread().getId();
log.info("方法执行完毕,开始清空当前线程数据源,线程id:{},代理方法:{}",id,name);
DynamicDataSourceContextHolder.removeContextKey();
log.info("当前线程数据源清空完毕,已返回至默认数据源:{}",id);
}
}

特点分析

(1)参数化切换数据源方式,出发点和分包方式不一样,适合于在运行时才能确定用哪个数据源。

(2)需要手动执行切换数据源操作;

(3)无需分包,mapper和xml路径自由定义;

(4)增加数据源,无需修改java配置类,只需修改数据源配置文件即可。

注解方式

思想

该方式利用注解+AOP思想,为需要切换数据源的方法标记自定义注解,注解属性指定数据源ID,然后利用AOP切面拦截注解标记的方法,在方法执行前,切换至相应数据源;在方法执行结束后,切换至默认数据源。

需要注意的是,自定义切面的优先级需要高于@Transactional注解对应切面的优先级。

否则,在自定义注解和@Transactional同时使用时,@Transactional切面会优先执行,切面在调用getConnection方法时,会去调用AbstractRoutingDataSource.determineCurrentLookupKey方法,此时获取到的是默认数据源master。这时@UsingDataSource对应的切面即使再设置当前线程的数据源key,后面也不会再去调用determineCurrentLookupKey方法来切换数据源了。

设计思路

数据源注册

同上。

数据源切换

利用切面,拦截所有@UsingDataSource注解标记的方法,根据dataSourceId属性,在方法执行前,切换至相应数据源;在方法执行结束后,清理缓存并切换至默认数据源。

image-20200717094314674

代码实现

数据源配置文件

同上。

定义动态数据源

同上。

定义数据源key线程变量持有

同上。

多数据源自动配置类

同上。

数据源切换工具类

切换逻辑:

(1)生成当前线程数据源key

(3)根据ID从数据源缓存池中获取数据源对象,并再次添加到backupTargetDataSources缓存池中;

(4)threadLocal设置当前线程对应的数据源key;

(5)在执行数据库操作前,spring会调用determineCurrentLookupKey方法获取key,然后根据key去数据源缓存池取出数据源,然后getConnection获取该数据源连接;

(6)使用该数据源执行数据库操作;

(7)释放缓存:threadLocal清理当前线程数据源信息、数据源缓存池清理当前线程数据源key和数据源对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public static void switchDataSource(String dataSourceId) {
if (StringUtils.isBlank(dataSourceId)) {
throw new ClientException("切换数据源失败,数据源ID不能为空");
}
try {
String threadDataSourceKey = UUID.randomUUID().toString();
DataSourceUtil.addDataSource(threadDataSourceKey, dataSourceId);
DynamicDataSourceContextHolder.setContextKey(threadDataSourceKey);
}
catch (Exception e) {
log.error("切换数据源失败,未找到指定的数据源,请确保所指定的数据源ID已在配置文件中配置。dataSourceId:{}", dataSourceId);
throw new ClientException("切换数据源失败,未找到指定的数据源,请确保所指定的数据源ID已在配置文件中配置。dataSourceId:" + dataSourceId, e);
}
}

自定义注解

自定义注解标记当前方法所使用的数据源,默认为主数据源。

1
2
3
4
5
6
java复制代码@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UsingDataSource {

String dataSourceId() default "master";
}

切面

主要是定义前置通知和后置通知,拦截UsingDataSource注解标记的方法,方法执行前切换数据源,方法执行后清理数据源缓存。

需要标记切面的优先级比@Transaction注解对应切面的优先级要高。否则,在自定义注解和@Transactional同时使用时,@Transactional切面会优先执行,切面在调用getConnection方法时,会去调用AbstractRoutingDataSource.determineCurrentLookupKey方法,此时获取到的是默认数据源master。这时@UsingDataSource对应的切面即使再设置当前线程的数据源key,后面也不会再去调用determineCurrentLookupKey方法来切换数据源了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码@Aspect
@Component
@Slf4j
@Order(value = 1)
public class DynamicDataSourceAspect {

//拦截UsingDataSource注解标记的方法,方法执行前切换数据源
@Before(value = "@annotation(usingDataSource)")
public void before(JoinPoint joinPoint, UsingDataSource usingDataSource) {
String dataSourceId = usingDataSource.dataSourceId();
log.info("执行目标方法前开始切换数据源,目标方法:{}, dataSourceId:{}", joinPoint.getSignature().toString(), dataSourceId);
try {
DataSourceUtil.switchDataSource(dataSourceId);
}
catch (Exception e) {
log.error("切换数据源失败!数据源可能未配置或不可用,数据源ID:{}", dataSourceId, e);
throw new ClientException("切换数据源失败!数据源可能未配置或不可用,数据源ID:" + dataSourceId, e);
}
log.info("目标方法:{} , 已切换至数据源:{}", joinPoint.getSignature().toString(), dataSourceId);
}

//拦截UsingDataSource注解标记的方法,方法执行后清理数据源,防止内存泄漏
@After(value = "@annotation(com.hosjoy.hbp.dts.common.annotation.UsingDataSource)")
public void after(JoinPoint joinPoint) {
log.info("目标方法执行完毕,执行清理,切换至默认数据源,目标方法:{}", joinPoint.getSignature().toString());
try {
DynamicDataSourceContextHolder.removeContextKey();
}
catch (Exception e) {
log.error("清理数据源失败", e);
throw new ClientException("清理数据源失败", e);
}
log.info("目标方法:{} , 数据源清理完毕,已返回默认数据源", joinPoint.getSignature().toString());
}
}

使用

1
2
3
4
5
6
7
8
9
10
11
java复制代码@UsingDataSource(dataSourceId = "slave1")
@Transactional
public void test(){
AddressPo po = new AddressPo();
po.setMemberCode("asldgjlk");
po.setName("lihe");
po.setPhone("13544986666");
po.setProvince("asdgjwlkgj");
addressMapper.insert(po);
int i = 1 / 0;
}

动态添加方式(非常用)

业务场景描述

这种业务场景不是很常见,但肯定是有人遇到过的。

项目里面只配置了1个默认的数据源,而具体运行时需要动态的添加新的数据源,非已配置好的静态的多数据源。例如需要去服务器实时读取数据源配置信息(非配置在本地),然后再执行数据库操作。

这种业务场景,以上3种方式就都不适用了,因为上述的数据源都是提前在yml文件配制好的。

实现思路

除了第6步外,利用之前写好的代码就可以实现。

思路是:

(1)创建新数据源;

(2)DynamicDataSource注册新数据源;

(3)切换:设置当前线程数据源key;添加临时数据源;

(4)数据库操作(必须在另一个service实现,否则无法控制事务);

(5)清理当前线程数据源key、清理临时数据源;

(6)清理刚刚注册的数据源;

(7)此时已返回至默认数据源。

代码

代码写的比较粗陋,但是模板大概就是这样子,主要想表达实现的方式。

Service A:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public String testUsingNewDataSource(){
DynamicDataSource dynamicDataSource = RequestHandleMethodRegistry.getContext().getBean("dynamicDataSource", DynamicDataSource.class);
try {
//模拟从服务器读取数据源信息
//..........................
//....................

//创建新数据源
HikariDataSource dataSource = (HikariDataSource) DataSourceBuilder.create().build();
dataSource.setJdbcUrl("jdbc:mysql://192.168.xxx.xxx:xxxx/xxxxx?......");
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUsername("xxx");
dataSource.setPassword("xxx");

//DynamicDataSource注册新数据源
dynamicDataSource.addDataSource("test_ds_id", dataSource);

//设置当前线程数据源key、添加临时数据源
DataSourceUtil.switchDataSource("test_ds_id");

//数据库操作(必须在另一个service实现,否则无法控制事务)
serviceB.testInsert();
}
finally {
//清理当前线程数据源key
DynamicDataSourceContextHolder.removeContextKey();

//清理刚刚注册的数据源
dynamicDataSource.removeDataSource("test_ds_id");

}
return "aa";
}

Service B:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Transactional(rollbackFor = Exception.class)
public void testInsert() {
AddressPo po = new AddressPo();
po.setMemberCode("555555555");
po.setName("李郃");
po.setPhone("16651694996");
po.setProvince("江苏省");
po.setCity("南京市");
po.setArea("浦口区");
po.setAddress("南京市浦口区宁六路219号");
po.setDef(false);
po.setCreateBy("23958");
addressMapper.insert(po);
//测试事务回滚
int i = 1 / 0;
}

DynamicDataSource: 增加removeDataSource方法, 清理注册的新数据源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class DynamicDataSource extends AbstractRoutingDataSource {

.................
.................
.................
public void removeDataSource(String key){
this.backupTargetDataSources.remove(key);
super.setTargetDataSources(this.backupTargetDataSources);
super.afterPropertiesSet();
}

.................
.................
.................
}

四种方式对比

分包方式 参数化切换 注解方式 动态添加方式
适用场景 编码时便知道用哪个数据源 运行时才能确定用哪个数据源 编码时便知道用哪个数据源 运行时动态添加新数据源
切换模式 自动 手动 自动 手动
mapper路径 需要分包 无要求 无要求 无要求
增加数据源是否需要修改配置类 需要 不需要 不需要 \
实现复杂度 简单 复杂 复杂 复杂

事务问题

使用上述数据源配置方式,可实现单个数据源事务控制。

例如在一个service方法中,需要操作多个数据源执行CUD时,是可以实现单个数据源事务控制的。方式如下,分别将需要事务控制的方法单独抽取到另一个service,可实现单个事务方法的事务控制。

ServiceA:

1
2
3
4
java复制代码public void updateMuilty(){
serviceB.updateDb1();
serviceB.updateDb2();
}

ServiceB:

1
2
3
4
5
6
7
8
9
10
11
java复制代码@UsingDataSource(dataSourceId = "slave1")
@Transactional
public void updateDb1(){
//业务逻辑......
}

@UsingDataSource(dataSourceId = "slave2")
@Transactional
public void updateDb2(){
//业务逻辑......
}

但是在同一个方法里控制多个数据源的事务就不是这么简单了,这就属于分布式事务的范围,可以考虑使用atomikos开源项目实现JTA分布式事务处理或者阿里的Fescar框架。

由于涉及到分布式事务控制,实现比较复杂,这里只是引出这个问题,后面抽时间把这块补上来。

参考文章

1.www.liaoxuefeng.com/article/118… Spring主从数据库的配置和动态数据源切换原理

2.blog.csdn.net/hekf2010/ar… Springcloud 多数库 多数据源整合,查询动态切换数据库

3.blog.csdn.net/tuesdayma/a… springboot-mybatis多数据源的两种整合方法

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…778779780…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%