使用条件进行线程同步

条件指的是应用程序状态的改变。这是另一种同步机制,其中某些线程在等待某一条件发生,其他的线程会在该条件发生的时候进行通知。一旦条件发生,线程会拿到共享资源的唯一权限。

1. 准备工作

解释条件机制最好的例子还是生产者-消费者问题。在本例中,只要缓存不满,生产者一直向缓存生产;只要缓存不空,消费者一直从缓存取出(之后销毁)。当缓冲队列不为空的时候,生产者将通知消费者;当缓冲队列不满的时候,消费者将通知生产者。

2. 如何做…

为了演示条件机制,我们将再一次使用生产者-消费者的例子:

from threading import Thread, Condition
import time
items = []
condition = Condition()
class consumer(Thread):
    def __init__(self):
        Thread.__init__(self)
    def consume(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 0:
            condition.wait()
            print("Consumer notify : no item to consume")
        items.pop()
        print("Consumer notify : consumed 1 item")
        print("Consumer notify : items to consume are " + str(len(items)))
        condition.notify()
        condition.release()
    def run(self):
        for i in range(0, 20):
            time.sleep(2)
            self.consume()
class producer(Thread):
    def __init__(self):
        Thread.__init__(self)
    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 10:
            condition.wait()
            print("Producer notify : items producted are " + str(len(items)))
            print("Producer notify : stop the production!!")
        items.append(1)
        print("Producer notify : total items producted " + str(len(items)))
        condition.notify()
        condition.release()
    def run(self):
        for i in range(0, 20):
            time.sleep(1)
            self.produce()
if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

运行的结果如下:

3. 讨论

译者在这里添加一段。乍一看这段代码好像会死锁,因为 condition.acquire() 之后就在 .wait() 了,好像会一直持有锁。其实 .wait() 会将锁释放,然后等待其他线程 .notify() 之后会重新尝试获得锁。但是要注意 .notify() 并不会自动释放锁,所以代码中有两行,先 .notify() 然后再 .release()

译者画了一张图,方便大家理解。这里的过程应该是这样子的(注意 wait() 里面实际有一个释放锁重新获得锁的过程):

译者的私货完毕,建议看一下官方文档: https://docs.python.org/3/library/threading.html )

消费者通过拿到锁来修改共享的资源 items[]

condition.acquire()

如果list的长度为0,那么消费者就进入等待状态:

if len(items) == 0:
    condition.wait()

否则就通过 pop 操作消费一个item:

items.pop()

然后,消费者的状态被通知给生产者,同时共享资源释放:

condition.notify()
condition.release()

生产者拿到共享资源,然后确认缓冲队列是否已满(在我们的这个例子中,最大可以存放10个item),如果已经满了,那么生产者进入等待状态,直到被唤醒:

condition.acquire()
if len(items) == 10:
    condition.wait()

如果队列没有满,就生产1个item,通知状态并释放资源:

condition.notify()
condition.release()

4. 了解更多

Python对条件同步的实现很有趣。如果没有已经存在的锁传给构造器的话,内部的 _Condition 会创建一个 RLock() 对象。同时,这个RLock也会通过 acquire()release() 管理:

class _Condition(_Verbose):
    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
           lock = RLock()
        self.__lock = lock

(以下又是笔者的私货,最近看到一道面试题是这样的,开3个线程按照顺序打印ABC 10次。正好是 Condition 的使用场景。我把我写的代码贴在这里供大家参考。

# -*- coding: utf-8 -*-
"""
Three threads print A B C in order.
"""
from threading import Thread, Condition
condition = Condition()
current = "A"
class ThreadA(Thread):
    def run(self):
        global current
        for _ in range(10):
            with condition:
                while current != "A":
                    condition.wait()
                print("A")
                current = "B"
                condition.notify_all()
class ThreadB(Thread):
    def run(self):
        global current
        for _ in range(10):
            with condition:
                while current != "B":
                    condition.wait()
                print("B")
                current = "C"
                condition.notify_all()
class ThreadC(Thread):
    def run(self):
        global current
        for _ in range(10):
            with condition:
                while current != "C":
                    condition.wait()
                print("C")
                current = "A"
                condition.notify_all()
a = ThreadA()
b = ThreadB()
c = ThreadC()
a.start()
b.start()
c.start()
a.join()
b.join()
c.join()

原理很简单,就是线程拿到锁先检查是不是自己渴望的状态。比如打印“B”的线程,渴望的状态 current = 'B' 然后打印出B,将状态改成 C ,这样就成了打印“C”的线程渴望的状态。

但是这里不能唤醒指定的线程,只好唤醒所有的线程,让他们自己再检查一遍状态了。)