开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Spring Integration 集成 MQTT

发表于 2021-09-15

Spring Integration 是什么?

Spring Integration 在 Spring 家族不太有名气,如果不是有需求,一般也不会仔细去看。那么 Spring Integration 是什么呢?用官方的一句话来解释就是:它是一种轻量级消息传递模块,并支持通过声明式适配器与外部系统集成。简单来说,Spring Integration 抽象了用于消息传递的一套规范,并且基于这套规范提供了很多企业级的中间件的集成。比如他支持基于 AMQP 的消息队列、MQTT、RMI 等等中间件。

用过 Spring 家族组件的同学应该会比较容易理解了。例如,Spring Data 抽象了数据访问的一系列接口,后端可支持多种 ORM;Spring Cache 抽象了缓存使用的接口,后端支持 Caffeine、Redis、Memcached 等缓存中间件。其实这都是一样的。好处是,我们只需要熟悉这一种规范,就可以任意的去对接各种企业级框架,起到快速开发的作用;劣势是,这些企业级的框架只能再 Spring 抽象的这套规范下工作,对于一些细节的开发,可能仍然需要使用原生的框架来实现。

本文主要介绍的是 Spring Integration,以及它是如何集成 MQTT 协议的。

Spring Integration 消息抽象

刚刚我们讲了,Spring Integration 实际上就是抽象出了消息传递的规范,然后再适配各种消息中间件。那么下面我们先简单了解下 Spring Integration 消息通信的模式。

image.png

image.png

image.png

image.png

image.png

image.png

以上几张官方提供的图可以大致厘清 Spring Integration 的各类组件和工作模式:

  1. Message 包含 Header 和 Payload 两部分。
  2. MessageChannel 用于解耦生产者和消费者,实现消息发送。
  3. MessageRouter 用来控制消息转发的 Channel。
  4. Service Activitor 用来绑定 MessageHandler 和用于消费消息的 MessageChannel。
  5. ChannelAdapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

在开发上就需要去了解这些抽象组件的具体实现了,在下面讲到 MQTT 的集成上可以再体会一下 SI 的设计思路。

MQTT 协议

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.

这是 MQTT 协议的官方描述,它是一种应用于物联网的轻量级的发布订阅协议,类似于 AMQP。详细了解可以参考:

  • MQTT Specifications
  • emqx mqtt 协议介绍 协议)
  • MQTT 协议中文版
  • 消息推送标准协议:MQTT

下面提一些重要的或者开发中需要配置的点。

通信方式

默认是发布 / 订阅模式的。

  1. 通信系统中有发布者和订阅者。发布者发布消息而订阅者接收消息。我们把发布者和订阅者统称为客户端。客户端可以同时是发布者和订阅者。
  2. 在系统中有另外一个角色,它接收发布者的消息并且将消息派发给订阅者。我们一般称这个角色为消息 Broker。
  3. 在 MQTT 中默认是广播的,也就是说订阅了相同 topic 的订阅者都能收到发布者发送的消息。

基于主题 (Topic) 消息路由

MQTT 协议基于主题 (Topic) 进行消息路由,主题 (Topic) 类似 URL 路径,例如:

1
2
3
bash复制代码chat/room/1
sensor/10/temperature
sensor/+/temperature

主题 (Topic) 通过’/‘分割层级,支持’+’, ‘#’通配符:

  • ‘+’: 表示通配一个层级,例如 a/+,匹配 a/x, a/y
  • ‘#’: 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c/d
  • 订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

QoS

为了满足不同的场景,MQTT 支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

  • 0:At most once。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
  • 1:At least once。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
  • 2:Exactly onces。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别 2 是最合适的。

订阅者收到 MQTT 消息的 QoS 级别,最终取决于发布消息的 QoS 和主题订阅的 QoS。

Broker 选型

本文使用的 MQTT Broker 是 EMQ X 的开源版。

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。 Erlang/OTP 是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed) 的语言平台。

客户端代码集成

Java 客户端一般使用 Eclipse Paho Java Client,此客户端为 Java SE 版本的,为了在 SpringBoot 上有更好的集成,这里我们使用 Spring Integration,Spring Integration MQTT Support 默认集成的就是 Eclipse Paho Java Client V3 版本。

依赖和参数配置

1
2
3
4
5
6
7
8
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1
2
3
4
5
yml复制代码mqtt:
url: tcp://172.17.218.94:1883
username: admin
password: public
clientId: mqtt-sender
1
2
3
4
5
6
7
8
9
java复制代码@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
private String url;
private String username;
private String password;
private String clientId;
}

发布者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
java复制代码@Configuration
@IntegrationComponentScan
public class MqttConfig {
@Autowired
private MqttProperties prop;

@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setServerURIs(new String[]{prop.getUrl()});
mqttConnectOptions.setUserName(prop.getUsername());
mqttConnectOptions.setPassword(prop.getPassword().toCharArray());
// 客户端断线时暂时不清除,直到超时注销
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setAutomaticReconnect(true);
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
prop.getClientId() + "-pub-" + Instant.now().toEpochMilli(), mqttClientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultRetained(false);
messageHandler.setAsyncEvents(false);
// Exactly Once
messageHandler.setDefaultQos(2);
messageHandler.setDefaultTopic(ApiConst.MQTT_TOPIC_SUFFIX);
return messageHandler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttTemplate {
void send(String payload);

void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic);

void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
}
  1. @IntegrationComponentScan,开启 Spring Integration 的注解扫描。
  2. 注入客户端工厂类 MqttPahoClientFactory,此处可以配置认证参数、超时时间等 broker 连接参数。
  3. 注入 MessageChannel 实例。
  4. 注入 MessageHandler 的实例,并通过 @ServiceActivator 绑定到对应的 MessageChannel。此处可配置消息处理的模式、QoS、默认的 Topic 等。
  5. 定义一个 @MessagingGateway 修饰的接口,用于消息的发送,@MessagingGateway 的 defaultRequestChannel 参数用于绑定具体的 MessageChannel。
  6. 在使用的地方自动注入 MqttTemplate 的实例,即可调用方法发送消息。

订阅者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码@Configuration
@IntegrationComponentScan
public class MqttConfig {

private final MqttProperties prop;
private final MqttInboundMessageHandler mqttInboundMessageHandler;

public MqttConfig(MqttProperties prop,
MqttInboundMessageHandler mqttInboundMessageHandler) {
this.prop = prop;
this.mqttInboundMessageHandler = mqttInboundMessageHandler;
}

@Bean
public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(prop.getClientId() + "-sub-" + Instant.now().toEpochMilli(), mqttClientFactory,
"facego/reply");
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}

@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler InboundMessageHandler() {
return mqttInboundMessageHandler;
}

@Bean
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
}

@Slf4j
@Component
public class MqttInboundMessageHandler implements MessageHandler {

@Override
public void handleMessage(Message<?> message) throws MessagingException {
log.info("mqtt reply: {}", message.getPayload());
}
}
  1. 注入消息处理的 MessageChannel。
  2. 注入自己实现的 MqttInboundMessageHandler,并通过 @ServiceActivator 绑定到对应的 MessageChannel。
  3. 注入 Channel Adapter 的实例,配置客户端订阅的 Topic 和相应的 MessageChannel。

Spring Integration 大致交互逻辑

对于发布者:

  1. 消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
  2. DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

  1. 通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel。
  2. 同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定,这里是 MQTT Eclipse Paho Java Client。

总结

本文主要介绍了 Java 使用 MQTT 通信的方式,由于使用了 SpringBoot,因此使用 Spring Integration 来集成会比直接只用 Eclipse Paho Java Client 更符合 Spring 的哲学,所有的 Bean 均单例注入统一管理。

Spring Integration 的好处在于,我们只需要了解其消息通信的基本机制,屏蔽了 Eclipse Paho Java Client 的具体细节,便于编码。从上面的代码可以看出,我们仅仅注入了相关的 Bean,给出响相应的配置信息即可。

参考文献

  • Spring Integration Reference Guide
  • Spring Integration 中文手册(完整版)
  • SpringBoot 集成 MQTT 配置
  • Spring Boot 集成 MQTT
  • 消息推送标准协议:MQTT

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ShardingSphere 实战之读写分离

发表于 2021-09-15

简述

采用 ShardingShpere 的 Sharding-Porxy(透明化的数据库代理端) 模式在本地实现 mysql 数据库读写分离,并通过 java 应用程序连接

sharding-sphere

本地下载并安装 最新 5.0 的 beta 版本:dlcdn.apache.org/shardingsph…

由于我需要连接 mysql 数据库所以,需要下载 mysql-connector-java-5.1.47.jar(repo1.maven.org/maven2/mysq… %SHARDINGSPHERE_PROXY_HOME%/lib 目录

修改配置文件

  • ShardingSphere-Proxy 支持多逻辑数据源,每个以 config- 前缀命名的 YAML 配置文件,即为一个逻辑数据源,比如默认文件 config-database-discovery.yaml
  • ShardingSphere-Proxy 默认使用 3307 端口,可以通过启动脚本追加参数作为启动端口号。如:bin/start.sh 3308
  • ShardingSphere-Proxy 使用 conf/server.yaml 配置注册中心、认证信息以及公用属性。

先来添加并修改 config-myapp.yaml 文件(注意扩展名要写 yaml,写 yml 不能识别)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
yaml复制代码
schemaName: my-app

dataSources:
write_ds:
url: jdbc:mysql://mysql.local.test.myapp.com:23306/test?allowPublicKeyRetrieval=true&useSSL=false&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
username: root
password: nicai
connectionTimeoutMilliseconds: 3000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
minPoolSize: 1
maintenanceIntervalMilliseconds: 30000
read_ds_0:
url: jdbc:mysql://mysql.local.test.read1.myapp.com:23306/test?allowPublicKeyRetrieval=true&useSSL=false&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
username: root
password: nicai
connectionTimeoutMilliseconds: 3000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
minPoolSize: 1
maintenanceIntervalMilliseconds: 30000

rules:
- !READWRITE_SPLITTING # 配置读写分离规则
dataSources:
pr_ds: # 读写分离的逻辑数据源名称 `pr_ds` 用于在数据分片中使用
writeDataSourceName: write_ds #写库数据源名称
readDataSourceNames: #读库数据源名称
- read_ds_0
loadBalancerName: roundRobin # 负载均衡算法名称
# 负载均衡算法配置
loadBalancers:
roundRobin:
type: ROUND_ROBIN # 一共两种一种是 RANDOM(随机),一种是 ROUND_ROBIN(轮询)

如上配置我只添加了一个主库一个只读从库,而数据库之间的主从同步过程由于不是重点本文就省略了,具体我这边比较简单直接用的云创建的只读实例,也就是说主从实例同步让云帮我实现了,当然你也可以用原生的方法,通过 mysql 的 master-salve 等配置来实现。

server.yaml 文件修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
yaml复制代码rules:
- !AUTHORITY
users:
- root@%:root
- sharding@:sharding
provider:
type: ALL_PRIVILEGES_PERMITTED

props:
max-connections-size-per-query: 1
executor-size: 16 # Infinite by default.
proxy-frontend-flush-threshold: 128 # The default value is 128.
# LOCAL: Proxy will run with LOCAL transaction.
# XA: Proxy will run with XA transaction.
# BASE: Proxy will run with B.A.S.E transaction.
proxy-transaction-type: LOCAL
xa-transaction-manager-type: Atomikos
proxy-opentracing-enabled: false
proxy-hint-enabled: false
sql-show: true
check-table-metadata-enabled: false
lock-wait-timeout-milliseconds: 50000 # The maximum time to wait for a lock

启动

1 在 bin 目录下 执行 start.sh 启动 ShardingSphere

2 用任意 mysql 客户端连接数据库,就像连接一个往常的数据库一样

  • 地址:127.0.0.1
  • 端口:3307
  • 账号:root/root
  • 数据库 my-app (这里就用到上面配置的逻辑数据库名了)

3 查看库表中的数据,应该跟主、从库中的一致。

这里我遇到了一个小问题,就是 select 数据的时候用 库名。表名不行,直接写表名可以。

java 应用程序修改配置

1
2
3
4
5
6
7
yaml复制代码spring:
profiles:
include: common-local
datasource:
url: jdbc:mysql://127.0.0.1:3307/my-app?allowPublicKeyRetrieval=true&useSSL=false&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
username: root
password: root

由于采用 proxy 模式,对应用几乎无感,不需要修改代码,只需要修改数据库部分配置文件。

测试

在我测试读写分离时,读库的请求情况:

证明读请求都打到读库上了。

遇到过的问题

1 启动报错,需要配置 server.yaml 第一次启动的时候没配置

2 启动报错: The MySQL server is running with the --read-only option so it cannot execute this statement
我的从库是设置的只读库,但不知道为什么会报错,没有解决,再次启动就好了。

3 启动成功后,用客户端,无法连接 sharding-proxy 数据库,连接异常报错,解决方法是修改了 server.yaml 文件

1
2
3
4
5
6
7
yaml复制代码rules:
- !AUTHORITY
users:
- root@%:root
- sharding@:sharding
provider:
type: ALL_PRIVILEGES_PERMITTED

将 provider 的 type 从之前的 NATIVE 修改为了 ALL_PRIVILEGES_PERMITTED (默认授予所有权限(不鉴权),不会与实际数据库数据库交互。)

参考:

  • shardingsphere.apache.org/document/5.…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

设计模式(10):策略模式 策略模式

发表于 2021-09-15

策略模式

定义

定义一组算法,将每个算法都封装起来,并且使他们之间可以互换。策略模式让算法独立于使用它的客户而变化,也称为政策模式(Policy)

简单的说,就是让类根据传入的不同的具体策略(实现了策略接口,策略接口包含一个自己的算法)来调用不同的算法。同时让一个Context类来管理策略。

示例与代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码//定义策略接口,声明对应的算法
interface Strategy {
void algorithm();
}

//实现不同的策略
class FirstStrategy {
@override
public void algorithm() {
...
}
}

class SecondStrategy {
@override
public void algorithm() {
...
}
}

//定义Context类来管理Strategy
class Context {
private Strategy strategy;
public void changeStrategy(Strategy strategy) {
this.strategy = strategy;
}

public void algorithm() {
Strategy.algorithm();
}
}

//使用
public class Main {
public static void main(String[] args) {
Context work = new Context();
work.changeStrategy(new FirstStrategy());
work.algorithm();

work.changStrategy(new SecondStrategy());
work.algorithm();
}
}

在Java开发中,ThreadPool用到了这个设计模式。最后一个参数RejectedExecutionHandler是一个接口,相当于就是我们这里的抽象策略,而使用的时候则要传入一个具体的拒绝策略。

1
2
3
4
5
6
7
java复制代码public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

策略模式vs状态模式

从代码来看,基本和状态模式一模一样,其实这两种设计模式的总体思想都是一样的,都是为了去耦合。但是这两者对应不同的使用场景。具体来说,不同的状态可能不只是导致仅仅一个调用方法的改变,还有可能会改变一系列不同的调用方法,同时Context的其他成员变量值也有可能会变化,而且状态有可能自发的改变(有限状态机);而策略模式则更具体一点,不同的策略仅仅是调用不同的算法(algorithm),同时策略也不能自行变化,一般必须根据不同的场景使用不同的策略。

总结

相比于状态模式,策略模式更具有一般性一些,在实践中,可以用策略模式来封装几乎任何类型的规则,只要在分析过程中听到需要在不同实践应用不同的业务规则,就可以考虑使用策略模式处理,在这点上策略模式是包含状态模式的功能的,策略模式是一个重要的设计模式。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Vue3 + TypeScript + Gin 实现后台权限

发表于 2021-09-15

最近一直在学习 Vue3 相关的技术栈,包括 CompositionAPI 、TypeScript、vite以及Element UI 对 Vue3 的支持版本 Element Plus。

因此想要使用 Vue3 写一个简单的 RBAC 的用户权限系统。

之前一直是使用 MySQL 这个关系型数据库,正好最近在学习 Kong 网关的时候,接触到了 Postgres 这个关系型数据库,并且还看到了一句话,说是:“MySQL 是目前使用最广泛的数据库,但是 Postgres 是目前最先进的数据库。”

当然这个最先进也是 Postgres 的开源组织自己标榜的。但是对我来说,Postgres 貌似确实要比 MySQL 用起来爽,最起码在数据类型的支持方面,就是一个非常不错的点。

我经常会使用 Json 或者 Array 这种字段,Postgres 就支持的非常不错,当然 MySQL 5.7 以后也是支持 Json 字段的,但是从性能和使用上来说,我还感觉还是 Postgres 用的更好一些,也不排除,是我自己对 MySQL 了解不深入。

好了,废话不多说,我们来实际看看项目吧。

本项目不管是前后端都不会进行过度的封装,该封装的封装,不该封装的就不会进行封装,不会为了装 X 而过度封装代码,减少大家在看代码的时间成本。

演示站点:fdevops.com:8088

github:github.com/lanyulei/sk… , 如果觉得还可以,还请不要吝啬指尖的跳动,轻轻点上一个 star 。

Casbin 权限控制

本系统,使用 Casbin 作为接口的权限管理依赖,使用 RBAC 的方式进行管理,支持用户的多角色绑定。

Casbin 模型文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ini复制代码[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[role_definition]
g = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act

其实就是 Casbin 官网上给出的 RBAC 模型内容,如果需要使用自定义函数,可参考官网自行添加即可。

casbin.org/docs/zh-CN/…

Casbin Gin 中间件的简单封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
go复制代码package permission

import (
"sky/pkg/conn"
"sky/pkg/logger"
"sky/pkg/tools/response"
"time"

"github.com/casbin/casbin/v2"
gormAdapter "github.com/casbin/gorm-adapter/v3"
"github.com/gin-gonic/gin"
"github.com/spf13/viper"
)

var Enforcer *casbin.SyncedEnforcer

func CasbinSetup() *casbin.SyncedEnforcer {
var (
err error
adapter *gormAdapter.Adapter
)
adapter, err = gormAdapter.NewAdapterByDBWithCustomTable(conn.Orm, nil, viper.GetString("casbin.tableName"))
if err != nil {
logger.Fatal("创建 casbin gorm adapter 失败,错误:%v", err)
}

Enforcer, err = casbin.NewSyncedEnforcer(viper.GetString("casbin.rbacModel"), adapter)
if err != nil {
logger.Fatal("创建 casbin enforcer 失败,错误:%v", err)
}

err = Enforcer.LoadPolicy()
if err != nil {
logger.Fatal("从数据库加载策略失败,错误:%v", err)
}

// 定时同步策略
if viper.GetBool("casbin.isTiming") {
// 间隔多长时间同步一次权限策略,单位:秒
Enforcer.StartAutoLoadPolicy(time.Second * time.Duration(viper.GetInt("casbin.intervalTime")))
}

return Enforcer
}

func CheckPermMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
//获取资源
obj := c.Request.URL.Path
//获取方法
act := c.Request.Method
//获取实体
sub := c.GetString("username")

isAdmin := c.GetBool("isAdmin")
if isAdmin {
c.Next()
} else {
//判断策略中是否存在
if ok, _ := Enforcer.Enforce(sub, obj, act); ok {
c.Next()
} else {
response.Error(c, nil, response.NoPermissionError)
c.Abort()
}
}
}
}

菜单权限

菜单通过对角色可访问的菜单标识进行权限控制,可使多个菜单标识。

路由实例如下:

1
2
3
4
5
6
7
8
9
css复制代码{
path: '/user',
name: 'User',
component: () => import('/@/views/user/list.vue'),
meta: {
title: '用户列表',
auth: ['system:user:list'] // 此为路由标识,只有角色关联了此标识,才可访问。
},
}

校验当前用户是否有权限,用户的权限标识列表将存入到 Vuex 中,当前系统给用户定义了超级管理员的概念,因此当遇到程序是超级管理员的话,则直接放行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typescript复制代码export function setFilterRoute(chil: any) {
let filterRoute: any = [];
chil.forEach((route: any) => {
if (route.meta.auth) {
route.meta.auth.forEach((metaAuth: any) => {
store.state.userInfos.userInfos.authPageList.forEach((auth: any) => {
// 如果是超级管理员,则直接通过
if (store.state.userInfos.userInfos.is_admin || metaAuth === auth) filterRoute.push({ ...route });
});
});
}
});
return filterRoute;
}

项目演示

菜单管理,进行菜单创建、页面元素创建及菜单绑定 API 接口。

image.png

为菜单绑定 API 接口方便进行接口权限管理。

image.png

页面元素管理,包括但是不限于按钮。

image.png

接口管理,后端所有需要通过 Casbin 进行接口校验的接口。

image.png

为角色授权。

image.png

截图内容,仅是功能的一部分,详细的内容,可自行访问演示站点查看。

有任何问题,可以到此处进行留言讨论,www.fdevops.com 。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

史上最详细微信小程序授权登录与后端SprIngBoot交互操

发表于 2021-09-15

你好,我是博主宁在春,一起学习吧!!!

写这篇文章的原因,主要是因为最近在写毕业设计,用到了小程序,这中间曲曲折折,一言难尽啊。毕业设计真的让人麻脑阔😂。唉

最近在持续更新,每天推送完代码,遇到的问题都记下来,希望对大家也能有所帮助。

在网上找了很多很多,看了不下几十篇,说实话,有些给出了核心代码,添上一个微信官方的那张流程图就结束了,会的人一下就懂了。但是说实话,真的不适合入门学者,浪费很多时间都不一定能解决问题,将代码复制完不是少这就是少那,或者就是不齐,不知道看到这篇文章的你有没有遇到过这样的问题。


所以我自己将踩坑的经历写下来了,希望能够帮助到大家,开源进步,交流进步,一起学习!!!

挺多小伙伴遇到过这个问题,如果大家对文章内容存有疑惑或者实现不了这个小demo亦或者文章中有什么错误,可以直接评论、留言或可以直接发问题到邮箱:nzc_wyh@163.com

希望能够帮助到大家(当然,如果我可以做到的话 🦮)

看到都会尽快回复大家,谢谢大家,一起努力

微信官方文档

一、微信小程序官方登录流程图

img

个人理解:

  1. 调用wx.login() 获取code,这个code的作用是实现微信临时登录的url中的一个非常重要的参数。
* 微信授权的url="[api.weixin.qq.com/sns/jscode2…](https://api.weixin.qq.com/sns/jscode2session?appid=%7B0%7D&secret=%7B1%7D&js_code=%7B2%7D&grant_type=authorization_code)"
* `js_code`所用到的值就是 获取到的code。
  1. 把获取到的code传给我们自己的SpringBoot后端,由我们后端向微信接口服务发送请求。
*
1
2
3
java复制代码String url = "https://api.weixin.qq.com/sns/jscode2session?appid={0}&secret={1}&js_code={2}&grant_type=authorization_code";
String replaceUrl = url.replace("{0}", appid).replace("{1}", secret).replace("{2}", code);
String res = HttpUtil.get(replaceUrl);//后面有代码的,莫急
* `appid`:应用ID,`secret`:应用密钥,`js_code`:前台传给我们的`code`
* `secret`获取方式:


    1. 进入[微信公众平台](https://mp.weixin.qq.com/)
    2. 左侧菜单选择【开发管理】
    3. 右侧tab选择【开发设置】
    4. AppSecret栏右侧点击重置会弹出一个二维码,需要开发者扫描二维码才可以重置AppSecret。出现AppSecret后点击复制,并保存你的AppSecret。
    5. 没保存就只能重新生成了。
  1. 后端发送请求后获取到的返回信息:
1
cmd复制代码{"session_key":"G59Evf\/Em54X6WsFsrpA1g==","openid":"o2ttv5L2yufc4-VoSPhTyUnToY60"}
  1. 按照官方文档所讲:自定义登录态与openid和session_key关联,有很多方式可以实现的,如:
* 第一种方式:我们可以将`openid和session_key`存进redis中,前端来访问的时候带上就能够访问了。
* 第二种方式:利用`jwt`方式生成`Token`返回给前端,让前端下次请求时能够带上,就能允许他们访问了。
  1. 前端将token存入storage
  2. 前端在wx.request()发起业务请求携带自定义登录态,后端进行请求头的检查就可以了。
  3. 后端返回业务数据

上述就是官方的方式,但是在现在的时代,数据是非常重要的,不可能说不将用户数据持久化的,所以这个流程会稍稍多一些操作的。

二、个人实现登录流程图

image-20210915094137005

三、小程序端

先说一下,这里只是测试的Demo,是分开测试的。

我本地没有微信的编程坏境是拿小伙伴的测试的。

2.1、调用wx.login()

1
2
3
4
5
6
7
javascript复制代码wx.login({
success:function(res){
if(res.code){
console.log(res.code);
}
}
})

就是这样的一个字符串:

image-20210914210516147

我们将这个返回的code,先保存起来,稍后我们在后端测试中会用上的。

2.2、调用getUserInfo()

1
html复制代码<button open-type="getUserInfo" bindgetuserinfo="userInfoHandler"> Click me <tton>
1
2
3
4
5
6
javascript复制代码// 微信授权
wx.getUserInfo({
success: function(res) {
console.log(res);
}
})

打印出来是这样的一些数据。

image-20210915104220420

我们需要保存的是

  1. encrytedData:包括敏感数据在内的完整用户信息的加密数据(即可以通过反解密,获取出用户数据),详见 用户数据的签名验证和加解密
  2. iv:加密算法的初始向量,详见 用户数据的签名验证和加解密

至此,我们需要在前台获取的数据,已经结束了,接下来就用我们获取到的数据一起来看后端吧!!!


四、SpringBoot后端

为了将代码精简,我这边只是把获取到的数据输出出来,并未真实的保存到数据中。业务操作用注释在文中展示。

项目结构:

image-20210914211642298

3.1、相关jar

创建一个SpringBoot项目,或者maven项目都可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
xml复制代码  <parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.5.2</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>

<!--使用hutool中对http封装工具类 调用 HTTP 请求-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

3.2、yml配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
yml复制代码server:
port: 8081
spring:
application:
name: springboot-weixin
redis:
database: 0
port: 6379
host: localhost
password:
weixin:
appid: 'appid'
secret: '应用密钥'

3.3、公共类

就是一常量

1
2
3
java复制代码public class RedisKey {
public static final String WX_SESSION_ID = "wx_session_id";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码/**
* 统一响应结果集
* @author crush
*/
@Data
public class Result<T> {

//操作代码
Integer code;

//提示信息
String message;

//结果数据
T data;

public Result() {
}

public Result(ResultCode resultCode) {
this.code = resultCode.code();
this.message = resultCode.message();
}

public Result(ResultCode resultCode, T data) {
this.code = resultCode.code();
this.message = resultCode.message();
this.data = data;
}

public Result(String message) {
this.message = message;
}

public static Result SUCCESS() {
return new Result(ResultCode.SUCCESS);
}

public static <T> Result SUCCESS(T data) {
return new Result(ResultCode.SUCCESS, data);
}

public static Result FAIL() {
return new Result(ResultCode.FAIL);
}

public static Result FAIL(String message) {
return new Result(message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
java复制代码
/**
* 通用响应状态
*/
public enum ResultCode {

/* 成功状态码 */
SUCCESS(0, "操作成功!"),

/* 错误状态码 */
FAIL(-1, "操作失败!"),

/* 参数错误:10001-19999 */
PARAM_IS_INVALID(10001, "参数无效"),
PARAM_IS_BLANK(10002, "参数为空"),
PARAM_TYPE_BIND_ERROR(10003, "参数格式错误"),
PARAM_NOT_COMPLETE(10004, "参数缺失"),

/* 用户错误:20001-29999*/
USER_NOT_LOGGED_IN(20001, "用户未登录,请先登录"),
USER_LOGIN_ERROR(20002, "账号不存在或密码错误"),
USER_ACCOUNT_FORBIDDEN(20003, "账号已被禁用"),
USER_NOT_EXIST(20004, "用户不存在"),
USER_HAS_EXISTED(20005, "用户已存在"),

/* 系统错误:40001-49999 */
FILE_MAX_SIZE_OVERFLOW(40003, "上传尺寸过大"),
FILE_ACCEPT_NOT_SUPPORT(40004, "上传文件格式不支持"),

/* 数据错误:50001-599999 */
RESULT_DATA_NONE(50001, "数据未找到"),
DATA_IS_WRONG(50002, "数据有误"),
DATA_ALREADY_EXISTED(50003, "数据已存在"),
AUTH_CODE_ERROR(50004, "验证码错误"),


/* 权限错误:70001-79999 */
PERMISSION_UNAUTHENTICATED(70001, "此操作需要登陆系统!"),

PERMISSION_UNAUTHORISE(70002, "权限不足,无权操作!"),

PERMISSION_EXPIRE(70003, "登录状态过期!"),

PERMISSION_TOKEN_EXPIRED(70004, "token已过期"),

PERMISSION_LIMIT(70005, "访问次数受限制"),

PERMISSION_TOKEN_INVALID(70006, "无效token"),

PERMISSION_SIGNATURE_ERROR(70007, "签名失败"),

//操作代码
int code;
//提示信息
String message;

ResultCode(int code, String message) {
this.code = code;
this.message = message;
}

public int code() {
return code;
}

public String message() {
return message;
}

public void setCode(int code) {
this.code = code;
}

public void setMessage(String message) {
this.message = message;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
java复制代码package com.crush.mybatisplus.config;

import cn.hutool.core.lang.Assert;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

/**
* redis 配置类
*
* @author crush
*/
@EnableCaching
@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
public class RedisConfig {

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//key hasKey的序列化
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);

redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}

@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
return stringRedisTemplate;
}
}

3.4、Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码import com.crush.weixin.commons.Result;
import com.crush.weixin.entity.WXAuth;
import com.crush.weixin.service.IWeixinService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
*
* @author crush
* @since 2021-09-14
*/
@Slf4j
@RestController
@RequestMapping("/weixin")
public class WeixinController {

@Autowired
IWeixinService weixinService;

//这个就是那个使用传code进来的接口
@GetMapping("/sessionId/{code}")
public String getSessionId(@PathVariable("code") String code){
return weixinService.getSessionId(code);
}

@PostMapping("/authLogin")
public Result authLogin(@RequestBody WXAuth wxAuth) {
Result result = weixinService.authLogin(wxAuth);
log.info("{}",result);
return result;
}
}

3.5、service层

1
2
3
4
5
6
java复制代码public interface IWeixinService extends IService<Weixin> {

String getSessionId(String code);

Result authLogin(WXAuth wxAuth);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码import cn.hutool.core.lang.UUID;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.crush.weixin.commons.RedisKey;
import com.crush.weixin.commons.Result;
import com.crush.weixin.entity.WXAuth;
import com.crush.weixin.entity.Weixin;
import com.crush.weixin.entity.WxUserInfo;
import com.crush.weixin.mapper.WeixinMapper;
import com.crush.weixin.service.IWeixinService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;


/**
* @author crush
* @since 2021-09-14
*/
@Slf4j
@Service
public class WeixinServiceImpl extends ServiceImpl<WeixinMapper, Weixin> implements IWeixinService {


@Value("${weixin.appid}")
private String appid;

@Value("${weixin.secret}")
private String secret;

@Autowired
StringRedisTemplate redisTemplate;

@Autowired
WxService wxService;


@Override
public String getSessionId(String code) {
String url = "https://api.weixin.qq.com/sns/jscode2session?appid={0}&secret={1}&js_code={2}&grant_type=authorization_code";
String replaceUrl = url.replace("{0}", appid).replace("{1}", secret).replace("{2}", code);
String res = HttpUtil.get(replaceUrl);
String s = UUID.randomUUID().toString();
redisTemplate.opsForValue().set(RedisKey.WX_SESSION_ID + s, res);
return s;
}

@Override
public Result authLogin(WXAuth wxAuth) {
try {
String wxRes = wxService.wxDecrypt(wxAuth.getEncryptedData(), wxAuth.getSessionId(), wxAuth.getIv());
log.info("用户信息:"+wxRes);
//用户信息:{"openId":"o2ttv5L2yufc4-sVoSPhTyUnToY60","nickName":"juana","gender":2,"language":"zh_CN","city":"Changsha","province":"Hunan","country":"China","avatarUrl":"头像链接","watermark":{"timestamp":1631617387,"appid":"应用id"}}
WxUserInfo wxUserInfo = JSON.parseObject(wxRes,WxUserInfo.class);
// 业务操作:你可以在这里利用数据 对数据库进行查询, 如果数据库中没有这个数据,就添加进去,即实现微信账号注册
// 如果是已经注册过的,就利用数据,生成jwt 返回token,实现登录状态
return Result.SUCCESS(wxUserInfo);
} catch (Exception e) {
e.printStackTrace();
}
return Result.FAIL();
}
}

2121年11月27号:应该是微信接口更新了,在此处通过解密获取到的信息中,并不包含openId啦,得自己去拿到才可以。

牵扯到用户信息解密的方法,想要了解,可以去微信官方文档中进行了解,我对此没有深入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码import cn.hutool.core.codec.Base64;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.crush.weixin.commons.RedisKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.security.spec.AlgorithmParameterSpec;
import java.util.Random;

@Slf4j
@Component
public class WxService {
@Autowired
private StringRedisTemplate redisTemplate;

public String wxDecrypt(String encryptedData, String sessionId, String vi) throws Exception {
// 开始解密
String json = redisTemplate.opsForValue().get(RedisKey.WX_SESSION_ID + sessionId);
log.info("之前存储在redis中的信息:"+json);
//之前存储在redis中的信息:{"session_key":"G59Evf\/Em54X6WsFsrpA1g==","openid":"o2ttv5L2yufc4-VoSPhTyUnToY60"}
JSONObject jsonObject = JSON.parseObject(json);
String sessionKey = (String) jsonObject.get("session_key");
byte[] encData = cn.hutool.core.codec.Base64.decode(encryptedData);
byte[] iv = cn.hutool.core.codec.Base64.decode(vi);
byte[] key = Base64.decode(sessionKey);
AlgorithmParameterSpec ivSpec = new IvParameterSpec(iv);
Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
SecretKeySpec keySpec = new SecretKeySpec(key, "AES");
cipher.init(Cipher.DECRYPT_MODE, keySpec, ivSpec);
return new String(cipher.doFinal(encData), "UTF-8");
}
}

最后写个启动类就可以开始测试了。

1
2
3
4
5
6
java复制代码@SpringBootApplication
public class SpringBootWeixin {
public static void main(String[] args) {
SpringApplication.run(SpringBootWeixin.class);
}
}

五、测试

写完后端,接下来,可以利用我们之前收集的那些小程序中获取到的数据啦。

1、先发送第一个请求:

code:就是之前我们获取到的数据。

1
http复制代码http://localhost:8081/weixin/sessionId/{code}

会返回一个sessionId回来,在第二个请求中需要携带。

2、再发送第二个请求

1
http复制代码http://localhost:8081/weixin/authLogin

请求方式:post

data:json格式数据

1
2
3
4
5
json复制代码{
"encryptedData":"sYiwcAM73Ci2EB3y9+C6.....",
"iv": "xZGOj6RwaOS==",
"sessionId":"我们上一个请求获取到sessionId"
}

请求成功是下面这样的。

image-20210914214348638

我们把我们需要的存储到数据库持久化即可啦。

六、自言自语

这只是一个小demo,在使用中大都会结合security安全框架和Jwt一起使用,周末吧,周末比较有空,有空就会更新出来。


你好,我是博主宁在春,有问题可以留言评论或者私信我,大家一起交流学习!

不过都看到这里啦,点个赞吧👩‍💻

366

源码:

SpringBoot-weixin-gitee

SpringBoot-weixin-github

补充:

如果拿github查看项目的话,可以在google安装一个扩展程序。看代码会更加的方便

image-20210915102927418.png

安装之后,在github上查看项目,会在左侧多一个下面这样的目录结构。比较方便

image-20210915103022611.png

今天的文章就到了这里啦,下次再见!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go 工程师必学:Go 大杀器之跟踪剖析 trace

发表于 2021-09-15

大家好,我是煎鱼。

‍‍‍‍‍‍‍‍‍前段时间分享了《Go 程序崩了?煎鱼教你用 PProf 工具来救火!》,但有时候单单使用 pprof 还不一定足够完整观查并解决问题,因为在真实的程序中还包含许多的隐藏动作,例如:

  • Goroutine 在执行时会做哪些操作?
  • Goroutine 执行/阻塞了多长时间?
  • Syscall 在什么时候被阻止?在哪里被阻止的?
  • 谁又锁/解锁了 Goroutine ?
  • GC 是怎么影响到 Goroutine 的执行的?

这些东西用 pprof 是很难分析出来的,但如果你又想知道上述的答案的话,你可以用本章节的主角 go tool trace 来打开新世界的大门。

一起愉快地开始吸鱼之路。

初步了解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
go复制代码import (
 "os"
 "runtime/trace"
)

func main() {
 trace.Start(os.Stderr)
 defer trace.Stop()

 ch := make(chan string)
 go func() {
  ch <- "Go语言编程之旅"
 }()

 <-ch
}

生成跟踪文件:

1
go复制代码$ go run main.go 2> trace.out

启动可视化界面:

1
2
3
4
csharp复制代码$ go tool trace trace.out
2019/06/22 16:14:52 Parsing trace...
2019/06/22 16:14:52 Splitting trace...
2019/06/22 16:14:52 Opening browser. Trace viewer is listening on http://127.0.0.1:57321

查看可视化界面:

图片

  • View trace:查看跟踪
  • Goroutine analysis:Goroutine 分析
  • Network blocking profile:网络阻塞概况
  • Synchronization blocking profile:同步阻塞概况
  • Syscall blocking profile:系统调用阻塞概况
  • Scheduler latency profile:调度延迟概况
  • User defined tasks:用户自定义任务
  • User defined regions:用户自定义区域
  • Minimum mutator utilization:最低 Mutator 利用率

调度延迟概况

在刚开始查看问题时,除非是很明显的现象,否则不应该一开始就陷入细节。

因此我们一般先查看 “Scheduler latency profile”,我们能通过 Graph 看到整体的调用开销情况,如下:

图片

演示程序比较简单,因此这里就两块,一个是 trace 本身,另外一个是 channel 的收发。

Goroutine 分析

第二步看 “Goroutine analysis”,我们能通过这个功能看到整个运行过程中,每个函数块有多少个有 Goroutine 在跑。

观察每个的 Goroutine 的运行开销都花费在哪个阶段。如下:

图片

通过上图我们可以看到共有 3 个 goroutine,分别是:

  • runtime.main。
  • runtime/trace.Start.func1。
  • main.main.func1。

它们都做了些什么事呢,我们可以通过点击具体细项去观察。如下:

图片

同时也可以看到当前 Goroutine 在整个调用耗时中的占比,以及 GC 清扫和 GC 暂停等待的一些开销。

如果你觉得还不够,可以把图表下载下来分析,相当于把整个 Goroutine 运行时掰开来看了,这块能够很好的帮助我们对 Goroutine 运行阶段做一个的剖析,可以得知到底慢哪,然后再决定下一步的排查方向。

如下:

名称 含义 耗时
Execution Time 执行时间 3140ns
Network Wait Time 网络等待时间 0ns
Sync Block Time 同步阻塞时间 0ns
Blocking Syscall Time 调用阻塞时间 0ns
Scheduler Wait Time 调度等待时间 14ns
GC Sweeping GC 清扫 0ns
GC Pause GC 暂停 0ns

查看跟踪

在对当前程序的 Goroutine 运行分布有了初步了解后,我们再通过 “查看跟踪” 看看之间的关联性,如下:

图片

这个跟踪图粗略一看,相信有的小伙伴会比较懵逼,我们可以依据注解一块块查看,如下:

  1. 时间线:显示执行的时间单元,根据时间维度的不同可以调整区间,具体可执行 shift + ? 查看帮助手册。
  2. 堆:显示执行期间的内存分配和释放情况。
  3. 协程:显示在执行期间的每个 Goroutine 运行阶段有多少个协程在运行,其包含 GC 等待(GCWaiting)、可运行(Runnable)、运行中(Running)这三种状态。
  4. OS 线程:显示在执行期间有多少个线程在运行,其包含正在调用 Syscall(InSyscall)、运行中(Running)这两种状态。
  5. 虚拟处理器:每个虚拟处理器显示一行,虚拟处理器的数量一般默认为系统内核数。
  6. 协程和事件:显示在每个虚拟处理器上有什么 Goroutine 正在运行,而连线行为代表事件关联。

图片

点击具体的 Goroutine 行为后可以看到其相关联的详细信息,这块很简单,大家实际操作一下就懂了。文字解释如下:

  • Start:开始时间
  • Wall Duration:持续时间
  • Self Time:执行时间
  • Start Stack Trace:开始时的堆栈信息
  • End Stack Trace:结束时的堆栈信息
  • Incoming flow:输入流
  • Outgoing flow:输出流
  • Preceding events:之前的事件
  • Following events:之后的事件
  • All connected:所有连接的事件

查看事件

我们可以通过点击 View Options-Flow events、Following events 等方式,查看我们应用运行中的事件流情况。如下:

图片

通过分析图上的事件流,我们可得知:

  • 这程序从 G1 runtime.main 开始运行。
  • 在运行时创建了 2 个 Goroutine:
  • 先是创建 G18 runtime/trace.Start.func1。
  • 再是创建 G19 main.main.func1。

同时我们可以通过其 Goroutine Name 去了解它的调用类型。如下:

图片

  • runtime/trace.Start.func1 就是程序中在 main.main 调用了 runtime/trace.Start 方法。
  • 紧接着该方法又利用协程创建了一个闭包 func1 去进行调用。

在这里我们结合开头的代码去看的话,很明显就是 ch 的输入输出的过程了。

实战演练

凌晨三点,突然生产环境突然出现了问题,机智的你早已埋好 _ "net/http/pprof" 这个神奇的工具。

被告警电话叫醒的你,迷迷糊糊地通过特定的方式执行了如下命令:

1
2
ruby复制代码$ curl http://127.0.0.1:6060/debug/pprof/trace\?seconds\=20 > trace.out
$ go tool trace trace.out

查看跟踪

你很快的看到了熟悉的 List 界面,然后不信邪点开了 View trace 界面,如下:

图片

完全看懵的你,稳住,对着合适的区域执行快捷键 W 不断地放大时间线,如下:

图片

经过初步排查,你发现上述绝大部分的 G 竟然都和 google.golang.org/grpc.(*Server).Serve.func 有关,关联的一大串也是 Serve 所触发的相关动作。

图片

这时候有经验的你心里已经有了初步结论,你可以继续追踪 View trace 深入进去。

不过建议先鸟瞰全貌,因此我们再往下看 “Network blocking profile” 和 “Syscall blocking profile” 所提供的信息。

网络阻塞概况

图片

系统调用阻塞概况

图片

通过对以上三项的跟踪分析,加上这个泄露,这个阻塞的耗时,这个涉及的内部方法名,很明显就是哪位又忘记关闭客户端连接了。

这时候我们就可以接下进行下一步的排查和修改了。

总结

通过本文我们习得了 go tool trace 的武林秘籍,它能够跟踪捕获各种执行中的事件,例如:

  • Goroutine 的创建/阻塞/解除阻塞。
  • Syscall 的进入/退出/阻止,GC 事件。
  • Heap 的大小改变。
  • Processor 启动/停止等等。

希望你能够用好 Go 的两大杀器 pprof + trace 组合,此乃排查好搭档,谁用谁清楚,即使他并不是绝对的万能。

若有任何疑问欢迎评论区反馈和交流,最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

文章持续更新,可以微信搜【脑子进煎鱼了】阅读。本文 GitHub github.com/eddycjy/blo… 已收录,欢迎 Star 催更。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Map容器家族(Map接口源码详解)

发表于 2021-09-15

一、在Map集合体系中的位置及概述

 Map接口式是Map集合体系的顶层接口,它定义和规范了该集合体系的大体规范,所有子类该实现的方法。该体系是以键值对(key-value)对位元素进行存储。


描述:


(1)映射(map)是一个存储键、键值对的对象,给定一个键,可以查询得到它的值,键和值都可以是对象。  

(2)键必须是唯一的,值可以重复(Map接口映射唯一的键到值)

(3)有些映射可以接收null键和null值,而有的不行

(4)下面的接口支持映射:

Map接口 映射唯一关键字给值

Map.Entry接口 描述映射中的元素(关键字/值对),这是Map的一个内部类

SortedMap接口 扩展Map以便关键字按升序保持

(5)键(Key)是以后用于检索值的对象,给定一个键和一个值,可以存储这个值到一个Map对象中,以后可以  

使用对应的键来检索它

二、约束子类必须实现的方法

1.查询操作(Query Operations)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scss复制代码    /**
* 功能:查询元素个数
*/
int size();

/**
* 功能:是否为空
*/
boolean isEmpty();

/**
* 功能:判断键是否存在
*/
boolean containsKey(Object key);

/**
* 功能:判断值是否存在
*/
boolean containsValue(Object value);

/**
* 功能:根据键获取值
*/
V get(Object key);

2.修改操作(Modification Operations)

1
2
3
4
5
6
7
8
9
scss复制代码    /**
* 功能:向map集合中添加键值对,如果键原来没有返回null,如果有返回被替换的值
*/
V put(K key, V value);

/**
* 功能:根据key移除键值对,并返回值
*/
V remove(Object key);

3.批量操作(Bulk Operations)

1
2
3
4
5
6
7
8
9
csharp复制代码    /**
* 功能:批量添加map集合
*/
void putAll(Map<? extends K, ? extends V> m);

/**
* 功能:清空map集合中的所有键值对
*/
void clear();

4.视图操作(Views)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scss复制代码    /**
* 功能:返回map集合中所有键的Set集合
*/
Set<K> keySet();

/**
* 功能:返回map集合中所有值的Collection集合
*/
Collection<V> values();

/**
* 功能:返回map集合中所有键值对的Entry表示形式的Set集合
*/
Set<Map.Entry<K, V>> entrySet();

5.键值对视图接口(interface Entry<K,V>),该接口的方法如下:

该接口的作用是保存一个Key和Value键值对,和约束了一些操作键值的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scss复制代码        /**
* 功能:获取键
*/
K getKey();

/**
* 功能:获取值
*/
V getValue();

/**
* 功能:设置键的值
*/
V setValue(V value);

/**
* 功能:比较是否相等
*/
boolean equals(Object o);

/**
* 功能:返回hash码
*/
int hashCode();

JDK1.8后新加的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
swift复制代码        /**
* 功能:返回一个按键在自然顺序排序的比较器
*/
public static <K extends Comparable<? super K>, V> Comparator<Map.Entry<K,V>> comparingByKey() {
return (Comparator<Map.Entry<K, V>> & Serializable)
(c1, c2) -> c1.getKey().compareTo(c2.getKey());
}

/**
* 功能:返回一个按值在自然顺序排序的比较器
*/
public static <K, V extends Comparable<? super V>> Comparator<Map.Entry<K,V>> comparingByValue() {
return (Comparator<Map.Entry<K, V>> & Serializable)
(c1, c2) -> c1.getValue().compareTo(c2.getValue());
}

/**
* 功能:返回一个按键自定义比较器顺序排序的比较器
*/
public static <K, V> Comparator<Map.Entry<K, V>> comparingByKey(Comparator<? super K> cmp) {
Objects.requireNonNull(cmp);
return (Comparator<Map.Entry<K, V>> & Serializable)
(c1, c2) -> cmp.compare(c1.getKey(), c2.getKey());
}

/**
* 功能:返回一个按值自定义比较器顺序排序的比较器
*/
public static <K, V> Comparator<Map.Entry<K, V>> comparingByValue(Comparator<? super V> cmp) {
Objects.requireNonNull(cmp);
return (Comparator<Map.Entry<K, V>> & Serializable)
(c1, c2) -> cmp.compare(c1.getValue(), c2.getValue());
}

6.比较和哈希(Comparison and hashing)

1
2
3
4
5
6
7
8
9
arduino复制代码    /**
* 功能:比较是否相等
*/
boolean equals(Object o);

/**
* 功能:返回哈希值
*/
int hashCode();

7.JDK1.8新添加的default方法(Defaultable methods)

default关键字是jdk1.8新添加的,它打破了接口中的方法不能有方法体的标准。

(1)获取方法 getOrDefault

1
2
3
4
5
6
7
8
9
scss复制代码    /**
* 功能:返回指定键映射到的值,如果此映射不包含键的映射,则返回指定的默认值
*/
default V getOrDefault(Object key, V defaultValue) {
V v;
return (((v = get(key)) != null) || containsKey(key))
? v
: defaultValue;
}

(2)遍历方法 forEcah

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scss复制代码    /**
* 功能:对每个键值对执行相同的操作,该操作使用lambda表达式定义
* 注:BiConsumer函数式接口为两个输入
*/
default void forEach(BiConsumer<? super K, ? super V> action) {
Objects.requireNonNull(action);
for (Map.Entry<K, V> entry : entrySet()) {
K k;
V v;
try {
k = entry.getKey();
v = entry.getValue();
} catch(IllegalStateException ise) {
// this usually means the entry is no longer in the map.
throw new ConcurrentModificationException(ise);
}
action.accept(k, v);
}
}
使用方法演示
1
2
3
4
5
6
go复制代码        Map<Integer, Integer> map = new HashMap<>();
map.put(1, 1);
map.put(2, 2);
map.put(3, 3);

map.forEach((x1, x2) -> System.out.println(x1 + "=" + x2));

(3)对所有值进行操作 raplaceAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
javascript复制代码    /**
* 功能:根据指定的操作,对所有键的值进行相同的操作
* 注:BiFunction函数值接口为两个输入,一个输出
*/
default void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
Objects.requireNonNull(function);
for (Map.Entry<K, V> entry : entrySet()) {
K k;
V v;
try {
k = entry.getKey();
v = entry.getValue();
} catch(IllegalStateException ise) {
// this usually means the entry is no longer in the map.
throw new ConcurrentModificationException(ise);
}

// ise thrown from function is not a cme.
v = function.apply(k, v);

try {
entry.setValue(v);
} catch(IllegalStateException ise) {
// this usually means the entry is no longer in the map.
throw new ConcurrentModificationException(ise);
}
}
}
使用方法演示
1
2
3
4
5
6
7
8
go复制代码        Map<Integer, Integer> map = new HashMap<>();
map.put(1, 1);
map.put(2, 2);
map.put(3, 3);

// x1和x2分别为键和值的变量,x1+x2为对所有值的操作
map.replaceAll((x1, x2) -> x1 + x2);
map.forEach((x1, x2) -> System.out.println(x1 + "=" + x2));

(4)添加元素 putIfAbsent

1
2
3
4
5
6
7
8
9
10
11
ini复制代码    /**
* 功能:如果map中没有指定的键,则put进该键值对
*/
default V putIfAbsent(K key, V value) {
V v = get(key);
if (v == null) {
v = put(key, value);
}

return v;
}

(5)删除功能 remove

1
2
3
4
5
6
7
8
9
10
11
12
vbnet复制代码    /**
* 功能:移除指定的键值对
*/
default boolean remove(Object key, Object value) {
Object curValue = get(key);
if (!Objects.equals(curValue, value) ||
(curValue == null && !containsKey(key))) {
return false;
}
remove(key);
return true;
}

(6)替换功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
vbnet复制代码    /**
* 功能:替换
*/
default boolean replace(K key, V oldValue, V newValue) {
Object curValue = get(key);
if (!Objects.equals(curValue, oldValue) ||
(curValue == null && !containsKey(key))) {
return false;
}
put(key, newValue);
return true;
}

default V replace(K key, V value) {
V curValue;
if (((curValue = get(key)) != null) || containsKey(key)) {
curValue = put(key, value);
}
return curValue;
}

三、总结

Map接口是对键值对形式存储的数据结构方法的约束,子类方法需要实现其规定的抽象方法。在JDK1.8之后又添加了一些default声明的方法去扩充它本身的方法,新添加的方法中有很大一部分是关于1.8后新引入的函数式接口的方法,我个人认使用Lambda表达式这在很大程度上简化了编程代码的代码量。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

mysql连接池

发表于 2021-09-15

一、为什么要使用连接池

频繁的建立、关闭连接会降低系统性能

1)资源重用,减少开销

2)响应速度快

3)可根据预先设定好的超时,强制回收连接,避免资源泄露

二、实现

在内部对象池维护一定数量的数据库连接,对外暴露获取和放回的方法

Golang 的连接池实现在标准库 database/sql/sql.go 下

2.1 连接池的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码type DB struct {
waitDuration int64 //等待新连接的总时间
mu sync.Mutex
connector driver.Connector
freeConn []*driverConn
connRequests map[uint64]chan connRequest
nextRequest uint64 //在 connRequests 中使用的下一个键
resetterCh chan *driverConn

numOpen int //打开和挂起的打开连接数
maxIdle int
maxOpen int
maxLifetime time.Duration //连接可重用的最长时间
waitCount int64 //等待的连接总数
maxIdleClosed int64 //由于最大可用限制而关闭的连接总数
closed bool
}

type driverConn struct {
db *DB
ci driver.Conn
onPut []func()
closed bool
inUse bool
lastErr error
createdAt time.Time
sync.Mutex
}

type connRequest struct {
conn *driverConn
err error
}

2.2 获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
go复制代码func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
//1.判断连接池是否已经关闭
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errors.New("sql: database is closed")
}
//2.检测context是否被取消
select {
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
default:
}
lifetime := db.maxLifetime
//3.如果有空闲连接,取出一个
numFree := len(db.freeConn)
if numFree > 0 && strategy == cachedOrNewConn {
conn := db.freeConn[0]
db.freeConn = db.freeConn[1:numFree]
conn.inUse = true
db.mu.Unlock()
//3.1检查连接是否过期
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
//3.2返回可用连接
return conn, nil
}
//4.新建一个request,并等待空闲连接
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()

waitStart := time.Now()

select {
case <-ctx.Done():
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()

atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

select {
default:
case ret, ok := <-req:
if ok && ret.conn != nil {
//context已经被取消,但连接已经取出来了,需要放回去
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// Lock around reading lastErr to ensure the session resetter finished.
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
//5.创建新连接
db.numOpen++
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen--
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
dc := &driverConn{
db: db,
createdAt: time.Now(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}

2.3 释放连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
go复制代码func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
db.mu.Lock()
if !dc.inUse {
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
if debugGetPut {
}

dc.inUse = false

for _, fn := range dc.onPut {
fn()
}
dc.onPut = nil

//不要重用坏连接
if err == driver.ErrBadConn {
db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
}

if putConnHook != nil {
putConnHook(db, dc)
}

if db.closed {
//如果连接将被关闭,则不需要重置它们
//防止在 DB 关闭后写入 resetterCh
resetSession = false
}
if resetSession {
if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
dc.Lock()
}
}
//归还连接
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
//归还失败 关闭连接并返回
if !added {
if resetSession {
dc.Unlock()
}
dc.Close()
return
}
if !resetSession {
return
}
//把连接写入chan resetterCh
select {
default:
dc.lastErr = driver.ErrBadConn
dc.Unlock()
case db.resetterCh <- dc:
}
}

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
//如果已经超过最大打开数量了,就不需要在回归pool了
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
//从connRequest这个map中随机取出一个排队等待的请求,就不归还池子了
if c := len(db.connRequests); c > 0 {
var reqkey uint64
var req chan connRequest
for reqkey, req = range db.connRequests {
break
}
//删除这个排队的请求
delete(db.connRequests, reqkey)
if err == nil {
dc.inUse = true
}
//把连接给这个排队的请求
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
//如果最大空闲数大于当前空闲连接数,则放回池中
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
}

return true
}

2.4 其它定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码type connReuseStrategy int8

var errDBClosed = errors.New("sql: database is closed")

const cachedOrNewConn = 1
const debugGetPut bool = false

func (db *DB) nextRequestKeyLocked() uint64 {
next := db.nextRequest
db.nextRequest++
return next
}

func (dc *driverConn) expired(timeout time.Duration) bool {
if timeout <= 0 { // 不会过期
return false
}
return dc.createdAt.Add(timeout).Before(time.Now())
}

func (db *driverConn) Close() {
db.closed = true
}

三、使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
erlang复制代码import (
"fmt"
"time"

"database/sql"

"gorm.io/driver/mysql"
"gorm.io/gorm"
)

func main() {
d, e := sql.Open("mysql", "dsn")
_, _ = d, e

dsn := "root:proot@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
fmt.Println("error:", err)
return
}
sqlDB, err := db.DB()
if err != nil {
fmt.Println("error:", err)
return
}
sqlDB.SetMaxIdleConns(10)
sqlDB.SetMaxOpenConns(1000)
sqlDB.SetConnMaxLifetime(time.Hour)
sqlDB.Stats()

db.Select("")
db.Update("", "")
db.Delete("")
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

工作两三年了,整不明白架构图都画啥?

发表于 2021-09-15

作者:小傅哥

原文:mp.weixin.qq.com/s/50574gVPP…

一、前言

很多程序员画架构图头疼,不知道画什么、怎么画!

分享、评审、述职、答辩,只要你在程序员这个行业,就几乎离不开要画图。

一提到画图很多人就想站会起来喊,”内卷“、”内卷啦“、”PPT工程师“,但程序代码本身就是一种数学逻辑的具体实现,如果没有一些图表配合文字的阐述,讲真很难让所有人都能在共同的共识下进行交流。

这不像是文科,”八表流云澄夜色,九霄华月动春城“ 上来就能联想到它是在描述啥。但是偏理科代码逻辑或架构设计,只能把抽象的内容用图表的形式展现出来,让大家在同一的共识下共同协同工作。

而我们画的架构图、流程图、结构图、功能图、逻辑图等,都需要好看、好懂、好用、好搞,因为:

  • 好看是为了提升沟通效率,
  • 好懂是为了提升交流共识,
  • 好用是为了提升交付质量,
  • 好搞是为了提升实施速度。

这就像君子在追求漂亮姑娘一样,好看就想主动撩一下、有品行和共同的三观很快让你开口说我懂你、接下来就是交付质量和实施速度了,那也是水到渠成的事。

好,别激动,接下来我们就开始专心研究研究架构图,都有哪些,该怎么画,有什么手法。

二、架构图有哪几种?

仅说技术架构图的话,通常我们☞指的是选型各项技术组件来支撑整个服务建设的系统架构。但用于不同人群范围和不同场景下会有其他分类,如图 26-1 架构图分类

图 26-1 架构图分类

  • 业务架构:需求初期业务的结果和过程描述一般比较模糊,可能来自于某个老板、运营或用户的反馈。

客户说海尔洗衣机洗土豆会堵,海尔立马设计专门的土豆洗衣机

业务方向往往是定方向和结果的叫战略,主要包括业务规划、业务模块和流程以及问题域的列表等。

  • 应用架构:服务复用、跨组协同,简单、灵活、整合是应用架构必须考虑的点,就像你要上线一个聊天功能,那么聊天内容的输入法、文字识别、舆情监控以及视频服务、支付服务等,它们都是在应用架构分层下沉淀到平台的产物,在供各个方使用。
  • 产品架构:业务提需求,产品定方案,相对于业务的粗放流程,产品架构会更加细腻以及考虑各个模块的分层和边界。
  • 数据架构:数据的获取、数据的存放和数据的使用是数据架构要解决的三个问题,数据库存放、大数据汇总、数据分析等。
  • 技术架构:是离程序员最近的架构设计,它不仅是系统搭建的架构图设计,还包括了结构、功能、流程、逻辑等内容。它的具体描述就是整个系统如何落地的具体实现方案。

三、Zachman框架是什么?

Zachman框架,由约翰 扎科曼(John Zachman )在1987年创立的全球第一个企业架构理论,其论文《信息系统架构框架》至今仍被业界认为是企业架构设计方面最权威的理论。

Zachman框架(Zachman framework)是一种逻辑结构,它可以对企业信息按照不同分类和不同角度进行表示。

Zachman框架,从横向六个角度看待企业,这个六个观点可以分为;什么内容、如何工作、什么地点、谁负责、为什么这么做(称为W5H)。

框架的列由一组工件组成,分为规划者、拥有者、设计者(架构师)、建造者、分包者、产品,或者有时表示为视点:范围上下文,业务概念,系统逻辑,技术,物理,组件组装和操作类。整体如图 26-2 TOGAF Zachman框架

图 26-2 TOGAF Zachman框架,小傅哥根据描述重新绘制

表格横向六项 代表了用于描述信息系统的某一个方面,对于任何一个事物只要在这几个基本方面对其进行清洗的解释就足够可以描述清楚。

  • 数据(What,即什么内容):什么是业务数据,信息或对象?
  • 功能(How,即如何工作):业务如何运作,即什么是业务流程?
  • 网络(Where,即何处):企业运营、部署在哪里?
  • 人(Who,即何人负责):什么人?什么是业务部门及其等级制度?
  • 时间(When,即什么时间):业务计划和工作流程是什么?什么时候执行?
  • 原因(Why,即为什么做):为什么选择的解决方案?这是怎么产生的?

表格纵向六项 代表了在信息系统构造过程中所涉及到的人在描述信息系统时所采用的视角,包括:

  • 范围/规划者(Planner):此视图描述了业务目的和策略,充当其他视图将被派生和管理的上下文。
  • 业务模型/拥有者(Owner):这是对信息系统必须在其中运作的组织的描述。
  • 系统模型/设计师(Designer):该视图概述了系统如何满足组织的信息需求。
  • 技术模型/建造者(Builder):这是系统如何实施的表示,它使特定的解决方案和技术显而易见。
  • 详细表述/分包者(Sub-Contractor):这些表示说明了某些系统元素的特定于实现的细节:在生产开始之前需要进一步说明的部分。
  • 功能系统/产品(Functioning Enterprise):在1987年的论文(《A framework for information systems architecture》)中并没有这一行的内容,实际上此行的内容也并不在架构描述的范畴的之内,不过为了使得架构Zachman框架对于架构的表述更加完备,这一行最终还是被加了进去。

根据 TOGAF 的定义,企业是具有一系列共同目标组织的集合,而架构则是为了有效地实现这一系列目标。

在实现的过程中 定义了企业的结构和运作模式的概念蓝图(SearchCIO),以及构成企业的所有关键元素和其关系的综合描述(Zachman)。通过创建、沟通和优化用以描述企业未来状态和发展的关键原则和模型以将业务愿景和战略转化成有效的企业变更的过程(Gartner)。

可以这一部分内容会比较绕,但可以作为架构设计的知识扩展进行学习理解以及运用。

四、陪你画个架构图

简单来说,架构图就是为了达成交流共识的实现方案演示,并不一定非得拘泥于某种形式,只要你能画的清楚,讲的明白就最合适不过了。

1. 架构选型图

架构选型图

  • 难度:⭐⭐⭐
  • 作用:通常在新项目开发初期,都要做一些技术选型工作。在负载、网关、架构、治理、框架、服务、数据以及环境和支撑服务上,要选择适合当前开发的技术。

2. 微服务架构

微服务架构,简化版

  • 难度:⭐⭐⭐⭐
  • 作用:技术选型完毕后,接下来就是对于这些技术的运用。这个过程有点像搭积木一样,把每一个区域用适合此位置的积木填充进去。如果是团队初建或者是技术升级,那么这个过程还是比较复杂的,需要大量的验证。不过其实互联网的技术分层和使用已经相对稳定,搭建一个这样的微服务并不会耗费太长的时间。

3. 技术架构图

技术架构图

  • 难度:⭐⭐⭐⭐
  • 作用:技术架构图主要是对于研发层面做技术实现指导的,它可以把系统分层和实现结构划分清楚。另外一般也会把案例工程的结构拿出来一起讲解,这样可以让团队伙伴快速的进入开发。

五、总结

  • 本章节向大家讲解了什么是架构图,架构图的分类和怎么画架构图,通过这样的内容可以让大家对架构图有一个全貌的认知。在以后自己画架构图了也可以非常明确的知道面对的什么用户群体,要画的内容是什么。
  • TOGAF有一套非常完善的企业架构理论,它描述了一种开发和管理企业体系结构生命周期的方法,并构成了TOGAF的核心。所涉及到的知识非常丰富,值得认真看一下。
  • 好看,能把一件事做的好看非常重要,好看能让人提起兴趣、好看可以使沟通成本降低。也鼓励大家尽可能把经过自己手里的东西,做的好看一些。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

springboot使用rabbitmq记录 1 加入ma

发表于 2021-09-15
  1. 加入maven依赖

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置rabbitmq

1
2
3
4
5
6
7
yaml复制代码spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /users
username: users
password: 1234
  1. 生产者测试类

注:单独调用生产者测试类,如果没有消费者监听,并不会生产消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
typescript复制代码import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMqProviderTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
//点对点和工作队列模式,发送普通消息
public void sendMsg() {
for (int i=0; i<10; i++) {
rabbitTemplate.convertAndSend("hello", i+" hello world");
}
}

@Test
//交换机模式/广播模式发送消息,引入交换机,路由为空
public void sendExchangeMsg() {
rabbitTemplate.convertAndSend("logs", "", "this is a log");
}

@Test
//路由模式发送消息,相比交换机模式,发送路由key
public void sendRoutingMsg() {
rabbitTemplate.convertAndSend("directs", "info", "this is a info log");
rabbitTemplate.convertAndSend("directs", "error", "this is a error log");
}

@Test
//动态路由模式发送消息,相比路由模式,路由key可以是多个单词,通过分隔符.分割
public void sendTopicMsg() {
rabbitTemplate.convertAndSend("topics", "user.save", "topic test");
rabbitTemplate.convertAndSend("topics", "user.dept.save", "topic test");
}

}
  1. 点对点和工作队列模式 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
less复制代码import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HelloConsumer {

@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("hello"))
public void receive1(String msg) {
System.out.println("msg1=" + msg);
}

@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("hello"))
public void receive2(String msg) {
System.out.println("msg2=" + msg);
}
}
  1. 交换机模式/广播模式 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
less复制代码import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class ExchangeConsumer {

@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout"))
})
public void exchangeMsg1(String msg) {
System.out.println("exchangeMsg1=" + msg);
}

@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout"))
})
public void exchangeMsg2(String msg) {
System.out.println("exchangeMsg2=" + msg);
}
}
  1. 路由模式 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
less复制代码import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class RoutingConsumer {

@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "directs"),
key = {"info", "error"})
})
public void exchangeMsg1(String msg) {
System.out.println("directMsg1=" + msg);
}

@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "directs"),
key = {"error"})
})
public void exchangeMsg2(String msg) {
System.out.println("directMsg2=" + msg);
}
}
  1. 动态路由模式 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
less复制代码import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "topics", type = "topic"),
key = {"user.*"})
})
public void exchangeMsg1(String msg) {
System.out.println("topics1=" + msg);
}

@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "topics", type = "topic"),
key = {"user.#"})
})
public void exchangeMsg2(String msg) {
System.out.println("topics2=" + msg);
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…528529530…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%