开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

经典面试题:你觉得 Go 在什么时候会抢占 P?

发表于 2021-06-24

微信搜索【脑子进煎鱼了】关注这一只爆肝煎鱼。本文 GitHub github.com/eddycjy/blo… 已收录,有我的系列文章、资料和开源 Go 图书。

大家好,我是煎鱼。

前几天我们有聊到《单核 CPU,开两个 Goroutine,其中一个死循环,会怎么样?》的问题,我们在一个细节部分有提到:

有新的小伙伴会产生更多的疑问,那就是在 Go 语言中,是如何抢占 P 的呢,这里面是怎么做的?

今天这篇文章我们就来解密抢占 P。

调度器的发展史

在 Go 语言中,Goroutine 早期是没有设计成抢占式的,早期 Goroutine 只有读写、主动让出、锁等操作时才会触发调度切换。

这样有一个严重的问题,就是垃圾回收器进行 STW 时,如果有一个 Goroutine 一直都在阻塞调用,垃圾回收器就会一直等待他,不知道等到什么时候…

这种情况下就需要抢占式调度来解决问题。如果一个 Goroutine 运行时间过久,就需要进行抢占来解决。

这块 Go 语言在 Go1.2 起开始实现抢占式调度器,不断完善直至今日:

  • Go0.x:基于单线程的程调度器。
  • Go1.0:基于多线程的调度器。
  • Go1.1:基于任务窃取的调度器。
  • Go1.2 - Go1.13:基于协作的抢占式调度器。
  • Go1.14:基于信号的抢占式调度器。

调度器的新提案:非均匀存储器访问调度(Non-uniform memory access,NUMA),
但由于实现过于复杂,优先级也不够高,因此迟迟未提上日程。

有兴趣的小伙伴可以详见 Dmitry Vyukov, dvyukov 所提出的 NUMA-aware scheduler for Go。

为什么要抢占 P

为什么会要想去抢占 P 呢,说白了就是不抢,就没机会运行,会 hang 死。又或是资源分配不均了,

这在调度器设计中显然是不合理的。

跟这个例子一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
golang复制代码// Main Goroutine 
func main() {
// 模拟单核 CPU
runtime.GOMAXPROCS(1)

// 模拟 Goroutine 死循环
go func() {
for {
}
}()

time.Sleep(time.Millisecond)
fmt.Println("脑子进煎鱼了")
}

这个例子在老版本的 Go 语言中,就会一直阻塞,没法重见天日,是一个需要做抢占的场景。

但可能会有小伙伴问,抢占了,会不会有新问题。因为原本正在使用 P 的 M 就凉凉了(M 会与 P 进行绑定),没了 P 也就没法继续执行了。

这其实没有问题,因为该 Goroutine 已经阻塞在了系统调用上,暂时是不会有后续的执行新诉求。

但万一代码是在运行了好一段时间后又能够运行了(业务上也允许长等待),也就是该 Goroutine 从阻塞状态中恢复了,期望继续运行,没了 P 怎么办?

这时候该 Goroutine 可以和其他 Goroutine 一样,先检查自身所在的 M 是否仍然绑定着 P:

  • 若是有 P,则可以调整状态,继续运行。
  • 若是没有 P,可以重新抢 P,再占有并绑定 P,为自己所用。

也就是抢占 P,本身就是一个双向行为,你抢了我的 P,我也可以去抢别人的 P 来继续运行。

怎么抢占 P

讲解了为什么要抢占 P 的原因后,我们进一步深挖,“他” 是怎么抢占到具体的 P 的呢?

这就涉及到前文所提到的 runtime.retake 方法了,其处理以下两种场景:

  • 抢占阻塞在系统调用上的 P。
  • 抢占运行时间过长的 G。

在此主要针对抢占 P 的场景,分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
golang复制代码func retake(now int64) uint32 {
n := 0
// 防止发生变更,对所有 P 加锁
lock(&allpLock)
// 走入主逻辑,对所有 P 开始循环处理
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
...
if s == _Psyscall {
// 判断是否超过 1 个 sysmon tick 周期
t := int64(_p_.syscalltick)
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}

...
}
}
unlock(&allpLock)
return uint32(n)
}

该方法会先对 allpLock 上锁,这个变量含义如其名,allpLock 可以防止该数组发生变化。

其会保护 allp、idlepMask 和 timerpMask 属性的无 P 读取和大小变化,以及对 allp 的所有写入操作,可以避免影响后续的操作。

场景一

前置处理完毕后,进入主逻辑,会使用万能的 for 循环对所有的 P(allp)进行一个个处理。

1
2
3
4
5
6
golang复制代码			t := int64(_p_.syscalltick)
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}

第一个场景是:会对 syscalltick 进行判定,如果在系统调用(syscall)中存在超过 1 个 sysmon tick 周期(至少 20us)的任务,则会从系统调用中抢占 P,否则跳过。

场景二

如果未满足会继续往下,走到如下逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
golang复制代码func retake(now int64) uint32 {
for i := 0; i < len(allp); i++ {
...
if s == _Psyscall {
// 从此处开始分析
if runqempty(_p_) &&
atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 &&
pd.syscallwhen+10*1000*1000 > now {
continue
}
...
}
}
unlock(&allpLock)
return uint32(n)
}

第二个场景,聚焦到这一长串的判断中:

  • runqempty(_p_) == true 方法会判断任务队列 P 是否为空,以此来检测有没有其他任务需要执行。
  • atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 会判断是否存在空闲 P 和正在进行调度窃取 G 的 P。
  • pd.syscallwhen+10*1000*1000 > now 会判断系统调用时间是否超过了 10ms。

这里奇怪的是 runqempty 方法明明已经判断了没有其他任务,这就代表了没有任务需要执行,是不需要抢夺 P 的。

但实际情况是,由于可能会阻止 sysmon 线程的深度睡眠,最终还是希望继续占有 P。

在完成上述判断后,进入到抢夺 P 的阶段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
golang复制代码func retake(now int64) uint32 {
for i := 0; i < len(allp); i++ {
...
if s == _Psyscall {
// 承接上半部分
unlock(&allpLock)
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}
  • 解锁相关属性:需要调用 unlock 方法解锁 allpLock,从而实现获取 sched.lock,以便继续下一步。
  • 减少闲置 M:需要在原子操作(CAS)之前减少闲置 M 的数量(假设有一个正在运行)。 否则在发生抢夺 M 时可能会退出系统调用,递增 nmidle 并报告死锁事件。
  • 修改 P 状态:调用 atomic.Cas 方法将所抢夺的 P 状态设为 idle,以便于交于其他 M 使用。
  • 抢夺 P 和调控 M:调用 handoffp 方法从系统调用或锁定的 M 中抢夺 P,会由新的 M 接管这个 P。

总结

至此完成了抢占 P 的基本流程,我们可得出满足以下条件:

  1. 如果存在系统调用超时:存在超过 1 个 sysmon tick 周期(至少 20us)的任务,则会从系统调用中抢占 P。
  2. 如果没有空闲的 P:所有的 P 都已经与 M 绑定。需要抢占当前正处于系统调用之,而实际上系统调用并不需要的这个 P 的情况,会将其分配给其它 M 去调度其它 G。
  3. 如果 P 的运行队列里面有等待运行的 G,为了保证 P 的本地队列中的 G 得到及时调度。而自己本身的 P 又忙于系统调用,无暇管理。此时会寻找另外一个 M 来接管 P,从而实现继续调度 G 的目的。

若有任何疑问欢迎评论区反馈和交流,最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

文章持续更新,可以微信搜【脑子进煎鱼了】阅读,回复【000】有我准备的一线大厂面试算法题解和资料;本文 GitHub github.com/eddycjy/blo… 已收录,欢迎 Star 催更。

参考

  • NUMA-aware scheduler for Go
  • go-under-the-hood
  • 深入解析 Go-抢占式调度
  • Go语言调度器源代码情景分析

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java多线程实战|CountDownLatch原理介绍及使

发表于 2021-06-24

这是我参与更文挑战的第24天,活动详情查看: 更文挑战

前言:

对于多线程编程,从JDK 1.5以后出现了java.util.concurrent处理线程的一个工具包,包里给我们提供了很多的多线程使用的工具类在特定的场景下可以起到很好的作用,包括 Semaphore,CountDownLatch,CyclicBarrier,Exchanger,Phaser这些常用的多线程工具类,我们今天主要讲一下CountDownLatch的原理及使用场景;

CountDownLatch

定义:

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。CountDownLatch主要提供的机制是多个(具体数量等于初始化CountDownLatch时count的值)线程都达到了预期状态或者完成了预期工作时触发事件,其他线程可以等待这个事件来触发自己后续的工作。到达自己预期状态的线程会调用CountDownLatch的countDown方法,而等待的线程会调用CountDownLatch的await方法。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

使用场景:

  1. 某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
csharp复制代码/**
* TODO
*
* @author taoze
* @version 1.0
* @date 6/23/21 7:54 PM
*/
public class CountdownLatchTest1 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
final CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("子线程" + Thread.currentThread().getName() + "开始执行");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("子线程" + Thread.currentThread().getName() + "执行完成");
latch.countDown();//当前线程调用此方法,则计数减一
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});


}
try {
System.out.println("主线程" + Thread.currentThread().getName() + "等待子线程执行完成...");
latch.await();//阻塞当前线程,直到计数器的值为0
System.out.println("主线程" + Thread.currentThread().getName() + "开始执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

执行结果:

图片.png
2. 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
csharp复制代码/**
* TODO
*
* @author taoze
* @version 1.0
* @date 6/24/21 10:16 AM
*/
public class CountdownLatchTest2 {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("选手" + Thread.currentThread().getName() + "正在等待裁判发布口令");
cdOrder.await();
System.out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("选手" + Thread.currentThread().getName() + "到达终点");
cdAnswer.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("裁判"+Thread.currentThread().getName()+"即将发布口令");
cdOrder.countDown();
System.out.println("裁判"+Thread.currentThread().getName()+"已发送口令,正在等待所有选手到达终点");
cdAnswer.await();
System.out.println("所有选手都到达终点");
System.out.println("裁判"+Thread.currentThread().getName()+"汇总成绩排名");
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
}
}

执行结果:

图片.png

ok!今天的文章就到这了,以上就是CountDownLatch的俩种使用方法,希望可以对大家有帮助,有不对的地方希望大家可以提出来的,共同成长;

整洁成就卓越代码,细节之中只有天地

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Pandas高级教程之 处理缺失数据 简介 NaN的例子 整

发表于 2021-06-24

简介

在数据处理中,Pandas会将无法解析的数据或者缺失的数据使用NaN来表示。虽然所有的数据都有了相应的表示,但是NaN很明显是无法进行数学运算的。

本文将会讲解Pandas对于NaN数据的处理方法。

NaN的例子

上面讲到了缺失的数据会被表现为NaN,我们来看一个具体的例子:

我们先来构建一个DF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
less复制代码In [1]: df = pd.DataFrame(np.random.randn(5, 3), index=['a', 'c', 'e', 'f', 'h'],
...: columns=['one', 'two', 'three'])
...:

In [2]: df['four'] = 'bar'

In [3]: df['five'] = df['one'] > 0

In [4]: df
Out[4]:
one two three four five
a 0.469112 -0.282863 -1.509059 bar True
c -1.135632 1.212112 -0.173215 bar False
e 0.119209 -1.044236 -0.861849 bar True
f -2.104569 -0.494929 1.071804 bar False
h 0.721555 -0.706771 -1.039575 bar True

上面DF只有acefh这几个index,我们重新index一下数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
sql复制代码In [5]: df2 = df.reindex(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])

In [6]: df2
Out[6]:
one two three four five
a 0.469112 -0.282863 -1.509059 bar True
b NaN NaN NaN NaN NaN
c -1.135632 1.212112 -0.173215 bar False
d NaN NaN NaN NaN NaN
e 0.119209 -1.044236 -0.861849 bar True
f -2.104569 -0.494929 1.071804 bar False
g NaN NaN NaN NaN NaN
h 0.721555 -0.706771 -1.039575 bar True

数据缺失,就会产生很多NaN。

为了检测是否NaN,可以使用isna()或者notna() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sql复制代码In [7]: df2['one']
Out[7]:
a 0.469112
b NaN
c -1.135632
d NaN
e 0.119209
f -2.104569
g NaN
h 0.721555
Name: one, dtype: float64

In [8]: pd.isna(df2['one'])
Out[8]:
a False
b True
c False
d True
e False
f False
g True
h False
Name: one, dtype: bool

In [9]: df2['four'].notna()
Out[9]:
a True
b False
c True
d False
e True
f True
g False
h True
Name: four, dtype: bool

注意在Python中None是相等的:

1
2
ini复制代码In [11]: None == None                                                 # noqa: E711
Out[11]: True

但是np.nan是不等的:

1
2
ini复制代码In [12]: np.nan == np.nan
Out[12]: False

整数类型的缺失值

NaN默认是float类型的,如果是整数类型,我们可以强制进行转换:

1
2
3
4
5
6
7
ini复制代码In [14]: pd.Series([1, 2, np.nan, 4], dtype=pd.Int64Dtype())
Out[14]:
0 1
1 2
2 <NA>
3 4
dtype: Int64

Datetimes 类型的缺失值

时间类型的缺失值使用NaT来表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
sql复制代码In [15]: df2 = df.copy()

In [16]: df2['timestamp'] = pd.Timestamp('20120101')

In [17]: df2
Out[17]:
one two three four five timestamp
a 0.469112 -0.282863 -1.509059 bar True 2012-01-01
c -1.135632 1.212112 -0.173215 bar False 2012-01-01
e 0.119209 -1.044236 -0.861849 bar True 2012-01-01
f -2.104569 -0.494929 1.071804 bar False 2012-01-01
h 0.721555 -0.706771 -1.039575 bar True 2012-01-01

In [18]: df2.loc[['a', 'c', 'h'], ['one', 'timestamp']] = np.nan

In [19]: df2
Out[19]:
one two three four five timestamp
a NaN -0.282863 -1.509059 bar True NaT
c NaN 1.212112 -0.173215 bar False NaT
e 0.119209 -1.044236 -0.861849 bar True 2012-01-01
f -2.104569 -0.494929 1.071804 bar False 2012-01-01
h NaN -0.706771 -1.039575 bar True NaT

In [20]: df2.dtypes.value_counts()
Out[20]:
float64 3
datetime64[ns] 1
bool 1
object 1
dtype: int64

None 和 np.nan 的转换

对于数字类型的,如果赋值为None,那么会转换为相应的NaN类型:

1
2
3
4
5
6
7
8
9
10
ini复制代码In [21]: s = pd.Series([1, 2, 3])

In [22]: s.loc[0] = None

In [23]: s
Out[23]:
0 NaN
1 2.0
2 3.0
dtype: float64

如果是对象类型,使用None赋值,会保持原样:

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码In [24]: s = pd.Series(["a", "b", "c"])

In [25]: s.loc[0] = None

In [26]: s.loc[1] = np.nan

In [27]: s
Out[27]:
0 None
1 NaN
2 c
dtype: object

缺失值的计算

缺失值的数学计算还是缺失值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
less复制代码In [28]: a
Out[28]:
one two
a NaN -0.282863
c NaN 1.212112
e 0.119209 -1.044236
f -2.104569 -0.494929
h -2.104569 -0.706771

In [29]: b
Out[29]:
one two three
a NaN -0.282863 -1.509059
c NaN 1.212112 -0.173215
e 0.119209 -1.044236 -0.861849
f -2.104569 -0.494929 1.071804
h NaN -0.706771 -1.039575

In [30]: a + b
Out[30]:
one three two
a NaN NaN -0.565727
c NaN NaN 2.424224
e 0.238417 NaN -2.088472
f -4.209138 NaN -0.989859
h NaN NaN -1.413542

但是在统计中会将NaN当成0来对待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
less复制代码In [31]: df
Out[31]:
one two three
a NaN -0.282863 -1.509059
c NaN 1.212112 -0.173215
e 0.119209 -1.044236 -0.861849
f -2.104569 -0.494929 1.071804
h NaN -0.706771 -1.039575

In [32]: df['one'].sum()
Out[32]: -1.9853605075978744

In [33]: df.mean(1)
Out[33]:
a -0.895961
c 0.519449
e -0.595625
f -0.509232
h -0.873173
dtype: float64

如果是在cumsum或者cumprod中,默认是会跳过NaN,如果不想统计NaN,可以加上参数skipna=False

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
r复制代码In [34]: df.cumsum()
Out[34]:
one two three
a NaN -0.282863 -1.509059
c NaN 0.929249 -1.682273
e 0.119209 -0.114987 -2.544122
f -1.985361 -0.609917 -1.472318
h NaN -1.316688 -2.511893

In [35]: df.cumsum(skipna=False)
Out[35]:
one two three
a NaN -0.282863 -1.509059
c NaN 0.929249 -1.682273
e NaN -0.114987 -2.544122
f NaN -0.609917 -1.472318
h NaN -1.316688 -2.511893

使用fillna填充NaN数据

数据分析中,如果有NaN数据,那么需要对其进行处理,一种处理方法就是使用fillna来进行填充。

下面填充常量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sql复制代码In [42]: df2
Out[42]:
one two three four five timestamp
a NaN -0.282863 -1.509059 bar True NaT
c NaN 1.212112 -0.173215 bar False NaT
e 0.119209 -1.044236 -0.861849 bar True 2012-01-01
f -2.104569 -0.494929 1.071804 bar False 2012-01-01
h NaN -0.706771 -1.039575 bar True NaT

In [43]: df2.fillna(0)
Out[43]:
one two three four five timestamp
a 0.000000 -0.282863 -1.509059 bar True 0
c 0.000000 1.212112 -0.173215 bar False 0
e 0.119209 -1.044236 -0.861849 bar True 2012-01-01 00:00:00
f -2.104569 -0.494929 1.071804 bar False 2012-01-01 00:00:00
h 0.000000 -0.706771 -1.039575 bar True 0

还可以指定填充方法,比如pad:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ini复制代码In [45]: df
Out[45]:
one two three
a NaN -0.282863 -1.509059
c NaN 1.212112 -0.173215
e 0.119209 -1.044236 -0.861849
f -2.104569 -0.494929 1.071804
h NaN -0.706771 -1.039575

In [46]: df.fillna(method='pad')
Out[46]:
one two three
a NaN -0.282863 -1.509059
c NaN 1.212112 -0.173215
e 0.119209 -1.044236 -0.861849
f -2.104569 -0.494929 1.071804
h -2.104569 -0.706771 -1.039575

可以指定填充的行数:

1
ini复制代码In [48]: df.fillna(method='pad', limit=1)

fill方法统计:

方法名 描述
pad / ffill 向前填充
bfill / backfill 向后填充

可以使用PandasObject来填充:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
less复制代码In [53]: dff
Out[53]:
A B C
0 0.271860 -0.424972 0.567020
1 0.276232 -1.087401 -0.673690
2 0.113648 -1.478427 0.524988
3 NaN 0.577046 -1.715002
4 NaN NaN -1.157892
5 -1.344312 NaN NaN
6 -0.109050 1.643563 NaN
7 0.357021 -0.674600 NaN
8 -0.968914 -1.294524 0.413738
9 0.276662 -0.472035 -0.013960

In [54]: dff.fillna(dff.mean())
Out[54]:
A B C
0 0.271860 -0.424972 0.567020
1 0.276232 -1.087401 -0.673690
2 0.113648 -1.478427 0.524988
3 -0.140857 0.577046 -1.715002
4 -0.140857 -0.401419 -1.157892
5 -1.344312 -0.401419 -0.293543
6 -0.109050 1.643563 -0.293543
7 0.357021 -0.674600 -0.293543
8 -0.968914 -1.294524 0.413738
9 0.276662 -0.472035 -0.013960

In [55]: dff.fillna(dff.mean()['B':'C'])
Out[55]:
A B C
0 0.271860 -0.424972 0.567020
1 0.276232 -1.087401 -0.673690
2 0.113648 -1.478427 0.524988
3 NaN 0.577046 -1.715002
4 NaN -0.401419 -1.157892
5 -1.344312 -0.401419 -0.293543
6 -0.109050 1.643563 -0.293543
7 0.357021 -0.674600 -0.293543
8 -0.968914 -1.294524 0.413738
9 0.276662 -0.472035 -0.013960

上面操作等同于:

1
css复制代码In [56]: dff.where(pd.notna(dff), dff.mean(), axis='columns')

使用dropna删除包含NA的数据

除了fillna来填充数据之外,还可以使用dropna删除包含na的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ini复制代码In [57]: df
Out[57]:
one two three
a NaN -0.282863 -1.509059
c NaN 1.212112 -0.173215
e NaN 0.000000 0.000000
f NaN 0.000000 0.000000
h NaN -0.706771 -1.039575

In [58]: df.dropna(axis=0)
Out[58]:
Empty DataFrame
Columns: [one, two, three]
Index: []

In [59]: df.dropna(axis=1)
Out[59]:
two three
a -0.282863 -1.509059
c 1.212112 -0.173215
e 0.000000 0.000000
f 0.000000 0.000000
h -0.706771 -1.039575

In [60]: df['one'].dropna()
Out[60]: Series([], Name: one, dtype: float64)

插值interpolation

数据分析时候,为了数据的平稳,我们需要一些插值运算interpolate() ,使用起来很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml复制代码In [61]: ts
Out[61]:
2000-01-31 0.469112
2000-02-29 NaN
2000-03-31 NaN
2000-04-28 NaN
2000-05-31 NaN
...
2007-12-31 -6.950267
2008-01-31 -7.904475
2008-02-29 -6.441779
2008-03-31 -8.184940
2008-04-30 -9.011531
Freq: BM, Length: 100, dtype: float64
1
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml复制代码In [64]: ts.interpolate()
Out[64]:
2000-01-31 0.469112
2000-02-29 0.434469
2000-03-31 0.399826
2000-04-28 0.365184
2000-05-31 0.330541
...
2007-12-31 -6.950267
2008-01-31 -7.904475
2008-02-29 -6.441779
2008-03-31 -8.184940
2008-04-30 -9.011531
Freq: BM, Length: 100, dtype: float64

插值函数还可以添加参数,指定插值的方法,比如按时间插值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
yaml复制代码In [67]: ts2
Out[67]:
2000-01-31 0.469112
2000-02-29 NaN
2002-07-31 -5.785037
2005-01-31 NaN
2008-04-30 -9.011531
dtype: float64

In [68]: ts2.interpolate()
Out[68]:
2000-01-31 0.469112
2000-02-29 -2.657962
2002-07-31 -5.785037
2005-01-31 -7.398284
2008-04-30 -9.011531
dtype: float64

In [69]: ts2.interpolate(method='time')
Out[69]:
2000-01-31 0.469112
2000-02-29 0.270241
2002-07-31 -5.785037
2005-01-31 -7.190866
2008-04-30 -9.011531
dtype: float64

按index的float value进行插值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ini复制代码In [70]: ser
Out[70]:
0.0 0.0
1.0 NaN
10.0 10.0
dtype: float64

In [71]: ser.interpolate()
Out[71]:
0.0 0.0
1.0 5.0
10.0 10.0
dtype: float64

In [72]: ser.interpolate(method='values')
Out[72]:
0.0 0.0
1.0 1.0
10.0 10.0
dtype: float64

除了插值Series,还可以插值DF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
less复制代码In [73]: df = pd.DataFrame({'A': [1, 2.1, np.nan, 4.7, 5.6, 6.8],
....: 'B': [.25, np.nan, np.nan, 4, 12.2, 14.4]})
....:

In [74]: df
Out[74]:
A B
0 1.0 0.25
1 2.1 NaN
2 NaN NaN
3 4.7 4.00
4 5.6 12.20
5 6.8 14.40

In [75]: df.interpolate()
Out[75]:
A B
0 1.0 0.25
1 2.1 1.50
2 3.4 2.75
3 4.7 4.00
4 5.6 12.20
5 6.8 14.40

interpolate还接收limit参数,可以指定插值的个数。

1
2
3
4
5
6
7
8
9
10
11
12
r复制代码In [95]: ser.interpolate(limit=1)
Out[95]:
0 NaN
1 NaN
2 5.0
3 7.0
4 NaN
5 NaN
6 13.0
7 13.0
8 NaN
dtype: float64

使用replace替换值

replace可以替换常量,也可以替换list:

1
2
3
4
5
6
7
8
9
10
ini复制代码In [102]: ser = pd.Series([0., 1., 2., 3., 4.])

In [103]: ser.replace(0, 5)
Out[103]:
0 5.0
1 1.0
2 2.0
3 3.0
4 4.0
dtype: float64
1
2
3
4
5
6
7
8
ini复制代码In [104]: ser.replace([0, 1, 2, 3, 4], [4, 3, 2, 1, 0])
Out[104]:
0 4.0
1 3.0
2 2.0
3 1.0
4 0.0
dtype: float64

可以替换DF中特定的数值:

1
2
3
4
5
6
7
8
9
10
css复制代码In [106]: df = pd.DataFrame({'a': [0, 1, 2, 3, 4], 'b': [5, 6, 7, 8, 9]})

In [107]: df.replace({'a': 0, 'b': 5}, 100)
Out[107]:
a b
0 100 100
1 1 6
2 2 7
3 3 8
4 4 9

可以使用插值替换:

1
2
3
4
5
6
7
8
ini复制代码In [108]: ser.replace([1, 2, 3], method='pad')
Out[108]:
0 0.0
1 0.0
2 0.0
3 0.0
4 4.0
dtype: float64

本文已收录于 www.flydean.com/07-python-p…

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Hive 大厂面试题

发表于 2021-06-24

这是我参与更文挑战的第24天,活动详情查看:更文挑战

1 Hive的架构

Hive元数据默认存储在derby数据库,不支持多客户端访问,所以将元数据存储在MySQl,支持多客户端访问。
image.png

2 Hive和数据库比较

Hive 和数据库除了拥有类似的查询语言,再无类似之处。

  1. 数据存储位置
      Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。
  2. 数据更新
      Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,
  3. 执行延迟
      Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
  4. 数据规模
      Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。

3 内部表和外部表

元数据、原始数据

  1. 删除数据时:
      内部表:元数据、原始数据,全删除

  外部表:元数据 只删除
2. 在公司生产环境下,什么时候创建内部表,什么时候创建外部表?
  在公司中绝大多数场景都是外部表。

  自己使用的临时表,才会创建内部表;

4 4个By区别

  1. Order By:全局排序,只有一个Reducer;
  2. Sort By:分区内有序;
  3. Distrbute By:类似MR中Partition,进行分区,结合sort by使用。
  4. Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。

  在生产环境中Order By用的比较少,容易导致OOM。

  在生产环境中Sort By + Distrbute By用的多。

5 系统函数

  1. date_add、date_sub函数(加减日期)
  2. next_day函数(周指标相关)
  3. date_format函数(根据格式整理日期)
  4. last_day函数(求当月最后一天日期)
  5. collect_set函数
  6. get_json_object解析json函数
  7. NVL(表达式1,表达式2)
      如果表达式1为空值,NVL返回值为表达式2的值,否则返回表达式1的值。

6 自定义UDF、UDTF函数

  1. 在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?
1. 用UDF函数解析公共字段;用UDTF函数解析事件字段。
2. 自定义UDF:继承UDF,重写evaluate方法
3. 自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
  1. 为什么要自定义UDF/UDTF?
      因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。

  引入第三方jar包时,也需要。

7 窗口函数

  1. Rank
1. RANK() 排序相同时会重复,总数不会变
2. DENSE\_RANK() 排序相同时会重复,总数会减少
3. ROW\_NUMBER() 会根据顺序计算
  1. OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化
1. CURRENT ROW:当前行
2. n PRECEDING:往前n行数据
3. n FOLLOWING:往后n行数据
4. UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点
5. LAG(col,n):往前第n行数据
6. LEAD(col,n):往后第n行数据
7. NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。
  1. 手写TopN

8 Union与Union all区别

  1. union会将联合的结果集去重,效率较union all差
  2. union all不会对结果集去重,所以效率高

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

对不起,率先脱单了,能学会的找对象神器--Nacos

发表于 2021-06-23

这是我参与更文挑战的第 12 天,活动详情查看:更文挑战

作者:JavaGieGie

微信公众号:Java开发零到壹

前言

上次教大家用EasyCode怎么实现1小时解决一周的需求,不知道小伙伴们有没有机会尝试一波呢。今天先不哈牛皮了,给大家介绍一款集配置中心与注册中心于一体的神器—Nacos。本文可以说是单身老铁的福音,学完之后你可以骄傲的和旁边的小妹妹同事说,嗨,你知道Nacos是什么嘛,不出意外很快就能脱单,抱得美人归😁。

20190723874757_VyaTxZ.jpg

正文

Nacos是什么

Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置,帮助您更敏捷和容易地构建、交付和管理微服务平台。

Nacos有哪些功能

Nacos可以动态将application.yml中的文件动态配置,无需重启服务,也可以配合dubbo,类似Zookeeper的功能。主要有以下几个功能,有兴趣的小伙伴可以在文末访问Nacos官网查看:

  • 服务发现及监测:支持基于 DNS 和基于 RPC 的服务发现;
  • 动态配置服务:修改完配置文件,无需重新部署应用和服务;
  • DNS 服务:更容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。

几款注册中心比较:

image.png

动态配置流程讲解

  1. 下载Nacos

演示环境:Win10、Jdk8

点击下载,即可下载2.0.2版本的Nacos,下载完成后解压到本地目录,需要进行以下两个步骤进行配置:

  • 本地新建数据库nacos,将conf / nacos-mysql.sql导入到新建的数据库中;
  • 修改conf / application.properties中数据库文件
1
2
3
4
5
properties复制代码spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.user=root
db.password=root
  1. 启动Nacos服务

打开Nacos安装目录中bin目录,Windows用户直接双击startup.cmd启动服务

Linux/Unix/Mac

启动命令(standalone代表着单机模式运行,非集群模式):

1
2
3
> 复制代码sh startup.sh -m standalone
>
>

如果您使用的是ubuntu系统,或者运行脚本报错提示[[符号找不到,可尝试如下运行:

1
2
3
> 复制代码bash startup.sh -m standalone
>
>

Windows

启动命令(standalone代表着单机模式运行,非集群模式):

1
2
3
> 复制代码startup.cmd -m standalone
>
>
  1. 打开Nacos网址

浏览器输入地址**http://127.0.0.1:8848/nacos/index.html**便可打开服务图形化页面,默认用户名密码(nacos/nacos),

如图所示,界面非常清爽舒服,在左侧栏【配置管理】是用于添加配置文件。点击新建配置

image.png

点击新建配置,输入几个配置项,点击【发布】即可完成配置:

  • Data ID:配置文件名称,可以配置dev、prod等来区分环境。
  • Group:不修改则使用默认组名称。
  • 配置格式:根据实际需求选择。

image.png

3. 新建SpringBoot项目

新建过程忽略,大家应该都能够熟练掌握,花Gie新建一个项目【nacos-】已经完成,我们需要配置如下几个参数:

  • 启动类:添加@NacosPropertySource(dataId = "application-dev.yml", autoRefreshed = true) ,application-dev.yml为刚刚配置的Data Id;
  • application.properties:添加nacos.config.server-addr=127.0.0.1:8848,即刚刚启动的Nacos服务地址;
  • pom文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
xml复制代码<properties>
<nacos-config-spring-boot.version>0.2.1</nacos-config-spring-boot.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>${nacos-config-spring-boot.version}</version>
</dependency>

<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-actuator</artifactId>
<version>${nacos-config-spring-boot.version}</version>
</dependency>
</dependencies>
  • 测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@RestController
@ResponseBody
public class NacosController {

@NacosValue(value = "${name}", autoRefreshed = true)
private String name;

@RequestMapping(value = "/get", method = RequestMethod.GET)
@ResponseBody
public String get() {
return name;
}
}

启动项目,测试配置中心

启动SpringBoot项目,浏览器输入http://127.0.0.1:8080/get,就会显示Nacos刚刚配置的花Gie,此时修改Nacos中配置文件内容,无需重启项目,再次访问该地址,会发现内容已经更新。

image.png

总结

今天简单分享了Nacos的配置管理用法,下一章将对配置中心进行讲解分享。既然选择了这条路,我们不能一直沉浸在旧的技术止步不前,毕竟互联网行业技术更新迭代非常快,我们要注重基础的同时也要适当使用一些好用的新技术(虽然也不新了,但我是最近才用到….害羞的的低下头),这样即使面试被问起来心里也有点底气。

耐心看到最后的小伙伴,肯定都是热爱学习的好同志,也是对花Gie分享的技术非常赞同(不要脸+1),看了下时间还有二十分钟就到零晨了,又是充实的一天(差点一口气没上来),睡了睡了,狗命要紧。

点关注,防走丢

以上就是本期全部内容,如有纰漏之处,请留言指教,非常感谢。我是花Gie,有问题大家随时留言讨论 ,我们下期见🦮。

文章持续更新,可以微信搜一搜 Java开发零到壹 第一时间阅读,并且可以获取面试资料学习视频等,有兴趣的小伙伴欢迎关注,一起学习,一起哈🐮🥃。

原创不易,你怎忍心白嫖,如果你觉得这篇文章对你有点用的话,感谢老铁为本文点个赞、评论或转发一下,因为这将是我输出更多优质文章的动力,感谢!

官方Demo : github.com/nacos-group…

官方手册 : nacos.io/zh-cn/docs/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 Seata Client 端 AT 事务请求Ser

发表于 2021-06-23

这是我参与更文挑战的第15天,活动详情查看: 更文挑战

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

一 . 前言

前面一篇说了 AT 模式中到 Template 的所有流程 , 这一篇来看一下后面的 begiunTransaction 做了什么

二 . 流程梳理

流程分为几个节点 :

  • 属性部分 : TransactionInfo + GlobalLockConfig
  • 事务部分 : beginTransaction + commitTransaction
  • 逻辑部分 : execute + TransactionalExecutor

2.1 TransactionInfo 详情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码> PS:M52_02_01 TransactionInfo 对象包含了什么?


```java
C53- TransactionInfo
F53_01- int timeOut : 超时时间
F53_02- String name : 事务名
F53_03- Set<RollbackRule> rollbackRules : 回退规则
F53_04- Propagation propagation
F53_05- int lockRetryInternal : 重试间隔
F53_06- int lockRetryTimes : 重试次数
M53_01- rollbackOn :
M53_02- getPropagation()

public final class TransactionInfo implements Serializable {

public boolean rollbackOn(Throwable ex) {

RollbackRule winner = null;
int deepest = Integer.MAX_VALUE;

if (CollectionUtils.isNotEmpty(rollbackRules)) {
winner = NoRollbackRule.DEFAULT_NO_ROLLBACK_RULE;
for (RollbackRule rule : this.rollbackRules) {
int depth = rule.getDepth(ex);
if (depth >= 0 && depth < deepest) {
deepest = depth;
winner = rule;
}
}
}

return !(winner instanceof NoRollbackRule);
}

public Propagation getPropagation() {
if (this.propagation != null) {
return this.propagation;
}
//default propagation
return Propagation.REQUIRED;
}

}

Propagation 的作用 ?

Propagation 是一个枚举 , 表示的是事务传播的模式 , 包括如下几种 :

  • REQUIRED : 如果事务存在,则使用当前事务执行,否则使用新事务执行
  • REQUIRES_NEW : 如果事务存在,将暂停它,然后使用新事务执行业务。
  • NOT_SUPPORTED : 如果事务存在,则挂起它,然后执行没有事务的业务
  • SUPPORTS : 如果事务不存在,则不执行全局事务,否则执行当前事务的业务
  • NEVER : 如果事务存在,抛出异常,否则执行没有事务的业务
  • MANDATORY: 如果事务不存在,抛出异常,否则执行与当前事务相关的业务

2.2 GlobalLockConfig 详情

对象属性 :

1
2
3
4
5
6
7
8
java复制代码// 再次回顾一下之前看过的对象
public class GlobalLockConfig {

// 锁定重试间隔
private int lockRetryInternal;
// 锁定重试次数
private int lockRetryTimes;
}

逻辑处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码
// 再来看一下配置的方式
private GlobalLockConfig replaceGlobalLockConfig(TransactionInfo info) {

GlobalLockConfig myConfig = new GlobalLockConfig();
myConfig.setLockRetryInternal(info.getLockRetryInternal());
myConfig.setLockRetryTimes(info.getLockRetryTimes());
// 主要看一下这个里面做了什么
return GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
}


public class GlobalLockConfigHolder {

// 关键一 : 线程存储
private static ThreadLocal<GlobalLockConfig> holder = new ThreadLocal<>();

// 关键二 : 这里有个有趣的地方 , 可以看到取得 Previous , 同时设置 Current
public static GlobalLockConfig setAndReturnPrevious(GlobalLockConfig config) {
GlobalLockConfig previous = holder.get();
holder.set(config);
return previous;
}

}

[Pro] : 为什么关键二中 , 获取得是之前的 Config

获取前一个 GlobalLockConfig 主要是用于回退

1
2
3
4
5
6
7
8
java复制代码GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
//.......事务
} finally {
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}

那么问题来了 ,我都已经到了下一个操作了 ,再获取前一个全局锁是为什么 ?

大概想了一下 , 主要应该是这样的流程 , 当第一个事务获取全局锁时 , 其他本地事务如果要执行 ,必须获取全局锁 , 那么 , 下一个事务应该去关注上一个全局锁配置.

因为上一个全局锁未处理完的情况下 , 下一个事务实际上是拿不到一个全局锁的.

1
2
3
4
5
6
7
java复制代码private void resumeGlobalLockConfig(GlobalLockConfig config) {
if (config != null) {
GlobalLockConfigHolder.setAndReturnPrevious(config);
} else {
GlobalLockConfigHolder.remove();
}
}

PS : 不知道这里理解得对不对 ,因为这里ThreadLocal 获取到的是当前线程的配置 , 即一个线程内我的全局锁唯一吗?

TODO : 后文看全局锁的时候再来回顾一下

2.3 beginTransaction 开启事务

上文看完了配置信息 , 这里来看一下事务的启动

1
2
3
4
5
6
7
8
9
10
java复制代码// 其中可以看到 , 主要是3步走 >>>
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
}
}

2.3.1 triggerBeforeBegin()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
java复制代码// 先来看一下 trigger 主要逻辑
M51_03- triggerBeforeBegin
FOR- 循环所有的 TransactionHook : getCurrentHooks
- hook.beforeBegin()

M51_04- triggerAfterBegin
FOR- 循环所有的 TransactionHook : getCurrentHooks
- hook.afterBegin()

// 2个的核心都是调用 TransactionHook的对应方法 , 这里带来了2个问题 :
- TransactionHook 是什么 ?
- TransactionHook 的管理 ?

// [Pro1] : TransactionHook 是什么 ?
TransactionHook 是一个接口 , 它允许通过插槽的方式对流程进行附加操作 , 它的主要实现类为 TransactionHookAdapter

public interface TransactionHook {

/**
* before tx begin
*/
void beforeBegin();

/**
* after tx begin
*/
void afterBegin();

/**
* before tx commit
*/
void beforeCommit();

/**
* after tx commit
*/
void afterCommit();

/**
* before tx rollback
*/
void beforeRollback();

/**
* after tx rollback
*/
void afterRollback();

/**
* after tx all Completed
*/
void afterCompletion();
}

// 这里大概看了一下 , 应该是可以手动配置 Hook 的 , 后面来详细看一下, 案例 :
public void testTransactionCommitHook() throws Throwable {
TransactionHook transactionHook = Mockito.mock(TransactionHook.class);

TransactionHookManager.registerHook(transactionHook);
TransactionalTemplate template = new TransactionalTemplate();
template.execute(transactionalExecutor);
}



// [Pro2] :TransactionHook 的管理 ?
private List<TransactionHook> getCurrentHooks() {
// 通过 TransactionHookManager 对 TransactionHook 进行管理
return TransactionHookManager.getHooks();
}

public final class TransactionHookManager {
// 同样的 , 其内部也是通过一个 ThreadLocal 进行管理
private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>();
}

2.3.2 DefaultGlobalTransaction # begin 处理

继续来看三步中的核心步骤 : tx.begin(txInfo.getTimeOut(), txInfo.getName())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码C52- DefaultGlobalTransaction        
M52_01- begin(int timeout, String name)
- RootContext.getXID() : 获取 currentXid
- transactionManager.begin(null, null, name, timeout) : transactionManager 开始管理
- GlobalStatus.Begin : 修改装填
- RootContext.bind(xid) : 绑定事务 ID

public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
return;
}
assertXIDNull();
// Step 1 : 获取当前事务 ID
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
// Step 2 : 调用
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
}

PS : RootContext 是什么 ?

RootContext 是根上下文 ,它会当当前 XID 进行管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码C- RootContext
F- ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
F- BranchType DEFAULT_BRANCH_TYPE;
M- bind(@Nonnull String xid)
- CONTEXT_HOLDER.put(KEY_XID, xid)
?- 此处的 CONTEXT_HOLDER 为 FastThreadLocalContextCore


C- FastThreadLocalContextCore
private FastThreadLocal<Map<String, Object>> fastThreadLocal = new FastThreadLocal<Map<String, Object>>() {
@Override
protected Map<String, Object> initialValue() {
return new HashMap<>();
}
};

seata_ContextCore_system.png

2.3.3 TransactionManager 详情

seata_transactionManager.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 这里终于使用 TransactionManager 进行相关的管理了
C53- DefaultTransactionManager
M53_01- begin(String applicationId, String transactionServiceGroup, String name, int timeout)
1- 构架一个新的 GlobalBeginRequest
2- setTransactionName + setTimeout
3- 调用 syncCall 开启事务 , 同时用 GlobalBeginResponse 接收


public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// Step 1 : 构建 Request
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
// Step 2 : 发起请求
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
// Step 3 : 获取Response
return response.getXid();
}

GlobalBeginRequest 与 GlobalBeginResponse 详情

image.png

2.3.4 远程调用

远程调用分为2步 :

  • Step 1 : syncCall 发起远程调用主逻辑
  • Step 2 : sendSyncRequest(Object msg) 正式的调用

Step 1 : syncCall 发起远程调用主逻辑

1
2
3
4
5
6
7
8
java复制代码private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
// 通过 TmNettyRemotingClient 发起 Netty 远程调用
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}

Step 2 : sendSyncRequest(Object msg) 正式的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码 public Object sendSyncRequest(Object msg) throws TimeoutException {
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
int timeoutMillis = NettyClientConfig.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

// 批量发送消息 , 将消息放入basketMap
if (NettyClientConfig.isEnableClientBatchSendRequest()) {

// 发送批处理消息是同步请求,需要创建messageFuture并将其放入futures
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);

// 把信息放入 basketMap
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
basket.offer(rpcMessage);
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}

try {
// 消息发送获取
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}

} else {
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}

}

2.4 commitTransaction 提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
}


// 去掉log 后 , 可以看到其中的核心代码就是 transactionManager.commit(xid)
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
return;
}
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
}


@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
// 同样的 , commit 也是发起 syncCall
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}

2.5 execute 处理

具体业务的处理只有一句 rs = business.execute() , 来看一下着其中的所有逻辑 :

从上一篇文章我们知道 , business 是再 Inter 中构建的一个 TransactionalExecutor 匿名对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码 C52- TransactionalExecutor
M52_01- execute
M52_02- getTransactionInfo : 获取 TransactionInfo 对象 -> PS:M52_02_01

// Step 1 : 发起远程调用
C- GlobalTransactionalInterceptor
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
// 核心语句 , 方法代理
return methodInvocation.proceed();
}
});


// Step 2 :AOP 拦截
这里可以看到 , 实际上这里和AOP逻辑是一致的 , 最终通过 CglibAopProxy 中实现了方法的代理

methodInvocation 详情
image.png

总结

这一篇暂时不说 rollback 流程 , 仅仅说了正常的事务处理流程 , 下一篇来说说rollback 已经 Server 端的处理

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java中的拦截器和过滤器有什么区别 (一)什么是过滤器 (

发表于 2021-06-23

本文收录于JavaStarter ,里面有我完整的Java系列文章,学习或面试都可以看看

(一)什么是过滤器

过滤器Filter基于Servlet实现,过滤器的主要应用场景是对字符编码、跨域等问题进行过滤。Servlet的工作原理是拦截配置好的客户端请求,然后对Request和Response进行处理。Filter过滤器随着web应用的启动而启动,只初始化一次。

Filter的使用比较简单,继承Filter 接口,实现对应的init、doFilter以及destroy方法即可。

1、init:在容器启动时调用初始化方法,只会初始化一次

2、doFilter:每次请求都会调用doFilter方法,通过FilterChain 调用后续的方法

3、destroy:当容器销毁时,执行destory方法,只会被调用一次。

下面是详细的代码编写方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class MyFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
System.out.println("初始化拦截器");
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
//做一些处理
chain.doFilter(request,response);
}

@Override
public void destroy() {
System.out.println("销毁拦截器");
}
}

(二)什么是拦截器

拦截器是SpringMVC中实现的一种基于Java反射(动态代理)机制的方法增强工具,拦截器的实现是继承HandlerInterceptor 接口,并实现接口的preHandle、postHandle和afterCompletion方法。

1、preHandle:请求方法前置拦截,该方法会在Controller处理之前进行调用,Spring中可以有多个Interceptor,这些拦截器会按照设定的Order顺序调用,当有一个拦截器在preHandle中返回false的时候,请求就会终止。

2、postHandle:preHandle返回结果为true时,在Controller方法执行之后,视图渲染之前被调用

3、afterCompletion:在preHandle返回ture,并且整个请求结束之后,执行该方法。

具体的代码实现如下,首先编写一个拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码@Component
public class UserInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
System.out.println("preHandle");
String userName=request.getParameter("userName");
String password = request.getParameter("password");
if (userName==null||password==null){
response.setStatus(500);
response.setContentType("text/html; charset=UTF-8");
response.getWriter().print("参数缺失");
return false;
}
//进行用户校验
if (userName.equals("admin")&&password.equals("admin")){
return true;
}else {
response.setStatus(500);
response.setContentType("text/html; charset=UTF-8");
response.getWriter().print("用户名或密码错误");
return false;
}
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
System.out.println("postHandle");
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
System.out.println("afterCompletion");
}
}

编写完拦截器之后,通过一个配置类设置拦截器,并且可以通过addPathPatterns和excludePathPatterns执行哪些请求需要被拦截,哪些不需要被拦截。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Autowired
private UserInterceptor userInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(userInterceptor)
.addPathPatterns("/**")
.excludePathPatterns("/error");
}
}

(三)拦截器与过滤器的区别

相同点:

1、拦截器与过滤器都是体现了AOP的思想,对方法实现增强,都可以拦截请求方法。

2、拦截器和过滤器都可以通过Order注解设定执行顺序

不同点:

1、过滤器属于Servlet级别,拦截器属于Spring级别
Filter是在javax.servlet包中定义的,要依赖于网络容器,因此只能在web项目中使用。

Interceptor是SpringMVC中实现的,归根揭底拦截器是一个Spring组件,由Spring容器进行管理。

2、过滤器和拦截器的执行顺序不同:

下面通过一张图展示Filter和Interceprtor的执行顺序


首先当一个请求进入Servlet之前,过滤器的doFilter方法进行过滤,

进入Servlet容器之后,执行Controller方法之前,拦截器的preHandle方法进行拦截,

执行Controller方法之后,视图渲染之前,拦截器的postHandle方法进行拦截,

请求结束之后,执行拦截器的postHandle方法。

3、过滤器基于函数回调方式实现,拦截器基于Java反射机制实现

(四)总结

实际开发中,拦截器的应用场景会比过滤器要更多,下面是拦截器和过滤器的主要应用场景

拦截器的应用场景:权限控制,日志打印,参数校验

过滤器的应用场景:跨域问题解决,编码转换

我翻了一下历史项目的代码,拦截器在用户权限校验场景下使用会比较多,由于一般是前后端分离项目,过滤器的使用场景就会少很多。

拦截器和过滤器算是比较常用的了,但是还是得注意两者的差距,我是鱼仔,我们下期再见!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 并发之CAS概念以及Atomic**类

发表于 2021-06-23

前言

CAS的应用场景

  • 悲观锁:synchronized关键字与Lock等锁机制都是悲观锁。无论做何种操作,首先都需要先上锁,接下来再去执行后续操作,从而确保了接下来的所有操作都是由当前这个线程来执行的。
  • 乐观锁:线程在操作之前不会做任何预先的处理,而是直接去执行;当在最后执行变量更新的时候,当前线程需要有一种机制来确保当前被操作的变量是没有被其他线程修改的;CAS是乐观锁的一种极为重要的实现方式。

CAS ( Compare And Swap )

  • 比较与交换:这是一个不断循环的过程,一直到变量值被修改成功为止。CAS本身是由硬件指令来提供支持的,换句话说,硬件中是通过一个原子指令来实现比较与交换的;因此,CAS可以确保变量操作的原子性。
  • 例如,有一个变a=1,此时,有一个线程读取到变量a=1,它会记住这个变量,当这个线程想要进行更新变量a=a+1操作时,先查看变量a是不是它以前记得的a=1,如果是就执行a=a+1;如果不是,就记住这个变量的最新值,不进行更新操作(a=a+1),在下一个循环进行更新前再进行比较,是不是前面记录的最新值,是就更新,不是就不能更新,直到更新成功为止。(在多线程情况下,变量有可能被其他线程更改,所有需要CAS)
  • 记住,CAS是原子操作,由硬件实现。 不可能在能进行更新时的瞬间,变量被其他线程更改的情况。

先从一个样例说起

1
2
3
4
5
6
7
8
9
10
java复制代码public class CAST {
private int count;

public int getCount() {
return count;
}
public void inCrease(){
++this.count;
}
}

image.png

  • 可以看到 ++this.count;一行代码有4个字节码指令,就不能保证它的原子性。
  • 读取->修改->写入 这三个操作并非原子操作。
  • 可以使用synchronized,加锁,来让其操作具有原子性,但这是悲观锁,有点耗性能。

Java并发包中有针对对单个变量操作具有原子性的类—Atomic**

  • 前面说过CAS在CPU是一调指令,但是Java代码却不是。Java为使用CPU的特性,比如CPU的缓存一致性协议,Java引入了volatile,来触发CPU缓存一致性,实现可见性,禁止指令重排等。
  • 为了使用CPU的CAS指令Java也进行了类似的操作,即并发包中的atomic包。

image.png

以AtomicInteger为例

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class AtomicTest {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(5);//初始值为5
System.out.println(atomicInteger.get());//返回值
System.out.println(atomicInteger.getAndSet(8));//返回旧的值5,设置新的值8
System.out.println(atomicInteger.get());//返回值
System.out.println(atomicInteger.getAndIncrement());//返回旧的值8,并自增1
System.out.println(atomicInteger.get());//返回值
/*
上面的方法都是原子操作
*/
}
}

image.png

大致了解AtomicInteger的原理

对于CAS来说,其操作数主要涉及到如下三个:

1. 需要被操作的内存值V

2. 需要进行比较的值A

3. 需要进行写入的值B

只有当V==A的时候,CAS才会通过原子操作的手段来将V的值更新为B。

image.png

  • 看看里面的getAndSet()方法

image.png

image.png

Atomic**缺点

  • 关于CAS的限制或是问题:
  1. 循环开销问题:并发量大的情况下会导致线程一直自旋(循环)
  2. 只能保证一个变量的原子操作,即不可以a=b+1,但是可以通过AtomicReference类来实现对多个变量的原子操作
  3. ABA问题:1-> 3 ->1。即在更新前,值被另外一个线程从旧值(1)变成新值(3)再变成旧值(1),当前线程没有发现值在中途有变化过,在更新前进行比较发现值没变(还是1),遂进行更新。
    • 虽然该问题对结果没什么影响,但在语义上是不正确的
    • 可以通过给变量加(绑定)版本号(version),即变量更新一次,版本号+1。或者
    • 通过时间戳(Timestamp),即变量更新时将更新的时间与其绑定,

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Traefik--云原生下的Nginx替代品

发表于 2021-06-23

不可否认,Nginx作为老牌的负载软件经久不衰,依然是绝大多数情况下的不二选择,但是在云原生时代,Nginx却显得力有不逮。

由于微服务架构以及Docker技术和K8s编排工具最近几年才开始逐渐流行,所以一开始的反向代理服务器比如Nginx、Apache等并未提供其支持。所以才会出现Ingress Controller这种东西来做k8s和Nginx之间的衔接。而Traefik天生就提供了Docker、k8s的支持,也就是说traefik本身就能跟k8s api交互感知后端变化,因此在使用traefik时,Ingress controller和nginx这类工具就失去了存在的意义。

在目前我们使用Docker-Compose编排所有微服务的阶段,Traefik能够依赖自身的特性,自动发现服务的变更,动态调整自身的负载均衡配置。

一、示例

下面我们通过一个示例,来展现traefik在容器化时代的过人之处。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bash复制代码version: "3.8"

services:

traefik:
image: traefik:v2.4.8
ports:
- 18181:80
- 18182:8080
volumes:
- "/var/run/docker.sock:/var/run/docker.sock" # trafik通过docker.sock监控后端变化
command:
- "--api.insecure=true" # 开启不安全接口方便本地调试
- "--api.dashboard=true" # 开启Dashboard便于调式,生产环境建议关闭
- "--accesslog=true" # 开启日志
- "--providers.docker=true" # 后端使用Docker作为工作模式
- "--entrypoints.testapi.address=:80" # 新建一个名为 testapi 的入口监听80端口


whoami:
image: containous/whoami
labels:
- "traefik.http.routers.app.rule=Host(`192.168.1.178`)" # 用户使用 localhost 访问时,此服务进行响应
- "traefik.http.routers.app.entrypoints=testapi" # 入口为 testapi

docker-compoes启动 docker-compose up --scale whoami=3后,使用 curl 进行访问 curl http://192.168.1.178:18181/, 得到的响应如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
makefile复制代码Hostname: 21274e83b339
IP: 127.0.0.1
IP: 172.30.0.2
RemoteAddr: 172.30.0.3:56066
GET / HTTP/1.1
Host: 192.168.1.178:18181
User-Agent: curl/7.64.1
Accept: */*
Accept-Encoding: gzip
X-Forwarded-For: 10.100.1.7
X-Forwarded-Host: 192.168.1.178:18181
X-Forwarded-Port: 18181
X-Forwarded-Proto: http
X-Forwarded-Server: 89a1ed50d8e4
X-Real-Ip: 10.100.1.7

多次访问,每次响应的Hostname都是不同的,说明traefik实现了请求在多个whoami容器之间进行负载。

如果使用多级路径进行路由判断可以添加如下配置

1
bash复制代码- "traefik.http.routers.app.rule=Path(`/test`)"  # 指定/test路径才响应

此时我们需要使用 http://192.168.1.178:18181/test 才能获得正确响应。

二、跨 docker-compose 负载演示

上面的内容都是基础性的配置演示,下面简单介绍一下在docker-compose中更有意思的配置。

我们在另一个 docker-compose 中启动一个新的 whoami 服务

1
2
3
4
5
6
7
8
bash复制代码version: "3.8"
services:
whoami2:
image: containous/whoami
labels:
- "traefik.http.routers.1.rule=Host(`192.168.1.178`)" # 用户使用 localhost 访问时,此服务进行响应
- "traefik.http.routers.1.rule=Path(`/test2`)"
- "traefik.http.routers.1.entrypoints=testapi" # 入口为 testapi

当我们使用 http://192.168.1.178:18181/test 访问时响应的是第一个docker-compse中的服务,使用http://192.168.1.178:18181/test2访问时,响应的则是第二个docker-compose中的服务。

三、负载均衡演示

那么如果想通过 /test 路径实现跨docker-compose中的两个服务之间的负载呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
bash复制代码version: "3.8"

services:

traefik:
image: traefik:v2.4.8
ports:
- 18181:80
- 18182:8080
volumes:
- "/var/run/docker.sock:/var/run/docker.sock" # trafik通过docker.sock监控后端变化
command:
- "--api.insecure=true" # 开启不安全接口方便本地调试
- "--api.dashboard=true" # 开启Dashboard便于调式,生产环境建议关闭
- "--accesslog=true" # 开启日志
- "--providers.docker=true" # 后端使用Docker作为工作模式
- "--entrypoints.testapi.address=:80" # 新建一个名为 testapi 的入口监听80端口


whoami:
image: containous/whoami
labels:
- "traefik.http.routers.app.entrypoints=testapi" # 入口为 testapi
- "traefik.http.routers.app.rule=Host(`192.168.1.178`)" # 用户使用 localhost 访问时,此服务进行响应
- "traefik.http.routers.app.rule=Path(`/test`)"
- "traefik.http.services.whoami.loadbalancer.server.scheme=http"
- "traefik.http.services.whoami.loadbalancer.server.port=80"


whoami_2:
image: containous/whoami
labels:
- "traefik.http.services.whoami.loadbalancer.server.scheme=http"
- "traefik.http.services.whoami.loadbalancer.server.port=80"

此时使用curl http://192.168.1.178:18181/test 会有2个容器分别进行负载响应,从此以后,容器内的服务变更都不在需要更新nginx配置文件了

五、可观测性

在Traefik-2.x的生态里,将可观测性分成了如下几部分

  • 服务日志: Traefik 进程本身相关的操作日志
  • 访问日志: 由Traefik接管的代理服务的访问日志
  • Metrics: Traefik 提供的自身详细的metrics数据
  • Tracing: Traefik也提供了链路追踪相关接口,可用来可视化分布式或微服务中的调用情况

我们主要说明Metrics中的监控能力,首先开启Promethus数据

1
2
ini复制代码- "--metrics.prometheus=true"
- "--metrics.prometheus.buckets=0.100000, 0.300000, 1.200000, 5.000000"

配置prometheus的监控任务

1
2
3
4
yaml复制代码  # 监控面板: 11462
- job_name: "Traefik"
static_configs:
- targets: ["192.168.1.178:18182"]

可观测到如下面板

image.png

六、其它问题

如何解决CORS跨域问题: doc.traefik.io/traefik/mid…
如何使用HTTPS: doc.traefik.io/traefik/htt…

更多配置、更多用法请自行查询相关文档!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

广州某小公司:ThreadLocal面试

发表于 2021-06-23

《对线面试官》系列目前已经连载24篇啦!有深度风趣的系列!

  • 【对线面试官】Java注解
  • 【对线面试官】Java泛型
  • 【对线面试官】 Java NIO
  • 【对线面试官】Java反射 && 动态代理
  • 【对线面试官】多线程基础
  • 【对线面试官】 CAS
  • 【对线面试官】synchronized
  • 【对线面试官】AQS&&ReentrantLock
  • 【对线面试官】线程池
  • 【对线面试官】ThreadLocal
  • 【对线面试官】CountDownLatch和CyclicBarrier
  • 【对线面试官】为什么需要Java内存模型?
  • 【对线面试官】List
  • 【对线面试官】Map
  • 【对线面试官】SpringMVC
  • 【对线面试官】Spring基础
  • 【对线面试官】SpringBean生命周期
  • 【对线面试官】Redis基础
  • 【对线面试官】Redis持久化
  • 【对线面试官】Kafka基础
  • 【对线面试官】使用Kafka会考虑什么问题?
  • 【对线面试官】MySQL索引
  • 【对线面试官】MySQL 事务&&锁机制&&MVCC
  • 【对线面试官】MySQL调优

文章以纯面试的角度去讲解,所以有很多的细节是未铺垫的。

鉴于很多同学反馈没看懂【对线面试官】系列,基础相关的知识我确实写过文章讲解过啦,但有的同学就是不爱去翻。

为了让大家有更好的体验,我把基础文章也找出来(重要的知识点我还整理过电子书,比如说像多线程、集合这种面试必考的早就已经转成PDF格式啦)

我把这些上传到网盘,你们有需要直接下载就好了。做到这份上了,不会还想白嫖吧?点赞和转发又不用钱。

链接:pan.baidu.com/s/1pQTuKBYs… 密码:3wom

欢迎关注我的微信公众号【Java3y】来聊聊Java面试

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…634635636…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%