开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

曲线点抽稀算法- Python 实现 何为抽稀 道格拉斯-普

发表于 2017-11-27

何为抽稀

在处理矢量化数据时,记录中往往会有很多重复数据,对进一步数据处理带来诸多不便。多余的数据一方面浪费了较多的存储空间,另一方面造成所要表达的图形不光滑或不符合标准。因此要通过某种规则,在保证矢量曲线形状不变的情况下, 最大限度地减少数据点个数,这个过程称为抽稀。

通俗的讲就是对曲线进行采样简化,即在曲线上取有限个点,将其变为折线,并且能够在一定程度保持原有形状。比较常用的两种抽稀算法是:道格拉斯-普克(Douglas-Peuker)算法和垂距限值法。

道格拉斯-普克(Douglas-Peuker)算法

Douglas-Peuker算法(DP算法)过程如下:

  • 1、连接曲线首尾两点A、B;
  • 2、依次计算曲线上所有点到A、B两点所在曲线的距离;
  • 3、计算最大距离D,如果D小于阈值threshold,则去掉曲线上出A、B外的所有点;如果D大于阈值threshold,则把曲线以最大距离分割成两段;
  • 4、对所有曲线分段重复1-3步骤,知道所有D均小于阈值。即完成抽稀。

这种算法的抽稀精度与阈值有很大关系,阈值越大,简化程度越大,点减少的越多;反之简化程度越低,点保留的越多,形状也越趋于原曲线。

下面是Python代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
复制代码# -*- coding: utf-8 -*-
"""
-------------------------------------------------
  File Name:    DouglasPeuker
  Description :  道格拉斯-普克抽稀算法
  Author :        J_hao
  date:          2017/8/16
-------------------------------------------------
  Change Activity:
                  2017/8/16: 道格拉斯-普克抽稀算法
-------------------------------------------------
"""
from __future__ import division
 
from math import sqrt, pow
 
__author__ = 'J_hao'
 
THRESHOLD = 0.0001  # 阈值
 
 
def point2LineDistance(point_a, point_b, point_c):
    """
    计算点a到点b c所在直线的距离
    :param point_a:
    :param point_b:
    :param point_c:
    :return:
    """
    # 首先计算b c 所在直线的斜率和截距
    if point_b[0] == point_c[0]:
        return 9999999
    slope = (point_b[1] - point_c[1]) / (point_b[0] - point_c[0])
    intercept = point_b[1] - slope * point_b[0]
 
    # 计算点a到b c所在直线的距离
    distance = abs(slope * point_a[0] - point_a[1] + intercept) / sqrt(1 + pow(slope, 2))
    return distance
 
 
class DouglasPeuker(object):
    def __init__(self):
        self.threshold = THRESHOLD
        self.qualify_list = list()
        self.disqualify_list = list()
 
    def diluting(self, point_list):
        """
        抽稀
        :param point_list:二维点列表
        :return:
        """
        if len(point_list) < 3:
            self.qualify_list.extend(point_list[::-1])
        else:
            # 找到与收尾两点连线距离最大的点
            max_distance_index, max_distance = 0, 0
            for index, point in enumerate(point_list):
                if index in [0, len(point_list) - 1]:
                    continue
                distance = point2LineDistance(point, point_list[0], point_list[-1])
                if distance > max_distance:
                    max_distance_index = index
                    max_distance = distance
 
            # 若最大距离小于阈值,则去掉所有中间点。 反之,则将曲线按最大距离点分割
            if max_distance < self.threshold:
                self.qualify_list.append(point_list[-1])
                self.qualify_list.append(point_list[0])
            else:
                # 将曲线按最大距离的点分割成两段
                sequence_a = point_list[:max_distance_index]
                sequence_b = point_list[max_distance_index:]
 
                for sequence in [sequence_a, sequence_b]:
                    if len(sequence) < 3 and sequence == sequence_b:
                        self.qualify_list.extend(sequence[::-1])
                    else:
                        self.disqualify_list.append(sequence)
 
    def main(self, point_list):
        self.diluting(point_list)
        while len(self.disqualify_list) > 0:
            self.diluting(self.disqualify_list.pop())
        print self.qualify_list
        print len(self.qualify_list)
 
 
if __name__ == '__main__':
    d = DouglasPeuker()
    d.main([[104.066228, 30.644527], [104.066279, 30.643528], [104.066296, 30.642528], [104.066314, 30.641529],
            [104.066332, 30.640529], [104.066383, 30.639530], [104.066400, 30.638530], [104.066451, 30.637531],
            [104.066468, 30.636532], [104.066518, 30.635533], [104.066535, 30.634533], [104.066586, 30.633534],
            [104.066636, 30.632536], [104.066686, 30.631537], [104.066735, 30.630538], [104.066785, 30.629539],
            [104.066802, 30.628539], [104.066820, 30.627540], [104.066871, 30.626541], [104.066888, 30.625541],
            [104.066906, 30.624541], [104.066924, 30.623541], [104.066942, 30.622542], [104.066960, 30.621542],
            [104.067011, 30.620543], [104.066122, 30.620086], [104.065124, 30.620021], [104.064124, 30.620022],
            [104.063124, 30.619990], [104.062125, 30.619958], [104.061125, 30.619926], [104.060126, 30.619894],
            [104.059126, 30.619895], [104.058127, 30.619928], [104.057518, 30.620722], [104.057625, 30.621716],
            [104.057735, 30.622710], [104.057878, 30.623700], [104.057984, 30.624694], [104.058094, 30.625688],
            [104.058204, 30.626682], [104.058315, 30.627676], [104.058425, 30.628670], [104.058502, 30.629667],
            [104.058518, 30.630667], [104.058503, 30.631667], [104.058521, 30.632666], [104.057664, 30.633182],
            [104.056664, 30.633174], [104.055664, 30.633166], [104.054672, 30.633289], [104.053758, 30.633694],
            [104.052852, 30.634118], [104.052623, 30.635091], [104.053145, 30.635945], [104.053675, 30.636793],
            [104.054200, 30.637643], [104.054756, 30.638475], [104.055295, 30.639317], [104.055843, 30.640153],
            [104.056387, 30.640993], [104.056933, 30.641830], [104.057478, 30.642669], [104.058023, 30.643507],
            [104.058595, 30.644327], [104.059152, 30.645158], [104.059663, 30.646018], [104.060171, 30.646879],
            [104.061170, 30.646855], [104.062168, 30.646781], [104.063167, 30.646823], [104.064167, 30.646814],
            [104.065163, 30.646725], [104.066157, 30.646618], [104.066231, 30.645620], [104.066247, 30.644621], ])

垂距限值法

垂距限值法其实和DP算法原理一样,但是垂距限值不是从整体角度考虑,而是依次扫描每一个点,检查是否符合要求。

算法过程如下:

  • 1、以第二个点开始,计算第二个点到前一个点和后一个点所在直线的距离d;
  • 2、如果d大于阈值,则保留第二个点,计算第三个点到第二个点和第四个点所在直线的距离d;若d小于阈值则舍弃第二个点,计算第三个点到第一个点和第四个点所在直线的距离d;
  • 3、依次类推,直线曲线上倒数第二个点。

下面是Python代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
复制代码# -*- coding: utf-8 -*-
"""
-------------------------------------------------
  File Name:    LimitVerticalDistance
  Description :  垂距限值抽稀算法
  Author :        J_hao
  date:          2017/8/17
-------------------------------------------------
  Change Activity:
                  2017/8/17:
-------------------------------------------------
"""
from __future__ import division
 
from math import sqrt, pow
 
__author__ = 'J_hao'
 
THRESHOLD = 0.0001  # 阈值
 
 
def point2LineDistance(point_a, point_b, point_c):
    """
    计算点a到点b c所在直线的距离
    :param point_a:
    :param point_b:
    :param point_c:
    :return:
    """
    # 首先计算b c 所在直线的斜率和截距
    if point_b[0] == point_c[0]:
        return 9999999
    slope = (point_b[1] - point_c[1]) / (point_b[0] - point_c[0])
    intercept = point_b[1] - slope * point_b[0]
 
    # 计算点a到b c所在直线的距离
    distance = abs(slope * point_a[0] - point_a[1] + intercept) / sqrt(1 + pow(slope, 2))
    return distance
 
 
class LimitVerticalDistance(object):
    def __init__(self):
        self.threshold = THRESHOLD
        self.qualify_list = list()
 
    def diluting(self, point_list):
        """
        抽稀
        :param point_list:二维点列表
        :return:
        """
        self.qualify_list.append(point_list[0])
        check_index = 1
        while check_index < len(point_list) - 1:
            distance = point2LineDistance(point_list[check_index],
                                          self.qualify_list[-1],
                                          point_list[check_index + 1])
 
            if distance < self.threshold:
                check_index += 1
            else:
                self.qualify_list.append(point_list[check_index])
                check_index += 1
        return self.qualify_list
 
 
if __name__ == '__main__':
    l = LimitVerticalDistance()
    diluting = l.diluting([[104.066228, 30.644527], [104.066279, 30.643528], [104.066296, 30.642528], [104.066314, 30.641529],
            [104.066332, 30.640529], [104.066383, 30.639530], [104.066400, 30.638530], [104.066451, 30.637531],
            [104.066468, 30.636532], [104.066518, 30.635533], [104.066535, 30.634533], [104.066586, 30.633534],
            [104.066636, 30.632536], [104.066686, 30.631537], [104.066735, 30.630538], [104.066785, 30.629539],
            [104.066802, 30.628539], [104.066820, 30.627540], [104.066871, 30.626541], [104.066888, 30.625541],
            [104.066906, 30.624541], [104.066924, 30.623541], [104.066942, 30.622542], [104.066960, 30.621542],
            [104.067011, 30.620543], [104.066122, 30.620086], [104.065124, 30.620021], [104.064124, 30.620022],
            [104.063124, 30.619990], [104.062125, 30.619958], [104.061125, 30.619926], [104.060126, 30.619894],
            [104.059126, 30.619895], [104.058127, 30.619928], [104.057518, 30.620722], [104.057625, 30.621716],
            [104.057735, 30.622710], [104.057878, 30.623700], [104.057984, 30.624694], [104.058094, 30.625688],
            [104.058204, 30.626682], [104.058315, 30.627676], [104.058425, 30.628670], [104.058502, 30.629667],
            [104.058518, 30.630667], [104.058503, 30.631667], [104.058521, 30.632666], [104.057664, 30.633182],
            [104.056664, 30.633174], [104.055664, 30.633166], [104.054672, 30.633289], [104.053758, 30.633694],
            [104.052852, 30.634118], [104.052623, 30.635091], [104.053145, 30.635945], [104.053675, 30.636793],
            [104.054200, 30.637643], [104.054756, 30.638475], [104.055295, 30.639317], [104.055843, 30.640153],
            [104.056387, 30.640993], [104.056933, 30.641830], [104.057478, 30.642669], [104.058023, 30.643507],
            [104.058595, 30.644327], [104.059152, 30.645158], [104.059663, 30.646018], [104.060171, 30.646879],
            [104.061170, 30.646855], [104.062168, 30.646781], [104.063167, 30.646823], [104.064167, 30.646814],
            [104.065163, 30.646725], [104.066157, 30.646618], [104.066231, 30.645620], [104.066247, 30.644621], ])
    print len(diluting)
    print(diluting)

最后

其实DP算法和垂距限值法原理一样,DP算法是从整体上考虑一条完整的曲线,实现时较垂距限值法复杂,但垂距限值法可能会在某些情况下导致局部最优。另外在实际使用中发现采用点到另外两点所在直线距离的方法来判断偏离,在曲线弧度比较大的情况下比较准确。如果在曲线弧度比较小,弯曲程度不明显时,这种方法抽稀效果不是很理想,建议使用三点所围成的三角形面积作为判断标准。下面是抽稀效果:

7164538
7164538

1 赞 1 收藏 评论

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

源码 并发一枝花之BlockingQueue 接口定义 实现

发表于 2017-11-27

今天来介绍Java并发编程中最受欢迎的同步类——堪称并发一枝花之BlockingQueue。

JDK版本:oracle java 1.8.0_102

继续阅读之前,需确保你对锁和条件队列的使用方法烂熟于心,特别是条件队列,否则你可能无法理解以下源码的精妙之处,甚至基本的正确性。本篇暂不涉及此部分内容,需读者自行准备。

接口定义

BlockingQueue继承自Queue,增加了阻塞的入队、出队等特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);

void put(E e) throws InterruptedException;

// can extends from Queue. i don't know why overriding here
boolean offer(E e);

boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

E take() throws InterruptedException;

// extends from Queue
// E poll();

E poll(long timeout, TimeUnit unit)
throws InterruptedException;

int remainingCapacity();

boolean remove(Object o);

public boolean contains(Object o);

int drainTo(Collection<? super E> c);

int drainTo(Collection<? super E> c, int maxElements);
}

为了方便讲解,我调整了部分方法的顺序,还增加了注释辅助说明。

需要关注的是两对方法:

  • 阻塞方法BlockingQueue#put()和BlockingQueue#take():如果入队(或出队,下同)失败(如希望入队但队列满,下同),则等待,一直到满足入队条件,入队成功。
  • 非阻塞方法BlockingQueue#offer()和BlockingQueue#poll(),及它们的超时版本:非超时版本是瞬时动作,如果入队当前入队失败,则立刻返回失败;超时版本可在此基础上阻塞一段时间,相当于限时的BlockingQueue#put()和BlockingQueue#take()。

实现类

BlockingQueue有很多实现类。根据github的code results排名,最常用的是LinkedBlockingQueue(253k)和ArrayBlockingQueue(95k)。LinkedBlockingQueue的性能在大部分情况下优于ArrayBlockingQueue,本文主要介绍LinkedBlockingQueue,文末会简要提及二者的对比。

LinkedBlockingQueue

阻塞方法put()和take()

两个阻塞方法相对简单,有助于理解LinkedBlockingQueue的核心思想:在队头和队尾各持有一把锁,入队和出队之间不存在竞争。

前面在Java实现生产者-消费者模型中循序渐进的引出了BlockingQueue#put()和BlockingQueue#take()的实现,可以先去复习一下,了解为什么LinkedBlockingQueue要如此设计。以下是更细致的讲解。

阻塞的入队操作put()

在队尾入队。putLock和notFull配合完成同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

现在触发一个入队操作,分情况讨论。

case1:入队前,队列非空非满(长度大于等于2)

入队前需得到锁putLock。检查队列非满,无需等待条件notFull,直接入队。入队后,检查队列非满(精确说是入队前“将满”,但不影响理解),随机通知一个生产者条件notFull满足。最后,检查入队前队列非空,则无需通知条件notEmpty。

注意点:

  • 入队前队列非空非满(长度大于等于2),则head和tail指向的节点不同,入队与出队操作不会同时更新同一节点也就不存在竞争。因此,分别用两个锁同步入队、出队操作才能是线程安全的。进一步的,由于入队已经由锁putLock保护,则enqueue内部实现不需要加锁。
  • 条件notFull可以只随机通知一个等待该条件的生产者线程(使用signal()而不是signalAll())。即“单次通知”,目的是减少无效竞争。但这不会产生“信号劫持”的问题,因为只有生产者在等待该条件。
  • 条件通知方法singal()是近乎“幂等”的:如果有线程在等待该条件,则随机选择一个线程通知;如果没有线程等待,则什么都不做,不会造成什么恶劣影响。
case2:入队前,队列满

入队前需得到锁putLock。检查队列满,则等待条件notFull。条件notFull可能由出队成功触发(必要的),也可能由入队成功触发(也是必要的,避免“信号不足”的问题)。条件notFull满足后,入队。入队后,假设检查队列满(队列非满的情况同case1),则无需通知条件notFull。最后,检查入队前队列非空,则无需通知条件notEmpty。

注意点:

  • “信号不足”问题:假设队列满时,存在3个生产者P1-P3(多于一个就可以)同时阻塞在10行;如果此时5个消费者C1-C5(多于一个就可以)快速、连续的出队,但最后只会有一个信号发出(19-20行在take()中的对偶逻辑,只会在队列之前消费前队列满的情况发出信号);一个信号只能唤醒一个生产者P1,但明显此时队列缺少了5个元素,该逻辑不足以唤醒P2、P3。因此,14-15行“入队完成时的通知”是必要的,保证了只要队列非满,每次入队后都能唤醒1个阻塞的生产者,来等待锁释放后竞争锁。即,P1完成入队后,如果检查到队列非满,会随机唤醒一个生产者P2,让P2在P1释放锁putLock后竞争锁,继续入队,P3同理。相比于signalAll()唤醒所有生产者,这种解决方案使得同一时间最多只有一个生产者在清醒的竞争锁,性能提升非常明显。

补充signalNotEmpty()、signalNotFull()的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
case3:入队前,队列空

入队前需得到锁putLock。检查队列空,则无需等待条件notFull,直接入队。入队后,如果队列非满,则同case1;如果队列满,则同case2。最后,假设检查入队前队列空(队列非空的情况同case1),则随机通知一个消费者条件notEmpty满足。

注意点:

  • 只有入队前队列空的情况下,才需要通知条件notEmpty满足。即“条件通知”,是一种减少无效通知的措施。因为如果队列非空,则出队操作不会阻塞在条件notEmpty上。另一方面,虽然已经有生产者完成了入队,但可能有消费者在生产者释放锁putLock后、通知条件notEmpty满足前,使队列变空;不过这没有影响,take()方法的while循环能够在线程竞争到锁之后再次确认。
  • 通过入队和出队前检查队列长度(while+await),隐含保证了队列空时只允许入队操作,不存在竞争队列。
case4:入队前,队列长度为1

case4是一个特殊情况,分析方法类似于case1,但可能入队与出队之间存在竞争,我们稍后分析。

阻塞的出队操作take()

在队头入队。takeLock和notEmpty配合完成同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

依旧是四种case,put()和take()是对偶的,很容易分析,不赘述。

“case4 队列长度为1”时的特殊情况

队列长度为1时,到底入队和出队之间存在竞争吗?这取决于LinkedBlockingQueue的底层数据结构。

最简单的是使用朴素链表,可以自己实现,也可以使用JDK提供的非线程安全集合类,如LinkedList等。但是,队列长度为1时,朴素链表中的head、tail指向同一个节点,从而入队、出队更新同一个节点时存在竞争。

朴素链表:一个节点保存一个元素,不加任何控制和trick。典型如LinkedList。

增加dummy node可解决该问题(或者叫哨兵节点什么的)。定义Node(item, next),描述如下:

  • 初始化链表时,创建dummy node:
    • dummy = new Node(null, null)
    • head = dummy.next // head 为 null <=> 队列空
    • tail = dummy // tail.item 为 null <=> 队列空
  • 在队尾入队时,tail后移:
    • tail.next = new Node(newItem, null)
    • tail = tail.next
  • 在队头出队时,dummy后移,同步更新head:
    • oldItem = head.item
    • dummy = dummy.next
    • dummy.item = null
    • head = dummy.next
    • return oldItem

在新的数据结构中,更新操作发生在dummy和tail上,head仅仅作为示意存在,跟随dummy节点更新。队列长度为1时,虽然head、tail仍指向同一个节点,但dummy、tail指向不同的节点,从而更新dummy和tail时不存在竞争。

源码中的head即为dummy,first即为head:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码...
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
...
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
...
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
...

enqueue和count自增的先后顺序

以put()为例,count自增一定要晚于enqueue执行,否则take()方法的while循环检查会失效。

用一个最简单的场景来分析,只有一个生产者线程T1,一个消费者线程T2。

如果先count自增再enqueue

假设目前队列长度0,则事件发生顺序:

  1. T1线程:count 自增
  2. T2线程:while 检查 count > 0,无需等待条件 notEmpty
  3. T2线程:dequeue 执行
  4. T1线程:enqueue 执行

很明显,在事件1发生后事件4发生前,虽然count>0,但队列中实际是没有元素的。因此,事件3 dequeue会执行失败(预计抛出NullPointerException)。事件4也就不会发生了。

如果先enqueue再count自增

如果先enqueue再count自增,就不会存在该问题。

仍假设目前队列长度0,则事件发生顺序:

  1. T1线程:enqueue 执行
  2. T2线程:while 检查 count == 0,等待条件 notEmpty
  3. T1线程:count 自增
  4. T1线程:通知条件notFull满足
  5. T1线程:通知条件notEmpty满足
  6. T2线程:收到条件notEmpty
  7. T2线程:while 检查 count > 0,无需等待条件 notEmpty
  8. T2线程:dequeue 执行

换个方法,用状态机来描述:

  • 事件E1发生前,队列处于状态S1
  • 事件E1发生,线程T1 增加了一个队列元素,导致队列元素的数量大于count(1>0),队列转换到状态S2
  • 事件E1发生后、直到事件E3发生前,队列一直处于状态S2
  • 事件E3发生,线程T1 使count自增,导致队列元素的数量等于count(1=1),队列转换到状态S1
  • 事件E3发生后、事件E8发生前,队列一直处于状态S1

很多读者可能第一次从状态机的角度来理解并发程序设计,所以猴子选择先写出状态迁移序列,如果能理解上述序列,我们再进行进一步的抽象。实际的状态机定义比下面要严谨的多,不过这里的描述已经足够了。

现在补充定义如下,不考虑入队和出队的区别:

  • 队列元素的数量等于count的状态定义为状态S1
  • 队列元素的数量大于count的状态定义为状态S2
  • enqueue操作定义为状态转换S1->S2
  • count自增操作定义为状态转换S2->S1

LinkedBlockingQueue中的同步机制保证了不会有其他线程看到状态S2,即,S1->S2->S1两个状态转换只能由线程T1连续完成,其他线程无法在中间插入状态转换。

在猴子的理解中,并发程序设计的本质是状态机,即维护合法的状态和状态转换。以上是一个极其简单的场景,用状态机举例子就可以描述;然而,复杂场景需要用状态机做数学证明,这使得用状态机描述并发程序设计不太受欢迎(虽然口头描述也不能算严格证明)。不过,理解实现中的各种代码顺序、猛不丁蹦出的trick,这些只是“知其所以然”;通过简单的例子来掌握其状态机本质,才能让我们了解其如何保证线程安全性,自己也能写出类似的实现,做到“知其然而知其所以然”。后面会继续用状态机分析ConcurrentLinkedQueue的源码,敬请期待。

非阻塞方法offer()和poll()

分析了两个阻塞方法put()、take()后,非阻塞方法就简单了。

瞬时版

以offer为例,poll()同理。假设此时队列非空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
case1:入队前,队列非满

入队前需得到锁putLock。检查队列非满(隐含表明“无需等待条件notFull”),直接入队。入队后,检查队列非满,随机通知一个生产者(包括使用put()方法的生产者,下同)条件notFull满足。最后,检查入队前队列非空,则无需通知条件notEmpty。

可以看到,瞬时版offer()在队列非满时的行为与put()相同。

case2:入队前,队列满

入队前需得到锁putLock。检查队列满,直接退出try-block。后同case1。

队列满时,offer()与put()的区别就显现出来了。put()通过while循环阻塞,一直等到条件notFull得到满足;而offer()却直接返回。

一个小point:

c在申请锁putLock前被赋值为-1。接下来,如果入队成功,会执行c = count.getAndIncrement();一句,则释放锁后,c的值将大于等于0。于是,这里直接用c是否大于等于0来判断是否入队成功。这种实现牺牲了可读性,只换来了无足轻重的性能或代码量的优化。自己在开发时,不要编写这种代码。

超时版

同上,以offer()为例。假设此时队列非空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

该方法同put()很像,12-13行判断nanos超时的情况(吞掉了timeout参数非法的异常情况),所以区别只有14行:将阻塞的notFull.await()换成非阻塞的超时版notFull.awaitNanos(nanos)。

awaitNanos()的实现有点意思,这里不表。其实现类中的Javadoc描述非常干练:“Block until signalled, interrupted, or timed out.”,返回值为剩余时间。剩余时间小于等于参数nanos,表示:

  1. 条件notFull满足(剩余时间大于0)
  2. 等待的总时长已超过timeout(剩余时间小于等于0)

nanos首先被初始化为timeout;接下来,消费者线程可能阻塞、收到信号多次,每次收到信号被唤醒,返回的剩余时间都大于0并小于等于参数nanos,再用剩余时间作为下次等待的参数nanos,直到剩余时间小于等于0。以此实现总时长不超过timeout的超时检测。

其他同put()方法。

12-13行判断nanos参数非法后,直接返回了false。实现有问题,有可能违反接口声明。

根据Javadoc的返回值声明,返回值true表示入队成功,false表示入队失败。但如果传进来的timeout是一个负数,那么5行初始化的nanos也将是一个负数;进而一进入while循环,就在13行返回了false。然而,这是一种参数非法的情况,返回false让人误以为参数正常,只是入队失败。这违反了接口声明,并且非常难以发现。

应该在函数头部就将参数非法的情况检查出来,相应抛出IllegalArgumentException。

LinkedBlockingQueue与ArrayBlockingQueue的区别

github上LinkedBlockingQueue和ArrayBlockingQueue的使用频率都很高。大部分情况下都可以也建议使用LinkedBlockingQueue,但清楚二者的异同点,方能对症下药,在针对不同的优化场景选择最合适的方案。

相同点:

  • 支持有界

不同点

  • LinkedBlockingQueue底层用链表实现:ArrayBlockingQueue底层用数组实现
  • LinkedBlockingQueue支持不指定容量的无界队列(长度最大值Integer.MAX_VALUE);ArrayBlockingQueue必须指定容量,无法扩容
  • LinkedBlockingQueue支持懒加载:ArrayBlockingQueue不支持
  • ArrayBlockingQueue入队时不生成额外对象:LinkedBlockingQueue需生成Node对象,消耗时间,且GC压力大
  • LinkedBlockingQueue的入队和出队分别用两把锁保护,无竞争,二者不会互相影响;ArrayBlockingQueue的入队和出队共用一把锁,入队和出队存在竞争,一方速度高时另一方速度会变低。不考虑分配对象、GC等因素的话,ArrayBlockingQueue并发性能要低于LinkedBlockingQueue

可以看到,LinkedBlockingQueue整体上是优于ArrayBlockingQueue的。所以,除非某些特殊原因,否则应优先使用LinkedBlockingQueue。

可能不全,欢迎评论,随时增改。

总结

没有。


本文链接:源码|并发一枝花之BlockingQueue
作者:猴子007
出处:monkeysayhi.github.io
本文基于 知识共享署名-相同方式共享 4.0 国际许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名及链接。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java模拟赛跑过程 需求 设计 && 实现 总结

发表于 2017-11-27

Java并发面试中的一个经典问题——手写代码模拟赛跑过程。该问题考查CountDownLatch的用法,比Java实现生产者-消费者模型的考查更直接:

  • 对Java并发模型的理解
  • 对Java并发编程接口(CountDownLatch)的熟练程度
  • 简化问题的能力
  • bug free
  • coding style

JDK版本:oracle java 1.8.0_102

需要注意是“简化问题的能力”。建议读者在继续阅读之前,自己先实现一个版本,之后再与文中的代码比较,看哪种思路更简洁,实现更美观。

需求

问题很经典,则需求概括如下:

  • 所有选手就位后,裁判鸣枪开始比赛
  • 选手到达终点或中途退赛后,记录成绩
  • 所有选手都已记录成绩后,结束比赛

再次建议读者自己先实现一个版本,再继续阅读。

设计 && 实现

设计

题目描述很清晰,现简化需求并设计如下:

1
复制代码选手逐渐就位 -> 开始比赛 -> 选手逐渐跑到终点或发生异常 -> 比赛结束

在三个连接处各使用一个CountDownLatch即可。

实现

三个连接处分别命名为ready、start、end,参赛者数量racerCnt,则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
复制代码public class Race {

public static void race(int racerCnt) throws InterruptedException {
CountDownLatch ready = new CountDownLatch(racerCnt);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(racerCnt);

for (int i = 0; i < racerCnt; i++) {
final int racerNo = i;
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("ready: " + racerNo);
ready.countDown();
try {
start.await();
System.out.println("start: " + racerNo);
} catch (InterruptedException e) {
System.out.println("I am hurt, and have to interrupt the race...");
Thread.currentThread().interrupt();
} finally {
System.out.println("end: " + racerNo);
end.countDown();
}
}
}).start();
}

ready.await();
System.out.println("********************** all ready!!! **********************");
System.out.println("********************** will start soon **********************");
start.countDown();
end.await();
System.out.println("********************** all end!!! **********************");
}

public static void main(String[] args) throws InterruptedException {
race(5);
}
}

选择一个比较乱的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码ready: 1
ready: 0
ready: 2
ready: 3
ready: 4
********************** all ready!!! **********************
********************** will start soon **********************
start: 1
start: 2
start: 3
end: 3
start: 0
end: 0
end: 2
end: 1
start: 4
end: 4
********************** all end!!! **********************

很明显:第一名是3号选手;最后一名是4号选手;3号跑完的时候,0号还没有开跑。

总结

用CountDownLatch模拟赛跑过程虽然简单,但能简洁、美观的实现却不容易,很考验面试者简化问题的能力。


本文链接:Java模拟赛跑过程
作者:猴子007
出处:monkeysayhi.github.io
本文基于 知识共享署名-相同方式共享 4.0 国际许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名及链接。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Maven的profile文件过滤引起的字符编码BUG

发表于 2017-11-27

本文记录了一次文件编码差异引起的profile替换占位符失败的bug,及处理思路。记录成文,以便以后反思,或让后来遇到问题的同学能有据可循。

起因及bug描述

相信大家对于Maven中打包不同环境使用不同profile文件的做法已经很熟悉了。我所在的项目分了好多个profile,每个profile对应不同的properties,如下图:

多个properties

多个properties

在开发的过程中,出现其中某几个properties编译打包出来的工程,占位符没有被替换,但是其他的properties则没有问题。war包中的配置文件中还是会有${xxx.xxx.xxx}的字符。

多个properties

多个properties

处理思路

  • 首先排除了编码失误,字符写错等等初级错误
  • 然后我怀疑是maven-resources-plugin插件在filter的过程中出现字符的问题,这个想法是由于在Maven的resource打包过程中,触发resources的filter操作的插件就是maven-resources-plugin,所以出问题一定是在这个插件的工作过程中,查看Maven的package过程日志,如下图:
    maven-resources-plugin的工作日志

maven-resources-plugin的工作日志

日志提示使用UTF-8的编码格式,拷贝了16个经过filter操作的resource文件。然后排除了properties文件中的中文干扰,但是依然没有解决问题,Maven打包后依然是那几个properties没有有效的替换占位符。

  • 暂时没有其他解决思路,只能逐字逐字的对比能成功替换占位符的properties和不能成功替换占位符的properties。还用上了各种对比工具。结局依然是没有发现什么异常的地方。

文件字符的对比

文件字符的对比

  • 确定字符没问题后,怀疑是文件本身的问题。复制正常和异常两种properties文件,修改名字,修改properties里面的内容信息。打包测试。测试的结果是,能正常替换占位符的properties复制出来的都能够正常替换占位符,反之不能正确替换占位符的properties复制出来的都不能正常替换占位符。由此可见,这两个properties可能在存档的时候就存在差异。从IDE里复制异常的properties中的字符,到记事本里保存到本地,果然是存档时的编码有问题,而IDE没有提示这个问题。

异常的字符

异常的字符

  • 最后替换了有异常编码的字符,就能成功打包替换了。

小结

在排除了大部分低级错误后,不要盲目进行没有必要的盲目测试,投石问路虽然是不错的方法,但是在排除bug时,一步一步的投石问路未免太过耗时。比方说在这次的bug解决中,在排除了大部分低级错误后,不应盲目进行测试,妄图以此找到解决问题的办法,而应该站在更高层次的角度来看待问题出现的缘由。所以在以后的编码学习过程中,应该加强自己的逻辑思维能力,提高分析解决问题的能力。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RocketMQ快速入门

发表于 2017-11-27

前面几篇文章介绍了为什么选择RocketMQ,以及与kafka的一些对比: 阿里 RocketMQ 优势对比,方便大家对于RocketMQ有一个简单的整体了解,之后介绍了:MQ 应用场景,让我们知道MQ在什么时候可以使用,可以解决什么问题,之后介绍了:RocketMQ集群部署配置;本篇文章接着上篇内容之后,来给大家介绍下RocketMQ快速入门。

如何使用

1、引入 rocketmq-client

1
2
3
4
5
复制代码<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>

2、编写Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码 DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
//指定NameServer地址
producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的

/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();

for (int i = 0; i < 997892; i++) {
try {
//构建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);

//发送同步消息
SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}


producer.shutdown();

3、编写Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
复制代码/**
* Consumer Group,非常重要的概念,后续会慢慢补充
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的

/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg:msgs){
String msgbody = new String(msg.getBody(), "utf-8");
System.out.println(" MessageBody: "+ msgbody);//输出消息内容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
}
});


consumer.start();

System.out.printf("Consumer Started.%n");

4、说明

各位根据自己的环境,修改NamesrvAddr的值,我的集群请参考:RocketMQ集群部署配置。稍后通过RocketMQ管控台就可以看到之前搭建的多Master多Slave模式,异步复制集群模式。

5、通过RocketMQ管控台

rocketmq-console-ng获取方式为:rocketmq-console-ng,之后通过mavne进行编译获取jar,命令如下:

1
2
复制代码mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根据自己的NamesrvAddr进行修改rocketmq.config.namesrvAddr的值。

直接启动:

1
复制代码java -jar rocketmq-console-ng-1.0.0.jar


管控台是基于springboot的,的确springboot非常方便和非常火了,所以有必要去学习下springboot了(其实还是spring系列,所以spring也必要深入学习下),稍后通过管控台进行观察运行。

6、运行观察

一个好的习惯是先运行Consumer,之后在运行Producer,之后通过rocketmq-console-ng管控台观察

运行中截图

运行完成之后,的确broker-a的数据加上broker-b的数据量就等于我们发送的数据量,而且slave的数量也master的数量也是一致的,效果如下:

运行完成

查看发送这些数据,2台机器的磁盘情况如下:

rocketmq1占用磁盘空间


rocketmq2占用磁盘空间
到目前位置,关于RocketMQ快速入门就结束了,未完待续……

如果读完觉得有收获的话,欢迎点赞加关注。


个人公众号,欢迎关注,查阅更多精彩历史!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

python27+Flask框架+新浪SAE之微信公众平台

发表于 2017-11-27

一、步骤:

以下为我在搭建时的大体步骤,遵循两条主线,其中编写代码文件和注册文件最重要,需要仔细阅读新浪云和微信公众号的开发文档 1、注册新浪云(需要身份认证)··>创建云应用··>编写代码文件和注册文件··>git上传代码到sae仓库··>测试连通性 2、注册微信公众平台··>进行开发者基本配置··>验证token··>(使用测试工具测试接口)··>启用

二、理解

对于整个过程的理解,转用别人的一张图,其实可以按照我们平时上网冲浪来理解,浏览器输入网址发送请求–>服务器接收到请求–>返回显示的html网页或者json、xml数据–>浏览器处理后显示,只是输入网址这个动作由微信公众号来完成了, flask框架就负责在服务器端接收请求处理请求,处理请求可以是多种多样,例如你可根据关注者发送来的图片进行一番图像处理然后返回,又或者识别信息,根据需求再去请求第三方api获得数据再返回,总之就看各位看官的创意啦。

三、开始动手实践

i.—-新浪云步骤—–

1、首先是去新浪云:http://t.cn/RC4mL5Y 官网注册账号,其实使用微博账号授权登录的,如果你有需要请点击我以上的链接进行注册登录哟,这样你和我两个都会有奖励的,还有一个要啰嗦的就是,新用户会有一定的云豆让你试用,但是如果你身份认证通过后,即使你不使用服务,云豆也会每天自动消耗的,这个要注意,尽早使用或者等要用再认证身份哟。 2、你可以点击云应用,然后进入 控制台,创建新应用

3、选择python,只有Python2.7可以选择,如果你学习的是Python3,建议你先去看一下菜鸟教程这篇关于py2和3区别的文章哟,千万不要因为版本问题而被吓退:
Python2.x与3.x版本区别

http://www.runoob.com/python/python-2x-3x.html

4、代码管理上传我使用的是git(没为什么,就因为我只会使用git),然后二级域名随便填,遵循不重名可用原则, 应用名称也是随便填,你开心就好。

5、创建应用成功后,在你电脑本地新建一个文件夹,关联sae仓库,如果你不熟悉git操作,不用怕,我为你准备了一本入门github电子书 :

learn-github-from-zero :http://pan.baidu.com/s/1eSw61Tg 提取密码:ishd

6、这时假设你创建了一个文件夹,在这个文件夹里使用git命令将你创建的云应用仓库克隆下来,然后添加两个文件,并分别写上如下内容,config.yaml文件是声明你的应用名称和版本号,index.wsgi就是将你创建的flask app注册进云应用开始启动(按照我的理解哈)

vendor文件夹里面存放的是云应用上python环境没有的第三方库,安装第三方库你需要指定该文件夹安装,然后git上传代码 命令:pip install -t vendor PACKAGE …

7、接下来就是编写代码测试连通性了,在你创建的文件夹里新建一个py文件(我的是weixin.py),先创建一个最简单的flask应用进行测试,git上传代码,这里要注意,
你创建的文件名要跟index.wsgi里面的导入文件名一致(看上图),如下

1
kotlin复制代码# -*- coding: utf-8 -*-# filename: weixin.pyfrom flask import Flaskapp = Flask(__name__)@app.route('/test/')def test():    return '<h1 style ="color:red">喂喂,我已经收到你发来的请求啦</h1>'

8、在浏览器输入你的二级域名网址,注意,再加一个路径test,熟悉flask的童鞋不用说了吧。这里说明新浪云应用可以正常接收请求并返回信息了。

ii.—-微信公众号步骤—–

1、前提就是你得要先有一个微信公众号,找到基本配置,填写你新浪云的二级域名网址,也就是请求网址,,Token按照要求随便填,是进行验证的,填完后进行提交这时token验证是没通过的,需要完善云应用上的代码。

2、然后来到weixin.py文件根据下面的流程图编写验证代码,我这里的token使用的就是在基本配置中填写的。

1
pgsql复制代码# -*- coding: utf-8 -*-# filename: weixin.pyfrom flask import Flask, request, make_responseimport time, hashlib@app.route('/wechat/', methods=['GET', 'POST'])def wechat():    # 微信验证token    if request.method == 'GET':        token = ' 你的token''        query = request.args        signature = query.get('signature', '')        timestamp = query.get('timestamp', '')        nonce = query.get('nonce', '')        echostr = query.get('echostr', '')        s = [timestamp, nonce, token]        s.sort()        s = ''.join(s)        if hashlib.sha1(s).hexdigest() == signature:            return make_response(echostr)

3、返回到微信公众平台的基本配置,进行提交,token就验证通过啦(撒花。。)

实现文字图片回复

1、原理:

微信公众号在前台收到关注者的信息,就会发送一段xml信息到请求网址,这时服务端的flask解析这段xml,然后将要回复的信息再打包成规定的xml信息进行返回。根据解析出的MsgType判断消息类型,文本是”text”,图片是”image”,进行相应的处理。

2、编写代码,先编写两个模型实体,代表接收消息体和返回消息体。

1
css复制代码# -*- coding: utf-8 -*-# filename: receive.pyimport xml.etree.ElementTree as ET'''该文件的类是接受体模型'''#进行消息类型判断,返回相应的接收体def parse_xml(web_data):    if len(web_data) == 0:        return None    xmlData = ET.fromstring(web_data)    msg_type = xmlData.find('MsgType').text    if msg_type == 'text':        return TextMsg(xmlData)    elif msg_type == 'image':        return ImageMsg(xmlData)#消息基类class Msg(object):    def __init__(self, xmlData):        self.ToUserName = xmlData.find('ToUserName').text        self.FromUserName = xmlData.find('FromUserName').text        self.CreateTime = xmlData.find('CreateTime').text        self.MsgType = xmlData.find('MsgType').text        self.MsgId = xmlData.find('MsgId').text#文本消息类class TextMsg(Msg):    def __init__(self, xmlData):        Msg.__init__(self, xmlData)        self.Content = xmlData.find('Content').text.encode("utf-8")#图片消息类class ImageMsg(Msg):    def __init__(self, xmlData):        Msg.__init__(self, xmlData)        self.PicUrl = xmlData.find('PicUrl').text        self.MediaId = xmlData.find('MediaId').text
1
dust复制代码# -*- coding: utf-8 -*-# filename: reply.pyimport time'''该文件的类是发送体模型,传参数进来即返回要发送的xml信息'''class Msg(object):    def __init__(self):        pass    def send(self):        return "success"class TextMsg(Msg):    def __init__(self, toUserName, fromUserName,content):        self.__dict = dict()        self.__dict['ToUserName'] = toUserName        self.__dict['FromUserName'] = fromUserName        self.__dict['CreateTime'] =int(time.time())        self.__dict['Content'] = content    def send(self):        XmlForm = """        <xml>        <ToUserName><![CDATA[{ToUserName}]]></ToUserName>        <FromUserName><![CDATA[{FromUserName}]]></FromUserName>        <CreateTime>{CreateTime}</CreateTime>        <MsgType><![CDATA[text]]></MsgType>        <Content><![CDATA[{Content}]]></Content>        </xml>        """        return XmlForm.format(**self.__dict)class ImageMsg(Msg):    def __init__(self, toUserName, fromUserName, mediaId):        self.__dict = dict()        self.__dict['ToUserName'] = toUserName        self.__dict['FromUserName'] = fromUserName        self.__dict['CreateTime'] = int(time.time())        self.__dict['MediaId'] = mediaId    def send(self):        XmlForm = """        <xml>        <ToUserName><![CDATA[{ToUserName}]]></ToUserName>        <FromUserName><![CDATA[{FromUserName}]]></FromUserName>        <CreateTime>{CreateTime}</CreateTime>        <MsgType><![CDATA[image]]></MsgType>        <Image>        <MediaId><![CDATA[{MediaId}]]></MediaId>        </Image>        </xml>        """        return XmlForm.format(**self.__dict)

3、主文件编写,代码都进行了注释,阅读代码即可

1
css复制代码# -*- coding: utf-8 -*-# filename: weixin.pyfrom flask import Flask, request, make_responseimport time, hashlibimport xml.etree.ElementTree as ETimport replyimport receiveapp = Flask(__name__)@app.route('/test/')def test():    return '<h1 style ="color:red">喂喂,我已经收到你发来的请求啦</h1>'@app.route('/wechat/', methods=['GET', 'POST'])def wechat():    # 微信验证token    if request.method == 'GET':        token = '你的token'        query = request.args        signature = query.get('signature', '')        timestamp = query.get('timestamp', '')        nonce = query.get('nonce', '')        echostr = query.get('echostr', '')        s = [timestamp, nonce, token]        s.sort()        s = ''.join(s)        if hashlib.sha1(s).hexdigest() == signature:       #哈希加密跟signature进行比对            return make_response(echostr)    else:        rec_msg = receive.parse_xml(request.stream.read())  #判断当前的消息类型,获取到接收实例        if rec_msg.MsgType == 'text':                 content = unicode(rec_msg.Content,"utf-8")  #转换编码为unicode,方便提取需要的文字进行判断            if content.startswith(u"笑话",0,2):       #如果是以笑话两字开头,则进行相应回复                 rep_text_msg = reply.TextMsg(rec_msg.FromUserName, rec_msg.ToUserName, "哈哈,我给你讲个笑话吧哈哈哈 \n %s"%getTime() )                   return rep_text_msg.send()     #返回需要返回的xml信息              else:                 rep_text_msg = reply.TextMsg(rec_msg.FromUserName,rec_msg.ToUserName,"复述:%s \n %s"%(rec_msg.Content,getTime()))                 return rep_text_msg.send()        elif  rec_msg.MsgType =="image": #我这里的处理是,如果是图片,就返回同样的MediaId,即是回复同样的图片            rep_img_msg = reply.ImageMsg(rec_msg.FromUserName,rec_msg.ToUserName,rec_msg.MediaId)            return rep_img_msg.send()        else:            return "success"      #微信公众号规定,超过5秒未进行回复,则发起重请求,所以如果是无法识别的消息,则返回“success”,                                       #   就不会在消息界面提示公众号异常,提升用户体验。#获取时间戳def getTime():    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

4、git提交代码,测试

总结:

实践出真知,看似简单的几行代码,对于新手来说,是要不断的进行查阅文档和资料,不断的进行试错的,我连git课本都给你准备好了,你还有什么理由不去动手实践一下?仔细的去琢磨一下官方的文档,没有什么比文档更加标准的啦。本文只是实现了一种回复消息和判断消息的大体思路,更多好玩的功能等你去发掘。码字辛苦,如果你觉得好就点个喜欢吧。

参考:

微信公众平台开发之用Flask+SAE实现简单被动消息回复功能

http://www.jianshu.com/p/ff3bd799e5d1

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PHP headers already sent 原因分析

发表于 2017-11-27

原文地址:PHP headers already sent 原因分析

先上结论,为了避免 headers already sent 错误,你应该[^1]:

  • 检查 PHP 代码,确认 <?php 前没有空格和空行
  • 避免在业务代码中使用 echo 和 print 系函数,只在框架组织 HTTP body 输出的时候使用,这些函数包括
+ print, echo, printf, vprintf
+ trigger\_error, ob\_flush, ob\_end\_flush, var\_dump, print\_r
+ readfile, passthru, flush, imagepng, imagejpeg

原因分析

最近上线代码之后遇到了一个问题,在某些情况下会抛出异常:Uncaught Exception: ErrorException: Severity: 2; Message: Cannot modify header information - headers already sent by…。而且这个异常并非总是会出现,在不了解原因的情况下想要在测试环境重现比较困难,以下是分析步骤。

异常产生的原因

它本质上是一个 E_WARNING,被 error_handler 截获而抛出异常:

1
2
3
4
5
6
7
8
9
10
复制代码<?php
function _error_handler($severity, $message, $filepath, $line)
{
// ...
if (($severity & error_reporting()) == $severity)
{
// db rollback
throw new ErrorException("Severity: $severity; Message: $message");
}
}

在 index.php 中我们设置 error_reporting 要报告 E_WARNING 错误,所以会走到这里并抛出异常。也就是说,我们需要找到 E_WARNING 抛出的位置和原因。

E_WARNING 产生的原因

1
2
3
复制代码<p>Severity: Warning</p>
<p>Message: Cannot modify header information - headers already sent by (output started at .../application/controllers/my_script.php:xxx)</p>
<p>Filename: libraries/Session.php</p>

这个错误从字面理解,就是设置 header() 的时候发现 header 中已经有内容了,那么,在异常信息中, headers already sent by () 括号里的内容就很重要了,它表明了是那一行的输出导致了这个问题。按照定位的位置,是脚本中的一个printf语句;继续看,是 Session 中的 setcookie() 方法发现这个 printf 语句已经输出内容了。

想要解决这个问题,可以使用 sprintf 来组装字符串,使用 fwrite 等标准输出将内容输出到控制台。

为什么会出现 headers already sent

在 PHP 中,不能在header()之前 echo 任何内容,一旦 echo,PHP 会发送已有的 header 内容,我们做一下实验。

在实验之前,你需要把php.ini中的 output_buffering 关闭或者设置一个很小的值。之后重启 php-fpm。

1
2
3
4
复制代码[PHP]
...
output_buffering = 3
...

这样设置表明输出的 buffer 不超过 3 个字符。

然后重现一下这个 bug:

1
2
3
4
5
6
复制代码<?php
public function test()
{
echo 'asd';
header('a: b');
}

使用 curl 访问一下,返回的 HTTP body 是 asd 和一个 headers already sent 错误信息,curl -I http://localhost/test一下看看 header,发现 a: b 并没有输出到 header 中。

echo 的内容超出了缓冲区限制的长度,便会作为 HTTP body 输出给 WEB 服务器。一旦 echo,PHP 输出 header 的任务就等于结束了,那么此时调用header()就会抛出 headers already sent 的错误。

修改一下代码:

1
2
3
4
5
6
7
复制代码<?php
public function test()
{
header('b: c');
echo 'asd';
header('a: b');
}

此时输出的 HTTP body 内容是相同的,但是 curl -I 看到的 header 中多了 b: c,说明 echo 之前的header()正确的输出了内容。

setcookie 方法也会发送 header:set-cookie: xxx,所以一样会引起这个问题。

在上面的例子中,我们将 output_buffering 设置为 3,如果 echo 的内容小于 3,是不会引起问题的,因为缓冲区缓冲了 echo 的内容,会在 header 输出之后再输出缓冲内容。在实际的应用中,可以给 output_buffering 一个稍大一些的值。

但是,不能依赖 output_buffering 的大小,应该尽量避免在业务代码中使用 echo 和 print 系函数。

怎样使用 echo

echo 很方便,古董 PHP 开发还会使用 echo 调试大法,而且我们要输出 HTTP 内容肯定要用到 echo 或者 print,怎么可能避免使用呢?

业务代码中尽量避免

我们应该避免在业务中使用,而不是禁止使用。当使用 echo 的时候,因为上述原因出现 headers already sent 错误,要看 output_buffering 设置的大小和 echo 内容的长度,这给 debug 带来了很大的不确定性,测试环境很可能会漏掉这个 case。

在业务中,可能用到 echo 的原因有:1. 调试代码,查看变量;2. 命令行脚本的输出。对于 1,建议通过调试工具调试,或者使用插件 clockwork;对于 2,可以在脚本中通过标准输出来输出重要内容,并不需要使用 echo。

1
2
复制代码<?php
fwrite(STDOUT, $content);

如果基于某种原因一定要使用,可以将一段输出用 ob_start 和 ob_end 包裹起来。被包裹的输出会进入内部缓冲区,在需要的时候再 flush 出来。

1
2
3
复制代码<?php
// ob_start 的函数定义
bool ob_start ([ callable $output_callback = NULL [, int $chunk_size = 0 [, int $flags = PHP_OUTPUT_HANDLER_STDFLAGS ]]])

$chunk_size=0的时候,只有在关闭缓冲区的时候才会输出缓冲区的内容。[^3]

1
2
3
4
5
6
7
8
复制代码<?php
public function test()
{
ob_start(); // 打开缓冲区
echo 'asd';
header('a: b');
ob_end_flush(); // 关闭缓冲区,将缓冲区的内容输出到 HTTP body
}

一般框架的输出都是这样设计的,echo 会包裹在 ob_start 和 ob_end 之间。

ob_start 的问题

ob_start 不能解决 PHP 代码不规范导致的 headers already sent:

1
2
3
4
5
6
7
8
9
复制代码           <?php
public function test()
{
ob_start(); // 打开缓冲区
echo 'asd';
header('a: b');
ob_end_flush(); // 关闭缓冲区,将缓冲区的内容输出到 HTTP body
}
// 这段代码也会报错

使用 ob_start 需要及时的将数据输出出去,否则可能会因为字符串拼接和二进制内容冲突:

1
2
3
4
5
6
7
8
9
复制代码<?php
public function test()
{
ob_start(); // 打开缓冲区
echo 'asd';
imagepng($resource);
ob_end_flush(); // 关闭缓冲区,将缓冲区的内容输出到 HTTP body
}
// asd 和 imagepng() 的内容混在一起,输出的图片不可用

好的实践

综上所述,一个良好的实践是:

  • output_buffering 关闭或者设置一个较小的数值[^2]
  • 如非必要,不使用 echo 和 print 系函数
  • 使用 echo 时,尽量用 ob_start 和 ob_end 包裹
  • 使用 ob_start 和 ob_end 包裹时,对自己包裹的内容有清晰的认识,尽量不要跨函数使用 ob_start 和 ob_end

[^1]: 参见 stackoverflow 回答,除此之外,还有 UTF-8 BOM 等其他原因
[^2]: 参见PHP程序访问报错Warning: Cannot modify header information - headers already sent by 和 PHP: 运行时配置 - Manual,开启 output_buffering 可能影响 PHP 执行效率
[^3]: 使用 ob_start 的时候不受 php.ini 中的 output_buffering 大小的影响

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

python异常定义与原理

发表于 2017-11-27
异常在程序中的作用
  • Error Handling:能够在异常处理语句中捕获并响应错误信息
  • Event Notification:即当我们应用程序在传入数据并进行数据处理过程中,针对不合法的事件我们是采取抛出异常而不是返回一个表示不合法的数据结果
  • Special-case handling:在异常处理器处理程序个别极端情况,可以通过assert来检查条件是否如我们的预期值一样
  • Termination actions:即保证程序中的资源能够在异常发生之后正常关闭
  • Unusual control flows:不正常的控制流,使用raise抛出异常信息
异常处理机制

默认异常处理器,即由python自动搜索匹配对应的异常信息

1
2
3
4
复制代码`# python命令行,默认是3.6,以下没有特殊说明全部是基于这个版本>>> str="keithl">>> str[10]`
Traceback (most recent call last):
 File "<stdin>", line 1, in <module>
IndexError: string index out of range

捕获异常信息

1
2
3
4
5
6
7
8
复制代码`# exception.pydef catch_index():`
   str="keithl"
   try:
       print(str[10])        # print(str[2])
   except IndexError as e:
       print(e)        else:
       print("try正常执行,没有异常发生...")if __name__ == '__main__':
   catch_index()

抛出异常,使用raise

1
2
3
4
5
复制代码`# exception.pydef raise_index():`
   str = "keithl"
   try:
       print(str[10])        except IndexError as e:                raise e        if __name__ == '__main__':
   raise_index()

自定义异常

1
2
3
4
5
6
复制代码`# exception.pyclass MyException(Exception):`
   def __str__(self):
       return "my exception object"    def define_exception():
       try:                    raise MyException            except MyException as e:
           print("get my exception error[%s]" % str(e))if __name__ == '__main__':
   define_exception()

使用finally终止try语句

1
2
3
4
5
6
7
复制代码`# exception.pydef raise_index_finally():`
   str = "keithl"
   try:
       print(str[10])        except IndexError:
       print("except index error")        finally:
       print("try statement to close resource ...")if __name__ == '__main__':
   raise_index_finally()

异常语句

try/except/else语句

  • 语句块的定义
1
2
3
4
5
6
7
复制代码`try:`
   print("程序业务逻辑.")except name1:
   print("捕获异常name1..")except (name2, name3):
   print("捕获异常name2 或 name3..")except name4 as var:
   print("捕获异常name4,并传递其引用变量到语句块中..")except:         # 5      
   print("捕获所有异常(上述的name1,name2,name3,name4除外..)")else:
   print("没有异常,try语句正常执行..")
  • try/except/else工作原理
  • 当在try语句块中发生异常时,异常类型将会匹配except对应的name,然后根据对应的name分配对应的异常类对象,执行statement中的语句
  • 当在try语句块中发生异常但没有在except中匹配到对应的name,python将会查询其他的异常直至进程最高级别的异常并退出程序,打印出默认的异常信息
  • 如果try语句正常执行,那么最后也将会执行else语句
  • 捕获任意或所有异常
  • 使用except:,即后面没有携带任何异常类,将会捕获先前没有定义的异常类
  • 使用except (e1,e2,..),即只要业务代码中抛出的异常是在定义的一系列异常列表中(e1,e2,..),那么就会在except语句中被捕获处理
  • except:与 except Exception:
  • except:是捕获所有的异常,但存在不足,一是捕获和业务代码无关的系统错误,二是会拦截其他程序的异常处理
  • except Exception::py3.x建议使用这个Exception类,同时可以避免上述问题,Exception会忽略与系统操作相关的异常,如系统退出py时报出的异常
1
2
3
4
5
6
复制代码`# exception.py# `except:`执行遇到系统退出的时候会捕获异常def test_exception():`
   try:                for i in range(10):
           print(i)                        if i == 5:
               sys.exit(-1)        except:
       print("直接使用空的异常来捕获")if __name__ == '__main__':
   test_exception()>>> python exception.py

1
2
3
4
5
6
复制代码`# `except Exception:`在系统退出的时候没有捕获异常def test_exception():`
   try:                for i in range(10):
           print(i)                        if i == 5:
               sys.exit(-1)        except Exception:
       print("直接使用空的异常来捕获")if __name__ == '__main__':
   test_exception()>>> python exception.py

  • py2.x与py3.x的异常语句作用域
  • py2.x可以支持except Exception as e以及 except Exception,e,py3.x仅支持except Exception as e
  • py2.x的异常语句Exception对应的实例变量e是全局变量,在try语句块外还可以直接访问
  • py3.x的异常语句Exception对应的实例变量e是局部变量,在try语句块外不能访问,但是可以保存在一个全局的变量中

try/finally语句

  • 语句块定义
1
2
3
复制代码`try:`
   print("执行业务代码语句块...")finally:
   print("关闭处理业务的资源...")
  • try/finally工作原理
  • 当try语句块中的业务逻辑是正常执行的时候,在程序退出返回的时候将会执行finally语句块
  • 当try语句块中的业务逻辑出现异常的时候,仍然会执行finally语句块并将异常信息一并向顶层程序抛出
  • 示例
1
2
3
4
5
6
复制代码`# exception.pydef raise_exception():`
   raise MyException()def test_try_finally():
   try:
       raise_exception()        finally:
       print("execute test_try_finally statement ...")if __name__ == '__main__':
   test_try_finally()>>> python exception.py

try/except/finally语句

  • 这个在上面的基础上添加一个异常处理块,其一般模板为
1
2
3
4
5
6
复制代码`try:`
   print("处理核心业务逻辑")except Exception as e1:
   print("捕获异常信息e1")except Exception as e2:
   print("捕获异常信息e2")else:
   print("try的业务语句没有异常,正常执行完毕后将执行else语句")finally:
   print("关闭消耗CPU的资源")

raise语句

  • raise instance:raise 语句抛出一个异常信息类型图13
  • raise class:raise 语句创建一个异常类class的实例并抛出异常信息图14
  • raise:将一个异常类class信息抛出并往顶层程序传播
  • 在一个try语句中发生的异常信息类会在except语句中按照顺序进行匹配
  • 抛出的异常类是except定义的异常类的子类或本类,其他在except定义的异常类将不捕获异常
  • 抛出的异常类如果是except定义的异常类的父类,也不会捕获到该父类的异常

  • 仅py3.x支持的from语句

raise newException from otherException

assert语句

  • 语法:assert test,data # data是可选的
  • 工作原理如下:
1
复制代码if __debug__:        if not test:                raise AssertionError(data)
  • 示例:

with/as上下文管理器

  • 基本语法
1
复制代码with expression [as variable]        with-block# 表达式expression返回的是一个实现上下文管理器协议的对象
  • 场景一:IO操作自动打开和关闭资源
1
2
复制代码`# 打开文件filepath并且自动读取with open("filepath") as file_object:        for line in file_object:`
       print(line)
  • 场景二:应用在线程锁中,自动获取和释放锁
1
2
3
复制代码`# 在一个线程中对语句块进行锁操作lock = threading.Lock()     # 导入threading模块with lock:                  # 在业务代码执行之前自动获取锁,在执行完成之后自动释放锁,除非有异常抛出`
   # 执行相关的业务代码
   pass
  • 场景三:在decimal模块中使用上下文管理来设置decimal操作业务数据的格式,退出后格式自动清除失效
1
2
3
复制代码`import decimalwith decimal.localcontext() as ctx:` 
   ctx.prec = 2        # 保留小数点后两位
   x = decimal.Decimal('5.00') / decimal.Decimal('3.00')
  • with表达式语句的工作原理
  • 上下文管理器对象必须有实现内置操作符__enter__和 __exit__方法
  • 在with语句中返回一个对象管理器并分配一个变量的时候将会回调__enter__方法
  • 执行嵌套的语句块,也就是上面的相关业务代码
  • 当有异常信息抛出的时候,就会回调__exit__的方法,同时携带type,value,traceback三个参数(通过sys.exc_info获取到)
  • 正常执行完成之后,也会回调__exit__的方法
1
2
3
4
5
6
7
8
9
10
复制代码`# exception.pyclass WithContextObject:`
   def message(self,args):
       print(args)            def __enter__(self):
       print("execute enter method ..")                return self            def __exit__(self, exc_type, exc_val, exc_tb):
       if exc_type is None:
           print("execute normally...")                else:
           print("raise exception ...")                        return Falsedef test_with():
   with WithContextObject() as context:
       context.message("take message")         if __name__ == '__main__':
   test_with()>>> python exception.py

  • 使用多个with上下文管理器
1
2
3
复制代码`# 基本语法with open(file_path1) as reader,with open(file_path2) as writer:        for line in reader:`
       writer.write(line)    # 等价于with open(file_path1) as reader:        with open(file_path2) as writer:                for line in reader:
           writer.write(line)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

聊聊spring security的permitAll以及w

发表于 2017-11-27

序

本文主要聊一下spring security的permitAll以及webIgnore的区别

permitAll配置实例

1
2
3
4
5
6
7
8
9
10
复制代码@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
public void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/css/**", "/js/**","/fonts/**").permitAll()
.anyRequest().authenticated();
}
}

web ignore配置实例

1
2
3
4
5
6
7
8
9
复制代码@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
public void configure(WebSecurity web) throws Exception {
web.ignoring().antMatchers("/css/**");
web.ignoring().antMatchers("/js/**");
web.ignoring().antMatchers("/fonts/**");
}
}

二者区别

顾名思义,WebSecurity主要是配置跟web资源相关的,比如css、js、images等等,但是这个还不是本质的区别,关键的区别如下:

  • ingore是完全绕过了spring security的所有filter,相当于不走spring security
  • permitall没有绕过spring security,其中包含了登录的以及匿名的。

AnonymousAuthenticationFilter

spring-security-web-4.2.3.RELEASE-sources.jar!/org/springframework/security/web/authentication/AnonymousAuthenticationFilter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
复制代码/**
* Detects if there is no {@code Authentication} object in the
* {@code SecurityContextHolder}, and populates it with one if needed.
*
* @author Ben Alex
* @author Luke Taylor
*/
public class AnonymousAuthenticationFilter extends GenericFilterBean implements
InitializingBean {
//......
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
throws IOException, ServletException {

if (SecurityContextHolder.getContext().getAuthentication() == null) {
SecurityContextHolder.getContext().setAuthentication(
createAuthentication((HttpServletRequest) req));

if (logger.isDebugEnabled()) {
logger.debug("Populated SecurityContextHolder with anonymous token: '"
+ SecurityContextHolder.getContext().getAuthentication() + "'");
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("SecurityContextHolder not populated with anonymous token, as it already contained: '"
+ SecurityContextHolder.getContext().getAuthentication() + "'");
}
}

chain.doFilter(req, res);
}

protected Authentication createAuthentication(HttpServletRequest request) {
AnonymousAuthenticationToken auth = new AnonymousAuthenticationToken(key,
principal, authorities);
auth.setDetails(authenticationDetailsSource.buildDetails(request));

return auth;
}

//......
}

这个filter的主要功能就是给没有登陆的用户,填充AnonymousAuthenticationToken到SecurityContextHolder的Authentication,后续依赖Authentication的代码可以统一处理。

FilterComparator

spring-security-config-4.1.4.RELEASE-sources.jar!/org/springframework/security/config/annotation/web/builders/FilterComparator.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
复制代码final class FilterComparator implements Comparator<Filter>, Serializable {
private static final int STEP = 100;
private Map<String, Integer> filterToOrder = new HashMap<String, Integer>();

FilterComparator() {
int order = 100;
put(ChannelProcessingFilter.class, order);
order += STEP;
put(ConcurrentSessionFilter.class, order);
order += STEP;
put(WebAsyncManagerIntegrationFilter.class, order);
order += STEP;
put(SecurityContextPersistenceFilter.class, order);
order += STEP;
put(HeaderWriterFilter.class, order);
order += STEP;
put(CorsFilter.class, order);
order += STEP;
put(CsrfFilter.class, order);
order += STEP;
put(LogoutFilter.class, order);
order += STEP;
put(X509AuthenticationFilter.class, order);
order += STEP;
put(AbstractPreAuthenticatedProcessingFilter.class, order);
order += STEP;
filterToOrder.put("org.springframework.security.cas.web.CasAuthenticationFilter",
order);
order += STEP;
put(UsernamePasswordAuthenticationFilter.class, order);
order += STEP;
put(ConcurrentSessionFilter.class, order);
order += STEP;
filterToOrder.put(
"org.springframework.security.openid.OpenIDAuthenticationFilter", order);
order += STEP;
put(DefaultLoginPageGeneratingFilter.class, order);
order += STEP;
put(ConcurrentSessionFilter.class, order);
order += STEP;
put(DigestAuthenticationFilter.class, order);
order += STEP;
put(BasicAuthenticationFilter.class, order);
order += STEP;
put(RequestCacheAwareFilter.class, order);
order += STEP;
put(SecurityContextHolderAwareRequestFilter.class, order);
order += STEP;
put(JaasApiIntegrationFilter.class, order);
order += STEP;
put(RememberMeAuthenticationFilter.class, order);
order += STEP;
put(AnonymousAuthenticationFilter.class, order);
order += STEP;
put(SessionManagementFilter.class, order);
order += STEP;
put(ExceptionTranslationFilter.class, order);
order += STEP;
put(FilterSecurityInterceptor.class, order);
order += STEP;
put(SwitchUserFilter.class, order);
}

//......
}

这个类定义了spring security内置的filter的优先级,AnonymousAuthenticationFilter在倒数第五个执行,在FilterSecurityInterceptor这个类之前。

FilterSecurityInterceptor

spring-security-web-4.2.3.RELEASE-sources.jar!/org/springframework/security/web/access/intercept/FilterSecurityInterceptor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码/**
* Performs security handling of HTTP resources via a filter implementation.
* <p>
* The <code>SecurityMetadataSource</code> required by this security interceptor is of
* type {@link FilterInvocationSecurityMetadataSource}.
* <p>
* Refer to {@link AbstractSecurityInterceptor} for details on the workflow.
* </p>
*
* @author Ben Alex
* @author Rob Winch
*/
public class FilterSecurityInterceptor extends AbstractSecurityInterceptor implements
Filter {
//......
}

这个相当于spring security的核心处理类了,它继承抽象类AbstractSecurityInterceptor

spring-security-core-4.2.3.RELEASE-sources.jar!/org/springframework/security/access/intercept/AbstractSecurityInterceptor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
复制代码public abstract class AbstractSecurityInterceptor implements InitializingBean,
ApplicationEventPublisherAware, MessageSourceAware {
//......
protected InterceptorStatusToken beforeInvocation(Object object) {
Assert.notNull(object, "Object was null");
final boolean debug = logger.isDebugEnabled();

if (!getSecureObjectClass().isAssignableFrom(object.getClass())) {
throw new IllegalArgumentException(
"Security invocation attempted for object "
+ object.getClass().getName()
+ " but AbstractSecurityInterceptor only configured to support secure objects of type: "
+ getSecureObjectClass());
}

Collection<ConfigAttribute> attributes = this.obtainSecurityMetadataSource()
.getAttributes(object);

if (attributes == null || attributes.isEmpty()) {
if (rejectPublicInvocations) {
throw new IllegalArgumentException(
"Secure object invocation "
+ object
+ " was denied as public invocations are not allowed via this interceptor. "
+ "This indicates a configuration error because the "
+ "rejectPublicInvocations property is set to 'true'");
}

if (debug) {
logger.debug("Public object - authentication not attempted");
}

publishEvent(new PublicInvocationEvent(object));

return null; // no further work post-invocation
}

if (debug) {
logger.debug("Secure object: " + object + "; Attributes: " + attributes);
}

if (SecurityContextHolder.getContext().getAuthentication() == null) {
credentialsNotFound(messages.getMessage(
"AbstractSecurityInterceptor.authenticationNotFound",
"An Authentication object was not found in the SecurityContext"),
object, attributes);
}

Authentication authenticated = authenticateIfRequired();

// Attempt authorization
try {
this.accessDecisionManager.decide(authenticated, object, attributes);
}
catch (AccessDeniedException accessDeniedException) {
publishEvent(new AuthorizationFailureEvent(object, attributes, authenticated,
accessDeniedException));

throw accessDeniedException;
}

if (debug) {
logger.debug("Authorization successful");
}

if (publishAuthorizationSuccess) {
publishEvent(new AuthorizedEvent(object, attributes, authenticated));
}

// Attempt to run as a different user
Authentication runAs = this.runAsManager.buildRunAs(authenticated, object,
attributes);

if (runAs == null) {
if (debug) {
logger.debug("RunAsManager did not change Authentication object");
}

// no further work post-invocation
return new InterceptorStatusToken(SecurityContextHolder.getContext(), false,
attributes, object);
}
else {
if (debug) {
logger.debug("Switching to RunAs Authentication: " + runAs);
}

SecurityContext origCtx = SecurityContextHolder.getContext();
SecurityContextHolder.setContext(SecurityContextHolder.createEmptyContext());
SecurityContextHolder.getContext().setAuthentication(runAs);

// need to revert to token.Authenticated post-invocation
return new InterceptorStatusToken(origCtx, true, attributes, object);
}
}
//......
}

主要的逻辑在这个beforeInvocation方法,它就依赖了authentication

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码private Authentication authenticateIfRequired() {
Authentication authentication = SecurityContextHolder.getContext()
.getAuthentication();

if (authentication.isAuthenticated() && !alwaysReauthenticate) {
if (logger.isDebugEnabled()) {
logger.debug("Previously Authenticated: " + authentication);
}

return authentication;
}

authentication = authenticationManager.authenticate(authentication);

// We don't authenticated.setAuthentication(true), because each provider should do
// that
if (logger.isDebugEnabled()) {
logger.debug("Successfully Authenticated: " + authentication);
}

SecurityContextHolder.getContext().setAuthentication(authentication);

return authentication;
}

这个方法判断authentication如果是已经校验过的,则返回;没有校验过的话,则调用authenticationManager进行鉴权。

而AnonymousAuthenticationFilter设置的authentication在这个时候就派上用场了
spring-security-core-4.2.3.RELEASE-sources.jar!/org/springframework/security/authentication/AnonymousAuthenticationToken.java

1
2
3
4
5
> public class AnonymousAuthenticationToken extends AbstractAuthenticationToken implements  
> Serializable {
> private AnonymousAuthenticationToken(Integer keyHash, Object principal,
> Collection<? extends GrantedAuthority> authorities) {
> super(authorities);

复制代码 if (principal == null || “”.equals(principal)) {
throw new IllegalArgumentException(“principal cannot be null or empty”);
}
Assert.notEmpty(authorities, “authorities cannot be null or empty”);

this.keyHash = keyHash;
this.principal = principal;
setAuthenticated(true);

}
//……

1
}

它默认就是authenticated

小结

  • web ignore比较适合配置前端相关的静态资源,它是完全绕过spring security的所有filter的;
  • 而permitAll,会给没有登录的用户适配一个AnonymousAuthenticationToken,设置到SecurityContextHolder,方便后面的filter可以统一处理authentication。

doc

  • spring security 的几个细节
  • Spring Security – security none, filters none, access permitAll
  • Spring Security permitAll() not allowing anonymous access
  • Difference between access=“permitAll” and filters=“none”?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go Channel 详解 目录 【−】

发表于 2017-11-27

目录 [−]

  1. Channel类型
  2. blocking
  3. Buffered Channels
  4. Range
  5. select
    1. timeout
  6. Timer和Ticker
  7. close
  8. 同步
  9. 参考资料

Channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication)。

它的操作符是箭头 <- 。

1
2
复制代码ch <- v    // 发送值v到Channel ch中
v := <-ch // 从Channel ch中接收数据,并将数据赋值给v

(箭头的指向就是数据的流向)

就像 map 和 slice 数据类型一样, channel必须先创建再使用:

1
复制代码ch := make(chan int)

Channel类型

Channel类型的定义格式如下:

1
复制代码ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .

它包括三种类型的定义。可选的<-代表channel的方向。如果没有指定方向,那么Channel就是双向的,既可以接收数据,也可以发送数据。

1
2
3
复制代码chan T          // 可以接收和发送类型为 T 的数据
chan<- float64 // 只可以用来发送 float64 类型的数据
<-chan int // 只可以用来接收 int 类型的数据

<-总是优先和最左边的类型结合。(The <- operator associates with the leftmost chan possible)

1
2
3
4
复制代码chan<- chan int    // 等价 chan<- (chan int)
chan<- <-chan int // 等价 chan<- (<-chan int)
<-chan <-chan int // 等价 <-chan (<-chan int)
chan (<-chan int)

使用make初始化Channel,并且可以设置容量:

1
复制代码make(chan int, 100)

容量(capacity)代表Channel容纳的最多的元素的数量,代表Channel的缓存的大小。
如果没有设置容量,或者容量设置为0, 说明Channel没有缓存,只有sender和receiver都准备好了后它们的通讯(communication)才会发生(Blocking)。如果设置了缓存,就有可能不发生阻塞, 只有buffer满了后 send才会阻塞, 而只有缓存空了后receive才会阻塞。一个nil channel不会通信。

可以通过内建的close方法可以关闭Channel。

你可以在多个goroutine从/往 一个channel 中 receive/send 数据, 不必考虑额外的同步措施。

Channel可以作为一个先入先出(FIFO)的队列,接收的数据和发送的数据的顺序是一致的。

channel的 receive支持 multi-valued assignment,如

1
复制代码v, ok := <-ch

它可以用来检查Channel是否已经被关闭了。

  1. send语句
    send语句用来往Channel中发送数据, 如ch <- 3。
    它的定义如下:
1
2
复制代码SendStmt = Channel "<-" Expression .
Channel = Expression .

在通讯(communication)开始前channel和expression必选先求值出来(evaluated),比如下面的(3+4)先计算出7然后再发送给channel。

1
2
3
4
5
复制代码c := make(chan int)
defer close(c)
go func() { c <- 3 + 4 }()
i := <-c
fmt.Println(i)

send被执行前(proceed)通讯(communication)一直被阻塞着。如前所言,无缓存的channel只有在receiver准备好后send才被执行。如果有缓存,并且缓存未满,则send会被执行。

往一个已经被close的channel中继续发送数据会导致run-time panic。

往nil channel中发送数据会一致被阻塞着。

  1. receive 操作符
    <-ch用来从channel ch中接收数据,这个表达式会一直被block,直到有数据可以接收。

从一个nil channel中接收数据会一直被block。

从一个被close的channel中接收数据不会被阻塞,而是立即返回,接收完已发送的数据后会返回元素类型的零值(zero value)。

如前所述,你可以使用一个额外的返回参数来检查channel是否关闭。

1
2
3
复制代码x, ok := <-ch
x, ok = <-ch
var x, ok = <-ch

如果OK 是false,表明接收的x是产生的零值,这个channel被关闭了或者为空。

blocking

缺省情况下,发送和接收会一直阻塞着,直到另一方准备好。这种方式可以用来在gororutine中进行同步,而不必使用显示的锁或者条件变量。

如官方的例子中x, y := <-c, <-c这句会一直等待计算结果发送到channel中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y)
}

Buffered Channels

make的第二个参数指定缓存的大小:ch := make(chan int, 100)。

通过缓存的使用,可以尽量避免阻塞,提供应用的性能。

Range

for …… range语句可以处理Channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码func main() {
go func() {
time.Sleep(1 * time.Hour)
}()
c := make(chan int)
go func() {
for i := 0; i < 10; i = i + 1 {
c <- i
}
close(c)
}()
for i := range c {
fmt.Println(i)
}
fmt.Println("Finished")
}

range c产生的迭代值为Channel中发送的值,它会一直迭代直到channel被关闭。上面的例子中如果把close(c)注释掉,程序会一直阻塞在for …… range那一行。

select

select语句选择一组可能的send操作和receive操作去处理。它类似switch,但是只是用来处理通讯(communication)操作。
它的case可以是send语句,也可以是receive语句,亦或者default。

receive语句可以将值赋值给一个或者两个变量。它必须是一个receive操作。

最多允许有一个default case,它可以放在case列表的任何位置,尽管我们大部分会将它放在最后。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}

如果有同时多个case去处理,比如同时有多个channel可以接收数据,那么Go会伪随机的选择一个case处理(pseudo-random)。如果没有case需要处理,则会选择default去处理,如果default case存在的情况下。如果没有default case,则select语句会阻塞,直到某个case需要处理。

需要注意的是,nil channel上的操作会一直被阻塞,如果没有default case,只有nil channel的select会一直被阻塞。

select语句和switch语句一样,它不是循环,它只会选择一个case来处理,如果想一直处理channel,你可以在外面加一个无限的for循环:

1
2
3
4
5
6
7
8
9
复制代码for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}

timeout

select有很重要的一个应用就是超时处理。 因为上面我们提到,如果没有case需要处理,select语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。
下面这个例子我们会在2秒后往channel c1中发送一个数据,但是select设置为1秒超时,因此我们会打印出timeout 1,而不是result 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码import "time"
import "fmt"
func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
}

其实它利用的是time.After方法,它返回一个类型为<-chan Time的单向的channel,在指定的时间发送一个当前时间给返回的channel中。

Timer和Ticker

我们看一下关于时间的两个Channel。
timer是一个定时器,代表未来的一个单一事件,你可以告诉timer你要等待多长时间,它提供一个Channel,在将来的那个时间那个Channel提供了一个时间值。下面的例子中第二行会阻塞2秒钟左右的时间,直到时间到了才会继续执行。

1
2
3
复制代码timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
fmt.Println("Timer 1 expired")

当然如果你只是想单纯的等待的话,可以使用time.Sleep来实现。

你还可以使用timer.Stop来停止计时器。

1
2
3
4
5
6
7
8
9
复制代码timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 expired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}

ticker是一个定时触发的计时器,它会以一个间隔(interval)往Channel发送一个事件(当前时间),而Channel的接收者可以以固定的时间间隔从Channel中读取事件。下面的例子中ticker每500毫秒触发一次,你可以观察输出的时间。

1
2
3
4
5
6
复制代码ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
}
}()

类似timer, ticker也可以通过Stop方法来停止。一旦它停止,接收者不再会从channel中接收数据了。

close

内建的close方法可以用来关闭channel。

总结一下channel关闭后sender的receiver操作。
如果channel c已经被关闭,继续往它发送数据会导致panic: send on closed channel:

1
2
3
4
5
6
7
8
9
10
11
复制代码import "time"
func main() {
go func() {
time.Sleep(time.Hour)
}()
c := make(chan int, 10)
c <- 1
c <- 2
close(c)
c <- 3
}

但是从这个关闭的channel中不但可以读取出已发送的数据,还可以不断的读取零值:

1
2
3
4
5
6
7
8
复制代码c := make(chan int, 10)
c <- 1
c <- 2
close(c)
fmt.Println(<-c) //1
fmt.Println(<-c) //2
fmt.Println(<-c) //0
fmt.Println(<-c) //0

但是如果通过range读取,channel关闭后for循环会跳出:

1
2
3
4
5
6
7
复制代码c := make(chan int, 10)
c <- 1
c <- 2
close(c)
for i := range c {
fmt.Println(i)
}

通过i, ok := <-c可以查看Channel的状态,判断值是零值还是正常读取的值。

1
2
3
4
复制代码c := make(chan int, 10)
close(c)
i, ok := <-c
fmt.Printf("%d, %t", i, ok) //0, false

同步

channel可以用在goroutine之间的同步。
下面的例子中main goroutine通过done channel等待worker完成任务。 worker做完任务后只需往channel发送一个数据就可以通知main goroutine任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码import (
"fmt"
"time"
)
func worker(done chan bool) {
time.Sleep(time.Second)
// 通知任务已完成
done <- true
}
func main() {
done := make(chan bool, 1)
go worker(done)
// 等待任务完成
<-done
}

参考资料

  1. gobyexample.com/channels
  2. tour.golang.org/concurrency…
  3. golang.org/ref/spec#Se…
  4. github.com/a8m/go-lang…
  5. devs.cloudimmunity.com/gotchas-and…
  6. guzalexander.com/2013/12/06/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…930931932…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%