python 进程(池)、线程(池)

进程、多进程、进程池

进程总概述

进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
python3复制代码from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

多进程(进程池创建)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
python3复制代码from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(3)
for i in range(4):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

解析:
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
Parent process 87461.
Waiting for all subprocesses done…
Run task 0 (87462)…
Run task 1 (87463)…
Run task 2 (87464)…
Task 1 runs 1.66 seconds.
Run task 3 (87463)… —————–> task3在某个进程结束时,在创建
Task 2 runs 2.33 seconds.
Task 0 runs 2.54 seconds.
Task 3 runs 2.83 seconds.
All subprocesses done.

进程之间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
python3复制代码from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

线程总概述

线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python3复制代码import time, threading

# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

线程锁-线程安全(操作同一个变量)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
python3复制代码balance = 0
lock = threading.Lock()

def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
global balance
balance = balance + n
balance = balance - n
finally:
# 改完了一定要释放锁:
lock.release()

线程池创建

ThreadPoolExecutor实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
python复制代码from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
'''
Handle a client connection
'''
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')
sock.close()

def echo_server(addr):
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

手动创建你自己的线程池, 通常可以使用一个Queue来轻松实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
python3复制代码from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
'''
Handle a client connection
'''
sock, client_addr = q.get()
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')

sock.close()

def echo_server(addr, nworkers):
# Launch the client workers
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()

# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))
echo_server(('',15000), 128)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%