开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Vmware安装Ubuntu2204实现ssh连接

发表于 2024-04-26

因为公司用的是windows电脑,熟悉下docker技术,就想着安装一个linux虚拟机,vmware使用的是16pro,之前用过centos想着服务器用的ubuntu,ubantu使用提ubuntu-22.04。vmware正常安装好ubuntu后,就是连接不上ssh。把网上的方法都试了个遍,都不得行.

尝试的方法

安装sshd

1
bash复制代码sudo apt-get install openssh-server openssh-client

检查防火墙并放行

1
bash复制代码sudo ufw allow 22

重启sshd

1
sql复制代码systemctl start ssh

查看宿主机和虚拟机网络

1
2
bash复制代码ping www.baidu.com -t #正常
ping 虚拟机ip #正常

我的主机是192.168.2.130 虚拟机设置的是128

image.png
所有检查都正确,就是出现以下错误

image.png

解决办法

打开终端

在终端中,运行以下命令来检查您的网络接口名称:我的是ens33

1
bash复制代码 ip link

image.png

编辑网络配置文件

继续在终端中,运行以下命令来编辑网络配置文件:

1
arduino复制代码 sudo nano /etc/netplan/00-installer-config.yaml

此命令将使用nano文本编辑器打开网络配置文件。

配置静态IP地址

在编辑器中,找到用于您的网络接口的配置部分。例如,如果您的网络接口名称为ens33,则配置部分可能如下所示:

1
2
3
4
5
6
yaml复制代码network:
version: 2
renderer: networkd
ethernets:
ens33:
dhcp4: true

将dhcp4: true修改为dhcp4: false,然后添加addresses和gateway4行来配置静态IP地址和默认网关。假设您的网络网段为192.168.2.0/24,要将Ubuntu 22.04配置为静态IP地址192.168.2.168,默认网关为192.168.21.1,则配置部分应如下所示:

1
2
3
4
5
6
7
8
yaml复制代码network:
version: 2
renderer: networkd
ethernets:
ens33:
dhcp4: false
addresses: [192.168.2.168/24]
gateway4: 192.168.2.1

保存和退出编辑器

完成配置后,按下Ctrl + O保存更改,在按回车键,然后按下Ctrl + X退出nano编辑器。

应用配置更改

在终端中,运行以下命令以应用网络配置更改:

1
bash复制代码 sudo netplan apply

此命令将使您的静态IP地址配置立即生效。

验证静态IP地址配置

最后,运行以下命令来验证您的静态IP地址是否已正确配置:

1
css复制代码ip address show ens33

您将看到类似以下输出:

1
2
3
4
5
6
7
8
9
bash复制代码
rh@ronhai:~$ ip address show ens33
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 00:0c:29:ad:53:d9 brd ff:ff:ff:ff:ff:ff
altname enp2s1
inet 192.168.2.168/24 brd 192.168.2.255 scope global ens33
valid_lft forever preferred_lft forever
inet6 fe80::20c:29ff:fead:53d9/64 scope link
valid_lft forever preferred_lft forever

image.png
在输出中,inet行显示您的静态IP地址已成功配置为192.168.2.168。

恭喜!您已成功在Ubuntu 22.04上配置了静态IP地址。现在您的Ubuntu系统将在每次启动时使用您配置的静态IP地址连接到网络。

使用MobaXterm_Personal连接ssh

连接成功
可以使用任何连接工具来连接

image.png

总结三点

  • 检查网络
    看主机和虚拟机是否在同一个网段

我感觉我的问题是互Ping可以但是不能访问就感觉是ip的转发问题,就换了个ip地址,手动配置网络文件来实现,之前是通过网上的教程在vmware上网络配置了桥接后在windows的网络配置以下图的,但是没有用。最好的办法还是通过编辑配置文件来实现

image.png

  • 检查防火墙
    看防火墙是否开启,是否放行端口22
  • 检查sshd
    看是否安装,安装后是否正常启动

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

不同平台配置pip源的几种方案 注意啦,除了在公有云上不然这

发表于 2024-04-26

pip 是 Python 的包管理工具,用于安装和管理 Python 包。如果你想使用清华大学提供的 Python 包镜像源,可以通过修改 pip 的配置文件或者在使用 pip 命令时指定镜像源。以下是两种常用方法:

方法1:修改配置文件(推荐)

对于不同操作系统,pip 的配置文件位置可能不同:

  • 对于 Unix 和 macOS 系统,配置文件通常位于你的主目录下的 .pip/pip.conf(或者 .config/pip/pip.conf)。
  • 对于 Windows 系统,配置文件通常位于 C:\Users\<Username>\pip\pip.ini。

如果这些目录下没有相应的文件,你可以手动创建它们。

这里是配置文件的内容,使用清华大学的镜像源:

1
2
3
4
ini复制代码[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host = pypi.tuna.tsinghua.edu.cn

在 Unix 和 macOS 上,你可以通过以下命令来创建配置文件并写入上述内容:

1
2
3
4
sh复制代码mkdir -p ~/.pip && echo "[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host = pypi.tuna.tsinghua.edu.cn" > ~/.pip/pip.conf

在 Windows 上,你可以手动创建或修改 pip.ini 文件,将上述内容粘贴进去。

方法2:命令行参数

你也可以在使用 pip 命令时通过 --index-url 参数指定使用清华源,例如:

1
sh复制代码pip install -i https://pypi.tuna.tsinghua.edu.cn/simple some-package

这种方法不需要修改配置文件,但你需要在每次使用 pip 命令时都指定镜像源。

永久性修改

如果你想要永久修改 pip 的默认源,应该使用方法1,即修改配置文件。这样,你在使用 pip 安装包时就不需要每次都指定镜像源了。

请注意,由于网络环境的变化,镜像源的URL有可能会发生变化。如果你发现无法访问,请检查清华大学镜像站的官方网站以获取最新的URL。

其他源

注意啦,除了在公有云上不然这里不推荐其他源

在 Python 社区中,除了清华大学的镜像源之外,还有其他一些著名的镜像源可供选择。以下是一些流行的 Python 包镜像源:

  1. 阿里云 (Alibaba Cloud)
1
2
3
4
ini复制代码[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
[install]
trusted-host = mirrors.aliyun.com
  1. 中国科技大学 (University of Science and Technology of China, USTC)
1
2
3
4
ini复制代码[global]
index-url = https://pypi.mirrors.ustc.edu.cn/simple/
[install]
trusted-host = pypi.mirrors.ustc.edu.cn
  1. 豆瓣(Douban)
1
2
3
4
ini复制代码[global]
index-url = https://pypi.doubanio.com/simple/
[install]
trusted-host = pypi.doubanio.com
  1. 华为云 (Huawei Cloud)
1
2
3
4
ini复制代码[global]
index-url = https://mirrors.huaweicloud.com/repository/pypi/simple
[install]
trusted-host = mirrors.huaweicloud.com
  1. 腾讯云 (Tencent Cloud)
1
2
3
4
ini复制代码[global]
index-url = https://mirrors.cloud.tencent.com/pypi/simple
[install]
trusted-host = mirrors.cloud.tencent.com

要使用这些源,你可以按照之前提到的方法修改 pip 配置文件,或者在命令行中使用 --index-url 参数。例如,如果你想使用阿里云的镜像源,可以在命令行中输入:

1
sh复制代码pip install -i https://mirrors.aliyun.com/pypi/simple/ some-package

或者将阿里云的镜像源设置到你的 pip 配置文件中,这样以后所有的 pip 命令都会默认使用这个源。

请注意,镜像站点可能会不定期更改其服务的URL,如果你发现某个镜像源无法使用,请访问对应的官方网站以获取最新的URL信息。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot集成oauth2快速入门demo

发表于 2024-04-26

1.什么是Spring AI?

Spring AI API 涵盖了广泛的功能。 每个主要功能都在其专门的部分中进行了详细介绍。 为了提供概述,可以使用以下关键功能:

  • 跨 AI 提供商的可移植 API,用于聊天、文本到图像和嵌入模型。 支持同步和流 API 选项。 还支持下拉访问模型特定功能。 我们支持 OpenAI、Microsoft、Amazon、Google、Huggingface 等公司的 AI 模型。
  • 跨 Vector Store 提供商的可移植 API,包括同样可移植的新颖的类似 SQL 的元数据过滤器 API。 支持 8 个矢量数据库。
  • 函数调用。 Spring AI 使 AI 模型可以轻松调用 POJO java.util.Function 对象。
  • AI 模型和向量存储的 Spring Boot 自动配置和启动器。
  • 数据工程的 ETL 框架。 这为将数据加载到矢量数据库提供了基础,有助于实现检索增强生成模式,使您能够将数据引入 AI 模型以纳入其响应中。

Chat Completion API

  • 聊天 API 使开发人员能够将人工智能支持的聊天功能集成到他们的应用程序中。 它利用预先训练的语言模型,例如 GPT(生成式预训练变压器),以自然语言对用户输入生成类似人类的响应。
  • API 通常通过向 AI 模型发送提示或部分对话来工作,然后 AI 模型根据其训练数据和对自然语言模式的理解生成对话的完成或延续。 然后,完成的响应将返回到应用程序,应用程序可以将其呈现给用户或将其用于进一步处理。
  • Spring AI Chat Completion API 被设计为一个简单且可移植的接口,用于与各种 AI 模型交互,允许开发人员以最少的代码更改在不同模型之间切换。 这种设计符合 Spring 的模块化和可互换性理念。
  • 此外,在输入封装 Prompt 和输出处理 ChatResponse 等配套类的帮助下,聊天完成 API 统一了与 AI 模型的通信。 它管理请求准备和响应解析的复杂性,提供直接且简化的 API 交互。

2.openapi相关环境准备

参考链接:www.rebelmouse.com/openai-acco…

免费提供api-key

加入博主的知识星球,另外现在加入还可以带大家手把手做一个出海项目,作为项目共创者5c55d703-a577-4942-aa50-f222bf45bcdf

3.代码工程

**

实验目的:实现聊天功能api

**

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.1</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>ai</artifactId>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>0.8.0-SNAPSHOT</version>
</dependency>


</dependencies>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
</repositories>

</project>

application.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
yaml复制代码server:
port: 8088

spring:
ai:
openai:
base-url: https://api.openai.com/
api-key: sk-xxx
embedding:
options:
model: text-davinci-003
chat:
#指定某一个API配置(覆盖全局配置)
api-key: sk-xxx
base-url: https://api.openai.com/
options:
model: gpt-3.5-turbo # 模型配置

controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
kotlin复制代码package com.et.ai.controller;

import jakarta.annotation.Resource;
import org.springframework.ai.chat.ChatClient;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.embedding.EmbeddingClient;
import org.springframework.ai.embedding.EmbeddingResponse;
import org.springframework.ai.openai.OpenAiChatClient;
import org.springframework.ai.openai.api.OpenAiApi;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RestController
public class HelloWorldController {
@Autowired
EmbeddingClient embeddingClient;
@Autowired
ChatClient chatClient;
@GetMapping("/ai/embedding")
public Map embed(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
EmbeddingResponse embeddingResponse = this.embeddingClient.embedForResponse(List.of(message));
return Map.of("embedding", embeddingResponse);
}
@GetMapping("/ai/chat")
public String chat(@RequestParam(value = "message", defaultValue = "Tell me a joke") String message) {
Prompt prompt = new Prompt(message);
return chatClient.call(prompt).getResult().getOutput().getContent();
}
}

DemoApplication.java

1
2
3
4
5
6
7
8
9
10
11
12
typescript复制代码package com.et.ai;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • github.com/Harries/spr…

4.测试

  • 启动Spring Boot应用
  • 访问http://127.0.0.1:8088/ai/chat,返回响应消息

Why couldn’t the bicycle stand up by itself? Because it was two tired!

5.参考引用

  • docs.spring.io/spring-ai/r…
  • www.liuhaihua.cn/archives/71…
  • springboot.io/t/topic/516…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

掌握Go语言:Go语言类型转换,解锁高级用法,轻松驾驭复杂数

发表于 2024-04-26

在Go语言中,类型转换不仅仅局限于简单的基本类型之间的转换,还可以涉及到自定义类型、接口类型、指针类型等的转换。以下是Go语言类型转换的高级用法详解:

Go语言类型转换的高级用法

1. 自定义类型之间的转换

在Go语言中,可以使用类型别名或自定义类型来创建新的数据类型。自定义类型之间的转换需要显示转换,但是可以在逻辑上实现类型的安全转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码package main

import "fmt"

type Celsius float64
type Fahrenheit float64

func main() {
var f Fahrenheit = 100
var c Celsius
c = Celsius((f - 32) * 5 / 9)
fmt.Println("Temperature in Celsius:", c)
}

以上代码演示了在Go语言中自定义类型之间的转换,具体来说,定义了两个自定义类型 Celsius 和 Fahrenheit,分别表示摄氏度和华氏度。然后在 main 函数中,将华氏度转换为摄氏度,并输出结果。

  • 首先,声明了 Celsius 和 Fahrenheit 两个自定义类型,它们分别是 float64 的别名。
  • 在 main 函数中,声明了一个华氏度变量 f 并赋值为 100。
  • 接着,声明了一个摄氏度变量 c。
  • 然后,将 f 转换为摄氏度类型,并将结果赋值给 c,转换的公式是 (f - 32) * 5 / 9。
  • 最后,使用 fmt.Println 输出摄氏度的值。

这段代码展示了如何利用Go语言的类型转换机制,将不同的自定义类型之间的值进行转换,以适应不同的业务需求。

2. 接口类型转换

在Go语言中,接口类型可以存储任意类型的值。当需要从接口类型中取出具体的值时,需要进行类型转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码package main

import "fmt"

func main() {
var i interface{} = 10
value, ok := i.(int)
if ok {
fmt.Println("Value:", value)
} else {
fmt.Println("Conversion failed")
}
}

以上代码演示了在Go语言中使用类型断言来判断接口类型变量中存储的值的实际类型,并进行相应的类型转换。

  • 在 main 函数中,声明了一个空接口类型变量 i,并将其赋值为整数 10。
  • 然后,使用类型断言 i.(int) 尝试将 i 中的值转换为整数类型,并将结果赋值给 value。
  • 如果类型断言成功(即 i 中的值为整数类型),则 ok 的值为 true,否则为 false。
  • 最后,根据 ok 的值来判断类型转换是否成功,如果成功则输出转换后的整数值,否则输出提示信息 “Conversion failed”。

这段代码展示了如何使用类型断言来动态判断接口类型变量中存储的值的实际类型,并根据需要进行类型转换,以实现更灵活的编程。

3. 指针类型转换

在Go语言中,指针类型之间可以进行转换,但是需要确保目标类型是源类型的子类型或者是相同类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码package main

import "fmt"

type Animal struct {
Name string
}

type Dog struct {
*Animal
Breed string
}

func main() {
animal := Animal{Name: "Animal"}
dog := Dog{Animal: &animal, Breed: "Labrador"}

fmt.Println("Dog name:", dog.Name)
}

以上代码演示了在Go语言中如何使用结构体嵌套和指针来实现组合关系。

  • 首先,定义了两个结构体类型 Animal 和 Dog。
  • Animal 结构体包含一个字段 Name,用于表示动物的名称。
  • Dog 结构体嵌套了一个指向 Animal 结构体的指针,并拥有自己的字段 Breed,用于表示狗的品种。
  • 在 main 函数中,创建了一个 Animal 类型的变量 animal,并初始化其 Name 字段为 “Animal”。
  • 接着,创建了一个 Dog 类型的变量 dog,通过结构体嵌套将 Animal 结构体作为 Dog 结构体的一个字段,同时指定了 Breed 字段的值为 “Labrador”。
  • 最后,通过 dog.Name 访问了嵌套的 Animal 结构体的 Name 字段,并输出了狗的名称。

这段代码展示了如何在Go语言中使用结构体嵌套和指针来构建复杂的数据结构,并实现了对象之间的组合关系。

应用场景

  1. 数据转换

在处理数据时,可能需要将一种数据类型转换为另一种数据类型,例如将字符串转换为整数、将整数转换为浮点数等。
2. 接口类型断言

当使用接口类型时,可能需要将接口类型断言为具体的类型以进行后续操作,例如从接口类型中取出具体的值进行处理。
3. 指针类型转换

在处理复杂数据结构时,可能需要将指针类型进行转换以获取相关数据或进行操作。

注意事项

  1. 类型断言安全性

在进行类型断言时,需要注意判断断言是否成功,以避免出现panic。

1
2
3
4
5
6
go复制代码var i interface{} = "hello"
if value, ok := i.(int); ok {
fmt.Println("Value:", value)
} else {
fmt.Println("Conversion failed")
}

以上代码演示了在Go语言中进行类型断言时的处理方式。

  • 首先,创建了一个空接口类型 i,并将字符串 “hello” 赋值给它。
  • 接着,在 if 语句中使用了类型断言 i.(int),试图将 i 断言为 int 类型。
  • 如果断言成功,将 i 转换为 int 类型的值,并将其赋值给 value,同时将 ok 设为 true,然后输出转换后的值。
  • 如果断言失败,即 i 的实际类型不是 int,则将 ok 设为 false,表示转换失败,并输出 “Conversion failed”。

由于 i 的实际类型是 string,而不是 int,因此断言失败,最终输出 “Conversion failed”。

  1. 指针类型转换

在进行指针类型转换时,需要确保目标类型是源类型的子类型或者是相同类型,否则可能会导致编译错误或运行时错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码type Animal struct {
Name string
}

type Dog struct {
*Animal
Breed string
}

func main() {
animal := Animal{Name: "Animal"}
dog := Dog{Animal: &animal, Breed: "Labrador"}

fmt.Println("Dog name:", dog.Name)
}

以上代码演示了在Go语言中嵌入结构体的用法。

  • 首先,定义了两个结构体类型:Animal 和 Dog。
  • Dog 结构体嵌入了 Animal 结构体,这意味着 Dog 结构体包含了 Animal 结构体的所有字段和方法。
  • 在 main 函数中,创建了一个名为 animal 的 Animal 类型变量,并初始化其 Name 字段为 “Animal”。
  • 接着,创建了一个名为 dog 的 Dog 类型变量,其中 Animal 字段被赋值为指向 animal 变量的指针,并设置了 Breed 字段为 “Labrador”。
  • 最后,通过 dog.Name 可以访问到 Animal 结构体中的 Name 字段,并输出 “Dog name: Animal”。

这种结构体嵌入的方式可以让 Dog 结构体获得 Animal 结构体的所有属性和方法,实现了代码的复用和组合。

总结

Go语言类型转换的高级用法涉及到自定义类型、接口类型和指针类型的转换,可以在程序中实现复杂数据结构的处理和操作。在进行类型转换时,需要注意类型安全性和转换的合法性,以确保程序的正确性和稳定性。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

阿里也出手了!Spring Cloud Alibaba AI

发表于 2024-04-26

什么是Spring AI

Spring AI是从著名的Python项目LangChain和LlamaIndex中汲取灵感,它不是这些项目的直接移植,它的成立信念是,下一波生成式人工智能应用程序将不仅适用于 Python 开发人员,而且将在许多编程语言中无处不在。

我们可以从Spring AI的官网描述中,总结出Spring AI的几个核心的关键词:

  • 提供抽象能力
  • 简化AI应用的开发
  • 模型与向量支持
  • AI集成与自动配置

Spring AI简化了我们构建大型复杂的AI应用的过程,当然如果你的项目仅仅是需要调用一个AI接口,那其实直接调用官方SDK反而更方便。

Spring AI提供的功能如下:

  • 支持所有主要的模型提供商,如OpenAI,Microsoft,Amazon,Google和Huggingface。支持的模型类型包括聊天和文本到图像。
  • 跨 AI 提供商的可移植 API,用于聊天和嵌入模型。支持同步和流 API 选项。还支持下拉以访问特定于模型的功能。
  • 将 AI 模型输出映射到 POJO。
  • 支持所有主要的向量数据库,例如 Azure Vector Search、Chroma、Milvus、Neo4j、PostgreSQL/PGVector、PineCone、Qdrant、Redis 和 Weaviate。
  • 跨 Vector Store 提供程序的可移植 API,包括新颖的类似 SQL 的元数据过滤器 API,该 API 也是可移植的。
  • AI 模型和矢量存储的 Spring Boot stater。
  • 用于数据工程的 ETL 框架

什么是Spring Cloud Alibaba AI

原始的Spring AI并没有国内相关大模型的接入,对国内开发者不太友好。

总的来说,Spring Cloud Alibaba AI 目前基于Spring AI 0.8.1版本 API 完成通义系列大模型的接入。

在当前最新版本中,Spring Cloud Alibaba AI 主要完成了几种常见生成式模型的适配,包括对话、文生图、文生语音等,开发者可以使用 Spring Cloud Alibaba AI 开发基于通义的聊天、图片或语音生成 AI 应用,框架还提供 OutParser、Prompt Template、Stuff 等实用能力。

Spring Cloud Alibaba AI官方还提供了包括聊天对话、文生图、文生语音等多种应用的开发示例,具体可以前往官网查看:快速开始 | https://sca.aliyun.com

动手体验Spring Cloud Alibaba AI

首先新建一个Maven项目,JDK选的是17版本。

Maven文件需要引入spring-cloud-alibaba-dependencies和spring-cloud-starter-alibaba-ai两个依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
xml复制代码<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2023.0.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-ai</artifactId>
</dependency>
</dependencies>

配置阿里云通义千问的Api-Key,没有的读者可以从官网上申请。

1
2
3
4
5
6
7
8
9
10
yml复制代码server:
port: 8080
spring:
application:
name: alibaba-spring-ai-demo

cloud:
ai:
tongyi:
api-key: 你的api-key

新建SpringBoot启动类:

1
2
3
4
5
6
java复制代码@SpringBootApplication
public class MyAiApplication {
public static void main(String[] args) {
SpringApplication.run(MyAiApplication.class,args);
}
}

对接文本模型

我们首先测试如何对接文本大模型。

新建一个控制器类:新建/simple接口,用来测试基本QA。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@RestController
@RequestMapping("/ai")
@CrossOrigin
public class TongYiController {
@Autowired
@Qualifier("tongYiSimpleServiceImpl")
private TongYiService tongYiSimpleService;

@GetMapping("/simple")
public String completion(
@RequestParam(value = "message", defaultValue = "AI时代下Java开发者该何去何从?")
String message
) {
return tongYiSimpleService.completion(message);
}
}

新建一个TongyiService服务类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public interface TongYiService {

/**
* 基本问答
*/
String completion(String message);
/**
* 文生图
*/
ImageResponse genImg(String imgPrompt);

/**
* 语音合成
*/
String genAudio(String text);

}

具体的实现类如下:由 Spring AI 自动注入 ChatClient、StreamingChatClient,ChatClient 屏蔽底层通义大模型交互细节,后者用于流式调用。

对于QA而言,仅仅通过client.call(prompt)一行代码就可以完成对模型的调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Service
@Slf4j
public class TongYiSimpleServiceImpl extends AbstractTongYiServiceImpl {
/**
* 自动注入ChatClient、StreamingChatClient,屏蔽模型调用细节
*/
private final ChatClient chatClient;

private final StreamingChatClient streamingChatClient;

@Autowired
public TongYiSimpleServiceImpl(ChatClient chatClient, StreamingChatClient streamingChatClient) {
this.chatClient = chatClient;
this.streamingChatClient = streamingChatClient;
}
/**
* 具体实现:
*/
@Override
public String completion(String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatClient.call(prompt).getResult().getOutput().getContent();
}
}

我们发送一个请求,prompt是AI时代下Java开发者该何去何从?测试结果如下:

image.png

文生图模型

这里只给出service的代码,其它代码同上面的文本问答。

可以看到,只需要实例化一个imagePrompt,再调用模型即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Slf4j
@Service
public class TongYiImagesServiceImpl extends AbstractTongYiServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(TongYiService.class);
private final ImageClient imageClient;
@Autowired
public TongYiImagesServiceImpl(ImageClient client) {
this.imageClient = client;
}
@Override
public ImageResponse genImg(String imgPrompt) {
var prompt = new ImagePrompt(imgPrompt);
return imageClient.call(prompt);
}
}

测试的prompt是:Painting a boy coding in front of the desk, with his dog.,测试结果如下,效果还是很不错的:

img测试.jpg
img接口.jpg
语音合成模型


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Slf4j
@Service
public class TongYiAudioSimpleServiceImpl extends AbstractTongYiServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(TongYiService.class);
private final SpeechClient speechClient;
@Autowired
public TongYiAudioSimpleServiceImpl(SpeechClient client) {
this.speechClient = client;
}
@Override
public String genAudio(String text) {
logger.info("gen audio prompt is: {}", text);
var resWAV = speechClient.call(text);
// save的代码省略,就是将音频保存到本地而已
return save(resWAV, SpeechSynthesisAudioFormat.WAV.getValue());
}
}

测试结果也是成功的:

image.png

使用体验小结

不得不说,阿里在Java开发领域一直是走在国内的前沿的,我也期待阿里继续完善Spring Cloud Alibaba AI的相关功能,为我们这些国内Java开发者提供更加方便的开发工具。

本文仅仅简单测试了文本问答、文生图以及语音合成三个功能,(最后一个没列出来),Spring Cloud Alibaba AI还有很多丰富的功能,如流式调用、POJO转换、AI Role等功能,各位读者感兴趣可以自行前往官方example仓库查看。后续也我打算利用Spring Cloud Alibaba AI尝试构建一个RAG问答应用。

下面给出我的使用小结:

  1. 简化开发。个人开发者如果仅仅需要简答的问答接口,无需使用Spring AI,然而,当项目中需要开发比较复杂的AI功能,如果仅仅使用官方的SDK,写出的代码可能不太容易长期维护。
  2. 响应时间。接口响应时间还有很大的优化空间,可以看到基本的文本问答的响应就耗费了10s,不过这也取决于所处理任务的大小。
  3. 模型选择。之前使用SDK可以自己选择通义提供的各种模型,而使用Spring AI框架,暂时不知道如何选择其它模型进行调用,有知道的掘友也可以在评论区说一下。

未来,Spring Cloud Alibaba AI还将继续完成 VectorStore、Embedding、ETL Pipeline 等更多适配,简化 RAG 等更多 AI 应用开发场景。身为Java开发者,我也将继续关注Spring Cloud Alibaba 社区的最新动态。

参考

  1. Spring Cloud Alibaba AI 概述 | https://sca.aliyun.com
  2. Spring AI 抢先体验,5 分钟玩转 Java AI 应用开发 (mp.weixin.qq.com)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于Redission布隆过滤器-优化版

发表于 2024-04-26

概述:

原因:

juejin.cn/post/735501… (基于Redission布隆过滤器原理,优缺点及工具类和使用示例)文中简单介绍Redission布隆过滤器使用,这边基于此上再次给出优化版本,防止元素存在误差的场景:

优化:

优化理论:

为了进一步优化 RedissonBloomFilterUtil,我们可以考虑以下几点来增强其功能和鲁棒性:

  1. 配置管理:允许动态配置布隆过滤器的参数,如预期插入数量和误报率。
  2. 异常处理:增加异常处理机制来确保系统稳定运行,即使底层服务(缓存或数据库)出现问题。
  3. 日志记录:记录关键操作和异常,以便于问题追踪和性能监控。
  4. 重建和同步布隆过滤器:提供机制定期或根据需要重建布隆过滤器,以减少误报率。
  5. 分布式锁:在更新缓存和布隆过滤器时使用分布式锁,以防止并发更新导致的数据不一致。
  6. 回退策略:在缓存和数据库服务不可用时提供回退策略,以保证服务的持续可用性。

优化流程:

使用布隆过滤器的基本流程如下:

  1. 当请求到来时,首先查询布隆过滤器。
  2. 如果布隆过滤器认为数据不存在,则直接返回,不继续查询数据库。
  3. 如果布隆过滤器认为数据可能存在,继续查询缓存。
  4. 如果缓存中有数据,返回缓存数据。
  5. 如果缓存中没有数据,继续查询数据库。
  6. 如果数据库中有数据,将其放入缓存并返回。
  7. 如果数据库中也没有数据,可以选择更新布隆过滤器(如果布隆过滤器支持删除操作),或者记录这个不存在的查询,以便在布隆过滤器重建时排除这些误报。

优化实现:

以下是考虑了上述优化点的 RedissonBloomFilterUtil 类的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
java复制代码import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedissonBloomFilterUtil {

private static final Logger logger = LoggerFactory.getLogger(RedissonBloomFilterUtil.class);

private RBloomFilter<String> bloomFilter;
private RedissonClient redissonClient;
private CacheService cacheService; // 假设这是你的缓存服务
private DatabaseService databaseService; // 假设这是你的数据库服务

public RedissonBloomFilterUtil(RedissonClient redissonClient, CacheService cacheService, DatabaseService databaseService, long expectedInsertions, double falseProbability) {
this.redissonClient = redissonClient;
this.cacheService = cacheService;
this.databaseService = databaseService;
this.bloomFilter = redissonClient.getBloomFilter("bloomFilter");
this.bloomFilter.tryInit(expectedInsertions, falseProbability);
}

public Object getData(String key) {
try {
if (!bloomFilter.contains(key)) {
logger.debug("Bloom filter does not contain key: {}", key);
return null;
}

Object cachedData = cacheService.getFromCache(key);
if (cachedData != null) {
logger.debug("Returning data from cache for key: {}", key);
return cachedData;
}

Object databaseData = databaseService.getFromDatabase(key);
if (databaseData != null) {
cacheService.addToCache(key, databaseData);
logger.debug("Data added to cache for key: {}", key);
return databaseData;
}

logger.debug("Data not found in database for key: {}", key);
recordFalsePositive(key);
} catch (Exception e) {
logger.error("Error retrieving data for key: {}", key, e);
// Implement fallback strategy (e.g., retry logic, circuit breaker)
}
return null;
}

private void recordFalsePositive(String key) {
// Implement logic to record false positives
// This could involve logging or updating the bloom filter
logger.info("Recorded false positive for key: {}", key);
}

public void rebuildBloomFilter() {
// Implement logic to rebuild the bloom filter
// This might involve clearing the bloom filter and re-adding keys from a reliable source
logger.info("Rebuilding bloom filter");
}

// ... (shutdown method and other methods)
}

优化总结:

在这个实现中,我们添加了日志记录来帮助跟踪操作和捕获异常。我们还为 getData 方法添加了异常处理逻辑,以便在发生错误时记录错误并执行回退策略(例如重试逻辑或断路器模式)。
recordFalsePositive 方法现在记录了误报的信息,这有助于监控布隆过滤器的性能,并在必要时进行调整。

rebuildBloomFilter 方法是一个存根,你可以在这里实现布隆过滤器的重建逻辑。这可能包括从数据库或其他可靠源重新填充布隆过滤器,以确保其准确性。

扩展:

请注意,为了实现分布式锁和回退策略,可能需要更复杂的逻辑和额外的依赖。
可以使用 Redisson 提供的分布式锁功能,在更新缓存和布隆过滤器时保持数据一致性。
对于回退策略,你可能需要使用像 Hystrix 这样的库来实现断路器模式,或者自己实现重试逻辑。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

世界上最大的 API 中心:RapidAPI,RapidAP

发表于 2024-04-26

一、RapidAPI 介绍

RapidAPI 是一个 API 中心,使超过 300 万的开发人员能够查找、管理和连接 API。 RapidAPI 让开发人员可以从一个地方管理所有 API 集成,并提供实时性能指标。 RapidAPI 成立于 2014 年,总部位于加利福尼亚州旧金山。

RapidAPI 是由开发人员为开发人员制作的,因此他们可以在一个地方访问 API 和微服务,并更高效、更轻松地构建应用程序。 今天,RapidAPI 是世界上最大的 API 中心,近 300 万开发人员可以在其中查找、测试和连接数以万计的 API——所有 API 都使用一个帐户、一个 API 密钥和一个 SDK。

软件开发人员还可以使用 RapidAPI for Teams 共享和协作处理内部 API,RapidAPI for Teams 是一个用于发布内部 API 和共享公共 API 订阅的通用工作区。 此外,组织可以使用 RapidAPI Enterprise 创建集中式中心环境,以帮助开发人员更快地重用和连接到现有 API,同时为 IT 提供企业范围的 API 使用可见性和治理。

二、RapidAPI (R Software Inc.)产品百科

  • RapidAPI Hub:查找并连接到数千个 API 一个 SDK;一个 API 密钥;一个仪表板。 您的开发团队、合作伙伴和客户可以发现并连接到您的 API——所有这些都来自一个单一的下一代 API 平台。 RapidAPI 的 Enterprise Hub 可以定制以匹配您公司的品牌,与内部系统和工具无缝集成,支持您的所有 API,并且可以部署为基于云的服务、本地和跨多云环境。
  • RapidAPI for Teams:随着公司过渡到在其架构中使用微服务,整个组织中创建了更多 API。 因此,开发人员在开发新软件时找到这些内部 API 并重用它们变得更具挑战性。 当开发人员订阅外部 API 时,他们需要一个协作解决方案来在内部共享它们。 RapidAPI for Teams 使您能够发布、共享和连接到内部 API 以及外部 API 和微服务。 使用 RapidAPI for Teams,开发人员可以创建一个组织并邀请其他人从私有工作区共享内部和外部 API。
  • RapidAPI Enterprise Hub:API 已成为构建软件的重要工具——在整个组织中激增,并从数百个增加到数千个。 随着开发团队越来越多地使用更多 API,他们正在探索新的 API 类型,例如 GraphQL、Kafka 等。 使这一挑战更加复杂的是,大多数公司的服务在具有多个网关以及混合和多云场景的异构环境中运行。RapidAPI Enterprise Hub 通过与任何 API 环境集成的下一代平台来满足这一需求。
  • RapidAPI Testing:RapidAPI 测试是一种功能性 API 测试和监控解决方案,可提供直观的用户体验、对任何 API 类型的支持以及与 RapidAPI Hub 和 RapidAPI Enterprise Hub 的集成。 RapidAPI 测试使用户和企业能够:1)确保 API 功能 – 轻松创建复杂的功能测试,以对 API 进行深度验证;2)集中监控——监控和管理跨多个地区的 API 测试;3)提高效率——集成到 CI/CD 管道,跨团队协作,并与 RapidAPI Hub 和 RapidAPI Enterprise Hub 本地集成 。
  • RapidAPI Client:RapidAPI Design by Paw是一个功能齐全的 API 客户端,可加速 API 交付并改善开发人员体验。 它通过提供具有直观 UI 和卓越性能的 API 客户端来简化开发工作流程并简化 API 协作。

三、RapidAPI 如何付费呢?

这里需要开一张Fomepay的虚拟信用卡即可付费,我这里使用的是559666进行付费,点击获取虚拟卡

微信图片_20240108105643.png

卡信息在卡中心cvc卡密里面

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文搞定分布式ID-基于Redission和Snowflak

发表于 2024-04-26

基于Redission 分布式ID:

基于 Redisson 实现分布式 ID 生成通常涉及使用 Redis 的原子操作,如 INCR 或 INCRBY,来确保生成的 ID 是唯一的。
以下是一个简单的示例,展示如何使用 Redisson 来创建一个带有自定义前缀的分布式 ID 生成器。
使用 Maven

配置:

在 pom.xml 文件中添加以下依赖:

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.4</version>
</dependency>

然后,可以创建一个类来实现分布式 ID 生成器的逻辑:

注意点:

  1. 缓存前缀相关的 RAtomicLong 实例:为了避免每次生成 ID 时都获取一个新的 RAtomicLong 实例,我们可以缓存这些实例。
  2. 批量生成 ID:如果你的应用程序需要频繁生成 ID,可以一次性批量生成多个 ID,以减少与 Redis 服务器的通信次数。
  3. 添加异常处理:在与 Redis 交互时,可能会出现连接问题或其他异常,应该添加异常处理来确保系统的稳定性。
  4. 配置化:使 ID 生成器的某些方面(如键的名称前缀)可配置,以便在不同环境或不同情况下灵活使用。

简化版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, RAtomicLong> atomicLongCache;
private static final String KEY_PREFIX = "id_generator_"; // 可配置化的键前缀
private static final int ID_BATCH_SIZE = 100; // 批量生成 ID 的大小
private long currentId = 0;
private long maxId = 0;

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.atomicLongCache = new ConcurrentHashMap<>();
}

public synchronized String generateId(String prefix) {
// 检查当前批次是否已用完
if (currentId >= maxId) {
// 从 Redis 获取下一个批次的起始 ID
RAtomicLong atomicLong = atomicLongCache.computeIfAbsent(prefix, k -> redissonClient.getAtomicLong(KEY_PREFIX + k));
currentId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId = currentId + ID_BATCH_SIZE;
}
// 返回自定义前缀和 ID 的组合
return prefix + "_" + (currentId++);
}
}

在上述代码中,generateId 方法现在是同步的,以确保在批量生成 ID 时线程安全。我们使用 currentId 和 maxId 变量来跟踪当前批次的 ID 范围。当当前批次的 ID 用完时,通过增加 ID_BATCH_SIZE 来从 Redis 获取下一个批次的起始 ID。

此外,我们使用 ConcurrentMap 来缓存 RAtomicLong 实例,以便我们可以重用它们,而不是每次调用 generateId 时都创建新的实例。

优化版:

针对以上代码,可以进一步优化:
考虑以下方面:

  1. 异步获取 ID:提供异步版本的 ID 获取方法,以避免在高并发环境中阻塞调用线程。
  2. 预取 ID 范围:在当前 ID 范围即将用完时,异步预取下一个 ID 范围,以减少生成 ID 时的等待时间。
  3. 优化锁的使用:减少锁的粒度,避免在高并发下的性能瓶颈。
  4. 监控和告警:加入监控机制,当 ID 的使用接近上限时发送告警。

以下是对 DistributedIdGenerator 类进行进一步优化的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, AtomicLong> currentIdMap;
private final ConcurrentMap<String, AtomicLong> maxIdMap;
private static final String KEY_PREFIX = "id_generator_";
private static final int ID_BATCH_SIZE = 100;
private static final int PREFETCH_THRESHOLD = 10; // 当剩余 ID 少于此值时,开始预取

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.currentIdMap = new ConcurrentHashMap<>();
this.maxIdMap = new ConcurrentHashMap<>();
}

public String generateId(String prefix) {
AtomicLong currentId = currentIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));
AtomicLong maxId = maxIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));

long id = currentId.getAndIncrement();
if (id >= maxId.get()) {
synchronized (maxId) {
if (id >= maxId.get()) {
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId.set(newMaxId);
currentId.set(newMaxId - ID_BATCH_SIZE + 1);
id = currentId.getAndIncrement();
}
}
}

// Prefetch next ID range if necessary
long remainingIds = maxId.get() - currentId.get();
if (remainingIds <= PREFETCH_THRESHOLD) {
CompletableFuture.runAsync(() -> fetchNextIdRange(prefix));
}

return prefix + "_" + id;
}
private void fetchNextIdRange(String prefix) {
AtomicLong maxId = maxIdMap.get(prefix);
synchronized (maxId) {
// Double-check in case another thread already fetched the range
if (currentIdMap.get(prefix).get() < maxId.get()) {
return;
}
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(ID_BATCH_SIZE);
maxId.set(newMaxId);
}
}
}

在上述代码中,我们使用 ConcurrentMap 来维护当前 ID 和最大 ID 的原子性,同时减少了锁的粒度,只在需要更新最大 ID 时进行同步。此外,我们引入了 PREFETCH_THRESHOLD 来确定何时开始预取下一个 ID 范围,并使用 CompletableFuture 来异步预取,从而避免阻塞当前线程。

为了监控和告警,可以在适当的位置添加日志记录或集成监控系统,以跟踪 ID 的使用情况并在接近上限时发送告警。

请注意,这个优化版本假设 ID 的消耗速率不会在短时间内迅速耗尽当前批次的 ID。如果你的系统有这种可能性,可能需要进一步调整 PREFETCH_THRESHOLD 的值,或者实现更复杂的预取逻辑来确保 ID 的连续供应。

最终版:

最后分布式环境中给出最后代码实现:

在分布式环境中,进一步优化 DistributedIdGenerator 类可能涉及到以下几个方面:

  1. 分段锁:为了减少锁的争用,可以使用分段锁(Striped Locks)来减少同步块的争用。
  2. 本地缓存与预取结合:在本地缓存一定量的 ID 范围,并在接近耗尽时预取下一批次的 ID,以减少对 Redis 的请求频率。
  3. 扩展性和灵活性:考虑到不同的使用场景和需求,提供配置项以调整批次大小、预取阈值等参数。
  4. 故障恢复和重试机制:在与 Redis 交互时可能会遇到网络故障或其他异常,需要实现重试逻辑和故障恢复机制。
  5. 监控和告警:实现监控机制,监控 ID 生成的健康状况和性能指标,并在出现问题时及时告警。

以下是考虑到分布式环境的进一步优化代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
java复制代码import org.redisson.api.RedissonClient;
import org.redisson.api.RAtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
* @Author derek_smart
* @Date 202/4/25 10:15
* @Description 分布式Id 最终版
* <p>
*/
@Component
public class DistributedIdGenerator {

private final RedissonClient redissonClient;
private final ConcurrentMap<String, AtomicLong> currentIdMap;
private final ConcurrentMap<String, AtomicLong> maxIdMap;
private static final String KEY_PREFIX = "id_generator_";

@Value("${id.generator.batch.size:100}")
private int idBatchSize;

@Value("${id.generator.prefetch.threshold:10}")
private int prefetchThreshold;

@Autowired
public DistributedIdGenerator(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
this.currentIdMap = new ConcurrentHashMap<>();
this.maxIdMap = new ConcurrentHashMap<>();
}

public String generateId(String prefix) {
AtomicLong currentId = currentIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));
AtomicLong maxId = maxIdMap.computeIfAbsent(prefix, k -> new AtomicLong(0));

long id = currentId.getAndIncrement();
if (id >= maxId.get()) {
synchronized (maxId) {
if (id >= maxId.get()) {
fetchNextIdRange(prefix, maxId);
id = currentId.getAndIncrement();
}
}
}

// Prefetch next ID range if necessary
long remainingIds = maxId.get() - currentId.get();
if (remainingIds <= prefetchThreshold) {
fetchNextIdRange(prefix, maxId);
}

return prefix + "_" + id;
}

private void fetchNextIdRange(String prefix, AtomicLong maxId) {
synchronized (maxId) {
// Double-check in case another thread already fetched the range
if (currentIdMap.get(prefix).get() < maxId.get()) {
return;
}
RAtomicLong atomicLong = redissonClient.getAtomicLong(KEY_PREFIX + prefix);
long newMaxId = atomicLong.addAndGet(idBatchSize);
maxId.set(newMaxId);
}
}
}

image.png

在上述代码中,我们通过 @Value 注解从配置文件中读取批次大小和预取阈值,使其更加灵活。同时,我们为 fetchNextIdRange 方法添加了一个额外的参数 maxId,以便在预取时直接操作 AtomicLong 对象,而不是每次都从 maxIdMap 中查询。

此外,为了应对分布式环境中的潜在问题,我们可以添加重试逻辑和故障恢复机制。例如,如果与 Redis 的交互失败,可以实现一个重试策略,重试几次后仍然失败则记录错误日志或触发告警。

监控和告警可以通过外部监控系统(如 Prometheus、Grafana)或内部监控机制(如 Spring Boot Actuator、Micrometer)来实现,以确保及时发现和解决问题。

最后,如果需要进一步提高性能,可以考虑使用 Redis 的 Lua 脚本来减少网络往返时间,并原子地执行 ID 范围的获取和更新操作。

为了使用这个 ID 生成器,你可以在你的服务中注入 DistributedIdGenerator 并调用 generateId 方法:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Service
public class SomeService {

@Autowired
private DistributedIdGenerator idGenerator;

public void doSomething() {
// 生成一个带有自定义前缀 "xx" 的分布式 ID
String distributedId = idGenerator.generateId("xx");
// 使用生成的 ID 进行业务操作
}
}

请注意,为了使上述示例工作,你需要配置和提供一个 RedissonClient 实例。这通常在你的 Spring 配置类中完成,如下所示:

1
2
3
4
5
6
7
8
9
10
java复制代码@Configuration
public class RedissonConfig {

@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}

确保 Redis 服务器正在运行,并且配置中的地址和端口与你的 Redis 服务器设置相匹配。以上代码提供了一些基本的分布式 ID 生成器,可以根据自己的业务需求进行调整和扩展。

SnowflakeId 分布式ID:

理论替换方案:

如果没有redis,可以考虑以下替代方案:

  1. UUID (Universally Unique Identifier):
    UUID 是一个很好的选择,因为它可以在本地生成,不需要网络通信,并且碰撞的概率极低。UUID 是 128 位的值,通常以 36 个字符(包括 4 个破折号)的字符串形式表示。
1
2
3
4
5
6
7
java复制代码 import java.util.UUID;

public class UUIDGenerator {
public static String generateUUID() {
return UUID.randomUUID().toString();
}
}
  1. Snowflake 算法:
    Twitter 的 Snowflake 算法是另一个流行的分布式 ID 生成方法。它生成一个 64 位的长整型 ID,其中包含时间戳、数据中心 ID、机器 ID 和序列号。为了使用 Snowflake 算法,你需要确保每个机器或服务实例有唯一的数据中心 ID 和机器 ID。

你可以使用现成的 Snowflake 实现,或者根据 Snowflake 的原理自己实现一个。
3. 数据库序列:
如果你的应用已经使用了关系数据库,你可以依赖数据库的序列功能来生成唯一 ID。大多数现代关系数据库都支持序列,虽然这种方法会引入数据库作为中心化依赖。
4. 分布式唯一 ID 生成器服务:
如果你的系统架构允许,可以部署一个独立的分布式 ID 生成器服务,如 Twitter 的 Snowflake、Sony 的 Flake 或者 Boundary 的 Flake。这些服务可以独立于你的主应用运行,并通过网络调用来生成 ID。
5. 组合策略:
在某些情况下,你可以组合使用以上方法。例如,使用机器 ID 或 IP 地址作为前缀,再加上本地生成的 UUID 或时间戳和随机数的组合。

SnowflakeId解决方案:

简化版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码
public class SnowflakeIdGenerator {

private final long twepoch = 1288834974657L;
private final long workerIdBits = 5L;
private final long datacenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
private final long sequenceBits = 12L;

private final long workerIdShift = sequenceBits;
private final long datacenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

private long workerId;
private long datacenterId;
private long sequence = 0L;
private long lastTimestamp = -1L;

public SnowflakeIdGenerator(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("Worker ID can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("Datacenter ID can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}

public synchronized long nextId() {
long timestamp = timeGen();

if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}

lastTimestamp = timestamp;

return ((timestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}

private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}

private long timeGen() {
return System.currentTimeMillis();
}

public static void main(String[] args) {
SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(0, 0);
for (int i = 0; i < 1000; i++) {
long id = idGenerator.nextId();
System.out.println(id);
}
}
}

在这个实现中,我们创建了一个 SnowflakeIdGenerator 类,它需要两个参数:workerId 和 datacenterId。这两个参数用于在分布式环境中区分不同的工作节点和数据中心。

Snowflake 算法生成的是一个 64 位的长整型 ID,其中包含:

  • 1 位未使用(因为 Java 中的长整型是有符号的)
  • 41 位时间戳(毫秒级)
  • 5 位数据中心 ID
  • 5 位工作节点 ID

优化版:

SnowflakeIdGenerator 类以更好地处理分布式和高并发的情况,我们可以考虑以下几点:

  1. 确保时钟回拨问题的处理:在分布式系统中,时钟回拨是一个常见问题。我们需要确保在时钟回拨时,ID 生成器能够正常工作或者抛出异常来避免生成重复的 ID。
  2. 使用原子操作:使用 AtomicLong 来替代 synchronized 方法,以提高并发性能。
  3. 确保每个实例的唯一性:在分布式环境中,每个实例都应该有一个唯一的工作节点 ID 和数据中心 ID。这通常可以通过配置文件或环境变量来设置。

下面是优化后的 SnowflakeIdGenerator 类的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
java复制代码import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/
* @Author derek_smart
* @Date 202/4/26 10:25
* @Description SnowflakeId id 生成器
* <p>
*/
@Component
public class SnowflakeIdGenerator {

private final long twepoch = 1288834974657L;
private final long workerIdBits = 5L;
private final long datacenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
private final long sequenceBits = 12L;

private final long workerIdShift = sequenceBits;
private final long datacenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

private final long workerId;
private final long datacenterId;
private final AtomicLong sequence = new AtomicLong(0L);
private final AtomicLong lastTimestamp = new AtomicLong(-1L);

public SnowflakeIdGenerator(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("Worker ID can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("Datacenter ID can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}

//异步单个
public CompletableFuture<Long> nextIdAsync() {
return CompletableFuture.supplyAsync(this::nextId);
}
//批量
public List<Long> nextIds(int count) {
List<Long> ids = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
ids.add(nextId());
}
return ids;
}
//异步 多个
public CompletableFuture<List<Long>> nextIdsAsync(int count) {
return CompletableFuture.supplyAsync(() -> nextIds(count));
}

//获取
public long nextId() {
long currentTimestamp = -1L;
long lastTimestampValue = -1L;

while (true) {
lastTimestampValue = lastTimestamp.get();
currentTimestamp = timeGen();

if (currentTimestamp < lastTimestampValue) {
// Clock moved backwards, handle the exception
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestampValue - currentTimestamp));
}

if (currentTimestamp == lastTimestampValue) {
long currentSequence = sequence.incrementAndGet() & sequenceMask;
if (currentSequence == 0) {
currentTimestamp = waitNextMillis(currentTimestamp);
} else {
return generateId(currentTimestamp, currentSequence);
}
} else {
if (sequence.compareAndSet(sequence.get(), 0L)) {
return generateId(currentTimestamp, sequence.get());
}
}
}
}

private long generateId(long currentTimestamp, long sequence) {
return ((currentTimestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}

private long waitNextMillis(long currentTimestamp) {
while (currentTimestamp <= lastTimestamp.get()) {
currentTimestamp = timeGen();
}
lastTimestamp.set(currentTimestamp);
return currentTimestamp;
}

private long timeGen() {
return System.currentTimeMillis();
}
}

1714105622027.png

在这个优化版本中,我们使用 AtomicLong 替代了 synchronized 方法,以提高并发性能。nextId 方法现在是无锁的,使用原子操作来确保序列号的正确递增,并且在时钟回拨时抛出异常。
我们还添加了 waitNextMillis 方法来确保在同一毫秒内序列号用完时,能够等待到下一个毫秒值。generateId 方法是一个帮助方法,用于基于时间戳、数据中心 ID、工作节点 ID 和序列号生成 ID。

请注意,为了在分布式环境中使用此 ID 生成器,你需要确保每个实例都有唯一的工作节点 ID 和数据中心 ID,这通常可以通过配置文件或环境变量来设置。
此外,如果你的系统对时钟回拨非常敏感,可以添加额外的逻辑来处理这种情况,例如使用 NTP(Network Time Protocol)来同步服务器时间。

强健版:

最后我们可以实现批量生成和双缓冲策略及更加强健版本的BufferedSnowflakeIdGenerator

BufferedSnowflakeIdGenerator 并解决潜在的错误处理和资源管理问题,
我们可以使用 ExecutorService 来管理后台线程,以及添加额外的逻辑来确保在系统关闭时正确地关闭和清理这些线程。此外,我们将添加异常处理来确保在填充缓冲区时发生的任何异常都能被捕获和处理。以下是优化后的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
java复制代码import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/
* @Author derek_smart
* @Date 202/4/26 10:45
* @Description 双重缓存 SnowflakeId id 生成器
* <p>
*/
@Component
public class BufferedSnowflakeIdGenerator {

private final Lock bufferLock = new ReentrantLock();
private final SnowflakeIdGenerator snowflakeIdGenerator;
private AtomicReference<long[]> bufferOne;
private AtomicReference<long[]> bufferTwo;
private AtomicLong bufferIndex;
private AtomicReference<long[]> currentBuffer;
private final ExecutorService executorService;

private final int bufferSize;

private String customPrefix;

public BufferedSnowflakeIdGenerator(long workerId, long datacenterId, int bufferSize) {
this.snowflakeIdGenerator = new SnowflakeIdGenerator(workerId, datacenterId);
this.bufferSize = bufferSize;
this.bufferOne = new AtomicReference<>(new long[bufferSize]);
this.bufferTwo = new AtomicReference<>(new long[bufferSize]);
this.bufferIndex = new AtomicLong(-1);
this.currentBuffer = new AtomicReference<>(bufferOne.get());
this.executorService = Executors.newSingleThreadExecutor();

// Initial buffer fill
fillBuffer(bufferOne.get());
fillBuffer(bufferTwo.get());
}

public BufferedSnowflakeIdGenerator(long workerId, long datacenterId, int bufferSize, String customPrefix) {
this.snowflakeIdGenerator = new SnowflakeIdGenerator(workerId, datacenterId);
this.bufferSize = bufferSize;
this.bufferOne = new AtomicReference<>(new long[bufferSize]);
this.bufferTwo = new AtomicReference<>(new long[bufferSize]);
this.bufferIndex = new AtomicLong(-1);
this.currentBuffer = new AtomicReference<>(bufferOne.get());
this.executorService = Executors.newSingleThreadExecutor();
this.customPrefix = customPrefix;
// Initial buffer fill
fillBuffer(bufferOne.get());
fillBuffer(bufferTwo.get());
}


private void fillBuffer(long[] buffer) {
try {
for (int i = 0; i < bufferSize; i++) {
buffer[i] = snowflakeIdGenerator.nextId();
}
} catch (Exception e) {
// Handle exceptions during buffer fill here
e.printStackTrace();
}
}

/
* 带自定义
* @return
*/
public String nextIdWithPrefix() {
return customPrefix + "_" + nextId();
}

/*
* 不带自定义表示
* @return
*/
public long nextId() {
long index = bufferIndex.incrementAndGet();
if (index < bufferSize) {
return currentBuffer.get()[(int) index];
} else {
bufferLock.lock();
try {
// Double check inside the lock
index = bufferIndex.get();
if (index >= bufferSize) {
currentBuffer.set(currentBuffer.get() == bufferOne.get() ? bufferTwo.get() : bufferOne.get());
bufferIndex.set(0);
index = 0;
// Trigger buffer refill asynchronously
long[] bufferToFill = currentBuffer.get() == bufferOne.get() ? bufferTwo.get() : bufferOne.get();
executorService.submit(() -> fillBuffer(bufferToFill));
}
} finally {
bufferLock.unlock();
}
return currentBuffer.get()[(int) index];
}
}

public void shutdown() {
executorService.shutdown();
// Optionally add code to wait for termination and handle tasks that could not be executed
}

// Ensure to call shutdown when the application stops
protected void finalize() throws Throwable {
try {
shutdown();
} finally {
super.finalize();
}
}
}

1714105790485.png

BufferedSnowflakeIdGenerator包含两个缓冲区bufferOne和bufferTwo,以及一个指示当前使用的缓冲区的 currentBuffer。bufferIndex` 跟踪当前缓冲区中的位置。

nextId 方法检查 bufferIndex 是否在 bufferSize 范围内。如果是,它返回当前缓冲区中的 ID。如果 bufferIndex 超出范围,则尝试通过锁来切换缓冲区,并在另一个线程中异步填充已耗尽的缓冲区。

使用了 ExecutorService 来管理后台线程,这样我们就可以更优雅地处理线程的生命周期。fillBuffer 方法包含了异常处理逻辑,以便在填充缓冲区时捕获并处理任何异常。

我们还添加了 shutdown 方法来关闭 ExecutorService。这个方法应该在应用程序停止时被调用,以确保所有后台任务都被正确清理。为了防止忘记调用 shutdown 方法,我们在类的 finalize 方法中也调用了 shutdown,这样即使忘记了显式调用,垃圾收集器在回收对象时也会尝试关闭线程池。

请注意,依赖 finalize 方法来关闭资源并不是一种推荐的做法,因为 finalize 方法的调用时机是不确.

关于上面的各类id 生成,可以采用工厂模式来创建 ID 生成器的实例,并且使用单例模式确保每个节点上 ID 生成器的唯一性。同时,我们可以使用策略模式来允许在不同的 ID 生成策略(如 Snowflake、UUID 等)之间灵活切换。

扩展设计模式:

上面都是各种实现类,完全可以使用工厂设计模式将其统一处理。只是一个简化版。

以下是结合设计模式的分布式 ID 生成器的代码实现:
首先,定义一个 ID 生成策略接口:

1
2
3
java复制代码public interface IdGenerationStrategy {
long generateId();
}

然后,实现 Snowflake 算法作为一个策略:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class SnowflakeStrategy implements IdGenerationStrategy {

// ... (其他成员变量和构造函数保持不变)

@Override
public long generateId() {
return nextId();
}

// ... (nextId, waitNextMillis, timeGen, generateId 方法保持不变)
}

接下来,如果需要,也可以实现一个基于 UUID 的策略:

1
2
3
4
5
6
7
8
9
10
11
java复制代码import java.util.UUID;

public class UUIDStrategy implements IdGenerationStrategy {

@Override
public long generateId() {
// UUIDs are 128-bit values, but we need to return a long (64-bit)
// This can be done by taking the hash of the UUID for example
return UUID.randomUUID().toString().hashCode();
}
}

现在,创建一个 ID 生成器工厂,它根据配置或环境来实例化相应的策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class IdGeneratorFactory {

private static IdGenerationStrategy idGeneratorStrategy;

public static synchronized IdGenerationStrategy getIdGeneratorStrategy() {
if (idGeneratorStrategy == null) {
// The choice of strategy can be made configurable
// For example, based on system properties, configuration files, etc.
String strategyType = System.getProperty("id.generator.strategy", "snowflake");
if ("snowflake".equals(strategyType)) {
long workerId = ...; // Retrieve from configuration
long datacenterId = ...; // Retrieve from configuration
idGeneratorStrategy = new SnowflakeStrategy(workerId, datacenterId);
} else if ("uuid".equals(strategyType)) {
idGeneratorStrategy = new UUIDStrategy();
} else {
throw new IllegalArgumentException("Unknown ID Generator strategy type");
}
}
return idGeneratorStrategy;
}
}

最后,客户端代码可以通过工厂获取 ID 生成器实例,并生成 ID:

1
2
3
4
5
6
7
8
java复制代码public class IdGenerationClient {

public static void main(String[] args) {
IdGenerationStrategy idGenerator = IdGeneratorFactory.getIdGeneratorStrategy();
long id = idGenerator.generateId();
System.out.println("Generated ID: " + id);
}
}

在这个设计中,IdGeneratorFactory 确保了在每个节点上 ID 生成器的唯一性,并且提供了一个全局访问点。IdGenerationStrategy 接口和它的实现类 SnowflakeStrategy 以及 UUIDStrategy 允许在不同的 ID 生成策略之间灵活切换。

这种设计模式的组合提供了良好的扩展性和可维护性,使得添加新的 ID 生成策略变得非常简单,只需要实现 IdGenerationStrategy 接口即可。同时,它也支持分布式环境,因为可以通过配置为不同的节点分配不同的工作节点 ID 和数据中心 ID,从而使得在 Snowflake 策略中生成的 ID 具有全局唯一性。
以上就是分布式id 实现。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Oracle 数据迁移同步优化(三)

发表于 2024-04-26

简述

CloudCanal 最近再次对其 Oracle 源端数据同步进行了一系列优化,这些优化基于用户在真实场景中的反馈,具备很强的生产级别参考意义。

本文将简要介绍这些优化项,希望带给读者一些收获。

  • 增量事件 SCN 乱序问题
  • MISSING_SCN 事件干扰
  • 新增的归档日志消费模式

优化点

增量事件 SCN 乱序问题

Oracle 源端 Logminer 数据同步原理大致如下:

  • 获取所有包含当前 SCN 位点的 Redo 或 Archive 日志文件,并添加到 Logminer 中
  • 计算本次需要分析的 SCN 范围(START_SCN, END_SCN)
  • Logminer 对于 SCN 范围进行日志分析,分析结果展现在 V$LOGMNR_CONTENTS 视图中
  • 扫描 V$LOGMNR_CONTENTS 视图,转换处理后同步到目标端

image.png

老版本 CloudCanal 扫描 V$LOGMNR_CONTENTS 视图时指定了 SCN 范围进行查询,但在实际客户场景中偶发 SCN 乱序问题。

同时 Oracle 官方也建议查询视图时不要进行过多的范围过滤或排序处理,以避免查询结果乱序。

因此我们首先 进行了 2 个优化 ,以此解决该问题:

  • 扫描 V$LOGMNR_CONTENTS 视图时直接查询所有记录,其 SCN 范围完全依赖于 Logminer 所指定的文件
  • 设定 Logminer 分析的步长参数(logMiningScnStep)控制分析性能

MISSING_SCN 事件干扰

使用 Logminer 分析 Redo 日志时,有时会出现 MISSING_SCN 事件,老版本 CloudCanal 遇到该事件则会忽略,但这会导致事件漏扫从而丢数据。

MISSING_SCN 事件具体意义为

  • Logminer 分析 Redo 日志时,由于日志切换或其他特殊情况,导致部分 SCN 事件没有被 Logminer 分析到,因此在 V$LOGMNR_CONTENTS 视图中体现为
    MISSING_SCN。

因此我们做了 第 3 个优化,当遇到 MISSING_SCN 事件时采取一定的策略规避漏扫问题,具体动作为:

  • 停止扫描,回退当前 SCN
  • 根据当前 SCN 重新分析和消费日志文件

image.png

重新分析后,缺失的 SCN 记录会被 Logminer 分析到,并且此类型事件出现频率较小,因此对同步效率影响非常小。

归档日志消费模式

Logminer 分析 Redo 日志时,如果 END_SCN 与最新 SCN 接近,可能会导致部分 SCN 无法被 Logminer 分析,从而出现数据丢失。

这种情况难以避免,因为很难在 Logminer 层面确定是否有 SCN 被漏掉。

CloudCanal 老版本通过设置 fallBackScnStep 参数与最新的 SCN 保持一定距离,这种做法虽牺牲了一部分实时性,但换取了数据的准确性,而该方式和 只消费归档日志模式 有一定的相似性。

归档日志不会再发生变化,从而能够保证 Logminer 分析的准确性,对于不太注重实时性的业务(比如日报),这是一个可接受的方式(增量同步的好处不光只是实时性)。

CloudCanal 第 4 个优化 即增加了只消费归档日志模式(参数:archiveLogOnlyMode)。

在该模式下, 同步任务会根据 Archive 日志文件 + SCN 双位点 的方式,以 Archive 生成的时间顺序逐个消费,这样可以保证不漏扫任何一个 Archive 文件。

image.png

未来展望

优化性能

本次优化侧重于数据的准确性,优化了 SCN 乱序问题、MISSING_SCN 问题,但部分高并发场景回退 SCN 可能会导致性能下降。

所以优化性能是后续 CloudCanal Oracle 数据同步重要的一个方向。

数据订正能力

Oracle 部署形态多样,用户场景不一,数据类型复杂,在做足事前防范工作之后,事后如何补救也是非常重要的能力。

借助 CloudCanal 数据校验订正体系,后续丰富和优化 Oracle 源端数据校验和订正能力是一个重要的工作。

总结

本篇文章主要介绍 CloudCanal 对于 Oracle 源端数据同步的深度优化,希望对读者有所帮助。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 21 虚拟线程使用与最佳实现 前言 创建虚拟线程的

发表于 2024-04-26

前言

虚拟线程(Virtual Thread)(Go语言里叫协程),是Java 19引入的一种轻量级线程,在Java21 转正。

在理解虚拟线程前,我们先回顾一下线程的特点:

  • 线程是由操作系统创建并调度的资源;
  • 线程切换会耗费大量CPU时间;
  • 一个系统能同时调度的线程数量是有限的,通常在几百至几千级别。

因此,我们说线程是一种重量级资源,我们为了增加系统的吞吐量,要不断增加线程的数量,但机器的线程是昂贵的、可用线程数量也是有限的。即使我们使用了各种线程池来最大化线程的性价比,但是线程往往会在 CPU、网络或者内存资源耗尽之前成为我们应用程序的性能提升瓶颈,不能最大限度的释放硬件应该具有的性能。

虚拟线程就是为了解决以上问题,最大限度释放硬件性能,但虚拟线程最适合具有高延迟的任务,例如 I/O 操作、等待锁或线程将花费大量时间等待的任何其他操作,而需要连续计算的CPU密集型场景,并不适合虚拟线程。

以下是平台线程与虚拟线程的关系图
image.png

创建虚拟线程的方式

三种方式效果均一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码/**
* 方式1:传入Runnable实例并立刻运行
*/
@Test
public void test1() {
// 方式1:传入Runnable实例并立刻运行:
Thread vt = Thread.startVirtualThread(() -> {
System.out.println("Start virtual thread...");
});

//这样也行
Thread.ofVirtual().start(() -> {
System.out.println("Start virtual thread...");
});
}

/**
* 方式2:传入Runnable实例并后置运行
*/
@Test
public void test2() {
Thread vt = Thread.ofVirtual().unstarted(() -> {
System.out.println("Start virtual thread...");
});
vt.start();
}

/**
* 方式3:创建ThreadFactory
*/
@Test
public void test3() {
ThreadFactory tf = Thread.ofVirtual().factory();
Thread vt = tf.newThread(() -> {
System.out.println("Start virtual thread...");
});
vt.start();
}


/**
* 方式4:使用ExecutorService (但是每次submit仍然是新创建虚拟线程 java.util.concurrent.ThreadPerTaskExecutor)
*/
@Test
public void test4() {
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

executor.submit(() -> {
System.out.println("Start virtual thread...");
return true;
});
}

使用虚拟线时还需要线程池吗

答案是,不需要,使用虚拟线程时,每次都创建新的即可,以下是官方文档描述

Do not pool virtual threads

Developers will typically migrate application code to the virtual-thread-per-task ExecutorService from a traditional thread-pool based ExecutorService. A thread pool, like any resource pool, is intended to share expensive resources, but virtual threads are not expensive so there is never a need to pool them.

Developers sometimes use thread pools to limit concurrent access to limited resources. For example, if a service cannot handle more than 20 concurrent requests then making all requests to the service via tasks submitted to a thread pool of size 20 will ensure that. This idiom has become ubiquitous because the high cost of platform threads has made thread pools ubiquitous, but do not be tempted to pool virtual threads in order to limit concurrency. Instead use constructs specifically designed for that purpose, such as semaphores.

In conjunction with thread pools, developers sometimes use thread-local variables to share expensive resources among multiple tasks that share the same thread. For example, if a database connection is expensive to create then you can open it once and store it in a thread-local variable for later use by other tasks in the same thread. If you migrate code from using a thread pool to using a virtual thread per task, be wary of usages of this idiom since creating an expensive resource for every virtual thread may degrade performance significantly. Change such code to use alternative caching strategies so that expensive resources can be shared efficiently among a very large number of virtual threads.

这里有一篇更详细的例子说明:zhuanlan.zhihu.com/p/671154148

最佳实现

  1. 若IO密集型的项目中使用到CompletableFuture,可以直接将自定义线程池替换成Executors.newVirtualThreadPerTaskExecutor()
  2. 使用虚拟线程时,synchronized 改为 ReentrantLock,以减少虚拟线程被固定到平台线程
  3. 使用虚拟线程时,不需要池化
  4. 虚拟线程池适合IO密集型应用,CPU密集型还是需要用平台线程池
  5. 结合java21的结构化并发写法,代码更具可读性,StructuredTaskScope底层用的就是虚拟线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码try (var scope = new StructuredTaskScope<>()) {
// 使用fork方法派生线程来执行子任务
StructuredTaskScope.Subtask<String> future1 = scope.fork(() -> "111");
StructuredTaskScope.Subtask<Integer> future2 = scope.fork(() -> 222);

// 等待线程完成
scope.join();
// 结果的处理可能包括处理或重新抛出异常

System.out.println(future1.get());
System.out.println(future2.get());

} // close
catch (InterruptedException e) {
throw new RuntimeException(e);
}

参考文章:
mp.weixin.qq.com/s/yyApBXxpX…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…121314…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%