一文搞定分布式ID-基于Redission和Snowflak

基于Redission 分布式ID:

基于 Redisson 实现分布式 ID 生成通常涉及使用 Redis 的原子操作,如 INCR 或 INCRBY,来确保生成的 ID 是唯一的。
以下是一个简单的示例,展示如何使用 Redisson 来创建一个带有自定义前缀的分布式 ID 生成器。
使用 Maven

配置:

pom.xml 文件中添加以下依赖:

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.4</version>
</dependency>

然后,可以创建一个类来实现分布式 ID 生成器的逻辑:

注意点:

  1. 缓存前缀相关的 RAtomicLong 实例:为了避免每次生成 ID 时都获取一个新的 RAtomicLong 实例,我们可以缓存这些实例。
  2. 批量生成 ID:如果你的应用程序需要频繁生成 ID,可以一次性批量生成多个 ID,以减少与 Redis 服务器的通信次数。
  3. 添加异常处理:在与 Redis 交互时,可能会出现连接问题或其他异常,应该添加异常处理来确保系统的稳定性。
  4. 配置化:使 ID 生成器的某些方面(如键的名称前缀)可配置,以便在不同环境或不同情况下灵活使用。

简化版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, RAtomicLong> atomicLongCache;
private static final String KEY_PREFIX = "id_generator_"; // 可配置化的键前缀
private static final int ID_BATCH_SIZE = 100; // 批量生成 ID 的大小
private long currentId = 0;
private long maxId = 0;

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.atomicLongCache = new ConcurrentHashMap<>();
}

public synchronized String generateId(String prefix) {
// 检查当前批次是否已用完
if (currentId >= maxId) {
// 从 Redis 获取下一个批次的起始 ID
RAtomicLong atomicLong = atomicLongCache.computeIfAbsent(prefix, k -> redissonClient.getAtomicLong(KEY_PREFIX + k));
currentId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId = currentId + ID_BATCH_SIZE;
}
// 返回自定义前缀和 ID 的组合
return prefix + "_" + (currentId++);
}
}

在上述代码中,generateId 方法现在是同步的,以确保在批量生成 ID 时线程安全。我们使用 currentIdmaxId 变量来跟踪当前批次的 ID 范围。当当前批次的 ID 用完时,通过增加 ID_BATCH_SIZE 来从 Redis 获取下一个批次的起始 ID。

此外,我们使用 ConcurrentMap 来缓存 RAtomicLong 实例,以便我们可以重用它们,而不是每次调用 generateId 时都创建新的实例。

优化版:

针对以上代码,可以进一步优化:
考虑以下方面:

  1. 异步获取 ID:提供异步版本的 ID 获取方法,以避免在高并发环境中阻塞调用线程。
  2. 预取 ID 范围:在当前 ID 范围即将用完时,异步预取下一个 ID 范围,以减少生成 ID 时的等待时间。
  3. 优化锁的使用:减少锁的粒度,避免在高并发下的性能瓶颈。
  4. 监控和告警:加入监控机制,当 ID 的使用接近上限时发送告警。

以下是对 DistributedIdGenerator 类进行进一步优化的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, AtomicLong> currentIdMap;
private final ConcurrentMap<String, AtomicLong> maxIdMap;
private static final String KEY_PREFIX = "id_generator_";
private static final int ID_BATCH_SIZE = 100;
private static final int PREFETCH_THRESHOLD = 10; // 当剩余 ID 少于此值时,开始预取

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.currentIdMap = new ConcurrentHashMap<>();
this.maxIdMap = new ConcurrentHashMap<>();
}

public String generateId(String prefix) {
AtomicLong currentId = currentIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));
AtomicLong maxId = maxIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));

long id = currentId.getAndIncrement();
if (id >= maxId.get()) {
synchronized (maxId) {
if (id >= maxId.get()) {
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId.set(newMaxId);
currentId.set(newMaxId - ID_BATCH_SIZE + 1);
id = currentId.getAndIncrement();
}
}
}

// Prefetch next ID range if necessary
long remainingIds = maxId.get() - currentId.get();
if (remainingIds <= PREFETCH_THRESHOLD) {
CompletableFuture.runAsync(() -> fetchNextIdRange(prefix));
}

return prefix + "_" + id;
}
private void fetchNextIdRange(String prefix) {
AtomicLong maxId = maxIdMap.get(prefix);
synchronized (maxId) {
// Double-check in case another thread already fetched the range
if (currentIdMap.get(prefix).get() < maxId.get()) {
return;
}
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId.set(newMaxId);
}
}
}

在上述代码中,我们使用 ConcurrentMap 来维护当前 ID 和最大 ID 的原子性,同时减少了锁的粒度,只在需要更新最大 ID 时进行同步。此外,我们引入了 PREFETCH_THRESHOLD 来确定何时开始预取下一个 ID 范围,并使用 CompletableFuture 来异步预取,从而避免阻塞当前线程。

为了监控和告警,可以在适当的位置添加日志记录或集成监控系统,以跟踪 ID 的使用情况并在接近上限时发送告警。

请注意,这个优化版本假设 ID 的消耗速率不会在短时间内迅速耗尽当前批次的 ID。如果你的系统有这种可能性,可能需要进一步调整 PREFETCH_THRESHOLD 的值,或者实现更复杂的预取逻辑来确保 ID 的连续供应。

最终版:

最后分布式环境中给出最后代码实现:

在分布式环境中,进一步优化 DistributedIdGenerator 类可能涉及到以下几个方面:

  1. 分段锁:为了减少锁的争用,可以使用分段锁(Striped Locks)来减少同步块的争用。
  2. 本地缓存与预取结合:在本地缓存一定量的 ID 范围,并在接近耗尽时预取下一批次的 ID,以减少对 Redis 的请求频率。
  3. 扩展性和灵活性:考虑到不同的使用场景和需求,提供配置项以调整批次大小、预取阈值等参数。
  4. 故障恢复和重试机制:在与 Redis 交互时可能会遇到网络故障或其他异常,需要实现重试逻辑和故障恢复机制。
  5. 监控和告警:实现监控机制,监控 ID 生成的健康状况和性能指标,并在出现问题时及时告警。

以下是考虑到分布式环境的进一步优化代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
* @Author derek_smart
* @Date 202/4/25 10:15
* @Description 分布式Id 最终版
* <p>
*/
@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, AtomicLong> currentIdMap;
private final ConcurrentMap<String, AtomicLong> maxIdMap;
private static final String KEY_PREFIX = "id_generator_";

@Value("${id.generator.batch.size:100}")
private int idBatchSize;

@Value("${id.generator.prefetch.threshold:10}")
private int prefetchThreshold;

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.currentIdMap = new ConcurrentHashMap<>();
this.maxIdMap = new ConcurrentHashMap<>();
}

public String generateId(String prefix) {
AtomicLong currentId = currentIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));
AtomicLong maxId = maxIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));

long id = currentId.getAndIncrement();
if (id >= maxId.get()) {
synchronized (maxId) {
if (id >= maxId.get()) {
fetchNextIdRange(prefix, maxId);
id = currentId.getAndIncrement();
}
}
}

// Prefetch next ID range if necessary
long remainingIds = maxId.get() - currentId.get();
if (remainingIds <= prefetchThreshold) {
fetchNextIdRange(prefix, maxId);
}

return prefix + "_" + id;
}

private void fetchNextIdRange(String prefix, AtomicLong maxId) {
synchronized (maxId) {
// Double-check in case another thread already fetched the range
if (currentIdMap.get(prefix).get() < maxId.get()) {
return;
}
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(idBatchSize);
maxId.set(newMaxId);
}
}
}

image.png

在上述代码中,我们通过 @Value 注解从配置文件中读取批次大小和预取阈值,使其更加灵活。同时,我们为 fetchNextIdRange 方法添加了一个额外的参数 maxId,以便在预取时直接操作 AtomicLong 对象,而不是每次都从 maxIdMap 中查询。

此外,为了应对分布式环境中的潜在问题,我们可以添加重试逻辑和故障恢复机制。例如,如果与 Redis 的交互失败,可以实现一个重试策略,重试几次后仍然失败则记录错误日志或触发告警。

监控和告警可以通过外部监控系统(如 Prometheus、Grafana)或内部监控机制(如 Spring Boot Actuator、Micrometer)来实现,以确保及时发现和解决问题。

最后,如果需要进一步提高性能,可以考虑使用 Redis 的 Lua 脚本来减少网络往返时间,并原子地执行 ID 范围的获取和更新操作。

为了使用这个 ID 生成器,你可以在你的服务中注入 DistributedIdGenerator 并调用 generateId 方法:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Service
public class SomeService {

@Autowired
private DistributedIdGenerator idGenerator;

public void doSomething() {
// 生成一个带有自定义前缀 "xx" 的分布式 ID
String distributedId = idGenerator.generateId("xx");
// 使用生成的 ID 进行业务操作
}
}

请注意,为了使上述示例工作,你需要配置和提供一个 RedissonClient 实例。这通常在你的 Spring 配置类中完成,如下所示:

1
2
3
4
5
6
7
8
9
10
java复制代码@Configuration
public class RedissonConfig {

@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}

确保 Redis 服务器正在运行,并且配置中的地址和端口与你的 Redis 服务器设置相匹配。以上代码提供了一些基本的分布式 ID 生成器,可以根据自己的业务需求进行调整和扩展。

SnowflakeId 分布式ID:

理论替换方案:

如果没有redis,可以考虑以下替代方案:

  1. UUID (Universally Unique Identifier):
    UUID 是一个很好的选择,因为它可以在本地生成,不需要网络通信,并且碰撞的概率极低。UUID 是 128 位的值,通常以 36 个字符(包括 4 个破折号)的字符串形式表示。
1
2
3
4
5
6
7
java复制代码 import java.util.UUID;

public class UUIDGenerator {
public static String generateUUID() {
return UUID.randomUUID().toString();
}
}
  1. Snowflake 算法:
    Twitter 的 Snowflake 算法是另一个流行的分布式 ID 生成方法。它生成一个 64 位的长整型 ID,其中包含时间戳、数据中心 ID、机器 ID 和序列号。为了使用 Snowflake 算法,你需要确保每个机器或服务实例有唯一的数据中心 ID 和机器 ID。

你可以使用现成的 Snowflake 实现,或者根据 Snowflake 的原理自己实现一个。
3. 数据库序列:
如果你的应用已经使用了关系数据库,你可以依赖数据库的序列功能来生成唯一 ID。大多数现代关系数据库都支持序列,虽然这种方法会引入数据库作为中心化依赖。
4. 分布式唯一 ID 生成器服务:
如果你的系统架构允许,可以部署一个独立的分布式 ID 生成器服务,如 Twitter 的 Snowflake、Sony 的 Flake 或者 Boundary 的 Flake。这些服务可以独立于你的主应用运行,并通过网络调用来生成 ID。
5. 组合策略:
在某些情况下,你可以组合使用以上方法。例如,使用机器 ID 或 IP 地址作为前缀,再加上本地生成的 UUID 或时间戳和随机数的组合。

SnowflakeId解决方案:

简化版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码
public class SnowflakeIdGenerator {

private final long twepoch = 1288834974657L;
private final long workerIdBits = 5L;
private final long datacenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
private final long sequenceBits = 12L;

private final long workerIdShift = sequenceBits;
private final long datacenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

private long workerId;
private long datacenterId;
private long sequence = 0L;
private long lastTimestamp = -1L;

public SnowflakeIdGenerator(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("Worker ID can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("Datacenter ID can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}

public synchronized long nextId() {
long timestamp = timeGen();

if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}

lastTimestamp = timestamp;

return ((timestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}

private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}

private long timeGen() {
return System.currentTimeMillis();
}

public static void main(String[] args) {
SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(0, 0);
for (int i = 0; i < 1000; i++) {
long id = idGenerator.nextId();
System.out.println(id);
}
}
}

在这个实现中,我们创建了一个 SnowflakeIdGenerator 类,它需要两个参数:workerIddatacenterId。这两个参数用于在分布式环境中区分不同的工作节点和数据中心。

Snowflake 算法生成的是一个 64 位的长整型 ID,其中包含:

  • 1 位未使用(因为 Java 中的长整型是有符号的)
  • 41 位时间戳(毫秒级)
  • 5 位数据中心 ID
  • 5 位工作节点 ID

优化版:

SnowflakeIdGenerator 类以更好地处理分布式和高并发的情况,我们可以考虑以下几点:

  1. 确保时钟回拨问题的处理:在分布式系统中,时钟回拨是一个常见问题。我们需要确保在时钟回拨时,ID 生成器能够正常工作或者抛出异常来避免生成重复的 ID。
  2. 使用原子操作:使用 AtomicLong 来替代 synchronized 方法,以提高并发性能。
  3. 确保每个实例的唯一性:在分布式环境中,每个实例都应该有一个唯一的工作节点 ID 和数据中心 ID。这通常可以通过配置文件或环境变量来设置。

下面是优化后的 SnowflakeIdGenerator 类的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
java复制代码import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/
* @Author derek_smart
* @Date 202/4/26 10:25
* @Description SnowflakeId id 生成器
* <p>
*/
@Component
public class SnowflakeIdGenerator {

private final long twepoch = 1288834974657L;
private final long workerIdBits = 5L;
private final long datacenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
private final long sequenceBits = 12L;

private final long workerIdShift = sequenceBits;
private final long datacenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

private final long workerId;
private final long datacenterId;
private final AtomicLong sequence = new AtomicLong(0L);
private final AtomicLong lastTimestamp = new AtomicLong(-1L);

public SnowflakeIdGenerator(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("Worker ID can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("Datacenter ID can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}

//异步单个
public CompletableFuture<Long> nextIdAsync() {
return CompletableFuture.supplyAsync(this::nextId);
}
//批量
public List<Long> nextIds(int count) {
List<Long> ids = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
ids.add(nextId());
}
return ids;
}
//异步 多个
public CompletableFuture<List<Long>> nextIdsAsync(int count) {
return CompletableFuture.supplyAsync(() -> nextIds(count));
}

//获取
public long nextId() {
long currentTimestamp = -1L;
long lastTimestampValue = -1L;

while (true) {
lastTimestampValue = lastTimestamp.get();
currentTimestamp = timeGen();

if (currentTimestamp < lastTimestampValue) {
// Clock moved backwards, handle the exception
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestampValue - currentTimestamp));
}

if (currentTimestamp == lastTimestampValue) {
long currentSequence = sequence.incrementAndGet() & sequenceMask;
if (currentSequence == 0) {
currentTimestamp = waitNextMillis(currentTimestamp);
} else {
return generateId(currentTimestamp, currentSequence);
}
} else {
if (sequence.compareAndSet(sequence.get(), 0L)) {
return generateId(currentTimestamp, sequence.get());
}
}
}
}

private long generateId(long currentTimestamp, long sequence) {
return ((currentTimestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}

private long waitNextMillis(long currentTimestamp) {
while (currentTimestamp <= lastTimestamp.get()) {
currentTimestamp = timeGen();
}
lastTimestamp.set(currentTimestamp);
return currentTimestamp;
}

private long timeGen() {
return System.currentTimeMillis();
}
}

1714105622027.png

在这个优化版本中,我们使用 AtomicLong 替代了 synchronized 方法,以提高并发性能。nextId 方法现在是无锁的,使用原子操作来确保序列号的正确递增,并且在时钟回拨时抛出异常。
我们还添加了 waitNextMillis 方法来确保在同一毫秒内序列号用完时,能够等待到下一个毫秒值。generateId 方法是一个帮助方法,用于基于时间戳、数据中心 ID、工作节点 ID 和序列号生成 ID。

请注意,为了在分布式环境中使用此 ID 生成器,你需要确保每个实例都有唯一的工作节点 ID 和数据中心 ID,这通常可以通过配置文件或环境变量来设置。
此外,如果你的系统对时钟回拨非常敏感,可以添加额外的逻辑来处理这种情况,例如使用 NTP(Network Time Protocol)来同步服务器时间。

强健版:

最后我们可以实现批量生成和双缓冲策略及更加强健版本的BufferedSnowflakeIdGenerator

BufferedSnowflakeIdGenerator 并解决潜在的错误处理和资源管理问题,
我们可以使用 ExecutorService 来管理后台线程,以及添加额外的逻辑来确保在系统关闭时正确地关闭和清理这些线程。此外,我们将添加异常处理来确保在填充缓冲区时发生的任何异常都能被捕获和处理。以下是优化后的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
java复制代码import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/
* @Author derek_smart
* @Date 202/4/26 10:45
* @Description 双重缓存 SnowflakeId id 生成器
* <p>
*/
@Component
public class BufferedSnowflakeIdGenerator {

private final Lock bufferLock = new ReentrantLock();
private final SnowflakeIdGenerator snowflakeIdGenerator;
private AtomicReference<long[]> bufferOne;
private AtomicReference<long[]> bufferTwo;
private AtomicLong bufferIndex;
private AtomicReference<long[]> currentBuffer;
private final ExecutorService executorService;

private final int bufferSize;

private String customPrefix;

public BufferedSnowflakeIdGenerator(long workerId, long datacenterId, int bufferSize) {
this.snowflakeIdGenerator = new SnowflakeIdGenerator(workerId, datacenterId);
this.bufferSize = bufferSize;
this.bufferOne = new AtomicReference<>(new long[bufferSize]);
this.bufferTwo = new AtomicReference<>(new long[bufferSize]);
this.bufferIndex = new AtomicLong(-1);
this.currentBuffer = new AtomicReference<>(bufferOne.get());
this.executorService = Executors.newSingleThreadExecutor();

// Initial buffer fill
fillBuffer(bufferOne.get());
fillBuffer(bufferTwo.get());
}

public BufferedSnowflakeIdGenerator(long workerId, long datacenterId, int bufferSize, String customPrefix) {
this.snowflakeIdGenerator = new SnowflakeIdGenerator(workerId, datacenterId);
this.bufferSize = bufferSize;
this.bufferOne = new AtomicReference<>(new long[bufferSize]);
this.bufferTwo = new AtomicReference<>(new long[bufferSize]);
this.bufferIndex = new AtomicLong(-1);
this.currentBuffer = new AtomicReference<>(bufferOne.get());
this.executorService = Executors.newSingleThreadExecutor();
this.customPrefix = customPrefix;
// Initial buffer fill
fillBuffer(bufferOne.get());
fillBuffer(bufferTwo.get());
}


private void fillBuffer(long[] buffer) {
try {
for (int i = 0; i < bufferSize; i++) {
buffer[i] = snowflakeIdGenerator.nextId();
}
} catch (Exception e) {
// Handle exceptions during buffer fill here
e.printStackTrace();
}
}

/
* 带自定义
* @return
*/
public String nextIdWithPrefix() {
return customPrefix + "_" + nextId();
}

/*
* 不带自定义表示
* @return
*/
public long nextId() {
long index = bufferIndex.incrementAndGet();
if (index < bufferSize) {
return currentBuffer.get()[(int) index];
} else {
bufferLock.lock();
try {
// Double check inside the lock
index = bufferIndex.get();
if (index >= bufferSize) {
currentBuffer.set(currentBuffer.get() == bufferOne.get() ? bufferTwo.get() : bufferOne.get());
bufferIndex.set(0);
index = 0;
// Trigger buffer refill asynchronously
long[] bufferToFill = currentBuffer.get() == bufferOne.get() ? bufferTwo.get() : bufferOne.get();
executorService.submit(() -> fillBuffer(bufferToFill));
}
} finally {
bufferLock.unlock();
}
return currentBuffer.get()[(int) index];
}
}

public void shutdown() {
executorService.shutdown();
// Optionally add code to wait for termination and handle tasks that could not be executed
}

// Ensure to call shutdown when the application stops
protected void finalize() throws Throwable {
try {
shutdown();
} finally {
super.finalize();
}
}
}

1714105790485.png

BufferedSnowflakeIdGenerator包含两个缓冲区bufferOnebufferTwo,以及一个指示当前使用的缓冲区的 currentBufferbufferIndex` 跟踪当前缓冲区中的位置。

nextId 方法检查 bufferIndex 是否在 bufferSize 范围内。如果是,它返回当前缓冲区中的 ID。如果 bufferIndex 超出范围,则尝试通过锁来切换缓冲区,并在另一个线程中异步填充已耗尽的缓冲区。

使用了 ExecutorService 来管理后台线程,这样我们就可以更优雅地处理线程的生命周期。fillBuffer 方法包含了异常处理逻辑,以便在填充缓冲区时捕获并处理任何异常。

我们还添加了 shutdown 方法来关闭 ExecutorService。这个方法应该在应用程序停止时被调用,以确保所有后台任务都被正确清理。为了防止忘记调用 shutdown 方法,我们在类的 finalize 方法中也调用了 shutdown,这样即使忘记了显式调用,垃圾收集器在回收对象时也会尝试关闭线程池。

请注意,依赖 finalize 方法来关闭资源并不是一种推荐的做法,因为 finalize 方法的调用时机是不确.

关于上面的各类id 生成,可以采用工厂模式来创建 ID 生成器的实例,并且使用单例模式确保每个节点上 ID 生成器的唯一性。同时,我们可以使用策略模式来允许在不同的 ID 生成策略(如 Snowflake、UUID 等)之间灵活切换。

扩展设计模式:

上面都是各种实现类,完全可以使用工厂设计模式将其统一处理。只是一个简化版。

以下是结合设计模式的分布式 ID 生成器的代码实现:
首先,定义一个 ID 生成策略接口:

1
2
3
java复制代码public interface IdGenerationStrategy {
long generateId();
}

然后,实现 Snowflake 算法作为一个策略:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class SnowflakeStrategy implements IdGenerationStrategy {

// ... (其他成员变量和构造函数保持不变)

@Override
public long generateId() {
return nextId();
}

// ... (nextId, waitNextMillis, timeGen, generateId 方法保持不变)
}

接下来,如果需要,也可以实现一个基于 UUID 的策略:

1
2
3
4
5
6
7
8
9
10
11
java复制代码import java.util.UUID;

public class UUIDStrategy implements IdGenerationStrategy {

@Override
public long generateId() {
// UUIDs are 128-bit values, but we need to return a long (64-bit)
// This can be done by taking the hash of the UUID for example
return UUID.randomUUID().toString().hashCode();
}
}

现在,创建一个 ID 生成器工厂,它根据配置或环境来实例化相应的策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class IdGeneratorFactory {

private static IdGenerationStrategy idGeneratorStrategy;

public static synchronized IdGenerationStrategy getIdGeneratorStrategy() {
if (idGeneratorStrategy == null) {
// The choice of strategy can be made configurable
// For example, based on system properties, configuration files, etc.
String strategyType = System.getProperty("id.generator.strategy", "snowflake");
if ("snowflake".equals(strategyType)) {
long workerId = ...; // Retrieve from configuration
long datacenterId = ...; // Retrieve from configuration
idGeneratorStrategy = new SnowflakeStrategy(workerId, datacenterId);
} else if ("uuid".equals(strategyType)) {
idGeneratorStrategy = new UUIDStrategy();
} else {
throw new IllegalArgumentException("Unknown ID Generator strategy type");
}
}
return idGeneratorStrategy;
}
}

最后,客户端代码可以通过工厂获取 ID 生成器实例,并生成 ID:

1
2
3
4
5
6
7
8
java复制代码public class IdGenerationClient {

public static void main(String[] args) {
IdGenerationStrategy idGenerator = IdGeneratorFactory.getIdGeneratorStrategy();
long id = idGenerator.generateId();
System.out.println("Generated ID: " + id);
}
}

在这个设计中,IdGeneratorFactory 确保了在每个节点上 ID 生成器的唯一性,并且提供了一个全局访问点。IdGenerationStrategy 接口和它的实现类 SnowflakeStrategy 以及 UUIDStrategy 允许在不同的 ID 生成策略之间灵活切换。

这种设计模式的组合提供了良好的扩展性和可维护性,使得添加新的 ID 生成策略变得非常简单,只需要实现 IdGenerationStrategy 接口即可。同时,它也支持分布式环境,因为可以通过配置为不同的节点分配不同的工作节点 ID 和数据中心 ID,从而使得在 Snowflake 策略中生成的 ID 具有全局唯一性。
以上就是分布式id 实现。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%