开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

阿里也出手了!Spring Cloud Alibaba AI

发表于 2024-04-26

什么是Spring AI

Spring AI是从著名的Python项目LangChain和LlamaIndex中汲取灵感,它不是这些项目的直接移植,它的成立信念是,下一波生成式人工智能应用程序将不仅适用于 Python 开发人员,而且将在许多编程语言中无处不在。

我们可以从Spring AI的官网描述中,总结出Spring AI的几个核心的关键词:

  • 提供抽象能力
  • 简化AI应用的开发
  • 模型与向量支持
  • AI集成与自动配置

Spring AI简化了我们构建大型复杂的AI应用的过程,当然如果你的项目仅仅是需要调用一个AI接口,那其实直接调用官方SDK反而更方便。

Spring AI提供的功能如下:

  • 支持所有主要的模型提供商,如OpenAI,Microsoft,Amazon,Google和Huggingface。支持的模型类型包括聊天和文本到图像。
  • 跨 AI 提供商的可移植 API,用于聊天和嵌入模型。支持同步和流 API 选项。还支持下拉以访问特定于模型的功能。
  • 将 AI 模型输出映射到 POJO。
  • 支持所有主要的向量数据库,例如 Azure Vector Search、Chroma、Milvus、Neo4j、PostgreSQL/PGVector、PineCone、Qdrant、Redis 和 Weaviate。
  • 跨 Vector Store 提供程序的可移植 API,包括新颖的类似 SQL 的元数据过滤器 API,该 API 也是可移植的。
  • AI 模型和矢量存储的 Spring Boot stater。
  • 用于数据工程的 ETL 框架

什么是Spring Cloud Alibaba AI

原始的Spring AI并没有国内相关大模型的接入,对国内开发者不太友好。

总的来说,Spring Cloud Alibaba AI 目前基于Spring AI 0.8.1版本 API 完成通义系列大模型的接入。

在当前最新版本中,Spring Cloud Alibaba AI 主要完成了几种常见生成式模型的适配,包括对话、文生图、文生语音等,开发者可以使用 Spring Cloud Alibaba AI 开发基于通义的聊天、图片或语音生成 AI 应用,框架还提供 OutParser、Prompt Template、Stuff 等实用能力。

Spring Cloud Alibaba AI官方还提供了包括聊天对话、文生图、文生语音等多种应用的开发示例,具体可以前往官网查看:快速开始 | https://sca.aliyun.com

动手体验Spring Cloud Alibaba AI

首先新建一个Maven项目,JDK选的是17版本。

Maven文件需要引入spring-cloud-alibaba-dependencies和spring-cloud-starter-alibaba-ai两个依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
xml复制代码<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2023.0.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-ai</artifactId>
</dependency>
</dependencies>

配置阿里云通义千问的Api-Key,没有的读者可以从官网上申请。

1
2
3
4
5
6
7
8
9
10
yml复制代码server:
port: 8080
spring:
application:
name: alibaba-spring-ai-demo

cloud:
ai:
tongyi:
api-key: 你的api-key

新建SpringBoot启动类:

1
2
3
4
5
6
java复制代码@SpringBootApplication
public class MyAiApplication {
public static void main(String[] args) {
SpringApplication.run(MyAiApplication.class,args);
}
}

对接文本模型

我们首先测试如何对接文本大模型。

新建一个控制器类:新建/simple接口,用来测试基本QA。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@RestController
@RequestMapping("/ai")
@CrossOrigin
public class TongYiController {
@Autowired
@Qualifier("tongYiSimpleServiceImpl")
private TongYiService tongYiSimpleService;

@GetMapping("/simple")
public String completion(
@RequestParam(value = "message", defaultValue = "AI时代下Java开发者该何去何从?")
String message
) {
return tongYiSimpleService.completion(message);
}
}

新建一个TongyiService服务类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public interface TongYiService {

/**
* 基本问答
*/
String completion(String message);
/**
* 文生图
*/
ImageResponse genImg(String imgPrompt);

/**
* 语音合成
*/
String genAudio(String text);

}

具体的实现类如下:由 Spring AI 自动注入 ChatClient、StreamingChatClient,ChatClient 屏蔽底层通义大模型交互细节,后者用于流式调用。

对于QA而言,仅仅通过client.call(prompt)一行代码就可以完成对模型的调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Service
@Slf4j
public class TongYiSimpleServiceImpl extends AbstractTongYiServiceImpl {
/**
* 自动注入ChatClient、StreamingChatClient,屏蔽模型调用细节
*/
private final ChatClient chatClient;

private final StreamingChatClient streamingChatClient;

@Autowired
public TongYiSimpleServiceImpl(ChatClient chatClient, StreamingChatClient streamingChatClient) {
this.chatClient = chatClient;
this.streamingChatClient = streamingChatClient;
}
/**
* 具体实现:
*/
@Override
public String completion(String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatClient.call(prompt).getResult().getOutput().getContent();
}
}

我们发送一个请求,prompt是AI时代下Java开发者该何去何从?测试结果如下:

image.png

文生图模型

这里只给出service的代码,其它代码同上面的文本问答。

可以看到,只需要实例化一个imagePrompt,再调用模型即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Slf4j
@Service
public class TongYiImagesServiceImpl extends AbstractTongYiServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(TongYiService.class);
private final ImageClient imageClient;
@Autowired
public TongYiImagesServiceImpl(ImageClient client) {
this.imageClient = client;
}
@Override
public ImageResponse genImg(String imgPrompt) {
var prompt = new ImagePrompt(imgPrompt);
return imageClient.call(prompt);
}
}

测试的prompt是:Painting a boy coding in front of the desk, with his dog.,测试结果如下,效果还是很不错的:

img测试.jpg
img接口.jpg
语音合成模型


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Slf4j
@Service
public class TongYiAudioSimpleServiceImpl extends AbstractTongYiServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(TongYiService.class);
private final SpeechClient speechClient;
@Autowired
public TongYiAudioSimpleServiceImpl(SpeechClient client) {
this.speechClient = client;
}
@Override
public String genAudio(String text) {
logger.info("gen audio prompt is: {}", text);
var resWAV = speechClient.call(text);
// save的代码省略,就是将音频保存到本地而已
return save(resWAV, SpeechSynthesisAudioFormat.WAV.getValue());
}
}

测试结果也是成功的:

image.png

使用体验小结

不得不说,阿里在Java开发领域一直是走在国内的前沿的,我也期待阿里继续完善Spring Cloud Alibaba AI的相关功能,为我们这些国内Java开发者提供更加方便的开发工具。

本文仅仅简单测试了文本问答、文生图以及语音合成三个功能,(最后一个没列出来),Spring Cloud Alibaba AI还有很多丰富的功能,如流式调用、POJO转换、AI Role等功能,各位读者感兴趣可以自行前往官方example仓库查看。后续也我打算利用Spring Cloud Alibaba AI尝试构建一个RAG问答应用。

下面给出我的使用小结:

  1. 简化开发。个人开发者如果仅仅需要简答的问答接口,无需使用Spring AI,然而,当项目中需要开发比较复杂的AI功能,如果仅仅使用官方的SDK,写出的代码可能不太容易长期维护。
  2. 响应时间。接口响应时间还有很大的优化空间,可以看到基本的文本问答的响应就耗费了10s,不过这也取决于所处理任务的大小。
  3. 模型选择。之前使用SDK可以自己选择通义提供的各种模型,而使用Spring AI框架,暂时不知道如何选择其它模型进行调用,有知道的掘友也可以在评论区说一下。

未来,Spring Cloud Alibaba AI还将继续完成 VectorStore、Embedding、ETL Pipeline 等更多适配,简化 RAG 等更多 AI 应用开发场景。身为Java开发者,我也将继续关注Spring Cloud Alibaba 社区的最新动态。

参考

  1. Spring Cloud Alibaba AI 概述 | https://sca.aliyun.com
  2. Spring AI 抢先体验,5 分钟玩转 Java AI 应用开发 (mp.weixin.qq.com)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于Redission布隆过滤器-优化版

发表于 2024-04-26

概述:

原因:

juejin.cn/post/735501… (基于Redission布隆过滤器原理,优缺点及工具类和使用示例)文中简单介绍Redission布隆过滤器使用,这边基于此上再次给出优化版本,防止元素存在误差的场景:

优化:

优化理论:

为了进一步优化 RedissonBloomFilterUtil,我们可以考虑以下几点来增强其功能和鲁棒性:

  1. 配置管理:允许动态配置布隆过滤器的参数,如预期插入数量和误报率。
  2. 异常处理:增加异常处理机制来确保系统稳定运行,即使底层服务(缓存或数据库)出现问题。
  3. 日志记录:记录关键操作和异常,以便于问题追踪和性能监控。
  4. 重建和同步布隆过滤器:提供机制定期或根据需要重建布隆过滤器,以减少误报率。
  5. 分布式锁:在更新缓存和布隆过滤器时使用分布式锁,以防止并发更新导致的数据不一致。
  6. 回退策略:在缓存和数据库服务不可用时提供回退策略,以保证服务的持续可用性。

优化流程:

使用布隆过滤器的基本流程如下:

  1. 当请求到来时,首先查询布隆过滤器。
  2. 如果布隆过滤器认为数据不存在,则直接返回,不继续查询数据库。
  3. 如果布隆过滤器认为数据可能存在,继续查询缓存。
  4. 如果缓存中有数据,返回缓存数据。
  5. 如果缓存中没有数据,继续查询数据库。
  6. 如果数据库中有数据,将其放入缓存并返回。
  7. 如果数据库中也没有数据,可以选择更新布隆过滤器(如果布隆过滤器支持删除操作),或者记录这个不存在的查询,以便在布隆过滤器重建时排除这些误报。

优化实现:

以下是考虑了上述优化点的 RedissonBloomFilterUtil 类的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
java复制代码import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedissonBloomFilterUtil {

private static final Logger logger = LoggerFactory.getLogger(RedissonBloomFilterUtil.class);

private RBloomFilter<String> bloomFilter;
private RedissonClient redissonClient;
private CacheService cacheService; // 假设这是你的缓存服务
private DatabaseService databaseService; // 假设这是你的数据库服务

public RedissonBloomFilterUtil(RedissonClient redissonClient, CacheService cacheService, DatabaseService databaseService, long expectedInsertions, double falseProbability) {
this.redissonClient = redissonClient;
this.cacheService = cacheService;
this.databaseService = databaseService;
this.bloomFilter = redissonClient.getBloomFilter("bloomFilter");
this.bloomFilter.tryInit(expectedInsertions, falseProbability);
}

public Object getData(String key) {
try {
if (!bloomFilter.contains(key)) {
logger.debug("Bloom filter does not contain key: {}", key);
return null;
}

Object cachedData = cacheService.getFromCache(key);
if (cachedData != null) {
logger.debug("Returning data from cache for key: {}", key);
return cachedData;
}

Object databaseData = databaseService.getFromDatabase(key);
if (databaseData != null) {
cacheService.addToCache(key, databaseData);
logger.debug("Data added to cache for key: {}", key);
return databaseData;
}

logger.debug("Data not found in database for key: {}", key);
recordFalsePositive(key);
} catch (Exception e) {
logger.error("Error retrieving data for key: {}", key, e);
// Implement fallback strategy (e.g., retry logic, circuit breaker)
}
return null;
}

private void recordFalsePositive(String key) {
// Implement logic to record false positives
// This could involve logging or updating the bloom filter
logger.info("Recorded false positive for key: {}", key);
}

public void rebuildBloomFilter() {
// Implement logic to rebuild the bloom filter
// This might involve clearing the bloom filter and re-adding keys from a reliable source
logger.info("Rebuilding bloom filter");
}

// ... (shutdown method and other methods)
}

优化总结:

在这个实现中,我们添加了日志记录来帮助跟踪操作和捕获异常。我们还为 getData 方法添加了异常处理逻辑,以便在发生错误时记录错误并执行回退策略(例如重试逻辑或断路器模式)。
recordFalsePositive 方法现在记录了误报的信息,这有助于监控布隆过滤器的性能,并在必要时进行调整。

rebuildBloomFilter 方法是一个存根,你可以在这里实现布隆过滤器的重建逻辑。这可能包括从数据库或其他可靠源重新填充布隆过滤器,以确保其准确性。

扩展:

请注意,为了实现分布式锁和回退策略,可能需要更复杂的逻辑和额外的依赖。
可以使用 Redisson 提供的分布式锁功能,在更新缓存和布隆过滤器时保持数据一致性。
对于回退策略,你可能需要使用像 Hystrix 这样的库来实现断路器模式,或者自己实现重试逻辑。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

世界上最大的 API 中心:RapidAPI,RapidAP

发表于 2024-04-26

一、RapidAPI 介绍

RapidAPI 是一个 API 中心,使超过 300 万的开发人员能够查找、管理和连接 API。 RapidAPI 让开发人员可以从一个地方管理所有 API 集成,并提供实时性能指标。 RapidAPI 成立于 2014 年,总部位于加利福尼亚州旧金山。

RapidAPI 是由开发人员为开发人员制作的,因此他们可以在一个地方访问 API 和微服务,并更高效、更轻松地构建应用程序。 今天,RapidAPI 是世界上最大的 API 中心,近 300 万开发人员可以在其中查找、测试和连接数以万计的 API——所有 API 都使用一个帐户、一个 API 密钥和一个 SDK。

软件开发人员还可以使用 RapidAPI for Teams 共享和协作处理内部 API,RapidAPI for Teams 是一个用于发布内部 API 和共享公共 API 订阅的通用工作区。 此外,组织可以使用 RapidAPI Enterprise 创建集中式中心环境,以帮助开发人员更快地重用和连接到现有 API,同时为 IT 提供企业范围的 API 使用可见性和治理。

二、RapidAPI (R Software Inc.)产品百科

  • RapidAPI Hub:查找并连接到数千个 API 一个 SDK;一个 API 密钥;一个仪表板。 您的开发团队、合作伙伴和客户可以发现并连接到您的 API——所有这些都来自一个单一的下一代 API 平台。 RapidAPI 的 Enterprise Hub 可以定制以匹配您公司的品牌,与内部系统和工具无缝集成,支持您的所有 API,并且可以部署为基于云的服务、本地和跨多云环境。
  • RapidAPI for Teams:随着公司过渡到在其架构中使用微服务,整个组织中创建了更多 API。 因此,开发人员在开发新软件时找到这些内部 API 并重用它们变得更具挑战性。 当开发人员订阅外部 API 时,他们需要一个协作解决方案来在内部共享它们。 RapidAPI for Teams 使您能够发布、共享和连接到内部 API 以及外部 API 和微服务。 使用 RapidAPI for Teams,开发人员可以创建一个组织并邀请其他人从私有工作区共享内部和外部 API。
  • RapidAPI Enterprise Hub:API 已成为构建软件的重要工具——在整个组织中激增,并从数百个增加到数千个。 随着开发团队越来越多地使用更多 API,他们正在探索新的 API 类型,例如 GraphQL、Kafka 等。 使这一挑战更加复杂的是,大多数公司的服务在具有多个网关以及混合和多云场景的异构环境中运行。RapidAPI Enterprise Hub 通过与任何 API 环境集成的下一代平台来满足这一需求。
  • RapidAPI Testing:RapidAPI 测试是一种功能性 API 测试和监控解决方案,可提供直观的用户体验、对任何 API 类型的支持以及与 RapidAPI Hub 和 RapidAPI Enterprise Hub 的集成。 RapidAPI 测试使用户和企业能够:1)确保 API 功能 – 轻松创建复杂的功能测试,以对 API 进行深度验证;2)集中监控——监控和管理跨多个地区的 API 测试;3)提高效率——集成到 CI/CD 管道,跨团队协作,并与 RapidAPI Hub 和 RapidAPI Enterprise Hub 本地集成 。
  • RapidAPI Client:RapidAPI Design by Paw是一个功能齐全的 API 客户端,可加速 API 交付并改善开发人员体验。 它通过提供具有直观 UI 和卓越性能的 API 客户端来简化开发工作流程并简化 API 协作。

三、RapidAPI 如何付费呢?

这里需要开一张Fomepay的虚拟信用卡即可付费,我这里使用的是559666进行付费,点击获取虚拟卡

微信图片_20240108105643.png

卡信息在卡中心cvc卡密里面

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文搞定分布式ID-基于Redission和Snowflak

发表于 2024-04-26

基于Redission 分布式ID:

基于 Redisson 实现分布式 ID 生成通常涉及使用 Redis 的原子操作,如 INCR 或 INCRBY,来确保生成的 ID 是唯一的。
以下是一个简单的示例,展示如何使用 Redisson 来创建一个带有自定义前缀的分布式 ID 生成器。
使用 Maven

配置:

在 pom.xml 文件中添加以下依赖:

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.4</version>
</dependency>

然后,可以创建一个类来实现分布式 ID 生成器的逻辑:

注意点:

  1. 缓存前缀相关的 RAtomicLong 实例:为了避免每次生成 ID 时都获取一个新的 RAtomicLong 实例,我们可以缓存这些实例。
  2. 批量生成 ID:如果你的应用程序需要频繁生成 ID,可以一次性批量生成多个 ID,以减少与 Redis 服务器的通信次数。
  3. 添加异常处理:在与 Redis 交互时,可能会出现连接问题或其他异常,应该添加异常处理来确保系统的稳定性。
  4. 配置化:使 ID 生成器的某些方面(如键的名称前缀)可配置,以便在不同环境或不同情况下灵活使用。

简化版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, RAtomicLong> atomicLongCache;
private static final String KEY_PREFIX = "id_generator_"; // 可配置化的键前缀
private static final int ID_BATCH_SIZE = 100; // 批量生成 ID 的大小
private long currentId = 0;
private long maxId = 0;

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.atomicLongCache = new ConcurrentHashMap<>();
}

public synchronized String generateId(String prefix) {
// 检查当前批次是否已用完
if (currentId >= maxId) {
// 从 Redis 获取下一个批次的起始 ID
RAtomicLong atomicLong = atomicLongCache.computeIfAbsent(prefix, k -> redissonClient.getAtomicLong(KEY_PREFIX + k));
currentId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId = currentId + ID_BATCH_SIZE;
}
// 返回自定义前缀和 ID 的组合
return prefix + "_" + (currentId++);
}
}

在上述代码中,generateId 方法现在是同步的,以确保在批量生成 ID 时线程安全。我们使用 currentId 和 maxId 变量来跟踪当前批次的 ID 范围。当当前批次的 ID 用完时,通过增加 ID_BATCH_SIZE 来从 Redis 获取下一个批次的起始 ID。

此外,我们使用 ConcurrentMap 来缓存 RAtomicLong 实例,以便我们可以重用它们,而不是每次调用 generateId 时都创建新的实例。

优化版:

针对以上代码,可以进一步优化:
考虑以下方面:

  1. 异步获取 ID:提供异步版本的 ID 获取方法,以避免在高并发环境中阻塞调用线程。
  2. 预取 ID 范围:在当前 ID 范围即将用完时,异步预取下一个 ID 范围,以减少生成 ID 时的等待时间。
  3. 优化锁的使用:减少锁的粒度,避免在高并发下的性能瓶颈。
  4. 监控和告警:加入监控机制,当 ID 的使用接近上限时发送告警。

以下是对 DistributedIdGenerator 类进行进一步优化的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, AtomicLong> currentIdMap;
private final ConcurrentMap<String, AtomicLong> maxIdMap;
private static final String KEY_PREFIX = "id_generator_";
private static final int ID_BATCH_SIZE = 100;
private static final int PREFETCH_THRESHOLD = 10; // 当剩余 ID 少于此值时,开始预取

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.currentIdMap = new ConcurrentHashMap<>();
this.maxIdMap = new ConcurrentHashMap<>();
}

public String generateId(String prefix) {
AtomicLong currentId = currentIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));
AtomicLong maxId = maxIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));

long id = currentId.getAndIncrement();
if (id >= maxId.get()) {
synchronized (maxId) {
if (id >= maxId.get()) {
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId.set(newMaxId);
currentId.set(newMaxId - ID_BATCH_SIZE + 1);
id = currentId.getAndIncrement();
}
}
}

// Prefetch next ID range if necessary
long remainingIds = maxId.get() - currentId.get();
if (remainingIds <= PREFETCH_THRESHOLD) {
CompletableFuture.runAsync(() -> fetchNextIdRange(prefix));
}

return prefix + "_" + id;
}
private void fetchNextIdRange(String prefix) {
AtomicLong maxId = maxIdMap.get(prefix);
synchronized (maxId) {
// Double-check in case another thread already fetched the range
if (currentIdMap.get(prefix).get() < maxId.get()) {
return;
}
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId.set(newMaxId);
}
}
}

在上述代码中,我们使用 ConcurrentMap 来维护当前 ID 和最大 ID 的原子性,同时减少了锁的粒度,只在需要更新最大 ID 时进行同步。此外,我们引入了 PREFETCH_THRESHOLD 来确定何时开始预取下一个 ID 范围,并使用 CompletableFuture 来异步预取,从而避免阻塞当前线程。

为了监控和告警,可以在适当的位置添加日志记录或集成监控系统,以跟踪 ID 的使用情况并在接近上限时发送告警。

请注意,这个优化版本假设 ID 的消耗速率不会在短时间内迅速耗尽当前批次的 ID。如果你的系统有这种可能性,可能需要进一步调整 PREFETCH_THRESHOLD 的值,或者实现更复杂的预取逻辑来确保 ID 的连续供应。

最终版:

最后分布式环境中给出最后代码实现:

在分布式环境中,进一步优化 DistributedIdGenerator 类可能涉及到以下几个方面:

  1. 分段锁:为了减少锁的争用,可以使用分段锁(Striped Locks)来减少同步块的争用。
  2. 本地缓存与预取结合:在本地缓存一定量的 ID 范围,并在接近耗尽时预取下一批次的 ID,以减少对 Redis 的请求频率。
  3. 扩展性和灵活性:考虑到不同的使用场景和需求,提供配置项以调整批次大小、预取阈值等参数。
  4. 故障恢复和重试机制:在与 Redis 交互时可能会遇到网络故障或其他异常,需要实现重试逻辑和故障恢复机制。
  5. 监控和告警:实现监控机制,监控 ID 生成的健康状况和性能指标,并在出现问题时及时告警。

以下是考虑到分布式环境的进一步优化代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
* @Author derek_smart
* @Date 202/4/25 10:15
* @Description 分布式Id 最终版
* <p>
*/
@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, AtomicLong> currentIdMap;
private final ConcurrentMap<String, AtomicLong> maxIdMap;
private static final String KEY_PREFIX = "id_generator_";

@Value("${id.generator.batch.size:100}")
private int idBatchSize;

@Value("${id.generator.prefetch.threshold:10}")
private int prefetchThreshold;

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.currentIdMap = new ConcurrentHashMap<>();
this.maxIdMap = new ConcurrentHashMap<>();
}

public String generateId(String prefix) {
AtomicLong currentId = currentIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));
AtomicLong maxId = maxIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));

long id = currentId.getAndIncrement();
if (id >= maxId.get()) {
synchronized (maxId) {
if (id >= maxId.get()) {
fetchNextIdRange(prefix, maxId);
id = currentId.getAndIncrement();
}
}
}

// Prefetch next ID range if necessary
long remainingIds = maxId.get() - currentId.get();
if (remainingIds <= prefetchThreshold) {
fetchNextIdRange(prefix, maxId);
}

return prefix + "_" + id;
}

private void fetchNextIdRange(String prefix, AtomicLong maxId) {
synchronized (maxId) {
// Double-check in case another thread already fetched the range
if (currentIdMap.get(prefix).get() < maxId.get()) {
return;
}
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(idBatchSize);
maxId.set(newMaxId);
}
}
}

image.png

在上述代码中,我们通过 @Value 注解从配置文件中读取批次大小和预取阈值,使其更加灵活。同时,我们为 fetchNextIdRange 方法添加了一个额外的参数 maxId,以便在预取时直接操作 AtomicLong 对象,而不是每次都从 maxIdMap 中查询。

此外,为了应对分布式环境中的潜在问题,我们可以添加重试逻辑和故障恢复机制。例如,如果与 Redis 的交互失败,可以实现一个重试策略,重试几次后仍然失败则记录错误日志或触发告警。

监控和告警可以通过外部监控系统(如 Prometheus、Grafana)或内部监控机制(如 Spring Boot Actuator、Micrometer)来实现,以确保及时发现和解决问题。

最后,如果需要进一步提高性能,可以考虑使用 Redis 的 Lua 脚本来减少网络往返时间,并原子地执行 ID 范围的获取和更新操作。

为了使用这个 ID 生成器,你可以在你的服务中注入 DistributedIdGenerator 并调用 generateId 方法:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Service
public class SomeService {

@Autowired
private DistributedIdGenerator idGenerator;

public void doSomething() {
// 生成一个带有自定义前缀 "xx" 的分布式 ID
String distributedId = idGenerator.generateId("xx");
// 使用生成的 ID 进行业务操作
}
}

请注意,为了使上述示例工作,你需要配置和提供一个 RedissonClient 实例。这通常在你的 Spring 配置类中完成,如下所示:

1
2
3
4
5
6
7
8
9
10
java复制代码@Configuration
public class RedissonConfig {

@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}

确保 Redis 服务器正在运行,并且配置中的地址和端口与你的 Redis 服务器设置相匹配。以上代码提供了一些基本的分布式 ID 生成器,可以根据自己的业务需求进行调整和扩展。

SnowflakeId 分布式ID:

理论替换方案:

如果没有redis,可以考虑以下替代方案:

  1. UUID (Universally Unique Identifier):
    UUID 是一个很好的选择,因为它可以在本地生成,不需要网络通信,并且碰撞的概率极低。UUID 是 128 位的值,通常以 36 个字符(包括 4 个破折号)的字符串形式表示。
1
2
3
4
5
6
7
java复制代码 import java.util.UUID;

public class UUIDGenerator {
public static String generateUUID() {
return UUID.randomUUID().toString();
}
}
  1. Snowflake 算法:
    Twitter 的 Snowflake 算法是另一个流行的分布式 ID 生成方法。它生成一个 64 位的长整型 ID,其中包含时间戳、数据中心 ID、机器 ID 和序列号。为了使用 Snowflake 算法,你需要确保每个机器或服务实例有唯一的数据中心 ID 和机器 ID。

你可以使用现成的 Snowflake 实现,或者根据 Snowflake 的原理自己实现一个。
3. 数据库序列:
如果你的应用已经使用了关系数据库,你可以依赖数据库的序列功能来生成唯一 ID。大多数现代关系数据库都支持序列,虽然这种方法会引入数据库作为中心化依赖。
4. 分布式唯一 ID 生成器服务:
如果你的系统架构允许,可以部署一个独立的分布式 ID 生成器服务,如 Twitter 的 Snowflake、Sony 的 Flake 或者 Boundary 的 Flake。这些服务可以独立于你的主应用运行,并通过网络调用来生成 ID。
5. 组合策略:
在某些情况下,你可以组合使用以上方法。例如,使用机器 ID 或 IP 地址作为前缀,再加上本地生成的 UUID 或时间戳和随机数的组合。

SnowflakeId解决方案:

简化版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码
public class SnowflakeIdGenerator {

private final long twepoch = 1288834974657L;
private final long workerIdBits = 5L;
private final long datacenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
private final long sequenceBits = 12L;

private final long workerIdShift = sequenceBits;
private final long datacenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

private long workerId;
private long datacenterId;
private long sequence = 0L;
private long lastTimestamp = -1L;

public SnowflakeIdGenerator(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("Worker ID can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("Datacenter ID can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}

public synchronized long nextId() {
long timestamp = timeGen();

if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}

lastTimestamp = timestamp;

return ((timestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}

private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}

private long timeGen() {
return System.currentTimeMillis();
}

public static void main(String[] args) {
SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(0, 0);
for (int i = 0; i < 1000; i++) {
long id = idGenerator.nextId();
System.out.println(id);
}
}
}

在这个实现中,我们创建了一个 SnowflakeIdGenerator 类,它需要两个参数:workerId 和 datacenterId。这两个参数用于在分布式环境中区分不同的工作节点和数据中心。

Snowflake 算法生成的是一个 64 位的长整型 ID,其中包含:

  • 1 位未使用(因为 Java 中的长整型是有符号的)
  • 41 位时间戳(毫秒级)
  • 5 位数据中心 ID
  • 5 位工作节点 ID

优化版:

SnowflakeIdGenerator 类以更好地处理分布式和高并发的情况,我们可以考虑以下几点:

  1. 确保时钟回拨问题的处理:在分布式系统中,时钟回拨是一个常见问题。我们需要确保在时钟回拨时,ID 生成器能够正常工作或者抛出异常来避免生成重复的 ID。
  2. 使用原子操作:使用 AtomicLong 来替代 synchronized 方法,以提高并发性能。
  3. 确保每个实例的唯一性:在分布式环境中,每个实例都应该有一个唯一的工作节点 ID 和数据中心 ID。这通常可以通过配置文件或环境变量来设置。

下面是优化后的 SnowflakeIdGenerator 类的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
java复制代码import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/
* @Author derek_smart
* @Date 202/4/26 10:25
* @Description SnowflakeId id 生成器
* <p>
*/
@Component
public class SnowflakeIdGenerator {

private final long twepoch = 1288834974657L;
private final long workerIdBits = 5L;
private final long datacenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
private final long sequenceBits = 12L;

private final long workerIdShift = sequenceBits;
private final long datacenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

private final long workerId;
private final long datacenterId;
private final AtomicLong sequence = new AtomicLong(0L);
private final AtomicLong lastTimestamp = new AtomicLong(-1L);

public SnowflakeIdGenerator(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("Worker ID can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("Datacenter ID can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}

//异步单个
public CompletableFuture<Long> nextIdAsync() {
return CompletableFuture.supplyAsync(this::nextId);
}
//批量
public List<Long> nextIds(int count) {
List<Long> ids = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
ids.add(nextId());
}
return ids;
}
//异步 多个
public CompletableFuture<List<Long>> nextIdsAsync(int count) {
return CompletableFuture.supplyAsync(() -> nextIds(count));
}

//获取
public long nextId() {
long currentTimestamp = -1L;
long lastTimestampValue = -1L;

while (true) {
lastTimestampValue = lastTimestamp.get();
currentTimestamp = timeGen();

if (currentTimestamp < lastTimestampValue) {
// Clock moved backwards, handle the exception
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestampValue - currentTimestamp));
}

if (currentTimestamp == lastTimestampValue) {
long currentSequence = sequence.incrementAndGet() & sequenceMask;
if (currentSequence == 0) {
currentTimestamp = waitNextMillis(currentTimestamp);
} else {
return generateId(currentTimestamp, currentSequence);
}
} else {
if (sequence.compareAndSet(sequence.get(), 0L)) {
return generateId(currentTimestamp, sequence.get());
}
}
}
}

private long generateId(long currentTimestamp, long sequence) {
return ((currentTimestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}

private long waitNextMillis(long currentTimestamp) {
while (currentTimestamp <= lastTimestamp.get()) {
currentTimestamp = timeGen();
}
lastTimestamp.set(currentTimestamp);
return currentTimestamp;
}

private long timeGen() {
return System.currentTimeMillis();
}
}

1714105622027.png

在这个优化版本中,我们使用 AtomicLong 替代了 synchronized 方法,以提高并发性能。nextId 方法现在是无锁的,使用原子操作来确保序列号的正确递增,并且在时钟回拨时抛出异常。
我们还添加了 waitNextMillis 方法来确保在同一毫秒内序列号用完时,能够等待到下一个毫秒值。generateId 方法是一个帮助方法,用于基于时间戳、数据中心 ID、工作节点 ID 和序列号生成 ID。

请注意,为了在分布式环境中使用此 ID 生成器,你需要确保每个实例都有唯一的工作节点 ID 和数据中心 ID,这通常可以通过配置文件或环境变量来设置。
此外,如果你的系统对时钟回拨非常敏感,可以添加额外的逻辑来处理这种情况,例如使用 NTP(Network Time Protocol)来同步服务器时间。

强健版:

最后我们可以实现批量生成和双缓冲策略及更加强健版本的BufferedSnowflakeIdGenerator

BufferedSnowflakeIdGenerator 并解决潜在的错误处理和资源管理问题,
我们可以使用 ExecutorService 来管理后台线程,以及添加额外的逻辑来确保在系统关闭时正确地关闭和清理这些线程。此外,我们将添加异常处理来确保在填充缓冲区时发生的任何异常都能被捕获和处理。以下是优化后的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
java复制代码import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/
* @Author derek_smart
* @Date 202/4/26 10:45
* @Description 双重缓存 SnowflakeId id 生成器
* <p>
*/
@Component
public class BufferedSnowflakeIdGenerator {

private final Lock bufferLock = new ReentrantLock();
private final SnowflakeIdGenerator snowflakeIdGenerator;
private AtomicReference<long[]> bufferOne;
private AtomicReference<long[]> bufferTwo;
private AtomicLong bufferIndex;
private AtomicReference<long[]> currentBuffer;
private final ExecutorService executorService;

private final int bufferSize;

private String customPrefix;

public BufferedSnowflakeIdGenerator(long workerId, long datacenterId, int bufferSize) {
this.snowflakeIdGenerator = new SnowflakeIdGenerator(workerId, datacenterId);
this.bufferSize = bufferSize;
this.bufferOne = new AtomicReference<>(new long[bufferSize]);
this.bufferTwo = new AtomicReference<>(new long[bufferSize]);
this.bufferIndex = new AtomicLong(-1);
this.currentBuffer = new AtomicReference<>(bufferOne.get());
this.executorService = Executors.newSingleThreadExecutor();

// Initial buffer fill
fillBuffer(bufferOne.get());
fillBuffer(bufferTwo.get());
}

public BufferedSnowflakeIdGenerator(long workerId, long datacenterId, int bufferSize, String customPrefix) {
this.snowflakeIdGenerator = new SnowflakeIdGenerator(workerId, datacenterId);
this.bufferSize = bufferSize;
this.bufferOne = new AtomicReference<>(new long[bufferSize]);
this.bufferTwo = new AtomicReference<>(new long[bufferSize]);
this.bufferIndex = new AtomicLong(-1);
this.currentBuffer = new AtomicReference<>(bufferOne.get());
this.executorService = Executors.newSingleThreadExecutor();
this.customPrefix = customPrefix;
// Initial buffer fill
fillBuffer(bufferOne.get());
fillBuffer(bufferTwo.get());
}


private void fillBuffer(long[] buffer) {
try {
for (int i = 0; i < bufferSize; i++) {
buffer[i] = snowflakeIdGenerator.nextId();
}
} catch (Exception e) {
// Handle exceptions during buffer fill here
e.printStackTrace();
}
}

/
* 带自定义
* @return
*/
public String nextIdWithPrefix() {
return customPrefix + "_" + nextId();
}

/*
* 不带自定义表示
* @return
*/
public long nextId() {
long index = bufferIndex.incrementAndGet();
if (index < bufferSize) {
return currentBuffer.get()[(int) index];
} else {
bufferLock.lock();
try {
// Double check inside the lock
index = bufferIndex.get();
if (index >= bufferSize) {
currentBuffer.set(currentBuffer.get() == bufferOne.get() ? bufferTwo.get() : bufferOne.get());
bufferIndex.set(0);
index = 0;
// Trigger buffer refill asynchronously
long[] bufferToFill = currentBuffer.get() == bufferOne.get() ? bufferTwo.get() : bufferOne.get();
executorService.submit(() -> fillBuffer(bufferToFill));
}
} finally {
bufferLock.unlock();
}
return currentBuffer.get()[(int) index];
}
}

public void shutdown() {
executorService.shutdown();
// Optionally add code to wait for termination and handle tasks that could not be executed
}

// Ensure to call shutdown when the application stops
protected void finalize() throws Throwable {
try {
shutdown();
} finally {
super.finalize();
}
}
}

1714105790485.png

BufferedSnowflakeIdGenerator包含两个缓冲区bufferOne和bufferTwo,以及一个指示当前使用的缓冲区的 currentBuffer。bufferIndex` 跟踪当前缓冲区中的位置。

nextId 方法检查 bufferIndex 是否在 bufferSize 范围内。如果是,它返回当前缓冲区中的 ID。如果 bufferIndex 超出范围,则尝试通过锁来切换缓冲区,并在另一个线程中异步填充已耗尽的缓冲区。

使用了 ExecutorService 来管理后台线程,这样我们就可以更优雅地处理线程的生命周期。fillBuffer 方法包含了异常处理逻辑,以便在填充缓冲区时捕获并处理任何异常。

我们还添加了 shutdown 方法来关闭 ExecutorService。这个方法应该在应用程序停止时被调用,以确保所有后台任务都被正确清理。为了防止忘记调用 shutdown 方法,我们在类的 finalize 方法中也调用了 shutdown,这样即使忘记了显式调用,垃圾收集器在回收对象时也会尝试关闭线程池。

请注意,依赖 finalize 方法来关闭资源并不是一种推荐的做法,因为 finalize 方法的调用时机是不确.

关于上面的各类id 生成,可以采用工厂模式来创建 ID 生成器的实例,并且使用单例模式确保每个节点上 ID 生成器的唯一性。同时,我们可以使用策略模式来允许在不同的 ID 生成策略(如 Snowflake、UUID 等)之间灵活切换。

扩展设计模式:

上面都是各种实现类,完全可以使用工厂设计模式将其统一处理。只是一个简化版。

以下是结合设计模式的分布式 ID 生成器的代码实现:
首先,定义一个 ID 生成策略接口:

1
2
3
java复制代码public interface IdGenerationStrategy {
long generateId();
}

然后,实现 Snowflake 算法作为一个策略:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class SnowflakeStrategy implements IdGenerationStrategy {

// ... (其他成员变量和构造函数保持不变)

@Override
public long generateId() {
return nextId();
}

// ... (nextId, waitNextMillis, timeGen, generateId 方法保持不变)
}

接下来,如果需要,也可以实现一个基于 UUID 的策略:

1
2
3
4
5
6
7
8
9
10
11
java复制代码import java.util.UUID;

public class UUIDStrategy implements IdGenerationStrategy {

@Override
public long generateId() {
// UUIDs are 128-bit values, but we need to return a long (64-bit)
// This can be done by taking the hash of the UUID for example
return UUID.randomUUID().toString().hashCode();
}
}

现在,创建一个 ID 生成器工厂,它根据配置或环境来实例化相应的策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class IdGeneratorFactory {

private static IdGenerationStrategy idGeneratorStrategy;

public static synchronized IdGenerationStrategy getIdGeneratorStrategy() {
if (idGeneratorStrategy == null) {
// The choice of strategy can be made configurable
// For example, based on system properties, configuration files, etc.
String strategyType = System.getProperty("id.generator.strategy", "snowflake");
if ("snowflake".equals(strategyType)) {
long workerId = ...; // Retrieve from configuration
long datacenterId = ...; // Retrieve from configuration
idGeneratorStrategy = new SnowflakeStrategy(workerId, datacenterId);
} else if ("uuid".equals(strategyType)) {
idGeneratorStrategy = new UUIDStrategy();
} else {
throw new IllegalArgumentException("Unknown ID Generator strategy type");
}
}
return idGeneratorStrategy;
}
}

最后,客户端代码可以通过工厂获取 ID 生成器实例,并生成 ID:

1
2
3
4
5
6
7
8
java复制代码public class IdGenerationClient {

public static void main(String[] args) {
IdGenerationStrategy idGenerator = IdGeneratorFactory.getIdGeneratorStrategy();
long id = idGenerator.generateId();
System.out.println("Generated ID: " + id);
}
}

在这个设计中,IdGeneratorFactory 确保了在每个节点上 ID 生成器的唯一性,并且提供了一个全局访问点。IdGenerationStrategy 接口和它的实现类 SnowflakeStrategy 以及 UUIDStrategy 允许在不同的 ID 生成策略之间灵活切换。

这种设计模式的组合提供了良好的扩展性和可维护性,使得添加新的 ID 生成策略变得非常简单,只需要实现 IdGenerationStrategy 接口即可。同时,它也支持分布式环境,因为可以通过配置为不同的节点分配不同的工作节点 ID 和数据中心 ID,从而使得在 Snowflake 策略中生成的 ID 具有全局唯一性。
以上就是分布式id 实现。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Oracle 数据迁移同步优化(三)

发表于 2024-04-26

简述

CloudCanal 最近再次对其 Oracle 源端数据同步进行了一系列优化,这些优化基于用户在真实场景中的反馈,具备很强的生产级别参考意义。

本文将简要介绍这些优化项,希望带给读者一些收获。

  • 增量事件 SCN 乱序问题
  • MISSING_SCN 事件干扰
  • 新增的归档日志消费模式

优化点

增量事件 SCN 乱序问题

Oracle 源端 Logminer 数据同步原理大致如下:

  • 获取所有包含当前 SCN 位点的 Redo 或 Archive 日志文件,并添加到 Logminer 中
  • 计算本次需要分析的 SCN 范围(START_SCN, END_SCN)
  • Logminer 对于 SCN 范围进行日志分析,分析结果展现在 V$LOGMNR_CONTENTS 视图中
  • 扫描 V$LOGMNR_CONTENTS 视图,转换处理后同步到目标端

image.png

老版本 CloudCanal 扫描 V$LOGMNR_CONTENTS 视图时指定了 SCN 范围进行查询,但在实际客户场景中偶发 SCN 乱序问题。

同时 Oracle 官方也建议查询视图时不要进行过多的范围过滤或排序处理,以避免查询结果乱序。

因此我们首先 进行了 2 个优化 ,以此解决该问题:

  • 扫描 V$LOGMNR_CONTENTS 视图时直接查询所有记录,其 SCN 范围完全依赖于 Logminer 所指定的文件
  • 设定 Logminer 分析的步长参数(logMiningScnStep)控制分析性能

MISSING_SCN 事件干扰

使用 Logminer 分析 Redo 日志时,有时会出现 MISSING_SCN 事件,老版本 CloudCanal 遇到该事件则会忽略,但这会导致事件漏扫从而丢数据。

MISSING_SCN 事件具体意义为

  • Logminer 分析 Redo 日志时,由于日志切换或其他特殊情况,导致部分 SCN 事件没有被 Logminer 分析到,因此在 V$LOGMNR_CONTENTS 视图中体现为
    MISSING_SCN。

因此我们做了 第 3 个优化,当遇到 MISSING_SCN 事件时采取一定的策略规避漏扫问题,具体动作为:

  • 停止扫描,回退当前 SCN
  • 根据当前 SCN 重新分析和消费日志文件

image.png

重新分析后,缺失的 SCN 记录会被 Logminer 分析到,并且此类型事件出现频率较小,因此对同步效率影响非常小。

归档日志消费模式

Logminer 分析 Redo 日志时,如果 END_SCN 与最新 SCN 接近,可能会导致部分 SCN 无法被 Logminer 分析,从而出现数据丢失。

这种情况难以避免,因为很难在 Logminer 层面确定是否有 SCN 被漏掉。

CloudCanal 老版本通过设置 fallBackScnStep 参数与最新的 SCN 保持一定距离,这种做法虽牺牲了一部分实时性,但换取了数据的准确性,而该方式和 只消费归档日志模式 有一定的相似性。

归档日志不会再发生变化,从而能够保证 Logminer 分析的准确性,对于不太注重实时性的业务(比如日报),这是一个可接受的方式(增量同步的好处不光只是实时性)。

CloudCanal 第 4 个优化 即增加了只消费归档日志模式(参数:archiveLogOnlyMode)。

在该模式下, 同步任务会根据 Archive 日志文件 + SCN 双位点 的方式,以 Archive 生成的时间顺序逐个消费,这样可以保证不漏扫任何一个 Archive 文件。

image.png

未来展望

优化性能

本次优化侧重于数据的准确性,优化了 SCN 乱序问题、MISSING_SCN 问题,但部分高并发场景回退 SCN 可能会导致性能下降。

所以优化性能是后续 CloudCanal Oracle 数据同步重要的一个方向。

数据订正能力

Oracle 部署形态多样,用户场景不一,数据类型复杂,在做足事前防范工作之后,事后如何补救也是非常重要的能力。

借助 CloudCanal 数据校验订正体系,后续丰富和优化 Oracle 源端数据校验和订正能力是一个重要的工作。

总结

本篇文章主要介绍 CloudCanal 对于 Oracle 源端数据同步的深度优化,希望对读者有所帮助。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 21 虚拟线程使用与最佳实现 前言 创建虚拟线程的

发表于 2024-04-26

前言

虚拟线程(Virtual Thread)(Go语言里叫协程),是Java 19引入的一种轻量级线程,在Java21 转正。

在理解虚拟线程前,我们先回顾一下线程的特点:

  • 线程是由操作系统创建并调度的资源;
  • 线程切换会耗费大量CPU时间;
  • 一个系统能同时调度的线程数量是有限的,通常在几百至几千级别。

因此,我们说线程是一种重量级资源,我们为了增加系统的吞吐量,要不断增加线程的数量,但机器的线程是昂贵的、可用线程数量也是有限的。即使我们使用了各种线程池来最大化线程的性价比,但是线程往往会在 CPU、网络或者内存资源耗尽之前成为我们应用程序的性能提升瓶颈,不能最大限度的释放硬件应该具有的性能。

虚拟线程就是为了解决以上问题,最大限度释放硬件性能,但虚拟线程最适合具有高延迟的任务,例如 I/O 操作、等待锁或线程将花费大量时间等待的任何其他操作,而需要连续计算的CPU密集型场景,并不适合虚拟线程。

以下是平台线程与虚拟线程的关系图
image.png

创建虚拟线程的方式

三种方式效果均一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码/**
* 方式1:传入Runnable实例并立刻运行
*/
@Test
public void test1() {
// 方式1:传入Runnable实例并立刻运行:
Thread vt = Thread.startVirtualThread(() -> {
System.out.println("Start virtual thread...");
});

//这样也行
Thread.ofVirtual().start(() -> {
System.out.println("Start virtual thread...");
});
}

/**
* 方式2:传入Runnable实例并后置运行
*/
@Test
public void test2() {
Thread vt = Thread.ofVirtual().unstarted(() -> {
System.out.println("Start virtual thread...");
});
vt.start();
}

/**
* 方式3:创建ThreadFactory
*/
@Test
public void test3() {
ThreadFactory tf = Thread.ofVirtual().factory();
Thread vt = tf.newThread(() -> {
System.out.println("Start virtual thread...");
});
vt.start();
}


/**
* 方式4:使用ExecutorService (但是每次submit仍然是新创建虚拟线程 java.util.concurrent.ThreadPerTaskExecutor)
*/
@Test
public void test4() {
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

executor.submit(() -> {
System.out.println("Start virtual thread...");
return true;
});
}

使用虚拟线时还需要线程池吗

答案是,不需要,使用虚拟线程时,每次都创建新的即可,以下是官方文档描述

Do not pool virtual threads

Developers will typically migrate application code to the virtual-thread-per-task ExecutorService from a traditional thread-pool based ExecutorService. A thread pool, like any resource pool, is intended to share expensive resources, but virtual threads are not expensive so there is never a need to pool them.

Developers sometimes use thread pools to limit concurrent access to limited resources. For example, if a service cannot handle more than 20 concurrent requests then making all requests to the service via tasks submitted to a thread pool of size 20 will ensure that. This idiom has become ubiquitous because the high cost of platform threads has made thread pools ubiquitous, but do not be tempted to pool virtual threads in order to limit concurrency. Instead use constructs specifically designed for that purpose, such as semaphores.

In conjunction with thread pools, developers sometimes use thread-local variables to share expensive resources among multiple tasks that share the same thread. For example, if a database connection is expensive to create then you can open it once and store it in a thread-local variable for later use by other tasks in the same thread. If you migrate code from using a thread pool to using a virtual thread per task, be wary of usages of this idiom since creating an expensive resource for every virtual thread may degrade performance significantly. Change such code to use alternative caching strategies so that expensive resources can be shared efficiently among a very large number of virtual threads.

这里有一篇更详细的例子说明:zhuanlan.zhihu.com/p/671154148

最佳实现

  1. 若IO密集型的项目中使用到CompletableFuture,可以直接将自定义线程池替换成Executors.newVirtualThreadPerTaskExecutor()
  2. 使用虚拟线程时,synchronized 改为 ReentrantLock,以减少虚拟线程被固定到平台线程
  3. 使用虚拟线程时,不需要池化
  4. 虚拟线程池适合IO密集型应用,CPU密集型还是需要用平台线程池
  5. 结合java21的结构化并发写法,代码更具可读性,StructuredTaskScope底层用的就是虚拟线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码try (var scope = new StructuredTaskScope<>()) {
// 使用fork方法派生线程来执行子任务
StructuredTaskScope.Subtask<String> future1 = scope.fork(() -> "111");
StructuredTaskScope.Subtask<Integer> future2 = scope.fork(() -> 222);

// 等待线程完成
scope.join();
// 结果的处理可能包括处理或重新抛出异常

System.out.println(future1.get());
System.out.println(future2.get());

} // close
catch (InterruptedException e) {
throw new RuntimeException(e);
}

参考文章:
mp.weixin.qq.com/s/yyApBXxpX…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis成长记 - Redis的陷阱(一) 缓存穿透 缓存

发表于 2024-04-26

相信很多老铁在求职过程中都看到过类似下面这样的任职要求

image.png

你申请的岗位上面写着”熟悉Redis”,那么你已经准备好回答面试官可能会问到的问题了么?
后面我将开启一个针对Redis的系列分享,希望能帮助刚刚开始学习Redis的朋友们。

在开始阅读本篇文章之前,默认你已经具备基础的Redis知识,如果你没有,可以先阅读文末相关文章推荐

当使用 Redis 作为缓存或数据存储时,虽然它提供了高性能和灵活性,但也存在一些陷阱需要注意。之前看博客的时候看到过这样一句话”Experts aren’t the only people who know what to do. They’re the people who know what not to do.“ 专家并不是唯一知道如何做的人,他们只是知道如何避免一些陷阱。

下面讲诉的是一些常见的 Redis 陷阱,或者说容易忽略的问题。内容较多,可能会分多篇文章,尽情期待。
同时由于要讲的内容实在是太多,所以本文更多的只是起到”抛砖“的作用,更多的详细的内容还需要老铁们自己再深层次的去学习。

缓存穿透

缓存穿透指的是恶意请求或者大量不存在的 key 导致缓存无法命中,从而绕过缓存直接访问数据库,导致数据库压力过大,甚至宕机的情况。

image.png

缓存穿透的原因

缓存穿透通常发生在以下情况下:

  1. 恶意请求:攻击者发送大量不存在于缓存中的 key,导致缓存无法命中,直接访问数据库。
  2. 大量并发查询:当并发查询量很大时,可能会出现大量不存在于缓存中的 key,从而导致缓存穿透。

缓存穿透的影响

  • 数据库压力过大:大量无效请求直接访问数据库,导致数据库压力过大,甚至导致数据库宕机。
  • 系统性能下降:数据库压力增大,可能导致系统响应变慢,影响用户体验。

缓存穿透的解决方法

  1. 空对象缓存:当查询结果为空时,也将该空结果缓存起来,但设置一个较短的过期时间,防止攻击者利用缓存穿透问题。
  2. 布隆过滤器:在缓存层之前增加布隆过滤器,用于快速过滤掉不存在于缓存中的 key,从而避免缓存穿透。
  3. 热点数据预热:将热点数据提前加载到缓存中,提高命中率,减少缓存穿透的发生。
  4. 限流控制:对于频繁查询的接口,可以进行限流控制,防止攻击者发起大量无效请求。
  5. 使用缓存锁:在查询数据库时,使用缓存锁进行串行化处理,防止大量并发查询导致缓存穿透。

下面是一个使用 C# 空对象缓存的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
C#复制代码public class UserBll
{
public static readonly int CACHE_NULL_TTL = 10;
public static readonly int CACHE_TTL = 20;
/**
* 缓存穿透*
* @param id
* @ return
*/
public WebUserInfo QueryUser(string key)
{
var redis = Redis.GetDatabase(0);
// 1.从redis中查询store缓存
String value = redis.StringGet(key);
WebUserInfo user = null;
// 2.判断是否存在
if( value.HasValue() ) {
user = value.FromJson<WebUserInfo>();
// 3.存在,直接返回
return user;
}
// 判断命中是否为空值
if ( value == null) {
// 返回错误信息
return null;
}
// 4.不存在,根据id查询数据库
user = DatabaseQuery(key);
// 5. 不存在,返回错误
if( user == null ) {
// 向redis写入空值(缓存穿透)
redis.StringSet(key, "", new TimeSpan(0, CACHE_NULL_TTL, 0));
return null;
}
// 6.存在,写入redis
redis.StringSet(key, user.ToJson(), new TimeSpan(0, CACHE_TTL, 0));
// 7. 返回
return user;
}


public WebUserInfo DatabaseQuery(string key)
{
// 模拟从数据库中查询数据
// 实际情况下,这里可以是访问数据库、调用外部 API 等操作
// 这里简化为返回 null
return null;
}


}

class Program
{
static void Main(string[] args)
{
UserBll cache = new UserBll();

// 第一次查询,缓存中不存在,但是查询结果为空
string result1 = cache.QueryUser("key1");
Console.WriteLine("Result 1: " + result1); // Output: Result 1: (null)

// 第二次查询,缓存中已存在空对象缓存,直接返回空值
string result2 = cache.QueryUser("key1");
Console.WriteLine("Result 2: " + result2); // Output: Result 2: (null)

// 第三次查询,模拟数据库中存在对应值的情况
string result3 = cache.QueryUser("key2");
Console.WriteLine("Result 3: " + result3); // Output: Result 3: <value from database>
}
}

上述代码中,当用户请求一个key时,redis和数据库都不存在。我们直接将key对应的null值缓存到redis中,这样下次用户重复请求这个key的时候,redis就可以命中(hit null),只是不会询问数据库

  • 优点:实现简单,易于维护
  • 缺点:额外的内存消耗(可以通过添加TTL来解决)

同时可能会造成短暂的不一致(控制TTL时间可以在一定程度上缓解)。当null被缓存时,我们只是在数据库中设置值,而用户query为空,但数据库中实际存在,会造成不一致(可以通过插入数据时自动覆盖之前的空数据来解决)

缓存雪崩

缓存雪崩指的是在缓存失效的瞬间,大量的请求同时涌入数据库或其他数据源,导致数据库负载剧增,甚至造成数据库宕机的情况。

image.png

缓存雪崩的原因

缓存雪崩通常是由于缓存中的大量数据同时失效而引起的。当多个缓存键具有相同的失效时间,并且这些缓存键又在同一时间失效时,就会导致大量请求直接击穿缓存,同时涌入数据源,造成缓存雪崩

缓存雪崩的解决方案

1. 设置随机过期时间

通过给缓存键设置随机的过期时间,可以有效地分散缓存失效的时间点,降低大量缓存同时失效的可能性,从而减轻了缓存雪崩的风险。

2. 使用多级缓存策略

采用多级缓存架构,包括本地缓存、分布式缓存和持久化存储,当主缓存失效时,可以从备用缓存中获取数据,降低对数据库的直接访问。

3. 限流和降级

在缓存失效期间,可以对请求进行限流,降低请求的并发数量,从而减轻了数据库的压力。同时,可以对部分非关键请求进行降级处理,暂时屏蔽一些非必要的服务,保证核心服务的稳定性。

4. 预热缓存

在系统启动或低峰期,预先加载缓存数据,提前将常用数据缓存起来,避免在高峰期间大量请求直接击穿缓存。

缓存击穿

缓存击穿是指某个热点key突然失效或者未命中,导致大量请求直接访问数据库,造成数据库压力剧增的现象。这种情况通常发生在具有高并发访问量的系统中,特别是在缓存系统中使用了较短的过期时间或者热点数据的访问频率突然增加时。

image.png

  1. 设置热点数据永不过期: 对于一些热点数据,可以设置永不过期,或者设置较长的过期时间,保证其不会在短时间内失效,从而避免了缓存击穿的发生。
  2. 加锁机制: 在缓存失效时,可以通过加锁机制确保只有一个线程能够进入数据库查询数据,并将查询结果更新到缓存中,避免了多个线程同时查询数据库的情况。
  3. 使用互斥锁和分布式锁: 使用互斥锁或者分布式锁来保证在查询数据库的过程中,只有一个线程能够执行查询操作,其他线程需要等待锁释放后再进行查询,避免了并发访问数据库的情况。
  4. 使用缓存预热: 在系统启动或者低峰期,可以预先将热点数据加载到缓存中,提前减少了缓存失效时的并发请求量,从而避免了缓存击穿的发生。

下面是一个用C#实现的示例代码,演示了如何使用互斥锁来解决缓存击穿问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
C#复制代码using System;
using System.Collections.Generic;
using System.Threading;

public class Cache
{
private Dictionary<string, string> cache = new Dictionary<string, string>();
private Mutex mutex = new Mutex();

public string Get(string key)
{
// 先尝试从缓存中获取数据
string value;
if (cache.TryGetValue(key, out value))
{
return value;
}

// 如果缓存中不存在,加锁查询数据库
mutex.WaitOne();
try
{
// 再次检查缓存,防止多个线程同时查询数据库
if (cache.TryGetValue(key, out value))
{
return value;
}

// 模拟从数据库中查询数据
value = QueryFromDatabase(key);

// 将查询结果更新到缓存中
cache[key] = value;
}
finally
{
mutex.ReleaseMutex();
}

return value;
}

private string QueryFromDatabase(string key)
{
// 模拟从数据库中查询数据的过程
Thread.Sleep(100); // 模拟耗时查询操作
return "value for " + key;
}
}

class Program
{
static void Main(string[] args)
{
Cache cache = new Cache();

// 并发查询
List<Thread> threads = new List<Thread>();
for (int i = 0; i < 10; i++)
{
Thread thread = new Thread(() =>
{
string value = cache.Get("key");
Console.WriteLine(Thread.CurrentThread.Name + ": " + value);
});
thread.Name = "Thread " + i;
threads.Add(thread);
}

foreach (Thread thread in threads)
{
thread.Start();
}

foreach (Thread thread in threads)
{
thread.Join();
}
}
}

相关文章推荐

  • Redis官方文档
  • Redis教程

今天就不总结了,未完待续😪…

更多一手讯息,可关注公众号:ITProHub

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

java多线程面试——新版api

发表于 2024-04-26

在Android开发的面试中,Java多线程的问题是绕不开的。这个系列主要介绍面试过程中涉及到的多线程的知识点,以及相关的面试题。这是本系列的第三篇,介绍Java中多线程的新版api,及其对应的常见的面试题。

  • Java多线程面试系列——为什么需要多线程 - 掘金 (juejin.cn)
  • java多线程面试——旧版api - 掘金 (juejin.cn)
  • java多线程面试——新版api - 掘金 (juejin.cn)

ReteenLock

ReteenLock 是 java 1.5 以后提出的加锁API。使用它加锁解锁非常简单,只需要调用 lock、 unlock
方法

1
2
3
4
5
java复制代码private Lock lock = new ReentrantLock();
lock.lock();//加锁
//执行一些操作
...
lock.unlock();//解锁

除此之外,ReteenLock 还提供了 lockInterruptibly 和 tryLock 的不同的加锁接口,对加锁操作进行更细致的控制。

  • lockInterruptibly

lockInterruptibly 可以中断等待锁的线程。当线程调用 lockInterruptibly ,没有获取锁时,线程处于等待锁的状态,这时就可以通过 interrupt 方法来中断线程的等待状态。注意使用 lock 方法、synchronized 关键字是不能中断等待或者阻塞状态的。

  • tryLock
1
2
3
4
java复制代码//直接尝试获取锁,没有获取则直接返回
boolean tryLock();
//在一定时间尝试获取锁,否则等待获取锁,超时会返回 false
boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;

tryLock 来尝试获取锁,如果没有获取到锁会返回 false。我们也可以设置规定时间内等待获取锁。

1
2
3
4
5
6
7
8
csharp复制代码Lock lock = new ReentrantLock();  
if(lock.tryLock()) {
//执行一些操作
...
lock.unlock();
} else {
//未获取锁的操作
}

await 、signal 和 signalAll

1
2
3
4
5
6
7
java复制代码Condition condition = lock.newCondition();  
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
condition.signalAll();

在新版java多线程的api中,我们可以通过 Condition 的 await 和 signal 方法来等待和唤醒线程。await 和 wait 对应,signal 和 notify 对应。调用 await 方法会让线程进入等待状态(WAITING),同时释放掉锁。如果需要唤醒线程,需要调用 signal 或者 signalAll 方法。signal 方法是随机唤醒一个线程,而 signalAll 方法是会唤醒所有等待线程。

面试题:Synchronized的原理以及与ReentrantLock的区别。

  • ReenTrantLock 可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。
  • ReenTrantLock 提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
  • ReenTrantLock 通过 lock.lockInterruptibly() 提供了一种能够中断等待锁的线程的机制

ReentrantReadWriteLock 和 StampedLock

ReentrantReadWriteLock 是读写锁。解决的场景是:读多写少。当没有写操作时,多线程读取数据,此时加锁会影响并发的性能。这时就需要读写锁,它有两个特点:

  1. 当线程写操作时,其他线程不能读取
  2. 当线程读操作时,允许其他线程读操作,但是不能写操作

代码示例如下:

1
2
3
java复制代码ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock(); //获取读锁
Lock writeLock = readWriteLock.writeLock();//获取写锁

StampedLock 和 ReentrantReadWriteLock 类似。它特殊的地方是,它可以乐观读,即当线程读取共享变量时,其他线程可以写共享变量。而这在 ReentrantReadWriteLock 中是不被允许的。

代码示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码/*
*使用StampedLock读操作模板
*/
final StampedLock sl = new StampedLock();
// 乐观读 ,获取 stamp 版本号
long stamp = sl.tryOptimisticRead();
// 读取共享变量,并用局部变量保存
......
// 校验 stamp 版本号
if (!sl.validate(stamp)){
// 如果失败,则升级为悲观读锁
stamp = sl.readLock();
try {
// 再次读取共享变量,并用局部变量保存
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 执行业务操作
......



/*
*使用StampedLock写操作模板
*/
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}

StampedLock 虽然通过共享读提升了读多写少场景的性能,但是也提高了它的复杂度。需要注意的是:StampedLock 是不可重入的

Semaphore

Semaphore 是一个限制器,可以限制访问资源的线程数量。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码//限制只有四个线程可以访问
//第5个线程会被阻塞,直到其他线程release
Semaphore semaphore = new Semaphore(4);
try {
semaphore.acquire();
//执行操作
...
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
}

CountDownLatch 和 CyclicBarrier

CountDownLatch 和 CyclicBarrier 都可以解决多线程并发依赖的问题。它们的主要区别是:CountDownLatch 主要用来 解决一个线程等待多个线程的场景;而CyclicBarrier 解决的是一组线程之间互相等待的场景。

CountDownLatch

如上图所示,当我们需要判断当前两个用户是否是好友时,线程3需要依赖线程1和线程2的结果,才可以执行。这时就可以使用 CountDownLatch,代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
scss复制代码final CountDownLatch countDownLatch = new CountDownLatch(2);  
Runnable runnable1 = new Runnable() {
@Override
public void run() {
//获取用户的id
...
//计数减一
countDownLatch.countDown();
}
};
Runnable runnable2 = new Runnable() {
@Override
public void run() {
//获取其他用户的id
...
//计数减一
countDownLatch.countDown();
}
};
Runnable runnable3 = new Runnable() {
@Override
public void run() {
try {
//等待其他线程执行完成
countDownLatch.await();
//判断两个人是否是好友
...
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
new Thread(runnable1).start();
new Thread(runnable2).start();
new Thread(runnable3).start();

CyclicBarrier

如上图所示,CyclicBarrier 解决的是一组线程之间互相等待的场景,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {  
@Override
public void run() {
//计数归0时的回调
...
}
});
Runnable runnable1 = new Runnable() {
@Override
public void run() {
while (是否满足退出条件) {
try {
//执行操作
...
//阻塞线程,计数减少1
cyclicBarrier.await();
//执行操作
...
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
};

Runnable runnable2 = new Runnable() {
@Override
public void run() {
while (是否满足退出条件) {
try {
//执行操作
...
//阻塞线程,计数减少1
cyclicBarrier.await();
//执行操作
...
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
};
new Thread(runnable1).start();
new Thread(runnable2).start();

CyclicBarrier 类是通过 await 方法来让计数减一,同时会阻塞当前线程。通过这种方式,让不同线程步调保持一致,以此来实现一组线程之间的互相等待。

需要注意,CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。但是 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。

原子类

原子类.jpg

如上图,是在java 1.5 以后新增了原子类,这些类可以分成五种类型:基本类型、数组类型、对象引用类型、对象属性类型、累加器类型。

基本类型

基本类型的原子类有三个,分别是:AtomicBoolean、AtomicInteger、AtomicLong。它们的方法都是类似的,这里以 AtomicInteger 为例。AtomicInteger 对基本数据类型的 自增(加1操作),自减(减1操作)、以及加法操作(加一个数),减法操作(减一个数)进行了封装,保证这些操作是原子性操作。

AtomicInteger的方法 作用
int get() 获取当前值
void set(int newValue) 设置 value 值
int getAndIncrement() 先取得值,然后加1
int getAndDecrement() 先取得值,然后减1
int incrementAndGet() 加1,然后返回新值
int decrementAndGet() 减1,然后返回新值
int getAndAdd(int delta) 先取得值,然后增加指定值
int addAndGet(int delta) 增加指定值,然后返回新值
boolean compareAndSet(int expect, int update) 将旧值设置成新值(先要获取当前值)

数组类型

数组类型的原子类有三个,分别是:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。这里以 AtomicIntegerArray 为例,它的方法与上面的方法类似,只是多了index 的参数。

AtomicIntegerArray的方法 作用
int get(int index) 获取数组index位置的值
void set(int index, int newValue) 设置数组index位置的value 值
int getAndIncrement(int index) 获取数组index位置的值,然后加1
int getAndDecrement(int index) 数组index位置的值,然后减1
int incrementAndGet(int index) 让数组index位置的值加1,然后返回新值
int decrementAndGet(int index) 让数组index位置的值减1,然后返回新值
int getAndAdd(int index, int delta) 先获取数组index位置的值,然后增加指定值
int addAndGet(int index, int delta) 让数组index位置的值增加指定值,然后返回新值
boolean compareAndSet(int index, int expect, int update) 让数组index位置的值设置成新值(先要获取当前值)

代码示例如下:

1
2
3
4
java复制代码int[] v = new int[]{1, 2, 3};  
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(v);
int value = atomicIntegerArray.get(0);
System.out.println(atomicIntegerArray.compareAndSet(0, value, 100));

对象引用类型

对象引用类型的原子类有三个,分别是 AtomicReference、AtomicStampedReference、AtomicMarkableReference。AtomicReference 的方法相对于上面就少了很多,但是大致的功能是一样的。

AtomicReference的方法 作用
int get() 获取当前对象值
void set(T newValue) 设置对象值
int getAndSet(T newValue) 先取得对象值,然后设置新的对象值
boolean compareAndSet(T expect, T update) 将旧对象值设置成新对象值(先要获取当前值)

代码示例如下:

1
2
3
java复制代码Test test = new Test();  
AtomicReference<Test> atomicReference = new AtomicReference<>(test);
atomicReference.compareAndSet(atomicReference.get(), new Test());

AtomicStampedReference 和 AtomicMarkableReference 相对于 AtomicReference 的不同是,它们分别通过 Stamp(整数标记) 和 Mark(布尔标记) 解决了ABA问题。

对象属性类型

对象属性类型的原子类有三个,分别是 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。下面代码以 AtomicIntegerFieldUpdater 为例,其方法与 AtomicInteger 类似,区别是新增了对象入参。代码示例如下:

1
2
3
4
5
ini复制代码Test test = new Test();  
test.id = 0;
AtomicIntegerFieldUpdater<Test> updater = AtomicIntegerFieldUpdater.newUpdater(Test.class, "id");
//获取当前对象的id值,并加1
updater.getAndIncrement(test);

注意:对象属性类型的原子类只支持被 volatile 关键字修饰的可见成员属性

累加器类型

累加器类型的原子类有四个,分别是LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator。它们是java 1.8加入的,专门用来执行数值类型的数据累加操作,相对于 AtomicLong 性能更好。代码如下:

1
2
java复制代码LongAdder longAdder = new LongAdder();  
longAdder.increment();

LongAdder 和 LongAccumulator 的区别:LongAdder的功能增强版,它支持自定义的函数操作。DoubleAdder 和 DoubleAccumulator 的区别也一样。

并发容器

思维导图结构图.jpg

如上图,java新版的并发容器可以分成 List、Set、Map、Queue 四种类型。

List

对于List,新版Api只提供了 CopyOnWriteArrayList 这个并发容器。
Copy On Write(写时复制),意思就是在对其进行修改操作的时候,复制一个新的ArrayList,在新的ArrayList上进行修改操作,从而不影响旧的ArrayList的读操作。

详情看阿里Java面试官:CopyOnWriteArrayList底层是怎么保证线程安全的

Map

对于Map类型,有两个实现类,分别是 ConcurrentHashMap、ConcurrentSkipListMap。从应用的角度来看,主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。所以如果需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。

使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key和 value 都不能为空,否则会抛出NullPointerException这个运行时异常

Set

对于Set类型,有两个实现类,分别是 CopyOnWriteArraySet、ConcurrentSkipListSet。

  • CopyOnWriteArraySet:基于数组实现的并发 Set,内部是使用 CopyOnWriteArrayList 来实现的
  • ConcurrentSkipListSet:基于跳表实现的并发 Set,其内部是通过 ConcurrentSkipListMap 来实现的

Queue

对于Queue类型,有九个实现类,分别是 ArrayBlockingQueue、LinkedBlockingQueue、

SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue、
DelayQueue、LinkedBlockingDeque、ConcurrentLinkedQueue、ConcurrentLinkedDeque。

Queue.jpg

如上图所示,根据其数据结构方式和是否可以阻塞可以分成四种。Queue表示单端队列,遵循的先进先出的原则;Deque表示双端队列,该队列两端的元素既能入队,也能出队。阻塞指的是当队列已满时,入队操作阻塞,直到队列有空位才能插入;当队列已空时,出队操作阻塞,直到队列不为空才返回。非阻塞则是指入队出队操作不会阻塞,如果队列已满或者为空,(根据调用的方法)直接返回null或者报错。

队列 作用
ArrayBlockingQueue 基于数组的阻塞队列,使用数组存储数据,并需要指定其长度,所以是一个有界队列
LinkedBlockingQueue 基于链表的阻塞队列,使用链表存储数据,默认是一个无界队列;也可以通过构造方法中的capacity设置最大元素数量,所以也可以作为有界队列
SynchronousQueue 一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并且立刻消费
PriorityBlockingQueue 优先级别的阻塞队列,底层基于数组实现,是一个无界队列
DelayQueue 延迟队列,其中的元素只有到了其指定的延迟时间,才能够从队列中出队
LinkedTransferQueue 基于链表的数据交换队列,基于链表实现,是一个无界队列

线程池

java中自带的线程池

线程池 特点 获取方式
FixedThreadPool 线程数固定的线程池 Executors.newFixedThreadPool
CachedThreadPool 线程数根据任务动态调整的线程池 Executors.newCachedThreadPool
SingleThreadExecutor 只有一个线程的线程池 Executors.newSingleThreadExecutor()
ScheduledThreadPool 定时或周期性执行任务的线程池 Executors.newScheduledThreadPool()
SingleThreadScheduledExecutor 定时或周期性执行任务的线程池,但是它的线程数只有一个 Executors.newSingleThreadScheduledExecutor()

一般在业务中,我们不会使用java中自带的线程池,而是根据自己的需要自定义线程池。

ThreadPoolExecutor

自定义的线程池需要通过 ThreadPoolExecutor 来创建。ThreadPoolExecutor 的构造函数非常复杂,如下面代码所示:

1
2
3
4
5
6
7
8
arduino复制代码ThreadPoolExecutor(  
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数意义如下:

  • corePoolSize:核心线程数,由于频繁创建线程会对性能产生影响,因此就需要线程被创建后一直存在,这就是核心线程。Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,包括核心线程
  • maximumPoolSize:线程池创建的最大线程数。当核心线程都在执行任务时,还有任务需要处理,就会创建新的线程来处理,但是系统的资源不是无限的,因此需要限制最多创建的线程。
  • keepAliveTime :非核心线程的存在时间。当一个线程如果在一段时间内,都没有执行任务,就回收该非核心线程
  • unit :上面 keepAliveTime 的时间参数,有秒、分钟等
  • workQueue:阻塞队列(BlockingQueue),具体实现类上面已经介绍过了。当线程数达到最大时,这时还有任务来,就把任务放到这个任务队列中,等待处理。
  • threadFactory:自定义如何创建线程,例如可以给线程指定一个有意义的名字。
  • handler:自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。这时你可以通过 handler 这个参数来指定拒绝的策略

面试题:shutdown 、shutdownNow 的区别

  • shutdown() : 执行后停止接受新任务,会把队列的任务执行完毕。
  • shutdownNow() : 执行后停止接受新任务,但会中断所有的任务(不管是否正在执行中),将线程池状态变为 STOP状态。

面试题:当任务超过阻塞队列数量时,有哪些拒绝策略

ThreadPoolExecutor 已经提供了以下 4 种策略:

  • CallerRunsPolicy:提交任务的线程自己去执行该任务。
  • AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
  • DiscardPolicy:直接丢弃任务,没有任何异常抛出。
  • DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列

面试题:使用线程池要注意些什么

  • 不要使用无界的 LinkedBlockingQueue,在高负载情境下,无界队列很容易导致 OOM。很多Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,使用前需要特别注意。
  • 默认拒绝策略要慎重使用。当任务过多时,会有拒绝策略,最好针对业务的情况来自定义拒绝策略

面试题:自定义线程池的参数如何配置

根据任务的不同,推荐配置如下:

  • CPU密集型: cpu数量 + 1
  • IO密集型: cpu 数量 * 2

也可以使用动态线程池,可以看动态线程池

面试题:线程池都有哪些状态?

  • RUNNING:这是最正常的状态,接受新的任务,处理等待队列中的任务。
  • SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务。
  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。
  • TIDYING:所有的任务都销毁了,workCount 为 0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()。
  • TERMINATED:terminated()方法结束后,线程池的状态就会变成这个。

面试题: 线程池中 submit() 和 execute() 方法有什么区别?

  • execute():只能执行 Runnable 类型的任务。
  • submit():可以执行 Runnable 和 Callable 类型的任务。

Callable 类型的任务可以获取执行的返回值,而 Runnable 执行无返回值。

ForkJoinPool

ForkJoinPool 是java7引入的线程池,它可以把一个大任务拆成多个小任务并行执行。代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码class SumTask extends RecursiveTask<Long> {
protected Long compute() {
if(判断是否需要拆分任务) {
//创建两个子任务
SumTask task1 = new SumTask(...);
SumTask task2 = new SumTask(...);
// invokeAll会并行运行两个子任务:
invokeAll(task1, task2);
// 等待获得子任务的结果:
Long result1 = task1.join();
Long result2 = task2.join();
return result1 + result2;
} else {
//执行sum操作
...
return result;
}
}
}


ForkJoinTask<Long> task = new SumTask(...);
Long result = ForkJoinPool.commonPool().invoke(task);

Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。

CompletableFuture

CompletableFuture 是 java 1.8 以后提供的类。它可以处理任务之间的时序关系,如串行关系、并行关系、汇聚关系等用来简化异步编程。它内部默认是通过ForkJoinPool线程池来执行任务,当然我们也可以设置自己的线程池。CompletableFuture 是官方提供的异步编程类,可以满足简单的异步编程需求,在Android中复杂的异步编程使用最多的是RxJava,或者现在的kotlin 协程,这个了解即可。

CompletableFuture

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码//串行任务,任务1、2、3串行执行
CompletableFuture<String> result =
CompletableFuture.supplyAsync(() -> "hello") //任务1
.thenApply(s -> s + " world"); //任务2
.thenApply(String::toUpperCase); //任务3
System.out.println(result.join());


//汇聚关系
CompletableFuture<String> result =
CompletableFuture.supplyAsync(() -> "a")
.thenCombineAsync(CompletableFuture.supplyAsync(() -> "b"),
(a, b) -> a + b );
System.out.println(result.join());

如果想深入了解CompletableFuture,具体可以看异步编程利器:CompletableFuture详解 |Java 开发实战 - 掘金 (juejin.cn)

CompletionService

CompletionService 是一种能处理批量异步任务并在完成时获取结果的并发工具类。你可以把它看成 线程池 + 队列,当一个任务完成时,就可以通过 completionService.take().get() 获取返回值(任务执行完的值存储在队列)。如果所有任务都在执行,调用 take 方法时会阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码ExecutorService executorService = Executors.newFixedThreadPool(3);  
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
completionService.submit(() -> { //任务1
Thread.sleep(200);
return 1;
});
completionService.submit(() -> { //任务2
Thread.sleep(100);
return 2;
});
completionService.submit(() -> { //任务3
Thread.sleep(150);
return 3;
});
int sum = 0;
for(int i = 0; i < 3; i++) {
try {
//这里获取顺序是 2 3 1
sum += completionService.take().get();
System.out.println(sum);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

参考

  • 14个Java并发容器,Java高手都知道!-阿里云开发者社区 (aliyun.com)
  • Java面试题|多线程22道必看面试题 - 知乎 (zhihu.com)
  • 【建议收藏】106道Android核心面试题及答案汇总(总结最全面的面试题)
  • 面试官问我什么是JMM - 知乎 (zhihu.com)
  • Java 并发编程实战 (geekbang.org)
  • final保证可见性和this引用逃逸 - 知乎 (zhihu.com)
  • Synchronized的底层实现原理(原理解析,面试必备)_synchronized底层实现原理-CSDN博客
  • 线程间到底共享了哪些进程资源 - 知乎 (zhihu.com)
  • stackoverflow.com/questions/1…
  • spotcodereviews.com/articles/co…
  • Linux内核同步机制之(三):memory barrier (wowotech.net)
  • 万字长文!一文彻底搞懂Java多线程 - 掘金 (juejin.cn)
  • 【死磕Java并发】常用并发原子类详解-腾讯云开发者社区-腾讯云 (tencent.com)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官问如何实现二🐔缓存怎么进行回答以及延伸出更多知识点呢?

发表于 2024-04-26

二级缓存的优势与缺点

优点:

1)二级缓存相比只调用一层 Redis 缓存,访问速度更快。对于一些不经常修改的数据而查询十分频繁的可以直接放在本地缓存(一级)里面。

作为面试者的扩展延伸:我在本地缓存的实现中,我使用到了本地缓存性能之王 Caffeine 作为一级缓存,在市面上很多像 Redisson、Druid、Hbase 等知名开源项目都用到了 Caffeine 。它实现了更加好用的缓存淘汰算法 W-TinyLFU 算法,结合了 LRU(最近最久未使用算法) 算法以及 LFU(最少使用算法) 算法的优点,所以选择它能使本地缓存的使用更加方便快速。

2)使用了本地缓存相比直接去 Redis 中取,能够减少与远程 Redis 的数据 I/O 网络交互,降低了网络之间的消耗。

缺点:

1)增加了本地缓存对于一致性的维护更加复杂,提高了维护成本。

2)在分布式环境下,如何解决各个节点本地缓存一致性问题?使用类 Redis 的发布订阅功能,当一个节点的数据发生修改时,直接通知其他节点的缓存进行更新。

是不是已经初步了解了二级缓存的应用咧~ 下来先带你实现一下二级缓存。

image-20240426105159466

简单实现

1)引入依赖

1
2
3
4
5
6
7
8
9
xml复制代码<dependency>
   <groupId>com.github.ben-manes.caffeine</groupId>
   <artifactId>caffeine</artifactId>
   <version>2.9.2</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2)配置 Caffeine

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Configuration
public class CaffeineConfig {
   @Bean
   public Cache<String,Object> caffeineCache(){
       return Caffeine.newBuilder()
              .initialCapacity(128)//初始大小
              .maximumSize(1024)//最大数量
              .expireAfterWrite(60, TimeUnit.SECONDS)//过期时间
              .build();
  }
}

3)使用二级缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码@Resource
private Cache<String, Object> cache;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Test
void testCache() {
   // 缓存测试存取
   cache.put("test", "test");
   assertEquals("test", cache.getIfPresent("test"));

   //实现二级缓存读取
   cache.get("test",
             k -> {
                 //先查询 Redis
                 Object obj = redisTemplate.opsForValue().get(k);
                 if (Objects.nonNull(obj)) {
                     System.out.println("缓存命中:" + k + " 从 Redis 读取成功");
                     return obj;
                }

                 // Redis没有则查询 DB
                 System.out.println("缓存没有命中:" + k + " 从 DB 读取");
                 // 从 DB 读取 ..此处模拟省略
                 obj = "test";
                 // 存入 Redis
                 redisTemplate.opsForValue().set(k, "test");
                 //存入本地缓存由cache.get执行
                 return obj;
            });

}

这样一个缓存简单的二级缓存使用就是这样啦,但这样是不是感觉对代码的侵入性有点强了,使用起来不够灵活,下面再来带大家使用 Spring 提供的 CacheManager 接口以及注解来实现它。

image-20240426101016819

使用 Spring 的 CacheManager 实现

1)引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码<dependency>
   <groupId>com.github.ben-manes.caffeine</groupId>
   <artifactId>caffeine</artifactId>
   <version>2.9.2</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

2)配置 CacheManager

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Configuration
public class CacheManagerConfig {
   @Bean
   public CacheManager cacheManager(){
       CaffeineCacheManager cacheManager=new CaffeineCacheManager();
       cacheManager.setCaffeine(Caffeine.newBuilder()
              .initialCapacity(128)
              .maximumSize(1024)
              .expireAfterWrite(60, TimeUnit.SECONDS));
       return cacheManager;
  }
}

3)在启动类上面加上 @EnableCaching 注解

1
2
3
4
5
java复制代码@SpringBootApplication()
@EnableCaching
public class Application {

}

4)使用二级缓存查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Resource
private RedisTemplate<String,Object> redisTemplate;
@Cacheable(value = "test",key = "#id")
public String getStringTextById(Long id) {
   String key= "test" + id;
   //先查询 Redis
   String obj = (String) redisTemplate.opsForValue().get(key);
   if (Objects.nonNull(obj)){
       log.info("从Redis中获取数据");
       return obj;
  }
   // Redis没有则查询 DB
   log.info("从DB中获取数据");
   //此处省略查询DB的代码
   obj = "test";
   //redis 中存入
   redisTemplate.opsForValue().set(key,obj,120, TimeUnit.SECONDS);
   return obj;
}

这里要注意的是!

1)@Cacheable 注解的 value 跟 cacheNames 是互为别名的关系,表示当前方法的结果被缓存到哪个 cache 上面,它可以是一个数组绑定多个 cache。

2)@Cacheable 注解的 key 用来指定返回结果的 key,这个属性可以支持 SpringEL 表达式。

SpringEL表达式如下:

1
2
3
bash复制代码#参数名
#参数对象.属性名
#参数为数组对应下标

5)使用二级缓存更新数据

1
2
3
4
5
6
7
8
9
10
java复制代码@CachePut(cacheNames = "test",key = "#id")
public String updateOrder(String testData, Long id) {
   log.info("更新数据");
   //更新数据库 此处省略
   //修改 Redis
   redisTemplate.opsForValue().set("test" + id,
                                   testData, 120, TimeUnit.SECONDS);
   //返回要缓存本地的数据
   return testData;
}

这里要注意的是!需要返回对象或值,因为要进行本地缓存操作。

6)使用二级缓存删除数据

1
2
3
4
5
6
7
java复制代码@CacheEvict(cacheNames = "test",key = "#id")
public void deleteOrder(Long id) {
   log.info("删除数据");
   //此处省略删除数据库的代码
   //删除 Redis
   redisTemplate.delete("test" + id);
}

在这里简单的使用就到这里啦!还有更加优雅的方式大家可以去实现一下,通过 AOP 去实现二级缓存~

image-20240426103353051

二级缓存抛砖引玉

在上面跟大家一起谈了谈二级缓存的实现以及使用,这里我们可以跟面试官再次周旋一下!我曾经在阅读 Spring 源码时我还了解到了 Spring 的三级缓存实现。

在 Spring 框架中,循环依赖是指两个或多个 Bean 之间相互依赖,形成一个循环引用的情况。这种情况下,Spring IOC 容器在实例化 Bean 时可能会出现问题,因为它无法决定应该首先实例化哪个 Bean。为了解决这个问题,Spring 引入了三级缓存。

三级缓存是指 Spring IOC 容器中用于解决循环依赖问题的一种机制,它包含三个缓存阶段:

1)singletonObjects:这是 Spring IOC 容器的一级缓存,用于存储已经完全创建并初始化的 Bean实例。当 Bean 完全创建后,它会被放置在这个缓存中。

2)earlySingletonObjects:这是 Spring IOC 容器的二级缓存,用于存储提前暴露的、尚未完全初始化的 Bean 实例。当 Bean 正在被创建但尚未完成初始化时,它会被放置在这个缓存中。

3)singletonFactories:这是 Spring IOC 容器的三级缓存,用于存储 Bean 的工厂对象。在创建Bean 实例时,Spring 首先会在这个缓存中查找工厂对象,如果找到则使用该工厂对象来创建 Bean 实例。

三级缓存的作用

三级缓存的作用是为了解决循环依赖时的初始化顺序问题。在初始化 Bean 时,Spring 会首先将 Bean 的实例放入三级缓存中,然后进行属性注入等操作。如果发现循环依赖,Spring 会在二级缓存中查找对应的 Bean 实例,如果找到则直接返回,否则会调用Bean的工厂方法来创建 Bean 实例,并将其放入二级缓存中。当 Bean 实例化完成后,会从二级缓存中移除,并放入一级缓存中。

为什么需要三级缓存而不是二级缓存

二级缓存只存储了尚未初始化完成的 Bean 实例,而三级缓存存储了 Bean 的工厂对象。这样做的好处是,当发现循环依赖时,可以通过 Bean 的工厂对象来创建 Bean 实例,从而避免了直接从二级缓存中获取可能尚未完成初始化的 Bean 实例而导致的问题。因此,三级缓存提供了更加灵活和可靠的解决方案,能够更好地处理循环依赖问题。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

阿里巴巴瓴羊基于 Flink 实时计算的优化和实践

发表于 2024-04-26

摘要:本⽂整理⾃阿里云智能集团技术专家王柳焮⽼师在 Flink Forward Asia 2023 中平台建设专场的分享。内容主要为以下四部分:

  1. 阿里巴巴瓴羊基于 Flink 实时计算的平台演进
  2. Flink 能力优化与建设
  3. 基于 Flink 的最佳实践
  4. 未来规划
  1. 阿里巴巴瓴羊基于 Flink 实时计算的平台演进

1.1 关于瓴羊

首先简单介绍一下瓴羊,瓴羊是阿里云智能集团的重要业务,致力于将阿里巴巴沉淀十余年的数字化服务经验,系统化、产品化地全面对外输出给千行百业。2012 年开始,阿里提出数据建设方法论,内部形成多款标杆数据产品,如生意参谋、双十一数据大屏等。2015 年阿里巴巴启动中台战略,进行全域数据建设,强化数据能力在业务端的价值显现,让数字化能力广泛服务于各个业务。2018 年应商家日益增长的需求,阿里巴巴集团中台能力进行对外输出,推出 Dataphin、QuickBI 等产品,帮助企业通过数字化技术驱动创新和增长,在生态内外产生显著价值。2021 年瓴羊成立,成为阿里巴巴动物园中的一员。

1.2 Dataphin平台实时业务规模

而在这个过程中打造出的智能数据建设与治理 Dataphin,不仅在阿里巴巴集团内部支撑着各 BU 实时业务,同时也在云上服务于各行业推进企业数字化进程。

在集团内部,平台约承载有 15000+ 的实时计算任务,1000+ 的流批一体任务,覆盖约 50+ 个BU。

在云上,我们的客户分布在电商、金融、交通、零售、制造等行业,应用场景覆盖诸如实时 ETL、实时大屏、实时集成等等实时主要场景,以及像金融行业的特征计算以及风控场景等等。

1.3 Dataphin平台实时架构大图

下面介绍一下 Dataphin 平台的实时架构大图。

首先部署形态上平台支持多云输出,提供公共云、私有云、专有云、混合云等多种输出形态。在多云架构之上,支持提供基于不同调度系统的作业运行能力。在这之上,平台提供不同 Flink 发行版的多版本多集群管理能力,以及多数据源、计算源的元数据管理体系。

最上层,主要由四大模块组成。

● 数据集成,提供无代码化全增量一体的数据集成能力。

● 研发中心,提供编译优化、任务模版、流批一体等能力的实时研发体验。

● 运维中心,提供任务全方位的监控告警体系,保障线上任务的稳定性。

● 资产,支持表、字段、函数、血缘等数据资产的盘点与展示、标准定义与管理、质量评估及保障、分类分级与脱敏等等能力

如上支撑着整个 Dataphin 平台实时计算的全流程研发体系。

  1. Flink 能力优化与建设

接下来将围绕采、管、建、用四个方面,分别讲解我们在 CDC 能力提升、元数据管理、流批一体建设、运维体系上分享平台对 Flink 能力的优化与建设。

2.1 CDC 能力提升

在实时数据采集上,平台除了支持社区 Flink CDC 的能力外,还提供了自研的 Flink Connector 去满足更多样化的数据同步场景需求。左右两列是平台目前已经支持的提供增强 CDC 去做数据同步的部分输入输出,更多的数据源还在陆续增加中,我们目前做到的四个能力分别是:

● 一个是自动化感知来源库/表变更的能力,

● 支持配置多样的规则满足不同数据源的入湖入仓场景,

● 同时平台提供无代码化的整库实时同步能力,

● 最后凭借元数据管理体系支撑这种丰富多样的源端到源端的数据流通能力。

平台期望基于 Flink 引擎的能力,提供集实时性、全增量一体、无代码化、自动化为一体的实时数据同步能力。

搭载自研 Connector 的 Flink 采集任务目前支持自动感知8种数据库的源端操作,以及提供 3 种不同的处理策略。

● 对于8种数据库的源端操作,因为对行数据的增删改操作进行感知是 CDC 最基本的能力,这里就不赘述了。除此之外,平台还支持对数据表中列的一系列操作,比如列的新增、列的删除、列的重命名、列类型变更,特别的像表的重命名、清空表等等操作,都会进行自动采集发往下游进行处理。当然更多的场景用户期望的是整库同步,平台对数据库中表的删除或者表的新增,都会形成记录进行采集。

● 对于3种不同的处理策略,同时平台支持用户针对每一种源端操作的记录,去配置不同的消息处理策略。除了正常处理外,还可以配置忽略或者出错策略。比如数据开发人员对业务库一些新增的列不需要关注的可以选择不去处理,但对于已采集的列如果被删除或者更改的下游不可接受的情况,可以终止任务采集并发出告警。

同时针对 CDC 数据发往下游入湖入仓的场景,提供一些可配的规则。

● 支持用户选择目标库已有表,或者自动建表。

● 可以配置自动建表的转换规则,比如支持基于来源表名字加上前后缀来创建目标表名。

● 平台提供表级别映射状态监控,用户可以实时查看整库同步中每一张来源表到目标表的映射状态。

● 最后支持自动在目标表添加变更记录的描述字段,包括变更发生时间、操作类型等,以便业务识别使用,同时用户还可以自行添加全局字段满足特有场景的需求。

刚才介绍的所有的 CDC 能力的提升,平台侧都可通过配置化的任务形式,无需代码,只需配置一个任务即可实现整库的数据实时同步。

如上图所示,用户可以按数据源的类型选择同步任务的输入数据源和输出数据源。在同步规则配置里,支持选用整库、圈选表、排除表的方式从来源库选择需要同步的表,平台能实时感应所配置的数据源中的元数据信息,并进行展示,用户直接勾选对应的数据表即可。

如上图1是刚才说的配置表的映射规则,在列表中会显示当前的映射状态。比如规则选择已有表时,检测到目标库并没有对应的表名时,映射状态就会显示告警,在同步任务运行前就能发现问题。

如上图2是配置源端操作处理策略的配置页面,针对每一种源端操作都可分别配置对应的 DDL 处理策略。其中行数据的增删改默认是正常处理的。

2.2元数据管理

在Flink支持这样丰富的多源数据同步的场景下,面对多达几十上百种不同的数据源,用户又该如何有效应对呢?如何有效的管理不同的数据源连接信息,做好元数据管理就显得尤为重要了。

用户常常会遇到如下情况:

● 相同的源例如 Mysql,既能 CDC 又能 JDBC,如何有效识别并按需引用?

● 相同的 DDL 在不同的作业间如何复用?

● 相同的 DDL 在不同的作业使用场景能否进行差异化配置?

● 离线物理表是否可以直接引用,并与实时 DDL 进行联动从而接入全域血缘?

于是我们基于这些问题,构建了一套元数据的管理体系,分别是 Source 层、Meta 层和 Job 层。

● Source 层按照物理数据源进行管理,与引擎解耦,管理基础的连接信息,这样离线和实时可以复用。如果有些数据源能当做计算引擎还能将其配置为离线计算源,比如 Hive、Hologres、Adbpg 等等。源的 Catalog 也会在这一层进行管理。

● 再上一层构造 Meta 层,这一层会基于数据源的构建,可以是按照 Connector 类型区分的 Flink DDL,我们将称为实时元表;也可以是来自数据源或者计算源中的物理表。还可以是基于实时元表或物理表构建的流批一体映射镜像表或者是批查询的混合元表,每一种表都各有用处后续会介绍。而在这一层的表,平台会自动将其翻译映射为对应的 Flink DDL 供上层的作业层引用。

● 最上一层的 Job 层,可以重复引用 Meta 层的表,这样除了避免反复构建 DDL,也可以使得 DDL 的变更可以被所有引用的作业所感知。同时平台提供自研的 SET 词法,可以针对不同的任务差异化配置 DDL,比如不同作业 Kafka 的 Consumer Group 是不一样的,可以通过 SET 词法单独配置。当然作业里也可以直接写原生 Flink DDL 语句,支持混合使用,灵活性很高。

下面我们来看看具体的案例

目前平台支持接入数十种数据源,其中大约 10 种数据源的 Catalog 接入到 Flink,这里以 Paimon 为例,作业通过引用对应的数据源编码ID,平台可帮用户翻译生成 Create Catalog 语句,用户可直接在作业中访问 Catalog 中的物理表。

如果要基于数据源创建实时元表,对于可以识别到表结构的数据源比如 MaxCompute,平台会自动帮助用户填充对应的字段快速创建元表。如果用户想在流批一体场景下创建逻辑镜像表,可根据字段进行智能映射,提高效率。这里以 kafka 为例,作业通过引用对应的实时元表名称,平台可帮用户翻译生成对应的 DD L语句,提交到引擎侧运行。

在作业层,还是以 Kafka 为例,对于同一个 Kafka 的 Topic,不同作业可分别 Set 不同的消费组 ID 来针对同一个实时元表进行复用,或者可以根据业务场景 Set 诸如计算列、Watermark 等参数,这些作业的差异化配置,平台都会翻译在最终提交到引擎的可执行 SQL 上,如右图所示,以此来做作业粒度的差异化配置。

用户还可直接在平台上写原生 DDL,平台也支持反向地将 DDL 创建映射为 Meta 层的实时元表。基于元表的体系还有个比较大的好处,就是数据安全,源和表可以针对敏感信息比如密码进行加密,避免 Flink DDL 在 SQL 作业层的明文展示,保护我们的数据安全。

综上这套就是目前平台正在使用的元数据管理体系。

2.3 流批一体建设

当我们把数据也采集上来,元数据也管理起来,就可以基于 Flink 进行业务上的任务开发了。那接下来就流批一体这个典型场景,分享一下在元数据管理体系之上,平台在流批一体建设中所做的一些工作。

首先,Flink 作为一个流批一体引擎,其流任务和批任务还是需要区分模式分别运行。这就造成了开发人员在 Flink 流和批任务两个作业间来回切换,开发体验割裂,容易出现变更遗漏,也就导致数据一致性和质量难以保障。从存储层面来看,流批存储系统隔离,提供的数据服务不一致,维护成本高。

针对这种现状,平台的解法是提供流批一体化的开发模式。首先面向开发人员,只需维护一套代码,由平台根据流批不同的模式翻译为对应的流批可执行 SQL。在这个场景下,对一个流批任务进行变更,流或者批的计算口径会同时变更,解决业务口径不对齐带来的数据质量风险。同时在存储层面提供面向流批一体逻辑镜像表,无论底层是统一存储或流批不同存储,面向 SQL 侧看来是一张表,这张表可对外提供一致的数据服务。

下面我们来看看流批一体的具体的案例

最上层是在上一节提到的元数据体系。通过配置流批一体混合源镜像表关联流和批对应元数据表(上节说过,元表可以是 DDL 或者 Catalog 中的物理表),随后我们可以在一个 Flink SQL 作业中对其进行引用,如下图所示,从这张混合源表中读取数据,根据条件进行聚合计算,写入下游表。

平台侧提供两种模式的开关:

● 纯实时模式,平台根据用户配置的实时参数帮助用户翻译为对应的实时可执行 SQL。

● 纯离线模式,用户可以在当前作业配置离线的调度配置。

当同时打开这两种模式时,用户就开启了“实时+离线”的流批一体模式。用户可以用 Set 语句操作混合源表,如红框所示,实现对流批不同的 Flink DDL 的 With 参数进行差异化配置。也可以设置宏变量对不同的 Where 条件进行分别配置。

依靠平台自研的引擎编译模块:

● 对实时模式对应的 Kafka 表,设置相应的消费组,配置相应的数据偏移过滤条件,最终可生成的如左图所示的可执行 SQL。可以看到 DDL 的 With 参数新增了消费组的配置项,过滤条件被替换为实时的过滤条件。

● 对离线模式对应的 MaxCompute 表,同样能设置离线特有的分区,配置相应的过滤条件,最终可生成如右图所示的可执行 SQL。可以看到 DDL 的 With 参数新增了分区的配置项,过滤条件被替换为离线的过滤条件。

值得注意的是,时间参数 stat_date、bizdate 可以在任务的启动或者调度时按需配置。综上所述就是基于平台解决方案实现的一个 Flink 流批一体场景。

2.4运维体系

在刚才的三个小节,已经介绍过实时计算的采集、管理、建设,接下来最后的关键环节就是如何去运维好 Flink 任务。整个运维体系是支持多调度模式、多集群环境、多引擎版本的。

● 首先是基于 Hadoop 的 Yarn 体系,平台目前支持诸如 CDH、CDP、EMR、TDH 等等的 Hadoop 发行版,用户只要有个 Hadoop 集群,无需自行安装任何的 Flink 组件,由平台提供对应的 Client 帮助用户与集群对接,并帮助管理引擎的多版本。

● 其次还支持像阿里云实时计算 Flink、华为云 MRS Flink 的全托管方式对接。

● 另外平台也支持 K8s 的任务调度模式,Dataphin 部署的时候可内置 K8s 集群,用户可在无 Hadoop 集群情况下享受实时计算的能力。如果用户期望对接自己的 K8s 集群,平台后续也将支持。

基于这种多调度模式、多集群环境、多引擎版本的体系,用户可按需选择适合其的 Flink 运行方式。

好了,当 Flink 任务真正运行起来之后,需要通过全面的监控告警体系来保证业务的正常。对于运行中的 Flink 任务,平台通过 Metric Reporter 和 Log Agent 分别采集任务的运行指标和日志,指标会落到时间序列数据库中进行持久化存储,日志会落到文件存储中,平台提供统一的监控看板供用户查看。

同时用户也可基于相应的指标配置对应的告警策略,告警中心匹配告警事件后通过通知服务多渠道发送消息。用户收到告警,可在监控看板上定位问题,形成闭环。

在这个体系中

● 平台为用户提供丰富的监控指标监控,包括:Checkpoint、Io、Watermark、CPU、Memory、Jvm 等等。

● 提供灵活的报警策略,包括值班表、自定义消息渠道,用户不想被频繁打扰时还支持配置发送次数和报警抑制逻辑。

● 对于异常日志,平台支持 Warn 级别以上日志的定位,方便用户直接定位到具体某个 Jobmanager 或 Taskmanager 上,减少用户的检索时间。

● 指标支持当前到数天内均可查询,一是方便用户进行查看复盘,另一方面后续平台也会尝试去基于历史数据提供 AIops 的能力帮助用户提前感知任务异常。

● 对于 Flink batch 的任务,平台提供整个离线调度链路的基线产出监控,链路上可能包括其他 Hive、MaxCompute、Hologres 等离线任务,破线同样会发出告警。

如上展示的是监控看板中对监控视图和日志采集的一些产品截图。

以上就是 Dataphin 平台对 Flink 在采、管、建、用四个方面所做的一些工作,通过这四方面对 Flink 的能力优化以及平台化的功能建设,让用户在实时计算全生命周期的研发效能和体验有较大提升。

  1. 基于 Flink 的最佳实践

接下来介绍平台基于 Flink 的最佳实践。

这里挑选两个比较典型的场景来给大家做一个分享,一个是特征计算,另一个是湖仓一体。

3.1 特征计算

首先是特征计算,一开始我们做特征计算,使用的明细数据方案,那时候数据量还没那么大,明细数据直接通过 Flink 实时地写入到 Hbase 表中,离线数据从 ODPS 回流 Hbase 中,用于做特征的冷启动加速。

在这个架构中,计算任务只负责导入明细数据到视图中,聚合逻辑全部放在服务任务中。

这个架构的痛点也很明显,当处理热点数据的特征加工时,明细数据在服务端的计算压力很大,上述的方案是无法满足大数据量的计算要求的。

所以后面我们考虑利用分账的思想,将计算任务分别生成日维度、时维度、分维度的固窗数据,提前计算并保存在数据库中,查询任务利用窗口合并的方式,提供滑窗的在线计算能力。

如上图所示,因为提前计算好不同维度的窗口,用户配置任意的窗口,都可以根据天、时、分钟的固窗数据进行增量聚合来计算出最终结果。在这里我们自己对 Flink 的 State 进行管理,并新增了很多自研算子,像偏度、峰度等等。通过自研的算子 Operator 去适配这种自定义的窗口触发方式。

基于刚才的解法,我们称其为预计算方案。Flink 依赖算子拆分日、时、分维度计算并存储于 Hbase,配置不同滑窗的特征批量生成,基于 Hbase 查询及内存计算,这样在服务端聚合计算的是已经预聚合后的数据,计算压力大大减少。

再后来,我们想能不能直接利用 Flink 算出结果,并利用其批处理的能力做到流批一体的冷启动,所以后来我们提出了全计算方案。直接服务端进行划窗聚合,写入 Hbase。这样集群的存储数据量相比于预计算会更低,服务端的聚合查询性能由于是点查会更好,但同样的 Flink 任务计算端消耗的资源会相应增加。

如上图下半部分是预计算方案和全计算方案的一个对比表格,可以看出有几个区别:

一个是特征开发的灵活性,对预计算方案来说,依赖自研的状态算子按照天、时、分来做预计算,开发成本高。对于全计算方案来说,除了可依赖自研算子外还能用官方的滑动或固定窗口进行计算,灵活性更高一点。

另一点特征快上的能力来说,对于特征口径的更改,预计算方案对任意窗口的计算放在服务端,可以及时生效。而全计算方案在Flink端,需要操作任务来让配置生效,快上的能力预计算更优。

最后从计算成本来说,全计算方案相对于预计算方案,Flink 新增资源消耗会比 Hbase 的存储减少来说,成本更高。

下面分析一下基于自研状态算子和官方滑动窗口的性能对比。场景是以交易TT流为输入,Hbase 为存储输出,每5s触发过去24小时的滑动窗口计算任一卖家累计销售金额。方式是通过回拉 3 天点位进行性能压测。从端到端延迟、任务 RPS、资源消耗、下游 Hbase 写入IO、State、Checkpoint 等多个维度进行对比,这里只展示其中差异比较典型的三个,分别是回追过程中的数据时延、任务 Failover、Checkpoint 大小以及成功率。从监控图表对比可以看出,自定义状态算子相较于官方窗口,有更快的数据回追能力,因为官方窗口的滑动窗口是不共享状态的,而全计算方案针对同一份天、时、分计算状态进行聚合。导致官方窗口在任务中有过多窗口处于计算中,Checkpoint 一直无法成功打上,导致作业频繁的 Failover,无法有效输出最终的计算结果,作业延迟无法追上。也就使得自研状态算子,有更优的作业稳定性,更好的 Checkpoint 成功率。

以上就是我们在特征计算中的一些实践。

3.2 湖仓一体

另一个实践场景是基于 Flink 的湖仓一体,首先我们来看一下整体的架构体系。

结构化、半结构化、非结构化的数据一方面可通过基于 Flink 或者 DataX 的集成方案写入数据湖中,另一方面也提供 OpenAPI 的方式将数据写入湖的底层存储中。

我们以 Paimon 数据湖为例,底层的存储根据云环境和部署形态,可选择基于 HDFS 或者 OSS。按业务可分为 ods、dwd、dws 层,分层之间可通过 Flink 进行数据处理,也可通过其他离线引擎分开处理。

● 湖的上层提供诸如 Hive、Spark、Presto 等等的 Adhoc 即席查询能力,提供不同场景的取数需求。

● 湖中的表、字段、血缘经过解析都会进入到资产中,进行统一的元数据管理。

● 湖中的数据可通过数据服务对外提供服务,提供给诸如实时报表、实时监控、算法服务等等应用中。

目前的湖仓架构,基于 OSS 或 HDFS 的文件系统,有三种数据湖 Paimon、Iceberg、Hudi 的可供选择。

外层对接的数据源按类型分为6大类存储,分别是:

● 文件系统,如 Oss、Hdfs 等

● 消息队列,如 Kafka、Mq 等

● 大数据存储,如 Hive、Maxcompute、Impala 等

● 关系型数据库,如 Mysql、Pgsql、Oracle 等

● NoSQL,如 Hbase 等

● 用户还可以自定义数据源,提供自定义 Flink Connector 接入平台

对数据来说,不流通无价值,无价值不流通。在整个湖仓一体体系中,基于 Flink 与 DataX 架构的数据同步能力,我们让数据很方便地流通起来,使得数据入湖、数据出湖、湖与湖之间、仓与仓之间的数据流转变得简单高效。如此跨源支持数据流通,就可以轻松汇集和保存海量业务数据。

但再方便的入湖工具,历史数据都有迁移成本,无论是人力成本、数据验证成本等等。所以当用户觉得历史数据太重,不想完整地把仓的数据迁移到湖中的时候,我们的解法是增量数据将其入湖,历史数据不迁移,平台侧提供统一引擎服务智能识别分区。

如上图左半部分所示,将数仓里的 Hive 表和湖的 Paimon 表形成一张混合源的表,One Service 查询引擎对这张混合源的表进行查询。

如右图所示,假设这张混合源的表我们将其命名为 ods_orders,用户可以写如下语句,select * from ods_orders where dt = 某个分区,One Service 查询引擎判断混合源表的分区界限,当 dt 小于迁移时间时,继续查询原 Hive 表返回结果。当dt大于迁移时间时,查询新 Paimon 表。这样对用户的体验来说,他面向的还是一张表,用数取数的体感和一张表还是比较一致的。但是对于跨分区区间查询的场景,还是更推荐用户通过上一节讲的平台同步工具将数据完整入湖。

以上就是我们在湖仓一体中的一些实践,湖仓一体场景也是我们一直在实践优化的一块,后续也会持续在这个场景上发力探索,敬请期待。

  1. 未来规划

接下来分享一下平台的未来规划。

一个是行业解决方案方面,我们会优先将湖仓一体的场景深化落地,在产品上做成完整的解决方案面向客户,让客户能在 Dataphin 平台上便捷高效地构建起自己的湖仓架构。

第二个是平台功能完备,首先会继续丰富 Flink CDC 支持的数据源,覆盖更多客户的场景。第二个是建设全域元数据中心,在前面有介绍过我们的元数据体系,后面我们会将 Dataphin 平台内,即域内;平台外的元数据体系,指域外。将域外和域内进行打通,形成全域元数据中心,以形成更全面的表、字段的血缘体系。

第三个是持续进行体验优化,首先我们期望的是对于一条实时数据从上游往下游发送的过程中,平台能帮助用户定位到其具体的流向位置,方便纠错排查。其次,在运维体系上,期望能基于一些典型的异常场景,通过智能诊断,做到预警,在可能的异常要发生前就能将告警发出通知到用户。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…131415…399

开发者博客

3990 日志
1304 标签
RSS
© 2024 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%