并行应用常常需要在进程之间交换数据。Multiprocessing库有两个Communication Channel可以交换对象:队列(queue)和管道(pipe)。
1. 使用队列交换对象
我们可以通过队列数据结构来共享对象。
Queue
返回一个进程共享的队列,是线程安全的,也是进程安全的。任何可序列化的对象(Python通过 pickable
模块序列化对象)都可以通过它进行交换。
2. 如何做…
在下面的例子中,我们将展示如何使用队列来实现生产者-消费者问题。 Producer
类生产item放到队列中,然后 Consumer
类从队列中移除它们。
import multiprocessing
import random
import time
class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
for i in range(10):
item = random.randint(0, 256)
self.queue.put(item)
print("Process Producer : item %d appended to queue %s" % (item, self.name))
time.sleep(1)
print("The size of queue is %s" % self.queue.qsize())
class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
if self.queue.empty():
print("the queue is empty")
break
else:
time.sleep(2)
item = self.queue.get()
print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
time.sleep(1)
if __name__ == '__main__':
queue = multiprocessing.Queue()
process_producer = Producer(queue)
process_consumer = Consumer(queue)
process_producer.start()
process_consumer.start()
process_producer.join()
process_consumer.join()
运行结果如下(译者注:macOS High Sierra运行失败,错误是 NotImplementedError
可能是因为 self._sem._semlock._get_value()
没有实现):
C:\Python CookBook\Chapter 3 - Process Based Parallelism\Example Codes
Chapter 3>python using_queue.py
Process Producer : item 69 appended to queue producer-1
The size of queue is 1
Process Producer : item 168 appended to queue producer-1
The size of queue is 2
Process Consumer : item 69 popped from by consumer-2
Process Producer : item 235 appended to queue producer-1
The size of queue is 2
Process Producer : item 152 appended to queue producer-1
The size of queue is 3
Process Producer : item 213 appended to queue producer-1
Process Consumer : item 168 popped from by consumer-2
The size of queue is 3
Process Producer : item 35 appended to queue producer-1
The size of queue is 4
Process Producer : item 218 appended to queue producer-1
The size of queue is 5
Process Producer : item 175 appended to queue producer-1
Process Consumer : item 235 popped from by consumer-2
The size of queue is 5
Process Producer : item 140 appended to queue producer-1
The size of queue is 6
Process Producer : item 241 appended to queue producer-1
The size of queue is 7
Process Consumer : item 152 popped from by consumer-2
Process Consumer : item 213 popped from by consumer-2
Process Consumer : item 35 popped from by consumer-2
Process Consumer : item 218 popped from by consumer-2
Process Consumer : item 175 popped from by consumer-2
Process Consumer : item 140 popped from by consumer-2
Process Consumer : item 241 popped from by consumer-2
the queue is empty
3. 如何做…
我们使用 multiprocessing
类在主程序中创建了 Queue
的实例:
if __name__ == '__main__':
queue = multiprocessing.Queue()
然后我们创建了两个进程,生产者和消费者, Queue
对象作为一个属性。
process_producer = Producer(queue)
process_consumer = Consumer(queue)
生产者类负责使用 put()
方法放入10个item:
for i in range(10):
item = random.randint(0, 256)
self.queue.put(item)
消费者进程负责使用 get()
方法从队列中移除item,并且确认队列是否为空,如果为空,就执行 break
跳出 while
循环:
def run(self):
while True:
if self.queue.empty():
print("the queue is empty")
break
else:
time.sleep(2)
item = self.queue.get()
print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
time.sleep(1)
4. 了解更多
队列还有一个 JoinaleQueue
子类,它有以下两个额外的方法:
task_done()
: 此方法意味着之前入队的一个任务已经完成,比如,get()
方法从队列取回item之后调用。所以此方法只能被队列的消费者调用。join()
: 此方法将进程阻塞,直到队列中的item全部被取出并执行。
Microndgt 注:因为使用队列进行通信是一个单向的,不确定的过程,所以你不知道什么时候队列的元素被取出来了,所以使用task_done来表示队列里的一个任务已经完成。
这个方法一般和join一起使用,当队列的所有任务都处理之后,也就是说put到队列的每个任务都调用了task_done方法后,join才会完成阻塞。
5. 使用管道交换对象
第二种Communication Channel是管道。一个管道可以做以下事情:
- 返回一对被管道连接的连接对象
- 然后对象就有了 send/receive 方法可以在进程之间通信
6. 如何做…
下面是管道用法的一个简单示例。这里有一个进程管道从0到9发出数字,另一个进程接收数字并进行平方计算。
import multiprocessing
def create_items(pipe):
output_pipe, _ = pipe
for item in range(10):
output_pipe.send(item)
output_pipe.close()
def multiply_items(pipe_1, pipe_2):
close, input_pipe = pipe_1
close.close()
output_pipe, _ = pipe_2
try:
while True:
item = input_pipe.recv()
output_pipe.send(item * item)
except EOFError:
output_pipe.close()
if __name__== '__main__':
# 第一个进程管道发出数字
pipe_1 = multiprocessing.Pipe(True)
process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
process_pipe_1.start()
# 第二个进程管道接收数字并计算
pipe_2 = multiprocessing.Pipe(True)
process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
process_pipe_2.start()
pipe_1[0].close()
pipe_2[0].close()
try:
while True:
print(pipe_2[1].recv())
except EOFError:
print("End")
程序的输出如下:
7. 讨论
Pipe()
函数返回一对通过双向管道连接起来的对象。在本例中, out_pipe
包含数字0-9,通过目标函数 create_items()
产生:
def create_items(pipe):
output_pipe, _ = pipe
for item in range(10):
output_pipe.send(item)
output_pipe.close()
在第二个进程中,我们有两个管道,输入管道和包含结果的输出管道:
process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
最后打印出结果:
try:
while True:
print(pipe_2[1].recv())
except EOFError:
print("End")