开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

《大厂面试》面试官看了直呼想要的简历

发表于 2019-12-30

你知道的越多,你不知道的越多

点赞再看,养成习惯

本文 GitHub github.com/JavaFamily 已收录,有一线大厂面试点思维导图,也整理了很多我的文档,欢迎Star和完善,大家面试可以参照考点复习,希望我们一起有点东西。

前言

昂每周我的邮箱都会收到各式各样的简历,但是说实话通过率真的太低了,hr都要吐槽我了,大家还是要好好写简历呀,能力再强简历差了,也不行啊。

HR经常一天过几百份,甚至上千份简历,可能看10-30秒就会觉得留下来继续看一下,是邀约面试,还是PASS掉,但是很多简历甚至那10秒都撑不到。

都说“人靠衣装佛靠金装”,同理我们出去找工作,简历也是我们的门面了,你想找到好工作,就得先过简历这一关了。

那被PASS的原因就很多了,要么是不符合招聘要求,比如学历不符、个人硬实力不符、年限、薪资不匹配、简历花里胡哨、简历看不下去了等等。

总之总有一个因素是你简历投递出去,石沉大海,也不要抱怨对方不给你机会,或许是你简历乱写,不注意细节,这算是自己不给自己机会呢,对吧?

正文

吹完了那我们简历到底要有哪些基本要素,又应该怎样去写每一块东西呢?

  • 简历模板
  • 格式排版
  • 基本信息
  • 专业技能&教育背景(学校非必要,后面我会说为啥)
  • 工作履历(校招选填)
  • 实习经历/项目经历
  • 技能证书(社招选填)
  • 自我评价

我想这些基本上是一个简历所必要的点,基本上写完这些,刚好两页,也是比较适合的页数了,我自己在实习和后面投递简历的时候基本上也是按照这个布局写的,也都是两页刚好那样。

那说了这些点,我们就逐个点分析下。

简历模板

我们技术人简历的模板其实简约点就好了,我曾经见过炫彩夺目的简历,那可真是闪瞎了我的钛合金狗眼,各种花里胡哨,你又不是搞艺术的,搞那么花哨干嘛。

选个简约的模板,找对字体就好了,整个简历的颜色不要超过三种颜色,其实黑白就挺好的。

总之不要花里胡哨,那样肯定会把你当非主流筛选掉的,字体也不要花里花哨,标点符号也要统一,总之格式这种东西见微知著,可以看出你是否是个细心的人。

像下面这种,像上个世纪的非主流,不仅不会吸引到面试官,反而会反手直接得到一个拒绝。

说是要简约,但是也不要简约过头了,像我们初高中那种格子简历,出社会还是成熟点好了。

同样是黑白的色调,敖丙自己的简历是不是就相对而言会整洁很多,看起来也比较条理清晰。

Tip:简历内容都是我改过了的,不过简历的模板和布局倒是我之前一直在用的,公众号回复【模板】获取简历模板。

格式排版

就比如有小朋友发给我的简历,一会英文标点符号,一会中文标点,讲道理不好看哟。

还有格式,大段大段的项目介绍都不带分段的,看着脑仁疼,你确定面试官会愿意在你这上面浪费这么多时间嘛?

完全可以对内容进行分段,让面试官阅读的时候有个比较好的体验。

还有就是字体格式,标题统一黑体就黑体,正文统一宋体就宋体,甚至看到过,同一段字体不一样,大小不一样的都有,注意哟。

简历最好输出为PDF格式,因为word有太多版本了,或许你用的03的hr的电脑是06的,又或者用的WPS的,你不能确保格式在他那不会乱,那输出成PDF就不存在兼容问题了。

基本信息

基本信息我想大家都有的吧,那基本信息里面可以填写的要素有哪些呢?

  • 姓名:这个不用说了名字总的有吧,甚至遇到过名字都写错的仔,噗此。
  • 电话:最重要的联系方式,不能错!!!

我身边有过一个小伙伴,简历投递出去2周了没有收到任何回复,我当时看他简历我觉得也没啥问题,一个邀约都没很反常,然后我让他确认电话啥的,你猜怎么着,这逼把电话填了一个自己没开机的号,因为他有两个号码,当时我想抽他了,你也知道这两周得浪费多少钱,而且心态也容易崩。

  • 电子邮箱:这个跟电话一样,都是很重要的联系方式,很多面试邀请也会通过邮箱发出,Offer就全是邮箱发的了,所以错不得,以前有说法用QQ邮箱面试没格局,不知道现在还有没有这种说法,反正我用的别的邮箱。
  • 求职意向:Java后端就后端,Web前端就前端,CEO就CEO咯,应聘啥写啥。
  • 工作年限:这个不是每个仔都有的,比如应届生,实习生等,大家就不用写了,我甚至收到应届生写工作经验2年的,实习不算工作年限哟,反正hr那也会给你忽略掉的,你这么写,hr没注意你还没毕业,把简历给技术,面试的时候就按照两年的标准问你,你这不是给自己挖坑?

几年就写几年的,没必要搞这些花里胡哨的,去大厂你年限写错了,可能还有简历造假被拉黑的嫌疑哟。

  • 年龄:这个我觉得写好了也是个加分项,比如你比较年轻,你就写个22啥的,hr一看原来是个年轻力壮的小伙子,估计可以进来加班,越他面试试试,如果年龄偏大就不写了,免得在简历赛选的时候就GG了,真在意年龄的公司最后可能会PASS你,但是你至少得到了面试的机会嘛。
  • 婚姻状况:这个可写可不写,但是你写个单身,这hr得两眼放光,配合上面的年龄,基本上就是一套完整的combo,嗯不错单身年轻,估计加班肯定比别人能顶。
  • 在职状态:应届仔也不用管这个,社招仔可以写下,是否已离职,这样HR小姐姐就知道。
  • 博客/GitHub:这个就完全是加分项了,如果你有好的开源项目,甚至你给某些知名的开源项目提交过Pr啥的都可以提一嘴,博客一定要有一定的内容,光是各种散记,那面试官也会忽略的。

基本信息这里,能表达清楚基本的信息其实就够了,有博客啥的加分项那就再好不过了。

Tip:我这里就是给了DEMO,正常情况点击博客和CSDN是要有链接跳转的,清晰明了,直接去看博客、GitHub看看你是不是真的有点东西,很关键。

专业技能&教育背景

  • 学历:这个感觉还是很有必要填写的,你不填HR也会问你的,本科就本科,专科就专科,没啥好避讳的,介意的话他也不会要你,不然你面到最后人家发现你学历不符合,这是在浪费大家的时间。
  • 学校:这个我说过了,也是个选填项,如果你的学校是个一本,或者挺不错的二本还是可以写一下的,但是如果跟帅丙一样是个渣本,名不见经传那种,你就还是不要写了,免得HR一看就给你跳过了。

不过问你的时候你要一五一十的说,最好提前准备好学信网的截图啥的。

  • 专业:是啥专业就写啥专业,非计算机相关的可以不写,但是基本上HR都会问的。
  • 技能:这个点是整个简历我个人认为最核心的点,因为在这里HR、面试官都会很关注这个点(在学历年限等条件符合的条件下),这里大概可以看出你是否能和当前所需岗位匹配。

一般都是缺什么样子的人才才会招什么样的人,那缺的东西他们就会写成岗位要求,比如:

上面这个就是很常见的职位要求了,那跟你的技能这一栏有啥关系呢?

敖丙我就会针对我比较心仪的岗位写不同的技能,比如上面这个,他写要求啥,我就写我会啥,那你投递过去是不是几率大了?(前提你真的得会)

  • 熟练程度:一般分为:熟悉、熟练掌握、精通几个等级,精通二字我劝大家不要乱写,当然你有那个实力也是可以的,我身边的仔出去面试简历上我也只能看到一个精通,我甚至看到过应届生3个精通的,当时我尿了,这面试官也怕呀,你比他还牛,这能招你?开玩笑。

因为我自己的学校比较渣,所以我就没写学校,就提了一下是个本科,真的在意学校HR反正也会问你的。

专业技能在写的时候,最好也是有层次一点,从java基础,到中间件,数据库,然后到框架啥的,看个人习惯吧,基础放前面没啥问题。

工作履历

  • 在职时间:这个如实写一点都不能错,没必要编造,查一下你社保公积金就啥都知道了,你编没有一点意义,而且被查出来造假,还会被拉黑,得不偿失。
  • 所属行业:这个我个人认为也有必要,毕竟你公司不是那种耳熟能详的公司的话,HR可能还得去百度查一下,你觉得在简历本身就没啥特点的情况下,她会花几分钟去查你公司是干啥的么?

你写在这里还减少了小姐姐的工作了,一看你是个卵男,保不齐就约你面试呢?你们说我说得对不对吧。

  • 公司名称:让小姐姐知道你老东家的大名很有必要,要是是大厂基本上分分钟就可以等约面了,小点的我们也不自卑,这个问题不大。
  • 担任职位:写职责是为了看你之前所做的工作,是否一直都是这个垂直领域的,你干了5年阿里的后厨,你来应聘资深算法,我觉得怕是有点不得行哟。

在职时间真的一点都不要编造哟,只有小伙伴入职腾讯,背调的时候发现差了一个月的时间,就取消了入职还拉黑了,这个正常写就好了。

切记不要频繁跳槽,频繁跳槽会让面试官觉得你很浮躁,呆半年就跳一个,谁敢要你呀对吧,不满一年就想走啥的,兄弟我觉得忍一下,怎么都要满一个整年。

在名不见经传的小公司,最好多点经验再尝试大厂,毕竟没啥好的背书,等能力够得着p6的级别再尝试,不要过多的跳槽,这样弊大于利。

切记不要任何形式的造假,被拉黑了,就是一辈子无缘这个公司了。

实习经历/项目经历

  • 项目名称:一般这个不会带来什么加分点,你写个出名的项目另说。
  • 项目描述:你用较为简短的话语介绍完你的项目,让面试官和HR一看就知道这大概是个什么项目,是个电商商场,还是个后台管理系统什么的。
  • 涉及技术:把项目中的亮点技术栈都枚举出来,如果能跟招聘要求大部分匹配上,那又是一个隐藏的加分点老铁,稳了呀。

列出来的技术,是当前比较流行的技术栈那是极好的,不是也没关系别瞎写,写个你不会的,你在给自己挖坑,有小伙伴肯定会说不会可以学,面试时间本来就是很高的成本,你还花这么多时间学新东西,可以,但是没必要。

  • 设计技术:这个主要是把其中比较有东西的链路拿出来说一下,并且说一下这个链路用了什么思路,用了什么技术栈,面试官问都方便问了,知道为啥问得我都会么?因为都是我写出来让他问的哈哈。
  • 个人职责:写清楚你在小组内的角色,以及你的个人职责,比如你是小组的后端开发,负责订单模块啥的。

这个点的目的是为了让面试官在看的时候清楚你在整个项目的位置,你负责了啥模块他问的时候也有针对性的问了。

  • 难点收获:这个没有就不写了,有的话还是可以写一下的,像我自己一些大促场景的线上BUG,还有一些难点攻坚的,我都会写一下,这样可能会给面试官进一步了解你的机会。

从项目描述,涉及技术,个人职责,清晰的调理去介绍你的个人项目,以及你的职责,这样面试官一看完心里就有数了,你几斤几两或者怎么面你,他都知道了。

项目一定要自己亲手负责的,因为面试肯定会问你负责了啥,有啥难点,你说你负责下单我就问你下单怎么实现,你说你做了秒杀,我就问你秒杀怎么实现,聊到对应的技术栈也不会放过你的,所以编就是在给自己挖坑。

还有小伙伴面试的时候想用支支吾吾,口齿不清的回答掩盖自己的尴尬,卧*,你当面试官是傻子嘛?

不要写N个不大不小的项目,一个公司你写一到两个比较大型和独特的项目就够了,写多了面试官也觉得get不到重点,也觉得你不行,我遇到过写了Demo级别项目上去的,问他啥都不知道,啥都没实现,完犊子。

还是那句话,不要任何形式的项目造假,一问就能问出来了,你不会就不会,你编个项目那性质就是造假,就是骗人了,你就永久进入这个面试官的黑名单了。

常见项目问题:

  • 你负责了项目的哪块内容?
  • 项目的难点痛点是什么?你们怎么解决的?
  • 你使用XX技术栈的时候有没有什么坑,你们怎么解决的?
  • 项目中遇到过什么印象比较深的Bug?
  • 遇到XX情况么?怎么解决的,怎么优化的?能多说几种方案么?
  • 你是根据哪些指标进行针对性优化的?

奖项证书

这一块社招的仔选填吧,除非你真的有比较香的奖,像我身边就有什么ACM金牌的仔,这种就是很加分了,也可能是SSP的加分理由。

校招的同学因为没啥好的项目经历,那面试官可能就会看你的个人奖项是否有含金量了,也可以充实简历,不然简历干巴巴的看上去就那么一点东西,实在尴尬。

英语等级证书是比较加分而且比较硬的,别的证书大家都懂的就没必要怼了,除非是真的拿到的不错的奖,买的写上去面试官也懂的,所以。

自我评价

性格:展现你真实的性格就好了,反正面试也能问出来

加班出差:这个因为我一直都是单身,所以都提到了接受加班和出差,无所谓出来打工嘛,没差,不过你又家人,有女仔啥的,该写啥就写啥,别强行接受了,到时候你哭也没用的。

业余爱好:可以写自己爱运动,爱阅读,还可以写点性格,喜欢钻研,喜欢阅读源码啥的

这一块别跟自己出入太大了,正常写就好了,你就是你,这条街最靓的仔不是嘛。

总结

Tip:公众号回复【模板】获取简历模板

上面这些就构成了一个朴实无华的简历了,很多小伙伴说学校不好没人要,我只能说我学校也不好,但是我的面试邀约基本上也还是有的,很多都是因为简历各种不合格,在最开始那关就被PASS的。

至于项目经常说没高并发项目啥的,也没事,没做过高并发的项目,你自己就不去研究高并发和多线程了?

你研究了那你为啥不写上去,面试官看到你会,一样会给你面试机会,过不过那就得看你的了。

我觉得学历不是限制大家的原因,没有好项目也不是,能力才是,好好学习,我们一起进步,大厂也不过尔尔。

点关注,不迷路

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是人才。

我后面会每周都更新几篇一线互联网大厂面试和常用技术栈相关的文章,非常感谢人才们能看到这里,如果这个文章写得还不错,觉得「敖丙」我有点东西的话 求点赞👍 求关注❤️ 求分享👥 对暖男我来说真的 非常有用!!!

白嫖不好,创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见!

敖丙 | 文 【原创】

如果本篇博客有任何错误,请批评指教,不胜感激 !


文章每周持续更新,可以微信搜索「 三太子敖丙 」第一时间阅读和催更(比博客早一到两篇哟),本文 GitHub github.com/JavaFamily 已经收录,有一线大厂面试点思维导图,也整理了很多我的文档,欢迎Star和完善,大家面试可以参照考点复习,希望我们一起有点东西。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis配置文件详解(全网最全的原创版本)

发表于 2019-12-26

配置文件版本使用的是redis 4.0.14,
某些参数需要处理了解linux kernel,笔者不太了解linux内核参数,后面还要继续努力呀。

  1. 常规命令

1.1 ./redis-server /path/to/redis.conf

启动redis并使配置文件生效

1.2 include /path/to/local.conf

include 可以使用多个配置文件,如果配置文件有相同值,后面的会覆盖前面的:

1
2
lua复制代码include /path/to/local.conf
include /path/to/other.conf

1.3 loadmodule /path/to/my_module.so

加载modules 没什么用好像,还需要继续研究

1
2
bash复制代码loadmodule /path/to/my_module.so
loadmodule /path/to/other_module.so

1.4 bind 127.0.0.1

绑定ip地址,为了安全最好都绑定

1.5 protected-mode yes

保护模式,如果保护模式开了,而且redis既没有bind ip,也没设置密码,那redis只接收127.0.0.1的连接。
默认都开

1.6 port 6379

端口,设置为0就不会监听

1.7 tcp-backlog 511

linux 内核tcp_max_syn_backlog和somaxconn 参数调优

1.8 unixsocket /tmp/redis.sock unixsocketperm 700

unix socket ,默认不监听,没用

1.9 timeout 0

连接闲置N秒时关闭连接

1.10 tcp-keepalive 300

开启TCP长连接,如果设置非0,会使用系统的SO_KEEPALIVE间隔发送TCP ACK给客户端,以防连接被弃用。这个很有用:

  • 检测死掉的连接。
  • 如果网络之间还有其他的网络设备,可以连接保活
    注意,如果想依靠这个机制关闭连接,可能需要两倍的时间,主要取决于kernel的配置。

默认值是300。

  1. 标准配置

2.1 daemonize yes

默认情况redis不会按照守护进程的模式去运行。如果你需要,可以设置来开启
注意,如果开启守护进程模式,会生成/var/run/redis.pid保存pid

2.2 supervised no

这个不太明白,暂时不翻译了,搞懂后更新

1
2
3
4
5
6
7
8
9
vbnet复制代码 If you run Redis from upstart or systemd, Redis can interact with your
supervision tree. Options:
supervised no - no supervision interaction
supervised upstart - signal upstart by putting Redis into SIGSTOP mode
supervised systemd - signal systemd by writing READY=1 to $NOTIFY_SOCKET
supervised auto - detect upstart or systemd method based on
UPSTART_JOB or NOTIFY_SOCKET environment variables
Note: these supervision methods only signal "process is ready."
They do not enable continuous liveness pings back to your supervisor.

2.3 pidfile /var/run/redis_6379.pid

pid文件路径 ,默认值/var/run/redis.pid
如果在非守护进程模式下,而且也没配置pidfile路径,那么不会生成pid文件。如果是守护进程模式,
pidfile总会生成,没配置pidfile就会用默认路径。

2.4 loglevel notice

指定服务的日志级别:

  • debug
  • verbose
  • notice
  • warning
    默认 notice

2.5 logfile ""

指定redis日志文件名称和路径。你也可以设置logfile ""强制redis将日志输出的标准输出。
注意,如果你使用标准输出,而且redis使用守护进程模式运行,那log日志会被发送给/dev/null,就没了

2.6 syslog-enabled no

1
2
vbnet复制代码To enable logging to the system logger, just set 'syslog-enabled' to yes,
and optionally update the other syslog parameters to suit your needs.

2.7 syslog-ident redis

Specify the syslog identity.

2.8 syslog-facility local0

Specify the syslog facility. Must be USER or between LOCAL0-LOCAL7.

2.9 databases 16

  • 使用集群模式时,database就是0
  • 设置数据库的数量。redis默认的数据库就是0,你可以选择不同的数据库,在一个redis连接中执行selcet ,dbid可选的范围是0(databases-1),默认就是015

2.10 always-show-logo yes

搞笑配置,永远显示redis的logo

  1. 快照相关

3.1 开启RDB持久化

开启RDB持久化,save <seconds> <changes>

save 900 1:就是900秒有一次更改就做一次rdb快照到磁盘。

禁用rdb就是注释掉save行,如果你配置了save "",也可以禁用rdb。

下面是默认值

1
2
3
复制代码save 900 1
save 300 10
save 60 10000

3.2 stop-writes-on-bgsave-error yes

默认情况下,如果RDB快照功能开启而且最后一次rdb快照save失败时,redis会停止接收写请求,
这其实就是一种强硬的方式来告知用户数据持久化功能不正常,否则没有人会知道当前系统出大问题了。
如果后台save进程正常工作了(正常保存了rdb文件),那么redis会自动允许写请求。
不过如果你已经设置了一些监控到redis服务器,你可能想要禁用这个功能,这样redis在磁盘出问题时依旧可以继续处理写请求。只要set stop-writes-on-bgsave-error yes

3.3 rdbcompression yes

使用LZF算法对rdb文件进行压缩,如果要节省一些CPU,可以设置为no。

3.4 rdbchecksum yes

自从redis 5.0,rdb文件的末尾会设置一个CRC64校验码(循环冗余码)。这可以起到一定的的纠错作用,但是也要
付出10%的性能损失,你可以关闭这个功能来获取最大的性能。
如果rdb文件校验功能关闭,那么系统读取不到检验码时会自动跳过校验。
rdbchecksum yes

3.5 dbfilename dump.rdb

rdb文件名

3.6 dir ./

redis的工作目录 ,aof文件,rdb文件还有redis cluster模式下的node.conf文件均会创建在这个目录下。

3.7 slaveof <masterip> <masterport>

主从复制。使用slaveof配置将redis实例变为其他redis服务器的一个拷贝。

  • redis的主从复制时异步的,但是当主节点无法连接到给定数量的从节点时,你可以设置主节点停止处理写请求
  • 如果主从复制断了一小段时间,redis从节点可以执行一次局部的重新同步,你可能需要设置复制的backlog size
  • 主从复制时自动的无需用户干预。如果网络中间断了,从节点会自动重连主节点,并发起一次重新同步。

3.8 masterauth <master-password>

如果主节点有密码,从节点必须配置这个密码,否则主节点拒绝复制请求。

3.9 slave-serve-stale-data yes

当主从同步失败时,从节点有两种行为:

  • 配置为yes,从节点可以继续响应客户端的请求。
  • 配置为no,从节点直接报错”SYNC with master in progress”,不过INFO和SALVEOF命令是可以执行的。

3.10 slave-read-only yes

你可以设置从节点能够处理写请求。向从节点写入一些临时数据有时候是有用的(因为数据在resync后很快就会删除),如果配错了也可能会造成一些问题。
2.6版本以后默认都是read-only。
read-only不是设计成对抗那些不可信的客户端的。只是怕客户端用错命令。read-only模式下一些管理类命令还是会输出的。如果要限制这种命令,你可以使用rename-command来重命名那些管理类命令

3.11 repl-diskless-sync no

主从同步策略:disk或socket。

警告:diskless复制目前只是试验阶段
当出现新的从节点或重连的从节点无法进行增量同步时,就需要做一次全量同步(full synchronization)。
一个RDB文件会从主节点传输到从节点,传输方式有两种:

  • disk-backed:主节点创建一个新的进程将RDB文件写到磁盘。然后这个文件会被主进程逐步传送给多个从节点
  • diskless:主节点创建一个新的进程,直接将RDB文件写给从节点的socket连接,从头到尾不会碰磁盘。

使用disk-backed复制,在rdb文件生成完毕后,主节点会为每个从节点创建队列来传说RDB文件,直到传输结束。
使用diskless复制,一旦开始传输rdb,当时有多少从节点建立连接,就只能并行传输多少从节点,如果此时有新的从节点发起全量同步,就只能等之前的都传完。
如果使用diskless复制,主节点会在传输之前等待一小段时间(这个时间可以配置),这样可以让多个从节点到达
,并做并行传输。
如果磁盘贼慢,网络带宽特别好,diskless复制策略效果会更好一些。

3.12 repl-diskless-sync-delay 5

如果开启了diskless复制,需要配置一个延迟时间,让主节点等待所有从节点都到达。
这是非常重要的,因为一旦开始传输,主节点就无法响应新的从节点的全量复制请求,只能先到队列中等待下一次RDB传输,所以主节点需要等待一段时间,让所有从节点全量复制都到达。
这个延迟时间的单位是秒,默认是5秒。关闭这个特性可以将其设置成0,这样传输总是马上开始。

3.13 repl-ping-slave-period 10

从节点在一定间隔时间发送ping到主节点。默认是是10秒。

3.14 repl-timeout 60

这个值对三个场景都有效:

  1. 大量的I/O操作,从节点收到主节点的响应时间。
  2. 从节点认为主节点的超时时间
  3. 主节点认为从节点的超时时间

注意,这个值一定要设置的比repl-ping-slave-period大,否则每次心跳检测都超时

3.15 repl-disable-tcp-nodelay no

在从节点socket 发起SYNC同步后是否需要关闭TCP_NODELAY?
如果选择YES,redis会使用较小的tcppacket和较小的带宽去发送数据到从节点。但是这会让主从复制增加部分延迟,差不多40毫秒,取决于linux kernel配置。
如果选择no,主从复制延迟会稍微减少,但是会消耗更大的网络带宽。
默认我们倾向于低延迟,但是如果网络状况不好的情况时将这个选项置为yes或许是个好方案。

3.16 repl-backlog-size 1mb

设置主从复制backlog大小。backlog是一个缓冲区。
当主从不同步时,主节点缓存主从复制数据到backlog缓冲区中,当从节点重新连接到主节点时,从节点可以从缓冲区中拿到增量同步数据,并进行增量同步(partitial synchronization)。
backlog越大,允许从节点断线的时间就越长。backlog缓冲区只有在最少有一个从节点连接时才会创建。

3.17 repl-backlog-ttl 3600

如果主节点再也没有连接到从节点,那个从节点的backlog会被释放。
当从节点断线开始,这个配置的时间就开始计时了。单位是秒。
设置为0说明永远都不会释放backlog。

3.18 slave-priority 100

这个配置是给哨兵模式用的,当主节点挂掉时,哨兵会选取一个priority最小的从节点去升主,如果某个redis节点的这个值配成0,那么这个节点永远都不会被升为主节点。默认值就是100。

3.19 min-slaves-to-write 3和min-slaves-max-lag 10

如果lag秒内主节点在线的从节点少于N个,主节点停止接收写请求。
例如10秒内最少3个从节点在线时,主节点才接受写请求,可以用如下配置:

1
2
arduino复制代码min-slaves-to-write 3
min-slaves-max-lag 10

将这两个配置任意一个设置为0,就禁用此功能。默认是禁用的。

3.20 slave-announce-ip 5.5.5.5 和 slave-announce-port 1234

有多种方式可以显示主节点当前在线的从节点的ip和端口。
例如,info replication 部分,或者在主节点执行ROLE命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bash复制代码#
# The listed IP and address normally reported by a slave is obtained
# in the following way:
#
# IP: The address is auto detected by checking the peer address
# of the socket used by the slave to connect with the master.
#
# Port: The port is communicated by the slave during the replication
# handshake, and is normally the port that the slave is using to
# list for connections.
#
# However when port forwarding or Network Address Translation (NAT) is
# used, the slave may be actually reachable via different IP and port
# pairs. The following two options can be used by a slave in order to
# report to its master a specific set of IP and port, so that both INFO
# and ROLE will report those values.
#
# There is no need to use both the options if you need to override just
# the port or the IP address.
#
# slave-announce-ip 5.5.5.5
# slave-announce-port 1234
  1. 安全

4.1 requirepass foobared

给redis设置密码,因为redis快的一逼,一秒钟攻击者能尝试150000次密码,所以你的密码必须非常强壮否则很容易被暴力破解。

4.2 rename-command CONFIG ""和rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c52

完全杀掉一个命令就用rename-command CONFIG ""

rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c52可以将命令改掉,这样彩笔程序员就不会使用危险命令了。

注意,如果你把命令给改名了,那么从节点什么的都要统一改名字,否则会有问题。

  1. 客户端

5.1 maxclients 10000

设置同一时刻的最大客户端数。
默认值是10000,只要达到最大值,redis会关闭所有新的链接,并且发送一个错误“max number of clients readched”给客户端。

  1. 内存管理

6.1 maxmemory

设置一个内存的最大值。当内存达到最大值之后,redis会按照选择的内存淘汰策略去删除key。
如果redis根据淘汰策略无法删除key,或者淘汰策略是noeviction,客户端发送写请求时redis会开始返回报错,并且不会使用更多的内存。但是读请求还是会继续支持的。
注意,如果你有很多从节点,那么内存设置不能太大,否则从节点发起全量同步时,output buffer占用的内存也在这个maxmemory的范围内,例如,最大值配的是4GB,如果内存已经3G了,此时一个从节点发起全量同步,outputbuffer你设置的是2G这样内存直接就满了,然后就要开始淘汰key,这肯定不是我们想要的。

6.2 maxmemory-policy noeviction

内存淘汰策略,决定了当redis内存满时如何删除key。
默认值是noeviction。

  • volatile-lru -> 在过期key中使用近似LRU驱逐
  • allkeys-lru -> 在所有key中使用近似LRU
  • volatile-lfu -> 在过期key中使用近似LFU驱逐
  • allkeys-lfu -> 在所有key中使用近似LFU
  • volatile-random -> 在过期key中随机删除一个
  • allkeys-random -> 在所有的key中随机删除一个
  • volatile-ttl -> 谁快过期就删谁
  • noeviction -> 不删除任何key,内存满了直接返回报错

LRU means Least Recently Used
LFU means Least Frequently Used

LRU, LFU and volatile-ttl 基于近似随机算法实现。
注意,使用上述策略时,如果没有合适的key去删除时,redis在处理写请求时都会返回报错。

写明令: set setnx setex append
incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
getset mset msetnx exec sort。

6.3 maxmemory-samples 5

LRU, LFU and minimal TTL algorithms
不是精确的算法,是一个近似的算法(主要为了节省内存),
所以你可以自己权衡速度和精确度。默认redis会检查5个key,选择一个最近最少使用的key,你可以改变这个数量。默认的5可以提供不错的结果。你用10会非常接近真实的LRU但是会耗费更多的CPU,用3会更快,但是就不那么精确了。

  1. LAZY FREEZING 懒释放

redis有两个删除key的基本命令。一个是DEL,这是一个阻塞的删除。DEL会让redis停止处理新请求,然后redis会用一种同步的方式去回收DEL要删除的对象的内存。如果这个key对应的是一个非常小的对象,那么DEL的执行时间会非常短,接近O(1)或者O(log n)。不过,如果key对应的对象很大,redis就会阻塞很长时间来完成这个命令。鉴于上述的问题,redis也提供了非阻塞删除命令,例如UNLINK(非阻塞的DEL)和异步的删除策略:FLUSHALL和FLUSHDB,这样可以在后台进行内存回收。这些命令的执行时间都是常量时间。一个新的线程会在后台渐进的删除并释放内存。

上面说的那些命令都是用户执行的,具体用哪种命令,取决于用户的场景。
但是redis本身也会因为一些原因去删除key或flush掉整个内存数据库。
除了用户主动删除,redis自己去删除key的场景有以下几个:

  • 内存淘汰(eviction),设置了内存淘汰策略后,为了给新数据清理空间,需要删除被淘汰的数据,否则内存就爆了。
  • 过期(expire),当一个key过期时
  • key已经存在时的一些边际影响。例如,set一个已经存在的key,旧的value需要被删除,然后设置新的key。
  • 主从复制时,从节点执行一个全量同步,从节点之前的内存数据需要被flush掉。
    如果你希望上面那四种场景使用异步删除,可以使用如下配置:
1
2
3
4
perl复制代码lazyfree-lazy-eviction no
lazyfree-lazy-expire no
lazyfree-lazy-server-del no
slave-lazy-flush no
  1. AOF

8.1 appendonly no

默认情况下,redis异步的dump内存镜像到磁盘(RDB)。这个模式虽然已经很不错了,但是如果在发起dump之前机器宕机,就会丢失一些数据。
AOF(Append only file)是一种可选的持久化策略提供更好数据安全性。使用默认配置的情况下,
redis最多丢失一秒钟的写入数据,你甚至可以提高级别,让redis最多丢失一次write操作。
AOF和RDB持久化可以同时开启。如果开了AOF,redis总会先加载AOF的文件,因为AOF提供更高的可用性。

8.2 appendfilename "appendonly.aof"

aof 文件的名称。

8.3 appendfsync everysec

对操作系统的fsync()调用告诉操作系统将output buffer中的缓冲数据写入到磁盘。有些操作系统会
真正的写磁盘,有一些会尽量去写,也可能会等一下。
redis 支持三种方式:

  • no: 不去主动调用fsync(),让操作系统自己决定何时写磁盘
  • always:每次write操作之后都调用fsync(),非常慢,但是数据安全性最高。
  • everysec:每秒调用一次fsync(),一个折中的策略。

默认就是everysec,一般也是推荐的策略,平衡了速度和数据安全性。

1
2
3
perl复制代码appendfsync always
appendfsync everysec
appendfsync no

8.4 no-appendfsync-on-rewrite no

当AOF fsync 策略设置成always或者everysec,而且一个后台的save进程(可能RDB的bgsave进程,也可能是
AOF rewrite进程)正在执行大量磁盘I/O操作,在一些linux配置中,redis可能会对fsync()执行太长的调用。
这个问题目前没什么办法修复,也就是说就算起一个后台进程去做fsync,如果之前已经有进程再做fsync了,后来的调用
会被阻塞。
为了缓和这个问题,可以使用下面的配置,当已经有BGSAVE和BGREWRITEAOF在做fsync()时,就不要再起新进程
了。
如果已经有子进程在做bgsave或者其他的磁盘操作时,redis无法继续写aof文件,等同于appendsync none。
在实际情况中,这意味着可能会丢失多达30秒的日志。也就是说,这是会丢数据的,如果对数据及其敏感,要注意这个问题。
如果你有延迟类问题,可以设置成yes,否则设置为no,这样能保证数据的安全性最高,极少丢数据。

8.5 auto-aof-rewrite-percentage 100和auto-aof-rewrite-min-size 64mb

自动重写aof文件。当aof文件增大到某个百分比时,redis会重写aof文件。
redis会记住上次rewrite后aof文件的大小(如果启动后还没发生过rewrite,那么会使用aof原始大小)。
这个size大小会和当前aof文件的size大小做比较。如果当前size大于指定的百分比,就做rewrite。
并且,还要指定最小的size,如果当前aof文件小于最小size,不会触发rewrite,这是为了防止文件其实很小,但是
已经符合增长百分比时的多余的rewrite操作。
如果指定percentage为0代表禁用aof rewrite功能

8.6 aof-load-truncated yes

当Redis启动时会加载AOF文件将数据还原到内存中,但是有时候这个AOF的文件可能被损坏掉了
,例如文件末尾是坏的。这种情况一般都是由于redis宕机导致的,尤其是使用ext4文件系统挂载
时没配置 data=ordered选项。
在这种情况下,redis可以直接报错,或者尽可能的读取剩余可读的AOF文件。

如果 aof-load-truncated=yes,redis依然会读取这个损坏的aof文件,但是会打出一个报错日志,
通知用户。
如果 aof-load-truncated=no,redis就会报错并拒绝启动服务,用户需要使用redis-check-aof工具
修复aof文件,再启动redis。
如果redis运行时aof文件崩溃,redis依然会报错并退出。这个选项救不了这种情况。

8.7 aof-use-rdb-preamble no

当redis重写aof文件时,redis可以先读一个rdb来加快重写的速度,当这个选项打开时,重写的aof文件由
两部分组成:rdb文件+aof文件。
当redis启动时加载的aof文件以 “REDIS”开头,就会加载rdb文件,然后再读取剩余的AOF文件。
默认这个选项是关闭的,

  1. LUA脚本

9.1 lua-time-limit 5000

表示一个lua脚本的最大执行毫秒数。
如果执行时间达到了最大时间,redis会log这个脚本已经超时了,并且会报个error。
当一个脚本执行超时,只有SCRIPT KILL和SHUTDOWN NOSAVE命令是可用的。第一个命令可以去
停止一个不包含写命令的脚本。第二个命令是唯一一个可以停掉超时写命令的脚本。
将lua-time-limit设置成0或负数表示你不限制执行时间,并且不会有任何警告。

  1. Redis Cluster

10.1 cluster-enabled yes

普通的redis实例无法成为cluster的一员的;只有node可以。想要将节点加入redis cluster,需要将cluster-enabled设置为yes。

10.2 cluster-config-file nodes-6379.conf

所有cluster node都有一个cluster配置文件。这个文件不是为了人工编辑的,是redis自己创建的。
每个redis-node都要使用不同的cluster配置文件。一定要确保运行在同一个系统中的多个redis
cluster节点使用的是不同的redis配置文件,不要互相覆盖。

10.3 cluster-node-timeout 15000

cluster node timeout是一个节点无响应的最长毫秒数。大多数超时时间限制都是这个值的倍数。

10.4 cluster-slave-validity-factor 10

一个主节点宕机后,如果它的从节点A数据太旧(长期处于未同步状态),那么A不会触发failover,
它不会升为主。
没有一个简单方式去策略一个从节点的“数据年龄”。下面提供了两种方式来评估从节点的数据是否过老:

  • 如果有多个从节点都可以failover,他们会交换信息选出一个拥有最大复制offset的从节点(这说明这个节点从主节点那里复制了更多的数据)。各个从节点会计算各自的offset级别,在开始failover之前会延迟一段时间,具体多久取决于他们的offset级别。
  • 每个从节点计算上次和主节点交互的时间。这个交互可以是最后一次ping操作,或者是主节点推送过来的写命令,再或者是上次和主节点断开的时间。如果上次交互时间已经过去太久了,这个从节点就根本不会发起failover。
    第二点用户可以自行调整。如果一个从节点和主节点上次交互时间大于(node-timeout * slave-validity-factor) + repl-ping-slave-period
    ,从节点就不会发生failover。
    例如,如果node-timeout=30秒,slave-validity-factor=10,repl-ping-slave-period=10秒,
    如果从节点与主节点上次交互时间已经过去了310秒,那么从节点就不会做failover。
    调大slave-validity-factor会允许从节点持有过旧的数据时提升为主节点,调小这个值可能会
    导致从节点永远都无法升为主节点。
    考虑最高的可用性,可以将slave-validity-factor设置为0,这样从节点会忽略和主节点的上次
    交互时间,永远都会尝试去做failover。(但是依然会做延迟选举的操作)

10.5 cluster-migration-barrier 1

从节点可以迁移至孤儿主节点(这种主节点没有从节点)。
从节点只有在原来的主节点最少有N个从节点时才会迁移到其他的孤儿主节点,这个给定的数字N就是migration-barrier,也叫迁移临界点。migration barrier=1代表,主节点如果有2个从节点,
当集群中出现孤儿主节点时,其中一个从节点可以被迁移过去。
想要禁止从节点迁移可以将这个值设置成很大的值,例如999。
只有在debug模式才可以将这个值设置为0,生产环境别乱设置。

10.6 cluster-require-full-coverage yes

默认情况下,redis cluster在发现还有最少1个hash slot没有被分配时会禁止查询操作。
**这样的话,如果cluster出现部分宕机时,整个集群就不可用了。**只有在其他的hash slot都被分配才可以。
你可能会需要cluster的子集可以继续提供服务,要想这样,只要设置cluster-require-full-coverage no即可

10.7 cluster-slave-no-failover no

这个选项如果设置为yes,在主节点宕机是,从节点永远都不会升为主。但是主节点依然可以执行常规的failover。
在多数据中心的场景下,这个配置会比较有用,我们希望某一个数据中心永远都不要升级为主节点,否则主节点就漂移到别的数据中心了,这可能挺麻烦的。

  1. CLUSTER DOCKER/NAT 支持

11.1 集群主动告知ip

在一些特定的部署场景下,redis cluster 节点地址自动发现会失败,因为地址被NAT了,或者端口
被转发了(Docker容器中)。
为了让redis cluster在这种环境下正常工作,需要静态配置地址和端口,具体配置如下:

  • cluster-announce-ip 10.10.10.10
  • cluster-announce-port 6379
  • cluster-announce-bus-port 6380
  • 如果配置文件中没有上述配置项,那么rediscluster会使用标准的自动发现机制。*
  1. SLOW LOG

12.1 slowlog-log-slower-than 10000和slowlog-max-len 128

redis slow log是系统记录慢操作的,只要超过了给定的时间,都会记录。
执行时间不包括I/O操作的时间。
你可以通过两个参数来配置slow log:

  • slowlog-log-slower-than 10000:单位是微秒,1000000等于1秒
  • slowlog-max-len 128:slow长度,如果命令大于128,老的那个就没了,这个值没有限制,如果设置太大会占内存
    可以通过SLOWLOG RESET命令来重置这个队列。
  1. LATENCY MONITOR

13.1 latency-monitor-threshold 0

redis 延迟监控系统会在运行时抽样一部分命令来帮助用户分析redis卡顿的原因。
通过LATENCY命令可以打印一些视图和报告。
redis只会记录那些大于设定毫秒数的命令。
如果要关闭这个功能,就将latency-monitor-threshold设置为0。
默认情况下monitor是关闭的,没有延迟问题不要一直开着monitor,因为开这个功能可能会对性能有很大影响。
在运行时也可以开这个功能,执行这个命令即可:CONFIG SET latency-monitor-threshold <milliseconds>

  1. 事件通知

14.1 notify-keyspace-events ""

当特定的key space有事件发生时,redis 可以通知 pub/sub 客户端。
如果开启事件通知功能,一个client对key”foo”执行了del操作,通过pub/sub,两条消息会被推送:

  • PUBLISH keyspace@0:foo del
  • PUBLISH keyevent@0:del foo
    可以选择redis通知的事件级别。所有的级别都被标记为一个单独的字符:
  • K Keyspace events, published with keyspace@ prefix.
  • E Keyevent events, published with keyevent@ prefix.
  • g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, …
  • $ String commands
  • l List commands
  • s Set commands
  • h Hash commands
  • z Sorted set commands
  • x Expired events (events generated every time a key expires)
  • e Evicted events (events generated when a key is evicted for maxmemory)
  • A Alias for g$lshzxe, so that the “AKE” string means all the events.
    notify-keyspace-events参数接受多个字符,或者0个字符串。如果将notify-keyspace-events设置为
    空字符串,就等于禁用通知。
  1. 高级设置

15.1 ziplist相关配置

当数据量很少时,哈希值可以使用一种更高效的数据结构。这个阈值可以使用以下的配置来设置:

1
2
python复制代码hash-max-ziplist-entries 512
hash-max-ziplist-value 64

15.2 list-max-ziplist-size -2

list也可以使用一种特殊编码方式来节省内存。list底层的数据结构是quicklist,quicklist的每一个
节点都是一个ziplist,这个参数主要来控制每个ziplist的大小,如果配置正数,
那么quicklist每个ziplist中的节点数最大不会超过配置的值。
如果配置负数,就是指定ziplist的长度:

  • -5: max size: 64 Kb <– not recommended for normal workloads
  • -4: max size: 32 Kb <– not recommended
  • -3: max size: 16 Kb <– probably not recommended
  • -2: max size: 8 Kb <– good
  • -1: max size: 4 Kb <– good
    配置-2和-1是性能最高的

15.3 list-compress-depth 0

list也可以被压缩。
list底层是一个双向链表,压缩深度代表除了head和tail节点有多少node不会被压缩。
head和tail节点是永远都不会被压缩的。

  • 0: 关闭压缩
  • 1: 代表除了head和tail之外所有的内部节点都会被压缩
    [head]->node->node->…->node->[tail]
    [head], [tail] 不会被压缩; 内部 nodes 会被压缩.
  • 2: [head]->[next]->node->node->…->node->[prev]->[tail]
    head 、 head->next 、 tail->prev 和 tail四个节点不会被压缩,
    他们之间的其他节点会被压缩.
  • 3: [head]->[next]->[next]->node->node->…->node->[prev]->[prev]->[tail]
    以此类推

15.4 set-max-intset-entries 512

set也支持内部优化,当set内部元素都是64位以下的十进制整数时,这个set的底层实现会使用intset,
当添加的元素大于set-max-intset-entries时,底层实现会由intset转换为dict。

15.5 zset-max-ziplist-entries 128和zset-max-ziplist-value 64

当zset内部元素大于128,或者value超过64字节时,zset底层将不再使用ziplist

15.6 hll-sparse-max-bytes 3000

HyperLogLog稀疏表示字节限制。这个限制包括16字节的header。当HyperLogLog使用稀疏表示时,如果
达到了这个限制,它将会转换成紧凑表示。
这个值设置成大于16000是没意义的,因为16000时用紧凑表示对内存会更友好。
推荐的值是0~3000,这样可以布降低PFADD命令的执行时间时还能节省空间。如果内存空间相对cpu资源更紧张,
,可以将这个值提升到10000。

15.7 activerehashing yes

动态rehash使用每100毫秒中的1毫秒来主动为Redis哈希表做rehash操作。Redis的哈希表实现默认使用
一种lazy rehash模式:你对hash表的操作越多,越多rehash步骤会被执行,所以如果服务在空闲状态下,
rehash操作永远都不会结束,而且hash表会占用更多的内存。
默认会每秒做10次动态rehash来释放内存。

15.8 客户端传输缓冲区相关配置

client output buffer限制是用来强制断开client连接的,当client没有及时将缓冲区的数据读取完
时,redis会认为这个client可能出现宕机,就会断掉连接。

这个限制可以根据三种不同的情况去设置:

  • normal -> 一般的clients包括MONITOR client
  • slave -> 从节点 clients
  • pubsub -> pub/sub clients
    设置语法如下:
    client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
  • hard limit:如果缓冲区的数据达到了hard limit的值,redis直接断开和这个客户端的连接
  • soft limit & soft seconds:如果缓冲区的数据达到了soft limit的值,redis和这个client的连接还会保留soft seconds

默认情况下normal的clients不会有这个限制,因为normal的clients获取数据都是先执行个命令,不存在
redis主动给normal推送数据的情况。
如果设置三个0代表无限制,永远不断连接。
但是这样可能会撑爆内存。

1
2
3
arduino复制代码client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60

15.9 client-query-buffer-limit 1gb

client query buffer缓冲新的命令。默认使用一个固定数量来避免protocol desynchronization。

15.10 proto-max-bulk-len 512mb

redis协议中,大多数的请求,都是string,默认value不会大于512mb,当然,你可以改这个限制。

15.11 hz 10

redis调用一些内部函数来执行很多后台任务,像关闭超时连接,清理从未请求的过期的key等等。
不是所有的后台任务都使用相同的频率来执行,redis使用hz参数来决定执行任务的频率。
默认hz是10.提高这个值会在redis空闲时消耗更多的cpu,但是同时也会让redis更主动的清理过期
key,而且清理超时连接的操作也会更精确。
这个值的范围是1~500,不过并不推荐设置大于100的值。多数的用户应该使用默认值,或者最多调高到100。

15.12 aof-rewrite-incremental-fsync yes

当子进程重写aof文件是,如果这个功能开启,redis会以每32MB的数据主动提交到文件。这种
递增提交文件到磁盘可以避免大的延迟尖刺。

15.13 LFU调优

redis lfu淘汰策略可以调优。
每个key的LFU counter只有8个key,最大值是255,所以redis使用一个基于概率的对数增长算法,
并不是每次访问key都会counter+1。当一个key被访问后,会按照以下方式去做counter+1:

  1. 在0~1之间生成一个随机数R
  2. 计算概率P=1/(old_value*lfu_log_factor+1)
  3. r<p就counter+1

默认 lfu-log-factor 是 10.
下面是不同的factor下的增长速度,可以看到,factor越小增长越快:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
lua复制代码 +--------+------------+------------+------------+------------+------------+
| factor | 100 hits | 1000 hits | 100K hits | 1M hits | 10M hits |
+--------+------------+------------+------------+------------+------------+
| 0 | 104 | 255 | 255 | 255 | 255 |
+--------+------------+------------+------------+------------+------------+
| 1 | 18 | 49 | 255 | 255 | 255 |
+--------+------------+------------+------------+------------+------------+
| 10 | 10 | 18 | 142 | 255 | 255 |
+--------+------------+------------+------------+------------+------------+
| 100 | 8 | 11 | 49 | 143 | 255 |
+--------+------------+------------+------------+------------+------------+


注意: 上表结论是执行以下命令得出的:

redis-benchmark -n 1000000 incr foo
redis-cli object freq foo

注意 2: counter初始值是5,否则key很快就被淘汰了

lfu-decay-time我还不太理解,等研究透彻后补充

lfu-log-factor 10
lfu-decay-time 1

  1. 动态碎片整理(处于试验阶段,暂不翻译)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
ini复制代码# WARNING THIS FEATURE IS EXPERIMENTAL. However it was stress tested
# even in production and manually tested by multiple engineers for some
# time.
#
# What is active defragmentation?
# -------------------------------
#
# Active (online) defragmentation allows a Redis server to compact the
# spaces left between small allocations and deallocations of data in memory,
# thus allowing to reclaim back memory.
#
# Fragmentation is a natural process that happens with every allocator (but
# less so with Jemalloc, fortunately) and certain workloads. Normally a server
# restart is needed in order to lower the fragmentation, or at least to flush
# away all the data and create it again. However thanks to this feature
# implemented by Oran Agra for Redis 4.0 this process can happen at runtime
# in an "hot" way, while the server is running.
#
# Basically when the fragmentation is over a certain level (see the
# configuration options below) Redis will start to create new copies of the
# values in contiguous memory regions by exploiting certain specific Jemalloc
# features (in order to understand if an allocation is causing fragmentation
# and to allocate it in a better place), and at the same time, will release the
# old copies of the data. This process, repeated incrementally for all the keys
# will cause the fragmentation to drop back to normal values.
#
# Important things to understand:
#
# 1. This feature is disabled by default, and only works if you compiled Redis
# to use the copy of Jemalloc we ship with the source code of Redis.
# This is the default with Linux builds.
#
# 2. You never need to enable this feature if you don't have fragmentation
# issues.
#
# 3. Once you experience fragmentation, you can enable this feature when
# needed with the command "CONFIG SET activedefrag yes".
#
# The configuration parameters are able to fine tune the behavior of the
# defragmentation process. If you are not sure about what they mean it is
# a good idea to leave the defaults untouched.

# Enabled active defragmentation
# activedefrag yes

# Minimum amount of fragmentation waste to start active defrag
# active-defrag-ignore-bytes 100mb

# Minimum percentage of fragmentation to start active defrag
# active-defrag-threshold-lower 10

# Maximum percentage of fragmentation at which we use maximum effort
# active-defrag-threshold-upper 100

# Minimal effort for defrag in CPU percentage
# active-defrag-cycle-min 25

# Maximal effort for defrag in CPU percentage
# active-defrag-cycle-max 75

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot Kafka概览、配置及优雅地实现发布

发表于 2019-12-26

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 xiaobaiai.net

[TOC]

1 前言

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章主要介绍Spring Kafka的常用配置、主题自动创建、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka做测试配置相关内容,最后通过两种方式去实现消息的发布和订阅功能,其中一种是基于Spring Integration方式。本文内容基于Spring Kafka2.3.3文档及Spring Boot Kafka相关文档,Spring创建了一个名为Spring kafka的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka,Spring-Kafka项目提供了Apache Kafka自动化配置,通过Spring Boot的简化配置(以spring.kafka.*作为前缀的配置参数),在Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。

1
2
复制代码spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

实现下面的所涉及到的功能实现,需要有如下环境:

  • Java运行或开发环境(JRE/JDK)
  • Kafka安装成功

更多的配置可以参考《Kafka,ZK集群开发或部署环境搭建及实验》这一篇文章。

本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,通过Spring Boot中的Kafka几大注解实现发布订阅功能,同时通过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文通过自己实验和整理了较久的时间,涵盖了Spring Kafka大部分内容,希望大家耐心读下来,有什么问题随时反馈,一起学习。

2 Spring Kafka功能概览

Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性如下(截至2019年12月9日):

Spring for Apache Kafka Spring Integration for Apache Kafka Version kafka-clients
2.3.x 3.2.x 2.3.1
2.2.x 3.1.x 2.0.1, 2.1.x, 2.2.x
2.1.x 3.0.x 1.0.x, 1.1.x, 2.0.0
1.3.x 2.3.x 0.11.0.x, 1.0.x

具体更多版本特点可以看官网,spring kafka当前最新为2.3.4版本。

Spring Kafka相关的注解有如下几个:

注解类型 描述
EnableKafka 启用由AbstractListenerContainerFactory在封面(covers)下创建的Kafka监听器注解端点,用于配置类;
EnableKafkaStreams 启用默认的Kafka流组件
KafkaHandler 在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解
KafkaListener 将方法标记为指定主题上Kafka消息监听器的目标的注解
KafkaListeners 聚合多个KafkaListener注解的容器注解
PartitionOffset 用于向KafkaListener添加分区/初始偏移信息
TopicPartition 用于向KafkaListener添加主题/分区信息

如使用@EnableKafka可以监听AbstractListenerContainerFactory子类目标端点,如ConcurrentKafkaListenerContainerFactory是AbstractKafkaListenerContainerFactory的子类。

1
2
复制代码public class ConcurrentKafkaListenerContainerFactory<K,V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
1
2
3
4
5
6
7
8
9
10
11
12
复制代码@Configuration
@EnableKafka
public class AppConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(4);
return factory;
}
// other @Bean definitions
}

@EnableKafka并不是在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的@EnableKafka。如果想要自己实现Kafka配置类,则需要加上@EnableKafka,如果你不想要Kafka自动配置,比如测试中,需要做的只是移除KafkaAutoConfiguration:

1
复制代码@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")

2.1 自动创建主题

💡 要在应用启动时就创建主题,可以添加NewTopic类型的Bean。如果该主题已经存在,则忽略Bean。

2.2 发送消息

Spring的KafkaTemplate是自动配置的,你可以直接在自己的Bean中自动连接它,如下例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码@Component
public class MyBean {

private final KafkaTemplate kafkaTemplate;

@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

// ...

}

KafkaTemplate包装了一个生产者,并提供了向kafka主题发送数据的方便方法。提供异步和同步(发送阻塞)方法,异步(发送非阻塞)方法返回ListenableFuture,以此监听异步发送状态,成功还是失败,KafkaTemplate提供如下接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}

sendDefault API 要求已向模板提供默认主题。部分API接受一个时间戳作为参数,并将该时间戳存储在记录中,如何存储用户提供的时间戳取决于Kafka主题上配置的时间戳类型,如果主题配置为使用CREATE_TIME,则记录用户指定的时间戳(如果未指定则生成)。如果将主题配置为使用LOG_APPEND_TIME,则忽略用户指定的时间戳,并且代理将添加本地代理时间。metrics 和 partitionsFor方法委托给底层Producer上的相同方法。execute方法提供对底层生产者的直接访问

要使用模板,可以配置一个生产者工厂并在模板的构造函数中提供它。下面的示例演示了如何执行此操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
// KafkaTemplate构造函数中输入生产者工厂配置
return new KafkaTemplate<Integer, String>(producerFactory());
}

然后,要使用模板,可以调用其方法之一发送消息。

当你使用包含Message<?>参数的方法时,主题、分区和键信息在消息头中提供,有如下子项:

1
2
3
4
复制代码KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP

如访问头部信息中某一项信息:

1
2
3
复制代码public void handleMessage(Message<?> message) throws MessagingException {
LOGGER.debug("===Received Msg Topic: {}", message.getHeaders().get(KafkaHeaders.TOPIC));
}

可选的功能是,可以使用ProducerListener配置KafkaTemplate,以获得带有发送结果(成功或失败)的异步回调,而不是等待将来完成。以下列表显示了ProducerListener接口的定义:

1
2
3
4
5
复制代码public interface ProducerListener<K, V> {
void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
void onError(String topic, Integer partition, K key, V value, Exception exception);
boolean isInterestedInSuccess();
}

默认情况下,模板配置有LoggingProducerListener,它只记录错误,在发送成功时不执行任何操作。只有当isInterestedInSuccess返回true时才调用onSuccess。
为了方便起见,如果你只想实现其中一个方法,那么将提供抽象ProducerListenerAdapter。对于isInterestedInSuccess,它返回false。下面演示了异步结果回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码public void sendMessage(String msg) {
LOGGER.info("===Producing message[{}]: {}", mTopic, msg);
ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.info("===Producing message success");
}

@Override
public void onFailure(Throwable ex) {
LOGGER.info("===Producing message failed");
}

});
}

如果希望阻止式发送线程等待结果,可以调用future的get()方法。你可能希望在等待之前调用flush(),或者为了方便起见,模板有一个带有autoFlush参数的构造函数,该构造函数在每次发送时都会导致模板flush()。不过,请注意,刷新可能会显著降低性能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);

try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}

使用DefaultKafkaProducerFactory:

如上面使用KafkaTemplate中所示,ProducerFactory用于创建生产者。默认情况下,当不使用事务时,DefaultKafkaProducerFactory会创建一个供所有客户机使用的单例生产者,如KafkaProducer javadocs中所建议的那样。但是,如果对模板调用flush(),这可能会导致使用同一个生产者的其他线程延迟。从2.3版开始,DefaultKafkaProducerFactory有一个新属性producerPerThread。当设置为true时,工厂将为每个线程创建(和缓存)一个单独的生产者,以避免此问题。

当producerPerThread为true时,当不再需要生产者时,用户代码必须在工厂上调用closeThreadBoundProducer()。这将实际关闭生产者并将其从ThreadLocal中移除。调用reset()或destroy()不会清理这些生产者。

创建DefaultKafkaProducerFactory时,可以通过调用只接受属性映射的构造函数(请参阅使用KafkaTemplate中的示例)从配置中获取键和/或值序列化器类,或者序列化程序实例可以传递给DefaultKafkaProducerFactory构造函数(在这种情况下,所有生产者共享相同的实例)。或者,可以提供Supplier<Serializer> s(从版本2.3开始),用于为每个生产者获取单独的Serializer实例:

1
2
3
4
5
6
7
8
9
复制代码@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

使用ReplyingKafkaTemplate:

版本2.1.3引入了KafkaTemplate的一个子类来提供请求/应答语义。这个类名为ReplyingKafkaTemplate,并且有一个方法(除了超类中的那些方法之外)。下面的列表显示了方法签名:

1
2
3
复制代码RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);

结果是一个ListenableFuture,它被结果异步填充(或者超时时出现异常)。结果还有一个sendFuture属性,这是调用KafkaTemplate.send()的结果。你可以使用此Future确定发送操作的结果。这里就不展开了。

2.3 接收消息

可以通过配置MessageListenerContainer并提供消息监听器或使用@KafkaListener注解来接收消息。

2.3.1 消息监听器

使用消息监听器容器(message listener container)时,必须提供监听器才能接收数据。目前有八个消息监听器支持的接口。下面的列表显示了这些接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
复制代码// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 作接收的单个ConsumerRecord实例
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。提供对使用者对象的访问。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。提供对使用者对象的访问。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

上述消费者对象不是线程安全的。只能在调用侦听器的线程上调用其方法。

2.3.1.1 消息监听器容器

提供了两个MessageListenerContainer的实现:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer从单个线程上的所有主题或分区接收所有消息(即一个分区只能分配到一个消费者,一个消费者可以被分配多个分区)。ConcurrentMessageListenerContainer委托给一个或多个KafkaMessageListenerContainer实例,以提供多线程使用,从多线程上去处理主题或分区的所有消息。

从Spring Kafka2.2.7版开始,你可以将RecordInterceptor添加到侦听器容器中;在调用侦听器以允许检查或修改记录之前,将调用它。如果拦截器返回null,则不调用侦听器。侦听器是批处理侦听器时不调用侦听器。从2.3版开始,CompositeRecordInterceptor可用于调用多个拦截器。

默认情况下,使用事务时,侦听器在事务启动后调用。从2.3.4版开始,你可以设置侦听器容器的interceptBeforeTx属性,以便在事务启动之前调用侦听器。没有为批处理侦听器提供侦听器,因为Kafka已经提供了ConsumerInterceptor。

2.3.1.2 使用KafkaMessageListenerContainer

有如下构造函数可用:

1
2
3
4
5
复制代码public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties,
TopicPartitionOffset... topicPartitions)

每个都获取一个ConsumerFactory以及有关主题和分区的信息,以及ContainerProperties对象中的其他配置。ConcurrentMessageListenerContainer(稍后介绍)使用第二个构造函数跨使用者实例分发TopicPartitionOffset。ContainerProperties具有以下构造函数:

1
2
3
复制代码public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)

第一个构造函数接受一个TopicPartitionOffset参数数组来显式地指示容器要使用哪些分区(使用消费者的 assign()方法)和可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值是相对于分区内的当前最后偏移量。提供了TopicPartitionOffset的构造函数,该构造函数接受一个附加的布尔参数。如果是true,则初始偏移(正偏移或负偏移)相对于该消耗器的当前位置。容器启动时应用偏移量。第二个是主题数组,Kafka基于group.id属性:在组中分布分区来分配分区。第三个使用regex表达式来选择主题。

要将MessageListener分配给容器,可以在创建容器时使用ContainerProps.setMessageListener方法。下面的示例演示了如何执行此操作:

1
2
3
4
5
6
7
8
9
复制代码ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

注意当创建一个Defaultkafkafkaconsumerfactory时,使用构造器,该构造器仅以其特性为基础,就意味着从配置中获取了key/value的Deserializer类别。或者,反序列化程序实例可以传递给key/value的DefaultKafkaConsumerFactory构造函数,在这种情况下,所有消费者共享相同的实例。另一个选项是提供Supplier<Deserializer>s(从版本2.3开始),用于为每个使用者获取单独的反序列化程序实例:

1
2
3
4
5
复制代码DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关可以设置的各种属性的更多信息,请参阅Javadoc 中ContainerProperties。

从版本Spring Kafka 2.1.1开始,一个名为logContainerConfig的新属性就可用了。当启用true和INFO日志记录时,每个侦听器容器都会写入一条日志消息,总结其配置属性。

例如,要将日志级别更改为INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)。

从版本Spring Kafka 2.2开始,添加了名为missingtopicsfailal的新容器属性(默认值:true)。如果代理上不存在任何客户端发布或订阅涉及到的主题,这将阻止容器启动。如果容器配置为侦听主题模式(regex),则不适用。以前,容器线程在consumer.poll()方法中循环,等待在记录许多消息时出现主题。除了日志,没有迹象表明有问题。要恢复以前的行为,可以将属性设置为false,这个时候,Broker设置项allow.auto.create.topics=true,且这个容器属性为false,则会自动创建不存在的topic。

2.3.1.3 使用 ConcurrentMessageListenerContainer

单个构造函数类似于第一个KafkaListenerContainer构造函数。下面的列表显示了构造函数的签名:

1
2
复制代码public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)

它还有一个并发属性。例如,container.setConcurrency(3)即表示创建三个KafkaMessageListenerContainer实例。对于第一个构造函数,Kafka使用它的组管理功能将分区分布到消费者之间。

当监听多个主题时,默认的分区分布可能不是你期望的那样。例如,如果你有三个主题,每个主题有五个分区,并且希望使用concurrency=15,那么你只看到五个活动的消费者,每个消费者从每个主题中分配一个分区,其他十个消费者处于空闲状态。这是因为默认的Kafka PartitionAssignor是RangeAssignor(参见其Javadoc)。对于这种情况,你可能需要考虑改用RoundRobinAssignor,它将分区分布到所有使用者。然后,为每个使用者分配一个主题或分区。若要更改PartitionAssignor,你可以在提供给DefaultKafkaConsumerFactory的属性中设置partition.assignment.strategy消费者配置参数(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

使用Spring Boot时,可以按如下方式分配设置策略:

1
2
复制代码spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

对于第二个构造函数,ConcurrentMessageListenerContainer将TopicPartition实例分布在委托KafkaMessageListenerContainer实例上。

例如,如果提供了六个TopicPartition实例,并发性为3;每个容器得到两个分区。对于五个TopicPartition实例,两个容器得到两个分区,第三个容器得到一个分区。如果并发性大于TopicPartitions的数量,则会向下调整并发性,以便每个容器获得一个分区。调整分区的方式可以使用命令行工具kafka-topics.sh查询和调整主题上的分区数。还可以添加一个NewTopic Bean,如果NewTopic设定的数目大于当前数目,spring boot的自动配置的KafkaAdmin将向上调整分区。

client.id属性(如果已设置)将附加-n,其中n是对应于并发的消费者实例。当启用JMX时,这是为MBeans提供唯一名称所必需的。

从版本Spring Kafka 1.3开始,MessageListenerContainer提供了对底层KafkaConsumer的度量的访问。对于ConcurrentMessageListenerContainer,metrics()方法返回所有目标KafkaMessageListenerContainer实例的度量(metrics)。根据为底层KafkaConsumer提供的client-id度量被分组到Map<MetricName, ?extends Metric>。

从2.3版开始,ContainerProperties提供了一个idleBetweenPolls选项,允许侦听器容器中的主循环在KafkaConsumer.poll()调用之间睡眠。从提供的选项中选择实际睡眠间隔作为最小值,并且选择max.poll.interval.ms 消费者配置和当前记录批处理时间之间的差异。

2.3.1.4 提交偏移量

提供了几个提交偏移量的选项。如果enable.auto.commit使用者属性为true,则Kafka将根据其配置自动提交偏移量。如果为false,则容器支持多个AckMode设置(在下一个列表中描述)。默认的确认模式是批处理。从2.3版开始,框架将enable.auto.commit设置为false,除非在配置中显式设置。以前,如果未设置属性,则使用Kafka默认值(true)。消费者 poll()方法返回一个或多个ConsumerRecords。为每个记录调用MessageListener。以下列表描述了容器对每个AckMode采取的操作:

  • RECORD: 当侦听器在处理记录后返回时提交偏移量。
  • BATCH: 处理完poll()返回的所有记录后提交偏移量。
  • TIME: 在处理完poll()返回的所有记录后提交偏移量,只要超过上次提交后的ackTime
  • COUNT: 在处理完poll()返回的所有记录后提交偏移量,只要上次提交后收到ackCount记录。
  • COUNT_TIME: 类似于TIME和COUNT,但如果两个条件都为true,则执行提交。
  • MANUAL: 消息侦听器负责acknowledge()和Acknowledgment。之后,应用与BATCH相同的语义。
  • MANUAL_IMMEDIATE: 侦听器调用Acknowledgement.acknowledge()方法时立即提交偏移量。

MANUAL和MANUAL_IMMEDIATE 要求侦听器是AcknowledgingMessageListener 或BatchAcknowledgingMessageListener。请参见消息侦听器。

根据syncCommits容器属性,使用消费者上的commitSync()或commitAsync()方法。默认情况下,syncCommits为true;另请参阅setSyncCommitTimeout。请参阅setCommitCallback以获取异步提交的结果;默认回调是LoggingCommitCallback,它记录错误(以及调试级别的成功)。

因为侦听器容器有自己的提交偏移的机制,所以它希望Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG为false。从2.3版开始,除非在使用者工厂或容器的使用者属性重写中特别设置,否则它将无条件地将其设置为false。

Acknowledgment有以下方法:

1
2
3
复制代码public interface Acknowledgment {
void acknowledge();
}

此方法使侦听器可以控制何时提交偏移。

从版本2.3开始,确认接口有两个附加方法nack(long sleep)和nack(int index, long sleep)。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引发IllegalStateException。

nack()只能在调用侦听器的消费者线程上调用。

使用批处理侦听器时,可以在发生故障的批内指定索引。调用nack()时,将在对失败和丢弃的记录的分区执行索引和查找之前提交记录的偏移量,以便在下次poll()时重新传递这些偏移量。这是对SeekToCurrentBatchErrorHandler的改进,SeekToCurrentBatchErrorHandler只能查找整个批次以便重新交付。

注意:通过组管理使用分区分配时,确保sleep参数(加上处理上一次轮询记录所花费的时间)小于consumer max.poll.interval.ms属性非常重要。

2.3.1.5 侦听器容器自动启动和手动启动

侦听器容器实现了SmartLifecycle(通过SmartLifecycle在Spring加载和初始化所有bean后,接着执行一些任务或者启动需要的异步服务),默认情况下autoStartup为true。容器在后期启动(Integer.MAX-VALUE - 100)。实现SmartLifecycle以处理来自侦听器的数据的其他组件应该在较早的阶段启动。-100为以后的阶段留出了空间,使组件能够在容器之后自动启动。比如我们通过@Bean将监听器容器交给Spring管理,这个时候通过SmartLifecycle自动执行了初始化的任务,但是当我们手动通过new监听器容器实例,则后初始化则不会执行,比如KafkaMessageListenerContainer实例需要手动执行start()。

autoStartup在手动执行start中设置true与false没有作用,可以参见@KafkaListener声明周期管理这一小节。

2.3.2 @KafkaListener注解

2.3.2.1 Record Listeners

@KafkaListener注解用于将bean方法指定为侦听器容器的侦听器。bean包装在一个MessagingMessageListenerAdapter中,该适配器配置有各种功能,如转换器,用于转换数据(如有必要)以匹配方法参数。通过使用属性占位符(${…}),或者可以使用SpEL(#{…​})配置注释上的大多数属性。有关更多信息,请参阅Javadoc。

@KafkaListener:

  • id:listener唯一id,当GroupId没有被配置的时候,默认id为自动产生,此值指定后会覆盖group id。
  • containerFactory:上面提到了@KafkaListener区分单数据还是多数据消费只需要配置一下注解的containerFactory属性就可以了,这里面配置的是监听容器工厂,也就是ConcurrentKafkaListenerContainerFactory,配置Bean名称
  • topics:需要监听的Topic,可监听多个,可以是表达式或者占位符关键字或者直接是主题名称,如多个主题监听:{"topic1" , "topic2"}
  • topicPattern: 此侦听器的主题模式。条目可以是“主题模式”、“属性占位符键”或“表达式”。框架将创建一个容器,该容器订阅与指定模式匹配的所有主题,以获取动态分配的分区。模式匹配将针对检查时存在的主题周期性地执行。表达式必须解析为主题模式(支持字符串或模式结果类型)。这使用组管理,Kafka将为组成员分配分区。
  • topicPartitions:用于使用手动主题/分区分配时
  • errorHandler:监听异常处理器,配置Bean名称,默认为空
  • groupId:消费组ID
  • idIsGroup:id是否为GroupId
  • clientIdPrefix:消费者Id前缀
  • beanRef:真实监听容器的Bean名称,需要在 Bean名称前加 “__“

@KafkaListener注解为简单的POJO侦听器提供了一种机制。下面的示例演示如何使用它:

1
2
3
4
5
6
复制代码public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}

此机制生效需要@Configuration类之一上的@EnableKafka注解和用于配置基础ConcurrentMessageListenerContainer的侦听器容器工厂。默认情况下,需要名为kafkaListenerContainerFactory的bean。以下示例演示如何使用ConcurrentMessageListenerContain:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码@Configuration
@EnableKafka
public class KafkaConfig {

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}

注意,要设置容器属性,必须在工厂上使用getContainerProperties()方法。它用作注入容器的实际属性的模板。

从版本2.1.1开始,现在可以为注解创建的消费者设置client.id属性。clientdprefix的后缀是-n,其中n是一个整数,表示使用并发时的容器号。

从2.2版开始,现在可以通过使用批注本身的属性来重写容器工厂的并发性和自动启动属性。属性可以是简单值、属性占位符或SpEL表达式。下面的示例演示了如何执行此操作:

1
2
3
4
5
复制代码@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}

你还可以使用显式主题和分区(以及可选的初始偏移量)配置POJO侦听器。下面的示例演示了如何执行此操作:

1
2
3
4
5
6
7
8
复制代码@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}

你可以在partitions或partitionOffsets属性中指定每个分区,但不能同时指定两者。

使用手动AckMode时,还可以向侦听器提供Acknowledgment。下面的示例还演示了如何使用不同的容器工厂:

1
2
3
4
5
6
复制代码@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}

最后,可以从消息头获得有关消息的元数据。你可以使用以下头名称来检索消息头内容:

1
2
3
4
5
6
复制代码KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE

示例:

1
2
3
4
5
6
7
8
9
复制代码@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
2.3.2.2 批处理侦听器

从版本1.1开始,可以配置@KafkaListener方法来接收从消费者接收的整批消费者记录。要将侦听器容器工厂配置为创建批处理侦听器,可以设置batchListener属性。下面的示例演示了如何执行此操作:

1
2
3
4
5
6
7
8
复制代码@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}

以下示例显示如何接收有效载荷列表:

1
2
3
4
复制代码@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}

主题、分区、偏移量等在与有效负载并行的头中可用。下面的示例演示如何使用标题:

1
2
3
4
5
6
7
8
复制代码@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}

或者,您可以接收消息列表Message<?>对象,其中包含每个偏移量和每个消息中的其他详细信息,但它必须是唯一的参数(除了使用手动提交时的Acknowledgment和/或Consumer<?, ?>参数)。下面的示例演示如何执行此操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}

在这种情况下,不会对有效载荷执行转换。如果BatchMessagingMessageConverter配置了RecordMessageConverter,则还可以向消息参数添加泛型类型,并转换有效负载。有关详细信息,请参阅使用批处理侦听器的负载转换。

你还可以收到一个ConsumerRecord<?, ?>对象,但它必须是唯一的参数(当使用手动提交或Consumer<?, ?>参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:

1
2
3
4
5
6
7
8
9
复制代码@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}

从版本2.2开始,侦听器可以接收poll()方法返回的完整的ConsumerRecords<?, ?>对象,允许侦听器访问其他方法,例如partitions()(返回列表中的TopicPartition实例)和records(TopicPartition)(获取选择性记录)。同样,这必须是唯一的参数(当使用手动提交或Consumer<?, ?>参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操作:

1
2
3
4
复制代码@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}

2.3.3 @KafkaListener@Payload验证

从2.2版开始,现在更容易添加验证程序来验证@KafkaListener``@Payload参数。以前,你必须配置一个自定义的DefaultMessageHandlerMethodFactory并将其添加到注册器中。现在,你可以将验证器添加到注册器本身。以下代码说明了如何执行此操作:

1
2
3
4
5
6
7
8
9
10
11
复制代码@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

...

@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}

当你在Spring Boot使用validation starter,会自动配置LocalValidatorFactoryBean,如下例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

@Autowired
private LocalValidatorFactoryBean validator;
...

@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}

以下示例演示如何验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码public static class ValidatedClass {

@Max(10)
private int bar;

public int getBar() {
return this.bar;
}

public void setBar(int bar) {
this.bar = bar;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
复制代码@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}

2.3.4 重新平衡监听者

ContainerProperties有一个名为consumerRebalanceListener的属性,该属性接受Kafka客户端的consumerRebalanceListener接口的实现。如果未提供此属性,则容器将配置日志侦听器,该侦听器将在信息级别记录重新平衡事件。该框架还添加了一个子接口ConsumerRawareRebalanceListener。以下列表显示了ConsumerRawareRebalanceListener接口定义:

1
2
3
4
5
复制代码public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}

2.3.5 转发监听者消息

从2.0版开始,如果还使用@SendTo注解注释@KafkaListener,并且方法调用返回结果,则结果将转发到@SendTo指定的主题。如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
复制代码@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

@KafkaHandler
public String foo(String in) {
...
}

@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}

}

2.3.6 @KafkaListener生命周期管理

为@KafkaListener注解创建的侦听器容器不是应用程序上下文中的bean。相反,它们是用KafkaListenerEndpointRegistry类型的基础设施bean注册的。这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何autoStartup设置为true的容器。所有容器工厂创建的所有容器必须处于同一phase。有关详细信息,请参阅侦听器容器自动启动。你可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,可以通过使用单个容器的id属性来获取对该容器的引用。可以在批注上设置autoStartup,这将覆盖容器工厂中配置的默认设置(setAutoStartup(true))。你可以从应用程序上下文中获取对bean的引用,例如自动连接,以管理其注册的容器。以下示例说明了如何执行此操作:

1
2
复制代码@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
1
2
3
4
5
6
7
8
复制代码@Autowired
private KafkaListenerEndpointRegistry registry;

...

this.registry.getListenerContainer("myContainer").start();

...

注册表只维护其管理的容器的生命周期;声明为bean的容器不受注册表管理,可以从应用程序上下文中获取。可以通过调用注册表的getListenerContainers()方法来获取托管容器的集合。Spring Kafka版本2.2.5添加了一个方便方法getAllListenerContainers(),它返回所有容器的集合,包括由注册表管理的容器和声明为bean的容器。返回的集合将包括任何已初始化的原型bean,但它不会初始化任何延迟bean声明。

2.4 流处理

Spring for Apache Kafka提供了一个工厂bean来创建StreamsBuilder对象并管理其流的生命周期。只要kafka流在classpath上并且kafka流通过@EnableKafkaStreams注解开启,Spring Boot就会自动配置所需的KafkaStreamsConfiguration bean。

启用Kafka流意味着必须设置应用程序id和引导服务器(bootstrap servers)。前者可以使用spring.kafka.streams.application-id配置,如果未设置,则默认为spring.application.name。后者可以全局设置,也可以专门为流覆写。

使用专用属性可以使用其他几个属性;可以使用spring.Kafka.streams.properties命名空间设置其他任意Kafka属性。有关详细信息,Additional Kafka Properties 。

默认情况下,由它创建的StreamBuilder对象管理的流将自动启动。可以使用spring.kafka.streams.auto-startup属性自定义此行为。

要使用工厂bean,只需将StreamsBuilder连接到@bean,如下例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {

@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}

}

默认情况下,由它创建的StreamBuilder对象管理的流将自动启动。可以使用spring.kafka.streams.auto-startup属性自定义此行为。

2.5 附加配置

自动配置支持的属性显示在公用应用程序属性中。注意,在大多数情况下,这些属性(连字符或驼峰样式)直接映射到Apache Kafka点式属性。有关详细信息,请参阅Apache Kafka文档。

前面提到的几个属性应用于所有组件(生产者、消费者、管理员和流),但如果希望使用不同的值,则可以在组件级别指定。Apache Kafka指定重要性为HIGH、MEDIUM或LOW的属性。Spring Boot自动配置支持所有高重要性属性、某些选定的中、低属性以及任何没有默认值的属性。

只有Kafka支持的属性的一个子集可以通过KafkaProperties类直接使用,如果要使用不直接支持的其他属性配置生产者或消费者,请使用以下属性:

1
2
3
4
5
复制代码spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

上面的参数设置示例将公共prop.oneKafka属性设置为first(适用于生产者、消费者和管理员),prop.two admin属性设置为second,prop.three consumer属性设置为third,prop.four producer属性设置为fourth,prop.five streams属性设置为fifth。

你还可以配置Spring Kafka JsonDeserializer,如下所示:

1
2
3
复制代码spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

类似地,可以禁用JsonSerializer在头中发送类型信息的默认行为:

1
2
复制代码spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

注意: 以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。

2.6 使用Embdded Kafka做测试

Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的便捷方法。要使用此功能,请使用Spring Kafka测试模块中的@EmbeddedKafka注解测试类。有关更多信息,请参阅Spring For Apache Kafka参考手册。

要使Spring Boot自动配置与前面提到的嵌入式Apache Kafka代理一起工作,需要将嵌入式代理地址(由EmbeddedKafkaBroker填充)的系统属性重新映射到Apache Kafka的Spring Boot配置属性中。有几种方法可以做到这一点:

  • 提供系统属性以将嵌入的代理地址映射到测试类中的spring.kafka.bootstrap-servers:
1
2
3
复制代码static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • 在@EmbeddedKafka注解上配置属性名:
1
2
复制代码@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  • 在配置属性中使用占位符:
1
复制代码spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

2.7 Spring Integration支持

Spring Integration也有Kafka的适配器,因此我们可以很方便的采用Spring Integration去实现发布订阅,当然你也可以不使用Spring Integration。

Spring Integration是什么,具体有什么作用,可以参考另一篇文章《Spring Integration最详解》。

3 Spring Kafka配置参数

这里对所有配置做个说明的是,spring kafka配置分全局配置和子模块配置,子模块配置会复写全局配置,比如SSL认证可以全局配置,但是也可以在每个子模块,如消费者、生产者、流式处理中都可以单独配置SSL(可能是微服务部署,消费者和生产者不在同一个应用中)。这里重点介绍生产者和消费者配置吧,其他就不展开了,用到的时候再去查找和补充。

3.1 全局配置

1
2
3
4
5
6
7
8
复制代码# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.bootstrap-servers
# 在发出请求时传递给服务器的ID。用于服务器端日志记录
spring.kafka.client-id,默认无
# 用于配置客户端的其他属性,生产者和消费者共有的属性
spring.kafka.properties.*
# 消息发送的默认主题,默认无
spring.kafka.template.default-topic

3.2 生产者

Spring Boot中,Kafka 生产者相关配置(所有配置前缀为spring.kafka.producer.):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码# 生产者要求Leader在考虑请求完成之前收到的确认数
spring.kafka.producer.acks
# 默认批量大小。较小的批处理大小将使批处理不太常见,并可能降低吞吐量(批处理大小为零将完全禁用批处理)
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
# 生产者可用于缓冲等待发送到服务器的记录的总内存大小。
spring.kafka.producer.buffer-memory
# 在发出请求时传递给服务器的ID。用于服务器端日志记录。
spring.kafka.producer.client-id
# 生产者生成的所有数据的压缩类型
spring.kafka.producer.compression-type
# 键的序列化程序类
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
# 大于零时,启用失败发送的重试次数
spring.kafka.producer.retries
spring.kafka.producer.ssl.key-password
spring.kafka.producer.ssl.key-store-location
spring.kafka.producer.ssl.key-store-password
spring.kafka.producer.ssl.key-store-type
spring.kafka.producer.ssl.protocol
spring.kafka.producer.ssl.trust-store-location
spring.kafka.producer.ssl.trust-store-password
spring.kafka.producer.ssl.trust-store-type
# 非空时,启用对生产者的事务支持
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer

3.3 消费者

Spring Boot中,Kafka 消费者相关配置(所有配置前缀为spring.kafka.consumer.):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
复制代码# 如果“enable.auto.commit”设置为true,设置消费者偏移自动提交到Kafka的频率,默认值无,单位毫秒(ms)
spring.kafka.consumer.auto-commit-interval
# 当Kafka中没有初始偏移或服务器上不再存在当前偏移时策略设置,默认值无,latest/earliest/none三个值设置
# earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset
# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.consumer.bootstrap-servers
# 在发出请求时传递给服务器的ID,用于服务器端日志记录
spring.kafka.consumer.client-id
# 消费者的偏移量是否在后台定期提交
spring.kafka.consumer.enable-auto-commit
# 如果没有足够的数据来立即满足“fetch-min-size”的要求,则服务器在取回请求之前阻塞的最大时间量
spring.kafka.consumer.fetch-max-wait
# 服务器应为获取请求返回的最小数据量。
spring.kafka.consumer.fetch-min-size
# 标识此消费者所属的默认消费者组的唯一字符串
spring.kafka.consumer.group-id
# 消费者协调员的预期心跳间隔时间。
spring.kafka.consumer.heartbeat-interval
# 用于读取以事务方式写入的消息的隔离级别。
spring.kafka.consumer.isolation-level
# 密钥的反序列化程序类
spring.kafka.consumer.key-deserializer
# 在对poll()的单个调用中返回的最大记录数。
spring.kafka.consumer.max-poll-records
# 用于配置客户端的其他特定于消费者的属性。
spring.kafka.consumer.properties.*
# 密钥存储文件中私钥的密码。
spring.kafka.consumer.ssl.key-password
# 密钥存储文件的位置。
spring.kafka.consumer.ssl.key-store-location
# 密钥存储文件的存储密码。
spring.kafka.consumer.ssl.key-store-password
# 密钥存储的类型,如JKS
spring.kafka.consumer.ssl.key-store-type
# 要使用的SSL协议,如TLSv1.2, TLSv1.1, TLSv1
spring.kafka.consumer.ssl.protocol
# 信任存储文件的位置。
spring.kafka.consumer.ssl.trust-store-location
# 信任存储文件的存储密码。
spring.kafka.consumer.ssl.trust-store-password
# 信任存储区的类型。
spring.kafka.consumer.ssl.trust-store-type
# 值的反序列化程序类。
spring.kafka.consumer.value-deserializer

3.4 监听器

Spring Boot中,Kafka Listener相关配置(所有配置前缀为spring.kafka.listener.):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码# ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数
spring.kafka.listener.ack-count=
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
# 如果Broker上不存在至少一个配置的主题(topic),则容器是否无法启动,
# 该设置项结合Broker设置项allow.auto.create.topics=true,如果为false,则会自动创建不存在的topic
spring.kafka.listener.missing-topics-fatal=true
# 非响应消费者的检查间隔时间。如果未指定持续时间后缀,则将使用秒作为单位
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type

3.5 管理

1
2
3
4
5
6
7
8
9
10
11
12
复制代码spring.kafka.admin.client-id
# 如果启动时代理不可用,是否快速失败
spring.kafka.admin.fail-fast=false
spring.kafka.admin.properties.*
spring.kafka.admin.ssl.key-password
spring.kafka.admin.ssl.key-store-location
spring.kafka.admin.ssl.key-store-password
spring.kafka.admin.ssl.key-store-type
spring.kafka.admin.ssl.protocol
spring.kafka.admin.ssl.trust-store-location
spring.kafka.admin.ssl.trust-store-password
spring.kafka.admin.ssl.trust-store-type

3.6 授权服务(JAAS)

1
2
3
4
复制代码spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=false
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule
spring.kafka.jaas.options.*

3.7 SSL认证

1
2
3
4
5
6
7
8
复制代码spring.kafka.ssl.key-password
spring.kafka.ssl.key-store-location
spring.kafka.ssl.key-store-password
spring.kafka.ssl.key-store-type
spring.kafka.ssl.protocol
spring.kafka.ssl.trust-store-location
spring.kafka.ssl.trust-store-password
spring.kafka.ssl.trust-store-type

3.8 Stream流处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码spring.kafka.streams.application-id
spring.kafka.streams.auto-startup
spring.kafka.streams.bootstrap-servers
spring.kafka.streams.cache-max-size-buffering
spring.kafka.streams.client-id
spring.kafka.streams.properties.*
spring.kafka.streams.replication-factor
spring.kafka.streams.ssl.key-password
spring.kafka.streams.ssl.key-store-location
spring.kafka.streams.ssl.key-store-password
spring.kafka.streams.ssl.key-store-type
spring.kafka.streams.ssl.protocol
spring.kafka.streams.ssl.trust-store-location
spring.kafka.streams.ssl.trust-store-password
spring.kafka.streams.ssl.trust-store-type
spring.kafka.streams.state-dir

4 Kafka订阅发布基本特性回顾

  • 同一消费组下所有消费者协同消费订阅主题的所有分区
    • 同消费组,多消费者订阅单主题单分区,则分区只会分配给其中一个消费者,除非这个消费者挂掉,才会分配给其他一个消费者消费消息,意思就是其他消费者在旁边看着吃东西
    • 同消费组,N个消费者订阅单主题N个分区,则默认每个消费者都会被分配一个分区
    • 同消费组,N个消费者订阅单主题M个分区,当M > N时,则会有消费者多分配多于一个分区的情况;当M < N时,则会有空闲消费者,类似第一条
    • 所有上面所说的消费者实例可以是线程方式或者是进程方式存在,所说的分区分配机制叫做重平衡(rebalance)
    • 当消费者内成员个数发生变化会触发重平衡;订阅的主题个数发生变化会触发重平衡;订阅的主题分区个数发生变化会触发重平衡;
    • 总之就是一个分区只能分配到一个消费者,一个消费者可以被分配多个分区
  • 消费者offset管理机制
    • 每个主题分区中的消息都有一个唯一偏移值,具有先后顺序,与消费者具有对应关系,消费者每消费一条消息,偏移量加1,并记录在消费者本地,并定期的将记录同步到服务端(Broker),这里的同步机制是可以设置的
    • 消息是被持久化的,当组内所有消费者重新订阅主题时,可以设置是否从头开始消费消息或者是从最后记录的偏移值位置开始消费
  • 分区和消费者个数如何设置
    • 我们知道主题分区是分布在不同的Broker上的,每个分区对应一个消费者,从而具有消息处理具有很高的吞吐量
    • 分区是调优Kafka并行度的最小单元,多线程消费者连接多分区消费消息,在实现上,通过socket连接,因此也会占用文件句柄个数
    • 创建分区都是会占用一定内存的,并不是分区越多越好,当然现在kafka社区在优化这一部分,让分区数达到更大,性能也不会有所影响

具体怎么调优副本、分区、消费者等这里就不展开了,后面专门来研究这个问题。

5 发布订阅示例

实现下面的示例需要的环境:

  • Kafka + Zookeeper单点服务器或集群已配置好(如果环境搭建不熟悉,可以去翻看前面写的关于Kafka的环境搭建和测试那一篇),或者是使用Spring-kafka-test embedded Kafka Server
  • Spring Boot开发环境(2.2.1)
    • JDK(1.8或以上)
    • STS(4.4.RELEASE)
    • MARVEN构建方式

5.1 使用Embedded Kafka Server

我们知道Kafka是Scala+Zookeeper构建的,可以从官方网站下载部署包并在本地部署。不过,Spring Kafka Test已经封装了Kafka测试的带注解的一键式功能,以打开Kafka服务器,从而简化了验证Kafka相关功能的开发过程,使用起来也非常简单。

添加依赖:

1
2
3
4
5
复制代码<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

启动服务,下面使用Junit测试用例直接启动Kafka服务器服务,包括四个代理节点,Run as JUnit Test。:

1
2
3
4
5
6
7
8
9
复制代码@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
@Test
public void contextLoads()throws IOException {
System.in.read();
}
}

@EmbeddedKafka中可以设置相关参数:

  • value: 设置创建代理的个数
  • count: 同value
  • ports: 代理端口号列表
  • brokerPropertiesLocation:指定配置文件,如 “classpath:application.properties”

注意:EmbeddedKafka这样默认是没有创建主题的。会提示Topic(s) [test] is/are not present and missingTopicsFatal is true错误。@EmbeddedKafka默认情况是创建一个代理,该代理具有一个不带任何参数的随机端口,它将在启动日志中输出特定端口和默认配置项。

5.2 简单的发布订阅实现(无自定义配置)

下面实现一个简单发布订阅功能,通过前端WEB调用一个API,然后在该API控制器中得到请求后生产者开始发送消息,消费者后台监听消息,如果收到消费者消息,则打印出来。

5.2.1 添加依赖及配置Kafka

添加Kafka依赖:

1
2
3
4
复制代码<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

配置Kafka,这里消费者和生产者在同一应用中,我们只需要配置Kafka Brokers的服务地址+端口:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码server: 
port: 9000
spring:
kafka:
bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094
listener:
# 设置不监听主题错误,false时,如果broker设置了llow.auto.create.topics = true,生产者发送到未创建主题时,会默认自动创建主题
# 且默认创建的主题是单副本单分区的
missing-topics-fatal: false
consumer:
# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)
auto-offset-reset: earliest

5.2.2 添加生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码@Service
public class Producer {

private static final Logger LOGGER = LogManager.getLogger(Producer.class);
private static final String TOPIC = "users";

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String message) {
LOGGER.info(String.format("===Producing message: {}", message));
this.kafkaTemplate.send(TOPIC, message);
}
}

5.2.3 添加消费者

1
2
3
4
5
6
7
8
9
10
11
复制代码@Service
public class Consumer {

private static final Logger LOGGER = LogManager.getLogger(Consumer.class);

@KafkaListener(topics = "test", groupId = "group_test")
public void consume(String message) throws IOException {
LOGGER.info(String.format("#### -> Consumed message -> %s", message));
}

}

5.2.4 添加WEB控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

private final Producer producer;

@Autowired
KafkaController(Producer producer) {
this.producer = producer;
}

@GetMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.sendMessage(message);
}
}

5.2.5 测试

添加Spring Boot Application:

1
2
3
4
5
6
7
复制代码@SpringBootApplication
public class TestKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(TestKafkaApplication.class, args);
}

}

启动Kafka Brokers后,需要手动创建主题(如果想自动创建,则需要借助KafkaAdmin,或者是Kafka Broker设置了allow.auto.create.topics=true且应用设置了listener.missing-topics-fatal=false):

1
2
3
复制代码# 如果对kafka-topics.sh这里不熟悉,可以去翻看前面写的关于Kafka的相关文章(环境搭建和测试那一篇)
# 创建test主题
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test

打开浏览器测试:

1
复制代码http://localhost:9000/kafka/publish?message=hello

则应用控制台会打印hello。整个发布订阅的实现只使用了跟Kafka相关的@KafkaListener注解接收消息和KafkaTemplate模板发送消息,很是简单。

5.3 基于自定义配置发布订阅实现

上面是简单的通过Spring Boot依赖的Spring Kafka配置即可快速实现发布订阅功能,这个时候我们是无法在程序中操作这些配置的,因此这一小节就是利用我们之前《Spring Boot从零入门7_最新配置文件配置及优先级详细介绍》文章中讲述的自定义配置文件方式去实现发布订阅功能。

实现内容有:

  • 自定义Kafka配置参数文件(非application.properties/yml)
  • 可实现多生产者(每个生产者为单服务单线程),多消费者(非@KafkaListener实现消息监听)
  • 支持SSL安全配置
  • 监听生产者

源码不会直接贴,只给出主体部分。

配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码@Configuration
@ConfigurationProperties(prefix = "m2kc")
@PropertySource("classpath:kafka.properties")
@Validated
public class M2KCKafkaConfig {

@Value("${m2kc.kafka.bootstrap.servers}")
private String kafkaBootStrapServers;

@Value("${m2kc.kafka.key.serializer.class}")
private String kafkaKeySerializerClass;

......
......
}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
复制代码@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaProducer {
private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class);
private String mTopic = "test";
private M2KCKafkaConfig mM2KCKafkaConfig;
private KafkaTemplate<String, String> mKafkaTemplate;

@Autowired
public KafkaProducer(M2KCKafkaConfig kafkaConfig) {
mTopic = kafkaConfig.getKafkaSourceTopic();
mM2KCKafkaConfig = kafkaConfig;
mKafkaTemplate = getKafkaTemplate();
}

public KafkaTemplate<String, String> getKafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
return kafkaTemplate;
}

public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass());
if (mM2KCKafkaConfig.isKafkaSslEnable()) {
// TODO : to test
properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
}

return new DefaultKafkaProducerFactory<String, String>(properties);
}

public void sendMessage(String msg) {
LOGGER.info("===Producing message[{}]: {}", mTopic, msg);
ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.info("===Producing message success");
}

@Override
public void onFailure(Throwable ex) {
LOGGER.info("===Producing message failed");
}

});
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
复制代码@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaConsumer implements InitializingBean {
private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class);

private String mTopic;
private M2KCKafkaConfig mM2KCKafkaConfig;
private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer;

@Autowired
public KafkaConsumer(M2KCKafkaConfig kafkaConfig) {
LOGGER.info("===KafkaConsumer construct");
mTopic = kafkaConfig.getKafkaSourceTopic();
mM2KCKafkaConfig = kafkaConfig;
}

@PostConstruct
public void start(){
LOGGER.info("===KafkaConsumer start");
}

@Override
public void afterPropertiesSet() throws Exception {
LOGGER.info("===afterPropertiesSet is called");
createContainer();
}

private void createContainer() {
mKafkaMessageListenerContainer = createKafkaMessageListenerContainer();
mKafkaMessageListenerContainer.setAutoStartup(false);;
mKafkaMessageListenerContainer.start();
LOGGER.info("===", mKafkaMessageListenerContainer);
}

private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() {
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(),
createContainerProperties());
LOGGER.info("===createKafkaMessageListenerContainer");
return container;
}

private ContainerProperties createContainerProperties() {
ContainerProperties containerProps = new ContainerProperties(mTopic);
containerProps.setMessageListener(createMessageListener());
return containerProps;
}

private ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID());
if (mM2KCKafkaConfig.isKafkaSslEnable()) {
// TODO : to test
properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
}

return new DefaultKafkaConsumerFactory<String, String>(properties);
}

private MessageListener<String, String> createMessageListener() {
return new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// TODO Auto-generated method stub
LOGGER.info("===Consuming msg: {}", data.value());
}

};
}
}

继承InitializingBean只是为了初始化,也可以去掉,将初始化写入了构造函数中。这里的消费者和生产者都使用@Scope,所以需要手动获取实例,通过context去调用getBean()。另外配置文件没有写全,这里需要注意。

5.3 基于Spring Integration发布订阅实现

Spring Integration也有对Kafka支持的适配器,采用Spring Integration,我们也能够快速的实现发布订阅功能,且实现群组多消费者批量消费功能:

  • 实现Kafka自定义配置类
  • 采用Spring Integration
  • 发布订阅
  • 群组多消费者批量消费
  • 采用DSL特定领域语法去编写
  • 生产者发布成功与失败异常处理

我们可以先看看整体的Kafka消息传递通道:

  • 出站通道中KafkaProducerMessageHandler用于将消息发送到主题
  • KafkaMessageDrivenChannelAdapter用于设置入站通道和消息处理

具体的Demo可以参考Github中的一个sample :

  • github.com/spring-proj…

6 总结

本篇文章详细介绍了Spring Kafka的发送消息和接收消息功能,其他包括Spring Kafka Stream的简单介绍、Spring Kafka参数配置,以及在Spring Boot中如何通过三种方式去实现Kafka的发布订阅功能,涉及了Kafka的多消费者多订阅者,SSL安全传输,Spring Integration Kafka等。文章很长,把握总体,结合实际,差不多基本内容都有所涉及了。

7 知识扩展

Spring Expression Language(简称SpEL),在Spring中,不同于属性占位符${...},而SpEL表达式则要放到#{...}中(除代码块中用Expression外)。如配置文件中有topics参数spring.kafka.topics,则可以将配置文件中参数传入注解@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")。

SpEL表达式常用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码// 字面量
#{3.1415926} // 浮点数
#{9.87E4} // 科学计数法表示98700
#{'Hello'} // String 类型
#{false} // Boolean 类型
// 引用Bean、属性和方法
#{sgtPeppers} // 使用这个bean
#{sgtPeppers.artist} // 引用bean中的属性
#{sgtPeppers.selectArtist()} // 引用bean中的方法
#{sgtPeppers.selectArtist().toUpperCase()} // 方法返回值的操作
#{sgtPeppers.selectArtist()?.toUpperCase()} // 防止selectArtist()方法返回null,?表示非null则执行toUpperCase()
// 访问类作用域的方法和常量的话,使用T()这个关键的运算符
#{T(java.lang.Math)}
#{T(java.lang.Math).PI} // 引用PI的值
#{T(java.lang.Math).random()} // 获取0-1的随机数
#{T(System).currentTimeMillis()} // 获取时间到当前的毫秒数
// 替代属性占位符获取配置文件属性值
@Value("#{表达式}"
private String variable;

8 参考资料

  • docs.spring.io/spring-kafk…
  • docs.spring.io/spring-boot…
  • blog.csdn.net/lishuangzhe…
  • docs.spring.io/spring-boot…
  • docs.spring.io/spring-boot…
  • docs.spring.io/spring-kafk…
  • docs.spring.io/spring-kafk…
  • www.javatt.com/p/16904
  • github.com/cwenao/spri…
  • docs.spring.io/spring-kafk…
  • docs.spring.io/spring-kafk…
  • docs.spring.io/spring-batc…
  • www.intertech.com/Blog/spring…
  • joshlong.com/jl/blogPost…
  • examples.javacodegeeks.com/enterprise-…
  • www.orchome.com/553
  • docs.spring.io/spring-inte…
  • programming.vip/docs/spring… (事务型消息)
  • docs.confluent.io/current/kaf…
  • github.com/spring-proj…
  • www.jianshu.com/p/27fd3754b…
  • www.jianshu.com/p/cec449a7e…
  • memorynotfound.com/spring-kafk…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

《提升能力,涨薪可待》—Java并发之Synchronize

发表于 2019-12-26

欢迎关注公众号【Ccww技术博客】,原创技术文章第一时间推出

往期文章:

  • 《提升能力,涨薪可待》-Java并发之AQS全面详
  • java多线程并发系列–基础知识点(笔试、面试必备)
  • …

Synchronized简介

线程安全是并发编程中的至关重要的,造成线程安全问题的主要原因:

  • 临界资源, 存在共享数据
  • 多线程共同操作共享数据

而Java关键字synchronized,为多线程场景下防止临界资源访问冲突提供支持, 可以保证在同一时刻,只有一个线程可以执行某个方法或某个代码块操作共享数据。

即当要执行代码使用synchronized关键字时,它将检查锁是否可用,然后获取锁,执行代码,最后再释放锁。而synchronized有三种使用方式:

  • synchronized方法: synchronized当前实例对象,进入同步代码前要获得当前实例的锁
  • synchronized静态方法: synchronized当前类的class对象 ,进入同步代码前要获得当前类对象的锁
  • synchronized代码块:synchronized括号里面的对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁

Synchronized方法

首先看一下没有使用synchronized关键字,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
复制代码public class ThreadNoSynchronizedTest {

public void method1(){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("method1");
}

public void method2() {
System.out.println("method2");
}

public static void main(String[] args) {
ThreadNoSynchronizedTest tnst= new ThreadNoSynchronizedTest();

Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
tnst.method1();
}
});

Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
tnst.method2();
}
});
t1.start();
t2.start();
}
}

在上述的代码中,method1比method2多了2s的延时,因此在t1和t2线程同时执行的情况下,执行结果:

method2

method1

当method1和method2使用了synchronized关键字后,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码public synchronized void method1(){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("method1");
}

public synchronized void method2() {
System.out.println("method2");
}

此时,由于method1占用了锁,因此method2必须要等待method1执行完之后才能执行,执行结果:

method1

method2

因此synchronized锁定是当前的对象,当前对象的synchronized方法在同一时间只能执行其中的一个,另外的synchronized方法需挂起等待,但不影响非synchronized方法的执行。下面的synchronized方法和synchronized代码块(把整个方法synchronized(this)包围起来)等价的。

1
2
3
4
5
6
7
8
复制代码public synchronized void method1(){

}

public void method2() {
synchronized(this){
}
}

Synchronized静态方法

synchronized静态方法是作用在整个类上面的方法,相当于把类的class作为锁,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码public class TreadSynchronizedTest {

public static synchronized void method1(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println("method1");
}

public static void method2() {
synchronized(TreadTest.class){
System.out.println("method2");
}
}

public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
TreadSynchronizedTest.method1();
}
});

Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
TreadSynchronizedTest.method2();
}
});
t1.start();
t2.start();
}

}

由于将class作为锁,因此method1和method2存在着竞争关系,method2中synchronized(ThreadTest.class)等同于在method2的声明时void前面直接加上synchronized。上述代码的执行结果仍然是先打印出method1的结果:

method1

method2

Synchronized代码块

synchronized代码块应用于处理临界资源的代码块中,不需要访问临界资源的代码可以不用去竞争资源,减少了资源间的竞争,提高代码性能。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
复制代码public class TreadSynchronizedTest {

private Object obj = new Object();

public void method1(){
System.out.println("method1 start");
synchronized(obj){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("method1 end");
}
}

public void method2() {
System.out.println("method2 start");


// 延时10ms,让method1线获取到锁obj
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
synchronized(obj){
System.out.println("method2 end");
}
}

public static void main(String[] args) {
TreadSynchronizedTest tst = new TreadSynchronizedTest();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
tst.method1();
}
});

Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
tst.method2();
}
});
t1.start();
t2.start();
}
}

执行结果如下:

method1 start

method2 start

method1 end

method2 end

上述代码中,执行method2方法,先打印出 method2 start, 之后执行同步块,由于此时obj被method1获取到,method2只能等到method1执行完成后再执行,因此先打印method1 end,然后在打印method2 end。

Synchronized原理

synchronized 是JVM实现的一种锁,其中锁的获取和释放分别是monitorenter 和 monitorexit 指令。

加了 synchronized 关键字的代码段,生成的字节码文件会多出 monitorenter 和 monitorexit 两条指令,并且会多一个 ACC_SYNCHRONIZED 标志位,

当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。

在方法执行期间,其他任何线程都无法再获得同一个monitor对象。其实本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。

在Java1.6之后,sychronized在实现上分为了偏向锁、轻量级锁和重量级锁,其中偏向锁在 java1.6 是默认开启的,轻量级锁在多线程竞争的情况下会膨胀成重量级锁,有关锁的数据都保存在对象头中。

  • 偏向锁:在只有一个线程访问同步块时使用,通过CAS操作获取锁
  • 轻量级锁:当存在多个线程交替访问同步快,偏向锁就会升级为轻量级锁。当线程获取轻量级锁失败,说明存在着竞争,轻量级锁会膨胀成重量级锁,当前线程会通过自旋(通过CAS操作不断获取锁),后面的其他获取锁的线程则直接进入阻塞状态。
  • 重量级锁:锁获取失败则线程直接阻塞,因此会有线程上下文的切换,性能最差。

锁优化-适应性自旋(Adaptive Spinning)

从轻量级锁获取的流程中我们知道,当线程在获取轻量级锁的过程中执行CAS操作失败时,是要通过自旋来获取重量级锁的。问题在于,自旋是需要消耗CPU的,如果一直获取不到锁的话,那该线程就一直处在自旋状态,白白浪费CPU资源。

其中解决这个问题最简单的办法就是指定自旋的次数,例如让其循环10次,如果还没获取到锁就进入阻塞状态。但是JDK采用了更聪明的方式——适应性自旋,简单来说就是线程如果自旋成功了,则下次自旋的次数会更多,如果自旋失败了,则自旋的次数就会减少。

锁优化-锁粗化(Lock Coarsening)

锁粗化的概念应该比较好理解,就是将多次连接在一起的加锁、解锁操作合并为一次,将多个连续的锁扩展成一个范围更大的锁。举个例子:

1
2
3
4
5
6
7
8
复制代码public class StringBufferTest {
StringBuffer stringBuffer = new StringBuffer();
public void append(){
stringBuffer.append("a");
stringBuffer.append("b");
stringBuffer.append("c");
}
}

这里每次调用stringBuffer.append方法都需要加锁和解锁,如果虚拟机检测到有一系列连串的对同一个对象加锁和解锁操作,就会将其合并成一次范围更大的加锁和解锁操作,即在第一次append方法时进行加锁,最后一次append方法结束后进行解锁。

锁优化-锁消除(Lock Elimination)

锁消除即删除不必要的加锁操作。根据代码逃逸技术,如果判断到一段代码中,堆上的数据不会逃逸出当前线程,那么可以认为这段代码是线程安全的,不必要加锁。看下面这段程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码public class SynchronizedTest02 {

public static void main(String[] args) {
SynchronizedTest02 test02 = new SynchronizedTest02();
for (int i = 0; i < 10000; i++) {
i++;
}
long start = System.currentTimeMillis();
for (int i = 0; i < 100000000; i++) {
test02.append("abc", "def");
}
System.out.println("Time=" + (System.currentTimeMillis() - start));
}

public void append(String str1, String str2) {
StringBuffer sb = new StringBuffer();
sb.append(str1).append(str2);
}
}

虽然StringBuffer的append是一个同步方法,但是这段程序中的StringBuffer属于一个局部变量,并且不会从该方法中逃逸出去,所以其实这过程是线程安全的,可以将锁消除。

Sychronized缺点

Sychronized会让没有得到锁的资源进入Block状态,争夺到资源之后又转为Running状态,这个过程涉及到操作系统用户模式和内核模式的切换,代价比较高。

Java1.6为 synchronized 做了优化,增加了从偏向锁到轻量级锁再到重量级锁的过度,但是在最终转变为重量级锁之后,性能仍然较低。

各位看官还可以吗?喜欢的话,动动手指点个💗,点个关注呗!!谢谢支持!

欢迎关注公众号【Ccww技术博客】,原创技术文章第一时间推出

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

2019年常见ElasticSearch 面试题解析(上)

发表于 2019-12-25

前言

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。ElasticSearch用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。官方客户端在Java、.NET(C#)、PHP、Python、Apache Groovy、Ruby和许多其他语言中都是可用的。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr,也是基于Lucene。Elasticsearch 面试题

1、elasticsearch 了解多少,说说你们公司 es 的集群架构,索引数据大小,分片有多少,以及一些调优手段 。2、elasticsearch 的倒排索引是什么3、elasticsearch 索引数据多了怎么办,如何调优,部署4、elasticsearch 是如何实现 master 选举的5、详细描述一下 Elasticsearch 索引文档的过程6、详细描述一下 Elasticsearch 搜索的过程?7、Elasticsearch 在部署时,对 Linux 的设置有哪些优化方法8、lucence 内部结构是什么?9、Elasticsearch 是如何实现 Master 选举的?10、Elasticsearch 中的节点(比如共 20 个),其中的 10 个选了一个master,另外 10 个选了另一个 master,怎么办?11、客户端在和集群连接时,如何选择特定的节点执行请求的?12、详细描述一下 Elasticsearch 索引文档的过程。1、elasticsearch 了解多少,说说你们公司 es 的集群架构,索引数据大小,分片有多少,以及一些调优手段 。

面试官:想了解应聘者之前公司接触的 ES 使用场景、规模,有没有做过比较大规模的索引设计、规划、调优。解答:如实结合自己的实践场景回答即可。比如:ES 集群架构 13 个节点,索引根据通道不同共 20+索引,根据日期,每日递增 20+,索引:10 分片,每日递增 1 亿+数据,每个通道每天索引大小控制:150GB 之内。仅索引层面调优手段:### 1.1、设计阶段调优

(1)根据业务增量需求,采取基于日期模板创建索引,通过 roll over API 滚动索引;(2)使用别名进行索引管理;(3)每天凌晨定时对索引做 force_merge 操作,以释放空间;(4)采取冷热分离机制,热数据存储到 SSD,提高检索效率;冷数据定期进行 shrink操作,以缩减存储;(5)采取 curator 进行索引的生命周期管理;(6)仅针对需要分词的字段,合理的设置分词器;(7)Mapping 阶段充分结合各个字段的属性,是否需要检索、是否需要存储等。……..### 1.2、写入调优

(1)写入前副本数设置为 0;(2)写入前关闭 refresh_interval 设置为-1,禁用刷新机制;(3)写入过程中:采取 bulk 批量写入;(4)写入后恢复副本数和刷新间隔;(5)尽量使用自动生成的 id。### 1.3、查询调优

(1)禁用 wildcard;(2)禁用批量 terms(成百上千的场景);(3)充分利用倒排索引机制,能 keyword 类型尽量 keyword;(4)数据量大时候,可以先基于时间敲定索引再检索;(5)设置合理的路由机制。### 1.4、其他调优

部署调优,业务调优等。上面的提及一部分,面试者就基本对你之前的实践或者运维经验有所评估了。
2、elasticsearch 的倒排索引是什么


面试官:想了解你对基础概念的认知。解答:通俗解释一下就可以。传统的我们的检索是通过文章,逐个遍历找到对应关键词的位置。而倒排索引,是通过分词策略,形成了词和文章的映射关系表,这种词典+映射表即为倒排索引。有了倒排索引,就能实现 o(1)时间复杂度的效率检索文章了,极大的提高了检索效率。学术的解答方式:倒排索引,相反于一篇文章包含了哪些词,它从词出发,记载了这个词在哪些文档中出现过,由两部分组成——词典和倒排表。加分项:倒排索引的底层实现是基于:FST(Finite State Transducer)数据结构。lucene 从 4+版本后开始大量使用的数据结构是 FST。FST 有两个优点:(1)空间占用小。通过对词典中单词前缀和后缀的重复利用,压缩了存储空间;(2)查询速度快。O(len(str))的查询时间复杂度。
3、elasticsearch 索引数据多了怎么办,如何调优,部署


面试官:想了解大数据量的运维能力。解答:索引数据的规划,应在前期做好规划,正所谓“设计先行,编码在后”,这样才能有效的避免突如其来的数据激增导致集群处理能力不足引发的线上客户检索或者其他业务受到影响。如何调优,正如问题 1 所说,这里细化一下:### 3.1 动态索引层面

基于模板+时间+rollover api 滚动创建索引,举例:设计阶段定义:blog 索引的模板格式为:blog_index_时间戳的形式,每天递增数据。这样做的好处:不至于数据量激增导致单个索引数据量非常大,接近于上线 2 的32 次幂-1,索引存储达到了 TB+甚至更大。一旦单个索引很大,存储等各种风险也随之而来,所以要提前考虑+及早避免。### 3.2 存储层面

冷热数据分离存储,热数据(比如最近 3 天或者一周的数据),其余为冷数据。对于冷数据不会再写入新数据,可以考虑定期 force_merge 加 shrink 压缩操作,节省存储空间和检索效率。### 3.3 部署层面

一旦之前没有规划,这里就属于应急策略。结合 ES 自身的支持动态扩展的特点,动态新增机器的方式可以缓解集群压力,注意:如果之前主节点等规划合理,不需要重启集群也能完成动态新增的。
4、elasticsearch 是如何实现 master 选举的


面试官:想了解 ES 集群的底层原理,不再只关注业务层面了。解答:前置前提:(1)只有候选主节点(master:true)的节点才能成为主节点。(2)最小主节点数(min_master_nodes)的目的是防止脑裂。核对了一下代码,核心入口为 findMaster,选择主节点成功返回对应 Master,否则返回 null。选举流程大致描述如下:第一步:确认候选主节点数达标,elasticsearch.yml 设置的值discovery.zen.minimum_master_nodes;第二步:比较:先判定是否具备 master 资格,具备候选主节点资格的优先返回;若两节点都为候选主节点,则 id 小的值会主节点。注意这里的 id 为 string 类型。题外话:获取节点 id 的方法。

1
2
3
复制代码1GET /_cat/nodes?v&h=ip,port,heapPercent,heapMax,id,name

2ip port heapPercent heapMax id name

5、详细描述一下 Elasticsearch 索引文档的过程

面试官:想了解 ES 的底层原理,不再只关注业务层面了。解答:这里的索引文档应该理解为文档写入 ES,创建索引的过程。文档写入包含:单文档写入和批量 bulk 写入,这里只解释一下:单文档写入流程。记住官方文档中的这个图。第一步:客户写集群某节点写入数据,发送请求。(如果没有指定路由/协调节点,请求的节点扮演路由节点的角色。)第二步:节点 1 接受到请求后,使用文档_id 来确定文档属于分片 0。请求会被转到另外的节点,假定节点 3。因此分片 0 的主分片分配到节点 3 上。第三步:节点 3 在主分片上执行写操作,如果成功,则将请求并行转发到节点 1和节点 2 的副本分片上,等待结果返回。所有的副本分片都报告成功,节点 3 将向协调节点(节点 1)报告成功,节点 1 向请求客户端报告写入成功。如果面试官再问:第二步中的文档获取分片的过程?回答:借助路由算法获取,路由算法就是根据路由和文档 id 计算目标的分片 id 的过程。

1
复制代码1shard = hash(_routing) % (num_of_primary_shards)

6、详细描述一下 Elasticsearch 搜索的过程?

面试官:想了解 ES 搜索的底层原理,不再只关注业务层面了。解答:搜索拆解为“query then fetch” 两个阶段。query 阶段的目的:定位到位置,但不取。步骤拆解如下:(1)假设一个索引数据有 5 主+1 副本 共 10 分片,一次请求会命中(主或者副本分片中)的一个。(2)每个分片在本地进行查询,结果返回到本地有序的优先队列中。(3)第 2)步骤的结果发送到协调节点,协调节点产生一个全局的排序列表。fetch 阶段的目的:取数据。路由节点获取所有文档,返回给客户端。7、Elasticsearch 在部署时,对 Linux 的设置有哪些优化方法

面试官:想了解对 ES 集群的运维能力。解答:(1)关闭缓存 swap;(2)堆内存设置为:Min(节点内存/2, 32GB);(3)设置最大文件句柄数;(4)线程池+队列大小根据业务需要做调整;(5)磁盘存储 raid 方式——存储有条件使用 RAID10,增加单节点性能以及避免单节点存储故障。
8、lucence 内部结构是什么?


面试官:想了解你的知识面的广度和深度。解答:Lucene 是有索引和搜索的两个过程,包含索引创建,索引,搜索三个要点。可以基于这个脉络展开一些。
9、Elasticsearch 是如何实现 Master 选举的?


(1)Elasticsearch 的选主是 ZenDiscovery 模块负责的,主要包含 Ping(节点之间通过这个 RPC 来发现彼此)和 Unicast(单播模块包含一个主机列表以控制哪些节点需要 ping 通)这两部分;(2)对所有可以成为 master 的节点(node.master: true)根据 nodeId 字典排序,每次选举每个节点都把自己所知道节点排一次序,然后选出第一个(第 0 位)节点,暂且认为它是 master 节点。(3)如果对某个节点的投票数达到一定的值(可以成为 master 节点数 n/2+1)并且该节点自己也选举自己,那这个节点就是 master。否则重新选举一直到满足上述条件。(4)补充:master 节点的职责主要包括集群、节点和索引的管理,不负责文档级别的管理;data 节点可以关闭 http 功能*。10、Elasticsearch 中的节点(比如共 20 个),其中的 10 个

选了一个 master,另外 10 个选了另一个 master,怎么办?(1)当集群 master 候选数量不小于 3 个时,可以通过设置最少投票通过数量(discovery.zen.minimum_master_nodes)超过所有候选节点一半以上来解决脑裂问题;(3)当候选数量为两个时,只能修改为唯一的一个 master 候选,其他作为 data节点,避免脑裂问题。11、客户端在和集群连接时,如何选择特定的节点执行请求的?

TransportClient 利用 transport 模块远程连接一个 elasticsearch 集群。它并不加入到集群中,只是简单的获得一个或者多个初始化的 transport 地址,并以 轮询 的方式与这些地址进行通信。12、详细描述一下 Elasticsearch 索引文档的过程。

协调节点默认使用文档 ID 参与计算(也支持通过 routing),以便为路由提供合适的分片。

1
复制代码shard = hash(document_id) % (num_of_primary_shards)

(1)当分片所在的节点接收到来自协调节点的请求后,会将请求写入到 MemoryBuffer,然后定时(默认是每隔 1 秒)写入到 Filesystem Cache,这个从 MomeryBuffer 到 Filesystem Cache 的过程就叫做 refresh;(2)当然在某些情况下,存在 Momery Buffer 和 Filesystem Cache 的数据可能会丢失,ES 是通过 translog 的机制来保证数据的可靠性的。其实现机制是接收到请求后,同时也会写入到 translog 中 ,当 Filesystem cache 中的数据写入到磁盘中时,才会清除掉,这个过程叫做 flush;(3)在 flush 过程中,内存中的缓冲将被清除,内容被写入一个新段,段的 fsync将创建一个新的提交点,并将内容刷新到磁盘,旧的 translog 将被删除并开始一个新的 translog。(4)flush 触发的时机是定时触发(默认 30 分钟)或者 translog 变得太大(默认为 512M)时;
补充:关于 Lucene 的 Segement:(1)Lucene 索引是由多个段组成,段本身是一个功能齐全的倒排索引。(2)段是不可变的,允许 Lucene 将新的文档增量地添加到索引中,而不用从头重建索引。(3)对于每一个搜索请求而言,索引中的所有段都会被搜索,并且每个段会消耗CPU 的时钟周、文件句柄和内存。这意味着段的数量越多,搜索性能会越低。(4)为了解决这个问题,Elasticsearch 会合并小段到一个较大的段,提交新的合并段到磁盘,并删除那些旧的小段。最后
–

欢迎大家关注我的公众号【程序员追风】,2019年多家公司java面试题整理了1000多道400多页pdf文档,文章都会在里面更新,整理的资料也会放在里面。

喜欢文章记得关注我点个赞哟,感谢支持!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文带你看懂JAVA IO流(一),史上最全面的IO教学啦(

发表于 2019-12-24

一、IO流是什么

惯例引用百科的回答

流是一种抽象概念,它代表了数据的无结构化传递。按照流的方式进行输入输出,数据被当成无结构的字节序或字符序列。从流中取得数据的操作称为提取操作,而向流中添加数据的操作称为插入操作。用来进行输入输出操作的流就称为IO流。换句话说,IO流就是以流的方式进行输入输出 [1] .

我对IO流的理解就是”你的程序和系统之间读写文件的操作就是IO操作,和系统之间读写用的东西就是IO流”。

JAVA IO流就是JAVA程序和操作系统之间通信用的方法。

二、JAVA IO系统脑图

给大家看下JAVA IO的脑图

自己画的JAVA的脑图,如果有需要原文件的去我公众号,发送:JAVA IO,就可以得到脑图的原文件了,带黄色文件夹标注的是我自己写的注释嗷。

什么你还不知道我公众号,微信搜索,千珏(jue),就可以关注到我了。

三、JAVA IO流详解

在这里由于篇幅的原因我只讲解JAVA IO流中的字节流和字符流,别的就等以后再写,比如:NIO AIO BIO这些,以后有时间抽出时间出来写一篇,要是想看的记得点个关注哦。

下面进入正题:

3.1 字节流和字符流的区别

字节流和字符流操作的本质区别只有一个:字节流是原生的操作,字符流是经过处理后的操作。

画个图,字节流在操作时不会用到缓冲区,也就是不会用到内存,文件本身直接操作的,而字符流在操作时使用了缓冲区,通过缓冲区再操作文件,看下图:

为什么要有字符流而不直接用字节流呢?

我相信有些读者心里肯定要问这个问题,我刚开始学习的时候也想过这个问题,为什么不直接用字节流解决呢,还非要搞个字符流出来呢。

我的理解就是字节流处理多个字节表示的东西的时候有可能会出现乱码的问题,比如汉字,用字节流读取的时候有可能因为一位字节没有读到就变成了乱码,字符流呢就完美解决了这个问题,字符流你们可以这样理解,字节流和编码表的组合就是字符流。因为有了编码表所以可以确定这个汉字有多少个字节,这样字节流就可以根据位数准确的读写汉字了。

以上纯为个人理解,如有不对的地方请在评论区给我留言哦。

3.2 字节流

字节流顾名思义就是通过字节直接操作字符,更底层一些。

字节流最基础的两个类就是 InputStream和 OutputStream ,根据这两个派生而来类都含有 read()和 write() 的基本方法,用于读写单个字节或者字节数组。

3.2.1 InputStream 和 OutputStream类

InputStream类是一个抽象类 ,是所有字节输入流类的父类。

OutputStream类是一个抽象类,是所有字节输出流的父类

InputStream的常见子类有:

  • FileInputStream:看这个名字就知道用于从文件中读取信息。
  • ByteArrayInputStream: 字节数组输入流,
  • ObjectInputStream:序列化时使用 一般和ObjectOutputStream一起使用
  • FilterInputStream: 过滤输入流,为基础的输入流提供一些额外的操作。

OutputStream的常见子类有:

  • FileOutPutStream: 文件输出流对文件进行操作
  • ByteArrayOutputStream: 字节数组输出流
  • ObjectOutputStream: 序列化时使用 一般和OjbectInputStream一起使用
  • FilterOutputStream:过滤输出流,为基础的输出流提供一些额外的操作。

我们一个一个过要不然怎么能叫一文带你看懂JAVA IO流了呢,那样不就是标题党了吗[滑稽]。

3.2.1.1 FileInputStream 和 FileOutPutStream类
1) FileInputStream 和 FileOutPutStream概念

FileInputStream是文件字节输入流,就是对文件数据以字节的方式来处理,如音乐、视频、图片等。

FileOutPutStream是文件字节输出流,

2)FileInputStream里面的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
复制代码//通过文件的名字来创建一个对象
public FileInputStream(String name) throws FileNotFoundException{}
//通过File对象来创建一个对象
public FileInputStream(File file) throws FileNotFoundException{}
/**
* 通过FileDescriptor来创建一个对象
* FileDescriptor是一个文件描述符号
* 有in,out,err三种类型
* in:标准输入描述符,out:标准输出的描述符,err:标准错误输出的描述号
*/
public FileInputStream(FileDescriptor fdObj){}
//打开指定的文件进行读取 ,是java和c之间进行操作的api 我们并不会用到
private native void open0(String name){}
//打开指定的文件进行读取,我们并不会用到 因为在构造方法里面帮我们打开了这个文件
private void open(String name){}
//从输入流中读取一个字节的数据,如果到达文件的末尾则返回-1
public int read() throws IOException{}
//读取一个字节数组
private native int readBytes(byte b[], int off, int len) throws IOException;
private native int read0() throws IOException;
//从输入流中读取b.length的数据到b中
public int read(byte b[]) throws IOException{}
//从输入流中读取off到len之间的数据到b中
public int read(byte b[], int off, int len) throws IOException{}
//跳过并丢弃输入流中的n个数据
public long skip(long n) throws IOException{}
private native long skip0(long n) throws IOException;
//可以从此输入流中读取的剩余字节数
public int available() throws IOException {}
private native int available0() throws IOException;
//关闭此文件输入流并释放与该流关联的所有系统资源
public void close() throws IOException {}
//返回FileDescriptor对象
public final FileDescriptor getFD() throws IOException{}
//该方法返回与此文件输入流关联的通道 NIO中会用到 本文不会提及
public FileChannel getChannel(){}
private static native void initIDs();
private native void close0() throws IOException;
//没有更多引用时,调用此方法来关闭输入流 一般不使用
protected void finalize() throws IOException {}

由于篇幅起见FileOutputStream代码里面的方法我就不仔细的带你们看了(我不会说我是因为懒才不带你们看的,溜。

一般常用的方法就几个,举个例子,往D盘下面hello.txt里面输入“hello world”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
复制代码public class Test {
public static void main(String []args) throws IOException {
//根据文件夹的名字来创建对象
FileOutputStream fileOutputStream = new FileOutputStream("D:\\hello.txt");
//往文件里面一个字节一个字节的写入数据
fileOutputStream.write((int)'h');
fileOutputStream.write((int)'e');
fileOutputStream.write((int)'l');
fileOutputStream.write((int)'l');
fileOutputStream.write((int)'o');
String s = " world";
//入文件里面一个字节数组的写入文件
fileOutputStream.write(s.getBytes());
fileOutputStream.close();
//传文件夹的名字来创建对象
FileInputStream fileInputStream = new FileInputStream("D:\\hello.txt");
int by = 0;
//一个字节一个字节的读出数据
while((by = fileInputStream.read()) != -1){
System.out.println((char)by);
}
//关闭流
fileInputStream.close();
//通过File对象来创建对象
fileInputStream = new FileInputStream("new File("D:\\hello.txt")");
byte []bytes = new byte[10];
//一个字节数组的读出数据
while ((by = fileInputStream.read(bytes)) != -1){
for(int i = 0; i< by ; i++){
System.out.print((char) bytes[i]);
}
}
//关闭流
fileInputStream.close();
}
}

常用的就上述代码里面的三种方法。

3.2.1.2 ByteArrayInputStream和ByteArrayOutputStream
1)ByteArrayInputStream和ByteArrayOutputStream概念

ByteArrayInputStream是字节数组输入流,它里面包含一个内部的缓冲区(就是一个字节数组 ),该缓冲区含有从流中读取的字节。

ByteArrayOutputStream是字节数组输出流

2)ByteArrayInputStream里面的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码//通过byte数组来创建对象
public ByteArrayInputStream(byte buf[]) {}
//通过byte数组,并给定开始下标和结束下标来创建对象
public ByteArrayInputStream(byte buf[], int offset, int length){}
//从这个输入流读取下一个字节 末尾会返回
public synchronized int read(){}
//从输入流中读取off到len之间的数据到b中
public synchronized int read(byte b[], int off, int len){}
//跳过并丢弃输入流中的n个数据
public synchronized long skip(long n){}
//可以从此输入流中读取的剩余字节数
public synchronized int available(){}
//判断这个输入流是否支持标记,他一直返回true
public boolean markSupported(){}
//将mark的值设置为当前读取的下标,readAheadLimit这个参数没有意义,因为没用到
public void mark(int readAheadLimit){}
//将当前的下标设置为mark一般和mark()方法一起使用
public synchronized void reset(){}
//关闭这个输入流,因为ByteArrayInputStream操作的是数组所以没有必要关闭流
public void close() throws IOException{}

由于篇幅起见ByteArrayOutputStream代码里面的方法我就不仔细的带你们看了(我不会说我是因为懒才不带你们看的,溜

举个例子,从一个字符串读取数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码public class Test {
public static void main(String[] args) throws IOException {
//创建一个字节输出流对象
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
//一个字节一个字节的写入数据
byteArrayOutputStream.write('h');
byteArrayOutputStream.write('e');
byteArrayOutputStream.write('l');
byteArrayOutputStream.write('l');
byteArrayOutputStream.write('o');
//一个字节数组的写入数据
byteArrayOutputStream.write(" world".getBytes());
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray();
//从这个流中读取数据
int b = 0;
//从这个流中一个字节一个字节的读数据
while ((b = byteArrayInputStream.read()) != -1) {
System.out.println((char) b);
}
byteArrayInputStream = new ByteArrayInputStream(bytes);
byte[] bs = new byte[10];
//从这个流中一次性读取bs.length的数据
while ((b = byteArrayInputStream.read(bs)) != -1) {
for (int i = 0; i < b; i++) {
System.out.print((char) bs[i]);
}
System.out.println();
}
}
}

如上代码所示,我平时常用的也就这几个方法。

3.2.1.3 ObjectInputStream 和ObjectOutpuStream
1)概念

ObjectInputStream是反序列化流,一般和ObjectOutputStream配合使用。

用ObjectOutputStream将java对象序列化然后存入文件中,然后用ObjectInputStream读取出来

这个类的作用,我的理解是有些类在这个程序生命周期结束后,还会被用到所以要序列化保存起来

2)ObjectInputStream 和 ObjectOutpuStream 基本方法

常用的其实就两个方法

1
2
复制代码public final Object readObject(){}
public final void writeObject(Object obj) throws IOException{}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码 class Data implements Serializable {
private int n;
public Data(int n){
this.n=n;
}
@Override
public String toString(){
return Integer.toString(n);
}
}
public class Test {
public static void main(String[] args) throws IOException, ClassNotFoundException {
Data w=new Data(2);
ObjectOutputStream out=new ObjectOutputStream(new FileOutputStream("worm.out"));
//序列化对象,把对象写到worm.out里面
out.writeObject("Worm storage\n");
//序列化对象,把对象写到worm.out里面
out.writeObject(w);
out.close();
//从worm.out里面读取对象
ObjectInputStream in=new ObjectInputStream(new FileInputStream("worm.out"));
//读取String对象
String s=(String)in.readObject();
//读取Data对象
Data d=(Data)in.readObject();
System.out.println(s+"Data = "+d);
}
}
3.2.1.4 FilterInputStream 和 FilterOutputStream
1) 概念

FilterInputStream和FilteOutputStream分别是过滤输入流和过滤输出流,他们的作用是为基础流提供一些额外的功能

2)FilterInputStream 和 FilterOutputStream的常用子类

FilterInputStream常用子类

  • DataInputStream:可以从流中读取基本数据类型,与DataOutpuStream配合一起使用
  • BufferedInputStream:可以从缓冲区中读取数据,不用每次和文件的操作都进行实际操作了。

FilterOutputStream常用子类

  • DataOutputStream:可以向文件中写入基本类型的数据
  • PrintStream:用于产生格式化的输出
  • BufferedOutputStream:通过缓冲区像文件中写入数据。

DataInputStream基本类型写入方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码// 将一个 byte 值写入流中。
void writeByte(int v)
// 将一个 short 值写入流中
void writeShort(int v)
//将一个 int 值写入流中
void writeInt(int v)
// 将一个 long 值写入流中
void writeLong(long v)
//使用 Float 类中的 floatToIntBits 方法将 float 参数转换为一个 int 值,然后该int值写入流中
void writeFloat(float v)
//使用Double 类中的 doubleToLongBits 方法将 double 参数转换为一个 long 值,然后将该long写入到流中。
void writeDouble(double v)
//写入一个char
void writeChar(int v)
//将一个 boolean 值写入流。
void writeBoolean(boolean v)
//将字节数组写入流中
void write(byte[] b, int off, int len)
//将字符串写出到基础输出流中。
oid writeBytes(String s)
//采用UTF-16be方式写入,也就是java字符串的编码
void writeChars(String s)
// 以utf-8形式写入一个字符串
void writeUTF(String str)
//清空此数据输出流,写入文件
void flush()

测试一下方法试试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
复制代码public class Test {
public static void main(String[] args) throws IOException, ClassNotFoundException {
DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream("D:\\hello.txt"));
// 写入byte类型数据
dataOutputStream.writeByte(20);
// 写入short类型数据
dataOutputStream.writeShort(30);
// 写入int类型
dataOutputStream.writeInt(900);
// 写入float类型
dataOutputStream.writeFloat(12.3f);
// 写入long类型
dataOutputStream.writeLong(800L);
// 写入double类型
dataOutputStream.writeDouble(14.23);
//写入boolean类型
dataOutputStream.writeBoolean(true);
// 写入char类型
dataOutputStream.writeChar('中');
dataOutputStream.close();
DataInputStream dataInputStream = new DataInputStream(new FileInputStream("D:\\hello.txt"));
System.out.println(dataInputStream.readByte());
System.out.println(dataInputStream.readShort());
System.out.println(dataInputStream.readInt());
System.out.println(dataInputStream.readFloat());
System.out.println(dataInputStream.readLong());
System.out.println(dataInputStream.readDouble());
System.out.println(dataInputStream.readBoolean());
System.out.println(dataInputStream.readChar());

dataInputStream.close();
//创建一个对象
PrintStream printStream = new PrintStream("D:\\hello.txt");
//写入一个字节数组
printStream.write("helloworld".getBytes());
//写入一个换行符号
printStream.println();
//格式化写入数据
printStream.format("文件名称:%s","hello.txt");
printStream.println();
printStream.append("abcde" );
printStream.close();
}
}

BufferedInputStream和BufferedOutputStream我另开一篇文章写,里面要介绍的东西很多,一篇文章介绍不完。

emmm,还有字符流下篇文章写,今天是完不成了,觉得这篇文章还可以想看下一篇文章的关注我呀,或者可以关注我公众号:千珏呀,后台留言给我呀,是千珏(jue),不是千钰。

3.3 字符流

3.3.1 Reader类和wirter类

字符流:就是在字节流的基础上,加上编码,形成的数据流

字符流出现的意义:因为字节流在操作字符时,可能会有中文导致的乱码,所以由字节流引申出了字符流。

字符流最基础的两个类就是 Reader和 wirter,根据这两个派生而来类都含有read()和write()` 的基本方法。

处理图片、视频、音乐的时候还是用字节流吧,处理文本文件的时候用字符流会好很多。

Reader类常见子类有:

  • FileReader:文件输入流
  • BufferedReader: 带缓冲区的字符输入流

Writer类常见子类有:

  • FileWriter:文件输出流
  • BufferedWriter:带缓冲区的字符输出流

3.3.1.1 FileReader类和FileWriter类

直接看实例吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码public class Test {

public static void main(String[] args) throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException {
FileWriter fileWriter = new FileWriter("D:\\hello.txt");
//向文件里面写入文件
fileWriter.write("123");
//向文件里面写入文件,和writer的区别就是append返回的是FileWriter对象,而write没有返回值
fileWriter.append("hello world");
fileWriter.append("中");
//把流中的数据刷新到文件中,还能继续使用
// 如果没有刷新,也没有关闭流的话 数据是不会写入文件的
fileWriter.flush();
//关闭流
fileWriter.close();
FileReader fileReader = new FileReader("D:\\hello.txt");
int len = 0;
while ((len = fileReader.read()) != -1) {
System.out.println((char) len);
}
//用char数组读数据。
char[] chars = new char[1024];
while ((len = fileReader.read(chars)) != -1) {
System.out.println(chars);
}
fileReader.close();
}
}

FilerWriter 和FileReader 没啥好讲的,具体常用的方法也就上面的,具体使用我也写在注释上面了。

3.3.1.2 BufferedReader类和BufferedWriter类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码public class Test {

public static void main(String[] args) throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException {
//从控制台得到输入流
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
//创建文件
BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter("D:\\hello.txt"));
String input = null;
while(!(input = bufferedReader.readLine()).equals("exit")){
//将从控制台得到的数据写入文件
bufferedWriter.write(input);
//写入一个当前系统下的空行
bufferedWriter.newLine();
}
bufferedWriter.close();
bufferedReader.close();

}
}

写下面的程序的时候还有一点小插曲,我调用close方法关程序的时候,我调用了两次bufferedReader.close();这就会导致我的文件里面没有数据,痛哭,加了flush发现之后发现文件里面又有数据了,这个时候突然间有个疑问,难道是BufferedWriter的close()方法里面没有调用flush()吗,要自己手动的调用 flush(),淦,那也太蠢了吧,于是去源码里面看了一眼。

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码public void close() throws IOException {
synchronized (lock) {
if (out == null) {
return;
}
try (Writer w = out) {
flushBuffer();
} finally {
out = null;
cb = null;
}
}
}

源码里面是有flush()操作的呀,为什么我的程序出问题,一顿检查之后 发现文末写了两个bufferedReader的关闭方法,好吧为自己的愚蠢浪费了十分钟的时间QAQ。

四、总结

一般大佬都有总结 ,我也写个总结吧,也算是跟个风。

总的来说这篇文章写的很差,本来想带你们详细看下字节流和字符流的,结果因为年末的时候事情太多,导致我没有办法完成这一事情,只能简略的带你们看看io是怎么用的,并没有看io流为什么能这样用,没有看io流的实现,本质是想一个源码一个一个源码的过一边,一是因为时间不够,二是因为篇幅太长太枯燥,怕你们也看不下去,只能等以后出单章一个类一个类的源码解析看看,如果你们想看的话,给我留言吧。

这次不是太满意 ,希望下篇文章可以写的满意些吧。
各位看官觉得写得还可以的,点个关注吧。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

灵魂拷问:Java如何获取数组和字符串的长度?length还

发表于 2019-12-24

限时 1 秒钟给出答案,来来来,听我口令:“Java 如何获取数组和字符串的长度?length 还是 length()?”

在逛 programcreek 的时候,我发现了上面这个主题。说实话,我当时脑海中浮现出了这样一副惊心动魄的画面:

面试官老马坐在我的对面,地中海式的发型令我敬佩有加。尽管略显疲惫,但他仍然自信地向我抛出了上面这个问题。稍稍迟疑了一下,我回答说:“数组用 length,字符串用 length 跟上小括号”。老马不愧是面试中的高手,一瞬间就从我的回答中捕获到了不自信。我能感受得出来,因为我看到老马的嘴角微微地动了一下,似乎想要咂咂嘴。但出于对于我的礼貌,他克制住了。

到底该用 length 还是 length(),说真的,我当时真有点吃不准,怀念 IDE 的代码自动提醒功能啊!

1
2
3
4
5
java复制代码int[] arr = new int[4];  
System.out.println(arr.length);// 获取数组的长度

String str = "沉默王二";
System.out.println(str.length());// 获取字符串的长度

按理说,数组和字符串都是对象,访问长度都用 length() 方法就好了。为什么数组偏偏剑走偏锋用的 length 字段呢?

首先呢,我们必须要明白:数组是一个容器,当它被创建后,不仅元素的类型是确定的,元素的个数也是确定的。换句话说,数组的长度是确定的,不可能再变长或者变短。因此,数组可以使用一个字段(length)来表示长度。

创建数组的方法有两种,这个应该大家都知道了。一种是通过 new 关键字创建指定长度后再赋值,另外一种是通过 {} 直接进行初始化。

1
2
3
4
5
6
7
8
9
java复制代码// new  
int[] arr = new int[4];
arr[0] = 0;
arr[1] = 1;
arr[2] = 2;
arr[3] = 3;

// {}
int [] arr1 = {0, 1, 2, 3};

但不管用哪种方法,数组的长度是可以明确知道的。并且不会再变长或者变短(学不了孙悟空的金箍棒)。

由于数组也是对象,所以以下代码是合法的。

1
java复制代码Object arr2 = new int[4];

这就意味着数组继承了超类 java.lang.Object 的所有成员方法和字段。事实上,的确如此,我们可以通过以下代码来获取数组的类型信息 Class。

1
2
3
4
5
java复制代码Object arr2 = new int[4];  
System.out.println(arr2.getClass());

Object arr3 = new String[4];
System.out.println(arr3.getClass());

输出的结果会是什么呢?

1
2
复制代码class [I  
class [Ljava.lang.String;

class [I 表示一个“int 类型数组”在运行时的对象类型信息;class [Ljava.lang.String; 表示一个“字符串类型数组”在运行时的对象类型信息。

那为什么数组不单独定义一个类来表示呢?就像字符串 String 类那样呢?

一个合理的解释是 Java 将其隐藏了。假如真的存在一个 Array.java,我们也可以假想它真实的样子,它必须要定义一个容器来存放数组的元素,就像 String 类那样。

1
2
3
4
5
java复制代码public final class String  
    implements java.io.Serializable, Comparable<String>, CharSequence {
    /** The value is used for character storage. */
    private final char value[];
}

但这样做真的有必要吗?为数组单独定义一个类,是不是有点画蛇添足的意味。那既然数组没必要定义成一个类,也就没有必要再定义一个 length() 方法来获取数组的长度了,直接用 length 这个字段就可以了,不是吗?

那为什么字符串 String 类会有 length() 方法呢?来看一下源码就明白了。

1
2
3
4
5
6
7
8
java复制代码    /**  
     * Returns the length of this string.
     * The length is equal to the number of Unicode
     * code units in the string.
     */
    public int length() {
        return value.length;
    }

length() 方法返回的正是字符数组 value 的长度(length),value 本身是 private 的,因此很有必要为 String 类提供一个 public 级别的方法来供外部访问字符的长度。

总结一下,Java 获取数组长度的时候用 length,获取字符串长度的时候用的是 length(),他们之间的区别我相信大家已经搞清楚了。

最后提醒一点:万丈高楼平地起。一栋楼能盖多高,一座大桥能造多长,重要的是它们的地基。同样对于我们技术人员来说,基础知识越扎实,走得就会越远。


好了各位读者朋友们,以上就是本文的全部内容了。能看到这里的都是最优秀的程序员,二哥必须要伸出可爱的大拇指为你点个赞👍。如果觉得不过瘾,还想看到更多,我再推荐几篇给大家。

灵魂拷问:为什么 Java 字符串是不可变的?

灵魂拷问:创建 Java 字符串,用””还是构造函数

灵魂拷问:如何检查Java数组中是否包含某个值 ?

灵魂拷问:Java 的 substring() 是如何工作的?

养成好习惯!如果是二哥的铁杆读者的话,请不要吝啬你的鼠标左键,点赞就对了,来年升职加薪就是你的了!如果想第一时间看到二哥的原创文章,扫下面这个码就对了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis 数据结构+线程模型+持久化+内存淘汰+分布式 简

发表于 2019-12-24

简介

Redis 是一个开源的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询等。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。

为什么使用缓存服务器

缓存是高并发场景下提高热点数据访问性能的一个有效手段,在开发项目时会经常使用到。

缓存的类型分为:本地缓存、分布式缓存和多级缓存。

本地缓存 就是在进程的内存中进行缓存,比如我们的 JVM 堆中,本地缓存是内存访问,没有远程交互开销,性能最好,但是受限于单机容量,一般缓存较小且无法扩展。

分布式缓存一般都具有良好的水平扩展能力,对较大数据量的场景也能应付自如。缺点就是需要进行远程请求,性能不如本地缓存。

为了平衡这种情况,实际业务中一般采用多级缓存,本地缓存只保存访问频率最高的部分热点数据,其他的热点数据放在分布式缓存中。

Redis & Memcached

  • 数据结构:Memcached只支持简单的key/value数据结构,不像Redis可以支持丰富的数据类型。
  • 持久化:Memcached无法进行持久化,数据不能备份,只能用于缓存使用,且重启后数据全部丢失。
  • 多线程:Redis 使用单线程反而避免了多线程的频繁上下文切换问题,预防了多线程可能产生的竞争问题。

Memcache 存在着支持并发性不好、可运维性欠佳、原子性操作不够、在误操作时产生数据不一致等问题。

由于 Redis 只使用单核,而 Memcached 可以使用多核,所以 Redis 在存储小数据时比 Memcached 性能更高。而在 100k 以上的数据中,Memcached 性能要高于 Redis,虽然 Redis 最近也在存储大数据的性能上进行优化,但是比起 Memcached,还是稍有逊色。

  • 分布式:Redis原生支持集群模式,Memcached没有原生的集群模式。

Redis 支持通过Replication进行数据复制,通过master-slave机制,可以实时进行数据的同步复制,支持多级复制和增量复制,master-slave机制是Redis进行HA的重要手段。

线程模型

在这里插入图片描述

Redis 内部使用文件事件处理器 file event handler,这个文件事件处理器是单线程的,所以 Redis 才叫做单线程的模型。它采用 IO 多路复用机制同时监听多个 Socket,根据 Socket 上的事件来选择对应的事件处理器进行处理。
文件事件处理器的结构包含 4 个部分:

  • 多个 Socket
  • IO 多路复用程序
  • 文件事件分派器
  • 事件处理器(连接应答处理器、命令请求处理器、命令回复处理器)

多个 Socket 可能会并发产生不同的操作,每个操作对应不同的文件事件,但是 IO 多路复用程序会监听多个 Socket,会将 Socket 产生的事件放入队列中排队,事件分派器每次从队列中取出一个事件,把该事件交给对应的事件处理器进行处理。

为什么Redis单线程也能效率这么高?

  • 完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。它的数据存在内存中,类似于HashMap,HashMap 的优势就是查找和操作的时间复杂度都是O(1);
  • 采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;
  • 使用多路I/O复用模型,非阻塞IO;
  • Redis 直接自己构建了 VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求;

数据结构

基本:String、Hash、List、Set、SortedSet

进阶:HyperLogLog、Geo、Pub/Sub、bitmaps

高级:BloomFilter

基本

String:存储简单的对象序列化字符串,应用场景:缓存热点数据、计数器、会话token

Hash:保存无嵌套的对象属性

List:列表,应用场景:简单消息队列、分页

Set:无序集合,自动去重,应用场景:共同好友

Sorted Set:有序集合,自动去重,应用场景:排行榜、微博热搜

进阶

Geo:可以用来保存地理位置,并作位置距离计算或者根据半径计算位置等。 应用场景:附近的人

Pub/Sub:订阅/发布,应用场景:简单消息队列

HyperLogLog:用来做基数统计的算法 ,比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数统计就是在误差可接受的范围内,快速计算基数。 应用场景:日活跃用户

Bitmap:支持按bit位来存储信息,等同于byte数组,计数效率高,应用场景:日活跃用户、布隆过滤器

在这里插入图片描述

ps:HyperLogLog只需要用12K内存就可以统计2^64个不同元素的基数
​ bitmaps存储一亿用户需要12.5M内存

高级

BloomFilter

布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。 存在误判,可能要查到的元素并没有在容器中,但是hash之后得到的k个位置上值都是1。如果bloom filter中存储的是黑名单,那么可以通过建立一个白名单来存储可能会误判的元素。删除困难。一个放入容器的元素映射到bit数组的k个位置上是1,删除的时候不能简单的直接置为0,可能会影响其他元素的判断。

布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

Bloom Filter跟单哈希函数Bit-Map不同之处在于:Bloom Filter使用了k个哈希函数,每个字符串跟k个bit对应。从而降低了冲突的概率。

在这里插入图片描述

应用场景:允许一定误差的大数据去重(举个栗子:黑名单、推荐和浏览历史去重)
底层数据结构


SDS

Redis 用SDS(Simple Dynamic String)来保存字符串,SDS还被用作缓冲区(buffer)AOF模块中的AOF缓冲区。

1
2
3
4
5
6
7
8
9
10
复制代码struct sdshdr {  
// buf 中已占用空间的长度
int len;

// buf 中剩余可用空间的长度
int free;

// 数据空间
char buf[];
};

使用SDS的好处:

  • 便于获取字符串长度
  • 杜绝缓冲区溢出
  • 减少修改字符串时带来的内存重分配次数

链表

List的底层实现之一就是链表

1
2
3
4
5
复制代码typedef struct listNode{
struct listNode *prev;
struct listNode * next;
void * value;
}

在这里插入图片描述

  • 双端:链表节点带有prev 和next 指针,获取某个节点的前置节点和后置节点的时间复杂度都是O(N)
  • 无环:表头节点的 prev 指针和表尾节点的next 都指向NULL,对立案表的访问时以NULL为截止
  • 表头和表尾:因为链表带有head指针和tail 指针,程序获取链表头结点和尾节点的时间复杂度为O(1)
  • 长度计数器:链表中存有记录链表长度的属性 len

整数集合

整数集合是集合(set)的底层实现之一,当一个集合中只包含整数,且这个集合中的元素数量不多时,redis就会使用整数集合intset作为集合的底层实现。

1
2
3
4
5
6
7
8
复制代码typedef struct intset{
//编码方式
uint32_t enconding;
// 集合包含的元素数量
uint32_t length;
//保存元素的数组
int8_t contents[];
}

整数集合的底层实现为数组,这个数组以有序,无重复的范式保存集合元素,在有需要时,程序会根据新添加的元素类型改变这个数组的类型。

字典

字典,又称为符号表(symbol table)、关联数组(associative array)或映射(map),是一种用于保存键值对的抽象数据结构。类似HashMap,当发生哈希冲突时,采用头插法向单向链表表头插入元素。

在这里插入图片描述

rehash的时候将ht[0]数据重新分配到ht[1]中,将ht[0]释放,将ht[1]设置成ht[0],最后为ht[1]分配一个空白哈希表。

在这里插入图片描述

然而在实际开发过程中,rehash 操作并不是一次性、集中式完成的,而是分多次、渐进式地完成的。
渐进式rehash 的详细步骤:

1、为ht[1] 分配空间,让字典同时持有ht[0]和ht[1]两个哈希表

2、在字典中维持一个索引计数器变量rehashidx,并将它的值设置为0,表示rehash 开始

3、在rehash 进行期间,每次对字典执行CRUD操作时,程序除了执行指定的操作以外,还会将ht[0]中的数据rehash 到ht[1]表中,并且将rehashidx加1

4、当ht[0]中所有数据转移到ht[1]中时,将rehashidx 设置成-1,表示rehash 结束

采用渐进式rehash 的好处在于它采取分而治之的方式,避免了集中式rehash 带来的庞大计算量。

跳表

跳表(skiplist)是一种有序数据结构,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。跳跃表是一种随机化的数据,跳跃表以有序的方式在层次化的链表中保存元素,效率和平衡树媲美 ——查找、删除、添加等操作都可以在对数期望时间下完成,并且比起平衡树来说,跳表的实现要简单直观得多。

Redis 只在两个地方用到了跳表,一个是实现有序集合键,另外一个是在集群节点中用作内部数据结构。

在这里插入图片描述

跳表数据结构其实相当于给原始链表加上多级索引
在这里插入图片描述

ps: 跳表是通过随机函数来维护“平衡性”。当我们往跳表中插入数据的时候,我们可以通过一个随机函数,来决定这个结点插入到哪几级索引层中,比如随机函数生成了值K,那我们就将这个结点添加到第一级到第K级这个K级索引中。
在这里插入图片描述

持久化

RDB做镜像全量持久化,AOF做增量持久化。因为RDB会耗费较长时间,不够实时,在停机的时候会导致大量丢失数据,所以需要AOF来配合使用。

RDB

RDB对Redis的性能影响非常小,是因为在同步数据的时候他只是fork了一个子进程去做持久化的,fork是指redis通过创建子进程来进行RDB操作,cow指的是copy on write,子进程创建后,父子进程共享数据段,父进程继续提供读写服务,写脏的页面数据会逐渐和子进程分离开来。

RDB在数据恢复时比AOF快,因为数据文件小,每条记录只保存了一次,AOF一条记录可能保存多次操作记录。RDB文件的存储格式和Redis数据在内存中的编码格式是一致的,不需要再进行数据编码工作,所以在CPU消耗上要远小于AOF日志的加载。

缺点:快照截屏间隔可能较久,如果采用RDB进行持久化,服务挂掉可能造成更多数据的丢失;在生成快照时如果文件很大可能导致客户端卡顿

SAVE命令由服务器进程直接执行保存操作,会阻塞服务器。BGSAVE命令由子进程执行保存操作,不会阻塞服务器。

服务器状态中会保存所有用save选项设置的保存条件,当任意一个保存条件被满足时,服务器会自动执行BGSAVE命令。

AOF

根据默认配置,RDB 五分钟一次生成快照,但是 AOF 是一秒一次去通过一个后台的线程fsync操作,那最多丢这一秒的数据。

AOF在对日志文件进行操作的时候是以 append-only 的方式去写的,他只是追加的方式写数据,自然就少了很多磁盘寻址的开销了,写入性能惊人,文件也不容易破损。

AOF的日志是通过一个叫 非常可读 的方式记录的,这样的特性就适合做 灾难性数据误删除 的紧急恢复了,比如公司的实习生通过 flushall 清空了所有的数据,只要这个时候后台重写还没发生,你马上拷贝一份 AOF 日志文件,把最后一条 flushall 命令删了就完事了。

缺点:一样的数据,AOF 文件比 RDB 还要大;AOF开启后,Redis支持写的QPS会比RDB支持写的要低,因为每秒异步刷新一次日志

AOF文件通过保存所有修改数据库的写命令请求来记录服务器的数据库状态;命令请求会先保存到AOF缓冲区里面,之后再定期写入并同步到AOF文件。

AOF重写 首先从数据库中读取键现在的值,然后用一条命令去记录键值对,代替之前记录这个键值对的多条命令 ,产生一个新的AOF文件,新AOF文件和原AOF保存的数据库状态一样,但体积更小;

在执行BGREWRITEAOF命令时,Redis会维护一个AOF重写缓冲区,该缓冲区会在子进程创建新的AOF文件期间,记录服务器执行的所有写命令。当子进程完成创建新AOF文件之后,服务器会将重写缓冲区中的所有内容追加到新的AOF文件的末尾。最后,服务器用新的AOF文件替换旧的AOF文件,以此来完成AOF文件重写操作。

配置项

no-appendfsync-on-rewrite

是否在后台写时阻塞,默认值no(表示阻塞写操作)。no表示新的主进程的set操作会被阻塞掉,而yes表示新的主进程的set不会被阻塞,待整个后台写完成之后再将这部分set操作同步到aof文件中。但这可能会存在数据丢失的风险(机率很小),如果对性能有要求,可以设置为yes,仅在后台写时会异步处理命令。

auto-aof-rewrite-percentage

AOF文件的体积比上一次重写之后的增长比例,假设用户对Redis设置了配置选项auto-aof-rewrite-percentage 100,那么当AOF文件的体积比上一次重写之后的文件大小大了至少一倍(100%)的时候,Redis将执行BGREWRITEAOF命令。

auto-aof-rewrite-min-size

触发AOF文件重写的最小的文件大小,即最开始AOF文件必须要达到这个文件大小时才触发重写,后面的每次重写就不会根据这个变量了(根据上一次重写完成之后的大小) 。

内存淘汰

过期策略

Redis键的过期策略,是有定期删除+惰性删除两种。

定期好理解,默认100ms就 随机 抽一些设置了过期时间的key,去检查是否过期,过期了就删了。

惰性删除,查询时再判断是否过期,过期就删除键不返回值。

内存淘汰机制

当新增数据发现内存达到限制时,Redis触发内存淘汰机制。

配置项

maxmemory

配置Redis存储数据时指定限制的内存大小,比如100m。当缓存消耗的内存超过这个数值时, 将触发数据淘汰。该数据配置为0时,表示缓存的数据量没有限制, 即LRU功能不生效。64位的系统默认值为0,32位的系统默认内存限制为3GB。

maxmemory_policy

触发数据淘汰后的淘汰策略

  • no-eviction:当内存限制达到并且客户端尝试执行会让更多内存被使用的命令返回错误(大部分的写入指令,但DEL和几个例外)
  • allkeys-lru: 尝试回收最少使用的键(LRU),使得新添加的数据有空间存放。
  • volatile-lru: 尝试回收最少使用的键(LRU),但仅限于在过期集合的键,使得新添加的数据有空间存放。
  • allkeys-lfu: 尝试回收最近最不常用的键(LFU),使得新添加的数据有空间存放。
  • volatile-lfu: 尝试回收最近最不常用的键(LFU),但仅限于在过期集合的键,使得新添加的数据有空间存放。
  • allkeys-random: 回收随机的键使得新添加的数据有空间存放。
  • volatile-random: 回收随机的键使得新添加的数据有空间存放,但仅限于在过期集合的键。
  • volatile-ttl: 回收在过期集合的键,并且优先回收存活时间(TTL)较短的键,使得新添加的数据有空间存放。

maxmemory_samples

随机采样的精度,也就是随即取出key的数目。该数值配置越大, 越接近于真实的LRU算法,但是数值越大,相应消耗也变高,对性能有一定影响,样本值默认为5。

近似LRU算法

真实LRU算法需要一个双向链表来记录数据的最近被访问顺序,比较耗费内存。

Redis 通过对少量键进行取样,然后回收其中的最久未被访问的键。通过调整每次回收时的采样数量maxmemory-samples,可以实现调整算法的精度。

Redis 的键空间是放在一个哈希表中的,要从所有的键中选出一个最久未被访问的键,需要另外一个数据结构存储这些源信息,这显然不划算。最初,Redis只是随机的选3个key,然后从中淘汰,后来算法改进到N个key的策略,默认是5个。

Redis 3.0之后又改善了算法的性能,会提供一个待淘汰候选key的pool,里面默认有16个key,按照空闲时间排好序。更新时从Redis键空间随机选择N个key,分别计算它们的空闲时间 idle,key只会在pool不满或者空闲时间大于pool里最小的时,才会进入pool,然后从pool中选择空闲时间最大的key淘汰掉。

Redis为什么不使用真实的LRU实现是因为这需要太多的内存。不过近似的LRU算法(approximated LRU)对于应用而言应该是等价的。

在这里插入图片描述

  • 浅灰色带是已经被回收的对象。
  • 灰色带是没有被回收的对象。
  • 绿色带是被添加的对象。
  • 在LRU实现的理论中,我们希望的是,在旧键中的第一半将会过期。Redis的LRU算法则是概率的过期旧的键。

分布式

Redis支持三种分布式部署的方式:主从复制、哨兵模式、集群模式

主从复制

在这里插入图片描述

工作方式

Redis可以使用主从同步,从从同步。第一次同步时,主节点做一次bgsave,并同时将后续修改操作记录到内存buffer,待完成后将RDB文件全量同步到复制节点,复制节点接受完成后将RDB镜像加载到内存。加载完成后,再通知主节点将期间修改的操作记录同步到复制节点进行重放就完成了同步过程。后续的增量数据通过AOF日志同步即可,有点类似数据库的binlog。

优点

可以进行读写分离,分担了主服务器读操作的压力

缺点

Redis不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。

哨兵模式

在这里插入图片描述

当主服务器中断服务后,可以将一个从服务器升级为主服务器,以便继续提供服务,但是这个过程需要人工手动来操作。 为此,Redis 2.8中提供了哨兵工具来实现自动化的系统监控和故障恢复功能。

哨兵的作用就是监控Redis系统的运行状况。它的功能包括以下两个。

(1)监控主服务器和从服务器是否正常运行。
(2)主服务器出现故障时自动将从服务器转换为主服务器。

工作方式

Sentinel是Redis的高可用性(HA)解决方案,由一个或多个Sentinel实例组成的Sentinel系统可以监视任意多个主服务器,以及这些主服务器属下的所有从服务器,并在被监视的主服务器进行下线状态时,自动将下线主服务器属下的某个从服务器升级为新的主服务器,然后由新的主服务器代替已下线的主服务器继续处理命令请求。

Redis提供的sentinel(哨兵)机制,通过sentinel模式启动redis后,自动监控master/slave的运行状态,基本原理是:心跳机制+投票裁决,每个sentinel只有一次选举的机会,当主库出现故障,哨兵会投票从库中选出一个承担主库的任务,剩下的还是从库

优点

  • 可以进行读写分离,分担了主服务器读操作的压力
  • 主从可以自动切换,可用性更高

缺点

  • Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂

集群模式

redis集群是一个由多个主从节点群组成的分布式服务器群,它具有复制、高可用和分片特性。Redis集群不需要sentinel哨兵也能完成节点移除和故障转移的功能。需要将每个节点设置成集群模式,这种集群模式没有中心节点,可水平扩展,据官方文档称可以线性扩展到上万个节点(官方推荐不超过1000个节点)。redis集群的性能和高可用性均优于之前版本的哨兵模式,且集群配置非常简单。

集群模式有以下几个特点:

  • 由多个Redis服务器组成的分布式网络服务集群;
  • 集群之中有多个Master主节点,每一个主节点都可读可写;
  • 节点之间会互相通信,两两相连;
  • Redis集群无中心节点。

优点

  • 在哨兵模式中,仍然只有一个Master节点。当并发写请求较大时,哨兵模式并不能缓解写压力。 我们知道只有主节点才具有写能力,那如果在一个集群中,能够配置多个主节点,缓解写压力,redis-cluster集群模式能达到此类要求。
  • 在Redis-Cluster集群中,可以给每一个主节点添加从节点,主节点和从节点直接遵循主从模型的特性。

当用户需要处理更多读请求的时候,添加从节点可以扩展系统的读性能。

故障转移

Redis集群的主节点内置了类似Redis Sentinel的节点故障检测和自动故障转移功能,当集群中的某个主节点下线时,集群中的其他在线主节点会注意到这一点,并对已下线的主节点进行故障转移。
集群进行故障转移的方法和Redis Sentinel进行故障转移的方法基本一样,不同的是,在集群里面,故障转移是由集群中其他在线的主节点负责进行的,所以集群不必另外使用Redis Sentinel。

集群分片策略

常见的集群分片算法有:一般哈希算法、一致性哈希算法以及Hash Slot算法,Redis采用的是Hash Slot

一般哈希算法

计算方式:hash(key)%N

缺点:如果增加一个redis,映射公式变成了 hash(key)%(N+1)

​ 如果一个redis宕机了,映射公式变成了 hash(key)%(N-1)

​ 在以上两种情况下,几乎所有的缓存都失效了。

一致性哈希算法

先构造出一个长度为2^32整数环,根据节点名称的hash值(分布在[0,2^32-1])放到这个环上。现在要存放资源,根据资源的Key的Hash值(也是分布在[0,2^32-1]),在环上顺时针的找到离它最近的一个节点,就建立了资源和节点的映射关系。

优点:一个节点宕机时,上面的数据转移到顺时针的下一个节点中,新增一个节点时,也只需要将部分数据迁移到这个节点中,对其他节点的影响很小

删除一个节点

在这里插入图片描述

新增节点
在这里插入图片描述

缺点:由于数据在环上分布不均,可能存在某个节点存储的数据比较多,那么当他宕机的时候,会导致大量数据涌入下一个节点中,把另一个节点打挂了,然后所有节点都挂了

改进:引进了虚拟节点的概念,想象在这个环上有很多“虚拟节点”,数据的存储是沿着环的顺时针方向找一个虚拟节点,每个虚拟节点都会关联到一个真实节点

在这里插入图片描述

Hash Slot算法

Redis采用的是Hash Slot分片算法,用来计算key存储位置的。集群将整个数据库分为16384个槽位slot,所有key-value数据都存储在这些slot中的某一个上。一个slot槽位可以存放多个数据,key的槽位计算公式为:slot_number=CRC16(key)%16384,其中CRC16为16位的循环冗余校验和函数。

客户端可能会挑选任意一个redis实例去发送命令,每个redis实例接收到命令,都会计算key对应的hash slot,如果在本地就在本地处理,否则返回moved给客户端,让客户端进行重定向到对应的节点执行命令

注意事项

过期时间的设置

如果大量的key过期时间设置的过于集中,到过期的那个时间点,Redis可能会出现短暂的卡顿现象。严重的话会出现缓存雪崩,我们一般需要在时间上加一个随机值,使得过期时间分散一些。

电商首页经常会使用定时任务刷新缓存,可能大量的数据失效时间都十分集中,如果失效时间一样,又刚好在失效的时间点大量用户涌入,就有可能造成缓存雪崩。

缓存雪崩:大量缓存的key同时失效,同时大批量的请求落到了数据库上,数据库扛不住挂了。(处理方法:失效时间加上一个随机值,避免同时失效)

缓存穿透:用户不断访问缓存和数据库都没有的数据。(处理方式:请求参数校验,将查询不到数据的key放到缓存中,value设为null)

缓存击穿:对于热点数据,如果一直有高并发的请求,刚好缓存失效,这时大量的请求就会落在数据库上(设置热点期间不过期)

keys缺陷

Redis的单线程的。keys指令会导致线程阻塞一段时间,线上服务会停顿,直到指令执行完毕,服务才能恢复。 使用scan指令可以无阻塞的提取出指定模式的key列表,但是会有一定的重复概率,在客户端做一次去重就可以了,但是整体所花费的时间会比直接用keys指令长 。

疑问补充

Redis字典渐进式扩容

虽然redis实现了在读写操作时,辅助服务器进行渐进式rehash操作,但是如果服务器比较空闲,redis数据库将很长时间内都一直使用两个哈希表。所以在redis周期函数中,如果发现有字典正在进行渐进式rehash操作,则会花费1毫秒的时间,帮助一起进行渐进式rehash操作

持久化

Redis持久化的数据库文件即是RDB文件,如果开启了AOF,数据则持久化在AOF文件中

为什么哈希槽是16384个

在这里插入图片描述

1、正常的心跳包携带节点的完整配置,可以用幂等方式替换旧节点以更新旧配置。 这意味着它们包含原始形式的节点的插槽配置,它使用带有16k插槽只需要2k空间,但使用65k插槽时将使用高达8k的空间。
2、同时,由于其他设计权衡,Redis Cluster不太可能扩展到超过1000个主节点。因此,16k处于正确的范围内,以确保每个主站有足够的插槽,最多1000个主节点,但足够小的数字可以轻松地将插槽配置传播为原始位图。
写在最后
====

最近开始写博客文章,朋友们要是觉得写得还可以,拜托大家点个赞让我知道自己没有白白花费时间,你们的赞是对我莫大的鼓舞,大家有什么意见建议也欢迎在评论区共同探讨。

不以浮沙筑高台,与诸君共勉。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

这应该是把计算机网络五层模型讲的最好是文章了,看不懂你打我

发表于 2019-12-23

帅地:用心写好每一篇文章!

前言

天各一方的两台计算机是如何通信的呢?在成千上万的计算机中,为什么一台计算机能够准确着寻找到另外一台计算机,并且把数据发送给它呢?

可能很多人都听说过网络通信的 5 层模型,但是可能并不是很清楚为什么需要五层模型,五层模型负责的任务也有可能经常混淆。下面是网络通信的五层模型

说实话,五层模型的具体内容还是极其复杂的,不过今天这篇文章,我将用最简洁的模式,通过网络通信的五层模型来讲解一台计算机是如何找到另外一台计算机并且把数据发送给另一台计算机的,就算你没学过计算机网络,也能够听的懂。

1. 物理层

一台计算机与另一台计算机要进行通信,第一件要做的事是什么?当然是要把这台计算机与另外的其他计算机连起来啊,这样,我们才能把数据传输过去。例如可以通过光纤啊,电缆啊,双绞线啊等介质把他们连接起来,然后才能进行通信。

也就是说,物理层负责把两台计算机连起来,然后在计算机之间通过高低电频来传送0,1这样的电信号。

2. 数据链路层

前面说了,物理层它只是单纯着负责把计算机连接起来,并且在计算机之间传输0,1这样的电信号。如果这些0,1组合的传送毫无规则的话,计算机是解读不了的。一大堆0,1谁知道是什么鬼啊。

因此,我们需要制定一套规则来进行0,1的传送。例如多少个电信号为一组啊,每一组信号应该如何标识才能让计算机读懂啊等等。

于是,有了以太网协议。

1. 以太网协议

以太网协议规定,一组电信号构成一个数据包,我们把这个数据包称之为帧。每一个桢由标头(Head)和数据(Data)两部分组成。

帧的大小一般为 64 - 1518 个字节。假如需要传送的数据很大的话,就分成多个桢来进行传送。

对于表头和数据这两个部分,他们存放的都是一些什么数据呢?我猜你眯着眼睛都能想到他们应该放什么数据。 毫无疑问,我们至少得知道这个桢是谁发送,发送给谁的等这些信息吧?所以标头部分主要是一些说明数据,例如发送者,接收者等信息。而数据部分则是这个数据包具体的,想给接守者的内容。

大家想一个问题,一个桢的长度是 64~1518 个字节,也就是说桢的长度不是固定的,那你觉得标头部分的字节长度是固定的吗?它当然是固定的啊,假如不是固定的,每个桢都是单独发的,那计算机怎么知道标头是几个字节,数据是几个字节呢。所以标头部分的字节是固定的,并且固定为18个字节。

把一台计算的的数据通过物理层和链路层发送给另一台计算机,究竟是谁发给谁的,计算机与计算机之间如何区分,,你总得给他们一个唯一的标识吧?

于是,MAC 地址出现了。

文章首发于公众号『苦逼的码农』,更多经常文章欢迎搜索关注,已有150多篇原创。

2. MAC 地址

连入网络的每一个计算机都会有网卡接口,每一个网卡都会有一个唯一的地址,这个地址就叫做 MAC 地址。计算机之间的数据传送,就是通过 MAC 地址来唯一寻找、传送的。

MAC地址 由 48 个字节所构成,在网卡生产时就被唯一标识了。

3. 广播与ARP协议

(1). 广播

如图,假如计算机 A 知道了计算机 B 的 MAC 地址,然后计算机 A 想要给计算机 B 传送数据,虽然计算机 A 知道了计算机 B 的 MAC 地址,可是它要怎么给它传送数据呢?计算机 A 不仅连着计算机 B,而且计算机 A 也还连着其他的计算机。 虽然计算机 A 知道计算机 B 的 MAC 地址,可是计算机 A 却不知道知道计算机 B 是分布在哪边路线上,为了解决这个问题,于是,有了广播的出现。

在同一个子网中,计算机 A 要向计算机 B 发送一个数据包,这个数据包会包含接收者的 MAC 地址。当发送时,计算机 A 是通过广播的方式发送的,这时同一个子网中的计算机 C, D 也会收到这个数据包的,然后收到这个数据包的计算机,会把数据包的 MAC 地址取出来,与自身的 MAC 地址对比,如果两者相同,则接受这个数据包,否则就丢弃这个数据包。这种发送方式我们称之为广播,就像我们平时在广场上通过广播的形式呼叫某个人一样,如果这个名字是你,你就理会一下,如果不是你,你就当作听不见。

(2). ARP 协议。

那么问题来了,计算机 A 是如何知道计算机 B 的 MAC 地址的呢?这个时候就得由 ARP 协议这个家伙来解决了,不过 ARP 协议会涉及到IP地址,我们下面才会扯到IP地址。因此我们先放着,就当作是有这么一个 ARP 协议,通过它我们可以知道子网中其他计算机的 MAC 地址。

3. 网络层

上面我们有说到子网这个关键词,实际上我们所处的网络,是由无数个子网络构成的。广播的时候,也只有同一个子网里面的计算机能够收到。

假如没有子网这种划分的话,计算机 A 通过广播的方式发一个数据包给计算机 B , 其他所有计算机也都能收到这个数据包,然后进行对比再舍弃。世界上有那么多它计算机,每一台计算机都能收到其他所有计算机的数据包,那就不得了了。那还不得奔溃。 因此产生了子网这么一个东西。

那么问题来了,我们如何区分哪些 MAC 地址是属于同一个子网的呢?假如是同一个子网,那我们就用广播的形式把数据传送给对方,如果不是同一个子网的,我们就会把数据发给网关,让网关进行转发。

为了解决这个问题,于是,有了 IP 协议。

1. IP协议

IP协议,它所定义的地址,我们称之为IP地址。IP协议有两种版本,一种是 IPv4,另一种是 IPv6。不过我们目前大多数用的还是 IPv4,我们现在也只讨论 IPv4 这个版本的协议。

这个 IP 地址由 32 位的二进制数组成,我们一般把它分成4段的十进制表示,地址范围为0.0.0.0~255.255.255.255。

每一台想要联网的计算机都会有一个IP地址。这个IP地址被分为两部分,前面一部分代表网络部分,后面一部分代表主机部分。并且网络部分和主机部分所占用的二进制位数是不固定的。

假如两台计算机的网络部分是一模一样的,我们就说这两台计算机是处于同一个子网中。例如 192.168.43.1 和 192.168.43.2, 假如这两个 IP 地址的网络部分为 24 位,主机部分为 8 位。那么他们的网络部分都为 192.168.43,所以他们处于同一个子网中。

可是问题来了,你怎么知道网络部分是占几位,主机部分又是占几位呢?也就是说,单单从两台计算机的IP地址,我们是无法判断他们的是否处于同一个子网中的。

这就引申出了另一个关键词————子网掩码。子网掩码和IP地址一样也是 32 位二进制数,不过它的网络部分规定全部为 1,主机部分规定全部为 0.也就是说,假如上面那两个IP地址的网络部分为 24 位,主机部分为 8 位的话,那他们的子网掩码都为 11111111.11111111.11111111.00000000,即255.255.255.0。

那有了子网掩码,如何来判端IP地址是否处于同一个子网中呢。显然,知道了子网掩码,相当于我们知道了网络部分是几位,主机部分是几位。我们只需要把 IP 地址与它的子网掩码做与(and)运算,然后把各自的结果进行比较就行了,如果比较的结果相同,则代表是同一个子网,否则不是同一个子网。

例如,192.168.43.1和192.168.43.2的子码掩码都为255.255.255.0,把IP与子码掩码相与,可以得到他们都为192.168.43.0,进而他们处于同一个子网中。

2. ARP协议

有了上面IP协议的知识,我们回来讲一下ARP协议。

有了两台计算机的IP地址与子网掩码,我们就可以判断出它们是否处于同一个子网之中了。

假如他们处于同一个子网之中,计算机A要给计算机B发送数据时。我们可以通过ARP协议来得到计算机B的MAC地址。

ARP协议也是通过广播的形式给同一个子网中的每台电脑发送一个数据包(当然,这个数据包会包含接收方的IP地址)。对方收到这个数据包之后,会取出IP地址与自身的对比,如果相同,则把自己的MAC地址回复给对方,否则就丢弃这个数据包。这样,计算机A就能知道计算机B的MAC地址了。

可能有人会问,知道了MAC地址之后,发送数据是通过广播的形式发送,询问对方的MAC地址也是通过广播的形式来发送,那其他计算机怎么知道你是要传送数据还是要询问MAC地址呢?其实在询问MAC地址的数据包中,在对方的MAC地址这一栏中,填的是一个特殊的MAC地址,其他计算机看到这个特殊的MAC地址之后,就能知道广播想干嘛了。

假如两台计算机的IP不是处于同一个子网之中,这个时候,我们就会把数据包发送给网关,然后让网关让我们进行转发传送

3. DNS服务器

这里再说一个问题,我们是如何知道对方计算机的IP地址的呢?这个问题可能有人会觉得很白痴,心想,当然是计算机的操作者来进行输入了。这没错,当我们想要访问某个网站的时候,我们可以输入IP来进行访问,但是我相信绝大多数人是输入一个网址域名的,例如访问百度是输入 www.baidu.com 这个域名。其实当我们输入这个域名时,会有一个叫做DNS服务器的家伙来帮我们解析这个域名,然后返回这个域名对应的IP给我们的。

因此,网络层的功能就是让我们在茫茫人海中,能够找到另一台计算机在哪里,是否属于同一个子网等。

文章首发于公众号『苦逼的码农』,更多经常文章欢迎搜索关注,已有150多篇原创。

4. 传输层

通过物理层、数据链路层以及网络层的互相帮助,我们已经把数据成功从计算机A传送到计算机B了,可是,计算机B里面有各种各样的应用程序,计算机该如何知道这些数据是给谁的呢?

这个时候,**端口(Port)**这个家伙就上场了,也就是说,我们在从计算机A传数据给计算表B的时候,还得指定一个端口,以供特定的应用程序来接受处理。

也就是说,传输层的功能就是建立端口到端口的通信。相比网络层的功能是建立主机到主机的通信。

也就是说,只有有了IP和端口,我们才能进行准确着通信。这个时候可能有人会说,我输入IP地址的时候并没有指定一个端口啊。其实呢,对于有些传输协议,已经有设定了一些默认端口了。例如http的传输默认端口是80,这些端口信息也会包含在数据包里的。

传输层最常见的两大协议是 TCP 协议和 UDP 协议,其中 TCP 协议与 UDP 最大的不同就是 TCP 提供可靠的传输,而 UDP 提供的是不可靠传输。

5. 应用层

终于说到应用层了,应用层这一层最接近我们用户了。

虽然我们收到了传输层传来的数据,可是这些传过来的数据五花八门,有html格式的,有mp4格式的,各种各样。你确定你能看的懂?

因此我们需要指定这些数据的格式规则,收到后才好解读渲染。例如我们最常见的 Http 数据包中,就会指定该数据包是 什么格式的文件了。

文章首发于公众号『苦逼的码农』,更多经常文章欢迎搜索关注,已有150多篇原创。

总结

五层模型至此讲到这里。对于有些层讲的比较简洁,就随便概况了一下。因为如果我说的详细一点的话,篇幅肯定会特别特别长,我着已经是尽最大的努力以最简洁的方式来讲的了。如果你想详细去了解,可以去买计算机网络相应的资料,强烈推荐《计算机网络:自顶向下》这本书。希望我的讲解能让你对计算机之间数据的传输有个大概的了解。

另外,我正在整理一份计算机类书单,只为让大家更加方便找到自己想要的书籍,目前已经收集了几百本了,贡献给需要的人:计算机的书籍很贵?史上最全计算机类电子书整理(持续更新)

有收获?希望老铁们来个三连击,给更多的人看到这篇文章

1、给俺点个赞呗,可以让更多的人看到这篇文章,顺便激励下我,嘻嘻。

2、老铁们,关注我的原创微信公众号「帅地玩编程」,专注于写算法 + 计算机基础知识(计算机网络+ 操作系统+数据库+Linux)。

保存让你看完有所收获,不信你打我。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Dubbo源码解析(四十五)服务引用过程 dubbo服务引用

发表于 2019-12-23

dubbo服务引用过程

目标:从源码的角度分析服务引用过程。

前言

前面服务暴露过程的文章讲解到,服务引用有两种方式,一种就是直连,也就是直接指定服务的地址来进行引用,这种方式更多的时候被用来做服务测试,不建议在生产环境使用这样的方法,因为直连不适合服务治理,dubbo本身就是一个服务治理的框架,提供了很多服务治理的功能。所以更多的时候,我们都不会选择绕过注册中心,而是通过注册中心的方式来进行服务引用。

服务引用过程

dubbo-refer

大致可以分为三个步骤:

  1. 配置加载
  2. 创建invoker
  3. 创建服务接口代理类

引用起点

dubbo服务的引用起点就类似于bean加载。dubbo中有一个类ReferenceBean,它实现了FactoryBean接口,继承了ReferenceConfig,所以ReferenceBean作为dubbo中能生产对象的工厂Bean,而我们要引用服务,也就是要有一个该服务的对象。

服务引用被触发有两个时机:

  • Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务(饿汉式)
  • 在 ReferenceBean 对应的服务被注入到其他类中时引用(懒汉式)

默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。

因为ReferenceBean实现了FactoryBean接口的getObject()方法,所以在加载bean的时候,会调用ReferenceBean的getObject()方法

ReferenceBean的getObject()
1
2
3
复制代码public Object getObject() {
return get();
}

这个get方法是ReferenceConfig的get()方法

ReferenceConfig的get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码public synchronized T get() {
// 检查并且更新配置
checkAndUpdateSubConfigs();

// 如果被销毁,则抛出异常
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 检测 代理对象ref 是否为空,为空则通过 init 方法创建
if (ref == null) {
// 用于处理配置,以及调用 createProxy 生成代理类
init();
}
return ref;
}

关于checkAndUpdateSubConfigs()方法前一篇文章已经讲了,我就不再讲述。这里关注init方法。该方法也是处理各类配置的开始。

配置加载(1)

ReferenceConfig的init()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
复制代码private void init() {
// 如果已经初始化过,则结束
if (initialized) {
return;
}
// 设置初始化标志为true
initialized = true;
// 本地存根合法性校验
checkStubAndLocal(interfaceClass);
// mock合法性校验
checkMock(interfaceClass);
// 用来存放配置
Map<String, String> map = new HashMap<String, String>();

// 存放这是消费者侧
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);

// 添加 协议版本、发布版本,时间戳 等信息到 map 中
appendRuntimeParameters(map);
// 如果是泛化调用
if (!isGeneric()) {
// 获得版本号
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
// 设置版本号
map.put(Constants.REVISION_KEY, revision);
}

// 获得所有方法
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// 把所有方法签名拼接起来放入map
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), Constants.COMMA_SEPARATOR));
}
}
// 加入服务接口名称
map.put(Constants.INTERFACE_KEY, interfaceName);
// 添加metrics、application、module、consumer、protocol的所有信息到map
appendParameters(map, metrics);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
Map<String, Object> attributes = null;
if (CollectionUtils.isNotEmpty(methods)) {
attributes = new HashMap<String, Object>();
// 遍历方法配置
for (MethodConfig methodConfig : methods) {
// 把方法配置加入map
appendParameters(map, methodConfig, methodConfig.getName());
// 生成重试的配置key
String retryKey = methodConfig.getName() + ".retry";
// 如果map中已经有该配置,则移除该配置
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// 如果配置为false,也就是不重试,则设置重试次数为0次
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
// 设置异步配置
attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
}
}

// 获取服务消费者 ip 地址
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
// 如果为空,则获取本地ip
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
}
// 设置消费者ip
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

// 创建代理对象
ref = createProxy(map);

// 生产服务key
String serviceKey = URL.buildKey(interfaceName, group, version);
// 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
// 并将 ConsumerModel 存入到 ApplicationModel 中
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
}

该方法大致分为以下几个步骤:

  1. 检测本地存根和mock合法性。
  2. 添加协议版本、发布版本,时间戳、metrics、application、module、consumer、protocol等的所有信息到 map 中
  3. 单独处理方法配置,设置重试次数配置以及设置该方法对异步配置信息。
  4. 添加消费者ip地址到map
  5. 创建代理对象
  6. 生成ConsumerModel存入到 ApplicationModel 中

在这里处理配置到逻辑比较清晰。下面就是看ReferenceConfig的createProxy()方法。

创建invoker

ReferenceConfig的createProxy()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
复制代码private T createProxy(Map<String, String> map) {
// 根据配置检查是否为本地调用
if (shouldJvmRefer(map)) {
// 生成url,protocol使用的是injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
// 利用InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
// 如果url不为空,则用户可能想进行直连来调用
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
// 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
// 遍历所有的url
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
// 设置接口全限定名为 url 路径
url = url.setPath(interfaceName);
}
// 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
// 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
// 最后将合并后的配置设置为 url 查询字符串中。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// 校验注册中心
checkRegistry();
// 加载注册中心的url
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
// 遍历所有的注册中心
for (URL u : us) {
// 生成监控url
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
// 加入监控中心url的配置
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 参数到 url 中,并将 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 如果urls为空,则抛出异常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}

// 如果只有一个注册中心,则直接调用refer方法
if (urls.size() == 1) {
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 遍历所有的注册中心url
for (URL url : urls) {
// 通过 refprotocol 调用 refer 构建 Invoker,
// refprotocol 会在运行时根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
// 把生成的Invoker加入到集合中
invokers.add(refprotocol.refer(interfaceClass, url));
// 如果是注册中心的协议
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// 则设置registryURL
registryURL = url; // use last registry url
}
}
// 优先用注册中心的url
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's cluster is available
// 只有当注册中心当链接可用当时候,采用RegistryAwareCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
// 由集群进行多个invoker合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
// 直接进行合并
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}

// 如果需要核对该服务是否可用,并且该服务不可用
if (shouldCheck() && !invoker.isAvailable()) {
// make it possible for consumer to retry later if provider is temporarily unavailable
// 修改初始化标志为false
initialized = false;
// 抛出异常
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
// 元数据中心服务
MetadataReportService metadataReportService = null;
// 加载元数据服务,如果成功
if ((metadataReportService = getMetadataReportService()) != null) {
// 生成url
URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
// 把消费者配置加入到元数据中心中
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}

该方法的大致逻辑可用分为以下几步:

  1. 如果是本地调用,则直接使用InjvmProtocol 的 refer 方法生成 Invoker 实例。
  2. 如果不是本地调用,但是是选择直连的方式来进行调用,则分割配置的多个url。如果协议是配置是registry,则表明用户想使用指定的注册中心,配置url后将url并且保存到urls里面,否则就合并url,并且保存到urls。
  3. 如果是通过注册中心来进行调用,则先校验所有的注册中心,然后加载注册中心的url,遍历每个url,加入监控中心url配置,最后把每个url保存到urls。
  4. 针对urls集合的数量,如果是单注册中心,直接引用RegistryProtocol 的 refer 构建 Invoker 实例,如果是多注册中心,则对每个url都生成Invoker,利用集群进行多个Invoker合并。
  5. 最终输出一个invoker。

Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。关于这几个接口的定义介绍可以参考《dubbo源码解析(十九)远程调用——开篇》,Protocol 实现类有很多,下面会分析 RegistryProtocol 和 DubboProtocol,我们可以看到上面的源码中讲到,当只有一个注册中心的时候,会直接使用RegistryProtocol。所以先来看看RegistryProtocol的refer()方法。

RegistryProtocol生成invoker

RegistryProtocol的refer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 参数值,并将其设置为协议头,默认是dubbo
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
// 获得注册中心实例
Registry registry = registryFactory.getRegistry(url);
// 如果是注册中心服务,则返回注册中心服务的invoker
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
// 将 url 查询字符串转为 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// 获得group值
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
// 如果有多个组,或者组配置为*,则使用MergeableCluster,并调用 doRefer 继续执行服务引用逻辑
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 只有一个组或者没有组配置,则直接执行doRefer
return doRefer(cluster, registry, type, url);
}

上面的逻辑比较简单,如果是注册服务中心,则直接创建代理。如果不是,先处理组配置,根据组配置来决定Cluster的实现方式,然后调用doRefer方法。

RegistryProtocol的doRefer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心
directory.setRegistry(registry);
// 设置协议
directory.setProtocol(protocol);
// all attributes of REFER_KEY
// 所有属性放到map中
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服务消费者链接
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注册服务消费者,在 consumers 目录下新节点
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 注册服务消费者
registry.register(directory.getRegisteredConsumerUrl());
}
// 创建路由规则链
directory.buildRouterChain(subscribeUrl);
// 订阅 providers、configurators、routers 等节点数据
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个,生成一个invoker
Invoker invoker = cluster.join(directory);
// 在服务提供者处注册消费者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}

该方法大致可以分为以下步骤:

  1. 创建一个 RegistryDirectory 实例,然后生成服务者消费者链接。
  2. 向注册中心进行注册。
  3. 紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。
  4. 由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。关于 RegistryDirectory 和 Cluster,可以看我前面写的一些文章介绍。

DubboProtocol生成invoker

首先还是从DubboProtocol的refer()开始。

DubboProtocol的refer()
1
2
3
4
5
6
7
8
9
10
11
复制代码public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);

// create rpc invoker.
// 创建一个DubboInvoker实例
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
// 加入到集合中
invokers.add(invoker);

return invoker;
}

创建DubboInvoker比较简单,调用了构造方法,这里主要讲这么生成ExchangeClient,也就是getClients方法。

DubboProtocol的getClients()

可以参考《dubbo源码解析(二十四)远程调用——dubbo协议》的(三)DubboProtocol中的源码分析。最新版本基本没有什么变化,只是因为加入了配置中心,配置的优先级更加明确了,所以增加了xml配置优先级高于properties配置的代码逻辑,都比较容易理解。

其中如果是配置的共享,则获得共享客户端对象,也就是getSharedClient()方法,否则新建客户端也就是initClient()方法。

DubboProtocol的getSharedClient()

可以参考《dubbo源码解析(二十四)远程调用——dubbo协议》的(三)DubboProtocol中的源码分析,该方法比较简单,先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例。

DubboProtocol的initClient()

可以参考《dubbo源码解析(二十四)远程调用——dubbo协议》的(三)DubboProtocol中的源码分析,initClient 方法首先获取用户配置的客户端类型,最新版本已经改为默认 netty4。然后设置用户心跳配置,然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。这里的 LazyConnectExchangeClient 代码并不是很复杂,该类会在 request 方法被调用时通过 Exchangers 的 connect 方法创建 ExchangeClient 客户端。下面我们分析一下 Exchangers 的 connect 方法。

Exchangers的connect()
1
2
3
4
5
6
7
8
9
10
11
复制代码public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger 实例,默认为 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}

getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单。接下来分析 HeaderExchangeClient 的connect的实现。

HeaderExchangeClient 的connect()
1
2
3
4
5
6
7
复制代码public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 创建 HeaderExchangeHandler 对象
// 创建 DecodeHandler 对象
// 通过 Transporters 构建 Client 实例
// 创建 HeaderExchangeClient 对象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

其中HeaderExchangeHandler、DecodeHandler等可以参考《dubbo源码解析(九)远程通信——Transport层》和《dubbo源码解析(十)远程通信——Exchange层》的分析。这里重点关注Transporters 构建 Client,也就是Transporters的connect方法。

Transporters的connect()

可以参考《dubbo源码解析(八)远程通信——开篇》的(十)Transporters中源码分析。其中获得自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。假设是netty4的实现,则执行以下代码。

1
2
3
复制代码public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}

到这里为止,DubboProtocol生成invoker过程也结束了。再回到createProxy方法的最后一句代码,根据invoker创建服务代理对象。

创建代理

为服务接口生成代理对象。有了代理对象,即可进行远程调用。首先来看AbstractProxyFactory 的 getProxy()方法。

AbstractProxyFactory 的 getProxy()

可以参考《dubbo源码解析(二十三)远程调用——Proxy》的(一)AbstractProxyFactory的源码分析。可以看到第二个getProxy方法其实就是获取 interfaces 数组,调用到第三个getProxy方法时,该getProxy是个抽象方法,由子类来实现,我们还是默认它的代理实现方式为Javassist。所以可以看JavassistProxyFactory的getProxy方法。

JavassistProxyFactory的getProxy()
1
2
3
4
复制代码public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

我们重点看Proxy的getProxy方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
复制代码/**
* Get proxy.
*
* @param ics interface class array.
* @return Proxy instance.
*/
public static Proxy getProxy(Class<?>... ics) {
// 获得Proxy的类加载器来进行生成代理类
return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

/**
* Get proxy.
*
* @param cl class loader.
* @param ics interface class array.
* @return Proxy instance.
*/
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (ics.length > Constants.MAX_PROXY_COUNT) {
throw new IllegalArgumentException("interface limit exceeded");
}

StringBuilder sb = new StringBuilder();
// 遍历接口列表
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
// 检测是否是接口,如果不是,则抛出异常
if (!ics[i].isInterface()) {
throw new RuntimeException(itf + " is not a interface.");
}

Class<?> tmp = null;
try {
// 重新加载接口类
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
}
// 检测接口是否相同,这里 tmp 有可能为空,也就是该接口无法被类加载器加载的。
if (tmp != ics[i]) {
throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
}

// 拼接接口全限定名,分隔符为 ;
sb.append(itf).append(';');
}

// use interface class name list as key.
// 使用拼接后的接口名作为 key
String key = sb.toString();

// get cache by class loader.
Map<String, Object> cache;
// 把该类加载器加到本地缓存
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>());
}

Proxy proxy = null;
synchronized (cache) {
do {
// 从缓存中获取 Reference<Proxy> 实例
Object value = cache.get(key);
if (value instanceof Reference<?>) {
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null) {
return proxy;
}
}
// 并发控制,保证只有一个线程可以进行后续操作
if (value == PendingGenerationMarker) {
try {
// 其他线程在此处进行等待
cache.wait();
} catch (InterruptedException e) {
}
} else {
// 放置标志位到缓存中,并跳出 while 循环进行后续操作
cache.put(key, PendingGenerationMarker);
break;
}
}
while (true);
}

long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
// 创建 ClassGenerator 对象
ccp = ClassGenerator.newInstance(cl);

Set<String> worked = new HashSet<>();
List<Method> methods = new ArrayList<>();

for (int i = 0; i < ics.length; i++) {
// 检测接口访问级别是否为 protected 或 privete
if (!Modifier.isPublic(ics[i].getModifiers())) {
// 获取接口包名
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
// 非 public 级别的接口必须在同一个包下,否者抛出异常
if (!pkg.equals(npkg)) {
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
}
// 添加接口到 ClassGenerator 中
ccp.addInterface(ics[i]);

// 遍历接口方法
for (Method method : ics[i].getMethods()) {
// 获取方法描述,可理解为方法签名
String desc = ReflectUtils.getDesc(method);
// 如果方法描述字符串已在 worked 中,则忽略。考虑这种情况,
// A 接口和 B 接口中包含一个完全相同的方法
if (worked.contains(desc)) {
continue;
}
worked.add(desc);

int ix = methods.size();
// 获取方法返回值类型
Class<?> rt = method.getReturnType();
// 获取参数列表
Class<?>[] pts = method.getParameterTypes();

// 生成 Object[] args = new Object[1...N]
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++) {
// 生成 args[1...N] = ($w)$1...N;
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
}
// 生成 InvokerHandler 接口的 invoker 方法调用语句,如下:
// Object ret = handler.invoke(this, methods[1...N], args);
code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
// 返回值不为 void
if (!Void.TYPE.equals(rt)) {
// 生成返回语句,形如 return (java.lang.String) ret;
code.append(" return ").append(asArgument(rt, "ret")).append(";");
}

methods.add(method);
// 添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}

if (pkg == null) {
pkg = PACKAGE_NAME;
}

// create ProxyInstance class.
// 构建接口代理类名称:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
// 生成 private java.lang.reflect.InvocationHandler handler;
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
// 为接口代理类添加带有 InvocationHandler 参数的构造方法,比如:
// porxy0(java.lang.reflect.InvocationHandler arg0) {
// handler=$1;
// }
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
// 为接口代理类添加默认构造方法
ccp.addDefaultConstructor();
// 生成接口代理类
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));

// create Proxy class.
// 构建 Proxy 子类名称,比如 Proxy1,Proxy2 等
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
// 为 Proxy 的抽象方法 newInstance 生成实现代码,形如:
// public Object newInstance(java.lang.reflect.InvocationHandler h) {
// return new org.apache.dubbo.proxy0($1);
// }
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
Class<?> pc = ccm.toClass();
// 生成 Proxy 实现类
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// release ClassGenerator
if (ccp != null) {
// 释放资源
ccp.release();
}
if (ccm != null) {
ccm.release();
}
synchronized (cache) {
if (proxy == null) {
cache.remove(key);
} else {
// 写缓存
cache.put(key, new WeakReference<Proxy>(proxy));
}
// 唤醒其他等待线程
cache.notifyAll();
}
}
return proxy;
}

代码比较多,大致可以分为以下几步:

  1. 对接口进行校验,检查是否是一个接口,是否不能被类加载器加载。
  2. 做并发控制,保证只有一个线程可以进行后续的代理生成操作。
  3. 创建cpp,用作为服务接口生成代理类。首先对接口定义以及包信息进行处理。
  4. 对接口的方法进行处理,包括返回类型,参数类型等。最后添加方法名、访问控制符、参数列表、方法代码等信息到 ClassGenerator 中。
  5. 创建接口代理类的信息,比如名称,默认构造方法等。
  6. 生成接口代理类。
  7. 创建ccm,ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。
  8. 设置名称、创建构造方法、添加方法
  9. 生成 Proxy 实现类。
  10. 释放资源
  11. 创建弱引用,写入缓存,唤醒其他线程。

到这里,接口代理类生成后,服务引用也就结束了。

后记

参考官方文档:dubbo.apache.org/zh-cn/docs/…

该文章讲解了dubbo的服务引用过程,下一篇就讲解服务方法调用过程。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…839840841…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%