开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

讲解JVM原理的文章铺天盖地,希望这篇足够通俗易懂

发表于 2021-07-28

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」

导读

学习过C/C++的同学都有过这样的体验,无论实现什么样的功能,用C/C++实现时,会存在下面两个问题:

  1. 内存管理:使用C/C++编程,我们必须很好地管理系统内存,如果稍有不慎,可能就会有内存溢出的风险
  2. 跨平台:比如,我们用C/C++实现聊天工具,为了让该工具可以在Windows、Mac OS、Linux等多个操作系统下使用,就光网络通讯部分,我们就不得不逐个调用这些操作系统自带的库函数来实现,这个代价是很高的

于是,Sun公司的大佬们决定开发Java语言,该语言使用JVM运行其编写的程序,让JVM来处理上面两个问题:内存管理和跨平台对接。大佬们希望通过这样的方案,让程序员们把更多的精力放在功能实现上。

网上有铺天盖地的文章讲解了JVM内存管理部分,但是,这些文章大多存在以下2个问题:

  1. 讲得不够透彻,导致你产生一种知道大概,但又感觉不够的意犹未尽之感
  2. 内容讲得的确通俗易懂,但是,总感觉支离破碎,知识点无法串联,给你一种不怎么完整的感觉

因此,今天,小k就以一个真实案例为起点,从JVM源码的角度深入剖析案例程序在JVM中的处理过程,给到你更透彻、更连贯的感受。

案例

假设掘金社区后端使用Java开发,掘金的程序员使用使用下面这段代码来启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
typescript复制代码package com.juejin;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class JueJinApplication {
​
public static void main(String[] args) {
SpringApplication.run(JueJinApplication.class, args);
}
​
}

这是一段经典的Spring Boot启动类,那么,当我们将这个类打成jar包后,使用如下java命令执行这个jar:

1
bash复制代码java -cp juejin.jar com.juejin.JueJinApplication

此时,JVM内部会发生什么变化呢?

JNI

写Java的同学都知道,一段Java程序执行的入口是一个main方法,因此,JVM要执行上面这个jar包中的main方法并管理程序的内存,首先,得从jar中找到程序对应的main方法,即JueJinApplication类中的main,然后,把其加载到JVM中,这样,JVM才能自主地管理main方法使用的内存。

于是,Sun公司的程序员们开始着手编写main方法的查找逻辑,在《导读》中,我提到使用C/C++编程,我们必须很好地管理系统内存,于是,程序员们发现使用C++编写查找main方法的功能还要自己管理内存,这样太费事了,因此,他们就想出来一个方案:JNI。

JNI约定了一套Java与其他编程语言交互的契约,通过这个契约,我们就可以实现Java和其他编程语言的双向交互。比如,我们可以用C++调用Java的方法,反之,也可以用Java调用C++的函数。像下面这张图一样:

image.png

有了JNI之后,Sun公司的程序猿们就可以用Java实现案例中查找main方法的功能了,见下图:

image.png

上图就是《导读》案例中Java命令启动时,JVM查找main方法的示意图,JVM通过C++实现的LoadMainClass函数调用Java实现的checkAndLoadMain方法来查找并加载main方法。

image.png

上图中红线部分描述了JVM启动过程中,寻找和加载com.juejin.JueJinApplication及main方法的详细过程:

  1. 通过JLI_Launch函数启动JVM
  2. JLI_Launch内部调用ParseArguments函数解析启动参数
  3. 发现启动参数为-cp,JVM设置启动模式为LM_CLASS,表示指定mainClass启动
  4. 调用GetStaticMethodID函数获取方法名为checkAndLoadMain的方法ID
  5. 调用NewPlatformString函数转换checkAndLoadMain方法的入参,即启动类com.juejin.JueJinApplication的名字
  6. 调用CallStaticObjectMethod函数执行checkAndLoadMain方法,见上图最右边的黄色框:
* 由于启动模式为LM\_CLASS,使用SystemClassLoader去加载启动类mainClass,即com.juejin.JueJinApplication,当然还包括类中的方法main

通过上面的流程,我们发现,由于checkAndLoadMain是一个Java方法,因此,JVM通过JNI调用了该方法。

由此,我们就总结出了通过JNI调用Java方法的契约:

  • 通过GetStaticMethodID函数获取被调用的Java方法名
  • 通过CallStaticObjectMethod函数执行被调用的Java方法

这点可以帮助你在debug JVM源码时找到对应方法的入口。

仔细看图的小伙伴应该已经发现我好像少讲了一些东西。是的,这里我补充一下:JVM会根据启动模式的不同,走不同的链路来完成mainClass的加载,图中,我只画了两种模式(-cp和-jar)的链路,因为这是我们常用的两种启动模式:

  • -cp:指定启动类启动程序,这条链路我上面讲过了。
  • -jar:指定jar包启动程序,这条链路主要有这几个步骤,见上图紫色线部分:
+ JVM发现启动参数为-jar,于是,设置启动模式为LM\_JAR
+ 由于启动模式为LM\_JAR,于是,从jar中找到manifest文件,提取文件中的Main-Class关键字,找到对应的mainClass名
+ 和LM\_CLASS模式加载启动类一样,使用SystemClassLoader去加载启动类mainClass及内部的main方法

其他两种启动模式LM_SOURCE和LM_MODULE,有兴趣的小伙伴可以自己研究一下~

我们的Java程序最终是由JVM执行的,因此,加载到JVM的main方法,最终还是要通过JVM来处理并执行。

不过在讲解JVM执行main方法前,小k先来给你做一个分析:

我们都知道,无论通过maven还是gradle打包后,打包后,包内部的class文件都是字节码,同时,我们知道这样一个定律:

image.png

如上图是CPU处理程序的定律:金字塔从上到下,CPU处理的性能逐渐下降,即处理CPU缓存是最快的,寄存器其次,处理磁盘是最慢的。

由于CPU缓存的读写,程序不能控制,因此,JVM想要高效地执行程序,肯定希望将程序尽可能地放到寄存器中,这样,CPU处理程序就很快了。

但是,我们的jar中的程序是一段字节码,而学计算机的同学都知道,寄存器中存放的是机器指令,也就是二进制指令,因此,JVM只有将程序字节码转换为机器指令,最后,才能将程序对应的机器指令放入寄存器中。

image.png

于是,如上图所示,《导读》中的案例,JVM在使用SystemClassLoader加载JueJinApplication的时候,做了字节码转指令的工作。ps:为了方便解读,图中箭头右侧的机器指令换成汇编表达了。

但是,这里有一个问题:《导读》案例中的类JueJinApplication及注解@SpringBootApplication,它们是线程共享的,而寄存器中的指令是一个一个线程去读取的,因此,将类JueJinApplication及注解@SpringBootApplication写入寄存器就不太合适了,因此,JVM就设计了MetaSpace来存放这两个信息。关于MetaSpace及JMM相关知识,网上有非常多的文章讲解,这里我就不细说了。

而JueJinApplication中的main方法执行相关的元素是线程独享的,可以存入寄存器中,因此,今天我们主要来看一下JueJinApplication中的main方法是如何转化为机器指令的?

模板解释执行

我们先来看JueJinApplication这个类的字节码长什么样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vbnet复制代码public class com.juejin.JueJinApplication {
 public com.juejin.JueJinApplication();
   Code:
      0: aload_0
      1: invokespecial #1                  // Method java/lang/Object."<init>":()V
      4: return
​
 public static void main(java.lang.String[]);
   Code:
      0: ldc           #2                  // class com/juejin/JueJinApplication
      2: aload_0
      3: invokestatic  #3                  // Method org/springframework/boot/SpringApplication.run:(Ljava/lang/Class;[Ljava/lang/String;)Lorg/springframework/context/ConfigurableApplicationContext;
      6: pop
      7: return
}

这里我简单梳理一下里面的结构,代码中Code表示的就是字节码:

  • JueJinApplication类中的字节码:
+ aload\_0:将this引用压入栈顶
+ invokespecial #1:调用JueJinApplication的父类java.lang.Object的构造方法
  • main方法中的字节码:
+ ldc #2:将类JueJinApplication压入栈顶
+ aload\_0:将args参数压入栈顶
+ invokestatic #3:调用静态方法SpringApplication.run,方法入参为类JueJinApplication和args,返回结构为ConfigurableApplicationContext
+ pop:弹出SpringApplication.run方法返回值,因为main方法中没有使用SpringApplication.run的返回值

已知JueJinApplication类中的字节码,那么,我们要把这些字节码指令转换成对应的机器指令,就不得不考虑一个前提:不同CPU架构的指令集对应的机器指令格式是不一样的。比如,有x86指令集、ARM指令集等等,它们的机器指令格式都不相同。因此,JVM设计了这样一个方案来实现JueJinApplication类中main方法字节码指令和机器指令的转换:

image.png

  1. Bytecodes结构中定义了Java中所有会使用到字节码,JVM将这些字节码传递给TemplateTable。如上图顶部框中aload_0、pop为JueJinApplication类中的字节码指令。
  2. TemplateTable使用上一步得到的全量字节码,生成字节码对应的模板,该模板定义了字节码和机器指令模板的映射关系。这里我以aload_0字节码指令为例看下模板:
* `aload_0 => ubcp|__|clvm|__, vtos, atos, aload_0, _` ,其中,=> 表示aload\_0字节码指令和对应机器指令模板的映射:


    + =>左边的aload\_0代表字节码指令aload\_0
    + =>右边表示aload\_0字节码指令对应的机器指令模板,模板中包含5个参数:


        - flags:里面定义了4个flag:


            * ubcp:是否使用bytecode pointer指向字节码指令,如果classfile中的方法是Java方法,那么,方法内的字节码指令就需要这个指针,这时,该flag就是true,如果classfile中的方法是native方法,由于native方法使用C/C++实现,所以,直接调用方法就行,无需指针
            * disp:是否在模板范围内进行转发,比如,goto指令会跳转到其他指令位置,这时该flag就是true
            * clvm:否需要调用vm\_call函数,由于aload\_0内部会调vm\_call函数,因此,clvm为true,反正,为false
            * iswd:是否是宽指令,比如,iload字节码指令就是宽指令,该指令表示从局部变量表读取变量并压入栈顶,当局部变量表可容纳256个变量,即2^8,这时,iswd为false,而iload指令可能读取的局部变量会很多,会超出2^8,此时,就需要扩展局部变量表大小为2^16,即可容纳65536个变量,此时的iswd就为true根据flags的定义,aload\_0字节码指令是Java方法的,因此,ubcp为true,
        - aload\_0:表示aload\_0字节码指令使用aload\_0函数生成对应的机器指令,因为aload\_0字节码指令对应不只一条机器指令
        - vtos:aload\_0字节码指令的入参,这是执行aload\_0字节码指令对应机器指令操作数的入口地址,下面在《栈顶缓存》中详细讲到
        - atos:aload\_0字节码指令的出参,可能作为下一条指令的入参
        - `_`:aload\_0字节码指令使用到的局部变量,由于aload\_0的入参就是栈里的入参变量,非局部变量,因此,这个参数设为\_\_然后,JVM将字节码和机器指令模板的映射关系传递给TemplateInterpreterGenerator
  1. TemplateInterpreterGenerator调用不同CPU架构汇编器生成字节码指令对应的机器指令,我还是以aload_0字节码指令为例:
* 假设JVM调用了x86架构的汇编器生成机器指令,即上图中的x86 Assembler(汇编器):


    + 如上图,底部蓝框中左边的aload\_0即第2步中模板中的aload\_0参数,表示aload\_0字节码指令使用该参数生成对应的机器指令。
    + 如上图,底部蓝框中右边的aload\_0机器指令,表示aload\_0字节码指令对应的机器指令因此,`aload_0 => aload_0机器指令`表示定义了aload\_0字节码指令生成机器指令的过程。
  1. TemplateInterpreterGenerator根据第2步得到的aload_0机器指令模板,匹配第3步中x86汇编器中的aload_0参数,图中两个标红aload_0表示这个匹配,接着,调用该参数执行并生成aload_0对应的机器指令。如上图黄色框中的aload_0指令就表示aload_0字节码指令对应的机器指令。
  2. 将生成的aload_0机器指令写入ICache,指令缓存
  3. 同理,和aload_0字节码指令一样,JVM将JueJinApplication类中main方法中其他的字节码指令都转换生成对应的机器指令,并写如ICache。

JVM将上面通过TemplateInterpreterGenerator模板解释生成器直接生成机器指令,然后,执行机器指令的方式叫做模板解释执行。这是JVM执行Java程序的一种形式,在Hotspot中还有两种执行方式:字节码解释执行和C++解释执行。感兴趣的同学可以自行了解一下。

栈顶缓存

在前面,我提到JVM将字节码转为机器指令的目的是将转化后的指令写入寄存器,来提升CPU处理程序的性能,在JVM中,这样的写入方式就叫做栈顶缓存。我们就以main方法中的aload_0字节码指令为例,来看下JVM是如何做栈顶缓存的。

写栈顶缓存

image.png

JVM将转换后的机器指令写入寄存器是在生成完机器指令后做的,上图展示了《导读》案例中main方法的aload_0字节码指令写入的过程:

  1. 由于解析完classfile后,我们就知道main方法的入参是args,所以,将args压入栈顶。如上图虚线部分。
  2. 栈顶缓存定义了10种状态,表示缓存的变量类型,如上图绿框部分,这里,我先解释一下:
* btos:缓存bool类型的变量,对应bep表示,该变量在栈中的地址
* ztos:缓存byte类型的变量,对应bep表示,该变量在栈中的地址
* ctos:缓存char类型的变量,对应cep表示,该变量在栈中的地址
* stos:缓存short类型的变量,对应sep表示,该变量在栈中的地址
* itos:缓存int类型的变量,对应iep表示,该变量在栈中的地址
* ltos:缓存long类型的变量,对应lep表示,该变量在栈中的地址
* ftos:缓存float类型的变量,对应fep表示,该变量在栈中的地址
* dtos:缓存double类型的变量,对应dep表示,该变量在栈中的地址
* atos:缓存object类型的变量,对应aep表示,该变量在栈中的地址
* vtos:这个很特殊,表示指令所需变量/参数已经在栈顶,无需缓存,对应vep表示,该变量在栈中的地址执行指令前后,操作数在栈中的变化都反映在`*ep`这个变量里。这些`*ep`组成一个数组entry,如上图绿色部分。**为什么用数组,是因为一条指令执行前后的状态是通过多个ep变量反映在栈中的**。

因为aload_0指令中0表示取栈顶中的变量,说明取数是变量已在栈顶,因此,参考上面的栈顶缓存的10种状态,该aload_0指令对应的vep为栈顶的地址。如上图,entry数组中的vep指向了栈顶。因为aload_0指令没有其他操作数,因此,其他ep变量都指向了栈顶。
3. 将每个ep变量写入一个二维数组,该数组的下标为[栈顶缓存状态][字节码指令],这个二维数组就是栈顶缓存。如上图,entry就是这个二维数组,JVM将entry数组中的每一个ep变量,即aload指令操作数在栈中的位置写入[vtos][aload_0],[atos][aload_0]等等。这样就完成了栈顶缓存。

读取栈顶缓存

有了栈顶缓存,JVM在执行main方法对应机器指令时就可以根据指令+操作数从栈顶缓存中找到对应的操作数,最后,交由CPU执行指令,以案例中的main方法的aload_0字节码指令为例,具体过程如下:

image.png

我们关注图中红线部分:

  1. JVM根据aload_0 + 入参args(表示从栈顶取args值),从栈顶缓存二维数组中定位到[vtos][aload_0],该单元中存的就是vep对应的栈的位置:栈顶
  2. 由于vtos对应vep指向栈顶,于是,JVM从栈顶取到入参args的值
  3. 将args的值传递给CPU
  4. CPU从ICache中取出aload_0对应的机器指令
  5. CPU执行机器指令(aload_0指令 + 操作数args的值)

总结

在这篇文章中,我主要讲解了JNI、模板解释执行、栈顶缓存的概念。我相信你可能还有一些关联问题,比如:

  • 栈是怎么生成的,什么时候生成?
  • 栈里存的到底是什么数据,二进制还是16进制,又或者根据数据类型相关?
  • JVM是怎么操作栈的?

都是很好的问题,小k在后面的文章中,慢慢会详细讲解。

最后,小k还是希望掘金的小伙伴们通过文章的学习,可以有所启发、收获和成长。
当然,如果有任何疑问都可以在评论区留言哈,相信每一位小伙伴将来都会成为技术大牛!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用 RulesEngine 实现基础购买获得积分

发表于 2021-07-27

在上一篇文章中我们已经实现了注册送积分,其实参与事件获得积分都可以通过相似的规则进行实现。

今天我们要实现购买获得积分的规则,这是最常用的积分获取方式。它与之前的参与事件获取积分最大的不同在于通过购买获得的积分是基于付款金额/积分比率的,虽然大部分情况下这个比率是一个固定数值,但由于购买金额是一个变量,因此通过购买获得的积分也不是一个固定的数值,不能像参与事件获得积分一样将最终获得的积分直接写在规则中,而是要通过为变量的付款金额带入一个简单的计算公式才能获得。

添加规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
json复制代码{
"WorkflowName": "Payment",
"Rules": [
{
"RuleName": "GivePointsByPayment",
"SuccessEvent": "Pay success and earn {0} points",
"RuleExpressionType": "LambdaExpression",
"Expression": "true",
"Actions": {
"OnSuccess": {
"Name": "OutputExpression",
"Context": {
"Expression": "subtotal * 0.5"
}
}
}
}
]
}

相比于注册送积分,这个新的规则有以下需要注意的地方:

  1. WorkflowName,RulesEngine 触发规则的标识符,一定要和之前不同,且在之后会被用到。
  2. SuccessEvent,因为获取的积分并不是一个固定数值,所以具体的积分在这里还是一个变量。
  3. Actions,新引入的参数,在满足特定情况的时候可以触发的活动,这里因为只考虑程序主逻辑,所以只写了 OnSuccess。
    1. 在该活动的 Context 的 Expression 中写入了这个规则的具体计算公式 subtotal * 0.5。
      1. 其中 subtotal 为付款金额的变量名,之后我们会演示如何使用它。复杂类型还可以写作 object.field 这样的格式。
      2. 而 0.5 是付款金额/积分比率,我们这里可以将其固定为每 1 块钱可以获得 0.5 积分。

可以看出相比注册送积分,购买获得积分的规则在结果中多出了变量,而变量是由公式计算获得,而公式本身又有变量存在。那我们接下来看看如何使用这个规则。

使用规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
C#复制代码public async Task<RuleResponse> Verify(string workFlowName, JsonElement args)
{
var input = new List<RuleParameter>();
switch (workFlowName)
{
case "Payment":
input.Add(new RuleParameter("subtotal", args.GetProperty("subtotal").GetDecimal());
break;
}

RuleResponse response = null;
var resultList = await _engine.ExecuteAllRulesAsync(workFlowName, input.ToArray());

resultList.OnSuccess(message =>
{
var actionResult = resultList
.FirstOrDefault(ruleResult => ruleResult.ActionResult != null)
?.ActionResult.Output;
response = new RuleResponse() {message = string.Format(message, actionResult)};
});

return response;
}

我们对 Verify 函数进行了改动,重点在以下地方:

  1. 返回值改成了 RuleResponse,现在它至于一个 string 型的成员 message,准备在之后对其进行扩展。
  2. 接收参数 string workFlowName 和 JsonElement args。
    1. 其中 workFlowName 标识准备触发的规则的名称,我们现在有多个规则,所以需要将其进行参数化。
    2. args 是 JSON 格式的相关参数,不同的规则对其处理会不一致。
  3. 在进行规则匹配前需要将规则中用到的参数放入规则引擎中,具体做法是构造一个 RuleParameter 其中需要传入两个参数。
    1. 第一个参数是规则中的变量名,表明这个参数要替换的规则中的变量的名字。
    2. 第二个参数是给该变量的赋值。这里是从 Json 中取出 subtotal,并将其转变为 C# 中的 Decimal 类型。
  4. 最后在 OnSuccess 时通过 ActionResult.Output 取出通过积分公式计算获得的结果,再通过 string.Format 将其替换规则中 SuccessEvent 的变量,获得最终结果。

代码上的实现就是这两个部分,下面看看效果。

调用调试

注册获得积分

注册.png

购买获得积分
购买.png

至此,最基础的购买获得积分也完成了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go 进阶训练营(二)(完整版) Go 进阶训练营(二)

发表于 2021-07-27

Go 进阶训练营(二)

Go 架构实践 - 微服务(微服务可用性设计)

点点我download:Go 进阶训练营
提取马:d17c

目录

隔离
超时控制
过载保护
限流
降级
重试
负载均衡
最佳实践
References

隔离

隔离,本质上是对系统或资源进行分割,从而实现当系统发生故障时能限定传播范围和影响范围,即发生故障后只有出问题的服务不可用,保证其他服务仍然可用。
服务隔离:动静分离、读写分离
轻重隔离:核心、快慢、热点
物理隔离:线程、进程、集群、机房

隔离 - 服务隔离

动静隔离:
小到 CPU 的 cacheline false sharing、数据库 mysql 表设计中避免 bufferpool 频繁过期,隔离动静表,大到架构设计V(cmL46679910)中的图片、静态资源等缓存加速。本质上都体现的一样的思路,即加速/缓存访问变换频次小的。比如 CDN 场景中,将静态资源和动态 API 分离,也是体现了隔离的思路:
1)降低应用服务器负载,静态文件访问负载全部通过CDN。
2)对象存储存储费用最低。
3)海量存储空间,无需考虑存储架构升级。
4)静态CDN带宽加速,延迟低。

image.png
archive: 稿件表,存储稿件的名称、作者、分类、tag、状态等信息,表示稿件的基本信息。
在一个投稿流程中,一旦稿件创建改动的频率比较低。
archive_stat: 稿件统计表,表示稿件的播放、点赞、收藏、投币数量,比较高频的更新。
随着稿件获取流量,稿件被用户V(cmL46679910)所消费,各类计数信息更新比较频繁。
MySQL BufferPool 是用于缓存 DataPage 的,DataPage 可以理解为缓存了表的行,那么如果频繁更新 DataPage 不断会置换,会导致命中率下降的问题,所以我们在表设计中,仍然可以沿用类似的思路,其主表基本更新,在上游 Cache 未命中,透穿到 MySQL,仍然有 BufferPool 的缓存。

image.png
读写分离:主从、Replicaset、CQRS。

隔离 - 轻重隔离

1、核心隔离
业务按照 Level 进行资源池划分(L0/L1/L2)。
1)核心/非核心的故障域的差异隔离(机器资源、依赖资源)。
2)多集群,通过冗余资源来提升吞吐和容灾能力。

image.png
2、快慢隔离
我们可以把服务的吞吐想象为一个池,当突然洪流进来时,池子需要一定时间才能排放完,这时候其他支流在池子里待的时间取决于前面的排放能力,耗时就会增高,对小请求产生影响。
日志传输体系的架构设计中,整个流都会投放到一个 kafka topic 中(早期设计目的: 更好的顺序IO),流内会区分不同的 logidV(cmL46679910),logid 会有不同的 sink 端,它们之前会出现差速,比如 HDFS 抖动吞吐下降,ES 正常水位,全局数据就会整体反压。

image.png
按照各种纬度隔离:sink、部门、业务、logid、重要性(S/A/B/C)。
业务日志也属于某个 logid,日志等级就可以作为隔离通道。
3、热点隔离
何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行缓存。比如:
小表广播: 从 remotecache 提升为 localcache,app 定时更新,甚至可以让运营平台支持广播刷新 localcache。atomic.Value

image.png
主动预热: 比如直播房间页高在线情况下bypass 监控主动防御。

隔离 - 物理隔离

线程隔离
主要通过线程池进行隔离,也是实现服务隔离的基础。把业务进行分类并交给不同的线程池进行处理,当某个线程池处理一种业务请求发生问题时,V(cmL46679910)不会讲故障扩散和影响到其他线程池,保证服务可用。
对于 Go 来说,所有 IO 都是 Nonblocking,且托管给了 Runtime,只会阻塞Goroutine,不阻塞 M,我们只需要考虑 Goroutine 总量的控制,不需要线程模型语言的线程隔离。

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Cloud Gateway深撸 Spring

发表于 2021-07-27

Spring Cloud Gateway

框架 版本
Spring Boot 2.5.3
Spring Cloud 2020.0.3

maven依赖

1
2
3
4
yaml复制代码        <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

概念

1
2
3
scss复制代码路由(routes):gateway核心概念,包含了路由id、转发地址uri、一组断言、一组过滤器
断言(Predicate):用于断定请求是否符合当前路由规则的集合
过滤器(Filter):用来加工当前请求,其中又分为全局过滤器,和路由过滤器

实现方式

  1. 配置文件
1
2
3
4
5
6
7
8
9
10
yaml复制代码spring:
cloud:
gateway:
routes:
- id: theia-routes-base
uri: "http://10.20.23.49:31002"
predicates:
- Path=/zhaolw01/01
filters:
- SetPath=/

说明:

routes:路由集合,设定路由的id、转发url、断言、过滤器

id:需要唯一,用于存储和更新路由信息

uri:转发的地址
predicates:断言集合,多个断言类型取并集
filters:过滤器,多个过滤器都会执行

  1. java类
1
2
3
4
5
6
7
8
9
10
java复制代码        @Bean
public RouteLocator theiaRouteLocator(RouteLocatorBuilder builder) {
return builder
.routes()
.route("theiaRoute",r -> r.path("/xiangaoxiong01")
.filters(gatewayFilterSpec -> gatewayFilterSpec.setPath("/"))
.uri("http://10.20.23.49:31002")
)
.build();
}

说明:

RouteLocator:路由的主要对象,Gateway对于此对象的加载更新,可以实现动态路由

自定义断言

通过继承AbstractRoutePredicateFactory类可以快速实现一个自定义断言,需要自定义一个Config类用于接收断言内容,重写apply方法,返回一个GatewayPredicate类型.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码@Configuration
@Slf4j
public class TheiaServiceRoutePredicateFactory extends AbstractRoutePredicateFactory<TheiaServiceRoutePredicateFactory.Config> {

public TheiaServiceRoutePredicateFactory() {
super(Config.class);
}

@Bean
@ConditionalOnEnabledPredicate
public TheiaServiceRoutePredicateFactory getTheiaServiceRoutePredicateFactory(){
return new TheiaServiceRoutePredicateFactory();
}

@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList("patterns");
}

@Override
public Predicate<ServerWebExchange> apply(Config config) {
List<String> patterns = config.getPatterns();
return (GatewayPredicate) serverWebExchange -> {
ServerHttpRequest request = serverWebExchange.getRequest();
log.info("自定义断言:{}", patterns);
String url = request.getURI().getRawPath();
return patterns.parallelStream().filter(x -> url.startsWith(x)).count() > 0 ;
};
}

@Validated
public static class Config {

private List<String> patterns = new ArrayList<>();

public List<String> getPatterns() {
return patterns;
}

public TheiaServiceRoutePredicateFactory.Config setPatterns(List<String> patterns) {
this.patterns = patterns;
return this;
}

}
}

自定义过滤器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码@Configuration
@Slf4j
public class TheiaServiceGatewayFilterFactory extends AbstractGatewayFilterFactory<TheiaServiceGatewayFilterFactory.Config> {

public TheiaServiceGatewayFilterFactory() {
super(TheiaServiceGatewayFilterFactory.Config.class);
}

@Bean
@ConditionalOnEnabledFilter
public TheiaServiceGatewayFilterFactory getTheiaServiceGatewayFilterFactory() {
return new TheiaServiceGatewayFilterFactory();
}

@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList("template");
}


@Override
public GatewayFilter apply(Config config) {
String template = config.getTemplate();
return (exchange, chain) -> {
ServerHttpRequest req = exchange.getRequest();
log.info("自定义过滤器:{}",template);
String newPath = req.getURI().getRawPath().replaceAll(template,"/");
ServerHttpRequest request = req.mutate().path(newPath).build();
return chain.filter(exchange.mutate().request(request).build());
};
}

public static class Config {

private String template;

public String getTemplate() {
return template;
}

public void setTemplate(String template) {
this.template = template;
}

}
}

其中AbstractRoutePredicateFactory和AbstractGatewayFilterFactory是官方提供的静态实现类,其中实现了部分通用部分,只需要写自定义的apply方法即可,其中shortcutFieldOrder就是用来处理参数注入的。

原理解析

通过以上方式,我们可以快速实现一个基于Gateway的路由服务,那么生刨一下源码,来看下它是怎么实现的:
通过查看Spring-Cloud-Gateway的源码,查看其中的spring.factories可以知道主要的启动配置:

1
2
3
4
5
6
7
8
9
10
11
12
properties复制代码# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.gateway.config.GatewayClassPathWarningAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayResilience4JCircuitBreakerAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayNoLoadBalancerClientAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayMetricsAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayRedisAutoConfiguration,\
org.springframework.cloud.gateway.discovery.GatewayDiscoveryClientAutoConfiguration,\
org.springframework.cloud.gateway.config.SimpleUrlHandlerMappingGlobalCorsAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayReactiveLoadBalancerClientAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayReactiveOAuth2AutoConfiguration

通过查看GatewayClassPathWarningAutoConfiguration,可以看出Gateway是基于WebFlux实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码        @Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = "org.springframework.web.servlet.DispatcherServlet")
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
protected static class SpringMvcFoundOnClasspathConfiguration {

public SpringMvcFoundOnClasspathConfiguration() {
throw new MvcFoundOnClasspathException();
}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.web.reactive.DispatcherHandler")
protected static class WebfluxMissingFromClasspathConfiguration {

public WebfluxMissingFromClasspathConfiguration() {
log.warn(BORDER + "Spring Webflux is missing from the classpath, "
+ "which is required for Spring Cloud Gateway at this time. "
+ "Please add spring-boot-starter-webflux dependency." + BORDER);
}

}

关键点在于DispatcherServlet是基于Servlet实现的,当有这个Class加载的时候就会报错,当没有DispatcherHandler存在的时候就会报警告异常。

继续往下看GatewayAutoConfiguration就是我们要看的核心了,由于内容过多,只看其中比较重要的几个Bean加载:

1
2
3
4
5
6
java复制代码        @Bean
@ConditionalOnEnabledPredicate(WeightRoutePredicateFactory.class)
public WeightCalculatorWebFilter weightCalculatorWebFilter(ConfigurationService configurationService,
ObjectProvider<RouteLocator> routeLocator) {
return new WeightCalculatorWebFilter(routeLocator, configurationService);
}

作为权重路由加载,因为WeightCalculatorWebFilter实现了WebFliter,而其他的路由是基于DispatcherHandler中的HandlerMapping实现的。

1
2
3
4
5
java复制代码        @Bean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
}

RoutePredicateHandlerMapping就是整个路由的核心处理类,其实现了HandlerMapping接口,将会被注入到DispatcherHandler的handlerMappings中发挥作用。

查看RoutePredicateHandlerMapping中,核心方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码        @Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}

exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}

此方法在DispatcherHandler的主方法中会调用,其中方法lookupRoute就会通过断言规则return r.getPredicate().apply(exchange),获取对应的Route返回,然后再后续的FilteringWebHandler中通过GATEWAY_ROUTE_ATTR属性获取Route对象,然后执行对象中的所有Filters。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码        @Override
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();

List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
AnnotationAwareOrderComparator.sort(combined);

if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}

return new DefaultGatewayFilterChain(combined).filter(exchange);
}

前面的流程基本说明的整个Gateway的基础工作原理,而通过前面的逻辑,可以看到主要是通过RouteLocator类的getRoutes方法获取路由信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码
private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route, PredicateDefinition predicate) {
RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());
if (factory == null) {
throw new IllegalArgumentException("Unable to find RoutePredicateFactory with name " + predicate.getName());
}
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition " + route.getId() + " applying " + predicate.getArgs() + " to "
+ predicate.getName());
}

// @formatter:off
Object config = this.configurationService.with(factory)
.name(predicate.getName())
.properties(predicate.getArgs())
.eventFunction((bound, properties) -> new PredicateArgsEvent(
RouteDefinitionRouteLocator.this, route.getId(), properties))
.bind();
// @formatter:on

return factory.applyAsync(config);
}

其中核心代码:

1
java复制代码      RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());

也就是说是通过类名获取到对应的断言工厂的,而加载所有工厂的RoutePredicateFactory类中的方法:

1
2
3
java复制代码default String name() {
return NameUtils.normalizeRoutePredicateName(getClass());
}

通过查看实现方法发现

1
java复制代码return removeGarbage(clazz.getSimpleName().replace(RoutePredicateFactory.class.getSimpleName(), ""));

也就是说,自己实现的RoutePredicateFactory要是以RoutePredicateFactory结尾的话,就能想PathRoutePredicateFactory那样在断言里面写Path=/**然后自动找到断言实现类,不然就要自己实现getSimpleName方法。

而通过路由的Filters中的信息加载GatewayFilterFactory也是类似的实现:

1
2
java复制代码
GatewayFilterFactory factory = this.gatewayFilterFactories.get(definition.getName());

GatewayFilterFactory:

1
2
3
4
java复制代码default String name() {
// TODO: deal with proxys
return NameUtils.normalizeFilterFactoryName(getClass());
}

到此,基本整个Gateway的原理基本完成,还有其他功能,与主流程无关,不过可以作为扩展适配个性化的需求。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RocketMQ详细配置与使用 一、MQ介绍 二、Rocke

发表于 2021-07-27

一、MQ介绍

1.1 为什么要用MQ

消息队列是一种“先进先出”的数据结构

在这里插入图片描述

其应用场景主要包含以下3个方面

1)应用解耦

系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

在这里插入图片描述

使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

在这里插入图片描述

2)流量削峰

在这里插入图片描述

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

在这里插入图片描述

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。

处于经济考量目的:

业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰

3)数据分发

在这里插入图片描述

通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

在这里插入图片描述

1.2 MQ的优点和缺点

优点:解耦、削峰、数据分发

缺点包含以下几点:

  • 系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。

如何保证MQ的高可用?

  • 系统复杂度提高

MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。

如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

  • 一致性问题

A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。

如何保证消息数据处理的一致性?

1.3 各种MQ产品的比较

常见的MQ产品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。

在这里插入图片描述

二、RocketMQ快速入门

RocketMQ 是阿里巴巴2016年MQ中间件,使用 Java 语言开发,在阿里内部,RocketMQ 承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。

2.1 准备工作

2.1.1 下载RocketMQ

这里选择的 RocketMQ 的版本:4.6.0

下载地址:下载地址

官方文档:rocketmq.apache.org/docs/quick-…

2.2.2 环境要求

  • Linux64位系统
  • JDK1.8(64位)

2.2 安装RocketMQ

2.2.1 安装步骤

我这里是以二进制包方式来安装的:

  1. 解压安装包
  2. 进入安装目录

2.2.2 目录介绍

  • bin:启动脚本,包括 shell 脚本和 CMD 脚本
  • conf:实例配置文件 ,包括 broker 配置文件、logback 配置文件等
  • lib:依赖 jar 包,包括Netty、commons-lang、FastJSON等

2.3 启动RocketMQ

  1. RocketMQ 默认的虚拟机内存较大,启动 Broker 或者 NameServer 可能会因为内存不足而导致失败,所以需要编辑如下两个配置文件,修改 JVM 内存大小
1
2
3
4
5
6
7
8
shell复制代码# 编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小
$ vi bin/runbroker.sh
# 参考设置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

$ vi bin/runserver.sh
# 参考设置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  1. 启动 NameServer
1
2
3
4
shell复制代码# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
  1. 启动 Broker
1
2
3
4
shell复制代码# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log

bin/mqbroker 的一些可选参数:

  • -c:指定配置文件路径
  • -n:NameServer 的地址

2.4 测试RocketMQ

2.4.1 发送消息

1
2
3
4
shell复制代码# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2.4.2 接收消息

1
2
3
4
shell复制代码# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.5 关闭RocketMQ

1
2
3
4
shell复制代码# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

2.6 各角色介绍

  • Producer:消息的发送者;举例:发件者
  • Consumer:消息接收者;举例:收件人
  • Consumer Group:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息)
  • Broker:暂存和传输消息;举例:快递公司
  • NameServer:管理 Broker;举例:快递公司的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
  • Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息

在这里插入图片描述

2.7 broker配置文件详解

broker 默认的配置文件位置在:conf/broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
py复制代码#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2.8 可视化监控平台搭建

2.8.1 概述

RocketMQ 有一个对其扩展的开源项目 incubator-rocketmq-externals,这个项目中有一个子模块叫 rocketmq-console,这个便是管理控制台项目了,先将 incubator-rocketmq-externals 拉到本地,因为我们需要自己对 rocketmq-console 进行编译打包运行。

在这里插入图片描述

2.8.2 下载并编译打包

  1. 克隆项目
1
bash复制代码git clone https://github.com/apache/rocketmq-externals
  1. 在 rocketmq-console 中配置 namesrv 集群地址:
1
2
3
bash复制代码$ cd rocketmq-console
$ vim src/main/resources/application.properties
rocketmq.config.namesrvAddr=10.211.55.4:9876
  1. 配置完成进行编译并打包
1
shell复制代码mvn clean package -Dmaven.test.skip=true
  1. 启动 rocketmq-console:
1
shell复制代码nohup java -jar rocketmq-console-ng-2.0.0.jar > tmp.log &

启动成功后,我们就可以通过浏览器访问 http://IP地址:8080 进入控制台界面了,如下图:

在这里插入图片描述

三、消息发送与消费示例(Maven)

  • 导入MQ客户端依赖

==注意==:rocketmq-client 的版本,要与 RocketMQ 的版本一致

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
</dependency>
  • 消息发送者步骤分析:
1. 创建消息生产者 `producer`,并指定生产者组名
2. 指定 `Nameserver` 地址
3. 启动 `producer`
4. 创建消息对象,指定主题 `Topic`、`Tag` 和消息体
5. 发送消息
6. 关闭生产者 `producer`
  • 消息消费者步骤分析:
1. 创建消费者 `Consumer`,制定消费者组名
2. 指定 `Nameserver` 地址
3. 订阅主题 `Topic` 和 `Tag`
4. 设置回调函数,处理消息
5. 启动消费者 `consumer`

3.1 基本样例

3.1.1 消息发送

1)发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 设置消息同步发送失败时的重试次数,默认为 2
producer.setRetryTimesWhenSendFailed(2);
// 设置消息发送超时时间,默认3000ms
producer.setSendMsgTimeout(3000);
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

2)发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 设置消息异步发送失败时的重试次数,默认为 2
producer.setRetryTimesWhenSendAsyncFailed(2);
// 设置消息发送超时时间,默认3000ms
producer.setSendMsgTimeout(3000);
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

3)单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);

}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

3.1.2 消费消息

1)集群模式(负载均衡)

消费者采用集群方式消费消息,==一条消息同一个消费者组中只有一个消费者会消费到==

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("Test", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}

2)广播模式

消费者采用广播的方式消费消息,==一条消息同一个消费者组中每个消费者都要消费==

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("Test", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}

3.2 顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

3.2.1 顺序消息生产

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
java复制代码/**
* Producer,发送顺序消息
*/
public class Producer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

String[] tags = new String[]{"TagA", "TagC", "TagD"};

// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id

System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}

producer.shutdown();
}

/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;

public long getOrderId() {
return orderId;
}

public void setOrderId(long orderId) {
this.orderId = orderId;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}

/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();

OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);

return orderList;
}
}

3.2.2 顺序消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {

public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Random random = new Random();

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}

try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}

3.3 延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

3.3.1 启动消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// 订阅Topics
consumer.subscribe("TestTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}

3.3.2 发送延时消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}

###4.3.3 验证

您将会看到消息的消费比存储时间晚10秒

3.3.4 使用限制

1
2
java复制代码// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

3.4 批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

3.4.1 发送批量消息

如果您每次只发送不超过 4MB 的消息,则很容易使用批处理,样例如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//处理error
}

如果消息的总长度可能大于 4MB 时,这时候最好把消息进行分割

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}

}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}

3.5 过滤消息

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

1
2
java复制代码DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
te复制代码------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------

3.5.1 SQL基本语法

RocketMQ 只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=
  • 字符比较,比如:=,<>,IN
  • IS NULL 或者 IS NOT NULL
  • 逻辑符号 AND,OR,NOT

常量支持类型为:

  • 数值,比如:123,3.1415
  • 字符,比如:'abc',必须用单引号包裹起来
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

只有使用 push 模式的消费者才能用使用SQL92标准的sql语句,接口如下:

1
java复制代码public void subscribe(finalString topic, final MessageSelector messageSelector)

3.5.2 消息生产者

发送消息时,你能通过putUserProperty来设置消息的属性

1
2
3
4
5
6
7
8
9
10
11
java复制代码DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

3.5.3 消息消费者

用MessageSelector.bySql来使用sql筛选消息

1
2
3
4
5
6
7
8
9
10
java复制代码DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

3.6 事务消息

3.6.1 流程分析

在这里插入图片描述

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

####1)事务消息发送及提交

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2)事务补偿

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

3)事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

3.6.2 发送事务消息

1) 创建事务性生产者

使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//创建消息生产者
TransactionMQProducer producer = new TransactionMQProducer("group6");
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
//生产者这是监听器
producer.setTransactionListener(transactionListener);
//启动消息生产者
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
TimeUnit.SECONDS.sleep(1);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//producer.shutdown();
}
}

2)实现事务的监听接口

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class TransactionListenerImpl implements TransactionListener {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务");
if (StringUtils.equals("TagA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}

}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果");
return LocalTransactionState.COMMIT_MESSAGE;
}
}

3.6.3 使用限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

3.7 连接阿里云RocketMQ时配置AK与Secret

如果是调用阿里云的 RocketMQ,则还需要指定 AK 与 Secret。阿里云 Demo:戳这里

3.7.1 生产者

生产者设置 AK 与 Secert 操作都一样,只需要在创建 Producer 时指定就行,这里就以发送普通消息为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码public class SyncAKProducer {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("设置自己的ACCESS_KEY", "设置自己的SECRET_KEY"));
}

public static void main(String[] args) throws Exception {
/**
* 创建Producer,并开启消息轨迹
* 如果不想开启消息轨迹,可以按照如下方式创建:
* DefaultMQProducer producer = new DefaultMQProducer(M"设置自己的GroupName(唯一)", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("设置自己的GroupName(唯一)", getAclRPCHook(), true, null);

/**
* 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项.
*/
producer.setAccessChannel(AccessChannel.CLOUD);
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 设置消息同步发送失败时的重试次数,默认为 2
producer.setRetryTimesWhenSendFailed(2);
// 设置消息发送超时时间,默认3000ms
producer.setSendMsgTimeout(3000);
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

3.7.2 消费者

消费者设置 AK 与 Secert 的操作都一样,只需要在创建 Consummer 时指定就行,这里就以接收普通消息为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("设置自己的ACCESS_KEY", "设置自己的SECRET_KEY"));
}

public static void main(String[] args) throws Exception {
/**
* 创建Consumer,并开启消息轨迹
* 如果不想开启消息轨迹,可以按照如下方式创建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqConfig.GROUP_ID, getAclRPCHook(), null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);

/**
* 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项.
*/
consumer.setAccessChannel(AccessChannel.CLOUD);

// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("Test", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}

四、消息发送与消费示例(Spring Boot)

4.1 导入依赖

1
2
3
4
5
xml复制代码   <dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>

4.2 生产者

4.2.1 application.yaml 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
yaml复制代码# application.yaml
rocketmq:
name-server: 10.124.128.200:9876
producer:
group: test-group
# 发送同步消息失败时,重试次数,默认是 2
retry-times-when-send-failed: 2
# 发送异步消息失败时,重试次数,默认是 2
retry-times-when-send-async-failed: 2
# 发送消息超时时间,默认是 3s
send-message-timeout: 3000

# 连接阿里云RocketMQ时需要配置AK与SK
access-key:
secret-key:

4.2.2 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@RestController
@RequestMapping("/test")
public class ProducerTest {

//自动注入
@Autowired
private RocketMQTemplate rocketMQTemplate;

@PostMapping("/sendSyncMessage")
public void sendSyncMessage(@RequestBody Map<String, Object> msgMap){
//构建消息
Message message = new Message("TopicName", "Tag", hash, JSON.toJSONBytes(msgData));

//发送同步消息
//方法1:使用与第三章相同的方法,调用 getProducer() 方法时会返回DefaultMQProducer对象,然后调用其方法第三章的一样了。
SendResult sendResult = rocketMQTemplate.getProducer().send(message);

//方法2:使用rocketMQTemplate封装的消息发送方法
// 第一个参数指定Topic与Tag,格式: `topicName:tags`
// 第二个参数,Message对象
sendResult = rocketMQTemplate.syncSend("TopicName:Tag", message);
}
}

4.2 消费者

4.2.1 application.yaml 配置文件

1
2
3
4
5
6
7
8
yaml复制代码rocketmq:
name-server: 10.124.128.200:9876

# 下面的配置只有在用阿里云的RocketMQ时,才配置,自己搭建的不需要配置
consumer:
access-key:
secret-key:
access-channel: CLOUD

4.2.2 消费者消息监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",
consumerGroup = "springboot-mq-consumer-1",
selectorExpression = "*")
public class Consumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
log.info("Receive message:" + message);

//如果消费失败,则抛出RuntimeException,RocketMQ会自动重试
//可以手动抛出,也可以使用 Lombok 的 @SneakyThrows 注解来抛出 RuntimeException
throw new RuntimeException("消费失败");
}
}

@RocketMQMessageListener 注解的常用配置参数:

参数 类型 默认值 说明
consumerGroup String 消费者组
topic String Topic
selectorType SelectorType SelectorType.TAG 使用TAG 或者SQL92选择消息,默认tag
selectorExpression String “*“ 控制哪些消息可以选择
consumeMode ConsumeMode ConsumeMode.CONCURRENTLY 消费模式,并发接收还是顺序接收,默认并发模式
messageModel MessageModel MessageModel.CLUSTERING 消费模式,广播模式还是集群模式,默认集群模式
consumeThreadMax int 64 最大消费线程数
consumeTimeout long 15L 消费超时时间(一条消息可能会阻塞使用线程的最长时间(以分钟为单位))
nameServer String 配置文件中读取:${rocketmq.name-server:} nameServer地址
accessKey String 配置文件中读取:${rocketmq.consumer.access-key:} AK
secretKey String 配置文件中读取:${rocketmq.consumer.secret-key:} SK
accessChannel String ${rocketmq.access-channel:}

五、消息存储

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。

消息存储方式

  1. 消息生成者发送消息
  2. MQ 收到消息,将消息进行持久化,在存储中新增一条记录
  3. 返回 ACK 给生产者
  4. MQ push 消息给对应的消费者,然后等待消费者返回 ACK
  5. 如果消息消费者在指定时间内成功返回 ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果 MQ 在指定时间内没有收到 ACK,则认为消息消费失败,会尝试重新 push 消息,重复执行4、5、6步骤
  6. MQ 删除消息

5.1 存储介质

目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是==消息刷盘==至所部署虚拟机/物理机的==文件系统==来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。

消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

5.2 消息存储结构

RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成的,消息真正的物理存储文件是 CommitLog,ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。

在这里插入图片描述

  • CommitLog:存储消息的元数据
  • ConsumerQueue:存储消息在 CommitLog 的索引
  • IndexFile:为了消息查询提供了一种通过 key 或时间区间来查询消息的方法,这种通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程

5.3 顺序写

RocketMQ 的消息用顺序写,保证了消息存储的速度。

磁盘如果使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差 6000 倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。

5.4 刷盘机制

RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。

同步刷盘和异步刷盘

1)同步刷盘

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

2)异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

3)配置

同步刷盘和异步刷盘,都是通过 Broker 配置文件里的 flushDiskType 参数设置的,把这个参数被配置成 SYNC_FLUSH(同步)、ASYNC_FLUSH (异步)中的一个。

5.5 零拷贝

Linux 操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。

一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

  1. read:读取本地文件内容;
  2. write:将读取的内容通过网络发送出去。

这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复制到用户态内存;
  3. 然后从用户态内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。

在这里插入图片描述

通过使用 mmap 的方式,可以省去向用户态的内存复制,提高速度。这种机制在 Java 中是通过 MappedByteBuffer 实现的

RocketMQ 充分利用了上述特性,也就是所谓的“==零拷贝==”技术,提高消息存盘和网络发送的速度。

这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射不超过 1.5 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog (存储消息的元数据)数据文件为 1G 的原因了

六、高可用性机制

在这里插入图片描述

RocketMQ 分布式集群是通过 Master 和 Slave 的配合达到高可用性的。

Master 和 Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。

Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。

6.1 消息消费高可用

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。

6.2 消息发送高可用

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可 用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。

在这里插入图片描述

6.3 主从复制

如果一个 Broker 组有 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步和异步两种复制方式。

1)同步复制

同步复制方式是等 Master 和 Slave 均写 成功后才反馈给客户端写成功状态;

在同步复制方式下,如果 Master 出故障, Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。

2)异步复制

异步复制方式是只要 Master 写成功 即可反馈给客户端写成功状态。

在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果 Master 出了故障,有些数据因为没有被写入 Slave,有可能会丢失;

3)配置

同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数的值有:

  • ASYNC_MASTER:异步复制主节点
  • SYNC_MASTER:同步复制主节点
  • SLAVE:从节点

4)总结

在这里插入图片描述

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是 SYNC_FLUSH (同步刷盘)方式,由于频繁地触发磁盘写动作,会明显降低性能。

通常情况下,应该把 Master 和 Slave 配置成 ASYNC_FLUSH (异步刷盘)的刷盘方式,主从之间配置成 SYNC_MASTER (同步复制)的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

七、负载均衡

7.1 Producer负载均衡

Producer 端,每个实例在发消息的时候,==默认会轮询所有的 message queue 发送==,以达到让消息平均落在不同的 queue 上。而由于 queue 可以散落在不同的 broker,所以消息就发送到不同的 broker 下,如下图:

在这里插入图片描述

图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。

7.2 Consumer负载均衡

1)集群模式

在集群消费模式下,每个订阅这个 topic 的消费者组都会收到消息,每条消息只会被一个消费者组中的一个实例消费。RocketMQ 采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条 message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照 queue 的数量和实例的数量平均分配 queue 给每个实例。

默认的分配算法是 AllocateMessageQueueAveragely,如下图:

在这里插入图片描述

还有另外一种平均的算法是 AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条 queue,只是以环状轮流分 queue 的形式,如下图:

consumer负载均衡2

需要注意的是,集群模式下,queue 都是只允许分配只一个实例,这是由于如果多个实例同时消费一个 queue 的消息,由于拉取哪些消息是 consumer 主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个 queue 只分给一个 consumer 实例,一个 consumer 实例可以允许同时分到不同的 queue。

通过增加 consumer 实例去分摊 queue 的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的 queue 将分配到其他实例上继续消费。

但是如果 consumer 实例的数量比 message queue 的总数量还多的话,多出来的 consumer 实例将无法分到 queue,也就无法消费到消息,也就无法起到分摊负载的作用了。==所以需要控制让 queue 的总数量大于等于 consumer 的数量。==

2)广播模式

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

在实现上,其中一个不同就是在 consumer 分配 queue 的时候,所有 consumer 都分到所有的 queue。

八、消息重试

8.1 顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

8.2 无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

1)重试次数

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

2)配置方式

消费失败后,重试配置方式

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 返回 Action.ReconsumeLater (推荐)
  • 返回 Null
  • 抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//处理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息将重试
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}

消费失败后,不重试配置方式

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息处理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}

自定义消息最大重试次数

消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

  • 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
  • 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
1
2
3
4
java复制代码Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

注意:

  • 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
  • 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
  • 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置

获取消息重试次数

消费者收到消息后,可按照如下方式获取消息的重试次数:

1
2
3
4
5
6
7
8
java复制代码public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//获取消息的重试次数
System.out.println(message.getReconsumeTimes());
return Action.CommitMessage;
}
}

3)多消费组重试

假设有 A 消费者组和 B 消费者组,当 A 和 B 同时监听同一个 topic 时,A 和 B 都获得了同一消息,但是 A 消费失败了(return Action.ReconsumeLater),B 却消费成功了。然后重试的时候,rocketMQ 只会把该消息再发送给 B 消费者组,不会再发送给 A 消费者组了。

九、死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

9.1 死信特性

死信消息具有以下特性:

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

9.2 查看死信信息

  1. 在控制台查询出现死信队列的主题信息

在这里插入图片描述
2. 在消息界面根据主题查询死信消息

在这里插入图片描述
3. 选择重新发送消息

一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

十、消费幂等

消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。

10.1 消费幂等的必要性

在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

10.2 处理方式

因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:

1
2
3
java复制代码Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);

订阅方收到消息时可以根据消息的 Key 进行幂等处理:

1
2
3
4
5
6
java复制代码consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 key 做幂等处理
}
});

十一、RocketMQ使用注意事项

  1. 同一消费者组下,消费者逻辑应相同(监听的 topic,tag 都要相同)
  2. 默认配置下,不同消费者组之间是消息共享(所有消费者组都能获取到同一消息),消费者组内的消费者是负载均衡(只有一个消费者会获得消息)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用深度 Q 学习的 AI 驱动蛇游戏|Python 主题月

发表于 2021-07-27

本文正在参加「Python主题月」,详情查看 活动链接

简介: 该项目基于强化学习,训练蛇吃环境中存在的食物。

下面给出了一个示例 gif,您可以了解我们将要构建的内容。

AI驱动的蛇

Animation.gif

要了解我们如何使用 pygame 手动构建这个蛇 2D 动画模拟,请点击链接:

手把手教你使用 Python 制作贪吃蛇游戏|Python 主题月

在构建基本的蛇游戏之后,现在我们将专注于如何将强化学习应用于它。

我们必须在这个项目中创建三个模块:

  1. 环境(我们刚刚构建的游戏)
  2. 模型(移动预测的强化模型)
  3. 代理(环境和模型之间的中介)

模块链接
image.png

算法:

我们在棋盘上随机放置了蛇和食物。

  • 使用 11 个值计算蛇的状态。如果有任何条件为真,则将该值设置为0,否则设置1。

如何定义 11 个状态

基于当前的 Head 位置代理将计算 11 个状态值,如上所述。

  • 获得这些状态后,代理会将其传递给模型并执行下一步操作。
  • 执行下一个状态后计算奖励。奖励定义如下:
+ **吃食物:+10**
+ **游戏结束:-10**
+ **其他:0**
  • 更新 Q 值(稍后将讨论)并训练模型。
  • 在分析了算法之后,现在我们必须建立思想来继续编码这个算法。

该模型:

image.png

神经网络模型

该模型是使用 Pytorch 设计的,但您也可以根据自己的舒适度使用 TensorFlow。

我们正在使用具有11 大小输入层和具有 256 个神经元和3 个神经 元 输出的 密集层的密集神经网络 。 您可以调整这些超参数以获得最佳结果。

模型如何工作?

  • 游戏开始,Q值随机初始化。
  • 系统获取当前状态 s。
  • 它基于 s,随机或基于其神经网络执行一个动作。在训练的第一阶段,系统经常选择随机动作来最大化探索。后来,该系统越来越依赖其神经网络。
  • 当 AI 选择并执行动作时,环境会给予奖励。然后,代理到达新状态并根据贝尔曼方程更新其 Q 值。

image.png
贝尔曼方程

  • 此外,对于每一步,它存储原始状态、动作、执行该动作后达到的状态、获得的奖励以及游戏是否结束。这些数据随后被采样以训练神经网络。此操作称为重放记忆。
  • 重复最后两个操作,直到满足某个条件(例如:游戏结束)。

该项目的核心是您将要训练的模型,因为蛇将采取的动作的正确性完全取决于您构建的模型的质量。所以我想用部分代码向你解释这一点。

第一部分

  1. 创建一个名为 Linear_Qnet 的类,用于初始化线性神经网络。
  2. 函数forward用于取输入(11个状态向量)并通过神经网络并应用 relu 激活函数并将输出返回给下一个
    移动 1 x 3 矢量大小。简而言之,这是代理将调用的预测函数。
  3. save函数用于保存训练好的模型以备后用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
python复制代码class Linear_QNet(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super().__init__()
self.linear1 = nn.Linear(input_size, hidden_size)
self.linear2 = nn.Linear(hidden_size, output_size)

def forward(self, x):
x = F.relu(self.linear1(x))
x = self.linear2(x)
return x

def save(self, file_name='model_name.pth'):
model_folder_path = 'Path'
file_name = os.path.join(model_folder_path, file_name)
torch.save(self.state_dict(), file_name)

第二部分

1.初始化QTrainer类
∗ 设置优化器的学习率。

  • Gamma 值是贝尔曼方程中使用的贴现率。
  • 初始化 Adam 优化器以更新权重和偏差。
  • 标准是均方损失函数。

2.train_step 函数

  • 如您所知,PyTorch 仅适用于张量,因此我们正在转换所有输入到张量。
  • 如上所述,我们进行了短暂的记忆训练,然后我们只会传递一个值
    状态、动作、奖励、移动 所以我们需要把它们转换成一个向量,所以我们使用了
    未压缩的功能。
  • 从模型中获取状态并使用以下公式计算新的 Q 值:
    Q_new = 奖励 + gamma * max(next_predicted Qvalue)
  • 计算新 Q 值和先前 Q 值之间的均方误差和
    反向传播该损失以进行权重更新。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
c++复制代码class QTrainer:
def __init__(self,model,lr,gamma):
#Learning Rate for Optimizer
self.lr = lr
#Discount Rate
self.gamma = gamma
#Linear NN defined above.
self.model = model
#optimizer for weight and biases updation
self.optimer = optim.Adam(model.parameters(),lr = self.lr)
#Mean Squared error loss function
self.criterion = nn.MSELoss()



def train_step(self,state,action,reward,next_state,done):
state = torch.tensor(state,dtype=torch.float)
next_state = torch.tensor(next_state,dtype=torch.float)
action = torch.tensor(action,dtype=torch.long)
reward = torch.tensor(reward,dtype=torch.float)

#only one parameter to train, \
Hence convert to tuple of shape(1, x)
if(len(state.shape) == 1):
#(1, x)
state = torch.unsqueeze(state,0)
next_state = torch.unsqueeze(next_state,0)
action = torch.unsqueeze(action,0)
reward = torch.unsqueeze(reward,0)
done = (done, )

# 1. Predicted Q value with current state
pred = self.model(state)
target = pred.clone()
for idx in range(len(done)):
Q_new = reward[idx]
if not done[idx]:
Q_new = reward[idx] +
self.gamma * torch.max(self.model(next_state[idx]))
target[idx][torch.argmax(action).item()] = Q_new
# 2. Q_new = reward + gamma * max(next_predicted Qvalue)
#pred.clone()
#preds[argmax(action)] = Q_new
self.optimer.zero_grad()
loss = self.criterion(target,pred)
loss.backward() # backward propogation of loss

self.optimer.step()

代理

  • 从环境中获取蛇的当前状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
python复制代码def get_state(self, game):
head = game.snake[0]
point_l = Point(head.x - BLOCK_SIZE, head.y)
point_r = Point(head.x + BLOCK_SIZE, head.y)
point_u = Point(head.x, head.y - BLOCK_SIZE)
point_d = Point(head.x, head.y + BLOCK_SIZE)

dir_l = game.direction == Direction.LEFT
dir_r = game.direction == Direction.RIGHT
dir_u = game.direction == Direction.UP
dir_d = game.direction == Direction.DOWN

state = [
# Danger Straight
(dir_u and game.is_collision(point_u))or
(dir_d and game.is_collision(point_d))or
(dir_l and game.is_collision(point_l))or
(dir_r and game.is_collision(point_r)),

# Danger right
(dir_u and game.is_collision(point_r))or
(dir_d and game.is_collision(point_l))or
(dir_u and game.is_collision(point_u))or
(dir_d and game.is_collision(point_d)),

# Danger Left
(dir_u and game.is_collision(point_r))or
(dir_d and game.is_collision(point_l))or
(dir_r and game.is_collision(point_u))or
(dir_l and game.is_collision(point_d)),

# Move Direction
dir_l,
dir_r,
dir_u,
dir_d,

# Food Location
game.food.x < game.head.x, # food is in left
game.food.x > game.head.x, # food is in right
game.food.y < game.head.y, # food is up
game.food.y > game.head.y # food is down
]
return np.array(state, dtype=int)
  • 调用模型获取蛇的下一个状态
1
2
3
4
5
6
7
8
9
10
11
12
13
python复制代码def get_action(self, state):
# 随机移动: tradeoff explotation / exploitation
self.epsilon = 80 - self.n_game
final_move = [0, 0, 0]
if(random.randint(0, 200) < self.epsilon):
move = random.randint(0, 2)
final_move[move] = 1
else:
state0 = torch.tensor(state, dtype=torch.float).cuda()
prediction = self.model(state0).cuda() # prediction by model
move = torch.argmax(prediction).item()
final_move[move] = 1
return final_move

注意: 开发和探索之间存在权衡。开发包括根据目前观察到的数据做出假设为最佳的决定。探索是在不考虑之前的动作和奖励对的情况下随机做出决定。因此,无论是必要的,因为考虑利用漏洞可以导致代理无法探索整个环境,并探索可能并不总是提供一个更好的奖励。

  • 在环境中播放模型预测的步骤。
  • 存储当前状态、执行的移动和奖励。
  • 根据执行的移动和环境获得的奖励训练模型。(训练短记忆)
1
2
python复制代码def train_short_memory(self, state, action, reward, next_state, done):
self.trainer.train_step(state, action, reward, next_state, done)
  • 如果游戏因撞墙或身体而结束,则根据到目前为止执行的所有移动来训练模型并重置环境。(训练长记忆)。以 1000 的批量大小进行训练。
1
2
3
4
5
6
7
python复制代码def train_long_memory(self):
if (len(self.memory) > BATCH_SIZE):
mini_sample = random.sample(self.memory, BATCH_SIZE)
else:
mini_sample = self.memory
states, actions, rewards, next_states, dones = zip(*mini_sample)
self.trainer.train_step(states, actions, rewards, next_states, dones)

训练模型需要大约 100 个时期才能获得更好的性能。查看我的训练进度。

输出:

  • 要运行此游戏,请先在 anaconda 提示符或(任何平台)中创建一个环境。然后安装必要的模块,如 Pytorch(用于 DQ 学习模型)、Pygame(用于游戏的视觉效果)和其他基本模块。
  • 然后在刚刚创建的环境中运行agent.py文件,开始训练,你会看到如下两个GUI,一个是训练进度,一个是AI驱动的snake game。
  • 达到一定分数后可以退出游戏,刚刚训练好的模型会保存在models.py的save函数中定义的路径中。

将来,您只需更改 agent.py 文件中的代码即可使用此训练模型,如下所示:

1
python复制代码self.model.load_state_dict(torch.load('PATH'))

注意: 注释掉所有训练函数调用。

培训进度
image.png

第一代版本
InitialTraining.gif

第二代版本

Animation (1).gif

源代码: SnakeGameAI

应用:

该项目的目标是提出一个想法,即如何应用强化学习以及如何将其用于现实世界的应用程序,例如自动驾驶汽车(例如:AWS DeepRacer)、在装配线上训练机器人等等…

提示:

  • 使用单独的环境并安装所有必需的模块。(可以使用anaconda环境)
  • 为了训练模型,您可以使用 GPU 进行更快的训练。

快速总结——使用深度 Q 学习的 AI 驱动蛇游戏

我希望本系列教程能够帮助到您,博主也在学习进行中,如有什么错误的地方还望批评指正。如果您喜欢这篇文章并有兴趣看到更多此类文章,可以看看这里(Github/Gitee) 这里汇总了我的全部原创及作品源码,关注我以查看更多信息。

🧵 更多相关文章

  • Python 异常处理|Python 主题月
  • Python 多线程教程|Python 主题月
  • Python Socket 编程要点|Python 主题月
  • 30 个 Python 教程和技巧|Python 主题月
  • Python 语句、表达式和缩进|Python 主题月
  • Python 关键字、标识符和变量|Python 主题月
  • 如何在 Python 中编写注释和多行注释|Python 主题月
  • 通过示例了解 Python 数字和类型转换|Python 主题月
  • Python 数据类型——从基础到高级学习|Python 主题月
  • Python 中的面向对象编程一之类、对象和成员|Python 主题月

🍰 往日优秀文章推荐:

  • 每个人都必须知道的 20 个 Python 技巧|Python 主题月
  • 100 个基本 Python 面试问题第一部分(1-20)|Python 主题月
  • 100 个基本 Python 面试问题第二部分(21-40)|Python 主题月
  • 100 个基本 Python 面试问题第三部分(41-60)|Python 主题月
  • 100 个基本 Python 面试问题第四部分(61-80)|Python 主题月
  • 100 个基本 Python 面试问题第五部分(81-100)|Python 主题月

如果你真的从这篇文章中学到了一些新东西,喜欢它,收藏它并与你的小伙伴分享。🤗最后,不要忘了❤或📑支持一下哦

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

后台技术栈 javase javaweb jvm java8

发表于 2021-07-27

javase

  • Instant - 廖雪峰的官方网站

集合

  • 集合各实现类的底层实现原理 - kris - CSDN博客
  • java基础-集合底层原理分析 - 葫芦娃的博客 - CSDN博客
  • 深入理解Java集合之Set - CrazyHzm - CSDN博客
  • Java LinkedHashMap工作原理及实现 - 割肉机 - 博客园

异常

  • Java 中的异常和处理详解 - ImportNew

IO

  • 深入理解Java中的IO - 沉淀所有的痛 - 博客园
  • Java核心类库——IO原理和用法 - 只是小人物 - 博客园

反射

  • Java 反射由浅入深 | 进阶必备 - 掘金

多线程

  • JAVA多线程————一篇文章让你彻底征服多线程开发(一) - 姚佳伟 - CSDN博客
  • Java多线程(二) 多线程的锁机制 - 小灯笼 - 博客园
  • 如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例 - ImportNew
  • 利用wait()和notify()实现生产者与消费者问题 - 南朝烟雨的博客 - CSDN博客
  • 为什么JAVA要提供 wait/notify 机制?是为了避免轮询带来的性能损失 - jinpengrun21的专栏 - CSDN博客
  • bat等大公司常考java多线程面试题 - 掘金
  • Java 之 ThreadLocal 详解 - 掘金

泛型

  • 深入理解Java泛型
  • 擦拭法 - 廖雪峰的官方网站

javaweb

  • JavaWeb学习总结 - 随笔分类 - 孤傲苍狼 - 博客园
  • 拦截器和过滤器的区别 - THISISPAN - 博客园

jvm

第2章 java内存区域与内存溢出异常

  • 栈帧、局部变量表、操作数栈 - Bruce-水桶妖 - ITeye博客
  • 探究 Java 虚拟机栈 - ImportNew
  • JVM 内存区域大小参数设置 - 简书

第3章 垃圾收集器与内存分配策略

  • 1_成为JavaGC专家Part I – 深入浅出Java垃圾回收机制 - ImportNew
  • 2_成为JavaGC专家Part II — 如何监控Java垃圾回收机制 - ImportNew
  • 3_成为Java GC专家(3)—如何优化Java垃圾回收机制 - ImportNew

第7章 虚拟机类加载机制

  • jvm系列一:Java代码编译、执行及类加载 - CSDN博客
  • 【转】JVM介绍 - myLittleGarden - 博客园
  • 《深入理解Java虚拟机》 学习笔记(一)——JVM内存结构 | Heaven’s Door
  • 使用jvisualvm、jmc远程监控JVM_走马行酒醴,驱车布鱼肉-CSDN博客_jvisualvm远程监控jvm

java8

  • 面试又挂了,你理解了 Java 8 的 Consumer、Supplier、Predicate和Function吗? - 云+社区 - 腾讯云
  • [译] 一文带你玩转 Java8 Stream 流,从此操作集合 So Easy - 掘金
  • Java函数式编程forEach理解_keepon的博客-CSDN博客
  • java8 CompletableFuture入门 使用教程 详解所有方法 附实例

数据库

mysql

  • CodingLabs - MySQL索引背后的数据结构及算法原理
  • MySQL的多表查询(笛卡尔积原理) - ζ  简单ヾ° - 博客园
  • MySQL:索引工作原理 - 陈小峰(iefreer)的专栏 - CSDN博客
  • MySQL多表联表查询 - Leesire的专栏 - CSDN博客
  • 深入理解MySQL索引原理和实现——为什么索引可以加速查询? - tongdanping的博客 - CSDN博客

postgresql

  • 什么是PostgreSQL?跟MySQL、Oracle比强在哪? - 云+社区 - 腾讯云

oracle

  • oracle分区表的使用和查询_mzglzzc的专栏-CSDN博客
  • Oracle创建表空间、创建用户的完整过程_数据库_常今-CSDN博客
  • mysql 建表语句 及完整案例_大蛇王的博客-CSDN博客
  • (3条消息)数据库优化之百万级数据方案_平凡之路无尽路的博客-CSDN博客
  • 10分钟搞懂:亿级用户的分布式数据存储解决方案! - SegmentFault 思否
  • 讲讲NoSQL比较火的三个数据库Memcached、Redis、MongoDB - SegmentFault 思否
  • 通俗易懂的学会:SQL窗口函数 - 知乎

linux

  • linux下vi命令大全 - 星尘 - 博客园
  • linux系统中如何进入退出vim编辑器,方法及区别 - crazyYong - 博客园
  • CentOS、Ubuntu、Debian三个linux比较异同 - 记事本 - 博客频道 - CSDN.NET
  • vi & vim 插入 删除 修改 文本-garfield_trump-ChinaUnix博客

定时任务

  • (1条消息)Spring-boot定时任务,注解@Scheduled的参数说明_旷野孤星的博客-CSDN博客
  • (1条消息)spring @Scheduled注解各参数详解,定时任务_@Scheduled,定时任务_徐本锡的专栏-CSDN博客

crontab定时任务

  • Linux之crontab定时任务 - 云+社区 - 腾讯云
  • (1条消息)使用crontab,让linux定时执行shell脚本_运维_Allen的技术天空-CSDN博客

vmware

  • 一、VMware Workstation 15中文破解版 下载与安装(附密钥) - MrXiong - 博客园
  • VMware Workstation pro 15破解版安装 - 简书
  • VMware 12安装Redhat 7.0详解_m沉默-CSDN博客_vmware安装redhat7
  • 实例讲解虚拟机3种网络模式(桥接、nat、Host-only) - ggjucheng - 博客园
  • VMware Workstation安装CentOS 8 最新版 - 知乎

git

  • Git - 安装 Git
  • Git - 重置揭密
  • Git - 撤消操作

maven

  • Maven 教程 | 菜鸟教程

jenkins

  • Jenkins+Maven+Github+Springboot实现可持续自动部署(非常详细) - coder、 - 博客园

消息处理

  • Kafka、ActiveMQ、RabbitMQ、RocketMQ 区别以及高可用原理 - 云+社区 - 腾讯云

算法

  • 剑指Offer系列刷题笔记汇总

数据结构

  • 十大经典排序算法最强总结(含JAVA代码实现) - 郭耀华 - 博客园

Android

  • 五步搞定Android开发环境部署——非常详细的Android开发环境搭建教程 - 边写边唱 - 博客园
  • 如何在eclipse中添加android ADT ADT插件的安装图文教程_编程开发_软件教程_脚本之家
  • 64位win7下Android SDK Manager闪退的解决方法 - fambit025的专栏 - 博客频道 - CSDN.NET
  • AndroidDevTools Android SDK下载 Android Studio下载 Gradle下载 SDK Tools下载
  • Android SDK | Android中文API
  • 资源整合+《 Android课程视频同步笔记 》-黑马程序员IT技术论坛 - Powered by Discuz!
  • 【新提醒】【长沙校区】十年磨一剑【基哥笔记】轻松学android-黑马程序员IT技术论坛 - Powered by Discuz!
  • Android UI-实现底部切换标签(fragment) - 巫山老妖 - 博客频道 - CSDN.NET
  • SQLiteOpenHelper - Android SDK | Android Developers

开发中的常见问题

  • 为什么android中Bitmap的getWidth()返回的图片宽度是图片像素的1.5倍,getHeight()也是。_已解决_博问_博客园
  • Android-android开发中,如何让图片自动适应不同的分辨率呢? - 德问:编程社交问答
  • Android:手把手教你打造可缩放移动的ImageView(上) - 林J - 博客园
  • android如何让程序在后台运行_百度知道
  • Android 启动后台运行程序(Service) - 开源中国社区
  • 请问,如何使安卓软件中的Activity在后台以及关闭屏幕后能继续运行?_百度知道

学习资源

  • 【新提醒】资源整合+《android课程同步视频笔记》-黑马程序员IT技术论坛 - 黑马程序员快速入学必看论坛
  • 【新提醒】1000多种安卓案例源码DEMO每周分享-多的超出你的想像-黑马程序员IT技术论坛 - 黑马程序员快速入学必看论坛

计算机网络

  • 2.子网掩码详解 - Faker_Wang的博客 - CSDN博客
  • 6.tcp三次握手四次挥手(及原因)详解 - xulu_258的专栏 - CSDN博客
  • 6.理论经典:TCP协议的3次握手与4次挥手过程详解 - 简书
  • 7.在浏览器中输入www.baidu.com后执行的全部过程 - qq_33774935的博客 - CSDN博客

五层协议图

  • 计算机网络中各层的协议图表(TCP/IP) - 小胖墩 - CSDN博客
  • 计算机网络五层协议 - 大野狼来啦 - CSDN博客
  • 通俗讲解计算机网络五层协议 - 努力的小白菜 - 博客园

spark

  • Spark学习之路 (一)Spark初识 - 扎心了,老铁 - 博客园

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java字符串拼接的优雅方式 背景 String底层原理 拼

发表于 2021-07-27

背景

字符串拼接不管是在业务上,还是写算法时都会频繁使用到。对于Java来说,字符串拼接有着很多种方式,他们之间的区别是什么,对应不同的业务哪种更好用呢。

image.png

String底层原理

在讨论字符串拼接时,首先需要知道String的底层原理。

我们这里只讨论jdk1.8之后的情况,看下结构

1
arduino复制代码private final byte[] value;

这一行代码已经可以说明很多东西。字符串实质就是不可变的byte数组。因为不可变,所以对他进行拼接对他拼接实际就是生成了多个对象,这就是不鼓励对字符串进行拼接的原因。但不可变也有很多好处,例如线程安全、可以存在字符串缓冲池复用字符串等。

拼接的方法

经典但有时不优雅的 +

1
2
3
ini复制代码String a = "123";
String b = "456";
String c = a + b;

c这个字符串就是ab拼接起来的字符串,“123456”

这段代码反编译出来的代码是

1
2
scss复制代码String c = (new StringBuilder()).append(a).append(b).toString();
​

可以看出这个 + 是Java的语法糖,他实际上是调用的StringBuilder,通过append()来进行拼接。关于StringBuilder我们后面再讲,先来讲下这个用法的优缺点。

优点

“+”,最大的优点就是简洁。如果两个字符串需要首尾拼接,+号义不容辞的成为了最好的使用方式。

缺点

说到缺点的话就多了。简洁也是他的最大缺点,也就是不够灵活。

业务一

有一个字符串List,我需要把他们拼接起来,怎么办?

1
2
3
scss复制代码for(String tmp:list){
s += tmp;
}

简洁的一批,但是他隐藏着很大的问题!

image.png

上面说到这种拼接方式实际是通过StringBuilder的append的方法。你不需要知道他的原理,你只需要知道,每次循环,他都会new一个StringBuilder对象。创建对象的开销是很大的,如果List有几千几万,内存开销和时间开销是不能接受的!

所以阿里巴巴的规范说到:

img

表面上是推荐,实际就是禁止。写算法会消耗大量时间导致不通过,业务也会因为这种方式提高了无故的开销,属于领导看了想打死的代码。

业务二

大家好,我叫XX,我是来自XXX学校的大X学生,我的爱好是XXX。

一个经典的模板,我需要替换掉中间的XXX为controller的参数,怎么办呢?

1
ini复制代码String s = "大家好,我叫"+name+"我是来自"+school+"学校的大"+num+“学生,我的爱好是”+aihao;

属于可用但极其丑陋的代码。如果其他接口也需要这个模板,我还要把这段话复制到所有位置上吗?如果我要改动这个,我要对所有代码进行改动吗。

万能的StringBuilder

先介绍下StringBuilder的原理。把字符串拼接想象成数组就很好理解了,StringBuilder有点类似于ArrayList,可变数组。

1
2
3
4
scss复制代码    /**
    * The value is used for character storage.
    */
   char[] value;

区别就是没有final修饰,当到达阈值时进行扩容操作。append方法就是往后插入。

那么就可以解决上面业务一的问题了。

1
2
3
4
5
ini复制代码StringBuilder sb = new StringBuilder();
for(String tmp:list){
sb.append(tmp);
}
String s = sb.tostring();

相比于上面,只创建了一个StringBuilder对象,减少循环创建的开销。

线程安全的StringBuffer

StringBuffer与StringBuilder相比,有线程安全的优势,通过上锁的方式。同时导致效率略低于StringBuilder。

灵活的String.format()

这个严格来说应该叫做格式化,但也可以用来拼接。

熟悉c语言的应该能够懂,我这里举一个例子

1
2
arduino复制代码String msg = String.format(“我是%s小学的学生,我爱吃%s”,"阳光","屎");
//输出 我是阳光小学的学生,我爱吃屎

使用字符串链代替%s,生成需要的字符串。也不仅可以拼接字符串,可以看下下图(偷的图,没全部验证过,错了别找我)

类型

这种方式就解决了业务二的问题。通过编写枚举或者常量字符串留出对应的位置,使用时再用String.format()拼接。

有点绿色的concat

为什么说他绿色呢,就是我还没有找到他有什么优势。

1
2
3
ini复制代码String s = "123".concat("456");
//结果等价于
String s = "123" + "456";

concat方法的原理是数组扩容后复制之前的内容并写新的内容,和StringBuilder底层有点相像。

但是相比于“+”号来说,既不简便,又没有什么效率上的提高。在循环字符串拼接的条件,效率上会略有一点优势,但是这种情况是根本不被允许的,所以concat就很鸡肋。

JDK1.8优雅写法

刚才提到业务一的解决办法可以使用朴素的StringBuilder来解决,但是对于业务代码来说有一点冗长。

Jdk1.8给出了优雅的答案

1
ini复制代码String s = String.join("_", list);

一行代码,就可以把list里的字符串通过“_”拼接起来。

经典的Guava

guava是我们crud程序员的好伙伴,这里就不用多说了。我们最常接触到的其实就是guava的本地缓存和字符串操作。

1
ini复制代码String result = Joiner.on(",").join(list);

也是简洁的一句话,但是相比于jdk本土的字符串方法来说,他还有一些其他的特性。例如可以把为null的数组给跳过或者替换掉等等。功能要比jdk的要丰富一点。在正常的web项目里基本都会有Guava的依赖,使用起来还是很方便的。

总结

这篇文章偏重于代码编写方面,如何写出简洁高效的代码,是我们要追求的。不要让你写的垃圾代码恶心到接手的同事就好了。

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【每日算法】一题双解 「树的遍历」&「递归」 |Pytho

发表于 2021-07-27

本文正在参加「Python主题月」,详情查看 活动链接

题目描述

这是 LeetCode 上的 671. 二叉树中第二小的节点 ,难度为 简单。

Tag : 「二叉树」、「树的遍历」、「递归」

给定一个非空特殊的二叉树,每个节点都是正数,并且每个节点的子节点数量只能为 2 或 0。如果一个节点有两个子节点的话,那么该节点的值等于两个子节点中较小的一个。

更正式地说,root.val = min(root.left.val, root.right.val) 总成立。

给出这样的一个二叉树,你需要输出所有节点中的第二小的值。如果第二小的值不存在的话,输出 -1 。

示例 1:

1
2
3
4
5
csharp复制代码输入:root = [2,2,5,null,null,5,7]

输出:5

解释:最小的值是 2 ,第二小的值是 5 。

示例 2:

1
2
3
4
5
ini复制代码输入:root = [2,2,2]

输出:-1

解释:最小的值是 2, 但是不存在第二小的值。

提示:

  • 树中节点数目在范围 [1,25][1, 25][1,25] 内
  • 111 <= Node.val <= 2312^{31}231 - 1
  • 对于树中每个节点 root.val == min(root.left.val, root.right.val)

树的遍历

一个朴素的做法是,直接对树进行遍历(广度 & 深度),使用 HashSet 进行存储,得到所有去重后的节点大小。

然后找次小值的方式有多种:可以通过排序找次小值,复杂度为 O(nlog⁡n)O(n\log{n})O(nlogn);也可以使用经典的两个变量 & 一次遍历的方式,找到次小值,复杂度为 O(n)O(n)O(n)。

Java 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Java复制代码class Solution {
Set<Integer> set = new HashSet<>();
public int findSecondMinimumValue(TreeNode root) {
dfs(root);
if (set.size() < 2) return -1;
int first = Integer.MAX_VALUE, second = Integer.MAX_VALUE;
for (int i : set) {
if (i <= first) {
second = first;
first = i;
} else if (i <= second) {
second = i;
}
}
return second;
}
void dfs(TreeNode root) {
if (root == null) return;
set.add(root.val);
dfs(root.left);
dfs(root.right);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Java复制代码class Solution {
Set<Integer> set = new HashSet<>();
public int findSecondMinimumValue(TreeNode root) {
bfs(root);
if (set.size() < 2) return -1;
int first = Integer.MAX_VALUE, second = Integer.MAX_VALUE;
for (int i : set) {
if (i <= first) {
second = first;
first = i;
} else if (i <= second) {
second = i;
}
}
return second;
}
void bfs(TreeNode root) {
Deque<TreeNode> d = new ArrayDeque<>();
d.addLast(root);
while (!d.isEmpty()) {
TreeNode poll = d.pollFirst();
set.add(poll.val);
if (poll.left != null) d.addLast(poll.left);
if (poll.right != null) d.addLast(poll.right);
}
}
}

Python 3 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Python复制代码class Solution:
hashset = set()
def findSecondMinimumValue(self, root: TreeNode) -> int:
self.hashset = set()
self.dfs(root)
if len(self.hashset) < 2:
return -1
first = second = inf
for i in self.hashset:
if i <= first:
second = first
first = i
elif i <= second:
second = i
return second

def dfs(self, root):
if not root:
return
self.hashset.add(root.val)
self.dfs(root.left)
self.dfs(root.right)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Python复制代码class Solution:
hashset = set()
def findSecondMinimumValue(self, root: TreeNode) -> int:
self.hashset = set()
self.bfs(root)
if len(self.hashset) < 2:
return -1
first = second = inf
for i in self.hashset:
if i <= first:
second = first
first = i
elif i <= second:
second = i
return second

def bfs(self, root):
d = deque([root])
while d:
poll = d.popleft()
self.hashset.add(poll.val)
if poll.left:
d.append(poll.left)
d.append(poll.right)
  • 时间复杂度:树的搜索复杂度为 O(n)O(n)O(n),通过线性遍历找次小值,复杂度为 O(n)O(n)O(n)。整体复杂度为 O(n)O(n)O(n)
  • 空间复杂度:O(n)O(n)O(n)

递归

解法一显然没有利用到本题核心条件 :「root.val = min(root.left.val, root.right.val)」和「每个子节点数量要么是 0 要么是 2」。

我们可以设计如下递归函数,含义为 从 root 为根的树进行搜索,找到值比 cur 大的最小数。然后使用全局变量 ans 存储答案。

1
Java复制代码void dfs(TreeNode root, int cur)

那么最终搜索范围为 dfs(root, root.val),这是因为 性质 root.val = min(root.left.val, root.right.val),即最小值会不断往上传递,最终根节点必然是全局最小值。

然后再结合「每个子节点数量要么是 0 要么是 2」,我们可以特判一下 ans 是否为第一次赋值,如果给 ans 赋了新值或者更新了更小的 ans,则不再需要往下搜索了。

Java 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Java复制代码class Solution {
int ans = -1;
public int findSecondMinimumValue(TreeNode root) {
dfs(root, root.val);
return ans;
}
void dfs(TreeNode root, int cur) {
if (root == null) return ;
if (root.val != cur) {
if (ans == -1) ans = root.val;
else ans = Math.min(ans, root.val);
return ;
}
dfs(root.left, cur);
dfs(root.right, cur);
}
}

Python 3 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Python复制代码class Solution:
ans = -1
def findSecondMinimumValue(self, root: TreeNode) -> int:
self.ans = -1
self.dfs(root, root.val)
return self.ans

def dfs(self, root, cur):
if not root:
return
if root.val != cur:
if self.ans == -1:
self.ans = root.val
else:
self.ans = min(self.ans, root.val)
return
self.dfs(root.left, cur)
self.dfs(root.right, cur)
  • 时间复杂度:O(n)O(n)O(n)
  • 空间复杂度:忽略递归带来的空间开销。复杂度为 O(1)O(1)O(1)

最后

这是我们「刷穿 LeetCode」系列文章的第 No.671 篇,系列开始于 2021/01/01,截止于起始日 LeetCode 上共有 1916 道题目,部分是有锁题,我们将先把所有不带锁的题目刷完。

在这个系列文章里面,除了讲解解题思路以外,还会尽可能给出最为简洁的代码。如果涉及通解还会相应的代码模板。

为了方便各位同学能够电脑上进行调试和提交代码,我建立了相关的仓库:github.com/SharingSour… 。

在仓库地址里,你可以看到系列文章的题解链接、系列文章的相应代码、LeetCode 原题链接和其他优选题解。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

杂谈 Java SPI

发表于 2021-07-27

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

虽然是一个老概念 , 但是都怪自己的轻微强迫症 , 有个点没搞清楚 , 就非要把这个概念翻出来看看, 这一篇是对 Java SPI 概念的一个完善 , 为后续的 Dubbo 等框架的分析做准备

Java SPI 是 JDK提供的SPI(Service Provider Interface)机制 , SPI 机制的核心在于 ServiceLoader , 用于加载接口对应的实现类

用一句话来解释 , 就是以 Java 的方式查找接口对应的实现类 , 实现解耦 (区别于 Spring 里面的Bean工具 , SPI 的方式更加底层)

二 . 知识点

SPI 机制中有四个组成部分 :

  • Service Providers : 服务器供应商
    • Installing Service Providers : 安装服务供应商
    • Loading Service providers : 装载服务供应商
  • Service Loader : 服务承载程式

Service Provider : 服务提供者
Service Provider 是 SPI 的特定实现。服务提供者包含一个或多个实现或扩展服务类型的具体类 , 服务提供者是通过我们放在资源目录 META-INF/services 中的提供者配置文件来配置和标识的

ServiceLoader
ServiceLoader 是 SPI 的核心类 , 它的作用是发现和惰性加载实现 , 它使用上下文类路径来定位提供程序实现并将它们放在内部缓存中。

常用的 SPI 类

  • CurrencyNameProvider : 为currency类提供本地化的货币符号
  • LocaleNameProvider : 为Locale类提供本地化名称
  • TimeZoneNameProvider : 为TimeZone类提供本地化的时区名称
  • DateFormatProvider : 提供指定区域的日期和时间格式
  • NumberFormatProvider : 为NumberFormat类提供货币值、整数和百分比值.
  • Driver : 从4.0版本开始,JDBC API支持SPI模式
  • PersistenceProvider : 提供JPA API的实现。
  • JsonProvider : 提供JSON处理对象
  • JsonbProvider : 提供JSON绑定对象
  • Extension : 为CDI容器提供扩展
  • ConfigSourceProvider : 提供用于检索配置属性的源

三 . SPI 的使用

SPI 的使用中我分成了三个包 :

1
2
3
4
5
6
7
8
9
10
java复制代码// provider-api : API 描述包 , 包含 Provider 接口 
|- ExchangeRateProvider : Provider 接口
|- QuoteManager : 一个需要我们通过 SPI 构建的业务接口
|- ProviderManager : Provider 管理器 , 加载 Provider 实现类

//provider-impl : 接口实现类 , 也是我们最终需要 Loader 出的类
|- YahooFinanceExchangeRateProvider
|- YahooQuoteManagerImpl

//server-application : 业务包 , 业务处理 , 获得 API 类

3.1 provider-api 包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码// Step 1 : Provider 接口 , 我们用它返回一个通用的业务类
public interface ExchangeRateProvider {
QuoteManager create();
}

// Step 2 : 这个是我们的业务类
public interface QuoteManager {
List<String> getQuotes(String baseCurrency, LocalDate date);
}

// Step 3 : Provider 管理工具 , 加载出 Provider
public class ProviderManager {

private Logger logger = LoggerFactory.getLogger(this.getClass());

public Iterator<ExchangeRateProvider> providers(boolean refresh) {
logger.info("------> [Step 1 : 进入 Provider 处理流程] <-------");
ServiceLoader<ExchangeRateProvider> loader = ServiceLoader.load(ExchangeRateProvider.class);

if (refresh) {
loader.reload();
}
Iterator<ExchangeRateProvider> provider = loader.iterator();


return provider;

}


}

3.2 provider-impl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码// 实现类
public class YahooFinanceExchangeRateProvider implements ExchangeRateProvider {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public QuoteManager create() {
logger.info("------> this is create <-------");
return new YahooQuoteManagerImpl();
}
}

public class YahooQuoteManagerImpl implements QuoteManager {

@Override
public List<String> getQuotes(String baseCurrency, LocalDate date) {
return new ArrayList<>();
}
}

3.3 Server applicaiton

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class StartService implements ApplicationRunner {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("------> [App 中获取] <-------");

ProviderManager providerManager = new ProviderManager();

Iterator<ExchangeRateProvider> providers = providerManager.providers(true);


while (providers.hasNext()) {
logger.info("------> [providers 获取完成 :{}] <-------", providers.next().create());
}

}
}

PS : 这里有个很重要的东西 , 你需要 META-INF/services 中添加对应的文件

  • 文件名 : Provider 接口
  • 文件内容 : 涉及到的实现类

image.png

这个文件放在 impl 和 app 中都行 , 实际上引包了就会扫描

1
2
3
4
5
6
7
8
9
10
11
xml复制代码<dependency>
<groupId>com.gang.study</groupId>
<artifactId>provider-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>com.gang.study</groupId>
<artifactId>provider-impl</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

PS : 内容已提交到 Git , 欢迎 Star !!!! 👉 Case/java/spi

四 . SPI 源码深入

如果就这么结束当然不符合我一贯的做法 , 源码还是要看一下的, 别说 , 还真有一些启发

4.1 运行的走向

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码// Step 1 : 发起 Providers 加载操作
Iterator<ExchangeRateProvider> providers = providerManager.providers(true);

// Step 2 : ServiceLoader 执行加载
ServiceLoader<ExchangeRateProvider> loader = ServiceLoader.load(ExchangeRateProvider.class);

// Step 3 : ServiceLoader 构造器
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
reload();
}

// Step 4 : reload 加载
public void reload() {
providers.clear();
lookupIterator = new LazyIterator(service, loader);
}

4.2 属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// 默认从  META-INF/services/ 路径下加载
private static final String PREFIX = "META-INF/services/";

// 表示正在加载的服务的类或接口
private final Class<S> service;

// 类加载器用于定位、加载和实例化提供程序
private final ClassLoader loader;

// 创建ServiceLoader时获取的访问控制上下文
private final AccessControlContext acc;

// 缓存的提供商,按实例化顺序
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();

// 惰性查找迭代器 , 最终获取会通过这个定制的迭代器
private LazyIterator lookupIterator;

4.3 resource 的加载

这里的核心是做了一个定制的实现类 LazyIterator

前面看了 ServiceLoader 的构建 , 这里来看一下 resource 的加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码// 这里不需要深入太多 , 主要是资源加载处理 Service 类

C- ServiceLoader
public Iterator<S> iterator() {

// 核心一 : 对迭代器做了简单的实现 , 用来调用定制的迭代器 LazyIterator
return new Iterator<S>() {
Iterator<Map.Entry<String,S>> knownProviders = providers.entrySet().iterator();

public boolean hasNext() {
if (knownProviders.hasNext()){
return true;
}
// --> 最终调用 hasNextService()
return lookupIterator.hasNext();
}

public S next() {
if (knownProviders.hasNext()){
return knownProviders.next().getValue();
}
return lookupIterator.next();
}

public void remove() {
throw new UnsupportedOperationException();
}

};
}

// 判断是否存在下一个
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
// META-INF/services/java.util.spi.ResourceBundleControlProvider
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
// 通过 classLoader 加载 resource
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}

while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
// 获得实现类的名称 com.gang.spi.demo.service.YahooFinanceExchangeRateProvider
nextName = pending.next();
return true;
}

// 读取 Resource 中资源
private Iterator<String> parse(Class<?> service, URL u)throws ServiceConfigurationError{
InputStream in = null;
BufferedReader r = null;
// 加载 SPI impl l
ArrayList<String> names = new ArrayList<>();
try {
in = u.openStream();
r = new BufferedReader(new InputStreamReader(in, "utf-8"));
int lc = 1;
// Stream 逐行加载
while ((lc = parseLine(service, u, r, lc, names)) >= 0);
} catch (IOException x) {
fail(service, "Error reading configuration file", x);
} finally {
// .... 省略 close
}
return names.iterator();
}

4.4 load 处理加载 Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码// 迭代器迭代
public S next() {
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}

// 实例化 ProviderImpl
private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
// 通过 cn 获取对象的 class 类
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
// new ServiceConfigurationError(service.getName() + ": " + msg)
fail(service,"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,"Provider " + cn + " not a subtype");
}
try {
// 实例化 Service : com.gang.spi.api.service.ExchangeRateProvider
S p = service.cast(c.newInstance());
// cn : com.gang.spi.demo.service.YahooFinanceExchangeRateProvider
// p : com.gang.spi.demo.service.YahooFinanceExchangeRateProvider
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,"Provider " + cn + " could not be instantiated",x);
}
throw new Error(); // This cannot happen
}

PS: 核心就2个 ,一个是hasNext 中加载 resource , 再在 nextService 中实例化对应的server

这样的好处是 , 只有在正在使用的时候 , 才会真的去实例化这个对象 !!!

五 . 定制与比较

这里主要对比 Spring SPI 的加载方式 , 详见这一篇 盘点 SpringBoot : Factories 处理流程

文件的配置方面 : Spring 中使用的是 SpringFactoriesLoader , 其实与 Java 的方式是很像的 , 但是 Spring 的模式下 , 允许一个 factories 文件装载更多的类 , 使用更加简单 .


资源的加载方法 : 2 者都是通过 classLoader 加载 Resource , 并没有太大本质的区别


而资源的实例化方面 : Spring 通过一个 instantiateFactory 方法触发 ,但是同样的 , 也是 class 反射的原理


高并发情况 : 在调用 hasNext 的时候 , 加载 resource , 在迭代时才实例化 看起来好像没有什么问题 , 但是其 classLoader , provider 都是放在 ServiceLoader 对象属性中 , 多线程情况下会存在冲突

而 Spring 的模式 , 在 Server 启动时 , 就加载对象 Factories , 相对安全很多.


总得来说 , Spring Factories 的 Java SPI 的逻辑思路是一致的 , Java SPI 通过 LazyIterator 加载的方式比较骚气 ,但是相对而言获取起来就会很复杂 .

所以 , 完全可以使用 Spring Factories 来完成你自己想要的业务.

总结

定制 Iterator 完成业务是一个不错的思路

参考与感谢

www.baeldung.com/java-spi

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…590591592…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%