开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Spring的核心功能和执行流程(下) IOC 和 DI 扩

发表于 2021-11-16

「这是我参与11月更文挑战的第13天,活动详情查看:2021最后一次更文挑战」

IOC 和 DI

IOC(Inversion of Control,翻译为“控制反转”)不是一个具体的技术,而是一种设计思想。与传统控制流相比,IOC 会颠倒控制流,在传统的编程中需要开发者自行创建并销毁对象,而在 IOC 中会把这些操作交给框架来处理,这样开发者就不用关注具体的实现细节了,拿来直接用就可以了,这就是控制反转。

IOC 很好的体现出了面向对象的设计法则之一——好莱坞法则:“别找我们,我们找你”。即由 IOC 容器帮对象找到相应的依赖对象并注入,而不是由对象主动去找。

DI(Dependency Injection,翻译为“依赖注入”)表示组件间的依赖关系交由容器在运行期自动生成,也就是说,由容器动态的将某个依赖关系注入到组件之中,这样就能提升组件的重用频率。通过依赖注入机制,我们只需要通过简单的配置,就可指定目标需要的资源,完成自身的业务逻辑,而不需要关心资源来自哪里、由谁实现等问题。

IOC 和 DI 其实是同一个概念从不同角度的描述的,由于控制反转这个概念比较含糊(可能只理解成了容器控制对象这一个层面,很难让人想到谁来维护对象关系),所以 2004 年被开发者尊称为“教父”的 Martin Fowler(世界顶级专家,敏捷开发方法的创始人之一)又给出了一个新的名字“依赖注入”,相对 IOC 而言,“依赖注入”明确描述了“被注入对象依赖 IOC 容器配置依赖对象”。

扩展

1.Spring IOC 的优点

IOC 的优点有以下几个:

  • 使用更方便,拿来即用,无需显式的创建和销毁的过程;
  • 可以很容易提供众多服务,比如事务管理、消息服务等;
  • 提供了单例模式的支持;
  • 提供了 AOP 抽象,利用它很容易实现权限拦截、运行期监控等功能;
  • 更符合面向对象的设计法则;
  • 低侵入式设计,代码的污染极低,降低了业务对象替换的复杂性。

2.Spring IOC 注入方式汇总

IOC 的注入方式有三种:构造方法注入、Setter 注入和接口注入。

1.构造方法注入

构造方法注入主要是依赖于构造方法去实现,构造方法可以是有参的也可以是无参的,我们平时 new 对象时就是通过类的构造方法来创建类对象的,每个类对象默认会有一个无参的构造方法,Spring 通过构造方法注入的代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class Person {
    public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
private int id;
private String name;
    // 忽略 Setter、Getter 的方法
}

applicationContext.xml 配置如下:

1
2
3
4
xml复制代码<bean id="person" class="org.springframework.beans.Person">
    <constructor-arg value="1" type="int"></constructor-arg>
    <constructor-arg value="Java" type="java.lang.String"></constructor-arg>
</bean>

2.Setter注入

Setter 方法注入的方式是目前 Spring 主流的注入方式,它可以利用 Java Bean 规范所定义的 Setter/Getter 方法来完成注入,可读性和灵活性都很高,它不需要使用声明式构造方法,而是使用 Setter 注入直接设置相关的值,实现示例如下:

1
2
3
4
java复制代码<bean id="person" class="org.springframework.beans.Person">
    <property name="id" value="1"/>
    <property name="name" value="Java"/>
</bean>

3.接口注入

接口注入方式是比较古老的注入方式,因为它需要被依赖的对象实现不必要的接口,带有侵入性,因此现在已经被完全舍弃了,所以本文也不打算做过多的描述,大家只要知道有这回事就行了。

3.Spring AOP

AOP(Aspect-Oriented-Programming,面向切面编程)可以说是 OOP(Object-Oriented Programing,面向对象编程)的补充和完善,OOP 引入封装、继承和多态性等概念来建立一种公共对象处理的能力,当我们需要处理公共行为的时候,OOP 就会显得无能为力,而 AOP 的出现正好解决了这个问题。比如统一的日志处理模块、授权验证模块等都可以使用 AOP 很轻松的处理。

Spring AOP 目前提供了三种配置方式:

  • 基于 Java API 的方式;
  • 基于 @AspectJ(Java)注解的方式;
  • 基于 XML<aop />标签的方式。

1.基于Java API的方式

此配置方式需要实现相关的接口,例如 MethodBeforeAdvice 和 AfterReturningAdvice,并且在 XML 配置中定义相应的规则即可实现。

我们先来定义一个实体类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码package org.springframework.beans;
​
​
public class Person {
   public Person findPerson() {
      Person person = new Person(1, "JDK");
      System.out.println("findPerson 被执行");
      return person;
   }
   public Person() {
   }
   public Person(Integer id, String name) {
      this.id = id;
      this.name = name;
   }
   private Integer id;
   private String name;
   // 忽略 Getter、Setter 方法
}

再定义一个 advice 类,用于对拦截方法的调用之前和调用之后进行相关的业务处理,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码import org.springframework.aop.AfterReturningAdvice;
import org.springframework.aop.MethodBeforeAdvice;
​
import java.lang.reflect.Method;
​
public class MyAdvice implements MethodBeforeAdvice, AfterReturningAdvice {
   @Override
   public void before(Method method, Object[] args, Object target) throws Throwable {
      System.out.println("准备执行方法: " + method.getName());
   }
​
   @Override
   public void afterReturning(Object returnValue, Method method, Object[] args, Object target) throws Throwable {
      System.out.println(method.getName() + " 方法执行结束");
   }

然后需要在 application.xml 文件中配置相应的拦截规则,配置如下:

1
2
3
4
5
6
7
8
9
10
xml复制代码<!-- 定义 advisor -->
<bean id="myAdvice" class="org.springframework.advice.MyAdvice"></bean>
<!-- 配置规则,拦截方法名称为 find* -->
<bean class="org.springframework.aop.support.RegexpMethodPointcutAdvisor">
    <property name="advice" ref="myAdvice"></property>
    <property name="pattern" value="org.springframework.beans.*.find.*"></property>
</bean>
​
<!-- 定义 DefaultAdvisorAutoProxyCreator 使所有的 advisor 配置自动生效 -->
<bean class="org.springframework.aop.framework.autoproxy.DefaultAdvisorAutoProxyCreator"></bean>

从以上配置中可以看出,我们需要配置一个拦截方法的规则,然后定义一个 DefaultAdvisorAutoProxyCreator 让所有的 advisor 配置自动生效。

最后,我们使用测试代码来完成调用:

1
2
3
4
5
6
7
8
java复制代码public class MyApplication {
   public static void main(String[] args) {
      ApplicationContext context =
            new ClassPathXmlApplicationContext("classpath*:application.xml");
      Person person = context.getBean("person", Person.class);
      person.findPerson();
   }
}

以上程序的执行结果为:

1
2
3
makefile复制代码准备执行方法: findPerson
findPerson 被执行
findPerson 方法执行结束

可以看出 AOP 的拦截已经成功了。

2.基于@AspectJ注解的方式

首先需要在项目中添加 aspectjweaver 的 jar 包,配置如下:

1
2
3
4
5
6
xml复制代码<!-- https://mvnrepository.com/artifact/org.aspectj/aspectjweaver -->
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.9.5</version>
</dependency>

此 jar 包来自于 AspectJ,因为 Spring 使用了 AspectJ 提供的一些注解,因此需要添加此 jar 包。之后,我们需要开启 @AspectJ 的注解,开启方式有两种。

可以在 application.xml 配置如下代码中开启 @AspectJ 的注解:

1
xml复制代码<aop:aspectj-autoproxy/>

也可以使用 @EnableAspectJAutoProxy注解开启,代码如下:

1
2
3
4
java复制代码@Configuration
@EnableAspectJAutoProxy
public class AppConfig {
}

之后我们需要声明拦截器的类和拦截方法,以及配置相应的拦截规则,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
​
@Aspect
public class MyAspectJ {
​
   // 配置拦截类 Person
   @Pointcut("execution(* org.springframework.beans.Person.*(..))")
   public void pointCut() {
   }
​
   @Before("pointCut()")
   public void doBefore() {
      System.out.println("执行 doBefore 方法");
   }
​
   @After("pointCut()")
   public void doAfter() {
      System.out.println("执行 doAfter 方法");
  }
}

然后我们只需要在 application.xml 配置中添加注解类,配置如下:

1
xml复制代码<bean class="org.springframework.advice.MyAspectJ"/>

紧接着,我们添加一个需要拦截的方法:

1
2
3
4
5
6
7
8
9
10
11
java复制代码package org.springframework.beans;
​
// 需要拦截的 Bean
public class Person {
   public Person findPerson() {
      Person person = new Person(1, "JDK");
      System.out.println("执行 findPerson 方法");
      return person;
   }
    // 获取其他方法
}

最后,我们开启测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码import org.springframework.beans.Person;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
​
public class MyApplication {
   public static void main(String[] args) {
      ApplicationContext context =
            new ClassPathXmlApplicationContext("classpath*:application.xml");
      Person person = context.getBean("person", Person.class);
      person.findPerson();
   }
}

以上程序的执行结果为:

1
2
3
复制代码执行 doBefore 方法
执行 findPerson 方法
执行 doAfter 方法

可以看出 AOP 拦截成功了。

3.基于 XML <aop /> 标签的方式

基于 XML 的方式与基于注解的方式类似,只是无需使用注解,把相关信息配置到 application.xml 中即可,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
xml复制代码<!-- 拦截处理类 -->
<bean id="myPointcut" class="org.springframework.advice.MyPointcut"></bean>
<aop:config>
    <!-- 拦截规则配置 -->
    <aop:pointcut id="pointcutConfig"
                    expression="execution(* org.springframework.beans.Person.*(..))"/>
    <!-- 拦截方法配置 -->
    <aop:aspect ref="myPointcut">
        <aop:before method="doBefore" pointcut-ref="pointcutConfig"/>
        <aop:after method="doAfter" pointcut-ref="pointcutConfig"/>
    </aop:aspect>
</aop:config>

之后,添加一个普通的类来进行拦截业务的处理,实现代码如下:

1
2
3
4
5
6
7
8
java复制代码public class MyPointcut {
   public void doBefore() {
      System.out.println("执行 doBefore 方法");
   }
   public void doAfter() {
      System.out.println("执行 doAfter 方法");
   }
}

拦截的方法和测试代码与第二种注解的方式相同,这里就不在赘述。

最后执行程序,执行结果为:

1
2
3
复制代码执行 doBefore 方法
执行 findPerson 方法
执行 doAfter 方法

可以看出 AOP 拦截成功了。

Spring AOP 的原理其实很简单,它其实就是一个动态代理,我们在调用 getBean() 方法的时候返回的其实是代理类的实例,而这个代理类在 Spring 中使用的是 JDK Proxy 或 CgLib 实现的,它的核心代码在 DefaultAopProxyFactory#createAopProxy(…) 中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class DefaultAopProxyFactory implements AopProxyFactory, Serializable {
​
@Override
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
Class<?> targetClass = config.getTargetClass();
if (targetClass == null) {
throw new AopConfigException("TargetSource cannot determine target class: " +
"Either an interface or a target is required for proxy creation.");
}
            // 判断目标类是否为接口
if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
                // 是接口使用 jdk 的代理
return new JdkDynamicAopProxy(config);
}
            // 其他情况使用 CgLib 代理
return new ObjenesisCglibAopProxy(config);
}
else {
return new JdkDynamicAopProxy(config);
}
}
    // 忽略其他代码
}

小结

以上讲了 IOC 和 DI 概念,以及 IOC 的优势和 IOC 注入的三种方式:构造方法注入、Setter 注入和接口注入,最后讲了 Spring AOP 的概念与它的三种配置方式:基于 Java API 的方式、基于 Java 注解的方式和基于 XML 标签的方式。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

彻底搞懂Spring状态机原理,实现订单与物流解耦

发表于 2021-11-16

本文节选自《设计模式就该这样学》

1 状态模式的UML类图

状态模式的UML类图如下图所示。

file

2 使用状态模式实现登录状态自由切换

当我们在社区阅读文章时,如果觉得文章写得很好,我们就会评论、收藏两连发。如果处于登录情况下,则可以直接做评论、收藏这些行为。否则,跳转到登录界面,登录后再继续执行先前的动作。这里涉及的状态有两种:登录与未登录;行为有两种:评论和收藏。下面使用状态模式来实现这个逻辑,代码如下。
首先创建抽象状态角色UserState类。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
public abstract class UserState {
protected AppContext context;

public void setContext(AppContext context) {
this.context = context;
}

public abstract void favorite();

public abstract void comment(String comment);
}

然后创建登录状态LogInState类。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
public class LoginInState extends UserState {
@Override
public void favorite() {
System.out.println("收藏成功!");
}

@Override
public void comment(String comment) {
System.out.println(comment);
}
}

创建未登录状态UnloginState类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码
public class UnLoginState extends UserState {
@Override
public void favorite() {
this.switch2Login();
this.context.getState().favorite();
}

@Override
public void comment(String comment) {
this.switch2Login();
this.context.getState().comment(comment);
}

private void switch2Login() {
System.out.println("跳转到登录页面!");
this.context.setState(this.context.STATE_LOGIN);
}
}

创建上下文角色AppContext类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码
public class AppContext {
public static final UserState STATE_LOGIN = new LoginInState();
public static final UserState STATE_UNLOGIN = new UnLoginState();
private UserState currentState = STATE_UNLOGIN;
{
STATE_LOGIN.setContext(this);
STATE_UNLOGIN.setContext(this);
}

public void setState(UserState state) {
this.currentState = state;
this.currentState.setContext(this);
}

public UserState getState() {
return this.currentState;
}

public void favorite() {
this.currentState.favorite();
}

public void comment(String comment) {
this.currentState.comment(comment);
}
}

最后编写客户端测试代码。

1
2
3
4
5
6
java复制代码
public static void main(String[] args) {
AppContext context = new AppContext();
context.favorite();
context.comment("评论: 好文章,360个赞!");
}

运行结果如下图所示。

file

3 使用状态机实现订单状态流转控制

状态机是状态模式的一种应用,相当于上下文角色的一个升级版。在工作流或游戏等各种系统中有大量使用,如各种工作流引擎,它几乎是状态机的子集和实现,封装状态的变化规则。Spring也提供了一个很好的解决方案。Spring中的组件名称就叫作状态机(StateMachine)。状态机帮助开发者简化状态控制的开发过程,让状态机结构更加层次化。下面用Spring状态机模拟一个订单状态流转的过程。

3.1 添加依赖。

1
2
3
4
5
6
java复制代码
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>

3.2 创建订单实体Order类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码
public class Order {
private int id;
private OrderStatus status;
public void setStatus(OrderStatus status) {
this.status = status;
}

public OrderStatus getStatus() {
return status;
}

public void setId(int id) {
this.id = id;
}

public int getId() {
return id;
}

@Override
public String toString() {
return "订单号:" + id + ", 订单状态:" + status;
}
}

3.3 创建订单状态枚举类和状态转换枚举类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码
/**
* 订单状态
*/
public enum OrderStatus {
//待支付,待发货,待收货,订单结束
WAIT_PAYMENT, WAIT_DELIVER, WAIT_RECEIVE, FINISH;
}

/**
* 订单状态改变事件
*/
public enum OrderStatusChangeEvent {
//支付,发货,确认收货
PAYED, DELIVERY, RECEIVED;
}

3.4 添加状态流转配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
java复制代码
/**
* 订单状态机配置
*/
@Configuration
@EnableStateMachine(name = "orderStateMachine")
public class OrderStateMachineConfig extends StateMachineConfigurerAdapter<OrderStatus, OrderStatusChangeEvent> {

/**
* 配置状态
* @param states
* @throws Exception
*/
public void configure(StateMachineStateConfigurer<OrderStatus, OrderStatusChangeEvent> states) throws Exception {
states
.withStates()
.initial(OrderStatus.WAIT_PAYMENT)
.states(EnumSet.allOf(OrderStatus.class));
}

/**
* 配置状态转换事件关系
* @param transitions
* @throws Exception
*/
public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderStatusChangeEvent> transitions) throws Exception {
transitions
.withExternal().source(OrderStatus.WAIT_PAYMENT).target(OrderStatus.WAIT_DELIVER)
.event(OrderStatusChangeEvent.PAYED)
.and()
.withExternal().source(OrderStatus.WAIT_DELIVER).target(OrderStatus.WAIT_RECEIVE)
.event(OrderStatusChangeEvent.DELIVERY)
.and()
.withExternal().source(OrderStatus.WAIT_RECEIVE).target(OrderStatus.FINISH)
.event(OrderStatusChangeEvent.RECEIVED);
}

/**
* 持久化配置
* 在实际使用中,可以配合Redis等进行持久化操作
* @return
*/
@Bean
public DefaultStateMachinePersister persister(){
return new DefaultStateMachinePersister<>(new StateMachinePersist<Object, Object, Order>() {
@Override
public void write(StateMachineContext<Object, Object> context, Order order) throws Exception {
//此处并没有进行持久化操作
}

@Override
public StateMachineContext<Object, Object> read(Order order) throws Exception {
//此处直接获取Order中的状态,其实并没有进行持久化读取操作
return new DefaultStateMachineContext(order.getStatus(), null, null, null);
}
});
}
}

3.5 添加订单状态监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码
@Component("orderStateListener")
@WithStateMachine(name = "orderStateMachine")
public class OrderStateListenerImpl{

@OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
public boolean payTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.WAIT_DELIVER);
System.out.println("支付,状态机反馈信息:" + message.getHeaders().toString());
return true;
}

@OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")
public boolean deliverTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.WAIT_RECEIVE);
System.out.println("发货,状态机反馈信息:" + message.getHeaders().toString());
return true;
}

@OnTransition(source = "WAIT_RECEIVE", target = "FINISH")
public boolean receiveTransition(Message<OrderStatusChangeEvent> message){
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.FINISH);
System.out.println("收货,状态机反馈信息:" + message.getHeaders().toString());
return true;
}
}

3.6 创建IOrderService接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码
public interface IOrderService {
//创建新订单
Order create();
//发起支付
Order pay(int id);
//订单发货
Order deliver(int id);
//订单收货
Order receive(int id);
//获取所有订单信息
Map<Integer, Order> getOrders();
}

3.7 在Service业务逻辑中应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
java复制代码
@Service("orderService")
public class OrderServiceImpl implements IOrderService {

@Autowired
private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;

@Autowired
private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister;

private int id = 1;
private Map<Integer, Order> orders = new HashMap<>();

public Order create() {
Order order = new Order();
order.setStatus(OrderStatus.WAIT_PAYMENT);
order.setId(id++);
orders.put(order.getId(), order);
return order;
}

public Order pay(int id) {
Order order = orders.get(id);
System.out.println("线程名称:" + Thread.currentThread().getName() + " 尝试支付,订单号:" + id);
Message message = MessageBuilder.withPayload(OrderStatusChangeEvent.PAYED).
setHeader("order", order).build();
if (!sendEvent(message, order)) {
System.out.println("线程名称:" + Thread.currentThread().getName() + " 支付失败, 状态异常,订单号:" + id);
}
return orders.get(id);
}

public Order deliver(int id) {
Order order = orders.get(id);
System.out.println("线程名称:" + Thread.currentThread().getName() + " 尝试发货,订单号:" + id);
if (!sendEvent(MessageBuilder.withPayload(OrderStatusChangeEvent.DELIVERY)
.setHeader("order", order).build(), orders.get(id))) {
System.out.println("线程名称:" + Thread.currentThread().getName() + " 发货失败,状态异常,订单号:" + id);
}
return orders.get(id);
}

public Order receive(int id) {
Order order = orders.get(id);
System.out.println("线程名称:" + Thread.currentThread().getName() + " 尝试收货,订单号:" + id);
if (!sendEvent(MessageBuilder.withPayload(OrderStatusChangeEvent.RECEIVED)
.setHeader("order", order).build(), orders.get(id))) {
System.out.println("线程名称:" + Thread.currentThread().getName() + " 收货失败,状态异常,订单号:" + id);
}
return orders.get(id);
}


public Map<Integer, Order> getOrders() {
return orders;
}


/**
* 发送订单状态转换事件
*
* @param message
* @param order
* @return
*/
private synchronized boolean sendEvent(Message<OrderStatusChangeEvent> message, Order order) {
boolean result = false;
try {
orderStateMachine.start();
//尝试恢复状态机状态
persister.restore(orderStateMachine, order);
//添加延迟用于线程安全测试
Thread.sleep(1000);
result = orderStateMachine.sendEvent(message);
//持久化状态机状态
persister.persist(orderStateMachine, order);
} catch (Exception e) {
e.printStackTrace();
} finally {
orderStateMachine.stop();
}
return result;
}
}

3.8 编写客户端测试代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码

@SpringBootApplication
public class Test {
public static void main(String[] args) {

Thread.currentThread().setName("主线程");

ConfigurableApplicationContext context = SpringApplication.run(Test.class,args);

IOrderService orderService = (IOrderService)context.getBean("orderService");

orderService.create();
orderService.create();

orderService.pay(1);

new Thread("客户线程"){
@Override
public void run() {
orderService.deliver(1);
orderService.receive(1);
}
}.start();

orderService.pay(2);
orderService.deliver(2);
orderService.receive(2);

System.out.println("全部订单状态:" + orderService.getOrders());

}
}

通过这个真实的业务案例,相信小伙伴们已经对状态模式有了一个非常深刻的理解。
关注『 Tom弹架构 』回复“设计模式”可获取完整源码。

【推荐】Tom弹架构:30个设计模式真实案例(附源码),挑战年薪60W不是梦

本文为“Tom弹架构”原创,转载请注明出处。技术在于分享,我分享我快乐!如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力。关注『 Tom弹架构 』可获取更多技术干货!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

谷歌插件下载,谷歌插件免费下载

发表于 2021-11-16

大家都知道谷歌商店有很多很多好用的插件,但是我们却并没有办法进行访问。

在这里插入图片描述

于是我自己开发了一个谷歌商店,可以下载各种各样的谷歌插件:

www.cxyhub.com/

大家直接点击就可以访问了。基本覆盖了各种常用的插件。

在这里插入图片描述

第一版本用的 wordpress,后来我用 java 开发了后台,自己写了爬虫~

大家帮忙测评一下,看看有没有什么缺点~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官:你给我说一下什么是时间轮吧?

发表于 2021-11-16

你好呀,我是歪歪。

今天我带大家来卷一下时间轮吧,这个玩意其实还是挺实用的。

常见于各种框架之中,偶现于面试环节,理解起来稍微有点难度,但是知道原理之后也就觉得:

大多数人谈到时间轮的时候都会从 netty 开始聊。

我就不一样了,我想从 Dubbo 里面开始讲,毕竟我第一次接触到时间轮其实是在 Dubbo 里面,当时就惊艳到我了。

而且,Dubbo 的时间轮也是从 Netty 的源码里面拿出来的,基本一模一样。

时间轮在 Dubbo 里面有好几次使用,比如心跳包的发送、请求调用超时时间的检测、还有集群容错策略里面。

我就从 Dubbo 里面这个类说起吧:

org.apache.dubbo.rpc.cluster.support.FailbackClusterInvoker

Failback,属于集群容错策略的一种:

你不了解 Dubbo 也没有关系,你只需要知官网上是这样介绍它的就行了:

我想突出的点在于“定时重发”这四个字。

我们先不去看源码,提到定时重发的时候,你想到了什么东西?

是不是想到了定时任务?

那么怎么去实现定时任务呢?

大家一般都能想到 JDK 里面提供的 ScheduledExecutorService 和 Timer 这两个类。

Timer 就不多说了,性能不够高,现在已经不建议使用这个东西。

ScheduledExecutorService 用的还是相对比较多的,它主要有三个类型的方法:

简单说一下 scheduleAtFixedRate 和 scheduleWithFixedDelay 这两个方法。

ScheduleAtFixedRate,是每次执行时间为上一次任务开始起向后推一个时间间隔。

ScheduleWithFixedDelay,是每次执行时间为上一次任务结束起向后推一个时间间隔。

前者强调的是上一个任务的开始时间,后者强调的是上一个任务的结束时间。

你也可以理解为 ScheduleAtFixedRate 是基于固定时间间隔进行任务调度,而 ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔进行任务调度。

所以,如果是我们要基于 ScheduledExecutorService 来实现前面说的定时重发功能,我觉得是用 ScheduleWithFixedDelay 好一点,含义为前一次重试完成后才应该隔一段时间进行下一次重试。

让整个重试功能串行化起来。

那么 Dubbo 到底是怎么实现这个定时重试的需求的呢?

撸源码啊,源码之下无秘密。

准备发车。

撸源码

有的同学看到这里可能着急了:不是说讲时间轮吗,怎么又开始撸源码了呀?

你别猴急呀,我这不得循序渐进嘛。

我先带你手撕一波 Dubbo 的源码,让你知道源码这样写的问题是啥,然后我再说解决方案嘛。

再说了,我直接,啪的一下,把解决方案扔你脸上,你也接受不了啊。

我喜欢温柔一点的教学方式。

好了,先看下面的源码。

这几行代码你要是没看明白没有关系,你主要关注 catch 里面的逻辑。

我把代码和官网上的介绍帮你对应了一下。

意思就是调用失败了,还有一个 addFailed 来兜底。

addFailed 是干了啥事呢?

干的就是“定时重发”这事:

org.apache.dubbo.rpc.cluster.support.FailbackClusterInvoker#addFailed

这个方法就可以回答前面我们提出的问题:Dubbo 集群容错里面,到底是怎么实现这个定时重试的需求的呢?

从标号为 ① 的地方可以知道,用的就是 ScheduledExecutorService,具体一点就是用的 scheduleWithFixedDelay 方法。

再具体一点就是如果集群容错采用的是 failback 策略,那么在请求调用失败的 RETRY_FAILED_PERIOD 秒之后,以每隔 RETRY_FAILED_PERIOD 秒一次的频率发起重试,直到重试成功。

RETRY_FAILED_PERIOD 是多少呢?

看第 52 行,它是 5 秒。

另外,你可以在前面 addFailed 方法中看到标号为 ③ 的地方,是在往 failed 里面 put 东西。

failed 又是一个什么东西呢?

看前面的 61 行,是一个 ConcurrentHashMap。

标号为 ③ 的地方,往 failed put 的 key 就是这一次需要重试的请求,value 是处理这一次请求对应的服务端。

failed 这个 map 是什么时候用呢?

请看标号为 ② 的 retryFailed 方法:

在这个方法里面会去遍历 failed 这个 map,全部拿出来再次调用一遍。

如果成功了就调用 remove 方法移除这个请求,没有成功的会抛出异常,打印日志,然后等待下次再次重试。

到这里我们就算是揭开了 Dubbo 的 FailbackClusterInvoker 类的神秘面纱。

面纱之下,隐藏的就是一个 map 加 ScheduledExecutorService。

感觉好像也没啥难的啊,很常规的解决方案嘛,我也能想到啊。

于是你缓缓的在屏幕上打出一个:

但是,朋友们,抓好坐稳,要“但是”了,要转弯了。

这里面其实是有问题的,最直观的就是这个 map,没有限制大小,由于没有限制大小,那么在一些高并发的场景下,是有可能出现内存溢出的。

好,那么问题来了,怎么防止内存溢出呢?

很简单,首先我们可以限制 map 的大小,对吧。

比如限制它的容量为 1000。

满了之后,怎么办呢?

可以搞一个淘汰策略嘛,先进先出(FIFO),或者后进先出(LIFO)。

然后也不能一直重试,如果重试超过了一定次数应该被干掉才对。

上面说的内存溢出和解决方案,都不是我乱说的。

我都是有证据的,因为我是从 FailbackClusterInvoker 这个类的提交记录上看到了它的演进过程的,前面截图的代码也是优化之前版本的代码,并不是最新的代码:

这一次提交,提到了一个编号叫 2425 的 issue。

github.com/apache/dubb…

这里面提到的问题和解决方案,就是我前面说的事情。

终于,铺垫完成,关于时间轮的故事要正式开始了。

时间轮原理

有的朋友又开始猴急了。

要我赶紧上时间轮的源码。

你别着急啊,我直接给你讲源码,你肯定会看懵逼的。

所以我决定,先给你画图,看懂原理。

给大家画一下时间轮的基本样子,理解了时间轮的工作原理,下面的源码解析理解起来也就相对轻松一点了。

首先时间轮最基本的结构其实就是一个数组,比如下面这个长度为 8 的数组:

怎么变成一个轮呢?

首尾相接就可以了:

假如每个元素代表一秒钟,那么这个数组一圈能表达的时间就是 8 秒,就是这样的:

注意我前面强调的是一圈,为 8 秒。

那么 2 圈就是 16 秒, 3 圈就是 24 秒,100 圈就是 800 秒。

这个能理解吧?

我再给你配个图:

虽然数组长度只有 8,但是它可以在上叠加一圈又一圈,那么能表示的数据就多了。

比如我把上面的图的前三圈改成这样画:

希望你能看明白,看不明白也没有关系,我主要是要你知道这里面有一个“第几圈”的概念。

好了,我现在把前面的这个数组美化一下,从视觉上也把它变成一个轮子。

轮子怎么说?

轮子的英文是 wheel,所以我们现在有了一个叫做 wheel 的数组:

然后,把前面的数据给填进去大概是长这样的。

为了方便示意,我只填了下标为 0 和 3 的位置,其他地方也是一个意思:

那么问题就来了。假设这个时候我有一个需要在 800 秒之后执行的任务,应该是怎么样的呢?

800 mod 8 =0,说明应该挂在下标为 0 的地方:

假设又来一个 400 秒之后需要执行的任务呢?

同样的道理,继续往后追加即可:

不要误以为下标对应的链表中的圈数必须按照从小到大的顺序来,这个是没有必要的。

好,现在又来一个 403 秒后需要执行的任务,应该挂在哪儿?

403 mod 8 = 3,那么就是这样的:

我为什么要不厌其烦的给你说怎么计算,怎么挂到对应的下标中去呢?

因为我还需要引出一个东西:待分配任务的队列。

上面画 800 秒、 400 秒和 403 秒的任务的时候,我还省略了一步。

其实应该是这样的:

任务并不是实时挂到时间轮上去的,而是先放到一个待分配的队列中,等到特定的时间再把待分配队列中的任务挂到时间轮上去。

具体是什么时候呢?

下面讲源码的时候再说。

其实除了待分配队列外,还有一个任务取消的队列。

因为放入到时间轮的任务是可以被取消的。

比如在 Dubbo 里面,检验调用是否超时也用的是时间轮机制。

假设一个调用的超时时间是 5s,5s 之后需要触发任务,抛出超时异常。

但是如果请求在 2s 的时候就收到了响应,没有超时,那么这个任务是需要被取消的。

对应的源码就是这块,看不明白没关系,看一眼就行了,我只是为了证明我没有骗你:

org.apache.dubbo.remoting.exchange.support.DefaultFuture#received

原理画图出来大概就是这样,然后我还差一张图。

把源码里面的字段的名称给你对应到上面的图中去。

主要把这几个对象给你对应上,后面看源码就不会太吃力了:

对应起来是这样的:

注意左上角的“worker的工作范围”把整个时间轮包裹了起来,后面看源码的时候你会发现其实整个时间轮的核心逻辑里面没有线程安全的问题,因为 worker 这个单线程把所有的活都干完了。

最后,再提一嘴:比如在前面 FailbackClusterInvoker 的场景下,时间轮触发了重试的任务,但是还是失败了,怎么办呢?

很简单,再次把任务放进去就行了,所以你看源码里面,有一个叫做 rePut 的方法,干的就是这事:

org.apache.dubbo.rpc.cluster.support.FailbackClusterInvoker.RetryTimerTask#run

这里的含义就是如果重试出现异常,且没有超过指定重试次数,那么就可以再次把任务仍回到时间轮里面。

等等,我这里知道“重试次数”之后,还能干什么事儿呢?

比如如果你对接过微信支付,它的回调通知有这样的一个时间间隔:

我知道当前重试的次数,那么我就可以在第 5 次重试的时候把时间设置为 10 分钟,扔到时间轮里面去。

时间轮就可以实现上面的需求。

当然了,MQ 的延迟队列也可以,但是不是本文的讨论范围。

但是用时间轮来做上面这个需求还有一个问题:那就是任务在内存中,如果服务挂了就没有了,这是一个需要注意的地方。

除了 FailbackClusterInvoker 外,其实我觉得时间轮更合适的地方是做心跳。

这可太合适了, Dubbo 的心跳就是用的时间轮来做。

org.apache.dubbo.remoting.exchange.support.header.HeartbeatTimerTask#doTask

从上图可以看到,doTask 方法就是发送心跳包,每次发送完成之后调用 reput 方法,然后再次把发送心跳包的任务仍回给时间轮。

好了,不再扩展应用场景了。

接下来,进入源码分析,跟上节奏,不要乱,大家都能学。

开卷!

时间轮源码

前面把原理理解到位了,接下来就可以看一下我们的源码了。

先说明一下,为了方便我截图,下面的部分截图我是移动了源码的位置,所以可能和你看源码的时候有点不一样。

我们再次审视 Dubbo 的 FailbackClusterInvoker 类中关于时间轮的用法。

首先 failTimer 这个对象,是一个很眼熟的双重检查的单例模式:

这里初始化的 failTimer 就是 HashedWheelTimer 对象关键的逻辑是调用了它的构造方法。

所以,我们先从它的构造方法入手,开始撕它。

先说一下它的几个入参分别是干啥的:

  • threadFactory:线程工厂,可以指定线程的名称和是否是守护进程。
  • tickDuration:两个 tick 之间的时间间隔。
  • unit:tickDuration 的时间单位。
  • ticksPerWheel:时间轮里面的 tick 的个数。
  • maxPendingTimeouts:时间轮中最大等待任务的个数。

所以,Dubbo 这个时间轮的含义就是这样的:

创建一个线程名称为 failback-cluster-timer 的守护线程,每隔一秒执行一次任务。这个时间轮的大小为 32,最大的等待处理任务个数是 failbackTasks,这个值是可以配置的,默认值是 100。

但是很多其他的使用场景下,比如 Dubbo 检查调用是否超时,就没有送 maxPendingTimeouts 这个值:

org.apache.dubbo.remoting.exchange.support.DefaultFuture#TIME_OUT_TIMER

它甚至连 ticksPerWheel 都没有上送。

其实这两个参数都是有默认值的。ticksPerWheel 默认为 512。maxPendingTimeouts 默认为 -1,含义为对等待处理的任务个数不限制:

好了,现在我们整体看一下这个时间轮的构造方法,每一行的作用我都写上了注释:

有几个地方,我也单独拿出来给你说一下。

比如 createWheel 这个方法,如果你八股文背的熟悉的话,你就知道这里和 HashMap 里面确认容量的核心代码是一样一样的。

这也是我在源码注释里面提到的,时间轮里面数组的大小必须是 2 的 n 次方。

为什么,你问我为什么?

别问,问就是为了后面做位运算,操作骚,速度快,逼格高。

我相信下面的这一个代码片段不需要我来解释了,你要是不理解,就再去翻一番 HashMap 的八股文:

但是这一行代码我还是可以多说一句的 mask = wheel.length - 1。

因为我们已经知道 wheel.length 是 2 的 n 次方。

那么假设我们的定时任务的延迟执行时间是 x,那么它应该在时间轮的哪个格子里面呢?

是不是应该用 x 对长度取余,也就是这样计算: x % wheel.length。

但是,取余操作的效率其实不算高。

那么怎么能让这个操作快起来呢?

就是 wheel.length - 1。

wheel.length 是 2 的 n 次方,减一之后它的二级制的低位全部都是 1,举个例子就是这样式儿的:

所以 x % wheel.length = x & (wheel.length - 1)。

在源码里面 mask =wheel.length - 1。

那么 mask 在哪用的呢?

其中的一个地方就是在 Worker 类的 run 方法里面:

org.apache.dubbo.common.timer.HashedWheelTimer.Worker

这里计算出来的 idx 就是当前需要处理的数组的下标。

我这里只是告诉你 mask 确实是参与了 & 位运算,所以你看不懂这块的代码也没有关系,因为我还没讲到这里来。

所以没跟上的同学不要慌,我们接着往下看。

前面我们已经有一个时间轮了,那么怎么调用这个时间呢?

其实就是调用它的 newTimeout 方法:

这个方法有三个入参:

含义很明确,即指定任务(task)在指定时间(delay,unit)之后开始触发。

接下来解读一下 newTimeout 方法:

里面最关键的代码是 start 方法,我带大家看一下到底是在干啥:

分成上下两部分讲。

上面其实就是维护或者判断当前 HashedWheelTimer 的状态,从源码中我们知道状态有三个取值:

  • 0:初始化
  • 1:已启动
  • 2:已关闭

如果是初始化,那么通过一个 cas 操作,把状态更新为已启动,并执行 workerThread.start() 操作,启动 worker 线程。

下面这个部分就稍微有一点点费解了。

如果 startTime 等于 0,即没有被初始化的话,就调用 CountDownLatch 的 await 等待一下下。

而且这个 await 还是在主线程上的 await,主线程在这里等着 startTime 被初始化,这是个什么逻辑呢?

首先,我们要找一下 startTime 是在哪儿被初始化的。

就是在 Worker 的 run 方法里面,而这个方法就是在前面 workerThread.start() 的时候触发的:

org.apache.dubbo.common.timer.HashedWheelTimer.Worker

可以看到,对 startTime 初始化完成后,还判断了是否等于 0。也就是说 System.nanoTime() 方法是有可能返回为 0,一个小细节,如果你去要深究一下的话,也是很有趣的,我这里就不展开了。

startTime 初始化完成之后,立马执行了 startTimeInitialized.countDown() 操作。

这不就和这里呼应起来了吗?

主线程不马上就可以跑起来了吗?

那么问题就来了,这里大费周章的搞一个 startTime 初始化,搞不到主线程还不能继续往下执行是干啥呢?

当然是有用啦,回到 newTimeout 方法接着往下看:

我们分析一下上面这个等式哈。

首先 System.nanoTime() 是代码执行到这个地方的实时时间。

因为 delay 是一个固定值,所以 unit.toNanos(delay) 也是一个固定值。

那么 System.nanoTime()+unit.toNanos(delay) 就是这个任务需要被触发的纳秒数。

举个例子。

假设 System.nanoTime() = 1000,unit.toNanos(delay)=100。

那么这个任务被触发的时间点就是 1000+100=1100。

这个能跟上吧?

那么为什么要减去 startTime 呢?

startTime 我们前面分析了,其实初始化的时候也是 System.nanoTime(),初始化完成后就是一个固定值了。

那岂不是 System.nanoTime()-startTime 几乎趋近于 0?

这个等式 System.nanoTime()+unit.toNanos(delay)-startTime 的意义是什么呢?

是的,这就是我当时看源码的一个疑问。

但是后面我分析出来,其实整个等式里面只有 System.nanoTime() 是一个变量。

第一次计算的时候 System.nanoTime()-startTime 确实趋近于 0,但是当第二次触发的时候,即第二个任务来的时候,计算它的 deadline 的时候,System.nanoTime() 可是远大于 startTime 这个固定值的。

所以,第二次任务的执行时间点应该是当前时间加上指定的延迟时间减去 worker 线程的启动时间,后面的时间以此类推。

前面 newTimeout 方法就分析完了,也就是主线程在这个地方就执行完时间轮相关的逻辑了。

接下来该分析什么呢?

肯定是该轮到时间轮的 worker 线程上场发挥了啊。

worker 线程的逻辑都在 run 方法里面。

而核心逻辑就在一个 do-while 里面:

循环结束的条件是当前时间轮的状态不是启动状态。

也就是说,只要时间轮没有被调用 stop 逻辑,这个线程会一直在运行。

接下来我们逐行看一下循环里面的逻辑,这部分逻辑就是时间轮的核心逻辑。

首先是 final long deadline = waitForNextTick() 这一行,里面就很有故事:

首先你看这个方法名你就知道它是干啥的了。

是在这里面等待,直到下一个时刻的到来。

所以方法进来第一行就是计算下一个时刻的纳秒值是啥。

接着看 for 循环里面,前面部分都看的比较懵逼,只有标号为 ③ 的地方好理解的多,就是让当前线程睡眠指定时间。

所以前面的部分就是在算这个指定时间是什么。

怎么算的呢?

标号为 ① 的地方,前面部分还能看懂,

deadline - currentTime 算出来的就是还需要多长时间才会到下一个时间刻度。

后面直接就看不懂了。

里面的 1000000 好理解,单位是纳秒,换算一下就是 1 毫秒。

这个 999999 是啥玩意?

其实这里的 999999 是为了让算出来的值多 1 毫秒。

比如,deadline - currentTime 算出来是 1000123 纳秒,那么 1000123/1000000=1ms。

但是(1000123+999999)/1000000=2ms。

也就是说要让下面标号为 ③ 的地方,多睡 1ms。

这是为什么呢?

我也不知道,所以我先暂时不管了,留个坑嘛,问题不大,接着往下写。

下面就到了标号为 ② 的地方,看起来是对 windows 操作系统进行了特殊的处理,要把 sleepTimeMs 换算为 10 的倍数。

为啥?

这里我就得批评一下 Dubbo 了,把 Netty 的实现拿过来了,还把关键信息给隐藏了,这不合适吧。

这地方在 Netty 的源码中是这样的:

这里很清晰的指了个路:

github.com/netty/netty…

而顺着这条路,一路往下跟,会找到这样一个地方:

www.javamex.com/tutorials/t…

没想到还有意外收获。

第一个划线的地方大概意思是说当线程调用 Thread.sleep 方法的时候,JVM 会进行一个特殊的调用,将中断周期设置为 1ms。

因为 Thread.sleep 方法的实现是依托于操作系统提供的中断检查,也就是操作系统会在每一个中断的时候去检查是否有线程需要唤醒并且提供 CPU 资源。所以我觉得前面多睡 1ms 的原因就可以用这个原因来解释了。

前面留的坑,这么快就填上了,舒服。

而第二个划线的地方说的是,如果是 windows 的话,中断周期可能是 10ms 或者 15ms,具体和硬件相关。

所以,如果是 windows 的话,需要把睡眠时间调整为 10 的倍数。

一个没啥卵用的知识,送给你。

前面几个问题了解清楚了,waitForNextTick 方法也就理解到位了,它干的事儿就是等,等一个时间刻度的时间,等一个 tick 长度的时间。

等到了之后呢?

就来到了这一行代码 int idx = (int) (tick & mask)

我们前面分析过,计算当前时间对应的下标,位运算,操作骚,速度快,逼格高,不多说。

然后代码执行到这个方法 processCancelledTasks()

看方法名称就知道了,是处理被取消的任务的队列:

逻辑很简单,一目了然,就是把 cancelledTimeouts 队列给清空。

这里是在 remove,在清理。

那么哪里在 add,在添加呢?

就是在下面这个方法中:

org.apache.dubbo.common.timer.HashedWheelTimer.HashedWheelTimeout#cancel

如果调用了 HashedWheelTimeout 的 cancel 方法,那么这个任务就算是被取消了。

前面画图的时候就提到了这个方法,逻辑也很清晰,所以不多解释了。

但是你注意我画了下划线的地方:MpscLinkedQueue。

这是个啥?

这是一个非常牛逼的无锁队列。

但是 Dubbo 这里的 cancelledTimeouts 队列的数据结构明明用的是 LinkedBlockingQueue 呀?

怎么回事呢?

因为这里的注释是 Netty 里面的,Netty 里面用的是 MpscLinkedQueue。

你看我给你对比一下 Netty 和 Dubbo 这里的区别:

所以这里的注解是有误导的,你有时间的话可以给 Dubbo 提给 pr 修改一下。

又拿捏了一个小细节。

好了,我们接着往下卷,来到了这行代码 HashedWheelBucket bucket=wheel[idx]

一目了然,没啥说的。

从时间轮里面获取指定下标的 bucket。

主要看看它下面的这一行代码 transferTimeoutsToBuckets()

我还是每一行都加上注释:

所以这个方法的核心逻辑就是把等待分配的任务都发配到指定的 bucket 上去。

这里也就回答了我画图的时候留下的一个问题:什么时候把等待分配队列里面的任务挂到时间轮上去呢?

就是这个时候。

接下来分析 bucket.expireTimeouts(deadline) 这一行代码。

你看这个方法的调用方就是 bucket,它代表的含义就是准备开始处理这个 bucket 里面的这个链表中的任务了:

最后,还有一行代码 tick++

表示当前这个 tick 已经处理完成了,开始准备下一个时间刻度。

关键代码就分析完了。

一遍看不懂就多看一遍,但是我建议你自己也对照着源码一起看,很快就能搞懂。

相信以后面试官问到时间轮的时候你可以和他战斗上一个回合了。

为什么是一个回合呢?

因为得你回答完这个时间轮后,一般来说,面试官会追问一个:

嗯,说的很不错,那你再介绍一下层级时间轮吧?

当时你就懵逼了:什么,层级时间轮是什么鬼,歪歪没写啊?

是的,怪我,我没写,下次,下次一定。

但是我可以给你指条路,去看看 kafka 对于时间轮的优化。你会看的鼓起掌来。

几个相关的 issues

最后,关于 Dubbo 时间轮,在 issues 里面有一个讨论:

github.com/apache/dubb…

大家有兴趣的可以去看看。

其中提到了一个有意思的问题:

Netty 在 3.x 中有大量使用 HashedWheelTimer,但是在 4.1 中,我们可以发现,Netty 保留了 HashedWheelTimer,但在其源码中并未使用它,而是选择了 ScheduledThreadPoolExecutor,不知道它的用意是什么。

这个问题得到了 Netty 的维护者的亲自答:

github.com/netty/netty…

他的意思是时间轮其实没有任何毛病,我没有用只是因为我们希望与通道的EventLoop位于同一线程上。

在 Netty 里面,有个老哥发现时间轮并没有用上了,甚至想把它给干掉:

我寻思这属于工具类啊,你留着呗,总是会有用的。

另外,前面的 issue 还提到了另外一个问题:

github.com/apache/dubb…

这也是 Dubbo 引入时间轮之后进行的优化。

带你看一眼,上面是优化之后的,下面是之前的写法:

在之前的写法中,就是后台起一个线程,然后搞个死循环,一遍遍的去扫整个集合:

这种方案也能实现需求,但是和时间轮的写法比起来,高下立判。

操作骚,速度快,逼格高。

最后说一句

好了,看到了这里了,转发、在看、点赞随便安排一个吧,要是你都安排上我也不介意。写文章很累的,需要一点正反馈。

给各位读者朋友们磕一个了:

本文已收录自个人博客,欢迎大家来玩。

www.whywhy.vip/

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot+RabbitMQ 实现 RPC 调用

发表于 2021-11-16

说到 RPC(Remote Procedure Call Protocol 远程过程调用协议),小伙伴们脑海里蹦出的估计都是 RESTful API、Dubbo、WebService、Java RMI、CORBA 等。

其实,RabbitMQ 也给我们提供了 RPC 功能,并且使用起来很简单。

今天松哥通过一个简单的案例来和大家分享一下 Spring Boot+RabbitMQ 如何实现一个简单的 RPC 调用。

注意

关于 RabbitMQ 实现 RPC 调用,有的小伙伴可能会有一些误解,心想这还不简单?搞两个消息队列 queue_1 和 queue_2,首先客户端发送消息到 queue_1 上,服务端监听 queue_1 上的消息,收到之后进行处理;处理完成后,服务端发送消息到 queue_2 队列上,然后客户端监听 queue_2 队列上的消息,这样就知道服务端的处理结果了。

这种方式不是不可以,就是有点麻烦!RabbitMQ 中提供了现成的方案可以直接使用,非常方便。接下来我们就一起来学习下。

  1. 架构

先来看一个简单的架构图:

这张图把问题说的很明白了:

  1. 首先 Client 发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字。
  2. Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中。
  3. Client 从回调队列中读取消息,就可以知道消息的执行情况是什么样子了。

这种情况其实非常适合处理异步调用。

  1. 实践

接下来我们通过一个具体的例子来看看这个怎么玩。

2.1 客户端开发

首先我们来创建一个 Spring Boot 工程名为 producer,作为消息生产者,创建时候添加 web 和 rabbitmq 依赖,如下图:

项目创建成功之后,首先在 application.properties 中配置 RabbitMQ 的基本信息,如下:

1
2
3
4
5
6
properties复制代码spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

这个配置前面四行都好理解,我就不赘述,后面两行:首先是配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来。最后一行配置则是开启发送失败退回。

接下来我们来提供一个配置类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
java复制代码/**
* @author 江南一点雨
* @微信公众号 江南一点雨
* @网站 http://www.itboyhub.com
* @国际站 http://www.javaboy.org
* @微信 a_java_boy
* @GitHub https://github.com/lenve
* @Gitee https://gitee.com/lenve
*/
@Configuration
public class RabbitConfig {

public static final String RPC_QUEUE1 = "queue_1";
public static final String RPC_QUEUE2 = "queue_2";
public static final String RPC_EXCHANGE = "rpc_exchange";

/**
* 设置消息发送RPC队列
*/
@Bean
Queue msgQueue() {
return new Queue(RPC_QUEUE1);
}

/**
* 设置返回队列
*/
@Bean
Queue replyQueue() {
return new Queue(RPC_QUEUE2);
}

/**
* 设置交换机
*/
@Bean
TopicExchange exchange() {
return new TopicExchange(RPC_EXCHANGE);
}

/**
* 请求队列和交换器绑定
*/
@Bean
Binding msgBinding() {
return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
}

/**
* 返回队列和交换器绑定
*/
@Bean
Binding replyBinding() {
return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
}


/**
* 使用 RabbitTemplate发送和接收消息
* 并设置回调队列地址
*/
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setReplyAddress(RPC_QUEUE2);
template.setReplyTimeout(6000);
return template;
}


/**
* 给返回队列设置监听器
*/
@Bean
SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RPC_QUEUE2);
container.setMessageListener(rabbitTemplate(connectionFactory));
return container;
}
}

这个配置类中我们分别配置了消息发送队列 msgQueue 和消息返回队列 replyQueue,然后将这两个队列和消息交换机进行绑定。这个都是 RabbitMQ 的常规操作,没啥好说的。

在 Spring Boot 中我们负责消息发送的工具是 RabbitTemplate,默认情况下,系统自动提供了该工具,但是这里我们需要对该工具重新进行定制,主要是添加消息发送的返回队列,最后我们还需要给返回队列设置一个监听器。

好啦,接下来我们就可以开始具体的消息发送了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码/**
* @author 江南一点雨
* @微信公众号 江南一点雨
* @网站 http://www.itboyhub.com
* @国际站 http://www.javaboy.org
* @微信 a_java_boy
* @GitHub https://github.com/lenve
* @Gitee https://gitee.com/lenve
*/
@RestController
public class RpcClientController {

private static final Logger logger = LoggerFactory.getLogger(RpcClientController.class);

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/send")
public String send(String message) {
// 创建消息对象
Message newMessage = MessageBuilder.withBody(message.getBytes()).build();

logger.info("client send:{}", newMessage);

//客户端发送消息
Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage);

String response = "";
if (result != null) {
// 获取已发送的消息的 correlationId
String correlationId = newMessage.getMessageProperties().getCorrelationId();
logger.info("correlationId:{}", correlationId);

// 获取响应头信息
HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();

// 获取 server 返回的消息 id
String msgId = (String) headers.get("spring_returned_message_correlation");

if (msgId.equals(correlationId)) {
response = new String(result.getBody());
logger.info("client receive:{}", response);
}
}
return response;
}
}

这块的代码其实也都是一些常规代码,我挑几个关键的节点说下:

  1. 消息发送调用 sendAndReceive 方法,该方法自带返回值,返回值就是服务端返回的消息。
  2. 服务端返回的消息中,头信息中包含了 spring_returned_message_correlation 字段,这个就是消息发送时候的 correlation_id,通过消息发送时候的 correlation_id 以及返回消息头中的 spring_returned_message_correlation 字段值,我们就可以将返回的消息内容和发送的消息绑定到一起,确认出这个返回的内容就是针对这个发送的消息的。

这就是整个客户端的开发,其实最最核心的就是 sendAndReceive 方法的调用。调用虽然简单,但是准备工作还是要做足够。例如如果我们没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,这样就无法将返回的消息内容和发送的消息内容关联起来。

2.2 服务端开发

再来看看服务端的开发。

首先创建一个名为 consumer 的 Spring Boot 项目,创建项目添加的依赖和客户端开发创建的依赖是一致的,不再赘述。

然后配置 application.properties 配置文件,该文件的配置也和客户端中的配置一致,不再赘述。

接下来提供一个 RabbitMQ 的配置类,这个配置类就比较简单,单纯的配置一下消息队列并将之和消息交换机绑定起来,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
java复制代码/**
* @author 江南一点雨
* @微信公众号 江南一点雨
* @网站 http://www.itboyhub.com
* @国际站 http://www.javaboy.org
* @微信 a_java_boy
* @GitHub https://github.com/lenve
* @Gitee https://gitee.com/lenve
*/
@Configuration
public class RabbitConfig {

public static final String RPC_QUEUE1 = "queue_1";
public static final String RPC_QUEUE2 = "queue_2";
public static final String RPC_EXCHANGE = "rpc_exchange";

/**
* 配置消息发送队列
*/
@Bean
Queue msgQueue() {
return new Queue(RPC_QUEUE1);
}

/**
* 设置返回队列
*/
@Bean
Queue replyQueue() {
return new Queue(RPC_QUEUE2);
}

/**
* 设置交换机
*/
@Bean
TopicExchange exchange() {
return new TopicExchange(RPC_EXCHANGE);
}

/**
* 请求队列和交换器绑定
*/
@Bean
Binding msgBinding() {
return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
}

/**
* 返回队列和交换器绑定
*/
@Bean
Binding replyBinding() {
return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
}
}

最后我们再来看下消息的消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Component
public class RpcServerController {
private static final Logger logger = LoggerFactory.getLogger(RpcServerController.class);
@Autowired
private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = RabbitConfig.RPC_QUEUE1)
public void process(Message msg) {
logger.info("server receive : {}",msg.toString());
Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build();
CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData);
}
}

这里的逻辑就比较简单了:

  1. 服务端首先收到消息并打印出来。
  2. 服务端提取出原消息中的 correlation_id。
  3. 服务端调用 sendAndReceive 方法,将消息发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数。

服务端的消息发出后,客户端将收到服务端返回的结果。

OK,大功告成。

2.3 测试

接下来我们进行一个简单测试。

首先启动 RabbitMQ。

接下来分别启动 producer 和 consumer,然后在 postman 中调用 producer 的接口进行测试,如下:

可以看到,已经收到了服务端的返回信息。

来看看 producer 的运行日志:

可以看到,消息发送出去后,同时也收到了 consumer 返回的信息。

可以看到,consumer 也收到了客户端发来的消息。

  1. 小结

好啦,一个小小的案例,带小伙伴们体验一把 RabbitMQ 实现 RPC 调用。

公众号江南一点雨后台回复 mq_rpc 可以获取本文案例哦~感兴趣的小伙伴可以试试~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

5【Jenkins从零到壹】:集成DingTalk 插件实现

发表于 2021-11-16

这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战」

❤️作者简介:大家好,我是小虚竹。Java领域优质创作者🏆,CSDN博客专家认证🏆,华为云享专家认证🏆

❤️技术活,该赏

❤️点赞 👍 收藏 ⭐再看,养成习惯

Jenkins使用教程相关系列 目录


之前都是在jenkins构建时,时不时要上去jenkins后台看下构建的情况,不清楚什么时候能构建成功,或担心会构建失败,生命中最煎熬的莫过于等待了。很不方便,后面找到这款DingTalk插件,再也不用苦苦等待了。

安装DingTalk 插件

img

从可选插件中搜索:“DingTalk”插件,记得安装后重启Jenkins!记得安装后重启Jenkins!记得安装后重启Jenkins!

img

系统配置统一设置钉钉基本信息

img

按图中的设置来:

1、id不用写

2、名称自定义

3、webhook:钉钉群中的机器人设置可以拿到

img

点击测试:钉钉群中会收到一条消息推送,是不是很神奇

img

点击应用,再点击保存

设置项目,勾选对应的机器人

img

img

钉钉设置

1、如图打开钉钉群的配置:智能群助手

img

2、添加机器人

img

如图设置机器人内容:

img

img

点击保存,注意图上的文字描述。

注意

测试过程中,发现最新版本的DingTalk 无法在项目中体现钉钉机器人配置,后来通过降版本的方式,解决了这个问题

img

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

顶级Java才懂的,基准测试JMH!

发表于 2021-11-16

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。

最近在手写一个ID生成器,需要比较UUID和目前比较流行的 NanoID之间的速度差异,当然也要测一下根据规则自创的ID生成器。

这样的代码属于最基础的API,速度哪怕减上几纳秒,累加起来也是很可观的。关键是,我该如何评估ID的生成速度呢?

  1. 如何统计性能?

常见的方法,是写一些统计代码。这些代码,穿插在我们的逻辑中,进行一些简单的计时运算。比如下面这几行:

1
2
3
4
java复制代码long start = System.currentTimeMillis();
//logic
long cost = System.currentTimeMillis() - start;
System.out.println("Logic cost : " + cost);

这样的统计方式,用在业务代码里,哪怕是APM里,并不见得有什么问题。

可惜的是,这段代码的统计结果,并不见得一定准确。举个例子来说,JVM在执行时,会对一些代码块,或者一些频繁执行的逻辑,进行JIT编译和内联优化,在得到一个稳定的测试结果之前,需要先循环上上万次,进行预热。预热前和预热后的性能差别是非常大的。

另外,评估性能,有很多的指标。如果这些指标数据,每次都要手工去算的话,那肯定是枯燥乏味且低效的。

JMH(the Java Microbenchmark Harness) 就是这样一个能够做基准测试的工具。如果你通过我们一系列的工具,定位到了热点代码,要测试它的性能数据,评估改善情况,就可以交给JMH。它的测量精度非常高,最高可达到纳秒的级别。

JMH已经在JDK 12中被包含,其他版本的需要自行引入maven,坐标如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码<dependencies>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
<scope>provided</scope>
</dependency>
</dependencies>

下面,我们介绍一下这个工具的使用。

  1. 关键注解

JMH是一个jar包,它和单元测试框架JUnit非常的像,可以通过注解进行一些基础配置。这部分配置有很多是可以通过main方法的OptionsBuilder进行设置的。

image.png

上图是一个典型的JMH程序执行的内容。通过开启多个进程,多个线程,首先执行预热,然后执行迭代,最后汇总所有的测试数据进行分析。在执行前后,还可以根据粒度处理一些前置和后置操作。

一个简单的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@Threads(2)
public class BenchmarkTest {
@Benchmark
public long shift() {
long t = 455565655225562L;
long a = 0;
for (int i = 0; i < 1000; i++) {
a = t >> 30;
}
return a;
}

@Benchmark
public long div() {
long t = 455565655225562L;
long a = 0;
for (int i = 0; i < 1000; i++) {
a = t / 1024 / 1024 / 1024;
}
return a;
}

public static void main(String[] args) throws Exception {
Options opts = new OptionsBuilder()
.include(BenchmarkTest.class.getSimpleName())
.resultFormat(ResultFormatType.JSON)
.build();
new Runner(opts).run();
}
}

下面,我们逐一介绍一下比较关键的注解和参数。

@Warmup

样例。

1
2
3
4
java复制代码@Warmup(
iterations = 5,
time = 1,
timeUnit = TimeUnit.SECONDS)

我们不止一次提到预热,warmup这个注解,可以用在类或者方法上,进行预热配置。可以看到,它有几个配置参数。

  • timeUnit:时间的单位,默认的单位是秒。
  • iterations:预热阶段的迭代数。
  • time:每次预热的时间。
  • batchSize:批处理大小,指定了每次操作调用几次方法。

上面的注解,意思是对代码预热总计5秒(迭代5次,每次一秒) 。预热过程的测试数据,是不记录测量结果的。

我们可以看一下它执行的效果:

1
2
3
4
python复制代码# Warmup: 3 iterations, 1 s each
# Warmup Iteration 1: 0.281 ops/ns
# Warmup Iteration 2: 0.376 ops/ns
# Warmup Iteration 3: 0.483 ops/ns

一般来说,基准测试都是针对的比较小的、执行速度相对较快的代码块。这些代码有很大的可能被编译、内联,在编码的时候保持方法的精简,对JIT也是有好的。

说到预热,就不得不提一下在分布式环境下的服务预热。在对服务节点进行发布的时候,通常也会有预热过程,逐步放量到相应的服务节点,直到服务达到最优状态。如下图所示,负载均衡负责这个放量过程,一般是根据百分比进行放量。

image.png

@Measurement

样例如下。

1
2
3
4
java复制代码@Measurement(
iterations = 5,
time = 1,
timeUnit = TimeUnit.SECONDS)

Measurement和Warmup的参数是一样的。不同于预热,它指的是真正的迭代次数。

我们能够从日志中看到这个执行过程:

1
2
3
4
5
6
python复制代码# Measurement: 5 iterations, 1 s each
Iteration 1: 1646.000 ns/op
Iteration 2: 1243.000 ns/op
Iteration 3: 1273.000 ns/op
Iteration 4: 1395.000 ns/op
Iteration 5: 1423.000 ns/op

虽然经过预热之后,代码都能表现出它的最优状态,但一般和实际应用场景还是有些出入的。如果你的测试机器性能很高,或者你的测试机资源利用已经达到了极限,都会影响测试结果的数值。通常情况下,我都会在测试的时候,给机器充足的资源,保持一个稳定的环境。在分析结果的时候,也更加关注不同实现方式的性能差异,而不是测试数据本身。

@BenchmarkMode

此注解用来指定基准测试类型,对应Mode选项,用来修饰类和方法都可以。这里的value,是一个数组,可以配置多个统计维度。比如:

@BenchmarkMode({Throughput,Mode.AverageTime})。统计的就是吞吐量和平均执行时间两个指标。

所谓的模式,在JMH中,可以分为以下几种:

  • Throughput: 整体吞吐量,比如QPS,单位时间内的调用量等。
  • AverageTime: 平均耗时,指的是每次执行的平均时间。如果这个值很小不好辨认,可以把统计的单位时间调小一点。
  • SampleTime: 随机取样。
  • SingleShotTime: 如果你想要测试仅仅一次的性能,比如第一次初始化花了多长时间,就可以使用这个参数,其实和传统的main方法没有什么区别。
  • All: 所有的指标,都算一遍,你可以设置成这个参数看下效果。

我们拿平均时间,看一下一个大体的执行结果:

1
2
3
4
python复制代码Result "com.github.xjjdog.tuning.BenchmarkTest.shift":
2.068 ±(99.9%) 0.038 ns/op [Average]
(min, avg, max) = (2.059, 2.068, 2.083), stdev = 0.010
CI (99.9%): [2.030, 2.106] (assumes normal distribution)

由于我们声明的时间单位是纳秒,本次shift方法的平均响应时间就是2.068纳秒。

我们也可以看下最终的耗时时间。

1
2
3
python复制代码Benchmark            Mode  Cnt  Score   Error  Units
BenchmarkTest.div avgt 5 2.072 ± 0.053 ns/op
BenchmarkTest.shift avgt 5 2.068 ± 0.038 ns/op

由于是平均数,这里的Error值的是误差的意思(或者波动)。

可以看到,在衡量这些指标的时候,都有一个时间维度,它就是通过**@OutputTimeUnit**注解进行配置的。

这个就比较简单了,它指明了基准测试结果的时间类型。可用于类或者方法上。一般选择秒、毫秒、微秒,纳秒那是针对的速度非常快的方法。

举个例子,@BenchmarkMode(Mode.Throughput)和@OutputTimeUnit(TimeUnit.MILLISECONDS)进行组合,代表的就是每毫秒的吞吐量。

如下面的关于吞吐量的结果,就是以毫秒计算的。

1
2
3
python复制代码Benchmark             Mode  Cnt       Score       Error   Units
BenchmarkTest.div thrpt 5 482999.685 ± 6415.832 ops/ms
BenchmarkTest.shift thrpt 5 480599.263 ± 20752.609 ops/ms

OutputTimeUnit注解同样可以修饰类或者方法,通过更改时间级别,可以获取更加易读的结果。

@Fork

fork的值一般设置成1,表示只使用一个进程进行测试;如果这个数字大于1,表示会启用新的进程进行测试;但如果设置成0,程序依然会运行,不过这样是在用户的JVM进程上运行的,可以看下下面的提示,但不推荐这么做。

1
2
3
bash复制代码# Fork: N/A, test runs in the host VM
# *** WARNING: Non-forked runs may silently omit JVM options, mess up profilers, disable compiler hints, etc. ***
# *** WARNING: Use non-forked runs only for debugging purposes, not for actual performance runs. ***

那么fork到底是在进程还是线程环境里运行呢?我们追踪一下JMH的源码,发现每个fork进程是单独运行在Proccess进程里的,这样就可以做完全的环境隔离,避免交叉影响。它的输入输出流,通过Socket连接的模式,发送到我们的执行终端。

image.png

在这里分享一个小技巧。其实fork注解有一个参数叫做jvmArgsAppend,我们可以通过它传递一些JVM的参数。

1
java复制代码@Fork(value = 3, jvmArgsAppend = {"-Xmx2048m", "-server", "-XX:+AggressiveOpts"})

在平常的测试中,也可以适当增加fork数,来减少测试的误差。

@Threads

fork是面向进程的,而Threads是面向线程的。指定了这个注解以后,将会开启并行测试。

如果配置了 Threads.MAX ,则使用和处理机器核数相同的线程数。

@Group

@Group注解只能加在方法上,用来把测试方法进行归类。如果你单个测试文件中方法比较多,或者需要将其归类,则可以使用这个注解。

与之关联的@GroupThreads注解,会在这个归类的基础上,再进行一些线程方面的设置。

@State

@State 指定了在类中变量的作用范围。它有三个取值。

@State 用于声明某个类是一个“状态”,可以用Scope 参数用来表示该状态的共享范围。这个注解必须加在类上,否则提示无法运行。

Scope有如下3种值:

  • Benchmark:表示变量的作用范围是某个基准测试类。
  • Thread:每个线程一份副本,如果配置了Threads注解,则每个Thread都拥有一份变量,它们互不影响。
  • Group:联系上面的@Group注解,在同一个Group里,将会共享同一个变量实例。

在JMHSample04DefaultState测试文件中,演示了变量x的默认作用范围是Thread,关键代码如下:

1
2
3
4
5
6
7
8
java复制代码@State(Scope.Thread)
public class JMHSample_04_DefaultState {
double x = Math.PI;
@Benchmark
public void measure() {
x++;
}
}

@Setup和@TearDown

和单元测试框架JUnit类似,用于基准测试前的初始化动作, @TearDown 用于基准测试后的动作,来做一些全局的配置。

这两个注解,同样有一个Level值,标明了方法运行的时机,它有三个取值。

  • Trial:默认的级别。也就是Benchmark级别。
  • Iteration:每次迭代都会运行。
  • Invocation:每次方法调用都会运行,这个是粒度最细的。

@Param

@Param 注解只能修饰字段,用来测试不同的参数,对程序性能的影响。配合@State注解,可以同时制定这些参数的执行范围。

代码样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class JMHSample_27_Params {
@Param({"1", "31", "65", "101", "103"})
public int arg;
@Param({"0", "1", "2", "4", "8", "16", "32"})
public int certainty;
@Benchmark
public boolean bench() {
return BigInteger.valueOf(arg).isProbablePrime(certainty);
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(JMHSample_27_Params.class.getSimpleName())
// .param("arg", "41", "42") // Use this to selectively constrain/override parameters
.build();

new Runner(opt).run();
}
}

值得注意的是,如果你设置了非常多的参数,这些参数将执行多次,通常会运行很长时间。比如参数1 M个,参数2 N个,那么总共要执行M*N次。

下面是一个执行结果的截图。

image.png

@CompilerControl

这可以说是一个非常有用的功能了。

Java中方法调用的开销是比较大的,尤其是在调用量非常大的情况下。拿简单的getter/setter方法来说,这种方法在Java代码中大量存在。我们在访问的时候,就需要创建相应的栈帧,访问到需要的字段后,再弹出栈帧,恢复原程序的执行。

如果能够把这些对象的访问和操作,纳入到目标方法的调用范围之内,就少了一次方法调用,速度就能得到提升,这就是方法内联的概念。如图所示,代码经过JIT编译之后,效率会有大的提升。

image.png

这个注解可以用在类或者方法上,能够控制方法的编译行为,常用的有3种模式。

强制使用内联(INLINE),禁止使用内联(DONT_INLINE),甚至是禁止方法编译(EXCLUDE)等。

2.将结果图形化

使用JMH测试的结果,可以二次加工,进行图形化展示。结合图表数据,更加直观。通过运行时,指定输出的格式文件,即可获得相应格式的性能测试结果。

比如下面这行代码,就是指定输出JSON格式的数据。

1
2
3
java复制代码Options opt = new OptionsBuilder()
.resultFormat(ResultFormatType.JSON)
.build();

JMH支持以下5种格式的结果:

  • TEXT 导出文本文件。
  • CSV 导出csv格式文件。
  • SCSV 导出scsv等格式的文件。
  • JSON 导出成json文件。
  • LATEX 导出到latex,一种基于ΤΕΧ的排版系统。

一般来说,我们导出成CSV文件,直接在Excel中操作,生成相应的图形就可以了。

image.png

另外介绍几个可以做图的工具:

JMH Visualizer
这里有一个开源的项目(jmh.morethan.io/) ,通过导出json文件,上传之后,可得到简单的统计结果。个人认为它的展示方式并不是很好。

jmh-visual-chart

相比较而言,下面这个工具(deepoove.com/jmh-visual-… ,就相对直观一些。

image.png

meta-chart

一个通用的在线图表生成器。 (www.meta-chart.com/),导出CSV文件后,…

image.png

像Jenkins等一些持续集成工具,也提供了相应的插件,用来直接显示这些测试结果。

END

这个工具非常好用,它使用确切的测试数据,来支持我们的分析结果。一般情况下,如果定位到热点代码,就需要使用基准测试工具进行专项优化,直到性能有了显著的提升。

在我们的这个场景中,就发现使用NanoID,确实是比UUID要快上好多。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。我的个人微信xjjdog0,欢迎添加好友,进一步交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring 嵌套事务提交时机对其他查询操作的影响分析

发表于 2021-11-16

背景

Spring 的 @Transactional 注解可以轻松实现数据库操作的事务控制,本文介绍上周遇到的一个嵌套事务的问题,探讨嵌套事务的提交时机对后续操作的影响。

服务调用链上的事务使用

一个调用链上,方法 A 调用了方法 B ,方法 A 和 B 中同时存在数据库操作,大概伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码public void A() {
1、数据库查询
2、数据库插入
3、调用方法 B
4、B(); // 另一个类的方法
5、 数据库查询:查询方法 B 插入的数据
6、 数据库操作:删除方法 B 插入的数据
}

public void B {
}

这里讨论事务注解添加的位置,对操作步骤5 和 6 的影响。

情况一:只在 A 上加事务,所有操作在同一个事务中,操作步骤5 和 6 的数据依赖 4 的数据,而且数据是一致的。@Transactional 注解标注的流程中,所有 DAO 的操作在同一个事务中,对于 Propagation.REQUIRES_NEW 的事务来说,前面的数据库写操作,对后面的读操作是可见的。

情况二:A 和 B 上都有事务,步骤4调用 B 方法后向某表插入了数据,5 再查询查不到,但是 6 删除时有删除记录,说明 B 方法的事务的提交是在写操作时提交的。此时,要想让 步骤 5 立即读取到方法 B 插入的数据,可以修改 SQL 加上 for update 强制提交。

情况三:B 上没用事务注解,而是用 TransactionTemplate 手动提交插入操作,那么后面操作数据也能保持一致。

启示录

之所以在方法 B 上添加新事务,是为了不让方法 B 的操作受到方法 A 的异常的影响,但是如果二者数据有查询依赖的话,查询操作不会带来 B 事务的提交,所以造成数据不一致问题。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

irisMVC包的使用

发表于 2021-11-16

「这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战」。

mvc

::: tip mvc包简介

  • 在Iris框架中,封装了mvc包作为对mvc架构的支持,方便开发者遵循mvc的开发原则进行开发。
  • Iris框架支持请求数据、模型、持久数据分层处理,并支持各层级模块代码绑定执行。
  • MVC即:model、view、controller三个部分,分别代表数据层、视图层、控制层。控制器层负责完成页面逻辑、实体层负责完成数据准备与数据操作、视图层负责展现UI效果。
    :::

1 mvc.Application

iris框架中的mvc包中提供了Application结构体定义。开发者可以通过注册自定义的controller来使用对应提供的API,其中包含路由组router.Party,以此用来注册layout、middleware以及相应的handlers等。

2 iris.mvc特性

iris框架封装的mvc包,支持所有的http方法。比如,如果想要提供GET,那么控制器应该有一个名为Get()的函数,开发者可以定义多个方法函数在同一个Controller中提供。这里的Get、Post方法是指的直接和八种请求类型同名的方法,mvc模块会自动执行到Get()、Post()等八种对应的方法。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
go复制代码package main

import (
"github.com/kataras/iris/v12"
"github.com/kataras/iris/v12/mvc"
)
//自定义的控制器
type CustomController struct{}

//自动处理基础的Http请求
//Url: http://localhost:8000
//Type:GET请求
func (cc *CustomController) Get() mvc.Result{
//todo
return mvc.Response{
ContentType:"text/html",
}
}
/**
* Url:http://localhost:8000
* Type:POST
**/
func (cc *CustomController) Post() mvc.Result{
//todo
return mvc.Response{}
}
func main() {
app := iris.New()
//注册自定义控制器处理请求
mvc.New(app).Handle(new(CustomController))
app.Run(iris.Addr(":8085"))
}

3 MVC使用中间件示例

main包显示了如何将中间件添加到mvc应用程序中,
使用它的Router,它是主iris app的子路由器(iris.Party)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码package main
import (
"github.com/kataras/iris/v12"
"github.com/kataras/iris/v12/cache"
"github.com/kataras/iris/v12/mvc"
"time"
)
var cacheHandler = cache.Handler(10 * time.Second)
func main() {
app := iris.New()
mvc.Configure(app, configure)
// http://localhost:8080
// http://localhost:8080/other
// 每10秒刷新一次,你会看到不同的时间输出。
app.Run(iris.Addr(":8080"))
}
func configure(m *mvc.Application) {
m.Router.Use(cacheHandler)
m.Handle(&exampleController{
timeFormat: "2006-01-02 15:04:05",
})
}
type exampleController struct {
timeFormat string
}
func (c *exampleController) Get() string {
now := time.Now().Format(c.timeFormat)
return "last time executed without cache: " + now
}
func (c *exampleController) GetOther() string {
now := time.Now().Format(c.timeFormat)
return "/other: " + now
}

4 整体代码示例

4.1 main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go复制代码package main

import (
"fmt"
"github.com/kataras/iris/v12"
"pathway/utils"
"pathway/web/models"
"pathway/web/routers"
)

func main() {
app := iris.Default()
models.InitDataBase() //初始化数据库
routers.Register(app) //注册路由
fmt.Println("程序启动")
}

4.2 routers.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
go复制代码package routers

import (
"pathway/sentacom"
"pathway/utils"
"pathway/web/common"
"pathway/web/controller"
"fmt"
"github.com/kataras/iris/v12"
"github.com/kataras/iris/v12/mvc"
)

func Register(app *iris.Application) {
fmt.Println("接口初始化中...")
//处理iris跨域
app.Use(common.AllowCRS)
app.AllowMethods(iris.MethodOptions)
//路径分组
mvc.New(app.Party("/pathway/path/group")).Handle(controller.NewPathGroupController())
//路径分组-->PathItem路径项
mvc.New(app.Party("/pathway/path/item")).Handle(controller.NewPathItemController())

//读取配置文件数据
config := utils.GetConfig()
serveHost := config.Serve.Host
serveport := config.Serve.Port
strAddr := fmt.Sprintf("%s:%v", serveHost, serveport) //服务启动的ip:port
app.Run(iris.Addr(strAddr))

}

4.3 controller.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
go复制代码package controller

import (
"pathway/sentacom"
"pathway/web/common"
"pathway/web/models"
"pathway/web/service"
"github.com/kataras/iris/v12"
"net/http"
"reflect"
)

//PathGroupController
type PathGroupController struct {
Ctx iris.Context //上下文对象
Service service.PathGroupService //操作数据库service层
}
//定义route调用的方法
func NewPathGroupController() *PathGroupController {
return &PathGroupController{Service: service.NewPathGroupService()}
}

//请求方式为 POST 请求url:ip:port/pathway/path/group
func (g *PathGroupController) Post() (result common.ResultData) {
var group models.PathGroup
//读取前端传递过来的json数据
if err := g.Ctx.ReadJSON(&group); err != nil {
return common.ResponseErrorResult(err.Error(), http.StatusBadRequest)
}
//调用service层处理业务逻辑
ret, err := g.Service.SavePathGroup(group)
if err != nil {
return common.ResponseErrorResult(err.Error(), http.StatusBadRequest)
}
return common.ResponseResult(ret, http.StatusOK, sentacom.SuccessText(sentacom.AddSuccess))
}

//请求方式为 GET 请求url:ip:port/pathway/path/group
func (g *PathGroupController) Get() (result common.ResultData) {
//此处省略Get处理的方法
}

func (g *PathGroupController) Put() (result common.ResultData) {
//此处省略Put处理的方法
}

func (g *PathGroupController) Delete() (result common.ResultData) {
//此处省略Delete处理的方法
}

4.4 common.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
go复制代码package common

import "pathway/sentacom"

type ResultData struct {
Status int `json:"status"` //HTTP响应状态码
Msg string `json:"msg"` //返回的提示消息
Data interface{} `json:"data"` // HTTP响应数据
}

// 封装http请求统一返回的数据结构
//
// ResponseResult(err.Error(), 604, "查询失败")
func ResponseResult(data interface{}, status int, msg string) ResultData {
return ResultData{
Status: status,
Msg: msg,
Data: data,
}
}

// 封装http请求统一返回的错误数据结构
//
// ResponseResult(err.Error(), 604)
func ResponseErrorResult(data interface{}, httpStatus int) ResultData {
msg := sentacom.ErrorText(httpStatus)
return ResultData{
Status: httpStatus,
Msg: msg,
Data: data,
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

7张图揭晓RocketMQ存储设计的精髓 存储概述 存储文件

发表于 2021-11-16

简介: RocketMQ 作为一款基于磁盘存储的中间件,具有无限积压能力,并提供高吞吐、低延迟的服务能力,其最核心的部分必然是它优雅的存储设计。

存储概述

RocketMQ 存储的文件主要包括 Commitlog 文件、ConsumeQueue 文件、Index 文件。

RocketMQ 将所有主题的消息存储在同一个文件中,确保消息发送时按顺序写文件,尽最大能力确保消息发送的高可用性与高吞吐量。

但消息中间件一般都是基于主题的订阅与发布模式,消息消费时必须按照主题进行帅选消息,显然从 Commitlog 文件中按照 topic 去筛选消息会变得及其低效,为了提高根据主题检索消息的效率,RocketMQ 引入了 ConsumeQueue 文件,俗成消费队列文件。

关系型数据库可以按照字段属性进行记录检索,作为一款主要面向业务开发的消息中间件,RocketMQ 也提供了基于消息属性的检索能力,底层的核心设计理念是为 Commitlog 文件建立哈希索引,并存储在 Index 文件中。

在 RocketMQ 中顺序写入到 Commitlog 文件后,ConsumeQueue 与 Index 文件都是异步构建的,其数据流向图如下:

存储文件组织方式

RocketMQ 在消息写入过程中追求极致的磁盘顺序写。所有主题的消息全部写入一个文件,即 Commitlog 文件。所有消息按抵达顺序依次追加到文件中,消息一旦写入,不支持修改。Commitlog 文件的具体布局如下图所示:

基于文件编程与基于内存编程有一个很大的不同是在基于内存的编程模式中我们有现成的数据结构,例如 List、HashMap,对数据的读写非常方便,那么一条一条消息存入文件 Commitlog 后,该如何查找呢?

正如关系型数据会为每一条数据引入一个 ID 字段,在基于文件编程的模型中,也会为一条消息引入一个身份标志:消息物理偏移量,即消息存储在文件的起始位置。

正是有了物理偏移量的概念,Commitlog 的文件名命名也是极具技巧性,使用了存储在该文件的第一条消息在整个 Commitlog 文件组中的偏移量来命名,例如第一个 Commitlog 文件为

0000000000000000000,第二个文件为

00000000001073741824,然后依次类推。

这样做的好处是给出任意一个消息的物理偏移量,例如消息偏移量为 73741824,可以通过二分法进行查找,快速定位这个文件在第一个文件中,然后用消息的物理偏移量减去该文件的名称所得到的差值,就是在该文件中的绝对地址。

Commitlog 文件的设计理念是追求极致的消息写,但我们知道消息消费模型是基于主题的订阅机制,即一个消费组是消费特定主题的消息。如果根据主题从 commitlog 文件中检索消息,我们会发现这绝不是一个好主意,只能从文件的第一条消息逐条检索,其性能可想而知,故为了解决基于 topic 的消息检索问题,RocketMQ 引入了 consumequeue 文件,consumequeue 的结构如下图所示。

ConsumeQueue 文件是消息消费队列文件,是 Commitlog 文件基于 Topic 的索引文件,主要用于消费者根据 Topic 消费消息,其组织方式为/topic/queue,同一个队列中存在多个文件。

Consumequeue 的设计极具技巧,每个条目长度固定(8 字节 commitlog 物理偏移量、4 字节消息长度、8 字节 tag hashcode)。

这里不是存储 tag 的原始字符串,而选择存储 hashcode,目的就是确保每个条目的长度固定,可以使用访问类似数组下标的方式快速定位条目,极大地提高了 ConsumeQueue 文件的读取性能。

试想一下,消息消费者根据 topic、消息消费进度(consumeuqe 逻辑偏移量),即第几个 Consumeque 条目,这样的消费进度去访问消息的方法为使用逻辑偏移量 logicOffset * 20 即可找到该条目的起始偏移量(consumequeue 文件中的偏移量),然后读取该偏移量后 20 个字节即得到一个条目,无须遍历 consumequeue 文件。

RocketMQ 与 Kafka 相比具有一个强大的优势,就是支持按消息属性检索消息,引入 consumequeue 文件解决了基于 topic 查找的问题,但如果想基于消息的某一个属性查找消息,consumequeue 文件就无能为力了。

RocketMQ 引入了 Index 索引文件,实现基于文件的哈希索引。IndexFile 的文件存储结构如下图所示:

IndexFile 文件基于物理磁盘文件实现 Hash 索引。其文件由 40 字节的文件头、500万 个 Hash 槽,每个 Hash 槽 4 个字节,最后由 2000万 个 Index 条目,每个条目由 20个 字节构成,分别为 4 字节索引 key 的 hashcode、8 字节消息物理偏移量、4 字节时间戳、4 字节的前一个 Index 条目(Hash 冲突的链表结构)。

即建立了索引 Key 的 hashcode 与物理偏移量的映射关系,根据 key 先快速定义到 commitlog 文件。

顺序写

基于磁盘的读写,提高其写入性能的另外一个设计原理是磁盘顺序写。

磁盘顺序写广泛用在基于文件的存储模型中,大家不妨思考一下 MySQL Redo 日志的引入目的,我们知道在 MySQL InnoDB 的存储引擎中,会有一个内存 Pool,用来缓存磁盘的文件块,当更新语句将数据修改后,会首先在内存中进行修改,然后将变更写入到 redo 文件(刷写到磁盘),然后定时将 InnoDB 内存池中的数据刷写到磁盘。

为什么不一有数据变更,就直接更新到指定的数据文件中呢?以 MySQL InnoDB 中一个库存在上千张,每一个张的数据会使用单独的文件存储,如果每一个表的数据发生变更,就刷写到磁盘,就会存在大量的随机写入,性能无法得到提升,故引入一个 redo 文件,顺序写 redo 文件,从表面上多了一步刷盘操作,但由于是顺序写,相比随机写,带来的性能提升是非常显著的。

内存映射机制

虽然基于磁盘的顺序写可以极大提高 IO 的写效率,但如果基于文件的存储采用常规的 JAVA 文件操作 API,例如 FileOutputStream 等,其性能提升会很有限,RocketMQ 引入了内存映射,将磁盘文件映射到内存中,以操作内存的方式操作磁盘,性能又提升了一个档次。

在 JAVA 中可通过 FileChannel 的 map 方法创建内存映射文件。

在 Linux 服务器中由该方法创建的文件使用的就是操作系统的 pagecache,即页缓存。

Linux 操作系统中的内存使用策略时会尽可能地利用机器的物理内存,并常驻内存中,就是所谓的页缓存。在操作系统的内存不够的情况下,采用缓存置换算法,例如 LRU 将不常用的页缓存回收,即操作系统会自动管理这部分内存。

如果 RocketMQ Broker 进程异常退出,存储在页缓存中的数据并不会丢失,操作系统会定时将页缓存中的数据持久化到磁盘,做到数据安全可靠。不过如果是机器断电等异常情况,存储在页缓存中的数据就有可能丢失。

灵活多变的刷盘策略

有了顺序写和内存映射的加持,RocketMQ 的写入性能得到了极大的保证,但凡事都有利弊,引入了内存映射和页缓存机制,消息会先写入到页缓存,此时消息并没有真正持久化到磁盘。那么 broker 收到客户端的消息发送后,是存储到页缓存中就直接返回成功,还是要持久化到磁盘中才返回成功呢?

这是一个“艰难”的抉择,是在性能与消息可靠性方面进行权衡。为此,RocketMQ 提供了多种策略:同步刷盘、异步刷盘。

1、同步刷盘

同步刷盘在 RocketMQ 的实现中成为组提交,并不是每一条消息都必须刷盘。其设计理念如图所示:

采用同步刷盘,每一个线程将数据追到到内存后,并向刷盘线程提交刷盘请求,然后会阻塞;刷盘线程从任务队列中获取一个任务,然后触发一次刷盘,但并不只刷与请求相关的消息,而是会直接将内存中待刷盘的所有消息一次批量刷盘,然后就可以唤醒一组请求线程,实现组刷盘。

2、异步刷盘

同步刷盘的优点是能保证消息不丢失,即向客户端返回成功就代表这条消息已被持久化到磁盘,即消息非常可靠,但这是以牺牲写入响应延迟性能为代价的,由于 RocketMQ 的消息是先写入 pagecache,故消息丢失的可能性较小,如果能容忍一定几率的消息丢失,可以考虑使用异步刷盘。

异步刷盘指的是 broker 将消息存储到 pagecache 后就立即返回成功,然后开启一个异步线程定时执行 FileChannel 的 forece 方法,将内存中的数据定时刷写到磁盘,默认间隔为 500ms。

内存级读写分离

RocketMQ 为了降低 pagecache 的使用压力引入了 transientStorePoolEnable 机制,即内存级别的读写分离机制。

默认情况下 RocketMQ 将消息写入 pagecache,消息消费时从 pagecache 中读取,这样在高并发时 pagecache 的压力会比较大,容易出现瞬时 broker busy,故 RocketMQ 还引入了 transientStorePoolEnable,将消息先写入堆外内存并立即返回,然后异步将堆外内存中的数据提交到 pagecache,再异步刷盘到磁盘中。其工作机制如下图所示:

消息在消费读取时不会尝试从堆外内存中读,而是从 pagecache 中读取,这样就形成了内存级别的读写分离,即消息写入时主要面对堆外内存,而读消息时主要面对 pagecache。

该方案的优点是消息是直接写入堆外内存,然后异步写入 pagecache。相比每条消息追加直接写入 pagechae,其最大的优势是将消息写入 pagecache 操作批量化。

该方案的缺点是如果由于某些意外操作导致 Broker 进程异常退出,那么存储在堆外内存的数据会丢失,但如果是放入 pagecache,broke r异常退出并不会丢失消息。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…326327328…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%