开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

微信支付JSAPI下单和微信小程序调起支付(V2版本) 一、

发表于 2021-07-23

一、前文

参考
基于WxJava实现微信支付SpringBoot后端接口的使用
微信支付—开发者文档—小程序—小程序调起支付API(V3版本)
微信支付—开发文档—小程序支付—小程序调起支付API(V2)
微信支付接口签名校验工具
微信官方文档—小程序—支付 /wx.requestPayment(V3)

  • V2版本的API,通过MD5加密(本博文主要简述)
  • V3版本的API,使用RAS加密

二、流程图

微信小程序SpringBoot后端用户点击付款按钮调用后端下单接口JSAPI统一下单微信订单数据存入数据库生成小程序调起支付所需的数据返回小程序调起支付所需的数据wx.requestPayment发起微信支付支付成功/失败回调JSAPI查询订单确认支付成功,更新数据库微信小程序SpringBoot后端
三、SpringBoot接口实现
================

2.1 微信调起支付所需数据

Md5Utils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码/**
* Md5加密方法
*
* @author weijian
*/
public class Md5Utils
{
private static final Logger log = LoggerFactory.getLogger(Md5Utils.class);

/**
* 生成 MD5
*
* @param data 待处理数据
* @return MD5结果
*/
public static String MD5(String data) {
try {
java.security.MessageDigest md = MessageDigest.getInstance("MD5");
byte[] array = md.digest(data.getBytes("UTF-8"));
StringBuilder sb = new StringBuilder();
for (byte item : array) {
sb.append(Integer.toHexString((item & 0xFF) | 0x100).substring(1, 3));
}
return sb.toString().toUpperCase();
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}

WxPayment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
java复制代码
/**
* 微信调起支付数据 WxPayment
*
* @author weijian
*/
@ApiModel(value="WxPayment",description="微信调起支付数据")
public class WxPayment
{
@ApiModelProperty(value="wxTradeId", required = true)
@Excel(name = "wxTradeId")
private Long wxTradeId;

@ApiModelProperty(value="小程序id", required = true)
@Excel(name = "小程序id")
private String appId;

@ApiModelProperty(value="时间戳", required = true)
@Excel(name = "时间戳")
private String timeStamp;

@ApiModelProperty(value="随机字符串", required = true)
@Excel(name = "随机字符串")
private String nonceStr;

@ApiModelProperty(value="订单详情扩展字符串", required = true)
@Excel(name = "订单详情扩展字符串")
private String _package;

@ApiModelProperty(value="签名方式", required = true)
@Excel(name = "签名方式")
private String signType;

@ApiModelProperty(value="签名", required = true)
@Excel(name = "签名")
private String paySign;

public void sign(String key){
String sign = ("appId="+appId+"&");
sign += ("nonceStr="+nonceStr+"&");
sign += ("package="+_package+"&");
sign += ("signType="+signType+"&");
sign += ("timeStamp="+timeStamp+"&");
sign += ("key="+key);
System.out.println("sign="+sign);
setPaySign(Md5Utils.MD5(sign).toUpperCase());
}

public Long getWxTradeId() {
return wxTradeId;
}

public void setWxTradeId(Long wxTradeId) {
this.wxTradeId = wxTradeId;
}

public String getAppId() {
return appId;
}

public void setAppId(String appId) {
this.appId = appId;
}

public String getTimeStamp() {
return timeStamp;
}

public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}

public String getNonceStr() {
return nonceStr;
}

public void setNonceStr(String nonceStr) {
this.nonceStr = nonceStr;
}

public String get_package() {
return _package;
}

public void set_package(String _package) {
this._package = _package;
}

public String getSignType() {
return signType;
}

public void setSignType(String signType) {
this.signType = signType;
}

public String getPaySign() {
return paySign;
}

public void setPaySign(String paySign) {
this.paySign = paySign;
}

@Override
public String toString() {
return "WxPayment{" +
"wxTradeId=" + wxTradeId +
", appId='" + appId + '\'' +
", timeStamp='" + timeStamp + '\'' +
", nonceStr='" + nonceStr + '\'' +
", _package='" + _package + '\'' +
", signType='" + signType + '\'' +
", paySign='" + paySign + '\'' +
'}';
}
}

2.2 下单接口

WxPayController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码
@ApiOperation("微信统一下单")
// @PreAuthorize("@ss.hasPermi('wallet:update')")
@Log(title = "微信统一下单", businessType = BusinessType.UPDATE)
@PostMapping("/wx/unifiedOrder")
public AjaxResult wxUnifiedOrder(@ApiParam() Long billId) throws Exception
{
Bill bill = billService.selectBillById(billId);
if(ObjectUtil.isNull(bill)){
return AjaxResult.error("bill异常");
}

//1. JSAPI微信统一下单
WxPayUnifiedOrderRequest wxPayUnifiedOrderRequest = new WxPayUnifiedOrderRequest();
wxPayUnifiedOrderRequest.setOutTradeNo(IdUtils.fastSimpleUUID(16));
wxPayUnifiedOrderRequest.setTotalFee(1);
wxPayUnifiedOrderRequest.setSpbillCreateIp(IpUtils.getHostIp());
wxPayUnifiedOrderRequest.setNotifyUrl("https://www.nb.cn:8080/v1/wx/notify");
wxPayUnifiedOrderRequest.setTradeType("JSAPI");
wxPayUnifiedOrderRequest.setBody("账单支付");
wxPayUnifiedOrderRequest.setOpenid(SecurityUtils.getLoginUser().getUser().getTenantOpenId());
wxPayUnifiedOrderRequest.setAppid("wx********");
WxPayUnifiedOrderResult wxPayUnifiedOrderResult = wxService.unifiedOrder(wxPayUnifiedOrderRequest);

//2. 微信订单数据存入数据库
WxTrade wxTrade = new WxTrade();
wxTrade.setBillId(billId);
wxTrade.setUserId(SecurityUtils.getUserId());
wxTrade.setUserName(SecurityUtils.getUsername());
wxTrade.setAppId(wxPayUnifiedOrderResult.getAppid());
wxTrade.setMchId(wxPayUnifiedOrderResult.getMchId());
wxTrade.setSubMchId(wxPayUnifiedOrderResult.getSubMchId());
wxTrade.setOutTradeNo(wxPayUnifiedOrderRequest.getOutTradeNo());
wxTrade.setTotalFee(wxPayUnifiedOrderRequest.getTotalFee());
wxTrade.setBody(wxPayUnifiedOrderRequest.getBody());
wxTrade.setStatus(Constants.TRADE_UNKNOWN);
wxTradeService.insertWxTrade(wxTrade);

//3. 生成小程序调起支付所需的数据
WxPayment wxPayment = new WxPayment();
wxPayment.setWxTradeId(wxTrade.getId());
wxPayment.setAppId(wxPayUnifiedOrderResult.getAppid());
wxPayment.setTimeStamp((System.currentTimeMillis()/1000)+"");
wxPayment.set_package("prepay_id="+wxPayUnifiedOrderResult.getPrepayId());
wxPayment.setNonceStr(IdUtils.fastSimpleUUID(32));
wxPayment.setSignType("MD5");
wxPayment.sign(wxPayProperties.getMchKey());
logger.info(wxPayment.toString());

//4. 返回wxPayment
return AjaxResult.success(wxPayment);
}

2.3 支付回调接口

支付回调接口

  1. 查询该微信订单是否完成
  2. 确认支付完成,则执行具体的业务逻辑
1
2
3
4
5
6
java复制代码    /**
* 订单状态, -1:未知;0:支付失败;1:支付成功
*/
public static final int TRADE_UNKNOWN = -1;
public static final int TRADE_FAIL = 0;
public static final int TRADE_SUCCESS = 1;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码    public int queryWxOrder(String outTradeNo){
try {
WxPayOrderQueryResult wxPayOrderQueryResult = wxService.queryOrder(null, outTradeNo);
logger.info(wxPayOrderQueryResult.toString());
if(wxPayOrderQueryResult.getTradeState().equals("NOTPAY")){
return Constants.TRADE_UNKNOWN;
}
return Constants.TRADE_SUCCESS;
}catch (Exception e){
// e.printStackTrace();
//异常,订单不存在
}
return Constants.TRADE_FAIL;
}

三、微信小程序实现

大部分的加密校验,以及与微信支付接口的交互都在Java后台操作,所以微信小程序的代码就相对简洁简单。

3.1 JSAP统一下单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
javascript复制代码  //统一下单
unifiedOrder(){
var _this = this
app.showLoading(true)
keyueliSdk.wxUnifiedOrder(
{
billId: _this.data.id,
},
(res) => {
console.log("wxUnifiedOrder", res)
_this.requestPayment(res.data.data)
},
(res) => {})
},

3.2 小程序调起支付

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
javascript复制代码  //官方标准的支付方法,调起支付界面
requestPayment(payData) {
var _this = this
wx.requestPayment({
timeStamp: payData.timeStamp,
nonceStr: payData.nonceStr,
package: payData._package,
signType: payData.signType,
paySign: payData.paySign,
success(res) {
console.log("支付成功", res)
_this.paySuccess(payData)
},
fail(res) {
console.log("支付失败", res)
app.showLoading(false)
app.showToast("支付失败")
}
})
},

3.3 支付回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
javascript复制代码  //支付成功记录
paySuccess(payData){
keyueliSdk.wxPay(
payData.wxTradeId,
(res) => {
wx.hideLoading()
console.log("wxPay", res)
if(res.data.code==200){
app.showToast("支付成功")
wx.navigateBack({
delta: 1,
})
return
}
app.showToast(res.data.msg)
},
(res) => {})
},

觉得好,就一键三连呗(点赞+收藏+关注)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Oauth20实现单点登录的原理流程,这也太通俗易懂了!

发表于 2021-07-23

单点登录是多域名企业站点流行的登录方式。本文以现实生活场景辅助理解,力争彻底理清 OAuth2.0 实现单点登录的原理流程。同时总结了权限控制的实现方案,及其在微服务架构中的应用。

1 什么是单点登录

1.1 多点登录

传统的多点登录系统中,每个站点都实现了本站专用的帐号数据库和登录模块。各站点的登录状态相互不认可,各站点需要逐一手工登录。如下图,有两个术语含义如下:

  • 认证(authentication): 验证用户的身份;
  • 授权(authorization): 验证用户的访问权限。

img

1.2 单点登录

单点登录,英文是 Single Sign On,缩写为 SSO。多个站点(192.168.1.20X)共用一台认证授权服务器(192.168.1.110,用户数据库和认证授权模块共用)。用户经由其中任何一个站点(比如 192.168.1.201)登录后,可以免登录访问其他所有站点。而且,各站点间可以通过该登录状态直接交互。

img

2 OAuth2 认证授权的原理流程

2.1 生活实例【★★重点★★】

为了直观的理解 OAuth2.0 原理流程,我们假设这样一个生活场景:

(1)档案局A(客户端 / Client):以“档案局ID/密码”标识,是掌握档案资源的机构。并列还有很多档案局B/C/…,每个档案局存储的档案内容(资源 / Resource)不一样,比如政治、经济、军事、文化等;

(2)公民张三(资源所有者 / Resource Owner):以“用户名/密码”标识,需要到各个档案局查档案;

(3)派出所(授权服务器 / Authentication Server):可以是单个巨大的派出所,也可以是数据共享的派出所集群,掌管的信息、提供的对外接口功能有:

  • 档案局信息:所有档案局的“档案局ID/密码”,证明档案局的身份;
  • 公民信息:所有公民的“用户名/密码”,能提供张三是张三的用户身份证明(认证 / Authentication)
  • 公民对于档案局的权限:有张公民和档案局的权限的映射表,可查得各公民对各档案局是否有操作权限(授权 / Authorization)。通常,设计中会增加官职(角色 / Role)一层,各公民属于哪个官职(角色),哪个官职(角色)对于特定档案局有操作权限。

2.1.1 张三首次访问档案局A

张三之前从未到访档案局,第一次来档案局。对照下图序号理解:

(1)张三来到“档案局A”的“档案处”,该处要求实名登记后才能查询,被指示到“用户登记处”办理(HTTP重定向);

(2)张三来到“档案局A”的“用户登记处”,既不能证明身份(认证),又不能证明自己有查档案A的权限(授权)。张三携带档案局A的标识(client-id),被重定向至“授权信开具处”;

(3)张三来到“派出所”的“授权信开具处”,出示档案局A的标识,希望开具授权信(授权)。该处要求首先证明身份(认证),被重定向至“用户身份验证处”;

(4)张三来到“派出所”的“用户身份验证处”,领取了用户身份表(网页登录表单 Form);

(5)张三填上自己的用户名和密码,交给(提交 / Submit)“用户身份验证处”,该处从私用数据库中查得用户名密码匹配,确定此人是张三,开具身份证明信,完成 认证。张三带上身份证明信和档案局A的标识,被重定向至“授权信开具处”;

(6)张三再次来到“授权信开具处”,出示身份证明信和档案局A的标识,该处从私用数据库中查得,张三的官职是市长级别(角色),该官职具有档案局A的查询权限,就开具“允许张三查询档案局A”的授权信(授权码 / code),张三带上授权信被重定向至“档案局”的“用户登录处”;

(7)张三到了“档案局”的“用户登录处”,该处私下拿出档案局A的标识(client-id)和密码,再附上张三出示的授权信(code),向“派出所”的“腰牌发放处”为张三申请的“腰牌”(token),将来张三可以带着这个腰牌表明身份和权限。又被重定向到“档案处”;

关注公众号【码猿技术专栏】回复关键词Spring Boot 进阶、Mybatis 进阶领取作者原创书籍。

(8)张三的会话(Session)已经关联上了腰牌(token),可以直接通过“档案处”查档案。

img

2.1.2 张三首次访问档案局B

张三已经成功访问了档案局A,现在他要访问档案局B。对照下图序号理解:

(1)/(2) 同上;

(3)张三已经有“身份证明信”,直接在“派出所”的“授权信开具处”成功开具“访问档案局B”的授权信;

(4)/(5)/(6) 免了;

(7)“档案局B”的“用户登记处”完成登记;

(8)“档案局B”的“档案处”查得档案。

img

2.1.3 张三再次访问档案局A

张三已经成功访问了档案局A,现在他要访问档案局A。对照下图序号理解:

关注公众号【码猿技术专栏】回复关键词Spring Boot 进阶、Mybatis 进阶领取作者原创书籍。

(1)直接成功查到了档案;

(2~8)都免了。

img

2.2 HTTP 重定向原理

HTTP 协议中,浏览器的 REQUEST 发给服务器之后,服务器如果发现该业务不属于自己管辖,会把你支派到自身服务器或其他服务器(host)的某个接口(uri)。正如我们去政府部门办事,每到一个窗口,工作人员会说“你带上材料A,到本所的X窗口,或者其他Y所的Z窗口”进行下一个手续。

img

2.3 SSO 工作流程

至此,就不难理解 OAuth 2.0 的认证/授权流程,此处不再赘述。请拿下图对照“2.1 生活实例”一节来理解。

img

2.4 OAuth2.0 进阶

  • tools.ietf.org/html/rfc674…
  • tools.ietf.org/html/rfc675…
  • blog.csdn.net/seccloud/ar…

根据官方标准,OAuth 2.0 共用四种授权模式:

  • Authorization Code: 用在服务端应用之间,这种最复杂,也是本文采用的模式;
  • Implicit: 用在移动app或者web app(这些app是在用户的设备上的,如在手机上调起微信来进行认证授权)
  • Resource Owner Password Credentials(password): 应用直接都是受信任的(都是由一家公司开发的,本例子使用)
  • Client Credentials: 用在应用API访问。

img

3 基于 SpringBoot 实现认证/授权

3.1 授权服务器(Authorization Server)

(1) pom.xml

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-oauth2</artifactId>
</dependency>

(2) application.properties

1
properties复制代码server.port=8110 ## 监听端口

(3) AuthorizationServerApplication.java

1
2
3
4
java复制代码@EnableResourceServer // 启用资源服务器
public class AuthorizationServerApplication {
// ...
}

(4) 配置授权服务的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Configuration
@EnableAuthorizationServer
public class Oauth2AuthorizationServerConfigurer extends AuthorizationServerConfigurerAdapter {
@Override
public void configure(final ClientDetailsServiceConfigurer clients) throws Exception {
clients.inMemory()
.withClient("webapp").secret("secret") //客户端 id/secret
.authorizedGrantTypes("authorization code") //授权妈模式
.scopes("user_info")
.autoApprove(true) //自动审批
.accessTokenValiditySeconds(3600); //有效期1hour
}
}

@Configuration
public class Oauth2WebSecurityConfigurer extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http.requestMatchers()
.antMatchers("/login", "/oauth/authorize/oauth/logout")
.and().authorizeRequests().anyRequest().authenticated()
.and().formLogin().permitAll();
}

@Override
protected void configure(AuthenticationManagerBuilder auth) throws Exception {
auth.inMemoryAuthentication().withUser("admin").password("admin123").roles("ADMIN");
}
}

3.2 客户端(Client, 业务网站)

(1) pom.xml

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-oauth2</artifactId>
</dependency>

(2) application.properties

1
2
3
4
5
6
properties复制代码server port=8080
security.oauth2.client.client-id=webapp
security.oauth2.client.client-secret=secret
security.oauth2.client.access-token-uri=http://localhost:8110/oauth/token
security.oauth2.client.user-authorization-uri=http://localhost:8110/oauth/authorize
security.oauth2.resource.user-info-uri=http://localhost:8110/oauth/user

(3) 配置 WEB 安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@Configuration
@EnableOAuth2Sso
public class Oauth2WebsecurityConfigurer extends WebSecurityConfigurerAdapter {
@Override
public void configure(HttpSecurity http) throws Exception {
http.antMatcher("/**").authorizeRequests()
.antMatchers("/", "/login").permitAll()
.anyRequest().authenticated();
}
}

@RestController
public class Oauth2ClientController {
@GetMapping("/")
public ModelAndView index() {
return new ModelAndView("index");
}

@GetMapping("/welcome")
public ModelAndView welcome() {
return new ModelAndView("welcome");
}
}

3.3 用户权限控制(基于角色)

  • 授权服务器中,定义各用户拥有的角色: user=USER, admin=ADMIN/USER, root=ROOT/ADMIN/USER
  • 业务网站中(client),注解标明哪些角色可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@RestController
public class Oauth2ClientController {
@GetMapping("/welcome")
public ModelAndView welcome() {
return new ModelAndView("welcome");
}

@GetMapping("/api/user")
@PreAuthorize("hasAuthority('USER')")
public Map<String, Object> apiUser() {
}

@GetMapping("/api/admin")
@PreAuthorize("hasAuthority('ADMIN')")
public Map<String, Object> apiAdmin() {
}

@GetMapping("/api/root")
@PreAuthorize("hasAuthority('ROOT')")
public Map<String, Object> apiRoot() {
}
}

4 综合运用

4.1 权限控制方案

下图是基本的认证/授权控制方案,主要设计了认证授权服务器上相关数据表的基本定义。可对照本文“2.1 生活实例”一节来理解。img

4.2 在微服务架构中的应用

与常规服务架构不同,在微服务架构中,Authorization Server/Resource Server 是作为微服务存在的,用户的登录可以通过API网关一次性完成,无需与无法跳转至内网的 Authorization Server 来完成。

img

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 Seata Server 端接收请求

发表于 2021-07-22

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

突然发现 Server 端接受请求这一块漏掉了 , 这一块有点绕 , 建了几个线程来处理 , 所以有必要补上

Seata 的 Server 和 Client 端主要是通过 Netty 进行通信的

Client 与 Server 通信的主要目的是为了创建事务 GlobalSession 以及 往GlobalSession 中注册 Branch .

二 . Client 段请求

前置补充 : 前文中 说了 Client 的调用流程

1
2
3
4
5
6
java复制代码// C- DefaultTransactionManager Begin 中发起 GlobalSession 
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
// 发起 Netty 请求
// timeout=300000,transactionName=dubbo-gts-seata-example
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
}

三 . Seata Server 端处理

上一篇说了 seata client 发起了一个 GlobalLockRequest , 这里看一下 Server 端如何处理的 , 之前我们已经知道 , Seata 通过 Netty 实现的前后端交互 , 其主要逻辑如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码
// 入口类 : AbstractNettyRemotingServer
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
//.........
}

// 调用流程 :
C- AbstractNettyRemotingServer # channelRead
C- AbstractNettyRemoting # processMessage


// RM 的注册
RegRmProcessor

//
BatchLogHandler
DefaultCoordinator
AbstractNettyRemotingServer

3.1 流程的入口

seata-system-AbstractNettyRemoting.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
java复制代码// Step 1 : Netty 请求的入口
C- AbstractNettyRemotingServer
M- channelRead(final ChannelHandlerContext ctx, Object msg)
- processMessage(ctx, (RpcMessage) msg)


// Step 2 : 处理 Message
C- AbstractNettyRemoting
M- processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;

// 准备 Pair 对象 , 获取执行类 -> 3.2
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());

if (pair != null) {
if (pair.getSecond() != null) {
try {
// 步骤一 : 执行 ExecutorService ,准备线程
pair.getSecond().execute(() -> {
try {
// 步骤二 :执行 process
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
int idx = new Random().nextInt(100);
try {
Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
} catch (IOException exx) {
}
allowDumpStack = false;
}
}
} else {
pair.getFirst().process(ctx, rpcMessage);
}
}
}
}

seata-netty-message-request.jpg

3.2 ExecutorService 执行

ExecutorService 的初始化流程

从上文的结构图中可以看到 , 其最终的抽象类为 AbstractNettyRemoting , 这里主要有2个ExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码// 该 ExecutorService 主要在 init 后定期执行
ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("timeoutChecker", 1, true));

// 用于 Netty Request 时处理 , 该对象在 Server 初始化时通过构造器传入
ThreadPoolExecutor messageExecutor;

// PS : 这里来回顾一下
C- Server # main
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

ExecutorService 的使用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 从上文可以看到 , 这里主要使用的是 NamedThreadFactory 创建线程
public Thread newThread(Runnable r) {
String name = prefix + "_" + counter.incrementAndGet();
if (totalSize > 1) {
name += "_" + totalSize;
}
// Group : java.lang.ThreadGroup[name=main,maxpri=10]
// name : ServerHandlerThread_1_18_500
Thread thread = new FastThreadLocalThread(group, r, name);

thread.setDaemon(makeDaemons);
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}

// PS : 为什么用 FastThreadLocalThread 呢 ?
- 因为百度第一个搜索结果为 <惊:FastThreadLocal吞吐量居然是ThreadLocal的3倍!!!>
- 主要原因就是吞吐量快 , 但是其快主要提现在单线程情况下 , 因为其使用数组避免了hash冲突的发生

- 另外 FastThreadLocalThread 是 Netty 内部自己写的线程类

3.3 Processor 处理

3.3.1 Processor 的加载处理

Processor 的加载是在 NettyRemotingServer 中进行 ,简述就是3步 :

Step 1 : Server # main 中开启 nettyRemotingServer init

Step 2 : registerProcessor 注册各种 Processor

Step 3 : 构建 Pair 放入集合 , 用于运行时使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
java复制代码// Step 1 : Server # main 中执行 (PS : Server 是 Seata 启动类 , 可以详见之前的文章)
public static void main(String[] args) throws IOException {

// .... 省略其他逻辑 , 最后对 Netty 进行初始化
try {
nettyRemotingServer.init();
} catch (Throwable e) {
System.exit(-1);
}
System.exit(0);
}


// Step 2 : init 初始化
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}

// Step 3 : 注册 Processor
private void registerProcessor() {
ServerOnRequestProcessor onRequestProcessor =new ServerOnRequestProcessor(this, getHandler());

super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

//------
ServerOnResponseProcessor onResponseProcessor =new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);

//------
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

//------
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

//------
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);

}

// PS : 可以看到 , 这里大概有以下几种处理器 :
- ServerOnRequestProcessor : 处理 RM/TM 客户端请求信息
- ServerOnResponseProcessor : 处理 RM/TM 客户端返回信息
- RegRmProcessor : RM 注册处理器
- RegTmProcessor : TM 注册处理器
- ServerHeartbeatProcessor : 处理心跳信息

// PS : 这里还能看到 , 其中 传入了一个 messageExecutor (ThreadPoolExecutor) , 该对象用于后续创建线程


// Step End : 可以看到 , 这里就已经构建了 Pair , 用于后方使用
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}

// PS : 这里往父抽象类中设置了集合 (AbstractNettyRemoting)
protected final HashMap<Integer, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

3.2 对请求进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码// Step 1 : 进入Process 处理 (ServerOnRequestProcessor)
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
onRegRmMessage(ctx, rpcMessage);
}

// Step 2 : 处理 Request
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
// 省略 Log 操作
if (!(message instanceof AbstractMessage)) {
return;
}
if (message instanceof MergedWarpMessage) {
AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];
// 如果是多请求
for (int i = 0; i < results.length; i++) {
final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);
// 3.3 调用 Handler 对 Message 进行处理
results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
}
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
} else {
// 如果是单独的请求
final AbstractMessage msg = (AbstractMessage) message;
// 3.3 调用 Handler 对 Message 进行处理
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
}
}

3.3 Handler 分别处理 Message

可以看到 , 其中有多个 MessageHandler 进行处理

seata-message-handler.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码// 其核心处理类为 TransactionMessageHandler : 在上层处理接收到的RPC消息
I- TransactionMessageHandler
M- AbstractResultMessage onRequest(AbstractMessage request, RpcContext context)
M- void onResponse(AbstractResultMessage response, RpcContext context)

C- DefaultCoordinator
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToTC)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);

return transactionRequest.handle(context);
}


C- AbstractTCInboundHandler
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
GlobalBeginResponse response = new GlobalBeginResponse();
exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
@Override
public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
try {
// 发起 Gouble 处理
doGlobalBegin(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore,
String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
e);
}
}
}, request, response);
return response;
}

3.4 开始处理

可以看到 , 3.3 中就正式的开始了 doGlobalBegin 的处理 , 也就和 Session 那个章节连接起来了

1
java复制代码public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {

四 . 补充

4.1 . Seata RM 端注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码// Server 端注册的主要对象为 : RegRmProcessor

@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
onRegRmMessage(ctx, rpcMessage);
}

// 发起 Server 注册
private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
// RegisterRMRequest{resourceIds='null', applicationId='business-seata-example', transactionServiceGroup='business-service-seata-service-group'}
RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
boolean isSuccess = false;
String errorInfo = StringUtils.EMPTY;
try {
if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
// ChannelManager 注册当前 channel
ChannelManager.registerRMChannel(message, ctx.channel());
// 控制 Channel 版本
Version.putChannelVersion(ctx.channel(), message.getVersion());
isSuccess = true;
}
} catch (Exception exx) {
isSuccess = false;
errorInfo = exx.getMessage();
}
// 构建返回结果
RegisterRMResponse response = new RegisterRMResponse(isSuccess);
if (StringUtils.isNotEmpty(errorInfo)) {
response.setMsg(errorInfo);
}
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
}

RegTmProcessor 同理 , 这里就不看了 , 以后有兴趣可以把这个channel 和版本控制专门看一下

4.2 ServerHeartbeatProcessor 心跳检测

1
2
3
4
5
6
7
8
9
10
11
java复制代码public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
try {
// 好像啥事没做 , 就打了个 log
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), HeartbeatMessage.PONG);
} catch (Throwable throwable) {
LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("received PING from {}", ctx.channel().remoteAddress());
}
}

总结

终于把这个环节补齐了 , 整个 Seata 就通了 , 后面不准备开新文 ,准备把文章好好的优化下 ,补充细节 !!!

总结一下就是在 Server # main 中初始化了 ThreadPoolExecutor 和 NettyRemotingServer , 在AbstractNettyRemotingServer 中对 NettyRequest 进行处理 .

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

java爬虫爬取b站视频分享iframe代码并保存10000

发表于 2021-07-22

需求:自己开发的一个视频网站不想把自己上传视频和封面图片,因为一个一个上传视频文件和封面图片还是很费时间的,想着直接抓取点b站上的视频iframe分享的链接和图片链接到我的数据库中,这样网站就很快就填充起来了,看着就丰满多了(单纯是为了让我开发的系统看着有数据丰满点好看)。

思路分析:爬取大量的iframe代码和视频标题以及视频封面照片的链接保存到数据库就行。当然如果您是要爬取后把视频文件也下载到自己电脑的话,也可以用java代码实现的哦。

实际操作:

1、先用浏览器打开b站,开发者模式(浏览器开发者模式可以f12打开)进入network面板分析下数据,一般网站都是前后分离的,页面上的动态数据一般都是发送ajax请求后台接口获取到的(当然也有那种提前渲染的,那种不在咱爬取的行列

2、最好是找那种带分页的页面去分析爬取数据。

3、点击一个一个的请求分析请求头和响应数据(主要是看响应,找那种返回是json格式的)

我这里找到一个这样的接口

api.bilibili.com/x/web-inter…

4、开始写java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
ini复制代码public static void main(String[] args) throws Exception {
//1加载驱动
Class.forName("com.mysql.cj.jdbc.Driver");
//2获取连接
Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useunicode=true&useSSL=FALSE&serverTimezone=UTC", "root", "123456");
//3获得语句执行者
Statement st = conn.createStatement();

String offset="761597855_1626694963";
long num = 0;

for(int page=1; page<3500; page++) {
System.out.println("页码:" + page);

//请求获取结果
String result = HttpClientUtil.getByJsonParam("https://api.bilibili.com/x/web-interface/web/channel/multiple/list?channel_id=17683&sort_type=hot&offset="+offset+"&page_size=30");
System.out.println(result);

ObjectMapper objectMapper = new ObjectMapper();
HashMap resultMap = objectMapper.readValue(result, HashMap.class);
HashMap dataMap = (HashMap)resultMap.get("data");
//获取下一页数据
offset= (String) dataMap.get("offset");
ArrayList> dataList = (ArrayList>)dataMap.get("list");


for(int i=0; i itemMap = dataList.get(i);
int user_id=1;
//aid
String aid = String.valueOf(itemMap.get("id"));
//System.out.println(aid);
//bvid
String bvid =(String)itemMap.get("bvid");
//System.out.println(bvid);
//cid
//String cid = String.valueOf(itemMap.get("cid"));
//System.out.println(cid);
String vsrc = "//player.bilibili.com/player.html?aid=" + aid + "&bvid=" + bvid + "&page=1&high_quality=1&danmaku=0";
System.out.println(vsrc);
//图片
String v_image = (String)itemMap.get("cover");
System.out.println(v_image);
//标题
String vname = (String)itemMap.get("name");
System.out.println(vname);
//描述
//String descript = (String)itemMap.get("desc");
//System.out.println(descript);


/*作者信息
HashMap owner = (HashMap)itemMap.get("owner");
//mid
String mid = String.valueOf(owner.get("mid"));
System.out.println(mid);
//name
String name = (String)owner.get("name");
System.out.println(name);
//face
String face = (String)owner.get("face");
System.out.println(face);
*/


String itemSql = "INSERT video (vid, vname, vsrc, userid, praise, get_icon, type_id, v_image, play_num, tags, vtime) VALUES ('" + num + "','" + vname + "', '" + vsrc +"', '" + 1 + "', '" + 0 + "', '" + 0 + "', '" + (int)(1+Math.random()*(9-1+1)) + "', '" + v_image + "', '" + (int)(1+Math.random()*(100000-1+1)) + "', '" + "NULL" + "', now()) ";
//System.out.println(itemSql);
//st.executeUpdate(itemSql);
System.out.println(itemSql);
try {
st.executeUpdate(itemSql);
}
catch (Exception e){
continue;
}

System.out.println("//=========================================");


}
}
st.close();
conn.close();

}

5、运行第4步的代码就可以把数据轻松保存到数据库(运行程序的时间可能有点长)

6、最后直接运行我的系统后的效果:

有什么不懂得再q我1913284695。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

rapidjson帮你进行参数校验 Python 主题月

发表于 2021-07-22

本文正在参加「Python主题月」,详情查看 活动链接

微信公众号搜索【程序媛小庄】,Rest cannot be enjoyed by lazy people~

前言

在使用Django框架开发前后端分离的项目时,通常需要对前端传递过来的参数进行校验,校验的方式有多种,可以使用drf进行校验,也可以使用json进行校验,本文介绍在Python中rapidjson的基本使用以及如何进行参数校验。

rapidjson简介和安装

rapidjson是一个性能非常好的C++ JSON解析器和序列化库,它被包装成了Python3的扩展包,就是说在Python3中可以使用rapidjson进行数据的序列化和反序列化操作并且可以对参数进行校验,非常方便好用。

rapidjson安装命令:pip install python-rapidjson。

rapidjson基本使用

rapidjson和json模块在基本使用方法上一致的,只不过rapidjson在某些参数方面和json模块不兼容,这些参数并不常用,这里不做过多介绍,详情可参照rapidjson官方文档。基本使用介绍两个序列化的方法dump/dumps,反序列化的load/loads使用json模块的即可。

dumps & dump这两个方法都是将Python实例对象序列化为JSON格式的字符串,用法和参数大致相同,dump方法比dumps方法多了一个必要的file_like参数。

dumps() 方法

该方法返回的结果是一个Python 字符串实例。参数非常多,这里只介绍经常使用的三个参数。

1
python复制代码rapidjson.dumps(obj, *, skipkeys=False, ensure_ascii=True, write_mode=WM_COMPACT, indent=4, default=None, sort_keys=False, number_mode=None, datetime_mode=None, uuid_mode=None, bytes_mode=BM_UTF8, iterable_mode=IM_ANY_ITERABLE, mapping_mode=MM_ANY_MAPPING, allow_nan=True)

skipkeys

该参数表示是否跳过不可用的字典的key进行序列化,如果默认为False,如果修改为True字典的key如果不属于基本数据类型(str int float bool None)之一就会跳过该key而不会抛出TypeError的异常。

1
2
3
4
5
6
7
8
9
10
11
12
python复制代码import rapidjson
from pprint import pprint

dic = {
True: False,
(0,): 'python'
}
res = rapidjson.dumps(dic)
pprint(res) # TypeError: {True: False, (0,): 'python'} is not JSON serializable

res = rapidjson.dumps(dic, skipkeys=True)
pprint(res) # '{}'

ensure_ascii

该参数表示序列化的结果是否只包含ASCII字符,默认值是True,将Python实例序列化后所有的非ASCII码的字符都会被转义,如果将该参数的值修改为False,增会将字符原样输出。

1
2
3
4
5
6
7
8
9
python复制代码dic = {
'name': '丽丽',
'name1': 'lili'
}
res = rapidjson.dumps(dic)
pprint(res) # '{"name":"\\u4E3D\\u4E3D","name1":"lili"}'

res = rapidjson.dumps(dic, ensure_ascii=False)
pprint(res) # '{"name":"丽丽","name1":"lili"}'

sort_keys

该参数表示序列化时是否将字典的key按照字母进行排序。默认是False,如果修改为True,字典序列化得到的结果就是按照字典的key的字母顺序进行排序的。

1
2
3
4
5
6
python复制代码dic = {
'name': '丽丽',
'age': '10'
}
res = rapidjson.dumps(dic, ensure_ascii=False, sort_keys=True)
pprint(res) # '{"age":"10","name":"丽丽"}'

dump()方法

该方法和dumps方法非常类似,不同的是该方法需要一个额外的必须的参数 - 一个file-like的可写流式对象,比如文件对象,将第一个参数obj进行序列化写入可写的流式对象中。

1
python复制代码rapidjson.dump(obj, stream, *, skipkeys=False, ensure_ascii=True, write_mode=WM_COMPACT, indent=4, default=None, sort_keys=False, number_mode=None, datetime_mode=None, uuid_mode=None, bytes_mode=BM_UTF8, iterable_mode=IM_ANY_ITERABLE, mapping_mode=MM_ANY_MAPPING, chunk_size=65536, allow_nan=True)

下面是该方法的基本使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
python复制代码# 写入文件
dic = {
'name': '丽丽',
'age': '10'
}
f = open('1.py', 'w', encoding='utf8')
res = rapidjson.dump(dic, f)
pprint(res)

# 或者下面这种用法
import io

stream = io.BytesIO()
dump('bar', stream)
print(stream.getvalue()) # b'"bar"'

Validator class

rapidjson中的Validator类可以用来做参数校验。Validator的参数是JSON schema,当我们需要知道JSON数据中预期的字段以及值的表示方式时,这就是JSON Schema的用武之地,是描述JSON数据结构的一种声明格式,也可以通俗的理解为是参数的校验规则。如果JSON schema是不可用的JSON格式的数据,就会抛出JSONDecodeError的异常。

类的参数就是校验规则,如果给定的JSON数据没有通过校验就会抛出ValidationError异常,异常包括三个部分,分别是错误的类型、校验的规则以及在JSON字符串中错误出现的位置。

1
2
3
4
5
6
python复制代码import rapidjson
from pprint import pprint

validate = rapidjson.Validator('{"required": ["a", "b"]}') # 表示a和b这两个参数是必须的
validate('{"a": null, "b": 1}') # 符合规则
validate('{"a": null, "c": false}') # rapidjson.ValidationError: ('required', '#', '#')
1
2
3
4
5
6
python复制代码validate = rapidjson.Validator('{"type": "array",'  # 参数类型是array
' "items": {"type": "string"},' # array中的每个元素类型是string
' "minItems": 1}') # array中元素数量最少为1

validate('["foo", "bar"]') # 符合规则
validate('[]') # rapidjson.ValidationError: ('minItems', '#', '#')

关于JSON schema的更多参数校验规则以及定义规范可以参考*JSON schema官方文档*,下述是一种JSON schema格式仅供参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
json复制代码LOGIN_SCHEMA = {
"type": "object",
"properties": {
"token": "string",
"number": "integer"
},
"required": ["token"],
}
}

validate = rapidjson.Validator(rapidjson.dumps(LOGIN_SCHEMA))
data = {
'token': 'python',
'number': 10
}
validate(rapidjson.dumps(data))

结语

文章首发于微信公众号程序媛小庄,同步于掘金。

码字不易,转载请说明出处,走过路过的小伙伴们伸出可爱的小指头点个赞再走吧(╹▽╹)

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

高并发教程八:秒杀系统设计 秒杀场景 理解秒杀系统 架构原则

发表于 2021-07-22

秒杀场景

最典型的就是淘宝京东等电商双十一秒杀了,短时间上亿的用户涌入,瞬间流量巨大(高并发)。例如,200万人准备在凌晨12:00准备抢购一件商品,但是商品的数量是有限的100件,这样真实能购买到该件商品的用户也只有100人及以下,不能卖超

但是从业务上来说,秒杀活动是希望更多的人来参与,也就是抢购之前希望有越来越多的人来看购买商品,但是,在抢购时间达到后,用户开始真正下单时,秒杀的服务器后端却不希望同时有几百万人同时发起抢购请求

我们都知道服务器的处理资源是有限的,所以出现峰值的时候,很容易导致服务器宕机,用户无法访问的情况出现,这就好比出行的时候存在早高峰和晚高峰的问题,为了解决这个问题,出行就有了错峰限行的解决方案

同理,在线上的秒杀等业务场景,也需要类似的解决方案,需要平安度过同时抢购带来的流量峰值的问题,这就是流量削峰的由来

理解秒杀系统

那么,如何才能更好地理解秒杀系统呢?我觉得作为一个程序员,你首先需要从高维度出发,从整体上思考问题。在我看来,秒杀其实主要解决两个问题,一个是并发读,一个是并发写。并发读的核心优化理念是尽量减少用户到服务端来“读”数据,或者让他们读更少的数据;并发写的处理原则也一样,它要求我们在数据库层面独立出来一个库,做特殊的处理。另外,我们还要针对秒杀系统做一些保护,针对意料之外的情况设计兜底方案,以防止最坏的情况发生。

而从一个架构师的角度来看,要想打造并维护一个超大流量并发读写、高性能、高可用的系统,在整个用户请求路径上从浏览器到服务端我们要遵循几个原则,就是要保证用户请求的数据尽量少、请求数尽量少、路径尽量短、依赖尽量少,并且不要有单点。这些关键点我会在后面的文章里重点讲解。

其实,秒杀的整体架构可以概括为“稳、准、快”几个关键字。

所谓“稳”,就是整个系统架构要满足高可用,流量符合预期时肯定要稳定,就是超出预期时也同样不能掉链子,你要保证秒杀活动顺利完成,即秒杀商品顺利地卖出去,这个是最基本的前提。

然后就是“准”,就是秒杀10台iPhone,那就只能成交10台,多一台少一台都不行。一旦库存不对,那平台就要承担损失,所以“准”就是要求保证数据的一致性。

最后再看“快”,“快”其实很好理解,它就是说系统的性能要足够高,否则你怎么支撑这么大的流量呢?不光是服务端要做极致的性能优化,而且在整个请求链路上都要做协同的优化,每个地方快一点,整个系统就完美了。

所以从技术角度上看“稳、准、快”,就对应了我们架构上的高可用、一致性和高性能的要求,我们的专栏也将主要围绕这几个方面来展开,具体如下:

  • 高性能。 秒杀涉及大量的并发读和并发写,因此支持高并发访问这点非常关键。将从设计数据的动静分离方案、热点的发现与隔离、请求的削峰与分层过滤、服务端的极致优化这4个方面重点介绍。
  • 一致性。 秒杀中商品减库存的实现方式同样关键。可想而知,有限数量的商品在同一时刻被很多倍的请求同时来减库存,减库存又分为“拍下减库存”“付款减库存”以及预扣等几种,在大并发更新的过程中都要保证数据的准确性,其难度可想而知。因此,我将用一篇文章来专门讲解如何设计秒杀减库存方案。
  • 高可用。 虽然我介绍了很多极致的优化思路,但现实中总难免出现一些我们考虑不到的情况,所以要保证系统的高可用和正确性,我们还要设计一个PlanB来兜底,以便在最坏情况发生时仍然能够从容应对。专栏的最后,我将带你思考可以从哪些环节来设计兜底方案。

架构原则“4要1不要”

数据要尽量少

  • 所谓“数据要尽量少”,首先是指用户请求的数据能少就少。请求的数据包括上传给系统的数据和系统返回给用户的数据(通常就是网页)。
  • 为啥“数据要尽量少”呢?因为首先这些数据在网络上传输需要时间,其次不管是请求数据还是返回数据都需要服务器做处理,而服务器在写网络时通常都要做压缩和字符编码,这些都非常消耗CPU,所以减少传输的数据量可以显著减少CPU的使用。例如,我们可以简化秒杀页面的大小,去掉不必要的页面装修效果,等等。
  • 其次,“数据要尽量少”还要求系统依赖的数据能少就少,包括系统完成某些业务逻辑需要读取和保存的数据,这些数据一般是和后台服务以及数据库打交道的。调用其他服务会涉及数据的序列化和反序列化,而这也是CPU的一大杀手,同样也会增加延时。而且,数据库本身也容易成为一个瓶颈,所以和数据库打交道越少越好,数据越简单、越小则越好

请求数要尽量少

  • 用户请求的页面返回后,浏览器渲染这个页面还要包含其他的额外请求,比如说,这个页面依赖的CSS/JavaScript、图片,以及Ajax请求等等都定义为“额外请求”,这些额外请求应该尽量少。因为浏览器每发出一个请求都多少会有一些消耗,例如建立连接要做三次握手,有的时候有页面依赖或者连接数限制,一些请求(例如JavaScript)还需要串行加载等。另外,如果不同请求的域名不一样的话,还涉及这些域名的DNS解析,可能会耗时更久。所以你要记住的是,减少请求数可以显著减少以上这些因素导致的资源消耗。
  • 例如,减少请求数最常用的一个实践就是合并CSS和JavaScript文件,把多个JavaScript文件合并成一个文件,在URL中用逗号隔开(g.xxx.com/tm/xx-b/4.0…??module-preview/index.xtpl.js,module-jhs/index.xtpl.js,module-focus/index.xtpl.js)。这种方式在服务端仍然是单个文件各自存放,只是服务端会有一个组件解析这个URL,然后动态把这些文件合并起来一起返回

路径要尽量短

  • 所谓“路径”,就是用户发出请求到返回数据这个过程中,需求经过的中间的节点数。
  • 通常,这些节点可以表示为一个系统或者一个新的Socket连接(比如代理服务器只是创建一个新的Socket连接来转发请求)。每经过一个节点,一般都会产生一个新的Socket连接。
  • 然而,每增加一个连接都会增加新的不确定性。从概率统计上来说,假如一次请求经过5个节点,每个节点的可用性是99.9%的话,那么整个请求的可用性是:99.9%的5次方,约等于99.5%。
  • 所以缩短请求路径不仅可以增加可用性,同样可以有效提升性能(减少中间节点可以减少数据的序列化与反序列化),并减少延时(可以减少网络传输耗时)。
  • 要缩短访问路径有一种办法,就是多个相互强依赖的应用合并部署在一起,把远程过程调用(RPC)变成JVM内部之间的方法调用。在《大型网站技术架构演进与性能优化》一书中,我也有一章介绍了这种技术的详细实现。

依赖要尽量少

  • 所谓依赖,指的是要完成一次用户请求必须依赖的系统或者服务,这里的依赖指的是强依赖。
  • 举个例子,比如说你要展示秒杀页面,而这个页面必须强依赖商品信息、用户信息,还有其他如优惠券、成交列表等这些对秒杀不是非要不可的信息(弱依赖),这些弱依赖在紧急情况下就可以去掉。
  • 要减少依赖,我们可以给系统进行分级,比如0级系统、1级系统、2级系统、3级系统,0级系统如果是最重要的系统,那么0级系统强依赖的系统也同样是最重要的系统,以此类推。
  • 注意,0级系统要尽量减少对1级系统的强依赖,防止重要的系统被不重要的系统拖垮。例如支付系统是0级系统,而优惠券是1级系统的话,在极端情况下可以把优惠券给降级,防止支付系统被优惠券这个1级系统给拖垮。

不要有单点

  • 系统中的单点可以说是系统架构上的一个大忌,因为单点意味着没有备份,风险不可控,我们设计分布式系统最重要的原则就是“消除单点”

不同场景下的不同架构案例

随着请求量的加大(比如从1w/s到了10w/s的量级),这个简单的架构很快就遇到了瓶颈,因此需要做架构改造来提升系统性能。这些架构改造包括:

  • 把秒杀系统独立出来单独打造一个系统,这样可以有针对性地做优化,例如这个独立出来的系统就减少了店铺装修的功能,减少了页面的复杂度;
  • 在系统部署上也独立做一个机器集群,这样秒杀的大流量就不会影响到正常的商品购买集群的机器负载;
  • 将热点数据(如库存数据)单独放到一个缓存系统中,以提高“读性能”;
  • 增加秒杀答题,防止有秒杀器抢单

image.png

然而这个架构仍然支持不了超过100w/s的请求量,所以为了进一步提升秒杀系统的性能,我们又对架构做进一步升级,比如:

  • 对页面进行彻底的动静分离,使得用户秒杀时不需要刷新整个页面,而只需要点击抢宝按钮,借此把页面刷新的数据降到最少;
  • 在服务端对秒杀商品进行本地缓存,不需要再调用依赖系统的后台服务获取数据,甚至不需要去公共的缓存集群中查询数据,这样不仅可以减少系统调用,而且能够避免压垮公共缓存集群。
  • 增加系统限流保护,防止最坏情况发生

image.png

如何做好动静分离

何为动静数据

那到底什么才是动静分离呢?所谓“动静分离”,其实就是把用户请求的数据(如HTML页面)划分为“动态数据”和“静态数据”。

简单来说, “动态数据”和“静态数据”的主要区别就是看页面中输出的数据是否和URL、浏览者、时间、地域相关,以及是否含有Cookie等私密数据。比如说:

  1. 很多媒体类的网站,某一篇文章的内容不管是你访问还是我访问,它都是一样的。所以它就是一个典型的静态数据,但是它是个动态页面。
  2. 我们如果现在访问淘宝的首页,每个人看到的页面可能都是不一样的,淘宝首页中包含了很多根据访问者特征推荐的信息,而这些个性化的数据就可以理解为动态数据了。

理解了静态数据和动态数据,我估计你很容易就能想明白“动静分离”这个方案的来龙去脉了。分离了动静数据,我们就可以对分离出来的静态数据做缓存,有了缓存之后,静态数据的“访问效率”自然就提高了。

那么,怎样对静态数据做缓存呢?我在这里总结了几个重点:

  • 第一,你应该把静态数据缓存到离用户最近的地方。静态数据就是那些相对不会变化的数据,因此我们可以把它们缓存起来。缓存到哪里呢?常见的就三种,用户浏览器里、CDN上或者在服务端的Cache中。你应该根据情况,把它们尽量缓存到离用户最近的地方。
  • 第二,静态化改造就是要直接缓存HTTP连接。相较于普通的数据缓存而言,你肯定还听过系统的静态化改造。静态化改造是直接缓存HTTP连接而不是仅仅缓存数据,如下图所示,Web代理服务器根据请求URL,直接取出对应的HTTP响应头和响应体然后直接返回,这个响应过程简单得连HTTP协议都不用重新组装,甚至连HTTP请求头也不需要解析。
  • 第三,让谁来缓存静态数据也很重要。不同语言写的Cache软件处理缓存数据的效率也各不相同。以Java为例,因为Java系统本身也有其弱点(比如不擅长处理大量连接请求,每个连接消耗的内存较多,Servlet容器解析HTTP协议较慢),所以你可以不在Java层做缓存,而是直接在Web服务器层上做,这样你就可以屏蔽Java语言层面的一些弱点;而相比起来,Web服务器(如Nginx、Apache、Varnish)也更擅长处理大并发的静态文件请求

image.png

动静分离架构方案

在将整个系统做动静分离后,我们自然会想到更进一步的方案,就是将Cache进一步前移到CDN上,因为CDN离用户最近,效果会更好。

但是要想这么做,有以下几个问题需要解决。

  • 失效问题。前面我们也有提到过缓存时效的问题,不知道你有没有理解,我再来解释一下。谈到静态数据时,我说过一个关键词叫“相对不变”,它的言外之意是“可能会变化”。比如一篇文章,现在不变,但如果你发现个错别字,是不是就会变化了?如果你的缓存时效很长,那用户端在很长一段时间内看到的都是错的。所以,这个方案中也是,我们需要保证CDN可以在秒级时间内,让分布在全国各地的Cache同时失效,这对CDN的失效系统要求很高。
  • 命中率问题。Cache最重要的一个衡量指标就是“高命中率”,不然Cache的存在就失去了意义。同样,如果将数据全部放到全国的CDN上,必然导致Cache分散,而Cache分散又会导致访问请求命中同一个Cache的可能性降低,那么命中率就成为一个问题。
  • 发布更新问题。如果一个业务系统每周都有日常业务需要发布,那么发布系统必须足够简洁高效,而且你还要考虑有问题时快速回滚和排查问题的简便性

从前面的分析来看,将商品详情系统放到全国的所有CDN节点上是不太现实的,因为存在失效问题、命中率问题以及系统的发布更新问题。那么是否可以选择若干个节点来尝试实施呢?答案是“可以”,但是这样的节点需要满足几个条件:

  • 靠近访问量比较集中的地区;
  • 离主站相对较远;
  • 节点到主站间的网络比较好,而且稳定;
  • 节点容量比较大,不会占用其他CDN太多的资源
  • 节点不要太多

基于上面几个因素,选择CDN的二级Cache比较合适,因为二级Cache数量偏少,容量也更大,让用户的请求先回源的CDN的二级Cache中,如果没命中再回源站获取数据,部署方式如下图所示:

image.png

使用CDN的二级Cache作为缓存,可以达到和当前服务端静态化Cache类似的命中率,因为节点数不多,Cache不是很分散,访问量也比较集中,这样也就解决了命中率问题,同时能够给用户最好的访问体验,是当前比较理想的一种CDN化方案。除此之外,CDN化部署方案还有以下几个特点:

  • 把整个页面缓存在用户浏览器中;
  • 如果强制刷新整个页面,也会请求CDN;
  • 实际有效请求,只是用户对“刷新抢宝”按钮的点击。

这样就把90%的静态数据缓存在了用户端或者CDN上,当真正秒杀时,用户只需要点击特殊的“刷新抢宝”按钮,而不需要刷新整个页面。这样一来,系统只是向服务端请求很少的有效数据,而不需要重复请求大量的静态数据。

秒杀的动态数据和普通详情页面的动态数据相比更少,性能也提升了3倍以上。所以“抢宝”这种设计思路,让我们不用刷新页面就能够很好地请求到服务端最新的动态数据。

处理好系统的“热点数据”

假设你的系统中存储有几十亿上百亿的商品,而每天有千万级的商品被上亿的用户访问,那么肯定有一部分被大量用户访问的热卖商品,这就是我们常说的“热点商品”。

为什么要关注热点

首先,热点请求会大量占用服务器处理资源,虽然这个热点可能只占请求总量的亿分之一,然而却可能抢占90%的服务器资源,如果这个热点请求还是没有价值的无效请求,那么对系统资源来说完全是浪费

什么是“热点”

热点分为热点操作和热点数据。

所谓“热点操作”,例如大量的刷新页面、大量的添加购物车、双十一零点大量的下单等都属于此类操作。对系统来说,这些操作可以抽象为“读请求”和“写请求”,这两种热点请求的处理方式大相径庭,读请求的优化空间要大一些,而写请求的瓶颈一般都在存储层,优化的思路就是根据CAP理论做平衡

“热点数据”比较好理解,那就是用户的热点请求对应的数据。而热点数据又分为“静态热点数据”和“动态热点数据”

  • 所谓“静态热点数据”,就是能够提前预测的热点数据。例如,我们可以通过卖家报名的方式提前筛选出来,通过报名系统对这些热点商品进行打标。另外,我们还可以通过大数据分析来提前发现热点商品,比如我们分析历史成交记录、用户的购物车记录,来发现哪些商品可能更热门、更好卖,这些都是可以提前分析出来的热点
  • 所谓“动态热点数据”,就是不能被提前预测到的,系统在运行过程中临时产生的热点。例如,卖家在抖音上做了广告,然后商品一下就火了,导致它在短时间内被大量购买

发现热点数据

发现静态热点数据

如前面讲的,静态热点数据可以通过商业手段,例如强制让卖家通过报名参加的方式提前把热点商品筛选出来,实现方式是通过一个运营系统,把参加活动的商品数据进行打标,然后通过一个后台系统对这些热点商品进行预处理,如提前进行缓存。但是这种通过报名提前筛选的方式也会带来新的问题,即增加卖家的使用成本,而且实时性较差,也不太灵活。

  • 不过,除了提前报名筛选这种方式,你还可以通过技术手段提前预测,例如对买家每天访问的商品进行大数据计算,然后统计出TOP N的商品,我们可以认为这些TOP N的商品就是热点商品。

发现动态热点数据

动态热点发现系统的具体实现

  1. 构建一个异步的系统,它可以收集交易链路上各个环节中的中间件产品的热点Key,如Nginx、缓存、RPC服务框架等这些中间件(一些中间件产品本身已经有热点统计模块)。
  2. 建立一个热点上报和可以按照需求订阅的热点服务的下发规范,主要目的是通过交易链路上各个系统(包括详情、购物车、交易、优惠、库存、物流等)访问的时间差,把上游已经发现的热点透传给下游系统,提前做好保护。比如,对于大促高峰期,详情系统是最早知道的,在统一接入层上Nginx模块统计的热点URL。
  3. 将上游系统收集的热点数据发送到热点服务台,然后下游系统(如交易系统)就会知道哪些商品会被频繁调用,然后做热点保护。

这里我给出了一个图,其中用户访问商品时经过的路径有很多,我们主要是依赖前面的导购页面(包括首页、搜索页面、商品详情、购物车等)提前识别哪些商品的访问量高,通过这些系统中的中间件来收集热点数据,并记录到日志中。

image.png

我们通过部署在每台机器上的Agent把日志汇总到聚合和分析集群中,然后把符合一定规则的热点数据,通过订阅分发系统再推送到相应的系统中。你可以是把热点数据填充到Cache中,或者直接推送到应用服务器的内存中,还可以对这些数据进行拦截,总之下游系统可以订阅这些数据,然后根据自己的需求决定如何处理这些数据

流量削峰

削峰从本质上来说就是更多地延缓用户请求,以及层层过滤用户的访问需求,遵从最后落地到数据库的请求数要尽量少的原则
流量削峰主要有三种操作思路(排队,答题,过滤),简单说下

  1. 排队最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去,在这里,消息队列就像水库一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的
  2. 答题目的其实就是延缓请求,起到对请求流量进行削峰的作用,从而让系统能够更好地支持瞬时的流量高峰
  3. 前面介绍的排队和答题,要么是在接收请求时做缓冲,要么是减少请求的同时发送,而针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求,从Web层接到请求,到缓存,消息队列,最终到数据库这样就像漏斗一样,尽量把数据量和请求量一层一层地过滤和减少了,最终,到漏斗最末端(数据库)的才是有效请求

几种流量消峰方案

排队

要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去。在这里,消息队列就像“水库”一样, 拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的

image.png
但是,如果流量峰值持续一段时间达到了消息队列的处理上限,例如本机的消息积压达到了存储空间的上限,消息队列同样也会被压垮,这样虽然保护了下游的系统,但是和直接把请求丢弃也没多大的区别。就像遇到洪水爆发时,即使是有水库恐怕也无济于事。

答题

你是否还记得,最早期的秒杀只是纯粹地刷新页面和点击购买按钮,它是后来才增加了答题功能的。那么,为什么要增加答题功能呢?

这主要是为了增加购买的复杂度,从而达到两个目的。

第一个目的是防止部分买家使用秒杀器在参加秒杀时作弊。2011年秒杀非常火的时候,秒杀器也比较猖獗,因而没有达到全民参与和营销的目的,所以系统增加了答题来限制秒杀器。增加答题后,下单的时间基本控制在2s后,秒杀器的下单比例也大大下降。答题页面如下图所示。

第二个目的其实就是延缓请求,起到对请求流量进行削峰的作用,从而让系统能够更好地支持瞬时的流量高峰。这个重要的功能就是把峰值的下单请求拉长,从以前的1s之内延长到2s~10s。这样一来,请求峰值基于时间分片了。这个时间的分片对服务端处理并发非常重要,会大大减轻压力。而且,由于请求具有先后顺序,靠后的请求到来时自然也就没有库存了,因此根本到不了最后的下单步骤,所以真正的并发写就非常有限了。这种设计思路目前用得非常普遍,如当年支付宝的“咻一咻”、微信的“摇一摇”都是类似的方式。

这里,我重点说一下秒杀答题的设计思路。

分层过滤

前面介绍的排队和答题要么是少发请求,要么对发出来的请求进行缓冲,而针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求。分层过滤其实就是采用“漏斗”式设计来处理请求的,如下图所示

image.png

分层过滤的核心思想是:在不同的层次尽可能地过滤掉无效请求,让“漏斗”最末端的才是有效请求。而要达到这种效果,我们就必须对数据做分层的校验

分层校验的基本原则是:

  • 将动态请求的读数据缓存(Cache)在Web端,过滤掉无效的数据读;
  • 对读数据不做强一致性校验,减少因为一致性校验产生瓶颈的问题;
  • 对写数据进行基于时间的合理分片,过滤掉过期的失效请求;
  • 对写请求做限流保护,将超出系统承载能力的请求过滤掉;
  • 对写数据进行强一致性校验,只保留最后有效的数据

分层校验的目的是:在读系统中,尽量减少由于一致性校验带来的系统瓶颈,但是尽量将不影响性能的检查条件提前,如用户是否具有秒杀资格、商品状态是否正常、用户答题是否正确、秒杀是否已经结束、是否非法请求、营销等价物是否充足等;在写数据系统中,主要对写的数据(如“库存”)做一致性检查,最后在数据库层保证数据的最终准确性(如“库存”不能减为负数)

秒杀实践(纯后端设计部分)

  1. 用户通过前端校验最终发起请求到后端
  2. 然后校验库存,扣库存,创建订单
  3. 最终数据落地,持久化保存

准备工作

数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
sql复制代码--- 删除数据库
drop database seckill;
--- 创建数据库
create database seckill;
--- 使用数据库
use seckill;
--- 创建库存表
DROP TABLE IF EXISTS `t_seckill_stock`;
CREATE TABLE `t_seckill_stock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '库存ID',
`name` varchar(50) NOT NULL DEFAULT 'OnePlus 7 Pro' COMMENT '名称',
`count` int(11) NOT NULL COMMENT '库存',
`sale` int(11) NOT NULL COMMENT '已售',
`version` int(11) NOT NULL COMMENT '乐观锁,版本号',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='库存表';
--- 插入一条商品,初始化10个库存
INSERT INTO `t_seckill_stock` (`count`, `sale`, `version`) VALUES ('10', '0', '0');
--- 创建库存订单表
DROP TABLE IF EXISTS `t_seckill_stock_order`;
CREATE TABLE `t_seckill_stock_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`stock_id` int(11) NOT NULL COMMENT '库存ID',
`name` varchar(30) NOT NULL DEFAULT 'OnePlus 7 Pro' COMMENT '商品名称',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='库存订单表';

缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
ruby复制代码1、安装
brew install redis

2、查看安装及配置文件位置
- Homebrew安装的软件会默认在`/usr/local/Cellar/`路径下
- redis的配置文件`redis.conf`存放在`/usr/local/etc`路径下

3、启动redis服务
//方式一:使用brew帮助我们启动软件
brew services start redis
//方式二
redis-server /usr/local/etc/redis.conf

//执行以下命令
redis-server

4、查看redis服务进程
ps axu | grep redis

5、redis-cli连接redis服务
//redis默认端口号**6379**,默认**auth**为空,输入以下命令即可连接
redis-cli -h 127.0.0.1 -p 6379

6、启动 redis 客户端,打开终端并输入命令 **redis-cli**。该命令会连接本地的 redis 服务
$redis-cli
redis 127.0.0.1:6379>
redis 127.0.0.1:6379> PING
PONG


7、关闭redis服务
//正确停止Redis的方式应该是向Redis发送SHUTDOWN命令
redis-cli shutdown

//强行终止redis
sudo pkill redis-server

代码项目

github.com/lmandlyp163…

传统方式

我们首先搭建一个后台服务接口(实现校验库存,扣库存,创建订单),不做任何限制,使用JMeter,模拟500个并发线程测试购买10个库存的商品

思路介绍

不做任何控制,按照流程进行检查库存,扣库存,下订单,这种方式会存在并发问题

代码实现

接口入口

1
2
3
4
5
6
7
8
less复制代码/**
* 传统方式下订单
*/
@PostMapping("/createWrongOrder/{id}")
public ResponseBean createWrongOrder(@PathVariable("id") Integer id) throws Exception {
Integer orderCount = seckillEvolutionService.createWrongOrder(id);
return new ResponseBean(HttpStatus.OK.value(), "购买成功", orderCount);
}

核心service 逻辑(纯DB操作):校验库存,扣库存,创建订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Override
@Transactional(rollbackFor = Exception.class)
public Integer createWrongOrder(Integer id) throws Exception {
// 检查库存
StockDto stockDto = stockDao.selectByPrimaryKey(id);
if (stockDto.getCount() <= 0) {
throw new CustomException("库存不足");
}
// 扣库存
stockDto.setCount(stockDto.getCount() - 1);
stockDto.setSale(stockDto.getSale() + 1);
Integer saleCount = stockDao.updateByPrimaryKey(stockDto);
if (saleCount <= 0) {
throw new CustomException("扣库存失败");
}
// 下订单
StockOrderDto stockOrderDto = new StockOrderDto();
stockOrderDto.setStockId(stockDto.getId());
Integer orderCount = stockOrderDao.insertSelective(stockOrderDto);
if (saleCount <= 0) {
throw new CustomException("下订单失败");
}
return orderCount;
}

开始测试

使用JMeter测试上面的代码,JMeter的使用可以查看:JMeter的安装使用

1、初始化数据库库存

image.png
2、配置JMeter

image.png

image.png

image.png
打开JMeter,添加测试计划模拟1000个并发线程测试秒杀10个库存的商品,填写请求地址,点击启动图标开始

3、结果

image.png
商品实际显示为卖出10,库存还有0,而订单表却有大于10条数据

Druid SQL分析

image.png

  • 发现SQL下单与更新库存74次、明显出现了库存超卖

可以发现并发事务下会出现错误,出现卖超问题,这是因为同一时间大量线程同时请求校验库存,扣库存,创建订单,这三个操作不在同一个原子,比如,很多线程同时读到库存为10,这样都穿过了校验库存的判断,所以出现卖超问题

使用乐观锁控制超卖

在传统方式下会出现超卖,所以在这种情况下就引入了锁的概念,锁区分为乐观锁和悲观锁,悲观锁都是牺牲性能保证数据,所以在这种高并发场景下,一般都是使用乐观锁解决

思路介绍

这次我们引入乐观锁,这里可以先查看一篇文章: MySQL那些锁(opens new window)

主要改造是扣库存,每个线程在检查库存的时候会拿到当前商品的乐观锁版本号,然后在扣库存时,如果版本号不对,就会扣减失败,抛出异常结束,这样每个版本号就只能有一个线程操作成功,其他相同版本号的线程秒杀失败,就不会存在卖超问题了

代码实现

接口入口

1
2
3
4
5
6
7
8
less复制代码/**
* 使用乐观锁下订单
*/
@PostMapping("/createOptimisticLockOrder/{id}")
public ResponseBean createOptimisticLockOrder(@PathVariable("id") Integer id) throws Exception {
Integer orderCount = seckillEvolutionService.createOptimisticLockOrder(id);
return new ResponseBean(HttpStatus.OK.value(), "购买成功", orderCount);
}

核心service 逻辑(纯DB操作):校验库存,扣库存,创建订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Override
@Transactional(rollbackFor = Exception.class)
public Integer createOptimisticLockOrder(Integer id) throws Exception {
// 检查库存
StockDto stockDto = stockDao.selectByPrimaryKey(id);
if (stockDto.getCount() <= 0) {
throw new CustomException("库存不足");
}
// 扣库存
Integer saleCount = stockDao.updateByOptimisticLock(stockDto);
if (saleCount <= 0) {
throw new CustomException("扣库存失败");
}
// 下订单
StockOrderDto stockOrderDto = new StockOrderDto();
stockOrderDto.setStockId(stockDto.getId());
Integer orderCount = stockOrderDao.insertSelective(stockOrderDto);
if (saleCount <= 0) {
throw new CustomException("下订单失败");
}
return orderCount;
}
1
2
3
4
5
6
7
less复制代码/**
* 乐观锁更新扣减库存
*/
@Update("UPDATE t_seckill_stock SET count = count - 1, sale = sale + 1, version = version + 1 " +
"WHERE id = #{id, jdbcType = INTEGER} AND version = #{version, jdbcType = INTEGER} " +
"")
int updateByOptimisticLock(StockDto stockDto);

开始测试

使用JMeter测试上面的代码,JMeter的使用可以查看:JMeter的安装使用

1、初始化数据库库存

image.png

2、配置JMeter

image.png

image.png

打开JMeter,添加测试计划模拟1000个并发线程测试秒杀10个库存的商品,填写请求地址,点击启动图标开始

3、结果

image.png
商品实际显示为卖出10,库存还有0,而订单表也只有10条数据

Druid SQL分析

image.png
多个线程同时在检查库存的时候都会拿到当前商品的相同乐观锁版本号,然后在扣库存时,如果版本号不对,就会扣减失败,抛出异常结束,这样每个版本号就只能有第一个线程扣库存操作成功,其他相同版本号的线程秒杀失败,就不会存在卖超问题了

使用缓存&乐观锁

思路介绍

1、一般秒杀都会提前预热缓存,我们添加一个缓存预热的方法,初始化库存后再缓存预热,这样不会出现Cache读取Miss的情况

2、这里我采用的是先更新数据库再更新缓存,因为这里缓存数据计算简单,只需要进行加减一即可,所以我们直接进行更新缓存

3、这次主要改造是检查库存和扣库存方法,检查库存直接去Redis获取,不再去查数据库,而在扣库存这里本身是使用的乐观锁操作,只有操作成功(扣库存成功)的才需要更新缓存数据

代码实现

缓存预热

1
2
3
4
5
6
7
8
9
10
11
12
less复制代码/**
* 缓存预热
*/
@PostMapping("/initCache/{id}")
public ResponseBean initCache(@PathVariable("id") Integer id) {
StockDto stockDto = stockService.selectByPrimaryKey(id);
// 商品缓存预热
JedisUtil.set(Constant.PREFIX_COUNT + id.toString(), stockDto.getCount().toString());
JedisUtil.set(Constant.PREFIX_SALE + id.toString(), stockDto.getSale().toString());
JedisUtil.set(Constant.PREFIX_VERSION + id.toString(), stockDto.getVersion().toString());
return new ResponseBean(HttpStatus.OK.value(), "缓存预热成功", null);
}

接口入口

1
2
3
4
5
6
7
8
9
10
11
less复制代码    /**
* 使用乐观锁下订单,并且添加读缓存,性能提升
*/
@PostMapping("/createOptimisticLockOrderWithRedis/{id}")
public ResponseBean createOptimisticLockOrderWithRedis(@PathVariable("id") Integer id) throws Exception {
// 错误的,线程不安全
// Integer orderCount = seckillEvolutionService.createOptimisticLockOrderWithRedisWrong(id);
// 正确的,线程安全
Integer orderCount = seckillEvolutionService.createOptimisticLockOrderWithRedisSafe(id);
return new ResponseBean(HttpStatus.OK.value(), "购买成功", null);
}

核心service 逻辑:校验库存缓存,扣库存,创建订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
scss复制代码@Override
@Transactional(rollbackFor = Exception.class)
public Integer createOptimisticLockOrderWithRedisSafe(Integer id) throws Exception {
// 检查库存
// 使用缓存读取库存,减轻DB压力,Redis批量操作(具有原子性)解决线程安全问题
List<String> dataList = JedisUtil.mget(Constant.PREFIX_COUNT + id,
Constant.PREFIX_SALE + id, Constant.PREFIX_VERSION + id);
Integer count = Integer.parseInt(dataList.get(0));
Integer sale = Integer.parseInt(dataList.get(1));
Integer version = Integer.parseInt(dataList.get(2));
if (count <= 0) {
throw new CustomException("库存不足");
}
// 还有库存
StockDto stockDto = new StockDto();
stockDto.setId(id);
stockDto.setCount(count);
stockDto.setSale(sale);
stockDto.setVersion(version);
// 扣库存
Integer saleCount = stockDao.updateByOptimisticLock(stockDto);
// 操作数据大于0,说明扣库存成功
if (saleCount > 0) {
logger.info("版本号:{} {} {}", stockDto.getCount(), stockDto.getSale(), stockDto.getVersion());
// 更新缓存,这里更新需要保证三个数据(库存,已售,乐观锁版本号)的一致性,使用mset原子操作
updateCache(stockDto);
}
if (saleCount <= 0) {
throw new CustomException("扣库存失败");
}
// 下订单
StockOrderDto stockOrderDto = new StockOrderDto();
stockOrderDto.setStockId(stockDto.getId());
Integer orderCount = stockOrderDao.insertSelective(stockOrderDto);
if (saleCount <= 0) {
throw new CustomException("下订单失败");
}
Thread.sleep(10);
return orderCount;
}
1
2
3
4
5
6
7
8
9
10
11
12
scss复制代码/**
* 这里遵循先更新数据库,再更新缓存,详细的数据库与缓存一致性解析可以查看
* https://note.dolyw.com/cache/00-DataBaseConsistency.html
*/
public void updateCache(StockDto stockDto) {
Integer count = stockDto.getCount() - 1;
Integer sale = stockDto.getSale() + 1;
Integer version = stockDto.getVersion() + 1;
JedisUtil.mset(Constant.PREFIX_COUNT + stockDto.getId(), count.toString(),
Constant.PREFIX_SALE + stockDto.getId(), sale.toString(),
Constant.PREFIX_VERSION + stockDto.getId(), version.toString());
}

开始测试

使用JMeter测试上面的代码,JMeter的使用可以查看:JMeter的安装使用

0、初始化缓存库存

image.png

image.png

1、初始化数据库库存

image.png

2、配置JMeter

image.png

image.png

打开JMeter,添加测试计划模拟1000个并发线程测试秒杀10个库存的商品,填写请求地址,点击启动图标开始

3、结果

image.png

image.png

商品实际显示为卖出10,库存还有0,而订单表也只有10条数据

Druid SQL分析

image.png
使用了缓存,可以看到库存查询SQL,只执行了一次,就是缓存预热那执行了一次,不像之前每次库存都去查数据库

分布式限流&缓存&乐观锁

思路介绍

之前说到乐观锁更新操作还是执行了近 100 次 SQL,其实这 100 次里就只有 10 次扣库存成功才是有效请求,其他的都是无效请求,为了遵从最后落地到数据库的请求数要尽量少的原则,这里我们使用限流,把大部分无效请求拦截,尽可能保证最终到达数据库的都是有效请求,限流算法参考限流算法

我们这里使用固定时间窗口最好,这里使用 Redis + Lua 的分布式限流方式

代码实现

缓存预热

1
2
3
4
5
6
7
8
9
10
11
12
less复制代码/**
* 缓存预热
*/
@PostMapping("/initCache/{id}")
public ResponseBean initCache(@PathVariable("id") Integer id) {
StockDto stockDto = stockService.selectByPrimaryKey(id);
// 商品缓存预热
JedisUtil.set(Constant.PREFIX_COUNT + id.toString(), stockDto.getCount().toString());
JedisUtil.set(Constant.PREFIX_SALE + id.toString(), stockDto.getSale().toString());
JedisUtil.set(Constant.PREFIX_VERSION + id.toString(), stockDto.getVersion().toString());
return new ResponseBean(HttpStatus.OK.value(), "缓存预热成功", null);
}

Lua脚本

  • 秒级限流(每秒限制多少请求)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
lua复制代码-- 实现原理
-- 每次请求都将当前时间,精确到秒作为 key 放入 Redis 中
-- 超时时间设置为 2s, Redis 将该 key 的值进行自增
-- 当达到阈值时返回错误,表示请求被限流
-- 写入 Redis 的操作用 Lua 脚本来完成
-- 利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性

-- 资源唯一标志位
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])

-- 获取当前流量大小
local currentLimit = tonumber(redis.call('get', key) or "0")

if currentLimit + 1 > limit then
-- 达到限流大小 返回
return 0;
else
-- 没有达到阈值 value + 1
redis.call("INCRBY", key, 1)
-- 设置过期时间
redis.call("EXPIRE", key, 2)
return currentLimit + 1
end
  • 自定义参数限流(自定义多少时间限制多少请求)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
lua复制代码-- 实现原理
-- 每次请求都去 Redis 取到当前限流开始时间和限流累计请求数
-- 判断限流开始时间加超时时间戳(限流时间)大于当前请求时间戳
-- 再判断当前时间窗口请求内是否超过限流最大请求数
-- 当达到阈值时返回错误,表示请求被限流,否则通过
-- 写入 Redis 的操作用 Lua 脚本来完成
-- 利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性

-- 一个时间窗口开始时间(限流开始时间)key名称
local timeKey = KEYS[1]
-- 一个时间窗口内请求的数量累计(限流累计请求数)key名称
local requestKey = KEYS[2]
-- 限流大小,限流最大请求数
local maxRequest = tonumber(ARGV[1])
-- 当前请求时间戳
local nowTime = tonumber(ARGV[2])
-- 超时时间戳,一个时间窗口时间(毫秒)(限流时间)
local timeRequest = tonumber(ARGV[3])

-- 获取限流开始时间,不存在为0
local currentTime = tonumber(redis.call('get', timeKey) or "0")
-- 获取限流累计请求数,不存在为0
local currentRequest = tonumber(redis.call('get', requestKey) or "0")

-- 判断当前请求时间戳是不是在当前时间窗口中
-- 限流开始时间加超时时间戳(限流时间)大于当前请求时间戳
if currentTime + timeRequest > nowTime then
-- 判断当前时间窗口请求内是否超过限流最大请求数
if currentRequest + 1 > maxRequest then
-- 在时间窗口内且超过限流最大请求数,返回
return 0;
else
-- 在时间窗口内且请求数没超,请求数加一
redis.call("INCRBY", requestKey, 1)
return currentRequest + 1;
end
else
-- 超时后重置,开启一个新的时间窗口
redis.call('set', timeKey, nowTime)
redis.call('set', requestKey, '0')
-- 设置过期时间
redis.call("EXPIRE", timeKey, timeRequest / 1000)
redis.call("EXPIRE", requestKey, timeRequest / 1000)
-- 请求数加一
redis.call("INCRBY", requestKey, 1)
return 1;
end

接口入口

1
2
3
4
5
6
7
8
9
10
less复制代码/**
* 使用乐观锁下订单,并且添加读缓存,再添加限流
*/
@Limit
@PostMapping("/createOptimisticLockOrderWithRedisLimit/{id}")
public ResponseBean createOptimisticLockOrderWithRedisLimit(@PathVariable("id") Integer id) throws Exception {
// 正确的,线程安全
Integer orderCount = seckillEvolutionService.createOptimisticLockOrderWithRedisSafe(id);
return new ResponseBean(HttpStatus.OK.value(), "购买成功", null);
}

限流注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
less复制代码/**
* 限流注解
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Limit {

/**
* 限流最大请求数
* @return
*/
String maxRequest() default "10";

/**
* 一个时间窗口(毫秒)
* @return
*/
String timeRequest() default "1000";

}

LimitAspect限流切面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
less复制代码/**
* LimitAspect限流切面
*/
@Order(0)
@Aspect
@Component
public class LimitAspect {

/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(LimitAspect.class);

/**
* 一个时间窗口时间(毫秒)(限流时间)
*/
private static final String TIME_REQUEST = "1000";

/**
* RedisLimitUtil
*/
@Autowired
private RedisLimitUtil redisLimitUtil;

/**
* 对应注解
*/
@Pointcut("@annotation(com.example.limit.Limit)")
public void aspect() {}

/**
* 切面
*/
@Around("aspect() && @annotation(limit)")
public Object Interceptor(ProceedingJoinPoint proceedingJoinPoint, Limit limit) {
Object result = null;
Long maxRequest = 0L;
// 一个时间窗口(毫秒)为1000的话默认调用秒级限流判断(每秒限制多少请求)
if (TIME_REQUEST.equals(limit.timeRequest())) {
maxRequest = redisLimitUtil.limit(limit.maxRequest());
} else {
maxRequest = redisLimitUtil.limit(limit.maxRequest(), limit.timeRequest());
}
// 返回请求数量大于0说明不被限流
if (maxRequest > 0) {
// 放行,执行后续方法
try {
result = proceedingJoinPoint.proceed();
} catch (Throwable throwable) {
throw new CustomException(throwable.getMessage());
}
} else {
// 直接返回响应结果
throw new CustomException("请求拥挤,请稍候重试");
}
return result;
}

/**
* 执行方法前再执行
*/
@Before("aspect() && @annotation(limit)")
public void before(Limit limit) {
// logger.info("before");
}

/**
* 执行方法后再执行
*/
@After("aspect() && @annotation(limit)")
public void after(Limit limit) {
// logger.info("after");
}

}

RedisLimitUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
csharp复制代码/**
* RedisLimitUtil
*/
@Component
public class RedisLimitUtil {

/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(RedisLimitUtil.class);

/**
* 秒级限流(每秒限制多少请求)字符串脚本
*/
private static String LIMIT_SECKILL_SCRIPT = null;

/**
* 自定义参数限流(自定义多少时间限制多少请求)字符串脚本
*/
private static String LIMIT_CUSTOM_SCRIPT = null;

/**
* redis-key-前缀-limit-限流
*/
private static final String LIMIT = "limit:";

/**
* redis-key-名称-limit-一个时间窗口内请求的数量累计(限流累计请求数)
*/
private static final String LIMIT_REQUEST = "limit:request";

/**
* redis-key-名称-limit-一个时间窗口开始时间(限流开始时间)
*/
private static final String LIMIT_TIME = "limit:time";

/**
* 构造方法初始化加载Lua脚本
*/
public RedisLimitUtil() {
LIMIT_SECKILL_SCRIPT = getScript("redis/limit-seckill.lua");
LIMIT_CUSTOM_SCRIPT = getScript("redis/limit-custom.lua");
}

/**
* 秒级限流判断(每秒限制多少请求)
*/
public Long limit(String maxRequest) {
// 获取key名,当前时间戳
String key = LIMIT + String.valueOf(System.currentTimeMillis() / 1000);
// 传入参数,限流最大请求数
List<String> args = new ArrayList<>();
args.add(maxRequest);
return eval(LIMIT_SECKILL_SCRIPT, Collections.singletonList(key), args);
}

/**
* 自定义参数限流判断(自定义多少时间限制多少请求)
*/
public Long limit(String maxRequest, String timeRequest) {
// 获取key名,一个时间窗口开始时间(限流开始时间)和一个时间窗口内请求的数量累计(限流累计请求数)
List<String> keys = new ArrayList<>();
keys.add(LIMIT_TIME);
keys.add(LIMIT_REQUEST);
// 传入参数,限流最大请求数,当前时间戳,一个时间窗口时间(毫秒)(限流时间)
List<String> args = new ArrayList<>();
args.add(maxRequest);
args.add(String.valueOf(System.currentTimeMillis()));
args.add(timeRequest);
return eval(LIMIT_CUSTOM_SCRIPT, keys, args);
}

/**
* 执行Lua脚本方法
*/
private Long eval(String script, List<String> keys, List<String> args) {
// 执行脚本
Object result = JedisUtil.eval(script, keys, args);
// 结果请求数大于0说明不被限流
return (Long) result;
}

/**
* 获取Lua脚本
*/
private static String getScript(String path) {
StringBuilder stringBuilder = new StringBuilder();
InputStream inputStream = RedisLimitUtil.class.getClassLoader().getResourceAsStream(path);
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
String str;
while ((str = bufferedReader.readLine()) != null) {
stringBuilder.append(str).append(System.lineSeparator());
}
} catch (IOException e) {
logger.error(Arrays.toString(e.getStackTrace()));
throw new CustomException("获取Lua限流脚本出现问题: " + Arrays.toString(e.getStackTrace()));
}
return stringBuilder.toString();
}

}

JedisUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
typescript复制代码    /**
* 脚本执行
*/
public static Object eval(String script, List<String> keys, List<String> args) {
Object result = null;
try (Jedis jedis = jedisPool.getResource()) {
result = jedis.eval(script, keys, args);
return result;
} catch (Exception e) {
throw new CustomException("Redis脚本执行eval方法异常:script=" + script + " keys=" +
keys.toString() + " args=" + args.toString() + " cause=" + e.getMessage());
}
}

核心service 逻辑:先限流,再校验库存缓存,扣库存,创建订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
scss复制代码@Override
@Transactional(rollbackFor = Exception.class)
public Integer createOptimisticLockOrderWithRedisSafe(Integer id) throws Exception {
// 检查库存
// 使用缓存读取库存,减轻DB压力,Redis批量操作(具有原子性)解决线程安全问题
List<String> dataList = JedisUtil.mget(Constant.PREFIX_COUNT + id,
Constant.PREFIX_SALE + id, Constant.PREFIX_VERSION + id);
Integer count = Integer.parseInt(dataList.get(0));
Integer sale = Integer.parseInt(dataList.get(1));
Integer version = Integer.parseInt(dataList.get(2));
if (count <= 0) {
throw new CustomException("库存不足");
}
// 还有库存
StockDto stockDto = new StockDto();
stockDto.setId(id);
stockDto.setCount(count);
stockDto.setSale(sale);
stockDto.setVersion(version);
// 扣库存
Integer saleCount = stockDao.updateByOptimisticLock(stockDto);
// 操作数据大于0,说明扣库存成功
if (saleCount > 0) {
logger.info("版本号:{} {} {}", stockDto.getCount(), stockDto.getSale(), stockDto.getVersion());
// 更新缓存,这里更新需要保证三个数据(库存,已售,乐观锁版本号)的一致性,使用mset原子操作
updateCache(stockDto);
}
if (saleCount <= 0) {
throw new CustomException("扣库存失败");
}
// 下订单
StockOrderDto stockOrderDto = new StockOrderDto();
stockOrderDto.setStockId(stockDto.getId());
Integer orderCount = stockOrderDao.insertSelective(stockOrderDto);
if (saleCount <= 0) {
throw new CustomException("下订单失败");
}
Thread.sleep(10);
return orderCount;
}
1
2
3
4
5
6
7
8
9
10
11
12
scss复制代码/**
* 这里遵循先更新数据库,再更新缓存,详细的数据库与缓存一致性解析可以查看
* https://note.dolyw.com/cache/00-DataBaseConsistency.html
*/
public void updateCache(StockDto stockDto) {
Integer count = stockDto.getCount() - 1;
Integer sale = stockDto.getSale() + 1;
Integer version = stockDto.getVersion() + 1;
JedisUtil.mset(Constant.PREFIX_COUNT + stockDto.getId(), count.toString(),
Constant.PREFIX_SALE + stockDto.getId(), sale.toString(),
Constant.PREFIX_VERSION + stockDto.getId(), version.toString());
}

开始测试

使用JMeter测试上面的代码,JMeter的使用可以查看:JMeter的安装使用

0、初始化缓存库存

image.png

image.png

1、初始化数据库库存

image.png

2、配置JMeter

image.png
PS: 这次我们填写 Ramp-Up 时间为 5 秒,意思为执行 5 秒,每秒执行 100 个并发,因为如果都在 1S 内执行完,会被限流

image.png

打开JMeter,添加测试计划模拟1000个并发线程测试秒杀10个库存的商品,填写请求地址,点击启动图标开始

3、结果

image.png
我们看下后台日志,可以看到很多请求直接被限流限制了,这样就达到了我们的目的

image.png

image.png

商品实际显示为卖出10,库存还有0,而订单表也只有10条数据

Druid SQL分析

image.png
使用了限流,可以看到乐观锁更新不像之前那样执行 61 次了,只执行了 19 次,很多请求直接被限流了

异步下单

那我们还可以怎么优化提高吞吐量以及性能呢,我们上文所有例子其实都是同步请求,完全可以利用同步转异步来提高性能,这里我们将下订单的操作进行异步化,利用消息队列来进行解耦,这样可以然 DB 异步执行下单

每当一个请求通过了限流和库存校验之后就将订单信息发给消息队列,这样一个请求就可以直接返回了,消费程序做下订单的操作,对数据进行入库落地,因为异步了,所以最终需要采取回调或者是其他提醒的方式提醒用户购买完成

参考

  • 感谢网络大神们的笔记资料
    www.mamicode.com/info-detail…
    time.geekbang.org/column/arti…
    note.dolyw.com/distributed…
    note.dolyw.com/seckill-evo…
    www.cnblogs.com/stulzq/p/89…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JAVA开发Mysql limit概述与优化 1、limit

发表于 2021-07-22

1、limit 分页

在我们使用查询语句的时候,经常要返回前几条或者中间某几行数据,在数据过多的时候想要分页,我们应该怎么解决这个问题?

在mysql中,它为我们提供了这样一个功能。

1
sql复制代码    SELECT * FROM table LIMIT [offset,] rows | rows OFFSET offset

limit 子句可以被用于强制 select语句返回指定的记录数,limit 只能接受一个或两个数字参数,并且参数必须是一个整数常量。

如果给定两个参数,第一个参数指定第一个返回记录行的偏移量,第二个参数指定返回记录行的最大数目。

初始记录行的偏移量是 0(而不是 1): 为了与 PostgreSQL 兼容,MySQL 也支持句法: limit 检索记录行

1
2
3
4
5
6
7
8
9
10
11
sql复制代码SELECT * FROM table LIMIT 0; // 起始记录行为 0

SELECT * FROM table LIMIT 3,10; // 检索记录行 3-10

//为了检索从某一个偏移量到记录集的结束所有的记录行,可以指定第二个参数为 -1:
SELECT * FROM table LIMIT 10,-1; // 检索记录行 10-last. 这个只适用于特定的mysql数据库版本, last最末

//如果只给定一个参数,它表示返回最大的记录行数目:
SELECT * FROM table LIMIT 10; //检索前 10 个记录行

//换句话说,LIMIT n 等价于 LIMIT 0,n。
1
2
3
4
sql复制代码limit X,Y ,跳过前X条数据,读取Y条数据

X表示第一个返回记录行的偏移量,Y表示返回记录行的最大数目
如果X为0的话,即 limit 0, Y,相当于limit Y、

1.2 入门测试:

题目描述

有一个员工employees表简况如下:
在这里插入图片描述
建表语句如下:

1
2
3
4
5
6
7
8
sql复制代码CREATE TABLE `employees` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` char(1) NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`));

1.请你查找employees里最晚入职员工的所有信息,以上例子输出如下:
在这里插入图片描述
2.请你查找employees里入职员工时间排名倒数第三的员工所有信息,以上例子输出如下: 在这里插入图片描述

2、limit 的性能分析

【参考大佬博客:悠悠i】
当需要从数据库查询的表有上万条记录的时候,一次性查询所有结果会变得很慢,特别是随着数据量的增加特别明显,这时需要使用分页查询。对于数据库分页查询,也有很多种方法和优化的点。下面简单说一下我知道的一些方法。

2.1 准备工作

为了对下面列举的一些优化进行测试,下面针对已有的一张表进行说明。

表名:order_history
描述:某个业务的订单历史表
主要字段:unsigned int id,tinyint(4) int type
字段情况:该表一共37个字段,不包含text等大型数据,最大为varchar(500),id字段为索引,且为递增。
数据量:5709294
MySQL版本:5.7.16
线下找一张百万级的测试表可不容易,如果需要自己测试的话,可以写shell脚本什么的插入数据进行测试。
以下的 sql 所有语句执行的环境没有发生改变,下面是基本测试结果:

1
sql复制代码select count(*) from orders_history;

返回结果:5709294

三次查询时间分别为:

  • 8903 ms
  • 8323 ms
  • 8401 ms

2.2 一般分页查询

一般的分页查询使用简单的 limit 子句就可以实现。limit 子句声明如下:

1
sql复制代码SELECT * FROM table LIMIT [offset,] rows | rows OFFSET offset

LIMIT 子句可以被用于指定 SELECT 语句返回的记录数。

需注意以下几点:

  • 第一个参数指定第一个返回记录行的偏移量,注意从0开始
  • 第二个参数指定返回记录行的最大数目
  • 如果只给定一个参数:它表示返回最大的记录行数目
  • 第二个参数为 -1 表示检索从某一个偏移量到记录集的结束所有的记录行
  • 初始记录行的偏移量是 0(而不是 1)

下面是一个应用实例:

1
sql复制代码select * from orders_history where type=8 limit 1000,10;

该条语句将会从表 orders_history 中查询offset: 1000开始之后的10条数据,也就是第1001条到第1010条数据(1001 <= id <= 1010)。

数据表中的记录默认使用主键(一般为id)排序,上面的结果相当于:

1
sql复制代码select * from orders_history where type=8 order by id limit 10000,10;

三次查询时间分别为:

  • 3040 ms
  • 3063 ms
  • 3018 ms

针对这种查询方式,下面测试查询记录量对时间的影响:

1
2
3
4
5
sql复制代码select * from orders_history where type=8 limit 10000,1;
select * from orders_history where type=8 limit 10000,10;
select * from orders_history where type=8 limit 10000,100;
select * from orders_history where type=8 limit 10000,1000;
select * from orders_history where type=8 limit 10000,10000;

三次查询时间如下:

1
2
3
4
5
sql复制代码查询1条记录:3072ms 3092ms 3002ms
查询10条记录:3081ms 3077ms 3032ms
查询100条记录:3118ms 3200ms 3128ms
查询1000条记录:3412ms 3468ms 3394ms
查询10000条记录:3749ms 3802ms 3696ms

另外我还做了十来次查询,从查询时间来看,基本可以确定,在查询记录量低于100时,查询时间基本没有差距,随着查询记录量越来越大,所花费的时间也会越来越多。

针对查询偏移量的测试:

1
2
3
4
5
sql复制代码select * from orders_history where type=8 limit 100,100;
select * from orders_history where type=8 limit 1000,100;
select * from orders_history where type=8 limit 10000,100;
select * from orders_history where type=8 limit 100000,100;
select * from orders_history where type=8 limit 1000000,100;

三次查询时间如下:

1
2
3
4
5
sql复制代码查询100偏移:25ms 24ms 24ms
查询1000偏移:78ms 76ms 77ms
查询10000偏移:3092ms 3212ms 3128ms
查询100000偏移:3878ms 3812ms 3798ms
查询1000000偏移:14608ms 14062ms 14700ms

随着查询偏移的增大,尤其查询偏移大于10万以后,查询时间急剧增加。

这种分页查询方式会从数据库第一条记录开始扫描,所以越往后,查询速度越慢,而且查询的数据越多,也会拖慢总查询速度。

2.3 使用子查询优化

这种方式先定位偏移位置的 id,然后往后查询,这种方式适用于 id 递增的情况。

1
sql复制代码select * from orders_history where type=8 limit 100000,1;
1
sql复制代码select id from orders_history where type=8 limit 100000,1;
1
2
3
sql复制代码select * from orders_history where type=8 and 
id>=(select id from orders_history where type=8 limit 100000,1)
limit 100;
1
sql复制代码select * from orders_history where type=8 limit 100000,100;

四条语句的查询时间如下:

  • 第1条语句:3674ms
  • 第2条语句:1315ms
  • 第3条语句:1327ms
  • 第4条语句:3710ms

针对上面的查询需要注意:

  • 比较第1条语句和第2条语句:使用 select id 代替 select * 速度增加了3倍
  • 比较第2条语句和第3条语句:速度相差几十毫秒
  • 比较第3条语句和第4条语句:得益于 select id 速度增加,第3条语句查询速度增加了3倍

这种方式相较于原始一般的查询方法,将会增快数倍。

2.4 使用 id 限定优化:

这种方式假设数据表的id是连续递增的,则我们根据查询的页数和查询的记录数可以算出查询的id的范围,可以使用 id between and 来查询:

1
2
sql复制代码select * from orders_history where type=2 
and id between 1000000 and 1000100 limit 100;

查询时间:15ms 12ms 9ms

这种查询方式能够极大地优化查询速度,基本能够在几十毫秒之内完成。限制是只能使用于明确知道id的情况,不过一般建立表的时候,都会添加基本的id字段,这为分页查询带来很多便利。

还可以有另外一种写法:

1
sql复制代码select * from orders_history where id >= 1000001 limit 100;

当然还可以使用 in 的方式来进行查询,这种方式经常用在多表关联的时候进行查询,使用其他表查询的id集合,来进行查询:

1
2
3
sql复制代码select * from orders_history where id in
(select order_id from trade_2 where goods = 'pen')
limit 100;

这种 in 查询的方式要注意:某些 mysql 版本不支持在 in 子句中使用 limit。

2.5 使用临时表优化

这种方式已经不属于查询优化,这儿附带提一下。

对于使用 id 限定优化中的问题,需要 id 是连续递增的,但是在一些场景下,比如使用历史表的时候,或者出现过数据缺失问题时,可以考虑使用临时存储的表来记录分页的id,使用分页的id来进行 in 查询。这样能够极大的提高传统的分页查询速度,尤其是数据量上千万的时候。

2.6 关于数据表的id说明

一般情况下,在数据库中建立表的时候,强制为每一张表添加 id 递增字段,这样方便查询。

如果像是订单库等数据量非常庞大,一般会进行分库分表。这个时候不建议使用数据库的 id 作为唯一标识,而应该使用分布式的高并发唯一 id 生成器来生成,并在数据表中使用另外的字段来存储这个唯一标识。

使用先使用范围查询定位 id (或者索引),然后再使用索引进行定位数据,能够提高好几倍查询速度。即先 select id,然后再 select *;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot技术实践-整合Quartz任务调度 一、

发表于 2021-07-22

一、Quartz任务调度

1.1 Quartz

  1. Quartz是一个定时任务框架,基础核心使用可以参考官网
  2. Quartz源码:github.com/quartz-sche…
  3. Quartz官网地址:www.quartz-scheduler.org/documentati…
  4. 项目源码地址:gitee.com/tianxincoor…

1.2 Scheduler

  1. Scheduler为quartz中的调度器,Quartz通过调度器来注册、暂停、删除Trigger和JobDetail
  2. Scheduler拥有SchedulerContext,顾名思义就是上下文,通过SchedulerContext可以获取到触发器和任务的一些信息

1.3 Trigger

  1. Trigger为触发器,通过cron表达式或日历指定任务执行的周期
  2. 系统时间走到触发器指定的时间时,触发器就会触发任务的执行

1.4 JobDetail

  1. Job接口是真正需要执行的任务
  2. JobDetail核心调度实现了Job类的任务类,Trigger和Scheduler实际用到的都是JobDetail

1.5 Job

  1. 完成任务的最小实现类,如果需要被定时调度的类都需要实现此接口

二、SpringBoot整合

2.1 环境准备

  1. 新建SpringBoot项目,SpringBoot基于2.2.0.RELEASE,完整pom如下
    • 核心依赖为spring-boot-starter-quartz、
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.codecoord</groupId>
<artifactId>springboot-quartz</artifactId>
<version>1.0</version>
<name>springboot-quartz</name>

<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 公共依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>

<!-- quartz依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
  1. 配置数据源相关链接
    • quartz需要单据的数据库,所以需要单据创建一个库来给quartz使用,此处为quartz_config
    • 其他相关配置可以参考官网:官网配置参考
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
yaml复制代码server:
port: 8888
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/source?useSSL=false
username: root
password: tianxin
# 定时配置
quartz:
# 相关属性配置
properties:
org:
quartz:
# 数据源
dataSource:
globalJobDataSource:
# URL必须大写
URL: jdbc:mysql://127.0.0.1:3306/quartz_config?useUnicode=true&characterEncoding=utf-8&useSSL=false
driver: com.mysql.cj.jdbc.Driver
maxConnections: 5
username: root
password: tianxin
# 必须指定数据源类型
provider: hikaricp
scheduler:
instanceName: globalScheduler
# 实例id
#instanceId: AUTO
type: com.alibaba.druid.pool.DruidDataSource
jobStore:
# 数据源
dataSource: globalJobDataSource
# JobStoreTX将用于独立环境,提交和回滚都将由这个类处理
class: org.quartz.impl.jdbcjobstore.JobStoreTX
# 驱动配置
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 表前缀
tablePrefix: QRTZ_
# 线程池配置
threadPool:
class: org.quartz.simpl.SimpleThreadPool
# 线程数
threadCount: 10
# 优先级
threadPriority: 5
  1. 数据表结构如下,在github中有对应的表结构信息
    • 源码地址:表结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
sql复制代码drop database quartz_config;
create database quartz_config default character set utf8mb4;
use quartz_config;

DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;


CREATE TABLE QRTZ_JOB_DETAILS
(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);

CREATE TABLE QRTZ_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);

CREATE TABLE QRTZ_SIMPLE_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_CRON_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(200) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
STR_PROP_1 VARCHAR(512) NULL,
STR_PROP_2 VARCHAR(512) NULL,
STR_PROP_3 VARCHAR(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR(1) NULL,
BOOL_PROP_2 VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_BLOB_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_CALENDARS
(
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(200) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_FIRED_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);

CREATE TABLE QRTZ_SCHEDULER_STATE
(
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);

CREATE TABLE QRTZ_LOCKS
(
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);
  1. 上述步骤完成后,quartz&SpringBoot整合基础环境完成,完整项目结构如下

image.png

2.2 代码示例

新建启动类SpringBootQuartz

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码package com.codecoord.springboot.quartz;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* SpringBootQuartz
*
* @author tianxincoord@163.com
* @date 2021/7/20
*/
@SpringBootApplication
public class SpringBootQuartz {

public static void main(String[] args) {
SpringApplication.run(SpringBootQuartz.class, args);
}
}

新建实体类,用于接收基础任务信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
java复制代码package com.codecoord.springboot.quartz.domain;

/**
* 任务明细
*
* @author tianxincoord@163.com
* @date 2021/7/20
*/
public class JobInfo {
/**
* 任务名称
*/
private String jobName;
/**
* 任务组
*/
private String jobGroup;
/**
* 触发器名称
*/
private String triggerName;
/**
* 触发器组
*/
private String triggerGroup;
/**
* cron表达式
*/
private String cron;
/**
* 类名
*/
private String className;
/**
* 状态
*/
private String status;
/**
* 下一次执行时间
*/
private String nextTime;
/**
* 上一次执行时间
*/
private String prevTime;
/**
* 配置信息(data)
*/
private String config;

public String getJobName() {
return jobName;
}

public void setJobName(String jobName) {
this.jobName = jobName;
}

public String getJobGroup() {
return jobGroup;
}

public void setJobGroup(String jobGroup) {
this.jobGroup = jobGroup;
}

public String getTriggerName() {
return triggerName;
}

public void setTriggerName(String triggerName) {
this.triggerName = triggerName;
}

public String getTriggerGroup() {
return triggerGroup;
}

public void setTriggerGroup(String triggerGroup) {
this.triggerGroup = triggerGroup;
}

public String getCron() {
return cron;
}

public void setCron(String cron) {
this.cron = cron;
}

public String getClassName() {
return className;
}

public void setClassName(String className) {
this.className = className;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public String getNextTime() {
return nextTime;
}

public void setNextTime(String nextTime) {
this.nextTime = nextTime;
}

public String getPrevTime() {
return prevTime;
}

public void setPrevTime(String prevTime) {
this.prevTime = prevTime;
}

public String getConfig() {
return config;
}

public void setConfig(String config) {
this.config = config;
}
}

新建任务类,用于调度任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码package com.codecoord.springboot.quartz.job;

import java.time.LocalDateTime;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.stereotype.Component;

/**
* 计划提醒
*
* @author tianxincoord@163.com
* @date 2021/7/20
*/
@Component
@DisallowConcurrentExecution
public class PlanRemindJob implements Job {

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("PlanRemindJob正在执行..." + LocalDateTime.now());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码package com.codecoord.springboot.quartz.job;

import java.time.LocalDateTime;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.stereotype.Component;

/**
* 时间日提醒
*
* @author tianxincoord@163.com
* @date 2021/7/20
*/
@Component
@DisallowConcurrentExecution
public class TimeEventJob implements Job {

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("TimeEventJob正在执行..." + LocalDateTime.now());
}
}

新建核心处理类,处理任务的添加、删除、暂停等操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
java复制代码package com.codecoord.springboot.quartz.handler;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.springboot.quartz.domain.JobInfo;
import java.util.List;
import java.util.Objects;
import javax.annotation.Resource;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.stereotype.Component;

/**
* job处理器
*
* @author tianxincoord@163.com
* @date 2021/7/20
*/
@Component
public class JobHandler {
@Resource
private Scheduler scheduler;

/**
* 添加任务
*/
@SuppressWarnings("unchecked")
public void addJob(JobInfo jobInfo) throws SchedulerException, ClassNotFoundException {
Objects.requireNonNull(jobInfo, "任务信息不能为空");

// 生成job key
JobKey jobKey = JobKey.jobKey(jobInfo.getJobName(), jobInfo.getJobGroup());
// 当前任务不存在才进行添加
if (!scheduler.checkExists(jobKey)) {
Class<Job> jobClass = (Class<Job>)Class.forName(jobInfo.getClassName());
// 任务明细
JobDetail jobDetail = JobBuilder
.newJob(jobClass)
.withIdentity(jobKey)
.withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup())
.withDescription(jobInfo.getJobName())
.build();
// 配置信息
jobDetail.getJobDataMap().put("config", jobInfo.getConfig());
// 定义触发器
TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getTriggerName(), jobInfo.getTriggerGroup());
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(jobInfo.getCron()))
.build();
scheduler.scheduleJob(jobDetail, trigger);
} else {
throw new SchedulerException(jobInfo.getJobName() + "任务已存在,无需重复添加");
}
}

/**
* 任务暂停
*/
public void pauseJob(String jobGroup, String jobName) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (scheduler.checkExists(jobKey)) {
scheduler.pauseJob(jobKey);
}
}

/**
* 继续任务
*/
public void continueJob(String jobGroup, String jobName) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (scheduler.checkExists(jobKey)) {
scheduler.resumeJob(jobKey);
}
}

/**
* 删除任务
*/
public boolean deleteJob(String jobGroup, String jobName) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (scheduler.checkExists(jobKey)) {
return scheduler.deleteJob(jobKey);
}
return false;
}

/**
* 获取任务信息
*/
public JobInfo getJobInfo(String jobGroup, String jobName) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
if (!scheduler.checkExists(jobKey)) {
return null;
}
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
if (Objects.isNull(triggers)) {
throw new SchedulerException("未获取到触发器信息");
}
TriggerKey triggerKey = triggers.get(0).getKey();
Trigger.TriggerState triggerState = scheduler.getTriggerState(triggerKey);
JobDetail jobDetail = scheduler.getJobDetail(jobKey);

JobInfo jobInfo = new JobInfo();
jobInfo.setJobName(jobGroup);
jobInfo.setJobGroup(jobName);
jobInfo.setTriggerName(triggerKey.getName());
jobInfo.setTriggerGroup(triggerKey.getGroup());
jobInfo.setClassName(jobDetail.getJobClass().getName());
jobInfo.setStatus(triggerState.toString());

if (Objects.nonNull(jobDetail.getJobDataMap())) {
jobInfo.setConfig(JSONObject.toJSONString(jobDetail.getJobDataMap()));
}

CronTrigger theTrigger = (CronTrigger) triggers.get(0);
jobInfo.setCron(theTrigger.getCronExpression());
return jobInfo;
}
}

新建controller接口对外提供操作入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
java复制代码package com.codecoord.springboot.quartz.controller;

import com.codecoord.springboot.quartz.domain.JobInfo;
import com.codecoord.springboot.quartz.handler.JobHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.annotation.Resource;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
* Quartz控制器
*
* @author tianxincoord@163.com
* @date 2021/7/20
*/
@RestController
@RequestMapping("/job")
public class QuartzController {

@Resource
private JobHandler jobHandler;
@Resource
private Scheduler scheduler;

/**
* http://localhost:8888/job/all
*/
@RequestMapping("/all")
public List<JobInfo> list() throws SchedulerException {
List<JobInfo> jobInfos = new ArrayList<>();
List<String> triggerGroupNames = scheduler.getTriggerGroupNames();
for (String triggerGroupName : triggerGroupNames) {
Set<TriggerKey> triggerKeySet = scheduler
.getTriggerKeys(GroupMatcher.triggerGroupEquals(triggerGroupName));
for (TriggerKey triggerKey : triggerKeySet) {
Trigger trigger = scheduler.getTrigger(triggerKey);
JobKey jobKey = trigger.getJobKey();
JobInfo jobInfo = jobHandler.getJobInfo(jobKey.getGroup(), jobKey.getName());
jobInfos.add(jobInfo);
}
}
return jobInfos;
}

/**
* http://localhost:8888/job/add
*
* {
* "className": "com.codecoord.springboot.quartz.job.PlanRemindJob",
* "config": "配置信息,例如存储json",
* "cron": "0/3 * * * * ?",
* "jobGroup": "STANDARD_JOB_GROUP",
* "jobName": "计划任务通知任务",
* "triggerGroup": "STANDARD_TRIGGER_GROUP",
* "triggerName": "计划任务通知触发器"
* }
*
* {
* "className": "com.codecoord.springboot.quartz.job.TimeEventJob",
* "config": "配置信息,例如存储json",
* "cron": "0/10 * * * * ?",
* "jobGroup": "STANDARD_JOB_GROUP",
* "jobName": "时间通知任务",
* "triggerGroup": "STANDARD_TRIGGER_GROUP",
* "triggerName": "时间通知触发器"
* }
*/
@PostMapping("/add")
public JobInfo addJob(@RequestBody JobInfo jobInfo) throws SchedulerException, ClassNotFoundException {
jobHandler.addJob(jobInfo);
return jobInfo;
}

/**
* http://localhost:8888/job/pause?jobGroup=STANDARD_JOB_GROUP&jobName=计划任务通知任务
* http://localhost:8888/job/pause?jobGroup=STANDARD_JOB_GROUP&jobName=时间通知任务
*/
@RequestMapping("/pause")
public void pauseJob(@RequestParam("jobGroup") String jobGroup, @RequestParam("jobName") String jobName)
throws SchedulerException {
jobHandler.pauseJob(jobGroup, jobName);
}

/**
* http://localhost:8888/job/continue?jobGroup=STANDARD_JOB_GROUP&jobName=计划任务通知任务
* http://localhost:8888/job/continue?jobGroup=STANDARD_JOB_GROUP&jobName=时间通知任务
*/
@RequestMapping("/continue")
public void continueJob(@RequestParam("jobGroup") String jobGroup, @RequestParam("jobName") String jobName)
throws SchedulerException {
jobHandler.continueJob(jobGroup, jobName);
}

/**
* http://localhost:8888/job/delete?jobGroup=STANDARD_JOB_GROUP&jobName=计划任务通知任务
* http://localhost:8888/job/delete?jobGroup=STANDARD_JOB_GROUP&jobName=时间通知任务
*/
@RequestMapping("/delete")
public boolean deleteJob(@RequestParam("jobGroup") String jobGroup, @RequestParam("jobName") String jobName)
throws SchedulerException {
return jobHandler.deleteJob(jobGroup, jobName);
}
}

三、Quartz测试

  1. 通过POSTMAN/apiPost等工具添加任务,请求数据都在对应接口上
  2. 观察对应任务是否有进行执行以及数据库中是否有对应任务,例如下面3张表
1
2
3
4
5
sql复制代码use quartz_config;

select * from QRTZ_CRON_TRIGGERS;
select * from QRTZ_TRIGGERS;
select * from QRTZ_JOB_DETAILS;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis持久化总结

发表于 2021-07-22

Redis之所以能被称之为内存数据库而不单单是内存缓存是因为其有自己的持久化机制,可以持久化数据库到硬盘当中,在宕机之后能及时的恢复,其持久化方式主要有两种,AOF和RDB,下面的内容是对这两种方式的总结。

AOF

基本概念

AOF方式也就是Append Only File,其是在执行命令之后,将对应的日志写到日志文件中的,因为是先执行操作,再写日志,因此不会阻塞当前的写操作。

何时将日志刷盘,决定了Redis宕机恢复时丢失数据的多少,Redis提供了三种策略

  • Always: 同步写回,命令执行完成,立马写回到磁盘
  • Everysec: 每秒写回,隔一秒就将缓存页数据写回磁盘
  • No: 依赖操作系统控制的写回,操作系统自己决定刷盘时机。
    以上三种策略,对数据的可靠性要求越高的对性能的影响也就越大,因此如何配置是一个取舍问题。

AOF日志写的格式是执行命令的Redis协议内容,Redis协议是文本协议,自然占据空间比较大,而由于其记录的是操作流,对同一个Key的不断修改使得AOF日志中记录了很多历史的Value,为了解决这个问题,Redis引入了AOF重写机制,如何重写,是否影响Redis对外提供的服务是我们关心的内容,下面是关于AOF重写过程的总结,以问题的形式呈现。

AOF重写

1. 何时开启重写

这是由redis中的配置决定的,可以根据百分比和文件大小设置,相关的配置是auto-aof-rewrite-*

2. 重写过程中会不会影响Redis对外提供服务?

不会,重写过程是后台子进程bgrewriteaof来完成的,由于在fork过程中操作系统会拷贝页表等相关信息,这个过程会阻塞,但是拷贝完成之后,子进程会同父进程共享内存空间,而父进程也由于Copy On Write机制对内存的修改不会影响到子进程。子进程重写AOF,父进程对外提供服务,同时工作,整体上来看还是不会影响的。

3. 重写过程会直接删除原来的AOF日志么?

不会,原来的日志依然会使用,直到AOF重写完成。

4. 重写AOF的日志的进程内存空间是快照,那重写过程中的数据怎么办?

在重写过程中,数据会写到两个缓存中,一个是原来的AOF缓存,一个是AOF重写缓存区,当重写完成之后,重写缓冲区记录的最新操作也会写入新的AOF文件中,以保证数据库最新的记录,完成之后进行替换

RDB

上面讲到了AOF来记录Redis操作,当进行数据恢复的时候,AOF需要从头到尾执行命令才能恢复到最新的状态,而如果能够直接将内存快照下来,就会快很多,这就是RDB也就是Redis Data Base
执行RDB操作有两个命令可以直接执行

  • save
  • bgsave
    其中save是在主进程中执行save操作,会阻塞主进程,而bgsave会创建一个子进程,来负责生成快照,子进程如何做到不阻塞在上面AOF中有解释。

对于RDB,还有一个问题是快照频率的问题,从可靠性的角度出发,恨不得每时每刻都可以创建快照,但是由于拷贝的是整个Redis内存数据,开销和空间占用都比较大,对磁盘的压力都很大。此时AOF的优势就展现了出来,因此引出在Redis4.0中提出的AOF和RDB混合模式

AOF与RDB混合模式

混合模式同时使用了两种方式来生成快照,在配置文件中指定的时间周期生成RDB,最后又将过程中发生的写操作以AOF的形式写到快照文件中。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【建议收藏】毕设/私活/大佬必备,开源一个SpringBoo

发表于 2021-07-22

本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!

小伙伴们,大家好,我是花Gie,一个正经的程序猿。

今天和大家分享一个项目,这个项目花Gie不眠不休足足肝了两天两夜,这是个什么东东呢,它是集SpringBoot+Mybatis+redis+shiro+jwt+vue于一体的标准项目框架。

整个项目不涉及任何具体业务场景,只配置了一些常用功能,如:权限管理,用户管理,菜单管理等,外加redis中间件,以及很多好用的工具类(RedisUtil,Id生成器,JWt等),可以说是即拿即用,扩展性也非常强,下面就就进入主题,看看怎么使用吧。

image.png

一、SpringBoot + Mybatis介绍与搭建

不知道大家在开发过程中,有没有从0开始搭建一个项目,反正花Gie刚开始学Java的时候,被SSM折腾的死去活来,各种复杂的配置文件搞的我一直在崩溃的边缘试探,而近年来随着SpringBoot的普遍使用,新一批程序猿大军要舒服多了,因为SpringBoot简化了配置,让开发变得极其简单而快速。

1.1 SpringBoot是什么

SpringBoot 是 Pivotal 团队开发的一套全新框架,设计目的是为了简化项目初始搭建以及开发过程,去除了大量的 xml 配置文件,简化了复杂的依赖管理,配合各种 starter 使用,基本上可以做到自动化配置。如果你使用的是IDEA开发工具,你只要点击几下 [下一步] 就可以完成所有配置,实现轻松启动。

1.2 框架搭建介绍

搭建基础框架有多种方式,由于不是我们本文的重点,这里介绍两种最常用的方式,小伙伴们可以根据自己习惯来选取,花Gie一般使用IDEA来搭建,相当方便。

  • 1.通过Spring Initializr创建

打开浏览器,输入地址start.spring.io,正常情况可以看到下面这个界面。

image.png

我们可以看到有很多配置项,这里简单介绍一下主要配置项的含义:

  • Project: 选择使用Maven或Gradle来创建项目;
  • Language: 开发语言;
  • Spring Boot: Spring Boot版本选择,默认最新版本(非里程碑和快照版本);
  • Project Metadata: 指定项目的一些基本信息:
0. **Group:** 一般分为多个段,如com.basic.business,其中第一段为域,第二段为公司名称。域又分为org、com、cn等,其中org为非营利组织,com为商业组织。举个apache公司的tomcat项目例子:这个项目的groupId是,它的域是org(tomcat是非营利项目),公司名称是apache,artigactId是tomcat。
1. **Artfact:** 一般是项目名或者模块名,和Group一起保证项目唯一性
2. **name**:项目名称
3. **Description**:项目描述
4. **Package Name**:包名,如com.huage.base
5. **Packaging**:打包方式
6. **Java** :JDK版本号
  • Dependencies: 选择需要的依赖,它会在创建项目时自动在生成的pom.xml(Maven)或者build.gradle(Gradle)引入依赖。

填好所有信息后,点击Generate the project 按钮,Spring Initializr就会生成一个项目,这个项目会以zip文件的形式下载。解压到本地后,可以通过IDEA导入项目。

2.通过IDEA创建

依次点击IDEA菜单栏:File -> New -> Project,打开如下窗口:

image.png

选择好JDK版本后,点击下一步会看到如下界面,是不是很熟悉,这个和上面说的网页端操作是一样的,这里就不再赘述,继续点击【下一步】

image.png

下面这里是我们初始化依赖的地方,我们可以根据项目需要合理选择,也可以后续在pom文件中添加,此外这里还可以选择SpringBoot版本,一般使用最新稳定版本。

image.png

1.3 数据库设计

因为本项目不涉及任何具体业务,所以只需要一些基础表即可,如:sys_user、sys_role、sys_menu等。

image.png

1.4【标准版本】基础信息配置

基础项目搭建完成后,此时整个项目只有空的文件夹和一个空的application.properties,这时我们需要对数据库和依赖包以及mybatis进行配置。

  • 1. 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ini复制代码server.port=18082
spring.application.name=first-program
​
# mysql db
spring.datasource.url=jdbc:mysql://localhost:3306/firstProgram?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
​
# mybatis
mybatis.type-aliases-package=com.basic.business
mybatis.mapper-locations=classpath:mapper/*.xml
​
pagehelper.helper-dialect= mysql
pagehelper.reasonable= false
pagehelper.support-methods-arguments= true
pagehelper.params= count=countsql
  • 2. pom文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
xml复制代码 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!--mysql 驱动-->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>5.1.40</version>
   <scope>runtime</scope>
</dependency>
<!--druid-->
<dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid-spring-boot-starter</artifactId>
   <version>1.1.10</version>
</dependency>
<dependency>
   <groupId>jakarta.validation</groupId>
   <artifactId>jakarta.validation-api</artifactId>
   <version>2.0.2</version>
</dependency>
<dependency>
   <groupId>com.github.pagehelper</groupId>
   <artifactId>pagehelper-spring-boot-starter</artifactId>
   <version>${pagehelper-spring-boot-starter.version}</version>
</dependency>

1.5 使用EasyCode快捷生成基础代码

配置好以上信息后,我们使用EasyCode代码生成器,来生成Entity、Dao、Service、Controller等文件,花Gie在一个小时肝了一周的需求,看我如何使用EasyCode完成封神 中已经详细介绍过用法,这里就不再重复造轮子啦。

二、【标准版本】 集成Swagger

EasyCode代码生成后,会自动带有Swagger注解,我们需要两步配置即可完成Swagger注解引入

  • pom依赖
1
2
3
4
5
6
7
8
9
10
11
xml复制代码 <!--swagger配置-->
<dependency>
   <groupId>io.swagger</groupId>
   <artifactId>swagger-models</artifactId>
   <version>1.5.21</version>
</dependency>
<dependency>
   <groupId>io.springfox</groupId>
   <artifactId>springfox-swagger-ui</artifactId>
   <version>2.9.2</version>
</dependency>
  • 新建配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scss复制代码@Configuration
public class Swagger2Configurate {
   //是否开启swagger
   @Value("${swagger.enable}")
   private boolean enableSwagger;
​
   @Bean
   public Docket createRestApi() {
       return new Docket(DocumentationType.SWAGGER_2)
              .enable(enableSwagger)
              .apiInfo(apiInfo())
              .select()
              .apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class))
              .paths(PathSelectors.any())
              .build();
  }
​
   private ApiInfo apiInfo() {
       return new ApiInfoBuilder()
              .title("标准版本API文档")
              .version("1.0")
              .build();
  }
}

接下来我们启动一下项目,在浏览器输入地址http://127.0.0.1:18082/swagger-ui.html,看看效果吧,如下图,我们可以正常看到swagger界面,尝试调用了一下接口,也都可以正常请求:

image.png

三、【标准版本】集成 redis

redis想必小伙伴们即使没有用过,也是经常听到的,在工作中,redis用到的频率非常高,所以【标准版本】项目中,花Gie也集成了redis还有一些相关工具类,相当贴心。

3.1 redis是什么

用通俗点的话解释,redis就是一个数据库,直接运行在内存中,因此其运行速度相当快,同时其并发能力也非常强。redis是以key-value键值对的形式存在(如:"name":huage),它的key有五种常见类型:

  • String:字符串
  • Hash:字典
  • List:列表
  • Set:集合
  • SortSet:有序集合

除此之外,redis还有一些高级数据结构,如HyperLogLog、Geo、Pub/Sub以及BloomFilter、RedisSearch等,这个后面花Gie会有专门的系列来讲解,这里不再展开啦(不然肝不完了)。

3.2 集成redis步骤

  • pom文件配置
1
2
3
4
5
6
7
8
9
10
11
xml复制代码<!--redis-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--jedis-->
<dependency>
   <groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
   <version>2.9.0</version>
</dependency>
  • 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码#redis配置开始
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=1024
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=10000
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=200
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=10000
#redis配置结束
spring.redis.block-when-exhausted=true
  • 初始化配置文件
1
2
3
4
5
6
7
8
9
10
11
12
scss复制代码//初始化jedis
public JedisPool redisPoolFactory() throws Exception {
   JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
   jedisPoolConfig.setMaxIdle(maxIdle);
   jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
   // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
   jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted);
   // 是否启用pool的jmx管理功能, 默认true
   jedisPoolConfig.setJmxEnabled(true);
   JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password);
   return jedisPool;
}

3.3 代码演示

完成上面的配置后,我们只需要使用@Autowired引入RedisTemplate,就可以很方便的存取redis了,此外花Gie在项目中增加了一个RedisUtil工具类,囊括了redis大部分命令,足够平时开发使用。

1
2
3
4
5
6
7
8
kotlin复制代码//引入redis
@Autowired
private RedisTemplate redisTemplate;
​
//将【name:花哥】 存入redis
redisTemplate.opsForValue().set("name","花哥");
//取出redis中key为name的数据
redisTemplate.opsForValue().get("name");

四、【标准版本】引入 JWT

标准版本采用的是前后端分离架构,因此要想保持前后端正常通信,需要前端请求时,带上一个身份识别字段token,来确认接口请求的合法性,所以就引入了jwt。

4.1jwt是什么

JSON Web Token (JWT)是一个开放标准(RFC 7519),它定义了一种紧凑的、自包含的方式,用于作为JSON对象在各方之间安全地传输信息。该信息可以被验证和信任,因为它是数字签名的。

4.2 jwt适用场景

  • Authorization (授权) : 这是使用JWT的最常见场景。一旦用户登录,后续每个请求都将包含JWT,允许用户访问该令牌允许的路由、服务和资源。单点登录是现在广泛使用的JWT的一个特性,因为它的开销很小,并且可以轻松地跨域使用。
  • Information Exchange (信息交换) : 对于安全的在各方之间传输信息而言,JSON Web Tokens无疑是一种很好的方式。因为JWT可以被签名,例如,用公钥/私钥对,你可以确定发送人就是它们所说的那个人。另外,由于签名是使用头和有效负载计算的,您还可以验证内容没有被篡改。

4.3 教程简介

这里花Gie看到知乎的一篇文章,写的也是相当好,链接奉上五分钟带你了解啥是JWT

五、【标准版本】集成 Shiro

基线版本采用了shiro进行鉴权,一是因为花Gie工作中用的比较多,对它相对比较熟悉;二来呢,shiro确实相较于另一种类似功能的spring-security,更加容易理解,代码更加易于阅读。

5.1 Shiro简介

Apache Shiro是一个强大且易用的Java安全框架,我们可以通过shiro完成:认证、授权、加密、会话管理、与 Web 集成、缓存等。这些简直不要太嗨,一个人就可以肝这么多事情,堪称肝王。而且shiro是Apache下的项目,给人一种很靠谱的赶脚,并且它不跟任何框架或容器绑定。

5.2 集成shiro步骤

标准版本采用shiro+jwt来进行身份验证,抛开原理不说,我们先来看一下如何将shiro集成到我们标准版本中。

这里需要提醒一下工作经验不多的小司机,遇到不懂的技术切勿抓破头皮读概念,有时候亲自动手实现一遍代码,等到代码运行一遍,看完结果之后,再回过头来,重新看概念,捋一捋代码实现,会有不同的感受,如果在学习上还是感觉很难学下去,可以在评论区发出来,社区有很多大神可以解答,切勿闭门造车。

image.png

  • 配置核心安全事务管理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scss复制代码@Bean
public SecurityManager securityManager() {
   /** 1. 引入两种身份验证realm */
   /**loginRealm用于登录认证,customRealm用于其他接口认证 */
   securityManager.setRealms(Lists.newArrayList(customRealm(), loginRealm()));
​
    DefaultWebSecurityManager securityManager = new DefaultWebSecurityManager();
   /** 2.设置认证策略*/
   securityManager.setAuthenticator(authenticator());
   
   /** 3.关闭shiro自带的session。让每次请求都得到过滤*/
   DefaultSubjectDAO subjectDAO = new DefaultSubjectDAO();
   subjectDAO.setSessionStorageEvaluator(sessionStorageEvaluator());
   securityManager.setSubjectDAO(subjectDAO);
   return securityManager;
}

先来看 【第一步】 ,我们引入了两种身份验证(loginRealm、customRealm),令人头大,这个Realm到底是啥呢,求求花Gie说说人话吧。

image.png

简单的说,后台会接收各种场景发来的请求,比如登录、注册、新增用户等等,我们不可能一棒子打死,全部用一种策略来校验所有请求,比如登录请求需要调用数据库查询登录信息是否正确,而一旦登录成功后,此后再发送其他接口请求,每次再连接数据库查询用户登录信息是否正确,显然是不合适的,所以需要其他策略。

image.png

因此,标准版本使用了两种Realm,其中用于处理登录验证的loginRealm,该realm只处理鉴权登录请求,校验登录是否正确;而对于其他所有的接口请求,都会被customRealm进行拦截处理。

接下来我们看 【第二步】 ,setAuthenticator()方法是用来设置认证策略,那什么是认证策略呢,这听起来也不怎么像人话,但其实比较好理解,先来看一下下面这段代码:

1
2
3
4
5
6
7
8
9
10
11
java复制代码/**
*多个Realm时,设置认证策略
*/
@Bean
public ModularRealmAuthenticator authenticator() {
   ModularRealmAuthenticator authenticator = new MultiRealmAuthenticator();
   // 多个Realm的认证策略,默认 AtLeastOneSuccessfulStrategy
   AuthenticationStrategy strategy = new FirstSuccessfulStrategy();
   authenticator.setAuthenticationStrategy(strategy);
   return authenticator;
}

是不是一脸懵逼,不要急,这里举个具体场景:我们标准版本里有两个用于认证的Realm,默认情况下,我们接口每次请求都会被这两个Realm分别校验,那如果有一个校验成功,另一个校验失败,那这次请求到底成功还是失败呢,这时候就需要一种策略来规范,官方给出三种策略:

  • FirstSuccessfulStrategy:只要有一个Realm验证成功即可,只返回第一个Realm身份验证成功的认证信息,其他的忽略;(注:这里”第一个”指的是认证成功得那一个realm)
  • AtLeastOneSuccessfulStrategy:只要有一个Realm验证成功即可,和FirstSuccessfulSTRATEGY不同,将返回所有Realm身份验证成功的认证信息;
  • AllSuccessfulStrategy:所有Realm验证成功过才算成功,且返回所有Realm身份验证的认证信息,如果有一个失败就失败。

最后看一下 【第三步】 ,这里我们禁用session, 不保存用户登录状态,保证每次请求都重新认证。

1
2
3
4
5
6
7
8
9
csharp复制代码/**
* 禁用session, 不保存用户登录状态。保证每次请求都重新认证
*/
@Bean
protected SessionStorageEvaluator sessionStorageEvaluator() {
   DefaultSessionStorageEvaluator sessionStorageEvaluator = new DefaultSessionStorageEvaluator();
   sessionStorageEvaluator.setSessionStorageEnabled(false);
   return sessionStorageEvaluator;
}
  • 自定义LoginRealm

LoginRealm 只负责处理登录请求,它的实现逻辑就是获取请求接口中账号密码,通过数据库查询,并将查询结果保存到redis,供后续接口请求时验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码/**
* 查询数据库,将获取到的用户安全数据封装返回(伪代码)
*/
@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {
   LoginToken token = (LoginToken) authenticationToken;
   String username = token.getUsername();
   String password = new String((char[]) authenticationToken.getCredentials());
​
   //查询是否存在当前用户,有效用户查询结果只能为1
   SysUser user = loginService.getUserByNameAndPwd(username, password);
   if(user == null){
       throw new AuthException(LOGIN_PWD_ERROR.getCode(), "登录验证失败, 用户名或密码错误");
  }
   // 缓存当前登录用户信息
   redisUtil.setex(key, value);
   return new SimpleAuthenticationInfo(jwtEntity, password, getName());
}
  • 自定义CustomRealm

上面的LoginReaml会将登陆成功的用户信息缓存,此后接口每次调用后台时,都会在header请求头中携带token字段,用于鉴权,而鉴权就是在CustomRealm中进行,伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {
   //1.获取请求token
   CustomerToken jwtToken = (CustomerToken) authenticationToken;
   
  ....省略
   //2.从jwt中解析出用户名、密码
   String username = "";
​
//3.取出redis中用户信息
   String jwtCache = redisUtil.get(StringUtils.joinWith(SYMBOL_UNDERLINE, REDIS_KEY_PREFIX_LOGIN_ACCOUNT, username), 0);
   
   //4.对缓存信息进行处理
   if (jwtCache == null) {
       log.warn("[获取密码缓存失败, 查询数据库 account = {}]", username);
  } else {
       log.error("[当前账号在其他地方登录, account = {}]", username);
  } else if (!StringUtils.equals(password, cacheJwtObj.getPassword())) {
       log.error("缓存密码校验失败");
  }
}
return new SimpleAuthenticationInfo(jwtObject, jwtToken.getCredentials(), getName());
}
​
  • 配置拦截内容

有些请求是不需要被拦截的,像注册这类接口及一些静态资源(css、图片、js等),这时我们就需要在shiro中提前设置,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
dart复制代码ShiroFilterFactoryBean shiroFilterFactoryBean = new ShiroFilterFactoryBean();
shiroFilterFactoryBean.setSecurityManager(securityManager);
//登录地址
shiroFilterFactoryBean.setLoginUrl("/login");
//登录成功后要跳转的连接
shiroFilterFactoryBean.setSuccessUrl("/authorized");
//未授权跳转地址
shiroFilterFactoryBean.setUnauthorizedUrl("/403");
​
//增加自定义过滤器(JwtFilter)
Map<String, Filter> filterMap = Maps.newHashMapWithExpectedSize(1);
filterMap.put(JwtFilter.class.getName(), new JwtFilter());
shiroFilterFactoryBean.setFilters(filterMap);
​
//配置资源访问权限(anon 表示资源都可以匿名访问)
LinkedHashMap<String, String> filterChainDefinitionMap = new LinkedHashMap<>();
filterChainDefinitionMap.put("/css/**", "anon");
filterChainDefinitionMap.put("/js/**", "anon");
filterChainDefinitionMap.put("/login", "anon");
filterChainDefinitionMap.put("/**", JwtFilter.class.getName());
shiroFilterFactoryBean.setFilterChainDefinitionMap(filterChainDefinitionMap);
  • 登录测试

这里我们获取到前端传入的用户名、密码,封装到LoginToken中调用subject.login()方法,这时就会触发LoginRealm进行鉴权,为什么会触发LoginRealm而不是CustomRealm,我们上面已经详细介绍了。

1
2
3
4
5
6
7
ini复制代码LoginToken loginToken = new LoginToken(username,password);
Subject subject = SecurityUtils.getSubject();
//登录操作(前往LoginRealm鉴权)
subject.login(usernamePasswordToken);
//返回前端token信息
JwtEntity jwtEntity = (JwtEntity) subject.getPrincipal();
return new SysUserLoginDto(JwtUtils.createJwtToken(JSON.toJSONString(jwtEntity)), jwtEntity.getLoginId());

测试结果,可以拿到 jwt生成的token值,后续再次请求时,需要携带该token进行鉴权:

1
json复制代码{"token":"eyJhbGciOiJIUzI1NiJ9.eyJqd3RLZXkiOiJ7XCJhY2NvdW50XCI6XCIxXCIsXCJsb2dpbklkXCI6MSxcInBhc3N3b3JkXCI6XCIweEMwbWZJVGR4MjJ3ejFKMVU1c3pnPT1cIixcInNlc3Npb25JZFwiOlwiODg2MjcyNDkyOTE0NjA2MTJcIn0iLCJleHAiOjE2MjY5MjI5MDJ9.VF8R9X3hZb6SDvShZbRdSRgwaAUUE7dC7XQhIuWBSn4","loginId":1}

五、前端项目搭建

前端使用的是ant-design-vue2.0版本,虽然写过不少前端项目,也用过其他框架,如elementUi。但是对于一个后端码农来说,我的审美有点low,平时颜色搭配属于红配绿那种….这里也就不详细介绍前端的搭建过程了,掘金社区有非常多非常优秀的前端大佬,小伙伴们可以去学习学习。

image.png

image.png

总结

篇幅有点长,能坚持看到这里的小伙伴都是 真大佬,但是如果看完后还是觉得晦涩难懂,可以直接下载花Gie开源出来的标准版本项目,对照着调试。如果有其他想法也可以联系花Gie,如果恰巧我也知道的花,肯定会给小伙伴们解答的,今天的分享就到这了,下期还有什么好玩的呢,关注我,继续肝。

如果小伙伴们有其他好的想法,可以来花Gie的开源小队,一起研究技术,一起学习,非常欢迎大家的加入

点关注,防走丢

以上就是本期全部内容,如有纰漏之处,请留言指教,非常感谢。我是花Gie ,有问题大家随时留言讨论 ,我们下期见🦮。

文章持续更新,可以微信搜一搜 花哥编程 第一时间阅读,并且可以获取面试资料学习视频等,有兴趣的小伙伴欢迎关注,一起学习,一起哈🐮🥃。

原创不易,你怎忍心白嫖,如果你觉得这篇文章对你有点用的话,感谢老铁为本文点个赞、评论或转发一下,因为这将是我输出更多优质文章的动力,感谢!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…596597598…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%