Mysql 学习 Client 端调用主流程

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 .前言

出于好奇 , 这一篇来看一下 MySQL Client 端的调用主流程 , 为后续的 MySQL 系列文档开个头

二 . 创建 Connect

以获取连接为例 , 当获取连接时 , 会通过多种方式调用 Spring 的 DataSourceUtils # getConnection

此时还处在 Spring 的业务体系中. Connect 的流程在启动时创建和运行时调用是两个完全不同的流程 , 先来看一 CreateConnect 的主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码// Step 1 : Connect 的创建入口
public static Connection doGetConnection(DataSource dataSource) throws SQLException {

// ...
Connection con = fetchConnection(dataSource);
// ...

}


// Step 2 : 连接池处理
// 如果使用了连接池 , 此时的连接会交给对应连接池来处理
pool = result = new HikariPool(this);


> 以下跳过连接池的相关原理 ,直接看到 com.mysql.cj.jdbc.Driver 的核心处理流程


// Step 3 : MySQL 驱动入口
public java.sql.Connection connect(String url, Properties info) throws SQLException {

try {
if (!ConnectionUrl.acceptsUrl(url)) {
/*
* According to JDBC spec:
* The driver should return "null" if it realizes it is the wrong kind of driver to connect to the given URL. This will be common, as when the
* JDBC driver manager is asked to connect to a given URL it passes the URL to each loaded driver in turn.
*/
return null;
}

ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
// 根据类型创建不同的连接方式
switch (conStr.getType()) {
case SINGLE_CONNECTION:
return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost());

case LOADBALANCE_CONNECTION:
return LoadBalancedConnectionProxy.createProxyInstance((LoadbalanceConnectionUrl) conStr);

case FAILOVER_CONNECTION:
return FailoverConnectionProxy.createProxyInstance(conStr);

case REPLICATION_CONNECTION:
return ReplicationConnectionProxy.createProxyInstance((ReplicationConnectionUrl) conStr);

default:
return null;
}

} catch (UnsupportedConnectionStringException e) {
// when Connector/J can't handle this connection string the Driver must return null
return null;

} catch (CJException ex) {
throw ExceptionFactory.createException(UnableToConnectException.class,
Messages.getString("NonRegisteringDriver.17", new Object[] { ex.toString() }), ex);
}
}

三 . 运行 SQL 流程

这一节来看一下执行 SQL 时的调用流程

在一个 SQL 的生命周期中 , 主要有2个主要的流程 :

  • 流程一 : 基于事务起点的 SET autocommit
  • 流程二 : 真正核心的 SQL 执行语句

3.1 事务的入口

Spring 的事务起点是 TransactionAspectSupport , 进入一系列流程后 , 会进入连接池的处理中 , 这里涉及到 SpringTransaction 的流程 , 可以看看这篇文章 基于 Spring 的事务管理, 这里只是简单过一下

  • Step 1 : TransactionImpl # begin : 由 Begin 开启事务流程
  • Step 2 :ConnectionImpl # setAutoCommit : 开启自动提交流程
  • Step 3 : NativeSession # execSQL : 进入 SQL 执行核心流程
1
2
3
4
5
6
java复制代码public void begin() {
if ( !doConnectionsFromProviderHaveAutoCommitDisabled() ) {
getConnectionForTransactionManagement().setAutoCommit( false );
}
status = TransactionStatus.ACTIVE;
}

事务会调用setAutoCommit , 其根本也是调用一个execSQL 语句来控制事务

1
java复制代码this.session.execSQL(null, autoCommitFlag ? "SET autocommit=1" : "SET autocommit=0", -1, null, false, this.nullStatementResultSetFactory, this.database, null, false);

到了这里会直接调用到 execSQL , 而流程二的普通 SQL 语句 , 会由对应的 executeUpdate / executeQuery /executeInternal 发起流程处理

3.2 普通业务执行流程

  • Step 1 : AbstractEntityPersister # insert : 由 Hibernate/JPA 发起的操作流程
  • Step 2 : ResultSetReturnImpl # executeUpdate : 执行 Update 语句 (或者 Query -> executeQuery)
  • Step 3 : HikariProxyPreparedStatement # executeUpdate : 连接池的中间处理 , 后续可以专门看看
  • Step 4 : ClientPreparedStatement # executeUpdate : 由 mysql 驱动接管
  • Step 5 : ClientPreparedStatement # executeUpdateInternal :
  • Step 5 : ClientPreparedStatement # executeInternal : 由底层方法调用抽象类 , 最终调用 execSQL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码// 处理 Update 语句 , 核心流程如下 : 
protected long executeUpdateInternal(QueryBindings<?> bindings, boolean isReallyBatch) throws SQLException {

// 1. 获取 JDBC 连接
JdbcConnection locallyScopedConn = this.connection;

// 2. 解析出要发送给 MySQL 的语句包
Message sendPacket = ((PreparedQuery<?>) this.query).fillSendPacket(bindings);

// 3. 调用通过处理方法执行 SQL
rs = executeInternal(-1, sendPacket, false, false, null, isReallyBatch);

// 4. 设置处理结果
this.results = rs;

// 设置更新数量 ,这里会对重复语句进行统计处理
this.updateCount = rs.getUpdateCount();

// 5. 获取最后插入的 ID
this.lastInsertId = rs.getUpdateID();

// 6. 返回更新数量
return this.updateCount;

}

executeInternal 主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码protected <M extends Message> ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, M sendPacket, boolean createStreamingResultSet,
boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException {

// Step 1 : 获取连接
JdbcConnection locallyScopedConnection = this.connection;

// Step 2 : 获取插入值得绑定关系
((PreparedQuery<?>) this.query).getQueryBindings()
.setNumberOfExecutions(((PreparedQuery<?>) this.query).getQueryBindings().getNumberOfExecutions() + 1);

// Step 3 :设置返回结果设置方法
ResultSetInternalMethods rs;

// Step 4 : 设置超时方法
CancelQueryTask timeoutTask = startQueryTimer(this, getTimeoutInMillis());

// Step 5 : 调用具体的 SQL 执行语句
rs = ((NativeSession) locallyScopedConnection.getSession()).execSQL(this, null, maxRowsToRetrieve, (NativePacketPayload) sendPacket,
createStreamingResultSet, getResultSetFactory(), this.getCurrentCatalog(), metadata, isBatch);

// Step 6 : 超时时间处理 ,省略
}

3.3 业务处理通用流程

看了入口方法, 现在来看一下 execQuery 的具体处理流程 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码// C- NativeSession
public <T extends Resultset> T execSQL(Query callingQuery, String query, int maxRows, NativePacketPayload packet, boolean streamResults,
ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory, String catalog, ColumnDefinition cachedMetadata, boolean isBatch) {

//
int endOfQueryPacketPosition = endOfQueryPacketPosition = packet.getPosition();

//
long queryStartTime = System.currentTimeMillis();

// 如果 packet == null , 调用如下处理
return ((NativeProtocol) this.protocol).sendQueryString(callingQuery, query, encoding, maxRows, streamResults, catalog, cachedMetadata,
this::getProfilerEventHandlerInstanceFunction, resultSetFactory);

//
return ((NativeProtocol) this.protocol).sendQueryPacket(callingQuery, packet, maxRows, streamResults, catalog, cachedMetadata,
this::getProfilerEventHandlerInstanceFunction, resultSetFactory);

}

在上文调用 sendQueryPacket 发起了SQL 执行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码// C- NativeProtocol
public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults,
String catalog, ColumnDefinition cachedMetadata, GetProfilerEventHandlerInstanceFunction getProfilerEventHandlerInstanceFunction,
ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
this.statementExecutionDepth++;

byte[] queryBuf = null;
int oldPacketPosition = 0;
long queryStartTime = 0;
long queryEndTime = 0;

queryBuf = queryPacket.getByteBuffer();
oldPacketPosition = queryPacket.getPosition(); // save the packet position

// 查询启动时间
queryStartTime = getCurrentTimeNanosOrMillis();

// 查询语句
LazyString query = new LazyString(queryBuf, 1, (oldPacketPosition - 1));

// 发送命令
NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);

// 获取所有的 Result 结果
T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory);

// 反射拦截器
T interceptedResults = invokeQueryInterceptorsPost(query, callingQuery, rs, false);

// 返回结果
return rs;

}

3.5 发送命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码// C- NativeProtocol
public final NativePacketPayload sendCommand(Message queryPacket, boolean skipCheck, int timeoutMillis) {

int command = queryPacket.getByteBuffer()[0];
this.commandCount++;

if (this.queryInterceptors != null) {
NativePacketPayload interceptedPacketPayload = (NativePacketPayload) invokeQueryInterceptorsPre(queryPacket, false);

if (interceptedPacketPayload != null) {
return interceptedPacketPayload;
}
}

this.packetReader.resetMessageSequence();

// 获取旧 timeout 时间并且设置新的超时时间
// PS : 这里的 oldTimeout 在 finally 中会再次设置 soTimeout
int oldTimeout = 0;
oldTimeout = this.socketConnection.getMysqlSocket().getSoTimeout();
this.socketConnection.getMysqlSocket().setSoTimeout(timeoutMillis);

//
checkForOutstandingStreamingData();

// 设置互斥锁
this.serverSession.setStatusFlags(0, true);

// 清空输入流
clearInputStream();
this.packetSequence = -1;

// 发起包
send(queryPacket, queryPacket.getPosition());

// 获取 Return 结果
// 1. resultPacket = readMessage(this.reusablePacket)
// 2. checkErrorMessage(resultPacket)
NativePacketPayload returnPacket = checkErrorMessage(command);

return returnPacket;
}
1
2
3
4
5
6
7
java复制代码// C- NativeProtocol
public final void send(Message packet, int packetLen) {
//....

// 通过 Sender 发送远程包
this.packetSender.send(packet.getByteBuffer(), packetLen, this.packetSequence);
}

C- SimplePacketSender

1
2
3
4
5
6
7
8
9
10
11
JAVA复制代码public void send(byte[] packet, int packetLen, byte packetSequence) throws IOException {
PacketSplitter packetSplitter = new PacketSplitter(packetLen);

// 持续从远程读取包
while (packetSplitter.nextPacket()) {
this.outputStream.write(NativeUtils.encodeMysqlThreeByteInteger(packetSplitter.getPacketLen()));
this.outputStream.write(packetSequence++);
this.outputStream.write(packet, packetSplitter.getOffset(), packetSplitter.getPacketLen());
}
this.outputStream.flush();
}

3.6 解析 Result

Step 1 : packetReader.readMessage

这里的 packetReader 主要包含以下几种实现 MultiPacketReader :

MySQL-MessageReader.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
JAVA复制代码public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {

// 获取长度和 Message 实现对哦下
int packetLength = header.getMessageSize();
NativePacketPayload buf = this.packetReader.readMessage(reuse, header);

// 此处通过 do-while 进行循环获取
do {

//......
this.packetReader.readMessage(Optional.of(multiPacket), hdr);
// 写入 byte 数据
buf.writeBytes(StringLengthDataType.STRING_FIXED, multiPacket.getByteBuffer(), 0, multiPacketLength);
// 循环获取 , 直到最大长度 -> MAX_PACKET_SIZE = 256 * 256 * 256 - 1;
} while (multiPacketLength == NativeConstants.MAX_PACKET_SIZE);

return buf;
}

checkErrorMessage 判断是否为错误返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public void checkErrorMessage(NativePacketPayload resultPacket) {

resultPacket.setPosition(0);

// 获取状态嘛
byte statusCode = (byte) resultPacket.readInteger(IntegerDataType.INT1);

// Error handling
// 此处通过状态码判断是否为异常结果
if (statusCode == (byte) 0xff) {
// 省略 error 处理环节


// 此处会通过状态和异常处理的结果抛出对应的异常 , 该方法没有具体的返回值
if (xOpen != null) {
if (xOpen.startsWith("22")) {
throw new DataTruncationException(errorBuf.toString(), 0, true, false, 0, 0, errno);
}

if (errno == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD) {
throw ExceptionFactory.createException(PasswordExpiredException.class, errorBuf.toString(), getExceptionInterceptor());

} else if (errno == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD_LOGIN) {
throw ExceptionFactory.createException(ClosedOnExpiredPasswordException.class, errorBuf.toString(), getExceptionInterceptor());
}
}

throw ExceptionFactory.createException(errorBuf.toString(), xOpen, errno, false, null, getExceptionInterceptor());

}
}

获取最终返回结果

1
2
3
4
5
6
7
8
9
10
java复制代码public <T extends Resultset> T readAllResults(int maxRows, boolean streamResults, NativePacketPayload resultPacket, boolean isBinaryEncoded,
ColumnDefinition metadata, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {

// 调用具体的实现类获取最终结果 , 结果会被放在 rowData 中
T topLevelResultSet = read(Resultset.class, maxRows, streamResults, resultPacket, isBinaryEncoded, metadata, resultSetFactory);


}

// 这里的 Read 有多种 , 后面再来看一看如何实现 Byte 读取的

image.png

总结

东西不多 , 主要是一些主流程代码 , 其中很多环节都比较模糊 , 主要是为了串联整个流程 , 后续小细节会抽空深入的了解一下.

相对于其他的框架代码 , Mysql 的代码看起来很生涩难懂 , 每个主流程间参杂了很多辅助属性 , 如果业务上出现了问题 , 又不确定最终的执行语句 ,可以考虑在 sendCommand 等方法中添加断点

连接池的使用通常在 ResultSetReturnImpl 和 ClientPreparedStatement 之间进行处理 , 后续可以关注一下

image.png

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%