开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

两地三中心,如何部署奇数个节点?

发表于 2021-11-08

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。

两地三中心,是有钱的公司,为保障数据安全和高可用,一个常见的需求,通常指的是 “同城双活,异地备份”。

2 + 1 = 3,从描述上来看,就知道它们之间是有阶级属性的。

异地备份的机房,level上自然就比同城双活的两个机房低了一个档次,否则也不会沦为备胎。辩证的看待这个问题,我们就能够自如的处理感情上脚踏多只船的问题。

  1. 部署结构

为了描述方便,我们把同城的两个机房,称为A和B。把可怜的备份机房,称作机房C。

同城的两个机房,距离上自然就近了一些。我们可以用图直观的表示一下这个距离差异。

image-20211108145652644.png

所以这个备份机房,非常的没有存在感。实际上,它也非常的有自知之明,只把自己放在一个备份的场景,能够接受非常大的请求延迟和比较长的数据不一致窗口。

这么算下来,就只剩下A和B两位陪你玩了,此之为双活。

  1. 奇数节点的意义

双活的意思,是两个机房要同时对外提供服务。运行在不同机房的服务,分为两种,一种是有状态的,一种是无状态的。

无状态的服务,由于自身并不存储数据,只是作为传话筒,处理上自然行云流水,没什么值得好讨论的。

难搞的是有状态的服务。即使它像鱼一样记忆只有5秒,这部分记忆依然会对整个系统提出了高标准的要求——

我们需要有个集中的地方来存储这些数据。

大家都是搞技术的,那就举例几个常见的组件。

  1. Zookeeper动物园,需要做集中的配置中心或者分布式协调工作
  2. Redis Cluster需要处理一些全局的缓存数据
  3. ElasticSearch进行数据存储

无数个案例告诉我们,要部署这些服务,得部署奇数个节点才行。为什么不能部署偶数个?因为有个脑残的问题,那就是脑裂。

我们拿Zookeeper来说,假如我们部署了6个节点,那么你要两个集群能够可用,需要至少4个存活才行。你要是设置成了3个,那它就会出现问题。

如下图,在6个节点的场景中,A和B机房网络产生了闪断。A机房的三个节点发现不能再连接B机房的节点,于是它们三个自己组个集群,并写入了 a = 100, b = 300两条数据;同理,B机房也组了个局,写入了a = 100, b = 600两条记录。

而且它们都写成功了。

image-20211108152039105.png

一个集群变成了两个,并写入了不同的数据。那我到底以谁的数据为准呢?真是要了命。

这就是脑裂问题,我们不能把集群要求的最小节点设置成3,而是起码要为4。

所以你看不管是ES还是raft协议,不管是paxos和zab,都推荐部署奇数个节点,然后把最小可用集群节点设置成 (n / 2 + 1) — 此所谓有一半以上节点投票才成,且有更好的容错性。

  1. 如何部署奇数个节点

那这个问题该如何解决呢?

假如是同城三活,那么我们只需要在每个机房部署一个节点就可以了。但即使是双活,都是公司非常有钱才能搞得起。现在搞个三活,你大概率会赢得老板一个心虚的白眼。

当然也可以采用 2 + 2 + 1的模式。

找不到一个专用的机房部署一套集群,但找几个第三方的服务器,部署一下我们的几个服务节点倒是可以的。

听起来很美好,但实际上不会这么做。因为这批第三方的服务器,对带宽、延迟 、安全、稳定的要求,一点都不低。

还是老老实实的在两个中心玩吧,野花野草闻着香,但大概率有毒。

实际上,即使是姐妹花,A和B总是有些差异。只要我们别把A和B看的太对等,问题就好处理。

image-20211108154314233.png

如上图,在A机房部署3个节点,在B机房部署2个节点。只要你这么部署了,在你的脑子里,A就是要B的level高一些,虽然你对外宣称它们是一样的。

就像你脚踏两只船,你和2人说都很爱ta。但一旦ta俩有冲突,你还是会毫不犹豫的选一个。

这就是考验。

我们切回上图,看一下几种情况。在这种部署情况下,当发生脑裂,B机房的2个节点是无法提供服务的,所以也不会有异常数据进入。

当B机房整个发生问题,A机房还是能够正常运行。

当A机房整个发生问题,B机房此时只有2个节点,不满足最小的3个节点。这个时候该怎么办呢?

没错,我们手动启动一个。你看节点6的边框是虚线的,也就意味着它是一个待命状态,随时待命转正,完全的接管A的工作。

代价也是有的,毕竟A才是你心中的No.1。ta离你而去,给你造成了困扰。自己的选择,就是含着泪,你也得把B给顶上去。

相信我,不过是小时段的阵痛,你很快会再次进入双活的世界。到时候你是把B当作No.1,还是继续换回A,都没有问题。

而且这选择很没意义。

比起谁的level高,你想要双活的根本原因,那就是谁都不相信。所以,就把未来交给薛定谔的猫吧—-

谁让你是个多情又多疑的程序员呢。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。我的个人微信xjjdog0,欢迎添加好友,进一步交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

GO语言仿事件系统 题外话 处理方实现 你以为结束了

发表于 2021-11-08

「这是我参与11月更文挑战的第8天,活动详情查看:2021最后一次更文挑战」

题外话

话说这是一个风和日丽的上午,拿到了外卖小哥送来的我最心爱的麻辣烫,我打开了因为昨天刚充了三毛钱电费而恢复了使用的电脑,登上了满是美女好友的微信,熟悉的声音马上响彻在了我这三百平米的卧室中,果然我的魅力无法让人自拔,点开这闪烁的美女头像,一行文字映入眼帘,还钱,月底还不还钱把你麻辣烫到了…… 真好
话说最近各地都开始下雪了,都降温了,是真冷啊

在这里插入图片描述

跑题了……废话不多说,上货在这里插入图片描述

事件系统原理

事件系统可以分为两部分,事件的发生部分和处理部分,就像某个明星发生了丑闻,就会有公关,法务等等部门马上响应并处理。
一个事件系统拥有如下特性:
1.能够实现事件的一方,可以根据事件 ID 或名字注册对应的事件。
2.事件发起者,会根据注册信息通知这些注册者。
3.一个事件可以有多个实现方响应。

在这里插入图片描述

事件注册实现

事件的发生就会通知对应的处理方,所以我们需要将事件与处理方连接起来,所以我们需要通过注册的方式将事件与处理方连接起来

1
2
3
4
5
6
7
8
9
10
11
12
go复制代码//创建一个全局事件集,用map切片储存,时间名为key,处理方法为value
var nameOfEvent = make(map[string][]func(interface{}))

// 注册事件,提供事件名和回调函数
func RegisterEvent(name string, callback func(interface{})) {
// 通过名字查找事件列表
list := nameOfEvent[name]
// 在列表切片中添加函数
list = append(list, callback)
// 将修改的事件列表切片保存回去
nameOfEvent[name] = list
}

1.首先创建一个全局事件集,用map切片储存,时间名为key,处理方法为value
2.注册事件,事件名和回调函数(处理方法)作为参数
因为一个事件可以有多个处理方,所以采用数组储存不同处理方

事件调用实现

1
2
3
4
5
6
7
8
9
10
go复制代码// 调用事件
func CallEvent(name string, param interface{}) {
// 通过名字找到事件列表
list := nameOfEvent[name]
// 遍历这个事件的所有回调
for _, callback := range list {
// 传入参数调用回调
callback(param)
}
}

1.通过时间名获取处理方
2.遍历处理方,将每个函数回调传入事件参数并调用,就会触发事件实现方的逻辑处理。

处理方实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
go复制代码// 声明机构的结构体
type Actor struct {
Aname string
Bname string
}
// 为机构添加一个事件处理函数
func (actor *Actor) OnEvent(param interface{}) {
fmt.Println(actor.Aname,param)
}
// 为机构添加另一个事件处理函数
func (actor *Actor) GlobalEvent(param interface{}) {
fmt.Println(actor.Bname, param)
}

// 为第三方机构添加另一个事件处理函数
func ThreeOnEvent(param interface{}) {
fmt.Println("第三方", param)
}

1.首先声明机构和第三方,代表多个处理方
2.实现对应处理方法

事件触发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码func main() {
// 实例化机构
actor := Actor{
"法务",
"公关",
}
// 注册名为OnSkill的法务回调
RegisterEvent("xxx出轨了", actor.OnEvent)
// 注册名为OnSkill的公关回调
RegisterEvent("xxx出轨了", actor.GlobalEvent)
// 再次在OnSkill上注册第三方事件
RegisterEvent("xxx出轨了", ThreeOnEvent)
// 调用事件,所有注册的同名函数都会被调用
CallEvent("xxx出轨了", "开始行动")
}

在这里插入图片描述

  1. 实例化机构,并为机构中各部门署名
  2. 注册事件
  3. 自动处理

你以为结束了

结果演示,角色和全局的事件会按注册顺序顺序地触发。

一般来说,事件系统不保证同一个事件实现方多个函数列表中的调用顺序,事件系统认为所有实现函数都是平等的。也就是说,无论例子中的 actor.OnEvent 先注册,还是 GlobalEvent() 函数先注册,最终谁先被调用,都是无所谓的,开发者不应该去关注和要求保证调用的顺序。

一个完善的事件系统还会提供移除单个和所有事件的方法。
在这里插入图片描述

大家看完发现有什么错误,写在下面吧!跟我黑虎阿福比划比划!
在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

关于 CAP 理论的一点思考

发表于 2021-11-08

「这是我参与11月更文挑战的第8天,活动详情查看:2021最后一次更文挑战」

分布式系统

  分布式系统的出现是为了解决单机系统计算和存储的瓶颈,单个计算机的内存,硬盘,cpu是有上限的,并且价格也是指数级的上升。如果一台计算机只能存储10GB的数据,执行某个计算任务要20s的时间,当我们想要存储20GB的数据,想要执行计算任务的时间缩短为10s,我们就可以设计分布式系统来达到目的。总而言之,分布式系统是一种针对单机系统资源瓶颈问题的解决方案,它通过网络通信协议连接并协调多个计算机单元来共同完成计算、存储任务。

  分布式系统通过软件系统来解决当下工业技术无法解决的计算和存储问题,那么相应地也会带来单机系统不存在的挑战:

    透明性

      分布式系统的设计相当复杂,你需要实现:服务发现,负载均很,master选举等功能,而这些对于使用者来说是透明的,最好的分布式系统就是使用者根本不会察觉的分布式的存在,给他的感觉是像使用单机系统一样。

    可扩展性

      可以轻松地支持水平扩展,当两台计算机不够用时,我扩展到四台计算机也很简单。

    可用性与可靠性

      要承诺提供的服务是可用可靠的,不能我昨天存储的数据,今天就找不到了;也不能我有时候可以访问,有时候却访问不了。

    一致性

      分布式数据存储中,可能会存在跨区域存储和数据备份的问题,要保证用户从任何地方读取到的数据都是一致的。

CAP理论

  在分布式系统中,必然会存在C(consistency,一致性),A(availability,可用性),P(partition tolerance,分区容错性)这三个基本要求,然而分布式系统只能满足其中任意两个。这是为什么呢?

  我们这三个两两组合进行分析:

  CP系统:同时满足一致性和分区容错性,设想南京,北京,上海三地有三个单机组成一个分布式系统,它们通过网路相互连接协同,但分区容错性发生,及上海区域的网络连接丢失,南京和北京区域可以正常工作,此时南京区域有人写入一条数据,我们希望系统有一致性,即在北京和上海也能访问到这条数据,北京和南京网络连接正常,所以北京已经同步了这条数据;但是上海由于网络异常,无法同步,就不能保证数据一致性,我们只能告诉南京用户数据写入失败,即丧失了可用性。

  AP系统:同时满可用性和分区容错性,根据上面的例子,南京用户无法把数据同步到上海,但为了保证可用性,我们告诉南京用户数据写入成功,此时上海却没有这条数据,即不存在数据一致性了。

  CA系统:同时满足一致性和可用性,要知道网络是极其不稳定的因素,但是用户每次写入数据都能同步到所有节点上,说明节点不会存在网络分区,那么也就没有分区容错性了。

image.png   

  在实际生产应用中,我们无法保证我们的节点都在同一机房(不会存在网络分区),所以我们使用的分布式系统一般都是CP系统和AP系统,这之中就是数据一致性和服务可用性的权衡,大多数开源的分布式工具都是用户自定义配置成CP系统和AP系统,比如:Elasticsearch。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot AOP学习(二) Spring AOP

发表于 2021-11-08

「这是我参与11月更文挑战的第8天,活动详情查看:2021最后一次更文挑战」

前言

感谢阅读菜菜的文章,本篇文章是继上一篇 SpringBoot AOP学习(一):AOP的诞生及AOP注解介绍后对AOP注解的使用作一个具体的应用,由于本身我也是才接触不久,借此机会把自己的学习心得记录下来,也希望各位大佬不吝赐教~
在这里插入图片描述
为了学起来更加得心应手,这里简单复习了下IOC:

IOC理论,用来实现对象之间的“解耦”,解决对象之间的耦合度过高的问题。IOC(控制反转)的具体实现是通过借助于“第三方”实现具有依赖关系的对象之间的解耦,这个“第三方”就是IOC容器;同时,IOC也叫依赖注入(DI),那么这两种叫法的区别是什么,且看:
控制反转:获得依赖对象的过程被反转了,从前主动,现在被动
依赖注入:就是由IOC容器在运行期间,动态地将某种依赖关系注入到对象之中
他们两兄弟是同一个东西从不同的角度来进行描述的,IOC中最基本的技术就是“反射(Reflection)”编程

AOP日志功能实战

菜菜接触一个项目要实现日志功能,需求如下:

1、记录操作人、操作时间
2、记录request请求参数
3、记录response回调数据
4、记录具体的业务描述供系统使用者查看(这里需要自定义注解)
在这里插入图片描述

案例代码结构

为了方便,菜菜将所有代码放在了同一包内
在这里插入图片描述

重点就是这个Aspect切面类
开始实现
首先,
在这里插入图片描述
先定义实体类和controller类

ReqDTO.java

1
2
3
4
5
6
7
8
9
10
11
12
bash复制代码package com.caicai.aop.csdn;


import lombok.Data;

@Data
public class ReqDTO {

private String user_id;

private String user_name;
}

TestController.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bash复制代码package com.caicai.aop.csdn;

import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/aop")
public class TestController {

@RequestMapping(path = "/test", method = RequestMethod.POST)
@MyLog(operateType = "Log测试", operateExplain = "模拟日志记录") //这里使用的自定义注解
public String test(@RequestBody ReqDTO reqDTO) {
// int i = 1 / 0; //模拟异常
System.out.println("调用 Log测试 方法");
return "调用 Log测试 方法 end" ;
}
}

然后是定义自定义注解

MyLog .java

1
2
3
4
5
6
7
8
9
10
11
12
13
bash复制代码package com.caicai.aop.csdn;

import java.lang.annotation.*;

@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyLog {
/** 操作类型 **/
String operateType();
/** 操作解释 **/
String operateExplain();
}

最后,定义我们的切面
在这里插入图片描述

要想把一个类变成切面类,需要两步,

第一步,在类上使用 @Component 注解 把切面类加入到IOC容器中
第二步,在类上使用 @Aspect 注解 使之成为切面类

TestAspect .java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
bash复制代码package com.caicai.aop.csdn;


import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/**
* @Aspect 切面类注解实例
* @author 菜菜bu菜
*/


//声明这是一个组件
@Component
//声明这是一个切面Bean
@Aspect
public class TestAspect {


//配置切入点,该方法无方法体,主要为方便同类中其他方法使用此处配置的切入点
//@annotation表示标注了某个注解的所有方法
@Pointcut("@annotation(com.caicai.aop.csdn.MyLog)")
public void aspect(){ }


//配置前置通知,使用在方法aspect()上注册的切入点
//同时接受JoinPoint切入点对象,可以没有该参数

@Before("aspect()")
public void Before(){
System.out.println("---------Before方法开始执行");
}

//配置后置通知,使用在方法aspect()上注册的切入点
@After("aspect()")
public void After(JoinPoint joinPoint){
System.out.println("---------After方法开始执行");
}

//最终通知
//returning能够将目标方法的返回值传到切面增强方法里
//声明rvt时指定的类型会限制目标方法必须返回指定类型(String)的值或没有返回值
//此处将rvt的类型声明为Object,意味着对目标方法的返回值不加限制
@AfterReturning(pointcut ="aspect()",returning = "rvt")
public void AfterReturning(String rvt){
System.out.println("--------AfterReturning方法开始执行:---"+rvt);
}

//异常通知
//声明e时指定的类型会限制目标方法必须抛出指定类型的异常
//此处将e的类型声明为Throwable,意味着对目标方法抛出的异常不加限制
@AfterThrowing(pointcut="aspect()",throwing="e")
public void AfterThrowing(Throwable e){
System.out.println("--------AfterThrowing方法开始执行:"+e);
}


//@Around注解可以用来在调用一个具体方法前和调用后来完成一些具体的任务。
//功能很强大,可以深入了解下
@Around("aspect()")
public Object Around(ProceedingJoinPoint joinPoint) throws Throwable {
System.out.println("--------Around方法开始执行");
//获取自定义注解里面的值
Method method = ((MethodSignature)joinPoint.getSignature()).getMethod();
MyLog logAnnotation = (MyLog)method.getAnnotation(MyLog.class);
System.err.println("operateType:------"+logAnnotation.operateType());
System.err.println("operateExplain:------"+logAnnotation.operateExplain());

//获取入参
Object[] objs = joinPoint.getArgs();
String[] argNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames(); // 参数名
Map<String, Object> paramMap = new HashMap<String, Object>();
for (int i = 0; i < objs.length; i++) {
paramMap.put(argNames[i], objs[i]);
}
System.err.println("入参:"+paramMap.toString());

//获取出参
Object result =joinPoint.proceed();
System.err.println("出参:"+result.toString());
return result;

}

}

测试

正常测试

在这里插入图片描述
结果:

1
2
3
4
5
6
7
8
9
bash复制代码--------Around方法开始执行
---------Before方法开始执行
调用 Log测试 方法
--------AfterReturning方法开始执行:---调用 Log测试 方法 end
---------After方法开始执行
operateType:------Log测试
operateExplain:------模拟日志记录
入参:{reqDTO=ReqDTO(user_id=1234, user_name=3423)}
出参:调用 Log测试 方法 end

异常测试

在TestController.java下test方法里面加上 int i = 1 / 0; //模拟异常

1
2
3
4
5
6
7
bash复制代码  @RequestMapping(path = "/test", method = RequestMethod.POST)
@MyLog(operateType = "Log测试:", operateExplain = "模拟日志记录") //这里使用的自定义注解
public String test(@RequestBody ReqDTO reqDTO) {
int i = 1 / 0; //模拟异常
System.out.println("调用 Log测试 方法");
return "调用 Log测试 方法 end" ;
}

再次发送请求
在这里插入图片描述
结果:

1
2
3
4
5
6
7
bash复制代码--------Around方法开始执行
---------Before方法开始执行
--------AfterThrowing方法开始执行:java.lang.ArithmeticException: / by zero
---------After方法开始执行
operateType:------Log测试
operateExplain:------模拟日志记录
入参:{reqDTO=ReqDTO(user_id=1234, user_name=3423)}

总结

AOP真真真强啊!面向切面编程(aop)是对面向对象编程(oop)的补充,面向对象编程将程序分解成各个层次的对象,面向切面编程将程序运行过程分解成各个切面。

实现AOP的技术,主要分为两大类:

  1. 采用动态代理技术,利用截取消息的方式,对该消息进行装饰,以取代原有对象行为的执行;
  2. 采用静态织入的方式,引入特定的语法创建“方面”,从而使得编译器可以在编译期间织入有关“方面”的代码,属于静态代理。

AOP框架具有的两个特征:

  1. 各个步骤之间的良好隔离性
  2. 源代码无关性

AOP从程序运行角度考虑程序的结构,提取业务处理过程的切面,oop是静态的抽象,aop是动态的抽象。
在这里插入图片描述
最后,对于实际应用中 @Around的应用非常广泛,下一篇来具体学习一下它~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot整合kafka SpringBoot整合

发表于 2021-11-08

SpringBoot整合kafka

1.前提已经安装好kafka

2.新建springboot项目

添加项目依赖

1
2
3
4
xml复制代码        <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

添加配置文件application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
properties复制代码###########【Kafka集群】###########
spring.kafka.bootstrap-servers=192.168.2.243:9092,192.168.2.244:9092,192.168.2.245:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
#16384=16KB
#5120=5KB
spring.kafka.producer.batch-size=5120
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

# 生产端缓冲区大小
#33554432B=32M
#5242880=5M
spring.kafka.producer.buffer-memory = 5242880
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50

添加配置文件(可选)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码package com.example.kafka.config;


import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaInitialConfiguration {

// 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
@Bean
public NewTopic initialTopic() {
return new NewTopic("topic-test-llc",3, (short) 2 );
}

// 如果要修改分区数,只需修改配置值重启项目即可
// 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
@Bean
public NewTopic updateTopic() {
return new NewTopic("testtopic",10, (short) 2 );
}

}

kafka的生产者,不带回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码package com.example.kafka.controller;

import com.alibaba.fastjson.JSONObject;
import com.example.kafka.vo.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;

@RestController
public class KafkaProducer {

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

// 发送消息
@GetMapping("/kafka/normal/{message}")
public String sendMessage1(@PathVariable("message") String normalMessage, session) {
kafkaTemplate.send("topic-test-llc", sendResult);
return "ok";
}
}

kafka的消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码package com.example.kafka.consumer;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

//@Component
public class KafkaConsumer {

// 消费监听
@KafkaListener(topics = {"topic-test-llc"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费Topic:"+record.topic()+"**分区"+record.partition()+"**值内容"+record.value());
}
}

kafka的消费者的启动速度有点慢可能要稍等一短时间才会收到kafka发来的消息

访问:

访问路径:

1
bash复制代码localhost:8080/kafka/normal/aaa

上面示例创建了一个生产者,发送消息到topic1,消费者监听topic1消费消息。监听器用@KafkaListener注解,topics表示监听的topic,支持同时监听多个,用英文逗号分隔。启动项目,postman调接口触发生产者发送消息,

image-20210908140624429

3.带回调函数的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法。

第一种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}

第二种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}

@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}

4.自定义分区器

kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体要追加到哪个分区?这就是分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;

③ patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

我们自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码package com.example.kafka.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//自定义分区规则(这里假设全部发到0号分区)

return 0;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}
}

在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名,

1
2
properties复制代码# 自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

5.kafka事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务,

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    @GetMapping("/kafka/transaction")
public void sendMessageTransaction(){
//生命事务,后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations ->{
operations.send("topic","test executeInTransaction");
throw new RuntimeException("fail");
});

//不声明事务,后面保存但前端消息已经发送成功了
kafkaTemplate.send("topic","test executeInTransaction");
throw new RuntimeException("fail");
}

6.消费者

指定topic、partition、offset消费

前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码    /**
* @Title 指定topic、partition、offset消费
* @Description 同时监听topic1和topic2,监听topic1的0号分区、
* topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
* @param record
*/
@KafkaListener(id="consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1",partitions = {"0"}),
@TopicPartition(topic = "topic2",partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?,?> record){
System.out.println("topic:"+record.topic()+"partition:"+record.partition()+"offset:"+record.offset()+"value:"+record.value());
}

属性解释:

① id:消费者ID;

② groupId:消费组ID;

③ topics:监听的topic,可监听多个;

④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。

注意:topics和topicPartitions不能同时使用;

7.批量消费

设置application.prpertise开启批量消费即可,

1
2
3
4
properties复制代码# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50

接收消息时用List来接收,监听代码如下,

1
2
3
4
5
6
7
java复制代码    @KafkaListener(id="consumer2",groupId = "felix-group",topics = "topic1" )
public void onMesssage(List<ConsumerRecord<?,?>> records){
System.out.println(">>>批量消费一次,records.size()="+records.size());
for(ConsumerRecord<?,?> record:records){
System.out.println(record.value());
}
}

8.ConsumerAwareListenerErrorHandler异常处理器

通过异常处理器,我们可以处理consumer在消费时发生的异常。

新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码    //异常处理
// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler(){
return (message,exception,consumer)->{
System.out.println("消费异常:"+message.getPayload());
return null;
};
}

//将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?,?> record) throws Exception{
throw new Exception("简单消费-模拟异常");
}

// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?,?>> records) throws Exception{
System.out.println("批量消费一次...");
throw new Exception("批量消费-模拟异常");
}

9.消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
@Component
public class KafkaConsumer {
@Autowired
ConsumerFactory consumerFactory;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码package com.example.kafka.consumer;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaConsumer {

@Autowired
ConsumerFactory consumerFactory;

//消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory(){
ConcurrentKafkaListenerContainerFactory factory=new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//被过滤器的消息将被丢弃
factory.setAckDiscarded(true);
//消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
if(Integer.parseInt(consumerRecord.value().toString())%2==0){
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}

//消息过滤监听
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?,?> record){
System.out.println(record.value());
}
}

上面实现了一个”过滤奇数、接收偶数”的过滤策略,我们向topic1发送0-99总共100条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数,

10.消息转发

在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下

1
2
3
4
5
6
7
8
9
10
11
java复制代码   /**
* @Title 消息转发
* @Description 从topic1接收到的消息经过处理后转发到topic2
* @param record
* @return
*/
@KafkaListener(topics = {"topic"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?,?> record){
return record.value()+"-forward message";
}

11.定时启动,停止监听器

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

① 禁止监听器自启动;

② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码@EnableScheduling
@Component
public class CronTimer {
/**
* @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
* 而是会被注册在KafkaListenerEndpointRegistry中,
* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/
@Autowired
private KafkaListenerEndpointRegistry registry;

@Autowired
private ConsumerFactory consumerFactory;

// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}

// 监听器
@KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
}

// 定时启动监听器
@Scheduled(cron = "0 42 11 * * ? ")
public void startListener() {
System.out.println("启动监听器...");
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("timingConsumer").isRunning()) {
registry.getListenerContainer("timingConsumer").start();
}
//registry.getListenerContainer("timingConsumer").resume();
}

// 定时停止监听器
@Scheduled(cron = "0 45 11 * * ? ")
public void shutDownListener() {
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();
}
}

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

若依中的代码自动生成器研究-表查询篇

发表于 2021-11-08

这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战

最近生产环境用了一个开源系统:若依,其中有一个版块很有意思,很能提高生产效率: “代码生成器”。

其功能所处模块菜单为:系统工具->代码生成。我们来研究一下他的代码生成逻辑。

工具使用方法

  1. 建表

使用代码生成,首先需要在数据库建立一个业务表,比如我们要建立系统的用户表:表名称为my_user, 如下图

image.png
2. 导入

在若依“系统工具”->“代码生成”中,点击“导入”

image.png

在弹出窗口中,勾选my_user,然后点击“确定”,之后在列表界面可以看到导入的表:

image.png
3. 代码预览

点击my_user后面的“预览”,即可查看预生成的代码,预览代码生成很快,几乎没有延迟。

image.png

而且预览代码分为很多个部分,包括domain.java, mapper.java, service.java, serviceImpl.java, controller.java, mapper.xml以及前端的.vue文件与接口请求.js文件。

代码生成逻辑研究

sql查找

我们根据各个步骤请求的接口来研究一下代码生成逻辑。

点击“导入”时,窗口弹出数据库中的表列表,我们通过F12浏览器调试,发现其调用接口为/tool/gen/db/list,在后台我们去查看这个接口。接口入口如下图,在ruoyi-generator版块中。

image.png

其SQL查询语句经过逐级查找,位于GenTableMapper.xml中:

image.png

咱们拷贝出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
sql复制代码SELECT
table_name,
table_comment,
create_time,
update_time
FROM
information_schema. TABLES
WHERE
table_schema = (SELECT DATABASE())
AND table_name NOT LIKE 'qrtz_%'
AND table_name NOT LIKE 'gen_%'
AND table_name NOT IN (
SELECT
table_name
FROM
gen_table
)
ORDER BY
create_time DESC

其中的SELECT DATABASE()可以查询当前所在数据库的数据库名称。information_schema. TABLES会显示目前本台mysql连接上的所有数据库。字段table_comment会显示我们在创建表时写的注释,即在navcat上的这一部分:

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

啥?还不明白SpringAOP?

发表于 2021-11-08

「这是我参与11月更文挑战的第4天,活动详情查看:2021最后一次更文挑战」

AOP(Aspect Orient Programming),我们一般称为面向方面(切面)编程,作为面向对象的一种补充,用于处理系统中分布于各个模块的横切关注点,比如事务管理、日志、缓存等等。AOP实现的关键在于AOP框架自动创建的AOP代理,AOP代理主要分为静态代理和动态代理,静态代理的代表为AspectJ;而动态代理则以Spring AOP为代表。本文会分别对AspectJ和Spring AOP的实现进行分析和介绍。

使用AspectJ的编译时增强实现AOP

举个实例的例子来说。首先我们有一个普通的Hello类

public class Hello {
public void sayHello() {
System.out.println(“hello”);
}

1
2
3
4
java复制代码public static void main(String[] args) {
Hello h = new Hello();
h.sayHello();
}

}

使用AspectJ编写一个Aspect

1
2
3
4
5
6
7
java复制代码public aspect TxAspect {
void around():call(void Hello.sayHello()){
System.out.println("开始事务 ...");
proceed();
System.out.println("事务结束 ...");
}
}

这里模拟了一个事务的场景,类似于Spring的声明式事务。使用AspectJ的编译器编译,编译完成之后再运行这个Hello类,可以看到以下输出:

1
2
3
java复制代码开始事务 ...
hello
事务结束 ...

显然,AOP已经生效了,那么究竟AspectJ是如何在没有修改Hello类的情况下为Hello类增加新功能的呢?

查看一下编译后的Hello.class

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class Hello {
public Hello() {
}

public void sayHello() {
System.out.println("hello");
}

public static void main(String[] args) {
Hello h = new Hello();
sayHello_aroundBody1$advice(h, TxAspect.aspectOf(), (AroundClosure)null);
}
}

使用Spring AOP

与AspectJ的静态代理不同,Spring AOP使用的动态代理,所谓的动态代理就是说AOP框架不会去修改字节码,而是在内存中临时为方法生成一个AOP对象,这个AOP对象包含了目标对象的全部方法,并且在特定的切点做了增强处理,并回调原对象的方法。

Spring AOP中的动态代理主要有两种方式,JDK动态代理和CGLIB动态代理。JDK动态代理通过反射来接收被代理的类,并且要求被代理的类必须实现一个接口。JDK动态代理的核心是InvocationHandler接口和Proxy类。

如果目标类没有实现接口,那么Spring AOP会选择使用CGLIB来动态代理目标类。CGLIB(Code Generation Library),是一个代码生成的类库,可以在运行时动态的生成某个类的子类,注意,CGLIB是通过继承的方式做的动态代理,因此如果某个类被标记为final,那么它是无法使用CGLIB做动态代理的。

为了验证以上的说法,可以做一个简单的测试。首先测试实现接口的情况。

定义一个接口

1
2
3
java复制代码public interface Person {
String sayHello(String name);
}

实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Component
public class Chinese implements Person {
@Timer
@Override
public String sayHello(String name) {
System.out.println("-- sayHello() --");
return name + " hello, AOP";
}

public void eat(String food) {
System.out.println("我正在吃:" + food);
}
}

这里的@Timer注解是我自己定义的一个普通注解,用来标记Pointcut。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Aspect
@Component
public class AdviceTest {

@Pointcut("@annotation(com.zzm.aop.Timer)")
public void pointcut() {
}

@Before("pointcut()")
public void before() {
System.out.println("before");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@SpringBootApplication
@RestController
public class SpringBootDemoApplication {

//这里必须使用Person接口做注入
@Autowired
private Person chinese;

@RequestMapping("/test")
public void test() {
chinese.sayHello("listen");
System.out.println(chinese.getClass());
}

public static void main(String[] args) {
SpringApplication.run(SpringBootDemoApplication.class, args);
}
}

输出

1
2
3
java复制代码before
-- sayHello() --
class com.sun.proxy.$Proxy53

可以看到类型是com.sun.proxy.$Proxy53,也就是前面提到的Proxy类,因此这里Spring AOP使用了JDK的动态代理。

再来看看不实现接口的情况,修改Chinese类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Component
public class Chinese {

@Timer
// @Override
public String sayHello(String name) {
System.out.println("-- sayHello() --");
return name + " hello, AOP";
}

public void eat(String food) {
System.out.println("我正在吃:" + food);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@SpringBootApplication
@RestController
public class SpringBootDemoApplication {

//直接用Chinese类注入
@Autowired
private Chinese chinese;

@RequestMapping("/test")
public void test() {
chinese.sayHello("listen");
System.out.println(chinese.getClass());
}

public static void main(String[] args) {
SpringApplication.run(SpringBootDemoApplication.class, args);
}
}

输出:

1
2
3
4
java复制代码before
-- sayHello() --
class
com.zzm.aop.Chinese$$EnhancerBySpringCGLIB$$56b89168

可以看到类被CGLIB增强了,也就是动态代理。这里的CGLIB代理就是Spring AOP的代理,这个类也就是所谓的AOP代理,AOP代理类在切点动态地织入了增强处理。

小结

AspectJ在编译时就增强了目标对象,Spring AOP的动态代理则是在每次运行时动态的增强,生成AOP代理对象,区别在于生成AOP代理对象的时机不同,相对来说AspectJ的静态代理方式具有更好的性能,但是AspectJ需要特定的编译器进行处理,而Spring AOP则无需特定的编译器处理。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Java 对象拷贝机制】使用 CGlib 实现 Bean

发表于 2021-11-08

【Java对象拷贝机制】使用CGlib实现Bean拷贝(BeanCopier)

对象拷贝现状

业务系统中经常需要两个对象进行属性的拷贝,不能否认逐个的对象拷贝是最快速最安全的做法,但是当数据对象的属性字段数量超过程序员的容忍的程度,代码因此变得臃肿不堪,使用一些方便的对象拷贝工具类将是很好的选择。

模型数据转换

项目中或多或少会对某些实体进行转换(DTO、VO、DO 或者 PO 等),往往具有相同的属性名称,数量少的情况下我们可以直接采取 set、get 方法进行赋值,可是如果这样的转换在很多地方都会用到,还是靠 set 来进行操作势必会大大的影响开发效率。

  • 关于实体转换,我们把一个实体对应一张表(这可以当成 DO)。
  • 业务中与第三方进行数据交互,我们需要把实体的数据传给他们,但不一定是一个 DO 中的所有属性可能减少或者多个 DO 中的属性组成,这里我们引入 DTO(这个实体中我们可以去除一些隐私信息,比如:银行卡号,身份证,密码)。
  • 一个性别我们用 1、2 表示男女,页面中不能直接显示 1 或者 2,需要显示男、女或者靓仔(男)、靓妹(女),这时候代表这样的一个实体我们可以看作 VO。

目前流行的较为公用认可的工具类:

Apache 的两个版本:(反射机制)

  • org.apache.commons.beanutils.PropertyUtils.copyProperties(Object dest, Object orig)

原因:dateTimeConveter 的 conveter 没有对 null 值的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
vbnet复制代码​
// targetObject特殊属性的限制:(Date,BigDecimal等)
​
public class BeanObject { //此处省略getter,setter方法
   private String name;
   private java.util.Date date;
}
public class BeanObjectTest {
    public static void main(String args[]) throws Throwable {
    BeanObject from = new BeanObject();
    BeanObject to = new BeanObject();
    //from.setDate(new java.util.Date());
    from.setName("TTTT");
    org.apache.commons.beanutils.BeanUtils.copyProperties(to, from);//如果from.setDate去掉,此处出现conveter异常
    System.out.println(ToStringBuilder.reflectionToString(from));    
    System.out.println(ToStringBuilder.reflectionToString(to));
    }
}
  • org.apache.commons.beanutils.BeanUtils.copyProperties(Object dest, Object orig)
  • 相同属性名,且类型不匹配时候的处理
  • 原因:这两个工具类不支持同名异类型的匹配 !!!【包装类 Long 和原始数据类型 long 是可以的】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
vbnet复制代码public class SourceClass {  //此处省略getter,setter方法
   private Long num;
   private String name;
}
​
public class TargetClass {  //此处省略getter,setter方法
   private Long num;
   private String name;
}
​
public class PropertyUtilsTest {
   public static void main(String args[]) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
       SourceClass from = new SourceClass();
       from.setNum(1);
       from.setName("name");
       TargetClass to = new TargetClass();
       //抛出参数不匹配异常
       org.apache.commons.beanutils.PropertyUtils.copyProperties(to, from);
       org.springframework.beans.BeanUtils.copyProperties(from, to);
       //抛出参数不匹配异常
       System.out.println(ToStringBuilder.reflectionToString(from));
       System.out.println(ToStringBuilder.reflectionToString(to));
  }
}

Spring 版本:(反射机制)

  • org.springframework.beans.BeanUtils.copyProperties(Object source, Object target, Class editable, String[] ignoreProperties)

cglib 版本:(使用动态代理,效率高)

cglib 是一款比较底层的操作 java 字节码的框架

  • net.sf.cglib.beans.BeanCopier.copy(Object paramObject1, Object paramObject2, Converter paramConverter)

工具操作

img

原理简介

反射类型:(apache)

都使用静态类调用,最终转化虚拟机中两个单例的工具对象。

1
2
3
csharp复制代码public BeanUtilsBean(){
   this(new ConvertUtilsBean(), new PropertyUtilsBean());
}
  • ConvertUtilsBean 可以通过 ConvertUtils 全局自定义注册。
  • ConvertUtils.register(new DateConvert(), java.util.Date.class);
  • PropertyUtilsBean 的 copyProperties 方法实现了拷贝的算法。
  1. 动态 bean:orig instanceof DynaBean:Object value = ((DynaBean)orig).get(name); 然后把 value 复制到动态 bean 类。
  2. Map 类型:orig instanceof Map:key 值逐个拷贝
  3. 其他普通类:从 beanInfo【每一个对象都有一个缓存的 bean 信息,包含属性字段等】取出 name,然后把 sourceClass 和 targetClass 逐个拷贝。

Cglib 类型:BeanCopier

1
2
ini复制代码copier = BeanCopier.create(source.getClass(), target.getClass(), false);
copier.copy(source, target, null);

Get 和 set 方法不匹配的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typescript复制代码public class BeanCopierTest {
   /**
   * 从该用例看出BeanCopier.create的target.class 的每一个get方法必须有队形的set方法
   * @param args
   */
   public static void main(String args[]) {
       BeanCopier copier = BeanCopier.create(UnSatifisedBeanCopierObject.class, SourceClass.class,false);
       copier = BeanCopier.create(SourceClass.class, UnSatifisedBeanCopierObject.class, false); //此处抛出异常创建
  }
}
class UnSatifisedBeanCopierObject {
   private String name;
   private Long num;
   public String getName() {undefined
       return name;
  }
   public void setName(String name) {undefined
       this.name = name;
  }
   public Long getNum() {undefined
       return num;
  }
   // public void setNum(Long num) {undefined
   //     this.num = num;
   // }
}

Create 对象过程:产生 sourceClass-> TargetClass 的拷贝代理类,放入 jvm 中,所以创建的代理类的时候比较耗时。最好保证这个对象的单例模式,可以参照最后一部分的优化方案。

创建过程 -> 源代码见 jdk:

net.sf.cglib.beans.BeanCopier.Generator.generateClass(ClassVisitor)

  1. 获取 sourceClass 的所有 public get 方法-》PropertyDescriptor[] getters
  2. 获取 TargetClass 的所有 public set 方法-》PropertyDescriptor[] setters
  3. 遍历 setters 的每一个属性,执行 4 和 5
  4. 按 setters 的 name 生成 sourceClass 的所有 setter 方法-》PropertyDescriptor getter【不符合 javabean 规范的类将会可能出现空指针异常】
  5. PropertyDescriptor[] setters-》PropertyDescriptor setter
  6. 将 setter 和 getter 名字和类型 配对,生成代理类的拷贝方法。

原理总结

Copy 属性过程:调用生成的代理类,代理类的代码和手工操作的代码很类似,效率非常高。

上述这几种方式速度最快的是 BeanCopier,默认只复制名称和类型相同的字段,还会对 date 为空的情况不进行复制。

我认为这样做最好,比如对象 A 的值复制到 B 中,我们把相同的进行复制,把不同的,也就是需要我们个性化的一些字段,单独出来用 get 来赋值,这样程序就会很明确,重点也就聚焦在了不同的地方。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

leader说用下httpclient的重试,但我没用,因为

发表于 2021-11-08

上期周总结中写了要分析超时重试方案,这次专门介绍下可用的方案。

1、故事背景

客户对我们系统的可用性要求特别高不能低于99%,为了监控这些系统的可用性同时不对各子产品进行代码入侵,我们采取了单独开发服务进行探活。
其实探活通俗的说就是:定时调用下各种接口,查看服务是否可用。各系统接口访问协议不同主要分为:

  • websocket
  • http
  • git ssh
  • git http
  • linux shell(ssh)

k8s集群环境下夹杂这各种代理转发中间件,导致链路超长,一旦网络抖动或请求超时就误判为不可用,最终会导致指标很难看。因此大家建议增加重试策略,如果超过3次才判断为服务不可用。

2、重试方案实现分析

leader说用下httpclient的重试。
我觉得Leader的建议很不错,但是只支持Http协议,其他协议的怎么办?然后就开始思考有没有简单做法?因此我在心里描述着自己的诉求:

  1. 有没有通用的解决办法?
  2. 每个协议都要开发重试,工作量巨大。我不能陷入细节,要从具体细节抽象出来规律,然后利用设计模式之类的思想类解决问题。
  3. 不能对业务逻辑进行入侵。

再深入思考:

  1. 系统设计之初就采用了策略模式进行了抽象。
  2. 也采用了代理模式,避免controller层之间调用策略。代理模式其实也就是代理人,代理人在执行策略之前,可以随意做文章呀。

好了大概想到了解决办法,我要在代理人那里做文章。

3、重试方案搜集

在做事之前,喜欢搜集下现在是否有成熟的解决方案,因为时间紧急没时间重复造轮子,因此我搜集到了二种常见的方案:

3.1、Spring-Retry

Spring Retry提供了自动重新调用失败操作的能力。这对于错误可能是暂时性的(如暂时性的网络故障)很有帮助。Spring Retry提供了对流程和基于策略的行为的声明性控制,易于扩展和定制。

3.1.1、引入依赖

1
2
3
4
5
6
7
8
9
java复制代码<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

3.1.2、接口增加注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码@Service
@Slf4j
public class RemoteService {

/**
* 添加重试注解,当有异常时触发重试机制.设置重试5次,默认是3.延时2000ms再次执行,每次延时提高1.5倍.当返回结果不符合要求时,主动报错触发重试.
* @param count
* @return
* @throws Exception
*/
@Retryable(value = {RemoteAccessException.class }, maxAttempts = 5, backoff = @Backoff(delay = 2000, multiplier = 1.5))
public String call(Integer count) throws Exception {
if(count == 10){
log.info("Remote RPC call do something... {}",LocalTime.now());
throw new RemoteAccessException("RPC调用异常");
}
return "SUCCESS";
}

/**
* 定义回调,注意异常类型和方法返回值类型要与重试方法一致
* @param e
* @return
*/
@Recover
public String recover(RemoteAccessException e) {
log.info("Remote RPC Call fail",e);
return "recover SUCCESS";
}
}

@Retryable 中有3个参数,

  • value是 可还原的异常类型,也就是重试的异常类型。
  • maxAttempts 则代表了最大的尝试次数,默认是3次。
  • exclude,指定异常不重试,默认为空
  • include,指定异常重试,为空时,所以异常进行重试
  • backoff 则代表了延迟,默认是没有延迟的,就是失败后立即重试,当然加上延迟时间的处理方案更好,看业务场景,也可以不加括号里面的(delay = 3000L)),默认延迟1000ms.

@Backoff

  • delay:指定延迟后重试
  • multiplier:延迟的倍数,eg: delay=1000L,multiplier=2时,第一次重试为1秒,第二次为2秒,第三次为4秒

注意:

注意这里如果@Retryable注解的方法是在Service层,然后在Controller层进行调用的,如果你在本类中调用,那么@Retryable 不会工作。因为当使用@Retryable时,Spring会在原始bean周围创建一个代理,然后可以在特殊情况下特殊处理,这也就是重试的原理了。所以在这种情况下,Spring推荐我们调用一个实际的方法,然后捕获我们在value中抛出的异常,然后根据@Retryable 的饿配置来进行调用。
使用了@Retryable的方法,你要把异常进行抛出处理,要不不会被Retry捕获

3.1.3、调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@RestController
@RequestMapping("/retry")
@Slf4j
public class RetryController {

@Autowired
private RemoteService remoteService;

@RequestMapping("/show/{count}")
public String show(@PathVariable Integer count){
try {
return remoteService.call(count);
} catch (Exception e) {
log.error("RetryController.show Exception",e);
return "Hello SUCCESS";
}
}
}

3.1.3、 开启重试

1
2
3
4
5
6
7
8
9
java复制代码@SpringBootApplication
@EnableRetry //开启重试
public class Application {

public static void main(String[] args) {

SpringApplication.run(Application.class,args);
}
}

3.2、Guava Retrying

guava-retrying 是一个线程安全的 Java 重试类库,提供了一种通用方法去处理任意需要重试的代码,可以方便灵活地控制重试次数、重试时机、重试频率、停止时机等,并具有异常处理功能。

3.2.1、 引入依赖

1
2
3
4
5
java复制代码<dependency>
      <groupId>com.github.rholder</groupId>
      <artifactId>guava-retrying</artifactId>
      <version>2.0.0</version>
</dependency>

3.2.2、 入门demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码Callable<Boolean> callable = new Callable<Boolean>() {
    public Boolean call() throws Exception {
        return true; // do something useful here
    }
};

Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
    .retryIfResult(Predicates.<Boolean>isNull()) // callable返回null时重试
    .retryIfExceptionOfType(IOException.class) // callable抛出IOException重试
    .retryIfRuntimeException() // callable抛出RuntimeException重试
    .withStopStrategy(StopStrategies.stopAfterAttempt(3)) // 重试3次后停止
    .build();
try {
    retryer.call(callable);
} catch (RetryException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

**接下来对其进行详细说明: **

  • RetryerBuilder是一个factory创建者,可以定制设置重试源且可以支持多个重试源,可以配置重试次数或重试超时时间,以及可以配置等待时间间隔,创建重试者Retryer实例。
  • RetryerBuilder的重试源支持Exception异常对象 和自定义断言对象,通过retryIfException 和retryIfResult设置,同时支持多个且能兼容。
  • retryIfException,抛出runtime异常、checked异常时都会重试,但是抛出error不会重试。
  • retryIfRuntimeException只会在抛runtime异常的时候才重试,checked异常和error都不重试。
  • retryIfExceptionOfType允许我们只在发生特定异常的时候才重试,比如NullPointerException和IllegalStateException`都属于runtime异常,也包括自定义的error

​

3.2.3、WaitStrategies 重试等待策略

ExponentialWaitStrategy 指数等待策略

指数补偿 算法 Exponential Backoff

1
less复制代码.withWaitStrategy(WaitStrategies.exponentialWait(100, 5, TimeUnit.MINUTES))

创建一个永久重试的重试器,每次重试失败时以递增的指数时间等待,直到最多5分钟。 5分钟后,每隔5分钟重试一次。对该例而言:

1
java复制代码第一次失败后,依次等待时长:2^1 * 100;2^2 * 100;2^3 * 100;...

在ExponentialWaitStrategy中,根据重试次数计算等待时长的源码我们可以关注下:

1
2
3
4
5
6
7
8
9
ini复制代码@Override
public long computeSleepTime(Attempt failedAttempt) {
double exp = Math.pow(2, failedAttempt.getAttemptNumber());
long result = Math.round(multiplier * exp);
if (result > maximumWait) {
result = maximumWait;
}
return result >= 0L ? result : 0L;
}

如果以后有类似的需求,我们可以自己写下这些算法,而有关更多指数补偿 算法 Exponential Backoff,可以参考:en.wikipedia.org/wiki/Expone…

FibonacciWaitStrategy 斐波那契等待策略

Fibonacci Backoff 斐波那契补偿算法

1
less复制代码.withWaitStrategy(WaitStrategies.fibonacciWait(100, 2, TimeUnit.MINUTES))

创建一个永久重试的重试器,每次重试失败时以斐波那契数列来计算等待时间,直到最多2分钟;2分钟后,每隔2分钟重试一次;对该例而言:

1
markdown复制代码第一次失败后,依次等待时长:1*100;1*100;2*100;3*100;5*100;...

FixedWaitStrategy 固定时长等待策略

1
scss复制代码withWaitStrategy(WaitStrategies.fixedWait(10,  TimeUnit.SECONDS))

固定时长等待策略,失败后,将等待固定的时长进行重试;

RandomWaitStrategy 随机时长等待策略

1
2
scss复制代码withWaitStrategy(WaitStrategies.randomWait(10,  TimeUnit.SECONDS));
withWaitStrategy(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 10, TimeUnit.SECONDS));

随机时长等待策略,可以设置一个随机等待的最大时长,也可以设置一个随机等待的时长区间。

IncrementingWaitStrategy 递增等待策略

1
scss复制代码withWaitStrategy(WaitStrategies.incrementingWait(1,  TimeUnit.SECONDS, 5, TimeUnit.SECONDS))

递增等待策略,根据初始值和递增值,等待时长依次递增。就本例而言:

第一次失败后,将依次等待1s;6s(1+5);11(1+5+5)s;16(1+5+5+5)s;…

ExceptionWaitStrategy 异常等待策略

1
scss复制代码withWaitStrategy(WaitStrategies.exceptionWait(ArithmeticException.class, e -> 1000L))

根据所发生的异常指定重试的等待时长;如果异常不匹配,则等待时长为0;

CompositeWaitStrategy 复合等待策略

1
less复制代码.withWaitStrategy(WaitStrategies.join(WaitStrategies.exceptionWait(ArithmeticException.class, e -> 1000L),WaitStrategies.fixedWait(5, TimeUnit.SECONDS)))

复合等待策略;如果所执行的程序满足一个或多个等待策略,那么等待时间为所有等待策略时间的总和。

3.2.4、 StopStrategies 重试停止策略

NeverStopStrategy

1
scss复制代码withStopStrategy(StopStrategies.neverStop())

一直不停止,一直需要重试。

StopAfterAttemptStrategy

1
scss复制代码withStopStrategy(StopStrategies.stopAfterAttempt(3))

在重试次数达到最大次数之后,终止任务。

StopAfterDelayStrategy

1
scss复制代码withStopStrategy(StopStrategies.stopAfterDelay(3, TimeUnit.MINUTES))

在重试任务达到设置的最长时长之后,无论任务执行次数,都终止任务。

BlockStrategies 阻塞策略

阻塞策略默认提供的只有一种:ThreadSleepStrategy,实现方式是通过Thread.sleep(sleepTime)来实现;不过这也给了我们极大的发挥空间,我们可以自己实现阻塞策略。

AttemptTimeLimiters 任务执行时长限制

这个表示单次任务执行时间限制(如果单次任务执行超时,则终止执行当前任务);

** NoAttemptTimeLimit 无时长限制**

1
less复制代码.withAttemptTimeLimiter(AttemptTimeLimiters.noTimeLimit())

顾名思义,不限制执行时长;每次都是等执行任务执行完成之后,才进行后续的重试策咯。

FixedAttemptTimeLimit

1
2
less复制代码.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, TimeUnit.SECONDS));
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, TimeUnit.SECONDS, Executors.newCachedThreadPool()));

可以指定任务的执行时长限制,并且为了控制线程管理,最好指定相应的线程池。

3.2.5、重试监听

当重试发生时,如果需要额外做一些动作,比如发送邮件通知之类的,可以通过RetryListener,Guava Retryer在每次重试之后会自动回调监听器,并且支持注册多个监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Slf4j
class DiyRetryListener<Boolean> implements RetryListener {
    @Override
    public <Boolean> void onRetry(Attempt<Boolean> attempt) {
        log.info("重试次数:{}",attempt.getAttemptNumber());
        log.info("距离第一次重试的延迟:{}",attempt.getDelaySinceFirstAttempt());
        if(attempt.hasException()){
            log.error("异常原因:",attempt.getExceptionCause());
        }else {
            System.out.println("正常处理结果:{}" + attempt.getResult());
        }
    }
}

定义监听器之后,需要在Retryer中进行注册。

1
2
3
4
5
6
7
java复制代码        Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
                .retryIfResult(Predicates.<Boolean>isNull()) // callable返回null时重试
                .retryIfExceptionOfType(IOException.class) // callable抛出IOException重试
                .retryIfRuntimeException() // callable抛出RuntimeException重试
                .withStopStrategy(StopStrategies.stopAfterAttempt(3)) // 重试3次后停止
                .withRetryListener(new DiyRetryListener<Boolean>()) // 注册监听器
                .build();

4、总结

  • 以上2个组件让重试逻辑和系统逻辑分开实现了解耦。
  • 独立思考,站在巨人的肩膀上。

后面针对spring retry进行源码解析,我们下期见!

5、引用

blog.csdn.net/chaoHappy/a…
www.jianshu.com/p/0aca07c31…
rholder.github.io/guava-retry…
www.jianshu.com/p/a289dde63…
juejin.cn/post/701210…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一次Redis生产事故,老司机也翻了车

发表于 2021-11-08

「这是我参与11月更文挑战的第3天,活动详情查看:2021最后一次更文挑战」。PS:已经更文多少天,N就写几。一定要写对文案,否则文章不计入在内。

一、前因

公司有个核心项目redis的客户端一直是使用的jedis,后面技术负责人要求把jedis客户端替换成效能更高的lettuce客户端,同时使用spring框架自带的RedisTemplate类来操作redis。

然而世事难料,就是这么一个简单的需求却让老司机也翻了车。。。

二、事故预演

按照预设的结果,本次开发任务应该是非常轻松的:

  1. 将配置文件中jedis连接池的配置项平移替换成lettuce的;
  2. 把项目中jedis配置相关的代码删掉;
  3. 把使用到jedis的地方替换成redisTemplate。

伪代码

其他配置项不一一展示

1
2
3
4
xml复制代码spring.redis.jedis.pool.max-idle = 200
spring.redis.jedis.pool.min-idle = 10
spring.redis.jedis.pool.max-active = 200
spring.redis.jedis.pool.max-wait = 2000

替换成

1
2
3
4
xml复制代码spring.redis.lettuce.pool.max-idle = 200
spring.redis.lettuce.pool.min-idle = 10
spring.redis.lettuce.pool.max-wait = 2000
spring.redis.lettuce.pool.max-active = 200

业务代码也从jedis换成redisTemplate

jedis的伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码/**
* 设置商品库存到redis - jedis
* @param goodId 商品id
* @param count 库存量
* @return
*/
@PatchMapping("/storage/jedis")
public String setStorageByJedis(
@RequestParam("goodId") String goodId,
@RequestParam("count") String count) {
Jedis jedis = getJedis();
jedis.set("good:" + goodId, count);
jedis.close();
return "success";
}

redisTemplate的伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码/**
* 设置商品库存到redis - redisTemplate
* @param goodId 商品id
* @param count 库存量
* @return
*/
@PatchMapping("/storage")
public String setStorage(
@RequestParam("goodId") String goodId,
@RequestParam("count") String count) {
redisTemplate.opsForValue().set("good:" + goodId, count);
return "success";
}

然而一切工作做完,信心满满的上线发布之后,却大面积的爆发了线上bug。属于严重的生产事故。

image

从错误日志中我们可以清晰的看到是因为String类型的数据无法转换成int类型,我心中出现了一个大大的问号:明明我存到redis的是可以转成数字类型的字符串呀?

原因分析

通过Redis-Desktop-Manager可视化工具查看数据

image

发现string类型的键值对value值多了一对双引号

纳尼!怎么用jedis的时候就没有,换成redisTemplate就有了?

经过一番代码检查,发现使用redisTemplate的过程中好像少了一个步骤:配置序列化
一般如果没有特殊配置或者要使用redis连接池,就只用在配置中心或者配置文件中加入

1
2
3
xml复制代码spring.redis.host = 172.0.0.1
spring.redis.port = 6379
spring.redis.password = 123456Copy to clipboardErrorCopied

然后注入redisTemplate就可以使用了,非常简单。

然而RedisTemplate使用的默认序列化器是JDK自带的序列化器,看源码:

image

看RedisTemplate的类图

image

由于RedisTemplate继承了RedisAccessor,RedisAccessor实现了InitializingBean,所以在RedisTemplate类初始化完成后,可以重写afterPropertiesSet()方法,设置序列化器。

解决方案

写一个redis的配置类,重新设置序列化器。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Configuration
@ConditionalOnClass(RedisOperations.class)
public class RedisTemplateAutoConfiguration {

@Bean
@ConditionalOnMissingBean(name="redisTemplate")
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate template=new RedisTemplate();
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
return template;
}

这里只针对redis的string类型配置StringRedisSerializer序列化器,大家可以根据项目实际需求增加Hash对象类型的配置。

Spring自带提供了多种序列化器,如下

image

也可以自定义序列化器,需要实现RedisSerializer接口,并重写serialize()和deserialize()方法。

为了方便演示,没有写全局的redis配置类,直接在接口中重置序列化器,伪代码如下:

1
2
3
4
5
6
7
8
9
java复制代码@PatchMapping("/storage")
public String setStorage(
@RequestParam("goodId") String goodId,
@RequestParam("count") String count) {
redisTemplate.setKeySerializer(new StringRedisSerializer()); // 重置redis string类型key的序列化器
redisTemplate.setValueSerializer(new StringRedisSerializer()); // 重置redis string类型value的序列化器
redisTemplate.opsForValue().set("good:" + goodId, count);
return "success";
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…394395396…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%