开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

用 Go map 要注意这个细节,避免依赖他!

发表于 2021-09-12

大家好,我是煎鱼。

最近又有同学问我这个日经话题,想转他文章时,结果发现我的公众号竟然没有发过,因此今天我再唠叨两句,好让大家避开这个 “坑”。

有的小伙伴没留意过 Go map 输出、遍历顺序,以为它是稳定的有序的,会在业务程序中直接依赖这个结果集顺序,结果栽了个大跟头,吃了线上 BUG。

有的小伙伴知道是无序的,但却不知道为什么,有的却理解错误?

奇怪的输出结果

今天通过本文,我们将揭开 for range map 输出的 “神秘” 面纱,看看它内部实现到底是怎么样的,顺序到底是怎么样?

开始吸鱼之路。

前言

例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
go复制代码func main() {
m := make(map[int32]string)
m[0] = "EDDYCJY1"
m[1] = "EDDYCJY2"
m[2] = "EDDYCJY3"
m[3] = "EDDYCJY4"
m[4] = "EDDYCJY5"

for k, v := range m {
log.Printf("k: %v, v: %v", k, v)
}
}

假设运行这段代码,输出的结果是怎么样?是有序,还是无序输出呢?

1
2
3
4
5
yaml复制代码k: 3, v: EDDYCJY4
k: 4, v: EDDYCJY5
k: 0, v: EDDYCJY1
k: 1, v: EDDYCJY2
k: 2, v: EDDYCJY3

从输出结果上来讲,是非固定顺序输出的,也就是每次都不一样。但这是为什么呢?

首先建议你先自己想想原因。其次我在面试时听过一些说法。有人说因为是哈希的所以就是无(乱)序等等说法。当时我是有点 ???

这也是这篇文章出现的原因,希望大家可以一起研讨一下,理清这个问题 :)

看一下汇编

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scss复制代码    ...
0x009b 00155 (main.go:11) LEAQ type.map[int32]string(SB), AX
0x00a2 00162 (main.go:11) PCDATA $2, $0
0x00a2 00162 (main.go:11) MOVQ AX, (SP)
0x00a6 00166 (main.go:11) PCDATA $2, $2
0x00a6 00166 (main.go:11) LEAQ ""..autotmp_3+24(SP), AX
0x00ab 00171 (main.go:11) PCDATA $2, $0
0x00ab 00171 (main.go:11) MOVQ AX, 8(SP)
0x00b0 00176 (main.go:11) PCDATA $2, $2
0x00b0 00176 (main.go:11) LEAQ ""..autotmp_2+72(SP), AX
0x00b5 00181 (main.go:11) PCDATA $2, $0
0x00b5 00181 (main.go:11) MOVQ AX, 16(SP)
0x00ba 00186 (main.go:11) CALL runtime.mapiterinit(SB)
0x00bf 00191 (main.go:11) JMP 207
0x00c1 00193 (main.go:11) PCDATA $2, $2
0x00c1 00193 (main.go:11) LEAQ ""..autotmp_2+72(SP), AX
0x00c6 00198 (main.go:11) PCDATA $2, $0
0x00c6 00198 (main.go:11) MOVQ AX, (SP)
0x00ca 00202 (main.go:11) CALL runtime.mapiternext(SB)
0x00cf 00207 (main.go:11) CMPQ ""..autotmp_2+72(SP), $0
0x00d5 00213 (main.go:11) JNE 193
...

我们大致看一下整体过程,重点处理 Go map 循环迭代的是两个 runtime 方法,如下:

  • runtime.mapiterinit
  • runtime.mapiternext

但你可能会想,明明用的是 for range 进行循环迭代,怎么出现了这两个函数,怎么回事?

看一下转换后

1
2
3
4
5
6
7
8
go复制代码var hiter map_iteration_struct
for mapiterinit(type, range, &hiter); hiter.key != nil; mapiternext(&hiter) {
index_temp = *hiter.key
value_temp = *hiter.val
index = index_temp
value = value_temp
original body
}

实际上编译器对于 slice 和 map 的循环迭代有不同的实现方式,并不是 for 一扔就完事了,还做了一些附加动作进行处理。而上述代码就是 for range map 在编译器展开后的伪实现

看一下源码

runtime.mapiterinit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码func mapiterinit(t *maptype, h *hmap, it *hiter) {
...
it.t = t
it.h = h
it.B = h.B
it.buckets = h.buckets
if t.bucket.kind&kindNoPointers != 0 {
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}

r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
it.startBucket = r & bucketMask(h.B)
it.offset = uint8(r >> h.B & (bucketCnt - 1))
it.bucket = it.startBucket
...

mapiternext(it)
}

通过对 mapiterinit 方法阅读,可得知其主要用途是在 map 进行遍历迭代时进行初始化动作。共有三个形参,用于读取当前哈希表的类型信息、当前哈希表的存储信息和当前遍历迭代的数据

为什么

咱们关注到源码中 fastrand 的部分,这个方法名,是不是迷之眼熟。没错,它是一个生成随机数的方法。再看看上下文:

1
2
3
4
5
6
7
8
9
10
11
go复制代码...
// decide where to start
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
it.startBucket = r & bucketMask(h.B)
it.offset = uint8(r >> h.B & (bucketCnt - 1))

// iterator state
it.bucket = it.startBucket

在这段代码中,它生成了随机数。用于决定从哪里开始循环迭代。更具体的话就是根据随机数,选择一个桶位置作为起始点进行遍历迭代

因此每次重新 for range map,你见到的结果都是不一样的。那是因为它的起始位置根本就不固定!

runtime.mapiternext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
go复制代码func mapiternext(it *hiter) {
...
for ; i < bucketCnt; i++ {
...
k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.valuesize))
...
if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey || alg.equal(k, k)) {
...
it.key = k
it.value = v
} else {
rk, rv := mapaccessK(t, h, k)
if rk == nil {
continue // key has been deleted
}
it.key = rk
it.value = rv
}
it.bucket = bucket
if it.bptr != b {
it.bptr = b
}
it.i = i + 1
it.checkBucket = checkBucket
return
}
b = b.overflow(t)
i = 0
goto next
}

在上小节中,咱们已经选定了起始桶的位置。接下来就是通过 mapiternext 进行具体的循环遍历动作。该方法主要涉及如下:

  • 从已选定的桶中开始进行遍历,寻找桶中的下一个元素进行处理
  • 如果桶已经遍历完,则对溢出桶 overflow buckets 进行遍历处理

通过对本方法的阅读,可得知其对 buckets 的遍历规则以及对于扩容的一些处理(这不是本文重点。因此没有具体展开)

总结

在本文开始,咱们先提出核心讨论点:“为什么 Go map 遍历输出是不固定顺序?”。

经过这一番分析,原因也很简单明了。就是 for range map 在开始处理循环逻辑的时候,就做了随机播种…

你想问为什么要这么做?

当然是官方有意为之,因为 Go 在早期(1.0)的时候,虽是稳定迭代的,但从结果来讲,其实是无法保证每个 Go 版本迭代遍历规则都是一样的。而这将会导致可移植性问题。

因此,改之。也请不要依赖…

若有任何疑问欢迎评论区反馈和交流,最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

文章持续更新,可以微信搜【脑子进煎鱼了】阅读,回复【000】有我准备的一线大厂面试算法题解和资料。

本文 GitHub github.com/eddycjy/blo… 已收录,欢迎 Star 催更。

参考

  • Go maps in action

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数据库与缓存数据不一致问题 缓存作用? 解决方案 先删除缓存

发表于 2021-09-12

缓存作用?

在客户端请求数据时,如果能在缓存中命中数据,那就查询缓存,不用在去查询数据库,从而减轻数据库的压力,提高服务器的性能。

由于引入了缓存,那么在数据更新时,不仅要更新数据库,而且要更新缓存,这两个更新操作存在前后的问题:

  • 先更新数据库,再更新缓存;
  • 先更新缓存,再更新数据库;

先更新数据库,再更新缓存


请求A和请求B同时更新一条数据,可能出现以下顺序:

image.png
A 请求先将数据库的数据更新为 1,然后在更新缓存前,请求 B 将数据库的数据更新为 2,紧接着也把缓存更新为 2,然后 A 请求更新缓存为 1。

此时,数据库中的数据是 2,而缓存中的数据却是 1,出现了缓存和数据库中的数据不一致的现象。

先更新缓存,再更新数据库

请求A和请求B同时更新一条数据,可能出现以下顺序:

image.png
A 请求先将缓存的数据更新为 1,然后在更新数据库前,B 请求来了, 将缓存的数据更新为 2,紧接着把数据库更新为 2,然后 A 请求将数据库的数据更新为 1。

此时,数据库中的数据是 1,而缓存中的数据却是 2,出现了缓存和数据库中的数据不一致的现象。

所以,无论是「先更新数据库,再更新缓存」,还是「先更新缓存,再更新数据库」,这两个方案都存在并发问题,当两个请求并发更新同一条数据的时候,可能会出现缓存和数据库中的数据不一致的现。

解决方案

不更新缓存,而是删除缓存中的数据。然后,到读取数据时,发现缓存中没了数据之后,再从数据库中读取数据,更新到缓存中。

Cache Aside 策略,中文是叫旁路缓存策略。

该策略又可以细分为「读策略」和「写策略」

image.png
写策略的步骤:

  • 更新数据库中的数据;
  • 删除缓存中的数据。

读策略的步骤:

  • 如果读取的数据命中了缓存,则直接返回数据;
  • 如果读取的数据没有命中缓存,则从数据库中读取数据,然后将数据写入到缓存,并且返回给用户。

【写策略】选择哪种顺序呢?

  • 先删除缓存,再更新数据库;
  • 先更新数据库,再删除缓存

先删除缓存,再更新数据库

假设某个用户的年龄是 20,请求 A 要更新用户年龄为 21,所以它会删除缓存中的内容。这时,另一个请求 B 要读取这个用户的年龄,它查询缓存发现未命中后,会从数据库中读取到年龄为 20,并且写入到缓存中,然后请求 A 继续更改数据库,将用户的年龄更新为 21

image.png

最终,该用户年龄在缓存中是 20(旧值),在数据库中是 21(新值),缓存和数据库的数据不一致。

可以看到,先删除缓存,再更新数据库,在「读 + 写」并发的时候,还是会出现缓存和数据库的数据不一致的问题。

先更新数据库,再删除缓存


假如某个用户数据在缓存中不存在,请求 A 读取数据时从数据库中查询到年龄为 20,在未写入缓存中时另一个请求 B 更新数据。它更新数据库中的年龄为 21,并且清空缓存。这时请求 A 把从数据库中读到的年龄为 20 的数据写入到缓存中。

image.png

最终,该用户年龄在缓存中是 20(旧值),在数据库中是 21(新值),缓存和数据库数据不一致。

从上面的理论上分析,先更新数据库,再删除缓存也是会出现数据不一致性的问题,但是在实际中,这个问题出现的概率并不高。

因为缓存的写入通常要远远快于数据库的写入,所以在实际中很难出现请求 B 已经更新了数据库并且删除了缓存,请求 A 才更新完缓存的情况。

而一旦请求 A 早于请求 B 删除缓存之前更新了缓存,那么接下来的请求就会因为缓存不命中而从数据库中重新读取数据,所以不会出现这种不一致的情况。

所以,「先更新数据库 + 再删除缓存」的方案,是可以保证数据一致性的。

为了确保万无一失,还给缓存数据加上了「过期时间」,就算在这期间存在缓存数据不一致,有过期时间来兜底,这样也能达到最终一致。

还会存在的问题?

「先更新数据库, 再删除缓存」其实是两个操作,前面的所有分析都是建立在这两个操作都能同时执行成功,而这次客户投诉的问题就在于,在删除缓存(第二个操作)的时候失败了,导致缓存中的数据是旧值。

如何保证「先更新数据库 ,再删除缓存」这两个操作能执行成功?
「先更新数据库,再删除缓存」的方案虽然保证了数据库与缓存的数据一致性,但是每次更新数据的时候,缓存的数据都会被删除,这样会对缓存的命中率带来影响。

所以,如果我们的业务对缓存命中率有很高的要求,我们可以采用「更新数据库 + 更新缓存」的方案,因为更新缓存并不会出现缓存未命中的情况。

但是这个方案前面我们也分析过,在两个更新请求并发执行的时候,会出现数据不一致的问题,因为更新数据库和更新缓存这两个操作是独立的,而我们又没有对操作做任何并发控制,那么当两个线程并发更新它们的话,就会因为写入顺序的不同造成数据的不一致。

所以我们得增加一些手段来解决这个问题,这里提供两种做法:

  • 在更新缓存前先加个分布式锁,保证同一时间只运行一个请求更新缓存,就会不会产生并发问题了,当然引入了锁后,对于写入的性能就会带来影响。
  • 在更新完缓存时,给缓存加上较短的过期时间,这样即时出现缓存不一致的情况,缓存的数据也会很快过期,对业务还是能接受的。

针对「先删除缓存,再删除数据库」方案在「读 + 写」并发请求而造成缓存不一致的解决办法是「延迟双删」

延迟双删实现的伪代码如下:

1
2
3
4
5
6
7
8
bash复制代码#删除缓存
redis.delKey(X)
#更新数据库
db.update(X)
#睡眠
Thread.sleep(N)
#再删除缓存
redis.delKey(X)

延迟双删实现的伪代码如下:

1
2
3
4
5
6
7
8
bash复制代码#删除缓存
redis.delKey(X)
#更新数据库
db.update(X)
#睡眠
Thread.sleep(N)
#再删除缓存
redis.delKey(X)

加了个睡眠时间,主要是为了确保请求 A 在睡眠的时候,请求 B 能够在这这一段时间完成「从数据库读取数据,再把缺失的缓存写入缓存」的操作,然后请求 A 睡眠完,再删除缓存。

所以,请求 A 的睡眠时间就需要大于请求 B 「从数据库读取数据 + 写入缓存」的时间。

但是具体睡眠多久其实是个玄学,很难评估出来,所以这个方案也只是尽可能保证一致性而已,极端情况下,依然也会出现缓存不一致的现象。

因此,还是比较建议用「先更新数据库,再删除缓存」的方案。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一篇文章把RabbitMQ、RocketMQ、Kafka三元

发表于 2021-09-12

三大主流MQ的组织结构

1、RabbitMQ

RabbitMQ各组件的功能

image.png

  • Broker: 一个RabbitMQ实例就是一个Broker
  • Virtual Host: 虚拟主机。相当于Mysql的DataBase, 一个Broker上可以存在多个vhost,vhost之间相互隔离。每个vhost都拥有自己的队列、交换机、绑定和权限机制。vhost必须在连接时指定,默认的vhost是 /。
  • Exchange: 交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queue: 消息队列,用来保存消息直到发送给消费者。它是消息的容器。一个消息可投入一个或多个队列。
  • Banding: 绑定关系,用于消息队列和交换机之间的关联。通过路由键(Routing Key)将交换机和消息队列关联起来。
  • Channel: 管道,一条双向数据流通道。不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了管道的概念,以复用一条TCP连接。
  • Connection: 生产者/消费者 与broker之间的TCP连接。
  • Publisher: 消息的生产者。
  • Consumer: 消息的消费者。
  • Message: 消息,它是由消息头和消息体组成。消息头则包括Routing-Key、Priority(优先级)等。

RabbitMQ的多种交换机类型

Exchange分发消息给Queue时,Exchange的类型对应不同的分发策略,有3种类型的Exchange:Direct、Fanout、Topic。

  • Direct: 消息中的Routing Key如果和Binding中的Routing Key完全一致,Exchange就会将消息分发到对应的队列中。

image.png

  • Fanout: 每个发到 Fanout 类型交换机的消息都会分发到所有绑定的队列上去。Fanout交换机没有Routing Key。它在三种类型的交换机中转发消息是最快的。

image.png

  • Topic: Topic交换机通过模式匹配分配消息,将Routing Key和某个模式进行匹配。它只能识别两个通配符:“#”和”*“。#匹配0个或多个单词,*匹配1个单词。

image.png


TTL

TTL(Time To Live):生存时间。RabbitMQ支持消息的过期时间,一共2种。

  • 在消息发送时进行指定。通过配置消息体的Properties,可以指定当前消息的过期时间。
  • 在创建Exchange时指定。从进入消息队列开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。

生产者的消息确认机制

Confirm机制

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接受应答,用来确认这条消息是否正常的发送到了Broker,这种方式也是消息的可靠性投递的核心保障!

如何实现Confirm确认消息?

image.png

  1. 在channel上开启确认模式:channel.confirmSelect()
  2. 在channel上开启监听:addConfirmListener,监听成功和失败的处理结果,根据具体的结果对消息进行重新发送或记录日志处理等后续操作。

Return消息机制

Return Listener用于处理一些不可路由的消息。

我们的消息生产者,通过指定一个Exchange和Routing,把消息送达到某一个队列中去,然后我们的消费者监听队列进行消息的消费处理操作。

但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达消息,就需要使用到Returrn Listener。

基础API中有个关键的配置项Mandatory:如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。

同样,通过监听的方式,chennel.addReturnListener(ReturnListener rl)传入已经重写过handleReturn方法的ReturnListener。


消费端ACK与NACK

消费端进行消费的时候,如果由于业务异常可以进行日志的记录,然后进行补偿。但是对于服务器宕机等严重问题,我们需要手动ACK保障消费端消费成功。

1
2
3
4
js复制代码// deliveryTag:消息在mq中的唯一标识
// multiple:是否批量(和qos设置类似的参数)
// requeue:是否需要重回队列。或者丢弃或者重回队首再次消费。
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)

如上代码,消息在消费端重回队列是为了对没有成功处理消息,把消息重新返回到Broker。一般来说,实际应用中都会关闭重回队列(避免进入死循环),也就是设置为false。


死信队列DLX

死信队列(DLX Dead-Letter-Exchange): 当消息在一个队列中变成死信之后,它会被重新推送到另一个队列,这个队列就是死信队列。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

2、RocketMQ

阿里巴巴双十一官方指定消息产品,支撑阿里巴巴集团所有的消息服务,历经十余年高可用与高可靠的严苛考验,是阿里巴巴交易链路的核心产品;

Rocket:火箭的意思。

image.png

RocketMQ的核心概念

他有以下核心概念:Broker、Topic、Tag、MessageQueue、NameServer、Group、Offset、Producer以及Consumer。

下面来详细介绍。

  • Broker:消息中转角色,负责存储消息,转发消息。
+ - **Broker**是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将**Topic**信息注册到NameServer,顺带一提底层的通信和连接都是**基于Netty实现**的。
+ - **Broker**负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
+ - 官网上有数据显示:具有**上亿级消息堆积能力**,同时可**严格保证消息的有序性**。
  • Topic:主题!它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0个、1个、多个消费者订阅。
  • Tag:标签!可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。
  • MessageQueue:一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。
  • NameServer:类似Kafka中的Zookeeper, 但NameServer集群之间是没有通信的,相对ZK来说更加轻量。 它主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 去 NameServer 获取对应 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
  • Producer: 生产者,支持三种方式发送消息:同步、异步和单向
+ - `单向发送`:消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应,且**没有回调函数**。
+ - `异步发送`:消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应,**有回调函数**。
+ - `同步发送`:消息发出去后,等待服务器响应成功或失败,才能继续后面的操作。
  • Consumer:消费者,支持PUSH和PULL两种消费模式,支持集群消费和广播消费
+ - `集群消费`: 该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
+ - `广播消费`: 会发给消费者组中的每一个消费者进行消费。相当于**RabbitMQ**的发布订阅模式。
  • Group:分组,一个组可以订阅多个Topic。分为 ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的
  • Offset:在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。

延时消息

开源版的RocketMQ不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1min 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。

延时等级如下:

1
ini复制代码messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

事务消息

image.png
消息队列 MQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 MQ 事务消息能达到分布式事务的最终一致。上图说明了事务消息的大致流程:正常事务消息的发送和提交、事务消息的补偿流程。

  • 事务消息发送及提交:
  1. 发送half消息
  2. 服务端响应消息写入结果
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行);
  4. 根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)。
  • 事务消息的补偿流程:
  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”;
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态。
  3. 根据本地事务状态,重新Commit或RollBack
    其中,补偿阶段用于解决消息Commit或Rollback发生超时或者失败的情况。
  • 事务消息状态:

事务消息共有三种状态:提交状态、回滚状态、中间状态:

  1. TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
  2. TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
  3. TransactionStatus.Unkonwn:中间状态,它代表需要检查消息队列来确定消息状态。

RocketMQ的高可用机制

RocketMQ是天生支持分布式的,可以配置主从以及水平扩展

Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。

消息消费的高可用(主从)

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。

消息发送高可用(配置多个主节点)

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。

主从复制

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。

  • 同步复制:同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态。如果Master出故障, Slave上有全部的备份数据,容易恢复同步复制会增大数据写入延迟,降低系统吞吐量。
  • 异步复制:异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失

通常情况下,应该把Master和Save配置成同步刷盘方式,主从之间配置成异步的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。


负载均衡

Producer负载均衡

Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:

image.png

Consumer负载均衡

如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。

  • 消费者的集群模式–启动多个消费者就可以保证消费者的负载均衡(均摊队列)
  • 默认使用的是均摊队列:会按照queue的数量和实例的数量平均分配queue给每个实例,这样每个消费者可以均摊消费的队列,如下图所示6个队列和三个生产者。

image.png

  • 另外一种平均的算法环状轮流分queue的形式,每个消费者,均摊不同主节点的一个消息队列,如下图所示:

image.png

对于广播模式并不是负载均衡的,要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。


死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。死信队列的名称是%DLQ%+ConsumGroup

死信队列具有以下特性:

  1. 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  2. 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  3. 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

3、Kafka

Kafka是一个分布式、支持分区的、多副本的,基于Zookeeper协调的分布式消息系统。

它最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,Web/Nginx日志、访问日志,消息服务等等,用Scala语言编写。属于Apache基金会的顶级开源项目。

先看一下Kafka的架构图
image.png

Kafka的核心概念

在Kafka中有几个核心概念:Broker、Topic、Producer、Consumer、ConsumerGroup、Partition、Leader、Follower、Offset。

  • Broker : 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
  • Topic : Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个 topic
  • Producer : 消息生产者,向Broker发送消息的客户端
  • Consumer : 消息消费者,从Broker读取消息的客户端
  • ConsumerGroup: 每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的ConsumerGroup消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息
  • Partition : 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的
  • Leader : 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是负责数据读写的partition。
  • Follower : Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从ISR列表中删除,重新创建一个Follower。
  • Offset : 偏移量。kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可

可以这么来理解Topic,Partition和Broker:

一个Topic,代表逻辑上的一个业务数据集,比如订单相关操作消息放入订单Topic,用户相关操作消息放入用户Topic,对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在Topic内部划分多个Partition来分片存储数据,不同的Partition可以位于不同的机器上,相当于分布式存储。每台机器上都运行一个Kafka的进程Broker。


Kafka核心总控制器Controller

在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),可以理解为Broker-Leader, 它负责管理整个 集群中所有分区和副本的状态。

  • 当某个Partition-Leader副本出现故障时,由控制器负责为该分区选举新的Leader副本。
  • 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
  • 当为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。

Controller选举机制

在kafka集群启动的时候,选举的过程是集群中每个broker都会 尝试在zookeeper上创建一个 /controller 临时节点,zookeeper会保证有且仅有一个broker能创建成功,这个broker 就会成为集群的总控器controller。

当这个controller角色的broker宕机了,此时zookeeper临时节点会消失,集群里其他broker会一直监听这个临时节 点,发现临时节点消失了,就竞争再次创建临时节点,就是我们上面说的选举机制,zookeeper又会保证有一个broker 成为新的controller。 具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:

  1. 监听broker相关的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker 增减的变化。
  2. 监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减 的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。
  3. 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic 所对应的Zookeeper中的/brokers/topics/节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。
  4. 更新集群的元数据信息,同步到其他普通的broker节点中

Partition副本选举Leader机制

controller感知到分区Leader所在的broker挂了,controller会从 ISR列表(参数unclean.leader.election.enable=false的前提下)里挑第一个broker作为leader(第一个broker最先放进ISR 列表,可能是同步数据最多的副本),如果参数unclean.leader.election.enable为true,代表在ISR列表里所有副本都挂 了的时候可以在ISR列表以外的副本中选leader,这种设置,可以提高可用性,但是选出的新leader有可能数据少很多。 副本进入ISR列表有两个条件:

  1. 副本节点不能产生分区,必须能与zookeeper保持会话以及跟leader副本网络连通
  2. 副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步滞后的副本,是由 replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)

消费者消费消息的offset记录机制

每个consumer会定期将自己消费分区的offset提交给kafka内部topic:consumer_offsets,提交过去的时候,key是 consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的 那条数据

因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过 offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发。


消费者Rebalance机制

rebalance就是说如果消费组里的消费者数量有变化或消费的分区数有变化,kafka会重新分配消费者与消费分区的关系。 比如consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他。

注意:rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进 行rebanlance。

如下情况可能会触发消费者rebalance:

  1. 消费组里的consumer增加或减少了
  2. 动态给topic增加了分区
  3. 消费组订阅了更多的topic

rebalance过程中,消费者无法从kafka消费消息,这对kafka的TPS会有影响,如果kafka集群内节点较多,比如数百 个,那重平衡可能会耗时极多,所以应尽量避免在系统高峰期的重平衡发生。

Rebalance过程如下

当有消费者加入消费组时,消费者、消费组及组协调器之间会经历以下几个阶段
image.png

第一阶段:选择组协调器

组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控 这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。 consumer group中的每个consumer启动时会向kafka集群中的某个节点发送 FindCoordinatorRequest 请求来查找对 应的组协调器GroupCoordinator,并跟其建立网络连接。 组协调器选择方式: 通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker 就是这个consumer group的coordinator 公式:hash(consumer group id) % 对应主题的分区数

第二阶段:加入消费组JOIN GROUP

在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。然后GroupCoordinator 从一个consumer group中 选择第一个加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个 leader会负责制定分区方案。

第三阶段( SYNC GROUP)

consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各 个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。


消费者Rebalance分区分配策略

主要有三种rebalance的策略:range、round-robin、sticky。默认情况为range分配策略。

假设一个主题有10个分区(0-9),现在有三个consumer消费:

range策略:按照分区序号排序分配,假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消 费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。 比如分区0~ 3给一个consumer,分区4~ 6给一个consumer,分区7~9给一个consumer。

round-robin策略:轮询分配,比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、 8给一个consumer

sticky策略: 初始时分配策略与round-robin类似,但是在rebalance的时候,需要保证如下两个原则。

  1. 分区的分配要尽可能均匀 。
  2. 分区的分配尽可能与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标 。这样可以最大程度维持原来的分区分配的策略。 比如对于第一种range情况的分配,如果第三个consumer挂了,那么重新用sticky策略分配的结果如下: consumer1除了原有的0~ 3,会再分配一个7 consumer2除了原有的4~ 6,会再分配8和9


producer发布消息机制剖析

  1. 写入方式

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘 比 随机写 效率要高,保障 kafka 吞吐率)。
2. 消息路由

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

1. 指定了 patition,则直接使用;
2. 未指定 patition 但指定 key,通过`hash(key)%分区数`算出路由的patition, 如果patition 和 key 都未指定,使用轮询选出一个patition。
  1. 写入流程

image.png

  1. producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
  2. producer 将消息发送给该 leader
  3. leader 将消息写入本地 log
  4. followers 从 leader pull 消息,写入本地 log 后 向leader 发送 ACK
  5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

HW与LEO

HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW, consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状 态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW, 此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broker的读取请求,没有HW的限制。


日志分段存储

Kafka 一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段存储的, 每个段的消息都存储在不一样的log文件里,kafka规定了一个段位的 log 文 件最大为 1G,做这个限制目的是为了方便把 log 文件加载到内存去操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
js复制代码1 # 部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件, 
2 # 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找具体消息
3 00000000000000000000.index
4 # 消息存储文件,主要存offset和消息体
5 00000000000000000000.log
6 # 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件,
7 # 如果需要按照时间来定位消息的offset,会先在这个文件里查找
8 00000000000000000000.timeindex
9
10 00000000000005367851.index
11 00000000000005367851.log
12 00000000000005367851.timeindex
13
14 00000000000009936472.index
15 00000000000009936472.log
16 00000000000009936472.timeindex

这个 9936472 之类的数字,就是代表了这个日志段文件里包含的起始 Offset,也就说明这个分区里至少都写入了接近 1000 万条数据了。 Kafka Broker 有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是 1GB。 一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做 log rolling,正在被写入的那个日志段文件,叫做 active log segment。


最后附一张zookeeper节点数据图

image.png

MQ带来的一些问题、及解决方案

  1. 如何保证顺序消费?

  1. RabbitMQ : 一个Queue对应一个Consumer即可解决。
  2. RocketMQ
    • 全局有序:Topic里面只有一个MessageQueue即可。
    • 局部有序: 根据路由算法,比如hash(key)%队列数得到路由索引,使得需要保证有序的消息都路由到同一个MessageQueue。
  3. Kafka:
    • 全局有序:Topic里面只有一个Partition即可。
    • 局部有序: 根据路由算法,比如hash(key)%分区数得到路由索引,使得需要保证有序的消息都路由到同一个Partition。
  1. 如何实现延迟消费?

  1. RabbitMQ: 两种方案
    • 死信队列 + TTL
    • 引入RabbitMQ的延迟插件
  2. RocketMQ:天生支持延时消息。
  3. Kafka: 步骤如下
    • 专门为要延迟的消息创建一个Topic
    • 新建一个消费者去消费这个Topic
    • 消息持久化
    • 再开一个线程定时去拉取持久化的消息,放入实际要消费的Topic
    • 实际消费的消费者从实际要消费的Topic拉取消息。

image.png

  1. 如何保证消息的可靠性投递

  1. RabbitMQ:
* Broker-->消费者 : 手动ACK
* 生产者-->Broker: 两种方案
* + `1. 数据库持久化`
1
2
3
4
5
6
7
js复制代码1.将业务订单数据和生成的Message进行持久化操作(一般情况下插入数据库,这里如果分库的话可能涉及到分布式事务)
2.将Message发送到Broker服务器中
3.通过RabbitMQ的Confirm机制,在producer端,监听服务器是否ACK。
4.如果ACK了,就将Message这条数据状态更新为已发送。如果失败,修改为失败状态。
5.分布式定时任务查询数据库3分钟(这个具体时间应该根据的时效性来定)之前的发送失败的消息
6.重新发送消息,记录发送次数
7.如果发送次数过多仍然失败,那么就需要人工排查之类的操作。

image.png

优点:能够保证消息百分百不丢失。

缺点:第一步会涉及到分布式事务问题。

* + `2. 消息的延迟投递`
1
2
3
4
5
6
js复制代码流程图中,颜色不同的代表不同的message
1.将业务订单持久化
2.发送一条Message到broker(称之为主Message),再发送相同的一条到不同的队列或者交换机(这条称为确认Message)中。
3.主Message由实际业务处理端消费后,生成一条响应Message。之前的确认Message由Message Service应用处理入库。
4~6.实际业务处理端发送的确认Message由Message Service接收后,将原Message状态修改。
7.如果该条Message没有被确认,则通过rpc调用重新由producer进行全过程。

image.png

优点:相对于持久化方案来说响应速度有所提升

缺点:系统复杂性有点高,万一两条消息都失败了,消息存在丢失情况,仍需Confirm机制做补偿。

  1. RocketMQ:
  • 生产者弄丢数据

Producer在把Message发送Broker的过程中,因为网络问题等发生丢失,或者Message到了Broker,但是出了问题,没有保存下来。针对这个问题,RocketMQ对Producer发送消息设置了3种方式:

1. `同步发送`:天生保证了可靠性投递
2. `异步发送`:需要在回调函数中,根据broker响应的结果自定义实现。
3. `单向发送`:保证不了可靠性投递
  • Broker弄丢数据
      Broker接收到Message暂存到内存,Consumer还没来得及消费,Broker挂掉了

  可以通过持久化设置去解决:

  1. 创建Queue的时候设置持久化,保证Broker持久化Queue的元数据,但是不会持久化Queue里面的消息

  2. 将Message的deliveryMode设置为2,可以将消息持久化到磁盘,这样只有Message支持化到磁盘之后才会发送通知Producer ack

  这两步过后,即使Broker挂了,Producer肯定收不到ack的,就可以进行重发

  • 消费者弄丢数据
      Consumer有消费到Message,但是内部出现问题,Message还没处理,Broker以为Consumer处理完了,只会把后续的消息发送。这时候,就要关闭autoack,消息处理过后,进行手动ack, 多次消费失败的消息,会进入死信队列,这时候需要人工干预。
  1. Kafka:
  • 生产者弄丢数据

设置了 acks=all,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

  • Broker弄丢数据

Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。

此时一般是要求起码设置如下 4 个参数:

    • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
    • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
    • 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。
    • 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

  • 消费者弄丢数据
    你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。 但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

  1. 如何保证消息的幂等?

以 RocketMQ 为例,下面列出了消息重复的场景:

1.发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

2.投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

3.负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及消费者应用重启)

当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。

那么,有什么解决方案呢? 直接上图。

image.png

  1. 如何解决消息积压的问题?

关于这个问题,有几个点需要考虑:

1. 如何快速让积压的消息被消费掉?

临时写一个消息分发的消费者,把积压队列里的消息均匀分发到N个队列中,同时一个队列对应一个消费者,相当于消费速度提高了N倍。

修改前:

image.png

修改后:

image.png

2. 积压时间太久,导致部分消息过期,怎么处理?

批量重导。在业务不繁忙的时候,比如凌晨,提前准备好程序,把丢失的那批消息查出来,重新导入到MQ中。

3. 消息大量积压,MQ磁盘被写满了,导致新消息进不来了,丢掉了大量消息,怎么处理?

这个没办法。谁让【消息分发的消费者】写的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Git常用命令大全

发表于 2021-09-12

Git下载

Git是一个免费的开源分布式版本控制系统,旨在快速高效地处理从小型到大型项目的所有内容。Git易于学习, 占地面积小,具有闪电般的快速性能。它具有Subversion,CVS,Perforce和ClearCase之类的SCM工具,具有廉价的本地分支,方便的暂存区域和多个工作流等功能。

配置

1
2
arduino复制代码 git config --global "Your Name"
 git config --global "Email Address"

初始化

1
csharp复制代码 git init

提交修改

1
2
3
4
5
sql复制代码 git add <file>
 git add -u 提交work directory中所有已track的文件至staging area
 git commit -m "descriptions"
 git commit --amend 对最近一次的提交做内容修改
 git commit --amend --author "user_name <user_email>" 修改最近提交用户名和邮箱

查看状态、比对

1
2
3
4
5
6
css复制代码 git status
 git status -s 文件状态缩略信息, 常见 A:新增; M:文件变更; ?:未track; D:删除
 git diff <file>
 git diff HEAD -- <file> 查看工作区和版本库里面最新版本的区别
 git diff --check <file>     检查是否有空白错误(regex:' {1,}$')
 git diff --cached <file>   查看已add的内容(绿M)

查看历史版本、历史操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
perl复制代码 git log
 git reflog
 git log -n                 最近n条的提交历史
 git log <branch_name> -n   分支branch_name最近n条的提交历史
 git log --stat             历次commit的文件变化
 git log --shortstat         对比--stat只显示最后的总文件和行数变化统计(n file changed, n insertions(+), n deletion(-))
 git log --name-status       显示新增、修改、删除的文件清单
 git log lhs_hash..rhs_hash 对比两次commit的变化(增删的主语为lhs, 如git log HEAD~2..HEAD == git log HEAD -3)
 git log -p                 历次commit的内容增删
 git log -p -W               历次commit的内容增删, 同时显示变更内容的上下文
 git log origin/EI-1024 -1 --stat -p -W 查看远端分支EI-1024前一次修改的详细内容
 git log origin/master..dev --stat -p -W 查看本地dev分支比远端master分支变化(修改)的详细内容
 ​
 git log <branch_name> --oneline   对提交历史单行排列
 git log <branch_name> --graph     对提交历史图形化排列
 git log <branch_name> --decorate 对提交历史关联相关引用, 如tag, 本地远程分支等
 git log <branch_name> --oneline --graph --decorate 拼接一下, 树形化显示历史
 git log --graph --pretty=format:'%Cred%h%Creset -%C(yellow)%d%Creset %s %Cgreen%ai(%cr) %C(bold blue)<%an>%Creset' --abbrev-commit 同上, 建议alais保存
 ​
 git log --pretty=format 常用的选项(摘自progit_v2.1.9)
 %H 提交对象(commit)的完整哈希字串
 %h 提交对象的简短哈希字串
 %T 树对象(tree)的完整哈希字串
 %t 树对象的简短哈希字串
 %P 父对象(parent)的完整哈希字串
 %p 父对象的简短哈希字串
 %an 作者(author)的名字
 %ae 作者的电子邮件地址
 %ad 作者修订日期(可以用 --date= 选项定制格式)
 %ar 作者修订日期,按多久以前的方式显示
 %cn 提交者(committer)的名字
 %ce 提交者的电子邮件地址
 %cd 提交日期
 %cr 提交日期,按多久以前的方式显示
 %s 提交说明
 ​
 git log --since --after     显示时间之后的提交
 git log --until --before   显示时间之前的提交
 git --author               显示指定作者的提交
 git --committer             显示指定committer的提交(注:committer不一定是author)
 git log -S [keyword]       仅显示添加或移除了某个关键字的提交(某些场景比单独git log -p | grep [keyword] 好用很多)
 git log origin/b3.3/master --author=yx-ren --since="2019-10-01" --before="2019-11-01" 查看某作者在某发布版本最近一个月的提交, 常见于线上背锅
 git log origin/b3.0/master --author=some_leave --since="1 month ago" 查看某刚离职同事过去一个月的提交, 常见于背锅
 git log --since=1.weeks     过去一周的提交(写周报的时候可以看看我这一周干了啥)
 git log --since=1.days     过去一天的提交(下班的时候可以看看我这一天干了啥)
 git log --since="1 weeks 2 days 3 hours 40 minutes 50 seconds ago" 过去1周2天3小时40分50秒之内的提交

版本回退、前进

1
2
3
css复制代码 git reset --hard HEAD^      回退到上1版本
 git reset --hard HEAD~5 回退到上5个版本
 git reset --hard id 回退到指定版本

撤销修改

1
2
3
4
sql复制代码 git checkout -- <file>      撤销修改:误修改工作区文件,未git add/commit
 git restore <file> 撤销修改:误修改工作区文件,未git add/commit
 git reset HEAD <file> 撤销git add:误将文件加入暂存区(git add),未git commit
 git reset --hard HEAD^ 撤销git commit:误将文件提交(一旦提交,只能通过版本回退进行撤销)

删除与恢复

1
2
3
xml复制代码 git rm/add <file>
 git commit -m "remove <file>" 删除版本库中的<file>:删除工作区文件后,继续删除版本库中相应的文件
 git checkout -- <file> 根据版本库中的<file>恢复工作区<file>

清理工作区

未track也未ignore的文件或文件夹(如各种临时.swp, .patch文件等)

1
2
3
4
5
bash复制代码git clean -i    #交互式清理, 不常用
git clean -n #查看清理文件列表(不包括文件夹), 不执行实际清理动作
git clean -n -d #查看清理文件列表(包括文件夹), 不执行实际清理动作
git clean -f #清理所有未track文件
git clean -df #清理所有未track文件和文件夹, 常用, 但使用前确保新增加的文件或文件夹已add, 否则新创建的文件或者文件夹也会被强制删除

关联GitHub远程仓库

本地到远程

1
2
3
4
5
6
7
xml复制代码git remote add origin <remote address>	在本地工作区目录下按照 GitHub 提示进行关联
git remote rm origin 解除错误关联
git push -u origin master 第一次将本地仓库推送至远程仓库(每次在本地提交后进行操作)
git push origin master 以后每次将本地仓库推送至远程仓库(每次在本地提交后进行操作)
<remote address>:
git@github.com:<username>/<repository>.git
https://github.com/<username>/<repository>.git

克隆GitHub远程仓库

远程到本地

1
bash复制代码git clone <remote address>	git协议速度更快但通常公司内网不允许,https协议速度慢

分支管理

创建、切换、查看、合并、删除

1
2
3
4
5
6
7
8
xml复制代码git branch <branch name>	创建<branch name>分支
git checkout <branch name> 切换至<branch name>分支
git switch <branch name> 切换至<branch name>分支
git checkout -b <branch name> 创建并切换至<branch name>分支
git switch -c <branch name> 创建并切换至<branch name>分支
git branch 查看已有分支(* 表示当前分支)
git merge <branch name> 合并<branch name>到当前分支(通常在master分支下操作)
git branch -d <branch name> 删除分支

解决合并冲突

1
2
3
css复制代码合并时报错“分支发生冲突”,首先vim相应文件,修改冲突位置,然后按照git add/commit重新提交,最后删除多余分支即可。
git log --graph --pretty=oneline --abbrev-commit
git log --graph

分支管理

合并后删除分支也在 log 中保留分支记录

1
perl复制代码git merge --no-ff -m "descriptions" <branch name>

开发流程

1
2
3
xml复制代码master分支		发布稳定版本
dev分支 发布开发版本
<developer name>分支 个人开发分支(个人开发完成将该分支并入dev,同时保留该分支,继续开发)

References

1
ruby复制代码https://git-scm.com/book/en/v2

\

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

TCP可靠传输原理

发表于 2021-09-12

停止等待协议

“停止等待”就是发送方在发送完一个分组后停止发送,等待接收方的确认后再继续发送。

超时重传

发送方在等待一定时间后如果还没有收到接收方的确认,此时发送方将认定分组没有送达,从而重新发送分组。

TCP通过以下的方式实现超时重传:

  • 超时计时器:每发送完一个分组后,tcp都会设置一个超时计时器。超时计时器的超时时间往往要大于报文的平均往返时间。
  • 分组副本:发送分组后,tcp会保留分组的副本,只有收到分组的确认后才会清除
  • 分组编号:TCP会对每一个分组编号,确认分组和发送的分组编号对应。

连续ARQ协议

如果TCP每发送一个分组就要等待的话,势必会浪费大量的时间,使得网络利用率降低。所以TCP采用了连续ARQ的方式,也就是每次发送多个分组,然后使用累积确认的方式确认。

  • 累积确认:接收方只对按序到达的最后一个分组发送确认。

假如发送方一次性发送了[1,2,3,4,5]五个分组,接收方只接受到了[1,2,4,5]四个分组。按照按序最后一个的规则,接收方只会发送 2 号分组的确认。发送方将收不到后面三个分组的确认,所以会重传3,4,5。

累积确认使用了滑动窗口实现,它的优点是不需要对每个分组发送确认,从而减少了网络开销。缺点是不能向发送方真实反映收到分组的信息,比如上面例子里,发送方认为 3 号分组以后的都没有收到,但是接收方其实是收到了4,5号分组的。

以字节为单位的滑动窗口

IMG_0470(20210912-144940).PNG

发送方和接收方分别维护 发送窗口 和 接收窗口 两个滑动窗口。

发送窗口维护了三个指针p1、p2、p3,它们划分了发送窗口的区域:

  • [p1, p2):等待确认区域,记录已发送但没收到确认的分组。
  • [p2, p3] :可用窗口,允许发送但是还未发送的分组。

既然是滑动窗口,那么它的左右边界应该是能够移动的,下面来分析发送窗口的左右边界的移动。

p2前移

p2指针指向的是第一个允许发送但还未发送的分组,所以p2的前移是发送方发送了新的分组。

p1前移

p1指针只有在收到确认后才会移动到被确认分组的下一个分组。

因为采用的累积确认的方式,接收方只会发送按序到达最后一个分组的确认,所以p1的前移可能不止一个分组。

比如向上图的情况,假如接收方收到了31,32,33三个分组,它只会发送按序到达最后分组的确认,也就是33号分组的确认。此时发送方的p1指针将会直接从31号位置移动到34号位置,也就是收到确认分组的下一个。

p3前移

p3的前移是收到接收方发送的确认报文的窗口字段控制的。

窗口值表示从确认分组号开始到p3的数据量(字节)。比如确认分组号为101,窗口值为200,那么发送窗口就还能将窗口扩大200字节。

接收方通过窗口值来控制发送窗口的大小也叫做流量控制,这里不过多介绍。

TCP缓存

TCP既然能够保留未确认的分组以及按序发送确认,它肯定需要一个内存空间作为缓存,而不是直接用应用进程的内存。

IMG_0471(20210912-152048).PNG

如图所示,接收方和发送方各自维护了一个缓存,发送窗口和接收窗口都在这个缓存中。首先TCP缓存有以下特点:

  • 因为缓存空间和序号有限,TCP缓存是循环使用的,是一个环形的结构。
  • 滑动窗口只是缓存的一部分,已经确认的数据会被删除。

发送缓存和接收缓存结构相同但是作用不同。

发送缓存

  • 缓存应用程序让TCP发送的数据
  • 暂存已经发送但未收到确认的数据

接收缓存

  • 暂存未按序到达的数据
  • 缓存按序到达但没有被应用程序读取的数据

总结

  1. TCP可靠传输的原理是停止等待协议和连续ARQ
  2. 超时重传时间大于分组平均往返时间
  3. 连续ARQ采用了累积确认的方式发送确认
  4. TCP通过发送窗口和接收窗口实现可靠传输
  5. 发送窗口大小受到接收方的窗口值控制,窗口值单位为字节
  6. 滑动窗口是TCP缓存的一部分,TCP缓存是一个环形结构,还负责缓存应用程序数据

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring 官宣,Spring Framework 6 和

发表于 2021-09-12

今日推荐:Github 标星 100k!2021 最新Java 学习线路图是怎样的?

SpringOne 大会上宣布了一件重大的事情:Spring Framework 6 和 Spring Boot 3 计划在 2022 年第四季度能够达到生产可用的发布标准。

Spring 6.0 的完整发布路线图如下:

简单解释一下不同版本的区别:

  • M1 M2 M3 M4 中 M 是 Milestone 里程碑的意思。代表功能可能还不完整,可能存在一些问题。
  • RC1 RC2 RC3 中的 RC 是 Release Candidate 的缩写,翻译过来的意思就是发布候选。代表功能完整且相对稳定,主要进行问题解决。
  • GA 是 General Availability 的缩写,翻译过来的一般可用,代表稳定可用于生产的版本。

Spring Framework 6 可以说是 Spring 下一步重点规划的一个项目,标志着 Spring 进入新的时代,很大程度上关系了 Spring 能够沿袭过去的辉煌。

在框架设计上,Spring Framework 6 相对来说比较激进。 Spring Framework 6 和 Spring Boot 3 在运行时至少需要 JDK 17,以及至少 Tomcat 10 / Jetty 11(为了与 Jakarta EE 9 兼容)。

Jakarta EE : Java EE,Java 平台企业版(Java Platform Enterprise Edition),之前称为 Java 2 Platform, Enterprise Edition (J2EE),2018 年 3 月更名为 Jakarta EE

这个也是在 Spring 官方在深思熟虑之后商定的结果。

因为,JDK 17 在 2022 年第四季度发表之前将取代 JDK 11 作为下一个长期支持的 JDK 版本。同时,这也是为 JDK 18 和 JDK 19 ,Jakarta EE 10 的到来做准备。

Spring 官方认为 JDK 11 仅仅是一个过渡使用的 JDK 版本,而 JDK 17 几乎是一个全新的编程语言,增强和完善了 API 和 JVM,这让升级 JDK 17 成为更具吸引力的选择。

原话是这样说的:“in comparison, JDK 11 is a transitional release. Also, JDK 17 provides an accumulated set of recent language, API and JVM enhancements, making it a more compelling upgrade.”

Spring Framework 5.3.x 和 Spring Boot 2.x 暂时仍在积极开发中,Spring Boot 2.6 和 Spring Boot 2.7 仍然会基于 Spring Framework 5.3.x。

预计会在今年 11 月推出 Spring Boot 2.6,明年 5 月份推出 Spring Boot 2.7。

SpringOne 大会上还详细介绍了 Spring Native 的相关情况以及最新进展。

Spring Native 是什么呢?官方是这样介绍的:

“Spring Native provides beta support for compiling Spring Boot applications to native executables with GraalVM, providing a new way to deploy Spring Boot applications that then run extremely efficiently.”

简单来说,这就是一种使用GraalVM将 Spring 应用编译成原生镜像的技术,你可以将其看作是部署 Spring Boot 应用程序的新方法,更加高效快速!

根据官方介绍,Spring Native 的构建时间通常要长一些,不过在容器镜像大小、内存占用、启动时间上优势非常大!

强烈建议小伙伴们找到对应的 PPT(我已整理,文末领取即可) 和视频看一看,讲解的非常清楚。

为了应对云原生时代其他编程语言的挑战,Spring 表示自己正在竭尽全力打造一个强大的 Java 云原生生态系统。拭目以待吧!

相关资料:

  • From Spring Framework 5.3 to 6.0
  • A Java 17 and Jakarta EE 9 baseline for Spring Framework 6

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官:生成订单 30 分钟未支付,则自动取消,该怎么实现?

发表于 2021-09-12

在开发中,往往会遇到一些关于延时任务的需求。

例如

  • 生成订单30分钟未支付,则自动取消
  • 生成订单60秒后,给用户发短信

对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别

定时任务有明确的触发时间,延时任务没有

定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期

定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

下面,我们以判断订单是否超时为例,进行方案分析

方案分析

1、数据库轮询

思路

该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作

实现

博主当年早期是用quartz来实现的(实习那会的事),简单介绍一下

maven项目引入一个依赖如下所示

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.2</version>
</dependency>

调用Demo类MyJob如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
java复制代码package com.rjzheng.delay1;

import org.quartz.JobBuilder;

import org.quartz.JobDetail;

import org.quartz.Scheduler;

import org.quartz.SchedulerException;

import org.quartz.SchedulerFactory;

import org.quartz.SimpleScheduleBuilder;

import org.quartz.Trigger;

import org.quartz.TriggerBuilder;

import org.quartz.impl.StdSchedulerFactory;

import org.quartz.Job;

import org.quartz.JobExecutionContext;

import org.quartz.JobExecutionException;

public class MyJob implements Job {

public void execute(JobExecutionContext context)

throws JobExecutionException {

System.out.println("要去数据库扫描啦。。。");

}

public static void main(String[] args) throws Exception {

// 创建任务

JobDetail jobDetail = JobBuilder.newJob(MyJob.class)

.withIdentity("job1", "group1").build();

// 创建触发器 每3秒钟执行一次

Trigger trigger = TriggerBuilder

.newTrigger()

.withIdentity("trigger1", "group3")

.withSchedule(

SimpleScheduleBuilder.simpleSchedule()

.withIntervalInSeconds(3).repeatForever())

.build();

Scheduler scheduler = new StdSchedulerFactory().getScheduler();

// 将任务及其触发器放入调度器

scheduler.scheduleJob(jobDetail, trigger);

// 调度器开始调度任务

scheduler.start();

}

}

运行代码,可发现每隔3秒,输出如下

要去数据库扫描啦。。。

优缺点

优点:简单易行,支持集群操作

缺点:(1)对服务器内存消耗大

(2)存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟

(3)假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大

2、JDK的延迟队列

思路

该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。

DelayedQueue实现工作流程如下图所示

其中Poll():获取并移除队列的超时元素,没有则返回空

take():获取并移除队列的超时元素,如果没有则wait当前线程,直到有元素满足超时条件,返回结果。

实现

定义一个类OrderDelay实现Delayed,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码package com.rjzheng.delay2;

import java.util.concurrent.Delayed;

import java.util.concurrent.TimeUnit;

public class OrderDelay implements Delayed {

private String orderId;

private long timeout;

OrderDelay(String orderId, long timeout) {

this.orderId = orderId;

this.timeout = timeout + System.nanoTime();

}

public int compareTo(Delayed other) {

if (other == this)

return 0;

OrderDelay t = (OrderDelay) other;

long d = (getDelay(TimeUnit.NANOSECONDS) - t

.getDelay(TimeUnit.NANOSECONDS));

return (d == 0) ? 0 : ((d < 0) ? -1 : 1);

}

// 返回距离你自定义的超时时间还有多少

public long getDelay(TimeUnit unit) {

return unit.convert(timeout - System.nanoTime(),TimeUnit.NANOSECONDS);

}

void print() {

System.out.println(orderId+"编号的订单要删除啦。。。。");

}

}

运行的测试Demo为,我们设定延迟时间为3秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
java复制代码package com.rjzheng.delay2;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.DelayQueue;

import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

public static void main(String[] args) {

// TODO Auto-generated method stub

List<String> list = new ArrayList<String>();

list.add("00000001");

list.add("00000002");

list.add("00000003");

list.add("00000004");

list.add("00000005");

DelayQueue<OrderDelay> queue = newDelayQueue<OrderDelay>();

long start = System.currentTimeMillis();

for(int i = 0;i<5;i++){

//延迟三秒取出

queue.put(new OrderDelay(list.get(i),

TimeUnit.NANOSECONDS.convert(3,TimeUnit.SECONDS)));

try {

queue.take().print();

System.out.println("After " +

(System.currentTimeMillis()-start) + " MilliSeconds");

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
yaml复制代码00000001编号的订单要删除啦。。。。

After 3003 MilliSeconds

00000002编号的订单要删除啦。。。。

After 6006 MilliSeconds

00000003编号的订单要删除啦。。。。

After 9006 MilliSeconds

00000004编号的订单要删除啦。。。。

After 12008 MilliSeconds

00000005编号的订单要删除啦。。。。

After 15009 MilliSeconds

可以看到都是延迟3秒,订单被删除

优缺点

优点:效率高,任务触发时间延迟低。

缺点:

(1)服务器重启后,数据全部消失,怕宕机 (2)集群扩展相当麻烦 (3)因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常 (4)代码复杂度较高

3、时间轮算法

思路

先上一张时间轮的图(这图到处都是啦)

时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)

实现

我们用Netty的HashedWheelTimer来实现

给Pom加上下面的依赖

1
2
3
4
5
6
7
8
9
xml复制代码<dependency>

<groupId>io.netty</groupId>

<artifactId>netty-all</artifactId>

<version>4.1.24.Final</version>

</dependency>

测试代码HashedWheelTimerTest如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
java复制代码package com.rjzheng.delay3;

import io.netty.util.HashedWheelTimer;

import io.netty.util.Timeout;

import io.netty.util.Timer;

import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

public class HashedWheelTimerTest {

static class MyTimerTask implements TimerTask{

boolean flag;

public MyTimerTask(boolean flag){

this.flag = flag;

}

public void run(Timeout timeout) throws Exception {

// TODO Auto-generated method stub

System.out.println("要去数据库删除订单了。。。。");

this.flag =false;

}

}

public static void main(String[] argv) {

MyTimerTask timerTask = new MyTimerTask(true);

Timer timer = new HashedWheelTimer();

timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);

int i = 1;

while(timerTask.flag){

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

System.out.println(i+"秒过去了");

i++;

}

}

}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码1秒过去了

2秒过去了

3秒过去了

4秒过去了

5秒过去了

要去数据库删除订单了。。。。

6秒过去了

优缺点

优点:效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。

缺点:

(1)服务器重启后,数据全部消失,怕宕机

(2)集群扩展相当麻烦

(3)因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常

4、redis缓存

  • 思路一

利用redis的zset,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值

添加元素:ZADD key score member [[score member] [score member] …]

按顺序查询元素:ZRANGE key start stop [WITHSCORES]

查询元素score:ZSCORE key member

移除元素:ZREM key member [member …]

测试如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
bash复制代码添加单个元素

redis> ZADD page_rank 10 google.com

(integer) 1

添加多个元素

redis> ZADD page_rank 9 baidu.com 8 bing.com

(integer) 2

redis> ZRANGE page_rank 0 -1 WITHSCORES

1) "bing.com"

2) "8"

3) "baidu.com"

4) "9"

5) "google.com"

6) "10"

查询元素的score值

redis> ZSCORE page_rank bing.com

"8"

移除单个元素

redis> ZREM page_rank google.com

(integer) 1

redis> ZRANGE page_rank 0 -1 WITHSCORES

1) "bing.com"

2) "8"

3) "baidu.com"

4) "9"

那么如何实现呢?我们将订单超时时间戳与订单号分别设置为score和member,系统扫描第一个元素判断是否超时,具体如下图所示

实现一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
java复制代码package com.rjzheng.delay4;

import java.util.Calendar;

import java.util.Set;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.Tuple;

public class AppTest {

private static final String ADDR = "127.0.0.1";

private static final int PORT = 6379;

private static JedisPool jedisPool = new JedisPool(ADDR, PORT);

public static Jedis getJedis() {

return jedisPool.getResource();

}

//生产者,生成5个订单放进去

public void productionDelayMessage(){

for(int i=0;i<5;i++){

//延迟3秒

Calendar cal1 = Calendar.getInstance();

cal1.add(Calendar.SECOND, 3);

int second3later = (int) (cal1.getTimeInMillis() / 1000);

AppTest.getJedis().zadd("OrderId",second3later,"OID0000001"+i);

System.out.println(System.currentTimeMillis()+"ms:redis生成了一个订单任务:订单ID为"+"OID0000001"+i);

}

}

//消费者,取订单

public void consumerDelayMessage(){

Jedis jedis = AppTest.getJedis();

while(true){

Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);

if(items == null || items.isEmpty()){

System.out.println("当前没有等待的任务");

try {

Thread.sleep(500);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

continue;

}

int score = (int) ((Tuple)items.toArray()[0]).getScore();

Calendar cal = Calendar.getInstance();

int nowSecond = (int) (cal.getTimeInMillis() / 1000);

if(nowSecond >= score){

String orderId = ((Tuple)items.toArray()[0]).getElement();

jedis.zrem("OrderId", orderId);

System.out.println(System.currentTimeMillis() +"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);

}

}

}

public static void main(String[] args) {

AppTest appTest =new AppTest();

appTest.productionDelayMessage();

appTest.consumerDelayMessage();

}

}

此时对应输出如下

可以看到,几乎都是3秒之后,消费订单。

然而,这一版存在一个致命的硬伤,在高并发条件下,多消费者会取到同一个订单号,我们上测试代码ThreadTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码package com.rjzheng.delay4;

import java.util.concurrent.CountDownLatch;

public class ThreadTest {

private static final int threadNum = 10;

private static CountDownLatch cdl = newCountDownLatch(threadNum);

static class DelayMessage implements Runnable{

public void run() {

try {

cdl.await();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

AppTest appTest =new AppTest();

appTest.consumerDelayMessage();

}

}

public static void main(String[] args) {

AppTest appTest =new AppTest();

appTest.productionDelayMessage();

for(int i=0;i<threadNum;i++){

new Thread(new DelayMessage()).start();

cdl.countDown();

}

}

}

输出如下所示

显然,出现了多个线程消费同一个资源的情况。

解决方案

(1)用分布式锁,但是用分布式锁,性能下降了,该方案不细说。

(2)对ZREM的返回值进行判断,只有大于0的时候,才消费数据,于是将consumerDelayMessage()方法里的

1
2
3
4
5
6
7
8
9
scss复制代码if(nowSecond >= score){

String orderId = ((Tuple)items.toArray()[0]).getElement();

jedis.zrem("OrderId", orderId);

System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);

}

修改为

1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码if(nowSecond >= score){

String orderId = ((Tuple)items.toArray()[0]).getElement();

Long num = jedis.zrem("OrderId", orderId);

if( num != null && num>0){

System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);

}

}

在这种修改后,重新运行ThreadTest类,发现输出正常了

  • 思路二

该方案使用redis的Keyspace Notifications,中文翻译就是键空间机制,就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。是需要redis版本2.8以上。

实现二

在redis.conf中,加入一条配置

notify-keyspace-events Ex

运行代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
java复制代码package com.rjzheng.delay5;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPubSub;

public class RedisTest {

private static final String ADDR = "127.0.0.1";

private static final int PORT = 6379;

private static JedisPool jedis = new JedisPool(ADDR, PORT);

private static RedisSub sub = new RedisSub();

public static void init() {

new Thread(new Runnable() {

public void run() {

jedis.getResource().subscribe(sub, "__keyevent@0__:expired");

}

}).start();

}

public static void main(String[] args) throws InterruptedException {

init();

for(int i =0;i<10;i++){

String orderId = "OID000000"+i;

jedis.getResource().setex(orderId, 3, orderId);

System.out.println(System.currentTimeMillis()+"ms:"+orderId+"订单生成");

}

}

static class RedisSub extends JedisPubSub {

<ahref='http://www.jobbole.com/members/wx610506454'>@Override</a>

public void onMessage(String channel, String message) {

System.out.println(System.currentTimeMillis()+"ms:"+message+"订单取消");

}

}

}

输出如下

可以明显看到3秒过后,订单取消了

ps:redis的pub/sub机制存在一个硬伤,官网内容如下

原:Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.

翻: Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。因此,方案二不是太推荐。当然,如果你对可靠性要求不高,可以使用。

优缺点

优点:(1)由于使用Redis作为消息通道,消息都存储在Redis中。如果发送程序或者任务处理程序挂了,重启之后,还有重新处理数据的可能性。(2)做集群扩展相当方便 (3)时间准确度高

缺点:(1)需要额外进行redis维护

5、使用消息队列

我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter

lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能,具体的,我改天再写一篇文章,这里再讲下去,篇幅太长。

优缺点

优点: 高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。

缺点:本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高。

原文链接:blog.csdn.net/hjm4702192/…

版权声明:本文为CSDN博主「hjm4702192」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

近期热文推荐:

1.1,000+ 道 Java面试题及答案整理(2021最新版)

2.别在再满屏的 if/ else 了,试试策略模式,真香!!

3.卧槽!Java 中的 xx ≠ null 是什么新语法?

4.Spring Boot 2.5 重磅发布,黑暗模式太炸了!

5.《Java开发手册(嵩山版)》最新发布,速速下载!

觉得不错,别忘了随手点赞+转发哦!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Mac安装java opencv依赖并在intellij上配

发表于 2021-09-12

近期申请驾照时,想着疫情不出门自己拍照PS了一波,结果市政厅大姐用标尺一量发现p的几张照片要不就是头大了,要不就是衣服和背景颜色有问题,好几张照片竟然没一张能用的,无奈只能重来一次。痛定思痛,决定开一个小项目,来自动将纯色背景的单人照输出成符合标准的id照片。

说搞就搞,因为后端比较熟悉Java就决定用Java。(其实图像处理相关的项目用c++写应该会方便很多)那首先要配置好本地用java+opencv的环境了,主要就是下载安装opencv的java包然后配置到intellij上。以下是具体步骤:

  1. Mac上需先安装命令工具
1
lua复制代码xcode-select --install
  1. 安装Ant,必须安装(之前自己想着用maven跳过了,结果导致最终没有生成需要的java文件夹)
1
复制代码brew install ant
  1. 接下来需要运行brew edit opencv先修改opnecv的默认配置, 将-DBUILD_opencv_java=OFF 改为 -DBUILD_opencv_java=ON,以确保之后生成需要的jar文件。
  2. 安装opencv,如果之前有安装过但是没有jar文件的话,需要运行brew reinstall --build-from-source opencv来重装。
1
csharp复制代码brew install --build-from-source opencv
  1. 安装成功后,在/usr/local/Cellar/opencv/4.5.3_2/share/java/opencv4下会有两个需要的文件:
1
2
复制代码libopencv_java453.dylib
opencv-453.jar

6.在intellij中将上面文件夹路径配置为VM Options

image.png

image.png

7.在需要使用opencv api之前需要先加入下面代码以加载opencv模块

1
ini复制代码System.loadLibrary(Core.NATIVE_LIBRARY_NAME);

常见问题:

  1. 配置完后运行报错[no opencv_java453 in java.library.path],这种情况是jar文件没有找到,需要确认路径配置正确,并且需要的两个文件在相应路径下
  2. 配置完成后遇到报错UnsatisfiedLinkError: ‘long org.opencv.imgcodecs.Imgcodecs.imread_1(java.lang.String)’,之中情况是opencv模块没有加载所以无法正确链接到相应的方法。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

LeetCode【14、最长公共前缀】(简单)

发表于 2021-09-11

题目描述

这是 LeetCode 上的 14. 最长公共前缀,难度为 简单。

关键字: 字符串

编写一个函数来查找字符串数组中的最长公共前缀。

如果不存在公共前缀,返回空字符串 ""。

示例 1:

1
2
ini复制代码输入:strs = ["flower","flow","flight"]
输出:"fl"

示例 2:

1
2
3
ini复制代码输入:strs = ["dog","racecar","car"]
输出:""
解释:输入不存在公共前缀。

提示:

  • 1 <= strs.length <= 200
  • 0 <= strs[i].length <= 200
  • strs[i] 仅由小写英文字母组成

划重点

  1. 不存在返回空字符串

解题思路

方法1:

我们可以遍历字符数组,每次拿到两个字符串获取公共前缀,再用这个公共前缀与下一个字符串进行比较获取新的公共前缀,一直到最后得到的就是所有字符串的最长公共前缀。一旦没有公共前缀可以返回空、不需要再比较后面的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码class Solution {
public String longestCommonPrefix(String[] strs) {
// 特殊情况判断
int len = strs.length;
String first = strs[0];
if (len == 1) {
return first;
}
// 定义最小公共前缀
String ans = first;
for (int i = 1; i < len; i++) {
ans = getPrefix(ans, strs[i]);
if (ans.length() == 0) {
break;
}
}
return ans;
}

public String getPrefix(String s1, String s2) {
int len = Math.min(s1.length(), s2.length());
int index = 0;
while (index < len && s1.charAt(index) == s2.charAt(index)) {
++index;
}
return s1.substring(0, index);
}
}
  • 时间复杂度:O(mn)
  • 空间复杂度:O(1)

方法2:

对于一个字符数组取最长公共前缀,我们可以将其看作一个二维的字符数组,每一个字符串就是一个字符数组。所以可以对每一个字符串的同一位进行比较,如果都相同则添加该字符到最长公共前缀中,一旦有一个不同的直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码class Solution {
public String longestCommonPrefix(String[] strs) {
String ans = "";
int len = strs.length;
String first = strs[0];
if (len == 1) {
return first;
}
for (int i = 0; i < 200; i++) {
// 判断当前遍历的位数是不是超过第一个字符串的长度,超过或者正好则直接返回
if (i >= first.length()) {
return ans;
}
// 获取第一个字符串所的遍历到的位的值
char c = first.charAt(i);
for (int j = 1; j < len; j++) {
// 超出长度或者和当前字符不同则返回
if (i >= strs[j].length() || strs[j].charAt(i) != c) {
return ans;
}
}
// 符合条件则将当前字符拼到最长公共前缀
ans += String.valueOf(c);
}
return ans;
}
}
  • 时间复杂度:O(mn)
  • 空间复杂度:O(1)

引用

我的 github 仓库已经同步建立,点击访问

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Mybatis批量插入哪家强?

发表于 2021-09-11

前言

本文使用 Mybatis 进行批量插入,比较两种不同的插入方式的区别。

测试

批量插入注意事项:

1、连接数据库时添加参数 allowMultiQueries=true,支持多语句执行,批处理

2、数据库是否支持大量数据写入,设置 max_allowed_packet参数保证批次提交的数据量

拼接 sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public void batchDemo() {
long start = System.currentTimeMillis();
List<User> list = new ArrayList<>();
for (int i = 0; i < 5000; i++) {
User user = new User();
user.setId(UUID.randomUUID().toString());
user.setName("feiyangyang");
user.setPwd("feiyangyang");
list.add(user);
}
userService.batchForeach(list);
long end = System.currentTimeMillis();
System.out.println("---------------" + (start - end) + "---------------");
}
1
2
3
4
5
6
7
8
xml复制代码<insert id="batchForeach" parameterType="com.fyy.druid.entity.User">
insert into
user(id,`name`,pwd)
values
<foreach collection ="userList" item="user" separator =",">
(#{user.id}, #{user.name}, #{user.pwd})
</foreach>
</insert>

batch插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public void batchInsert() {
long start = System.currentTimeMillis();
SqlSession sqlSession = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.BATCH);
UserService mapper = sqlSession.getMapper(UserService.class);

for (int i = 0; i < 5000; i++) {
User user = new User();
user.setId(UUID.randomUUID().toString());
user.setName("feiyangyang");
user.setPwd("feiyangyang");
mapper.batchInsert(user);
}
sqlSession.commit();
long end = System.currentTimeMillis();
System.out.println("---------------" + (start - end) + " ---------------");
}

数据对比

拼接sql (ms) batch插入 (ms)
500条 1744 639
2000条 26966 2473
5000条 173668 7382

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…533534535…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%