开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Linux 完全公平调度算法

发表于 2021-06-25

Linux 进程调度算法经历了以下几个版本的发展:

  • 基于时间片轮询调度算法。(2.6之前的版本)
  • O(1) 调度算法。(2.6.23之前的版本)
  • 完全公平调度算法。(2.6.23以及之后的版本)

之前我写过一篇分析 O(1)调度算法 的文章:O(1)调度算法,而这篇主要分析 Linux 现在所使用的 完全公平调度算法。

分析 完全公平调度算法 前,我们先了解下 完全公平调度算法 的基本原理。

完全公平调度算法基本原理

完全公平调度算法 体现在对待每个进程都是公平的,那么怎么才能做到完全公平呢?有一个比较简单的方法就是:让每个进程都运行一段相同的时间片,这就是 基于时间片轮询调度算法。如下图所示:

图片

如上图所示,开始时进程1获得 time0 ~ time1 的CPU运行时间。当进程1的时间片使用完后,进程2获得 time1 ~ time2 的CPU运行时间。而当进程2的时间片使用完后,进程3获得 time2 ~ time3 的CPU运行时间。

如此类推,由于每个时间片都是相等的,所以理论上每个进程都能获得相同的CPU运行时间。这个算法看起来很不错,但存在两个问题:

  • 不能按权重分配不同的运行时间,例如有些进程权重大的应该获得更多的运行时间。
  • 每次调度时都需要遍历运行队列中的所有进程,找到优先级最大的进程运行,时间复杂度为 O(n)。对于一个高性能的操作系统来说,这是不能接受的。

为了解决上面两个问题,Linux内核的开发者创造了 完全公平调度算法。

完全公平调度的两个时间

为了实现 完全公平调度算法,为进程定义两个时间:

  1. 实际运行时间:

实际运行时间 = 调度周期 * 进程权重 / 所有进程权重之和

  • 调度周期:是指所有可进程运行一遍所需要的时间。
  • 进程权重:依据进程的重要性,分配给每个进程不同的权重。

例如,调度周期为 30ms,进程A的权重为 1,而进程B的权重为 2。那么:

进程A的实际运行时间为:30ms * 1 / (1 + 2) = 10ms

进程B的实际运行时间为:30ms * 2 / (1 + 2) = 20ms

  1. 虚拟运行时间:

虚拟运行时间 = 实际运行时间 * 1024 / 进程权重 = (调度周期 * 进程权重 / 所有进程权重之和) * 1024 / 进程权重 = 调度周期 * 1024 / 所有进程总权重

从上面的公式可以看出,在一个调度周期里,所有进程的 虚拟运行时间 是相同的。所以在进程调度时,只需要找到 虚拟运行时间 最小的进程调度运行即可。

为了能够快速找到 虚拟运行时间 最小的进程,Linux 内核使用 红黑树 来保存可运行的进程,而比较的键值就是进程的 虚拟运行时间。

如果不了解 红黑树 的话,可以把它看成一个自动排序的容器即可。如下图所示:

图片

如上图所示,红黑树 的左节点比父节点小,而右节点比父节点大。所以查找最小节点时,只需要获取 红黑树 的最左节点即可,时间复杂度为 O(logN)。

完全公平调度的两个对象

Linux 内核为了实现 完全公平调度算法,定义两个对象:cfs_rq (可运行进程队列) 和 sched_entity (调度实体)。

  • cfs_rq (可运行进程队列):使用 红黑树 结构来保存内核中可以运行的进程。
  • sched_entity (调度实体):可被内核调度的实体,如果忽略组调度(本文也不涉及组调度),可以把它当成是进程。

cfs_rq 对象定义

1
2
3
4
5
6
7
8
9
10
arduino复制代码struct cfs_rq {
struct load_weight load;
unsigned long nr_running; // 运行队列中的进程数
u64 exec_clock; // 当前时钟
u64 min_vruntime; // 用于修证虚拟运行时间

struct rb_root tasks_timeline; // 红黑树的根节点
struct rb_node *rb_leftmost; // 缓存红黑树最左端节点, 用于快速获取最小节点
...
};

对于 cfs_rq 对象,现在主要关注的是 tasks_timeline 成员,其保存了可运行进程队列的根节点(红黑树的根节点)。

sched_entity 对象定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rust复制代码struct sched_entity {
struct load_weight load;
  struct rb_node    run_node;         // 用于连接到运行队列的红黑树中
struct list_head group_node;
  unsigned int    on_rq;              // 是否已经在运行队列中

u64 exec_start; // 开始统计运行时间的时间点
u64 sum_exec_runtime; // 总共运行的实际时间
u64 vruntime; // 虚拟运行时间(用于红黑树的键值对比)
u64 prev_sum_exec_runtime; // 总共运行的虚拟运行时间

u64 last_wakeup;
u64 avg_overlap;
...
};

对于 sched_entity 对象,现在主要关注的是 run_node 成员,其主要用于把调度实体连接到可运行进程的队列中。

另外,进程描述符 task_struct 对象中,定义了一个类型为 sched_entity 的成员变量 se,如下:

1
2
3
4
5
arduino复制代码struct task_struct {
...
struct sched_entity se;
...
}

所以,他们之间的关系如下图:

图片

从上图可以看出,所有 sched_entity 对象通过其 run_node 成员组成了一颗红黑树,这颗红黑树的根结点保存在 cfs_rq 对象的 task_timeline 成员中。

另外,进程描述符 task_sturct 定义了一个类型为 sched_entity 的成员变量 se,所以通过进程描述符的 se 字段就可以把进程保存到可运行队列中。

完全公平调度算法实现

有了上面的基础,现在可以开始分析 Linux 内核中怎么实现 完全公平调度算法 了。

我们先来看看怎么更新一个进程的虚拟运行时间。

1. 更新进程虚拟运行时间

更新一个进程的虚拟运行时间是通过 __update_curr() 函数完成的,其代码如下:

/src/kernel/sched_fair.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
arduino复制代码static inline void
__update_curr(struct cfs_rq *cfs_rq, struct sched_entity *curr,
unsigned long delta_exec)
{
unsigned long delta_exec_weighted;

curr->sum_exec_runtime += delta_exec; // 增加进程总实际运行的时间
delta_exec_weighted = delta_exec; // 初始化进程使用的虚拟运行时间

// 根据进程的权重计算其使用的虚拟运行时间
if (unlikely(curr->load.weight != NICE_0_LOAD)) {
delta_exec_weighted = calc_delta_fair(delta_exec_weighted, &curr->load);
}

curr->vruntime += delta_exec_weighted; // 更新进程的虚拟运行时间
}

__update_curr() 函数各个参数意义如下:

  • cfs_rq:可运行队列对象。
  • curr:当前进程调度实体。
  • delta_exec:实际运行的时间。

__update_curr() 函数主要完成以下几个工作:

  1. 更新进程调度实体的总实际运行时间。
  2. 根据进程调度实体的权重值,计算其使用的虚拟运行时间。
  3. 把计算虚拟运行时间的结果添加到进程调度实体的 vruntime 字段。

我们接着分析怎么把进程添加到运行队列中。

2. 把进程调度实体添加到运行队列中

要将进程调度实体添加到运行队列中,可以调用 __enqueue_entity() 函数,其实现如下:

/src/kernel/sched_fair.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ini复制代码static void __enqueue_entity(struct cfs_rq *cfs_rq, struct sched_entity *se)
{
struct rb_node **link = &cfs_rq->tasks_timeline.rb_node; // 红黑树根节点
struct rb_node *parent = NULL;
struct sched_entity *entry;
s64 key = entity_key(cfs_rq, se); // 当前进程调度实体的虚拟运行时间
int leftmost = 1;

while (*link) { // 把当前调度实体插入到运行队列的红黑树中
parent = *link;
entry = rb_entry(parent, struct sched_entity, run_node);

if (key < entity_key(cfs_rq, entry)) { // 比较虚拟运行时间
link = &parent->rb_left;
} else {
link = &parent->rb_right;
leftmost = 0;
}
}

if (leftmost) {
cfs_rq->rb_leftmost = &se->run_node; // 缓存红黑树最左节点
cfs_rq->min_vruntime = max_vruntime(cfs_rq->min_vruntime, se->vruntime);
}

// 这里是红黑树的平衡过程(参考红黑树数据结构的实现)
rb_link_node(&se->run_node, parent, link);
rb_insert_color(&se->run_node, &cfs_rq->tasks_timeline);
}

__enqueue_entity() 函数的主要工作如下:

  1. 获取运行队列红黑树的根节点。
  2. 获取当前进程调度实体的虚拟运行时间。
  3. 把当前进程调度实体添加到红黑树中(可参考红黑树的插入算法)。
  4. 缓存红黑树最左端节点。
  5. 对红黑树进行平衡操作(可参考红黑树的平衡算法)。

调用 __enqueue_entity() 函数后,就可以把进程调度实体插入到运行队列的红黑树中。同时会把红黑树最左端的节点缓存到运行队列的 rb_leftmost 字段中,用于快速获取下一个可运行的进程。

3. 从可运行队列中获取下一个可运行的进程

要获取运行队列中下一个可运行的进程可以通过调用 __pick_next_entity() 函数,其实现如下:

/src/kernel/sched_fair.c

1
2
3
4
5
6
7
8
9
10
11
c复制代码static struct sched_entity *__pick_next_entity(struct cfs_rq *cfs_rq)
{
// 1. 先调用 first_fair() 获取红黑树最左端节点
// 2. 调用 rb_entry() 返回节点对应的调度实体
return rb_entry(first_fair(cfs_rq), struct sched_entity, run_node);
}

static inline struct rb_node *first_fair(struct cfs_rq *cfs_rq)
{
return cfs_rq->rb_leftmost; // 获取红黑树最左端节点
}

前面介绍过,红黑树的最左端节点就是虚拟运行时间最少的进程,所以 __pick_next_entity() 函数的流程就非常简单了,如下:

  • 首先调用 first_fair() 获取红黑树最左端节点。
  • 然后再调用 rb_entry() 返回节点对应的调度实体。

调度时机

前面介绍了 完全公平调度算法 怎么向可运行队列添加调度的进程和怎么从可运行队列中获取下一个可运行的进程,那么 Linux 内核在什么时候会进行进程调度呢?

答案是由 Linux 内核的时钟中断触发的。

时钟中断 是什么?如果没了解过操作系统原理的可能不了解这个东西,但是没关系,不了解可以把他当成是定时器就好了,就是每隔固定一段时间会调用一个回调函数来处理这个事件。

时钟中断 犹如 Linux 的心脏一样,每隔一定的时间就会触发调用 scheduler_tick() 函数,其实现如下:

1
2
3
4
5
6
javascript复制代码void scheduler_tick(void)
{
...
curr->sched_class->task_tick(rq, curr, 0); // 这里会调用 task_tick_fair() 函数
...
}

scheduler_tick() 函数会调用 task_tick_fair() 函数处理调度相关的工作,而 task_tick_fair() 主要通过调用 entity_tick() 来完成调度工作的,调用链如下:

scheduler_tick() -> task_tick_fair() -> entity_tick()

entity_tick() 函数实现如下:

1
2
3
4
5
6
7
8
9
scss复制代码static void
entity_tick(struct cfs_rq *cfs_rq, struct sched_entity *curr, int queued)
{
// 更新当前进程的虚拟运行时间
update_curr(cfs_rq);
...
if (cfs_rq->nr_running > 1 || !sched_feat(WAKEUP_PREEMPT))
check_preempt_tick(cfs_rq, curr); // 判断是否需要进行进程调度
}

entity_tick() 函数主要完成以下工作:

  • 调用 update_curr() 函数更新进程的虚拟运行时间,这个前面已经介绍过。
  • 调用 check_preempt_tick() 函数判断是否需要进行进程调度。

我们接着分析 check_preempt_tick() 函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码static void
check_preempt_tick(struct cfs_rq *cfs_rq, struct sched_entity *curr)
{
unsigned long ideal_runtime, delta_exec;

// 计算当前进程可用的时间片
ideal_runtime = sched_slice(cfs_rq, curr);

// 进程运行的实际时间
delta_exec = curr->sum_exec_runtime - curr->prev_sum_exec_runtime;

// 如果进程运行的实际时间大于其可用时间片, 那么进行调度
if (delta_exec > ideal_runtime)
resched_task(rq_of(cfs_rq)->curr);
}

check_preempt_tick() 函数主要完成以下工作:

  • 通过调用 sched_slice() 计算当前进程可用的时间片。
  • 获取当前进程在当前调度周期实际已运行的时间。
  • 如果进程实际运行的时间大于其可用时间片, 那么调用 resched_task() 函数进行进程调度。

可以看出,在 时钟中断 的处理中,有可能会进行进程调度。除了 时钟中断 外,一些主动让出 CPU 的操作也会进行进程调度(如一些 I/O 操作),这里就不详细分析了,有兴趣可以自己研究。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

来掘金三个月 , 我被榨干了|2021 年中总结

发表于 2021-06-25

这是我参与更文挑战的第16天,活动详情查看: 更文挑战

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

一 . 前言

**哈哈哈哈 , 当一次标题党 , 其实我早就是一条沸点老咸鱼了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151


其实,很早之前我就已经来掘金了, 时候的后端其实文章不多, 当时我的主要知识来源还是 CSDN **(PS:感觉博客园界面不好看😜)** , 在查资料的过程中无意中点开了简书 , **然后由简书指引到了掘金(百度上是真的搜不到 , 直到现在还是一言难尽)**


无意中发现了简书和掘金后 , 第一感觉就是文章质量比较高 , 看了一段时间文章后 , 在 2018 年正式的注册了账号🏷️🏷️🏷️



> 你以为我来掘金是看文章的 ?


**大错特错 ,** 吸引我来掘金是因为文章 , 但**是能留住我的最大原因就是沸点 !!!!**


工作之余刷刷沸点摸摸鱼 , 偶尔再来看下文章 , 不香吗 ?


看了太多了文章后 , 还是决定贡献自己的一份力量 ,**于是从三个月之前 , 我正式在掘金发文 >**


二 . 过往历程
--------


### 2.1 这2年 , 我干了什么


在2年前 , 我逐渐减少了 CSDN 的文档输出 , 主要原因之前也聊过 , **主要是由于文章质量一直处在较低的水平** , 所以想要沉淀一下 , 安安心心的学技术 , 看文章 , 涨见识 👉 **于是 , 两年前 , 我干了这些事 :**



> **实现了一个自己的博客网站**


**曾经我也有很多程序员的向往 :**


* 我想要一个自己的服务器 , 自己在上面捣鼓
* 想要一个自己的博客 , 做一个安静的技术分享者


所以我就做了 , 虽然不算很成功 , 但是也不算失败 , 我完成了自己的既定目标 **(尽管上面什么都没有)**


**欢迎看一眼我博客的遗容 ->** [www.antblack.xyz/](http://www.antblack.xyz/)


**PS : 由于有段时间没维护 ,CDN 过期了 , 现在已经不剩啥了**



> **写了一个超级大的 Demo 项目**


这归功于我一贯的 **先写demo , 后搭框架** 的良好习惯 , 每次搭相关的框架前都会自己写一个 demo


其中有的是自己摸索的 , 有的是参考前辈的 , 有的是官网资料


但是不吹不黑 , **比项目能比我多的 , 全世界应该不超过1000个**


不信你瞅瞅 👉 👉 [github.com/black-ant/c…](https://github.com/black-ant/case)



> **写了半个身份认证项目**


这是一个身份认证平台 , 我的主业就是身份认证领域的 , 在使用中产生了一些新的思考 ,于是用其他的框架实现了核心功能


这个项目的核心就是无限扩展 , 随意定制.


基本上验证了自己的想法后, 也就慢慢停止迭代了 , 毕竟时间有限


👉 👉 [github.com/black-ant/a…](https://github.com/black-ant/antsso)



> **写了半个流程引擎**


这是一个流程引擎, 目的主要是数据传输与数据清洗等传输操作 , 即是对工作内容的变种实现 , 也是对自己技能的熟悉


这个主业务写好后 , 也停止了 👉 👉[github.com/black-ant/e…](https://github.com/black-ant/expengine)



> **看了无数源码 , 写了无数文档 (当然是本地呀)**


这2年 , 秉承着厚积薄发的心态 ,我的文档基本上是没有发出来的 , 一直在我的本地乖乖的躺着 , **没事 就是秀一下**


![image.png](https://gitee.com/songjianzaina/juejin_p2/raw/master/img/344458fb881facbcda60dab0df15092d23464b02d90d5d5a9783a63aa754cf0f)



> 总结


**这 2年 , 很多事没做 ,但是也做了很多事 , 总体来说 , 是我提升最快的2年 , 不算太满意**


### 2.2 2021 上半年 ,我干了什么


![image.png](https://gitee.com/songjianzaina/juejin_p2/raw/master/img/6fed3d2c376620f525a62cc1e6a102ccc4529778502052f8f367215962a9d2e7)


去年12月 , 我把 **2020年年度计划**

👉 正式改为了 **2021年年度计划**

👉 并且继承了 **去年的对自己期盼的鼓励**

👉 继续完成 **前年定下的任务**


现如今的进度 , 就和上图一样 , 可以说 , 除了文档越来越说 , 其他的简直毫无进展呀😷😷


(PS : 假的假的 , 目标还是完成了不少 , 也留下了不少 ,今年肯定也会留下不少)


### 2.3 这3个月 , 我干了什么


大概半年前 , 我试着在自己的 Blog 发文 , 但是我放弃了


**你以为我是因为没人访问吗 ?** (当然是 )

**其实我是舍不得扩容我的带宽** (也是)

**还有我的服务器内存 , 我的 CPU ,我的 数据库 , 我的 CDN , 都买不起了**


👉 总得来说 , 养不起 , 也没空维护


于是 , 我开始将2年来积攒的文档开始整理输出出来 , 厚积薄发 , **我开始高产赛母猪** .


到了今天 , 陆陆续续发了 60 多篇 , 之前源码整理的东西也差不多了 , 后面当然也有一些 , **但是差不多也被榨干了**

要不是为了收藏纪念品 , 我才不写这么多呢 小编看这👉(6月要是没我 , 封笔封笔 !!!!!!!)

image.png

2.4 下半年 , 我想要做什么

第一计划肯定是把 2021 年度计划完成啦 ,为了让自己对进展更清楚 , 写文章的时候顺便做了一个简单的报表 ,来梳理以后的计划

image.png

附录 : 关于写文档的三次思考

每过一段时间 , 我都会对文档的质量做一次思考 , 即上次二次总结后 , 又想了很多很多

因为最近写文章的时候 ,发现了太多太多优秀的纯技术文章 , 于是我产生了迷茫 >>

  • 这篇文档怎么写的这么好
  • 他的时序图画的太清楚了
  • 他怎么对细节了解的这么清楚
  • 他的排版好清楚

我发现 ,我怎么写 , 也无法超越他们的文档 ,那么 , 我还应该继续吗???

那我还不如不写 , 把这时间花在细读文档上面不是更好吗???

思考了很长时间 , 我还是继续去写文档 , 我想清楚了这样几件事 :

  • 文章的目的是总结 , 服务的第一对象是自己 , 把别人的文档背的再熟 ,也没有自己写一篇深刻
  • 我比一定要比别人写的优秀 , 而只需要达到自己的目标
  • 无需排斥别人的文章 , 要带着感恩的心态去学习别人的知识 , 并且自我完善
  • 可以积极的推广别人的文章 , 文学的交流是分享

技术本来就是一条康庄大道 , 相同的文章从来就不是对手 , 我们始终可以从其他的技术文档中获得更多的动力

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java多线程实战|CyclicBarrier原理介绍及使用

发表于 2021-06-25

这是我参与更文挑战的第25天,活动详情查看: 更文挑战


前言:

上一期讲了CountDownLatch的使用方法感兴趣的可以回看一下 传送门:CountDownLatch原理介绍及使用场景,今天我们讲一下java.util.concurrent工具类里的另一个工具CyclicBarrier正如其名,“循环栅栏”,是Java提供的一种特定场景下的多线程之间进行交互的使用方法;

图片.png

CyclicBarrier:

举个例子,比如小明,小美,小华,小丽几人终于历经多年课本出题历程,高考结束,相约一起聚餐,然而他们每个人到达约会地点的耗时都一样,有的人会早到,有的人会晚到,但是他们要都到了以后才可以决定点那些菜。

这个时候我们就可以使用JUC包中为我们提供了一个同步工具类来模拟这类场景,CyclicBarrier,利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。这里没人人相当于一个线程,而餐厅就是 CyclicBarrier。

介绍:CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 和 CountDownLatch 很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

网上找了一个很形象的动图如下:

src=http _img2.sycdn.imooc.com_5b890c850001309203140008.jpg&refer=http _img2.sycdn.imooc.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=jpeg.gif

CyclicBarrier它有两个构造函数:

1
2
3
4
5
6
7
8
9
10
csharp复制代码public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
  • parties 是参与线程的个数,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
  • 第二个构造方法有一个 Runnable 参数,这个参数的意思是,线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

await()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码//非定时等待
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}

//定时等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
  • 线程调用 await() 表示自己已经到达栅栏
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
    可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法
dowait:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
scss复制代码private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//检查当前栅栏是否被打翻
if (g.broken) {
throw new BrokenBarrierException();
}
//检查当前线程是否被中断
if (Thread.interrupted()) {
//如果当前线程被中断会做以下三件事
//1.打翻当前栅栏
//2.唤醒拦截的所有线程
//3.抛出中断异常
breakBarrier();
throw new InterruptedException();
}
//每次都将计数器的值减1
int index = --count;
//计数器的值减为0则需唤醒所有线程并转换到下一代
if (index == 0) {
boolean ranAction = false;
try {
//唤醒所有线程前先执行指定的任务
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
//唤醒所有线程并转到下一代
nextGeneration();
return 0;
} finally {
//确保在任务未成功执行时能将所有线程唤醒
if (!ranAction) {
breakBarrier();
}
}
}

//如果计数器不为0则执行此循环
for (;;) {
try {
//根据传入的参数来决定是定时等待还是非定时等待
if (!timed) {
trip.await();
}else if (nanos > 0L) {
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
//若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
Thread.currentThread().interrupt();
}
}
//如果线程因为打翻栅栏操作而被唤醒则抛出异常
if (g.broken) {
throw new BrokenBarrierException();
}
//如果线程因为换代操作而被唤醒则返回计数器的值
if (g != generation) {
return index;
}
//如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
typescript复制代码/**
* TODO
*
* @author taoze
* @version 1.0
* @date 6/24/21 3:16 PM
*/
public class CyclicBarrierTest {

public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("全部到达"+Thread.currentThread().getName()+"呼叫服务员开始点餐!");
service.shutdown();

}
});
for (int j = 0; j < 5; j++) {
service.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "同学到达");
barrier.await();
System.out.println(Thread.currentThread().getName()+"同学点餐");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

});
}
service.shutdown();
}
}

执行结果:

图片.png

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
  • CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

ok!今天的文章就到这了,以上就是CyclicBarrier的简单使用,希望可以对大家有帮助,有不对的地方希望大家可以提出来的,共同成长;

整洁成就卓越代码,细节之中只有天地

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【快速开发】Lombok 快速入门

发表于 2021-06-24

这是我参与更文挑战的第17天,活动详情查看:更文挑战

欢迎阅读:
新晋奶爸,在技术路上不断前进 |2021 年中总结

一、 Lombok 介绍

  Lombok能以简单的注解形式来简化java代码,提高开发人员的开发效率。例如开发中经常需要写的javabean,都需要花时间去添加相应的getter/setter,也许还要去写构造器、equals等方法,而且需要维护,当属性多时会出现大量的getter/setter方法,这些显得很冗长也没有太多技术含量,一旦修改属性,就容易出现忘记修改对应方法的失误。

  Lombok能通过注解的方式,在编译时自动为属性生成构造器、getter/setter、equals、hashcode、toString方法。出现的神奇就是在源码中没有getter和setter方法,但是在编译生成的字节码文件中有getter和setter方法。这样就省去了手动重建这些代码的麻烦,使代码看起来更简洁些。

  Lombok的使用跟引用jar包一样,可以在官网(projectlombok.org/download)下载…

1
2
3
4
5
6
7
js复制代码<!-- lombok 依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>

二、常用注解

  • @Data注解在类上,会为类的所有属性自动生成setter/getter、equals、canEqual、hashCode、toString方法,如为final属性,则不会为该属性生成setter方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
js复制代码import lombok.AccessLevel;
import lombok.Setter;
import lombok.Data;
import lombok.ToString;

@Data
public class DataExample {
private final String name;
@Setter(AccessLevel.PACKAGE) private int age;
private double score;
private String[] tags;

@ToString(includeFieldNames=true)
@Data(staticConstructor="of")
public static class Exercise<T> {
private final String name;
private final T value;
}
}
  • @Getter/@Setter如果觉得@Data太过残暴(因为@Data集合了@ToString、@EqualsAndHashCode、@Getter/@Setter、@RequiredArgsConstructor的所有特性)不够精细,可以使用@Getter/@Setter注解,此注解在属性上,可以为相应的属性自动生成Getter/Setter方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
js复制代码import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;

public class GetterSetterExample {

@Getter @Setter
private int age = 10;


@Setter(AccessLevel.PROTECTED)
private String name;

@Override public String toString() {
return String.format("%s (age: %d)", name, age);
}
}
  • @NonNull该注解用在属性或构造器上,Lombok会生成一个非空的声明,可用于校验参数,能帮助避免空指针。
1
2
3
4
5
6
7
8
9
10
js复制代码import lombok.NonNull;

public class NonNullExample extends Something {
private String name;

public NonNullExample(@NonNull Person person) {
super("Hello");
this.name = person.getName();
}
}
  • @Cleanup该注解能帮助我们自动调用close()方法,很大的简化了代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
js复制代码import lombok.Cleanup;
import java.io.*;

public class CleanupExample {
public static void main(String[] args) throws IOException {

@Cleanup
InputStream in = new FileInputStream(args[0]);

@Cleanup
OutputStream out = new FileOutputStream(args[1]);

byte[] b = new byte[10000];
while (true) {
int r = in.read(b);
if (r == -1) break;
out.write(b, 0, r);
}
}
}
  • @EqualsAndHashCode默认情况下,会使用所有非静态(non-static)和非瞬态(non-transient)属性来生成equals和hasCode,也能通过exclude注解来排除一些属性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
js复制代码import lombok.EqualsAndHashCode;

@EqualsAndHashCode
public class EqualsAndHashCodeExample {
private transient int transientVar = 10;
private String name;
private double score;
@EqualsAndHashCode.Exclude
private Shape shape = new Square(5, 10);
private String[] tags;
@EqualsAndHashCode.Exclude
private int id;

public String getName() {
return this.name;
}

@EqualsAndHashCode(callSuper=true)
public static class Square extends Shape {
private final int width, height;

public Square(int width, int height) {
this.width = width;
this.height = height;
}
}
}
  • @ToString类使用@ToString注解,Lombok会生成一个toString()方法,默认情况下,会输出类名、所有属性(会按照属性定义顺序),用逗号来分割。通过将includeFieldNames参数设为true,就能明确的输出toString()属性。这一点是不是有点绕口,通过代码来看会更清晰些。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
js复制代码import lombok.ToString;

@ToString
public class ToStringExample {
private static final int STATIC_VAR = 10;
private String name;
private Shape shape = new Square(5, 10);
private String[] tags;
@ToString.Exclude private int id;

public String getName() {
return this.name;
}

@ToString(callSuper=true, includeFieldNames=true)
public static class Square extends Shape {
private final int width, height;

public Square(int width, int height) {
this.width = width;
this.height = height;
}
}
}
  • @NoArgsConstructor, @RequiredArgsConstructor and @AllArgsConstructor无参构造器、部分参数构造器、全参构造器。Lombok没法实现多种参数构造器的重载。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
js复制代码import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.NonNull;

@RequiredArgsConstructor(staticName = "of")
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class ConstructorExample<T> {
private int x, y;
@NonNull private T description;

@NoArgsConstructor
public static class NoArgsExample {
@NonNull private String field;
}
}

三、优缺点

优点:

1、能通过注解的形式自动生成构造器、getter/setter、equals、hashcode、toString等方法,提高了一定的开发效率。

2、让代码变得简洁,不用过多的去关注相应的方法。

3、属性做修改时,也简化了维护为这些属性所生成的getter/setter方法等。

缺点:

1、不支持多种参数构造器的重载。

2、虽然省去了手动创建getter/setter方法的麻烦,但大大降低了源代码的可读性和完整性,降低了阅读源代码的舒适度。

四、官方文档

  阅读更多官方文档信息,请点击Lombok介绍。感谢各位阅读。

推荐专栏:

从零开始搭建个人技术博客

Java全栈架构师

Spring Cloud系列

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

有些写代码的,不过是一群羊!

发表于 2021-06-24

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。

羊都是以群论的。如果你感觉它的单位是只,那只能证明你太穷。

真正牧羊的,从来不会因为晚上烤了只全羊,而感觉到自己的羊少了。不信重温一下李安的《断背山》看看,关注点别搞错了,我们是在谈羊。

当在它们耳朵上钉上红绳子,或者激进点彩绘一下,你就再也无法分辨哪只羊是张三,哪只羊是李四。牧羊犬也不会关心这东西,它只关心它的晚饭。

所以大多数羊没有身份。它只是只羊而已。

软件开发中,有些公司把程序员们当作资源池来利用,那么他们的本质就是羊(没错就是外包)。羊群的特征很有意思,但单体并没有什么存在感。大多数时候,低下头吃草就够了。

来瞧瞧吧。

羊吃草

羊的工作就是吃草。

它不管草有多少,也不管为什么会被发放到某个地方吃草,它吃草甚至是不用经过大脑思考。牧羊人圈定了一片草原,它们就会从左吃到右,然后再从右吃到左,直到把这片草原吃光为止。

羊的嘴很臭,被啃过的草一年内不会发出新芽。

如果你不管它们,草吃光了,有些羊就开始吃草根,草根吃光了就慢慢饿死。它从来不会想着到外面的世界看一看。

羊会反刍,就是把嚼过的东西,重新再弄到嘴里嚼一遍。这种重复的动作,使得羊的智商非常的低,只能进行这些低级的循环 。羊有疝气,它们嘴里的东西其实很难闻但它自己感觉不到,只有目光呆滞的在那里蠕动它的牙齿,直到嚼出草绿色的津液来。

羊惊群

有时候,羊会表现出让人莫名其妙的动作。

一个阳光明媚的下午,所有的羊都在认真吃草。突然,有一只羊悚然之间,竖起了耳朵瞪大了眼睛,仿佛看到了极具危险的事物。

image.png

或许是它的脑子短路,突然想到了一些不该属于它的东西;或许是草里的蚂蚱吓了它一跳。只要有一只羊表现出了这种态度,正在吃草的羊,就会在几秒之内全部停止咀嚼,竖起耳朵瞪大了眼睛。

这个状态会持续十几秒,直到危险解除–更大可能是根本就没有危险。

在群体中,个体的警觉会突然之间就传遍整个集体,但谁也弄不明白危险到底来自何方 。或许只有在这十几秒钟,在停止吃草的时间,羊才会思考为什么自己是只羊,而不是趴在旁边的牧羊犬。

更多时候,羊会低下头继续吃草,所有的羊都会快速忘掉这种集体性的思考。

羊不怕死

在杀猪的时候,猪会一直嚎叫;杀牛的时候,牛会流泪。

羊不一样,它不吭一声,甚至连害怕的神情都没有。

我们都习惯称羊为沉默的羔羊,就是因为它很安静。安静是因为它智商低,而不是因为它勇敢。

我杀过羊。从绑住它的蹄子吊起来,到刀子刺进它的脖子里放血。羊会因为痛挣扎几下,但整个过程出奇的顺利。所以杀羊的人,从来没有什么心理压力,因为这个过程太柔滑了。

有时候刀子刺的不准,羊会饶有兴趣的看着自己的血液躺下来,并伸出舌头感受一下。它可能会被一只蚂蚱吓得竖起双耳,但它并不怎么怕拿刀子的你。

它可能认为自己的主人,并不会拿它开刀。即使你在它面前宰了它的同伴,它也会觉得自己是特殊的。

搞笑的是,羊的温顺都是表现在对外方面,多数羊在内部并不老实。

两只羊,会因为一些谁都闹不清楚的原因,跳起来碰对方的头。头破血流的,掉羊角的比比皆是。那阵势,就是要以命相抵,越斗越来劲。

我越来越觉得,羊对生命并不渴望,对更好的生活也没有追求。 我甚至觉得,它希望死亡。

它从来不像狗一样,挖个坑储藏食物;也不像乌鸦一样,就喜欢收集发光的东西。羊从一生下来,眼光中就透露着生无所恋,只有在发情期的时候,才能表现的像个正常的动物一样。

image.png

它并不怕死,因为它从来没搞懂为什么活着。况且一直吃草、咀嚼的一生,本就没什么值得留恋的地方。

牧羊犬

羊的温顺,枯燥,会让人觉得没有攻击性,所以喜欢羊的人很多(我不是说伊拉克被解放的羊群们)。但羊也并没有什么忠诚度,它能因为某只小母羊,就串到别的羊群里,让你再也找不到它。

认清楚每一只羊,对人来说很困难,因为这整个过程会让人感到乏味、枯燥且没必要。豢养几只牧羊犬,是最常见的方式。牧羊犬会尽职尽责的盯梢,追逐拖后腿的、不听话的羊**。牧羊犬多数情况下并不需要劳动,也不需要啃草皮** ,所以阳光明媚的时候,它可以翻着肚皮晒太阳,把狗屎排在任何地方。

牧羊犬会自己找乐子。 它会戏弄某只羊,追着它跑,虽然最后都忘掉了;它会制定自己的规则,比如某些强迫症的牧羊犬会要求羊群必须以某个路线行走。

羊惊群的时候,也是牧羊犬最警觉的时候。哪怕十几秒钟,如果羊有独立思考的时间,整个生态就会有大的变化。只要有一只羊不正常的奔跑起来,整个群体就会发生踩踏,牧羊犬就会疲于奔命。

这种可能影响晚饭的场景,牧羊犬会特别上心。

牧羊人的晚饭,偶尔是烤全羊,而牧羊犬期待着落下的骨头。至于羊,并不太在乎这种结局。

看起来大家都很满意,只有看客们多虑了。我们还是数一下下面的图有多少只羊吧。

image.png

数不清楚?只能说明不适合当牧羊犬,没什么好值得伤心的。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。我的个人微信xjjdog0,欢迎添加好友,进一步交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

聊聊golang map的实现细节 前言 map实现 总结

发表于 2021-06-24

前言

map是一种快速存取键值对的数据结构,通常能在O(1)的时间内执行增删改查操作。业务中经常用其实现set去重数据,或对一组对象建立映射关系等。除了怎么使用,作为开发者最关心的还是其底层实现,其get,put,扩容怎么实现。不像其他语言(例如java)可以直接看,go需要看源代码编译成汇编代码后调了哪些底层方案。本文对map的原理做简要介绍,分析其定位key,扩容,遍历的实现

map实现

数据结构

在研究各种操作前,需要了解其数据结构,先上map的结构体定义(基于go1.13)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
go复制代码// 位于runtime/map.go

type hmap struct {

// 键值对的数量

count int

// 标记当前是否有协程在写,或遍历

flags uint8

// 桶数组长度 = 2 ^ B。当n=3时,桶数组长度=8

B uint8

noverflow uint16

// hash种子,新建map时生成

hash0 uint32

// 指向桶数组

buckets unsafe.Pointer

// 指向老桶数组,扩容时用

oldbuckets unsafe.Pointer

// 以及搬迁的桶个数,扩容时用

nevacuate uintptr



extra *mapextra // optional fields

}

hmap为new出来的map,底层调用/runtime/map.makemap生成,其存储了map的一些基本信息

hash种子参与计算key的hash值,由于每个map的hash种子都是随机生成,因此就算同样的key在两个map中大概率定位到不同的桶,关于随机性的必要性后文说明

可以看到有两个字段指向桶数组。不扩容时,hmap.buckets指向正常桶数组

扩容时,由于是渐进式扩容,需要同时保存老桶和新桶,因此buckets指向新桶,oldbuckets指向老桶,

两个桶同时用于查找,遍历等操作,当扩容完毕删除oldbuckets

桶的结构体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码type bmap struct {

// tophash,作用下文说明

topbits [bucketCnt]uint8 // bucketCnt = 8

keys [bucketCnt]keytype

values [bucketCnt]valuetype

pad uintptr

// 指向溢出桶

overflow uintptr

}

可以看到一个桶最多存储8个键值对,若超过8个元素都被定位到该桶时,需建新桶,挂到前一个桶的overflow上,以保证有位置存放元素

来一张整体图:

WechatIMG68.jpeg

怎么定位key

根据key获取value,或者存储key,vlaue对都需先在在哈希表中定位key,然后返回value或将value存储到相应的位置,其流程如下:

  1. 计算hash(key),每种类型的哈希方法由系统初始化, string类型的key哈希函数为strhash,底层调用memhash,详见runtime/alg.go

假设生成的hash值如下(64位机器生成的hash值为64位):

1
yaml复制代码11011010 | 0001010011101000111001101110111010011100111111010011 | 1010
  1. 根据哈希值定位应该在哪个桶

假设B=4,桶的数量为2^4 = 16,则hash值最后4位用来定位key应该在哪个桶,本应该是

hash % len(buckets)其等价于 len(buckets) ^ (hash后4位),由于取余操作不如位运算操作快,因此一般采用后一种方式,本例定位到10号桶

  1. 定位槽

当定位到在哪个桶后,需要进一步定位key在桶中哪个槽,一般的方式为挨个遍历比较,但go map为bucket中每个key生成了一个tophash,取值为hash值前8位。由于每个key的tophash大概率不同,因此可以先和tophash的每个值比较,若不等,则key一定不在该槽,若相等,则大概率在,但还需要进一步调用equal() 方法验证。比较整数比比较其他类型速度更快,因此可快速排除不相等的key,提高搜索key的性能

整体布局如下所示:

WechatIMG69.jpeg

当tophash相等后,因为key,value皆以数组的方式存放,可根据其在topbits的下标直接通过内存地址计算出key,value的地址进行定位和后续操作

gin使用的路由框架httprouter也使用了类似的思想,当我们定义路由时,httprouter将其组织成压缩字典树的形式,从上到下保存每个节点的信息

例如 /api/order/get , 由3个节点分别保存 api,order,get,代表每级路由关系,若api下有多个路径,则api节点的子节点有多个,此时请求进入服务,需要挨个匹配路径。

httprouter在每个节点用indices存放了所有子节点的首字母,因此大多数情况下可以只比较一个字母进入下一级

  1. 下一个桶

当前桶遍历完没有key时,若bmap.overflow不为空,需要继续寻找溢出桶,定位方式和之前一样

用到溢出桶,需要一个桶8位塞满8个元素,这在其他语言发生概率较低,比如java的hashmap,默认装置因子0.75表示平均一个桶有0.75个元素就要发生扩容,但go map的扩容条件比较苛刻,装载因子为6.5,因此有一定的概率使用溢出桶

我们以代码val,ok := map[key]实际调用的方法mapaccess2() 来看看key定位流程,以下省略了一些无关代码,因此十分简短易懂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
scss复制代码func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {

// 不能同时读写

if h.flags&hashWriting != 0 {

throw("concurrent map read and map write")

}

// 获取计算key哈希的方法,每种类型的哈希方法由系统初始化,详见runtime/alg.go

alg := t.key.alg

// 计算hash值

hash := alg.hash(key, uintptr(h.hash0))

// 计算掩码

m := bucketMask(h.B)

// 定位在哪个bucket

b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + (hash&m)*uintptr(t.bucketsize)))



// 计算tophash,关于tophash下面说明

top := tophash(hash)

bucketloop:

// 遍历bucket

for ; b != nil; b = b.overflow(t) {

// 遍历bucket每个槽

for i := uintptr(0); i < bucketCnt; i++ {

if b.tophash[i] != top {

if b.tophash[i] == emptyRest {

break bucketloop

}

continue

}

k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

if t.indirectkey() {

k = *((*unsafe.Pointer)(k))

}

// 若key相等,说明找到,返回value

if alg.equal(key, k) {

e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))

if t.indirectelem() {

e = *((*unsafe.Pointer)(e))

}

return e, true

}

}

}

// 没有找到key,返回零值和false

return unsafe.Pointer(&zeroVal[0]), false

}

存放键值对的操作和获取类似,先定位key的内存地址,再进行操作,区别在于存放元素时涉及到扩容逻辑

扩容

当数据结构元素太多,影响了增删改查速度甚至装不下时,需要通过扩容缓解

除此之外,go map中就算元素不多,但空桶较多,会触发等量扩容

判断是否触发扩容的逻辑如下

1
2
3
4
5
6
7
8
9
scss复制代码// 当前没有在扩容,且需要扩容

if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {

hashGrow(t, h)

goto again

}
1
2
3
4
5
6
7
go复制代码// 元素个数大于8且元素个数大于 6.5 * 桶的数量

func overLoadFactor(count int, B uint8) bool {

return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)

}
1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码// 溢出桶过多

func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {

if B > 15 {

B = 15

}

return noverflow >= uint16(1)<<(B&15)

}

元素过多好理解,溢出桶过多也是一种需要扩容的情况,此时哈希表中可能元素不多,但有大量桶,且有大量空槽,这种场景出现的原因一般是先大量添加元素,再大量删除,就会留下一些空洞,定位key需要额外扫描空的位置,降低性能,需要进行处理

hashGrow函数做一些扩容的准备工作,例如创建新的桶数组,若为元素过多,则将数组容量翻倍。否则是溢出桶过多,采用原桶容量(元素不多,不需要更大的容量,只是需要重新整合,消除溢出桶)

将新桶数组挂到buckets,老桶数组则转移到oldbuckets

Go map不是一次性扩容,而是渐进式扩容,每次搬迁2个桶到新桶上,其中一个是key所在的桶,和一个另外的桶。这样扩容的好处是将扩容的复杂度均摊到每次操作,保证在map操作上耗时稳定,缺点是实现复杂。实际扩容发生在赋值,删除操作时

扩容以桶为单位,具体来说是将老桶的元素转移到两个新桶,直到该桶及其溢出桶的元素都被转移完毕

扩容状态会影响mapaccess方法,若当前正在扩容中,且遍历的新桶对应的老桶还没被搬迁,则需要到老桶上找元素,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
go复制代码// 检查是否正在写

if h.flags&hashWriting != 0 {

throw("concurrent map read and map write")

}

// ...

// 当前定位到的桶,默认是新桶

b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + (hash&m)*uintptr(t.bucketsize)))

// 老桶存在,说明正在扩容

if c := h.oldbuckets; c != nil {

if !h.sameSizeGrow() {

// There used to be half as many buckets; mask down one more power of two.

m >>= 1

}

// 定位到老桶

oldb := (*bmap)(unsafe.Pointer(uintptr(c) + (hash&m)*uintptr(t.bucketsize)))

// 老桶没有搬迁

if !evacuated(oldb) {

// 下面使用老桶

b = oldb

}

}

这里有个并发问题:如果决定使用老桶,同时cpu切换到另一个协程搬迁了老桶,则当前协程在老桶根据key定位不到数据,但事实上map中有该数据,只是被转移到新桶了。因此在并发场景下需使用sync.Map

另外若在扩容过程中,写入操作也有变化,写入之前会把该key对应的老bucket迁移,并将数据写入新桶,此时读取默认从新桶读,没有问题

遍历

一般的哈希表遍历比较简单,例如java的hashmap,从第一个桶的第一个元素挨个往后遍历就行,直到遍历完所有桶。但go的map比较特殊,体现在以下两点

  • 随机性:每次for-range遍历map的顺序不一样
  • 考虑扩容:由于是渐进式扩容,可能遍历的过程中同时扩容也在进行,有些bucket可能已经被搬到新map,有些没有。因此遍历时需要考虑在新老哪个map取数据,相反若扩容方式为一次性,则只用考虑在一个map上取数据

采用如下方式遍历map:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go复制代码package main



import "fmt"



func main() {

m := map[string]int{

"beijing":1,

"shanghai":2,

}



for k,v := range m {

fmt.Println(k,v)

}

}

底层首先调用runtime.mapiterinit创建hiter对象

每次for循环调用mapiternext获取下一个kv键值对

hiter的作用为记录当前遍历到的kv,遍历到哪,下次该往哪遍历。详细数据结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
go复制代码type hiter struct {

// key,value的地址

key unsafe.Pointer

elem unsafe.Pointer

t *maptype

h *hmap

// 当前遍历的所有桶

buckets unsafe.Pointer

// 当前遍历的桶

bptr *bmap

overflow *[]*bmap

oldoverflow *[]*bmap

// 初始化时的桶

startBucket uintptr

// 每次从桶的哪个位置开始遍历,适用于所有桶

offset uint8

// 是否遍历到末尾又转到第一个桶

wrapped bool

// 长度,2^B = len(map)

B uint8

// 当前遍历到所在桶第i个位置

i uint8

// 当前遍历到第几个桶,

bucket uintptr

// 如果在扩容过程中,需要检查的bucket,详见考虑扩容

checkBucket uintptr

}

和源代码对应关系如下:

WechatIMG70.jpeg

随机性

go的map每次调用遍历结果顺序不一样,其实现为开始遍历的桶startBucket不一样,且遍历每个桶时开始位置offset也不同,若offset = 3,则每个桶遍历顺序为 [3,4,5,6,7,0,1,2]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码 // decide where to start

r := uintptr(fastrand())

if h.B > 31-bucketCntBits {

r += uintptr(fastrand()) << 31

}

// 同随机数生成startBucket和offset

it.startBucket = r & bucketMask(h.B)

it.offset = uint8(r >> h.B & (bucketCnt - 1))

为什么要使遍历顺序随机呢?go官方解释map遍历如下:

1
sql复制代码The old language specification did not define the order of iteration for maps, and in practice it differed across hardware platforms. This caused tests that iterated over maps to be fragile and non-portable, with the unpleasant property that a test might always pass on one machine but break on another.

总结起来是说以前的编程语言没有定义map的迭代顺序,不同平台的遍历顺序可能不一样,这导致基于特定顺序的代码在不同平台之间的运行结果可能不一致

另外map扩容后,一个bucket的数据会分散到两个bucket,也会导致顺序产生变化

因此为了防止程序员依赖特定的迭代顺序,从go1开始map的迭代就不可预测

考虑扩容

由于可以在每次遍历和写值可以交替执行,因此遍历和扩容有以下几种组合方式

  1. 遍历开始到结束没发生扩容
  2. 遍历开始前就正在扩容
  3. 遍历开始时没有扩容,过程中发生扩容

第一种比较简单,按照顺序遍历就行

第二种遍历扩容后的map,遍历的每个bucket都能对应到old map的一个bucket,首先先查对应老bucket是否已迁移,若已迁移,则遍历当前bucket就行,否则需要回到老bucket找数据。那是否把老bucket的数据取出来挨个遍历呢?

事情并没有这么简单,因为两个新bucket对应一个老bucket,若这里把老bucket数据遍历完,下次到另一个新bucket,又会遍历一次该老bucket,导致重复遍历

因此这里只是遍历该老bucket分裂到当前新bucket的数据,剩余的数据等下次再遍历

相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
javascript复制代码// checkBucket = 新map的bucket,也就是当前遍历的bucket

// 正在扩容且对应的老bucket没有迁移,且不是等长迁移

if checkBucket != noCheck && !h.sameSizeGrow() {

if t.reflexivekey() || alg.equal(k, k) {

hash := alg.hash(k, uintptr(h.hash0))

// 若老bucket的key未来不转移到该bucket,则continue

if hash&bucketMask(it.B) != checkBucket {

continue

}

第三种方式也比较特殊,初始化时hiter. B保存了当时的桶的个数,hiter.buckets保存了所有桶的指针,相当于生成快照。若为等量扩容,则在新数组上遍历,和组合二类似,否则在老数组上遍历

在老数组上遍历就涉及到,当前的bucket是否已被迁移,若没有迁移,那没什么好说的,直接遍历。有人会说,如果遍历过程中,其他协程的修改导致该桶扩容了,老桶被清空了怎么办?

其实不用担心,迁移当没有其他协程在遍历老桶,才会清空老桶

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码// 若没有协程在遍历老桶,才会清空

// Unlink the overflow buckets & clear key/elem to help GC.

if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {

b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))

// Preserve b.tophash because the evacuation

// state is maintained there.

ptr := add(b, dataOffset)

n := uintptr(t.bucketsize) - dataOffset

memclrHasPointers(ptr, n)

}

但此时该key的数据可能被更新或删除了,但没有反应到老桶上,因此需要去新桶查找一番

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码// 老桶的数据没有被搬迁

if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY)

...

// 老桶的数据已被搬迁

} else {

// 去新桶查找

rk, re := mapaccessK(t, h, k)

if rk == nil {

continue // key has been deleted

}

it.key = rk

it.elem = re

}

但如果遍历该桶之前其数据就被清空了,则会出现找不到数据的情况,因此最好不要一边遍历一边修改map

总结

本文介绍了go map的数据结构,定位key的方案,涉及到使用索引,和httprouter进行类比。分析了渐进式扩容时怎么进行查找和遍历的方式,map最好在单协程使用,若需要并发读取使用sync.Map

引用

深度解密go语言之map

为什么go中map遍历顺序随机

github.com/julienschmi…

go语言for和range实现

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官:你是如何诊断“Kafka”消息发送到瓶颈在哪里的?

发表于 2021-06-24

在消息发送端遇到性能瓶颈时是否有办法正确地评估瓶颈在哪呢?如何针对性地进行调优呢?

1、Kafka 消息发送端监控指标

其实Kafka早就为我们考虑好了,Kafka提供了丰富的监控指标,并提供了JMX的方式来获取这些监控指标,在客户端提供的监控指标如下图所示:

主要的监控指标分类如下:

  • producer-metrics:消息发送端的监控指标,其子节点为该进程下所有的生产者
  • producer-node-metrics:以Broker节点为维度,每一个发送方的数据指标。
  • producer-topic-metrics:以topic为维度,统计该发送端的一些指标。

Kafka Producer相关的指标比较多,这边不一一罗列。

1.1 producer-metrics

producer-metrics是发送端一个非常重要的监控项,如下图所示:

其重点项说明如下:

  • batch-size-avg:Sender线程实际发送消息时一个批次(ProducerBatch)的平均大小。
  • batch-size-max:Sender线程时间发送消息时一个批次的最大大小。

实践指导:个人觉得这两个参数非常有必要进行采集,如果该值远小于batch.size设置的值,如果吞吐量不达预期,可以适当调大linger.ms。

  • batch-split-rate:Kafka提供了对大的ProducerBatch分割成小的机制,即如果客户端的ProducerBatch如果超过了服务端允许的最大消息大小,将会触发在客户端分割重新发送,该值记录每秒切割的速率
  • batch-split-total:Kafka 发生的 split 次数。

温馨提示:按照笔者对这部分源码的阅读,我觉得ProducerBatch的split的意义不大,因为新分配的ProducerBatch的容量会等于batch.size,未超过该大小,则该Batch不会被分隔,笔者认为该功能大概率无法完成实际的切割意图。

实践指导:如果该值不为0,则表示服务端,客户端设置的消息大小不合理,客户端设置的batch.szie大小应该小于服务端设置的 max.message.bytes,默认值100W字节(约等于1M)

  • buffer-available-bytes:当前发送端缓存区可用字节大小。
  • buffer-total-bytes:发送端总的缓存区大小,默认为32M,33,554,432个字节。

实战指导:如果缓存区剩余字节数持续较低,需要评估缓存区大小是否合适,Sender线程遇到了瓶颈,从而考虑网络、Brorker是否遇到瓶颈。

  • bufferpool-wait-ratio
  • bufferpool-wait-time-total:客户端从缓存区中申请内存用于创建ProducerBatch所阻塞的总时长。

实战指导:如果该值持续大于0,说明发送存在瓶颈,可以适当降低linger.ms的值,让消息有机会得到更加及时的处理。

  • produce-throttle-time-avg:消息发送被broker限流的平均时间
  • produce-throttle-time-max:消息发送被broker限流的最大时间
  • io-ratio:IO线程处理IO读写的总时间
  • io-time-ns-avg:每一次事件选择器调用IO操作的平均时间(单位为纳秒)
  • io-waittime-total:io线程等待读写就绪的平均时间(单位为纳秒)
  • iotime-total:io处理总时间。
  • network-io-rate:客户端每秒所有连接的网络读写tps。
  • network-io-total:客户端所有连接上的网络操作(读或写)总数

1.2 通用指标

Kafka在消息发送端除了上述指标外,还有一些通用类的监控指标,这类指标的统计维度包括:消息发送者、节点、TOPIC三个维度。

主要的维度说明如下:

  • producer-metics:发送端维度
  • producer-node-metrics:发送端-Broker节点维度
  • producer-topic-metrics:发送端-主题维度的统计

接下来说明的指标,分别以不同的维度进行统计,但其表示的含义表示一样,故接下来统一说明。

  • incoming-byte-rate:每秒的入端流量,每秒进入的字节数。
  • incoming-byte-total:总共进入的字节数。
  • outgoing-byte-total:总出发送的字节数。
  • request-latency-avg:消息发送的平均延时。
  • request-latency-max:消息发送的最大延迟时间。

实战指导:latency-avg与max可以反应消息发送的延迟性能,如果延迟过高,说明Sender线程发送消息存在瓶颈,建议该值与linger.ms进行比较,如果该值显著小于linger.ms,则为了提高吞吐率,可适当调整batch.size的大小。

  • request-rate:每秒发送Tps
  • request-size-avg:消息发送的平均大小。
  • request-size-max:Sender线程单次消息发送的最大大小。

实战指导:如果该值迟迟小于max.request.size,说明客户端消息积压的消息不多,如果从其他维度表明遇到了瓶颈,可以适当linger.ms,batch.size,可有效提高吞吐。

  • request-total:请求发送的总字节数
  • response-rate:每秒接受服务端响应TPS
  • response-total:收到服务端响应总数量。

2、监控指标采集

虽然Kafka内置了众多的监控指标,但这些指标默认是存储在内存中,既然是存放在内存中,为了避免监控数据无休止地增加内存触发内存溢出,通常监控数据的存储基本是基于滑动窗口,即只会存储最近一段时间内的监控数据,进行滚动覆盖。

故为了更加直观地展示这些指标,因为需要定时将这些信息进行采集,统一存储在其他数据库等持久化存储,可以根据历史数据绘制曲线,希望实现的效果如下图所示:

基本的监控采集系统架构设计如下图所示:

mq-collect应该是放在生产者SDK中,通过mq-collect类库异步定时将采集信息上传的到时序数据库InfluxDB,然后通过mq-portal门户展示页面,对每一个生产客户端按指标进行可视化展示,实现监控数据的可视化,从而为性能优化提供依据。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

看看掘友如何帮女神,定时发文章的🥱

发表于 2021-06-24

额,不知不觉我已经通宵发文1890天了,在这煎熬的夜晚,我频频失眠,连做梦都想着啥时候有个定时发文章功能,好让我睡个好觉😰😰

一 如何实现这个功能,有啥思路

WechatIMG16379.jpeg

  1. 那就是官方支持了🥺
  2. 只能自己想想办法
    1. 写个脚本 (太废了,成天开着机,不划算)
    2. 观察观察发文章时都干嘛了,想想办法搞他一下 (我们可以调接口啊,毕竟我们都是API调用工程师)

正文

  • 首先准备好我们准备发的文章
+ `必填选项`都必须填了,保存到草稿
![image.png](https://gitee.com/songjianzaina/juejin_p14/raw/master/img/a7cfaacf3c4964c6bffbfd777563c88458ba8ec26feea24ac320c7c56e8ac660)
  • 发布文章,然后抓包
+ 如图,我们会看到重要的一个接口
+ 红色框的就是,发布文章 (`重点`)
+ 红色框下面的是 跳到 发布成功页面 (`啥用都没`)

image.png

  • 下一步,看看这个接口都干嘛了
+ 先看下 Headers,重点看`Cookie` 里的 sessionid ,其他都没啥用
![image.png](https://gitee.com/songjianzaina/juejin_p14/raw/master/img/6385bed96708d86fd38d9f848d929866b562c958d90f2c4f516e3518cec04e0d)
+ 如何获取sessionid (这个页面咋进去,不用我多说了吧)
![image.png](https://gitee.com/songjianzaina/juejin_p14/raw/master/img/bcd4b4c908479620350a44e92d1a75bb4307abe1e8b54f54136a820ff7a7ff91)
+ 接下来看 参数
![image.png](https://gitee.com/songjianzaina/juejin_p14/raw/master/img/e328a0152d9e256c95e0788229ccfe26727103cb2ec39c82bf5a69b65f048276)
+ 解释一下参数:


    - draft\_id 这个是文章Id(`如何拿到看下图`)
    ![image.png](https://gitee.com/songjianzaina/juejin_p14/raw/master/img/5de2dc6f9786304928e114eb948cc8745866d4c48fca777258979df36b1137ad)
    - 其他参数目前来说`没啥用`
  • 那怎么知道发送成功没有
+ 只有响应结果中包含 `"err_no": 0` 就说明成功了

重点已经讲完,咋实现定时发布,各位大佬心里应该有自己想法了

🍀🍀 写代码时刻到了

  • 思路
+ 搞个定时任务
+ 调接口
+ 完事
  • 脚本方式 (缺点很大)
+ 编写一个shell脚本,保存到linux上,然后添加计划任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
shell复制代码#!/bin/bash

PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
export PATH
curl --location --request POST 'https://api.juejin.cn/content_api/v1/article/publish' \
--header 'Content-Type: application/json' \
--header 'Cookie: sessionid=你的sessionid;' \
--data-raw '{
"draft_id": "你的文章ID",
"sync_to_org": false,
"column_ids": []
}'
echo "----------------------------------------------------------------------------"
endDate=`date +"%Y-%m-%d %H:%M:%S"`
echo "★[$endDate] Successful"
echo "----------------------------------------------------------------------------"
+ 添加计划任务
1
2
3
4
5
6
7
8
9
10
11
12
shell复制代码// 编辑计划任务
vim /etc/crontab

//添加一行
* * * * * user-name command to be executed

// * * * * * 就是core表达式
// user-name 执行的用户
// command to be executed 脚本路径

// 例如:每天3点执行一次
// 0 3 * * * root /xxxx上面的脚本.sh
  • 代码方式(java为例)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
java复制代码/**
* 用到了hutool
* <dependency>
* <groupId>cn.hutool</groupId>
* <artifactId>hutool-all</artifactId>
* <version>4.6.10</version>
* </dependency>
*/

public class Publish {

static DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

static ScheduledThreadPoolExecutor executor;

public static void main(String[] args) {
if (args.length < 1) {
System.out.println("参数示例:java -cp xxxx.jar com.yeting.juejin.Publish sessionid 文章ID->发布时间 文章ID->发布时间");
System.out.println("sessionid:是掘金sessionid");
System.out.println("文章ID->发布时间:->是分隔符");
return;
}

String sessionId = args[0];

executor = new ScheduledThreadPoolExecutor(
args.length - 1,
ThreadFactoryBuilder.create().setNamePrefix("publish-").build()
);
//遍历所有任务
for (int i = 1; i < args.length; i++) {
String line = args[i];
//截取文章ID和发布时间
String[] split = line.split("->");
String draftId = split[0];
LocalDateTime publishTime = LocalDateTime.parse(split[1], dateFormat);
LocalDateTime now = LocalDateTime.now();
//计算出剩下多少分钟发布
Duration duration = Duration.between(now, publishTime);
long delay = duration.toMinutes();
//交给线程池,定时执行
executor.schedule(() -> {
Boolean publish = publish(draftId, sessionId);
if (publish) {
System.out.println("文章ID:" + draftId + " 发布成功");
} else {
System.err.println("文章ID:" + draftId + " 发布失败");
}
}, delay, TimeUnit.MINUTES);
System.out.println("解析成功,文章ID:" + draftId + " 将在:" + publishTime.format(dateFormat) + " 发布");
}
}

public static Boolean publish(String draftId, String sessionId) {
Map<String, String> headers = new HashMap<>(20);
headers.put("Host", "api.juejin.cn");
headers.put("Connection", "keep-alive");
headers.put("sec-ch-ua", "\" Not;A Brand\";v=\"99\", \"Google Chrome\";v=\"91\", \"Chromium\";v=\"91\"");
headers.put("sec-ch-ua-mobile", "?0");
headers.put("Accept", "*/*");
headers.put("Origin", "https://juejin.cn");
headers.put("Sec-Fetch-Site", "same-site");
headers.put("Sec-Fetch-Mode", "cors");
headers.put("Sec-Fetch-Dest", "empty");
headers.put("Referer", "https://juejin.cn/");
headers.put("Accept-Language", "zh-CN,zh;q=0.9");
headers.put("Cookie", "sessionid=" + sessionId + "; ");

Map<String, Object> body = new HashMap<>(6);
body.put("draft_id", draftId);
body.put("sync_to_org", false);
body.put("column_ids", Collections.emptyList());

String res = HttpUtil.createPost("https://api.juejin.cn/content_api/v1/article/publish")
.headerMap(headers, true)
.body(JSONUtil.toJsonStr(body))
.execute()
.body();
return "0".equals(JSONUtil.parseObj(res).getStr("err_no"));
}

}
  • 啥都懒得干 那就用现成的
    • 👉🏻传送门👈🏻
    • 用完记得点赞!!!!!!!!

为了不被时代抛弃,我也发张妹子图
image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

内外网分离日志方案 rsyslog logstash ten

发表于 2021-06-24

在公司的一周时间中,整理了一下我们自己的日志方案

需求

  1. 分为内网和外网
  2. 分为实时日志和非实时日志
  3. 实时日志通过消息队列通道传到内网中,处理后存储到ES中
  4. 非实时日志上传到腾讯云OSS,然后同步到内网中,作为后续的日志分析,处理后存储到ES和内网中CEPH对象存储中

技术选型

系统 ubuntu18.04,其他带有rsyslog8.x的系统即可,没有也可以安装一个

利用系统服务:rsyslog 8.4.20

logstash 7.13.0

elasticsearch 7.8.0

日志格式我们使用了

1
javascript复制代码HOSTNAME PROGRAME JSON字符串

为什么使用JSON字符串,肯定方便后续的程序去处理它,虽然增加了一些空间。

工作流程

程序写rsyslog日志 -> rsyslog转发到logstash -> logstash对实时日志和非实时日志分流到消息队列和对象存储

一个简单的日志格式

1
kotlin复制代码data yq-data: {"type": "off-line", "time": "2021-06-23T07:03:55.122584Z", "msg": "I am a log message", "level": "DEBUG"}

配置rsyslog

vim /etc/rsyslog.conf

增加下面配置

1
css复制代码local7.* @@logstash-host:5514

logstash-host:5514 为logstash启动syslog的IP和端口
local7为rsyslog的facility

增加一个rsyslog的配置,在/etc/rsyslog.d下面

vim /etc/rsyslog.d/20-default.conf,填入下面内容

1
2
3
ini复制代码template(name="DynaFile" type="string" string="/var/log/%programname%/%programname%.log")

local7.* action(type="omfile" dynaFile="DynaFile")

配置完成后重启systemctl restart rsyslog

什么意思呢,就是从local7发过来的日志,都会存储到/var/log/日志格式中的PROGRAME/日志格式中的PROGRAME.log文件

为什么要这样做呢?是为了不仅仅我们将日志发到logstash,还在我们本地也存放一份,毕竟不能保证logstash完全百分百分一直运转,当他出错的时候我们可以通过filebeats从新抽取一遍,或者直接丢给我们的日志程序去处理一下

以python logging为例

1
2
python3复制代码# address rsyslog的IP和端口
SysLogHandler(facility=SysLogHandler.LOG_LOCAL7, address=("172.16.102.227", 514))

再来配置logrotate

顾名思义,就是日志轮转,我们配置每天轮转,保存30天的日志,具体的去看下logrotate的文档就行了

vim /etc/logrotate/custom-log

1
2
3
4
5
6
7
8
9
10
11
12
bash复制代码/var/log/yq*/*.log
{
rotate 30
daily
missingok
notifempty
delaycompress
nocompress
postrotate
systemctl kill -s HUP rsyslog.service
endscript
}

这里为什么用yq*,因为项目太多了,所以我们为所有的项目定一个前缀,当然你也可以自己处理它

配置 logstash

vim /etc/logstash/conf.d/my.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
ini复制代码input {
syslog {
port => 5514
}
}

filter {
json {
source => "message"
}
prune {
whitelist_names => [ "msg", "logsource", "program", "time", "level", "type" ]
}
mutate {
remove_field => ["@timestamp", "timestamp"]
}
}

output {
# 对于内网直接存储到ES
if [type] == "off-line" {
elasticsearch {
hosts => ["ES-HOST"]
index => "my-log"
}
}
# 对于外网直接存储到腾讯云OSS 同上面的两者取其一
if [type] == "off-line" {
s3 {
endpoint => "https://cos.ap-nanjing.myqcloud.com"
access_key_id => "xxxx"
secret_access_key => "xxxx"
region => "ap-nanjing"
bucket => "xxxx"
# 10分钟
time_file => 10
codec => "json_lines"
canned_acl => "public-read"
}
}

if [type] == "real-time" {
rabbitmq {
exchange => "my-exchange"
host => "localhost"
exchange_type => "direct"
key => "logstash"
user => "admin"
password => "admin"
}
}
stdout {}
}

调试模式 /user/share/logstash/bin/logstash -f /etc/logstash/conf.d/my.conf

生产环境下 去掉stdout {}

systemctl restart logstash

docker迁移方案

以docker-compose为例

1
2
3
4
5
6
yaml复制代码logging:
driver: "syslog"
options:
syslog-address: "udp://127.0.0.1:514"
tag: yq-service-manager
syslog-facility: local7

后续的

自己可以从ES去查询或分析日志了。。。也可以从kibana可视查看

一些想聊得

  • logstash不必每个服务器都部署,平均多个节点,对应一个logstash节点就可以了
  • rsyslog服务不要使用远程host的方式,在高并发的日志写环境下,远程syslog并不能很好的处理它,甚至会丢弃一些数据
  • 处理离线日志时候可以使用flink或者spark,

我用过得flink,在4核8G的一个服务器中,部署了一个10slot的集群,平均处理速度在1k多条数据每秒,这个速度可以随着slot数量的提升而加快,在不考虑ES性能的情况下。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

还在用Unity开发游戏?那你就out了,试试用Unity做

发表于 2021-06-24

一、前言

大家都支持Unity是用来做游戏开发,比如说做2D游戏、3D游戏,或者工业虚拟仿真软件的开发。

其他Unity可以做的有很多,比如答题系统。

本篇就介绍答题系统的开发

这个答题系统,可以从文本文档中提取题目和分数,然后绑定到UI上,在答题的过程中,自动判断分数,自动判断正确率。

目的是实现一个可快速导入到项目中使用的小模块。

二、效果图及工程下载

题目文档:
wwr.lanzoui.com/ihV6nphkzsf
密码:47z2

源工程:
wwr.lanzoui.com/i7wpaphkzuh

三、实现

3-1 界面搭建

首先,新建工程,然后摆UI,如下图所示:
在这里插入图片描述

3-2 读取文档

题目存放在txt文档中,首先,我们看一下结构:
在这里插入图片描述

每一行都是一道题目,然后题号、题目、选项、得分,都是用冒号进行分割的。

下一步就需要用脚本进行读取文档了。

新建脚本Answer.cs:编写代码:

读取文档:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
csharp复制代码using System.Collections.Generic;
using UnityEngine;

public class Answer : MonoBehaviour
{
//读取文档
string[][] ArrayX;
string[] lineArray;
private int topicMax = 0;//最大题数
private List<bool> isAnserList = new List<bool>();//存放是否答过题的状态


void Start()
{
TextCsv();
}


/*****************读取txt数据******************/
void TextCsv()
{
//读取csv二进制文件
TextAsset binAsset = Resources.Load("YW", typeof(TextAsset)) as TextAsset;
//读取每一行的内容
lineArray = binAsset.text.Split('\r');
//创建二维数组
ArrayX = new string[lineArray.Length][];
//把csv中的数据储存在二维数组中
for (int i = 0; i < lineArray.Length; i++)
{
ArrayX[i] = lineArray[i].Split(':');
}
//查看保存的题目数据
for (int i = 0; i < ArrayX.Length; i++)
{
for (int j = 0; j < ArrayX[i].Length; j++)
{
Debug.Log(ArrayX[i][j]);
}
}
//设置题目状态
topicMax = lineArray.Length;
for (int x = 0; x < topicMax + 1; x++)
{
isAnserList.Add(false);
}
}
}

可以看到,所有的题目数据都读取出来了:
在这里插入图片描述

3-3 加载题目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
csharp复制代码using System.Collections.Generic;
using UnityEngine;
using UnityEngine.UI;

public class Answer : MonoBehaviour
{
//读取文档
string[][] ArrayX;//题目数据
string[] lineArray;//读取到题目数据
private int topicMax = 0;//最大题数
private List<bool> isAnserList = new List<bool>();//存放是否答过题的状态

//加载题目
public GameObject tipsbtn;//提示按钮
public Text tipsText;//提示信息
public List<Toggle> toggleList;//答题Toggle
public Text indexText;//当前第几题
public Text TM_Text;//当前题目
public List<Text> DA_TextList;//选项
private int topicIndex = 0;//第几题


void Start()
{
TextCsv();
LoadAnswer();
}


/*****************读取txt数据******************/
void TextCsv()
{
//读取csv二进制文件
TextAsset binAsset = Resources.Load("YW", typeof(TextAsset)) as TextAsset;
//读取每一行的内容
lineArray = binAsset.text.Split('\r');
//创建二维数组
ArrayX = new string[lineArray.Length][];
//把csv中的数据储存在二维数组中
for (int i = 0; i < lineArray.Length; i++)
{
ArrayX[i] = lineArray[i].Split(':');
}
//设置题目状态
topicMax = lineArray.Length;
for (int x = 0; x < topicMax + 1; x++)
{
isAnserList.Add(false);
}
}

/*****************加载题目******************/
void LoadAnswer()
{
tipsbtn.SetActive(false);
tipsText.text = "";
for (int x = 0; x < 4; x++)
{
toggleList[x].isOn = false;
}
indexText.text = "第" + (topicIndex + 1) + "题:";//第几题
TM_Text.text = ArrayX[topicIndex][1];//题目
int idx = ArrayX[topicIndex].Length - 3;//有几个选项
for (int x = 0; x < idx; x++)
{
DA_TextList[x].text = ArrayX[topicIndex][x + 2];//选项
}
}
}

在这里插入图片描述
题目正常加载:
在这里插入图片描述

3-4 按钮功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
csharp复制代码 /*****************按钮功能******************/
void Select_Answer(int index)
{
switch (index)
{
case 0://提示
int idx = ArrayX[topicIndex].Length - 1;
int n = int.Parse(ArrayX[topicIndex][idx]);
string nM = "";
switch (n)
{
case 1:
nM = "A";
break;
case 2:
nM = "B";
break;
case 3:
nM = "C";
break;
case 4:
nM = "D";
break;
}
tipsText.text = "<color=#FFAB08FF>" +"正确答案是:"+ nM + "</color>";
break;
case 1://上一题
if (topicIndex > 0)
{
topicIndex--;
LoadAnswer();
}
else
{
tipsText.text = "<color=#27FF02FF>" + "前面已经没有题目了!" + "</color>";
}
break;
case 2://下一题
if (topicIndex < topicMax-1)
{
topicIndex++;
LoadAnswer();
}
else
{
tipsText.text = "<color=#27FF02FF>" + "哎呀!已经是最后一题了。" + "</color>";
}
break;
case 3://跳转
int x = int.Parse(jumpInput.text) - 1;
if (x >= 0 && x < topicMax)
{
topicIndex = x;
jumpInput.text = "";
LoadAnswer();
}
else
{
tipsText.text = "<color=#27FF02FF>" + "不在范围内!" + "</color>";
}
break;
}
}

3-5 题目对错判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
csharp复制代码/*****************题目对错判断******************/
void AnswerRightRrongJudgment(bool check,int index)
{
if (check)
{
//判断题目对错
bool isRight;
int idx = ArrayX[topicIndex].Length - 1;
int n = int.Parse(ArrayX[topicIndex][idx]) - 1;
if (n == index)
{
tipsText.text = "<color=#27FF02FF>" + "恭喜你,答对了!" + "</color>";
isRight = true;
tipsbtn.SetActive(true);
}
else
{
tipsText.text = "<color=#FF0020FF>" + "对不起,答错了!" + "</color>";
isRight = false;
tipsbtn.SetActive(true);
}

//正确率计算
if (isAnserList[topicIndex])
{
tipsText.text = "<color=#FF0020FF>" + "这道题已答过!" + "</color>";
}
else
{
anserint++;
if (isRight)
{
isRightNum++;
}
isAnserList[topicIndex] = true;
TextAccuracy.text = "正确率:" + ((float)isRightNum / anserint * 100).ToString("f2") + "%";
}

//禁用掉选项
for (int i = 0; i < toggleList.Count; i++)
{
toggleList[i].interactable = false;
}
}
}

将按钮对象拖进卡槽中,运行程序即可:
在这里插入图片描述

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
csharp复制代码using System.Collections.Generic;
using UnityEngine;
using UnityEngine.UI;

public class Answer : MonoBehaviour
{
//读取文档
string[][] ArrayX;//题目数据
string[] lineArray;//读取到题目数据
private int topicMax = 0;//最大题数
private List<bool> isAnserList = new List<bool>();//存放是否答过题的状态

//加载题目
public GameObject tipsbtn;//提示按钮
public Text tipsText;//提示信息
public List<Toggle> toggleList;//答题Toggle
public Text indexText;//当前第几题
public Text TM_Text;//当前题目
public List<Text> DA_TextList;//选项
private int topicIndex = 0;//第几题

//按钮功能及提示信息
public Button BtnBack;//上一题
public Button BtnNext;//下一题
public Button BtnTip;//消息提醒
public Button BtnJump;//跳转题目
public InputField jumpInput;//跳转题目
public Text TextAccuracy;//正确率
private int anserint = 0;//已经答过几题
private int isRightNum = 0;//正确题数

void Awake()
{
TextCsv();
LoadAnswer();
}

void Start()
{
toggleList[0].onValueChanged.AddListener((isOn) => AnswerRightRrongJudgment(isOn,0));
toggleList[1].onValueChanged.AddListener((isOn) => AnswerRightRrongJudgment(isOn,1));
toggleList[2].onValueChanged.AddListener((isOn) => AnswerRightRrongJudgment(isOn,2));
toggleList[3].onValueChanged.AddListener((isOn) => AnswerRightRrongJudgment(isOn,3));

BtnTip.onClick.AddListener(() => Select_Answer(0));
BtnBack.onClick.AddListener(() => Select_Answer(1));
BtnNext.onClick.AddListener(() => Select_Answer(2));
BtnJump.onClick.AddListener(() => Select_Answer(3));
}


/*****************读取txt数据******************/
void TextCsv()
{
//读取csv二进制文件
TextAsset binAsset = Resources.Load("YW", typeof(TextAsset)) as TextAsset;
//读取每一行的内容
lineArray = binAsset.text.Split('\r');
//创建二维数组
ArrayX = new string[lineArray.Length][];
//把csv中的数据储存在二维数组中
for (int i = 0; i < lineArray.Length; i++)
{
ArrayX[i] = lineArray[i].Split(':');
}
//设置题目状态
topicMax = lineArray.Length;
for (int x = 0; x < topicMax + 1; x++)
{
isAnserList.Add(false);
}
}

/*****************加载题目******************/
void LoadAnswer()
{
for (int i = 0; i < toggleList.Count; i++)
{
toggleList[i].isOn = false;
}
for (int i = 0; i < toggleList.Count; i++)
{
toggleList[i].interactable = true;
}

tipsbtn.SetActive(false);
tipsText.text = "";

indexText.text = "第" + (topicIndex + 1) + "题:";//第几题
TM_Text.text = ArrayX[topicIndex][1];//题目
int idx = ArrayX[topicIndex].Length - 3;//有几个选项
for (int x = 0; x < idx; x++)
{
DA_TextList[x].text = ArrayX[topicIndex][x + 2];//选项
}
}

/*****************按钮功能******************/
void Select_Answer(int index)
{
switch (index)
{
case 0://提示
int idx = ArrayX[topicIndex].Length - 1;
int n = int.Parse(ArrayX[topicIndex][idx]);
string nM = "";
switch (n)
{
case 1:
nM = "A";
break;
case 2:
nM = "B";
break;
case 3:
nM = "C";
break;
case 4:
nM = "D";
break;
}
tipsText.text = "<color=#FFAB08FF>" +"正确答案是:"+ nM + "</color>";
break;
case 1://上一题
if (topicIndex > 0)
{
topicIndex--;
LoadAnswer();
}
else
{
tipsText.text = "<color=#27FF02FF>" + "前面已经没有题目了!" + "</color>";
}
break;
case 2://下一题
if (topicIndex < topicMax-1)
{
topicIndex++;
LoadAnswer();
}
else
{
tipsText.text = "<color=#27FF02FF>" + "哎呀!已经是最后一题了。" + "</color>";
}
break;
case 3://跳转
int x = int.Parse(jumpInput.text) - 1;
if (x >= 0 && x < topicMax)
{
topicIndex = x;
jumpInput.text = "";
LoadAnswer();
}
else
{
tipsText.text = "<color=#27FF02FF>" + "不在范围内!" + "</color>";
}
break;
}
}

/*****************题目对错判断******************/
void AnswerRightRrongJudgment(bool check,int index)
{
if (check)
{
//判断题目对错
bool isRight;
int idx = ArrayX[topicIndex].Length - 1;
int n = int.Parse(ArrayX[topicIndex][idx]) - 1;
if (n == index)
{
tipsText.text = "<color=#27FF02FF>" + "恭喜你,答对了!" + "</color>";
isRight = true;
tipsbtn.SetActive(true);
}
else
{
tipsText.text = "<color=#FF0020FF>" + "对不起,答错了!" + "</color>";
isRight = false;
tipsbtn.SetActive(true);
}

//正确率计算
if (isAnserList[topicIndex])
{
tipsText.text = "<color=#FF0020FF>" + "这道题已答过!" + "</color>";
}
else
{
anserint++;
if (isRight)
{
isRightNum++;
}
isAnserList[topicIndex] = true;
TextAccuracy.text = "正确率:" + ((float)isRightNum / anserint * 100).ToString("f2") + "%";
}

//禁用掉选项
for (int i = 0; i < toggleList.Count; i++)
{
toggleList[i].interactable = false;
}
}
}
}

四、后言

整体来看,只使用了一个场景,一个脚本,就完成了答题系统。

步骤如下:
1、读取文档
2、解析文档保存数据
3、根据数据加载题目
4、上一题下一题,选项选择,跳转,按钮的功能实现

代码还是延期了一贯的简洁风格,希望你可以在这篇文章学到东西。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…633634635…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%