Skip to the content.

Synchronization Primitives - Event

Solution

For Processes

import logging
import multiprocessing
from multiprocessing.synchronize import Event
import time


def worker_1(event: Event, logger: logging.Logger):
    """wait for worker_2."""
    logger.debug('wait for event')
    event.wait()
    assert event.is_set()
    logger.debug('event is set')


def worker_2(event: Event, logger: logging.Logger):
    time.sleep(1.0)
    logger.debug('set event')
    event.set()


def worker_timeout(event: Event, logger: logging.Logger):
    """wait with timeout."""
    timeout: float = 1.5
    logger.debug(f'wait for event, timeout: {timeout}')
    event.wait(timeout)
    if event.is_set():
        logger.debug('run')
    else:
        logger.debug(f'timeout: {timeout}')


def worker_wait_main(event: Event, logger: logging.Logger):
    """wait for event set by the main process."""
    logger.debug('wait for event')
    event.wait()
    assert event.is_set()
    logger.debug('event is set')


if __name__ == '__main__':
    logger = multiprocessing.log_to_stderr(logging.DEBUG)

    e1 = multiprocessing.Event()
    p1 = multiprocessing.Process(target=worker_1,
                                 name='worker_1',
                                 args=(e1, logger))
    p2 = multiprocessing.Process(target=worker_2,
                                 name='worker_2',
                                 args=(e1, logger))
    p1.start()
    p2.start()

    # event timeout
    e2 = multiprocessing.Event()
    p_timeout = multiprocessing.Process(target=worker_timeout,
                                        name='worker_timeout',
                                        args=(e2, logger))
    p_timeout.start()

    # wait for main process
    e3 = multiprocessing.Event()
    p_main = multiprocessing.Process(target=worker_wait_main,
                                     name='worker_wait_main',
                                     args=(e3, logger))
    p_main.start()
    time.sleep(2.0)
    e3.set()

    # enumerate active child processes
    for p in multiprocessing.active_children():
        p.join()

For Threads

import logging
import threading
import time


logging.basicConfig(
    level=logging.DEBUG,
    style='{',
    format='[{asctime}] [{threadName:<24}] {message}'
)


def worker_1(event: threading.Event):
    """wait for worker_2."""
    logging.debug('wait for event')
    event.wait()
    assert event.is_set()
    logging.debug('event is set')


def worker_2(event: threading.Event):
    time.sleep(1.0)
    logging.debug('set event')
    event.set()


def worker_timeout(event: threading.Event):
    """wait with timeout."""
    timeout: float = 1.5
    logging.debug(f'wait for event, timeout: {timeout}')
    event.wait(timeout)
    if event.is_set():
        logging.debug('run')
    else:
        logging.debug(f'timeout: {timeout}')


def worker_wait_mainthread(event: threading.Event):
    """wait for event set by the main thread."""
    logging.debug('wait for event')
    event.wait()
    assert event.is_set()
    logging.debug('event is set')


e1 = threading.Event()
t1 = threading.Thread(target=worker_1, name='worker_1', args=(e1,))
t2 = threading.Thread(target=worker_2, name='worker_2', args=(e1,))
t1.start()
t2.start()

e2 = threading.Event()
t_timeout = threading.Thread(target=worker_timeout,
                             name='worker_timeout',
                             args=(e2,))
t_timeout.start()

e3 = threading.Event()
t_main = threading.Thread(target=worker_wait_mainthread,
                          name='worker_wait_mainthread',
                          args=(e3,))
t_main.start()
time.sleep(2.0)
e3.set()


# Wait until the threads terminate.
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()

References