Skip to the content.

Synchronization Primitives - Condition Variable

typical use case: producer-consumer situation with unlimited buffer capacity:

Solution

For Processes

import multiprocessing


def consumer(cond: multiprocessing.Condition, q: multiprocessing.Queue):
    with cond:
        # is equivalent to:
        #
        # def an_item_is_available(): return not q.empty()
        # if cond.wait_for(an_item_is_available):
        while q.empty():
            cond.wait()
            item = q.get()


def producer(cond: multiprocessing.Condition, q: multiprocessing.Queue):
    with cond:
        q.put('an item')  # produce_an_item
        cond.notify(2)  # default 1, notify_all() for all


if __name__ == '__main__':
    cond = multiprocessing.Condition()
    q = multiprocessing.Queue()

    c1 = multiprocessing.Process(target=consumer, name='c1', args=(cond, q))
    c2 = multiprocessing.Process(target=consumer, name='c2', args=(cond, q))
    p = multiprocessing.Process(target=producer, name='producer', args=(cond, q))

    c1.start()
    c2.start()
    p.start()

    c1.join()
    c2.join()
    p.join()

For Threads

import queue
import threading

def consumer(cond: threading.Condition, q: queue.SimpleQueue):
    with cond:
        # is equivalent to:
        #
        # def an_item_is_available(): return not q.empty()
        # if cond.wait_for(an_item_is_available):
        while q.empty():
            cond.wait()
            item = q.get()


def producer(cond: threading.Condition, q: queue.SimpleQueue):
    with cond:
        q.put('an item')  # produce_an_item
        cond.notify(2)  # default 1, notify_all() for all


cond = threading.Condition()
q = queue.SimpleQueue()

c1 = threading.Thread(target=consumer, name='c1', args=(cond, q))
c2 = threading.Thread(target=consumer, name='c2', args=(cond, q))
p = threading.Thread(target=producer, name='producer', args=(cond, q))

c1.start()
c2.start()
p.start()

References