开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Golang网络 核心API实现剖析(一)

发表于 2017-12-04

这一章节我们将详细描述网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。

Listen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
复制代码func Listen(net, laddr string) (Listener, error) {
la, err := resolveAddr("listen", net, laddr, noDeadline)
......
switch la := la.toAddr().(type) {
case *TCPAddr:
l, err = ListenTCP(net, la)
case *UnixAddr:
......
}
......
}

// 对于tcp协议,返回的的是TCPListener
func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {
......
fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")
......
return &TCPListener{fd}, nil
}

func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {
......
return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
}

func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {
// 创建底层socket,设置属性为O_NONBLOCK
s, err := sysSocket(family, sotype, proto)
......
setDefaultSockopts(s, family, sotype, ipv6only)
// 创建新netFD结构
fd, err = newFD(s, family, sotype, net)
......
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// 调用底层listen监听创建的套接字
fd.listenStream(laddr, listenerBacklog)
return fd, nil
case syscall.SOCK_DGRAM:
......
}
}
}

// 最终调用该函数来创建一个socket
// 并且将socket属性设置为O_NONBLOCK
func sysSocket(family, sotype, proto int) (int, error) {
syscall.ForkLock.RLock()
s, err := syscall.Socket(family, sotype, proto)
if err == nil {
syscall.CloseOnExec(s)
}
syscall.ForkLock.RUnlock()
if err != nil {
return -1, err
}
if err = syscall.SetNonblock(s, true); err != nil {
syscall.Close(s)
return -1, err
}
return s, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
if err := setDefaultListenerSockopts(fd.sysfd)
if lsa, err := laddr.sockaddr(fd.family); err != nil {
return err
} else if lsa != nil {
// Bind绑定至该socket
if err := syscall.Bind(fd.sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
}
// 监听该socket
if err := syscall.Listen(fd.sysfd, backlog);
// 这里非常关键:初始化socket与异步IO相关的内容
if err := fd.init(); err != nil {
return err
}
lsa, _ := syscall.Getsockname(fd.sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}

我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该socket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。

对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码func (fd *netFD) init() error {
if err := fd.pd.Init(fd); err != nil {
return err
}
return nil
}

func (pd *pollDesc) Init(fd *netFD) error {
// 利用了Once机制,保证一个进程只会执行一次
// runtime_pollServerInit:
// TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0
// JMP runtime·netpollServerInit(SB)
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen:
// TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0
// JMP runtime·netpollOpen(SB)
ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
if errno != 0 {
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

这里就是socket异步编程的关键:

netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;

1
2
3
4
5
6
7
8
9
10
11
12
复制代码func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 {
return
}
epfd = epollcreate(1024)
if epfd >= 0 {
closeonexec(epfd)
return
}
......
}

netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen。

1
2
3
4
5
6
复制代码func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。

Accept

既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
复制代码func (l *TCPListener) Accept() (Conn, error) {
c, err := l.AcceptTCP()
......
}

func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
......
fd, err := l.fd.accept()
......
// 返回给调用者一个新的TCPConn
return newTCPConn(fd), nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
// 为什么对该函数加读锁?
if err := fd.readLock(); err != nil {
return nil, err
}
defer fd.readUnlock()
......
for {
// 这个accept是golang包装的系统调用
// 用来处理跨平台
s, rsa, err = accept(fd.sysfd)
if err != nil {
if err == syscall.EAGAIN {
// 如果没有可用连接,WaitRead()阻塞该协程
// 后面会详细分析WaitRead.
if err = fd.pd.WaitRead(); err == nil {
continue
}
} else if err == syscall.ECONNABORTED {
// 如果连接在Listen queue时就已经被对端关闭
continue
}
}
break
}

netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
......
// 这个前面已经分析,将该fd添加到epoll队列中
err = netfd.init()
......
lsa, _ := syscall.Getsockname(netfd.sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}

OK,从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。

一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。

Read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
复制代码func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
return c.fd.Read(b)
}

func (fd *netFD) Read(p []byte) (n int, err error) {
// 为什么对函数调用加读锁
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
// 这个又是干嘛?
if err := fd.pd.PrepareRead(); err != nil {
return 0, &OpError{"read", fd.net, fd.raddr, err}
}
for {
n, err = syscall.Read(int(fd.sysfd), p)
if err != nil {
n = 0
// 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒
if err == syscall.EAGAIN {
if err = fd.pd.WaitRead(); err == nil {
continue
}
}
}
// 检查错误,封装io.EOF
err = chkReadErr(n, err, fd)
break
}
if err != nil && err != io.EOF {
err = &OpError{"read", fd.net, fd.raddr, err}
}
return
}

func chkReadErr(n int, err error, fd *netFD) error {
if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
return io.EOF
}
return err
}

Read的流程与Accept流程极其一致,阅读起来也很简单。相信不用作过多解释,自己看吧。 需要注意的是每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程序需要能够处理这种情况。

Write

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
复制代码func (fd *netFD) Write(p []byte) (nn int, err error) {
// 为什么这里加写锁
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
// 这个是干什么?
if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
// nn记录总共写入的数据量,每次Write可能只能写入部分数据
for {
var n int
n, err = syscall.Write(int(fd.sysfd), p[nn:])
if n > 0 {
nn += n
}
// 如果数组数据已经全部写完,函数返回
if nn == len(p) {
break
}
// 如果写入数据时被block了,阻塞当前协程
if err == syscall.EAGAIN {
if err = fd.pd.WaitWrite(); err == nil {
continue
}
}
if err != nil {
n = 0
break
}
// 如果返回值为0,代表了什么?
if n == 0 {
err = io.ErrUnexpectedEOF
break
}
}
if err != nil {
err = &OpError{"write", fd.net, fd.raddr, err}
}
return nn, err
}

注意Write语义与Read不一样的地方:

Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。

总结

上面我们基本说完了golang网络编程内的关键API流程,我们遗留了一个关键内容:当系统调用返回EAGAIN时,会 调用WaitRead/WaitWrite来阻塞当前协程,我会在接下来的章节中继续分析。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MesaLock Linux 开源:一个用户空间内存安全的

发表于 2017-12-04

MesaLock Linux 是一个通用 Linux 发行版本,其目标是用 Rust、Go 等内存安全语言重写用户空间应用(user space applications),以在用户空间中逐步消除高危的内存安全漏洞。这将极大的降低整个系统的攻击面,并且使得剩余的攻击面可审计、可收敛。MesaLock Linux 在保留 Linux 硬件兼容性的前提下,实质性地提升了 Linux 生态的安全性。MesaLock Linux 的主要目标应用场景是容器(例如:docker 镜像),以及高安全性嵌入式环境,以后随着逐渐成熟,也可以向服务器或其他场景延伸。

为提供完善的功能,并保证强健的安全性,MesaLock Linux 将遵循 Rust SGX SDK 项目中提出的混合代码内存安全架构三原则:

  1. 隔离并模块化由非内存安全代码编写的组件,并最小化其代码量。
  1. 由非内存安全代码编写的组件不应减弱安全模块的安全性,尤其是公共 API 和公共数据结构。
  1. 由非内存安全代码编写的组件需清晰可辨识并且易于更新。

MesaLock Linux 已经开源在 GitHub 上,地址是 https://github.com/mesalock-linux ,目前 MesaLock Linux 项目主要包括三个方面:

• mesalock-distro:提供了 MesaLock Linux 发行版的编译程序,提供了 Live ISO 和 rootfs 两种发行方式。Live ISO 可以在虚拟机中启动或者直接在设备上运行,rootfs 主要为 docker 容器使用。

• packages:这里面包括了 MesaLock Linux 提供的软件包的编译脚本,我们提供了使用内存安全的编程语言 Go 和 Rust 编写的一些常用软件,包括 shell、coreutils、findutils、文本编辑器等等。

• minit, mgetty, giproute2:我们同时提供了用 Rust/Go 编写了启动 MesaLock Linux 过程中的核心组件。

除此之外,我们也提供了相关的文档,包括编译和使用 MesaLock Linux,编写新的软件包。

因为 MesaLock Linux 提供了 Docker 容器的运行环境,如果想要简单快速的尝试 MesaLock Linux,你只需运行:

$ docker run -p 8000:8000 –rm -it mesalocklinux/mesalock-linux

以上命令将进入 MesaLock Linux 的 shell,可以开始使用 MesaLock 中的其他工具。当然,也可以试用我们提供的 demo 程序,由 Rocket(Rust 编写的 web framework)写的网络微服务。demo 存放在 /root/mesalock-demo 目录下 。因为 docker 将 container 中的 8000 端口转发到 host 机器的 8000 端口,运行 /root/mesalock-demo/rocket-hello-world/hello_world,即可以在
host 机器上访问 demo。

MesaLock Linux 项目及其子项目选择使用 BSD 开源协议下开源。BSD 开源协议是一个给于使用者很大自由的协议,我们希望有更多的人来使用 MesaLock Linux,保护整个 Linux 系统的安全生态。同时,安全生态的可持续发展也离不开开源社区、工业界、学术界的共同参与,我们非常欢迎大家的贡献和支持。支持 MesaLock Linux 的途径有很多,例如:

• 尝试使用 MesaLock Linux,给我们反馈使用感受和改进的建议等

• 参与贡献 MesaLock Linux 的开发流程、完善文档、帮助解答常见问题等

• 参与贡献 MesaLock Linux 维护的核心工具代码,比如说 minit、mgetty 等

• 尝试使用 Go、Rust 这类内存安全的编程语言编写用户空间应用,我们将非常欢迎新的应用加入 MesaLock Linux 大家庭

• 参与审计 MesaLock Linux 及其子项目和相关软件包的代码安全

如果你对于 MesaLock Linux 感兴趣,可以参与社区讨论,主要方式是 IRC 讨论组,请访问 freenode 服务器 (irc://chat.freenode.net) 中的 #mesalock-linux 频道。如果你不是非常熟悉 IRC,也可以使用 Riot (https://riot.im/app/#/room/#mesalock-linux:matrix.org),Riot 和 IRC 的讨论内容会自动同步。

注:MesaLock 的 logo 灵感来自于中国古代的孔明锁。

MesaLock Linux: A Memory-Safe Linux Distribution

MesaLock Linux is a general purpose Linux distribution which aims to provide a safe and secure user space environment. To eliminate high-severe vulnerabilities caused by memory corruption, the whole user space applications are rewritten in memory-safe
programming languages like Rust and Go. This extremely reduces attack surfaces of an operating system exposed in the wild, leaving the remaining attack surfaces auditable and restricted. Therefore, MesaLock Linux can substantially improve the
security of the Linux ecosystem. Additionally, thanks to the Linux kernel, MesaLock Linux supports a broad hardware environment, making it deployable in many places. Two main usage scenarios of MesaLock Linux are for containers and security-sensitive
embedded devices. With the growth of the ecosystem, MesaLock Linux would also be adopted in the server environment in the future.

To get better functionality along with strong security guarantees, MesaLock Linux follows the following rules-of-thumb for hybrid memory-safe architecture designing proposed by the Rust SGX SDK project.

  1. Unsafe components should be appropriately isolated and modularized, and the size should be small (or minimized).
  1. Unsafe components should not weaken the safe, especially, public APIs and data structures.
  1. Unsafe components should be clearly identified and easily upgraded.

The MesaLock Linux project is released under the BSD license and source code is on GitHub https://github.com/mesalock-linux . There are three main parts of the MesaLock Linux project:

• mesalock-distro: providing scripts to build the MesaLock Linux distributions such as building packages, Live ISO, and rootfs. Live ISO can be bootstrap in the virutal machine or real devices, while rootfs is for docker container.

• packages: this project includes all building scripts of packages. We provide many essential packages written in Rust and Go, which are memory safe programming languages. These packages includes shell, coreutils, findutils, editor, etc.

• minit, mgetty, and giproute2: providing some core components for bootstraping MesaLock Linux.

We also provide documentations of building, using MesaLock Linux and contributing new packages.

Becase MesaLock Linux provides a root filesystem for docker container, You can quickly get started using this command:

$ docker run -p 8000:8000 –rm -it mesalocklinux/mesalock-linux

This will bring you to a shell, then you can start experience MesaLock Linux. You can also try our demo program, which contains some micro web service writtin in Rocket (a web framework written in Rust). The demo programs are in /root/mesalock-demo
directory. Because the 8000 port is forwarded, you can execute /root/mesalock-demo/rocket-hello-world/hello_world and visit the demo website on your host machine.

The MesaLock Linux project is released under BSD license, which is a permissive free software license, imposing minimal restrictions on the use and redistribution. We would like to make a secure and open operating system for all people appreciate
security. In the meantime, the growth of the MesaLock Linux cannot be done without you guys regardless from the community, industry, and academia. Therefore, you are very welcomed to contribute and support MesaLock Linux. There are many ways to
support MeaLock Linux:

• Try to use MesaLock Linux, report issue, enhancement suggestions, etc

• Contribute to MesaLock Linux: optimize development process, improve documents, closing issues, etc

• Contribute to core packages of MesaLock Linux: improving minit, mgetty, giproute2, etc

• Writing applications using memory safe programming languages like Rust/Go, and joining the the MesaLock Linux packages

• Auditing source code of the MesaLock Linux projects and related packages

If you are interested in the MesaLock Linux project, please find us on the #mesalock-linux IRC channel on the freenode server at irc://chat.freenode.net and the bridged room on Matrix. If you’re not familiar with IRC, we recommend chatting through
Matrix via Riot (https://riot.im/app/#/room/#mesalock-linux:matrix.org) or via the Kiwi web IRC client (https://kiwiirc.com/client/irc.mozilla.org/#mesalock-linux).

Note: the MeasLock logo is originated from the Chinese traditional artifact - Kongming Lock.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

源码 并发一枝花之CopyOnWriteArrayList

发表于 2017-12-04

CopyOnWriteArrayList的设计思想非常简单,但在设计层面有一些小问题需要注意。

JDK版本:oracle java 1.8.0_102

本来不想写的,但是github上CopyOnWriteArrayList的code results也有165k,为了流量还是写一写吧。

实现

看两个方法你就懂了。

读元素set()

1
2
3
4
5
6
复制代码public E get(int index) {
return get(getArray(), index);
}
final Object[] getArray() {
return array;
}

get()方法直接调用内部的getArray()方法,而getArray()方法则直接返回成员变量array。

我没明白为什么要再封装一层,而不是直接访问。

array指向一个数组,是CopyOnWriteArrayList的内部数据结构:

1
复制代码private transient volatile Object[] array;

敲黑板!!!

**array是一个volatile变量,**其读、写操作具有Happends-Before关系。具体来讲,线程W1通过set()方法“修改”集合后,线程R1能立刻通过get()方法得到array的最新值。

你可以理解为volatile变量的读、写是原子的,不过,我更希望你能从顺序和可见性的角度理解理解volatile、锁等具有偏序关系的操作。volatile的原理和用法见volatile关键字的作用、原理。

写元素set()

重点是set()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码public E set(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index);

if (oldValue != element) {
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}
final void setArray(Object[] a) {
array = a;
}

set()方法也很简单,两个要点:

  1. 通过锁lock保护队列修改过程
  2. 在副本上修改,最后替换array引用

按照独占锁的思路,仅仅给写线程加锁是不行的,会有读、写线程的竞争问题。但是get()中明明没有加锁,为什么也没有问题呢?

通过加锁,保证同一时间最多只有一个写线程W1进入try block;假设要设置的值与旧值不同。9-10行首先将数据复制一份(此时,没有其他写线程能进入try block修改集合),11行在副本上修改相应元素,12行修改array引用。array是volatile变量,所以写的最新值对其他读线程、写线程都是可见的。

这就是所谓的“写时复制”。

其他问题

15行volatile写的作用

实际上,15行的volatile写是多余的。这只是为了能从代码里理解到volatile写的语义,并不必要的保证什么——不过这种考虑也是不恰当的,反而使代码迷惑。一个类似的例子是addIfAbsent():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
if (indexOf(e, current, common, len) >= 0)
return false;
}
Object[] newElements = Arrays.copyOf(current, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

基本思想相同,17、19行都是直接返回,并没有做多余的“volatile写”。

在网上搜的话,还有很多其他观点。如果你认为我的观点是错误的,欢迎交流。

addIfAbsent的编码风格跟set()区别很大,不像一个人写的。需要认识到,JDK是一个发展、变化的产品,一个包、甚至一个类都可能不是同一个人、同一段时间写的,编码风格、设计思想可能发生变化;更不要假定JDK的实现一定是对的(当然,绝大部分时候是对的),要基于正确的逻辑去分析,再做判断。

为什么必须要给set加锁?

TODO 20171024

看起来,如果不给set加锁,似乎并发性能更高,一致性也没有削弱多少。未解决,欢迎交流。

设计思想

最后总结CopyOnWriteArrayList的设计思想:

  • 用并发访问“数组副本的引用”代替并发访问“数组元素的引用”,大大降低了维护线程安全的难度。
  • 当前副本可能是失效的,但一定是集合在某一瞬间的快照(一定程度上满足不变性),满足弱一致性。

本文链接:源码|并发一枝花之CopyOnWriteArrayList

作者:猴子007

出处:monkeysayhi.github.io

本文基于知识共享署名-相同方式共享 4.0国际许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名及链接。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

源码 并发一枝花之ConcurrentLinkedQueue

发表于 2017-12-04

首先声明,本文是伪源码分析。主要是基于状态机自己实现一个简化的并发队列,有助于读者掌握并发程序设计的核心——状态机;最后对源码实现略有提及。

ConcurrentLinkedQueue不支持阻塞,没有BlockingQueue那么易用;但在中等规模的并发场景下,其性能却比BlockingQueue高不少,而且相当稳定。同时,ConcurrentLinkedQueue是学习CAS的经典案例。根据github的code results排名,ConcurrentLinkedQueue(164k)也十分流行,比我想象中的使用量大多了。非常值得一讲。

对于状态机和并发程序设计的基本理解,可以参考源码|并发一枝花之BlockingQueue,建议第一次接触状态机的同学速读参考文章之后,再来阅读此文章。

JDK版本:oracle java 1.8.0_102

准备知识:CAS

读者可以跳过这部分,后面讲到offer()方法的实现时再回顾。

悲观锁与乐观锁

  • 悲观锁:假定并发环境是悲观的,如果发生并发冲突,就会破坏一致性,所以要通过独占锁彻底禁止冲突发生。有一个经典比喻,“如果你不锁门,那么捣蛋鬼就回闯入并搞得一团糟”,所以“你只能一次打开门放进一个人,才能时刻盯紧他”。
  • 乐观锁:假定并发环境是乐观的,即,虽然会有并发冲突,但冲突可发现且不会造成损害,所以,可以不加任何保护,等发现并发冲突后再决定放弃操作还是重试。可类比的比喻为,“如果你不锁门,那么虽然捣蛋鬼会闯入,但他们一旦打算破坏你就能知道”,所以“你大可以放进所有人,等发现他们想破坏的时候再做决定”。

通常认为乐观锁的性能比悲观所更高,特别是在某些复杂的场景。这主要由于悲观锁在加锁的同时,也会把某些不会造成破坏的操作保护起来;而乐观锁的竞争则只发生在最小的并发冲突处,如果用悲观锁来理解,就是“锁的粒度最小”。但乐观锁的设计往往比较复杂,因此,复杂场景下还是多用悲观锁。

首先保证正确性,有必要的话,再去追求性能。

CAS

乐观锁的实现往往需要硬件的支持,多数处理器都都实现了一个CAS指令,实现“Compare And Swap”的语义(这里的swap是“换入”,也就是set),构成了基本的乐观锁。

CAS包含3个操作数:

  • 需要读写的内存位置V
  • 进行比较的值A
  • 拟写入的新值B

当且仅当位置V的值等于A时,CAS才会通过原子方式用新值B来更新位置V的值;否则不会执行任何操作。无论位置V的值是否等于A,都将返回V原有的值。

一个有意思的事实是,“使用CAS控制并发”与“使用乐观锁”并不等价。CAS只是一种手段,既可以实现乐观锁,也可以实现悲观锁。乐观、悲观只是一种并发控制的策略。下文将分别用CAS实现悲观锁和乐观锁?
我们先不讲JDK提供的实现,用状态机模型来分析一下,看我们能不能自己实现一版。

队列的状态机模型

状态机模型与是否需要并发无关,一个类不管是否是线程安全的,其状态机模型从类被实现(此时,所有类行为都是确定的)开始就是确定的。接口是类行为的一个子集,我们从接口出发,逐渐构建出简化版ConcurrentLinkedQueue的状态机模型。

队列接口

ConcurrentLinkedQueue实现了Queue接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);

boolean offer(E e);

E remove();

E poll();

E element();

E peek();
}

需要关注的是一对方法:

  • offer():入队,成功返回true,失败返回false。JDK中ConcurrentLinkedQueue实现为无界队列,这里我们也只讨论无界的情况——因此,offer()方法必返回true。
  • poll():出队,有元素返回元素,没有就返回null。

同时,理想的线程安全队列中,入队和出队之间不应该存在竞争,这样入队的状态机模型和出队的状态机模型可以完全解耦,互不影响。

对我们的状态机作出两个假设:

  • 假设1:只支持这入队、出队两种行为。
  • 假设2:入队、出队之间不存在竞争,即入队模型与出队模型是对偶、独立的两个状态机。

从而,可以先分析入队,再参照分析出队;然后可尝试去掉假设2,看如何完善我们的实现来保证假设2成立;最后看看真·神Doug Lea如何实现,学习一波。

状态机定义

现在基于假设1和假设2,尝试定义入队模型的状态机。

我们构造一个简化的场景:存在2个生产者P1、P2,同时触发入队操作。

状态集

如果是单线程环境,入队操作将是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码// 准备
newNode.next = null;
curTail = tail;

// 入队前
assert tail == curTail && tail.next == null; // 状态S1
// 开始入队
tail.next = newNode; // 事件E1
// 入队中
assert tail == curTail && tail.next == newNode; // 状态S2
tail = tail.next; // 事件E2
// 结束入队
// 入队后
assert tail == newNode && tail.next == null; // 状态S3,合并到状态S1

该过程涉及对两个域的修改:tail.next、tail。则随着操作的进行,队列会经历2种状态:

  • 状态S1:事件E1执行前,tail指向实际的尾节点curTail,tail.next==null。如生产者P1、P2都还没有触发入队时,队列处于状态S1;生产者P1完成入队P2还没触发入队时,队列处于状态S1。
  • 状态S2:事件E1执行后、E2执行前,tail指向旧的尾节点curTail,tail.next==newNode。
  • 状态S3:事件E2执行后,tail指向新的尾节点newNode,tail.next==null。同状态S1,合并。

状态转换集

两个事件分别对应两个状态转换:

  • 状态转换T1:S1->S2,即tail.next = newNode。
  • 状态转换T2:S2->S1,即tail = tail.next。

是不是很熟悉?因为ConcurrentLinkedQueue也是队列,必然同BlockingQueue相似甚至相同。区别在于如何维护这些状态和状态转换。

自撸ConcurrentLinkedQueue

依赖CAS,两个状态转换T1、T2都可以实现为原子操作。留给我们的问题是,如何维护合法的状态转换。

入队方法offer()

入队过程需要经过两个状态转换,且这两个状态转换必须连续发生。

不严谨。“连续”并不是必要的,最后分析源码的时候会看到。不过,我们暂时使用强一致性的模型。

思路1:让同一个生产者P1连续完成两个状态转换T1、T2,保证P2不会插入进来

LinkedBlockingQueue的思路即是如此。这是一种悲观策略——一次开门只放进来一个生产者,似乎只能像LinkedBlockingQueue那样,用传统的锁putLock实现,实际上,依靠CAS也能实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码public class ConcurrentLinkedQueue1<E> {
private volatile Node<E> tail;

public ConcurrentLinkedQueue1() {
throw new UnsupportedOperationException("Not implement");
}

public boolean offer(E e) {
Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
while (true) {
Node<E> curTail = tail;
AtomicReference<Node<E>> curNext = curTail.next;
// 尝试T1:CAS设置tail.next
if (curNext.compareAndSet(null, newNode)) {
// 成功者视为获得独占锁,完成了T1。直接执行T2:设置tail
tail = curNext.get();
return true;
}
// 失败者自旋等待
}
}

private static class Node<E> {
private volatile E item;
private AtomicReference<Node<E>> next;

public Node(E item, AtomicReference<Node<E>> next) {
this.item = item;
this.next = next;
}
}
}

思路2:生产者P1完成状态转换T1后,P2代劳完成状态转换T2

再来分析下T1、T2两个状态转换:

  • T1涉及newNode,只能由封闭持有newNode的生产者P1完成
  • T2只涉及队列中的信息,任何持有队列的生产者都有能力完成。P1可以,P2也可以

思路1是悲观的,认为T1、T2必须都由P1完成,如果P2插入就会“搞破坏”。而思路2则打开大门,欢迎任何“有能力”的生产者完成T2,是典型的乐观策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
复制代码public class ConcurrentLinkedQueue2<E> {
private AtomicReference<Node<E>> tail;

public ConcurrentLinkedQueue2() {
throw new UnsupportedOperationException("Not implement");
}

public boolean offer(E e) {
Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
while (true) {
Node<E> curTail = tail.get();
AtomicReference<Node<E>> curNext = curTail.next;
// 尝试T1:CAS设置tail.next
if (curNext.compareAndSet(null, newNode)) {
// 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置tail
tail.compareAndSet(curTail, curNext.get());
// 成功表示该生产者P1完成连续完成了T1、T2,队列处于S1
// 失败表示T2已经由生产者P2完成,队列处于S1
return true;
}
// 失败者得知队列处于S2,则尝试T2:CAS设置tail
tail.compareAndSet(curTail, curNext.get());
// 如果成功,队列转换到S1;如果失败,队列表示T2已经由生产者P1完成,队列已经处于S1
// 然后循环,重新尝试T1
}
}

private static class Node<E> {
private volatile E item;
private AtomicReference<Node<E>> next;

public Node(E item, AtomicReference<Node<E>> next) {
this.item = item;
this.next = next;
}
}
}

减少无效的竞争

我们涉及的状态比较少(只有2个状态),继续看看能否减少无效的竞争,比如:

  • 前两种实现的第一步都是CAS尝试T1,失败了就退化成一次探查(compare and swap中的compare)。发起CAS前,可能队列已经处于S2,这时CAS尝试T1就成了浪费,只需要探查即可。这有点像DCL单例的思路(面试中单例模式有几种写法?),可以直接通过tail.next判断队列是否处于S1,来完成一部分探查,以减少无效的竞争。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
复制代码public class ConcurrentLinkedQueue3<E> {
private AtomicReference<Node<E>> tail;

public ConcurrentLinkedQueue3() {
throw new UnsupportedOperationException("Not implement");
}

public boolean offer(E e) {
Node<E> newNode = new Node<E>(e, new AtomicReference<>(null));
while (true) {
Node<E> curTail = tail.get();
AtomicReference<Node<E>> curNext = curTail.next;

// 先检查一下队列状态的状态,tail.next==null表示队列处于状态S1,仅此时才有CAS尝试T1的必要
if (curNext.get() == null) {
// 如果处于S1,尝试T1:CAS设置tail.next
if (curNext.compareAndSet(null, newNode)) {
// 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置tail
tail.compareAndSet(curTail, curNext.get());
// 成功表示该生产者P1完成连续完成了T1、T2,队列处于S1
// 失败表示T2已经由生产者P2完成,队列处于S1
return true;
}
}
// 否则队列处于处于S2,或CAS尝试T1的失败者得知队列处于S2,则尝试T2:CAS设置tail
tail.compareAndSet(curTail, curNext.get());
// 如果成功,队列转换到S1;如果失败,队列表示T2已经由生产者P1完成,队列已经处于S1
// 然后循环,重新尝试T1
}
}

private static class Node<E> {
private volatile E item;
private AtomicReference<Node<E>> next;

public Node(E item, AtomicReference<Node<E>> next) {
this.item = item;
this.next = next;
}
}
}

注意,上述实现中,while代码块后都没有返回值。这是被编译器允许的,因为编译器可以分析出,该方法不可能运行到while代码块之后,所以while代码块后的返回值语句也是无效的。

出队方法poll()

对偶的构造一个简化的场景:存在2个消费者C1、C2,同时触发出队操作。

不需要考虑悲观策略和优化方案,我们尝试基于思路2的第一种实现撸一版基础的poll()方法。

然后,,,没撸动。想了一下,朴素链表(如LinkedList)中,直接用head表示维护头结点无法区分“已取出item未移动head指针”和“未取出item未移动head指针”(同“已取出item已移动head指针”)两种状态。所以还是写一写才知道深浅啊,碰巧前两天写了BlockingQueue的分析,dummy node正好派上用场。

队列初始化如下:

1
2
3
复制代码dummy = new Node(null, null);
// tail = dummy; // 后面会用到
// head = dummy.next; // dummy.next 表示实际的头结点,但我们不需要存储它

状态机

单线程环境的出队过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码// 准备
curDummy = dummy;
curNext = curDummy.next;
oldItem = curNext.item;

// 出队前
assert dummy == curDummy && dummy.next.item == oldItem; // 状态S1
// 开始出队
dummy.next.item = null; // 事件E1
// 出队中
assert dummy == curDummy && dummy.next.item == null; // 状态S2
dummy = dummy.next; // 事件E2
// 结束出队
// 出队后
assert dummy == curNext && dummy.next.item != null; // 状态S3,合并到状态S1

状态:

  • 状态S1:事件E1执行前,dummy指向实际的dummy节点curDummy,dummy.next.item== oldItem。如消费者C1、C2都还没有触发出队时,队列处于状态S1;消费者C1完成入队C2还没触发出队时,队列处于状态S1。
  • 状态S2:事件E1执行后、E2执行前,dummy指向旧的dummy节点curDummy,dummy.next.item==null。
  • 状态S3:事件E2执行后,dummy指向新的dummy节点curNext,dummy.next.item!=null。这在本质上同状态S1是一致的,合并。

状态转换:

  • 状态转换T1:S1->S2,即dummy.next.item = null。
  • 状态转换T2:S2->S1,即dummy = dummy.next。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码public class ConcurrentLinkedQueue4<E> {
private AtomicReference<Node<E>> dummy;

public ConcurrentLinkedQueue4() {
dummy = new AtomicReference<>(new Node<>(null, null));
}

public E poll() {
while (true) {

Node<E> curDummy = dummy.get();
Node<E> curNext = curDummy.next;
E oldItem = curNext.item.get();
// 尝试T1:CAS设置dummy.next.item
if (curNext.item.compareAndSet(oldItem, null)) {
// 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置dummy
dummy.compareAndSet(curDummy, curNext);
// 成功表示该消费者C1完成连续完成了T1、T2,队列处于S1
// 失败表示T2已经由消费者C2完成,队列处于S1
return oldItem;
}
// 失败者得知队列处于S2,则尝试T2:CAS设置dummy
dummy.compareAndSet(curDummy, curNext);
// 如果成功,队列转换到S1;如果失败,队列表示T2已经由消费者P1完成,队列已经处于S1
// 然后循环,重新尝试T1
}
}

private static class Node<E> {
private AtomicReference<E> item;
private volatile Node<E> next;

public Node(AtomicReference<E> item, Node<E> next) {
this.item = item;
this.next = next;
}
}
}

另一种状态机

实际上,前面的讨论有意回避了一个问题——如果入队/出队操作顺序不同,我们会构造出不同的状态机。这相当于同一个类的另一种实现,不违反前面作出的声明:

状态机模型与是否需要并发无关,一个类不管是否是线程安全的,其状态机模型从类被实现(此时,所有类行为都是确定的)开始就是确定的。

继续以出队为例,假设在单线程下,采用这样的顺序出队:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码// 准备
curDummy = dummy;
curNext = curDummy.next;
oldItem = curNext.item;

// 出队前
assert dummy == curDummy && dummy.item == null; // 状态S1

// 开始出队
dummmy = dummy.next; // 事件E1
// 出队中
assert dummy == curNext && dummy.item == oldItem; // 状态S2
dummy.item = null; // 事件E2
// 结束出队

// 出队后
assert dummy == curNext && dummy.item == null; // 状态S3,合并到状态S1

看起来,这样的操作顺序更容易定义各状态:

  • 状态S1:事件E1执行前,dummy指向实际的dummy节点curDummy,dummy.item == null。如消费者C1、C2都还没有触发出队时,队列处于状态S1;消费者C1完成入队C2还没触发出队时,队列处于状态S1。
  • 状态S2:事件E1执行后、E2执行前,dummy指向新的dummy节点curNext,dummy.item == oldItem。
  • 状态S3:事件E2执行后,dummy指向新的dummy节点curNext,dummy.item == null。显然同状态S1,合并。

状态转换:

  • 状态转换T1:S1->S2,即dummmy = dummy.next。
  • 状态转换T2:S2->S1,即dummy.item = null。

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码public class ConcurrentLinkedQueue5<E> {
private AtomicReference<Node<E>> dummy;

public ConcurrentLinkedQueue5() {
dummy = new AtomicReference<>(new Node<>(null, null));
}

public E poll() {
while (true) {

Node<E> curDummy = dummy.get();
Node<E> curNext = curDummy.next;
E oldItem = curNext.item.get();
// 尝试T1:CAS设置dummmy
if (dummy.compareAndSet(curDummy, curNext)) {
// 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置dummy.item
curDummy.item.compareAndSet(oldItem, null);
// 成功表示该消费者C1完成连续完成了T1、T2,队列处于S1
// 失败表示T2已经由消费者C2完成,队列处于S1
return oldItem;
}
// 失败者得知队列处于S2,则尝试T2:CAS设置dummy.item
curDummy.item.compareAndSet(oldItem, null);
// 如果成功,队列转换到S1;如果失败,队列表示T2已经由消费者P1完成,队列已经处于S1
// 然后循环,重新尝试T1
}
}

private static class Node<E> {
private AtomicReference<E> item;
private volatile Node<E> next;

public Node(AtomicReference<E> item, Node<E> next) {
this.item = item;
this.next = next;
}
}
}

一个trick

实现上面状态机的过程中,我想出了一个针对出队操作的trick:可以去掉dummy node,用head维护头结点+一步状态转换完成出队。

对啊,我写着写着又撸出来了。。。

去掉了dummy node,那么head.item的初始状态就是非空的,下面是简化的状态机。

单线程出队的操作顺序:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码// 准备
curHead = head;
curNext = curHead.next;
oldItem = curHead.item;

// 出队前
assert head == curHead; // 状态S1

// 出队
head = head.next; // 事件E1

// 出队后
assert head == curNext; // 状态S2,合并到状态S1

出队只需要尝试head后移,成功者可从旧的头结点curHead中取出item,之后curHead将被废弃;失败者再重新尝试即可。如果在尝试前就得到了item的引用,那么E1发生后,不管成功与否,在curHead上做什么都是无所谓的了,因为事实上没有任何消费者会再去访问它。

这是一个单状态的状态机,则状态:

  • 状态S1:head指向实际的头节点curHead。队列始终处于状态S1。
  • 状态S2:head指向新的头节点curNext。同S1,合并

状态转换:

  • 状态转换T1:S1->S1,即head = head.next。

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
复制代码public class ConcurrentLinkedQueue6<E> {
private AtomicReference<Node<E>> head;

public ConcurrentLinkedQueue6() {
throw new UnsupportedOperationException("Not implement");
}

public E poll() {
while (true) {

Node<E> curHead = head.get();
Node<E> curNext = curHead.next;
// 尝试T1:CAS设置head
if (head.compareAndSet(curHead, curNext)) {
// 成功者完成了T1,队列处于S1
return curHead.item; // 只让成功者取出item
}
// 失败者重试尝试
}
}

private static class Node<E> {
private volatile E item;
private volatile Node<E> next;

public Node(E item, Node<E> next) {
this.item = item;
this.next = next;
}
}
}

其他特殊情况

前面都是基于假设2“入队、出队无竞争”讨论的。现在需要放开假设2,看如何完善已有的实现以保证假设2成立。或者如果不能保证假设2的话,如何解决竞争问题。

根据对LinkedBlockingQueue的分析,我们得知,如果底层数据结构是朴素链表,那么队列空或长度为1的时候,head、tail都指向同一个节点(或都为null),这时必然存在竞争;dummy node较好的解决了这一问题。ConcurrentLinkedQueue4是基于dummy node的方案,我们尝试在此基础上修改。

回顾dummy node的使用方法(配合ConcurrentLinkedQueue2和ConcurrentLinkedQueue4做了调整和精简):

  • 初始化链表时,创建dummy node:
    • dummy = new Node(null, null)
    • // head = dummy.next // head 为 null <=> 队列空
    • tail = dummy // tail.item 为 null <=> 队列空
  • 在队尾入队时,tail后移:
    • tail.next = new Node(newItem, null)
    • tail = tail.next
  • 在队头出队时,dummy后移,同步更新head:
    • oldItem = dummy.next.item // == head.item
    • dummy.next.item = null
    • dummy = dummy.next
    • // head = dummy.next
    • return oldItem

下面分情况讨论。

case1:队列空

队列空时,队列处于一个特殊的状态,从该状态出发,仅能完成入队相关的状态转换——通俗讲就是队列空时只允许入队操作。这时消除竞争很简单,只允许入队不允许出队即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
复制代码public class ConcurrentLinkedQueue7<E> {
private AtomicReference<Node<E>> dummy;
private AtomicReference<Node<E>> tail;

public ConcurrentLinkedQueue7() {
Node<E> initNode = new Node<E>(
new AtomicReference<E>(null), new AtomicReference<Node<E>>(null));
dummy = new AtomicReference<>(initNode);
tail = new AtomicReference<>(initNode);
// Node<E> head = dummy.get().next.get();
}

public boolean offer(E e) {
Node<E> newNode = new Node<E>(new AtomicReference<>(e), new AtomicReference<>(null));
while (true) {
Node<E> curTail = tail.get();
AtomicReference<Node<E>> curNext = curTail.next;
if (curNext.compareAndSet(null, newNode)) {
tail.compareAndSet(curTail, curNext.get());
return true;
}
tail.compareAndSet(curTail, curNext.get());
}
}

public E poll() {
while (true) {
Node<E> curDummy = dummy.get();
Node<E> curNext = curDummy.next.get();
// 既可以用 dummy.next == null (head) 判空,也可以用 tail.item == null
// 不过鉴于处于poll()方法中,使用 dummy.next 可读性更好
if (curNext == null) {
return null;
}
E oldItem = curNext.item.get();
if (curNext.item.compareAndSet(oldItem, null)) {
dummy.compareAndSet(curDummy, curNext);
return oldItem;
}
dummy.compareAndSet(curDummy, curNext);
}
}

private static class Node<E> {
private AtomicReference<E> item;
private AtomicReference<Node<E>> next;

public Node(AtomicReference<E> item, AtomicReference<Node<E>> next) {
this.item = item;
this.next = next;
}
}
}

ConcurrentLinkedQueue7需要原子的操作item和next,因此Node的item、next域都被声明为了AtomicReference。

队列空的时候:offer()方法同ConcurrentLinkedQueue2#offer(),不需要做特殊处理;poll()方法在ConcurrentLinkedQueue4#poll()的基础上,增加了32-34行的队列空检查。需要注意的是,检查必须放在队列转换的过程中,防止消费者C2第一次尝试时队列非空,但第二次尝试时队列变空(由于C1取出了唯一的元素)的情况。

case2:队列长度等于1

队列长度等于1时,入队与出队不会同时修改同一节点,这时一定不会发生竞争。分析如下。

假设存在一个生产者P1,一个消费者C1,同时触发入队/出队,队列中只有一个元素,所以只两个节点dummyNode、singleNode则此时:

1
2
3
4
5
复制代码assert dummy == dummyNode;
assert dummy.next.item == singleNode.item;

assert tail == singleNode;
assert tail.next == singleNode.next;

回顾ConcurrentLinkedQueue7的实现:

  • poll()方法修改引用dummy、singleNode.item
  • offer()方法操tail、singleNode.next

因此,由于dummy node的引入,队列长度为1时,入队、出队之间天生就不存在竞争。

小结

至此,我们从最简单的场景触发,基于状态机实现了一个支持高性能offer()、poll()方法的ConcurrentLinkedQueue7。CAS的好处暂且不表,重要的是基于状态机进行并发程序设计的思想。只有抓住其状态机的本质,才能设计出正确、高效的并发类。

如果还是没有体会到状态机的精妙之处,可以抛开状态机,并自己尝试基于乐观策略实现ConcurrentLinkedQueue。(之所以要基于乐观策略,是因为悲观策略可以认为是乐观策略的是特例,容易让人忽略其状态机的本质)

JDK实现

希望看到这里,你已经理解了ConcurrentLinkedQueue的状态机本质,因为下面就不再是本文的重点。

真·神Doug Lea的实现基于一个弱一致性的状态机:允许队列处于多种不一致的状态,通过恰当的选择“不一致的状态”,能做到用户无感;虽然增加了状态机的复杂度,但也进一步提高了性能。

网上分析文章非常多,读者可自行阅读,有一定难度。本文不打算讲解Doug Lea的实现,贴出源码仅供大家膜拜。

构造方法

常用的是默认的空构造函数:

1
2
3
4
5
6
7
8
9
10
复制代码public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
...
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
...
}

Doug Lea也使用了dummy node,不过命名为了head。初始化方法同我们实现的ConcurrentLinkedQueue7。

入队方法offer()

ConcurrentLinkedQueue7#offer()相当于ConcurrentLinkedQueue#offer()的一个特例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t)
casTail(t, newNode);
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}

具体来讲,ConcurrentLinkedQueue允许的多个状态大体是这样的:

  • 状态S1:一致;newNode已衔接在tail.next,但tail指向倒数第1个节点
  • 状态S2:不一致;newNode已衔接在tail.next,但tail指向倒数第2个节点
  • 状态S3:不一致;newNode已衔接在tail.next,但tail指向倒数第3个节点
  • …

状态转换的规则也随之打破——不再需要连续完成T1、T2,可以连续执行多次类T1,最后执行一次类T2。

for循环中的几个分支就是在处理这些一致和不一致的状态。我们前面定义的状态机空间中只允许状态S1、S2,因此是一个子集。增加的这些不一致的状态主要是为了减少CAS次数,进一步提高队列性能,这包含两个重要意义:

  • 降低延迟:部分入队请求不再需要走完完整的状态转换,只需要循环到tail.next.cas(null, newNode)成功。
  • 提高吞吐:之前每一次入队请求都要设置一次tail节点;目前只需要积攒几次入队,在某个新的newNode入队时,直接尝试tail.cas(t, newNode),将tail跳跃到最新的newNode。

增加这些不一致的状态是很危险的,如S3,当队列长度为1的时候,tail与head的位置存在交叉。Doug Lea牛逼之处在于,在保证正确性的前提下,不仅通过增加状态提高了性能,还减少了实际的CAS次数。

出队方法poll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;

if (item != null && p.casItem(item, null)) {
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}

分析方法类似于offer()。注意下updateHead()。

未完

本来是想分析ConcurrentLinkedQueue源码的,没想到写完状态机就3600多字了,干货却不多。前路漫漫,源码咱下回见。


本文链接:源码|并发一枝花之ConcurrentLinkedQueue【伪】

作者:猴子007

出处:monkeysayhi.github.io

本文基于知识共享署名-相同方式共享 4.0国际许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名及链接。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Apache Pulsar的多租户消息系统

发表于 2017-12-04

在之前的博客文章中,我们介绍了多个选择Apache Pulsar作为企业级实时业务所用消息解决方案的原因。后续文章中,将会深入介绍其中的一些企业级功能,例如预防数据丢失的
持久化存储、多租户功能、多地域复制,以及加密和安全。

本文将着重介绍Apache Pulsar中的多租户消息功能。多租户是指通过一个软件实例为多个租户提供服务的能力。租户是指对系统有着相同“视图”的一组用户。作为企业的消息中枢,Apache Pulsar自诞生之日起就支持多租户,因为该项目最初就是为了满足Yahoo的严格需求,而当时市面上没有任何可用的开源系统能够提供多租户功能,包括常用的日志抽象系统,例如Apache Kafka。为多个用户或职能部门创建多个Pulsar实例的做法通常是无法接受的,因为这样做会使得用户难以跨越不同部门实时分享数据,造成了隔离。

作为一种企业级的消息系统,Pulsar的多租户能力按照设计可满足下列需求:

  • 确保严苛的SLA可顺利满足
  • 保证不同租户之间的隔离
  • 针对资源利用率强制实施配额
  • 提供每租户和系统级的安全性
  • 确保低成本运维以及尽可能简单的管理

Apache Pulsar通过下列方式满足了上述需求:

  • 通过为每个租户进行身份验证、授权和ACL(访问控制列表)获得所需安全性。
  • 为每个租户强制实施存储配额。
  • 以策略的方式定义所有隔离机制,策略可在运行过程中更改,借此降低运维成本并简化管理工作。

Pulsar简介

为了帮助大家更好地理解Pulsar的多租户能力,首先简单看看Pulsar的消息模型。

与很多其他发布-订阅系统类似,将数据送入Pulsar的应用程序可叫做生产者(Producer),使用来自Pulsar的数据的应用程序则可叫做消费者(Consumer)。消费者应用程序有时候也可称之为订阅者(Subscriber)。与一般意义上的发布-订阅者式类似,主题(Topic)同样也是Pulsar最核心的消息构造。大致来说,主题可以代表供生产者添加数据的渠道,而消费者可以从主题中拉取数据。一组消费者可以针对某一主题组成一个订阅。不同的消费者组可以针对同一个主题选择自己首选的消息消费者式:独享(Exclusive)、共享(Shared)或故障转移(Failover)。不同订阅模式如图1所示。


图1:Pulsar的订阅模式:独享、共享和故障转移

Pulsar从设计之初就可以支持多租户。因此主题可按照与多租户有关的两个资源进行组织:资产(Property)和名称空间(Namespace)。资产代表系统中的租户,租户可以在自己的资产内配置多个名称空间,每个名称空间可包含任意数量个主题。名称空间是Pulsar中每个租户最基本的管理单位,用户可针对名称空间设置ACL,调整副本数目设置,管理跨集群的消息数据多地域复制,控制消息的过期,并执行其他重要的运维操作。


图2:一个Pulsar部署中包含了三个相互独立的租户

如果希望进一步了解Pulsar,建议阅读Pulsar简介一文。下文将进一步谈谈Pulsar实现多租户能力所用的机制。

安全性

为了顺利实现多租户能力,首先需要确保每个租户:(a) 只能访问自己有权访问的主题,并且 (b) 不能访问自己本不应看到或访问的主题。这是通过一种插接式(Pluggable)的身份验证和授权机制实现的。

在Pulsar中,当客户端连接到消息Broker后,Broker会使用身份验证插件为该客户端创建身份,随后(可能)为该客户端分配角色令牌。该角色令牌是一个字符串,例如admin或application-1,可代表一个或多个客户端。角色令牌可用于控制客户端针对特定主题进行生产或消费操作的权限,并可用于管理租户
资产的配置。

默认情况下Pulsar可支持两种身份验证提供程序:TLS client auth和Athenz,后者是由Yahoo开发的身份验证系统。用户也可实现自己的身份验证提供程序,详情可参阅Pulsar的
文档。

身份验证提供程序识别出某个客户端的角色令牌后,Pulsar Broker会使用一个授权提供程序来确定该客户端有权执行什么操作。授权是在资产层面上管理的,这意味着一个Pulsar集群中可以使用多个同时活跃的授权架构(Scheme)。例如,用户可以创建一个shopping资产并为其设置一组角色,将其应用给企业中所用的购物应用程序;并创建一个inventory资产,仅将其应用给库存应用程序。权限是在
名称空间的层面上管理的,也就是在资产内部管理的。我们可以针对一个名称空间,为特定角色的一系列操作,例如produce和consume分配权限。有关如何在资产层面上配置授权并为名称空间分配权限的详情,请参阅Pulsar的
文档。

最后,身份验证和授权实现了租户间的隔离,租户无法访问自己无权访问的主题或执行无权限的操作。下文一起看看Pulsar如何针对租户进行资源隔离以满足租户对SLA的要求。

隔离

除了通过隔离满足安全方面的需求,多租户应用程序还需要满足SLA的要求,为此Pulsar还针对健壮性和性能进行了隔离。这是通过软隔离实现的,例如磁盘配额、流控制、限流调节。此外还有硬隔离,例如将某些租户隔离在提供服务的某个Broker子网内部,并使用BookKeeper bookie实现存储隔离。

在介绍具体的隔离机制前,先来看看Apache Pulsar集群到底是什么样的。图3展示了一个典型的安装环境。Pulsar集群包含一组Broker(用于服务发布-订阅流量)、Bookie(用于消息存储),以及一个负责整体协调和配置管理的Apache ZooKeeper。Pulsar Broker是负责接收和交付消息的组件,Bookie则是为最终消费前的消息提供持久存储的Apache BookKeeper服务器。


图3:一个典型的Apache Pulsar环境。

软隔离

Broker和Bookie通常是被多个生产者和消费者共享的物理资源。为了保护租户并满足SLA要求,Pulsar在Broker和Bookie方面提供了多种不同机制。

存储

Apache Pulsar使用Apache BookKeeper作为消息的持久存储系统。Apache BookKeeper中的每个Bookie通常可高效地为成百上千个Ledger(每个Ledger是对一个主题创建的一个片段)提供服务。BookKeeper能够实现这样的效率主要是因为它在设计上就考虑到了I/O隔离的需求。每个Bookie都有自己专用的
日志(Journal)(位于自己专用的磁盘驱动器上),借此通过聚合的方式处理所有添加进来的写操作。随后消息会定期在后台清空(Flush),并存储到专用的存储磁盘驱动器中。这样的I/O架构能实现读写操作的隔离,这意味着租户可以用尽可能快的速度读取,获得存储设备所能提供的最大化I/O性能,同时不至于影响到写操作的吞吐率和延迟。

除了I/O隔离,不同租户还可为不同名称空间配置不同的存储配额。Pulsar还可让租户在配额耗尽后继续执行指定的操作,例如阻止继续生产消息,抛出异常,或丢弃老的消息。

Pulsar的Broker

除了Bookie层面上采取的机制,为了满足SLA要求,Pulsar还在Broker层面提供了不同的机制。首先,Pulsar Broker中的一切事务均可异步进行,此外还可对每个Broker所能使用的内存数量设置上限。如果Broker的CPU或内存用量超限,可在很短的时间内将流量(手工或自动)迁移至负载不那么高的Broker。每个Pulsar Broker中的负载管理器组件就是专门做这件事的。

此外还要注意,为满足SLA要求,Pulsar可以快速在Broker之间迁移流量,因为该系统的服务层和存储层是分开的。这样Broker就可以真正实现无状态的特征。与其他消息系统不同,其他系统中的消息分区只能存储在Broker组成的子集中,而Pulsar的Broker无需在本地存储任何数据。将主题从一个Broker到另一个Broker的开销实现了最小化,因此流量可极为快速地再平衡,并能为租户提供更迅速的保护。

其次,消息的生产和消费端均部署了流控制协议。在生产端,租户可以为Broker和Bookie处于传输过程中的消息数量配置限制,这样就可以抑制用户以超出系统容纳速度的方式发布消息。在消费端,租户可以针对Broker交付给消费者的未完成消息数量进行限制。

最后,在消费端,Pulsar还可将交付给消费者的消息数量限流调节为指定的速率。这样即可防止消费者以超出系统处理速度的方式消费消息。

所有这些软件机制确保了生产者和消费者的SLA都可妥善满足。

硬隔离

上述机制主要是为了确保Pulsar能够在满足租户SLA要求的前提下高效地共享资源(Broker和Bookie)。然而在某些情况下,应用程序还需要对物理资源进行隔离。Pulsar可通过选项将某些租户或名称空间隔离到Broker的某个子集中,借此满足需求。这样即可确保这些租户或名称空间可以全面使用Broker子集所具备的全部资源。

该选项还可用于对不同配置进行实验、调试,或快速响应生产环境中出现的非预期情况。例如,某个用户可能会触发Broker执行糟糕的行为,进而导致其他租户的性能受到影响。此时即可将这个租户物理隔离到某个不为其他租户流量提供服务的Broker子集中,直到通过部署修复程序顺利解决这种情况后再取消隔离。

除了在Broker上对流量进行物理隔离,还可以对用于存储消息的Bookie的流量进行隔离。为此可针对名称空间配置必要的放置策略(Placement policy)。

Pulsar使用的这些机制可以看作针对不同租户提供的多集群环境的轻量级版本,但实际上,通常并不需要分别设置这一切。借此即可实现类似于单一集群的物理隔离,同时可简化运维工作。

结论

Apache Pulsar是一种真正的多租户消息系统,可在不同资源之间提供不同程度的隔离。本文介绍了Pulsar用于实现多租户能力的各种机制,包括通过身份验证和授权实现安全隔离,通过流控制、限流调节和存储配额实现共享物理资源的隔离,以及通过放置策略实现物理资源的隔离。希望本文可以帮助大家更好地理解Apache Pulsar及其多租户企业级功能。后续文章还将进一步介绍Apache Pulsar的另一项企业级功能:多地域复制。

如果对Pulsar感兴趣,可通过下列方式参与Pulsar社区:

  • Pulsar Slack频道,可自行在这里注册:apache-pulsar.herokuapp.com/。
  • Pulsar邮件列表。

有关Apache Pulsar项目的更多常规信息,可访问官网:pulsar.incubator.apache.org/,并可关注该项目的Twitter帐号:@apache_pulsar。

作者:Matteo Merli and Sijie Guo,阅读英文原文:Multi-tenant messaging with Apache Pulsar

感谢杜小芳对本文的审校。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

实战(一)之使用自带urllib和re正则表达式获取电影详情

发表于 2017-12-04

一、关于urllib库

网上urllib库的教程文章非常多而且详细具体,我就不过多叙述和重复了,个人更加倾向于将精力放在实战上,而不是反反复复拷贝教程(其实是因为我懒),来几篇干货参考。

python2:

  1. Python爬虫入门三之Urllib库的基本使用
  2. Python爬虫入门四之Urllib库的高级用法

python3:

3.python爬虫从入门到放弃(三)之 Urllib库的基本使用

正则表达式re:

1.菜鸟教程–python正则表达式

2.python爬虫从入门到放弃(五)之 正则的基本使用

二、思路:

以爬取www.ygdy8.com这个电影下载网站为例子。

1.打开浏览器,搜狗或者chrome都可以,访问该网站,以爬取最新电影这个栏目的电影为例,我们可以找到这个板块首页的url为:
http://www.ygdy8.com/html/gndy/dyzz/index.html,
再点击下一页,会发现页码的url为
http://www.ygdy8.com/html/gndy/dyzz/list\_23\_2.html,
由此可知,每一页的url为:
http://www.ygdy8.com/html/gndy/dyzz/list\_23\_+**页数**.html

image.png

2.在将鼠标移动到电影名字上右键,审查元素,这里易会发现电影详情页的链接为****

image.png[将鼠标移动到末页字眼右键,审查元素,这里会发现总共有167页。
image.png

3.根据上面的分析查找,基本可以按照这样的思路进行爬取了。

1.先请求index最新电影板块首页,获取到最大的页数。
2.循环构造页码链接,请求获取所有的带有电影列表的页面。
3.在获取到页面基础上,获得电影列表的所有电影详情页链接,为下一步提取电影的标题,迅雷链接做准备。

三、开始动手

1、构造一个urllib请求页面的opener,尽量模仿浏览器,减少被禁的风险。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码# -*- coding:utf-8 -*-
#!/usr/bin python
import urllib.request
import re
import time
import random
from decorator import fun_timer

#构建一个模仿浏览器的opener
base_url = 'http://www.ygdy8.com'
#浏览器请求头,这里用的是搜狗浏览器
user_agents=[('User-Agent','Mozilla/5.0(Macintosh;U;IntelMacOSX10_6_8;
en-us)AppleWebKit/534.50(KHTML,likeGecko)Version/5.1Safari/534.50'), ]
Headers = [ random.choice(user_agents),
("Connection","keep-alive") ]
#构建opener
opener = urllib.request.build_opener()
#添加请求头
opener.addheaders=Headers

2、构造请求首页,获取最大页码数的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码# 1、获取最大的页码数
# 2、你可以到页面查看最大页码数,但是页码数是会不断增加的,
#如果你要自动化反复去爬取,就需要进行页码数的自动获取。

def get_pageNum():
url = "http://www.ygdy8.com/html/gndy/dyzz/index.html"
data = opener.open(url).read()
#上面的分析已经知道页码的链接是以 list_23_页数.html结尾的,可以匹配正则表达式来查找。
res = re.findall(r'list\_23\_[0-9]*', str(data))
if res:
page_num = res[-1].split("_")[-1]
print('获取到页码数%s'%page_num)
return int(page_num)

3、知道页数后,就可以循环构造链接进行爬取了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码#根据页数爬取每页的电影列表
def find_by_page(page = 1):
if page%10==0:
#每隔10页进行延时,防止ip被禁
time.sleep(random.randint(1,3))
url = "http://www.ygdy8.com/html/gndy/dyzz/list_23_%s.html" % (str(page))
data = opener.open(url).read()

# 正则表达式获取链接
#因为页面的左侧会有其他类别电影板块的外链,所以要先定位到class为co_content8的目标板块,
#再获取板块里的所有链接。

res = re.findall(r'class\=\"co\_content8\"[\s\S]*\<\/ul\>', str(data))[0]
res = re.findall(r'\/html\/gndy[/a-z]+[0-9/]{1,}\.html', str(res))
urls = [base_url + url for url in res]
print('爬取第%s页: '%page+str(urls))
return urls


#汇总所有的列表,上面的函数是每页返回一个电影详情页链接列表

def get_urls(pages=1):
all_url = []
for page in range(1,pages+1):
for r in find_by_page(page):
all_url.append(r)
#去重
all_url= set(all_url)
print('获取到%s条链接'%len(all_url))
return all_url

4、最后开始爬取并输出到TXT文件上。

1
2
3
4
5
6
7
复制代码if __name__ == "__main__":
out = ''
for url in get_urls(get_pageNum()):
url = str(url)+'\n'
out= out+url
with open('all_urls.txt','w') as f:
f.write(out)

image.png

四、总结

其实总结起来,简单小规模的爬虫无非就是分析页面—请求页面–获取目标元素–保存等若干步骤,但是真正动起手来,就会遇到各种问题,解决问题也是一个进步的过程。
解决ip被禁有很多种途径,例如使用代理ip,模拟登陆,随机延时请求,本次我是简单粗暴地每隔10页进行随机的延时。
这样我们获取到所有目标电影的详情页链接,下一步就是进行详情页的分析爬取了,这涉及到动态加载页面的爬取,将放到后面几篇分享哈,下一篇,将介绍使用第三方库requests库和xpath匹配爬取详情页链接哟。

201711042223224161.jpg)](/html/gndy/dyzz/20171112/55519.html)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

IAS2017互联网架构峰会(实录) 前言 写在前面 第一天

发表于 2017-12-04

前言

“架构是什么?不同的领域、不同的视角会有不同的答案。如果加上了“互联网”三个字,那么她的内涵就更加丰富了。”这是ias官方的介绍词,本次活动很意外地在南京举行了,虽然相比其他城市的学术峰会来讲,参加的来宾不算多,但主讲嘉宾足够重量,也许这意味着南京互联网企业开始进入蓬勃发展期了。

写在前面

先聊聊我对这2天的总结吧:
今年大部分的主讲嘉宾都分享服务治理、服务发现、云原生、微服务、熔断、异地多活、弹性扩容、压测、分布式日志收集、docker、k8s实践等。

关于服务器治理,可能我们普遍以为spring cloud使用率并不高,而事实上包括mobike等多个公司使用了spring cloud。关于dubbo框架,大家都知道阿里停止更新好久了,可是,最近听说最近阿里也要重启该框架了,看来服务器治理真的要火。

很多人可能会说前几年的技术我们都还没普及,一下子又来了一大堆新技术,不得不说,作为各个行业的领头军,他们的技术水平可能甩了我们很多小公司几千条街。但我觉得,对于我们,只需要从中汲取能应用到自己公司的技术即可。

第一天

可能是主办方对来的嘉宾有点低估,或者是因为南京很少有这种峰会,人来的特别多,主会场后面站了有两三排人。

中台架构支撑企业向互联网架构升级

赵勇 阿里云架构师
赵老师主要给我们介绍了阿里的中台架构,主要意思是将原来的淘宝、天猫、1688等烟囱式架构,改造为中台服务,具体功能开发时,调用底层中台接口即可。

大型互联网支付平台的演进历程

陈斌 易宝支付CTO,前eBay/PayPal资深架构师,《架构即未来》译者
记得陈老师说的我觉得比较自豪的话是,以前他们有30+的运维人数,现在运维只有七八人。每年并发流量都是疯狂增长,而下一年预算时,只需要前一年的80%费用用来采购机器。这些都得益于DevOps的落地。

摩拜微服务架构演进

郑长帅 摩拜单车架构师
郑老师给我们讲,摩拜内部服务器治理使用的是spring cloud,而且用的很得心应手。

饿了么交易系统应用架构演进

石佳宁 饿了么研发总监
饿了么服务器治理用的也是spring cloud。

58速运智能派单系统的架构演进

胡显波 58到家技术中心后端架构负责人

途牛系统架构优化实践

赵国光 途牛旅游网首席架构师
赵老师给我的感觉就是比较牛,比较实在,本来以为途牛的业务不是非常复杂,经赵老师这么一讲,原来旅游的学问是那么的多。

第二天

技术架构演变全景图- 从单体式到云原生

张亮 当当网首席架构师
曹祖鹏 千米网首席架构师
为什么会两位老师,说来很搞笑,曹老师跟我们说这也是他的一种尝试,前一天他看了一晚上郭德纲视频,就是为了第二天能给我们带来不一样的感觉。

小米弹性调度平台Ocean——从PaaS到DCOS

孙寅 小米资深架构师

新语言,新思维(解读一个并发问题的7种实现)

陶召胜 华为 资深技术专家
陶老师讲了好多种语言,可以看出陶老师是个全栈工程师啊,可是在场的大部分是java工程师,所以很多人不太懂。

高可用架构演进与实践

孙朝南 Uber UberEATS技术负责人

京东京麦开放平台网关与消息高可用架构

王新栋 京东商城资深架构师

大促备战新模式-从性能确定性到资源容量确定性

隐寒 阿里巴巴技术专家

隐寒老师给我们讲了阿里的内部全链路压测,大概实现原理是,压测的时候给全国各地cdn节点发送压测命令,然后在压测的请求中加标识,所有标识为压测的请求会被中间件抽取,然后所有的读写都是在 阿里所谓的影子表、或者影子key中,并不会给阿里真实的用户数据造成污染。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python数据结构:字典

发表于 2017-12-03

这一篇是《流畅的 python》读书笔记。主要介绍:

  • 常见的字典方法
  • 如何处理查不到的键
  • 标准库中 dict 类型的变种
  • 散列表的工作原理

泛映射类型

collections.abc 模块中有 Mapping 和 MutableMapping 这两个抽象基类,它们的作用是为 dict 和其他类似的类型定义形式接口。

标准库里所有映射类型都是利用 dict 来实现的,它们有个共同的限制,即只有可散列的数据类型才能用做这些映射里的键。

问题: 什么是可散列的数据类型?

在 python 词汇表(docs.python.org/3/glossary.…)中,关于可散列类型的定义是这样的:

如果一个对象是可散列的,那么在这个对象的生命周期中,它的散列值是不变的,而且这个对象需要实现 __hash__() 方法。另外可散列对象还要有 __eq__() 方法,这样才能跟其他键做比较。如果两个可散列对象是相等的,那么它们的散列只一定是一样的

根据这个定义,原子不可变类型(str,bytes和数值类型)都是可散列类型,frozenset 也是可散列的(因为根据其定义,frozenset 里只能容纳可散列类型),如果元组内都是可散列类型的话,元组也是可散列的(元组虽然是不可变类型,但如果它里面的元素是可变类型,这种元组也不能被认为是不可变的)。

一般来讲,用户自定义的类型的对象都是可散列的,散列值就是它们的 id() 函数的返回值,所以这些对象在比较的时候都是不相等的。(如果一个对象实现了 eq 方法,并且在方法中用到了这个对象的内部状态的话,那么只有当所有这些内部状态都是不可变的情况下,这个对象才是可散列的。)

根据这些定义,字典提供了很多种构造方法,docs.python.org/3/library/s… 这个页面有个例子来说明创建字典的不同方式。

1
2
3
4
5
6
7
复制代码>>> a = dict(one=1, two=2, three=3)
>>> b = {'one': 1, 'two': 2, 'three': 3}
>>> c = dict(zip(['one', 'two', 'three'], [1, 2, 3]))
>>> d = dict([('two', 2), ('one', 1), ('three', 3)])
>>> e = dict({'three': 3, 'one': 1, 'two': 2})
>>> a == b == c == d == e
True

除了这些方法以外,还可以用字典推导的方式来建造新 dict。

字典推导

自 Python2.7 以来,列表推导和生成器表达式的概念就移植到了字典上,从而有了字典推导。字典推导(dictcomp)可以从任何以键值对作为元素的可迭代对象中构建出字典。

比如:

1
2
3
4
复制代码>>> data = [(1, 'a'), (2, 'b'), (3, 'c')]
>>> data_dict = {num: letter for num, letter in data}
>>> data_dict
{1: 'a', 2: 'b', 3: 'c'}

常见的映射方法

下表为我们展示了 dict、defaultdict 和 OrderedDict 的常见方法(后两种是 dict 的变种,位于 collections模块内)。

  • default_factory 并不是一个方法,而是一个可调用对象,它的值 defaultdict 初始化的时候由用户设定。
  • OrderedDict.popitem() 会移除字典最先插入的元素(先进先出);可选参数 last 如果值为真,则会移除最后插入的元素(后进先出)。
  • 用 setdefault 处理找不到的键
    当字典 d[k] 不能找到正确的键的时候,Python 会抛出异常,平时我们都使用d.get(k, default) 来代替 d[k],给找不到的键一个默认值,还可以使用效率更高的 setdefault
1
2
3
4
5
复制代码my_dict.setdefault(key, []).append(new_value)
# 等同于
if key not in my_dict:
my_dict[key] = []
my_dict[key].append(new_value)

这两段代码的效果一样,只不过,后者至少要进行两次键查询,如果不存在,就是三次,而用 setdefault 只需一次就可以完成整个操作。

那么,我们取值的时候,该如何处理找不到的键呢?

映射的弹性查询

有时候,就算某个键在映射里不存在,我们也希望在通过这个键读取值的时候能得到一个默认值。有两个途径能帮我们达到这个目的,一个是通过 defaultdict 这个类型而不是普通的 dict,另一个是给自己定义一个 dict 的子类,然后在子类中实现 __missing__ 方法。

defaultdict:处理找不到的键的一个选择

首先我们看下如何使用 defaultdict :

1
2
3
4
复制代码import collections

index = collections.defaultdict(list)
index[new_key].append(new_value)

这里我们新建了一个字典 index,如果键 new_key 在 index 中不存在,表达式 index[new_key] 会按以下步骤来操作:

  1. 调用 list() 来建立一个新的列表
  2. 把这个新列表作为值,’new_key’ 作为它的键,放入 index 中
  3. 返回这个列表的引用。

而这个用来生成默认值的可调用对象存放在名为 default_factory 的实例属性中。

defaultdict 中的 default_factory 只会在 getitem 里调用,在其他方法中不会发生作用。比如 index[k] 这个表达式会调用 default_factory 创造的某个默认值,而 index.get(k) 则会返回 None。(这是因为特殊方法 missing 会在 defaultdict 遇到找不到的键的时候调用 default_factory,实际上,这个特性所有映射方法都可以支持)。

特殊方法 missing

所有映射在处理找不到的键的时候,都会牵扯到 missing 方法。但基类 dict 并没有提供 这个方法。不过,如果有一个类继承了 dict ,然后这个继承类提供了 missing 方法,那么在 getitem 碰到找不到键的时候,Python 会自动调用它,而不是抛出一个 KeyError 异常。

__missing__ 方法只会被 __getitem__ 调用。提供 missing 方法对 get 或者 contains(in 运算符会用到这个方法)这些方法的是有没有影响。

下面这段代码实现了 StrKeyDict0 类,StrKeyDict0 类在查询的时候把非字符串的键转化为字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码class StrKeyDict0(dict): # 继承 dict
def __missing__(self, key):
if isinstance(key, str):
# 如果找不到的键本身就是字符串,抛出 KeyError
raise KeyError(key)
# 如果找不到的键不是字符串,转化为字符串再找一次
return self[str(key)]
def get(self, key, default=None):
# get 方法把查找工作用 self[key] 的形式委托给 __getitem__,这样在宣布查找失败钱,还能通过 __missing__ 再给键一个机会
try:
return self[key]
except KeyError:
# 如果抛出 KeyError 说明 __missing__ 也失败了,于是返回 default
return default
def __contains__(self, key):
# 先按传入的键查找,如果没有再把键转为字符串再找一次
return key in self.keys() or str(key) in self.keys()

contains 方法存在是为了保持一致性,因为 k in d 这个操作会调用它,但我们从 dict 继承到的 contains 方法不会在找不到键的时候用 missing 方法。

my_dict.keys() 在 Python3 中返回值是一个 “视图”,”视图”就像是一个集合,而且和字典一样速度很快。但在 Python2中,my_dict.keys() 返回的是一个列表。 所以 k in my_dict.keys() 操作在 python3中速度很快,但在 python2 中,处理效率并不高。

如果要自定义一个映射类型,合适的策略是继承 collections.UserDict 类。这个类就是把标准 dict 用 python 又实现了一遍,UserDict 是让用户继承写子类的,改进后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码import collections

class StrKeyDict(collections.UserDict):

def __missing__(self, key):
if isinstance(key, str):
raise KeyError(key)
return self[str(key)]

def __contains__(self, key):
# 这里可以放心假设所有已经存储的键都是字符串。因此只要在 self.data 上查询就好了
return str(key) in self.data

def __setitem__(self, key, item):
# 这个方法会把所有的键都转化成字符串。
self.data[str(key)] = item

因为 UserDict 继承的是 MutableMapping,所以 StrKeyDict 里剩下的那些映射类型都是从 UserDict、MutableMapping 和 Mapping 这些超类继承而来的。

Mapping 中提供了 get 方法,和我们在 StrKeyDict0 中定义的一样,所以我们在这里不需要定义 get 方法。

字典的变种

在 collections 模块中,除了 defaultdict 之外还有其他的映射类型。

  • collections.OrderedDict
  • collections.ChainMap
  • collections.Counter

不可变的映射类型

问题:标准库中所有的映射类型都是可变的,如果我们想给用户提供一个不可变的映射类型该如何处理呢?

从 Python3.3 开始 types 模块中引入了一个封装类名叫 MappingProxyType。如果给这个类一个映射,它会返回一个只读的映射视图(如果原映射做了改动,这个视图的结果页会相应的改变)。例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码>>> from types import MappingProxy Type
>>> d = {1: 'A'}
>>> d_proxy = MappingProxyType(d)
>>> d_proxy
mappingproxy({1: 'A'})
>>> d_proxy[1]
'A'
>>> d_proxy[2] = 'x'
Traceback(most recent call last):
File "<stdin", line 1, in <module>
TypeError: 'MappingProxy' object does not support item assignment
>>> d[2] = 'B'
>>> d_proxy[2] # d_proxy 是动态的,d 的改动会反馈到它上边
'B'

字典中的散列表

散列表其实是一个稀疏数组(总有空白元素的数组叫稀疏数组),在 dict 的散列表中,每个键值都占用一个表元,每个表元都有两个部分,一个是对键的引用,另一个是对值的引用。因为所有表元的大小一致,所以可以通过偏移量来读取某个表元。
python 会设法保证大概有1/3 的表元是空的,所以在快要达到这个阈值的时候,原有的散列表会被复制到一个更大的空间。

如果要把一个对象放入散列表,那么首先要计算这个元素的散列值。
Python内置的 hash() 方法可以用于计算所有的内置类型对象。

如果两个对象在比较的时候是相等的,那么它们的散列值也必须相等。例如 1==1.0 那么,hash(1) == hash(1.0)

散列表算法

为了获取 my_dict[search_key] 的值,Python 会首先调用 hash(search_key) 来计算 search_key 的散列值,把这个值的最低几位当做偏移量在散列表中查找元。若表元为空,抛出 KeyError 异常。若不为空,则表元会有一对 found_key:found_value。
这时需要校验 search_key == found_key,如果相等,返回 found_value。
如果不匹配(散列冲突),再在散列表中再取几位,然后处理一下,用处理后的结果当做索引再找表元。 然后重复上面的步骤。

取值流程图如下:

字典取值流程图

添加新值和上述的流程基本一致,只不过对于前者,在发现空表元的时候会放入一个新元素,而对于后者,在找到相应表元后,原表里的值对象会被替换成新值。

另外,在插入新值是,Python 可能会按照散列表的拥挤程度来决定是否重新分配内存为它扩容,如果增加了散列表的大小,那散列值所占的位数和用作索引的位数都会随之增加

字典的优势和限制

1、键必须是可散列的

可散列对象要求如下:

  • 支持 hash 函数,并且通过__hash__() 方法所得的散列值不变
  • 支持通过 eq() 方法检测相等性
  • 若 a == b 为真, 则 hash(a) == hash(b) 也为真

2、字典开销巨大

因为字典使用了散列表,而散列表又必须是稀疏的,这导致它在空间上效率低下。

3、键查询很快

dict 的实现是典型的空间换时间:字典类型由着巨大的内存开销,但提供了无视数据量大小的快速访问。

4、键的次序决定于添加顺序

当往 dict 里添加新键而又发生散列冲突时,新建可能会被安排存放在另一个位置。

5、往字典里添加新键可能会改变已有键的顺序

无论何时向字典中添加新的键,Python 解释器都可能做出为字典扩容的决定。扩容导致的结果就是要新建一个更大的散列表,并把原有的键添加到新的散列表中,这个过程中可能会发生新的散列冲突,导致新散列表中次序发生变化。
因此,不要对字典同时进行迭代和修改。

总结

这一篇主要介绍了:

  • 常见的字典方法
  • 如何处理查不到的键
  • 标准库中 dict 类型的变种
  • 散列表的工作原理
  • 散列表带来的潜在影响

参考链接

  • docs.python.org/3/glossary.…
  • docs.python.org/3/library/s…

最后,感谢女朋友支持。

欢迎关注(April_Louisa) 请我喝芬达
欢迎关注 请我喝芬达

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【项目管理】Mybatis-Generator之最完美配置详

发表于 2017-12-03

小伙伴们注意了,全哥花了一些时间,重新整理了一个最完美的Mybatis Generator (简称MBG)配置文件详细说明,老板再也不用担心我的学习!!!

不用多说,直接上代码:

****<!DOCTYPEgeneratorConfiguration

   PUBLIC “-//mybatis.org//DTD MyBatis GeneratorConfiguration 1.0//EN”

       “http://mybatis.org/dtd/mybatis-generator-config\_1\_0.dtd"*>***

**

<generatorConfiguration>

*<!– 可以用于加载配置项或者配置文件,在整个配置文件中就可以使用${propertyKey}的方式来引用配置项*

resource:配置资源加载地址,使用resource,MBG从classpath开始找,比如com/myproject/generatorConfig.properties

url:配置资源加载地质,使用URL的方式,比如file:///C:/myfolder/generatorConfig.properties.

注意,两个属性只能选址一个;

另外,如果使用了mybatis-generator-maven-plugin,那么在pom.xml中定义的properties都可以直接在generatorConfig.xml中使用

*

–>*

*<!– 在MBG工作的时候,需要额外加载的依赖包

location属性指明加载jar/zip包的全路径  

–>*

*<!–

context:生成一组对象的环境  

id:必选,上下文id,用于在生成错误时提示  

defaultModelType:指定生成对象的样式  

    1,conditional:类似hierarchical;  

    2,flat:所有内容(主键,blob)等全部生成在一个对象中;  

    3,hierarchical:主键生成一个XXKey对象(key class),Blob等单独生成一个对象,其他简单属性在一个对象中(record

class)

targetRuntime:  

    1,MyBatis3:默认的值,生成基于MyBatis3.x以上版本的内容,包括XXXBySample;  

    2,MyBatis3Simple:类似MyBatis3,只是不生成XXXBySample;  

introspectedColumnImpl:类全限定名,用于扩展MBG  

–>*

<contextid=”mysql”
defaultModelType=”hierarchical”
targetRuntime=”MyBatis3Simple”
>
*<!– 自动识别数据库关键字,默认false,如果设置为true,根据SqlReservedWords中定义的关键字列表;

   一般保留默认值,遇到数据库关键字(Java关键字),使用columnOverride覆盖  

-->* <**propertyname="autoDelimitKeywords"

value=”false”**/>

** <propertyname=”javaFileEncoding”
value=”UTF-8”
/>

** <propertyname=”javaFormatter”
value=”org.mybatis.generator.api.dom.DefaultJavaFormatter”
/>

** <propertyname=”xmlFormatter”
value=”org.mybatis.generator.api.dom.DefaultXmlFormatter”
/>

** <propertyname=”beginningDelimiter”
value=”`”
/>

<propertyname=”endingDelimiter”
value=”`”
/>

*<!– 必须要有的,使用这个配置链接数据库

@****TODO:是否可以扩展*** *-->*

<jdbcConnectiondriverClass=”com.mysql.jdbc.Driver”connectionURL=”jdbc:mysql:///pss”userId=”root”
password=”admin”
>
** </jdbcConnection>

*<!– java类型处理器

用于处理DB中的类型到Java中的类型,默认使用JavaTypeResolverDefaultImpl;  

注意一点,默认会先尝试使用Integer,Long,Short等来对应DECIMAL和

NUMERIC数据类型;

-->*

<javaTypeResolvertype=”org.mybatis.generator.internal.types.JavaTypeResolverDefaultImpl” >
*<!–

   true:使用BigDecimal对应DECIMAL和 NUMERIC数据类型  

   false:默认,  

      scale>0;length>18:使用BigDecimal;  

      scale=0;length[10,18]:使用Long;  

      scale=0;length[5,9]:使用Integer;  

      scale=0;length<5:使用Short;  

-->* <**propertyname="forceBigDecimals"

value=”false”/>
</
javaTypeResolver** >

*<!– java模型创建器,是必须要的元素

   负责:1,key类(见context的defaultModelType);2,java类;3,查询类  

   targetPackage:生成的类要放的包,真实的包受enableSubPackages属性控制;  

   targetProject:目标项目,指定一个存在的目录下,生成的内容会放到指定目录中,如果目录不存在,MBG不会自动建目录  

-->* <**javaModelGeneratortargetPackage="com.\_520it.mybatis.domain"

targetProject=”src/main/java”**>
*<!– for MyBatis3/MyBatis3Simple

  自动为每一个生成的类创建一个构造方法,构造方法包含了所有的field;而不是使用setter;  

-->* <**propertyname="constructorBased"

value=”false”*/>
*

     <propertyname=”enableSubPackages”
value=”true”
/>

*<!-- for MyBatis3 / MyBatis3Simple  

   是否创建一个不可变的类,如果为true,  

   那么MBG会创建一个没有setter方法的类,取而代之的是类似constructorBased的类  

-->*

<propertyname=”immutable”
value=”false”
/>

*<!-- 设置一个根对象,  

     如果设置了这个根对象,那么生成的keyClass或者recordClass会继承这个类;在Table的rootClass属性中可以覆盖该选项  

     注意:如果在key class或者record class中有root class相同的属性,MBG就不会重新生成这些属性了,包括:  

        1,属性名相同,类型相同,有相同的getter/setter方法;  

  -->* <**propertyname="rootClass"

value=”com._520it.mybatis.domain.BaseDomain”**/>

*<!-- 设置是否在getter方法中,对String类型字段调用trim()方法-->* <**propertyname="trimStrings"

value=”true”/>
</
javaModelGenerator** >

*<!– 生成SQL map的XML文件生成器,

   注意,在Mybatis3之后,我们可以使用mapper.xml文件+Mapper接口(或者不用mapper接口),  

       或者只使用Mapper接口+Annotation,所以,如果 javaClientGenerator配置中配置了需要生成XML的话,这个元素就必须配置  

   targetPackage/targetProject:同javaModelGenerator  

-->* <**sqlMapGeneratortargetPackage="com.\_520it.mybatis.mapper"

targetProject=”src/main/resources”>
** <
propertyname=”enableSubPackages”
value=”true”/>
</
sqlMapGenerator** >

*<!– 对于mybatis来说,即生成Mapper接口,注意,如果没有配置该元素,那么默认不会生成Mapper接口

    targetPackage/targetProject:同javaModelGenerator  

    type:选择怎么生成mapper接口(在MyBatis3/MyBatis3Simple下):  

        1,ANNOTATEDMAPPER:会生成使用Mapper接口+Annotation的方式创建(SQL生成在annotation中),不会生成对应的XML;  

        2,MIXEDMAPPER:使用混合配置,会生成Mapper接口,并适当添加合适的Annotation,但是XML会生成在XML中;  

        3,XMLMAPPER:会生成Mapper接口,接口完全依赖XML;  

    注意,如果context是MyBatis3Simple:只支持ANNOTATEDMAPPER和XMLMAPPER  

-->* <**javaClientGeneratortargetPackage="com.\_520it.mybatis.mapper"

type=”ANNOTATEDMAPPER”
targetProject=”src/main/java”>
** <
propertyname=”enableSubPackages”
value=”true”**/>
*<!– 可以为所有生成的接口添加一个父接口,但是MBG只负责生成,不负责检查

<propertyname="rootInterface" value=""/>  

 -->* </**javaClientGenerator**>  

*<!– 选择一个table来生成相关文件,可以有一个或多个table,必须要有table元素

选择的table会生成一下文件:  

1,SQL map文件  

2,生成一个主键类;  

3,除了BLOB和主键的其他字段的类;  

4,包含BLOB的类;  

5,一个用户生成动态查询的条件类(selectByExample,deleteByExample),可选;  

6,Mapper接口(可选)  

tableName(必要):要生成对象的表名;  

注意:大小写敏感问题。正常情况下,MBG会自动的去识别数据库标识符的大小写敏感度,在一般情况下,MBG会  

    根据设置的schema,catalog或tablename去查询数据表,按照下面的流程:  

    1,如果schema,catalog或tablename中有空格,那么设置的是什么格式,就精确的使用指定的大小写格式去查询;  

    2,否则,如果数据库的标识符使用大写的,那么MBG自动把表名变成大写再查找;  

    3,否则,如果数据库的标识符使用小写的,那么MBG自动把表名变成小写再查找;  

    4,否则,使用指定的大小写格式查询;  

另外的,如果在创建表的时候,使用的""把数据库对象规定大小写,就算数据库标识符是使用的大写,在这种情况下也会使用给定的大小写来创建表名;  

这个时候,请设置delimitIdentifiers="true"即可保留大小写格式;*

*可选:

1,schema:数据库的schema;  

2,catalog:数据库的catalog;  

3,alias:为数据表设置的别名,如果设置了alias,那么生成的所有的SELECT SQL语句中,列名会变成:alias\_actualColumnName  

4,domainObjectName:生成的domain类的名字,如果不设置,直接使用表名作为domain类的名字;可以设置为somepck.domainName,那么会自动把domainName类再放到somepck包里面;  

5,enableInsert(默认true):指定是否生成insert语句;  

6,enableSelectByPrimaryKey(默认true):指定是否生成按照主键查询对象的语句(就是getById或get);  

7,enableSelectByExample(默认true):MyBatis3Simple为false,指定是否生成动态查询语句;  

8,enableUpdateByPrimaryKey(默认true):指定是否生成按照主键修改对象的语句(即update);  

9,enableDeleteByPrimaryKey(默认true):指定是否生成按照主键删除对象的语句(即delete);  

10,enableDeleteByExample(默认true):MyBatis3Simple为false,指定是否生成动态删除语句;  

11,enableCountByExample(默认true):MyBatis3Simple为false,指定是否生成动态查询总条数语句(用于分页的总条数查询);  

12,enableUpdateByExample(默认true):MyBatis3Simple为false,指定是否生成动态修改语句(只修改对象中不为空的属性);  

13,modelType:参考context元素的defaultModelType,相当于覆盖;  

14,delimitIdentifiers:参考tableName的解释,注意,默认的delimitIdentifiers是双引号,如果类似MYSQL这样的数据库,使用的是`(反引号,那么还需要设置context的beginningDelimiter和endingDelimiter属性)  

15,delimitAllColumns:设置是否所有生成的SQL中的列名都使用标识符引起来。默认为false,delimitIdentifiers参考context的属性*

*注意,table里面很多参数都是对javaModelGenerator,context等元素的默认属性的一个复写;

-->*

<tabletableName=”userinfo”>
** <propertyname=”constructorBased”
value=”false”
/>
** <propertyname=”ignoreQualifiersAtRuntime”
value=”false”
/>
** <propertyname=”immutable”
value=”false”
/>
** <propertyname=”modelOnly”
value=”false”
/>
*<!– 参考 javaModelGenerator
的 rootClass
属性

<propertyname="rootClass" value=""/>  

 -->*

*<!– 参考javaClientGenerator
的 rootInterface
属性

<propertyname="rootInterface" value=""/>  

-->*

*<!– 如果设置了runtimeCatalog,那么在生成的SQL中,使用该指定的catalog,而不是table元素上的catalog

<property name="runtimeCatalog" value=""/>*

–>

*<!– 如果设置了runtimeSchema,那么在生成的SQL中,使用该指定的schema,而不是table元素上的schema

<propertyname="runtimeSchema" value=""/>  

-->*

*<!– 如果设置了runtimeTableName,那么在生成的SQL中,使用该指定的tablename,而不是table元素上的tablename

<propertyname="runtimeTableName" value=""/>  

-->*

*<!– 注意,该属性只针对MyBatis3Simple有用;

   如果选择的runtime是MyBatis3Simple,那么会生成一个SelectAll方法,如果指定了selectAllOrderByClause,那么会在该SQL中添加指定的这个order条件;  

-->*

<propertyname=”selectAllOrderByClause”
value=”age desc,username asc”
/>
** <propertyname=”useActualColumnNames”
value=”false”
/>

*<!– generatedKey用于生成生成主键的方法,如果设置了该元素,MBG会在生成的元素中生成一条正确的元素,该元素可选

column:主键的列名;  

sqlStatement:要生成的selectKey语句,有以下可选项:  

   Cloudscape:相当于selectKey的SQL为: VALUESIDENTITY\_VAL\_LOCAL()  

   DB2       :相当于selectKey的SQL为: VALUESIDENTITY\_VAL\_LOCAL()  

   DB2\_MF    :相当于selectKey的SQL为:SELECT IDENTITY\_VAL\_LOCAL()FROM

SYSIBM.SYSDUMMY1

Derby      :相当于selectKey的SQL为:VALUESIDENTITY\_VAL\_LOCAL()  

 HSQLDB      :相当于selectKey的SQL为:CALL IDENTITY()  

Informix  :相当于selectKey的SQL为:selectdbinfo('sqlca.sqlerrd1')

from systables where tabid=1

       MySql      :相当于selectKey的SQL为:SELECT1263732  

       SqlServer:相当于selectKey的SQL为:SELECTSCOPE\_IDENTITY()  

        SYBASE      :相当于selectKey的SQL为:SELECT @@IDENTITY  

       JDBC      :相当于在生成的insert元素上添加useGeneratedKeys="true"和keyProperty属性  

<generatedKey column=""sqlStatement=""/>  

 -->*

*<!–

    该元素会在根据表中列名计算对象属性名之前先重命名列名,非常适合用于表中的列都有公用的前缀字符串的时候,  

    比如列名为:CUST\_ID,CUST\_NAME,CUST\_EMAIL,CUST\_ADDRESS等;  

    那么就可以设置searchString为"^CUST\_",并使用空白替换,那么生成的Customer对象中的属性名称就不是  

   custId,custName等,而是先被替换为ID,NAME,EMAIL,然后变成属性:id,name,email;  

    注意,MBG是使用java.util.regex.Matcher.replaceAll来替换searchString和replaceString的,  

    如果使用了columnOverride元素,该属性无效;  

<columnRenamingRulesearchString="" replaceString=""/>  

 -->*

*<!– 用来修改表中某个列的属性,MBG会使用修改后的列来生成domain的属性;

   column:要重新设置的列名;  

   注意,一个table元素中可以有多个columnOverride元素哈~  

-->* <**columnOverridecolumn="username"** >  
    *<!-- 使用property属性来指定列要生成的属性名称

–>* <propertyname=”property”
value=”userName”
/>
*<!– javaType用于指定生成的domain的属性类型,使用类型的全限定名

<propertyname="javaType" value=""/>  

 -->  

<!--jdbcType用于指定该列的JDBC类型  

<propertyname="jdbcType" value=""/>  

 -->  

<!--typeHandler 用于指定该列使用到的TypeHandler,如果要指定,配置类型处理器的全限定名  

   注意,mybatis中,不会生成到mybatis-config.xml中的typeHandler  

   只会生成类似:where id =#{id,jdbcType=BIGINT,typeHandler=com.\_520it.mybatis.MyTypeHandler}的参数描述  

<propertyname="jdbcType" value=""/>  

-->*

*<!–参考table元素的delimitAllColumns配置,默认为false

     <propertyname="delimitedColumnName" value=""/>  

      -->*</**columnOverride**>  




*<!--ignoreColumn设置一个MGB忽略的列,如果设置了改列,那么在生成的domain中,生成的SQL中,都不会有该列出现  

     column:指定要忽略的列的名字;  

    delimitedColumnName:参考table元素的delimitAllColumns配置,默认为false*

*注意,一个table元素中可以有多个ignoreColumn元素

<ignoreColumncolumn="deptId" delimitedColumnName=""/>  

-->* </**table**>  

</context >
</generatorConfiguration>

好了,就到这里了,此文档可以作为一个工具文档进行查看。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

实战(二)轻松使用requests库和beautifulso

发表于 2017-12-03

前言

实战(一)之使用自带urllib和re正则表达式获取电影详情页链接

其实大多情况下,python自带的urllib和re正则表达式已经可以满足我们的日常需求了,但是,聪明的世人怎么会满足于此呢,只有更好就没有最好。所以,就诞生了requests和beautifulsoup这两个作为爬虫的黄金组合。而python的魅力就在于此,你可以找到很多好用易上手到让你心头一颤的第三方库。

一、安装&简单使用入门。

1、安装

使用Pip可以很方便的安装:

pip install requests

pip install beautifulsoup4

2、requests 入门。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码     import requests

## get请求
r = requests.get('https://github.com/timeline.json')
r.json() ##如果是JSON 响应内容,使用r.json()会自动将json结果转换成dict
r.content() ##二进制相应内容

headers = {'user-agent': 'my-app/0.0.1'} #定制请求头
r = requests.get('https://github.com/timeline.json', headers=headers)

payload = {'key1': 'value1', 'key2': 'value2'} #传递url参数
r = requests.get("http://httpbin.org/get", params=payload)

## post请求
payload = {'key1': 'value1', 'key2': 'value2'} ## post数据
r = requests.post("http://httpbin.org/post", data=payload)

## 上传文件
url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}
r = requests.post(url, files=files)

更多详细的请查看官方的中文文档,详细易懂权威。

http://docs.python-requests.org/zh\_CN/latest/user/quickstart.html

3、beautifulsoup入门。

beautifulsoup可以快速的去定位HTML文档,HTML是用来描述网页的一种超文本标记语言,不是一种编程语言。
如果你没有HTML基础,可以去花一天的时间了解下。
菜鸟教程–HTML

http://www.runoob.com/html/html-tutorial.html

注意的点
  • 现在假设知道了HTML是个什么东西,你会发现HTML就是由一层又一层的tag组成,每个tag节点有自己的class属性或者其他属性、自己的父tag、子tag以及兄弟tag,而beautifulsoup的作用就是通过这种蛛丝马迹,轻易的把你要的凶手。。哦不目标节点揪出来,免去了写正则表达式的繁琐噩梦。
  • beautifulsoup对HTML的解释是依赖第三方解释库的,常用的有html.parser、lxml、支持xml解释的lxml-xml、html5Lib,各有优缺点,根据我的经验,有时候使用beautifulsoup返回的待处理文本,会缺少一些tag节点,但是你又确定这不是动态加载的话,其实这是因为解释器无法解释,直接跳过导致的,这时候,可以更换解释器尝试一下。
常用的方法

我这里只用下find和find_all作为实例,详细的用法请去权威易懂的官方文档。 毕竟做搬运工是件很累且无意义的事情。

http://beautifulsoup.readthedocs.io/zh\_CN/latest/

1
2
3
4
5
6
7
复制代码from bs4 import BeautifulSoup

##配合requests
r = requests.get('http://www.douban.com')
##一锅待处理的soup汤
soup = BeautifulSoup(r.content,'lxml') #使用lxml解释库
print(soup)

我们会得到如下的soup体,然后定位到红色框的a块。

image.png

通过属性定位查找该节点 (find)

1
2
3
4
复制代码    a = soup.find('a',attrs={'class':'lnk-book'})
print(a)
print('链接: '+a['href'])
print('文字: '+a.text)

image.png

返回包含所有该节点的列表(find_all)

1
2
复制代码a_s = soup.find_all('a')
print (a_s)

image.png

提示:有时候需要先将目标范围逐层缩小,这样容易获取目标节点。

二、爬取豆瓣图书top250

分析页面。

1、 我们点击底部的页码,会发现页数是25的倍数,从0开始,这样我们就可以构造对应页的url了。

image.png

2、我们定位到每页,查找书本的信息,获取对应的url,下图为每页爬取到的书本详情页url。

image.png

image.png

3、在图书的详情页,我们定位到如下元素。获取该书的书名,评分和评分人数。

image.png

image.png

代码编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
复制代码# -*- coding:utf-8 -*-

#author:waiwen
#email:iwaiwen@163.com
#time: 2017/12/3 12:27

from bs4 import BeautifulSoup
import requests
import random


#uer_agent库,随机选取,防止被禁
USER_AGENT_LIST = [
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
"Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24"
]


#请求网页的代码整合
def get_response(url):
#random.choice从一个集合中随机选出请求头
headers = {'user-agent':random.choice(USER_AGENT_LIST)}

resp = requests.get(url,headers=headers)
resp.raise_for_status()
soup = BeautifulSoup(resp.content, 'lxml')
return soup

#找到每本书的链接
def get_book_url(page):
if page>10:
return []
num=(page-1)*25
url ='https://book.douban.com/top250?start=%s'%str(num)
soup = get_response(url)
book_div = soup.find('div', attrs={'class': 'indent'})
books = book_div.find_all('tr', attrs={'class': 'item'})
urls = [ book.td.a['href'] for book in books ]
print('获取第%s页'%page,urls)
return urls

#获得每本书的信息
def get_book_info(book_url):
soup = get_response(book_url)

div_info = soup.find('div',attrs={'id':'info'})

book_author = div_info.a.text.split(' ')[-1] #将空格去除
book = soup.find('div',attrs={'class':'rating_wrap clearbox'})
book_name= soup.find('span',attrs={'property':'v:itemreviewed'}).text

book_grade = book.find('strong',attrs={'class':'ll rating_num '}).text
book_man = book.find('a',attrs={'class':'rating_people'}).span.text
book_info ={}

book_info['name']=book_name
book_info['author']=book_author
book_info['rating_num'] = int(book_man)
book_info['grade'] = float(book_grade)
print(book_info)

return book_info



if __name__ == '__main__':
all_urls = []
#从第1页到第10页爬取,链接拼接到一起。
for page in range(1,11):
urls = get_book_url(page)
all_urls = all_urls+urls
print('获取到的链接数:',len(all_urls))
out=''
for url in all_urls:
try:
info = get_book_info(url)
except Exception as e:
print(e)
continue
out=out+str(info)+'\n'
with open('douban_book_top250.txt','w') as f: #输出到TXT文件
f.write(out)

image.png

总结

分析页面,找到目标元素所在的tag节点,记录其属性,通过beautifulsoup的find和find_all找到该区域,然后进行相应的提取。这个过程中,要说明一下,使用单线程阻塞爬取250本书的信息,比较耗时,计划后面将会使用多线程进行改进,敬请期待哟。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…920921922…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%