开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Win10安装MySQL5和MySQL8

发表于 2021-06-15
  1. 下载数据库,配置环境变量

因为是安装两个MySQL数据库,端口号要不一样,MySQL默认端口号是3306,建议先配置非默认端口号,以免出现问题

1.1 官网下载5.7和8.0的压缩包

  • 我下载的是5.7.23 和8.0.23

image-20210511221253559

image-20210511221559191

image-20210511221623829

1.2 解压到目录下

  • 将压缩包解压到你要安装的目录,我这里是放到D:\develop\MySQL文件夹下

image-20210511232856979

1.3 配置系统变量

在系统变量中添加MYSQL5_HOME和MYSQL8_HOME两个变量,变量值就是你的解压路径,然后在Path中将这两个的bin添加进去,完成后保存即可

image-20210511232346352

image-20210511232552106

  1. 安装MySQL8.0

2.1 创建配置文件

在根目录下创建my.ini的文件,文件的编码要是ANSI,然后再编辑配置文件保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ini复制代码[mysql]  
# 设置 mysql 客户端默认字符集
default-character-set=utf8

[mysqld]
#设置端口
port = 3307

# 设置 mysql 的安装目录
basedir=D:\develop\MySQL\mysql-8.0.23-winx64

# 设置 mysql 数据库的数据的存放目录
datadir=D:\develop\MySQL\mysql-8.0.23-winx64\data

# 允许最大连接数
max_connections=10000

# 允许最大连接人数
max_user_connections=1000

# 服务端使用的字符集默认为 8 比特编码的 latin1 字符集
character-set-server=utf8

# 创建新表时将使用的默认存储引擎
#default-storage-engine=MyISAM
default-storage-engine=InnoDB

# 连接时间
wait_timeout=31536000
interactive_timeout=31536000

2.2 初始化MySQL

确保根目录下没有data文件夹,如果有,就先删除,然后cmd进入到bin目录下

image-20210511233917085

执行mysqld --initialize

  • 如果执行mysqld --initialize-insecure,则创建的root默认没有密码

image-20210511233955289

完成初始化,自动在根目录下创建一个data文件夹

image-20210511234017435

此时,mysql的root用户密码默认是空的,如果你没有加上-insecure,则会分配随机密码,密码在data目录下的xxx.err文件里:

image-20210511234228371

由于我的默认空密码,所以没有显示,否则会在这末尾有显示

image-20210511234316425

2.3 安装MySQL服务

在命令行中输入mysqld --install MYSQL80(MYSQL80是服务名称,可以自定义,必须要加上,因为默认的是MYSQL,如果不加,两个数据库就会冲突了)

image-20210511234836288

2.4 开启MySQL服务

在命令行中输入net start MYSQL80开启MySQL8.0服务

如果输入命令后提示无法开启,如图:

image-20210511235001042

应该是服务配置有问题,打开系统的服务配置,找到MYSQL80

image-20210511235102129

右击查看属性,很明显,这路径错了(应该是以前安装过MySQL,没删除干净,导致缓存)

image-20210511235235092

不过没关系,我们可以手动修改

image-20210511235315742

成功启动!

image-20210511235443037

2.5 修改初始化密码

命令行输入mysql -u root -P 3307 -p(-P是代表端口号,-p代表密码,-u代表用户)进入MySQL,由于我没有密码,所以直接回车进入,如果有密码,将生成的密码填入即可

image-20210511235640737

然后修改密码,输入ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '123456';

image-20210512000447763

最后刷新权限,输入flush privileges

image-20210512000503612

到此,MySQL8就安装成功了

  1. 安装MySQL5.7

同理,和安装MySQL8.0基本是一样的,只是有一些细微差别

3.1 端口号要不同

MySQL5.7对于我来说比较常用,所以我将它设置为3306默认端口号,同时路径也要修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
in复制代码[mysql]  
# 设置 mysql 客户端默认字符集
default-character-set=utf8

[mysqld]
#设置端口
port = 3306

# 设置 mysql 的安装目录
basedir=D:\develop\MySQL\mysql-5.7.23-winx64

# 设置 mysql 数据库的数据的存放目录
datadir=D:\develop\MySQL\mysql-5.7.23-winx64\data

# 允许最大连接数
max_connections=10000

# 允许最大连接人数
max_user_connections=1000

# 服务端使用的字符集默认为 8 比特编码的 latin1 字符集
character-set-server=utf8

# 创建新表时将使用的默认存储引擎
#default-storage-engine=MyISAM
default-storage-engine=InnoDB

# 连接时间
wait_timeout=31536000
interactive_timeout=31536000

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用vs建立C/C++项目

发表于 2021-06-15

最近有粉丝私信我,说看到我文章中的截图看起来好像界面和它的不一样?当我建议他们用vs编程的时候,他们说vs建立项目感觉好复杂,不会弄?其实真的是很简单的,所以在这里给大家做个简单的介绍。那就以我的计算机上VS2017为示范。ps:前提是你在安装C++的时候,装上了有关C++的模块,这样才好顺利建立一个项目。

特此声明:此文适合C/C++初学者,或者首次使用vs编译器写C/C++的同学们学习阅读!

好,首先第一步:我们打开VS,首页再工具栏上会有文件,编辑等等一系列的文字提示,我们点击 文件–》项目。如下图所示:

在这里插入图片描述

在这里插入图片描述

因为我们在之前预装了Vistual C++,所以在这里选择Vistual C++,然后点击空项目,底下的名称可以自己命名,我此处就用默认的Project1来命名了,然后位置可以进行选择,之后点击确定。建议大家专门去分一个存放代码的盘,这样可以保证我们以后找代码比较方便。

在这里插入图片描述

在这里插入图片描述

然后我们到了现在如图这样一个样子:

在这里插入图片描述

在这里插入图片描述

接着我们点击源文件,添加新建项,此时需要注意:

在这里插入图片描述

在这里插入图片描述

假如我们要添加一个C++文件,那自然应该添加后缀为cpp的文件,点击C++文件,然后下面的名称可以自己命名,命名之后点击添加,就完成了cpp源文件的添加:

在这里插入图片描述

在这里插入图片描述

接着说一下,如何添加C语言类型的文件,其实也很简单,上述操作步骤不变,一直到修改名称时候,把.cpp改成.c然后进行添加即可:

在这里插入图片描述

在这里插入图片描述

添加成功源文件,那还得添加头文件啊,我们都知道头文件都是.h文件,所以不管是C或者C++都是一样的。和添加源文件步骤类似,在头文件文件夹出右键,然后点击添加新建项:

在这里插入图片描述

在这里插入图片描述

然后我们点击Visual C++,然后选择头文件,然后下面的名称,我们可以修改,也可以不改名字,进行添加:

在这里插入图片描述

在这里插入图片描述

所有内容都添加完毕之后,我们就可以写代码来测试看看效果怎么样啦,写一个“Hello KookNut”吧!在这里我就拿.cpp举例了嗷,学习C语言的同学,我相信到这里写个Hello World不存在任何问题吧!

头文件中包含以下内容

1
arduino复制代码#pragma once#include<iostream>using namespace std;

源文件中包含以下内容

1
java复制代码#include"Demo.h"int main(){    cout << "Hello KookNut39" << endl;    return 0;}

我们来运行,看一下结果:

在这里插入图片描述

在这里插入图片描述

至此,我们完成了一个简单的C++项目,输出了我们要的字符串。其实用VS建立一个工程不是难点,然后安装VS环境也可以在本站搜到很多很详细的教程,作为初学者,我们主要是学习C/C++基本语法和知识点,有关这些,可以看看我之前的文章欧,点击这里,打下良好的语法基础之后,我们就可以在以后面对开发大型项目的过程中也能够游刃有余!!!如果文章对您有帮助,还希望赏作者一键三连+评论留言

最后吟诗一句:“千磨万击还坚劲,任尔东西南北风”

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

最详细的MyBatis批量添加、更新、删除实战篇(日常开发必

发表于 2021-06-15

前言

工作中,经常会遇到很多批量操作的需求:批量添加、批量更新、批量删除、批量导入、批量审核等等,下面这篇文章我们将一一复现,首先我们先了解一下mybatis的标签foreach循环:

一、MybatIs标签foreach

foreach的主要用在构建in条件中,它可以在SQL语句中进行迭代一个集合。
foreach元素的属性主要有 item,index,collection,open,separator,close。

1》item集合中每一个元素进行迭代时的别名

2》index表示在迭代过程中,每次迭代到的位置

3》open该语句以什么开始

4》separator在每次进行迭代之间以什么符号作为分隔符

5》close以什么结束

1.collection属性主要有一下3种情况:

1.1 如果传入的是单参数且参数类型是一个List的时候,collection属性值为list

1.2 如果传入的是单参数且参数类型是一个array数组的时候,collection的属性值为array

1.3 如果传入的参数是多个的时候,我们就需要把它们封装成一个Map了

二、批量添加

当传入参数是list集合的时候:

2.1 Mapper.xml

1
2
3
4
5
6
7
8
java复制代码<!--批量插入员工数据-->
<insert id="saveEmp" parameterType="java.util.List">
INSERT INTO t_employee(id, name, age, salary, department_id,update_time)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(#{item.id},#{item.name},#{item.age},#{item.salary},#{item.departmentId},now())
</foreach>
</insert>

2.2 Controller层

1
2
3
4
5
java复制代码@PostMapping("saveBath")
@ResponseBody
public CommonResult<Employee> saveBath(@RequestBody List<Employee> employeeList){
return employeeService.saveEmp(employeeList);
}

@ResponseBody:返回Json数据 @RequestBody:接受Json数据

2.3 Json数组集合数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码[
{
"id": 1,
"name": "DT测试1",
"age": 26,
"salary": 10000.0,
"departmentId": 1
},
{
"id": 2,
"name": "DT测试2",
"age": 26,
"salary": 10000.0,
"departmentId": 2
}
]

三、批量更新

1.Mapper.xml

1.1 批量更新第一种方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码<update id="updateBatch" parameterType="java.util.List" >
<foreach collection="list" item="item" index="index" separator=";">
UPDATE t_employee
<set>
<if test="item.name != null and item.name != ''" >
name = #{item.name},
</if>
<if test="item.age != null" >
age = #{item.age},
</if>
<if test="item.salary != null" >
salary = #{item.salary},
</if>
<if test="item.salary != null" >
salary = #{item.departmentId},
</if>
</set>
where id = #{item.id}
</foreach>
</update>

记得连接数据库加:

1
java复制代码allowMultiQueries=true

不然会报如下错误:

You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ‘UPDATE t_employee

MySQL连接数据库时,添加语句:“allowMultiQueries=true”的作用:
1.可以在sql语句后携带分号,实现多语句执行。
2.可以执行批处理,同时发出多个SQL语句。

这种方式就是通过SQL拼接,单条单条的进行更新。

1.2 批量更新第二种方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码<update id="updateBatch" parameterType="java.util.List" >
update t_employee
<trim prefix="set" suffixOverrides=",">
<trim prefix="name=case" suffix="end,">
<foreach collection="list" item="i" index="index">
<if test="i.name != null and i.name != ''">
when id=#{i.id} then #{i.name}
</if>
</foreach>
</trim>
<trim prefix="age=case" suffix="end,">
<foreach collection="list" item="i" index="index">
<if test="i.age != null">
when id=#{i.id} then #{i.age}
</if>
</foreach>
</trim>
</trim>
where
<foreach collection="list" separator="or" item="i" index="index" >
id = #{i.id}
</foreach>
</update>

实际上是通过case when语句进行批量更新,只要一条SQL语句:

在这里插入图片描述

当然除了上面两种方式外,还可通过如下:

1
java复制代码批量更新第三种方法,用ON DUPLICATE KEY UPDATE,就是一个批量插入操作,在插入的时候,如果已存在,则更新,所以可以变相达到批量修改的效果。

一般不推荐这种更新大数据量的SQL,关于这种方式小编前面的文章也有说过使用方式,这里不再赘述。

注意:上面的方式是针对多个字段的情况,如果只是更新单个字段,可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码<!-- 批量更新第二种方法,针对单个字段进行批量更新 -->
<update id="updateBatch" parameterType="java.util.List">
UPDATE t_employee
SET name = CASE
<foreach collection="list" item="item" index="index">
WHEN id = #{item.id} THEN #{item.name}
</foreach>
END
WHERE id IN
<foreach collection="list" index="index" item="item" open="(" separator="," close=")">
#{item.id}
</foreach>
</update>

2.Controller层

1
2
3
4
5
java复制代码 @PostMapping("updateBatch")
@ResponseBody
public CommonResult<Employee> updateBatch(@RequestBody List<Employee> employeeList){
return employeeService.updateBatch(employeeList);
}

3.Json集合数据

1
2
3
4
5
6
7
8
9
10
11
12
json复制代码[
{
"id": 1,
"name": "DT测试111",
"age": 2611
},
{
"id": 2,
"name": "DT测试211",
"age": 2611
}
]

四、批量删除

  1. 传入的是List数组对象

1.Mapper.xml

1
2
3
4
5
6
java复制代码<delete id="deleteBath" parameterType="java.util.List">
DELETE FROM t_employee WHERE id IN
<foreach collection="list" item="item" open="(" separator="," close=")">
#{item.id}
</foreach>
</delete>

2.Controller层

1
2
3
4
5
java复制代码@PostMapping("deleteBath")
@ResponseBody
public CommonResult<Employee> deleteBath(@RequestBody List<Employee> employeeList){
return employeeService.deleteBath(employeeList);
}

3.Json集合数据

1
2
3
4
5
6
7
8
java复制代码[
{
"id": 1
},
{
"id": 2
}
]
  1. 传入的是数组

1.Mapper.xml

1
2
3
4
5
6
java复制代码<delete id="deleteBath" parameterType="java.util.Arrays">
DELETE FROM t_employee WHERE id IN
<foreach collection="array" item="ids" open="(" separator="," close=")">
#{ids}
</foreach>
</delete>

2.Controller层

1
2
3
4
5
java复制代码@PostMapping("deleteBath")
@ResponseBody
public CommonResult<Employee> deleteBath(@RequestBody int[] ids){
return employeeService.deleteBath(ids);
}

3.Json数组

1
java复制代码[1,2]
  1. 传入的是Map集合

1.Mapper.xml

1
2
3
4
5
6
java复制代码<delete id="deleteBath" parameterType="java.util.Map">
DELETE FROM t_employee WHERE id IN
<foreach collection="ids" item="item" open="(" separator="," close=")">
#{item}
</foreach>
</delete>
1
java复制代码int deleteBath(@Param("ids") Map<String, Integer> ids);

2.Controller层

1
2
3
4
5
6
7
8
9
java复制代码 @PostMapping("deleteBath")
@ResponseBody
public CommonResult<Employee> deleteBath(@RequestBody Map<String,Object> map){
// 接收List
List<Integer> ids = (List<Integer>) map.get("ids");
Map<String, Integer> stringMap = new HashMap<>();
ids.forEach(id -> stringMap.put("ids", id));
return employeeService.deleteBath(stringMap);
}

3.map数据

1
2
3
java复制代码{
"ids": [1,2]
}

五、批量查询

1.Mapper.xml

1
2
3
4
5
6
java复制代码<select id="findBath" resultType="com.dt.springbootdemo.entity.Employee" parameterType="com.dt.springbootdemo.entity.Employee">
SELECT * FROM t_employee WHERE id IN
<foreach collection="list" item="item" open="(" separator="," close=")">
#{item.id}
</foreach>
</select>

2.Controller层

1
2
3
4
5
java复制代码@GetMapping("findBath")
@ResponseBody
public CommonResult<List<Employee>> findBath(@RequestBody List<Employee> employeeList){
return employeeService.findBath(employeeList);
}

3.Json集合数据

1
2
3
4
5
6
7
8
java复制代码[
{
"id": 1
},
{
"id": 2
}
]

至于其它的数据格式就不再赘述了,很简单,变一下数据格式就可以了:

总结

本篇完结!熬夜干货,创作不易,动动小手点赞吧!!!!后面会继续输出更多干货给大家,喜欢的请关注小编CSDN:blog.csdn.net/qq_41107231 以及掘金:juejin.cn/user/394024…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java JDK 动态代理的原理其实很简单 前言 目录

发表于 2021-06-15

这是我参与更文挑战的第3天,活动详情查看: 更文挑战

请点赞关注,你的支持对我意义重大。

🔥 Hi,我是小彭。本文已收录到 GitHub · AndroidFamily 中。这里有 Android 进阶成长知识体系,有志同道合的朋友,关注公众号 [彭旭锐] 带你建立核心竞争力。

前言

  • 代理模式(Proxy Pattern)也称委托模式(Delegate Pattern),是一种结构型设计模式,也是一项基础设计技巧;
  • 其中,动态代理有很多有意思的应用场景,比如 AOP、日志框架、全局性异常处理、事务处理等。这篇文章,我们主要讨论最基本的 JDK 动态代理。

目录


前置知识

这篇文章的内容会涉及以下前置 / 相关知识,贴心的我都帮你准备好了,请享用~

  • Java 反射机制(含 Kotlin)
  • 关于 Java 泛型能问的都在这里了(含Kotlin)

  1. 概述

  • 什么是代理 (模式)? 代理模式 (Proxy Pattern) 也称委托模式 (Deletage Pattern),属于结构型设计模式,也是一项基本的设计技巧。通常,代理模式用于处理两种问题:
    • 1、控制对基础对象的访问
    • 2、在访问基础对象时增加额外功能

这是两种非常朴素的场景,正因如此,我们常常会觉得其它设计模式中存在代理模式的影子。UML 类图和时序图如下:

  • 代理的基本分类: 静态代理 + 动态代理,分类的标准是 “代理关系是否在编译期确定;
  • 动态代理的实现方式: JDK、CGLIB、Javassist、ASM

  1. 静态代理

2.1 静态代理的定义

静态代理是指代理关系在编译期确定的代理模式。使用静态代理时,通常的做法是为每个业务类抽象一个接口,对应地创建一个代理类。举个例子,需要给网络请求增加日志打印:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
typescript复制代码1、定义基础接口
public interface HttpApi {
String get(String url);
}

2、网络请求的真正实现
public class RealModule implements HttpApi {
@Override
public String get(String url) {
return "result";
}
}

3、代理类
public class Proxy implements HttpApi {
private HttpApi target;

Proxy(HttpApi target) {
this.target = target;
}

@Override
public String get(String url) {
// 扩展的功能
Log.i("http-statistic", url);
// 访问基础对象
return target.get(url);
}
}

2.2 静态代理的缺点

  • 1、重复性: 需要代理的业务或方法越多,重复的模板代码越多;
  • 2、脆弱性: 一旦改动基础接口,代理类也需要同步修改(因为代理类也实现了基础接口)。

  1. 动态代理

3.1 动态代理的定义

动态代理是指代理关系在运行时确定的代理模式。需要注意,JDK 动态代理并不等价于动态代理,前者只是动态代理的实现之一,其它实现方案还有:CGLIB 动态代理、Javassist 动态代理和 ASM 动态代理等。因为代理类在编译前不存在,代理关系到运行时才能确定,因此称为动态代理。

3.2 JDK 动态代理示例

我们今天主要讨论JDK 动态代理(Dymanic Proxy API),它是 JDK1.3 中引入的特性,核心 API 是 Proxy 类和 InvocationHandler 接口。它的原理是利用反射机制在运行时生成代理类的字节码。

我们继续用打印日志的例子,使用动态代理时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
typescript复制代码public class ProxyFactory {
public static HttpApi getProxy(HttpApi target) {
return (HttpApi) Proxy.newProxyInstance(
target.getClass().getClassLoader(),
target.getClass().getInterfaces(),
new LogHandler(target));
}

private static class LogHandler implements InvocationHandler {
private HttpApi target;

LogHandler(HttpApi target) {
this.target = target;
}
// method底层的方法无参数时,args为空或者长度为0
@Override
public Object invoke(Object proxy, Method method, @Nullable Object[] args)
throws Throwable {
// 扩展的功能
Log.i("http-statistic", (String) args[0]);
// 访问基础对象
return method.invoke(target, args);
}
}
}

如果需要兼容多个业务接口,可以使用泛型:

1
2
3
4
5
6
7
8
9
10
11
12
13
typescript复制代码public class ProxyFactory {
@SuppressWarnings("unchecked")
public static <T> T getProxy(T target) {
return (T) Proxy.newProxyInstance(
target.getClass().getClassLoader(),
target.getClass().getInterfaces(),
new LogHandler(target));
}

private static class LogHandler<T> implements InvocationHandler {
// 同上
}
}

客户端调用:

1
2
ini复制代码HttpAPi proxy = ProxyFactory.getProxy<HttpApi>(target);
OtherHttpApi proxy = ProxyFactory.getProxy<OtherHttpApi>(otherTarget);

通过泛型参数传递不同的类型,客户端可以按需实例化不同类型的代理对象。基础接口的所有方法都统一到 InvocationHandler#invoke() 处理。静态代理的两个缺点都得到解决:

  • 1、重复性:即使有多个基础业务需要代理,也不需要编写过多重复的模板代码;
  • 2、脆弱性:当基础接口变更时,同步改动代理并不是必须的。

3.3 静态代理 & 动态代理对比

  • 共同点:两种代理模式实现都在不改动基础对象的前提下,对基础对象进行访问控制和扩展,符合开闭原则。
  • 不同点:静态代理存在重复性和脆弱性的缺点;而动态代理(搭配泛型参数)可以实现了一个代理同时处理 N 种基础接口,一定程度上规避了静态代理的缺点。从原理上讲,静态代理的代理类 Class 文件在编译期生成,而动态代理的代理类 Class 文件在运行时生成,代理类在 coding 阶段并不存在,代理关系直到运行时才确定。

  1. JDK 动态代理源码分析

这一节,我们来分析 JDK 动态代理的源码,核心类是 Proxy,主要分析 Proxy 如何生成代理类,以及如何将方法调用统一分发到 InvocationHandler 接口。

4.1 API 概述

Proxy 类主要包括以下 API:

Proxy 描述
getProxyClass(ClassLoader, Class…) : Class 获取实现目标接口的代理类 Class 对象
newProxyInstance(ClassLoader,Class<?>[],InvocationHandler) : Object 获取实现目标接口的代理对象
isProxyClass(Class<?>) : boolean 判断一个 Class 对象是否属于代理类
getInvocationHandler(Object) : InvocationHandler 获取代理对象内部的 InvocationHandler

4.2 核心源码

Proxy.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ini复制代码1、获取代理类 Class 对象
public static Class<?> getProxyClass(ClassLoader loader,Class<?>... interfaces){
final Class<?>[] intfs = interfaces.clone();
...
1.1 获得代理类 Class 对象
return getProxyClass0(loader, intfs);
}

2、实例化代理类对象
public static Object newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h){
...
final Class<?>[] intfs = interfaces.clone();
2.1 获得代理类 Class对象
Class<?> cl = getProxyClass0(loader, intfs);
...
2.2 获得代理类构造器 (接收一个 InvocationHandler 参数)
// private static final Class<?>[] constructorParams = { InvocationHandler.class };
final Constructor<?> cons = cl.getConstructor(constructorParams);
final InvocationHandler ih = h;
...
2.3 反射创建实例
return newInstance(cons, ih);
}

可以看到,实例化代理对象也需要先通过 getProxyClass0(…) 获取代理类 Class 对象,而 newProxyInstance(…) 随后会获取参数为 InvocationHandler 的构造函数实例化一个代理类对象。

我们先看下代理类 Class 对象是如何获取的:

Proxy.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码-> 1.1、2.1 获得代理类 Class对象
private static Class<?> getProxyClass0(ClassLoader loader,Class<?>... interfaces) {
...
从缓存中获取代理类,如果缓存未命中,则通过ProxyClassFactory生成代理类
return proxyClassCache.get(loader, interfaces);
}

private static final class ProxyClassFactory implements BiFunction<ClassLoader, Class<?>[], Class<?>>{

3.1 代理类命名前缀
private static final String proxyClassNamePrefix = "$Proxy";

3.2 代理类命名后缀,从 0 递增(原子 Long)
private static final AtomicLong nextUniqueNumber = new AtomicLong();

@Override
public Class<?> apply(ClassLoader loader, Class<?>[] interfaces) {
Map<Class<?>, Boolean> interfaceSet = new IdentityHashMap<>(interfaces.length);
3.3 参数校验
for (Class<?> intf : interfaces) {
// 验证参数 interfaces 和 ClassLoder 中加载的是同一个类
// 验证参数 interfaces 是接口类型
// 验证参数 interfaces 中没有重复项
// 否则抛出 IllegalArgumentException
}
// 验证所有non-public接口来自同一个包

3.4(一般地)代理类包名
// public static final String PROXY_PACKAGE = "com.sun.proxy";
String proxyPkg = ReflectUtil.PROXY_PACKAGE + ".";

3.5 代理类的全限定名
long num = nextUniqueNumber.getAndIncrement();
String proxyName = proxyPkg + proxyClassNamePrefix + num;

3.6 生成字节码数据
byte[] proxyClassFile = ProxyGenerator.generateProxyClass(proxyName, interfaces);

3.7 从字节码生成 Class 对象
return defineClass0(loader, proxyName,proxyClassFile, 0, proxyClassFile.length);
}
}

-> 3.6 生成字节码数据
public static byte[] generateProxyClass(final String var0, Class[] var1) {
ProxyGenerator var2 = new ProxyGenerator(var0, var1);
...
final byte[] var3 = var2.generateClassFile();
return var3;
}

ProxyGenerator.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
csharp复制代码private byte[] generateClassFile() {
3.6.1 只代理Object的hashCode、equals和toString
this.addProxyMethod(hashCodeMethod, Object.class);
this.addProxyMethod(equalsMethod, Object.class);
this.addProxyMethod(toStringMethod, Object.class);

3.6.2 代理接口的每个方法
...
for(var1 = 0; var1 < this.interfaces.length; ++var1) {
...
}

3.6.3 添加带有 InvocationHandler 参数的构造器
this.methods.add(this.generateConstructor());
var7 = this.proxyMethods.values().iterator();
while(var7.hasNext()) {
...
3.6.4 在每个代理的方法中调用InvocationHandler#invoke()
}

3.6.5 输出字节流
ByteArrayOutputStream var9 = new ByteArrayOutputStream();
DataOutputStream var10 = new DataOutputStream(var9);
...
return var9.toByteArray();
}

以上代码已经非常简化了,主要关注核心流程:JDK 动态代理生成的代理类命名为 com.sun.proxy$Proxy[从0开始的数字](例如:com.sun.proxy$Proxy0),这个类继承自 java.lang.reflect.Proxy。其内部还有一个参数为 InvocationHandler 的构造器,对于代理接口的方法调用都会分发到 InvocationHandler#invoke()。

UML 类图如下,需要注意图中红色箭头,表示代理类和 HttpApi 接口的代理关系在运行时才确定:

提示: Android 系统中生成字节码和从字节码生成 Class 对象的步骤都是 native 方法:

  • private static native Class<?> generateProxy(…)
  • 对应的native方法:dalvik/vm/native/java_lang_reflect_Proxy.cpp

4.3 查看代理类源码

可以看到,ProxyGenerator#generateProxyClass() 其实是一个静态 public 方法,所以我们直接调用,并将代理类 Class 的字节流写入磁盘文件,使用 IntelliJ IDEA 的反编译功能查看源代码。

输出字节码:

1
2
3
4
5
6
7
8
9
10
11
ini复制代码byte[] classFile = ProxyGenerator.generateProxyClass("$proxy0",new Class[]{HttpApi.class});
// 直接写入项目路径下,方便使用IntelliJ IDEA的反编译功能
String path = "/Users/pengxurui/IdeaProjects/untitled/src/proxy/HttpApi.class";
try(FileOutputStream fos = new FileOutputStream(path)){
fos.write(classFile);
fos.flush();
System.out.println("success");
} catch (Exception e){
e.printStackTrace();
System.out.println("fail");
}

反编译结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
scala复制代码public final class $proxy0 extends Proxy implements HttpApi {
//反射的元数据Method存储起来,避免重复创建
private static Method m1;
private static Method m2;
private static Method m3;
private static Method m0;

public $proxy0(InvocationHandler var1) throws {
super(var1);
}

/**
* Object#hashCode()
* Object#equals(Object)
* Object#toString()
*/

// 实现了HttpApi接口
public final String get() throws {
try {
//转发到Invocation#invoke()
return (String)super.h.invoke(this, m3, (Object[])null);
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}

static {
try {
//Object#hashCode()
//Object#equals(Object)
//Object#toString()
m3 = Class.forName("HttpApi").getMethod("get");
} catch (NoSuchMethodException var2) {
throw new NoSuchMethodError(var2.getMessage());
} catch (ClassNotFoundException var3) {
throw new NoClassDefFoundError(var3.getMessage());
}
}
}

4.4 常见误区

  • 基础对象必须实现基础接口,否则不能使用动态代理

这个想法可能来自于一些没有实现任何接口的类,因此就没有办法得到接口的Class对象作为Proxy#newProxyInstance() 的参数,这确实会带来一些麻烦,举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
kotlin复制代码package com.domain;
public interface HttpApi {
String get();
}

// 另一个包的non-public接口
package com.domain.inner;
/**non-public**/interface OtherHttpApi{
String get();
}

package com.domain.inner;
// OtherHttpApiImpl类没有实现HttpApi接口或者没有实现任何接口
public class OtherHttpApiImpl /**extends OtherHttpApi**/{
public String get() {
return "result";
}
}

// Client:
HttpApi api = (HttpApi) Proxy.newProxyInstance(...}, new InvocationHandler() {
OtherHttpApiImpl impl = new OtherHttpApiImpl();

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// TODO:扩展的新功能
// IllegalArgumentException: object is not an instance of declaring class
return method.invoke(impl,args);
}
});
api.get();

在这个例子里,OtherHttpApiImpl 类因为历史原因没有实现 HttpApi 接口,虽然方法签名与 HttpApi 接口的方法签名完全相同,但是遗憾,无法完成代理。也有补救的办法,找到 HttpApi 接口中签名相同的 Method,使用这个 Method 来转发调用。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码HttpApi api = (HttpApi) Proxy.newProxyInstance(...}, new InvocationHandler() {
OtherHttpApiImpl impl = new OtherHttpApiImpl();

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// TODO:扩展的新功能
if (method.getDeclaringClass() != impl.getClass()) {
// 找到相同签名的方法
Method realMethod = impl.getClass().getDeclaredMethod(method.getName(), method.getParameterTypes());
return realMethod.invoke(impl, args);
}else{
return method.invoke(impl,args);
}
}
});

  1. 总结

今天,我们讨论了静态代理和动态代理两种代理模式,静态代理在设计模式中随处可见,但存在重复性和脆弱性的缺点,动态代理的代理关系在运行时确定,可以实现一个代理处理 N 种基础接口,一定程度上规避了静态代理的缺点。在我们熟悉的一个网络请求框架中,就充分利用了动态代理的特性,你知道是在说哪个框架吗?


参考资料

  • Android源码设计模式解析与实战 —— 何红辉,关爱民 著
  • 修炼Java开发技术:在架构中体验设计模式和算法之美 —— 于广 著
  • 深入理解Android内核设计思想 —— 林学森 著
  • Wikipedia:Aspect-oriented programming
  • Explore the Dynamic Proxy API —— Jeremy Blosser 著
  • Generically chain dynamic proxies —— Srijeeb Roy 著
  • 动态代理竟然如此简单! —— cxuan 著

我是小彭,带你构建 Android 知识体系。技术和职场问题,请关注公众号 [彭旭锐] 私信我提问。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MongoDB 不专业指北(二):Mongo常用的10条 操

发表于 2021-06-15

这是我参与更文挑战的第15天,活动详情查看: 更文挑战

1. 显示全部可用数据库

1
bash复制代码> show dbs;

该命令将展示 mongo 的全部数据库名称,并列出来。

2. 切换数据库

1
bash复制代码> use mydb;

该命令会选择一个指定的数据库,如果数据库不存在,则会自动创建一个。但是需要注意,由于此时数据库没有数据,因此当使用 show dbs命令的时候,看不到该数据库。只有插入了数据集后才可以看到。

3. 显示数据集

1
bash复制代码> show collections;

该命令将展示当前选择的数据库下的数据集。注意如果没有数据集,则不会显示任何内容。

4. 插入数据

插入数据的格式为 db.{数据集名}.insert({数据键值对}),成功后返回插入的条数。

1
2
bash复制代码> db.test.insert({"name": "岛上码农"});
WriteResult({ "nInserted" : 1 })

插入多条数据使用中括号括起来即可,此时返回的是批量操作结果,其中 nInserted 返回的是成功插入的条数。。

1
2
3
4
5
6
7
8
9
10
11
bash复制代码> db.test.insert([{"name": "岛上码农"},{"name": "掘金"}]);
BulkWriteResult({
"writeErrors" : [ ],
"writeConcernErrors" : [ ],
"nInserted" : 2,
"nUpserted" : 0,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"upserted" : [ ]
})

5. 更新数据

更新一条数据的命令如下,其中格式为 db.{数据集名}.update({查询条件}, {$set: {更新后数据}})。

1
2
bash复制代码> db.test.update({"name": "岛上码农"}, {$set: {"name": "码农"}});
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

以上命令只会更新一条匹配的数据,如果要更新多条,需要增加参数:{multi: true}。

1
2
bash复制代码> db.test.update({"name": "岛上码农"}, {$set: {"name": "码农"}}, {multi: true});
WriteResult({ "nMatched" : 2, "nUpserted" : 0, "nModified" : 2 })

也可以使用 updateMany 更新多条。

1
2
bash复制代码> db.test.updateMany({"name": "码农"}, {$set: {"name": "岛上码农"}});
{ "acknowledged" : true, "matchedCount" : 3, "modifiedCount" : 3 }

6. 替换文档

替换文档会使用新的文档替换掉已有的文档,其中格式为 db.{数据集名}.save({新文档数据})。例如下面的例子替换了_id 为60c8a50adb9890bf41255fe4的文档。

1
2
3
4
5
6
7
bash复制代码> db.test.save({"_id": "60c8a50adb9890bf41255fe4", "name": "岛上码农-1"});
WriteResult({
"nMatched" : 0,
"nUpserted" : 1,
"nModified" : 0,
"_id" : "60c8a50adb9890bf41255fe4"
})

7. 查询数据

查询数据命令为格式为 db.{数据集名}.find()。如果需要限制条数可以加limit(n)。

1
bash复制代码> db.test.find();

查询出来的格式需要美化的话,加上 pretty()即可。

1
bash复制代码> db.test.find().pretty();

按条件查询时,在 find 中添加筛选参数即可。

1
bash复制代码> db.test.find({"name":"岛上码农"}).pretty();

8. 统计条数

统计时使用 count()函数即可,如果需要筛选也是在 find 方法中传筛选条件即可。

1
bash复制代码> db.test.find().count();

9. 删除文档

删除文档的格式为db.test.remove({筛选条件});

1
2
bash复制代码> db.test.remove({"name":"岛上码农-1"});
WriteResult({ "nRemoved" : 1 })

删除一条的使用 deleteOne 方法,删除多条使用 deleteMany 方法。

1
2
3
4
5
bash复制代码> db.test.deleteOne({"name":"岛上码农"});
{ "acknowledged" : true, "deletedCount" : 1 }

> db.test.deleteMany({"name":"岛上码农"});
{ "acknowledged" : true, "deletedCount" : 2 }

10. 查看帮助文档

对于有些命令不懂操作的,查看操作文档即可,命令格式为 db.{数据集名}.help()。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

从源码的角度来熟悉redis的渐进式rehash

发表于 2021-06-15

这是我参与更文挑战的第3天,活动详情查看: 更文挑战

概述

前两天整理了redis中字典的实现==》字典终极篇。强烈建议没看的XDM可以先去看一遍(看一遍 看一遍 看一遍 重要的事情说三遍😄),字典底层使用到哈希表,字典终极篇 这里也提到当哈希表需要扩展或者收缩的时候会将我们ht[0]里面的数据rehash到ht[1]中,而这个过程并不是一下完成的,而是一个多次 渐进的过程。好优雅的设计 崇拜~

渐进式的原因

分多次进行的原因是,如果我们的ht[0]中保存的键值对是十几或者几百这种较小的值,我们一次rehash是没有问题的,但是如果是几十万几百万呢?一次想要把这些key value rehash到ht[1]是需要一定的时间的,redis服务短时间不可用。这当然是不允许的哈~(redis不可用,那我们的服务请求不就都打到mysql了 导致mysql服务被打挂 然后我们的服务直接全是500 哇!不敢想象。。)

文字描述rehash的过程

  1. 根据字典终极篇中我们提到的ht[1]分配策略进行内存分配,此时同时存在ht[0]和ht[1]两个哈希表。
  2. 这个时候将我们字典结构上的rehashidx设置为0表示我们的rehash正式开始(之前是-1)。
  3. 在rehash进行期间,定时任务会每次都会对ht[0]上的100 * n个key(这个为什么是100*n呢?下面看下源码xdm就知道了),我们对字典的增删改查会进行一个非空key的rehash。每完成一个key都会将rehashidx的值加1
  4. 随着字典操作的不断进行 ht[0]上的所有键值都rehash到ht[1]后 ht[0]是一个空的哈希表。将ht[0]指向ht[1],ht[1]指向null,这个时候rehashidx设置成-1 表示rehash完成。

渐进式rehash的好处是采用了分而治之的方法,将rehash的过程分散到每个对字典的操作中去,避免了一起rehash庞大的计算量。

不多叨叨,来看下源码

我们来看下这个dictRehashMilliseconds方法,这个方法会在redis的定时任务中databasesCron进行调用的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
ini复制代码int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;

while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}

int dictRehash(dict *d, int n) {

int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;

while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;

nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}

/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}

/* More to rehash... */
return 1;
}

在这个地方有一个死循环,只有时间超时才会break,也就是说单次的rehash每次最多是100ms,但是xdm有没有发现一个bug,就是如果dictRehash(d,100)这个方法执行超过100ms 这次就会超过100ms了。(redis其实自己是默认每次哈希100个key的时间是小于100ms的),这个地方就是上边说的为什么是100*n个key了

1
2
3
4
kotlin复制代码 while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}

接下来我们看一下最核心的dictRehash方法,这个就是核心的rehash方法。empty_visits也是一个非常非常优雅的设计,如果累计访问的null值是我们要rehash数的10倍就直接return,也是为了防止我们哈希表中null很多的情况下rehash过程花费太多的时间(这个设计真的优雅到我了)

我们在对字典进行操作的时候也会对字典进行dictRehash的操作,不过这种只会操作一个非空的key。

最后这个地方的逻辑就是rehash完成的实现:

1
2
3
4
5
6
7
ini复制代码if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}

我们可以发现

rehashidx的作用很重要,他会记录我们当前dict的rehash所进行到的索引值(我们可以分治的重要字段),也是我们当前dict是否在rehash的判断标志。

总结

当我们在rehash过程中对字典进行增删改查操作会是什么情况呢?

  1. 更新:先查询ht[0]上有没有如果有则更新,如果没有则去ht[1]上进行查询更新(查询、删除也是这个逻辑)。
  2. 新增:新增的话会直接操作ht[1],这样可以保证ht[0]上不会再新增了。

xdm redis的rehash到这就完成了,设计的真的是相当的优雅,我们可以将这种思想运用到我们的日常设计开发中,让我们的代码都优雅起来~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

网易云信大规模聊天室系统架构解析 前言 网易云信聊天室系统架

发表于 2021-06-15

前言

聊天室是一类非常重要的 IM 系统,不同于单聊和群聊,聊天室是一种大规模的实时消息分发系统。

聊天室有多种技术实现方案,业界也有一些开源的实现,每种实现都有自己的特点和应用场景。网易云信作为 PaaS 平台,其聊天室的系统架构和方案有几个突出的特点:

  • 水平扩展能力:主要体现在两方面,一个是聊天室数量,一个是单个聊天室的人数。
  • 功能丰富:作为一个平台,聊天室提供底层通信能力,提供了丰富的功能集,来适配各种各样的业务场景,使用方可以根据自己的业务要求按需使用。
  • 支持全球化:云信目前提供了覆盖全球的通信网络,通过接入云信自研的 WE-CAN 大网,全球范围内延迟不超过 250ms。

本文我们来详细介绍一下网易云信大规模聊天室系统的具体架构以及实践应用案例。

网易云信聊天室系统架构

首先,我们先来看一下网易云信当前聊天室的详细技术架构,以及我们在架构升级优化过程中做的一些事情。

整体架构图

如下图,是网易云信聊天室的技术架构:

image.png

主要包括以下部分:

  • 接入层的 ChatLink
  • 网络传输层的 WE-CAN、WE-CAN bridge
  • 调度层的 Dispatcher
  • 服务层的 Callback、Queue、Presence、Tag、History 等
  • CDN 分发层的 CDN Manager、CDN Pusher、CDN Source

下面,我们针对每一层展开详细分析。

接入层

image.png

接入层根据客户端的类型不同会有不同的实现,例如常见客户端(iOS、Andriod、Windows、Mac 等)基于私有二进制协议,Web 端基于 Websocket 协议实现。接入层作为距离客户端的最后一公里,其接入速度、质量以及数据安全都是至关重要的:

接入速度和质量

目前我们搭建了覆盖全国各省份以及全世界各大洲的边缘节点,缩短最后一公里,减少不确定性,提升服务的稳定性。

数据安全

基于对称+非对称加密,客户端与服务器之前实现 0-RTT,完成秘钥交换和登录,同时也支持 RSA/AES/SM2/SM4 等各种加密算法。

接入层除了接受来自客户端的请求,还负责进行消息的单播和广播,因此接入层需要管理本节点的所有长连接,包括每个聊天室房间的连接以及每个连接的标签属性。此外接入层会上报自己的负载信息给后端服务,方便调度层进行合理的调度。

当流量洪峰来临时,因为需要进行消息的广播,接入层往往是压力最大的,为了保证服务的稳定性,我们做了很多优化策略:

自适应的流控策略

  • 单机流控:接入层服务会监控本机整体的网络带宽使用情况,并设置 2 个阈值 T1 和 T2,当带宽使用率超过 T1 时,触发流控,如果进一步超过了 T2,则不仅触发流控还会不断的调整流控的强度。最终的目标是使带宽使用率稳定在 T1 和 T2 之间。
  • 单连接流控:除此之外,接入层服务还会记录每个长连接的消息分发速度,并进行细粒度的调整,避免单机粗粒度流控导致单个连接分发过少或者过多,做到消息分发的平滑,即减少了带宽流量的波动尖刺,也改善了端侧的体验。

性能优化

ChatLink 高负载运行时,除了网络带宽,调用链路上的各个环节都可能成为性能的瓶颈。我们通过减少编解码的次数(包括序列化、压缩等)、多线程并发、减少内存拷贝、消息合并等多种方式,显著地提升了服务性能。

网络传输层

网易云信聊天室系统最初的架构是将接入层和后端服务层部署在同一个机房的,大部分用户都是直连 BGP 机房的 ChatLink,对于偏远地区或者海外,则通过专线的方式部署代理节点完成加速。该方案存在明显的缺点就是服务能力上限受制于单机房的容量,此外,专线也是一笔不小的开销。

在我们接入 WE-CAN 大网后,接入层 ChatLink 可以做到客户端就近接入,提高服务质量的同时降低了成本。此外,多机房的架构也使得我们的服务能力上升了一个台阶。
​

image.png

为了适配 WE-CAN 大网,我们设计了 WE-CAN Bridge 层,作为大网接入协议和聊天室协议的桥接层,负责协议转换、会话管理、转发接收。通过这种分层架构,接入层和后端业务层可以少修改或者不修改,减少对已有系统的改造成本,也降低了架构升级带来的风险。

调度层

调度层是客户端接入聊天室系统的前提。客户端登录聊天室之前需要先获取接入地址,分配服务我们称之为 Dispatcher。

image.png

Dispatcher 是中心化的,会接受来自 WE-CAN 和 ChatLink 的心跳信息,根据心跳情况来选择最佳接入点,调度系统设计主要考虑的几个关键点是:

调度精度

调度系统会根据请求方的 IP 判断地域和运营商信息,对比各个边缘节点的所属区域、运营商以及节点本身的负载(如 CPU、网络带宽等),此外还考虑边缘节点到中心机房的链路情况(来自 WE-CAN),计算综合打分,并把最优的若干节点作为调度结果。

调度性能

面对高并发场景,比如一个大型聊天室,活动初期往往伴随着大量人员的同时进入,此时就需要调度系统做出快速的反应。为此,我们会将上述的调度规则以及原始数据进行本地缓存优化,此外,为了避免心跳信息滞后导致分配不合理引起节点过载,分配服务时还会动态调整负载因子,在保证调度性能的前提下,尽量做到分配结果的平滑。

服务层

服务层实现了各种业务功能,包括:在线状态、房间管理、云端历史、第三回调、聊天室队列、聊天室标签等。其中最基础的是在线状态管理和房间管理:

  • 在线状态管理:管理某个账号的登录状态,包括登录了哪些聊天室、登录在了哪些接入点等
  • 房间管理:管理某个聊天室房间的状态,包括房间分布在哪些接入点,房间里有哪些成员等

在线状态管理和房间管理的难点在于如何有效管理海量用户和房间。网易云信 PaaS 平台的特性,使得我们可以根据不同的租户来进行 Region 划分,从而做到水平扩展。此外,由于状态数据有快速变化的特点(短 TTL),当某些核心用户或者某个客户报备了大型活动时,云信可以在短时间内进行相关资源的快速拆分和隔离。

服务层除了要支持海量客户接入、水平扩展外,还有一个很重要能力,就是需要提供各种各样的扩展性功能来适配客户各种应用场景。为此云信提供了各种各样丰富的功能,比如:

  • 第三方回调:方便客户干预 C 端用户的登录、发消息等核心环节,自定义实现各种业务逻辑。因为涉及到服务调用,而这个调用是跨机房甚至是跨地区的,为了避免第三方服务故障导致云信服务异常,我们设计了隔离、熔断等机制来减少对关键流程的影响;
  • 聊天室队列:可以方便用户实现一些诸如麦序、抢麦等业务场景需求;
  • 聊天室标签:作为云信业内首创的特色功能,支持消息的个性化分发。其实现原理是通过客户端登录时设置标签组以及发消息时设置标签表达式,来定义消息分发和接收的规则。标签信息会同时保存在服务层以及接入层,通过将部分标签计算下推到接入层,节省了中心服务的带宽和计算资源。

CDN 分发层

当我们评价一个聊天室系统时,常用的一个词是无上限。架构支持无上限不代表真的无上限。一个聊天室系统,在逻辑上,各个组成单元都是可以水平扩展的,但是每个服务所依赖的物理机、交换机、机房带宽等都是有容量上限的。因此,能够合理地调配多个地域的多个机房,甚至是外部的其他资源,才能真正体现出一个聊天室系统所能支撑的容量上限。

在网易云信的聊天室系统中,用户所有的接入点遍布各地机房,天然的将各地的资源进行了整合,所能支撑的容量上限自然高于单机房或者单地区多机房的部署模式。

进一步的,当面临一个更大规模的聊天室,此时如果能利用一些外部的通用能力不失为一种合适的选择。融合 CDN 弹幕方案就是这样一种技术实现方案,它可以利用各大 CDN 厂商部署在各地的边缘节点,利用静态加速这样的通用能力来支持超大规模的聊天室消息分发。

基于融合 CDN 弹幕分发方案,其核心点就是弹幕的分发和管理,这是一个可选的模块,云信内部对此进行了封装,可以根据不同的业务特点来选择是否开启而不需要修改任何业务代码。

在开启融合 CDN 弹幕分发方案的情况下,所有的弹幕广播会划分成 两条链路:

  • 重要的且需要实时送达的消息会走长连接到达客户端
  • 其他的海量消息则会进入 CDN Pusher,通过各种策略进行聚合后送达 CDN Source

客户端 SDK 会采取一定的策略定时从 CDN 边缘节点获取弹幕消息。SDK 会聚合不同来源的消息,排序后回调给用户,App 层无需关系消息来自哪里,只需根据自己的业务需求进行处理即可。

image.png

如上图,展示了 CDN 弹幕分发链路的消息流转过程:CDN Manager 负责管理不同 CDN 厂商的分配策略(登录时会通过长连接下发,且能动态调整)。此外,还负责管理平台上各个聊天室融合 CDN 模式的开启和关闭,以及对应的 CDN Pusher 资源的调配和回收。CDN Pusher 实际负责接收来自客户端消息,并根据消息的类型、消息的优先级等,组装成以一个一个的静态资源,推给 CDN Source,等待 CDN 回源拉取。

落地实践案例

下面,我们介绍应用了网易云信聊天室系统的典型应用场景。

大规模场景应用案例

在2020年8月,网易云音乐 TFBoys 的 7 周年线上演唱会就是一个聊天室大规模场景应用的典型案例。在这场活动创造了 78w+ 的在线付费演唱会的世界纪录,其弹幕互动的实现方式采用了网易云信基于融合 CDN 弹幕分发方案。事实上,在筹备环节,我们的聊天室系统达成了 20 分钟完成从 0 到 1000w 在线,上行消息 tps 达到 100w 的性能指标。

image.png

如上图,是支持本次活动弹幕分发的架构图,普通弹幕和礼物消息分别通过客户端 SDK 以及服务器 API 到达云信服务器,并最终进入弹幕广播服务,随后分流到长连接和 CDN 上,再通过 pull / push 混合的方式送达客户端。

特色功能 - 聊天室标签应用案例

近年来,随着互联网的发展,在线教育越来越火爆,最近又兴起了“超级小班课”模式。所谓超级小班课,指的是大型多人课堂与小班互动模式结合。

在线直播场景下,文字互动作为其中重要的一环,是聊天室的典型应用场景。但在超级小班课的模式下,常规的聊天室系统却存在各种各样的问题,不管是建立多个聊天室,还是单个聊天室进行消息过滤,都存在一些严重的问题。

网易云信首创的聊天室标签功能,完美支持了上述业务场景,基于聊天室标签,我们可以灵活地支持聊天室消息定向收发、聊天室权限定向管理、聊天室成员定向查询等个性化功能,真正实现大型直播下多场景的分组互动,比如对学生进行分组标签后,方便进行因材施教;分小组讨论,小组间内部讨论和组间 PK 等等。

image.png

如上图,展示了超级小班课的一个场景:1 个主讲教师+ N 个互动小班+ N 个助教,所有学生被划分成了一个一个的小班,由对应的助教完成预习提醒、课后答疑、作业监督、学员学习情况反馈等工作,同时又接收来自主讲老师的直播画面,做到了大课的规模,小课的效果。

总结

以上,就是本文的全部分享,主要介绍了网易云信构建一个大型聊天室系统的主要技术以及架构原理。任何系统的搭建都不是一蹴而就的,云信也会继续打磨底层技术,就像引入 WE-CAN 来提升网络传输效果,也会继续丰富完善我们的功能图谱,如业内首创的聊天室标签功能等。网易云信将持续在 IM 领域深耕,为各种场景和行业的用户提供最优质的服务。

作者介绍

曹佳俊,网易云信资深服务端开发工程师,中科院研究生毕业后加入网易,一直在网易云信负责 IM 服务器相关的开发工作。对 IM 系统构建以及相关中间件的开发有丰富的实战经验。

更多技术干货,欢迎关注【网易智企技术+】微信公众号

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RabbitMQ由浅入深入门全总结(一) 写在最前面 1

发表于 2021-06-15

写在最前面

距离上一次发文章已经很久了,其实这段时间一直也没有停笔,只不过在忙着找工作还有学校结课的事情,重新弄了一下博客,后面也会陆陆续续会把文章最近更新出来~

  • 这篇文章有点长,就分了两篇
  • PS:那个Github上Java知识问答的文章也没有停笔,最近也会陆续更新

文章目录:

  • RabbitMQ由浅入深入门全总结(一)
  • RabbitMQ由浅入深入门全总结(二)
  1. 浅浅道来

1.1 什么是中间件?

IDC(互联网数据中心)的定义:中间件是一种独立的系统软件服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源,中间件位于客户机服务器的操作系统之上,管理计算资源和网络通信。

首先,中间件是某一类软件的总称,而不是某一种具体的软件。它是一种位于平台(操作系统硬件) 和 应用程序之间的通用服务,它屏蔽了底层操作系统的各种复杂性,减轻了开发人员的技术负担,同时它的设计不针对某一具体目标,而是提供具有普遍通用特点的功能模块服务,这些服务具有标准的程序接口和协议,根据平台的不同,也可以有不同的实现。

通俗的例子(仅供参考,并不算完全一致):

  • 我开了一家咖啡店,我身边有 A B C 等 n 家咖啡豆的供应商,但是我肯定要挑选价格又实惠,质量还不错的豆子,但是市场是受到多方面因素波动的,可能我现在的选择,在一段时间后已经不是最佳选项了。所以我专门找到一家市场中介,让他帮我操心这一摊子事情,我只和你说清价格和质量要求,你去找就是了,过程我一点也不操心。这个中介的概念,就类似中间件的

1.1.1 分布式的概念(补充)

这一段,来自我之前写的 Dubbo 入门的那篇文章哈

在百度以及维基中的定义都相对专业且晦涩,大部分博客或者教程经常会使用《分布式系统原理和范型》中的定义,即:“分布式系统是若干独立计算机的集合,这些计算机对于用户来说就像是单个相关系统”

下面我们用一些篇幅来通俗的解释一下什么叫做分布式

1.1.1.1 什么是集中式系统

提到分布式,不得不提的就是 “集中式系统”,这个概念最好理解了,它就是将功能,程序等安装在同一台设备上,就由这一台主机设备向外提供服务

举个最简单的例子:你拿一台PC主机,将其改装成了一台简单的服务器,配置好各种内容后,你将MySQL,Web服务器,FTP,Nginx 等等,全部安装在其中,打包部署项目后,就可以对外提供服务了,但是一旦这台机器无论是软件还是硬件出现了问题,整个系统都会受到严重的牵连错误,鸡蛋放在一个篮子里,要打就全打了

1.1.12 什么是分布式系统

既然集中式系统有这样一种牵一发而动全身的问题,那么分布式的其中一个作用,自然是来解决这样的问题了,正如定义中所知,分布式系统在用户的体验感官里,就像传统的单系统一样,一些变化都是这个系统本身内部进行的,对于用户并没有什么太大的感觉

例如:淘宝,京东这种大型电商平台,它们的主机都是数以万计的,否则根本没法处理大量的数据和请求,具体其中有什么划分,以及操作,我们下面会说到,但是对于用户的我们,我们不需要也不想关心这些,我们仍可以单纯的认为,我们面对的就是 “淘宝” 这一台 “主机”

所以分布式的一个相对专业一些的说法是这样的(进程粒度)两个或者多个程序,分别运行在不同的主机进程上,它们互相配合协调,完成共同的功能,那么这几个程序之间构成的系统就可以叫做分布式系统

  • 这几者都是相同的程序 —— 分布式
  • 这几者都是不同的程序 —— 集群

1.2 什么是消息中间件/消息队列(MQ)

消息中间件,顾名思义就是用来处理消息相关服务的中间件,它提供了一种系统之间通信交互的通道,例如发送方只需要把想传输的信息交给消息中间件,而发送的协议,方式,发送过程中出现的网络,故障等等问题,都由中间件进行处理,因此它负责保证信息的可靠传输。

所以消息中间件,就是一种用来接受数据,存储数据,发送数据的技术,它提供了各种功能,可以实现消息的高可用,高可靠,也提供了很好的容错机制等。可以程序对系统资源的占用,以及传输效率的提升有很大帮助。

  • 常说的 MQ 就是指消息队列,即 Message Quene,常见的消息队列有,经典的 ActivieMQ,热门的 Kafka,阿里的 RocketMQ 等等,以及这里讲解的 RabbitMQ。
  • 不同的 MQ 有着不同的特点,以及其更加擅长的方向,倒也说不上谁好谁坏,只有谁更合适。

1.2.1 消息队列应用场景

根据业务的需要,其实它可以有多种应用场景,例如解耦,削峰填谷,广播等,我们举两个场景来梳理一下简单的过程

1.2.1.1 业务解耦

最近在考虑买几本书看,就以买书下订单举例,当我点击购买之后,可能会有这么一串业务逻辑执行,① 减去库存容量 ② 生成订单 ③ 支付 ④ 更新订单状态 ⑤ 发送购买成功短信 ⑥ 更新商品快递揽收状态。在初期阶段,我们完全可以让这些业务同步执行,但是后期为了提升效率,就可以将需要立即执行的任务和可稍缓执行的任务进行分离,例如 ⑤ 发送购买成功短信 ⑥ 更新商品快递揽收状态,都可以考虑异执行。在主流程执行结束后,这些可稍缓的业务可以通过给 MQ 发送消息,就判定已经执行,保证流程先结束。然后再通过拉取 MQ 消息,或者 MQ 主动推送去异步执行其他的业务。

1.2.1.2 削峰填谷

例如发送一条带有已读未读标识的公告信息,所以需要对每一个用户都写一条这样的公告消息,例如存到 MongoDB 中,即便 MongoDB 也支撑不下来瞬时写入百万、千万记录的情况,所以可以考虑使用消息队列。比如说我们可以在Java后端系统上面,用异步多线程的方法,向消息队列MQ中发送消息,这样Web系统发布公告消息的时候就不占用数据库正常的 CRUD 操作。系统消息保存在消息队列中,我们只是用它来做削峰填谷,系统消息最终还是要存储在数据库上面。于是我们可以这样设计,在用户登陆系统的时候,用异步线程从消息队列MQ中,接收该用户的系统消息,然后把系统消息存储在数据库中,最后消息队列MQ中的该条消息自动删除。因为用户的错峰登录,所以往数据库中写入消息的任务也变成了错峰写入。

1.3 什么是 RabbitMQ

RabbitMQ 是一个使用 Erlang 语言编写,且遵循 AMQP协议的开源消息队列系统,支持多种客户端(语言),用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。

更详细的介绍可以直接看一下官网:

  • www.rabbitmq.com/

总之这就是一种常见的消息队列,它的这些特点,都会在后面逐条讲解到,我们首先从入门下载安装部分先说起,然后再到使用。

  1. 下载与安装

一般来说,安装的方式有手动安装和 Docker 安装,大部分场景下,都会使用 Docker 安装,但是作为学习阶段,如果不是特别着急,学习一下手动安装,也不是什么坏事。

注:云服务器和虚拟机都可以,演示的 Linux 版本为 CentOS 7.9

2.1 手动安装

2.1.1 下载安装过程

注:可以在 Linux 中通过 yum 直接下载安装,这里选择了在自己的 Windows 主机先下载文件,然后再通过 FTP 传到 Linux 上,直接安装。可以避免虚拟机上因为网络而造成的一些下载问题。

  1. 首先打开官网的下载目录,然后根据自己 Linux 的版本,选择版本。
* 地址:[www.rabbitmq.com/download.ht…](https://www.rabbitmq.com/download.html)

  1. 因为 RabbitMQ 是 Erlang 语言编写的,所以还需要提供 Erlang 环境,接着去下载 Erlang。
* 地址:[www.erlang-solutions.com/downloads](https://www.erlang-solutions.com/downloads)
    + A:此网站访问速度极慢,请耐心等待,或者需要挂上梯子
    + B:Erlang 版本需要与 RabbitMQ 匹配(如图,有最大和最小版本的限制)
        - 版本查看地址:[www.rabbitmq.com/which-erlan…](https://www.rabbitmq.com/which-erlang.html)
        - 这里选择了 RabbitMQ 3.8.14 和 Erlang 23.2.3

  1. 将文件上传到 Linux 中(我这里指定位置是 /usr/local/bin/rabbitmq ,可以自己更改选择)
    • 现在很多 Shell 软件都自带内置的 FTP 上传,例如 FinalShell,MobaXterm 等等
    • 上传后的文件和目录位置如下
1
2
3
4
shell复制代码[root@centos7 rabbitmq]# ls
esl-erlang_23.2.3-1_centos_7_amd64.rpm rabbitmq-server-3.8.14-1.el7.noarch.rpm
[root@centos7 rabbitmq]# pwd
/usr/local/bin/rabbitmq
  1. 安装 Erlang 、Socat 和 RabbitMQ
    • Erlang 、Socat 都是 RabbitMQ 所依赖的
1
2
3
4
5
6
7
8
shell复制代码# 安装 Erlang,安装后执行 erl -v 显示版本号则代表成功
rpm -ivh esl-erlang_23.2.3-1_centos_7_amd64.rpm

# 安装 Socat 这里没有下载源文件,而是直接通过 yum 在线安装,因为它并不大
yum install -y socat

# 安装 RabbitMQ
rpm -ivh rabbitmq-server-3.8.14-1.el7.noarch.rpm
  1. 安装结束,启动服务查看 RabbitMQ 是否可以启动成功
1
2
3
4
5
6
7
8
shell复制代码# 启动服务
systemctl start rabbitmq-server
# 开机自启
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server.service

如图所示,即安装启动成功

若安装错误,处理参考:

  • Linux之RabbitMQ安装各种问题处理
  • rabbitmq ERROR: epmd error for host deb:address (cannot connect to host/port)解决方法

2.1.2 配置 Web 界面管理

上面的安装其实已经结束了,但是 RabbitMQ 提供给了我们一个 Web 形式的管理界面,默认是没有的,需要进行安装。

  1. 安装 Web 管理插件,然后重启服务
1
2
3
4
5
shell复制代码# 安装命令
rabbitmq-plugins enable rabbitmq_management

# 重启服务
systemctl restart rabbitmq-server
  1. 一定要开放 Linux 防火墙 的 15672 端口,否则就会无法访问,在学习阶段,你甚至可以去查询命令把防火墙关掉
    • 对应服务器(阿里云,腾讯云等)就是在安全组中开放 15672 端口
    • 访问 Linux IP:15672 ,例如 http://192.168.122.1:15672
1
2
3
4
5
6
7
8
shell复制代码# 查询 15672 是否开放,一般默认都是 no
firewall-cmd --query-port=15672/tcp
# 开放指定端口 15672
firewall-cmd --add-port=15672/tcp --permanent
# 重新载入
firewall-cmd --reload
# 再次查询,结果就是 yes 了
firewall-cmd --query-port=15672/tcp
  1. 添加远程登录的账户
* RabbitMQ 有一个默认账号和密码都是 guest 但是只能在 localhost 下访问
1
2
shell复制代码# 新增用户 用户名和密码都是 admin
rabbitmqctl add_user admin admin
  1. 为远程登录的账户添加权限
    • administrator(超级管理员):登录控制台、查看所有信息、操作用户、操作策略
    • monitoring(监控者): 登录控制台、查看所有信息
    • policymaker(策略制定者): 登录控制台、指定策略
    • managment(普通管理员): 登录控制台
1
2
shell复制代码# 设置用户分配操作权限,admin 用户的权限为 administrator
rabbitmqctl set_user_tags admin administrator
  1. 为用户添加资源权限
    • 因为 admin 已经是超级管理员权限了,所以其实不分配资源权限也可以,会默认去做。
1
2
3
shell复制代码# 命令格式为: set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 这里即为 admin 用户开启 配置文件和读写的权限
rabbitmqctl set_permissions -p / admin ".*"".*"".*"
  1. 访问 Linux IP:15672 ,例如 http://192.168.122.1:15672 ,输入刚才设置好的用户名密码 admin
    • 如图:访问成功

2.1.2.1 命令小结

  1. 添加用户:rabbitmqctl add_user <username> <password>
  2. 修改密码:rabbitmqctl change_password <username> <newpass>
  3. 删除用户:rabbitmqctl delete_user <username>
  4. 用户列表:rabbitmqctl list_users
  5. 设置用户角色:rabbitmqctl set_user_tags <username> <tag1,tag2>
  6. 删除用户所有角色:rabbitmqctl set_user_tags <username>
  7. 为用户添加资源权限:set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

使用:输入 rabbitmqctl ,则会提示可能使用的命令,然后 使用 rabbitmqctl hepl <命令> 可以查看具体命令的使用方法和参数。

2.1.3 简单介绍 Web 界面管理

  • Connections(连接):此处用来管理与 RabbitMQ 建立连接后的生产者和消费者
  • Channels(通道):连接建立后,会形成通道,消息的投递获取依赖通道。
  • Exchanges(交换机):用来实现消息的路由
  • Queues(队列):存放消息的队列,消息等待被消费,消费后被移除队列。
  • Admin(管理):用于对管理用户,以及对应权限进行设置,如下图所示

Tags 就是用来指定用户的角色

  • administrator(超级管理员):登录控制台、查看所有信息、操作用户、操作策略
  • monitoring(监控者): 登录控制台、查看所有信息
  • policymaker(策略制定者): 登录控制台、指定策略
  • managment(普通管理员): 登录控制台

2.2 Docker 安装

在 Docker 中安装 RabbitMQ 不需要自己去考虑版本,环境等的各种冲突不兼容问题,是非常便捷的,我演示的这台虚拟机是一个 CentOS 7.9 裸机,所以我们从更新 yum,到安装 Docker 和 安装 RabbitMQ 按步骤都讲一下

2.2.1 配置 yum

  1. 更新 yum 到最新版
1
2
3
4
5
shell复制代码# 更新 yum
yum update

# 检查yum依赖的几个包 yum-utils 提供 yum-config-manager 功能, 后面两个是 devicemapper 用到的
yum install -y yum-utils device-mapper-persistent-data lvm2
  1. 设置 yum 源为阿里云
1
shell复制代码yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

2.2.2 安装 docker

2.2.2.1 步骤

  1. 使用 yum 安装 docker
    • docker-ce 是社区版的意思,ee为企业版
1
shell复制代码yum install docker-ce -y
  1. 通过查看版本,检查安装是否成功
1
shell复制代码docker -v
  1. Docker 镜像加速(这里 <你的ID >要换成自己的哈)
1
2
3
4
5
6
7
8
shell复制代码sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://<你的ID>.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
  • 国内从 DockerHub 拉取镜像有时会遇到困难,此时可以配置镜像加速器。Docker 官方和国内很多云服务商都提供了国内加速器服务,例如:
+ 科大镜像:**[docker.mirrors.ustc.edu.cn/](https://docker.mirrors.ustc.edu.cn/)**
+ 网易:**[hub-mirror.c.163.com/](https://hub-mirror.c.163.com/)**
+ 阿里云:**https://<你的ID>.mirror.aliyuncs.com**
+ 七牛云加速器:**[reg-mirror.qiniu.com](https://reg-mirror.qiniu.com)**当配置某一个加速器地址之后,若发现拉取不到镜像,请切换到另一个加速器地址。国内各大云服务商均提供了 Docker 镜像加速服务,建议根据运行 Docker 的云平台选择对应的镜像加速服务。

阿里云镜像获取地址:cr.console.aliyun.com/cn-hangzhou…

2.2.2.2 Docker 常见命令

2.2.2.2.1 管理命令

  • 就启动,停止,重启这些简单的命令使用 service 也是可以的,systemctl 功能稍微强大一些
1
2
3
4
5
6
7
8
9
10
11
shell复制代码# 启动 docker
systemctl docker start
# 停止 docker
systemctl docker stop
# 重启 docker
systemctl docker restart
# 查看 docker 状态
systemctl status docker
# 开机自启
systemctl enable docker
systemctl unenable docker

2.2.2.2.2 镜像命令

1
2
3
4
5
6
shell复制代码# 导入镜像文件
docker load < xxx.tar.gz
# 查看安装的镜像
docker images
# 删除镜像
docker rmi 镜像名

2.2.3 安装 RabbitMQ (任选其一)

注:直接用 2.2.3.2 一句话安装 会更好一些

2.2.3.1 一步一步安装

  1. 获取 RabbitMQ 的镜像
1
shell复制代码docker pull rabbitmq:management
  1. 创建并运行容器(具体参数在 3 中介绍)
1
shell复制代码docker run -id --name 容器名 -p 15672:15672 -p 5672:5672 rabbitmq:management

2.2.3.2 一句话安装

上面的安装方式,就是先获取到 RabbitMQ 镜像后再开始安装,这里是没有问题的,创建时会有一个问题,因为我们要安装 management 也就是它的 web 管理,如果不做一些处理,默认装好的是没有用户的,所以还需要像前面一样自己进去配置,而 Docker Hub 已经给出了我们配置的示例,即使用 -e 代表配置,使用 RABBITMQ_DEFAULT_USER 和 RABBITMQ_DEFAULT_PASS 配置用户名和密码

更多请查看 Docker Hub 官方给予例子中的 Setting default user and password 章节

registry.hub.docker.com/_/rabbitmq/

  1. 执行安装
1
shell复制代码docker run -di --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
  1. 通过容器状态,查看是否运行成功
1
2
3
4
5
6
7
8
9
10
shell复制代码# 查看容器运行状态
docker ps -a
# 启动
docker start 容器名
# 停止
docker stop 容器名
# 退出命令行,不停止
exit
# 进入到node容器(如果开启了 -t 的情况)
docker exec -it 容器名 bash

2.2.3.2.1 参数介绍

下面分别讲解一下这些参数的说明:

  • -i:表示运行容器。
  • -t:表示为容器保留交互的方式(命令行),即分配一个伪终端。所以常常会见到 -it 这样的搭配。
  • --name :为容器起个名字。
  • -v:表示目录映射关系(前者是宿主机目录,后者是映射到宿主机上的目录),可以使用多个 -v 做多个目录或文件映射。注意:推荐做目录映射,在宿主机上做修改,然后共享到容器上。
  • -d:表示创建一个守护式容器在后台运行(这样创建容器后不会自动登录容器,如果只加 -i -t 两个参数,创建后就会自动进去容器),即后端挂起运行。
  • -p:表示端口映射,前者是宿主机端口,后者是容器内的映射端口。可以使用多个 -p 做多个端口映射,只有做了端口映射,才能被外界访问。

给大家举个例子:

1
2
3
4
5
shell复制代码# 创建容器,把容器 3000 端口映射到宿主机 3000 端口,把/demo映射到宿主机的/demo  face是我下载好的一个现成的镜像
docker run -d -it -p 3000:3000 -v /demo:/demo --name node face

# 例如,名为 node 的镜像中有一个需要执行的 python 程序,就可以通过如下命令进入刚才分配到的命令行中去执行这个程序
docker exec -it node bash
  • 因为使用了 -t 这个参数,所以可以分配到一个伪终端,通过 docker exec -it 容器名 bash 进入命令行
  • -v 目录映射后,进入容器后,也会有一个一模一样的 demo 文件夹,例如在其中可以执行 python 程序

2.2.3.2.1 端口介绍

4369 :erlang发现端口

5672:client端通信端口

15672:管理界面ui端口

25672:server间内部通信端口

61613:不带TLS和带TLS的STOMP客户端

1883:不启用和启用TLS的MQTT客户端

比较关键的就是 5672 和 15672

更多端口详情可以访问官网文档

  • www.rabbitmq.com/networking.…

注:如果要通过远程连接,例如访问 web 管理页面的 15672 端口,Java 客户端连接的 5672 端口, 一定要进行一个开放操作,否则都连接不到。

  • 以下为基于 CentOS 7.9 开放 15672 端口的例子
1
2
3
4
5
6
7
8
shell复制代码# 查询 15672 是否开放,一般默认都是 no
firewall-cmd --query-port=15672/tcp
# 开放指定端口 15672
firewall-cmd --add-port=15672/tcp --permanent
# 重新载入
firewall-cmd --reload
# 再次查询,结果就是 yes 了
firewall-cmd --query-port=15672/tcp
  • 以下是关闭防火墙的命令
1
2
shell复制代码systemctl disable firewalld
systemctl stop firewalld
  1. RabbitMQ 协议和模型

安装结束后,就要进入主题,即用 Java 或者 Springboot 代码来实现 RabbitMQ的几种方式,但是想要很好的理解这几种路由交换方式,就需要对它的协议和架构模型有所了解。

3.1 协议

3.1.1 什么是协议?

协议,网络协议的简称,网络协议是通信计算机双方必须共同遵从的一组约定。如怎么样建立连接、怎么样互相识别等。只有遵守这个约定,计算机之间才能相互通信交流。它的三要素是:语法、语义、时序。

为了使数据在网络上从源到达目的,网络通信的参与方必须遵循相同的规则,这套规则称为协议(protocol),它最终体现为在网络上传输的数据包的格式。

3.1.1.1 网络协议的三要素

  1. 语法:数据与控制信息的结构和格式,以及数据出现的顺序。
  2. 语义:解释控制信息每个部分的意义,以及规定了需要发出何种控制信息以及完成的动作做出何种响应。
  3. 时序:对事件发生顺序的详细说明。

人们形象地把这三个要素描述为:做什么,怎么做,做的顺序。

举个例子 HTTP 协议

语法:HTTP 规定了请求报文和响应报文的格式
语义:客户端主动发起请求称为请求,服务端随之返回数据,称为响应
时序: 一个请求对应一个响应,而且先有请求后有响应

3.1.1.1.1 面试题:为什么消息中间件不直接使用 HTTP 协议

对于一个消息中间件来说,其主要责任就是负责数据传递,存储,分发,高性能和简洁才是我们所追求的,而 HTTP 请求报文头和响应报文头是比较复杂的,包含了Cookie,数据的加密解密,窗台吗,响应码等附加的功能,我们并不需要这么复杂的功能。

同时大部分情况下 HTTP 大部分都是短链接,在实际的交互过程中,一个请求到响应都很有可能会中断,中断以后就不会执行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,出现问题和故障要对数据或消息执行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行

3.1.2 RabbitMQ 的 AMQP 协议

RabbitMQ 的使用的协议是 AMQP(advanced message queuing protocol),它在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。

AMQP 更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的 Provider(Producer) 天然性就是跨平台的。

相比较其它消息协议,其特性为:

  1. 分布式事务支持
  2. 消息的持久化支持
  3. 高性能和高可靠的消息处理优势

3.1.3 架构模型

想要学习后面的几种消息具体的发送模式,这个模型图就必须理解清楚,因为这几种方式就是对这个模型不同程度的选择和缩减

  • Producer:消息的生产者(发送消息的程序)。
  • Connection:应用程序与Broker之间的网络连接。
  • Channel:信道,即信息传输的通道,可以建立多个 Channel,每个 Channel 代表一个会话任务。
+ 信道是建立在 TCP 连接内的虚拟连接,信息的读写都通过信道传输,因为对于操纵系统而言,建立和销毁 TCP 是非常昂贵的,所以引入了信道的概念,以复用一条 TCP 连接。
  • Broker(Server) :标识消息队列服务器实体,例如这里就是 RabbitMQ Server。
  • Virtual Host:虚拟主机,一个 Broker 中可以设置多个 Virtual Host,用作不同用户的权限隔离。
+ Broker 可以理解为整个数据库服务,而 Virtual Host 就是其中每个数据库的感觉,不同项目可以对应不同的数据库,其中有着项目所属的业务表等等。
+ 每个 Virtual Host 中,可以有若干个 Exchange 和 Queue。
  • Exchange:交换机,用来接收生产者发送的消息,然后将这些消息根据路由键发送到队列。
  • Binding:Exchange 和 Queue 之间的虚拟连接,Binding 中可以包括多个 Routing key。
  • Routing key:路由规则,虚拟机用它来确认如何路由一个特定消息。
  • Queue:消息队列,它是消息的容器,用来保存消息,每一条消息都能传入一个或者多个队列中,等待消费者消费,即取出这个消息。
  • Consumer:消息的消费者(接收消息的程序)。
  1. Java 实现 RabbitMQ

4.1 环境搭建

官网介绍几种模型:www.rabbitmq.com/getstarted.…

截止目前为止,官网一共提供了 7 中模型的介绍,我们主要介绍前五种基本的模式,也有人将 Direct 和 Topic模式都归入 Routing 模式,也可以看做四大种。

4.1.1 创建 Java 项目

首先创建好一个不使用骨架的 Maven 项目,然后引入 RabbitMQ 依赖,还有单元测试依赖即可

1
2
3
4
5
6
7
8
9
10
xml复制代码<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>

4.1.2 创建虚拟主机(可选)

在这里,我们创建了一个新的 Virtual Hosts,用来为这个Java项目服务,大家还可以创建一个新的用户,然后对其开启这个 Virtual Hosts 的访问权限(即将虚拟主机与用户绑定)。我们这里还是用 admin(我之前创建的一个管理员权限用户) 来演示。

注:这部分不去做也可以,直接用 / 和 admin 用户也行

4.1.3 创建连接工具类

由于我们后面要演示多种例子,而每一次获取连接和释放连接、关闭资源等操作代码都是一致的,为了防止代码冗余,优化代码,更易理解,提取出一个工具类,这样大家将重心放在不同实现方式的对比上就行了。

  • RabbitMqUtil 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
java复制代码public class RabbitMqUtil {
/**
* 主机名 即 Linux IP地址
*/
private static String host = "";
/**
* 端口号 客户端访问默认都是 5672
*/
private static int port = 0;
/**
* 虚拟主机 可以设置为默认的 / 或者自己创建出指定的虚拟主机
*/
private static String virtualHost = "";
/**
* 用户名
*/
private static String username = "";
/**
* 密码
*/
private static String password = "";

// 使用静态代码块为Properties对象赋值
static {
try {
//实例化对象
Properties properties = new Properties();
//获取properties文件的流对象
InputStream in = RabbitMqUtil.class.getClassLoader().getResourceAsStream("rabbitmq.properties");
properties.load(in);
// 分别获取 value
host = properties.getProperty("host");
port = Integer.parseInt(properties.getProperty("port"));
virtualHost = properties.getProperty("virtualHost");
username = properties.getProperty("username");
password = properties.getProperty("password");

} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 获取连接
*
* @return 连接
*/
public static Connection getConnection() {
try {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接 rabbitmq 主机
connectionFactory.setHost(host);
// 设置端口号
connectionFactory.setPort(port);
// 设置连接的虚拟主机(数据库的感觉)
connectionFactory.setVirtualHost(virtualHost);
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// 返回一个新连接
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
* 关闭通道和释放连接
*
* @param channel channel
* @param connection connection
*/
public static void close(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
  • properties
1
2
3
4
5
properties复制代码host=192.168.122.1
port=5672
virtualHost=/rabbitmq_maven_01
username=admin
password=admin

4.2 五种实现方式

说明:

  • 队列名,消息等等字符串内容,更推荐定义成变量传入,我文中都是直接写在参数中的,这种魔法值的写法,并不是很优美。
  • 生产者中使用了 Junit 单元测试,但是消费者中却在 main 函数中编写,这是因为,我们希望消费者处于一个持续运行等待的状态,如果使用 Junit 会导致,程序在执行一次后结束掉。
    • 除了在 main 函数中编写,还可以考虑使用 sleep 等待或者 while(true) 让程序不要直接终止掉。

4.2.1 简单队列模式(Hello Word)

  • Producer:消息的生产者(发送消息的程序)。
  • Queue:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
  • Consumer:消息的消费者(接收消息的程序)。

4.2.1.1 如何理解

由图所示,简单队列模式,一个生产者,经过一个队列,对应一个消费者。可以看做是点对点的一种传输方式,相较与 3.1.3 中的模型图,最主要的特点就是看不到 Exchange(交换机) 和 routekey(路由键) ,正是因为这种模式简单,所以并不会涉及到复杂的条件分发等等,因此也不需要用户去显式的考虑交换机和路由键的问题。

  • 但是要注意,这种模式并不是生产者直接对接队列,而是用了默认的交换机,默认的交换机会把消息发送到和 routekey 名称相同的队列中去,这也是我们在后面代码中在 routekey 位置填写了队列名称的原因

4.2.1.2 代码实现

4.2.1.2.1 生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare("queue1",false,false,false,null);
// 发布消息
channel.basicPublish("","queue1",null,"This is rabbitmq message 001 !".getBytes());
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel,connection);
}
}
  1. 通过工具类获取连接
  2. 获取连接通道:根据 3.1.3 的模型图可知,生产者需要在获取到连接后,再获取信道,才能去访问后面的交换机队列等。
  3. 通道绑定消息队列:绑定队列前,应该绑定交换机,但是此模式中隐蔽了交换机的概念,背后使用了默认的交换机,所以直接绑定队列。
    • queueDeclare 方法解释
      • 参数1:queue(队列名称),如果队列不存在,则自动创建。
      • 参数2:durable(队列是否持久化),持久化可以保证服务器重启后此队列仍然存在。
      • 参数3:exclusive(排他队列)即是否独占队列,如果此项为 true,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。
      • 参数4:autoDelete(自动删除),最后一个消费者将消息消费完毕后,自动删除队列。
      • 参数5:arguments(携带附加属性)。
  4. 发布消息:此处可以指定消息队列的发送方法,以及内容等,因为此模式比较简单,所以没有涉及到全部参数,后面的模式会有详细的讲解
    • basicPublish 方法解释
      • 参数1:exchange(交换机名称)。
      • 参数2:routingKey(路由key),此处填写队列名,可理解为把消息发送到和 routekey 名称相同的队列中去。
      • 参数3:props(消息的控制状态),可以在此处控制消息的持久化。
        • 参数为:MessageProperties.PERSISTENT_TEXT_PLAIN
      • 参数4:body(消息主体),类型是一个字节数组,要转一下类型。
  5. 通过工具关闭channel和释放连接:先关闭通道,再释放连接。

4.2.1.2.2 消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException{
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare("queue1", false, false, false, null);
// 消费消息
channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body): " + new String(body));
}
});
}
}
  1. 通过工具类获取连接
  2. 获取连接通道
  3. 通道绑定消息队列
  4. 消费消息:此处用来指定消费哪个队列的消息,以及一些机制和回调
* basicConsume 方法解释
    + 参数1:queue(队列名称),即消费哪个队列的消息 。
    + 参数2:autoAck(自动应答)开始消息的自动确认机制,只要消费了就从队列删除消息。
    + 参数3:callback(消费时的回调接口),callback 的类型是 Consumer 这里使用了 DefaultConsumer 就是 Consumer 的一个实现类。其中重写 handleDelivery 方法,就可以获取到消费的数据内容了,这里主要使用了其中的 body,即查看消息主体,其他三个参数暂时还没用到,有兴趣可以先打印输出一下,能先有个大概的了解。

4.2.2 工作队列模式(Work Queue)

  • Producer:消息的生产者(发送消息的程序)。
  • Queue:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
  • Consumer:消息的消费者(接收消息的程序)。
+ 此处我们假设 Consumer1、Consumer2、Consumer3 分别为完成任务速度不一样快的消费者,这会引出此模式的一个重点问题。

4.2.2.1 如何理解

工作模式由图可以看出,就是在简单队列模式的基础上,增加了多个消费者,也就是让多个消费者绑定同一个队列,共同去消费,这样能解决简单队列模式中,如果生产速速远大于消费速度,而导致的消息堆积现象。

  • 因为消息被消费后就会消失,所以不必担心任务会重复执行。

4.2.2.2 代码实现

注:工作队列模式有两种

  1. 轮询模式:每个消费者均分消息
  2. 公平分发模式(能者多劳):按能力分发,处理速度快的分发的多,处理速度慢的分发的少

我们首先演示的是轮询模式,根据它的缺点,又能引出公平分发模式

下面只描述与上面有差异的部分,在简单模式中,这些基本的方法都有介绍过

4.2.2.2.1 轮询模式-生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare("work", true, false, false, null);
for (int i = 1; i <= 20; i++) {
// 发布消息
channel.basicPublish("", "work", null, (i + "号消息").getBytes());
}
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel, connection);
}
}

流程和简单队列模式基本一致,有一些小小的改动,生产者中主要就是加了层循环,因为有多个消费者,所以多发送一些消息,可以看出一些特点和问题。

4.2.2.2.2 轮询模式-消费者代码

  • 消费者 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel = connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare("work", true, false, false, null);
// 消费消息
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号:消费-" + new String(body));
}
});
}
}
  • 消费者 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel = connection.createChannel();
// 通道绑定消息队列
channel.queueDeclare("work", true, false, false, null);
// 消费消息
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号:消费-" + new String(body));
}
});
}

上述两个消费者都在 basicConsume中开启了自动 Ack 应答,这一点下面会详述,同时在消费者 1 中,增加了 sleep 2s 的语句,模拟消费者1处理消息速度慢,而消费者2处理消息速度快的场景。

运行结果:

  • Consumer1
1
2
3
4
5
6
7
8
9
10
java复制代码消费者1号:消费-1号消息
消费者1号:消费-3号消息
消费者1号:消费-5号消息
消费者1号:消费-7号消息
消费者1号:消费-9号消息
消费者1号:消费-11号消息
消费者1号:消费-13号消息
消费者1号:消费-15号消息
消费者1号:消费-17号消息
消费者1号:消费-19号消息
  • Consumer2
1
2
3
4
5
6
7
8
9
10
java复制代码消费者2号:消费-2号消息
消费者2号:消费-4号消息
消费者2号:消费-6号消息
消费者2号:消费-8号消息
消费者2号:消费-10号消息
消费者2号:消费-12号消息
消费者2号:消费-14号消息
消费者2号:消费-16号消息
消费者2号:消费-18号消息
消费者2号:消费-20号消息

观察执行过程:发现两个消费者虽然每个人最后都各自处理了一半的消息,而且是按照一人一条分配的,但是消费者2号处理速度快,一下子就全部处理完了,但是消费者1号,每一次处理都需要 2s 所以,只能缓慢的处理,而消费者2号就处于一个空闲浪费的情况了。

如何切换为公平分发模式呢?

这就和 basicConsume 中的第二个参数,开启自动确认消费有关了,它默认是 true,也就代表只要一旦拿到队列中分发给这个消费者的消息,我就会自动返回一个确认消费的标识,队列收到后就会自动删除掉队列中的消息。

  • 但是这其中有一个很重要的问题,这种方式就是将风险交给了消费者,例如消费者收到了自己需要处理的 10 条消息,刚消费了 4 个,消费者宕机,挂掉了,后面的 6 个消息就丢失了。

如果想要修改为按能力分配的方式,有两个要点

  1. 设置通道一次只能消费一个消息
  2. 关闭消息的自动确认,手动确认消息

4.2.2.2.3 公平分发模式-生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 一次只发送一条消息
channel.basicQos(1);
// 通道绑定消息队列
channel.queueDeclare("work", true, false, false, null);
for (int i = 1; i <= 20; i++) {
// 发布消息
channel.basicPublish("", "work", null, (i + "号消息").getBytes());
}
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel, connection);
}

4.2.2.2.4 公平分发模式-消费者代码

  • 消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel = connection.createChannel();
// 一次只接受一条未确认的消息
channel.basicQos(1);
// 通道绑定消息队列
channel.queueDeclare("work", true, false, false, null);
// 消费消息
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号:消费-" + new String(body));
// 返回 deliveryTag 代表队列可以删除此消息了
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
  • 消费者2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel = connection.createChannel();
//步骤一:一次只接受一条未确认的消息
channel.basicQos(1);
// 通道绑定消息队列
channel.queueDeclare("work", true, false, false, null);
// 消费消息
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号:消费-" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}

运行结果:

  • Consumer1
1
java复制代码消费者1号:消费-1号消息
  • Consumer2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码消费者2号:消费-2号消息
消费者2号:消费-3号消息
消费者2号:消费-4号消息
消费者2号:消费-5号消息
消费者2号:消费-6号消息
消费者2号:消费-7号消息
消费者2号:消费-8号消息
消费者2号:消费-9号消息
消费者2号:消费-10号消息
消费者2号:消费-11号消息
消费者2号:消费-12号消息
消费者2号:消费-13号消息
消费者2号:消费-14号消息
消费者2号:消费-15号消息
消费者2号:消费-16号消息
消费者2号:消费-17号消息
消费者2号:消费-18号消息
消费者2号:消费-19号消息
消费者2号:消费-20号消息

4.2.3 发布与订阅模式(Fanout 广播)

  • Producer:消息的生产者(发送消息的程序)。
  • Exchange :交换机,负责发送消息给指定队列。
  • Queue:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
  • Consumer:消息的消费者(接收消息的程序)。

4.2.3.1 如何理解

Fanout 直译为 “扇出” 但是大家更多的会把它叫做广播或者发布与订阅,它是一种没有路由key的模式,生产者将消息发送给交换机,交换机会把所有消息复制同步到所有与它绑定过的队列上,而每个队列只能有一个消费者拿到这条消息,如果在一个消费者连接中,创建多个通道,则会出现争抢消息的结果。

4.2.3.2 代码实现

注:下面只描述与上面有差异的部分,在简单模式中,这些基本的方法都有介绍过

4.2.3.2.1 生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
final Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("order", "fanout");
for (int i = 1; i <= 20; i++) {
// 发布消息
channel.basicPublish("order", "", null, "fanout!".getBytes());
}
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel, connection);
}
}
  1. 声明交换机
* exchangeDeclare 方法解释
    + 参数1:exchange(交换机名称),如果交换机不存在,则自动创建
    + 参数2:type(类型),此处选择 fanout 模式
  1. 发布消息:在 basicPublish 方法的第一个参数中输入上述定义好的交换机的名字,第二个参数,路由键为空
* 循环 20 条是为了演示消费者

4.2.3.2.2 消费者代码

  • 消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("order", "fanout");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定临时队列和交换机
channel.queueBind(queue, "order", "");
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号:消费-" + new String(body));
}
});
}
}
  1. 声明交换机
  2. 创建临时队列
  3. 绑定临时队列和交换机
    • queueBind 方法解释
      • 参数1:queue(临时队列)
      • 参数2:exchange(交换机)
      • 参数3:routingKey(路由key)
  • 消费者2:演示了一个连接中,多个通道的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();

// 获取连接通道
Channel channel = connection.createChannel();
Channel channel2 = connection.createChannel();

// 声明交换机
channel.exchangeDeclare("order", "fanout");
channel2.exchangeDeclare("order", "fanout");

// 创建临时队列
String queue = channel.queueDeclare().getQueue();
System.out.println(queue);

// 绑定临时队列和交换机
channel.queueBind(queue, "order", "");
channel2.queueBind(queue, "order", "");

// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号:消费-" + new String(body));
}
});

// 消费消息
channel2.basicConsume(queue, true, new DefaultConsumer(channel2) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2-2号:消费-" + new String(body));
}
});
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码消费者2号:消费-fanout!
消费者2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2号:消费-fanout!
消费者2号:消费-fanout!
消费者2号:消费-fanout!
消费者2号:消费-fanout!
消费者2号:消费-fanout!
消费者2号:消费-fanout!
消费者2号:消费-fanout!
消费者2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2-2号:消费-fanout!
消费者2-2号:消费-fanout!

4.2.3.2.3 为什么消费者中也声明交换机?

从上面的代码中可以看出,在 Producer 和 Conusmer 中我们都分别声明了交换机,但是消费者由图可知,并不会与交换机有直接的接触,为什么消费者中也声明交换机呢?

这是为了保证 Producer 或者 Producer 执行的时候,永远不会因为交换机还没被声明而出错,例如你只在 Producer 声明了交换机,那么你就必须先启动 Producer ,如果直接执行 Conusmer 此时交换机就还不存在,就会报错。而全部写入声明,则可以保证不论先启动谁,都会声明到交换机。

4.2.4 路由模式( Routing / Direct)

  • Producer:消息的生产者(发送消息的程序)。
  • Exchange :交换机,负责发送消息给指定队列。
  • routingKey:路由key,即上图的 key1,key2 等,相当于在交换机和队列之间又加了一层限制
  • Queue:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
  • Consumer:消息的消费者(接收消息的程序)。

4.2.4.1 如何理解

路由模式的交换机类型是 direct,与 fanout 模式相比,多了路由 key 这个概念。生产者发送携带指定 routingKey(路由key) 的消息到交换机,交换机拿着此 routingKey 去找到绑定了这个 routingKey 的队列,然后发送到此队列,一个队列可以绑定多个 routingKey 。

4.2.4.2 代码实现

4.2.4.2.1 生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("order_direct", "direct");
// 指定 routingKey
String key = "info";
// 发布消息
channel.basicPublish("order_direct", key, null, ("发送给指定路由" + key + "的消息").getBytes());
// 通过工具关闭channel和释放连接
RabbitMqUtil.close(channel, connection);
}
}
  1. 指定 routingKey ,即在 basicPublish 方法 的第二个参数中,指定 key 的值

4.2.4.2.2 消费者代码

  • 消费者 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("order_direct", "direct");
// 获取临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定临时队列和交换机
channel.queueBind(queue, "order_direct", "info");
channel.queueBind(queue, "order_direct", "error");
channel.queueBind(queue, "order_direct", "warn");
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:消费-" + new String(body));
}
});
}
}
  1. 只是在绑定队列和交换机的时候,增加了 key 这个值
  • 消费者2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("order_direct", "direct");
// 获取临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定临时队列和交换机
channel.queueBind(queue, "order_direct", "error");
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:消费-" + new String(body));
}
});
}
}

运行结果:只有消费者 1 收到了消息

1
java'复制代码消费者1:消费-发送给指定路由info的消息

4.2.5 通配符匹配模式(Topic)

  • Producer:消息的生产者(发送消息的程序)。
  • Exchange :交换机,负责发送消息给指定队列。
  • routingKey:路由key,即上图的 key1,key2 等,相当于在交换机和队列之间又加了一层限制
    • 但是 Topic 中的 key 为通配符的形式,这样可以大大的提高效率
  • Queue:消息队列,理解为一个容器,生产者向它发送消息,它把消息存储,等待消费者消费。
  • Consumer:消息的消费者(接收消息的程序)。

4.2.5.1 如何理解

通配符匹配模式的交换机类型为 topic,因为它与 Direct 模式很相似,所以大家有时候也会把 Direct 模式和 Topic 共同归入路由模式下,它们的区别就是,Direct 模式的 routingKey 是一个指定的值,而 Topic 模式的 routingKey 可以使用通配符, 而且一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: ideal.insert。

  • * :匹配正好一个词,例如: order.* 可以匹配到 order.insert
  • #:匹配一个或者多个词,例如:order.# 可以匹配到 order.insert.common
    • # 就像一个多层的概念,而 * 只是一个单层的概念

4.2.5.2 代码实现

4.2.5.2.1 生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class Producer {
@Test
public void sendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel = connection.createChannel();
channel.exchangeDeclare("order_topic", "topic");
// 声明交换机
String key = "user.query.all";
// 发布消息
channel.basicPublish("order_topic", key, null, ("发送给指定路由" + key + "的消息").getBytes());
RabbitMqUtil.close(channel, connection);
}
}

4.2.5.2.2 消费者代码

  • 消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class Consumer1 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("order_topic", "topic");
// 获取临时队列
String queue = channel.queueDeclare().getQueue();
// 指定路由key
String key = "user.*";
channel.queueBind(queue, "order_topic", key);
// 发布消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:消费-" + new String(body));
}
});
}
}
  • 消费者2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class Consumer2 {
public static void main(String[] args) throws IOException {
// 通过工具类获取连接
Connection connection = RabbitMqUtil.getConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare("order_topic", "topic");
// 获取临时队列
String queue = channel.queueDeclare().getQueue();
// 指定路由key
String key = "user.#";
channel.queueBind(queue, "order_topic", key);
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:消费-" + new String(body));
}
});
}
}

运行结果:只有消费者 2 收到了消息,因为消息是一个多层的结构,只有 user.# 能匹配到

1
java'复制代码消费者2:消费-发送给指定路由user.query.all的消息
  1. Springboot 实现 RabbitMQ

SpringBoot 提供 Spring For RabbitMQ 的启动器,同时提供了一系列注解以及 RabbitTemplate 供我们使用,能够极大的简化开发 RabbitMQ 的步骤,下面分别演示了【5.1 基于纯注解】 以及【 5.2 基于注解 + 配置类】 的写法,其使用方式大同小异,只是声明和绑定队列交换机等的位置不同。一般认为后者更好维护管理,任选其一即可。

环境准备:

  1. 首先创建 SprinBoot 项目,然后选择 RabbitMQ 的启动器,以及单元测试等基本启动器
  2. 编写 yml 配置文件,编写连接 RabbitMQ 需要的数据

RabbitMQ 依赖

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml 配置文件

1
2
3
4
5
6
7
yml复制代码spring:
rabbitmq:
host: 192.168.122.1 # 服务器地址
port: 5672 # tcp端口
username: admin # 用户名
password: admin # 用户密码
virtual-host: /rabbitmq_springboot_01 # 虚拟主机

5.1 基于纯注解

注:此方式没有创建配置类来管理队列以及交换机的声明和绑定等,而是全部通过注解的方式直接在消费者中写入

5.1.1 简单队列模式

所有生产消息的代码,我们都放到 Test 中去做

  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleSendMessage() {
rabbitTemplate.convertAndSend("simple_queue", "This is a message !");
}
}
  1. 第一步就是注入 SpringBoot 提供给我们的 RabbitTemplate
  2. 通过 RabbitTemplate 的 convertAndSend 方法用来发送消息,他有多种重载方式,今天分别会用到 2 个 和 3 个参数的
    • convertAndSend 方法详解(两个参数)
      • 参数1:routingKey(路由key)
      • 参数2:object(发送的消息正文)
    • convertAndSend 方法详解(三个参数)
      • 参数1:exchange(交换机)
      • 参数2:routingKey(路由key)
      • 参数3:object(发送的消息正文)
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
java复制代码// 注入容器
@Component
// 监听 RabbitMQ
@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue", durable = "true", exclusive = "false", autoDelete = "false"))
public class SimpleConsumer {
// 自动回调
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("消费者:" + message);
}
}
  1. 注入容器
  2. 监听 RabbitMQ,在 @RabbitListener 注解中,可以实现,队列的声明,以及后面交换机与队列的绑定等
* @Queue 可以有四个参数,因为其各有默认值,所以只给定 value 值,就会按照 持久化,非独占,非自动删除的方式默认创建
    + 参数1:value(队列名)
    + 参数2:durable ( 持久化消息队列)RabbitMQ 重启后,队列仍存在,默认 true
    + 参数3:exclusive(是否独占) 表示该消息队列是否只在当前 Connection 生效,默认是 false
    + 参数4:auto-delete(自动删除)表示消息队列没有在使用时将被自动删除,默认是 false
  1. 在方法上添加 @RabbitHandler 注解,就能够实现自动回调,这样我们就能拿到生产者中的消息了
* 注:receiveMessage 这个方法的参数类型,取决于你在生产者有发送了什么类型的数据

5.1.2 工作队列模式

5.1.2.1 轮询模式

  • 生产者:没什么好说的,因为工作模式有多个消费者,所以多发送几条消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired

@Test
public void testWorkSendMessage() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("work_queue", "This is a message !, 序号:" + i);
}
}
}
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Component
public class WorkConsumer {
// 监听 RabbitMQ
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
// 消费者1
public void receiveMessage1(String message) {
System.out.println("消费者1:" + message);

// 监听 RabbitMQ
@RabbitListener(queuesToDeclare = @Queue("work_queue")
// 消费者2
public void receiveMessage2(String message) {
System.out.println("消费者2:" + message);
}
}
  1. @RabbitListener 注解,既可以放在类上,也可以放在方法上,例如上述代码,我们就分别放在了两个方法上,用来指代不同的消费者。
    • 但是如果在类上加入 @RabbitListener 注解,而在下面两个方法中,添加 @RabbitHandler 注解则会报错,需要分别为每个消费者都创建一个类

5.1.2.2 公平模式(按能力分配)

5.1.2.2.1 修改配置文件的方式

  • 生产者不变
  • 修改配置文件 yml / properties
1
2
3
4
5
6
7
8
9
10
11
12
yml复制代码spring:
rabbitmq:
host: 192.168.122.1 # 服务器地址
port: 5672 # tcp端口
username: admin # 用户名
password: admin # 用户密码
virtual-host: /rabbitmq_springboot_01 # 虚拟主机
# 新增部分
listener:
simple:
acknowledge-mode: manual # 开启 ack 手动应答
prefetch: 1 # 每次只能消费 1 条消息
  1. acknowledge-mode 选项介绍
    • auto:自动确认,为默认选项
    • manual:手动确认(按能力分配就需要设置为手动确认)
    • none:不确认,发送后自动丢弃
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码@Component
public class WorkConsumer {
// 监听 RabbitMQ
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
// 消费者 1
public void receiveMessage(String body, Message message, Channel channel) throws IOException {
try {
// 打印输出消息主题
System.out.println("消费者1:" + body);
// 返回 deliveryTag 代表队列可以删除此消息了
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
// 消费者告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}

// 监听 RabbitMQ
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
// 消费者 2
public void receiveMessage2(String body, Message message, Channel channel) throws IOException{
try {
// 延迟 2s 代表处理业务慢
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

try {
// 打印输出消息主题
System.out.println("消费者2:" + body);
// 返回 deliveryTag 代表队列可以删除此消息了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
// 消费者告诉队列信息消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
  1. 因为在 yml 配置中开启了手动确认,所以,需要在成功和失败后分别返回确认消息
  2. basicAck 方法解释
* 参数1:deliveryTag(交付标志,即该消息的index),返回即代表确认收到消息,队列可以删除此消息了
* 参数2:mutiple(是否批量)选择 true 将一次性拒绝所有小于 deliveryTag 的消息
  1. basicNack 方法解释
* 参数 1 | 参数 2 同上
* 参数3:requeue(被拒绝的是否重新进入队列)

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码消费者1:This is a message !, 序号:2
消费者1:This is a message !, 序号:3
消费者1:This is a message !, 序号:4
消费者1:This is a message !, 序号:5
消费者1:This is a message !, 序号:6
消费者1:This is a message !, 序号:7
消费者1:This is a message !, 序号:8
消费者1:This is a message !, 序号:9
消费者1:This is a message !, 序号:10
消费者1:This is a message !, 序号:11
消费者1:This is a message !, 序号:12
消费者1:This is a message !, 序号:13
消费者1:This is a message !, 序号:14
消费者1:This is a message !, 序号:15
消费者1:This is a message !, 序号:16
消费者1:This is a message !, 序号:17
消费者1:This is a message !, 序号:18
消费者1:This is a message !, 序号:19
消费者1:This is a message !, 序号:20

消费者2:This is a message !, 序号:1

到现在已经实现了修改配置文件的方式实现按能力分配,补充几个配置的内容,我们上面只用了一部分,其他的方便大家参考,yml 和 properties 大家自己选择即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
properties复制代码# 发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# spring.rabbitmq.publisher-confirms=true(旧版)
# 发送回调
spring.rabbitmq.publisher-returns=true
# 消费手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 并发消费者初始化值
spring.rabbitmq.listener.simple.concurrency=1
# 并发消费者的最大值
spring.rabbitmq.listener.simple.max-concurrency=10
# 每个消费者每次监听时可拉取处理的消息数量
# 在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
spring.rabbitmq.listener.simple.prefetch=1
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

5.1.2.2.1 配置工厂的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码/**
* 设置消费者的确认机制,并达到能者多劳的效果
*
* @param connectionFactory 连接工厂
* @return
*/
@Bean("workListenerFactory")
public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory containerFactory =
new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
// 修改为手动确认
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 拒绝策略,true 回到队列 false丢弃,默认是true
containerFactory.setDefaultRequeueRejected(true);
// 默认的PrefetchCount是250 修改为 1
containerFactory.setPrefetchCount(1);

return containerFactory;
}
  • 消费者修改
1
2
3
java复制代码@RabbitListener(queuesToDeclare = @Queue("work_queue"))
// 将上面的监听,增加 containerFactory 属性,然后将配置好的工厂传入
@RabbitListener(queuesToDeclare = @Queue("work_queue"), containerFactory = "workListenerFactory")

5.1.3 发布与订阅模式

  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired

@Test
public void testFanoutSendMessage() {
rabbitTemplate.convertAndSend("order_exchange", "", "This is a message !");
}
}
  1. 因为从这个模式开始,就涉及到交换机了,所以用的是三个参数的方法
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Component
public class FanoutConsumer {
// 绑定临时队列和交换机
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(), // 临时队列
exchange = @Exchange(name = "order_exchange", type = "fanout") // 交换机与类型
)
})
public void receiveMessage1(String message) {
System.out.println("消费者1:" + message);
}

// 绑定临时队列和交换机
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(), // 临时队列
exchange = @Exchange(name = "order_exchange", type = "fanout") // 交换机与类型
)
})
public void receiveMessage2(String message) {
System.out.println("消费者2:" + message);
}
}

5.1.4 路由模式(Direct)

  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired

@Test
public void testDirectSendMessage() {
rabbitTemplate.convertAndSend("direct_exchange", "info", "This is a message !");
}
}
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码@Component
public class DirectConsumer {
// 绑定临时队列和交换机
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(), // 临时队列
exchange = @Exchange(name = "direct_exchange", type = "direct"), // 交换机和类型
key = {"info", "warn", "error"} // 路由key
)

})
public void receiveMessage1(String message) {
System.out.println("消费者1:" + message);
}

// 绑定临时队列和交换机
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(), // 临时队列
exchange = @Exchange(name = "direct_exchange", type = "direct"), // 交换机和类型
key = {"info", "warn", "error"} // 路由key
)

})
public void receiveMessage2(String message) {
System.out.println("消费者2:" + message);
}
}

5.1.5 主题模式

  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired

@Test
public void testTopicSendMessage() {
rabbitTemplate.convertAndSend("topic_exchange", "order.insert.common", "This is a message !");
}
}
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码@Component
public class TopicConsumer {
// 绑定临时队列和交换机
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(), // 临时队列
exchange = @Exchange(name = "topic_exchange", type = "topic"), // 交换机和类型
key = {"order.*"} // 通配符路由key
)

})
public void receiveMessage1(String message) {
System.out.println("消费者1:" + message);
}

// 绑定临时队列和交换机
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(), // 临时队列
exchange = @Exchange(name = "topic_exchange", type = "topic"), // 交换机和类型
key = {"order.*"} // 通配符路由key
)
})
public void receiveMessage2(String message) {
System.out.println("消费者2:" + message);
}
}

5.2 基于注解 + 配置类

其实这种方式,就是将交换机,队列的声明和绑定都在配置类中进行,一个是消费者中的注解变的简洁了,再有就是统一管理,更加条理,而且生产者和消费者引用的时候也更加方便,日后修改的时候,也不需要对每一处都修改。

由于篇幅过长了,这里演示最复杂的 Topic 方式,其他的也是信手拈来。

  • 配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码@Configuration
public class RabbitMqConfiguration {

public static final String TOPIC_EXCHANGE = "topic_order_exchange";
public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
public static final String TOPIC_QUEUE_NAME_2 = "test_topic_queue_2";
public static final String TOPIC_ROUTINGKEY_1 = "test.*";
public static final String TOPIC_ROUTINGKEY_2 = "test.#";

@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}

@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE_NAME_1);
}

@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE_NAME_2);
}

@Bean
public Binding bindingTopic1(){
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with(TOPIC_ROUTINGKEY_1);
}
@Bean
public Binding bindingTopic2(){
return BindingBuilder.bind(topicQueue2())
.to(topicExchange())
.with(TOPIC_ROUTINGKEY_2);
}

}
  1. 添加 @Configuration 注解:表明这是一个配置类
  2. 定义常量:将交换机名,队列名,路由key 等都可以创建为常量,调用,管理和修改都非常方便,还可以创建出一个专门的 RabbitMQ 的常量类。
  3. 定义交换机:因为这个例子是 Topic 所以选择 TopicExchange 类型
  4. 定义队列:传入队列名常量即可,因为持久化等存在默认值,也可以自己自定持久化,是否独占等参数
  5. 绑定交换机和队列:利用 BindingBuilder 的 bind 方法绑定队列,to 绑定到指定交换机,with 传入路由key
  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired

@Test
public void testTopicSendMessage() {
rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
}
}
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Component
public class TopicConsumer {
// 绑定队列即可
@RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_1})
public void receiveMessage1(String message) {
System.out.println("消费者1:" + message);
}

// 绑定队列即可
@RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_2})
public void receiveMessage2(String message) {
System.out.println("消费者2:" + message);
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

nodejs+Puppeteer(无头浏览器) 实现服务端

发表于 2021-06-15

应用场景:
客户在创建前后端分离的应用时,前端只用于做一些基本的数据展示,如果涉及大量的数据或报表生成,建议放在服务器端展示,本文主要分享如何在服务器端使用ActiveReportsJS 实现报表导出PDF功能

Demo下载:

gcdn.grapecity.com.cn/forum.php?m…

环境准备:
node.js v14.15.0+Headless 无头浏览器

操作步骤:

添加资源文件

image.png

配置资源和文件
const puppeteer = require(‘puppeteer’);
const fs = require(‘fs’);
var static = require(‘node-static’);
var http = require(‘http’);

var file = new(static.Server)(__dirname + ‘/resources’);

http.createServer(function (req, res) {
file.serve(req, res);
}).listen(9999);

const fonts = [
{
name: ‘Montserrat’,
source: ‘Montserrat-Regular.ttf’
},
{
name: ‘Montserrat’,
source: ‘Montserrat-Medium.ttf’,
weight: 500
}
];

调用浏览器并初始化 调用ARJS 导出PDF文件

(async () => {
const browser = await puppeteer.launch({headless: true});
const page = await browser.newPage();
await page.goto(http://localhost:9999/host.html);
//await page.goto(${__dirname}/resources/host.html);
const pdfString =await page.evaluate(({reportUrl, fonts}) =>

new Promise(async (resolve, reject) => {
// await GC.ActiveReports.Core.FontStore.registerFonts(fonts);
const report = new GC.ActiveReports.Core.PageReport();
await report.load(reportUrl);
const doc = await report.run();
const result = await GC.ActiveReports.PdfExport.exportDocument(doc, {fonts: fonts, info: {author: ‘GrapeCity’}});
const reader = new FileReader();
reader.readAsBinaryString(result.data);
reader.onload = () => resolve(reader.result);
reader.onerror = () => reject(‘Error occurred while reading binary string’);
}), {reportUrl: ‘SimpleTable.rdlx-json’, fonts: fonts});

1
2
3
4
ini复制代码const pdfData = Buffer.from(pdfString, 'binary');
fs.writeFileSync(`${__dirname}/out115.pdf`, pdfData);
console.log('done');
process.exit(0);

})();

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

环境安装——MySQL安装 一文教会你安装与卸载MySQL

发表于 2021-06-15

一文教会你安装与卸载MySQL

  1. 官网下载

访问官方网站:www.mysql.com/ 下载MySQL。

这里就不是我着重讲的重点了。接下来还是用我发给大家的版本吧。也可以自行下载免费的版本都可以的啦!

  1. 文件下载

这里直接分享一份百度网盘的链接给大家。

链接:pan.baidu.com/s/1IECCQqcJ…
提取码:a646

里面包括了MySQL的安装,还有SQLyog图形界面的安装包。

  1. 卸载步骤

有人就会奇怪,为什么要先将卸载步骤呢?因为我这台电脑中,本来就存在了MySQL,所以还是带大家先来卸载步骤啦。哈哈,无非我就是懒。

卸载这一步骤还是非常有用滴!

  1. 停止MySQL服务。

具体有两种方式:

* 通过命令行方式:`Win + R` 然后敲`cmd`进入命令行。



1
2
3
4
5
shell复制代码net stop mysql

按下entry键后结果:
MySQL 服务正在停止.
MySQL 服务已成功停止。
* 通过计算机管理方式 右击计算机-->管理-->服务-->启动或停止`MySQL`服务 ![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/c5d4dd657226fe183c178b20854a56bd9909a307040b11c602ec9e4a7d5d990a)
  1. 下载MySQL安装程序。

找到控制面板–> 程序和功能,卸载MySQL程序。


3. 卸载过后,也需要删除文件夹中的MySQL,比如我的下载目录是E:\Program Files (x86)中MySQL该目录下剩余了所有文件,把MySQL文件夹也删了。


4. 删除C盘ProgramDate隐藏目录中关于MySQL的目录。(重点)

ProgramDate目录是隐藏的,所以需要我们显示出来才能继续操作:

* 可以点击`C`盘中的查看,--> 再点击隐藏的项目的功能,就能得到了。


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/7d3929929ae5df331b5d6bafc8c9350f30fb92a612ffc3238ba2e2d0579ea0b1)
* 如果没有看到这个功能的话,就按我下面这个步骤来


第一步:在计算机界面,点击组织,选择文件夹和搜索选项:


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/cc62f11cbe89da573abf67726d837313fae1dd259e3af9aa39abce367bb58cac)


第二步:弹出如下对话框,点击查看:


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/45404222ee48353180e54d3ccc5a39a6fe90600b7130490a9f88ed0253994d95)


第三步:在高级设置中向下搜索,可看到显示隐藏文件夹,点击确认。


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/37da9103745b0151c4cae7ef86b4fca2a7ada9785018aff763c67837deed1e6e)此时你已经得到了这个`ProgramDate`隐藏目录了。接下来就是删除里面的`MySQL`。

走到这里,基本上来说,你的MySQL已经完全卸载干净了。接下来就是正式咱们的安装步骤了。

  1. 安装步骤

安装前注意先关闭了windows防火墙。

  1. 打开下载的MySQL安装文件mysql-5.5.40-win64.msi,直接双击打开即可。

  1. 进入到安装MySQL界面中。
* 进入界面后,点击下一步即`Next`:


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/4cea4019c363b2274f1823dcafd7b7e2e776676668db6c6fca3dc1872b5e1e5e)
* 选择同意安装,并进入下一步:


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/f5dd12792788ffee0c89d4bbc8ec351d7389898ef2644160c1b8fb3e9b259c2b)
* 选择安装类型,有`Typical`(默认)、`Complete`(完全)、`Custom`(用户自定义)三个选项,选择`Custom`,按`Next`键继续:


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/49b7d12e78e321dc65e68621cd68a3faa3278b72dc217e10089f20fe6de284ab)
* 默认路径即可,如果真的`C`盘放不下了,可以选择`Browse`来手动指定安装目录。


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/f6cdb1e3639e6d52e88e4f2398e6e2a19cd866da1f2470b24a015a49834c52f0)
* 点击安装即可,就可以安装`MySQL`了:


![](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/c864f86f9b367b3e9c0eb3d9acf4764fa6d1db512ca92103d2ff1701ac083f27)
  1. MySQL配置

  • 在安装完成后,出现如下界面将进入MySQL配置向导:

点击Finish后,就会进入配置界面,按Next继续。:

  • 选择配置方式,Detailed Configuration(手动精确配置)、Standard Configuration(标准配置),我们选择Detailed Configuration,方便熟悉配置过程,按Next继续。

  • 选择服务器类型,Developer Machine(开发测试类,mysql占用很少资源)、Server Machine(服务器类型,mysql占用较多资源)、Dedicated MySQL Server Machine(专门的数据库服务器,mysql占用所有可用资源)。这里我们选择Developer Machine,按Next继续。

  • 选择mysql数据库的大致用途,Multifunctional Database(通用多功能型,好)、Transactional Database Only(服务器类型,专注于事务处理,一般)、Non-Transactional Database Only(非事务处理型,较简单,主要做一些监控、记数用,对MyISAM数据类型的支持仅限于non-transactional),按Next继续。

  • 选择表空间的路径,默认即可。按Next继续。

  • 选择网站并发连接数,同时连接的数目,Decision Support(DSS)/OLAP(20个左右)、Online Transaction Processing(OLTP)(500个左右)、Manual Setting(手动设置,自己输一个数)。这里选择Manual Setting然后设置20即可,按Next继续。

  • 是否启用TCP/IP连接,设定端口,如果不启用,就只能在自己的机器上访问MySQL数据库了,在这个页面上,您还可以选择“启用标准模式”(Enable Strict Mode),这样MySQL就不会允许细小的语法错误。如果是新手,建议您取消标准模式以减少麻烦。但熟悉MySQL以后,尽量使用标准模式,因为它可以降低有害数据进入数据库的可能性。按Next继续 。

  • 就是对MySQL默认数据库语言编码进行设置(重要),一般选UTF-8,按 Next继续。

  • 选择是否将MySQL安装为windows服务,还可以指定Service Name(服务标识名称),是否将MySQL的bin目录加入到windows path(加入后,就可以直接使用bin下的文件,而不用指出目录名,比如连接,mysql.exe -uusername -ppassword就可以了,不用指出MySQL.exe的完整地址,很方便),我这里全部打上了勾,Service Name不变。按Next继续。

  • 询问是否要修改默认root用户(超级管理)的密码。Enable root access from remote machines(是否允许root用户在其它的机器上登陆,如果要安全,就不要勾上,如果要方便,就勾上它)。最后Create An Anonymous Account(新建一个匿名用户,匿名用户可以连接数据库,不能操作数据,包括查询),一般就不用勾了,设置完毕,这里密码设置为123,上下得一致,才可以按Next继续。

  • 确认设置无误,按Execute使设置生效,即完成MySQL的安装和配置。

  • 一般来说,直接点Execute,就会成功到这个界面:

接下来如果安装过程可能出现的错误,我都讲述的,如果没有的话,那就太怪异啦。

  1. 安装出现的问题

6.1 如果出现了最后一步卡死的状态的话,不要慌:

首先等上一下,看是不是只是突然卡了,如果一直卡死状态,可以用任务管理器把他关闭,接下来就按我说的操作做即可:

重启电脑后,再次右键以管理员的身份运行,然后继续按照之前的配置来配置即可。

6.2 如果出现在安装最后一步过程中,或者点击Finish后出现1045

这个原因一般出现在以前有安装MySQL的服务器上。

解决的办法:

  1. 卸载MySQL。保证能卸载干净,在之前卸载MySQL中也讲解过了。
  2. 保证删除:

Windows Xp系统删除目录C:\Documents and Settings\All Users\Application Data\

windows 7\8\10操作系统删除目录C:\ProgramData\MySQL。

这个步骤上面卸载的时候也讲解过。注意一定删除即可。
3. 重新安装,按安装和配置的步骤走即可。

6.3 提示 Can’t connect to MySQL server (10060)

当远程连接MySQL数据库的时候显示Can't connect to MySQL server (10060)>
解决办法:

  • 关闭windows防火墙即可解决该问题。

6.4 出现10061错误

如果出现ERROR 2003: Can't connect to MySQL server on 'localhost' (10061)。

  • 首先检查MySQL服务有没有启动 –> 如果没有启动,则要启动这个服务,即可解决。
  • 如果问题还是没有解决请尝试下面的步骤:
    1. 删除MySQL安装路径下面的my.ini。
    2. 打开MySQL安装路径的\bin\winmysqladmin.exe 输入用户名 和密码。
    3. 进入命令行中,然后在dos下 输入:mysqld-nt -remove即删除服务。
    4. 在dos下接着输入:mysqld-nt -install安装服务。
    5. 在dos下接着输入:net start mysql启动服务。

6.5 忘记密码咋办

几个步骤教会你重置密码:

  1. 首先检查MySQL服务是否启动,若已启动则先将其停止服务,可在开始菜单的运行,使用cmd进入命令行,在dos命令窗口中输入如下命令:net stop mysql


2. 再打开一个cmd窗口,输入下列命令(敲回车后,保持窗口的状态,不要关闭该窗口,直接进行下面的步骤即可;):

注意:下面语句中的文件路径需要改成你自己的文件路径,一般安装在自己的目录下,如果默认的话,一般跟我一样,还是要看看这个文件夹下面有没有my.ini文件。

mysqld --defaults-file="C:\Program Files\MySQL\MySQL Server 5.5\my.ini" --console --skip-grant-tables

解释下:这个命令可以通过跳过权限安全检查,开启MySQL服务,这样连接MySQL时,可以不用输入用户密码。


3. 打开第二个cmd窗口,输入下列命令:mysql -uroot -p

出现下面的提示时直接敲回车,不用输入密码:Enter password:

然后就就会出现登录成功的信息。


4. 继续输入如下命令:use mysql;

然后输入如下命令更改root密码:UPDATE user SET Password=PASSWORD('newpassword') where USER='root';

注意:上面语句中的newpassword需要改成你自己想要设置的密码。也就是你修改后的密码。


5. 输入如下命令刷新权限:FLUSH PRIVILEGES;

然后输入如下命令即可退出: quit


6. 重新登录,测试是否修改成功: mysql -uroot -p123

显示登录信息: 成功 就一切ok了,如果不成功,从新执行第4-8步直到成功为止。


7. 此时可以关闭所有的dos窗口了。
8. 再次手动启动MySQL的服务即可,正常使用了。

或者在dos命令行输入:net start mysql 命令也可以实现启动MySQL的服务!

  1. 安装SQLyog软件

按着我的步骤直接安装。、

  1. 点击下载好的SQLyog软件。


2. 选择中文,并点击OK进入下一步:


3. 然后无脑下一步就可以啦。其中点击接受。路径可以选择自己想要安装的路径。


4. 安装完成后,点击Finish后就会出现要注册SQLyog界面。

点击sn.txt,里面有相应的名称和证书密匙,可以选择其中一个注册。
5. 然后进入登录界面,按步骤登录即可,账号和密码就是你安装MySQL时的账号和密码。端口号默认为3306。


6. 链接进去后,看下软件界面信息:

到这里你就安装完MySQL相关的所有东西啦。

注: 如果文章有任何错误和建议,请各位大佬尽情留言!如果这篇文章对你也有所帮助,希望可爱亲切的您给个三连关注下,非常感谢啦!也可以微信搜索太子爷哪吒公众号私聊我,感谢各位大佬!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…642643644…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%