开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

还不懂mysql的undo log和mvcc?算我输!

发表于 2021-05-16

  最近一直没啥时间写点东西,坚持分享真的好难,也不知道该分享点啥,正好有人要问我这些东西,所以腾出点时间,写一下这个主题。同样本篇可以给读者承诺,听不懂或者没收获算我输,哈哈!

  众所周知,mysql中读取方式按照是否需要传统意义的锁,分为锁定读和非锁定读两种。锁定读不用多说,那就一堆算法了,行锁,间隙锁,next-key锁,无非就是为了保证,一个事务中锁定读取一条或者多条数据时,不能读到别的事务没有提交的更改(不能脏读),不能同一个事务两次读到的数据内容不一致(应该要可重复读),不能同一个事务,两次读到的数据条数都不一致(不能幻读)。针对最常见的RR隔离级别,什么情况下至少使用什么类型的锁(行锁,gap锁,next-key锁)才能避免脏读,不可重复读,幻读。又会联系到几个事务级别,从锁粒度从小到大,RU(读未提交),RC(读已提交),RR(可重复读),S(序列化)

  RU(读未提交):英文全称就留给读者自己装B了哈哈),很显然,这个级别是最无节操的级别,就相当于我在房间,做秘密的事情,我都还没有觉得别人可以进来,门自己觉得我做完了,然后就给我开了,然后你就进来了,你好歹让我收拾一下把,再通知一下,我做完了,再进来把。当然也不会像有些人理解的门压根没上锁,总要的事情做到一半,你就可以进来,也不会这么没有节操的。在mysql中也就是就算是最低级别RU,也不会让你读到一半数据,所以还是有锁的,只不过这个锁并不是我们自己主观去决定打开的,他会认为每一条数据执行完就可以开锁了。虽然你还没手动提交事务,很显然这种级别,还是会让你看到一些不该看到的东西。脏读估计就是从这里来的把。

  RC(读已提交):很显然,只有等我事情做完,并且通知你可以进来了,你才能进来。这样你总不能看到脏东西了吧,但是有一种情况,门开着你进门上厕所,这时候有一卷新卫生纸,然后我进去用了一点,然后门打开你又进去发现纸少了一半,你是不是会怀疑这尼玛怎么两次看到的纸不一样啊,这在生活中很常见,我甚至觉得这个理所当然的没毛病。但是在数据库领域它就是觉得有毛病,比如一段代码,假设你要买一卷纸100块(好吧,我承认有点小贵),先查询你的卡里的钱发现有100,结果这是时候你妹子用微信付款花了50(明显妹子微信绑定你的卡),然后拿纸,扣钱,结果50 - 100 = -50扣款失败。这是什么神仙逻辑,就不能在我交易的过程中不要让妹子可以用钱吗,我想假设查账,扣款要是一个完整的逻辑,biu的一下不需要时间就可以搞定,就不会有这种尴尬了,因为要是每个操作的时间都足够快,要么妹子在我交易之前一刻用50,要么就在我之后一刻50,而不会在我中间一刻用50。我想正是因为我们的系统不可能做到那么快,才需要认为定义这个东西,才会遇到这种不可重复读的情况吧

  RR(可重复读):为了解决上面不可重复读的问题,谁让我们条件不够快呢?很显然,我开始要用钱的时候,直接把我的卡的钱锁住,不让别人用,等我交易完才让别人用。

  S(序列化):很显然,锁好门,排好队,我的事情确定都做完才放其他人进来

  以上回顾了一下锁定读和四种隔离级别,下面进入正题,来说说mvcc和undo log吧。mvcc作为多版本并发控制,使用undo log实现,同样也可以实现上述四种隔离级别,只不过实现手段不是通过传统意义上的锁罢了。当然针对RU(读未提交)隔离级别,所有更改的语句别的事务都可以直接看到,那根本没有保留多个版本的必要,用到的就是最新的唯一版本,同样S(序列化)级别,排队一个个去读写,也根本没有保留多个版本数据的必要,因为都是用最新的数据就行了。

  到了这里,要进入正题,其实我想说下为啥,在网上那么一大堆,并且一大堆mysql的好书,我为啥还死皮赖脸的分享undo log和mvcc。还嫌知识不够多不够乱吗?那是因为我之前学习这些,经历过很多误导,可以这么说网上那么一大堆,我还没有看到有一篇博客或者网络课堂,把这两个东西说的准确和清楚的,书上那么一大堆,我想一般人要是不仔细揣摩一阵子,可能永远都会活在自己的世界里,并以之为真理,下面我列举一下网上的那些错误或者说不准确的地方。后续再回过头来看

  1)为了实现mvcc,每行数据会多两列DATA_TRX_ID和DATA_ROLL_PTR(有些人也不要抬杠,不对可能还有一列DB_ROW_ID,当没有默认主键时会自动加上这列,请不要说于本篇无关的内容),这时候他们的解释DATA_TRX_ID表示当前数据的事务版本,没啥毛病,DATA_ROLL_PTR表示事务删除版本号,纳尼!ptr一般不是用来表示指针的吗,你跟我说时删除版本号,我书读的少,你不要忽悠我。我不否认用这样的理解方式真的可以让自己感觉理解了,但是面试要问到你这么答真的没毛病吗,除非面试官也这么理解的(菩萨保佑)。

  2)mvcc里面使用一个可读视图ReadView来辅助判断那些事务版本号,里面主要有几个元素构成,当前活跃事务ID集合mIds,mIds的最小事务ID,mIds的最大事务ID,网络上的资料99%都是这样描述的,只能说可能那些作者没有理解清楚或者是人云亦云。正确如何后面再解释

  3)书上说insert 的undo log区别于delete和update操作的undo log,insert 操作的记录,只对事务本身可见,对其他事务不可见(这是事务隔离性的要求),故该undo log可以在事务提交后直接删除。而delete和update的undo log需要等待purge线程在合适的条件删除。为啥insert的undo log就可以直接删除,为啥会有区别,各位读者看到这句话是直接死记硬背就满足了吗,你能从这句话看懂是为啥吗,然而可惜的是,至今没看到有哪本书解释过,可能是我书读的少吧。

  4)都说redo是物理日志(绝大部分是,不纠结),undo是逻辑日志,并且你新增,undo log会记录删除,你更新他会记录相反的更新,这里有两个点,怎么理解物理日志和逻辑日志,怎么理解是记录相反操作,新增和删除相反可以理解,那更新x = x - 10相反难道是x = x + 10,你减去10,相反我就给你加上10,我相信肯定好大一部分人都这么理解,这么理解就坑大了。

  5)mvcc能解决幻读吗?如果能解决为啥就mysql吹嘘RR级别解决了幻读,难道别的数据库没有mvcc,如果不能那怎么敢吹嘘呢?这真的是一个人才问题,后面再谈吧!

  我们都知道mysql数据库是以主键ID作为索引使用B+Tree的结构组织整个表的数据,存放在xx.IBD文件中,数据存放在叶子节点块中,每一块都有后一块叶子节点的指针,当然我们这里忘了强调,本篇以InnDB引擎来说明的,不然又要有人纠结了。当然这些基本的索引知识,包括上述描述的各种锁算法,可以自己看书或者别人家博客,或者得空我再分享一篇。

  回到正题,MVCC是个什么鬼?

  1)官方一点的解释:并发访问(读或写)数据库时,对正在事务内处理的数据做 多版本的管理。以达到用来避免写操作的堵塞,从而引发读操作的并发问题。

  2)无节操解释,拿厕所的纸巾举例子,为了让不同的人多次进来看到的纸巾都是一样的,那么每次有人用纸巾,先做个标记版本A,(A能看到的),然后放在一个柜子里,然后复制一个一模一样的纸巾放在纸巾盒里,标记成当前版本(自己看到的)。然后假定做一个看的规则,每个人进来只能根据规则看到之前看到的那卷纸巾,保证每个人多次进来看到的纸巾是一致的,好吧,纸巾真多,够麻烦!后面在详细解释

  MVCC是做什么的?

  1)用于事务的回滚

  2)MVCC

  undo log我们关注的类型有哪些?

  1)insert undo log

  2)update undo log

  InnoDB中的MVCC实现原理

  数据表增加两个隐藏列DATA_TRX_ID和DATA_ROLL_PTR,用于实现mvcc

![](https://gitee.com/songjianzaina/juejin_p9/raw/master/img/647731f772592d4567402a139eb3033e7ef56f67bf77ea5bbf66a2913e4743c3)

  事务 A 对值 x 进行更新之后,该行即产生一个新版本和旧版本。假设之前插入该行的事务 ID 为 100,事务 A 的 ID 为 200。操作过程如下

  1)对 ID = 1 的记录加排他锁,毕竟要修改了,总不能加共享锁把

  2)把该行原本的值拷贝到 undo log 中

3)修改改行值并且更新 DATA\_TRX\_ID,将 DATA\_ROLL\_PTR 指向刚刚拷贝到 undo log 链中的旧版本记录,记住undo log是个链表,如果多个事务多次修改会继续生成undo log并通过DATA\_ROLL\_PTR建立指向关系

  上文中的undo log是一个链表结构,也就是如果多个事务都修改了这行数据,会根据事务ID的先后,以链表形式存放,至于旧版本存放在链表的先后顺序,这个其实无关紧要,只要方便获取就好,我倾向于每次修改后把旧版放在链表的头部,这样可以保证从指针递归下来,先找到较新的数据,再找到更旧的数据,一个个版本去判断是否是自己可以看到的版本。

那么现在的核心问题就是当前事务读取数据的时候如何判断应该读取哪个版本?mysql中引入了一个可读试图ReadView的概念。主要包含如下属性

  1)mIds 代表生成ReadView时,当前活跃所有的事务ID,活跃的意思就是事务开启了还没提交,这里可以提一点,事务开启事务ID会自增,实际上事务ID就是一个全局自增的数字

  2)min_trx_id 表示当前活跃的mIds中最小的事务ID

  3)max_trx_id 表示生成ReadView时,最大的事务ID,这里一定不要理解成mIds中最大的ID,这是一个相当错误的理解,后面再解释

  4)creator_trx_id 该ReadView在那个事务里创建的,

ReadView有了上面4个属性后,那么应该以什么样的规则,判断当前事务到底可以读取哪个版本的数据呢?

 1)如果被访问版本的 data_trx_id 小于 m_ids 中的最小值,说明生成该版本的事务在 ReadView 生成前就已经提交了,那么该版本可以被当前事务访问。

2)如果被访问版本的 data_trx_id大于当前事务的最大值,说明生成该版本数据的事务在生成 ReadView 后才生成,那么该版本不可以被当前事务访问。为什么这里的最大值不是mIds的最大值,因为事务ID虽然是全局递增的,但是并不代表事务ID大的一定要在事务ID小的后面提交,也就是事务开启有先后,但是事务结束的先后和开启的先后并不是完全一致的,毕竟事务有长有短。如果此时数据的事务版本是200,而mIds中没有200,那么mIds最大值就可能小于200,那么以规则2判断就可能让本该可以访问到的数据因为这个规则,而访问不到了,归根结底就是因为没有正确找到生成ReadView时的最大事务ID,所以不能肯定的说生成该版本数据的事务在生成 ReadView 后才生成

3)如果被访问版本的 data_trx_id属性值在 最大值和最小值之间(包含),那就需要判断一下 trx_id 的值是不是在 m_ids 列表中。如果在,说明创建 ReadView 时生成该版本所属事务还是活跃的,因此该版本不可以被访问;如果不在,说明创建 ReadView 时生成该版本的事务已经被提交,该版本可以被访问。

  通俗点来说,也就是ReadView中通过最大事务ID,mIds最小事务ID,mIds活跃事务列表,将当前要读的数据的事务ID分成了3种情况,要么小于mIds的最小事务ID,很明显又在当前活跃的最小事务之前生成,又不在活跃事务中,一定是已提交的事务,这个版本肯定可以访问;要么大于生成ReadView的当前的最大事务ID,很明显在所有活跃事务之后,并且也不可能存在于活跃事务列表中,那么就说明,该版本在当前活跃事务之后才出现,总不能读取到未来的版本吧;要么处于最大最小值之间,这时候就有两种情况,因为并不是说最大最小值之间就一定是活跃的,毕竟先开启的事务并不一定会先结束,事务有大小长短,这时候就很简单,在mIds中就是还没提交的活跃版本,不可被读取,不在就是已经提交的版本,可以被读取。当一个事务要读取一行数据,首先用上面规则判断数据的最新版本也就是那行记录,如果发现可以访问就直接读取了,如果发现不能访问,就通过DATA_ROLL_PTR指针找到undo log,递归往下去找每个版本,直到读取到自己可以读取的版本为止,如果读取不到那就返回空呗。

  还有个问题就是MVCC在RC和RR隔离级别下有啥区别?

  

  很明显,如果是RC级别,那么事务A两次读取到的分别是10和20,如果是RR级别两次读取到的都是10,如果同样由ReadView判断需要怎么样才能区分两个隔离级别取的版本不一样呢?先说RC级别,两个版本不一致,说明可能事务A两次使用的ReadView里的内容肯定是有不一样,结合B事务中间有提交,而提交事务很明显会影响到mIds当前活跃事务列表,因为事务提交之后就不是活跃事务了不可能再出现在mIds列表中了,这一点很好理解。再来看RR隔离级别事务A,如果要两次读取的x值一致,除非两次用来判定的ReadView没有啥变化,这不由得让我们想起了缓存的用法,是不是可以在A事务开启的时候生成一个ReadView,然后在整个A事务期间都用这一份ReadView就行了呢,就像用缓存一样。而RC级别每次查询都生成一个最新的ReadView,是不是就可以产生区别了,这算是一个比较常规并且巧妙的设计了。

  目前为止,应该基本了解了mvcc和undo log是咋回事,那么接下来就该回到刚开始提到的,网上各种博客,在线课堂,甚至书上,所讲到的错误,不准确和模糊的地方了,为了凑字数(开个玩笑,为了方便一个个说清楚),再次copy一下上面的问题。

  1)为了实现mvcc,每行数据会多两列DATA_TRX_ID和DATA_ROLL_PTR(有些人也不要抬杠,不对可能还有一列DB_ROW_ID,当没有默认主键时会自动加上这列,请不要说于本篇无关的内容),这时候他们的解释DATA_TRX_ID表示当前数据的事务版本,没啥毛病,DATA_ROLL_PTR表示事务删除版本号。

  2)mvcc里面使用一个可读视图ReadView来辅助判断那些事务版本号,里面主要有几个元素构成,当前活跃事务ID集合mIds,mIds的最小事务ID,mIds的最大事务ID,网络上的资料99%都是这样描述的,只能说可能那些作者没有理解清楚或者是人云亦云。正确如何后面再解释

  3)书上说insert 的undo log区别于delete和update操作的undo log,insert 操作的记录,只对事务本身可见,对其他事务不可见(这是事务隔离性的要求),故该undo log可以在事务提交后直接删除。而delete和update的undo log需要等待purge线程在合适的条件删除。为啥insert的undo log就可以直接删除,为啥会有区别,各位读者看到这句话是直接死记硬背就满足了吗,你能从这句话看懂是为啥吗,然而可惜的是,至今没看到有哪本书解释过,可能是我书读的少吧。

  4)都说redo是物理日志(绝大部分是,不纠结),undo是逻辑日志,并且你新增,undo log会记录删除,你更新他会记录相反的更新,这里有两个点,怎么理解物理日志和逻辑日志,怎么理解是记录相反操作,新增和删除相反可以理解,那更新x = x - 10相反难道是x = x + 10,你减去10,相反我就给你加上10,我相信肯定好大一部分人都这么理解,这么理解就坑大了。

  5)mvcc能解决幻读吗?如果能解决为啥就mysql能吹嘘RR级别解决了幻读。

  对于问题1)我想不用说了,这个很明确了,不管看哪本书都不会这么讲,PTR一般都表示指针了,要说删除版本号,怎么不叫ROLL_ID呢,从基本的单词解释都不可能是删除版本号吧,不纠结了。

  对于问题2)ReadView中假设那么最大事务ID是mIds里的最大事务ID,那当我要读取的数据版本号大于这个活跃的最大事务ID,就一定认为我这个数据的版本是在生成ReadView之后了吗,先开启的事务一定会先提交吗,当前最大的活跃事务ID,一定是当时最大的事务ID吗?这不刚生成ReadView的时候好几个大事务ID提交了,不行吗?

  对于问题3)insert undo log和update undo log为啥要分开,为啥提交之后insert undo log可以直接删除了,update undo log还要命苦的等着purge呢?首先insert的特殊性,如果某个事务ID=100新增了一条记录,那么在这个事务版本之前这个记录是不存在的,也就是这条数据要么就是事务100提交,然后就存在这条数据了,事务100没有提交,这条数据就是null,那么请问还需要mvcc多版本控制吗,这条数据本身不就是一个版本吗,要么就是不存在,读取不到,要么就是存在,可以读取,数据是否存在,在RC和RR级别不就看事务有没有提交吗,至于RU和S前面早就说了不需要用到MVCC了。不用纠结数据在哪里读取出来的,是缓存还是磁盘,也不用纠结事务提交后数据是否真的落磁盘了,总之提交后数据可以被读取到,没提交数据就读取不到,我想这就是书上所说的事务隔离性的要求吧。所以根本不需要用到多版本的冗余,当然事务提交就可以直接删除insert的undo log了。至于update的undo log可能同时存在事务A,B,C在修改数据,到底是事务A,B或者C提交后就删除undo log呢,显然不知道吧,所以要等到purge线程事后再决定啥时候删除了。

  对于问题4)redo确实绝大部分是物理日志,物理日志的意思就是有个日志文件存放,记录了每个物理地址目前的值到底是多少,至于undo log,存在于一个特殊的段中,存在于表空间中,很明显就是和主键id组织的数据存在一个文件中,毕竟每行数据都有个指向undo log的指针了,合并单独放在一个文件中呢。如果一个新增操作,undo log记录的是一个删除类型,甚至都不需要copy任何数据,当读到这个版本,发现了删除标记,就可以直接返回null了,如果是个更新操作,那么copy一下更新前的值,没有更新的当然不用copy,也并不需要记录某个物理地址上是某个特定的值,当你读到这个undo log,那么就把读到的数据根据需要更新成undo log里对应的数据就行了。如果是一个删除操作,则将这行记录copy到undo log中,然后将原始数据标记成已经删除。这种日志难道不能看成是一种逻辑日志吗,与当前操作相反的一种逻辑日志,不需要记录对应物理地址上是些什么内容的逻辑日志。

  对于问题5)乍一看很唬人,很容易把你唬懵了。先搞清楚幻读怎么产生的,假如事务A中先后读取了age>10的数据(age加了索引),第一次读取了一条age=12的,由于紧随其后事务B又插入了一条13,导致事务A接着第二次查询发现获取了两条数据,说好的一条,怎么现在是两条,是不是我喝醉酒眼花了产生了幻觉。而在mysql的锁定读场景很明显通过间隙锁/next-key锁解决了幻读,当我读取age>10的时候,就把我周围右边的间隙的范围都给锁住,其它事务休想再插入age>10的数据,然后就解决了幻读,从源头上就让你不能插入。再来说mvcc,在RR隔离级别,当事务A开启的时候会生成一个事务的快照ReadView,里面记录了当前生成的最大事务ID,假定事务A第一次查询就一条记录,这时候事务B的事务ID最多存在两种可能,要么此时正在运行还没提交(废话你要提交了,我怎么可能就读到一条),那就一定在mIds列表里,要么此时该事务还没生成,那么事务B插入的时候,该数据的事务版本必然是大于当前ReadView中的事务最大值的,不管是从那种情况来看根据ReadView的判别规则该数据都不可能读到。我就不明白为啥网上一大把人义正严词的说单凭mvcc解决不了幻读,信息时代网上一大把资料有的说可以解决,有的说不能解决,但是又不给理由,渐渐的就让人们分成两个派别了,苦恼啊!其实我觉得mvcc天然就可以解决幻读,并且基本所有现代关系型数据库都有mvcc的实现,我有理由相信那些数据库的快照读都解决了幻读(个人猜测,毕竟没有深入研究过其它数据库)。我想人们都说mysql的RR可以解决幻读其它数据库不行,那只是针对锁定读,因为mysql 的RR级别有间隙锁,其它数据库没有这种算法,所以这么说把。不相信的人可以多看几遍上面的推理过程也可以开两个连接,准备如下两个语句,上述所得两种情况分别对应事务A和B先后执行begin,自己去测试下,没有什么比自己亲自测试让人相信了。

1
2
3
4
5
6
sql复制代码-- 事务A
begin;
select * from test where age > 10;
-- 先执行上面两句,再去别的连接执行插入
select * from test where age > 10;
rollback;
1
2
3
4
sql复制代码-- 事务B
begin;
insert into test(age) values(13);
COMMIT;

  我也看了网上一些测试,其实很多人在事务A中间加入一个更新语句让以前查不到的数据,第二次可以查到,我想说这种自己事务的操作,自己难道都不能看到吗?这是幻读吗,要是自己事务里面的修改,自己都看不到,我估计你要怀疑数据库出毛病了吧,刚修改居然看不到。我说你怎么不在事务A加一条插入age=13的语句再查询呢,绝对可以查到,自己插入修改的自己都看不到,那不是幻读了那是错误了。不信可以把事务A两个语句都加上for update,然后中间修改或者插入一条居于,现象都是一样的,因为一般都是可重入的,不会锁自己锁自己的。

  至此,基本理论知识都告一段落了,如果你们以为这样就完了,那只能说你们想多了,哈哈,作为一个专业的码农,当然是要亮出代码,下面我会将自己的理解用java代码的方式写一套简单的关于MVCC,ReadView和UNDO LOG的逻辑,代码是最简单的流水帐的模式,目的只是为了程序猿们能进一步理解本篇说的所有内容,如有雷同绝对是抄袭我的,哈哈!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
csharp复制代码package com.mvcc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 事务类,只是为了方便看懂原理和避免偏离主题,所以这里省略了本应该需要用到的锁
* @author rongdi
* @date 2020-07-25 20:17
*/
public class Transaction {

/**
* 全局事务id
*/
private static AtomicInteger globalTrxId = new AtomicInteger();

/**
* 当前活跃的事务
*/
private static Map<Integer,Transaction> currRunningTrxMap = new ConcurrentHashMap<>();

/**
* 当前事务id
*/
private Integer currTrxId = 0;

/**
* 事务隔离级别,ru,rc,rr和s
*/
private String trxMode = "rr";

/**
* 只有rc和rr级别非锁定读才需要用到mvcc,这个readView是为了方便判断到底哪个版本的数据可以被
* 当前事务获取到的视图工具
*/
private ReadView readView;

/**
* 开启事务
*/
public void begin() {
/**
* 根据全局事务计数器拿到当前事务ID
*/
currTrxId = globalTrxId.incrementAndGet();
/**
* 将当前事务放入当前活跃事务映射中
*/
currRunningTrxMap.put(currTrxId,this);
/**
* 构造或者更新当前事务使用的mvcc辅助判断视图ReadView
*/
updateReadView();
}

/**
* 构造或者更新当前事务使用的mvcc辅助判断视图ReadView
*/
public void updateReadView() {
/**
* 构造辅助视图工具ReadView
*/
readView = new ReadView(currTrxId);
/**
* 设置当前事务最大值
*/
readView.setMaxTrxId(globalTrxId.get());
List<Integer> mIds = new ArrayList<>(currRunningTrxMap.keySet());
Collections.sort(mIds);
/**
* 设置当前活跃事务id
*/
readView.setmIds(new ArrayList<>(currRunningTrxMap.keySet()));
/**
* 设置mIds中最小事务ID
*/
readView.setMinTrxId(mIds.isEmpty()? 0 : mIds.get(0));
/**
* 设置当前事务ID
*/
readView.setCurrTrxId(currTrxId);
}

/**
* 提交事务
*/
public void commit() {
currRunningTrxMap.remove(currTrxId);
}

public static AtomicInteger getGlobalTrxId() {
return globalTrxId;
}

public static void setGlobalTrxId(AtomicInteger globalTrxId) {
Transaction.globalTrxId = globalTrxId;
}

public static Map<Integer, Transaction> getCurrRunningTrxMap() {
return currRunningTrxMap;
}

public static void setCurrRunningTrxMap(Map<Integer, Transaction> currRunningTrxMap) {
Transaction.currRunningTrxMap = currRunningTrxMap;
}

public Integer getCurrTrxId() {
return currTrxId;
}

public void setCurrTrxId(Integer currTrxId) {
this.currTrxId = currTrxId;
}

public String getTrxMode() {
return trxMode;
}

public void setTrxMode(String trxMode) {
this.trxMode = trxMode;
}

public ReadView getReadView() {
return readView;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
kotlin复制代码package com.mvcc;

import java.util.ArrayList;
import java.util.List;

/**
* 模拟mysql中的ReadView
* @author rongdi
* @date 2020-07-25 20:31
*/
public class ReadView {

/**
* 记录当前活跃的事务ID
*/
private List<Integer> mIds = new ArrayList<>();

/**
* 记录当前活跃的最小事务ID
*/
private Integer minTrxId;

/**
* 记录当前最大事务ID,注意并不是活跃的最大ID,包括已提交的,因为有可能最大的事务ID已经提交了
*/
private Integer maxTrxId;

/**
* 记录当前生成readView时的事务ID
*/
private Integer currTrxId;

public ReadView(Integer currTrxId) {
this.currTrxId = currTrxId;
}

public Data read(Data data) {
/**
* 先判断当前最新数据是否可以访问
*/
if(canRead(data.getDataTrxId())) {
return data;
}
/**
* 获取到该数据的undo log引用
*/
UndoLog undoLog = data.getNextUndoLog();
do {
/**
* 如果undoLog存在并且可读,则合并返回
*/
if(undoLog != null && canRead(undoLog.getTrxId())) {
return merge(data,undoLog);
}
/**
* 还没找到可读版本,继续获取下一个更旧版本
*/
undoLog = undoLog.getNext();
} while(undoLog != null && undoLog.getNext() != null);

/**
* 整个undo log链都找不到可读的,没办法了我也帮不鸟你
*/
return null;
}

/**
* 合并最新数据和目标版本的undo log数据,返回最终可访问数据
*/
private Data merge(Data data,UndoLog undoLog) {
if(undoLog == null) {
return data;
}
/**
* update 更新 直接把undo保存的数据替换过来返回
* add 新增 直接把undo保存的数据替换过来返回
* del 删除 数据当时是不存在的,直接返回null就好了
*/
if("update".equalsIgnoreCase(undoLog.getOperType())) {
data.setValue(undoLog.getValue());
return data;
} else if("add".equalsIgnoreCase(undoLog.getOperType())) {
data.setId(undoLog.getRecordId());
data.setValue(undoLog.getValue());
return data;
} else if("del".equalsIgnoreCase(undoLog.getOperType())) {
return null;
} else {
//其余情况,不管了,直接返回算了
return data;
}
}

private boolean canRead(Integer dataTrxId) {
/**
* 1.如果当前数据的所属事务正好是当前事务或者数据的事务小于mIds的最小事务ID,
* 则说明产生该数据的事务在生成ReadView之前已经提交了,该数据可以访问
*/
if(dataTrxId == null || dataTrxId.equals(currTrxId) || dataTrxId < minTrxId) {
return true;
}
/**
* 2.如果当前数据所属事务大于当前最大事务ID(并不是mIds的最大事务,好多人都觉得是),则
* 说明产生该数据是在生成ReadView之后,则当前事务不可访问
*/
if(dataTrxId > maxTrxId) {
return false;
}
/**
* 3.如果当前数据所属事务介于mIds最小事务和当前最大事务ID之间,则需要进一步判断
*/
if(dataTrxId >= minTrxId && dataTrxId <= maxTrxId) {
/**
* 如果当前数据所属事务包含在mIds当前活跃事务列表中,则说明该事务还没提交,
* 不可访问,反之表示数据所属事务已经提交了,可以访问
*/
if(mIds.contains(dataTrxId)) {
return false;
} else {
return true;
}
}
return false;
}


public List<Integer> getmIds() {
return mIds;
}

public void setmIds(List<Integer> mIds) {
this.mIds = mIds;
}

public Integer getMinTrxId() {
return minTrxId;
}

public void setMinTrxId(Integer minTrxId) {
this.minTrxId = minTrxId;
}

public Integer getMaxTrxId() {
return maxTrxId;
}

public void setMaxTrxId(Integer maxTrxId) {
this.maxTrxId = maxTrxId;
}

public Integer getCurrTrxId() {
return currTrxId;
}

public void setCurrTrxId(Integer currTrxId) {
this.currTrxId = currTrxId;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
arduino复制代码package com.mvcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 模仿数据库的数据存储地方
* @author rongdi
* @date 2020-07-25 19:24
*/
public class Data {

/**
* 模拟一个存放数据的表
*/
private static Map<Integer,Data> dataMap = new ConcurrentHashMap<>();

/**
* 记录的ID
*/
private Integer id;

/**
* 记录的数据
*/
private String value;

/**
* 记录当前记录的事务ID
*/
private Integer dataTrxId;

/**
* 指向上个版本的undo log的引用
*/
private UndoLog nextUndoLog;

/**
* 标记数据是否删除,实际数据库会根据情况由purge线程完成真实数据的清楚
*/
private boolean isDelete;

public Data(Integer dataTrxId) {
this.dataTrxId = dataTrxId;
}

/**
* 模拟数据库更新操作,这里就不要自增id,直接指定id了
* @param id
* @param value
* @return
*/
public Integer update(Integer id,String value) {
/**
* 获取原值,这里就不判断是否存在,于本例核心偏离的逻辑了
*/
Data oldData = dataMap.get(id);
/**
* 更新当前数据
*/
this.id = id;
this.value = value;
/**
* 不要忘了,为了数据的一致性,要准备好随时失败回滚的undo log,这里既然是修改数据,那代表以前
* 这条数据是就记录一下以前的旧值,将旧值构造成一个undo log记录
*/
UndoLog undoLog = new UndoLog(id,oldData.getValue(),oldData.getDataTrxId(),"update");
/**
* 将旧值的undo log挂到当前新的undo log之后,形成一个按从新到旧顺序的一个undo log链表
*/
undoLog.setNext(oldData.getNextUndoLog());
/**
* 将当前数据的undo log引用指向新生成的undo log
*/
this.nextUndoLog = undoLog;
/**
* 更新数据表当前id的数据
*/
dataMap.put(id,this);
return id;
}

/**
* 按照上面更新操作的理解,删除相当于是把原纪录修改成标记成已删除状态的记录了
* @param id
*/
public void delete(Integer id) {
/**
* 获取原值,这里就不判断是否存在,于本例核心偏离的逻辑了
*/
Data oldData = dataMap.get(id);
this.id = id;
/**
* 将当前数据标记成已删除
*/
this.setDelete(true);
/**
* 同样,为了数据的一致性,要准备好随时失败回滚的undo log,这里既然是删除数据,那代表以前
* 这条数据存在,就记录一下以前的旧值,并将旧值构造成一个逻辑上新增的undo log记录
*/
UndoLog undoLog = new UndoLog(id,oldData.getValue(),oldData.getDataTrxId(),"add");
/**
* 将旧值的undo log挂到当前新的undo log之后,形成一个按从新到旧顺序的一个undo log链表
*/
undoLog.setNext(oldData.getNextUndoLog());
/**
* 将当前数据的undo log引用指向新生成的undo log
*/
this.nextUndoLog = undoLog;
/**
* 更新数据表当前id的数据
*/
dataMap.put(id,this);
}

/**
* 按照上面更新操作的理解,新增相当于是把原纪录原来不存在的记录修改成了新的记录
* @param id
*/
public void insert(Integer id,String value) {
/**
* 更新当前数据
*/
this.id = id;
this.value = value;
/**
* 同样,为了数据的一致性,要准备好随时失败回滚的undo log,这里既然是新增数据,那代表以前
* 这条数据不存在,就记录一下以前为空值,并将空值构造成一个逻辑上删除的undo log记录
*/
UndoLog undoLog = new UndoLog(id,null,null,"delete");
/**
* 将当前数据的undo log引用指向新生成的undo log
*/
this.nextUndoLog = undoLog;
/**
* 更新数据表当前id的数据
*/
dataMap.put(id,this);
}

/**
* 模拟使用mvcc非锁定读,这里的mode表示事务隔离级别,只有rc和rr级别才需要用到mvcc,同样为了方便,
* 使用英文表示隔离级别,rc表示读已提交,rr表示可重复读
*/
public Data select(Integer id) {
/**
* 拿到当前事务,然后判断事务隔离级别,如果是rc,则执行一个语句就要更新一下ReadView,这里写的
* 这么直接就是为了好理解
*/
Transaction currTrx = Transaction.getCurrRunningTrxMap().get(this.getDataTrxId());
String trxMode = currTrx.getTrxMode();
if("rc".equalsIgnoreCase(trxMode)) {
currTrx.updateReadView();
}
/**
* 拿到当前事务辅助视图ReadView
*/
ReadView readView = currTrx.getReadView();
/**
* 模拟根据id取出一行数据
*/
Data data = Data.getDataMap().get(id);
/**
* 使用readView判断并读取当前事务可以读取到的最终数据
*/
return readView.read(data);
}


public static Map<Integer, Data> getDataMap() {
return dataMap;
}

public static void setDataMap(Map<Integer, Data> dataMap) {
Data.dataMap = dataMap;
}

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public Integer getDataTrxId() {
return dataTrxId;
}

public void setDataTrxId(Integer dataTrxId) {
this.dataTrxId = dataTrxId;
}

public UndoLog getNextUndoLog() {
return nextUndoLog;
}

public void setNextUndoLog(UndoLog nextUndoLog) {
this.nextUndoLog = nextUndoLog;
}

public boolean isDelete() {
return isDelete;
}

public void setDelete(boolean delete) {
isDelete = delete;
}

@Override
public String toString() {
return "Data{" +
"id=" + id +
", value='" + value + '\'' +
'}';
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
typescript复制代码package com.mvcc;

/**
* 模仿undo log链
* @author rondi
* @date 2020-07-25 19:52
*/
public class UndoLog {

/**
* 指向上一个undo log
*/
private UndoLog pre;

/**
* 指向下一个undo log
*/
private UndoLog next;

/**
* 记录数据的ID
*/
private Integer recordId;

/**
* 记录的数据
*/
private String value;
/**
* 记录当前数据所属的事务ID
*/
private Integer trxId;

/**
* 操作类型,感觉用整型好一点,但是如果用整型,又要搞个枚举,麻烦,所以直接用字符串了,能表达意思就好
* update 更新
* add 新增
* del 删除
*/
private String operType;

public UndoLog(Integer recordId, String value, Integer trxId, String operType) {
this.recordId = recordId;
this.value = value;
this.trxId = trxId;
this.operType = operType;
}

public UndoLog getPre() {
return pre;
}

public void setPre(UndoLog pre) {
this.pre = pre;
}

public UndoLog getNext() {
return next;
}

public void setNext(UndoLog next) {
this.next = next;
}

public Integer getRecordId() {
return recordId;
}

public void setRecordId(Integer recordId) {
this.recordId = recordId;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public Integer getTrxId() {
return trxId;
}

public void setTrxId(Integer trxId) {
this.trxId = trxId;
}

public String getOperType() {
return operType;
}

public void setOperType(String operType) {
this.operType = operType;
}
}

  一共就四个类,敢兴趣的读者可以写个测试类,然后开几个线程,每个事务使用一个线程模拟,并加上睡眠延时去跑一下代码。再次强调,本代码只是为了让读者理解一下上面说的理论知识,并不是mysql的真实实现,可以理解成作者本人认为的一种可行的简单实现。回顾全文,发现真心不知道如何排版,艺术能力有限,自我感觉只是把问题说清楚了,希望各位见谅!好了,下次再见吧!

  

  

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

25种代码坏味道总结+优化示例

发表于 2021-05-16

前言

什么样的代码是好代码呢?好的代码应该命名规范、可读性强、扩展性强、健壮性……而不好的代码又有哪些典型特征呢?这25种代码坏味道大家要注意啦

  • 公众号:捡田螺的小男孩
  • github地址

1. Duplicated Code (重复代码)

重复代码就是不同地点,有着相同的程序结构。一般是因为需求迭代比较快,开发小伙伴担心影响已有功能,就复制粘贴造成的。重复代码很难维护的,如果你要修改其中一段的代码逻辑,就需要修改多次,很可能出现遗漏的情况。

如何优化重复代码呢?分三种情况讨论:

  1. 同一个类的两个函数含有相同的表达式
1
2
3
4
5
6
7
8
9
10
11
12
csharp复制代码class A {
public void method1() {
doSomething1
doSomething2
doSomething3
}
public void method2() {
doSomething1
doSomething2
doSomething4
}
}

优化手段:可以使用Extract Method(提取公共函数) 抽出重复的代码逻辑,组成一个公用的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
csharp复制代码class A {
public void method1() {
commonMethod();
doSomething3
}
public void method2() {
commonMethod();
doSomething4
}

public void commonMethod(){
doSomething1
doSomething2
}
}
  1. 两个互为兄弟的子类内含相同的表达式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
csharp复制代码class A extend C {
public void method1() {
doSomething1
doSomething2
doSomething3
}
}

class B extend C {
public void method1() {
doSomething1
doSomething2
doSomething4
}
}

优化手段:对两个类都使用Extract Method(提取公共函数),然后把抽取出来的函数放到父类中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
csharp复制代码class C {
public void commonMethod(){
doSomething1
doSomething2
}
}
class A extend C {
public void method1() {
commonMethod();
doSomething3
}
}

class B extend C {
public void method1() {
commonMethod();
doSomething4
}
}
  1. 两个毫不相关的类出现重复代码

如果是两个毫不相关的类出现重复代码,可以使用Extract Class将重复代码提炼到一个类中。这个新类可以是一个普通类,也可以是一个工具类,看具体业务怎么划分吧。

2 .Long Method (长函数)

长函数是指一个函数方法几百行甚至上千行,可读性大大降低,不便于理解。反例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
csharp复制代码public class Test {
private String name;
private Vector<Order> orders = new Vector<Order>();

public void printOwing() {
//print banner
System.out.println("****************");
System.out.println("*****customer Owes *****");
System.out.println("****************");

//calculate totalAmount
Enumeration env = orders.elements();
double totalAmount = 0.0;
while (env.hasMoreElements()) {
Order order = (Order) env.nextElement();
totalAmount += order.getAmout();
}

//print details
System.out.println("name:" + name);
System.out.println("amount:" + totalAmount);
......
}
}

可以使用Extract Method,抽取功能单一的代码段,组成命名清晰的小函数,去解决长函数问题,正例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
scss复制代码public class Test {
private String name;
private Vector<Order> orders = new Vector<Order>();

public void printOwing() {

//print banner
printBanner();
//calculate totalAmount
double totalAmount = getTotalAmount();
//print details
printDetail(totalAmount);
}

void printBanner(){
System.out.println("****************");
System.out.println("*****customer Owes *****");
System.out.println("****************");
}

double getTotalAmount(){
Enumeration env = orders.elements();
double totalAmount = 0.0;
while (env.hasMoreElements()) {
Order order = (Order) env.nextElement();
totalAmount += order.getAmout();
}
return totalAmount;
}

void printDetail(double totalAmount){
System.out.println("name:" + name);
System.out.println("amount:" + totalAmount);
}

}

3. Large Class (过大的类)

一个类做太多事情,维护了太多功能,可读性变差,性能也会下降。举个例子,订单相关的功能你放到一个类A里面,商品库存相关的也放在类A里面,积分相关的还放在类A里面...反例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
csharp复制代码Class A{
public void printOrder(){
System.out.println("订单");
}

public void printGoods(){
System.out.println("商品");
}

public void printPoints(){
System.out.println("积分");
}
}

试想一下,乱七八糟的代码块都往一个类里面塞,还谈啥可读性。应该按单一职责,使用Extract Class把代码划分开,正例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
csharp复制代码Class Order{
public void printOrder(){
System.out.println("订单");
}
}

Class Goods{
public void printGoods(){
System.out.println("商品");
}
}

Class Points{
public void printPoints(){
System.out.println("积分");
}
}
}

4. Long Parameter List (过长参数列)

方法参数数量过多的话,可读性很差。如果有多个重载方法,参数很多的话,有时候你都不知道调哪个呢。并且,如果参数很多,做新老接口兼容处理也比较麻烦。

1
2
3
arduino复制代码public void getUserInfo(String name,String age,String sex,String mobile){
// do something ...
}

如何解决过长参数列问题呢?将参数封装成结构或者类,比如我们将参数封装成一个DTO类,如下:

1
2
3
4
5
6
7
8
9
10
arduino复制代码public void getUserInfo(UserInfoParamDTO userInfoParamDTO){
// do something ...
}

class UserInfoParamDTO{
private String name;
private String age;
private String sex;
private String mobile;
}

5. Divergent Change (发散式变化)

对程序进行维护时, 如果添加修改组件, 要同时修改一个类中的多个方法, 那么这就是 Divergent Change。举个汽车的例子,某个汽车厂商生产三种品牌的汽车:BMW、Benz和LaoSiLaiSi,每种品牌又可以选择燃油、纯电和混合动力。反例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
csharp复制代码/**
* 公众号:捡田螺的小男孩
*/
public class Car {

private String name;

void start(Engine engine) {
if ("HybridEngine".equals(engine.getName())) {
System.out.println("Start Hybrid Engine...");
} else if ("GasolineEngine".equals(engine.getName())) {
System.out.println("Start Gasoline Engine...");
} else if ("ElectricEngine".equals(engine.getName())) {
System.out.println("Start Electric Engine");
}
}

void drive(Engine engine,Car car) {
this.start(engine);
System.out.println("Drive " + getBrand(car) + " car...");
}

String getBrand(Car car) {
if ("Baoma".equals(car.getName())) {
return "BMW";
} else if ("BenChi".equals(car.getName())) {
return "Benz";
} else if ("LaoSiLaiSi".equals(car.getName())) {
return "LaoSiLaiSi";
}
return null;
}
}

如果新增一种品牌新能源电车,然后它的启动引擎是核动力呢,那么就需要修改Car类的start和getBrand方法啦,这就是代码坏味道:Divergent Change (发散式变化)。

如何优化呢?一句话总结:拆分类,将总是一起变化的东西放到一块。

  • 运用提炼类(Extract Class) 拆分类的行为。
  • 如果不同的类有相同的行为,提炼超类(Extract Superclass) 和 提炼子类(Extract Subclass)。

正例如下:

因为Engine是独立变化的,所以提取一个Engine接口,如果新加一个启动引擎,多一个实现类即可。如下:

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码//IEngine
public interface IEngine {
void start();
}

public class HybridEngineImpl implements IEngine {
@Override
public void start() {
System.out.println("Start Hybrid Engine...");
}
}

因为drive方法依赖于Car,IEngine,getBand方法;getBand方法是变化的,也跟Car是有关联的,所以可以搞个抽象Car的类,每个品牌汽车继承于它即可,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
scala复制代码public abstract class AbstractCar {

protected IEngine engine;

public AbstractCar(IEngine engine) {
this.engine = engine;
}

public abstract void drive();
}

//奔驰汽车
public class BenzCar extends AbstractCar {

public BenzCar(IEngine engine) {
super(engine);
}

@Override
public void drive() {
this.engine.start();
System.out.println("Drive " + getBrand() + " car...");
}

private String getBrand() {
return "Benz";
}
}

//宝马汽车
public class BaoMaCar extends AbstractCar {

public BaoMaCar(IEngine engine) {
super(engine);
}

@Override
public void drive() {
this.engine.start();
System.out.println("Drive " + getBrand() + " car...");
}

private String getBrand() {
return "BMW";
}
}

细心的小伙伴,可以发现不同子类BaoMaCar和BenzCar的drive方法,还是有相同代码,所以我们可以再扩展一个抽象子类,把drive方法推进去,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
scala复制代码public abstract class AbstractRefinedCar extends AbstractCar {

public AbstractRefinedCar(IEngine engine) {
super(engine);
}

@Override
public void drive() {
this.engine.start();
System.out.println("Drive " + getBrand() + " car...");
}

abstract String getBrand();
}

//宝马
public class BaoMaRefinedCar extends AbstractRefinedCar {

public BaoMaRefinedCar(IEngine engine) {
super(engine);
}

@Override
String getBrand() {
return "BMW";
}
}

如果再添加一个新品牌,搞个子类,继承AbstractRefinedCar即可,如果新增一种启动引擎,也是搞个类实现IEngine接口即可

6. Shotgun Surgery(散弹式修改)

当你实现某个小功能时,你需要在很多不同的类做出小修改。这就是Shotgun Surgery(散弹式修改)。它跟发散式变化(Divergent Change) 的区别就是,它指的是同时对多个类进行单一的修改,发散式变化指在一个类中修改多处。反例如下:

1
2
3
4
5
6
7
8
9
10
11
kotlin复制代码public class DbAUtils {
@Value("${db.mysql.url}")
private String mysqlDbUrl;
...
}

public class DbBUtils {
@Value("${db.mysql.url}")
private String mysqlDbUrl;
...
}

多个类使用了db.mysql.url这个变量,如果将来需要切换mysql到别的数据库,如Oracle,那就需要修改多个类的这个变量!

如何优化呢?将各个修改点,集中到一起,抽象成一个新类。

可以使用 Move Method (搬移函数)和 Move Field (搬移字段)把所有需要修改的代码放进同一个类,如果没有合适的类,就去new一个。

正例如下:

1
2
3
4
5
kotlin复制代码public class DbUtils {
@Value("${db.mysql.url}")
private String mysqlDbUrl;
...
}

7. Feature Envy (依恋情节)

某个函数为了计算某个值,从另一个对象那里调用几乎半打的取值函数。通俗点讲,就是一个函数使用了大量其他类的成员,有人称之为红杏出墙的函数。反例如下:

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码public class User{
private Phone phone;
public User(Phone phone){
this.phone = phone;
}
public void getFullPhoneNumber(Phone phone){
System.out.println("areaCode:" + phone.getAreaCode());
System.out.println("prefix:" + phone.getPrefix());
System.out.println("number:" + phone.getNumber());
}
}

如何解决呢?在这种情况下,你可以考虑将这个方法移动到它使用的那个类中。例如,要将 getFullPhoneNumber() 从 User 类移动到Phone 类中,因为它调用了Phone 类的很多方法。

8. Data Clumps(数据泥团)

数据项就像小孩子,喜欢成群结队地呆在一块。如果一些数据项总是一起出现的,并且一起出现更有意义的,就可以考虑,按数据的业务含义来封装成数据对象。反例如下:

1
2
3
4
5
6
7
8
9
10
arduino复制代码public class User {

private String firstName;
private String lastName;

private String province;
private String city;
private String area;
private String street;
}

正例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
arduino复制代码public class User {

private UserName username;
private Adress adress;
}

class UserName{
private String firstName;
private String lastName;
}
class Address{
private String province;
private String city;
private String area;
private String street;
}

9. Primitive Obsession (基本类型偏执)

多数编程环境都有两种数据类型,结构类型和基本类型。这里的基本类型,如果指Java语言的话,不仅仅包括那八大基本类型哈,也包括String等。如果是经常一起出现的基本类型,可以考虑把它们封装成对象。我个人觉得它有点像Data Clumps(数据泥团) 举个反例如下:

1
2
3
4
5
6
7
vbnet复制代码// 订单
public class Order {
private String customName;
private String address;
private Integer orderId;
private Integer price;
}

正例:

1
2
3
4
5
6
7
8
9
10
11
vbnet复制代码// 订单类
public class Order {
private Custom custom;
private Integer orderId;
private Integer price;
}
// 把custom相关字段封装起来,在Order中引用Custom对象
public class Custom {
private String name;
private String address;
}

当然,这里不是所有的基本类型,都建议封装成对象,有关联或者一起出现的,才这么建议哈。

10. Switch Statements (Switch 语句)

这里的Switch语句,不仅包括Switch相关的语句,也包括多层if...else的语句哈。很多时候,switch语句的问题就在于重复,如果你为它添加一个新的case语句,就必须找到所有的switch语句并且修改它们。

示例代码如下:

1
2
3
4
5
6
7
8
9
csharp复制代码    String medalType = "guest";
if ("guest".equals(medalType)) {
System.out.println("嘉宾勋章");
} else if ("vip".equals(medalType)) {
System.out.println("会员勋章");
} else if ("guard".equals(medalType)) {
System.out.println("守护勋章");
}
...

这种场景可以考虑使用多态优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
typescript复制代码//勋章接口
public interface IMedalService {
void showMedal();
}

//守护勋章策略实现类
public class GuardMedalServiceImpl implements IMedalService {
@Override
public void showMedal() {
System.out.println("展示守护勋章");
}
}
//嘉宾勋章策略实现类
public class GuestMedalServiceImpl implements IMedalService {
@Override
public void showMedal() {
System.out.println("嘉宾勋章");
}
}

//勋章服务工厂类
public class MedalServicesFactory {

private static final Map<String, IMedalService> map = new HashMap<>();
static {
map.put("guard", new GuardMedalServiceImpl());
map.put("vip", new VipMedalServiceImpl());
map.put("guest", new GuestMedalServiceImpl());
}
public static IMedalService getMedalService(String medalType) {
return map.get(medalType);
}
}

当然,多态只是优化的一个方案,一个方向。如果只是单一函数有些简单选择示例,并不建议动不动就使用动态,因为显得有点杀鸡使用牛刀了。

11.Parallel Inheritance Hierarchies( 平行继承体系)

平行继承体系 其实算是Shotgun Surgery的特殊情况啦。当你为A类的一个子类Ax,也必须为另一个类B相应的增加一个子类Bx。

解决方法:遇到这种情况,就要消除两个继承体系之间的引用,有一个类是可以去掉继承关系的。

12. Lazy Class (冗赘类)

把这些不再重要的类里面的逻辑,合并到相关类,删掉旧的。一个比较常见的场景就是,假设系统已经有日期工具类DateUtils,有些小伙伴在开发中,需要用到日期转化等,不管三七二十一,又自己实现一个新的日期工具类。

13. Speculative Generality(夸夸其谈未来性)

尽量避免过度设计的代码。例如:

  • 只有一个if else,那就不需要班门弄斧使用多态;
  • 如果某个抽象类没有什么太大的作用,就运用Collapse Hierarchy(折叠继承体系)
    1
    2
    3
    4
    5
    6
    7
    * 如果函数的某些参数没用上,就移除。


    ### 14. Temporary Field(令人迷惑的临时字段)


    某个实例变量仅为某种特定情况而定而设,这样的代码就让人不易理解,我们称之为 `Temporary Field(令人迷惑的临时字段)`。 反例如下:

arduino复制代码public class PhoneAccount {

private double excessMinutesCharge;
private static final double RATE = 8.0;

public double computeBill(int minutesUsed, int includedMinutes) {
    excessMinutesCharge = 0.0;
    int excessMinutes = minutesUsed - includedMinutes;
    if (excessMinutes >= 1) {
        excessMinutesCharge = excessMinutes * RATE;
    }
    return excessMinutesCharge;
}

public double chargeForExcessMinutes(int minutesUsed, int includedMinutes) {
    computeBill(minutesUsed, includedMinutes);
    return excessMinutesCharge;
}

}

1
2
3
4
5
6
7
8

思考一下,临时字段`excessMinutesCharge`是否多余呢?


### 15. Message Chains (过度耦合的消息链)


当你看到用户向一个对象请求另一个对象,然后再向后者请求另一个对象,然后再请求另一个对象...这就是消息链。实际代码中,你看到的可能是一长串`getThis()`或一长串临时变量。反例如下:

scss复制代码A.getB().getC().getD().getTianLuoBoy().getData();

1
2
3
4
5
6
7
8

A想要获取需要的数据时,必须要知道B,又必须知道C,又必须知道D...其实A需要知道得太多啦,回头想下**封装性**,嘻嘻。其实可以通过拆函数或者移动函数解决,比如由B作为代理,搞个函数直接返回A需要数据。


### 16. Middle Man (中间人)


对象的基本特征之一就是封装,即对外部世界隐藏其内部细节。封装往往伴随委托,过度运用委托就不好:某个类接口有一半的函数都委托给其他类。可以使用`Remove Middle Man`优化。反例如下:

javascript复制代码A.B.getC(){
return C.getC();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90

其实,A可以直接通过C去获取C,而不需要通过B去获取。


### 17. Inappropriate Intimacy(狎昵关系)


如果两个类过于亲密,过分狎昵,你中有我,我中有你,两个类彼此使用对方的私有的东西,就是一种坏代码味道。我们称之为`Inappropriate Intimacy(狎昵关系)`


![](https://gitee.com/songjianzaina/juejin_p15/raw/master/img/99b6b67c2f0a634c43eb7f8d2f28058cd4ba17fc439e5965def5054b105bea8f)


建议尽量把有关联的方法或属性抽离出来,放到公共类,以减少关联。


![](https://gitee.com/songjianzaina/juejin_p15/raw/master/img/07bddba30c873536c0004cb95a040821f868b595abca79438bd248c4c94b8eb2)


### 18. Alternative Classes with Different Interfaces (异曲同工的类)


A类的接口a,和B类的接口b,做的的是相同一件事,或者类似的事情。我们就把A和B叫做异曲同工的类。


![](https://gitee.com/songjianzaina/juejin_p15/raw/master/img/29816c7a9e2c37ea615ae5215e9aca7a250881610a7dc9b5276c613d35efc796)


可以通过**重命名,移动函数,或抽象子类**等方式优化


### 19. Incomplete Library Class (不完美的类库)


大多数对象只要够用就好,如果类库构造得不够好,我们不可能修改其中的类使它完成我们希望完成的工作。可以酱紫:**包一层函数或包成新的类**。


### 20. Data Class (纯数据类)


什么是Data Class? 它们拥有一些字段,以及用于访问(读写)这些字段的函数。这些类很简单,仅有公共成员变量,或简单操作的函数。


如何优化呢?将**相关操作封装进去,减少public成员变量**。比如:


* 如果拥有public字段-> `Encapsulate Field`
* 如果这些类内含容器类的字段,应该检查它们是不是得到了恰当地封装-> `Encapsulate Collection` 封装起来
* 对于不该被其他类修改的字段-> `Remove Setting Method` ->找出取值/设置函数被其他类运用的地点-> `Move Method` 把这些调用行为搬移到`Data Class`来。如果无法搬移整个函数,就运用 `Extract Method`产生一个可被搬移的函数->`Hide Method`把这些取值/设置函数隐藏起来。


### 21. Refused Bequest (被拒绝的馈赠)


子类应该继承父类的数据和函数。子类继承得到所有函数和数据,却只使用了几个,那就是**继承体系设计错误**,需要优化。


* 需要为这个子类新建一个兄弟类->`Push Down Method`和`Push Down Field`把所有用不到的函数下推给兄弟类,这样一来,超类就只持有所有子类共享的东西。所有超类都应该是抽象的。
* 如果子类复用了超类的实现,又不愿意支持超类的接口,可以不以为然。但是不能胡乱修改继承体系->`Replace Inheritance with Delegation`(用委派替换继承).


![](https://gitee.com/songjianzaina/juejin_p15/raw/master/img/ba2414f9c92161745715efafd945209b1e0a73075631cad0aec5cd8c3acd8f31)


### 22. Comments (过多的注释)


这个点不是说代码不建议写注释哦,而是,建议大家**避免用注释解释代码,避免过多的注释**。这些都是常见注释的坏味道:


* 多余的解释
* 日志式注释
* 用注释解释变量等
* ...


如何优化呢?


* 方法函数、变量的**命名要规范、浅显易懂**、避免用注释解释代码。
* **关键、复杂的业务**,使用**清晰、简明**的注释


### 23. 神奇命名


方法函数、变量、类名、模块等,都需要简单明了,浅显易懂。避免靠自己主观意识瞎起名字。


反例:

ini复制代码boolean test = chenkParamResult(req);

1
2

正例:

ini复制代码boolean isParamPass = chenkParamResult(req);

1
2
3
4
5

### 24. 神奇魔法数


日常开发中,经常会遇到这种代码:

scss复制代码if(userType==1){
//doSth1
}else If( userType ==2){
//doSth2
}
…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

代码中的这个`1和2`都表示什么意思呢?再比如`setStatus(1)`中的`1`又表示什么意思呢?看到类似坏代码,可以这两种方式优化:


* **新建个常量类**,把一些常量放进去,统一管理,并且写好注释;
* **建一个枚举类**,把相关的魔法数字放到一起管理。


### 25. 混乱的代码层次调用


我们代码一般会分`dao层`、`service层`和`controller层`。


* dao层主要做数据持久层的工作,与数据库打交道。
* service层主要负责业务逻辑处理。
* controller层负责具体的业务模块流程的控制。


所以一般就是`controller`调用`service`,`service`调`dao`。如果你在代码看到`controller`直接调用`dao`,那可以考虑是否优化啦。**反例如下**:

kotlin复制代码@RestController(“user”)
public class UserController {

Autowired
private UserDao userDao;

@RequestMapping("/queryUserInfo")
public String queryUserInfo(String userName) {
    return userDao.selectByUserName(userName);
}

}


### 参考与感谢


* [软工实验:常见的代码坏味道以及重构举例](https://blog.csdn.net/weixin_45763536/article/details/106315969?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-5.vipsorttest&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-5.vipsorttest)
* [22种代码的坏味道,一句话概括](https://blog.csdn.net/windcao/article/details/25773219?utm_medium=distribute.pc_relevant_download.none-task-blog-2~default~BlogCommendFromBaidu~default-15.nonecase&depth_1-utm_source=distribute.pc_relevant_download.none-task-blog-2~default~BlogCommendFromBaidu~default-15.nonecas)
* [【重构】 代码的坏味道总结 Bad Smell](https://blog.csdn.net/shulianghan/article/details/20009689)
* [Code Smell](https://www.dazhuanlan.com/2019/12/26/5e04029e800c9/)
* 《重构改善既有代码的设计》


**本文转载自:** [掘金](https://juejin.cn/post/6962812178537644063)

*[开发者博客 – 和开发相关的 这里全都有](https://dev.newban.cn/)*

怎样才能将模板和策略设计模式结合起来使用

发表于 2021-05-16

前言

我在之前的文章孔乙己“茴”字四种写法引起我对策略模式实现的思考中留下了一个悬念,文章中的代码实现出现了较多的重复代码块,这样的问题对于一个对代码质量有较高要求的人是不可容忍的。为啥这么说呢?因为这样的不合格的代码,无论是你还是他人进行维护或者更新新的功能,都必将难以下手,终将成为令众人“敬仰”的祖传代码。

给我安排上
我们先大致回顾一下前文的内容,前文设定了一个抽奖的业务场景,奖品共有现金、优惠券、积分和谢谢参与四类,后续很大可能增加新的奖品类型如赠送抽奖次数,用户抽中则实时将奖励发送给用户。现在呢我们稍稍的改变下业务场景,我们还是回到之前的订单活动业务场景,活动分为多次住宿活动、连住订单活动、首次入住活动、会籍订单活动等等,订单满足活动条件会赠送积分、优惠券、会籍等奖品。由此可见,活动的类型和奖品的类型都是会逐渐的增多,所以针对此场景引出了策略设计模式,同时在编码的过程中发现很多重复的代码块和固定的的流程和逻辑,这个场景符合模板设计模式,于是引出了我们本文的另一主角:模板设计模式。

问题在哪里

首先我们先找出有哪些重复代码块:

  • IRewardSendStrategy 接口的 isTypeMatch 方法,每个策略的最终实现内容都是一样的,这个是重复代码。
  • 使用实现 InitializingBean 接口的方式组合策略类时,afterPropertiesSet 方法的实现,每个策略类的实现代码也都是重复代码。
  • 判断订单是否符合活动的条件,符合条件则发送奖励,不符合则结束处理。这些逻辑是固定的,具体的判断过程和具体的奖励发放过程是不固定的,我们可以控制判断、发奖励的流程,让具体的判断过程和具体的奖励发放过程让子类去实现。

重复代码块的出现,明显是不符合面向对象OOP的开发原则的,必将对软件的健康带来影响,那接下来我们来看看如何用模板设计模式来解决。

模板设计模式

我们知道可以用模板设计模式来解决重复代码的问题,提高代码利用率的同时,也可以让代码更加的健壮。那什么是模板设计模式?我先介绍一下模板设计模式。

在模板模式(Template Pattern)中,一个抽象类公开定义了执行它的方法的方式/模板。它的子类可以按需要重写方法实现,但调用将以抽象类中定义的方式进行,这种类型的设计模式属于行为型模式。模板模式中涉及到在父类实现算法骨架,具体步骤在子类实现,所以必须要有抽象类(Java8中的接口的 default 方法貌似也可以实现)。

介绍
  • 意图:定义一个操作中的算法的骨架,而将一些步骤延迟到子类中,模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。
  • 主要解决:一些方法通用,却在每一个子类都重新写了这一方法。
  • 何时使用:有一些通用的方法。
  • 如何解决:将这些通用算法抽象出来。
  • 关键代码:在抽象类实现,其他步骤延迟到子类实现。
应用实例:
  • JDK中 ReentrantLock中公平锁和非公平锁的实现
  • Spring 中对 Hibernate 的支持,将一些已经定好的方法封装起来,比如开启事务、获取 Session、关闭 Session 等。
优点:
  • 封装不变部分,扩展可变部分。
  • 提取公共代码,便于维护。
  • 行为由父类控制,子类实现。
缺点:
  • 可能会增加代码的阅读难度。
  • 每一个不同的实现都需要一个子类来实现,导致类的个数增加,使得系统更加庞大。
使用场景:
  • 有多个子类共有的方法,且逻辑相同。
  • 重要的、复杂的方法,可以考虑作为模板方法。
注意事项:

为防止子类重写,一般模板方法都加上 final 关键词。

具体应该怎样做

接下来我就用代码来展现具体的做法,参照之前的代码,进行一些改动,具体代码我都会直接贴在文章中,我建议大家学习时,还是要动手去敲一敲,不要上来就要源码。要知道纸上得来终觉浅,自己还是要亲自去实践一把,才能得到不一样的经验。接下来我就用代码来展现具体的做法,参照之前的代码,进行一些改动,具体代码我都会直接贴在文章中,我建议大家学习时,还是要动手去敲一敲,不要上来就要源码。要知道纸上得来终觉浅,自己还是要亲自去实践一把,才能得到不一样的经验。不要再说了,赶快开始吧!!!

talk is cheap , show me the code

改造好的代码的UML图和机构图如下:

代码的UML图

代码结构

  1. 定义奖励、活动类型枚举
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
java复制代码@Getter
public enum RewardTypeEnum {
/**
* 现金奖励
*/
CASH("1","现金"),
/**
* 积分
*/
POINT("2","积分"),
/**
* 优惠券
*/
COUPON("3","优惠券"),
/**
* 谢谢参与
*/
THANK_YOU("4","谢谢参与"),
;

private String code;

private String desc;

RewardTypeEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
}



@Getter
public enum ActiveTypeEnum {


/**
* 酒店订单
*/
HOTEL_ORDER("1","酒店订单"),

/**
* 会籍订单
*/
LEVEL_ORDER("2","会籍订单"),


;

private String code;

private String desc;

ActiveTypeEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
}
  1. 定义活动、奖励发放策略接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码/**
* 奖励发送的策略接口
*
* @author Sam
* @date 2021/5/16
* @since 1.0.0
*/
public interface IRewardSendStrategy {

/**
* 发放奖励的类型,通过这个方法来标示不同的策略
* @return
*/
String type();

/**
* 是否匹配
* @param type 奖励类型
* @return
*/
boolean isTypeMatch(String type);


/**
* 发送奖励
* @param memberId 会员id
*/
void sendReward(Long memberId);

}



/**
* 订单活动逻辑判断的策略接口
*
* @author Sam
* @date 2020/11/26
* @since 1.7.3
*/
public interface IActiveHandleStrategy {
/**
* 返回活动的类型
* ActiveCategoryEnum 枚举
*
* @return
*/
String getCategory();

/**
* 返回活动的详细类型
* ActiveCategoryDetailEnum 枚举
*
* @return
*/
String getCategoryDetail();

/**
* 是否匹配
*
* @param category 活动类型
* @return
*/
boolean isTypeMatch(String category);


/**
* 订单检查
*
* @param temporaryOrderDto 临时订单
* @param activeDto 活动dto
* @return Result
*/
boolean checkOrder(ActiveOrderDto temporaryOrderDto, ActiveDto activeDto);
}
  1. 关键的一步就是识别出公共不变的方法、逻辑,将公共不变方法、固定的逻辑抽取到抽象父类中。

参照 标题:问题在哪里 提出的问题 ,最终优化的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
java复制代码
/**
* 奖励发送的策略抽象类,将 isTypeMatch 方法和实现 InitializingBean 提升到抽象类中,达到代码复用的目的
*
* @author Sam
* @date 2020/11/26
* @since 1.7.3
*/
public abstract class AbstractRewardSendStrategy implements InitializingBean, RewardSendStrategy {


@Override
public final boolean isTypeMatch(String type) {
return Objects.equals(type, this.type());
}

@Override
public final void afterPropertiesSet() throws Exception {
RewardSendStrategyFactory.registerStrategy(this.type(), this);
}

}



/**
* 活动抽象类,抽取公共方法,
* 把订单是否符合奖励的判断之后发送奖励的公共逻辑在此处实现,
* 订单具体条件的判断延迟由子类去实现.
* 策略和模板模式组合使用
*
* @author Sam
* @date 2020/11/26
* @since 1.7.3
*/
@Slf4j
@Component
public abstract class AbstractActiveHandleStrategy implements IActiveHandleStrategy {

/**
* 其他抽象方法
*/
public abstract void otherMethod();


public final void otherMethod1() {
System.out.println("其他公用方法");
}

@Override
public final boolean isTypeMatch(String categoryDetail) {
return Objects.equals(categoryDetail, this.getCategoryDetail());
}

/**
* 外部真正要调用的方法
*
* @param temporaryOrderDto 订单
*/
public final boolean handle(ActiveOrderDto temporaryOrderDto, ActiveDto activeDto) {

// 调用接口中需要子类实现的方法
boolean result = checkOrder(temporaryOrderDto, activeDto);
if (!result) {
log.error("订单 {} 不符合活动 {} 的奖励发放条件", temporaryOrderDto.getOrderNo(), activeDto.getId());
return false;
}
return sendReward(temporaryOrderDto, temporaryOrderDto.getMemberId(), activeDto);
}

/**
* 统一的发送奖励的方法
*
* @param temporaryOrderDto 订单
* @param memberId 用户ID
* @param activeDto 活动
*/
protected final boolean sendReward(ActiveOrderDto temporaryOrderDto, long memberId, ActiveDto activeDto) {
AbstractIRewardSendStrategy impl = RewardSendStrategyFactory.getImpl(activeDto.getRewardType());
impl.sendReward(memberId, activeDto);
return true;
}
}

抽象类中是可以没有抽象方法的,但一个类中如果有抽象方法,那这个类就必须定义成抽象类。

  1. 为了更好的提供给第三方调用,创建策略工厂整合策略。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
java复制代码@Slf4j
@Component
public class RewardSendStrategyFactory {

/**
* 保存策略集合
*/
private final static Map<String, AbstractRewardSendStrategy> STRATEGY_MAP = new ConcurrentHashMap<>(16);


/**
* 添加策略实例
*
* @param type
* @param strategy
*/
public static void registerStrategy(String type, AbstractRewardSendStrategy strategy) {
STRATEGY_MAP.put(type, strategy);
}

/**
* 获取策略实例
*
* @param type
* @return
*/
public static AbstractRewardSendStrategy getImpl(String type) {

return STRATEGY_MAP.get(type);
}

}




/**
* 负责所有活动处理的入口,根据 getImpl(String categoryDetail)类型来判断调用具体的活动策略
*
* @author Sam
* @date 2020/7/13
* @since 1.6.8
*/
@Slf4j
@Component
public class ActiveHandleFactory {

@Autowired
private List<AbstractActiveHandleStrategy> activeHandleList;


/**
* 对外的统一入口
*
* @param categoryDetail 类型
* @return
*/
public AbstractActiveHandleStrategy getImpl(String categoryDetail) {
return activeHandleList.stream().filter(strategy -> strategy.isTypeMatch(categoryDetail))
.findAny()
.orElseThrow(() -> new UnsupportedOperationException("没有找到策略实现"));
}

}
  1. 具体的策略实现

因为策略实现代码比较简单,我这个地方就给出一个优惠券发放和会籍订单活动的策略实现,其他的大家照猫画虎就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码
@Slf4j
@Service("couponRewardSendStrategyV1")
public class CouponRewardSendStrategy extends AbstractRewardSendStrategy {


@Override
public String type() {
return RewardTypeEnum.COUPON.getCode();
}


@Override
public void sendReward(Long memberId) {
log.info("给[{}]发送优惠券奖品", memberId);
}

}


/**
* 会籍订单的处理
*
* @author Sam
* @date 2020/11/26
* @since 1.7.3
*/
@Slf4j
@Service
public class LevelOrderActiveHandleStrategy extends AbstractActiveHandleStrategy {
@Override
public void otherMethod() {
log.info("会籍订单的实现");
}

@Override
public String getCategory() {
return ActiveTypeEnum.LEVEL_ORDER.getCode();
}

@Override
public String getCategoryDetail() {
return ActiveTypeEnum.LEVEL_ORDER.getCode();
}

@Override
public boolean checkOrder(ActiveOrderDto temporaryOrderDto, ActiveDto activeDto) {

log.info("判断订单 {} 的属性是否符合活动 {} 的条件", temporaryOrderDto, activeDto);
Random random = new Random();
int i = random.nextInt(4);
if (i >= 2) {
return false;
}
return true;
}
}
  1. 写个单元测试,看看具体的效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码
@ContextConfiguration(locations = {"classpath:spring/spring-dao.xml", "classpath:spring/spring-service.xml"})
@RunWith(value = SpringJUnit4ClassRunner.class)
public class RewardSendStrategyFactoryTest {

@Autowired
ActiveHandleFactory activeHandleFactory;


@Test
public void test() {

ActiveDto activeDto = new ActiveDto();
activeDto.setId(101L);
activeDto.setCategory(ActiveTypeEnum.HOTEL_ORDER.getCode());
activeDto.setCategoryDetail(ActiveTypeEnum.HOTEL_ORDER.getCode());
activeDto.setRewardType(RewardTypeEnum.COUPON.getCode());
activeDto.setReward("No213215632" + RewardTypeEnum.COUPON.getDesc());

ActiveOrderDto activeOrderDto = new ActiveOrderDto();
activeOrderDto.setMemberId(11L);
activeOrderDto.setOrderType("1");
activeOrderDto.setOrderNo("202105111");

AbstractActiveHandleStrategy impl = activeHandleFactory.getImpl(activeDto.getCategoryDetail());
impl.handle(activeOrderDto, activeDto);
}

}

单元测试的输出结果如下:

单元测试结果

总结

这样我们的代码就优化完了,AbstractActiveHandleStrategy.handle() 方法中逻辑是固定不变的,这样就确定好了代码的逻辑骨架,后续有新的订单活动或者新的奖励类型,这个地方的代码都不需要改动,只需要增加对应接口的子类就行,符合开闭原则。大家看我代码写了很多,其实关键地方就在那个handle方法上。这样看起来模板设计模式是不是很简单,而且模板和策略两个模式也能很好的结合,最后的效果也不错,其实这两个只要你写稍微复杂一点的代码,都是有他们俩的使用场景的。另外由上文我们就可以看出,设计模式之间其实并不是割裂的,复杂的业务代码实现时,可能会符合多种设计模式。作为程序员我们就要对业务进行抽象,用更多更好并且合理的模式去实现。另外多说一句,大家在学习的时候一定要多多思考,多多动手,千万不要养成眼高手低的习惯,要知道纸上得来终觉浅,绝知此事要躬行的道理。设计模式系列的文章我会持续的更新,下一篇的主题有可能是管道模式,请大家敬请期待吧!哈哈!

文章首发于个人博客 Sam的个人博客,禁止未经授权转载,违者依法追究相关法律责任!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

OOM怎么办,教你生成dump文件以及查看 前言 什么是du

发表于 2021-05-16

文章已收录Github精选,欢迎Star:github.com/yehongzhi/l…

前言

在日常开发中,即使代码写得有多谨慎,免不了还是会发生各种意外的事件,比如服务器内存突然飙高,又或者发生内存溢出(OOM)。当发生这种情况时,我们怎么去排查,怎么去分析原因呢?

这时就引出这篇文章要讲的dump文件,各位看官且往下看。

什么是dump文件

dump文件是一个进程或者系统在某一个给定的时间的快照。

dump文件是用来给驱动程序编写人员调试驱动程序用的,这种文件必须用专用工具软件打开。

dump文件中包含了程序运行的模块信息、线程信息、堆栈调用信息、异常信息等数据。

在服务器运行我们的Java程序时,是无法跟踪代码的,所以当发生线上事故时,dump文件就成了一个很关键的分析点。

如何生成dump文件

这里介绍两种方式,一种是主动的,一种是被动的。

方式一

主动生成dump文件。首先要查找运行的Java程序的pid。

使用top命令:

然后使用jmap命令生成dump文件。file后面是保存的文件名称,1246则是java程序的PID。

1
shell复制代码jmap -dump:format=b,file=user.dump 1246

方式二

其实在很多时候我们是不知道何时会发生OOM,所以需要在发生OOM时自动生成dump文件。

其实很简单,只需要在启动时加上如下参数即可。HeapDumpPath表示生成dump文件保存的目录。

1
shell复制代码-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\tmp

我们还需要模拟出OOM错误,以此触发产生dump文件,首先写个接口:

1
2
3
4
5
6
7
8
9
java复制代码private static Map<String, String> map = new HashMap<>();

@RequestMapping("/oom")
public String oom() throws Exception {
for (int i = 0; i < 100000; i++) {
map.put("key" + i, "value" + i);
}
return "oom";
}

然后在启动时设置堆内存大小为32M。

1
shell复制代码-Xms32M -Xmx32M

因为要后台启动,并且输出日志,所以最后启动命令就是这样:

1
shell复制代码nohup java -jar -Xms32M -Xmx32M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/usr/local user-0.0.1-SNAPSHOT.jar  > log.file  2>&1 &

然后请求oom的接口,查看日志,果然发生了OOM错误。

查看保存dump的目录,果然生成了对应的dump文件。

如何查看dump文件

这里我介绍使用Jprofiler,有可视化界面,功能也比较完善,能够打开JVM工具(通过-XX:+HeapDumpOnOutOfMemoryError JVM参数触发)创建的hporf文件。

安装过程这里就省略了,网上谷歌,百度自行查找。我们把刚刚自动生成的java_pid1257.hprof用Jprofiler打开,看到是这个样子。

明显可以看出HashMap的Node对象,还有String对象的实例很多,占用内存也是最多的。这里还不够明显,我们看Biggest Objects。

这里就看出是UserController类的HashMap占用了大量的内存。所以造成OOM的原因不难看出,就是在UserController里的Map集合。

总结

当然线上的代码量,类的数量,实例的数量都非常庞大,所以没有那么简单就能找出报错的原因,但是要用什么工具,怎么用至少要知道,那么当遇到问题时,才不会慌张。

我问过一些技术大佬,为什么技术大佬代码写得不是很多,但是工资却特别高。大佬说,那是因为当线上出现问题时,大佬能解决大家解决不了的问题,这种能力就体现出他个人的价值。

一句话讲完,业务代码大部分程序员都会写,而线上排错能力并不是大部分程序员都会排。

这篇文章就讲到这里了,感谢大家的阅读,希望看完大家能有所收获!

文章持续更新,微信搜索『java技术爱好者』,关注后第一时间收到推送的技术文章,文章分类收录于github:github.com/yehongzhi,总能找到你感兴趣的

觉得有用就点个赞吧,你的点赞是我创作的最大动力~

我是一个努力让大家记住的程序员。我们下期再见!!!

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

后端开发都应该了解点接口的压力测试(Apache Bench

发表于 2021-05-16

背景

小A:小B,最近调你的接口老是超时呀,8秒都还没返回结果,是不是有性能问题呀!

小B :我看看~~

类似这样的对话,在现实中是时有发生的,不是特别严重的话,往往大家也不会去重视这个事。

尤其是在一些测试资源并不完备的,开发人员对性能测试没有接触过的一些公司,遇到这些会显得更加力不从心。

本着对自己写出来的东西负责,上线之前,我们都应该对自己的接口进行一个简单的压力测试。

其实做这一步也是为了让我们心里有个度,有个底,不至于说连能承受多少量都不知道。如果什么都不知道,那很容易陷入一个无底深渊,这是一件很可怕的事情。

老黄平时用的比较多的是Apache Bench,当然,jmeter、wrk和vegeta也都是非常不错的。

说说用这个的原因吧,免安装,绿色版,开箱(解压)即用,说白了,老黄其实就是懒鬼~~

jmeter要装jdk,懒得弄。

wrk/wrk2 要自己编译,mac上面可以直接安装,但是我的mac配置不高,不折腾。

vegeta用起来感觉不是很顺手,后面再慢慢挖掘。

Apache Bench 介绍

Apache Bench 是Apache服务器自带的一个web压力测试工具,简称ab。

ab是一个命令行工具,对发起负载的本机要求很低,根据ab命令可以创建很多的并发访问线程,模拟多个访问者同时对某一URL地址进行访问,因此可以用来测试目标服务器的负载压力。总的来说ab工具小巧简单,上手学习较快,可以提供需要的基本性能指标,但是没有图形化结果,不能监控。

ab进行的测试的本质是基于HTTP协议,可以理解为对web服务器软件的黑盒性能测试,获得的一切数据和计算结果,都是可以通过HTTP来解释的。

Apache Bench 简单使用

下面的示例都是在CentOS上面的。

通过下面的命令安装,

1
复制代码yum -y install httpd-tools

装好之后,运行一下ab -V就可以看到版本信息。

1
2
3
4
5
ruby复制代码ab -V

This is ApacheBench, Version 2.3 <$Revision: 1430300 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

运行一下ab -h就可以看到帮助信息。

帮助信息有很多,也比较详细,老黄把一些常用的参数都添加了注释,方便大家查看。

其中最常用的两个参数是 -c 和 -n,当然如果是POST/PUT请求的话,还离不开 -T 和 -p。

说了不少废话,来看看怎么使用才是重点。

下面准备了两个简单的接口来进行演示

1
2
3
4
5
6
7
8
9
10
cs复制代码[ApiController]
[Route("test")]
public class TestController : ControllerBase
{
[HttpGet]
public string Get() => "GET";

[HttpPost]
public string Post([FromBody]string value) => value;
}

这个接口是跑在docker里面的(一台2c4g的活动云服务器),下面的图可以看到具体的信息,还有两个接口的访问详情。

先来压一下GET请求的接口, 200并发,5000个请求。

1
bash复制代码ab -c 200 -n 5000 localhost:9000/test

POST请求有点不一样,要先准备一下body的参数,这里在/tmp目录创建了一个post.json文件,里面就一个字符串。

1
2
bash复制代码cat /tmp/post.json
"Post"

在压测命令里面,指定Content-Type为application/json, 参数的数据文件路径是/tmp/post.json

同样也是200并发,5000个请求。

1
bash复制代码ab -c 200 -n 5000 -T "application/json" -p /tmp/post.json localhost:9000/test

到这里,大家对怎么压测,应该都不会太陌生了,比较陌生的应该是压测的结果要怎么看。

下面拿其中一个结果示例来说明各项指标的含义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
yaml复制代码Server Software:        Kestrel  # 服务器软件名称
Server Hostname: localhost # 服务器主机名
Server Port: 9000 # 服务器端口

Document Path: /test # 测试的URL路径
Document Length: 3 bytes # 文档大小

Concurrency Level: 200 # 并发数
Time taken for tests: 2.386 seconds # 消耗的总时间
Complete requests: 5000 # 总次数
Failed requests: 0 # 失败的请求数
Write errors: 0 # 网络连接写入错误数
Total transferred: 680000 bytes # 传输的总数据量
HTML transferred: 15000 bytes # HTML文档的总数据量
Requests per second: 2095.43 [#/sec] (mean) # (平均每秒的请求数) 这个是非常重要的参数数值,服务器的吞吐量
Time per request: 95.446 [ms] (mean) # (所有并发用户都请求一次的平均时间)
Time per request: 0.477 [ms] (mean, across all concurrent requests) # (单个用户请求一次的平均时间)
Transfer rate: 278.30 [Kbytes/sec] received # 每秒获取的数据长度 (传输速率,单位:KB/s)

# 网络上消耗的时间的分解
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 3 3.7 2 20
Processing: 12 68 120.0 53 1057
Waiting: 1 67 120.1 52 1050
Total: 15 71 119.8 55 1058

# 整个场景中所有请求的响应情况
Percentage of the requests served within a certain time (ms)
50% 55
66% 63
75% 68
80% 71
90% 77
95% 88
98% 105
99% 1012
100% 1058 (longest request)

这么多的指标,我们可以重点关注下面几个

  1. Requests per second
  2. Failed requests
  3. 90%,95%和98%的响应时间

第一个是吞吐量,这个上不去,其实是挺尴尬的。

第二个是失败的请求数,这个数要尽可能的低,最好是0,没有失败的。设想一下,100个请求,80个都是失败的,这个结果还能有意义不。

第三个是响应时间,这个可以看到大部分请求的速度如何。

进行压测时的一些小建议:

  • 压测尽可能让并发数从低往高慢慢递增,避免一开始就设的太大,一个比较好的参考依据是现在阶段线上环境的并发数
  • 压测的持续时间可以持续久一点,这样可以看到更多可能出现的情况,可以考虑5分钟,8分钟,15分钟等
  • 有条件的,压测和被压测的机器要独立,因为压测的时候也会有资源占用,可能会影响被压测的接口
  • 不要只看压测的结果数据,还要留意被压测机的cpu,内存等指标在压测时是否正常
  • 内网压测的效果达到预期后,再考虑外网的,网络因素,可控性不强

分布式压测和全链路压测,暂时不用想了,留给更专业的人去做吧~~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

nodejs 合并 pdf 的艰难之路

发表于 2021-05-16

刚刚群里面分享了纸质版的樊登故事会, 都 2021 年了我觉得看纸质太麻烦了, 网上应该有应该有一堆 pdf 打包的合集, 下载下来岂不美哉. 一顿搜索之后发现都是单个 pdf , 这个云盘那个网盘这个码那个码的, 一个个下载太慢了, 之后再搜索后来这笔原来抖音 7100w 的粉丝, 转念一想应该有人做过类似的公众号合集, 一搜还真有, 分析之后发现接口是透明的, 直接返回 pdf, 但是一本完整的书 pdf 都是 n 个 pdf 整在一起的, 之后肯定需要合并起来成为一个完整的, 先不管3721, 直接爬虫全部下载下来. 如下.

1.png

合并的时候直接谷歌搜索 nodejs pdf merge

我先找到了一个 www.npmjs.com/package/eas…

一看这下载量堪比某老师的作品, 这 api 如此接地气, 这调用他妈的如此简单. 我呼吸变得有点急促, 嗯, 确定过眼神. 你就是我要找的人.

cnpm i 娶她进门, 复制粘贴, 一气呵成, 遍历文件, pdf 生成, 理因如是.

node xxx 一顿操作, 出来个图形界面针不错, 沃曹磊, Super surprise. 这简直是喜当爹, 我突然觉得头上多了一顶某种颜色的帽子.

2.png

很显然这并不是我一个人的错. 感情这东西是双向的, 单方面无条件的付出是不行滴.

但我又不想马上离婚, 因为我已经付出了感情, 把她从我生命里删除不是一件容易的事情.

我觉得是时候跟她坐下来好好聊聊了, 一看源代码,

1
2
3
4
5
6
7
8
9
10
js复制代码 let command = [
"java", "-jar", jarPath, "PDFMerger"
];

let maxHeapOpt = opts.maxHeap ? '-Xmx' + opts.maxHeap : null
if (maxHeapOpt) {
command.splice(2, 0, maxHeapOpt)
}

exec(command.join(' '), opts

子进程调用 java, 哎, 为何偏偏是 java !!!!

人是不能始乱终弃的, 更何况是我这样的人. 所以只能换个老婆了.

天涯何处无芳草, 更何况我有 npm 这个青青草原呢.

一顿搜索又找到一个备胎, www.npmjs.com/package/nod…

一看 api, 函数调用, 这下载量, 嗯, 怎么说呢, 有种初恋的味道. 确定过眼神, 你就是对的人.

当然, 无数先贤告诫我们, 人不能在一个地方跌倒两次, 当一个人用一段情换来一身伤痕的时候, 以后就不会那么奋不顾身.

所以慢慢的我们会变得越来越谨慎, 保守, 变得不那么傻.

所以当我再次 node xxx,

3.png

嗯, 我 TM 就知道, 她肯定存在我所不知道的姿势.

进入正经模式.

这个库用到 pdftk, linux 系统下面可以用 sudo apt-get install pdftk

macos 下面可以用

1
2
3
bash复制代码brew tap spl/pdftk

brew install pdftk

之后运行还是不对, 又发现了一些细小的说明, 原来是版本不对

Update 2 (2016-01-12): For OS X 10.11 El Capitan users, the source build ofpdftkdoes not seem to work. There is a.pkgavailable athttps://stackoverflow.com/a/33248310/545794. As of this date, PDF Labs has not yet released a source update that fixes the build.

stackoverflow.com/questions/2…
安装 pkg 包

运行

❯ node foo.js

4.png

收工

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JVM简单的问题排查-内存占用高

发表于 2021-05-15

文章介绍了关于发现JVM内存过大问题时,的一些简单的排查方法,主要分为3个小方法:

  • 使用jmap -histo查询当前占用内存较大的类
  • dump出堆内存,文件过大时在服务器使用jhat 查看dump文件堆内存状态
  • dump出堆内存,下载堆文件到本地,通过VisualVM查看dump文件堆内存状态

一、模拟JVM占用高内存

模拟项目使用使用jdk8

这里使用了springBoot启动项目是为了更贴近实际项目,如果自己尝试可以直接用@Test启动,启动后线程内使用类会更少,排查堆内存会更清晰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码package com.cflong.JVM问题排查;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* MemoryBase模拟对象
*
* @author cflong
* @date 2021/05/13 21:24:27
*/
@Data
@AllArgsConstructor
public class MemoryBean {

private String name ;

private int age;

private String remark;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码package com.cflong.JVM问题排查;

import java.util.ArrayList;
import java.util.List;

/**
* 模拟内存占用高的情况
*
* @author cflong
* @date 2021/05/08 18:06:44
*/
public class MemoryTakeHighTest {

public static void test() throws InterruptedException {
List<MemoryBean> list = new ArrayList<>(1000);
int i = 0;
while (i < 10000) {
list.add(new MemoryBean("name",12,new String("测试大内存备注,测试大内存备注," +
"测试大内存备注,测试大内存备注,测试大内存备注," +
"测试大内存备注,测试大内存备注,测试大内存备注," +
"测试大内存备注,测试大内存备注,测试大内存备注,")));
i++;
}
Thread.sleep(1000000000);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码package com.cflong;

import com.cflong.JVM问题排查.MemoryTakeHighTest;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestJvmOomApplication {

public static void main(String[] args) throws Throwable {
SpringApplication.run(TestJvmOomApplication.class, args);
//模拟内存占用过高
MemoryTakeHighTest.test();
}
}

二、如何发现内存问题

可以通过使用linux中的top命令查询服务器中占用内存的状态:
WX20210515-165535@2x.png
使用top命了后会出现下图内容,默认是CPU排序的服务器情况。我们可以调整为内存排序,整理不同的服务按键不一样。WX20210515-165630@2x.png

  • Linux:直接在窗口点击“m”键,就使用内存排序,如下图:

WX20210515-170000@2x.png

  • macOS:直接在窗口点击o键,会出现输入框,在输入框中输入“mem”回车,就会使用内存排序,如下图:

WX20210515-165746@2x.png
通过上面的命了,我们可以看到类似下图的,发现java程序占用大内存的情况:
WX20210515-170110@2x.png
我们只知道这个是java程序,但是不一定是我们自己的程序,可能不是我们目标的java程序,这时候可以通过命了ps -ef|grep pid(进程号)来查询进程是那个程序的,如下图:

1
shell复制代码ps -ef|grep <pid>

WX20210515-192204.png
在这里可以发现是我们自己开发的程序,这个时候真正进入JVM的内存占用的排查。

如果大家练习可以使用jps命令来快速查到所有java的进程号,如下图

image.png

三、找到高内存的类

jmap -histo

使用jmap -histo查询当前占用内存较大的类
这里使用下面命令查看占用内存高的前20个类,如下图排查了一些基本常用的类,可以看出里面包含了一个10000对象的我们自建的MemoryBean类,这里就定位到问题类了。

当然这里问题也可能出现在类如String、HashMap这些类里面,这个需要更复杂的排查方法,不在这次的研究范围之内,后续我们可以在进行讨论。

1
shell复制代码jamp -histo <pid> | head -20

WX20210515-183635.png
​

jhat

dump出堆内存,文件过大时在服务器使用jhat 查看dump文件堆内存状态
在服务区中使用jmap命令dump出服务的堆信息。

1
shell复制代码jamp -dump:format=b,file=文件名字.hporf <pid>

命令中format=b为固定值,file为dump出来的文件名称,后缀使用hporf

WX20210515-171315@2x.png
当dump出来的文件非常大,不方便导出服务区进行分析,我们可以使用jhat 命令启动分析服务进行内存分析。

1
shell复制代码jhat 文件名.hporf(刚刚dump出来的文件)

image.png
执行晚命令后会出现如图的,显示分析服务已启动,端口为7000,我们使用浏览器访问ip:700(我是本地使用jhat,所以ip为localhost)
image.png

我们下拉到最下面,点击Show heap histogram连接
image.png

我们能看到堆占用类的降序排列,可以看出里面包含了一个10000对象的我们自建的MemoryBean类,这里就定位到问题类了。
image.png
​

VisualVM

dump出堆内存,下载堆文件到本地,通过VisualVM查看dump文件堆内存状态

VisualVM如何安装这里不详细描述,大家可以自己搜索一下

打开VisualVM,点击【文件】【装入】
image.png

下拉选择【堆DUMP(.hprof,.*】,在选择上一个方法中dump出来的文件
image.png

这里后会出现下图画面,选择【类】,我们能看到堆占用类的降序排列,可以看出里面包含了一个10000对象的我们自建的MemoryBean类,这里就定位到问题类了。
image.png

到这里已经介绍完关于3个简单排查JVM内存占用大的方法了,如有什么错误的地方谢谢指正。觉得写的不错的同学可以关注我,我会更新更多好的干货。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ES-基础查询 基础命令语句 查询

发表于 2021-05-15

基础命令语句

  • 使用PUT 进行更新时需要将所有属性都抄一遍,否则会被置空
  • 而通过POST /索引名/类型名/文档id/_update 只需填写要修改的属性即可,灵活性更高
    method url 描述
    PUT localhost:9200/索引名/类型名/文档id 创建文档(指定id)
    POST localhost:9200/索引名/类型名 创建文档(随机文档id)
    POST localhost:9200/索引名/类型名/文档id/_update 修改文档
    DELETE localhost:9200/索引名/类型名/文档id 删除文档
    GET localhost:9200/索引名/类型名/文档id 通过id查询文档
    POST localhost:9200/索引名/类型名/_search 查询所有数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
json复制代码// 创建一个空的索引
PUT /test2
{
"mappings": {
"properties": {
"name": {"type": "text"},
"age": { "type": "integer"},
"birthday": {"type": "date"}
}
}
}
//创建一个索引test1 类型type1 id=1
PUT /test1/type1/1
{
// 往索引中插入数据
"name": "xxxxx",
"age": 18
}
// 获取索引信息
GET test1

查询

简单查询

1
2
3
4
5
6
7
8
9
json复制代码GET test1/type1/_search
{
// 查询的参数体
"query":{
"match": { //查询的结果为模糊匹配
"name": "遇见" //查询的条件
}
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
json复制代码{
"took" : 901,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {// 查询的结果都放在hits里了
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 0.26706278,
"hits" : [
{
"_index" : "test1",
"_type" : "type1",
"_id" : "2",
"_score" : 0.26706278, //匹配度越大,数值越大
"_source" : {
"name" : "遇见_line",
"age" : 20
}
},
{
"_index" : "test1",
"_type" : "type1",
"_id" : "3",
"_score" : 0.26706278,
"_source" : {
"name" : "遇见_line3",
"age" : 20
}
},
{
"_index" : "test1",
"_type" : "type1",
"_id" : "1",
"_score" : 0.26706278,
"_source" : {
"name" : "遇见_line1",
"age" : 20
}
}
]
}
}

结果过滤

在原本的查询中添加 _source即可进行字段的筛选

1
2
3
4
5
6
7
8
9
json复制代码GET test1/type1/_search
{
"query":{
"match": {
"name": "遇见"
}
},
"_source" : ["name"]
}

结果:

1
2
3
4
5
6
7
8
9
10
11
json复制代码"hits" : [
{
"_index" : "test1",
"_type" : "type1",
"_id" : "2",
"_score" : 0.26706278,
"_source" : {
"name" : "遇见_line"
// age 没有了
}
}

结果排序

通过sort 来排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
json复制代码GET test1/type1/_search
{
"query":{
"match": {
"name": "遇见"
}
},
"_source" : ["name"],
"sort":[
{
"age":{
"order": "desc" //"asc" 升序, desc降序
}
}
]

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
json复制代码"hits" : [
{
"_index" : "test1",
"_type" : "type1",
"_id" : "3",
"_score" : null,
"_source" : {
"name" : "遇见_line3"
},
"sort" : [
21
]
},
{
"_index" : "test1",
"_type" : "type1",
"_id" : "2",
"_score" : null,
"_source" : {
"name" : "遇见_line2"
},
"sort" : [
20
]
},
{
"_index" : "test1",
"_type" : "type1",
"_id" : "1",
"_score" : null,
"_source" : {
"name" : "遇见_line1"
},
"sort" : [
19
]
}
]
}

分页查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
json复制代码GET test1/type1/_search
{
"query":{
"match": {
"name": "遇见"
}
},
"_source" : ["name"],
"sort":[
{
"age":{
"order": "desc"
}
}
],
"from":0, //从第0条数据开始 ,数据下标从0开始
"size":2 // 每页显示2条数据
}

多条件查询 | Bool 查询

must(and) 所有条件都必须符合

查询名字包含遇见且年龄为19的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
json复制代码GET test1/type1/_search
{
"query":{
"bool": {
"must": [
{
"match": {
"name": "遇见"
}
},
{
"match": {
"age": 19
}
}
]
}
}
}

should (or)

查询年龄20 or 19的人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
json复制代码GET test1/type1/_search
{
"query":{
"bool": {
"should": [
{
"match": {
"age": "20"
}
},
{
"match": {
"age": 19
}
}
]
}
}
}

msut_not 等价于not

查询既不是19也不是20 岁的人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
json复制代码GET test1/type1/_search
{
"query":{
"bool": {
"must_not": [
{
"match": {
"age": "20"
}
},
{
"match": {
"age": 19
}
}
]
}
}
}

过滤器 filter

查询年龄在【10,20】,且名称包含遇见的人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
json复制代码GET test1/type1/_search
{
"query":{
"bool": {
"must": [
{
"match": {
"name": "遇见"
}
}
],
"filter": [
{
"range": {
"age": {
"gte": 10, //gt> gte>=
"lte": 20
}
}
}
]
}
}
}

多匹配查询 tags

查询标签中包含男or 技术的人

1
2
3
4
5
6
7
json复制代码GET test1/type1/_search
{
"query":{
"match": {
"tags": "男 技术" // 空格表示or
}
}

精准查询term

通过倒排索引进行查询

原始数据 倒排索引
博客id 标签 标签 博客id
1 windows windows 1,2,3
2 windows linux 3,4
3 windows,linux
4 liunx
  • term 直接精准匹配
  • match 使用分词器(先分析文档,在查找)
  • text :会被分词器拆分
  • keyword 被当成一个整体不会被分词器拆分
1
2
3
4
5
6
7
8
json复制代码GET test1/type1/_search
{
"query":{
"term": {
"name": "遇见_line1"
}
}
}

高亮查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
json复制代码GET test1/type1/_search
{
"query":{
"match": {
"name": "遇见"
}
},
// 选择name字段高亮
"highlight" : {
"fields":{
"name":{}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
json复制代码"hits" : [
{
"_index" : "test1",
"_type" : "type1",
"_id" : "3",
"_score" : 0.17402273,
"_source" : {
"name" : "遇见_line3",
"age" : 21,
"tags" : [
"直男",
"技术宅",
"内向"
]
},
"highlight" : {
"name" : [
// 被em标签包裹的就是高亮部分
"<em>遇</em><em>见</em>_line3"
]
}
},

我们也可以自定义高亮标签

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
json复制代码GET test1/type1/_search
{
"query":{
"match": {
"name": "遇见"
}
},
"highlight" : {
"pre_tags": "<p color='red'>",
"post_tags": "</p>",
"fields":{
"name":{}
}
}
}
1
2
3
4
5
6
json复制代码"highlight" : {
"name" : [
"<p color='red'>遇</p><p color='red'>见</p>_line3"
]
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

java8 Stream 之collect(Collecto

发表于 2021-05-15

前言

第一讲在这里:关于collect(),Collector的解析【理解Collector非常重要,否则无法学习收集部分】

  • 本次我们分析 groupingBy 分组,及嵌套分组
  • 下一讲分析 partitioningBy分区

一、groupingBy使用

groupingBy是用来分组的,它是Collectors的静态方法,类似于SQL中的group by,一般如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
Person p1 = new Person("hehe", 20);
Person p2 = new Person("wang", 20);
Person p6 = new Person("hou", 25);
Person p5 = new Person("hou", 25);
Person p3 = new Person("mabi", 30);
Person p4 = new Person("li", 30);
List<Person> personList = Arrays.asList(p1, p2, p3, p4, p5, p6);
/*按照age进行分组*/
Map<Integer, List<Person>> collect = personList.stream().collect(Collectors.groupingBy(Person::getAge));
/*groupby 嵌套 先根据 age,再根据 name 分组*/
Map<Integer, Map<String, List<Person>>> collect6 = personList.stream().collect(groupingBy(Person::getAge, groupingBy(Person::getName)));

可以发现,它一般生成Map K就是分组的条件, V一般都用List接受,当然可以对List继续分组,实现多层Map。

二、方法参数解析

还是强调:先熟悉Collector的构造才可以理解。

1
2
3
4
java复制代码public static <T, K, D, A, M extends Map<K, D>>
Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream)

groupingBy方法需要传入三个参数:

  • 1、classifier,提供 T,返回 K, 这个的做主要作用是:流中元素是T,我们需要根据它得到的K 进行分组。
  • 2、mapFactory,它最终的形态是 M extends Map<K, D> ,比如示例中的Map<Integer, List>
    是最终返回的容器, 这个mapFactory 是要替换 Collector中的 Supplier<A>
  • 3、downstream,它是Collector的实现类,T是流中元素,和classifier中的T 对应,A 就是supplier提供的容器, D是finisher 返回的结果容器。

三、代码详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public static <T, K, D, A, M extends Map<K, D>>
Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream) {
->1.获取collector的容器A
Supplier<A> downstreamSupplier = downstream.supplier();
->2.获取collector的累加器accumulator
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
->3.制造一个新的累加器,这个lambda表达式,最终需要调用它的accept(Map m,T t)
BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
->3.1 规约的开始,将T转换为K 将来作为map的键
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
->3.2 Map的default方法,key不存在,则生成一个V,其为容器A
A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
->3.3 每次的accumulate只是将 map中key对应的V拿出来,增加t
downstreamAccumulator.accept(container, t);
};
->4.融合,主要是用于并行流,创建了多个容器的时候,需要对多个map进行合并,所以并行流并不一定块
BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
@SuppressWarnings("unchecked")
->将mapFactory强转为 中间结果容器,一定会成功的,我们只是在finisher时,改为<K,D>
Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;

->5.判断IDENTITY_FINISH,如果A==D 那么就没有执行finisher的意义
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
->这个地方请仔细体会:我们构造了完全Map<K,A>形式的 supplier、accmulator、combiner
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
}
else {
//6.否则 执行D的强转
@SuppressWarnings("unchecked")
Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
Function<Map<K, A>, M> finisher = intermediate -> {
intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
@SuppressWarnings("unchecked")
M castResult = (M) intermediate;
return castResult;
};
//7、最终都是生产一个collecotr,供collect()调用
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
}
}

四:总结

1、需要体会 groupingBy的嵌套分组。
2、通过代码可以看出,如果并发收集Map,需要进行combiner,效率可能并没有串行流高,甚至当A和D不相同时,会进行一个replaceAll,效率更低,所以希望使用时,根据需要进行并行or串行, A 和 D的类型尽量可以统一,是否使用IDENTITY_FINISH 要心中有数。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

把Redis当作队列来用,真的合适吗? 从最简单的开始:Li

发表于 2021-05-15

微信搜索关注「水滴与银弹」公众号,第一时间获取优质技术干货。7年资深后端研发,给你呈现不一样的技术视角。

大家好,我是 Kaito。

我经常听到很多人讨论,关于「把 Redis 当作队列来用是否合适」的问题。

有些人表示赞成,他们认为 Redis 很轻量,用作队列很方便。

也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。

究竟哪种方案更好呢?

这篇文章,我就和你聊一聊把 Redis 当作队列,究竟是否合适这个问题。

我会从简单到复杂,一步步带你梳理其中的细节,把这个问题真正的讲清楚。

看完这篇文章后,我希望你对这个问题你会有全新的认识。

在文章的最后,我还会告诉你关于「技术选型」的思路,文章有点长,希望你可以耐心读完。

从最简单的开始:List 队列

首先,我们先从最简单的场景开始讲起。

如果你的业务需求足够简单,想把 Redis 当作队列来使用,肯定最先想到的就是使用 List 这个数据类型。

因为 List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。

如果把 List 当作队列,你可以这么来用。

生产者使用 LPUSH 发布消息:

1
2
3
4
shell复制代码127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2

消费者这一侧,使用 RPOP 拉取消息:

1
2
3
4
shell复制代码127.0.0.1:6379> RPOP queue
"msg1"
127.0.0.1:6379> RPOP queue
"msg2"

这个模型非常简单,也很容易理解。

但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。

1
2
shell复制代码127.0.0.1:6379> RPOP queue
(nil) // 没消息了

而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:

1
2
3
4
5
6
7
python复制代码while true:
msg = redis.rpop("queue")
// 没有消息,继续循环
if msg == null:
continue
// 处理消息
handle(msg)

如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。

怎么解决这个问题呢?

也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样:

1
2
3
4
5
6
7
8
python复制代码while true:
msg = redis.rpop("queue")
// 没有消息,休眠2s
if msg == null:
sleep(2)
continue
// 处理消息
handle(msg)

这就解决了 CPU 空转问题。

这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。

假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。

要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。

鱼和熊掌不可兼得。

那如何做,既能及时处理新消息,还能避免 CPU 空转呢?

Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢?

幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。

现在,你可以这样来拉取消息了:

1
2
3
4
5
6
7
python复制代码while true:
// 没消息阻塞等待,0表示不设置超时时间
msg = redis.brpop("queue", 0)
if msg == null:
continue
// 处理消息
handle(msg)

使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。

这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。

注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。

解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?

我们一起来分析一下:

  1. 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
  2. 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了

第一个问题是功能上的,使用 List 做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景。

第二个问题就比较棘手了,因为从 List 中 POP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。

这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。

针对这 2 个问题怎么解决呢?我们一个个来看。

发布/订阅模型:Pub/Sub

从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。

它正好可以解决前面提到的第一个问题:重复消费。

即多组生产者、消费者的场景,我们来看它是如何做的。

Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。

假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。

首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。

1
2
3
4
5
6
shell复制代码// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1

此时,2 个消费者都会被阻塞住,等待新消息的到来。

之后,再启动一个生产者,发布一条消息。

1
2
shell复制代码127.0.0.1:6379> PUBLISH queue msg1
(integer) 1

这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。

1
2
3
4
5
shell复制代码127.0.0.1:6379> SUBSCRIBE queue
// 收到新消息
1) "message"
2) "queue"
3) "msg1"

看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。

除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。

1
2
3
4
5
6
shell复制代码// 订阅符合规则的队列
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1

这里的消费者,订阅了 queue.* 相关的队列消息。

之后,生产者分别向 queue.p1 和 queue.p2 发布消息。

1
2
3
4
shell复制代码127.0.0.1:6379> PUBLISH queue.p1 msg1
(integer) 1
127.0.0.1:6379> PUBLISH queue.p2 msg2
(integer) 1

这时再看消费者,它就可以接收到这 2 个生产者的消息了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
shell复制代码127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
...
// 来自queue.p1的消息
1) "pmessage"
2) "queue.*"
3) "queue.p1"
4) "msg1"

// 来自queue.p2的消息
1) "pmessage"
2) "queue.*"
3) "queue.p2"
4) "msg2"

我们可以看到,Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。

讲完了它的优点,那它有什么缺点呢?

其实,Pub/Sub 最大问题是:丢数据。

如果发生以下场景,就有可能导致数据丢失:

  1. 消费者下线
  2. Redis 宕机
  3. 消息堆积

究竟是怎么回事?

这其实与 Pub/Sub 的实现方式有很大关系。

Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。

一个完整的发布、订阅消息处理流程是这样的:

  1. 消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
  2. 生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它

看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。

这种设计方案,就导致了上面提到的那些问题。

例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。

如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。

所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。

这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。

另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。

也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。

最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?

当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。

如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。

但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!

这是怎么回事?

还是回到 Pub/Sub 的实现细节上来说。

每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。

当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。

之后,消费者不断地从缓冲区读取消息,处理消息。

但是,问题就出在这个缓冲区上。

因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。

如果超过了缓冲区配置的上限,此时,Redis 就会「强制」把这个消费者踢下线。

这时消费者就会消费失败,也会丢失数据。

如果你有看过 Redis 的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60。

它的参数含义如下:

  • 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
  • 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线

Pub/Sub 的这一点特点,是与 List 作队列差异比较大的。

从这里你应该可以看出,List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型。

List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。

但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。

当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,Redis 为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。

好了,现在我们总结一下 Pub/Sub 的优缺点:

  1. 支持发布 / 订阅,支持多组生产者、消费者处理消息
  2. 消费者下线,数据会丢失
  3. 不支持数据持久化,Redis 宕机,数据也会丢失
  4. 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失

有没有发现,除了第一个是优点之外,剩下的都是缺点。

所以,很多人看到 Pub/Sub 的特点后,觉得这个功能很「鸡肋」。

也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。

目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。

我们再来看一下,Pub/Sub 有没有解决,消息处理时异常宕机,无法再次消费的问题呢?

其实也不行,Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。

好,现在我们重新梳理一下,我们在使用消息队列时的需求。

当我们在使用一个消息队列时,希望它的功能如下:

  • 支持阻塞等待拉取消息
  • 支持发布 / 订阅模式
  • 消费失败,可重新消费,消息不丢失
  • 实例宕机,消息不丢失,数据可持久化
  • 消息可堆积

Redis 除了 List 和 Pub/Sub 之外,还有符合这些要求的数据类型吗?

其实,Redis 的作者也看到了以上这些问题,也一直在朝着这些方向努力着。

Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。

这个项目的定位,就是一个基于内存的分布式消息队列中间件。

但由于种种原因,这个项目一直不温不火。

终于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream。

下面我们就来看看,它能符合上面提到的这些要求吗?

趋于成熟的队列:Stream

我们来看 Stream 是如何解决上面这些问题的。

我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?

首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:

  • XADD:发布消息
  • XREAD:读取消息

生产者发布 2 条消息:

1
2
3
4
5
shell复制代码// *表示让Redis自动生成消息ID
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"

使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。

这个消息 ID 的格式是「时间戳-自增序号」。

消费者拉取消息:

1
2
3
4
5
6
7
8
9
shell复制代码// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
2) 1) 1) "1618469123380-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618469127777-0"
2) 1) "name"
2) "lisi"

如果想继续拉取消息,需要传入上一条消息的 ID:

1
2
shell复制代码127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)

没有消息,Redis 会返回 NULL。

以上就是 Stream 最简单的生产、消费。

这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是「固定」参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。

下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?

1) Stream 是否支持「阻塞式」拉取消息?

可以的,在读取消息时,只需要增加 BLOCK 参数即可。

1
2
shell复制代码// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0

这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。

2) Stream 是否支持发布 / 订阅模式?

也没问题,Stream 通过以下命令完成发布订阅:

  • XGROUP:创建消费者组
  • XREADGROUP:在指定消费组下,开启消费者拉取消息

下面我们来看具体如何做?

首先,生产者依旧发布 2 条消息:

1
2
3
4
shell复制代码127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"

之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

1
2
3
4
5
6
shell复制代码// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

第一个消费组开始消费:

1
2
3
4
5
6
7
8
9
shell复制代码// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
2) 1) 1) "1618470740565-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618470743793-0"
2) 1) "name"
2) "lisi"

同样地,第二个消费组开始消费:

1
2
3
4
5
6
7
8
9
shell复制代码// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
2) 1) 1) "1618470740565-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618470743793-0"
2) 1) "name"
2) "lisi"

我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

这样一来,就达到了多组消费者「订阅」消费的目的。

3) 消息处理时异常,Stream 能否保证消息不丢失,重新消费?

除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。

当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。

1
2
shell复制代码// group1下的 1618472043089-0 消息已处理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0

如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。

待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

1
2
3
4
5
6
7
8
9
10
shell复制代码// 消费者重新上线,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前没消费成功的数据,依旧可以重新消费
1) 1) "queue"
2) 1) 1) "1618472043089-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618472045158-0"
2) 1) "name"
2) "lisi"

4) Stream 数据会写入到 RDB 和 AOF 做持久化吗?

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

5) 消息堆积时,Stream 是怎么处理的?

其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

  1. 生产者限流:避免消费者处理不及时,导致持续积压
  2. 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案。

在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

1
2
3
shell复制代码// 队列长度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

除了以上介绍到的命令,Stream 还支持查看消息长度(XLEN)、查看消费者状态(XINFO)等命令,使用也比较简单,你可以查询官方文档了解一下,这里就不过多介绍了。

好了,通过以上介绍,我们可以看到,Redis 的 Stream 几乎覆盖到了消息队列的各种场景,是不是觉得很完美?

既然它的功能这么强大,这是不是意味着,Redis 真的可以作为专业的消息队列中间件来使用呢?

但是还「差一点」,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。

原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。

到这里,就不得不把 Redis 与专业的队列中间件做对比了。

下面我们就来看一下,Redis 在作队列时,到底还有哪些欠缺?

与专业的消息队列对比

其实,一个专业的消息队列,必须要做到两大块:

  1. 消息不丢
  2. 消息可堆积

前面我们讨论的重点,很大篇幅围绕的是第一点展开的。

这里我们换个角度,从一个消息队列的「使用模型」来分析一下,怎么做,才能保证数据不丢?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者。

消息是否会发生丢失,其重点也就在于以下 3 个环节:

  1. 生产者会不会丢消息?
  2. 消费者会不会丢消息?
  3. 队列中间件会不会丢消息?

1) 生产者会不会丢消息?

当生产者在发布消息时,可能发生以下异常情况:

  1. 消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败
  2. 不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了

如果是情况 1,消息根本没发出去,那么重新发一次就好了。

如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。

生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。

也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。

但发现没有?这也意味着消息可能会重复发送。

是的,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃。

那消费者这边,就需要多做一些逻辑了。

对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。

从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。

所以,无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。

2) 消费者会不会丢消息?

这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?

要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。

这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。

无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。

所以,从这个角度来看,Redis 也是合格的。

3) 队列中间件会不会丢消息?

前面 2 个问题都比较好处理,只要客户端和服务端配合好,就能保证生产端、消费端都不丢消息。

但是,如果队列中间件本身就不可靠呢?

毕竟生产者和消费这都依赖它,如果它不可靠,那么生产者和消费者无论怎么做,都无法保证数据不丢。

在这个方面,Redis 其实没有达到要求。

Redis 在以下 2 个场景下,都会导致数据丢失。

  1. AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
  2. 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)

基于以上原因我们可以看到,Redis 本身的无法保证严格的数据完整性。

所以,如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。

再来看那些专业的消息队列中间件是如何解决这个问题的?

像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

也正因为如此,RabbitMQ、Kafka在设计时也更复杂。毕竟,它们是专门针对队列场景设计的。

但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。

最后,我们来看消息积压怎么办?

4) 消息积压怎么办?

因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。

综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:

  1. Redis 本身可能会丢数据
  2. 面对消息积压,Redis 内存资源紧张

到这里,Redis 是否可以用作队列,我想这个答案你应该会比较清晰了。

如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。

而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量。

如果你的业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么我建议你使用专业的消息队列中间件。

总结

好了,总结一下。这篇文章我们从「Redis 能否用作队列」这个角度出发,介绍了 List、Pub/Sub、Stream 在做队列的使用方式,以及它们各自的优劣。

之后又把 Redis 和专业的消息队列中间件做对比,发现 Redis 的不足之处。

最后,我们得出 Redis 做队列的合适场景。

这里我也列了一个表格,总结了它们各自的优缺点。

后记

最后,我想和你再聊一聊关于「技术方案选型」的问题。

你应该也看到了,这篇文章虽然始于 Redis,但并不止于 Redis。

我们在分析 Redis 细节时,一直在提出问题,然后寻找更好的解决方案,在文章最后,又聊到一个专业的消息队列应该怎么做。

其实,我们在讨论技术选型时,就是一个关于如何取舍的问题。

而这里我想传达给你的信息是,在面对技术选型时,不要不经过思考就觉得哪个方案好,哪个方案不好。

你需要根据具体场景具体分析,这里我把这个分析过程分为 2 个层面:

  1. 业务功能角度
  2. 技术资源角度

这篇文章所讲到的内容,都是以业务功能角度出发做决策的。

但这里的第二点,从技术资源角度出发,其实也很重要。

技术资源的角度是说,你所处的公司环境、技术资源能否匹配这些技术方案。

这个怎么解释呢?

简单来讲,就是你所在的公司、团队,是否有匹配的资源能 hold 住这些技术方案。

我们都知道 Kafka、RabbitMQ 是非常专业的消息中间件,但它们的部署和运维,相比于 Redis 来说,也会更复杂一些。

如果你在一个大公司,公司本身就有优秀的运维团队,那么使用这些中间件肯定没问题,因为有足够优秀的人能 hold 住这些中间件,公司也会投入人力和时间在这个方向上。

但如果你是在一个初创公司,业务正处在快速发展期,暂时没有能 hold 住这些中间件的团队和人,如果贸然使用这些组件,当发生故障时,排查问题也会变得很困难,甚至会阻碍业务的发展。

而这种情形下,如果公司的技术人员对于 Redis 都很熟,综合评估来看,Redis 也基本可以满足业务 90% 的需求,那当下选择 Redis 未必不是一个好的决策。

所以,做技术选型不只是技术问题,还与人、团队、管理、组织结构有关。

也正是因为这些原因,当你在和别人讨论技术选型问题时,你会发现每个公司的做法都不相同。

毕竟每个公司所处的环境和文化不一样,做出的决策当然就会各有差异。

如果你不了解这其中的逻辑,那在做技术选型时,只会趋于表面现象,无法深入到问题根源。

而一旦你理解了这个逻辑,那么你在看待这个问题时,不仅对于技术会有更加深刻认识,对技术资源和人的把握,也会更加清晰。

希望你以后在做技术选型时,能够把这些因素也考虑在内,这对你的技术成长之路也是非常有帮助的。

qr_search.png

想看更多硬核技术文章?欢迎关注我的公众号「水滴与银弹」。

我是 Kaito,是一个对于技术有思考的资深后端程序员,在我的文章中,我不仅会告诉你一个技术点是什么,还会告诉你为什么这么做?我还会尝试把这些思考过程,提炼成通用的方法论,让你可以应用在其它领域中,做到举一反三。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…668669670…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%