开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

SpringBoot集成swagger后出现 Failed

发表于 2021-03-30

启动SpringBoot项目的时候控制台输出的log如下

1
2
3
4
5
6
7
8
9
10
11
12
13
shell复制代码2020-11-20 18:52:26.864 WARN  o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'documentationPluginsBootstrapper'; nested exception is com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: com.google.common.collect.FluentIterable.concat(Ljava/lang/Iterable;Ljava/lang/Iterable;)Lcom/google/common/collect/FluentIterable;
2020-11-20 18:52:26.866 INFO o.s.j.e.a.AnnotationMBeanExporter - Unregistering JMX-exposed beans on shutdown
2020-11-20 18:52:26.867 INFO o.s.j.e.a.AnnotationMBeanExporter - Unregistering JMX-exposed beans
2020-11-20 18:52:26.886 WARN c.n.c.sources.URLConfigurationSource - No URLs will be polled as dynamic configuration sources.
2020-11-20 18:52:26.886 INFO c.n.c.sources.URLConfigurationSource - To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2020-11-20 18:52:26.905 INFO o.s.a.r.l.SimpleMessageListenerContainer - Shutdown ignored - container is not active already
2020-11-20 18:52:26.910 INFO o.a.catalina.core.StandardService - Stopping service [Tomcat]
2020-11-20 18:52:26.927 WARN o.a.c.loader.WebappClassLoaderBase - The web application [ROOT] appears to have started a thread named [Abandoned connection cleanup thread] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
java.lang.Object.wait(Native Method)
java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.run(AbandonedConnectionCleanupThread.java:43)

Process finished with exit code 1

根据以前解决错误的思路,往往都是看最后一个报错的日志,例如:

1
shell复制代码but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread

但是、我找了半天都没找到什么原因。还以为是我写的程序存在内存泄漏呢,就去用JVM相关的技术去排错,排了半天也没发现问题。

后来实在没办法了,就继续一步一步的往上翻看日志,找到了一句:

1
2
shell复制代码 Failed to start bean 'documentationPluginsBootstrapper' 
nested exception is com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: com.google.common.collect.FluentIterable.concat(Ljava/lang/Iterable;Ljava/lang/Iterable;)Lcom/google/common/collect/FluentIterable;

去百度一搜,果然是这个的问题。

在这里插入图片描述

出现这个问题的原因就是:当前项目的guava版本与之不匹配。

去查看了下项目里的guava的版本和swagger中的guava的版本:

在这里插入图片描述

果然是版本不同。

解决办法

修改成和swagger相同的guava版本即可:

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
xml复制代码   <!-- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>-->

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

不解释,全网最全Shiro认证与授权原理分析

发表于 2021-03-30

本篇为《Shiro从入门到精通》系列第二篇,在上篇《还在手写filter进行权限校验?尝试一下Shiro吧》中,我们学习了Shiro的基本功能、架构以及各个组件的概念。本篇文章继续深入,以官方示例为基础,讲解使用Shiro的流程以及认证和授权的原理分析。下面开始正文:

前言

Shiro作为常用的权限框架,可被用于解决认证、授权、加密、会话管理等场景。Shiro对其API进行了友好的封装,如果单纯的使用Shiro框架非常简单。但如果使用了多年Shiro,还依旧停留在基本的使用上,那么这篇文章就值得你学习一下。只有了解Shiro的底层和实现,才能够更好的使用和借鉴,同时也能够避免不必要的坑。

下面以官方提供的实例为基础,讲解分析Shiro的基本使用流程,同时针对认证和授权流程进行更底层的原理讲解,让大家真正了解我们所使用的Shiro框架,底层是怎么运作的。

Shiro组成及框架

在学习Shiro各个功能模块之前,需要先从整体上了解Shiro的整体架构,以及核心组件所处的位置。下面为官方提供的架构图:

shiro

上图可以看出Security Manager是Shiro的核心,无论认证、授权、会话管理等都是通过它来进行管理的。在使用和分析原理之前,先来了解后面会用到的组件及其功能:

  • Subject:主体,可以是用户或程序,主体可以访问Security Manager以获得认证、授权、会话等服务;
  • Security Manager:安全管理器,主体所需的认证、授权功能都是在这里进行的,是Shiro的核心;
  • Authenticator:认证器,主体的认证过程通过Authenticator进行;
  • Authorizer:授权器,主体的授权过程通过Authorizer进行;
  • Session Manager:shiro的会话管理器,与web应用提供的Session管理分隔开;
  • Realm:域,可以有一个或多个域,可通过Realm存储授权和认证的逻辑;

上面只列出了部分组件及功能,其他更多组件在后续文章会逐步为大家实践讲解。了解了这些组件和核心功能之后,下面以官方的示例进行讲解。

官方实例分析

Shiro官方示例地址为:shiro.apache.org/tutorial.ht… ,需要留意的是官方示例已经有些老了,在实践中会做一些调整。

我们先在本地将环境搭建起来,运行程序。创建一个基于Maven的Java项目,引入如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
xml复制代码<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.29</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.29</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

目前最新版本为1.7.0,可根据需要引入其他版本。运行官方实例比较坑的地方是,还需要引入log4j和slf4j对应的依赖。都是Apache的项目,因此底层默认采用了log4j的日志框架,如果不引入对应的日志依赖,会报错或无法打印日志。

紧接着,在resources目录下创建一个shiro.ini文件,将官网提供内容配置内容复制进去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ini复制代码# =============================================================================
# Tutorial INI configuration
#
# Usernames/passwords are based on the classic Mel Brooks' film "Spaceballs" :)
# =============================================================================

# -----------------------------------------------------------------------------
# Users and their (optional) assigned roles
# username = password, role1, role2, ..., roleN
# -----------------------------------------------------------------------------
[users]
root = secret, admin
guest = guest, guest
presidentskroob = 12345, president
darkhelmet = ludicrousspeed, darklord, schwartz
lonestarr = vespa, goodguy, schwartz

# -----------------------------------------------------------------------------
# Roles with assigned permissions
# roleName = perm1, perm2, ..., permN
# -----------------------------------------------------------------------------
[roles]
admin = *
schwartz = lightsaber:*
goodguy = winnebago:drive:eagle5

这个文件可看成是一个Realm,其实就是shiro默认的IniRealm,当然在不同的项目中用户、权限、角色等信息可以以各种形式存储,比如数据库存储、缓存存储等。

上述配置文件格式的语义也比较明确,配置了用户和角色等信息,大家留意看一下注释中对数据格式的解释。root = secret, admin表示用户名root,密码是secret,角色是admin。其中角色可以配置多个,在后面依次用逗号分隔即可。schwartz = lightsaber:*表示角色schwartz拥有权限lightsaber:*。

继续创建一个Tutorial类,将官网提供的代码复制进去,由于采用的是1.7.0版本,官网实例中下面的代码已经没办法正常运行了:

1
2
3
ini复制代码Factory<SecurityManager> factory = new IniSecurityManagerFactory("classpath:shiro.ini");
SecurityManager securityManager = factory.getInstance();
SecurityUtils.setSecurityManager(securityManager);

原因是IniSecurityManagerFactory类已经被标注废弃了,替代它的是Environment接口及其实现类。因此需将上述获取SecurityManager的方式改为通过shiro提供的Environment来初始化和获取:

1
2
3
ini复制代码Environment environment = new BasicIniEnvironment("classpath:shiro.ini");
SecurityManager securityManager = environment.getSecurityManager();
SecurityUtils.setSecurityManager(securityManager);

改造之后的完整代码如下(其中英文注释已翻译成中文注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码public class Tutorial {

private static final transient Logger log = LoggerFactory.getLogger(Tutorial.class);

public static void main(String[] args) {
log.info("My First Apache Shiro Application");

// 1.初始化环境,主要是加载shiro.ini配置文件的信息
Environment environment = new BasicIniEnvironment("classpath:shiro.ini");
// 2.获取SecurityManager安全管理器
SecurityManager securityManager = environment.getSecurityManager();
SecurityUtils.setSecurityManager(securityManager);

// 3.获取当前主体(用户)
Subject currentUser = SecurityUtils.getSubject();

// 4.获取当前主体的会话
Session session = currentUser.getSession();
// 5.向会话中存储一些内容(不需要web容器或EJB容器)
session.setAttribute("someKey", "aValue");
// 6.再次从会话中获取存储的内容,并比较与存储值是否一致。
String value = (String) session.getAttribute("someKey");
if ("aValue".equals(value)) {
log.info("Retrieved the correct value! [" + value + "]");
}

// 当前用户进行登录操作,进而可以检验用户的角色和权限。
// 7.判断当前用户是否认证(此时很显然未认证)
if (!currentUser.isAuthenticated()) {
// 8.将账号和密码封装到UsernamePasswordToken中
UsernamePasswordToken token = new UsernamePasswordToken("lonestarr", "vespa");
// 9.记住我
token.setRememberMe(true);
try {
// 10.进行登录操作
currentUser.login(token);
} catch (UnknownAccountException uae) {
log.info("There is no user with username of " + token.getPrincipal());
} catch (IncorrectCredentialsException ice) {
log.info("Password for account " + token.getPrincipal() + " was incorrect!");
} catch (LockedAccountException lae) {
log.info("The account for username " + token.getPrincipal() + " is locked. " +
"Please contact your administrator to unlock it.");
}
// ... 更多其他异常,包括应用程序异常
catch (AuthenticationException ae) {
// 其他意外异常、error处理
}
}

// 打印当前用户的主体信息
log.info("User [" + currentUser.getPrincipal() + "] logged in successfully.");

// 10.检查是否有指定角色权限(前面已经通过Environment加载了权限和角色信息)
if (currentUser.hasRole("schwartz")) {
log.info("May the Schwartz be with you!");
} else {
log.info("Hello, mere mortal.");
}

// 判断是否有资源操作权限
if (currentUser.isPermitted("lightsaber:wield")) {
log.info("You may use a lightsaber ring. Use it wisely.");
} else {
log.info("Sorry, lightsaber rings are for schwartz masters only.");
}

// 更强级别的权限验证
if (currentUser.isPermitted("winnebago:drive:eagle5")) {
log.info("You are permitted to 'drive' the winnebago with license plate (id) 'eagle5'. " +
"Here are the keys - have fun!");
} else {
log.info("Sorry, you aren't allowed to drive the 'eagle5' winnebago!");
}

// 11.登出
currentUser.logout();
System.exit(0);
}
}

完整项目源码:github.com/secbr/shiro

执行程序,打印日志信息如下,可以看到每一步的执行输出:

1
2
3
4
5
6
7
kotlin复制代码INFO - My First Apache Shiro Application
INFO - Enabling session validation scheduler...
INFO - Retrieved the correct value! [aValue]
INFO - User [lonestarr] logged in successfully.
INFO - May the Schwartz be with you!
INFO - You may use a lightsaber ring. Use it wisely.
INFO - You are permitted to 'drive' the winnebago with license plate (id) 'eagle5'. Here are the keys - have fun!

上述代码中包含了11个主要的流程:

  • 1、初始化环境,这里主要是加载shiro.ini配置文件的信息;
  • 2、获取SecurityManager安全管理器;
  • 3、获取当前主体(用户);
  • 4、获取当前主体的会话;
  • 5、向会话中存储一些内容(不需要web容器或EJB容器);
  • 6、再次从会话中获取存储的内容,并比较与存储值是否一致;
  • 7、判断当前用户是否认证;
  • 8、将账号和密码封装到UsernamePasswordToken中;
  • 9、开启记住我;
  • 10、检查是否有指定角色权限;
  • 11、退出登录。

下面我们对几个核心步骤步骤进行分析说明。

初始化环境

源码中通过Environment对象来加载配置文件和初始化SecurityManager,然后通过工具类SecurityUtils对SecurityManager进行设置。在实践中,可根据具体情况进行初始化,比如实例中通过Environment加载文件,也可以直接创建DefaultSecurityManager,在web项目采用DefaultWebSecurityManager等。

1
2
3
4
5
ini复制代码// 1.初始化环境,主要是加载shiro.ini配置文件的信息
Environment environment = new BasicIniEnvironment("classpath:shiro.ini");
// 2.获取SecurityManager安全管理器
SecurityManager securityManager = environment.getSecurityManager();
SecurityUtils.setSecurityManager(securityManager);

这里的配置文件相当于一个Realm,部分SecurityManager实现类(比如:DefaultSecurityManager)提供了setRealm方法,用户可通过该方法自定义设置Realm。

总之,无论获取SecurityManager的方式如何,都需要有这么一个SecurityManager用来处理后续的认证、授权等处理,可见SecurityManager的核心地位。

认证流程

在上述实例代码中,先将认证功能相关的核心代码抽离出来,包含以下代码及操作步骤(省略了SecurityManager的创建和设置):

1
2
3
4
5
6
7
8
9
10
java复制代码// 获取当前主体(用户)
Subject currentUser = SecurityUtils.getSubject();
// 判断当前用户是否认证
currentUser.isAuthenticated()
// 将账号和密码封装到UsernamePasswordToken中
UsernamePasswordToken token = new UsernamePasswordToken("lonestarr", "vespa");
// 记住我
token.setRememberMe(true);
// 登录操作
currentUser.login(token);

从这个代码流程上来看,Shiro的认证过程包括:初始化环境,获取当前用户主体,判断是否认证过,将账号密码进行封装,进行认证,认证完成校验权限。可以通过下图来表示整个流程。

在这里插入图片描述

接下来,通过跟踪源码,来看看Shiro的认证流程涉及到哪些组件。

认证原理分析

认证的入口程序是login方法,以此方法为入口,进行跟踪,并忽略掉非核心操作,可得出认证逻辑经过以下代码执行步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
ini复制代码//currentUser类型为Subject,在构造了SecurityManager之后,提交认证,token封装了用户信息
currentUser.login(token);

//DelegatingSubject类中,调用SecurityManager执行认证
Subject subject = this.securityManager.login(this, token);

//DefaultSecurityManager类中,SecurityManager委托给Authenticator执行认证逻辑
AuthenticationInfo info = this.authenticate(token);

// AuthenticatingSecurityManager类中,进行认证
this.authenticator.authenticate(token);

//AbstractAuthenticator类中,进行认证
AuthenticationInfo info = this.doAuthenticate(token);

//ModularRealmAuthenticator类中,获取多Realm进行身份认证
Collection<Realm> realms = this.getRealms();
doSingleRealmAuthentication(realm, token);

//ModularRealmAuthenticator类中,针对具体的Realm进行身份认证
AuthenticationInfo info = realm.getAuthenticationInfo(token);

//AuthenticatingRealm类中,调用对应的Realm进行校验,认证成功则返回用户属性
AuthenticationInfo info = realm.doGetAuthenticationInfo(token);

//SimpleAccountRealm类中,根据token获取账户信息
UsernamePasswordToken upToken = (UsernamePasswordToken) token;
SimpleAccount account = getUser(upToken.getUsername());

//AuthenticatingRealm类中,比对传入的token和根据token获取到的账户信息
assertCredentialsMatch(token, info);
->getCredentialsMatcher().doCredentialsMatch(token, info);

//SimpleCredentialsMatcher类中,进行具体对比
byte[] tokenBytes = toBytes(tokenCredentials);
byte[] accountBytes = toBytes(accountCredentials);
MessageDigest.isEqual(tokenBytes, accountBytes);
//或
accountCredentials.equals(tokenCredentials);

上述代码包括了认证过程中一些核心流程,抽离出核心部分,整理成流程图如下:

在这里插入图片描述

可以看出,整个认证过程中涉及到了SecurityManager、Subject、Authenticator、Realm等组件,相关组件的功能可参考架构图中的功能说明。

授权原理

实例中授权调用的代码比较少,主要就是以下几个方法:

1
2
3
4
5
6
arduino复制代码// 检查是否有相应角色权限
currentUser.hasRole("schwartz")
// 判断是否有资源操作权限
currentUser.isPermitted("lightsaber:wield")
// 判断是否有(更细粒度的)资源操作权限
currentUser.isPermitted("winnebago:drive:eagle5")

下面以hasRole方法为例,进行追踪分析源代码,看看具体的实现原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
kotlin复制代码// 检查是否有相应角色权限
currentUser.hasRole("schwartz");

// DelegatingSubject类中,委托给SecurityManager判断角色与既定角色是否匹配
this.securityManager.hasRole(this.getPrincipals(), roleIdentifier);

// AuthorizingSecurityManager类中,SecurityManager委托Authorizer进行角色检验
this.authorizer.hasRole(principals, roleIdentifier);

// ModularRealmAuthorizer类中,获取所有Realm,并遍历检查角色
for (Realm realm : getRealms());
((Authorizer) realm).hasRole(principals, roleIdentifier)

// AuthorizingRealm中,Authorizer判断Realm中的角色/权限是否和传入的匹配
AuthorizationInfo info = getAuthorizationInfo(principal);

// AuthorizingRealm中,执行Realm进行授权操作
AuthorizationInfo info = this.doGetAuthorizationInfo(principals);

// SimpleAccountRealm类中,获得用户SimpleAccount(实现了AuthorizationInfo),
// users类型为Map,以用户名为key,对应shiro.ini中配置的初始化用户信息
return this.users.get(username);

// AuthorizingRealm类中,判断传入的用户和初始化配置的是否匹配
return hasRole(roleIdentifier, info);

// AuthorizingRealm类中,最终的授权判断
return info != null && info.getRoles() != null && info.getRoles().contains(roleIdentifier);

上述代码包括了授权过程中一些核心流程,抽离出核心部分,整理成流程图(isPermitted方法类似,读者可自行追踪),如下:

在这里插入图片描述

可以看出,整个认证过程中涉及到了SecurityManager、Subject、Authorizer、Realm等组件,相关组件的功能可参考架构图中的功能说明。

自定义Realm

通过上面认证和授权流程及原理的分析,会发现无论哪个操作都需要通过Realm来定义用户认证时需要的账户信息和授权时的权限信息。但一般情况下不会使用官网示例的基于“ini配置文件”的方式,而是通过自定义Realm组件来实现。

以下面的示例来说,我们可以使用Shiro内置的Realm组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public class AuthenticationTest {

SimpleAccountRealm simpleAccountRealm = new SimpleAccountRealm();

@Before
public void addUser() {
// 在方法开始前添加一个用户
simpleAccountRealm.addAccount("wmyskxz", "123456");
}

@Test
public void testAuthentication() {
// 1.构建SecurityManager环境
DefaultSecurityManager defaultSecurityManager = new DefaultSecurityManager();
defaultSecurityManager.setRealm(simpleAccountRealm);

// 2.主体提交认证请求
// 设置SecurityManager环境
SecurityUtils.setSecurityManager(defaultSecurityManager);
// 获取当前主体
Subject subject = SecurityUtils.getSubject();
UsernamePasswordToken token = new UsernamePasswordToken("wmyskxz", "123456");
// 登录
subject.login(token);
// subject.isAuthenticated()方法返回一个boolean值,用于判断用户是否认证成功
System.out.println("isAuthenticated:" + subject.isAuthenticated());
}
}

上述示例中创建了一个SimpleAccountRealm对象,并把初始化的账户信息通过addAccount方法添加进去。

实践中自定义Realm的方法通常是继承AuthorizingRealm类,并实现其doGetAuthorizationInfo方法和doGetAuthenticationInfo方法。在上面的流程梳理过程中,我们已经知道doGetAuthorizationInfo方法为授权功能的实现,而doGetAuthenticationInfo方法为认证的功能实现。关于具体实例,后续会用专门的实例来讲解。

小结

本篇文章从Shiro的整体架构、使用实例,再到认证和授权的源码分析,想必经过这番学习,大多数朋友已经了解了使用多年的Shiro框架到底是怎么运作的了。当了解了这些底层的实现,再回头看,是不是感觉之前有疑惑的地方豁然开朗了?是不是有一种原来如此的感觉?那么,恭喜你,你学到了。


程序新视界

\

公众号“ 程序新视界”,一个让你软实力、硬技术同步提升的平台,提供海量资料

\

微信公众号:程序新视界

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

springboot-安全认证security+jwt+OA

发表于 2021-03-30

目录

背景

正文

一、Security的职责:

二、OAuth2.0的流程

三、Jwt是什么

四、总结


背景

前面写过一篇springboot+security+jwt的实战篇,并没有把来龙去脉说清楚,所以想再写一篇把安全认证的前世今生彻底弄清楚。主要从security干什么,OAuth2.0的流程,Jwt是什么来阐述。

正文

一、Security的职责:

  • Spring Security是Spring提供的一个安全框架,提供认证和授权功能,最主要的是它提供了简单的使用方式,同时又有很高的灵活性,简单,灵活,强大。
  • Security本身是一套完整的认证和授权解决方案,只是我们在做系统的时候遵循了OAuth2.0规范,引入了其实现。现在流行的前后端分离,服务端不需要保持session管理,只需要验证token合法性。使用jwt生成token,是为了直接通过解密token直接获取用户信息,简化标准OAuth2.0的操作流程,也是当前服务端架构设计的实际需要。
  • Authentication:用户认证,一般是通过用户名密码来确认用户是否为系统中合法主体;通过用户名和密码验证用户是否合法有效。
  • Authorization:用户授权,一般是给系统中合法主体授予相关资源访问权限;就是权限管理和访问控制。
  • SpringSecurity 采用的是责任链的设计模式,它有一条很长的过滤器链。在过滤器链中定义OAuth2.0的或者Jwt的具体实现。

二、OAuth2.0的流程

  • 是一种授权协议,是规范,不是实现。
  • 角色:资源所有者,客户端(第三方应用),首选服务器,资源服务器
  • Spring Security OAuth2:Spring 对 OAuth2 的开源实现。
  • 具体案例如百度开发平台,微信开发平台
  • 主要是用来获取用户的信息
  • 这里的令牌或者token仅仅是一个标识,不包含用户信息

三、Jwt是什么

  • JSON Web Token // 是一种具体的Token实现框架
  • 是基于token的认证协议的实现
  • 主要用来生成token,验证token合法性,是否过期,获取用户信息
  • token中包含用户信息

四、总结

  • security是基础,OAuth和Jwt是具体实现,在security上生效
  • OAuth2.0是规范
  • jwt是token的实现
  • 如果我们的系统要给第三方做授权,就实现OAuth2.0
  • 如果我们要做前后端分离,就实现token就可以了,jwt仅仅是token的一种实现方式
  • 三者的关系,我应该说明白了,希望对大家有帮助

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

高可用集群篇(四)-- Redis、ElasticSearc

发表于 2021-03-30

Redis集群搭建

Redis集群形式

数据分区方案

客户端分区
  • 客户端分区方案的代表为 Redis Sharding、Redis Sharding 是 Redis Cluster出来之前,业界普遍使用的Redis多实例集群方法;Java的Redis客户端驱动库Jedis,支持Redis Sharding 功能,即 ShardedJedis 以及结合缓存池的 ShardedJedisPool

image-20210320161547744

  • 优点:不使用第三方中间件,分区逻辑可控,配置简单,节点之间无关联,容易线性扩展,灵活性强
  • 缺点:客户端无法动态增删服务节点,客户端需要自行维护分发逻辑,客户端之间无法连接共享,会造成资源浪费
代理分区
  • 带来分区常用方案有 Twemproxy 和 Codis

image-20210320161633138

redis-cluster

高可用方式

Sentinel(哨兵机制)支持高可用
  • 前面介绍了主从机制,但是从运维角度来看,主节点出现了问题我们还需要通过人工干预的方式把从节点设为主节点,还要通知应用程序更新主节点地址,这种方式非常繁琐笨重,而且主节点的读写能力都十分有限,有没有较好的办法解决这两个问题,哨兵机制就是针对第一个问题的有效解决方案,第二个问题则依赖于集群;哨兵的作用就是监控Redis系统的运行状况,其功能主要包括以下三个:
+ **监控**(Monitoring):哨兵会不断地检查你的Master和Slave是否运作正常
+ **提醒**(Notification):当被监控的某个Redis出现问题时,哨兵可以通过API向管理员或者其他应用程序发送通知
+ **自动故障迁移**(Automatic failover):当主数据库出现故障时自动将从数据库转换为主数据库![image-20210321120005360](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/85edb96b56e2ffa3a4b38b5065be75de9d9578f3ba5e91358feecbed658bce5a)
哨兵的原理
  • Redis哨兵的三个定时任务,Redis哨兵判定一个Redis节点故障不可达主要就是通过三个定时监控任务来完成的:
+ 每隔10秒每隔哨兵节点会向主节点和从节点发送 `info replication`命令来获取最新的拓扑结构


![image-20210321120241404](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/2a54c7a62fb9c6c33bf8803348072232a86e4bf7650aebfd83110be79b9e23ab)
+ 每隔2秒每隔哨兵节点回向Redis节点的`_sentinel_:hello`频道发送自己对主节点是否故障的判断以及自身的节点信息,并且其他的哨兵节点也会订阅这个频道来了解其他哨兵节点的信息以及对主节点的判断


![image-20210321120358665](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/1b588ae132756640cbf2cffeae6c58c1ec1b3f3c47eedbab50bc6e3a38ba72e1)
+ 每隔1秒每个哨兵节点会向主节点、从节点、其他的哨兵节点发送一个 `ping`命令来做心跳检测


![image-20210321120648877](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/174b36270699ad972f4c1f8f6b15a5d36bb3d1eaba9f5f1b0f72cae44162c53c)


如果在定时Job3检测不到节点的心跳,会判断为主观下线;如果该节点还是主节点那么还会通知其他的哨兵对该主节点进行心跳检测,这时主观下线的票数就超过了数时,那么这个主节点确实就可能是故障不可达了,这时就由原来的主观下线变为客观下线
+ 故障转移和Leader选举


如果主节点被判定为客观下线之后,就要选取一个哨兵节点来完成后面的故障转移工作,选举一个leader,这里采用的选举算法为Raft;选举出来的哨兵leader就要来完成故障转移工作,也就是在从节点中选举出一个节点来当心的主节点

Redis-Cluster

  • redis-cluster文档
  • Redis的官方多机器部署方案:Redis Cluster;一组 Redis Cluster是由多个Redis实例组成,官方推荐我们是6实例,其中三个为主节点,三个为从节点;一旦有主节点发生故障的时候,Redis Cluster可以选举出对应的从节点成为新的主节点,继续对外服务,从而保证服务的高可用性;
  • 那么对于客户端来说,怎么知道对应的key是要路由到哪一个节点呢?

Redis Cluster把所有的数据划分为16384个不同的槽位,可以根据机器的性能把不同的槽位分配给不同的Redis实例,对于Redis实例来说,他们只会存储部分的Redis数据,当然,槽的数据是可以迁移的,不听的实例之间可以通过一定的协议,进行数据迁移

image-20210321133537139

槽

  • Redis集群的功能限制;Redis集群相对于单机在功能上存在一些限制,需要开发人员提前了解,在使用时做好规避;JAVA CRC16校验算法

image-20210321134106832

image-20210321134132985

  • 可以批量操作 支持有限
+ 类似mset、mget操作,目前只支持对具有相同slot值的key执行批量操作;对于映射为不同slot值的key由于执行mget、mset等操作可能存在于多个节点上,因此不被支持
  • key事务操作 支持有限
+ 只支持多key在同一节点上的事务操作,当多个key分布在不同节点上时无法使用事务功能
  • 可以作为数据分区的最小粒度
  • 不能将一个大的键值对象 如 hash、list等映射到不同的节点
  • 不支持多数据库空间
+ 单机下的Redis可以支持16个数据库(db0 - db15),集群模式下只能使用一个数据库空间,即db0
  • 复制结构,只支持一层
+ 从节点只能复制主节点,不支持嵌套树状复制结构
  • 命令大多会重定向,耗时多

image-20210321134605852

一致性hash

  • 一致性hash可以很好地解决稳定性的问题,可以将所有的存储节点排列在首尾相接的Hash环上,每个key在计算Hash后会顺时针找到临接的存储节点存放;而当有节点加入或退出时,仅仅影响该节点在Hash环上顺时针相邻的后续节点

image-20210321135249822

  • Hash倾斜

如果节点很少,容易出现倾斜,负载不均衡问题;一致性哈希算法,引入了虚拟节点,在整个环上,均衡增加若干个节点;比如a1、a2、b1、b2、c1、c2;a1和a2都是属于A节点的,解决了hash倾斜问题

部署Cluster

创建6个redis节点

  • 三主三从方式,从为了同步备份,主进行slot数据分片

image-20210321143120398

+ 创建目录与配置,并启动容器



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
shell复制代码for port in $(seq 7001 7006); \
do \
mkdir -p /var/mall/redis/node-${port}/conf
touch /var/mall/redis/node-${port}/conf/redis.conf
cat <<EOF>> /var/mall/redis/node-${port}/conf/redis.conf
port ${port}
cluster-enabled yes
cluster-config-file nodes.conf
cluster-announce-ip 192.168.83.133
cluster-announce-port ${port}
cluster-announce-bus-port 1${port}
appendonly yes
EOF
docker run -p ${port}:${port} -p 1${port}:1${port} --name redis-${port} \
--restart=always \
-v /var/mall/redis/node-${port}/data:/data \
-v /var/mall/redis/node-${port}/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server /etc/redis/redis.conf; \
done
![image-20210321142255590](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/a6f8a1f9ddda5456bf0aee305b80679292a0c594b3c0910ab727113029144551) ![image-20210321142412697](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/4e6559be72477eadc1605e27f8e45f4c0aa5c44e522c629a339ded6bcb945e93) ![image-20210321142345881](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/0197beb83acbf29f51e203704beb0b6d3d564889b35beea6238686c6e9e10751)
1
2
3
4
shell复制代码#停止这6个redis容器
docker stop $(docker ps -a |grep redis-700 | awk '{print $1}')
#删除这6个redis容器
docker rm $(docker ps -a | grep redis-700 | awk '{print $1}')

使用redis建立集群

  • 进入任一master节点的redis容器
1
2
3
4
shell复制代码#进入容器
docker exec -it redis-7001 /bin/bash
#建立集群,如果不指定master,会自己随机指定
redis-cli --cluster create 192.168.83.133:7001 192.168.83.133:7002 192.168.83.133:7003 192.168.83.133:7004 192.168.83.133:7005 192.168.83.133:7006 --cluster-replicas 1

image-20210321143726656

输入yes,接受配置,集群搭建完成

image-20210321143940702

测试redis集群

  • 连接redis客户端,保存数据(分片存储)
1
2
3
4
5
6
bash复制代码#集群模式下,比单机多一个 -c
redis-cli -c -h 192.168.83.133 -p 7001

set hello 1
set a aaa
set bb aa

image-20210321144535451

  • 查看集群状态
1
复制代码cluster info

image-20210321145716188

  • 查看节点信息
1
复制代码cluster nodes

image-20210321145818299

  • 模拟一个主节点宕机
1
2
3
4
5
6
7
8
9
10
bash复制代码docker stop redis-7001
#进入7002容器
docker exec -it redis-7002 /bin/bash
#连接redis客户端
redis-cli -c -h 192.168.83.133 -p 7002
#获取之前存在redis7001 上的 hello的值,这时会发现,7001的副本(从机)会替代宕机的7001主机
#7005被集群提升为主节点
get hello
#查看节点信息
cluster nodes

image-20210321150203048

image-20210321150530220

  • 这时,宕机的7001再次启动,观察节点的变化(故障切换)
1
2
3
4
bash复制代码#重启redis-7001容器
docker restart redis-7001
#查看节点信息
cluster nodes

image-20210321151029146

ElasticSearch集群搭建

es集群原理

  • ElasticSearch是天生支持集群的,它不需要依赖其他的服务发现和注册的组件,如zookeeper这些,因为它内置了一个名字叫作ZenDiscovery的模块,是ElasticSearch自己实现的一套用于节点发现和选主等功能的组件,所以ElasticSearch做起集群来非常简单,不需要太多额外的配置的安装额外的第三方组件

单节点

  • 一个运行中的ElasticSearch实例称为一个节点,而集群是由一个或者多个拥有相同cluster.name配置的节点组成;它们共同承担数据和负载的压力,当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据
  • 当一个节点被选举称为主节点时,它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等;而主节点并不需要涉及到文档级别的变更和搜索等操作,所有当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈,任何节点都可以成为主节点;我们的示例集群就只有一个节点,所以它同时也成为了主节点
  • 作为用户,我们可以将请求发送到集群中的任何节点,包括主节点;每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点;无论我们将请求发送到哪个子节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回给客户端,ElasticSearch对这一切的管理都是透明的

集群健康

  • ElasticSearch的集群监控信息中包含了许多的统计数据,其中最为重要的一项就是集群健康,它在status字段中展示为green、yellow或者red
1
bash复制代码GET /_cluster/health

status:指示这当前集群在总体上是否工作正常,它的三种颜色含义如下:

​ green:所有的主分片和副本分片都正常运行

​ yellow:所有的主分片都正常运行,但不是所有的副本分片都正常运行

​ red:有主分片没能正常运行

分片

  • 一个分片是一个底层的工作单元,它仅保存了全部数据中的一部分;我们的文档被存储和索引到分片内,但是应用程序是直接与索引而不是与分片进行交互;分片就任务是一个数据区
  • 一个分片可以是主分片或者副本分片;索引内任意一个文档都归属于一个主分片,所有主分片的数目决定着索引能够保存的最大数据量
  • 在索引建立的时候就已经确定了主分片数,但是副本分片数可以随时修改
  • 让我们在包含一个空节点的集群内创建名为blogs的索引,索引在默认情况下会被分配5个主分片,但是为了演示目的:我们将分配3个主分片和一份副本(每个主分片拥有一个副本分片)
1
2
3
4
5
6
bash复制代码PUT /blogs{
"settings":{
"number_of_shards": 3,
"number_of_replicas": 1
}
}
  • 拥有一个索引的单节点集群

image-20210321154512225

此时集群的健康状况为 yellow 则表示全部 主分片都正常运行(集群可以正常服务所有请求),但是副本分片没有全部处在正常状态;实际上,所有三个副本分片都是 unassigned — 它们都没有被分配带任何节点;

在同一个节点上既保存原始数据又保存副本是没有意义的,因为一旦失去了那个节点,我们也将丢失该节点上的所有副本数据

当我们的集群是正常运行的,但是在硬件故障是有丢失数据的风险

新增节点

  • 当你在同一台机器上启动了第二个节点时,只有它和第一个节点有同样的cluster.name配置,它就会自动发现集群并加入到其中;但是在不同机器上启动节点的时候,为了加入到同一集群,你需要配置一个可连接到的单播主机列表;详细信息请查看最好使用单播代替组播
  • 拥有两个节点的集群 – 所有主分片和副本分片都已被分配

image-20210321155401965

此时,cluster-health现在展示的状态为green,这表示所有6个分片(包括3个主分片和3个副本分片)都在正常运行;我们的集群现在不仅仅是正常运行的,并且还始终处于可用状态

水平扩容 – 启动第三个节点

  • 用于三个节点的集群 – 为了分散负载而对分片进行重新分配

image-20210321155829355

Node1 和 Node2 上各有一个分片被迁移到新的 Node3 节点,现在每个节点上都拥有2个分片,而不是之前的3个;这表示每个节点的硬件资源(CPU、RAM、I/O)将被更少的分片所共享,每个分片的性能将会得到提升

在运行的集群上可以动态调整副本分片数目的,我们可以按需伸缩集群;让我们把副本数从默认的1增加到2

1
2
3
4
bash复制代码PUT /blogs/_settings
{
"number_of_replicas": 2
}

blogs 索引现在拥有9个分片:3个主分片和6个副本分片;这意味着我们可以将集群扩容到9个节点,每个节点上一个分片;相比原来的3个节点时,集群的搜索性能可以提升3倍

image-20210321160529979

应对故障

  • 关闭一个节点后的集群

image-20210321160617533

  • 我们关闭的节点是一个主节点;而集群必须拥有一个主节点来保证正常工作,所以发生的第一件事情就是选举一个新的主节点:Node2
  • 在我们关闭Node1的同时也失去了主分片1和2,并且在缺失主分片的时候索引也不能正常工作;如果此时来检查集群的状况,我们看到的状态将会是red:不是所有主分片都在正常工作
  • 幸运的是:在其它节点上存在着这两个主分片的完整副本,所以新的主节点立即将这些分片在Node2和Node3上对应的副本提升为主分片,此时集群的状态将会为yellow,这个提升主分片的过程是瞬间发生的,如同按下一个开关一般
  • 为什么我们集群状态是yellow而不是green呢?虽然我们拥有所有的三个主分片但是同时设置了每个主分片需要对应的两个副本分片,而此时只存在一份副本分片;所以集群不是green的状态

不过我们不必过于担心:如果我们同样关闭了Node2,我们的程序依然可以保持在不丢失任何数据的情况下运行,因为Node3为每一个分片都保留着一份副本

  • 如果我们重新启动Node1,集群可以将缺失的副本分片再次进行分配;如果Node1依然拥有着之前的分片,它将尝试去重用它们,同时仅从主分片复制发生了修改的数据文件

问题解决

主节点
  • 主节点负责创建索引、删除索引、分配分片、追踪集群中的节点状态等工作;ElasticSearch中的主节点的工作量相对较轻,用户的请求可以发往集群中任何一个节点,由该节点负责分发和返回结果,而不需要经过主节点转发;而主节点是由候选主节点通过ZenDiscovery机制选举出来的,所有想要成为主节点,首先要先成为候选主节点
候选主节点
  • 在ElasticSearch集群初始化或者主节点宕机的情况下,由候选主节点中选举其中一个作为主节点,指定候选主节点的配置为:node.master:true

当主节点负载压力过大,或者集群环境中的网络问题,导致其他节点与主节点的通讯的时候,主节点没来得及响应,这样的话,某些节点就认为主节点宕机,重新选择新的主节点,这样的话整个集群的工作就有问题了;比如我们集群中有10个节点,其中7个候选主节点,1个候选主节点成为了主节点,这种情况是正常情况;但是如果现在出现了上述我们所说的主节点响应不及时,导致其他某些节点任认为主节点宕机而重选主节点,那就有问题了,这剩下的6个候选主节点可能有3个候选主节点去重选主节点,最后集群中就出现了两个主节点的情况,这种情况官方称为脑裂现象

集群中不同的节点对应master的选择出现了分歧,出现了多个master竞争,导致主分片和副本的识别也发生了分歧,对一些分歧中的分片标识为了坏片

数据节点
  • 数据节点负责数据的存储与相关具体操作,比如CRUD、搜索、聚合;所以,数据节点对机器的配置要求比较高,首先需要有足够的磁盘空间来存储数据,其次数据操作对系统CPU、Memory和IO的性能消耗都很大;通常随着集群的扩大,需要增加更多的数据节点来提高可用性;指定数据节点的配置:node.data:true

ElasticSearch是允许一个节点既做候选主节点也做数据节点的,但是数据节点的负载较重,所以需要考虑将二者分离开,设置专用的候选主节点和数据节点,避免因数据节点负载重导致主节点不响应

客户端节点
  • 客户端节店就是既不做候选主节点也不做数据节点的节点,只负责请求的分发、汇总等等,但是这样的工作,其实任何一个节点都可以完成,因为在ElasticSearch中一个集群内的节点都可以执行任何请求,其会负责将请求转发给对应的节点进行处理;所以单独增加这样的节点更多的是为了负载均衡;指定该节点的配置为:

node.master:false

node.data:false

脑裂问题可能的成因
  • 1、网络问题:集群键的网络延迟导致了一些节点访问不到master,任务master节点挂掉了从而选举出新的master,并对master上的分片和副本标红,分配新的主分片
  • 2、节点负载:主节点的角色既为master又为data,访问量较大时可能会导致ES停止响应造成大面积延迟,此时其他节点等不到主节点的响应认为主节点挂掉了,会重新选取主节点
  • 3、内存回收:data节点上的ES进程占用的内存较大,引发JVM的大规模内存回收,造成ES进程失去响应
  • 脑裂问题解决方案:
    • 角色分离:即master节点与data节点分离,限制角色;数据节点是需要承担存储和搜索的工作的,压力会很大;所以如果该节点同时作为候选主节点和数据节点,那么一旦选上它作为主节点了,这时主节点的工作压力会非常大,出现脑裂现象的概率就增加了
    • 减少误判:配置主节点的响应时间,在默认情况下,主节点3秒没有响应,其他节点就任务主节点宕机看,那我们可以把该时间设置的长一点,该配置是:discover.zen.ping_timeout:5
    • 选举触发:discovery.zen.minimum_master_nodes:1(默认是1),该属性定义的是为了形成一个集群,有主节点资格并互相连接的节点的最小数目
      • 一个有10个节点的集群,且每个节点都有成为主节点的资格,discovery.zen.minimum_master_nodes参数设置为6
      • 正常情况下,10个节点,互相连接,大于6,就可以形成一个集群
      • 若某个时刻,其中有3个节点断开连接,剩下7个节点,大于6,继续运行之前的集群,而断开的3个节点,小于6,不能形成一个集群
      • 该参数就是为了防止脑裂的产生
      • 建议设置为(候选主节点数 / 2)+ 1

集群结构

  • 以三台物理机为例,在这三台物理机上,搭建了6个ES的节点,三个data节点,三个master节点(每台物理机分别起了一个data和一个master),3个master节点,目的是达到(n/2)+1等于2的要求,这样挂掉一台master后(不考虑data),n等于2,满足参数,其他两个master节点都认为master节点挂掉之后开始重新选举
1
2
3
4
5
6
7
8
ini复制代码#master节点上
node.master=true
node.data=false
discovery.zen.minimum_master_nodes=2

#data节点上
node.master=falsse
node.data=true

image-20210322091009844

es集群搭建

  • es集群:三个主节点,三个数据节点

image-20210322092736711

  • 所有操作之前先运行
1
2
3
4
5
6
ini复制代码#防止JVM报错
#临时修改
sysctl -w vm.max_map_count=262144
#永久修改
echo vm.max_map_count=262144>>/etc/sysctl.conf
sysctl -p

准备docker网络

  • Docker创建容器时默认采用bridge网络,自行分配ip,不允许自己指定

在实际部署中,我们需要指定容器ip,不允许其自行分配ip,尤其是搭建集群时,固定ip是必须的;我们可以创建自己的bridge网络:mynet,创建容器的时候指定网络为mynet并指定ip即可

查看网络模式:docker network ls

创建一个新的bridge网络

1
2
3
4
ini复制代码#创建mynet
docker network create --driver bridge --subnet=172.18.12.0/16 --gateway=172.18.1.1 mynet
#查看网络信息
docker network inspect mynet

image-20210322101740636

  • 以后使用 --network=mynet --ip 172.18.12.x,指定ip

创建三个Msater节点

  • 脚本生成配置并启动容器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
shell复制代码#discovery.zen.minimum_master_nodes:2 
#设置这个参数来保证集群中的节点可以知道其他N个有master资格的节点;官方推荐 (N/2)+ 1
for port in $(seq 1 3); \
do \
mkdir -p /var/mall/elasticsearch/master-${port}/config
mkdir -p /var/mall/elasticsearch/master-${port}/data
chmod -R 777 //var/mall/elasticsearch/master-${port}
cat<<EOF> /var/mall/elasticsearch/master-${port}/config/jvm.options
-Xms128m
-Xmx128m
EOF
cat<<EOF> /var/mall/elasticsearch/master-${port}/config/elasticsearch.yml
cluster.name: my-es #集群的名称,同一个集群值必须设置成一样的
node.name: es-master-${port} #该节点的名字
node.master: true #该节点有机会成为master节点
node.data: false #该节点可以存储数据
network.host: 0.0.0.0
http.host: 0.0.0.0 #所有http均可访问
http.port: 920${port}
transport.tcp.port: 930${port}
discovery.zen.ping_timeout: 10s #设置集群中自动发现其他节点时ping的超时时间
#设置集群中的Master节点的初始化列表,可以通过这些节点来自动发现其他新加入集群的节点,es7的新增配置
discovery.seed_hosts: ["172.18.12.21:9301","172.18.12.22:9302","172.18.12.23:9303"]
cluster.initial_master_nodes: ["172.18.12.21"] #新集群初始时的候选主节点,es7新增
EOF
docker run -d --name=es-node-${port} \
-p 920${port}:920${port} -p 930${port}:930${port} --network mynet --ip 172.18.12.2${port} \
-v /var/mall/elasticsearch/master-${port}/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /var/mall/elasticsearch/master-${port}/config/jvm.options:/usr/share/elasticsearch/config/jvm.options \
-v /var/mall/elasticsearch/master-${port}/data:/usr/share/elasticsearch/data \
-v /var/mall/elasticsearch/master-${port}/plugins:/usr/share/elasticsearch/plugins \
elasticsearch:7.6.1;\
done
1
2
3
4
perl复制代码#停止全部es容器
docker stop $(docker ps -a |grep es-node-*|awk '{print $1}')
#删除全部es容器
docker rm -f $(docker ps -a |grep es-node-*|awk '{print $1}')

image-20210322143827246

创建三个Data节点

  • 创建脚本与Master不同之处

image-20210322144802351

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
shell复制代码for port in $(seq 4 6); \
do \
mkdir -p /var/mall/elasticsearch/node-${port}/config
mkdir -p /var/mall/elasticsearch/node-${port}/data
chmod -R 777 //var/mall/elasticsearch/node-${port}
cat<<EOF> /var/mall/elasticsearch/node-${port}/config/jvm.options
-Xms128m
-Xmx128m
EOF
cat<<EOF> /var/mall/elasticsearch/node-${port}/config/elasticsearch.yml
cluster.name: my-es #集群的名称,同一个集群值必须设置成一样的
node.name: es-node-${port} #该节点的名字
node.master: false #该节点有机会成为master节点
node.data: true #该节点可以存储数据
network.host: 0.0.0.0
http.host: 0.0.0.0 #所有http均可访问
http.port: 920${port}
transport.tcp.port: 930${port}
discovery.zen.ping_timeout: 10s #设置集群中自动发现其他节点时ping的超时时间
#设置集群中的Master节点的初始化列表,可以通过这些节点来自动发现其他新加入集群的节点,es7的新增配置
discovery.seed_hosts: ["172.18.12.21:9301","172.18.12.22:9302","172.18.12.23:9303"]
cluster.initial_master_nodes: ["172.18.12.21"] #新集群初始时的候选主节点,es7新增
EOF
docker run -d --name=es-node-${port} \
-p 920${port}:920${port} -p 930${port}:930${port} --network mynet --ip 172.18.12.2${port} \
-v /var/mall/elasticsearch/node-${port}/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /var/mall/elasticsearch/node-${port}/config/jvm.options:/usr/share/elasticsearch/config/jvm.options \
-v /var/mall/elasticsearch/node-${port}/data:/usr/share/elasticsearch/data \
-v /var/mall/elasticsearch/node-${port}/plugins:/usr/share/elasticsearch/plugins \
elasticsearch:7.6.1;\
done
1
2
3
4
5
bash复制代码#查看es容器日志信息
#主节点
docker logs es-node-1
#数据节点
docker logs es-node-4

image-20210322160539096

image-20210322161711340

检查es集群的工作状况

  • 1、浏览器访问 192.168.83.133:9201 -- 9206 都可以正常访问

image-20210322161918898

  • 2、部分命令
+ `/_nodes/process?pretty` :查看节点状况
+ `/_cluster/stats?pretty`:查看集群状态
+ `/_cluster/health?pretty`:查看集群健康状况
+ `/_cat/nodes`:查看各个节点信息


![image-20210322162615183](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/15c60736505abde59fb8843cbb07acf1e5efdd4150ccc72f303d4c8e706d1970)


![image-20210322162628675](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/90063cfb30ba6b4658279256e47f7f279652df2c0d84c3e83cb709fc853fa31b)


    - 这时可以看到 主节点是 `es-master-3`,尝试停止 `es-master-3`容器,观察结果:


    `docker stop es-node-3`


    **会发现触发选主机制,`es-master-1`节点被选举为主节点**


    ![image-20210322163232346](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/f1d290666389d83c3b719fc3da9950ca8175f7d5884fb55c9a39754669b6880c)


    再次启动 `es-master-3`,这时集群会认为这是一个新添节点,`es-master-1`节点仍然是主节点


    `docker restart es-node-3`


    ![image-20210322163708884](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/452ab0f8c1ad5f93ec54c881a4b6f3d4e40edeba5b89b5b658f8468b37c9df2e)

RabbitMQ集群搭建

集群形式

  • RabbitMQ是Erlang开发的,集群非常方便,因为Erlang天生就是一门分布式语言,但其本身并不支持负载均衡
  • RabbitMQ集群中节点包括内存节点(RAM)、磁盘节点(Disk、消息持久化),集群中至少有一个Disk节点

普通模式(默认)

  • 对于普通模式,集群中各个节点有相同的队列结构,但消息只会存在于集群中的一个节点;对于消费者来说,若消息进入A节点的Queue中,当从B节点拉取时,RabbitMQ会将消息从A中取出,并经过B发送给消费者
  • 应用场景:该模式各适合于消息无需持久化的场合,如日志队列;当队列非持久化,且创建该队列的节点宕机,客户端才可以重连集群其他节点,并重新创建队列;若为持久化,只能等故障节点恢复

镜像模式

  • 与普通模式不同之处是消息实体会自动在镜像节点间同步,而不是在取数据时临时拉取,高可用;该模式下,mirror queue 有一套选举算法,即1个master、n个salve,生产者、消费者的请求都会转至master
  • 应用场景:可靠性要求较高场合,如下单、库存队列

缺点:若镜像队列过多,且消息体量大,集群内部的网络带宽将会被此种同步通讯所消耗

+ 1、镜像集群也是基于普通集群,即只有先搭建普通集群,然后才能设置镜像队列
+ 2、若消费过程中,master挂掉,则选举新master,若未来得及确认,则可能会重复消费

搭建MQ集群

  • 创建目录
1
2
3
bash复制代码mkdir -p /var/mall/rabbitmq/rabbitmq01
mkdir -p /var/mall/rabbitmq/rabbitmq02
mkdir -p /var/mall/rabbitmq/rabbitmq03
  • 启动mq容器
1
2
3
4
5
6
7
8
9
10
11
shell复制代码#rabbitmq01
docker run -d --hostname rabbitmq01 --name rabbitmq01 -v /var/mall/rabbitmq/rabbitmq01:/var/lib/rabbitmq -p 15671:15672 -p 5671:5672 -e RABBITMQ_ERLANG_COOKIE='mall' rabbitmq:management

#rabbitmq02
docker run -d --hostname rabbitmq02 --name rabbitmq02 -v /var/mall/rabbitmq/rabbitmq02:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='mall' --link rabbitmq01:rabbitmq01 rabbitmq:management

#rabbitmq03
docker run -d --hostname rabbitmq03 --name rabbitmq03 -v /var/mall/rabbitmq/rabbitmq03:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 -e RABBITMQ_ERLANG_COOKIE='mall' --link rabbitmq01:rabbitmq01 --link rabbitmq02:rabbitmq02 rabbitmq:management

# --hostname 设置容器的主机名
RABBITMQ_ERLANG_COOKIE 节点认证作用,部署集群时,需要同步该值

image-20210323083243934

节点加入集群

  • 进入rabbitmq01容器
1
2
3
4
5
6
7
bash复制代码docker exec -it rabbitmq01 /bin/bash

#清空、初始化mq
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

image-20210323084521683

  • 进入第二个节点 rabbitmq02
1
2
3
4
5
6
7
8
perl复制代码docker exec -it rabbitmq02 /bin/bash

#清空、初始化mq,并加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit

image-20210323084847937

  • 进入第二个节点 rabbitmq03
1
2
3
4
5
6
7
8
perl复制代码docker exec -it rabbitmq03 /bin/bash

#清空、初始化mq,并加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit

image-20210323085104178

默认集群是普通集群,普通集群存在单点故障的隐患

image-20210323085245997

实现镜像集群

  • 进入任一mq容器
1
2
3
4
makefile复制代码docker exec -it rabbitmq01 /bin/bash
#可以使用 rabbitmqctl list_policies -p /; 查看vhost/下面所有policy
#设置策略(所有队列都是高可用模式并且自动同步)
rabbitmqctl set_policy -p / ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
1
2
3
css复制代码#在cluster中任意节点启用策略,策略会自动同步到集群节点
rabbitmqctl set_policy -p / ha-all "^" '{"ha-mode":"all"}'
#策略模式all即复制到所有节点,包含新增节点,策略正则表达式为"^",表示匹配所有

image-20210323090646337

验证镜像集群

  • 在master节点上创建队列

理论结果:rabbitmq02和rabbitmq03 会自动同步队列

image-20210323090830419

image-20210323090853712

image-20210323090918770

  • 测试数据同步

image-20210323091024286

image-20210323091125956

image-20210323091208086

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Android内存泄露检测 LeakCanary20(Ko

发表于 2021-03-30

本文介绍了开源Android内存泄漏监控工具LeakCanary2.0版本的实现原理,同时介绍了新版本新增的hprof文件解析模块的实现原理,包括hprof文件协议格式、部分实现源码等。

一、概述

LeakCanary是一款非常常见的内存泄漏检测工具。经过一系列的变更升级,LeakCanary来到了2.0版本。2.0版本实现内存监控的基本原理和以往版本差异不大,比较重要的一点变化是2.0版本使用了自己的hprof文件解析器,不再依赖于HAHA,整个工具使用的语言也由Java切换到了Kotlin。本文结合源码对2.0版本的内存泄漏监控基本原理和hprof文件解析器实现原理做一个简单地分析介绍。

LeakCanary官方链接:square.github.io/leakcanary/

1.1 新旧差异

1.1.1 .接入方法

新版: 只需要在gradle配置即可。

1
2
3
4
arduino复制代码dependencies {
// debugImplementation because LeakCanary should only run in debug builds.
debugImplementation 'com.squareup.leakcanary:leakcanary-android:2.5'
}

旧版: 1)gradle配置;2)Application 中初始化 LeakCanary.install(this) 。

敲黑板:

1)Leakcanary2.0版本的初始化在App进程拉起时自动完成;

2)初始化源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码internal sealed class AppWatcherInstaller : ContentProvider() {

/**
* [MainProcess] automatically sets up the LeakCanary code that runs in the main app process.
*/
internal class MainProcess : AppWatcherInstaller()

/**
* When using the `leakcanary-android-process` artifact instead of `leakcanary-android`,
* [LeakCanaryProcess] automatically sets up the LeakCanary code
*/
internal class LeakCanaryProcess : AppWatcherInstaller()

override fun onCreate(): Boolean {
val application = context!!.applicationContext as Application
AppWatcher.manualInstall(application)
return true
}
//....
}

3)原理:ContentProvider的onCreate在Application的onCreate之前执行,因此在App进程拉起时会自动执行 AppWatcherInstaller 的onCreate生命周期,利用Android这种机制就可以完成自动初始化;

4)拓展:ContentProvider的onCreate方法在主进程中调用,因此一定不要执行耗时操作,不然会拖慢App启动速度。

1.1.2 整体功能

Leakcanary2.0版本开源了自己实现的hprof文件解析以及泄漏引用链查找的功能模块(命名为shark),后续章节会重点介绍该部分的实现原理。

1.2 整体架构

Leakcanary2.0版本主要增加了shark部分。

二、源码分析

LeakCananry自动检测步骤:

  1. 检测可能泄漏的对象;
  2. 堆快照,生成hprof文件;
  3. 分析hprof文件;
  4. 对泄漏进行分类。

2.1 检测实现

自动检测的对象包含以下四类:

  • 销毁的Activity实例
  • 销毁的Fragment实例\
  • 销毁的View实例
  • 清除的ViewModel实例

另外,LeakCanary也会检测 AppWatcher 监听的对象:

1
arduino复制代码AppWatcher.objectWatcher.watch(myDetachedView, "View was detached")

2.1.1 LeakCanary初始化

)

AppWatcher.config :其中包含是否监听Activity、Fragment等实例的开关;

Activity的生命周期监听:注册 Application.ActivityLifecycleCallbacks ;

Fragment的生命周期期监听:同样,注册FragmentManager.FragmentLifecycleCallbacks ,但Fragment较为复杂,因为Fragment有三种,即android.app.Fragment、androidx.fragment.app.Fragment、android.support.v4.app.Fragment,因此需要注册各自包下的FragmentManager.FragmentLifecycleCallbacks;

ViewModel的监听:由于ViewModel也是androidx下面的特性,因此其依赖androidx.fragment.app.Fragment的监听;

监听Application的可见性:不可见时触发HeapDump,检查存活对象是否存在泄漏。有Activity触发onActivityStarted则程序可见,Activity触发onActivityStopped则程序不可见,因此监听可见性也是注册 Application.ActivityLifecycleCallbacks 来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
scss复制代码//InternalAppWatcher初始化
fun install(application: Application) {

......

val configProvider = { AppWatcher.config }
ActivityDestroyWatcher.install(application, objectWatcher, configProvider)
FragmentDestroyWatcher.install(application, objectWatcher, configProvider)
onAppWatcherInstalled(application)
}

//InternalleakCanary初始化
override fun invoke(application: Application) {
_application = application
checkRunningInDebuggableBuild()

AppWatcher.objectWatcher.addOnObjectRetainedListener(this)

val heapDumper = AndroidHeapDumper(application, createLeakDirectoryProvider(application))

val gcTrigger = GcTrigger.Default

val configProvider = { LeakCanary.config }
//异步线程执行耗时操作
val handlerThread = HandlerThread(LEAK_CANARY_THREAD_NAME)
handlerThread.start()
val backgroundHandler = Handler(handlerThread.looper)

heapDumpTrigger = HeapDumpTrigger(
application, backgroundHandler, AppWatcher.objectWatcher, gcTrigger, heapDumper,
configProvider
)
//Application 可见性监听
application.registerVisibilityListener { applicationVisible ->
this.applicationVisible = applicationVisible
heapDumpTrigger.onApplicationVisibilityChanged(applicationVisible)
}
registerResumedActivityListener(application)
addDynamicShortcut(application)

disableDumpHeapInTests()
}

2.1.2 如何检测泄漏

1)对象的监听者ObjectWatcher

ObjectWatcher 的关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
kotlin复制代码@Synchronized fun watch(
watchedObject: Any,
description: String
) {
if (!isEnabled()) {
return
}
removeWeaklyReachableObjects()
val key = UUID.randomUUID()
.toString()
val watchUptimeMillis = clock.uptimeMillis()
val reference =
KeyedWeakReference(watchedObject, key, description, watchUptimeMillis, queue)
SharkLog.d {
"Watching " +
(if (watchedObject is Class<*>) watchedObject.toString() else "instance of ${watchedObject.javaClass.name}") +
(if (description.isNotEmpty()) " ($description)" else "") +
" with key $key"
}

watchedObjects[key] = reference
checkRetainedExecutor.execute {
moveToRetained(key)
}
}

关键类KeyedWeakReference:弱引用WeakReference和ReferenceQueue的联合使用,参考KeyedWeakReference的父类

WeakReference的构造方法。

这种使用可以实现如果弱引用关联的的对象被回收,则会把这个弱引用加入到queue中,利用这个机制可以在后续判断对象是否被回收。

2)检测留存的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
kotlin复制代码private fun checkRetainedObjects(reason: String) {
val config = configProvider()
// A tick will be rescheduled when this is turned back on.
if (!config.dumpHeap) {
SharkLog.d { "Ignoring check for retained objects scheduled because $reason: LeakCanary.Config.dumpHeap is false" }
return
}

//第一次移除不可达对象
var retainedReferenceCount = objectWatcher.retainedObjectCount

if (retainedReferenceCount > 0) {
//主动出发GC
gcTrigger.runGc()
//第二次移除不可达对象
retainedReferenceCount = objectWatcher.retainedObjectCount
}

//判断是否还有剩余的监听对象存活,且存活的个数是否超过阈值
if (checkRetainedCount(retainedReferenceCount, config.retainedVisibleThreshold)) return

....

SharkLog.d { "Check for retained objects found $retainedReferenceCount objects, dumping the heap" }
dismissRetainedCountNotification()
dumpHeap(retainedReferenceCount, retry = true)
}

检测主要步骤:

  • 第一次移除不可达对象:移除 ReferenceQueue 中记录的KeyedWeakReference 对象(引用着监听的对象实例);
  • 主动触发GC:回收不可达的对象;
  • 第二次移除不可达对象:经过一次GC后可以进一步导致只有WeakReference持有的对象被回收,因此再一次移除ReferenceQueue 中记录的KeyedWeakReference 对象;
  • 判断是否还有剩余的监听对象存活,且存活的个数是否超过阈值;
  • 若满足上面的条件,则抓取Hprof文件,实际调用的是android原生的Debug.dumpHprofData(heapDumpFile.absolutePath) ;
  • 启动异步的HeapAnalyzerService 分析hprof文件,找到泄漏的GcRoot链路,这个也是后面的主要内容。

//HeapDumpTriggerprivate fun dumpHeap( retainedReferenceCount: Int, retry: Boolean ) { …. HeapAnalyzerService.runAnalysis(application, heapDumpFile) }

2.2 Hprof 文件解析

解析入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
kotlin复制代码//HeapAnalyzerService
private fun analyzeHeap(
heapDumpFile: File,
config: Config
): HeapAnalysis {
val heapAnalyzer = HeapAnalyzer(this)

val proguardMappingReader = try {
//解析混淆文件
ProguardMappingReader(assets.open(PROGUARD_MAPPING_FILE_NAME))
} catch (e: IOException) {
null
}
//分析hprof文件
return heapAnalyzer.analyze(
heapDumpFile = heapDumpFile,
leakingObjectFinder = config.leakingObjectFinder,
referenceMatchers = config.referenceMatchers,
computeRetainedHeapSize = config.computeRetainedHeapSize,
objectInspectors = config.objectInspectors,
metadataExtractor = config.metadataExtractor,
proguardMapping = proguardMappingReader?.readProguardMapping()
)
}

关于Hprof文件的解析细节,就需要牵扯到Hprof二进制文件协议:

hg.openjdk.java.net/jdk6/jdk6/j…

通过阅读协议文档,hprof的二进制文件结构大概如下:

)解析流程:

)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
kotlin复制代码fun analyze(
heapDumpFile: File,
leakingObjectFinder: LeakingObjectFinder,
referenceMatchers: List<ReferenceMatcher> = emptyList(),
computeRetainedHeapSize: Boolean = false,
objectInspectors: List<ObjectInspector> = emptyList(),
metadataExtractor: MetadataExtractor = MetadataExtractor.NO_OP,
proguardMapping: ProguardMapping? = null
): HeapAnalysis {
val analysisStartNanoTime = System.nanoTime()

if (!heapDumpFile.exists()) {
val exception = IllegalArgumentException("File does not exist: $heapDumpFile")
return HeapAnalysisFailure(
heapDumpFile, System.currentTimeMillis(), since(analysisStartNanoTime),
HeapAnalysisException(exception)
)
}

return try {
listener.onAnalysisProgress(PARSING_HEAP_DUMP)
Hprof.open(heapDumpFile)
.use { hprof ->
val graph = HprofHeapGraph.indexHprof(hprof, proguardMapping)//建立gragh
val helpers =
FindLeakInput(graph, referenceMatchers, computeRetainedHeapSize, objectInspectors)
helpers.analyzeGraph(//分析graph
metadataExtractor, leakingObjectFinder, heapDumpFile, analysisStartNanoTime
)
}
} catch (exception: Throwable) {
HeapAnalysisFailure(
heapDumpFile, System.currentTimeMillis(), since(analysisStartNanoTime),
HeapAnalysisException(exception)
)
}
}

LeakCanary在建立对象实例Graph时,主要解析以下几种tag:

涉及到的GCRoot对象有以下几种:

LeakCanary会根据Hprof文件构建一个HprofHeapGraph 对象,该对象记录了以下成员变量:

2.2.1 构建内存索引(Graph内容索引)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
kotlin复制代码interface HeapGraph {
val identifierByteSize: Int
/**
* In memory store that can be used to store objects this [HeapGraph] instance.
*/
val context: GraphContext
/**
* All GC roots which type matches types known to this heap graph and which point to non null
* references. You can retrieve the object that a GC Root points to by calling [findObjectById]
* with [GcRoot.id], however you need to first check that [objectExists] returns true because
* GC roots can point to objects that don't exist in the heap dump.
*/
val gcRoots: List<GcRoot>
/**
* Sequence of all objects in the heap dump.
*
* This sequence does not trigger any IO reads.
*/
val objects: Sequence<HeapObject> //所有对象的序列,包括类对象、实例对象、对象数组、原始类型数组

val classes: Sequence<HeapClass> //类对象序列

val instances: Sequence<HeapInstance> //实例对象数组

val objectArrays: Sequence<HeapObjectArray> //对象数组序列

val primitiveArrays: Sequence<HeapPrimitiveArray> //原始类型数组序列
}

为了方便快速定位到对应对象在hprof文件中的位置,LeakCanary提供了内存索引HprofInMemoryIndex :

  1. 建立字符串索引hprofStringCache(Key-value):key是字符ID,value是字符串;

作用: 可以根据类名,查询到字符ID,也可以根据字符ID查询到类名。
2. 建立类名索引classNames(Key-value):key是类对象ID,value是类字符串ID;

作用: 根据类对象ID查询类字符串ID。
3. 建立实例索引**instanceIndex(**Key-value):key是实例对象ID,value是该对象在hprof文件中的位置以及类对象ID;

作用: 快速定位实例的所处位置,方便解析实例字段的值。
4. 建立类对象索引classIndex(Key-value):key是类对象ID,value是其他字段的二进制组合(父类ID、实例大小等等);

作用: 快速定位类对象的所处位置,方便解析类字段类型。
5. 建立对象数组索引objectArrayIndex(Key-value):key是类对象ID,value是其他字段的二进制组合(hprof文件位置等等);

作用: 快速定位对象数组的所处位置,方便解析对象数组引用的对象。
6. 建立原始数组索引primitiveArrayIndex(Key-value):key是类对象ID,value是其他字段的二进制组合(hprof文件位置、元素类型等等);

2.2.2 找到泄漏的对象

1)由于需要检测的对象被

com.squareup.leakcanary.KeyedWeakReference 持有,所以可以根据

com.squareup.leakcanary.KeyedWeakReference 类名查询到类对象ID;

  1. 解析对应类的实例域,找到字段名以及引用的对象ID,即泄漏的对象ID;

2.2.3找到最短的GCRoot引用链

根据解析到的GCRoot对象和泄露的对象,在graph中搜索最短引用链,这里采用的是广度优先遍历的算法进行搜索的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
kotlin复制代码//PathFinder
private fun State.findPathsFromGcRoots(): PathFindingResults {
enqueueGcRoots()//1

val shortestPathsToLeakingObjects = mutableListOf<ReferencePathNode>()
visitingQueue@ while (queuesNotEmpty) {
val node = poll()//2

if (checkSeen(node)) {//2
throw IllegalStateException(
"Node $node objectId=${node.objectId} should not be enqueued when already visited or enqueued"
)
}

if (node.objectId in leakingObjectIds) {//3
shortestPathsToLeakingObjects.add(node)
// Found all refs, stop searching (unless computing retained size)
if (shortestPathsToLeakingObjects.size == leakingObjectIds.size) {//4
if (computeRetainedHeapSize) {
listener.onAnalysisProgress(FINDING_DOMINATORS)
} else {
break@visitingQueue
}
}
}

when (val heapObject = graph.findObjectById(node.objectId)) {//5
is HeapClass -> visitClassRecord(heapObject, node)
is HeapInstance -> visitInstance(heapObject, node)
is HeapObjectArray -> visitObjectArray(heapObject, node)
}
}
return PathFindingResults(shortestPathsToLeakingObjects, dominatedObjectIds)
}

1)GCRoot对象都入队;

2)队列中的对象依次出队,判断对象是否访问过,若访问过,则抛异常,若没访问过则继续;

3)判断出队的对象id是否是需要检测的对象,若是则记录下来,若不是则继续;

4)判断已记录的对象ID数量是否等于泄漏对象的个数,若相等则搜索结束,相反则继续;

5)根据对象类型(类对象、实例对象、对象数组对象),按不同方式访问该对象,解析对象中引用的对象并入队,并重复2)。

入队的元素有相应的数据结构ReferencePathNode ,原理是链表,可以用来反推出引用链。

三、总结

Leakcanary2.0较之前的版本最大变化是改由kotlin实现以及开源了自己实现的hprof解析的代码,总体的思路是根据hprof文件的二进制协议将文件的内容解析成一个图的数据结构,当然这个结构需要很多细节的设计,本文并没有面面俱到,然后广度遍历这个图找到最短路径,路径的起始就是GCRoot对象,结束就是泄漏的对象。至于泄漏的对象的识别原理和之前的版本并没有差异。

作者:vivo 互联网客户端团队-Li Peidong

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Golang 单元测试之路漫漫 Go主题月 总结

发表于 2021-03-29

作为 Gopher 的我,除了编写业务逻辑代码外,还需要写一大堆单元测试,这占了很大一部分的工作量,足以表明它的重要性。

项目结构

我们的项目结构应该是类似下面这样的,calc.go 会对应有一个 calc_test.go 的测试用例文件:

1
2
3
css复制代码example/
|--calc.go
|--calc_test.go

让我们来看下 calc.go 文件的内容:

1
2
3
4
5
go复制代码package example

func Add(a, b int) int {
return a + b
}

对应 calc_test.go 的测试用例可以是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码package example

import "testing"

func TestAdd(t *testing.T) {
if ans := Add(1, 2); ans != 3 {
t.Errorf("1 + 2 expected be 3, but %d got", ans)
}

if ans := Add(-1, -2); ans != -3 {
t.Errorf("-1 + -2 expected be -3, but %d got", ans)
}
}

我们可以运行 go test,这个 package 下所有的测试用例都会被执行。

go test 命令介绍

这里我们用到了 go test 命令,这个命令会自动读取源码目录下面名为 *_test.go 的文件,生成并运行测试用的可执行文件。

性能测试系统可以给出代码的性能数据,帮助测试者分析性能问题。

go test 参数说明:

  • -bench regexp 执行相应的 benchmarks,例如:-bench=.
  • -cover 可以查看覆盖率
  • -run regexp 只运行 regexp 匹配的函数,例如:-run Array 那么就执行包含有 Array 开头的函数,该参数支持通配符 *,和部分正则表达式,例如 ^、$
  • -v 显示测试的详细信息

例如执行某个文件里的所有测试用例,以及使用 -v 显示详细的信息

1
2
3
4
5
6
7
8
shell复制代码$ go test helloworld_test.go
ok command-line-arguments 0.003s
$ go test -v helloworld_test.go
=== RUN TestHelloWorld
--- PASS: TestHelloWorld (0.00s)
helloworld_test.go:8: hello world
PASS
ok command-line-arguments 0.004s

总结

本文简单介绍如何编写 Golang 的单元测试以及 go test 的基本用法,但 Golang 的单元测试远不如此,大家一定要保持学习!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

GO成神之路:接口interface|Go主题月

发表于 2021-03-29

接口interface

接口是一组方法签名,所有实现了该签名的子类都可以赋值给这个接口变量。

go中有两种接口的使用场景:1. 用作类型签名,2. 空接口(无方法签名)

用作类型签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码type Abser interface {
Abs() float64
}
type Vertex struct {
X, Y float64
}

func (v *Vertex) Abs() float64 {
return math.Sqrt(v.X*v.X + v.Y*v.Y)
}

func main() {
var a Abser
v := Vertex{3, 4}
a = &v // a *Vertex 实现了 Abser
// 下面一行,v 是一个 Vertex(而不是 *Vertex)
//v.Abs()调用时实际被转成了(&v).Abs()
// 所以没有实现 Abser
// 下面为错误代码
a = v

fmt.Println(a.Abs())
}

空接口

空接口就是没有任何方法签名的接口,它可以接收任意类型的值

1
2
3
4
5
6
7
8
9
go复制代码func main() {
var i interface{}
i = 1
i = 1.1
i = "1"
i = map[string]interface{}{}
i = []int{}
i = true
}

interface与nil

接口类似于下面这样一个结构体,一个接口变量记录了它实际指向的值和这个值的类型

1
2
3
4
go复制代码type interface struct{
data interface{}
type Type
}

下面这个例子,给一个接口类型赋值一个bool值,实际上接口内部存储了两个值,一个是具体的值,一个是类型

1
2
3
go复制代码var i interface{}
i = true
fmt.Printf("%v %T",i,i) // true bool

不要判断interface是否为nil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go复制代码func main() {

var i interface{}
if i == nil {
log.Println("is nil") // is nil
}
var ipeople *IPeople
i = ipeople
log.Printf("%v %T", i, i) // nil nil
var people *People
i = people
if people == nil {
log.Println("people is nil") // people is nil
}
if i == nil {
log.Println("i is nil")
} else {
log.Println("i is not nil") // i is not nil
log.Printf("%v %T", i, i) // nil *main.People
}
var people2 People

log.Printf("%v %T", people2, people2) //{} main.People
}

type People struct {
}

type IPeople interface{}

使用断言判断接口是否为nil

语法:v,ok:=i.(type)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
go复制代码func main() {
var i interface{}
i = 1

v, ok := i.(int)
log.Println(v, ok) // 1 true

var people *People
i = people
people, ok = i.(*People)
log.Println(people, ok) // <nil> true

var ipeople *IPeople
i = ipeople
ipeople, ok = i.(*IPeople)
log.Println(ipeople, ok) // <nil> true

var people1 People
var ipeople1 IPeople
i = people1
people1, ok = i.(People)
log.Println(ipeople, ok) // <nil> true
i = ipeople1
ipeople1, ok = i.(IPeople)
log.Println(ipeople1, ok) // <nil> false
if ok && ipeople==nil{
// do...
}
}
type People struct{}
type IPeople interface{}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Golang实践之博客(五) 读取配置文件|Go主题月 前言

发表于 2021-03-29

前言

为何要使用配置文件?

  1. 数据集中管理,便于日后的维护
  2. 避免硬编码,更改内容,无需重新编译

目标

  • go-ini的使用
  • 调整读取配置文件数据

go-ini

功能特性

  1. 支持覆盖加载多个数据源(file, []byte, io.Reader and io.ReadCloser)
  2. 支持递归读取键值
  3. 支持读取父子分区
  4. 支持读取自增键名
  5. 支持读取多行的键值
  6. 支持大量辅助方法
  7. 支持在读取时直接转换为 Go 语言类型
  8. 支持读取和 写入 分区和键的注释
  9. 轻松操作分区、键值和注释
  10. 在保存文件时分区和键值会保持原有的顺序

下载安装

1
shell复制代码go get gopkg.in/ini.v1

读取配置文件数据

新增ini配置文件

1
2
3
4
5
6
7
8
9
10
ini复制代码[server]
HTTP_PORT = :8888

[database]
TYPE = mysql
USER = root
PASSWORD =
#127.0.0.1:3306 数据库IP:数据库端口号
HOST = 127.0.0.1:3306
NAME = blog

新增ConfigService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
go复制代码package ConfigService

import (
"fmt"
"gopkg.in/ini.v1"
"os"
)

type DbInfo struct {
TYPE string
USER string
PASSWORD string
HOST string
NAME string
HTTP_PORT string
}

func GetAppConfig(key string) *DbInfo {
cfg, err := ini.Load("Config/app.ini")
if err != nil {
fmt.Printf("Fail to read file: %v", err)
os.Exit(1)
}
d := new(DbInfo)
_ = cfg.Section(key).MapTo(d)
return d
}

func GetServerConfig() *DbInfo {
return GetAppConfig("server")
}

func GetDbConfig() *DbInfo {
return GetAppConfig("database")
}

DbService连接字符串调整为读取配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
go复制代码package DbService

import (
"fmt"
"golang-blog/Model/Entity"
"golang-blog/Service/ConfigService"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

var Db *gorm.DB

func ConnectDb() {
var (
err error
)
dbConfig := ConfigService.GetAppConfig("database")
connectStr := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True&loc=Local", dbConfig.USER,
dbConfig.PASSWORD,
dbConfig.HOST, dbConfig.NAME)
Db, err = gorm.Open(mysql.Open(connectStr), &gorm.Config{})
if err != nil {
panic(err)
}

// 自动生成表结构
dbErr := Db.AutoMigrate(&Entity.UserEntity{})
if dbErr != nil {
println(err)
}
}

Routers端口号读取配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
go复制代码package Routers

import (
"github.com/gin-gonic/gin"
"golang-blog/Controller/HomeControoler"
"golang-blog/Service/ConfigService"
)

func Init(router *gin.Engine) {
home := router.Group("Home")

// 1.首位多余元素会被删除(../ or //);
//2.然后路由会对新的路径进行不区分大小写的查找;
//3.如果能正常找到对应的handler,路由就会重定向到正确的handler上并返回301或者307.(比如: 用户访问/FOO 和 /..//Foo可能会被重定向到/foo这个路由上)
router.RedirectFixedPath = true

{
home.GET("/", HomeControoler.Index)
home.GET("/Hi", HomeControoler.Hi)
}

serverConfig := ConfigService.GetServerConfig()
router.Run(serverConfig.HTTP_PORT) // 监听并在 127.0.0.1:8888 上启动服务
}

项目地址

github.com/panle666/Go…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

解锁管理EventBus注册新姿势——自定义注解+反射

发表于 2021-03-29

EventBus简介

官网对EventBus的介绍:

EventBus is an open-source library for Android and Java using the publisher/subscriber pattern for loose coupling. EventBus enables central communication to decoupled classes with just a few lines of code – simplifying the code, removing dependencies, and speeding up app development.

翻译过来就是:

EventBus是一个Android和Java的开源库,使用发布者/订阅者模式进行松散耦合。EventBus使中央通信能够仅用几行代码解耦类——简化代码,减少依赖项,并加快应用程序开发。

从介绍中我们可以了解到这是一个事件发布/订阅框架

使用EventBus有什么好处?

  • 简化组件之间的通信
  • 事件发送方和接收方解耦
  • 能够很好地使用UI组件(例如Activities, Fragments)和后台线程
  • 避免复杂且容易出错的依赖关系和生命周期问题
  • 速度快;专门为高性能而优化
  • 很小(~60k jar)
  • 实际安装量超过10,000,000个的应用程序证明了这一点
  • 具有切换线程、订阅者优先级等高级功能

EventBus自2012-07由greenrobot发布第一个版本,历经7年之久,现版本已更新至3.2。从事Android开发的伙伴就算没用过也一定听说过这个框架,因为它实在是太普及了。尽管目前也有许多新的事件总线框架出现,但是依旧阻挡不住我对它的热爱,作为经典的观察者模式实现,以及在Android平台的普及度,它也深受面试官们宠爱。

EventBus简单使用

这里不作EventBus的全面解析,只罗列下简单的使用,想必大家对于EventBus的使用已经是滚瓜烂熟了,但是这里还是要水一下

注册与解除注册

使用该框架进行事件的发布需要进行注册,不再使用时需要进行解除注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
kotlin复制代码class MainActivity : AppCompatActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 注册成为订阅者
EventBus.getDefault().register(this)
}

override fun onDestroy() {
super.onDestroy()
// 解除注册 不再接收事件
EventBus.getDefault().unregister(this)
}
}

发布事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码/**
* 自定义的事件类
*
* @property message String 消息
*
* @author Qu Yunshuo
* @since 3/29/21 8:05 PM
*/
data class SimpleEvent(val message: String)

/**
* 发布一个事件
*/
fun postEvent() {
EventBus.getDefault().post(SimpleEvent("Hello EventBus!"))
}

绑定接受事件的方法

1
2
3
4
5
6
7
kotlin复制代码/**
* 通过[Subscribe]注解进行注册,并且可以指定[ThreadMode]该方法执行的线程类型
*/
@Subscribe(threadMode = ThreadMode.MAIN)
fun onEvent(event: SimpleEvent) {
Log.d("qqq", "onEvent: ${event.message}")
}

使用自定义注解管理EventBus注册

至此就是发布与订阅的简单实用,更详细的使用在这里就不详细说明,不是本文的重点。

到这里就能发现,我们每个订阅者都需要进行手动的注册与解除注册,否则会产生异常。

那最理想的状态就是不用每一次都手动的注册与解除注册,而是能够自动完成。

常见封装方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kotlin复制代码/**
* 最常见的方式就是写在基类中,在[onCreate]方法中进行注册,在[onDestroy]方法中进行解除注册
* 这样实现类就可以自动的完成注册与解除注册,完全不用考虑忘记解除注册这种低级错误🙅
*/
open class BaseActivity : AppCompatActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
EventBus.getDefault().register(this)
}

override fun onDestroy() {
super.onDestroy()
EventBus.getDefault().unregister(this)
}
}

这种方式简单粗暴有效,但是弊端也很明显,不管子类需不需要注册都会自动帮你注册完,简直快乐的一批,当然这不是我们想要的效果。

那其实可以在这种方式上进行优化,比如写一个hook方法,让子类重写决定是否进行初始化,这也是一个不错的方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
kotlin复制代码open class BaseActivity : AppCompatActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 判断子类是否需要进行注册和解除注册
if (isRegisterEventBus()) EventBus.getDefault().register(this)
}

override fun onDestroy() {
super.onDestroy()
// 判断子类是否需要进行注册和解除注册
if (isRegisterEventBus()) EventBus.getDefault().unregister(this)
}

/**
* 是否注册EventBus
* @return Boolean
*/
open fun isRegisterEventBus(): Boolean = false
}

这也是一个不错的方案,让子类决定是否进行注册,避免一刀切全部注册的情况,代价只是多重写一个方法。下面介绍另外一种方案,使用自定义注解+反射。

自定义注解+反射

整体思路就是我们自定义一个注解,在需要进行注册的类上添加注解,在基类里进行判断当前子类是否使用了该注解从而决定是否进行注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
kotlin复制代码/**
* 进行标记需要进行注册EventBus
*
* @author Qu Yunshuo
* @since 3/29/21 8:33 PM
*/
@Target(AnnotationTarget.CLASS)
@kotlin.annotation.Retention(AnnotationRetention.RUNTIME)
annotation class EventBusRegister


/**
* Activity基类
*
* @author Qu Yunshuo
* @since 3/29/21 8:36 PM
*/
open class BaseActivity : AppCompatActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 获取到Class对象判断是否有EventBusRegister注解
if (javaClass.isAnnotationPresent(EventBusRegister::class.java)) {
EventBus.getDefault().register(this)
}
}

override fun onDestroy() {
super.onDestroy()
// 获取到Class对象判断是否有EventBusRegister注解
if (javaClass.isAnnotationPresent(EventBusRegister::class.java)) {
EventBus.getDefault().unregister(this)
}
}
}

以上是注解和基类的逻辑,十分的简单,需要注册时,只需要在添加该注解就ok

1
2
3
4
kotlin复制代码@EventBusRegister
class MainActivity : BaseActivity() {
override fun onCreate(savedInstanceState: Bundle?) {...}
}

是不是使用起来也是十分的简单,并不是说这种方案是最优的,本文只是介绍通过自定义注解+反射来实现封装EventBus注册的逻辑,get到以后就可以进行举一反三。

简单封装Utils

下面也放上简单封装的EventBus工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
kotlin复制代码/**
* EventBus工具类
*
* @author Qu Yunshuo
* @since 3/29/21 8:42 PM
*/
object EventBusUtils {

/**
* 订阅
* @param subscriber 订阅者
*/
fun register(subscriber: Any) = EventBus.getDefault().register(subscriber)

/**
* 解除注册
* @param subscriber 订阅者
*/
fun unRegister(subscriber: Any) = EventBus.getDefault().unregister(subscriber)

/**
* 发送普通事件
* @param event 事件
*/
fun postEvent(event: Any) = EventBus.getDefault().post(event)

/**
* 发送粘性事件
* @param stickyEvent 粘性事件
*/
fun postStickyEvent(stickyEvent: Any) = EventBus.getDefault().postSticky(stickyEvent)

/**
* 手动获取粘性事件
* @param stickyEventType 粘性事件
* @param <T> 事件泛型
* @return 返回给定事件类型的最近粘性事件
*/
fun <T> getStickyEvent(stickyEventType: Class<T>): T = EventBus.getDefault().getStickyEvent(stickyEventType)

/**
* 手动删除粘性事件
* @param stickyEventType 粘性事件
* @param <T> 事件泛型
* @return 返回给定事件类型的最近粘性事件
*/
fun <T> removeStickyEvent(stickyEventType: Class<T>): T = EventBus.getDefault().removeStickyEvent(stickyEventType)
}

结语

其实核心代码就那么几行,大多数开发者在日常工作中都没有用过自定义注解,本文借EventBus向大家展示了自定义注解+反射进行初始化的一个操作,其实还是十分的简单的。

技巧都是慢慢积累起来的,也许以后的开发过程中,就可以用这种方式去解决一些问题,这也是我一年前看博客发现的一个操作,当时我还在实习,心里想着竟然还有这种骚操作,今后一直到现在我都将这让种方法沿用至今,今天拿出来水了一篇,也是想分享下这个小技巧。

code 不只是工作,也会是热爱

另外有兴趣的可以看一下我刚毕业时写的第一篇文章:一个 Android MVVM 组件化架构框架

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

秒杀场景下如何保证数据一致性 什么是秒杀? 高并发场景下秒杀

发表于 2021-03-29

本文主要讨论秒杀场景的解决方案。

什么是秒杀?

从字面意思理解,所谓秒杀,就是在极短时间内,大量的请求涌入,处理不当时容易出现服务崩溃或数据不一致等问题的高并发场景。

常见的秒杀场景有淘宝双十一、网约车司机抢单、12306抢票等等。

高并发场景下秒杀超卖Bug复现

在这里准备了一个商品秒杀的小案例,

  1. 按照正常的逻辑编写代码,请求进来先查库存,库存大于0时扣减库存,然后执行其他订单逻辑业务代码;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码/**
* 商品秒杀
*/
@Service
public class GoodsOrderServiceImpl implements OrderService {

@Autowired
private GoodsDao goodsDao;

@Autowired
private OrderDao orderDao;

/**
* 下单
*
* @param goodsId 商品ID
* @param userId 用户ID
* @return
*/
@Override
public boolean grab(int goodsId, int userId) {
// 查询库存
int stock = goodsDao.selectStock(goodsId);
try {
// 这里睡2秒是为了模拟等并发都来到这,模拟真实大量请求涌入
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 库存大于0,扣件库存,保存订单
if (stock > 0) {
goodsDao.updateStock(goodsId, stock - 1);
orderDao.insert(goodsId, userId);
return true;
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Service("grabNoLockService")
public class GrabNoLockServiceImpl implements GrabService {

@Autowired
OrderService orderService;

/**
* 无锁的抢购逻辑
*
* @param goodsId
* @param userId
* @return
*/
@Override
public String grabOrder(int goodsId, int userId) {
try {
System.out.println("用户:" + userId + " 执行抢购逻辑");
boolean b = orderService.grab(goodsId, userId);
if (b) {
System.out.println("用户:" + userId + " 抢购成功");
} else {
System.out.println("用户:" + userId + " 抢购失败");
}
} finally {

}
return null;
}
}
  1. 库存设置为2个;

image.png

  1. 使用jmeter开10个线程压测。
  • 压测结果
    • 库存剩余: 1
      image.png
    • 抢购订单: 10
      image.png

出问题了!出大问题了!!

本来有两个库存,现在还剩一个,而秒杀成功的却有10个,出现了严重的超卖问题!

问题分析

问题其实很简单,当秒杀开始,10个请求同时进来,同时去查库存,发现库存=2,然后都去扣减库存,把库存变为1,秒杀成功,共卖出商品10件,库存减1。

那么怎么解决这个问题呢,说去来也挺简单,加锁就行了。

单机模式下的解决方案

加JVM锁

首先在单机模式下,服务只有一个,加JVM锁就OK,synchronized和Lock都可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码@Service("grabJvmLockService")
public class GrabJvmLockServiceImpl implements GrabService {

@Autowired
OrderService orderService;

/**
* JVM锁的抢购逻辑
*
* @param goodsId
* @param userId
* @return
*/
@Override
public String grabOrder(int goodsId, int userId) {
String lock = (goodsId + "");

synchronized (lock.intern()) {
try {
System.out.println("用户:" + userId + " 执行抢购逻辑");
boolean b = orderService.grab(goodsId, userId);
if (b) {
System.out.println("用户:" + userId + " 抢购成功");
} else {
System.out.println("用户:" + userId + " 抢购失败");
}
} finally {

}
}
return null;
}
}

这里以synchronized为例,加锁之后恢复库存重新压测,结果:

  • 压测结果
    • 库存剩余: 0
      image.png
    • 抢购订单: 2
      image.png

大功告成!

JVM锁在集群模式下还有效果吗?

单机模式下的问题解决了,那么在集群模式下,加JVM级别的锁还有效吗?

这里起了两个服务,并且加了一层网关,用来做负载均衡,重新压测,

  • 压测结果
    • 库存剩余: 0
      image.png
    • 抢购订单: 4
      image.png

答案是显而易见的,锁无效!!

集群模式下的解决方案

问题分析

出现这种问题的原因是,JVM级别的锁在两个服务中是不同的两把锁,两个服务各拿个的,各卖各的,不具有互斥性。

image.png

那怎么办呢?也好办,把锁独立出来就好了,让两个服务去拿同一把锁,也就是分布式锁。

image.png

分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。

在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,这个时候,便需要使用到分布式锁。

常见的分布式锁的实现方式有MySQL、Redis、Zookeeper等。

分布式锁–MySQL

MySQL实现锁的方案是:准备一张表作为锁,

  • 加锁时将要抢购的商品ID作为主键或者唯一索引插入作为锁的表中,这样其他线程来加锁时就会插入失败,从而保证互斥性;
  • 解锁时将这条记录删除,其他线程可以继续加锁。

按照上面的方案,编写的部分代码:

  • 锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
java复制代码/**
* MySQL写的分布式锁
*/
@Service
@Data
public class MysqlLock implements Lock {

@Autowired
private GoodsLockDao goodsLockDao;

private ThreadLocal<GoodsLock> goodsLockThreadLocal;

@Override
public void lock() {
// 1、尝试加锁
if (tryLock()) {
System.out.println("尝试加锁");
return;
}
// 2.休眠
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3.递归再次调用
lock();
}

/**
* 非阻塞式加锁,成功,就成功,失败就失败。直接返回
*/
@Override
public boolean tryLock() {
try {
GoodsLock goodsLock = goodsLockThreadLocal.get();
goodsLockDao.insert(goodsLock);
System.out.println("加锁对象:" + goodsLockThreadLocal.get());
return true;
} catch (Exception e) {
return false;
}
}

@Override
public void unlock() {
goodsLockDao.delete(goodsLockThreadLocal.get().getGoodsId());
System.out.println("解锁对象:" + goodsLockThreadLocal.get());
goodsLockThreadLocal.remove();
}

@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub

}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}

@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
  • 抢购逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码@Service("grabMysqlLockService")
public class GrabMysqlLockServiceImpl implements GrabService {

@Autowired
private MysqlLock lock;

@Autowired
OrderService orderService;

ThreadLocal<GoodsLock> goodsLock = new ThreadLocal<>();

@Override
public String grabOrder(int goodsId, int userId) {
// 生成key
GoodsLock gl = new GoodsLock();
gl.setGoodsId(goodsId);
gl.setUserId(userId);
goodsLock.set(gl);
lock.setGoodsLockThreadLocal(goodsLock);

// lock
lock.lock();

// 执行业务
try {
System.out.println("用户:"+userId+" 执行抢购逻辑");

boolean b = orderService.grab(goodsId, userId);
if(b) {
System.out.println("用户:"+userId+" 抢购成功");
}else {
System.out.println("用户:"+userId+" 抢购失败");
}
} finally {
// 释放锁
lock.unlock();
}
return null;
}
}

恢复库存后继续压测,结果符合预期,数据一致。

  • 压测结果
    • 剩余库存:0
      image.png
    • 抢购成功:2
      image.png

问题与解决方案

  1. 由于突然断网等原因,导致锁没有释放成功怎么办?

答:在作为锁的表中加开始时间、结束时间两个字段作为锁的有效期,由于各种原因导致锁没有及时释放时,可以根据有效期进行判断锁是否有效。

  1. 给锁加了有效期后,若有效期结束,线程任务还没有执行完毕怎么办?

答:可以引入watch dog机制,在任务未执行结束前,给锁续期,这个在后面再详细解释。

分布式锁–Redis

在一些中小型项目中可以使用MySQL方案,在大型项目中,给MySQL的配置加上去也可以使用,但用的最多的还是Redis。

Redis加锁的实现方式是使用setnx命令,格式:setnx key value。

setnx是「set if not exists」的缩写;若key不存在,则将key的值设置为value;当key存在时,不做任何操作。

  • 加锁:setnx key value
  • 解锁:del key

Redis分布式锁–死锁问题

产生原因

已经加锁的服务在执行过程中挂掉了,没有来得及释放锁,锁一直存在在Redis中,导致其他服务无法加锁。

解决方案

设置key的过期时间,让key自动过期,过期后,key就不存在了,其他服务就能继续加锁。

  • 要注意的是,添加过期时间时,不能使用这种方式:
1
2
sh复制代码setnx key value;
expire key time_in_second;

这种方式也可能在第一句setnx成功后挂掉,过期时间没有设置,导致死锁。

  • 有效的方案是通过一行命令加锁并设置过期时间,格式如下:
1
sh复制代码set key value nx ex time_in_second;

这种方式在 Redis 2.6.12 版本开始支持,老版本的Redis可以使用LuaScript。

过期时间引发的问题

问题一:假设锁过期时间设置为10秒,服务1加锁后执行10秒还未结束,此时锁过期了,服务2来加锁也能成功,导致两个服务同时拿到锁。

问题二:服务1在执行了14秒后结束去释放锁,会把服务2加的锁释放掉,此时服务3又能加锁成功。

解决方案

问题二容易解决,在释放锁的时候判断一下是不是自己加的锁,如果是自己加的锁,就释放;如果不是则略过。

问题一解决方案:就是上面说的 Watch Dog(看门狗)机制

简单的理解就是另起一个子线程(看门狗),帮主线程看着过期时间,当主线程在执行业务逻辑没有结束时,过期时间每过三分之一,子线程(看门狗)就把过期时间续满,从而保证主线程没有结束,锁就不会过期。

  • Watch Dog(看门狗)机制的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@Service
public class RenewGrabLockServiceImpl implements RenewGrabLockService {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Override
@Async
public void renewLock(String key, String value, int time) {
System.out.println("续命"+key+" "+value);
String v = redisTemplate.opsForValue().get(key);
// 写成死循环,加判断
if (StringUtils.isNotBlank(v) && v.equals(value)){
int sleepTime = time / 3;
try {
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisTemplate.expire(key,time,TimeUnit.SECONDS);
renewLock(key,value,time);
}
}

Redis单节点故障

如果执行过程中Redis挂掉了,所有服务来加锁都加不上锁,这就是单节点故障问题。

image.png

解决方案

使用多台Redis。

首先来分析一个问题,多台Redis之间可以做主从吗?

image.png

Redis主从问题

当一个线程加锁成功后,key还没有被同步过去,Redis Master节点挂了,此时Slave节点中没有key的存在,另一个服务来加锁依然可以加锁成功。

image.png

所以,不能使用主从方案。

还有一种方案是红锁。

红锁

红锁方案也是使用多台Redis,但是多台Redis之间没有任何关系,就是独立的Redis。

加锁时,在一台Redis上加锁成功后,马上去下一台Redis上加锁,最终若在过半的Redis上加锁成功,则加锁成功,否则加锁失败。

image.png

红锁会不会出现超卖问题?

会!。

如果运维小哥很勤快,做了自动化,Redis挂掉之后,马上重启了一台,那么重启的Redis里没有之前加锁的key,其他线程依然能够加锁成功,这就导致两个线程同时拿到锁。

image.png

  • 解决方案:延迟重启挂掉的Redis,延迟一天启动也没有问题,重启太快才会有问题。

终极问题

到现在为止程序已经完美了吗?

并没有!

当程序在执行的时候,锁也加上了,狗(watch dog)也开始不停的续期,一切看似很美好,但是Java里还有一个终极问题–STW(Stop The World)。

当遇到FullGC时,JVM会发生STW(Stop The World),此时,世界被按下了暂停键,执行任务的主线程暂停了,用来续期的狗(watch dog)也不会再续期,Redis中的锁会慢慢过期,当锁过期之后,其他JVM又可以来成功加锁,原来的问题又出现了,同时有两个服务拿到锁。

image.png

解决方案
  • 方案一: 鸵鸟算法
  • 方案二: 终极方案 – Zookeeper+MySQL乐观锁

分布式锁–Zookeeper+MySQL乐观锁

Zookeeper是怎么解决STW问题的呢?

  • 加锁时,在zookeeper中创建一个临时顺序节点,创建成功后zookeeper会生成一个序号,将这个序号存到MySQL中的verson字段做校验;
+ 如果锁未释放,发生了`STW`,紧接着锁过期,其他服务去加锁后,会将MySQL中的version字段变掉;
  • 解锁时,验证version字段是否是自己加锁时的内容
+ 如果是,删除节点,释放锁;
+ 如果不是,说明自己已经昏睡过了,执行失败。

世界变得清静了。

相关代码

  • gitee: distributed-lock

参考

马士兵教育

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…695696697…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%