开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

HTTP协议中的Vary头,怎么用?

发表于 2021-11-30

本文正在参与 “网络协议必知必会”征文活动


一、Vary 解决什么问题?

浏览器 或 代理服务器 会根据 请求URL 生成 key 来建立与缓存文件的一一对应关系,这样后面相同的请求才能找到并使用缓存。

1、问题

为了应对不同设备请求(譬如电脑和手机),网站对于相同 URL 采用不同页面来响应,这会让创建缓存文件的工作陷入混乱,让手机来显示电脑版网页缓存,必然会出现错误。

2、解决方案

用 Vary 来区分,不同设备的相同URL请求,完美解决这个混乱。Vary 是在 HTTP 协议的 1.1 版本中新添加功能。详细说明,见下面 Vary: User-Agent。


3、Vary 语法
1
2
makefile复制代码Vary: *
Vary: <header-name>, <header-name>, ...
  • * :表示不使用缓存,不推荐使用。用 Cache-Control: no-store 表示不使用缓存更合适,因为这样的代码阅读更清晰明了。
  • header-name :表示 HTTP 头,可以有多个。

二、实际案例
1、Vary:Content-Encoding

问题说明:相同页面在 HTTP 传输过程中使用了不同压缩算法,如果不能正确区分,会造成页面打开错误。

解决方案:服务器返回 Vary:Content-Encoding 来建立跟压缩算法相关的缓存;客户端通过发送 Accept-Encoding:br 明确需要的算法类型,从而获得正确的缓存资源。

  • Client1 向代理服务器请求 /doc 页面,共享缓存Cache为空,向源服务器发请求,代理服务器收到页面数据并发送给Client1,同时缓存此次页面;
  • Client2 向代理服务器请求 /doc 页面,代理服务器查找无对应缓存,向源服务器发请求来响应Client2,代理服务器收到页面数据并发送给Client2,同时缓存此次页面;
  • Client3 向代理服务器请求 /doc 页面,代理服务器查找到缓存,直接把缓存发送给Client3.

HTTP-Vary


2、Vary: User-Agent
  • 问题说明:

网站为了让 电脑 和 手机 都能访问,通常会有两个版本。手机先访问网站,产生了手机版缓存,之后电脑也访问了,获得了手机版缓存,这样显示页面必然会产生错误。

  • 解决方案:

服务器返回 Vary: User-Agent 针对不同设备生成不同缓存。客户端请求时带上 User-Agent 头,明确设备类型。完美解决了,过程跟上图类似,只是创建和寻找缓存的依据,换成了 User-Agent。


三、参考文档
  • Vary - MDN
  • Caching - MDN
  • Content_negotiation - MDN

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

懒人畅听网,有声小说类目数据采集,多线程速采案例,Pytho

发表于 2021-11-30

「这是我参与11月更文挑战的第30天,活动详情查看:2021最后一次更文挑战」


theme: vue-pro

多线程在 Python 爬虫学习过程中应用落地,提速,提速,再提速。

目标站点分析

本次要抓取的目标为懒人畅听网,其中我随机选择了一个分类,有声小说频道,其余频道可使用雷同的办法抓取,增加遍历之后,可以对全站进行抓取。
懒人畅听网,有声小说类目数据采集,多线程速采案例,Python爬虫120例之23例
列表页分页规则如下
本次依旧只对列表页数据进行提取,只增加多线程模块 threading 的应用,提高采集效率。

1
2
txt复制代码http://www.lrts.me/book/category/1/recommend/1/20
http://www.lrts.me/book/category/1/recommend/2/20

提取规则模板如下:

1
txt复制代码http://www.lrts.me/book/category/1/recommend/页码/20

全站页码数,可以直接人眼读取,如果增加动态获取,提取读取一下分页处数据即可。

提取最终的数据源如下图所示,包括书名,作者,主播三部分内容。

懒人畅听网,有声小说类目数据采集,多线程速采案例,Python爬虫120例之23例

编码时间

本次案例中对于多线程部分,除共享全局变量外,增加信号量机制,即限制线程并发数量。

信号量机制的简单 Demo 如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
python复制代码
import threading
import time


def run(n, semaphore):
# 加锁
semaphore.acquire()
time.sleep(2)
print(f'正在运行线程{n}')
# 释放锁
semaphore.release()


if __name__ == '__main__':
num = 0
# 最多允许 3 个线程同时运行
semaphore = threading.BoundedSemaphore(3)
for i in range(10):
t = threading.Thread(target=run, args=(f'线程号:{i}', semaphore))
t.start()
while threading.active_count() != 1:
pass
else:
print('所有线程运行完毕')

运行代码,会发现先运行 3 个线程,再运行 3 个线程,当然同时运行的线程之间是没有先后顺序的。

懒人畅听网,有声小说类目数据采集,多线程速采案例,Python爬虫120例之23例
信号量,即使用 threading 模块的 BoundedSemaphore 类,该类可以设置允许一定数量的线程更改数据,即最多可同时运行几个线程。

代码完整案例如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
python复制代码import threading
from threading import Lock,Thread
import random,requests
from lxml import etree

def get_headers():
uas = [
"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
]
ua = random.choice(uas)
headers = {
"user-agent": ua,
"referer": "https://www.baidu.com/"
}
return headers


def run(url,semaphore):
headers = get_headers()
semaphore.acquire() #加锁
res = requests.get(url,headers=headers,timeout=5)
if res:
text = res.text
element = etree.HTML(text)
titles = element.xpath('//a[@class="book-item-name"]/text()')
authors = element.xpath('//a[@class="author"]/text()')
weakens = element.xpath('//a[@class="g-user-shutdown"]/text()')
save(url,titles,authors,weakens)


semaphore.release() #释放

def save(url,titles,authors,weakens):
data_list = zip(titles,authors,weakens)
for item in data_list:
with open("./data.csv","a+",encoding="utf-8") as f:
f.write(f"{item[0]},{item[1]},{item[2]}\n")
print(url,"该URL地址数据写入完毕")
if __name__== '__main__':
lock = Lock()
url_format = 'https://www.lrts.me/book/category/1/recommend/{}/20'
# 拼接URL,全局共享变量
urls = [url_format.format(i) for i in range(1, 1372)]
l = []
semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
for url in urls:
t = threading.Thread(target=run,args=(url,semaphore))
t.start()
while threading.active_count() !=1:
pass
else:
print('所有线程运行完毕')

代码中 threading.active_count() 部分,用于检测是否存在活跃线程,如无,程序结束。

运行代码,得到如下结果,至此第 23 例已经学习完毕。

懒人畅听网,有声小说类目数据采集,多线程速采案例,Python爬虫120例之23例

收藏时间

代码仓库地址:codechina.csdn.net/hihell/pyth…,去给个关注或者 Star 吧。

==来都来了,不发个评论,点个赞,收个藏吗?==

今天是持续写作的第 203 / 365 天。
可以关注我,点赞我、评论我、收藏我啦。


本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

(一)初识Java线程 程序、进程、线程 Thread in

发表于 2021-11-30

程序、进程、线程

程序

程序是「静态」的,存放在硬盘中的可执行文件,包括「代码指令」和「数据」

进程

进程是「运行中的程序」,是程序的一次启动和执行:操作系统从磁盘载入程序到内存,分配必要的资源,开始运行程序的指令

进程的组成:「代码段」,「数据段」,「PCB」(id,name,status,priority,物理地址,context,file,others)(描述信息,调度信息,资源信息,上下文)

线程

线程是“进程代码段”的一段「执行流程」

线程的组成:「PC」,「栈内存」(方法的调用和返回对应了栈帧的入栈和出栈),「线程基本信息」(id,name,status,priority,others)

进程和线程的区别

  • 一个进程至少有一个线程
  • 线程之间共享同一块地址空间(方法去内存,堆内存),打开的文件和其他资源,故上下文切换速度较快
  • 调度执行的基本单位:线程
  • 资源分配的基本单位:进程

Thread in Java

Thread类

image.png

public void start()用来启动一个线程,当调用start()方法后,JVM才会开启一个新的线程来执行用户定义的线程代码逻辑,在这个过程中会为相应的线程分配需要的资源

public void run()是线程代码的逻辑入口,不是给用户程序来调用的,当调用start()方法之后,只要该线程获得了CPU时间片,就会执行run()方法

创建一个空线程

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class Nov29 {
public static void main(String[] args) {
Thread emptyThread = new Thread("empty thread");
emptyThread.setPriority(5);
System.out.println("emptyThread.getId() = " + emptyThread.getId());
System.out.println("emptyThread.getName() = " + emptyThread.getName());
System.out.println("emptyThread.getPriority() = " + emptyThread.getPriority());
emptyThread.start();
System.out.println("end of emptyThread");
}
}

Thread类中,由于target对象为null,run()方法什么也没做就结束了

1
2
3
4
5
6
typescript复制代码@Override
public void run() {
if (target != null) {
target.run();
}
}

继承Thread类

  1. 继承Thread类
  2. 重写run()方法
1
2
3
4
5
6
java复制代码public class FirstThread extends Thread{
@Override
public void run(){
System.out.println(" run() function in FirstThread ");
}
}
1
2
3
4
5
6
java复制代码public class Nov29 {
public static void main(String[] args) {
Thread firstThread = new FirstThread();
firstThread.start();
}
}

实现Runnable接口

  1. 定义一个类实现Runnable接口
  2. 实现run()方法
  3. 通过Thread类的构造方法public Thread(Runnable target)传入该类

函数式接口:有且仅有一个方法的接口

1
2
3
4
5
6
java复制代码public class SecondThread implements Runnable{
@Override
public void run() {
System.out.println(" run() in SecondThread ");
}
}
1
2
3
4
5
6
7
java复制代码public class Nov29 {
public static void main(String[] args) {
SecondThread secondThread = new SecondThread();
Thread thread = new Thread(secondThread);
thread.start();
}
}

对比:

  1. 通过继承Thread类来创建线程,可以直接使用Thread类的实例的方法
  2. 实现Runnable接口来创建线程则不能直接调用Thread类的实例的方法,需要通过Thread类的静态方法public Thread currentThread()来获取当前类
  3. 在实际开发中,由于Java不允许多继承,所以一般使用实现Runnable接口的方法来创建线程
  4. 通过实现Runnable接口的方法来创建线程能更好地实现数据和逻辑的分离
  5. Thread继承了Runnable接口,其中的run()方法无返回值,若需要异步获取程序的执行结构,应该采用实现Callable的方式

数据和逻辑的分离

todo:这个不太理解,之后需要细看

匿名内部类

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class ThirdThread {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("new a thread using anonymous inner class");
System.out.println("implement method in Runnable");
}
}).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
java复制代码class ThirdThread {
public static void main(String[] args) {
new Thread() {
@Override
public void run() {
System.out.println("new a thread using anonymous inner class");
System.out.println("override method in Thread");
}
}.start();
}
}

Lambda表达式

一行代码搞定

1
2
3
4
5
java复制代码public class Nov29 {
public static void main(String[] args) {
new Thread(() -> System.out.println("new a thread using lambda")).start();
}
}

实现Callable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class FourthThread {
// 1. 新建Callable实现类
static class RunnableTask implements Callable<Long> {
// 2. 重写其中的call()方法,可以有返回值
@Override
public Long call() throws Exception {
Thread.sleep(3000);
return Long.MAX_VALUE;
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 3. 使用FutureTask实例作为Thread构造器的target入参,构造新的Thread线程实例。
RunnableTask runnableTask = new RunnableTask();
FutureTask<Long> longFutureTask = new FutureTask<>(runnableTask);
Thread thread = new Thread(longFutureTask);
// 4. 调用Thread实例的start()方法启动新线程,启动新线程的run()方法并发执行
// 其内部的执行过程为:Thread实例的run(),FutureTask实例的run()方法,Callable实现类的call()方法
thread.start();
// 5. 调用FutureTask对象的get()方法阻塞性地获得并发线程的执行结果
System.out.println("longFutureTask.get() = " + longFutureTask.get());
}
}

使用线程池

不推荐使用 Executors 创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class ThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("无返回值execute()");
}
});
System.out.println("有返回值submit()" + pool.submit(new FourthThread.RunnableTask()).get());
}
}
  • execute() ,无返回值,参数只能是Runnable实现类,不能异步控制只能等其自动停止
  • submit() ,有返回值,参数可以是有返回值的Callable实现类,可以异步控制

Java线程原理

Java线程和操作系统线程

Java虚拟机把Java线程一对一映射到操作系统线程,线程的调度交给操作系统

在JVM看来,操作系统视角下的 running 状态和 ready 状态统一称为 runnable 状态

优先级

Java线程具有优先级,优先级高的获得调度机会统计规律上大于优先级低的线程,但不绝对

生命周期

image.png

1
2
3
4
5
6
7
8
java复制代码public enum State {
NEW,
RUNNABLE, // start()方法调用后,就是runnable状态,但并不一定立刻获得CPU时间片执行run()方法
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED; // 抛出异常或run()方法执行完
}

Java线程基本操作

操作

  • sleep:使线程休眠
  • yield:让出CPU时间片,让操作系统重新调度一次
  • interrupt:将线程状态设置为中断,不会像stop()方法那样中止一个线程,线程状态设置为中断后线程自己判断是继续执行还是终止
  • join:合并两个线程,让其中一个等待另一个执行完毕再执行;或者设置一个时间,时间到了就不等了
  • daemon:守护线程(为用户进程提供服务的进程,与用户进程的关系是和JVM线程的关系一个是主动一个是被动)

调试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
sh复制代码➜  ~ jps
1217 Launcher
1218 ThreadSleep
1220 Jps
807
➜ ~ jstack 1218
2021-11-30 11:20:29
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.301-b09 mixed mode):

"Attach Listener" #12 daemon prio=9 os_prio=31 tid=0x00007fbfcc9fd800 nid=0x4503 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"DestroyJavaVM" #11 prio=5 os_prio=31 tid=0x00007fbfcc9fa000 nid=0x1003 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Thread-0" #10 prio=5 os_prio=31 tid=0x00007fbfca8eb800 nid=0x3c03 waiting on condition [0x000070000d21f000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at ThreadSleep.lambda$main$0(ThreadSleep.java:5)
at ThreadSleep$$Lambda$1/1828972342.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)

"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fbfca86b800 nid=0x4703 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #8 daemon prio=9 os_prio=31 tid=0x00007fbfcc04b000 nid=0x4803 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #7 daemon prio=9 os_prio=31 tid=0x00007fbfcc8b5000 nid=0x3703 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #6 daemon prio=9 os_prio=31 tid=0x00007fbfcc04a000 nid=0x3503 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Monitor Ctrl-Break" #5 daemon prio=5 os_prio=31 tid=0x00007fbfca85a000 nid=0x4903 runnable [0x000070000cc0d000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x0000000795713788> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x0000000795713788> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:49)

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fbfcc049000 nid=0x4a03 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fbfca81c800 nid=0x5103 in Object.wait() [0x000070000c901000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000795588ee0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
- locked <0x0000000795588ee0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:216)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fbfcb01b000 nid=0x5203 in Object.wait() [0x000070000c7fe000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000795586c00> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x0000000795586c00> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"VM Thread" os_prio=31 tid=0x00007fbfca817800 nid=0x2b03 runnable

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fbfcc00a000 nid=0x2407 runnable

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fbfcc80e800 nid=0x2003 runnable

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fbfca80e000 nid=0x2203 runnable

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fbfcc80f000 nid=0x2a03 runnable

"VM Periodic Task Thread" os_prio=31 tid=0x00007fbfcc04e000 nid=0x3b03 waiting on condition

JNI global references: 319

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Matplotlib绘制图形 Matplotlib绘制图形

发表于 2021-11-30

这是我参与11月更文挑战的第29天,活动详情查看:2021最后一次更文挑战

Matplotlib绘制图形

基本参数

设置标题 plt.title()

设置坐标轴标签 ply.xlabel() plt.ylabel()

设置坐标轴范围 plt.xlim() plt.ylim()

设置图例 plt.legend()

设置图像大小 plt.figure()

折线图

plot()

参数:

x x轴上的数值

y y轴上的数值

ls 线条风格

lw 线条宽度

c 颜色

label 标签文本

1
2
3
4
5
6
7
8
9
python复制代码import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
df=pd.DataFrame(np.random.randn(5,4),columns=['a','b','c','d'])
x=df.index
y=df['a']
plt.plot(x,y,ls='-',lw=2,c='r',label='a')
plt.legend()
plt.show()

image-20211129170739165

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
python复制代码import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['FangSong'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False
df=pd.DataFrame(np.random.randn(5,4),columns=['a','b','c','d'])
x=df.index
y=df['a']
plt.plot(x,y,ls='-',lw=2,c='r')
y=df['b']
plt.plot(x,y,ls='-',lw=2,c='b')
y=df['c']
plt.plot(x,y,ls='-',lw=2,c='y')
y=df['d']
plt.plot(x,y,ls='-',lw=2,c='g')
plt.title('折线图')
# 设置图例
plt.legend(df.columns)
plt.show()

image-20211130093414034

柱状图

bar()

基本参数

x x轴上的数值

y y轴上的数值

color 颜色

label 标签文本

1
2
3
4
5
6
7
8
9
10
python复制代码plt.rcParams['font.sans-serif'] = ['FangSong']  # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False
df=pd.DataFrame(np.random.rand(5,4),columns=['a','b','c','d'])
x=df.index
y=df['a']
plt.ylabel('x')
plt.xlabel('y')
plt.title('柱状图')
plt.bar(x,y,color='b')
plt.show()

image-20211130094904610

条形图(横柱状图)

barh()

1
2
3
4
5
6
7
8
9
10
11
12
13
python复制代码import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['FangSong'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False
df=pd.DataFrame(np.random.rand(5,4),columns=['a','b','c','d'])
x=df.index
y=df['a']
plt.ylabel('x')
plt.xlabel('y')
plt.title('柱状图')
plt.barh(x,y,color='b')
plt.show()

image-20211130095741444

散点图

scatter()

1
2
3
4
5
6
7
8
9
10
11
12
13
python复制代码import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['FangSong'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False
df=pd.DataFrame(np.random.rand(100,4),columns=['a','b','c','d'])
x=df.index
y=df['a']
plt.ylabel('x')
plt.xlabel('y')
plt.title('散点图')
plt.scatter(x,y,color='b')
plt.show()

image-20211130102821232

s 设置标记的大小

marker 设置标记的形状

1
2
3
4
5
6
7
8
9
10
11
12
13
python复制代码import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['FangSong'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False
df=pd.DataFrame(np.random.rand(100,4),columns=['a','b','c','d'])
x=df.index
y=df['a']
plt.ylabel('x')
plt.xlabel('y')
plt.title('散点图')
plt.scatter(x,y,s=5,color='b',marker=6)
plt.show()

image-20211130102756269

marker 参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
typescript复制代码============================== ====== =========================================
marker symbol description
============================== ====== =========================================
``"."`` |m00| point
``","`` |m01| pixel
``"o"`` |m02| circle
``"v"`` |m03| triangle_down
``"^"`` |m04| triangle_up
``"<"`` |m05| triangle_left
``">"`` |m06| triangle_right
``"1"`` |m07| tri_down
``"2"`` |m08| tri_up
``"3"`` |m09| tri_left
``"4"`` |m10| tri_right
``"8"`` |m11| octagon
``"s"`` |m12| square
``"p"`` |m13| pentagon
``"P"`` |m23| plus (filled)
``"*"`` |m14| star
``"h"`` |m15| hexagon1
``"H"`` |m16| hexagon2
``"+"`` |m17| plus
``"x"`` |m18| x
``"X"`` |m24| x (filled)
``"D"`` |m19| diamond
``"d"`` |m20| thin_diamond
``"|"`` |m21| vline
``"_"`` |m22| hline
``0`` (``TICKLEFT``) |m25| tickleft
``1`` (``TICKRIGHT``) |m26| tickright
``2`` (``TICKUP``) |m27| tickup
``3`` (``TICKDOWN``) |m28| tickdown
``4`` (``CARETLEFT``) |m29| caretleft
``5`` (``CARETRIGHT``) |m30| caretright
``6`` (``CARETUP``) |m31| caretup
``7`` (``CARETDOWN``) |m32| caretdown
``8`` (``CARETLEFTBASE``) |m33| caretleft (centered at base)
``9`` (``CARETRIGHTBASE``) |m34| caretright (centered at base)
``10`` (``CARETUPBASE``) |m35| caretup (centered at base)
``11`` (``CARETDOWNBASE``) |m36| caretdown (centered at base)

多图

subplot(1,2,1)

第一个参数表示 几行分布

第二个参数表示 一行记个图

第三个参数表示 图显示的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python复制代码import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['FangSong'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False
df=pd.DataFrame(np.random.rand(100,4),columns=['a','b','c','d'])
x=df.index
y=df['a']
plt.subplot(2,3,1)
plt.scatter(x,y,color='b')
plt.subplot(2,3,2)
y=df['b']
plt.scatter(x,y,s=5,color='b',marker=6)
plt.subplot(2,3,6)
y=df['b']
plt.scatter(x,y,s=5,color='b',marker=6)
plt.show()

image-20211130103350507

subplots_adjust设置图像高宽

subplots 减少区域分块后 设置子图的title 和xy轴

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
python复制代码import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['FangSong'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False
df=pd.DataFrame(np.random.rand(100,4),columns=['a','b','c','d'])
x=df.index
y=df['a']
fig,ax=plt.subplots(2,3)
ax1=ax[0][0]
ax1.set_title('1')
ax1.set_ylabel('y')
ax1.set_xlabel('x')
ax1.scatter(x,y,color='b')
ax2=ax[0][2]
ax2.set_title('2')
ax2.scatter(x,y,color='r')
ax3=ax[1][1]
ax3.set_title('3')
ax3.scatter(x,y,color='g')
plt.subplots_adjust(wspace=2,hspace=2)
plt.show()

image-20211130105509309

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

EasyC++40,重载模板

发表于 2021-11-30

大家好,我是梁唐。

这是EasyC++系列的第40篇,来聊聊重载模板。

想要追求更好阅读体验的同学,可以点击访问github仓库:EasyLeetCode。

重载模板

函数模板可以使得同一个函数对不同类型使用,非常地方便。但有的时候类型不同,只是通过模板是没办法解决的, 可能逻辑上也会有所区别,这个时候只是使用模板是无法解决的。

为了满足这种需求,我们可以像是重载函数那样重载模板。和常规的函数一样,重载的模板的函数特征,也就是入参的数量和类型必须有所不同。

举个例子,比如我们之前定义了一个函数模板用来交换两个变量的值。如果我们要交换的不只是变量,而是两个数组,就必须要修改逻辑了。

1
2
3
4
5
C++复制代码template <typename T>
void Swap(T &a, T &b);

template <typename T>
void Swap(T *a, T *b, int n);

可以看到我们额外传入了一个int n,它表示数组的长度。另外,我们入参的类型也发生了变化,不再是模板类型T的引用,而是指针了。因为我们要接收的是一个数组,而数组在函数传递当中都是以指针的形式进行的。所以这里要写成指针,当然也可以写成这样:T a[],两种形式本质上没有区别。

所以我们实现的话会是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
C++复制代码template <typename T>
void Swap(T &a, T &b) {
T temp = a;
a = b;
b = temp;
}

template <typename T>
void Swap(T *a, T *b, int n) {
for (int i = 0; i < n; i++) {
Swap(a[i], b[i]);
}
}

问题

到这里,相信大家也能看出一点问题。

假设我们有这样一个模板函数:

1
2
C++复制代码template <typename T>
void Swap(T a, T b);

虽然理论上类型T是万能类型,什么类型都可以接受。但我们操作的时候会有很多问题,比如我们执行a = b,对于数组类型就会报错。

再比如我们执行a > b,很多类型也无法进行比较大小。再比如进行算术运算等等,很多类型比如指针、数组或者结构体也没办法进行算术运算。

总之模板的功能是很局限的,有的时候只能处理某些类型,很难通用覆盖所有情况。当然有的时候也是有一些其他办法绕开的,比如结构体也可以重载比较运算符,也可以重载一些算术运算符等等。

除此之外,C++当中也提供了另外的解决方案。由于篇幅的限制,我们下次再说~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

LockSupport的park等待的底层实现

发表于 2021-11-30

从上一篇文章中的JDK的延迟队列中,最终是通过LockSupport.park实现线程的等待,那么底层是如何实现等待和超时等待的

LockSupport的park和unpark的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
arduino复制代码public static void park() {
UNSAFE.park(false, 0L);
}

public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}

public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

从上面可以看到实际LockSupport.park是通过Unsafe的的park方法实现,从下面的方法可以看出这个是一个native方法.

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码/**
* Blocks current thread, returning when a balancing
* {@code unpark} occurs, or a balancing {@code unpark} has
* already occurred, or the thread is interrupted, or, if not
* absolute and time is not zero, the given time nanoseconds have
* elapsed, or if absolute, the given deadline in milliseconds
* since Epoch has passed, or spuriously (i.e., returning for no
* "reason"). Note: This operation is in the Unsafe class only
* because {@code unpark} is, so it would be strange to place it
* elsewhere.
*/
public native void park(boolean isAbsolute, long time);

JVM的Unsafe的park方法

从下面JDK中代码中可以thread的Parker的对象的park方法进行一段时间的等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scss复制代码UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
EventThreadPark event;

JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
if (event.should_commit()) {
const oop obj = thread->current_park_blocker();
if (time == 0) {
post_thread_park_event(&event, obj, min_jlong, min_jlong);
} else {
if (isAbsolute != 0) {
post_thread_park_event(&event, obj, min_jlong, time);
} else {
post_thread_park_event(&event, obj, time, min_jlong);
}
}
}
HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END

Thread.hpp的文件中内部定义的Park对象

1
2
3
4
arduino复制代码  private:
Parker _parker;
public:
Parker* parker() { return &_parker; }

下面是Os_posix.cpp中是Linux中实现的Park的park的实现方式

  1. 首先将_counter的变量通过CAS设置为0,返回就旧的值,如果之前是大于0,则说明是允许访问,不用阻塞,直接返回。
  2. 获取当前线程。
  3. 判断线程是否是中断中,如果是,则直接返回,(也就是说线程处于中断状态下会忽略park,不会阻塞等待)
  4. 判断如果传入的time参数小于0 或者 是绝对时间并且time是0,则直接返回,(上面的Unsafe调用park传入的参数是 false、0,所以不满足这种情况)
  5. 如果time大于0,则转换成绝对时间。
  6. 创建ThreadBlockInVM对象,并且调用pthread_mutex_trylock获取线程互斥锁,如果没有获取到锁,则直接返回,
  7. 判断_counter变量是否大于0,如果是,则重置_counter为0,释放线程锁,直接返回。
  8. 调用 OrderAccess::fence(); 加入内存屏障,禁止指令重排序,确保加锁和释放锁的指令的顺序。
  9. 创建OSThreadWaitState对象,
  10. 判断time是否大于0,如果是0,则调用pthread_cond_wait进行等待,如果不是0,然后调用pthread_cond_timedwait进行时间参数为absTime的等待,
  11. 调用pthread_mutex_unlock进行释放_mutex锁,
  12. 再次调用OrderAccess::fence()禁止指令重排序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
scss复制代码 // Parker::park decrements count if > 0, else does a condvar wait.  Unpark
// sets count to 1 and signals condvar. Only one thread ever waits
// on the condvar. Contention seen when trying to park implies that someone
// is unparking you, so don't wait. And spurious returns are fine, so there
// is no need to track notifications.

void Parker::park(bool isAbsolute, jlong time) {

// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(&_counter, 0) > 0) return;

JavaThread *jt = JavaThread::current();

// Optional optimization -- avoid state transitions if there's
// an interrupt pending.
if (jt->is_interrupted(false)) {
return;
}

// Next, demultiplex/decode time arguments
struct timespec absTime;
if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
return;
}
if (time > 0) {
to_abstime(&absTime, time, isAbsolute, false);
}

// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);

// Can't access interrupt state now that we are _thread_blocked. If we've
// been interrupted since we checked above then _counter will be > 0.

// Don't wait if cannot get lock since interference arises from
// unparking.
if (pthread_mutex_trylock(_mutex) != 0) {
return;
}

int status;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait(&_cond[_cur_index], _mutex);
assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
status, "cond_wait");
}else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
assert_status(status == 0 || status == ETIMEDOUT,
status, "cond_timedwait");
}
_cur_index = -1;

_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();

Linux操作系统是如何实现pthread_cond_timedwait进行时间等待的

pthread_cond_timedwait函数位于glibc中pthread_cond_wait.c, 可以看到是调用
__pthread_cond_wait_common实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
arduino复制代码/* See __pthread_cond_wait_common.  */
int
___pthread_cond_timedwait64 (pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct __timespec64 *abstime)
{
/* Check parameter validity. This should also tell the compiler that
it can assume that abstime is not NULL. */
if (! valid_nanoseconds (abstime->tv_nsec))
return EINVAL;

/* Relaxed MO is suffice because clock ID bit is only modified
in condition creation. */
unsigned int flags = atomic_load_relaxed (&cond->__data.__wrefs);
clockid_t clockid = (flags & __PTHREAD_COND_CLOCK_MONOTONIC_MASK)
? CLOCK_MONOTONIC : CLOCK_REALTIME;
return __pthread_cond_wait_common (cond, mutex, clockid, abstime);
}

下面__pthread_cond_wait_common是实现通过__futex_abstimed_wait_cancelable64实现时间等待

1
2
3
4
5
6
7
8
9
arduino复制代码static __always_inline int
__pthread_cond_wait_common (pthread_cond_t *cond, pthread_mutex_t *mutex,
clockid_t clockid, const struct __timespec64 *abstime)
{
''省略''`
err = __futex_abstimed_wait_cancelable64 (
cond->__data.__g_signals + g, 0, clockid, abstime, private);
''省略''`
}

__futex_abstimed_wait_cancelable64是调用__futex_abstimed_wait_common

1
2
3
4
5
6
7
8
9
arduino复制代码int
__futex_abstimed_wait_cancelable64 (unsigned int* futex_word,
unsigned int expected, clockid_t clockid,
const struct __timespec64* abstime,
int private)
{
return __futex_abstimed_wait_common (futex_word, expected, clockid,
abstime, private, true);
}

__futex_abstimed_wait_common下面则是通过判断平台是64位或者32位,调用__futex_abstimed_wait_common64或者__futex_abstimed_wait_common32

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
arduino复制代码static int
__futex_abstimed_wait_common (unsigned int* futex_word,
unsigned int expected, clockid_t clockid,
const struct __timespec64* abstime,
int private, bool cancel)
{
int err;
unsigned int clockbit;

/* Work around the fact that the kernel rejects negative timeout values
despite them being valid. */
if (__glibc_unlikely ((abstime != NULL) && (abstime->tv_sec < 0)))
return ETIMEDOUT;

if (! lll_futex_supported_clockid (clockid))
return EINVAL;

clockbit = (clockid == CLOCK_REALTIME) ? FUTEX_CLOCK_REALTIME : 0;
int op = __lll_private_flag (FUTEX_WAIT_BITSET | clockbit, private);

#ifdef __ASSUME_TIME64_SYSCALLS
err = __futex_abstimed_wait_common64 (futex_word, expected, op, abstime,
private, cancel);
#else
bool need_time64 = abstime != NULL && !in_time_t_range (abstime->tv_sec);
if (need_time64)
{
err = __futex_abstimed_wait_common64 (futex_word, expected, op, abstime,
private, cancel);
if (err == -ENOSYS)
err = -EOVERFLOW;
}
else
err = __futex_abstimed_wait_common32 (futex_word, expected, op, abstime,
private, cancel);
#endif

switch (err)
{
case 0:
case -EAGAIN:
case -EINTR:
case -ETIMEDOUT:
case -EINVAL:
case -EOVERFLOW: /* Passed absolute timeout uses 64 bit time_t type, but
underlying kernel does not support 64 bit time_t futex
syscalls. */
return -err;

case -EFAULT: /* Must have been caused by a glibc or application bug. */
case -ENOSYS: /* Must have been caused by a glibc bug. */
/* No other errors are documented at this time. */
default:
futex_fatal_error ();
}
}

__futex_abstimed_wait_common64是调用INTERNAL_SYSCALL_CANCEL宏定义实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
arduino复制代码static int
__futex_abstimed_wait_common64 (unsigned int* futex_word,
unsigned int expected, int op,
const struct __timespec64* abstime,
int private, bool cancel)
{
if (cancel)
return INTERNAL_SYSCALL_CANCEL (futex_time64, futex_word, op, expected,
abstime, NULL /* Unused. */,
FUTEX_BITSET_MATCH_ANY);
else
return INTERNAL_SYSCALL_CALL (futex_time64, futex_word, op, expected,
abstime, NULL /* Ununsed. */,
FUTEX_BITSET_MATCH_ANY);
}

系统调用的的宏定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ini复制代码/* Issue a syscall defined by syscall number plus any other argument
required. Any error will be returned unmodified (including errno). */
#define INTERNAL_SYSCALL_CANCEL(...) \
({ \
long int sc_ret; \
if (NO_SYSCALL_CANCEL_CHECKING) \
sc_ret = INTERNAL_SYSCALL_CALL (__VA_ARGS__); \
else \
{ \
int sc_cancel_oldtype = LIBC_CANCEL_ASYNC (); \
sc_ret = INTERNAL_SYSCALL_CALL (__VA_ARGS__); \
LIBC_CANCEL_RESET (sc_cancel_oldtype); \
} \
sc_ret; \
})

总结

主要对LockSupport的park等待实现的底层实现的浅析,针对于Linux的系统调用还没有找到源码,后续会继续跟踪,希望有读者知道的满帆可以告知下,谢谢。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何在 Kubernetes 集群中玩转 Fluid + J

发表于 2021-11-30

作者简介:吕冬冬,云知声超算平台架构师, 负责大规模分布式机器学习平台架构设计与功能研发,负责深度学习算法应用的优化与 AI 模型加速。研究领域包括高性能计算、分布式文件存储、分布式缓存等。

朱唯唯,Juicedata 全栈工程师,负责 JuiceFS CSI Driver 的开发和维护,负责 JuiceFS 在云原生领域的发展。

云知声 Atlas 团队在 2021 年初开始接触并跟进 JuiceFS 存储,并且在早期已经积累了丰富的 Fluid 使用经验。近期,云知声团队与 Juicedata 团队合作开发了 Fluid JuiceFS 加速引擎,使用户能够更好地在 Kubernetes 环境中使用 JuiceFS 缓存管理能力。本篇文章讲解如何在 Kubernetes 集群中玩转 Fluid + JuiceFS。

背景介绍

Fluid 简介

CNCF Fluid 是一个开源的 Kubernetes 原生的分布式数据集编排和加速引擎,主要服务于云原生场景下的数据密集型应用,例如大数据应用、AI 应用等,关于 Fluid 更多信息可以参考地址。

Fluid 不是全存储加速和管理,而是应用使用的数据集加速和管理。Fluid 提供了一种更加云原生的方式对数据集进行管理,通过缓存加速引擎实现将底层存储系统的数据 cache 在计算节点的内存或者硬盘上,解决了计算与存储分离架构中由于数据传输带宽限制以及底层存储带宽与 IOPS 能力限制等问题,导致的 IO 效率不高等问题。Fluid 提供缓存数据调度能力,缓存被纳入 kubernetes 扩展资源,kubernetes 在进行任务的调度的时候,能够参考缓存进行调度策略的分配。

Fluid 有 2个重要的概念:Dataset 与 Runtime

  • Dataset: 数据集是逻辑上相关的一组数据的集合,一致的文件特性,会被同一运算引擎使用。
  • Runtime: 实现数据集安全性,版本管理和数据加速等能力的执行引擎的接口,定义了一系列生命周期的方法。

Fluid 的 Runtime 定义了标准化的接口,Cache Runtime Engine 可以对接多种缓存引擎,提供了用户更灵活的选择,用户能够针对不同的场景与需求,充分利用缓存引擎加速相应的场景应用。

JuiceFS 简介

JuiceFS 是一个面向云环境设计的高性能开源分布式文件系统,完全兼容 POSIX、HDFS、S3 接口,适用于大数据、AI 模型训练、Kubernetes 共享存储、海量数据归档管理等场景。

使用 JuiceFS 存储数据,数据本身会被持久化在对象存储(例如,Amazon S3),而数据所对应的元数据可以根据场景需求被持久化在 Redis、MySQL、TiKV 等多种数据库引擎中。JuiceFS 客户端具有数据缓存能力,当通过 JuiceFS 客户端读取数据时,这些数据将会智能地缓存到应用配置的本地缓存路径(可以是内存,也可以是磁盘),同时元数据也会缓存到客户端节点本地内存中。

对于 AI 模型训练场景来说,第一个 epoch 完成之后后续的计算都可以直接从缓存中获取训练数据,极大地提升了训练效率。JuiceFS 也具有预读、并发读取数据的能力,在 AI 训练场景能够保证每个 mini-batch 的生成效率,提前准备好数据。数据预热能够提前将公有云上的数据换到到本地节点,对于 AI 训练场景能够保证申请完 GPU 资源后,即有预热的数据进行运算,为宝贵的 GPU 使用节省了时间。

为什么使用 JuiceFSRuntime

云知声 Atlas 超算平台作为底层基础架构,支持着公司在 AI 各个领域的模型训练与推理服务的开展。云知声很早就开始布局建设业界领先的 GPU/CPU 异构 Atlas 计算平台和分布式文件存储系统,该计算集群可为 AI 计算提供高性能计算和海量数据的存储访问能力。云知声 Atlas 团队在 2021 年初开始接触并跟进 JuiceFS 存储,进行了一系列 POC 测试,在数据可靠性与业务场景的适配,都满足我们目前的需求。

在训练场景我们充分利用 JuiceFS 客户端的缓存能力,为 AI 模型训练做数据加速,但是在使用过程中发现了一些问题:

  • 训练 Pod 通过 hostpath 挂载,需要在每个计算节点挂载 JuiceFS 客户端,挂载需要管理员操作,挂载参数固定,不够灵活。
  • 用户无法对计算节点客户端的缓存管理,缓存无法手动清理与扩容。
  • 缓存数据集无法像 Kubernetes 自定义资源一样能够被 kubernetes 进行调度。

由于我们在生产环境已经积累了一定的 Fluid 使用经验,所以我们与 Juicedata 团队合作设计并开发了 JuiceFSRuntime,将 Fluid 对数据编排与管理能力和 JuiceFS 的缓存能力结合起来。

什么是 Fluid + JuiceFS(JuiceFSRuntime)

JuiceFSRuntime 是 Fluid 自定义的一种 Runtime,其中可以指定 JuiceFS 的 worker、fuse 镜像以及相应的缓存参数。其构建方式与 Fluid 其他 Runtime 一致,即通过 CRD 的方式构建,JuiceFSRuntime Controller 监听 JuiceFSRuntime 资源,实现缓存 Pod 的管理。

JuiceFSRuntime 支持数据亲和性调度(nodeAffinity),选择合适的缓存节点,支持 Fuse pod 懒启动,支持用户以 POSIX 接口访问数据,目前只支持一个挂载点。

其架构图如上图所示,JuiceFSRuntime 由 Fuse Pod 与 Worker Pod 组成。Worker pod 主要实现缓存的管理,如 Runtime 退出时的缓存清理;Fuse pod 主要负责 JuiceFS 客户端的参数设置及挂载。

如何使用 JuiceFSRunime

下面来看看如何使用 JuiceFSRuntime 进行缓存加速。

前期准备

要使用 JuiceFSRuntime 首先需要准备元数据引擎和对象存储。

构建元数据引擎

用户可以很容易的在云计算平台购买到各种配置的云 Redis 数据库,如果是评估测试使用可以使用 Docker 快速的在服务器上运行一个 Redis 数据库实例:

1
2
3
4
5
lua复制代码$ sudo docker run -d --name redis \
-v redis-data:/data \
-p 6379:6379 \
--restart unless-stopped \
redis redis-server --appendonly yes

准备对象存储

和 Redis 数据库一样,几乎所有的公有云计算平台都提供对象存储服务。因为 JuiceFS 支持几乎所有主流平台的对象存储服务,用户可以结合自己的情况进行部署。

这里是评估测试应该使用的是 Dokcer 运行的 minio 实例:

1
2
3
4
5
6
shell复制代码$ $ sudo docker run -d --name minio \
-p 9000:9000 \
-p 9900:9900 \
-v $PWD/minio-data:/data \
--restart unless-stopped \
minio/minio server /data --console-address ":9900"

对象存储初始的 Access Key 和 Secret Key 均为 minioadmin。

下载并安装 Fluid

按照文档步骤安装 Fluid,在 Fluid 的安装 chart values.yaml 中将 runtime.juicefs.enable 设置为 true,并安装 Fluid。确保 Fluid 集群正常运行:

1
2
3
4
5
6
7
8
sql复制代码kubectl get po -n fluid-system
NAME READY STATUS RESTARTS AGE
csi-nodeplugin-fluid-ctc4l 2/2 Running 0 113s
csi-nodeplugin-fluid-k7cqt 2/2 Running 0 113s
csi-nodeplugin-fluid-x9dfd 2/2 Running 0 113s
dataset-controller-57ddd56b54-9vd86 1/1 Running 0 113s
fluid-webhook-84467465f8-t65mr 1/1 Running 0 113s
juicefsruntime-controller-56df96b75f-qzq8x 1/1 Running 0 113s

确保 juicefsruntime-controller、dataset-controller、fluid-webhook 的 pod 以及若干 csi-nodeplugin pod 正常运行。

创建 Dataset

在使用 JuiceFS 之前,需要提供元数据服务(如 redis)及对象存储服务(如 minio)的参数,并创建对应的 secret:

1
2
3
4
ini复制代码kubectl create secret generic jfs-secret \
--from-literal=metaurl=redis://$IP:6379/1 \ # redis 的地址 IP 为 redis 所在节点的 IP
--from-literal=access-key=minioadmin \ # 对象存储的 ak
--from-literal=secret-key=minioadmin #对象存储的 sk

创建 Dataset yaml 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
yaml复制代码cat<<EOF >dataset.yaml
apiVersion: data.fluid.io/v1alpha1
kind: Dataset
metadata:
name: jfsdemo
spec:
mounts:
- name: minio
mountPoint: "juicefs:///demo"
options:
bucket: "<bucket>"
storage: "minio"
encryptOptions:
- name: metaurl
valueFrom:
secretKeyRef:
name: jfs-secret
key: metaurl
- name: access-key
valueFrom:
secretKeyRef:
name: jfs-secret
key: access-key
- name: secret-key
valueFrom:
secretKeyRef:
name: jfs-secret
key: secret-key
EOF

由于 JuiceFS 采用的是本地缓存,对应的 Dataset 只支持一个 mount,且 JuiceFS 没有 UFS,mountpoint 中可以指定需要挂载的子目录 (“juicefs:///“ 为根路径),会作为根目录挂载到容器内。

创建 Dataset 并查看 Dataset 状态

1
2
3
4
5
6
swift复制代码$ kubectl create -f dataset.yaml
dataset.data.fluid.io/jfsdemo created

$ kubectl get dataset jfsdemo
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE AGE
jfsdemo NotBound 44s

如上所示,status 中的 phase 属性值为 NotBound,这意味着该 Dataset 资源对象目前还未与任何 JuiceFSRuntime 资源对象绑定,接下来,我们将创建一个 JuiceFSRuntime 资源对象。

创建 JuiceFSRuntime

创建 JuiceFSRuntime 的 yaml 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml复制代码$ cat<<EOF >runtime.yaml
apiVersion: data.fluid.io/v1alpha1
kind: JuiceFSRuntime
metadata:
name: jfsdemo
spec:
replicas: 1
tieredstore:
levels:
- mediumtype: SSD
path: /cache
quota: 40960 # JuiceFS 中 quota 的最小单位是 MiB,所以这里是 40GiB
low: "0.1"
EOF

创建并查看 JuiceFSRuntime

1
2
3
4
5
6
ruby复制代码$ $ kubectl create -f runtime.yaml
juicefsruntime.data.fluid.io/jfsdemo created

$ kubectl get juicefsruntime
NAME WORKER PHASE FUSE PHASE AGE
jfsdemo Ready Ready 72s

查看 JuiceFS 相关组件 Pod 的状态

1
2
sql复制代码$$ kubectl get po |grep jfs
jfsdemo-worker-mjplw 1/1 Running 0 4m2s

JuiceFSRuntime 没有 master 组件,而 Fuse 组件实现了懒启动,会在 pod 使用时再创建。

创建缓存加速作业

创建需要加速的应用,其中 Pod 使用上面创建的 Dataset 的方式为指定同名的 PVC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
yaml复制代码$ cat<<EOF >sample.yaml
apiVersion: v1
kind: Pod
metadata:
name: demo-app
spec:
containers:
- name: demo
image: nginx
volumeMounts:
- mountPath: /data
name: demo
volumes:
- name: demo
persistentVolumeClaim:
claimName: jfsdemo
EOF

创建 Pod

1
2
bash复制代码$ kubectl create -f sample.yaml
pod/demo-app created

查看 pod 状态

1
2
3
4
sql复制代码$ kubectl get po |grep demo
demo-app 1/1 Running 0 31s
jfsdemo-fuse-fx7np 1/1 Running 0 31s
jfsdemo-worker-mjplw 1/1 Running 0 10m

可以看到 pod 已经创建成功,同时 JuiceFS 的 Fuse 组件也启动成功。

进入 Pod 执行 df -hT 查看缓存目录是否挂载:

1
2
3
4
5
6
bash复制代码$ kubectl exec -it demo-app  bash -- df -h
Filesystem Size Used Avail Use% Mounted on
overlay 20G 14G 5.9G 71% /
tmpfs 64M 0 64M 0% /dev
tmpfs 3.9G 0 3.9G 0% /sys/fs/cgroup
JuiceFS:minio 1.0P 7.9M 1.0P 1% /data

可以看到这时候缓存目录已经成功挂载了。

接下来,我们在 demo-app 这个 pod 中测试一下写功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ruby复制代码$ kubectl exec -it demo-app bash
[root@demo-app /]# df
Filesystem 1K-blocks Used Available Use% Mounted on
overlay 20751360 14585944 6165416 71% /
tmpfs 65536 0 65536 0% /dev
tmpfs 3995028 0 3995028 0% /sys/fs/cgroup
JuiceFS:minio 1099511627776 8000 1099511619776 1% /data
/dev/sda2 20751360 14585944 6165416 71% /etc/hosts
shm 65536 0 65536 0% /dev/shm
tmpfs 3995028 12 3995016 1% /run/secrets/kubernetes.io/serviceaccount
tmpfs 3995028 0 3995028 0% /proc/acpi
tmpfs 3995028 0 3995028 0% /proc/scsi
tmpfs 3995028 0 3995028 0% /sys/firmware
[root@demo-app /]#
[root@demo-app /]# cd /data
[root@demo-app data]# echo "hello fluid" > hello.txt
[root@demo-app data]# cat hello.txt
hello fluid

最后再来看看缓存功能,在 demo-app 这个 pod 中的挂载目录 /data 中创建一个 1G 的文件,然后再 cp 出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
sql复制代码$ kubectl exec -it demo-app  bash
root@demo-app:~# dd if=/dev/zero of=/data/test.txt count=1024 bs=1M
1024+0 records in
1024+0 records out
1073741824 bytes (1.1 GB, 1.0 GiB) copied, 6.55431 s, 164 MB/s
root@demo-app:~# time cp /data/test.txt ./test.txt
real 0m5.014s
user 0m0.003s
sys 0m0.702s
root@demo-app:~# time cp /data/test.txt ./test.txt
real 0m0.602s
user 0m0.004s
sys 0m0.584s

从执行结果来看,第一次 cp 用了 5s,此时建立缓存,第二次 cp 的时候由于缓存已经存在,只用了 0.6s。JuiceFS 所提供的强大的缓存能力,使得只要访问某个文件一次,该文件就会被缓存在本地缓存路径中中,所有接下来的重复访问都是从 JuiceFS 中直接获取数据。

后续规划

目前 JuiceFSRuntime 支持的功能并不多,未来我们会继续完善,比如 Fuse Pod 以 Nonroot 的方式运行,以及 Dataload 数据预热功能等。

推荐阅读:
知乎 x JuiceFS:利用 JuiceFS 给 Flink 容器启动加速

如有帮助的话欢迎关注我们 Juicedata/JuiceFS 哟! (0ᴗ0✿)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

十张图带你了解负载均衡 前言

发表于 2021-11-30

「这是我参与11月更文挑战的第8天,活动详情查看:2021最后一次更文挑战」

hi ,大家好,我是三天打鱼,两天晒网的小六六

前言

文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820…
种一棵树最好的时间是十年前,其次是现在

叨絮

相信很多小伙伴的公司都是服务治理,自动化运维了吧,那么我们很多东西都变成我们自己去设置了,比如自己创建一个域名,绑定他的代理机器,它的web负载均衡这些东西。所以今天跟大家一起来看看负载均衡

如果不愿意看文字的话,图也是很清晰的哦

你怎么看负载均衡

负载均衡是高可用网络基础架构的关键组件,通常用于将工作负载分布到多个服务器来提高网站、应用、数据库或其他服务的性能和可靠性。

  • 相信很多小伙伴,一听到负载均衡四个字,第一个想到就是我们所说的Nginx吧,因为这个是离我们开发比较近的一个组件了。
  • 第二个呢?就是我们Springcloud的组件中自带了负载均衡(ribbon),这个也是离我们开发比较近的
  • 第三个?就是其实我们k8s里面的服务也是能做负载均衡的,目前主流容器使用方式
  • 第四个就是我们DNS之后的一个负载均衡了SLB(这个之前运费负责的多点)

为啥要负责均衡呢?

大家看下面的图,当我们访问一个网站的时候,如果突然的流量增加,就会导致我们的服务不可用(单点故障)

image.png

image.png

一个没有负载均衡的 web 架构类似下面这样:

image.png

image.png

所以为了解决单点问题我们需要负载均衡(也是我们高可用,高性能,高并发的基石)

有负载均衡的架构

image.png

image.png

web架构

image.png

image.png

聊聊SLB

image.png

image.png

相信很多公司都有用到把,

负载均衡的组成

  • 负载均衡实例 (Instances):一个负载均衡实例是一个运行的负载均衡服务,用来接收流量并将其转发给后端服务器,至少添加一个监听(Listeners)和两台ECS实例。
  • 监听(Listeners):监听客户端的请求并将其转发给后端服务器,监听也会对后端服务器进行健康检查。
  • 后端服务器(Backend Servers):后端服务器是一组接收前端请求的ECS实例,可以单独添加ECS实例到后端服务器池;

健康检查

image.png

image.png

Nginx负载均衡

如果你们还没用上容器,那么肯定是用Nginx来做负载均衡的了。

image.png

image.png

至于搭建这边就不讲了,大家百度下,肯定是能知道的。

SpringCloud负载均衡ribbon

springcloud中提供了一系列的组件,我们使用ribbon实现负载均衡,eureka中也内置了ribbon,所以,引入了eureka其实就可以直接使用ribbon了

ribbon中的负载均衡用在客户端,或者说成消费端也可以,在消费者访问提供者时,就会进行负载均衡算法,然后找到一个最优的提供者提供服务

image.png

image.png

K8s 服务治理的负载均衡(Ingress)

k8S的负载均衡模式还挺多的,这边我就说一个吧Ingress

Ingress 是 k8s 的一种资源对象, 该对象允许外部访问 k8s 服务, 通过创建规则集合来配置访问权限,这些规则定义了哪些入站连接可以访问哪些服务; Ingress 仅支持 HTTP 和 HTTPS 协议; ingress 可配置用于提供外部可访问的服务 url、负载均衡流量、SSL终端和提供虚拟主机名配置。

ingress 的工作流程如下;

image.png

image.png

大概的访问路径如下:

用户访问 –> LB –> ingress-nginx-service –> ingressController-ingress-nginx-pod –> ingress字段中调用的后端pod

面试题 nginx四层和七层负载均衡的区别

简单理解四层和七层负载均衡

  • 所谓四层就是基于IP+端口的负载均衡;七层就是基于URL等应用层信息的负载均衡;同理,还有基于MAC地址的二层负载均衡和基于IP地址的三层负载均衡。 换句换说,二层负载均衡会通过一个虚拟MAC地址接收请求,然后再分配到真实的MAC地址;三层负载均衡会通过一个虚拟IP地址接收请求,然后再分配到真实的IP地址;四层通过虚拟IP+端口接收请求,然后再分配到真实的服务器;七层通过虚拟的URL或主机名接收请求,然后再分配到真实的服务器。
  • 所谓的四到七层负载均衡,就是在对后台的服务器进行负载均衡时,依据四层的信息或七层的信息来决定怎么样转发流量。 比如四层的负载均衡,就是通过发布三层的IP地址(VIP),然后加四层的端口号,来决定哪些流量需要做负载均衡,对需要处理的流量进行NAT处理,转发至后台服务器,并记录下这个TCP或者UDP的流量是由哪台服务器处理的,后续这个连接的所有流量都同样转发到同一台服务器处理。七层的负载均衡,就是在四层的基础上(没有四层是绝对不可能有七层的),再考虑应用层的特征,比如同一个Web服务器的负载均衡,除了根据VIP加80端口辨别是否需要处理的流量,还可根据七层的URL、浏览器类别、语言来决定是否要进行负载均衡。举个例子,如果你的Web服务器分成两组,一组是中文语言的,一组是英文语言的,那么七层负载均衡就可以当用户来访问你的域名时,自动辨别用户语言,然后选择对应的语言服务器组进行负载均衡处理。
  • 负载均衡器通常称为四层交换机或七层交换机。四层交换机主要分析IP层及TCP/UDP层,实现四层流量负载均衡。七层交换机除了支持四层负载均衡以外,还有分析应用层的信息,如HTTP协议URI或Cookie信息。

结束

作为一个互联网工具人,你必须知道整一个用户流量的流转过程,这样你才能对流量的每个环节去掌握,也就是先要把一个东西串起来,然后一个个去拆解里面的细节。

总结下上面几个说到的负载均衡的顺序吧

首先流量DNS解析之后肯定我们一般用的是SLB这种,然后如果是物理机部署(Nginx负载均衡+SpringCloud负载均衡) 如果是容器的话,那么就是k8s的负载均衡了,主流的就是这样情况了。

一个系统的完整架构图

image.png

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Cloud Gateway源码解析-11-扩展

发表于 2021-11-30

由来

在Spring Cloud Gateway源码解析-10-自定义Predicate实现黑名单中我们自定义了Predicate来实现黑名单,但发现每次更改黑名单规则都要重启项目来实现,因此需要将路由信息存储在外部数据源中,定时刷新SCG内存中的路由信息。

思路

在这里插入图片描述

在Spring Cloud Gateway源码解析-03-RouteDefinitionLocator、RouteLocator解析中我们已经介绍过RouteDefinitionRepository,该接口在SCG中只有一个实现InMemoryRouteDefinitionRepository,并且该接口继承了RouteDefinitionWriter,RouteDefinitionWriter中定义了save、delete方法,通过方法名称可以知道是用来保存/添加/删除路由信息。

  1. 因此我们可以实现RouteDefinitionRepository用来保存从Redis中获取到的RouteDefinitionRedisRouteDefinitionRepository,由于RouteDefinitionRepository继承了RouteDefinitionLocator,因此会被CompositeRouteDefinitionLocator组合进去,从而被CachingRouteLocator拿到对应的Redis中的RouteDefinition装换成Route。
  2. 有了地方存储Redis中的定义的RouteDefinition,那是不是要有一个角色用来获取Redis中的数据,并组装成RouteDefinition保存到RedisRouteDefinitionRepository中,因此需要定义RedisRouteDefinitionRepositoryOperator用来从Redis中获取到数据库后生成RouteDefinition。可能我们的路由信息以后会放到MySQL、MongoDB等,因此可以抽象出一个从Repository中获取数据转换为RouteDefinition的接口RouteDefinitionRepositoryOperator。
  3. 基于上边这些,我们就实现了当SCG启动时从Redis中获取数据转换为RouteDefinition,并保存到RedisRouteDefinitionRepository中,但是想要实现当修改了Redis中的路由信息后同步SCG更新,还不够,需要有一个类似Nacos的心跳机制,定时通知SCG去重新获取一次Redis中的数据。因此可以模仿Nacos的心跳机制实现RedisRouteDefinitionWatch发送心跳事件,触发CachingRouteLocator重新获取RouteDefinition来重新生成Route。

实现

RouteDefinitionRepositoryOperator

1
2
3
4
5
6
7
8
9
10
11
json复制代码/**
* 定义从不同数据源获取RouteDefinition的抽象
* @author li.hongjian
* @email lhj502819@163.com
* @Date 2021/4/1
*/
public interface RouteDefinitionRepositoryOperator {

Flux<RouteDefinition> getRouteDefinitions();

}

RedisRouteDefinitionRepositoryOperator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
json复制代码/**
* Description:用来获取Redis中的RouteDefinition 并保存到{@link RedisRouteDefinitionRepository}
*
* @author li.hongjian
* @email lhj502819@163.com
* @Date 2021/4/1
*/
public class RedisRouteDefinitionRepositoryOperator implements RouteDefinitionRepositoryOperator {

private final String REDIS_ROUTE_ID_PREFIX = "route-*";

private StringRedisTemplate redisTemplate;

public RedisRouteDefinitionRepositoryOperator(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}


@Override
public Flux<RouteDefinition> getRouteDefinitions() {
//获取指定前缀的RedisKey。Redis的数据结构使用Hash,value的结构为predicates和filters,
//predicates数据结构JsonArray,可配置多个
// 由于PredicateDefinition的构造方法支持传入类似Path=/api/hello这种格式的参数,并会自动封装为name和args,因此我们取巧可以在Redis中存储如下结构
// 如:["Path=/api/hello","BlackRemoteAddr=172.17.30.1/18,172.17.31.1/18"],表示PathRoutePredicateFactory和BlackRemoteAddrRoutePredicateFactory
//filters与predicates一样
return Flux.fromStream(redisTemplate.keys(REDIS_ROUTE_ID_PREFIX).parallelStream().map(routeId -> {
RouteDefinition routeDefinition = new RouteDefinition();
//以RedisKey作为RouteID
routeDefinition.setId(routeId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(routeId);
String uri = (String) entries.get("uri");
try {
routeDefinition.setUri(new URI(uri));
} catch (URISyntaxException e) {
e.printStackTrace();
}
//初始化PredicateDefinition,并添加到RouteDefinition中
initPredicate(routeDefinition, entries);

//初始化FilterDefinition,并添加到RouteDefinition中
initFilter(routeDefinition, entries);
return routeDefinition;
}));
}

private void initPredicate(RouteDefinition routeDefinition, Map<Object, Object> entries) {
Object predicates = entries.get("predicates");
if (predicates == null) {
return;
}
JSONArray predicateArry = JSONArray.parseArray((String) predicates);
predicateArry.parallelStream().forEach(predicate -> {
//遍历predicates,创建RouteDefinition,并添加到RouteDefinition中
PredicateDefinition predicateDefinition = new PredicateDefinition((String) predicate);
routeDefinition.getPredicates().add(predicateDefinition);
});
}

private void initFilter(RouteDefinition routeDefinition, Map<Object, Object> entries) {
Object filters = entries.get("filters");
if (filters == null) {
return;
}
JSONArray predicateArry = JSONArray.parseArray((String) filters);
predicateArry.parallelStream().forEach(filter -> {
//遍历predicates,创建RouteDefinition,并添加到RouteDefinition中
FilterDefinition filterDefinition = new FilterDefinition((String) filter);
routeDefinition.getFilters().add(filterDefinition);
});
}
}

RedisRouteDefinitionRepository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
json复制代码/**
* Description:基于Redis作为RouteDefinition Repository
*
* @author li.hongjian
* @email lhj502819@163.com
* @Date 2021/4/1
*/
public class RedisRouteDefinitionRepository implements RouteDefinitionRepository{

private final Map<String, RouteDefinition> routes = synchronizedMap(
new LinkedHashMap<String, RouteDefinition>());

private RedisRouteDefinitionRepositoryOperator redidRouteDefinitionOperator;

/**
* 将RedisRouteDefinitionRepositoryOperator组装进来
* @param redidRouteDefinitionOperator
*/
public RedisRouteDefinitionRepository(RedisRouteDefinitionRepositoryOperator redidRouteDefinitionOperator) {
this.redidRouteDefinitionOperator = redidRouteDefinitionOperator;
}

/**
* 在{@link CompositeRouteDefinitionLocator#getRouteDefinitions()}调用时 调用redidRouteDefinitionOperator去Redis中取数据
* @return
*/
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
redidRouteDefinitionOperator.getRouteDefinitions().flatMap(r -> save(Mono.just(r))).subscribe();
return Flux.fromIterable(routes.values());
}

@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return route.flatMap(r -> {
if (StringUtils.isEmpty(r.getId())) {
return Mono.error(new IllegalArgumentException("id may not be empty"));
}
routes.put(r.getId(), r);
return Mono.empty();
});
}

@Override
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap(id -> {
if (routes.containsKey(id)) {
routes.remove(id);
return Mono.empty();
}
return Mono.defer(() -> Mono.error(
new NotFoundException("RouteDefinition not found: " + routeId)));
});
}
}

RedisRouteDefinitionWatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
json复制代码/**
* @author li.hongjian
* @email lhj502819@163.com
* @Date 2021/4/1
*/
public class RedisRouteDefinitionWatch implements ApplicationEventPublisherAware, SmartLifecycle {

private final TaskScheduler taskScheduler = getTaskScheduler();

private final AtomicLong redisWatchIndex = new AtomicLong(0);

private final AtomicBoolean running = new AtomicBoolean(false);

private ApplicationEventPublisher publisher;

private ScheduledFuture<?> watchFuture;

private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("Redis-Watch-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
}


@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}

@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::redisServicesWatch, 30000); //启动一个定时,30s执行一次
}
}

/**
* 这里最好是自定义一个事件,因为如果使用了Nacos的话,会冲突,这样的话需要修改SCG的源码,监听自定义的事件
* 我们就不这么做了,感兴趣的可以自行实现
*/
private void redisServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent( //30s发布一次事件,通知SCG重新拉取
new HeartbeatEvent(this, redisWatchIndex.getAndIncrement()));
}

@Override
public void stop() {
if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
this.watchFuture.cancel(true);
}
}

@Override
public boolean isRunning() {
return false;
}
}

这样就大功告成了,实现了基于Redis配置路由信息并且可动态刷新的功能。

使用

1、Redis中数据:
在这里插入图片描述

2、将RedisRouteDefinitionWatch、RedisRouteDefinitionRepository、RedisRouteDefinitionRepositoryOperator放到Spring容器中,比如@Bean注入
通过以上两步,即可完成。代码写的比较简陋。

大家可自行验证下,亲测有效。代码仓库地址

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

400 第 N 位数字 位数统计模拟题

发表于 2021-11-30

「这是我参与11月更文挑战的第 30 天,活动详情查看:2021最后一次更文挑战」。

题目描述

这是 LeetCode 上的 400. 第 N 位数字 ,难度为 中等。

Tag : 「数学」、「模拟」

给你一个整数 nnn ,请你在无限的整数序列 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...] 中找出并返回第 nnn 位数字。

示例 1:

1
2
3
ini复制代码输入:n = 3

输出:3

示例 2:

1
2
3
4
5
ini复制代码输入:n = 11

输出:0

解释:第 11 位数字在序列 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ... 里是 0 ,它是 10 的一部分。

提示:

  • 1<=n<=231−11 <= n <= 2^{31} - 11<=n<=231−1

模拟

我们知道,对于长度为 lenlenlen 的数字的范围为 [10len−1,10len−1][10^{len - 1}, 10^{len} - 1][10len−1,10len−1](共 9∗10len−19 * 10^{len - 1}9∗10len−1 个),总长度为:

L=len∗9∗10len−1L = len * 9 * 10^{len - 1}L=len∗9∗10len−1
因此我们可以先对 nnn 进行不断试减(更新 nnn),确定下来目标数字 xxx 的长度为多少,假设为 lenlenlen。

然后直接计算出长度 lenlenlen 的最小值为 s=10len−1s = 10^{len - 1}s=10len−1,由于范围内的数长度都是 lenlenlen,因此我们可以直接定位到目标数字 xxx 为何值。

根据目标值 xxx 必然满足关系式:

(x−s+1)∗len≥n(x - s + 1) * len \geq n(x−s+1)∗len≥n
进行变形可得:

x≥⌊nlen⌋−1+sx \geq \left \lfloor \frac{n}{len} \right \rfloor - 1 + sx≥⌊lenn​⌋−1+s
对 nnn 进行最后一次的试减(更新 nnn),若恰好有 n=0n = 0n=0,说明答案为 xxx 的最后一位,可由 x % 10 取得;若大于 000,说明答案是 x+1x + 1x+1 的第 nnn 位(十进制表示,从左往右数),可由 (x + 1) / (int) (Math.pow(10, len - n)) % 10 取得。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
Java复制代码class Solution {
public int findNthDigit(int n) {
int len = 1;
while (len * 9 * Math.pow(10, len - 1) < n) {
n -= len * 9 * Math.pow(10, len - 1);
len++;
}
long s = (long) Math.pow(10, len - 1);
long x = n / len - 1 + s;
n -= (x - s + 1) * len;
return n == 0 ? (int) (x % 10) : (int) ((x + 1) / (int) (Math.pow(10, len - n)) % 10);
}
}
  • 时间复杂度:O(log⁡n)O(\log{n})O(logn)
  • 空间复杂度:O(1)O(1)O(1)

最后

这是我们「刷穿 LeetCode」系列文章的第 No.400 篇,系列开始于 2021/01/01,截止于起始日 LeetCode 上共有 1916 道题目,部分是有锁题,我们将先把所有不带锁的题目刷完。

在这个系列文章里面,除了讲解解题思路以外,还会尽可能给出最为简洁的代码。如果涉及通解还会相应的代码模板。

为了方便各位同学能够电脑上进行调试和提交代码,我建立了相关的仓库:github.com/SharingSour… 。

在仓库地址里,你可以看到系列文章的题解链接、系列文章的相应代码、LeetCode 原题链接和其他优选题解。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…107108109…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%