开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

8000字长文让你彻底了解 Java 8 的 Lambda、

发表于 2020-06-10

我是风筝,公众号「古时的风筝」
文章会收录在 JavaNewBee 中,公众号更多干货。公众号回复『666』获取高清大图。

就在今年 Java 25周岁了,可能比在座的各位中的一些少年年龄还大,但令人遗憾的是,竟然没有我大,不禁感叹,Java 还是太小了。(难道我会说是因为我老了?)


而就在上个月,Java 15 的试验版悄悄发布了,但是在 Java 界一直有个神秘现象,那就是「你发你发任你发,我的最爱 Java 8」.

据 Snyk 和 The Java Magazine 联合推出发布的 2020 JVM 生态调查报告显示,在所有的 Java 版本中,仍然有 64% 的开发者使用 Java 8。另外一些开发者可能已经开始用 Java 9、Java 11、Java 13 了,当然还有一些神仙开发者还在坚持使用 JDK 1.6 和 1.7。

尽管 Java 8 发布多年,使用者众多,可神奇的是竟然有很多同学没有用过 Java 8 的新特性,比如 Lambda表达式、比如方法引用,再比如今天要说的 Stream。其实 Stream 就是以 Lambda 和方法引用为基础,封装的简单易用、函数式风格的 API。

Java 8 是在 2014 年发布的,实话说,风筝我也是在 Java 8 发布后很长一段时间才用的 Stream,因为 Java 8 发布的时候我还在 C# 的世界中挣扎,而使用 Lambda 表达式却很早了,因为 Python 中用 Lambda 很方便,没错,我写 Python 的时间要比 Java 的时间还长。


要讲 Stream ,那就不得不先说一下它的左膀右臂 Lambda 和方法引用,你用的 Stream API 其实就是函数式的编程风格,其中的「函数」就是方法引用,「式」就是 Lambda 表达式。


Lambda 表达式


Lambda 表达式是一个匿名函数,Lambda表达式基于数学中的λ演算得名,直接对应于其中的lambda抽象,是一个匿名函数,即没有函数名的函数。Lambda表达式可以表示闭包。

在 Java 中,Lambda 表达式的格式是像下面这样

1
2
3
4
5
复制代码// 无参数,无返回值
() -> log.info("Lambda")

// 有参数,有返回值
(int a, int b) -> { a+b }

其等价于

1
2
3
4
5
复制代码log.info("Lambda");

private int plus(int a, int b){
return a+b;
}

最常见的一个例子就是新建线程,有时候为了省事,会用下面的方法创建并启动一个线程,这是匿名内部类的写法,new Thread需要一个 implements 自Runnable类型的对象实例作为参数,比较好的方式是创建一个新类,这个类 implements Runnable,然后 new 出这个新类的实例作为参数传给 Thread。而匿名内部类不用找对象接收,直接当做参数。

1
2
3
4
5
6
复制代码new Thread(new Runnable() {
@Override
public void run() {
System.out.println("快速新建并启动一个线程");
}
}).start();

但是这样写是不是感觉看上去很乱、很土,而这时候,换上 Lambda 表达式就是另外一种感觉了。

1
2
3
复制代码new Thread(()->{
System.out.println("快速新建并启动一个线程");
}).start();

怎么样,这样一改,瞬间感觉清新脱俗了不少,简洁优雅了不少。

Lambda 表达式简化了匿名内部类的形式,可以达到同样的效果,但是 Lambda 要优雅的多。虽然最终达到的目的是一样的,但其实内部的实现原理却不相同。

匿名内部类在编译之后会创建一个新的匿名内部类出来,而 Lambda 是调用 JVM invokedynamic指令实现的,并不会产生新类。

方法引用

方法引用的出现,使得我们可以将一个方法赋给一个变量或者作为参数传递给另外一个方法。::双冒号作为方法引用的符号,比如下面这两行语句,引用 Integer类的 parseInt方法。

1
2
复制代码Function<String, Integer> s = Integer::parseInt;
Integer i = s.apply("10");

或者下面这两行,引用 Integer类的 compare方法。

1
2
复制代码Comparator<Integer> comparator = Integer::compare;
int result = comparator.compare(100,10);

再比如,下面这两行代码,同样是引用 Integer类的 compare方法,但是返回类型却不一样,但却都能正常执行,并正确返回。

1
2
复制代码IntBinaryOperator intBinaryOperator = Integer::compare;
int result = intBinaryOperator.applyAsInt(10,100);

相信有的同学看到这里恐怕是下面这个状态,完全不可理喻吗,也太随便了吧,返回给谁都能接盘。


先别激动,来来来,现在咱们就来解惑,解除蒙圈脸。

Q:什么样的方法可以被引用?

A:这么说吧,任何你有办法访问到的方法都可以被引用。

Q:返回值到底是什么类型?

A:这就问到点儿上了,上面又是 Function、又是Comparator、又是 IntBinaryOperator的,看上去好像没有规律,其实不然。

返回的类型是 Java 8 专门定义的函数式接口,这类接口用 @FunctionalInterface 注解。

比如 Function这个函数式接口的定义如下:

1
2
3
4
复制代码@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}

还有很关键的一点,你的引用方法的参数个数、类型,返回值类型要和函数式接口中的方法声明一一对应才行。

比如 Integer.parseInt方法定义如下:

1
2
3
复制代码public static int parseInt(String s) throws NumberFormatException {
return parseInt(s,10);
}

首先parseInt方法的参数个数是 1 个,而 Function中的 apply方法参数个数也是 1 个,参数个数对应上了,再来,apply方法的参数类型和返回类型是泛型类型,所以肯定能和 parseInt方法对应上。

这样一来,就可以正确的接收Integer::parseInt的方法引用,并可以调用Funciton的apply方法,这时候,调用到的其实就是对应的 Integer.parseInt方法了。

用这套标准套到 Integer::compare方法上,就不难理解为什么即可以用 Comparator<Integer>接收,又可以用 IntBinaryOperator接收了,而且调用它们各自的方法都能正确的返回结果。

Integer.compare方法定义如下:

1
2
3
复制代码public static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}

返回值类型 int,两个参数,并且参数类型都是 int。

然后来看Comparator和IntBinaryOperator它们两个的函数式接口定义和其中对应的方法:

1
2
3
4
5
6
7
8
9
复制代码@FunctionalInterface
public interface Comparator<T> {
int compare(T o1, T o2);
}

@FunctionalInterface
public interface IntBinaryOperator {
int applyAsInt(int left, int right);
}

对不对,都能正确的匹配上,所以前面示例中用这两个函数式接口都能正常接收。其实不止这两个,只要是在某个函数式接口中声明了这样的方法:两个参数,参数类型是 int或者泛型,并且返回值是 int或者泛型的,都可以完美接收。

JDK 中定义了很多函数式接口,主要在 java.util.function包下,还有 java.util.Comparator 专门用作定制比较器。另外,前面说的 Runnable也是一个函数式接口。

functionInterface 引用
自己动手实现一个例子


1. 定义一个函数式接口,并添加一个方法

定义了名称为 KiteFunction 的函数式接口,使用 @FunctionalInterface注解,然后声明了具有两个参数的方法 run,都是泛型类型,返回结果也是泛型。

还有一点很重要,函数式接口中只能声明一个可被实现的方法,你不能声明了一个 run方法,又声明一个 start方法,到时候编译器就不知道用哪个接收了。而用default 关键字修饰的方法则没有影响。

1
2
3
4
5
6
7
8
9
10
11
复制代码@FunctionalInterface
public interface KiteFunction<T, R, S> {

/**
* 定义一个双参数的方法
* @param t
* @param s
* @return
*/
R run(T t,S s);
}

2. 定义一个与 KiteFunction 中 run 方法对应的方法

在 FunctionTest 类中定义了方法 DateFormat,一个将 LocalDateTime类型格式化为字符串类型的方法。

1
2
3
4
5
6
复制代码public class FunctionTest {
public static String DateFormat(LocalDateTime dateTime, String partten) {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(partten);
return dateTime.format(dateTimeFormatter);
}
}

3.用方法引用的方式调用

正常情况下我们直接使用 FunctionTest.DateFormat()就可以了。

而用函数式方式,是这样的。

1
2
复制代码KiteFunction<LocalDateTime,String,String> functionDateFormat = FunctionTest::DateFormat;
String dateString = functionDateFormat.run(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss");

而其实我可以不专门在外面定义 DateFormat这个方法,而是像下面这样,使用匿名内部类。

1
2
3
4
5
6
7
8
9
10
11
复制代码public static void main(String[] args) throws Exception {

String dateString = new KiteFunction<LocalDateTime, String, String>() {
@Override
public String run(LocalDateTime localDateTime, String s) {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(s);
return localDateTime.format(dateTimeFormatter);
}
}.run(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss");
System.out.println(dateString);
}

前面第一个 Runnable的例子也提到了,这样的匿名内部类可以用 Lambda 表达式的形式简写,简写后的代码如下:

1
2
3
4
5
6
7
8
9
复制代码public static void main(String[] args) throws Exception {

KiteFunction<LocalDateTime, String, String> functionDateFormat = (LocalDateTime dateTime, String partten) -> {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(partten);
return dateTime.format(dateTimeFormatter);
};
String dateString = functionDateFormat.run(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss");
System.out.println(dateString);
}

使用(LocalDateTime dateTime, String partten) -> { } 这样的 Lambda 表达式直接返回方法引用。

Stream API

为了说一下 Stream API 的使用,可以说是大费周章啊,知其然,也要知其所以然吗,追求技术的态度和姿势要正确。

当然 Stream 也不只是 Lambda 表达式就厉害了,真正厉害的还是它的功能,Stream 是 Java 8 中集合数据处理的利器,很多本来复杂、需要写很多代码的方法,比如过滤、分组等操作,往往使用 Stream 就可以在一行代码搞定,当然也因为 Stream 都是链式操作,一行代码可能会调用好几个方法。

Collection接口提供了 stream()方法,让我们可以在一个集合方便的使用 Stream API 来进行各种操作。值得注意的是,我们执行的任何操作都不会对源集合造成影响,你可以同时在一个集合上提取出多个 stream 进行操作。

我们看 Stream 接口的定义,继承自 BaseStream,机会所有的接口声明都是接收方法引用类型的参数,比如 filter方法,接收了一个 Predicate类型的参数,它就是一个函数式接口,常用来作为条件比较、筛选、过滤用,JPA中也使用了这个函数式接口用来做查询条件拼接。

1
2
3
4
5
6
复制代码public interface Stream<T> extends BaseStream<T, Stream<T>> {

Stream<T> filter(Predicate<? super T> predicate);

// 其他接口
}

下面就来看看 Stream 常用 API。

of

可接收一个泛型对象或可变成泛型集合,构造一个 Stream 对象。

1
2
3
复制代码private static void createStream(){
Stream<String> stringStream = Stream.of("a","b","c");
}

empty

创建一个空的 Stream 对象。

concat

连接两个 Stream ,不改变其中任何一个 Steam 对象,返回一个新的 Stream 对象。

1
2
3
4
5
复制代码private static void concatStream(){
Stream<String> a = Stream.of("a","b","c");
Stream<String> b = Stream.of("d","e");
Stream<String> c = Stream.concat(a,b);
}

max

一般用于求数字集合中的最大值,或者按实体中数字类型的属性比较,拥有最大值的那个实体。它接收一个 Comparator<T>,上面也举到这个例子了,它是一个函数式接口类型,专门用作定义两个对象之间的比较,例如下面这个方法使用了 Integer::compareTo这个方法引用。

1
2
3
4
5
复制代码private static void max(){
Stream<Integer> integerStream = Stream.of(2, 2, 100, 5);
Integer max = integerStream.max(Integer::compareTo).get();
System.out.println(max);
}

当然,我们也可以自己定制一个 Comparator,顺便复习一下 Lambda 表达式形式的方法引用。

1
2
3
4
5
6
复制代码private static void max(){
Stream<Integer> integerStream = Stream.of(2, 2, 100, 5);
Comparator<Integer> comparator = (x, y) -> (x.intValue() < y.intValue()) ? -1 : ((x.equals(y)) ? 0 : 1);
Integer max = integerStream.max(comparator).get();
System.out.println(max);
}

min

与 max 用法一样,只不过是求最小值。

findFirst

获取 Stream 中的第一个元素。

findAny

获取 Stream 中的某个元素,如果是串行情况下,一般都会返回第一个元素,并行情况下就不一定了。

count

返回元素个数。

1
2
复制代码Stream<String> a = Stream.of("a", "b", "c");
long x = a.count();

peek

建立一个通道,在这个通道中对 Stream 的每个元素执行对应的操作,对应 Consumer<T>的函数式接口,这是一个消费者函数式接口,顾名思义,它是用来消费 Stream 元素的,比如下面这个方法,把每个元素转换成对应的大写字母并输出。

1
2
3
4
复制代码private static void peek() {
Stream<String> a = Stream.of("a", "b", "c");
List<String> list = a.peek(e->System.out.println(e.toUpperCase())).collect(Collectors.toList());
}

forEach

和 peek 方法类似,都接收一个消费者函数式接口,可以对每个元素进行对应的操作,但是和 peek 不同的是,forEach 执行之后,这个 Stream 就真的被消费掉了,之后这个 Stream 流就没有了,不可以再对它进行后续操作了,而 peek操作完之后,还是一个可操作的 Stream 对象。

正好借着这个说一下,我们在使用 Stream API 的时候,都是一串链式操作,这是因为很多方法,比如接下来要说到的 filter方法等,返回值还是这个 Stream 类型的,也就是被当前方法处理过的 Stream 对象,所以 Stream API 仍然可以使用。

1
2
3
4
复制代码private static void forEach() {
Stream<String> a = Stream.of("a", "b", "c");
a.forEach(e->System.out.println(e.toUpperCase()));
}

forEachOrdered

功能与 forEach是一样的,不同的是,forEachOrdered是有顺序保证的,也就是对 Stream 中元素按插入时的顺序进行消费。为什么这么说呢,当开启并行的时候,forEach和 forEachOrdered的效果就不一样了。

1
2
复制代码Stream<String> a = Stream.of("a", "b", "c");
a.parallel().forEach(e->System.out.println(e.toUpperCase()));

当使用上面的代码时,输出的结果可能是 B、A、C 或者 A、C、B或者A、B、C,而使用下面的代码,则每次都是 A、 B、C

1
2
复制代码Stream<String> a = Stream.of("a", "b", "c");
a.parallel().forEachOrdered(e->System.out.println(e.toUpperCase()));

limit

获取前 n 条数据,类似于 MySQL 的limit,只不过只能接收一个参数,就是数据条数。

1
2
3
4
复制代码private static void limit() {
Stream<String> a = Stream.of("a", "b", "c");
a.limit(2).forEach(e->System.out.println(e));
}

上述代码打印的结果是 a、b。

skip

跳过前 n 条数据,例如下面代码,返回结果是 c。

1
2
3
4
复制代码private static void skip() {
Stream<String> a = Stream.of("a", "b", "c");
a.skip(2).forEach(e->System.out.println(e));
}

distinct

元素去重,例如下面方法返回元素是 a、b、c,将重复的 b 只保留了一个。

1
2
3
4
复制代码private static void distinct() {
Stream<String> a = Stream.of("a", "b", "c","b");
a.distinct().forEach(e->System.out.println(e));
}

sorted

有两个重载,一个无参数,另外一个有个 Comparator类型的参数。

无参类型的按照自然顺序进行排序,只适合比较单纯的元素,比如数字、字母等。

1
2
3
4
复制代码private static void sorted() {
Stream<String> a = Stream.of("a", "c", "b");
a.sorted().forEach(e->System.out.println(e));
}

有参数的需要自定义排序规则,例如下面这个方法,按照第二个字母的大小顺序排序,最后输出的结果是 a1、b3、c6。

1
2
3
4
复制代码private static void sortedWithComparator() {
Stream<String> a = Stream.of("a1", "c6", "b3");
a.sorted((x,y)->Integer.parseInt(x.substring(1))>Integer.parseInt(y.substring(1))?1:-1).forEach(e->System.out.println(e));
}

为了更好的说明接下来的几个 API ,我模拟了几条项目中经常用到的类似数据,10条用户信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码private static List<User> getUserData() {
Random random = new Random();
List<User> users = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
User user = new User();
user.setUserId(i);
user.setUserName(String.format("古时的风筝 %s 号", i));
user.setAge(random.nextInt(100));
user.setGender(i % 2);
user.setPhone("18812021111");
user.setAddress("无");
users.add(user);
}
return users;
}

filter

用于条件筛选过滤,筛选出符合条件的数据。例如下面这个方法,筛选出性别为 0,年龄大于 50 的记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码private static void filter(){
List<User> users = getUserData();
Stream<User> stream = users.stream();
stream.filter(user -> user.getGender().equals(0) && user.getAge()>50).forEach(e->System.out.println(e));

/**
*等同于下面这种形式 匿名内部类
*/
// stream.filter(new Predicate<User>() {
// @Override
// public boolean test(User user) {
// return user.getGender().equals(0) && user.getAge()>50;
// }
// }).forEach(e->System.out.println(e));
}

map

map方法的接口方法声明如下,接受一个 Function函数式接口,把它翻译成映射最合适了,通过原始数据元素,映射出新的类型。

1
复制代码<R> Stream<R> map(Function<? super T, ? extends R> mapper);

而 Function的声明是这样的,观察 apply方法,接受一个 T 型参数,返回一个 R 型参数。用于将一个类型转换成另外一个类型正合适,这也是 map的初衷所在,用于改变当前元素的类型,例如将 Integer 转为 String类型,将 DAO 实体类型,转换为 DTO 实例类型。

当然了,T 和 R 的类型也可以一样,这样的话,就和 peek方法没什么不同了。

1
2
3
4
5
6
7
8
9
10
11
复制代码@FunctionalInterface
public interface Function<T, R> {

/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
}

例如下面这个方法,应该是业务系统的常用需求,将 User 转换为 API 输出的数据格式。

1
2
3
4
5
6
7
8
9
10
11
12
复制代码private static void map(){
List<User> users = getUserData();
Stream<User> stream = users.stream();
List<UserDto> userDtos = stream.map(user -> dao2Dto(user)).collect(Collectors.toList());
}

private static UserDto dao2Dto(User user){
UserDto dto = new UserDto();
BeanUtils.copyProperties(user, dto);
//其他额外处理
return dto;
}

mapToInt

将元素转换成 int 类型,在 map方法的基础上进行封装。

mapToLong

将元素转换成 Long 类型,在 map方法的基础上进行封装。

mapToDouble

将元素转换成 Double 类型,在 map方法的基础上进行封装。

flatMap

这是用在一些比较特别的场景下,当你的 Stream 是以下这几种结构的时候,需要用到 flatMap方法,用于将原有二维结构扁平化。

  1. Stream<String[]>
  2. Stream<Set<String>>
  3. Stream<List<String>>

以上这三类结构,通过 flatMap方法,可以将结果转化为 Stream<String>这种形式,方便之后的其他操作。

比如下面这个方法,将List<List<User>>扁平处理,然后再使用 map或其他方法进行操作。

1
2
3
4
5
6
7
8
9
复制代码private static void flatMap(){
List<User> users = getUserData();
List<User> users1 = getUserData();
List<List<User>> userList = new ArrayList<>();
userList.add(users);
userList.add(users1);
Stream<List<User>> stream = userList.stream();
List<UserDto> userDtos = stream.flatMap(subUserList->subUserList.stream()).map(user -> dao2Dto(user)).collect(Collectors.toList());
}

flatMapToInt

用法参考 flatMap,将元素扁平为 int 类型,在 flatMap方法的基础上进行封装。

flatMapToLong

用法参考 flatMap,将元素扁平为 Long 类型,在 flatMap方法的基础上进行封装。

flatMapToDouble

用法参考 flatMap,将元素扁平为 Double 类型,在 flatMap方法的基础上进行封装。

collection

在进行了一系列操作之后,我们最终的结果大多数时候并不是为了获取 Stream 类型的数据,而是要把结果变为 List、Map 这样的常用数据结构,而 collection就是为了实现这个目的。

就拿 map 方法的那个例子说明,将对象类型进行转换后,最终我们需要的结果集是一个 List<UserDto >类型的,使用 collect方法将 Stream 转换为我们需要的类型。

下面是 collect接口方法的定义:

1
复制代码<R, A> R collect(Collector<? super T, A, R> collector);

下面这个例子演示了将一个简单的 Integer Stream 过滤出大于 7 的值,然后转换成 List<Integer>集合,用的是 Collectors.toList()这个收集器。

1
2
3
4
复制代码private static void collect(){
Stream<Integer> integerStream = Stream.of(1,2,5,7,8,12,33);
List<Integer> list = integerStream.filter(s -> s.intValue()>7).collect(Collectors.toList());
}

很多同学表示看不太懂这个 Collector是怎么一个意思,来,我们看下面这段代码,这是 collect的另一个重载方法,你可以理解为它的参数是按顺序执行的,这样就清楚了,这就是个 ArrayList 从创建到调用 addAll方法的一个过程。

1
2
3
4
5
复制代码private static void collect(){
Stream<Integer> integerStream = Stream.of(1,2,5,7,8,12,33);
List<Integer> list = integerStream.filter(s -> s.intValue()>7).collect(ArrayList::new, ArrayList::add,
ArrayList::addAll);
}

我们在自定义 Collector的时候其实也是这个逻辑,不过我们根本不用自定义, Collectors已经为我们提供了很多拿来即用的收集器。比如我们经常用到Collectors.toList()、Collectors.toSet()、Collectors.toMap()。另外还有比如Collectors.groupingBy()用来分组,比如下面这个例子,按照 userId 字段分组,返回以 userId 为key,List 为value 的 Map,或者返回每个 key 的个数。

1
2
3
4
5
复制代码// 返回 userId:List<User>
Map<String,List<User>> map = user.stream().collect(Collectors.groupingBy(User::getUserId));

// 返回 userId:每组个数
Map<String,Long> map = user.stream().collect(Collectors.groupingBy(User::getUserId,Collectors.counting()));

toArray

collection是返回列表、map 等,toArray是返回数组,有两个重载,一个空参数,返回的是 Object[]。

另一个接收一个 IntFunction<R>类型参数。

1
2
3
4
5
6
7
8
9
10
11
复制代码@FunctionalInterface
public interface IntFunction<R> {

/**
* Applies this function to the given argument.
*
* @param value the function argument
* @return the function result
*/
R apply(int value);
}

比如像下面这样使用,参数是 User[]::new也就是new 一个 User 数组,长度为最后的 Stream 长度。

1
2
3
4
5
复制代码private static void toArray() {
List<User> users = getUserData();
Stream<User> stream = users.stream();
User[] userArray = stream.filter(user -> user.getGender().equals(0) && user.getAge() > 50).toArray(User[]::new);
}

reduce

它的作用是每次计算的时候都用到上一次的计算结果,比如求和操作,前两个数的和加上第三个数的和,再加上第四个数,一直加到最后一个数位置,最后返回结果,就是 reduce的工作过程。

1
2
3
4
5
复制代码private static void reduce(){
Stream<Integer> integerStream = Stream.of(1,2,5,7,8,12,33);
Integer sum = integerStream.reduce(0,(x,y)->x+y);
System.out.println(sum);
}

另外 Collectors好多方法都用到了 reduce,比如 groupingBy、minBy、maxBy等等。

并行 Stream

Stream 本质上来说就是用来做数据处理的,为了加快处理速度,Stream API 提供了并行处理 Stream 的方式。通过 users.parallelStream()或者users.stream().parallel() 的方式来创建并行 Stream 对象,支持的 API 和普通 Stream 几乎是一致的。

并行 Stream 默认使用 ForkJoinPool线程池,当然也支持自定义,不过一般情况下没有必要。ForkJoin 框架的分治策略与并行流处理正好契合。

虽然并行这个词听上去很厉害,但并不是所有情况使用并行流都是正确的,很多时候完全没这个必要。

什么情况下使用或不应使用并行流操作呢?

  1. 必须在多核 CPU 下才使用并行 Stream,听上去好像是废话。
  2. 在数据量不大的情况下使用普通串行 Stream 就可以了,使用并行 Stream 对性能影响不大。
  3. CPU 密集型计算适合使用并行 Stream,而 IO 密集型使用并行 Stream 反而会更慢。
  4. 虽然计算是并行的可能很快,但最后大多数时候还是要使用 collect合并的,如果合并代价很大,也不适合用并行 Stream。
  5. 有些操作,比如 limit、 findFirst、forEachOrdered 等依赖于元素顺序的操作,都不适合用并行 Stream。

最后

Java 25 周岁了,有多少同学跟我一样在用 Java 8,还有多少同学再用更早的版本,请说出你的故事。


壮士且慢,先给点个赞吧,总是被白嫖,身体吃不消!

我是风筝,公众号「古时的风筝」。一个兼具深度与广度的程序员鼓励师,一个本打算写诗却写起了代码的田园码农!你可选择现在就关注我,或者看看历史文章再关注也不迟。公众号回复『666』有惊喜。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试问:Kafka 为什么速度那么快?

发表于 2020-06-10

Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。

即使是普通的服务器,Kafka也可以轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。

下面从数据写入和读取两方面分析,为什么为什么Kafka速度这么快。

写入数据

Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术, 顺序写入 和 MMFile 。

顺序写入

磁盘读写的快慢取决于你怎么使用它,也就是顺序读写或者随机读写。在顺序读写的情况下,某些优化场景磁盘的读写速度可以和内存持平。

因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。

而且Linux对于磁盘的读写优化也比较多,包括read-ahead和write-behind,磁盘缓存等。如果在内存做这些操作的时候,一个是JAVA对象的内存开销很大,另一个是随着堆内存数据的增多,JAVA的GC时间会变得很长,使用磁盘操作有以下几个好处:

  • 磁盘顺序读写速度超过内存随机读写
  • JVM的GC效率低,内存占用大。使用磁盘可以避免这一问题
  • 系统冷启动后,磁盘缓存依然可用

在这里插入图片描述

上图就展示了Kafka是如何写入数据的, 每一个Partition其实都是一个文件 ,收到消息后Kafka会把数据插入到文件末尾(虚框部分)。

这种方法有一个缺陷—— 没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示 读取到了第几条数据 。

在这里插入图片描述

两个消费者,Consumer1有两个offset分别对应Partition0、Partition1(假设每一个Topic一个Partition);Consumer2有一个offset对应Partition2。这个offset是由客户端SDK负责保存的,Kafka的Broker完全无视这个东西的存在;一般情况下SDK会把它保存到zookeeper里面。(所以需要给Consumer提供zookeeper的地址)。

如果不删除硬盘肯定会被撑满,所以Kakfa提供了两种策略来删除数据。一是基于时间,二是基于partition文件大小。具体配置可以参看它的配置文档。

Memory Mapped Files

即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并 不是实时的写入硬盘 ,它充分利用了现代操作系统 分页存储 来利用内存提高I/O效率。

Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件 ,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。

使用这种方式可以获取很大的I/O提升, 省去了用户空间到内核空间 复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)也有一个很明显的缺陷——不可靠, 写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。 Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);写入mmap之后立即返回Producer不调用flush叫 异步 (async)。

读取数据

Kafka在读取磁盘时做了哪些优化?

基于sendfile实现Zero Copy

传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:

  1. 调用read函数,文件数据被copy到内核缓冲区
  2. read函数返回,文件数据从内核缓冲区copy到用户缓冲区
  3. write函数调用,将文件数据从用户缓冲区copy到内核与socket相关的缓冲区。
  4. 数据从socket缓冲区copy到相关协议引擎。

以上细节是传统read/write方式进行网络文件传输的方式,我们可以看到,在这个过程当中,文件数据实际上是经过了四次copy操作:

硬盘—>内核buf—>用户buf—>socket相关缓冲区—>协议引擎

而sendfile系统调用则提供了一种减少以上多次copy,提升文件传输性能的方法。
在内核版本2.1中,引入了sendfile系统调用,以简化网络上和两个本地文件之间的数据传输。 sendfile的引入不仅减少了数据复制,还减少了上下文切换。

sendfile(socket, file, len);

运行流程如下:

  1. sendfile系统调用,文件数据被copy至内核缓冲区
  2. 再从内核缓冲区copy至内核中socket相关的缓冲区
  3. 最后再socket相关的缓冲区copy到协议引擎

相较传统read/write方式,2.1版本内核引进的sendfile已经减少了内核缓冲区到user缓冲区,再由user缓冲区到socket相关缓冲区的文件copy,而在内核版本2.4之后,文件描述符结果被改变,sendfile实现了更简单的方式,再次减少了一次copy操作。

在apache,nginx,lighttpd等web服务器当中,都有一项sendfile相关的配置,使用sendfile可以大幅提升文件传输性能。

Kafka把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候Kafka直接把文件发送给消费者,配合mmap作为文件读写方式,直接把它传给sendfile。

批量压缩

在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。

  • 如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
  • Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩
  • Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议

总结

Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

BLOG地址:www.liangsonghua.com

关注微信公众号:松花皮蛋的黑板报,获取更多精彩!

公众号介绍:分享在京东工作的技术感悟,还有JAVA技术和业内最佳实践,大部分都是务实的、能看懂的、可复现的

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何使用JavaScript访问设备前后摄像头

发表于 2020-06-09

在这篇文章中,我将向您展示如何通过JavaScript在网页上访问设备的摄像头,并支持多种浏览器,而无需外部库。

如何使用相机API

要访问用户的相机(或麦克风),我们使用JavaScript MediaStream API。该API允许通过流访问这些设备捕获的视频和音频。

第一步是检查浏览器是否支持此API:

1
2
3
4
5
6
复制代码if (
"mediaDevices" in navigator &&
"getUserMedia" in navigator.mediaDevices
) {
// ok, 浏览器支持它
}

在现代浏览器中,支持是不错的(当然没有Internet Explorer)。

捕获视频流

要捕获由摄像机生成的视频流,我们使用 mediaDevices 对象的 getUserMedia 方法。这个方法接收一个对象,其中包含我们要请求的媒体类型(视频或音频)和一些要求。首先,我们可以通过 {video: true} 来获取摄像机的视频。

1
复制代码const videoStream = await navigator.mediaDevices.getUserMedia({ video: true });

此调用将询问用户是否允许访问摄像机。如果用户拒绝,它将引发异常并且不返回流。因此,必须在 try/catch 块内完成处理这种情况。

请注意,它返回一个Promise,因此您必须使用 async/await 或 then 块。在Mac OS系统上还会弹出授权

点击“好”,就可以访问电脑摄像头了,控制台输出的 videoStream 对象如下

视频规格(requirements)

我们可以通过传递有关所需分辨率以及最小和最大限制的信息来改善视频的要求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码const constraints = {
video: {
width: {
min: 1280,
ideal: 1920,
max: 2560,
},
height: {
min: 720,
ideal: 1080,
max: 1440,
},
},
};

const videoStream = await navigator.mediaDevices.getUserMedia(constraints);

这样,流以正确的宽度和高度比例进入,如果它是处于纵向模式的手机,则需要进行尺寸反转。

在页面上显示视频

既然有了流,我们该如何处理?

我们可以在页面上的 video 元素中显示视频:

1
2
3
4
复制代码// 页面中有一个 <video autoplay id="video"></video> 标签
const video = document.querySelector("#video");
const videoStream = await navigator.mediaDevices.getUserMedia(constraints);
video.srcObject = videoStream;

请注意 video 标签中的自动播放属性 autoplay,没有它,你需要调用 video.play() 才能真正开始显示图像。

访问手机的前后摄像头

默认情况下,getUserMedia 将使用系统默认的视频录制设备。如果是有两个摄像头的手机,它使用前置摄像头。

要访问后置摄像头,我们必须在视频规格中包括 faceModeMode:"environment":

1
2
3
4
5
6
7
复制代码const constraints = {
video: {
width: { ... },
height: { ... },
facingMode: "environment"
},
};

默认值为 faceingMode:"user",即前置摄像头。

需要注意的是,如果你想在已经播放视频的情况下更换摄像机,你需要先停止当前的视频流,然后再将其替换成另一台摄像机的视频流。

1
2
3
复制代码videoStream.getTracks().forEach((track) => {
track.stop();
});

截屏

你可以做的另一件很酷的事情是捕获视频的图像(屏幕快照)。

你可以在canvas上绘制当前视频帧,例如:

1
2
3
4
5
复制代码// 页面中有一个 <canvas id="canvas"></canvas> 标签
const canvas = document.querySelector("#canvas");
canvas.width = video.videoWidth;
canvas.height = video.videoHeight;
canvas.getContext("2d").drawImage(video, 0, 0);

你还可以在 img 元素中显示画布内容。

在本教程创建的示例中,我添加了一个按钮,该按钮可从画布动态创建图像并将其添加到页面:

1
2
3
复制代码const img = document.createElement("img");
img.src = canvas.toDataURL("image/png");
screenshotsContainer.prepend(img);

完整示例和代码

在线效果及源代码:coding.zhangbing.site/view.html?u…

PC

手机QQ中浏览效果


本文首发于公众号 《前端全栈开发者》 ID:by-zhangbing-dev,第一时间阅读最新文章,会优先两天发表新文章。关注后私信回复:大礼包,送某网精品视频课程网盘资料,准能为你节省不少钱!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

写给大忙人看的netty实战 初探netty netty组件

发表于 2020-06-09

初探netty

netty作为一个NIO客户端服务器框架,可以快速、轻松地构建网络应用,比如协议服务器和客户端。netty吸收了FTP、SMTP、HTTP等协议的实现经验,在易用性和敏捷性的基础上保证了程序的稳健性以及可维护性 。

当我们刚开始学习java网络编程时都是开一个socket端口,然后调用方法accept()方法阻塞式等待连接,然后不断读取数据。等到后面掌握了更多后,我们尝试使用非阻塞方式读取数据以及使用Selector选择器的非阻塞I/O。

1591339191895

随着业务不断增长,成千上万的并发量不再是不可能的了。为了更高的吞吐量与可扩展的性能,稳健、易行的客户端开发框架成为开发者的追求,而netty很完美地满足了人们的需求。它封装了java复杂的底层API,并以易于使用的方式暴露出来,使用netty可以更加注重业务逻辑的开发,而不是琐碎的底层架构。

在下面是netty的核心组件,详细的内容将于后续记录:

  • Channel(通道)。

通道是java nio的一个核心概念,它表示一个到实体的操作连接(比如网络连接、文件I/O操作)。

  • 回调。

回调实际上是 一个方法,当一个方法调用时,其指定(或者说是绑定)的回调方法也会被调用。

  • Future。

Future是一个异步操作的占位符,当该异步操作完成时,其对应的Future对象便会调用。netty的Future实现——ChannelFuture允许一个异步操作可以注册多个ChannelFutureListener实例。每个netty的出站I/O都会返回一个ChannelFuture实例。

  • 事件与Handler(处理器)。

netty是通过事件来通知我们操作状态的改变,事件可能有:

+ 连接激活或失活(入站事件)。
+ 数据读取(入站事件)。
+ 用户事件(入站事件)。
+ 错误事件(入站事件)。
+ 打开或关闭到远程节点的连接(出站事件)。
+ 将数据写到套接字(出站事件)。netty为`ChannelHandler`有许多实现,同时你也可以自定义实现。每个事件都会分发到对应的`ChannelHandler`类中的某个方法。

注意:入站和出站是相对ChannelHandler而言的,进入ChannelHandler为入站,从ChannelHandler发出消息是出站。

代码示例

下面是一个简单的netty使用:

入站处理器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码@ChannelHandler.Sharable // 标识一个 channelHandler 可以被多个 channel 安全地调用。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
// 当有入站消息时该方法就会调用
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf)msg;
System.out.println("服务器收到消息:" + buffer.toString(CharsetUtil.UTF_8));
// 将接收到的消息写给发送者,而不冲刷出站消息。
ctx.write(buffer);
}

@Override
// channelRead消费完读取的数据的时候被触发
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将未决消息冲刷到远程节点,并且关闭该 channel
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
channelFuture.addListener(ChannelFutureListener.CLOSE);
}

@Override
// 在读操作时处理异常
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 打印异常栈,并关闭该 channel
cause.printStackTrace();
ctx.close();
}
}

注意:netty服务端handler的处理是采用责任链的形式。默认情况下,channelHandler会把对它的方法调用转发给链中的下一个channelHandler。如果 exceptionCaught()方法没有被该链中的某处实现,那么所接收的异常会被传递到channelPipeline的尾端并被记录。

服务器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码public class NettyServer {
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer();
nettyServer.start(8888);
}

public void start(int port){
// 处理TCP连接请求
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 处理I/O事件
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 用于引导和绑定服务器
ServerBootstrap bootstrap = new ServerBootstrap();
//将上面的线程组加入到 bootstrap 中
bootstrap.group(bossGroup,workGroup)
//将通道设置为异步的通道
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 因为 NettyServerHandler 被标注为 @Sharable,所以可以使用相同的实例
socketChannel.pipeline().addLast(new NettyServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG,200)
.childOption(ChannelOption.SO_KEEPALIVE,true);
// 异步绑定服务器,调用 sync() 方法阻塞等待直到绑定完成。
ChannelFuture future = bootstrap.bind(port).sync();
// 获取 channel 的 closeFuture,并且阻塞直到它完成。
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

出站处理器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("netty 活跃", CharsetUtil.UTF_8));
}

// 记录已接收的消息存储
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("客户端接收消息:" + msg.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

SimpleChannelInboundHandler与ChannelInboundHandler:

客户端使用SimpleChannelInboundHandler的原因是它不需要考虑异步操作,当channelRead0方法执行完后,SimpleChannelInboundHandler就会释放指向保存该消息的ByteBuffer的内存。

服务端使用ChannelInboundHandler是因为它需要给客户端传送消息,而ctx.write()方法是异步的,可能channelRead()方法执行完了它还没有返回,所以为了避免这种情况便使用了ChannelInboundHandler。channelReadComplete方法会在channelRead()消费完读取的数据的时候被触发,此时它会将输出冲刷到channel。

客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
复制代码public class NettyClient {
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient();
nettyClient.connect("localhost", 8888);
}

public void connect(String hostname,int port) {
// 处理TCP连接请求
EventLoopGroup group = new NioEventLoopGroup();
try {
// 用于引导和绑定服务器
Bootstrap bootstrap = new Bootstrap();
//将上面的线程组加入到 bootstrap 中
bootstrap.group(group)
//将通道设置为异步的通道
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline()
.addLast(new NettyClientHandler());
}
})
.option(ChannelOption.TCP_NODELAY,true);

// 连接到远程节点,阻塞等待直到连接完成。
ChannelFuture future = bootstrap.connect(hostname, port).sync();
// 阻塞,直到 channel 关闭。
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭线程池并释放所有资源。
group.shutdownGracefully();
}
}
}

总结

从上面的代码示例我们可以看出netty服务端和客户端的构建其实大同小异,都是实现了处理器然后进行绑定。

服务端开发:

  1. 创建并实现处理器逻辑。
    1. 应用程序根据需求扩展ChannelHandler。
    2. 针对不同类型的事件来调用ChannelHandler。
  2. 创建ServerBootstrap实例来引导和绑定服务器。
  3. 创建并分配一个NioEventLoopGroup实例来进行请求处理。
  4. 创建并分配一个NioEventLoopGroup实例来进行事件处理。
  5. 指定服务器绑定的本地InetSocketAddress。
  6. 使用处理器实例初始化每一个新的channel。
  7. 调用ServerBootstrap.bind()方法来绑定服务器。

客户端开发:

  1. 创建并实现处理器逻辑。
    1. 应用程序根据需求扩展ChannelHandler。
    2. 针对不同类型的事件来调用ChannelHandler。
  2. 创建Bootstrap实例来初始化客户端。
  3. 创建并分配一个NioEventLoopGroup实例来进行事件处理。
  4. 为服务器创建InetSocketAddress实例。
  5. 当连接被建立时,一个Handler会被安装到该channel的ChannelPipline上。
  6. 调用Bootstrap.connect()方法连接远程节点 。

在前面,我们初步了解了netty,包括它的核心内容、简单使用。接下来我们将继续学习netty的主要组件与设计理念。

netty组件化设计

Channel接口

我们知道netty的核心组件包括通道,通道是java中很重要的一个概念,它表达一个到实体操作的连接。netty将多个操作抽象出来作为通道,其中包括:

  • Socket操作。
  • 多线程处理。
  • 异步通知 。

我们知道网络中基本的I/O操作(连接建立、读取数据、写入数据)都是依赖于底层网络传输所提供的接口,在java中则表示为Socket类。netty对此类进行进一步的封装,大大降低了Socket类的复杂度,它基于Channel接口提供了一系列的实现:

  • EmbeddedChannel
  • EpollDatagramChannel
  • LocalServerChannel
  • KQueueDatagramChannel
  • NioDatagramChannel
  • NioSctpChannel
  • NioSocketChannel

1591443547407

如图所示,每个Channel都会被分配一个ChannelPipline和ChannelConfig,ChannelConfig包含了该Channel的所有配置,并且支持热更新。通常在Channel实例被创建时,就会创建默认的ChannelConfig:

1
2
3
4
5
复制代码// NioServerSocketChannel构造方法
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

netty提供了一个ChannelOption类,定义了ChannelConfig支持的所有参数类型,可以这样使用:

1
2
3
4
5
6
7
复制代码NioServerSocketChannel channel = new NioServerSocketChannel();
ServerSocketChannelConfig config = channel.config();
config.setOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 针对每个ChannelOption选项,netty还提供了对应的方法,比如上面的设置可以这样代替
// config.setAllocator(PooledByteBufAllocator.DEFAULT);
// 设置通道
bootstrap.channel(channel.getClass());

为了保证Channel的顺序性,因此实现了Comparable接口。所以如果两个不同的Channel的哈希值一样,那么就会抛出错误。

Channel还提供了另外方法,主要的有:

方法名 描述
eventLoop 返回对应的EventLoop
pipeline 返回对应的ChannelPipline
localAddress 返回本地的SocketAddress
remoteAddress 返回远程的SocketAddress
write 将数据写到远程节点,这个数据将被传递给ChannelPipline,并且排队直到它被冲刷
flush 将之前写好的数据冲刷到远程节点
writeFlush 写数据并将其冲刷到远程节点
isActive 判断该Channel是否处于活动状态

netty支持的传输

NIO

NIO通过Selector实现了所有I/O异步的操作,Selector运行在一个检查状态变化并对其做出相应的线程上。Selector的位模式由java.nio.channels.SelectionKey定义:

名称 描述
OP_ACCEPT 接受连接并创建Channel时获得通知
OP_CONNECT 在建立连接时获得通知
OP_READ 在数据可以读取时获得通知
OP_WRITE 在Channel的发送缓冲区可写时获得通知

1591446199302

Epoll

Epoll是来自Linux的一个高性能、可扩展的I/O事件通知特性。netty为Linux的epoll提供了对应的API。

OIO

OIO(old IO)是旧的阻塞I/O,通过常规的传输API使用,可以在项目移植的时候用它来进行过渡。

Local

它是由netty提供的本地传输方式,用于在同一个JVM中运行的客户端和服务端之间的异步通信。

Embedded

Embedded传输使得我们可以将一组ChannelHandler作为辅助类嵌入到其它的ChannelHandler内部,这样可以在不需要修改内部代码的情况下扩展一个ChannelHandler的功能。Embedded传输的关键是EmbeddChannel的Channel实现。

Channel的生命周期

Channel定义了一组和ChannelInboundHandler相关的状态模式:

状态 描述
ChannelUnregistered Channel已被创建 ,但未被注册到EventLoop
ChannelRegistered Channel被注册到EventLoop
ChannelActive Channel处于激活状态,可以接收和发送数据
ChannelInactive Channel处于未激活状态

基于EmbeddedChannel的单元测试

EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的。可以用它来模拟发送和请求消息,来测试对应的ChannelHandler的功能实现 。

EmbeddedChannel提供了以下常用的API:

API 描述
writeInbound 写一个入站消息到 EmbeddedChannel。 如果数据能从 EmbeddedChannel 通过 readInbound() 读到,则返回 true;
readInbound 从 EmbeddedChannel 读到入站消息。任何返回遍历整个ChannelPipeline。如果读取还没有准备,则此方法返回 null;
writeOutbound 写一个出站消息到 EmbeddedChannel。 如果数据能从 EmbeddedChannel 通过 readOutbound() 读到,则返回 true;
readOutbound 从 EmbeddedChannel 读到出站消息。任何返回遍历整个ChannelPipeline。如果读取还没有准备,则此方法返回 null;
Finish 如果从入站或者出站中能读到数据,标记 EmbeddedChannel 完成并且返回。这同时会调用 EmbeddedChannel 的关闭方法;

1591534125350

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码@Test
public void testFramesDecoded(){
ByteBuf buf= Unpooled.buffer();
for (int i=0;i<9;i++){
buf.writeByte(i);
}
ByteBuf input=buf.duplicate();
EmbeddedChannel channel=new EmbeddedChannel(
new FixedLengthFrameDecoder(3)
);
assertTrue(channel.writeInbound(input.retain()));
assertTrue(channel.finish());
//读取消息
ByteBuf read=channel.readInbound();
assertEquals(buf.readSlice(3),read);
read.release();

read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}

EventLoop接口

EventLoop定义了netty的核心抽象,用于处理连接的生命周期中所发生的事情。

1591427101690

从上图我们可以看出:

  • 一个EventLoopGroup包含一个或多个EventLoop。
  • 一个EventLoopGroup在生命周期中只能和一个Thread绑定。
  • 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理。
  • 一个Channel在它的生命周期中只能注册于一个EventLoop。
  • 一个EventLoop可能会被分配给一个或多个Channel。

netty使用事件循环EventLoop来处理连接中的请求任务。EventLoop采用了两个基本的API:网络和并发编程。io.netty.util.concurrent包基于JUC包而构建地,用来提供线程的执行器 。io.util.channel包的类为了与Channel事件进行交互,扩展了这些类和接口。

1591530734568

netty的任务调度扩展了JUC的SecheduledExecutorService,因为netty的定时任务可以放入EventLoop的执行队列,不需要像JUC那样进行线程切换,所以降低 了性能消耗:

1
2
3
4
5
6
7
8
> 复制代码ctx.channel().eventLoop().schedule(new Runnable() { // 创建任务处理线程
> @Override
> public void run() {
> System.out.println("EventLoop任务调度");
> }
> }, 60, TimeUnit.MICROSECONDS); // 指定调度周期
>
>

1591532990719

ChannelFuture接口

netty提供了ChannelFuture接口用于作为异步调用的占位符,ChannelFuture.addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时得到通知。

ChannelHandler接口

ChannelHandler接口成为处理入站和出站数据的应用程序逻辑的容器。

netty以适配器模式提供了大量默认的ChannelHandler实现,旨在简化应用程序处理逻辑的开发过程。下面是编写自定义ChannelHandler时常有的适配器类:

  • ChannelHandlerAdapter
  • ChannelInbouAdapter
  • ChannelOutboundHandlerAdapter
  • ChannelDuplexHandler

ChannelHandler有对应的生命周期,在ChannelHandler在ChannelPipline 中添加或移除时,会调用相关操作。

类型 描述
handlerAdded ChannelHandler被添加到ChannelPipline
handlerRemoved ChannelHandler被ChannelPipline移除
exceptionCaught 处理过程中ChannelPipline有错误产生

我们可以通过实现ChannelInboundHandler或ChannelOutboundHandler接口来自定义自己的处理逻辑,也可以通过扩展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter类来实现 。

资源管理:在处理器处理数据时,我们需要确保在最后没有资源泄露,后面讲到的ByteBuf引用计数技术也是为了解决这个问题的。netty提供了ResourceLeakDetector用于检测内存泄漏。泄漏的级别可以通过java -Dio.netty.leakDetectionLevel=泄漏级别来指定。netty定义的泄漏级别有:

级别 描述
DISABLED 禁用泄漏检测
SIMPLE 使用1%的默认采样率并报告发现的泄漏(默认级别)
ADVANCED 使用1%的采样率并报告发现的泄漏以及对应的消息被访问位置
PARANOID 对每次消息访问进行采样,报告发现的泄漏和对应的访问位置

编码器和解码器就是很典型的ChannelHandler实现。

在进行服务器开发时,就要注意解决粘包问题。大体的解决方案就是定义一个协议格式,接收信息后对数据按照协议进行解析。netty对应于不同的格式设计,提供了不同类型的抽象,大部分的命名方式为XxxDecoder和XxxEncoder,比如支持Google的Protocol Buffers的ProtobufEncoder和ProtobufDecoder。

netty的解码编码器大体可以分为两类:

  • 从字节到消息(ByteToMessage)
  • 从消息到消息(MessageToMessage)

我们可以通过扩展netty预置的解码器和编码器来实现自己的处理器。对于每个从入站Channel读取的消息,在channelRead()方法进行完后,它将调用解码器所提供的decode()方法,并将已解码的字节转发给ChannelPipline中的下一个ChannelInboundHandler。出站也类似。

解码器

netty为字节到消息的编码器实现提供了一个基类:ByteToMessageDecoder(继承了ChannelInboundHandlerAdapter),该类会对入站数据进行缓冲,直到准备好处理。它有两个最重要的方法:

API 描述
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) 必须要实现的抽象方法。decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf 中没有更多可读取的字节时为止。然后,如果该List 不为空,那么它的内容将会被传递给ChannelPipeline 中的下一个ChannelInboundHandler。
decodeLast(ChannelHandlerContext ctx,ByteBuf in,List< Objec t> out) 当Channel的状态变为非活动时,这个方法将会被调用一次。默认是调用decode()方法。

ReplayingDecoder扩展了ByteToMessageDecoder(继承了ChannelInboundHandlerAdapter),ReplayingDecoder在处理数据时不用判断接收数据的长度。ReplayingDecoder实现了自己的ReplayingDecoderByteBuf,当数据不够时会抛出异常,然后ReplayingDecoder会重置readerIndex并且再次调用decode方法。

虽然ReplayingDecoder使用比ByteToMessageDecoder更便利,但是实际上ReplayingDecoder的运行稍慢于ByteToMessageDecoder。

netty为消息到消息的编码器实现提供了一个基类:MessageToMessageDecoder。它有最重要的方法:

API 描述
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) 必须要实现的抽象方法。decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf 中没有更多可读取的字节时为止。然后,如果该List 不为空,那么它的内容将会被传递给ChannelPipeline 中的下一个ChannelInboundHandler。

netty提供了TooLongFrameException异常,用于在解码器在帧超过指定大小限制时抛出,防止解码器缓冲大量数据造成内存耗尽。

解码基于分隔符的协议:基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或者消息段(帧)的开头或者结尾。用于处理基于分隔符的协议和基于长度的协议的解码器有:

名称 描述
DelimiterBasedFrameDecoder 使用任何由用户提供的分隔符来提取帧的通用解码器
LineBasedFrameDecoder 提取由行尾符(\n 或者\r\n)分隔的帧的解码器。这个解码器比DelimiterBasedFrameDecoder 更快

解码基于长度的协议:基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束。用于基于长度的协议的解码器有:

名称 描述
FixedLengthFrameDecoder 提取在调用构造函数时指定的定长帧
LengthFieldBasedFrameDecoder 根据编码进帧头部中的长度值提取帧;该字段的偏移量以及长度在构造函数中指定

编码器

编码器实现了ChannelOutboundHandler,并将出站数据从一种格式转换为另一种格式。

MessageToByteEncoder是将消息转换为字节的基类,最重要的方法是:

API 描述
encode(ChannelHandlerContext ctx,I msg,ByteBuf out) 必须要实现的抽象方法。被调用时将会传入要被该类编码为ByteBuf 的(类型为I 的)出站消息。该ByteBuf 随后将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

ByteToMessageDecoder之所以比MessageToByteEncoder多个decodeLast方法,是因为解码器通常需要在Channel关闭之后产生最后一个消息。

MessageToMessageEncoder是将消息转换为消息的基类,最重要的方法是:

API 描述
encode(ChannelHandlerContext ctx,I msg,List< Object > out) 必须要实现的抽象方法。。每个通过write()方法写入的消息都将会被传递给encode()方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

编解码器

从上面我们了解到编码器都是继承了ChannelInboundHandlerAdapter,而解码器继承了ChannelOutboundHandlerAdapter,那么如果我们同时实现这两个特性,是不是就可以将编码和解码整合到一个类中?netty基此为我们提供了字节和消息的编解码器。

字节编解码器:ByteToMessageCodec抽象类,该类结合了ByteToMessageDecoder和MessageToByteEncode,该类由重要的三个方法:

API 描述
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) 只要有字节可以被消费,这个方法就将会被调用。它将入站ByteBuf 转换为指定的消息格式, 并将其转发给ChannelPipeline 中的下一个ChannelInboundHandler
decodeLast(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) 这个方法的默认实现委托给了decode()方法。它只会在Channel 的状态变为非活动时被调用一次。它可以被重写以实现特殊的处理。
encode(ChannelHandlerContext ctx,msg,ByteBuf out) 对于每个将被编码并写入出站ByteBuf 的(类型为I 的)消息来说,这个方法都将会被调用

消息编解码器:MessageToMessageCodec抽象类,通过使用MessageToMessageCodec,可以在一个单个的类中实现该转换的往返过程。该类的两个重要方法:

API 描述
protected abstract decode(ChannelHandlerContext ctx,INBOUND_IN msg,List< Object > out) 这个方法被调用时会被传入INBOUND_IN 类型的消息。它将把它们解码为OUTBOUND_IN 类型的消息,这些消息将被转发给ChannelPipeline 中的下一个ChannelInboundHandler
protected abstract encode(ChannelHandlerContext ctx,OUTBOUND_IN msg,List< Object > out) 对于每个OUTBOUND_IN 类型的消息,这个方法都将会被调用。这些消息将会被编码为INBOUND_IN 类型的消息,然后被转发给ChannelPipeline 中的下一个ChannelOutboundHandler

CombinedChannelDuplexHandler类可以将编码器和解码器组合起来,如下例所示:

1
2
3
4
5
6
7
复制代码// ByteToCharDecoder是自定义的编码器,CharToByteEncoder是自定义的解码器
public class CombinedByteCharCodec extends
CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CombinedByteCharCodec() {
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}

序列化与反序列化

netty提供了三种序列化方式:

  1. jdk自带的序列化。
名称 描述
CompatibleObjectDecode 和使用 JDK 序列化的非基于 Netty的远程节点进行互操作的解码器
CompatibleObjectEncoder 和使用JDK 序列化的非基于Netty 的远程节点进行互操作的编码器
ObjectDecoder 构建于JDK 序列化之上的使用自定义的序列化来解码的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取
ObjectEncoder 构建于JDK 序列化之上的使用自定义的序列化来编码的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取
2. 使用JBoss序列化:JBoss不仅修复了jdk自带的序列化器的一些问题,而且提高了性能。
名称 描述
CompatibleMarshallingDecoder,CompatibleMarshallingEncoder 与只使用JDK 序列化的远程节点兼容
MarshallingDecoder,MarshallingEncoder 适用于使用JBoss Marshalling 的节点。这些类必须一起使用
3. 使用Protocol Buffers序列化:Protocol Buffers是一种由谷歌公司开发并开源的数据交换格式。
名称 描述
ProtobufDecoder 使用protobuf 对消息进行解码
ProtobufEncoder 使用protobuf 对消息进行编码
ProtobufVarint32FrameDecoder 根据消息中的Google Protocol Buffers 的”Base 128 Varints”整型长度字段值动态地分割所接收到的ByteBuf
ProtobufVarint32LengthFieldPrepender 向ByteBuf 前追加一个Google Protocal Buffers 的”Base 128 Varints”整型的长度字段值

预置的ChannelHandler

使用https:netty通过SslHandler使用了javax.net.ssl包,来实现SSL加密。此外它还提供了OpenSSL工具的SSLEngine实现——OPenSSLEngine,它的性能比jdk的SSLEngine更好。netty默认会尝试加载OpenSSLEngine,如果失败再去加载JdkSSLEngine。它的相关方法有:

API 描述
setHandshakeTimeout (long,TimeUnit),setHandshakeTimeoutMillis (long),getHandshakeTimeoutMillis() 设置和获取超时时间,超时之后,握手ChannelFuture 将会被通知失败
setCloseNotifyTimeout (long,TimeUnit),setCloseNotifyTimeoutMillis (long),getCloseNotifyTimeoutMillis() 设置和获取超时时间,超时之后,将会触发一个关闭通知并关闭连接。这也将会导致通知该ChannelFuture 失败
handshakeFuture() 返回一个在握手完成后将会得到通知的ChannelFuure。如果握手先前已经执行过了,则返回一个包含了先前的握手结果的ChannelFuture
close(),close(ChannelPromise),close(ChannelHandlerContext,ChannelPromise) 发送close_notify 以请求关闭并销毁底层的SslEngine

Http编解码器:netty提供了多个ChannelHandler用于将数据格式化为http响应或将请求解析为数据。下图展示了http请求响应的组成部分:

1591601567435

主要的编解码器有:

名称 描述
HttpRequestEncoder 将HttpRequest、HttpContent 和LastHttpContent 消息编码为字节
HttpResponseEncoder 将HttpResponse、HttpContent 和LastHttpContent 消息编码为字节
HttpRequestDecoder 将字节解码为HttpRequest、HttpContent 和LastHttpContent 消息
HttpResponseDecoder 将字节解码为HttpResponse、HttpContent 和LastHttpContent 消息

http聚合:对于某些请求或响应数据,netty的编解码器可能会无法完整解析它们,而是将它们解析为多个数据片段,比如HttpServerCodec只能获取uri中参数,那么如果使用post请求,因为信息保存在messageBody,所以无法完全解析。这时就需要加上HttpObjectAggregator。HttpObjectAggregator结构体系如下:

1591602159652

MessageAggregator的decode()里有一个currentMessage参数,它是该handler的成员变量,每一个channel对应一个handler实例,这个currentMessage会存储多次decode迭代的结果,这就是聚合实现的关键。

http压缩:虽然http数据压缩会带来服务器时钟的开销,但是可以节省网络流量,加快传输速率。客户端可以使用HttpContentDecompressor来处理来自服务器的内容,服务端使用HttpContentCompressor()进行数据压缩。

websocket:netty为实现websocket长连接提供了多种框架。以下是websocketFrame类型:

名称 描述
BinaryWebSocketFrame 数据帧:二进制数据
TextWebSocketFrame 数据帧:文本数据
ContinuationWebSocketFrame 数据帧:属于上一个BinaryWebSocketFrame 或者TextWebSocketFrame 的文本的或者二进制数据
CloseWebSocketFrame 控制帧:一个CLOSE 请求、关闭的状态码以及关闭的原因
PingWebSocketFrame 控制帧:请求一个PongWebSocketFrame
PongWebSocketFrame 控制帧:对PingWebSocketFrame 请求的响应

要想为WebSocket 添加安全性,只需要将SslHandler作为第一个ChannelHandler 添加到
ChannelPipeline 中。

连接管理:netty为检测处理空闲和超时连接提供了管理器。主要的有:

名称 描述
IdleStateHandler 当连接空闲时间太长时,将会触发一个IdleStateEvent 事件。然后,可以通过在ChannelInboundHandler中重写userEventTriggered()方法来处理该IdleStateEvent 事件
ReadTimeoutHandler 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个ReadTimeoutException 并关闭对应的Channel。可以通过重写ChannelHandler 中的exceptionCaught()方法来检测该ReadTimeoutException
WriteTimeoutHandler 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个WriteTimeoutException 并关闭对应的Channel 。可以通过重写你的ChannelHandler 的exceptionCaught()方法检测该WriteTimeoutException

ChannelPipline接口

ChannelPipline接口实现了链式调用,可添加ChannelHandler容器。当Channel被创建时,它会被自动分配到它所属的ChannelPipline上。

1591427854256

如图,当一个入站消息进入时,它会从ChannelPipline的头部开始,并被传递给第一个ChannelInboundHandler,当在此ChannelHandler被处理完后,它又会被传递给下一个ChannelInboundHandler,直到到达ChannelPipline的尾部。消息的出站与入站差不多。

当ChannelHandler被添加到ChannelPipline时,它会被分配一个ChannelHandlerContext,其代表了ChannelHandler和ChannelPipline之间的绑定。

在netty中,有两种发送消息的方式:

  1. 直接写到Channel中,这将使消息从ChannelPipline的尾部开始流动。
  2. 直接写到ChannelHandlerContext中,这将是消息从ChannelPipline中的 下一个ChannelHandlerContext开始流动。

在ChannelPipline 传播事件时,它会测试ChannelPipline 中的下一个ChannelHandler的类型是否符合事件的运动方向,如果不匹配,那么它会跳过该ChannelHandler。

ChannelHandlerContext

ChannelHandlerContext主要作用是使ChannelHandler和ChannelPipline进行交互,ChannelHandler可以通知ChannelHandler所属的ChannelPipline的下一个ChannelHandler ,可以修改它所属的ChannelPipline。

ChannelHandlerContext API:

1591518493786

ChannelHandlerContext中有些方法Channel和ChannelPipline也有,要注意的是如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext
上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该
ChannelPipeline 中的下一个能够处理该事件的ChannelHandler。

ServerBootstrap/Boostrap

netty的引导类为应用程序的网络层配置提供了容器,ServerBootstrap用于引导服务端,Bootstrap用于引导客户端。

1591429873430

我们可以看到ServerBootstrap和Bootstrap都实现了AbstractBootstrap抽象类。

在前面文章中,在实现服务端引导程序时,我们创建并绑定了两个EventLoopGroup,这是为什么呢?查看源码我们可以看出它的解释:

1591430052433

源码的解释为服务端引导需要一个EventLoopGroup表示服务器自身已被绑定到了本地端口正在监听的套接字,第二组则是用来处理客户端连接的。

查看Bootstrap的源码我们可以看到当调用Bootstrap.group(EventLoopGroup group)时,实际上是调用AbstractBootstrap的group方法,正和ServerBootstrap.group(EventLoopGroup parentGroup, EventLoopGroup childGroup)的第一行一样。

ByetBuf

网络传输的基本单位都是字节,而对于字节操作可以使用java的ByetBuffer API。但是ByteBuf有一些的局限性 :

  1. 它的长度长度固定,一旦分配完成,容量不能动态扩展和收缩,当需要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常;
  2. ByteBuffer只有一个标识位控的指针position,读写的时候需要手工调用flip()和rewind()等,很容易导致程序处理失败。
  3. API功能有限,一些高级和实用的特性需要使用者自己编程实现。

对于此,netty在ByteBuffer的基础上再次构建,从而实现了新的、功能强大的ByteBuf API,它具有以下优点:

  1. 可以被用户自定义的缓冲区类型扩展。
  2. 通过内置复合缓冲区实现零拷贝。
  3. 容量变长增长 。
  4. 读和写采用不同的指针。
  5. 支持方法链式调用。
  6. 支持引用计数。
  7. 支持池化。

ByteBuf的使用模式

堆缓冲区

堆缓冲区模式又称为:支撑数组(backing array)模式。之间将数据存放在JVM的堆空间,通过将数据存储在数组中实现。示例如下:

1
2
3
4
5
6
7
8
9
10
复制代码public static void heapBuffer() {
// 创建Java堆缓冲区
ByteBuf heapBuf = Unpooled.buffer();
if (heapBuf.hasArray()) { // 判断是否有支撑数组
byte[] array = heapBuf.array(); // 获取该支撑数组引用
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); // 计算第一个字节偏移量
int length = heapBuf.readableBytes(); // 获取可读字节数
handleArray(array, offset, length); // 调用自己的方法
}
}

直接缓冲区

直接缓冲区属于堆外分配的直接内存,不会占用堆的容量。适用于套接字传输过程,避免了数据从内部缓冲区拷贝到直接缓冲区的过程,性能较好。它的主要缺点是相对于基于堆的缓冲区,它们的分配和释放代价都较为昂贵,并且因为数据不是在java堆上 ,所以处理前你需要再进行一次复制。

1
2
3
4
5
6
7
8
9
复制代码public static void directBuffer() {
ByteBuf directBuf = Unpooled.directBuffer();
if (!directBuf.hasArray()) { // 如果不是堆缓冲区
int length = directBuf.readableBytes(); // 获取可读字节数
byte[] array = new byte[length]; // 分配一个新数组来保存数据
directBuf.getBytes(directBuf.readerIndex(), array); // 将数据复制到新数组
handleArray(array, 0, length); // 调用自己的方法
}
}

复合缓冲区

复合缓冲区是netty特有的缓冲区。本质上类似于提供一个或多个ByteBuf的组合视图,可以根据需要添加和删除不同类型的ByteBuf。复合缓冲区不支持访问其支撑数组。因此如果要访问,需要先将内容拷贝到堆内存中,再进行访问。

1
2
3
4
5
6
7
8
9
10
11
复制代码public static void byteBufComposite() {
// 复合缓冲区,只是提供一个视图
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = Unpooled.buffer();
ByteBuf bodyBuf = Unpooled.directBuffer();
messageBuf.addComponents(headerBuf, bodyBuf); // 将ByteBuf实例追加到CompositeByteBuf
messageBuf.removeComponent(0); // 移除索引位置为0的缓冲区
for (ByteBuf buf : messageBuf) { // 遍历缓冲区
System.out.println(buf.toString());
}
}

字节级操作

随机访问索引:ByteBuf的索引是从0开始的。

1
2
3
4
5
6
7
复制代码public static void byteBufRelativeAccess() {
ByteBuf buffer = Unpooled.buffer();
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);// 不改变readerIndex值
System.out.println((char) b);
}
}

对于那些只需要一个索引值参数的方法,它们都不会改变readIndex和writeIndex,不过可以通过调用readerIndex(index)和writeIndex(index)来手动改变。

可丢弃字节:可丢弃字节区域是指[0,readerIndex)之间的区域。可调用discardReadBytes()方法丢弃已经读过的字节。discardReadBytes()方法会移动可读字节区域内容(CONTENT)。如果频繁调用,会有多次数据复制开销,对性能有一定的影响

可读字节:可读字节区域是指[readerIndex, writerIndex)之间的区域。任何readxxx()和skipxxx()的操作方法,都会改变readerIndex索引。

可写字节:可写字节区域是指[writerIndex, capacity)之间的区域。任何writexxx()的操作方法都将改变writerIndex的值。

索引管理:

  1. markReaderIndex()、markWriterIndex()——将流的当前位置标记。
  2. resetReaderIndex()、resetWriterIndex()——将流重置到标记位置。
  3. readIndex(index)、writeIndex(int)——将读/写索引移到指定位置。
  4. clear()——将readerIndex和writeerIndex设置为0,但是内存中的内容不会被清除。

查找操作:查找ByteBuf指定的值,最简单的就是使用indexOf()方法,较复杂的可以通过ByteBufProcessor作为参数的方法达,比如int index = buffer.forEachByte(ByteProcessor.FIND_CR);。

派生缓冲区:派生缓冲区为ByteBuf提供了一个访问的视图。视图仅仅提供一种访问操作,不做任何拷贝操作。如果你修改了这个新的ByteBuf实例的具体内容,那么对应的源实例也会被修改,如果需要拷贝现有缓冲区的真实副本,请使用copy()或copy(int, int)方法。

读/写操作:有两种类别的读写:

  1. get()和set()操作——从给定的索引开始,并且保持索引不变。
  2. read()和write()操作——从给定的索引开始,并且根据已经访问过的字节数对索引进行访问。

ByteBufHolder接口

ByteBufHolder是netty的高级特性,为缓冲区池化提供了支持。可以通过子类实现ByteBufHolder接口,根据自身需要添加自己需要的数据字段。可以用于自定义缓冲区类型扩展字段。

ByetBuf分配

按需分配

netty通过ByteBufAllocator接口实现了(ByteBuf的)池化。可以通过Channel或者ChannelHandler的ChannelHandlerContext获取一个到ByteBufAllocator的引用。

netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocator(池化)、UnpolledByteBufAlloactor(非池化)。netty默认使用PooledByteBufAllocator,但是也可以通过ChannelConfig修改。

Unpooled缓冲区

在未获得ByteBufAllocator引用的情况下,我们可以使用netty提付的Unpooled工具类来创建未池化的ByteBufAllocator。

ByteBufUtil类

ByteBufUtil类提供了用于操作ByteBuf的静态的辅助方法。hexdump()方法以十六进制的表示形式大于ByteBuf内容。equals(ByteBuf, ByteBuf)方法用来判断两个ByteBuf实例是否相等。

引用计数

netty通过实现ReferenceCounted接口为ButeBuf和ButeBufHolder引入了引用计数技术。当计数为0时,系统将会回收缓冲区,它降低了内存分配的开销。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

刚去面试Spring Cloud 问了我35个问题,全部分享

发表于 2020-06-09

❝
对于目前来说,微服务已经是程序员必会的技能之一了。Spring Cloud 作为一个优质的微服务框架,已经被很多公司使用。以下小编给大家整理35道Spring Cloud 的知识点,或者说是面试题都不冲突。后续还会继续更新⛽️

❞

1.什么是SpringCloud?

Spring Cloud为开发人员提供了工具,以快速构建分布式系统中的一些常见模式(例如,配置管理,服务发现,断路器,智能路由,微代理,控制总线,一次性令牌,全局锁,领导选举,分布式会话,群集状态)。它们可以在任何分布式环境中正常工作,包括开发人员自己的笔记本电脑,裸机数据中心以及Cloud Foundry等托管平台。

2.什么是微服务?

所谓的微服务是SOA架构下的最终产物,该架构的设计目标是为了肢解业务,使得服务能够独立运行。微服务设计原则:

1、各司其职 。

2、服务高可用和可扩展性。

3.SpringCloud有哪些特征?

Spring Cloud专注于为典型的用例和可扩展性机制(包括其他用例)提供良好的开箱即用体验。

  • 分布式/版本化配置
  • 服务注册和发现
  • 路由
  • 服务到服务的调用
  • 负载均衡
  • 断路器
  • 全局锁
  • 领导选举和集群状态
  • 分布式消息传递

4.SpringCloud核心组件?

Eureka : 注册中心

Ribbon :客服端负载均衡

Hystrix : 服务容错处理

Feign: 声明式REST客户端

Zuul : 服务网关

Config : 分布式配置

5.SpringCloud基于什么协议?

HTTP

6.SpringCloud和Dubbo区别?

image-20200429230443620

image-20200429230443620

7.Eureka是什么?

云端服务发现,一个基于 REST 的服务,用于定位服务,以实现云端中间层服务发现和故障转移。

8.服务治理的基础角色?

服务注册中心:提供服务注册与发现的能力。

服务提供者:提供服务的应用,会把自己提供的服务注册到注册中心。

服务消费者:服务的消费者,从注册中心获取服务列表。

9.什么是服务续约?

在注册完服务以后,服务提供者会维护一个心跳来向注册中心证明自己还活着,以此防止被“剔除服务”。

10.什么是服务下线?

当服务实例进行正常关闭时,会发送一个REST请求(我要下线了)给注册中心,收到请求后,将该服务状态设置下线(DOWN),并把这事件传播出去。

11.什么是失效剔除?

当服务非正常下线时,可能服务注册中心没有收到下线请求,注册中心会创建一个定时任务(默认60s)将没有在固定时间(默认90s)内续约的服务剔除掉。

12.什么是自我保护机制?

在运行期间,注册中心会统计心跳失败比例在15分钟之内是否低于85%,如果低于的情况,注册中心会将当前注册实例信息保护起来,不再删除这些实例信息,当网络恢复后,退出自我保护机制。

自我保护机制让服务集群更稳定、健壮。

13.Ribbon是什么?

提供云端负载均衡,有多种负载均衡策略可供选择,可配合服务发现和断路器使用。

14.Ribbon负载均衡的注解是?

@LoadBalanced

15.Ribbon负载均衡策略有哪些?

RandomRule : 随机。

RoundRobinRule : 轮询。

RetryRule : 重试。

WeightedResponseTimeRule : 权重。

ClientConfigEnabledRoundRobinRule : 一般不用,通过继承该策略,默认的choose就实现了线性轮询机制。可以基于它来做扩展。

BestAvailableRule : 通过便利负载均衡器中维护的所有服务实例,会过滤到故障的,并选择并发请求最小的一个。

PredicateBasedRule : 先过滤清单,再轮询。

AvailabilityFilteringRule :继承了父类的先过滤清单,再轮询。调整了算法。

ZoneAvoidanceRule : 该类也是PredicateBasedRule的子类,它可以组合过滤条件。以ZoneAvoidancePredicate为主过滤条件,以AvailabilityPredicate为次过滤条件。

16.什么是服务熔断?

服务熔断的作用类似于我们家用的保险丝,当某服务出现不可用或响应超时的情况时,为了防止整个系统出现雪崩,暂时停止对该服务的调用。

17.什么是服务降级?

服务降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行。

18.什么是Hystrix?

熔断器,容错管理工具,旨在通过熔断机制控制服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。

19.断路器Hystrix的有哪些功能?

  • 通过第三方客户端访问依赖服务出现高延迟或者失败时,为系统提供保护和控制 。
  • 在复杂的分布式系统中防止级联失败(服务雪崩效应) 。
  • 快速失败 (Failfast) 同时能快速恢复。
  • 提供失败回滚 (Fallback) 和优雅的服务降级机制。
  • 提供近实时的监控、 报警和运维控制手段。

20.Hystrix将远程调用封装到?

HystrixCommand 或者 HystrixObservableCommand对象中。

21.启动熔断降级服务的注解?

@EnableHystrix

22.什么是Feign?

Feign是一种声明式、模板化的HTTP客户端。

23.Feign优点?

1.feign采用的是基于接口的注解。
2.feign整合了ribbon,具有负载均衡的能力。
3.整合了Hystrix,具有熔断的能力。

24.什么是Config?

配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储、Git以及Subversion。

25.Config组件中的两个角色?

Config Server : 配置中心服务端。

Config Client : 配置中心客户端。

26.什么是Zuul?

Zuul 是在云平台上提供动态路由,监控,弹性,安全等边缘服务的框架。Zuul 相当于是设备和 Netflix 流应用的 Web 网站后端所有请求的前门。

27.使用Zuul的优点?

  • 方便监控。可以在微服务网管手机监控数据并将其推送到外部系统进行分析。
  • 方便认证。可在网关进行统一认证,然后在讲请求转发到后端服务。
  • 隐藏架构实现细节,提供统一的入口给客户端请求,减少了客户端和每个微服务的交互次数。
  • 可以统一处理切面任务,避免每个微服务自己开发,提升效率。
  • 高可用高伸缩性的服务,避免单点失效。

28.Zuul的核心是?

过滤器。

29.Zuul有几种过滤器类型?分别是?

4种。

pre : 可以在请求被路由之前调用。

适用于身份认证的场景,认证通过后再继续执行下面的流程。

route : 在路由请求时被调用。

适用于灰度发布场景,在将要路由的时候可以做一些自定义的逻辑。

post :在 route 和 error 过滤器之后被调用。

这种过滤器将请求路由到达具体的服务之后执行。适用于需要添加响应头,记录响应日志等应用场景。

error : 处理请求时发生错误时被调用。

在执行过程中发送错误时会进入 error 过滤器,可以用来统一记录错误信息。

30.什么是Sleuth?

日志收集工具包,封装了Dapper和log-based追踪以及Zipkin和HTrace操作,为SpringCloud应用实现了一种分布式追踪解决方案。

31.Sleuth帮助我们做了哪些工作?

  • 可以方便的了解到每个采样的请求耗时,分析出哪些服务调用比较耗时。
  • 对于程序未捕捉的异常,可以在集成Zipkin服务页面上看到。
  • 识别调用比较频繁的服务,从而进行优化。

32.什么是Bus?

事件、消息总线,用于在集群(例如,配置变化事件)中传播状态变化,可与Spring Cloud Config联合实现热部署。

33.eureka比zookeeper的优势在?

A:高可用 C:一致性,P:分区容错性

Zookeeper保证了CP,Eureka保证了AP。

Eureka可以很好的应对因网络故障导致部分节点失去联系的情况,而不会像Zookeeper那样使整个微服务瘫痪。

34.什么是Stream?

数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。

35.你还知道哪些微服务框架?

Dubbo
Motan
Tars
gRPC
Thrift

参考:

  • 《Spring Cloud微服务实战》
  • 《Spring Cloud微服务全栈技术与案例解析》
  • 《Spring Cloud微服务架构开发实战》
  • ​ https://spring.io/projects/spring-cloud
  • ​ https://www.springcloud.cc/
  • ​ 百度百科

❝
持续更新。。。

❞


本文使用 mdnice 排版

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring 异步调用,多线程,一行代码实现 Spring

发表于 2020-06-09

Spring 异步调用,多线程

  • 概述
  • 快速入门
  • 异步回调
  • 异步异常处理
  • 自定义执行器

1、概述


在日常开发中,我们的逻辑都是同步调用,顺序执行。但是在某些情况下我们希望异步调用,将主线程和部分逻辑分开,以达到程序更快速的执行,提升性能。例如,高并发的接口,用户操作日志等。

异步调用,对应的是同步调用。

  • 同步调用:指程序按照 定义顺序 依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;
  • 异步调用:指程序在顺序执行时,不等异步调用返回执行结果,就执行后面的程序。

考虑到异步的可靠性,我们一般会考虑引入消息队列,例如: RabbitMQ、RocketMQ、Kafka 等等。 但是在一些时候,我们不需要如此高的可靠性,可以使用进程内的队列或线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码public static void main(String[] args) {
// 创建线程池。这里只是临时测试,开发规范...
ExecutorService executor = Executors.newFixedThreadPool(10);

// 提交任务到线程池中执行。
executor.submit(new Runnable() {

@Override
public void run() {
System.out.println("听说我被异步调用了");
}

});
}

在进程内的队列或者线程池,相对不可靠的原因是,队列和线程池中的任务仅仅存储在内存中,如何JVM进程被异常关闭,将会导致丢失,未被执行。

而分布式消息队列,异步调用会以一个消息的形式,存储在消息服务器上,所以即使JVM进程被异常中断,消息依然在消息服务队列的服务器上

所以使用进程内的队列或者线程池来实现异步调用的话,一定要尽可能的保证JVM进程的优雅关闭,保证他们在关闭前被执行完。

在 Spring Framework 的 Spring Task 模块,提供了 @Async 注解,可以添加在方法上,自动实现该方法的异步调用

简单来说,我们可以像使用 @Transactional声明式事务,使用SpringTask提供的@Async注解,声明式异步。而在实现原理上,也是基于 Spring AOP 拦截,实现异步提交该操作到线程池中,达到异步调用的目的。

2、快速入门

2.1 引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-29-async-demo</artifactId>

<dependencies>
<!-- 引入 Spring Boot 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>

因为 Spring Task 是 Spring Framework 的模块,所以在我们引入 spring-boot-web 依赖后,无需特别引入它。

2.2 Application


创建Application类,添加@EnableAsync 开启 @Async 的支持

1
2
3
4
5
6
7
8
复制代码@SpringBootApplication
@EnableAsync // 开启 @Async 的支持
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
  • 在类上添加 @EnableAsync 注解,启用异步功能。

2.3 DemoService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
复制代码package cn.iocoder.springboot.lab29.asynctask.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Service
public class DemoService {

private Logger logger = LoggerFactory.getLogger(getClass());

public Integer execute01() {
logger.info("[execute01]");
sleep(10);
return 1;
}

public Integer execute02() {
logger.info("[execute02]");
sleep(5);
return 2;
}

private static void sleep(int seconds) {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Async
public Integer zhaoDaoNvPengYou(Integer a, Integer b) {
throw new RuntimeException("程序员不需要女朋友");
}

}
  • 定义 execute01和 execute02方法,分别模拟sleep 10秒和5秒。
  • 同时在方法中,使用logger打印日志,方便我们看到每个方法的执行时间,和执行的线程

2.4 同步调用测试

编写DemoServiceTest测试类,添加#task01()方法,同步调用上述方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private DemoService demoService;

@Test
public void task01() {
long now = System.currentTimeMillis();
logger.info("[task01][开始执行]");

demoService.execute01();
demoService.execute02();

logger.info("[task01][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}
}

运行单元测试,打印日志如下:

1
2
3
4
复制代码2020-06-02 09:16:03.391  INFO 3108 --- [      main] c.i.s.l.a.service.DemoServiceTest        : [task01][开始执行]
2020-06-02 09:16:03.402 INFO 3108 --- [ main] c.i.s.l.asynctask.service.DemoService : [execute01]
2020-06-02 09:16:13.403 INFO 3108 --- [ main] c.i.s.l.asynctask.service.DemoService : [execute02]
2020-06-02 09:16:18.403 INFO 3108 --- [ main] c.i.s.l.a.service.DemoServiceTest : [task01][结束执行,消耗时长 15012 毫秒]
  • 两个方法都按顺序执行,执行时间15秒。
  • 都在主线程执行。

2.5 异步调用测试


修改DemoServiceTest ,增加 execute01Async()和execute02Async()异步调用方法,代码:

1
2
3
4
5
6
7
8
9
复制代码	@Async
public Integer execute01Async() {
return this.execute01();
}

@Async
public Integer execute02Async() {
return this.execute02();
}
  • 在execute01Async() 和 execute01Async()上,添加@Async实现异步调用

修改DemoServiceTest类, 编写 #task02() 方法,异步调用上述的两个方法。

1
2
3
4
5
6
7
8
9
10
复制代码	@Test
public void task02() {
long now = System.currentTimeMillis();
logger.info("[task02][开始执行]");

demoService.execute01Async();
demoService.execute02Async();

logger.info("[task02][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}

打印日志:

1
2
3
复制代码2020-06-02 10:57:41.643  INFO 14416 --- [main] c.i.s.l.a.service.DemoServiceTest        : [task02][开始执行]
2020-06-02 10:57:41.675 INFO 14416 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-06-02 10:57:41.682 INFO 14416 --- [main] c.i.s.l.a.service.DemoServiceTest : [task02][结束执行,消耗时长 39 毫秒]
  • DemoService 的两个方法,异步执行,所以主线程只消耗 39毫秒左右。注意,实际这两个方法,并没有执行完成。
  • DemoService 的两个方法,都在异步线程池中执行。

2.6 等待异步调用完成测试

在上面的**【2.5 异步调用测试】**异步调用中,两个方法只是异步调用,方法没有执行完。在一些业务场景中,我们达到异步调用效果,同时主线程有返回结果,就需要主线程阻塞等待异步调用的结果。

修改DemoService,添加execute01AsyncWithFuture()和execute01AsyncWithFuture()异步调用,并返回 Future 对象 。代码:

1
2
3
4
5
6
7
8
9
复制代码	@Async
public Future<Integer> execute01AsyncWithFuture() {
return AsyncResult.forValue(this.execute01());
}

@Async
public Future<Integer> execute02AsyncWithFuture() {
return AsyncResult.forValue(this.execute02());
}
  • 在这里两个异步方法中,添加了AsyncResult.forValue(this.execute02());,返回带有执行结果的Future对象

修改DemoServiceTest类, 编写 #task02() 方法,异步调用上述的两个方法,并阻塞线程等待异步调用返回结果

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码	@Test
public void task03() throws ExecutionException, InterruptedException {
long now = System.currentTimeMillis();
logger.info("[task03][开始执行]");

// 执行任务
Future<Integer> execute01Result = demoService.execute01AsyncWithFuture();
Future<Integer> execute02Result = demoService.execute02AsyncWithFuture();
// 阻塞等待结果
execute01Result.get();
execute02Result.get();

logger.info("[task03][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}
  • 异步调用两个方法,并返回对应Future对象。这两个的异步调用逻辑,可以并行执行。
  • Future对象的get()方法,效果:阻塞线程等待返回结果。

打印日志:

1
2
3
4
5
复制代码2020-06-02 13:56:43.955  INFO 7828 --- [ main] c.i.s.l.a.service.DemoServiceTest        : [task03][开始执行]
2020-06-02 13:56:43.987 INFO 7828 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-06-02 13:56:44.008 INFO 7828 --- [ task-1] c.i.s.l.asynctask.service.DemoService : [execute01]
2020-06-02 13:56:44.008 INFO 7828 --- [ task-2] c.i.s.l.asynctask.service.DemoService : [execute02]
2020-06-02 13:56:54.008 INFO 7828 --- [ main] c.i.s.l.a.service.DemoServiceTest : [task03][结束执行,消耗时长 10053 毫秒]
  • 两个异步调用方法,分别由线程池 task-1和task-2 同时执行。 因为主线程阻塞等待执行结果 ,执行时间10秒,当同时有多个异步调用,线程阻塞等待,执行时间由消耗最长的异步调用逻辑所决定。

2.7 应用配置文件

在application中,添加spring Task配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码spring:
task:
# Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
execution:
thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
pool: # 线程池相关
core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
shutdown:
await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • Spring 本身依赖了Spring Task
  • 在spring.task.execution配置项, Spring Task 调度任务的配置 ,对应TaskExecutionProperties配置类
  • Spring Boot TaskExecutionAutoConfiguration 自动化配置类, 实现了Spring Task 自动配置,创建了ThreadPoolTaskExecutor基于线程池的任务执行器,实际上ThreadPoolTaskExecutor就是ThreadPoolExecutor的分装,主要增加执行任务,并返回 ListenableFuture 对象功能。

之前说的异步的可靠性,要优雅的关闭进程。spring.task.execution.shutdown配置关闭,是为了实现Spring Task的优雅关闭。异步任务在执行过程中,如果应用开始关闭,异步任务需要使用的Bean被销毁,例如:需要访问数据库连接池,这时候异步任务还在执行中,一旦需要访问数据库,但是没有对应的Bean将会导致报错。

  • 通过配置await-termination: true,实现在应用关闭时,等待异步任务执行完成。这样在应用关闭时,Spring 会等待 ThreadPoolTaskExecutor执行完任务,再销毁Bean。
  • 应用关闭时,在某些业务场景下我们不可能让Spring一直等待,异步任务的完成。通过配置await-termination-period: 60,设置Spring最大等待时间,时间一到将不再等待异步任务完成。

3、异步回调

业务场景中,执行完异步任务,可能需回调。下面介绍异步执行完成后,实现自定义回调。

3.1、AsyncResult 源码解释

在 2.6 等待异步调用完成 中,我们看到的 AsyncResult类 表示异步结果。返回结果分为两种情况:

  • 执行成功时,调用AsyncResult#forValue(V value) 静态方法,返回成功的 ListenableFuture对象,

源码:

1
2
3
4
5
6
7
8
9
复制代码	/**
* Create a new async result which exposes the given value from {@link Future#get()}.
* @param value the value to expose
* @since 4.2
* @see Future#get()
*/
public static <V> ListenableFuture<V> forValue(V value) {
return new AsyncResult<>(value, null);
}
  • 执行异常时,调用 AsyncResult#forExecutionException(Throwable ex) 静态方法,返回异常的 ListenableFuture 对象。源码:
1
2
3
4
5
6
7
8
9
10
11
复制代码	/**
* Create a new async result which exposes the given exception as an
* {@link ExecutionException} from {@link Future#get()}.
* @param ex the exception to expose (either an pre-built {@link ExecutionException}
* or a cause to be wrapped in an {@link ExecutionException})
* @since 4.2
* @see ExecutionException
*/
public static <V> ListenableFuture<V> forExecutionException(Throwable ex) {
return new AsyncResult<>(null, ex);
}

AsyncResult 同时也实现了 ListenableFuture接口,提供异步执行结果回调处理。

1
复制代码public class AsyncResult<V> implements ListenableFuture<V>

ListenableFuture接口,源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码public interface ListenableFuture<T> extends Future<T> {

// 添加回调方法,统一处理成功和异常的情况。
void addCallback(ListenableFutureCallback<? super T> callback);

// 添加成功和失败的回调方法,分别处理成功和异常的情况。
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);


// 将 ListenableFuture 转换成 JDK8 提供的 CompletableFuture 。
// 这样,后续我们可以使用 ListenableFuture 来设置回调
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
addCallback(completable::complete, completable::completeExceptionally);
return completable;
}

}

ListenableFuture继承了Future,所以AsyncResult 也实现了Future的接口,源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码public interface Future<V> {

// 如果任务还没开始,执行 cancel(...) 方法将返回 false;
// 如果任务已经启动,执行 cancel(true) 方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回 true ;
// 当任务已经启动,执行 cancel(false) 方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回 false ;
// 当任务已经完成,执行 cancel(...) 方法将返回 false 。
// mayInterruptRunning 参数表示是否中断执行中的线程。
boolean cancel(boolean mayInterruptIfRunning);

// 如果任务完成前被取消,则返回 true 。
boolean isCancelled();

// 如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回 true 。
boolean isDone();

// 获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
V get() throws InterruptedException, ExecutionException;

// 获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的 timeout 时间,该方法将抛出异常。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

AsyncResult 中对addCallback(...)方法回调的实现,源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码	@Override
public void addCallback(ListenableFutureCallback<? super V> callback) {
addCallback(callback, callback);
}

@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
if (this.executionException != null) { // 《1》
failureCallback.onFailure(exposedException(this.executionException));
}
else { // 《2》
successCallback.onSuccess(this.value);
}
}
catch (Throwable ex) { // 《3》
// Ignore
}
}

// 从 ExecutionException 中,获得原始异常。
private static Throwable exposedException(Throwable original) {
if (original instanceof ExecutionException) {
Throwable cause = original.getCause();
if (cause != null) {
return cause;
}
}
return original;
}
  • 从 ListenableFutureCallback 知道 ,ListenableFutureCallback 接口同时继承了 SuccessCallback、FailureCallback接口
1
复制代码public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback
  • 《1》,如果是异常处理结果调用 failureCallback回调
  • 《2》,如果是成功处理结果调用successCallback回调
  • 《3》,如果回调逻辑发生异常,直接忽略。假设多个回调,其中一个出现议程,不会影响其他的回调。

实际上,AsyncResult 是作为异步执行的结果。既然是结果,执行就已经完成。所以,在我们调用 #addCallback(...) 接口方法来添加回调时,必然直接使用回调处理执行的结果。

AsyncResult 对 Future 定义的所有方法,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
复制代码// AsyncResult.java

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false; // 因为是 AsyncResult 是执行结果,所以直接返回 false 表示取消失败。
}

@Override
public boolean isCancelled() {
return false; // 因为是 AsyncResult 是执行结果,所以直接返回 false 表示未取消。
}

@Override
public boolean isDone() {
return true; // 因为是 AsyncResult 是执行结果,所以直接返回 true 表示已完成。
}

@Override
@Nullable
public V get() throws ExecutionException {
// 如果发生异常,则抛出该异常。
if (this.executionException != null) {
throw (this.executionException instanceof ExecutionException ?
(ExecutionException) this.executionException :
new ExecutionException(this.executionException));
}
// 如果执行成功,则返回该 value 结果
return this.value;
}

@Override
@Nullable
public V get(long timeout, TimeUnit unit) throws ExecutionException {
return get();
}

3.2 ListenableFutureTask

在我们调用使用 @Async 注解的方法时,如果方法返回的类型是 ListenableFuture 的情况下,实际方法返回的是 ListenableFutureTask 对象。

ListenableFutureTask 类,也实现 ListenableFuture 接口,继承 FutureTask 类,ListenableFuture 的 FutureTask 实现类。

ListenableFutureTask 对 ListenableFuture 定义的 #addCallback(...) 方法,实现源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();

@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.callbacks.addCallback(callback);
}

@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.callbacks.addSuccessCallback(successCallback);
this.callbacks.addFailureCallback(failureCallback);
}
  • 可以看到在ListenableFutureTask中,暂存回调到ListenableFutureCallbackRegistry中

ListenableFutureTask 对 FutureTask 已实现的 #done() 方法,进行重写。实现源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码@Override
protected void done() {
Throwable cause;
try {
// 获得执行结果
T result = get();
// 执行成功,执行成功的回调
this.callbacks.success(result);
return;
}
catch (InterruptedException ex) { // 如果有中断异常 InterruptedException异常,则打断当前线程,直接返回
Thread.currentThread().interrupt();
return;
}
catch (ExecutionException ex) { // 如果有 ExecutionException 异常,获取真实异常,并设置到cause中
cause = ex.getCause();
if (cause == null) {
cause = ex;
}
}
catch (Throwable ex) {
cause = ex; // 设置异常到 cause 中
}
// 执行异常,执行异常的回调
this.callbacks.failure(cause);
}

3.3 具体示例

修改 DemoService 的代码,增加 #execute02() 的异步调用,并返回 ListenableFuture 对象。代码如下:

1
2
3
4
5
6
7
8
9
复制代码@Async
public ListenableFuture<Integer> execute01AsyncWithListenableFuture() {
try {
//int i = 1 / 0;
return AsyncResult.forValue(this.execute02());
} catch (Throwable ex) {
return AsyncResult.forExecutionException(ex);
}
}
  • 根据执行的结果,包装出成功还是异常的 AsyncResult 对象。

DemoServiceTest 测试类,编写 #task04() 方法,异步调用上述的方法,在塞等待执行完成的同时,添加相应的回调 Callback 方法。代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
复制代码@Test
public void task04() throws ExecutionException, InterruptedException {
long now = System.currentTimeMillis();
logger.info("[task04][开始执行]");

// <1> 执行任务
ListenableFuture<Integer> execute01Result = demoService.execute01AsyncWithListenableFuture();
logger.info("[task04][execute01Result 的类型是:({})]",execute01Result.getClass().getSimpleName());
execute01Result.addCallback(new SuccessCallback<Integer>() { // <2.1> 增加成功的回调

@Override
public void onSuccess(Integer result) {
logger.info("[onSuccess][result: {}]", result);
}

}, new FailureCallback() { // <2.1> 增加失败的回调

@Override
public void onFailure(Throwable ex) {
logger.info("[onFailure][发生异常]", ex);
}

});
execute01Result.addCallback(new ListenableFutureCallback<Integer>() { // <2.2> 增加成功和失败的统一回调

@Override
public void onSuccess(Integer result) {
logger.info("[onSuccess][result: {}]", result);
}

@Override
public void onFailure(Throwable ex) {
logger.info("[onFailure][发生异常]", ex);
}

});
// <3> 阻塞等待结果
execute01Result.get();

logger.info("[task04][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}
  • <1> 处,调用 DemoService#execute01AsyncWithListenableFuture() 方法,异步调用该方法,并返回 ListenableFutureTask 对象。这里,我们看下打印的日志。
1
复制代码2020-06-08 14:13:16.738  INFO 5060 --- [  main] c.i.s.l.a.service.DemoServiceTest : [task04][execute01Result 的类型是:(ListenableFutureTask)]
  • <2.1> 处,增加成功的回调和失败的回调。
  • <2.2> 处,增加成功和失败的统一回调。
  • <3> 处,阻塞等待结果。执行完成后,我们会看到回调被执行,打印日志如下:
1
2
3
复制代码2020-06-08 14:13:21.752  INFO 5060 --- [   main] c.i.s.l.a.service.DemoServiceTest   : [task04][结束执行,消耗时长 5057 毫秒]
2020-06-08 14:13:21.752 INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest : [onSuccess][result: 2]
2020-06-08 14:13:21.752 INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest : [onSuccess][result: 2]
  1. 异步异常处理器

通过实现 AsyncUncaughtExceptionHandler 接口,达到对异步调用的异常的统一处理。

创建 GlobalAsyncExceptionHandler 类,全局统一的异步调用异常的处理器。代码:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码@Component
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error("[handleUncaughtException][method({}) params({}) 发生异常]",
method, params, ex);
}

}
  • 类上,我们添加了 @Component 注解,考虑到胖友可能会注入一些 Spring Bean 到属性中。
  • 实现 #handleUncaughtException(Throwable ex, Method method, Object... params) 方法,打印异常日志。

注意,AsyncUncaughtExceptionHandler 只能拦截返回类型非 Future 的异步调用方法。通过看 AsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object... params) 的源码,可以很容易得到这个结论,代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码// AsyncExecutionAspectSupport.java

protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
// 重点!!!如果返回类型是 Future ,则直接抛出该异常。
if (Future.class.isAssignableFrom(method.getReturnType())) {
ReflectionUtils.rethrowException(ex);
} else {
// 否则,交给 AsyncUncaughtExceptionHandler 来处理。
// Could not transmit the exception to the caller with default executor
try {
this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
} catch (Throwable ex2) {
logger.warn("Exception handler for async method '" + method.toGenericString() +
"' threw unexpected exception itself", ex2);
}
}
}
  • 对了,AsyncExecutionAspectSupport 是 AsyncExecutionInterceptor 的父类哟。所以哟,返回类型为 Future 的异步调用方法,需要通过「3. 异步回调」来处理。

4.2 AsyncConfig


创建 AsyncConfig 类,配置异常处理器。代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig implements AsyncConfigurer {

@Autowired
private GlobalAsyncExceptionHandler exceptionHandler;

@Override
public Executor getAsyncExecutor() {
return null;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return exceptionHandler;
}

}
  • 在类上添加 @EnableAsync 注解,启用异步功能。这样「2. Application」 的 @EnableAsync 注解,也就可以去掉了。
  • 实现 AsyncConfigurer 接口,实现异步相关的全局配置。 此时此刻,胖友有没想到 SpringMVC 的 WebMvcConfigurer 接口。
  • 实现 #getAsyncUncaughtExceptionHandler() 方法,返回我们定义的 GlobalAsyncExceptionHandler 对象。
  • 实现 #getAsyncExecutor() 方法,返回 Spring Task 异步任务的默认执行器。这里,我们返回了 null ,并未定义默认执行器。所以最终会使用 TaskExecutionAutoConfiguration 自动化配置类创建出来的 ThreadPoolTaskExecutor 任务执行器,作为默认执行器。

4.3 DemoService

DemoService 类,增加 #zhaoDaoNvPengYou(...) 的异步调用。代码如下:

1
2
3
4
复制代码@Async
public Integer zhaoDaoNvPengYou(Integer a, Integer b) {
throw new RuntimeException("异步全局异常");
}

4.4 简单测试

1
2
3
4
5
6
7
复制代码 @Test
public void testZhaoDaoNvPengYou() throws InterruptedException {
demoService.zhaoDaoNvPengYou(1, 2);

// sleep 1 秒,保证异步调用的执行
Thread.sleep(1000);
}

运行单元测试,执行日志如下:

1
2
3
复制代码2020-06-08 15:26:35.120 ERROR 11388 --- [         task-1] .i.s.l.a.c.a.GlobalAsyncExceptionHandler : [handleUncaughtException][method(public java.lang.Integer cn.iocoder.springboot.lab29.asynctask.service.DemoService.zhaoDaoNvPengYou(java.lang.Integer,java.lang.Integer)) params([1, 2]) 发生异常]

java.lang.RuntimeException: 异步全局异常
  1. 自定义执行器

在 上面 中,我们使用 Spring Boot TaskExecutionAutoConfiguration 自动化配置类,实现自动配置 ThreadPoolTaskExecutor 任务执行器。

本小节,我们希望两个自定义 ThreadPoolTaskExecutor 任务执行器,实现不同方法,分别使用这两个 ThreadPoolTaskExecutor 任务执行器。

5.1 引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-29-async-demo</artifactId>

<dependencies>
<!-- 引入 Spring Boot 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
  • 和 上面引入依赖 一致。

5.2 应用配置文件


在 application.yml 中,添加 Spring Task 定时任务的配置,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码spring:
task:
# Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
execution-one:
thread-name-prefix: task-one- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
pool: # 线程池相关
core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
shutdown:
await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
# Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
execution-two:
thread-name-prefix: task-two- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
pool: # 线程池相关
core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
shutdown:
await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • 在 spring.task 配置项下,我们新增了 execution-one 和 execution-two 两个执行器的配置。在格式上,我们保持和在「2.7 应用配置文件」看到的 spring.task.exeuction 一致,方便我们后续复用 TaskExecutionProperties 属性配置类来映射。

5.3 AsyncConfig


创建 AsyncConfig 类,配置两个执行器。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
复制代码@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig
{

public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one";
public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two";

@Configuration
public static class ExecutorOneConfiguration
{

@Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties")
@Primary
@ConfigurationProperties(prefix = "spring.task.execution-one")
// 读取 spring.task.execution-one 配置到 TaskExecutionProperties 对象
public TaskExecutionProperties taskExecutionProperties()
{
return new TaskExecutionProperties();
}

@Bean(name = EXECUTOR_ONE_BEAN_NAME)
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
{
// 创建 TaskExecutorBuilder 对象
TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
// 创建 ThreadPoolTaskExecutor 对象
return builder.build();
}

}

@Configuration
public static class ExecutorTwoConfiguration
{

@Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties")
@ConfigurationProperties(prefix = "spring.task.execution-two")
// 读取 spring.task.execution-two 配置到 TaskExecutionProperties 对象
public TaskExecutionProperties taskExecutionProperties()
{
return new TaskExecutionProperties();
}

@Bean(name = EXECUTOR_TWO_BEAN_NAME)
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
{
// 创建 TaskExecutorBuilder 对象
TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
// 创建 ThreadPoolTaskExecutor 对象
return builder.build();
}
}

private static TaskExecutorBuilder createTskExecutorBuilder(TaskExecutionProperties properties)
{
// Pool 属性
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
// Shutdown 属性
TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
// 其它基本属性
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
// builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
// builder = builder.taskDecorator(taskDecorator.getIfUnique());
return builder;
}

}
  • 参考 Spring Boot TaskExecutionAutoConfiguration 自动化配置类,我们创建了 ExecutorOneConfiguration 和 ExecutorTwoConfiguration 配置类,来分别创建 Bean 名字为 executor-one 和 executor-two 两个执行器。

5.4 DemoService


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码@Service
public class DemoService
{

private Logger logger = LoggerFactory.getLogger(getClass());

@Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME)
public Integer execute01()
{
logger.info("[execute01]");
return 1;
}

@Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME)
public Integer execute02()
{
logger.info("[execute02]");
return 2;
}

}
  • 在 @Async 注解上,我们设置了其使用的执行器的 Bean 名字。

5.5 简单测试


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest
{

@Autowired
private DemoService demoService;

@Test
public void testExecute() throws InterruptedException
{
demoService.execute01();
demoService.execute02();

// sleep 1 秒,保证异步调用的执行
Thread.sleep(1000);
}

}

运行单元测试,执行日志如下:

1
2
复制代码2020-06-08 15:38:28.846  INFO 12020 --- [     task-one-1] c.i.s.l.asynctask.service.DemoService    : [execute01]
2020-06-08 15:38:28.846 INFO 12020 --- [ task-two-1] c.i.s.l.asynctask.service.DemoService : [execute02]
  • 从日志中,我们可以看到,#execute01() 方法在 executor-one 执行器中执行,而 #execute02() 方法在 executor-two 执行器中执行。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用 Swagger 测试接口,怎么在请求头中携带 Token

发表于 2020-06-09

来自一个小伙伴在微信上的提问:

看到这个问题,松哥忽然想到我自己之前写过 Spring Boot+Swagger 的用法:

  • SpringBoot 整合 Swagger2

也写过 OAuth2 + Jwt 的用法:

  • 想让 OAuth2 和 JWT 在一起愉快玩耍?请看松哥的表演

但是还没有将这两个结合在一起写过,所以小伙伴们对此有了疑问,想一想这还是一个非常常见的问题,因为现在使用令牌登录的场景越来越多,在这种情况下,如果使用 Swagger 来测试接口,要怎么在请求头中携带 Token 呢?今天松哥就来和大家聊一聊。

1.项目规划

如果小伙伴们没有看过松哥之前发的 OAuth2 系列文章,建议一定先看下(公众号江南一点雨后台回复 OAuth2 获取),再来看本文内容,否则接下来的内容可能会犯迷糊。

这里松哥搭建一个 OAuth2+JWT 的环境来做演示。一共搭建两个服务:

服务名 端口 备注
auth-server 8080 授权服务器
user-server 8081 资源服务器

我稍微解释一下:

  • auth-server 就是我的资源服务器,用来颁发 JWT 令牌。
  • user-server 则是资源服务器,访问 user-server 上的资源,都需要携带令牌才能访问。
  • swagger 则用来给 user-server 上的接口生成文档。

OK,这是我们项目的一个大致规划。

2.环境搭建

接下来我们来搭建 OAuth2 测试环境。

2.1 授权服务器搭建

首先我们搭建一个名为 auth-server 的授权服务,搭建的时候,选择如下三个依赖:

  • Web
  • Spring Cloud Security
  • Spirng Cloud OAuth2

项目创建完成后,首先提供一个 Spring Security 的基本配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码@Configuration
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Bean
PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}

@Override
@Bean
public AuthenticationManager authenticationManagerBean() throws Exception {
return super.authenticationManagerBean();
}

@Override
protected void configure(AuthenticationManagerBuilder auth) throws Exception {
auth.inMemoryAuthentication()
.withUser("sang")
.password(passwordEncoder().encode("123"))
.roles("admin")
.and()
.withUser("javaboy")
.password(passwordEncoder().encode("123"))
.roles("user");
}

@Override
protected void configure(HttpSecurity http) throws Exception {
http.csrf().disable().formLogin();
}
}

在这段代码中,为了代码简洁,我就不把 Spring Security 用户存到数据库中去了,直接存在内存中。

这里我创建了一个名为 sang 的用户,密码是 123,角色是 admin。同时我还配置了一个表单登录。

这段配置的目的,实际上就是配置用户。例如你想用微信登录第三方网站,在这个过程中,你得先登录微信,登录微信就要你的用户名/密码信息,那么我们在这里配置的,其实就是用户的用户名/密码/角色信息。

需要注意的是,在当前案例中,我将采用 OAuth2 中的 password 模式进行登录,因此这里还需要明确的提供一个 AuthenticationManager 的 Bean。

基本的用户信息配置完成后,接下来我们来配置授权服务器。

首先来配置 TokenStore:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码@Configuration
public class AccessTokenConfig {
@Bean
TokenStore tokenStore() {
return new JwtTokenStore(jwtAccessTokenConverter());
}

@Bean
JwtAccessTokenConverter jwtAccessTokenConverter() {
JwtAccessTokenConverter converter = new JwtAccessTokenConverter();
converter.setSigningKey("javaboy");
return converter;
}
}
  1. TokenStore 我们使用 JwtTokenStore 这个实例。使用了 JWT,access_token 实际上就不用存储了(无状态登录,服务端不需要保存信息),因为用户的所有信息都在 jwt 里边,所以这里配置的 JwtTokenStore 本质上并不是做存储。
  2. 另外我们还提供了一个 JwtAccessTokenConverter,这个 JwtAccessTokenConverter 可以实现将用户信息和 JWT 进行转换(将用户信息转为 jwt 字符串,或者从 jwt 字符串提取出用户信息)。
  3. 另外,在 JWT 字符串生成的时候,我们需要一个签名,这个签名需要自己保存好。

接下来对授权服务器进行详细配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
复制代码@EnableAuthorizationServer
@Configuration
public class AuthorizationServer extends AuthorizationServerConfigurerAdapter {
@Autowired
TokenStore tokenStore;
@Autowired
ClientDetailsService clientDetailsService;
@Autowired
AuthenticationManager authenticationManager;
@Autowired
PasswordEncoder passwordEncoder;
@Autowired
JwtAccessTokenConverter jwtAccessTokenConverter;

@Bean
AuthorizationServerTokenServices tokenServices() {
DefaultTokenServices services = new DefaultTokenServices();
services.setClientDetailsService(clientDetailsService);
services.setSupportRefreshToken(true);
services.setTokenStore(tokenStore);
services.setAccessTokenValiditySeconds(60 * 60 * 24 * 2);
services.setRefreshTokenValiditySeconds(60 * 60 * 24 * 7);
TokenEnhancerChain tokenEnhancerChain = new TokenEnhancerChain();
tokenEnhancerChain.setTokenEnhancers(Arrays.asList(jwtAccessTokenConverter));
services.setTokenEnhancer(tokenEnhancerChain);
return services;
}

@Override
public void configure(AuthorizationServerSecurityConfigurer security) throws Exception {
security.allowFormAuthenticationForClients();
}

@Override
public void configure(ClientDetailsServiceConfigurer clients) throws Exception {
clients.inMemory()
.withClient("javaboy")
.secret(passwordEncoder.encode("123"))
.resourceIds("res1")
.authorizedGrantTypes("password", "refresh_token")
.scopes("all")
.redirectUris("http://localhost:8082/index.html");
}

@Override
public void configure(AuthorizationServerEndpointsConfigurer endpoints) throws Exception {
endpoints
.authenticationManager(authenticationManager)
.tokenServices(tokenServices());
}
}

这段代码有点长,我来给大家挨个解释:

  1. 创建 AuthorizationServer 类继承自 AuthorizationServerConfigurerAdapter,来对授权服务器做进一步的详细配置,AuthorizationServer 类记得加上 @EnableAuthorizationServer 注解,表示开启授权服务器的自动化配置。
  2. 在 AuthorizationServer 类中,我们其实主要重写三个 configure 方法。
  3. AuthorizationServerSecurityConfigurer 用来配置令牌端点的安全约束,也就是这个端点谁能访问,谁不能访问。
  4. ClientDetailsServiceConfigurer 用来配置客户端的详细信息,在之前文章中,松哥和大家讲过,授权服务器要做两方面的检验,一方面是校验客户端,另一方面则是校验用户,校验用户,我们前面已经配置了,这里就是配置校验客户端。客户端的信息我们可以存在数据库中,这其实也是比较容易的,和用户信息存到数据库中类似,但是这里为了简化代码,我还是将客户端信息存在内存中,这里我们分别配置了客户端的 id,secret、资源 id、授权类型、授权范围以及重定向 uri。授权类型我在之前文章中和大家一共讲了四种,四种之中不包含 refresh_token 这种类型,但是在实际操作中,refresh_token 也被算作一种。
  5. AuthorizationServerEndpointsConfigurer 这里用来配置令牌的访问端点和令牌服务。
  6. tokenServices 这个 Bean 主要用来配置 Token 的一些基本信息,例如 Token 是否支持刷新、Token 的存储位置、Token 的有效期以及刷新 Token 的有效期等等。Token 有效期这个好理解,刷新 Token 的有效期我说一下,当 Token 快要过期的时候,我们需要获取一个新的 Token,在获取新的 Token 时候,需要有一个凭证信息,这个凭证信息不是旧的 Token,而是另外一个 refresh_token,这个 refresh_token 也是有有效期的。

好了,如此之后,我们的授权服务器就算是配置完成了,接下来我们启动授权服务器。

如果小伙伴们对于上面的配置感到迷糊,可以在公众号后台回复 OAuth2,先系统的学习一下松哥的 OAuth2 教程。

2.2 资源服务器搭建

接下来我们搭建一个资源服务器。大家网上看到的例子,资源服务器大多都是和授权服务器放在一起的,如果项目比较小的话,这样做是没问题的,但是如果是一个大项目,这种做法就不合适了。

资源服务器就是用来存放用户的资源,例如你在微信上的图像、openid 等信息,用户从授权服务器上拿到 access_token 之后,接下来就可以通过 access_token 来资源服务器请求数据。

我们创建一个新的 Spring Boot 项目,叫做 user-server ,作为我们的资源服务器,创建时,添加如下依赖:

项目创建成功之后,先把前面的 AccessTokenConfig 拷贝到资源服务器上,然后添加如下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码@Configuration
@EnableResourceServer
public class ResourceServerConfig extends ResourceServerConfigurerAdapter {
@Autowired
TokenStore tokenStore;

@Override
public void configure(ResourceServerSecurityConfigurer resources) throws Exception {
resources.resourceId("res1").tokenStore(tokenStore);
}

@Override
public void configure(HttpSecurity http) throws Exception {
http.authorizeRequests()
.antMatchers("/admin/**").hasRole("admin")
.anyRequest().authenticated();
}
}

这段配置代码很简单,我简单的说一下:

  1. 首先在 configure 方法中配置资源 ID 和 TokenStore,这里配置好之后,会自动调用 JwtAccessTokenConverter 将 jwt 解析出来,jwt 里边就包含了用户的基本信息,所以就不用远程校验 access_token 了。
  2. 最后配置一下资源的拦截规则,这就是 Spring Security 中的基本写法,我就不再赘述。

接下来我们再来配置两个测试接口:

1
2
3
4
5
6
7
8
9
10
11
复制代码@RestController
public class HelloController {
@GetMapping("/hello")
public String hello() {
return "hello";
}
@GetMapping("/admin/hello")
public String admin() {
return "admin";
}
}

如此之后,我们的资源服务器就算配置成功了。

2.3 测试

分别启动授权服务器和资源服务器,先访问授权服务器获取 access_token:

再利用拿到的 access_token 去访问资源服务器:

OK,测试没问题。

3.整合 Swagger

接下来,我们在 user-server 中加入 swagger 功能,首先我们加入 swagger 依赖:

1
2
3
4
5
6
7
8
9
10
复制代码 <dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>

这里加入的依赖有两个,一个用来生成接口数据,另一个 swagger-ui 用来做数据展示。

3.1 认证方式一

请求头加参数,这里给大家介绍两种,先来看第一种。

先配置一个 Docket 实例,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码@Configuration
@EnableSwagger2
public class Swagger2Config {
@Bean
Docket docket() {
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.basePackage("org.javaboy.oauth2.res.controller"))
.paths(PathSelectors.any())
.build()
.securityContexts(Arrays.asList(securityContexts()))
.securitySchemes(Arrays.asList(securitySchemes()))
.apiInfo(new ApiInfoBuilder()
.description("接口文档的描述信息")
.title("微人事项目接口文档")
.contact(new Contact("javaboy","http://www.javaboy.org","wangsong0210@gmail.com"))
.version("v1.0")
.license("Apache2.0")
.build());
}
private SecurityScheme securitySchemes() {
return new ApiKey("Authorization", "Authorization", "header");
}

private SecurityContext securityContexts() {
return SecurityContext.builder()
.securityReferences(defaultAuth())
.forPaths(PathSelectors.any())
.build();
}

private List<SecurityReference> defaultAuth() {
AuthorizationScope authorizationScope = new AuthorizationScope("xxx", "描述信息");
AuthorizationScope[] authorizationScopes = new AuthorizationScope[1];
authorizationScopes[0] = authorizationScope;
return Arrays.asList(new SecurityReference("Authorization", authorizationScopes));
}
}

这里的配置稍微有点长,我来给大家解释下:

  • 首先通过 @EnableSwagger2 注解启用 Swagger2。
  • 配置一个 Docket Bean,这个 Bean 中,配置映射路径和要扫描的接口的位置。
  • 在 apiInfo 中,主要配置一下 Swagger2 文档网站的信息,例如网站的 title,网站的描述,联系人的信息,使用的协议等等。
  • 通过 securitySchemes 来配置全局参数,这里的配置是一个名为 Authorization 的请求头(OAuth2 中需要携带的请求头)。
  • securityContexts 则用来配置有哪些请求需要携带 Token,这里我们配置了所有请求。

配置完成后,我们还需要给 swagger-ui 放行,否则 swagger-ui 相关的静态资源会被 Spring Security 拦截下来:

1
2
3
4
5
6
7
8
9
10
11
复制代码@Configuration
public class SecurityConfig extends WebSecurityConfigurerAdapter {

@Override
public void configure(WebSecurity web) throws Exception {
web.ignoring().antMatchers("/swagger-ui.html")
.antMatchers("/webjars/**")
.antMatchers("/v2/**")
.antMatchers("/swagger-resources/**");
}
}

配置完成后,重启 user-server,浏览器输入 http://localhost:8081/swagger-ui.html,结果如下:

大家可以看到,页面中多了一个 Authorize 按钮,点击该按钮,输入 Bearer ${token},如下:

输入完成后,点击 Authorize 按钮,完成认证,接下来,user-server 中的各种接口就可以直接调用测试了。

上面这种方式比较通用,不仅仅适用于 OAuth2,也适用于其他一些自定义的 token 登录方式。

但是这种方式需要开发者先通过其他途径获取到 access_token,有的人会觉得这样有点麻烦,那么有没有更好的办法呢?请看方式二。

3.2 认证方式二

认证方式二就是直接在 Swagger 中填入认证信息,这样就不用从外部去获取 access_token 了,效果如下:

我们来看下这个怎么配置。

由于 swagger 去请求 /oauth/token 接口会跨域,所以我们首先要修改 auth-server ,使之支持跨域:

主要是两方面的修改,首先是配置 CorsFilter,允许跨域,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码@Configuration
public class GlobalCorsConfiguration {
@Bean
public CorsFilter corsFilter() {
CorsConfiguration corsConfiguration = new CorsConfiguration();
corsConfiguration.setAllowCredentials(true);
corsConfiguration.addAllowedOrigin("*");
corsConfiguration.addAllowedHeader("*");
corsConfiguration.addAllowedMethod("*");
UrlBasedCorsConfigurationSource urlBasedCorsConfigurationSource = new UrlBasedCorsConfigurationSource();
urlBasedCorsConfigurationSource.registerCorsConfiguration("/**", corsConfiguration);
return new CorsFilter(urlBasedCorsConfigurationSource);
}
}

然后在 SecurityConfig 中开启跨域支持:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码@Configuration
@Order(Ordered.HIGHEST_PRECEDENCE)
public class SecurityConfig extends WebSecurityConfigurerAdapter {
...
...
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.requestMatchers().antMatchers(HttpMethod.OPTIONS, "/oauth/**")
.and()
.csrf().disable().formLogin()
.and()
.cors();
}
}

经过这两步的配置,服务端的跨域支持就开启了。

接下来我们在 user-server 中修改关于 Docket bean 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
复制代码@Configuration
@EnableSwagger2
public class Swagger2Config {
@Bean
Docket docket() {
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.basePackage("org.javaboy.oauth2.res.controller"))
.paths(PathSelectors.any())
.build()
.securityContexts(Arrays.asList(securityContext()))
.securitySchemes(Arrays.asList(securityScheme()))
.apiInfo(new ApiInfoBuilder()
.description("接口文档的描述信息")
.title("微人事项目接口文档")
.contact(new Contact("javaboy","http://www.javaboy.org","wangsong0210@gmail.com"))
.version("v1.0")
.license("Apache2.0")
.build());
}

private AuthorizationScope[] scopes() {
return new AuthorizationScope[]{
new AuthorizationScope("all", "all scope")
};
}

private SecurityScheme securityScheme() {
GrantType grant = new ResourceOwnerPasswordCredentialsGrant("http://localhost:8080/oauth/token");
return new OAuthBuilder().name("OAuth2")
.grantTypes(Arrays.asList(grant))
.scopes(Arrays.asList(scopes()))
.build();
}

private SecurityContext securityContext() {
return SecurityContext.builder()
.securityReferences(Arrays.asList(new SecurityReference("OAuth2", scopes())))
.forPaths(PathSelectors.any())
.build();
}
}

这段配置跟前面的类似,主要是 SecurityScheme 不同。这里采用了 OAuthBuilder 来构建,构建时即得配置 token 的获取地址。

好了,配置完成,重启 auth-server 和 user-server 进行测试。测试效果就是松哥前面给出的图片,不再赘述。

这种方式最大的好处就是不用通过其他途径获取 access_token,直接在 swagger-ui 页面输入 password 模式的认证参数即可。非常方便,仅限于 OAuth2 模式。

4.小结

好了,今天就和小伙伴们介绍了在 Swagger 请求中,如何修改请求头的问题,感兴趣的小伙伴可以下来试试哦~

本文案例下载地址:github.com/lenve/sprin…

好啦,小伙伴们如果觉得有收获,记得点个在看鼓励下松哥哦~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

我以为我对Mysql事务很熟,直到我遇到了阿里面试官

发表于 2020-06-09

前言

迎面走来了一个风尘仆仆的身穿格子衫的男子,手里拿着一个MacBook Pro,看着那稀少的发量,和那从容淡定的眼神。

我心里一颤,我去,这是架构师,架构师来面我技术面,我心里顿时不淡定了,表面很稳实则心里慌得一批。

果然,他手里拿着我的简历,快速的扫了一下,然后用眼角余光看了一下我,上来就开问。

Mysql事务简介

「面试官:」 看你简历上说精通Mysql优化方法,你先来说说你对Mysql的事务的了解吧。

我心里喜了一下,这个简单啊,哥我可是北大(背大)的,再来面试之前,早就有准备的,二话不说,上去就是背。

「我:」 好的,数据库的事务是指一组sql语句组成的数据库逻辑处理单元,在这组的sql操作中,要么全部执行成功,要么全部执行失败。

「我:」 这里的一组sql操作,举个简单又经典的例子就是转账了,事务A中要进行转账,那么转出的账号要扣钱,转入的账号要加钱,这两个操作都必须同时执行成功,为了确保数据的一致性。

「面试官:」 刚才你提到了数据一致性,你知道事务的特性吗?说说你的理解。

ACID简介

「我:」 在Mysql中事务的四大特性主要包含:「原子性(Atomicity)」、「一致性(Consistent)」、「隔离性(Isalotion)」、「持久性(Durable)」,简称为ACID。

「我:」 原子性是指事务的原子性操作,对数据的修改要么全部执行成功,要么全部失败,实现事务的原子性,是基于日志的Redo/Undo机制。

「我:」 一致性是指执行事务前后的状态要一致,可以理解为数据一致性。隔离性侧重指事务之间相互隔离,不受影响,这个与事务设置的隔离级别有密切的关系。

「我:」 持久性则是指在一个事务提交后,这个事务的状态会被持久化到数据库中,也就是事务提交,对数据的新增、更新将会持久化到书库中。

「我:」 在我的理解中,原子性、隔离性、持久性都是为了保障一致性而存在的,一致性也是最终的目的。

心里暗自欢喜,背完了,平时背的多,面试就会说,幸好难不倒我。

ACID原理

「面试官:」 刚才你说原子性是基于日志的Redo/Undo机制,你能说一说Redo/Undo机制吗?

啊哈?我都说了什么,不小心给自己埋了一颗大雷。不慌,哥脑子里还有货,假装若有所思的停了几十秒,接着背。

「我:」 Redo/Undo机制比较简单,它们将所有对数据的更新操作都写到日志中。

「我:」 Redo log用来记录某数据块被修改后的值,可以用来恢复未写入 data file 的已成功事务更新的数据;Undo log是用来记录数据更新前的值,保证数据更新失败能够回滚。

「我:」 假如数据库在执行的过程中,不小心崩了,可以通过该日志的方式,回滚之前已经执行成功的操作,实现事务的一致性。

「面试官:」 可以举一个场景,说一下具体的实现流程吗?

「我:」 可以的,假如某个时刻数据库崩溃,在崩溃之前有事务A和事务B在执行,事务A已经提交,而事务B还未提交。当数据库重启进行 crash-recovery 时,就会通过Redo log将已经提交事务的更改写到数据文件,而还没有提交的就通过Undo log进行roll back。

事务隔离级别

「面试官:」 之前你还提到事务的隔离级别,你能说一说吗?

「我:」 可以的,在Mysql中事务的隔离级别分为四大等级,「读未提交(READ UNCOMMITTED)、读提交 (READ COMMITTED)、可重复读 (REPEATABLE READ)、串行化 (SERIALIZABLE)」。

「我:」 读未提交会读到另一个事务的未提交的数据,产生脏读问题,读提交则解决了脏读的,出现了不可重复读,即在一个事务任意时刻读到的数据可能不一样,可能会受到其它事务对数据修改提交后的影响,一般是对于update的操作。

「我:」 可重复读解决了之前不可重复读和脏读的问题,但是由带来了幻读的问题,幻读一般是针对inser操作。

「我:」 例如:第一个事务查询一个User表id=100发现不存在该数据行,这时第二个事务又进来了,新增了一条id=100的数据行并且提交了事务。

「我:」 这时第一个事务新增一条id=100的数据行会报主键冲突,第一个事务再select一下,发现id=100数据行已经存在,这就是幻读。

「面试官:」 小伙子你能演示一下吗?我不太会你能教教我吗?我电脑在这里,你演示我看一看。

男人的嘴骗人的鬼,我信你个鬼,你这糟老头子坏得很,出来装X总是要还的,只能默默含泪把它敲完。

「我:」 首先创建一个User表,最为一个测试表,测试表里面有三个字段,并插入两条测试数据。

1
2
3
4
5
6
7
复制代码CREATE TABLE User (
id INT(11) NOT NULL PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(20),
age INT DEFAULT 0
) ENGINE=InnoDB DEFAULT CHARSET=gb2312;
`INSERT INTO `user` VALUES (1, 'zhangsan', 23);
INSERT INTO `user` VALUES (2, 'lisi', 20);`

「我:」 再Mysql中可以先查询一下他的默认隔离级别,可以看出Mysql的默认隔离级别是REPEATABLE-READ。

「我:」 先来演示一下读未提交,先把默认的隔离级别修改为READ UNCOMMITTED。

「我:」 他设置隔离级别的语句中set global transaction isolation level read uncommitted,这里的global也可以换成session,global表示全局的,而session表示当前会话,也就是当前窗口有效。

「我:」 当设置完隔离级别后对于之前打开的会话,是无效的,要重新打开一个窗口设置隔离级别才生效。

「我:」 然后是开启事务,Mysql中开启事务有两种方式begin/start transaction,最后提交事务执行commit,或者回滚事务rollback。

「我:」 在执行begin/start transaction命令,它们并不是一个事务的起点,在执行完它们后的第一个sql语句,才表示事务真正的启动 。

「我:」 这里直接打开两个新的窗口,同时开启事务,再第一个窗口先update一个id=1的数据行name改为’非科班的科班’,执行成功。

「我:」 然后再第二个窗口执行两次的查询,分别是窗口一update之前的查询和update之后的查询。

「我:」 第一个session产生的未提交的事务的状态就会直接影响到第二sesison,也就是脏读。

「我:」 对于读提交也是一样的,开启事务后,第一个事务先执行查询数据,然后第二个session执行update操作,但是还没有commit,这是第一个session再次select,数据并没有改变,再第二个session执行commit之后,第一个session再次select就是改变后的数据了。

「我:」 这样第一个事务的查询结果就会收到第二事务的影响,这个也就是产生不可重复读的问题。

「面试官:」 小伙子你能画一下他执行的过程图吗?你讲的我有点乱,我还没有彻底明白。

我心里一万只什么马在飞过,欲哭无泪,这面试官真难伺候,说时迟那时快,从左屁股兜抽出笔,从右屁股兜拿出纸,开始画。

「我:」 这个是读提交的时间轴图,读未提交的时间轴突,原理也一样的,第二个select的时候数据就已经改变了。

这是面试官拿过我的图看了一点,微微的点了点头,嘴角露出思思的笑意,我想你这糟老头子应该不会再刁难我了吧。

「面试官:」 嗯,你接着演示你的可重复读吧。

「我:」 嗯,好的,然后就是可重复读,和之前一样的操作。

「我:」 将两个session开启为REPEATABLE READ,同时开启事务,再第一个事务中先select,然后在第二个事务里面update数据行,可以发现即使第二个事务已经commit,第一个事务再次select数据也还是没有改变,这就解决了不可重复读的问题。

「我:」 这里有个不同的地方就是在Mysql中,默认的不可重复读个隔离级别也解决了幻读的问题。

「我:」 从上面的演示中可以看出第一个事务中先select一个id=3的数据行,这条数据行是不存在的,返回Empty set,然后第二个事务中insert一条id=3的数据行并且commit,第一个事务中再次select的,数据也好是没有id=3的数据行。

「我:」 最后的串行化,样式步骤也是一样的,结果也和Mysql中默认的个可重复读隔离级别的结果一样,串行化的执行流程相当于把事务的执行过程变为顺序执行,我这边就不再做演示了。

「我:」 这四大等级从上到下,隔离的效果是逐渐增强,但是性能却是越来越差。

Mysql的锁机制

「面试官:」 哦?性能越来越差?为什么会性能越来越差?你能说一说原因吗?

哎呀,我这嘴,少说一句会死啊,这下好了,这个得说底层实现原理了,从原来得假装若有所思,变成了真正得若有所思。

「我:」 这个得从Mysq的锁说起,在Mysql中的锁可以分为分「享锁/读锁(Shared Locks)」、「排他锁/写锁(Exclusive Locks)」 、「间隙锁」、「行锁(Record Locks)」、「表锁」。

「我:」 在四个隔离级别中加锁肯定是要消耗性能的,而读未提交是没有加任何锁的,所以对于它来说也就是没有隔离的效果,所以它的性能也是最好的。

「我:」 对于串行化加的是一把大锁,读的时候加共享锁,不能写,写的时候,家的是排它锁,阻塞其它事务的写入和读取,若是其它的事务长时间不能写入就会直接报超时,所以它的性能也是最差的,对于它来就没有什么并发性可言。

「我:」 对于读提交和可重复读,他们俩的实现是兼顾解决数据问题,然后又要有一定的并发行,所以在实现上锁机制会比串行化优化很多,提高并发性,所以性能也会比较好。

「我:」 他们俩的底层实现采用的是MVCC(多版本并发控制)方式进行实现。

「面试官:」 你能先说一下先这几个锁的概念吗?我不是很懂,说说你的理解。

「我:」 哦,好的,共享锁是针对同一份数据,多个读操作可以同时进行,简单来说即读加锁,不能写并且可并行读;排他锁针对写操作,假如当前写操作没有完成,那么它会阻断其它的写锁和读锁,即写加锁,其它读写都阻塞 。

「我:」 而行锁和表锁,是从锁的粒度上进行划分的,行锁锁定当前数据行,锁的粒度小,加锁慢,发生锁冲突的概率小,并发度高,行锁也是MyISAM和InnoDB的区别之一,InnoDB支持行锁并且支持事务 。

「我:」 而表锁则锁的粒度大,加锁快,开销小,但是锁冲突的概率大,并发度低。

「我:」 间隙锁则分为两种:Gap Locks和Next-Key Locks。Gap Locks会锁住两个索引之间的区间,比如select * from User where id>3 and id<5 for update,就会在区间(3,5)之间加上Gap Locks。

「我:」 Next-Key Locks是Gap Locks+Record Locks形成闭区间锁select * from User where id>=3 and id=<5 for update,就会在区间[3,5]之间加上Next-Key Locks。

「面试官:」 那Mysql中什么时候会加锁呢?

「我:」 在数据库的增、删、改、查中,只有增、删、改才会加上排它锁,而只是查询并不会加锁,只能通过在select语句后显式加lock in share mode或者for update来加共享锁或者排它锁。

事务底层实现原理

「面试官:」 你在上面提到MVCC(多版本并发控制),你能说一说原理吗?

「我:」 在实现MVCC时用到了一致性视图,用于支持读提交和可重复读的实现。

「我:」 在实现可重复读的隔离级别,只需要在事务开始的时候创建一致性视图,也叫做快照,之后的查询里都共用这个一致性视图,后续的事务对数据的更改是对当前事务是不可见的,这样就实现了可重复读。

「我:」 而读提交,每一个语句执行前都会重新计算出一个新的视图,这个也是可重复读和读提交在MVCC实现层面上的区别。

「面试官:」 那你知道快照(视图)在MVCC底层是怎么工作的吗?

「我:」 在InnoDB 中每一个事务都有一个自己的事务id,并且是唯一的,递增的 。

「我:」 对于Mysql中的每一个数据行都有可能存在多个版本,在每次事务更新数据的时候,都会生成一个新的数据版本,并且把自己的数据id赋值给当前版本的row trx_id。

「面试官:」 小伙子你可以画个图我看看吗?我不是很明白。

我有什么办法呢?完全没办法,只能又从屁股兜李拿出笔和纸,迅速的画了起来,相当这次面试要是不过血亏啊,浪费了我两张纸和笔水,多贵啊,只能豁出去了。


「我:」 如图中所示,假如三个事务更新了同一行数据,那么就会有对应的三个数据版本。

「我:」 实际上版本1、版本2并非实际物理存在的,而图中的U1和U2实际就是undo log,这v1和v2版本是根据当前v3和undo log计算出来的。

「面试官:」 那对于一个快照来说,你直到它要遵循什么规则吗?

「我:」 嗯,对于一个事务视图来说除了对自己更新的总是可见,另外还有三种情况:版本未提交的,都是不可见的;版本已经提交,但是是在创建视图之后提交的也是不可见的;版本已经提交,若是在创建视图之前提交的是可见的。

「面试官:」 假如两个事务执行写操作,又怎么保证并发呢?

「我:」 假如事务1和事务2都要执行update操作,事务1先update数据行的时候,先回获取行锁,锁定数据,当事务2要进行update操作的时候,也会取获取该数据行的行锁,但是已经被事务1占有,事务2只能wait。

「我:」 若是事务1长时间没有释放锁,事务2就会出现超时异常 。

「面试官:」 这个是在update的where后的条件是在有索引的情况下吧?

「我:」 嗯,是的 。

「面试官:」 那没有索引的条件下呢?没办法快速定位到数据行呢?

「我:」 若是没有索引的条件下,就获取所有行,都加上行锁,然后Mysql会再次过滤符合条件的的行并释放锁,只有符合条件的行才会继续持有锁。

「我:」 这样的性能消耗也会比较大。

「面试官:」 嗯嗯

此时面试官看看手表一个多钟已经过去了,也已经到了饭点时刻,我想他应该是肚子饿了,不会继续追问吧,两人持续僵了三十秒,他终于开口了。

「面试官:」 小伙子,现在时间也已经到了饭点了,今天的面试就到此结束吧,你回去等通知吧。

「我:」 。。。。。。。。。。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官:换人!他连 TCP 这几个参数都不懂

发表于 2020-06-08
每日一句英语学习,每天进步一点点:


前言

TCP 性能的提升不仅考察 TCP 的理论知识,还考察了对于操作系统提供的内核参数的理解与应用。

TCP 协议是由操作系统实现,所以操作系统提供了不少调节 TCP 的参数。

Linux TCP 参数

Linux TCP 参数

如何正确有效的使用这些参数,来提高 TCP 性能是一个不那么简单事情。我们需要针对 TCP 每个阶段的问题来对症下药,而不是病急乱投医。

接下来,将以三个角度来阐述提升 TCP 的策略,分别是:

  • TCP 三次握手的性能提升;
  • TCP 四次挥手的性能提升;
  • TCP 数据传输的性能提升;

本节提纲

本节提纲


正文

01 TCP 三次握手的性能提升

TCP 是面向连接的、可靠的、双向传输的传输层通信协议,所以在传输数据之前需要经过三次握手才能建立连接。

三次握手与数据传输

三次握手与数据传输

那么,三次握手的过程在一个 HTTP 请求的平均时间占比 10% 以上,在网络状态不佳、高并发或者遭遇 SYN 攻击等场景中,如果不能有效正确的调节三次握手中的参数,就会对性能产生很多的影响。

如何正确有效的使用这些参数,来提高 TCP 三次握手的性能,这就需要理解「三次握手的状态变迁」,这样当出现问题时,先用 netstat 命令查看是哪个握手阶段出现了问题,再来对症下药,而不是病急乱投医。

TCP 三次握手的状态变迁

TCP 三次握手的状态变迁

客户端和服务端都可以针对三次握手优化性能。主动发起连接的客户端优化相对简单些,而服务端需要监听端口,属于被动连接方,其间保持许多的中间状态,优化方法相对复杂一些。

所以,客户端(主动发起连接方)和服务端(被动连接方)优化的方式是不同的,接下来分别针对客户端和服务端优化。

客户端优化

三次握手建立连接的首要目的是「同步序列号」。

只有同步了序列号才有可靠传输,TCP 许多特性都依赖于序列号实现,比如流量控制、丢包重传等,这也是三次握手中的报文称为 SYN 的原因,SYN 的全称就叫 Synchronize Sequence Numbers(同步序列号)。

TCP 头部

TCP 头部

SYN_SENT 状态的优化

客户端作为主动发起连接方,首先它将发送 SYN 包,于是客户端的连接就会处于 SYN_SENT 状态。

客户端在等待服务端回复的 ACK 报文,正常情况下,服务器会在几毫秒内返回 SYN+ACK ,但如果客户端长时间没有收到 SYN+ACK 报文,则会重发 SYN 包,重发的次数由 tcp_syn_retries 参数控制,默认是 5 次:

通常,第一次超时重传是在 1 秒后,第二次超时重传是在 2 秒,第三次超时重传是在 4 秒后,第四次超时重传是在 8 秒后,第五次是在超时重传 16 秒后。没错,每次超时的时间是上一次的 2 倍。

当第五次超时重传后,会继续等待 32 秒,如果仍然服务端没有回应 ACK,客户端就会终止三次握手。

所以,总耗时是 1+2+4+8+16+32=63 秒,大约 1 分钟左右。

SYN 超时重传

SYN 超时重传

你可以根据网络的稳定性和目标服务器的繁忙程度修改 SYN 的重传次数,调整客户端的三次握手时间上限。比如内网中通讯时,就可以适当调低重试次数,尽快把错误暴露给应用程序。

服务端优化

当服务端收到 SYN 包后,服务端会立马回复 SYN+ACK 包,表明确认收到了客户端的序列号,同时也把自己的序列号发给对方。

此时,服务端出现了新连接,状态是 SYN_RCV。在这个状态下,Linux 内核就会建立一个「半连接队列」来维护「未完成」的握手信息,当半连接队列溢出后,服务端就无法再建立新的连接。

半连接队列与全连接队列

半连接队列与全连接队列

SYN 攻击,攻击的是就是这个半连接队列。

如何查看由于 SYN 半连接队列已满,而被丢弃连接的情况?

我们可以通过该 netstat -s 命令给出的统计结果中, 可以得到由于半连接队列已满,引发的失败次数:

上面输出的数值是累计值,表示共有多少个 TCP 连接因为半连接队列溢出而被丢弃。隔几秒执行几次,如果有上升的趋势,说明当前存在半连接队列溢出的现象。

如何调整 SYN 半连接队列大小?

要想增大半连接队列,不能只单纯增大 tcp_max_syn_backlog 的值,还需一同增大 somaxconn 和 backlog,也就是增大 accept 队列。否则,只单纯增大 tcp_max_syn_backlog 是无效的。

增大 tcp_max_syn_backlog 和 somaxconn 的方法是修改 Linux 内核参数:

增大 backlog 的方式,每个 Web 服务都不同,比如 Nginx 增大 backlog 的方法如下:

最后,改变了如上这些参数后,要重启 Nginx 服务,因为 SYN 半连接队列和 accept 队列都是在 listen() 初始化的。

如果 SYN 半连接队列已满,只能丢弃连接吗?

并不是这样,开启 syncookies 功能就可以在不使用 SYN 半连接队列的情况下成功建立连接。

syncookies 的工作原理:服务器根据当前状态计算出一个值,放在己方发出的 SYN+ACK 报文中发出,当客户端返回 ACK 报文时,取出该值验证,如果合法,就认为连接建立成功,如下图所示。

开启 syncookies 功能

开启 syncookies 功能

syncookies 参数主要有以下三个值:

  • 0 值,表示关闭该功能;
  • 1 值,表示仅当 SYN 半连接队列放不下时,再启用它;
  • 2 值,表示无条件开启功能;

那么在应对 SYN 攻击时,只需要设置为 1 即可:

SYN_RCV 状态的优化

当客户端接收到服务器发来的 SYN+ACK 报文后,就会回复 ACK 给服务器,同时客户端连接状态从 SYN_SENT 转换为 ESTABLISHED,表示连接建立成功。

服务器端连接成功建立的时间还要再往后,等到服务端收到客户端的 ACK 后,服务端的连接状态才变为 ESTABLISHED。

如果服务器没有收到 ACK,就会重发 SYN+ACK 报文,同时一直处于 SYN_RCV 状态。

当网络繁忙、不稳定时,报文丢失就会变严重,此时应该调大重发次数。反之则可以调小重发次数。修改重发次数的方法是,调整 tcp_synack_retries 参数:

tcp_synack_retries 的默认重试次数是 5 次,与客户端重传 SYN 类似,它的重传会经历 1、2、4、8、16 秒,最后一次重传后会继续等待 32 秒,如果服务端仍然没有收到 ACK,才会关闭连接,故共需要等待 63 秒。

服务器收到 ACK 后连接建立成功,此时,内核会把连接从半连接队列移除,然后创建新的完全的连接,并将其添加到 accept 队列,等待进程调用 accept 函数时把连接取出来。

如果进程不能及时地调用 accept 函数,就会造成 accept 队列(也称全连接队列)溢出,最终导致建立好的 TCP 连接被丢弃。

accept 队列溢出

accept 队列溢出

accept 队列已满,只能丢弃连接吗?

丢弃连接只是 Linux 的默认行为,我们还可以选择向客户端发送 RST 复位报文,告诉客户端连接已经建立失败。打开这一功能需要将 tcp_abort_on_overflow 参数设置为 1。

tcp_abort_on_overflow 共有两个值分别是 0 和 1,其分别表示:

  • 0 :如果 accept 队列满了,那么 server 扔掉 client 发过来的 ack ;
  • 1 :如果 accept 队列满了,server 发送一个 RST 包给 client,表示废掉这个握手过程和这个连接;

如果要想知道客户端连接不上服务端,是不是服务端 TCP 全连接队列满的原因,那么可以把 tcp_abort_on_overflow 设置为 1,这时如果在客户端异常中可以看到很多 connection reset by peer 的错误,那么就可以证明是由于服务端 TCP 全连接队列溢出的问题。

通常情况下,应当把 tcp_abort_on_overflow 设置为 0,因为这样更有利于应对突发流量。

举个例子,当 accept 队列满导致服务器丢掉了 ACK,与此同时,客户端的连接状态却是 ESTABLISHED,客户端进程就在建立好的连接上发送请求。只要服务器没有为请求回复 ACK,客户端的请求就会被多次「重发」。如果服务器上的进程只是短暂的繁忙造成 accept 队列满,那么当 accept 队列有空位时,再次接收到的请求报文由于含有 ACK,仍然会触发服务器端成功建立连接。

tcp_abort_on_overflow 为 0 可以应对突发流量

tcp_abort_on_overflow 为 0 可以应对突发流量

所以,tcp_abort_on_overflow 设为 0 可以提高连接建立的成功率,只有你非常肯定 TCP 全连接队列会长期溢出时,才能设置为 1 以尽快通知客户端。

如何调整 accept 队列的长度呢?

accept 队列的长度取决于 somaxconn 和 backlog 之间的最小值,也就是 min(somaxconn, backlog),其中:

  • somaxconn 是 Linux 内核的参数,默认值是 128,可以通过 net.core.somaxconn 来设置其值;
  • backlog 是 listen(int sockfd, int backlog) 函数中的 backlog 大小;

Tomcat、Nginx、Apache 常见的 Web 服务的 backlog 默认值都是 511。

如何查看服务端进程 accept 队列的长度?

可以通过 ss -ltn 命令查看:

  • Recv-Q:当前 accept 队列的大小,也就是当前已完成三次握手并等待服务端 accept() 的 TCP 连接;
  • Send-Q:accept 队列最大长度,上面的输出结果说明监听 8088 端口的 TCP 服务,accept 队列的最大长度为 128;

如何查看由于 accept 连接队列已满,而被丢弃的连接?

当超过了 accept 连接队列,服务端则会丢掉后续进来的 TCP 连接,丢掉的 TCP 连接的个数会被统计起来,我们可以使用 netstat -s 命令来查看:

上面看到的 41150 times ,表示 accept 队列溢出的次数,注意这个是累计值。可以隔几秒钟执行下,如果这个数字一直在增加的话,说明 accept 连接队列偶尔满了。

如果持续不断地有连接因为 accept 队列溢出被丢弃,就应该调大 backlog 以及 somaxconn 参数。

如何绕过三次握手?

以上我们只是在对三次握手的过程进行优化,接下来我们看看如何绕过三次握手发送数据。

三次握手建立连接造成的后果就是,HTTP 请求必须在一个 RTT(从客户端到服务器一个往返的时间)后才能发送。

常规 HTTP 请求

常规 HTTP 请求

在 Linux 3.7 内核版本之后,提供了 TCP Fast Open 功能,这个功能可以减少 TCP 连接建立的时延。

接下来说说,TCP Fast Open 功能的工作方式。

开启 TCP Fast Open 功能

开启 TCP Fast Open 功能

在客户端首次建立连接时的过程:

  1. 客户端发送 SYN 报文,该报文包含 Fast Open 选项,且该选项的 Cookie 为空,这表明客户端请求 Fast Open Cookie;
  2. 支持 TCP Fast Open 的服务器生成 Cookie,并将其置于 SYN-ACK 数据包中的 Fast Open 选项以发回客户端;
  3. 客户端收到 SYN-ACK 后,本地缓存 Fast Open 选项中的 Cookie。

所以,第一次发起 HTTP GET 请求的时候,还是需要正常的三次握手流程。

之后,如果客户端再次向服务器建立连接时的过程:

  1. 客户端发送 SYN 报文,该报文包含「数据」(对于非 TFO 的普通 TCP 握手过程,SYN 报文中不包含「数据」)以及此前记录的 Cookie;
  2. 支持 TCP Fast Open 的服务器会对收到 Cookie 进行校验:如果 Cookie 有效,服务器将在 SYN-ACK 报文中对 SYN 和「数据」进行确认,服务器随后将「数据」递送至相应的应用程序;如果 Cookie 无效,服务器将丢弃 SYN 报文中包含的「数据」,且其随后发出的 SYN-ACK 报文将只确认 SYN 的对应序列号;
  3. 如果服务器接受了 SYN 报文中的「数据」,服务器可在握手完成之前发送「数据」,这就减少了握手带来的 1 个 RTT 的时间消耗;
  4. 客户端将发送 ACK 确认服务器发回的 SYN 以及「数据」,但如果客户端在初始的 SYN 报文中发送的「数据」没有被确认,则客户端将重新发送「数据」;
  5. 此后的 TCP 连接的数据传输过程和非 TFO 的正常情况一致。

所以,之后发起 HTTP GET 请求的时候,可以绕过三次握手,这就减少了握手带来的 1 个 RTT 的时间消耗。

开启了 TFO 功能,cookie 的值是存放到 TCP option 字段里的:

TCP option 字段 - TFO

TCP option 字段 - TFO

注:客户端在请求并存储了 Fast Open Cookie 之后,可以不断重复 TCP Fast Open 直至服务器认为 Cookie 无效(通常为过期)。

Linux 下怎么打开 TCP Fast Open 功能呢?

在 Linux 系统中,可以通过设置 tcp_fastopn 内核参数,来打开 Fast Open 功能:

tcp_fastopn 各个值的意义:

  • 0 关闭
  • 1 作为客户端使用 Fast Open 功能
  • 2 作为服务端使用 Fast Open 功能
  • 3 无论作为客户端还是服务器,都可以使用 Fast Open 功能

TCP Fast Open 功能需要客户端和服务端同时支持,才有效果。

小结

本小结主要介绍了关于优化 TCP 三次握手的几个 TCP 参数。

三次握手优化策略

三次握手优化策略

客户端的优化

当客户端发起 SYN 包时,可以通过 tcp_syn_retries 控制其重传的次数。

服务端的优化

当服务端 SYN 半连接队列溢出后,会导致后续连接被丢弃,可以通过 netstat -s 观察半连接队列溢出的情况,如果 SYN 半连接队列溢出情况比较严重,可以通过 tcp_max_syn_backlog、somaxconn、backlog 参数来调整 SYN 半连接队列的大小。

服务端回复 SYN+ACK 的重传次数由 tcp_synack_retries 参数控制。如果遭受 SYN 攻击,应把 tcp_syncookies 参数设置为 1,表示仅在 SYN 队列满后开启 syncookie 功能,可以保证正常的连接成功建立。

服务端收到客户端返回的 ACK,会把连接移入 accpet 队列,等待进行调用 accpet() 函数取出连接。

可以通过 ss -lnt 查看服务端进程的 accept 队列长度,如果 accept 队列溢出,系统默认丢弃 ACK,如果可以把 tcp_abort_on_overflow 设置为 1 ,表示用 RST 通知客户端连接建立失败。

如果 accpet 队列溢出严重,可以通过 listen 函数的 backlog 参数和 somaxconn 系统参数提高队列大小,accept 队列长度取决于 min(backlog, somaxconn)。

绕过三次握手

TCP Fast Open 功能可以绕过三次握手,使得 HTTP 请求减少了 1 个 RTT 的时间,Linux 下可以通过 tcp_fastopen 开启该功能,同时必须保证服务端和客户端同时支持。


02 TCP 四次挥手的性能提升

接下来,我们一起看看针对 TCP 四次挥手关不连接时,如何优化性能。

在开始之前,我们得先了解四次挥手状态变迁的过程。

客户端和服务端双方都可以主动断开连接,通常先关闭连接的一方称为主动方,后关闭连接的一方称为被动方。

客户端主动关闭

客户端主动关闭

可以看到,四次挥手过程只涉及了两种报文,分别是 FIN 和 ACK:

  • FIN 就是结束连接的意思,谁发出 FIN 报文,就表示它将不会再发送任何数据,关闭这一方向上的传输通道;
  • ACK 就是确认的意思,用来通知对方:你方的发送通道已经关闭;

四次挥手的过程:

  • 当主动方关闭连接时,会发送 FIN 报文,此时发送方的 TCP 连接将从 ESTABLISHED 变成 FIN_WAIT1。
  • 当被动方收到 FIN 报文后,内核会自动回复 ACK 报文,连接状态将从 ESTABLISHED 变成 CLOSE_WAIT,表示被动方在等待进程调用 close 函数关闭连接。
  • 当主动方收到这个 ACK 后,连接状态由 FIN_WAIT1 变为 FIN_WAIT2,也就是表示主动方的发送通道就关闭了。
  • 当被动方进入 CLOSE_WAIT 时,被动方还会继续处理数据,等到进程的 read 函数返回 0 后,应用程序就会调用 close 函数,进而触发内核发送 FIN 报文,此时被动方的连接状态变为 LAST_ACK。
  • 当主动方收到这个 FIN 报文后,内核会回复 ACK 报文给被动方,同时主动方的连接状态由 FIN_WAIT2 变为 TIME_WAIT,在 Linux 系统下大约等待 1 分钟后,TIME_WAIT 状态的连接才会彻底关闭。
  • 当被动方收到最后的 ACK 报文后,被动方的连接就会关闭。

你可以看到,每个方向都需要一个 FIN 和一个 ACK,因此通常被称为四次挥手。

这里一点需要注意是:主动关闭连接的,才有 TIME_WAIT 状态。

主动关闭方和被动关闭方优化的思路也不同,接下来分别说说如何优化他们。

主动方的优化

关闭的连接的方式通常有两种,分别是 RST 报文关闭和 FIN 报文关闭。

如果进程异常退出了,内核就会发送 RST 报文来关闭,它可以不走四次挥手流程,是一个暴力关闭连接的方式。

安全关闭连接的方式必须通过四次挥手,它由进程调用 close 和 shutdown 函数发起 FIN 报文(shutdown 参数须传入 SHUT_WR 或者 SHUT_RDWR 才会发送 FIN)。

调用 close 函数 和 shutdown 函数有什么区别?

调用了 close 函数意味着完全断开连接,完全断开不仅指无法传输数据,而且也不能发送数据。 此时,调用了 close 函数的一方的连接叫做「孤儿连接」,如果你用 netstat -p 命令,会发现连接对应的进程名为空。

使用 close 函数关闭连接是不优雅的。于是,就出现了一种优雅关闭连接的 shutdown 函数,它可以控制只关闭一个方向的连接:

第二个参数决定断开连接的方式,主要有以下三种方式:

  • SHUT_RD(0):关闭连接的「读」这个方向,如果接收缓冲区有已接收的数据,则将会被丢弃,并且后续再收到新的数据,会对数据进行 ACK,然后悄悄地丢弃。也就是说,对端还是会接收到 ACK,在这种情况下根本不知道数据已经被丢弃了。
  • SHUT_WR(1):关闭连接的「写」这个方向,这就是常被称为「半关闭」的连接。如果发送缓冲区还有未发送的数据,将被立即发送出去,并发送一个 FIN 报文给对端。
  • SHUT_RDWR(2):相当于 SHUT_RD 和 SHUT_WR 操作各一次,关闭套接字的读和写两个方向。

close 和 shutdown 函数都可以关闭连接,但这两种方式关闭的连接,不只功能上有差异,控制它们的 Linux 参数也不相同。

FIN_WAIT1 状态的优化

主动方发送 FIN 报文后,连接就处于 FIN_WAIT1 状态,正常情况下,如果能及时收到被动方的 ACK,则会很快变为 FIN_WAIT2 状态。

但是当迟迟收不到对方返回的 ACK 时,连接就会一直处于 FIN_WAIT1 状态。此时,内核会定时重发 FIN 报文,其中重发次数由 tcp_orphan_retries 参数控制(注意,orphan 虽然是孤儿的意思,该参数却不只对孤儿连接有效,事实上,它对所有 FIN_WAIT1 状态下的连接都有效),默认值是 0。

你可能会好奇,这 0 表示几次?实际上当为 0 时,特指 8 次,从下面的内核源码可知:

如果 FIN_WAIT1 状态连接很多,我们就需要考虑降低 tcp_orphan_retries 的值,当重传次数超过 tcp_orphan_retries 时,连接就会直接关闭掉。

对于普遍正常情况时,调低 tcp_orphan_retries 就已经可以了。如果遇到恶意攻击,FIN 报文根本无法发送出去,这由 TCP 两个特性导致的:

  • 首先,TCP 必须保证报文是有序发送的,FIN 报文也不例外,当发送缓冲区还有数据没有发送时,FIN 报文也不能提前发送。
  • 其次,TCP 有流量控制功能,当接收方接收窗口为 0 时,发送方就不能再发送数据。所以,当攻击者下载大文件时,就可以通过接收窗口设为 0 ,这就会使得 FIN 报文都无法发送出去,那么连接会一直处于 FIN_WAIT1 状态。

解决这种问题的方法,是调整 tcp_max_orphans 参数,它定义了「孤儿连接」的最大数量:

当进程调用了 close 函数关闭连接,此时连接就会是「孤儿连接」,因为它无法在发送和接收数据。Linux 系统为了防止孤儿连接过多,导致系统资源长时间被占用,就提供了 tcp_max_orphans 参数。如果孤儿连接数量大于它,新增的孤儿连接将不再走四次挥手,而是直接发送 RST 复位报文强制关闭。

FIN_WAIT2 状态的优化

当主动方收到 ACK 报文后,会处于 FIN_WAIT2 状态,就表示主动方的发送通道已经关闭,接下来将等待对方发送 FIN 报文,关闭对方的发送通道。

这时,如果连接是用 shutdown 函数关闭的,连接可以一直处于 FIN_WAIT2 状态,因为它可能还可以发送或接收数据。但对于 close 函数关闭的孤儿连接,由于无法在发送和接收数据,所以这个状态不可以持续太久,而 tcp_fin_timeout 控制了这个状态下连接的持续时长,默认值是 60 秒:

它意味着对于孤儿连接(调用 close 关闭的连接),如果在 60 秒后还没有收到 FIN 报文,连接就会直接关闭。

这个 60 秒不是随便决定的,它与 TIME_WAIT 状态持续的时间是相同的,后面我们在来说说为什么是 60 秒。

TIME_WAIT 状态的优化

TIME_WAIT 是主动方四次挥手的最后一个状态,也是最常遇见的状态。

当收到被动方发来的 FIN 报文后,主动方会立刻回复 ACK,表示确认对方的发送通道已经关闭,接着就处于 TIME_WAIT 状态。在 Linux 系统,TIME_WAIT 状态会持续 60 秒后才会进入关闭状态。

TIME_WAIT 状态的连接,在主动方看来确实快已经关闭了。然后,被动方没有收到 ACK 报文前,还是处于 LAST_ACK 状态。如果这个 ACK 报文没有到达被动方,被动方就会重发 FIN 报文。重发次数仍然由前面介绍过的 tcp_orphan_retries 参数控制。

TIME-WAIT 的状态尤其重要,主要是两个原因:

  • 防止具有相同「四元组」的「旧」数据包被收到;
  • 保证「被动关闭连接」的一方能被正确的关闭,即保证最后的 ACK 能让被动关闭方接收,从而帮助其正常关闭;

原因一:防止旧连接的数据包

TIME-WAIT 的一个作用是防止收到历史数据,从而导致数据错乱的问题。

假设 TIME-WAIT 没有等待时间或时间过短,被延迟的数据包抵达后会发生什么呢?

接收到历史数据的异常

接收到历史数据的异常

  • 如上图黄色框框服务端在关闭连接之前发送的 SEQ = 301 报文,被网络延迟了。
  • 这时有相同端口的 TCP 连接被复用后,被延迟的 SEQ = 301 抵达了客户端,那么客户端是有可能正常接收这个过期的报文,这就会产生数据错乱等严重的问题。

所以,TCP 就设计出了这么一个机制,经过 2MSL 这个时间,足以让两个方向上的数据包都被丢弃,使得原来连接的数据包在网络中都自然消失,再出现的数据包一定都是新建立连接所产生的。

原因二:保证连接正确关闭

TIME-WAIT 的另外一个作用是等待足够的时间以确保最后的 ACK 能让被动关闭方接收,从而帮助其正常关闭。

假设 TIME-WAIT 没有等待时间或时间过短,断开连接会造成什么问题呢?

没有确保正常断开的异常

没有确保正常断开的异常

  • 如上图红色框框客户端四次挥手的最后一个 ACK 报文如果在网络中被丢失了,此时如果客户端 TIME-WAIT 过短或没有,则就直接进入了 CLOSE 状态了,那么服务端则会一直处在 LASE-ACK 状态。
  • 当客户端发起建立连接的 SYN 请求报文后,服务端会发送 RST 报文给客户端,连接建立的过程就会被终止。

我们再回过头来看看,为什么 TIME_WAIT 状态要保持 60 秒呢?这与孤儿连接 FIN_WAIT2 状态默认保留 60 秒的原理是一样的,因为这两个状态都需要保持 2MSL 时长。MSL 全称是 Maximum Segment Lifetime,它定义了一个报文在网络中的最长生存时间(报文每经过一次路由器的转发,IP 头部的 TTL 字段就会减 1,减到 0 时报文就被丢弃,这就限制了报文的最长存活时间)。

为什么是 2 MSL 的时长呢?这其实是相当于至少允许报文丢失一次。比如,若 ACK 在一个 MSL 内丢失,这样被动方重发的 FIN 会在第 2 个 MSL 内到达,TIME_WAIT 状态的连接可以应对。

为什么不是 4 或者 8 MSL 的时长呢?你可以想象一个丢包率达到百分之一的糟糕网络,连续两次丢包的概率只有万分之一,这个概率实在是太小了,忽略它比解决它更具性价比。

因此,TIME_WAIT 和 FIN_WAIT2 状态的最大时长都是 2 MSL,由于在 Linux 系统中,MSL 的值固定为 30 秒,所以它们都是 60 秒。

虽然 TIME_WAIT 状态有存在的必要,但它毕竟会消耗系统资源。如果发起连接一方的 TIME_WAIT 状态过多,占满了所有端口资源,则会导致无法创建新连接。

  • 客户端受端口资源限制:如果客户端 TIME_WAIT 过多,就会导致端口资源被占用,因为端口就65536个,被占满就会导致无法创建新的连接;
  • 服务端受系统资源限制:由于一个 四元组表示TCP连接,理论上服务端可以建立很多连接,服务端确实只监听一个端口 但是会把连接扔给处理线程,所以理论上监听的端口可以继续监听。但是线程池处理不了那么多一直不断的连接了。所以当服务端出现大量 TIME_WAIT 时,系统资源被占满时,会导致处理不过来新的连接;

另外,Linux 提供了 tcp_max_tw_buckets 参数,当 TIME_WAIT 的连接数量超过该参数时,新关闭的连接就不再经历 TIME_WAIT 而直接关闭:

当服务器的并发连接增多时,相应地,同时处于 TIME_WAIT 状态的连接数量也会变多,此时就应当调大 tcp_max_tw_buckets 参数,减少不同连接间数据错乱的概率。

tcp_max_tw_buckets 也不是越大越好,毕竟内存和端口都是有限的。

有一种方式可以在建立新连接时,复用处于 TIME_WAIT 状态的连接,那就是打开 tcp_tw_reuse 参数。但是需要注意,该参数是只用于客户端(建立连接的发起方),因为是在调用 connect() 时起作用的,而对于服务端(被动连接方)是没有用的。

tcp_tw_reuse 从协议角度理解是安全可控的,可以复用处于 TIME_WAIT 的端口为新的连接所用。

什么是协议角度理解的安全可控呢?主要有两点:

  • 只适用于连接发起方,也就是 C/S 模型中的客户端;
  • 对应的 TIME_WAIT 状态的连接创建时间超过 1 秒才可以被复用。

使用这个选项,还有一个前提,需要打开对 TCP 时间戳的支持(对方也要打开 ):

由于引入了时间戳,它能带来了些好处:

  • 我们在前面提到的 2MSL 问题就不复存在了,因为重复的数据包会因为时间戳过期被自然丢弃;
  • 同时,它还可以防止序列号绕回,也是因为重复的数据包会由于时间戳过期被自然丢弃;

时间戳是在 TCP 的选择字段里定义的,开启了时间戳功能,在 TCP 报文传输的时候会带上发送报文的时间戳。

TCP option 字段 - 时间戳

TCP option 字段 - 时间戳

我们来看看开启了 tcp_tw_reuse 功能,如果四次挥手中的最后一次 ACK 在网络中丢失了,会发生什么?

四次挥手中的最后一次 ACK 在网络中丢失

四次挥手中的最后一次 ACK 在网络中丢失

上图的流程:

  • 四次挥手中的最后一次 ACK 在网络中丢失了,服务端一直处于 LAST_ACK 状态;
  • 客户端由于开启了 tcp_tw_reuse 功能,客户再次发起新连接的时候,会复用超过 1 秒后的 time_wait 状态的连接。但客户端新发的 SYN 包会被忽略(由于时间戳),因为服务端比较了客户端的上一个报文与 SYN 报文的时间戳,过期的报文就会被服务端丢弃;
  • 服务端 FIN 报文迟迟没有收到四次挥手的最后一次 ACK,于是超时重发了 FIN 报文给客户端;
  • 处于 SYN_SENT 状态的客户,由于收到了 FIN 报文,则会回 RST 给服务端,于是服务端就离开了 LAST_ACK 状态;
  • 最初的客户端 SYN 报文超时重发了( 1 秒钟后),此时就与服务端能正确的三次握手了。

所以大家都会说开启了 tcp_tw_reuse,可以在复用了 time_wait 状态的 1 秒过后成功建立连接,这 1 秒主要是花费在 SYN 包重传。

另外,老版本的 Linux 还提供了 tcp_tw_recycle 参数,但是当开启了它,就有两个坑:

  • Linux 会加快客户端和服务端 TIME_WAIT 状态的时间,也就是它会使得 TIME_WAIT 状态会小于 60 秒,很容易导致数据错乱;
  • 另外,Linux 会丢弃所有来自远端时间戳小于上次记录的时间戳(由同一个远端发送的)的任何数据包。就是说要使用该选项,则必须保证数据包的时间戳是单调递增的。那么,问题在于,此处的时间戳并不是我们通常意义上面的绝对时间,而是一个相对时间。很多情况下,我们是没法保证时间戳单调递增的,比如使用了 NAT、LVS 等情况;

所以,不建议设置为 1 ,在 Linux 4.12 版本后,Linux 内核直接取消了这一参数,建议关闭它:

另外,我们可以在程序中设置 socket 选项,来设置调用 close 关闭连接行为。

如果 l_onoff 为非 0, 且 l_linger 值为 0,那么调用 close 后,会立该发送一个 RST 标志给对端,该 TCP 连接将跳过四次挥手,也就跳过了 TIME_WAIT 状态,直接关闭。

但这为跨越 TIME_WAIT 状态提供了一个可能,不过是一个非常危险的行为,不值得提倡。

被动方的优化

当被动方收到 FIN 报文时,内核会自动回复 ACK,同时连接处于 CLOSE_WAIT 状态,顾名思义,它表示等待应用进程调用 close 函数关闭连接。

内核没有权利替代进程去关闭连接,因为如果主动方是通过 shutdown 关闭连接,那么它就是想在半关闭连接上接收数据或发送数据。因此,Linux 并没有限制 CLOSE_WAIT 状态的持续时间。

当然,大多数应用程序并不使用 shutdown 函数关闭连接。所以,当你用 netstat 命令发现大量 CLOSE_WAIT 状态。就需要排查你的应用程序,因为可能因为应用程序出现了 Bug,read 函数返回 0 时,没有调用 close 函数。

处于 CLOSE_WAIT 状态时,调用了 close 函数,内核就会发出 FIN 报文关闭发送通道,同时连接进入 LAST_ACK 状态,等待主动方返回 ACK 来确认连接关闭。

如果迟迟收不到这个 ACK,内核就会重发 FIN 报文,重发次数仍然由 tcp_orphan_retries 参数控制,这与主动方重发 FIN 报文的优化策略一致。

还有一点我们需要注意的,如果被动方迅速调用 close 函数,那么被动方的 ACK 和 FIN 有可能在一个报文中发送,这样看起来,四次挥手会变成三次挥手,这只是一种特殊情况,不用在意。

如果连接双方同时关闭连接,会怎么样?

由于 TCP 是双全工的协议,所以是会出现两方同时关闭连接的现象,也就是同时发送了 FIN 报文。

此时,上面介绍的优化策略仍然适用。两方发送 FIN 报文时,都认为自己是主动方,所以都进入了 FIN_WAIT1 状态,FIN 报文的重发次数仍由 tcp_orphan_retries 参数控制。

同时关闭

同时关闭

接下来,双方在等待 ACK 报文的过程中,都等来了 FIN 报文。这是一种新情况,所以连接会进入一种叫做 CLOSING 的新状态,它替代了 FIN_WAIT2 状态。接着,双方内核回复 ACK 确认对方发送通道的关闭后,进入 TIME_WAIT 状态,等待 2MSL 的时间后,连接自动关闭。

小结

针对 TCP 四次挥手的优化,我们需要根据主动方和被动方四次挥手状态变化来调整系统 TCP 内核参数。

四次挥手的优化策略

四次挥手的优化策略

主动方的优化

主动发起 FIN 报文断开连接的一方,如果迟迟没收到对方的 ACK 回复,则会重传 FIN 报文,重传的次数由 tcp_orphan_retries 参数决定。

当主动方收到 ACK 报文后,连接就进入 FIN_WAIT2 状态,根据关闭的方式不同,优化的方式也不同:

  • 如果这是 close 函数关闭的连接,那么它就是孤儿连接。如果 tcp_fin_timeout 秒内没有收到对方的 FIN 报文,连接就直接关闭。同时,为了应对孤儿连接占用太多的资源,tcp_max_orphans 定义了最大孤儿连接的数量,超过时连接就会直接释放。
  • 反之是 shutdown 函数关闭的连接,则不受此参数限制;

当主动方接收到 FIN 报文,并返回 ACK 后,主动方的连接进入 TIME_WAIT 状态。这一状态会持续 1 分钟,为了防止 TIME_WAIT 状态占用太多的资源,tcp_max_tw_buckets 定义了最大数量,超过时连接也会直接释放。

当 TIME_WAIT 状态过多时,还可以通过设置 tcp_tw_reuse 和 tcp_timestamps 为 1 ,将 TIME_WAIT 状态的端口复用于作为客户端的新连接,注意该参数只适用于客户端。

被动方的优化

被动关闭的连接方应对非常简单,它在回复 ACK 后就进入了 CLOSE_WAIT 状态,等待进程调用 close 函数关闭连接。因此,出现大量 CLOSE_WAIT 状态的连接时,应当从应用程序中找问题。

当被动方发送 FIN 报文后,连接就进入 LAST_ACK 状态,在未等到 ACK 时,会在 tcp_orphan_retries 参数的控制下重发 FIN 报文。


03 TCP 传输数据的性能提升

在前面介绍的是三次握手和四次挥手的优化策略,接下来主要介绍的是 TCP 传输数据时的优化策略。

TCP 连接是由内核维护的,内核会为每个连接建立内存缓冲区:

  • 如果连接的内存配置过小,就无法充分使用网络带宽,TCP 传输效率就会降低;
  • 如果连接的内存配置过大,很容易把服务器资源耗尽,这样就会导致新连接无法建立;

因此,我们必须理解 Linux 下 TCP 内存的用途,才能正确地配置内存大小。

滑动窗口是如何影响传输速度的?

TCP 会保证每一个报文都能够抵达对方,它的机制是这样:报文发出去后,必须接收到对方返回的确认报文 ACK,如果迟迟未收到,就会超时重发该报文,直到收到对方的 ACK 为止。

所以,TCP 报文发出去后,并不会立马从内存中删除,因为重传时还需要用到它。

由于 TCP 是内核维护的,所以报文存放在内核缓冲区。如果连接非常多,我们可以通过 free 命令观察到 buff/cache 内存是会增大。

如果 TCP 是每发送一个数据,都要进行一次确认应答。当上一个数据包收到了应答了, 再发送下一个。这个模式就有点像我和你面对面聊天,你一句我一句,但这种方式的缺点是效率比较低的。

按数据包进行确认应答

按数据包进行确认应答

所以,这样的传输方式有一个缺点:数据包的往返时间越长,通信的效率就越低。

要解决这一问题不难,并行批量发送报文,再批量确认报文即刻。

并行处理

并行处理

然而,这引出了另一个问题,发送方可以随心所欲的发送报文吗?当然这不现实,我们还得考虑接收方的处理能力。

当接收方硬件不如发送方,或者系统繁忙、资源紧张时,是无法瞬间处理这么多报文的。于是,这些报文只能被丢掉,使得网络效率非常低。

为了解决这种现象发生,TCP 提供一种机制可以让「发送方」根据「接收方」的实际接收能力控制发送的数据量,这就是滑动窗口的由来。

接收方根据它的缓冲区,可以计算出后续能够接收多少字节的报文,这个数字叫做接收窗口。当内核接收到报文时,必须用缓冲区存放它们,这样剩余缓冲区空间变小,接收窗口也就变小了;当进程调用 read 函数后,数据被读入了用户空间,内核缓冲区就被清空,这意味着主机可以接收更多的报文,接收窗口就会变大。

因此,接收窗口并不是恒定不变的,接收方会把当前可接收的大小放在 TCP 报文头部中的窗口字段,这样就可以起到窗口大小通知的作用。

发送方的窗口等价于接收方的窗口吗?如果不考虑拥塞控制,发送方的窗口大小「约等于」接收方的窗口大小,因为窗口通知报文在网络传输是存在时延的,所以是约等于的关系。

TCP 头部

TCP 头部

从上图中可以看到,窗口字段只有 2 个字节,因此它最多能表达 65535 字节大小的窗口,也就是 64KB 大小。

这个窗口大小最大值,在当今高速网络下,很明显是不够用的。所以后续有了扩充窗口的方法:在 TCP 选项(option)字段定义了窗口扩大因子,用于扩大 TCP 通告窗口,其值大小是 2^14,这样就使 TCP 的窗口大小从 16 位扩大为 30 位(2^16 * 2^ 14 = 2^30),所以此时窗口的最大值可以达到 1GB。

TCP option 选项 - 窗口扩展

TCP option 选项 - 窗口扩展

Linux 中打开这一功能,需要把 tcp_window_scaling 配置设为 1(默认打开):

要使用窗口扩大选项,通讯双方必须在各自的 SYN 报文中发送这个选项:

  • 主动建立连接的一方在 SYN 报文中发送这个选项;
  • 而被动建立连接的一方只有在收到带窗口扩大选项的 SYN 报文之后才能发送这个选项。

这样看来,只要进程能及时地调用 read 函数读取数据,并且接收缓冲区配置得足够大,那么接收窗口就可以无限地放大,发送方也就无限地提升发送速度。

这是不可能的,因为网络的传输能力是有限的,当发送方依据发送窗口,发送超过网络处理能力的报文时,路由器会直接丢弃这些报文。因此,缓冲区的内存并不是越大越好。

如果确定最大传输速度?

在前面我们知道了 TCP 的传输速度,受制于发送窗口与接收窗口,以及网络设备传输能力。其中,窗口大小由内核缓冲区大小决定。如果缓冲区与网络传输能力匹配,那么缓冲区的利用率就达到了最大化。

问题来了,如何计算网络的传输能力呢?

相信大家都知道网络是有「带宽」限制的,带宽描述的是网络传输能力,它与内核缓冲区的计量单位不同:

  • 带宽是单位时间内的流量,表达是「速度」,比如常见的带宽 100 MB/s;
  • 缓冲区单位是字节,当网络速度乘以时间才能得到字节数;

这里需要说一个概念,就是带宽时延积,它决定网络中飞行报文的大小,它的计算方式:

比如最大带宽是 100 MB/s,网络时延(RTT)是 10ms 时,意味着客户端到服务端的网络一共可以存放 100MB/s * 0.01s = 1MB 的字节。

这个 1MB 是带宽和时延的乘积,所以它就叫「带宽时延积」(缩写为 BDP,Bandwidth Delay Product)。同时,这 1MB 也表示「飞行中」的 TCP 报文大小,它们就在网络线路、路由器等网络设备上。如果飞行报文超过了 1 MB,就会导致网络过载,容易丢包。

由于发送缓冲区大小决定了发送窗口的上限,而发送窗口又决定了「已发送未确认」的飞行报文的上限。因此,发送缓冲区不能超过「带宽时延积」。

发送缓冲区与带宽时延积的关系:

  • 如果发送缓冲区「超过」带宽时延积,超出的部分就没办法有效的网络传输,同时导致网络过载,容易丢包;
  • 如果发送缓冲区「小于」带宽时延积,就不能很好的发挥出网络的传输效率。

所以,发送缓冲区的大小最好是往带宽时延积靠近。

怎样调整缓冲区大小?

在 Linux 中发送缓冲区和接收缓冲都是可以用参数调节的。设置完后,Linux 会根据你设置的缓冲区进行动态调节。

调节发送缓冲区范围

先来看看发送缓冲区,它的范围通过 tcp_wmem 参数配置;

上面三个数字单位都是字节,它们分别表示:

  • 第一个数值是动态范围的最小值,4096 byte = 4K;
  • 第二个数值是初始默认值,87380 byte ≈ 86K;
  • 第三个数值是动态范围的最大值,4194304 byte = 4096K(4M);

发送缓冲区是自行调节的,当发送方发送的数据被确认后,并且没有新的数据要发送,就会把发送缓冲区的内存释放掉。

调节接收缓冲区范围

而接收缓冲区的调整就比较复杂一些,先来看看设置接收缓冲区范围的 tcp_rmem 参数:

上面三个数字单位都是字节,它们分别表示:

  • 第一个数值是动态范围的最小值,表示即使在内存压力下也可以保证的最小接收缓冲区大小,4096 byte = 4K;
  • 第二个数值是初始默认值,87380 byte ≈ 86K;
  • 第三个数值是动态范围的最大值,6291456 byte = 6144K(6M);

接收缓冲区可以根据系统空闲内存的大小来调节接收窗口:

  • 如果系统的空闲内存很多,就可以自动把缓冲区增大一些,这样传给对方的接收窗口也会变大,因而提升发送方发送的传输数据数量;
  • 反正,如果系统的内存很紧张,就会减少缓冲区,这虽然会降低传输效率,可以保证更多的并发连接正常工作;

发送缓冲区的调节功能是自动开启的,而接收缓冲区则需要配置 tcp_moderate_rcvbuf 为 1 来开启调节功能:

调节 TCP 内存范围

接收缓冲区调节时,怎么知道当前内存是否紧张或充分呢?这是通过 tcp_mem 配置完成的:

上面三个数字单位不是字节,而是「页面大小」,1 页表示 4KB,它们分别表示:

  • 当 TCP 内存小于第 1 个值时,不需要进行自动调节;
  • 在第 1 和第 2 个值之间时,内核开始调节接收缓冲区的大小;
  • 大于第 3 个值时,内核不再为 TCP 分配新内存,此时新连接是无法建立的;

一般情况下这些值是在系统启动时根据系统内存数量计算得到的。根据当前 tcp_mem 最大内存页面数是 177120,当内存为 (177120 * 4) / 1024K ≈ 692M 时,系统将无法为新的 TCP 连接分配内存,即 TCP 连接将被拒绝。

根据实际场景调节的策略

在高并发服务器中,为了兼顾网速与大量的并发连接,我们应当保证缓冲区的动态调整的最大值达到带宽时延积,而最小值保持默认的 4K 不变即可。而对于内存紧张的服务而言,调低默认值是提高并发的有效手段。

同时,如果这是网络 IO 型服务器,那么,调大 tcp_mem 的上限可以让 TCP 连接使用更多的系统内存,这有利于提升并发能力。需要注意的是,tcp_wmem 和 tcp_rmem 的单位是字节,而 tcp_mem 的单位是页面大小。而且,千万不要在 socket 上直接设置 SO_SNDBUF 或者 SO_RCVBUF,这样会关闭缓冲区的动态调整功能。

小结

本节针对 TCP 优化数据传输的方式,做了一些介绍。

数据传输的优化策略

数据传输的优化策略

TCP 可靠性是通过 ACK 确认报文实现的,又依赖滑动窗口提升了发送速度也兼顾了接收方的处理能力。

可是,默认的滑动窗口最大值只有 64 KB,不满足当今的高速网络的要求,要想要想提升发送速度必须提升滑动窗口的上限,在 Linux 下是通过设置 tcp_window_scaling 为 1 做到的,此时最大值可高达 1GB。

滑动窗口定义了网络中飞行报文的最大字节数,当它超过带宽时延积时,网络过载,就会发生丢包。而当它小于带宽时延积时,就无法充分利用网络带宽。因此,滑动窗口的设置,必须参考带宽时延积。

内核缓冲区决定了滑动窗口的上限,缓冲区可分为:发送缓冲区 tcp_wmem 和接收缓冲区 tcp_rmem。

Linux 会对缓冲区动态调节,我们应该把缓冲区的上限设置为带宽时延积。发送缓冲区的调节功能是自动打开的,而接收缓冲区需要把 tcp_moderate_rcvbuf 设置为 1 来开启。其中,调节的依据是 TCP 内存范围 tcp_mem。

但需要注意的是,如果程序中的 socket 设置 SO_SNDBUF 和 SO_RCVBUF,则会关闭缓冲区的动态整功能,所以不建议在程序设置它俩,而是交给内核自动调整比较好。

有效配置这些参数后,既能够最大程度地保持并发性,也能让资源充裕时连接传输速度达到最大值。


巨人的肩膀

[1] 系统性能调优必知必会.陶辉.极客时间.

[2] 网络编程实战专栏.盛延敏.极客时间.

[3] http://www.blogjava.net/yongboy/archive/2013/04/11/397677.html

[4] http://blog.itpub.net/31559359/viewspace-2284113/

[5] https://blog.51cto.com/professor/1909022

[6] https://vincent.bernat.ch/en/blog/2014-tcp-time-wait-state-linux


唠嗑唠嗑

跟大家说个沉痛的事情。

我想大部分小伙伴都发现了,最近公众号改版,订阅号里的信息流不再是以时间顺序了,而是以推荐算法方式显示顺序。

这对小林这种「周更」的作者,真的一次重重打击,非常的不友好。

因为长时间没发文,公众号可能会把推荐的权重降低,这就会导致很多读者,会收不到我的「最新」的推文,如此下去,那小林文章不就无人问津了?(抱头痛哭 …)

另外,小林更文时间长的原因,不是因为偷懒。

而是为了把知识点「写的更清楚,画的更清晰」,所以这必然会花费更多更长的时间。

如果你认可和喜欢小林的文章,不想错过文章的第一时间推送,可以动动你的小手手,给小林公众号一个「星标」。

平时没事,就让「小林coding」静静地躺在你的订阅号底部,但是你要知道它在这其间并非无所事事,而是在努力地准备着更好的内容,等准备好了,它自然会「蹦出」在你面前。

小林是专为大家图解的工具人,Goodbye,我们下次见!


读者问答

读者问:“小林,请教个问题,somaxconn和backlog是不是都是指的是accept队列?然后somaxconn是内核参数,backlog是通过系统调用间隔地修改somaxconn,比如Linux中listen()函数?”

两者取最小值才是 accpet 队列。

读者问:“小林,还有个问题要请教下,“如果 accept 队列满了,那么 server 扔掉 client 发过来的 ack”,也就是说该TCP连接还是位于半连接队列中,没有丢弃吗?”

  1. 当 accept 队列满了,后续新进来的syn包都会被丢失
  2. 我文章的突发流量例子是,那个连接进来的时候 accept 队列还没满,但是在第三次握手的时候,accept 队列突然满了,就会导致 ack 被丢弃,就一直处于半连接队列。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

熬夜之作:一文带你了解Cat分布式监控

发表于 2020-06-08

Cat 是什么?

CAT(Central Application Tracking)是基于 Java 开发的实时应用监控平台,包括实时应用监控,业务监控。

CAT 作为服务端项目基础组件,提供了 Java, C/C++, Node.js, Python, Go 等多语言客户端,已经在美团点评的基础架构中间件框架(MVC 框架,RPC 框架,数据库框架,缓存框架等,消息队列,配置系统等)深度集成,为美团点评各业务线提供系统丰富的性能指标、健康状况、实时告警等。

CAT 很大的优势是它是一个实时系统,CAT 大部分系统是分钟级统计,但是从数据生成到服务端处理结束是秒级别,秒级定义是 48 分钟 40 秒,基本上看到 48 分钟 38 秒数据,整体报表的统计粒度是分钟级;第二个优势,监控数据是全量统计,客户端预计算;链路数据是采样计算。

Github: https://github.com/dianping/cat[1]

Cat 功能亮点

  • 实时处理:信息的价值会随时间锐减,尤其是事故处理过程中
  • 全量数据:全量采集指标数据,便于深度分析故障案例
  • 高可用:故障的还原与问题定位,需要高可用监控来支撑
  • 故障容忍:故障不影响业务正常运转、对业务透明
  • 高吞吐:海量监控数据的收集,需要高吞吐能力做保证
  • 可扩展:支持分布式、跨 IDC 部署,横向扩展的监控系统

为什么要用 Cat?

场景一:用户反馈 App 无法下单,用户反馈无法支付,用户反馈商品无法搜索等问题

场景一的问题在于当系统出现问题后,第一反馈的总是用户。我们需要做的是什么,是在出问题后研发第一时间知晓,而不是让用户来告诉我们出问题了。

Cat 可以出故障后提供秒级别的异常告警机制,不用再等用户来反馈问题了。

场景二:出故障后如何快速定位问题

一般传统的方式当出现问题后,我们就会去服务器上看看服务是否还存活。如果存活就会看看日志是否有异常信息。

在 Cat 后台的首页,会展示各个系统的运行情况,如果有异常,则会大片飘红,非常明显。最直接的方式还是直接查看 Problem 报表,这里会为我们展示直接的异常信息,快速定位问题。

图片

图片

场景三:用户反馈订单列表要 10 几秒才展示,用户反馈下单一直在转圈圈

场景三属于优化相关,对于研发来说,优化是一个长期的过程,没有最好只有更好。优化除了需要有对应的方案,最重要的是要对症下药。

所谓的对症下药也就是在优化之前,你得先知道哪里比较慢。RPC 调用慢?数据库查询慢?缓存更新慢?

Cat 可以提供详细的性能数据,95 线,99 线等。更细粒度的就是可以看到某个请求或者某个业务方法的所有耗时逻辑,前提是你做了埋点操作。

Cat 报表

Cat 目前有五种报表,每种都有特定的应用场景,下面我们来具体聊聊这些报表的作用。

Transaction 报表

适用于监控一段代码运行情况,比如:运行次数、QPS、错误次数、失败率、响应时间统计(平均影响时间、Tp 分位值)等等场景。

埋点方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码public void shopService() {
Transaction transaction = Cat.newTransaction("ShopService", "Service");
try {
service();
transaction.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
transaction.setStatus(e); // catch 到异常,设置状态,代表此请求失败
Cat.logError(e); // 将异常上报到cat上
// 也可以选择向上抛出: throw e;
} finally {
transaction.complete();
}
}

可以在基础框架中对 Rpc, 数据库等框架进行埋点,这样就可以通过 Cat 来监控这些组件了。

业务中需要埋点也可以使用 Cat 的 Transaction,比如下单,支付等核心功能,通常我们对 URL 进行埋点就可以了,也就包含了具体的业务流程。

图片

图片

Event 报表

适用于监控一段代码运行次数,比如记录程序中一个事件记录了多少次,错误了多少次。Event 报表的整体结构与 Transaction 报表几乎一样,只缺少响应时间的统计。

埋点方式:

1
复制代码 Cat.logEvent("Func", "Func1");

图片

图片

Problem 报表

Problem 记录整个项目在运行过程中出现的问题,包括一些异常、错误、访问较长的行为。

如果有人反馈你的接口报 500 错误了,你进 Cat 后就直接可以去 Problem 报表了,错误信息就在 Problem 中。

图片

图片

Problem 报表不需要手动埋点,我们只需要在项目中集成日志的 LogAppender 就可以将所有 error 异常记录,下面的段落中会讲解如何整合 LogAppender。

Heartbeat 报表

Heartbeat 报表是 CAT 客户端,以一分钟为周期,定期向服务端汇报当前运行时候的一些状态。

图片

图片

系统指标有系统的负载信息,内存使用情况,磁盘使用情况等。

JVM 指标有 GC 相关信息,线程相关信息。

Business 报表

Business 报表对应着业务指标,比如订单指标。与 Transaction、Event、Problem 不同,Business 更偏向于宏观上的指标,另外三者偏向于微观代码的执行情况。

这个报表我也没怎么用过,用的多的还是前面几个。

图片

图片

Cat 在 Kitty Cloud 中的应用

Kitty Cloud 的基础组件是 Kitty,Kitty 里面对需要的一些框架都进行了一层包装,比如扩展,增加 Cat 埋点之类的功能。

Cat 的集成

Kitty 中对 Cat 封装了一层,在使用的时候直接依赖 kitty-spring-cloud-starter-cat 即可整合 Cat 到项目中。

1
2
3
4
5
复制代码 <dependency>
       <groupId>com.cxytiandi</groupId>
       <artifactId>kitty-spring-cloud-starter-cat</artifactId>
       <version>Kitty Version</version>
 </dependency>

然后在 application 配置文件中配置 Cat 的服务端地址信息,多个英文逗号分隔:

1
复制代码cat.servers=47.105.66.210

在项目的 resources 目录下创建 META-INF 目录,然后在 META-INF 中创建 app.properties 文件配置 app.name。此名称是在 Cat 后台显示的应用名

1
复制代码app.name=kitty-cloud-comment-provider

最后需要配置一下 Cat 的 LogAppender,这样应用在记录 error 级别的日志时,Cat 可以及时进行异常告警操作。

在 logback.xml 增加下面的配置:

1
2
3
4
复制代码 <appender name="CatAppender" class="com.dianping.cat.logback.CatLogbackAppender"></appender>
 <root level="INFO">
     <appender-ref ref="CatAppender" />
 </root>

更详细的内容请移步 Cat 的 Github 主页进行查看。

MVC 框架埋点

基于 Spring Boot 做 Web 应用开发,我们最常用到的一个 Starter 包就是 spring-boot-starter-web。

如果你使用了 Kitty 来构建微服务的框架,那么就不再需要直接依赖 spring-boot-starter-web。而是需要依赖 Kitty 中的 kitty-spring-cloud-starter-web。

kitty-spring-cloud-starter-web 在 spring-boot-starter-web 的基础上进行了封装,会对请求的 Url 进行 Cat 埋点,会对一些通用信息进行接收透传,会对 RestTemplate 的调用进行 Cat 埋点。

在项目中依赖 kitty-spring-cloud-starter-web:

1
2
3
4
5
复制代码<dependency>
      <groupId>com.cxytiandi</groupId>
      <artifactId>kitty-spring-cloud-starter-web</artifactId>
      <version>Kitty Version</version>
</dependency>

启动项目,然后访问你的 REST API。可以在 Cat 的控制台看到 URL 的监控信息。

图片

图片

点击 URL 进去可以看到具体的 URL 信息。

图片

图片

再进一步可以看到整个 URL 的信息,比如数据库的查询,缓存的操作,Http 的调用等。后端同学在优化性能的时候就直接从 URL 下手可以将整个请求的链路耗时的情况都分析清楚。

图片

图片

Mybatis 埋点

Kitty 中 Mybatis 是用的 Mybatis Plus, 主要是对数据库相关操作的 SQL 进行了 Cat 埋点,可以很方便的查看 SQL 的耗时情况。

依赖 kitty-spring-cloud-starter-mybatis:

1
2
3
4
5
复制代码 <dependency>
     <groupId>com.cxytiandi</groupId>
     <artifactId>kitty-spring-cloud-starter-mybatis</artifactId>
     <version>Kitty Version</version>
 </dependency>

其他的使用方式还是跟 Mybatis Plus 一样,具体参考 Mybatis Plus 文档:https://mp.baomidou.com[2]

只要涉及到数据库的操作,都会在 Cat 中进行数据的展示。

图片

图片

点击 SQL 进去还可以看到是哪个 Mapper 的操作。

图片

图片

再进一步就可以看到具体的 SQL 语句和消耗的时间。

图片

图片

有了这些数据,后端研发同学就可以对相关的 SQL 进行优化了。

Redis 埋点

如果需要使用 Spring Data Redis 的话,直接集成 kitty-spring-cloud-starter-redis 就可以,kitty-spring-cloud-starter-redis 中对 Redis 的命令进行了埋点,可以在 Cat 上直观的查看对应的命令和消耗的时间。

添加对应的 Maven 依赖:

1
2
3
4
5
复制代码<dependency>
     <groupId>com.cxytiandi</groupId>
     <artifactId>kitty-spring-cloud-starter-redis</artifactId>
     <version>Kitty Version</version>
 </dependency>

直接使用 StringRedisTemplate:

1
2
3
4
复制代码@Autowired
private StringRedisTemplate stringRedisTemplate;
 
stringRedisTemplate.opsForValue().set("name", "yinjihuan");

Cat 中可以看到 Redis 信息。

图片

图片

点击 Redis 进去可以看到有哪些命令。

图片

图片

再进去可以看到命令的详细信息,比如操作的 key 和消耗的时间。

图片

图片

MongoDB 埋点

Kitty 中对 Spring Data Mongodb 做了封装,只对 MongoTemplate 做了埋点。使用时需要依赖 kitty-spring-cloud-starter-mongodb。

1
2
3
4
5
复制代码<dependency>
    <groupId>com.cxytiandi</groupId>
    <artifactId>kitty-spring-cloud-starter-mongodb</artifactId>
    <version>Kitty Version</version>
</dependency>

在发生 Mongo 的操作后,Cat 上就可以看到相关的数据了。

图片

图片

点进去就可以看到是 MongoTemplate 的哪个方法发生了调用。

图片

图片

再进一步就可以看到具体的 Mongo 参数和消耗的时间。

图片

图片

还有 Dubbo, Feign,Jetcache,ElasticSearch 等框架的埋点就不细讲了,感兴趣的可以移步 Github 查看代码。

Cat 使用小技巧

埋点工具类

如果要对业务方法进行监控,我们一般会用 Transaction 功能,将业务逻辑包含在 Transaction 里面,就能监控这个业务的耗时信息。

埋点的方式也是通过 Cat.newTransaction 来进行,具体可以参考上面 Transaction 介绍时给出的埋点示列。

像这种埋点的方式最好是有一个统一的工具类去做,将埋点的细节封装起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码public class CatTransactionManager {
public static <T> T newTransaction(Supplier<T> function, String type, String name, Map<String, Object> data) {
Transaction transaction = Cat.newTransaction(type, name);
if (data != null && !data.isEmpty()) {
data.forEach(transaction::addData);
}
try {
T result = function.get();
transaction.setStatus(Message.SUCCESS);
return result;
} catch (Exception e) {
Cat.logError(e);
if (e.getMessage() != null) {
Cat.logEvent(type + "_" + name + "_Error", e.getMessage());
}
transaction.setStatus(e);
throw e;
} finally {
transaction.complete();
}
}
}

工具类使用:

1
2
3
4
5
6
7
8
9
10
11
复制代码public SearchResponse search(SearchRequest searchRequest, RequestOptions options) {
Map<String, Object> catData = new HashMap<>(1);
catData.put(ElasticSearchConstant.SEARCH_REQUEST, searchRequest.toString());
return CatTransactionManager.newTransaction(() -> {
try {
return restHighLevelClient.search(searchRequest, options);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.SEARCH, catData);
}

通过使用工具类,不再需要每个监控的地方都是设置 Transaction 是否 complete,是否成功这些信息了。

注解埋点

为了让 Transaction 使用更方便,我们可以自定义注解来做这个事情。比如需要监控下单,支付等核心业务方法,那么就可以使用自定义的 Transaction 注解加在方法上,然后通过 AOP 去统一做监控。

定义注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface CatTransaction {
/**
* 类型, 默认为Method
* @return
*/
String type() default "";
/**
* 名称, 默认为类名.方法名
* @return
*/
String name() default "";
/**
* 是否保存参数信息到Cat
* @return
*/
boolean isSaveParamToCat() default true;
}

定义切面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码@Aspect
public class CatTransactionAspect {
@Around("@annotation(catTransaction)")
public Object aroundAdvice(ProceedingJoinPoint joinpoint, CatTransaction catTransaction) throws Throwable {
String type = catTransaction.type();
if (StringUtils.isEmpty(type)){
type = CatConstantsExt.METHOD;
}
String name = catTransaction.name();
if (StringUtils.isEmpty(name)){
name = joinpoint.getSignature().getDeclaringType().getSimpleName() + "." + joinpoint.getSignature().getName();
}
Map<String, Object> data = new HashMap<>(1);
if (catTransaction.isSaveParamToCat()) {
Object[] args = joinpoint.getArgs();
if (args != null) {
data.put("params", JsonUtils.toJson(args));
}
}
return CatTransactionManager.newTransaction(() -> {
try {
return joinpoint.proceed();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}, type, name, data);
}
`}`

注解使用:

1
2
3
4
复制代码@CatTransaction
@Override
public Page<ArticleIndexBO> searchArticleIndex(ArticleIndexSearchParam param) {
}

你可能关心的几个问题

Cat 能做链路跟踪吗?

Cat 主要是一个实时监控系统,并不是一个标准的全链路系统,主要是 Cat 的 logview 在异步线程等等一些场景下,不太合适,Cat 本身模型并不适合这个。Cat 的 Github 上有说明:在美团点评内部,有 mtrace 专门做全链路分析。

但是如果在 Mvc,远程调用等这些框架中做好了数据的无缝传输,Cat 也可以充当一个链路跟踪的系统,基本的场景足够了。

Cat 也可以构建远程消息树,可以看到请求经过了哪些服务,每个服务的耗时等信息。只不过服务之间的依赖关系图在 Cat 中没有。

下图请求从网关进行请求转发到 articles 上面,然后 articles 里面调用了 users 的接口。

图片

图片

Cat 跟 Skywalking 哪个好用?

Skywalking 也是一款非常优秀的 APM 框架,我还没用过,不过看过一些文档,功能点挺全的 ,界面也挺好看。最大的优势是不用像 Cat 一样需要埋点,使用字节码增强的方式来对应用进行监控。

之所以列出这个小标题是因为如果大家还没有用的话肯定会纠结要选择落地哪个去做监控。我个人认为这两个都可以,可以自己先弄个简单的版本体验体验,结合你想要的功能点来评估落地哪个。

用 Cat 的话最好有一套基础框架,在基础框架中埋好点,这样才能在 Cat 中详细的显示各种信息来帮助我们快速定位问题和优化性能。

感兴趣的 Star 下呗:https://github.com/yinjihuan/kitty-cloud[3]

关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。个人微信 jihuan900,欢迎勾搭。

参考资料

[1] https://github.com/dianping/cat: https://github.com/dianping/cat

[2] https://mp.baomidou.com: https://mp.baomidou.com

[3] https://github.com/yinjihuan/kitty-cloud: https://github.com/yinjihuan/kitty-cloud

感兴趣的可以关注下我的微信公众号 猿天地,更多技术文章第一时间阅读。我的GitHub也有一些开源的代码 github.com/yinjihuan

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…805806807…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%