开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Elastic APM , APM服务配置安全访问, TSL

发表于 2021-11-19

Elastic APM , APM服务配置安全访问, TSL/SSL下的accessToken模式

如何配置 accessToken 访问 ,官方文档地址

在 apm-server 的配置文件apm-server.yaml 里面的 apm-server.auth.secret_token 自定义一个字符串,比如:
apm-server.auth.secret_token :123456xxx

然后在你的客户端Agent里配置:Node.js示例:源码Github地址

1
2
3
4
5
6
7
8
9
10
11
php复制代码var apm = require('elastic-apm-node').start({
// Override service name from package.json
// Allowed characters: a-z, A-Z, 0-9, -, _, and space
serviceName: 'zipcode service',

// Use if APM Server requires a token
secretToken: '123456xxx', //留意这里

// Set custom APM Server URL (default: http://localhost:8200)
serverUrl: 'https://localhost:8200',
})

然后重启apm-server,重启客户端,登录kibana,在APM控制台里就可以看到实例了,如果agent未配置secretToken访问,则会报错。

上述过程是未配置apm-server的SSL/TSL访问,这会带来一个安全问题,凭证通过http请求很容易被窃取,所以需要配置成https服务。这是官方推荐的做法。

官网SSL/TSL配置文档地址 这是英文的并且说的并不详细。推荐一个更详细的教程,地址

这个教程里面大部分的操作都是正确的,不过也有一些是错误,比如下面的代码 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//复制代码var apm = require('elastic-apm-node').start({
// Override service name from package.json
// Allowed characters: a-z, A-Z, 0-9, -, _, and space
serviceName: 'zipcode service',

// Use if APM Server requires a token
secretToken: '1234561',

//配置好 ca 以后 , 已经是https服务器了
// Set custom APM Server URL (default: http://localhost:8200)
serverUrl: 'http://localhost:8200' //这里是错误的
serverUrl: 'https://localhost:8200' //这样才是正确的

verifyServerCert: true,
serverCaCertFile: "ca.crt"
})

如果不改正,直接按照教程里面的代码,apm-server 会报 APM Server transport error (400): Unexpected APM Server response\nClient sent an HTTP request to an HTTPS server.\n",意思是说客户端向HTTPS服务器发送了HTTP请求。

另外,教程里面的:

1
2
3
4
5
csharp复制代码另注:我们可以使用如下的命令把一个.crt 的证书转换为一个 .pem 的证书:

openssl x509 -in mycert.crt -out mycert.pem -outform PEM

这是对于我们的配置也是不影响的,可以不用看,本人看教程的时候就被影响了。

PS: 还有一个需要注意的点,教程里面生成证书的时候传的 name 是localhost ,生成的证书也是 localhost.crt 和 localhost.key 这里是和你最后访问的https地址(https://localhost:8200)对应的,改成其他也是不行的,至于原理就是https原理的问题了,这里就是不坐赘述了。

教程使用的是Mac操作的,本人是在windows下操作的,是可以的,linux下未实操。

PS: Agent 连接apm-server的时候如有报错,报错信息都提示的很清楚,出现问题的时候第一时间想到看这里的错误信息(Agent的报错信息 , 还有 apm-server 的报错信息 )

衷心感谢原教程,原教程写的非常好,都很详细,就是有一点错误,之所以重新写一篇文章,是因为实在不怎么喜欢 CSDN ,所以没有在CSDN下留言。所以把补充都写在这里了。如有错误,欢迎指正!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kubernetes、KuberSphere初识和实践 结语

发表于 2021-11-19

本文对KuberNetes 中的概念做简单介绍。同时记录了一次KuberNetes和KuberSphere的部署经历。

初识Kubernetes

生产级别的容器编排系统 自动化的容器部署、扩展和管理

简单的说Kubernetes就是用来管理Docker的。它解决了Docker之上,应用之下,这个层面上许多复杂的问题。如自动部署,限制资源,自动重启、迁移故障,简化应用的访问等。

准备知识

master管理节点,与node工作节点相对。node干活master管理。

其中master包含组件:

api server 操作入口。Kubernetes的所有操作指令由对此下达。

etcd 集群内部用键值数据库。

scheduler调度器。调度工作的。

controller控制器。发布实际的指令。

node 中包含组件:

pod 几个相关容器(docker),它们被统一管理。Kubernetes的基本管理单位。

kubelet代理。master节点命令的实际执行者。

kube-proxy网络代理,网络入口和出口。

kubectl 命令行管理工具。

volume 数据卷,保存数据。

概念:

deployment 部署。指在节点部署的pod。

service服务,组合多个部署的pod,提供对外访问。

label 标签,selector 选择器,如给节点打上某标签,可以在部署应用时,只部署在包含标签的节点上。

namespace 命名空间,做逻辑隔离。

kubeadm 集群部署工具。

Ingress应用网关,统一出口、负载均衡。

架构图

t1.jpg

KubeSphere

面向云原生应用的容器混合云

Kubernetes的图形化的、优秀的管理界面,代替命令行的操作方式。

可以管理多云上的集群。将是日常工作中打交道最多的工具。

Kubernetes集群搭建

程序版本:

  • kubernetes v1.17.3
  • Docker version 18.03.1-ce, build 9ee9f40
  • KubeSphere 3.0.0

配置文件:

安装过程中用到的配置文件。

gitee.com/blue1018/bl…

安装前置条件

  • 一台或多台机器,操作系统Centos7。
  • 硬件配置:3072MB或更多RAM,2个CPU或更多CPU,硬盘30GB或更多。
  • 各节点网络通畅。

所有节点下的操作

操作系统设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bash复制代码#修改时区
timedatectl set-timezone Asia/Shanghai
#关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
#关闭Linux
sed -i 's/enforcing/disabled/' /etc/selinux/config
setenforce 0
#关闭swap
swapoff -a #临时关闭
sed -ri 's/.*swap.*/#&/' /etc/fstab #永久关闭
free -g #验证,swap必须为0
#将桥接的IPV4流量传递到iptables的链:
cat > /etc/sysctl.d/k8s.conf <<EOF
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
EOF
sysctl --system

设置阿里云Centos镜像源

1
2
3
4
bash复制代码wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
yum clean all
yum makecache
yum update -y

安装Docker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bash复制代码#依赖
yum install -y yum-utils device-mapper-persistent-data lvm2
#源
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
#版本
yum list docker-ce --showduplicates | sort -r
#安装
yum install docker-ce-18.03.1.ce-1.el7.centos -y
#启动
systemctl start docker
systemctl enable docker
#设置阿里云docker镜像源

sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://mt1tth70.mirror.aliyuncs.com"]
}
EOF

systemctl daemon-reload
systemctl restart docker

安装 kubectl、kubeadm、kubelet

1
2
3
4
5
6
7
8
9
ini复制代码cat <<EOF > /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=https://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64/
enabled=1
gpgcheck=1
repo_gpgcheck=1
gpgkey=https://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg https://mirrors.aliyun.com/kubernetes/yum/doc/rpm-package-key.gpg
EOF
1
复制代码yum install -y kubelet-1.17.3 kubeadm-1.17.3 kubectl-1.17.3
1
bash复制代码systemctl enable kubelet && systemctl start kubelet

主节点下的操作

镜像拉取

1
复制代码sh master_images.sh

初始化主节点

其中apiserver-advertise-address 要换成自己的IP。

1
2
3
4
5
6
ini复制代码kubeadm init \
--apiserver-advertise-address=192.168.31.201 \
--image-repository registry.cn-hangzhou.aliyuncs.com/google_containers \
--kubernetes-version v1.17.3 \
--service-cidr=10.96.0.0/16 \
--pod-network-cidr=10.244.0.0/16

安装正常时的返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bash复制代码Your Kubernetes control-plane has initialized successfully!

To start using your cluster, you need to run the following as a regular user:

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

You should now deploy a pod network to the cluster.
Run "kubectl apply -f [podnetwork].yaml" with one of the options listed at:
https://kubernetes.io/docs/concepts/cluster-administration/addons/

Then you can join any number of worker nodes by running the following on each as root:

kubeadm join 192.168.31.201:6443 --token 09543n.9a2q9btquudvw8gb \
--discovery-token-ca-cert-hash sha256:d3b644e51a52fb80fb79037dfaf7e8c6af8435ea5f1a2567699b46cd02a800ec

执行返回结果中的命令

1
2
3
bash复制代码  mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

pod进度监控

新开个ssh窗口监视pod安装进度

1
sql复制代码watch kubectl get all --all-namespaces -o wide

安装网络插件

1
复制代码kubectl apply -f  kube-flannel.yml

子节点加入集群

主节点中建立TOKEN

1
lua复制代码kubeadm token create --print-join-command

子节点执行,执行返回结果中的命令,加入集群

1
2
sql复制代码kubeadm join 192.168.31.201:6443 --token sg47f3.4asffoi6ijb8ljhq \
--discovery-token-ca-cert-hash sha256:81fccdd29970cbc1b7dc7f171ac0234d53825bdf9b05428fc9e6767436991bfb
1
2
3
4
5
6
7
8
9
10
csharp复制代码[preflight] Running pre-flight checks
[WARNING IsDockerSystemdCheck]: detected "cgroupfs" as the Docker cgroup driver. The recommended driver is "systemd". Please follow the guide at https://kubernetes.io/docs/setup/cri/
[WARNING SystemVerification]: this Docker version is not on the list of validated versions: 18.03.1-ce. Latest validated version: 19.03
[preflight] Reading configuration from the cluster...
[preflight] FYI: You can look at this config file with 'kubectl -n kube-system get cm kubeadm-config -oyaml'
[kubelet-start] Downloading configuration for the kubelet from the "kubelet-config-1.17" ConfigMap in the kube-system namespace
[kubelet-start] Writing kubelet configuration to file "/var/lib/kubelet/config.yaml"
[kubelet-start] Writing kubelet environment file with flags to file "/var/lib/kubelet/kubeadm-flags.env"
[kubelet-start] Starting the kubelet
[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap...

在主节点查看集群的node

1
2
3
4
5
6
sql复制代码kubectl get nodes

NAME STATUS ROLES AGE VERSION
node201 Ready master 3m3s v1.17.3
node202 Ready <none> 111s v1.17.3
node203 Ready <none> 83s v1.17.3

部署Ingess

1
复制代码kubectl apply -f ingress-controller.yaml

部署一个nginx

以nginx为例测试Ingress

1
2
css复制代码kubectl create deployment nginx --image=nginx
kubectl expose deployment nginx --port=80 --target-port=80 --type=NodePort

用Ingress暴露一个服务

1
复制代码kubectl apply -f nginx-ingress.yaml

通过Ingress访问 Nginx服务

浏览器里通过 nginx.dev.com 访问。

这里Ingress 用域名 nginx.dev.com 暴露的Nginx服务。需要修改本地host文件,把域名映射到部署了ingress的节点的可访问IP。

KuberSphere 安装

在主节点最小化安装。

安装openebs

Kubernetes的存储方案

删除污点,node201 换成你自己的master hostname。

1
bash复制代码kubectl taint nodes node201 node-role.kubernetes.io/master:NoSchedule-

建立命名空间、安装

1
2
arduino复制代码kubectl create ns openebs
kubectl apply -f openebs-operator.yaml

等待安装结束

1
arduino复制代码kubectl get sc
1
2
3
4
5
sql复制代码NAME                         PROVISIONER                                                RECLAIMPOLICY   VOLUMEBINDINGMODE      ALLOWVOLUMEEXPANSION   AGE
openebs-device openebs.io/local Delete WaitForFirstConsumer false 16m
openebs-hostpath openebs.io/local Delete WaitForFirstConsumer false 16m
openebs-jiva-default openebs.io/provisioner-iscsi Delete Immediate false 16m
openebs-snapshot-promoter volumesnapshot.external-storage.k8s.io/snapshot-promoter Delete Immediate false 16m

设置默认storageclass

1
kotlin复制代码kubectl patch storageclass openebs-hostpath -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
1
bash复制代码storageclass.storage.k8s.io/openebs-hostpath patched

安装完成后还原污点

1
ini复制代码kubectl taint nodes node201  node-role.kubernetes.io/master=:NoSchedule

安装kubesphere

1
2
复制代码kubectl apply -f kubesphere-installer.yaml
kubectl apply -f cluster-configuration.yaml

查看安装日志

1
sql复制代码kubectl logs -n kubesphere-system $(kubectl get pod -n kubesphere-system -l app=ks-install -o jsonpath='{.items[0].metadata.name}') -f

等待几分钟的时间、日志出现下面内容就可以访问了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
csharp复制代码#####################################################
### Welcome to KubeSphere! ###
#####################################################

Console: http://10.0.2.12:30880
Account: admin
Password: P@88w0rd

NOTES:
1. After logging into the console, please check the
monitoring status of service components in
the "Cluster Management". If any service is not
ready, please wait patiently until all components
are ready.
2. Please modify the default password after login.

#####################################################
https://kubesphere.io 2021-11-15 16:21:06
#####################################################

结语

完成集群搭建后,可阅读 Kubernetes、KuberSphere官方文档。做进一步探索,如高可用的Kubernetes集群、KuberSphere完整的DEVOPS流。

附录

常用命令例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ini复制代码kubectl get all --all-namespaces  -o wide #查看集群所有资源
kubectl get pods --all-namespaces #查看所有空间下的pod
kubectl create deployment nginx --image=nginx #以nginx镜像建立一个名为nginx的部署。
kubectl get deployment #查看部署
kubectl scale --replicas=4 deployment nginx #修改名为nginx的部署,副本数量为4
kubectl get pod nginx -o yaml #输出名为nginx的pod的yaml配置信息。
kubectl create deployment nginx --image=nginx --dry-run -o yaml #dry-run不真正运行,只为输出配置信息。
kubectl api-versions #查看api版本
kubectl expose deployment nginx --port=80 --target-port=80 --type=NodePort #暴露nginx服务,访问端口80.
kubectl get ingress --all-namespaces #查看网关的路由。
kubectl describe pod redis-6fd6c6d6f9-zwvmd -n kubesphere-system #查看指定空间、指定名称的pod的详细信息。
kubectl describe ingress test #查看名为test的路由的详细信息
kubectl delete ingress web #删除名为web的ingress路由
kubectl edit ingress nginx-web#编辑名为nginx-web的路由
kubectl get nodes --show-labels #查看节点的标签
kubectl apply -f nginx.yaml #根据配置文件进行操作
kubectl delete deployment.apps/nginx #删除nginx部署
kubectl delete service/nginx #删除nginx服务

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【得物技术】基于自注意机制的图像识别算法 背景 业务需求 模

发表于 2021-11-19

背景

对于做 CV 同学的人来说,图像识别是入门中最简单的模型,也是最最基础的模型。在不同的 CV 任务重,即使发展多年,都一直保留着利用训练好的图像识别任务重的模型权重,作为 backbone 用于加速训练收敛的作用。但是面对一些图像识别的任务,我们如何根据业务需求去改造一个已经比较完备的识别任务,还是蛮有意思的一个话题。

业务需求

能够筛选出用户上传的球鞋图片中带有干净统一背景的图片,我们暂且就叫这个项目为 背景复杂度检测 任务。帮助后续估价以及出售环节算法降低难度,以及维持整个 app 图像质量在一个水平线上。

项目要求

  1. 能够在测试集的准确率达到 80% 以上,可以用于给予用户提示,但是并不强制;而达到 90% 以上,可以强制用户按照要求达到上传图片质量的要求。
  2. 能够实现端侧应用。

模型设计

mobilenet backbone + FPN + modified SAM

最终做出来的模型各个模块分解来看其实很简单,没有什么特别难懂的地方。

整体需要努力的方向无外乎以下几点:

  1. 分析业务特点,这是一个典型的空间型识别任务,即是通过图像的某部分或者某几部分区域的内容完成目的。对于 背景复杂度 我们要刨去主体之外的部分,判断剩余部分是否是“复杂的”,即可,所以想到可以用空间系列的注意力机制。
  2. 我们不可能对于用户严格控制拍照中的主体占据图片中比例的大小。有些用户习惯将主体占满屏,有些用户喜欢留白多一些。对于不同的尺度,如果想做到精细分类,是需要在较高分辨率下的 feature map 做文章,所以这里用 FPN。
  1. 为了能够在端侧实现应用,选用 mobilenet 系列是很天然的想法。

以上几点的设计思路其实就是完全围绕当前 业务场景 而去做出的设计。

最终在测试集上,利用设计好的 CNN 模型达到了 96% 的正确率。可以作为强制用户上传高品质图片质量的依据。

如果想了解(白嫖)这个项目所用模型,其实分享到这就可以达到目标了。每一个模块的设计,其实都不难,没有新的概念。代码也很简明,实现也很方便。

但项目本身最有趣的实际上是实现整个项目的思想,为什么最后选用这样的模型,中间其他的思路是如何被排除,如何试错的!!!

项目过程

传统 CV

项目需要在端侧实现,虽然没有明确手机实时性的要求,但是需要整个算法文件,所引用的 库 占据内存足够的小。第一能够想到的就是如果不用深度网络,这个问题是否能够解决。

错误的思路

分析 背景复杂度,我曾想到用

  1. 对于和整个业务无关但是又严重影响结果的主体内容,用一个固定大小的高斯核进行过滤,然后进行其他处理。
  2. 利用边缘检测、梯度信息分析一张图片的背景复杂度
  1. 傅里叶频率信息分析图片的高低频信息来判断:高频代表背景信息多,较为复杂;低频代表干净。
  2. 通过多张背景干净的图片做简单像素加权平均当做模板,然后利用异常检测的方法,筛选出背景复杂的图片。

以上都是在项目初期,我能想到的一些简单的 idea。但是观察图片样本后,这些想法,除去第一条的思路是对的(空间维度的信息),其他三条的思路都是错的。举个例子,一双鞋放在地毯上,地毯本身是干净的,图片也除去主体和地毯没有其他任何物体。但是地毯有自己的纹路,高频信息很多,梯度信息很多。按照上述 idea 2、3、4 条的原则,这样的背景属于复杂度很高的背景。但是实际上,在样本中,这样的背景算是干净的。

通过分析数据的样本以及反思业务逻辑得出以下结论:

  1. 背景复杂度概念不是针对不同图片之间背景的差异。
  2. 背景复杂度的概念是针对每张图片自身是否出现了不同的背景和不是主体的一些前景物体。

那么,整体的思路就要做出一些改变,就是要通过 自相似度 的方式来判断背景复杂度。这也是对于业务越来越熟悉之后做出策略上的调整。

模板匹配

那么需要用到哪些部分可以用于自相似度的判断呢?

  1. 四个角

上面说过,错误的 idea 中,有一个思路是对的,就是要利用空间维度去思考业务,去掉主体信息思路是对的。即我们在判断背景自相似度的情况,是需要避免划到主体信息的。根据这个思路,我们可根据经验,4 个角可以在一定程度上可以代表一张图的背景信息。如果 4 个角上的内容相似,可以解决大部分业务的目标。利用 4 个角的信息计算两两自相似度,有 6 个值。6 个值越高,说明 4 个角的相似度越高,是干净背景的概率也就越大。

  1. 两个角

通过观察实际的 业务 样本后发现,用户拍照的习惯往往是上面的留白会比下面的多。也就是说,下方的 2 个角经常会含有主体信息。所以最终确认的方案就是只利用 上方的 2 个角进行相似度匹配。

  1. 还没完

但是,只利用 2 个角的相似度匹配只有 1 个值的输出,那这样的值对于整个业务来说,太不稳定了,存在很大的风险。于是我分别利用上方两个角自己作为模板,然后分别去做滑窗做匹配,记录大于一定阈值的个数,作为一个相似度评价的一个指标。所以,最终有 3 个指标来衡量一张图背景的相似度情况。一个是上方两个角彼此的相似度值。另外两个是每一个角,独自作为模板,与整幅图上其他空间上内容能够匹配的个数。根据预设定好的权重,进行加权,最终得到一个分数。这个分数用来判断最后这幅图是否是背景复杂。

最终,通过这种方法,输入图片进行统一的 resize,保持图像在一定大小,让模板匹配速度控制在合理的范围内,根据以上的做法,在测试机就可以达到 80% 的正确率。这个算法可以在产品端作为引导晕乎拍照的工具。当算法判断为复杂背景时,给予提示希望用户重新拍照。

CNN

以上传统 CV 没有听懂,没有关系。它只是叙述下整个项目利用业务分析,得到一些宝贵思路的过程。下面的方式才是此文章的重点。

baseline

为了项目快速迭代优化,选用识别的思路做这个项目。也就是很普通的图像识别任务。由于希望能够放在手机里的模型,自己能够想到也比较熟悉的是 mobilenet v1。

优化模型

目标检测

在文章开始的时候,我提到过,这是一个标准的空间行识别问题。对于这样的识别问题,我通常的做法就是利用 检测模型 解决识别任务。

这种思路很常见。比如在社区中,我们想识别一个图文帖子中,图中达人他的穿搭是什么样的,有没有背什么牌子的包包,穿了什么品牌的鞋。不考虑实际算法难度,端对端的思路就是一个识别问题。输入一张图,输出几个标签。但是实际上,直接利用识别模型是很难的,因为有很多冗余背景的信息。我们面对这样的问题还是需要利用检测模型去做。最后只是不需要输出 bounding box 就好,可能还需要做一些去重的策略上的工作。

回到本业务,有一点不同的是,我们最终关心的区域恰恰是去除主体的区域。在 传统 CV 中有一个思路就是用一个固定大小的高斯核去做这样类似的事情。但是恰恰是因为这一点,我们不能像上文社区那样的例子利用一个目标检测模型加上一些策略就直接拿到输出。

首先需要一个目标检测模型拿到主体。然后用 mask 过滤的方案,接上传统 CV 算法也好,CNN 的 baseline 也罢,就可以得到最终的答案。

利用目标检测的方法,是可以很明晰地解决这样的一个业务问题,也是最直观的选择。

隐性目标检测

目标检测的方法虽然直观,但是存在两个巨大的缺陷:

  1. 不能够端对端,至少需要两个步骤,或者两个模型,对于速度,和内存空间来说都是一个负担。
  2. 目标检测需要标签的成本也比识别的成本大的多。

那么,是否我们可以优化掉上述的这两个缺陷呢?答案是肯定的。

我们可以像目标检测的步骤那样,在中间的模型的过程中,预测一个区域,就是一个 bounding box。输出也就是 4 个维度。通常是可以输出 左上角、右下角;左上角、宽高;或者中心坐标、宽高。然后利用原图像和这 4 个维度的结果做一个 mask 过滤,过滤掉主体,再进行下一步的识别作用。

这样做的好处就可以避免直接用 目标检测 任务所有的缺陷。他把上一种方案分步做合并成一个模型。中间标签也是自己学出来的而不是人工标出来的,省去很多训练之前的准备工作时间。

其实著名的 拍立淘 的图搜算法就是利用了这样的思路进行图搜识别的。一般的图搜也是分为 目标检测 + 识别的方法,但是 拍立淘的方案利用上述这种方法去除了目标检测的前置任务。

隐性分割

既然目标检测可以,那么其实语义分割,或者实例分割也可以做这样的内容。我们看看隐性目标检测也有这个他的局限性:

  1. 由于 bounding box 本身的问题,他依然囊括了一些或者去除了一些必要信息。
  2. 对于多个物体的话,这种方式就很不灵活。(当然这个问题不是不可以解决。也可以用到目标检测设计一些卷积得到 feature map 的方式解决多目标检测的问题,但是那样参数量也会上升。)

同样,我们利用典型的分割思路,在 mask 生成的阶段,抛弃 bounding box 这种规整的形状,去省城一个物体的外轮廓边界。这个边界同样也是隐性的,是不需要人工标注的,然后用输入图像去过滤这个mask。

空间注意力机制

隐性分割固然很有道理,但是实际上参数量会上升,计算量会加大。其实无论是目标检测还是分割的方法,其实从广义上讲就是一个空间注意力强监督。也就是我们人为划定好的空间注意力机制。但是还是那个问题,我们最终的目标不是学习 bounding box 的坐标,不是学习到背景的外轮廓信息 mask 有多么精确。这些中间结果对于最终目标不是很关键。或者说,模型可以学习到自己的一套关注的区域就可以了,不需要可解释性很强的区域也是可以的。那么对于这样的场景,我们没有必要在分割模型上做到最上层,也就是分辨率最高那层的 mask。只要做到中间层效果就已经出来了,无需浪费更多的参数量。

对于“U”型的图像分割模型,我们只需做到“J”型就够了。“J”型结构就是标准的 backbone + FPN。FPN 即保留了一定高分辨率的信息,又结合了底层上来的语义信息,是一个完美的“融合体”。在这一层再做一个 mask 就相当于是在这一层做了语义分割。其实这就是属于空间注意力机制。

空间 + channel 注意力机制

既然有了空间注意力,来都来了,我们也可以加上 channel 维度的注意力机制。我们耳熟能详的 attention 模块像 SE 属于 channel 维度的。BAM/CBAM 是空间维度 + channel 维度都有的注意力机制。但是他是分开做的。无论是串行还是并行,都是分为两个模块做。这个项目中利用 YOLOV4 中改写的 Modified SAM 模块,一步合成了 空间 + channel 的权重。简单而又便于理解。但是 channel 维度,确实就是个“顺便”,在此业务中,我觉得加不加都可以。

我们看一下中间的 grad CAM 结果

结果对比

通过与没有加入上述 attention 模块以及 FPN 的模型对比改进之后的模型。原有模型在验证集上的正确率为 93%,改进后的模型在 96%。但更重要的是,改进后的模型在可解释性上有明显的增强。当可解释性增强之后,才会有明确的优化方向的可能性。

总结

以上就是对于如何在图像识别中做出创新的一种思路。但是创新的目的不是为了创新,整个项目下来最重要的核心就是为了 解决业务问题。

文/诗诗

关注得物技术,做最潮技术人!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

『面试の神』Java怎样实现流合并?

发表于 2021-11-19

「这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战」。

前言

Hello 大家好,我是l拉不拉米,流式计算一直是Java面试中出现频率很高的一个问题,今天『面试の神』系列就来讲一讲 Java 流合并的实现方式。

普通Java方法

JDK 8 Stream 类有一些有用的静态方法。比如concat():

合并两个流

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Test 
public void merge() {
Stream<Integer> stream1 = Stream.of(1, 3, 5);
Stream<Integer> stream2 = Stream.of(2, 4, 6);

Stream<Integer> resultingStream = Stream.concat(stream1, stream2);

assertEquals(
Arrays.asList(1, 3, 5, 2, 4, 6),
resultingStream.collect(Collectors.toList()));
}

合并多个流

当我们要合并多个流时,操作会变得复杂一些。一个可行的办法是,先合并前两个流,再用合并结果合并其他的流。

比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Test 
public void merge() {
Stream<Integer> stream1 = Stream.of(1, 3, 5);
Stream<Integer> stream2 = Stream.of(2, 4, 6);
Stream<Integer> stream3 = Stream.of(18, 15, 36);

Stream<Integer> resultingStream = Stream.concat(
Stream.concat(stream1, stream2), stream3);

assertEquals(
Arrays.asList(1, 3, 5, 2, 4, 6, 18, 15, 36),
resultingStream.collect(Collectors.toList()));
}

这种方法对于更多的流变得不可行。当然,我们可以创建中间变量或辅助方法以使其更具可读性,但我们还有更好的办法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Test 
public void merge() {
Stream<Integer> stream1 = Stream.of(1, 3, 5);
Stream<Integer> stream2 = Stream.of(2, 4, 6);
Stream<Integer> stream3 = Stream.of(18, 15, 36);
Stream<Integer> stream4 = Stream.of(99);

Stream<Integer> resultingStream = Stream.of(
stream1, stream2, stream3, stream4)
.flatMap(i -> i);

assertEquals(
Arrays.asList(1, 3, 5, 2, 4, 6, 18, 15, 36, 99),
resultingStream.collect(Collectors.toList()));
}

它经过两个步骤:

  • 首先创建一个包含 4 个 Streams 的新 Stream,生成嵌套的流 Stream
  • 然后我们使用恒等函数将它 flatMap() 转换为 Stream

使用 StreamEx

StreamEx 是一个开源 Java 库,它扩展了 Java 8 Streams 的可能性。它使用 StreamEx 类作为对 JDK 的 Stream 接口的增强。

合并流

StreamEx 库允许我们使用 append() 实例方法合并流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Test 
public void merge() {
Stream<Integer> stream1 = Stream.of(1, 3, 5);
Stream<Integer> stream2 = Stream.of(2, 4, 6);
Stream<Integer> stream3 = Stream.of(18, 15, 36);
Stream<Integer> stream4 = Stream.of(99);

Stream<Integer> resultingStream = StreamEx.of(stream1)
.append(stream2)
.append(stream3)
.append(stream4);

assertEquals(
Arrays.asList(1, 3, 5, 2, 4, 6, 18, 15, 36, 99),
resultingStream.collect(Collectors.toList()));
}

由于它是一个实例方法,我们可以轻松地将它链接起来并附加多个 streams。

使用prepend()合并流

StreamEx 还包含一个方法,该方法在另一个之前添加元素,称为 prepend():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Test 
public void merge() {
Stream<String> stream1 = Stream.of("foo", "bar");
Stream<String> openingBracketStream = Stream.of("[");
Stream<String> closingBracketStream = Stream.of("]");

Stream<String> resultingStream = StreamEx.of(stream1)
.append(closingBracketStream)
.prepend(openingBracketStream);

assertEquals(
Arrays.asList("[", "foo", "bar", "]"),
resultingStream.collect(Collectors.toList()));
}

使用Jooλ

jooλ是JDK 8兼容库,可为JDK提供有用的扩展。此处最重要的流抽象称为 SEQ。请注意,这是一个顺序和有序的流,因此调用 parallel() 将没有效果。

合并流

和StreamEx库一样,jooλ也有append()方法:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Test 
public void merge() {
Stream<Integer> seq1 = Stream.of(1, 3, 5);
Stream<Integer> seq2 = Stream.of(2, 4, 6);

Stream<Integer> resultingSeq = Seq.ofType(seq1, Integer.class)
.append(seq2);

assertEquals(
Arrays.asList(1, 3, 5, 2, 4, 6),
resultingSeq.collect(Collectors.toList()));
}

使用prepend()合并流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@Test 
public void merge() {
Stream<String> seq = Stream.of("foo", "bar");
Stream<String> openingBracketStream = Stream.of("[");
Stream<String> closingBracketStream = Stream.of("]");

Stream<String> resultingStream = Seq.ofType(seq, String.class)
.append(closingBracketStream)
.prepend(openingBracketStream);

assertEquals(
Arrays.asList("[", "foo", "bar", "]"),
resultingStream.collect(Collectors.toList()));
}

总结

可以看出来合并两个流的时候使用 JDK 8 比较简单方便,但当我们需要多个流合并时,使用SteamEx或JOOλ库可以通过append()的方式追加流,可读性更高。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis中keys, scan, smembers命令的区

发表于 2021-11-19

这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战

起因

昨天在群里看到了一道面试题,题目是:

1
2
vbnet复制代码假如Redis里面有1亿个key,其中有10w个key是以某个固定的已知的前缀开头的
如果将它们全部找出来并且说明使用的方法的底层原理?

看群里的小伙伴们各抒己见,最后总结起来有如下三个命令:

  • keys:用于查找所有符合给定模式 pattern 的 key
  • smembers:返回集合 key 中的所有成员
  • scan:增量迭代当前数据库中的所有数据库键

其中,keys和smembers命令都是返回所有成员,时间复杂度为O(n)O(n)O(n)。scan虽然时间复杂度也是O(n)O(n)O(n),但是它采用的是增量的方式,数据量较大的场景下,不会像前两者一样一直阻塞着线程。

上面都是通过网上搜索来的答案,具体是不是这样我也不确定。于是我跑去github上面下了redis的源码,看看源码里面是怎么样的。

github.com/redis/redis

源码

在源码文件中有一个命令表,位于redis\src\server.c文件中,在这里我们可以快速的找到各个命令对应的函数入口。

image.png

image.png

命令的入口找到了,我们就可以分别跳转过去看它的对应实现了。

keys

在keys命令函数中可以看到它是遍历了整个键空间字典,将与给定的pattern匹配的添加到返回列表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
c复制代码void keysCommand(client *c) {
dictIterator *di;
dictEntry *de;
sds pattern = c->argv[1]->ptr;
int plen = sdslen(pattern), allkeys;
unsigned long numkeys = 0;
void *replylen = addReplyDeferredLen(c);

di = dictGetSafeIterator(c->db->dict);
allkeys = (pattern[0] == '*' && plen == 1);
while((de = dictNext(di)) != NULL) {
sds key = dictGetKey(de);
robj *keyobj;

if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
keyobj = createStringObject(key,sdslen(key));
if (!keyIsExpired(c->db,keyobj)) {
addReplyBulk(c,keyobj);
numkeys++;
}
decrRefCount(keyobj);
}
}
dictReleaseIterator(di);
setDeferredArrayLen(c,replylen,numkeys);
}

smembers

smembers命令这边调用的是sinterGenericCommand函数,搜索范围传递的是0,表示全局搜索。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
C复制代码/* SINTER key [key ...] */
void sinterCommand(client *c) {
sinterGenericCommand(c, c->argv+1, c->argc-1, NULL, 0, 0);
}

/* SINTER / SINTERSTORE / SINTERCARD
*
* 'cardinality_only' work for SINTERCARD, 只返回具有最小处理和内存开销的基数.
*
* 'limit' work for SINTERCARD, 达到限制后停止搜索。传递 0 表示无限制.
*/
void sinterGenericCommand(client *c, robj **setkeys,
unsigned long setnum, robj *dstkey,
int cardinality_only, unsigned long limit) {}

scan

由于scanGenericCommand代码太长来这里就不全贴出来,小伙伴们感兴趣的话可以下载源码搜索一下。

该函数的大概实现逻辑应该是先解析附带的参数,得到需要返回的数据长度以及游标值,再迭代集合获取所需数据。

具体的入口函数为disScan。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
c复制代码/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
scanGenericCommand(c,NULL,cursor);
}

/* This command implements SCAN, HSCAN and SSCAN commands.
* 如果对象 'o' 被传递,那么它必须是一个 Hash、Set 或 Zset 对象,
* 否则如果 'o' 为 NULL,该命令将对与当前数据库关联的字典进行操作
*
* 当 'o' 不为 NULL 时,该函数假定客户端参数向量中的第一个参数是一个键,
* 因此它会在迭代之前跳过它以解析选项。
*
* 在 Hash 对象的情况下,该函数返回 Hash 上每个元素的字段和值。 */
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {}

最后

由于我是Java出身,对于c/c++的语法不熟悉,备注都是阅读源码后结合注释进行理解的,如果文中有备注错误的地方,希望大佬们可以帮忙指点一下!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

雪花算法,到底是个啥?

发表于 2021-11-19

这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战

SnowFlake 算法

是 Twitter 开源的分布式 id 生成算法。其核心思想就是:使用一个 64 bit 的 long 型的数字作为全局唯一 id。在分布式系统中的应用十分广泛,且 ID 引入了时间戳,基本上保持自增的,后面的代码中有详细的注解。

这 64 个 bit 中,其中 1 个 bit 是不用的,然后用其中的 41 bit 作为毫秒数,用 10 bit 作为工作机器 id,12 bit 作为序列号。

给大家举个例子吧,比如下面那个 64 bit 的 long 型数字:

  • 第一个部分,是 1 个 bit:0,这个是无意义的。
  • 第二个部分是 41 个 bit:表示的是时间戳。
  • 第三个部分是 5 个 bit:表示的是机房 id,10001。
  • 第四个部分是 5 个 bit:表示的是机器 id,1 1001。
  • 第五个部分是 12 个 bit:表示的序号,就是某个机房某台机器上这一毫秒内同时生成的 id 的序号,0000 00000000。

1、第一部分:1 bit,是不用的,为啥呢?

因为二进制里第一个 bit 为如果是 1,那么都是负数,但是我们生成的 id 都是正数,所以第一个 bit 统一都是 0。

2、第二部分:41 bit,表示的是时间戳,单位是毫秒

41 bit 可以表示的数字多达 2^41 - 1,也就是可以标识 2 ^ 41 - 1 个毫秒值,换算成年就是表示 69 年的时间。

3、第三、四部分:10 bit:记录工作机器 id

代表的是这个服务最多可以部署在 2^10 台机器上,也就是 1024 台机器。

但是 10 bit 里 5 个 bit 代表机房 id,5 个 bit 代表机器 id。意思就是最多代表 2 ^ 5 个机房(32 个机房),每个机房里可以代表 2 ^ 5 个机器(32 台机器),也可以根据自己公司的实际情况确定。

4、第五部分:12 bit:这个是用来记录同一个毫秒内产生的不同 id

12 bit 可以代表的最大正整数是:

1
ini复制代码2 ^ 12 - 1 = 4096

简单来说,你的某个服务假设要生成一个全局唯一 id,那么就可以发送一个请求给部署了 SnowFlake 算法的系统,由这个 SnowFlake 算法系统来生成唯一 id。

  • 这个 SnowFlake 算法系统首先肯定是知道自己所在的机房和机器的,比如机房 id = 17,机器 id = 12;
  • 接着 SnowFlake 算法系统接收到这个请求之后,首先就会用二进制位运算的方式生成一个 64 bit 的 long 型 id,64 个 bit 中的第一个 bit 是无意义的;
  • 接着 41 个 bit,就可以用当前时间戳(单位到毫秒),然后接着 5 个 bit 设置上这个机房 id,还有 5 个 bit 设置上机器 id;
  • 最后再判断一下,当前这台机房的这台机器上这一毫秒内,这是第几个请求,给这次生成 id 的请求累加一个序号,作为最后的 12 个 bit。

最终一个 64 个 bit 的 id 就出来了,类似于:

这个算法可以保证说,一个机房的一台机器上,在同一毫秒内,生成了一个唯一的 id。可能一个毫秒内会生成多个 id,但是有最后 12 个 bit 的序号来区分开来。

  • END -

作者:架构精进之路,十年研发风雨路,大厂架构师,CSDN 博客专家,专注架构技术沉淀学习及分享,职业与认知升级,坚持分享接地气儿的干货文章,期待与你一起成长。

关注并私信我回复“01”,送你一份程序员成长进阶大礼包,欢迎勾搭。

Thanks for reading!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Mybatis-源码解析】12 启动过程中,Mapper

发表于 2021-11-19

1.2 启动过程中,Mapper.xml的解析

整体流程

简易流程图

整体流程解析引用类缓存解析当前类缓存扫描器进行扫描解析入参映射关系解析出参映射关系解析SQL代码块解析SQL执行语句解析SQL节点结束

文本描述

  • 启动时会初始化XMLMapperBuilder对象
    • 内部构建XPathParser
  • 进行解析
    • 解析引用类缓存(cache-ref节点) cacheRefElement
    • 解析当前类缓存(cache节点) cacheElement
      • Configuration在构建过程中会在typeAliasRegistry的TYPE_ALIASES赋值
    • 解析入参映射关系(/mapper/parameterMap 节点)parameterMapElement
    • 解析出参映射关系(/mapper/resultMap 节点)resultMapElements
      • 处理构造函数节点
      • 构建ResultMapping属性
      • 解析discriminator(鉴频器)
    • 解析SQL代码块(/mapper/sql 节点) sqlElement
      • 解析SQL执行语句() buildStatementFromContext 解析
    • 解析SQL节点(select|insert|update|delete节点
      • 解析SelectKeyNodes processSelectKeyNodes

父类接口 BaseBuilder

其他类解析:XPathParser

XML中的使用 XPathParser 进行解析

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码  // 启动时通过此方法构建
public XMLMapperBuilder(InputStream inputStream, Configuration configuration, String resource, Map<String, XNode> sqlFragments) {
this(new XPathParser(inputStream, true, configuration.getVariables(), new XMLMapperEntityResolver()),
configuration,
resource,
sqlFragments);
}

// 核心初始化方法 设置父类的configuration、根据配置和资源构建builder助理、设置parser(解析器)、设置sql片段、设置资源
private XMLMapperBuilder(XPathParser parser, Configuration configuration, String resource, Map<String, XNode> sqlFragments) {
super(configuration);
this.builderAssistant = new MapperBuilderAssistant(configuration, resource);
this.parser = parser;
this.sqlFragments = sqlFragments;
this.resource = resource;
}

核心方法 parse(解析)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
scss复制代码  public void parse() {
// 判断资源是否被加载(判断configuration.loadedResources是否有该资源),如果没有加载,进行加载
if (!configuration.isResourceLoaded(resource)) {
// 解析 /mapper 元素 通过xpath解析节点
configurationElement(parser.evalNode("/mapper"));

// 将该资源记载到 configuration.loadedResources中,标记为已加载
configuration.addLoadedResource(resource);

/**
* 根据 Namespace 构建Mapper
* 1. 获取到解析出来的 Namespace 如果 Namespace 不为空 加载该接口
* 2. 判断 configuration.knownMappers 是否有该接口类
* 3. 如果没有在 configuration.loadedResources 中添加 "namespace:" + namespace 类,并将 configuration.knownMappers 添加 该接口类
*/
bindMapperForNamespace();
}

// 获取IncompleteResultMap中在解析过程中失败的ResultMap,再重新进行加载
parsePendingResultMaps();

// 获取IncompleteCacheRef中在解析过程中失败的CacheRef,再重新进行加载
parsePendingCacheRefs();

// 获取IncompleteStatement中在解析过程中失败的Statement,再重新进行加载
parsePendingStatements();
}

解析元素 configurationElement

如果Config中没有解析过该文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
csharp复制代码  private void configurationElement(XNode context) {
try {
// 在节点中获取namespace属性 获取到映射的mapper接口类
String namespace = context.getStringAttribute("namespace");
if (namespace == null || namespace.equals("")) {
throw new BuilderException("Mapper's namespace cannot be empty");
}

// 构建器中获取到映射属性
builderAssistant.setCurrentNamespace(namespace);

// 在mapper节点中 获取到cache-ref节点 用于配置引用类缓存
cacheRefElement(context.evalNode("cache-ref"));

// 在mapper节点中 获取cache节点 用于配置当前类缓存
cacheElement(context.evalNode("cache"));

// 在mapper节点中 获取 /mapper/parameterMap 节点 设置参数映射
parameterMapElement(context.evalNodes("/mapper/parameterMap"));

// 在mapper节点中 获取 /mapper/resultMap 节点 设置返回结果
resultMapElements(context.evalNodes("/mapper/resultMap"));

// 在mapper节点中 获取 /mapper/sql 节点
sqlElement(context.evalNodes("/mapper/sql"));

// 解析sql语句
buildStatementFromContext(context.evalNodes("select|insert|update|delete"));
} catch (Exception e) {
throw new BuilderException("Error parsing Mapper XML. The XML location is '" + resource + "'. Cause: " + e, e);
}
}

解析引用类缓存(cache-ref节点) cacheRefElement

在当前mapper执行过程中,如果配置了CacheRef,那么CacheRef中配置的接口类,将在当前类使用过程中使用缓存,首先请确保CacheRef中配置的接口类已经使用了缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码  private void cacheRefElement(XNode context) {
// 如果节点不为空
if (context != null) {
// 配置中添加缓存信息(添加到cacheRefMap中,KEY为当前的Mapper接口全限定名,value为需要缓存的权限限定名)
configuration.addCacheRef(builderAssistant.getCurrentNamespace(), context.getStringAttribute("namespace"));

// 构建CacheRefResolver
CacheRefResolver cacheRefResolver = new CacheRefResolver(builderAssistant, context.getStringAttribute("namespace"));
try {
// 根据缓存的namespace获取缓存对象,并设置当前缓存currentCache
cacheRefResolver.resolveCacheRef();
} catch (IncompleteElementException e) {
// 如果解析失败,存储到IncompleteCacheRef中,在处理完后加载
configuration.addIncompleteCacheRef(cacheRefResolver);
}
}
}

解析当前类缓存(cache节点) cacheElement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
ini复制代码  private void cacheElement(XNode context) throws Exception {
// 如果节点不为空
if (context != null) {
// 在节点中获取type属性,如果type属性为空,那么设置的默认值为:PERPETUAL
String type = context.getStringAttribute("type", "PERPETUAL");

// 根据缓存类型注册缓存获取缓存类
/**
* 在typeAliasRegistry.TYPE_ALIASES中根据类型获取
* 如果获取不到,通过Resources.classForName加载传入的类名
*/
Class<? extends Cache> typeClass = typeAliasRegistry.resolveAlias(type);

// 在节点中获取eviction属性,如果eviction属性为空,name设置默认值为:LRU
String eviction = context.getStringAttribute("eviction", "LRU");

// 根据eviction类型加载eviction类
Class<? extends Cache> evictionClass = typeAliasRegistry.resolveAlias(eviction);

// 在节点中获取flushInterval属性
Long flushInterval = context.getLongAttribute("flushInterval");
// 在节点中获取size属性
Integer size = context.getIntAttribute("size");
// 在节点中获取是否只读标识,默认为false
boolean readWrite = !context.getBooleanAttribute("readOnly", false);
// 在节点中获取是否阻塞标识,默认为false
boolean blocking = context.getBooleanAttribute("blocking", false);
// 在节点中获取子节点 构建成props对象
Properties props = context.getChildrenAsProperties();
// 生成代理器,使用缓存
/**
* 构建缓存对象:设置缓存类、缓存的回收策略、缓存刷新间隔、缓存大小、读写状态、阻塞状态、扩展属性
* 并在configuration中设置缓存对象
* 设置当前类的缓存对象
*/
builderAssistant.useNewCache(typeClass, evictionClass, flushInterval, size, readWrite, blocking, props);
}
}
Configuration在构建过程中会在typeAliasRegistry的TYPE_ALIASES赋值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
arduino复制代码    typeAliasRegistry.registerAlias("JDBC", JdbcTransactionFactory.class);
typeAliasRegistry.registerAlias("MANAGED", ManagedTransactionFactory.class);

typeAliasRegistry.registerAlias("JNDI", JndiDataSourceFactory.class);
typeAliasRegistry.registerAlias("POOLED", PooledDataSourceFactory.class);
typeAliasRegistry.registerAlias("UNPOOLED", UnpooledDataSourceFactory.class);

typeAliasRegistry.registerAlias("PERPETUAL", PerpetualCache.class);
typeAliasRegistry.registerAlias("FIFO", FifoCache.class);
typeAliasRegistry.registerAlias("LRU", LruCache.class);
typeAliasRegistry.registerAlias("SOFT", SoftCache.class);
typeAliasRegistry.registerAlias("WEAK", WeakCache.class);

typeAliasRegistry.registerAlias("DB_VENDOR", VendorDatabaseIdProvider.class);

typeAliasRegistry.registerAlias("XML", XMLLanguageDriver.class);
typeAliasRegistry.registerAlias("RAW", RawLanguageDriver.class);

typeAliasRegistry.registerAlias("SLF4J", Slf4jImpl.class);
typeAliasRegistry.registerAlias("COMMONS_LOGGING", JakartaCommonsLoggingImpl.class);
typeAliasRegistry.registerAlias("LOG4J", Log4jImpl.class);
typeAliasRegistry.registerAlias("LOG4J2", Log4j2Impl.class);
typeAliasRegistry.registerAlias("JDK_LOGGING", Jdk14LoggingImpl.class);
typeAliasRegistry.registerAlias("STDOUT_LOGGING", StdOutImpl.class);
typeAliasRegistry.registerAlias("NO_LOGGING", NoLoggingImpl.class);

typeAliasRegistry.registerAlias("CGLIB", CglibProxyFactory.class);
typeAliasRegistry.registerAlias("JAVASSIST", JavassistProxyFactory.class);
缓存的回收策略
  • LRU:最近最少使用,移除最长时间不被使用的对象
  • FIFO:先进先出,按对象进入缓存的顺序来移除它们
  • SOFT:软引用,移除基于垃圾回收器状态和软引用规则的对象
  • WEAK:弱引用,更积极地移除基于垃圾收集器和弱引用规则的对象

解析入参映射关系(/mapper/parameterMap 节点)parameterMapElement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
ini复制代码private void parameterMapElement(List<XNode> list) throws Exception {
// 遍历parameterMap节点
for (XNode parameterMapNode : list) {
// 获取节点中的 ID 属性
String id = parameterMapNode.getStringAttribute("id");

// 获取节点中的 type 属性
String type = parameterMapNode.getStringAttribute("type");

// 加载 type类
Class<?> parameterClass = resolveClass(type);

// 在parameterMap节点中获取到 parameter节点
List<XNode> parameterNodes = parameterMapNode.evalNodes("parameter");

// 声明参数参数的映射关系
List<ParameterMapping> parameterMappings = new ArrayList<ParameterMapping>();

// 遍历parameter节点
for (XNode parameterNode : parameterNodes) {
// 在parameter节点中 获取 property、javaType、jdbcType、resultMap、mode、typeHandler、numericScale 属性
/**
* property 类中的属性名
* javaType java类型
* jdbcType 数据库类型
* resultMap 结果映射关系
* mode 标识参数类型:IN - 入参,OUT - 出参,INOUT - 入参和出参
* typeHandler 类型处理器
* numericScale 小数精度
*/
String property = parameterNode.getStringAttribute("property");
String javaType = parameterNode.getStringAttribute("javaType");
String jdbcType = parameterNode.getStringAttribute("jdbcType");
String resultMap = parameterNode.getStringAttribute("resultMap");
String mode = parameterNode.getStringAttribute("mode");
String typeHandler = parameterNode.getStringAttribute("typeHandler");
Integer numericScale = parameterNode.getIntAttribute("numericScale");

// 加载参数类型枚举
ParameterMode modeEnum = resolveParameterMode(mode);

// 加载java类型类
Class<?> javaTypeClass = resolveClass(javaType);
// 获取到jdbc类型枚举
JdbcType jdbcTypeEnum = resolveJdbcType(jdbcType);

// 加载 类型处理器
Class<? extends TypeHandler<?>> typeHandlerClass = (Class<? extends TypeHandler<?>>) resolveClass(typeHandler);

// 根据加载好的参数 构建参数映射
/**
* 1. 解析resultMap (拼接包名)
* 2. 解析javaTypeClass,如果javaType为空那么:判断JdbcType为CURSOR时加载java.sql.ResultSet类,如果返回结果为Map时加载object类,如果获取不到加载Object类型
* 3. 解析类型处理器
* 4. 构建ParameterMapping对象
*/
ParameterMapping parameterMapping = builderAssistant.buildParameterMapping(parameterClass, property, javaTypeClass, jdbcTypeEnum, resultMap, modeEnum, typeHandlerClass, numericScale);

// 将结果对象添加到parameterMappings中
parameterMappings.add(parameterMapping);
}

// 构建代理添加 configuration.parameterMaps对象中(KEY 为 id,value为构建好的ParameterMap对象)
builderAssistant.addParameterMap(id, parameterClass, parameterMappings);
}
}

解析出参映射关系(/mapper/resultMap 节点)resultMapElements

遍历所有的resultMap节点 调用resultMapElement方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
scss复制代码  private ResultMap resultMapElement(XNode resultMapNode, List<ResultMapping> additionalResultMappings) throws Exception {
// 在ThreadLocal中记录
ErrorContext.instance().activity("processing " + resultMapNode.getValueBasedIdentifier());

// 在 节点中获取 id属性
String id = resultMapNode.getStringAttribute("id", resultMapNode.getValueBasedIdentifier());

// 在 节点中获取 type类型,如果type为空获取ofType、如果ofType为空获取resultType、如果resultType为空获取javaType
String type = resultMapNode.getStringAttribute("type",
resultMapNode.getStringAttribute("ofType",
resultMapNode.getStringAttribute("resultType",
resultMapNode.getStringAttribute("javaType"))));

// 在 节点中获取 extend属性
String extend = resultMapNode.getStringAttribute("extends");
// 在 节点中获取 autoMapping属性
Boolean autoMapping = resultMapNode.getBooleanAttribute("autoMapping");

// 加载resultMap的类
Class<?> typeClass = resolveClass(type);

// 声明 discriminator(鉴频器)
Discriminator discriminator = null;
// 声明 resultMappings
List<ResultMapping> resultMappings = new ArrayList<ResultMapping>();
// 添加原有的resultMappings
resultMappings.addAll(additionalResultMappings);

// 在resultMap节点中获取子节点
List<XNode> resultChildren = resultMapNode.getChildren();

// 遍历所有的 resultMap节点
for (XNode resultChild : resultChildren) {
// 如果节点名为 构造函数 节点
if ("constructor".equals(resultChild.getName())) {
/**
* 1. 获取constructor节点的子节点
* 2. 添加flags中添加构造函数,如果节点名为idArg那么添加id
* 3. 构建ResultMapping 并将结果添加到resultMappings中
*/
processConstructorElement(resultChild, typeClass, resultMappings);

}
// 如果节点名为 discriminator(鉴频器)
else if ("discriminator".equals(resultChild.getName())) {
// 构建 discriminator(鉴频器)
discriminator = processDiscriminatorElement(resultChild, typeClass, resultMappings);
} else {
// 判断节点名字是否为id 如果为ID 在标志中添加id字段
List<ResultFlag> flags = new ArrayList<ResultFlag>();
if ("id".equals(resultChild.getName())) {
flags.add(ResultFlag.ID);
}
// 构建ResultMapping 并将结果添加到resultMappings中
resultMappings.add(buildResultMappingFromContext(resultChild, typeClass, flags));
}
}

// 根据上述属性构建 resultMapResolver
ResultMapResolver resultMapResolver = new ResultMapResolver(builderAssistant, id, typeClass, extend, discriminator, resultMappings, autoMapping);
try {
/**
* 1. 如果当前resultMap设置了集成属性,在configuration中获取继承的ResultMap,并取 继承的和本身的并集
* 2. 将组装好的ResultMap存储在configuration的resultMaps中 key 为 设置的ID value 为构建好的映射关系合集
*/
return resultMapResolver.resolve();
} catch (IncompleteElementException e) {
// 如果解析异常了,存放到IncompleteResultMap中,在处理结束后再次加载
configuration.addIncompleteResultMap(resultMapResolver);
throw e;
}
}
处理构造函数节点
1
2
3
4
5
6
7
8
9
10
11
csharp复制代码  private void processConstructorElement(XNode resultChild, Class<?> resultType, List<ResultMapping> resultMappings) throws Exception {
List<XNode> argChildren = resultChild.getChildren();
for (XNode argChild : argChildren) {
List<ResultFlag> flags = new ArrayList<ResultFlag>();
flags.add(ResultFlag.CONSTRUCTOR);
if ("idArg".equals(argChild.getName())) {
flags.add(ResultFlag.ID);
}
resultMappings.add(buildResultMappingFromContext(argChild, resultType, flags));
}
}
构建ResultMapping属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
ini复制代码  private ResultMapping buildResultMappingFromContext(XNode context, Class<?> resultType, List<ResultFlag> flags) throws Exception {
String property;
// 如果存在构造函数标志 property为节点中的name属性 否则 为节点的property属性
if (flags.contains(ResultFlag.CONSTRUCTOR)) {
property = context.getStringAttribute("name");
} else {
property = context.getStringAttribute("property");
}

// 在节点中获取 column 列名
String column = context.getStringAttribute("column");
// 在节点中获取 javaType java类型
String javaType = context.getStringAttribute("javaType");
// 在节点中获取 jdbcType 数据库类型
String jdbcType = context.getStringAttribute("jdbcType");

// 在节点中获取 select 用于执行对应的select语句
String nestedSelect = context.getStringAttribute("select");

// 在节点中获取 resultMap
String nestedResultMap = context.getStringAttribute("resultMap",
processNestedResultMappings(context, Collections.<ResultMapping> emptyList()));

// 在节点中获取 notNullColumn 不为空的列 (如果该列为空,不生成子对象 - 待认证
String notNullColumn = context.getStringAttribute("notNullColumn");

// 在节点中获取 columnPrefix 当有多个结果集时,用该字段区分
String columnPrefix = context.getStringAttribute("columnPrefix");

// 在节点中获取 typeHandler 类型处理器
String typeHandler = context.getStringAttribute("typeHandler");

// 在节点中获取 resultSet 当有多个结果集时 获取设置的结果集
String resultSet = context.getStringAttribute("resultSet");
// 在节点中获取 foreignColumn 当有多个结果集时 映射字段
String foreignColumn = context.getStringAttribute("foreignColumn");

// 在节点中获取 fetchType 默认值为 在配置中获取的是否启动懒加载,如果启动返回TRUE 否则返回 FALSE
boolean lazy = "lazy".equals(context.getStringAttribute("fetchType", configuration.isLazyLoadingEnabled() ? "lazy" : "eager"));

// 加载 java类
Class<?> javaTypeClass = resolveClass(javaType);

// 加载 类型处理器类
Class<? extends TypeHandler<?>> typeHandlerClass = (Class<? extends TypeHandler<?>>) resolveClass(typeHandler);

// 解析 jdbc类型
JdbcType jdbcTypeEnum = resolveJdbcType(jdbcType);

// 构建代理 根据配置 构建ResultMapping对象返回
return builderAssistant.buildResultMapping(resultType, property, column, javaTypeClass, jdbcTypeEnum, nestedSelect, nestedResultMap, notNullColumn, columnPrefix, typeHandlerClass, flags, resultSet, foreignColumn, lazy);
}
解析discriminator(鉴频器)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ini复制代码  private Discriminator processDiscriminatorElement(XNode context, Class<?> resultType, List<ResultMapping> resultMappings) throws Exception {
// 在节点中获取到 column 列名
String column = context.getStringAttribute("column");
// 在节点中获取到 javaType java类型
String javaType = context.getStringAttribute("javaType");
// 在节点中获取到 jdbcType 数据库类型
String jdbcType = context.getStringAttribute("jdbcType");
// 在节点中获取到 typeHandler 类型处理器
String typeHandler = context.getStringAttribute("typeHandler");

// 解析java类
Class<?> javaTypeClass = resolveClass(javaType);
// 解析类型处理器
Class<? extends TypeHandler<?>> typeHandlerClass = (Class<? extends TypeHandler<?>>) resolveClass(typeHandler);
// 解析数据库类型
JdbcType jdbcTypeEnum = resolveJdbcType(jdbcType);

// 声明discriminator(鉴频器)Map
Map<String, String> discriminatorMap = new HashMap<String, String>();
// 遍历所有的子节点
for (XNode caseChild : context.getChildren()) {
// 获取子节点的 value 属性
String value = caseChild.getStringAttribute("value");
// 获取子节点的 resultMap 属性
String resultMap = ceChild.getStringAttribute("resultMap", processNestedResultMappings(caseChild, resultMappings));
// 将value 和 resultMap 放置到discriminatorMap中
discriminatorMap.put(value, resultMap);
}
// 构建 discriminator(鉴频器)
return builderAssistant.buildDiscriminator(resultType, column, javaTypeClass, jdbcTypeEnum, typeHandlerClass, discriminatorMap);
}

解析SQL代码块(/mapper/sql 节点) sqlElement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码  private void sqlElement(List<XNode> list) throws Exception {
// 如果 数据库配置了 DatabaseId(数据库厂商 方言)使用方言解析sql 额否则直接解析
if (configuration.getDatabaseId() != null) {
sqlElement(list, configuration.getDatabaseId());
}
sqlElement(list, null);
}

private void sqlElement(List<XNode> list, String requiredDatabaseId) throws Exception {
// 遍历所有的sql节点
for (XNode context : list) {
// 在节点中获取 databaseId 数据库方言
String databaseId = context.getStringAttribute("databaseId");
// 在节点中获取 id 属性
String id = context.getStringAttribute("id");
// 组装当前的命名空间 将 id 和当前类锁定
id = builderAssistant.applyCurrentNamespace(id, false);
// 判断 databaseId (数据库方言) 是否匹配 如果匹配在sqlFragments添加记录 (同时找到带有 databaseId 和不带 databaseId 的相同语句,则后者会被舍弃)
if (databaseIdMatchesCurrent(id, databaseId, requiredDatabaseId)) {
sqlFragments.put(id, context);
}
}
}
解析SQL执行语句() buildStatementFromContext 解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
scss复制代码  private void buildStatementFromContext(List<XNode> list) {
// 如果 数据库配置了 DatabaseId(数据库厂商 方言)使用方言解析sql 额否则直接解析
if (configuration.getDatabaseId() != null) {
buildStatementFromContext(list, configuration.getDatabaseId());
}
buildStatementFromContext(list, null);
}

// 解析 SQL 语句
private void buildStatementFromContext(List<XNode> list, String requiredDatabaseId) {
// 遍历所有节点
for (XNode context : list) {
// 创建一个xml构建器
final XMLStatementBuilder statementParser = new XMLStatementBuilder(configuration, builderAssistant, context, requiredDatabaseId);
try {
// 使用构建器解析
statementParser.parseStatementNode();
} catch (IncompleteElementException e) {
// 如果解析失败,添加到IncompleteStatement中,等当前文件解析完后重新构建
configuration.addIncompleteStatement(statementParser);
}
}
}

解析SQL节点(select|insert|update|delete节点)XMLStatementBuilder.parseStatementNode

使用XMLStatementBuilder解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
ini复制代码  public void parseStatementNode() {
// sql节点中 获取 id
String id = context.getStringAttribute("id");
// sql节点中 获取 数据库方言
String databaseId = context.getStringAttribute("databaseId");

// 如果数据库方言不匹配直接返回
if (!databaseIdMatchesCurrent(id, databaseId, this.requiredDatabaseId)) {
return;
}

// sql节点中 获取 fetchSize
Integer fetchSize = context.getIntAttribute("fetchSize");
// sql节点中 获取 timeout 超时时间
Integer timeout = context.getIntAttribute("timeout");
// sql节点中 获取 parameterMap 参数映射
String parameterMap = context.getStringAttribute("parameterMap");
// sql节点中 获取 parameterType 参数类型
String parameterType = context.getStringAttribute("parameterType");

// 解析参数类型
Class<?> parameterTypeClass = resolveClass(parameterType);

// sql节点中 获取 resultMap 结果映射
String resultMap = context.getStringAttribute("resultMap");
// sql节点中 获取 resultType 结果类型
String resultType = context.getStringAttribute("resultType");

// sql节点中 获取 lang 为特定的语句指定语言(驱动类)?
String lang = context.getStringAttribute("lang");
// 解析语言驱动
LanguageDriver langDriver = getLanguageDriver(lang);

// 解析结果类型
Class<?> resultTypeClass = resolveClass(resultType);

// sql节点中 获取 resultSetType 指定的结果类型
String resultSetType = context.getStringAttribute("resultSetType");

// sql节点中 获取并解析 statementType 语句类型
StatementType statementType = StatementType.valueOf(context.getStringAttribute("statementType", StatementType.PREPARED.toString()));
// 解析 结果类型
ResultSetType resultSetTypeEnum = resolveResultSetType(resultSetType);

// 获取节点的名字
String nodeName = context.getNode().getNodeName();
// 获取节点的SQL语句类型
SqlCommandType sqlCommandType = SqlCommandType.valueOf(nodeName.toUpperCase(Locale.ENGLISH));

// 如果为查询语句,那么设置标记
boolean isSelect = sqlCommandType == SqlCommandType.SELECT;

// sql节点中 获取 flushCache 是否刷新缓存,默认不是查询都会开启
boolean flushCache = context.getBooleanAttribute("flushCache", !isSelect);

// sql节点中 获取 useCache 是否使用缓存,默认是查询都会开启
boolean useCache = context.getBooleanAttribute("useCache", isSelect);


// sql节点中 获取 resultOrdered 是否返回多个结果集,默认是不开启
boolean resultOrdered = context.getBooleanAttribute("resultOrdered", false);

// 解析 include 节点
XMLIncludeTransformer includeParser = new XMLIncludeTransformer(configuration, builderAssistant);
includeParser.applyIncludes(context.getNode());

// 解析 SelectKey 节点
processSelectKeyNodes(id, parameterTypeClass, langDriver);

// Parse the SQL (pre: <selectKey> and <include> were parsed and removed)

// langDriver(语言驱动)创建SQL原数据 (设置不同节点的节点处理器)
SqlSource sqlSource = langDriver.createSqlSource(configuration, context, parameterTypeClass);


// sql节点中 获取 resultSets 返回结果的集合名字
String resultSets = context.getStringAttribute("resultSets");
// sql节点中 获取 keyProperty 返回后设置的属性名
String keyProperty = context.getStringAttribute("keyProperty");
// sql节点中 获取 keyColumn 返回后设置的属性的列
String keyColumn = context.getStringAttribute("keyColumn");

// 声明Key生成器
KeyGenerator keyGenerator;
// 声明 keyStatementId ? 为当前SQL语句的ID + !selectKey
String keyStatementId = id + SelectKeyGenerator.SELECT_KEY_SUFFIX;
// 构建当前类的ID
keyStatementId = builderAssistant.applyCurrentNamespace(keyStatementId, true);

// 如果配置有ID生成器 通过ID生成器获取keyGenerator 否则 在节点中获取 useGeneratedKeys 使用的KEY生成器
if (configuration.hasKeyGenerator(keyStatementId)) {
keyGenerator = configuration.getKeyGenerator(keyStatementId);
} else {
keyGenerator = context.getBooleanAttribute("useGeneratedKeys",
configuration.isUseGeneratedKeys() && SqlCommandType.INSERT.equals(sqlCommandType))
? Jdbc3KeyGenerator.INSTANCE : NoKeyGenerator.INSTANCE;
}

// 构建MappedStatement 构建 MappedStatement 存放在 Configuration.ji sql查询的id
builderAssistant.addMappedStatement(id, sqlSource, statementType, sqlCommandType,
fetchSize, timeout, parameterMap, parameterTypeClass, resultMap, resultTypeClass,
resultSetTypeEnum, flushCache, useCache, resultOrdered,
keyGenerator, keyProperty, keyColumn, databaseId, langDriver, resultSets);
}
解析SelectKeyNodes processSelectKeyNodes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
ini复制代码  private void processSelectKeyNodes(String id, Class<?> parameterTypeClass, LanguageDriver langDriver) {
// sql节点中 获取 selectKey 节点
List<XNode> selectKeyNodes = context.evalNodes("selectKey");

// 如果 数据库配置了 DatabaseId(数据库厂商 方言)使用方言解析 额否则直接解析
if (configuration.getDatabaseId() != null) {
parseSelectKeyNodes(id, selectKeyNodes, parameterTypeClass, langDriver, configuration.getDatabaseId());
}
parseSelectKeyNodes(id, selectKeyNodes, parameterTypeClass, langDriver, null);


removeSelectKeyNodes(selectKeyNodes);
}

private void parseSelectKeyNodes(String parentId, List<XNode> list, Class<?> parameterTypeClass, LanguageDriver langDriver, String skRequiredDatabaseId) {
// 遍历获取到的节点
for (XNode nodeToHandle : list) {
// 设置id (由SQL语句的ID + !selectKey 组成)
String id = parentId + SelectKeyGenerator.SELECT_KEY_SUFFIX;

// 获取数据库驱动
String databaseId = nodeToHandle.getStringAttribute("databaseId");

// 如果驱动一致
if (databaseIdMatchesCurrent(id, databaseId, skRequiredDatabaseId)) {
parseSelectKeyNode(id, nodeToHandle, parameterTypeClass, langDriver, databaseId);
}
}
}

private void parseSelectKeyNode(String id, XNode nodeToHandle, Class<?> parameterTypeClass, LanguageDriver langDriver, String databaseId) {
// 在节点中 获取 resultType 结果类型
String resultType = nodeToHandle.getStringAttribute("resultType");
// 解析结果类型
Class<?> resultTypeClass = resolveClass(resultType);

// sql节点中 获取并解析 statementType 语句类型
StatementType statementType = StatementType.valueOf(nodeToHandle.getStringAttribute("statementType", StatementType.PREPARED.toString()));

// sql节点中 获取 keyProperty 语句结果应该被设置到的目标属性。如果生成列不止一个,可以用逗号分隔多个属性名称
String keyProperty = nodeToHandle.getStringAttribute("keyProperty");

// 在节点中 获取 keyColumn 返回结果集中生成列属性的列名。如果生成列不止一个,可以用逗号分隔多个属性名称
String keyColumn = nodeToHandle.getStringAttribute("keyColumn");

// 在节点中 获取 order 设置selectKey的执行属性 在执行语句之前,还是之后
boolean executeBefore = "BEFORE".equals(nodeToHandle.getStringAttribute("order", "AFTER"));

// 设置默认值 不适用缓存、没有多个结果集、没有Key生成器、返回大小为空、不设置超时时间、不刷新缓存、入参映射为空、出参映射为空、返回结果为空
boolean useCache = false;
boolean resultOrdered = false;
KeyGenerator keyGenerator = NoKeyGenerator.INSTANCE;
Integer fetchSize = null;
Integer timeout = null;
boolean flushCache = false;
String parameterMap = null;
String resultMap = null;
ResultSetType resultSetTypeEnum = null;

// langDriver(语言驱动)创建SQL原数据 (设置不同节点的节点处理器)
SqlSource sqlSource = langDriver.createSqlSource(configuration, nodeToHandle, parameterTypeClass);

// SQL执行类型为SELECT(查询)
SqlCommandType sqlCommandType = SqlCommandType.SELECT;

// 构建MappedStatement 构建 MappedStatement 存放在 Configuration.mappedStatements KEY为SelectKey的id
builderAssistant.addMappedStatement(id, sqlSource, statementType, sqlCommandType,
fetchSize, timeout, parameterMap, parameterTypeClass, resultMap, resultTypeClass,
resultSetTypeEnum, flushCache, useCache, resultOrdered,
keyGenerator, keyProperty, keyColumn, databaseId, langDriver, null);

id = builderAssistant.applyCurrentNamespace(id, false);

// 并将当前SQL存在 configuration.keyGenerators 中 key为SQL语句的ID
MappedStatement keyStatement = configuration.getMappedStatement(id, false);
configuration.addKeyGenerator(id, new SelectKeyGenerator(keyStatement, executeBefore));
}

文章链接

  • 【Myabtis】- 1. 整合spring boot后启动流程
  • 【Mybatis】- 1.1 启动过程中,Mapper接口的扫描与代理注册
  • 【Mybatis】- 1.2 启动过程中,Mapper.xml的解析
  • 【Mybatis】- 2. SQL语句的执行过程
  • 【Mybatis-附件1】- Mapper.xml 参数配置以及含义

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

位移主题:Kafka 的消费者组是怎么保存消费位移的?

发表于 2021-11-19

这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战

相关:Kafka 中的消费者组

消费者位移

前一篇介绍了 Kafka 的消费者组,在消费者组中,每一个消费者实例会被分配到若干个主题分区,负责消费这些分区的消息。在消费的过程中,需要记录下每个分区中的消息「目前消费到哪一个了」,也就是 Kafka 中的消费者位移。这些位移信息都保存在什么地方呢?

消费者位移的存储

在最初的版本中,这些信息都被存储在 ZooKeeper 中,当提交消费者位移的时候(无论是手动还是自动提交),这些信息会被提交到 ZooKeeper 中保存,如果消费者重新启动了,它会自动从 ZooKeeper 中读取已保存的位移信息,这样就知道了上次消费到了什么位置。

但是,消费者每次消费成功一个消息的时候,都会提交位移,而 ZooKeeper 并不适合这种高频的写操作。于是,在后来的版本中,Kafka 更新了位移的管理机制。Kafka 将位移信息作为一条一条普通的 Kafka 消息,保存在一个特定的 Topic 中,这个 Topic 的名字叫 __consumer_offsets,也可以叫做位移主题,这样高频读写就不是什么问题了。

要注意的是,虽然这个 Topic 相较于其他的 Topic 并没有什么特殊之处,但是强烈建议不要操作这个 Topic,老老实实使用 Kafka 的 API 提交位移。

这个主题的创建时机是第一个消费者程序启动的时候,Kafka 会自动创建这个主题,默认情况下,这个主题是一个 50 分区 3 副本的主题,你可以通过 Broker 端的参数 offsets.topic.num.partitions 和 offsets.topic.replication.factor 参数来修改这两个默认值。或者,也可以在第一个消费者程序启动之前,使用 Kafka 提供的 API 来手动创建这个主题,并指定分区数和副本数。

位移主题的内容

在位移主题中,消息的格式是 Kafka 定义的,因此,这也是强烈建议不要手动操作这个主题或者向其中发送信息的原因,如果你发送了不符合 Kafka 规定格式的信息,那么 Kafka 就无法解析它,造成的后果就是 Broker 崩溃。

这位移主题中的消息,可以简单地理解为「某个消费者组在某个主题的某个分区的消费位移是多少」,也可以理解为一个键值对,其中的 Key 是 Group ID + Topic + Partition,Value 就是位移,其中的细节这里不多做介绍。

Compact 压实策略

你可能会考虑到一个问题,一个消费者组在一个主题一个分区的位移数据,会随着消费不断地发生变化,这时就会有保存位移的消息不断地被发送到位移主题中,但其实,同一个消费者组在同一个主题同一个分区的位移,只保留最新的一条就可以了,也就是说,只保留最新的结果就可以了,没必要保存它不断变化的过程,这样可以节省大量的存储空间。

Kafka 就是这么做的,它通过 Compact 策略,对于同一个 Key 的消息,只保留最新的一条。(如果你了解 Redis,这与 Redis 的 AOF 重写机制很相似)。

image.png

这个工作是由一个专门的后台线程 Log Cleaner 来完成的,它会定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring 自动装配【12】processImport中

发表于 2021-11-19

这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战」

前言

上一篇文章中我们通过idea引用查找的功能,找到分析了:

  • @Import注解是在哪里被处理的

这里我们接着往下,来看看@Import最终具体执行的三种情况:

  • DeferImportSelector
  • ImportBeanDefinitionRegistrar
  • 可以按照**@Configuration**处理的

这些情况下分别是如何处理的。

DeferImportSelector

上一篇文章里我们知道了:

  • DeferImportSelector在processImport方法中的处理方式,暂时认为只是暂时保存到deferredImportSelectors这个list中保存起来了。

那么,这些保存起来的信息具体是在哪里使用的?

处理

这个类中有一个process方法,那么就应该是处理的方法了,我们来看看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码public void process() {
List<DeferredImportSelectorHolder> deferredImports = this.deferredImportSelectors;
this.deferredImportSelectors = null;
try {
if (deferredImports != null) {
DeferredImportSelectorGroupingHandler handler = new DeferredImportSelectorGroupingHandler();
deferredImports.sort(DEFERRED_IMPORT_COMPARATOR);
//这里的register其实只是把数据往里面塞的动作
deferredImports.forEach(handler::register);
//看下面
handler.processGroupImports();
}
}
finally {
this.deferredImportSelectors = new ArrayList<>();
}
}

public void processGroupImports() {
for (DeferredImportSelectorGrouping grouping : this.groupings.values()) {
Predicate<String> exclusionFilter = grouping.getCandidateFilter();
grouping.getImports().forEach(entry -> {
ConfigurationClass configurationClass = this.configurationClasses.get(entry.getMetadata());
try {
//是不是看到了我们的老朋友?
processImports(configurationClass, asSourceClass(configurationClass, exclusionFilter),
Collections.singleton(asSourceClass(entry.getImportClassName(), exclusionFilter)),
exclusionFilter, false);
}
catch (BeanDefinitionStoreException ex) {
throw ex;
}
catch (Throwable ex) {
throw new BeanDefinitionStoreException(
"Failed to process import candidates for configuration class [" +
configurationClass.getMetadata().getClassName() + "]", ex);
}
});
}
}

这里就看到,又调到processImports里了。

之前的handler的方法我们没有细究,这里再贴一下代码,这样子运转机制就清晰了:

1
2
3
4
5
6
7
8
9
10
11
12
13
> java复制代码public void handle(ConfigurationClass configClass, DeferredImportSelector importSelector) {
> DeferredImportSelectorHolder holder = new DeferredImportSelectorHolder(configClass, importSelector);
> if (this.deferredImportSelectors == null) {
> DeferredImportSelectorGroupingHandler handler = new DeferredImportSelectorGroupingHandler();
> handler.register(holder);
> handler.processGroupImports();
> }
> else {
> this.deferredImportSelectors.add(holder);
> }
> }
>
>

其实这里的processGroupImport,在处理import注解的Defer的时候已经调用了(processImport),那么此处综合起来看就是

  • 处理的时候,每当有一个新的Defer进来,先处理之前的存的,再把这个defer的存起来。
  • 最后调到这个process的时候,就会把所有的group执行一下,并把这些收集的group回收掉。

这个关系如下:

processImport新的defer来了如果容器为空注册holder并执行如果容器不为空存起来process将容器置为null调用processGroupImport注册所有holder并执行新建容器
而这个容器(deferredImportSelectors),只在process方法的时候会置空,处理完之后就变成了新的容器(new)。

这说明:

  • 这里的defer延迟,意即:
    • 如果process方法不在执行中时,这些importSelector会暂时保存起来。
    • 如果process方法在执行并且已经开始了(标志是:容器为空),那么此时不能偷懒往后延迟了,只能自己来执行。

在DeferredImportSelectorHandler的handle方法上的注解很明确地说了这一点:

1
2
3
4
5
6
> java复制代码Handle the specified {@link DeferredImportSelector}. If deferred import
> selectors are being collected, this registers this instance to the list. If
> they are being processed, the {@link DeferredImportSelector} is also processed
> immediately according to its {@link DeferredImportSelector.Group}.
>
>

调用

代码中我们可以发现:

  • 只有通过handler,才能调用和保存这些DeferredImportSelector。

而这个类只在ConfigurationClassParser.parse和processImports中被调用:

  • parse调用process方法
  • processimport调用handle方法

小结

这里也能看出来,DeferredImportSelector也并没有实际上将import落地处理掉,处理的时候也是得委托到下面两个来。

ImportBeanDefinitionRegistrar

在processImport中,这部分的处理其实也比较简单:

1
2
3
4
5
java复制代码Class<?> candidateClass = candidate.loadClass();
ImportBeanDefinitionRegistrar registrar =
ParserStrategyUtils.instantiateClass(candidateClass, ImportBeanDefinitionRegistrar.class,
this.environment, this.resourceLoader, this.registry);
configClass.addImportBeanDefinitionRegistrar(registrar, currentSourceClass.getMetadata());

实际上,就是实例化该类,并放到configClass中。那么这部分最终在哪里处理呢?

借助idea可以看到在这里被用到了(类:ConfigurationClassBeanDefinitionReader):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码//这个方法在loadBeanDefinitions被循环调用	
private void loadBeanDefinitionsForConfigurationClass(
ConfigurationClass configClass, TrackedConditionEvaluator trackedConditionEvaluator) {
//........
loadBeanDefinitionsFromRegistrars(configClass.getImportBeanDefinitionRegistrars());
}

//一般来说,这里的registry实际上是在Spring启动中的容器(各种context)
private void loadBeanDefinitionsFromRegistrars(Map<ImportBeanDefinitionRegistrar, AnnotationMetadata> registrars) {
registrars.forEach((registrar, metadata) ->
registrar.registerBeanDefinitions(metadata, this.registry, this.importBeanNameGenerator));
}

//而上面的这里:
default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry,
BeanNameGenerator importBeanNameGenerator) {

registerBeanDefinitions(importingClassMetadata, registry);
}

default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
//子类实现
}

看到registerBeanDefinitions,这里就比较清楚了:

  • 解析到的ImportBeanDefinitionRegistrar,最终会被丢到context中来初始化成beanDefinition。

这个接口的注解也说明了这一点:

1
2
java复制代码Register bean definitions as necessary based on the given annotation metadata of
the importing {@code @Configuration} class.

如果要真的看明白这个方法是如何实现的,那么就需要trace到上游调用和子类实现了,因此我们放到后面再说。【ImportBeanDefinitionRegistrar】

@Configuration方式

这里的处理在这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码processConfigurationClass(candidate.asConfigClass(configClass), exclusionFilter);

protected void processConfigurationClass(ConfigurationClass configClass, Predicate<String> filter) throws IOException {
if (this.conditionEvaluator.shouldSkip(configClass.getMetadata(), ConfigurationPhase.PARSE_CONFIGURATION)) {
return;
}

ConfigurationClass existingClass = this.configurationClasses.get(configClass);
//如果这里已经保存了
if (existingClass != null) {
//并且想放进去的configClass,importedBy容器中不是空的
if (configClass.isImported()) {
//原来已有的也是通过@Import注册的(importedBy容器中不是空的)
if (existingClass.isImported()) {
//我们把原来的合并一下
existingClass.mergeImportedBy(configClass);
}
// Otherwise ignore new imported config class; existing non-imported class overrides it.
return;
}
else {
//这里的else对应的是上面的configClass.isImported(),意思就是:
//我们期望放进来的,不是通过import注解的
//看下面的注释,意思就是:把原来的记录给删掉,已现在的为准
//但是我们是通过@Import注解进来的,这个是咋会不是import的呢?
// Explicit bean definition found, probably replacing an import.
// Let's remove the old one and go with the new one.
this.configurationClasses.remove(configClass);
this.knownSuperclasses.values().removeIf(configClass::equals);
}
}

//递归地构建这个sourceClass
// Recursively process the configuration class and its superclass hierarchy.
SourceClass sourceClass = asSourceClass(configClass, filter);
do {
//这里是processImport调用的一个地方,在前面我们提到过,到这里就变成递归调用了
sourceClass = doProcessConfigurationClass(configClass, sourceClass, filter);
}
while (sourceClass != null);

this.configurationClasses.put(configClass, configClass);
}

这里也能看到:

  • 如果是@Configuration,其实最终还是得变成ImportBeanDefinitionRegistrar,否则就继续递归了。

小结

看完这部分,应该对processImport中的四种方式有了一个大概了解:

  • 如果是要最终解析的,那么八成是得通过ImportBeanDefinitionRegistrar来导入了,否则就是一直递归到最下面,变成ImportBeanDefinitionRegistrar。
  • 这里埋得坑:
+ 类:【**ImportBeanDefinitionRegistrar**】
    - 何处处理的?
    - 子类如何实现的?
    - configurationClass类中,**loadBeanDefinitionsForConfigurationClass**在哪里,何时被调用?
+ 方法:**processConfigurationClass**
    - 这个方法中为什么会有**ConfigurationClass**不是import的情况出现呢,也就是说:
        * ConfigurationClass中的importedBy是如何维护的?
+ 还有之前没有处理的问题:
    - **doProcessConfigurationClass**干了啥?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何并发执行Linux命令

发表于 2021-11-19

这是我参与11月更文挑战的第8天

串行是按照顺序一条接着一条的执行命令,与串行相对的是多条命令同时执行,称做并行

用途

在bash脚本中,并发执行命令的用途很多,比如:批量上传和下载,批量启动和关闭程序,批量打包日志,批量检查远程机器是否可达等等

并行执行能充分利用系统资源,极大的提高效率,节省大量的时间

例如:现需要下载20个文件,假设每个文件下载时间需要1分钟

串行下载是一个文件下载完之后再下载另一个文件,这样20个文件就需要20分钟才能下载完

并行下载是启动20个进程,20个进程同时下载,每个进程负责下载一个文件,这样20个文件理论上只需要1分钟左右就能全部下载完

串行和并行

在命令后面加上 & 符号就表示这条命令会在子进程中执行,下面是比较串行和并行的两个实例

c.sh脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
bash复制代码#!/bin/bash
func_a()
{
echo "func_a..$1.."
sleep 1
}

for n in $(seq 1 5);
do
func_a $n
done

echo "c.sh..finish.."

执行 time ./c.sh,结果如下

1
2
3
4
5
6
7
8
9
10
11
erlang复制代码[root@ecs-centos-7 mult]# time ./c.sh 
func_a..1..
func_a..2..
func_a..3..
func_a..4..
func_a..5..
c.sh..finish..

real 0m5.008s
user 0m0.005s
sys 0m0.003s

上面的例子,连续调用5次 func_a函数,每次调用都会睡眠1秒,,只有等上一次调用结束了才能进行下一次调用,所以5次调用总共花费了5秒钟

修改 c.sh脚本,内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
bash复制代码#!/bin/bash
func_a()
{
echo "func_a..$1.."
sleep 1
}

for n in $(seq 1 5);
do
func_a $n &
done
wait
echo "c.sh..finish.."

再次执行 time ./c.sh,结果如下

1
2
3
4
5
6
7
8
9
10
11
erlang复制代码[root@ecs-centos-7 mult]# time ./c.sh 
func_a..1..
func_a..3..
func_a..2..
func_a..4..
func_a..5..
c.sh..finish..

real 0m1.006s
user 0m0.004s
sys 0m0.005s

修改之后的脚本在循环体内 func_a $n 语句后面加了 & 符号,表示后在子进程中执行 func_a $n命令

循环结束后面有一个 wait 语句,表示等待前面循环体里的子进程全部结束才会执行后面的命令

由于子进程不会阻塞当前进程,所以当前进程可以继续执行下一次调用,所以5次调用总共只花费了1秒左右

在执行 time ./c.sh命令期间,打开另一个终端输入ps aux | grep c.sh 回车,输出如下

1
2
3
4
5
6
7
8
bash复制代码[root@ecs-centos-7 ~]# ps aux | grep c.sh
root 29086 0.0 0.0 113188 1400 pts/1 S+ 22:55 0:00 /bin/bash ./c.sh
root 29088 0.0 0.0 113188 628 pts/1 S+ 22:55 0:00 /bin/bash ./c.sh
root 29089 0.0 0.0 113188 628 pts/1 S+ 22:55 0:00 /bin/bash ./c.sh
root 29090 0.0 0.0 113188 628 pts/1 S+ 22:55 0:00 /bin/bash ./c.sh
root 29091 0.0 0.0 113188 628 pts/1 S+ 22:55 0:00 /bin/bash ./c.sh
root 29093 0.0 0.0 113188 628 pts/1 S+ 22:55 0:00 /bin/bash ./c.sh
root 29099 0.0 0.0 112728 972 pts/2 S+ 22:55 0:00 grep --color=auto c.sh

从上面的输出可以看到,同时有6个c.sh脚本进程,其中5个是5次循环产生的子进程,剩下1个是执行c.sh脚本本身的进程

检查主机是否可达

当有许多远程主机的需要管理的时候,经常会对这些主机做一些检查,而批量检查主机是否可达是常见的操作,下面简单介绍下如何利用并发执行快速检查主机是否可达

假如有 host.txt 文件,里面存储的是主机的IP地址,具体内容如下

1
2
3
4
5
6
7
8
9
10
csharp复制代码[root@ecs-centos-7 mult]# cat host.txt
192.168.0.1
192.168.0.2
192.168.0.3
192.168.0.4
192.168.0.5
192.168.0.6
192.168.0.7
192.168.0.8
192.168.0.9

c.sh脚本,内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bash复制代码#!/bin/bash

ping_test()
{
ping $1 -c 2 -W 3 &>/dev/null
if [ $? -eq 0 ]; then
echo "$1 reachable..."
else
echo "$1 unreachable..."
fi
}

for n in $(cat host.txt);
do
ping_test $n &
done

wait

echo "c.sh..finish.."

执行 time ./c.sh,结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
erlang复制代码[root@ecs-centos-7 mult]# time ./c.sh 
192.168.0.1 reachable...
192.168.0.9 reachable...
192.168.0.4 unreachable...
192.168.0.3 unreachable...
192.168.0.6 unreachable...
192.168.0.2 unreachable...
192.168.0.8 unreachable...
192.168.0.7 unreachable...
192.168.0.5 unreachable...
c.sh..finish..

real 0m3.010s
user 0m0.014s
sys 0m0.011s

脚本中 ping $1 -c 2 -w 3 语句是ping远程主机的命令, -c 2 表示发送数据包的次数 ,-W 3 是把超时时间设置为 3 秒

从检测结果可以看出,192.168.0.1 和 192.168.0.9主机是可达的,其他主机都是不可达,总共花费了3秒左右

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…277278279…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%