开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

教你用Python发现即将流失的客户 烦恼 环境 清理 决策

发表于 2017-11-28

烦恼

作为一名数据分析师,你来到这家跨国银行工作已经半年了。

今天上午,老板把你叫到办公室,面色凝重。

你心里直打鼓,以为自己捅了什么篓子。幸好老板的话让你很快打消了顾虑。

他发愁,是因为最近欧洲区的客户流失严重,许多客户都跑到了竞争对手那里接受服务了。老板问你该怎么办?

你脱口而出“做好客户关系管理啊!”

老板看了你一眼,缓慢地说“我们想知道哪些客户最可能在近期流失”。

没错,在有鱼的地方钓鱼,才是上策。

你明白了自己的任务——通过数据锁定即将流失的客户。这个工作,确实是你这个数据分析师分内的事儿。

你很庆幸,这半年做了很多的数据动态采集和整理工作,使得你手头就有一个比较完备的客户数据集。

下面你需要做的,就是如何从数据中“沙里淘金”,找到那些最可能流失的客户。

可是,该怎么做呢?

你拿出欧洲区客户的数据,端详起来。

75a0cb231e70d9b3b4a4c8f8e73c75972d221bb0

客户主要分布在法国、德国和西班牙。

你手里掌握的信息,包括他们的年龄、性别、信用、办卡信息等。客户是否已流失的信息在最后一列(Exited)。

怎么用这些数据来判断顾客是否会流失呢?

以你的专业素养,很容易就判断出这是一个分类问题,属于机器学习中的监督式学习。但是,你之前并没有做过实际项目,该如何着手呢?

别发愁,我一步步给你演示如何用Python和深度神经网络(或者叫“深度学习”)来完成这个分类任务,帮你锁定那些即将流失的客户。

环境

工欲善其事,必先利其器。我们先来安装和搭建环境。

首先是安装Python。

请到这个网址下载Anaconda的最新版本。

95ce1d08113fad1858204c9daecd771b5435170f

请选择左侧的Python 3.6版本下载安装。

其次是新建文件夹,起名为demo-customer-churn-ann,并且从这个链接下载数据,放到该文件夹下。

(注:样例数据来自于匿名化处理后的真实数据集,下载自superdatascience官网。)

打开终端(或者命令行工具),进入demo-customer-churn-ann目录,执行以下命令:

1
复制代码

jupyter notebook

浏览器中会显示如下界面:

8e773058a898629c56e037bba6928486c92a0b6b

点击界面右上方的New按钮,新建一个Python 3 Notebook,起名为customer-churn-ann。

fbae258d0458c314c966dfffa269256d44ae9cf2

准备工作结束,下面我们开始清理数据。

清理

首先,读入数据清理最常用的pandas和numpy包。

1
复制代码

import numpy as npimport pandas as pd

从customer_churn.csv里读入数据:

1
复制代码

df = pd.read_csv(‘customer_churn.csv’)

看看读入效果如何:

1
复制代码

df.head()

这里我们使用了head()函数,只显示前5行。

d423c8a9ca1718137a747343b6ce21a8c30b71a7

可以看到,数据完整无误读入。但是并非所有的列都对我们预测用户流失有作用。我们一一甄别一下:

  • RowNumber:行号,这个肯定没用,删除
  • CustomerID:用户编号,这个是顺序发放的,删除
  • Surname:用户姓名,对流失没有影响,删除
  • CreditScore:信用分数,这个很重要,保留
  • Geography:用户所在国家/地区,这个有影响,保留
  • Gender:用户性别,可能有影响,保留
  • Age:年龄,影响很大,年轻人更容易切换银行,保留
  • Tenure:当了本银行多少年用户,很重要,保留
  • Balance:存贷款情况,很重要,保留
  • NumOfProducts:使用产品数量,很重要,保留
  • HasCrCard:是否有本行信用卡,很重要,保留
  • IsActiveMember:是否活跃用户,很重要,保留
  • EstimatedSalary:估计收入,很重要,保留
  • Exited:是否已流失,这将作为我们的标签数据

上述数据列甄别过程,就叫做“特征工程”(Feature Engineering),这是机器学习里面最常用的数据预处理方法。如果我们的数据量足够大,机器学习模型足够复杂,是可以跳过这一步的。但是由于我们的数据只有10000条,还需要手动筛选特征。

选定了特征之后,我们来生成特征矩阵X,把刚才我们决定保留的特征都写进来。

1
复制代码

X = df.loc[:,[‘CreditScore’, ‘Geography’, ‘Gender’, ‘Age’, ‘Tenure’, ‘Balance’, ‘NumOfProducts’, ‘HasCrCard’, ‘IsActiveMember’, ‘EstimatedSalary’]]

看看特征矩阵的前几行:

1
复制代码

X.head()

显示结果如下:

b29aa38a1355879e667132e83ddc5e35cff5986e

特征矩阵构建准确无误,下面我们构建目标数据y,也就是用户是否流失。

1
复制代码

y = df.Exited

1
复制代码

y.head()

1
复制代码

0 11 02 13 04 0Name: Exited, dtype: int64

此时我们需要的数据基本上齐全了。但是我们发现其中有几列数据还不符合我们的要求。

要做机器学习,只能给机器提供数值,而不能是字符串。可是看看我们的特征矩阵:

1
复制代码

X.head()

da0d2b3f6ec925412a5e11be0b2806fa3b9ef2ff

显然其中的Geography和Gender两项数据都不符合要求。它们都是分类数据。我们需要做转换,把它们变成数值。

在Scikit-learn工具包里面,专门提供了方便的工具LabelEncoder,让我们可以方便地将类别信息变成数值。

1
复制代码

from sklearn.preprocessing import LabelEncoder, OneHotEncoderlabelencoder1 = LabelEncoder()X.Geography= labelencoder1.fit_transform(X.Geography)labelencoder2 = LabelEncoder()X.Gender = labelencoder2.fit_transform(X.Gender)

我们需要转换两列,所以建立了两个不同的labelencoder。转换的函数叫做fit_transform。

经过转换,此时我们再来看看特征矩阵的样子:

1
复制代码

X.head()

3408605304626ffe7f34beb89f9631cf6ed23d48

显然,Geography和Gender这两列都从原先描述类别的字符串,变成了数字。

这样是不是就完事大吉了呢?显然,Geography和Gender这两列都从原先描述类别的字符串,变成了数字。

不对,Gender还好说,只有两种取值方式,要么是男,要么是女。我们可以把“是男性”定义为1,那么女性就取值为0。两种取值只是描述类别不同,没有歧义。

而Geography就不同了。因为数据集里面可能的国家地区取值有3种,所以就转换成了0(法国)、1(德国)、2(西班牙)。问题是,这三者之间真的有序列(大小)关系吗?

答案自然是否定的。我们其实还是打算用数值描述分类而已。但是取值有数量的序列差异,就会给机器带来歧义。它并不清楚不同的取值只是某个国家的代码,可能会把这种大小关系带入模型计算,从而产生错误的结果。

解决这个问题,我们就需要引入OneHotEncoder。它也是Scikit-learn提供的一个类,可以帮助我们把类别的取值转变为多个变量组合表示。

咱们这个数据集里,可以把3个国家分别用3个数字组合来表示。例如法国从原先的0,变成(1, 0, 0),德国从1变成(0, 1, 0),而西班牙从2变成 (0, 0, 1)。

这样,再也不会出现0和1之外的数字来描述类别,从而避免机器产生误会,错把类别数字当成大小来计算了。

特征矩阵里面,我们只需要转换国别这一列。因为它在第1列的位置(从0开始计数),因而categorical_features只填写它的位置信息。

1
复制代码

onehotencoder = OneHotEncoder(categorical_features = [1])X = onehotencoder.fit_transform(X).toarray()

这时候,我们的特征矩阵数据框就被转换成了一个数组。注意所有被OneHotEncoder转换的列会排在最前面,然后才是那些保持原样的数据列。

我们只看转换后的第一行:

1
复制代码

X[0]

1
复制代码

array([ 1.00000000e+00, 0.00000000e+00, 0.00000000e+00, 6.19000000e+02, 0.00000000e+00, 4.20000000e+01, 2.00000000e+00, 0.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.01348880e+05])

这样,总算转换完毕了吧?

没有。

因为本例中,OneHotEncoder转换出来的3列数字,实际上是不独立的。给定其中两列的信息,你自己都可以计算出其中的第3列取值。

好比说,某一行的前两列数字是(0, 0),那么第三列肯定是1。因为这是转换规则决定的。3列里只能有1个是1,其余都是0。

如果你做过多元线性回归,应该知道这种情况下,我们是需要去掉其中一列,才能继续分析的。不然会落入“虚拟变量陷阱”(dummy variable trap)。

我们删掉第0列,避免掉进坑里。

1
复制代码

X = np.delete(X, [0], 1)

再次打印第一行:

1
复制代码

X[0]

1
复制代码

array([ 0.00000000e+00, 0.00000000e+00, 6.19000000e+02, 0.00000000e+00, 4.20000000e+01, 2.00000000e+00, 0.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.01348880e+05])

检查完毕,现在咱们的特征矩阵处理基本完成。

但是监督式学习,最重要的是有标签(label)数据。本例中的标签就是用户是否流失。我们目前的标签数据框,是这个样子的。

1
复制代码

y.head()

1
复制代码

0 11 02 13 04 0Name: Exited, dtype: int64

它是一个行向量,我们需要把它先转换成为列向量。你可以想象成把它“竖过来”。

1
复制代码

y = y[:, np.newaxis]y

1
复制代码

array([[1], [0], [1], …, [1], [1], [0]])

这样在后面训练的时候,他就可以和前面的特征矩阵一一对应来操作计算了。

既然标签代表了类别,我们也把它用OneHotEncoder转换,这样方便我们后面做分类学习。

1
复制代码

onehotencoder = OneHotEncoder()y = onehotencoder.fit_transform(y).toarray()

此时的标签变成两列数据,一列代表顾客存留,一列代表顾客流失。

1
复制代码

y

1
复制代码

array([[ 0., 1.], [ 1., 0.], [ 0., 1.], …, [ 0., 1.], [ 0., 1.], [ 1., 0.]])

总体的数据已经齐全了。但是我们不能把它们都用来训练。

这就好像老师不应该把考试题目拿来给学生做作业和练习一样。只有考学生没见过的题,才能区分学生是掌握了正确的解题方法,还是死记硬背了作业答案。

我们拿出20%的数据,放在一边,等着用来做测试。其余8000条数据用来训练机器学习模型。

1
复制代码

from sklearn.model_selection import train_test_splitX_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)

我们看看训练集的长度:

1
复制代码

len(X_train)

1
复制代码

8000

再看看测试集的长度:

1
复制代码

len(X_test)

1
复制代码

2000

确认无误。

是不是可以开始机器学习了?

可以,但是下面这一步也很关键。我们需要把数据进行标准化处理。因为原先每一列数字的取值范围都各不相同,因此有的列方差要远远大于其他列。这样对机器来说,也是很困扰的。数据的标准化处理,可以在保持列内数据多样性的同时,尽量减少不同类别之间差异的影响,可以让机器公平对待全部特征。

我们调用Scikit-learn的StandardScaler类来完成这一过程。

1
复制代码

from sklearn.preprocessing import StandardScalersc = StandardScaler()X_train = sc.fit_transform(X_train)X_test = sc.transform(X_test)

注意,我们只对特征矩阵做标准化,标签是不能动的。另外训练集和测试集需要按照统一的标准变化。所以你看,训练集上,我们用了fit_transform函数,先拟合后转换;而在测试集上,我们直接用训练集拟合的结果,只做转换。

1
复制代码

X_train

1
复制代码

array([[-0.5698444 , 1.74309049, 0.16958176, …, 0.64259497, -1.03227043, 1.10643166], [ 1.75486502, -0.57369368, -2.30455945, …, 0.64259497, 0.9687384 , -0.74866447], [-0.5698444 , -0.57369368, -1.19119591, …, 0.64259497, -1.03227043, 1.48533467],
…, [-0.5698444 , -0.57369368, 0.9015152 , …, 0.64259497, -1.03227043, 1.41231994], [-0.5698444 , 1.74309049, -0.62420521, …, 0.64259497, 0.9687384 , 0.84432121], [ 1.75486502, -0.57369368, -0.28401079, …, 0.64259497, -1.03227043, 0.32472465]])

你会发现,许多列的方差比原先小得多。机器学习起来,会更加方便。

数据清理和转换工作至此完成。

决策树

如果读过我的《贷还是不贷:如何用Python和机器学习帮你决策?》一文,你应该有一种感觉——这个问题和贷款审批决策很像啊!既然在该文中,决策树很好使,我们继续用决策树不就好了?

好的,我们先测试一下经典机器学习算法表现如何。

从Scikit-learn中,读入决策树工具。然后拟合训练集数据。

1
复制代码

from sklearn import treeclf = tree.DecisionTreeClassifier()clf = clf.fit(X_train, y_train)

然后,利用我们建立的决策树模型做出预测。

1
复制代码

y_pred = clf.predict(X_test)

打印预测结果:

1
复制代码

y_pred

1
复制代码

array([[ 1., 0.], [ 0., 1.], [ 1., 0.], …, [ 1., 0.], [ 1., 0.], [ 0., 1.]])

这样看不出来什么。让我们调用Scikit-learn的classification_report模块,生成分析报告。

1
复制代码

from sklearn.metrics import classification_reportprint(classification_report(y_test, y_pred))

1
复制代码

precision recall f1-score support 0 0.89 0.86 0.87 1595 1 0.51 0.58 0.54 405avg / total 0.81 0.80 0.81 2000

经检测,决策树在咱们的数据集上,表现得还是不错的。总体的准确率为0.81,召回率为0.80,f1分数为0.81,已经很高了。对10个客户做流失可能性判断,它有8次都能判断正确。

但是,这样是否足够?

我们或许可以调整决策树的参数做优化,尝试改进预测结果。

或者我们可以采用深度学习。

深度

深度学习的使用场景,往往是因为原有的模型经典机器学习模型过于简单,无法把握复杂数据特性。

我不准备给你讲一堆数学公式,咱们动手做个实验。

请你打开这个网址。

你会看到如下图所示的深度学习游乐场:

df02e7b2e520edf9c3e08763e05fc556da685756

右侧的图形,里面是蓝色数据,外圈是黄色数据。你的任务就是要用模型分类两种不同数据。

你说那还不容易?我一眼就看出来了。

你看出来没有用。通过你的设置,让机器也能正确区分,才算数。

图中你看到许多加减号。咱们就通过操纵它们来玩儿一玩儿模型。

首先,点图中部上方的”2 HIDDEN LAYERS”左侧减号,把中间隐藏层数降低为1。

6bed67ad1e7b342be816ee732912603d4ae77a08

然后,点击”2 neurons”上面的减号,把神经元数量减少为1。

把页面上方的Activation函数下拉框打开,选择“Sigmoid”。

现在的模型,其实就是经典的逻辑回归(Logistic Regression)。

d1d5b32e1db2a2157b8094b571b523b4110bbd90

点击左上方的运行按钮,我们看看执行效果。

292adae7d203f937e17c95f974651bf7a3126f7f

由于模型过于简单,所以机器绞尽脑汁,试图用一条直线切分二维平面上的两类节点。

损失(loss)居高不下。训练集和测试集损失都在0.4左右,显然不符合我们的分类需求。

下面我们试试增加层数和神经元数量。这次点击加号,把隐藏层数加回到2,两层神经元数量都取2。

c8ac22aeb8d6e925bc11989d6b9f08ecd09da5ae

再次点击运行。

经过一段时间,结果稳定了下来,你发现这次电脑用了两条线,把平面切分成了3部分。

b8474cbb4fa03750cb961f1385ba24ff7ba6571d

测试集损失下降到了0.25左右,而训练集损失更是降低到了0.2以下。

模型复杂了,效果似乎更好一些。

再接再厉,我们把第一个隐藏层的神经元数量增加为4看看。

d76547064c85dfc20eaf66d14628fcea0de7d005

点击运行,不一会儿有趣的事情就发生了。

40a80fc71a80675e75f012543981c792ade4d7c0

机器用一条近乎完美的曲线把平面分成了内外两个部分。测试集和训练集损失都极速下降,训练集损失甚至接近于0。

这告诉我们,许多时候模型过于简单带来的问题,可以通过加深隐藏层次、增加神经元的方法提升模型复杂度,加以改进。

目前流行的划分方法,是用隐藏层的数量多少来区分是否“深度”。当神经网络中隐藏层数量达到3层以上时,就被称为“深度神经网络”,或者“深度学习”。

久闻大名的深度学习,原来就是这么简单。

如果有时间的话,建议你自己在这个游乐场里多动手玩儿一玩儿。你会很快对神经网络和深度学习有个感性认识。

框架

游乐场背后使用的引擎,就是Google的深度学习框架Tensorflow。

所谓框架,就是别人帮你构造好的基础软件应用。你可以通过调用它们,避免自己重复发明轮子,大幅度节省时间,提升效率。

支持Python语言的深度学习的框架有很多,除了Tensorflow外,还有PyTorch, Theano和MXNet等。

我给你的建议是,找到一个你喜欢的软件包,深入学习使用,不断实践来提升自己的技能。千万不要跟别人争论哪个深度学习框架更好。一来萝卜白菜各有所爱,每个人都有自己的偏好;二来深度学习的江湖水很深,言多有失。说错了话,别的门派可能会不高兴哟。

我比较喜欢Tensorflow。但是Tensorflow本身是个底层库。虽然随着版本的更迭,界面越来越易用。但是对初学者来说,许多细节依然有些过于琐碎,不容易掌握。

初学者的耐心有限,挫折过多容易放弃。

幸好,还有几个高度抽象框架,是建立在Tensorflow之上的。如果你的任务是应用现成的深度学习模型,那么这些框架会给你带来非常大的便利。

这些框架包括Keras, TensorLayer等。咱们今天将要使用的,叫做TFlearn。

它的特点,就是长得很像Scikit-learn。这样如果你熟悉经典机器学习模型,学起来会特别轻松省力。

实战

闲话就说这么多,下面咱们继续写代码吧。

写代码之前,请回到终端下,运行以下命令,安装几个软件包:

1
复制代码

pip install tensorflowpip install tflearn

执行完毕后,回到Notebook里。

我们呼叫tflearn框架。

1
复制代码

import tflearn

然后,我们开始搭积木一样,搭神经网络层。

首先是输入层。

1
复制代码

net = tflearn.input_data(shape=[None, 11])

注意这里的写法,因为我们输入的数据,是特征矩阵。而经过我们处理后,特征矩阵现在有11列,因此shape的第二项写11。

shape的第一项,None,指的是我们要输入的特征矩阵行数。因为我们现在是搭建模型,后面特征矩阵有可能一次输入,有可能分成组块输入,长度可大可小,无法事先确定。所以这里填None。tflearn会在我们实际执行训练的时候,自己读入特征矩阵的尺寸,来处理这个数值。

下面我们搭建隐藏层。这里我们要使用深度学习,搭建3层。

1
复制代码

net = tflearn.fully_connected(net, 6, activation=’relu’)net = tflearn.fully_connected(net, 6, activation=’relu’)net = tflearn.fully_connected(net, 6, activation=’relu’)

activation刚才在深度学习游乐场里面我们遇到过,代表激活函数。如果没有它,所有的输入输出都是线性关系。

Relu函数是激活函数的一种。它大概长这个样子。

ce8dfdbdf8a2b2cfeda704fd2b5b517414981429

如果你想了解激活函数的更多知识,请参考后文的学习资源部分。

隐藏层里,每一层我们都设置了6个神经元。其实至今为之,也不存在最优神经元数量的计算公式。工程界的一种做法,是把输入层的神经元数量,加上输出层神经元数量,除以2取整。咱们这里就是用的这种方法,得出6个。

搭好了3个中间隐藏层,下面我们来搭建输出层。

1
复制代码

net = tflearn.fully_connected(net, 2, activation=’softmax’)net = tflearn.regression(net)

这里我们用两个神经元做输出,并且说明使用回归方法。输出层选用的激活函数为softmax。处理分类任务的时候,softmax比较合适。它会告诉我们每一类的可能性,其中数值最高的,可以作为我们的分类结果。

积木搭完了,下面我们告诉TFlearn,以刚刚搭建的结构,生成模型。

1
复制代码

model = tflearn.DNN(net)

有了模型,我们就可以使用拟合功能了。你看是不是跟Scikit-learn的使用方法很相似呢?

1
复制代码

model.fit(X_train, y_train, n_epoch=30, batch_size=32, show_metric=True)

注意这里多了几个参数,我们来解释一下。

``n_epoch:数据训练几个轮次。batch_size:每一次输入给模型的数据行数。show_metric:训练过程中要不要打印结果。
以下就是电脑输出的最终训练结果。其实中间运行过程看着更激动人心,你自己试一下就知道了。

1
复制代码

Training Step: 7499 | total loss: [1m[32m0.39757[0m[0m | time: 0.656s| Adam | epoch: 030 | loss: 0.39757 - acc: 0.8493 – iter: 7968/8000Training Step: 7500 | total loss: [1m[32m0.40385[0m[0m | time: 0.659s| Adam | epoch: 030 | loss: 0.40385 - acc:
0.8487 – iter: 8000/8000–

我们看到训练集的损失(loss)大概为0.4左右。

打开终端,我们输入

1
复制代码

tensorboard –logdir=/tmp/tflearn_logs/

然后在浏览器里输入http://localhost:6006/

可以看到如下界面:

1cb028fd2b76cb86c8934a15132593caafb422f9

这是模型训练过程的可视化图形,可以看到准确度的攀升和损失降低的曲线。

打开GRAPHS标签页,我们可以查看神经网络的结构图形。

78f3b81e9449d099952f7993dde2d59b8730a3e1

我们搭积木的过程,在此处一目了然。

评估

训练好了模型,我们来尝试做个预测吧。

看看测试集的特征矩阵第一行。

1
复制代码

X_test[0]

1
复制代码

array([ 1.75486502, -0.57369368, -0.55204276, -1.09168714, -0.36890377, 1.04473698, 0.8793029 , -0.92159124, 0.64259497, 0.9687384 , 1.61085707])

我们就用它来预测一下分类结果。

1
复制代码

y_pred = model.predict(X_test)

打印出来看看:

1
复制代码

y_pred[0]

1
复制代码

array([ 0.70956731, 0.29043278], dtype=float32)

模型判断该客户不流失的可能性为0.70956731。

我们看看实际标签数据:

1
复制代码

y_test[0]

1
复制代码

array([ 1., 0.])

客户果然没有流失。这个预测是对的。

但是一个数据的预测正确与否,是无法说明问题的。我们下面跑整个测试集,并且使用evaluate函数评价模型。

1
复制代码

score = model.evaluate(X_test, y_test)print(‘Test accuarcy: %0.4f%%’ % (score[0] * 100))

1
复制代码

Test accuarcy: 84.1500%

在测试集上,准确性达到84.15%,好样的!

希望在你的努力下,机器做出的准确判断可以帮助银行有效锁定可能流失的客户,降低客户的流失率,继续日进斗金。

说明

你可能觉得,深度学习也没有什么厉害的嘛。原先的决策树算法,那么简单就能实现,也可以达到80%以上的准确度。写了这么多语句,深度学习结果也无非只提升了几个百分点而已。

首先,准确度达到某种高度后,提升是不容易的。这就好像学生考试,从不及格到及格,付出的努力并不需要很高;从95分提升到100,却是许多人一辈子也没有完成的目标。

其次,在某些领域里,1%的提升意味着以百万美元计的利润,或者几千个人的生命因此得到拯救。

第三,深度学习的崛起,是因为大数据的环境。在许多情况下,数据越多,深度学习的优势就越明显。本例中只有10000条记录,与“大数据”的规模还相去甚远。

原文发布时间为:2017-11-27

本文作者:王树义

本文来自云栖社区合作伙伴“数据派THU”,了解相关信息可以关注“数据派THU”微信公众号

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

并发番AbstractQueuedSynchronizer

发表于 2017-11-28

并发番@AbstractQueuedSynchronizer一文通

并发 1.8版


  • 并发番@AbstractQueuedSynchronizer一文通
    • 1.AQS概述
      • 1.1 AQS综述
      • 1.2 AQS组件
      • 1.3 AQS核心功能
        • 1.3.1 加锁
        • 1.3.2 解锁
        • 1.3.3 独占与共享
        • 1.3.4 中断与超时
      • 1.4 AQS使用
    • 2.AQS组成
      • 2.1 类定义
      • 2.2 构造器
      • 2.3 重要变量
      • 2.4 内部类
      • 2.5 UnSafe
    • 3.Node节点
      • 3.1 节点概述
      • 3.2 节点类
      • 3.2 节点状态
    • 4.状态维护
    • 5.同步队列
      • 5.1 同步队列入队
      • 5.2 同步队列出队
        • 5.2.1 独占模式成功获取锁后出队
        • 5.2.2 共享模式成功获取锁后出队
        • 5.2.3 因中断/超时获取锁失败后出队
    • 6.独占模式
      • 6.1 独占式获取锁
        • 6.1.1 不响应中断获取锁
        • 6.1.2 响应中断获取锁
        • 6.1.3 响应超时与中断获取锁
      • 6.2 独占式释放
    • 7.共享模式
      • 7.1 共享式获取锁
        • 7.1.1 不响应中断获取锁
        • 7.1.2 响应中断获取锁
        • 7.1.3 响应超时与中断获取锁
      • 7.2 共享式释放
    • 8.线程中断、阻塞、唤醒
      • 8.1 shouldParkAfterFailedAcquire
      • 8.2 parkAndCheckInterrupt
      • 8.3 selfInterrupt
      • 8.4 unparkSuccessor
    • 9.题外话
  • 推荐先阅读笔者的 并发番@AQS框架一文通 一文
  • 感谢支持 笔者掘金专栏博文

1.AQS概述

1.1 AQS综述

AQS(队列同步器)是一个用来构建锁和同步器的框架,Doug Lea期望其作为大部分同步需求的基础:

作用: AQS是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义

功能: AQS框架提供实现阻塞锁和依赖FIFO等待队列的关联同步器

按需调用: AQS并不会实现任何同步接口,相反仅是提供一些方法以便具体锁和关联同步器按需调用

适用: 该类适用于依赖单个原子int变量表示同步状态的多种形式的同步器

原理: 内部使用一个volatile int变量表示同步状态,通过FIFO同步队列实现资源获取线程的排队工作,通过UnSafe实现底层的等待与唤醒操作,通过ConditionObject实现条件变量的使用

1.2 AQS组件

整个AQS由4个核心组件构成:
1.同步状态: 即「volatile int State」变量,该状态其实质就是可用资源数(因此是数值而不是布尔值)
2.Node节点: 队列操作的基本元素,线程入队前会先被封装成Node节点,其会记录队列操作所需的重要属性
3.同步队列: FIFO等待队列,CLH锁的变种实现并同时支持独占和共享模式,当线程获取锁失败会进入同步队列中等待,成功获取锁或因中断、异常等原因获取锁失败时出队
4.条件队列: 只用于独占模式, 且使用Condition的前提是线程已经获取到锁,并发番@ConditionObject一文通

1.3 AQS核心功能

1.3.1 加锁

AQS中加锁的基本流程如下(以独占模式为例):

- 论文版:

1
2
3
4
5
6
7
8
9
复制代码//循环判断同步状态是否可取
while (synchronization state does not allow acquire) {
//不可取时,线程入同步队列(若尚未进入同步队列)
enqueue current thread if not already queued;
//阻塞当前线程
possibly block current thread;
}
//成功获取锁后出队(当然异常失败也需要出队)
dequeue current thread if it was queued;

- 实现版:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码//1.tryAcquire会CAS更新State,更新成功获取锁,否则进入同步队列以自旋方式获取锁
if(!tryAcquire(arg)){
//自旋方式获取锁
for (;;) {
//2.若当前节点的前驱节点为head, 则再次尝试获取锁
if(node.prev == head && tryAcquire(arg)){
//3.获取锁后重设head,共享模式下需要传播唤醒后继节点
setHead(node);
}
//4.获取锁失败,则安全更新前驱节点的waitStatus的值为SINGAL并对当前节点的线程进行阻塞
CAS(node,waitStatus,SINGAL) && park(node)
}
}

1.3.2 解锁

AQS中解锁的基本流程如下(以独占模式为例):

- 论文版:

1
2
3
4
5
6
复制代码//更新同步状态
update synchronization state;
//判断同步状态是否可取
if (state may permit a blocked thread to acquire)
//同步状态可取则从同步队列中释放阻塞线程
unblock one or more queued threads;

- 实现版:

1
2
3
4
5
6
复制代码//1.tryRelease会CAS更新State为0,成功返回true
if(tryRelease(arg)){
//2.若是SIGNAL,则将waitStatus回归为0同时唤醒后继节点
if (node.waitStatus != 0)
CAS(node,waitStatus,0) && unpark(node.next);
}

1.3.3 独占与共享

AQS同时支持两种模式,分别是独占模式和共享模式:
独占模式: 即只有一个线程能持有锁(单资源,排他性),AQS提供「acquire」「release」方法
共享模式: 即有多个线程能持有锁(多资源),AQS提供「acquireShared」「releaseShared」方法

1.3.4 中断与超时

AQS同时新增对中断和超时的响应支持,同时也区分独占模式和共享模式:
独占模式: 即只有一个线程能持有锁(单资源),AQS提供「acquireInterruptibly」「tryAcquireNanos」方法
共享模式: 即有多个线程能持有锁(多资源),AQS提供「acquireSharedInterruptibly」「tryAcquireSharedNanos」方法

1.4 AQS使用

自定义同步器只需实现「State」的获取和释放即可,即如下模板方法,状态维护和队列管理等已由AQS实现:
实现独占: 「tryAcquire」「tryRelease」「isHeldExclusively」
实现共享: 「tryAcquireShared」「tryReleaseShared」

补充: 在使用时使用者只需要实现独占和共享的其中一种即可(如「ReentrantLock」),当然也支持两者都实现(如「ReentrantReadWriteLock」)

2.AQS组成

2.1 类定义

1
2
3
复制代码public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable

2.2 构造器

1
复制代码protected AbstractQueuedSynchronizer() { }

2.3 重要变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码/**
* CLH锁同步队列的头节点,延迟初始化(懒加载)
* 除了初始化,其他时刻只能被setHead进行更新
* 注意:当head存在时,waitStatus不能是CANCELLED状态
*/
private transient volatile Node head;

/**
* CLH锁同步队列的头节点,延迟初始化(懒加载)
* 只能在入队时新增一个Node时进行更新
*/
private transient volatile Node tail;

/**
* 同步状态 - volatile保证其可见性
*/
private volatile int state;

2.4 内部类

AQS内包含两个内部类,分别是:
Node: 每个线程会被封装成一个Node节点,其中会记录线程、节点状态和前后节点等信息,详情请参见3.Node节点
ConditionObject: 条件变量,用于实现管程形式的条件控制,详情请参见并发番@AbstractQueuedSynchronized一文通

2.5 UnSafe

作用: 用于提供CAS原子更新操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
复制代码private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}

/**
* CAS原子更新head节点,仅用于同步队列的enq入队操作
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
* CAS原子更新tail节点,仅用于同步队列的enq入队操作
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

/**
* CAS更新原子更新Node节点的waitStatus变量
*/
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}

/**
* CAS next field of a node.原子更新Node节点的next变量,仅用于cancelAcquire取消获取操作
*/
private static final boolean compareAndSetNext(Node node,Node expect,Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

3.Node节点

3.1 节点概述

待入队线程会在入队前被封装成一个Node节点,其中会记录线程、节点状态和前后节点等信息
作用: AQS框架的变种CLH锁借由Node组成的「FIFO双向链表队列」实现
链接: 每个Node通过「pred」链接其前驱节点,通过「next」链接其后继节点
条件支持: 每个Node同时会通过「nextWaiter」提供对「Condition」的支持
模式: 每个Node都可以支持独占「EXCLUSIVE」或共享「SHARED」模式
初始化: CLH锁只有在第一次入队时(即第一次出现竞争时)会初始化「Head」和「Tail」,主要是性能考究(默认少竞争)

3.2 节点类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
复制代码static final class Node {

/** 标记共享模式 */
static final Node SHARED = new Node();

/** 标记独占模式 */
static final Node EXCLUSIVE = null;

/** 标记节点被取消 */
static final int CANCELLED = 1;

/** 标记后继节点需要被唤醒 */
static final int SIGNAL = -1;

/** 标记节点位于条件阻塞队列中 */
static final int CONDITION = -2;

/**
* 标记共享模式下,节点共享状态正在被传播(acquireShared)
* 当前节点获得锁或释放锁时, 共享模式下节点的最终状态是 PROPAGATE
*/
static final int PROPAGATE = -3;

/**
* !!!重中之重!!!
* 标记节点状态,默认为0,负数无须唤醒,使用CAS原子更新
* 独占模式:SIGNAL、CANCEL、0
* 共享模式:SIGNAL、CANCEL、PROPAGATE、0
* 条件变量:CONDITION状态不会存在于CLH锁同步队列中,只用于条件阻塞队列
*/
volatile int waitStatus;

/**
* 在CLH锁同步队列中链接前驱节点,使用CAS原子更新,每次入队和GC出队时会被指派
* 当前驱节点被取消时,一定能找到一个未被取消的节点,因为Head节点永远不会被取消:头节点必须成功aquire
* 被取消的线程不会再次成功aquire,线程只能取消自己不会影响其他
* 主要作用是在循环中跳过CANCELLED状态的节点
*/
volatile Node prev;

/**
* 在CLH锁同步队列中链接后继节点,每次入队、前驱节点被取消以及GC出队时被指派
* 赋值操作非线程安全,next为null时并不意味着节点不存在后继节点
* 当next不为null时,next是可靠的
* 主要作用是在释放锁时对后继节点进行唤醒
*/
volatile Node next;

/** Node关联线程 */
volatile Thread thread;

/**
* 链接位于条件阻塞队列的节点或特定SHARED值
* 实际作用就是标记Node是共享模式还是独占模式
* 独占模式时为null,共享模式时为SHARED
* 在条件阻塞队列中指向下一个节点
*/
Node nextWaiter;

/**
* 判断Node是否为共享模式
* @Return true 是 false 不是
*/
final boolean isShared() {
//当是共享模式时,nextWaiter就是SHARED值,独占模式就是null
return nextWaiter == SHARED;
}

/**
* 返回前驱节点,当前驱节点为空时直接抛空指针异常(实际上Head永远不会为null)
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
//空指针判断只要是为了help gc
if (p == null)
throw new NullPointerException();
else
return p;
}

//默认共享模式
Node() {}

// Used by addWaiter 用于CLH锁同步队列
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

// Used by Condition 用于条件阻塞队列
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

3.2 节点状态

每个Node都有持有对应的「线程ID」,并通过int类型的「waitStatus」标记节点状态
SIGNAL(1): 标记唤醒,当前节点被释放后必须唤醒后继节点
CANCELLED(-1): 标记已取消,当Node因超时或中断被取消,取消状态不可变且对应线程不可再次阻塞 : CONDITION(-2): 标记条件阻塞,即Node位于条件变量的阻塞队列中(或者说是条件阻塞队列)
PROPAGATE(-3): 标记传递中,仅用于标记位于同步队列的头节点,表示共享状态该正在被传递中
0: 默认为0,当为条件阻塞时默认为-2,非负数意味着无须唤醒,此值使用CAS原子更新

4.状态维护

「State」状态主要用于记录线程获取锁的次数,其实质就是可用资源数,所有操作的目的其实都是为了获得她的青睐
1.必须使用CAS对「State」状态进行原子更新
2.当「State」状态>0时,说明当前线程已持有该锁;当「State」状态=0时,说明当前线程无该锁
3.由于State可自增,因此可用于实现可重入,如「ReentrantLock.lock()」
4.独占模式中「State」的值最多为1 ,共享模式中「State」的值可以任意大(如「CountDownLatch」)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码/**
* 同步状态
*/
private volatile int state;
/**
* 返回当前同步状态,该操作具有volatile内存读语义 - 直接从主内存读取到最新值
*/
protected final int getState() {
return state;
}
/**
* 设置当前同步状态,该操作具有volatile内存写语义 - 一旦变更直接刷新到主内存
*/
protected final void setState(int newState) {
state = newState;
}
/**
* CAS更新当前同步状态,该操作同时具备volatile内存读-写语义
* @param expect 期望值
* @param update 新值
* @return {@code true} 成功返回true,失败返回false(即实际值!=期望值)
*/
protected final boolean compareAndSetState(int expect, int update) {
// 底层调用通过Unsafe调用CAS
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

5.同步队列

5.1 同步队列入队

同步队列入队主要涉及4个操作:
1.封装Node: 将当前线程封装成Node同时指定独占或共享模式
2.初始化: 当未初始化时,初始化头节点head和尾节点tail
3.队尾追加: 根据FIFO原则,新增节点会被追加到队尾
4.CAS更新tail: 通常将新增节点作为新的tail


小问:如何保证新增节点一定入队并且tail设置成功?
友情小提示:读者可以思考CAS结合重试的解决方案

小答:由于入队很可能发生并发竞争,为了处理这种情况,Doug Lea老师使用了经典的并发解决方案-
自旋CAS变更volatile变量:通过反复重试结合CAS的方式保证至少有一次能入队成功,一次不行就再来一次


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
复制代码/**
* 创建和入队一个新node
* 根据FIFO,新增的追加到队尾并被设置为tail
*
* @param mode Node.EXCLUSIVE for exclusive 独占, Node.SHARED for shared 共享
* @return the new node 返回新Node
*/
private Node addWaiter(Node mode) {

//1.当前线程封装成Node,并指定独占或共享模式
Node node = new Node(Thread.currentThread(), mode);

//记录原tail -> 根据FIFO,新增的追加到队尾并被设置为tail
Node pred = tail;

/**
* 当pred为null时,说明还未初始化,应该直接走enq方法完成初始化
* if代码块是快速入队的一个优化:仅当CAS操作失败才会进入enq方法进行自旋
*/
if (pred != null) {
node.prev = pred;

/**
* CAS更新tail -> 将新增节点设置为tail
* 允许CAS失败,如果失败直接进入enq,采用自旋方式入队
*/
if (compareAndSetTail(pred, node)) {

//将原tail节点的next链接指向当前节点
pred.next = node;
return node;
}
}

//采用自旋方式入队
enq(node);
return node;
}

/**
* 向队尾插入一个节点,未初始化(tail为null)时完成初始化
* 必须CAS更新Head和Tail
* @param node the node to insert 待插入节点
* @return node's predecessor 待插入节点的前驱节点(即原tail)
*/
private Node enq(final Node node) {

//自旋方式入队
for (;;) {
Node t = tail;

/**
* 2.初始化 -> 当tail为null时就意味着队列为空
* - 初始化时会同时生成head和tail
* - 其中head一直作为dummy节点存在,不需要存储thread,
* 但需要记录waitStatus字段,以作为唤醒后继节点的依据
* - 其中tail一直作为实体节点存在,会存储thread
*/
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {

/**
* 3.新节点追加到队尾,关于prev有三个重要论点:
* 一.Node使用prev(前驱节点)作为形成链的根本依据
* 二.当节点位于同步队列中,prev一定非空
* 三.但prev非空并不意味着节点位于同步队列中
* 因为发生竞争时CAS更新tail是允许失败的,一旦CAS失败就再自旋一次
* 当CAS更新tail失败时,由于节点只是将prev指向tail但并没有设置tail成功
* 此时并不能算作真正的入队(原因在于后面获取和释放操作都是基于tail的)
*/
node.prev = t;

/**
* 4.CAS更新tail,关于next有三个重要论点:
* 一.Node使用next作为形成链的一种优化辅助手段
* 二.当next非空,节点一定存在同步队列中
* 三.但节点存在同步队列时,next不一定非空
*/
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

小问:为神马说next是非线程安全的?有什么隐患吗?
友情小提示:读者可以从tail的更新时机角度考虑

小答:有心的读者会发现在执行CAS更新tail成功之后才会执行t.next=node,此时节点已经真正入队;
但问题是在并发情况下,由于t.next=node非CAS操作,因此是非线程安全的;
但由于后续操作是依赖于tail的,next更多是个优化,因此即使非安全也没关系


小问:为神马要加入一个 dummy 节点(head)?
友情小提示:读者可以从CLH锁变种考虑

小答:原因是同步队列是CLH锁的一个变种
1.线程节点能否获取锁的判断就是通过其前继节点的状态,当前节点若想获取锁需要给前驱节点设置为SIGNAL状态,作用是当前驱节点释放锁后能通知其后继节点去获取锁
2.head节点用来表示当前已获得锁的节点,其无须存储线程,它的核心功能是作为必然存在的前驱节点通过记录waitStatus状态作为是否需要唤醒后继节点的判断依据


5.2 同步队列出队

5.2.1 独占模式成功获取锁后出队

独占模式成功获取锁出队会做三个操作:
1.设置Head: Head指向成功获取锁(即待出队)的节点,作为新的头节点
2.清空当前节点引用: 待出队的节点需要将thread和prev属性设置为null,help gc
3.清空前驱节点next: 由于当前节点需要被GC,因此也要清除其前驱节点的next

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node 成功获取锁的节点(即待出队节点)
*/
private void setHead(Node node) {
// 1.head指向待出队节点
head = node;
// 2.清空当前节点引用
node.thread = null;
node.prev = null;
}
// 3.清空前驱节点next
p.next = null;

小问:为神马要清空节点引用??
友情小提示:读者可以从Node的内部变量的作用以及head作用在同步队列中的作用这方面去考虑

小答:Head在同步队列中的定位就是作为待出队节点的一个状态记录点,以作为唤醒后继节点的依据
1.其核心在于waitStatus的值(当为SIGNAL时才会去唤醒后继节点),因此Node的其他属性并不重要,Head更多是虚拟节点的存在,只存储waitStatus即可,以作为唤醒后继的依据
2.同时清空thread、prev、prev.next有利于GC回收待出队的节点,因为该节点在给head设置完waitStatus之后就完成了使命,线程可以出队了


5.2.2 共享模式成功获取锁后出队

共享模式成功获取锁出队会做两个操作:
1.设置Head: 设置头节点,同时检测在共享模式下是否有后继者等待获取锁
2.向后传播唤醒: 如果存在,则在满足(propagate > 0 或 节点状态为PROPAGATE)时传播唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
复制代码/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* 共享模式成功获取锁出队
* 1.设置头节点,同时检测在共享模式下是否有后继者等待获取锁
* 2.如果存在,则在满足(propagate > 0 或 节点状态为PROPAGATE)时传播唤醒
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {

//设置头节点
Node h = head; // Record old head for check below
setHead(node);

/**
* 这里做了很多逻辑或判断,主要目的就是为了判断是否还有剩余资源以唤醒后续线程节点
* 1.propagate > 0 :意味着还有剩余资源(state>0),共享时当然需要继续唤醒
* 2.h == null : 头节点为空(这个比较诡异-we don't know, because it appears null)
* 3.h.waitStatus <0 : 头节点状态为负数(尤其是PROPAGATE),说明需要唤醒后继节点
*
* 注意: 这些保守的检查,在竞争环境下获取/释放锁可能会导致不必要的多次唤醒
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;

//注意:一旦遇到节点是独占模式,即使propagate>0也会停止往后传播啦
if (s == null || s.isShared())

// 唤醒后继结点
doReleaseShared();
}
}

小问:何时会出现h==null的情况?
友情小提示:这个可能需要测试极端环境,如果有读者有想法的欢迎留言指导笔者一二


5.2.3 因中断/超时获取锁失败后出队

友情小提示:建议在看完aquire操作之后再回看5.2.2

因中断/超时而放弃获取锁失败的出队会做三个操作:
1.清空线程引用:清空当前节点的线程引用,便于当前节点出队和GC
2.寻找非取消状态前驱节点:沿prev反向遍历直到找到一个非取消状态前驱节点
3.设置CANCELLED状态:核心步骤,失败的节点需要设置为取消状态
4.清除、重置、唤醒:见步骤4-8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
复制代码/**
* Cancels an ongoing attempt to acquire.
*
* 清除因中断/超时而放弃获取lock的线程节点
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

// 1.清空线程引用
node.thread = null;

// Skip cancelled predecessors 跳过所有取消状态前驱节点
Node pred = node.prev;

// 2.沿prev反向遍历直到找到一个非取消状态前驱节点,同时也顺便清除途中出现的取消状态节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 注意predNext也是需要清除的
Node predNext = pred.next;

/**
* 3. 节点状态需要设置为CANCELLED,即标记为可回收的无用节点
* CANCELLED状态的节点之后除了被清除不会再参与任何操作,等同于"垃圾"了
*/
node.waitStatus = Node.CANCELLED;

/**
* 4.若需要清除的节点恰好是尾节点,需要将前驱节点CAS设置为新的尾节点
* 允许CAS失败,一旦失败会转而执行步骤6或8
*/
if (node == tail && compareAndSetTail(node, pred)) {

/**
* 5.删除节点predNext -> 因为当前节点已经无用了
* 允许CAS失败即使失败影响也不大,next更多是优化手段,prev才是根本的判断依据
*/
compareAndSetNext(pred, predNext, null);
} else {

int ws;

/**
* 6.这里会对next重置,同时会涉及多个判断,主要由三个逻辑与条件组成:
* 一.前驱节点非头节点 -> 步骤8会有解释
* 二.前驱节点为SIGNAL状态 或 非取消时CAS设置为SINGAL成功 -> 即最终前驱节点必须是SIGNAL
* 三.前驱节点的线程非空 -> 线程若为空,那么问题来了,谁想获取锁,卧底吗???
* 补充一点:head是不会储存thread的,因此之前会有个pred!=head的判断
*/
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {

/**
* 7.若后继节点waitStatus非取消状态,说明后继节点是想获取锁的
* 此时需要将next重置,主要目的是断开与当前节点的链接,建立新的链接
* 允许CAS失败,即使失败影响也不大,next更多是优化手段,prev才是根本的判断依据
*/
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {

// 8.若前驱节点正好是head节点或是取消状态,进入唤醒步骤,具体参见8.4 unparkSuccessor
unparkSuccessor(node);
}
node.next = node; // help GC
}
}

6.独占模式

6.1 独占式获取锁

独占式获取锁总共有三种方式:
1.不响应中断获取锁-acquire: 不响应中断指的是线程获取锁时被中断后能被重新唤醒并继续获取锁,在方法返回后会根据中断状态决定是否重新设置中断
2.响应中断获取锁-acquireInterruptibly: 响应中断指的是当线程获取锁时被中断会立即抛出异常,获取失败
3.响应中断和超时获取锁-tryAcquireNanos-: 处理方式等同响应中断获取,区别是多了超时后直接返回fasle,获取失败
(核心重点)独占式如何成功获取锁:
1.调用tryAcquire方法成功时才能成功获取锁
2.其他所有手段(比如同步队列、CAS自旋volatile变量等)全部是为了辅助调用tryAcquire方法
3.tryAcquire方法中会通过对state进行CAS操作判断是否能够获取锁,即获取锁的根源在于state的值

6.1.1 不响应中断获取锁

不响应中断主要遵循如下四步:
1.tryAcquire: 初次调用子类自实现的tryAcquire方法获取锁,成功即获得锁,否则进入第二步
2.addWaiter: 当前线程封装为独占Node并进入同步队列,等待前驱节点的SIGNAL状态并进入第三步
3.acquireQueued: 自旋获取锁直到调用tryAquire成功获取锁为止(前驱节点为Head时就会尝试调用tryAcquire),自旋过程中可能多次阻塞和解除阻塞,值得注意的是park是进入等待状态
4.selfInterrupt: 若线程获取锁途中被中断,当成功获取锁后,由于中断状态被中途清除,需要补中断状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
复制代码/**
* Acquires in exclusive mode, ignoring interrupts. Implemented by invoking
* at least once {@link #tryAcquire},returning on success.
* Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquire} until success.
* This method can be used to implement method {@link Lock#lock}.
*
* 独占模式获取锁,不响应中断
* 1.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
* 2.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁
* 3.该方法可用于实现Lock接口的lock方法
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {

//若初次调用tryAcquire失败需要封装成独占Node并加入到同步队列中
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

//若线程成功获取锁但之前已被中断,由于中断状态被中途清除,需要再次设置中断状态
selfInterrupt();
}

/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* 独占式不响应中断获取锁(当线程已在同步队列中)
* 同时条件队列的wait方法也是使用该方法执行独占式不响应中断获取锁操作
*
* @param node the node 当前线程节点
* @param arg the acquire argument 期望state状态值,通常是1
* @return {@code true} if interrupted while waiting 当获取锁过程中被中断返回true
*/
final boolean acquireQueued(final Node node, int arg) {

//记录获取锁是否失败
boolean failed = true;
try {

/**
* 记录获取锁过程中是否被中断
*/
boolean interrupted = false;

//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();

//前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
if (p == head && tryAcquire(arg)) {

//若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
setHead(node);

/**
* 由于setHead中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null; // help GC
failed = false;

//返回获取锁途中是否被中断过
return interrupted;
}

/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())

//途中哪怕只被中断一次也要设置中断为true
interrupted = true;
}
} finally {

//失败处理
if (failed)
cancelAcquire(node);
}
}

小问:为神马前驱节点为head时需要再次尝试获取锁呢?
友情小提示:读者可以从前驱节点为head时的锁获取情况去考虑

小答:因为当前驱节点为head时会有两种情况:
1.前驱节点已成功获取锁并正在占用该锁,但可能很快释放
2.前继节点是空节点, 此时已经释放锁, 因此后继节点就有机会获取锁了


6.1.2 响应中断获取锁

与不响应中断获取锁相比,响应中断获取锁只有两个区别:
1.当获取锁的过程中发生中断,立即抛出中断异常,然后进入finally处理失败
2.同时少了设置中断状态的步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
复制代码/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on success.
* Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* 独占式响应中断获取锁
* 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
* 2.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
* 3.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁或发生中断
* 4.该方法可用于实现Lock接口的lockInterruptibly方法
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {

/**
* 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
* 注意静态方法会清除中断标识
*/
if (Thread.interrupted())
throw new InterruptedException();

//若初次调用tryAcquire失败需要封装成独占Node并进入同步队列
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

/**
* Acquires in exclusive interruptible mode.
*
* 独占式响应中断超时获取锁
*
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {

//当前线程封装成Node
final Node node = addWaiter(Node.EXCLUSIVE);

//记录获取锁是否失败
boolean failed = true;

try {

//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();

//前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
if (p == head && tryAcquire(arg)) {

//若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
setHead(node);

/**
* 由于setHead中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null; // help GC
failed = false;
return;
}

/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
* 一旦发生中断,立即抛出异常,停止自旋,然后进入finally处理失败
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {

//失败处理(如线程中断)
if (failed)
cancelAcquire(node);
}
}

小问: 若 if (p == head && tryAcquire(arg)) { //恰好在执行到该方法内部时发生中断 },此时会如何处理中断?
友情小提示:读者可以从线程状态角度去考虑

小答:此时该中断会被忽略,原因是该线程目前是运行态,而运行态是不响应中断的
友情推荐:关于中断响应机制读者可参看笔者的 并发番@Thread一文通(1.7版)


6.1.3 响应超时与中断获取锁

与响应中断获取锁相比,响应超时与中断获取锁有三个区别:
1.若获取锁的过程中超过超时阈值,会先执行超时阻塞;否则会先再次自旋
2.一旦超时立即返回fasle,然后进入finally处理失败
3.有返回值返回是否获取锁,成功返回true,失败返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
复制代码/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
*
* 独占式响应中断超时获取锁
* 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
* 2.若获取锁超时会立即返回,即获取锁失败
* 3.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
* 4.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁
* 或发生中断或超时
* 5.该方法可用于实现Lock接口的tryLock(long, TimeUnit)方法
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {

/**
* 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
* 注意静态方法会清除中断标识
*/
if (Thread.interrupted())
throw new InterruptedException();

/**
* 若初次调用tryAcquire失败需要封装成独占Node并进入同步队列
* 由于超时特性,会返回布尔值告知外部是否成功获取锁或获取锁是否超时
*/
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

/**
* Acquires in exclusive timed mode.
*
* 独占式响应中断超时获取锁
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {

//超时时间必须有效
if (nanosTimeout <= 0L)
return false;

//超时截止时间
final long deadline = System.nanoTime() + nanosTimeout;

//当前线程封装成独占模式Node
final Node node = addWaiter(Node.EXCLUSIVE);

//记录获取锁是否失败
boolean failed = true;
try {

//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();

//前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
if (p == head && tryAcquire(arg)) {

//若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
setHead(node);

/**
* 由于setHead中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null; // help GC
failed = false;

//成功获取锁需要返回成功标识,通知外部调用成功
return true;
}

//剩余时间
nanosTimeout = deadline - System.nanoTime();

//超时立即返回false,同时会在finally中执行失败处理
if (nanosTimeout <= 0L)
return false;

/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
* 若同时剩余时间超过自旋时间阈值(默认1000L)即超时阻塞
*/
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);

//若线程在获取锁过程中被中断,立即抛出异常,同时会在finally中执行失败处理
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
//失败处理(如线程中断或超时)
if (failed)
cancelAcquire(node);
}
}

小问:为神马要设置超时阈值?
友情小提示:读者可能从锁优化角度考虑

小答:在剩余时间(nanosTimeout)小于超时阈值(spinForTimeoutThreshold)时,自旋的效率比LockSupport.park更高且开销更少


6.2 独占式释放

独占式释放锁遵循如下步骤:
1.调用子类的tryRelease释放锁资源,若有重复锁需要完全释放
2.当head的waitStatus状态非0,意味着同步队列为空,需要尝试唤醒同步队列中的下一个等待唤醒的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
复制代码/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* 独占模式释放锁,成功释放依据是tryRelease返回true
* 该方法被用来实现Lock.unlock方法
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {

//调用tryRelease判断是否已成功完全释放锁 --即Status是否被CAS更新为0
if (tryRelease(arg)) {
Node h = head;

/**
* 当head的waitStatus状态非0,意味着同步队列为空
* 需要尝试唤醒同步队列中的下一个等待唤醒的线程
*/
if (h != null && h.waitStatus != 0)

//唤醒同步队列中的下一个等待唤醒的线程
unparkSuccessor(h);
return true;
}
return false;
}

7.共享模式

7.1 共享式获取锁

共享式获取锁也有三种方式,与独占式保持一致:
1.不响应中断获取锁-acquireShared
2.响应中断获取锁-acquireSharedInterruptibly
3.响应中断和超时获取锁-tryAcquireSharedNanos
共享式和独占式获取锁原理基本一致,主要区别在于:
1.子类需要实现共享方式获取锁tryAcquireShared
2.节点出队方式变更,当获取锁成功时前驱节点出队时会传播唤醒操作

7.1.1 不响应中断获取锁

不响应中断主要遵循如下两步:
1.tryAcquireShared: 先调用子类自实现的tryAcquireShared方法获取锁,成功即获得锁,否则进入第二步
2.doAcquireShared: 当前线程封装为共享Node并进入同步队列,自旋+共享方式获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
复制代码/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* 共享模式获取锁,不响应中断
* 1.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
* 2.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {

/**
* 与独占模式的核心区别之一:
* 通过调用子类实现的tryAcquireShared获取锁
* tryAcquireShared < 0 说明没有获取到锁 ,因此需要确定好State的资源量
*/
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

/**
* Acquires in shared uninterruptible mode.
*
* 共享模式获取锁,不响应中断
*
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {

//当前线程封装为共享Node并进入同步队列,等待前驱节点的SIGNAL状态
final Node node = addWaiter(Node.SHARED);

//记录获取锁是否失败
boolean failed = true;
try {

/**
* 记录获取锁过程中是否被中断
* 注意提供中断响应的方法没有该变量,而是选择直接抛出中断异常
*/
boolean interrupted = false;

//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁
for (;;) {
final Node p = node.predecessor();
if (p == head) {

//前驱节点是head时需要再次尝试tryAcquireShared --成功获取锁的关键
int r = tryAcquireShared(arg);
if (r >= 0) {
/**
* 与独占模式的核心区别之二:
* 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
* 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
*/
setHeadAndPropagate(node, r);

/**
* 由于setHeadAndPropagate中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null;

/**
* 与独占模式的核心区别之三:
* 获取锁途中一旦被中断,直接设置中断标识为true
*/
if (interrupted)
selfInterrupt();

failed = false;
return;
}
}
/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//失败处理
if (failed)
cancelAcquire(node);
}
}

小问:为神马共享式不响应中断获取锁可以直接设置中断,而独占式却是返回中断状态(核心区别之三)?
友情小提示:读者可以用selfInterrupt()方法的复用情况方面考虑

小答:这涉及到条件队列的部分内容,条件队列只能用于独占模式(因为使用条件队列的前提就是先获取到锁 – 管程要求),而在条件队列的多个方法中会根据判断条件决定是否要执行selfInterrupt()方法,因此在共享模式中可以直接中断,而独占需要返回中断状态告知独占或条件队列是否执行selfInterrupt()方法


7.1.2 响应中断获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
复制代码/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* 共享式响应中断获取锁
* 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
* 2.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
* 3.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁或发生中断
* 4.该方法可用于实现Lock接口的lockInterruptibly方法
* @param arg the acquire argument.This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {

/**
* 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
* 注意静态方法会清除中断标识
*/
if (Thread.interrupted())
throw new InterruptedException();

/**
* 与独占模式的核心区别之一:
* 通过调用子类实现的tryAcquireShared获取锁
* tryAcquireShared < 0 说明没有获取到锁 ,因此需要确定好State的资源量
*/
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {

//当前线程封装成共享Node
final Node node = addWaiter(Node.SHARED);

//记录获取锁是否失败
boolean failed = true;
try {

//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();
if (p == head) {

//前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
int r = tryAcquireShared(arg);
if (r >= 0) {

/**
* 与独占模式的核心区别之二:
* 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
* 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}

/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
* 一旦发生中断,立即抛出异常,停止自旋,然后进入finally处理失败
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//失败处理(如线程中断)
if (failed)
cancelAcquire(node);
}
}

7.1.3 响应超时与中断获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
复制代码/**
* Attempts to acquire in shared mode, aborting if interrupted, and
* failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquireShared}, returning on success. Otherwise, the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted or the timeout elapses.
*
* 共享式响应中断超时获取锁
* 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
* 2.若获取锁超时会立即返回,即获取锁失败
* 3.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
* 4.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁
* 或发生中断或超时
* 5.该方法可用于实现Lock接口的tryLock(long, TimeUnit)方法
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{

/**
* 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
* 注意静态方法会清除中断标识
*/
if (Thread.interrupted())
throw new InterruptedException();

/**
* 若初次调用tryAcquireShared失败需要封装成独占Node并进入同步队列
* 由于超时特性,会返回布尔值告知外部是否成功获取锁或获取锁是否超时
*/
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

/**
* Acquires in shared timed mode.
*
* 共享式响应中断超时获取锁
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {

//超时时间必须有效
if (nanosTimeout <= 0L)
return false;

//超时截止时间
final long deadline = System.nanoTime() + nanosTimeout;

//当前线程封装成共享模式Node
final Node node = addWaiter(Node.SHARED);

//记录获取锁是否失败
boolean failed = true;
try {

//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();

//注意传播也是从head开始一个个唤醒的,不是一次性的(只不过快到好比一次性)
if (p == head) {

//前驱节点是head时需要再次尝试tryAcquireShared --成功获取锁的关键
int r = tryAcquireShared(arg);
if (r >= 0) {

/**
* 与独占模式的核心区别之二:
* 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
* 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
*/
setHeadAndPropagate(node, r);

/**
* 由于setHead中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null; // help GC
failed = false;

//成功获取锁需要返回成功标识,通知外部调用成功
return true;
}
}

//剩余时间
nanosTimeout = head || rline - System.nanoTime();
//超时立即返回false,同时会在finally中执行失败处理
if (nanosTimeout <= 0L)
return false;

/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
* 若同时剩余时间超过自旋时间阈值(默认1000L)即超时阻塞
*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckI
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);

//若线程在获取锁过程中被中断,立即抛出异常,同时会在finally中执行失败处理
if (Thread.interrupted())
throw new InterruptedException();
}
}
} finally {
//失败处理(如线程中断或超时)
if (failed)
cancelAcquire(node);
}
}

7.2 共享式释放

共享式释放相比独占式来说有个特色:
当同步队列中存在多个共享节点且资源够数时可能会并发地唤醒后继节点,因为共享模式下获取锁后就近唤醒后继节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
复制代码/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* 共享模式释放锁
* 实现原理是tryReleaseShared返回true时解除一个或多个线程的阻塞
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {

//自旋方式释放锁
for (;;) {
Node h = head;

//队列不为空且有后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;

//唤醒前提都是Head节点状态为SIGNAL
if (ws == Node.SIGNAL) {

//只有当CAS回归纯真成功时才选择去唤醒后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; //(1) loop to recheck cases
unparkSuccessor(h);

//之所以要设置PROPAGATE主要是区别于独占模式的0,以告知到时要用传播方式进行后续处理
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; //(2) loop on failed CAS
}

//如果head没变化,说明没有唤醒的线程啦,功成名就
if (h == head) //(3) loop if head changed
break;

//其他情况,比如head节点无变化、ws为取消状态等,直接跳过不理会就好
}
}

小问:共享模式为何采用自旋方式释放锁?
友情小提示:遇到CAS操作,我们都需要考虑三个点:1.为何此处用CAS 2.若CAS成功呢? 3.若CAS失败呢?
补充小提示:CAS之所以会失败,通常都是发生了竞争,当然也可能是异常、中断等,因此失败原因很重要


8.线程中断、阻塞、唤醒

8.1 shouldParkAfterFailedAcquire

当线程无法获取锁时,会先通过该方法处理三种状态情况:
1.前驱节点状态为SIGNAL: 一定会唤醒后继节点,直接返回true,说明可以安心park
2.前驱节点状态为CANCELLED: 需要反向找到一个非取消状态节点作为新前驱节点,返回false,此时不允许park
3.前驱节点为0或PROPAGATE(共享): 由于前驱节点和当前节点都存在,因此存在前驱节点需要唤醒后继节点的必要性,因此前驱节点需要被设置为SIGNAL状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
复制代码/**
* 当节点没有获取到锁时,通过检查和更新waitStatus,
* 以判断线程是否可以安心park进入等待状态
*
* @param pred 前驱节点
* @param node the node 当前节点
* @return {@code true} 线程应被阻塞时返回true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/**
* 线程能否安心park进入等待状态取决于前驱节点的waiStatus值
* 换句话说,前驱节点的waiStatus值决定了当前节点的'命'!
*/
int ws = pred.waitStatus;

//1.waitStatus == Node.SIGNAL 待唤醒状态
if (ws == Node.SIGNAL)

/**
* 只有前驱节点为SIGNAL状态,当前线程才能安心被park
* 否则前驱节点在获取锁之后是不会通知当前线程的,
* 当前线程一脸懵逼的哭死在无限等待中?!
*/
return true;

//2.waitStatus >0 即 waitStatus == Node.CANCELLED == 1 取消状态
//取消状态时需要重选一个非取消状态的前驱节点
if (ws > 0) {

/**
* 沿prev反向遍历直到找到一个非取消状态的前驱节点
*
* 小问:那么中间那些取消状态的节点怎么办??
* 小答:笑话,GC老爷子怎么会忘记回收在队列中
* 这些'在其位不谋其政'(放弃获取锁但仍在队列中)的'垃圾们'呢 -
* 正经说法:由于前塞动作,已放弃的节点相当于形成一条无引用链,不久就会被GC
*/
do {
//前塞动作 已放弃获取锁的取消状态节点会形成一条无引用链
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

//注意不要忘记将新的前驱节点的next指向当前节点(相爱相杀不能忘)
pred.next = node;
} else {

//3.waitStatus == Node.PROPAGATE || 0
/**
* 为了后继节点能够安心park,需要将前驱节点设置为SIGNAL状态
* 这样当前驱节点拿到锁了就可以通知后驱节点(即当前线程):后面那个,差不多该醒啦!
* 注意:该方法不存在Node.CONDITION的情况
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//前驱节点非Node.SIGNAL都不应park
return false;
}

小问:为神马compareAndSetWaitStatus(pred, ws, Node.SIGNAL)后仍要返回false,而不是直接返回true?
友情小提示:读者可以从自旋意义这一方面入手考虑

小答:其实从首先判断 ws==Node.SIGNAL我们就可以一窥一二,理由有三:
1.CAS是允许失败的,当CAS失败时说明前驱节点已经出现了问题(比如刚释放完),很明显是不安全的
2.即使CAS失败也可补救,因此搭配自旋,大不了再自旋一轮嘛,多大点事!确保安全第一位嘛!
3.当CAS成功后,结合首先判断ws==Node.SIGNAL,我们可以确保在安全环境下快速返回,提高效率

核心重点:该方法根本作用就是要确保成功地设置前驱节点的SIGNAL状态,以确保前驱节点释放锁后一定能唤醒被park阻塞的后继节点


8.2 parkAndCheckInterrupt

注意前提:只有在前驱节点为SIGNAL状态下才能park阻塞当前节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码/**
* 线程park进入等待状态,并返回是否被中断唤醒
* @return {@code true} if interrupted 若被中断唤醒返回true,否则false
*/
private final boolean parkAndCheckInterrupt() {
//PS:park后线程进入等待状态,线程阻塞在此处,不会继续往后执行,即不会执行到步骤2
LockSupport.park(this);//步骤1
/**
* 需要注意的是静态方法会清除中断标志
* 想要中断任务的话需要在外部再次设置中断
*
* PS:线程只有被unpark唤醒后才会接着继续执行步骤2
*/
return Thread.interrupted();//步骤2
}

友情推荐:

  • 关于中断机制读者可参看笔者的 并发番@Thread一文通(1.7版) 6.线程中断
  • 关于park读者可参看笔者的 并发番@LockSupport一文通(1.8版)

8.3 selfInterrupt

1
2
3
4
5
6
复制代码/**
* 补中断:设置线程中断标志为true
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

8.4 unparkSuccessor

唤醒后继节点有三个注意事项:
- 1.状态位变化:初始状态0 -> Node.SIGNAL(作为唤醒后继节点的依据) -> 0 (唤醒后继节点后回归纯真)
- 2.若当前节点的后继节点不存在或是取消状态,需要反向遍历去找最靠近当前节点的非取消状态后继节点
- 3.若存在非取消状态的后继节点就唤醒他,当然找不到就算了(说明没有需要获取锁的节点存在啦)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
复制代码/**
* 唤醒当前节点的后继节点
* @param node the node 当前节点
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//重置当前节点标志位
if (ws < 0)
/**
* 若是负数就表示未取消,需要回归纯真状态-> 0
* 允许失败,因为只有取消状态的节点才会被回收
*/
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally just the next node.
* But if cancelled or apparently null,traverse backwards from tail to find
* the actual non-cancelled successor.
*
* 通常node.next就是要被唤醒的后继节点
* 但若next节点被取消或为空,需要反向遍历去找最靠前的非取消状态节点并唤醒她
* waitStatus <= 0 说明未取消,就是节点还想获取锁的意思
* 共享模式时会发生s == null的情况
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
//找到同步队列中最靠前(即最靠近node)的想要获取锁的节点
if (t.waitStatus <= 0)
/**
* 小问:为啥是最靠前呢?
*
* 小答:因为在这里会不断把非取消状态的节点赋值给s
* 直到是最靠近node(即最靠前)的时候才停止
* 而不是一旦找到一个非取消状态的节点就停止循环啦!
*/
s = t;
}
/**
* 如果后继节点非空,那就让我们唤醒她吧!
*/
if (s != null)
LockSupport.unpark(s.thread);
}

小问:为神马要沿着prev从tail往前获取非取消节点呢?
友情小提示:读者可以从入队、出队、取消操作入手考虑prev和next的线程安全问题

小答:这个问题实际上考察了两个知识点,一个是为何要反向遍历,另一个是为何要跳过取消节点
1.反向遍历:prev赋值线程安全,next赋值非线程安全且在CLH队列不同形态会有不同的表现,判断逻辑会比prev复杂很多
2.跳过取消节点:取消状态节点都是需要被GC回收的节点,因为其放弃了获取锁的机会,正所谓强扭的瓜不甜呀!


小问:有心的读者会发现在unparkSuccessor方法中其实并没有操作队列,那么线程节点是何时出队的呢?
友情小提示:读者可以回顾一下获取锁的整个过程重新梳理一下

小答:unparkSuccessor方法主要做了个唤醒操作,真正的出队操作(比如调整prev、next)是在setHead、shouldParkAfterFailedAcquire、cancelAcquire等方法中实际完成的


9.题外话

笔者看了一下,距离上篇已过了一月有余,以至于期间不止一人问笔者是否已断更(疼)。笔者正好借此机会谈一下笔者的想法,笔者会一直写下去,之所以长时间不更新,主要有如下几点原因:
1.不可原谅的客观原因:笔者近期的确事情很多,不止于工作
2.出一篇令自己满意的、接近”精品”的文章真的给了笔者很大的压力,唯恐误人子弟
3.为了此篇,笔者花了很长时间去看了很多资料(也包括翻译Doug Lea老师的论文),但由于水平有限,在加上此篇的确需要好好研读,因此无论从构思还是内容都花了大量的时间,期间笔者都不知修改了多少次都不甚满意,甚至重写的情况,排期真是一变再变
4.由于并发的难以预料带来的难以理解,因此在理解AQS时,需要从多个维度去思考问题,包括每行代码的意义、CAS操作(如为何此处用CAS?CAS成功后?CAS失败后?)、从单个线程到并发、从入队到出队、从好人模式切换到坏人模式等等,AQS篇的精妙之处还远不止笔者目前所述(此篇还是略显仓促),因此随着笔者功力的加深,笔者会坚持继续维护此篇
5.因为种种原因造成的此篇内容有错或者不足之处,笔者十分抱歉,也感谢读者能够热心指点一二,不胜感激!!!
6.最好还是希望读者们继续支持笔者,支持XX番系列~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Express 提交数据的几种方式

发表于 2017-11-28

我们知道,HTTP 协议是以 ASCII 码传输,建立在 TCP/IP协议之上的应用层规范。该规范把 HTTP 请求分为三个部分:状态行、请求头、消息主体。类似于下面这样:

1
2
3
4
5
复制代码<method> <request-url> <version>

<headers>

<entity-body></entity-body></headers></version></request-url></method>

协议规定 POST 提交的数据必须放在消息主体(entity-body)中,但协议并没有规定数据必须使用什么编码方式。实际上,开发者完全可以自己决定消息主体的格式,只要最后发送的 HTTP 请求满足上面的格式就可以。

但是,数据发送出去,还要服务端解析成功才有意义。一般服务端如 Node.js、php、python、java 等,以及它们的 framework,都内置了自动解析常见数据格式的功能。服务端通常是根据请求头(headers)中的 Content-Type 字段来获知请求中的消息主体是用何种方式编码,再对主体进行解析。所以说到 POST 提交数据方案,包含了 Content-Type 和消息主体编码方式两部分。
关于POST 提交数据方式,详细的内容就不介绍了,请参考
四种常见的 POST 提交数据方式。

好了,进入正片 >>>
对于Node的Express框架来说,主要需要处理一下四种格式:

一.www-form-urlencoded

http默认的post请求是这种方式,注意这是是默认的提交方式,比如你写一个

1
复制代码<form>....<input type="submit" /></form>

form表单,里面的submit按钮默认就是这种 www-form-urlencoded 方式提交的。
我们采用body-parser这个插件就可以了,具体用法请点链接。

二. form-data

表单提交默认是www-form-urlencoded,但是当需要上传文件的时候,必须在form标签里做这样的标识 enctype=”multipart/form-data”。
Node中处理这类表单还需要以下其中一种中间件

  • connect-multiparty
  • formidable
  • multer

需要特别注意的是,form-data这三个插件本身就可以处理www-form-urlencoded,所以如果需要同时处理字段和文件,不需要再引入www-form-urlencoded相关的插件了。

三.application/json

bodyParser 支持此类参数解析.

注意: 在提交之前需要指定http请求头设为 content-type=application/json

四.text/xml

这种请求类型不是特别常见,body-parse默认也不解析这种数据格式,但是由于目前腾讯微信平台在使用这种数据交换格式,在做微信相关的接口用的还是很多.Node在express如何解析这种格式,没有好的办法,只能自己用代码处理,把请求体参数按照字符串读取出来,然后使用xml2json包把字符串解析成json对象,使用起来就方便多了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码const express = require('express'),
bodyParser = require('body-parser'),
xml2json=require('xml2json'),
app = express(),
server = require('http').createServer(app);

app.use(bodyParser.urlencoded({
extended: true
}));

app.post('/xml', function (req, res) {
req.rawBody = '';
let json={};
req.setEncoding('utf8');

req.on('data', function(chunk) {
req.rawBody += chunk;
});

req.on('end', function() {
json = xml2json.toJson(req.rawBody);
res.send(JSON.stringify(json));
});

});

const PORT = process.env.PORT || 8002;
server.listen(PORT);

第二种方式就是使用针对微信的中间件,wechat-parser .
推荐采用后者。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JVM系列之GC

发表于 2017-11-28

JVM系列之GC

谈到JVM,大家都知道GC(Garbage Collection),GC这个话题说浅了就一句话–JVM自动垃圾收集,说深了就无止尽了,回收算法,各种收集器,gc类型,gc触发点….等等,作者也是略懂皮毛,这里给大家推荐一个知乎上比较活跃的JVM大牛,RednaxelaFX,是专门做JVM开发的,业界号称”R大”。放个传送门:R大
鉴于作者才学疏浅,这篇博文还是准备用通熟易懂的话把作者自己对GC这一块的理解做陈述,概要如下:

文章结构

  1. 哪些内存需要回收(Which)
  2. 各种GC的触发时机(When)
  3. 如何回收(How)
    3.1 回收算法
    3.2 HotSpot的具体实现-各种收集器
  4. GC日志

1. 哪些内存需要回收(Which)

大多数没干过C或者C++的Javaer是幸福的,因为没有体会过那种自己new delete内存的感觉,创建对象就是new,不管内存的回收问题。其实我们的内存是JVM的GC机制来帮我们回收的。那么问题来了。到底哪些内存需要回收呢?
答案:可达性分析算法,说白了,就是JVM预先确定一组GC roots引用变量,如Student stu =new Student();这个stu就可以作为GC roots,当进行垃圾回收时,JVM通过GC Roots找到能够引用到的所有活对象,然后把剩下的对象标记为”无用”,即可回收状态!
能够作为GC roots的引用如下:

  • 所有Java线程当前活跃的栈帧里指向GC堆里的对象的引用;换句话说,当前 所有正在被调用的方法的引用类型的参数/局部变量/临时值。
  • VM的一些静态数据结构里指向GC堆里的对象的引用,例如说HotSpot VM里的Universe里有很多这样的引用。
  • JNI handles,包括global handles和local handles(看情况)所有当前被加载的Java类(看情况)Java类的引用类型静态变量(看情况)Java类的运行时常量池里的引用类型常量(String或Class类型)(看情况)String常量池(StringTable)里的引用

2. 各种GC的触发时机(When)

2.1 GC类型

说到GC类型,就更有意思了,为什么呢,因为业界没有统一的严格意义上的界限,也没有严格意义上的GC类型,都是左边一个教授一套名字,右边一个作者一套名字。为什么会有这个情况呢,因为GC类型是和收集器有关的,不同的收集器会有自己独特的一些收集类型。所以作者在这里引用R大关于GC类型的介绍,作者觉得还是比较妥当准确的。如下:

  • Partial GC:并不收集整个GC堆的模式
    • Young GC(Minor GC):只收集young gen的GC
    • Old GC:只收集old gen的GC。只有CMS的concurrent collection是这个模式
    • Mixed GC:收集整个young gen以及部分old gen的GC。只有G1有这个模式
  • Full GC(Major GC):收集整个堆,包括young gen、old gen、perm gen(如果存在的话)等所有部分的模式。

2.2 触发时机

上面大家也看到了,GC类型分分类是和收集器有关的,那么当然了,对于不同的收集器,GC触发时机也是不一样的,作者就针对默认的serial GC来说:

  • young GC:当young gen中的eden区分配满的时候触发。注意young GC中有部分存活对象会晋升到old gen,所以young GC后old gen的占用量通常会有所升高。
  • full GC:当准备要触发一次young GC时,如果发现统计数据说之前young GC的平均晋升大小比目前old gen剩余的空间大,则不会触发young GC而是转为触发full GC(因为HotSpot VM的GC里,除了CMS的concurrent collection之外,其它能收集old gen的GC都会同时收集整个GC堆,包括young gen,所以不需要事先触发一次单独的young GC);或者,如果有perm gen的话,要在perm gen分配空间但已经没有足够空间时,也要触发一次full GC;或者System.gc()、heap dump带GC,默认也是触发full GC。

3. 如何回收(How)

3.1 回收算法

由于网上已经拥有非常多的优秀博文来详细介绍关于回收算法这块,所以这块作者将引用其他博客的介绍并加上自己的一些描述:
3.1.1 标记清除算法(Mark-Sweep)

3.1.2复制算法(Coping)(绝大部分收集器的新生代使用的算法)

复制算法在JVM新生代垃圾回收中的运用:

Eden:From:TO =8:1:1
由于新生代中90%的对象都是”朝生夕死”,采用复制算法是比较合理的,首先只移动了存活下来的对象(比较少数),其次,内存在移动到To区域后是有顺序的,不存在内存碎片。
值得一提的是,假如在一次MinorGC时,Eden中存活的对象+From中存活的对象>To的剩余空间,则会通过担保机制将对象直接转移到Old gen ,如果Old gen的内存空间也不够,则进行一次Full gc .
当对象的年龄到达15岁时会转移到Old gen (可通过参数配置,一般不建议更改。)

3.1.3标记-整理算法(Mark-Compact):

由于Old gen 的大部分对象都是年龄很大的对象,所以存活率比较高,采用复制算法肯定是行不通的(较多的对象复制操作),所以才大部分收集器的old gen采用 Mark-Compact算法,避免了空间碎片。

3.1.4三种算法比较:

稍微解释一下常见的关于GC时间的问题:
为什么FGC的时间比MinorGC长很多?
答:FGC进行了old gen的gc,由于算法上采用Mark-Sweep或者Mark-Compact,进行了很多对象(老年代存活率很低)的移动,当然很耗时了!其实就是空间换时间,时间换空间的问题。

3.2 HotSpot的具体实现-各种收集器

关于收集器这块,由于本人也是JVM初学者,加上很少有在生产环境做收集器参数调整,搭配使用的机会。所以可以说对于一些HotSpot收集器只是停留在
书籍与博文层次,所以这里就不卖弄了。下面给一个传送门大家自行看一看吧:
www.jianshu.com/p/50d5c88b2…

4 GC日志

-XX:+PrintGCDateStamps
-XX:+PrintGCDetails
-Xloggc:/Users/zdy/Desktop/dump/gclog.txt

当服务器出现卡顿比较频繁时,尝试看下自己的GC日志,注意Full gc 频率。

最后,稍微说一下作者的心得:

  • 如果是服务器一次卡顿时间比较长,一般是full gc时间过长,而应用最求的是卡顿(STW)时间短,可以接受多次卡顿,那么可以考虑调整加大young gen的比例,和提高进入老年代的年龄(默认15岁)来尽量避免FGC.
  • 选择合适的收集器很重要。要根据应用的特点。是追求吞吐量,还是追求最小停顿。
  • 经常对照gc日志观察现实的情况,如多长时间一卡顿,多久一卡顿,然后来调整自己的收集器或者相关的内存比例来达到自己想要的效果。
  • 在有限的物理资源条件下,要避免让用户接受过多的STW,可以考虑在半夜自动进行gc(System.gc()),虽然不一定生效,但可以观察下效果。多数情况下是会触发full gc 的。
  • 大多数应用是可以接受频繁的mgc,但却不能接受full gc 的长时间卡顿,所以在观察gc日志时一定要注意自己full gc的频率和触发条件(是由于内存担保,还是年龄到了,还是TO内存太小,导致每次都fgc.).
    由于作者在gc这一块也不是特别厉害,并且缺少一定的实战经验,不敢妄自菲薄,所以给传送门供大家参考阅读:
    www.jianshu.com/p/088d71f20…
    www.cnblogs.com/mikevictor0…
    www.cnblogs.com/mikevictor0…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Faiss:Facebook开源的相似性搜索类库

发表于 2017-11-28

Facebook在今年3月份发布了Facebook AI相似性搜索(简称Faiss)项目,该项目提供的类库可以从多媒体文档中快速搜索出相似的条目——这个场景下的挑战是基于查询的传统搜索引擎无法解决的。Facebook 人工智能实验室(FAIR)基于十亿级别的数据集构建了最近邻搜索算法的实现,这比之前介绍的已知文献中在GPU上实现的最先进且最快的k-selection算法还要快大约8.5倍,因此创造了新的记录,包括第一个基于十亿高维向量构建的k最近邻图。

关于相似性搜索

传统的数据库是由包含符号信息的结构化数据表组成。比如,一个图片集可以表示为一个数据表,每行代表一个被索引的图片,包含图片标识符和描述文字之类的信息;每一行也可以与其他数据表中的实体关联起来,比如某个用户的一张图片可以与用户姓名表建立关联。

像文本嵌入(word2vec)或者卷积神经网络(CNN)描述符这样通过深度学习训练出的AI工具,都可以生成高维向量。这种表示远比一个固定的符号表示更加强大和灵活,正如后文将解释的那样。然而使用SQL查询的传统数据库并不适用这些新的表示方式。首先,海量多媒体信息的涌入产生了数十亿的向量;其次,且更重要的是,查找相似实体意味着查找相似的高维向量,如果只是使用标准查询语言这将非常低效和困难。

如何使用向量表示?

假设有一张建筑物的图片——比如某个你不记得名字的中等规模城市的市政大厅——然后你想在图片集中查找所有该建筑物的图片。由于不记得城市的名字,此时传统SQL中常用的key/value查询就帮不上忙了。

这就是相似性搜索的用武之地了。图片的向量化表示旨在为相似的图片生成相似向量,这里相似向量定义为欧氏距离最近的向量。

向量化表示的另一个应用是分类。假设需要一个分类器,来判定某个相册中的哪些图片属于菊花。分类器的训练过程众所周知:给算法分别输入菊花的图片和非菊花的图片(比如汽车、羊、玫瑰、矢车菊等);如果分类器是线性的,那么就输出一个分类向量,其属性值是它与图片向量的点积,反映了该图片包含菊花的可能性;然后分类器可以与相册中所有图片计算点积,并返回点积最大的图片。这种查询就是“最大内积”搜索。

所以,对于相似性搜索和分类,我们需要做下列处理:

  • 给定一个查询向量,返回与该向量的欧式距离最近的数据库对象列表。
  • 给定一个查询向量,返回与该向量点积最大的数据库对象列表。

一个额外的挑战是,要在一个超大规模比如数十亿向量上做这些运算。

软件包

现有软件工具都不足以完成上述数据库检索操作。传统的SQL数据库系统也不太适合,因为它们是为基于哈希的检索或1维区间检索而优化的;像OpenCV等软件包中的相似性搜索功能在扩展性方面则严重受限;同时其他的相似性搜索类库主要适用于小规模数据集(比如,1百万大小的向量);另外的软件包基本是为发表论文而输出的学术研究产物,旨在展示某些特定设置下的效果。

Faiss类库则解决了以上提到的种种局限,其优点如下:

  • 提供了多种相似性搜索方法,支持各种各样的不同用法和功能集。
  • 特别优化了内存使用和速度。
  • 为最相关索引方法提供了最先进的GPU实现。

相似性搜索评估

一旦从学习系统(从图片、视频、文本文件以及其他地方)抽取出向量,就能准备将其用于相似性搜索类库。

我们有一个暴力算法作为参考对比,该算法计算出了所有的相似度——非常精确和齐全——然后返回最相似的元素列表。这就提供了一个黄金标准的参考结果列表。需要注意的是,暴力算法的高效实现并不简单,一般依赖于其他组件的性能。

如果牺牲一些精度的话,比如允许与参考结果有一点点偏差,那么相似性搜索能快几个数量级。举个例子,如果一张图片的相似性搜索结果中的第一个和第二个交换了,可能并没有太大问题,因为对于一个给定的查询,它们可能都是正确结果。加快搜索速度还涉及到数据集的预处理,我们通常把这个预处理操作称作索引。

这样一来我们就关注到下面三个指标:

  • 速度。找到与查询最相似的10个或更多个向量要耗时多久?期望比暴力算法耗时更少,不然索引的意义何在?
  • 内存消耗。该方法需要消耗多少RAM?比原始向量更多还是更少?Faiss支持只在RAM上搜索,而磁盘数据库就会慢几个数量级,即便是SSD也是一样。
  • 精确度。返回的结果列表与暴力搜索结果匹配程度如何?精确度可以这样评估,计算返回的真正最近邻结果在查询结果第一位(这个指标一般叫做1-recall@1)的数量,或者衡量返回结果前10个(即指标10-intersection)中包含10个最近邻结果的平均占比。

通常我们都会在确定的内存资源下在速度和精准度之间权衡。Faiss专注于压缩原始向量的方法,因为这是扩展到数十亿向量数据集的不二之选:当必须索引十亿个向量的时候,每个向量32字节,就会消耗很大的内存。

许多索引类库适用于百万左右向量的小规模数据集,比如nmslib就包含了一些适于这种规模数据的非常高效的算法,这比Faiss快很多,但需要消耗更多的存储。

基于10亿向量的评估

由于工程界并没有针对这种大小数据集的公认基准,所以我们就基于研究结果来评估。

评估精度基于Deep1B,这是一个包含10亿图片的数据集。每张图片已通过CNN处理,CNN激活图之一用于图片描述。比较这些向量之间的欧氏距离,就能量化图片的相似程度。

Deep1B还带有一个较小的查询图片集,以及由暴力算法产生的真实相似性搜索结果。因此,如果运行一个搜索算法,就能评估结果中的1-recall@1。

选择索引

为了评估,我们把内存限制在30G以内。这个内存约束是我们选择索引方法和参数的依据。Faiss中的索引方法表示为一个字符串,在本例中叫做OPQ20_80,IMI2x14,PQ20。

该字符串包含的信息有,作用到向量上的预处理步骤(OPQ20_80),一个选择机制(IMI2x14)表明数据库如何分区,以及一个编码组件(PQ20)表示向量编码时使用一个产品量化器(PQ)来生成一个20字节的编码。所以在内存使用上,包括其他开销,累计少于30G。

这听起来技术性较强,所以Faiss文档提供了使用指南,来说明如何选择满足需求的最佳索引。

选好了索引类型,就可以开始执行索引过程了。Faiss中的算法实现会处理10亿向量并把它们置于一个索引库中。索引会存在磁盘上或立即使用,检索和增加/移除索引的操作可以穿插进行。

查询索引

当索引准备好以后,一系列搜索时间参数就会被设置来调整算法。为方便评估,这里使用单线程搜索。由于内存消耗是受限并固定的,所以需要在精确度和搜索时间之间权衡优化。举例说来,这表示为了获取40%的1-recall@1,可以设置参数以花费尽可能短的搜索时间。

幸运的是,Faiss带有一个自动调优机制,能扫描参数空间并收集提供最佳操作点的参数;也就是说,最可能的搜索时间对应某个精确度,反之亦然,最优的精确度对应某个搜索时间。Deep1B中操作点被可视化为如下图示:

本图中我们可以看到,达到40%的1-recall@1,要求每次查询耗时必须小于2ms,或者能优化到耗时0.5ms的话,就可以达到30%的1-recall@1。一次查询耗时2ms表示单核500 QPS的处理能力。

这个结果基本上能媲美目前业内最新研究成果了,即Babenko和Lempitsky在CVPR 2016发表的论文“Efficient Indexing of Billion-Scale Datasets of Deep Descriptors”,这篇论文介绍了Deep1B数据集,他们达到45%的1-recall@1需要耗时20ms。

10亿级数据集的GPU计算

GPU实现方面也做了很大的投入,在原生多GPU的支持下能产出惊人的单机性能。GPU实现已经可以作为对应CPU设备的替代,无需了解CUDA API就能挖掘出GPU的性能。Faiss支持所有Nvidia 2012之后发布的GPU(Kepler,计算能力 3.5+)。

我们把roofline model作为指南,它指出应当尽量让内存带宽或浮点运算单元满载。Faiss 的GPU实现在单GPU上的性能要比对应的CPU实现快5到10倍,像英伟达P100这样的新型Pascal架构硬件甚至会快20倍以上。

一些性能关键数字:

  • 对于近似的索引,使用YFCC100M数据集中的9500万张图片,一个基于128D CNN描述符的暴力k近邻图(k=10),只需4个Maxwell Titan X GPU就能在35分钟内构建完成,包括索引构建时间。
  • 十亿级向量的k近邻图现在触手可及。基于Deep1B数据集,可以构建一个暴力 k-NN图(k=10),达到0.65的10-intersection,需要使用4个Maxwell Titan X GPU花费不到12小时,或者达到0.8,使用8个Pascal P100-PCIe GPU消耗不到12小时。Titan X配置可以在不到5小时生成低质量的图。
  • 其他组件也表现出了骄人的性能。比如,构建上述Deep1B索引需要使用k均值聚类 6701万个120维的向量到262,144个簇,对于25 E-M迭代需要在4个Titan X GPU(12.6 tflop/s)上花139分钟,或者在8个P100 GPU(40 tflop/s)上花43.8分钟。注意聚类的训练数据集并不需要放在GPU内存中,因为数据可以在需要时流到GPU而没有额外的性能影响。

底层实现

Facebook AI研究团队2015年就开始开发Faiss,这建立在许多研究成果和大量工程实践的基础之上。对于Faiss类库,我们选择聚焦在一些基础技术方面的优化,特别是在CPU方面,我们重度使用了:

  • 采用多线程来利用多核资源,并在多个GPU上执行并行检索。
  • 使用BLAS类库通过矩阵和矩阵乘法来高效精准地完成距离计算。一个不采用BLAS的暴力实现很难达到最优。BLAS/LAPACK是Faiss唯一强制依赖的软件。
  • 采用机器SIMD向量化和popcount加速独立向量的距离计算。

关于GPU

对于前述相似性搜索的GPU实现,k-selection(查找k个最小或最大元素)有一个性能问题,因为传统CPU算法(比如堆查找算法)对GPU并不友好。针对Faiss GPU,我们设计了文献中已知的最快轻量k-selection算法(k<=1024)。所有的中间状态全部保存在寄存器,方便高速读写。可以对输入数据一次性完成k-select,运行至高达55%的理论峰值性能,作为输出的峰值GPU内存带宽。因为其状态单独保存在寄存器文件中,所以与其他内核很容易集成,使它成为极速的精准和近似检索算法。

大量的精力投在了为高效策略做铺垫,以及近似搜索的内核实现。通过数据分片或数据副本可以提供对多核GPU支持,而不会受限于单GPU的可用显存大小;还提供了对半精度浮点数的支持(float16),可在支持的GPU上做完整float16运算,以及早期架构上提供的中间float16存储。我们发现以float16编码向量技术可以做到精度无损加速。

简而言之,对关键因素的不断突破在实践中非常重要,Faiss确实在工程细节方面下了很大的功夫。

开始使用Faiss

Faiss使用C++实现,并支持Python。只要从Github下载源码并编译,然后在Python中导入Faiss模块即可开始使用。Faiss还完整集成了Numpy,并支持构造numpy(使用float32)数组的所有函数。

获取Faiss: github.com/facebookres…

索引对象

Faiss(包括C++和Python)提供了索引Index的实例。每个Index子类实现一个索引结构,以说明哪些向量可被加入和搜索。比如,IndexFlatL2是一个能使用L2距离搜索的暴力索引。

1
2
3
4
5
6
复制代码__Tue Nov 28 2017 16:16:39 GMT+0800 (CST)____Tue Nov 28 2017 16:16:39 GMT+0800 (CST)__import faiss                   # 导入faiss
index = faiss.IndexFlatL2(d) # 构建索引,d是向量大小
# here we assume xb contains a n-by-d numpy matrix of type float32
index.add(xb) # 把向量加入索引
print index.ntotal
__Tue Nov 28 2017 16:16:39 GMT+0800 (CST)____Tue Nov 28 2017 16:16:39 GMT+0800 (CST)__

这样会打印出索引向量的数量。增加到一个IndexFlat仅仅表示拷贝它们到索引的内部存储,因为后面没有其他操作会作用在该向量上。

执行一次搜索:

1
2
3
4
5
复制代码__Tue Nov 28 2017 16:16:39 GMT+0800 (CST)____Tue Nov 28 2017 16:16:39 GMT+0800 (CST)__# xq is a n2-by-d matrix with query vectors
k = 4 # 需要4个相似向量
D, I = index.search(xq, k) # 实际搜索
print I
__Tue Nov 28 2017 16:16:39 GMT+0800 (CST)____Tue Nov 28 2017 16:16:39 GMT+0800 (CST)__

I是一个整型矩阵,输出后是这样的:

1
2
3
4
复制代码__Tue Nov 28 2017 16:16:39 GMT+0800 (CST)____Tue Nov 28 2017 16:16:39 GMT+0800 (CST)__[[  0 393 363  78]
[ 1 555 277 364]
[ 2 304 101 13]]
__Tue Nov 28 2017 16:16:39 GMT+0800 (CST)____Tue Nov 28 2017 16:16:39 GMT+0800 (CST)__

对于xq的第一个向量,xb中最相似向量的索引是0(从0开始),第二相似的是 #393,第三是 #363。对于xq的第二个向量,相似向量列表是 #1, #555 等等。本例中,xq的前三个向量看起来与xb的前三个向量一样。

矩阵D是一个平方距离矩阵,与I的大小一致,表示对于每个结果向量查询的平方欧氏距离。

Faiss实现了十多个由其他索引组合的索引类型。可选的GPU版本有完全相同的接口,并有通道在CPU和CPU索引之间互通。Python接口主要由C++生成以凸显C++索引,所以可以很容易地将Python验证代码转换为集成的C++代码。

查看英文原文:code.facebook.com/posts/13737…


感谢蔡芳芳对本文的审校。

给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ,@丁晓昀),微信(微信号:
InfoQChina)关注我们。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于分布式环境下限流系统的设计

发表于 2017-11-28

前提

业务背景

就拿前些天的双十一的 “抢券活动” 来说,一般是设置整点开始抢的,你想想,淘宝的用户群体非常大,可以达到亿级别,而服务接口每秒能处理的量是有限的,那么这个时候问题就会出现,我们如何通过程序来控制用户抢券呢,于是就必须加上这个限流功能了。

生产环境

1、服务接口所能提供的服务上限(limit)假如是 500次/s

2、用户请求接口的次数未知,QPS可能达到 800次/s,1000次/s,或者更高

3、当服务接口的访问频率超过 500次/s,超过的量将拒绝服务,多出的信息将会丢失

4、线上环境是多节点部署的,但是调用的是同一个服务接口

于是,为了保证服务的可用性,就要对服务接口调用的速率进行限制(接口限流)。

什么是限流?

限流是对系统的出入流量进行控制,防止大流量出入,导致资源不足,系统不稳定。

限流系统是对资源访问的控制组件,控制主要的两个功能:限流策略和熔断策略,对于熔断策略,不同的系统有不同的熔断策略诉求,有的系统希望直接拒绝、有的系统希望排队等待、有的系统希望服务降级、有的系统会定制自己的熔断策略,这里只针对限流策略这个功能做详细的设计。

限流算法

1、限制瞬时并发数

Guava RateLimiter 提供了令牌桶算法实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现。

2、限制某个接口的时间窗最大请求数

即一个时间窗口内的请求数,如想限制某个接口/服务每秒/每分钟/每天的请求数/调用量。如一些基础服务会被很多其他系统调用,比如商品详情页服务会调用基础商品服务调用,但是怕因为更新量比较大将基础服务打挂,这时我们要对每秒/每分钟的调用量进行限速;一种实现方式如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码LoadingCache<Long, AtomicLong> counter =
CacheBuilder.newBuilder()
.expireAfterWrite(2, TimeUnit.SECONDS)
.build(new CacheLoader<Long, AtomicLong>() {
@Override
public AtomicLong load(Long seconds) throws Exception {
return new AtomicLong(0);
}
});
long limit = 1000;
while(true) {
//得到当前秒
long currentSeconds = System.currentTimeMillis() / 1000;
if(counter.get(currentSeconds).incrementAndGet() > limit) {
System.out.println("限流了:" + currentSeconds);
continue;
}
//业务处理
}

使用Guava的Cache来存储计数器,过期时间设置为2秒(保证1秒内的计数器是有的),然后我们获取当前时间戳然后取秒数来作为KEY进行计数统计和限流,这种方式也是简单粗暴,刚才说的场景够用了。

3、令牌桶


算法描述:

  • 假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中
  • 假设桶中最多可以存放b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃
  • 当流量以速率v进入,从桶中以速率v取令牌,拿到令牌的流量通过,拿不到令牌流量不通过,执行熔断逻辑

属性

  • 长期来看,符合流量的速率是受到令牌添加速率的影响,被稳定为:r
  • 因为令牌桶有一定的存储量,可以抵挡一定的流量突发情况
  • M是以字节/秒为单位的最大可能传输速率。 M>r
  • T max = b/(M-r) 承受最大传输速率的时间
  • B max = T max * M 承受最大传输速率的时间内传输的流量

优点:流量比较平滑,并且可以抵挡一定的流量突发情况

4、GOOGLE GUAVA 提供的工具库中 RATELIMITER 类(内部也是采用令牌桶算法实现)

最快的方式是使用 RateLimit 类,但是这仅限制在单节点,如果是分布式系统,每个节点的 QPS 是一样的,请求量到服务接口那的话就是 QPS * 节点数 了。所以这种方案在分布式的情况下不适用!

5、基于 REDIS 实现,存储两个 KEY,一个用于计时,一个用于计数。请求每调用一次,计数器增加 1,若在计时器时间内计数器未超过阈值,则可以处理任务。

这种能够很好地解决了分布式环境下多实例所导致的并发问题。因为使用redis设置的计时器和计数器均是全局唯一的,不管多少个节点,它们使用的都是同样的计时器和计数器,因此可以做到非常精准的流控。

代码就不公布了,毕竟涉及公司隐私了。

最后

参考文章:

基于Redis的限流系统的设计

感兴趣的可以看看别人的代码是怎么写的:github.com/wukq/rate-l…

原文作者:zhisheng
原文地址:基于分布式环境下限流系统的设计
版权说明:本文由极乐科技合作博主原创,转载请注明作者与出处,谢谢!
一元抢购微信小程序>>> 链接地址

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring笔记-异常处理

发表于 2017-11-28

Spring MVC提供了好几种方法让我来定制异常的处理。

本文参考:Exception Handling in Spring MVC

为异常定制HTTP状态码

默认如果我们在controller中抛出异常,Spring MVC会给用户响应500页面,并包含详细的错误信息。

如果我们想修改错误对应的HTTP状态码,我们可以在对应的异常上面添加@ResponseStatus注解,通过这个注解我们可以设置这个异常对应的HTTP状态码和错误信息,例子:

1
2
3
4
5
6
7
8
9
10
复制代码@Controller
public class ExceptionController {
@RequestMapping("/")
public void test(){
throw new NotFoundException();
}
}
@ResponseStatus(value = HttpStatus.NOT_FOUND, reason = "not found")
public class NotFoundException extends RuntimeException{
}

然后请求,可以发现页面不一样了:

Controller级别的错误拦截处理

通过@ResponseStatus注解,我们虽然可以定制HTTP状态码和错误信息了,但是完全不够用。

第一,只能设置自己写的异常,对于已有的异常,无法进行扩展。

第二,无法定制错误页面,默认的错误页面我们基本是不会使用的。

对于以上两个问题,可以在Controller里添加方法来拦截处理异常。方法需要使用@ExceptionHandler注解。注解后,方法会拦截当前Controller的请求处理方法(被@RequestMapping注解的方法)所抛出的异常。同时这个异常拦截方法,可以返回视图,该视图用于渲染错误信息。同时还可以在这个异常拦截方法上,使用@ResponseStatus来实现对已有异常的HTTP状态码定制,具体看例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
复制代码@Controller
public class ExceptionHandlingController {
// 请求处理方法
...

// 异常处理方法

// 定制一个已有异常的HTTP状态码
@ResponseStatus(value=HttpStatus.CONFLICT,
reason="Data integrity violation") // 409
@ExceptionHandler(DataIntegrityViolationException.class)
public void conflict() {
// 啥也不干
}

// 指定view来渲染对应的异常
@ExceptionHandler({SQLException.class,DataAccessException.class})
public String databaseError() {
// Nothing to do. Returns the logical view name of an error page, passed
// to the view-resolver(s) in usual way.
// Note that the exception is NOT available to this view (it is not added
// to the model) but see "Extending ExceptionHandlerExceptionResolver"
// below.
// 啥也不干,就返回异常页面view的名称
// 注意这里的view访问不到异常,因为异常没有添加到model中
return "databaseError";
}
// 拦截该Controller抛出的所有异常,同时把异常信息通过ModelAndView传给视图
// 或者你可以继承ExceptionHandlerExceptionResolver来实现,见下文
@ExceptionHandler(Exception.class)
public ModelAndView handleError(HttpServletRequest req, Exception ex) {
logger.error("Request: " + req.getRequestURL() + " raised " + ex);
ModelAndView mav = new ModelAndView();
mav.addObject("exception", ex);
mav.addObject("url", req.getRequestURL());
mav.setViewName("error");
return mav;
}
}

注意,使用@ExceptionHandler一定要指定处理的是哪个异常,否则会报异常:java.lang.IllegalArgumentException: No exception types mapped to {public java.lang.String XXController.exceptionHandler()}

全局异常处理

Controller级别的异常控制虽然已经够强大了,但是我们总不可能每个Controller都写一个handleError方法吧,所以我们一定需要一个全局的异常处理方法。借助@ControllerAdvice可以简单直接的实现这个需求。

@ControllerAdvice是Spring3.2添加的注解,和名字一样,这个注解提供了增强Controller的功能,可把advice类中的@ExceptionHandler、@InitBinder、@ModelAttribute注解的方法应用到所有的Controller中去。最常用的就是@ExceptionHandler了。本来我们需要在每个Controller中定义@ExceptionHandler,现在我们可以声明一个@ControllerAdvice类,然后定义一个统一的@ExceptionHandler方法。

比如上面的例子,用@ControllerAdvice的写法如下:

1
2
3
4
5
6
7
8
复制代码@ControllerAdvice
class GlobalControllerExceptionHandler {
@ResponseStatus(HttpStatus.CONFLICT) // 409
@ExceptionHandler(DataIntegrityViolationException.class)
public void handleConflict() {
// 啥也不干
}
}

如果你想拦截所有错误,那其实和上面的Controller级别的例子一样,设置拦截的Exception为Exception.class即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码@ControllerAdvice
class GlobalDefaultExceptionHandler {
public static final String DEFAULT_ERROR_VIEW = "error";
@ExceptionHandler(value = Exception.class)
public ModelAndView
defaultErrorHandler(HttpServletRequest req, Exception e) throws Exception {
// 这里需要注意一下,因为这个方法会拦截所有异常,包括设置了@ResponseStatus注解的异常,如果你不想拦截这些异常,可以过滤一下,然后重新抛出
if (AnnotationUtils.findAnnotation
(e.getClass(), ResponseStatus.class) != null)
throw e;
// 组装异常信息给视图
ModelAndView mav = new ModelAndView();
mav.addObject("exception", e);
mav.addObject("url", req.getRequestURL());
mav.setViewName(DEFAULT_ERROR_VIEW);
return mav;
}
}

更深层的拦截

上面说的Controller级别以及Controller Advice级别的拦截,是基于注解的,是高级特性。底层实现上,Spring使用的是HandlerExceptionResolver。

所有定义在DispatcherServlet应用上下文中的bean,只要是实现了HandlerExceptionResolver接口,都会用来异常拦截处理。

看一下接口的定义:

1
2
3
4
复制代码public interface HandlerExceptionResolver {
ModelAndView resolveException(HttpServletRequest request,
HttpServletResponse response, Object handler, Exception ex);
}

handler参数是抛出异常的Controller的引用。

Spring实现了几种HandlerExceptionResolver,这些类是上面提到的几个特性的基础:

  • ExceptionHandlerExceptionResolver:判断异常是否可以匹配到对应Controller或者Controller Advice中的@ExceptionHandler方法,如果可以则触发(前文提到的异常拦截方法的特性就是这个类实现的)
  • ResponseStatusExceptionResolver:判断异常是否被@ResponseStatus注解,如果是,则使用注解的信息来更新Response(前文提到的自定义HTTP状态码就是用这个特性实现的)
  • DefaultHandlerExceptionResolver:转换Spring异常,并转换为HTTP状态码(Spring内部使用)

这几个HandlerExceptionResolver会按照这个顺序来执行,也就是异常处理链。

这里可以看到,resolveException方法签名中没有Model参数,所以@ExceptionHandler方法也不能注入这个参数,所以上文中,异常拦截方法只能自己新建Model。

所以,如果你需要,你可以自己继承HandlerExceptionResolver来实现自己的异常处理链。然后再实现Ordered接口,这样就可以控制处理器的执行顺序。

SimpleMappingExceptionResolver

Spring提供了一个很方便使用的HandlerExceptionResolver,叫SimpleMappingExceptionResolver。他有很多实用的功能:

  • 映射异常名称到视图名称(异常名称只需要指定类名,不需要包名)
  • 指定一个默认的错误页面
  • 把异常打印到log上
  • 指定exception到视图中的属性名,默认的属性名就是exception。(@ExceptionHandler方法指定的视图默认没法获取异常,而SimpleMappingExceptionResolver指定的视图可以)

用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码<bean id="simpleMappingExceptionResolver" class=
"org.springframework.web.servlet.handler.SimpleMappingExceptionResolver">
<property name="exceptionMappings">
<map>
<entry key="DatabaseException" value="databaseError"/>
<entry key="InvalidCreditCardException" value="creditCardError"/>
</map>
</property>
<!-- See note below on how this interacts with Spring Boot -->
<property name="defaultErrorView" value="error"/>
<property name="exceptionAttribute" value="ex"/>

<!-- Name of logger to use to log exceptions. Unset by default,
so logging is disabled unless you set a value. -->
<property name="warnLogCategory" value="example.MvcLogger"/>
</bean>

Java Configuration:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码@Configuration
@EnableWebMvc // Optionally setup Spring MVC defaults (if you aren't using
// Spring Boot & haven't specified @EnableWebMvc elsewhere)
public class MvcConfiguration extends WebMvcConfigurerAdapter {
@Bean(name="simpleMappingExceptionResolver")
public SimpleMappingExceptionResolver
createSimpleMappingExceptionResolver() {
SimpleMappingExceptionResolver r =
new SimpleMappingExceptionResolver();
Properties mappings = new Properties();
mappings.setProperty("DatabaseException", "databaseError");
mappings.setProperty("InvalidCreditCardException", "creditCardError");
r.setExceptionMappings(mappings); // None by default
r.setDefaultErrorView("error"); // No default
r.setExceptionAttribute("ex"); // Default is "exception"
r.setWarnLogCategory("example.MvcLogger"); // No default
return r;
}
...
}

这里最有用的可能就是defaultErrorView了,他可以用于定制默认的错误页面。

自己继承SimpleMappingExceptionResolver来扩展功能也是非常常见的

  • 继承类可以在构造函数中设置好默认配置
  • 覆盖buildLogMessage方法来自定义日志信息,默认返回固定的:Handler execution resulted in exception
  • 覆盖doResolveException方法,可以向错误日志传入更多自己需要的信息

例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码public class MyMappingExceptionResolver extends SimpleMappingExceptionResolver {
public MyMappingExceptionResolver() {
// 默认启用日志
setWarnLogCategory(MyMappingExceptionResolver.class.getName());
}
@Override
public String buildLogMessage(Exception e, HttpServletRequest req) {
return "MVC exception: " + e.getLocalizedMessage();
}

@Override
protected ModelAndView doResolveException(HttpServletRequest req,
HttpServletResponse resp, Object handler, Exception ex) {
// 调用父类飞方法来获得ModelAndView
ModelAndView mav = super.doResolveException(req, resp, handler, ex);

// 添加额外的字段给视图
mav.addObject("url", request.getRequestURL());
return mav;
}
}

REST异常处理

REST风格下,返回的错误信息是一个json而不是一个页面,要如何做呢?特别简单,定义一个返回信息的类:

1
2
3
4
5
6
7
8
复制代码public class ErrorInfo {
public final String url;
public final String ex;
public ErrorInfo(String url, Exception ex) {
this.url = url;
this.ex = ex.getLocalizedMessage();
}
}

然后在错误处理函数上加上@ResponseBody就行:

1
2
3
4
5
6
复制代码@ResponseStatus(HttpStatus.BAD_REQUEST)
@ExceptionHandler(MyBadDataException.class)
@ResponseBody ErrorInfo
handleBadRequest(HttpServletRequest req, Exception ex) {
return new ErrorInfo(req.getRequestURL(), ex);
}

什么时候用什么特效?

Spring给我们提供了很多选择,我们要如何选择呢?

  • 如果异常是你自己声明的,可以考虑使用@ResponseStatus注解
  • 其他的异常可以使用@ControllerAdvice中的@ExceptionHandler方法,或者用SimpleMappingExceptionResolver
  • 如果Controller需要定制异常,可以在Controller中添加@ExceptionHandler方法。

如果你混用这几个特性,那要注意了,Controller中的@ExceptionHandler方法优先级比@ControllerAdvice中的@ExceptionHandler方法高,而如果有多个@ControllerAdvice类,那执行顺序是不确定的。

参考资料

  • Spring3.2新注解@ControllerAdvice - 开涛的博客—公众号:kaitao-1234567,一如既往的干货分享 - ITeye博客
  • Exception Handling in Spring MVC

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Hibernate级联之一对多和inverse解析

发表于 2017-11-28

hibernate的级联可以说是hibernate最重要的部分,只有深入了解了级联的特性与用法,才能运用自如。

  这次讨论一对多的情况,所以就使用博客项目的用户表和博客表作为示例,来一起学习hibernate的级联

基本准备

文件结构:

hibernate核心配置文件hibernate.cfg.xml:

)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
复制代码<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE hibernate-configuration PUBLIC
"-//Hibernate/Hibernate Configuration DTD 3.0//EN"
"http://www.hibernate.org/dtd/hibernate-configuration-3.0.dtd">

<hibernate-configuration>
<!-- 先配置SessionFactory标签,一个数据库对应一个SessionFactory标签 -->
<session-factory>
<!-- 必须的配置的参数5个,4个连接参数,1个数据库方言 -->
<!--
#hibernate.connection.driver_class com.mysql.jdbc.Driver
#hibernate.connection.url jdbc:mysql:///test
#hibernate.connection.username gavin
#hibernate.connection.password
数据库方言
#hibernate.dialect org.hibernate.dialect.MySQLDialect
-->
<property name="hibernate.connection.driver_class">com.mysql.jdbc.Driver</property>
<property name="hibernate.connection.url">jdbc:mysql:///blog</property>
<property name="hibernate.connection.username">root</property>
<property name="hibernate.connection.password">123456</property>
<property name="hibernate.dialect">org.hibernate.dialect.MySQLDialect</property>
<!-- 可选配置 -->
<!-- 显示sql语句 -->
<property name="hibernate.show_sql">true</property>
<!-- 格式化sql语句 -->
<property name="hibernate.format_sql">false</property>
<!-- 生成数据库的表结构
(hbm2dd全称hibernate mapping to db define language auto create)
update 没表会自动创建,有表添加数据。
如果开发中间需要添加字段,可以在实体类添加属性。update会自动在数据库添加字段,并且不改变原来数据库值
validate 校验实体属性和数据库是否一致
-->
<property name="hibernate.hbm2ddl.auto">update</property>

<!-- 映射配置文件,可以在map配置文件右键copy qualified name-->
<mapping resource="com/cky/domain/User.hbm.xml"/>
<mapping resource="com/cky/domain/Blog.hbm.xml"/>
</session-factory>
</hibernate-configuration>

View Code
如果对hibernate的配置还不是很清楚,可以看看这里

实体类的创建

  Hibernate中,可以直接将表的关系用对象表示。

  如本例中,一个博客只能有一个作者,所以Blog就可以添加一个User对象。

  一个用户有多个博客,所以可以在User中添加一个Blog的Set 集合。

  这里需要注意的是如果关联的是一个对象,那么不能在类中进行初始化new操作。

如果关联的是一个集合,那么必须用HashSet在类中进行初始化new操作

实体类Blog.java

)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
复制代码package com.cky.domain;

import java.sql.Timestamp;

public class Blog {
private int bId;
private String bSubject;
private String bContent;
private Timestamp createtime;
private Timestamp updatetime;
//hibernate中关联对象不能初始化
private User user;

//...getter setter 方法省略
public int getbId() {
return bId;
}
public void setbId(int bId) {
this.bId = bId;
}
public String getbSubject() {
return bSubject;
}
public void setbSubject(String bSubject) {
this.bSubject = bSubject;
}
public String getbContent() {
return bContent;
}
public void setbContent(String bContent) {
this.bContent = bContent;
}
public Timestamp getCreatetime() {
return createtime;
}
public void setCreatetime(Timestamp createtime) {
this.createtime = createtime;
}
public Timestamp getUpdatetime() {
return updatetime;
}
public void setUpdatetime(Timestamp updatetime) {
this.updatetime = updatetime;
}
public User getUser() {
return user;
}
public void setUser(User user) {
this.user = user;
}


}

View Code
实体类User.java

)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
复制代码package com.cky.domain;

import java.sql.Timestamp;
import java.util.HashSet;
import java.util.Set;

public class User {
private Integer uId;
private String uEmail;
private String uName;
private String uUsername;
private String uPassword;
private String uAge;
private String uDetail;
private String uAvatar;
private String isAdmin;
private Timestamp createtime;
private Timestamp updatetime;
//hibernate的集合必须初始化
private Set<Blog> blogs=new HashSet<Blog>();

//...getter setter 方法省略
public Integer getuId() {
return uId;
}
public void setuId(Integer uId) {
this.uId = uId;
}
public String getuEmail() {
return uEmail;
}
public void setuEmail(String uEmail) {
this.uEmail = uEmail;
}
public String getuName() {
return uName;
}
public void setuName(String uName) {
this.uName = uName;
}
public String getuUsername() {
return uUsername;
}
public void setuUsername(String uUsername) {
this.uUsername = uUsername;
}
public String getuPassword() {
return uPassword;
}
public void setuPassword(String uPassword) {
this.uPassword = uPassword;
}
public String getuAge() {
return uAge;
}
public void setuAge(String uAge) {
this.uAge = uAge;
}
public String getuDetail() {
return uDetail;
}
public void setuDetail(String uDetail) {
this.uDetail = uDetail;
}
public String getuAvatar() {
return uAvatar;
}
public void setuAvatar(String uAvatar) {
this.uAvatar = uAvatar;
}
public String getIsAdmin() {
return isAdmin;
}
public void setIsAdmin(String isAdmin) {
this.isAdmin = isAdmin;
}
public Timestamp getCreatetime() {
return createtime;
}
public void setCreatetime(Timestamp createtime) {
this.createtime = createtime;
}
public Timestamp getUpdatetime() {
return updatetime;
}
public void setUpdatetime(Timestamp updatetime) {
this.updatetime = updatetime;
}
public Set<Blog> getBlogs() {
return blogs;
}
public void setBlogs(Set<Blog> blogs) {
this.blogs = blogs;
}


}

View Code
编写基础映射文件


多对一情况映射文件的编写

  多对一时,使用标签,只需要指定三个属性:

  name:指定此标签所映射的属性名

  class:关联的表所对应的实体类的全限定类名

  column:关联表的外键名

  Blog.hbm.xml文件具体内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE hibernate-mapping PUBLIC
"-//Hibernate/Hibernate Mapping DTD 3.0//EN"
"http://www.hibernate.org/dtd/hibernate-mapping-3.0.dtd">
<hibernate-mapping>
<class name="com.cky.domain.Blog" table="blog">
<id name="bId" column="b_id">
<generator class="native"></generator>
</id>
<!-- 普通属性 -->
<property name="bSubject" column="b_subject"></property>
<property name="bContent" column="b_content"></property>
<property name="createtime" column="createtime"></property>
<property name="updatetime" column="updatetime"></property>
<!--
private User user;
多对一 配置-->
<many-to-one name="user" class="com.cky.domain.User" column="u_id" ></many-to-one>
</class>
</hibernate-mapping>

一对多情况映射文件的编写

  与多对一情况不同的是,一对多时关联对象是一个set集合。

  配置文件需要使用标签来和集合对象建立联系,其中的name指定对应的属性名

  在中,需要指定查询关联对象所需要的表(实体类)和比较字段(外键)

  User.hbm.xml具体如下:

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE hibernate-mapping PUBLIC
"-//Hibernate/Hibernate Mapping DTD 3.0//EN"
"http://www.hibernate.org/dtd/hibernate-mapping-3.0.dtd">

<hibernate-mapping>
<class name="com.cky.domain.User" table="user">
<!-- 配置id
name实体类属性,column表字段,如果一样,column可以省略。-->
<id name="uId" column="u_id">
<!-- 主键生成策略 -->
<generator class="native"></generator>
</id>
<!-- 普通属性-->
<property name="uEmail" column="u_email"></property>
<property name="uName" column="u_name"></property>
<property name="uUsername" column="u_username"></property>
<property name="uPassword" column="u_password"></property>
<property name="uAge" column="u_age"></property>
<property name="uDetail" column="u_detail"></property>
<property name="uAvatar" column="u_avatar"></property>
<property name="isAdmin" column="is_admin"></property>
<property name="createtime" column="createtime"></property>
<property name="updatetime" column="updatetime"></property>
<!-- private Set<Blog> blogs=new HashSet<Blog>();
集合的配置
name:这个类中对应的属性名
-->
<set name="blogs">
<!--column: 外键,hibernate会根据这个字段来查询与这个对象对应的多端的所有对象 -->
<key column="u_id"></key>
<!--class:集合代表的实体类,同时也代表要查询的表。
与上面的条件结合,就可以查询出表中所有外键字段为指定值的所有结果的集合。
-->
<one-to-many class="com.cky.domain.Blog"/>
</set>
</class>
</hibernate-mapping>

为了方便使用,还需要一个工具类HibernateUtils.java,很简单就不介绍了,下面是代码:

)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码package com.cky.utils;

import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.cfg.Configuration;

public class HibernateUtils {
//ctrl+shift+x
private static final Configuration CONFIG;
private static final SessionFactory FACTORY;

//编写静态代码块
static {
//加载XML的配置文件
CONFIG =new Configuration().configure();
//构造工作
FACTORY=CONFIG.buildSessionFactory();
}
/**
* 从工厂获取session对象
*/
public static Session getSession() {
return FACTORY.openSession();
}
}

View Code
测试基础配置(不使用级联)


到这里,基本的配置都设置完了,接下来测试配置的怎么样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
复制代码package com.cky.Demo;

import org.hibernate.Session;
import org.hibernate.Transaction;
import org.junit.Test;

import com.cky.domain.Blog;
import com.cky.domain.User;
import com.cky.utils.HibernateUtils;

public class CascadeTest {
@Test
public void testMTO2() {
Session session = HibernateUtils.getSession();
Transaction tr = session.beginTransaction();
//保存用户和博客
User user=new User();
user.setuName("王五");

Blog blog1=new Blog();
blog1.setbSubject("王五日常一");
blog1.setbContent("看电视");

Blog blog2=new Blog();
blog2.setbSubject("王五日常二");
blog2.setbContent("玩游戏");
 //为用户添加博客
user.getBlogs().add(blog1);
user.getBlogs().add(blog2);
//保存用户
session.save(user);

tr.commit();
session.close();
}
}

什么,居然报错了:TransientObjectException: object references an unsaved transient instance - save the transient instance before flushing

翻译一下,大致意思就是user对象引用了一个瞬时对象,因为当save(user)时,user已经被保存到缓存成为持久态对象,而给他添加的blog1和blog2,因为没有设置级联,所以不会被自动添加到缓存中,依然是瞬时态对象。

解决方法就是把两个blog1和blog2也进行save(),保存到session中:

1
2
3
4
5
6
7
8
9
10
复制代码@Test
public void testMTO2() {
//.....上面省略
//保存用户
session.save(user);
session.save(blog1);
session.save(blog2);
tr.commit();
session.close();
}

关于级联

  hibernate中用cascade属性设置级联。

  在基础的配置中,因为没有设置级联,默认是none,也就是不进行级联操作。

  就如上面的代码一样,我们需要手动的保证对象和他级联的对象都在同一状态,才能正确运行,这显然是很麻烦的,下面就看看如何通过设置级联属性来让代码更简单。

  cascade取值共有5个:

    none     默认值,不级联

    save-update  在保存、更新操作时自动级联保存更新关联对象

    delete    在删除时自动级联删除关联对象

    all      类似save-update-delete,即所以的操作都会级联

    all-delete-orphan 解除某一节点的关系时删除该节点(默认只是清除外键关系)

  接下来就在上面的基础配置上添加上面的属性看看有什么区别:

  save-update:

为user配置文件的添加cascade属性

1
2
3
4
复制代码<set name="blogs" **cascade="save-update"**>
<key column="u_id"></key>
<one-to-many class="com.cky.domain.Blog" />
</set>

此时我们运行上次报错的那段代码:

)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码@Test
public void testMTO() {
Session session = HibernateUtils.getSession();
Transaction tr = session.beginTransaction();
//保存用户和博客
User user=new User();
user.setuName("王五");

Blog blog1=new Blog();
blog1.setbSubject("王五日常一");
blog1.setbContent("看电视");

Blog blog2=new Blog();
blog2.setbSubject("王五日常二");
blog2.setbContent("玩游戏");

user.getBlogs().add(blog1);
user.getBlogs().add(blog2);

blog1.setUser(user);
blog2.setUser(user);
//自动关联
session.save(user);
//删除掉保存blog的代码

tr.commit();
session.close();
}

View Code
发现可以正确执行,因为保存user时,会自动级联保存两个blog,所以他们就全是持久态。

我们同时为blog配置文件添加cascade属性

1
复制代码<many-to-one name="user"   class="com.cky.domain.User"     column="u_id"    cascade="save-update"></many-to-one>

然后保存一个blog看看会发生什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码@Test
public void testMTO() {
Session session = HibernateUtils.getSession();
Transaction tr = session.beginTransaction();

User user=new User();
user.setuName("王五");

Blog blog1=new Blog();
blog1.setbSubject("王五日常一");
blog1.setbContent("看电视");

Blog blog2=new Blog();
blog2.setbSubject("王五日常二");
blog2.setbContent("玩游戏");

user.getBlogs().add(blog1);
user.getBlogs().add(blog2);

blog1.setUser(user);
blog2.setUser(user);

/*session.save(user);
session.save(blog1);*/
//只保存blog2
session.save(blog2);

tr.commit();
session.close();
}

运行成功,不过更有意思的是他保存了三条信息,而不是两条。

因为当保存 blog2 时,会级联保存 user ,而user又会级联把 blog1 保存

删除也是同样的道理,就不演示了,下面再研究一个all-delete-orphan,传说的孤儿删除

关于all-delete-orphan

all-delete-orphan上面已经简单介绍过,就是解除关系时会把节点删除而不只是删除外键。

我们把使用和不使用孤儿删除分别用代码实现,并做一次比较:

正常情况下的解除关系:

  原来的blog表中两条数据都和user id=1产生关系

现在我们把user和其中一个blog id=1解除关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码//普通解除关系
@Test
public void testMTO4() {
Session session = HibernateUtils.getSession();
Transaction tr = session.beginTransaction();

User user=(User) session.get(User.class, 1);
Blog blog=(Blog) session.get(Blog.class, 1);
//解除关系只需要把user集合中的blog移除即可
user.getBlogs().remove(blog);

tr.commit();
session.close();
}

运行sql:

再看看表情况:

正常情况,解除关系只是删除外键。

使用all-delete-orphan时解除关系:

为user配置文件添加all-delete-orphan

1
2
3
4
复制代码 <set name="blogs" **cascade="all-delete-orphan"**>
<key column="u_id"></key>
<one-to-many class="com.cky.domain.Blog" />
</set>

执行同样的代码解除关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码//孤儿删除
@Test
public void testMTO4() {
Session session = HibernateUtils.getSession();
Transaction tr = session.beginTransaction();

User user=(User) session.get(User.class, 1);
Blog blog=(Blog) session.get(Blog.class, 1);
//解除关系只需要把user集合中的blog移除即可
user.getBlogs().remove(blog);

tr.commit();
session.close();
}

sql的执行情况

数据表变化:

关于inverse(外键维护)

什么是外键维护呢?

  就是在两个关联对象中,如果关系发生改变需要修改外键。这么一说感觉这个功能肯定是必备的,要不然这么保证对象之间的关系呢?

  在hibernate是根据对象关系来判断是否要维护外键。

  这里有两个关键字,对象关系和外键。

  什么是对象关系?在hibernate中就是你这个对象A存的有对象B的引用,那么对象A就有对象B的的对象关系。有趣的是,对象关系可以是单向的,即A有B的对象关系,B不一定有A的对象关系。Hibernate是根据对象的对象关系来进行外键处理的。如果两边的对象关系都改变,那么默认hibernate都会进行外键处理(处理两次)。

  举个例子

  user1有blog1和blog2俩对象关系 、user2有blog 3和blog4俩对象关系

  1.现在我们把blog3添加到user1中(对象关系改变)

  2.因为这时blog3中的user还是user2,还要把blog3的user换成user1(对象关系改变)

  上面两个操作都改变了对象关系,如之前说的,session的缓存和快照不一致了,对于User对象,需要更新外键,对于Blog对象,也需要更新外键。

  但是,他们更新的是同一外键,也就是说对同一外键更新了两次,多了一个无意义的操作无疑增加了数据库的压力。

  

  也许有人可能会说,我不执行步骤2不就行了,结果还是正确的,还减少了sql。

  但是按照人的思维定式,在不知道的情况还是会按上面两个步骤走,感觉更合理。

  所以解决方法就在一方放弃外键维护。并且在多对多的情况下必须有一方需要放弃外键,否者程序无法运行。

  

  

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Asyncdb(四):MySQL网络协议分析 引言 背景 M

发表于 2017-11-28

本文由 GodPan 发表在 ScalaCool 团队博客。

MySQL对大家来说,都应该很熟悉了,从大学里的课程到实际工作中数据的存储查询,很多时候都需要用到数据库,很多人也写过与数据库交互的程序,在Java中你可能一开始会使用原生mysql-connector-java来进行操作,后来你会接触到Hibernate,Mybatis等ORM框架,其实它们底层也是基于mysql-connector-java,但很多时候我们并不清楚程序是怎么跟数据库具体交互的,比如执行一个SQL查询,程序是如何从MySQL中获取数据的呢?今天就让我们来看看最基础的MySQL网络协议分析。

引言

阅读本文之前你需要对网络协议需要有基本的了解,比如两台机子之间的数据是如何通信的,硬件层可以暂时不需了解,但网络层和传输层的协议要有一定的理解,比如IP数据包,TCP/IP协议,UDP协议等相关概念,有了这些基础,有利于你阅读本文。

背景

在历史悠久的时代,数据库只作为单机存储,也不怎么需要与程序进行交互的时候的首,它的网络通信并不是那么重要,但随着时代的发展,数据库不再只是单纯的作为一个数据的仓库了,它需要提供与外界的交互,比如远程连接,程序操作数据库等,这时候一份规范的网络通信的协议就非常重要了,比如它是如何校验权限,如何解析SQL语句,如何返回执行结果都需要用到相应的协议,很多时候我们并不需要接触这些内容,因为它太底层了,我们直接使用把它们封装好的第三方包就可以了,为什么还要去学习它的网络协议呢?确实对于一开始学习编程的人来说,这有点操之过急,反而有时候会适得其反,但当你对这一方面有了一定的了解之后,你便会迫不及待得想去探索更深层的奥秘,去了解并学习我们平常用的第三方类库是怎么去实现,明白它的底层原理,甚至对一些莫名其妙的bug也不会再害怕。

MySQL连接方式

分析协议,我们首先要了解如何与数据库连接,说到MySQL连接方式,大家突然可能有点懵,其实它一直伴随着我们,比如我们第一次装数据库完成后执行的第一次登录,比如你没有设置密码:

1
复制代码mysql -uroot

这是最基本的一种数据库连接方式,那么MySQL连接方式到底有几种呢?到MySQL5.7为止,总共有五种,分别是TCP/IP,TLS/SSL,Unix Sockets,Shared Memory,Named pipes,下面我们就来看看这五种的区别:

方式 默认开启 支持系统 只支持本机 如何开启 参数配置
TCP/IP 是 所有系统 否 –skip-networking=yes/no. –port–bind-address
TLS/SSL 是 所有系统(基于TCP/IP)之上 否 –ssl=yes/no. –ssl-* options
Unix Sockets 是 类Unix系统 是 设置–socket= 来关闭. –socket=socket path
Shared Memory 否 Windows系统 是 –shared-memory=on/off. –shared-memory-base-name=
Named pipes 否 Windows系统 否 –enable-named-pipe=on/off. –socket=

从上表中我们可以清晰看出每种连接方式的区别,接下里我会具体说明几种连接是怎么操作的,由于我的机子是Mac OS系统,这里只模拟非Windows系统下的三种方式,因为这三种方式都是默认开启的,我们不需要进行任何配置:

1.Unix Sockets:

1
复制代码mysql -uroot

若你在本机使用这种方式连接MySQL数据库的话,它默认会使用Unix Sockets。

2.TCP/IP:

1
2
复制代码mysql --protocol=tcp -uroot
mysql -P3306 -h127.0.0.1 -uroot

连接的时候我们指定连接协议,或者指定相应的IP及端口,我们的连接方式就变成了TCP/IP方式。

3.TLS/SSL:

1
2
复制代码mysql --protocol=tcp -uroot --ssl=on
mysql -P3306 -h127.0.0.1 -uroot --ssl=on

上表说过,TLS/SSL是基于TCP/IP的,所以我们只需再指定打开ssl配置即可。

然后我们可以通过以下语句来查询目前数据库的连接情况:

1
复制代码SELECT DISTINCT connection_type from performance_schema.threads where connection_type is not null

那么我们如何选择连接方式呢?个人总结了以下几个原则:

  • 若是你能确定程序和数据库在同一台机子(类Unix系统)上,推荐使用Unix Sockets,因为它效率更高;
  • 若数据库分布在不同的机子上,且能确保连接安全或者安全性要求不是那么高,推荐使用TCP/IP,反之使用TLS/SSL;

MySQL数据包

通信中最重要的就是数据,那么程序是如何和MySQL Server进行通信,并交互数据的呢?比如如何验证账户,发送查询语句,返回执行结果等,我先画一个流程图来模拟一下整个过程,帮助大家理解:

mysql-process

整个过程相对来说还是比较清晰的,我们对连接请求和断开请求不需要过分关心,只需要了解这一点就可以了,重要的是其他几点,那么在这几步中,数据是怎么进行交互的呢?

其实主要就是两步,Client将执行命令编码成Server要求的格式传输给Server端执行,Server端将执行结果传输给Client端,Client端再根据相应的数据包格式解析获得所需的数据。

1.基本数据类型

虽然网络中的数据是用字节传输的,但它背后的数据源都是有类型的数据,MySQL协议也有基本的数据类型,好比Java中的8种基本数据类型,但MySQL协议中简单的多,它只有两种基本数据类型,分别为Integer(整型),String(字符串),下面我们就来看看这两种类型。

Integer(整型)

首先Integer在MySQL协议中有两种编码方式,分别为FixedLengthInteger和LengthEncodedInteger
,其中前者用于存储无符号定长整数,实际中使用的不多,这里着重讲一下后者。

使用LengthEncodedInteger编码的整数可能会使用1, 3, 4, 或者9 个字节,具体使用字节取决于数值的大小,下表是不同的数据长度的整数所使用的字节数:

最小值(包含) 最大值(不包含) 存储方式
0 251 1个字节
251 2^16 3个字节(0xFC + 2个字节具体数据)
2^16 2^24 4个字节(0xFD + 3个字节具体数据)
2^24 2^64 9个字节(0xFE + 8个字节具体数据)

举个简单的例子,比如1024的编码为:

1
复制代码0xFC 0x00 0x04

其中0x代表16进制,实际数据传输中并没有该标识,第一位代表这是一个251~2^16之间的数值,所以后面两位为数值具体的值,这里使用的是小端字节序,MySQL默认使用的也是这种编码次序,所以这里1024是0x00 0x04,字节序相关知识可以参考:理解字节序,到这里大家应该对这种编码格式有了一定的了解了,下面我们就来看看String。

String(字符串)

String的编码格式相对Integer来说会复杂一点,主要有以下几种:

  • FixedLengthString(定长方式):需先知道String的长度,MySQL中的一个例子就是ERR_Packet包(后续会讲到)就使用了这种编码方式,因为它的长度固定,用5个字节存储所有数据。
  • NullTerminatedString(Null结尾方式): 字符串以遇到Null作为结束标志,相应的字节为00。
  • VariableLengthString(动态计算字符串长度方式): 字符串的长度取决于其他变量计算而定,比如一个字符串由Integer + Value组成,我们通过计算Integer的值来获取Value的具体的长度。
  • LengthEncodedString(指定字符串长度方式): 与VariableLengthString原理相似,是它的一种特殊情况,具体例子就是我上条举的这个例子。
  • RestOfPacketString(包末端字符串方式):一个包末端的字符串,可根据包的总长度金和当前位置得到字符串的长度,实际中并不常用。

总的来说String的编码格式种类相对比较多,不同方式之间的区别也比较大,若要深刻理解还需从实际的例子里去学习,后续文章中我会写几个demo带大家一起去探索。

2.基本数据包格式

数据包格式也主要分为两种,一种是Server端向Client端发送的数据包格式,另一种则是Client向Server端发送的数据包。

Server to Client

Server向Client发送的数据包有两个原则:

  • 每个数据包大小不能超过2^24字节(16MB);
  • 每个数据包前都需要加上数据包信息;

每个包的基本格式:

Type Name Description
int<3> payload_length(包数据长度) 具体数据包的内容长度,从出去头部四个字节后开始的内容
int<1> sequence_id(包序列id) 每个包的序列id,总数据内容大于16MB时需要用,从0开始,依次增加,新的命令执行会重载为0
string payload(具体数据) 包中除去头部后的具体数据内容

举个列子:

例子 解释
1
basic复制代码01 00 00 00 01| <li>payload_length: 1</li> <li>sequence_id: 0x00</li><li>payload: 0x01</li>

若是数据内容大于或者等于2^24-1个字节,将会拆分发送,举个例子,比如发送16 777 215 (2^24-1) 字节的内容,则会按一下这种方式发送

1
2
复制代码ff ff ff 00 ...
00 00 00 01

第一个数据包满载,第二个数据包是一个空数据包(一种临界情况)。

Client to Server

Client向Server端发送的格式相对来说就简单一点了

Type Name Description
int<1> 执行命令 执行的操作,比如切换数据库,查询表等操作
string 参数 命令相应的参数

命令列表(摘抄自胡桃夹子的博客):

类型值 命令 功能
0x00 COM_SLEEP (内部线程状态)
0x01 COM_QUIT 关闭连接
0x02 COM_INIT_DB 切换数据库
0x03 COM_QUERY SQL查询请求
0x04 COM_FIELD_LIST 获取数据表字段信息
0x05 COM_CREATE_DB 创建数据库
0x06 COM_DROP_DB 删除数据库
0x07 COM_REFRESH 清除缓存
0x08 COM_SHUTDOWN 停止服务器
0x09 COM_STATISTICS 获取服务器统计信息
0x0A COM_PROCESS_INFO 获取当前连接的列表
0x0B COM_CONNECT (内部线程状态)
0x0C COM_PROCESS_KILL 中断某个连接
0x0D COM_DEBUG 保存服务器调试信息
0x0E COM_PING 测试连通性
0x0F COM_TIME (内部线程状态)
0x10 COM_DELAYED_INSERT (内部线程状态)
0x11 COM_CHANGE_USER 重新登陆(不断连接)
0x12 COM_BINLOG_DUMP 获取二进制日志信息
0x13 COM_TABLE_DUMP 获取数据表结构信息
0x14 COM_CONNECT_OUT (内部线程状态)
0x15 COM_REGISTER_SLAVE 从服务器向主服务器进行注册
0x16 COM_STMT_PREPARE 预处理SQL语句
0x17 COM_STMT_EXECUTE 执行预处理语句
0x18 COM_STMT_SEND_LONG_DATA 发送BLOB类型的数据
0x19 COM_STMT_CLOSE 销毁预处理语句
0x1A COM_STMT_RESET 清除预处理语句参数缓存
0x1B COM_SET_OPTION 设置语句选项
0x1C COM_STMT_FETCH 获取预处理语句的执行结果

这里距一个常见的的例子,比如切换数据库:

1
复制代码use godpan

相应的报文格式则为:

1
复制代码0x02 0x67 0x6f 0x64 0x70 0x61 0x6e

其中0x02代表切换数据库命令,后面的字节则为godpan的16进制表达。

数据包类型

有了以上的基础,我们基本知道的与MySQL通信之间的方式以及数据格式,那么与其通信间到底有哪几种数据包呢?接下去的内容是建立在MySQL4.1版本以后,之前版本的数据包类型这里不再论述。

这里主要分为两个阶段,第一个阶段是数据库账户认证阶段,第二个阶段则是执行具体命令阶段,我们先来看看前者。

数据库账户认证阶段

这个阶段就是我们平常所说的登录,主要步骤如下:

  • 1.Client与Server进行连接
  • 2.Server向Client发送Handshake packet
  • 3.Client与Server发送Auth packet
  • 4.Server向Client发送OK packet或者ERR packet

这里我们来看一看上面的Handshake packet和Auth packet,OK packet和ERR packet放在另一个阶段写。

Handshake packet

Handshake packet是由Server向Client发送的初始化包,因为所有从Server向Client端发送的包都是一样的格式,所以前面的四个字节是包头,前三位代表Handshake packet具体内容的数据,另外包序列号为0,很显然这个包内容小于16MB,下面是Handshake packet具体内容的格式:

相对包内容的位置 长度(字节) 名称 描述
0 1 协议版本 协议版本的版本号,通常为10(0x0A)
1 len = strlen (server_version) + 1 数据库版本 使用前面的NullTerminatedString格式编码,长度为数据库版本字符串的长度加上标示结束的的一个字节
len + 1 4 线程ID 此次连接MySQL Server启动的线程ID
len + 5 8 + 1(0x00表示结束) 挑战随机数(第一部分) 用于后续账户密码验证
len + 14 2 协议协商 用于与客户端协商通讯方式
len + 16 1 编码格式 标识数据库目前的编码方式
len + 17 2 服务器状态 用于表示服务器状态,比如是否是事务模式或者自动提交模式
len + 19 13 保留字节 未来可能会用到,预留字节
len + 32 12 + 1(0x00表示结束) 挑战随机数(第二部分) 用于后续账户密码验证

上表就是整个Handshake packet的这个包结构,属性的含义以及规范都有相应的说明,下面是我本机解析的某次连接数据库的Handshake packet包,仅供参考:

1
复制代码{protocolVersion=10, serverVersion='5.7.13', threadId=4055, scramble=[49, 97, 80, 3, 35, 118, 45, 15, 5, 118, 9, 11, 124, 93, 93, 5, 31, 47, 111, 109, 0, 0, 0, 0, 0], serverCapabilities=65535, serverLanguage=33, serverStatus=2}

Auth packet

Auth packet是由Client向Server发送的认证包,用于验证数据库账户登录,相应内容的格式:

相对包内容的位置 长度(字节) 名称 描述
0 4 协议协商 用于与服务端协商通讯方式
4 4 消息最长长度 客户端可以发送或接收的最长长度,0表示不做任何限制
8 1 字符编码 客服端字符编码方式
9 23 保留字节 未来可能会用到,预留字节,用0代替
1
basic复制代码32 |不定| 认证字符串 | 主要有三部分内容<br> <li>用户名:NullTerminatedString格式编码</li><li>加密后的密码:LengthEncodedString格式编码</li><li>数据库名称(可选):NullTerminatedString格式编码</li>

这部分内容是由客户端自己生成,所以说如果我们如果要写一个程序连接数据库,那么这个包就得按照这个格式,不然服务端将会无法识别。

命令执行阶段

在我们正确连接数据库后,我们就要执行相应的命令了,比如切换数据库,执行CRUD操作等,这个阶段主要分为两步,Client发送命令(上文已经给出,下面不再讨论),Server端接收命令执行相应的操作,我们主要关心Server端向我们发送数据包,可分为4类和一个最基础的报文结构Data Field:

  • Data Field:包数据的一个基础结构;
  • OK包(包括PREPARE_OK):Server端发送正确处理信息的包,包头标识为0x00;
  • Error包: Server端发送错误信息的包,包头标识为0xFF;
  • EOF包:用于Server向Client发送结束包,包头标识为0xFE;
  • Result Set包:用于Server向Client发送的查询结果包;

Data Field

Data Field是Server回应包里的一个核心,主要是数据的一种编码结构,跟我之前讲的LengthEncodedInteger和LengthEncodedString很类似,也主要分为三个部分

最小数据长度(包含)|最大数据长度(不包含)|数据长度|格式
—|—|—|
1 |251| 1个字节|1字节 + 具体数据
251 |2^16| 2个字节 | 0xFC + 2个字节数据长度 + 具体数据
2^16 |2^24| 4个字节 | 0xFD + 4个字节数据长度 + 具体数据
2^24 |2^64| 8个字节 | 0xFE + 8个字节数据长度 + 具体数据
NULL | NULL | 0个字节 | 0xFB

要注意的一点是如果出现0xFB(251)开头说明这个数据对应的是MySQL中的NULL。

OK 包

普通的OK包(PREPARE_OK包后面会讲到)会在以下几种情况下产生,由Server发送给相应的接收方:

  • COM_PING: 连接或者测试数据库
  • COM_QUERY: 不需要查询结果集的操作,比如INSERT, UPDATE, or ALTER TABLE
  • COM_REFRESH: 数据刷新
  • COM_REGISTER_SLAVE: 注册从服务器

OK 包的主要结构:

相对包内容的位置 长度(字节) 名称 描述
0 1 包头标识 0x00 代表这是一个OK 包
1 rows_len 影响行数 相应操作影响的行数,比如一个Update操作的记录是5条,那么这个值就为5
1 + rows_len id_len 自增id 插入一条记录时,如果是自增id的话,返回的id值
1 + rows_len + id_len 2 服务器状态 用于表示服务器状态,比如是否是事务模式或者自动提交模式
3 + rows_len + id_len 2 警告数 上次命令引起的警告数
5 + rows_len + id_len msg_len 额外信息 此次操作的一些额外信息

下面是我本机解析的某次正确连接数据库后的OK packet包,仅供参考:

1
复制代码OK{affectedRows=0, insertId=0, serverStatus=2, message='....'}

Error 包

顾名思义Error 包就是当出现错误的时候返回的信息,比如账户验证不通过,查询命令不合法,非空字段未指定值等相关操作,Server端都会向Client端发送Error 包。

Error 包的主要结构:

相对包内容的位置 长度(字节) 名称 描述
0 1 包头标识 0xFF 代表这是一个Error 包
1 2 错误代码 该错误的相应错误代码
3 1 标识位 SQL执行状态标识位,用’#’进行标识
4 5 执行状态 SQL的具体执行状态
9 msg_len 错误信息 具体的错误信息

比如我们现在已经连接了数据库,执行

1
复制代码use test_database;

但是我们数据库中并没有test_database这个数据库,我们将会得到相应的错误信息,下面是我本机解析的Error packet包,仅供参考:

1
复制代码Error{errno=1046, sqlState='3D000', message='No database selected'}

EOF Packet

EOF Packet是用于标识某个阶段数据结束的标志包,会在一下几种情况中产生:

  • 结果集中字段信息结束的时候;
  • 结果集中列信息结束的时候;
  • 服务器确认停止服务的时候;
  • 客户端发送COM_SET_OPTION and COM_DEBUG命令后,服务器回应的时候;
  • 服务器请求使用MySQL4.1版本之前的认证方式的时候;

EOF 包的主要结构:

相对包内容的位置 长度(字节) 名称 描述
0 1 包头标识 0xFE 代表这是一个EOF 包
1 2 警告数 上次命令引起的警告数
3 2 服务器状态

这里要注意的一点,我们上面分析了Data Field的结构,发现它是用0xFE作为长度需要8个字节编码值得标识头,所以我们在判断一个包是否是EOF 包的时候,需要下面两个条件:

  • 标识头(第一个字节)为0xFE;
  • 包的总长度小于9个字节;

Result Set包

Result Set包产生于我们每次数据库执行需要返回结果集的时候,Server端发送给我们的包,比如平常的SELECT,SHOW等命令,Result Set包相对比较复杂,主要包含以下五个方面:

内容 含义
Result Set Header 返回数据的列数量
Field 返回数据的列信息(多个)
EOF 列结束
Row Data 行数据(多个)
EOF 数据结束

我们逐个来分析,首先我们来看Result Set Header。

Result Set Header

Result Set Header表示返回数据的列数量以及一些额外的信息,其主要结构为:

长度 含义
1-9字节 数据的列数量(LengthEncodedInteger编码格式)
1-9字节 额外信息(LengthEncodedInteger编码格式)
Field

Field表示Result Set中数据列的具体信息,可出现多次,具体次数取决于Result Set Header中数据的列数量,它的主要结构为:

长度 含义
4 通常为ASCIIz字符串def
n 数据库名称(Data Field)
n 假如查询指定了表别名,就是表别名(Data Field)
n 原始的表名(Data Field)
n 假如查询指定了列别名,就是列别名(Data Field)
n 原始的列名(Data Field)
1 标识位,通常为12,表示接下去的12个字节是具体的field内容
2 field的编码
4 field的长度
1 field的类型
2 field的标识
2 field值的的小数点精度
2 预留字节
n 可选元素,如果存在,则表示该field的默认值

其中field的类型与标识具体定义和对应变量含义可参考这篇文章:MySQL协议分析

EOF 包

这里的EOF包是标识这列信息的结束,具体结构信息参考上面的EOF包解释。

Row Data

Row Data含着的是我们需要获取的数据,一个Result Set包里面包含着多个Row Data结构(得到的数据可能多行),每个Row Data中包含着多个字段值,它们之间没有间隔,比如我们现在查询到的数据为(id: 1, name: godpan) 那么Row Data内容为(1,godpan),这两个值是连在一起的,对应的值都用LengthEncodedString编码。

EOF 包

等待Row Data发送完之后,Server最后会向Client端发送一个EOF包,标识所有的行数据已经发送完毕。

PREPARE_OK包

PREPARE_OK包产生在Client端向Server发送预处理SQL语句,Server进行正确回应的时候,大家写写Java的时候肯定用过PreparedStatement,这里PreparedStatement的功能就是进行SQL的预处理,预处理的优点比较多,比如效率高,防SQL注入等,有兴趣的同学可以自己去学习下。下面是PREPARE_OK包的结构:

长度 含义
1 0x00(标识是一个OK包)
4 statement_handler_id(预处理语句id)
2 number of columns in result set(结果集中列的数量)
2 number of parameters in query(查询语句中参数的数量)
1 0x00 (填充值)
2 警告数

比如我现在执执行下面的语句:

1
2
3
复制代码PreparedStatement ps = connection.prepareStatement("SELECT * FROM `godpan_fans` where id=?");
ps.setInteger(1, 1);
ps.executeQuery();

得到下面的PREPARE_OK包,仅供参考:

1
复制代码PSOK{statementId=1, columns=5, parameters=1}

如果上面的columns大于0,以及parameters大于0,则将有额外的两个包传输,分别是columns的信息以及parameters的信息,对应信息结构:

内容 含义
Field columns信息(多个)
EOF columns信息结束
Field parameters(多个)
EOF parameters结束

到此整个PREPARE_OK包发送完毕。

Row Data Binary

这个包跟上面提到的Row Data包有什么差别呢?主要有两点:

  • 用不同的方式定义NULL;
  • 数据编码不再单纯的使用LengthEncodedString,而是根据数据类型的不同进行相应的编码;

后面我会分别解释这两点,我们先来看看它的结构:

相对包内容的位置 长度(字节) 名称 描述
0 1 包头标识 0x00
1 (col_count+7+2)/8 Null Bit Map 前两位为预留字节,主要用于区别与其他的几种包(OK,ERROR,EOF),在MySQL 5之后这两个字节都为0X00,其中col_count为列的数量
(col_count+7+2)/8 + 1 n column values 具体的列值,重复多次,根据值类型编码

现在我们来看一下它的两个特点,首先我们来看它是如何来定义NULL的,首先我们看到他的结构中有一个Null Bit Map,除去两个标识位,真正用于标识数据信息的就是(col_count+7)/8位字节,这里我先给出结论,后面再给大家具体分析:

参数个数 长度(字节) 具体值范围 描述
1-8 1 -1, 2^n组合 1 = 2^0表示第一个参数为NULL,3 = 2^0 + 2^1表示第一个和第二参数为NULL…

上面给出了标识NULL的基本算法,原则是哪个参数(次序为n)为NULL,则Null Bit Map相应的值加上2^n,8个参数为一个周期,以此类推。

接着我们来看一下第二点,是如何用具体值类型来对相应的值进行编码的,这里主要分为三类,基本数据类型,时间类型,字符串类型;

  • 基本数据类型:比如TINYINT使用一个字节编码,FLOAT使用四个字节,DOUBLE使用8个字节等;
  • 时间类型:使用类似LengthEncodedString的编码方式编码,具体可参考MySQL_PROTOCOL;
  • 字符串类:不属于上面两类的都属于字符串类型,使用普通的LengthEncodedString;

Execute包

Execute包顾名思义是一个执行包,它是由Client端发送到Server端的,但它和普通的命令又有点不同,它主要是用来执行预处理语句,并会携带相应参数,具体结构如下:

长度 含义
1 COM_EXECUTE(标识是一个Execute包)
4 预处理语句id
1 游标类型
4 预留字节
0 接下去的内容只有在有参数的情况下
(param_count+7)/8 null_bit_map(描述参数中NULL的情况)
1 参数绑定情况
n*2 参数类型(依次存储)
n 参数具体值(非NULL)(依次存储,使用Row Data Binary方式编码)

Execute包从Client端发送到Server端后可能会得到以下几个结果:

  • OK包
  • ERROR包
  • Result Set包(可能多个)

我们需要根据包的不同类型来进行不同的处理。

总结

本篇文章主要讲述了MySQL的连接方式,通信过程及协议,以及传输包的基本格式和相关传输包的类型,内容相对来说,比较多也比较复杂,我也是将近三周才写完,但总体按照我自学的思路走,不会太绕,有些点可能需要细心思考下,写的有误的地方也希望大家能指正,希望对大家有所帮助,后面可能会写几个实例和大家一起学习。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PHP容器--Pimple运行流程浅析

发表于 2017-11-28

需要具备的知识点

闭包

闭包和匿名函数在PHP5.3.0中引入的。

闭包是指:创建时封装周围状态的函数。即使闭包所处的环境不存在了,闭包中封装的状态依然存在。

理论上,闭包和匿名函数是不同的概念。但是PHP将其视作相同概念。实际上,闭包和匿名函数是伪装成函数的对象。他们是Closure类的实例。

闭包和字符串、整数一样,是一等值类型。

创建闭包:

1
2
3
4
5
6
复制代码<?php
$closure = function ($name) {
return 'Hello ' . $name;
};
echo $closure('nesfo');//Hello nesfo
var_dump(method_exists($closure, '__invoke'));//true

我们之所以能调用$closure变量,是因为这个变量的值是一个闭包,而且闭包对象实现了__invoke()魔术方法。只要变量名后有(),PHP就会查找并调用__invoke()方法。

通常会把PHP闭包当作函数的回调使用。

array_map(), preg_replace_callback()方法都会用到回调函数,这是使用闭包的最佳时机!

举个例子:

1
2
3
4
5
复制代码<?php
$numbersPlusOne = array_map(function ($number) {
return $number + 1;
}, [1, 2, 3]);
print_r($numbersPlusOne);

得到结果:

1
复制代码[2, 3, 4]

在闭包出现之前,只能单独创建具名函数,然后使用名称引用那个函数。这么做,代码执行会稍微慢点,而且把回调的实现和使用场景隔离了。

1
2
3
4
5
6
7
复制代码<?php
function incrementNum ($number) {
return $number + 1;
}

$numbersPlusOne = array_map('incrementNum', [1, 2, 3]);
print_r($numbersPlusOne);

SPL

ArrayAccess

实现ArrayAccess接口,可以使得object像array那样操作。ArrayAccess接口包含四个必须实现的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码interface ArrayAccess {
//检查一个偏移位置是否存在
public mixed offsetExists ( mixed $offset );

//获取一个偏移位置的值
public mixed offsetGet( mixed $offset );

//设置一个偏移位置的值
public mixed offsetSet ( mixed $offset );

//复位一个偏移位置的值
public mixed offsetUnset ( mixed $offset );
}

SplObjectStorage

SplObjectStorage类实现了以对象为键的映射(map)或对象的集合(如果忽略作为键的对象所对应的数据)这种数据结构。这个类的实例很像一个数组,但是它所存放的对象都是唯一。该类的另一个特点是,可以直接从中删除指定的对象,而不需要遍历或搜索整个集合。

::class语法

因为 ::class 表示是字符串。用 ::class 的好处在于 IDE 里面可以直接改名一个 class,然后 IDE 自动处理相关引用。
同时,PHP 执行相关代码时,是不会先加载相关 class 的。

同理,代码自动化检查 inspect 也可以正确识别 class。

Pimple容器流程浅析

Pimpl是php社区中比较流行的容器。代码不是很多,详见github.com/silexphp/P.… 。

我们的应用可以基于Pimple开发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
复制代码namespace EasyWeChat\Foundation;

use Pimple\Container;

class Application extends Container
{
/**
* Service Providers.
*
* @var array
*/
protected $providers = [
ServiceProviders\ServerServiceProvider::class,
ServiceProviders\UserServiceProvider::class
];

/**
* Application constructor.
*
* @param array $config
*/
public function __construct($config)
{
parent::__construct();

$this['config'] = function () use ($config) {
return new Config($config);
};

if ($this['config']['debug']) {
error_reporting(E_ALL);
}

$this->registerProviders();
}

/**
* Add a provider.
*
* @param string $provider
*
* @return Application
*/
public function addProvider($provider)
{
array_push($this->providers, $provider);

return $this;
}

/**
* Set providers.
*
* @param array $providers
*/
public function setProviders(array $providers)
{
$this->providers = [];

foreach ($providers as $provider) {
$this->addProvider($provider);
}
}

/**
* Return all providers.
*
* @return array
*/
public function getProviders()
{
return $this->providers;
}

/**
* Magic get access.
*
* @param string $id
*
* @return mixed
*/
public function __get($id)
{
return $this->offsetGet($id);
}

/**
* Magic set access.
*
* @param string $id
* @param mixed $value
*/
public function __set($id, $value)
{
$this->offsetSet($id, $value);
}
}

如何使用我们的应用:

1
2
复制代码$app = new Application([]);
$user = $app->user;

之后我们就可以使用$user对象的方法了。我们发现其实并没有$this->user这个属性,但是可以直接使用。主要是这两个方法起的作用:

1
2
复制代码public function offsetSet($id, $value){}
public function offsetGet($id){}

下面我们将解释在执行这两句代码,Pimple做了什么。但在解释这个之前,我们先看看容器的一些核心概念。

服务提供者

服务提供者是连接容器与具体功能实现类的桥梁。服务提供者需要实现接口ServiceProviderInterface:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码namespace Pimple;

/**
* Pimple service provider interface.
*
* @author Fabien Potencier
* @author Dominik Zogg
*/
interface ServiceProviderInterface
{
/**
* Registers services on the given container.
*
* This method should only be used to configure services and parameters.
* It should not get services.
*
* @param Container $pimple A container instance
*/
public function register(Container $pimple);
}

所有服务提供者必须实现接口register方法。

我们的应用里默认有2个服务提供者:

1
2
3
4
复制代码protected $providers = [
ServiceProviders\ServerServiceProvider::class,
ServiceProviders\UserServiceProvider::class
];

以UserServiceProvider为例,我们看其代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码namespace EasyWeChat\Foundation\ServiceProviders;

use EasyWeChat\User\User;
use Pimple\Container;
use Pimple\ServiceProviderInterface;

/**
* Class UserServiceProvider.
*/
class UserServiceProvider implements ServiceProviderInterface
{
/**
* Registers services on the given container.
*
* This method should only be used to configure services and parameters.
* It should not get services.
*
* @param Container $pimple A container instance
*/
public function register(Container $pimple)
{
$pimple['user'] = function ($pimple) {
return new User($pimple['access_token']);
};
}
}

我们看到,该服务提供者的注册方法会给容器增加属性user,但是返回的不是对象,而是一个闭包。这个后面我再做讲解。

服务注册

我们在Application里构造函数里使用$this->registerProviders();对所有服务提供者进行了注册:

1
2
3
4
5
6
复制代码private function registerProviders()
{
foreach ($this->providers as $provider) {
$this->register(new $provider());
}
}

仔细看,我们发现这里实例化了服务提供者,并调用了容器Pimple的register方法:

1
2
3
4
5
6
7
8
9
10
复制代码public function register(ServiceProviderInterface $provider, array $values = array())
{
$provider->register($this);

foreach ($values as $key => $value) {
$this[$key] = $value;
}

return $this;
}

而这里调用了服务提供者的register方法,也就是我们在上一节中提到的:注册方法给容器增加了属性user,但返回的不是对象,而是一个闭包。

当我们给容器Pimple添加属性user的同时,会调用offsetSet($id, $value)方法:给容器Pimple的属性values、keys分别赋值:

1
2
复制代码$this->values[$id] = $value;
$this->keys[$id] = true;

到这里,我们还没有实例化真正提供实际功能的类EasyWeChat\User\Usr。但已经完成了服务提供者的注册工作。

当我们运行到这里:

1
复制代码$user = $app->user;

会调用offsetGet($id)并进行实例化真正的类:

1
2
3
4
5
6
7
复制代码$raw = $this->values[$id];
$val = $this->values[$id] = $raw($this);
$this->raw[$id] = $raw;

$this->frozen[$id] = true;

return $val;

$raw获取的是闭包:

1
2
3
复制代码$pimple['user'] = function ($pimple) {
return new User($pimple['access_token']);
};

$raw($this)返回的是实例化的对象User。也就是说只有实际调用才会去实例化具体的类。后面我们就可以通过$this['user']或者$this->user调用User类里的方法了。

当然,Pimple里还有很多特性值得我们去深入研究,这里不做过多讲解。

参考

1、PHP: 数组式访问 - Manual
php.net/manual/zh/c…
2、利用 SPL 快速实现 Observer 设计模式
www.ibm.com/developer..…
3、Pimple

  • A simple PHP Dependency Injection Container
    pimple.sensiolabs.org/
    4、Laravel源码里面为什么要用::class语法? - 知乎
    www.zhihu.com/questio…
    5、Laravel
    学习笔记 —— 神奇的服务容器 | Laravel China 社区 - 高品质的 Laravel 和 PHP 开发者社区 - Powered by PHPHub
    laravel-china.org/top…
    6、Pimple/README_zh.rst
    at master · 52fhy/Pimple
    github.com/52fhy/Pimp.…

原文发布于博客园:www.cnblogs.com/52fhy/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…927928929…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%