开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Linux命令行与shell脚本编程大全pdf学习

发表于 2021-07-15

这是一本关于 Linux 命令行与 shell 脚本编程的全方位教程,主要包括四大部分 :Linux 命令行,shell
脚本编程基础,高级 shell 脚本编程,如何创建实用的 shell 脚本。本书针对 Linux 系统的最新特性进行了全面更新,不仅涵盖了详尽的动手教程和现实世界中的实用信息,还提供了与所学内容相关的参考信息和背景资料。通过本书的学习,你将轻松写出自己的 shell 脚本。

本书适合 Linux 程序设计人员阅读。

image.png

核心Linux发行版含有内核、一个或多个图形化桌面环境以及预编译好的几乎所有能见到的Linux应用,它提供了一站式的完整Linux安装

阅读之前,请务必花30秒查看前言说明(在第一、二章前面部分)

《Unix & Linux 大学教程》 - 第一、二章 学习笔记 Unix简介 & 什么是Linux?什么是Unix

《Unix & Linux 大学教程》 - 第三、四章 学习笔记 Unix连接 & 开始使用Unix
《Unix & Linux 大学教程》 - 第五、六章 学习笔记 GUI:图形用户界面 & Unix工作环境
《Unix & Linux 大学教程》 - 第七、八章 学习笔记 Unix键盘使用 & 能够立即使用的程序
《Unix & Linux 大学教程》 - 第九、十章 学习笔记 文档资料:Unix手册与Info & 命令语法
《Unix & Linux 大学教程》 - 第十一、十二章 学习笔记 shell & 使用shell:变量和选项
《Unix & Linux 大学教程》 - 第十三章 学习笔记 使用shell:命令和定制
《Unix & Linux 大学教程》 - 第十四、十五章 学习笔记 使用shell:初始化文件
《Unix & Linux 大学教程》 - 第十六、十七章 学习笔记 过滤器:简介和基本操作
《Unix & Linux 大学教程》 - 第十八章 学习笔记 过滤器:统计和格式化
《Unix & Linux 大学教程》 - 第十九章(一) 学习笔记 过滤器:选取、排序、组合及变换
《Unix & Linux 大学教程》 - 第十九章(二)、第二十章 学习笔记 过滤器:选取、排序、组合及变换 正则表达式
《Unix & Linux 大学教程》 - 第二十一章 学习笔记 显示文件
《Unix & Linux 大学教程》 - 第二十二章(一) 学习笔记 vi文本编辑器(一)
《Unix & Linux 大学教程》 - 第二十二章(二) 学习笔记 vi文本编辑器(二)
《Unix & Linux 大学教程》 - 第二十二章(三) 学习笔记 vi文本编辑器(三)
《Unix & Linux 大学教程》 - 第二十三章 学习笔记 Unix文件系统
《Unix & Linux 大学教程》 - 第二十四章 学习笔记 目录操作
《Unix & Linux 大学教程》 - 第二十五章 学习笔记 文件操作
《Unix & Linux 大学教程》 - 第二十六章(一) 学习笔记 进程和作业控制
《Unix & Linux 大学教程》 - 第二十六章(二) 学习笔记 进程和作业控制

下面是一些较流行的核心Linux发行版

image.png

专业Linux发行版

Linux LiveCD

下载地址:

【Linux命令行与shell脚本编程大全.第3版】

链接:pan.baidu.com/s/1PRhxUHdO…

提取码:wjnm

更多

关注我领取更多资源链接

最后,照旧安利一波我们的公众号:「github掘金」,目前每天都会推荐一篇优质的开源项目文章,主要分享比较实用或有趣的开发工具与开源项目。我们的目标是:挖掘开源的价值。这个公众号超级值得大家关注。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 Seata Server 端事务的 Session

发表于 2021-07-15

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

一 . 前言

上一篇对 Session 的管理进行了了解 , 这一篇对其 SQL 的处理了解看看 , 相关的概念需要看一看上一篇 : 盘点 Seata : Server 端事务的 Session 初始化


整个 Session 的处理会分别对2个操作进行处理 , 一个为 global_table , 一个为 branch_table , 依次来说 :

Pro 1 : global_table 的作用

global_table 用于持久化全局事务 , 可以通过 store.db.global.table 进行配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Pro 2 : branch_table 的作用

branch_table 用于标识分支事务 , 可以通过 store.db.branch.table 进行配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

global_table 和 branch_table 是主事务和分支事务的关系 . 当一个业务运行时 , 会在 global_table 中创建一条数据 , 而每一个数据库操作单元 (RM 事务) , 都会创建一条 branch_table 数据.

二 . session 数据走向

来看一下整个流程的数据走向 , 先看一下整体的处理逻辑

1
2
3
4
5
6
java复制代码C1- Netty Request 发起请求
C2- AbstractTCInboundHandler 处理请求 , 发起 Handler 处理
C3- GlobalSession 开启事务
C4- AbstractSessionManager 抽象级的管理 Session
C5- DataBaseTransactionStoreManager 进行实际的 Session 管理
C6- LogStoreDataBaseDAO 进行持久化处理

2.1 Session 数据的入口

在上一篇文档中 , 已经了解到通过 DataBaseTransactionStoreManager # writeSession 开启了 Session 的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 再来看一下 , 可以看到 , 每个 if 中都有一个具体的操作类型 , 主要分为 Global 和 Branch 2个部分
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
// 处理 BRANCH 的相关操作
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {

throw new StoreException("Unknown LogOperation:" + logOperation.name());
}
}

// 可以看到这里通过 if 进行了多个类型的判断 , 再来看一下整体的调用逻辑

2.2 Session 数据的管理

Session 的管理通过 AbstractSessionManager 和 具体的对应类来实现 , AbstractSessionManager 中提供了如下的接口对 Session 进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码public interface SessionManager extends SessionLifecycleListener, Disposable {

// Global 部分
void addGlobalSession(GlobalSession session) throws TransactionException;
GlobalSession findGlobalSession(String xid) ;
GlobalSession findGlobalSession(String xid, boolean withBranchSessions);
void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException;
void removeGlobalSession(GlobalSession session) throws TransactionException;

// Branch 部分
void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;
void updateBranchSessionStatus(BranchSession session, BranchStatus status) throws TransactionException;
void removeBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;

Collection<GlobalSession> allSessions();
// 定时处理
List<GlobalSession> findGlobalSessions(SessionCondition condition);
// 锁定后执行
<T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
throws TransactionException;

// 定时锁
default boolean scheduledLock(String key) {
return true;
}

default boolean unScheduledLock(String key) {
return true;
}
}


// 相对的 , Seata 为 SessionManger 提供了多个实现类
@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager implements Initialize

@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable

@LoadLevel(name = "redis", scope = Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager implements Initialize

2.3 Session 的创建

当第一次业务执行的时候 , 就会发起 addGlobalSession 的逻辑流程 , Session 的创建是通过 SessionManager 创建

  • C- AbstractTCInboundHandler # handle
  • C- DefaultCoordinator # doGlobalBegin
  • C- DefaultCore # begin
  • C- GlobalSession # begin
  • C- AbstractSessionManager # onBegin
  • C- DataBaseSessionManager # addGlobalSession
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
java复制代码// Step 临时 : onRequest   
AbstractResultMessage onRequest(AbstractMessage request, RpcContext context)

// Step 1 : 开启全局事务 -> business-service-seata-service-group
C- DefaultCoordinator
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
}

// Step 2 : DefaultCore
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 开启 GlobalSession
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

session.begin();

// 发布事务 GlobalTransactionEvent
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

return session.getXid();
}



// Step 3: 添加 GlobalSession C- DataBaseSessionManager
public void addGlobalSession(GlobalSession session) throws TransactionException {
if (StringUtils.isBlank(taskName)) {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
if (!ret) {
throw new StoreException("addGlobalSession failed.");
}
} else {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
if (!ret) {
throw new StoreException("addGlobalSession failed.");
}
}
}

// Step 4: 构建 convertGlobalTransactionDO
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
{
"applicationId": "business-seata-example",
"beginTime": 1626253527508,
"status": 1,
"timeout": 300000,
"transactionId": 4386660905323928286,
"transactionName": "dubbo-gts-seata-example",
"transactionServiceGroup": "business-service-seata-service-group",
"xid": "192.168.181.2:8091:4386660905323928286"
}


// Step 5: 插入到 SQL 中 , 其中参数如下
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(1, "192.168.181.2:8091:4386660905323928286");
ps.setLong(2, 4386660905323928286);
ps.setInt(3, 1);
ps.setString(4, "business-seata-example");
ps.setString(5, "business-service-seata-service-group");
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,
transactionNameColumnSize) : transactionName;
ps.setString(6, "dubbo-gts-seata-example");
ps.setInt(7, 300000);
ps.setLong(8, 1626253527508);
ps.setString(9, null);
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}

2.4 Session 数据的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码看完了管理 ,来看一下整个流程过程中 , 怎么实现 Session 的读取和使用流程


// Session 的读取主要还是通过 AbstractSessionManager 来管理 , Session 的读取有以下几个途径 :

// 途径一 : RetryRollback 回退
1. DefaultCoordinator # init
2. DefaultCoordinator # handleRetryRollbacking : 处理重试 Rollback 逻辑
3. DataBaseSessionManager # allSessions : 处理 SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME
4. DataBaseSessionManager # findGlobalSessions

// 同样以这个流程处理的还有以下几种 :
1. retryCommitting
2. timeoutCheck
3. asyncCommitting


// 途径二 : Session 的业务流程

在正常逻辑下的处理 , 后来来看一下 , 在整个业务逻辑中 , Session 在其中是如何作用的

2.4.1 Session 的获取

Session 中通过 DataBaseSessionManager # readSession 发起 Session 的读取 , 其中提供了三种方法 :

  • GlobalSession findGlobalSession(String xid)
  • GlobalSession findGlobalSession(String xid, boolean withBranchSessions)
  • List findGlobalSessions(SessionCondition condition) : 主要基于定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
java复制代码
// Step 1 : findSession 逻辑
public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
// 主要是对 ReadSession 的调用
return transactionStoreManager.readSession(xid, withBranchSessions);
// transactionStoreManager.readSession(condition)
}


// Step 2 : readSession 操作
public GlobalSession readSession(String xid, boolean withBranchSessions) {
// 查询 GlobalTransactionDO
GlobalTransactionDO globalTransactionDO = logStore.queryGlobalTransactionDO(xid);
if (globalTransactionDO == null) {
return null;
}
//branch transactions
List<BranchTransactionDO> branchTransactionDOs = null;
//reduce rpc with db when branchRegister and getGlobalStatus
if (withBranchSessions) {
branchTransactionDOs = logStore.queryBranchTransactionDO(globalTransactionDO.getXid());
}
return getGlobalSession(globalTransactionDO, branchTransactionDOs);
}

// Step 2-1 : 调用具体的 queryBranchTransactionDO
// 此处使用的是 LogStore 的实现类 LogStoreDataBaseDAO
public List<GlobalTransactionDO> queryGlobalTransactionDO(int[] statuses, int limit) {

conn = logStoreDataSource.getConnection();
// 自动提交事务
conn.setAutoCommit(true);

String paramsPlaceHolder = org.apache.commons.lang.StringUtils.repeat("?", ",", statuses.length);
// select xid, transaction_id, status, application_id, transaction_service_group,
// transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified
// from global_table where status in (?,?,?,?) order by gmt_modified limit ?
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByStatus(globalTable, paramsPlaceHolder);
// 发起 SQL 处理
ps = conn.prepareStatement(sql);

}

// Step 2-2 : 将 GlobalTransactionDO 转换为 GlobalSession
private GlobalSession getGlobalSession(GlobalTransactionDO globalTransactionDO,
List<BranchTransactionDO> branchTransactionDOs) {
GlobalSession globalSession = SessionConverter.convertGlobalSession(globalTransactionDO);
//branch transactions
if (CollectionUtils.isNotEmpty(branchTransactionDOs)) {
// 加入分支事务 BranchTransactionDO
for (BranchTransactionDO branchTransactionDO : branchTransactionDOs) {
globalSession.add(SessionConverter.convertBranchSession(branchTransactionDO));
}
}
return globalSession;
}

// 到了这里一个简单的 Session 的查询流程就完了 , 与之对应的还有 SessionCondition 和 通过 statuses 进行查询

public List<GlobalSession> readSession(GlobalStatus[] statuses) {
int[] states = new int[statuses.length];
for (int i = 0; i < statuses.length; i++) {
states[i] = statuses[i].getCode();
}
// 查询 status 对应的所有的GlobalTransactionDO
List<GlobalTransactionDO> globalTransactionDOs = logStore.queryGlobalTransactionDO(states, logQueryLimit);
if (CollectionUtils.isEmpty(globalTransactionDOs)) {
return null;
}
List<String> xids = globalTransactionDOs.stream().map(GlobalTransactionDO::getXid).collect(Collectors.toList());
// 获取所有的分支事务
List<BranchTransactionDO> branchTransactionDOs = logStore.queryBranchTransactionDO(xids);
Map<String, List<BranchTransactionDO>> branchTransactionDOsMap = branchTransactionDOs.stream()
.collect(Collectors.groupingBy(BranchTransactionDO::getXid, LinkedHashMap::new, Collectors.toList()));
// 返回 Gloabl 集合
return globalTransactionDOs.stream().map(globalTransactionDO ->
getGlobalSession(globalTransactionDO, branchTransactionDOsMap.get(globalTransactionDO.getXid())))
.collect(Collectors.toList());
}

public List<GlobalSession> readSession(SessionCondition sessionCondition) {
// 定时情况会提供 Xid 或者 TransactionId
if (StringUtils.isNotBlank(sessionCondition.getXid())) {
GlobalSession globalSession = readSession(sessionCondition.getXid());
if (globalSession != null) {
List<GlobalSession> globalSessions = new ArrayList<>();
globalSessions.add(globalSession);
return globalSessions;
}
} else if (sessionCondition.getTransactionId() != null) {
GlobalSession globalSession = readSession(sessionCondition.getTransactionId());
if (globalSession != null) {
List<GlobalSession> globalSessions = new ArrayList<>();
globalSessions.add(globalSession);
return globalSessions;
}
} else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) {
return readSession(sessionCondition.getStatuses());
}
return null;
}

2.4.2 Session 的添加

1
java复制代码 // 在Session 的创建后 ,后续 BranchSession 会加入 GlobalSession  -> 详见 3.1.3 注册主逻辑

2.5 Session 数据的销毁

主要流程如下 :

  • C- SessionHelper # endRollbacked
  • C- GlobalSession # end()
  • C- GlobalSession # onEnd()
  • C- DataBaseSessionManager # removeGlobalSession
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码// DataBaseSessionManager

// Step 1 : 销毁的入口 DefaultCore # doGlobalCommit
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), null, globalSession.getStatus()));

// ........ 省略 commit 提交逻辑
//If success and there is no branch, end the global transaction.
if (success && globalSession.getBranchSessions().isEmpty()) {
SessionHelper.endCommitted(globalSession);

// committed event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
}
return success;
}


// Step 2 : commit 完结
public static void endCommitted(GlobalSession globalSession) throws TransactionException {
globalSession.changeStatus(GlobalStatus.Committed);
globalSession.end();
}

// Step 3 : 生命周期关闭环节
public void end() throws TransactionException {
// Clean locks first
clean();

for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEnd(this);
}
}


// Step 4 : 移除全局事务
public void removeGlobalSession(GlobalSession session) throws TransactionException {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session);
if (!ret) {
throw new StoreException("removeGlobalSession failed.");
}
}

三 . branch_table 流程

3.1 branch_table 的创建流程

3.1.1 注册入口

这里主要用到的是 AbstractTCInboundHandler , 这个类很重要 , Branch 逻辑均会由该类进行处理 , 该类存在一个实现类 : DefaultCoordinator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码C- AbstractTCInboundHandler # handle

public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
BranchRegisterResponse response = new BranchRegisterResponse();
// 注意 , 这里创建了一个 AbstractCallback
exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
@Override
public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
throws TransactionException {
try {
// 进行 Branch 注册
doBranchRegister(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore, String
.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
}
}
}, request, response);
return response;
}

3.1.2 doBranchRegister 注册主流程

每一个事务处理单元都会注册一个 Branch ,

1
2
3
4
5
6
7
8
java复制代码C- DefaultCoordinator # doBranchRegister
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
RpcContext rpcContext) throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setBranchId(
core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
request.getXid(), request.getApplicationData(), request.getLockKey()));
}

3.1.3 注册主逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 构建 Branch 业务
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
branchSessionLock(globalSession, branchSession);
try {
// 添加 Branch 业务
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
throw new BranchTransactionException(........);
}
return branchSession.getBranchId();
});
}

3.1.4 添加 Branch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public void addBranch(BranchSession branchSession) throws TransactionException {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onAddBranch(this, branchSession);
}
branchSession.setStatus(BranchStatus.Registered);
add(branchSession);
}


public void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {
if (StringUtils.isNotBlank(taskName)) {
return;
}
// 又到了核心节点 , 调用 writeSession
boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_ADD, session);
if (!ret) {
throw new StoreException("addBranchSession failed.");
}
}

// 这里省略 writeSession 逻辑 , 反正就那些

来看一下保存的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码C- LogStoreDataBaseDAO 
public boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertBranchTransactionSQL(branchTable);
Connection conn = null;
PreparedStatement ps = null;
try {
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(1, "192.168.181.2:8091:7214921829625028621");
ps.setLong(2, "7214921829625028621");
ps.setLong(3, "7214921829625028637");
ps.setString(4, null);
ps.setString(5, "jdbc:mysql://127.0.0.1:3306/seata");
ps.setString(6, "AT");
ps.setInt(7, 0);
ps.setString(8, "account-seata-example:192.168.181.2:49676");
ps.setString(9, null);
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}

五 . 其他环节补充

5.1 BranchSession 的删除

BranchSession 在 asyncCommit 逻辑中会对BranchSession 进行删除处理 , 主要流程如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
java复制代码// Step 1 : asyncCommit 提交 globalCommit
protected void handleAsyncCommitting() {
SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
core.doGlobalCommit(asyncCommittingSession, true);
});
}


// Step 2 : 提交 Global Commit 事务
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), null, globalSession.getStatus()));

if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
// 省略....
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

switch (branchStatus) {
case PhaseTwo_Committed:
globalSession.removeBranch(branchSession);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
if (globalSession.canBeCommittedAsync()) {

return CONTINUE;
} else {
SessionHelper.endCommitFailed(globalSession);
return false;
}
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {

return CONTINUE;
} else {
return false;
}
}
} catch (Exception ex) {

}
return CONTINUE;
});

}
//If success and there is no branch, end the global transaction.
if (success && globalSession.getBranchSessions().isEmpty()) {
SessionHelper.endCommitted(globalSession);

// committed event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));

}
return success;
}


// Step 3 : 移除 Session
public void removeBranch(BranchSession branchSession) throws TransactionException {
// do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),
// because it's already unlocked in 'DefaultCore.commit()'
if (status != Committing && status != CommitRetrying && status != AsyncCommitting) {
if (!branchSession.unlock()) {
throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId());
}
}
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onRemoveBranch(this, branchSession);
}
remove(branchSession);
}


// Step 4 : DataBaseTransactionStoreManager # writeSession 处理 Branch 删除

5.2 对比 Rollback 回退过程

和 Global 一样 , 异步的方式是不一样的

1
java复制代码TODO : 后续完善

5.3 DefaultCoordinator 类简述

这里来看一下 DefaultCoordinator 类有什么 ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码

// 内部对象
C- DefaultCoordinator
F- int TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS = 5000; // 定时任务关闭最大等待时间
F- long COMMITTING_RETRY_PERIOD // 提交重试时间 , 默认 1000L
F- long ASYNC_COMMITTING_RETRY_PERIOD // 异步提交重试时间
F- long ROLLBACKING_RETRY_PERIOD //回退重试时间
F- long TIMEOUT_RETRY_PERIOD // 超时重试
F- long UNDO_LOG_DELETE_PERIOD // UNDO_LOG 删除周期
F- long UNDO_LOG_DELAY_DELETE_PERIOD = 3 * 60 * 1000 // Undo_log延迟删除周期
F- int ALWAYS_RETRY_BOUNDARY = 0 // 总是重试
F- Duration MAX_COMMIT_RETRY_TIMEOUT // 最大提交重试超时时间
F- Duration MAX_ROLLBACK_RETRY_TIMEOUT // 最大回滚重试超时时间
F- boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE // 开启超时解锁
F- ScheduledThreadPoolExecutor retryRollbacking
F- ScheduledThreadPoolExecutor retryCommitting
F- ScheduledThreadPoolExecutor asyncCommitting
F- ScheduledThreadPoolExecutor timeoutCheck
F- ScheduledThreadPoolExecutor undoLogDelete
F- RemotingServer remotingServer
F- DefaultCore core
F-EventBus eventBus

5.4 GlobalSession 是什么

当创建全局事务后 , 会创建 全局 Session , 实现SessionLifecycle接口,提供begin,changeStatus,changeBranchStatus,addBranch,removeBranch等操作session和branchSession的方法

  • GlobalSessionLock : 该对象内部持有ReentrantLock对象,利用ReentrantLock的lock和unlock机制
  • BranchSession : 分支session,管理分支数据,受globalSession统一调度管理,它的lock和unlock方法由lockManger实现
  • DefaultLockManager : DefaultLockManager是LockManager的默认实现,它获取branchSession的lockKey

说一说 lifecycleListeners 的作用
private Set lifecycleListeners = new HashSet<>()

在不同的方法中 , 会分别循环 Listeners 调用对应的逻辑

  • begin() -> lifecycleListener.onBegin(this)
  • changeStatus(GlobalStatus status) -> lifecycleListener.onStatusChange(this, status)
  • changeBranchStatus(BranchSession branchSession, BranchStatus status) -> lifecycleListener.onBranchStatusChange(this, branchSession, status)
  • close() -> lifecycleListener.onClose(this)
  • end() -> lifecycleListener.onEnd(this)

SessionLifecycleListener 模块

Seata-SessionLifecycleListener.png

可以看到 , 主要有3种实现 , DataBaseSessionMananger , FileSessionManager , RedisSessionManager

他们分别会对应各自的 Store 管理器 :

FileSessionManager -> FileTransactionStoreManager

RedisSessionManager -> RedisTransactionStoreManager

DataBaseSessionManager -> DataBaseTransactionStoreManager

5.5 主逻辑流程图

分布式事务-gloablSession.jpg

总结

文章内容比较简单 , 主要是为了完善整个 Seata 的版图 , 算是梳理个七七八八了

后续再把 undo-log 梳理完 , 整个流程就差不多可以结束了

等 Seata 完成了单纯得源码分析也可以告一段落了 , 后续慢慢的要开启应用篇 , 拭目以待

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

摸鱼神器——Reader

发表于 2021-07-15

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」

前言

摸鱼,又称科学上班法。

最近项目已上线,可以偷一下闲。今天给大家安利一个上班摸鱼必备神器,有了它,摸鱼于无形之中!

Reader介绍

Reader阅读器,是一款非常小巧的电脑本地阅读器。它能和我们的编辑器窗口融合到什么程度呢?下面就开始给大家一一介绍,目前只支持Windows。

1. 这看起来是个工作窗口

图片.png

2. 如果你再仔细看

图片.png

3. 放大了再仔细看,是不是会发现有什么不一样了

图片.png

4. 我把设置标题栏打开

图片.png
是不是虎躯一震,有被震精到吧?要是你领导不伏头盯着看,任他在你后面走过来走过去,也只会看到你“盯着”屏幕,努力工作的样子!

Reader功能介绍

目前只支持txt和epub两种格式的文件。

图片.png
图片.png

图片.png

图片.png

快捷键
  1. F12: 隐藏/显示边框
  2. F11: 全屏/退出全屏
  3. Esc: 退出全屏
  4. Ctrl + O:打开新文件
  5. Ctrl + F: 全文关键字查找,并跳转到指定位置
  6. Ctrl + T:置顶/取消置顶
  7. Ctrl + G: 全文进度百分比跳转
  8. Ctrl + →:快速跳转到下一章
  9. Ctrl + ←:快速跳转到上一章
  10. →: 下一页
  11. ←: 上一页
  12. ↑: 向上滚动 N 行 (N: 菜单【设置】 > 【基本设置】 > 【文本滚动速度】,默认为1)
  13. ↓: 向下滚动 N 行 (N: 菜单【设置】 > 【基本设置】 > 【文本滚动速度】,默认为1)
  14. Ctrl + 鼠标滚轮:调节窗口透明度
    向上滚动:透明度变低
    向下滚动:透明度变高
  15. Ctrl + Shift + 鼠标滚轮:调节窗口透明度
    向上滚动:透明度直接最低(alpha = 0xff)
    向下滚动:透明度直接最高(alpha = 0x01),基本全透明
    (!!无边框时对窗口做了特殊处理:1. 支持看起全透明(alpha=1) 2. 支持背景透明,字体不透明)
    (!!有边框时暂不支持上面功能。)
  16. 空格键: 开始/停止自动翻页 (菜单【设置】 > 【基本设置】 > 【自动翻页时间间隔】,默认为3000ms)
    点击菜单选项时,会自动暂停“自动翻页”,菜单完成后会自动恢复“自动翻页”
    书本翻到最后一页会自动停止“自动翻页”
    自动翻页可以设置【翻页】与【滚动】两种模式
  17. 鼠标左右键同时按下:快速隐藏窗口,效果与热键 Alt + H 相同
    注意:隐藏后,需要使用热键 Alt + H 才能再次显示
    可以在菜单【设置】 > 【基本设置】 > 【左右键同时按下隐藏窗口】开启/关闭此功能
  18. Ctrl + M: 添加书签。(每本书最大支持256个书签)
    菜单栏 > 【书签】可以显示书签列表
    在书签列表项上面鼠标右击,可以删除书签
  19. Ctrl + E: 进入编辑模式,可以用于文本复制,和简单编辑操作。

图片.png

1
markdown复制代码       最后放一张自己摸鱼图。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

微服务部署:蓝绿部署、滚动部署、灰度发布、金丝雀发布 一、蓝

发表于 2021-07-15

​
在项目迭代的过程中,不可避免需要”上线“。上线对应着部署,或者重新部署;部署对应着修改;修改则意味着风险。

目前有很多用于部署的技术,有的简单,有的复杂;有的得停机,有的不需要停机即可完成部署。本文的目的就是将目前常用的布署方案做一个总结。

一、蓝绿布署

Blue/Green Deployment(蓝绿部署)

1、定义

蓝绿部署是不停老版本,部署新版本然后进行测试,确认OK,将流量切到新版本,然后老版本同时也升级到新版本。

1、特点

蓝绿部署无需停机,并且风险较小。

2、布署过程

第一步、部署版本1的应用(一开始的状态)

所有外部请求的流量都打到这个版本上。

image.png
image.png

第二步、部署版本2的应用

版本2的代码与版本1不同(新功能、Bug修复等)。

第三步、将流量从版本1切换到版本2。

image.png

第四步、如版本2测试正常,就删除版本1正在使用的资源(例如实例),从此正式用版本2。

3、小结

从过程不难发现,在部署的过程中,我们的应用始终在线。并且,新版本上线的过程中,并没有修改老版本的任何内容,在部署期间,老版本的状态不受影响。这样风险很小,并且,只要老版本的资源不被删除,理论上,我们可以在任何时间回滚到老版本。

4、蓝绿发布的注意事项

当你切换到蓝色环境时,需要妥当处理未完成的业务和新的业务。如果你的数据库后端无法处理,会是一个比较麻烦的问题;

可能会出现需要同时处理“微服务架构应用”和“传统架构应用”的情况,如果在蓝绿部署中协调不好这两者,还是有可能会导致服务停止。
需要提前考虑数据库与应用部署同步迁移 /回滚的问题。
蓝绿部署需要有基础设施支持。
在非隔离基础架构( VM 、 Docker 等)上执行蓝绿部署,蓝色环境和绿色环境有被摧毁的风险。

二、Rolling update(滚动发布)

1、滚动发布定义

滚动发布:一般是取出一个或者多个服务器停止服务,执行更新,并重新将其投入使用。周而复始,直到集群中所有的实例都更新成新版本。

2、特点

这种部署方式相对于蓝绿部署,更加节约资源——它不需要运行两个集群、两倍的实例数。我们可以部分部署,例如每次只取出集群的20%进行升级。

这种方式也有很多缺点,例如:

(1) 没有一个确定OK的环境。使用蓝绿部署,我们能够清晰地知道老版本是OK的,而使用滚动发布,我们无法确定。

(2) 修改了现有的环境。

(3) 如果需要回滚,很困难。举个例子,在某一次发布中,我们需要更新100个实例,每次更新10个实例,每次部署需要5分钟。当滚动发布到第80个实例时,发现了问题,需要回滚,这个回滚却是一个痛苦,并且漫长的过程。

(4) 有的时候,我们还可能对系统进行动态伸缩,如果部署期间,系统自动扩容/缩容了,我们还需判断到底哪个节点使用的是哪个代码。尽管有一些自动化的运维工具,但是依然令人心惊胆战。

(5) 因为是逐步更新,那么我们在上线代码的时候,就会短暂出现新老版本不一致的情况,如果对上线要求较高的场景,那么就需要考虑如何做好兼容的问题。

三、灰度发布/金丝雀部署

1、定义

灰度发布是指在黑与白之间,能够平滑过渡的一种发布方式。AB test就是一种灰度发布方式,让一部分用户继续用A,一部分用户开始用B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度,而我们平常所说的金丝雀部署也就是灰度发布的一种方式。

注释:矿井中的金丝雀
17世纪,英国矿井工人发现,金丝雀对瓦斯这种气体十分敏感。空气中哪怕有极其微量的瓦斯,金丝雀也会停止歌唱;而当瓦斯含量超过一定限度时,虽然鲁钝的人类毫无察觉,金丝雀却早已毒发身亡。当时在采矿设备相对简陋的条件下,工人们每次下井都会带上一只金丝雀作为“瓦斯检测指标”,以便在危险状况下紧急撤离。

灰度发布结构图如下:

image.png

2、灰度发布/金丝雀发布由以下几个步骤组成:

准备好部署各个阶段的工件,包括:构建工件,测试脚本,配置文件和部署清单文件。
从负载均衡列表中移除掉“金丝雀”服务器。
升级“金丝雀”应用(排掉原有流量并进行部署)。
对应用进行自动化测试。
将“金丝雀”服务器重新添加到负载均衡列表中(连通性和健康检查)。
如果“金丝雀”在线使用测试成功,升级剩余的其他服务器。(否则就回滚)
除此之外灰度发布还可以设置路由权重,动态调整不同的权重来进行新老版本的验证。

​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

三分钟了解Nacos:架构及部署 一、什么是Nacos? 二

发表于 2021-07-14

一、什么是Nacos?

官方描述:一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。

Nacos致力于帮助您发现、配置和管理微服务。Nacos提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。

Nacos帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。

愿景:Nacos通过提供简单易用的动态服务发现、服务配置、服务共享与管理等服务基础设施,帮助用户在云原生时代,在私有云、混合云或者公有云等所有云环境中,更好的构建、交付、管理自己的微服务平台,更快的复用和组合业务服务,更快的交付商业创新的价值,从而为用户赢得市场。

二、功能有哪些?

动态配置服务

动态配置服务让您能够以中心化、外部化和动态化的方式管理所有环境的配置。动态配置消除了配置变更时重新部署应用和服务的需要。配置中心化管理让实现无状态服务更简单,也让按需弹性扩展服务更容易。

服务发现及管理

动态服务发现对以服务为中心的(例如微服务和云原生)应用架构方式非常关键。Nacos支持DNS-Based和RPC-Based(Dubbo、gRPC)模式的服务发现。Nacos也提供实时健康检查,以防止将请求发往不健康的主机或服务实例。借助Nacos,您可以更容易地为您的服务实现断路器。

动态DNS服务

通过支持权重路由,动态DNS服务能让您轻松实现中间层负载均衡、更灵活的路由策略、流量控制以及简单数据中心内网的简单DNS解析服务。动态DNS服务还能让您更容易地实现以DNS协议为基础的服务发现,以消除耦合到厂商私有服务发现API上的风险。

三、特性有哪些?

易于使用

动态配置管理、服务发现和动态的一站式解决方案20多种开箱即用的以服务为中心的架构特性基本符合生产要求的轻量级

更适应云架构

无缝支持Kubernetes和Spring Cloud在主流公共云上更容易部署和运行(例如阿里云和AWS)多租户和多环境支持

生产等级

脱胎于历经阿里巴巴10年生产验证的内部产品支持具有数百万服务的大规模场景具备企业级SLA的开源产品

丰富的应用场景

支持限流、大促销预案和异地多活直接支持或稍作扩展即可支持大量有用的互联网应用场景流量调度和服务治理

四、Nacos地图

image.png

特性大图:要从功能特性,非功能特性,全面介绍我们要解的问题域的特性诉求

架构大图:通过清晰架构,让您快速进入Nacos世界

业务大图:利用当前特性可以支持的业务场景,及其最佳实践

生态大图:系统梳理Nacos和主流技术生态的关系

优势大图:展示Nacos核心竞争力

战略大图:要从战略到战术层面讲Nacos的宏观优势

五、Nacos生态图

image.png

如Nacos全景图所示,Nacos无缝支持一些主流的开源生态,例如

Spring Cloud

Apache Dubbo and Dubbo Mesh

Kubernetes and CNCF。

使用Nacos简化服务发现、配置管理、服务治理及管理的解决方案,让微服务的发现、管理、共享、组合更加容易。

关于如何在这些生态中使用 Nacos,请参考官方文档:什么是Nacos

六、Nacos概念

可以通过如下官方文档的描述,先行了解Nacos的概念:

nacos.io/zh-cn/docs/…

PS: 以上概念非常重要,需要认真看一下,否则会对 Nacos 理解产生很多阻碍

下面我抽出几个单独讲一下:

命名空间:常用于生产环境隔离,比如dev/test/beta/prod等

配置:应用的配置文件,常用 yml/json 格式存储,一个应用服务可以配置多个配置文件,需要设置扩展,配置ID唯一,常用表现形式=应用名/应用名+Profile(举个栗子:mall-test.yml)

配置管理:可以对配置文件进行管理,配置的编辑、存储、分发、变更管理、历史版本管理、变更审计等所有与配置相关的活动

配置项:key/value形式的配置内容,应用application.yml/bootstrap.yml等文件的内容项

服务:服务名称=应用名称,服务分组=一组服务的聚合,服务集群=同分组同应用名多个实例服务聚合,服务权重=加权,权重越大,流量越大

元信息:Nacos数据(如配置和服务)描述信息,如服务版本、权重、容灾策略、负载均衡策略、鉴权配置、各种自定义标签 (label),从作用范围来看,分为服务级别的元信息、集群的元信息及实例的元信息。

七、Nacos架构

nacos.io/zh-cn/docs/…

以上为官方对于Nacos架构的解释,同学们可以认真看一下,下面我简单讲一下常用到的一些组件和配置。

数据存储

Nacos的数据存储可以分为两种,一种是内存,一种是DB,常用的都是存储在MySQL中,将数据持久化,避免数据丢失;官方也提供了默认的初始化SQL,需要在 Github 上面去下载,地址:初始化SQL。

控制台

Nacos也提供了一个便于用户使用的后台页面,通过链接 http://localhost:8848/naocs 就能访问,密码也是 nacos/nacos,控制台提供包括配置版本跟踪、金丝雀发布、一键回滚配置以及客户端配置更新状态跟踪在内的一系列开箱即用的配置管理特性,帮助您更安全地在生产环境中管理配置变更和降低配置变更带来的风险。控制台样例

服务管理

服务管理包括了对服务的CRUD、负载转发策略、健康检查等,包含了一整套服务注册、发现的管理。

服务配置

服务配置包括了对配置的CRUD、版本控制、监听管理、推送轨迹、数据聚合等功能。

用户、角色、权限

和普通的系统一样,Nacos 提供了一套用户-角色-权限的基础系统,解决了注册-登陆-权限控制的问题。

两种启动模式

Nacos支持将注册中心(Service Registry)与配置中心(Config Center) 在一个进程合并部署或者将2者分离部署的两种模式。

八、Nacos部署

nacos.io/zh-cn/docs/…

本地模式

基础的配置,可以根据以上链接中官方文档描述来实现,这里将详细解释几个注意事项。

版本:当前最新稳定版v1.4.2,尽量用最新版本,官方会不断修复问题,新版的支持会更好,尽量暂时不用v2.0.1版本,这个版本还有待验证,尽量避免踩坑,稳定以后再尝试

环境:同学一定要注意官方对于环境的要求,64位jdk1.8+、64位linux/windows、maven3.2+

安装包:如果没有特殊情况,尽量使用官方编译后的压缩包,避免下载官方Git后自己打包出现的问题,需要二次开发或者自定义配置的除外

配置:启动配置,官方默认启动使用了内嵌数据库的形式,这里作者推荐使用MySQL外部数据库方式,如果使用MySQL方式,那么需要修改 conf/application.properties 配置文件,如下:

If use MySQL as datasource:

spring.datasource.platform=mysql

Count of DB:

db.num=1

### Connect URL of DB: db.url.0=jdbc:mysql://localhost:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai db.user.0=xxx db.password.0=xxx

这时候,如果不知道 Nacos MySQL 初始化脚本的同学,可以通过 初始化SQL 下载脚本,或者通过 conf/nacos-mysql.sql 配置脚本。

如果机器的内存不够充足的,启动前一定要修改 bin/startup.sh 的启动配置,如下:

`#===========================================================================================

JVM Configuration

#===========================================================================================
if [[ “MODE”==”standalone”]];thenJAVAOPT=”{MODE}” == “standalone” ]]; then
JAVA_OPT=”MODE”==”standalone”]];thenJAVAO​PT=”{JAVA_OPT} -Xms512m -Xmx512m -Xmn256m”
JAVA_OPT=”JAVAOPT−Dnacos.standalone=true”elseif[[“{JAVA_OPT} -Dnacos.standalone=true”
else
if [[ “JAVAO​PT−Dnacos.standalone=true”elseif[[“{EMBEDDED_STORAGE}” == “embedded” ]]; then
JAVA_OPT=”JAVAOPT−DembeddedStorage=true”fiJAVAOPT=”{JAVA_OPT} -DembeddedStorage=true”
fi
JAVA_OPT=”JAVAO​PT−DembeddedStorage=true”fiJAVAO​PT=”{JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”
JAVA_OPT=”JAVAOPT−XX:−OmitStackTraceInFastThrow−XX:+HeapDumpOnOutOfMemoryError−XX:HeapDumpPath={JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=JAVAO​PT−XX:−OmitStackTraceInFastThrow−XX:+HeapDumpOnOutOfMemoryError−XX:HeapDumpPath={BASE_DIR}/logs/java_heapdump.hprof”
JAVA_OPT=”${JAVA_OPT} -XX:-UseLargePages”
fi`

PS:启动命令(-m standalone 代表着单机模式运行,如果不加这个参数则代表使用集群模式运行)

Docker模式

Docker和普通模式差不多,也是需要设置配置文件和启动参数,具体的配置和启动方式,我放在了Github的实例项目中,有需求的同学可以看一下,如下:

github.com/YClimb/dock…
默认使用 standalone-mysql-5.7.yaml 配置来启动Nacos,详细描述见以上链接项目。

管理后台

服务启动以后,如果是本地或者外网服务器,就可以通过http://IP:8848/nacos访问控制台,账号密码:nacos/nacos 。

九、权限及运维

通过以上实例,一个基础的Nacos配置就完成了,如果我们还需要Nacos的权限配置和运维相关操作,那么可以继续往下看;

权限配置

首先,需要查看Nacos的系统参数介绍:Nacos系统参数介绍,看完以后,了解conf/application.properties参数含义;

然后,可以开始配置权限:鉴权,通过官方文档的描述,可以修改配置完成鉴权的开启,需要注意的是,对于不同版本来说,开启服务身份识别功能不太一样,这里需要同学认真查看,尽量用最新稳定版本,否则版本问题的坑可能在不经意间就会遇到。

运维问题

上面所有讲到的部署都是单机版本,如果有同学需要集群版本,可以查看以下官方文档:

nacos.io/zh-cn/docs/…
对于集群版本来说,默认是通过 [ip/域名 -> nginx -> 实例] 的方式实现的,这里有几个问题需要注意:

环境:64位Linux、64位JDK1.8+、Maven3.2+、3个节点及以上(重要,否则无法选举成功)

安装包:推荐使用安装包模式,tar.gz,这里需要注意,集群版本和单机版本不一样,官方推荐的集群版本是1.3.0,一定要看好,否则会报错

集群配置:配置文件conf/cluster.conf,配置3+节点

数据源:使用MySQL数据库

启动:sh startup.sh启动即可,默认使用外置数据源

域名:通过域名来使用时,不可设置压缩gzip模式,否则应用服务获取配置时会有乱码问题;

总结

我这边整理了一份Spring相关资料文档、Spring系列全家桶、Java的系统化资料:(包括Java核心知识点、面试专题和21年最新的互联网真题、电子书等)有需要的朋友可以关注公众号【程序媛小琬】即可获取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

B 站崩了,受害程序员聊聊

发表于 2021-07-14

非吃瓜,B 站事件始末分析 + 防治技术分享

大家好,我是鱼皮,昨天小破站崩了的事情相信很多朋友都听说了。

这要是搁以前,不爱吃瓜的我根本不会去关注这种事,崩了就崩了呗,反正天塌下来有程序员大佬们扛着,很快就会好的。

但这次不太一样,因为我自己也成为了本事件的 “受害者” !

所以今天以一名程序员的视角,带大家回顾 B 站崩了事件的始末、理性推测原因、并分享一些防治技术和收获感悟。

事件始末

B 站刚刚崩,但还没有完全崩的时候,我正在直播间写小代码、和小伙伴们友好交流。由于我在写代码的时候不会经常看弹幕,没有注意到弹幕不动了,没有任何小伙伴发弹幕。

起初我以为只是自己写代码太无聊了,没人搭理我。然后我就搁哪儿喃喃自语:奇怪了,怎么没有小伙伴发弹幕?喂,有人么?Hello?Hi?歪比八步?

后来我才发现,弹幕区连进房提示都没了,总不可能几分钟没人进来吧?肯定是出事了!

我以为是弹幕卡了,于是就关闭弹幕再打开,结果还是一样。然后我就想着重启下直播,结果关掉之后再也打不开了,屏幕上直接提示:似乎已断开和服务器的链接。

实话说,在此之前,我根本没想到 B 站这种亿级流量的平台会崩掉。所以第一反应和大家一样,都怀疑是自己网络的问题,结果发现网页能打开,换了网也连不上。于是我突然细思极恐:握草?B 站竟然也把我封了?(老通缉犯了)

就是这样,我是事故现场的受害人,是倒在地上懵逼的那个,所以直到事故发生十几分钟后,我才通过其他途径了解到,哦,原来是 B 站出事了。

虽然错过了第一现场,但通过热搜,也能了解到 B 站崩盘的大致过程,简单地说,就是在 几个小时 内,用户无法正常访问 B 站的任何功能!

打开 B 站,先是 404 Not Found 找不到资源:

然后是 502 错误网关:

1 个小时后,一些小伙伴表示 B 站的部分功能已经可以使用了,但还是没有完全恢复,直到 14 日凌晨,B 站官方才终于回应,恢复正常了。

原因猜测

昨晚剪视频到凌晨 2 点多,本来想直接睡觉,但手贱又打开了知乎,发现 “B 站崩了” 是 Top 1 热门的问题,出于好奇就点进去想了解下事故背后的真正原因,看看大家的高见。

本来我一个非 B 站工作的外来人,对它的技术架构没有深入了解;再加上缺少关键信息、没有可靠的推测凭据,所以不准备发表意见的。结果发现前排没有几个程序员在从技术的角度推测事故原因,都是一些帮大家吃瓜更香的小回答。那我不妨根据过往学到的架构知识,做一波推测,万一推中了感觉也挺惊喜的。

其实在 20 年的时候,B 站技术总监毛剑老师就在腾讯云 + 社区分享过《B 站高可用架构实践》讲座,当时我全程看完了,但没想到,有一天,高可用的 B 站不可用了。

所以在这次分析前,我先把《B 站高可用架构实践》文章又读了一遍,有趣的是,短短半天,这篇文章的阅读量涨了 15 万!

而且更有趣的是,文章底下多了不少 “嘲讽”,什么 “八股文架构师” 之类的:

讲座评论区

不过我觉得没必要,因为毛剑老师分享的技术确实是很实用的高可用解决方案,只不过还是缺少了一些印证吧。

文章地址:cloud.tencent.com/developer/a…

下面说下我的猜测。

猜测 1:网关挂了

首先,这次小破站事故发生时,其他站点竟然也崩了!比如 A 站、晋江、豆瓣,统统都上了热搜。

这些事故同时发生,说明是这些系统依赖的公共服务出了问题,而唯一有能力导致大规模服务瘫痪的就是 CDN 了。

CDN 是内容分发网络,提前将源站内容发到各个地区的服务器节点,之后就可以让不同地区的用户就近获取内容,而不是都到源站获取,从而起到内容加速、负载均衡的作用。

用户就近访问内容

一旦 CDN 挂了,该地区用户的流量会全部打到网关上:

CDN 挂了

网关就像是家族老大,用户有需求就跟老大说,然后老大再分配需求给弟弟们去完成。

此外,网关通常还承担起了保护服务弟弟们的使命,统一负载均衡、控制流量、熔断降级等。

按道理来讲,通常网关不仅要保护下游的服务,自身也是需要安全保护的。但为什么网关没有保护好自己呢?

我的猜测是:网关还没有来的及开启保护措施(自身的熔断降级等),就被流量瞬狙了。

网关一挂,服务没爹,服务缺少了调用入口,自然就不可用了,未必所有网关后的服务都处于瘫痪状态。

猜测 2:服务雪崩

还有一种猜测是 B 站系统存在很多服务的 调用链 。由于 CDN 或者部分机器挂掉,导致某个下游服务 A 的执行耗时增加,从而导致上游调用服务 A 的服务 B 执行耗时也增加,让系统单位时间的处理能力变差。再加上上游不断积压请求,最终导致整个调用链雪崩,所有链上服务从儿子到爸爸全部灭门。

服务调用链

举个通俗的例子就是家里的马桶堵了,桶里的还没充下去,上面却还在不断 “送货”,最终下场就是你不能再 “送货” 了,马桶爆了!

官方解释

在官方解释是服务器机房发生故障之后,又看了其他老师的分析,感觉官方的解释还说的过去。

的确之前 B 站在对外分享高可用架构时几乎没有提到 灾备 和 多活 方面的设计,更多的是在本地服务层和应用层去处理,比如限流、降级、熔断、重试、超时处理等,所以在设计大规模分布式系统时还是要考虑更全面一些,引以为戒~

直到发文前,知乎 Top 1 的回答者又很用心地整理了线索:

为什么其他两家很快就恢复了,B 站却花了几个小时才恢复正常呢?

感觉多少和 B 站自研组件有关系,一方面受到云服务商的影响,导致下游的服务连锁挂掉了,故障面积大 ;另一方面重启也需要时间,而且重启过程中,上游的负载均衡也未必能承受住流量高峰,所以想要恢复到正常水平,至少要等待很多容器副本完全重启。

另外昨天 23 点半左右,我打开 B 站时,看到的内容是几个小时前的老数据,说明这个时候 B 站已经重启了部分服务副本,并且开启了降级措施,并没有查询真实数据。

没想到自己的这个回答还在知乎小火了一把,第一次成为了 千万浏览量 问题的 Top 2,受宠若惊,受宠若惊。。。

保命:以上本身就是我的猜测哈哈,专业度有限,欢迎大家评论区讨论,轻喷轻喷。

防治技术

再简单聊一下服务故障的防治技术,就是如何保证服务的高可用性,尽量持续为用户提供服务而不宕机。

我将了解到的技术简单分类,整理成了一张思维导图:

故障防治思维导图

暂时想到这么多,当然还有其他的技术。

时间有限,就先不对这些技术展开去讲了。关于如何减少系统出现的 Bug、保证服务高可用,欢迎大家阅读我的历史文章:揭秘软件开发的达摩克利斯之剑,以上很多技术也都有讲解。

收获感悟

关于这次事故,我作为受害者之一,也有一些收获和感悟,而不是吃瓜吃了个寂寞。

首先是要有 质疑精神 ,我们在写程序出现问题时,习惯性地先从自己身上找原因没有任何问题,但自己排查没有发现 Bug 后,应该大胆推测是我们用到的类库、组件、或者依赖服务、甚至有可能是编辑器出了问题,而不是认为知名的东西一定正确。像小破站出了问题后,我竟然怀疑是自己的直播被封了哈哈,差点想找到管理去跪了。

在编程方面,我们不能只去背知识、听别人讲,做 八股文架构师;而是要做实践经验丰富的工程师,不盲目相信、不想当然,而是在实践中积累经验、结合实际去优化系统。

通过这次结合实际故障过程的分析,我也复习了一遍之前学到的架构知识,对一些高可用的设计有了更深的理解。有朝一日,尽量不让 编程导航(www.code-nav.cn) 成为下一个 B 站(狗头)。

还有就是上面提到的,要时刻居安思危,养成防御性编程的好习惯,而不是出了问题再去补救。像 B 站这种知名平台,出一点小问题,对用户、对企业带来的损失都是难以估量的。

感谢 B 站爸爸送来的一天大会员补偿 ❤️


最后再送大家一些 帮助我拿到大厂 offer 的学习资料:

跑了,留下 6T 的资源!

我是如何从零开始通过自学,拿到腾讯、字节等大厂 offer 的,可以看这篇文章,不再迷茫!

我学计算机的四年,共勉!

我是鱼皮,点赞 还是要求一下的,祝大家都能心想事成、发大财、行大运。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

OncePerRequestFilter和HttpServl

发表于 2021-07-14

场景

  • 修改请求中的参数,某字段加密(接口需要解密后的数据),此处统一解密
  • 请求参数和响应结果,记录日志/存数据库
  • 在请求参数中添加 额外字段(标识某些系统等)

OncePerRequestFilter

GET方式

getParameterMap(),只能够获取到GET请求的参数

1
2
3
4
5
6
7
8
9
java复制代码//请求参数
parameterMap = httpRequest.getParameterMap();


//下为通用
//请求方式
String requestMethod = httpRequest.getMethod();
String remoteAddr = httpRequest.getRemoteAddr();
int remotePort = httpRequest.getRemotePort();

注意:异常 No modifications are allowed to a locked ParameterMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
java复制代码//类 org.apache.catalina.util.ParameterMap
@Override
public V put(K key, V value) {
checkLocked();
return delegatedMap.put(key, value);
}
private void checkLocked() {
if (locked) {
throw new IllegalStateException(sm.getString("parameterMap.locked"));
}
}

//类 org.apache.tomcat.util.res.StringManager
public String getString(String key) {
if (key == null){
String msg = "key may not have a null value";
throw new IllegalArgumentException(msg);
}

String str = null;

try {
// Avoid NPE if bundle is null and treat it like an MRE
if (bundle != null) {
str = bundle.getString(key);
}
} catch (MissingResourceException mre) {
//bad: shouldn't mask an exception the following way:
// str = "[cannot find message associated with key '" + key +
// "' due to " + mre + "]";
// because it hides the fact that the String was missing
// from the calling code.
//good: could just throw the exception (or wrap it in another)
// but that would probably cause much havoc on existing
// code.
//better: consistent with container pattern to
// simply return null. Calling code can then do
// a null check.
str = null;
}

return str;
}

/*错误码对应文件 tomcat下 org.apache.catalina.util.LocalStrings.properties
PropertyResourceBundle bundle会在服务启动加载到内存
*/
parameterMap.locked=No modifications are allowed to a locked ParameterMap

//类 java.util.PropertyResourceBundle
private final Map<String,Object> lookup;
public PropertyResourceBundle (Reader reader) throws IOException {
Properties properties = new Properties();
properties.load(reader);
lookup = new HashMap(properties);
}
public Object handleGetObject(String key) {
if (key == null) {
throw new NullPointerException();
}
return lookup.get(key);
}

POST方式

POST的请求参数是在请求体body中,而body参数是以流形式存在的。

1
2
3
4
5
6
7
8
9
Java复制代码ServletInputStream inputStream = httpRequest.getInputStream();
InputStreamReader reader = new InputStreamReader(inputStream,StandardCharsets.UTF_8);
BufferedReader bfReader = new BufferedReader(reader);
StringBuilder sb = new StringBuilder();
String line;
while ((line = bfReader.readLine()) != null){
sb.append(line);
}
System.out.println(sb.toString());

注意:异常 request body missing

过滤器获得POST请求参数,但是controller层报错。

httpRequest.getInputStream() 只能使用一次,再次使用报错

1
2
3
4
5
6
7
8
9
arduino复制代码InputStream read方法内部有一个postion标志。
当前流读取到的位置,每读取一次,位置就会移动一次,如果读到最后,InputStream.read方法会返回-1。

如果想再次读取,可调用inputstream.reset方法,position就会移动到上次调用mark的位置,mark默认是0,所以就能从头再读。

是否能reset又是由markSupported决定的,为true能reset,为false就不能reset。
从源码可以看到,markSupported是为false的,而且一调用reset就是直接异常。

需要将读取到的InputStream 保存下来,使用HttpServletRequestWrapper进行处理

继承并重写方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
Java复制代码@Component
@Slf4j
public class ParamFilter extends OncePerRequestFilter {

@Value("${app.self.name}")
private String selfName;

@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response,
FilterChain chain) throws ServletException, IOException {

RequestParameterWrapper requestWrapper=null;
String method=request.getMethod();
if("GET".equalsIgnoreCase(method)){
try {
requestWrapper = new RequestParameterWrapper(request, RequestMethod.GET);
Map<String, String[]> parameterMap = new HashMap<>(requestWrapper.getParameterMap());
log.info("get 参数={}", parameterMap.toString());
parameterMap.put("selfName", new String[]{selfName});
requestWrapper.setParameterMap(parameterMap);
}catch (Exception e){
e.printStackTrace();
}
}else if("POST".equals(method)){
try {
requestWrapper = new RequestParameterWrapper(request, RequestMethod.POST);
//获取post消息体
String body = requestWrapper.getBody();
JSONObject jsonObject = new JSONObject(body);
String dataAuthCode = (String) jsonObject.get("selfName");
if (StringUtils.isNotBlank(dataAuthCode)) {
chain.doFilter(request, response);
} else {
jsonObject.put("selfName", selfName);
requestWrapper.setBody(jsonObject);
}

}catch (Exception e){
e.printStackTrace();
}
}

if(requestWrapper == null) {
chain.doFilter(request, response);
} else {
chain.doFilter(requestWrapper, response);
}
}
}

继承HttpServletRequestWrapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
java复制代码public class RequestParameterWrapper extends HttpServletRequestWrapper {

private String body;

private Map<String, String[]> parameterMap;

public RequestParameterWrapper(HttpServletRequest request, RequestMethod method) throws Exception{
super(request);

if(RequestMethod.POST.equals(method)){
StringBuilder stringBuilder = new StringBuilder();
try(BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(request.getInputStream(),"UTF-8"));) {
char[] charBuffer = new char[128];
int bytesRead;
while ((bytesRead = bufferedReader.read(charBuffer)) > 0) {
stringBuilder.append(charBuffer, 0, bytesRead);
}
} catch (Exception ex) {
throw ex;
}
body = stringBuilder.toString();
}else if(RequestMethod.GET.equals(method)){
parameterMap=request.getParameterMap();
}

}

public String getBody() {
return this.body;
}

/**
* post方式使用
* @param object
*/
public void setBody(JSONObject object) {
this.body=object.toString();
}



@Override
public ServletInputStream getInputStream() throws IOException {
final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body.getBytes());
return new ServletInputStream() {
@Override
public boolean isFinished() {
return false;
}

@Override
public boolean isReady() {
return false;
}

@Override
public void setReadListener(ReadListener readListener) {

}

public int read() throws IOException {
return byteArrayInputStream.read();
}
};
}


@Override
public Enumeration<String> getParameterNames() {
Vector<String> vector = new Vector<String>(parameterMap.keySet());
return vector.elements();
}

@Override
public String getParameter(String name) {
String[] results = parameterMap.get(name);
return results[0];
}

@Override
public Map<String, String[]> getParameterMap() {
return parameterMap;
}

@Override
public String[] getParameterValues(String name) {
return parameterMap.get(name);
}

public void setParameterMap(Map<String, String[]> parameterMap) {
this.parameterMap = parameterMap;
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

2021 年 7 月程序员工资统计,平均 15302 元

发表于 2021-07-14

2021年7月全国招收程序员490325人。2021年7月全国程序员平均工资15302元,工资中位数14000元,其中96%的人的工资介于1750元到150000元。
在这里插入图片描述

01 主要城市
在这里插入图片描述
在这里插入图片描述
广州的工资下跌比较大。

02 职能
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
推荐算法才是最赚钱的。

作者 | 有数可据

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于springboot netty 实现的自定义通讯协议

发表于 2021-07-14

基于netty实现的自定义协议通讯协议

  1. 通讯协议定义

字段 占用的字节数 描述
帧 头 2 bytes 固定为 0x55 0xAA
长 度 2 bytes 长度 = 命令字 + 参数 + 校验和 ,不包括帧头和长度字节
命 令 1 bytes 0 心跳, 1 认证, 2 获取信息
参 数 0~65535 bytes 业务数据
校验和 2 bytes 校验和 = 帧头 + 长度 + 命令字 + 参数的字节累加和

框架功能

  1. 心跳机制
  2. TCP半包,黏包处理
  3. IP过滤
  4. 日志打印
  5. 自定义协议解析

业务描述

(1)Netty 协议栈客户端发送握手请求消息,携带认证信息;

(2)Netty 协议栈服务端对握手请求消息进行合法性校验,校验通过后,返回登录成功的握手应答消息;

(3)链路建立成功之后,客户端发送心跳消息, 客户端发送业务消息;

(6)服务端响应心跳和业务消息;

(7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。

完整代码下载地址

代码截图

image.png

客户端启动代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码package com.king.netty.core.client;

import com.king.netty.core.DataFrameDecoder;
import com.king.netty.core.DataFrameEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
* @author King
* @date 2021/7/14
*/
public class NettyClient {

public static void main(String[] args) throws InterruptedException {
startServer();
}

static void startServer() throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
// 设置线程组
.group(group)
// 设置为NIO模式
.channel(NioSocketChannel.class)
// 设置pipeline中的全部的channelHandler
// 入站方向的channelHandler需要保证顺序
// 出站方向的channelHandler需要保证顺序
.handler(new ClientHandlerInit());
bootstrap.connect("127.0.0.1", 8888).sync();
}

static class ClientHandlerInit extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 日志打印
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
// LengthFieldBasedFrameDecoder 用于解决TCP黏包半包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(
65535, // maxFrameLength 消息最大长度
2, // lengthFieldOffset 指的是长度域的偏移量,表示跳过指定个数字节之后的才是长度域
2, // lengthFieldLength 记录该帧数据长度的字段,也就是长度域本身的长度
0, // lengthAdjustment 长度的一个修正值,可正可负,Netty 在读取到数据包的长度值 N 后, 认为接下来的 N 个字节都是需要读取的,但是根据实际情况,有可能需要增加 N 的值,也 有可能需要减少 N 的值,具体增加多少,减少多少,写在这个参数里
2 // initialBytesToStrip 从数据帧中跳过的字节数,表示得到一个完整的数据包之后,扔掉 这个数据包中多少字节数,才是后续业务实际需要的业务数据。
));
// 自定义协议解码器
pipeline.addLast(new DataFrameDecoder());
// 自定义协议编码器
pipeline.addLast(new DataFrameEncoder());
// 处理认证请求的handler
pipeline.addLast(new AuthorizationRequestHandler());
// 处理心跳的handler
pipeline.addLast(new HeartBeatRequestHandler());
// 客户端业务handler
pipeline.addLast(new ClientBusinessHandler());
}
}
}

客户端请求认证Handler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码package com.king.netty.core.client;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
* @author King
* @date 2021/7/14
*/
public class AuthorizationRequestHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接成功后发起认证请求
ctx.writeAndFlush(DataFrame.getAuthorizationDataFrame());
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
// 处理认证响应
if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
byte[] params = dataFrame.getParams();
if (! "success".equals(new String(params))){
// 认证失败,关闭连接
ReferenceCountUtil.release(msg);
ctx.close();
}
}
// 认证成功,继续传递消息
// 非认证的响应,交给后续业务处理
ctx.fireChannelRead(msg);
}
}

客户端心跳Handler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码package com.king.netty.core.client;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.concurrent.TimeUnit;

/**
* @author King
* @date 2021/7/14
*/
public class HeartBeatRequestHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
switch (dataFrame.getCmd()){
// 如果是心跳应答, release掉, 因为后续的业务handler关心
case DataFrame.CMD_HEART_BEAT:
ReferenceCountUtil.release(msg);
break;
// 如果是认证成功的响应, 定时发送心跳
case DataFrame.CMD_AUTHORIZATION:
// 使用netty自带的任务处理器, 10s发送一次心跳
ctx.executor().scheduleAtFixedRate(() -> {
ctx.writeAndFlush(DataFrame.getHeartBeatDataFrame());
}, 0, 10, TimeUnit.SECONDS);
ctx.fireChannelRead(msg);
break;
default:
// 向后传递消息,让业务handler处理
ctx.fireChannelRead(msg);
break;
}
}
}

客户端业务Handler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码package com.king.netty.core.client;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author King
* @date 2021/7/14
*/
public class ClientBusinessHandler extends ChannelInboundHandlerAdapter {

public static final Logger logger = LoggerFactory.getLogger(ClientBusinessHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
// 发送业务请求
ctx.writeAndFlush(new DataFrame(DataFrame.CMD_GET_INFO, "which language is the best ?".getBytes()));
}else {
// 打印服务器发送的消息
logger.debug("receive message: " + dataFrame);
}
ReferenceCountUtil.release(msg);
}
}

服务器启动代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
java复制代码package com.king.netty.core.server;

import com.king.netty.core.DataFrameDecoder;
import com.king.netty.core.DataFrameEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ipfilter.IpFilterRule;
import io.netty.handler.ipfilter.IpFilterRuleType;
import io.netty.handler.ipfilter.RuleBasedIpFilter;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
* @author King
* @date 2021/7/14
*/
@Component
public class NettyServer implements InitializingBean, DisposableBean {

private boolean started;
private Channel channel;
private NioEventLoopGroup parentGroup;
private NioEventLoopGroup childGroup;

@Override
public void destroy() throws Exception {
// spring销毁对象时调用stop释放服务器
if (started){
stopServer();
}
}

@Override
public void afterPropertiesSet() throws Exception {
// spring初始化对象后, 调用启动方法,启动服务
if (started){
return;
}
startServer();
}

void startServer() throws InterruptedException {
this.parentGroup = new NioEventLoopGroup();
this.childGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
// 设置线程组
.group(parentGroup, childGroup)
// 设置为NIO模式
.channel(NioServerSocketChannel.class)
// 设置TCP sync队列大小, 防止洪泛攻击
.childOption(ChannelOption.SO_BACKLOG, 1024)
// 设置pipeline中的全部的channelHandler
// 入站方向的channelHandler需要保证顺序
// 出站方向的channelHandler需要保证顺序
.childHandler(new ServerHandlerInit());
this.channel = serverBootstrap.bind(8888).sync().channel();
started = true;
}

void stopServer(){
try{
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
channel.closeFuture().syncUninterruptibly();
}finally {
this.parentGroup = null;
this.childGroup = null;
this.channel = null;
started = false;
}
}

static class ServerHandlerInit extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 日志打印
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
// IP过滤
pipeline.addLast(new RuleBasedIpFilter(new IpFilterRule() {
@Override
public boolean matches(InetSocketAddress remoteAddress) {
// 自定义IP地址拦截器, 非127开头的IP不允许连接
return ! remoteAddress.getHostName().startsWith("127");
}
@Override
public IpFilterRuleType ruleType() {
return IpFilterRuleType.REJECT;
}
}));
// LengthFieldBasedFrameDecoder 用于解决TCP黏包半包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(
65535, // maxFrameLength 消息最大长度
2, // lengthFieldOffset 指的是长度域的偏移量,表示跳过指定个数字节之后的才是长度域
2, // lengthFieldLength 记录该帧数据长度的字段,也就是长度域本身的长度
0, // lengthAdjustment 长度的一个修正值,可正可负,Netty 在读取到数据包的长度值 N 后, 认为接下来的 N 个字节都是需要读取的,但是根据实际情况,有可能需要增加 N 的值,也 有可能需要减少 N 的值,具体增加多少,减少多少,写在这个参数里
2 // initialBytesToStrip 从数据帧中跳过的字节数,表示得到一个完整的数据包之后,扔掉 这个数据包中多少字节数,才是后续业务实际需要的业务数据。
));
// 设置心跳的超时时间 30s, 如果30s内未收到心跳则会抛出ReadTimeoutException
pipeline.addLast(new ReadTimeoutHandler(30));
// 自定义协议解码器
pipeline.addLast(new DataFrameDecoder());
// 自定义协议编码器
pipeline.addLast(new DataFrameEncoder());
// 认证处理
pipeline.addLast(new AuthorizationResponseHandler());
// 心跳处理
pipeline.addLast(new HeartBeatResponseHandler());
// 业务处理handler
pipeline.addLast(new ServerBusinessHandler());
}
}
}

服务器认证处理Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码package com.king.netty.core.server;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
* @author King
* @date 2021/7/14
*/
public class AuthorizationResponseHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
String auth = "{\"username\":\"test\", \"password\":\"abcdef\"}";
byte[] params = dataFrame.getParams();
if (auth.equals(new String(params))){
// 认证成功
ctx.writeAndFlush(new DataFrame(dataFrame.getCmd(), "success".getBytes()));
}else {
// 认证失败
ctx.writeAndFlush(new DataFrame(dataFrame.getCmd(), "fail".getBytes()));
}
// 释放消息
ReferenceCountUtil.release(msg);
}else {
// 非认证的请求,交给后续业务处理
ctx.fireChannelRead(msg);
}
}
}

服务器心跳处理Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码package com.king.netty.core.server;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;

/**
* @author King
* @date 2021/7/14
*/
public class HeartBeatResponseHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
// 如果是心跳请求, release掉, 因为后续的业务handler关心
if (dataFrame.getCmd() == DataFrame.CMD_HEART_BEAT) {
ctx.writeAndFlush(DataFrame.getHeartBeatDataFrame());
ReferenceCountUtil.release(msg);
} else {// 向后传递消息,让业务handler处理
ctx.fireChannelRead(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ReadTimeoutException){
// 断开客户端连接
ctx.close();
return;
}
super.exceptionCaught(ctx, cause);
}
}

服务器业务处理Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码package com.king.netty.core.server;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author King
* @date 2021/7/14
*/
public class ServerBusinessHandler extends ChannelInboundHandlerAdapter {

public static final Logger logger = LoggerFactory.getLogger(ServerBusinessHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
logger.debug("receive message: " + dataFrame);

// 返回客户端数据
DataFrame response = doBusiness(dataFrame);
ctx.writeAndFlush(response);
}

private DataFrame doBusiness(DataFrame dataFrame){
// 处理自己的业务
// todo
// 响应客户端
return new DataFrame(dataFrame.getCmd(), "java is the best language".getBytes());
}
}

协议定义代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
java复制代码package com.king.netty.core;

import lombok.Data;

/**
* @author King
* @date 2021/7/14
*/
@Data
public class DataFrame {

public static final byte CMD_HEART_BEAT = 0;
public static final byte CMD_AUTHORIZATION = 1;
public static final byte CMD_GET_INFO = 2;

/**
* 帧 头 长 度 命 令 参 数 校验和
* 0x55 0xAA 2byte 1byte 0~1476bytes 2bytes
*
* 长度 = 命令字 + 参数 + 校验和 ,不包括帧头和长度字节;
* 校验和 = 帧头 + 长度 + 命令字 + 参数的字节累加和。
*
*/

public static final byte[] HEADER = new byte[] {0b01010101, (byte) 0b10101010};

private byte cmd;

private byte[] params;

private int crc;

public DataFrame(byte cmd, byte[] params, int crc) {
this.cmd = cmd;
this.params = params;
this.crc = crc;
}

public DataFrame(byte cmd, byte[] params) {
this.cmd = cmd;
this.params = params;
this.crc = getCrc();
}

public boolean checkCrc(){
return getCrc() == this.crc;
}

public int getLength() {
// 长度 = 命令字 + 参数 + 校验和 ,不包括帧头和长度字节;
return 1 + params.length + 2;
}

public int getCrc(){
// 校验和 = 帧头 + 长度 + 命令字 + 参数的字节累加和。
int crc = 0;
// 帧头
crc += 0b01010101;
crc += 0b10101010;
// 长度
crc += getLength();
// 参数和
for (byte b: params){
crc += (b & 0xFF);
}
return crc;
}

public static DataFrame getHeartBeatDataFrame(){
return new DataFrame(DataFrame.CMD_HEART_BEAT, new byte[]{});
}

public static DataFrame getAuthorizationDataFrame(){
String msg = "{\"username\":\"test\", \"password\":\"abcdef\"}";
return new DataFrame(DataFrame.CMD_AUTHORIZATION, msg.getBytes());
}

@Override
public String toString() {
return "DataFrame{" +
"cmd=" + cmd +
", params=" + new String(params) +
'}';
}
}

协议解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码package com.king.netty.core;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
* @author King
* @date 2021/7/14
*/
public class DataFrameDecoder extends ByteToMessageDecoder {

/**
* 帧 头 长 度 命 令 参 数 校验和
* 0x55 0xAA 2byte 1byte 0~1476bytes 2bytes
*
* 长度 = 命令字 + 参数 + 校验和 ,不包括帧头和长度字节;
* 校验和 = 帧头 + 长度 + 命令字 + 参数的字节累加和。
*
*/

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 长度位
int length = in.readShort();
// 命令位
byte cmd = in.readByte();
// 参数
byte[] params = new byte[length-3];
in.readBytes(params);
// 校验和
int crc = in.readShort();
DataFrame dataFrame = new DataFrame(cmd, params, crc);
// 计算校验和
if (dataFrame.checkCrc()){
// 将解析后的数据加入到list中,传递给后续的channelHandler
out.add(dataFrame);
};
}
}

协议编码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码package com.king.netty.core;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
* @author King
* @date 2021/7/14
*/
public class DataFrameEncoder extends MessageToByteEncoder<DataFrame> {

@Override
protected void encode(ChannelHandlerContext ctx, DataFrame msg, ByteBuf out) throws Exception {
// 写出帧头
out.writeBytes(DataFrame.HEADER);
// 写出长度
out.writeShort(msg.getLength());
// 写出命令
out.writeByte(msg.getCmd());
// 参 数
out.writeBytes(msg.getParams());
// 校验和
out.writeShort(msg.getCrc());
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python Socket编程|Python 主题月

发表于 2021-07-14

本文正在参加「Python主题月」,详情查看活动链接

背景

关于Python Socket编程,首先需要了解几个计算机网络的知识,通过以下的几个问题,有助于更好的理解Socket编程的意义,以及整个框架方面的知识:

  • TCP和UDP协议本质上的区别?

TCP协议,面向连接,可靠,基于字节流的传输层通信协议;UDP协议无连接,不可靠,基于数据包的传输层协议。
TCP协议在建立连接的过程需要经历三次握手,断开连接则需要经历四次挥手,而这建立连接的过程增加了传输过程中的安全性。而建立连接的过程则会消耗系统的资源,消耗更多的时间,而相比较UDP协议传输过程则不会出现这种问题。
总结来讲,基于TCP协议传输,需要不断的确认对方是否收到信息,从而建立连接(确认过程次数有限制,即三次握手),UDP协议传输则不需要确认接收端是否收到信息,只需要将信息发给对方。

  • TCP/IP协议栈、HTTP协议、Socket之间的区别和联系?

TCP/IP协议栈就是一系列网络协议,可以分为四层模型来分析:应用层、传输层、网络层、链路层;
HTTP协议(超文本传输协议)就是在这一协议栈中的应用层协议;HTTP协议简单来说,它的作用就是规范数据的格式,让程序能够方便的识别,并且收发双方都需要遵循同样的协议格式进行数据传输。(应用层的协议也和HTTP协议的作用类似,不一样的是定义不同的数据格式。)
Socket可以理解为TCP/IP协议栈提供的对外的操作接口,即应用层通过网络协议进行通信的接口。Socket可以使用不同的网络协议进行端对端的通信;

  • TCP Socket服务器的通信过程?

Server端:
建立连接(socket()函数创建socket描述符、bind()函数绑定特定的监听地址(ip+port)、listen()函数监听socket、accept()阻塞等待客户端连接)
数据交互(read()函数阻塞等待客户端发送数据、write()函数发送给客户端数据)
Client端:
建立连接(socket()函数创建socket描述符、connect()函数向指定的监听地址发送连接请求)
数据交互(wirte()函数发送服务端数据、read()函数足阻塞等待接受服务端发送的数据)

  • socket和websocket之间的联系?

webosocket是一种通信协议,不同于HTTP请求,客户端请求服务端资源,服务端响应的通信过程;websocket允许服务端主动向客户端推送消息,同时做到客户端和服务端双向通讯的协议。(具体底层原理有待后面实践,暂时未接触)

  • HTTP,WSGI协议的联系和区别?

HTTP协议(超文本传输协议),属于TCP/IP协议栈中应用层的协议。用于规范传输数据的格式,是一种客户端和服务端传输的规则。
WSGI协议则是Python定义的Web服务器和框架程序通信的接口规则。两者联系不大,强行说的话,Python框架程序主要处理的是HTTP请求。
(后期可以实现一个WSGI协议的Python框架,用于处理HTTP请求的实验。)

  • 主流Web框架,异步Web框架?

主流Web框架:Django、Flask

异步Web框架:Tornado(内置异步模块)、Snaic(Python自带asyncio)、FastAPI(基于Starlette库) 、aiohttp(基于asyncio)

  • asyncio,aiohttp之间的联系?(异步编程)

asyncio是一个异步IO库,aiohttp就是基于asyncio的异步HTTP框架(支持客户端/服务端)

代码设计

Python提供了基本的socket模块:

  1. socket模块;提供了标准的BSD Sockets API;
  2. socketserver模块:提供了服务器中心类,简化服务器的开发;

TCP Socket服务端

socket模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
python复制代码# -*- coding: utf-8 -*-
from socket import socket, AF_INET, SOCK_STREAM

def echo_handler(sock ,address):
print("Get Connection from address:", address)

while True:
response = sock.recv(8192)
if not response:
break
print(f"Got {response}")
sock.sendall(response)

def echo_server(address, back_log=5):
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(address)
sock.listen(back_log)

while True:
sock_client, address = sock.accept()
echo_handler(sock_client, address)

if __name__ == "__main__":
echo_server(('localhost', 5000))

代码详解:

  • 创建一个基于IPV4和TCP协议的Socket,这里AF_INET指的是使用IPV4协议,SOCK_STREAM指定使用面向流的TCP协议,绑定监听端口,设置等待连接的最大数量
  • 创建一个永久循环,获取客户端请求的连接,accept()会等待并返回一个客户端的连接;
  • 连接建立后,等待客户端数据,接受完客户端数据,然后返回数据给客户端,最后关闭连接

存在的问题:当出现多个客户端请求时,由于是单个线程会发生阻塞的情况,所以如果需要多线程处理多个客户端请求,可以这样改;

1
2
3
4
5
6
python复制代码from threading import Thread

while True:
client_sock, address = sock.accept()
thread = Thread(target=echo_handler, args=(client_sock, address))
thread.start()

这样的话,就会在每个客户端请求的时候,生成一个子线程然后处理请求;
(但是存在一个问题:当突然大量请求连接,消耗系统资源达到上限后,很可能造成程序无法处理后续请求。)

socketserver模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
python复制代码# -*- coding: utf-8 -*-

from socketserver import BaseRequestHandler, StreamRequestHandler
from socketserver import TCPServer, ThreadingTCPServer


class SingleHandler(BaseRequestHandler):
def handle(self):
print("Got Connections From: %s" % str(self.client_address))

while True:
msg = self.request.recv(8192)
print(f"Got msg: {msg}")
if not msg:
break
self.request.send(msg)

class StreamHandler(StreamRequestHandler):
def handle(self):
print("Got Connection From: %s" % str(self.client_address))

for line in self.rfile:
print(line)
self.wfile.write(line)


if __name__ == "__main__":
# server = TCPServer(("", 5000), SingleHandler)
# server = TCPServer(("", 5000), StreamHandler)
server = ThreadingTCPServer(("", 5000), StreamHandler)
server.serve_forever()

代码详解:

  • 处理多个客户端,初始化一个ThreadingTCPServer实例,ThreadingTCPServer处理客户端的连接,会为每个客户端创建一个线程进行交互。
  • 设置绑定的IP地址和端口,以及处理类;
  • 可以直接使用BaseRequestHandler,这个所有请求处理类的父类,子类处理请求则需要重写handle()方法,该模块是和服务类组合来处理请求;
  • 使用StreamRequestHandler(使用流的请求处理程序类,类似file-like对象,提供标准文件接口简化通信过程),重写里面的handle方法,获取请求数据,返回数据给客户端;
  • 使用StreamRequestHandler处理类时,在读取客户端发送的数据,会将recv()多次调用,直到遇到换行符为止,所以客户端在发送数据的末尾需要加上换行符

TCP Socket客户端

socket模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
python复制代码# -*- coding: utf-8 -*-
from socket import socket, AF_INET, SOCK_STREAM
import time

def request_handler():
start_time = time.time()
sock_client = socket(AF_INET, SOCK_STREAM)
sock_client.connect(('localhost', 5000))

book_content = ""
with open("send_books.txt", "r") as f:
book_content = f.read()

content_list = book_content.split("\n")
for content in content_list:
if content:
# 要在每段发送的内容结尾加上换行符,用于StreamRequestHandler识别
sock_client.send((content+"\n").encode())
time.sleep(1)
response = sock_client.recv(8192)
print(response)

end_time = time.time()
print("总共耗时:", end_time-start_time)

if __name__ == "__main__":
request_handler()

UDP Socket

Socket模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
python复制代码# -*- coding: utf-8

from socket import socket, AF_INET, SOCK_DGRAM


def echo_handler(address):
server = socket(AF_INET, SOCK_DGRAM)
server.bind(address)

while True:
msg, addr = server.recvfrom(8192)
if not msg:
break

print(f"Got Message From: {addr} \n {msg}")
server.sendto(msg, addr)

if __name__ == "__main__":
echo_handler(("", 8888))

代码不详解,和之前的差不多,注意不同的协议就完事了

客户端测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
python复制代码# -*- coding: utf-8 -*-

from socket import socket, AF_INET, SOCK_DGRAM
import time


def request_handler(addr):
client = socket(AF_INET, SOCK_DGRAM)

book_content = ""
with open("send_books.txt", "r") as f:
book_content = f.read()

book_list = book_content.split("\n")
for content in book_list:
if content:
client.sendto(content.encode(), addr)
response = client.recv(8192)
print(response)
time.sleep(1)

if __name__ == "__main__":
addr = ("localhost", 8888)
request_handler(addr)

socketserver模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python复制代码# -*- coding: utf-8 -*-
from socketserver import BaseRequestHandler, UDPServer

class EchoHandler(BaseRequestHandler):

def handle(self):
print(f"Got Connections From: {self.client_address}")

data, sock = self.request
print(data)

if data:
sock.sendto(data, self.client_address)

if __name__ == "__main__":
server = UDPServer(("", 8888), EchoHandler)
server.serve_forever()

代码不在赘述,如果需要多线程处理并发操作可以使用ThreadingUDPServer

总结

关于本篇介绍Python Socket编程,大都是皮毛,只是谈到了Python实际处理socket的几个模块,
关于socket底层方面的知识并未提及,先了解个大概,从实际使用方面出发,在实际使用过程中结合计算机网络知识,能够理解socket在整个TCP/IP协议栈中的作用。
socket和socketserver模块都可以用来编写网络程序,不同的是socketserver省事很多,你可以专注业务逻辑,不用去理会socket的各种细节,包括不限于多线程/多进程,接收数据,发送数据,通信过程。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…607608609…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%