开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

如何与 etcd 服务端进行通信?客户端 API 实践与核心

发表于 2021-10-22

你好,我是 aoho,今天我和你分享的是通信接口:客户端 API 实践与核心方法。

etcd 提供了哪些接口?你仔细阅读过 etcd 的接口文档吗?接口该如何使用?

学习客户端与 etcd 服务端的通信以及 etcd 集群节点的内部通信接口对于我们更好地使用和掌握 etcd 组件很有帮助,也是所必需了解的内容。本文我们将会介绍 etcd 的 gRPC 通信接口以及客户端的实践。

etcd clientv3 客户端这些

etcd 客户端 clientv3 接入的示例将会以 Go 客户端为主,读者需要准备好基本的开发环境。

首先是 etcd clientv3 的初始化,我们根据指定的 etcd 节点,建立客户端与 etcd 集群的连接。

1
2
3
4
php复制代码    cli,err := clientv3.New(clientv3.Config{
Endpoints:[]string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})

如上的代码实例化了一个 client,这里需要传入的两个参数:

  • Endpoints:etcd 的多个节点服务地址,因为我是单点本机测试,所以只传 1 个。
  • DialTimeout:创建 client 的首次连接超时,这里传了 5 秒,如果 5 秒都没有连接成功就会返回 err;值得注意的是,一旦 client 创建成功,我们就不用再关心后续底层连接的状态了,client 内部会重连。

etcd 客户端初始化

解决完包依赖之后,我们初始化 etcd 客户端。客户端初始化代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码// client_init_test.go
package client

import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"testing"
"time"
)
// 测试客户端连接
func TestEtcdClientInit(t *testing.T) {

var (
config clientv3.Config
client *clientv3.Client
err error
)
// 客户端配置
config = clientv3.Config{
// 节点配置
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
} else {
// 输出集群信息
fmt.Println(client.Cluster.MemberList(context.TODO()))
}
client.Close()
}

如上的代码,预期的执行结果如下:

1
2
3
4
css复制代码=== RUN   TestEtcdClientInit
&{cluster_id:14841639068965178418 member_id:10276657743932975437 raft_term:3 [ID:10276657743932975437 name:"default" peerURLs:"http://localhost:2380" clientURLs:"http://0.0.0.0:2379" ] {} [] 0} <nil>
--- PASS: TestEtcdClientInit (0.08s)
PASS

可以看到 clientv3 与 etcd Server 的节点 localhost:2379 成功建立了连接,并且输出了集群的信息,下面我们就可以对 etcd 进行操作了。

client 定义

接着我们来看一下 client 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance

// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
}

注意,这里显示的都是可导出的模块结构字段,代表了客户端能够使用的几大核心模块,其具体功能介绍如下:

  • Cluster:向集群里增加 etcd 服务端节点之类,属于管理员操作。
  • KV:我们主要使用的功能,即操作 K-V。
  • Lease:租约相关操作,比如申请一个 TTL=10 秒的租约。
  • Watcher:观察订阅,从而监听最新的数据变化。
  • Auth:管理 etcd 的用户和权限,属于管理员操作。
  • Maintenance:维护 etcd,比如主动迁移 etcd 的 leader 节点,属于管理员操作。

proto3

etcd v3 的通信基于 gRPC,proto 文件是定义服务端和客户端通讯接口的标准。包括:

  • 客户端该传什么样的参数
  • 服务端该返回什么参数
  • 客户端该怎么调用
  • 是阻塞还是非阻塞
  • 是同步还是异步。

gRPC 推荐使用 proto3 消息格式,在进行核心 API 的学习之前,我们需要对 proto3 的基本语法有初步的了解。proto3 是原有 Protocol Buffer 2(被称为 proto2)的升级版本,删除了一部分特性,优化了对移动设备的支持。

gRPC 服务

发送到 etcd 服务器的每个 API 请求都是一个 gRPC 远程过程调用。etcd3 中的 RPC 接口定义根据功能分类到服务中。

处理 etcd 键值的重要服务包括:

  • KV 服务,创建、更新、获取和删除键值对。
  • 监视,监视键的更改。
  • 租约,消耗客户端保持活动消息的基元。
  • 锁,etcd 提供分布式共享锁的支持。
  • 选举,暴露客户端选举机制。

image.png

请求和响应

etcd3 中的所有 RPC 都遵循相同的格式。每个 RPC 都有一个函数名,该函数将 NameRequest 作为参数并返回 NameResponse 作为响应。例如,这是 Range RPC 描述:

1
2
3
4
scss复制代码service KV {
Range(RangeRequest) returns (RangeResponse)
...
}

响应头

etcd API 的所有响应都有一个附加的响应标头,其中包括响应的群集元数据:

1
2
3
4
5
6
ini复制代码message ResponseHeader {
uint64 cluster_id = 1;
uint64 member_id = 2;
int64 revision = 3;
uint64 raft_term = 4;
}
  • Cluster_ID - 产生响应的集群的 ID。
  • Member_ID - 产生响应的成员的 ID。
  • Revision - 产生响应时键值存储的修订版本号。
  • Raft_Term - 产生响应时,成员的 Raft 称谓。

应用服务可以通过 Cluster_ID 和 Member_ID 字段来确保,当前与之通信的正是预期的那个集群或者成员。

应用服务可以使用修订号字段来知悉当前键值存储库最新的修订号。当应用程序指定历史修订版以进行时程查询并希望在请求时知道最新修订版时,此功能特别有用。

应用服务可以使用 Raft_Term 来检测集群何时完成一个新的 leader 选举。

下面开始介绍 etcd 中这几个重要的服务和接口。

KV 存储

kv 对象的实例获取通过如下的方式:

1
css复制代码kv  := clientev3.NewKV(client)

我们来看一下 kv 接口的具体定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scss复制代码type KV interface {

Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

// 检索 keys.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

// 删除 key,可以使用 WithRange(end), [key, end) 的方式
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

// 压缩给定版本之前的 KV 历史
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

// 指定某种没有事务的操作
Do(ctx context.Context, op Op) (OpResponse, error)

// Txn 创建一个事务
Txn(ctx context.Context) Txn
}

从 KV 对象的定义我们可知,它就是一个接口对象,包含几个主要的 kv 操作方法:

kv 存储 put

put 的定义如下:

1
scss复制代码    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

其中的参数:

  • ctx: Context 包对象,是用来跟踪上下文的,比如超时控制
  • key: 存储对象的 key
  • val: 存储对象的 value
  • opts:  可变参数,额外选项

Put 将一个键值对放入 etcd 中。请注意,键值可以是纯字节数组,字符串是该字节数组的不可变表示形式。要获取字节字符串,请执行 string([] byte {0x10,0x20}) 。

put 的使用方法如下所示:

1
css复制代码putResp, err := kv.Put(context.TODO(),"aa", "hello-world!")

kv 查询 get

现在可以对存储的数据进行取值了。默认情况下,Get 将返回 “ key” 对应的值。

1
vbnet复制代码Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

OpOption 为可选的函数传参,传参为 WithRange(end) 时,Get 将返回 [key,end)范围内的键;传参为 WithFromKey() 时,Get 返回大于或等于 key 的键;当通过 rev> 0 传递 WithRev(rev) 时,Get 查询给定修订版本的键;如果压缩了所查找的修订版本,则返回请求失败,并显示 ErrCompacted。 传递 WithLimit(limit) 时,返回的 key 数量受 limit 限制;传参为 WithSort 时,将对键进行排序。对应的使用方法如下:

1
css复制代码getResp, err := kv.Get(context.TODO(), "aa")

从以上数据的存储和取值,我们知道 put 返回 PutResponse、get 返回 GetResponse,注意:不同的 KV 操作对应不同的 response 结构,定义如下:

1
2
3
4
5
6
7
bash复制代码type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)

我们分别来看一看 PutResponse 和 GetResponse 映射的 RangeResponse 结构的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
rust复制代码type PutResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// if prev_kv is set in the request, the previous key-value pair will be returned.
PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
//Header 里保存的主要是本次更新的 revision 信息

type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}

Kvs 字段,保存了本次 Get 查询到的所有 kv 对,我们继续看一下 mvccpb.KeyValue 对象的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vbnet复制代码type KeyValue struct {

Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// create_revision 是当前 key 的最后创建版本
CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`
// mod_revision 是指当前 key 的最新修订版本
ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`
// key 的版本,每次更新都会增加版本号
Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`

Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`

// 绑定了 key 的租期 Id,当 lease 为 0 ,则表明没有绑定 key;租期过期,则会删除 key
Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`
}

至于 RangeResponse.More 和 Count,当我们使用 withLimit() 选项进行 Get 时会发挥作用,相当于分页查询。

接下来,我们通过一个特别的 Get 选项,获取 aa 目录下的所有子目录:

1
css复制代码rangeResp, err := kv.Get(context.TODO(), "/aa", clientv3.WithPrefix())

WithPrefix() 用于查找以 /aa 为前缀的所有 key,因此可以模拟出查找子目录的效果。

我们知道 etcd 是一个有序的 kv 存储,因此 /aa 为前缀的 key 总是顺序排列在一起。

withPrefix 实际上会转化为范围查询,它根据前缀 /aa 生成了一个 key range,[“/aa/”, “/aa0”),这是因为比 / 大的字符是 0,所以以 /aa0 作为范围的末尾,就可以扫描到所有的 /aa/ 打头的 key 了。

KV 操作实践

键值对的操作是 etcd 中最基本、最常用的功能,主要包括读、写、删除三种基本的操作。在 etcd 中定义了 kv 接口,用来对外提供这些操作,下面我们进行具体的测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
go复制代码package client

import (
"context"
"fmt"
"github.com/google/uuid"
"go.etcd.io/etcd/clientv3"
"testing"
"time"
)

func TestKV(t *testing.T) {
rootContext := context.Background()
// 客户端初始化
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 2 * time.Second,
})
// etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
if cli == nil || err == context.DeadlineExceeded {
// handle errors
fmt.Println(err)
panic("invalid connection!")
}
// 客户端断开连接
defer cli.Close()
// 初始化 kv
kvc := clientv3.NewKV(cli)
//获取值
ctx, cancelFunc := context.WithTimeout(rootContext, time.Duration(2)*time.Second)
response, err := kvc.Get(ctx, "cc")
cancelFunc()
if err != nil {
fmt.Println(err)
}
kvs := response.Kvs
// 输出获取的 key
if len(kvs) > 0 {
fmt.Printf("last value is :%s\r\n", string(kvs[0].Value))
} else {
fmt.Printf("empty key for %s\n", "cc")
}
//设置值
uuid := uuid.New().String()
fmt.Printf("new value is :%s\r\n", uuid)
ctx2, cancelFunc2 := context.WithTimeout(rootContext, time.Duration(2)*time.Second)
_, err = kvc.Put(ctx2, "cc", uuid)
// 设置成功之后,将该 key 对应的键值删除
if delRes, err := kvc.Delete(ctx2, "cc"); err != nil {
fmt.Println(err)
} else {
fmt.Printf("delete %s for %t\n", "cc", delRes.Deleted > 0)
}
cancelFunc2()
if err != nil {
fmt.Println(err)
}
}

如上的测试用例,主要是针对 kv 的操作,依次获取 key,即 Get(),对应 etcd 底层实现的 range 接口;其次是写入键值对,即 put 操作;最后删除刚刚写入的键值对。预期的执行结果如下所示:

1
2
3
4
5
6
sql复制代码=== RUN   Test
empty key for cc
new value is: 41e1362a-28a7-4ac9-abf5-fe1474d93f84
delete cc for true
--- PASS: Test (0.11s)
PASS

可以看到,刚开始 etcd 并没有存储键 cc 的值,随后写入新的键值对并测试将其删除。

其他通信接口

其他常用的接口还有 Transaction、Compact、watch、Lease、Lock 等。我们依次看看这些接口的定义。

事务 Transaction

Txn 方法在单个事务中处理多个请求。txn 请求增加键值存储的修订版本并为每个完成的请求生成带有相同修订版本的事件。etcd 不容许在一个 txn 中多次修改同一个 key。 Txn 接口定义如下:

1
scss复制代码rpc Txn(TxnRequest) returns (TxnResponse) {}

Compact 方法

Compact 方法压缩 etcd 键值对存储中的事件历史。键值对存储应该定期压缩,否则事件历史会无限制的持续增长。

1
scss复制代码rpc Compact(CompactionRequest) returns (CompactionResponse) {}

请求的消息体是 CompactionRequest, CompactionRequest 压缩键值对存储到给定修订版本。所有修订版本比压缩修订版本小的键都将被删除

watch

Watch API 提供了一个基于事件的接口,用于异步监视键的更改。etcd3 监视程序通过从给定的修订版本(当前版本或历史版本)持续监视 key 更改,并将 key 更新流回客户端。

在 rpc.proto 中 Watch service 定义如下:

image.png

1
2
3
scss复制代码service Watch {
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
}

Watch 观察将要发生或者已经发生的事件。输入和输出都是流;输入流用于创建和取消观察,而输出流发送事件。一个观察 RPC 可以在一次性在多个 key 范围上观察,并为多个观察流化事件。整个事件历史可以从最后压缩修订版本开始观察。
WatchService 只有一个 Watch 方法。

Lease service

Lease service 提供租约的支持。Lease 是一种检测客户端存活状况的机制。群集授予具有生存时间的租约。如果 etcd 群集在给定的 TTL 时间内未收到 keepAlive,则租约到期。

为了将租约绑定到键值存储中,每个 key 最多可以附加一个租约。当租约到期或被撤销时,该租约所附的所有 key 都将被删除。每个过期的密钥都会在事件历史记录中生成一个删除事件。

在 rpc.proto 中 Lease service 定义的接口如下:

1
2
3
4
5
6
7
8
9
10
scss复制代码service Lease {

rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) {}

rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) {}

rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {}

rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) {}
}
  • LeaseGrant 创建一个租约
  • LeaseRevoke 撤销一个租约
  • LeaseKeepAlive 用于维持租约
  • LeaseTimeToLive 获取租约信息

Lock service

Lock service 提供分布式共享锁的支持。Lock service 以 gRPC 接口的方式暴露客户端锁机制。在 v3lock.proto 中 Lock service 定义如下:

1
2
3
4
5
6
scss复制代码service Lock {

rpc Lock(LockRequest) returns (LockResponse) {}

rpc Unlock(UnlockRequest) returns (UnlockResponse) {}
}
  • Lock 方法,在给定命令锁上获得分布式共享锁。
  • Unlock 使用 Lock 返回的 key 并释放对锁的持有。

小结

这篇文章主要介绍了 etcd 的 gRPC 通信接口以及 clientv3 客户端的实践,主要包括键值对操作(增删改查)、watch、Lease、锁和 Compact 等接口。通过对客户端 API 通信接口的学习,了解 etcd 客户端的使用以及常用功能的接口定义,对于我们在日常工作中能够得心应手的使用 etcd 实现相应的功能能够很有帮助。

了解更多关于 etcd 的原理与实践,欢迎支持我的新书 **《etcd工作笔记:架构分析、优化与最佳实践》 ,现已完成印刷,已经登陆各大网上商城。**

etcd图书.jpg

欢迎各位同学批评指正,购买链接:item.jd.com/12990887.ht…
感谢支持。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Laravel本地开发环境选择Homestead还是Vale

发表于 2021-10-22

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

首先这两个开发环境我都用过,而且都比较好用,目前我使用的 Valet ,因为 Valet 更轻量级,开箱即用,简单高效。

Valet 还是 Homestead

Laravel 提供了两种本地开发环境 Homestead 和 Valet。 Homestead 和 Valet 的不同之处在于目标受众和本地开发的方式。

Homestead 提供了一个完整的、具有自动化的 Nginx 配置的 Ubuntu 虚拟机。如果你想要完全虚拟化的 Linux 开发环境或 Windows/Linux,Homestead 是一个不错的选择。

Valet 仅支持 Mac,并要求你将 PHP 和数据库服务器直接安装到本地机器上。

这可以很容易地通过使用 Homebrew 相关命令来实现,比如 brew install php72 和 brew install mysql。

Valet 提供了一个极快的、资源消耗最少本地开发环境,非常适合只需要 PHP/MySQL 并且不需要虚拟开发环境的开发人员。

Valet 和 Homestead 都是配置 Laravel 开发环境的绝佳选择。选择哪一个仅仅取决于个人喜好和团队的需求。

我的选择

没错,我用 Windows 开发时使用的 Homestead ,用Mac开发之后就改用 Valet 了。

Valet 简介

Valet 是 Mac 极简主义者的 Laravel 开发环境。没有 Vagrant,不需要配置 /etc/hosts 文件。甚至可以使用本地隧道公开共享你的站点。

Laravel Valet 为您的 Mac 设置了开机后始终在后台运行 Nginx 服务。Valet 使用 DnsMasq 将所有指向安装在本地的计算机站点请求代理到 *.test 结尾的域名上。

简单高效的完美诠释:一个速度极快的 Laravel 开发环境只占用 7MB 内存。

Valet 并不是想要完全替换 Vagrant 或 Homestead,只是提供另外一种使用起来更加灵活、方便、以及内存占用更小的选择。

Valet 支持但不局限于以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
css复制代码Laravel
Lumen
Bedrock
CakePHP 3
Concrete5
Contao
Craft
Drupal
Jigsaw
Joomla
Katana
Kirby
Magento
OctoberCMS
Sculpin
Slim
Statamic
Static HTML
Symfony
WordPress
Zend

Valet的安装

Valet 需要 macOS 系统和 Homebrew。在安装之前,要确保没有其它程序(如 Apache 或 Nginx)占用了本地机器的 80 端口。

  • 使用 brew update 将 Homebrew 安装或更新到最新版本。
  • 使用 Homebrew 的 brew install php 命令安装 PHP 7.3。
  • 安装 Composer.
  • 使用 Composer 的 composer global require laravel/valet 命令安装 Valet。并确保 ~/.composer/vendor/bin 目录在系统的 “PATH” 中。
  • 运行 valet install 命令来配置和安装 Valet 和 DnsMasq,并注册 Valet 后台服务,随系统运行自行启动。

安装完 Valet 后,可以尝试使用类似 ping xxx.test 的命令在终端上 ping 任何一个 *.test 的域名。如果 Valet 安装正确,可以在终端上看到来自 127.0.0.1 的响应。

每次机器启动时,Valet 会自动启动其进程。所以只要完成了 Valet 的初始化,就无需再次运行 valet start 或 valet install。

如何使用其它域名

默认情况下,Valet 使用 .test 顶级域名为你的项目提供服务。如果你想使用其它域名,可以使用 valet tld tld-name 命令。

例如,如果你想使用 .xyz 来代替 .test,运行 valet tld xyz,Valet 会自动将站点顶级域名改为 *.xyz 。

数据库

如果你要使用数据库,请在终端运行 brew install mysql@5.7 安装 MySQL。

一旦安装完成,你可以使用 brew services start mysql@5.7 命令启动 MySQL。

然后,你可以使用 root 用户名和空字符串密码连接到 127.0.0.1 的数据库。

指定 PHP 版本

Valet 允许你使用 valet use php@version 命令来切换 PHP 版本。

如果指定版本尚未安装,Valet 将通过 Brew 来安装指定的 PHP 版本:

1
2
3
perl复制代码valet use php@7.2

valet use php

Valet的升级

可以在终端使用 composer global update 命令来更新 Valet。

升级后,如有需要,最好再次运行 valet install ,以便 Valet 对配置文件进行升级。

升级到 Valet 2.0

Valet 2.0 将 Valet 底层的 Web 服务器从 Caddy 转移到 Nginx。

升级到此版本之前,你应该运行以下命令停止并卸载现有的 Caddy 进程:

1
2
rust复制代码valet stop
valet uninstall

接下来,就根据你采用的安装方式来升级 Valet (通常是通过 Git 或 Composer )。

如果是通过 Composer 安装了 Valet ,则应使用以下命令更新到最新的主要版本:

1
javascript复制代码composer global require laravel/valet

如果更新了 Valet 的源码,应该执行 install 命令:

1
2
rust复制代码valet install
valet restart

升级过后,可能需要重新设置或重新链接你的站点。

服务站点

安装了 Valet 之后,就可以开始设置站点。

Valet 提供两个命令来为 Laravel 的站点提供服务:park 和 link。

park 命令

运行 mkdir ~/Sites 命令在 Mac 上创建一个新的目录。

接下来,运行 cd ~/Sites 和 valet park 将当前的工作目录作为 Valet 搜索站点的路径。

接下来,在这个目录中创建一个新的 Laravel 站点,比如:laravel new blog。

在浏览器中打开 blog.test

就这么简单,在 『parked』的目录中创建的任何 Laravel 项目都将自动使用这种方式访问: folder-name.test

link 命令

如果要在目录中提供单个站点而不是整个目录,就使用 link 命令。

要使用该命令,先在终端里切换到你的某个项目并运行 valet link app-name。

Valet 会在 ~/.config/valet/Sites 中创建一个符号链接指向当前的目录。

运行 link 命令后,你可以在浏览器通过这种方式访问站点: app-name.test

运行 valet links 命令可以查看所有目录链接的列表。你还可以使用 valet unlink app-name 来删除符号链接。

注意:可以使用 valet link 将多个(子)域名指向同一个应用。要添加子域名或其它域名到应用,可以在应用目录下运行 valet link subdomain.app-name。

使用 TLS 保护站点

默认情况下,Valet 服务站点通过纯 HTTP 的方式。

但是,如果想一个站点使用 HTTP/2 通过加密 TLS 提供服务,使用 secure 命令。

例如,如果站点通过 Valet 在 laravel.test 域上提供服务,你应当运行如下的命令去保护它:

1
rust复制代码valet secure laravel

要一个站点『解除保护』并恢复为通过纯 HTTP 提供服务它的流量,使用 unsecure 命令。

与 secure 命令一样,该命令接受你希望去解除保护的主机名称:

1
rust复制代码valet unsecure laravel

总结

好了,上述就是 Valet 的基础功能,是不是非常简单高效。

上述这些功能已经能够满足我们在本地开发了,当然 Valet 支持的功能不止这些,还包括:

共享站点、配置网站的环境变量、自定义 Valet 驱动、进程相关操作等等。

这些进阶的内容下一篇给大家整理出来。

相关推荐

同时墙裂安利这篇优质爆文,感谢大家支持:
Git使用实战:多人协同开发,紧急修复线上bug的Git操作指南。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

扯一把 Spring 的三种注入方式,到底哪种注入方式最佳?

发表于 2021-10-22

@[toc]
循环依赖这个问题,按理说我们在日常的程序设计中应该避免,其实这个本来也是能够避免的。不过由于总总原因,我们可能还是会遇到一些循环依赖的问题,特别是在面试的过程中,面试考察循环依赖,主要是想考察候选人对 Spring 源码的熟悉程度,因为要把循环依赖这个问题解释清楚,涉及到不少 Spring 源码。

今天松哥抽空和大家简单聊聊这个话题,问题比较庞大,我可能花几篇文章来和大家分享下,今天先来聊聊实例的注入方式。

  1. 实例的注入方式

首先来看看 Spring 中的实例该如何注入,总结起来,无非三种:

  • 属性注入
  • set 方法注入
  • 构造方法注入

我们分别来看下。

1.1 属性注入

属性注入是大家最为常见也是使用最多的一种注入方式了,代码如下:

1
2
3
4
5
6
java复制代码@Service
public class BService {
@Autowired
AService aService;
//...
}

这里是使用 @Autowired 注解注入。另外也有 @Resource 以及 @Inject 等注解,都可以实现注入。

不过不知道小伙伴们有没有留意过,在 IDEA 里边,使用属性注入,会有一个警告⚠️:

不推荐属性注入!

原因我们后面讨论。

1.2 set 方法注入

set 方法注入太过于臃肿,实际上很少使用:

1
2
3
4
5
6
7
8
9
java复制代码@Service
public class BService {
AService aService;

@Autowired
public void setaService(AService aService) {
this.aService = aService;
}
}

这代码看一眼都觉得难受,坚决不用。

1.3 构造方法注入

构造方法注入方式如下:

1
2
3
4
5
6
7
8
java复制代码@Service
public class AService {
BService bService;
@Autowired
public AService(BService bService) {
this.bService = bService;
}
}

如果类只有一个构造方法,那么 @Autowired 注解可以省略;如果类中有多个构造方法,那么需要添加上 @Autowired 来明确指定到底使用哪个构造方法。

  1. 实例注入方式大 PK

上面给大家列出来了三种注入方式,那么三种注入方式各自有何区别呢?

结合 Spring 官方文档,我们来分析下。

松哥翻出了 12 年前的 Spring3.0 的文档(docs.spring.io/spring-fram…

我来简单翻译下(意译):

使用构造方法注入还是使用 set 方法注入?
由于构造方法注入和 set 方法注入可以混合使用,因此,如果需要强制注入,我们可以使用构造方法注入的方式;如果是可选注入,则我们可以使用 set 方法注入的方式。当然,我们在 setter 上使用 @Required 注解可以让 set 方法注入也变为强制性注入。
Spring 团队通常提倡 setter 注入,因为当属性特别多的时候,构造方法看起来会特别臃肿,特别是当属性是可选的时(属性可选意味着没必要通过构造方法注入)。Setter 方法注入还有一个好处就是可以使该类的属性可以在以后重新配置或重新注入。
一些纯粹主义者喜欢基于构造函数的注入,这样意味着所有的属性都被初始化了,缺点则是对象变得不太适合重新配置和重新注入。
另外在一些特殊的场景下,如一个第三方类要注入到 Spring 容器,但是该类没有提供 set 方法,那么此时你就只能使用构造方法注入了。

英文水平有限,大概翻译了下。小伙伴们重点看加粗部分,也就是说在 Spring3.0 时代,官方还是提倡 set 方法注入的。

不过从 Spring4.x 开始,官方就不推荐这种注入方式了,转而推荐构造器注入。

我们来看看 Spring4.x 的文档怎么说(docs.spring.io/spring-fram…

这段内容我就不一一翻译了,大家重点看第二段第一句:

The Spring team generally advocates constructor injection

这句话就是说 Spring 团队倡导通过构造方法完成注入。才一个大版本更新,Spring 咋就变了呢?别急,人家也给出用构造方法注入的理由,第二段翻译一下大概是这个意思:

通过构造方法注入的方式,能够保证注入的组件不可变,并且能够确保需要的依赖不为空。此外,构造方法注入的依赖总是能够在返回客户端(组件)代码的时候保证完全初始化的状态。

上面这段话主要说了三件事:

  1. 依赖不可变:这个好理解,通过构造方法注入依赖,在对象创建的时候就要注入依赖,一旦对象创建成功,以后就只能使用注入的依赖而无法修改了,这就是依赖不可变(通过 set 方法注入将来还能通过 set 方法修改)。
  2. 依赖不为空:通过构造方法注入的时候,会自动检查注入的对象是否为空,如果为空,则注入失败;如果不为空,才会注入成功。
  3. 完全初始化:由于获取到了依赖对象(这个依赖对象是初始化之后的),并且调用了要初始化组件的构造方法,因此最终拿到的就是完全初始化的对象了。

在 Spring3.0 文档中,官方说如果构造方法注入的话,属性太多可能会让代码变得非常臃肿,那么在 4.0 文档中,官方对这个说法也做了一些订正:如果用构造方法注入的时候,参数过多以至于代码过于臃肿,那么此时你需要考虑这个类的设计是否合理,这个类是否参杂了太多的其他无关功能,这个类是否做到了单一职责。

好吧,你说的总是有理!

这是构造方法注入和 set 方法注入的问题,那么上面我们还提到不推荐属性注入,这又是咋回事呢?

属性注入其实有一个显而易见的缺点,那就是对于 IOC 容器以外的环境,除了使用反射来提供它需要的依赖之外,无法复用该实现类。因为该类没有提供该属性的 set 方法或者相应的构造方法来完成该属性的初始化。换言之,要是使用属性注入,那么你这个类就只能在 IOC 容器中使用,要是想自己 new 一下这个类的对象,那么相关的依赖无法完成注入。

以上分析都是根据 Spring 官方文档得来,日常开发应该还是属性注入较多,这个咱们不必纠结,代码该咋写还咋写,Spring 官方的态度了解一下即可,当然,如果项目允许,也不妨试试 Spring 推荐的代码规范。

  1. 小结

好啦,今天就和小伙伴们随便扯扯 Spring 中的注入方式,因为我最近又要重新捡起 Spring 源码分析了,所以先来个简单的预热一下哈哈~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

netty系列之 使用netty实现支持http2的服务器

发表于 2021-10-22

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

「欢迎在评论区讨论,掘金官方将在掘力星计划活动结束后,在评论区抽送100份掘金周边,抽奖详情见活动文章」。

简介

上一篇文章中,我们提到了如何在netty中配置TLS,让他支持HTTP2。事实上TLS并不是https的一个必须要求,它只是建议的标准。那么除了TLS之外,还需要如何设置才能让netty支持http2呢?一起来看看吧。

基本流程

netty支持http2有两种情况,第一种情况是使用tls,在这种情况下需要添加一个ProtocolNegotiationHandler来对握手之后的协议进行协商,在协商之后,需要决定到底使用哪一种协议。

上一篇文章,我们已经介绍TLS支持http2的细节了,这里不再赘述,感兴趣的朋友可以查看我之前的文章。

如果不使用tls,那么有两种情况,一种是直接使用http1.1了,我们需要为http1.1添加一个ChannelInboundHandler即可。

另一种情况就是使用clear text从HTTP1.1升级到HTTP2。

HTTP/2 ClearText也叫做h2c,我们看一个简单的升级请求,首先是客户端请求:

1
2
3
4
5
makefile复制代码GET /index HTTP/1.1
Host: server.flydean.com
Connection: Upgrade, HTTP2-Settings
Upgrade: h2c
HTTP2-Settings: (SETTINGS payload)

然后是服务器端的响应,如果服务器端不支持升级,则返回:

1
2
3
4
5
6
bash复制代码
HTTP/1.1 200 OK
Content-length: 100
Content-type: text/html

(... HTTP/1.1 response ...)

如果服务器支持升级,则返回:

1
2
3
4
5
makefile复制代码HTTP/1.1 101 Switching Protocols 
Connection: Upgrade
Upgrade: h2c

(... HTTP/2 response ...)

CleartextHttp2ServerUpgradeHandler

有了上面的基本流程,我们只需要在netty中提供对应的handler类就可以解决netty对http2的支持了。

不过上面的升级流程看起来比较复杂,所以netty为我们提供了一个封装好的类:CleartextHttp2ServerUpgradeHandler来实现h2c的功能。

这个类需要传入3个参数,分别是HttpServerCodec、HttpServerUpgradeHandler和ChannelHandler。

HttpServerCodec就是处理http server的编码类,一般我们使用HttpServerCodec。

HttpServerUpgradeHandler是从http1.1升级到http2的处理类。

netty也提供了一个现成的类:HttpServerUpgradeHandler,来处理升级的编码。

HttpServerUpgradeHandler需要两个参数,一个是sourceCodec,也就是http原始的编码类HttpServerCodec,一个是用来返回UpgradeCodec的工厂类,返回netty自带的Http2ServerUpgradeCodec。

1
2
3
scss复制代码    public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory) {
this(sourceCodec, upgradeCodecFactory, 0);
}

ChannelHandler是真正处理HTTP2的handler,我们可以根据需要对这个handler进行自定义。

有了UpgradeHandler,将其加入ChannelPipeline即可。

Http2ConnectionHandler

不管是HttpServerUpgradeHandler,还是CleartextHttp2ServerUpgradeHandler,都需要传入一个真正能够处理http2的handler。这个handler就是Http2ConnectionHandler。

Http2ConnectionHandler是一个实现类,它已经实现了处理各种inbound frame events的事件,然后将这些事件委托给 Http2FrameListener。

所以Http2ConnectionHandler需要跟Http2FrameListener配合使用。

这里要详细讲解一下Http2FrameListener,它主要处理HTTP2 frame的各种事件。

先来看下http2FrameListener中提供的event trigger方法:

从上图可以看到,主要是各种frame的事件触发方法,其中http2中有这样几种frame:

  • DATA frame
  • HEADERS frame
  • PRIORITY frame
  • RST_STREAM frame
  • SETTINGS acknowledgment frame
  • SETTINGS frame
  • PING frame
  • PING acknowledgment
  • PUSH_PROMISE frame
  • GO_AWAY frame
  • WINDOW_UPDATE frame
  • Unknown Frame

这几种frame基本上列举了http2 frame中所有的类型。

我们要做的就是自定义一个handler类,继承Http2ConnectionHandler,然后实现Http2FrameListener接口即可。

1
scala复制代码    public final class CustHttp2Handler extends Http2ConnectionHandler implements Http2FrameListener

在使用clear text从HTTP1.1升级到HTTP2的过程中,我们需要处理两个事情,第一个事情就是处理http1.1使用http头升级到http2,可以重写继承自Http2ConnectionHandler的userEventTriggered方法,通过判断event的类型是否是UpgradeEvent,来触发对应的Http2FrameListener接口中的方法,比如这里的onHeadersRead:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码
/**
* 处理HTTP upgrade事件
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent) {
HttpServerUpgradeHandler.UpgradeEvent upgradeEvent =
(HttpServerUpgradeHandler.UpgradeEvent) evt;
onHeadersRead(ctx, 1, upgradeToHttp2Headers(upgradeEvent.upgradeRequest()), 0 , true);
}
super.userEventTriggered(ctx, evt);
}

upgradeToHttp2Headers方法将传入的FullHttpRequest,转换成为Http2Headers:

1
2
3
4
5
6
7
8
9
10
11
scss复制代码    private static Http2Headers upgradeToHttp2Headers(FullHttpRequest request) {
CharSequence host = request.headers().get(HttpHeaderNames.HOST);
Http2Headers http2Headers = new DefaultHttp2Headers()
.method(HttpMethod.GET.asciiName())
.path(request.uri())
.scheme(HttpScheme.HTTP.name());
if (host != null) {
http2Headers.authority(host);
}
return http2Headers;
}

还有一个要实现的方法,就是sendResponse方法,将数据写回给客户端,回写需要包含headers和data两部分,如下所示:

1
2
3
4
5
6
7
8
scss复制代码    /**
* 发送响应数据到客户端
*/
private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) {
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
encoder().writeData(ctx, streamId, payload, 0, true, ctx.newPromise());
}

总结

到此,一个处理clear text从HTTP1.1升级到HTTP2的handler就做好了。加上之前讲解的TLS扩展协议的支持,就构成了一个完整的支持http2的netty服务器。

本文的例子可以参考:learn-netty4

本文已收录于 www.flydean.com/27-netty-ht…

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

很多教程中的Spring Security JWT 过滤器的

发表于 2021-10-22

小知识,大挑战!本文正在参与“ 程序员必备小知识 ”创作活动

本文同时参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金

最近有个同学提了个问题,说他在Spring Security中用JWT做退出登录的时无法获取当前用户,导致无法证明“我就是要退出的那个我”,退出失败!经过我一番排查找到了原因,而且这个错误包括我自己在内的大部分人都犯过。

Session会话

之所以要说Session会话,是因为Spring Security默认配置就是有会话的,所以当你登录以后Session就会由服务端保持直到你退出登录。只要Session保持住,你的请求只要进入服务器就可以从ServletRequest中获取到当前的HttpSession,然后会根据HttpSession来加载当前的SecurityContext。相关的逻辑在Spring Security默认的过滤器SecurityContextPersistenceFilter中,有兴趣可以看相关的源码。

而且默认情况下SecurityContextPersistenceFilter的优先级是高于退出过滤器LogoutFilter的,所以能够保证有Session会话的情况下退出一定能够获取当前用户。

无Session会话

使用了JWT后,每次请求都要携带Bearer Token并且被专门的过滤器拦截解析之后才能将用户认证信息保存到SecurityContext中去。参考Spring Security实战干货教程中的Token认证实现JwtAuthenticationFilter,相关逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码 // 当token匹配         
 if (jwtToken.equals(accessToken)) {
                   // 解析 权限集合 这里
                 JSONArray jsonArray = jsonObject.getJSONArray("roles");
                 List<String> roles = jsonArray.toList(String.class);
                 String[] roleArr = roles.toArray(new String[0]);
 ​
                 List<GrantedAuthority> authorities = AuthorityUtils.createAuthorityList(roleArr);
                 User user = new User(username, "[PROTECTED]", authorities);
                 // 构建用户认证token
                 UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken = new UsernamePasswordAuthenticationToken(user, null, authorities);
                 usernamePasswordAuthenticationToken.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
                 // 放入安全上下文中
                 SecurityContextHolder.getContext().setAuthentication(usernamePasswordAuthenticationToken);
            } else {
                 // token 不匹配
                 if (log.isDebugEnabled()){
                     log.debug("token : {} is not in matched", jwtToken);
                }
 ​
                 throw new BadCredentialsException("token is not matched");
            }

为什么退出登录无法获取当前用户

分析了两种情况下用户认证信息的安全上下文配置后,我们回到问题的本身。来看看为什么用JWT会出现无法获取当前认证信息的原因。在HttpSecurity中,那位同学是这样配置JwtAuthenticationFilter的顺序的:

1
java复制代码   httpSecurity.addFilterBefore(jwtAuthenticationFilter, UsernamePasswordAuthenticationFilter.class)

我们再看看Spring Security过滤器排序图:

Spring Security过滤器排序

也就说LogoutFilter执行退出的时候,JWT还没有被JwtAuthenticationFilter拦截,当然无法获取当前认证上下文SecurityContext。

解决方法

解决方法就是必须在LogoutFilter执行前去解析JWT并将成功认证的信息存到SecurityContext。我们可以这样配置:

1
java复制代码   httpSecurity.addFilterBefore(jwtAuthenticationFilter, LogoutFilter.class)

这样当使用携带JWT调用退出登录时,会先把用户的信息放到SecurityContext中,就不会出现无法获取认证信息的问题了。Keycloak的Spring Security适配器就是这么配置的。不过也有同学认为无状态的退出直接让JWT失效就可以了,不必这么麻烦。对此你怎么看,欢迎留言讨论。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SignalR 从开发到生产部署闭坑指南

发表于 2021-10-22

前天倒腾了一份[SignalR在react/go技术栈的实践], 步骤和思路大部分是外围框架的应用, 今天趁热打铁, 给一个我总结的SignalR避坑指南。

1.SignalR 默认协商

不管是.NET客户端还是JavaScript客户端,构建连接时都存在一个默认配置:SkipNegotiation=fasle,负负得正就等于要求协商,这个默认配置的完整含义是 建立SignalR连接时,客户端要求协商传输方式。

对应产生下图:

小技巧:如果你确定你的网络环境能稳定的走websocket传输, 为了快速建立实时通信,可跳过协商请求(设置SkipNegotiation=true), 毕竟每次刷新页面,react组价都会重新加载,重新协商再传输 费时费力。

1
2
3
4
5
6
7
8
9
php复制代码 const connection = new HubConnectionBuilder()
.withUrl(process.env.REACT_APP_APIBASEURL+"realtime", {
skipNegotiation: true,
transport: HttpTransportType.WebSockets
})
.withAutomaticReconnect()
.withHubProtocol(new JsonHubProtocol())
.configureLogging(LogLevel.Information)
.build();

注意: SkipNegotiation=true,仅限于客户端的传输方式指定为 websocket, 其他方式均会报错。

2.SignalR 传输协商是fetch请求

跟ajax一样,fetch请求也是浏览器脚本的一种,所以很明显也会涉及跨域,标准的CORS方案依然对其有效。

http://localhost:9598/realtime/negotiate?negotiateVersion=1

Post请求

有自定义的请求头 X-Requested-With, X-Signalr-User-Agent

很明显,这又会触发预检Option请求

故你还需要在使用 CORS Middleware时允许这几个自定义请求头。

1
2
3
4
5
6
7
8
9
10
11
12
php复制代码  // 下面是Go github.com/rs/cors package 支持CORS的代码

c := cors.New(cors.Options{
// AllowedOrigins: []string{"http://localhost:3000","http://rosenbridge.17usoft.com"},
AllowOriginFunc: func(origin string) bool {
return true
},
AllowedMethods: []string{"POST", "GET", "OPTIONS", "PUT", "DELETE"}, // 下面要加上signalr传输协商要用到的自定义请求头
AllowedHeaders: []string{"Content-Type", "x-requested-with", "x-signalr-user-agent"},
AllowCredentials: true,
Debug: cfg.Log.Debug,
})
  1. websocket也有同源限制

ws://localhost:9598/realtime?id=aoSD_WZhqbRfPyXVTYsHig==

WebSocket也有同源限制,但是标准的CORS对其无效,因为CORS解决是HTTP脚本请求的跨域问题,WebSocket说到底不算http协议。

浏览器依旧会为我们携带Origin标头,所以服务端需要验证这些标头,确保只允许来自预期来源的WebSocket。

1
2
3
4
5
6
7
8
9
10
ini复制代码// 以下是.NET Core 针对websocket同源限制做出的跨域策略

var webSocketOptions = new WebSocketOptions()
{
KeepAliveInterval = TimeSpan.FromSeconds(120),
};
webSocketOptions.AllowedOrigins.Add("https://client.com");
webSocketOptions.AllowedOrigins.Add("https://www.client.com");

app.UseWebSockets(webSocketOptions);

btw, 我使用的GO SignalR库不支持WebSocket跨域, 我提了一个PR, 已经成功合并,兴奋,这是我首次向开源项目提MR且获得通过的项目。

  1. 部署生产后,需要nginx支持

按照默认配置,一般会先协商,再使用websocket传输。

部署到生产之后,协商后优先使用WebSocket模式, 但是传输失败了, 自动降级为服务器发送事件SSE模式,传输成功。

浏览器开发者工具看不出啥端倪, 使用Fiddler抓包发现 400 状态码

网上搜索了一下,可能是生产的nginx不识别websocket标头。
在nginx配置里面添加如下配置就可以了

1
2
3
4
5
ini复制代码location / {
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}

以上是马甲哥整理的SignalR从开发到部署的闭坑指南,因为微信公众号内容发布后不方便重新编辑,后续有更多闭坑技能,会同步到这里。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

华为云数据库内核专家为您揭秘MySQL Volcano模型迭

发表于 2021-10-22

​​摘要:性能提升1400+倍,快来看MySQL Volcano模型迭代器的谓词位置优化详解。

本文分享自华为云社区《华为云数据库内核专家为您揭秘MySQLVolcano模型迭代器性能提升千倍的秘密》,作者:GaussDB 数据库 。

20年以上数据库内核研发经验。原IBMDB2数据库内核专家,专长数据库内核性能优化、SQL查询优化、MPP分布式数据仓库技术等。现就职于华为加拿大研究所,全程参与了RDS for MySQL以及GaussDB(for MySQL)的研发工作,熟悉GaussDB(for MySQL) 全栈技术。负责NDP的总体架构设计和实现,并成功落地上线。拥有多项技术发明专利,并co-author了SIGMOD 2020 Taurus( GaussDB(forMySQL)) Paper,目前专注于下一代云数据库智能优化器的研究。

一、背景介绍

MySQL 8.0.18引入了一个新的SQL执行引擎,它遵循了Volcano模型。该模型的关键思想是将所有操作建模为“迭代器”。迭代器提供基本迭代组件:初始化、迭代和终止。所有迭代器都提供如以上相同的接口,因此迭代器可以任意组合堆叠在一起,形成执行计划。

MySQL 8.0.18还包括一个新的连接方法:哈希连接。哈希连接有探测端和构建端。哈希表是使用构建端的连接列作为哈希键值构建的;然后使用探测端的连接列来查找哈希表中的匹配行。

关于Volcano模型和哈希连接的细节不在本文的范围内。本文重点讨论一个问题,即哈希连接的谓词没有附加到合适的哈希连接迭代器,该问题可能会导致严重的性能下降。请注意:这不是一个功能问题,因为尽管有显著的性能下降,但最终的查询结果是正确的。

华为云数据库内核专家林舒向MySQL官方提交了此错误报告,以及对应的补丁,具体信息请参考这里:bugs.mysql.com/?id=104760。…

本文中的查询在MySQL 8.0.26上测试,使用100MB的TPC-H数据库。为了说明问题,在查询语句中使用了索引提示(Index Hint)来促使SQL优化器选择哈希连接。

二、问题描述

该问题使用的查询及其执行计划如下:

问题查询:

1
2
3
4
5
sql复制代码explain format=tree
select avg(case ps_partkey when null then 1 else l_quantity end)
from lineitem left outer join
( partsupp ignore index (primary) join part ignore index (primary) on ps_partkey = p_partkey and p_name like '%snow%')
on ps_suppkey = l_suppkey and ps_partkey = l_partkey

执行计划:

1
2
3
4
5
6
7
8
9
sql复制代码-> Aggregate: avg((case partsupp.PS_PARTKEY when NULL then 1 else lineitem.L_QUANTITY end))  (cost=179829230844583.70 rows=899131908749360)
-> Left hash join (part.P_PARTKEY = lineitem.L_PARTKEY), (partsupp.PS_PARTKEY = lineitem.L_PARTKEY), (partsupp.PS_SUPPKEY = lineitem.L_SUPPKEY) (cost=89916039969647.67 rows=899131908749360)
-> Table scan on lineitem (cost=61043.00 rows=596410)
-> Hash
-> Filter: (part.P_NAME like '%snow%') (cost=2849092735.78 rows=1507573496)
-> Inner hash join (no condition) (cost=2849092735.78 rows=1507573496)
-> Table scan on partsupp (cost=0.23 rows=77726)
-> Hash
-> Table scan on part (cost=0.01 rows=19396)

注意以上查询计划的内哈希连接(Inner hash join)部分,partsupp和part之间的连接是“无条件”的,而谓词(part.P_NAME like“%snow%”)是在内哈希连接完成之后才被应用来过滤结果集的。观察原始查询语句,我们会注意到,partsupp和part之间存在一个连接谓词(ps_partkey = p_partkey),这个谓词去哪里了呢?它隐含在外层的左哈希连接(Left hash join)的连接谓词中,即 (part.P_PARTKEY =lineitem.L_PARTKEY)和(partupp.PS_PartKEY=lineitem.L_PartKEY)这两个谓词描述里。在内哈希连接中缺少谓词会导致性能问题,因为它使得有谓词的连接操作被替换为笛卡尔积,由于缺少连接条件进行过滤,结果集会被放大。此外,本地谓词(part.P_NAME like“%snow%”),可以在内哈希连接之前被应用,提前过滤掉无效的行。

三、原因分析

这个问题发生的场景是,一个语句使用了外连接,而这个外连接选用哈希连接来完成,并且这个外连接的一部分涉及到了多于一张表。当这些条件同时具备,就可能会引发此问题。

在哈希连接中,其构建端是无法访问探测端上的任何列,反之亦然。一个引用了双端的谓词只能放置在哈希连接迭代器上。当MySQL优化器为表分配谓词时,它假定谓词可以引用表前面出现的所有表,然而这显然不适用于哈希连接。因此,当优化器的计划转换为迭代器计划时,优化器到迭代器的转换代码需要做出额外的补救,现在MySQL的转换代码中缺少相关的处理,导致了本文的问题。

四、如何修复

针对这个问题的修复主要专注在优化器结构到迭代器转换代码中的主函数:ConnectJoins()。基本的思路是让该函数知道哪些表在当前不是可用的,因为这些表位于哈希连接的另一边。当函数将谓词放置在迭代器上时,尚未应用的谓词将沿着迭代器向上推,并在所需的表可用后立即被应用。

以下是在MySQL 8.0.26之上应用修复后的执行计划。partupp和part之间的内哈希连接现在有一个连接谓词:partupp.PS_PARTKEY=part.P_KEY。另外 ,本地谓词(part.P_NAMElike“%snow%”)现在出现在内部连接下面,也就是先于内哈希连接而被应用。

1
2
3
4
5
6
7
8
9
sql复制代码-> Aggregate: avg((case partsupp.PS_PARTKEY when NULL then 1 else lineitem.L_QUANTITY end))  (cost=179829230844583.70 rows=899131908749360)
-> Left hash join (part.P_PARTKEY = lineitem.L_PARTKEY), (partsupp.PS_SUPPKEY = lineitem.L_SUPPKEY) (cost=89916039969647.67 rows=899131908749360)
-> Table scan on lineitem (cost=61043.00 rows=596410)
-> Hash
-> Inner hash join (partsupp.PS_PARTKEY = part.P_PARTKEY) (cost=2849092735.78 rows=1507573496)
-> Table scan on partsupp (cost=0.23 rows=77726)
-> Hash
-> Filter: (part.P_NAME like '%snow%') (cost=0.01 rows=19396)
-> Table scan on part (cost=0.01 rows=19396)

下面是应用补丁前后的查询时间比较:打补丁前,查询耗时需要11分37秒

打补丁后,查询耗时仅需0.49秒

11分37秒 vs 0.49秒,修改前后的性能差距有1400多倍,区别是巨大的。希望这个问题可以在下一个MySQL版本中得到解决。

我们知道,MySQL社区的发展离不开每个数据库领域从业人员的努力,华为云GaussDB也一直重视开源社区的发展,积极对社区版本进行优化和改进,为社区做贡献。本次MySQLVolcano模型迭代器的谓词位置优化,助力MySQL查询性能提升千倍,正是华为云GaussDB 对社区发展的积极反馈。

另外,告诉大家一个好消息,华为云数据库专场活动正在进行,云数据库MySQL包年19.9元起,助力企业无忧上云,更多活动详情戳:activity.huaweicloud.com/dbs_Promoti…

参考资料:

[1] G. Graefe, “Volcano— An Extensible andParallel Query Evaluation System,” IEEE Transactions on Knowledge and DataEngineering, pp. 120-135, 1994.

[2] “WL#11785: Volcano iteratordesign,” [Online]. Available:dev.mysql.com/worklog/tas….

[3] “WL#12074: Volcano iterator executorbase,” [Online]. Available: dev.mysql.com/worklog/tas….

[4] “WL#12470: Volcano iteratorsemijoin,” [Online]. Available:dev.mysql.com/worklog/tas….

[5] “Hash Join Optimization,” [Online].Available: dev.mysql.com/doc/refman/….

[6] “WL#2241: Hash join,” [Online].Available: dev.mysql.com/worklog/tas….

“TPC-HHomepage,” [Online]. Available: www.tpc.org/tpch/.

点击关注,第一时间了解华为云新鲜技术~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java程序员必须知道的Java9特性

发表于 2021-10-22

小知识,大挑战!本文正在参与“ 程序员必备小知识 ”创作活动

本文同时参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金

Java 17 是Java 最重要的一个LTS版本之一,但是直接从Java 8 过渡到Java 17跨度太大了,势必有些困难。所以胖哥抽时间梳理了一下从Java 9到Java 17的一些常用API的变动。今天先来看看Java 9 都有什么东西。

Java 9

Java 9 最大的变化就是引入了一个JShell和模块化,日常并没有用太多,所以今天不花时间在这些功能上面。

新的创建集合的方法

用过谷歌Guava类库的知道,Guava提供了创建集合的静态工厂方法,而且能够推断泛型,举个例子:

1
2
ini复制代码 List<Person> list = Lists.newArrayList();
 Map<KeyType, Person> map = Maps.newLinkedHashMap();

而原生态需要各种new来定义。Java 9中改善了这一现状,现在你可以:

1
2
3
4
5
6
ini复制代码 // [1, 2, 3, 4]
 List<Integer> integers = List.of(1, 2, 3, 4);
 // {1,2,3}
 Set<Integer> integerSet = Set.of(1, 2, 3);
 // {"hello":"world","hi":"java"}
 Map<String, String> hello = Map.of("hello", "world", "hi", "java");

但是请注意:这些API创建的集合都是不可变的(Immutable),你不能对这些集合进行增删改。

Stream扩展

Stream API 是Java 8中引入的最重要的特性之一。在Java 9中Stream进一步得到了加强。

ofNullable

Stream<T> ofNullable(T t) 返回包含单个元素的顺序Stream ,如果非空,否则返回空Stream 。这个比较简单就不举例了。

iterate

1
r复制代码 Stream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next)

这个是用来生成有限流的新迭代实现。

  • seed 初始种子值
  • hasNext 用来判断何时结束流,这个与seed有关。如何该函数不迭代保留seed计算,返回的流可能为空。
  • next函数用来计算下一个元素值。

举个例子:

1
2
css复制代码 Stream.iterate(0, i -> i < 5, i -> i + 1)
  .forEach(System.out::println);

等同于传统的:

1
2
3
4
css复制代码 for (int i = 0; i < 5; ++i) {
     System.out.println(i);
 }
 ​

takeWhile

Stream.takeWhile(Predicate) Stream中元素会被断言Predicate,一旦元素断言为false就中断操作,忽略掉没有断言的元素(及时未断言中的元素有满足条件的),仅仅把之前满足元素返回。

1
2
3
scss复制代码 Stream.of(1, 2, 3, 4, 2, 5)
        .takeWhile(x -> x < 4)
        .forEach(System.out::println);

上面的例子中只会输出1、2、3。

dropWhile

这个API和takeWhile机制类似,也用来筛选Stream中的元素。不过符合断言的元素会被从Stream中移除。一旦元素断言为false,就会把断言为false的元素以及后面的元素统统返回。

1
2
3
scss复制代码 Stream.of(1, 2, 3, 4, 2, 5)
        .dropWhile(x -> x < 4)
        .forEach(System.out::println);

上面的例子会输出4、2、5。

和filter操作可不一样啊,切记!

Optional扩展

Optional增加了三个有用的API。

  • stream() Optional现在可以转Stream。
  • ifPresentOrElse(Consumer<? super T> action, Runnable emptyAction) 如果有值了怎么消费,没有值了怎么消费。
  • or(Supplier<? extends Optional<? extends T>> supplier) 如果有值就返回有值的Optional,否则就提供能获取一个有值的Optional的渠道(Supplier)。

try-with-resources 优化

在Java 7 中引入了try-with-resources功能,保证了每个声明了的资源在语句结束的时候都会被关闭。任何实现了java.lang.AutoCloseable接口的对象,和实现了java.io.Closeable接口的对象,都可以当做资源使用。

Java 7时需要这样写:

1
2
3
4
5
6
java复制代码     try (BufferedInputStream bufferedInputStream = new BufferedInputStream(System.in);
              BufferedInputStream bufferedInputStream1 = new BufferedInputStream(System.in)) {
             // do something
        } catch (IOException e) {
             e.printStackTrace();
        }

而到了Java 9简化为:

1
2
3
4
5
6
7
8
9
ini复制代码 BufferedInputStream bufferedInputStream = new BufferedInputStream(System.in);
 BufferedInputStream bufferedInputStream1 = new BufferedInputStream(System.in);
 ​
 try (bufferedInputStream;
      bufferedInputStream1) {
     // do something
 } catch (IOException e) {
     e.printStackTrace();
 }

接口私有方法

继Java 8 引入了接口静态方法和接口默认方法后,又引入了接口私有方法:

1
2
3
4
5
6
7
8
csharp复制代码 public interface Catable {
     /**
      * 接口私有方法
      */
     private void doSomething() {
 ​
    }
 }

引入HttpClient

定义一个新的 HTTP 客户端 API 来实现 HTTP/2 和 WebSocket,并且可以替换旧的HttpURLConnectionAPI。Java以前原生的确实难用,所以诞生了Apache HttpClientComponents 、OkHttp等好用的客户端。新的也不怎么好用,不过也算从零到一了。

1
2
3
4
5
6
7
8
scss复制代码 HttpRequest httpRequest = HttpRequest.newBuilder(newURI)
        .header("Content-Type","*/*")
        .GET()
        .build();
 HttpClient httpClient = HttpClient.newBuilder()
        .connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
        .version(HttpClient.Version.HTTP_2)
        .build();

Flow

Spring WebFlux响应式Web框架已经4年了,响应流规范(reactive streams)在Java 9 中也初步引入到了JDK中。这个东西目前还有些先进,胖哥还没找到具体应用场景,先挖个坑。

总结

其实Java 9 还有一些底层的优化,不过对于普通开发者来说了解这些就够用了。上面几个特性,比较常用的就是静态不变集合、try-with-resources优化。 其它的特性需要你对Java 8非常熟练的情况下才会锦上添花。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Centos安装和卸载docker

发表于 2021-10-22

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

查询当前容器:docker container ls -all

删除当前容器:docker container rm mycentos(提示: 这一步要确定删除容器没问题的情况下, 才可以做)

查看docker 列表

docker ps

进入docker

docker attach 234kadhf234adf

自己随便yum 安装了docker 安装了发现一直启动不了,一直报Failed to start docker.service: Unit not found. 这个错误,于是卸载重装,记录一下这个坑爹的过程

一、查看自己已经安装的docker

1
perl复制代码yum list installed |grep docker

输入后可以清楚的看到控制台会输出

1
2
3
ini复制代码[root@xxwlog docker]# yum list installed |grep docker
docker-client.x86_64 2:1.13.1-162.git64e9980.el7.centos
docker-common.x86_64 2:1.13.1-162.git64e9980.el7.centos

二、卸载已经安装的docker

1
arduino复制代码yum -y remove docker-ce.x86_64

client会自动卸载掉如果没有卸载掉继续使用yum -y remove 卸载

再次输入 yum list installed |grep docker 查看是否删除成功

三、以正确的姿态安装docker

这里推荐使用Docker Engineshequ社区的存储库安装docker。所以在安装之前需要设置Docker存储库。之后,您可以从存储库安装和更新Docker。

3.1 安装存储库:

安装所需的包。yum-utils提供了yum-config-manager 效用,并device-mapper-persistent-data和lvm2由需要 devicemapper存储驱动程序。

1
kotlin复制代码sudo yum install -y yum-utils device-mapper-persistent-data lvm2

3.2设置稳定存储库yum源为阿里docker源

1
arduino复制代码yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

3.3 第一种安装方式 最新版本安装

1
复制代码sudo yum install -y docker-ce

安装完成后,请直接跳过3.4

3.4 特定版本安装

要安装特定版本的Docker Engine - Community,请列出repo中的可用版本,然后选择并安装

3.4.1列出可安装版本

1
bash复制代码yum list docker-ce --showduplicates | sort -r

3.4.2 安装指定版本

1
2
bash复制代码#从安装列表中指定版本17.09.0
yum install docker-ce-17.09.0.ce -y

3.5 启动并加入开启启动

1
2
bash复制代码sudo systemctl start docker
sudo systemctl enable docker

3.6 验证安装是否成功(有client和service两部分表示docker安装启动都成功了)

1
复制代码docker version

3.7 验证启动成功

1
css复制代码docker ps -a

3.8 常用管理命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bash复制代码#docker启动
sudo systemctl start docker

#设置开机自启动
sudo systemctl enable docker

#重启docker
sudo systemctl restart docker

#关闭docker
sudo systemctl stop docker

#查看是否启动成功
docker ps -a

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

微服务SpringCloud项目(十二):初步整合rabbi

发表于 2021-10-22

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

📖前言

1
复制代码心态好了,就没那么累了。心情好了,所见皆是明媚风景。

“一时解决不了的问题,那就利用这个契机,看清自己的局限性,对自己进行一场拨乱反正。”正如老话所说,一念放下,万般自在。如果你正被烦心事扰乱心神,不妨学会断舍离。断掉胡思乱想,社区垃圾情绪,离开负面能量。心态好了,就没那么累了。心情好了,所见皆是明媚风景。

1
2
3
4
5
6
7
8
9
10
11
text复制代码十年生死两茫茫,写程序,到天亮。

千行代码,Bug何处藏。

纵使上线又怎样,朝令改,夕断肠。

领导每天新想法,天天改,日日忙。

相顾无言,惟有泪千行。

每晚灯火阑珊处,夜难寐,再写两行!

RabbitMQ 简介

AMQP,即 Advanced Message Queuing Protocol ,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX 。用于在 分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

要学习 RabbitMQ 最靠谱的地方当然是他的官网:www.rabbitmq.com/,其他地方难免有解读误…

1. 引入依赖


本次需要创建2个子项目,一个 rabbitmq-provider (生产者),一个 rabbitmq-consumer(消费者)。

首先创建 rabbitmq-provider,

pom.xml 里用到的jar依赖:

1
2
3
4
5
6
7
8
9
xml复制代码        <!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

2. 然后 application.yml:


ps:里面的 虚拟 / 配置项 不是必须的,你们不创建,就不用加这个配置项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
yaml复制代码srping: 
# rabbitmq消息队列配置
rabbitmq:
password: 账号
username: 密码
port: 5672
addresses: 地址
#开启发送失败返回
publisher-returns: true
#配置确认回调
publisher-confirm-type: correlated
listener:
simple:
#指定最小的消费者数量.
concurrency: 5
#指定最大的消费者数量.
max-concurrency: 10
#开启ack
acknowledge-mode: auto
# 最多一次消费多少条数据 -限流
prefetch: 1
#开启ack
direct:
acknowledge-mode: auto
#支持消息的确认与返回
template:
mandatory: true

3. direct exchange(直连型交换机)


创建 DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码package com.cyj.dream.test.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Description: 直连型交换机
* 1. 接着我们先使用下direct exchange(直连型交换机),创建DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,
* 在注释里有说明,后面的不同交换机的配置就不做同样说明了):
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.test.config
* @Author: ChenYongJia
* @CreateTime: 2021-10-18
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Configuration
public class DirectRabbitConfig {

/**
* 队列 起名:TestDirectQueue
*
* @return
*/
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);

//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue", true);
}

/**
* Direct交换机 起名:TestDirectExchange
*
* @return
*/
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange", true, false);
}

/**
* 绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
*
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}


@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}

}

1. SendMessageController 推送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码package com.cyj.dream.test.controller;

import com.cyj.dream.core.util.user.UUIDUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/**
* @Description: 消息推送接口
* 1. 再写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.test.controller
* @Author: ChenYongJia
* @CreateTime: 2021-10-18
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@RestController
public class SendMessageController {

/**
* 使用RabbitTemplate,这提供了接收/发送等等方法
*/
@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "这是一条测试消息, 你好啊骚年!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}

}

2. 把生产者项目项目运行,调用下接口:

1634864786.jpg

因为我们目前还没弄消费者 rabbitmq-consumer,消息没有被消费的,我们可以去 rabbitMq 管理页面看看,是否推送成功,这里就不多说了。

3. 接下来,创建 rabbitmq-consumer 项目:

pom.xml 的依赖加入

1
2
3
4
5
6
7
8
9
xml复制代码        <!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

然后是 application.yml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
yaml复制代码spring: 
# rabbitmq消息队列配置
rabbitmq:
password: 账号
username: 密码
port: 5672
addresses: 地址
#虚拟host 可以不设置,使用server默认host
#virtual-host: JCcccHost
#开启发送失败返回
publisher-returns: true
#配置确认回调
publisher-confirm-type: correlated
listener:
simple:
#指定最小的消费者数量.
concurrency: 5
#指定最大的消费者数量.
max-concurrency: 10
#开启ack
acknowledge-mode: auto
# 最多一次消费多少条数据 -限流
prefetch: 1
#开启ack
direct:
acknowledge-mode: auto
#支持消息的确认与返回
template:
mandatory: true

4. 创建消息接收监听类

DirectReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.cyj.dream.file.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @Description: 消息接收监听类,DirectReceiver.java:
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.listener
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {

@RabbitHandler
public void process(Map testMessage) {
log.info("DirectReceiver消费者收到消息 : {}", testMessage.toString());
}

}

然后将 rabbitmq-consumer 项目运行起来,可以看到把之前推送的那条消息消费下来了:(继续调用 rabbitmq-provider 项目的推送消息接口,你将可以看到消费者即时消费消息:)

1634864787(1).jpg

等你尝试:直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?将以轮询的方式对消息进行消费,而且不存在重复消费。

4. Topic Exchange 主题交换机。


在生产者项目中创建TopicRabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码package com.cyj.dream.test.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Description: 通配符(话题)模式
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.config
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Configuration
public class TopicRabbitConfig {

/**
* 绑定键
*/
public final static String man = "topic.man";

public final static String woman = "topic.woman";

@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}

@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}


/**
* 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
*
* 这样只要是消息携带的路由键是topic.man,才会分发到该队列
* @return
*/
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}

/**
* 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
* 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
* @return
*/
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}

}

1. 在接口添加方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码// ================================= 生产者使用 Topic 话题模式 ================================

@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}

@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}

2. 在消费者项目创建 TopicManReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.cyj.dream.file.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @Description: Topic话题模式消费者
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.listener
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {

@RabbitHandler
public void process(Map testMessage) {
log.info("TopicManReceiver消费者收到消息 : " + testMessage.toString());
}

}

3. 再建一个 TopicTotalReceiver 监听 topic.woman

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.cyj.dream.file.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @Description: Topic话题模式总数监听器
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.listener
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {

@RabbitHandler
public void process(Map testMessage) {
log.info("TopicTotalReceiver消费者收到消息 : " + testMessage.toString());
}

}

4. 添加 主题交换机 配置 TopicRabbitConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码package com.cyj.dream.file.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Description: 通配符(话题)模式--消费者一定要加这个配置吗? 不需要的其实
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.file.config
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Configuration
public class TopicRabbitConfig {

/**
* 绑定键
*/
public final static String man = "topic.man";

public final static String woman = "topic.woman";

@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}

@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}


/**
* 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
*
* 这样只要是消息携带的路由键是topic.man,才会分发到该队列
* @return
*/
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}

/**
* 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
* 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
* @return
*/
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}

}

5. 重启生产者和消费者,然后分别调用两个新加的接口,然后看消费者 rabbitmq-consumer 的控制台输出情况:

1634865296(1).jpg

1
2
3
java复制代码TopicTotalReceiver消费者收到消息  : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N }
TopicManReceiver消费者收到消息 : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N }
TopicTotalReceiver消费者收到消息 : {createTime=2021-10-22 09:15:11, messageId=0020ad91b22d49d28b49e5bc4d7b3550, messageData=message: woman is all }

5. 消息回调


那么接下来我们继续讲讲消息的回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)。

在 rabbitmq-provider 项目的 application.yml 文件上,加上消息确认的配置项后:

ps: 本篇文章使用 springboot 版本为 2.3.6.RELEASE ; 如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为版本导致的配置项不起效,可以把 publisher-confirm-type: correlated 替换为 publisher-confirms: true

1. 配置相关的消息确认回调函数

RabbitConfig.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码package com.cyj.dream.test.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Description: 配置相关的消息确认回调函数
* @BelongsProject: DreamChardonnay
* @BelongsPackage: com.cyj.dream.rabbitmq.config
* @Author: ChenYongJia
* @CreateTime: 2021-10-19
* @Email: chen87647213@163.com
* @Version: 1.0
*/
@Slf4j
@Configuration
public class RabbitConfig {

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("ConfirmCallback: " + "相关数据:" + correlationData);
log.info("ConfirmCallback: " + "确认情况:" + ack);
log.info("ConfirmCallback: " + "原因:" + cause);
}
});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("ReturnCallback: " + "消息:" + message);
log.info("ReturnCallback: " + "回应码:" + replyCode);
log.info("ReturnCallback: " + "回应信息:" + replyText);
log.info("ReturnCallback: " + "交换机:" + exchange);
log.info("ReturnCallback: " + "路由键:" + routingKey);
}
});

return rabbitTemplate;
}

}

2. 触发条件

到这里,生产者推送消息的消息确认调用回调函数已经完毕。可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;那么这两种回调函数在什么情况会触发呢?

先从总体的情况分析,推送消息存在四种情况:

  1. 消息推送到server,但是在server里找不到交换机
  2. 消息推送到server,找到交换机了,但是没找到队列
  3. 消息推送到server,交换机和队列啥都没找到
  4. 消息推送成功

3. 分别测试一下

第一种
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@GetMapping("/TestMessageAck")
public String TestMessageAck() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "message: non-existent-exchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
return "ok";
}

调用结果如下:(这种情况触发的是 ConfirmCallback 回调函数。)

1
2
3
java复制代码2021-10-22 09:22:33.335  INFO 17412 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig   : ConfirmCallback:     相关数据:null
2021-10-22 09:22:33.335 INFO 17412 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 确认情况:false
2021-10-22 09:22:33.335 INFO 17412 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
第二种

这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在 DirectRabitConfig 里面新增一个直连交换机,名叫 ‘lonelyDirectExchange’ ,但没给它做任何绑定配置操作:

1
2
3
4
typescript复制代码@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}

然后写个测试接口,把消息推送到名为 ‘lonelyDirectExchange’ 的交换机上(这个交换机是没有任何队列配置的):

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@GetMapping("/TestMessageAck2")
public String TestMessageAck2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: lonelyDirectExchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
return "ok";
}

结果如下:(这种情况触发的是 ConfirmCallback 和 RetrunCallback 两个回调函数。两个函数都被调用了;这种情况下,消息是推送成功到服务器了的,所以 ConfirmCallback 对消息确认情况是 true)

1
2
3
4
5
6
7
8
java复制代码2021-10-22 09:26:33.881  INFO 26800 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig   : ConfirmCallback:     相关数据:null
2021-10-22 09:26:33.881 INFO 26800 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 确认情况:true
2021-10-22 09:26:33.881 INFO 26800 --- [nectionFactory2] com.cyj.dream.test.config.RabbitConfig : ConfirmCallback: 原因:null
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 消息:(Body:'{createTime=2021-10-22 09:26:33, messageId=660c817afa474d8f899b9b7ace6d1386, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 回应码:312
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 回应信息:NO_ROUTE
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 交换机:lonelyDirectExchange
2021-10-22 09:26:33.882 INFO 26800 --- [nectionFactory1] com.cyj.dream.test.config.RabbitConfig : ReturnCallback: 路由键:TestDirectRouting
第三种–这种情况触发的是 ConfirmCallback 回调函数。

消息推送到sever,交换机和队列啥都没找到,这种情况其实跟 第一种 很像,第三种 和 第一种 情况回调是一致的,就不做结果说明了。

第四种–消息推送成功

按照正常调用之前消息推送的接口就行,这种情况触发的是 ConfirmCallback 回调函数。


PS:今天先说到这里吧,明天聊聊 消息确认机制,最后感谢大家耐心观看完毕,留个点赞收藏是您对我最大的鼓励!


🎉总结:

  • 更多参考精彩博文请看这里:《陈永佳的博客》
  • 喜欢博主的小伙伴可以加个关注、点个赞哦,持续更新嘿嘿!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…477478479…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%