开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

我就想存个文件,怎么这么麻烦 ?- k8s PV、PVC、S

发表于 2021-11-26

Docker

当我们使用 Docker 时,设置数据卷(Volume)还是比较简单的,只需要在容器映射指定卷的路径,然后在容器中使用该路径即可。

比如这种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
yaml复制代码# tomcat
tomcat01:
hostname: tomcat01
restart: always
image: jdk-tomcat:v8
container_name: tomcat8-1
links:
- mysql:mysql
volumes:
- /home/soft/docker/tomcat/webapps:/usr/local/apache-tomcat-8.5.39/webapps
- /home/soft/docker/tomcat/logs:/usr/local/apache-tomcat-8.5.39/logs
- /etc/localtime:/etc/localtime
environment:
JAVA_OPTS: -Dspring.profiles.active=prod
TZ: Asia/Shanghai
LANG: C.UTF-8
LC_ALL: zh_CN.UTF-8
env_file:
- /home/soft/docker/env/tomcat.env

为什么要设置 Volume? 当然是因为我们要持久化数据,要把数据存储到硬盘上。

k8s

到了 k8s 这儿,你会发现事情没那么简单了,涌现出了一堆概念:

  • Pv
  • Pvc
  • StorageClass
  • Provisioner
  • …

先不管这些复杂的概念,我只想存个文件,有没有简单的方式?

有,我们先回顾下基本概念。

我们知道,Container 中的文件在磁盘上是临时存放的,当容器崩溃时文件丢失。kubelet 会重新启动容器, 但容器会以干净的状态重启。所以我们要使用 Volume 来持久化数据。

Docker 也有 卷(Volume) 的概念,但对它只有少量且松散的管理。 Docker 卷是磁盘上或者另外一个容器内的一个目录 Docker 提供卷驱动程序,但是其功能非常有限。

Kubernetes 支持很多类型的卷。 Pod 可以同时使用任意数目的卷类型。

临时卷类型的生命周期与 Pod 相同,但持久卷可以比 Pod 的存活期长。 当 Pod 不再存在时,Kubernetes 也会销毁临时卷;不过 Kubernetes 不会销毁 持久卷。对于给定 Pod 中任何类型的卷,在容器重启期间数据都不会丢失。

卷的核心是一个目录,其中可能存有数据,Pod 中的容器可以访问该目录中的数据。 所采用的特定的卷类型将决定该目录如何形成的、使用何种介质保存数据以及目录中存放 的内容。

使用卷时,在 .spec.volumes 字段中设置为 Pod 提供的卷,并在 .spec.containers[*].volumeMounts 字段中声明卷在容器中的挂载位置。 各个卷则挂载在镜像内的指定路径上。 卷不能挂载到其他卷之上,也不能与其他卷有硬链接。Pod 配置中的每个容器必须独立指定各个卷的挂载位置。

通过上面的概念我们知道 Volume 有不同的类型,有临时的,也有持久的,那么我们先说说简单的,即解决“我只想存个文件,有没有简单的方式”的需求。

hostPath

hostPath 卷能将主机节点文件系统上的文件或目录挂载到你的 Pod 中。看个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
yaml复制代码apiVersion: v1
kind: Pod
metadata:
name: test-webserver
spec:
containers:
- name: test-webserver
image: k8s.gcr.io/test-webserver:latest
volumeMounts:
- mountPath: /var/local/aaa
name: mydir
- mountPath: /var/local/aaa/1.txt
name: myfile
volumes:
- name: mydir
hostPath:
# 确保文件所在目录成功创建。
path: /var/local/aaa
type: DirectoryOrCreate
- name: myfile
hostPath:
path: /var/local/aaa/1.txt
type: FileOrCreate

通过 hostPath 能够简单解决文件在宿主机上存储的问题。

不过需要注意的是:

HostPath 卷存在许多安全风险,最佳做法是尽可能避免使用 HostPath。 当必须使用 HostPath 卷时,它的范围应仅限于所需的文件或目录,并以只读方式挂载。

使用 hostPath 还有一个局限性就是,我们的 Pod 不能随便漂移,需要固定到一个节点上,因为一旦漂移到其他节点上去了宿主机上面就没有对应的数据了,所以我们在使用 hostPath 的时候都会搭配 nodeSelector 来进行使用。

emptyDir

emptyDir 也是比较常见的一种存储类型。

上面的 hostPath 显示的定义了宿主机的目录。emptyDir 类似隐式的指定。

Kubernetes 会在宿主机上创建一个临时目录,这个目录将来就会被绑定挂载到容器所声明的 Volume 目录上。而 Pod 中的容器,使用的是 volumeMounts 字段来声明自己要挂载哪个 Volume,并通过 mountPath 字段来定义容器内的 Volume 目录

当 Pod 分派到某个 Node 上时,emptyDir 卷会被创建,并且在 Pod 在该节点上运行期间,卷一直存在。 就像其名称表示的那样,卷最初是空的。 尽管 Pod 中的容器挂载 emptyDir 卷的路径可能相同也可能不同,这些容器都可以读写 emptyDir 卷中相同的文件。 当 Pod 因为某些原因被从节点上删除时,emptyDir 卷中的数据也会被永久删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml复制代码apiVersion: v1
kind: Pod
metadata:
name: test-pd
spec:
containers:
- image: k8s.gcr.io/test-webserver
name: test-container
volumeMounts:
- mountPath: /cache
name: cache-volume
volumes:
- name: cache-volume
emptyDir: {}

如果执行 kubectl describe 命令查看 pod 信息的话,可以验证前面我们说的内容:
“EmptyDir (a temporary directory that shares a pod’s lifetime)”

1
2
3
4
5
6
7
8
9
10
11
12
13
text复制代码...
Containers:
nginx:
Container ID: docker://07b4f89248791c2aa47787e3da3cc94b48576cd173018356a6ec8db2b6041343
Image: nginx:1.8
...
Environment: <none>
Mounts:
/usr/share/nginx/html from nginx-vol (rw)
...
Volumes:
nginx-vol:
Type: EmptyDir (a temporary directory that shares a pod's lifetime)

PV 和 PVC

  • PV(PersistentVolume): 持久化卷
  • PVC(PersistentVolumeClaim): 持久化卷声明

PV 和 PVC 的关系就像 java 中接口和实现的关系类似。

PVC 是用户存储的一种声明,PVC 和 Pod 比较类似,Pod 消耗的是节点,PVC 消耗的是 PV 资源,Pod 可以请求 CPU 和内存,而 PVC 可以请求特定的存储空间和访问模式。对于真正使用存储的用户不需要关心底层的存储实现细节,只需要直接使用 PVC 即可。

PV 是对底层共享存储的一种抽象,由管理员进行创建和配置,它和具体的底层的共享存储技术的实现方式有关,比如 Ceph、GlusterFS、NFS、hostPath 等,都是通过插件机制完成与共享存储的对接。

我们来看一个例子:

比如,运维人员可以定义这样一个 NFS 类型的 PV

1
2
3
4
5
6
7
8
9
10
11
12
13
yaml复制代码apiVersion: v1
kind: PersistentVolume
metadata:
name: nfs
spec:
storageClassName: manual
capacity:
storage: 1Gi
accessModes:
- ReadWriteMany
nfs:
server: 10.244.1.4
path: "/"

PVC 描述的,则是 Pod 所希望使用的持久化存储的属性。比如,Volume 存储的大小、可读写权限等等。

1
2
3
4
5
6
7
8
9
10
11
12
yaml复制代码
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: nfs
spec:
accessModes:
- ReadWriteMany
storageClassName: manual
resources:
requests:
storage: 1Gi

用户创建的 PVC 要真正被容器使用起来,就必须先和某个符合条件的 PV 进行绑定。

  • 第一个条件是 PV 和 PVC 的 spec 字段。比如,PV 的存储(storage)大小,就必须满足 PVC 的要求。
  • 第二个条件,则是 PV 和 PVC 的 storageClassName 字段必须一样

在成功地将 PVC 和 PV 进行绑定之后,Pod 就能够像使用 hostPath 等常规类型的 Volume 一样,在自己的 YAML 文件里声明使用这个 PVC 了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
yaml复制代码apiVersion: v1
kind: Pod
metadata:
labels:
role: web-frontend
spec:
containers:
- name: web
image: nginx
ports:
- name: web
containerPort: 80
volumeMounts:
- name: nfs
mountPath: "/usr/share/nginx/html"
volumes:
- name: nfs
persistentVolumeClaim:
claimName: nfs

我们前面使用的 hostPath 和 emptyDir 类型的 Volume 并不具备“持久化”特征,既有可能被 kubelet 清理掉,也不能被“迁移”到其他节点上。所以,大多数情况下,持久化 Volume 的实现,往往依赖于一个远程存储服务,比如:远程文件存储(比如,NFS、GlusterFS)、远程块存储(比如,公有云提供的远程磁盘)等等。

StorageClass

前面我们人工管理 PV 的方式就叫作 Static Provisioning。

一个大规模的 Kubernetes 集群里很可能有成千上万个 PVC,这就意味着运维人员必须得事先创建出成千上万个 PV。更麻烦的是,随着新的 PVC 不断被提交,运维人员就不得不继续添加新的、能满足条件的 PV,否则新的 Pod 就会因为 PVC 绑定不到 PV 而失败。在实际操作中,这几乎没办法靠人工做到。所以,Kubernetes 为我们提供了一套可以自动创建 PV 的机制,即:Dynamic Provisioning。

Dynamic Provisioning 机制工作的核心,在于一个名叫 StorageClass 的 API 对象。而 StorageClass 对象的作用,其实就是创建 PV 的模板。

具体地说,StorageClass 对象会定义如下两个部分内容:

  • 第一,PV 的属性。比如,存储类型、Volume 的大小等等。
  • 第二,创建这种 PV 需要用到的存储插件。比如,Ceph 等等。

有了这样两个信息之后,Kubernetes 就能够根据用户提交的 PVC,找到一个对应的 StorageClass 了。然后,Kubernetes 就会调用该 StorageClass 声明的存储插件,创建出需要的 PV。

在下面的例子中,PV 是被自动创建出来的。

1
2
3
4
5
6
7
8
9
10
11
12
yaml复制代码apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: claim1
spec:
accessModes:
- ReadWriteOnce
# 指定所使用的存储类,此存储类将会自动创建符合要求的 PV
storageClassName: fast
resources:
requests:
storage: 30Gi
1
2
3
4
5
6
7
yaml复制代码apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: fast
provisioner: kubernetes.io/gce-pd
parameters:
type: pd-ssd

StorageClass 的作用,则是充当 PV 的模板。并且,只有同属于一个 StorageClass 的 PV 和 PVC,才可以绑定在一起。StorageClass 的另一个重要作用,是指定 PV 的 Provisioner(存储插件)。这时候,如果你的存储插件支持 Dynamic Provisioning 的话,Kubernetes 就可以自动为你创建 PV 了。

Local PV

Kubernetes 依靠 PV、PVC 实现了一个新的特性,这个特性的名字叫作:Local Persistent Volume,也就是 Local PV。

Local PV 实现的功能就非常类似于 hostPath 加上 nodeAffinity,比如,一个 Pod 可以声明使用类型为 Local 的 PV,而这个 PV 其实就是一个 hostPath 类型的 Volume。如果这个 hostPath 对应的目录,已经在节点 A 上被事先创建好了,那么,我只需要再给这个 Pod 加上一个 nodeAffinity=nodeA,不就可以使用这个 Volume 了吗?理论上确实是可行的,但是事实上,我们绝不应该把一个宿主机上的目录当作 PV 来使用,因为本地目录的存储行为是完全不可控,它所在的磁盘随时都可能被应用写满,甚至造成整个宿主机宕机。所以,一般来说 Local PV 对应的存储介质是一块额外挂载在宿主机的磁盘或者块设备,我们可以认为就是“一个 PV 一块盘”。

Local PV 和普通的 PV 有一个很大的不同在于 Local PV 可以保证 Pod 始终能够被正确地调度到它所请求的 Local PV 所在的节点上面,对于普通的 PV 来说,Kubernetes 都是先调度 Pod 到某个节点上,然后再持久化节点上的 Volume 目录,进而完成 Volume 目录与容器的绑定挂载,但是对于 Local PV 来说,节点上可供使用的磁盘必须是提前准备好的,因为它们在不同节点上的挂载情况可能完全不同,甚至有的节点可以没这种磁盘,所以,这时候,调度器就必须能够知道所有节点与 Local PV 对应的磁盘的关联关系,然后根据这个信息来调度 Pod,实际上就是在调度的时候考虑 Volume 的分布。

例子:

先创建本地磁盘对应的 pv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
yaml复制代码apiVersion: v1
kind: PersistentVolume
metadata:
name: example-pv
spec:
capacity:
storage: 5Gi
volumeMode: Filesystem
accessModes:
- ReadWriteOnce
persistentVolumeReclaimPolicy: Delete
storageClassName: local-storage
local:
path: /mnt/disks/vol1
nodeAffinity:
required:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: In
values:
- node-1

其中:

  • lcal.path 写对应的磁盘路径
  • 必须指定对应的 node , 用 .spec.nodeAffinity 来对应的 node
  • .spec.volumeMode 可以是 FileSystem(Default)和 Block
  • 确保先运行了 StorageClass (即下面写的文件)

再写对于的 StorageClass 文件

1
2
3
4
5
6
yaml复制代码kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer

其中:

  • provisioner 是 kubernetes.io/no-provisioner , 这是因为 local pv 不支持 Dynamic Provisioning, 所以它没有办法在创建出 pvc 的时候,自动创建对应 pv
  • volumeBindingMode 是 WaitForFirstConsumer , WaitForFirstConsumer 即延迟绑定 , 这样可以既保证推迟到调度的时候再进行绑定 , 又可以保证调度到指定的 pod 上 , 其实 WaitForFirstConsumer 又 2 种:一种是 WaitForFirstConsumer , 一种是 Immediate , 这里必须用延迟绑定模式。

再创建一个 pvc

1
2
3
4
5
6
7
8
9
10
11
yaml复制代码kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: example-local-claim
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
storageClassName: local-storage

这里需要注意的地方就是 storageClassName 要写出我们之前自己创建的 storageClassName 的名字:local-storage

之后应用这个文件 , 使用命令 kubectl get pvc 可以看到他的状态是 Pending , 这个时候虽然有了匹配的 pv , 但是也不会进行绑定 , 依然在等待。

之后我们写个 pod 应用这个 pvc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
yaml复制代码kind: Pod
apiVersion: v1
metadata:
name: example-pv-pod
spec:
volumes:
- name: example-pv-storage
persistentVolumeClaim:
claimName: example-local-claim
containers:
- name: example-pv-container
image: nginx
ports:
- containerPort: 80
name: "http-server"
volumeMounts:
- mountPath: "/usr/share/nginx/html"
name: example-pv-storage

这样就部署好了一个 local pv 在 pod 上 , 这样即使 pod 没有了 , 再次重新在这个 node 上创建,写入的文件也能持久化的存储在特定位置。

如何删除这个 pv 一定要按照流程来 , 要不然会删除失败

  • 删除使用这个 pv 的 pod
  • 从 node 上移除这个磁盘(按照一个 pv 一块盘)
  • 删除 pvc
  • 删除 pv

总结

本文我们讨论了 kubernetes 存储的几种类型,有临时存储如:hostPath、emptyDir,也有真正的持久化存储,还讨论了相关的概念,如:PVC、PV、StorageClass等,下图是对这些概念的一个概括:

、

参考

  • 极客时间:深入剖析 Kubernetes 课程
  • kubernetes.io/zh/docs/con…
  • www.qikqiak.com/k8strain/st…
  • www.kubernetes.org.cn/4078.html
  • haojianxun.github.io/2019/01/10/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

聊聊我们服务端数据库存储时间的几种格式

发表于 2021-11-26

「这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战」

hi ,大家好,我是三天打鱼,两天晒网的小六六

前言

文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820…

种一棵树最好的时间是十年前,其次是现在

之前因为时区的原因导致了一些Bug 然后发现自己对这块的知识也比较薄弱,所以系统的跟大家一起来学习学习。

UTC时间

UTC时间又称协调世界时
协调世界时,又称世界统一时间、世界标准时间、国际协调时间。由于英文(CUT)和法文(TUC)的缩写不同,作为妥协,简称UTC。
协调世界时是以原子时秒长为基础,在时刻上尽量接近于世界时的一种时间计量系统。中国大陆采用ISO 8601-1988的《数据元和交换格式信息交换日期和时间表示法》(GB/T 7408-1994)称之为国际协调时间,代替原来的GB/T 7408-1994;中国台湾采用CNS 7648的《资料元及交换格式–资讯交换–日期及时间的表示法》,称之为世界统一时间。

时区

地球的自转运动产生了太阳东升西落现象,日出为昼、日落为夜。世界各地以昼夜交替现象为基础确定日期变化和时间推移。

然而各地位置不同时间标准不一造成了时间上的混乱。为了克服这种局面,1884年,各国在华盛顿召开国际经度会议,与会国家创立了通用的标准时间制度——以180°经线为理论上的日界线,经度每隔15°,地方时相差1小时。

虽然我国横跨多个时区,但只采用了一个标准时区,这也是长期实践得来的结果。中美时区制度不同的根本原因在于人口分布的区别。受制于地形和经济发展水平问题,美国的大城市和人口聚集区在中部和东西海岸;而中国的人口分布相对集中,绝大多数在“黑河腾冲线”以东。

现在我们谈到的“北京时间”,并不是真正意义上的北京时间,而是以陕西省渭南市蒲城县授时中心发出的东八区时间(东经120度)作为统一称呼的“北京时间”。

时间戳

“时间戳是指格林威治时间自1970年1月1日(00:00:00GMT)至当前时间的总秒数。通俗的讲,时间戳是一份能够表示一份数据在一个特定时间点已经存在的完整的可验证的数据。”

在计算机中,「时间戳」一般是指 Unix 时间戳,即自从 Unix 纪元(格林威治时间 1970 年 1 月 1 日 00:00:00)到当前时间的秒数。

时间戳是可以转换成任何时区的时间

数据库如何存储时间

字符串存储日期

想必大家在刚开始接触开发的时候,这种做法是可取的,简单,容易上手,可识别性比较高,一看就懂

  • 字符串占用的空间更大
  • 字符串存储的日期比较效率比较低(逐个字符进行比对),无法用用日期相关的API进行计算比较。
  • 时区问题 你存字符串是你没办法存储时区的,如果是海外项目会有很多的问题,所以对于一些项目来说存它是非常不适合的

Datetime和Timestamp

Datetime 和 Timestamp 是 MySQL 提供的两种比较相似的保存时间的数据类型。他们两者究竟该如何选择呢?

首选TimeStamp。

datetime 更像日历上面的时间和你手表的时间的结合,就是指具体某个时间。

timestamp 更适合来记录时间,比如我在东八区时间现在是 2016-08-02 10:35:52, 你在日本(东九区此时时间为 2016-08-02 11:35:52),我和你在聊天,数据库记录了时间,取出来之后,对于我来说时间是 2016-08-02 10:35:52,对于日本的你来说就是 2016-08-02 11:35:52。所以就不用考虑时区的计算了。

时间范围是 timestamp 硬伤(1970-2038),当然 datetime (1000-9999)也记录不了刘备什么时候出生(161 年)。

DateTime类型没有时区信息的(时区无关)
DateTime 类型保存的时间都是当前会话所设置的时区对应的时间。这样就会有什么问题呢?当你的时区更换之后,比如你的服务器更换地址或者更换客户端连接时区设置的话,就会导致你从数据库中读出的时间错误。不要小看这个问题,很多系统就是因为这个问题闹出了很多笑话。

数值型时间戳是更好的选择吗

很多时候,我们也会使用 int 或者 bigint 类型的数值也就是时间戳来表示时间。

这种存储方式的具有 Timestamp 类型的所具有一些优点,并且使用它的进行日期排序以及对比等操作的效率会更高,跨系统也很方便,毕竟只是存放的数值。缺点也很明显,就是数据的可读性太差了,你无法直观的看到具体时间。

总结

推荐 Timestamp,原因是数值表示时间不够直观

每种方式都有各自的优势,根据实际场景才是王道。下面再对这三种方式做一个简单的对比,以供大家实际开发中选择正确的存放时间的数据类型:

image.png

好了今天的分享就到这了,我是小六六,三天打鱼,两天晒网。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

EXPLAIN 使用

发表于 2021-11-26

「这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战」

分析 EXPLAIN 的结果

现在我们来分析一下 EXPLAIN 的结果。

1
csharp复制代码explain select * from salaries where from_date ='1996-12-02';

image-20210828110401998

select_type 是 simple 表示这是一个简单查询;查询的表是 salaries;type 是 ALL 表示发生了全表扫描,全秒扫描的性能是最差的;possible_keys、key、key_len 都是空,说明没有使用索引;rows 执行这条 SQL 语句需要扫描两百多万行数据才能返回,filtered 的值是百分之十;最后 extra 的值是 Using where 表示使用了where 条件。

通过执行这条 SQL 语句,可以发现执行这条 SQL 语句需要花费 600 毫秒,说明这条 SQL 语句执行的性能比较差。

下面我们再来分析一条 SQL 语句。

image-20210828111739750

经过运行的结果来抗,从中可以发现 explain 展示了两行结果,当有多行结果的时候,这个 id 字段还是有用的,它可以描述 SQL 语句的执行过程。

如果 explain 的结果包含多个 id 值,比如 id 的值为 1 和 id 的值为 2,那么数字越大越先执行,也就是说 id 的值为 2 的先执行,id 的值为 1 的后执行。而对于相同 id 的行,那么会从上到下依次执行,这一点在前面也介绍过。

其中一条操作了 employees 表,另外一条操作了 salaries 表,由于我们起了别名,所以 table 这一列展示了别名。

从 type 来看,操作 employees 表的时候,是 const,这是一个非常好的级别,使用了主键索引,可能会使用组件,实际使用的也是组件,并且只扫描了一条数据,因为我们指定了条件去查询的。rows x filtered 为 1,也就是 MySQL 预估会使用 employees 表的一行数据和 salaries 的表去关联。

同理,操作 表的时候,type 的值是 ref,ref 的性能也是不错的。也使用了主键索引,最后经过估算,需要扫描 17 行数据。

可视化工具分析 SQL

这里我们介绍两款可视化工具,一款是 IntelliJ IDEA,只要这款工具怎么使用,相信很多小伙伴都知道,另一是 MySQL 官网提供一款 MySQL Workbench。下面我们一起来简单使用一下这两款工具。

使用 IntelliJ IDEA 分析 SQL。选中 SQL 语句,右键点击 Explain Plan。

image-20210829104308238

下图所展示的就是通过IntelliJ IDEA 分析 SQL 语句。

image-20210829104501776

此时,可以点击 Show Visualization,展示一个树状的结果。

image-20210829104724428

使用 MySQL 官方提供的 MySQL Workbench 可视化分析。MySQL Workbench 为数据库管理员、程序开发者和系统规划师提供可视化的Sql开发、数据库建模、以及数据库管理功能。

扩展的 EXPLAIN

EXPLAIN 可产生额外的更多的信息,可以通过在 EXPLAIN 语句后紧跟一条 SHOW WARNING 语句查看扩展信息,从而帮助我们了解 SQL 的更多详情。

扩展的 EXPLAIN 与 MySQL 的版本有很大的关系:

在MySQL 8.0.12及更高版本,扩展信息可用于 SELECT、DELETE、INSERT、REPLACE、UPDATE 语句;在 MySQL 8.0.12 之前,扩展信息仅适用于SELECT语句。

在 MySQL 5.6 及更低版本,需使用 EXPLAIN EXTENDED xxx 语句;而从 MySQL5.7 开始,无需添加EXTENDED 关键词。

使用实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
markdown复制代码 explain select * from employees e left join salaries s on e.emp_no = s.emp_no where e.emp_no=10001\G
*************************** 1. row ***************************
          id: 1
select_type: SIMPLE
       table: e
  partitions: NULL
        type: const
possible_keys: PRIMARY
         key: PRIMARY
    key_len: 4
        ref: const
        rows: 1
    filtered: 100.00
      Extra: NULL
*************************** 2. row ***************************
          id: 1
select_type: SIMPLE
       table: s
  partitions: NULL
        type: ref
possible_keys: PRIMARY
         key: PRIMARY
    key_len: 4
        ref: const
        rows: 17
    filtered: 100.00
      Extra: NULL
2 rows in set, 1 warning (0.00 sec)
​
mysql> show warnings\g
+-------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Level | Code | Message                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+-------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Note  | 1003 | /* select#1 */ select '10001' AS `emp_no`,'1953-09-02' AS `birth_date`,'Georgi' AS `first_name`,'Facello' AS `last_name`,'M' AS `gender`,'1986-06-26' AS `hire_date`,`employees`.`s`.`emp_no` AS `emp_no`,`employees`.`s`.`salary` AS `salary`,`employees`.`s`.`from_date` AS `from_date`,`employees`.`s`.`to_date` AS `to_date` from `employees`.`employees` `e` left join `employees`.`salaries` `s` on((`employees`.`s`.`emp_no` = 10001)) where 1 |
+-------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
​

注意:扩展的 EXPLAIN,使用 IntelliJ IDEA 是无法实现的,必须要在 MySQL 终端里面才可以实现。

通过上面的结果显示来看,MySQL 优化之后的 SQL 语句和原始的 SQL 语句差异还是挺大的。

由于 SHOW WARNING 的结果并不一定是一个有效的 SQL 语句,也不一定能够执行。

在上面实例中没出现特殊标记,在平时的项目中,扩展的 EXPLAIN 在输出中可能会出现以下特殊标记:

  • <auto_key>:自动生成的临时表 key。
  • (expr):表达式(如标量子查询)执行一次,并且将结果值保存在内存中供以后使用。对于由多个值组成的结果,可以创建一个临时表,将会会看到 的信息。
  • (query fragment):将子查询谓词转换为 EXISTS 谓词,然后将子查询转换为可以与 EXISTS 谓词一起使用 。
  • <in_optimizer>(query fragment):这是一个内部优化器对象,对用户没有任何意义。
  • <index_lookup>(query fragment):使用索引查找来处理查询片段,从而查找符合条件的行。
  • (condition, expr1, expr2):如果条件为 true,则取 expr1,否则取 expr2。
  • <is_not_null_test>(expr):验证表达式不为 NULL 的测试。
  • (query fragment):使用子查询实现。
  • materialized-subquery.col_name:在内部物化临时表中对 col_name 引用,以保存评估子查询的结果。
  • <primary_index_lookup>(query fragment):使用主键查找来处理查询片段以查找符合条件的行。
  • <ref_null_helper>(expr):这是一个内部优化器对象,对用户没有任何意义。
  • /* select#N */ select_stmt:将 SELECT 与在非扩展的 EXPLAIN 行相关联,其具有一输出 id 的值 N。
  • outer_tables semi join (inner_tables):半连接操作。 inner_tables 显示未拉出的表。
  • :表示为缓存中间结果而创建的内部临时表。

当某些表属于 const 或 system 类型时,这些表中的列所涉及的表达式将由提前评估涉,并不是所显示语句的一部分。但是,当使用 FORMAT=JSON,某些 const 表访问显示为 ref 使用常量值的访问。

估计查询性能

多数情况下,你可以通过计算磁盘的搜索次数来估算查询性能。对于比较小的表,通常可以在一次磁盘搜索中找到行(因为索引可能已经被缓存了),而对于更大的表,你可以使用 B-tree 索引进行估算︰你需要进行多少次查找才能找到行: log(row_count) / log(index_block_length / 3 * 2 / (index_length + data_pointer_length) ) + 1。

在MysQL中, index_block_length 通常是1024字节,数据指针一般是 4 字节。比方说,有一个 500,000 的表,key 是 3 字节,那么根据计算公式:log(500 ,000 ) /1g ( 1024/3*2/( 3+4) ) + 1 = 4 次搜索。

该索引将需要 500,000 x 7 X 3/2=5.2MB 的存储空间(假设典型的索引缓存的填充率是 2/3),因此你可以在内存中存放更多索引,可能只要一到两个调用就可以找到想要的行了。 但是,对于写操作,你需要四个搜索请求来查找在何处放置新的索引值,然后通常需要 2 次搜索来更新索引并写入行。

前面的讨论并不意味着你的应用性能会因为 logN 而缓慢下降。只要内容被 OS 或 MySQL 服务器缓存,随着表的变大,只会稍微变慢。在数据量变得太大而无法缓存后,将会变慢很多,直到你的应用程序受到磁盘搜索约束(按照 logN 增长)。为了避免这种情况,可以根据数据的增长而增加 key 的。对于MyISAM表,key 的缓存大小由名为 key_buffer_size 的

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

工具 pg_recovery 设计原理与源码解读 数

发表于 2021-11-26

作者:张连壮 PostgreSQL 研发工程师

从事多年 PostgreSQL 数据库内核开发,对 citus 有非常深入的研究。

上一期 我们介绍了 PostgreSQL 数据找回工具:pg_reconvery

本文将带大家了解 pg_recovery 工具的实现原理、设计思路,并带来源码解读。

| 数据找回的实现原理

一个数据库系统正常的数据读取方式,是从做 select * from pg_recovery 的查询开始(即执行事务),执行查询操作过程将同时生成事务的快照,通过 GetActiveSnapshot() 函数,便可以看到当前可见的数据。

| 设计思路

  1. 如何读取 Dead 元组?

PostgreSQL 通过 快照 来决定当前数据库数据的可见性,因此当一条数据被删除时,数据的实体仍然存在于数据库实例中,通常管这种不可见的数据叫做 Dead 元组(PostgreSQL 中一条数据称为一个元组)。

PostgreSQL 中提供了 SnapshotAny 的特殊快照(还有很多其他类型)。这个快照可以读取任何数据,pg_recovery 便是通过该方式读取的所有数据。默认情况下,只返回 recovery 的数据,不返回可见的数据。

  1. 函数一次返回多少数据?

数据量是按行返回的,并且每次限定一行。

  1. 如何控制内存?

函数会多次执行,而有些状态是全局级的。因此可以使用 multi_call_memory_ctx (内存池的上下文)参数,来控制内存。

关于函数的参数

通过 SQL 创建函数时,执行如下语句。函数使用请参照上一期内容。

1
sql复制代码CREATE FUNCTION pg_recovery(regclass, recoveryrow bool DEFAULT true) RETURNS SETOF record

regclass:PostgreSQL 的表类型,会将表名自动转换成 OID(OID 数据库内部对象的唯一标识),因此只需输入表名即可。

reconveryrow bool DEFAULT ture:默认值 true,表示只返回 recovery 数据。取值 false, 表示返回所有数据。
执行下列语句,修改参数默认值。

1
csharp复制代码select * from pg_recovery('aa', recoveryrow => false)

RETURNS SETOF record:函数返回行类型数据。

| 源码解读

必要的数据

1
2
3
4
5
6
7
8
9
ini复制代码typedef struct
{
Relation rel; -- 当前操作的表
TupleDesc reltupledesc; -- 表的元信息
TupleConversionMap *map; -- 表的映射图,即表的数据映射成自定义返回的列
TableScanDesc scan; -- 扫描表
HTAB *active_ctid; -- 可见数据的ctid
bool droppedcolumn; -- 是否删除列
} pg_recovery_ctx;

隐藏列

增加 recoveryrow 的隐藏列,当返回全部信息时,通过此列可以辨别出该行数据是 recovery 的数据,还是用户可见的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
objectivec复制代码static const struct system_columns_t {
char *attname;
Oid atttypid;
int32 atttypmod;
int attnum;
} system_columns[] = {
{ "ctid", TIDOID, -1, SelfItemPointerAttributeNumber },
{ "xmin", XIDOID, -1, MinTransactionIdAttributeNumber },
{ "cmin", CIDOID, -1, MinCommandIdAttributeNumber },
{ "xmax", XIDOID, -1, MaxTransactionIdAttributeNumber },
{ "cmax", CIDOID, -1, MaxCommandIdAttributeNumber },
{ "tableoid", OIDOID, -1, TableOidAttributeNumber },
{ "recoveryrow", BOOLOID, -1, DeadFakeAttributeNumber },
{ 0 },
};

pg_recovery 简化代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
ini复制代码Datum
pg_recovery(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
pg_recovery_ctx *usr_ctx;

recoveryrow = PG_GETARG_BOOL(1); -- 获取默认参数

if (SRF_IS_FIRSTCALL()) -- 每条数据,函数都会调用一次,因此需要先初始化数据
{
funcctx = SRF_FIRSTCALL_INIT(); -- 申请上下文
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); -- 使用内存池

usr_ctx->rel = heap_open(relid, AccessShareLock); -- 增加读锁
usr_ctx->reltupledesc = RelationGetDescr(usr_ctx->rel); -- 获取元信息
funcctx->tuple_desc = BlessTupleDesc(tupdesc); -- 函数使用的元信息
usr_ctx->map = recovery_convert_tuples_by_name(usr_ctx->reltupledesc,
funcctx->tuple_desc, "Error converting tuple descriptors!", &usr_ctx->droppedcolumn); -- 列映射
usr_ctx->scan = heap_beginscan(usr_ctx->rel, SnapshotAny, 0, NULL , NULL, 0); -- 扫描全部表数据
active_scan = heap_beginscan(usr_ctx->rel, GetActiveSnapshot(), 0, NULL , NULL, 0); -- 扫描可见数据
while ((tuplein = heap_getnext(active_scan, ForwardScanDirection)) != NULL)
hash_search(usr_ctx->active_ctid, (void*)&tuplein->t_self, HASH_ENTER, NULL); -- 缓存可见数据的 ctid

}

funcctx = SRF_PERCALL_SETUP(); -- 获取函数之前的上下文
usr_ctx = (pg_recovery_ctx *) funcctx->user_fctx;

get_tuple:
if ((tuplein = heap_getnext(usr_ctx->scan, ForwardScanDirection)) != NULL)
{
-- 检验表该数据是否是dead
hash_search(usr_ctx->active_ctid, (void*)&tuplein->t_self, HASH_FIND, &alive);

tuplein = recovery_do_convert_tuple(tuplein, usr_ctx->map, alive); -- 将原表数据转换成输出格式
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuplein)); -- 转换成Datum格式,返回数据
}
else
{
-- 读取完数据
heap_endscan(usr_ctx->scan); -- 结束扫描表
heap_close(usr_ctx->rel, AccessShareLock); -- 释放锁
SRF_RETURN_DONE(funcctx); --释放函数资源
}
}

生成映射表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ini复制代码TupleConversionMap *
recovery_convert_tuples_by_name(TupleDesc indesc,
TupleDesc outdesc,
const char *msg, bool *droppedcolumn)
{

attrMap = recovery_convert_tuples_by_name_map(indesc, outdesc, msg, droppedcolumn); -- 处理recoveryrow/隐藏列/可见列的映射

map->indesc = indesc;
map->outdesc = outdesc;
map->attrMap = attrMap;
map->outvalues = (Datum *) palloc(n * sizeof(Datum));
map->outisnull = (bool *) palloc(n * sizeof(bool));
map->invalues = (Datum *) palloc(n * sizeof(Datum));
map->inisnull = (bool *) palloc(n * sizeof(bool));
map->invalues[0] = (Datum) 0;
map->inisnull[0] = true;

return map;
}

元组转换函数

1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码HeapTuple
recovery_do_convert_tuple(HeapTuple tuple, TupleConversionMap *map, bool alive)
{
heap_deform_tuple(tuple, map->indesc, invalues + 1, inisnull + 1); -- 将元组拆分,提取列数据

for (i = 0; i < outnatts; i++)
{
outvalues[i] = invalues[j]; -- 转换数据
outisnull[i] = inisnull[j]; -- 转换数据
}

return heap_form_tuple(map->outdesc, outvalues, outisnull); -- 将列数据转换成元组
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Dubbo服务启动-Dubbo Consumer引用服务

发表于 2021-11-26

Consumer消费者Demo示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder/>

<dubbo:application name="serialization-java-consumer">
<dubbo:parameter key="qos.enable" value="true" />
<dubbo:parameter key="qos.accept.foreign.ip" value="false" />
<dubbo:parameter key="qos.port" value="33333" />
</dubbo:application>

<dubbo:registry address="zookeeper://${zookeeper.address:127.0.0.1}:2181"/>

<dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.serialization.api.DemoService"/>

</beans>
在之前的章节中已经知道,Dubbo基于Spring自定义标签规范实现了自定义标签,通过自定义标签完成了bean的加载,并且通过实现监听Spring容器刷新完毕事件启动dubbo客户端。启动客户端伴随着服务发布和服务的订阅。
1
2
3
4
5
6
7
8
9
10
java复制代码public class DubboConsumer {

public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-demo-consumer.xml");
context.start();
DemoService demoService = context.getBean("demoService", DemoService.class);
String hello = demoService.sayHello("world");
System.out.println(hello);
}
}
dubbo通过`<dubbo:reference`标签引用服务,之后在程序中通过Spring的Context依赖查找(getBean)的方式获取引用的服务的代理实例。`<dubbo:reference`加载的Bean是ReferenceBean,它实现了FactoryBean接口,getBean时会调用ReferenceBean的getObject()方法,这是获取引用的入口。getBean方法会判断Reference对象是否是空的,如果是空的,调用init方法。代码如下:
1
2
3
4
java复制代码    @Override
public Object getObject() {
return get();
}

ReferenceConfig#getObject()获取应用Bean

`ReferenceBean`继承了`ReferenceConfig`,当调用ReferenceBean的getObject()方法会调用`ReferenceBean`的get()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
java复制代码    public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 代理引用如果是空的,调用init
if (ref == null) {
init();
}
return ref;
}

public synchronized void init() {
if (initialized) {
return;
}

if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
// 1. 检查配置ConsumerConfig,有的话检查配置,没有就新建一个ConsumerConfig
// 2. 反射创建调用的API
// 3. 初始化ServiceMetadata
// 4. 注册Consumer
// 5. 检查ReferenceConfig,RegistryConfig,ConsumerConfig
checkAndUpdateSubConfigs();

checkStubAndLocal(interfaceClass);
// 检查引用的接口是否mock
ConfigValidationUtils.checkMock(interfaceClass, this);
// consumer的信息
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
// 添加运行时参数到map,包括:dubbo,release,timestamp,pid
ReferenceConfigBase.appendRuntimeParameters(map);
// 是不是泛化,不是的话进入条件
if (!ProtocolUtils.isGeneric(generic)) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 获取方法,生成包装类,使用javassist,将生成的类放到WRAPPER_MAP中,key是org.apache.dubbo.samples.serialization.api.DemoService类对象,value是包装类的实例
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
// method放到map,这里method是sayHello
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
// interface org.apache.dubbo.samples.serialization.api.DemoService
map.put(INTERFACE_KEY, interfaceName);
// 追加其他参数到map中
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ConsumerConfig
// appendParameters(map, consumer, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, consumer);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
Map<String, AsyncMethodInfo> attributes = null;
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
for (MethodConfig methodConfig : getMethods()) {
AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}

String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(REGISTER_IP_KEY, hostToRegistry);
// 所有数据在存到serviceMetadata的attachments中
serviceMetadata.getAttachments().putAll(map);
// 创建Service代理
ref = createProxy(map);
// 设置ServiceMetadata的Service代理引用
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
// 标记初始化完毕
initialized = true;

// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}

ReferenceConfig#createProxy()创建服务代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
java复制代码    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
// 是否是InJvm,协议是InJvm
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// 如果protocols不是injvm检查注册中心
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
// url列表,将zookeeper://改成registry://
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
// 监控URL
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
// 如果有监控配置放到map中
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 根据map中的信息生成url,这里生成的结果是
/*
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=serialization-java-consumer&dubbo=2.0.2&pid=66793&qos.accept.foreign.ip=false&qos.enable=true&qos.port=33333&refer=application%3Dserialization-java-consumer%26check%3Dtrue%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dorg.apache.dubbo.samples.serialization.api.DemoService%26methods%3DsayHello%26pid%3D66793%26qos.accept.foreign.ip%3Dfalse%26qos.enable%3Dtrue%26qos.port%3D33333%26register.ip%3D192.168.58.45%26release%3D2.7.7%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1636532992568&registry=zookeeper&release=2.7.7&timestamp=1636533091883
*/
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
// 如果引用的URL就1个直接通过refer引用服务,这里和export相似,通过调用链实现的
// - ProtocolListenerWrapper
// - - ProtocolFilterWrapper
// - - - RegistryProtocol
if (urls.size() == 1) {
// 通过RegistryProtocol#refer引用服务
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 如果是引用的服务多个,循环处理
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
// 集群处理
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
// 加入到集群中,这里包含集中集群处理模式,分别是:
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
// invoer不可用处理
if (shouldCheck() && !invoker.isAvailable()) {
invoker.destroy();
throw new IllegalStateException("Failed to check the status of the service "
+ interfaceName
+ ". No provider available for the service "
+ (group == null ? "" : group + "/")
+ interfaceName +
(version == null ? "" : ":" + version)
+ " from the url "
+ invoker.getUrl()
+ " to the consumer "
+ NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData存储,SPI机制,支持内存和远程两种方式
*/
String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
创建代理服务会创建Invoker,在引用服务过程中,会判断协议是否为injvm,会根据协议做不同的处理,不是injvm协议会根据构造的配置信息(map)生成url并将url协议有zookeeper://改成registry://,然后通过`Protocol`接口`refer`方法引用服务,与发布服务相似,引用服务的过程也会包装方法的调用链,如下:
1
2
3
diff复制代码- ProtocolListenerWrapper
- - ProtocolFilterWrapper
- - - RegistryProtocol
在refer的过程中会对一个服务端的引用和一个服务多个服务端的服务进行区分处理,对于有多个服务端的服务会进行集群处理(cluster),会讲invoker列表加入到集群中,在调用过程中会根据集群策略来选择不同的策略进行调用,集群策略实现也实现了SPI机制,

RegistryProtocol#refer引用服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码    @Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 注册中心url 将 registry:// 转成 zookeeper://
url = getRegistryUrl(url);
// 获取注册中心
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// 从url中解析出引用服务信息
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// 获取分组
String group = qs.get(GROUP_KEY);
// 如果设置分组了,那么使用MergeableCluster策略
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}

RegistryProtocol#doRefer引用服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 获取集群目录Directory,代表Invoker的集合
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
// 注册消费者url
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 向ZK发起订阅服务,并设置监听,这里比较复杂,最终会在ZookeeperRegistry#doSubscribe做服务订阅
directory.subscribe(toSubscribeUrl(subscribeUrl));
// 加入集群
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}

RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}

RegistryDirectory#subscribe订阅服务

`RegistryProtocol#doRefer`方法,directory.subscribe会按照下面的调用链进行处理,最后调用`ZookeeperRegistry#doSubscribe`方法向zk注册数据订阅接口,并设置监听。
1
2
3
4
diff复制代码- RegistryDirectory
- - ListenerRegistryWrapper
- - - FailbackRegistry
- - - - ZookeeperRegistry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码    @Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 初始化监听
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
// 创建zk节点
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

DubboProtocol#protocolBindingRefer创建Invoker

refer服务创建Invoker时会调用该方法,该方法会通过`getClients`创建网络客户端,创建客户端是会判断客户端是否为共享链接,根据`connections`创建客户端`ExchangeClient`,然后通过`initClient`初始化客户端,初始化过程中会判断是否为延迟的客户端`LazyConnectExchangeClient`,不是延迟客户端,就会通过`connect`连接服务提供者,与服务提供者连接,具体建立连接流程不在这里说明,会在网络通信介绍。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
java复制代码    @Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);

// 创建RPC Invoker,通过getClients(url)创建网络客户端
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
// 将invoker添加到invokers中
invokers.add(invoker);

return invoker;
}

// 创建客户端数组
private ExchangeClient[] getClients(URL url) {
// whether to share connection
// 是否共享链接
boolean useShareConnect = false;

int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;

/*
* The xml configuration should have a higher priority than properties.
* xml配置优先级高于properties配置
*/
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
// 共享链接客户端
shareClients = getSharedClient(url, connections);
}
// 创建ExchangeClient
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
// 从共享客户端获取
clients[i] = shareClients.get(i);

} else {
// 初始化客户端
clients[i] = initClient(url);
}
}

return clients;
}

// 初始化客户端
private ExchangeClient initClient(URL url) {

// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}

ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
// 延迟加载客户端
client = new LazyConnectExchangeClient(url, requestHandler);

} else {
// 通过NettyTransporter connect创建客户端 与provider建立连接
client = Exchangers.connect(url, requestHandler);
}

} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}

return client;
}

总结

Dubbo Consumer整个的启动,引用服务的流程大致就看完了,`ReferenceBean`实现了Spring的`FactoryBean`接口,在使用Spring上下文getBean时就会调用到`ReferenceBean`的getObject方法,这时就会通过创建代理(createProxy),然后通过`Protocol`的`refer`方法引用服务;


引用服务的流程大致可以理解为,通过`DubboProtocol`的`refer`方法创建`DubboInvoker`调用`getClients`方法创建`ExchangeClient`,然后通过`initClient`方法初始化网络客户端,初始化客户端过程中会通过`Exchangers`的`connect`与服务提供者建立连接,后续就是网络客户端与服务端建立连接这里会和服务提供者相似,通过`doOpen`初始化网络客户端,然后调用`doConnect`建立连接。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Gin第五天---数据解析和绑定 Gin第五天

发表于 2021-11-26

Gin第五天

通过后端需要接收前端页面传来的参数,然后进行解析。传来的数据类型可能是json或者其他数据类型,分别看看几种不同的处理方式。

1.Json数据解析和绑定

创建一个处理Json格式的路由,其中接收数据的格式已经提前定义为一种结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
go复制代码package main

import (
"github.com/gin-gonic/gin"
"net/http"
)

// 定义接收数据的结构体
type Login struct {
// binding:"required"修饰的字段,若接收为空值,则报错,是必须字段
User string `form:"username" json:"user" uri:"user" xml:"user" binding:"required"`
Pssword string `form:"password" json:"password" uri:"password" xml:"password" binding:"required"`
}

func main() {
// 1.创建路由
// 默认使用了2个中间件Logger(), Recovery()
r := gin.Default()
// JSON绑定
r.POST("loginJSON", func(c *gin.Context) {
// 声明接收的变量
var json Login
// 将request的body中的数据,自动按照json格式解析到结构体
if err := c.ShouldBindJSON(&json); err != nil {
// 返回错误信息
// gin.H封装了生成json数据的工具
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 判断用户名密码是否正确
if json.User != "root" || json.Pssword != "admin" {
c.JSON(http.StatusBadRequest, gin.H{"status": "304"})
return
}
c.JSON(http.StatusOK, gin.H{"status": "200"})
})
r.Run()
}

image-20211126165743941

image-20211126165804133

image-20211126170133015

其中,c.json()就是用json格式返回响应

image-20211126170303919

源码也很简单,将传入的数据转为json格式输出到页面

关于ShouldBindJSON

这里面讲了ShouldBind以及Bind等的区别,大家可以看看。

  1. 表单数据解析和绑定

上面的是传入json数据,但是更多场景下,我们都是前端上传入表单数据给后端服务,所以更多还要处理表单数据。

先写一个简单的前端表单html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
html复制代码<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<body>
<form action="http://localhost:8000/loginForm" method="post" enctype="application/x-www-form-urlencoded">
用户名<input type="text" name="username"><br>
密码<input type="password" name="password">
<input type="submit" value="提交">
</form>
</body>
</html>

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
go复制代码package main

import (
"net/http"

"github.com/gin-gonic/gin"
)

// 定义接收数据的结构体
type Login1 struct {
// binding:"required"修饰的字段,若接收为空值,则报错,是必须字段
User string `form:"username" json:"user" uri:"user" xml:"user" binding:"required"`
Pssword string `form:"password" json:"password" uri:"password" xml:"password" binding:"required"`
}

func main() {
// 1.创建路由
// 默认使用了2个中间件Logger(), Recovery()
r := gin.Default()
// JSON绑定
r.POST("/loginForm", func(c *gin.Context) {
// 声明接收的变量
var form Login1
// Bind()默认解析并绑定form格式
// 根据请求头中content-type自动推断
if err := c.Bind(&form); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 判断用户名密码是否正确
if form.User != "root" || form.Pssword != "admin" {
c.JSON(http.StatusBadRequest, gin.H{"status": "304"})
return
}
c.JSON(http.StatusOK, gin.H{"status": "200"})
})
r.Run()
}

image-20211126172447845

image-20211126172508109

  1. URI数据解析和绑定

还有一种情况,我们的参数全部就在我们的URI中,这种情况我们可以解析路由。联系到第二天我们讲了路由api中的通配符,其实这里就要用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
go复制代码package main

import (
"net/http"

"github.com/gin-gonic/gin"
)

// 定义接收数据的结构体
type Login2 struct {
// binding:"required"修饰的字段,若接收为空值,则报错,是必须字段
User string `form:"username" json:"user" uri:"user" xml:"user" binding:"required"`
Pssword string `form:"password" json:"password" uri:"password" xml:"password" binding:"required"`
}

func main() {
// 1.创建路由
// 默认使用了2个中间件Logger(), Recovery()
r := gin.Default()
// JSON绑定
r.GET("/:user/:password", func(c *gin.Context) {
// 声明接收的变量
var login Login2
// Bind()默认解析并绑定form格式
// 根据请求头中content-type自动推断
if err := c.ShouldBindUri(&login); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 判断用户名密码是否正确
if login.User != "root" || login.Pssword != "admin" {
c.JSON(http.StatusBadRequest, gin.H{"status": "304"})
return
}
c.JSON(http.StatusOK, gin.H{"status": "200"})
})
r.Run(":8000")
}

image-20211126173331643

我们在GET方法中就用到了通配符去匹配对应的参数,然后用Gin自带的解析器去解析和绑定

总结

其实框架已经帮我们把大部分解析绑定的任务实现了,我们只需要事先定义好接收数据的结构体,以及结构体中对应不同格式的字段名称,然后就可以实现解析和绑定。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

微服务的了解

发表于 2021-11-26

「这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战」

  1. 系统架构的演变

互联网早期,网站的应用流量小,只需要一个单一应用架构就可以支撑服务的功能,减少开发、部署维护的成本。

随着智能设备的普及,手机电脑的广泛使用,互联网人群增大,网站的访问量也逐渐增大,但是可能只是单一的模块的访问量增大,例如电商项目的订单模块,这时的架构师希望将单个系统拆分成多个系统,给访问量大的系统增加节点,提高效率(垂直应用架构)。

随着垂直应用架构的使用,弊端显现,系统间独立运行,无法相互调用,系统间有重复的模块。之后便对服务继续拆分,拆分成业务层(业务的逻辑)和表现层(页面的交互),服务层抽取公共功能作为服务层。(分布式架构)

但是服务越来越多,便增加一个服务调度中心,对服务进行资源调度和治理(SOA Service Oriented Architecture,面向服务的架构),这和现在的微服务架构相差无几了,但针对微服务的问题解决还差系统性的方案和处理。

微服务架构急需一系列针对微服务问题的系统性解决方案。

  1. 微服务组件解决的四个问题

【微服务需要解决的问题】

  • 这么多小服务,如何管理 - 服务治理 Eureka
  • 服务间如何通信 - 服务调用 OpenFeign
  • 客户端如何访问这些服务 - 服务网关 Zuul
  • 服务出错如何处理、排查 - 服务容错 Hystrix

各大厂商针对这些问题,提出解决的方案,SpringCloud将他们整合,便有了现在的微服务架构。

SpringCloud就是单纯的对这些组件进行整合,对用 SpringBoot进行开发的微服务进行管理的系统性生态。

【组件的作用】

  • Eureka:各个服务启动时,服务提供者都会将服务注册到Eureka Server,并且服务消费者还可以反过来从 Eureka Server拉取注册表,从而知道其他服务在哪里
  • OpenFeign
    • Ribbon:服务间发起请求的时候,基于 Ribbon做负载均衡,从一个服务的多台机器中选择一台
      • Feign:基于 Feign的动态代理机制,根据注解和选择的机器,拼接请求URL地址,发起请求
  • Hystrix:发起请求是通过Hystrix的线程池来走的,不同的服务走不同的线程池,实现了不同服务调用的隔离,避免了服务雪崩的问题
  • Zuul:如果前端、移动端要调用后端系统,统一从Zuul网关进入,由Zuul网关转发请求给对应的服务

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kafka 目录里的脚本那么多,它们都是用来干什么的?

发表于 2021-11-26

这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战

相关:Apache Kafka 的本地部署

以下是 Apacke Kafka 2.13 安装目录中 bin/ 中的脚本文件列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
arduino复制代码➜  tree -L 1 bin

bin
├── connect-distributed.sh
├── connect-mirror-maker.sh
├── connect-standalone.sh
├── kafka-acls.sh
├── kafka-broker-api-versions.sh
├── kafka-cluster.sh
├── kafka-configs.sh
├── kafka-console-consumer.sh
├── kafka-console-producer.sh
├── kafka-consumer-groups.sh
├── kafka-consumer-perf-test.sh
├── kafka-delegation-tokens.sh
├── kafka-delete-records.sh
├── kafka-dump-log.sh
├── kafka-features.sh
├── kafka-get-offsets.sh
├── kafka-leader-election.sh
├── kafka-log-dirs.sh
├── kafka-metadata-shell.sh
├── kafka-mirror-maker.sh
├── kafka-producer-perf-test.sh
├── kafka-reassign-partitions.sh
├── kafka-replica-verification.sh
├── kafka-run-class.sh
├── kafka-server-start.sh
├── kafka-server-stop.sh
├── kafka-storage.sh
├── kafka-streams-application-reset.sh
├── kafka-topics.sh
├── kafka-transactions.sh
├── kafka-verifiable-consumer.sh
├── kafka-verifiable-producer.sh
├── trogdor.sh
├── windows
├── zookeeper-security-migration.sh
├── zookeeper-server-start.sh
├── zookeeper-server-stop.sh
└── zookeeper-shell.sh

1 directory, 37 files

其中有一个叫做 windows 的目录,里面是 Windows 系统下使用的 .bat 批处理文件,其余都是 .sh 脚本文件,包含之前的文章介绍过的几个脚本:

  • zookeeper-server-start.sh 用来启动 ZooKeeper 服务
  • kafka-server-start.sh 用来启动 Kafka Broker
  • kafka-topics.sh 用来操作 Topics
  • kafka-configs.sh 用来修改 Broker 端的动态配置等
  • kafka-consumer-groups 前面的文章中我们用它来重设消费者组的位移等

这些脚本中,zookeeper 开头的几个脚本是用来操作 ZooKeeper 的,connect 开头的几个脚本与 Kafka Connect 有关,其余是大量的与 Kafka 相关的脚本。

这些脚本的文档,都可以通过文件名加 --help 查询到,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
vbnet复制代码➜ bin/kafka-log-dirs.sh --help

This tool helps to query log directory usage on the specified brokers.
Option Description
------ -----------
--bootstrap-server <String: The server REQUIRED: the server(s) to use for
(s) to use for bootstrapping> bootstrapping
--broker-list <String: Broker list> The list of brokers to be queried in
the form "0,1,2". All brokers in the
cluster will be queried if no broker
list is specified
--command-config <String: Admin client Property file containing configs to be
property file> passed to Admin Client.
--describe Describe the specified log directories
on the specified brokers.
--help Print usage information.
--topic-list <String: Topic list> The list of topics to be queried in
the form "topic1,topic2,topic3". All
topics will be queried if no topic
list is specified (default: )
--version Display Kafka version.

本文主要介绍这些脚本中常用的几个

kafka-broker-api-versions

这个脚本用于验证不同版本的 Broker 和 Consumer 之间的适配行。执行结果如下(控制台输出还有很多行,用省略号代替了):

1
2
3
4
5
6
7
less复制代码➜ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

192.168.1.21:9092 (id: 0 rack: null) -> (
Produce(0): 0 to 9 [usable: 9],
Fetch(1): 0 to 12 [usable: 12],
ListOffsets(2): 0 to 7 [usable: 7],
......

这里列出了各种请求的适配情况,我们以 Produce(0): 0 to 9 [usable: 9] 为例来详细介绍。

  • Produce 表示 Produce 请求,生产者向 Broker 发送消息其实就是在发送 Produce 请求。
  • (0) 表示的是序号
  • 0 to 9 表示在当前的 Broker 中,支持 0 到 9 共 10 个版本的 Producer 请求。
  • [usable: 9] 表示当前的客户端使用的是版本序号为 9 的请求。这里的客户端指的其实就是我们使用的 kafka-broker-api-versions 脚本。如果你使用不同版本 Kafka 中的 kafka-broker-api-versions 脚本验证同一个 Broker 实例,这里会得到不同的结果。

kafka-console-consumer 和 kafka-console-producer

在之前的文章(Apache Kafka 的本地部署)中,曾经使用这两个命令来验证 Kafka 是否部署和运行成功。它们分别可以用来生产和消费消息。

使用一条简单的指令就可以向 Kafka 提交消息(记得提前创建好 Topic):

1
css复制代码➜ bin/kafka-console-producer.sh --topic hello-events --bootstrap-server localhost:9092

如果要提供多个 Broker 节点,可以将 --bootstrap-server 替换成 --broker-list,多个节点的地址用逗号隔开即可。

如果要消费一个主题的消息,可以使用如下方式:

1
css复制代码➜ bin/kafka-console-consumer.sh --topic hello-events --from-beginning --bootstrap-server localhost:9092

执行后,消息会被打印到控制台上。

这里有一个 --from-beginning 参数,它代表从当前最早的位移开始消费消息,相当于使用了 Earliest 策略重设位移(参考:Kafka 消费者组位移重设的几种方式)

这两个用于生产和消费消息的命令,一般很少用在实际的场景中,更多的是用来测试。

kafka-producer-perf-test 和 kafka-consumer-perf-test

这两个命令也是生产者和消费者对应的命令成对出现的,用于对生产和消费消息的性能做测试。

比如下面的脚本:

1
css复制代码bin/kafka-producer-perf-test.sh --topic hello-events --num-records 100000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=localhost:9092

向指定的 Topic 发送了十万条消息(这个数量太少了,这里仅做演示),每条 1024 个字节,得到如下结果:

1
matlab复制代码100000 records sent, 65919.578115 records/sec (64.37 MB/sec), 2.89 ms avg latency, 253.00 ms max latency, 0 ms 50th, 20 ms 95th, 22 ms 99th, 23 ms 99.9th.

这里展示了每秒发送的消息数、吞吐量、平均延时,以及几个分位数,我们可以重点关注末尾的分位数,23 ms 99.9th 表示 99.9% 的消息延时都在 23ms 之内,这是性能判断的重要依据。

消费端的性能测试命令会稍微简单一些:

1
bash复制代码bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 100000 --topic hello-events

结果如下:

1
2
lua复制代码start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-11-26 16:31:12:691, 2021-11-26 16:31:17:028, 98.1396, 22.6285, 100495, 23171.5472, 3594, 743, 132.0857, 135255.7201

只有一些时间和吞吐量的数据,并没有分位数的结果。

kafka-dump-log

用来查看消息文件的数据,或者读取到一个文件中。

1
bash复制代码bin/kafka-dump-log.sh --files /tmp/kafka-logs/hello-events-0/00000000000000000000.log

使用这个脚本需要指定一个 .log 文件的路径,读取结果的内容都是一条条类似如下格式的内容:

1
less复制代码baseOffset: 1035315 lastOffset: 1035329 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1073690676 CreateTime: 1637914980747 size: 15556 magic: 2 compresscodec: none crc: 4007605223 isvalid: true

包含了消息集合位移范围、数量、创建时间、压缩算法等信息。如果想看每一条的具体信息,可以通过在命令之后添加 --deep-iteration 查看。结果格式如下:

1
2
3
4
5
less复制代码baseOffset: 182055 lastOffset: 182069 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 188803172 CreateTime: 1637914971926 size: 15556 magic: 2 compresscodec: none crc: 2670957992 isvalid: true
| offset: 182055 CreateTime: 1637914971926 keySize: -1 valueSize: 1024 sequence: -1 headerKeys: []
| offset: 182056 CreateTime: 1637914971926 keySize: -1 valueSize: 1024 sequence: -1 headerKeys: []
| offset: 182057 CreateTime: 1637914971926 keySize: -1 valueSize: 1024 sequence: -1 headerKeys: []
(省略部分类似的内容)

增加了每一条消息的具体信息,你甚至还可以通过加 --print-data-log 查看到消息的具体内容。

kafka-consumer-groups

kafka-consumer-groups 脚本除了之前介绍过的用途,还可以用来查看消费者组的位移,命令如下:

1
css复制代码bin/kafka-consumer-groups.sh --describe --all-groups --bootstrap-server localhost:9092

执行后可以得到如下结果:

1
2
sql复制代码GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
perf-consumer-27937 hello-events 0 100495 3513680 3413185 - - -

这个命令会列出所有消费者组的信息,如果想要查看单个消费者组的信息,把其中的 --all-groups 替换成 --group <group_id> 即可。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数据导出成Excel表格

发表于 2021-11-26
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
js复制代码//Controller层
@PostMapping(value = "/exportPlanDemandList")
@ApiOperation("需求条目导出")
@AdviceLog(value = "需求条目导出", type = LogType.OPERATE, operateType = LogOperateType.EXPORT)
public void exportDemandList(@RequestBody DemandPlanListVo vo, OutputStream out) {
try {
String sheetName = "需求条目";
// 获取所需数据的分页
Pager pager = demandPlanListService.getDemandListPage(vo);
// 将查询出的数据集合传入
List<Map<String, Object>> list = ConvertUtils.listConvert(pager.getRows());
String[] excelTilte = BasisFormationConstant.excelDemandList2;
String[] headerTitle = Arrays.stream(excelTilte).map(e -> e.split(":")[1]).toArray(size -> new String[size]);
String[] headerValues = Arrays.stream(excelTilte).map(e -> e.split(":")[0]).toArray(size -> new String[size]);
ExcelUtil.exportExcel2007WithSXSSF(sheetName, headerTitle, headerValues, list, out, null);
} catch (Exception e) {
e.printStackTrace();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
js复制代码
public class ConvertUtils {
private static final String CHINA_GREEN = "绿色";
private static final String ENG_GREEN = "green";

private static final String CHINA_RED = "红色";
private static final String ENG_RED = "red";

private static final String CHINA_ORANGE = "橙色";
private static final String ENG_ORANGE = "orange";

private static final String CHINA_PURPLE = "紫色";
private static final String ENG_PURPLE = "purple";

private static final String CHINA_YELLOW = "黄色";
private static final String ENG_YELLOW = "yellow";

private static final String CHINA_BLUE = "蓝色";
private static final String ENG_BLUE = "#0c60aa";





/**
* list克隆
*
* @param sources
* @param c
* @param <E>
* @param <T>
* @return
*/
public static <E, T> List<T> convertList(List<E> sources, Class<T> c) {
if (CollectionUtils.isEmpty(sources)) {
return new ArrayList<T>();
}
List<T> list = new ArrayList<T>();
for (E source : sources) {
list.add(convertBean(source, c));
}
return list;
}
/**
* param convert entity
* @param param
* @param tClass
* @param <T>
* @param <E>
* @return
*/
public static <T, E> T convertBean(E param, Class<T> tClass) {
try {
if (param == null) {
return tClass.newInstance();
}
T instance = tClass.newInstance();
BeanUtils.copyProperties(param, instance);
return instance;
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
return null;
}
}

public static <T> List<Map<String, Object>> listConvert(List<T> list) {
// 定义List<Map<String, Object>>数组<br>          
// list为外部传进来的list集合
List<Map<String, Object>> list_map = new ArrayList<Map<String, Object>>();
if (org.apache.commons.collections.CollectionUtils.isNotEmpty(list)) {
//PropertyUtils.describe(Object)转换
list.forEach(item -> {
Map<String, Object> map = null;
try {
map = (Map<String, Object>) PropertyUtils.describe(item);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
list_map.add(map);
});
}
return list_map;
}

public static String convertBinaryColor(String color){
if(StringUtil.isNotEmpty(color)){
color=color.replace(CHINA_GREEN,ENG_GREEN);
color=color.replace(CHINA_RED,ENG_RED);
color=color.replace(CHINA_ORANGE,ENG_ORANGE);
color=color.replace(CHINA_PURPLE,ENG_PURPLE);
color=color.replace(CHINA_YELLOW,ENG_YELLOW);
color=color.replace(CHINA_BLUE,ENG_BLUE);
}
return color;
}


/**
* 特殊将字符串转义
* @param value
* @return
*/
public static String escapeStr(String value)
{
StringBuffer result=new StringBuffer();
if(value == null){
return null;
}else{
char content[] = new char[value.length()];
value.getChars(0, value.length(), content, 0);
result = new StringBuffer(content.length + 50);
for(int i = 0; i < content.length; i++)
switch(content[i])
{
case 34: // '"'
result.append("&quot;");
break;

case 38: // '&'
result.append("&amp;");
break;

case 39: // '''
result.append("&#39;");
break;

case 60: // '<'
result.append("&lt;");
break;

case 62: // '>'
result.append("&gt;");
break;

default:
result.append(content[i]);
break;
}
}
return result.toString();
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
js复制代码// 常量类(表格的类名)
public class BasisFormationConstant {
public static final String[] excelDemandList2 = new String[] {
"num:编号",
"demandName:需求名称",
"demandCategoryName:需求类别",
"demandLevel:对应层级",
"demandTypeName:需求验证类型",
"demandProduct:承接产品",
"remark:说明",
"resourceName:需求来源",
"verifyStatus:验证状态",
"demandStatus:需求状态",
"isIcon:是否包含图标",
"createTime:创建时间"
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
js复制代码public class ExcelUtil {

// 2003 版本 最大支持65536 行
public final static String EXCEL_FILE_2003 = "2003";
// 2007 版本以上 最大支持1048576行
public final static String EXCEl_FILE_2007 = "2007";
// 2007 版本以上 大数据量导出
public final static String EXCEl_FILE_2007_SXSSF = "2007_SXSSF";

/**
* <p>
* 导出无头部标题行Excel <br>
* 时间格式默认:yyyy-MM-dd hh:mm:ss <br>
* </p>
*
* @param title 表格标题
* @param headersField 表格字段集合
* @param dataList 数据集合
* @param out 输出流
* @param version 2003 或者 2007,不传时默认生成2003版本
* @throws IOException
*/
public static void exportExcel(String title, String[] headersField, List<Map<String, Object>> dataList, OutputStream out,
String version) throws IOException {
if (StringUtils.isEmpty(version) || EXCEL_FILE_2003.equals(version.trim())) {
exportExcel2003(title, null, headersField, dataList, out, "yyyy-MM-dd HH:mm:ss");
} else if(EXCEl_FILE_2007_SXSSF.equals(version.trim())) {
exportExcel2007WithSXSSF(title, null, headersField, dataList, out, "yyyy-MM-dd HH:mm:ss");
} else {
exportExcel2007(title, null, headersField, dataList, out, "yyyy-MM-dd HH:mm:ss");
}
}

/**
* <p>
* 导出带有头部标题行的Excel <br>
* 时间格式默认:yyyy-MM-dd hh:mm:ss <br>
* </p>
*
* @param title 表格标题
* @param headers 头部标题集合
* @param headersField 表格头部标题对应的字段集合
* @param dataList 数据集合
* @param out 输出流
* @param version 2003 或者 2007,不传时默认生成2003版本
* @throws IOException
*/
public static void exportExcel(String title, String[] headers, String[] headersField, List<Map<String, Object>> dataList,
OutputStream out, String version) throws IOException {
if (StringUtils.isBlank(version) || EXCEL_FILE_2003.equals(version.trim())) {
exportExcel2003(title, headers, headersField, dataList, out, "yyyy-MM-dd HH:mm:ss");
} else if(EXCEl_FILE_2007_SXSSF.equals(version.trim())) {
exportExcel2007WithSXSSF(title, headers, headersField, dataList, out, "yyyy-MM-dd HH:mm:ss");
} else {
exportExcel2007(title, headers, headersField, dataList, out, "yyyy-MM-dd HH:mm:ss");
}
}

/**
* <p>
* 通用Excel导出方法,利用反射机制遍历对象的所有字段,将数据写入Excel文件中 <br>
* 此版本生成2007以上版本的文件 (文件后缀:xlsx)
* </p>
*
* @param title
* 表格标题名
* @param headers
* 表格头部标题集合
* @param headersField
* 表格头部标题对应的字段集合
* @param dataList
* 需要显示的数据集合,集合中一定要放置符合JavaBean风格的类的对象。此方法支持的
* JavaBean属性的数据类型有基本数据类型及String,Date
* @param out
* 与输出设备关联的流对象,可以将EXCEL文档导出到本地文件或者网络中
* @param pattern
* 如果有时间数据,设定输出格式。默认为"yyyy-MM-dd hh:mm:ss"
* @throws IOException
*/
public static void exportExcel2007WithSXSSF(String title, String[] headers, String[] headersField,
List<Map<String, Object>> dataList, OutputStream out, String pattern) throws IOException {

SXSSFWorkbook workbook = null;
try {
// 声明一个工作薄
workbook = new SXSSFWorkbook();
// 生成一个表格
Sheet sheet = workbook.createSheet(title);
sheet.setDefaultColumnWidth(30);
// 生成一个样式
CellStyle headerStyle = workbook.createCellStyle();
// 设置这些样式
headerStyle.setFillForegroundColor(IndexedColors.GREY_25_PERCENT.getIndex());
// 生成一个字体
org.apache.poi.ss.usermodel.Font font = workbook.createFont();
font.setFontName("宋体");
font.setFontHeightInPoints((short) 16);
headerStyle.setFont(font);

// 生成并设置另一个样式
CellStyle dataStyle = workbook.createCellStyle();
/* dataStyle.setFillForegroundColor(IndexedColors.GREY_25_PERCENT.getIndex());
dataStyle.setFillPattern(XSSFCellStyle.SOLID_FOREGROUND);*/
// 生成另一个字体
org.apache.poi.ss.usermodel.Font font2 = workbook.createFont();
dataStyle.setFont(font2);

// 产生表格标题行
Row row = sheet.createRow(0);
Cell cellHeader;
if(headers!=null){
for (int i = 0; i < headers.length; i++) {
cellHeader = row.createCell(i);
cellHeader.setCellStyle(headerStyle);
cellHeader.setCellValue(new XSSFRichTextString(headers[i]));
}
}

// 遍历集合数据,产生数据行
Iterator<Map<String, Object>> it = dataList.iterator();
Map<String, Object> map;
int index = 0;
Object value;
String textValue;
Cell cell;
Matcher matcher;
Pattern p = Pattern.compile("^//d+(//.//d+)?$");
SimpleDateFormat sdf=null;
if(StringUtils.isNotEmpty(pattern)){
sdf = new SimpleDateFormat(pattern);
}else{
sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
}
while (it.hasNext()) {
index++;
row = sheet.createRow(index);
map = (Map<String,Object>) it.next();
for (int n = 0; n < headersField.length; n++) {
cell = row.createCell(n);
cell.setCellStyle(dataStyle);
value = map.get(headersField[n]);
// 判断值的类型后进行强制类型转换
textValue = null;
if (value instanceof Integer) {
cell.setCellValue((Integer) value);
} else if (value instanceof Float) {
textValue = String.valueOf((Float) value);
cell.setCellValue(textValue);
} else if (value instanceof Double) {
textValue = String.valueOf((Double) value);
cell.setCellValue(textValue);
} else if (value instanceof Long) {
cell.setCellValue((Long) value);
}
if (value instanceof Boolean) {
textValue = "是";
if (!(Boolean) value) {
textValue = "否";
}
} else if (value instanceof Date) {
textValue = sdf.format((Date) value);
} else {
// 其它数据类型都当作字符串简单处理
if (value != null) {
textValue = value.toString();
}
}
if (textValue != null) {
matcher = p.matcher(textValue);
if (matcher.matches()) {
// 是数字当作double处理
cell.setCellValue(Double.parseDouble(textValue));
} else {
cell.setCellValue(textValue);
}
}
}
}

workbook.write(out);
out.flush();
} catch (IOException e) {
e.printStackTrace();
throw e;
}finally {
if(workbook != null) {
try {
workbook.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* <p>
* 通用Excel导出方法,利用反射机制遍历对象的所有字段,将数据写入Excel文件中 <br>
* 此版本生成2007以上版本的文件 (文件后缀:xlsx)
* </p>
*
* @param title
* 表格标题名
* @param headers
* 表格头部标题集合
* @param headersField
* 表格头部标题对应的字段集合
* @param dataList
* 需要显示的数据集合,集合中一定要放置符合JavaBean风格的类的对象。此方法支持的
* JavaBean属性的数据类型有基本数据类型及String,Date
* @param out
* 与输出设备关联的流对象,可以将EXCEL文档导出到本地文件或者网络中
* @param pattern
* 如果有时间数据,设定输出格式。默认为"yyyy-MM-dd hh:mm:ss"
* @throws IOException
*/
public static void exportExcel2007(String title, String[] headers, String[] headersField,
List<Map<String, Object>> dataList, OutputStream out, String pattern) throws IOException {

XSSFWorkbook workbook = null;
try {
// 声明一个工作薄
workbook = new XSSFWorkbook();
// 生成一个表格
XSSFSheet sheet = workbook.createSheet(title);
// 设置表格默认列宽度为30个字节
sheet.setDefaultColumnWidth(30);
// 生成一个样式
XSSFCellStyle headerStyle = workbook.createCellStyle();
// 设置这些样式
headerStyle.setFillForegroundColor(new XSSFColor(new Color(217, 217, 217)));
// 生成一个字体
XSSFFont font = workbook.createFont();
font.setFontName("宋体");
font.setColor(new XSSFColor(Color.BLACK));
font.setFontHeightInPoints((short) 16);
// 把字体应用到当前的样式
headerStyle.setFont(font);

// 生成并设置另一个样式
XSSFCellStyle dataStyle = workbook.createCellStyle();
/* dataStyle.setFillForegroundColor(new XSSFColor(java.awt.Color.WHITE));
dataStyle.setFillPattern(XSSFCellStyle.SOLID_FOREGROUND);*/
// 生成另一个字体
XSSFFont font2 = workbook.createFont();
// 把字体应用到当前的样式
dataStyle.setFont(font2);

// 产生表格标题行
XSSFRow row = sheet.createRow(0);
XSSFCell cellHeader;
if(headers!=null){
for (int i = 0; i < headers.length; i++) {
cellHeader = row.createCell(i);
cellHeader.setCellStyle(headerStyle);
cellHeader.setCellValue(new XSSFRichTextString(headers[i]));
}
}


// 遍历集合数据,产生数据行
Iterator<Map<String, Object>> it = dataList.iterator();
Map<String, Object> map;
int index = 0;
Object value;
String textValue;
XSSFCell cell;
XSSFRichTextString richString;
Matcher matcher;
Pattern p = Pattern.compile("^//d+(//.//d+)?$");
SimpleDateFormat sdf=null;
if(StringUtils.isNotEmpty(pattern)){
sdf = new SimpleDateFormat(pattern);
}else{
sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
}
while (it.hasNext()) {
index++;
row = sheet.createRow(index);
map = (Map<String,Object>) it.next();
for (int n = 0; n < headersField.length; n++) {
cell = row.createCell(n);
cell.setCellStyle(dataStyle);
value = map.get(headersField[n]);
// 判断值的类型后进行强制类型转换
textValue = null;
if (value instanceof Integer) {
cell.setCellValue((Integer) value);
} else if (value instanceof Float) {
textValue = String.valueOf((Float) value);
cell.setCellValue(textValue);
} else if (value instanceof Double) {
textValue = String.valueOf((Double) value);
cell.setCellValue(textValue);
} else if (value instanceof Long) {
cell.setCellValue((Long) value);
}
if (value instanceof Boolean) {
textValue = "是";
if (!(Boolean) value) {
textValue = "否";
}
} else if (value instanceof Date) {
textValue = sdf.format((Date) value);
} else {
// 其它数据类型都当作字符串简单处理
if (value != null) {
textValue = value.toString();
}
}
if (textValue != null) {
matcher = p.matcher(textValue);
if (matcher.matches()) {
// 是数字当作double处理
cell.setCellValue(Double.parseDouble(textValue));
} else {
richString = new XSSFRichTextString(textValue);
cell.setCellValue(richString);
}
}
}
}

workbook.write(out);
out.flush();
} catch (IOException e) {
e.printStackTrace();
throw e;
}finally {
if(workbook != null) {
try {
workbook.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* <p>
* 通用Excel导出方法,利用反射机制遍历对象的所有字段,将数据写入Excel文件中 <br>
* 此方法生成2003版本的excel,文件名后缀:xls <br>
* </p>
*
* @param title
* 表格标题名
* @param headers
* 表格头部标题集合
* @param headersField
* 表格头部标题对应的字段集合
* @param dataList
* 需要显示的数据集合,集合中一定要放置符合JavaBean风格的类的对象。此方法支持的
* JavaBean属性的数据类型有基本数据类型及String,Date
* @param out
* 与输出设备关联的流对象,可以将EXCEL文档导出到本地文件或者网络中
* @param pattern
* 如果有时间数据,设定输出格式。默认为"yyyy-MM-dd hh:mm:ss"
* @throws IOException
*/
public static void exportExcel2003(String title, String[] headers, String[] headersField,
List<Map<String, Object>> dataList, OutputStream out, String pattern) throws IOException {

HSSFWorkbook workbook = null;
try {
// 声明一个工作薄
workbook = new HSSFWorkbook();
// 生成一个表格
HSSFSheet sheet = workbook.createSheet(title);
// 设置表格默认列宽度为20个字节
sheet.setDefaultColumnWidth(20);
// 生成一个样式
HSSFCellStyle headerStyle = workbook.createCellStyle();
// 设置这些样式
// 生成一个字体
HSSFFont font = workbook.createFont();
font.setFontName("宋体");
font.setFontHeightInPoints((short) 11);
// 把字体应用到当前的样式
headerStyle.setFont(font);

// 生成并设置另一个样式
HSSFCellStyle dataStyle = workbook.createCellStyle();
// 生成另一个字体
HSSFFont font2 = workbook.createFont();
// 把字体应用到当前的样式
dataStyle.setFont(font2);

// 产生表格标题行
HSSFRow row = sheet.createRow(0);
HSSFCell cellHeader;
if(headers!=null && headers.length>0){
for (int i = 0; i < headers.length; i++) {
cellHeader = row.createCell(i);
cellHeader.setCellStyle(headerStyle);
cellHeader.setCellValue(new HSSFRichTextString(headers[i]));
}
}

// 遍历集合数据,产生数据行
Iterator<Map<String,Object>> it = dataList.iterator();
Map<String,Object> map;
int index = 0;
Object value;
String textValue;
HSSFCell cell;
HSSFRichTextString richString;
Matcher matcher;
Pattern p = Pattern.compile("^//d+(//.//d+)?$");
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
while (it.hasNext()) {
index++;
row = sheet.createRow(index);
map = (Map<String,Object>) it.next();
for (int n = 0; n < headersField.length; n++) {
cell = row.createCell(n);
cell.setCellStyle(dataStyle);
value = map.get(headersField[n]);
// 判断值的类型后进行强制类型转换
textValue = null;
if (value instanceof Integer) {
cell.setCellValue((Integer) value);
} else if (value instanceof Float) {
textValue = String.valueOf((Float) value);
cell.setCellValue(textValue);
} else if (value instanceof Double) {
textValue = String.valueOf((Double) value);
cell.setCellValue(textValue);
} else if (value instanceof Long) {
cell.setCellValue((Long) value);
}
if (value instanceof Boolean) {
textValue = "是";
if (!(Boolean) value) {
textValue = "否";
}
} else if (value instanceof Date) {
textValue = sdf.format((Date) value);
} else {
// 其它数据类型都当作字符串简单处理
if (value != null) {
textValue = value.toString();
}
}
if (textValue != null) {
matcher = p.matcher(textValue);
if (matcher.matches()) {
// 是数字当作double处理
cell.setCellValue(Double.parseDouble(textValue));
} else {
richString = new HSSFRichTextString(textValue);
cell.setCellValue(richString);
}
}
}
}

workbook.write(out);
out.flush();
} catch (IOException e) {
e.printStackTrace();
throw e;
}finally {
if(workbook != null) {
try {
workbook.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ReentrantLock源码学习

发表于 2021-11-26

学习ReentrantLock之前,先了解一下可重入锁的概念。何为可重入锁,顾名思义,就是可重入的。真是听君一席话,胜听一席话啊。

请添加图片描述

正经点,可重入锁就是能够支持同一个线程对资源的重复加锁。注意两个关键字:同一线程和重复。

像synchronized关键字也实现了可重入。用synchronized修饰的方法,在进行递归调用时,执行线程在获取了锁之后仍然能够连续多次获得该锁,并不会出现阻塞的情况。

再比如说,这篇文章要学习的ReentrantLock,也实现了可重入锁。并且ReentrantLock还支持公平锁和非公平锁(默认是非公平锁)。

1、ReentrantLock源码学习

1.1 构造方法

ReentrantLock的源码比较简单,并且它也是基于AQS实现的。先看看它的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码    /**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

默认就是非公平锁。

1.2 锁的释放

Sync类就是继承自AQS的,FairSync类和NonfairSync类又是继承自Sync。对于公平锁和非公平锁,其释放锁的逻辑都是一样的,所以在Sync类中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码    abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 判断当前线程是不是占有锁的线程,如果不是,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 同步变量state的值为0时,才释放锁,返回true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置同步变量的值
setState(c);
return free;
}
}

可以发现可重入锁的释放逻辑,对于占有锁的线程来说,只有在同步变量state的值为0的时候,才算是释放了锁。

1.3 锁的获取

锁的获取分公平锁和非公平锁。非公平锁的获取逻辑实现在Sync类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
* 非公平锁,获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 同步变量为0,说明没有线程占用锁
if (c == 0) {
// CAS获取锁,注意这里并没有判断该线程是不是同步队列的队头
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断当前线程是不是占有锁的线程
else if (current == getExclusiveOwnerThread()) {
// 增加同步变量state的值
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

可以发现Sync类似并没有重写AQS的tryAcquire方法,而是放到了它的子类FairSync类和NonfairSync类中去实现的。

看看NonfairSync类的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码    // 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

// 上来先CAS获取一下锁,如果获取失败,再调用AQS的acquire方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 调用Sync类的nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

看看FairSync类的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码    // 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 直接调用AQS的acquire方法
final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 同步变量为0,说明没有线程占有锁
if (c == 0) {
/**
* 判断同步队列中当前节点是否有前驱节点,也就是只有当前节点是头结点并且CAS成功的情况下,当
* 前线程才能占有锁
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程已经占有锁,则增加state变量的值
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

可以发现公平锁和非公平锁在获取锁的时候,唯一的差别就是公平锁判断了当前节点是不是头结点,只有是头结点的情况下才可能获取到锁。非公平锁就不一样了,上来就直接CAS。

上面的方法都是在ReentrantLock类内部用的,对外提供的接口如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码// 获取锁,在等待获取锁的过程中休眠并禁止一切线程调度
public void lock() {
sync.lock();
}

// 在等待获取锁的过程中可被中断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

// 尝试获取锁,获取到锁并返回true;获取不到并返回false
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

// 在指定时间内等待获取锁;过程中可被中断
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

// 释放锁
public void unlock() {
sync.release(1);
}

2、测试

测试一下ReentrantLock的公平锁和非公平锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class FairAndUnfairTest {

private static Sync fairLock = new Sync(true);
private static Sync noFairLock = new Sync(false);

public static void testLock(Sync lock) {
// 开启5个线程
for (int i = 0; i < 5; i++) {
new Thread(new Job(lock), String.valueOf(i)).start();
}
}

private static class Job extends Thread {
private Sync lock;
public Job(Sync lock) {
this.lock = lock;
}

public void run() {
for (int i = 0; i < 2; i++) {
lock.lock();
System.out.println("locked by " + currentThread().getName() + ", waiting by " + lock.getQueueThreads());
lock.unlock();
}
}
}

private static class Sync extends ReentrantLock {
public Sync(boolean fair) {
super(fair);
}

/**
* 获取等待队列
* @return
*/
public List<String> getQueueThreads() {
List<Thread> arrayList = new ArrayList<Thread>(super. getQueuedThreads());
Collections.reverse(arrayList);
List<String> list = new ArrayList<>();
arrayList.forEach(el -> {
list.add(el.getName());
});
return list;
}
}

public static void main(String[] args) {
//testLock(fairLock);
testLock(noFairLock);
}
}

公平锁输出如下

1
2
3
4
5
6
7
8
9
10
java复制代码locked by 0, waiting by [1, 2]
locked by 1, waiting by [2, 4, 3, 0]
locked by 2, waiting by [4, 3, 0, 1]
locked by 4, waiting by [3, 0, 1, 2]
locked by 3, waiting by [0, 1, 2, 4]
locked by 0, waiting by [1, 2, 4, 3]
locked by 1, waiting by [2, 4, 3]
locked by 2, waiting by [4, 3]
locked by 4, waiting by [3]
locked by 3, waiting by []

非公平锁输出如下

1
2
3
4
5
6
7
8
9
10
java复制代码locked by 0, waiting by [2]
locked by 0, waiting by [2, 3, 1, 4]
locked by 2, waiting by [3, 1, 4]
locked by 2, waiting by [3, 1, 4]
locked by 3, waiting by [1, 4]
locked by 3, waiting by [1, 4]
locked by 1, waiting by [4]
locked by 1, waiting by [4]
locked by 4, waiting by []
locked by 4, waiting by []

可以发现公平锁总是按照顺序来依次获取锁。而非公平锁却是连续获取。回顾nonfairTryAcquire(int acquires)方法,当一 个线程请求锁时,只要获取了同步状态即成功获取锁。在这个前提下,刚释放锁的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。

非公平锁可能会出现线程饥饿的情况,当竞争的线程很多时,后面的线程可能一直都获取不到锁。那为啥ReentrantLock默认是非公平锁呢?经过上面的测试可以发现,在公平锁的情况下,线程进行了10次上下文切换,非公平锁情况下只进行了5次。

线程上下文切换是一个耗费时间和资源的操作,所以在线程竞争激烈的情况下,非公平锁无疑能够节省很多的资源。

总结一下就是:公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…168169170…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%