开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

客端日志的收集、存储和分析(二) 前言 一、ClickHou

发表于 2021-09-28

前言

上一节我们介绍了如何接收客户端日志及数据存储方面ClickHouse的写入和更新相关的知识。在写入方面,我们提到了批量写入,批量写入是有利于ClickHouse处理新增数据的。但是,如果我们想保证拥有较快的查询速度,该以怎样的数据结构去组织数据写入呢?这一结我们来介绍一下如何为高效的查询组织数据写入ClickHouse。

一、ClickHouse数据分区

数据分区就是按照一个或多个字段,将数据放到不同的目录中,在很多的OLAP数据库中,都存在数据分区的设计,但是其它很多OLAP数据库都没有分区内小文件自动合并的功能。ClickHouse会对分区内的小文件定时进行合并,以便提高查询性能。分区最常用的就是按天分区。在ClickHouse的MergeTree系列引擎表中,是在创建表时通过partition by指定分区字段的,例如:

1
2
3
4
5
6
7
8
sql复制代码CREATE TABLE test (
event String,
user_id Int64,
event_time String,
dt String
) ENGINE = MergeTree()
partition by dt
order by event

其中dt就是分区字段。有了分区字段之后,我们查询数据时,就可以使用where条件通过分区字段进行过滤,减小数据扫描的范围,提高查询效率。例如,我们只想查询2021年09月1日到9月30日的数据量,可以执行如下sql获取:

1
sql复制代码select count(1) from test where dt between '20210901' and '20210930'

下图是分区的原理图

image.png
如20210201是就一个大的分区,就是我们定义的dt字段,在大分区中,还有很多小文件,每个文件都可能是一次insert的数据生成的文件,如20210201_1_1_0、20210201_2_2_0等。

分区虽然能减小查询时数据的扫描范围,但是切记分区不能分的太细,比如说按照分钟级别进行分区,因为MergeTree表引擎的一个分区对应一批文件,分的太细,可能会造成文件数过多,文件数过多,在查询时,就会扫描大量的小文件,会产生大量的文件寻址,这无疑会增加查询时间。再有,ClickHouse会定时合并每个分区中的小文件,分区数过多,可能会导致ClickHouse合并小文件的速度赶不上数据写入生成的小文件的速度。所以说,分区粒度不能太细。

二、ClickHouse MergeTree引擎一级索引

  1. 一级索引使用示例

ClickHouse提供了一级索引,查询时使用一级索引,能大幅度提高查询性能,一级索引在创建表时通过order by关键字声明,也可通过primary key声明。如果不声明primary key关键字,那么默认情况下order by关键字声明的字段就是一级索引。示例如下:

1
2
3
4
5
6
7
8
sql复制代码CREATE TABLE heaven_eye_analyse.event_production (
event String,
user_id Int64,
$lib String,
dt String
) ENGINE = MergeTree()
partition by dt
order by event

其中event就是一级索引。

下面我们来对比下使用一级索引和普通字段的查询时效。比较的总数据量是50亿。
先来看下使用普通字段的查询时效

image.png

通过普通字段$lib查询30天数据总量,耗时15s。再来看下使用一级索引的查询耗时

image.png
可以看到,耗时只有430ms,和使用普通字段查询相比,速度完全碾压。

2.一级索引原理

接着我们介绍下ClickHouse一级索引的原理。下面是一级索引的原理图

primary.idx、标记文件、数据文件的协同.png

MergeTree表中的每一列,都有两个文件,一个是.bin文件,另外一个是.mrk文件。一级索引除此之外还有一个primary.idx文件。

.bin文件

.bin文件用于存储数据。

由于同一列的数据大概率具有相似性,所以数据一般压缩后存储,可节省磁盘空间,但是ClickHosue的压缩不是针对整个.bin文件进行压缩,而是将.bin文件中的数据划分为多个部分,每个部分压缩在一起,称为一个压缩块,这样做的好处如下:比如说整个.bin文件有1亿条数据,如果是对整个文件进行压缩,那么假设只用读取其中一万条数据,那么需要对整个文件进行解压,这样就浪费了很多时间。相对的,如果只解压这一万条数据所在的存储块,那么效率会提升很多。

另外,每个压缩块中有一个元数据,元数据占9个字节,1个字节表示压缩算法类型,4个字节表示表示压缩前的数据大小,4个字节表示压缩后的数据大小。

.mrk文件

.mrk文件用于描述这一列对应的.bin文件中每个数据块(默认每8192条数据一个间隔)的起始位置及数据在数据块内的位置(解压后)

primary.idx文件

一级索引对应一个primary.idx文件。ClickHouse MergeTree表每隔一定量的数据(默认8192条)就生成一个索引标记,对于一亿条数据,使用12208行索引标记就能索引,由此来看,索引标记占用的空间会非常小,所以索引文件就可以常驻内存,加快存取速度。

primary.idx、标记文件、数据文件的协同

由上面的一级索引原理图可以看出,primary.idx中的每行索引标记和索引对应列的.mrk文件中的每个标记是一对一的关系,标记和压缩块根据一级索引对应的字段压缩前的大小,分为一对一、一对多和多对一。如果想根据一级索引字段查询某个范围内的数据,可以将查询条件和一级索引做交集,找到数据所在的区间对应的标记。举个例子,比如说想查询event between 5000 and 6000之间的数据,根据一级索引,发现索引4553和查询条件有交集,然后根据索引4553找到标记1,标记1指向压缩块1,此时先解压压缩块1,由于标记1的P2的位置是0,那么就从解压后的第一个元素开始,找到符合条件(event between 5000 和 6000)的数据。

总结

本节我们主要介绍ClickHouse提高查询效率的两个方式,即通过ClickHouse MergeTree表引擎的分区和一级索引存。两种方式结合使用,可以让我们在查询时大幅度提高性能。

下面划重点:

在使用分区时,切记分区不能分的太细。

在使用一级索引时,应该选择查询时能被经常命中的条件作为一级索引。

在知道如何高效的组织好数据存储后,我们下一节将介绍如何基于ClickHouse做业务上的数据分析,例如典型的漏斗模型分析、业务留存分析等。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot之SpringSecurity权限注解在

发表于 2021-09-28

前言

Spring Security支持方法级别的权限控制。在此机制上,我们可以在任意层的任意方法上加入权限注解,加入注解的方法将自动被Spring Security保护起来,仅仅允许特定的用户访问,从而还到权限控制的目的, 当然如果现有的权限注解不满足我们也可以自定义

快速开始

  1. 首先加入security依赖如下
1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>

2.接着新建安全配置类

Spring Security默认是禁用注解的,要想开启注解,要在继承WebSecurityConfigurerAdapter的类加@EnableMethodSecurity注解,并在该类中将AuthenticationManager定义为Bean。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@EnableWebSecurity
@Configuration
@EnableGlobalMethodSecurity(
prePostEnabled = true,
securedEnabled = true,
jsr250Enabled = true)
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Bean
@Override
public AuthenticationManager authenticationManagerBean() throws Exception {
return super.authenticationManagerBean();
}
}

我们看到@EnableGlobalMethodSecurity 分别有prePostEnabled 、securedEnabled、jsr250Enabled 三个字段,其中每个字段代码一种注解支持,默认为false,true为开启。那么我们就一一来说一下这三总注解支持。

prePostEnabled = true 的作用的是启用Spring Security的@PreAuthorize 以及@PostAuthorize 注解。

securedEnabled = true 的作用是启用Spring Security的@Secured 注解。

jsr250Enabled = true 的作用是启用@RoleAllowed 注解

更详细使用整合请参考我这两篇
轻松上手SpringBoot+SpringSecurity+JWT实RESTfulAPI权限控制实战

Spring Security核心接口用户权限获取,鉴权流程执行原理

在方法上设置权限认证

JSR-250注解

遵守了JSR-250标准注解
主要注解

  1. @DenyAll
  2. @RolesAllowed
  3. @PermitAll

这里面@DenyAll 和 @PermitAll 相信就不用多说了 代表拒绝和通过。

@RolesAllowed 使用示例

1
2
3
4
5
6
7
8
9
java复制代码@RolesAllowed("ROLE_VIEWER")
public String getUsername2() {
//...
}

@RolesAllowed({ "USER", "ADMIN" })
public boolean isValidUsername2(String username) {
//...
}

代表标注的方法只要具有USER, ADMIN任意一种权限就可以访问。这里可以省略前缀ROLE_,实际的权限可能是ROLE_ADMIN

在功能及使用方法上与 @Secured 完全相同

securedEnabled注解

主要注解

@Secured

  1. Spring Security的@Secured注解。注解规定了访问访方法的角色列表,在列表中最少指定一种角色
  2. @Secured在方法上指定安全性,要求 角色/权限等 只有对应 角色/权限 的用户才可以调用这些方法。 如果有人试图调用一个方法,但是不拥有所需的 角色/权限,那会将会拒绝访问将引发异常。

比如:

1
2
3
4
5
6
7
8
9
10
java复制代码@Secured("ROLE_VIEWER")
public String getUsername() {
SecurityContext securityContext = SecurityContextHolder.getContext();
return securityContext.getAuthentication().getName();
}

@Secured({ "ROLE_DBA", "ROLE_ADMIN" })
public String getUsername2() {
//...
}

@Secured("ROLE_VIEWER") 表示只有拥有ROLE_VIEWER角色的用户,才能够访问getUsername()方法。

@Secured({ "ROLE_DBA", "ROLE_ADMIN" }) 表示用户拥有”ROLE_DBA", "ROLE_ADMIN" 两个角色中的任意一个角色,均可访问 getUsername2 方法。

还有一点就是@Secured,不支持Spring EL表达式

prePostEnabled注解

这个开启后支持Spring EL表达式 算是蛮厉害的。如果没有访问方法的权限,会抛出AccessDeniedException。

主要注解

  1. @PreAuthorize –适合进入方法之前验证授权
  2. @PostAuthorize –检查授权方法之后才被执行并且可以影响执行方法的返回值
  1. @PostFilter –在方法执行之后执行,而且这里可以调用方法的返回值,然后对返回值进行过滤或处理或修改并返回
  1. @PreFilter –在方法执行之前执行,而且这里可以调用方法的参数,然后对参数值进行过滤或处理或修改

@PreAuthorize注解使用

1
2
3
4
java复制代码@PreAuthorize("hasRole('ROLE_VIEWER')")
public String getUsernameInUpperCase() {
return getUsername().toUpperCase();
}

@PreAuthorize(“hasRole(‘ROLE_VIEWER’)”) 相当于@Secured(“ROLE_VIEWER”)。

同样的 @Secured({“ROLE_VIEWER”,”ROLE_EDITOR”}) 也可以替换为:@PreAuthorize(“hasRole(‘ROLE_VIEWER') or hasRole(‘ROLE_EDITOR')”)。

除此以外,我们还可以在方法的参数上使用表达式:

在方法执行之前执行,这里可以调用方法的参数,也可以得到参数值,这里利用JAVA8的参数名反射特性,如果没有JAVA8,那么也可以利用Spring Secuirty的@P标注参数,或利用Spring Data的@Param标注参数。

1
2
3
4
5
6
java复制代码//无java8
@PreAuthorize("#userId == authentication.principal.userId or hasAuthority(‘ADMIN’)")
void changePassword(@P("userId") long userId ){}
//有java8
@PreAuthorize("#userId == authentication.principal.userId or hasAuthority(‘ADMIN’)")
void changePassword(long userId ){}

这里表示在changePassword方法执行之前,判断方法参数userId的值是否等于principal中保存的当前用户的userId,或者当前用户是否具有ROLE_ADMIN权限,两种符合其一,就可以访问该 方法。

@PostAuthorize注解使用

在方法执行之后执行可,以获取到方法的返回值,并且可以根据该方法来决定最终的授权结果(是允许访问还是不允许访问):

1
2
3
4
5
java复制代码@PostAuthorize
("returnObject.username == authentication.principal.nickName")
public CustomUser loadUserDetail(String username) {
return userRoleRepository.loadUserByUserName(username);
}

上述代码中,仅当loadUserDetail方法的返回值中的username与当前登录用户的username相同时才被允许访问

注意如果EL为false,那么该方法也已经执行完了,可能会回滚。EL变量returnObject表示返回的对象。

@PreFilter以及@PostFilter注解使用

Spring Security提供了一个@PreFilter 注解来对传入的参数进行过滤:

1
2
3
4
Java复制代码@PreFilter("filterObject != authentication.principal.username")
public String joinUsernames(List<String> usernames) {
return usernames.stream().collect(Collectors.joining(";"));
}

当usernames中的子项与当前登录用户的用户名不同时,则保留;当usernames中的子项与当前登录用户的用户名相同时,则移除。比如当前使用用户的用户名为zhangsan,此时usernames的值为{"zhangsan", "lisi", "wangwu"},则经@PreFilter过滤后,实际传入的usernames的值为{"lisi", "wangwu"}

如果执行方法中包含有多个类型为Collection的参数,filterObject 就不太清楚是对哪个Collection参数进行过滤了。此时,便需要加入 filterTarget 属性来指定具体的参数名称:

1
2
3
4
5
6
7
8
9
java复制代码@PreFilter
(value = "filterObject != authentication.principal.username",
filterTarget = "usernames")
public String joinUsernamesAndRoles(
List<String> usernames, List<String> roles) {

return usernames.stream().collect(Collectors.joining(";"))
+ ":" + roles.stream().collect(Collectors.joining(";"));
}

同样的我们还可以使用@PostFilter 注解来过返回的Collection进行过滤:

1
2
3
4
java复制代码@PostFilter("filterObject != authentication.principal.username")
public List<String> getAllUsernamesExceptCurrent() {
return userRoleRepository.getAllUsernames();
}

此时 filterObject 代表返回值。如果按照上述代码则实现了:移除掉返回值中与当前登录用户的用户名相同的子项。

自定义元注解

如果我们需要在多个方法中使用相同的安全注解,则可以通过创建元注解的方式来提升项目的可维护性。

比如创建以下元注解:

1
2
3
4
5
java复制代码@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@PreAuthorize("hasRole('ROLE_VIEWER')")
public @interface IsViewer {
}

然后可以直接将该注解添加到对应的方法上:

1
2
3
4
java复制代码@IsViewer
public String getUsername4() {
//...
}

在生产项目中,由于元注解分离了业务逻辑与安全框架,所以使用元注解是一个非常不错的选择。

类上使用安全注解

如果一个类中的所有的方法我们全部都是应用的同一个安全注解,那么此时则应该把安全注解提升到类的级别上:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Service
@PreAuthorize("hasRole('ROLE_ADMIN')")
public class SystemService {

public String getSystemYear(){
//...
}

public String getSystemDate(){
//...
}
}

上述代码实现了:访问getSystemYear 以及getSystemDate 方法均需要ROLE_ADMIN权限。

方法上应用多个安全注解

在一个安全注解无法满足我们的需求时,还可以应用多个安全注解:

1
2
3
4
5
java复制代码@PreAuthorize("#username == authentication.principal.username")
@PostAuthorize("returnObject.username == authentication.principal.nickName")
public CustomUser securedLoadUserDetail(String username) {
return userRoleRepository.loadUserByUserName(username);
}

此时Spring Security将在执行方法前执行@PreAuthorize的安全策略,在执行方法后执行@PostAuthorize的安全策略。

总结

在此结合我们的使用经验,给出以下两点提示:

  1. 默认情况下,在方法中使用安全注解是由Spring AOP代理实现的,这意味着:如果我们在方法1中去调用同类中的使用安全注解的方法2,则方法2上的安全注解将失效。
  2. Spring Security上下文是线程绑定的,这意味着:安全上下文将不会传递给子线程。
1
2
3
4
5
java复制代码public boolean isValidUsername4(String username) {
// 以下的方法将会跳过安全认证
this.getUsername();
return true;
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

CountDownLatch、CyclicBarrier、S

发表于 2021-09-28

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

先说总结

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

  • CountDownLatch 一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
  • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
  • CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的。

Semaphore 其实和锁有点类似,它一般用于控制对 某组 资源的访问权限,而锁是控制对 某个 资源的访问权限。

一、CountDownLatch

  • CountDownLatch 类位于 java.util.concurrent 包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
  • CountDownLatch类只提供了一个构造器:
1
java复制代码public CountDownLatch(int count) {  };  //参数count为计数值
  • 然后下面这3个方法是CountDownLatch类中最重要的方法:
1
2
3
4
5
6
java复制代码//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//将count值减1
public void countDown() { };
  • 代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码class Task implements Runnable{
private static int count = 0;
private final int id = count++;
final CountDownLatch latch ;
public Task(CountDownLatch latch){
this.latch = latch;
}

@Override
public void run(){
try {
print(this+"正在执行");
TimeUnit.MILLISECONDS.sleep(3000);
print(this+"执行完毕");
latch.countDown();
} catch (InterruptedException e) {
print(this + " 被中断");
}
}

@Override
public String toString() {
return "Task-"+id;
}
}

public class Test {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
ExecutorService exec = Executors.newCachedThreadPool();

exec.execute(new Task(latch));
exec.execute(new Task(latch));

try {
print("等待2个子线程执行完毕...");
long start = System.currentTimeMillis();
latch.await();
long end = System.currentTimeMillis();

print("2个子线程已经执行完毕 "+(end - start));
print("继续执行主线程");
}catch (InterruptedException e){
print("主线程被中断");
}
exec.shutdown();
}
}

#输出结果:
等待2个子线程执行完毕...
Task-0正在执行
Task-1正在执行
Task-0执行完毕
Task-1执行完毕
2个子线程已经执行完毕 3049
继续执行主线程

二、CyclicBarrier

  • 字面意思回环屏障,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
  • CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:
  • 参数parties指让多少个线程或者任务等待至barrier状态
  • 参数barrierAction为当这些线程都达到barrier状态时会执行的内容
1
2
java复制代码public CyclicBarrier(int parties, Runnable barrierAction) {}
public CyclicBarrier(int parties) {}
  • 然后CyclicBarrier中最重要的方法就是 await 方法,它有2个重载版本:
  • 第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
  • 第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。
1
2
java复制代码public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
  • 代码展示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
java复制代码class WriteTask implements Runnable{
private static int count = 0;
private final int id = count++;
private CyclicBarrier barrier ;
private static Random random = new Random(47);
public WriteTask(CyclicBarrier cyclicBarrier) {
this.barrier = cyclicBarrier;
}

@Override
public void run() {
print(this+"开始写入数据...");
try {
// 以睡眠来模拟写入数据操作
TimeUnit.MILLISECONDS.sleep(random.nextInt(5000));
print(this+"写入数据完毕,等待其他线程写入完毕"+" "+System.currentTimeMillis());
barrier.await();
} catch (InterruptedException e) {
print(this + "is interrupted!");
}catch(BrokenBarrierException e){
throw new RuntimeException(e);
}
print("所有任务写入完毕,继续处理其他任务... "+System.currentTimeMillis());
}

@Override
public String toString() {
return getClass().getSimpleName()+"-"+id;
}
}

public class CyclicBarrierTest {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < N; ++i){
exec.execute(new WriteTask(barrier));
}
exec.shutdown();
}
}

#输出结果:
WriteTask-3 开始写入数据...
WriteTask-2 开始写入数据...
WriteTask-1 开始写入数据...
WriteTask-0 开始写入数据...
WriteTask-2 写入数据完毕,等待其他线程写入完毕 1512048648904
WriteTask-1 写入数据完毕,等待其他线程写入完毕 1512048650042
WriteTask-0 写入数据完毕,等待其他线程写入完毕 1512048650209
WriteTask-3 写入数据完毕,等待其他线程写入完毕 1512048652606
所有任务写入完毕,继续处理其他任务... 1512048652607
所有任务写入完毕,继续处理其他任务... 1512048652607
所有任务写入完毕,继续处理其他任务... 1512048652607
所有任务写入完毕,继续处理其他任务... 1512048652607
  • 如果说想在所有线程写入操作完之后,进行额外的其他操作可以为CyclicBarrier提供Runnable参数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码class WriteTask implements Runnable{
private static int count = 0;
private final int id = count++;
private CyclicBarrier barrier ;
private static Random random = new Random(47);
public WriteTask(CyclicBarrier cyclicBarrier) {
this.barrier = cyclicBarrier;
}

@Override
public void run() {
print(this+" 开始写入数据...");
try {
// 以睡眠来模拟写入数据操作
TimeUnit.MILLISECONDS.sleep(random.nextInt(5000));
print(this+" 写入数据完毕,等待其他线程写入完毕"+" "+System.currentTimeMillis());
barrier.await();
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
print(this + "is interrupted!");
}catch(BrokenBarrierException e){
throw new RuntimeException(e);
}
print("所有任务写入完毕,继续处理其他任务... "+System.currentTimeMillis()+Thread.currentThread());
}

@Override
public String toString() {
return getClass().getSimpleName()+"-"+id;
}
}

public class CyclicBarrierTest {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
print(Thread.currentThread());
}
});

ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < N; ++i){
exec.execute(new WriteTask(barrier));
}
exec.shutdown();
}
}
#输出结果为:
WriteTask-3 开始写入数据...
WriteTask-1 开始写入数据...
WriteTask-2 开始写入数据...
WriteTask-0 开始写入数据...
WriteTask-1 写入数据完毕,等待其他线程写入完毕 1512049061954
WriteTask-2 写入数据完毕,等待其他线程写入完毕 1512049063092
WriteTask-0 写入数据完毕,等待其他线程写入完毕 1512049063261
WriteTask-3 写入数据完毕,等待其他线程写入完毕 1512049065657
Thread[pool-1-thread-4,5,main]
所有任务写入完毕,继续处理其他任务... 1512049065668Thread[pool-1-thread-2,5,main]
所有任务写入完毕,继续处理其他任务... 1512049065668Thread[pool-1-thread-1,5,main]
所有任务写入完毕,继续处理其他任务... 1512049065668Thread[pool-1-thread-4,5,main]
所有任务写入完毕,继续处理其他任务... 1512049065668Thread[pool-1-thread-3,5,main]

从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。

  • 另外CyclicBarrier是可以重用的,看下面这个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
java复制代码class WriteTask implements Runnable{
private static int count = 0;
private final int id = count++;
private CyclicBarrier barrier ;
private static Random random = new Random(47);
public WriteTask(CyclicBarrier cyclicBarrier) {
this.barrier = cyclicBarrier;
}

@Override
public void run() {

while (!Thread.interrupted()){
print(this+" 开始写入数据...");
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(5000));
print(this+" 写入数据完毕,等待其他线程写入完毕"+" "+System.currentTimeMillis());
barrier.await();
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
print(this + "is interrupted!");
}catch(BrokenBarrierException e){
throw new RuntimeException(e);
}
print("所有任务写入完毕,继续处理其他任务... "+System.currentTimeMillis());
}

}

@Override
public String toString() {
return getClass().getSimpleName()+"-"+id;
}
}

class CyclicBarrierManager implements Runnable{
private CyclicBarrier barrier ;
private ExecutorService exec;
public CyclicBarrierManager(CyclicBarrier barrier, ExecutorService exec,int N){
this.barrier = barrier ;
this.exec = exec;
for (int i = 0; i < N-1; ++i){
exec.execute(new WriteTask(barrier));
}
}

@Override
public void run(){
while (!Thread.interrupted()){
try {
barrier.await();
}catch (InterruptedException e){
print(getClass().getSimpleName()+" 被中断了!");
}catch (BrokenBarrierException e){
throw new RuntimeException(e);
}
}
}
}

public class CyclicBarrierTest {
public static void main(String[] args) throws Exception{
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new CyclicBarrierManager(barrier,exec,N));
exec.shutdown();
}
}

#输出结果:
WriteTask-1 开始写入数据...
WriteTask-2 开始写入数据...
WriteTask-0 开始写入数据...
WriteTask-2 写入数据完毕,等待其他线程写入完毕 1512051484365
WriteTask-0 写入数据完毕,等待其他线程写入完毕 1512051485503
WriteTask-1 写入数据完毕,等待其他线程写入完毕 1512051488068
所有任务写入完毕,继续处理其他任务... 1512051488078
所有任务写入完毕,继续处理其他任务... 1512051488078
WriteTask-2 开始写入数据...
所有任务写入完毕,继续处理其他任务... 1512051488078
WriteTask-1 开始写入数据...
WriteTask-0 开始写入数据...
WriteTask-0 写入数据完毕,等待其他线程写入完毕 1512051488513
WriteTask-1 写入数据完毕,等待其他线程写入完毕 1512051489045
WriteTask-2 写入数据完毕,等待其他线程写入完毕 1512051489945
所有任务写入完毕,继续处理其他任务... 1512051489955
WriteTask-0 开始写入数据...
所有任务写入完毕,继续处理其他任务... 1512051489955
所有任务写入完毕,继续处理其他任务... 1512051489955
WriteTask-2 开始写入数据...
WriteTask-1 开始写入数据...
WriteTask-2 写入数据完毕,等待其他线程写入完毕 1512051490155
WriteTask-1 写入数据完毕,等待其他线程写入完毕 1512051494477
WriteTask-0 写入数据完毕,等待其他线程写入完毕 1512051494823
所有任务写入完毕,继续处理其他任务... 1512051494833
所有任务写入完毕,继续处理其他任务... 1512051494833
WriteTask-0 开始写入数据...
所有任务写入完毕,继续处理其他任务... 1512051494833
WriteTask-1 开始写入数据...
WriteTask-2 开始写入数据...
WriteTask-2 写入数据完毕,等待其他线程写入完毕 1512051494961
WriteTask-0 写入数据完毕,等待其他线程写入完毕 1512051496040
WriteTask-1 写入数据完毕,等待其他线程写入完毕 1512051498121
所有任务写入完毕,继续处理其他任务... 1512051498132
所有任务写入完毕,继续处理其他任务... 1512051498132
WriteTask-1 开始写入数据...
所有任务写入完毕,继续处理其他任务... 1512051498132

三、Semaphore

  • Semaphore翻译成字面意思为 信号量,Semaphore 可以同时让多个线程同时访问共享资源,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
  • Semaphore类位于java.util.concurrent包下,它提供了2个构造器:
1
2
3
4
5
6
7
8
java复制代码// 参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {
sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}
  • 下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:
1
2
3
4
java复制代码public void acquire() throws InterruptedException {  }     //获取一个许可
public void acquire(int permits) throws InterruptedException { } //获取permits个许可
public void release() {} //释放一个许可
public void release(int permits) {} //释放permits个许可
  • acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
  • release()用来释放许可。
  • 注意,在释放许可之前,必须先获获得许可。

这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码// 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire() { };
// 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };
// 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits) { };
//尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { };
package sychronized;
import java.util.Random;
import java.util.concurrent.*;
import static net.mindview.util.Print.*;

class Worker implements Runnable{
private static int count = 0;
private final int id = count++;
private int finished = 0;
private Random random = new Random(47);
private Semaphore semaphore;
public Worker(Semaphore semaphore){
this.semaphore = semaphore;
}

@Override
public void run(){
try {
while (!Thread.interrupted()){
semaphore.acquire();
print(this+" 占用一个机器在生产... ");
TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
synchronized (this){
print(" 已经生产了"+(++finished)+"个产品,"+"释放出机器");
}
semaphore.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return getClass().getSimpleName()+"-"+id;
}
}

public class SemaphoreTest {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < N; ++i){
exec.execute(new Worker(semaphore));
}
exec.shutdown();
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

🚀今天,我们来详细的聊一聊SpringBoot自动配置原理,

发表于 2021-09-28

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

SpringBoot是我们经常使用的框架,那么你能不能针对SpringBoot实现自动配置做一个详细的介绍。如果可以的话,能不能画一下实现自动配置的流程图。牵扯到哪些关键类,以及哪些关键点。

下面我们一起来看看吧!!

前言:

阅读完本文:

  1. 你能知道 SpringBoot 启动时的自动配置的原理知识
  2. 你能知道 SpringBoot 启动时的自动配置的流程
  3. 以及对于 SpringBoot 一些常用注解的了解

一步一步 debug 从浅到深。

注意:本文的 SpringBoot 版本为 2.5.2

一、启动类

前言什么的,就不说了,大家都会用的,我们直接从 SpringBoot 启动类说起。

1
2
3
4
5
6
java复制代码@SpringBootApplication
public class Hello {
public static void main(String[] args) {
SpringApplication.run(Hello.class);
}
}

@SpringBootApplication 标注在某个类上说明这个类是 SpringBoot 的主配置类, SpringBoot 就应该运行这个类的main方法来启动 SpringBoot 应用;是我们研究的重点!!!它的本质是一个组合注解,我们点进去,看看javadoc上是怎么写的,分析从浅到深,从粗略到详细。

我们点进去看:

1
2
3
4
5
6
java复制代码@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = { @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
@Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
public @interface SpringBootApplication {}

Javadoc上是这么写的

表示声明一个或多个@Bean方法并触发 auto-configuration 和 component scanning 的 configuration 类。 这是一个方便的注解,相当于声明了 @Configuration 、 @EnableAutoConfiguration 和@ComponentScan 。

—为什么它能集成这么多的注解的功能呢?

是在于它上面的 @Inherited 注解, @Inherited 表示自动继承注解类型。

这里的最重要的两个注解是 @SpringBootConfiguration 和 @EnableAutoConfiguration。

1.1、@SpringBootConfiguration

我们先点进去看看 @SpringBootConfiguration注解:

1
2
3
4
5
6
java复制代码@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Configuration
@Indexed
public @interface SpringBootConfiguration {}。

1.2、@EnableAutoConfiguration

再看看 @EnableAutoConfiguration.

1
2
3
4
5
6
7
java复制代码@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import(AutoConfigurationImportSelector.class)
public @interface EnableAutoConfiguration {}

1.3、@ComponentScan

@ComponentScan:配置用于 Configuration 类的组件扫描指令。 提供与 Spring XML 的 <context:component-scan> 元素并行的支持。
可以 basePackageClasses 或basePackages ( 或其别名value )来定义要扫描的特定包。 如果没有定义特定的包,将从声明该注解的类的包开始扫描。

作为了解,不是本文重点。

1.4、探究方向

image-20210927135108478

主要探究图中位于中间部分那条主线,其他只会稍做讲解。

二、@SpringBootConfiguration

我们刚刚已经简单看了一下 @SpringBootConfiguration 啦。

1
2
3
java复制代码@Configuration
@Indexed
public @interface SpringBootConfiguration {}

它是 springboot 的配置类,标注在某个类上,表示这是一个 springboot的配置类。

我们在这看到 @Configuration ,这个注解我们在 Spring 中就已经看到过了,它的意思就是将一个类标注为 Spring 的配置类,相当于之前 Spring 中的 xml 文件,可以向容器中注入组件。

不是探究重点。

三、@EnableAutoConfiguration

我们来看看这玩意,它的字面意思就是:自动导入配置。

1
2
3
4
java复制代码@Inherited 
@AutoConfigurationPackage ////自动导包
@Import(AutoConfigurationImportSelector.class) ////自动配置导入选择
public @interface EnableAutoConfiguration {}

从这里顾名思义就能猜到这里肯定是跟自动配置有关系的。

我们接着来看看这上面的两个注解 @AutoConfigurationPackage 和 @Import(AutoConfigurationImportSelector.class) ,这两个才是我们研究的重点。

3.1、@AutoConfigurationPackage

点进去一看:

1
2
3
java复制代码@Inherited
@Import(AutoConfigurationPackages.Registrar.class)
public @interface AutoConfigurationPackage {}

@Import 为 spring 的注解,导入一个配置文件,在 springboot 中为给容器导入一个组件,而导入的组件由 AutoConfigurationPackages.Registrar.class 执行逻辑来决定的。

往下👇看:Registrar

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码static class Registrar implements ImportBeanDefinitionRegistrar, DeterminableImports {

@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
register(registry, new PackageImports(metadata).getPackageNames().toArray(new String[0]));
}

@Override
public Set<Object> determineImports(AnnotationMetadata metadata) {
return Collections.singleton(new PackageImports(metadata));
}
}

在这个地方我们可以打个断点,看看 new PackageImports(metadata).getPackageNames().toArray(new String[0]) 它是一个什么值。

image-20210927143028116

我们用 Evaluate 计算 new PackageImports(metadata).getPackageNames().toArray(new String[0]) 出来可以看到就是 com.crush.hello ,当前启动类所在的包。

继续往下看的话就是和 Spring 注册相关了,更深入 xdm 可以继续 debug。

在这里我们可以得到一个小小的结论:

@AutoConfigurationPackage 这个注解本身的含义就是将主配置类(@SpringBootApplication 标注的类)所在的包下面所有的组件都扫描到 spring 容器中。

如果将一个 Controller 放到 com.crush.hello 以外就不会被扫描到了,就会报错。

3.2、@Import(AutoConfigurationImportSelector.class)

AutoConfigurationImportSelector 开启自动配置类的导包的选择器(导入哪些组件的选择器)

我们点进 AutoConfigurationImportSelector 类来看看,有哪些重点知识,这个类中存在方法可以帮我们获取所有的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public class AutoConfigurationImportSelector implements DeferredImportSelector, BeanClassLoaderAware,
ResourceLoaderAware, BeanFactoryAware, EnvironmentAware, Ordered {

/**选择需要导入的组件 ,*/
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
if (!isEnabled(annotationMetadata)) {
return NO_IMPORTS;
}
AutoConfigurationEntry autoConfigurationEntry = getAutoConfigurationEntry(annotationMetadata);
return StringUtils.toStringArray(autoConfigurationEntry.getConfigurations());
}

//根据导入的@Configuration类的AnnotationMetadata返回AutoConfigurationImportSelector.AutoConfigurationEntry 。
protected AutoConfigurationEntry getAutoConfigurationEntry(AnnotationMetadata annotationMetadata) {
if (!isEnabled(annotationMetadata)) {
return EMPTY_ENTRY;
}
AnnotationAttributes attributes = getAttributes(annotationMetadata);
// 可以在这打个断点,看看 返回的数据
List<String> configurations = getCandidateConfigurations(annotationMetadata, attributes);
//删除重复项
configurations = removeDuplicates(configurations);
// 排除依赖
Set<String> exclusions = getExclusions(annotationMetadata, attributes);
//检查
checkExcludedClasses(configurations, exclusions);
//删除需要排除的依赖
configurations.removeAll(exclusions);
configurations = getConfigurationClassFilter().filter(configurations);
fireAutoConfigurationImportEvents(configurations, exclusions);
return new AutoConfigurationEntry(configurations, exclusions);
}
}

我们看看这个断点,configurations 数组长度为131,并且文件后缀名都为 **AutoConfiguration

image-20210927153110296

这里的意思是将所有需要导入的组件以全类名的方式返回,并添加到容器中,最终会给容器中导入非常多的自动配置类(xxxAutoConfiguration),给容器中导入这个场景需要的所有组件,并配置好这些组件。有了自动配置,就不需要我们自己手写了。


3.2.1、getCandidateConfigurations()

我们还需要思考一下,这些配置都从 getCandidateConfigurations 方法中获取,这个方法可以用来获取所有候选的配置,那么这些候选的配置又是从哪来的呢?

一步一步点进去:

1
2
3
4
5
6
7
8
java复制代码protected List<String> getCandidateConfigurations(AnnotationMetadata metadata, AnnotationAttributes attributes) {
// 这里有个 loadFactoryNames 方法 执行的时候还传了两个参数,一个是BeanClassLoader ,另一个是 getSpringFactoriesLoaderFactoryClass() 我们一起看看
List<String> configurations = SpringFactoriesLoader.loadFactoryNames(getSpringFactoriesLoaderFactoryClass(),
getBeanClassLoader());
Assert.notEmpty(configurations, "No auto configuration classes found in META-INF/spring.factories. If you "
+ "are using a custom packaging, make sure that file is correct.");
return configurations;
}

看一下getSpringFactoriesLoaderFactoryClass() 方法,这里传过去的是

1
2
3
java复制代码protected Class<?> getSpringFactoriesLoaderFactoryClass() {
return EnableAutoConfiguration.class;
}

这个 EnableAutoConfiguration 是不是特别眼熟,(我们探究的起点 @EnableAutoConfiguration ,有没有感觉自己离答案越来越近啦)

我们再看看 loadFactoryNames() 方法带着它去做了什么处理:

image-20210927155811449

先是将 EnableAutoConfiguration.class 传给了 factoryType ,然后 .getName( ) ,所以factoryTypeName 值为 EnableAutoConfiguration。

3.2.2、loadSpringFactories()

接下里又开始调用 loadSpringFactories 方法

image-20210927160059271

这里的 FACTORIES_RESOURCE_LOCATION 在上面有定义:

public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories";

我们再回到 getCandidateConfigurations 方法处。
image-20210927160251498

1
2
java复制代码Assert.notEmpty(configurations, "No auto configuration classes found in META-INF/spring.factories. If you "
+ "are using a custom packaging, make sure that file is correct.");

这句断言的意思是:“在 META-INF/spring.factories 中没有找到自动配置类。如果您使用自定义包装,请确保该文件是正确的。“

这个 META-INF/spring.factories 在哪里呢?

image-20210927160422635

里面的内容:

image-20210927160459778

我们日常用到的,基本上都有一个配置类。

比如 webmvc,

image-20210928112842076

我们点进 WebMvcProperties 类中去看一下:

image-20210928113056250

那这里到底是要干什么呢?

image-20210927161053520

这里的意思首先是把这个文件的 urls 拿到之后并把这些 urls 每一个遍历,最终把这些文件整成一个properties 对象,loadProperties方法

image-20210927161313114

然后再从 properties 对象里边获取一些我们需要的值,把这些获取到的值来加载我们最终要返回的这个结果,结果 result 为 map 集合,然后返回到loadFactoryNames方法中。

然后我们再回到 loadSpringFactories(classLoaderToUse).getOrDefault(factoryTypeName, Collections.emptyList()); 的调用处。

image-20210927161649177

这个 factoryTypeName 值为 EnableAutoConfiguration

因为 loadFactoryNames 方法携带过来的第一个参数为 EnableAutoConfiguration.class,所以 factoryType 值也为 EnableAutoConfiguration.class,那么 factoryTypeName 值为 EnableAutoConfiguration。

image-20210927161846907

那么map集合中 getOrDefault 方法为什么意思呢?意思就是当 Map 集合中有这个 key 时,就使用这个 key值,如果没有就使用默认值 defaultValue (第二个参数),所以是判断是否包含 EnableAutoConfiguration

看下图,这不就是嘛?

image-20210927161931299

所以就是把 spring-boot-autoconfigure-2.5.2.jar/META-INF/spring.factories 这个文件下的EnableAutoConfiguration 下面所有的组件,每一个 xxxAutoConfiguration 类都是容器中的一个组件,都加入到容器中。加入到容器中之后的作用就是用它们来做自动配置,这就是Springboot自动配置开始的地方。

只有这些自动配置类进入到容器中以后,接下来这个自动配置类才开始进行启动

那 spring.factories 中存在那么多的配置,每次启动时都是把它们全部加载吗?

是全部加载嘛?不可能的哈,这谁都知道哈,全部加载启动一个项目不知道要多久去了。它是有选择的。

我们随便点开一个类,都有这个 @ConditionalOnXXX 注解

image-20210927162055644

@Conditional 其实是 spring 底层注解,意思就是根据不同的条件,来进行自己不同的条件判断,如果满足指定的条件,那么整个配置类里边的配置才会生效。

所以在加载自动配置类的时候,并不是将 spring.factories 的配置全部加载进来,而是通过这个注解的判断,如果注解中的类都存在,才会进行加载。

这就是SpringBoot的自动配置啦.

四、小结

简单总结起来就是:

启动类中有一个 @SpringBootApplication 注解,包含了 @SpringBootConfiguration、 @EnableAutoConfiguration , @EnableAutoConfiguration 代表开启自动装配,注解会去 spring-boot-autoconfigure 工程下寻找 META-INF/spring.factories 文件,此文件中列举了所有能够自动装配类的清单,然后自动读取里面的自动装配配置类清单。因为有 @ConditionalOn 条件注解,满足一定条件配置才会生效,否则不生效。 如: @ConditionalOnClass(某类.class) 工程中必须包含一些相关的类时,配置才会生效。所以说当我们的依赖中引入了一些对应的类之后,满足了自动装配的条件后,自动装配才会被触发。

image-20210927163514187

五、自言自语

纸上得来终觉浅,绝知此事要躬行。

如果可以,可以自己 debug 一遍,画一画流程图。🛌 (躺平)

你好,我是博主宁在春:主页

希望本篇文章能让你感到有所收获!!!

祝 我们:待别日相见时,都已有所成。

如有疑惑,大家可以留言评论。

如有不足之处,请大家指出来,非常感谢 👨‍💻。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Shiro】3 Shiro授权流程

发表于 2021-09-28

授权

授权,即访问控制,控制谁能访问哪些资源。主体进行身份认证后需要分配权限方可访问系统的资源,对于某些资源没有权限是无法访问的。

授权的关键对象

授权可简单理解为Who对What / Which进行How操作。

  • Who:主体(Subject),主体需要访问系统中的资源。
  • What / Which:资源(Resource),如系统菜单、页面、按钮、类方法、系统商品信息等。资源包括资源类型和资源实例,比如商品信息为资源类型,类型为Type01的商品为资源实例,编号为001的商品信息也属于资源实例。
  • How:权限(Permission),规定了主体对资源的操作许可。权限离开资源没有意义,如用户查询权限、用户添加权限、某个类方法的调用权限、编号为001用户的修改权限等,通过权限可知主体对哪些资源都有哪些操作许可。

授权流程

授权一定是基于认证通过后才执行的操作。

20210927094653.png

授权方式(RBAC)

RBAC主要包含两种授权模式:

  • 基于角色的访问控制(Role-Based Access Control):以角色为中心进行访问控制。

伪代码可表示为:

1
2
3
java复制代码if (subject.hasRole("admin")) {
// 执行操作
}
  • 基于资源的访问控制(Resource-Based Access Control):以资源为中心进行访问控制。

伪代码可表示为:

1
2
3
java复制代码if (subject.isPermitted("user:find:*")) {
// 执行操作
}

isPermission 传入参数是一个权限字符串,其格式为 “资源类型 : 操作 : 资源实例” 。

“ user : find : * “ 表示的是Subject对所有User实例具有查询的权限(操作类型)。

“ user : * : 001 “ 表示的是Subject对ID为001的User实例具有所有的权限(操作实例)。

Shiro权限字符串

1. 组成规则

在Shiro中使用权限字符串必须按照Shiro指定的规则。

权限字符串组合规则为:“资源类型标识符 : 操作 : 资源实例标识符”

  • 资源类型标识符: 一般会按模块,对系统划分资源。比如user模块,product模块,order模块等,对应的资源类型标识符就是:user,product,order。
  • 操作: 一般为增删改查(create,delete,update,find),还有 * 标识统配。
  • 资源实例标识符: 如果Subject控制的是资源类型,那么资源实例标识符就是 “*“ ;如果Subject控制的是资源实例,那么就需要在资源实例标识符就是该资源的唯一标识(ID等)。

2. 示例

“ * : * : * “ 表示Subject对所有类型的所有实例有所有操作权限,相当于超级管理员。

“ user : create : * “ 表示Subject对user类型的所有实例有创建的权限,可以简写为:” user : create “。

“ user : update : 001 “ 表示Subject对ID为001的user实例有修改的权限。

“ user : * : 001 “ 表示Subject对ID为001的user实例有所有权限。

Shiro授权方式

Shiro中对于后台授权提供了三种实现方式:

  • 编程式
  • 注解式
  • 标签式(已淘汰,只能在JSP,Thymeleaf等模板引擎中使用)

1. 编程式

1
2
3
4
5
6
java复制代码Subject subject = SecurityUtils.getSubject();
if (subject.hasRole("admin")) {
// 有权限
} else {
// 无权限
}

2. 注解式

1
2
3
4
java复制代码@RequiresRoles("admin")
public void find() {
// 有权限
}

Shiro授权源码

SimpleAccountRealm 类就是Shiro默认完成认证和授权的底层操作,其中 doGetAuthenticationInfo 方法处理认证 ,doGetAuthorizationInfo 方法处理授权。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码public class SimpleAccountRealm extends AuthorizingRealm {

...

// 认证处理
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) throws AuthenticationException {
UsernamePasswordToken upToken = (UsernamePasswordToken) token;
SimpleAccount account = getUser(upToken.getUsername());

if (account != null) {

if (account.isLocked()) {
throw new LockedAccountException("Account [" + account + "] is locked.");
}
if (account.isCredentialsExpired()) {
String msg = "The credentials for account [" + account + "] are expired";
throw new ExpiredCredentialsException(msg);
}

}

return account;
}

// 授权处理
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
String username = getUsername(principals);
USERS_LOCK.readLock().lock();
try {
return this.users.get(username);
} finally {
USERS_LOCK.readLock().unlock();
}
}

}

Shiro授权Demo

授权必须在认证的基础上,因此本Demo基于前文中MD5加密认证的Demo实现授权。

1. 基于角色访问控制

在定义Realm中的授权操作中给Subject添加角色

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class UserMD5Realm extends AuthorizingRealm {

// 授权操作
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
// 获取身份信息(用户名)
String username = (String) principals.getPrimaryPrincipal();

// 根据身份信息从数据库中该用户的角色信息装载进入SimpleAuthorizationInfo
SimpleAuthorizationInfo simpleAuthorizationInfo = new SimpleAuthorizationInfo();
simpleAuthorizationInfo.addRole("admin");
simpleAuthorizationInfo.addRole("user");

// 返回权限信息对象
return simpleAuthorizationInfo;
}

// 认证操作
@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) throws AuthenticationException {
...

return null;
}
}

授权流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public static void main(String[] args) {

// 认证操作
...

// 如果认证成功,可以进行授权操作
if (subject.isAuthenticated()) {

// 验证Subject是否具有admin角色
if (subject.hasRole("admin")) {
// 具体授权操作
}

// 验证Subject是否同时具有admin角色和user角色
if (subject.hasAllRoles(Arrays.asList("admin", "user"))) {
// 具体授权操作
}

// 验证Subject是否具有以下角色中的一种或者多种
boolean[] hasRoles = subject.hasRoles(Arrays.asList("admin", "user"));
for (boolean hasRole : hasRoles) {
if (hasRole) {
// 具体授权操作
}
}
}
}

2. 基于权限访问控制

在定义Realm中的授权操作中给Subject添加授权信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class UserMD5Realm extends AuthorizingRealm {

// 授权操作
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
// 获取身份信息(用户名)
String username = (String) principals.getPrimaryPrincipal();

// 根据身份信息从数据库中该用户的权限信息装载进入SimpleAuthorizationInfo
SimpleAuthorizationInfo simpleAuthorizationInfo = new SimpleAuthorizationInfo();
simpleAuthorizationInfo.addStringPermission("user:create:001");
simpleAuthorizationInfo.addStringPermission("user:update:*");

// 返回权限信息对象
return simpleAuthorizationInfo;
}

// 认证操作
@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) throws AuthenticationException {
...

return null;
}
}

授权流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public static void main(String[] args) {

// 认证操作
...

// 如果认证成功,可以进行授权操作
if (subject.isAuthenticated()) {

// 判断Subject是否有创建所有user实例的权限
if (subject.isPermitted("user:create:*")) {
// 具体授权操作
}

// 判断Subject同时具有创建所有user实例的权限和修改001号user实例的权限
if (subject.isPermittedAll("user:create:*", "user:update:001")) {
// 具体授权操作
}

// 判断Subject是否具有以下权限中的一种或多种
boolean[] permissions = subject.isPermitted("user:create:*", "user:update:001");
for (boolean permission : permissions) {
if (permission) {
// 具体授权操作
}
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

金九银十,Netty这些核心知识点你又了解多少?

发表于 2021-09-28

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

前言

作为一名Java中高级开发,不会Netty都不好意思出去面试,现在正值一些小伙伴找工作的的时间,我相信你面试的过程肯定有面试官会问到Netty的知识点,本篇文章就为小伙伴整理一下Netty的核心知识点,希望你们面试都能成功拿到自己满意的薪资。

本人收集了一些简历模板资料,希望对小伙伴们有帮助:

百度云盘:

链接:https://pan.baidu.com/s/1Xr-uIzhdDBo4qxMQ7kITCQ
提取码: y9jq

关于怎么写简历,我上午看到这篇文章写得挺好的,分享给你们,链接如下:

简历写得好,工作找得早,程序员的秋招简历指南!

正文

BIO、NIO 和 AIO

  • BIO:俗称同步阻塞 IO,一种非常传统的 IO 模型。简单来说,在服务器中BIO是一个连接由一个专门的线程来服务的工作模式。
  • NIO:一种同步非阻塞的 I/O 模型。简单来说,客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
  • AIO:采用订阅-通知模式,即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。

详细的介绍可以看我之前的文章,相关连接如下:

深入分析 Java IO (二)BIO

深入分析 Java IO (三)NIO

深入分析 Java IO (四)AIO

Netty是什么?为什么要用 Netty?

Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

虽然JAVA NIO 和 JAVA AIO框架提供了多路复用IO/异步IO的支持,但是并没有提供上层“信息格式”的良好封装。用这些API实现一款真正的网络应用则并非易事。

JAVA NIO 和 JAVA AIO并没有提供断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流等的处理,这些都需要开发者自己来补齐相关的工作。

AIO在实践中,并没有比NIO更好。AIO在不同的平台有不同的实现,windows系统下使用的是一种异步IO技术:IOCP;Linux下由于没有这种异步 IO 技术,所以使用的是epoll 对异步 IO 进行模拟。所以 AIO 在 Linux 下的性能并不理想。AIO 也没有提供对 UDP 的支持。

综上,在实际的大型互联网项目中,Java 原生的 API 应用并不广泛,取而代之的是一款第三方Java 框架,这就是Netty。

详细的介绍可以看我之前的文章:

Netty 源码分析系列(二)Netty 架构设计

Netty 源码分析系列(一)Netty 概述

Netty的线程模型

Netty 通过 Reactor 模型基于多路复用器接收并处理用户请求,内部实现了两个线程池,boss线程池和work线程池,其中boss线程池的线程负责处理请求的 accept 事件,当接收到 accept 事件的请求时,把对应的socket封装到一个NioSocketChannel中,并交给 work线程池,其中 work线程池负责请求的read和write事件,由对应的Handler处理。

详细的介绍可以看我之前的文章:Netty 源码分析系列(十)Reactor 模型

Netty 的零拷贝了解么?

在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据。而在 Netty 层面 ,零拷贝主要体现在对于数据操作的优化。

  1. 使用 Netty 提供的 CompositeByteBuf 类,可以将多个ByteBuf 合并为一个逻辑上的 ByteBuf,避免了各个 ByteBuf 之间的拷贝。
  2. ByteBuf 支持 slice 操作,因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf,避免了内存的拷贝。
  3. 通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输,可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。

详细的介绍可以看我之前的文章,相关链接如下:

一文彻底弄懂零拷贝原理

Netty 源码分析系列(八)Netty 如何实现零拷贝

Netty 中有哪种重要组件?

  • Channel:Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 等。
  • EventLoop:主要是配合 Channel 处理 I/O 操作,用来处理连接的生命周期中所发生的事情。
  • ChannelFuture:Netty 框架中所有的 I/O 操作都为异步的,因此我们需要 ChannelFuture 的 addListener()注册一个 ChannelFutureListener 监听事件,当操作执行成功或者失败时,监听就会自动触发返回结果。
  • ChannelHandler:充当了所有处理入站和出站数据的逻辑容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
  • ChannelPipeline:为 ChannelHandler 链提供了容器,当 channel 创建时,就会被自动分配到它专属的 ChannelPipeline,这个关联是永久性的。
  • Bootstrap:主要用于配置服务端或客户端的 Netty 程序的启动信息。
  • ByteBuf:字节数据容器,提供比 Java NIO ByteBuffer更好的的 API。

详细的介绍可以看我之前的文章,相关链接如下:

Netty 源码分析系列(三)Channel 概述

Netty 源码分析系列(四)ChannelHandler

Netty 源码分析系列(五)ChannelPipeline源码分析

Netty 源码分析系列(六)字节缓冲区 ByteBuf(上)

Netty 源码分析系列(七)字节缓冲区 ByteBuf(下)

Netty 源码分析系列(九)Netty 程序引导类

什么是 TCP 粘包/拆包?有什么解决办法呢?

TCP 是以流的方式来处理数据,一个完整的包可能会被 TCP 拆分成多个包进行发送,也可能把小的封装成一个大的数据包发送。 TCP 粘包/分包的原因:应用程序写入的字节大小大于套接字发送缓冲区的大小,会发生拆包现象,而应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包现象; 进行 MSS 大小的 TCP 分段,当 TCP 报文长度-TCP 头部长度 >MSS 的时候将发生拆包 以太网帧的 payload(净荷)大于 MTU(1500 字节)进行 ip 分片。

解决的方法:

1.使用 Netty 自带的解码器

(1)通过FixedLengthFrameDecoder 定长解码器来解决定长消息的黏包问题;

(2)通过LineBasedFrameDecoder和StringDecoder来解决以回车换行符作为消息结束符的TCP黏包的问题;

(3)通过DelimiterBasedFrameDecoder 特殊分隔符解码器来解决以特殊符号作为消息结束符的TCP黏包问题;

(4)通过LengthFieldBasedFrameDecoder 自定义长度解码器解决TCP黏包问题。

2.自定义序列化编解码器

通常情况下,我们使用 Protostuff、Hessian2、json 序列方式比较多,另外还有一些序列化性能非常好的序列化方式也是很好的选择。

关于编解码器的介绍,可以看我之前的文章,相关链接如下:

Netty 源码分析系列(十二)Netty 解码器

Netty 源码分析系列(十三)Netty 编码器

Netty 源码分析系列(十四)Netty 编解码器

Netty 源码分析系列(十五)自定义解码器、编码器、编解码器

Netty的工作原理

下图就是 Netty 的工作原理图:

image-20210815210630321

详细的介绍可以看我之前的文章,相关链接:Netty 源码分析系列(十一)Netty 工作原理详解

小结

其实在学习 Netty 源码之前,首先要让自己成为一个熟练工,掌握基本理论。然后再去学习相关的博客、源码等资源。希望上面这份Netty笔记能够帮助到有需要的小伙伴!

看完记得点赞、关注、收藏哟!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

关于Go程序错误处理的一些建议

发表于 2021-09-28

Go的错误处理这块是日常被大家吐槽较多的地方,我在工作中也观察到一些现象,比较严重的是在各层级的逻辑代码中对错误的处理有些重复。

比如,有人写代码就会在每一层都判断错误并记录日志,从代码层面看,貌似很严谨,但是如果看日志会发现一堆重复的信息,等到排查问题时反而会造成干扰。

今天给大家总结三点Go代码错误处理相关的最佳实践给大家。

这些最佳实践也是网上一些前辈分享的,我自己实践后在这里用自己的语言描述出来,希望能对大家有所帮助。

认识error

Go程序通过error类型的值表示错误

error类型是一个内建接口类型,该接口只规定了一个返回字符串值的Error方法。

1
2
3
go复制代码type error interface {
Error() string
}

Go语言的函数经常会返回一个error值,调用者通过测试error值是否是nil来进行错误处理。

1
2
3
4
5
6
go复制代码i, err := strconv.Atoi("42")
if err != nil {
fmt.Printf("couldn't convert number: %v\n", err)
return
}
fmt.Println("Converted integer:", i)

error为nil时表示成功;非nil的error表示失败。

自定义错误记得要实现error接口

我们经常会定义符合自己需要的错误类型,但是记住要让这些类型实现error接口,这样就不用在调用方的程序里引入额外的类型。

比如下面我们自己定义了myError这个类型,如果不实现error接口的话,调用者的代码中就会被myError这个类型侵入。比如下面的run函数,在定义返回值类型时,直接定义成error即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
go复制代码package myerror

import (
"fmt"
"time"
)

type myError struct {
Code int
When time.Time
What string
}

func (e *myError) Error() string {
return fmt.Sprintf("at %v, %s, code %d",
e.When, e.What, e.Code)
}

func run() error {
return &MyError{
1002,
time.Now(),
"it didn't work",
}
}

func TryIt() {
if err := run(); err != nil {
fmt.Println(err)
}
}

如果myError不实现error接口的话,这里的返回值类型就要定义成myError类型。可想而知,紧接着调用者的程序里就要通过myError.Code == xxx 来判断到底是哪种具体的错误(当然想要这么干得先把myError改成导出的MyError)。

那调用者判断自定义error是具体哪种错误的时候应该怎么办呢,myError并未向包外暴露,答案是通过向包外暴露检查错误行为的方法来实现。

1
2
scss复制代码myerror.IsXXXError(err) 
...

抑或是通过比较error本身与包向外暴露的常量错误是否相等来判断,比如操作文件时常用来判断文件是否结束的io.EOF。

类似的还有gorm.ErrRecordNotFound等各种开源包对外暴露的错误常量。

1
2
3
kotlin复制代码if err != io.EOF {
return err
}

错误处理常犯的错误

先看一段简单的程序,看大家能不能发现一些细微的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
go复制代码func WriteAll(w io.Writer, buf []byte) error {
_, err := w.Write(buf)
if err != nil {
log.Println("unable to write:", err) // annotated error goes to log file
return err // unannotated error returned to caller
}
return nil
}

func WriteConfig(w io.Writer, conf *Config) error {
buf, err := json.Marshal(conf)
if err != nil {
log.Printf("could not marshal config: %v", err)
return err
}
if err := WriteAll(w, buf); err != nil {
log.Println("could not write config: %v", err)
return err
}
return nil
}

func main() {
err := WriteConfig(f, &conf)
fmt.Println(err) // io.EOF
}

错误处理常犯的两个问题

上面程序的错误处理暴露了两个问题:

  1. 底层函数WriteAll在发生错误后,除了向上层返回错误外还向日志里记录了错误,上层调用者做了同样的事情,记录日志然后把错误再返回给程序顶层。

因此在日志文件中得到一堆重复的内容

1
2
3
lua复制代码unable to write: io.EOF
could not write config: io.EOF
...

\2. 在程序的顶部,虽然得到了原始错误,但没有相关内容,换句话说没有把WriteAll、WriteConfig记录到 log 里的那些信息包装到错误里,返回给上层。

针对这两个问题的解决方案可以是,在底层函数WriteAll、WriteConfig中为发生的错误添加上下文信息,然后将错误返回上层,由上层程序最后处理这些错误。

一种简单的包装错误的方法是使用fmt.Errorf函数,给错误添加信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码func WriteConfig(w io.Writer, conf *Config) error {
buf, err := json.Marshal(conf)
if err != nil {
return fmt.Errorf("could not marshal config: %v", err)
}
if err := WriteAll(w, buf); err != nil {
return fmt.Errorf("could not write config: %v", err)
}
return nil
}
func WriteAll(w io.Writer, buf []byte) error {
_, err := w.Write(buf)
if err != nil {
return fmt.Errorf("write failed: %v", err)
}
return nil
}

给错误附加上下文信息

fmt.Errorf只是给错误添加了简单的注解信息,如果你想在添加信息的同时还加上错误的调用栈,可以借助github.com/pkg/errors这个包提供的错误包装能力。

1
2
3
4
5
6
7
8
go复制代码//只附加新的信息
func WithMessage(err error, message string) error

//只附加调用堆栈信息
func WithStack(err error) error

//同时附加堆栈和信息
func Wrap(err error, message string) error

有包装方法,就有对应的解包方法,Cause方法会返回包装错误对应的最原始错误–即会递归地进行解包。

1
go复制代码func Cause(err error) error

下面是使用github.com/pkg/errors改写后的错误处理程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go复制代码func ReadFile(path string) ([]byte, error) {
f, err := os.Open(path)
if err != nil {
return nil, errors.Wrap(err, "open failed")
}
defer f.Close()
buf, err := ioutil.ReadAll(f)
if err != nil {
return nil, errors.Wrap(err, "read failed")
}
return buf, nil
}
func ReadConfig() ([]byte, error) {
home := os.Getenv("HOME")
config, err := ReadFile(filepath.Join(home, ".settings.xml"))
return config, errors.WithMessage(err, "could not read config")
}


func main() {
_, err := ReadConfig()
if err != nil {
fmt.Printf("original error: %T %v\n", errors.Cause(err), errors.Cause(err))
fmt.Printf("stack trace:\n%+v\n", err)
os.Exit(1)
}
}

上面格式化字符串时用的 %+v 是在 % v 基础上,对值进行展开,即展开复合类型值,比如结构体的字段值等明细。

这样既能给错误添加调用栈信息,又能保留对原始错误的引用,通过Cause可以还原到最初始引发错误的原因。

总结

总结一下,错误处理的原则就是:

  1. 错误只在逻辑的最外层处理一次,底层只返回错误。
  2. 底层除了返回错误外,要对原始错误进行包装,增加错误信息、调用栈等这些有利于排查的上下文信息。

喜欢网管的文章内容和写作风格,记得把我安利给更多人哦(微信搜:网管叨bi叨)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

kafka扫盲-思考与实现

发表于 2021-09-28

由于工作中经常用到kafka,但是对kafka的一些内部机制不是很熟悉,所以最近在看kafka相关的知识,我们知道kafka非常经典的消息引擎,它以高性能、高可用著称。那么问题来了,它是怎么做到高性能、高可用的?它的消息是以什么样的形式持久化的?既然写了磁盘,为何速度还那么快?它是如何保证消息不丢失的…?带着这一系列的问题,我们来扒开kafka的面纱。

首先我们思考这样一个问题:为什么需要消息引擎?为什么不能直接走rpc? 以一个订单系统为例:当我们下了一个订单的时候,应该是要先减商品库存,然后用户支付扣钱,商家账户加钱…,最后可能还要发推送或者短信告诉用户下单成功,告诉商家来订单了。

这整个下单过程,如果全部同步阻塞,那么耗时会增加,用户等待的时间会加长,体验不太好,同时下单过程依赖的链路越长,风险越大。为了加快响应,减少风险,我们可以把一些非必须卡在主链路中的业务拆解出去,让它们和主业务解耦。下单的最关键核心就是要保证库存、用户支付、商家打款的一致性,消息的通知完全可以走异步。这样整个下单过程不会因为通知商家或者通知用户阻塞而阻塞,也不会因为它们失败而提示订单失败。

接下来就是如何设计一个消息引擎了,宏观来看一个消息引擎支持发送、存储、接收就行了。

那么如上图一个简易消息队列模型出现了,Engine把发送方的消息存储起来,这样当接收方来找Engine要数据的时候,Engine再从存储中把数据响应给接收放就ok了。既然涉及到持久化的存储,那么缓慢的磁盘IO是要考虑的问题。还有接收方可能不止一个,以上述订单为例,下单完成之后,通过消息把完成事件发出去,这时候负责用户侧推送的开发需要消费这条消息,负责商户侧推送的开发也需要消费这条消息,能想到的最简单的做法就是copy出两套消息,但是这样是不是显得有点浪费?高可用也是一个需要考虑的点,那么我们的engine是不是得副本,有了副本之后,如果一个engine节点挂掉,我们可以选举出一个新副本来工作。光有副本也不行,发送方可能也是多个,这时候如果所有的发送方都把数据打到一个Leader(主)节点上似乎也不合理,单个节点的压力太大。可能你会说:不是有副本吗?让接收方直接从副本读取消息。这样的话又带来另一个问题:副本复制Leader的消息延迟了咋办?读不到消息再读一次Leader?如果这样的话,引擎的设计的貌似更加复杂了,似乎不太合理。那就得想一种既能不通过副本又能分散单节点压力就行了,答案就是分片技术,既然单个Leader节点压力太大,那么就分成多个Leader节点,我们只需要一个好的负载均衡算法,通过负载均衡把消息平均分配到各个分片节点就好了,于是我们可以设计出一套大概长这样的生产者-消费者模型。

但是这些只是简单的想法,具体如何实现还是很复杂的,带着这一系列问题和想法,我们来看看kafka是如何实现的。

思考与实现

首先我们还是从kafka的几个名词入手,主要介绍下消息、主题、分区和消费者组。

一条消息该怎么设计

消息是服务的源头,一切的设计都是为了将消息从一端送到另一端,这里面涉及到消息的结构,消息体不能太大,太大容易造成存储成本上升,网络传输开销变大,所以消息体只需要包含必要的信息,最好不要冗余。消息最好也支持压缩,通过压缩可以在消息体本身就精简的情况下变的更小,那么存储和网络开销可以进一步降低。消息是要持久化的,被消费掉的消息不能一直存储,或者说非常老的消息被再次消费的可能性不大,需要一套机制来清理老的消息,释放磁盘空间,如何找出老的消息是关键,所以每个消息最好带个消息生产时的时间戳,通过时间戳计算出老的消息,在合适的时候进行删除。消息也是需要编号的,编号一方面代表了消息的位置,另一方面消费者可以通过编号找到对应的消息。大量的消息如何存储也是个问题,全部存储在一个文件中,查询效率低且不利于清理老数据,所以采用分段,通过分段的方式把大的日志文件切割成多个相对小的日志文件来提升维护性,这样当插入消息的时候只要追加在段的最后就行,但是在查找消息的时候如果把整个段加载到内存中一条一条找,似乎也需要很大的内存开销,所以需要一套索引机制,通过索引来加速访问对应的Message。

总结:一条kafka的消息包含创造时间、消息的序号、支持消息压缩,存储消息的日志是分段存储,并且是有索引的。

为什么需要Topic

宏观来看消息引擎就是一发一收,有个问题:生产者A要给消费者B发送消息,同时也要给消费者C发送消息。那么消费者B和消费者C如何只消费到自己需要的数据?能想到的简单的做法就是在消息中加Tag,消费者根据Tag来获取自己的消息,不是自己的消息直接跳过,但是这样似乎不太优雅,而且存在cpu资源浪费在消息的过滤上。所以最有效的办法就是对于给B消息不会给C,给C的消息不会给B,这就是Topic。通过Topic来区分不同的业务,每个消费者只需要订阅自己关注的Topic即可,生产者把消费者需要的消息通过约定好的Topic发过去,那么简单的理解就是消息按照Topic分类了。

总结:Topic是个逻辑的概念,Topic可以很好的做业务划分,每个消费者只需要关注自己的Topic即可。

分区如何保证顺序

通过上文我们知道分区的目的就是分散单节点的压力,再结合Topic和Message,那么消息的大概分层就是Topic(主题)->Partition(分区)->Message(消息)。也许你会问,既然分区是为了降低单节点的压力,那么干嘛不用多个topic代替多个分区,在多个机器节点的情况下,我们可以把多个topic部署在多个节点上,似乎也能实现分布式,简单一想似乎可行,仔细一想,还是不对。我们最终还要服务业务的,这样的话,本来一个topic的业务,要拆解成多个topic,反而把业务的定义打散了。

好吧,既然有多个分区了,那么消息的分配是个问题,如果topic下面的数据过于集中在某个分区上,又会造成分布不均匀,解决这个问题,一套好的分配算法是很有必要的。

kafka支持轮询法,即在多分区的情况下,通过轮询可以均匀地把消息分给每个分区,这里需要注意的是,每个分区里的数据是有序的,但是整体的数据是无法保证顺序的,如果你的业务强依赖消息的顺序,那么就要慎重考虑这种方案,比如生产者依次发了A、B、C三个消息,它们分别分布在3个分区中,那么有可能出现的消费顺序是B、A、C。

那么如何保证消息的顺序性?从整体的角度来看,只要分区数大于1,就永远无法保证消息的顺序性,除非你把分区数设置成1,但是这样的话吞吐就是问题。从实际的业务场景来说,一般我们可能需要某个用户的消息、或者某个商品的消息有序就可以了,用户A和用户B的消息谁先谁后没关系,因为它们之间没什么关联,但是用户A的消息我们可能要保持有序,比如消息描述的是用户的行为,行为的先后顺序是不能乱的。这时候我们可以考虑用key hash的方式,同一个用户id,通过hash始终能保持分到一个分区上,我们知道分区内部是有序的,所以这样的话,同一个用户的消息一定是有序的,同时不同的用户可以分配到不同的分区上,这样也利用到了多分区的特性。

总结:kafka整体消息是无法保证有序的,但是单个分区的消息是可以保证有序的。

如何设计一个合理的消费者模型

既然是设计消息模型,那么消费者必不可少,实现消费者最简单的方式就是起一个进程或者线程直接去broker里面拉取消息即可,这很合理,但是如果生产的速度大于当前的消费速度怎么办?第一时间想到的就是再起一个消费者,通过多个消费者来提升消费速度,这里似乎又有个问题,两个消费者都消费到了同一条消息怎么办?加锁是个解决方案,但是效率会降低,也许你会说消费的本质就是读,读是可以共享的,只要保证业务幂等,重复消费消息也没关系。这样的话,如果10个消费者都争抢到了同样的消息,结果有9个消费者都是白白浪费资源的。因此在需要多个消费者提升消费能力的同时,还要保证每个消费者都消费到没被处理的消息,这就是消费者组,消费者组下面可以有多个消费者,我们知道topic是分区的,因此只要消费者组内的每个消费者订阅不同的分区就可以了。理想的情况下是每个消费者都分配到相同数据量分区,如果某个消费者获得的分区数不平均(较多或者较少),出现数据倾斜状态,那么就会导致某些消费者非常繁忙或者轻松,这样就不合理,这就需要一套均衡的分配策略。

kafka消费者分区分配策略主要有3种:

  1. Range:这种策略是针对topic的,会把topic的分区数和消费者数进行一个相除,如果有余数,那就说明多余的分区不够平均分了,此时排在前面的消费者会多分得1个分区,乍看其实挺合理,毕竟本来数量就不均衡。但是如果消费者订阅了多个topic,并且每个topic平均算下来都多几个个分区,那么对于排在前面的消费者就会多消费很多分区。

由于是按照topic维度来划分的,所以最终:

  • c1消费 Topic0-p0、Topic0-p1、Topic1-p0、Topic1-p1
  • c2消费 Topic0-p2、Topic1-p2

最终可以发现消费者c1比消费者c2整整多两个分区,完全可以把c1的分区分一个给c2,这样就可以均衡了。

  1. RoundRobin:这种策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询算法逐个将分区以此分配给每个消费者。假设现在有两个topic,每个topic3个分区,并且有3个消费者。那么大致消费状况是这样的:

  • c0消费 Topic0-p0、Topic1-p0
  • c1消费 Topic0-p1、Topic1-p1
  • c2消费 Topic0-p2、Topic1-p2

看似很完美,但是如果现在有3个topic,并且每个topic分区数是不一致的,比如topic0只有一个分区,topic1有两个分区,topic2有三个分区,而且消费者c0订阅了topic0,消费者c1订阅了topic0和topic1,消费者c2订阅了topic0、topic1、topic2,那么大致消费状况是这样的:

  • c0消费 Topic0-p0
  • c1消费 Topic1-p0
  • c2消费 Topic1-p1、Topic2-p0、Topic2-p1、Topic2-p2

这么看来RoundRobin并不是最完美的,在不考虑每个topic分区吞吐能力的差异,可以看到c2的消费负担明显很大,完全可以将Topic1-p1分区分给消费者c1。

  1. Sticky:Range和RoundRobin都有各自的缺点,某些情况下可以更加均衡,但是没有做到。

Sticky引入目的之一就是:分区的分配要尽可能均匀。以上面RoundRobin 3个topic分别对应1、2、3个分区的case来说,因为c1完全可以消费Topic1-p1,但是它没有。针对这种情况,在Sticky模式下,就可以做到把Topic1-p1分给c1。

Sticky引入目的之二就是:分区的分配尽可能与上次分配的保持相同。这里主要解决就是rebalance后分区重新分配的问题,假设现在有3个消费者c0、c1、c2,他们都订阅了topic0、topic1、topic2、topic3,并且每个topic都有两个分区,此时消费的状况大概是这样:

这种分配方式目前看RoundRobin没什么区别,但是如果此时消费者c1退出,消费者组内只剩c0、c2。那么就需要把c1的分区重新分给c0和c2,我们先来看看RoundRobin是如何rebalance的:

可以发现原来c0的topic1-p1分给了c2,原来c2的topic1-p0分给了c0。这种情况可能会造成重复消费问题,在消费者还没来得及提交的时候,发现分区已经被分给了一个新的消费者,那么新的消费者就会产生重复消费。但是从理论的角度来说,在c1退出之后,可以没必要去动c0和c2的分区,只需要把原本c1的分区瓜分给c0和c2即可,这就是sticky的做法:

需要注意的是Sticky策略中,如果分区的分配要尽可能均匀和分区的分配尽可能与上次分配的保持相同发生冲突,那么会优先实现第一个。

总结:kafka默认支持以上3种分区分配策略,也支持自定义分区分配,自定义的方式需要自己去实现,从效果来看RoundRobin要好于Range的,Sticky是要好于RoundRobin的,推荐大家使用版本支持的最好的策略。

往期精彩:

  • 一文搞懂锁知识
  • 一文搞懂回滚和持久化
  • redis IO模型的演进

微信搜【假装懂编程】,领取电子书,分享大厂面试经验

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

密码学系列之 1Password的加密基础PBKDF2 简介

发表于 2021-09-28

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

简介

1password是一个非常优秀的密码管理软件,有了它你可以轻松对你的密码进行管理,从而不用再考虑密码泄露的问题,据1password官方介绍,它的底层使用的是PBKDF2算法对密码进行加密。

那么PBKDF2是何方神圣呢?它有什么优点可以让1password得以青睐呢?一起来看看吧。

PBKDF2和PBKDF1

PBKDF的全称是Password-Based Key Derivation Function,简单的说,PBKDF就是一个密码衍生的工具。既然有PBKDF2那么就肯定有PBKDF1,那么他们两个的区别是什么呢?

PBKDF2是PKCS系列的标准之一,具体来说他是PKCS#5的2.0版本,同样被作为RFC 2898发布。它是PBKDF1的替代品,为什么会替代PBKDF1呢?那是因为PBKDF1只能生成160bits长度的key,在计算机性能快速发展的今天,已经不能够满足我们的加密需要了。所以被PBKDF2替换了。

在2017年发布的RFC 8018(PKCS #5 v2.1)中,是建议是用PBKDF2作为密码hashing的标准。

PBKDF2和PBKDF1主要是用来防止密码暴力破解的,所以在设计中加入了对算力的自动调整,从而抵御暴力破解的可能性。

PBKDF2的工作流程

PBKDF2实际上就是将伪散列函数PRF(pseudorandom function)应用到输入的密码、salt中,生成一个散列值,然后将这个散列值作为一个加密key,应用到后续的加密过程中,以此类推,将这个过程重复很多次,从而增加了密码破解的难度,这个过程也被称为是密码加强。

我们看一个标准的PBKDF2工作的流程图:

从图中可以看到,初始的密码跟salt经过PRF的操作生成了一个key,然后这个key作为下一次加密的输入和密码再次经过PRF操作,生成了后续的key,这样重复很多次,生成的key再做异或操作,生成了最终的T,然后把这些最终生成的T合并,生成最终的密码。

根据2000年的建议,一般来说这个遍历次数要达到1000次以上,才算是安全的。当然这个次数也会随着CPU计算能力的加强发生变化。这个次数可以根据安全性的要求自行调整。

有了遍历之后,为什么还需要加上salt呢?加上salt是为了防止对密码进行彩虹表攻击。也就是说攻击者不能预选计算好特定密码的hash值,因为不能提前预测,所以安全性得以提高。标准salt的长度推荐是64bits,美国国家标准与技术研究所推荐的salt长度是128 bits。

详解PBKDF2的key生成流程

上面一小节,我们以一种通俗易懂的方式告诉大家,PBKDF2到底是怎么工作的。一般来说,了解到这一层也就够了,但是如果你想更加深入,了解PBKDF2的key生成的底层原理,那么还请关注这一小节。

我们上面介绍了PBKDF2是一个生成衍生key的函数,作为一个函数,那么就有输入和输出,我们先看下PBKDF2的定义:

1
ini复制代码DK = PBKDF2(PRF, Password, Salt, c, dkLen)

PBKDF2有5个函数,我们看下各个参数代表什么意思:

  • PRF 是一个伪随机散列函数,我们可以根据需要对其进行替换,比如替换成为HMAC函数。
  • Password 是主密码用来生成衍生key。
  • Salt是一个bits序列,用来对密码加盐。
  • c 是循环的次数。
  • dkLen 是生成的key要求的bits长度。
  • DK是最后生成的衍生key。

在上一节中,我们可以看到其实最后的衍生key是由好几部分组成的,上图中的每一个T都代表着衍生key的一部分,最后将这些T合并起来就得到了最终的衍生key,其公式如下:

1
2
ini复制代码DK = T1 + T2 + ⋯ + Tdklen/hlen
Ti = F(Password, Salt, c, i)

上面的F是c次遍历的异或链。其公式如下:

1
r复制代码F(Password, Salt, c, i) = U1 ^ U2 ^ ⋯ ^ Uc

其中:

1
2
3
4
ini复制代码U1 = PRF(Password, Salt + INT_32_BE(i))
U2 = PRF(Password, U1)
⋮
Uc = PRF(Password, Uc−1)

HMAC密码碰撞

如果PBKDF2的PRF使用的是HMAC的话,那么将会发送一些很有意思的问题。对于HMAC来说,如果密码的长度大于HMAC可以接受的范围,那么该密码会首先被做一次hash运算,然后hash过后的字符串会被作为HMAC的输入。

我们举个例子,如果用户输入的密码是:

1
markdown复制代码    Password: plnlrtfpijpuhqylxbgqiiyipieyxvfsavzgxbbcfusqkozwpngsyejqlmjsytrmd

经过一次HMAC-SHA1运算之后,得到:

1
markdown复制代码    SHA1 (hex): 65426b585154667542717027635463617226672a

将其转换成为字符串得到:

1
markdown复制代码    SHA1 (ASCII): eBkXQTfuBqp'cTcar&g*

所以说,如果使用PBKDF2-HMAC-SHA1的加密方式的话,下面两个密码生成衍生key是一样的。

1
2
arduino复制代码    "plnlrtfpijpuhqylxbgqiiyipieyxvfsavzgxbbcfusqkozwpngsyejqlmjsytrmd"
"eBkXQTfuBqp'cTcar&g*"

PBKDF2的缺点

虽然PBKDF2可以通过调节循环遍历的次数来提高密码破解的难度。但是可以为其研制特殊的处理器,只需要很少的RAM就可以对其进行破解。为此bcrypt 和 scrypt 等依赖于大量RAM的加密算法,这样就导致那些廉价的ASIC处理器无用武之地。

总结

以上就是PBKDF2的简单介绍,想要详细了解更多的朋友,可以参考我的其他关于密码学的文章。

本文已收录于 www.flydean.com/41-pbkdf2/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PHP中的FFmpeg安装及使用 FFmpeg简介 FFmp

发表于 2021-09-28

本文和大家分享PHP中的FFmeg安装及使用,希望能帮助到大家

FFmpeg简介

FFmpeg是视频处理最常用的开源软件。
它功能强大,用途广泛,大量用于视频网站和商业软件(比如 Youtube 和 iTunes),也是许多音频和视频格式的标准编码/解码实现。
关于FFMPEG视音频编解码的知识可以参考大神雷霄骅的系列教程
blog.csdn.net/leixiaohua1…

FFmpeg安装

下载fmmpeg代码

git clone https://git.ffmpeg.org/ffmpeg.git ffmpeg

安装依赖库文件

主要安装三个:yasm ,sdl1.2 和 sdl2.0

安装 yasm

sudo apt-get install yasm

安装sdl1.2

sudo apt-get install libsdl1.2-dev

安装 sdl2.0

sudo apt-get install libstl2-dev

编译安装FFmpeg

进入到解压之后的 ffmpeg文件夹,依次执行以下命令:

1
2
3
go复制代码./configure
make
sudo make install

测试安装成功与否

输入以下命令查看输出:

在这里插入图片描述

linux下 FFmpeg 命令介绍

FFmpeg 常用的命令行参数如下。

1
2
3
4
5
6
7
8
9
diff复制代码-c:指定编码器
-c copy:直接复制,不经过重新编码(这样比较快)
-c:v:指定视频编码器
-c:a:指定音频编码器
-i:指定输入文件
-an:去除音频流
-vn: 去除视频流
-preset:指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
-y:不经过确认,输出时直接覆盖同名文件。

常见命令

查看文件信息

查看视频文件的元信息,比如编码格式和比特率,可以只使用-i参数。
$ ffmpeg -i input.mp4

分离视频音频流

1
2
go复制代码ffmpeg -i input_file -vcodec copy -an output_file_video  //分离视频流
ffmpeg -i input_file -acodec copy -vn output_file_audio  //分离音频流

视频转码

转换编码格式(transcoding)指的是, 将视频文件从一种编码转成另一种编码。比如转成 H.264 编码,一般使用编码器libx264,所以只需指定输出文件的视频编码器即可。
$ ffmpeg -i [input.file] -c:v libx264 output.mp4

改变分辨率

下面是改变视频分辨率(transsizing)的例子,从 1080p 转为 480p 。

1
2
3
4
css复制代码$ ffmpeg \
-i input.mp4 \
-vf scale=480:-1 \
output.mp4

添加音轨

添加音轨(muxing)指的是,将外部音频加入视频,比如添加背景音乐或旁白。

1
2
3
css复制代码$ ffmpeg \
-i input.aac -i input.mp4 \
output.mp4

截图

1
2
3
4
5
diff复制代码$ ffmpeg \
-ss 01:23:45 \
-i input \
-vframes 1 -q:v 2 \
output.jpg

上面例子中,-ss 01:23:45表示截取的时间戳, -vframes 1指定只截取一帧,-q:v 2表示输出的图片质量,一般是1到5之间(1 为质量最高)。

为音频添加封面

下面命令可以将音频文件,转为带封面的视频文件

1
2
3
4
5
css复制代码$ ffmpeg \
-loop 1 \
-i cover.jpg -i input.mp3 \
-c:v libx264 -c:a aac -b:a 192k -shortest \
output.mp4

PHP中FFmpeg使用

exec方法

视频压缩

1
2
3
4
5
6
php复制代码public static function reSize($path, $outPath, $toSize)
{
$shell = "ffmpeg -i " . $path . " -strict -2 -y -fs " . $toSize . " ". $outPath . " 2>&1";
exec($shell, $output, $ret);
return $ret;
}

视频截图

1
2
3
4
5
6
php复制代码public static function screenshot($path, $outPath)
{
$shell = "ffmpeg -i " . $path . " -ss 1 -y -frames:v 1 -q:v 1 " . $outPath . " 2>&1";
exec($shell, $output, $ret);
return $res;
}

php-ffmpeg扩展

composer安装php-ffmpeg
$ composer require php-ffmpeg/php-ffmpeg

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
php复制代码require 'vendor/autoload.php';
$ffmpeg = FFMpeg\FFMpeg::create();
$video = $ffmpeg->open('video.mpg');
$video
->filters()
->resize(new FFMpeg\Coordinate\Dimension(320, 240))
->synchronize();
$video
->frame(FFMpeg\Coordinate\TimeCode::fromSeconds(10))
->save('frame.jpg');
$video
->save(new FFMpeg\Format\Video\X264(), 'export-x264.mp4')
->save(new FFMpeg\Format\Video\WMV(), 'export-wmv.wmv')
->save(new FFMpeg\Format\Video\WebM(), 'export-webm.webm');

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…515516517…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%