开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

记一次feign接口耗时调优过程

发表于 2021-08-15

image.png

背景

    说一下我们的技术架构,我们目前是使用 SpringCloud Alibaba 版本,注册\配置中心nacos,接口间调用OpenFeign,网关用的gateway。

起因

    事情是这样的,我们这个项目之前是一个大的单体项目,最近一直在进行微服务拆分,拆出去的模块在单体项目中就用feign调用,这天业务人员在群里抱怨有一个页面很慢,之前也出现过这种情况,是公司网络的问题,因为在网关里看返回时间很短,100ms内,但是从服务器到公司之间很慢,打开网关的日志,准备甩锅给运维,但是这次情况好像不一样,定睛一看,网关里接口返回时间是12秒!!没看错是12秒!12000ms,这下炸锅了,赶紧开始找原因。
打开项目,看到源码,这是一个后台管理页面的一个请求列表数据,确实是有很多逻辑处理,每次返回10个数据,其中每次都要调用3个微服务,之前还是单体的时候,因为是本地方法调用,返回很快,整体接口返回在300ms内,这时候考虑feign调用网络耗时的问题,但是因为是内网调用,就算是http请求也应该很快才对啊。

image.png

查找原因

    这时候分别在3个调用feign接口的前后打印耗时时间,并且看调用的微服务的日志,我们每个微服务都会有一个fitter进行请求耗时统计,看到单体项目里每个feign接口调用耗时大概20-30ms,返回10个数据 每个都是90ms,加上其他的业务处理,怎么也得900ms以上,多调用几次后,发现有的微服务接口返回时间突然到了150ms+,这要是10个数据那不就慢的姥姥家去了,但是还好基本上慢feign调用出现的不是很多,这时候去微服务端查看,发现大部分时间很快1ms内就返回了,偶尔几个是100ms左右,左思右想,这微服务拆分出去后是多了网络消耗,但是不至于这么慢吧,本地远在执行1ms,加上网络消耗后最快也20ms,最慢100多ms,这里面肯定有问题。

  • [ 第一个锅是虚拟机的,主要是我们没使用对 ]
        先看看微服务端,因为执行的方法都一样,为什么有的1ms,有的却很耗时呢?看了看方法,并且没有锁,就是读mysql数据,后面再重复基本都是redis缓存了,那就只有1种可能垃圾回收!连上服务器,jstat -gc pid 1s 每秒打印一下,这一打印不得了,短短上线1周左右,FGC达到了48次,Ygc那就不用说了,首先我考虑是代码问题,会不会有内存泄漏?这个方法简单的令人发指,不应该有内存泄露啊? 看看堆的使用情况 jmap -heap pid ,好嘛,真相大白了,这堆大小只有300M+?要知道我们这台服务器可是4核8G的呀,原来是我们启动参数并没有加上初始化的堆大小,JVM默认初始化堆大小是物理内存的64/1,最大是4/1,也就是说我们的jvm已启动 堆大小是127,最大可以扩容到2G,每次满了就会触发FGC以及扩容,在测试环境重启了下项目,发现一启动就来了3次FGC,好嘛,赶紧加上启动参数,
1
2
ini复制代码-Xms4096m,-Xmx4096m,-XX:MetaspaceSize=256m,-XX:MaxMetaspaceSize=256m
固定了堆初始就4G,最大也4G不扩容,元空间规定256M

    再启动项目,调用看看,基本保持900ms左右,每个feign接口20-30ms,基本没有出现100+ms的feign接口了,GC情况也没有了。

  • [ 难道是http连接池链接不够用了?]
        但是就算20ms也不太对劲,明明我微服务里显示1ms就执行完毕返回了,网络消耗不能这么多吧?我们的feign是配置了httpClient客户端的,基于连接池,难道是连接数太少?看了一下,httpClient默认是200个连接,明明挺多啊,还是调大一些试试看,调到400,发版!不对劲,没有改善还是20-30ms。
  • [ 会不会有什么多余的配置? ]
        难道是我feign配置了编码解码的东西吗?正常来说不需要呀,看看代码配置,发现了如下代码:

image.png

果然是配置了编解码器,看着好像没啥用,删了试试,发版!

  • [完美!]
    这下天空都晴朗了,单体项目在调用的feign的时候基本也是1-2ms,微服务耗时也是1-2ms,整体接口也缩小至100ms内返回了。

总结

  1. JVM堆大小需要定好,不然初始默认物理内存的64/1,容易触发GC,慢慢扩容也很慢,影响系统性能,feign在调用的时候如果对方系统在GC,相应的feign接口也会等待返回100ms虽然不多,但是如果请求多次那受不了。
  2. feign默认走的JDK的http客户端,每次请求都新建链接,需要改成httpClient、或者okHttp3基于连接池的http客户端。
  3. 如果没有什么其他的业务需要,不要配置feign的编解码器,非常的影响性能。
        到此一个接口从12s慢慢优化到100ms内,feign接口从偶尔100ms,平时20-30ms优化到1-2ms,和本地方法执行耗时没有区别。

我是卢囧囧,欢迎大家关注我的公众号,每周不定时分享技术文章,和我一起慢慢成长吧!

qrcode_for_gh_1053c9a91bbb_430.jpg

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

《高性能利器》-32张图带你解决RocketMQ所有场景问题

发表于 2021-08-15

硬核干货分享,欢迎关注【Java补习课】成长的路上,我们一起前行 !

《高可用系列文章》 已收录在专栏,欢迎关注!

一、RocketMQ的基本原理

RocketMQ基本架构图如下

image.png

从这个架构图上我们可以知道,RocketMQ有4块核心部分:

  • NameServer:管理Broker的信息,让使用MQ的系统感知到集群里面的broker
  • Broker:主从架构实现数据多副本存储和高可用
  • producer:生产者
  • consumer:消费者

二、NameServer

2.1 Broker信息注册到哪个NameServer?

每台broker机器需要向所有的NameServer机器上注册自己的信息,防止单台NameServer挂掉导致Broker信息不全,保证NameServer的集群高可用。

2.2 Broker信息怎么注册?

基于Netty的网络通信。

2.3 Broker挂了如何感知?

  • NameServer感知:30s心跳机制和120s故障感知机制

image.png

broker会每隔30秒向NameServer发送一个的心跳 ,NameServer收到一个心跳会更新对应broker的最近一次心跳事件,然后NamServer会每隔十秒运行一个任务,去检查一下各个broker的最近一次心跳的时间,如果超过120s没有收到相应broker的心跳,则判定对应的broker已经挂掉。

三、Broker

3.1 Master-Slave模式

为了保证MQ的数据不丢失而且具备一定的高可用性,我们采用的是主从复制模式。

RocketMQ自身的Master-Slave模式主采取的是Slave主动从Master拉取消息。

3.2 究竟是如何从Master-Slave中进行读写呢?

image.png

  • 生产者在写入消息时,一般写入到Master
  • 消费者在拉取消息时,可能从Master拉取,也可能从Slave拉取,根据Master的负载情况和Slave的同步情况, 由Master给出建议
+ Master负载过高,建议下次从Slave获取消息
+ Slave未同步完全,建议下次从Master获取消息

3.3 Broker宕机分析

3.3.1 Slave宕机

对系统会存在一点影响,但是影响不大,只不过少了Slave Broker,会导致所有的读写压力都集中在Master Broker上

3.3.2 Master宕机:基于Dledger实现RocketMQ高可用自动切换

选举方式这里不做重点介绍。

image.png

四、生产者

4.1 MessageQueue是什么?

我们先看看Topic、Broker、Message之间的关系。

如图比如说一个TopicA有n条消息,然后一个TopicA中的n条数据分配放入给4个MessageQueue1-4。

image.png

所以本质上来说就是一个数据分片机制,通过MessageQueue将一个Topic的数据拆分为很多数据分片,在每个Broker机器上都存储一些MessageQueue。通过这个方法可以实现分布式存储。

4.2 生产者发送消息写入哪个MessageQueue?

image.png

因为从前面我们知道,生产者会跟NameServer通信获取相应Topic的路由数据,从而知道,一个Topic有几个MessageQueue,哪些MessageQueue在哪台Broker机器上,通过对应的规则写入对应的MessageQueue。

4.2.1 Master Broker故障分析

当MasterBroker宕机,此时SlaveBroker正在切换过程中,有一组Broker就没有Master可以写入。

此时我们可以打开Producer的自动容错机制开关:sendLatencyFaultEnable,比如说访问其中一个Broker发现网络延迟有1000ms还无法访问,我们会自动回避这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker。

过一段时间之后,MasterBroker修复好了,或者说SlaveBroker选举成功了,就可以提供给别人访问了。

image.png

4.3 Broker数据存储(核心环节)

Broker数据存储实际上是MQ最核心的环节:

  • 消息吞吐量
  • 消息不丢失

4.3.1 磁盘日志文件CommitLog

image.png

首先,Producer发送消息给Broker,Broker接收到消息后,把这个消息直接顺序写入写入到磁盘上的一个日志文件,叫做CommitLog。

  • CommitLog是由很多磁盘文件组成
  • 每个文件限定最多1GB

4.3.2 ConsumeQueue存储对应消息的偏移量

在Broker中,每一个Topic下的每一个MessageQueue都会有对应一系列的ConsumeQueue文件。

Broker磁盘存储类似于文件树的形式存在:

image.png

ConsumeQueue中存储着对应MessageQueue中的消息在CommitLog中的物理偏移量地址offset。

image.png

如图:

  1. Broker接受消息,顺序写入消息到CommitLog中
  2. 同时找到对应的TopicA/MessageQueue1/ConsumeQueue0写入对应的物理地址
  3. TopicA/MessageQueue1/ConsumeQueue0的物理地址,即为CommitLog文件中一个消息的引用

即:Topic的每个MessageQueue都对应了Broker机器上的多个ConsumeQueue文件,这些ConsumeQueue共同组成保存了MessageQueue的所有消息在CommitLog文件中的物理offset偏移量。

4.3.3 Broker写入磁盘CommitLog怎么近乎内存写性能?

磁盘文件顺序写+OS PageCache写入+OS异步刷盘的策略

image.png

如图:

  1. 数据写入CommitLog时候,不是直接写入磁盘,而是写入OS的PageCache内存缓冲中
  2. 后台开启线程,异步刷盘到CommitLog中

这样的话基本上可以让消息写入CommitLog的性能跟直接写入内存里面是差不多的,所以Broker才能具有高吞吐量。

4.3.4 异步刷盘和同步刷盘

  • 异步刷盘:高吞吐写入+丢失数据风险
  • 同步刷盘:吞吐量下降+数据不丢失

对于日志类型这种场景,可以允许数据的丢失,但是要求比较高的吞吐量,可以采用异步刷盘的方式。另外非核心的业务场景,不涉及重要核心数据变更的场景,也可以使用异步刷盘,比如订单支付成功,发送短信这种场景。但是对于涉及到核心的数据变更的场景,就需要使用同步刷盘,比如订单支付成功后扣减库存。

五、消费者

5.1 一个Topic上多个MessageQueue怎么被消费?

image.png

原则:一个Consumer机器可以消费处理多个MessageQueue,一个MessageQueue只能被一个相同ConsumerGroup中的同一个Consumer消费。

5.2 Broker收到消息拉取请求,返回给消费者处理提交消费进度

Broker收到消息拉取请求后,会找到对应的MessageQueue中开始消费的位置,在ConsumeQueue读取里面对应位置的的消息在CommitLog中的offset

image.png

如图:

  1. consumer找到要消费的MessageQueue对应的ConsumeQueue对应要消费的位置
  2. 消费完成之后消费者返回一个消费状态,broker会存储我们的消费位置
  3. 接下来可以根据这个消费位置进行下一步消费,不需要从头拉取

5.3 消费者消费消息的性能问题

生产者是基于os cache提升写性能的,broker收到一条消息,会写入CommitLog文件,但是会先把CommitLog文件中的数据写入os cache(操作系统管理的缓存中),然后os开启后台线程,异步的将os cache缓存中的CommitLog文件的数据刷入磁盘。

在消费者消费信息的时候:

第一步,我们会去读取ConsumeQueue中的offset偏移量,此时大量的读取压力全部都在ConsumeQueue,ConsumeQueue文件的读性能是很大程度上会影响消息拉取的性能和吞吐量。

所以,Broker对ConsumeQueue文件也是基于os cache来进行优化的。

image.png

实际上,ConsumeQueue主要只是存放消息的offset,所以每个文件很小,占不了多少磁盘空间,完全可以被os缓存在内存里。所以几乎可以说消息的读取性能达到内存级别。

第二步,根据读取到的offset去CommitLog里读取消息的完整数据。此时会有两种可能

  • 第一种:如果读取的是刚刚写入到CommitLog的数据,那么大概率他们还停留在os cache中,此时可以顺利的直接从os cache中读取CommitLog中的数据,这个就是直接读取内存,性能很高。
  • 第二种:读取较早之前的CommitLog的数据,已经被刷入磁盘不在os cache里面了,此时只能从磁盘上的文件读取了,这个性能稍微差一点。

这两种状态很好区分,比如说消费者一直在快速的拉取和消费处理,跟上了broker的消息写入速率,这么来说os cache中每次CommitLog的消息还没来得及被刷入磁盘中的时候就被消费者消费了;但是比如说broker负载很高,拉取消息的性能很低,跟不上生产者的速率,那么数据会保存在磁盘中进行读取。

5.4 Master Broker什么时候通知你去Slave Broker读取?

根据以上,我们可以判断了什么时候Master Broker负载会高,也就是当消费者读取消息的时候,要从磁盘中加载大量的数据出来,此时Master Broker就会知道本次的负载会比较高,通知消费者下次从Slave Broker去拉取数据。

本质上就是对比当前没有拉取消息的数量和大小,以及最多可以存放在os cache内存里的消****息的大小,如果没有拉取的消息超过了最大能使用的内存的量,那么之后会频繁的从磁盘加载数据,此时就让你从slave broker去加载数据了!

六、问题分析

举一个简单的例子作为分析的入口,将从各个环节可能发生的问题进行深入分析,如图:

image.png

  1. 用户进行一笔生活缴费
  2. 订单系统推送缴费订单支付消息到RocketMQ
  3. 红包系统接受订单消息
  4. 发红包给用户

6.1 消息发送失败

消息发送失败的原因多种多样,存在于多个环节,我们一一分析。

6.1.1 系统推送消息丢失

image.png

第一个环节就是,订单系统推送消息到MQ的过程中,由于网络等因素导致消息丢失。

6.1.2 RocketMQ的事务消息原理分析

为了解决系统推送消息丢失问题,RocketMQ有一个非常强悍的功能就是事务消息,能够确保我们消息一定会成功写入MQ里面,不会半路搞丢。

如图是在本系统中的一个基本事务消息的流程图。

image.png

  1. 订单系统先发送half消息到MQ中,试探MQ是否正常

如果此阶段,half消息发送给MQ失败,会执行一系列回滚操作,关闭这个订单的状态,因为后续的消息都操作不了

  1. 当half消息成功被RocketMQ接收时
1. 返回half消息的成功响应,进入第3步
2. 返回的响应未收到,但是此时MQ已经存储下来了一条half消息,进入第5步
  1. 得知half消息发送成功之后,订单系统可以更新数据库,此时会有两种情况对应两种不同的提交
1. 更新数据库等操作一切顺利,向RocketMQ发送一个commit请求
2. 由于网络异常或者数据库挂了等,为了执行数据库更新等操作,更新不了订单状态,发送rollback请求
3. 发送rollback或者commit失败,跳转到第5步
  1. RocketMQ收到commit或者rollback请求
1. 收到rollback请求删除half消息
2. 收到commit请求改变half消息状态为已提交,红包系统可以开始消费消息
  1. 未收到commit和rollback请求的消息,RocketMQ会有补偿机制,回调接口去判断订单的状态是已关闭,则发送rollback进行回滚。

6.1.3 RocketMQ的事务消息底层分析

image.png

如图解释如下:

  • 消费系统对half消息不可见的原因: 我们知道,消费者是是通过ConsumeQueue获取到对应的CommitLog里面的消息,如图,消费系统对half消息不可见的原因是因为half消息在未提交的时候,MQ维护了一个内部的TRANS_HALF_TOPIC,此时红包系统只获取TopicA中的MessageQueue中ConsumeQueue。
  • 返回half消息成功的响应时机: 当half消息写入成功到TRANS_HALF_TOPIC中的ConsumeQueue的时候,就回认为写入消息成功,返回给对应的订单系统成功响应。
  • 补偿机制: RocketMQ会启动一个定时任务,定时扫描half消息状态,如果还是为half消息,则回调订单系统接口,判断状态。
  • 如何标记消息回滚或提交: 消息回滚并不是直接删除,而是内部维护了一个OP_TOPIC,用一个OP操作来标记half消息的状态。
  • 执行commit操作后消费系统可见: 执行commit操作之后,OP操作会标记half为commit状态,并且把对应消息在TRANS_HALF_TOPIC中的消息offset写入到TOPICA中,此时消息可见

6.1.4 思考:一定要用事务消息吗?

上面这么复杂的事务消息机制可能导致整体的性能比较差,而且吞吐量会比较低,我们一定要用事务消息吗?

可以基于同步发送消息+反复多次重试的方案

6.1.5 消息成功发送到MQ中了,就一定不会丢了吗?

我们可以分析的到,事务消息能够保证我们的消息从生产者成功发送到broker中对应的消费者需要消费的Topic中,我们认为他的消息推送成功。

问题一:

但是这个消息推送仅仅先是推送到os cache缓存中,仅仅只是可以被消费系统看到,由于消息积压等原因,还没来得及去获取这条消息,还没来得及刷到ConsumeQueue的磁盘文件中去,此时万一机器突然宕机,os cache中的数据全部丢失,此时消息必然丢失,消费系统无法读到这条消息。

如图示意:image.png

解决:

为了解决这个问题,一定要确保消息零丢失的话,我们的解决办法就是将异步刷盘调整为同步刷盘。

放弃了异步刷盘的高吞吐量,确保消息数据的零丢失,也就是说只要MQ返回响应half消息发送成功了,此时消息就已经进入了磁盘文件了。

问题二:

就算os cache的消息写入ConsumeQueue的磁盘文件了,红包没来得及消费这条消息的时候,磁盘突然就坏了,一样会导致消息丢失。

image.png

所以说,无论是通过同步发送消息+反复多次重试的方案,还是事务消息的方案,哪怕保证写入MQ成功了,消息未必不会丢失。

解决:

对Broker使用主从架构的模式,每一个MasterBroker至少有一个SlaveBroker去同步他的数据,而且一条消息写入成功,必须让SlaveBroker也写入成功,保证数据有多个副本的冗余。

6.1.6 红包系统拿到了消息就一定会消费消息吗?

不一定。

问题分析:

因为当红包系统拿到消息数据进内存里时,此时还没有执行发红包的逻辑,然后此时红包系统就已经提交了这条消息的offset到broker中告诉broker已经消费掉了这条消息,消息位置会往后移。然后此时红包系统宕机,这条消息就会丢失,永远执行不了发红包的逻辑。

RocketMQ解决方案: 利用消息监听器同步处理消息

image.png

在RocketMQ的Consumer的默认消费模式下,我们在消息监听器中接收到一批消息之后,会执行处理消息的逻辑,处理完成之后才会返回SUCCESS状态提交offset到broker中,如果处理时宕机,不会返回SUCCESS状态给broker,broker会继续将这个消息给下一个Consumer消费。

6.2 消息发送全链路零丢失方案总结

6.2.1 发送消息到MQ的零丢失

  • 同步发送消息+反复多次尝试
  • 事务消息机制

6.2.2 MQ收到消息之后的零丢失

  • 同步刷盘策略:解决os cache未能刷入磁盘问题
  • 主从架构同步机制:解决单个broker磁盘文件损坏问题

6.2.3 消费消息的零丢失

  • 采用同步处理消息方式

6.2.4 适用场景

首先,消息零丢失方案会必然的导致从头到尾的性能下降和MQ的吞吐量下降

一般和金钱、交易以及核心数据相关的系统和核心链路,可以用这套零消息丢失方案:比如支付系统、订单系统等。

6.3 消息发送重复

重复发红包等问题

6.3.1 发送方重复发送

如图:

image.png

  • 用户支付缴费订单时候,会通知订单系统发送订单支付消息
  • 此时订单系统响应超时
  • 支付系统再次重试调用订单接口通知发送消息
  • 两个订单都成功,推送两条相同的消息到MQ
  • 红包系统收到两条消息发送两个红包

有类似很多这种消息重试,接口重试的情况都会有消息重复发送的可能性,还比如说当你发送消息成功到MQ,MQ返回的SUCCESS的响应由于网络原因未收到,重试机制会再次发送消息,导致消息重复。

解决方案:幂等性机制

  • 业务判断法:RocketMQ支持消息查询功能

image.png

  1. 由于订单系统调用超时,重试调用接口
  2. 当订单系统发消息之前,发送请求到MQ查询是否存在这条消息
  3. 如果MQ已经存在,则不重复发送
  • Redis缓存:

image.png

Redis缓存思想也比较简单,只需要根据对应的订单信息去缓存里面查询一下是否已经发送给MQ了。

但是这种解决方案也不是绝对的安全,因为你消息发送成功给MQ了还没来得及写Redis系统就挂了,之后也会被重复调用。

总结以上两种解决方案,我们不建议在消息的发送环节保证消息的不重复发送,会影响接口性能。

6.3.2 消费方重复消费

image.png

  • 消费方消费消息,执行完了发红包逻辑后,应该返回SUCCESS状态,提交消费进度
  • 但是刚发完红包,没来得及提交offset消费进度,红包系统重启
  • MQ没收到offset消费进度返回,将这个消息继续发送到消费系统进行消费
  • 二次发送红包。

解决方案:

  • 依据在生产者方设置消息的messageKey,然后每一条消息在消费方依据这个唯一的messageKey,进行幂等判断:
  • 业务判断,判断这个业务的环节是否执行成功,如果没有成功则消费,成功则舍弃消息
  • 维护一个消息表,当新的消息到达的时候,根据新消息的id在该表中查询是否已经存在该id,如果存在则表明消息已经被消费过,那么丢弃该消息不再进行业务操作即可
  • 若是消息,然后执行insert数据库方法,可以建立一个唯一主键,插入会保证不会重复

6.4 死信队列

通过以上的学习,我已经基本解决了MQ消息不丢失以及不会重复处理消息的问题,在正常流程下基本上没有什么问题。但是会出现以下问题。

我们一直都是假设的一个场景就是红包系统的MessageListener监听回调函数接收到消息都能顺利的处理好消息逻辑,发送红包,落库等操作,返回SUCCESS,提交offset到broker中去,然后从broker中获取下一批消息来处理。

如图:

image.png

问题:

但是如果在我们MessageListener处理消息逻辑时候,红包数据库宕机了,没办法完成发红包的逻辑,此时出现对消息处理的异常,我们应该怎么处理呢?

解决方案:

在MessageListener中,除了返回SUCCESS状态,我们还可以返回RECONSUME_LATER状态,也就是用try-cache包裹住我们的业务代码,成功则返回SUCCESS状态,顺利进行后续操作,如果出现异常则返回RECONSUME_LATER状态。

当RocketMQ收到我们返回的RECONSUME_LATER状态之后,会将这批消息放到对应消费组的重试队列中。

image.png

重试队列里面的消息会再次发给消费组,默认最多重试16次,如果重试16次失败则进入死信队列。

死信队列:

对于死信队列,一般我们可以专门开一个后台线程,订阅这个死信队列,对死信队列中的消息,一直不停的尝试。

6.5 消息乱序

6.5.1 业务场景

大数据团队要获取到订单系统的binlog,然后保存一份在自己的大数据存储系统中

数据库binlog:记录数据库的增删改查操作。

大数据团队不能直接跑复杂的大SQL在订单系统的数据库中跑出来一些数据报表,这样会严重影响订单系统的性能,为了优化方案,我们采用类似基于Canal这样的中间件去监听订单数据的binlog,然后把这个binlog发到MQ中去,然后我们的大数据系统自己用MQ里获取binlog,自己在自己的大数据存储中执行增删改查操作,得到我们需要的报表,如图下:

image.png

6.5.2 乱序问题原理分析

image.png

  • Canal监听到binlog日志中,操作数据库的顺序为先执行insert插入操作,后update更新操作。
  • 因为我们消息可能会发送到不同MessageQueue中的不同的ConsumeQueue中去
  • 然后同一个消费组的大数据消费系统获取到insert binlog和update binlog,这两个是并行操作的,所以不能确定哪个消息先获取到执行,可能会出现消息乱序。

6.5.3 消息乱序解决方案

出现上面问题的原因,根本问题就是一个订单binlog分别进入了两个MessageQueue中,解决这个问题的方法其实非常简单,就是得想办法让同一个订单的binlog进入到一个MessageQueue里面去。

方法很简单:我们可以根据订单id对MessageQueue的数量取模来对应每个订单究竟去哪个MessageQueue。

消息乱序解决方案不能和重试队列混用。

6.6 延迟消息

6.6.1 业务场景

大量订单点击提交未支付,超过30min需要自动退款,我们研究需要定时退款扫描问题。

如图:

image.png

当一个订单下单之后,没有支付会进入订单数据库保存,如果30分钟内没有支付,就必须订单系统自动关闭这个订单。

可能我们就需要有一个后台的线程,不停的去扫描订单数据库里所有的未支付状态的订单,超过30分钟了就必须把订单状态更新为关闭。这里会有一个问题,订单系统的后台线程必须不停的扫描各种未支付的订单,可能每个未支付的订单在30分钟之内会被扫描很多遍。这个扫描订单的服务是一个麻烦的问题。

针对这种场景,RocketMQ的延迟消息就登场了。

如图:

image.png

  • 创建一个订单,发送一条延迟消息到MQ中去
  • 需要等待30分钟之后,才能被订单扫描服务消费
  • 当订单扫描服务在30分钟后消费了一条消息,就针对这条消息查询订单数据库
  • 看过了30分钟了,他的支付状态如果是未支付,则关闭,这样只会被扫描到一次

所以RocketMQ的延迟消息,是非常常用并且非常有用的一个功能

6.7 经验总结

6.7.1 运用tags过滤数据

在一些真正的生产项目中,我们需要合理的规划Topic和里面的tags,一个Topic代表了某一类的业务消息类型数据,我们可以通过里面的tags来对同一个topic的一些消息进行过滤。

6.7.2 基于消息key来定位消息是否丢失

我们在消息零丢失方案中,万一消息真的丢失了,我们怎么去排查呢?在RocketMQ中我们可以给消息设置对应的Key值,比如设置一个订单ID:message.setKeys(orderId),这样这个消息就和一个key绑定起来,当这个消息发送到broker中去,会根据对应message的数量构建hash索引,存放在IndexFile索引文件中,我们可以通过MQ提供的命令去查询。

6.7.3 消息零丢失方案的补充

在我们这种大型的金融级的系统,或者跟钱有关的支付系统等等,需要有超高级别的高可用保障机制,所以对于零消息丢失解决方案来说,万一一整个MQ集群彻底崩溃了,我们需要有更完善的措施来保证我们消息不会丢失。

此时生产者发送不了消息到MQ,所以我们生产者就应该把消息在本地进行持久化,可以是存在本地磁盘,也可以是在数据库里去存起来,MQ恢复之后,再把持久化的消息投递到MQ中去。

6.7.4 提高消费者的吞吐量

最简单的方法去提高消费者的吞吐量,就是提高消费者的并行度,比如说部署更多的Consumer机器去消费消息。但是我们需要明确的一点就是对应的MessageQueue也要增加,因为一个MessageQueue只能被一个Consumer机器消费。

第二个办法是我们可以增加Consumer的线程数量,消费线程乐队,消费速度越快。

第三个办法是我们可以开启消费者的批量消费功能(有对应的参数设置)。

6.7.5 要不要消费历史记录

Consumer是支持设置在哪里开始消费的,常见的有两种:从Topic的第一条数据消费(CONSUME_FROM_LAST_OFFSET),或者是从最后一次消费过的消息之后开始消费(CONSUME_FROM_FIRTST_OFFSET),我们一般都是设置选择后者。

七、百万消息积压问题

7.1 业务场景

如图所示:在一个系统中,由生产者系统和消费者系统两个环节组成,生产者不断的向MQ里面写入消息,消费者不断的从MQ中消费消息。突然有一天消费者依赖的一些数据库挂了,消费者就处理不了当下的业务逻辑,消息也不能正常的被消费,此时生产者还在正常的向MQ中写入消息,结果在高峰期内,就往MQ中写入了百万条消息,都积压在了MQ里面了。

image.png

7.2 解决方案

第一, 最简单粗暴的方法,如果我们的消息能够容忍百万消息的丢失,那么我们可以直接修改消费者系统的代码,丢弃所有的消息,那么百万消息很快就被处理完了,但是往往对于绝大多数系统而言,我们不能使用这种办法。

第二, 我们需要等待消费者依赖的数据库恢复之后,根据线上的Topic的MEssageQueue来判断后续如何处理。

MessageQueue数量多:

  • 比如说我现在一个Topic中有20个MessageQueue,有4个消费者系统在消费,那么我每个消费者就会从5个MessageQueue中获取消息进行消费,毕竟积压了百万消息,仅仅依赖4个消费者是远远不够的。
  • 所以我们可以临时申请16台机器多部署16个消费者,这样20个消费者去同时消费20个MessageQueue,速度提高了5倍,积压的百万消息很快就能处理完毕。
  • 但是此时我们要考虑的是,增加了5倍的消费能力,那么数据库的压力就增加了5倍,这个是我们需要考虑的

如图:

image.png

MessageQueue数量少:

  • 比如说一个Topic总共就只有4个MessageQueue,然后就只有4个消费者系统,这时候没办法扩容消费系统
  • 所以此时我们可以临时修改那4个消费者系统的代码,让他们获取的消息不写入数据库,而是写入一个新的topic
  • 新的Topic有新增的20个MessageQUeue,部署20台临时增加的消费者系统去消费新的Topic中的Message。

如图:

image.png

八、MQ集群数据迁移问题:双读+双写

要做MQ的集群迁移,不是简单的粗暴的把Producer更新停机,新的代码重新上线发到新的MQ中去。

一般来说,我们需要做到两件事情:

  • 双写: 要迁移的时候,我们需要在所有的Producer系统中,要引入一个双写的代码,让他同时往新老两个MQ中写入消息,多写几天,起码要持续一个星期,我们会发现这两个MQ的数据几乎一模一样了,但是双写肯定是不够的的,我们还要同时进行双读。
  • 双读: 也就是说我在双写的时候,所有的Consumer系统都需要同时从新老两个MQ里面获取消息,分别都用一模一样的逻辑处理,只不过从老MQ中还是去走核心逻辑处理,可以落库存储之类的操作,但是新的MQ我们可以用一样的逻辑处理,但是不能把处理的结果具体落库,我们可以写入一个临时的存储中。
  • 观察: 双写和双读一段时间之后,我们通过消费端对比,发现处理消息数量一致。
  • 切换: 正式实施切换,停机Producer系统,再重新修改上线,全部修改为新MQ,此时他数据并不会丢失,因为之前已经双写一段时间了,然后Consumer系统代码修改上线。

点关注,不迷路

好了各位,以上就是这篇文章的全部内容了,我后面会每周都更新几篇高质量的大厂面试和常用技术栈相关的文章。感谢大伙能看到这里,如果这个文章写得还不错, 求三连!!! 感谢各位的支持和认可,我们下篇文章见!

我是 九灵 ,有需要交流的童鞋可以关注公众号:Java 补习课! 如果本篇博客有任何错误,请批评指教,不胜感激 !

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文入门zookeeper(下)

发表于 2021-08-15

这是我参与8月更文挑战的第15天,活动详情查看:8月更文挑战

一、集群安装

这里,提前准备好了三台机器,名为 hadoop101,Hadoop102、hadoop103.

我们需要在三台机器上都安装上 zookeeper。

1、将安装包上传到 linux 并解压,然后重命名为zookeeper

1
arduino复制代码 tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/

2、配置服务器的编号。

三台服务器的编号需要自己新建一个myid 文件,同时在 zoo.cfg 目录中指向这个文件所在的文件夹。

我们在 zookeeper 目录下创建一个 zkData 文件夹,然后里面新增myid 文件,写上编号1

1
2
bash复制代码 mkdir zkData
 touch myid

在文件中添加与 server 对应的编号:

1
复制代码 1

其他两台分别编上 2 和 3 。

3、配置 zoo.cfg 文件 将 conf/zoo_sample.cfg 重命名为 zoo.cfg

1
bash复制代码 vim conf/zoo.cfg

修改dataDir

1
ini复制代码 dataDir=/opt/sofeware/zookeeper/zkData

在文件末尾添加集群配置:

1
2
3
4
ini复制代码 #######################cluster##########################
 server.1=hadoop101:2888:3888
 server.2=hadoop102:2888:3888
 server.3=hadoop103:2888:3888

这里的配置参数需要讲解一下:

1
ini复制代码 server.A=B:C:D。

A 是一个数字,表示这个是第几号服务器;

集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。

B 是这个服务器的地址;

C 是这个服务器Follower与集群中的Leader服务器交换信息的端口;

D 是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

4、集群操作

可以直接启动 bin/zkServer.sh start 。但是需要每台机器上都配置下

集群下面为了方便写了个群起脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
bash复制代码 #!/bin/bash
 for i in hadoop101 hadoop102 hadoop103
 do
    echo "===== $i zookeeper======"
    case $1 in
    "start")
        echo "==================== START $i ZOOKEEPER ==================== "
        ssh $i /opt/sofeware/zookeeper/bin/zkServer.sh start
    ;;
    "status")
        echo "==================== STATUS $i ZOOKEEPER ==================== "
        ssh $i /opt/sofeware/zookeeper/bin/zkServer.sh status
    ;;
    "stop")
        echo "==================== STOP $i ZOOKEEPER ==================== "
        ssh $i /opt/sofeware/zookeeper/bin/zkServer.sh stop
    ;;
    *)
    echo "Input error"
    exit
    ;;
    esac
 done

直接群起:

1
2
csharp复制代码 [root@hadoop101 bin]# zkServer.sh start
 ​

查看状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ini复制代码 [root@hadoop101 bin]# zkServer.sh status
 ===== hadoop101 zookeeper======
 ==================== STATUS hadoop101 ZOOKEEPER ====================
 /usr/bin/java
 ZooKeeper JMX enabled by default
 Using config: /opt/sofeware/zookeeper/bin/../conf/zoo.cfg
 Client port found: 2181. Client address: localhost.
 Mode: follower
 ===== hadoop102 zookeeper======
 ==================== STATUS hadoop102 ZOOKEEPER ====================
 /usr/bin/java
 ZooKeeper JMX enabled by default
 Using config: /opt/sofeware/zookeeper/bin/../conf/zoo.cfg
 Client port found: 2181. Client address: localhost.
 Mode: leader
 ===== hadoop103 zookeeper======
 ==================== STATUS hadoop103 ZOOKEEPER ====================
 /usr/bin/java
 ZooKeeper JMX enabled by default
 Using config: /opt/sofeware/zookeeper/bin/../conf/zoo.cfg
 Client port found: 2181. Client address: localhost.
 Mode: follower
 ​

可以看到 那台是主机、哪些是从机信息。

二、客户端基本命令

命令基本语法 功能描述
help 显示所有操作命令
ls path 使用 ls 命令来查看当前znode的子节点 [可监听]-w 监听子节点变化-s 附加次级信息
create 普通创建-s 含有序列-e 临时(重启或者超时消失)
get path 获得节点的值 [可监听]-w 监听节点内容变化-s 附加次级信息
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
deleteall 递归删除节点
2.1 启动客户端
1
bash复制代码 bin/zkCli.sh
2.2 显示所有操作命令
1
scss复制代码 [zk: localhost:2181(CONNECTED) 1] help
2.3 查看当前节点包含的内容
1
2
csharp复制代码 [zk: localhost:2181(CONNECTED) 0] ls /
 [zookeeper]
2.4 查看当前节点详细数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
ini复制代码 [zk: localhost:2181(CONNECTED) 2] ls2 /
 'ls2' has been deprecated. Please use 'ls [-s] path' instead.
 [zookeeper]
 cZxid = 0x0
 ctime = Thu Jan 01 08:00:00 CST 1970
 mZxid = 0x0
 mtime = Thu Jan 01 08:00:00 CST 1970
 pZxid = 0x0
 cversion = -1
 dataVersion = 0
 aclVersion = 0
 ephemeralOwner = 0x0
 dataLength = 0
 numChildren = 1
2.5 创建普通节点
1
2
3
4
5
bash复制代码 [zk: localhost:2181(CONNECTED) 3] create /jiangxi "nanchang"
 Created /jiangxi
 ​
 [zk: localhost:2181(CONNECTED) 4] create /jiangxi/nc "hl"
 Created /jiangxi/nc
2.6 获得节点的值
1
2
3
4
csharp复制代码 [zk: localhost:2181(CONNECTED) 5] get /jiangxi
 nanchang
 [zk: localhost:2181(CONNECTED) 6] get /jiangxi/nc
 hl
2.7 创建临时节点
1
bash复制代码 create -e /jiangxi/yichun "ll"

这个临时节点在重启客户端之后就无法见到了。

2.8 创建带序号的节点
1
2
3
4
bash复制代码 [zk: localhost:2181(CONNECTED) 7] create -s /jiangxi "jinxian"
 Created /jiangxi0000000001
 [zk: localhost:2181(CONNECTED) 8] create -s /jiangxi "yushan"
 Created /jiangxi0000000002
2.9 修改节点的值
1
2
3
csharp复制代码 [zk: localhost:2181(CONNECTED) 9] set /jiangxi "xiazhen"
 [zk: localhost:2181(CONNECTED) 10] get /jiangxi
 xiazhen
2.10 节点的值变化监听

hadoop102 机器开启监听:

1
csharp复制代码 [zk: localhost:2181(CONNECTED) 8] get -w /jiangxi

hadoop101 机器修改值之后,hadoop102 能观测到该值变化

1
2
3
4
bash复制代码 [zk: localhost:2181(CONNECTED) 1] 
 WATCHER::
 ​
 WatchedEvent state:SyncConnected type:NodeDataChanged path:/jiangxi

如果节点的子节点发生变化,则信息如下:

1
2
bash复制代码 WATCHER::
 WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/jiangxi
2.11 删除节点
1
arduino复制代码 delete /jiangxi/yushan
2.12 查看节点状态
1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码 [zk: localhost:2181(CONNECTED) 1] stat /jiangxi
 cZxid = 0x100000002
 ctime = Fri Aug 13 14:34:44 CST 2021
 mZxid = 0x100000008
 mtime = Fri Aug 13 14:39:31 CST 2021
 pZxid = 0x100000003
 cversion = 1
 dataVersion = 2
 aclVersion = 0
 ephemeralOwner = 0x0
 dataLength = 7
 numChildren = 1

三、API应用

1、添加 jar 包

1
2
3
4
5
6
7
8
9
10
11
12
xml复制代码 <dependencies>
  <dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>RELEASE</version>
  </dependency>
  <dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.5.7</version>
  </dependency>
 </dependencies>

2、APi 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
java复制代码 public class ZookeeperTest {
    public static String connectString = "hadoop101:2181,hadoop102:2181,hadoop103:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;
 ​
    /**
      * 创建 zookeeper 客户端
      * @throws IOException
      */
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 收到事件通知后的回调函数(用户的业务逻辑)
                System.out.println(watchedEvent.getType()+"--"+watchedEvent.getPath());
                // 再次启动监控
                try {
                    zkClient.getChildren("/",true);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
 ​
    /**
      * 创建子节点
      * @throws InterruptedException
      * @throws KeeperException
      */
    @Test
    public void create() throws InterruptedException, KeeperException {
        // 参数1:要创建的节点的路径;
        // 参数2:节点数据 ;
        // 参数3:节点权限 ;
        // 参数4:节点的类型
        String node = zkClient.create("/xiaolei","leilei".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    }
 ​
    /**
      * 获取子节点并监听节点变化
      */
    @Test
    public void getChildren() throws InterruptedException, KeeperException {
        List<String> children = zkClient.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
        // 延时阻塞,保持服务器在线
        Thread.sleep(Long.MAX_VALUE);
    }
 ​
    /**
      * 判断zookeeper 是否存在
      * @throws InterruptedException
      * @throws KeeperException
      */
    @Test
    public void exist() throws InterruptedException, KeeperException {
        Stat exists = zkClient.exists("/xiaolei", false);
        if(exists==null){
            System.out.println("not exist");
        }else{
            System.out.println("exist");
        }
    }
 ​
 }

四、zookeeper 总结

4.1 什么是 zookeeper

概述+作用+内部结构+应用场景+优缺点。

zookeeper 它是在大数据或者分布式领域中的一个常用的中间件,它主要用来当作注册中心的存在,用于服务注册和服务发现。它的内部结构和 linux文件系统很相似,整体看作一颗树,每个节点叫做 znode,默认能够存储 1mb的数据,这个节点也分为临时节点和持久两种类型, 总结下来,它就是一个分布式协调服务的框架。

4.2 zookeeper 都有哪些功能

它提供的服务包括:

  • 统一命名管理:客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。
  • 统一配置管理:把公共的配置文件抽取出来,分发给其他系统。
  • 统一集群管理:监控节点存活状态、运行请求等;
  • 软负载均衡:让访问最少的服务器去处理最新的客户端请求。
  • 分布式锁:临时顺序节点实现的。独占锁、共享锁。独占锁即一次只能有一个线程使用资源,共享锁是读锁共享,读写互斥,即可以有多线线程同时读同一个资源,如果要使用写锁也只能有一个线程使用。Zookeeper 可以对分布式锁进行控制。

4.3 ZAB 协议你了解吗

Zookeeper 的核心是原子广播机制,这个机制保证了各个 server 之间的同步。实现这个机制的协议叫做 Zab 协议。Zab 协议有两种模式,它们分别是恢复模式和广播模式。

恢复模式

当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数 server 完成了和 leader 的状态同步以后,恢复模式就结束了。状态同步保证了 leader 和 server 具有相同的系统状态。

广播模式

一旦 leader 已经和多数的 follower 进行了状态同步后,它就可以开始广播消息了,即进入广播状态。这时候当一个 server 加入 ZooKeeper 服务中,它会在恢复模式下启动,发现 leader,并和 leader 进行状态同步。待到同步结束,它也参与消息广播。ZooKeeper 服务一直维持在 Broadcast 状态,直到 leader 崩溃了或者 leader 失去了大部分的 followers 支持。

4.4 znode 节点类型

(1)PERSISTENT-持久节点

除非手动删除,否则节点一直存在于 Zookeeper 上

(2)EPHEMERAL-临时节点

临时节点的生命周期与客户端会话绑定,一旦客户端会话失效(客户端与zookeeper 连接断开不一定会话失效),那么这个客户端创建的所有临时节点都会被移除。

(3)PERSISTENT_SEQUENTIAL-持久顺序节点

基本特性同持久节点,只是增加了顺序属性,节点名后边会追加一个由父节点维护的自增整型数字。

(4)EPHEMERAL_SEQUENTIAL-临时顺序节点

基本特性同临时节点,增加了顺序属性,节点名后边会追加一个由父节点维护的自增整型数字。

4.5 通知机制

客户端会对某个 znode 建立一个 watcher 事件,当该znode 发生变化时,这些 client 会收到 zk 的通知,然后 client 可以根据 znode 变化来做出业务上的改变等。

更多面试题:blog.csdn.net/ThinkWon/ar…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于Java和Bytemd用120行代码实现一个桌面版Mar

发表于 2021-08-15

这是我参与8月更文挑战的第2天,活动详情查看:8月更文挑战

前提

某一天点开掘金的写作界面的时候,发现了内置Markdown编辑器有一个Github的图标,点进去就是一个开源的Markdown编辑器项目bytemd(https://github.com/bytedance/bytemd):

这是一个NodeJs项目,由字节跳动提供。联想到之前业余的时候做过一些Swing或者JavaFx的Demo,记得JavaFx中有一个组件WebView已经支持Html5、CSS3和ES5,这个组件作为一个嵌入式浏览器,可以轻松地渲染一个URL里面的文本内容或者直接渲染一个原始的Html字符串。另外,由于原生的JavaFx的视觉效果比较丑,可以考虑引入Swing配合IntelliJ IDEA的主题提供更好的视觉效果。本文的代码基于JDK11开发。

引入依赖

很多人吐槽过Swing组件的视觉效果比较差,原因有几个:

  • 技术小众,现在有更好的组件进行混合开发和跨平台开发
  • 基于上一点原因,导致很少人会去开发Swing组件的UI,其实Swing的每个组件都可以重新实现UI的表现效果
  • compose-jb(JetBrains)组件很晚才发布出来,刚好碰上Swing官方停止维护,后面应该更加少人会使用Swing做GUI开发

使用Swing并且成功了的方案最知名的就是JetBrains全家桶。目前来看,为了解决这个”丑”的问题,现在有比较简单的处理方案:

  • 方案一:使用compose-jb(名字有点不好听,官方仓库为https://github.com/JetBrains/compose-jb)开发,这个是JetBrains系列的通用组件,基于Swing做二次封装,不过必须使用语言Kotlin,有点强买强卖的嫌疑,这列贴两个官方的图参考一下:

  • 方案二:FormDev(之前推出过Swing布局器的开发商,官网https://www.formdev.com/flatlaf)提供的FlatLaf(Flat Look and Feel),提供了Light Dark IntelliJ and Darcula themes,而且依赖少,使用起来十分简单,个人认为当前这个是Swing UI组件视觉效果首选

引入FlatLaf和OpenFx的依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
xml复制代码<dependency>
<groupId>com.formdev</groupId>
<artifactId>flatlaf</artifactId>
<version>1.5</version>
</dependency>
<dependency>
<groupId>com.formdev</groupId>
<artifactId>flatlaf-intellij-themes</artifactId>
<version>1.5</version>
</dependency>
<dependency>
<groupId>org.openjfx</groupId>
<artifactId>javafx-media</artifactId>
<version>11.0.2</version>
</dependency>
<dependency>
<groupId>org.openjfx</groupId>
<artifactId>javafx-swing</artifactId>
<version>11.0.2</version>
</dependency>
<dependency>
<groupId>org.openjfx</groupId>
<artifactId>javafx-web</artifactId>
<version>11.0.2</version>
</dependency>
<dependency>
<groupId>org.openjfx</groupId>
<artifactId>javafx-base</artifactId>
<version>11.0.2</version>
</dependency>
<dependency>
<groupId>org.openjfx</groupId>
<artifactId>javafx-graphics</artifactId>
<version>11.0.2</version>
</dependency>
<dependency>
<groupId>org.openjfx</groupId>
<artifactId>javafx-controls</artifactId>
<version>11.0.2</version>
</dependency>

布局和实现

布局的实现比较简单:

最终的H5文本渲染在WebView组件中(JFXPanel是JavaFx => Swing的适配器,WebView是JavaFx的组件,但是这里使用的外层容器都是Swing组件),具体的编码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
java复制代码public class MarkdownEditor {

private static final int W = 1200;
private static final int H = 1000;
private static final String TITLE = "markdown editor";

public static String CONTENT = "<!DOCTYPE html>\n" +
"<html lang=\"en\">\n" +
"<head>\n" +
" <meta charset=\"UTF-8\"/>\n" +
" <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\"/>\n" +
" <title>ByteMD example</title>\n" +
" <link rel=\"stylesheet\" href=\"https://unpkg.com/bytemd/dist/index.min.css\"/>\n" +
" <link rel=\"stylesheet\" href=\"https://unpkg.com/github-markdown-css\"/>\n" +
" <script src=\"https://unpkg.com/bytemd\"></script>\n" +
" <script src=\"https://unpkg.com/@bytemd/plugin-gfm\"></script>\n" +
" <script src=\"https://unpkg.com/@bytemd/plugin-highlight\"></script>\n" +
" <style>\n" +
" .bytemd {\n" +
" height: calc(100vh - 50px);\n" +
" }\n" +
"\n" +
" .footer {\n" +
" width: 100%;\n" +
" height: 30px;\n" +
" left: 0;\n" +
" position: absolute;\n" +
" bottom: 0;\n" +
" text-align: center;\n" +
" }\n" +
" </style>\n" +
"</head>\n" +
"<body>\n" +
"<div class=\"footer\">\n" +
" <a href=\"https://github.com/bytedance/bytemd\">bytemd</a>\n" +
"</div>\n" +
"<script>\n" +
" const plugins = [bytemdPluginGfm(), bytemdPluginHighlight()];\n" +
" const editor = new bytemd.Editor({\n" +
" target: document.body,\n" +
" props: {\n" +
" value: '# heading\\n\\nparagraph\\n\\n> blockquote',\n" +
" plugins,\n" +
" },\n" +
" });\n" +
" editor.$on('change', (e) => {\n" +
" editor.$set({value: e.detail.value});\n" +
" });\n" +
"</script>\n" +
"</body>\n" +
"</html>";

static {
// 初始化主题
try {
UIManager.setLookAndFeel(FlatIntelliJLaf.class.getName());
} catch (Exception e) {
throw new IllegalStateException("theme init error", e);
}
}

private static JFrame buildFrame(int w, int h, LayoutManager layoutManager) {
JFrame frame = new JFrame();
frame.setLayout(layoutManager);
frame.setTitle(TITLE);
frame.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
frame.setSize(w, h);
Toolkit toolkit = Toolkit.getDefaultToolkit();
int x = (int) (toolkit.getScreenSize().getWidth() - frame.getWidth()) / 2;
int y = (int) (toolkit.getScreenSize().getHeight() - frame.getHeight()) / 2;
frame.setLocation(x, y);
return frame;
}

private static void initAndDisplay() {
// 构建窗体
JFrame frame = buildFrame(W, H, new BorderLayout());
JFXPanel panel = new JFXPanel();
Platform.runLater(() -> {
panel.setSize(W, H);
initWebView(panel, CONTENT);
frame.getContentPane().add(panel);
});
frame.setVisible(true);
}

public static void main(String[] args) {
SwingUtilities.invokeLater(MarkdownEditor::initAndDisplay);
}

private static void initWebView(JFXPanel fxPanel, String content) {
StackPane root = new StackPane();
Scene scene = new Scene(root);
WebView webView = new WebView();
WebEngine webEngine = webView.getEngine();
webEngine.setJavaScriptEnabled(true);
webEngine.loadContent(content);
root.getChildren().add(webView);
fxPanel.setScene(scene);
}
}

H5文本来源于bytemd的原生JS实现例子:

所有代码加上注释大概120多行。使用JDK11运行,结果如下:

目前有2个没有解决的问题(也有可能是):

  • JS的动作触发有轻微延迟
  • WebView组件初始化比较慢

小结

Oracle JDK官方已经宣布不再维护Swing项目,按照一般的尿性后面有可能从JDK中移除,也许是因为它体现不了自身的价值(低情商:不赚钱)。Swing的开发中布局是比较反人类的,一般可能一个Swing项目布局会耗费90%以上的时间,原生组件的UI设计看上去比较”丑”,没有丰富的扩展组件和活跃的社区,加上现在有其他更好的跨平台开发方案如Qt、React Native和Flutter等等,Swing被遗忘是一个既定的结局。往后除了一枝独秀的JetBrains,Swing的结局就是成为少数人业务的爱好,成为JDK GUI编程爱好者的收藏品。

Demo源码:

  • local-markdown-editor(https://gitee.com/throwableDoge/local-markdown-editor)

(本文完 e-a-20210815 c-1-d)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文搞懂 Redis 高性能之 IO 多路复用

发表于 2021-08-15

这是我参与8月更文挑战的第15天,活动详情查看:8月更文挑战

相信大家在面试过程中经常会被问到:“单线程的 Redis 为啥这么快?”

哈哈,反正我在面试时候经常会问候选人这个问题,这个问题其实是对 redis 内部机制的一个考察,可以牵扯出好多涉及底层深入原理的一些列问题。

回到问题本身,基本的回答就两点:

  • 完全基于内存
  • IO 多路复用

1、关于第 1 点比较好理解。Redis 绝大部分请求是纯粹的内存操作,非常快速。数据存在内存中,类似于 HashMap,查找和操作的时间复杂度都是 O(1)。

2、关于第 2 点 IO 多路复用,有些同学看到概念后感觉一头雾水,到底什么是 IO 多路复用?

本文从 IO 并发性能提升来整体思考,来逐步剖析 IO 多路复用的原理。

一、如何快速理解 IO 多路复用?

  • 多进程
  • 多线程
  • 基于单进程的 IO 多路复用(select/poll/epoll)

1、多进程

对于并发情况,假如一个进程不行,那搞多个进程不就可以同时处理多个客户端连接了么?

多进程这种方式的确可以解决了服务器在同一时间能处理多个客户端连接请求的问题,但是仍存在一些缺点:

  • fork()等系统调用会使得进程上下文进行切换,效率较低
  • 进程创建的数量随着连接请求的增加而增加。比如 10w 个请求,就要 fork 10w 个进程,开销太大
  • 进程与进程之间的地址空间是私有、独立的,使得进程之间的数据共享变得困难

2、多线程

线程是运行在进程上下文的逻辑流,一个进程可以包含多个线程,多个线程运行在同一进程上下文中,因此可共享这个进程地址空间的所有内容,解决了进程与进程之间通信难的问题。

同时,由于一个线程的上下文要比一个进程的上下文小得多,所以线程的上下文切换,要比进程的上下文切换效率高得多。

3、IO 多路复用

简单理解就是:一个服务端进程可以同时处理多个套接字描述符。

  • 多路:多个客户端连接(连接就是套接字描述符)
  • 复用:使用单进程就能够实现同时处理多个客户端的连接

以上是通过增加进程和线程的数量来并发处理多个套接字,免不了上下文切换的开销,而 IO 多路复用只需要一个进程就能够处理多个套接字,从而解决了上下文切换的问题。

其发展可以分 select->poll→epoll 三个阶段来描述。

二、如何简单理解 select/poll/epoll 呢?

按照以往惯例,还是联系一下我们日常中的现实场景,这样更助于大家理解。

举栗说明:

领导分配员工开发任务,有些员工还没完成。如果领导要每个员工的工作都要验收 check,那在未完成的员工那里,只能阻塞等待,等待他完成之后,再去 check 下一位员工的任务,造成性能问题。

那如何解决这个问题呢?

1、select

举栗说明:

领导找个 Team Leader(后文简称 TL),负责代自己 check 每位员工的开发任务。

TL 的做法是:遍历问各个员工“完成了么?”,完成的待 CR check 无误后合并到 Git 分支,对于其他未完成的,休息一会儿后再去遍历….

这样存在什么问题呢?

  • 这个 TL 存在能力短板问题,最多只能管理 1024 个员工
  • 很多员工的任务没有完成,而且短时间内也完不成的话,TL 还是会不停的去遍历问询,影响效率。

select 函数:

1
arduino复制代码int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout);

复制代码

select 函数监视的文件描述符分 3 类,分别是 writefds、readfds、和 exceptfds。调用后 select 函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有 except),或者超时(timeout 指定等待时间,如果立即返回设为 null 即可),函数返回。当 select 函数返回后,可以通过遍历 fdset,来找到就绪的描述符。

select 具有良好的跨平台支持,其缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 上一般为 1024。

2、poll

举栗说明:

换一个能力更强的 New Team Leader(后文简称 NTL),可以管理更多的员工,这个 NTL 可以理解为 poll。

poll 函数:

1
2
3
4
5
6
7
arduino复制代码intpoll(structpollfd*fds, nfds_t nfds,int timeout);
typedef struct pollfd{
int fd; // 需要被检测或选择的文件描述符
short events; // 对文件描述符fd上感兴趣的事件
short revents; // 文件描述符fd上当前实际发生的事件
}
pollfd_t;

复制代码

poll 改变了文件描述符集合的描述方式,使用了 pollfd 结构而不是 select 的 fd_set 结构,使得 poll 支持的文件描述符集合限制远大于 select 的 1024。

3、epoll

举栗说明:

在上一步 poll 方式的 NTL 基础上,改进一下 NTL 的办事方法:遍历一次所有员工,如果任务没有完成,告诉员工待完成之后,其应该做 xx 操作(制定一些列的流程规范)。这样 NTL 只需要定期 check 指定的关键节点就好了。这就是 epoll。

Linux 中提供的 epoll 相关函数如下:

1
2
3
arduino复制代码intepoll_create(int size);
intepoll_ctl(int epfd,int op,int fd,struct epoll_event *event);
intepoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);

复制代码

epoll 是 Linux 内核为处理大批量文件描述符而作了改进的 poll,是 Linux 下多路复用 IO 接口 select/poll 的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统 CPU 利用率。

4、小结

  • select 就是轮询,在 Linux 上限制个数一般为 1024 个
  • poll 解决了 select 的个数限制,但是依然是轮询
  • epoll 解决了个数的限制,同时解决了轮询的方式

三、IO 多路复用在 Redis 中的应用

Redis 服务器是一个事件驱动程序, 服务器处理的事件分为时间事件和文件事件两类。

  • 文件事件:Redis 主进程中,主要处理客户端的连接请求与相应。
  • 时间事件:fork 出的子进程中,处理如 AOF 持久化任务等。

由于 Redis 的文件事件是单进程,单线程模型,但是确保持着优秀的吞吐量,IO 多路复用起到了主要作用。

文件事件是对套接字操作的抽象,每当一个套接字准备好执行连接应答、写入、读取、关闭等操作时,就会产生一个文件事件。因为一个服务器通常会连接多个套接字,所以多个文件事件有可能会并发地出现。

IO 多路复用程序负责监听多个套接字并向文件事件分派器传送那些产生了事件的套接字。文件事件分派器接收 IO 多路复用程序传来的套接字,并根据套接字产生的事件的类型,调用相应的事件处理器。示例如图所示:

文件处理器的四个组成部分

Redis 的 IO 多路复用程序的所有功能都是通过包装常见的 select、poll、evport 和 kqueue 这些 IO 多路复用函数库来实现的,每个 IO 多路复用函数库在 Redis 源码中都有对应的一个单独的文件。

Redis 为每个 IO 多路复用函数库都实现了相同的 API,所以 IO 多路复用程序的底层实现是可以互换的。如图:

多个IO复用库实现可选

Redis 把所有连接与读写事件、还有我们没提到的时间事件一起集中管理,并对底层 IO 多路复用机制进行了封装,最终实现了单进程能够处理多个连接以及读写事件。这就是 IO 多路复用在 redis 中的应用。

四、总结

Redis 6.0 之后的版本开始选择性使用多线程模型。

Redis 选择使用单线程模型处理客户端的请求主要还是因为 CPU 不是 Redis 服务器的瓶颈,使用多线程模型带来的性能提升并不能抵消它带来的开发成本和维护成本,系统的性能瓶颈也主要在网络 I/O 操作上;

而 Redis 引入多线程操作也是出于性能上的考虑,对于一些大键值对的删除操作,通过多线程非阻塞地释放内存空间也能减少对 Redis 主线程阻塞的时间,提高执行的效率。

凡事不能有绝对,寻找到适中的平衡点最重要!

  • END -

作者:架构精进之路,十年研发风雨路,大厂架构师,CSDN 博客专家,专注架构技术沉淀学习及分享,职业与认知升级,坚持分享接地气儿的干货文章,期待与你一起成长。

关注并私信我回复“01”,送你一份程序员成长进阶大礼包,欢迎勾搭。

文章首发于同名公众号《架构精进之路》,原文链接:关于MySQL varchar类型最大值,原来一直都理解错了

Thanks for reading!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MQ的死信队列和延迟队列 RabbitMQ系列(十)

发表于 2021-08-15

这是我参与8月更文挑战的第15天,活动详情查看:8月更文挑战


相关文章

MyBatis系列汇总:MyBatis系列


前言

  • 讲到这里,MQ的基础也讲的差不多了,这是基础文章的最后一篇了!下面就会整合SpringBoot来进行实战讲解了!
  • 死信队列,顾名思义就是无法被消费的消息。
  • 有哪些原因会导致消息无法被消费呢?
+ 消息TTL过期。
+ 队列超长,就是队列满了,新的消息无法再加入到MQ队列中。
+ 消息被拒绝,即 basic.reject或者basic.nack,并且拒绝放回原队列(requeue=false)。
  • 应用场景
+ 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
+ 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
+ 买火车票,半小时未付款即订单失效。
  • 逻辑架构图
+ ![image-20210806111539854.png](https://gitee.com/songjianzaina/juejin_p13/raw/master/img/8c333186b8b127b99876825feef725260b16193ff25ea426dc70d05aaf159dd0)

一、死信队列

①、消息TTL过期

  • 生产者

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    java复制代码/**
    * 这是一个测试的生产者
    *@author DingYongJun
    *@date 2021/8/6
    */
    public class DyProducerTest_dead {

    private static final String EXCHANGE_NAME = "normal_exchange";
    /**
    * 这里为了方便,我们使用main函数来测试
    * 纯属看你个人选择
    * @param args
    */
    public static void main(String[] args) throws Exception {
    publishMessageIndividually();
    }

    public static void publishMessageIndividually() throws Exception {
    //使用工具类来创建通道
    Channel channel = RabbitMqUtils.getChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    //设置过期时间,单位为ms
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    for (int i=0;i<5;i++){
    String msg = i+"消息";
    //发送消息到指定交换机中
    channel.basicPublish(EXCHANGE_NAME,"zhangsan",properties,msg.getBytes());
    System.out.println("我是生产者,我发送了+"+msg);
    }
    }
    }
  • 消费者

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    java复制代码/**
    * 这是一个测试的消费者
    *
    * @author DingYongJun
    * @date 2021/8/6
    */
    public class DyConsumerTest_dead01 {

    //普通交换机
    private static final String EXCHANGE_NAME = "normal_exchange";
    //死信交换机,专门消费私信队列里面的消息
    private static final String DEAD_NAME = "dead_exchange";

    public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //先申明两个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//普通
    channel.exchangeDeclare(DEAD_NAME, BuiltinExchangeType.DIRECT);//死信
    //申明死信队列
    String deadQueueName = "dead_queue";
    channel.queueDeclare(deadQueueName,false,false,false,null);
    //将死信队列绑定到交换机上
    channel.queueBind(deadQueueName,DEAD_NAME,"lisi");
    //正常队列绑定死信队列信息
    Map<String, Object> params = new HashMap<>();
    //正常队列设置死信交换机 参数 key 是固定值
    params.put("x-dead-letter-exchange", DEAD_NAME);
    //正常队列设置死信 routing-key 参数 key 是固定值
    params.put("x-dead-letter-routing-key", "lisi");

    String normalQueue = "normal-queue";
    channel.queueDeclare(normalQueue, false, false, false, params);
    channel.queueBind(normalQueue, EXCHANGE_NAME, "zhangsan");
    System.out.println("等待接收消息.....");
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("Consumer01 接收到消息"+message);
    };
    channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
    });
    }
    }
  • 先启动消费者,然后停止,不然消息直接消费掉了,为了模拟让消息能够十秒后过期

  • 查看后台页面,可以看到,普通队列和死信队列都启动成功了

  • image-20210806131910027.png

  • 此时启动生产者,发送5条消息

  • image-20210806132458266.png

  • 可以看到,普通队列中成功接受了5条消息。

  • image-20210806132510917.png

  • 等待10秒钟,因为我们关闭了消费者,所以消息会过期,可以看到5条消息全部进入了死信队列中。

  • 为什么死信中是10条?因为我先前已经测试了一遍,原先就有5条消息了。

  • image-20210806132528775.png

  • 这时候我们写专门消费私信队列的消费者

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    java复制代码/**
    * 这是一个测试的消费者
    *@author DingYongJun
    *@date 2021/8/1
    */
    public class DyConsumerTest_dead02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    String deadQueue = "dead_queue";
    channel.queueDeclare(deadQueue, false, false, false, null);
    channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
    System.out.println("等待接收死信队列消息.....");
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("Consumer02 接收死信队列的消息" + message);
    };
    channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
    });
    }
    }
  • 启动该消费者看结果

  • image-20210806132924716.png

  • image-20210806132945333.png

  • 可以看到,成功被消费掉。

②、队列超长

  • 生产者注释掉过期时间参数
  • image-20210806133346372.png
  • 正常消费者,增加队列长度的限制为3
  • image-20210806133958397.png
  • 注意这时候要把原先的队列删除,因为参数发生了改变,不删除启动会报错。
  • image-20210806133613092.png
  • 启动普通队列消费者和生产者
  • image-20210806135828924.png
  • 正常队列只保存了3条信息,而死信队列有7条。证明超过了队列的容量也会进入死信队列。

③、消息被拒

  • 生产者代码不变

  • 消费者模拟接受消息拒绝

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    java复制代码/**
    * 这是一个测试的消费者
    *
    * @author DingYongJun
    * @date 2021/8/6
    */
    public class DyConsumerTest_dead01 {

    //普通交换机
    private static final String EXCHANGE_NAME = "normal_exchange";
    //死信交换机,专门消费私信队列里面的消息
    private static final String DEAD_NAME = "dead_exchange";

    public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //先申明两个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//普通
    channel.exchangeDeclare(DEAD_NAME, BuiltinExchangeType.DIRECT);//死信

    //申明死信队列
    String deadQueueName = "dead_queue";
    channel.queueDeclare(deadQueueName,false,false,false,null);
    //将死信队列绑定到交换机上
    channel.queueBind(deadQueueName,DEAD_NAME,"lisi");

    //正常队列绑定死信队列信息
    Map<String, Object> params = new HashMap<>();
    //正常队列设置死信交换机 参数 key 是固定值
    params.put("x-dead-letter-exchange", DEAD_NAME);
    //正常队列设置死信 routing-key 参数 key 是固定值
    params.put("x-dead-letter-routing-key", "lisi");
    //设置队列的长度限制为3
    // params.put("x-max-length",3);

    String normalQueue = "normal-queue";
    channel.queueDeclare(normalQueue, false, false, false, params);
    channel.queueBind(normalQueue, EXCHANGE_NAME, "zhangsan");
    System.out.println("等待接收消息.....");
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    if (message.equals("3消息")){
    System.out.println("收到这个消息但是我不想要:"+message);
    //requeue设置为false,表明拒绝重新入队,如果该队列配置了死信队列,拒绝后交换机会将其发送的死信队列当中。设置true的话,会自动重新入列。
    channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
    }else {
    //其他消息正常应答
    System.out.println("Consumer01 接收到消息"+message);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    };
    channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
    });
    }
    }
  • 执行生产者后

  • image-20210806150831414.png

  • 启动消费者后

  • image-20210806150845568.png

二、延迟队列

  • 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理。
  • 简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
  • 使用场景
+ 订单在十分钟之内未支付则自动取消
+ 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
  • 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务
  • 设置TTL
  • 方式一
+ 对每条消息设置过期时间。
+ 对队列设置过期时间。
  • 这里就不用代码演示了,后面的整合SpringBoot的实战文章会讲解!
  • 大概讲解下思路
+ 我们有一个正常队。
+ 一个私信队列。
+ 对正常队列中的消息设置过期时间。
+ 如果到期之后该消息进入死信队列。
+ 然后有专门的消费者消费私信的队列的消息。
+ 这样是不是就相当于延迟队列?
  • 至于为什么在这里不讲,因为这玩意是队列,先进先出,不用MQ插件来实现的话,看不出来效果,留在实战文章对比着详细解释吧!

路漫漫其修远兮,吾必将上下求索~

如果你认为i博主写的不错!写作不易,请点赞、关注、评论给博主一个鼓励吧~hahah

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

毕业设计-停车场管理系统 停车场管理系统

发表于 2021-08-15

停车场管理系统

前言

​ 本期项目是停车场管理系统,主要包括数据监控大盘、车辆管理、黑名单管理、停车管理、车位管理、预约管理、日志管理、用户管理、角色管理。尽可能的把停车场功能做全,然后以企业级的开发标准来完成整个前后端代码,无论是用来作为毕业设计还是拿来学习,相信对初学者都会有很大帮助。

(想要源码和视频教程的同学私信我~)

工程架构

应用分层

image-20201226111957265

上面的分层架构摘自阿里巴巴java开发手册,我对此做了一些调整,实际分层结构如下:

image-20210814222319932

领域模型

  • DO(DataObject):与数据库表结构一一对应,通过DAO层向上传输数据源对象
  • BO(BusinessObject):业务对象。由Service层输出的封装业务逻辑的对象
  • VO(View Object):显示层对象,通常是Web向模板渲染引擎层传输的对象

BO和VO领域模型又分为BoRequest(输入模型)、BoResponse(输出模型)、VoRequest(输入模型)、VoResponse(输出模型)

技术栈

前端:vue + element

后端:jdk1.8 + springboot + redis + mysql

系统设计

接口设计

​ 整个项目接口采用的目前互联网比较流行的restful风格设计,每个接口、每个参数都有详细的文档说明。因为企业中开发必然是团队协作,必然前后端分离的开发模式,你得先把接口定义出来,然后前端可以和后端同步开发。还有一种就是对外提供接口,比如你们隔壁团队也想调用你这个服务的接口,但是你两排期是同一周,这时候你得先把接口定义出来给人家,然后大家同步开发,开发完了之后再进行联调。

image-20210814222543063

运行效果

系统登录

image-20210814222918495

dashboard

首页数据大盘,按最近7天饼图占比、最近30天折线图走势、最近一年柱状图分析、最近7天各个时间段占比分析全方位可视化分析数据。

image-20210814223026088

车辆管理

image-20210815081233437

黑名单管理

​ 对于一些漏缴费、不按规定停车、多次预约停车位却毁约的车辆,我们可以添加黑名单,加黑后的车辆将不被允许进入停车场。

image-20210815081307131

停车管理

​ 车辆入库后会生成一条停车记录,此时状态是’已入库’和’未支付’,等车辆出口后,系统会根据车位的每小时停车费*实际停车实际(按小时计算,超出一小时按一小时收费)。这里大家需要注意,真实的停车场收费都是摄像头拍照的,比如车子出库的时候,摄像头会拍摄车牌,然后生成收费信息,当你缴费后就可以出库了。这里我们是管理后台,系统并没有接入摄像头设备,所以出库需要人工点击出库按钮。(也可以接入支付宝扣费接口和摄像头接口,这样我们的系统就跟真实的停车管理系统一样了~)

image-20210815081618931

Excel导出

​ 所有模块都支持数据导出Excel,方便进行数据分析

停车记录导出

image-20210815083758271

车位数据导出

image-20210815083919978

车位管理

image-20210815082151937

预约管理

​ 车主可以提前预约,预约后将优先安排车辆入库停车

image-20210815082301459

日志管理

​ 日志管理默认是开给管理员的,在系统中的所有操作都会被记录,在系统出现异常时也便于管理员进行问题排查。

image-20210815082330122

用户管理

​ 默认也是只有管理员拥有用户管理菜单的权限,可以新建/编辑用户、分配用户角色、禁用/启用等操作

image-20210815082348987

编辑用户信息

image-20210815082407636

角色管理

​ 极其灵活的权限管理,系统中的所有按钮都可以单独分配权限,你可以给A角色只分配了查询和导出权限,也可以给B角色分配查询、编辑、新建权限,还可以给C角色只分配查询权限。可以满足几乎所有的业务需求,大家可以自由发挥定义权限组合。

image-20210815082437624

页面不存在时提示页面

image-20210815082839734

普通读者登录

​ 系统默认会创建两个角色,一个是超管角色,另一个则是普通用户角色(当然角色大家可以按前面说的自定义)。普通用户登录,比如停车管理菜单,普通用户就只有查询的权限,其他的新增、编辑、删除、导出和出库权限都没有。截图如下:

image-20210815083029659

个人信息修改

image-20201226090334871

密码修改

​ 管理员创建完用户之后的默认密码是“123456”,用户可以登录系统自己修改密码

image-20210815083303801

权限设计

​ 权限基于security和spring-session实现。权限可以分为认证和授权,认证其实就是登录,用户登录时会进行账号密码的校验,校验成功后会,会把session存入redis中。授权指的是用户是否拥有访问后端资源的权限,每个新用户在创建后都会分配角色,角色其实就是一个权限集合,这里的权限可以理解为访问后端一个个接口(资源)的权限。

​ 这里权限设计的非常灵活,细粒度到按钮级别,比如新增、删除、修改、查询、借阅动作,普通用户可能就只有查询权限,管理员则拥有新增、删除、修改的权限。普通用户即使通过接口直接访问后端的修改或者删除接口,后端也会返回授权失败错误,因为后端每个需要权限的接口都打了权限标识,只有拥有资源权限用户才能访问。

​ 比如下面的车辆修改接口,只有拥有“CAR_UPDATE”这个权限标识的用户才能访问这个接口,否则返回“未授权”的错误。

1
2
3
4
5
java复制代码@PutMapping("/{id}")
@PreAuthorize("hasAuthority(T(com.senior.book.console.api.security.Authority).BOOK_UPDATE.name())")
public Result<Boolean> update(@PathVariable("id") Long id, @Valid @RequestBody BookUpdateVoRequest request) {

}

日志方案

​ 日志采用lombok注解+slf4j+log4j2的实现方案,基于profile实现了多环境的日志配置,因为不同环境的日志打印策略是不一样,比如开发环境我可能需要打印到console控制台,需要debug级别的日志以便于本地开发调试,测试环境可能就需要打印到日志文件里,线上环境可能需要打印到文件的同时将日志发送到kafka然后收集到es中,这样当线上部署了多台机器后我们查日志不用一台一台机器去查日志了,因为都收集到es了,我们只需要登录kibana去搜索,这样就非常方便。这里说到的kafka+es+kibana这样一套日志解决方案也是目前互联网公司比较常用的一套解决方案。如果你动手能力够强,你可以本地搭一套kafka、es、kibana,然后只需要在配置文件中加入几行配置就实现了这么一套企业级的日志解决方案(默认是输出到日志文件)。

下面是部分关键配置,如果要配置kafka,只需要在标签中配置配置即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
xml复制代码    <?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" xmlns:xi="http://www.w3.org/2001/XInclude">
<Properties>
<Property name="LOG_FILE">system.log</Property>
<Property name="LOG_PATH">./logs</Property>
<Property name="PID">????</Property>
<Property name="LOG_EXCEPTION_CONVERSION_WORD">%xwEx</Property>
<Property name="LOG_LEVEL_PATTERN">%5p</Property>
<Property name="LOG_DATE_FORMAT_PATTERN">yyyy-MM-dd HH:mm:ss.SSS</Property>
<Property name="CONSOLE_LOG_PATTERN">%clr{%d{${LOG_DATE_FORMAT_PATTERN}}}{faint} %clr{${LOG_LEVEL_PATTERN}} %clr{${sys:PID}}{magenta} %clr{---}{faint} %clr{[%15.15t]}{faint} %clr{%-40.40c{1.}}{cyan} %clr{:}{faint} %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}
</Property>
<Property name="FILE_LOG_PATTERN">%d{${LOG_DATE_FORMAT_PATTERN}} ${LOG_LEVEL_PATTERN} ${sys:PID} --- [%t] %-40.40c{1.}:%L : %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}
</Property>
</Properties>
<Appenders>
<xi:include href="log4j2/file-appender.xml"/>
</Appenders>
<Loggers>
<logger name="com.senior.park" level="info"/>
<Root level="info">
<AppenderRef ref="FileAppender"/>
</Root>
</Loggers>
</Configuration>

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

工作中「责任链模式🤔」的感悟 首先明确含义

发表于 2021-08-15

这是我参与8月更文挑战的第15天,活动详情查看:8月更文挑战

首先明确含义

  • 责任链模式:为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦,属于行为型模式。
  • 在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。

在工作中我们使用到的场景

  • 比如我们尝实用的OA公司审批系统,比如在报销账单或者审批系统
  • 我们往往是boos具有绝对审批权利,然后是财务第二审批,部门领导第一审批
  • 那么我们直观的去思考业务逻辑就是
1
2
3
4
5
6
7
yaml复制代码if (leader.author != null) {
if (financialer.author != null) {
if (boss.author != null){
handle(...)
}
}
}
  • 来下面的例子深入理解责任链模式
  • 例子:教师可以查看学生的家庭信息、班长只能查看学生的籍贯信息,而团支书只能查看学生的团籍关系
  • 每层的执行权利都是可以往下链接的
  • 创建抽象类 abstractAuther,带有详细的各个职责的级别
    image.png
  • 创建教师类

image.png

  • 创建班长类

image.png

  • 创建团支书类

image.png

  • 创建不同级别的的角色,赋予它们不同的职责级别

image.png

  • 最后的输出
1
2
3
4
5
6
lua复制代码可以查看学生的家庭信息------家庭信息详细
可以查看学生的家庭信息------籍贯信息详细
可以查看学生的籍贯信息------籍贯信息详细
可以查看学生的家庭信息------团建信息详细
可以查看学生的籍贯信息------团建信息详细
可以查看学生的团籍关系------团建信息详细
  • 每个角色的职责可以逐步往下传递,也可以跨级传递

特性

  • 主要解决: 职责链上的处理者负责处理请求,用户只需要将请求发送到职责链上即可,无须关心请求的处理细节和请求的传递,所以职责链将请求的发送者和请求的处理者解耦了。
  • 优点:
  1. 降低耦合度。它将请求的发送者和接收者解耦。
  2. 简化了对象。使得对象不需要知道链的结构。
  • 缺点:
  1. 不能保证请求一定被接收。
  2. 系统性能将受到一定影响,而且在进行代码调试时不太方便,可能会造成循环调用。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Linux 环境下安装 Python3 的操作方法

发表于 2021-08-15

这是我参与 8 月更文挑战的第 15 天,活动详情查看: 8月更文挑战

一名致力于在技术道路上的终身学习者、实践者、分享者,一位忙起来又偶尔偷懒的原创博主,一个偶尔无聊又偶尔幽默的少年。

欢迎各位掘友们微信搜索「杰哥的IT之旅」关注!

原文链接:Linux 环境下安装 Python3 的操作方法

一、Linux 下 Python 版本

目前Linux下的绝大部分系统都自带了python2.x的版本,而现在python的主流版本已经到了3.x。为此我们需要将python3.x安装在自己的Linux系统上。

二、查看 Python 版本

1
2
csharp复制代码[root@xiaohui ~]# python --version
Python 2.7.5

三、安装 Python 3 步骤

3.1 用 wget 下载 Python 3.x 的安装包

笔者下载的是 3.7.1 的版本,其余的版本也可根据自己的需要进行下载

1
arduino复制代码[root@xiaohui ~]# wget https://www.python.org/ftp/python/3.7.1/Python-3.7.1rc2.tgz

3.2 创建存放 Python3.x 的文件夹

1
csharp复制代码[root@xiaohui ~]# mkdir  /usr/local/python3/

3.3 将压缩包移至创建的文件夹内并切换至该文件夹解压安装包

1
2
3
csharp复制代码[root@xiaohui ~]# mv Python-3.7.1rc2.tgz /usr/local/python3
[root@xiaohui ~]# cd /usr/local/python3
[root@xiaohui python3]# tar -zxf Python-3.7.1rc2.tgz

3.4 切换至解压的文件夹

1
csharp复制代码[root@xiaohui python3]# cd ./Python-3.7.1rc2

3.5 配置、编译和执行安装

1
2
3
4
5
6
7
8
csharp复制代码[root@xiaohui Python-3.7.1rc2]# ./configure --with-ssl
[root@xiaohui Python-3.7.1rc2]# make
[root@xiaohui Python-3.7.1rc2]# make install
# 安装成功显示
Collecting setuptools
Collecting pip
Installing collected packages: setuptools, pip
Successfully installed pip-10.0.1 setuptools-39.0.1

步骤 5 中可能会出现一些 errors,主要是缺少相应的依赖包,只需要通过 yum 安装对应的依赖包即可解决。笔者就遇到了三个 errors。

错误1 缺少gcc

1
2
3
4
5
perl复制代码错误代码
configure: error: no acceptable C compiler found in $PATH
该错误是因为本机缺少gcc编译环境,只需安装gcc即可
# 安装命令
[root@xiaohui Python-3.7.1rc2]# yum install -y gcc

错误2 缺少zlib

1
2
3
4
5
ini复制代码错误代码
zipimport.ZipImportError: can't decompress data; zlib not available
该错误是因为本机缺少zlib解压缩类库,只需安装zlib即可
# 安装命令
[root@xiaohui Python-3.7.1rc2]# yum install -y zlib*

错误3 缺少libffi-devel

1
2
3
4
5
6
ruby复制代码错误代码
ModuleNotFoundError: No module named '_ctypes'
该错误是因为本机缺少libffi-devel包,只需安装此包即可
# 安装命令
[root@xiaohui Python-3.7.1rc2]# yum install -y libffi-devel
注意在安装完缺少的依赖包后,仍需重新运行对应所在的配置、编译和执行安装命令

3.6 配置及建立软链接

1
2
3
4
5
6
7
8
csharp复制代码将python库路径添加到/etc/ld.so.conf配置中
# ld.so.conf文件是存储etc目录下的所有.conf文件
[root@xiaohui Python-3.7.1rc2]# echo "/usr/python/lib" >> /etc/ld.so.conf
[root@xiaohui Python-3.7.1rc2]# ldconfig
# 建立新的软链接至python3.x,原本旧链接无需删除
# 原因在于例如CentOS的yum源是用python2.x编写的,删除可能会出一些错误
[root@xiaohui Python-3.7.1rc2]# ln -s /usr/python/bin/python3 /usr/bin/python3
[root@xiaohui Python-3.7.1rc2]# ln -s /usr/python/bin/pip3 /usr/bin/pip3

经过上述步骤后则成功完成了 Python3.x 的安装,我们可以检测系统的 Python 版本

1
2
3
4
5
csharp复制代码[root@xiaohui ~]# python3 --version
Python 3.7.1rc2
# python2.x依旧存在
[root@xiaohui ~]# python2 --version
Python 2.7.5

使用pip3测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
csharp复制代码[root@xiaohui Python-3.7.1rc2]# pip3 list
Package Version
---------- --------
certifi 2019.3.9
chardet 3.0.4
future 0.17.1
idna 2.8
itchat 1.2.32
pip 10.0.1
pypng 0.0.19
PyQRCode 1.2.1
requests 2.21.0
setuptools 39.0.1
urllib3 1.24.3
wxpy 0.3.9.8
You are using pip version 10.0.1, however version 19.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

测试成功,python3已成功安装在本Linux系统上

推荐阅读

99%的Linux运维工程师必须要掌握的命令及运用

Linux 环境下 Oracle 数据库常用命令

Linux 环境下 vi/vim 编辑器常用命令

Linux环境下安装及管理程序(软件包封装、RPM命令、源代码编译安装的基本过程)

Linux 环境下账号和权限管理

Linux 磁盘和文件系统管理

Linux 环境下进程和计划任务管理

Linux 环境下分析和排查系统故障

Linux 环境下虚拟化之 KVM 常用命令

轻松带你玩转 Linux 环境下日期的语法!

只需 2 条命令,人人都能玩转的 10 款命令行游戏,刺激!

17 个有趣却无用的 Linux 彩蛋,真是好玩到极致!

Yum 仓库的配置及使用,真香啊!

Linux 图形化终端下截图与快捷键实用技巧!

本文完。


原创不易,如果你觉得这篇文章对你有点用的话,麻烦你为本文点个赞、评论或转发一下,因为这将是我输出更多优质文章的动力,感谢!

对了,掘友们记得给我点个免费的关注哟!防止你迷路下次就找不到我了。

我们下期再见!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Mybatis】Mybatis之分页查询 准备 手写lim

发表于 2021-08-14

这是我参与8月更文挑战的第14天,活动详情查看:8月更文挑战

上文Mybatis之动态SQL介绍了Mybatis中很常用的动态标签,本来继续来介绍一下在Mybatis中十分常用的分页查询。废话不多说,开始今天的内容。

准备

  • 查询参数的POJO中,在offset参数的get方法中,对offset的值进行了相应的计算。
  • 查询结果的POJO中,在构造方法中对总页数pages进行了相应的计算。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
java复制代码// 封装查询参数的POJO
public class QueryCondition {
/**
* ID
*/
private Integer id;
/**
* 名称
*/
private String name;
/**
* 价格
*/
private Integer price;
/**
* 分类
*/
private Integer category;
private List<Integer> categoryList;

/**
* 分页参数
*/
//偏移量
private Integer offset;
//每页条数
private Integer pageSize;
//页数
private Integer pageNum;
// 计算分页的起始位置
public Integer getOffset() {
return ((pageNum == null || pageNum < 1 ? 1 : pageNum) - 1) * (pageSize == null ? 3 : pageSize);
}

// 省略其余get/set方法
}

// 封装查询结果的POJO
public class PageVO<T> {

/**
* 每页条数
*/
private int pageSize;
/**
* 页码
*/
private int pageNum;

/**
* 总页数
*/
private int pages;
/**
* 总条数
*/
private int total;

/**
* 当前页的数据
*/
private List<T> data;

public PageVO(int pageSize, int pageNum, int total, List<T> data) {
this.pageSize = pageSize;
this.pageNum = pageNum;
this.total = total;
this.data = data;
this.pages = total / pageSize + (total % pageSize == 0 ? 0 : 1);
}

// 省略get/set方法
}
  • 数据库数据

image.png

手写limit分页

  • 顾名思义,这种方法就是我们自己在SQL中添加limit关键字来实现分页;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
xml复制代码<select id="findPageByHand" parameterType="org.apache.ibatis.z_run.pojo.QueryCondition" resultType="org.apache.ibatis.z_run.pojo.Purchase">
select
<include refid="Base_Column_List" />
from purchase
<where>
<if test="id != null">
And id = #{id,jdbcType=INTEGER}
</if>
<if test="category != null">
And category = #{category,jdbcType=INTEGER}
</if>
</where>
<if test="pageSize != null and pageNum != null">
limit #{offset}, #{pageSize}
</if>
</select>
  • 另外,为了返回一个总条数,我们还得单独再写一条SQL,用于统计分页查询的总条数,在计算总页数的时候使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
xml复制代码<select id="count" parameterType="org.apache.ibatis.z_run.pojo.QueryCondition" resultType="java.lang.Integer">
select
count(1)
from
(
select
<include refid="Base_Column_List"/>
from purchase
<where>
<if test="id != null">
And id = #{id,jdbcType=INTEGER}
</if>
<if test="category != null">
And category = #{category,jdbcType=INTEGER}
</if>
</where>
) t1
</select>
  • 测试代码及返回结果
1
2
3
4
5
6
7
8
9
10
11
java复制代码int pageSize = 2;
int pageNum = 1;

@Test
public void pageQueryByHand() {
PurchaseMapper mapper = sqlSession.getMapper(PurchaseMapper.class);
QueryCondition condition = new QueryCondition();
condition.setPageNum(pageNum);
condition.setPageSize(pageSize);
System.out.println(new PageVO<>(pageSize, pageNum, mapper.count(condition), mapper.findPageByHand(condition)));
}
1
2
3
4
5
6
7
8
txt复制代码DEBUG [main] - ==>  Preparing: select count(1) from ( select id, `name`, price, category from purchase ) t1 
DEBUG [main] - ==> Parameters:
DEBUG [main] - <== Total: 1
DEBUG [main] - Cache Hit Ratio [org.apache.ibatis.z_run.mapper.PurchaseMapper]: 0.0
DEBUG [main] - ==> Preparing: select id, `name`, price, category from purchase limit ?, ?
DEBUG [main] - ==> Parameters: 0(Integer), 2(Integer)
DEBUG [main] - <== Total: 2
PageVO{pageSize=2, pageNum=1, pages=4, total=7, data=[Purchase{id=1, name='可乐', price=6, category=1}, Purchase{id=2, name='爆米花', price=18, category=2}]}

RowBounds分页

  • RowBounds对象是Mybatis提供的一个分页类,只需要在查询的方法参数中加上这个对象即可使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码<select id="findPageByRowBounds" parameterType="org.apache.ibatis.z_run.pojo.QueryCondition" resultType="org.apache.ibatis.z_run.pojo.Purchase">
select
<include refid="Base_Column_List" />
from purchase
<where>
<if test="id != null">
And id = #{id,jdbcType=INTEGER}
</if>
<if test="category != null">
And category = #{category,jdbcType=INTEGER}
</if>
</where>
</select>
  • 使用RowBounds对象后,就不需要在SQL语句中写limit语句了,但是仍然要对满足条件的数据条数进行单独查询,其结果将用于计算总页数,语句同上。
  • 测试代码及返回结果
1
2
3
4
5
6
7
8
9
10
java复制代码    int pageSize = 2;
int pageNum = 2;

@Test
public void pageQueryByRowBounds() {
PurchaseMapper mapper = sqlSession.getMapper(PurchaseMapper.class);
QueryCondition condition = new QueryCondition();
System.out.println(new PageVO<>(pageSize, pageNum, mapper.count(condition),
mapper.findPageByRowBounds(condition, new RowBounds((pageNum - 1) * pageSize, pageSize))));
}
1
2
3
4
5
6
7
txt复制代码DEBUG [main] - ==>  Preparing: select count(1) from ( select id, `name`, price, category from purchase ) t1 
DEBUG [main] - ==> Parameters:
DEBUG [main] - <== Total: 1
DEBUG [main] - Cache Hit Ratio [org.apache.ibatis.z_run.mapper.PurchaseMapper]: 0.0
DEBUG [main] - ==> Preparing: select id, `name`, price, category from purchase
DEBUG [main] - ==> Parameters:
PageVO{pageSize=2, pageNum=2, pages=4, total=7, data=[Purchase{id=8, name='火腿', price=3, category=1}, Purchase{id=9, name='火腿', price=3, category=1}]}
  • 可以看到,使用RowBounds进行分页时,SQL语句中并没有添加limit关键字进行分页,这是因为RowBounds分页是将所有的数据查询到内存中以后,再使用RowBounds参数进行分页的,对内存的压力很大,性能很低,因此这种方式不建议使用。

分页插件

  • 这里主要介绍目前使用最广泛的Pagehelper插件。Pagehelper插件的原理是使用拦截器拦截SQL语句的执行,并为SQL语句添加limit关键字进行分页查询,以及count语句来查询总数(就不需要我们自己手写count方法来计算数据总条数了)。
  • 导入依赖
1
2
3
4
5
6
xml复制代码<!-- https://mvnrepository.com/artifact/com.github.pagehelper/pagehelper -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper</artifactId>
<version>5.2.0</version>
</dependency>
  • 配置文件
1
2
3
xml复制代码<plugins>
<plugin interceptor="com.github.pagehelper.PageInterceptor"></plugin>
</plugins>
  • XML语句同RowBounds相同,不需要在SQL语句中写limit关键字。
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码<select id="findByCondition" parameterType="org.apache.ibatis.z_run.pojo.QueryCondition" resultType="org.apache.ibatis.z_run.pojo.Purchase">
select
<include refid="Base_Column_List" />
from purchase
<where>
<if test="id != null">
And id = #{id,jdbcType=INTEGER}
</if>
<if test="category != null">
And category = #{category,jdbcType=INTEGER}
</if>
</where>
</select>
  • 测试代码及查询结果。查询结果的总条数以及总页数都可以在Page对象中直接获取。
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码int pageSize = 2;
int pageNum = 2;

@Test
public void pageQueryByPageHelper() {
PurchaseMapper mapper = sqlSession.getMapper(PurchaseMapper.class);
QueryCondition condition = new QueryCondition();
// 这一行代码之后需要立即接上需要分页的查询方法,否则可能导致分页失效
Page page = PageHelper.startPage(pageNum, pageSize);
List<Purchase> purchaseList = mapper.findByCondition(condition);
System.out.println(new PageVO<>(pageSize, pageNum, (int) page.getTotal(), purchaseList));
}
1
2
3
4
5
6
7
8
txt复制代码DEBUG [main] - ==>  Preparing: SELECT count(0) FROM purchase 
DEBUG [main] - ==> Parameters:
DEBUG [main] - <== Total: 1
DEBUG [main] - Cache Hit Ratio [org.apache.ibatis.z_run.mapper.PurchaseMapper]: 0.0
DEBUG [main] - ==> Preparing: select id, `name`, price, category from purchase LIMIT ?, ?
DEBUG [main] - ==> Parameters: 2(Long), 2(Integer)
DEBUG [main] - <== Total: 2
PageVO{pageSize=2, pageNum=2, pages=4, total=7, data=Page{count=true, pageNum=2, pageSize=2, startRow=2, endRow=4, total=7, pages=4, reasonable=false, pageSizeZero=false}[Purchase{id=8, name='火腿', price=3, category=1}, Purchase{id=9, name='火腿', price=3, category=1}]}

Tips

  • Mybatis配置文件的标签顺序是有讲究的,如果顺序出问题,是会报错的。例如:
1
2
3
4
5
6
7
xml复制代码<!--指定Mapper.xml所在位置-->
<mappers>
<mapper resource="resources/xml/PurchaseMapper.xml"/>
</mappers>
<plugins>
<plugin interceptor="com.github.pagehelper.PageInterceptor"></plugin>
</plugins>
  • 此时,配置文件报错,内容如下:

image.png

  • 运行项目报错,内容大体如下:
1
2
3
4
5
6
7
txt复制代码org.apache.ibatis.exceptions.PersistenceException: 
### Error building SqlSession.
### Cause: org.apache.ibatis.builder.BuilderException: Error creating document instance.
.
.
.
Caused by: org.xml.sax.SAXParseException; lineNumber: 101; columnNumber: 17; 元素类型为 "configuration" 的内容必须匹配 "(properties?,settings?,typeAliases?,typeHandlers?,objectFactory?,objectWrapperFactory?,reflectorFactory?,plugins?,environments?,databaseIdProvider?,mappers?)"。

以上便是对于Mybatis中分页方式的介绍,一般来说,使用Pagehelper更为方便,但是引入第三方插件之后可能会有一些bug,需要在遇到之后进行仔细的排查。而自己手写分页不容易出现问题,但是比较麻烦。具体情况具体分析,根据自身情况挑选适合自己的方式:)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…563564565…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%