开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

安全权限框架 —— Shiro(一)

发表于 2021-08-26

这是我参与8月更文挑战的第26天,活动详情查看:8月更文挑战

1.简介

  • Apache Shiro 是Java的一个安全(权限)框架;
  • Shiro 可以非常容易的开发出足够好的应用,其不仅可以用在JavaSE环境,也可以用在JavaEE环境;
  • 主要功能:认证(Authentication)、授权(Authorization)、加密、回话管理、与Web集成、缓存等;

Shiro 架构

  • Subject:应用代码直接交互的对象是Subject,也就是说Shiro的对外API 核心就是Subject。Subject 代表了当前“用户”,这个用户不一定是一个具体的人,与当前应用交互的任何东西都是Subject,如网络爬虫,机器人等;与Subject 的所有交互都会委托给SecurityManager;Subject 其实是一个门面,SecurityManager才是实际的执行者;
  • SecurityManager:安全管理器;即所有与安全有关的操作都会与SecurityManager交互;且其管理着所有Subject;可以看出它是Shiro的核心,它负责与Shiro的其他组件进行交互,它相当于SpringMVC中DispatcherServlet的角色
  • Realm:Shiro从Realm 获取安全数据(如用户、角色、权限),就是说SecurityManager要验证用户身份,那么它需要从Realm 获取相应的用户进行比较以确定用户身份是否合法;也需要从Realm 得到用户相应的角色/权限进行验证用户是否能进行操作;可以把Realm 看成DataSource

2.与Spring整合开发

2.1 Shiro认证

2.1.1 配置文件

web.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
xml复制代码<!-- Shiro 过滤器定义 -->  
<filter>
<filter-name>shiroFilter</filter-name>
<filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
<init-param>
<!-- 该值缺省为 false,表示生命周期由 SpringApplicationContext;
设置为 true 表示由 ServletContainer 管理
-->
<param-name>targetFilterLifecycle</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>shiroFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

applicationContext.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
xml复制代码    <!-- 1.安全管理器SecurityManager -->
<bean id="securityManager" class="org.apache.shiro.web.mgt.DefaultWebSecurityManager">
<property name="realm" ref="myRealm"/>
</bean>

<!--2.配置CacheManager
2.1 需要加入ehcache 的jar包,及配置文件
-->
<bean id="cacheManager" class="org.apache.shiro.cache.ehcache.EhCacheManager">
<property name="cacheManagerConfigFile" value="classpath:ehcache.xml"/>
</bean>

<!-- 3.自定义Realm
-->
<bean id="myRealm" class="com.xiaojian.shiro.realms.MyRealm"/>


<!-- 4.保证实现了Shiro内部lifecycle函数的bean执行
可以来调用IOC容器中 shiro bean 生命周期方法
-->
<bean id="lifecycleBeanPostProcessor" class="org.apache.shiro.spring.LifecycleBeanPostProcessor"/>

<!-- 5.开启Shiro注解,必须配置了lifecycleBeanPostProcessor -->
<bean class="org.springframework.aop.framework.autoproxy.DefaultAdvisorAutoProxyCreator" depends-on="lifecycleBeanPostProcessor"/>
<bean class="org.apache.shiro.spring.security.interceptor.AuthorizationAttributeSourceAdvisor">
<property name="securityManager" ref="securityManager"/>
</bean>

<!-- 5.Shiro过滤器
id 必须和web.xml文件中配置DelegatingFilterProxy的<filter-name>一致
因为Shiro会在 IOC容器中查询和 <filter-name> 名字对应的 filter bean
-->
<bean id="shiroFilter" class="org.apache.shiro.spring.web.ShiroFilterFactoryBean">
<!-- Shiro的核心安全接口,这个属性是必须的 -->
<property name="securityManager" ref="securityManager"/>
<!-- 身份认证失败,则跳转到登录页面的配置 -->
<property name="loginUrl" value="/login.jsp"/>
<property name="successUrl" value="success.jsp"/>
<property name="unauthorizedUrl" value="unauth.jsp"/>

<!-- Shiro连接约束配置,即过滤链的定义 -->
<property name="filterChainDefinitions">
<value>
/login.jsp=anon
/**=authc
</value>
</property>
</bean>

MyRealm.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码public class MyRealm extends AuthorizingRealm{

@Resource
private BloggerService bloggerService;

/**
* AuthorizationInfo:角色的权限集合
* 获取授权信息
*/
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
// TODO Auto-generated method stub
return null;
}

/**
* AuthenticationInfo:用户的角色集合
* 登录验证
* token: 令牌,基于用户名密码的名牌
*/
@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {
System.out.println("执行认证逻辑");

// 假设用户名、密码
String username = "xiaojian";

// 1.转换类型,获取Token
UsernamePasswordToken token = (UsernamePasswordToken)authenticationToken;
// 2.对比用户名
if(!token.getUsername().equals(username)){
// 用户名不存在
return null; //// Shiro底层会抛出 UnknownAccountException 异常
}
// 3.根据用户情况,构建AuthenticationInfo对象返回。通常使用实现类:SimpleAuthenticationInfo
// 3.1 principal:认证的实体信息,可以是username,也可以是对应的实体类对象
Object principal = null;
// 3.2 hashedCredentials:密码
Object hashedCredentials = "1234"; //a20b8e682a72eeac0049847855cecb86
// 3.3 realName:当前realm对象的name,调用父类的getName()方法
String realmName = getName();

SimpleAuthenticationInfo simpleAuthenticationInfo = null;
simpleAuthenticationInfo = new SimpleAuthenticationInfo(principal,hashedCredentials,realmName);

return simpleAuthenticationInfo;
}
}

2.1.2 Shiro中默认的过滤器

过滤器类 过滤器名称 例子
anon 无参,匿名可访问 /login.jsp=anon
authc 无参,需认证(登录)才能访问 /admin/**=authc
user 无参,表示必须存在用户
perms 可有多个参数,多个时必须加上引号,参数间用逗号分隔。当有多个参数时必须每个参数都通过,相当于isPermitedAll()方法 /admin/*=perms[user:add] /admin/**=perms[“user:add,user:update”]
roles 角色过滤器,判断当前用户是否指定角色。规则同上。相当于hasAllRoles()方法 /admin/**=roles[“admin,guest”]
logout 注销登录时,完成一定的功能:任何现有的session都将会失效,而且任何身份都将失去关联(web程序中,RememberMe cookie也将会被删除)

2.1.3 URL匹配模式

使用Ant风格模式,Ant路径通配符支持 ?、*、**,通配符匹配不包括目录分隔符 “/“

  • ? : 匹配一个字符,如:/admin?,匹配:/admin1;不匹配:/admin123, /admin/
  • *:匹配零个或多个字符串或一个路径
  • **:匹配路径中的零个或多个路径

2.1.4 URL匹配顺序

​ 第一次匹配优先的方式。所以一般 /* 的路径访问都放在后面。

2.1.5 Shiro加密

Shiro 认证中密码比对:使用AuthorizingRealm中的 credentialsMatcher 进行的密码比对。

1、md5加密(不可逆的)

(1).如何把一个字符串加密为MD5;

(2).替换当前 Realm的credentialsMatcher 属性,直接使用HashedCredentialsMather对象,并设置加密算法。

applicationContext.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码    <!-- 3.自定义Realm
实现了 Realm 接口的bean
-->
<bean id="myRealm" class="com.xiaojian.shiro.realms.MyRealm">
<property name="credentialsMatcher">
<bean class="org.apache.shiro.authc.credential.HashedCredentialsMatcher">
<!--加密方式-->
<property name="hashAlgorithmName" value="MD5"/>
<!--加密的次数-->
<property name="hashIterations" value="1024"/>
</bean>
</property>
</bean>

这时候服务器会自动将浏览器传来的密码使用 MD5加密,加密1024次。

2、md5盐值加密

applicationContext.xml不变

(1). doGetAuthenticationInfo方法返回值创建SimpleAuthenticationInfo,使用构造器:

​ SimpleAuthenticationInfo(principal, hashedCredentials, credentialsSalt, realmName);

(2). 使用ByteSource.Util.bytes(username);加密盐值

(3). 盐值需唯一,一般使用随机字符串、user id

(4). 可以使用 new SimpleHash(hashAlgorithmName, credentials, salt, hashIterations); 计算盐值加密后的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码   /**
* 执行认证逻辑
*/
@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {
System.out.println("执行认证逻辑");

// 1.转换类型,获取Token
UsernamePasswordToken token = (UsernamePasswordToken)authenticationToken;
String username = token.getUsername();

//3.根据用户情况,构建AuthenticationInfo对象返回,通常使用实现类 SimpleAuthenticationInfo
// 3.1 principal:认证的实体信息,可以是username,也可以是对应的实体类对象
Object principal = username;
// 3.2 hashedCredentials:密码
Object hashedCredentials = null; //a20b8e682a72eeac0049847855cecb86
// 2.对比用户名、密码
if("user".equals(username)){
hashedCredentials = "3e042e1e3801c502c05e13c3ebb495c9";
} else if("admin".equals(username)){
hashedCredentials = "c34af346c89b8b03438e27a32863c9b5";
}
// 3.3 credentialsSalt:加密盐值
ByteSource credentialsSalt = ByteSource.Util.bytes(username);
// 3.4 realName:当前realm对象的name,调用父类的getName()方法
String realmName = getName();

SimpleAuthenticationInfo simpleAuthenticationInfo = new SimpleAuthenticationInfo(principal, hashedCredentials, credentialsSalt, realmName);

return simpleAuthenticationInfo;
}

MD5Test.java

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class MD5Test {
public static void main(String[] args) {
String hashAlgorithmName = "MD5";
Object credentials = "1234";
Object salt = "admin";
int hashIterations = 1024;

Object result = new SimpleHash(hashAlgorithmName, credentials, salt, hashIterations);
System.out.println(result);
}
}

2.1.6 多Reaml

为什么使用多 Realm ?

​ 根据不同的登录需求,需要做不同的验证。如:手机号登录、邮箱登录。。。

使用 ModularRealmAuthenticator类,属性Collection realms;注入 Realm集合

SecondRealm.java 同MyRealm类,将验证密码改为 SHA1加密后的值。

applicationContext.xml

1
2
3
4
5
6
7
8
9
10
xml复制代码    <bean id="secondRealm" class="com.xiaojian.shiro.realms.SecondRealm">
<property name="credentialsMatcher">
<bean class="org.apache.shiro.authc.credential.HashedCredentialsMatcher">
<!--加密方式-->
<property name="hashAlgorithmName" value="SHA1"/>
<!--加密的次数-->
<property name="hashIterations" value="1024"/>
</bean>
</property>
</bean>
认证策略 (AuthenticationStrategy)

AuthenticationStrategy 接口的默认实现:

  • FirstSuccessfulStrategy: 还要有一个 Realm 验证成功即可,只返回第一个 Realm 身份验证成功的认证信息,其他的忽略;
  • AtLeastOneSuccessfulStrategy: 只要有一个 Realm 验证成功即可,和 FirstSuccessfulStrategy 不同,将返回所有 Realm 身份验证成功的认证信息;
  • AllSuccessfulStrategy: 所有 Realm 验证成功才算成功,且返回所有 Realm 身份验证成功的认证信息,如果有一个失败就失败了。
  • ModularRealmAuthenticator 默认是 AtLeastOneSuccessfulStrategy 策略

配置:

1
2
3
4
5
6
7
8
9
10
11
xml复制代码<bean id="authenticator" class="org.apache.shiro.authc.pam.ModularRealmAuthenticator">
<!-- <property name="realms">-->
<!-- <list>-->
<!-- <ref bean="myRealm"/>-->
<!-- <ref bean="secondRealm"/>-->
<!-- </list>-->
<!-- </property>-->
<property name="authenticationStrategy">
<bean class="org.apache.shiro.authc.pam.AllSuccessfulStrategy"/>
</property>
</bean>

使用多 Realm 后,可以把 authenticator 配置给 SecurityManager

通常将所有的 Realm 配置给 安全管理器SecurityManager

1
2
3
4
5
6
7
8
9
10
11
12
xml复制代码之后
<!-- 1.安全管理器SecurityManager -->
<bean id="securityManager" class="org.apache.shiro.web.mgt.DefaultWebSecurityManager">
<property name="authenticator" ref="authenticator"/>

<property name="realms">
<list>
<ref bean="myRealm"/>
<ref bean="secondRealm"/>
</list>
</property>
</bean>

2.2 Shiro 授权

多 Realm 实现授权时,有一个通过,都是授权通过。

1
2
3
xml复制代码Shiro过滤器添加角色拦截    
/user.jsp = roles[user]
/admin.jsp = roles[admin]

MyRealm.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码/**
* 执行授权逻辑
*/
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principalCollection) {
System.out.println("MyRealm 执行授权逻辑");
// 1. 从 principalCollection 获取用户信息 (是用户名还是用户对象,取决于你认证时 principal参数 放入的是啥)
Object principal = principalCollection.getPrimaryPrincipal();
// 2. 利用登录用户的信息来获取当前用户的角色或权限
Set<String> roles = new HashSet<>();
roles.add("user");
if("admin".equals(principal)){
roles.add("admin");
}
// 3. 创建 SimpleAuthorizationInfo,并设置其 roles 属性
SimpleAuthorizationInfo authorizationInfo = new SimpleAuthorizationInfo();
authorizationInfo.setRoles(roles);

// 4. 返回 SimpleAuthorizationInfo
return authorizationInfo;
}

2.3 Shiro 标签

Shiro 提供了 JSTL 标签用于在 JSP 页面进行权限控制,如根据登录用户显示相应的页面按钮。

标签 描述 示例
guest 用户没有身份验证时显示相应信息,即游客信息 image-20200307234952932
user 用户已经经过认证/记住我登录后显示相应的信息 image-20200308000528385
authenticated 用户已经身份验证通过,即 Subject.login登录成功,不是记住我登录的 image-20200308000656696
notAuthenticated 用户未进行身份验证,即没有调用Subject.login进行登录,包括记住我自动登录的也属于未进行身份验证。 image-20200308000937683
pincipal 显示用户身份信息,默认调用 image-20200308001012588
hasRole 如果当前Subject 有角色将显示body 体内容 image-20200308001040891
hasAnyRoles 如果当前Subject有任意一个角色(或的关系)将显示body体内容。 image-20200308001147878
lacksRole 如果当前Subject 没有角色将显示body 体内容 image-20200308001256323
hasPermission 如果当前Subject 有权限将显示body 体内容 image-20200308001314089
lacksPermission 如果当前Subject没有权限将显示body体内容 image-20200308001326991

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Springboot 解决跨域的四种姿势

发表于 2021-08-26

简介

跨域我就不多说了,我们今天开门见山直接解决跨域的几种姿势,那就上姿势

姿势

姿势一

实现WebMvcConfigurer#addCorsMappings的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kotlin复制代码import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class CorsConfig implements WebMvcConfigurer {

@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("*")
.allowedMethods("GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS")
.allowCredentials(true)
.maxAge(3600)
.allowedHeaders("*");
}
}

姿势二

重新注入CorsFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter;

/**
* 解决跨域
*/
@Configuration
public class CorsFilterConfig {


/**
* 开启跨域访问拦截器
*
* @date 2021/4/29 9:50
*/
@Bean
public CorsFilter corsFilter() {
//创建CorsConfiguration对象后添加配置
CorsConfiguration corsConfiguration = new CorsConfiguration();
//设置放行哪些原始域
corsConfiguration.addAllowedOrigin("*");
//放行哪些原始请求头部信息
corsConfiguration.addAllowedHeader("*");
//放行哪些请求方式
corsConfiguration.addAllowedMethod("*");

UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
//2. 添加映射路径
source.registerCorsConfiguration("/**", corsConfiguration);
return new CorsFilter(source);
}
}

姿势三

创建一个filter解决跨域

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
less复制代码@Slf4j
@Component
@WebFilter(urlPatterns = { "/*" }, filterName = "headerFilter")
public class HeaderFilter implements Filter {
@Override
public void doFilter(ServletRequest request, ServletResponse resp, FilterChain chain) throws IOException, ServletException {
HttpServletResponse response = (HttpServletResponse) resp;
//解决跨域访问报错
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Methods", "POST, PUT, GET, OPTIONS, DELETE");
//设置过期时间
response.setHeader("Access-Control-Max-Age", "3600");
response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, client_id, uuid, Authorization");
// 支持HTTP 1.1.
response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
// 支持HTTP 1.0. response.setHeader("Expires", "0");
response.setHeader("Pragma", "no-cache");
// 编码
response.setCharacterEncoding("UTF-8");
chain.doFilter(request, resp);
}

@Override
public void init(FilterConfig filterConfig) {
log.info("跨域过滤器启动");
}

@Override
public void destroy() {
log.info("跨域过滤器销毁");
}
}

姿势四

使用CrossOrigin 注解

可以使用在单个方法上也可以使用在类上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
typescript复制代码Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CrossOrigin {

/** @deprecated as of Spring 5.0, in favor of {@link CorsConfiguration#applyPermitDefaultValues} */
@Deprecated
String[] DEFAULT_ORIGINS = {"*"};

/** @deprecated as of Spring 5.0, in favor of {@link CorsConfiguration#applyPermitDefaultValues} */
@Deprecated
String[] DEFAULT_ALLOWED_HEADERS = {"*"};

/** @deprecated as of Spring 5.0, in favor of {@link CorsConfiguration#applyPermitDefaultValues} */
@Deprecated
boolean DEFAULT_ALLOW_CREDENTIALS = false;

/** @deprecated as of Spring 5.0, in favor of {@link CorsConfiguration#applyPermitDefaultValues} */
@Deprecated
long DEFAULT_MAX_AGE = 1800;


/**
* Alias for {@link #origins}.
*/
@AliasFor("origins")
String[] value() default {};

/**
* A list of origins for which cross-origin requests are allowed. Please,
* see {@link CorsConfiguration#setAllowedOrigins(List)} for details.
* <p>By default all origins are allowed unless {@code originPatterns} is
* also set in which case {@code originPatterns} is used instead.
*/
@AliasFor("value")
String[] origins() default {};

/**
* Alternative to {@link #origins()} that supports origins declared via
* wildcard patterns. Please, see
* @link CorsConfiguration#setAllowedOriginPatterns(List)} for details.
* <p>By default this is not set.
* @since 5.3
*/
String[] originPatterns() default {};

/**
* The list of request headers that are permitted in actual requests,
* possibly {@code "*"} to allow all headers.
* <p>Allowed headers are listed in the {@code Access-Control-Allow-Headers}
* response header of preflight requests.
* <p>A header name is not required to be listed if it is one of:
* {@code Cache-Control}, {@code Content-Language}, {@code Expires},
* {@code Last-Modified}, or {@code Pragma} as per the CORS spec.
* <p>By default all requested headers are allowed.
*/
String[] allowedHeaders() default {};

/**
* The List of response headers that the user-agent will allow the client
* to access on an actual response, other than "simple" headers, i.e.
* {@code Cache-Control}, {@code Content-Language}, {@code Content-Type},
* {@code Expires}, {@code Last-Modified}, or {@code Pragma},
* <p>Exposed headers are listed in the {@code Access-Control-Expose-Headers}
* response header of actual CORS requests.
* <p>The special value {@code "*"} allows all headers to be exposed for
* non-credentialed requests.
* <p>By default no headers are listed as exposed.
*/
String[] exposedHeaders() default {};

/**
* The list of supported HTTP request methods.
* <p>By default the supported methods are the same as the ones to which a
* controller method is mapped.
*/
RequestMethod[] methods() default {};

/**
* Whether the browser should send credentials, such as cookies along with
* cross domain requests, to the annotated endpoint. The configured value is
* set on the {@code Access-Control-Allow-Credentials} response header of
* preflight requests.
* <p><strong>NOTE:</strong> Be aware that this option establishes a high
* level of trust with the configured domains and also increases the surface
* attack of the web application by exposing sensitive user-specific
* information such as cookies and CSRF tokens.
* <p>By default this is not set in which case the
* {@code Access-Control-Allow-Credentials} header is also not set and
* credentials are therefore not allowed.
*/
String allowCredentials() default "";

/**
* The maximum age (in seconds) of the cache duration for preflight responses.
* <p>This property controls the value of the {@code Access-Control-Max-Age}
* response header of preflight requests.
* <p>Setting this to a reasonable value can reduce the number of preflight
* request/response interactions required by the browser.
* A negative value means <em>undefined</em>.
* <p>By default this is set to {@code 1800} seconds (30 minutes).
*/
long maxAge() default -1;

以上四种姿势都学会了么?学会了三连哦

可以关注公众号,学习更多的姿势
扫码_搜索联合传播样式-标准色版.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

「一探究竟」Java SPI机制

发表于 2021-08-26

⚠️本文为掘金社区首发签约文章,未获授权禁止转载

事件起因

七月中旬,我司的系统潜在风险排查工作在如火如荼的进行,其中我发现当前系统的调用源缺少Token信息,难以做到具体的识别和监控,因此需要对其优化。

针对刚提到的两个问题,我只需要实现某个框架基类,然后做一点业务处理即可,根据框架的说明文档,按步骤实现以下内容即可:

开发、调试一气呵成之后,我对这种实现方式起了好奇之心,非常疑惑它们是如何在框架中实例化并发挥作用的,有兴趣的话就跟我一起一探究竟吧(😜)

什么是SPI

最初我甚至都不知道这种技术/方案是Java自身支持的,还以为是框架自身设计的骚操作,后来询问其他同事才知晓这种灵活的提供服务能力的方式被称为SPI,官方一点的解释如下:

SPI:全称为 Service Provider Interface。是Java提供的一套用来被第三方实现或者扩展的接口,多用于框架扩展、插件开发等等。

例如上文中提到的实现参数过滤器就属于框架扩展范畴,简单了解后我们来整一个小Demo吧。

SPI的工作方式

SPI的发现能力是不需要依赖于其他类库,主要有两种实现方式:

  • sun.misc.Service Sun公司提供的加载能力
  • java.util.ServiceLoader#load JDK自身提供的加载能力

因为方法二是JDK内部代码,包含源码,因此后续都默认使用该方法进行说明

基本使用步骤:

  1. 定义一个需要对外提供能力的接口
1
2
3
java复制代码public interface SPIInterface {
String handle();
}
  1. 定义实现类,实现指定接口
1
2
3
4
5
6
java复制代码public class SPIInterfaceImpl implements SPIInterface {
@Override
public String handle() {
return "当前时间为: " + LocalDateTime.now();
}
}
  1. 在指定位置配置相关的实现类:resource/META-INF/services

注意 resource为资源文件

1
2
3
bash复制代码# 文件位置(resource/META-INF/services/com.mine.spi.SPIInterface)
# 内容(实现类的全类名)
com.mine.spi.impl.SPIInterfaceImpl
  1. 使用JDK提供的初始化能力,直接调用即可
1
2
3
4
5
6
7
8
9
10
11
java复制代码public class SpiApp {
public static void main(String[] args) {
ServiceLoader<SPIInterface> load = ServiceLoader.load(SPIInterface.class);
for (SPIInterface ser : load) {
System.out.println(ser.handle());
}
}
}

// 响应
// 当前时间为: 2021-08-24T03:30:52.397

简单到爆炸,关键还是在于JDK已经帮助我们实现了这一套发现和初始化的步骤,下面咱们来深入分析一下它的基本源码 😁

从方法:java.util.ServiceLoader#load 为入口,将当前接口Class类型及其类加载器传入至Loader变量中:

1
2
3
4
5
6
7
8
9
10
11
java复制代码/**
* service:接口类型
* loader:类加载器
* acc:安全管理器
*/
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
reload();
}

变量传入之后,初始化类:LazyIterator,从名称就可以看出来这是一个懒加载的迭代器,只有真正使用触发时才会进行实例的初始化,核心初始化逻辑在方法:java.util.ServiceLoader.LazyIterator#nextService中。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码private S nextService() {
// 省略其他代码...

Class<?> c = null;
try {
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}

// 省略其他代码...
}

因为拿到了接口类型及其全类名,所以通过反射构建出实例对象还是非常容易的,拿到实例化的对象后,就和普通的代码没有什么区别了。

下面我们再看看几个框架实际使用SPI的例子,瞻仰一下前辈们的代码 😎

SPI使用案例分析

Log4j-Api

以Log4j日志框架为例,log4j-api-2.13.3.jar 版本就基于 SPI实现了 PropertySource接口,用以收集当前服务器相关的配置信息,如下图所示:

同样的,log4j-core-2.13.3.jar基于 SPI实现了日志门面的绑定,核心代码如下所示:

1
2
3
4
5
6
7
8
java复制代码/**
* Binding for the Log4j API.
*/
public class Log4jProvider extends Provider {
public Log4jProvider() {
super(10, "2.6.0", Log4jContextFactory.class);
}
}

JDBC驱动

以我们常用的JDBC驱动 mysql-connector-java-5.1.43.jar为例,它同样实现了SPI接口,驱动类分别为:Driver,FabricMySQLDriver,其底层实现是向驱动管理类注册自身,核心代码如下,它帮我们自动做了 Class.forName("com.mysql.jdbc.Driver")这一步加载动作。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class Driver extends NonRegisteringDriver implements java.sql.Driver {
//
// Register ourselves with the DriverManager
//
static {
try {
java.sql.DriverManager.registerDriver(new Driver());
} catch (SQLException E) {
throw new RuntimeException("Can't register driver!");
}
}
}

FabricMySQLDriver类则同理,当然了,我们也可以主动破坏这种加载的机制,比如自行实现一个MySQLDriver,来实现数据库连接,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class CustomDriver extends NonRegisteringDriver implements Driver {

static {
try {
java.sql.DriverManager.registerDriver(new CustomDriver());
} catch (SQLException ignored) {}
}

public CustomDriver() throws SQLException { }

@Override
public Connection connect(String url, Properties info) throws SQLException {
System.out.println("[Kerwin] 执行数据库连接...");
return super.connect(url, info);
}

@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
}

然后将 CustomDriver注入到SPI中即可。

需要注意的是 CustomDriver类需要实现继承 NonRegisteringDriver类,否则会被默认的Driver优先注册,完成之后使用上古的JDBC代码调用,即可模拟破坏SPI的情况,如图:

1
2
3
4
5
6
7
8
java复制代码public void customDriver() throws SQLException {
Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/db_file?characterEncoding=UTF-8&useSSL=false", "root", "");
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM script_dir LIMIT 1");
while (rs.next()) {
System.out.println(rs.getString(1));
}
}

可以看到,我们使用自定义驱动类成功获取到数据库连接,替换了原本的Driver驱动类,具体细节需要大家再Debug看看,因为涉及接口类型,拿到连接后Return等等。

控制台输出:

1
bash复制代码[Kerwin] 执行数据库连接...

SPI的应用场景

了解完它的基本使用方法和原理之后,SPI的神秘感顿时化为虚有,说到底就是基于约定在指定位置选择性配置接口实现类,由JDK动态初始化及执行的机制。

日常开发要不要使用SPI?

我们从上文中能直接体会到SPI机制的好处,它可以起到策略选择、动态初始化、解耦的作用,那我们在普通项目开发中要不要使用呢?我个人是不推荐使用SPI的方式,主要原因还是我们可以使用更优雅的方式来替代SPI机制,比如:

  • 动态初始化、策略选择 =》我们可以使用策略+工厂模式实现策略的动态选择,配合ZK来实现动态初始化(启用/禁用)
  • 解耦 =》基于良好的设计,可以很容易的实现解耦

基于上述的方案,可以保证项目代码具备SPI的好处的同时更加易读,降低理解成本。

框架/组件工具开发要不要使用SPI?

答案是毋庸置疑的,现在的诸多框架及工具就是使用SPI来实现的,引入了SPI机制后,服务接口与服务实现就会达成分离的状态,可以实现解耦以及可扩展机制。

例如Sharding-jdbc的加密算法接口,原生仅提供了AES和MD5两种加密方式,需要其他加密方式的项目就可以使用SPI机制将自己需要的加密方式写入框架内,然后根据需要调用即可,无论是使用还是维护都更加方便。

因为Java实现的SPI版本相对比较粗糙和暴力,导致它会把所有接口实现类全部实例化一遍,所以还有框架会对Java的SPI进行封装和优化,比如Dubbo,它将配置文件中的全类名修改为了键值对的方式,以满足按需加载的需要,同时增加了IOC及AOP的特性,自适应扩展等机制。

通过上文的工作方式我们就可以了解到SPI的机制并不神秘,如果个人需要简单封装的话,还是轻而易举的。

学习SPI的思想

SPI机制有一定的必然性,以上文提到的Sharding-jdbc的加密算法为例,只有真正的使用者才知道自己到底需要什么,因此把一部分决定权(实现)交给用户的能力是必须要具备的,不然的话框架也好,工具也罢,为了满足所有的情况,代码势必都会变的非常臃肿。这其中最关键的设计原则即:

依赖倒置原则(要针对抽象层编程,而不要针对具体类编程)

我们在日常开发中同样要思考如何设计接口,如何依赖抽象层进行编程,减少与实现类之间的耦合,同样的,为了实现这一要求,我们必然会去学习设计模式、设计原则之类的知识,去了解各种设计模式的最佳实践,一步步的去优化代码,在此推荐一下我之前的文章:设计模式总篇:从为什么需要原则到实际落地(附知识图谱)。

总结

截止到这里,我们明白了什么是SPI及其工作的原理,熟悉了它的典型案例,也了解了它的应用场景、设计理念等等,下面是一些针对性的建议:

  1. SPI机制是框架/工具级项目必备的能力之一,立志于高级工程师的小伙伴一定要吃透它的设计理念和实现原理
  2. SPI的核心思想:把一部分决定权(实现)交给用户,即依赖倒置
  3. 了解SPI的优势和特点后,在单体项目中我们完全可以使用别的方案达到更好的效果,切忌为了使用而去用它
  4. 未来在开发或使用某些中间件/工具时,可以多加留意它是否提供了相关的SPI接口,可能会起到事半功倍的效果。

如果觉得这篇内容对你有帮助的话:

  1. 当然要点赞支持一下啦~
  2. 另外,可以搜索并关注公众号「是Kerwin啊」,一起在技术的路上走下去吧~ 😋

参考文章

  1. ServiceLoader
  2. Java中SPI机制深入及源码解析

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JUC并发-工具类详解 一、🎈CountDownLatch(

发表于 2021-08-26

这是我参与8月更文挑战的第26天,活动详情查看:8月更文挑战

往期推荐

  • Java基础知识
  • Java并发编程

一、🎈CountDownLatch(减法计数器)

1.1 概述

CountDownLatch是一个同步辅助类,允许一个或多个线程等待,一直到其他线程执行的操作完成后再执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始值是线程的数量。每当有一个线程执行完毕后,然后通过 CountDown方法来让计数器的值-1,当计数器的值为0时,表示所有线程都执行完毕,然后继续执行 await方法 之后的语句,即在锁上等待的线程就可以恢复工作了。

1.2 类的内部类

CountDownLatch类存在一个内部类Sync,继承自AbstractQueuedSynchronizer,其源代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
js复制代码private static final class Sync extends AbstractQueuedSynchronizer {
// 版本号
private static final long serialVersionUID = 4982264981922014374L;

// 构造器
Sync(int count) {
setState(count);
}

// 返回当前计数
int getCount() {
return getState();
}

// 试图在共享模式下获取对象状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

// 试图设置状态来反映共享模式下的一个释放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 无限循环
for (;;) {
// 获取状态
int c = getState();
if (c == 0) // 没有被线程占有
return false;
// 下一个状态
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 比较并且设置成功
return nextc == 0;
}
}
}

说明: 对`CountDownLatch`方法的调用会转发到对Sync或AQS的方法的调用,
所以,AQS对CountDownLatch提供支持。

1.3 类的构造函数

1
2
3
4
5
6
7
8
9
10
js复制代码public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化状态数
this.sync = new Sync(count);
}


说明:
该构造函数可以构造一个用给定计数初始化的`CountDownLatch`,
并且构造函数内完成了sync的初始化,并设置了状态数。

1.4 CountDownLatch两个核心函数

1.4.1 countDown

  • 递减锁存器的计数,如果计数达到零,则释放所有等待的线程。
  • 如果当前计数大于零,则递减。 如果新计数为零,则为线程调度目的重新启用所有等待线程。
  • 如果当前计数为零,则什么也不会发生。
1
2
3
4
5
js复制代码public void countDown() {
sync.releaseShared(1);
}

说明: 对`countDown`的调用转换为对Sync对象的`releaseShared`(从AQS继承而来)方法的调用。
  • releaseShared源码如下
1
2
3
4
5
6
7
8
9
10
js复制代码public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

说明: 此函数会以共享模式释放对象,
并且在函数中会调用到`CountDownLatch`的`tryReleaseShared`函数,并且可能会调用AQS的`doReleaseShared`函数。
  • tryReleaseShared源码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
js复制代码protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 无限循环
for (;;) {
// 获取状态
int c = getState();
if (c == 0) // 没有被线程占有
return false;
// 下一个状态
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 比较并且设置成功
return nextc == 0;
}
}

说明: 此函数会试图设置状态来反映共享模式下的一个释放。
  • AQS的doReleaseShared的源码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
js复制代码private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 无限循环
for (;;) {
// 保存头结点
Node h = head;
if (h != null && h != tail) { // 头结点不为空并且头结点不为尾结点
// 获取头结点的等待状态
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就继续
continue; // loop to recheck cases
// 释放后继结点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续
continue; // loop on failed CAS
}
if (h == head) // 若头结点改变,继续循环
break;
}
}

说明: 此函数在共享模式下释放资源。

CountDownLatch的countDown调用链:

6.png

1.4.2 await

  • 使当前线程等待直到闩锁倒计时为零,除非线程被中断。
  • 如果当前计数为零,则此方法立即返回。即await 方法阻塞的线程会被唤醒,继续执行。
  • 如果当前计数大于零,则当前线程出于线程调度目的而被禁用并处于休眠状态。
1
2
3
4
5
js复制代码public void await() throws InterruptedException {
// 转发到sync对象上
sync.acquireSharedInterruptibly(1);
}
说明:对`CountDownLatch`对象的await的调用会转发为对Sync的`acquireSharedInterruptibly`(从AQS继承的方法)方法的调用。
  • acquireSharedInterruptibly源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
js复制代码public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}



说明:`acquireSharedInterruptibly`又调用了
`CountDownLatch`的内部类Sync的`tryAcquireShared`和AQS的`doAcquireSharedInterruptibly`函数。
  • tryAcquireShared函数的源码如下:
1
2
3
4
js复制代码protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
说明: 该函数只是简单的判断AQS的state是否为0,为0则返回1,不为0则返回-1。
  • doAcquireSharedInterruptibly函数的源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
js复制代码private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 添加节点至等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { // 无限循环
// 获取node的前驱节点
final Node p = node.predecessor();
if (p == head) { // 前驱节点为头结点
// 试图在共享模式下获取对象状态
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取成功
// 设置头结点并进行繁殖
setHeadAndPropagate(node, r);
// 设置节点next域
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 在获取失败后是否需要禁止线程并且进行中断检查
// 抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}


说明: 在AQS的`doAcquireSharedInterruptibly`中可能会再次调用
`CountDownLatch`的内部类Sync的`tryAcquireShared`方法和AQS的`setHeadAndPropagate`方法。
  • setHeadAndPropagate方法源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
js复制代码private void setHeadAndPropagate(Node node, int propagate) {
// 获取头结点
Node h = head; // Record old head for check below
// 设置头结点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 进行判断
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取节点的后继
Node s = node.next;
if (s == null || s.isShared()) // 后继为空或者为共享模式
// 以共享模式进行释放
doReleaseShared();
}
}
说明: 该方法设置头结点并且释放头结点后面的满足条件的结点,该方法中可能会调用到AQS的`doReleaseShared`方法
  • doReleaseShared方法源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
js复制代码private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 无限循环
for (;;) {
// 保存头结点
Node h = head;
if (h != null && h != tail) { // 头结点不为空并且头结点不为尾结点
// 获取头结点的等待状态
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就继续
continue; // loop to recheck cases
// 释放后继结点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续
continue; // loop on failed CAS
}
if (h == head) // 若头结点改变,继续循环
break;
}
}

说明: 该方法在共享模式下释放。

CountDownLatch的await调用链:

7.png

1.5 CountDownLatch示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
js复制代码import java.util.concurrent.CountDownLatch;

class MyThread extends Thread {
private CountDownLatch countDownLatch;

public MyThread(String name, CountDownLatch countDownLatch) {
super(name);
this.countDownLatch = countDownLatch;
}

public void run() {
System.out.println(Thread.currentThread().getName() + " doing something");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " finish");
countDownLatch.countDown();
}
}

public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
MyThread t1 = new MyThread("t1", countDownLatch);
MyThread t2 = new MyThread("t2", countDownLatch);
t1.start();
t2.start();
System.out.println("Waiting for t1 thread and t2 thread to finish");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " continue");
}
}


运行结果:
Waiting for t1 thread and t2 thread to finish
t1 doing something
t2 doing something
t1 finish
t2 finish
main continue

二、🎀CyclicBarrier(加法计数器)

2.1 概述

CyclicBarrier也叫循环栅栏,是一个可循环利用的屏障。通过它可以实现让一组线程等待至某个状态之后再全部同时执行。每个线程在到达栅栏的时候都会调用await()方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。叫做循环是因为当所有等待线程都被释放以后,CyclicBarrier还可以被重用(调用CyclicBarrier的reset()方法)。

2.2 CyclicBarrier 主要方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
js复制代码public class CyclicBarrier {

private int dowait(boolean timed, long nanos); // 供await方法调用 判断是否达到条件 可以往下执行吗

//创建一个新的CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,每执行一次CyclicBarrier就累加1,达到了parties,就会触发barrierAction的执行
public CyclicBarrier(int parties, Runnable barrierAction) ;

//创建一个新的CyclicBarrier ,参数就是目标障碍数,它将在给定数量的参与方(线程)等待时触发,每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句
public CyclicBarrier(int parties)

//返回触发此障碍所需的参与方数量。
public int getParties()

//等待,直到所有各方都在此屏障上调用了await 。
// 如果当前线程不是最后一个到达的线程,那么它会出于线程调度目的而被禁用并处于休眠状态.直到所有线程都调用了或者被中断亦或者发生异常中断退出
public int await()

// 基本同上 多了个等待时间 等待时间内所有线程没有完成,将会抛出一个超时异常
public int await(long timeout, TimeUnit unit)

//将障碍重置为其初始状态。
public void reset()

}

2.3 构造函数

  • CyclicBarrier(int, Runnable)型构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
js复制代码public CyclicBarrier(int parties, Runnable barrierAction) {
// 参与的线程数量小于等于0,抛出异常
if (parties <= 0) throw new IllegalArgumentException();
// 设置parties
this.parties = parties;
// 设置count
this.count = parties;
// 设置barrierCommand
this.barrierCommand = barrierAction;
}

说明: 该构造函数可以指定关联该`CyclicBarrier`的线程数量,并且可以指定在所有线程都进入屏障后的执行动作,
该执行动作由最后一个进行屏障的线程执行。
  • CyclicBarrier(int)型构造函数
1
2
3
4
5
6
js复制代码public CyclicBarrier(int parties) {
// 调用含有两个参数的构造函数
this(parties, null);
}

说明: 该构造函数仅仅执行了关联该CyclicBarrier的线程数量,没有设置执行动作。

2.4 CyclicBarrier两个核心函数

2.4.1 dowait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
js复制代码private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 保存当前锁
final ReentrantLock lock = this.lock;
// 锁定
lock.lock();
try {
// 保存当前代
final Generation g = generation;

if (g.broken) // 屏障被破坏,抛出异常
throw new BrokenBarrierException();

if (Thread.interrupted()) { // 线程被中断
// 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
breakBarrier();
// 抛出异常
throw new InterruptedException();
}

// 减少正在等待进入屏障的线程数量
int index = --count;
if (index == 0) { // 正在等待进入屏障的线程数量为0,所有线程都已经进入
// 运行的动作标识
boolean ranAction = false;
try {
// 保存运行动作
final Runnable command = barrierCommand;
if (command != null) // 动作不为空
// 运行
command.run();
// 设置ranAction状态
ranAction = true;
// 进入下一代
nextGeneration();
return 0;
} finally {
if (!ranAction) // 没有运行的动作
// 损坏当前屏障
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
// 无限循环
for (;;) {
try {
if (!timed) // 没有设置等待时间
// 等待
trip.await();
else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0
// 等待指定时长
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏
// 损坏当前屏障
breakBarrier();
// 抛出异常
throw ie;
} else { // 不等于当前带后者是屏障被损坏
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 中断当前线程
Thread.currentThread().interrupt();
}
}

if (g.broken) // 屏障被损坏,抛出异常
throw new BrokenBarrierException();

if (g != generation) // 不等于当前代
// 返回索引
return index;

if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0
// 损坏屏障
breakBarrier();
// 抛出异常
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}

dowait方法的逻辑流程:

7.png

2.4.2 nextGeneration

此函数在所有线程进入屏障后会被调用,即生成下一个版本,所有线程又可以重新进入到屏障中,其源代码如下:

1
2
3
4
5
6
7
8
9
10
js复制代码private void nextGeneration() {
// signal completion of last generation
// 唤醒所有线程
trip.signalAll();
// set up next generation
// 恢复正在等待进入屏障的线程数量
count = parties;
// 新生一代
generation = new Generation();
}

在此函数中会调用AQS的signalAll方法,即唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。其源代码如下:

1
2
3
4
5
6
7
8
9
10
11
js复制代码public final void signalAll() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头结点
Node first = firstWaiter;
if (first != null) // 头结点不为空
// 唤醒所有等待线程
doSignalAll(first);
}

说明: 此函数判断头结点是否为空,即条件队列是否为空,然后会调用`doSignalAll`函数。
  • doSignalAll函数源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
js复制代码private void doSignalAll(Node first) {
// condition队列的头结点尾结点都设置为空
lastWaiter = firstWaiter = null;
// 循环
do {
// 获取first结点的nextWaiter域结点
Node next = first.nextWaiter;
// 设置first结点的nextWaiter域为空
first.nextWaiter = null;
// 将first结点从condition队列转移到sync队列
transferForSignal(first);
// 重新设置first
first = next;
} while (first != null);
}

此函数会依次将条件队列中的节点转移到同步队列中,会调用到`transferForSignal`函数。
  • transferForSignal函数源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
js复制代码final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

说明: 此函数的作用就是将处于条件队列中的节点转移到同步队列中,并设置结点的状态信息,其中会调用到`enq`函数。
  • enq函数源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
js复制代码private Node enq(final Node node) {
for (;;) { // 无限循环,确保结点能够成功入队列
// 保存尾结点
Node t = tail;
if (t == null) { // 尾结点为空,即还没被初始化
if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
tail = head; // 头结点与尾结点都指向同一个新生结点
} else { // 尾结点不为空,即已经被初始化过
// 将node结点的prev域连接到尾结点
node.prev = t;
if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
// 设置尾结点的next域为node
t.next = node;
return t; // 返回尾结点
}
}
}
}

说明: 此函数完成了结点插入同步队列的过程

newGeneration函数调用链:

9.png

2.5 CyclicBarrier示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
js复制代码public class CyclicBarrierDemo {
public static void main(String[] args) {

CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("集齐七张卡片,参与抽奖");
});

for (int i = 1; i <= 7; i++) {
final int temp=i;
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"收集到第"+temp+"张");
//阻塞任务线程
cyclicBarrier.await();

} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}


}
}

程序输出:
Thread-1收集到第2张
Thread-4收集到第5张
Thread-3收集到第4张
Thread-2收集到第3张
Thread-0收集到第1张
Thread-6收集到第7张
Thread-5收集到第6张
集齐七张卡片,参与抽奖

三、🩰Semaphore( 信号灯)

3.1 概述

Semaphore:信号量通常用于限制可以访问某些(物理或逻辑)资源的线程数。

使用场景:

限制资源,如抢位置、限流等。

3.2 Semaphore常用方法

  • void acquire() 许可数-1,从该信号量获取许可证,阻塞直到可用或线程被中断。
  • void acquire(int permits) 许可数 - permits,从该信号量获取给定数量的许可证,阻塞直到可用或线程被中断。
  • int availablePermits() 返回此信号量中当前可用的许可数。
  • void release() 许可数+1,释放许可证,将其返回到信号量。
  • void release(int permits) 许可数+permits,释放给定数量的许可证,将其返回到信号量。
  • boolean hasQueuedThreads() 查询是否有线程正在等待获取许可。
  • int getQueueLength() 返回等待获取许可的线程数的估计。

3.3 Semaphore构造函数

  • Semaphore(int)型构造函数
1
2
3
4
5
js复制代码public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

说明: 该构造函数会创建具有给定的许可数和非公平的公平设置的`Semaphore`。
  • Semaphore(int, boolean)型构造函数
1
2
3
4
5
js复制代码public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

说明: 该构造函数会创建具有给定的许可数和给定的公平设置的`Semaphore`。

3.4 Semaphore两个核心函数

3.4.1 acquire

此方法从信号量获取一个(多个)许可,在提供一个许可前一直将线程阻塞,或者线程被中断,其源码如下:

1
2
3
4
5
6
js复制代码public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

说明: 该方法中将会调用Sync对象的`acquireSharedInterruptibly`
(从AQS继承而来的方法)方法,而`acquireSharedInterruptibly`方法在`CountDownLatch`中已经进行了分析,在此不再累赘。

3.4.2 release

此方法释放一个(多个)许可,将其返回给信号量,源码如下:

1
2
3
4
5
6
js复制代码public void release() {
sync.releaseShared(1);
}

说明: 该方法中将会调用Sync对象的`releaseShared`
(从AQS继承而来的方法)方法,而`releaseShared`方法在`CountDownLatch`中已经进行了分析,在此不再累赘。

3.5 Semaphore示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
js复制代码/**
* @Author: Akiang
* @Date: 2021-08-26 09:03
* version 1.0
*/
public class SemaphoreDemo1 {
public static void main(String[] args) {
// 10台电脑
Semaphore semaphore = new Semaphore(10);

// 20 个小伙伴想要上网
for (int i = 1; i <= 20; i++) {
new Thread(() -> {
try {
//等待获取许可证
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到了电脑");
//抢到的小伙伴,迅速就开打啦 这里就模拟个时间哈,
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//打完几把游戏的小伙伴 女朋友来找了 溜啦溜啦 希望大家都有人陪伴
System.out.println("女朋友来找,"+Thread.currentThread().getName() + "离开了");
semaphore.release();//释放资源,离开了就要把电脑让给别人啦。
}
}, String.valueOf(i)).start();
}
}
}

四、 🏏简单讲述 | Phaser & Exchanger

4.1 Phaser

Phaser一种可重用的同步屏障,功能上类似于CyclicBarrier和CountDownLatch,但使用上更为灵活。非常适用于在多线程环境下同步协调分阶段计算任务(Fork/Join框架中的子任务之间需同步时,优先使用Phaser)

  • 函数列表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
js复制代码//构造方法
public Phaser() {
this(null, 0);
}
public Phaser(int parties) {
this(null, parties);
}
public Phaser(Phaser parent) {
this(parent, 0);
}
public Phaser(Phaser parent, int parties)
//注册一个新的party
public int register()
//批量注册
public int bulkRegister(int parties)
//使当前线程到达phaser,不等待其他任务到达。返回arrival phase number
public int arrive()
//使当前线程到达phaser并撤销注册,返回arrival phase number
public int arriveAndDeregister()
/*
* 使当前线程到达phaser并等待其他任务到达,等价于awaitAdvance(arrive())。
* 如果需要等待中断或超时,可以使用awaitAdvance方法完成一个类似的构造。
* 如果需要在到达后取消注册,可以使用awaitAdvance(arriveAndDeregister())。
*/
public int arriveAndAwaitAdvance()
//等待给定phase数,返回下一个 arrival phase number
public int awaitAdvance(int phase)
//阻塞等待,直到phase前进到下一代,返回下一代的phase number
public int awaitAdvance(int phase)
//响应中断版awaitAdvance
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException
//使当前phaser进入终止状态,已注册的parties不受影响,如果是分层结构,则终止所有phaser
public void forceTermination()
  • 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
js复制代码public class PhaserDemo {

private static Phaser phaser = new MyPhaser();

//自定义一个移相器来自定义输出
static class MyPhaser extends Phaser {
/**
* @deprecated 在即将到来的阶段提前时执行操作并控制终止的可覆盖方法。 此方法在推进此移相器的一方到达时调用(当所有其他等待方处于休眠状态时)。
* 如果此方法返回true ,则此移相器将在提前时设置为最终终止状态,并且对isTerminated后续调用将返回 true。
* @param phase 进入此方法的当前阶段号,在此移相器前进之前
* @param registeredParties 当前注册方的数量
* @return
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
if (phase == 0) {
System.out.println("所有人都到达了网吧,准备开始开黑!!!");
return false;
} else if (phase == 1) {
System.out.println("大家都同意,一起去次烧烤咯!!!");
return false;
} else if (phase == 2) {
System.out.println("大家一起回寝室!!!");
return true;
}
return true;
}
}

//构建一个线程任务
static class DoSomeThing implements Runnable {
@Override
public void run() {
/**
* 向此移相器添加一个新的未到达方
*/
phaser.register();
System.out.println(Thread.currentThread().getName() + "从家里出发,准备去学校后街上网开黑!!!");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + "上着上着饿了,说去次烧烤吗?");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + "烧烤次完了");
phaser.arriveAndAwaitAdvance();
}
}

public static void main(String[] args) throws Exception {
DoSomeThing thing = new DoSomeThing();
new Thread(thing, "小明").start();
new Thread(thing, "小王").start();
new Thread(thing, "小李").start();
}
}
/**
* 小李从家里出发,准备去学校后街上网开黑!!!
* 小王从家里出发,准备去学校后街上网开黑!!!
* 小明从家里出发,准备去学校后街上网开黑!!!
* 所有人都到达了网吧,准备开始开黑!!!
* 小李上着上着饿了,说去次烧烤吗?
* 小明上着上着饿了,说去次烧烤吗?
* 小王上着上着饿了,说去次烧烤吗?
* 大家都同意,一起去次烧烤咯!!!
* 小明烧烤次完了
* 小李烧烤次完了
* 小王烧烤次完了
* 大家一起回寝室!!!
*/

4.2 Exchanger

Exchanger允许两个线程在某个汇合点交换对象,在某些管道设计时比较有用。

Exchanger提供了一个同步点,在这个同步点,一对线程可以交换数据。每个线程通过exchange()方法的入口提供数据给他的伙伴线程,并接收他的伙伴线程提供的数据并返回。

当两个线程通过Exchanger交换了对象,这个交换对于两个线程来说都是安全的。Exchanger可以认为是SynchronousQueue的双向形式,在运用到遗传算法和管道设计的应用中比较有用。

  • Exchanger实现机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
js复制代码for (;;) {
if (slot is empty) { // offer
// slot为空时,将item 设置到Node 中
place item in a Node;
if (can CAS slot from empty to node) {
// 当将node通过CAS交换到slot中时,挂起线程等待被唤醒
wait for release;
// 被唤醒后返回node中匹配到的item
return matching item in node;
}
} else if (can CAS slot from node to empty) { // release
// 将slot设置为空
// 获取node中的item,将需要交换的数据设置到匹配的item
get the item in node;
set matching item in node;
// 唤醒等待的线程
release waiting thread;
}
// else retry on CAS failure
}
  • Exchanger示例

来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
jspublic复制代码    static class Producer extends Thread {
private Exchanger<Integer> exchanger;
private static int data = 0;
Producer(String name, Exchanger<Integer> exchanger) {
super("Producer-" + name);
this.exchanger = exchanger;
}

@Override
public void run() {
for (int i=1; i<5; i++) {
try {
TimeUnit.SECONDS.sleep(1);
data = i;
System.out.println(getName()+" 交换前:" + data);
data = exchanger.exchange(data);
System.out.println(getName()+" 交换后:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class Consumer extends Thread {
private Exchanger<Integer> exchanger;
private static int data = 0;
Consumer(String name, Exchanger<Integer> exchanger) {
super("Consumer-" + name);
this.exchanger = exchanger;
}

@Override
public void run() {
while (true) {
data = 0;
System.out.println(getName()+" 交换前:" + data);
try {
TimeUnit.SECONDS.sleep(1);
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName()+" 交换后:" + data);
}
}
}

public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<Integer>();
new Producer("", exchanger).start();
new Consumer("", exchanger).start();
TimeUnit.SECONDS.sleep(7);
System.exit(-1);
}
}

其结果可能如下:

Consumer- 交换前:0
Producer- 交换前:1
Consumer- 交换后:1
Consumer- 交换前:0
Producer- 交换后:0
Producer- 交换前:2
Producer- 交换后:0
Consumer- 交换后:2
Consumer- 交换前:0
Producer- 交换前:3
Producer- 交换后:0
Consumer- 交换后:3
Consumer- 交换前:0
Producer- 交换前:4
Producer- 交换后:0
Consumer- 交换后:4
Consumer- 交换前:0

五、CyclicBarrier与CountDownLatch的区别

  • CountDownLatch的await()方法阻塞的是主线程或调用await()的线程,而CyclicBarrier的await()阻塞的是任务线程,主线程或调用线程不受影响。
  • CountDownLatch无法重置计数次数,而CyclicBarrier可以通过reset()方法来重用
  • CountDownLatch和CyclicBarrier都是用作多线程同步,CountDownLatch基于AQS,CyclicBarrier基于ReentrantLock和Condition。

参考:《Java并发编程的艺术》

JUC系列(七)| JUC三大常用工具类CountDownLatch、CyclicBarrier、Semaphore

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

探究php in_array 的低性能

发表于 2021-08-26

这是我参与8月更文挑战的第26天,活动详情查看:8月更文挑战

PHP 首要说的就是性能方面的提升。对于 in_array()一直诟病很多,至于性能有多差,先简单测试一下,看看数据。

实例测试

测试程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
php复制代码<?php
/**
* 获取当前时间戳(毫秒级)
* @return float
*/
function microtime_float(){
list($usec, $sec) = explode(' ', microtime());
return ((float)$usec + (float)$sec);
}

/**
* 数组初始化
*/
$int_arr = $str_arr = [];
for($i=0; $i<200000; $i++){
$int_arr[] = $i;
$str_arr[] = "{$i}";
}

$time_start = microtime_float();
// 具体操作
for($j=0; $j<3000; $j++){
if(in_array(18000, $int_arr)){
continue;
}
// if(isset($int_arr[$key])){
// continue;
// }
}

$time_end = microtime_float();
echo '消耗时间:'.($time_end - $time_start);

测试结果

in_array 方式

  1. 字符型字符串
1
2
arduino复制代码// 3k 消耗时间:1.0687351226807
// 3w 消耗时间:10.569030046463
  1. int 型字符串
1
2
arduino复制代码// 3k 消耗时间:1.0500609874725
// 3w 消耗时间:10.290988922119

isset 方式

  1. 字符型字符串
1
2
arduino复制代码// 3k 消耗时间:0.00010299682617188
// 3w 消耗时间:0.00089907646179199
  1. int 型字符串
1
2
arduino复制代码// 3k 消耗时间:0.00010108947753906
// 3w 消耗时间:0.00085687637329102

结合上面测试数据,两种方式的性能差距还是挺明显的。

接着,我们利用 ltrace 来跟踪进程调用库函数的情况:

1
shell复制代码$ ltrace -c /usr/local/php/bin/php test.php

看到库函数__strtol_internal 的调用非常之频繁,这个库函数__strtol_internal 是原来是 strtol 的别名,简单的说就是把字符串转换成长整形,可以推测 PHP 引擎已经检测到这是一个字符串型的数字,所以期望将他们转换成长整型来比较,这个转换过程中消耗了太多时间,我们再次执行:

1
shell复制代码$ ltrace -e "__strtol_internal" /usr/local/php/bin/php test.php

可以轻松抓到大量下图这样的调用,到此,问题找到了,in_array 这种松比较,会将两个字符型数字串先转换为长整型再进行比较,却不知性能就耗在这上面了。

知道了症结所在,我们解决的办法就很多了,最简单的就是为 in_array 加第三个参数为 true,即变为严格比较,同时还要比较类型,这样避免了 PHP 自作聪明的转换类型,跑起来果然快多了,代码如下:

1
2
3
4
5
6
7
bash复制代码in_array(search,array,type)

参考上例,改为如下:
in_array(18000, $int_arr, true);
in_array('18000', $str_arr, true);

分别执行3k次的消耗时间分别为:0.88052606582642、0.88974308967591234

总结

总结一下,大数组的查询,用 in_array 函数是个糟糕的选择。应该尽量用 isset 函数来替代 。in_array 函数的复杂度是 O(n),而 isset 函数的复杂度是 O(1)。

  • END -

作者:架构精进之路,十年研发风雨路,大厂架构师,CSDN 博客专家,专注架构技术沉淀学习及分享,职业与认知升级,坚持分享接地气儿的干货文章,期待与你一起成长。

关注并私信我回复“01”,送你一份程序员成长进阶大礼包,欢迎勾搭。

Thanks for reading!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot+Redis Lua限流最佳实践

发表于 2021-08-26

这是我参与8月更文挑战的第26天,活动详情查看:8月更文挑战

常见的限流算法

计数器算法

计数器算法采用计数器实现限流有点简单粗暴,一般我们会限制一秒钟的能够通过的请求数,比如限流qps为100,算法的实现思路就是从第一个请求进来开始计时,在接下去的1s内,每来一个请求,就把计数加1,如果累加的数字达到了100,那么后续的请求就会被全部拒绝。等到1s结束后,把计数恢复成0,重新开始计数。具体的实现可以是这样的:对于每次服务调用,可以通过AtomicLong#incrementAndGet()方法来给计数器加1并返回最新值,通过这个最新值和阈值进行比较。这种实现方式,相信大家都知道有一个弊端:如果我在单位时间1s内的前10ms,已经通过了100个请求,那后面的990ms,只能眼巴巴的把请求拒绝,我们把这种现象称为“突刺现象”

漏桶算法

漏桶算法为了消除”突刺现象”,可以采用漏桶算法实现限流,漏桶算法这个名字就很形象,算法内部有一个容器,类似生活用到的漏斗,当请求进来时,相当于水倒入漏斗,然后从下端小口慢慢匀速的流出。不管上面流量多大,下面流出的速度始终保持不变。不管服务调用方多么不稳定,通过漏桶算法进行限流,每10毫秒处理一次请求。因为处理的速度是固定的,请求进来的速度是未知的,可能突然进来很多请求,没来得及处理的请求就先放在桶里,既然是个桶,肯定是有容量上限,如果桶满了,那么新进来的请求就丢弃。

image.png
在算法实现方面,可以准备一个队列,用来保存请求,另外通过一个线程池(ScheduledExecutorService)来定期从队列中获取请求并执行,可以一次性获取多个并发执行。这种算法,在使用过后也存在弊端:无法应对短时间的突发流量。

令牌桶算法

从某种意义上讲,令牌桶算法是对漏桶算法的一种改进,桶算法能够限制请求调用的速率,而令牌桶算法能够在限制调用的平均速率的同时还允许一定程度的突发调用。在令牌桶算法中,存在一个桶,用来存放固定数量的令牌。算法中存在一种机制,以一定的速率往桶中放令牌。每次请求调用需要先获取令牌,只有拿到令牌,才有机会继续执行,否则选择选择等待可用的令牌、或者直接拒绝。放令牌这个动作是持续不断的进行,如果桶中令牌数达到上限,就丢弃令牌,所以就存在这种情况,桶中一直有大量的可用令牌,这时进来的请求就可以直接拿到令牌执行,比如设置qps为100,那么限流器初始化完成一秒后,桶中就已经有100个令牌了,这时服务还没完全启动好,等启动完成对外提供服务时,该限流器可以抵挡瞬时的100个请求。所以,只有桶中没有令牌时,请求才会进行等待,最后相当于以一定的速率执行。

image.png

实现思路: 可以准备一个队列,用来保存令牌,另外通过一个线程池定期生成令牌放到队列中,每来一个请求,就从队列中获取一个令牌,并继续执行。

基于redis-lua实现令牌桶限流算法解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
lua复制代码-- 令牌桶在redis中的key值
local tokens_key = KEYS[1]
-- 该令牌桶上一次刷新的时间对应的key的值
local timestamp_key = KEYS[2]
-- 令牌单位时间填充速率
local rate = tonumber(ARGV[1])
-- 令牌桶容量
local capacity = tonumber(ARGV[2])
-- 当前时间
local now = tonumber(ARGV[3])
-- 请求需要的令牌数
local requested = tonumber(ARGV[4])
-- 令牌桶容量/令牌填充速率=令牌桶填满所需的时间
local fill_time = capacity/rate
-- 令牌过期时间 填充时间*2
local ttl = math.floor(fill_time*2)
-- 获取上一次令牌桶剩余的令牌数
local last_tokens = tonumber(redis.call("get", tokens_key))
-- 如果没有获取到,可能是令牌桶是新的,之前不存在该令牌桶,或者该令牌桶已经好久没有使用
-- 过期了,这里需要对令牌桶进行初始化,初始情况,令牌桶是满的
if last_tokens == nil then
last_tokens = capacity
end
-- 获取上一次刷新的时间,如果没有,或者已经过期,那么初始化为0
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
-- 计算上一次刷新时间和本次刷新时间的时间差
local delta = math.max(0, now-last_refreshed)
-- delta*rate = 这个时间差可以填充的令牌数,
-- 令牌桶中先存在的令牌数 = 填充令牌数+令牌桶中原有的令牌数
-- 以为令牌桶有容量,所以如果计算的值大于令牌桶容量,那么以令牌容容量为准
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 判断令牌桶中的令牌数是都满本次请求需要的令牌数,如果不满足,说明被限流了
local allowed = filled_tokens >= requested

-- 这里声明了两个变量,一个是新的令牌数,一个是是否被限流,0代表限流,1代表没有线路
local new_tokens = filled_tokens
local allowed_num = 0
-- 如果没有被限流,即,filled_tokens >= requested,
-- 新的令牌数=刚刚计算好的令牌桶中存在的令牌数减掉本次需要使用的令牌数
-- 并设置限流结果为未限流
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
-- 存储本次操作后,令牌桶中的令牌数以及本次刷新时间
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
-- 返回是否被限流标志以及令牌桶剩余令牌数
return { allowed_num, new_tokens }
  1. 这个KEYS[1],KEYS[2]ARGV[1],ARGV[2]… 表示调用该lua脚本时,传入的变量列表。
  2. KEYS[i] 表示调用lua脚本传过来的变量KEYS[i]作为一个key,从redis中获取具体的值
  3. ARGV[i] 表示调用lua脚本时传过来的变量ARGV[i]

举个例子吧,
我们在调用脚本的时候,传入了两组参数,一组是KEYS,一组是ARGV,这两组参数假设是

KEYS : [demo1,demo2]

ARGV: [3,3,11,1]

那么,

KEYS[1]等于redis.get(demo1)

ARGV[1]等于3

SpringBoot调用RedisLua

引入依赖

1
2
3
4
xml复制代码        <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

资源目录新建scripts文件夹,将lua脚本放进去

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
lua复制代码local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end

-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }

构造redis-script对象

springboot将每个lua脚本抽象为一个RedisScript对象,该类提供了两个方法,一个是设置lua脚本的io流,还有一个是直接将lua脚本以字符串的形式设置,这里用io流的形式。

该对象的泛型是lua脚本的返回值,我们的脚本返回的是两个long类型,所以使用List来接收。

1
2
3
4
5
6
7
8
java复制代码    @Bean(name = "rateLimitRedisScript")
public RedisScript<List<Long>> rateLimitRedisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
// redisScript.setScriptText();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("scripts/redis_rate_limit.lua")));
redisScript.setResultType(List.class);
return redisScript;
}

设置redis序列化规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Bean
@ConditionalOnMissingBean(StringRedisTemplate.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}

@Bean
@ConditionalOnMissingBean(RedisTemplate.class)
public RedisTemplate<String, Object> redisTemplate(
RedisConnectionFactory redisConnectionFactory)
throws UnknownHostException {

Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);

RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(redisConnectionFactory);
template.setKeySerializer(jackson2JsonRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}

调用lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码    @Resource
private RedisScript<List<Long>> rateLimitRedisScript;

@Resource
private StringRedisTemplate stringRedisTemplate;

@GetMapping
public List<Long> userToken() {
// 设置lua脚本的ARGV的值
List<String> scriptArgs = Arrays.asList(
1 + "",
3 + "",
(Instant.now().toEpochMilli()) + "",
"1");
// 设置lua脚本的KEYS值
List<String> keys = getKeys("test");
return stringRedisTemplate.execute(rateLimitRedisScript,keys, scriptArgs.toArray());
}

private List<String> getKeys(String id) {
// use `{}` around keys to use Redis Key hash tags
// this allows for using redis cluster

// Make a unique key per user.
String prefix = "request_rate_limiter.{" + id;

// You need two Redis keys for Token Bucket.
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}

调用成功后会返回两个数,一个是是否成功标志,0代表限流,1代表未限流,还有一个是令牌桶中剩余的令牌数

1
2
3
4
java复制代码[
1,
2
]

注意

  1. 这里在拼接key的时候,对id使用了大括号{}进行了包裹,这是因为lua脚本执行成功的前提条件是所用使用到的redis健值必须在一个hash槽中,使用大括号对key进行包裹后,redis在对key进行hash时,指挥hash大括号内部的字符,这样就可以保证lua脚本中的使用的key-value在同一个槽内。这样就确保了cluster模式下正常执行redis-lua脚本,但是需要注意的是,这里大括号内包裹的内容不能是不变的,如果是不变的话,会有大量的key-value被分配到同一个槽里,导致hash倾斜,key-value分布不均匀。
  2. 这里使用的不是RedisTemplate,而是使用的StringRedisTemplate执行lua脚本的,使用RedisTemplate执行lua脚本的时候,会报错。

AOP+RedisLua对接口进行限流

image.png

每次请求,获取令牌桶中的令牌,如果令牌获取成功,代表没有被限流,可以正常访问,如果获取失败代表被限流,访问失败,这时会抛出一个RateLimitException结束。

最终效果

  1. 我打算结合springboot的手动装配,制作一个限流的工具,最终可以被封装成一个jar包,其他项目需要,直接引入就可以,不用重复开发。
  2. 具体的用法是这样的
1. 在配置类上标注`@EnableRedisRateLimit`注解,激活限流工具
2. 在需要限流的接口上标注`@RateLimit`注解,并根据具体的场景设置限流规则
1
2
3
4
5
java复制代码 @RateLimit(replenishRate = 3,burstCapacity = 300)
@GetMapping("test-limit")
public Result<Void> testLimit(){
return Result.buildSuccess();
}

核心代码介绍

  1. @RateLimit 为了方便拓展,使得使用不同的场景,这里通过实现KeyResolver接口来指定具体的限流维度
  2. 这里说一下limitProperties的作用,我们可以默认使用注解中的参数指定配置信息,但是为了方便拓展,这里提供了limitProperties,如果指定了limitProperties,那么会以limitProperties的配置为准。
  3. 上篇文章介绍的限流lua脚本只能针对秒为时间单位进行限流,我这里对它的lua脚本做了一个小小的改变,使得可以支持秒,分钟,小时,天 为时间单位的限流。
  4. 限流注解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {

/**
* 限流维度,默认使用uri进行限流
*
* @return uri
*/
Class<? extends KeyResolver> keyResolver() default UriKeyResolver.class;

/**
* 限流配置,如果实现了该接口,默认以这个为准
*
* @return limitProp
*/
Class<? extends LimitProperties> limitProperties() default DefaultLimitProperties.class;

/**
* 令牌桶每秒填充平均速率
*
* @return replenishRate
*/
int replenishRate() default 1;

/**
* 令牌桶总容量
*
* @return burstCapacity
*/
int burstCapacity() default 3;

/**
* 限流时间维度,默认为秒
* 支持秒,分钟,小时,天
* 即,
* {@link TimeUnit#SECONDS},
* {@link TimeUnit#MINUTES},
* {@link TimeUnit#HOURS},
* {@link TimeUnit#DAYS}
*
* @return TimeUnit
* @since 1.0.2
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
}
  1. 限流配置limitProperties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public interface LimitProperties {
/**
* 令牌桶每秒填充平均速率
*
* @return replenishRate
*/
int replenishRate();

/**
* 令牌桶总容量
*
* @return burstCapacity
*/
int burstCapacity();

/**
* 限流时间维度,默认为秒
* 支持秒,分钟,小时,天
* 即,
* {@link TimeUnit#SECONDS},
* {@link TimeUnit#MINUTES},
* {@link TimeUnit#HOURS},
* {@link TimeUnit#DAYS}
*
* @return TimeUnit
* @since 1.0.2
*/
TimeUnit timeUnit();
}
  1. 限流lua脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
lua复制代码local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local time_unit = tonumber(ARGV[5])
-- 填满令牌桶所需要的时间
local fill_time = capacity/rate
local ttl = math.floor((fill_time*time_unit)*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed_num, new_tokens }
  1. 核心AOP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
java复制代码@Slf4j
@Aspect
public class RateLimitInterceptor implements ApplicationContextAware {

@Resource
private RedisTemplate<String, Object> stringRedisTemplate;

@Resource
private RedisScript<List<Long>> rateLimitRedisScript;

private ApplicationContext applicationContext;

@Around("execution(public * *(..)) && @annotation(org.ywb.aoplimiter.anns.RateLimit)")
public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
RateLimit rateLimit = method.getAnnotation(RateLimit.class);
// 断言不会被限流
assertNonLimit(rateLimit, pjp);
return pjp.proceed();
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public void assertNonLimit(RateLimit rateLimit, ProceedingJoinPoint pjp) {
Class<? extends KeyResolver> keyResolverClazz = rateLimit.keyResolver();
KeyResolver keyResolver = applicationContext.getBean(keyResolverClazz);
String resolve = keyResolver.resolve(HttpContentHelper.getCurrentRequest(), pjp);
List<String> keys = getKeys(resolve);

LimitProperties limitProperties = getLimitProperties(rateLimit);

// 根据限流时间维度计算时间
long timeLong = getCurrentTimeLong(limitProperties.timeUnit());

// The arguments to the LUA script. time() returns unixtime in seconds.
List<String> scriptArgs = Arrays.asList(limitProperties.replenishRate() + "",
limitProperties.burstCapacity() + "", (Instant.now().toEpochMilli() / timeLong) + "", "1", timeLong + "");
// 第一个参数是是否被限流,第二个参数是剩余令牌数
List<Long> rateLimitResponse = this.stringRedisTemplate.execute(this.rateLimitRedisScript, keys, scriptArgs.toArray());
Assert.notNull(rateLimitResponse, "redis execute redis lua limit failed.");
Long isAllowed = rateLimitResponse.get(0);
Long newTokens = rateLimitResponse.get(1);
log.info("rate limit key [{}] result: isAllowed [{}] new tokens [{}].", resolve, isAllowed, newTokens);
if (isAllowed <= 0) {
throw new RateLimitException(resolve);
}
}

private LimitProperties getLimitProperties(RateLimit rateLimit) {
Class<? extends LimitProperties> aClass = rateLimit.limitProperties();
if (aClass == DefaultLimitProperties.class) {
// 选取注解中的配置
return new DefaultLimitProperties(rateLimit.replenishRate(), rateLimit.burstCapacity(), rateLimit.timeUnit());
}
// 优先使用用户自己的配置类
return applicationContext.getBean(aClass);
}

private long getCurrentTimeLong(TimeUnit timeUnit) {
switch (timeUnit) {
case SECONDS:
return 1;
case MINUTES:
return 60;
case HOURS:
return 60 * 60;
case DAYS:
return 60 * 60 * 24;
default:
throw new IllegalArgumentException("timeUnit:" + timeUnit + " not support");
}
}

private List<String> getKeys(String id) {
// use `{}` around keys to use Redis Key hash tags
// this allows for using redis cluster

// Make a unique key per user.
String prefix = "request_rate_limiter.{" + id;

// You need two Redis keys for Token Bucket.
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}
}

核心代码就是这么多,完整的源码我已上传至github,

github.com/xiao-ren-wu…

如果感觉对您有帮助的话,请帮忙点个star,谢谢啦~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

synchronized和ReentrantLock的区别

发表于 2021-08-25

这是我参与8月更文挑战的第25天,活动详情查看:8月更文挑战

Java中常用的两个锁synchronized和ReentrantLock,那么它们在原理和使用上有什么区别呢?

二者的区别

1.原始构成

synchronized是关键字属于JVM层面

monitorenter

底层通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象只有在同步块或方法中才能调wait/notify等方法。

monitorexit

Lock是具体类()是api层面的锁

1
2
3
4
5
6
7
8
java复制代码public class Code02_Demo {
public static void main(String[] args) {
synchronized (new Object()){

}
new ReentrantLock();
}
}

javap-c得到

image.png

可以看到synchronized有两个monitorexit是因为第一个monitorexit是正常情况下的解锁,而第二个monitorexit是异常情况下的解锁,这样不会出现死锁

可以看到ReentrantLock是个类

2.使用方法

synchronized不需要用户去手动释放锁,当synchronized代码执行完后系统会自动让线程释放对锁的占用。

需要lock,unlock配合try finally完成

3.等待是否可中断

使用方法synchronized不可中断,除非抛出异常或者正常运行完成,

ReentrantLock可以中断,

1.设置超时方法tryLock(long time, TimeUnit unit)

2.lockInterruptibly()放代码块中,调用interrupt()方法可中断

4.加锁是否公平

synchronized默认是非公平锁

ReentrantLock默认也是非公平锁

5.锁绑定多个条件Condition

ReentrantLock可以分组唤醒线程,而不是像synchronized那样要么全部唤醒,要么随机唤醒一个。

题目

A线程打印1次,B线程打印2次,C线程打印3次,循环5次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
java复制代码import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Code01_ReentrantLockDemo {
public static void main(String[] args) {
ResourceInfo resourceInfo = new ResourceInfo();

new Thread(() -> {
for (int i = 1; i <= 5; i++) {
resourceInfo.print1();
}
}, "A").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
resourceInfo.print2();
}
}, "B").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
resourceInfo.print3();
}
}, "C").start();
}
}

class ResourceInfo {
Lock lock = new ReentrantLock();
private int num = 1; //标志变量num
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();


public void print1() {
lock.lock();
try {
while (num != 1) {
c1.await();
}
System.out.println(Thread.currentThread().getName() + "\t");
//唤醒B线程,修改标志位
num = 2;
c2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}

public void print2() {
lock.lock();
try {
while (num != 2) {
c2.await();
}
for (int i = 1; i <= 2; i++) {
System.out.println(Thread.currentThread().getName() + "\t");
}

//唤醒B线程,修改标志位
num = 3;
c3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}

public void print3() {
lock.lock();
try {
while (num != 3) {
c3.await();
}
for (int i = 1; i <= 3; i++) {
System.out.println(Thread.currentThread().getName() + "\t");
}

//唤醒B线程,修改标志位
num = 1;
c1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

关于git分支开发的总结 1 git分支开发图 2 git分

发表于 2021-08-25

这是我参与8月更文挑战的第25天,活动详情查看:8月更文挑战

因博主在多分支开发的情况下遇到一下开发规范问题,所以结合网上资料,总结一下git分支开发流程

1 git分支开发图

推荐一个在线画图工具: www.processon.com/

该工具可在线画各种流程图,支持多种格式导出,安利给大家使用.

下图就是使用precesson在线画图的,导出的图片文件.

image-20210825224252587

2 git分支开发说明

1 分支说明

对于不同的分支,通常做如下约定.

分支 说明
master 主分支,上线后从release分支合并,和生产分支同步.作为最稳定的分支
develop 开发分支,由feature分支合并
test 测试分支,由feature分支合并
feature 用于开发不同的新功能
release 发布分支
hotfix 热修复分支,修复生产上的紧急问题.通常从master上拉取hotfix分支,修复完成以后,合并回master和develop分支。

2 分支流程图说明

正常的开发流程如上图:

1 从master分支拉几个feature开发新功能,新功能开发完毕,合到develop分支,develop分支合到测试分支,测试通过后,合到release发布分支.此时可以打tag标签,作为一个新的版本发布.接着把release分支合到master分支.下一个迭代又从master分支拉取feature分支开发新功能.

2 如果在第一个迭代中,发现feature-03没有及时完成,不能发布.那个就只把feature-01和feature-02合到develop开发分支,走test分支,走release分支,最后到master分支.如果下个迭代feature-03完成,则可合到develop分支(注意: 此时一定要解决冲突,因为03分支是从上个master分支合出来的,develop是本次master分支合出来的),走test测试分支等

3 如果release分支发版后,遇到紧急bug需要修复, 从最新的master分支拉出hotfix分支,修改完后,可合到master分支,如果此时develop分支正在开发中,也需要合到develop分支.

3 Commit通用规范

在git中提交代码时,最好按照通用规范写清楚提交代码的功能,后续项目总结复盘和跟踪问题时,可节省大量的时间.

常见规范如下:

类型 描述
fix 修复bug
feat 新增了功能
doc 增加文档说明,或修改已有文档
refactor 重构代码
build 修改项目的依赖文件

ps:

1
2
3
4
5
6
7
8
java复制代码fix:
- 修复订单重复下单问题
- 修复列表分页失效问题
feat:
- 新增用户查询功能
doc:
- 修改README文档说明
- 增加下单接口说明

4 总结

git多分支开发,主要是建立一套完善健康的开发流程,熟练掌握后,对于项目整体开发的效率有很高的提升,且降低了项目出现故障的几率.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Spring Boot 快速入门】十、Spring Boo

发表于 2021-08-25

这是我参与8月更文挑战的第25天,活动详情查看:8月更文挑战

前言

  Druid Spring Boot Starter 用于帮助你在Spring Boot项目中轻松集成Druid数据库连接池和监控。本文将结合Spring Boot集成Druid,进行数据源监控。

初识Druid

  Druid是阿里开源的一个JDBC应用组件, 其包括三部分:DruidDriver 代理Driver,能够提供基于Filter-Chain模式的插件体系;
DruidDataSource 高效可管理的数据库连接池;SQLParser SQL语法分析;Druid是Java语言中最好的数据库连接池。Druid能够提供强大的监控和扩展功能。

下载Druid

  Druid是一个开源项目,源码托管在github上,源代码仓库地址是 github.com/alibaba/dru… 。同时每次Druid发布正式版本和快照的时候,都会把源码打包,你可以从上面的下载地址中找到相关版本的源码。

DruidDataSource支持数据库

  理论上说,支持所有有jdbc驱动的数据库。实际测试过的有

数据库 支持状态
mysql 支持,大规模使用
oracle 支持,大规模使用
sqlserver 支持
postgres 支持
db2 支持
h2 支持
derby 支持
sqlite 支持
sybase 支持

Druid自动识别DriverClass

   Druid是根据url前缀来识别DriverClass的,这样使得配置更方便简洁。

前缀 DriverCLass 描述信息
jdbc:odps com.aliyun.odps.jdbc.OdpsDriver
jdbc:derby org.apache.derby.jdbc.EmbeddedDriver
jdbc:mysql com.mysql.jdbc.Driver
jdbc:oracle oracle.jdbc.driver.OracleDriver
jdbc:microsoft com.microsoft.jdbc.sqlserver.SQLServerDriver
jdbc:sybase:Tds com.sybase.jdbc2.jdbc.SybDriver
jdbc:jtds net.sourceforge.jtds.jdbc.Driver
jdbc:postgresql org.postgresql.Driver
jdbc:fake com.alibaba.druid.mock.MockDriver
jdbc:mock com.alibaba.druid.mock.MockDriver
jdbc:hsqldb org.hsqldb.jdbcDriver
jdbc:db2 com.ibm.db2.jdbc.app.DB2Driver DB2的JDBC Driver十分混乱,这个匹配不一定对
jdbc:sqlite org.sqlite.JDBC
jdbc:ingres com.ingres.jdbc.IngresDriver
jdbc:h2 org.h2.Driver
jdbc:mckoi com.mckoi.JDBCDriver
jdbc:cloudscape com.cloudscape.core.JDBCDriver
jdbc:informix-sqli com.informix.jdbc.IfxDriver
jdbc:timesten com.timesten.jdbc.TimesTenDriver
jdbc:as400 com.ibm.as400.access.AS400JDBCDriver
jdbc:sapdb com.sap.dbtech.jdbc.DriverSapDB
jdbc:JSQLConnect com.jnetdirect.jsql.JSQLDriver
jdbc:JTurbo com.newatlanta.jturbo.driver.Driver
jdbc:firebirdsql org.firebirdsql.jdbc.FBDriver
jdbc:interbase interbase.interclient.Driver
jdbc:pointbase com.pointbase.jdbc.jdbcUniversalDriver
jdbc:edbc ca.edbc.jdbc.EdbcDriver
jdbc:mimer:multi1 com.mimer.jdbc.Driver

快速开始

加入依赖

  Druid 0.1.18 之后版本都发布到maven中央仓库中,所以你只需要在项目的pom.xml中加上dependency就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
xml复制代码      <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- mybatis-spring-boot-starter -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<!-- druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.11</version>
</dependency>

  主要使用到的依赖是druid这个依赖,其他的是MySQL和MyBatis的依赖。方便连接数据库使用。

配置DruidMoniterConfig

  配置一个Druid监控管理后台,主要是为了在WEB端可以查看监控管理信息。主要的配置信息有用户名、用户名密码、允许访问的权限路径、黑名单的IP等信息。然后需要配置一个web监控的filter,过滤静态文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typescript复制代码/**
* @ClassName DruidMoniterConfig
* @Description: DruidMoniterConfig
* @Author JavaZhan @公众号:Java全栈架构师
* @Date 2020/6/13
* @Version V1.0
**/
@Configuration
public class DruidMoniterConfig{
@Bean
public ServletRegistrationBean statViewServlet(){
ServletRegistrationBean bean = new ServletRegistrationBean(new StatViewServlet(), "/druid/*");
Map<String,String> initParams = new HashMap<>();
initParams.put("loginUsername","admin");
initParams.put("loginPassword","admin");
initParams.put("allow","");
initParams.put("deny","192.168.127.98");

bean.setInitParameters(initParams);
return bean;
}


@Bean
public FilterRegistrationBean webStatFilter(){
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new WebStatFilter());
Map<String,String> initParams = new HashMap<>();
initParams.put("exclusions","*.js,*.css,/druid/*");
bean.setInitParameters(initParams);
bean.setUrlPatterns(Arrays.asList("/*"));
return bean;
}
}

基础配置信息

  配置文件主要包含数据源的URL、数据库用户名、数据库密码、驱动类等数据源相关配置信息。

1
2
3
4
5
6
7
8
9
10
ini复制代码server.port=8888

# mysql
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
spring.datasource.username=test
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource

mybatis.mapper-locations=classpath*:mapper/**/*.xml

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码/**
* @ClassName DemoMyBatisApplication
* @Description: DemoMyBatisApplication
* @Author JavaZhan @公众号:Java全栈架构师
* @Date 2020/6/13
* @Version V1.0
**/
@SpringBootApplication
@MapperScan("com.example.demo.mapper")
public class DemoMyBatisApplication {

public static void main(String[] args) {
SpringApplication.run(DemoMyBatisApplication.class, args);
}

}

监控页面

  启动项目之后,在浏览器中输入http://127.0.0.1:8888/druid/,自动跳转到http://127.0.0.1:8888/druid/login.html页面,需要输入用户名和密码信息
图片.png
输入用户名和密码后,进入监控页面,页面信息如下:
图片.png
主要包含:数据源、SQL监控、SQL防火墙、Web应用、URI监控、session监控、 Spring监控、 JSON API等信息。好了Spring Boot集成Druid数据监控就已经完成了。

结语

  Spring Boot集成Druid数据监控,方便我们进行数据库连接的数据信息分析,对sql进行监控,都是基础入门的教程,更深入的配置将在以后教程中讲解。

  作者介绍:【小阿杰】一个爱鼓捣的程序猿,JAVA开发者和爱好者。公众号【Java全栈架构师】维护者,欢迎关注阅读交流。

  好了,感谢您的阅读,希望您喜欢,如对您有帮助,欢迎点赞收藏。如有不足之处,欢迎评论指正。下次见。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Cloud Gateway + Jwt + O

发表于 2021-08-25

一、背景

随着我们的微服务越来越多,如果每个微服务都要自己去实现一套鉴权操作,那么这么操作比较冗余,因此我们可以把鉴权操作统一放到网关去做,如果微服务自己有额外的鉴权处理,可以在自己的微服务中处理。

二、需求

1、在网关层完成url层面的鉴权操作。

  • 所有的OPTION请求都放行。
  • 所有不存在请求,直接都拒绝访问。
  • user-provider服务的findAllUsers需要 user.userInfo权限才可以访问。

2、将解析后的jwt token当做请求头传递到下游服务中。
3、整合Spring Security Oauth2 Resource Server

三、前置条件

1、搭建一个可用的认证服务器,可以参考之前的文章.

2、知道Spring Security Oauth2 Resource Server资源服务器如何使用,可以参考之前的文章.

四、项目结构

项目结构

五、网关层代码的编写

1、引入jar包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
xml复制代码<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-resource-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>

2、自定义授权管理器

自定义授权管理器,判断用户是否有权限访问

此处我们简单判断

1、放行所有的 OPTION 请求。

2、判断某个请求(url)用户是否有权限访问。

3、所有不存在的请求(url)直接无权限访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
java复制代码package com.huan.study.gateway.config;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.security.authentication.AbstractAuthenticationToken;
import org.springframework.security.authorization.AuthorizationDecision;
import org.springframework.security.authorization.ReactiveAuthorizationManager;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationToken;
import org.springframework.security.web.server.authorization.AuthorizationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.PathMatcher;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.Objects;

/**
* 自定义授权管理器,判断用户是否有权限访问
*
* @author huan.fu 2021/8/24 - 上午9:57
*/
@Component
@Slf4j
public class CustomReactiveAuthorizationManager implements ReactiveAuthorizationManager<AuthorizationContext> {

/**
* 此处保存的是资源对应的权限,可以从数据库中获取
*/
private static final Map<String, String> AUTH_MAP = Maps.newConcurrentMap();

@PostConstruct
public void initAuthMap() {
AUTH_MAP.put("/user/findAllUsers", "user.userInfo");
AUTH_MAP.put("/user/addUser", "ROLE_ADMIN");
}


@Override
public Mono<AuthorizationDecision> check(Mono<Authentication> authentication, AuthorizationContext authorizationContext) {
ServerWebExchange exchange = authorizationContext.getExchange();
ServerHttpRequest request = exchange.getRequest();
String path = request.getURI().getPath();

// 带通配符的可以使用这个进行匹配
PathMatcher pathMatcher = new AntPathMatcher();
String authorities = AUTH_MAP.get(path);
log.info("访问路径:[{}],所需要的权限是:[{}]", path, authorities);

// option 请求,全部放行
if (request.getMethod() == HttpMethod.OPTIONS) {
return Mono.just(new AuthorizationDecision(true));
}

// 不在权限范围内的url,全部拒绝
if (!StringUtils.hasText(authorities)) {
return Mono.just(new AuthorizationDecision(false));
}

return authentication
.filter(Authentication::isAuthenticated)
.filter(a -> a instanceof JwtAuthenticationToken)
.cast(JwtAuthenticationToken.class)
.doOnNext(token -> {
System.out.println(token.getToken().getHeaders());
System.out.println(token.getTokenAttributes());
})
.flatMapIterable(AbstractAuthenticationToken::getAuthorities)
.map(GrantedAuthority::getAuthority)
.any(authority -> Objects.equals(authority, authorities))
.map(AuthorizationDecision::new)
.defaultIfEmpty(new AuthorizationDecision(false));
}
}

3、token认证失败、或超时的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码package com.huan.study.gateway.config;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.security.core.AuthenticationException;
import org.springframework.security.web.server.ServerAuthenticationEntryPoint;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

/**
* 认证失败异常处理
*
* @author huan.fu 2021/8/25 - 下午1:10
*/
public class CustomServerAuthenticationEntryPoint implements ServerAuthenticationEntryPoint {
@Override
public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException ex) {

return Mono.defer(() -> Mono.just(exchange.getResponse()))
.flatMap(response -> {
response.setStatusCode(HttpStatus.UNAUTHORIZED);
String body = "{\"code\":401,\"msg\":\"token不合法或过期\"}";
DataBuffer buffer = response.bufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
return response.writeWith(Mono.just(buffer))
.doOnError(error -> DataBufferUtils.release(buffer));
});
}
}

4、用户没有权限的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码package com.huan.study.gateway.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.web.server.authorization.ServerAccessDeniedHandler;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

/**
* 无权限访问异常
*
* @author huan.fu 2021/8/25 - 下午12:18
*/
@Slf4j
public class CustomServerAccessDeniedHandler implements ServerAccessDeniedHandler {

@Override
public Mono<Void> handle(ServerWebExchange exchange, AccessDeniedException denied) {

ServerHttpRequest request = exchange.getRequest();

return exchange.getPrincipal()
.doOnNext(principal -> log.info("用户:[{}]没有访问:[{}]的权限.", principal.getName(), request.getURI()))
.flatMap(principal -> {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.FORBIDDEN);
String body = "{\"code\":403,\"msg\":\"您无权限访问\"}";
DataBuffer buffer = response.bufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
return response.writeWith(Mono.just(buffer))
.doOnError(error -> DataBufferUtils.release(buffer));
});
}
}

5、将token信息传递到下游服务器中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码package com.huan.study.gateway.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationToken;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

/**
* 将token信息传递到下游服务中
*
* @author huan.fu 2021/8/25 - 下午2:49
*/
public class TokenTransferFilter implements WebFilter {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

static {
OBJECT_MAPPER.registerModule(new Jdk8Module());
OBJECT_MAPPER.registerModule(new JavaTimeModule());
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication)
.cast(JwtAuthenticationToken.class)
.flatMap(authentication -> {
ServerHttpRequest request = exchange.getRequest();
request = request.mutate()
.header("tokenInfo", toJson(authentication.getPrincipal()))
.build();

ServerWebExchange newExchange = exchange.mutate().request(request).build();

return chain.filter(newExchange);
});
}

public String toJson(Object obj) {
try {
return OBJECT_MAPPER.writeValueAsString(obj);
} catch (JsonProcessingException e) {
return null;
}
}
}

6、网关层面的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
java复制代码package com.huan.study.gateway.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.security.authentication.AbstractAuthenticationToken;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.SecurityWebFiltersOrder;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.oauth2.jose.jws.SignatureAlgorithm;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder;
import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter;
import org.springframework.security.oauth2.server.resource.authentication.ReactiveJwtAuthenticationConverterAdapter;
import org.springframework.security.oauth2.server.resource.web.server.ServerBearerTokenAuthenticationConverter;
import org.springframework.security.web.server.SecurityWebFilterChain;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.nio.file.Files;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.interfaces.RSAPublicKey;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

/**
* 资源服务器配置
*
* @author huan.fu 2021/8/24 - 上午10:08
*/
@Configuration
@EnableWebFluxSecurity
public class ResourceServerConfig {

@Autowired
private CustomReactiveAuthorizationManager customReactiveAuthorizationManager;

@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) throws NoSuchAlgorithmException, IOException, InvalidKeySpecException {
http.oauth2ResourceServer()
.jwt()
.jwtAuthenticationConverter(jwtAuthenticationConverter())
.jwtDecoder(jwtDecoder())
.and()
// 认证成功后没有权限操作
.accessDeniedHandler(new CustomServerAccessDeniedHandler())
// 还没有认证时发生认证异常,比如token过期,token不合法
.authenticationEntryPoint(new CustomServerAuthenticationEntryPoint())
// 将一个字符串token转换成一个认证对象
.bearerTokenConverter(new ServerBearerTokenAuthenticationConverter())
.and()
.authorizeExchange()
// 所有以 /auth/** 开头的请求全部放行
.pathMatchers("/auth/**", "/favicon.ico").permitAll()
// 所有的请求都交由此处进行权限判断处理
.anyExchange()
.access(customReactiveAuthorizationManager)
.and()
.exceptionHandling()
.accessDeniedHandler(new CustomServerAccessDeniedHandler())
.authenticationEntryPoint(new CustomServerAuthenticationEntryPoint())
.and()
.csrf()
.disable()
.addFilterAfter(new TokenTransferFilter(), SecurityWebFiltersOrder.AUTHENTICATION);

return http.build();
}

/**
* 从jwt令牌中获取认证对象
*/
public Converter<Jwt, ? extends Mono<? extends AbstractAuthenticationToken>> jwtAuthenticationConverter() {

// 从jwt 中获取该令牌可以访问的权限
JwtGrantedAuthoritiesConverter authoritiesConverter = new JwtGrantedAuthoritiesConverter();
// 取消权限的前缀,默认会加上SCOPE_
authoritiesConverter.setAuthorityPrefix("");
// 从那个字段中获取权限
authoritiesConverter.setAuthoritiesClaimName("scope");

JwtAuthenticationConverter jwtAuthenticationConverter = new JwtAuthenticationConverter();
// 获取 principal name
jwtAuthenticationConverter.setPrincipalClaimName("sub");
jwtAuthenticationConverter.setJwtGrantedAuthoritiesConverter(authoritiesConverter);

return new ReactiveJwtAuthenticationConverterAdapter(jwtAuthenticationConverter);
}

/**
* 解码jwt
*/
public ReactiveJwtDecoder jwtDecoder() throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
Resource resource = new FileSystemResource("/Users/huan/code/study/idea/spring-cloud-alibaba-parent/gateway-oauth2/new-authoriza-server-public-key.pem");
String publicKeyStr = String.join("", Files.readAllLines(resource.getFile().toPath()));
byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyStr);
X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicKeyBytes);
KeyFactory keyFactory = KeyFactory.getInstance("RSA");
RSAPublicKey rsaPublicKey = (RSAPublicKey) keyFactory.generatePublic(keySpec);

return NimbusReactiveJwtDecoder.withPublicKey(rsaPublicKey)
.signatureAlgorithm(SignatureAlgorithm.RS256)
.build();
}
}

7、网关yaml配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
yaml复制代码spring:
application:
name: gateway-auth
cloud:
nacos:
discovery:
server-addr: localhost:8847
gateway:
routes:
- id: user-provider
uri: lb://user-provider
predicates:
- Path=/user/**
filters:
- RewritePath=/user(?<segment>/?.*), $\{segment}
compatibility-verifier:
# 取消SpringCloud SpringCloudAlibaba SpringBoot 等的版本检查
enabled: false
server:
port: 9203
debug: true

六、演示

1、客户端 gateway 在认证服务器拥有的权限为 user.userInfo
客户端gateway拥有的权限
2、user-provider服务提供了一个api findAllUsers,它会返回 系统中存在的用户(假的数据) 和 解码后的token信息。

3、在网关层面,findAllUsers 需要的权限为 user.userInfo,正好 gateway这个客户端有这个权限,所以可以访问。

演示GIF
在这里插入图片描述

七、代码路径

gitee.com/huan1993/sp…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…549550551…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%