开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

不要再重复造轮子了,Hutool这款开源工具类库贼好使

发表于 2021-11-16

「这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战」

读者群里有个小伙伴感慨说,“Hutool 这款开源类库太厉害了,基本上该有该的工具类,它里面都有。”讲真的,我平常工作中也经常用 Hutool,它确实可以帮助我们简化每一行代码,使 Java 拥有函数式语言般的优雅,让 Java 语言变得“甜甜的”。

PS:为了能够帮助更多的 Java 爱好者,已将《Java 程序员进阶之路》开源到了 GitHub(本篇已收录)。该专栏目前已经收获了 598 枚星标,如果你也喜欢这个专栏,觉得有帮助的话,可以去点个 star,这样也方便以后进行更系统化的学习!

github.com/itwanger/to…

Hutool 的作者在官网上说,Hutool 是 Hu+tool 的自造词(好像不用说,我们也能猜得到),“Hu”用来致敬他的“前任”公司,“tool”就是工具的意思,谐音就有意思了,“糊涂”,寓意追求“万事都作糊涂观,无所谓失,无所谓得”(一个开源类库,上升到了哲学的高度,作者厉害了)。

看了一下开发团队的一个成员介绍,一个 Java 后端工具的作者竟然爱前端、爱数码,爱美女,嗯嗯嗯,确实“难得糊涂”(手动狗头)。

废话就说到这,来吧,实操走起!

01、引入 Hutool

Maven 项目只需要在 pom.xml 文件中添加以下依赖即可。

1
2
3
4
5
xml复制代码<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.3</version>
</dependency>

Hutool 的设计思想是尽量减少重复的定义,让项目中的 util 包尽量少。一个好的轮子可以在很大程度上避免“复制粘贴”,从而节省我们开发人员对项目中公用类库和公用工具方法的封装时间。同时呢,成熟的开源库也可以最大限度的避免封装不完善带来的 bug。

就像作者在官网上说的那样:

  • 以前,我们打开搜索引擎 -> 搜“Java MD5 加密” -> 打开某篇博客 -> 复制粘贴 -> 改改,变得好用些

有了 Hutool 以后呢,引入 Hutool -> 直接 SecureUtil.md5()

Hutool 对不仅对 JDK 底层的文件、流、加密解密、转码、正则、线程、XML等做了封装,还提供了以下这些组件:

非常多,非常全面,鉴于此,我只挑选一些我喜欢的来介绍下(偷偷地告诉你,我就是想偷懒)。

02、类型转换

类型转换在 Java 开发中很常见,尤其是从 HttpRequest 中获取参数的时候,前端传递的是整形,但后端只能先获取到字符串,然后再调用 parseXXX() 方法进行转换,还要加上判空,很繁琐。

Hutool 的 Convert 类可以简化这个操作,可以将任意可能的类型转换为指定类型,同时第二个参数 defaultValue 可用于在转换失败时返回一个默认值。

1
2
3
java复制代码String param = "10";
int paramInt = Convert.toInt(param);
int paramIntDefault = Convert.toInt(param, 0);

把字符串转换成日期:

1
2
java复制代码String dateStr = "2020年09月29日";
Date date = Convert.toDate(dateStr);

把字符串转成 Unicode:

1
2
java复制代码String unicodeStr = "沉默王二";
String unicode = Convert.strToUnicode(unicodeStr);

03、日期时间

JDK 自带的 Date 和 Calendar 不太好用,Hutool 封装的 DateUtil 用起来就舒服多了!

获取当前日期:

1
java复制代码Date date = DateUtil.date();

DateUtil.date() 返回的其实是 DateTime,它继承自 Date 对象,重写了 toString() 方法,返回 yyyy-MM-dd HH:mm:ss 格式的字符串。

有些小伙伴是不是想看看我写这篇文章的时间,输出一下给大家看看:

1
csharp复制代码System.out.println(date);// 2020-09-29 04:28:02

字符串转日期:

1
2
java复制代码String dateStr = "2020-09-29";
Date date = DateUtil.parse(dateStr);

DateUtil.parse() 会自动识别一些常用的格式,比如说:

  • yyyy-MM-dd HH:mm:ss
  • yyyy-MM-dd
  • HH:mm:ss
  • yyyy-MM-dd HH:mm
  • yyyy-MM-dd HH:mm:ss.SSS

还可以识别带中文的:

  • 年月日时分秒

格式化时间差:

1
2
3
4
5
6
7
8
9
10
java复制代码String dateStr1 = "2020-09-29 22:33:23";
Date date1 = DateUtil.parse(dateStr1);

String dateStr2 = "2020-10-01 23:34:27";
Date date2 = DateUtil.parse(dateStr2);

long betweenDay = DateUtil.between(date1, date2, DateUnit.MS);

// 输出:2天1小时1分4秒
String formatBetween = DateUtil.formatBetween(betweenDay, BetweenFormater.Level.SECOND);

星座和属相:

1
2
3
4
java复制代码// 射手座
String zodiac = DateUtil.getZodiac(Month.DECEMBER.getValue(), 10);
// 蛇
String chineseZodiac = DateUtil.getChineseZodiac(1989);

04、IO 流相关

IO 操作包括读和写,应用的场景主要包括网络操作和文件操作,原生的 Java 类库区分字符流和字节流,字节流 InputStream 和 OutputStream 就有很多很多种,使用起来让人头皮发麻。

Hutool 封装了流操作工具类 IoUtil、文件读写操作工具类 FileUtil、文件类型判断工具类 FileTypeUtil 等等。

1
2
3
java复制代码BufferedInputStream in = FileUtil.getInputStream("hutool/origin.txt");
BufferedOutputStream out = FileUtil.getOutputStream("hutool/to.txt");
long copySize = IoUtil.copy(in, out, IoUtil.DEFAULT_BUFFER_SIZE);

在 IO 操作中,文件的操作相对来说是比较复杂的,但使用频率也很高,几乎所有的项目中都躺着一个叫 FileUtil 或者 FileUtils 的工具类。Hutool 的 FileUtil 类包含以下几类操作:

  • 文件操作:包括文件目录的新建、删除、复制、移动、改名等
  • 文件判断:判断文件或目录是否非空,是否为目录,是否为文件等等
  • 绝对路径:针对 ClassPath 中的文件转换为绝对路径文件
  • 文件名:主文件名,扩展名的获取
  • 读操作:包括 getReader、readXXX 操作
  • 写操作:包括 getWriter、writeXXX 操作

顺带说说 classpath。

在实际编码当中,我们通常需要从某些文件里面读取一些数据,比如配置文件、文本文件、图片等等,那这些文件通常放在什么位置呢?

放在项目结构图中的 resources 目录下,当项目编译后,会出现在 classes 目录下。对应磁盘上的目录如下图所示:

当我们要读取文件的时候,我是不建议使用绝对路径的,因为操作系统不一样的话,文件的路径标识符也是不一样的。最好使用相对路径。

假设在 src/resources 下放了一个文件 origin.txt,文件的路径参数如下所示:

1
java复制代码FileUtil.getInputStream("origin.txt")

假设文件放在 src/resources/hutool 目录下,则路径参数改为:

1
java复制代码FileUtil.getInputStream("hutool/origin.txt")

05、字符串工具

Hutool 封装的字符串工具类 StrUtil 和 Apache Commons Lang 包中的 StringUtils 类似,作者认为优势在于 Str 比 String 短,尽管我不觉得。不过,我倒是挺喜欢其中的一个方法的:

1
2
3
java复制代码String template = "{},一枚沉默但有趣的程序员,喜欢他的文章的话,请微信搜索{}";
String str = StrUtil.format(template, "沉默王二", "沉默王二");
// 沉默王二,一枚沉默但有趣的程序员,喜欢他的文章的话,请微信搜索沉默王二

06、反射工具

反射机制可以让 Java 变得更加灵活,因此在某些情况下,反射可以做到事半功倍的效果。Hutool 封装的反射工具 ReflectUtil 包括:

  • 获取构造方法
  • 获取字段
  • 获取字段值
  • 获取方法
  • 执行方法(对象方法和静态方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码package com.itwanger.hutool.reflect;

import cn.hutool.core.util.ReflectUtil;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class ReflectDemo {
private int id;

public ReflectDemo() {
System.out.println("构造方法");
}

public void print() {
System.out.println("我是沉默王二");
}

public static void main(String[] args) throws IllegalAccessException {
// 构建对象
ReflectDemo reflectDemo = ReflectUtil.newInstance(ReflectDemo.class);

// 获取构造方法
Constructor[] constructors = ReflectUtil.getConstructors(ReflectDemo.class);
for (Constructor constructor : constructors) {
System.out.println(constructor.getName());
}

// 获取字段
Field field = ReflectUtil.getField(ReflectDemo.class, "id");
field.setInt(reflectDemo, 10);
// 获取字段值
System.out.println(ReflectUtil.getFieldValue(reflectDemo, field));

// 获取所有方法
Method[] methods = ReflectUtil.getMethods(ReflectDemo.class);
for (Method m : methods) {
System.out.println(m.getName());
}

// 获取指定方法
Method method = ReflectUtil.getMethod(ReflectDemo.class, "print");
System.out.println(method.getName());


// 执行方法
ReflectUtil.invoke(reflectDemo, "print");
}
}

07、压缩工具

在 Java 中,对文件、文件夹打包压缩是一件很繁琐的事情,Hutool 封装的 ZipUtil 针对 java.util.zip 包做了优化,可以使用一个方法搞定压缩和解压,并且自动处理文件和目录的问题,不再需要用户判断,大大简化的压缩解压的复杂度。

1
2
java复制代码ZipUtil.zip("hutool", "hutool.zip");
File unzip = ZipUtil.unzip("hutool.zip", "hutoolzip");

08、身份证工具

Hutool 封装的 IdcardUtil 可以用来对身份证进行验证,支持大陆 15 位、18 位身份证,港澳台 10 位身份证。

1
2
3
4
5
java复制代码String ID_18 = "321083197812162119";
String ID_15 = "150102880730303";

boolean valid = IdcardUtil.isValidCard(ID_18);
boolean valid15 = IdcardUtil.isValidCard(ID_15);

09、扩展 HashMap

Java 中的 HashMap 是强类型的,而 Hutool 封装的 Dict 对键的类型要求没那么严格。

1
2
3
4
5
6
7
java复制代码Dict dict = Dict.create()
.set("age", 18)
.set("name", "沉默王二")
.set("birthday", DateTime.now());

int age = dict.getInt("age");
String name = dict.getStr("name");

10、控制台打印

本地编码的过程中,经常需要使用 System.out 打印结果,但是往往一些复杂的对象不支持直接打印,比如说数组,需要调用 Arrays.toString。Hutool 封装的 Console 类借鉴了 JavaScript 中的 console.log(),使得打印变成了一个非常便捷的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class ConsoleDemo {
public static void main(String[] args) {
// 打印字符串
Console.log("沉默王二,一枚有趣的程序员");

// 打印字符串模板
Console.log("洛阳是{}朝古都",13);

int [] ints = {1,2,3,4};
// 打印数组
Console.log(ints);
}
}

11、字段验证器

做 Web 开发的时候,后端通常需要对表单提交过来的数据进行验证。Hutool 封装的 Validator 可以进行很多有效的条件验证:

  • 是不是邮箱
  • 是不是 IP V4、V6
  • 是不是电话号码
  • 等等

1
2
java复制代码Validator.isEmail("沉默王二");
Validator.isMobile("itwanger.com");

12、双向查找 Map

Guava 中提供了一种特殊的 Map 结构,叫做 BiMap,实现了一种双向查找的功能,可以根据 key 查找 value,也可以根据 value 查找 key,Hutool 也提供这种 Map 结构。

1
2
3
4
5
6
7
8
9
10
11
java复制代码BiMap<String, String> biMap = new BiMap<>(new HashMap<>());
biMap.put("wanger", "沉默王二");
biMap.put("wangsan", "沉默王三");

// get value by key
biMap.get("wanger");
biMap.get("wangsan");

// get key by value
biMap.getKey("沉默王二");
biMap.getKey("沉默王三");

在实际的开发工作中,其实我更倾向于使用 Guava 的 BiMap,而不是 Hutool 的。这里提一下,主要是我发现了 Hutool 在线文档上的一处错误,提了个 issue(从中可以看出我一颗一丝不苟的心和一双清澈明亮的大眼睛啊)。

13、图片工具

Hutool 封装的 ImgUtil 可以对图片进行缩放、裁剪、转为黑白、加水印等操作。

缩放图片:

1
2
3
4
5
java复制代码ImgUtil.scale(
FileUtil.file("hutool/wangsan.jpg"),
FileUtil.file("hutool/wangsan_small.jpg"),
0.5f
);

裁剪图片:

1
2
3
4
5
java复制代码ImgUtil.cut(
FileUtil.file("hutool/wangsan.jpg"),
FileUtil.file("hutool/wangsan_cut.jpg"),
new Rectangle(200, 200, 100, 100)
);

添加水印:

1
2
3
4
5
6
7
8
9
java复制代码ImgUtil.pressText(//
FileUtil.file("hutool/wangsan.jpg"),
FileUtil.file("hutool/wangsan_logo.jpg"),
"沉默王二", Color.WHITE,
new Font("黑体", Font.BOLD, 100),
0,
0,
0.8f
);

趁机让大家欣赏一下二哥帅气的真容。

14、配置文件

众所周知,Java 中广泛应用的配置文件 Properties 存在一个特别大的诟病:不支持中文。每次使用时,如果想存放中文字符,就必须借助 IDE 相关插件才能转为 Unicode 符号,而这种反人类的符号在命令行下根本没法看。

于是,Hutool 的 Setting 运用而生。Setting 除了兼容 Properties 文件格式外,还提供了一些特有功能,这些功能包括:

  • 各种编码方式支持
  • 变量支持
  • 分组支持

先整个配置文件 example.setting,内容如下:

1
2
ini复制代码name=沉默王二
age=18

再来读取和更新配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class SettingDemo {
private final static String SETTING = "hutool/example.setting";
public static void main(String[] args) {
// 初始化 Setting
Setting setting = new Setting(SETTING);

// 读取
setting.getStr("name", "沉默王二");

// 在配置文件变更时自动加载
setting.autoLoad(true);

// 通过代码方式增加键值对
setting.set("birthday", "2020年09月29日");
setting.store(SETTING);
}
}

15、日志工厂

Hutool 封装的日志工厂 LogFactory 兼容了各大日志框架,使用起来也非常简便。

1
2
3
4
5
6
7
java复制代码public class LogDemo {
private static final Log log = LogFactory.get();

public static void main(String[] args) {
log.debug("难得糊涂");
}
}

先通过 LogFactory.get() 自动识别引入的日志框架,从而创建对应日志框架的门面 Log 对象,然后调用 debug()、info() 等方法输出日志。

如果不想创建 Log 对象的话,可以使用 StaticLog,顾名思义,一个提供了静态方法的日志类。

1
java复制代码StaticLog.info("爽啊 {}.", "沉默王二的文章");

16、缓存工具

CacheUtil 是 Hutool 封装的创建缓存的快捷工具类,可以创建不同的缓存对象:

  • FIFOCache:先入先出,元素不停的加入缓存直到缓存满为止,当缓存满时,清理过期缓存对象,清理后依旧满则删除先入的缓存。
1
2
3
4
5
6
7
8
java复制代码Cache<String, String> fifoCache = CacheUtil.newFIFOCache(3);
fifoCache.put("key1", "沉默王一");
fifoCache.put("key2", "沉默王二");
fifoCache.put("key3", "沉默王三");
fifoCache.put("key4", "沉默王四");

// 大小为 3,所以 key3 放入后 key1 被清除
String value1 = fifoCache.get("key1");
  • LFUCache,最少使用,根据使用次数来判定对象是否被持续缓存,当缓存满时清理过期对象,清理后依旧满的情况下清除最少访问的对象并将其他对象的访问数减去这个最少访问数,以便新对象进入后可以公平计数。
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码Cache<String, String> lfuCache = CacheUtil.newLFUCache(3);

lfuCache.put("key1", "沉默王一");
// 使用次数+1
lfuCache.get("key1");
lfuCache.put("key2", "沉默王二");
lfuCache.put("key3", "沉默王三");
lfuCache.put("key4", "沉默王四");

// 由于缓存容量只有 3,当加入第 4 个元素的时候,最少使用的将被移除(2,3被移除)
String value2 = lfuCache.get("key2");
String value3 = lfuCache.get("key3");
  • LRUCache,最近最久未使用,根据使用时间来判定对象是否被持续缓存,当对象被访问时放入缓存,当缓存满了,最久未被使用的对象将被移除。
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码Cache<String, String> lruCache = CacheUtil.newLRUCache(3);

lruCache.put("key1", "沉默王一");
lruCache.put("key2", "沉默王二");
lruCache.put("key3", "沉默王三");
// 使用时间近了
lruCache.get("key1");
lruCache.put("key4", "沉默王四");

// 由于缓存容量只有 3,当加入第 4 个元素的时候,最久使用的将被移除(2)
String value2 = lruCache.get("key2");
System.out.println(value2);

17、加密解密

加密分为三种:

  • 对称加密(symmetric),例如:AES、DES 等
  • 非对称加密(asymmetric),例如:RSA、DSA 等
  • 摘要加密(digest),例如:MD5、SHA-1、SHA-256、HMAC 等

Hutool 针对这三种情况都做了封装:

  • 对称加密 SymmetricCrypto
  • 非对称加密 AsymmetricCrypto
  • 摘要加密 Digester

快速加密工具类 SecureUtil 有以下这些方法:

1)对称加密

  • SecureUtil.aes
  • SecureUtil.des

2)非对称加密

  • SecureUtil.rsa
  • SecureUtil.dsa

3)摘要加密

  • SecureUtil.md5
  • SecureUtil.sha1
  • SecureUtil.hmac
  • SecureUtil.hmacMd5
  • SecureUtil.hmacSha1

只写一个简单的例子作为参考:

1
2
3
4
5
6
7
8
9
java复制代码public class SecureUtilDemo {
static AES aes = SecureUtil.aes();
public static void main(String[] args) {
String encry = aes.encryptHex("沉默王二");
System.out.println(encry);
String oo = aes.decryptStr(encry);
System.out.println(oo);
}
}

18、其他类库

Hutool 中的类库还有很多,尤其是一些对第三方类库的进一步封装,比如邮件工具 MailUtil,二维码工具 QrCodeUtil,Emoji 工具 EmojiUtil,小伙伴们可以参考 Hutool 的官方文档:www.hutool.cn/

项目源码地址:github.com/looly/hutoo…

Java 程序员进阶之路,该专栏风趣幽默、通俗易懂,对 Java 爱好者极度友好和舒适😄,内容包括但不限于 Java 基础、Java 集合框架、Java IO、Java 并发编程、Java 虚拟机、Java 企业级开发(SSM、Spring Boot)等核心知识点。

GitHub 地址:github.com/itwanger/to…

亮白版和暗黑版的 PDF 也准备好了呢,让我们一起成为更好的 Java 工程师吧,一起冲!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RabbitMQ 的七种消息传递形式

发表于 2021-11-16

@[toc]
今天这篇文章比较简单,来和小伙伴们分享一下 RabbitMQ 的七种消息传递形式。一起来看看。

大部分情况下,我们可能都是在 Spring Boot 或者 Spring Cloud 环境下使用 RabbitMQ,因此本文我也主要从这两个方面来和大家分享 RabbitMQ 的用法。

  1. RabbitMQ 架构简介

一图胜千言,如下:

1587705504342

这张图中涉及到如下一些概念:

  1. 生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。
  2. 交换机(Exchange):和生产者建立连接并接收生产者的消息。
  3. 消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。
  4. 队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。
  5. 路由(Routes):交换机转发消息到队列的规则。
  1. 准备工作

大家知道,RabbitMQ 是 AMQP 阵营里的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖,如下:

项目创建成功后,在 application.properties 中配置 RabbitMQ 的基本连接信息,如下:

1
2
3
4
properties复制代码spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。

RabbitMQ 官网介绍了如下几种消息分发的形式:



这里给出了七种,其中第七种是消息确认,消息确认这块松哥之前发过相关的文章,传送门:

  • 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?
  • RabbitMQ 高可用之如何确保消息成功消费

所以这里我主要和大家介绍前六种消息收发方式。

  1. 消息收发

3.1 Hello World

咦?这个咋没有交换机?这个其实是默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:

来看看代码实现:

先来看看队列的定义:

1
2
3
4
5
6
7
8
9
10
java复制代码@Configuration
public class HelloWorldConfig {

public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";

@Bean
Queue queue1() {
return new Queue(HELLO_WORLD_QUEUE_NAME);
}
}

再来看看消息消费者的定义:

1
2
3
4
5
6
7
java复制代码@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(String msg) {
System.out.println("msg = " + msg);
}
}

消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@SpringBootTest
class RabbitmqdemoApplicationTests {

@Autowired
RabbitTemplate rabbitTemplate;


@Test
void contextLoads() {
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
}

}

这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。

3.2 Work queues

这种情况是这样的:

一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:

一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。

先来看并发能力的配置,如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(String msg) {
System.out.println("receive = " + msg);
}
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")
public void receive2(String msg) {
System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());
}
}

可以看到,第二个消费者我配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去消费消息。

启动项目,在 RabbitMQ 后台也可以看到一共有 11 个消费者。

此时,如果生产者发送 10 条消息,就会一下都被消费掉。

消息发送方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@SpringBootTest
class RabbitmqdemoApplicationTests {

@Autowired
RabbitTemplate rabbitTemplate;


@Test
void contextLoads() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
}
}

}

消息消费日志如下:

可以看到,消息都被第一个消费者消费了。但是小伙伴们需要注意,事情并不总是这样(多试几次就可以看到差异),消息也有可能被第一个消费者消费(只是由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)。

当然消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息,配置手动 ack 的方式如下:

1
properties复制代码spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(Message message,Channel channel) throws IOException {
System.out.println("receive="+message.getPayload());
channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
}

@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")
public void receive2(Message message, Channel channel) throws IOException {
System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName());
channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);
}
}

此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息。

这就是 Work queues 这种情况。

3.3 Publish/Subscrite

再来看发布订阅模式,这种情况是这样:

一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:

这种情况下,我们有四种交换机可供选择,分别是:

  • Direct
  • Fanout
  • Topic
  • Header

我分别来给大家举一个简单例子看下。

3.3.1 Direct

DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。DirectExchange 的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Configuration
public class RabbitDirectConfig {
public final static String DIRECTNAME = "javaboy-direct";
@Bean
Queue queue() {
return new Queue("hello-queue");
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(DIRECTNAME, true, false);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(directExchange()).with("direct");
}
}
  • 首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除。
  • 创建一个Binding对象将Exchange和Queue绑定在一起。
  • DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可。

再来看看消费者:

1
2
3
4
5
6
7
java复制代码@Component
public class DirectReceiver {
@RabbitListener(queues = "hello-queue")
public void handler1(String msg) {
System.out.println("DirectReceiver:" + msg);
}
}

通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息。然后在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送,如下:

1
2
3
4
5
6
7
8
9
10
java复制代码@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void directTest() {
rabbitTemplate.convertAndSend("hello-queue", "hello direct!");
}
}

最终执行结果如下:

3.3.2 Fanout

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Configuration
public class RabbitFanoutConfig {
public final static String FANOUTNAME = "sang-fanout";
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUTNAME, true, false);
}
@Bean
Queue queueOne() {
return new Queue("queue-one");
}
@Bean
Queue queueTwo() {
return new Queue("queue-two");
}
@Bean
Binding bindingOne() {
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
@Bean
Binding bindingTwo() {
return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
}
}

在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创建两个消费者,如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handler1(String message) {
System.out.println("FanoutReceiver:handler1:" + message);
}
@RabbitListener(queues = "queue-two")
public void handler2(String message) {
System.out.println("FanoutReceiver:handler2:" + message);
}
}

两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void fanoutTest() {
rabbitTemplate
.convertAndSend(RabbitFanoutConfig.FANOUTNAME,
null, "hello fanout!");
}
}

注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null。

最终执行日志如下:

3.3.3 Topic

TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。TopicExchange 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码@Configuration
public class RabbitTopicConfig {
public final static String TOPICNAME = "sang-topic";
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPICNAME, true, false);
}
@Bean
Queue xiaomi() {
return new Queue("xiaomi");
}
@Bean
Queue huawei() {
return new Queue("huawei");
}
@Bean
Queue phone() {
return new Queue("phone");
}
@Bean
Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomi()).to(topicExchange())
.with("xiaomi.#");
}
@Bean
Binding huaweiBinding() {
return BindingBuilder.bind(huawei()).to(topicExchange())
.with("huawei.#");
}
@Bean
Binding phoneBinding() {
return BindingBuilder.bind(phone()).to(topicExchange())
.with("#.phone.#");
}
}
  • 首先创建 TopicExchange,参数和前面的一致。然后创建三个 Queue,第一个 Queue 用来存储和 “xiaomi” 有关的消息,第二个 Queue 用来存储和 “huawei” 有关的消息,第三个 Queue 用来存储和 “phone” 有关的消息。
  • 将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。

接下来针对三个 Queue 创建三个消费者,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Component
public class TopicReceiver {
@RabbitListener(queues = "phone")
public void handler1(String message) {
System.out.println("PhoneReceiver:" + message);
}
@RabbitListener(queues = "xiaomi")
public void handler2(String message) {
System.out.println("XiaoMiReceiver:"+message);
}
@RabbitListener(queues = "huawei")
public void handler3(String message) {
System.out.println("HuaWeiReceiver:"+message);
}
}

然后在单元测试中进行消息的发送,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void topicTest() {
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"xiaomi.news","小米新闻..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"huawei.news","华为新闻..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"xiaomi.phone","小米手机..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"huawei.phone","华为手机..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"phone.news","手机新闻..");
}
}

根据 RabbitTopicConfig 中的配置,第一条消息将被路由到名称为 “xiaomi” 的 Queue 上,第二条消息将被路由到名为 “huawei” 的 Queue 上,第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,最后一条消息则将被路由到名为 “phone” 的 Queue 上。

3.3.4 Header

HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码@Configuration
public class RabbitHeaderConfig {
public final static String HEADERNAME = "javaboy-header";
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADERNAME, true, false);
}
@Bean
Queue queueName() {
return new Queue("name-queue");
}
@Bean
Queue queueAge() {
return new Queue("age-queue");
}
@Bean
Binding bindingName() {
Map<String, Object> map = new HashMap<>();
map.put("name", "sang");
return BindingBuilder.bind(queueName())
.to(headersExchange()).whereAny(map).match();
}
@Bean
Binding bindingAge() {
return BindingBuilder.bind(queueAge())
.to(headersExchange()).where("age").exists();
}
}

这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,第一个 bindingName 方法中,whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,就把该消息路由到名为 “name-queue” 的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含 age,不管 age 的值是多少,都将消息路由到名为 “age-queue” 的 Queue 上。

接下来创建两个消息消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void handler1(byte[] msg) {
System.out.println("HeaderReceiver:name:"
+ new String(msg, 0, msg.length));
}
@RabbitListener(queues = "age-queue")
public void handler2(byte[] msg) {
System.out.println("HeaderReceiver:age:"
+ new String(msg, 0, msg.length));
}
}

注意这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法,这里消息的发送也和 routingkey 无关,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void headerTest() {
Message nameMsg = MessageBuilder
.withBody("hello header! name-queue".getBytes())
.setHeader("name", "sang").build();
Message ageMsg = MessageBuilder
.withBody("hello header! age-queue".getBytes())
.setHeader("age", "99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
}
}

这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去。

最终执行效果如下:

3.4 Routing

这种情况是这样:

一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。

如下图:

这个就是按照 routing key 去路由消息,我这里就不再举例子了,大家可以参考 3.3.1 小结。

3.5 Topics

这种情况是这样:

一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写时注意格式 xxx.xxx.xxx 去编写。

如下图:

这个我也就不举例啦,前面 3.3.3 小节已经举过例子了,不再赘述。

3.6 RPC

RPC 这种消息收发形式,松哥前两天刚刚写了文章和大家介绍,这里就不多说了,传送门:

  • SpringBoot+RabbitMQ 实现 RPC 调用

3.7 Publisher Confirms

这种发送确认松哥之前有写过相关文章,传送门:

  • 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?
  • RabbitMQ 高可用之如何确保消息成功消费
  1. 小结

好啦,今天这篇文章主要是和小伙伴们整理了 RabbitMQ 中消息收发的七种形式,感兴趣的小伙伴可以试试哦~

公众号【江南一点雨】后台回复 rabbitmqdemo,获取本文案例地址~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

28 Pika:如何基于SSD实现大容量Redis? 基

发表于 2021-11-16

基本概念

  • 基于大内存的大容量实例在实例恢复、主从同步过程中会引起一系列潜在问题
+ 例如:**恢复时间增长**
+ **主从切换开销大**
+ **缓冲区易溢出**
  • 360 公司 DBA 和基础架构组联合开发的 Pika\
  • pika目标(使用ssd平滑替代redis)\
+ **单实例可以保存大容量数据**,同时避免了实例恢复和主从同步时的潜在问题\
+ **和 Redis 数据类型保持兼容**,可以支持使用 Redis 的应用平滑地迁移到 Pika 上

大内存Redis实例的潜在问题

  • 大内存的潜在问题
+ **RDB生成**和**恢复效率**低  快照


    - **fork**时间久,导致主线程**阻塞**
    - 可能造成swap 内存换到磁盘
+ 全量同步时长增加、缓冲区易溢出


    - 全量同步 RDB文件很大导致全量同步时长增加
    - 主从切换的过程耗时增加,同样会影响业务的可用性

Pika整体架构

  • 整体架构
+ 网络框架
+ Pika线程模块
+ Nemo存储模块
+ RocksDB
+ binlog
  • 网络框架
+ 功能:负责底层网络请求的接收和发送
+ 实现:


    - 对操作系统底层的网络函数进行了封装 socket
    - Pika 线程模块采用了多线程模型来具体处理客户端请求\


        * 请求分发线程(DispatchThread)\
        * 一组工作线程(WorkerThread)(将请求封装成task)\
        * 线程池(ThreadPool)\
+ 调优:增加工作线程数和线程池的线程数
  • Nemo
+ 实现了 Pika 和 Redis 的数**据类型兼容**,降低Pika的学习成本
  • binlog
+ 记录写命令,用于主从节点的命令同步(避免大内存复制,命令比数据小很多)\

Pika 如何基于 SSD 保存更多数据?

  • 基本概念:使用了业界广泛应用的持久化键值数据库RocksDB
  • RocksDB的读写机制(不会占据太大的内存空间)\
+ RocksDB会使用两小块内存空间来交替缓存写入的数据(Memtable1,Memtable2)\


    - 一般为几MB,几十MB
+ 优先写入Memtable1,Memtable1则写入SSD中
+ 此时Memtable2将代替Memtable1
+ 等待Memtable1数据都写完且Memtable2写满,则切换到Memtable1
  • 为什么pika不会出现大文件同步的效率和内存溢出的问题
+ 基于 RocksDB 保存了**数据文件**,不需要再通过**内存快照**进行恢复了\
+ 实现增量命令同步,既节省了内存,还避免了**缓冲区溢出**的问题\
  • pika的优势
+ Pika 使用 RocksDB 把大量数据保存到了 SSD,同时避免了内存快照的生成和恢复问题\
+ Pika 使用 binlog 机制进行主从同步,避免大内存时的影响\

\

Pika 如何实现 Redis 数据类型兼容?

  • 基本概念:RocksDB 只提供了单值的键值对类型,只满足redis的string数据结构
  • Nemo 模块就负责把 Redis 的集合类型转换成单值的键值对\
+ redis集合类型


    - List 和 Set 类型,它们的集合中也只有单值\
    - Hash(field-value) 和 Sorted Set(member-score ) 类型,它们的集合中的元素是成对的\
+ list的转换


    - key:保证多位组成有意义 保存在list的顺序
    - value:前继和后继元素,存活时间,值,版本,存活时间
+ set的转换


    - key:保存set的key和value
    - value:保存版本和存活时间
+ hash的转换


    - key:size hashkey field1
    - value:value1,field2,value2.....
+ zset


    - 类似hash,不过会根据score进行排序

list的转化

\

Pika 的其他优势与不足

  • pika的优点
+ 实例重启快 直接从ssd中获取数据,不需要回放数据\
+ 执行全量同步的风险低,使用binlog(磁盘)实现增量同步,不必担心缓冲区大小的限制
+ 多线程模型,降低了读写ssd对pika的性能影响
  • pika的缺点
+ 访问性能低于redis


    - 将存储从缓存从内存转移到ssd
    - 记录binlog会降低效率
    - 个人感觉数据结构效率很低
  • 适用场景:保存大容量数据是我们的首要需求,那么,Pika 是一个不错的解决方案

\

总结

  • pika优点
+ 既支持 Redis 操作接口,又能支持保存大容量的数据\
+ 支持迁移redis
  • 调优
+ 增加线程数据量,提升并发请求处理能力
+ 使用高配的SSD,提升SSD自身访问性能
  • redis迁移pika
+ Redis 数据迁移到 Pika\


    - aof\_to\_pika -i [Redis AOF文件] -h [Pika IP] -p [Pika port] -a [认证信息]
+ 把 Redis 请求转发给 Pika\
  • github.com/Qihoo360/pi…\

\

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot整合Druid+全局事务管理+Mybat

发表于 2021-11-16

@[TOC]

SpringBoot整合Druid+全局事务管理+Mybatis-Plus+代码生成器

在springboot开发当中,Druid,全局事务管理,代码生成器都是非常实用的,特此记录下整合的过程

整合Druid连接池

springboot默认的连接池是:HikariCP,但是Druid的功能相对来说比较全面。
数据库连接池了解和常用连接池对比
Druid连接池官网

第一步:引入相关JAR

1
2
3
4
5
6
7
8
9
10
xml复制代码  <dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

第二步:配置相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
yml复制代码spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
name: 数据源名称
driver-class-name: com.mysql.jdbc.Driver
username: root
password: 123456
url: jdbc:mysql://127.0.0.1:3306/springboot?characterEncoding=utf8&useSSL=false
# 连接池的配置信息
# 初始化大小,最小,最大
initial-size: 5
min-idle: 5
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
# 打开PSCache,并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
filters: stat,wall,slf4j,config
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
web-stat-filter:
enabled: true
url-pattern: "/"
exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
stat-view-servlet:
enabled: true
url-pattern: "/druid/*"
login-username: admin # 登录账号 不设置就不需要登录就可以访问druid可视化界面
login-password: 123456 # 登录密码
reset-enable: false
allow: "" # 白名单 表示所有
deny: 192.168.1.12 # 黑名单

第三步:在浏览器当中输入:http://127.0.0.1:8080/druid/index.html 即可进入可视化界面

全局事务管理器

springboot当中添加事务直接使用注解@Transactional 即可,但是每个方法都要添加比较麻烦,可以直接通过切面的方式添加一个全局的事务管理器。注意事项是,要注意方法名开头的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码@Configuration
@EnableTransactionManagement
public class TransactionConfiguration {

/**
* 配置全局事务的切点为service层的所有方法 AOP切面表达式 可参考(https://blog.csdn.net/ycf921244819/article/details/106599489)
* TODO 设置service层所在位置
*/
private static final String AOP_POINTCUT_EXPRESSION = "execution (* cn.hjljy.fastboot..*.service..*.*(..))";

/**
* 注入事务管理器
*/
@Autowired
private TransactionManager transactionManager;


/**
* 配置事务拦截器
*/
@Bean
public TransactionInterceptor txAdvice() {

RuleBasedTransactionAttribute txAttrRequired = new RuleBasedTransactionAttribute();
txAttrRequired.setName("REQUIRED事务");
//设置事务传播机制,默认是PROPAGATION_REQUIRED:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务
txAttrRequired.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
//设置异常回滚为Exception 默认是RuntimeException
List<RollbackRuleAttribute> rollbackRuleAttributes = new ArrayList<>();
rollbackRuleAttributes.add(new RollbackRuleAttribute(Exception.class));
txAttrRequired.setRollbackRules(rollbackRuleAttributes);

RuleBasedTransactionAttribute txAttrRequiredReadOnly = new RuleBasedTransactionAttribute();
txAttrRequiredReadOnly.setName("SUPPORTS事务");
//设置事务传播机制,PROPAGATION_SUPPORTS:如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务的方式继续运行
txAttrRequiredReadOnly.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
//设置异常回滚为Exception 默认是RuntimeException
txAttrRequiredReadOnly.setRollbackRules(rollbackRuleAttributes);
txAttrRequiredReadOnly.setReadOnly(true);

/*事务管理规则,声明具备事务管理的方法名*/
NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();
//方法名规则限制,必须以下列开头才会加入事务管理当中
source.addTransactionalMethod("add*", txAttrRequired);
source.addTransactionalMethod("save*", txAttrRequired);
source.addTransactionalMethod("create*", txAttrRequired);
source.addTransactionalMethod("insert*", txAttrRequired);
source.addTransactionalMethod("submit*", txAttrRequired);
source.addTransactionalMethod("del*", txAttrRequired);
source.addTransactionalMethod("remove*", txAttrRequired);
source.addTransactionalMethod("update*", txAttrRequired);
source.addTransactionalMethod("exec*", txAttrRequired);
source.addTransactionalMethod("set*", txAttrRequired);

//对于查询方法,根据实际情况添加事务管理 可能存在查询多个数据时,已查询出来的数据刚好被改变的情况
source.addTransactionalMethod("get*", txAttrRequiredReadOnly);
source.addTransactionalMethod("select*", txAttrRequiredReadOnly);
source.addTransactionalMethod("query*", txAttrRequiredReadOnly);
source.addTransactionalMethod("find*", txAttrRequiredReadOnly);
source.addTransactionalMethod("list*", txAttrRequiredReadOnly);
source.addTransactionalMethod("count*", txAttrRequiredReadOnly);
source.addTransactionalMethod("is*", txAttrRequiredReadOnly);
return new TransactionInterceptor(transactionManager, source);
}

/**
* 设置切面
*/
@Bean
public Advisor txAdviceAdvisor() {
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
pointcut.setExpression(AOP_POINTCUT_EXPRESSION);
return new DefaultPointcutAdvisor(pointcut, txAdvice());
}
}

整合Mybatis-Plus

第一步:引入JAR包

1
2
3
4
5
xml复制代码    <dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>

第二步:添加配置信息

1
2
3
4
5
6
7
8
9
10
11
yml复制代码mybatis-plus:
mapper-locations: classpath:mapper/*.xml #xml所在位置 不设置默认是在mapper类同级
configuration:
mapUnderscoreToCamelCase: true # 开启驼峰匹配 默认为true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 打印sql语句和入参数据
global-config:
db-config:
logic-delete-value: 1 #逻辑删除 配合@TableLogic注解
logic-not-delete-value: 0 #逻辑不删除
update-strategy: not_null # 更新时字段如果为null,就不进行更新该字段。
insert-strategy: not_null # 插入时如果字段为null,就不插入数据,建议数据库表字段设置默认值

第三步:添加分页和mapper扫描

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Configuration
@MapperScan("cn.hjljy.fastboot.mapper")
public class MybatisPlusConfiguration {
/**
* mybatis-plus分页插件
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor page = new PaginationInterceptor();
//设置分页数据库类型
page.setDbType(DbType.MYSQL);
page.setDialect(new MySqlDialect());
//优化count sql
page.setCountSqlParser(new JsqlParserCountOptimize(true));
//设置每页最大值
page.setLimit(999L);
return page;
}
}

第四步:创建一个Mapper类继承BaseMapper,就可以简单使用了。
可以参考官方文档入门:mp.baomidou.com/guide/quick…

整合代码生成器

AutoGenerator 是 MyBatis-Plus 的代码生成器,通过 AutoGenerator 可以快速生成 Entity、Mapper、Mapper XML、Service、Controller 等各个模块的代码,极大的提升了开发效率

考虑到dto和po在大部分情况下字段都是一样的,官方未提供DTO,所以可以拷贝一份entity.java.vm修改为dto.java.vm放在resources目录下面。然后根据自定义提示进行修改。
具体结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
java复制代码package $!{cfg.dtoPackage};

#foreach($pkg in ${table.importPackages})
import ${pkg};
#end
#if(${swagger2})
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
#end
#if(${entityLombokModel})
import lombok.Data;
import lombok.EqualsAndHashCode;
#if(${chainModel})
import lombok.experimental.Accessors;
#end
#end

/**
* <p>
* $!{table.comment}
* </p>
*
* @author ${author}
* @since ${date}
*/
#if(${entityLombokModel})
@Data
#if(${superEntityClass})
@EqualsAndHashCode(callSuper = true)
#else
@EqualsAndHashCode(callSuper = false)
#end
#if(${chainModel})
@Accessors(chain = true)
#end
#end
#if(${table.convert})
@TableName("${table.name}")
#end
#if(${swagger2})
@ApiModel(value="${entity}Dto对象", description="$!{table.comment}")
#end
#if(${superEntityClass})
public class ${entity}Dto extends ${superEntityClass}#if(${activeRecord})<${entity}>#end {
#elseif(${activeRecord})
public class ${entity}Dto extends Model<${entity}> {
#else
public class ${entity}Dto implements Serializable {
#end

#if(${entitySerialVersionUID})
private static final long serialVersionUID=1L;
#end
## ---------- BEGIN 字段循环遍历 ----------
#foreach($field in ${table.fields})

#if(${field.keyFlag})
#set($keyPropertyName=${field.propertyName})
#end
#if("$!field.comment" != "")
#if(${swagger2})
@ApiModelProperty(value = "${field.comment}")
#else
/**
* ${field.comment}
*/
#end
#end
#if(${field.keyFlag})
## 主键
#if(${field.keyIdentityFlag})
@TableId(value = "${field.annotationColumnName}", type = IdType.AUTO)
#elseif(!$null.isNull(${idType}) && "$!idType" != "")
@TableId(value = "${field.annotationColumnName}", type = IdType.${idType})
#elseif(${field.convert})
@TableId("${field.annotationColumnName}")
#end
## 普通字段
#elseif(${field.fill})
## ----- 存在字段填充设置 -----
#if(${field.convert})
@TableField(value = "${field.annotationColumnName}", fill = FieldFill.${field.fill})
#else
@TableField(fill = FieldFill.${field.fill})
#end
#elseif(${field.convert})
@TableField("${field.annotationColumnName}")
#end
## 乐观锁注解
#if(${versionFieldName}==${field.name})
@Version
#end
## 逻辑删除注解
#if(${logicDeleteFieldName}==${field.name})
@TableLogic
#end
private ${field.propertyType} ${field.propertyName};
#end
## ---------- END 字段循环遍历 ----------

#if(!${entityLombokModel})
#foreach($field in ${table.fields})
#if(${field.propertyType.equals("boolean")})
#set($getprefix="is")
#else
#set($getprefix="get")
#end

public ${field.propertyType} ${getprefix}${field.capitalName}() {
return ${field.propertyName};
}

#if(${chainModel})
public ${entity} set${field.capitalName}(${field.propertyType} ${field.propertyName}) {
#else
public void set${field.capitalName}(${field.propertyType} ${field.propertyName}) {
#end
this.${field.propertyName} = ${field.propertyName};
#if(${chainModel})
return this;
#end
}
#end
## --foreach end---
#end
## --end of #if(!${entityLombokModel})--

#if(${entityColumnConstant})
#foreach($field in ${table.fields})
public static final String ${field.name.toUpperCase()} = "${field.name}";

#end
#end
#if(${activeRecord})
@Override
protected Serializable pkVal() {
#if(${keyPropertyName})
return this.${keyPropertyName};
#else
return null;
#end
}

#end
#if(!${entityLombokModel})
@Override
public String toString() {
return "${entity}{" +
#foreach($field in ${table.fields})
#if($!{foreach.index}==0)
"${field.propertyName}=" + ${field.propertyName} +
#else
", ${field.propertyName}=" + ${field.propertyName} +
#end
#end
"}";
}
#end
}

具体代码生成器的执行代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
java复制代码public class CodeGenerator {

public static void main(String[] args) {
// 代码生成器
AutoGenerator mpg = new AutoGenerator();

// 全局配置
GlobalConfig gc = new GlobalConfig();
String projectPath = System.getProperty("user.dir");
gc.setOutputDir(projectPath + "/src/main/java");
gc.setAuthor("海加尔金鹰(www.hjljy.cn)");
gc.setOpen(false);
//设置实体类后缀
gc.setEntityName("%sPo");
//实体属性 Swagger2 注解
gc.setSwagger2(true);
gc.setBaseColumnList(true);
gc.setBaseResultMap(true);
mpg.setGlobalConfig(gc);

// 数据源配置
DataSourceConfig dsc = new DataSourceConfig();
dsc.setUrl("jdbc:mysql://localhost:3306/springboot?serverTimezone=GMT&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true");
dsc.setDriverName("com.mysql.jdbc.Driver");
dsc.setUsername("root");
dsc.setPassword("123456");
mpg.setDataSource(dsc);

// 包配置
PackageConfig pc = new PackageConfig();
pc.setModuleName(null);
String scanner = scanner("请输入整体业务包名");
String modelName = StringUtils.isBlank(scanner) ? "" : "."+scanner;
//moduleName是整体分模块

pc.setParent("cn.hjljy.fastboot");
pc.setMapper("mapper"+modelName);
pc.setService("service"+modelName);
pc.setServiceImpl("service"+modelName+".impl");
pc.setEntity("pojo"+modelName+".po");
pc.setController("controller"+modelName);
mpg.setPackageInfo(pc);


String dtoPath = pc.getParent() + ".pojo.dto";
// 配置模板
TemplateConfig templateConfig = new TemplateConfig();
// 不输出默认的XML 默认生成的xml在mapper层里面
templateConfig.setXml(null);
mpg.setTemplate(templateConfig);

//配置自定义输出的文件 xml和dto
//模板引擎是 velocity
String xmlTemplatePath = "/templates/mapper.xml.vm";
// 自定义输出配置
List<FileOutConfig> focList = new ArrayList<>();
// 自定义配置会被优先输出
focList.add(new FileOutConfig(xmlTemplatePath) {
@Override
public String outputFile(TableInfo tableInfo) {
// 自定义输出文件名 , 如果你 Entity 设置了前后缀、此处注意 xml 的名称会跟着发生变化!!
return projectPath + "/src/main/resources/mapper/" + scanner
+ "/" + tableInfo.getEntityName() + "Mapper" + StringPool.DOT_XML;
}
});

String dtoTemplatePath = "/dto.java.vm";
// 自定义配置会被优先输出
focList.add(new FileOutConfig(dtoTemplatePath) {
@Override
public String outputFile(TableInfo tableInfo) {
// 自定义输出文件名 , 如果你 Entity 设置了前后缀、此处注意 xml 的名称会跟着发生变化!!
return projectPath + "/src/main/java/cn/hjljy/fastboot/pojo/"+scanner+"/dto/" +
tableInfo.getEntityName() + "Dto" + StringPool.DOT_JAVA;
}
});

// 自定义配置
InjectionConfig cfg = new InjectionConfig() {

@Override
public void initMap() {
Map<String, Object> map = new HashMap<>();
map.put("dtoPackage", dtoPath);
this.setMap(map);
}
};
cfg.setFileOutConfigList(focList);
mpg.setCfg(cfg);
// 策略配置
StrategyConfig strategy = new StrategyConfig();
strategy.setNaming(NamingStrategy.underline_to_camel);
strategy.setColumnNaming(NamingStrategy.underline_to_camel);
strategy.setEntityLombokModel(true);
strategy.setRestControllerStyle(true);
strategy.setInclude(scanner("表名,多个英文逗号分割").split(","));
strategy.setControllerMappingHyphenStyle(true);
//设置逻辑删除字段
strategy.setLogicDeleteFieldName("status");
mpg.setStrategy(strategy);
mpg.setTemplateEngine(new VelocityTemplateEngine());
mpg.execute();
}

/**
* <p>
* 读取控制台内容
* </p>
*/
public static String scanner(String tip) {
Scanner scanner = new Scanner(System.in);
StringBuilder help = new StringBuilder();
help.append("请输入" + tip + ":");
System.out.println(help.toString());
if (scanner.hasNext()) {
String ipt = scanner.next();
if (StringUtils.isNotEmpty(ipt)) {
return ipt;
}
}
throw new MybatisPlusException("请输入正确的" + tip + "!");
}
}

总结

算是框架里面非常基础的一些东西。不过能够提高不少的开发效率!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

springboot之线程池ThreadPoolTaskEx

发表于 2021-11-16

前言

最近项目当中有需求,要进行异步的处理,需要使用到线程池,很久没有使用到线程池了,一来是做JAVAweb开发基本上很少用到异步处理,二来是发现有的老项目里面,线程和线程池的使用比较混乱,有好几个线程池,有的线程池是通过spring管理的,有的是自己创建的,然后有的地方是直接创建的线程。所以这里记录下自己在项目当中如何优雅的使用线程池!避免项目当中到处都是线程池!!!

SpringBoot整合ThreadPoolTaskExecutor线程池

ThreadPoolExecutor:这个是JAVA自己实现的线程池执行类,基本上创建线程池都是通过这个类进行的创建!
ThreadPoolTaskExecutor :这个是springboot基于ThreadPoolExecutor实现的一个线程池执行类。

In the absence of an Executor bean in the context, Spring Boot auto-configures a ThreadPoolTaskExecutor with sensible defaults that can be automatically associated to asynchronous task execution (@EnableAsync) and Spring MVC asynchronous request processing.

在springboot当中,根据 官方文档的说明,如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor 线程池到bean当中,我们只需要按照他的方式调用就可以了!!!

使用springboot默认的线程池

既然springboot有默认的线程池,说明我们可以很简单的进行调用

方式一:通过@Async注解调用

第一步:在Application启动类上面加上@EnableAsync

1
2
3
4
5
6
7
java复制代码@SpringBootApplication
@EnableAsync
public class ThreadpoolApplication {
public static void main(String[] args) {
SpringApplication.run(ThreadpoolApplication.class, args);
}
}

第二步:在需要异步执行的方法上加上@Async注解

1
2
3
4
5
6
7
8
9
java复制代码@Service
public class AsyncTest {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Async
public void hello(String name){
//这里使用logger 方便查看执行的线程是什么
logger.info("异步线程启动 started."+name);
}
}

第三步:测试类进行测试验证

1
2
3
4
5
6
7
8
java复制代码    @Autowired
AsyncTest asyncTest;
@Test
void contextLoads() throws InterruptedException {
asyncTest.hello("afsasfasf");
//一定要休眠 不然主线程关闭了,子线程还没有启动
Thread.sleep(1000);
}

查看打印的日志:

INFO 2276 — [ main] c.h.s.t.t.ThreadpoolApplicationTests : Started ThreadpoolApplicationTests in 3.003 seconds (JVM running for 5.342)
INFO 2276 — [ task-1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.afsasfasf

可以清楚的看到新开了一个task-1的线程执行任务。验证成功!!!

方式二:直接调用ThreadPoolTaskExecutor

修改上面测试类,直接注入ThreadPoolTaskExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@SpringBootTest
class ThreadPoolApplicationTests {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
AsyncTest asyncTest;
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Test
void contextLoads() throws InterruptedException {
asyncTest.hello("async注解创建");
threadPoolTaskExecutor.submit(new Thread(()->{
logger.info("threadPoolTaskExecutor 创建线程");
}));
//一定要休眠 不然主线程关闭了,子线程还没有启动
Thread.sleep(1000);
}
}

查看打印的日志发现都成功创建线程!!!:

INFO 12360 — [ task-2] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程
INFO 12360 — [ task-1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建

备注1:如果只使用ThreadPoolTaskExecutor, 是可以不用在Application启动类上面加上@EnableAsync注解的哦!!!
备注2:多次测试发现ThreadPoolTaskExecutor执行比@Async要快!!!

线程池默认配置信息

以下是springboot默认的线程池配置,可以在application.properties文件当中进行相关的设置!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
xml复制代码# 核心线程数
spring.task.execution.pool.core-size=8
# 最大线程数
spring.task.execution.pool.max-size=16
# 空闲线程存活时间
spring.task.execution.pool.keep-alive=60s
# 是否允许核心线程超时
spring.task.execution.pool.allow-core-thread-timeout=true
# 线程队列数量
spring.task.execution.pool.queue-capacity=100
# 线程关闭等待
spring.task.execution.shutdown.await-termination=false
spring.task.execution.shutdown.await-termination-period=
# 线程名称前缀
spring.task.execution.thread-name-prefix=task-

深入springboot默认的线程池

根据官方文档的说明,Spring Boot auto-configures a ThreadPoolTaskExecutor 。最终找到springboot的线程池自动装配类:TaskExecutionAutoConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码    @Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) {
Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
Stream var10001 = taskExecutorCustomizers.orderedStream();
var10001.getClass();
builder = builder.customizers(var10001::iterator);
builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique());
return builder;
}

同时在ThreadPoolTaskExecutor源码当中可以看到线程池的初始化方式是直接调用的ThreadPoolExecutor进行的初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码 protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
public void execute(Runnable command) {
Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command);
if (decorated != command) {
ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command);
}

super.execute(decorated);
}
};
} else {
executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}

if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}

this.threadPoolExecutor = executor;
return executor;
}

同时会发现默认的线程池拒绝策略是: AbortPolicy 直接抛出异常!!!

private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();

使用自定义的线程池

在默认配置信息里面是没有线程池的拒绝策略设置的方法的,如果需要更换拒绝策略就需要自定义线程池,并且如果项目当中需要多个自定义的线程池,又要如何进行管理呢?

自定义Configuration

第一步:创建一个ThreadPoolConfig 先只配置一个线程池,并设置拒绝策略为CallerRunsPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@Configuration
public class ThreadPoolConfig {

@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}

然后执行之前写的测试代码发现,使用的线程池已经变成自定义的线程池了。

INFO 12740 — [ myExecutor–2] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程
INFO 12740 — [ myExecutor–1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建

第二步:如果配置有多个线程池,该如何指定线程池呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码@Configuration
public class ThreadPoolConfig {

@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}

@Bean("poolExecutor")
public Executor poolExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor2--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}

@Bean("taskPoolExecutor")
public Executor taskPoolExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor3--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}

执行测试类,直接报错说找到多个类,不知道加载哪个类:

No qualifying bean of type ‘org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor’ available: expected single matching bean but found 3: taskExecutor,taskPoolExecutor

由于测试类当中是这样自动注入的:

1
2
java复制代码@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;

考虑到@Autowired 以及@Resource两个注入时的存在多个类如何匹配问题,然后发现只要我们在注入时指定具体的bean就会调用对应的线程池!!!

即修改测试类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码    @Autowired
AsyncTest asyncTest;
@Autowired
ThreadPoolTaskExecutor poolExecutor; //会去匹配 @Bean("poolExecutor") 这个线程池
@Test
void contextLoads() throws InterruptedException {
asyncTest.hello("async注解创建");
//一定要休眠 不然主线程关闭了,子线程还没有启动
poolExecutor.submit(new Thread(()->{
logger.info("threadPoolTaskExecutor 创建线程");
}));
Thread.sleep(1000);
}

最后得到如下信息:

INFO 13636 — [ myExecutor2–1] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程
INFO 13636 — [ myExecutor–1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建

备注1:如果是使用的@Async注解,只需要在注解里面指定bean的名称就可以切换到对应的线程池去了。如下所示:

1
2
3
4
java复制代码	@Async("taskPoolExecutor")
public void hello(String name){
logger.info("异步线程启动 started."+name);
}

备注2:如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池

总结

线程池的四种拒绝策略:www.cnblogs.com/cblogs/p/94…
JAVA常用的四种线程池: www.cnblogs.com/zhujiabin/p…
线程池的使用是为了管理线程,但是对于线程池项目当中也是要管理起来的。有利于后续的维护!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JVM(Java Virtual Machine)对象创建过

发表于 2021-11-16

对象创建详解

  1. 当虚拟机遇到字节码指令new时,判断指令是否能在常量池中定位到一个类的符号引用,并检查这个类是否被加载,解析和初始化,如果没有则执行类的初始化。
  2. 为对象分配内存 (对象所占用内存大小在类加载完成后即可确定)

2.1)、 内存分配的俩种方式:

1
2
3
4
5
6
7
8
9
makefile复制代码2.1.1)、指针碰撞: 是指内存是整齐有序情况下

2.1.2)、空闲列表: 内存存储不规整情况下

2.1.3)、内存是否整齐是由垃圾回收器是否带有空间压缩整理来决定的。

2.1.4) 、Serial、ParNew 等垃圾回收器带空间压缩整理

2.1.5) 、CMS这种基于清除算法垃圾回收器,只能采用空闲列表分配内存

2.2)、对象在虚拟机中频繁创建,在并发场景下也是不安全的。
(可能出现正在给对象A分配内存,指针还没来得及修改,对象B又同时使用了原来的指针来分配内存的情况) 以上问题有两种解决方式。

1
2
3
bash复制代码2.2.1 、同步锁:JVM采用CAS方式失败重试方式来保证原子性。

2.2.2 、本地线程分配缓存区(TLAB):按照不同线程划分不同空间中进行,每个线程在java堆中预先分配一块小内存,称为本地线程分配缓存区,那个线程需要分配就在对应的缓存区中分配,如果本地缓存区使用完了,分配新的缓存区在使用同步锁方式。是否使用TLAB,可通过配置参数决定(-XX:+/-useTALB)。以空间换取时间方式,默认大小是512k。
  1. 对象的内存布局

堆在内存中存储布局分为三部分:对象头、实例数据、对齐填充

对象头:

第一种:自身的运行时数据(哈希吗、GC分代年龄、锁标志状态、线程持有锁…)

第二种:类型指针(对象指向它的类型元数据,虚拟机通过指针确定该对象是那个类的实例)

实例数据:

存储有效数据(代码所定义的各种字段内从,父类继承或者是子类中定义的都保存下来)

对齐填充:

虚拟机要求对象起始地址必须是8字节的整数倍,因此,如果对象实例数据部分没有对齐的话,就需要通过对齐填充来补全。

4.以上完成后从虚拟机角度来看一个新对象已经产生,如果从应用程序来看对象才开始构造函数。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

钉钉宜搭入选Forrester《中国低代码平台市场分析报告》

发表于 2021-11-16

简介: 🎉 最新:钉钉宜搭入选Forrester《中国低代码平台市场分析报告》!

11月12日,全球知名研究机构Forrester发布《中国低代码平台市场分析报告(The State Of Low-Code Platforms In China)》,钉钉宜搭入围。报告整体分析了中国各类低代码平台的特点,并对低代码技术如何帮助中国企业加速数字化转型进行了深入探讨。报告指出,宜搭与阿里云、钉钉能力的深度融合,所产生的应用天然与钉钉应用深度集成,让组织内外的协同更高效。

“低代码”这一概念由Forrester在2014年正式提出。7年来,低代码技术在中国逐渐成为主流趋势。钉钉宜搭等领先的低代码平台正在服务各垂直领域的企业,提升应用程序交付效率、降低运营成本,扩大低代码开发者人群规模并激发个人的创新力。低代码技术在中国的快速发展,使其成为企业加快数字化转型、适应未来的关键技术战略。

Forrester的报告指出:中国公司的商业及技术决策者在数字化转型过程中优先考虑低代码技术。58%的人正在采用低代码平台和工具进行软件开发,16%的人正计划这样做。低代码平台与人工智能、物联网的协同作用使其在未来更为适合支撑企业的核心竞争力。

加快数字创新,提高成本效益。低代码平台和工具降低了创新团队尝试新想法的障碍。低代码平台提供可配置的UI、预置模板以及自动化能力,以方便快速交付应用程序和自定义流程,帮助企业低成本创新并为业务创造价值。

图:钉钉宜搭提供的200+模板

业务人员也能快速上手。为了响应市场和客户需求的变化,企业需要修改或扩展现有的业务应用程序,甚至构建新的应用程序。低代码平台出现后,企业业务人员可以使用各类预配置解决方案,从而提高生产力。

图:用钉钉宜搭“拖拉拽”即可搭建应用

动态支持弹性运营。企业可以通过采用云原生架构的低代码平台,开发应用程序实现弹性运营。低代码平台可确保应用程序安全、可管理且可扩展,帮助企业构建灵活的能力来应对危机和事件。新冠肺炎疫情加速了这些实践,并证明了低代码工具的价值。

图:疫情期间,钉钉宜搭平均1.5天上线一个应用

Forrester在报告中还举例说明了低代码平台在中国各类行业:如银行、保险、零售、医疗保健、政府、制造、电信和建筑等领域的广泛应用场景。Forrester在报告中写道:“家居零售企业居然之家使用阿里巴巴旗下产品宜搭,开发了400多个管理和财务应用程序,将运营效率提升了60%。”

Forrester在报告中指出,公共云服务提供商提供云生态系统协同能力,可实现多样化的云环境部署。例如阿里巴巴的宜搭结合了阿里云和钉钉的独特优势,所开发的应用天然与钉钉内其他应用集成,让组织内外协同更高效。用宜搭来构建企业专属应用,你只需关注业务本身,其他例如数据存储、运行环境、服务器、网络安全等,平台为你全部搞定。此外,借助阿里云,宜搭还提供了强大的弹性计算、动态扩容能力,为你的业务高效、稳定的运行保驾护航。

宜搭作为钉钉官方低代码应用搭建平台,和钉钉原生能力默认打通。宜搭默认使用钉钉企业通讯录,流程审批可基于组织架构,方便、快捷。支持钉钉消息、钉钉待办,确保重要事项消息必达。搭建好的应用可快速接入企业工作台,高效实现在线协同、移动办公。

宜搭近期发布的3.0版本,带来了在连接器、数据大屏以及安全方面的重磅更新:3大连接能力套件,让业务互联互通;更低门槛的数据BI能力,一键生成酷炫数字大屏;全局水印、独立域名等安全能力,为用户提供国密级专属安全。

连接的效率和深度,决定了企业的生产力。宜搭3大连接能力套件:Excel一键升级企业数字化应用,宜搭连接器以及跨组织流程审批和应用分发,实现了应用与应用之间,应用与人之间,企业上下游之间的全链路连接,宜搭搭建的应用在钉钉上天然互联互通。

业务数据全面打通,一键生成酷炫大屏。本次宜搭全新发布了20多款常用报表可视化组件,提供10多款开箱即用的可视化炫酷大屏模版,涵盖销售、人事、生产制造等诸多领域,可广泛用于展示汇报、指挥决策,业务分析等场景。支持100万数据量的表与表关联,和一亿数据量的海量数据处理能力。

全局水印,独立域名,提供全面安全保障。保护客户的数据安全是第一原则,宜搭平台依托钉钉全面的安全防护策略,获得公安部信息系统三级等级保护认证,数据享受国密级别安全保护。宜搭上线应用全局水印功能,一旦发生信息泄漏可快速追溯来源。统一风控账号体系+独立企业域名,筑起安全高墙,为企业提供专业级安全防护。

低代码技术已成为中国企业加速数字化转型和适应未来战略的关键部分,钉钉宜搭将会一直陪伴在用户身边,通过低代码这一新生产力工具,不断激发个人和组织的创造力,全面加速企业的数字化转型。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

庖丁解InnoDB之UNDO LOG 一 Undo Log的

发表于 2021-11-16

简介: Undo Log是InnoDB十分重要的组成部分,它的作用横贯InnoDB中两个最主要的部分,并发控制(Concurrency Control)和故障恢复(Crash Recovery),InnoDB中Undo Log的实现亦日志亦数据。本文将从其作用、设计思路、记录内容、组织结构,以及各种功能实现等方面,整体介绍InnoDB中的Undo Log。

作者 | 瀚之

来源 | 阿里技术公众号

Undo Log是InnoDB十分重要的组成部分,它的作用横贯InnoDB中两个最主要的部分,并发控制(Concurrency Control)和故障恢复(Crash Recovery),InnoDB中Undo Log的实现亦日志亦数据。本文将从其作用、设计思路、记录内容、组织结构,以及各种功能实现等方面,整体介绍InnoDB中的Undo Log,文章会深入一定的代码实现,但在细节上还是希望用抽象的实现思路代替具体的代码。本文基于MySQL 8.0,但在大多数的设计思路上MySQL的各个版本都是一致的。考虑到篇幅有限,以及避免过多信息的干扰,从而能够聚焦Undo Log本身的内容,本文中一笔带过或有意省略了一些内容,包括索引、事务系统、临时表、XA事务、Virtual Column、外部记录、Blob等。

一 Undo Log的作用

数据库故障恢复机制的前世今生中提到过,Undo Log用来记录每次修改之前的历史值,配合Redo Log用于故障恢复。这也就是InnoDB中Undo Log的第一个作用:

1 事务回滚

在设计数据库时,我们假设数据库可能在任何时刻,由于如硬件故障,软件Bug,运维操作等原因突然崩溃。这个时候尚未完成提交的事务可能已经有部分数据写入了磁盘,如果不加处理,会违反数据库对Atomic的保证,也就是任何事务的修改要么全部提交,要么全部取消。针对这个问题,直观的想法是等到事务真正提交时,才能允许这个事务的任何修改落盘,也就是No-Steal策略。显而易见,这种做法一方面造成很大的内存空间压力,另一方面提交时的大量随机IO会极大的影响性能。因此,数据库实现中通常会在正常事务进行中,就不断的连续写入Undo Log,来记录本次修改之前的历史值。当Crash真正发生时,可以在Recovery过程中通过回放Undo Log将未提交事务的修改抹掉。InnoDB采用的就是这种方式。

既然已经有了在Crash Recovery时支持事务回滚的Undo Log,自然地,在正常运行过程中,死锁处理或用户请求的事务回滚也可以利用这部分数据来完成。

2 MVCC(Multi-Versioin Concurrency Control)

浅析数据库并发控制机制中提到过,为了避免只读事务与写事务之间的冲突,避免写操作等待读操作,几乎所有的主流数据库都采用了多版本并发控制(MVCC)的方式,也就是为每条记录保存多份历史数据供读事务访问,新的写入只需要添加新的版本即可,无需等待。InnoDB在这里复用了Undo Log中已经记录的历史版本数据来满足MVCC的需求。

二 什么样的Undo Log

庖丁解InnoDB之REDO LOG中讲过的基于Page的Redo Log可以更好的支持并发的Redo应用,从而缩短DB的Crash Recovery时间。而对于Undo Log来说,InnoDB用Undo Log来实现MVCC,DB运行过程中是允许有历史版本的数据存在的。因此,Crash Recovery时利用Undo Log的事务回滚完全可以在后台,像正常运行的事务一样异步回滚,从而让数据库先恢复服务。因此,Undo Log的设计思路不同于Redo Log,Undo Log需要的是事务之间的并发,以及方便的多版本数据维护,其重放逻辑不希望因DB的物理存储变化而变化。因此,InnoDB中的Undo Log采用了基于事务的Logical Logging的方式。

同时,更多的责任意味着更复杂的管理逻辑,InnoDB中其实是把Undo当做一种数据来维护和使用的,也就是说,Undo Log日志本身也像其他的数据库数据一样,会写自己对应的Redo Log,通过Redo Log来保证自己的原子性。因此,更合适的称呼应该是Undo Data。

三 Undo Record中的内容

每当InnoDB中需要修改某个Record时,都会将其历史版本写入一个Undo Log中,对应的Undo Record是Update类型。当插入新的Record时,还没有一个历史版本,但为了方便事务回滚时做逆向(Delete)操作,这里还是会写入一个Insert类型的Undo Record。

1 Insert类型的Undo Record

这种Undo Record在代码中对应的是TRX_UNDO_INSERT_REC类型。不同于Update类型的Undo Record,Insert Undo Record仅仅是为了可能的事务回滚准备的,并不在MVCC功能中承担作用。因此只需要记录对应Record的Key,供回滚时查找Record位置即可。

其中Undo Number是Undo的一个递增编号,Table ID用来表示是哪张表的修改。下面一组Key Fields的长度不定,因为对应表的主键可能由多个field组成,这里需要记录Record完整的主键信息,回滚的时候可以通过这个信息在索引中定位到对应的Record。除此之外,在Undo Record的头尾还各留了两个字节用户记录其前序和后继Undo Record的位置。

2 Update类型的Undo Record

由于MVCC需要保留Record的多个历史版本,当某个Record的历史版本还在被使用时,这个Record是不能被真正的删除的。因此,当需要删除时,其实只是修改对应Record的Delete Mark标记。对应的,如果这时这个Record又重新插入,其实也只是修改一下Delete Mark标记,也就是将这两种情况的delete和insert转变成了update操作。再加上常规的Record修改,因此这里的Update Undo Record会对应三种Type:TRX_UNDO_UPD_EXIST_REC、TRX_UNDO_DEL_MARK_REC和TRX_UNDO_UPD_DEL_REC。他们的存储内容也类似:

除了跟Insert Undo Record相同的头尾信息,以及主键Key Fileds之外,Update Undo Record增加了:

  • Transaction Id记录了产生这个历史版本事务Id,用作后续MVCC中的版本可见性判断
  • Rollptr指向的是该记录的上一个版本的位置,包括space number,page number和page内的offset。沿着Rollptr可以找到一个Record的所有历史版本。
  • Update Fields中记录的就是当前这个Record版本相对于其之后的一次修改的Delta信息,包括所有被修改的Field的编号,长度和历史值。

四 Undo Record的组织方式

上面介绍了一个Undo Record中的存放的内容,每一次的修改都会产生至少一个Undo Record,那么大量Undo Record如何组织起来,来支持高效的访问和管理呢,这一小节我们将从几个层面来进行介绍:首先是在不考虑物理存储的情况下的逻辑组织方式;之后,物理组织方式介绍如何将其存储到到实际16KB物理块中;然后文件组织方式介绍整体的文件结构;最后再介绍其在内存中的组织方式。

1 逻辑组织方式 - Undo Log

每个事务其实会修改一组的Record,对应的也就会产生一组Undo Record,这些Undo Record收尾相连就组成了这个事务的Undo Log。除了一个个的Undo Record之外,还在开头增加了一个Undo Log Header来记录一些必要的控制信息,因此,一个Undo Log的结构如下所示:

Undo Log Header中记录了产生这个Undo Log的事务的Trx ID;Trx No是事务的提交顺序,也会用这个来判断是否能Purge,这个在后面会详细介绍;Delete Mark标明该Undo Log中有没有TRX_UNDO_DEL_MARK_REC类型的Undo Record,避免Purge时不必要的扫描;Log Start Offset中记录Undo Log Header的结束位置,方便之后Header中增加内容时的兼容;之后是一些Flag信息;Next Undo Log及Prev Undo Log标记前后两个Undo Log,这个会在接下来介绍;最后通过History List Node将自己挂载到为Purge准备的History List中。

索引中的同一个Record被不同事务修改,会产生不同的历史版本,这些历史版本又通过Rollptr穿成一个链表,供MVCC使用。如下图所示:

示例中有三个事务操作了表t上,主键id是1的记录,首先事务I插入了这条记录并且设置filed a的值是A,之后事务J和事务K分别将这条id为1的记录中的filed a的值修改为了B和C。I,J,K三个事务分别有自己的逻辑上连续的三条Undo Log,每条Undo Log有自己的Undo Log Header。从索引中的这条Record沿着Rollptr可以依次找到这三个事务Undo Log中关于这条记录的历史版本。同时可以看出,Insert类型Undo Record中只记录了对应的主键值:id=1,而Update类型的Undo Record中还记录了对应的历史版本的生成事务Trx_id,以及被修改的field a的历史值。

2 物理组织格式 - Undo Segment

上面描述了一个Undo Log的结构,一个事务会产生多大的Undo Log本身是不可控的,而最终写入磁盘却是按照固定的块大小为单位的,InnoDB中默认是16KB,那么如何用固定的块大小承载不定长的Undo Log,以实现高效的空间分配、复用,避免空间浪费。InnoDB的基本思路是让多个较小的Undo Log紧凑存在一个Undo Page中,而对较大的Undo Log则随着不断的写入,按需分配足够多的Undo Page分散承载。下面我们就看看这部分的物理存储方式:

如上所示,是一个Undo Segment的示意图,每个写事务开始写操作之前都需要持有一个Undo Segment,一个Undo Segment中的所有磁盘空间的分配和释放,也就是16KB Page的申请和释放,都是由一个FSP的Segment管理的,这个跟索引中的Leaf Node Segment和Non-Leaf Node Segment的管理方式是一致的,这部分之后会有单独的文章来进行介绍。

Undo Segment会持有至少一个Undo Page,每个Undo Page会在开头38字节到56字节记录Undo Page Header,其中记录Undo Page的类型、最后一条Undo Record的位置,当前Page还空闲部分的开头,也就是下一条Undo Record要写入的位置。Undo Segment中的第一个Undo Page还会在56字节到86字节记录Undo Segment Header,这个就是这个Undo Segment中磁盘空间管理的Handle;其中记录的是这个Undo Segment的状态,比如TRX_UNDO_CACHED、TRX_UNDO_TO_PURGE等;这个Undo Segment中最后一条Undo Record的位置;这个FSP Segment的Header,以及当前分配出来的所有Undo Page的链表。

Undo Page剩余的空间都是用来存放Undo Log的,对于像上图Undo Log 1,Undo Log 2这种较短的Undo Log,为了避免Page内的空间浪费,InnoDB会复用Undo Page来存放多个Undo Log,而对于像Undo Log 3这种比较长的Undo Log可能会分配多个Undo Page来存放。需要注意的是Undo Page的复用只会发生在第一个Page。

3 文件组织方式 - Undo Tablespace

每一时刻一个Undo Segment都是被一个事务独占的。每个写事务都会持有至少一个Undo Segment,当有大量写事务并发运行时,就需要存在多个Undo Segment。InnoDB中的Undo 文件中准备了大量的Undo Segment的槽位,按照1024一组划分为Rollback Segment。每个Undo Tablespace最多会包含128个Rollback Segment,Undo Tablespace文件中的第三个Page会固定作为这128个Rollback Segment的目录,也就是Rollback Segment Arrary Header,其中最多会有128个指针指向各个Rollback Segment Header所在的Page。Rollback Segment Header是按需分配的,其中包含1024个Slot,每个Slot占四个字节,指向一个Undo Segment的First Page。除此之前还会记录该Rollback Segment中已提交事务的History List,后续的Purge过程会顺序从这里开始回收工作。

可以看出Rollback Segment的个数会直接影响InnoDB支持的最大事务并发数。MySQL 8.0由于支持了最多127个独立的Undo Tablespace,一方面避免了ibdata1的膨胀,方便undo空间回收,另一方面也大大增加了最大的Rollback Segment的个数,增加了可支持的最大并发写事务数。如下图所示:

4 内存组织结构

上面介绍的都是Undo数据在磁盘上的组织结构,除此之外,在内存中也会维护对应的数据结构来管理Undo Log,如下图所示:

对应每个磁盘Undo Tablespace会有一个undo::Tablespace的内存结构,其中最主要的就是一组trx_rseg_t的集合,trx_rseg_t对应的就是上面介绍过的一个Rollback Segment Header,除了一些基本的元信息之外,trx_rseg_t中维护了四个trx_undo_t的链表,Update List中是正在被使用的用于写入Update类型Undo的Undo Segment;Update Cache List中是空闲空间比较多,可以被后续事务复用的Update类型Undo Segment;对应的,Insert List和Insert Cache List分别是正在使用中的Insert类型Undo Segment,和空间空间较多,可以被后续复用的Insert类型Undo Segment。因此trx_undo_t对应的就是上面介绍过的Undo Segment。接下来,我们就从Undo的写入、Undo用于Rollback、MVCC、Crash Recovery以及如何清理Undo等方面来介绍InnoDB中Undo的角色和功能。

五 Undo的写入

当写事务开始时,会先通过trx_assign_rseg_durable分配一个Rollback Segment,该事务的内存结构trx_t也会通过rsegs指针指向对应的trx_rseg_t内存结构,这里的分配策略很简单,就是依次尝试下一个Active的Rollback Segment。之后当第一次真正产生修改需要写Undo Record的时,会调用trx_undo_assign_undo来获得一个Undo Segment。这里会优先复用trx_rseg_t上Cached List中的trx_undo_t,也就是已经分配出来但没有被正在使用的Undo Segment,如果没有才调用trx_undo_create创建新的Undo Segment,trx_undo_create中会轮询选择当前Rollback Segment中可用的Slot,也是就值FIL_NUL的Slot,申请新的Undo Page,初始化Undo Page Header,Undo Segment Header等信息,创建新的trx_undo_t内存结构并挂到trx_rseg_t的对应List中。

获得了可用的Undo Segment之后,该事务会在合适的位置初始化自己的Undo Log Header,之后,其所有修改产生的Undo Record都会顺序的通过trx_undo_report_row_operation顺序的写入当前的Undo Log,其中会根据是insert还是update类型,分别调用trx_undo_page_report_insert或者trx_undo_page_report_modify。本文开始已经介绍过了具体的Undo Record内容。简单的讲,insert类型会记录插入Record的主键,update类型除了记录主键以外还会有一个update fileds记录这个历史值跟索引值的diff。之后指向当前Undo Record位置的Rollptr会返回写入索引的Record上。

当一个Page写满后,会调用trx_undo_add_page来在当前的Undo Segment上添加新的Page,新Page写入Undo Page Header之后继续供事务写入Undo Record,为了方便维护,这里有一个限制就是单条Undo Record不跨page,如果当前Page放不下,会将整个Undo Record写入下一个Page。

当事务结束(commit或者rollback)之后,如果只占用了一个Undo Page,且当前Undo Page使用空间小于page的3/4,这个Undo Segment会保留并加入到对应的insert/update cached list中。否则,insert类型的Undo Segment会直接回收,而update类型的Undo Segment会等待后台的Purge做完后回收。根据不同的情况,Undo Segment Header中的State会被从TRX_UNDO_ACTIVE改成TRX_UNDO_TO_FREE,TRX_UNDO_TO_PURGE或TRX_UNDO_CACHED,这个修改其实就是InnoDB的事务结束的标志,无论是Rollback还是Commit,在这个修改对应的Redo落盘之后,就可以返回用户结果,并且Crash Recovery之后也不会再做回滚处理。

六 Undo for Rollback

InnoDB中的事务可能会由用户主动触发Rollback;也可能因为遇到死锁异常Rollback;或者发生Crash,重启后对未提交的事务回滚。在Undo层面来看,这些回滚的操作是一致的,基本的过程就是从该事务的Undo Log中,从后向前依次读取Undo Record,并根据其中内容做逆向操作,恢复索引记录。

回滚的入口是函数row_undo,其中会先调用trx_roll_pop_top_rec_of_trx获取并删除该事务的最后一条Undo Record。如下图例子中的Undo Log包括三条Undo Records,其中Record 1在Undo Page 1中,Record 2,3在Undo Page 2中,先通过从Undo Segment Header中记录的Page List找到当前事务的最后一个Undo Page的Header,并根据Undo Page 2的Header上记录的Free Space Offset定位最后一条Undo Record结束的位置,当然实际运行时,这两个值是缓存在trx_undo_t的top_page_no和top_offset中的。利用Prev Record Offset可以找到Undo Record 3,做完对应的回滚操作之后,再通过前序指针Prev Record Offset找到前一个Undo Record,依次进行处理。处理完当前Page中的所有Undo Records后,再沿着Undo Page Header中的List找到前一个Undo Page,重复前面的过程,完成一个事务所有Page上的所有Undo Records的回滚。

拿到一个Undo Record之后,自然地,就是对其中内容的解析,这里会调用row_undo_ins_parse_undo_rec,从Undo Record中获取修改行的table,解析出其中记录的主键信息,如果是update类型,还会拿到一个update vector记录其相对于更新的一个版本的变化。

TRX_UNDO_INSERT_REC类型的Undo回滚在row_undo_ins中进行,insert的逆向操作当然就是delete,根据从Undo Record中解析出来的主键,用row_undo_search_clust_to_pcur定位到对应的ROW, 分别调用row_undo_ins_remove_sec_rec和row_undo_ins_remove_clust_rec在二级索引和主索引上将当前行删除。

update类型的undo包括TRX_UNDO_UPD_EXIST_REC,TRX_UNDO_DEL_MARK_REC和TRX_UNDO_UPD_DEL_REC三种情况,他们的Undo回滚都是在row_undo_mod中进行,首先会调用row_undo_mod_del_unmark_sec_and_undo_update,其中根据从Undo Record中解析出的update vector来回退这次操作在所有二级索引上的影响,可能包括重新插入被删除的二级索引记录、去除其中的Delete Mark标记,或者用update vector中的diff信息将二级索引记录修改之前的值。之后调用row_undo_mod_clust同样利用update vector中记录的diff信息将主索引记录修改回之前的值。

完成回滚的Undo Log部分,会调用trx_roll_try_truncate进行回收,对不再使用的page调用trx_undo_free_last_page将磁盘空间交还给Undo Segment,这个是写入过程中trx_undo_add_page的逆操作。

七 Undo for MVCC

多版本的目的是为了避免写事务和读事务的互相等待,那么每个读事务都需要在不对Record加Lock的情况下, 找到对应的应该看到的历史版本。所谓历史版本就是假设在该只读事务开始的时候对整个DB打一个快照,之后该事务的所有读请求都从这个快照上获取。当然实现上不能真正去为每个事务打一个快照,这个时间空间都太高了。InnoDB的做法,是在读事务第一次读取的时候获取一份ReadView,并一直持有,其中记录所有当前活跃的写事务ID,由于写事务的ID是自增分配的,通过这个ReadView我们可以知道在这一瞬间,哪些事务已经提交哪些还在运行,根据Read Committed的要求,未提交的事务的修改就是不应该被看见的,对应地,已经提交的事务的修改应该被看到。

作为存储历史版本的Undo Record,其中记录的trx_id就是做这个可见性判断的,对应的主索引的Record上也有这个值。当一个读事务拿着自己的ReadView访问某个表索引上的记录时,会通过比较Record上的trx_id确定是否是可见的版本,如果不可见就沿着Record或Undo Record中记录的rollptr一路找更老的历史版本。如下图所示,事务R开始需要查询表t上的id为1的记录,R开始时事务I已经提交,事务J还在运行,事务K还没开始,这些信息都被记录在了事务R的ReadView中。事务R从索引中找到对应的这条Record[1, C],对应的trx_id是K,不可见。沿着Rollptr找到Undo中的前一版本[1, B],对应的trx_id是J,不可见。继续沿着Rollptr找到[1, A],trx_id是I可见,返回结果。

前面提到过,作为Logical Log,Undo中记录的其实是前后两个版本的diff信息,而读操作最终是要获得完整的Record内容的,也就是说这个沿着rollptr指针一路查找的过程中需要用Undo Record中的diff内容依次构造出对应的历史版本,这个过程在函数row_search_mvcc中,其中trx_undo_prev_version_build会根据当前的rollptr找到对应的Undo Record位置,这里如果是rollptr指向的是insert类型,或者找到了已经Purge了的位置,说明到头了,会直接返回失败。否则,就会解析对应的Undo Record,恢复出trx_id、指向下一条Undo Record的rollptr、主键信息,diff信息update vector等信息。之后通过row_upd_rec_in_place,用update vector修改当前持有的Record拷贝中的信息,获得Record的这个历史版本。之后调用自己ReadView的changes_visible判断可见性,如果可见则返回用户。完成这个历史版本的读取。

八 Undo for Crash Recovery

Crash Recovery时,需要利用Undo中的信息将未提交的事务的所有影响回滚,以保证数据库的Failure Atomic。前面提到过,InnoDB中的Undo其实是像数据一样处理的,也从上面的组织结构中可以看出来,Undo本身有着比Redo Log复杂得多、按事务分配而不是顺序写入的组织结构,其本身的Durability像InnoDB中其他的数据一样,需要靠Redo来保证,像庖丁解InnoDB之REDO LOG中介绍的那样。除了通用的一些MLOG_2BYTES、MLOG_4BYTES类型之外,Undo本身也有自己对应的Redo Log类型:MLOG_UNDO_INIT类型在Undo Page舒适化的时候记录初始化;在分配Undo Log的时候,需要重用Undo Log Header或需要创建新的Undo Log Header的时候,会分别记录MLOG_UNDO_HDR_REUSE和MLOG_UNDO_HDR_CREATE类型的Redo Record;MLOG_UNDO_INSERT是最常见的,在Undo Log里写入新的Undo Record都对应的写这个日志记录写入Undo中的所有内容;最后,MLOG_UNDO_ERASE_END 对应Undo Log跨Undo Page时抹除最后一个不完整的Undo Record的操作。

如数据库故障恢复机制的前世今生中讲过的ARIES过程,Crash Recovery的过程中会先重放所有的Redo Log,整个Undo的磁盘组织结构,也会作为一种数据类型也会通过上面讲到的这些Redo类型的重放恢复出来。之后在trx_sys_init_at_db_start中会扫描Undo的磁盘结构,遍历所有的Rollback Segment和其中所有的Undo Segment,通过读取Undo Segment Header中的State,可以知道在Crash前,最后持有这个Undo Segment的事务状态。如果是TRX_UNDO_ACTIVE,说明当时事务需要回滚,否则说明事务已经结束,可以继续清理Undo Segment的逻辑。之后,就可以恢复出Undo Log的内存组织模式,包括活跃事务的内存结构trx_t,Rollback Segment的内存结构trx_rseg_t,以及其中的trx_undo_t的四个链表。

Crash Recovery完成之前,会启动在srv_dict_recover_on_restart中启动异步回滚线程trx_recovery_rollback_thread,其中对Crash前还活跃的事务,通过trx_rollback_active进行回滚,这个过程跟上面提到的Undo for Rollback是一致的。

九 Undo的清理

我们已经知道,InnoDB在Undo Log中保存了多份历史版本来实现MVCC,当某个历史版本已经确认不会被任何现有的和未来的事务看到的时候,就应该被清理掉。因此就需要有办法判断哪些Undo Log不会再被看到。InnoDB中每个写事务结束时都会拿一个递增的编号trx_no作为事务的提交序号,而每个读事务会在自己的ReadView中记录自己开始的时候看到的最大的trx_no为m_low_limit_no。那么,如果一个事务的trx_no小于当前所有活跃的读事务Readview中的这个m_low_limit_no,说明这个事务在所有的读开始之前已经提交了,其修改的新版本是可见的, 因此不再需要通过undo构建之前的版本,这个事务的Undo Log也就可以被清理了。如下图所所以,由于ReadView List中最老的ReadView在获取时,Transaction J就已经Commit,因此所有的读事务都一定能被Index中的版本或者第一个Undo历史版本满足,不需要更老的Undo,因此整个Transaction J的Undo Log都可以清理了。

Undo的清理工作是由专门的后台线程srv_purge_coordinator_thread进行扫描和分发, 并由多个srv_worker_thread真正清理的。coordinator会首先在函数trx_purge_attach_undo_recs中扫描innodb_purge_batch_size配置个Undo Records,作为一轮清理的任务分发给worker。

1 扫描一批要清理Undo Records

事务结束的时候,对于需要Purge的Update类型的Undo Log,会按照事务提交的顺序trx_no,挂载到Rollback Segment Header的History List上。Undo Log回收的基本思路,就是按照trx_no从小到大,依次遍历所有Undo Log进行清理操作。前面介绍了,InnoDB中有多个Rollback Segment,那么就会有多个History List,每个History List内部事务有序,但还需要从多个History List上找一个trx_no全局有序的序列,如下图所示:

图中的事务编号是按照InnoDB这里引入了一个堆结构purge_queue,用来依次从所有History List中找到下一个拥有最小trx_no的事务。purge_queue中记录了所有等待Purge的Rollback Segment和其History中trx_no最小的事务,trx_purge_choose_next_log依次从purge_queue中pop出拥有全局最小trx_no的Undo Log。调用trx_purge_get_next_rec遍历对应的Undo Log,处理每一条Undo Record。之后继续调用trx_purge_rseg_get_next_history_log从purge_queue中获取下一条trx_no最小的Undo Log,并且将当前Rollback Segment上的下一条Undo Log继续push进purge_queue,等待后续的顺序处理。对应上图的处理过程和对应的函数调用,如下图所示:

其中,trx_purge_get_next_rec会从上到下遍历一个Undo Log中的所有Undo Record,这个跟前面讲过的Rollback时候从下到上的遍历方向是相反的,还是以同样的场景为例,要Purge的Undo Log横跨两个Undo Page,Undo Record 1在Page 1中,而Undo Record 2,3在Page 2中。如下图所示,首先会从当前的Undo Log Header中找到第一个Undo Record的位置Log Start Offset,处理完Undo Record1之后沿着Next Record Offset去找下一个Undo Record,当找到Page末尾时,要通过Page List Node找下一个Page,找到Page内的第一个Undo Record,重复上面的过程直到找出所有的Undo Record。

对每个要Purge的Undo Record,在真正删除它本身之前,可能还需要处理一些索引上的信息,这是由于正常运行过程中,当需要删除某个Record时,为了保证其之前的历史版本还可以通过Rollptr找到,Record是没有真正删除的,只是打了Delete Mark的标记,并作为一种特殊的Update操作记录了Undo Record。那么在对应的TRX_UNDO_DEL_MARK_REC类型的Undo Record被清理之前,需要先从索引上真正地删除这个Delete Mark的记录。因此Undo Record的清理工作会分为两个过程:

  • TRX_UNDO_DEL_MARK_REC类型Undo Record对应的Record的真正删除,称为Undo Purge;
  • 以及Undo Record本身从旧到新的删除,称为Undo Truncate。

除此之外,当配置的独立Undo Tablespace大于两个的时候,InnoDB支持通过重建来缩小超过配置大小的Undo Tablespace:

  • Undo Tablespace的重建缩小,称为Undo Tablespace Truncate

2 Undo Purge

这一步主要针对的是TRX_UNDO_DEL_MARK_REC类型的Undo Record,用来真正的删除索引上被标记为Delete Mark的Record。worker线程会在row_purge函数中,循环处理coordinator分配来的每一个Undo Records,先通过row_purge_parse_undo_rec,依次从Undo Record中解析出type、table_id、rollptr、对应记录的主键信息以及update vector。之后,针对TRX_UNDO_DEL_MARK_REC类型,调用row_purge_remove_sec_if_poss将需要删除的记录从所有的二级索引上删除,调用row_purge_remove_clust_if_poss从主索引上删除。另外,TRX_UNDO_UPD_EXIST_REC类型的Undo虽然不涉及主索引的删除,但可能需要做二级索引的删除,也是在这里处理的。

3 Undo Truncate

coordinator线程会等待所有的worker完成一批Undo Records的Purge工作,之后尝试清理不再需要的Undo Log,trx_purge_truncate函数中会遍历所有的Rollback Segment中的所有Undo Segment,如果其状态是TRX_UNDO_TO_PURGE,调用trx_purge_free_segment释放占用的磁盘空间并从History List中删除。否则,说明该Undo Segment正在被使用或者还在被cache(TRX_UNDO_CACHED类型),那么只通过trx_purge_remove_log_hd将其从History List中删除。

需要注意的是,Undo Truncate的动作并不是每次都会进行的,它的频次是由参数innodb_rseg_truncate_frequency控制的,也就是说要攒innodb_rseg_truncate_frequency个batch才进行一次,前面提到每一个batch中会处理innodb_purge_batch_size个Undo Records,这也就是为什么我们从show engine innodb status中看到的Undo History List的缩短是跳变的。

4 Undo Tablespace Truncate

如果innodb_trx_purge_truncate配置打开,在函数trx_purge_truncate中还会去尝试重建Undo Tablespaces以缩小文件空间占用。Undo Truncate之后,会在函数trx_purge_mark_undo_for_truncate中扫描所有的Undo Tablespace,文件大小大于配置的innodb_max_undo_log_size的Tablespace会被标记为inactive,每一时刻最多有一个Tablespace处于inactive,inactive的Undo Tablespace上的所有Rollback Segment都不参与给新事物的分配,等该文件上所有的活跃事务退出,并且所有的Undo Log都完成Purge之后,这个Tablespace就会被通过trx_purge_initiate_truncate重建,包括重建Undo Tablespace中的文件结构和内存结构,之后被重新标记为active,参与分配给新的事务使用。

十 总结

本文首先概括地介绍了Undo Log的角色,之后介绍了一个Undo Record中的内容,紧接着介绍它的逻辑组织方式、物理组织方式、文件组织方式以及内存组织方式,详细描述了Undo Tablespace、Rollback Segment、Undo Segment、Undo Log和Undo Record的之间的关系和层级。这些组织方式都是为了更好的使用和维护Undo信息。最后在此基础上,介绍了Undo在各个重要的DB功能中的作用和实现方式,包括事务回滚、MVCC、Crash Recovery、Purge等。

参考:

[1] MySQL 8.0.11Source Code Documentation: Format of redo log

dev.mysql.com/doc/dev/mys…

[2] MySQL Source Code

github.com/mysql/mysql…

[3] The basics of the InnoDB undo logging and history system

blog.jcole.us/2014/04/16/….

[4] MySQL · 引擎特性 · InnoDB undo log 漫游

mysql.taobao.org/monthly/201…

[5] 数据库故障恢复机制的前世今生

catkang.github.io/2019/01/16/…

[6] 浅析数据库并发控制机制

catkang.github.io/2018/09/19/…

[7] 庖丁解InnoDB之REDO LOG

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

手写热部署

发表于 2021-11-16

「这是我参与11月更文挑战的第8天,活动详情查看:2021最后一次更文挑战」

引言

在项目开发中,每次修改文件就需要重启一次代码,这样太浪费时间了,所以在IDEA中使用JRebel插件实现项目🔥热部署,可自动热部署,无需重启项目。虽然一直清楚热部署是打破双亲委派来实现的,但是一直没有手写过热部署代码,今天写一次。😁

双亲委派机制

了解热部署之前,首先需要知道什么是双亲委派,在IDE中写的代码最终经过编译器会形成.class文件,由classLoader加载到JVM中执行。

JVM中提供了三层的ClassLoader:

  • Bootstrap classLoader:主要负责加载核心的类库(java.lang.*等),构造ExtClassLoader和APPClassLoader。
  • ExtClassLoader:主要负责加载jre/lib/ext目录下的一些扩展的jar。
  • AppClassLoader:主要负责加载应用程序的主函数类
    加载过程图如下:
    Untitled-2021-11-11-1524.png

实现热部署思路

一个类一旦被JVM加载过,就不会再次被加载。想实现热部署,就需要在.class文件修改后,由classLoader重新加载修改的.class文件。对.class文件做监听,一旦文件修改,则重新加载类。
在此实现中用一个Map模拟JVM已经加载过的.class文件,当监听到文件内容修改之后,移除Map中旧的.class文件,将新的.class文件加载并存放至Map中,调用init方法,执行初始化动作,模拟.class文件已经加载到JVM虚拟机中。

代码实现

pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.hanhang</groupId>
<artifactId>hotCode</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.18</version>
</dependency>
</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

</project>

IApplication接口

定义IApplication接口,所有监听的类都实现自这个接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public interface IApplication {
/**
* 初始化
*/
void init();

/**
* 执行
*/
void execute();

/**
* 销毁
*/
void destroy();
}

TestApplication1

监听加载的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class TestApplication1 implements IApplication {
@Override
public void init() {
System.out.println("TestApplication1--》3");
}

@Override
public void execute() {
System.out.println("TestApplication1--》execute");
}

@Override
public void destroy() {
System.out.println("TestApplication1--》destroy");
}
}

IClassLoader

类加载器,实现通过包扫描类的功能

1
2
3
4
5
6
7
8
9
java复制代码public interface IClassLoader {
/**
* 创建classLoader
* @param parentClassLoader 父classLoader
* @param paths 路径
* @return 类加载器
*/
ClassLoader createClassLoader(ClassLoader parentClassLoader, String...paths);
}

SimpleJarLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
java复制代码import com.hanhang.inter.IClassLoader;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* @author hanhang
*/
public class SimpleJarLoader implements IClassLoader {
@Override
public ClassLoader createClassLoader(ClassLoader parentClassLoader, String... paths) {
List<URL> jarsToLoad = new ArrayList<>();
for (String folder : paths) {
List<String> jarPaths = scanJarFiles(folder);

for (String jar : jarPaths) {

try {
File file = new File(jar);
jarsToLoad.add(file.toURI().toURL());

} catch (MalformedURLException e) {
e.printStackTrace();
}
}
}

URL[] urls = new URL[jarsToLoad.size()];
jarsToLoad.toArray(urls);

return new URLClassLoader(urls, parentClassLoader);
}

/**
* 扫描文件
* @param folderPath 文件路径
* @return 文件列表
*/
private List<String> scanJarFiles(String folderPath) {

List<String> jars = new ArrayList<>();
File folder = new File(folderPath);
if (!folder.isDirectory()) {
throw new RuntimeException("扫描的路径不存在, path:" + folderPath);
}

for (File f : Objects.requireNonNull(folder.listFiles())) {
if (!f.isFile()) {
continue;
}
String name = f.getName();

if (name.length() == 0) {
continue;
}

int extIndex = name.lastIndexOf(".");
if (extIndex < 0) {
continue;
}

String ext = name.substring(extIndex);
if (!".jar".equalsIgnoreCase(ext)) {
continue;
}

jars.add(folderPath + "/" + name);
}
return jars;
}
}

AppConfigList配置类

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Data
public class AppConfigList {
private List<AppConfig> configs;

@Data
public static class AppConfig{
private String name;

private String file;
}
}

GlobalSetting 全局配置类

1
2
3
4
java复制代码public class GlobalSetting {
public static final String APP_CONFIG_NAME = "application.xml";
public static final String JAR_FOLDER = "com/hanhang/app/";
}

application.xml配置

通过xml配置加后面的解析,确定监听那个class文件。

1
2
3
4
5
6
xml复制代码<apps>
<app>
<name>TestApplication1</name>
<file>com.hanhang.app.TestApplication1</file>
</app>
</apps>

JarFileChangeListener 监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class JarFileChangeListener implements FileListener {
@Override
public void fileCreated(FileChangeEvent fileChangeEvent) throws Exception {
String name = fileChangeEvent.getFileObject().getName().getBaseName().replace(".class","");

ApplicationManager.getInstance().reloadApplication(name);
}

@Override
public void fileDeleted(FileChangeEvent fileChangeEvent) throws Exception {
String name = fileChangeEvent.getFileObject().getName().getBaseName().replace(".class","");

ApplicationManager.getInstance().reloadApplication(name);
}

@Override
public void fileChanged(FileChangeEvent fileChangeEvent) throws Exception {
String name = fileChangeEvent.getFileObject().getName().getBaseName().replace(".class","");

ApplicationManager.getInstance().reloadApplication(name);

}
}

AppConfigManager

此类为config的管理类,用于加载配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
java复制代码import com.hanhang.config.AppConfigList;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.DomDriver;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/**
* @author hanhang
*/
public class AppConfigManager {
private final List<AppConfigList.AppConfig> configs;

public AppConfigManager(){
configs = new ArrayList<>();
}

/**
* 加载配置
* @param path 路径
*/
public void loadAllApplicationConfigs(URI path){

File file = new File(path);
XStream xstream = getXmlDefine();
try {
AppConfigList configList = (AppConfigList)xstream.fromXML(new FileInputStream(file));

if(configList.getConfigs() != null){
this.configs.addAll(new ArrayList<>(configList.getConfigs()));
}

} catch (FileNotFoundException e) {
e.printStackTrace();
}

}

/**
* 获取xml配置定义
* @return XStream
*/
private XStream getXmlDefine(){
XStream xstream = new XStream(new DomDriver());
xstream.alias("apps", AppConfigList.class);
xstream.alias("app", AppConfigList.AppConfig.class);
xstream.aliasField("name", AppConfigList.AppConfig.class, "name");
xstream.aliasField("file", AppConfigList.AppConfig.class, "file");
xstream.addImplicitCollection(AppConfigList.class, "configs");
Class<?>[] classes = new Class[] {AppConfigList.class,AppConfigList.AppConfig.class};
xstream.allowTypes(classes);
return xstream;
}

public final List<AppConfigList.AppConfig> getConfigs() {
return configs;
}

public AppConfigList.AppConfig getConfig(String name){
for(AppConfigList.AppConfig config : this.configs){
if(config.getName().equalsIgnoreCase(name)){
return config;
}
}
return null;
}
}

ApplicationManager

此类管理已经在Map中加载的类,并且添加监听器,实现class文件修改后的监听重新加载工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
java复制代码import com.hanhang.config.AppConfigList;
import com.hanhang.config.GlobalSetting;
import com.hanhang.inter.IApplication;
import com.hanhang.inter.IClassLoader;
import com.hanhang.inter.impl.SimpleJarLoader;
import com.hanhang.listener.JarFileChangeListener;
import org.apache.commons.vfs2.*;
import org.apache.commons.vfs2.impl.DefaultFileMonitor;

import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* @author hanhang
*/
public class ApplicationManager {
private static ApplicationManager instance;

private IClassLoader jarLoader;
private AppConfigManager configManager;

private Map<String, IApplication> apps;

private ApplicationManager(){
}

public void init(){
jarLoader = new SimpleJarLoader();
configManager = new AppConfigManager();
apps = new HashMap<>();

initAppConfigs();

URL basePath = this.getClass().getClassLoader().getResource("");

loadAllApplications(Objects.requireNonNull(basePath).getPath());

initMonitorForChange(basePath.getPath());
}

/**
* 初始化配置
*/
public void initAppConfigs(){

try {
URL path = this.getClass().getClassLoader().getResource(GlobalSetting.APP_CONFIG_NAME);
configManager.loadAllApplicationConfigs(Objects.requireNonNull(path).toURI());
} catch (URISyntaxException e) {
e.printStackTrace();
}
}

/**
* 加载类
* @param basePath 根目录
*/
public void loadAllApplications(String basePath){

for(AppConfigList.AppConfig config : this.configManager.getConfigs()){
this.createApplication(basePath, config);
}
}

/**
* 初始化监听器
* @param basePath 路径
*/
public void initMonitorForChange(String basePath){
try {
FileSystemManager fileManager = VFS.getManager();

File file = new File(basePath + GlobalSetting.JAR_FOLDER);
FileObject monitoredDir = fileManager.resolveFile(file.getAbsolutePath());
FileListener fileMonitorListener = new JarFileChangeListener();
DefaultFileMonitor fileMonitor = new DefaultFileMonitor(fileMonitorListener);
fileMonitor.setRecursive(true);
fileMonitor.addFile(monitoredDir);
fileMonitor.start();
System.out.println("Now to listen " + monitoredDir.getName().getPath());

} catch (FileSystemException e) {
e.printStackTrace();
}
}

/**
* 根据配置加载类
* @param basePath 路径
* @param config 配置
*/
public void createApplication(String basePath, AppConfigList.AppConfig config){
String folderName = basePath + GlobalSetting.JAR_FOLDER;
ClassLoader loader = this.jarLoader.createClassLoader(ApplicationManager.class.getClassLoader(), folderName);

try {
Class<?> appClass = loader.loadClass(config.getFile());

IApplication app = (IApplication)appClass.newInstance();

app.init();

this.apps.put(config.getName(), app);

} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
}

/**
* 重新加载
* @param name 类名
*/
public void reloadApplication(String name){
IApplication oldApp = this.apps.remove(name);

if(oldApp == null){
return;
}

oldApp.destroy();

AppConfigList.AppConfig config = this.configManager.getConfig(name);
if(config == null){
return;
}

createApplication(getBasePath(), config);
}

public static ApplicationManager getInstance(){
if(instance == null){
instance = new ApplicationManager();
}
return instance;
}

/**
* 获取类
* @param name 类名
* @return 缓存中的类
*/
public IApplication getApplication(String name){
if(this.apps.containsKey(name)){
return this.apps.get(name);
}
return null;
}

public String getBasePath(){
return Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).getPath();
}
}

MainTest

测试类,创建一个线程,让程序一直监听文件修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public static void main(String[] args){

Thread t = new Thread(new Runnable() {

@Override
public void run() {
ApplicationManager manager = ApplicationManager.getInstance();
manager.init();
}
});

t.start();

while(true){
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

代码演示

程序启动后,控制台输出
image.png
将TestApplication1的init方法修改为:

1
2
3
4
java复制代码@Override
public void init() {
System.out.println("TestApplication1--》300");
}

重新build项目,控制台输出如下:

image.png
此时,TestApplication1已经重新加载。

总结

以上就是我实现🔥热部署的代码,github源码地址:github.com/hanhang6/ho…

如果觉得我写的有问题,评论区里可以留言。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用 Transactional 时常犯的N种错误 1

发表于 2021-11-16

@Transactional是我们在用Spring时候几乎逃不掉的一个注解,该注解主要用来声明事务。它的实现原理是通过Spring AOP在注解修饰方法的前后织入事务管理的实现语句,所以开发者只需要通过一个注解就能代替一系列繁琐的事务开始、事务关闭等重复性的编码任务。

编码方式确实简单了,但也因为隐藏了直观的实现逻辑,一些错误的编码方法可能会让@Transactional注解失效,达不到事务的作用。最直接的表现就是:方法执行过程中抛出了异常,但事务没有回滚,最终导致了脏数据的产生。

之前我在博客上也写过一篇有趣的讨论我来出个题:这个事务会不会回滚?,当时很多人都给出了标准的错误答案,如果没看过的小伙伴不妨进去挑战一下?

虽然之前讨论了一些特殊情况,但还是一直有小伙伴会邮件、微信群里问一些关于事务失效的问题。主要还是@Transactional声明事务失效的情况真的是多种多样!所以,今天写一篇总结一下,如果下次再碰到,那就打开这片文章,一个个顺下来看,是不是哪里写错了。当然可能这里还会有遗漏,所以如果你有其他错误案例,也可以告诉我,我会持续整理到这篇文章里。

  1. 在同一个类中调用

错误案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class A {

public void methodA() {
methodB();

// 其他操作
}

@Transactional
public void methodB() {
// 写数据库操作
}

}

这类错误适用于所有基于Spring AOP实现的注解,比如:《使用@Async实现异步调用》中提到的@Async注解,《使用@Scheduled实现定时任务》中提到的@Scheduled注解,还有Spring缓存注解的使用解中提到的@Cacheable注解等。

解决这个问题的方法比较简单,还是合理规划好层次关系即可,比如这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@Service
@AllArgsConstructor
public class A {

private B b;

public void methodA() {
b.methodB();
// 其他操作
}
}

@Service
public class B {

@Transactional
public void methodB() {
// 写数据库操作
}

}

注意:这里A类用了构造器注入B的实现(为什么没用@Autowrire,可以看看前几天分享的这篇什么时候不要用@Autowired注入),构造函数用Lombok的@AllArgsConstructor生成(这个不熟悉的话可以看看之前这篇Lombok:让JAVA代码更优雅)。

  1. @Transactional修饰方法不是public

错误案例:

1
2
3
4
5
6
7
8
java复制代码public class TransactionalMistake {

@Transactional
private void method() {
// 写数据库操作
}

}

这也是基于Spring AOP实现的注解所要满足的要求。这个最简单,很好理解,也很直观,就不详细展开了。直接把方法访问类型改成public即可。

  1. 不同的数据源

错误案例:

1
2
3
4
5
6
7
8
9
java复制代码public class TransactionalMistake {

@Transactional
public void createOrder(Order order) {
orderRepo1.save(order);
orderRepo2.save(order);
}

}

有的时候,我们一个操作可能会同时写多个数据源,比如上面这个例子里的orderRepo1和orderRepo2是连接的两个不同数据源。默认情况下,这种跨数据源的事务是不会成功的。

如果要在多个数据源之间实现事务,那么可以引入JTA,具体如何做的话可以看看之前的这篇分享《使用JTA实现多数据源的事务管理》

  1. 回滚异常配置不正确

默认情况下,仅对RuntimeException和Error进行回滚。如果不是的它们及它们的子孙异常的话,就不会回滚。

所以,在自定义异常的时候,要做好适当的规划,如果要影响事务回滚,可以定义为RuntimeException的子类;如果不是RuntimeException,但也希望触发回滚,那么可以使用rollbackFor属性来指定要回滚的异常。

1
2
3
4
5
6
7
8
java复制代码public class TransactionalMistake {

@Transactional(rollbackFor = XXXException.class)
public void method() throws XXXException {

}

}
  1. 数据库引擎不支持事务

这个来源于一个读者反馈的例子,代码跟我的案例一摸一样,我这边是好的,但他就是不回滚。

后来排查出来是因为漏了一个关键属性的配置:

1
properties复制代码spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect

这里的spring.jpa.database-platform配置主要用来设置hibernate使用的方言。这里特地采用了MySQL5InnoDBDialect,主要为了保障在使用Spring Data JPA时候,Hibernate自动创建表的时候使用InnoDB存储引擎,不然就会以默认存储引擎MyISAM来建表,而MyISAM存储引擎是没有事务的。

如果你的事务没有生效,那么可以看看创建的表,是不是使用了MyISAM存储引擎,如果是的话,那就是这个原因了!

小结

如果你看到最后,发现还有其他情况还没有囊括其中,欢迎告诉我们哟,我们会持续更新这篇文章!以帮助碰到此类问题的读者。

好了,今天的学习就到这里!如果您学习过程中如遇困难?可以加入我们超高质量的Spring技术交流群,参与交流与讨论,更好的学习与进步!

欢迎关注我的公众号:程序猿DD,分享外面看不到的干货与思考!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…325326327…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%