开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

高频面试题-请聊一下Spring中BeanFactory与F

发表于 2021-06-17

这是我参与更文挑战的第6天,活动详情查看: 更文挑战

如果你的简历上这么写,熟练掌握Spring,并对Spring源码有一定的见解(或者是读过Spring源码),那么无法避免的就是要问几个问题。

IOC的实现原理是什么? AOP底层是如何实现的? 因为这些基本上都是面试必然要准备的题目,所以如果你能粗略的说上一些来,就可能会问一些Spring中使用到的接口,就比如今天我们要说的BeanFactory和FactoryBean有什么不一样(区别)?

如果你确实有些疑惑,那么接着看下去吧。

什么是BeanFactory?

其实Spring的源码中,当然很多知名技术的源码也一样,其中声明的类、接口,亦或者变量,都是能凭借英文看出大概的意思来的。

这个BeanFactory就是这样,首先字面意思就是Bean工厂,再转换一下思路,也就是个Bean容器了。

先看源代码,上图!

1.png

可以看到BeanFactory是一个接口,它在Spring中是最基础的接口之一了,作用就是定义获取bean及bean的各种属性,在源代码中也可以看出来,里面有我们在编码过程中常用到的getBean()方法。

如果你看过一些Spring源码,知道其中有一个XmlBeanFactory类,这个类被称为是IOC容器的基础类,就是这么一个基础类其实也是基于这个BeanFactory接口来实现的,有兴趣可以去看看这个类的源码;这里建议使用IDEA中的Diagrams功能查看UML类图来查看各类、接口之间的关系,更容易理解。

什么是FactoryBean?

先读一下字面意思,工厂Bean?

也可以说成是个工厂Bean,它的职责是Spring提供给用户的一种实例化bean的方式,当然,是要通过实现该接口才可以。

这里还要说一下,其实Spring还有其他两种实例化bean的方式。

一种是通过反射来利用获取bean的class属性,通过其class属性实例化具体的bean。

一种是通过配置文件来实例化。

但是这两种方法都不够灵活,如果大家还想多了解一下这两种方式,还请多查阅一下资料和源码。

总结一下

总结下来,其实BeanFactory和FactoryBean半毛钱关系都没有,就是名字像,我依稀记得以前被问到的时候,还是一脸懵逼,后来了解了以后,发现竟然是这样。

果然面试官才是最有套路的,就是想看一下你是不是真正的去读了一些源码。

当被问这个问题时,只需要将两种具体的职责,和为Spring所做的贡献讲一讲,基本上面试官就不会在这个问题上为难你了。

各位,加油,喜欢就点个赞,赞,赞,赞,赞,赞。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 Flow Activiti7 Task 入门

发表于 2021-06-17

这是我参与更文挑战的第10天,活动详情查看: 更文挑战

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

一 . 前言

此篇文档将开启 Activiti 的系列文档 , 所以这篇文章的内容主要以流程使用为主 , 为了更清晰 , 我们由 Task 往外层分析 >>>

后续再来看看 Process 实例的创建以及配置的处理

二 . 整体使用

2.1 Maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
xml复制代码<dependency>
<groupId>org.activiti</groupId>
<artifactId>activiti-spring-boot-starter</artifactId>
<version>7.0.0.Beta2</version>
</dependency>

<!-- 由于需要使用数据库 , 所以需要加入如下 DAO 框架 -->

<!-- DAO -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- PS : 该包主要是为了构建一个 DataSource-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

2.2 application.yml

PS : 无需创建数据库 , Activiti 中会默认创建表 , 创建流程我们以后来看 >>>

1
2
3
4
5
6
7
8
9
10
java复制代码spring:
datasource:
url: jdbc:mysql://127.0.0.1:3306/activiti007?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&nullCatalogMeansCurrent=true
username : root
password : 123456
driver-class-name: com.mysql.jdbc.Driver
activiti:
database-schema-update: true
server:
port: 8086

默认会建如下表 :

image.png

2.3 前置准备

Activiti 默认是需要和用户绑定的 , 此处需要进行必要的配置 :

在缓存中添加2个用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码@Configuration
public class SecurityConfiguration {

private Logger logger = LoggerFactory.getLogger(SecurityConfiguration.class);

@Bean
public UserDetailsService myUserDetailsService() {

InMemoryUserDetailsManager inMemoryUserDetailsManager = new InMemoryUserDetailsManager();
logger.info("> Registering new user: " + "root" + " with the following Authorities[ 'ACTIVE' , 'ADMIN' ]");

// 构建 Group 组信息
List<SimpleGrantedAuthority> groupList = new ArrayList<>();
// 注意 , 该权限是必须的
groupList.add(new SimpleGrantedAuthority("ROLE_ACTIVITI_USER"));
groupList.add(new SimpleGrantedAuthority("ADMIN"));

// 准备2个用户 : Root , Admin
inMemoryUserDetailsManager.createUser(new User("root", passwordEncoder().encode("123456"), groupList));
inMemoryUserDetailsManager.createUser(new User("admin", passwordEncoder().encode("123456"), groupList));

return inMemoryUserDetailsManager;
}


@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}

}

模拟登录工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码@Component
public class SecurityUtil {

private Logger logger = LoggerFactory.getLogger(SecurityUtil.class);

@Autowired
private UserDetailsService userDetailsService;

@Autowired
private PasswordEncoder passwordEncoder;

public void logInAs(String username) {

UserDetails user = userDetailsService.loadUserByUsername(username);

logger.info("> 用户安全配置 (1) : 简单校验用户是否存在 [{}]", username);
if (user == null) {
throw new IllegalStateException("User " + username + " doesn't exist, please provide a valid user");
}

logger.info("------> 用户安全配置 (2) , Security 中模拟登录对象 :{} <-------", username);
SecurityContextHolder.setContext(new SecurityContextImpl(new UsernamePasswordAuthenticationToken(user.getUsername(), "123456")));

logger.info("------> 用户安全配置 (3) , Activiti 中设置对象 :{} <-------", username);
org.activiti.engine.impl.identity.Authentication.setAuthenticatedUserId(username);
}
}

2.4 一个简单的 flow 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
java复制代码@RestController
public class StartController {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
private ProcessRuntime processRuntime; //实现流程定义相关操作

@Autowired
private TaskRuntime taskRuntime; //实现任务相关操作

@Autowired
private SecurityUtil securityUtil;//SpringSecurity相关的工具类

@RequestMapping("/test")
public String test() {
logger.info("------> [成功进入 StartController] <-------");
return "Success !";
}

@GetMapping("/info")
public String getInfd() {
Page<ProcessDefinition> processDefinitionPage = processRuntime
.processDefinitions(Pageable.of(0, 10));
logger.info("------> 可用的流程定义数量:[{}] <-------", processDefinitionPage.getTotalItems());
for (ProcessDefinition pd : processDefinitionPage.getContent()) {
logger.info("------> 流程定义:[{}] <-------", pd);
}

return "success";
}

@GetMapping("/startFlow")
public String startFlow() {
securityUtil.logInAs("root");
ProcessInstance pi = processRuntime.start(ProcessPayloadBuilder
.start()
// processers 中定义的 .bpm 文件
.withProcessDefinitionKey("SimpleProcess")
.build());//启动流程实例

logger.info("------>流程实例ID: + [{}] <-------", pi.getId());
return "开启流程";
}

@GetMapping("/selectFlow")
public String selectFlow() {
securityUtil.logInAs("root");
Page<Task> taskPage = taskRuntime.tasks(Pageable.of(0, 10));
if (taskPage.getTotalItems() > 0) {
taskPage.getContent().forEach(item -> {
logger.info("------> 剩余任务 :[{}] <-------", JSONObject.toJSONString(item));
});
} else {
logger.info("------> 任务全部执行完成 <-------", taskPage.getContent());
}


return "查询 Flow : " + taskPage.getTotalItems();
}

@GetMapping("/doFlow")
public String doFlowBusiness() {

logger.info("------> [进入 doFlowBusiness 处理流程] <-------");
securityUtil.logInAs("root");
Page<Task> taskPage = taskRuntime.tasks(Pageable.of(0, 10));

logger.info("------> Task 启动完成 <-------");

if (taskPage.getTotalItems() > 0) {
for (Task task : taskPage.getContent()) {

logger.info("------> 循环处理任务 [{}] <-------", task.getName());

//拾取任务
taskRuntime.claim(TaskPayloadBuilder.claim().withTaskId(task.getId()).build());

//执行任务
taskRuntime.complete(TaskPayloadBuilder.complete().withTaskId(task.getId()).build());
}
}

logger.info("------> 查询任务的结果 <-------");

Page<Task> taskPage2 = taskRuntime.tasks(Pageable.of(0, 10));
if (taskPage2.getTotalItems() > 0) {
logger.info("------> 剩余任务 :[{}] <-------", taskPage2.getContent());
} else {
logger.info("------> 任务全部执行完成 <-------", taskPage2.getContent());
}

return "Success : Do Flow Business 处理完成";
}

@GetMapping("deleteFlow")
public String deleteFlow() {
// PS : 此处如果是多个用户 , 需要切换用户
// securityUtil.logInAs("admin");
Page<Task> temTaskList = taskRuntime.tasks((Pageable.of(0, 10)));
temTaskList.getContent().forEach(item -> {
try {
logger.info("------> Step 4 item : 删除Task :{} <-------", item.getId());
taskRuntime.delete(TaskPayloadBuilder.delete().withTaskId(item.getId()).build());
} catch (Exception e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}

});
return "success";
}


}

三 . Task 核心流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
java复制代码@Component
public class ActivitiTaskRuntimeService implements ApplicationRunner {

private Logger logger = LoggerFactory.getLogger(this.getClass());

private static String taskId = "MyTask001";

@Autowired
private TaskRuntime taskRuntime;
@Autowired
private SecurityUtil securityUtil;

@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("------> [开启一个完整的 Activiti 流程] , 首先模拟登录一个用户 <-------");

// PS : Activiti 默认依赖 Spring Security
securityUtil.logInAs("root");

deleteTask();

// 创建流程
createTask();

// 查询对象
getTaskInfo();
selectTaskInfo();

selectTaskInfoByUserId("admin");
selectTaskInfoByUserId("root");

// 执行流程
doTask();

// 再次查询流程
getTaskInfo();
selectTaskInfo();
}


public void getTaskInfo() {
try {
// Step 2 : 查询单个 Task 信息
Task temTask = taskRuntime.task(taskId);
logger.info("------> Step 2 查询 ID : {} - 对应的 Task :{} <-------", taskId, JSONObject.toJSONString(temTask));
} catch (NotFoundException e) {
logger.error("E----> 当前 Task 已经处理完成 , 未查询到 error :{} -- content :{}", e.getClass(), e.getMessage());
}
}

/**
* 查询当前的 Task 案例
*/
public void selectTaskInfo() {
// Step 2 : 查询已知的所有的 Task 信息
Pageable pageable = Pageable.of(0, 10);
Page<Task> temTaskList = taskRuntime.tasks(pageable);
temTaskList.getContent().forEach(item -> {
logger.info("------> Step 2-1 查询系列数量 - [{}] - 对应的 Task :{} <-------", temTaskList.getTotalItems(), JSONObject.toJSONString(item));
});

}

/**
* 对应委托人查询自己的任务
*/
public void selectTaskInfoByUserId(String assignee) {
// Step 2 : 查询已知的所有的 Task 信息
Pageable pageable = Pageable.of(0, 10);
Page<Task> temTaskList = taskRuntime.tasks(pageable, TaskPayloadBuilder.tasks().withAssignee(assignee).build());
temTaskList.getContent().forEach(item -> {
logger.info("------> Step 2-2 查询 assignee :{} 系列数量 - [{}] - 对应的 Task :{} <-------", assignee, temTaskList.getTotalItems(), JSONObject.toJSONString(item));
});

}


/**
* 创建一个 Task 任务
*/
public void createTask() {
logger.info("------> Step 1 : 创建一个 Task 开始 <-------");
CreateTaskPayload taskPayloadBuilder = TaskPayloadBuilder.create()
.withName("First Team Task")
.withDescription("This is something really important")
// 设置当前 Task 的用户组
.withGroup("ADMIN")
.withPriority(10)
.build();
Task temTask = taskRuntime.create(taskPayloadBuilder);

logger.info("------> Step 1 创建第二个 Task , 注意 , 此处设置了 Assignee <-------");
CreateTaskPayload taskPayloadBuilderTo = TaskPayloadBuilder.create()
.withName("Second Team Task")
.withDescription("This is something really important hava Assignee")
// 设置当前 Task 的用户组
.withGroup("ADMIN")
.withAssignee("admin")
.withPriority(10)
.build();
taskRuntime.create(taskPayloadBuilderTo);


this.taskId = temTask.getId();
}

/**
* 执行一个 Task
*/
public void doTask() {

logger.info("------> Step 3-1 : 声明一个 Task 开始 claimed <-------");
taskRuntime.claim(TaskPayloadBuilder.claim().withTaskId(taskId).build());
logger.info("------> Step 3-3 : 完成一个 Task 开始 complete <-------");
taskRuntime.complete(TaskPayloadBuilder.complete().withTaskId(taskId).build());
}

/**
* 删除 Task : PS : 注意 , 删除也需要权限
*/
public void deleteTask() {
logger.info("------> Step 4 : 删除所有的Task <-------");

Pageable pageable = Pageable.of(0, 10);
Page<Task> temTaskList = taskRuntime.tasks(pageable);
temTaskList.getContent().forEach(item -> {
try {
logger.info("------> Step 4 item : 删除Task :{} <-------", item.getId());
taskRuntime.delete(TaskPayloadBuilder.delete().withTaskId(item.getId()).build());
} catch (Exception e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}

});

securityUtil.logInAs("admin");
Pageable pageableAdmin = Pageable.of(0, 10);
Page<Task> temTaskListAdmin = taskRuntime.tasks(pageable);
temTaskListAdmin.getContent().forEach(item -> {
try {
logger.info("------> Step 4 item : 删除Task :{} <-------", item.getId());
taskRuntime.delete(TaskPayloadBuilder.delete().withTaskId(item.getId()).build());
} catch (Exception e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}
});

securityUtil.logInAs("root");


}
}

3.1 TaskRuntime 模块

接口一览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码C- TaskRuntime
public interface TaskRuntime {
TaskRuntimeConfiguration configuration();
Task task(String taskId);
Page tasks(Pageable pageable);
Page tasks(Pageable pageable, GetTasksPayload payload);
Task create(CreateTaskPayload payload);
Task claim(ClaimTaskPayload payload);
Task release(ReleaseTaskPayload payload);
Task complete(CompleteTaskPayload payload);
Task update(UpdateTaskPayload payload);
Task delete(DeleteTaskPayload payload);
...
}

方法作用

  • Task task(String taskId) : 通过id获取任务
  • Page tasks(Pageable pageable); 获取当前认证用户所有的 Task
  • Page tasks(Pageable pageable,GetTasksPayload getTasksPayload) : 获取在Payload中应用筛选器的所有任务
  • Task create(CreateTaskPayload createTaskPayload) : 创建 Task
  • Task claim(ClaimTaskPayload claimTaskPayload) : 声明一个 Task
    • 如果没有经过验证的用户抛出IllegalStateException
    • 如果当前认证用户不是一个候选用户抛出IllegalStateException
    • 当前方法不支持模拟,它将始终接受当前已验证的用户
    • 在声明之后,任务应该处于分配状态
  • Task release(ReleaseTaskPayload releaseTaskPayload) : 发布一个先前声明的任务
  • Task complete(CompleteTaskPayload completeTaskPayload) : 在有效负载中设置变量来完成选定的任务
    • 此方法还检查任务是否在完成之前分配给当前已验证的用户
    • 该方法返回一个浅层Task对象,其中包含验证任务已完成所需的基本信息
  • Task update(UpdateTaskPayload updateTaskPayload) : 更新任务详细信息
  • Task delete(DeleteTaskPayload deleteTaskPayload) : 删除任务

3.2 Task 对象

Task 是流程中的核心流转对象 , 来看一下该对象的参数有哪些 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码public interface Task extends TaskInfo {

/**
* 创建新任务时优先级的默认值
*/
int DEFAULT_PRIORITY = 50;

/** 任务的名称或标题. */
void setName(String name);

/** 为任务设置可选的本地化名称. */
void setLocalizedName(String name);

/** 修改任务描述 */
void setDescription(String description);

/** 为任务设置可选的本地化描述. */
void setLocalizedDescription(String description);

/** 设定任务的重要性/紧迫性*/
void setPriority(int priority);

/**
* 负责此任务的人员的userId.
*/
void setOwner(String owner);

/**
* 被委派此任务的人的userId
*/
void setAssignee(String assignee);

/** 此任务的当前委派状态. */
DelegationState getDelegationState();

/** 此任务的当前委派状态。 */
void setDelegationState(DelegationState delegationState);

/**更改任务的到期日期 */
void setDueDate(Date dueDate);

/**
* 更改任务的类别。这是一个可选的字段,允许将任务标记为属于某个类别。
*/
void setCategory(String category);

/** 父任务 ID */
void setParentTaskId(String parentTaskId);

/** 修改任务的tenantId */
void setTenantId(String tenantId);

/** 更改任务的表单键 */
void setFormKey(String formKey);

/** 指示此任务是否挂起. */
boolean isSuspended();

}

3.3 TaskRuntimeImpl 模块

TaskRuntimeImpl 是 TaskRuntime 的实现类 , Task 处理中 ,通过该对象实现业务的具体操作 , 我们以删除操作为例 , 看一下整体的流程 :

Step 1 : Delete 的触发

在业务中 ,通过调用 方法触发对 Task 的删除操作

1
2
3
4
5
6
7
8
9
10
11
java复制代码    public void deleteTask() {
logger.info("------> Step 4 : 删除所有的Task <-------");
// 设置当前用户
securityUtil.logInAs("admin");
Pageable pageableAdmin = Pageable.of(0, 10);
Page<Task> temTaskListAdmin = taskRuntime.tasks(pageable);
temTaskListAdmin.getContent().forEach(item -> {
taskRuntime.delete(TaskPayloadBuilder.delete().withTaskId(item.getId()).build());
});

}

PS : 这里我们做了一个特殊操作 ->securityUtil.logInAs("admin");

这是因为 Task 的操作是由权限划分的 ,对应的人员只能操作自己的 Task

Step 2 : 调用 TaskRuntimeImpl # delete 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码 public Task delete(DeleteTaskPayload deleteTaskPayload) {
// 获取一个 Task
Task task;
try {
task = task(deleteTaskPayload.getTaskId());
} catch (IllegalStateException ex) {
throw new IllegalStateException("T....");
}

// 获取当前认证的 Userid
String authenticatedUserId = securityManager.getAuthenticatedUserId();
// 验证您是否试图删除您是受让人或所有者的任务
if ((task.getAssignee() == null
|| task.getAssignee().isEmpty()
|| !task.getAssignee().equals(authenticatedUserId))
&& (task.getOwner() == null
|| task.getOwner().isEmpty()
|| !task.getOwner().equals(authenticatedUserId))) {
throw new IllegalStateException(".....");
}

// 这里可以理解为通过原本的数据构建了一个同样的 Task
TaskImpl deletedTaskData = new TaskImpl(task.getId(),
task.getName(),
Task.TaskStatus.DELETED);

// 设置 Reason 原因
if (!deleteTaskPayload.hasReason()) {
deleteTaskPayload.setReason("Cancelled by " + authenticatedUserId);
}

// 执行 Service 删除 -> PS:0001
taskService.deleteTask(deleteTaskPayload.getTaskId(),
deleteTaskPayload.getReason(),
true);
return deletedTaskData;
}


// PS:0001 此处调用 taskService 进行处理 -> TaskServiceImpl
public void deleteTask(String taskId, String deleteReason, boolean cancel) {
commandExecutor.execute(new DeleteTaskCmd(taskId, deleteReason, false, cancel));
}

其中有2个需要注意的地方 :

  • C- DeleteTaskCmd : 该对象是一个命令对象 , 猜测这里是使用命令模式进行处理
  • C- commandExecutor 对象

来看一下对象是干嘛的 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// commandExecutor 是一个接口 , 其中有三个方法
C- commandExecutor :
M- CommandConfig getDefaultConfig() : 获取默认的CommandConfig,如果没有提供就使用
M- <T> T execute(CommandConfig config, Command<T> command) : 使用指定的CommandConfig执行命令
M- <T> T execute(Command<T> command) : 使用默认的 CommandConfig执行命令

C- CommandExecutorImpl
I- commandExecutor

// 我们来看一下 , 执行了什么
public <T> T execute(CommandConfig config, Command<T> command) {
// 这里的 first 是 LogInterceptor
return first.execute(config, command);
}

PS : 到了 first.execute(config, command); 这一步 ,实际上就开始调用拦截器链了

Step 3 : 拦截器链的调用

拦截链的生成后面展示 , 这里只说一说执行了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码// 从前面我们分析 Delete 流程的时候就可以看到 , 其走了一个拦截链  ,如下图所示 : 

- CommandContextInterceptor
- CommandInvoker
- DebugCommandInvoker
- CommandInvoker
- JtaRetryInterceptor
- JtaTransactionInterceptor
- LoggingCommandInvoker
- SpringTransactionInterceptor
- TotalExecutionTimeCommandInterceptor
- TransactionContextInterceptor
- RetryInterceptor

avtiviti-AbstractCommandInterceptor.png

这些拦截链并不是全部会走 , 主要走的如下几个 :

  • Step 1 : SpringTransactionInterceptor : 控制事务
  • Step 2 : CommandContextInterceptor : 准备容器
  • Step 3 : TransactionContextInterceptor : 构建 TransactionContext
  • Step 4 : CommandInvoker 中准备 DbSqlSession , 通过 executeOperation # runnable.run() 执行处理线程
  • Step 5 : NeedsActiveTaskCmd 执行 TaskCmd
  • Step 6 : CompleteTaskCmd 发起 execute 执行 (PS : 操作类型在 Command 中 ,典型的命令模式)
  • Step End : TaskEntityManagerImpl 中执行具体的 DB 操作

总结

这一篇基本上把主流程大概介绍完了 , 后面来深入一下以下操作 :

  • SpringConfiguration 操作
  • 整体 Process 流程
  • 定制操作处理

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java面试指北!13个认证授权常见面试题/知识点总结!

发表于 2021-06-17

大家好,我是 Guide哥!端午已过,又要开始工作学习啦!

我发现有很多小伙伴对认证授权方面的知识不是特别了解,搞不清 Session 认证、JWT 以及 Cookie 这些概念。

所以,根据我根据日常对这部分学习已经在项目中的实际运用总结了这 13 个相关的问题并且附上了详细的回答。希望能够对大家有帮助!

认证 (Authentication) 和授权 (Authorization)的区别是什么?

这是一个绝大多数人都会混淆的问题。首先先从读音上来认识这两个名词,很多人都会把它俩的读音搞混,所以我建议你先先去查一查这两个单词到底该怎么读,他们的具体含义是什么。

说简单点就是:

  • 认证 (Authentication): 你是谁。
  • 授权 (Authorization): 你有权限干什么。

稍微正式点(啰嗦点)的说法就是 :

  • Authentication(认证) 是验证您的身份的凭据(例如用户名/用户 ID 和密码),通过这个凭据,系统得以知道你就是你,也就是说系统存在你这个用户。所以,Authentication 被称为身份/用户验证。
  • Authorization(授权) 发生在 Authentication(认证) 之后。授权嘛,光看意思大家应该就明白,它主要掌管我们访问系统的权限。比如有些特定资源只能具有特定权限的人才能访问比如 admin,有些对系统资源操作比如删除、添加、更新只能特定人才具有。

认证 :

授权:

这两个一般在我们的系统中被结合在一起使用,目的就是为了保护我们系统的安全性。

RBAC 模型了解吗?

系统权限控制最常采用的访问控制模型就是 RBAC 模型 。

什么是 RBAC 呢?

RBAC 即基于角色的权限访问控制(Role-Based Access Control)。这是一种通过角色关联权限,角色同时又关联用户的授权的方式。

简单地说:一个用户可以拥有若干角色,每一个角色有可以被分配若干权限这样,就构造成“用户-角色-权限” 的授权模型。在这种模型中,用户与角色、角色与权限之间构成了多对多的关系,如下图

RBAC

在 RBAC 中,权限与角色相关联,用户通过成为适当角色的成员而得到这些角色的权限。这就极大地简化了权限的管理。

通常 RBAC 下的权限设计相关的表有5 张,其中有 2 张用于建立表之间的联系:

通过这个权限模型,我们可以创建不同的角色并为不同的角色分配不同的权限范围(菜单)。

通常来说,如果系统对于权限控制要求比较严格的话,一般都会选择使用 RBAC 模型来做权限控制。

计算机基础相关书籍电子版我已经整理好了,需要的小伙伴自取

什么是 Cookie ? Cookie 的作用是什么?

Cookie 和 Session 都是用来跟踪浏览器用户身份的会话方式,但是两者的应用场景不太一样。

维基百科是这样定义 Cookie 的:

Cookies 是某些网站为了辨别用户身份而储存在用户本地终端上的数据(通常经过加密)。

简单来说: Cookie 存放在客户端,一般用来保存用户信息。

下面是 Cookie 的一些应用案例:

  1. 我们在 Cookie 中保存已经登录过的用户信息,下次访问网站的时候页面可以自动帮你登录的一些基本信息给填了。除此之外,Cookie 还能保存用户首选项,主题和其他设置信息。
  2. 使用 Cookie 保存 Session 或者 Token ,向后端发送请求的时候带上 Cookie,这样后端就能取到 Session 或者 Token 了。这样就能记录用户当前的状态了,因为 HTTP 协议是无状态的。
  3. Cookie 还可以用来记录和分析用户行为。举个简单的例子你在网上购物的时候,因为 HTTP 协议是没有状态的,如果服务器想要获取你在某个页面的停留状态或者看了哪些商品,一种常用的实现方式就是将这些信息存放在 Cookie
  4. ……

如何在项目中使用 Cookie 呢?

我这里以 Spring Boot 项目为例。

1)设置 Cookie 返回给客户端

1
2
3
4
5
6
7
8
9
10
11
java复制代码@GetMapping("/change-username")
public String setCookie(HttpServletResponse response) {
// 创建一个 cookie
Cookie cookie = new Cookie("username", "Jovan");
//设置 cookie过期时间
cookie.setMaxAge(7 * 24 * 60 * 60); // expires in 7 days
//添加到 response 中
response.addCookie(cookie);

return "Username is changed!";
}

2) 使用 Spring 框架提供的 @CookieValue 注解获取特定的 cookie 的值

1
2
3
4
java复制代码@GetMapping("/")
public String readCookie(@CookieValue(value = "username", defaultValue = "Atta") String username) {
return "Hey! My username is " + username;
}

3) 读取所有的 Cookie 值

1
2
3
4
5
6
7
8
9
10
11
java复制代码@GetMapping("/all-cookies")
public String readAllCookies(HttpServletRequest request) {

Cookie[] cookies = request.getCookies();
if (cookies != null) {
return Arrays.stream(cookies)
.map(c -> c.getName() + "=" + c.getValue()).collect(Collectors.joining(", "));
}

return "No cookies";
}

更多关于如何在 Spring Boot 中使用 Cookie 的内容可以查看这篇文章:How to use cookies in Spring Boot 。

Cookie 和 Session 有什么区别?

Session 的主要作用就是通过服务端记录用户的状态。 典型的场景是购物车,当你要添加商品到购物车的时候,系统不知道是哪个用户操作的,因为 HTTP 协议是无状态的。服务端给特定的用户创建特定的 Session 之后就可以标识这个用户并且跟踪这个用户了。

Cookie 数据保存在客户端(浏览器端),Session 数据保存在服务器端。相对来说 Session 安全性更高。为了保证 Cookie中信息的安全性,最好能将 Cookie 信息加密然后使用到的时候再去服务器端解密。

那么,如何使用 Session 进行身份验证?

如何使用 Session-Cookie 方案进行身份验证?

很多时候我们都是通过 SessionID 来实现特定的用户,SessionID 一般会选择存放在 Redis 中。举个例子:

  1. 用户成功登陆系统,然后返回给客户端具有 SessionID 的 Cookie
  2. 当用户向后端发起请求的时候会把 SessionID 带上,这样后端就知道你的身份状态了。

关于这种认证方式更详细的过程如下:

  1. 用户向服务器发送用户名、密码、验证码用于登陆系统。
  2. 服务器验证通过后,服务器为用户创建一个 Session,并将 Session 信息存储起来。
  3. 服务器向用户返回一个 SessionID,写入用户的 Cookie。
  4. 当用户保持登录状态时,Cookie 将与每个后续请求一起被发送出去。
  5. 服务器可以将存储在 Cookie 上的 SessionID 与存储在内存中或者数据库中的 Session 信息进行比较,以验证用户的身份,返回给用户客户端响应信息的时候会附带用户当前的状态。

使用 Session 的时候需要注意下面几个点:

  1. 依赖 Session 的关键业务一定要确保客户端开启了 Cookie。
  2. 注意 Session 的过期时间。

另外,Spring Session 提供了一种跨多个应用程序或实例管理用户会话信息的机制。如果想详细了解可以查看下面几篇很不错的文章:

  • Getting Started with Spring Session
  • Guide to Spring Session
  • Sticky Sessions with Spring Session & Redis

多服务器节点下 Session-Cookie 方案如何做?

Session-Cookie 方案在单体环境是一个非常好的身份认证方案。但是,当服务器水平拓展成多节点时,Session-Cookie 方案就要面临挑战了。

举个例子:假如我们部署了两份相同的服务 A,B,用户第一次登陆的时候 ,Nginx 通过负载均衡机制将用户请求转发到 A 服务器,此时用户的 Session 信息保存在 A 服务器。结果,用户第二次访问的时候 Nginx 将请求路由到 B 服务器,由于 B 服务器没有保存 用户的 Session 信息,导致用户需要重新进行登陆。

我们应该如何避免上面这种情况的出现呢?

有几个方案可供大家参考:

  1. 某个用户的所有请求都通过特性的哈希策略分配给同一个服务器处理。这样的话,每个服务器都保存了一部分用户的 Session 信息。服务器宕机,其保存的所有 Session 信息就完全丢失了。
  2. 每一个服务器保存的 Session 信息都是互相同步的,也就是说每一个服务器都保存了全量的 Session 信息。每当一个服务器的 Session 信息发生变化,我们就将其同步到其他服务器。这种方案成本太大,并且,节点越多时,同步成本也越高。
  3. 单独使用一个所有服务器都能访问到的数据节点(比如缓存)来存放 Session 信息。为了保证高可用,数据节点尽量要避免是单点。

如果没有 Cookie 的话 Session 还能用吗?

这是一道经典的面试题!

一般是通过 Cookie 来保存 SessionID ,假如你使用了 Cookie 保存 SessionID 的方案的话, 如果客户端禁用了 Cookie,那么 Session 就无法正常工作。

但是,并不是没有 Cookie 之后就不能用 Session 了,比如你可以将 SessionID 放在请求的 url 里面https://javaguide.cn/?Session_id=xxx 。这种方案的话可行,但是安全性和用户体验感降低。当然,为了你也可以对 SessionID 进行一次加密之后再传入后端。

为什么 Cookie 无法防止 CSRF 攻击,而 Token 可以?

CSRF (Cross Site Request Forgery)一般被翻译为 跨站请求伪造 。那么什么是 跨站请求伪造 呢?说简单用你的身份去发送一些对你不友好的请求。举个简单的例子:

小壮登录了某网上银行,他来到了网上银行的帖子区,看到一个帖子下面有一个链接写着“科学理财,年盈利率过万”,小壮好奇的点开了这个链接,结果发现自己的账户少了 10000 元。这是这么回事呢?原来黑客在链接中藏了一个请求,这个请求直接利用小壮的身份给银行发送了一个转账请求,也就是通过你的 Cookie 向银行发出请求。

1
html复制代码<a src=http://www.mybank.com/Transfer?bankId=11&money=10000>科学理财,年盈利率过万</>

上面也提到过,进行 Session 认证的时候,我们一般使用 Cookie 来存储 SessionId,当我们登陆后后端生成一个 SessionId 放在 Cookie 中返回给客户端,服务端通过 Redis 或者其他存储工具记录保存着这个 SessionId,客户端登录以后每次请求都会带上这个 SessionId,服务端通过这个 SessionId 来标示你这个人。如果别人通过 Cookie 拿到了 SessionId 后就可以代替你的身份访问系统了。

Session 认证中 Cookie 中的 SessionId 是由浏览器发送到服务端的,借助这个特性,攻击者就可以通过让用户误点攻击链接,达到攻击效果。

但是,我们使用 Token 的话就不会存在这个问题,在我们登录成功获得 Token 之后,一般会选择存放在 localStorage (浏览器本地存储)中。然后我们在前端通过某些方式会给每个发到后端的请求加上这个 Token,这样就不会出现 CSRF 漏洞的问题。因为,即使有个你点击了非法链接发送了请求到服务端,这个非法请求是不会携带 Token 的,所以这个请求将是非法的。

需要注意的是不论是 Cookie 还是 Token 都无法避免 跨站脚本攻击(Cross Site Scripting)XSS 。

跨站脚本攻击(Cross Site Scripting)缩写为 CSS 但这会与层叠样式表(Cascading Style Sheets,CSS)的缩写混淆。因此,有人将跨站脚本攻击缩写为 XSS。

XSS 中攻击者会用各种方式将恶意代码注入到其他用户的页面中。就可以通过脚本盗用信息比如 Cookie 。

推荐阅读:如何防止 CSRF 攻击?—美团技术团队

什么是 Token?什么是 JWT?

我们在前面的问题中探讨了使用 Session 来鉴别用户的身份,并且给出了几个 Spring Session 的案例分享。 我们知道 Session 信息需要保存一份在服务器端。这种方式会带来一些麻烦,比如需要我们保证保存 Session 信息服务器的可用性、不适合移动端(依赖 Cookie)等等。

有没有一种不需要自己存放 Session 信息就能实现身份验证的方式呢?使用 Token 即可!JWT (JSON Web Token) 就是这种方式的实现,通过这种方式服务器端就不需要保存 Session 数据了,只用在客户端保存服务端返回给客户的 Token 就可以了,扩展性得到提升。

JWT 本质上就一段签名的 JSON 格式的数据。由于它是带有签名的,因此接收者便可以验证它的真实性。

下面是 RFC 7519 对 JWT 做的较为正式的定义。

JSON Web Token (JWT) is a compact, URL-safe means of representing claims to be transferred between two parties. The claims in a JWT are encoded as a JSON object that is used as the payload of a JSON Web Signature (JWS) structure or as the plaintext of a JSON Web Encryption (JWE) structure, enabling the claims to be digitally signed or integrity protected with a Message Authentication Code (MAC) and/or encrypted. ——JSON Web Token (JWT)

JWT 由 3 部分构成:

  1. Header : 描述 JWT 的元数据,定义了生成签名的算法以及 Token 的类型。
  2. Payload : 用来存放实际需要传递的数据
  3. Signature(签名) :服务器通过Payload、Header和一个密钥(secret)使用 Header 里面指定的签名算法(默认是 HMAC SHA256)生成。

如何基于 Token 进行身份验证?

在基于 Token 进行身份验证的的应用程序中,服务器通过Payload、Header和一个密钥(secret)创建令牌(Token)并将 Token 发送给客户端,客户端将 Token 保存在 Cookie 或者 localStorage 里面,以后客户端发出的所有请求都会携带这个令牌。你可以把它放在 Cookie 里面自动发送,但是这样不能跨域,所以更好的做法是放在 HTTP Header 的 Authorization 字段中:Authorization: Bearer Token。

jwt

  1. 用户向服务器发送用户名和密码用于登陆系统。
  2. 身份验证服务响应并返回了签名的 JWT,上面包含了用户是谁的内容。
  3. 用户以后每次向后端发请求都在 Header 中带上 JWT。
  4. 服务端检查 JWT 并从中获取用户相关信息。

什么是 SSO?

SSO(Single Sign On)即单点登录说的是用户登陆多个子系统的其中一个就有权访问与其相关的其他系统。举个例子我们在登陆了京东金融之后,我们同时也成功登陆京东的京东超市、京东国际、京东生鲜等子系统。

sso

什么是 OAuth 2.0?

OAuth 是一个行业的标准授权协议,主要用来授权第三方应用获取有限的权限。而 OAuth 2.0 是对 OAuth 1.0 的完全重新设计,OAuth 2.0 更快,更容易实现,OAuth 1.0 已经被废弃。详情请见:rfc6749。

实际上它就是一种授权机制,它的最终目的是为第三方应用颁发一个有时效性的令牌 Token,使得第三方应用能够通过该令牌获取相关的资源。

OAuth 2.0 比较常用的场景就是第三方登录,当你的网站接入了第三方登录的时候一般就是使用的 OAuth 2.0 协议。

另外,现在 OAuth 2.0 也常见于支付场景(微信支付、支付宝支付)和开发平台(微信开放平台、阿里开放平台等等)。

微信支付账户相关参数:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yqIC91bs-1623925796543)(./images/basis-of-authority-certification/微信支付-fnglfdlgdfj.png)]

下图是 Slack OAuth 2.0 第三方登录的示意图:

推荐阅读:

  • OAuth 2.0 的一个简单解释
  • 10 分钟理解什么是 OAuth 2.0 协议
  • OAuth 2.0 的四种方式
  • GitHub OAuth 第三方登录示例教程

花了半个月写的最新版 Java学习路线已近更新!可能是你看过最用心、最全面的 Java 后端学习路线。

最适合新手的Java系统学习路线!

我是 Guide哥,拥抱开源,喜欢烹饪。开源项目 JavaGuide 作者,Github:Snailclimb - Overview 。未来几年,希望持续完善 JavaGuide,争取能够帮助更多学习 Java 的小伙伴!共勉!凎!点击查看我的2020年工作汇报!

原创不易,欢迎点赞分享,欢迎关注 @JavaGuide,我会持续分享原创干货~

本回答为我本人原创,如需转载,还请注明出处啊!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java开发环境搭建详细教程(JDK+Eclipse)

发表于 2021-06-17

前言

学习 Java 编程的第一件事情就是把开发环境搭建起来,这样才能开始快乐的编程学习之旅。然而,很多萌新经常因为种种问题无法搭建好开发环境,导致自己信心受挫,学习兴趣和激情也大减,甚至因此彻底放弃学习 Java 编程!网上虽然有很多相关资料,但鱼龙混杂,众说纷纭,不仅无法解决问题,还搞得萌新晕头转向。

为了帮助更多萌新顺利进入 Java 精彩世界,我精心编写了本篇教程,不仅提供了非常详细的操作步骤,还收集整理了各种常见问题的解决方法(持续更新)。各位萌新只需要花几分钟时间认真看完下面的内容,就一定可以将 Java 开发环境成功搭建起来!

JDK:Java Development Kit,即 Java 开发工具包,它提供了开发和调试 Java 程序所需的各种工具,Java 编程必备。本教程使用的是开源免费的 JDK 版本,即 OpenJDK,而不是 Oracle 公司提供的商业化 JDK 版本(简称 OracleJDK),在这里我建议大家使用 OpenJDK,二者的区别会在常见问题中讲到。

安装配置好 JDK 后其实就可以使用系统自带的记事本编写 Java 代码,然后在命令行中调用 javac 和 java 命令编译运行代码,无需安装其他任何工具。但这种开发方式很低效,编程体验也很糟糕,简单程序或许还能应付,如果用于开发大型复杂项目,绝对会让你痛苦到怀疑人生!

工欲善其事必先利其器,因此我们还需要安装一款功能更强大的集成化开发工具,将开发所需的各种工具整合在一个软件里面,方便我们使用,也就是通常所说的 IDE,本教程使用的是世界流行的 Eclipse 软件。

小知识

IDE:Integrated Development Environment,即集成开发环境,用于提供程序开发环境的软件,它一般包括代码编辑器、编译器、调试器和图形用户界面等工具,集成了代码编写、分析、编译、调试等开发所需功能。

Eclipse 是一款著名的跨平台 IDE,开源免费,其本身就是用 Java 语言开发的,最初主要用于 Java 语言开发,但由于它是一个可扩展的框架平台,可以通过安装不同的插件轻松实现对其它编程语言的支持,比如 C++、Python 等,许多软件开发商以 Eclipse 为框架开发自己的 IDE。

本教程所使用的系统及软件版本如下:

  • Windows 10 64位
  • openjdk-15.0.1_windows-x64
  • eclipse-jee-2020-09-R-win32-x86_64

PS:不同版本的系统或软件的操作方法基本相同,为了节省宝贵时间,建议大家使用和本教程一样的版本!

操作步骤

1. 下载 JDK 和 Eclipse

官网是全英文的,还特别容易迷路,为了方便大家,我已经把从官网下载好的 JDK 和 Eclipse 安装包上传到百度网盘中,供大家直接下载使用。

链接:pan.baidu.com/s/181xs-LGn…

提取码:4zwr

下载后得到两个压缩包文件,如下:

  • openjdk-15.0.1_windows-x64_bin.zip
  • eclipse-jee-2020-09-R-win32-x86_64.zip

2. 解压 JDK 和 Eclipse

在 D 盘下新建一个 Java 文件夹,然后将上一步得到的两个压缩包直接解压到该文件夹下。如下图所示:

3. 配置 JDK 相关环境变量

3.1 在桌面上右击“此电脑”,点击“属性”菜单,在打开的窗口中点击左侧的“高级系统设置”菜单;

3.2 在打开的“系统属性”窗口中点击下方的“环境变量”按钮;

3.3 在打开的“环境变量”窗口中点击“系统变量”栏中的“新建”按钮;

3.4 在打开的“新建系统变量”窗口中按下图所示输入变量名和变量值,然后点击下方的“确定”按钮;

PS:变量名是 JAVA_HOME,不区分大小写,变量值是 JDK 的实际解压路径。

3.5 在“环境变量”窗口中双击“系统变量”栏中的”Path”变量,在打开的“编辑环境变量”窗口中按下图所示步骤进行操作;

PS:这里输入的路径就是 JDK 解压后的文件夹里面的 bin 文件夹的完整路径,将这个路径加入系统 Path 环境变量后就可以在命令行下直接通过名字执行 bin 文件夹下的 javac、java 等程序,而不用指定它们的具体路径。

小知识

命令就是可执行程序,当在命令行中输入一个命令的名字(不指定具体路径)执行它时,系统会读取 Path 环境变量的值,并按顺序依次搜索其值表示的所有路径,直到找到对应的可执行程序就停止继续搜索,然后运行它。如果搜索完所有路径都没有找到对应的可执行程序,就会报错。

3.6 在“环境变量”窗口中点击底部的“确定”按钮,剩下的其他窗口都可以直接关闭了。

PS:这里一定要记得点击“确定”按钮,否则之前对环境变量进行的所有配置操作将不会起作用!

4. 测试 JDK 是否部署成功

按 WIN + R 快捷键打开“运行”窗口,输入 cmd 并按回车键打开“命令提示符”窗口。在打开的窗口中输入 javac -version 和 java -version 命令并按回车键执行,如果能成功看到如下图所示的版本信息,则证明 JDK 已经部署成功。如果发生错误,请仔细检查前面的每一步操作,尤其是环境变量的配置,多尝试几次肯定能成功!

运行窗口

命令提示符窗口

5. 启动 Eclipse 软件

在成功部署 JDK 后,打开 Eclipse 解压后所在的文件夹(这里是 D:\Java\eclipse),双击“eclipse.exe”这个可执行文件就可以直接启动 Eclipse 软件了。为了方便后续使用,可以将该文件发送到桌面快捷方式(桌面快捷方式可以任意修改名字)。

启动Eclipse

给Eclipse创建桌面快捷方式

6. 写出第一个最简单程序

6.1 启动 Eclipse 后打开如下窗口,提示设置工作空间,可以点击“Browse…”按钮进行修改或保持默认,然后点击底部的“Launch”按钮;

小知识

Workspace:工作空间,就是用于存放 Java 项目(Project)的文件夹。

6.2 稍等片刻即可打开 Eclipse 主界面,可以直接关闭欢迎(Welcome)界面;

6.3 依次点击菜单 File –> New –> Java Project;

6.4 在打开的窗口中按下图所示输入项目名称(Project name)、指定项目文件夹存放路径,其他配置项可以保持默认,然后点击底部的“Next”按钮;

PS:项目名称原则上可以任意指定,但为了便于开发和维护,建议按照项目功能进行命名。每个项目都对应一个文件夹,称之为项目文件夹,项目中的所有源文件、资源文件、配置文件等都统一存放在这个文件夹里面,以方便管理。

6.5 在打开的窗口中按下图所示进行操作,最后点击底部的“Finish”按钮即可创建好一个 Java 项目;

6.6 双击左侧边栏中的项目名称,展开项目结构,然后右击“src”,依次点击菜单 New –> Class;

6.7 在打开的窗口中按下图所示输入包名和类名,可以勾选自动生成主方法,然后点击底部的“Finish”按钮即可创建好一个新的 Java 类;

PS:包(Package)和类(Class)的相关知识和细节在这里不展开,后续教程会以专题进行详细深入讲解。主方法(main)是一个 Java 程序的执行入口,即 Java 程序启动后会自动从主方法中的第一行代码开始逐行往下执行,主方法执行结束返回后,整个程序也会随之结束。

6.8 在主方法中输入 System.out.println("你好,中国!"); 这行代码后,按 Ctrl + S 快捷键保存,然后点击上方工具栏中的绿色三角形按钮编译并运行程序代码。如果没有错误的话,就可以在下方的 Console 窗口中看到这段程序代码的运行结果。

PS:这里输入的代码的功能就是在控制台(Console)窗口中显示“你好,中国!”这行文字内容。

第一个最简单程序

常见问题

1. 按照上面的步骤操作还是没有成功或者遇到了其他问题,怎么办?

欢迎在文章下方评论区中将你遇到的问题发出来,我帮你分析解决,也可以私信我交流。

2. OpenJDK 和 OracleJDK 有什么区别?

从费用角度看,OracleJDK 从 2019 年 4 月 16 日开始实施商用收费政策,当你使用 OracleJDK 8u211及以上版本开发具有商业用途的软件时,必须先向 Oracle 公司付费取得授权,否则可能会面临巨大的法律风险。而 OpenJDK 是完全开源免费的,可以自由使用。

从技术角度看,OracleJDK 是基于 OpenJDK 构建的,二者相差不大,不过在性能优化和稳定性方面,OracleJDK 略胜一筹。越来越多的企业都在实际生产环境中使用 OpenJDK,我们君工教育的 Web 和移动应用后台采用的就是 OpenJDK。对于学习者而言,二者基本上没有任何区别。另外,Oracle 公司对开源社区和个人开发者的态度不太友好,而 OpenJDK 社区则非常活跃,忠实用户也越来越多。

总结一下,建议大家使用 OpenJDK。

3. 为什么不使用 OracleJDK 8 这个版本?

目前很多个人和教程还在使用 OracleJDK 8 这个比较老的版本,而拒绝使用更新的版本,很大程度上是因为新版本的 OracleJDK 不再免费了(OracleJDK 8u202 为最后一个免费版本)。新版本 JDK 不仅仅会带来一些新的 Java 语法特性,还会修复一些 Bug,提升性能和稳定性,坚持使用旧版本自然就无法获得这些东西。如果你想使用更新版本的 JDK,但又不想为商业用途付费,最好的解决方案就是使用新版本的 OpenJDK。

4. javac 或 java 命令无法执行,报错如下:

JDK 相关环境变量没有配置成功,具体来说就是 Path 环境变量的值中没有包含 JDK 安装(解压)文件夹中的 bin 文件夹的正确路径,请对照前面的操作步骤仔细检查并修改。

5. 为什么没有配置 CLASSPATH 环境变量?

在 JDK 5 版本之后(本教程使用的是 JDK 15),不再需要配置 CLASSPATH 环境变量。当然,如果你非要多此一举,也不会有什么问题。

6. 执行 javac 或 java 命令能成功,但显示的版本信息却不是 15.0.1

出现这种情况,说明你的电脑之前安装过其他版本的 JDK 或者卸载后没有删除相关的环境变量,建议你将所有 JAVA_HOME、CLASSPATH 环境变量以及 Path 环境变量的值中的 JDK 路径都删除,然后对照前面的操作步骤重新配置环境变量。

7. Eclipse 无法启动,报错弹窗如下:

JDK 相关环境变量没有配置成功,请对照前面的操作步骤仔细检查并修改,保证 JDK 部署成功后才能正常运行 Eclipse。

8. Eclipse 无法启动,报错弹窗如下:

系统中起作用的 JDK 版本为 JDK 8,而当前 Eclipse 需要 JDK 11 或更高版本才能正常启动。请将之前安装的低版本的 JDK 完全删除(删除 JDK 文件夹和相关环境变量)后,重新部署本教程提供的 JDK,然后再来运行 Eclipse。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

kettle真是神器啊,3分钟将mysql中的数据迁移到po

发表于 2021-06-17

今天公司机房因为停电几台开发机的磁盘损坏了。运维同学告诉我,磁盘恢复比较慢,如果着急用,可以先申请一个新的虚拟机使用。没办法又要各种装装装,这是第二次了吧,心中真是策马奔腾

src=http___5b0988e595225.cdn.sohucs.com_images_20171019_786c4675aa214a58b73ba006e0e07934.jpeg&refer=http___5b0988e595225.cdn.sohucs.jpg

再暗骂一句老板抠门,也不知道土豪的大厂是不是一人一台ECS用。

src=http___ww1.sinaimg.cn_large_006r3PQBjw1fautmtyppgj30c80c8aa5.jpg&refer=http___www.sina.jpg

由于第一次故障,丢了几天代码,现在不管功能做没做完,都会先把代码push到仓库,仓库要是挂了公司的IT部门是不是都要去顶楼jump、jump了。

就剩下数据库了,虽然有备份,但是老是重装也挺浪费时间的,马上618了,看能不能去网上搜一个便宜一点的ecs装个mysql,虽然不能报销,但节省下来自己的时间摸鱼不香么。

14c9aa9b-3e18-480b-bf0b-49864e0bcc38.png

犹豫一下准备下单了。隔壁的XD冲过来说,有个免费的云数据库在搞活动,他有邀请码,可以先试试,ecs先不慌买。
试试又不花钱,不过好像是postgres的,我之前是用的mysql,得先把数据同步上去。用kettle来做数据同步,有点杀鸡用牛刀的感觉。需要科普的同学可以看b站这个视频,www.bilibili.com/video/BV1kE… 不想看的,我截了几张图在下面

kettle介绍

9498d584-a8f5-4f0e-92ed-cec8f226f741.png

程序目录,这里丢个网盘地址 链接:pan.baidu.com/s/1hPxrJUFA… 提取码:d3lf

5c426ed6-dbb1-4798-bf69-cc859580d024.png

下载完成后直接解压,点击里面的spoon.bat启动

03e38d22-32e9-4e9a-a58f-d36075a7d62e.png

新建job,然后点击左边的“主对象树”,配置“DB连接”

6a9fe4bf-f8c0-47c2-8515-c5871d1de907.png

配置源数据库,我的源数据库是Mysql的,这里填上相应的参数

cf82d1b0-a61c-451c-8355-fae9c9376853.png

配置目标数据库,这里登陆云数据库的控制台MemFireCloud获取数据库连接IP地址,名称,将参数填写到kettle

0715ca21-a728-4f80-9cd9-c8118137a442.png

1.png

点击“工具”–“向导”–“复制多表向导”,选择源数据库和目标数据库

2.png

选择要复制的表

3.png

修改任务名

4.png

点击“Finish”完成配置

5.png

点击“Run”开始复制数据,复制完成后,可以登录云数据库控制台查看数据

6.png

kettle功能非常丰富,很多大数据公司都用他来做数据集成,这个迁移只能算是杀鸡用牛刀了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RocketMQ-事务消息(分布式事务)

发表于 2021-06-17

1、业务场景

在电商场景里面,成功付款后,会发放优惠券。

上面的场景:在电商系统中,会出现,付款成功后,准备发优惠券的时候,服务器宕机了。这个时候会造成用户成功付款,却没收到优惠券的情况。这种情况下,我们很容易想到用事务来保证付款和发优惠券的原子性即可:要么付款和发优惠券同时成功,要么同时失败,是不允许其他一个成功,另一个失败的。

但上面,存在一种情况:付款和发优惠券高度耦合,这样子容易出现:发优惠券一直失败,会导致付款一直失败的场景。

对于这种场景的解决方案:引入消息中间件MQ来解耦。

但是上述这种情景,存在MQ不可用,宕机的情况。会产生付款成功,发优惠券失败的情况。

针对这种情况,需要引入分布式事务。

2、事务消息

分布式事务是一种抽象的概念。

那具体的实现呢?

是有很多种实现的。

在这里,主要介绍:RocketMQ的事务消息。

事务消息的流程图

流程步骤:
  • 1、生产者发送half消息
  • 2、MQ回复ACK确认消息
  • 3、执行本地事务:订单付款。如果订单付款成功,那么就给MQ发送commit消息。如果订单付款失败,就发送rollback消息
  • 4、如果步骤3发送消息失败,这个时候MQ的定时器会检查half消息。MQ回调方法,去检查本地事务的执行情况。如果执行成功,就返回commit消息。如果执行失败,就返回rollback消息。
  • 5、如果MQ收到的是commit消息,此时会把half消息复制到真正的topic中
  • 6、消费者对消息进行消费,下发优惠券

3、如何使用

上面,大概知道了事务消息的流程。

接下来,要知道如何使用。

还是以付款下发优惠券为例。

3.1 发送half消息-MQ回复ACK确认消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码
@Override
public void finishedOrder(String orderNo, String phoneNumber) {

try {
// 退房事务消息,topic:完成订单
Message msg = new Message(orderFinishedTopic, JSON.toJSONString(orderInfo).getBytes(StandardCharsets.UTF_8));

// 发送half消息
TransactionSendResult transactionSendResult = orderFinishedTransactionMqProducer.sendMessageInTransaction(msg, null);

} catch (MQClientException e) {

}

}

3.2 执行本地事务:付款

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

try {
// 修改订单的状态
orderService.payOrder();

// 成功 提交prepare消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 执行本地事务失败 回滚prepare消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

3.3 MQ定时器回调查询half消息状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {

try {
//查询订单状态
Integer orderStatus = orderService.getOrderStatus();
if (Objects.equals(orderStatus, OrderStatusEnum.FINISHED.getStatus())) { //返回commit消息
return LocalTransactionState.COMMIT_MESSAGE;
} else {
//返回rollback消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
// 查询订单状态失败
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

3.4 消费者进行消费,下发优惠券

1
2
3
4
5
6
7
8
9
10
java复制代码 @Bean(value = "orderFinishedConsumer")
public DefaultMQPushConsumer finishedConsumer(@Qualifier(value = "orderFinishedMessageListener") OrderFinishedMessageListener orderFinishedMessageListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderFinishedConsumerGroup);
consumer.setNamesrvAddr(namesrvAddress);
//topic:完成订单
consumer.subscribe(orderFinishedTopic, "*");
consumer.setMessageListener(orderFinishedMessageListener);
consumer.start();
return consumer;
}
监听器:OrderFinishedMessageListener
1
2
3
4
5
6
7
8
java复制代码@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//下发优惠券
couponService.distributeCoupon();

}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

4、知其然知其所以然

你看完上面,已经知道如何使用事务消息。

接下来,你需要了解其底层原理:看看源码(面试常问)

step1:首先看发送half消息的代码:

step2:进入代码里面:

step3:其实就是默认调用了DefaultMQProducer#sendMessageInTransaction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码public TransactionSendResult sendMessageInTransaction(final Message msg,
...省略一堆代码

SendResult sendResult = null;
// 给待发送消息添加属性,表名是一个事务消息,即半消息,这里设置为true。(这个属性后面会用到)
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送消息--重点0
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
//消息发送成功
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {

localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//执行本地事务,executeLocalTransaction需要子类去具体实现
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

try {
// 最后,给broker发送提交或者回滚事务的RPC请求
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
// 组装结果返回
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

上面的DefaultMQProducerImpl#sendMessageInTransaction方法主要流程:

  • 简单的数据校验
  • 给消息添加属性,表明这个事务消息
  • 发送消息,且返回消息的结果–重点0
  • 根据消息不同结果,进行不同的处理
  • 如果消息发送成功,那么就执行本地事务(付款),返回本地事务的结果–重点1
  • 最后,根据本地事务的结果,给broker发送Commit或rollback的消息–重点2

上面我们简述了一个大概的流程。未涉及到太多细节,是对一个整体流程的了解。

接下来,我们深入了解一些细节:

我们先研究一下重点0:sendResult = this.send(msg);
我们点进去会发现,send的底层其实就是调用了DefaultMQProducerImpl#sendKernelImpl方法。

step4:接着到SendMessageProcessor#sendMessage

step5:事务消息,继续进入TransactionalMessageServiceImpl#prepareMessage–>TransactionalMessageBridge#putHalfMessage–>TransactionalMessageBridge#parseHalfMessageInner

step6:接着,我们坐着研究一下重点1,即transactionListener.executeLocalTransaction(msg, arg);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

你会发现,这是一个接口,有2个方法,一个是执行本地事务executeLocalTransaction。另一个是检查本地事务checkLocalTransaction。这两个方法需要实现类去实现。

比如:执行本地事务:付款

step7:接着我们来看重点2:this.endTransaction(sendResult, localTransactionState, localException);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public void endTransaction(
// 省略一堆代码
//事务id
String transactionId = sendResult.getTransactionId();
// broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
// 根据事务消息和本地事务的执行结果,发送不同的结果给broker
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
//发送给broker
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

到这个时候,我们已经把消息从生产者发送到了broker里面。

那接下来,我们就需要了解broker是如何处理事务消息的。

step8: 事务消息如何回查

直接看代码注解即可

TransactionalMessageCheckService#onWaitEnd

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Override
protected void onWaitEnd() {
//timeout是从broker配置文件中获取transactionTimeOut值,代表事务的过期时间,(一个消息的存储时间 + timeout) > 系统当前时间,才会对该消息执行事务状态会查
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
//checkMax是从broker配置文件中获取transactionCheckMax值,代表事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即rollback消息
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
//回查:核心点org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.check
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

step9:进入check方法:TransactionalMessageServiceImpl#check。

直接看注解即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
java复制代码@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
//RMQ_SYS_TRANS_HALF_TOPIC主题
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
//获取RMQ_SYS_TRANS_HALF_TOPIC主题下的所有队列
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
//数据校验
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
//遍历队列
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
//根据队列获取对应topic:RMQ_SYS_TRANS_OP_HALF_TOPIC下的opQueue
//RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主题,事务消息首先先进入到该主题。
//RMQ_SYS_TRANS_OP_HALF_TOPIC:当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下
MessageQueue opQueue = getOpQueue(messageQueue);
//messageQueue队列的偏移量
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
//opQueue队列的偏移量
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
//如果其中一个队列的偏移量小于0,就跳过
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
//doneOpOffset和removeMap主要的目的是避免重复调用事务回查接口
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
//空消息的次数
int getMessageNullCount = 1;
//RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新偏移量
long newOffset = halfOffset;
//RMQ_SYS_TRANS_HALF_TOPIC的偏移量
long i = halfOffset;
while (true) {
//限制每次最多处理的时间是60s
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
//removeMap包含当前信息,则跳过,处理下一条信息
//removeMap的信息填充是在上面的fillOpRemoveMap
//fillOpRemoveMap具体逻辑是:具体实现逻辑是从RMQ_SYS_TRANS_OP_HALF_TOPIC主题中拉取32条,
//如果拉取的消息队列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId当前的处理进度时
//会添加到removeMap中,表示已处理过
if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
//根据消息队列偏移量i从RMQ_SYS_TRANS_HALF_TOPIC队列中获取消息
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
//如果消息为空
if (msgExt == null) {
//则根据允许重复次数进行操作,默认重试一次 MAX_RETRY_COUNT_WHEN_HALF_NULL=1
//如果超过重试次数,直接跳出while循环,结束该消息队列的事务状态回查
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
//如果是由于没有新的消息而返回为空(拉取状态为:PullStatus.NO_NEW_MSG),则结束该消息队列的事务状态回查。
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
//其他原因,则将偏移量i设置为: getResult.getPullResult().getNextBeginOffset(),重新拉取
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
//判断该消息是否需要discard(吞没,丢弃,不处理)、或skip(跳过)
//needDiscard 依据:如果该消息回查的次数超过允许的最大回查次数,
// 则该消息将被丢弃,即事务消息提交失败,不能被消费者消费,其做法,
// 主要是每回查一次,在消息属性TRANSACTION_CHECK_TIMES中增1,默认最大回查次数为5次。

//needSkip依据:如果事务消息超过文件的过期时间,
// 默认72小时(具体请查看RocketMQ过期文件相关内容),则跳过该消息。
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
//消息的存储时间大于开始时间,中断while循环
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
//该消息已存储的时间=系统当前时间-消息存储的时间戳
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
//checkImmunityTime:检测事务的时间
//transactionTimeout:事务消息的超时时间
long checkImmunityTime = transactionTimeout;
//用户设定的checkImmunityTimeStr
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
//checkImmunityTime=Long.valueOf(checkImmunityTimeStr)
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
//最近进度=当前消息进度+1
newOffset = i + 1;
i++;
continue;
}
}
} else {//如果当前时间小于事务超时时间,则结束while循环
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
//是否需要回查,判断依据如下:
//消息已存储的时间大于事务超时时间
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {//11
continue;
}
//重点:进行事务回查(异步)
listener.resolveHalfMsg(msgExt);
} else {
//加载已处理的消息进行筛选
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
//保存half消息队列的回查进度
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
//保存处理队列opQueue的处理今夕
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {
log.error("Check error", e);
}

}

step10:继续深入研究一下:resolveHalfMsg

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//针对每个待反查的half消息,进行回查本地事务结果
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}

step11:继续追进sendCheckMessage(msgExt)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码/**
* 发送回查消息
* @param msgExt
* @throws Exception
*/
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
//原主题
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
//原队列id
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) {
//回调查询本地事务状态
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}

到这里,基本上把事务消息的流程和实现细节走了一遍。

还有什么问题的话,在留言区或者私信大头菜

5、问题:分布式事务还有其他实现

上面的事务消息是分布式事务的一种实现。

事务消息被称为二段提交。

问题:分布式事务,还有哪些具体的实现方式?

欢迎留言

6、后续文章

  • RocketMQ-入门(已更新)
  • RocketMQ-架构和角色(已更新)
  • RocketMQ-消息发送(已更新)
  • RocketMQ-消费信息
  • RocketMQ-消费者的广播模式和集群模式(已更新)
  • RocketMQ-顺序消息(已更新)
  • RocketMQ-延迟消息(已更新)
  • RocketMQ-批量消息
  • RocketMQ-过滤消息
  • RocketMQ-事务消息(已更新)
  • RocketMQ-消息存储
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主从复制
  • RocketMQ-刷盘机制
  • RocketMQ-幂等性
  • RocketMQ-消息重试
  • RocketMQ-死信队列

…

欢迎各位入(guan)股(zhu),后续文章干货多多。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用 Go struct 不能犯的一个低级错误!

发表于 2021-06-17

微信搜索【脑子进煎鱼了】关注这一只爆肝煎鱼。本文 GitHub github.com/eddycjy/blo… 已收录,有我的系列文章、资料和开源 Go 图书。

大家好,我是煎鱼。

前段时间我分享了 《手撕 Go 面试官:Go 结构体是否可以比较,为什么?》的文章,把基本 Go struct 的比较依据研究了一番。这不,最近有一位读者,遇到了一个关于 struct 的新问题,不得解。

大家一起来看看,建议大家在看到代码例子后先思考一下答案,再往下看。

独立思考很重要。

疑惑的例子

其给出的例子一如下:

1
2
3
4
5
6
7
golang复制代码type People struct {}

func main() {
a := &People{}
b := &People{}
fmt.Println(a == b)
}

你认为输出结果是什么呢?

输出结果是:false。

再稍加改造一下,例子二如下:

1
2
3
4
5
6
7
8
9
golang复制代码type People struct {}

func main() {
a := &People{}
b := &People{}
fmt.Printf("%p\n", a)
fmt.Printf("%p\n", b)
fmt.Println(a == b)
}

输出结果是:true。

他的问题是 “为什么第一个返回 false 第二个返回 true,是什么原因导致的?

煎鱼进一步的精简这个例子,得到最小示例:

1
2
3
4
5
6
7
8
9
10
golang复制代码func main() {
a := new(struct{})
b := new(struct{})
println(a, b, a == b)

c := new(struct{})
d := new(struct{})
fmt.Println(c, d)
println(c, d, c == d)
}

输出结果:

1
2
3
4
5
6
7
bash复制代码// a, b; a == b
0xc00005cf57 0xc00005cf57 false

// c, d
&{} &{}
// c, d, c == d
0x118c370 0x118c370 true

第一段代码的结果是 false,第二段的结果是 true,且可以看到内存地址指向的完全一样,也就是排除了输出后变量内存指向改变导致的原因。

进一步来看,似乎是 fmt.Print 方法导致的,但一个标准库里的输出方法,会导致这种奇怪的问题?

问题剖析

如果之前有被这个 “坑” 过,或有看过源码的同学。可能能够快速的意识到,导致这个输出是逃逸分析所致的结果。

我们对例子进行逃逸分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
go复制代码// 源代码结构
$ cat -n main.go
5 func main() {
6 a := new(struct{})
7 b := new(struct{})
8 println(a, b, a == b)
9
10 c := new(struct{})
11 d := new(struct{})
12 fmt.Println(c, d)
13 println(c, d, c == d)
14 }

// 进行逃逸分析
$ go run -gcflags="-m -l" main.go
# command-line-arguments
./main.go:6:10: a does not escape
./main.go:7:10: b does not escape
./main.go:10:10: c escapes to heap
./main.go:11:10: d escapes to heap
./main.go:12:13: ... argument does not escape

通过分析可得知变量 a, b 均是分配在栈中,而变量 c, d 分配在堆中。

其关键原因是因为调用了 fmt.Println 方法,该方法内部是涉及到大量的反射相关方法的调用,会造成逃逸行为,也就是分配到堆上。

为什么逃逸后相等

关注第一个细节,就是 “为什么逃逸后,两个空 struct 会是相等的?”。

这里主要与 Go runtime 的一个优化细节有关,如下:

1
2
golang复制代码// runtime/malloc.go
var zerobase uintptr

变量 zerobase 是所有 0 字节分配的基础地址。更进一步来讲,就是空(0字节)的在进行了逃逸分析后,往堆分配的都会指向 zerobase 这一个地址。

所以空 struct 在逃逸后本质上指向了 zerobase,其两者比较就是相等的,返回了 true。

为什么没逃逸不相等

关注第二个细节,就是 “为什么没逃逸前,两个空 struct 比较不相等?”。

Go spec

从 Go spec 来看,这是 Go 团队刻意而为之的设计,不希望大家依赖这一个来做判断依据。如下:

This is an intentional language choice to give implementations flexibility in how they handle pointers to zero-sized objects. If every pointer to a zero-sized object were required to be different, then each allocation of a zero-sized object would have to allocate at least one byte. If every pointer to a zero-sized object were required to be the same, it would be different to handle taking the address of a zero-sized field within a larger struct.

还说了一句很经典的,细品:

Pointers to distinct zero-size variables may or may not be equal.

另外空 struct 在实际使用中的场景是比较少的,常见的是:

  • 设置 context,传递时作为 key 时用到。
  • 设置空 struct 业务场景中临时用到。

但业务场景的情况下,也大多数会随着业务发展而不断改变,假设有个远古时代的 Go 代码,依赖了空 struct 的直接判断,岂不是事故上身?

不可直接依赖

因此 Go 团队这番操作,与 Go map 的随机性如出一辙,避免大家对这类逻辑的直接依赖,是值得思考的。

而在没逃逸的场景下,两个空 struct 的比较动作,你以为是真的在比较。实际上已经在代码优化阶段被直接优化掉,转为了 false。

因此,虽然在代码上看上去是 == 在做比较,实际上结果是 a == b 时就直接转为了 false,比都不需要比了。

你说妙不?

没逃逸让他相等

既然我们知道了他是在代码优化阶段被优化的,那么相对的,知道了原理的我们也可以借助在 go 编译运行时的 gcflags 指令,让他不优化。

在运行前面的例子时,执行 -gcflags="-N -l" 指令:

1
2
3
4
go复制代码$ go run -gcflags="-N -l" main.go 
0xc000092f06 0xc000092f06 true
&{} &{}
0x118c370 0x118c370 true

你看,两个比较的结果都是 true 了。

总结

在今天这篇文章中,我们针对 Go 语言中的空结构体(struct)的比较场景进行了进一步的补全。经过这两篇文章的洗礼,你会更好的理解 Go 结构体为什么叫既可比较又不可比较了。

而空结构比较的奇妙,主要原因如下:

  • 若逃逸到堆上,空结构体则默认分配的是 runtime.zerobase 变量,是专门用于分配到堆上的 0 字节基础地址。因此两个空结构体,都是 runtime.zerobase,一比较当然就是 true 了。
  • 若没有发生逃逸,也就分配到栈上。在 Go 编译器的代码优化阶段,会对其进行优化,直接返回 false。并不是传统意义上的,真的去比较了。

不会有人拿来出面试题,不会吧,为什么 Go 结构体说可比较又不可比较?

若有任何疑问欢迎评论区反馈和交流,最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

文章持续更新,可以微信搜【脑子进煎鱼了】阅读,回复【000】有我准备的一线大厂面试算法题解和资料;本文 GitHub github.com/eddycjy/blo… 已收录,欢迎 Star 催更。

参考

  • 欧神的微信交流
  • 曹大的一个空 struct 的“坑”

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

再见RocketMQ!全新一代消息中间件,带可视化管理,文档

发表于 2021-06-17

SpringBoot实战电商项目mall(40k+star)地址:github.com/macrozheng/…

摘要

最近很火的消息中间件Pulsar,本想学习下,发现网上很多都是介绍性能和对比Kafka的文章,实践的文章很少!于是对着官方文档实践了一波,写下了这篇文章,估计是国内第一篇Pulsar实战文章了,希望对大家有所帮助!

Pulsar简介

Pulsar是一个用于服务端到服务端的消息中间件,具有多租户、高性能等优势。Pulsar最初由Yahoo开发,目前由Apache软件基金会管理。Pulsar采用发布-订阅的设计模式,Producer发布消息到Topic,Consumer订阅Topic、处理Topic中的消息。

Pulsar具有如下特性:

  • Pulsar的单个实例原生支持集群。
  • 极低的发布延迟和端到端延迟。
  • 可无缝扩展到超过一百万个Topic。
  • 简单易用的客户端API,支持Java、Go、Python和C++。
  • 支持多种Topic订阅模式(独占订阅、共享订阅、故障转移订阅)。
  • 通过Apache BookKeeper提供的持久化消息存储机制保证消息传递。

Pulsar安装

使用Docker安装Pulsar是最简单的,这次我们使用Docker来安装。

  • 首先下载Pulsar的Docker镜像;
1
bash复制代码docker pull apachepulsar/pulsar:2.7.1
  • 下载完成后运行Pulsar容器,http协议访问使用8080端口,pulsar协议(Java、Python等客户端)访问使用6650端口。
1
2
3
4
5
6
7
bash复制代码docker run --name pulsar \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
-d apachepulsar/pulsar:2.7.1 \
bin/pulsar standalone

Pulsar可视化

Pulsar Manager是官方提供的可视化工具,可以对多个Pulsar进行可视化管理,虽然功能不多,但也基本够用了,支持Docker部署。

  • 下载pulsar-manager的Docker镜像;
1
bash复制代码docker pull apachepulsar/pulsar-manager:v0.2.0
  • 下载完成后运行pulsar-manager容器,从9527端口可以访问Web页面;
1
2
3
4
bash复制代码docker run -it --name pulsar-manager\
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
-d apachepulsar/pulsar-manager:v0.2.0
  • 运行成功后,我们刚开始无法访问,需要创建管理员账号,这里创建账号为admin:apachepulsar:
1
2
3
4
5
6
7
bash复制代码CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
  • 创建成功后,通过登录页面进行登录,访问地址:http://192.168.5.78:9527

  • 登录成功后我们需要先配置一个环境,就是将需要管理的Pulsar服务配置上去,配置的Service URL为:http://192.168.5.78:8080

  • 可以查看Tenant列表;

  • 可以查看Topic列表和管理Topic;

  • 还可以查看Topic的详细信息。

Pulsar结合SpringBoot使用

Pulsar结合SpringBoot使用也是非常简单的,我们可以使用Pulsar官方的Java SDK,也可以使用第三方的SpringBoot Starter。这里使用Starter,非常简单!

  • 首先在pom.xml中添加Pulsar相关依赖;
1
2
3
4
5
6
xml复制代码<!--SpringBoot整合Pulsar-->
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>1.0.4</version>
</dependency>
  • 然后在application.yml中添加Pulsar的Service URL配置;
1
2
yaml复制代码pulsar:
service-url: pulsar://192.168.5.78:6650
  • 再添加Pulsar的Java配置,声明两个Topic,并确定好发送的消息类型;
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码/**
* Pulsar配置类
* Created by macro on 2021/5/21.
*/
@Configuration
public class PulsarConfig {
@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory()
.addProducer("bootTopic", MessageDto.class)
.addProducer("stringTopic", String.class);
}
}
  • 创建Pulsar生产者,往Topic中发送消息,这里可以发现Pulsar是支持直接发送消息对象的;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码/**
* Pulsar消息生产者
* Created by macro on 2021/5/19.
*/
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<MessageDto> template;

public void send(MessageDto message){
try {
template.send("bootTopic",message);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}
  • 创建Pulsar消费者,从Topic中获取并消费消息,也是可以直接获取到消息对象的;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码/**
* Pulsar消息消费者
* Created by macro on 2021/5/19.
*/
@Slf4j
@Component
public class PulsarRealConsumer {

@PulsarConsumer(topic="bootTopic", clazz= MessageDto.class)
public void consume(MessageDto message) {
log.info("PulsarRealConsumer consume id:{},content:{}",message.getId(),message.getContent());
}

}
  • 添加测试接口,调用生产者发送消息;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码/**
* Pulsar功能测试
* Created by macro on 2021/5/19.
*/
@Api(tags = "PulsarController", description = "Pulsar功能测试")
@Controller
@RequestMapping("/pulsar")
public class PulsarController {

@Autowired
private PulsarProducer pulsarProducer;

@ApiOperation("发送消息")
@RequestMapping(value = "/sendMessage", method = RequestMethod.POST)
@ResponseBody
public CommonResult sendMessage(@RequestBody MessageDto message) {
pulsarProducer.send(message);
return CommonResult.success(null);
}
}
  • 在Swagger中调用接口进行测试;

  • 调用成功后,控制台将输入如下信息,表示消息已经被成功接收并消费了。
1
bash复制代码2021-05-21 16:25:07.756  INFO 11472 --- [al-listener-3-1] c.m.m.tiny.component.PulsarRealConsumer  : PulsarRealConsumer consume id:1,content:SpringBoot Message!

总结

上次写了一篇《吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你!》介绍了Kafka的基本使用,这里和Pulsar做个对比。Pulsar对Docker支持无疑是更好的,官方文档也更全。对比下图形化工具Pulsar Manager和Kafka Eagle,Pulsar的图形化工具感觉有点简陋。介于目前雅虎、腾讯、360等互联网大厂都在使用Pulsar,Pulsar的性能和稳定性应该是很不错的!

参考资料

Pulsar的官方文档很全,样式也不错,基本照着文档来一遍就能入门了。

  • Pulsar官方文档:pulsar.apache.org/docs/en/sta…
  • SpringBoot Starter官方文档:github.com/majusko/pul…

项目源码地址

github.com/macrozheng/…

本文 GitHub github.com/macrozheng/… 已经收录,欢迎大家Star!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

图解对象分配的过程

发表于 2021-06-17

这是我参与更文挑战的第3天,活动详情查看: 更文挑战

前言

当我们new一个对象实例时,首先是存入堆中新生代中的伊甸园区,如果伊甸园区空间满了,就会进行YGC,本篇文章就讲述一下对象的分配过程是如何的…

一般过程

第一次轻GC

image-20210615215811323.png

  1. new的对象先放着伊甸园区;
  2. 当伊甸园区空间满了时,程序又要创建新的对象时,JVM垃圾回收器将对伊甸园区进行垃圾回收,将伊甸园区的不再被其他对象所引用的对象进行销毁,再加载新的对象到伊甸园区;
  3. 然后将伊甸园区剩余的对象移动到幸存者0区,每个对象被分配了一个年龄计数器(age),每进行一次GC,幸存下来的对象age累加,上面的两个幸存对象age被赋值为1;
  4. GC完成后,此时伊甸园区是空的了。

说明:

  1. S0区和S1区,也被叫做From区和To区,判断二者很简单,谁是空的谁是To,则另外一个就是From区;
  2. 每次GC时,幸存的对象则会被放入To区,以上面的为例,第一次GC时,S0是To区,当GC完成后,S1区是空的就变成了To区;

第二次轻GC

image-20210615221650327.png

  1. 当伊甸园区空间又满了时,进行第二次GC,将伊甸园区幸存的对象放入S1区,age此时为1;
  2. 同时对S0区的对象进行判断是否还被使用,如果被使用的话,就将其放入S1区,age累加此时为2;
  3. 第二次GC完成后,S0区为空了,此时S0区为To区,S1区为From区;

第N次GC

image-20210615223737583.png

  1. 第N次GC时,我们发现S1区的有两个对象的age已经是15了,如果此时这两个对象还是被引用的,则将其晋升到Old区;
  2. 其中15为默认阈值,这个阈值是可以自己设置:-XX:MaxTenuringThreshold=<N>

特殊过程

image-20210615232416390.png

  1. 创建一个新的对象时,首先判断Eden区是否放的下,如果放的下,就为其分配内存,放不下的话,就进行YGC;
  2. 然后再判断Eden区是否放的下,如果此时放的下的话,就为其分配内存,如果还是放不下,说明这个对象比伊甸园区的空间还要大,这个对象是个超大对象,此时将其放入Old区,如果Old区也放不下,就进行FGC,然后再判断Old区能不能放下,放的下就存在Old区,如果还是放不下,就会出现OOM异常;
  3. 在进行YGC的时候,幸存的对象会放入幸存区,此时判断是否能放下,如果幸存区放不下,就会直接放入Old区。

总结

  1. 什么时候进行GC呢?
* 当伊甸园区满的时候才会进行GC,幸存区满的时候不会进行GC,只有当伊甸园区满的时候,幸存区才会被动进行GC。
  1. 关于幸存者S0和S1区:复制之后有交换,谁空谁是To;
  2. 对于垃圾回收:频繁在新生区进行回收,很少在老年区收集,几乎不在永久区/元空间收集。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 Seata Seata Server 配置流程

发表于 2021-06-16

这是我参与更文挑战的第9天,活动详情查看: 更文挑战

总文档 :文章目录

Github : github.com/black-ant

一 .前言

这是 Seata 部分的第二篇 , 主要来说一说 Seata Server 中如何完成配置的处理 , Seata 的启动可以参考 Seata 的启动流程 , 本篇文档主要包含如下内容 :

  • 配置的扫描
  • 配置的加载
  • - 配置的动态加载(下一篇)

二 . 配置的扫描

上一篇我们知道 , 配置主要有2个 , nacos.conf / registry.conf , 这2个文件主要由ConfigurationFactory 加载完成

配置处理的起点 :

先来看一下配置的起点 , 在前面说了 , 在处理 main 的时候 , 处理过 ParameterParser ,一切都是从那里开始 :

  • Step 1 : Server # main 函数中 , 发起过 ParameterParser
  • Step 2 : ParameterParser # init , 发起 ConfigurationFactory 的构建 ,同时传入 Mode 类型
1
2
3
4
5
6
7
8
9
java复制代码private void init(String[] args) {
if (StringUtils.isBlank(storeMode)) {

//.......
storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
SERVER_DEFAULT_STORE_MODE);
}

}

PS : 在读取网络配置之前 , 会优先从本地配置中获取连接信息

配置的读取类 :

配置这里使用 AbstractConfiguration 进行配置的读取 , 此处有以下几种配置类 :

  • FileConfiguration
  • SpringCloudConfiguration
  • ApolloConfiguration
  • NacosConfiguration
  • ConsulConfiguration
  • EtcdConfiguration
  • ZookeeperConfiguration

seata-AbstractConfiguration-system.png

2.1 配置的扫描流程

registry.conf 的读取主要是由 ConfigurationFactory 完成 , 来看一下主要的逻辑 :

2.1.1 : 配置类的初始化

在获取 ConfigurationFactory 实例的时候, 会触发一个静态代码块 , 调用 ConfigurationFactory # load , 主要分为 5 个流程 :

  • Step 1 : 依次获取配置类名称
  • Step 2 : 如果未设置特色配置 , 此处会获得名称 -> registry
  • Step 3 : envValue 可以看成 Spring 中的 Profile , 会获得特定环境的配置
  • Step 4 : 通过获取的参数 , 构建 Configuration 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码
private static final String SYSTEM_PROPERTY_SEATA_CONFIG_NAME = "seata.config.name";
private static final String REGISTRY_CONF_DEFAULT = "registry";


private static void load() {
// Step 1 : 依次获取配置类名称
// 从系统中获取配置文件名称 , 此处通常为 null
String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
if (seataConfigName == null) {
// ENV_SEATA_CONFIG_NAME -> SEATA_CONFIG_NAME
seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
}

// Step 2 : 如果未设置特色配置 , 此处会获得名称 -> registry
if (seataConfigName == null) {
seataConfigName = REGISTRY_CONF_DEFAULT;
}

// Step 3 : envValue 可以看成 Spring 中的 Profile , 会获得特定环境的配置
String envValue = System.getProperty(ENV_PROPERTY_KEY);
if (envValue == null) {
envValue = System.getenv(ENV_SYSTEM_KEY);
}

// Step 4 : 通过获取的参数 , 构建 Configuration 对象 -> 2.2.2
Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName,
false) : new FileConfiguration(seataConfigName + "-" + envValue, false);
Configuration extConfiguration = null;

try {
// Step 5 : SPI 扩展 , 通过ExtConfigurationProvider的provide方法 , 替换扩展配置 -> Pro:211001
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
// End : 存在扩展配置,则返回扩展配置实例,否则返回文件配置实例
CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
}

// Pro:211001 SPI 的扩展方式
例如使用 seata-spring-boot-starer时 , 会通过 SpringBootConfigurationProvider 进行扩展 ,此时会通过 application.properties/application.yaml中获取参数

2.2.2 : FileConfiguration 的构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码
public FileConfiguration(String name, boolean allowDynamicRefresh) {
// Step 1 :获取 registry.conf 的 File 对象
File file = getConfigFile(name);
if (file == null) {
targetFilePath = null;
} else {
targetFilePath = file.getPath();
// Step 2 : 加载 FileConfig , 这个 对象 比我们想的更大 -> PS222001
fileConfig = FileConfigFactory.load(file, name);
}

if (targetFilePath == null) {
fileConfig = FileConfigFactory.load();
this.allowDynamicRefresh = false;
} else {
targetFileLastModified = new File(targetFilePath).lastModified();
this.allowDynamicRefresh = allowDynamicRefresh;
}

this.name = name;

// 顺带构建了一个连接池 , 该连接池会用于后面发起 ConfigOperateRunnable
configOperateExecutor = new ThreadPoolExecutor(CORE_CONFIG_OPERATE_THREAD, MAX_CONFIG_OPERATE_THREAD,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new NamedThreadFactory("configOperate", MAX_CONFIG_OPERATE_THREAD));
}

Step 1 : getConfigFile 的获取流程

1
2
3
4
5
6
7
8
9
10
java复制代码private File getConfigFile(String name) {
// 是否为 file: 开头
boolean filePathCustom = name.startsWith(SYS_FILE_RESOURCE_PREFIX);
// 获取 File 路径 , 此处是 registry
String filePath = filePathCustom ? name.substring(SYS_FILE_RESOURCE_PREFIX.length()) : name;
String decodedPath = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name());

// 此处获得最终的 File : D:\java\study\seata_code\server\target\classes\registry.conf
File targetFile = getFileFromFileSystem(decodedPath);
}

Step 2 : FileConfigFactory.load(file, name) 加载流程

1
2
3
4
5
java复制代码public static FileConfig load(File targetFile, String name) {
String fileName = targetFile.getName();
String configType = getConfigType(fileName);
return loadService(configType, new Class[]{File.class, String.class}, new Object[]{targetFile, name});
}

Step 3 : 通过 EnhancedServiceLoader 进行加载

1
2
3
4
java复制代码private static FileConfig loadService(String name, Class[] argsType, Object[] args) {
FileConfig fileConfig = EnhancedServiceLoader.load(FileConfig.class, name, argsType, args);
return fileConfig;
}

Step 4 : EnhancedServiceLoader # load 流程

1
2
3
4
5
6
7
8
9
java复制代码 private S loadExtension(String activateName, ClassLoader loader, Class[] argTypes,
Object[] args) {
// 获取扩展配置类型 , 此处主要是
loadAllExtensionClass(loader);
ExtensionDefinition cachedExtensionDefinition = getCachedExtensionDefinition(activateName);
return getExtensionInstance(cachedExtensionDefinition, loader, argTypes, args);
}

// PS : 最终会通过 constructor.newInstance(args) 将 File 转换为 SimpleFileConfig

PS222001 : FileConfig 对象详情

该对象为一个接口 , 包括2个实现类 YamlFileConfig , SimpleFileConfig , 看下图可以知道 ,除了Nacos 配置 , 还有 Java 等等很多其他的配置

image.png

2.2.3 : 配置的加载

当成进行了 ConfigurationFactory.getInstance() 初始化后 , 会执行 getConfig 获取参数 , 最终调用 getLatestConfig :

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码
public String getLatestConfig(String dataId, String defaultValue, long timeoutMills) {

String value = getConfigFromSysPro(dataId);
if (value != null) {
return value;
}
// 构建一个 ConfigFuture 用于获取参数 : {"dataId":"config.type","operation":"GET","timeout":true}
ConfigFuture configFuture = new ConfigFuture(dataId, defaultValue, ConfigOperation.GET, timeoutMills);
configOperateExecutor.submit(new ConfigOperateRunnable(configFuture));
Object getValue = configFuture.get();
return getValue == null ? null : String.valueOf(getValue);
}

ConfigOperateRunnable 的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public void run() {
if (configFuture != null) {
if (configFuture.isTimeout()) {
setFailResult(configFuture);
return;
}
try {
if (allowDynamicRefresh) {
long tempLastModified = new File(targetFilePath).lastModified();
if (tempLastModified > targetFileLastModified) {
FileConfig tempConfig = FileConfigFactory.load(new File(targetFilePath), name);
if (tempConfig != null) {
fileConfig = tempConfig;
targetFileLastModified = tempLastModified;
}
}
}
if (configFuture.getOperation() == ConfigOperation.GET) {
String result = fileConfig.getString(configFuture.getDataId());
configFuture.setResult(result);
} else if (configFuture.getOperation() == ConfigOperation.PUT) {
configFuture.setResult(Boolean.TRUE);
} else if (configFuture.getOperation() == ConfigOperation.PUTIFABSENT) {
configFuture.setResult(Boolean.TRUE);
} else if (configFuture.getOperation() == ConfigOperation.REMOVE) {
configFuture.setResult(Boolean.TRUE);
}
} catch (Exception e) {
setFailResult(configFuture);
}
}
}

三 . 配置的加载

在上一个步骤中 , 是对 ConfigurationFactory 的创建 , 在创建过程中 , 对配置文件进行了扫描处理 , 后面会通过 buildConfiguration 来 创建 Configuration 对象 :

3.1 配置的加载入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码// 此处以 Nacos 配置为例 : 
private static Configuration buildConfiguration() {
// 从 File 中获取 数据类型 -> 上文中获取
String configTypeName = CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);

if (StringUtils.isBlank(configTypeName)) {
throw new NotSupportYetException("config type can not be null");
}
// 获取 Config 类型 -> Nacos
ConfigType configType = ConfigType.getType(configTypeName);

Configuration extConfiguration = null;
Configuration configuration;
// 对 File 进行专门的处理
if (ConfigType.File == configType) {
String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
configuration = new FileConfiguration(name);
try {
extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
}
} else {
// 3.2 -> 通过 ConfigurationProvider 处理
configuration = EnhancedServiceLoader
.load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
}


try {
Configuration configurationCache;
if (null != extConfiguration) {
configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
} else {
// 3.3 ConfigurationCache 代理
configurationCache = ConfigurationCache.getInstance().proxy(configuration);
}
if (null != configurationCache) {
extConfiguration = configurationCache;
}
} catch (EnhancedServiceNotFoundException ignore) {

} catch (Exception e) {
LOGGER.error("failed to load configurationCacheProvider:{}", e.getMessage(), e);
}
return null == extConfiguration ? configuration : extConfiguration;
}


// Step 4. NacosConfigurationProvider 的处理
@LoadLevel(name = "Nacos", order = 1)
public class NacosConfigurationProvider implements ConfigurationProvider {
@Override
public Configuration provide() {
return NacosConfiguration.getInstance();
}
}

知识点一 : ConfigurationProvider

3.2 NacosConfiguration 配置

我们以一个 NacosConfiguration 来看整个的处理 , 其实都是类型的 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// NacosConfiguration 成员变量
private static volatile ConfigService configService; // 注意 volatile

// NacosConfiguration 构造函数
private NacosConfiguration() {
if (configService == null) {
try {
// Step 1 : 构建 ConfigService
configService = NacosFactory.createConfigService(getConfigProperties());
// Step 2 : 初始化 Seata Config
initSeataConfig();
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
}

Step 1 : NacosFactory.createConfigService(getConfigProperties()); -> PS:32001

这里 getConfigProperties() 会获得一个 Properties 对象 ,其中为 Nacos 的配置属性 , 主要流程如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码private static Properties getConfigProperties() {
// Step 1 : 准备 Properties 对象
Properties properties = new Properties();

// Step 2 : 判断 System 是否存在 endpoint 和 serverAddr 属性
if (System.getProperty(ENDPOINT_KEY) != null) {
properties.setProperty(ENDPOINT_KEY, System.getProperty(ENDPOINT_KEY));
properties.put(ACCESS_KEY, Objects.toString(System.getProperty(ACCESS_KEY), ""));
properties.put(SECRET_KEY, Objects.toString(System.getProperty(SECRET_KEY), ""));
} else if (System.getProperty(PRO_SERVER_ADDR_KEY) != null) {
properties.setProperty(PRO_SERVER_ADDR_KEY, System.getProperty(PRO_SERVER_ADDR_KEY));
} else {
// Step 3 : 获取 Nacos Address 路径
String address = FILE_CONFIG.getConfig(getNacosAddrFileKey());
if (address != null) {
properties.setProperty(PRO_SERVER_ADDR_KEY, address);
}
}

if (System.getProperty(PRO_NAMESPACE_KEY) != null) {
properties.setProperty(PRO_NAMESPACE_KEY, System.getProperty(PRO_NAMESPACE_KEY));
} else {
// Step 4 : 设置 namespace
String namespace = FILE_CONFIG.getConfig(getNacosNameSpaceFileKey());
if (namespace == null) {
namespace = DEFAULT_NAMESPACE;
}
properties.setProperty(PRO_NAMESPACE_KEY, namespace);
}
String userName = StringUtils.isNotBlank(System.getProperty(USER_NAME))
? System.getProperty(USER_NAME)
: FILE_CONFIG.getConfig(getNacosUserName());
if (StringUtils.isNotBlank(userName)) {
String password = StringUtils.isNotBlank(System.getProperty(PASSWORD))
? System.getProperty(PASSWORD)
: FILE_CONFIG.getConfig(getNacosPassword());
if (StringUtils.isNotBlank(password)) {
properties.setProperty(USER_NAME, userName);
properties.setProperty(PASSWORD, password);
}
}

// 通常继承配置拿到的是 {"namespace":"","serverAddr":"127.0.0.1:8848"}
return properties;
}

PS: 此处并不是从 Nacos 获取配置!

Step 2 : initSeataConfig 运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码
private static void initSeataConfig() {
try {
// nacosDataId -> seataServer.properties
String nacosDataId = getNacosDataId();

// Step 1 : 此处才是从 NacosConfigService 中获取 Nacos 配置 (getNacosGroup -> SEATA_GROUP)
String config = configService.getConfig(nacosDataId, getNacosGroup(), DEFAULT_CONFIG_TIMEOUT);
if (StringUtils.isNotBlank(config)) {
try (Reader reader
= new InputStreamReader(new ByteArrayInputStream(config.getBytes()), StandardCharsets.UTF_8)) {
// 流处理 , 加载到 Properties 中
seataConfig.load(reader);
}
NacosListener nacosListener = new NacosListener(nacosDataId, null);
// 添加 NacosListener , 该 Listernr 用于动态通知
configService.addListener(nacosDataId, getNacosGroup(), nacosListener);
}
} catch (NacosException | IOException e) {
LOGGER.error("init config properties error", e);
}
}

附加一 : NacosConfigService

该类是 com.alibaba.nacos.client.config 的工具类 , 后续我们来分析配置的动态处理

3.3 ConfigurationCache

此处有点意思 , 还能这么缓存 ,通过把 get 方法代理 ,实现了配置的缓存操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码private static final ConcurrentHashMap<String, ObjectWrapper> CONFIG_CACHE = new ConcurrentHashMap<>();


public Configuration proxy(Configuration originalConfiguration) {
return (Configuration)Enhancer.create(Configuration.class,
(MethodInterceptor)(proxy, method, args, methodProxy) -> {
// 判断是 get 方法且不是 LatestConfig
if (method.getName().startsWith(METHOD_PREFIX)
&& !method.getName().equalsIgnoreCase(METHOD_LATEST_CONFIG)) {
String rawDataId = (String)args[0];
ObjectWrapper wrapper = CONFIG_CACHE.get(rawDataId);
// 获取参数名
String type = method.getName().substring(METHOD_PREFIX.length());
if (!ObjectWrapper.supportType(type)) {
type = null;
}
if (null == wrapper) {
Object result = method.invoke(originalConfiguration, args);
// ObjectWrapper -> 包装器 , 数据只在非空时存在于缓存中
if (result != null) {
wrapper = new ObjectWrapper(result, type);
CONFIG_CACHE.put(rawDataId, wrapper);
}
}
return wrapper == null ? null : wrapper.convertData(type);
}
// 没有缓存则走原方法
return method.invoke(originalConfiguration, args);
});
}

总结

这篇文章相对上篇 , 稍稍深入了一下.

附录

查找资料的时候 ,发现了一个非常好的图片 , 在完善这篇文章的时候 , 拜读了很久 , 强烈建议大家看一下原文档 , 说的比我清楚多了 @ booogu.top/2021/02/28/…

seata_config_initialization.png

PS : 脑子说我也画的出来 , 手说你会画个P

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…640641642…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%