开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Service层的接口是不是多此一举?

发表于 2021-08-16

这是我参与8月更文挑战的第 16 天,活动详情查看:8月更文挑战

今天我们要探讨的问题是:Service层需要接口?

现在结合我参与的项目以及阅读的一些项目源码来看。如果「 项目中使用了像Spring这样的依赖注入框架,那可以不用接口 」!

先来说说为什么使用了依赖注入框架以后,可以不使用接口!

不需要接口的理由

我整理了支持Service层和Dao层需要加上接口的理由,总结下来就这么三个:

  • 可以在尚未实现具体Service逻辑的情况下编写上层代码,如Controller对Service的调用
  • Spring默认是基于动态代理实现AOP的,动态代理需要接口
  • 可以对Service进行多实现

实际上,这三个理由都站不住脚!

先说说第一个理由:「上层可以在下层逻辑没有实现的情况下进行编码」!很典型的面向接口编程,对层与层之间进行了解耦,看起来好像没有问题。

这种开发方式适合不同模块之间是由不同的人或项目组开发的,因为沟通的成本比较大。同时避免由于项目组之间开发进度的差异而相互影响。

不过让我们回想一下,在一般项目开发里面,有多少项目组是按层来切分开发任务的呢?实际上,大部分的项目都是按照功能划分的。即使是现在前后端分离的情况,单纯的后端开发也是按照功能模块进行任务划分,即一个人负责从Controller层到DAO层的完整逻辑处理。在这种情况下,每一层都先定义一个接口,再去实现逻辑,除了增加了开发人员的工作量(当然,如果代码量计入工作量的话,那开发人员应该也不是太排斥接口的!),实际没有任何用处。

如果开发人员想在下层逻辑没有完成的情况下,先开发上层逻辑,可以先编写下层类的空方法来先完成上层的逻辑。

这里推荐一个个人比较喜欢的开发流程,自上向下的编码流程:

  • 先在Controller层编写逻辑,遇到需要委托Service调用的地方,直接先写出调用代码。优先完成Controller层的流程
  • 然后使用IDE的自动补全,对刚才调用下层的代码生成对应的类和方法,在里面添加TODO
  • 等所有的类和方法都补全了,再基于TODO,按照上面的流程去一个个的完善逻辑。

此方法可以使你对业务流程有比较好的理解。

对于第二个理由,就完全不成立了。Spring默认是基于动态代理的,不过通过配置是可以使用CGLib来实现AOP。CGLib是不需要接口的。

最后一个理由是「可以对Service进行多实现」。这个理由不充分,或者说没有考虑场景。实际上在大多数情况下是不需要多实现,或者说可以使用其它方式替代基于接口的多实现。

另外,对于很多使用了接口的项目,项目结构也是有待商榷的!下面,我们结合项目结构来说明。

项目结构与接口实现

一般项目结构都是按层来划分的,如下所示:

  • Controller
  • Service
  • Dao

对于不需要多实现的情况,也就不需要接口了。上面的项目结构即可满足要求。

对于需要多实现的情况,无论是现在需要,还是后面需要。这种情况下,看起来好像是需要接口。此时的项目结构看起来像这样:

  • Controller
  • Service
    • —-接口在一个包中
      • impl —实现在另一个包里
  • Dao

对于上面的结构,我们来考虑多实现的情况下,该怎么处理?

第一种方式,是在Service中新增一个包,在里面编写新的逻辑,然后修改配置文件,将新实现作为注入对象。

  • Controller
  • Service
    • —- 接口在一个包中
      • impl —实现在另一个包里
      • impl2 —新实现在另一个包里
  • Dao

第二种方式,是新增一个Service模块,在里面编写新的逻辑(注意这里的包和原来Service的包不能相同,或者包相同,但是类名不同,否则无法创建类。因为在加载时需要同时加载两个Service模块,如果包名和类名都相同,两个模块的类全限定名就是一样的了!),然后修改配置文件,将新逻辑作为注入对象。

  • Controller
  • Service
    • —- 接口在一个包中
      • impl —实现在另一个包里
  • Service2
    • impl2 —新实现在另一个包里
  • Dao

相对而言,实际第一种方式相对更简单一点,只需要关注包层面。而第二种方式需要关注模块和包两个层面。另外,实际这两种方式都导致了项目中包含了不需要的逻辑代码。因为老逻辑都会被打进包里。

不过,从结构上来看,实际方式二的结构要比方式一的结构更清晰,因为从模块上能区分逻辑。

那有没有办法来结合两者的优点呢?答案是肯定的,而且操作起来也不复杂!

首先将接口和实现独立开,作为一个独立的模块:

  • Controller
  • Service — 接口模块
  • ServiceImpl
    • impl —实现在另一个包里
  • ServiceImpl2
    • impl2 —新实现在另一个包里
  • Dao

其次,调整打包配置,ServiceImpl和ServiceImpl2二选一。既然ServiceImpl和ServiceImpl2是二选一,那ServiceImpl和ServiceImpl2的包结构就可以相同。包结构相同了,那调整了依赖以后,依赖注入相关的配置就不需要调整了。调整后,项目结构看起来像这样:

  • Controller
  • Service — 接口模块
  • ServiceImpl
    • impl —实现在另一个包
  • ServiceImpl2
    • impl —新实现和老实现在相同的包中
  • Dao

现在,ServiceImpl和ServiceImpl2模块中的包结构、类名都是一样的。那我们还需要接口模块吗?

假设,我们把Service接口模块去掉,结构变成了如下所示:

  • Controller
  • Service1 — 老实现
  • Service2 — 新实现
  • Dao

单纯的通过调整模块依赖,是否能实现Service的多实现?答案显而易见吧?

不使用接口的缺点

上面给出了不使用接口的理由。不过不使用接口并不是完全没有缺点的,主要问题就是在进行多实现的时候,没有一个强接口规范。即不能通过实现接口,借助IDE快速生成框架代码。对于没有实现的接口,IDE也能给出错误提醒。

一个不太优雅的解决是,将原来的模块里的代码拷贝一份到新模块中,基于老代码来实现新的逻辑。

所以,如果一个项目需要多实现、且多实现数量较多(不过一般项目不会有多个实现的),则推荐使用接口。否则不需要使用接口。

总结

本文针对「Service层是否需要接口」这个问题,指出需要接口的理由的问题。以及个人对这个问题的观点,希望对大家有一些帮助。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot整合高性能微服务框架 gRPC | 8月

发表于 2021-08-16

这是我参与8月更文挑战的第16天,活动详情查看:8月更文挑战

作者:Tom哥

微信公众号:微观技术

1、简介

在 gRPC 里,客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得我们能够更容易地创建分布式应用和服务。

gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

目前有非常多优秀的开源项目采用 gRPC 作为通信方式,例如说 Kubernetes、SkyWalking、istio 等等。甚至说,Dubbo 自 2.7.5 版本之后,开始提供对 gRPC 协议的支持

gRPC 主要提供了新增两种 RPC 调用方式:

  • 普通 RPC 调用方式,即请求 - 响应模式。
  • 基于 HTTP/2.0 的 streaming 调用方式。

gRPC 服务调用支持同步和异步方式,同时也支持普通的 RPC 和 streaming 模式,可以最大程度满足业务的需求。

streaming 模式,可以充分利用 HTTP/2.0 协议的多路复用功能,实现在一条 HTTP 链路上并行双向传输数据,有效的解决了 HTTP/1.X 的数据单向传输问题,在大幅减少 HTTP 连接的情况下,充分利用单条链路的性能,可以媲美传统的 RPC 私有长连接协议:更少的链路、更高的性能。

图片

gRPC 的网络 I/O 通信基于 Netty 构建,服务调用底层统一使用异步方式,同步调用是在异步的基础上做了上层封装。因此,gRPC 的异步化是比较彻底的,对于提升 I/O 密集型业务的吞吐量和可靠性有很大的帮助。

netty采用多路复用的 Reactor 线程模型:基于 Linux 的 epoll 和 Selector,一个 I/O 线程可以并行处理成百上千条链路,解决了传统同步 I/O 通信线程膨胀的问题。NIO 解决的是通信层面的异步问题,跟服务调用的异步没有必然关系。

应用场景:

公司早期,为了满足业务快速迭代,技术选型随意,经常遇到多种语言开发,比如:java、python、php、.net 等搭建了不同的业务系统。现在考虑平台化技术升级,一些基础功能需要收拢统一,建设若干微服务中心(如:用户中心、权限中心)。基于此背景,如何做技术选型,我们可以考虑使用gRPC。

gRPC实现步骤:

  • 定义一个服务,指定其能够被远程调用的方法(包含参数、返回类型)
  • 在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端请求
  • 在客户端实现一个存根 Stub ,用于发起远程方法调用

图片

gRPC 客户端和服务端可以在多种语言与环境中运行和交互!我们可以很容易地用 Java 创建一个 gRPC 服务端,用 Java、Go、Python、Ruby 来创建 gRPC 客户端来访问它。

2、proto 接口规范

在pom.xml中添加以下依赖项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
xml复制代码<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty</artifactId>
    <version>${grpc.version}</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>${grpc.version}</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>${grpc.version}</version>
</dependency>
  • 引入 grpc-protobuf 依赖,使用 Protobuf 作为序列化库。
  • 引入 grpc-stub 依赖,使用 gRPC Stub 作为客户端。

添加maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
xml复制代码<build>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.5.0.Final</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.5.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.15.0:exe:${os.detected.classifier}</pluginArtifact>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • 引入 os-maven-plugin 插件,从 OS 系统中获取参数。因为需要通过它从 OS 系统中获取 os.detected.classifier 参数。
  • 引入 protobuf-maven-plugin 插件,实现将proto 目录下的protobuf 文件,生成Service 和 Message 类。

定义proto接口规范

1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码service UserService {
    rpc query (UserRequest) returns (UserResponse);
}

message UserRequest {
    string name = 1;

}
message UserResponse {
    string name = 1;
    int32 age = 2;
    string address = 3;
}

点击 IDEA 的「compile」按钮,编译 spring-boot-bulking-grpc-proto 项目,并同时执行 protobuf-maven-plugin 插件进行生成。结果如下图所示:

图片

3、服务端实现

定义注解类,用于扫描Grpc相关接口服务

1
2
3
4
5
6
less复制代码@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface GrpcService {
}

接口实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@GrpcService
public class UserService extends UserServiceGrpc.UserServiceImplBase {

    @Override
    public void query(UserRequest request, StreamObserver<UserResponse> responseObserver) {
        System.out.println(" UserService 接收到的参数,name:" + request.getName());

        UserResponse response = UserResponse.newBuilder().setName("微观技术").setAge(30).setAddress("上海").build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

}

启动grpc server端,监听9091端口,并添加proto定义的接口实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typescript复制代码@Component
public class ServiceManager {
    private Server server;
    private int grpcServerPort = 9091;
    public void loadService(Map<String, Object> grpcServiceBeanMap) throws IOException, InterruptedException {
        ServerBuilder serverBuilder = ServerBuilder.forPort(grpcServerPort);
        // 采用注解扫描方式,添加服务
        for (Object bean : grpcServiceBeanMap.values()) {
            serverBuilder.addService((BindableService) bean);
            System.out.println(bean.getClass().getSimpleName() + " is regist in Spring Boot!");

        }
        server = serverBuilder.build().start();

        System.out.println("grpc server is started at " + grpcServerPort);

        // 增加一个钩子,当JVM进程退出时,Server 关闭
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                if (server != null) {
                    server.shutdown();
                }
                System.err.println("*** server shut down!!!!");
            }
        });
        server.awaitTermination();
    }
}

Server 端启动成功

图片

4、客户端调用

定义接口的Stub实例,用于发起远程服务调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typescript复制代码@Configuration
public class GrpcServiceConfig {
    @Bean
    public ManagedChannel getChannel() {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9091)
                .usePlaintext()
                .build();
        return channel;
    }
    @Bean
    public HelloServiceGrpc.HelloServiceBlockingStub getStub1(ManagedChannel channel) {
        return HelloServiceGrpc.newBlockingStub(channel);
    }
    @Bean
    public UserServiceGrpc.UserServiceBlockingStub getStub2(ManagedChannel channel) {
        return UserServiceGrpc.newBlockingStub(channel);
    }
}

Restful接口调用,访问:http://localhost:8098/query

1
2
3
4
5
6
7
8
9
10
ini复制代码@RequestMapping("/query")
public String query() {
    UserRequest request = UserRequest.newBuilder()
            .setName("微观技术")
            .build();
    UserResponse userResponse = userServiceBlockingStub.query(request);
    String result = String.format("name:%s  , age:%s , address:%s ", userResponse.getName(), userResponse.getAge(), userResponse.getAddress());
    System.out.println(result);
    return result;
}

5、开箱即用 Starter 组件

gRPC 社区暂时没有提供 Spring Boot Starter 库,以简化我们对 gRPC 的配置。不过国内有大神已经开源了一个。

地址:github.com/yidongnan/g…

特性:

  • 在 spring boot 应用中,通过 @GrpcService 自动配置并运行一个嵌入式的 gRPC 服务
  • 使用 @GrpcClient 自动创建和管理你的 gRPC Channels 和 stubs
  • 支持 Spring Cloud (向 Consul 或 Eureka 或 Nacos 注册服务并获取gRPC服务信息)
  • 支持 Spring Sleuth 进行链路跟踪(需要单独引入 brave-instrumentation-grpc)
  • 支持对 server、client 分别设置全局拦截器或单个的拦截器
  • 支持 Spring-Security
  • 支持metric (基于 micrometer / actuator )
  • 也适用于 (non-shaded) grpc-netty

6、项目源码地址

github.com/aalansehaiy…

1
2
3
4
vbscript复制代码三个模块:
spring-boot-bulking-grpc-proto
spring-boot-bulking-grpc-client
spring-boot-bulking-grpc-server

作者介绍:

Tom哥,计算机研究生,校招进阿里,P7技术专家,出过专利,CSDN博客专家。负责过电商交易、社区生鲜、流量营销、互联网金融等业务,多年一线团队管理经验

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spock上手经典

发表于 2021-08-16

这是我参与8月更文挑战的第15天,活动详情查看:8月更文挑战

上一篇文章简单介绍了一下spock的概念,这篇文章开始从实战角度一步步学习spock的应用。因为整个的测试代码都需要用groovy来写,如果有必要的话,可以简单学一些groovy这门语言。如果不想学,用官方给的demo也能完全覆盖基本的需要。

spock有一些重要的概念:

Specification

这是spock的基础类,里面含有大量比较重要的方法。每一个单测类都会继承这个类。

Fields

这里会定义很多字段,这属于setup阶段,把需要mock的spring bean或者mock之后的返回在这里定义。举个例子:

1
2
3
4
5
6
groovy复制代码def studentDao = Mock(StudentDao)
def tester = new StudentService(studentDao: studentDao)
...
given: "设置请求参数"
def student1 = new StudentDTO(id: 1, name: "张三", province: "北京")
def student2 = new StudentDTO(id: 2, name: "李四", province: "上海")

Fixture Methods

固定的方法。这里是Specification中预置的一些方法,有以下几种:

1
2
3
4
csharp复制代码def setupSpec() {}    // runs once -  before the first feature method
def setup() {} // runs before every feature method
def cleanup() {} // runs after every feature method
def cleanupSpec() {} // runs once - after the last feature method

可以理解为数据的预热,和执行完的清理。这在junit中也有类似的概念,作为单元测试必备的方法。

Feature Methods

这就是spock比较优秀的地方了。它规定了一些既定的测试步骤,甚至让非技术人员也能一眼看明白,并且书写也极为简单。分为以下几种:

动态方法名

在执行单测时,每push一个元素,都是按照下面的文字打印出来,尤其适合那些if/else的分支

1
2
3
arduino复制代码def "pushing an element #element on the stack"() {
// blocks go here
}

Given Blocks

顾名思义,这里是输入,可以带一些说明:

1
2
3
scala复制代码given: "setup new stack and input elemet"
def stack = new Stack()
def elem = "push me"

given是可选的,可以用setup()方法代替。

When and Then Blocks

也非常好理解,就是当某条件发生时,结果应当为什么。跟junit中的assert比较相似。举个例子:

1
2
3
4
5
6
7
arduino复制代码when: "push stack"
stack.push(elem)

then: "check stack status"
!stack.empty
stack.size() == 1
stack.peek() == elem

这里的操作非常的多,覆盖面也非常广,但基本上是boolean操作,也包含了exception的判断。后续可以慢慢说。

Expect Blocks

这个和when/then操作比较类似,更适合那种相对简单的判定,比如只是一行表达式的判定:

1
2
3
4
5
ini复制代码when:
def x = Math.max(1, 2)

then:
x == 2
1
2
css复制代码expect:
Math.max(1, 2) == 2

节省代码,也更简洁明了。

Cleanup Blocks

清道夫的角色,需要释放一些资源,不然可能会其他的测试case产生影响。

1
2
3
4
5
6
7
8
scala复制代码given:
def file = new File("/some/path")
file.createNewFile()

// ...

cleanup:
file.delete()

当然你可以选择使用cleanup()方法,但使用cleanup块更加灵活。

Where Blocks

Where永远是最后一个块,而且不能重复,这会在Data Driven Testing中有更大的用处,简单举个例子:

1
2
3
4
5
6
7
8
9
ini复制代码def "computing the maximum of two numbers"() {
expect:
Math.max(a, b) == c

where:
a << [5, 3]
b << [1, 9]
c << [5, 9]
}

expect中声明了计算公式,where中可以针对这个公式做多种计算和预期。这里的含义是,当a=5,b=1则c=5;当a=3,b=9则c=9,这里按照求最大值的方法计算了最大值。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

王者并发课-钻石03:琳琅满目-细数CompletableF

发表于 2021-08-16

欢迎来到《王者并发课》,本文是该系列文章中的第26篇,砖石中的第3篇。

从Java8开始,JDK引入了很多新的特性,包括lambda表达式、流式计算等,以及本文所要详述的CompletableFuture. 关于CompletableFuture,你可能首先会联想到Future接口,对于它我们并不陌生,在ThreadPoolExecutor和ForkJoinPool中都见过它的身影。如果你对此感到困惑的话,不妨先阅读我们的前两篇文章。

Future的接口定义本身并不复杂,使用起来也较为简单,它的核心是get()和isDone()方法。然而,Future的简单也导致了它在某些方面会存在先天性的不足。在某些场景下,Future可能无法满足我们的需求,比如我们无法通过Future实现对并发任务的编排。不过,幸运的是,本文所要介绍的CompletableFuture弥补了Future多方面的不足之处,它可能成为你的最佳之选,这也是本文为什么要谈CompletableFuture的原因。

在这篇文章中,我们将结合Future和线程池,探讨CompletableFuture与Future的不同之处,以及它的核心能力和最佳实践。

一、理解CompletableFuture

1. Future的局限性

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法;

2. CompletableFuture与Future的不同

简单地说,CompletableFuture是Future接口的扩展和增强。CompletableFuture完整地继承了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

3. CompletableFuture初体验

当然,百闻不如一见,既然CompletableFuture如此神乎其神,我们不妨通过一个特定的场景来体验CompletableFuture的用法。

众所周知,王者中有注明的“草丛三杰(B)”,妲己就是其中之一,她蹲草丛的本领可谓一绝。话说这天,妲己远远看见地方小鲁班蹦蹦跳跳地走来,对付这样的脆皮最适合在草丛中来一波操作。于是,妲己侧身躲进了草丛,在小鲁班欢快地路过时,妲己一套熟练的231连招秒杀了小鲁班。小鲁班死不瞑目,因为他甚至还没看清对手的模样,很快啊!

在这个过程中,包含几组动作:捉拿鲁班、打出技能2、打出技能3以及打出技能1. 我们可以通过CompletableFuture的链式调用来表达这些动作:

1
2
3
4
5
6
java复制代码CompletableFuture.supplyAsync(CompletableFutureDemo::活捉鲁班)
.thenAccept(player -> note(player.getName())) // 接收supplyAsync的结果,获得对方名字
.thenRun(() -> attack("2技能-偶像魅力:鲁班受到妲己285点法术伤害,并眩晕1.5秒..."))
.thenRun(() -> attack("3技能-女王崇拜:妲己放出5团狐火,鲁班受到325++点法术伤害..."))
.thenRun(() -> attack("1技能-灵魂冲击:鲁班受到妲己520点点法术伤害..."))
.thenRunAsync(() -> note("鲁班,卒...")); // 使用线程池的其他线程

你看,使用CompletableFuture编排动作是不是很容易?在这段只有6行的代码中,我们已经使用了supplyAsync()和thenAccept()等4中不同的方法,并且同时使用了同步和异步。在以往,如果手工实现的话,至少需要洋洋洒洒几十行代码。那CompletableFuture是如何做到如此神功的呢?接着往下看。

二、CompletableFuture的核心设计

总体而言,CompletableFuture实现了Future和CompletionStage两个接口,并且只有少量的属性。但是,它有近2400余行的代码,并且关系复杂。所以,在核心设计方面,我们不会展开讨论。

现在,你已经知道,Future接口仅提供了get()和isDone这样的简单方法,仅凭Future无法为CompletableFuture提供丰富的能力。那么,CompletableFuture又是如何扩展自己的能力的呢?这就不得不说CompletionStage接口了,它是CompletableFuture核心,也是我们要关注的重点。

顾名思义,根据CompletionStage名字中的“Stage”,你可以把它理解为任务编排中的步骤。所谓步骤,即任务编排的基本单元,它可以是一次纯粹的计算或者是一个特定的动作。在一次编排中,会包含多个步骤,这些步骤之间会存在依赖、链式和组合等不同的关系,也存在并行和串行的关系。这种关系,类似于Pipeline或者流式计算。

既然是编排,就需要维护任务的创建、建立计算关系。为此,CompletableFuture提供了多达50多个方法,在数量上确实庞大且令人瞠目结舌,想要全部理解显然不太可能,当然也没有必要。虽然CompletableFuture的方法数量众多,但是在理解时仍有规律可循,我们可以通过分类的方式简化对方法的理解,理解了类型和变种,基本上我们也就掌握了CompletableFuture的核心能力。

根据类型,这些方法可以总结为以下四类,其他大部分方法都是基于这四种类型的变种:

类型 接收参数 返回结果 支持异步
Supply ✘ ✔︎ ✔︎
Apply ✔︎ ✔︎ ✔︎
Accept ✔︎ ✘ ✔︎
Run ✘ ✘ ✔︎

关于方法的变种

上述接种类型的方法一般都有三个变种方法:同步、异步和指定线程池。比如, thenApply()的三个变种方法如下所示:

1
2
3
java复制代码<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

下面这幅类图,展示了CompletableFuture和Future、CompletionStage以及Completion之间的关系。当然,由于方法众多,这幅图中并没有全部呈现,而是仅选取了部分重要的方法。

三、CompletableFuture的核心用法

前面已经说过,CompletableFuture的核心方法总共分为四类,而这四类方法又分为两种模式:同步和异步。所以,我们从这四类方法中选取了部分核心的API,它们都是我们经常用到的API。

  • 同步:使用当前线程运行任务;
  • 异步:使用CompletableFuture线程池其他线程运行任务,异步方法的名字中带有Async.

1. runAsync

runAsync()是CompletableFuture最常用的方法之一,它可以接收一个待运行的任务并返回一个CompletableFuture .

当我们想异步运行某个任务时,在以往需要手动实现Thread或者借助Executor实现。而通过runAsync()`就简单多了。比如,我们可以直接传入Runnable类型的任务:

1
2
3
4
5
6
java复制代码CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
note("妲己进入草丛蹲点...等待小鲁班出现");
}
});

此外,在Java8及之后的JDK版本中,我们还可以使用lambda表达式进一步简化代码:

1
java复制代码CompletableFuture.runAsync(() -> note("妲己进入草丛蹲点...等待小鲁班出现"));

这样看起来是不是很简单?相信很多同学也是采用这样的方式来使用runAsync(). 不过,如果你也这么用,那么你就要小心了,这里有陷阱。先卖个关子,文末尾会对CompletableFuture线程池做简要的讲解,帮助你如何避免采坑。

2. supply与supplyAsync

对于supply()这个方法,很多人第一印象可能会比较懵,不知道它是做什么的。但其实,它的名字已经说明了一切:所谓“supply”当然是提供结果的!换句话说,当我们使用supply()时,就表明我们会返回一个结果,并且这个结果可以被后续的任务所使用。

举个例子,在下面的示例代码中,我们通过supplyAsync()返回了结果,而这个结果在后续的thenApply()被使用。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码// 创建nameFuture,返回姓名
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "妲己";
});

// 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "爱你," + name;
});

//阻塞获得表白的结果
System.out.println(sayLoveFuture.get()); // 爱你,妲己

你看,一旦理解了supply()的含义,它也就如此简单了。如果你希望用新的线程运行任务,可以使用supplyAsync().

3. thenApply与thenApplyAsync

刚才,在前面我们已经介绍了supply(),已经知道它是用于提供结果的,并且顺带提了thenApply(). 很明显,不用说你可能已经知道thenApply()是supply()的搭档,用于接收supply()的执行结果,并执行特定的代码逻辑,最后返回CompletableFuture结果。

1
2
3
4
5
6
7
8
9
10
java复制代码
// 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "爱你," + name;
});

public <U> CompletableFuture <U> thenApplyAsync(
Function <? super T, ? extends U> fn) {
return uniApplyStage(null, fn);
}

4. thenAccept与thenAcceptAsync

作为supply()的档案,thenApply()并不是唯一的存在,thenAccept()也是。但与thenApply()不同,thenAccept()只接收数据,但不会返回,它的返回类型是Void.

1
2
3
4
5
6
7
8
java复制代码
CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {
System.out.println("爱你," + name);
});

public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) {
return uniAcceptStage(null, action);
}

5. thenRun

thenRun()就比较简单了,不接收任务的结果,只运行特定的任务,并且也不返回结果。

1
2
3
java复制代码public CompletableFuture < Void > thenRun(Runnable action) {
return uniRunStage(null, action);
}

所以,如果你在回调中不想返回任何的结果,只运行特定的逻辑,那么你可以考虑使用thenAccept和thenRun. 一般来说,这两个方法会在调用链的最后面使用。.

6. thenCompose与 thenCombine

以上几种方法都是各玩各的,但thenCompose()与thenCombine()就不同了,它们可以实现对依赖和非依赖两种类型的任务的编排。

编排两个存在依赖关系的任务

在前面的例子中,在接收前面任务的结果时,我们使用的是thenApply(). 也就是说,sayLoveFuture在执行时必须依赖nameFuture的完成,否则执行个锤子。

1
2
3
4
5
6
7
8
9
java复制代码// 创建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "妲己";
});

// 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "爱你," + name;
});

但其实,除了thenApply()之外,我们还可以使用thenCompose()来编排两个存在依赖关系的任务。比如,上面的示例代码可以写成:

1
2
3
4
5
6
7
8
java复制代码// 创建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "妲己";
});

CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {
return CompletableFuture.supplyAsync(() -> "爱你," + name);
});

可以看到,thenCompose()和thenApply()的核心不同之处在于它们的返回值类型:

  • thenApply():返回计算结果的原始类型,比如返回String;
  • thenCompose():返回CompletableFuture类型,比如返回CompletableFuture.

组合两个相互独立的任务

考虑一个场景,当我们在执行某个任务时,需要其他任务就绪才可以,应该怎么做?这样的场景并不少见,我们可以使用前面学过的并发工具类实现,也可以使用thenCombine()实现。

举个例子,当我们计算某个英雄(比如妲己)的胜率时,我们需要获取她参与的总场次(rounds),以及她获胜的场次(winRounds),然后再通过winRounds / rounds来计算。对于这个计算,我们可以这么做:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);

CompletableFuture < Object > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
return 0.0;
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
});
System.out.println(winRateFuture.get());

thenCombine()将另外两个任务的结果同时作为参数,参与到自己的计算逻辑中。在另外两个参数未就绪时,它将会处于等待状态。

7. allOf与anyOf

allOf()与anyOf()也是一对孪生兄弟,当我们需要对多个Future的运行进行组织时,就可以考虑使用它们:

  • allOf():给定一组任务,等待所有任务执行结束;
  • anyOf():给定一组任务,等待其中任一任务执行结束。

allOf()与anyOf()的方法签名如下:

1
2
java复制代码static CompletableFuture<Void>	 allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

需要注意的是,anyOf()将返回完任务的执行结果,但是allOf()不会返回任何结果,它的返回值是Void.

allOf()与anyOf()的示例代码如下所示。我们创建了roundsFuture和winRoundsFuture,并通过sleep模拟它们的执行时间。在执行时,winRoundsFuture将会先返回结果,所以当我们调用 CompletableFuture.anyOf时也会发现输出的是365.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码 CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
return 500;
} catch (InterruptedException e) {
return null;
}
});
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
return 365;
} catch (InterruptedException e) {
return null;
}
});

CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);
System.out.println(completedFuture.get()); // 返回365

CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);

在CompletableFuture之前,如果要实现所有任务结束后执行特定的动作,我们可以考虑CountDownLatch等工具类。现在,则多了一选项,我们也可以考虑使用CompletableFuture.allOf.

四、CompletableFuture中的异常处理

对于任何框架来说,对异常的处理都是必不可少的,CompletableFuture当然也不会例外。前面,我们已经了解了CompletableFuture的核心方法。现在,我们再来看如何处理计算过程中的异常。

考虑下面的情况,当rounds=0时,将抛出运行时异常。此时,我们应该如何处理?

1
2
3
4
5
6
7
8
9
java复制代码CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("总场次错误");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
});
System.out.println(winRateFuture.get());

在CompletableFuture链式调用中,如果某个任务发生了异常,那么后续的任务将都不会再执行。对于异常,我们有两种处理方式:exceptionally()和handle().

1. 使用exceptionally()回调处理异常

在链式调用的尾部使用exceptionally(),捕获异常并返回错误情况下的默认值。需要注意的是,exceptionally()仅在发生异常时才会调用。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("总场次错误");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
}).exceptionally(ex -> {
System.out.println("出错:" + ex.getMessage());
return "";
});
System.out.println(winRateFuture.get());

2. 使用handle()处理异常

除了exceptionally(),CompletableFuture也提供了handle()来处理异常。不过,与exceptionally()不同的是,当我们在调用链中使用了handle(),那么无论是否发生异常,都会调用它。所以,在handle()方法的内部,我们需要通过 if (ex != null) 来判断是否发生了异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("总场次错误");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
}).handle((res, ex) -> {
if (ex != null) {
System.out.println("出错:" + ex.getMessage());
return "";
}
return res;
});
System.out.println(winRateFuture.get());

当然,如果我们允许某个任务发生异常而不中断整个调用链路,那么可以在其内部通过try-catch消化掉。

五、CompletableFuture中的线程池

在前面我们已经说过CompletableFuture中的任务有同步、异步和指定线程池三个变种。比如,当我们调用thenAccept()时,将不会使用新的线程,而是使用当前线程。而当我们使用thenAcceptAsync()时,则会创建新的线程。那么,在前面的所有示例中,我们都从未创建过线程,CompletableFuture又是如何创建新线程的?

答案是ForkJoinPool.commonPool(),我们熟悉的老朋友又回来了,还是它。当需要新的线程时,CompletableFuture会从commonPool中获取线程,相关源码如下:

1
2
3
4
java复制代码public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

可问题是,我们已经知道了commonPool的潜在风险,在生产环境中使用无异于给自己挖坑。那怎么办呢?当然是自定义线程池,如此重要的东西务必要掌握在自己的手上。换句话说,当我决定使用CompletableFuture的时候,默认就是我们要创建自己的线程池。不要偷懒,更不要存在侥幸心理。

CompletableFuture中每个核心类型的方法都提供了自定义线程池的重载,使用起来也较为简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码
// supplyAsync中可以指定线程池的方法
public static < U > CompletableFuture < U > supplyAsync(Supplier < U > supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

// 自定义线程池示例
Executor executor = Executors.newFixedThreadPool(10);

CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
return 500;
} catch (InterruptedException e) {
return null;
}
}, executor);

小结

至此,关于CompletableFuture的讲解已经全部结束。我们已经知道,CompletableFuture是Future的扩展和增强,并提供了大量强大且好玩的优秀特性。这些特性可以帮助我们优雅地解决一些场景问题,而在此之前我们要实现相同的方案可能要花费很大的代价。

当然,CompletableFuture这朵玫瑰虽然很漂亮,但它的刺也同样尖锐,它并不是天生完美。因此,在使用CompletableFuture时仍要遵循一些必要的约束:

  • 自定义线程池:当你决定在生产环境使用CompletableFuture的时候,你应该同时准备好对应的线程池策略,而不是偷懒地使用默认的线程池;
  • 团队共识:技术就是这样,好与不好总是会有不同的标准。当你说好的时候,你的队友可能并不这么认为,反之你也可能也会反对某种技术观点。因此,当你决定采用CompletableFuture的时候,最好和团队同步你的策略,让大家都了解它的优点和潜在的风险,各行其是绝对不是好的策略。

最后,CompletableFuture的源码有近2400行,并且有大量的API. 说实话,在王者系列所分析的源码文章中,CompletableFuture的源码是截止目前最难以理解的。如果将源码展开讲解的话,大概需要数万字,这将直接劝退百分之九十以上的读者。所以,我们也不建议你硬啃所有的源码,而是建议在归纳分类的基础上,有针对性地掌握它的重点部分。当然,如果你饶有兴趣地读完了它所有的源码,在此给你点赞。

正文到此结束,恭喜你又上了一颗星✨

夫子的试炼

  • 动手:编写代码体验runAsync() 的用法,并指定线程池。

延伸阅读与参考资料

  • 《王者并发课》大纲与更新进度总览:juejin.cn/post/696727…
  • thepracticaldeveloper.com/differences…

关于作者

从业近十年,先后从事敏捷与DevOps咨询、Tech Leader和管理等工作,对分布式高并发架构有丰富的实战经验。热衷于技术分享和特定领域书籍翻译,掘金小册《高并发秒杀的设计精要与实现》作者。


关注公众号【MetaThoughts】,及时获取文章更新和文稿。

如果本文对你有帮助,欢迎点赞、关注、监督,我们一起从青铜到王者。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ChaosBlade:从混沌工程实验工具到混沌工程平台 混沌

发表于 2021-08-16

简介: ChaosBlade 是阿里巴巴 2019 年开源的混沌工程项目,已加入到 CNCF Sandbox 中。起初包含面向多环境、多语言的混沌工程实验工具 chaosblade,到现在发展到面向多集群、多环境、多语言的混沌工程平台 chaosblade-box,平台支持实验工具托管和工具自动化部署,通过统一用户实验界面,将用户的精力聚焦在通过混沌工程解决云原生过程中高可用问题上。本文从混沌实验模型抽象、混沌实验工具开源和混沌工程平台升级项目三阶段出发,详细介绍 ChaosBlade。

作者 | 肖长军(穹谷) 桑杰

ChaosBlade 是阿里巴巴 2019 年开源的混沌工程项目,已加入到 CNCF Sandbox 中。起初包含面向多环境、多语言的混沌工程实验工具 chaosblade,到现在发展到面向多集群、多环境、多语言的混沌工程平台 chaosblade-box,平台支持实验工具托管和工具自动化部署,通过统一用户实验界面,将用户的精力聚焦在通过混沌工程解决云原生过程中高可用问题上。本文从混沌实验模型抽象、混沌实验工具开源和混沌工程平台升级项目三阶段出发,详细介绍 ChaosBlade。

在今年可信云测评中,阿里云故障演练平台以最高分首批通过可信云混沌工程平台能力要求最高等级-先进级认证。

混沌实验模型

ChaosBlade 项目覆盖基础资源、应用服务、容器服务等混沌实验场景。在实验工具设计之初就考虑了场景模型统一,便于场景扩展和沉淀,也为平台托管实验工具实现统一场景调用提供模型依据。ChaosBlade 项目中所有的实验场景均遵循此实验模型设计,下面通过实验模型的推导、介绍、意义和具体的应用来详细介绍此模型。

1、实验模型的推导

混沌实验主要包含故障模拟,我们一般对故障的描述如下:

  • 10.0.0.1 机器上挂载的 A 磁盘满造成了服务不可用;
  • 所有节点上的 B dubbo 服务因为执行缓慢造成上游 A dubbo 服务调用延迟,从而造成用户访问缓慢;
  • Kubernetes A 集群中 B 节点上 CPU 所有核使用率满载,造成 A 集群中的 Pod 调度异常;
  • Kubernetes C 集群中 D Pod 网络异常,造成 D 相关的 Service 访问异常。

通过上述,我们可以使用以下句式来描述故障:因为某某机器(或集群中的资源,如 Node,Pod)上的哪个组件发生了什么故障,从而造成了相关影响。我们也可以通过下图来看故障描述拆分:

可以通过这四部分来描述现有的故障场景,所有我们抽象出了一个故障场景模型,也称为混沌实验模型。

2、实验模型的介绍

此实验模型详细描述如下:

  • Scope: 实验实施范围,指具体实施实验的机器、集群及其资源等。
  • Target: 实验靶点,指实验发生的组件。如基础资源场景中的 CPU、网络、磁盘等,Java 场景中的应用组件如 Dubbo、Redis、RocketMQ、JVM 等,容器场景中的 Node、Pod、Container自身等。
  • Matcher: 实验规则匹配器,根据所配置的 Target,定义相关的实验匹配规则,可以配置多个。由于每个 Target 可能有各自特殊的匹配条件,比如 RPC 领域的 Dubbo、gRPC 可以根据服务提供者提供的服务和服务消费者调用的服务进行匹配,缓存领域的 Redis,可以根据 set、get 操作进行匹配。还可以对 matcher 进行扩展,比如扩展实验场景执行策略,控制实验触发时间。
  • Action: 指实验模拟的具体场景,Target 不同,实施的场景也不一样,比如磁盘,可以演练磁盘满,磁盘 IO 读写高,磁盘硬件故障等。如果是应用,可以抽象出延迟、异常、返回指定值(错误码、大对象等)、参数篡改、重复调用等实验场景。如果是容器服务,可以模拟 Node、Pod、Container 资源异常或者其上的基础资源异常等。

使用此模型可以很清晰表达出以下实施混沌实验需要明确的问题:

  • 混沌实验的实施范围是什么
  • 实施混沌实验的对象是什么
  • 实验对象触发实验的条件有哪些
  • 具体实施什么实验场景

3、实验模型的意义

此模型具有以下特点:

  • 简洁:层次清晰,通俗易懂;
  • 通用:覆盖目前所有的故障场景,包含基础资源、应用服务、容器服务、云资源等;
  • 易实现:很方便的定义清晰的接口规范,实验场景扩展实现简单;
  • 语言、领域无关:可以扩展多语言、多领域的模型实现。

此模型具有以下的意义:

  • 更精准的描述混沌实验场景;
  • 更好的理解混沌实验注入;
  • 方便沉淀现有的实验场景;
  • 依据模型发掘更多的场景;
  • 混沌实验工具更加规范、简洁。

4、实验模型的应用

混沌实验模型的应用可归纳为以下几点:

  • 混沌实验模型使实验场景变量参数化,参数规范化;
  • 可遵循模型实现实验场景领域化的水平扩展;
  • 可将混沌实验模型和领域内标准化实现相结合,便捷实现领域内场景垂直扩展;
  • 上层的领域场景可以复用遵循混沌实验模型定义的场景;
  • 通过混沌实验模型声明的场景描述可以很好的接入到 ChaosBlade 中;
  • 遵循实验模型可以很方便的构建上层混沌实验平台。

下文重点介绍基于此模型实现的混沌工程工具 ChaosBlade。

混沌工程实验工具:ChaosBlade

阿里巴巴内部从最早引入混沌工程解决微服务的依赖问题,到业务服务、云服务稳态验证,进一步升级到公共云、专有云的业务连续性保障,以及在验证云原生系统的稳定性等方面积累了比较丰富的场景和实践经验。并且当时混沌工程相关的开源工具存在场景能力分散、上手难度大、缺少实验模型标准,场景难以扩展和沉淀等问题。这些问题就会导致很难实现平台化,你很难通过一个平台去囊括这些工具。所以开源混沌工程实验执行工具 chaosblade,下面通过场景介绍、使用方式、架构设计和案例来详细介绍此工具。

1、混沌实验场景

Chaosblade 工具设计初期就考虑了易用性和场景扩展的便捷性,方便大家上手使用以及根据各自需要扩展更多的实验场景,遵循混沌实验模型提供了统一的操作简洁的执行工具。混沌实验工具支持 Linux、Windows、Docker、Kubernetes等系统平台,覆盖 Java、Golang、NodeJS、C++ 语言应用,共涉及 200 多个实验场景,3000 多个实验参数(v1.0.0-GA)。目前包含的场景领域如下:

  • 基础资源:比如 CPU、内存、网络、磁盘、进程、内核等
  • 应用服务:比如数据库、缓存、消息、JVM 本身、微服务等,还可以指定任意类方法注入各种复杂的实验场景;指定任意方法或某行代码注入延迟、变量和返回值篡改等实验场景
  • Docker 容器:比如杀容器、容器内 CPU、内存、网络、磁盘、进程等实验场景
  • Kubernetes 平台:比如节点上 CPU、内存、网络、磁盘、进程实验场景,Pod 网络和 Pod 本身实验场景如杀 Pod,容器的实验场景如上述的 Docker 容器实验场景
  • 云资源:比如阿里云 ECS 宕机等实验场景

2、工具使用方式

ChaosBlade 是个直接下载解压就可以使用的工具,不需要安装,然后它支持的调用方式包含 CLI 方式,直接执行 blade 命令。

比如这里举的做网络延迟的例子,你添加 -h 参数就可以看到非常完善的命令提示,比如我要一个 9520 端口调用做网络丢包,对齐前面的实验模型,我们就可以看到,它的演练目标是 network,它的 action 是丢包,它的 matcher 就是调用远程的一个服务端口 9520。执行成功后会返回实验结果,每一个实验场景我们都会作为一个对象,它会返回一个实验对象的 UID,此 UID 用于后续的实验管理,比如销毁、查询实验都是通过此 UID 来做的。要销毁实验,也就是恢复实验,直接执行 blade destroy 命令就可以了。

ChaosBlade 另一种调用方式是 Web 方式,通过执行 server 命令对外暴露 HTTP 服务,那么在上层,你如果自己构建混沌实验平台的话,你直接可以通过 HTTP 请求去调用就可以。

3、工具架构设计

ChaosBlade 依据领域实现封装成各自独立的项目,每个项目根据各领域的最佳实践来实现,不仅能满足各领域使用习惯,而且还可以通过混沌实验模型来建立与 chaosblade cli 项目的关系,方便使用 chaosblade 来统一调用,各领域下的实验场景依据混沌实验模型生成 yaml 文件描述,暴露给上层混沌实验平台,混沌实验平台根据实验场景描述文件的变更,自动感知实验场景的变化,无需新增场景时再做平台开发,使混沌平台更加专注于混沌工程其他部分。目前包含的执行器项目如下:

  • chaosblade:混沌实验管理工具,包含创建实验、销毁实验、查询实验、实验环境准备、实验环境撤销等命令,是混沌实验的执行工具,执行方式包含 CLI 和 HTTP 两种。提供完善的命令、实验场景、场景参数说明,操作简洁清晰。
  • chaosblade-spec-go: 混沌实验模型 Golang 语言定义,便于使用 Golang 语言实现的场景都基于此规范便捷实现。
  • chaosblade-exec-os: 基础资源实验场景实现,如CPU、网络、内存、磁盘等。
  • chaosblade-exec-docker: Docker 容器实验场景实现,通过调用 Docker API 标准化实现。
  • chaosblade-operator: Kubernetes 平台实验场景实现,将混沌实验通过 Kubernetes 标准的 CRD 方式定义,很方便的使用 Kubernetes 资源操作的方式来创建、更新、删除实验场景,包括使用 kubectl、client-go 等方式执行,而且还可以使用上述的 chaosblade cli 工具执行。
  • chaosblade-exec-jvm: Java 应用实验场景实现,使用 Java Agent 技术动态挂载,无需任何接入,零成本使用,而且支持卸载,完全回收 Agent 创建的各种资源。
  • chaosblade-exec-cplus: C++ 应用实验场景实现,使用 GDB 技术实现方法、代码行级别的实验场景注入。

4、工具使用案例

通过一个 Dubbo 微服务案例,来介绍 chaosblade 工具的使用。这个微服务 Demo 分三级调用,consumer 调用 provider,provider 调用 base,同时 provider 还调用 mk-demo 数据库,provider 和 base 服务具有两个实例。

这个案例执行的实验场景是数据库调用延迟,我们先定义监控指标:慢 SQL 数和告警信息,做出期望假设:慢 SQL 数增加,钉钉群收到慢 SQL 告警。接下来执行实验。我们直接使用 chaosblade 工具执行,可以看下左下角,我们对 demo-provider 注入调用 mysql 查询时,若数据库是 demo 且表名是 d_discount,则对 50% 的查询操作延迟 600 毫秒。

我们使用阿里云产品 ARMS 做监控告警。大家可以看到,当执行完混沌实验后,很快钉钉群里就收到了报警。所以我们对比下之前定义的监控指标,是符合预期的。但需要注意的是这次符合预期并不代表以后也符合,所以需要通过混沌工程持续性的验证。出现慢 SQL,可通过 ARMS 的链路追踪来排查定位,可以很清楚的看出哪条语句执行慢。

混沌工程平台:chaosblade-box

为了让使用者将精力聚焦在通过混沌工程解决系统高可用问题上,而不是实验工具的选择、部署上,所以将 ChaosBlade 品牌进行升级,开源 chaosblade-box 混沌工程平台。平台托管主流的混沌实验工具,实现工具自动化的部署,通过统一的操作页面实现混沌工程实施。

下面通过平台的功能特点、架构设计及使用案例来介绍混沌工程平台 chaosblade-box。

1、平台功能特点

具备以下功能特点:

  • 支持开源实验工具托管:平台可托管业界主流的实验工具,如自身的 chaosblade 和外部的 litmuschaos 等。后续也会托管 chaos mesh 实验工具。
  • 具备丰富的实验场景:包含基础资源(CPU、内存、网络、磁盘、进程、内核、文件等)、多语言应用服务(Java、C++、NodeJS、Golang 等)、Kubernetes 平台(覆盖 Container、Pod、Node 资源场景,包含上述实验场景)。
  • 实验工具自动化部署:无需手动部署实验工具,实现实验工具在主机或集群上自动化部署。
  • 统一混沌实验用户界面:用户无需关心不同工具的使用方式,在统一用户界面进行混沌实验。
  • 多维度实验方式:支持从主机到 Kubernetes 资源,再到应用维度进行实验编排。
  • 集成云原生生态:采用 Helm 部署管理,集成 Prometheus 监控,支持云原生实验工具托管等。

2、平台架构设计

通过控制台页面可实现 chaosblade、litmuschaos 等已托管工具自动化部署,按照社区的建立的混沌实验模型统一实验场景,根据主机、Kubernetes、应用来划分目标资源,通过目标管理器来控制,在实验创建页面,可以实现白屏化的目标资源选择。平台通过调用混沌实验执行来执行不同工具的实验场景,配合接入 prometheus 监控,可以观察实验 metric 指标,后续会提供丰富的实验报告。Chaosblade-box 的部署也非常简单,具体可以查看:

github.com/chaosblade-…

3、使用说明

安装部署完成后,通过配置 Kubernetes 集群或者主机信息,可以在机器列表页面看到集群或主机数据。选择实验管理创建实验,演练维度支持主机、Node、Pod、Container 维度,选择相应的维度后,会出现对应的资源列表,可以很方便的选择。演练内容包含所托管的所有实验场景。完成实验创建后,自动跳转到演练详情页面,点击执行跳到任务详情页。

演练任务详情页面展示实验的基本信息和实验任务状态,可以很方便的控制实验,以及明确实验任务状态。

未来规划

1、chaosblade

ChaosBlade 未来以云原生为基础,提供面向多集群、多环境、多语言的混沌工程平台和混沌工程实验工具。实验工具继续聚焦在实验场景丰富度和稳定性方面,支持更多的 Kubernetes 资源场景和规范应用服务实验场景标准,提供多语言实验场景标准实现。

2、chaosblade-box

后续会将阿里云故障演练平台(可信云混沌工程平台先进型认证)核心功能开源,与现有的混沌工程平台进行融合,实现更多能力的开放。同时简化混沌工程工具部署实施方面,后续会托管更多的混沌实验工具和兼容主流的平台,实现场景推荐,提供业务、系统监控集成,输出实验报告,在易用的基础上完成混沌工程操作闭环。

作者介绍:

肖长军(花名:穹谷):阿里巴巴技术专家,开源项目 ChaosBlade Founder&Maintainer,阿里云故障演练平台端侧负责人,可信云标准专家,混沌工程布道师,多年分布式系统架构和稳定性建设经验。

桑杰:就职于中国农业银行研发中心,从事于财务相关系统大数据研发工作。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

五分钟带你体验一把分布式事务!so easy!

发表于 2021-08-16

@[toc]
网上关于分布式事务讲理论的多,讲实战的少,今天我想通过一个案例,来让小伙伴们感受一把分布式事务,咱们今天尽量少谈点理论。咱们今天的主角是 Seata!

分布式事务涉及到很多理论,如 CAP,BASE 等,很多小伙伴刚看到这些理论就被劝退了,所以我们今天不讲理论,咱们就看个 Demo,通过代码快速体验一把什么是分布式事务。

  1. 什么是 Seata?

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata 支持的事务模式有四种分别是:

  • Seata AT 模式
  • Seata TCC 模式
  • Seata Saga 模式
  • Seata XA 模式

Seata 中有三个核心概念:

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围,开始全局事务、提交或回滚全局事务。
  • RM ( Resource Manager ) - 资源管理器:管理分支事务处理的资源( Resource ),与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

其中,TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端。

这些概念小伙伴们作为一个了解即可,不了解也能用 Seata,了解了更能理解 Seata 的工作原理。

  1. 搭建 Seata 服务端

我们先来把 Seata 服务端搭建起来。

Seata 下载地址:

  • github.com/seata/seata…

目前最新版本是 1.4.2,我们就使用最新版本来做。

这个工具在 Windows 或者 Linux 上部署差别不大,所以我这里就直接部署在 Windows 上了,方便一些。

我们首先下载 1.4.2 版本的 zip 压缩包,下载之后解压,然后在 conf 目录中配置两个地方:

  1. 首先配置 file.conf 文件

file.conf 中配置 TC 的存储模式,TC 的存储模式有三种:

  • file:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高。
  • db:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点。
  • redis:适合集群模式,全局事务会话信息通过 redis 共享,相对性能好点,但是要注意,redis 模式在 Seata-Server 1.3 及以上版本支持,性能较高,不过存在事务信息丢失的风险,所以需要开发者提前配置适合当前场景的 redis 持久化配置。

这里我们为了省事,配置为 file 模式,这样事务会话信息读写在内存中完成,持久化则写到本地 file,如下图:

如果配置 db 或者 redis 模式,大家记得填一下下面的相关信息。具体如下图:

题外话

注意,如果使用 db 模式,需要提前准备好数据库脚本,如下(小伙伴们可以直接在公众号江南一点雨后台回复 seata-db 下载这个数据库脚本):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
sql复制代码CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata2` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;

USE `seata2`;

/*Table structure for table `branch_table` */

DROP TABLE IF EXISTS `branch_table`;

CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `branch_table` */

/*Table structure for table `global_table` */

DROP TABLE IF EXISTS `global_table`;

CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `global_table` */

/*Table structure for table `lock_table` */

DROP TABLE IF EXISTS `lock_table`;

CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(128) DEFAULT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`branch_id` bigint(20) NOT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

另外还需要注意的是自己的数据库版本信息,改数据库连接的时候按照实际情况修改,Seata 针对 MySQL5.x 和 MySQL8.x 都提供了对应的数据库驱动(在 lib 目录下),我们只需要把驱动改好就行了。

  1. 再配置 registry.conf 文件

registry.conf 主要配置 Seata 的注册中心,我们这里采用大家比较熟悉的 Eureka,配置如下:

可以看到,支持的配置中心比较多,我们选择 Eureka,选好配置中心之后,记得修改配置中心相关的信息。

OK,现在就配置完成了,但是先别启动,还差一个 Eureka 注册中心。

  1. 项目配置

接下来我们配置项目。

Seata 官方提供了一个非常经典的 Demo,我们直接来看这个 Demo。

官方案例下载地址:github.com/seata/seata…

不过这里是很多案例混在一起的,可能看起来会比较乱,而且由于要下载的依赖比较多,所以极有可能依赖下载失败,因此大家也可以在公众号后台回复 seata-demo 获取松哥整理好的案例,直接导入即可,如下图:

这是一个商品下单的案例,我来和大家稍微解释下:

  • eureka:这是服务注册中心。
  • account:这是账户服务,可以查询/修改用户的账户信息(主要是账户余额)。
  • order:这是订单服务,可以下订单。
  • storage:这是一个仓储服务,可以查询/修改商品的库存数量。
  • bussiness:这是业务,用户下单操作将在这里完成。

这个案例讲了一个什么事呢?

当用户想要下单的时候,调用了 bussiness 中的接口,bussiness 中的接口又调用了它自己的 service,在 service 中,首先开启了全局分布式事务,然后通过 feign 调用 storage 中的接口去扣库存,然后再通过 feign 调用 order 中的接口去创建订单(order 在创建订单的时候,不仅会创建订单,还会扣除用户账户的余额),在扣除库存并完成订单创建之后,接下来会去检查用户的余额和库存数量是否正确,如果用户余额为负数或者库存数量为负数,则会进行事务回滚,否则提交事务。

本案例具体架构如下图:

这个案例就是一个典型的分布式事务问题,storage 和 order 中的事务分属于不同的微服务,但是我们希望他们同时成功或者同时失败。

现在大家明白了这个案例是干嘛的,我们就来把它跑起来。

首先创建一个名为 seata 的数据库,然后执行上面代码中的 all.sql 数据脚本。

接下来用 idea 打开上面这个项目,在每一个项目的 application.properties 文件中(Eureka 不用改),修改数据的连接信息,如下图:

除了 Eureka 之外,另外四个都要改哦。

OK,配置结束。

  1. 启动测试

首先启动 Eureka。

接下来先别记着启动其他服务,先启动 Seata Server,也就是我们第二小节配置的那个服务,在它的 bin 目录下,Windows 下双击/Linux 下执行启动脚本。

最后再分别启动剩下的四个服务,启动完成后,我们可以在 Eureka 中查看相关信息:

可以看到,各个服务都注册上来了。

接下来我们访问 bussiness 中提供的两个测试接口。

第一个测试接口是:

http://127.0.0.1:8084/purchase/commit

这个接口对应的代码是:io.seata.sample.controller.BusinessController#purchaseCommit,这个地方是模拟 U100000 用户购买了 30 个 C100000 商品,每个商品的价格是 100,商品库存是 200,用户账户余额是 10000,所以购买之后,商品库存变为 170,用户账户余额变为 7000。这是正常购买的情况。

1
2
3
4
5
6
7
8
9
java复制代码@RequestMapping(value = "/purchase/commit", produces = "application/json")
public String purchaseCommit() {
try {
businessService.purchase("U100000", "C100000", 30);
} catch (Exception exx) {
return exx.getMessage();
}
return "全局事务提交";
}

当我们调完这个接口之后,就可以去数据库查看相应的数据。

第二个测试的接口是:

http://127.0.0.1:8084/purchase/rollback

这个接口对应的代码是:io.seata.sample.controller.BusinessController#purchaseRollback,这次是模拟用户购买 99999 个商品,无论是用户账户余额还是商品库存数量,都无法支撑这次购买行为,因此这个接口的调用最终会回滚,数据库中的数据会保持原样。

1
2
3
4
5
6
7
8
9
java复制代码@RequestMapping("/purchase/rollback")
public String purchaseRollback() {
try {
businessService.purchase("U100000", "C100000", 99999);
} catch (Exception exx) {
return exx.getMessage();
}
return "全局事务提交";
}

这就是一个分布式事务案例。

小伙伴们感兴趣也可以研究一下官方这个案例,我们会发现这里的东西非常简单,单纯是如下方法上多了一个注解而已(io.seata.sample.service.BusinessService#purchase):

1
2
3
4
5
6
7
8
java复制代码@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
storageFeignClient.deduct(commodityCode, orderCount);
orderFeignClient.create(userId, commodityCode, orderCount);
if (!validData()) {
throw new RuntimeException("账户或库存不足,执行回滚");
}
}

purchase 方法用 @GlobalTransactional 注解标记了下,就开启了全局事务了,里边的两个调用都是 feign 的调用,对应了不同的服务,最后再做一个数据校验,校验失败就抛出异常,一旦该方法抛出异常,上面已经执行的代码就会回滚。

这个项目其余的代码都是微服务中的常规代码,就不赘述了。

  1. 实现原理

我们稍微来说下 Seata 中这个分布式事务的原理,先来看一张图:

这张图非常清晰的描述了上面的案例,大致流程如下:

  1. 有三个概念:TM、RM、TC,这些我们在第一小节已经介绍过了,这里就不再赘述。
  2. 首先由 Business 开启全局事务。
  3. 接下来 Business 在调用 Storage 和 Order 的时候,这两个在数据库操作之前都会向 TC 注册一个分支事务并提交。
  4. 分支事务在操作时,都会向 undo_log 表中提交一条记录,当全局事务提交的时候会清空 undo_log 表中的记录,否则将以该表中的记录为依据进行反向补偿(将数据恢复原样)。

具体到上面的案例,事务提交分两个阶段,过程如下:

一阶段:

  1. 首先 Business 开启全局事务,这个过程中会向 TC 注册,然后会拿到一个 xid,这是一个全局事务 id。
  2. 接下来在 Business 中调用 Storage 微服务。
  3. 来解析 SQL:得到 SQL 的类型(UPDATE),表(storage_tbl),条件(where commodity_code = ‘C100000’)等相关的信息。
  4. 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。

  1. 执行业务 SQL,也就是做真正的数据更新操作。
  2. 查询后镜像:根据前镜像的结果,通过主键定位数据。

  1. 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。

branch_id 和 xid 分别表示分支事务(即 Storage 自己的事务)和全局事务的 id,rollback_info 中保存着前后镜像的内容,这个将作为反向补偿(回滚)的依据,这个字段的值是一个 JSON,松哥挑出来这个 JSON 中比较重要的一部分来和大家分享:

  • beforeImage:这个是修改前数据库中的数据,可以看到每个字段的值,id 为 4,count 的值为 200。
  • afterImage:这个是修改后数据库中的数据,可以看到,此时 id 为 4,count 的值为 170。


  1. Storage 在提交前,会向 TC 注册分支:申请 storage_tbl 表中,主键值等于 4 的记录的全局锁。
  2. 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
  3. 同理,Order 和 Account 也按照上面的步骤提交数据。

以上 1-10 步就是一阶段的数据提交。

再来看二阶段:

二阶段有两种可能,提交或者回滚。

还是以上面的案例为例:

1
2
3
4
5
6
7
8
java复制代码@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
storageFeignClient.deduct(commodityCode, orderCount);
orderFeignClient.create(userId, commodityCode, orderCount);
if (!validData()) {
throw new RuntimeException("账户或库存不足,执行回滚");
}
}

下单时候,扣除了库存,并且创建了订单,最后一检查,发现库存为负数或者用户账户余额为负数,说明这个订单有问题,此时就该抛异常回滚,否则就提交数据。

具体操作如下:

回滚:

  1. 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
  2. 通过 xid 和 branch_id 去 undo_log 表中查找对应的记录。
  3. 数据校验:拿第二步查找到的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理。
  4. 第三步的比较如果相同,则根据 undo_log 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句。
  5. 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

提交:

  1. 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
  2. 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

换句话说,事务如果正常提交了,undo_log 表中是没有记录的,如果大家想看该表中的记录,可以在事务提交之前通过 DEBUG 的方式查看。

  1. 小结

讲了这么多,是不是就把 Seata 讲完了呢?NONONO!这只是 AT 模式而已!还有三种模式,松哥下篇文章再和小伙伴们分享。

好啦,这就是一个简单的分布式事务,小伙伴们先来感受一把!标题是五分钟感受一把分布式事务,因为文章里边我还和大家分享了原理,如果大家只是跑一下案例感受,五分钟应该够了,不信试试!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Netty 源码分析系列(十一)Netty 工作原理详解

发表于 2021-08-16

这是我参与8月更文挑战的第16天,活动详情查看:8月更文挑战

前言

上一篇文章我们对 Reactor 模型进行了详细的讲解,

没看的,可以去瞅瞅:Netty 源码分析系列(十)Reactor 模型

其他关于 Netty 源码分析的文章链接如下:

Netty 源码分析系列(九)Netty 程序引导类

Netty 源码分析系列(八)Netty 如何实现零拷贝

Netty 源码分析系列(七)字节缓冲区 ByteBuf(下)

Netty 源码分析系列(六)字节缓冲区 ByteBuf(上)

Netty 源码分析系列(五)ChannelPipeline源码分析

Netty 源码分析系列(四)ChannelHandler

Netty 源码分析系列(三)Channel 概述

Netty 源码分析系列(二)Netty 架构设计

Netty 源码分析系列(一)Netty 概述

下面我们就来探究一下 Netty 模型,Netty 采用的就是 主从 Reactor 多线程模型。

Netty 模型

下图就是 Netty 的工作原理图:

image-20210815210630321

执行流程如下:

  • Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写。
  • BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup。
  • NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop。
  • NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector ,用于监听绑定在其上的 socket 的网络通讯。
  • NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop。
  • 每个Boss NioEventLoop 循环执行的步骤有3步 :
+ 1、轮询accept 事件 ;
+ 2、处理accept 事件 ,与 client 建立连接 ,生成 `NioScocketChannel`,并将其注册到某个 worker NIOEventLoop 上的 selector;
+ 3、处理任务队列的任务 , 即 runAllTasks。
  • 每个 Worker NIOEventLoop 循环执行的步骤 :
+ 1、轮询`read`,`write` 事件 处理 I/O 事件, 即 `read , write` 事件;
+ 2、在对应`NioScocketChannel`处理;
+ 3、处理任务队列的任务 , 即 `runAllTasks`;
  • 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道),pipeline 中包含了channel ,即通过pipeline 可以获取到对应通道,管道中维护了很多的 处理器(ChannelHandler)。

代码示例

引入Maven依赖

1
2
3
4
5
java复制代码<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.49.Final</version>
</dependency>

服务端的管道处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx =" + ctx);
Channel channel = ctx.channel();
//将 msg 转成一个 ByteBuf
//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}


//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("公司最近账户没啥钱,再等几天吧!", CharsetUtil.UTF_8));
}

//处理异常, 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

服务端主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
java复制代码public class NettyServer {

public static void main(String[] args) throws Exception {
//创建BossGroup 和 WorkerGroup
//说明
//1. 创建两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
// 默认实际 cpu核数 * 2
//
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //bossGroup使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 option主要是针对boss线程组,
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 child主要是针对worker线程组
.childHandler(new ChannelInitializer<SocketChannel>() {//workerGroup使用 SocketChannel创建一个通道初始化对象 (匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
ch.pipeline().addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器

System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(7788).sync();
//给cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("服务已启动,端口号为7788...");
} else {
System.out.println("服务启动失败...");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

NioEventLoopGroup是用来处理I/O操作的多线程事件循环器。Netty 提供了许多不同的EventLoopGroup的实现来处理不同的传输。

上面的服务端应用中,有两个NioEventLoopGroup被使用。第一个叫作bossGroup,用来接收进来的连接。第二个叫作workerGroup,用来处理已经被接收的连接,一旦 bossGroup接收连接,就会把连接的信息注册到workerGroup上。

ServerBootstrap是一个NIO服务的引导启动类。可以在这个服务中直接使用Channel。

  • group方法用于 设置EventLoopGroup。
  • 通过Channel方法,可以指定新连接进来的Channel类型为NioServerSocketChannel类。
  • childHandler用于指定ChannelHandler,也就是前面实现的NettyServerHandler。
  • 可以通过option设置指定的Channel来实现NioServerSocketChannel的配置参数。
  • childOption主要设置SocketChannel的子Channel的选项。
  • bind用于绑定端口启动服务。

客户端管道处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class NettyClientHandler extends ChannelInboundHandlerAdapter {

//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client ctx =" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("老板,工资什么时候发给我啊?", CharsetUtil.UTF_8));
}

//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}

//处理异常, 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

客户端主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public class NettyClient {

public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7788).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

客户端只需要一个NioEventLoopGroup就可以了。

测试运行

分别启动服务器 NettyServer 和客户端 NettyClient程序

服务端控制台输出内容:

1
2
3
4
5
ini复制代码.....服务器 is ready...
服务已启动,端口号为7788...
server ctx =ChannelHandlerContext(NettyServerHandler#0, [id: 0xa1b2233c, L:/127.0.0.1:7788 - R:/127.0.0.1:63239])
客户端发送消息是:老板,工资什么时候发给我啊?
客户端地址:/127.0.0.1:63239

客户端控制台输出内容:

1
2
3
4
ini复制代码客户端 ok..
client ctx =ChannelHandlerContext(NettyClientHandler#0, [id: 0x21d6f98e, L:/127.0.0.1:63239 - R:/127.0.0.1:7788])
服务器回复的消息:公司最近账户没啥钱,再等几天吧!
服务器的地址: /127.0.0.1:7788

至此,一个简单的基于Netty开发的服务端和客户端就完成了。

总结

本篇文章主要讲解了 Netty 的工作原理及简单应用。下节我们来讲解 Netty 的编解码。

结尾

我是一个正在被打击还在努力前进的码农。如果文章对你有帮助,记得点赞、关注哟,谢谢!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot(一)—— 快速入门及原理分析

发表于 2021-08-16

这是我参与8月更文挑战的第16天,活动详情查看:8月更文挑战

一、代码实现

1. 创建maven工程,普通的java工程

2. 添加SpringBoot的起步依赖

SpringBoot要求,项目要继承SpringBoot的起步依赖 spring-boot-starter-parent

1
2
3
4
5
xml复制代码     <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>

SpringBoot要集成SpringMVC进行Controller的开发,所以项目要导入web的启动依赖

1
2
3
4
5
6
xml复制代码     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

3. 编写SpringBoot引导类

引导类必须放在与controller等包同级目录

1
2
3
4
5
6
7
8
9
10
11
typescript复制代码 package com.xiaojian;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 ​
 @SpringBootApplication
 public class Application {
     public static void main(String[] args) {
         SpringApplication.run(Application.class);
    }
 ​
 }

4. 编写Controller类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码 package com.xiaojian.controller;
 ​
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 ​
 @Controller
 public class QuickController {
 ​
     @RequestMapping("/quick")
     @ResponseBody
     public String quick(){
         return "hello SpringBoot";
    }
 }

@ResponseBody 必须加上。

5. 测试

浏览器访问 : localhost:8080/quick 显示: hello SpringBoot

二、入门解析

1.SpringBoot代码解析

  1. 所有SpringBoog工程中,都要在pom.xml添加父工程依赖(spring-boot-starter-parent),后面进行源码分析
  2. 以功能为单位,如需要web功能在浏览器进行访问,秩序添加依赖(spring-boot-starter-web)
  3. @SpringBootApplication,声明该类是SpringBoot的引导类 run方法运行SpringBoot引导类,参数是引导类的字节码对象
1
2
3
4
5
6
typescript复制代码 @SpringBootApplication
 public class Application {
     public static void main(String[] args) {
         SpringApplication.run(Application.class);
    }
 }

2.SpringBoot工程热部署(好东西啊)

我们在开发中反复修改类、页面等资源,每次修改后都是需要重新启动才生效,这样每次启动都很麻烦,浪费了大量的时间,我们可以在修改代码后不重启就能生效,在pom.xml中添加如下配置就可以实现这样的功能,我们称之为热部署

1
2
3
4
5
xml复制代码 <!--热部署配置-->
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
 </dependency>

ps:IDEA进行SpringBoot进行热部署失败的解决方案

  1. Setting -> Compiler -> Build project automatically
  2. 然后 Shift+Ctrl+Alt+/,选择Registry,在列表中找到 compiler.automatically.allow.when.app.running 打钩 完成

三、原理分析

1. 起步依赖原理分析(查看源码)

  • 分析spring-boot-starter-parent
+ spring-boot-starter-parent中也有父依赖spring-boot-starter-dependencies,从中可以看到一部分坐标的版本、依赖管理、插件管理已经定义好,所以我们的SpringBoot工程继承spring-boot-starter-parent后已经具备版本锁定等配置了。 **所以起步依赖的作用就是进行依赖的传递。**
  • 分析spring-boot-starter-web
+ spring-boot-starter-web就是将web开发要使用的spring-web、spring-webmvc等坐标进行了“打包”,这样我们的工程只要引入spring-boot-starter-web起步依赖的坐标就可以进行web开发了,同样体现了依赖传递的作用。在以后需要添加的功能也是同理

2. 自动配置

@SpringBootApplication 注解 相当于配置三个注解

1
2
3
4
5
6
7
8
9
10
11
less复制代码 @SpringBootConfiguration // 相当于 @Configuration ,配置Spring容器(@Bean,注册bean对象,标注在返回某个实例的方法)
 @EnableAutoConfiguration // 自动配置
 @ComponentScan( // 组件扫描,扫描与引导类同级目录包下的bean
     excludeFilters = {@Filter(
     type = FilterType.CUSTOM,
     classes = {TypeExcludeFilter.class}
 ), @Filter(
     type = FilterType.CUSTOM,
     classes = {AutoConfigurationExcludeFilter.class}
 )}
 )

接口EnableAutoConfiguration (重要)

20200717161600

@AutoConfigurationPackage

image-20200717163939015

将主配置类(@SpringBootApplication标注的类)同级目录下的所在包,及下面所有子包里面的所有组件扫描到Spring容器

image-20200717164031337

@Import注解:

通过导入的方式实现把实例加入springIOC容器中

20200710142953

 AutoConfigurationImportSelector:

导入哪些组件的选择器

将所有需要导入的组件以全类名的方式返回;这些组件被添加到容器中。

会给容器中导入非常多的自动配置类(xxxAutoConfiguration);

就是给容器中导入这个场景需要的所有组件,并配置好这些组件。

在该类中获取全类名的方法中,调用 getCandidateConfigurations( … )

image-20200803143353048

loadFactoryNames( … ) ,使用类加载器加载目标位置的资源(也就是需要导入的组件全类名),并存入Properties中返回

image-20200803144045379

目标位置为

image-20200803144309171

自动导入组件全过程图解:

SpringBoot 在启动的时候,从springboot的autoconfigure包的类路径下的META-INF/spring.factories中获取EnableAutoConfiguration指定的值(组件全类名),将这些值作为自动配置类导入到容器中,自动配置类就生效,帮我们进行自动配置工作。

以前我们需要自己配置的东西,自动配置类帮我们做了(对比SSM中的各大配置属性)

SpringBoot自动配置原理

四、SpringBoot项目目录结构

resources文件夹

  • static:保存所有静态资源,js,css,images;
  • templates:保存所有的模板页面;(Spring Boot默认jar包使用嵌入式的Tomcat,默认不支持jsp页面),可以使用模板引擎(freemarker,thymeleaf)
  • application.properties:Spring Boot应用的配置文件;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用各类BeanUtils的时候,切记注意这个坑!

发表于 2021-08-16

在日常开发中,我们经常需要给对象进行赋值,通常会调用其set/get方法,有些时候,如果我们要转换的两个对象之间属性大致相同,会考虑使用属性拷贝工具进行。

如我们经常在代码中会对一个数据结构封装成DO、SDO、DTO、VO等,而这些Bean中的大部分属性都是一样的,所以使用属性拷贝类工具可以帮助我们节省大量的set和get操作。

市面上有很多类似的工具类,比较常用的有

1、Spring BeanUtils 2、Cglib BeanCopier 3、Apache BeanUtils 4、Apache PropertyUtils 5、Dozer 6、MapStucts

这里面我比较建议大家使用的是MapStructs,我在《丢弃掉那些BeanUtils工具类吧,MapStruct真香!!!》中介绍过原因。这里就不再赘述了。

最近我们有个新项目,要创建一个新的应用,因为我自己分析过这些工具的效率,也去看过他们的实现原理,比较下来之后,我觉得MapStruct是最适合我们的,于是就在代码中引入了这个框架。

另外,因为Spring的BeanUtils用起来也比较方便,所以,代码中对于需要beanCopy的地方主要在使用这两个框架。

我们一般是这样的,如果是DO和DTO/Entity之间的转换,我们统一使用MapStruct,因为他可以指定单独的Mapper,可以自定义一些策略。

如果是同对象之间的拷贝(如用一个DO创建一个新的DO),或者完全不相关的两个对象转换,则使用Spring的BeanUtils。

刚开始都没什么问题,但是后面我在写单测的时候,发现了一个问题。

问题

先来看看我们是在什么地方用的Spring的BeanUtils

我们的业务逻辑中,需要对订单信息进行修改,在更改时,不仅要更新订单的上面的属性信息,还需要创建一条变更流水。

而变更流水中同时记录了变更前和变更后的数据,所以就有了以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码//从数据库中查询出当前订单,并加锁
OrderDetail orderDetail = orderDetailDao.queryForLock();

//copy一个新的订单模型
OrderDetail newOrderDetail = new OrderDetail();
BeanUtils.copyProperties(orderDetail, newOrderDetail);

//对新的订单模型进行修改逻辑操作
newOrderDetail.update();

//使用修改前的订单模型和修改后的订单模型组装出订单变更流水
OrderDetailStream orderDetailStream = new OrderDetailStream();
orderDetailStream.create(orderDetail, newOrderDetail);

大致逻辑是这样的,因为创建订单变更流水的时候,需要一个改变前的订单和改变后的订单。所以我们想到了要new一个新的订单模型,然后操作新的订单模型,避免对旧的有影响。

但是,就是这个BeanUtils.copyProperties的过程其实是有问题的。

因为BeanUtils在进行属性copy的时候,本质上是浅拷贝,而不是深拷贝。

浅拷贝?深拷贝?

什么是浅拷贝和深拷贝?来看下概念。

1、浅拷贝:对基本数据类型进行值传递,对引用数据类型进行引用传递般的拷贝,此为浅拷贝。



2、深拷贝:对基本数据类型进行值传递,对引用数据类型,创建一个新的对象,并复制其内容,此为深拷贝。



我们举个实际例子,来看下为啥我说BeanUtils.copyProperties的过程是浅拷贝。

先来定义两个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
arduino复制代码public class Address {
private String province;
private String city;
private String area;
//省略构造函数和setter/getter
}

class User {
private String name;
private String password;
private HomeAddress address;
//省略构造函数和setter/getter
}

然后写一段测试代码:

1
2
3
4
5
6
sql复制代码User user = new User("Hollis", "hollischuang");
user.setAddress(new HomeAddress("zhejiang", "hangzhou", "binjiang"));

User newUser = new User();
BeanUtils.copyProperties(user, newUser);
System.out.println(user.getAddress() == newUser.getAddress());

以上代码输出结果为:true

即,我们BeanUtils.copyProperties拷贝出来的newUser中的address对象和原来的user中的address对象是同一个对象。

可以尝试着修改下newUser中的address对象:

1
2
3
scss复制代码    newUser.getAddress().setCity("shanghai");
System.out.println(JSON.toJSONString(user));
System.out.println(JSON.toJSONString(newUser));

输出结果:

1
2
css复制代码{"address":{"area":"binjiang","city":"shanghai","province":"zhejiang"},"name":"Hollis","password":"hollischuang"}
{"address":{"area":"binjiang","city":"shanghai","province":"zhejiang"},"name":"Hollis","password":"hollischuang"}

可以发现,原来的对象也受到了修改的影响。

这就是所谓的浅拷贝!

如何进行深拷贝

发现问题之后,我们就要想办法解决,那么如何实现深拷贝呢?

1、实现Cloneable接口,重写clone()

在Object类中定义了一个clone方法,这个方法其实在不重写的情况下,其实也是浅拷贝的。

如果想要实现深拷贝,就需要重写clone方法,而想要重写clone方法,就必须实现Cloneable,否则会报CloneNotSupportedException异常。

将上述代码修改下,重写clone方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
typescript复制代码public class Address implements Cloneable{
private String province;
private String city;
private String area;
//省略构造函数和setter/getter

@Override
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
}

class User implements Cloneable{
private String name;
private String password;
private HomeAddress address;
//省略构造函数和setter/getter

@Override
protected Object clone() throws CloneNotSupportedException {
User user = (User)super.clone();
user.setAddress((HomeAddress)address.clone());
return user;
}
}

之后,在执行一下上面的测试代码,就可以发现,这时候newUser中的address对象就是一个新的对象了。

这种方式就能实现深拷贝,但是问题是如果我们在User中有很多个对象,那么clone方法就写的很长,而且如果后面有修改,在User中新增属性,这个地方也要改。

那么,有没有什么办法可以不需要修改,一劳永逸呢?

2、序列化实现深拷贝

我们可以借助序列化来实现深拷贝。先把对象序列化成流,再从流中反序列化成对象,这样就一定是新的对象了。

序列化的方式有很多,比如我们可以使用各种JSON工具,把对象序列化成JSON字符串,然后再从字符串中反序列化成对象。

如使用fastjson实现:

1
ini复制代码User newUser = JSON.parseObject(JSON.toJSONString(user), User.class);

也可实现深拷贝。

除此之外,还可以使用Apache Commons Lang中提供的SerializationUtils工具实现。

我们需要修改下上面的User和Address类,使他们实现Serializable接口,否则是无法进行序列化的。

1
2
java复制代码class User implements Serializable
class Address implements Serializable

然后在需要拷贝的时候:

1
ini复制代码User newUser = (User) SerializationUtils.clone(user);

同样,也可以实现深拷贝啦~!

关于作者:Hollis,一个对Coding有着独特追求的人,阿里巴巴技术专家,《程序员的三门课》联合作者,《Java工程师成神之路》系列文章作者。

关注公众号【Hollis】,后台回复”成神导图”可以咯领取Java工程师进阶思维导图。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MQ整合SpringBoot及延迟队列的优化 Rabbi

发表于 2021-08-16

这是我参与8月更文挑战的第16天,活动详情查看:8月更文挑战


相关文章

RabbitMQ系列汇总:RabbitMQ系列


前言

  • 新建SpringBoot项目,具体方法参考文章 创建一个SpringBoot项目的两种方式
  • 目录结构如下
+ ![image-20210806182220682.png](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/babf1ca46b00a9682f899abb31279b0df90e4ef0db59446938aa45ccc57d4c92)
  • 代码架构图
  • image-20210806182248690.png
  • 创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S。
  • 然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct。
  • 创建一个死信队列 QD。

一、延迟队列

  • pom引入jar
+ 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码	<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
  • 将application.proprties后缀改为yml
+ 
1
2
3
4
5
6
java复制代码spring:
rabbitmq:
host: IP地址
port: 5672
username: admin
password: 111111
  • 添加Swagger config类
+ 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
* Swagger配置类
* @author DingYongJun
* @date 2021/8/6
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}

private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("dayu", "https://juejin.cn/user/2084329779387864/posts",
"773530472@qq.com"))
.build();
}
}
  • MQ config
+ 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
java复制代码import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* MQ配置类
* @author DingYongJun
* @date 2021/8/6
*/
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";

// 声明 xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}

// 声明 xExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}

//声明队列 A ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}

// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}

//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}

//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}

//声明死信队列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}

//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
  • controller模拟生产者
+ 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
* controller模拟生产者
* @author DingYongJun
* @date 2021/8/6
*/
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
}
}
  • 消费者
+ 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

/**
* 消费者
* @author DingYongJun
* @date 2021/8/6
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
  • 咋样,整合了SpringBoot后,是不是简单了很多?
  • 准备就绪,我们来启动项目,在浏览器中输入
+ 
1
java复制代码http://localhost:8080/ttl/sendMsg/你好呀大鱼
  • 执行结果
+ ![image-20210806183209464.png](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/efd29fb78a0861681658e5e1709ce06821b2ef06e6b84592218a0e56a47c28a2)
+ 算下时间,第一条时间间隔10s。
+ 第二条为40s。
+ 完美符合我们的设计预期!
+ 第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

二、优化延迟队列

  • 按照上面的设计,如果我们现在需要增加一个50秒延迟的消息,咋办?
  • 难不成我们再在增加一个MQ config?这也太不智能了吧!
  • 如果是预定会议室这种需求,岂不是需要增加无数个队列?
  • 所以下面我们需要对上面这种结构进行优化!
  • 代码架构图
+ ![image-20210806183552128.png](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/73e775820f011942e49949396e7997752bf90f377648fc8fbec933196caff949)
  • 新增MQ config
+ 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
/**
* 新的MQ配置类
* @author DingYongJun
* @date 2021/8/6
*/
@Component
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";

//声明队列 C 死信交换机
@Bean("queueC")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}

//声明队列 B 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
  • controller
+ 
1
2
3
4
5
6
7
8
9
10
java复制代码    @RequestMapping(value = "sendExpirationMsg", method = RequestMethod.GET)
public void sendMsg(@RequestParam Map<String,Object> parmsMap) {
String message = parmsMap.get("message").toString();
String ttlTime = parmsMap.get("ttlTime").toString();
rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}
+ 这里我们使用Map来接受参数了,毕竟Map无敌好用呀~
  • 浏览器输入
+ 
1
2
java复制代码http://localhost:8080/ttl/sendExpirationMsg?message=你好呀大鱼先生&ttlTime=50000
http://localhost:8080/ttl/sendExpirationMsg?message=你好呀大鱼先生&ttlTime=5000
  • 执行结果
+ ![image-20210806192458906.png](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/c0fd45a97983cf08341cbdfc86a45e79716fcef839b275426be8f04b7b6b5b81)
+ 完美达到要求!过期时间自定义化了!这才是智能呀!

三、MQ插件实现延迟队列

  • 上面的功能看起来似乎没啥问题对吧?
  • 但是我们要知道,MQ只会检查第一个消息是否过期,如果第一个消息延迟很长时间
  • 会导致我们第二消息不回优先得到执行
  • 因为队列是先进先出,没毛病吧?
  • 眼见为实
  • image-20210806192926498.png
  • 很明显看出
+ 我们想要的是第二个5秒延迟的先行消费,然后才是50秒延迟的那条消息。
+ 实际上MQ是等第一条执行完,才立马执行第二条。
+ 这就是先进先出的原因。
  • MQ的插件帮我们解决了这个问题
  • 首先下载插件 rabbitmq_delayed_message_exchange
  • 一定要看好版本哦~否则会报版本错误。
  • 然后上传到服务器中的 /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins目录下
  • 执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 安装
  • 重启MQ systemctl restart rabbitmq-server
  • 看下我们的MQ后台页面
  • image-20210806215903919.png
  • 出现了x-delayed-message的交换机类型,恭喜你启用插件成功啦!
  • MQ config
+ 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}

//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
args);
}

@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) {
return
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
  • controller
+ 
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new
Date(), delayTime, message);
}
  • 执行
+ 
1
2
java复制代码http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
  • 执行结果
+ ![image-20210806220520327.png](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/592d142511013a4e2824a28f715b50ddd1109a48a073af13bc632c5f5a197662)
  • 解决!!!

路漫漫其修远兮,吾必将上下求索~

如果你认为i博主写的不错!写作不易,请点赞、关注、评论给博主一个鼓励吧~hahah

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…561562563…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%