开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

减治法——假币问题 简介 问题:假币问题 想法 寄语

发表于 2021-11-28

「这是我参与11月更文挑战的第27天,活动详情查看:2021最后一次更文挑战」

简介

今天给大家介绍一个很经典的减治法解决假币问题的方法,改方法引自最优化算法一书。

问题:假币问题

在n枚外观相同的硬币中,有一枚是假币,并且已知假币较轻。可以通过一架天平来任意比较两组硬币,从而得知两组硬币的重量是否相同,或者哪一组更轻一些,假币问题(base coin problem)要求设计一个高效的算法来检测出这枚假币。

想法

解决假币问题的最自然的想法就是一分为二,也就是把n枚硬币分成两组,每组有n/2枚硬币,如果n为奇数,就留下一枚硬币,然后把两组硬币分别放到天平的两端。如果两组硬币的重量相同,那么留下的硬币就是假币;否则,用同样的方法对较轻的那组硬币进行同样的处理,因为假币一定在较轻的那组里。

在假币问题中,尽管我们把硬币分成了两组,但每次用天平比较后,只需解决一个规模减半的问题,所以,它属于一个间质减治算法。该算法在最坏情况下满足如下递推式:

  • T(n) = 0 当 n = 1 时
  • T(n) = T(n/2) + 1 当 n > 1 时

应用扩展递归技术求解这个递推式,得到T(n) = O(log2n)。

对于假币问题,将问题规模减半的算法很容易想到,但实际上,减半不是一个最好的选择,考虑不是把硬币分成两组,而是分成三组,前两组有[n/3]组硬币,其余的硬币作为第三组,将前两组硬币放到天平上,如果它们的重量不同,则假币一定在第三组中,用同样的方法对第三组进行处理;如果前两组的重量不同,则假币一定在较轻的一组中,用同样的方法对较轻的那组硬币进行处理。显然这个算法满足如下递推式:

  • T(n) = 0 当 n = 1 时
  • T(n) = T(n/3) + 1 当 n > 1 时

这个递推式的解是T(n) = O(log3n),将原问题一分为三从而获得了更少的比较次数。

算法

采用递归技术设计假币问题的算法,设计函数Coin实现假币问题,算法用伪代码描述如下:

1
2
3
4
5
6
7
8
9
10
伪代码复制代码假币问题Coin
输入:硬币所在数组的下标范围low和high,硬币的个数n
输出:假币在硬币集合中的序号
1. 如果n等于1,则该硬币即为假币,输出对应的序号,算法结束;
2. 计算3组的硬币个数num1、num2和num3;
3. add1=第一组硬币的重量和;add2=第二组硬币的重量和;
4. 根据情况执行下述三种操作之一:
4.1 如果add1大于add2,则在第一组硬币中查找;
4.2 如果add1大于add2,则在第二组硬币中查找;
4.3 如果add1等于add2,则在第三组硬币中查找。

寄语

今天也是小落叶复习考试的一天,时间有限只能初步的给大家介绍有关假币问题,这一经典的减治算法。希望可以对大家有所帮助。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

LeetCode561 数组拆分 I

发表于 2021-11-28

这是我参与11月更文挑战的第27天,活动详情查看:2021最后一次更文挑战。

题目描述:

561. 数组拆分 I - 力扣(LeetCode) (leetcode-cn.com)

给定长度为 2n 的整数数组 nums ,你的任务是将这些数分成 n 对, 例如 (a1, b1), (a2, b2), ..., (an, bn),使得从 1 到 n 的 min(ai, bi) 总和最大。

返回该 最大总和 。

示例一

1
2
3
4
5
6
7
scss复制代码输入:nums = [1,4,3,2]
输出:4
解释:所有可能的分法(忽略元素顺序)为:
1. (1, 4), (2, 3) -> min(1, 4) + min(2, 3) = 1 + 2 = 3
2. (1, 3), (2, 4) -> min(1, 3) + min(2, 4) = 1 + 2 = 3
3. (1, 2), (3, 4) -> min(1, 2) + min(3, 4) = 1 + 3 = 4
所以最大总和为 4

示例二

1
2
3
scss复制代码输入:nums = [6,2,6,5,1,2]
输出:9
解释:最优的分法为 (2, 1), (2, 5), (6, 6). min(2, 1) + min(2, 5) + min(6, 6) = 1 + 2 + 6 = 9

提示:

  • 1 <= n <= 10^4
  • nums.length == 2 * n
  • -10^4 <= nums[i] <= 10^4

思路分析

贪心

假设排完序的结果为a1<=b1<=a2<=b2<=...<=an<=bn

那么a1应该跟谁一组呢?

a1作为全局最小值,无论跟谁一组a1都会被累加进答案,相反,a1的搭档会被永久排除。

既然如此,莫不如排除一个较小的数,即给a1找一个“最小的搭档”b1。

当a1、b1被处理之后,a2同理分析。

所以,最终选择a1,a2,…,an会得到最好的结果。

这是按贪心的思路来的,然后官方答案中还有此解法的另一种解读,顺带推理过程的,感兴趣的去看看。

AC代码

1
2
3
4
5
6
7
8
9
10
11
Kotlin复制代码class Solution {
fun arrayPairSum(nums: IntArray): Int {
nums.sort()
val len = nums.size
var maxNum = 0
for (index in 0 until len step 2){
maxNum += nums[index]
}
return maxNum
}
}

总结

同一套代码,有不同的解题思路,贪心算法和数学证明法,殊途同归。

参考

数组拆分 I - 数组拆分 I - 力扣(LeetCode) (leetcode-cn.com)

【宫水三叶の相信科学系列】反证法证明贪心算法的正确性 - 数组拆分 I - 力扣(LeetCode) (leetcode-cn.com)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

flink sql 知其所以然(十二):流 join 很难嘛

发表于 2021-11-28

1.序篇

看了那么多的技术文,你能明白作者想让你在读完文章后学到什么吗?

*大数据羊说__的文章会让你明白*

1. 博主会阐明博主期望本文能给小伙伴们带来什么帮助,让小伙伴萌能直观明白博主的心思

2. 博主会以实际的应用场景和案例入手,不只是知识点的简单堆砌

3. 博主会把重要的知识点的原理进行剖析,让小伙伴萌做到深入浅出

进入正文。

源码公众号后台回复1.13.2 sql join 的奇妙解析之路获取。

下面即是文章目录,也对应到本文的结论,小伙伴可以先看结论快速了解本文能给你带来什么帮助:

  1. 背景及应用场景介绍:join 作为离线数仓中最常见的场景,在实时数仓中也必然不可能缺少它,flink sql 提供的丰富的 join 方式(总结 6 种:regular join,维表 join,temporal join,interval join,array 拍平,table function 函数)对我们满足需求提供了强大的后盾
  2. 先来一个实战案例:以一个曝光日志 left join 点击日志为案例展开,介绍 flink sql join 的解决方案
  3. flink sql join 的解决方案以及存在问题的介绍:主要介绍 regular join 的在上述案例的运行结果及分析源码机制,它虽然简单,但是 left join,right join,full join 会存在着 retract 的问题,所以在使用前,你应该充分了解其运行机制,避免出现数据发重,发多的问题。
  4. 本文主要介绍 regular join retract 的问题,下节介绍怎么使用 interval join 来避免这种 retract 问题,并满足第 2 点的实战案例需求。

2.背景及应用场景介绍

在我们的日常场景中,应用最广的一种操作必然有 join 的一席之地,例如

  1. 计算曝光数据和点击数据的 CTR,需要通过唯一 id 进行 join 关联
  2. 事实数据关联维度数据获取维度,进而计算维度指标

上述场景,在离线数仓应用之广就不多说了。

那么,实时流之间的关联要怎么操作呢?

flink sql 为我们提供了四种强大的关联方式,帮助我们在流式场景中达到流关联的目的。如下图官网截图所示:

图片

join

  1. regular join:即 left join,right join,full join,inner join
  2. 维表 lookup join:维表关联
  3. temporal join:快照表 join
  4. interval join:两条流在一段时间区间之内的 join
  5. array 炸开:列转行
  6. table function join:通过 table function 自定义函数实现 join(类似于列转行的效果,或者说类似于维表 join 的效果)

在实时数仓中,regular join 以及 interval join,以及两种 join 的结合使用是最常使用的。所以本文主要介绍这两种(太长的篇幅大家可能也不想看,所以之后的文章就以简洁,短为目标)。

3.先来一个实战案例

先来一个实际案例来看看在具体输入值的场景下,输出值应该长啥样。

场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。

来一波输入数据:

曝光数据:

log_id timestamp show_params
1 2021-11-01 00:01:03 show_params
2 2021-11-01 00:03:00 show_params2
3 2021-11-01 00:05:00 show_params3

点击数据:

log_id timestamp click_params
1 2021-11-01 00:01:53 click_params
2 2021-11-01 00:02:01 click_params2

预期输出数据如下:

log_id timestamp show_params click_params
1 2021-11-01 00:01:00 show_params click_params
2 2021-11-01 00:01:00 show_params2 click_params2
3 2021-11-01 00:02:00 show_params3 null

熟悉离线 hive sql 的同学可能 10s 就写完上面这个 sql 了,如下 hive sql

1
2
3
4
5
6
7
8
vbnet复制代码INSERT INTO sink_table
SELECT
    show_log.log_id as log_id,
    show_log.timestamp as timestamp,
    show_log.show_params as show_params,
    click_log.click_params as click_params
FROM show_log
LEFT JOIN click_log ON show_log.log_id = click_log.log_id;

那么我们看看上述需求如果要以 flink sql 实现需要怎么做呢?

虽然不 flink sql 提供了 left join 的能力,但是在实际使用时,可能会出现预期之外的问题。下节详述。

4.flink sql join

4.1.flink sql

还是上面的案例,我们先实际跑一遍看看结果:

1
2
3
4
5
6
7
8
vbnet复制代码INSERT INTO sink_table
SELECT
    show_log.log_id as log_id,
    show_log.timestamp as timestamp,
    show_log.show_params as show_params,
    click_log.click_params as click_params
FROM show_log
LEFT JOIN click_log ON show_log.log_id = click_log.log_id;

flink web ui 算子图如下:

图片

flink web ui

结果如下:

1
2
3
4
5
yaml复制代码+[1 | 2021-11-01 00:01:03 | show_params | null]
-[1 | 2021-11-01 00:01:03 | show_params | null]
+[1 | 2021-11-01 00:01:03 | show_params | click_params]
+[2 | 2021-11-01 00:03:00 | show_params | click_params]
+[3 | 2021-11-01 00:05:00 | show_params | null]

从结果上看,其输出数据有 +,-,代表其输出的数据是一个 retract 流的数据。分析原因发现是,由于第一条 show_log 先于 click_log 到达, 所以就先直接发出 +[1 | 2021-11-01 00:01:03 | show_params | null],后面 click_log 到达之后,将上一次未关联到的 show_log 撤回, 然后将关联到的 +[1 | 2021-11-01 00:01:03 | show_params | click_params] 下发。

但是 retract 流会导致写入到 kafka 的数据变多,这是不可被接受的。我们期望的结果应该是一个 append 数据流。

为什么 left join 会出现这种问题呢?那就要从 left join 的原理说起了。

来定位到具体的实现源码。先看一下 transformations。

图片

transformations

可以看到 left join 的具体 operator 是 org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator。

其核心逻辑就集中在 processElement 方法上面。并且源码对于 processElement 的处理逻辑有详细的注释说明,如下图所示。

图片

StreamingJoinOperator#processElement

注释看起来逻辑比较复杂。我们这里按照 left join,inner join,right join,full join 分类给大家解释一下。

4.2.left join

首先是 left join,以上面的 show_log(左表) left join click_log(右表) 为例:

  1. 首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。
  2. 相同 key 下,当 show_log 来一条数据,如果 click_log 有数据:则 show_log 与 click_log 中的所有数据进行遍历关联一遍输出[+(show_log,click_log)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。
  3. 相同 key 下,当 show_log 来一条数据,如果 click_log 中没有数据:则 show_log 不会等待,直接输出[+(show_log,null)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。
  4. 相同 key 下,当 click_log 来一条数据,如果 show_log 有数据:则 click_log 对 show_log 中所有的数据进行遍历关联一遍。在输出数据前,会判断,如果被关联的这条 show_log 之前没有关联到过 click_log(即往下发过[+(show_log,null)]),则先发一条[-(show_log,null)],后发一条[+(show_log,click_log)] ,代表把之前的那条没有关联到 click_log 数据的 show_log 中间结果给撤回,把当前关联到的最新结果进行下发,并把 click_log 保存到右表的状态中(以供后续左表进行关联)。这也就解释了为什么输出流是一个 retract 流。
  5. 相同 key 下,当 click_log 来一条数据,如果 show_log 没有数据:把 click_log 保存到右表的状态中(以供后续左表进行关联)。

4.3.inner join

以上面的 show_log(左表) inner join click_log(右表) 为例:

  1. 首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。
  2. 相同 key 下,当 show_log 来一条数据,如果 click_log 有数据:则 show_log 与 click_log 中的所有数据进行遍历关联一遍输出[+(show_log,click_log)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。
  3. 相同 key 下,当 show_log 来一条数据,如果 click_log 中没有数据:则 show_log 不会输出数据,会把 show_log 保存到左表的状态中(以供后续 join 使用)。
  4. 相同 key 下,当 click_log 来一条数据,如果 show_log 有数据:则 click_log 与 show_log 中的所有数据进行遍历关联一遍输出[+(show_log,click_log)]数据,并且把 click_log 保存到右表的状态中(以供后续 join 使用)。
  5. 相同 key 下,当 click_log 来一条数据,如果 show_log 没有数据:则 click_log 不会输出数据,会把 click_log 保存到右表的状态中(以供后续 join 使用)。

4.4.right join

right join 和 left join 一样,只不过顺序反了,这里不再赘述。

4.5.full join

以上面的 show_log(左表) full join click_log(右表) 为例:

  1. 首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。
  2. 相同 key 下,当 show_log 来一条数据,如果 click_log 有数据:则 show_log 对 click_log 中所有的数据进行遍历关联一遍。在输出数据前,会判断,如果被关联的这条 click_log 之前没有关联到过 show_log(即往下发过[+(null,click_log)]),则先发一条[-(null,click_log)],后发一条[+(show_log,click_log)] ,代表把之前的那条没有关联到 show_log 数据的 click_log 中间结果给撤回,把当前关联到的最新结果进行下发,并把 show_log 保存到左表的状态中(以供后续 join 使用)
  3. 相同 key 下,当 show_log 来一条数据,如果 click_log 中没有数据:则 show_log 不会等待,直接输出[+(show_log,null)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。
  4. 相同 key 下,当 click_log 来一条数据,如果 show_log 有数据:则 click_log 对 show_log 中所有的数据进行遍历关联一遍。在输出数据前,会判断,如果被关联的这条 show_log 之前没有关联到过 click_log(即往下发过[+(show_log,null)]),则先发一条[-(show_log,null)],后发一条[+(show_log,click_log)] ,代表把之前的那条没有关联到 click_log 数据的 show_log 中间结果给撤回,把当前关联到的最新结果进行下发,并把 click_log 保存到右表的状态中(以供后续 join 使用)
  5. 相同 key 下,当 click_log 来一条数据,如果 show_log 中没有数据:则 click_log 不会等待,直接输出[+(null,click_log)]数据,并且把 click_log 保存到右表的状态中(以供后续 join 使用)。

4.6.regular join 的总结

总的来说上述四种 join 可以按照以下这么划分。

  1. inner join 会互相等,直到有数据才下发。
  2. left join,right join,full join 不会互相等,只要来了数据,会尝试关联,能关联到则下发的字段是全的,关联不到则另一边的字段为 null。后续数据来了之后,发现之前下发过为没有关联到的数据时,就会做回撤,把关联到的结果进行下发

4.7.怎样才能解决 retract 导致数据重复下发到 kafka 这个问题呢?

既然 flink sql 在 left join、right join、full join 实现上的原理就是以这种 retract 的方式去实现的,就不能通过这种方式来满足业务了。

我们来转变一下思路,上述 join 的特点就是不会相互等,那有没有一种 join 是可以相互等待的呢。以 left join 的思路为例,左表在关联不到右表的时候,可以选择等待一段时间,如果超过这段时间还等不到再下发 (show_log,null),如果等到了就下发(show_log,click_log)。

interval join 闪亮登场。关于 interval join 是如何实现上述场景,及其原理实现,本篇的(下)会详细介绍,敬请期待。

5.总结与展望

源码公众号后台回复1.13.2 sql join 的奇妙解析之路获取。

本文主要介绍了 flink sql regular 的在满足 join 场景时存在的问题,并通过解析其实现说明了运行原理,主要包含下面两部分:

  1. 背景及应用场景介绍:join 作为离线数仓中最常见的场景,在实时数仓中也必然不可能缺少它,flink sql 提供的丰富的 join 方式(总结 4 种:regular join,维表 join,temporal join,interval join)对我们满足需求提供了强大的后盾
  2. 先来一个实战案例:以一个曝光日志 left join 点击日志为案例展开,介绍 flink sql join 的解决方案
  3. flink sql join 的解决方案以及存在问题的介绍:主要介绍 regular join 的在上述案例的运行结果及分析源码机制,它虽然简单,但是 left join,right join,full join 会存在着 retract 的问题,所以在使用前,你应该充分了解其运行机制,避免出现数据发重,发多的问题。
  4. 本文主要介绍 regular join retract 的问题,下节介绍怎么使用 interval join 来避免这种 retract 问题,并满足第 2 点的实战案例需求。

本文使用 文章同步助手 同步

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

代码解释圆周率

发表于 2021-11-28

「这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战」

圆周率(Pi)是圆的周长与直径的比值,一般用希腊字母π表示,是一个在数学及物理学中普遍存在的数学常数。π也等于圆形之面积与半径平方之比。是精确计算圆周长、圆面积、球体积等几何形状的关键值。为了探索圆周率,本文用代码的方式来验算三种计算π的方式

圆周率的计算方法

割圆术

所谓“割圆术”,是用圆内接正多边形的面积去无限逼近圆面积并以此求取圆周率的方法
image.png

代码如下

1
2
3
4
5
6
7
8
9
java复制代码i = 0
n = 15
an = 1
while i < n:
an1Pow = 2 - math.sqrt(4-math. pow( an,2))
an1 = math.sqrt(an1Pow)
an = an1
i += 1
print("由正%d边形求出的PI为%.5Of”% (6*math.pow(2,n),(3*math.pow(2,n)*an)))

莱布尼兹公式

不同于牛顿-莱布尼茨公式)(微积分学),莱布尼茨公式用于对两个函数的乘积求取其高阶导数,一般的,如果函数u = u (x)与函数v= v(x)在点x处都具有n阶导数,那么此时有

image.png

image.png
代码如下:

1
2
3
4
5
6
java复制代码s = 0
result = 0
for i in range(0,100):
s += (-1)**i /(2*i+1)
result = s*4
print(’%.2f’ % result)

结果为:3.13

随机投点法(蒙特卡洛算法)

原理:随机模拟是一种随机试验的方法,也称为蒙特卡罗方法这种方法利用随机试验,根据频率与概率、平均值与期望值等之间的关系,推断出预期的结果。

image.png

在(0,0)和(1,1)范围内随机投test_sum个点,如果落到圆内,hit_sum数量加1,最后用hit_sum/test_sum算出落在圆内的概率

由image.png得圆周率 PI=hit_sum / test_sum * 4

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class PI {

public static void main(string[ ] args) {
int test_sum = 1000000;//投的点数
int hit_sum = o;//投中的个数
double x, y;// x和y坐标点
for (int i = o; i < test_sum; i++) {
//随机得到一个坐标
x = Math.random();
y = Math.random( );

if (x* x +y * y <= 1)//判断掷入的这个点在不在在圆内
hit_sum++;
}
//统计得到rt的值
double pi = (double) hit_sum / test_sum * 4;
System.out.println(“π的结果为:" + pi);
}
}

随机测试的几次结果:

π的结果为:3.140336

π的结果为:3.141012

π的结果为:3.141396

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如果还不懂如何使用 Consumer 接口,来公司我当面给你

发表于 2021-11-28

「这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战」。

上文中我们介绍了 Supplier 的使用方法,本文我们来看看 Consumer 和 Function 的源代码与使用情况。

Consumer

我们一般称之为“消费者”,它表示接受单个输入参数但不返回结果的操作。不同于其它函数式接口,Consumer 预期通过副作用进行操作。

那什么又是副作用呢?说一下我所理解的副作用,副作用其实就是一个函数是否会修改它范围之外的资源,如果有就叫有副作用,反之为没有副作用。比如修改全局变量,修改输入参数所引用的对象等。

1
2
3
4
5
6
7
8
9
10
java复制代码@FunctionalInterface
public interface Consumer<T> {

void accept(T t);

default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}

方法解析:

  • test:对给定的参数执行此操作。
  • andThen:返回一个组合的 Consumer ,依次执行此操作,然后执行after操作。如果执行任一操作会抛出异常,它将被转发到组合操作的调用者。如果执行此操作会引发异常,则不会执行after操作。

正如我们案例中遇到的场景,我们只需要将要执行的逻辑方法当作参数传入 getResponse() 中,然后在该方法中执行 accept() 方法进行消费即可。如果还不理解,我们可以把它转换为匿名内部类的调用方式。

1
2
3
4
5
6
java复制代码 getResponse(dto, response, new Consumer<B>() {
@Override
public void accept(B bb) {
mapper.insert(bb);
}
});

当调用accept() 方法的时候就会去调用匿名内部类的方法了,也就是我们传入 getResponse() 的逻辑方法。

Function

我把它称为“转换者”,表示接收一个参数通过处理之后返回一个结果的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@FunctionalInterface
public interface Function<T, R> {

R apply(T t);

default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
Objects.requireNonNull(before);
return (V v) -> apply(before.apply(v));
}


default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t) -> after.apply(apply(t));
}


static <T> Function<T, T> identity() {
return t -> t;
}
}

方法解析:

  • apply:将 T 类型的参数传入,经过函数表达式的计算,返回 R 类型的结果;
  • compose:返回一个组合函数,先将参数应用于 before 函数,然后将结果应用于当前函数,返回最终结果。如果对任一函数的求值引发异常,则会将其转发给组合函数的调用方。
  • andThen:返回一个组合函数,先将参数应用与当前函数,然后将结果应用于 after 函数,返回最终的结果。如果对任一函数的求值引发异常,则会将其转发给组合函数的调用方。
  • identity:返回始终返回其输入参数的函数。

我们在 lambda 表达式中应用比较多,所以我们来简单演示下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Data
@AllArgsConstructor
public class Teacher {
private String name;
private int age;
}

public class TeacherTest {
public static void main(String[] args) {
List<Teacher> list = Arrays.asList(
new Teacher("张三",25),
new Teacher("李四",28),
new Teacher("王五",18));
List<String> collect = list.stream().map(item -> item.getName()).collect(Collectors.toList());
System.out.println(collect);
}
}

其中 map 接收的参数就是 Function 类型, item 为传入参数,item.getName() 为返回处理的结果,最后输出结果为

1
xml复制代码[张三, 李四, 王五]

看到这,Consumer 和 Function 的使用方法相信你一定是掌握了吧!如果你有不同的意见或者更好的idea,欢迎联系阿Q,添加阿Q可以加入技术交流群参与讨论呦!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【设计模式】 策略模式源码学习与实践

发表于 2021-11-28

前言

在业务开发中,我们最经常使用到的判断就是if…else,只要涉及到多种策略的实现方式,我们脑海中就会使用这个判断。有时候产品需求的不明确,一个版本迭代来一种判断,随着时间的推移,这个实现方法就会变得又长又臭,那有什么办法可以来觉得呢,通过学习策略模式,他能够很好的帮我们解决这个问题。

纲要

在学习之前,有一句话我觉得比设计模式更重要。

设计原则和思想比设计模式更加普适和重要

策略模式

什么是策略模式?

简单的来说,就是定义一系列算法,封装每个算法,并使它们可以互换。

策略让算法独立于使用它的客户端而变化。

1. 三个关键角色

三个关键角色

三个关键角色

2. 目的

​ 实现不同的策略,将策略分离

3. 为什么要使用策略模式

​ 主要有三个原因

为什么使用策略模式?

为什么使用策略模式?

优缺点

优缺点

优缺点

场景分析

通过第一部分,我们对策略模式有了一个大概的认识,那他主要针对于什么场景呢

  1. 需要动态切换不同算法
  2. 多重的条件选择业务场景
  3. 客户端只关心调用,不关心算法细节
  4. 分离策略

源码案例

源码案例

源码案例

我们可以通过优秀的代码中学习策略模式的实现

SimpleInstantiationStrategy中分析出它的关键角色

可以从命名中看出,SimpleInstantiationStrategy是其中的某一个策略,我们在写策略模式的时候最好也以Strategy为结尾,表名这就是个策略。

1.抽象策略类

1
2
3
4
5
6
7
less复制代码public interface InstantiationStrategy {
    Object instantiate(RootBeanDefinition var1, @Nullable String var2, BeanFactory var3) throws BeansException;

    Object instantiate(RootBeanDefinition var1, @Nullable String var2, BeanFactory var3, Constructor<?> var4, Object... var5) throws BeansException;

    Object instantiate(RootBeanDefinition var1, @Nullable String var2, BeanFactory var3, @Nullable Object var4, Method var5, Object... var6) throws BeansException;
}

2.具体策略类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
less复制代码public class SimpleInstantiationStrategy implements InstantiationStrategy {
    private static final ThreadLocal<Method> currentlyInvokedFactoryMethod = new ThreadLocal();

    public SimpleInstantiationStrategy() {
    }

    @Nullable
    public static Method getCurrentlyInvokedFactoryMethod() {
        return (Method)currentlyInvokedFactoryMethod.get();
    }

    public Object instantiate(RootBeanDefinition bd, @Nullable String beanName, BeanFactory owner) {
       //...
    }

    public Object instantiate(RootBeanDefinition bd, @Nullable String beanName, BeanFactory owner, Constructor<?> ctor, Object... args) {
      //...
    }

    public Object instantiate(RootBeanDefinition bd, @Nullable String beanName, BeanFactory owner, @Nullable Object factoryBean, Method factoryMethod, Object... args) {
      //...
}

3.上下文信息类

这里没有找到上下文信息类,我们可以理解为上下文就是来存取策略的一个地方。在这里可以与工厂模式有一个很好的结合,在实践demo中给大家展示。

实践

与工厂模式结合:

上下文信息类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typescript复制代码@Component
public class rulesFactory {
    private static final Map<Integer,RuleStrategy> map = new HashMap<>(20);

    @Resource
    public selectStrategy selectStrategy;

    //@PostConstruct 用来修饰一个非静态的void()方法.而且这个方法不能有抛出异常声明。在服务器加载Servlet的时候运行,并且只会被服务器调用一次
    @PostConstruct
    public void init(){
        map.put(1,selectStrategy);
    }

    public RuleStrategy creator(Integer type) {
        return map.get(type);
    }
}

抽象策略(Strategy)角色:

1
2
3
4
csharp复制代码public interface RuleStrategy<T extends RulesProcessorBO> {
    
    void process(T t);
}

环境角色(组件,直接调用工厂):

1
2
3
4
5
6
7
8
9
10
11
12
13
typescript复制代码@Component
public class RuleContext {

    @Resource
    private rulesFactory bizRuleFactory;

    public void process(RulesProcessorBO rulesProcessorBO) {

        bizRuleFactory.creator(rulesProcessorBO.getRuleCode()).process(rulesProcessorBO);

    }

}

具体策略(ConcreteStrategy)角色:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
less复制代码@Component
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public class selectStrategy implements RuleStrategy<selectProcessorBo> {

    @Autowired
    private ManagerMapper managerMapper;

    @Override
    public void process(selectProcessorBo rulesProcessorBO) {
        Manager m = managerMapper.getById("1");
        System.out.println(m.toString());

    }
}

这就是一个策略模式的实现,可以看出代码量一下就上去了,如果在规则简单而且情况少的情况下,我们可能就不需要再去使用策略模式,因为它不够直观。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

孔夫子旧书网数据采集,举一反三学爬虫,Python爬虫120

发表于 2021-11-28

「这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战」

电商类网站爬虫,永远是爬虫圈必爬项目。今天我们就拿《孔夫子旧书网》练练手。

爬取目标源数据分析

本次要爬取的目标网址为 https://book.kongfz.com/Cxiaoshuo/v6/,打开页面寻找分页数据,在下图所示位置可以进行页码切换。

孔夫子旧书网数据采集,举一反三学爬虫,Python爬虫120例第21例
在切换页码的同时,捕获到分页链接,并寻找分页规则。

1
2
3
txt复制代码https://book.kongfz.com/Cxiaoshuo/v6w1/
https://book.kongfz.com/Cxiaoshuo/v6w2/
https://book.kongfz.com/Cxiaoshuo/v6w3/

提炼列表页地址模板为 https://book.kongfz.com/C{类别}/v6w{页码}/。

上述内容梳理完毕,就可以对列表页进行采集爬取了,本次爬取分为三个步骤进行。

  1. 提取所有图书分类;
  2. 采集每个类别下的列表页(测试数据,只采集单一分类下的 5 页数据);
  3. 提取目标数据,例如图书名称,作者,出版社,出版时间,店铺名称等信息。

接下来按照步骤实现即可。

提取所有图书分类

通过开发者工具,捕获图书分类区域 HTML 代码,如下所示:

孔夫子旧书网数据采集,举一反三学爬虫,Python爬虫120例第21例
上述数据,可访问任意分类页即可获取,核心代码如下所示,其中 self.get_headers() 函数,可参考之前的博客,或者下载代码查阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
python复制代码import requests
from lxml.html import etree
import random
import time


class SSS:
def __init__(self):

self.url_format = 'https://book.kongfz.com/C{}/v6w{}/'
# 待抓取的分类,可以扩展
self.types = ["wenxue", "xiaoshuo"]
self.session = requests.Session()
self.headers = self.get_headers()
self.categorys =[]

def get_categorys(self):

with self.session.get(url='https://book.kongfz.com/Cfalv/',headers=self.headers) as res:
if res:
html = etree.HTML(res.text)
items = html.cssselect('.tushu div.link-item a')
# 匹配出URL中的type
for item in items:
# print(item)
# print(item.get("href"))
href = item.get("href")
type = href[href.find('C')+1:-1]
self.categorys.append(type)

此时简单运行之后,就会得到如下清单,即孔夫子旧书网所有图书分类。

1
2
3
4
5
6
7
txt复制代码xiaoshuo
wenxue
yuyan
lishi
dili
yishu
……

此时遍历该列表,即可获取所有图书列表页数据,学习阶段,可取其中一条进行分析,例如我选择的文学与小说分类,self.types = ["wenxue", "xiaoshuo"]。

采集分类页静态页面数据

对于静态页面数据,采用之前的方法保存到本地即可,在 SSS 类中增加 get_detail 与 run 函数,页码由于数据量的原因,最大为 200,可以先设置为 5,便于爬取,下述代码在运行时,注意提前建立好 孔夫子 文件夹。

代码继续使用 session.get 方法,进行数据请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python复制代码    def get_detail(self, type, page):
with self.session.get(url=self.url_format.format(type, page), headers=self.headers, timeout=5) as res:
if res.text:
with open(f"./孔夫子/{type}_{page}.html", "w+", encoding="utf-8") as f:
f.write(res.text)
else:
# 如果无数据,重新请求
print(f"页码{page}请求异常,重新请求")
self.get_detail(page)

def run(self):
pagesize = 5
for type in self.types:
for page in range(1, pagesize):
self.get_detail(type, page)
time.sleep(2)
print(f"分类:{type},页码:{page}页面储存完毕!")

运行代码,得到如下数据,实测过程中,并未发现反爬措施,为了便于测试,可针对性控制请求速度。
孔夫子旧书网数据采集,举一反三学爬虫,Python爬虫120例第21例

提取数据

最后对本地 HTML 进行操作,获取最终的目标数据。

在进行提取的时候,依旧是 CSS 选择器 的使用熟练程度起决定性作用,当然对于异常数据的处理,也需要注意一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
python复制代码# 数据提取类
class Analysis:
def __init__(self):
# 待抓取的分类,可以扩展
self.types = ["wenxue", "xiaoshuo"]

# 去除特殊字符
def remove_character(self, origin_str):
if origin_str is None:
return
origin_str = origin_str.replace('\n', '')
origin_str = origin_str.replace(',', ',')
return origin_str

def format(self, text):
html = etree.HTML(text)
# 获取所有项目区域 div
div_books = html.cssselect('div#listBox>div.item')
for book in div_books:
# 获取标题属性值
title = book.cssselect('div.item-info>div.title')[0].get('title')
# 作者默认给空值
author = None
author_div = book.cssselect('div.item-info>div.zl-isbn-info>span:nth-child(1)')
if len(author_div)>0:
author = author_div[0].text
# 出版社相同操作
publisher = None
publisher_div = book.cssselect('div.item-info>div.zl-isbn-info>span:nth-child(2)')
if len(publisher_div)>0:
# 进行数据提取与截取
publisher = publisher_div[0].text.split(' ')[1]
print(publisher)

def run(self):
pagesize = 5
for type in self.types:
for page in range(1, pagesize):
with open(f"./孔夫子/{type}_{page}.html", "r", encoding="utf-8") as f:
text = f.read()
# print(text)
self.format(text)

提取过程中出现了部分异常数据,针对异常数据进行特殊化处理即可,例如下述截图数据。
孔夫子旧书网数据采集,举一反三学爬虫,Python爬虫120例第21例
学习阶段,就不再继续提取更多的数据,仅提取书名,作者和出版社。

1
2
3
4
5
6
7
8
9
txt复制代码长篇小说:达哈士孔的狒狒(精装) [法]阿尔丰斯·都德  著;李劼人  译 四川文艺出版社
剑王朝.4 论剑 无罪 长江出版社
只有月亮听得见 康玲玲 四川文艺出版社
元尊1·潜龙在渊 天蚕土豆 著 长江出版社
畅销书女王:张爱玲的33堂写作课 端木向宇 天津人民出版社
区块链改变世界 严行方 中国纺织出版社
我们还会再见吗 苗勇刚、贾宇萍 译 中国出版集团,现代出版社
仲夏夜之恋I 小妮子 著 作家出版社
长篇霸都亚纳(精装) [法]赫勒·马郎 著;李劼人 译 四川文艺出版社

收藏时间

代码下载地址:codechina.csdn.net/hihell/pyth…,可否给个 Star。

==来都来了,不发个评论,点个赞,收个藏吗?==

今天是持续写作的第 201 / 365 天。
可以关注我,点赞我、评论我、收藏我啦。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

在SQL Sever复制数据库时出现启动、执行SQL Ser

发表于 2021-11-28

在SQL Sever复制数据库时出现启动、执行SQL Server代理作业错误

如下图所示:
1.png

问题一:启动SQL Server代理作业错误。

问题二:执行SQL Server代理作业错误。

问题一的解决方式是直接在右侧开启SQL Server代理。

2.png

不过问题二解决后,会影响SQL Server代理的状态,需要重新开启SQL Server代理。

问题二的原因一般是目标服务器的SQL Server代理服务的权限过低。有两种解决方案。

方案一:更改代理服务权限为LocalSystem账户。

第一步:由于我的SQL Sever是2012版的,所以在此电脑里输入C:\Windows\SysWOW64\SQLServerManager11.msc

找到配置管理器。(不同版本的SQL Sever配置管理器的地址可能有所差异,可在官网上找到对应的地址。)。

第二步:分别将SQL Server和SQL Server代理两个服务都更改为Local System 。

3.png

4.png

第三步:开启SQL Server代理(问题一的解决方法)。

再次复制数据库就可以成功了!

5.png

方案二:更改身份验证为SQL Server验证。

第一步:启动SQL Server时更改身份验证为SQL Server验证。登录名为sa(自带),自己设置一个密码,可勾选记住密码。

6.png

第二步:开启SQL Server代理(问题一的解决方法)。

第三步:在选择源服务器和目标服务器时,选择使用SQL Server验证,输入(确认)登录名和密码。

7.png

8.png

第三步:选择传输方法时,选择使用SQL管理对象方法(M)。

9.png

最后就能复制成功了!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Linux图形化界面及命令行

发表于 2021-11-28

这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战

Linux经常被人们使用的方式是以命令窗口的形式出现的,但是图形化界面也是我们所需要的使用方式。图形化界面,通常被分为以下两个部分

(1)X Windows

(2)KDE、GNOME或者其他桌面环境

一、X Window 系统

因为版权问题Linux不能与window系统有太多的相像,所以在1984年的美国麻省理工的计算机科学研究室开发了一个X Window视窗系统(基于“服务器/客户端”C/S架构)这个架构我们常见于进行APP的开发,与之相对的就是B/S架构这个常用于web项目的开发。

X Window 系统为GUI环境提供了最基本的框架,这样你就可以进行基本的GUI操作,在屏幕上绘图,移动窗口,以及进行鼠标和键盘的互动。

X Window的组成如图

图片.png

客户端和服务端通过通信协议来进行交互。互联网通信是十分重要的东西,今天我们就来简单说说互联网通信的一些内容两台计算机通过网络实现文件共享行为,简单来说就是利用互联网来进行用户之间信息交换的行为就是互联网通信。

目前的X Window存在两种实现方式XFree86和X.Org服务器。

二、KDE桌面

KDE,为1996年10月创建,K桌面项目始建于1996年10月,确切的公布日期是1996年10月14日。K桌面项目是由图形排版工具Lyx的开发者、一位名为Matthias Ettrich的德国人发起的,目的是为满足普通用户也能够通过简单易用的桌面来管理Unix工作站上的各种应用软件以及完成各种任务。

三、认识Linux Shell

Shell是系统给用户界面提供的用户和内核进行交互操作的一种接口。Shell在Linux环境下操作多种版本,大概版本被分为

Bourne Shell:是最初的UNIX shell ,是一个交换式的命令解释器和命令编程语言。

C Shell:为了让用户更容易的使用交互式功能,并且把ALGOL风格的语法结构变成了C语言风格。

Korn Shell:结合了C Shell 的交互式特性,和Bourne Shell 的语法,所以广受好评。

如图:

图片.png

四、Shell的使用

(1)打开终端

(2)使用文本编辑器来编辑内容

(3)最后来进行运行

五、使用Linux的一些注意事项

我们在执行Shell脚本,或者执行一些命令的时候,Linux需要我们具有一定的权限,但是如果使用root权限可能会在不经意间做出一些错误的操作。所以我们平时使用的时候最好还是使用普通用户权限。需要的时候我们再进行申请管理员权限。在 Linux 系统中,我们所有的操作实质都是在进行进程访问文件的操作。我们访问文件需要先取得相应的访问权限,而访问权限是通过 Linux 系统中的安全模型获得的。

对于 Linux 系统中的安全模型,我们需要知道下面两点:

1、Linux 系统上最初的安全模型叫 DAC, 全称是 Discretionary Access Control ,翻译为自主访问控制。

2、后来又增加设计了一个新的安全模型叫 MAC, 全称是 Mandatory Access Control, 翻译为强制访问控制。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

XXL-JOB- admin 源码

发表于 2021-11-28

xxl-job 的 admin 服务是 xxl-job 的调度中心,负责管理和调度注册的 job,关于 xxl-job 的使用,可以阅读 “参考阅读” 中的《XXL-JOB分布式调度框架全面详解》,这里主要是介绍 admin 中的源码。


admin 服务除了管理页面上的一些接口外,还有一些核心功能,比如:

1、根据 job 的配置,自动调度 job;

2、接收 executor 实例的请求,实现注册和下线;

3、监视失败的 job,进行重试;

4、结束一些异常的 job;

5、清理和统计日志;

这些功能都是在 admin 服务启动后,在后台自动运行的,下面将详细介绍 admin 服务这些功能的实现。

XxlJobAdminConfig

admin 的配置类

XxlJobAdminConfig 是 admin 服务的配置类,在 admin 服务启动时,它除了配置 admin 服务的一些参数外,还会启动 admin 服务的所有后台线程。

属性

该类的属性主要分为5类:

1、配置文件中的参数,比如 accessToken;

2、DAO 层各个数据表的 mapper;

3、Spring 容器中的一些 Bean,比如 JobAlarmer、DataSource 等;

4、私有变量 XxlJobScheduler 对象;

1
java复制代码private XxlJobScheduler xxlJobScheduler;

5、私有静态变量 adminConfig,指向实例自身。

1
2
3
4
5
java复制代码private static XxlJobAdminConfig adminConfig = null;

public static XxlJobAdminConfig getAdminConfig() {
return adminConfig;
}

方法

该类有两个重要方法,分别实现自接口 InitializingBean、DisposableBean,作用如下:

  • afterPropertiesSet 方法,在 Spring 容器中 Bean 初始化完成之后,在该方法中进行初始化;
  • destroy 方法,在容器销毁 Bean 时,会执行销毁操作;

这两个方法分别调用了 XxlJobScheduler 对象的 init、destroy 方法,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;

xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}

@Override
public void destroy() throws Exception {
xxlJobScheduler.destroy();
}

XxlJobAdminConfig 作为 admin 服务的配置类,作用就是在 Spring 容器启动时,调用 XxlJobScheduler 的初始化方法,来初始化和启动 admin 服务的功能。

XxlJobScheduler

XxlJobScheduler 的作用就是调用各个辅助类(xxxHelper)来启动和结束不同的线程和功能,初始化方法 init 的代码如下:

如果把 XxlJobScheduler 看做是一个启动器,那么 init 方法就是启动按钮,XxlJobAdminConfig 的作用就是按下这个按钮。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public void init() throws Exception {
// 0、初始化国际化消息,不是很重要忽略
initI18n();

// 1、初始化调度器的线程池
JobTriggerPoolHelper.toStart();

// 2、启动注册监视器线程
JobRegistryHelper.getInstance().start();

// 3、启动 失败 job 监视器线程,查询失败日志进行重试
JobFailMonitorHelper.getInstance().start();

// 4、启动 丢失 job 监视器线程,一些 job 发出调度指令后,一直没有响应,状态一直是“运行中”
JobCompleteHelper.getInstance().start();

// 5、启动日志统计和清理线程
JobLogReportHelper.getInstance().start();

// 6、启动调度线程,定时调度 job
JobScheduleHelper.getInstance().start();

logger.info(">>>>>>>>> init xxl-job admin success.");
}

下面我们主要介绍 init 中各个类及其作用,最后再简单一下介绍 destroy 的作用。


JobTriggerPoolHelper

Trigger 线程池的辅助类:管理 Trigger 线程池、添加 trigger 线程到线程池

当 admin 服务向 executor 实例发出一个调度请求来执行 job 时,会调用 XxlJobTrigger.trigger() 方法把要传输的参数(比如 job_id、jobHandler、job_log_id、阻塞策略等,包装成 TriggerParam 对象)传给 ExecutorBiz 对象来执行一次调度。

xxl-job 对调度过程做了两个优化:

  • 每次发出调度请求时,会新建一个线程,异步执行 XxlJobTrigger 的方法;
  • 在新建线程时,会根据执行 XxlJobTrigger 方法的耗时,选择不同的线程池;

属性 和 start

初始化线程池

JobTriggerPoolHelper 在 toStart 方法中初始化了它的两个线程池属性,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码/**
* 快速、慢速线程池,分别执行调度任务不一样的任务,实现隔离,避免相互阻塞
*/
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;

public void start() {
fastTriggerPool = new ThreadPoolExecutor(
10,
// 至少200
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));

slowTriggerPool = new ThreadPoolExecutor(
10,
// 至少100
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(2000),
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
}

每次有调度请求时,就会在这两个线程池中创建线程,创建线程的逻辑在 addTrigger 方法中。

addTrigger

新建线程,调用 XxlJobTrigger.trigger() 方法

不同 job 存在执行时长的差异,为了避免不同耗时 job 之间相互阻塞,xxl-job 根据 job 的响应时间,对 job 进行了区分,主要体现在:

  • 如果 job 耗时短,就在 fastTriggerPool 线程池中创建线程;
  • 如果 job 耗时长且调用频繁,就在 slowTriggerPool 线程池中创建线程;

如果快 job 与调用频繁的慢 job 在同一个线程池中创建线程,慢 job 会占用大量的线程,导致快 job 线程不能及时运行,降低了线程池和线程的利用率。xxl-job 通过快慢隔离,避免了这个问题。

问题:如果快慢 job 使用同一个线程池时,慢 job 占用了线程,导致快 job 线程不能及时运行,正常情况下,我们的反应是增加线程池的线程数,这样做能否解决问题?

不能,因为慢 job 还是会占用大量线程,抢占了快 job 的线程资源;增加线程池中的线程数不但没有提升利用率,还会导致大量线程看空闲,利用率反而降低了。最好的方法还是用两个线程池把两者隔离,可以合理地使用各自线程池的资源。


为了记录慢 job 的超时次数,代码中使用一个 map(变量 jobTimeoutCountMap )来记录一分钟内 job 超时次数,key 值是 job_id,value 是超时次数。在调用 XxlJobTrigger.trigger() 方法之前,会先判断 map 中,该 job_id 的超时次数是否大于 10,如果大于10,就是使用 slowTriggerPool,代码如下:

1
2
3
4
5
6
7
8
9
java复制代码// 属性变量
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

// 选择线程池,如果在一分钟内调度超过10次,使用 slowTriggerPool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {
triggerPool_ = slowTriggerPool;
}

调用 XxlJobTrigger.trigger() 方法后,根据两个值来更新 jobTimeoutCountMap 的值:

  • 当前时间与上次调用是否在一分钟以内,如果不在一分钟以内,就清空 map;
  • 本次 XxlJobTrigger.trigger() 的调用是否超过 500 毫秒,如果超过 500 毫秒,就在 map 中增加 job_id 的超时次数;

和上面的代码相结合,一个 job 在一分钟内有10次调用超过 500 毫秒,就认为该 job 是一个 频繁调度且耗时的 job。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码// 属性变量,初始值等于 JobTriggerPoolHelper 对象构造时的分钟数
// 每次调用 XxlJobTrigger.trigger() 方法时,值等于上一次调用的分钟数
private volatile long minTim = System.currentTimeMillis() / 60000;

// 当前时间的分钟数,如果和前一次调用不在同一分钟内,就清空 jobTimeoutCountMap
long minTim_now = System.currentTimeMillis() / 60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}

// 开始调用 XxlJobTrigger.trigger() 的时间
long start = System.currentTimeMillis();

// ... 调用 XxlJobTrigger.trigger() 方法
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

// 如果用时超过 500 毫秒,就增加一次它的慢调用次数
long cost = System.currentTimeMillis() - start;
if (cost > 500) {
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}

XxlJobTrigger.trigger() 方法在下面的 XxlJobTrigger 类中有详细介绍,这里只需要知道它会对一个 job 发起一次执行请求。

在该类中,属性变量 minTim 和 jobTimeoutCountMap 都使用 volatile 来修饰,保证了并发调用 addTrigger 时数据的一致性和可见性。

问题:为什么要每分钟清空一次 map 中的数据?

admin 服务发起 job 调度请求时,是在静态方法 public static void trigger() 中调用静态变量 private static JobTriggerPoolHelper helper 的 addTrigger 方法来发起请求的。minTim 和 jobTimeoutCountMap 虽然不是 static 修饰的,但可以看做是全局唯一的(因为持有它们的对象是全局唯一的),因此这两个参数维护的是 admin 服务全局的调度时间和超时次数,为了避免记录的数据量过大,需要每分钟清空一次数据的操作。

JobRegistryHelper

executor 注册和下线的辅助类

admin 服务提供了接口给 executor 来注册和下线,另外,当 executor 长时间(90秒)没有发心跳时,要把 executor 自动下线。前一个功能通过暴露一个接口来接收请求,后一个功能需要开启一个线程,定时更新过期 executor 的状态。

xxl-job 为了提升 admin 服务的性能,在前一个功能的接口接收到 executor 的请求时,不是同步执行,而是在线程池中开启一个线程,异步执行 executor 的注册和下线请求。

JobRegistryHelper 类就负责管理这个线程池和定时线程的。

注册和下线

线程池的定义和初始化代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码// 注册或移除 executor 的线程池
private ThreadPoolExecutor registryOrRemoveThreadPool = null;

// 注册或移除线程池
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2, 10, 30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2000),
r -> new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode()),
(r, executor) -> {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match thread pool rejected handler(run now).");
});

线程池的核心线程数是 2,最大线程数是10,允许一个 2000 的队列,如果 executor 实例很多,会导致注册延迟的。当然,一般不会把2000个 executor 注册到同一个 admin 服务。

executor 实例在发起注册和下线请求时,会调用 AdminBizImpl 类的对应方法,该类的方法如下:


可以看到,AdminBizImpl 类的两个方法都是调用了 JobRegistryHelper 方法来实现,其中 JobRegistryHelper.registry 方法代码如下(registryRemove 代码与之相似):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public ReturnT<String> registry(RegistryParam registryParam) {
// 校验参数
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// 在线程池中创建线程
registryOrRemoveThreadPool.execute(() -> {
// update
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao()
.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(),
registryParam.getRegistryValue(), new Date());
// update 失败,insert
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao()
.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(),
registryParam.getRegistryValue(), new Date());
// 刷新,空方法
freshGroupRegistryInfo(registryParam);
}
});
return ReturnT.SUCCESS;
}

这两个方法是通过在线程池 registryOrRemoveThreadPool 中创建线程来异步执行请求,然后把数据更新或新建到数据表 xxl_job_registry 中。

更新和管理 Job_group

当 executor 注册到 admin 服务后(数据入库到 xxl_job_registry 表),是不会在页面上显示的,需要要用户手动添加 job_group 数据(添加到 xxl_job_group 表),admin 服务会自动把用户添加的 job_group 数据与 xxl_job_registry 数据关联。这就需要 admin 定时从 xxl_job_group 表读取数据,关联 xxl_job_registry 表和 xxl_job_group 表的数据。

这个功能是与 “executor 自动下线” 功能在同一个线程中实现,该线程的主要逻辑是:

  • 从 xxl_job_group 表查询出 “自动设置 address” 的 group 列表,如果 group 列表不为空,才继续向下执行;
  • 从 xxl_job_registry 表删除不再存活(90秒内都没有更新)的记录,避免无效记录影响后续操作;
  • 从 xxl_job_registry 表取出存活的记录,根据 appName 设置 xxl_job_group 记录的 address_list 值,多个 address 使用逗号拼接;
  • sleep 30 秒,这个线程每 30 秒执行一次。

相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码// 注册监视器线程
private Thread registryMonitorThread;

// 停止标志位
private volatile boolean toStop = false;


// 自动注册的 job group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);

// 删除已经下线(90 秒内没有心跳)的注册 admin/executor
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids != null && ids.size() > 0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}

// 刷线还在线(90秒内有心跳)的 admin/executor 的地址 map<appName,List<address>>
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
HashMap<String, List<String>> appAddressMap = new HashMap<>(list.size());
// 略...


// 每 30 秒执行一次
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

从这里可以看出,如果是对外接口(接收请求等)的功能,使用线程池和异步线程来实现;如果是一些自动任务,则是通过一个线程来定时执行。

JobFailMonitorHelper

Job 执行失败的监视线程辅助类

如果一个 Job 调度后,没有响应返回,需要定时重试。作为一种“自动执行”的任务,很显然可以像前面 JobRegistryHelper 一样,使用一个线程定时重试。

在这个类中,定义了一个监视线程,以每10 秒一次的频率运行,对失败的 job 进行重试。如果 job 剩余的重试次数大于0,就会 job 进行重试,并把发送告警信息。线程的定义如下:

1
2
3
4
java复制代码/**
* 监视器线程
*/
private Thread monitorThread;

这里需要关注的问题是:当 admin 服务是集群部署时(共用一个数据库),怎么避免一个 job 被多个实例多次重试?需要有一个“分布式锁”。

加锁

在这个线程中,它利用 “数据库执行 UPDATE 语句时会加上互斥锁” 的特性,使用了 “基于数据库的分布式锁”,代码如下所示:

1
2
3
4
5
6
7
8
9
10
java复制代码// UPDATE 语句给该条记录加互斥锁,如果能加上,说明没有其他线程在修改该记录,也说明该记录还没被修改过
// 设置新值 -1,表示该记录已经被加锁了
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao()
.updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}

// 解锁
xlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId,-1, newAlarmStatus);

在这个语句中,会把 jobLog 的状态设置为 -1,这是一个无效状态值,当其他线程通过有效状态值来搜索失败记录时,会略过该记录,这样该记录就不会被其他线程重试,达到的分布式锁的功能(这个锁是一个行锁)。或者说,-1状态类似于 java 中的对象头的锁标志位,表明该记录已经被加锁了,其他线程会“忽略”该记录。

问题:这里的加锁解锁代码有什么问题?

在 try 代码块中加锁和解锁,如果加锁后重试时抛出异常,会导致该记录永远无法解锁。所以,应该在 finnally 块中执行解锁操作,或者使用 redis 给锁加一个过期时间来实现分布式锁。

重试

从失败的日志中取出 jobId,查询出对应的 jobInfo 数据,如果日志中的剩余重试次数大于 0,就执行重试。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码// 取出失败的日志和对应的 job
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

// 1、重试失败的 job,更新日志的 trigger_msg 字段值
if (log.getExecutorFailRetryCount() > 0) {
// 调度任务调度
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY,
(log.getExecutorFailRetryCount() - 1),
log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_type_retry") + "<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
// 更新 log
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}

// 2、如果 job 不为空,说明存在失败的任务,发送告警消息
// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
int newAlarmStatus = 0;
if (info != null) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult ? 2 : 3;
} else {
newAlarmStatus = 1;
}

// 3、更新 jobLog 的 alarm_status 值
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);

调度任务使用的就是前面介绍的 JobTriggerPoolHelper.trigger 方法,最后更新 jobLog 的 alarm_status 值,有两个作用:

  • 释放分布式锁;
  • 日志记录的 alarm_status 被设置为大于 0 的值,不会再被作为失败日志查询出来(findFailJobLogIds 方法的查询条件之一是 alarm_status == 0),避免了在下一次线程执行时再被重试。

JobCompleteHelper

Job 完成线程的辅助类

这个类与 JobRegistryHelper 类似,都有一个线程池、一个线程,通过前面 JobRegistryHelper 的学习,可以大胆猜测:

  • 线程池用来创建线程,处理接收到的请求;
  • 线程用来执行执行一些定“定时任务”。

实际上,该类中线程池和线程的作用就是用来 “完成” 一个 job。

接收回调

当 executor 接收到 admin 的调度请求后,会异步执行 job,并立刻返回一个回调。

admin 接受到回调后,和前面的 “注册、下线” 一样,在线程池中创建线程来处理回调,主要是更新 job 和日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码/**
* 接收回调请求的线程池
*/
private ThreadPoolExecutor callbackThreadPool = null;

// 初始化线程池
callbackThreadPool = new ThreadPoolExecutor(
2, 20, 30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3000),
r -> new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode()),
(r, executor) -> {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
});

当有回调请求时,public callback 方法(该方法被 AdminBizImpl 调用)会在线程池中创建一个线程,遍历回调请求的参数列表,依次处理回调参数,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码// 在线程池中创建线程处理回调参数
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {

callbackThreadPool.execute(new Runnable() {
@Override
public void run() {
for (HandleCallbackParam handleCallbackParam : callbackParamList) {
ReturnT<String> callbackResult = callback(handleCallbackParam);
// ...
}
}
});
return ReturnT.SUCCESS;
}

private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
// valid log item
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
// 更新 log 数据,略...

// 更新和完成 job
XxlJobCompleter.updateHandleInfoAndFinish(log);

return ReturnT.SUCCESS;
}

callBack 的调用顺序:JobApiController -> AdminBizImpl -> public allback -> private callback.

从代码可以看出,最后调用 XxlJobCompleter.updateHandleInfoAndFinish 方法完成回调逻辑。

更新 Job

如果一个 job 较长时间前被调度,但是一直处于 “运行中” 且它所属的 executor 已经超过 90 秒没有心跳了,那么可以认为该 job 已经丢失了,需要把该 job 结束掉。这个就是线程 monitorThread 的主要功能。

monitorThread 会以 60秒 一次的频率,从 xxl_job_log 表中找出 10分钟前调度、仍处于”运行中“状态、executor 已经下线 的 job,然后调用 XxlJobCompleter.updateHandleInfoAndFinish 来更新 handler 的信息和结束 job,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码/**
* 监视 丢失job 的线程
*/
private Thread monitorThread;

// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记为失败
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

if (losedJobIds != null && losedJobIds.size() > 0) {
for (Long logId : losedJobIds) {

XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);

jobLog.setHandleTime(new Date());
// 设置 handler_code
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg(I18nUtil.getString("joblog_lost_fail"));
// 更新执行信息和结束 job
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
}
}

从代码可以看出,上面的两个功能最后都调用了 XxlJobCompleter.updateHandleInfoAndFinish 方法,关于该方法的介绍,可以看后面 XxlJobCompleter 部分的介绍,这里不详细展开。

JobLogReportHelper

Job 日志统计辅助类

如果去看 XxlJobTrigger.triger 方法,会发现每次调度 job 时,都会先新增一个 jobLog 记录,这也是为什么 JobFailMonitorHelper 中的线程在重试时,先查询 jobLog 的原因。

JobLog 作为 job 的调度记录,还可以用来统计一段时间内 job 的调度次数、成功数等;另外,会清理超出有效期(配置的参数 logretentiondays)的日志,避免日志数据过大。很显然,这又是一个 ”自动任务“,可以使用一个线程定时完成。

该类持有一个线程变量,线程以 每分钟一次的频率,执行两个操作:

  • 统计一段时间的 job 数据,主要统计指标有:总的调度次数、处于调度运行中的次数、调度失败的次数、调度成功的次数;
  • 清理过期的日志数。

在线程 run 方法的前半部分,线程会统计 3 天内,每天的调度次数、运行次数、成功运行数、失败次数;然后更新或新增 xxl_job_log_report 表的数据。

清理日志

在线程 run 方法的后半部分,线程按天对日志进行清理,如果当前时间与上次清理的时间相隔超过一天,就会清理日志记录,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 根据上次执行时间、配置的过期参数,来决定是否执行清理
// 上次清理时间与当前超过1天才清理
long lastCleanLogTime = 0;


if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays() > 0
&& System.currentTimeMillis() - lastCleanLogTime > 24 * 60 * 60 * 1000) {
// 清理的开始时间... 略

// 开始清理日志
List<Long> logIds = null;
do {
logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(
0, 0, clearBeforeTime, 0, 1000);
if (logIds != null && logIds.size() > 0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
}
} while (logIds != null && logIds.size() > 0);

// 更新执行清理操作的时间
lastCleanLogTime = System.currentTimeMillis();
}

问题:为什么要用 lastCleanLogTime 记录上一次的清理时间?每次执行时,不能直接清理一天前创建的数据吗?

如果不使用参数 lastCleanLogTime 来记录上次清理的时间,只是清理一天前创建的数据记录。那么该线程每分钟执行一次时,都会删除前天当前时刻的数据,导致前一年的数不完整。

使用参数 lastCleanLogTime 来记录上次清理的时间,并且与当前时间相差超过一天时才清理,能保证前一天的日志是完整的。

问题:为什么每次只删除 1000 条日志?

不明白为什么清理日志时,不是一次性删除全部的过期日志,而是每次删除 1000条。按理说,这些旧的日志数据应该已经不在 buffer pool 中了,trigger_time 字段又是普通索引,那么 DELETE 操作会先更新到 change buffer 中,之后再合并。现在先查询再删除,相当于多了一次 IO 且没有使用到 change buffer。

JobScheduleHelper

Job 调度辅助类

admin 服务是用来管理和调度 job 的,用户也可以在它的管理后台新建一个 job,配置 CRON 和 JobHandler,然后 admin 服务就会按照配置的参数来调度 job。很显然,这种“自动化工作”也是由线程定时执行的。

1、如果使用线程调度 Job,存在的第一个问题是:如果某个 Job 在调度时比较耗时,就可能阻塞后续的 Job,导致后续 job 的执行有延迟,怎么解决这个问题?

在前面 JobTriggerPoolHelper 我们已经知道,admin 在调度 job 时是 ”使用线程池、线程“ 异步执行调度任务,避免了主线程的阻塞。

2、使用线程定时调度 job,存在的第二个问题是:怎么保证 job 在指定的时间执行,而不会出现大量延迟?

admin 使用 ”预读“ 的方式,提前读取在未来一段时间内要执行的 job,提前取到内存中,并使用 “时间轮算法” 按时间分组 job,把未来要执行的 job 下一个时间段执行。

3、还隐藏第三个问题:admin 服务是可以多实例部署的,在这种情况下该怎么避免一个 job 被多个实例重复调度?

admin 把一张数据表作为 “分布式锁” 来保证只有一个 admin 实例能执行 job 调度,又通过随机 sleep 线程一段时间,来降低线程之间的竞争。

下面我们就通过代码来了解 xxl-job 是怎么解决上述问题的。

调度线程

在该类中,定义了一个调度线程,用来调度要执行的 job 和已经过期一段时间的 job,定义代码如下:

1
2
3
4
5
6
7
8
java复制代码/**
* 预读的毫秒数
*/
public static final long PRE_READ_MS = 5000;
/**
* 预读和调度过期任务的线程
*/
private Thread scheduleThread;

预读

下面代码中的 pushTimeRing,是把 job 添加到一个 map 对象 ringData 中,然后让另一个线程从该 map 对象中取出,再次调度

该线程会预读出 “下次执行时间 <= now + 5000 毫秒内” 的部分 job,根据它们下一次执行时间划分成三段,执行三种不同的逻辑。

1、下次执行时间在 (−∞,now−5000)(-∞, now - 5000)(−∞,now−5000) 范围内

说明过期时间已经大于 5000 毫秒,这时如果过期策略要求调度,就调度一次。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// 调度一次
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
}
// 更新下一次执行时间
refreshNextValidTime(jobInfo, new Date());
}

2、下次执行时间在 [now−5000,now)[now - 5000, now)[now−5000,now) 范围内

说明过期时间小于5000毫秒,只能算是延迟不能算是过期,直接调度一次,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码if (nowTime > jobInfo.getTriggerNextTime()) {
// 1、 调度一次
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
// 2、 更新下一次调度时间
refreshNextValidTime(jobInfo, new Date());

// 3、 如果当前 job 处于 ”可以被调度“ 的状态,且下一次执行时间在 5000 毫秒内,就记录下 job Id,等待后面轮询
if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 下次调度的时刻:秒
int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
// 保存进 ringData 中
pushTimeRing(ringSecond, jobInfo.getId());
// 刷新下一次的调度时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}

如果 job 的下一次执行时间在 5000 毫秒以内,为了省下下次预读的 IO 耗时,这里会记录下 job id,等待后面的调度。

3、下次执行时间在 [now,now+5000)[now, now + 5000)[now,now+5000) 范围内

说明还没到执行时间,先记录下 job id,等待后面的调度,代码如下:

1
2
3
java复制代码int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
pushTimeRing(ringSecond, jobInfo.getId());
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

上面的3个步骤结束后,会更新 jobInfo 的 trigger_last_time、trigger_next_time、trigger_status 字段:

1
2
3
4
java复制代码// 更新 job 数据
for (XxlJobInfo jobInfo : scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}

可以看到,通过预读,一方面会把过期一小段时间的 job 执行一遍,另一方面会把未来一小段时间内要执行的 job 取出,保存进一个 map 对象 ringData 中,等待另一个线程调度。这样就避免了某些 job 到了时间还没执行。

分布式锁

因为 admin 是可以多实例部署的,所以在调度 job 时,需要考虑怎么避免 job 被多次调度。

xxl-job 在前面 JobFailMonitorHelper 中遍历失败的 job 时,会对每个 job 设置一个无效的状态作为 ”分布式行锁“,如果设置失败就跳过。而在这里,如果还使用该方法,有可能出现,一个 job 被设置为无效状态后,线程就崩溃了,导致该 job 永远无法被调度。因此,要尽量避免对 job 状态的修改。

在这里,admin 服务使用一张表 xxl_job_lock 作为分布式锁,每个 admin 实例都要先尝试获取该表的锁,获取成功才能继续执行;同时,为了降低不同实例之间的竞争,会在线程开始执勤随机 sleep 一段时间。

如何获取分布式锁?

在线程中会开启一个事务,设置为手动提交,然后对表 xxl_job_lock 执行 FOR UPDATE 查询。如果该线程执行语句成功,其他实例的线程就会排队等待该表的锁,实现了分布式锁功能。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码// 获取数据库链接,通过 SELECT FOR UPDATE 来尝试获取 X锁
// 在事务提交前一直持有该锁,其他实例的线程想获取该锁就会失败,并且会排队等待,直到第一个事务提交释放或锁超时
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
preparedStatement.execute();


// 提交事务,释放 X锁
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
// ...
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
// ...
}
try {
conn.close();
} catch (SQLException e) {
// ...
}
}

怎么降低锁的竞争?

为了降低锁竞争,在线程开始前会先 sleep 4000~5000 毫秒的随机值(不能大于 5000 毫秒,5000 毫秒是预读的时间范围);在线程结束当前循环时,会根据耗时和是否有预读数据,选择不同的 sleep 策略:

  • 耗时超过1000 毫秒,不sleep,直接开始下一次循环;
  • 耗时小于1000 毫秒,根据是否有预读数据,sleep 一个大小不同的随机时长:
    • 有预读数据,sleep 时间短一些,在 0~1000 毫秒范围内;
    • 没有预读数据,sleep 时间长一些,在 0~4000 毫秒范围内;

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码try {
// 随机 sleep 4000~5000 毫秒,通过这种方式,降低多实例部署时对锁的竞争
// 这里也看出来,最多部署 5000 台实例 ==..==
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}


// 耗时超过 1000 毫秒,就不 sleep
// 不超过 1000 ms,就 sleep 一个随机时长
long cost = System.currentTimeMillis() - start;
if (cost < 1000) {
try {
// 没有预读数据,就 sleep 时间长一点;有预读数据,就 sleep 时间短一些
TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}

ringThread:时间轮

在前面的线程中,对即将要开始的 job,不是立刻调度,而是按照执行的时刻(秒),把 job id 保存进一个 map 中,然后由 ringThread 线程按时刻进行调度,这只典型的“时间轮算法”。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码/**
* 调度线程2
*/
private Thread ringThread;
/**
* 按时刻(秒)调度 job
*/
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

// 调度过程

List<Integer> ringItemData = new ArrayList<>();
// 每次取出 2 个时刻的 job 来调度
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// 遍历 job Id,执行调度
if (ringItemData.size() > 0) {
for (int jobId : ringItemData) {
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
ringItemData.clear();
}

每次轮询调度时,只取出当前时刻(秒)、前一秒内的 job,不会去调度与现在相隔太久的 job。

在执行轮询调度前,有一个时间在 0~1000 毫秒范围内的 sleep。如果没有这个 sleep,该线程会一直执行,而 ringData 中当前时刻(秒)的数据可能已经为空,会导致大量无效的操作;增加了这个 sleep 之后,可以避免这种无效的操作。之所以 sleep 时间在 1000 毫秒以内,是因为调度时刻最小精确到秒,一秒的 sleep 可以避免 job 的延迟。

1
2
3
4
5
6
7
java复制代码try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}

问题:为什么在时间轮调度时,没有加分布式锁?

因为在前面的 scheduleThread 线程中,最后一个操作是把 job 的 next_trigger_time 值更新为大于 now + 5000 毫秒,其他 admin 实例 scheduleThread 线程的查询条件是:next_trigger_time < now + 5000,不会查询出这里调度的 job,所以不需要加分布式锁。

至此,XxlJobScheduler-init 方法的作用我们介绍完毕,下面我们简单介绍一下 XxlJobScheduler-destroy 方法


XxlJobScheduler-destroy

destroy 方法很简单,就是销毁前面初始化的线程池和线程,它销毁的顺序与前面启动的顺序相反。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码/**
* 销毁,销毁过程与 init 顺序相反
*/
public void destroy() throws Exception {

// 1、销毁 调度线程
JobScheduleHelper.getInstance().toStop();

// 2、销毁 日志统计和清理线程
JobLogReportHelper.getInstance().toStop();

// 3、销毁 丢失 job 监视器线程
JobCompleteHelper.getInstance().toStop();

// 4、销毁 失败 job 监视器线程
JobFailMonitorHelper.getInstance().toStop();

// admin registry stop
JobRegistryHelper.getInstance().toStop();

// admin trigger pool stop
JobTriggerPoolHelper.toStop();
}

因为各个 toStop 方法都很相似,所以我们只介绍 JobScheduleHelper 的 toStop 方法。

该方法的步骤如下:

1、设置停止标志位为 true;

2、sleep 一段时间,让出 CPU 时间片给线程执行任务;

3、如果线程不是终止状态(线程正在 sleep),中断它;

4、线程执行 join 方法,直到线程结束,执行最后一次。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码scheduleThreadToStop = true;
// 给线程 1s 的时间去执行任务
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
// 如果线程不是终止状态,就让它执行完所有任务
if (scheduleThread.getState() != Thread.State.TERMINATED) {
scheduleThread.interrupt();
try {
scheduleThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}

至此,JobScheduleHelper 的主要功能就介绍完了,可以看出, admin 服务在启动时,启动了多个线程池和线程,异步执行任务和异步响应 executor 的请求。

下面,我们介绍前面涉及到的 XxlJobTrigger 和 XxlJobCompleter。


XxlJobTrigger

调度 job 时的封装类

XxlJobTrigger 是调度 job 时的封装类,它主要工作就是接受传入的 jobId、调度参数等,查询对应的 jobGroup、jobInfo,然后调用 ExecutorBiz 对象来执行调度(run 方法)。

注意:这个类本身不会执行 http 请求,http 请求是在 core 包下的工具类 XxlJobRemotingUtil 中执行的。

该类中三个核心方法及其调用关系如下:trigger -> processTrigger -> runExecutor,

trigger

该方法的功能比较简单,就是根据传入的参数查询 jobGroup 和 jobInfo 对象,设置相关的字段值,然后调用 processTrigger 方法。

processTrigger

该方法的主要工作分为以下几步:

1、保存一条调度日志;

2、从 jobInfo、jobGroup 中取出字段值,构造 TriggerParam 对象;

3、根据 jobInfo 的路由策略,从 jobGroup 中取出要调度的 executor 地址;

4、调用 runExecutor 方法执行调度;

5、保存调度参数、设置调度信息、更新日志。

这里不会修改 jobInfo、jobGroup 对象的字段值,只取出字段值来使用,对这两个对象字段的修改,是在前一步 trigger方法中进行的。

runExecutor

该方法会执行调度,并返回调度结果,它的核心代码如下:

1
2
java复制代码ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);

这里使用 XxlJobScheduler 类取出 ExecutorBiz 对象,以 “懒加载” 的方式给每个 address 创建一个 ExecutorBiz 对象,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();

public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address == null || address.trim().length() == 0) {
return null;
}

// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}

// set-cache
executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());

executorBizRepository.put(address, executorBiz);
return executorBiz;
}

吐槽一句:这个功能完全可以放在 XxlJobTrigger 类中、或者封装在 ExecutorBiz 内部,不知道为什么要放在 XxlJobScheduler,平白无故多了一层调用。

可以看出,该类中的三个方法其实可以归类为:pre -> execute -> post,在执行前、执行时、执行后做一些前置和收尾工作。

XxlJobCompleter

job 的完成类

该类在前面 JobCompleteHelper 中被使用,最终 job 的完成就是在该类中执行的,该类有两个主要方法:

  • updateHandleInfoAndFinish:公共方法,调用 finishJob 方法和更新日志;
  • finishJob:私有方法,执行子任务和更新日志;

下面主要介绍 finishJob 方法。

finishJob

finishJob 的主要功能是:如果当前任务执行成功了,就调度它的所有子任务,最后把子任务的调度消息添加到当前 job 的日志中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码private static void finishJob(XxlJobLog xxlJobLog) {
// 1、job 执行成功,开始调度子任务handle success, to trigger child job
StringBuilder triggerChildMsg = null;
if (XxlJobContext.HANDLE_CODE_SUCCESS == xxlJobLog.getHandleCode()) {
XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());
if (xxlJobInfo != null && xxlJobInfo.getChildJobId() != null && xxlJobInfo.getChildJobId().trim().length() > 0) {
// 2、遍历子任务ID
String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
for (int i = 0; i < childJobIds.length; i++) {
int childJobId = (childJobIds[i] != null && childJobIds[i].trim().length() > 0 && isNumeric(childJobIds[i])) ? Integer.parseInt(childJobIds[i]) : -1;
if (childJobId > 0) {
// 3、调度子任务
JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);
ReturnT<String> triggerChildResult = ReturnT.SUCCESS;
// 4、添加日志信息....略
}
}
}
// 5、保存子任务的调度消息到日志
if (triggerChildMsg != null) {
xxlJobLog.setHandleMsg(xxlJobLog.getHandleMsg() + triggerChildMsg);
}
}

需要注意的是:

1、这里依赖于 JobTriggerPoolHelper 来调度 job,所以在 JobCompleteHelper 的监视线程开始时,有一个 50 秒的等待,就是等待 JobTriggerPoolHelper 启动完成;

2、在 finishJob 方法中,调度子任务的时候,默认子任务的调度结果是成功,注意,这里是指 “调度” 这个行为是成功的,而不是指子任务执行是成功的。

总结

1、XxlJobAdminConfig 作为 admin 服务的启动入口,要尽可能保持简洁,作用类似于一个仓库,来管理和持有所有的类和对象,并不会去启动具体的线程,它只需要“按下启动器的按钮”就可以了;

2、XxlJobScheduler 是 admin 服务的启动器类,它会调用各个辅助类(xxxHelper)来启动对应的线程;

3、对外的接口,比如调度 job、接收注册或下线等,都是使用线程池 + 线程 的异步方式实现,避免 job 对主线程的阻塞;

4、对“自动任务“类的功能,都是使用线程定时执行;

参考阅读

XXL-JOB分布式调度框架全面详解

时间轮算法

一个开源的时间轮算法介绍

原文链接:mp.weixin.qq.com/s/Qti9UEpUL…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…134135136…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%