开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

从零蛋开始集成Spring Security 5 (三)

发表于 2021-11-17

「这是我参与11月更文挑战的第3天,活动详情查看:2021最后一次更文挑战」。

承接上文…

3.4 实现UserDetailsService

实现UserDetails, 创建了自己的用户类后, 还需要实现UserDetailsService, 将用户信息从数据库中查询出来

和UserDetails相同, 先来看一下官方文档:

3-4-1 UserDtails官方文档.png

只有一个根据用户名查询用户的方法, 实现起来并不困难

笔者使用的是MyBatis, 先创建查询仓库接口UserRepository:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public interface UserRepository {
​
 /**
  * 根据用户名查询用户信息
  *
  * @param username 用户名
  * @return 用户实体
  */
 User loadUserByUsername(@Param("username") String username);
​
}

具体的查询语句需要根据实际使用的数据库编写

需要注意的是, user结果中的角色属性roles应包含其对应的权限属性permissions, permissions中又应包含接口属性apis

创建用户服务类UserService:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@Component
@Data
@AllArgsConstructor
public class UserService implements UserDetailsService {
​
 private final UserRepository repository;
​
 @Override
 public UserDetail loadUserByUsername(String username) throws UsernameNotFoundException {
   return repository.loadUserByUsername(username);
}
​
}

3.5 Spring Security 5 配置

只剩下最后一步, 把刚才手动实现的内容进行配置, 告诉Spring Security就好啦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
​
 private final UserDetailRepository userDetailRepository;
​
 @Override
 protected void configure(HttpSecurity http) throws Exception {
   http.csrf().disable()
    .authorizeRequests().anyRequest().authenticated()
    .and()
    .formLogin();
}
​
 @Override
 @Bean
 @ConditionalOnMissingBean
 public UserDetailsService userDetailsServiceBean() {
   return new UserDetailsServiceImpl(userDetailRepository);
}
​
}

authenticated开启接口权限校验, formLogin开启默认表单登录

这里有个小坑, 我们需要手动配置一下PasswordEncoder:

1
2
3
4
5
6
7
8
9
10
java复制代码public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
​
 // 其它配置...
​
 @Bean
 public PasswordEncoder passwordEncoder() {
   return PasswordEncoderFactories.createDelegatingPasswordEncoder();
}
​
}

至此为止, 所有的Spring Security集成已经完成, 增加个测试用例ExampleApi:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@RestController
@RequestMapping(value = "/api/v1/example")
public interface ExampleApi {
​
 /**
  * 分页查询
  *
  * @param parameters 查询条件
  * @param pageable   分页
  * @return 分页结果
  */
 @GetMapping(value = "")
 @PreAuthorize("hasAuthority('api:example:search')")
 RestDataResult<Page<Example>> search(SearchParameter parameters, Pageable pageable);
   
}

利用@PreAuthorize注解声明接口权限校验规则, 例如上述代码中, 访问/api/v1/example需要用户拥有api:example:search权限

4 问题

本文所完成的登录认证存在如下两个问题:

  1. 使用Spring Security默认登录页面, 丑
  2. 有状态登录, 关闭浏览器或重启应用后, 登录状态将失效, 需要重新登录

笔者将会在下篇文章中继续集成Spring Security, 引入Jwt, 实现无状态模式登录

5 拓展阅读

  • Spring Security 自定义登录界面

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

分布式事务(3) TCC 补偿式事务

发表于 2021-11-17

这是我参与11月更文挑战的第13天,活动详情查看:2021最后一次更文挑战

一、什么是 TCC 事务

TCC 模式可以解决 2PC 中的资源锁定和阻塞问题,减少资源锁定时间。

TCC 是 try 、confirm、cancel 三个词语的缩写,TCC 要求每个分支事务实现三个操作:预处理Try 、确认Confirm、撤销 Cancel 。Try 操作坐业务检查及资源预留,Confirm 做业务确认操作,Cancel 实现一个 与 Try 相反的操作即回滚操作。TM 首先发起所有的分支事务的 try 操作,任何一个分支事务的 try 操作执行失败,TM 将会发起所有分支事务的cancel 操作,若try操作全部成功,TM 将会发起所有分支事务的Confirm 操作,其中 confirm/cancel 操作若执行失败,TM 会进行重试。

TCC 分为三个阶段:

  • Try 阶段做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的 confirm 一起才能真正构成一个完整的业务逻辑。
  • Confirm 阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行 Confirm。通常情况下,采用TCC 则认为Confirm 阶段是不会出错的。即:只要 Try成功,Confirm一定成功,若Confirm 阶段真的出错了,就需要引入重试机制或人工处理。
  • Cancel 阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用TCC 则认为Cancel 阶段也是一定成功的。若Cancel 阶段真的出错了,需要引入重试机制会人工处理。
  • TM事务管理器:TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM 的角色,TM 独立出来是为了称为公用组件,是为了考虑系统结构和软件复用。

TM在发起全局事务时生成全局事务记录,全局事务 ID 贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态,由于Confirm 和 cancel 失败需进行重试,因此需要实现为幂等,幂等性是指同一个操作无论请求多少次,其结果都相同。

img

执行流程:

  • 1、启动事务
  • 2、调用各个服务中的 try 接口,分布尝试执行事务
  • 3、判断 try 的结果,如果都成功了就提交事务,否则就要回滚事务
  • 4、提交事务就调用Confirm 接口,回滚事务就调用Cancel 接口
  • 5、系统判断要不要回滚是通过 ”是否抛出异常“ 来判断的,而不是根据返回值。

二、TCC 解决方案

2.1 概述

目前主流的 TCC 方案如下:

  • Seata :阿里云推出的组件,支持较多方案,主推AT(二阶段+分布式锁)
  • tcc-transaction:不和底层rpc耦合,使用dubbo,http,thrift,webservice都可
  • tx-lcn:支持常用的dubbo,springcloud框架,维护不频繁,热度有所下降
  • hmily:国内工程师开发,异步高性能TCC框架,适应国内环境
  • ByteTcc:国内开发,兼容JTA规范的TCC框架
  • EasyTransaction:柔性事务、TCC、SAGA、可靠消息等功能齐全,一站式解决

相对来说,Hmily 它更加轻量级,无需部署独立的 TCC 协调器,也适合Dubbo 和 SpringCloud 环境,它具备以下优点:

  • 无缝集成Spring,Spring boot start。
  • 无缝集成Dubbo,SpringCloud,Motan等rpc框架。
  • 多种事务日志的存储方式(redis,mongdb,mysql等)。
  • 多种不同日志序列化方式(Kryo,protostuff,hession)。
  • 事务自动恢复。
  • 支持内嵌事务的依赖传递。
  • 代码零侵入,配置简单灵活。

2.2 TCC中的特别处理

空回滚

在没有调用TCC中Try方法的情况下,调用了第二阶段的Cancel方法,Cancel方法需要直接识别出来这是一次空回滚,直接返回成功结果。

出现原因:分支事务所在系统服务宕机或者网络波段,分支事务的调用记录是失败的,该情况下其实分支事务没有进行Try操作,当故障恢复后,分布式事务进行了回滚则会调用二阶段的Cancel方法,既而形成了空回滚。

解决方法:已知全局事务id会贯穿整个全局分布式事务的调用链,额外增加一张分支事务记录表,其中有全局事务id和分支事务id,每一次成功的Try执行后插入一条分支事务执行记录,第二阶段Cancel执行时读取该表记录,如果该分支事务对应的执行记录存在,就回滚,如果不存在就认为是空回滚,直接返回成功。

幂等性 为了保证第二阶段中出现的失败情况,Hmily会有重试机制,此时就会出现幂等性问题。如果Cancel执行过程中没有保证好幂等性问题,会导致数据污染。

悬挂 悬挂就是Cancel先于Try执行了。

出现原因:当全局事务发起者通过RPC方式调用分支分支事务执行Try的时候,出现了调用网络延迟的问题,此时TM会认为RPC调用超时需要回滚,但是可能这次RPC的Try请求在回滚之后执行成功了。此次Try操作预留的资源只有该分布式事务可以使用,该次预留的资源无法进行后续的处理,这就是选个挂。

解决思路:在执行第一阶段Try操作之前,要在事务记录表中是否有该分支事务对应的二阶段事务,如果有记录就不执行Try。

总结:

  • 在TCC模式下,所有操作要保证幂等性
  • 需要有事务的执行记录
  • 执行Try和Cancel时候都需要进行操作记录判断

2.3 优势和缺点

  • 优势:TCC执行的每一阶段都会提交本地事务并释放锁,并不需要等待其他事务的执行结果。而如果其他事务执行失败,最后不是回滚,而是执行补偿操作。这样就避免了资源的长期锁定和阻塞等待,执行效率比较高,属于性能比较好的分布式事务方式。
  • 缺点:
+ 代码侵入:需要认为编写代码实现 try 、confirm、 cancel 代码侵入较多
+ 开发成本高:一个业务需要拆分成 3个步骤,分别编写业务实现,业务编写比较复杂
+ 安全性考虑:cancel 动作如果执行失败,资源就无法释放,需要引入重试机制,而重试导致重复执行哦,还有考虑重试的幂等性问题。

2.4 案例

场景:A 转账给30元 给 B,A 和 B 在不同的服务。

2.4.1 方案1:

账户 A

1
2
3
4
5
6
7
arduino复制代码try :
检查余额是否够30元
扣减30元
confirm:
空
cancel:
增加30元

账户B

1
2
3
4
5
6
arduino复制代码try :
增加30元
confirm:
空
cancel:
减少30元

该方案存在问题:

  • 1、没有做幂等控制:try、confirm 和 cancel 没有幂等控制,都可能重复执行
  • 2、空回滚:如果A 的try没有执行,在cancel 的时候就多加了30
  • 3、账户B 的try 阶段增加了30元,可能在try执行完成后被其他线程消费了,然后导致无法减少30元而报错。
2.4.2 优化方案

账户A

1
2
3
4
5
6
7
8
9
10
11
vbnet复制代码try:
try 幂等校验
try 悬挂处理
检查余额是否够30元
扣减30元
confirm:
空
cancel:
cancel 幂等校验
cancel 空回滚处理
增加余额30元

账户B

1
2
3
4
5
6
7
arduino复制代码try:
空
confirm:
confirm 幂等校验
正式增加30元
cancel:
空

2.5 使用场景

  • 对事务有一定的一致性要求(最终一致性)
  • 对性能要求较高
  • 开发人员具备较高的编码能力和幂等处理经验

\

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

快速搞定Go语言环境,帮你避坑

发表于 2021-11-17

这是我参与11月更文挑战的第9天,活动详情查看:2021最后一次更文挑战

大家好,我是老表,最近有想法了解学习下Go语言,也顺便开启Go语言学习专栏,希望自己站在一个初学者角度,能写出一些对其他初学者或者有问题的读者朋友有帮助的内容。

Go 是一个开源的编程语言,它能让构造简单、可靠且高效的软件变得容易,主要有以下特点:

  • 可直接编译成机器码,性能强劲
  • 支持并发,垃圾回收
  • 标准库很多,也很小,开发效率高
  • 代码简洁,格式统一,阅读方便

安装环境

进入go语言官网下载安装包,这里我们下载稳定版中的 installer版本。
官网地址:golang.google.cn/dl/

相比于解压版,直接安装包安装可以省去自己配置环境变量等操作,傻瓜式一键安装,方便,适合初学者。

安装完成后,我们可以打开终端,查看go是否安装好了,输入go version即可测试go指令是否正常,并查看安装的go版本。

正常显示版本信息,就说明go环境安装成功啦~

测试使用:Hello world

新建一个目录go,用于存放go语言相关代码,在go目录下新建一个hello.go,

1
2
bash复制代码cd Desktop/Project/ && mkdir go
touch hello.go

直接用文本编辑器打开,将下面代码复制进去,然后保存关闭,这样我们就写好go语言第一个项目代码啦。

1
2
3
4
5
6
7
go复制代码package main
import "fmt"

func main() {
/* 程序员的第一行代码 */
fmt.Println("Hello, World!")
}

打开终端,进入到对应的目录,并执行go run运行代码:

1
2
bash复制代码cd Desktop/Project/go 
go run hello.go


接下来我们简单解析下这几行代码各自的作用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
go复制代码// 表示当前文件hello_go.go 所在包是main
package main

// 表示导入一个包,和Python中导包类似
import "fmt"

/*
go中用func来定义函数
函数定义格式:func 函数名字(){
函数内容
}
main是程序执行入口
*/
func main() {
// 程序员的第一行代码
// 调用fmt包中的Println打印字符串
fmt.Println("Hello, World!")
}

以上我们就安装并简单使用go编写了一个项目,接下来我们将进行进一步学习。

安装&使用vs code

为了更方便的学习、编写代码,安装一个好的开发工具是很必要的,网络上查了一下,最终选择了vs code,主要原因:开源免费、插件多、之前也听很多大佬说过,两个字:好用。

基础安装

官网下载地址:code.visualstudio.com/

选择好对应的系统,然后下载稳定版。

下载好,解压后直接点击即可打开使用,都不用安装,还可以安装中文语言包,直接界面汉化,使用更方便。

太爱了,安装了很多工具包:

快捷操作

1
2
3
4
5
6
bash复制代码Ctrl/Command+P 快速调出查询窗口,可以查找文件,跳转
Ctrl/Command+Shift+P 快速调出命令窗口,可以执行一些指令,比如snippets,设置快捷代码块
Ctrl/Command+Shift+M 快速显示“问题”面板
Ctrl/Command+H 查找替换
Ctrl/Command+Shift+F 在整个文件夹中查找
Ctrl + ~ 调出终端或者隐藏
  • 设置快捷代码块
    在vs code中 Ctrl/Command+Shift+P,然后输入snippets,点击进去后,先选择要添加代码片段的语言,比如go。

    进入配置文件后,会有注释提示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
json复制代码{
// Place your snippets for go here. Each snippet is defined under a snippet name and has a prefix, body and
// description. The prefix is what is used to trigger the snippet and the body will be expanded and inserted. Possible variables are:
// $1, $2 for tab stops, $0 for the final cursor position, and ${1:label}, ${2:another} for placeholders. Placeholders with the
// same ids are connected.
// Example:
// "Print to console": {
// "prefix": "log",
// "body": [
// "console.log('$1');",
// "$2"
// ],
// "description": "Log output to console"
// }
}

我们根据注释提示输入一个字典(json串)即可,比如下面利用pln拓展fmt.Println($0)。

1
2
3
4
5
json复制代码"println":{
"prefix": "pln", # 触发代码块生效
"body":"fmt.Println($0)", # 拓展代码块内容
"description": "println" # 注释
}

在vs code中新建一个go项目

直接文件->新建文件会新建一个代码文件,不是一个文件夹或者初始化的什么项目,同时这个文件还可以先自己选择编程语言,或者程序自己识别。

这里我们选择文件-打开然后到Desktop/Project/go(我们之前专门用于存储go项目的目录)下面新建一个hello_go文件夹,以这样的方式新建一个项目。

新建好项目文件后,可以再文件夹中新建一个01_hello_go.go文件,然后将前面写的hello_world代码复制粘贴进去即可。

Ctrl+F5可以自动运行,也可以在功能栏选择运行-非调试模式运行。

第一次运行会提示需要安装相关工具包,在让程序安装前需要先设置Go国内加速镜像,先打开终端/cmd,输入下面代码即可:

1
2
3
4
5
6
7
bash复制代码# mac
export GO111MODULE=on
export GOPROXY=https://goproxy.io,direct

# windows
$env:GO111MODULE = "on"
$env:GOPROXY = "https://goproxy.io,direct"

然后在在vscode内更新go工具包即可,如果找不到安装键了,可以再运行一遍代码或者按Ctrl/Command+Shift+P,然后输入update tools选择go这个就可以。

如果还是更新失败,我觉得你可能和我一样是文件权限问题,我安装不上是因为/usr/local/go/bin目录权限问题导致(最后才发现)!!!

1
bash复制代码sudo chmod -R 777 /usr/local/go/bin

如果还是不行,那太难了,你就只能手动安装了,步骤如下:

第一步:下载相关工具包
下面所有git地址我都换成了国内可以访问的快速地址,希望对大家有所帮助~

1
2
3
4
5
6
7
8
bash复制代码# 进入go环境,并创建src/golang.org/x目录
cd $GOPATH
mkdir -p src/golang.org/x && cd src/golang.org/x
# 从github下载必要插件工具包
git clone https://hub.fastgit.org/golang/tools.git
git clone https://hub.fastgit.org/golang/lint.git
git clone https://hub.fastgit.org/golang/mod.git
git clone https://hub.fastgit.org/golang/xerrors.git

第二部:安装相关工具包

1
2
3
4
bash复制代码# 这里比较坑的是 tools的工具包需要你到tools/cmd目录下进行安装,或者在后面加一个@latest。比如:
cd $GOPATH/src/golang.org/x/tools/cmd
go install golang.org/x/tools/cmd/guru
go install golang.org/x/tools/cmd/gorename
1
2
3
4
5
6
7
8
bash复制代码cd $GOPATH
mkdir -p src/github.com && cd src/github.com
git clone https://hub.fastgit.org/stamblerre/gocode.git stamblerre/gocode
git clone https://hub.fastgit.org/acroca/go-symbols.git acroca/go-symbols
git clone https://hub.fastgit.org/ramya-rao-a/go-outline.git ramya-rao-a/go-outline
git clone https://hub.fastgit.org/mdempsky/gocode.git mdempsky/gocode
git clone https://hub.fastgit.org/go-delve/delve.git go-delve/delve
git clone https://hub.fastgit.org/uudashr/gopkgs.git uudashr/gopkgs
1
2
3
4
5
6
7
8
bash复制代码# 新版本安装包 需要加版本信息,如果不确定,可以在后面加@latest
# 表示安装最新版本
cd $GOPATH
go install github.com/stamblerre/gocode@latest
go install github.com/ramya-rao-a/go-outline@latest
go install github.com/acroca/go-symbols@latest
go install github.com/mdempsky/gocode@latest
go install github.com/go-delve/delve/cmd/dlv@latest

如果安装完这些还提示缺什么,就按上面方法git下载,然后install即可。

(前面已经说了,这里再感叹下,这个问题卡了我1天时间!!!)前面这么多手动操作,到最后发现是自己目录权限问题导致安装不上的,直接修改下权限就可以安装成功了,悟了!

1
bash复制代码sudo chmod -R 777 /usr/local/go/bin

相关依赖包的安装好后,我们再运行代码就可以成功的输出Hello World啦~

下期见,我是爱猫爱技术的老表,如果觉得本文对你学习有所帮助,欢迎点赞、评论、关注我!

参考文章

[a]为什么要使用 Go 语言?Go 语言的优势在哪里?:www.zhihu.com/question/21…

[b]Mac 系统下 Go 语言环境的安装与配置:bbs.huaweicloud.com/blogs/detai…

[c]Go语言有什么好用的IDE吗?www.zhihu.com/question/25…

[d]Atom、Sublime Text、VSCode 三者比较,各有哪些优势和弱势?www.zhihu.com/question/41…

[e]LiteIDE X 与 VSCode www.reddit.com/r/golang/co…

[f]VS Code配置Go语言开发环境 www.liwenzhou.com/posts/Go/00…

[e]macos安装go语言和vscode插件 blog.csdn.net/luxingjyp/a…

[f]vscode 安装 golang插件,报错 permission denied www.cnblogs.com/fsqsec/p/14…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

go-zero基础组件-并发执行MapReduce

发表于 2021-11-17

image-20211117182142710

为什么需要 MapReduce

在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。

比如要查询商品详情:

  1. 商品服务-查询商品属性
  2. 库存服务-查询库存属性
  3. 价格服务-查询价格属性
  4. 营销服务-查询营销属性

如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。

简单的场景下使用 waitGroup 也能够满足需求,但是如果我们需要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?继续使用 waitGroup 就有点力不从心了,go 的官方库中并没有这种工具(java 中提供了 CompleteFuture),go-zero 作者依据 mapReduce 架构思想实现了进程内的数据批处理 mapReduce 并发工具类。

设计思路

我们尝试把自己代入到作者的角色梳理一下并发工具可能的业务场景:

  1. 查询商品详情:支持并发调用多个服务来组合产品属性,支持调用错误可以立即结束。
  2. 商品详情页自动推荐用户卡券:支持并发校验卡券,校验失败自动剔除,返回全部卡券。

以上实际都是在进行对输入数据进行处理最后输出清洗后的数据,针对数据处理有个非常经典的异步模式:生产者消费者模式。于是我们可以抽象一下数据批处理的生命周期,大致可以分为三个阶段:

image-20211116114416425

  1. 数据生产 generate
  2. 数据加工 mapper
  3. 数据聚合 reducer

其中数据生产是不可或缺的阶段,数据加工、数据聚合是可选阶段,数据生产与加工支持并发调用,数据聚合基本属于纯内存操作单协程即可。

再来思考一下不同阶段之间数据应该如何流转,既然不同阶段的数据处理都是由不同 goroutine 执行的,那么很自然的可以考虑采用 channel 来实现 goroutine 之间的通信。

image-20211116114334026

如何实现随时终止流程呢?

很简单,goroutine 中监听一个全局的结束 channel 就行。

go-zero 代码实现

core/mr/mapreduce.go

详细源码可查看 github.com/Ouyangan/go…

前置知识 - channel 基本用法

因为 MapReduce 源码中大量使用 channel 进行通信,大概提一下 channel 基本用法:

  1. channel 写结束后记得关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go复制代码	ch := make(chan interface{})
//写入完毕需要主动关闭channel
defer func() {
close(ch)
}()
go func() {
//v,ok模式 读取channel
for {
v, ok := <-ch
if !ok {
return
}
t.Log(v)
}

//for range模式读取channel,channel关闭循环自动退出
for i := range ch {
t.Log(i)
}

//清空channel,channel关闭循环自动退出
for range ch {

}
}()
for i := 0; i < 10; i++ {
ch <- i
time.Sleep(time.Second)
}
  1. 已关闭的 channel 依然支持读取
  2. 限定 channel 读写权限
1
2
3
4
5
6
7
8
9
10
11
go复制代码//只读channel
func readChan(rch <-chan interface{}) {
for i := range rch {
log.Println(i)
}
}

//只写channel
func writeChan(wch chan<- interface{}) {
wch <- 1
}

接口定义

先来看最核心的三个函数定义:

  1. 数据生产
  2. 数据加工
  3. 数据聚合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码	//数据生产func
//source - 数据被生产后写入source
GenerateFunc func(source chan<- interface{})

//数据加工func
//item - 生产出来的数据
//writer - 调用writer.Write()可以将加工后的向后传递至reducer
//cancel - 终止流程func
MapperFunc func(item interface{}, writer Writer, cancel func(error))

//数据聚合func
//pipe - 加工出来的数据
//writer - 调用writer.Write()可以将聚合后的数据返回给用户
//cancel - 终止流程func
ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))

面向用户的方法定义

使用方法可以查看官方文档,这里不做赘述

面向用户的方法比较多,方法主要分为两大类:

  1. 无返回
    1. 执行过程发生错误立即终止
    2. 执行过程不关注错误
  2. 有返回值
    1. 手动写入 source,手动读取聚合数据 channel
    2. 手动写入 source,自动读取聚合数据 channel
    3. 外部传入 source,自动读取聚合数据 channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go复制代码//并发执行func,发生任何错误将会立即终止流程
func Finish(fns ...func() error) error

//并发执行func,即使发生错误也不会终止流程
func FinishVoid(fns ...func())

//需要用户手动将生产数据写入 source,加工数据后返回一个channel供读取
//opts - 可选参数,目前包含:数据加工阶段协程数量
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option)

//无返回值,不关注错误
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option)

//无返回值,关注错误
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option)

//需要用户手动将生产数据写入 source ,并返回聚合后的数据
//generate 生产
//mapper 加工
//reducer 聚合
//opts - 可选参数,目前包含:数据加工阶段协程数量
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)

//支持传入数据源channel,并返回聚合后的数据
//source - 数据源channel
//mapper - 读取source内容并处理
//reducer - 数据处理完毕发送至reducer聚合
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error)

核心方法是 MapReduceWithSource 和 Map,其他方法都在内部调用她两。弄清楚了 MapReduceWithSource 方法 Map 也不在话下。

MapReduceWithSource 源码实现

一切都在这张图里面了

image-20211116170314073

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
go复制代码//支持传入数据源channel,并返回聚合后的数据
//source - 数据源channel
//mapper - 读取source内容并处理
//reducer - 数据处理完毕发送至reducer聚合
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
//可选参数设置
options := buildOptions(opts...)
//聚合数据channel,需要手动调用write方法写入到output中
output := make(chan interface{})
//output最后只会被读取一次
defer func() {
//如果有多次写入的话则会造成阻塞从而导致协程泄漏
//这里用 for range检测是否可以读出数据,读出数据说明多次写入了
//为什么这里使用panic呢?显示的提醒用户用法错了会比自动修复掉好一些
for range output {
panic("more than one element written in reducer")
}
}()
//创建有缓冲的chan,容量为workers
//意味着最多允许 workers 个协程同时处理数据
collector := make(chan interface{}, options.workers)
//数据聚合任务完成标志
done := syncx.NewDoneChan()
//支持阻塞写入chan的writer
writer := newGuardedWriter(output, done.Done())
//单例关闭
var closeOnce sync.Once
var retErr errorx.AtomicError
//数据聚合任务已结束,发送完成标志
finish := func() {
//只能关闭一次
closeOnce.Do(func() {
//发送聚合任务完成信号,close函数将会向chan写入一个零值
done.Close()
//关闭数据聚合chan
close(output)
})
}
//取消操作
cancel := once(func(err error) {
//设置error
if err != nil {
retErr.Set(err)
} else {
retErr.Set(ErrCancelWithNil)
}
//清空source channel
drain(source)
//调用完成方法
finish()
})

go func() {
defer func() {
//清空聚合任务channel
drain(collector)
//捕获panic
if r := recover(); r != nil {
//调用cancel方法,立即结束
cancel(fmt.Errorf("%v", r))
} else {
//正常结束
finish()
}
}()
//执行数据加工
//注意writer.write将加工后数据写入了output
reducer(collector, writer, cancel)
}()
//异步执行数据加工
//source - 数据生产
//collector - 数据收集
//done - 结束标志
//workers - 并发数
go executeMappers(func(item interface{}, w Writer) {
mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
//reducer将加工后的数据写入了output,
//需要数据返回时读取output即可
//假如output被写入了超过两次
//则开始的defer func那里将还可以读到数据
//由此可以检测到用户调用了多次write方法
value, ok := <-output
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return value, nil
} else {
return nil, ErrReduceNoOutput
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
go复制代码//数据加工
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
done <-chan lang.PlaceholderType, workers int) {
//goroutine协调同步信号量
var wg sync.WaitGroup
defer func() {
//等待数据加工任务完成
//防止数据加工的协程还未处理完数据就直接退出了
wg.Wait()
//关闭数据加工channel
close(collector)
}()
//带缓冲区的channel,缓冲区大小为workers
//控制数据加工的协程数量
pool := make(chan lang.PlaceholderType, workers)
//数据加工writer
writer := newGuardedWriter(collector, done)
for {
select {
//监听到外部结束信号,直接结束
case <-done:
return
//控制数据加工协程数量
//缓冲区容量-1
//无容量时将会被阻塞,等待释放容量
case pool <- lang.Placeholder:
//阻塞等待生产数据channel
item, ok := <-input
//如果ok为false则说明input已被关闭或者清空
//数据加工完成,执行退出
if !ok {
//缓冲区容量+1
<-pool
//结束本次循环
return
}
//wg同步信号量+1
wg.Add(1)
// better to safely run caller defined method
//异步执行数据加工,防止panic错误
threading.GoSafe(func() {
defer func() {
//wg同步信号量-1
wg.Done()
//缓冲区容量+1
<-pool
}()

mapper(item, writer)
})
}
}
}

总结

mapReduce 的源码我大概看了两个晚上,整体看下来比较累。一方面是我自身 go 语言并不是很熟练尤其是 channel 的用法,导致我需要频繁停下来查询相关文档理解作者的写法,另一方面是多个 goroutine 之间通过 channel 进行通信实现协作真的蛮烧脑(佩服作者的思维能力)。

其次看源码时第一遍看起来肯定会比较懵的,其实没关系找到程序的入口(公共基础组件一般是面向的方法)先沿着主线一路看下去把每一句代码都看懂加上注释,再看支线代码。

如果有实在看不懂的地方就查查这段代码的提交记录非常有可能是解决某个bug改动的,比如下面这段代码我死活看了好多遍都不理解。

1
2
3
4
5
6
7
8
9
10
11
go复制代码	//聚合数据channel,需要手动调用write方法写入到output中
output := make(chan interface{})
//output最后只会被读取一次
defer func() {
//如果有多次写入的话则会造成阻塞从而导致协程泄漏
//这里用 for range检测是否可以读出数据,读出数据说明多次写入了
//为什么这里使用panic呢?显示的提醒用户用法错了会比自动修复掉好一些
for range output {
panic("more than one element written in reducer")
}
}()

最后画出流程图基本就能把源码看懂了,对于我而言这方法比较笨但有效。

资料

Go Channel 详解

go-zero MapReduce文档

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ThreadLocal使用及源码分析

发表于 2021-11-17

通常的业务开发中,ThreadLocal 有两种典型的使用场景。

  • ThreadLocal 用作保存每个线程独享的对象,为每个线程都创建一个副本,这样每个线程都可以修改自己所拥有的副本, 而不会影响其他线程的副本,确保了线程安全。
  • ThreadLocal 用作每个线程内需要独立保存信息,以便供其他方法更方便地获取该信息的场景。每个线程获取到的信息可能都是不一样的,前面执行的方法保存了信息后,后续方法可以通过 ThreadLocal直接获取到,避免了传参,类似于全局变量的概念。

ThreadLocal会隔离不同线程之间的操作,使得相互不受影响。那么底层的存储结构是怎样的?
首先ThreadLocal是线程维度的,Thread,ThreadLocal,ThreadLocalMap之间是什么关系?

image.png

如图Thread1,这是一个线程,它的箭头指向了ThreadLocalMap1,其要表达的意思是,每个Thread 对象中都持有一个ThreadLocalMap 类型的成员变量,在这里 Thread 1 所拥有的成员变量就是 ThreadLocalMap1。

而ThreadLocalMap 自身类似于是一个Map,里面会有一个个 key-value 形式的键值对。那么就来看一下它的 key 和 value 分别是什么。可以看到这个表格的左侧是 ThreadLocal1、ThreadLocal2…… ThreadLocaln,能看出这里的 key 就是 ThreadLocal 的引用。

而在表格的右侧是一个一个的 value,这就是我们希望 ThreadLocal 存储的内容,例如起那么所说的全部变量 user 对象等。

这里需要重点看到它们的数量对应关系:一个 Thread 里面只有一个ThreadLocalMap ,而在一个 ThreadLocalMap 里面却可以有很多的 ThreadLocal,每一个 ThreadLocal 都对应一个 value。因为一个 Thread 是可以调用多个ThreadLocal的,所以 Thread内部就采用了 ThreadLocalMap 这样 Map 的数据结构来存放 ThreadLocal 和 value。

通过这张图片,我们就可以搞清楚 Thread、 ThreadLocal 及 ThreadLocalMap 三者在宏观上的关系了。

源码分析

知道了它们的关系之后,我们再来进行源码分析,来进一步地看到它们内部的实现。

  • get 方法
    源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public T get() {

    //获取到当前线程

    Thread t = Thread.currentThread();

    //获取到当前线程内的 ThreadLocalMap 对象,每个线程内都有一个 ThreadLocalMap 对象

    ThreadLocalMap map = getMap(t);

    if (map != null) {

        //获取 ThreadLocalMap 中的 Entry 对象并拿到 Value

        ThreadLocalMap.Entry e = map.getEntry(this);

        if (e != null) {

            @SuppressWarnings("unchecked")

            T result = (T)e.value;

            return result;

        }

    }

    //如果线程内之前没创建过 ThreadLocalMap,就创建

    return setInitialValue();

}

这是 ThreadLocal 的 get 方法,可以看出它利用了 Thread.currentThread 来获取当前线程的引用,并且把这个引用传入到了 getMap 方法里面,来拿到当前线程的 ThreadLocalMap。

然后就是一个 if ( map != null ) 条件语句,那我们先来看看 if (map == null) 的情况,如果 map == null,则说明之前这个线程中没有创建过 ThreadLocalMap,于是就去调用 setInitialValue 来创建;如果 map != null,我们就应该通过 this 这个引用(也就是当前的 ThreadLocal 对象的引用)来获取它所对应的 Entry,同时再通过这个 Entry 拿到里面的 value,最终作为结果返回。

值得注意的是,这里的 ThreadLocalMap 是保存在线程 Thread 类中的,而不是保存在ThreadLocal 中的。

  • getMap 方法
    源码
1
2
3
4
5
java复制代码ThreadLocalMap getMap(Thread t) {

    return t.threadLocals;

}

可以看到,这个方法很清楚地表明了 Thread 和 ThreadLocalMap 的关系,可以看出 ThreadLocalMap 是线程的一个成员变量。这个方法的作用就是获取到当前线程内的 ThreadLocalMap 对象,每个线程都有 ThreadLocalMap 对象,而这个对象的名字就叫作 threadLocals,初始值为 null,代码如下:

1
java复制代码ThreadLocal.ThreadLocalMap threadLocals = null;
  • set 方法
    源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public void set(T value) {

    Thread t = Thread.currentThread();

    ThreadLocalMap map = getMap(t);

    if (map != null)

        map.set(this, value);

    else

        createMap(t, value);

}

set 方法的作用是把想要存储的 value 给保存进去。可以看出,首先,它还是需要获取到当前线程的引用,并且利用这个引用来获取到 ThreadLocalMap ;然后,如果 map == null 则去创建这个 map,而当 map != null 的时候就利用 map.set 方法,把 value 给 set 进去。

可以看出,map.set(this, value) 传入的这两个参数中,第一个参数是 this,就是当前 ThreadLocal 的引用,这也再次体现了,在 ThreadLocalMap 中,它的 key 的类型是 ThreadLocal;而第二个参数就是我们所传入的 value,这样一来就可以把这个键值对保存到 ThreadLocalMap 中去了。

  • ThreadLocalMap 类,也就是 Thread.threadLocals
    ThreadLocalMap 这个类,下面这段代码截取自定义在 ThreadLocal 类中的 ThreadLocalMap 类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码static class ThreadLocalMap {



    static class Entry extends WeakReference<ThreadLocal<?>> {

        /** The value associated with this ThreadLocal. */

        Object value;





        Entry(ThreadLocal<?> k, Object v) {

            super(k);

            value = v;

        }

    }

   private Entry[] table;

//...

}

ThreadLocalMap 类是每个线程 Thread 类里面的一个成员变量,其中最重要的就是截取出的这段代码中的 Entry 内部类。在 ThreadLocalMap 中会有一个 Entry 类型的数组,名字叫 table。我们可以把 Entry 理解为一个 map,其键值对为:

  • 键,当前的ThreadLocal;
  • 值,实际需要存储的变量

总结:主要分析了Thread、 ThreadLocal 和 ThreadLocalMap 这三个非常重要的类的关系。用图画的方式表明了它们之间的关系:一个 Thread 有一个 ThreadLocalMap,而 ThreadLocalMap 的 key 就是一个个的 ThreadLocal,它们就是用这样的关系来存储并维护内容的.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Flink CDC 21 正式发布,稳定性大幅提升,新增

发表于 2021-11-17

本文作者徐榜江 (雪尽)

以下视频为伍翀 (云邪) 分享的 Flink CDC 前世今生:
www.bilibili.com/video/BV1jT…

前言

CDC (Change Data Capture) 是一种用于捕捉数据库变更数据的技术,Flink 从 1.11 版本开始原生支持 CDC 数据(changelog)的处理,目前已经是非常成熟的变更数据处理方案。

img

Flink CDC Connectors 是 Flink 的一组 Source 连接器,是 Flink CDC 的核心组件,这些连接器负责从 MySQL、PostgreSQL、Oracle、MongoDB 等数据库读取存量历史数据和增量变更数据。Flink CDC Connectors 是一个独立的开源项目,从去年 7 月份开源以来,社区保持了相当高速的发展,平均两个月一个版本,在开源社区的关注度持续走高,也逐渐有越来越多的用户使用 Flink CDC 来快速构建实时数仓和数据湖。

img

在今年 7 月份,Flink CDC Maintainer 徐榜江 (雪尽) 在北京的 Flink Meetup 首次分享了 Flink CDC 2.0 的设计。随后的 8 月份,Flink CDC 社区发布 2.0 版本,解决了诸多生产实践上的痛点,Flink CDC 社区的用户群也随之迅速壮大。

img

除了社区用户群体的迅速扩大,社区的开发者也在快速增加,目前已经有国内外多家公司的开发者加入到 Flink CDC 社区的开源共建,有来自北美 Cloudera 的开发者,也有来自欧洲 Vinted,Ververica 的开发者,国内的开发者更加活跃,有来自腾讯,阿里,字节等互联网公司的开发者,也有来自 XTransfer,新华文轩等创业公司和传统企业的开发者。此外,国内外的多家云厂商,其流计算产品都已经集成了 Flink CDC,让更多的用户体验到 Flink CDC 的强大与便捷。

一、Flink CDC 2.1 概览

在社区开发者们共同努力下,今天 Flink CDC 社区很开心地宣布 Flink CDC 2.1 正式发布了:github.com/ververica/f…

本文带着你 10 分钟了解 Flink CDC 2.1 版本的重大改进和核心功能。 2.1 版本包含 23 位贡献者贡献的 100+ PR,重点提升了 MySQL CDC 连接器的性能和生产稳定性,重磅推出 Oracle CDC 连接器和 MongoDB CDC 连接器。

  • MySQL CDC 支持百亿级数据的超大表,支持 MySQL 全部数据类型,通过连接池复用等优化大幅提升稳定性。同时提供支持无锁算法,并发读取的 DataStream API,用户可以借此搭建整库同步链路;
  • 新增了 Oracle CDC 连接器, 支持从 Oracle 数据库获取全量历史数据和增量变更数据;
  • 新增了 MongoDB CDC 连接器,支持从 MongoDB 数据库获取全量历史数据和增量变更数据;
  • 所有连接器均支持了 metadata column 功能, 用户通过 SQL 就可以访问库名,表名,数据变更时间等 meta 信息,这对分库分表场景的数据集成非常实用;
  • 丰富 Flink CDC 入门文档,增加多种场景的端到端实践教程。

二、MySQL CDC 连接器改进详解

在 Flink CDC 2.0 版本里,MySQL CDC 连接器提供了无锁算法,并发读取,断点续传等高级特性, 一并解决了诸多生产实践上的痛点,随后大量用户开始投入使用并大规模上线。在上线过程中,我们配合用户解决了诸多生产问题,同时也开发了一些用户迫切需要的高优功能,Flink CDC 2.1 版本针对 MySQL CDC 连接器的改进主要包括两类,一类是稳定性提升,一类是功能增强。

1. 稳定性提升

  • 针对不同的主键分布,引入动态分片算法

对主键是非数值、Snowflake ID、稀疏主键、联合主键等场景,通过动态分析源表的主键分布的均匀程度,根据分布的均匀程度自动地计算分片大小,让切片更加合理,让分片计算更快。动态分片算法能够很好地解决稀疏主键场景下分片过多的,联合主键场景下分片过大等问题,让每个分片包含的行数尽量维持在用户指定的 chunk size,这样用户通过 chunk size 就能控制分片大小和分片数量,无需关心主键类型。

  • 支持百亿级超大规模表

在表规模非常大时,以前会报 binlog 分片下发失败的错误,这是因为在超大表对应的 snapshot 分片会非常多,而 binlog 分片需要包含所有 snapshot 分片信息,当 SourceCoordinator 下发 binglog 分片到 SourceReader 节点时,分片 size 超过 RPC 通信框架支持的最大 size 会导致分片下发失败。虽然可以通过修改 RPC 框架的参数缓解分片 size 过大问题,但无法彻底解决。2.1 版本里通过将多个 snapshot 分片信息划分成 group 发送,一个 binlog 分片会切分成多个 group 逐个发送,从而彻底解决该问题。

  • 引入连接池管理数据库连接,提升稳定性

通过引入连接池管理数据库连接,一方面降低了数据库连接数,另外也避免了极端场景导致的连接泄露。

  • 支持分库分表 schema 不一致时,缺失字段自动填充 NULL 值

2. 功能增强

  • 支持所有 MySQL 数据类型

包括枚举类型、数组类型、地理信息类型等复杂类型。

  • 支持 metadata column

用户可以在 Flink DDL 中通过 db_name STRING METADATA FROM ‘database_name’ 的方式来访问库名(database_name)、表名(table_name)、变更时间(op_ts)等 meta 信息。这对分库分表场景的数据集成非常使用。

  • 支持并发读取的 DataStream API

在 2.0 版本中,无锁算法,并发读取等功能只在 SQL API 上透出给用户,而 DataStream API 未透出给用户,2.1 版本支持了 DataStream API,可通过 MySqlSourceBuilder 创建数据源。用户可以同时捕获多表数据,借此搭建整库同步链路。同时通过 MySqlSourceBuilder#includeSchemaChanges 还能捕获 schema 变更。

  • 支持 currentFetchEventTimeLag,currentEmitEventTimeLag,sourceIdleTime 监控指标

这些指标遵循 FLIP-33 [1] 的连接器指标规范,可以查看 FLIP-33 获取每个指标的含义。其中,currentEmitEventTimeLag 指标记录的是 Source 发送一条记录到下游节点的时间点和该记录在 DB 里产生时间点差值,用于衡量数据从 DB 产生到离开 Source 节点的延迟。用户可以通过该指标判断 source 是否进入了 binlog 读取阶段:

+ 即当该指标为 0 时,代表还在全量历史读取阶段;
+ 当大于 0 时,则代表进入了 binlog 读取阶段。

三、详解新增 Oracle CDC 连接器

Oracle 也是使用很广泛的数据库, Oracle CDC 连接器支持捕获并记录 Oracle 数据库服务器中发生的行级变更,其原理是使用 Oracle 提供的 LogMiner [2] 工具或者原生的 XStream API [3] 从 Oracle 中获取变更数据。

LogMiner 是 Oracle 数据库提供的一个分析工具,该工具可以解析 Oracle Redo 日志文件,从而将数据库的数据变更日志解析成变更事件输出。通过 LogMiner 方式时,Oracle 服务器对解析日志文件的进程做了严格的资源限制,所以对规模特别大的表,数据解析会比较慢,优点是 LogMiner 是可以免费使用的。

XStream API 是 Oracle 数据库为 Oracle GoldenGate (OGG) 提供的内部接口, 客户端可以通过 XStream API 高效地获取变更事件,其变更数据不是从 Redo 日志文件中获取,而是从 Oralce 服务器中的一块内存中直接读取,省去了数据落盘到日志文件和解析日志文件的开销,效率更高,但是必须购买 Oracle GoldenGate (OGG) 的 License。

img

Oracle CDC 连接器支持 LogMiner 和 XStream API 两种方式捕获变更事件。理论上能支持各种 Oracle 版本,目前 Flink CDC 项目里测试了 Oracle 11,12 和 19 三个版本。使用 Oracle CDC 连接器,用户只需要声明如下 Flink SQL 就能实时捕获 Oracle 数据库中的变更数据:

img

利用 Flink 丰富的周边生态,用户可以非常方便地写入各种下游存储,如消息队列,数据仓库,数据湖等。

Oracle CDC 连接器已经将底层的 CDC 细节屏蔽,整个实时同步链路,用户只需要几行 Flink SQL,不用开发任何 Java 代码,就可以将 Oracle 的数据变更实时捕获并发送。

此外,Oracle CDC 连接器也提供两种工作模式,即读取全量数据 + 增量变更数据,和只读取增量变更数据。Flink CDC 框架均保证一条不多一条不少的 exactly-once 语义。

四、详解新增 MongoDB CDC 连接器

MongoDB CDC 连接器并不依赖 Debezium,是在 Flink CDC 项目里独立开发。 MongoDB CDC 连接器支持捕获并记录 MongoDB 数据库中实时变更数据,其原理是伪装一个 MongoDB 集群里副本 [4],利用 MongoDB 集群的高可用机制,该副本可以从 master 节点获取完整 oplog(operation log) 事件流。Change Streams API 则提供实时订阅这些 oplog 事件流的能力,可以将这些实时的 oplog 事件流推送给订阅的应用程序。

img

从 ChangeStreams API 获取的更新事件中,对于 update 事件,没有 update 事件的前镜像值,即 MongoDB CDC 数据源只能作为一个 upsert source。不过 Flink 框架会自动为 MongoDB CDC 附加一个 Changelog Normalize 节点,补齐 update 事件的前镜像值(即 UPDATE_BEFORE 事件),从而确保 CDC 数据的语义正确性。

使用 MongoDB CDC 连接器,用户只需要声明如下 Flink SQL 就能实时捕获 MongoDB 数据库中的全量和增量变更数据,借助 Flink 强大的集成能力,用户可以非常方便地将 MongoDB 中的数据实时同步到 Flink 支持的所有下游存储。

img

整个数据捕获过程,用户不需要学习 MongoDB 的副本机制和原理,极大地简化了流程,降低了使用门槛。MongoDB CDC 也支持两种启动模式:

  • 默认的initial 模式是先同步表中的存量的数据,然后同步表中的增量数据;
  • latest-offset 模式则是从当前时间点开始只同步表中增量数据。

此外,MongoDB CDC 还提供了丰富的配置和优化参数,对于生产环境来说,这些配置和参数能够极大地提升实时链路的性能和稳定性。

五、总结和展望

在短短的一年多时间里,Flink CDC 项目取得了现象级的发展和关注,这离不开 Flink CDC 开源社区的贡献者们的无私贡献, 也离不开广大 Flink CDC 用户的积极反馈,正是这两者的良性互动才使得 Flink CDC 项目健康发展,这种良性互动也是开源社区的魅力所在。

Flink CDC 社区将会继续做好开源社区建设,在未来规划中,主要有三个方向:

  • 做深 CDC 技术

如抽象复用 mysql-cdc 实现,让 Oracle, MongoDB 等也能快速支持无锁读取、并发读取等特性。

  • 做广数据库生态

我们会支持更丰富的数据库 CDC 数据源,如 TiDB, DB2, MS SqlServer等。

  • 做好数据集成场景
+ 更好地集成实时数仓、数据湖的下游生态,包括 Hudi、Iceberg、ClickHouse、Doris 等。
+ 进一步降低 CDC 数据入湖入仓的门槛,解决整库同步、表结构变更同步等痛点。

致谢

特别感谢来自 Cloudera 公司的 Marton Balassi, Tamas Kiss 贡献的 Oracle CDC 连接器,来自 XTransfer 公司的 Jiabao Sun 贡献的 MongoDB CDC 连接器。

贡献者列表:

Gunnar Morling, Jark Wu, Jiabao Sun, Leonard Xu, MartijnVisser, Marton Balassi, Shengkai, Tamas Kiss, Tamas Zseder, Zongwen Li, dongdongking008, frey66, gongzhongqiang, ili zh, jpatel, lsy, luoyuxia, manmao, mincwang, taox, tuple, wangminchao, yangbin09

[1] cwiki.apache.org/confluence/…

[2] oracle-base.com/articles/8i…

[3] docs.oracle.com/cd/E11882_0…

[4] docs.mongodb.com/manual/repl…


Flink Forward Asia 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。
flink-forward.org.cn/

另有首届 Flink Forward Asia Hackathon 正式启动,10W 奖金等你来!
www.aliyun.com/page-source…
img


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

怎么自学Python,大概要多久?

发表于 2021-11-17

这是我参与11月更文挑战的第9天,活动详情查看:2021最后一次更文挑战

大家好,我是老表,希望本文对初学或者有所迷茫的朋友有所帮助。

先正面回答一下问题,需要多久:入门一周+选定方向一周+进阶学习三个月,整体算下来就是3个月2周的时间,这个过程最主要的是坚持和坚守,坚持就是坚持走下去,不要停歇;坚守就是坚守如一,选定方向和教程就不要更换。

一、首先是入门

入门比较建议找一个视频课程,花一周时间快速过掉(不用担心掌握的不牢固,因为在后面进阶学习中也会涉及基础知识回顾)。

入门阶段最重要的是:环境搭建和多敲代码、解决问题。

环境搭建

环境搭建是一切好的开始的基础,就像建房子打地基一样,对于初学者来说,环境搭建也是最容易出错、卡壳的地方,运气好,可能跟着教程一下就好了,运气不好,可能弄个两三天还是不行。

所以为了学习积极性不被打击,我建议初期可以看看周边有没有什么朋友是了解相关知识的,多请教下,实在没有,你也可以通过在本文评论你的问题,看到后就会给你解答。

Python环境搭建教程推荐阅读

Windows/Mac 安装、使用Python环境+jupyter notebook

多敲代码

基础学习的时候记住,千万不要觉得前期是理论知识,多记多背就可以了,学习编程最主要的是需要多敲代码,多复现视频或者图书中的代码,在这个过程中你不止会积累敲代码的经验,你还会积累很多解决问题的经验。

问题怎么解决

在说下,初学遇到问题怎么解决,一般有以下几种解决方法:

1) 自己根据报错提示,检查代码问题

2) 仔细核对视频或者图书中的代码和自己写的有何不同

3) 浏览器搜索错误提示,一般你遇到的问题,其他人都遇到过~

图片

图片

4) 请教认识的相关领域学习者(比自己厉害就行,越厉害越好~),请教的时候记住,仔细描述问题,一般问题描述模板:

1
2
3
4
5
javascript复制代码xx你好,我最近学习python遇到这个问题,多方尝试没有解决,向你请教下,
问题:错误描述,如:TypeError: Cannot perform 'ror...
我尝试解决方案:xxx,xxx,xxxx
相关代码截图:xxx
相关报错截图:xxxx

记住,和大佬交流别问:在不在;能不能帮忙解决个问题;想向你请教下,有没有时间…如果真觉得不好意思,不太熟的人,发个红包表示诚意就行,然后直接抛问题,不浪费彼此的时间。

5) 可以到一些论坛提问

这个解决方案顺序是依次向下的,能自己解决最好自己解决,一个问题卡壳超过一个小时或者更多时间已经影响自己学习心态了,就不要硬肝了,像大佬或者互联网求助~

具体学习资料和学习路线啥的,大家网络中直接搜索就可以了,善用浏览器,选定一个就行,记住,初期学习一定要坚持。

二、选定方向

Python的应用非常广泛,包括:爬虫、数据分析、web开发、算法、机器学习、人工智能等,所以在进阶学习前,你需要先选定一个方向,这非常关键,当然你可以都先去了解,简单学习下,然后选一个自己感兴趣的就行。

在选定方向的时候一定要注意,不要在一些不那么感兴趣的方向花费太多精力,所以这个阶段最好一周内结束,而且在这个过程你还得不断复习巩固前面的基础。

这个阶段你可以考虑多去网络上听一些免费的课程、或者是讲座,或者是找一些大佬进行语音交流,多去了解每个方向的发展前景,和具体需要做些什么,可以不用那么急想去快点写个高级爬虫,复现个机器学习算法等~

三、进阶学习

这里就很重要了,前面两步可以说都是打基础,这一步就是建高楼了。

首先根据你选定的进阶学习方向,在网络上选一门付费或者免费的教程,个人更推荐付费(99-500的样子),因为一般付费课程都会有配套的学习交流群和答疑老师,这对你学习会有很大的帮助。

这个阶段,你除了关注基础练习和相关行业了解外,你更应该多进行相关项目练习,多和其他人交流,只有不断的交流,分享,你才会汲取到更多知识,知识掌握的也会更牢固。

四、总结

最后总结下:首先需要打好基础(环境搭建、基础学习、问题解决思路),然后选定一个自己感兴趣且有发展前景的方向,然后选择一门相关方向的系列教程,坚持学习下去即可。加油,希望我们都学有所成。

具体学习资料和学习路线啥的,我就不推荐了,大家网络中直接搜索就可以了,善用浏览器,选定一个就行,记住,初期学习一定要坚持。

下期见,我是爱猫爱技术的老表,如果觉得本文对你学习有所帮助,欢迎点赞、评论、关注我!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

『十倍程序员』Java五大对象映射框架,总有一款适合你😘

发表于 2021-11-17

「这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战」。

1.前言

Hello 大家好,我是l拉不拉米,今天的『十倍程序员』系列给大家分享Java五大映射框架的介绍以及简单实用,如果您还在用BeanUtils.copyProperties(),请赶快换成映射框架

2.简介

创建由多层组成的大型 Java 应用程序需要使用多个模型,例如持久性模型、域模型或所谓的 DTO。为不同的应用层使用多个模型将需要我们提供一种 bean 之间的映射方式。

手动执行此操作可以快速创建大量样板代码并消耗大量时间。幸运的是,Java 有多种对象映射框架。

在本文中,我们将比较最流行的五款 Java 映射框架的性能。

3.映射框架

3.1. Dozer

Dozer 是一种映射框架,它使用递归将数据从一个对象复制到另一个对象。该框架不仅能够在 bean 之间复制属性,而且还可以在不同类型之间自动转换。

要使用 Dozer 框架,我们需要将此类依赖项添加到我们的项目中:

1
2
3
4
5
xml复制代码<dependency> 
<groupId>com.github.dozermapper</groupId>
<artifactId>dozer-core</artifactId>
<version>6.5.0</version>
</dependency>

3.2. Orika

Orika 是一个 bean 到 bean 的映射框架,它递归地将数据从一个对象复制到另一个对象。

Orika 的一般工作原理类似于推土机。两者之间的主要区别在于 Orika 使用字节码生成。这允许以最小的开销生成更快的映射器。

要使用它,我们需要将此类依赖项添加到我们的项目中:

1
2
3
4
5
xml复制代码<dependency> 
<groupId>ma.glasnost.orika</groupId>
<artifactId>orika-core</artifactId>
<version>1.5.4</version>
</dependency>

3.3. MapStruct

MapStruct 是一个自动生成 bean 映射器类的代码生成器。

MapStruct 还具有在不同数据类型之间进行转换的能力。

要将 MapStruct 添加到我们的项目中,我们需要包含以下依赖项:

1
2
3
4
5
xml复制代码<dependency> 
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>1.3.1.Final</version>
</dependency>

3.4. ModelMapper

ModelMapper 是一个框架,旨在通过根据约定确定对象如何相互映射来简化对象映射。它提供类型安全和重构安全的 API。

要在我们的项目中包含 ModelMapper,我们需要添加以下依赖项:

1
2
3
4
5
xml复制代码<dependency> 
<groupId>org.modelmapper</groupId>
<artifactId>modelmapper</artifactId>
<version>2.3.8</version>
</dependency>

3.5. JMapper

JMapper 是一个映射框架,旨在提供一个易于使用的、高性能的 Java Bean 之间的映射。

该框架旨在使用注释和关系映射来应用 DRY 原则。

该框架支持不同的配置方式:基于注解、基于 XML 或基于 API。

要在我们的项目中包含 JMapper,我们需要添加它的依赖项:

1
2
3
4
5
xml复制代码<dependency> 
<groupId>com.googlecode.jmapper-framework</groupId>
<artifactId>jmapper-core</artifactId>
<version>1.6.1.CR2</version>
</dependency>

4.简单实现

4.1. Orika转换器

Orika 允许完整的 API 实现,这极大地简化了映射器的创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class OrikaConverter implements Converter{ 
private MapperFacade mapperFacade;

public OrikaConverter() {
MapperFactory mapperFactory = new DefaultMapperFactory.Builder().build();
mapperFactory.classMap(Order.class, SourceOrder.class).field("orderStatus", "status").byDefault().register();
mapperFacade = mapperFactory.getMapperFacade();
}

@Override
public Order convert(SourceOrder sourceOrder) {
return mapperFacade.map(sourceOrder, Order.class);
}

@Override
public DestinationCode convert(SourceCode sourceCode) {
return mapperFacade.map(sourceCode, DestinationCode.class);
}
}

4.2. Dozer转换器

Dozer 需要 XML 映射文件,包含以下部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
xml复制代码<mappings xmlns="http://dozermapper.github.io/schema/bean-mapping" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://dozermapper.github.io/schema/bean-mapping
https://dozermapper.github.io/schema/bean-mapping.xsd">

<mapping>
<class-a>com.baeldung.performancetests.model.source.SourceOrder</class-a>
<class-b>com.baeldung.performancetests.model.destination.Order</class-b>
<field>
<a>status</a>
<b>orderStatus</b>
</field>
</mapping>
<mapping>
<class-a>com.baeldung.performancetests.model.source.SourceCode</class-a>
<class-b>com.baeldung.performancetests.model.destination.DestinationCode</class-b>
</mapping>
</mappings>

在定义了 XML 映射之后,我们可以从代码中使用它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class DozerConverter implements Converter { 
private final Mapper mapper;

public DozerConverter() {
this.mapper = DozerBeanMapperBuilder.create().withMappingFiles("dozer-mapping.xml").build();
}

@Override public Order convert(SourceOrder sourceOrder) {
return mapper.map(sourceOrder,Order.class);
}

@Override public DestinationCode convert(SourceCode sourceCode) {
return mapper.map(sourceCode, DestinationCode.class);
}
}

4.3. MapStruct转换器

MapStruct 的定义非常简单,因为它完全基于代码生成:

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Mapper 
public interface MapStructConverter extends Converter {
MapStructConverter MAPPER = Mappers.getMapper(MapStructConverter.class);

@Mapping(source = "status", target = "orderStatus")
@Override
Order convert(SourceOrder sourceOrder);

@Override
DestinationCode convert(SourceCode sourceCode);
}

更多使用案例可以参考我之前写的一篇文章:《优雅的对象转换-MapStruct》

4.4. JMapperConverter转换器

JMapperConverter 需要做更多的工作。实现接口后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class JMapperConverter implements Converter { 
JMapper realLifeMapper;
JMapper simpleMapper;

public JMapperConverter() {
JMapperAPI api = new JMapperAPI().add(JMapperAPI.mappedClass(Order.class));
realLifeMapper = new JMapper(Order.class, SourceOrder.class, api);
JMapperAPI simpleApi = new JMapperAPI().add(JMapperAPI.mappedClass(DestinationCode.class));
simpleMapper = new JMapper( DestinationCode.class, SourceCode.class, simpleApi);
}

@Override
public Order convert(SourceOrder sourceOrder) {
return (Order) realLifeMapper.getDestination(sourceOrder);
}

@Override
public DestinationCode convert(SourceCode sourceCode) {
return (DestinationCode) simpleMapper.getDestination(sourceCode);
}
}

我们还需要为目标类的每个字段添加@JMap 注释。此外,JMapper 无法自行在枚举类型之间进行转换,它需要我们创建自定义映射函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@JMapConversion(from = "paymentType", to = "paymentType") 
public PaymentType conversion(com.baeldung.performancetests.model.source.PaymentType type) {
PaymentType paymentType = null;
switch(type) {
case CARD:
paymentType = PaymentType.CARD;
break;

case CASH:
paymentType = PaymentType.CASH;
break;

case TRANSFER:
paymentType = PaymentType.TRANSFER;
break;
}
return paymentType;
}

4.5. ModelMapper转换器

ModelMapperConverter 要求我们只提供我们想要映射的类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class ModelMapperConverter implements Converter { 
private ModelMapper modelMapper;

public ModelMapperConverter() {
modelMapper = new ModelMapper();
}

@Override
public Order convert(SourceOrder sourceOrder) {
return modelMapper.map(sourceOrder, Order.class);
}

@Override
public DestinationCode convert(SourceCode sourceCode) {
return modelMapper.map(sourceCode, DestinationCode.class);
}
}

5.最后

可以看到不管使用哪种映射框架,代码量都比使用BeanUtils.copyProperties()要多,不过性能要比BeanUtils.copyProperties()强很多,BeanUtils.copyProperties()使用反射实现,性能较差。

作者个人比较推荐使用mapStruct,在编译期间生成增强代码,性能较高。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

让人头疼的分布式事务?

发表于 2021-11-17

「这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战」

数据库事务问题以及Spring事务管理的相关内容已经在上篇文章已经做了深入的理解,那么本篇文章我们就来一起讨论一下分布式事务,以及我们为什么我们要避免分布式事务。

  1. 分布式事务介绍

1.1 分布式的介绍

首先呢,分布式的发展生离不开云计算,微服务,分库分表等技术的发展。

分布式一般指代分布式服务,所谓分布式服务就是我们将服务部署于不同机器(物理机,虚拟机),让他们一起为我们提供服务,分布式服务可以说是单节点服务的扩展,单节点服务能力是有限的,我们通过拓展节点提升服务的能力。

1.2 事务的介绍

事务一般指代数据库事务,同一个事务操作能够保证其具有ACID四大特性。

  • 原子性(Atomicity) :要么都成功要么都失败。
  • 一致性(Consistency) :AID都是数据库处理的特征,一致性是强调应用系统从一个正确的状态到另一个正确的状态,可以说AID是为了保证C。
  • 隔离性(Isolation) :多个事务同时处理相同数据,事务间会互相隔离,不会互相影响。
  • 持久性(Durability) :事务完成,会将结果持久化到磁盘。

1.3 分布式事务

如果是同一服务操作同一数据库,Spring就可以很好的为我们解决事务问题,这种情况下是不存在分布式事务的。

什么是分布式事务?

分布式事务就是在数据库事务无法保证数据一致性,借助其他手段来保证数据的一致性。

这里的一致性需要参考CAP理论和BASE理论,它可以是强一致性,也可以是最终一致性。

分布式事务的产生可以归咎于两个原因:

  • 应用服务的多节点

从早期的SOA到现在的微服务,应用细粒度拆分成为一种趋势,不同的业务能力被拆分成多个服务,有些业务是相互关联的,比如商品,订单,积分等分配到不同的服务中,但是用户下单的时候,就需要扣减商品库存,增加用户积分,扣除用户余额,这样下单流程就会产生服务间的互相调用。这样我们就需要去保证这一系列操作的一致性,这时就需要分布式事务来解决。

  • 数据库服务的多节点

随着互联网的发展,各个公司拥有的数据量也在突飞猛涨,分库分表技术也日趋成熟,对于MySQL数据库来说单表数据量达到五千万就需要进行分库分表。事务是数据库层面,但凡是操作不同的数据库就会无法保证是同一个事务,需要使用分布式事务来处理。

  1. 分布式事务的解决方案

本篇文章不会花大量篇幅讲述分布式事务的解决方案,后期会专门写一篇文章深入讲解一下。

  • 二阶段提交(2PC)
+ XA


XA是一个分布式事务协议,由Tuxedo提出。XA中大致分为两部分:事务管理器和本地资源管理器。其中本地资源管理器由数据库实现,比如主流的数据库Oracle、Mysql等数据库都实现了XA接口,而事务管理器作为全局的调度者,负责各个本地资源的提交回滚。
+ AT


Seata中的一种实现方式,全局事务,无倾入性。
  • 三阶段提交(3PC)

3PC其实在2PC的基础上增加了CanCommit阶段,是2PC的变种,并引入了超时机制。

  • 补偿事务
+ Saga


记录回滚方案,如果一个正向操作执行失败,那么分布式事务会去执行前面各参与者的逆向回滚操作,使其回到初始状态。
+ TCC


TCC是基于业务逻辑去实现,对代码侵入性很强。它与2PC很像,但是需要应用自己去实现,所有的业务逻辑都需要实现**try、confirm、cancel**三个操作。
  • 本地消息表(最终一致性)

在数据库中加一张消息表,通过改状态,加定时任务,与消息事物类似,也是保证最终一致性

  • 消息事务(最终一致性)

消息事务是借助消息中间件来实现分布式事务,此种方式是保证最终一致性。

这里肯定有人疑惑为什么没有Seata,Seata实际是对上面几种方式的实现,Seata有四种模式AT、XA、Saga、TCC,AT和XA都是基于2PC的方式。

  1. 避免分布式事务

分布式事务的性能其实是很低的,往往在我们的服务拆分后,加入大量的分布式事务,看似很高大上。但是用过分布式事务的才知道分布式事务的坑有多少,本身数据库事务就很耗费资源,我们还加强了事务的复杂度,不但服务响应速度会受影响,出了错误也比较难排查,有时候本应该回滚的事物确没有回滚,或者部分回滚都是很让人恼火的事情。

所以说我们一定要尽量避免分布式事务,防止过度设计。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python matplotlib 绘制饼图 前言 1 等

发表于 2021-11-17

这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战

前言

众所周知,matplotlib.pyplot 提供绘制不同表格绘制方法,如使用plot()方法绘制折线,bar()绘制柱状图,hist()绘制直方图等等,关于它们使用详情可以查看如下链接。

  • matplotlib 绘制折线图:对折线图相关属性进行汇总说明
  • matplotlib 绘制柱状图:对柱状图相关属性进行汇总说明
  • matplotlib 绘制直方图:对直方图相关属性进行汇总说明
  • matplotlib 绘制散点图:对散点图相关属性进行汇总说明
  • matplotlib 绘制等高线图:对等高线图相关属性进行汇总说明

在matplotlib.pyplot 中还有一种图表用于直观表示占比情况的饼图,在matplotlib官网上也列举出非常多关于饼图的案例。

image.png

本期,我们将详细学习matplotlib 绘制饼图相关属性的学习,let’s go~

  1. 等高线图概述

  • 什么是饼图?

+ 饼图将各项的大小与总项和的比例显示在一个圆里
+ 饼图展示通过不同大小,来确定各项的占比
+ 饼图相同颜色的数据标记组成一个数据系列
+ 饼图可分为三维饼图、复合饼图、分离饼图
  • 饼图常用场景

+ 饼图可用在需要暂时各个部分构成比
+ 饼图可反映一个维度各项指标占总体的占比情况
+ 饼图适用在只看大体占比,不要数据精度的情况
  • 绘制等饼图步骤

1. 导入matplotlib.pyplot模块
2. 准备数据,可以使用numpy/pandas整理数据
3. 调用pyplot.pie()绘制饼图
4. 调用axis方法调整x/y轴间距相等
  • 案例展示

本期,我们将适用饼图来分析操作系统市场占比情况

+ 案例数据准备:使用random.randint产生5个数值



1
2
python复制代码import numpy as np
size = np.random.randint(0,100,5)
+ 绘制饼图
1
2
3
4
5
6
7
python复制代码import matplotlib.pyplot as plt\

plt.pie(size,labels=["Windows","MAC","Linux","Android","Other"])

plt.title("手机系统占比分析")

plt.show()
![image.png](https://gitee.com/songjianzaina/juejin_p12/raw/master/img/15ac4f94f14714bb62d831e3492375721e95a9adf088373b7165d2278532454a)
  1. 饼图属性

  • 设置饼图的颜色

+ 关键字:colors
+ 可取值选项:None或者颜色列表
+ 颜色列表可以由如下组成:
    - 表示颜色的英文单词:如红色"red"
    - 表示颜色单词的简称如:红色"r",黄色"y"
    - RGB格式:十六进制格式如"#88c999";(r,g,b)元组形式
  • 设置标签

+ 关键字:labels
+ 默认为:None
+ 需要传入列表形式的值
  • 设置突出部分

+ 关键字:explode
+ 默认为:None
+ 需要传入列表数据
+ 如果设置值后,其指定的部分会突出显示
  • 设置填入百分比数值

+ 关键字:autopct
+ 默认为:None
+ 可选择值形式:
    - 格式字符串如:'%1.1f%%'
    - 函数:可以调用函数内容
  • 饼图旋转

+ 从x轴逆时针旋转角度:startangle;默认为0,浮点类型
+ 指定分数方向顺逆时针:counterclock;默认为True,bool类型
  • 设置阴影

+ 关键字:shadow
+ 默认为False
+ 在饼图下绘制出阴影
  • 我们结合第一节的案例添加一些属性,需要显示占比数值,颜色显示指定颜色,突出MAC占比
1
2
3
4
python复制代码plt.pie(size,labels=["Windows","MAC","Linux","Android","Other"],
autopct="%1.1f%%",
explode=[0,0.1,0,0,0],
colors=("r","blue","#88c999",(1,1,0),"0.5"))

image.png

  1. 调整饼图的大小

我们在实际制作饼图时,会遇到改变饼图的大小,这是我们可以借助饼图属性关键字radius

  • radius:设置饼图半径大小

除此之外,我们还要使用textprops来控制显示的标签的大小

1
2
3
python复制代码plt.pie(size,labels=["Windows","MAC","Linux","Android","Other"],autopct="%1.1f%%",
explode=[0,0.1,0,0,0],
colors=("r","blue","#88c999",(1,1,0),"0.5"),radius=0.5,textprops={'size':"smaller"})

image.png

  1. 添加图例

我们在饼图中显示各项占比情况时,会在图表的旁边添加一组图例说明。

  • pyplot.pie()方法会返回patchee.Wedge列表、文本列表等数据
  • pyplot.legend()方法传入wedge元素和指定的labels标签
  • 同时可以同legend()方法bbox_to_anchor来设置图例的位置
1
2
3
4
5
6
7
8
9
10
python复制代码La = ["Windows","MAC","Linux","Android","Other"]

def f(pct,n):
num = int(round(pct*np.sum(n)))
return "{:.1f}%\n{:d}w".format(pct,num)

wedges ,text,autotexts =plt.pie(size,autopct=lambda pct: f(pct,size),
colors=("r","blue","#88c999",(1,1,0),"0.5"),textprops=dict(color='w'))

plt.legend(wedges,La,loc="right",bbox_to_anchor=(1,0,0.3,1))

image.png

  1. 镂空饼图

在饼图中,我们有时候也用到嵌套镂空的饼图。

  • 嵌套可以多次调用pyplot.pie()方法
  • 镂空可以借助pyplot.pie()属性wedgeprops来进行设置
  • wedgeprops={“width”:0.3,”edgecolor”:’w’}
1
2
3
4
5
python复制代码cmap = plt.get_cmap("tab20c")
plt.pie(size,
colors=("r","blue","#88c999",(1,1,0),"0.5"),textprops=dict(color='w'),wedgeprops=dict(width=0.3,edgecolor='w'))
plt.pie(size,
colors= cmap(np.arange(3)*5),radius=0.7,wedgeprops=dict(width=0.3,edgecolor='w'),textprops={'size':"smaller"})

image.png

总结

本期,对matplotlib.pyplot 绘制饼图pie()相关属性的学习。在绘制饼图时,我们会根据实际需求改变饼图的大小,嵌套饼图、添加柱状图等图形辅助查看

以上是本期内容,欢迎大佬们点赞评论,下期见~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…306307308…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%