Package ezq

Simple wrapper for python multiprocessing and threading.


Why?

ezq makes it easy to connect subprocesses and threads (both considered "workers") using queues with a simpler API than concurrent.futures, multiprocessing, or threading.

Install

python -m pip install ezq

Example: Quick Start

If you just want to apply a function to some inputs, you can use map() to run it on all available CPUs and get the results back.

import ezq
print(list(ezq.map(lambda x: x * 2, range(6))))
# => [0, 2, 4, 6, 8, 10]

Example: Sum Messages

Here's a simple example of a worker that reads from an input queue, sums up the messages, and puts the result on an output queue.

import ezq


def worker(q, out):
    """Add up all the messages."""
    total = 0
    for msg in q:  # read a message from the queue
        total += msg.data

    # after reading all the messages, write the total
    out.put(total)


def main():
    """Run several workers."""
    # Step 1: Create the queues and start the workers.
    q, out = ezq.Q(), ezq.Q()  # input & output queues
    workers = [ezq.run(worker, q, out) for _ in range(ezq.NUM_CPUS)]
    # workers are all running

    # Step 2: Send work to the workers.
    for i in range(1000):
        q.put(i)  # send work

    # Step 3: Tell the workers to finish.
    q.stop(workers)
    # workers are all stopped

    # Step 4: Process the results.
    want = sum(range(1000))
    have = sum(msg.data for msg in out.items())
    assert have == want
    print(have)


if __name__ == "__main__":
    main()

Typical worker lifecycle

Process vs Thread

ezq supports two kinds of workers: Process and Thread. There is a lot of existing discussion about when to use which approach, but a general rule of thumb is:

  • Process is for parallelism so you can use multiple CPUs at once. Ideal for CPU-bound tasks like doing lots of mathematical calculations.

  • Thread is for concurrency so you can use a single CPU to do multiple things. Ideal for I/O-bound tasks like waiting for a disk, database, or network.

Some more differences:

Create queues

In the main process, create the queues you'll need. Here are my common situations:

  • 0 queues: I'm using a simple function and can ask map() to make the queues for me.

  • 1 queue: the worker reads from an input queue and persists the result somewhere else (e.g., writing to disk, making a network call, running some other program).

  • 2 queues (most common): the worker reads from an input queue and writes the results to an output queue.

  • 3 queues: multiple stages of work are happening where workers are reading from one queue and writing to another queue for another worker to process.

NOTE: If you're using Thread workers, you can save some overhead by passing Q("thread"). This lightweight queue also doesn't use pickle, so you can use it to pass hard-to-pickle things (e.g., database connection).

q, out = ezq.Q(), ezq.Q() # most common
q2 = ezq.Q("thread") # only ok for Thread workers
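To make the three-queue case concrete, here is a minimal sketch of a two-stage pipeline. It deliberately uses only the standard library (`queue.Queue` and `threading.Thread`) rather than ezq, so the names `END`, `stage`, and `run_pipeline` are hypothetical stand-ins and not part of the ezq API.

```python
import queue
import threading

END = object()  # hypothetical end-of-queue marker (ezq uses a Msg with kind "END")


def stage(func, inbox, outbox):
    """Apply func to every item from inbox and forward the result to outbox."""
    while True:
        item = inbox.get()
        if item is END:
            outbox.put(END)  # pass the end marker on to the next stage
            break
        outbox.put(func(item))


def run_pipeline(values):
    """Run values through two single-worker stages connected by three queues."""
    q1, q2, q3 = queue.Queue(), queue.Queue(), queue.Queue()
    workers = [
        threading.Thread(target=stage, args=(lambda x: x + 1, q1, q2)),
        threading.Thread(target=stage, args=(lambda x: x * 10, q2, q3)),
    ]
    for w in workers:
        w.start()
    for v in values:
        q1.put(v)
    q1.put(END)
    for w in workers:
        w.join()

    # Drain the final queue up to the end marker.
    results = []
    while True:
        item = q3.get()
        if item is END:
            break
        results.append(item)
    return results


print(run_pipeline([1, 2, 3]))  # [20, 30, 40]
```

With one worker per stage the results come out in input order. With several workers per stage they would not, which is where a message order becomes useful.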

A worker task is just a function

In general, there's nothing special about a worker function, but note:

  • If you're using Process workers, all arguments are passed through pickle first.

  • We don't currently do anything with the return value of this function (unless you use map()). You'll need an output queue to return data back to the main process/thread.

Create workers

In the main process, create workers using run() or run_thread() which take a function and any additional parameters. Typically, you'll pass the queues you created to the workers at this point.

NOTE: Process and Thread workers can create additional Thread workers, but they cannot create additional Process workers.

Send data

Once you've created the workers, you send them data with Q.put() which creates Msg objects and puts them in the queue. Each message has three attributes (all optional):

  • data: Any - This is the data you want the worker to work on.

  • kind: str - You can use this to send multiple kinds of work to the same worker. Note that the special END kind is used to indicate the end of a queue.

  • order: int - This is the message order which can help you reorder results or ensure that messages from a queue are read in a particular order (that's what Q.sorted() uses).
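As a rough illustration of how the three attributes work together, the sketch below defines a stand-in `Msg` dataclass with the same fields and dispatches on `kind`. It uses a plain `queue.Queue` instead of `ezq.Q`, and the `handle` function and the `"DOUBLE"` / `"NEGATE"` kinds are made up for the example.

```python
from dataclasses import dataclass
from queue import Queue
from typing import Any


@dataclass
class Msg:
    """Stand-in with the same three optional attributes described above."""

    data: Any = None
    kind: str = ""
    order: int = 0


def handle(msg):
    """Dispatch on msg.kind so one worker can do several kinds of work."""
    if msg.kind == "DOUBLE":
        return msg.data * 2
    if msg.kind == "NEGATE":
        return -msg.data
    return msg.data  # empty or unknown kind: pass the data through


inbox = Queue()
inbox.put(Msg(data=21, kind="DOUBLE", order=0))
inbox.put(Msg(data=5, kind="NEGATE", order=1))
inbox.put(Msg(data="hi"))  # kind and order fall back to their defaults

results = [handle(inbox.get()) for _ in range(3)]
print(results)  # [42, -5, 'hi']
```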

Beware pickle

If you are using Process workers, everything passed to the worker (arguments, messages) is first passed to pickle (actually, dill). Anything that cannot be pickled with dill (e.g., database connections), cannot be passed to Process workers. Note that dill can serialize many more types than pickle (e.g. lambda functions).
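If you're unsure whether something will survive the trip to a Process worker, a quick check before sending it can save some debugging. The helper below is a hypothetical sketch that uses the standard-library `pickle`, not `dill`, so it is conservative: anything it accepts should be fine, but it may reject things (like lambdas) that dill would accept.

```python
import pickle
import threading


def can_pickle(obj):
    """Return True if the standard-library pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


print(can_pickle({"rows": [1, 2, 3]}))  # True: plain data is fine
print(can_pickle(threading.Lock()))     # False: locks cannot be pickled
```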

Beware shared state

If you are using Thread workers, workers can share certain variables, so you need to be careful about how variables are accessed to avoid accidentally corrupting data.
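One common way to protect shared state is a lock around every read-modify-write. This is a generic standard-library sketch, not something ezq provides. The `Counter` class and `count_with_threads` function are illustrative names.

```python
import threading


class Counter:
    """A counter that several threads can increment safely."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # The read-modify-write below is not atomic, so guard it with a lock.
        with self._lock:
            self.value += 1


def count_with_threads(num_threads=4, per_thread=10_000):
    """Increment one shared Counter from several threads and return the total."""
    counter = Counter()

    def work():
        for _ in range(per_thread):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


print(count_with_threads())  # 40000
```

Often the simpler option is to avoid sharing altogether: have each worker keep its own local state and send the result through an output queue.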

Iterate over messages

Inside the worker, iterate over the queue to read each message until the queue ends (see below). If the messages need to be processed in order, use Q.sorted().

for msg in q: # read each message until the queue ends
  ...

for msg in q.sorted(): # read each message in order
  ...
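To show what "in order" means here, the sketch below reorders `(order, data)` pairs that arrive out of sequence. It is not ezq's implementation (it uses `heapq` and plain tuples instead of `Msg` objects, and `in_order` is a hypothetical name), but it follows the same rule: an item is held back until every earlier order number has been seen, and anything still waiting is flushed when the input ends.

```python
import heapq


def in_order(pairs, start=0):
    """Yield (order, data) pairs by increasing order, buffering early arrivals."""
    expected = start
    waiting = []  # min-heap of pairs that arrived before their turn
    for pair in pairs:
        heapq.heappush(waiting, pair)
        # Release every buffered pair whose turn has come.
        while waiting and waiting[0][0] == expected:
            yield heapq.heappop(waiting)
            expected += 1
    # Input ended: flush anything still held back by a gap.
    while waiting:
        yield heapq.heappop(waiting)


print(list(in_order([(2, "c"), (0, "a"), (1, "b")])))
# [(0, 'a'), (1, 'b'), (2, 'c')]
```

Note the consequence of a gap: if order 1 never arrives, everything after it waits in memory until the end of the input.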

End the queue

After the main process has sent all the data to the workers, it needs to indicate that there's no additional work to be done. This is done by calling Q.stop() using the input queue that the workers are reading from and passing the list of workers to wait for.

In some rare situations, you can use Q.end() to explicitly end the queue.
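The reason the list of workers matters is that each worker consumes exactly one end marker before it exits, so the queue needs one marker per worker. Here is a standard-library sketch of that pattern. The names `END`, `summing_worker`, `stop`, and `parallel_sum` are illustrative and are not ezq functions.

```python
import queue
import threading

END = object()  # hypothetical end marker; ezq's own marker is END_MSG


def summing_worker(inbox, outbox):
    """Add up items until the end marker arrives, then report the subtotal."""
    total = 0
    while True:
        item = inbox.get()
        if item is END:
            break
        total += item
    outbox.put(total)


def stop(inbox, workers):
    """Send one end marker per worker, then wait for every worker to finish."""
    for _ in workers:
        inbox.put(END)
    for w in workers:
        w.join()


def parallel_sum(values, num_workers=3):
    """Sum values using several workers that share one input queue."""
    inbox, outbox = queue.Queue(), queue.Queue()
    workers = [
        threading.Thread(target=summing_worker, args=(inbox, outbox))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()
    for v in values:
        inbox.put(v)
    stop(inbox, workers)
    # Every worker has written exactly one subtotal by now.
    return sum(outbox.get() for _ in workers)


print(parallel_sum(range(1000)))  # 499500
```

Sending fewer markers than there are workers would leave some workers waiting forever, and the join would never return.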

Process results

If you have an output queue, you may want to process the results. You can use Q.items() to end the queue and read the current messages.

import ezq
out = ezq.Q()
...
result = [msg.data for msg in out.items()]
# OR
result = [msg.data for msg in out.items(sort=True)] # sorted by Msg.order
# OR
result = [msg.data for msg in out.items(cache=True)] # cache the messages

Example: Read and Write Queues

In this example, several workers read from a queue, process the data, and write to a second queue. A single worker reads that second queue and prints the results to the screen, sorting them as it goes along.

Note that we use a single writer to avoid clashes or overwriting.

import ezq


def printer(out: ezq.Q) -> None:
    """Print results in increasing order."""
    for msg in out.sorted():
        print(msg.data)


def collatz(q: ezq.Q, out: ezq.Q) -> None:
    """Read numbers and compute values."""
    for msg in q:
        num = float(msg.data)
        if msg.kind == "EVEN":
            out.put((num, num / 2), order=msg.order)
        elif msg.kind == "ODD":
            out.put((num, 3 * num + 1), order=msg.order)


def main() -> None:
    """Run several threads with a subprocess for printing."""
    q, out = ezq.Q("thread"), ezq.Q()
    readers = [ezq.run_thread(collatz, q, out) for _ in range(ezq.NUM_THREADS)]
    writer = ezq.run(printer, out)

    for num in range(40):
        kind = "EVEN" if num % 2 == 0 else "ODD"
        q.put(num, kind=kind, order=num)

    q.stop(readers)
    out.stop(writer)


if __name__ == "__main__":
    main()

License

MIT License

"""Simple wrapper for python `multiprocessing` and `threading`.

.. include:: ../../README.md
   :start-line: 2
"""

# native
from dataclasses import dataclass
from operator import attrgetter
from os import cpu_count
from platform import system
from queue import Empty
from queue import Queue as ThreadSafeQueue
from threading import Thread
from typing import Any
from typing import Callable
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Literal
from typing import Optional
from typing import Sequence
from typing import TYPE_CHECKING
from typing import Union

# lib
from multiprocess import Process  # type: ignore
from multiprocess import Queue

__all__ = (
    "__version__",
    "Task",
    "Context",
    "ContextName",
    "Msg",
    "END_MSG",
    "MsgQ",
    "NUM_CPUS",
    "NUM_THREADS",
    "IS_MACOS",
    "Worker",
    "Q",
    "run",
    "run_thread",
    "map",
)

__version__ = "3.0.4"

Task = Callable[..., Any]
"""Task function signature (any `Callable`)."""

Context = Union[Process, Thread]
"""Execution contexts (`Process`, `Thread`)."""

ContextName = Literal["process", "thread"]
"""Execution context names (`"process"`, `"thread"`)."""


@dataclass
class Msg:
    """Message for a queue."""

    data: Any = None
    """Message data to be transmitted."""

    kind: str = ""
    """Optional marker of message type."""

    order: int = 0
    """Optional ordering of messages."""


# NOTE: The python `queue.Queue` is not properly a generic.
# See: https://stackoverflow.com/a/48554601
# TODO [2024-10-14]: @ py3.8 EOL remove this conditional
if TYPE_CHECKING:  # pragma: no cover
    MsgQ = Union[Queue[Msg], ThreadSafeQueue]  # pylint: disable=unsubscriptable-object
else:
    MsgQ = Queue

END_MSG: Msg = Msg(kind="END")
"""Message that indicates no future messages will be sent."""

## Hardware-Specific Information ##

NUM_CPUS: int = cpu_count() or 1
"""Number of CPUs on this machine."""

NUM_THREADS: int = min(32, NUM_CPUS + 4)
"""Default number of threads (up to 32).

See: [CPython's default for this value][1].

[1]: https://github.com/python/cpython/blob/a635d6386041a2971cf1d39837188ffb8139bcc7/Lib/concurrent/futures/thread.py#L142
"""

IS_MACOS: bool = system().lower().startswith("darwin")
"""`True` if we're running on MacOS.

Currently, we only use this value for testing, but there are certain features that
do not work properly on MacOS.

See: [Example of MacOS-specific issues][1].

[1]: https://github.com/python/cpython/blob/c5b670efd1e6dabc94b6308734d63f762480b80f/Lib/multiprocessing/queues.py#L125
"""


class Worker:
    """A function running in a `Process` or `Thread`."""

    _worker: Context
    """Execution context."""

    @staticmethod
    def process(task: Task, *args: Any, **kwargs: Any) -> "Worker":
        """Create a `Process`-based `Worker`.

        Args:
            task (Task): function to run
            *args (Any): additional arguments to `task`
            **kwargs (Any): additional keyword arguments to `task`

        Returns:
            Worker: wrapped worker.
        """
        # NOTE: On MacOS, python 3.8 switched the default method
        # from "fork" to "spawn" because fork is considered dangerous.
        # Some posts say "forkserver" should be ok.
        # See:  https://bugs.python.org/issue?@action=redirect&bpo=33725
        #
        # if IS_MACOS:
        #     ctx = get_context("forkserver")
        # else:
        #     ctx = get_context()
        return Worker(Process(daemon=True, target=task, args=args, kwargs=kwargs))

    @staticmethod
    def thread(task: Task, *args: Any, **kwargs: Any) -> "Worker":
        """Create a `Thread`-based `Worker`.

        Args:
            task (Task): function to run
            *args (Any): additional arguments to `task`
            **kwargs (Any): additional keyword arguments to `task`

        Returns:
            Worker: wrapped worker.
        """
        return Worker(Thread(daemon=False, target=task, args=args, kwargs=kwargs))

    def __init__(self, context: Context):
        """Construct a worker from a context.

        Args:
            context (Context): a `Process` or a `Thread`
        """
        self._worker = context
        self._worker.start()

    def __getattr__(self, name: str) -> Any:
        """Delegate properties to the underlying task.

        Args:
            name (str): attribute name

        Returns:
            Any: attribute from the task
        """
        return getattr(self._worker, name)


class Q:
    """Simple message queue."""

    _q: MsgQ
    """Wrapped queue."""

    _cache: Optional[List[Msg]] = None
    """Cache of queue messages when calling `.items(cache=True)`."""

    _timeout: float = 0.05
    """Time in seconds to poll the queue."""

    def __init__(self, kind: ContextName = "process"):
        """Construct a queue wrapper.

        Args:
            kind (ContextName, optional): If `"thread"`, construct a lighter-weight
                `Queue` that is thread-safe. Otherwise, construct a full
                `multiprocess.Queue`. Defaults to `"process"`.
        """
        if kind == "process":
            self._q = Queue()
        elif kind == "thread":
            self._q = ThreadSafeQueue()
        else:  # pragma: no cover
            raise ValueError(f"Unknown queue type: {kind}")

    def __getattr__(self, name: str) -> Any:
        """Delegate properties to the underlying queue.

        Args:
            name (str): name of the attribute to access

        Returns:
            Any: attribute from the queue
        """
        return getattr(self._q, name)

    def __iter__(self) -> Iterator[Msg]:
        """Iterate over messages in a queue until `END_MSG` is received.

        Yields:
            Iterator[Msg]: iterate over messages in the queue
        """
        while True:
            try:
                msg = self._q.get(block=True, timeout=self._timeout)
                if msg.kind == END_MSG.kind:
                    # We'd really like to put the `END_MSG` back in the queue
                    # to prevent reading past the end, but in practice
                    # this often creates an uncatchable `BrokenPipeError`.
                    # q.put(END_MSG)
                    break
                yield msg
            except Empty:  # pragma: no cover
                # queue might not actually be empty
                # see: https://bugs.python.org/issue20147
                continue

    def items(self, cache: bool = False, sort: bool = False) -> Iterator[Msg]:
        """End a queue and read all the current messages.

        Args:
            cache (bool, optional): if `True`, cache the messages. This allows you
                to call this method multiple times to get the same messages.
                Defaults to `False`.

            sort (bool, optional): if `True` messages are sorted by `Msg.order`.
                Defaults to `False`.

        Yields:
            Iterator[Msg]: iterate over messages in the queue
        """
        if cache:
            if self._cache is None:  # need to build a cache
                self.end()
                self._cache = list(self.sorted() if sort else self)
            return iter(self._cache)

        # not cached
        self.end()
        return self.sorted() if sort else iter(self)

    def sorted(self, start: int = 0) -> Iterator[Msg]:
        """Iterate over messages sorted by `Msg.order`.

        NOTE: `Msg.order` must be incremented by one for each message.
        If there are any gaps, messages after the gap won't be yielded
        until the end.

        Args:
            start (int, optional): initial message number. Defaults to `0`.

        Yields:
            Iterator[Msg]: message yielded in the correct order
        """
        prev = start - 1
        key = attrgetter("order")
        waiting: List[Msg] = []
        for item in self:
            if not waiting and key(item) == prev + 1:
                prev += 1
                yield item
                continue

            # items came out of order
            waiting.append(item)
            waiting.sort(key=key, reverse=True)  # sort in-place for performance
            while waiting and key(waiting[-1]) == prev + 1:
                prev += 1
                yield waiting.pop()

        # generator ended; yield any waiting items
        while waiting:
            yield waiting.pop()

    def put(self, data: Any = None, *, kind: str = "", order: int = 0) -> "Q":
        """Put a message on the queue.

        Args:
            data (Any, optional): message data. Defaults to `None`.

            kind (str, optional): kind of message. Defaults to `""`.

            order (int, optional): message order. Defaults to `0`.

        Returns:
            Self: self for chaining
        """
        if isinstance(data, Msg):
            self._q.put(data)
        else:
            self._q.put(Msg(data=data, kind=kind, order=order))
        return self

    def end(self) -> "Q":
        """Add the `END_MSG` to indicate the end of work.

        Returns:
            Self: self for chaining
        """
        self._q.put(END_MSG)
        return self

    def stop(self, workers: Union[Worker, Sequence[Worker]]) -> "Q":
        """Use this queue to notify workers to end and wait for them to join.

        Args:
            workers (Worker, Sequence[Worker]): workers to wait for

        Returns:
            Self: self for chaining
        """
        _workers = [workers] if isinstance(workers, Worker) else workers

        for _ in range(len(_workers)):
            self.end()

        for task in _workers:
            task.join()

        return self


def run(task: Task, *args: Any, **kwargs: Any) -> Worker:
    """Run a function as a subprocess.

    Args:
        task (Task): function to run in each subprocess

        *args (Any): additional positional arguments to `task`.

        **kwargs (Any): additional keyword arguments to `task`.

    Returns:
        Worker: worker started in a subprocess

    .. changed:: 2.0.4
       This function now returns a `Worker` instead of a `Process`.
    """
    return Worker.process(task, *args, **kwargs)


def run_thread(task: Task, *args: Any, **kwargs: Any) -> Worker:
    """Run a function as a thread.

    Args:
        task (Task): function to run in each thread

        *args (Any): additional positional arguments to `task`.

        **kwargs (Any): additional keyword arguments to `task`.

    Returns:
        Worker: worker started in a thread

    .. changed:: 2.0.4
       This function now returns a `Worker` instead of a `Thread`.
    """
    return Worker.thread(task, *args, **kwargs)


def map(
    task: Task,
    *args: Iterable[Any],
    num: Optional[int] = None,
    kind: ContextName = "process",
) -> Iterator[Any]:
    """Call a function with arguments using multiple workers.

    Args:
        task (Task): function to call

        *args (list[Any]): arguments to `task`. If multiple lists are provided,
            they will be passed to `zip` first.

        num (int, optional): number of workers. If `None`, `NUM_CPUS` or
            `NUM_THREADS` will be used as appropriate. Defaults to `None`.

        kind (ContextName, optional): execution context to use.
            Defaults to `"process"`.

    Yields:
        Any: results from applying the function to the arguments
    """
    q, out = Q(kind=kind), Q(kind=kind)

    def worker(_q: Q, _out: Q) -> None:
        """Internal call to `func`."""
        for msg in _q.sorted():
            _out.put(data=task(*msg.data), order=msg.order)

    if kind == "process":
        workers = [Worker.process(worker, q, out) for _ in range(num or NUM_CPUS)]
    elif kind == "thread":
        workers = [Worker.thread(worker, q, out) for _ in range(num or NUM_THREADS)]
    else:  # pragma: no cover
        raise ValueError(f"Unknown worker context: {kind}")

    for order, value in enumerate(zip(*args)):
        q.put(value, order=order)
    q.stop(workers)

    for msg in out.end().sorted():
        yield msg.data

Global variables

var Task

Task function signature (any Callable).

var Context

Execution contexts (Process, Thread).

var ContextName

Execution context names ("process", "thread").

var END_MSG : Msg

Message that indicates no future messages will be sent.

var NUM_CPUS : int

Number of CPUs on this machine.

var NUM_THREADS : int

Default number of threads (up to 32).

See: CPython's default for this value.

var IS_MACOS : bool

True if we're running on MacOS.

Currently, we only use this value for testing, but there are certain features that do not work properly on MacOS.

See: Example of MacOS-specific issues.

Functions

def MsgQ(maxsize=0)

Returns a queue object

def Queue(self, maxsize=0):
    '''Returns a queue object'''
    from .queues import Queue
    return Queue(maxsize, ctx=self.get_context())
def run(task: Callable[..., Any], *args: Any, **kwargs: Any) ‑> Worker

Run a function as a subprocess.

Args

task : Task
function to run in each subprocess
*args : Any
additional positional arguments to task.
**kwargs : Any
additional keyword arguments to task.

Returns

Worker
worker started in a subprocess

Changed: 2.0.4

This function now returns a Worker instead of a Process.

def run(task: Task, *args: Any, **kwargs: Any) -> Worker:
    """Run a function as a subprocess.

    Args:
        task (Task): function to run in each subprocess

        *args (Any): additional positional arguments to `task`.

        **kwargs (Any): additional keyword arguments to `task`.

    Returns:
        Worker: worker started in a subprocess

    .. changed:: 2.0.4
       This function now returns a `Worker` instead of a `Process`.
    """
    return Worker.process(task, *args, **kwargs)
def run_thread(task: Callable[..., Any], *args: Any, **kwargs: Any) ‑> Worker

Run a function as a thread.

Args

task : Task
function to run in each thread
*args : Any
additional positional arguments to task.
**kwargs : Any
additional keyword arguments to task.

Returns

Worker
worker started in a thread

Changed: 2.0.4

This function now returns a Worker instead of a Thread.

def run_thread(task: Task, *args: Any, **kwargs: Any) -> Worker:
    """Run a function as a thread.

    Args:
        task (Task): function to run in each thread

        *args (Any): additional positional arguments to `task`.

        **kwargs (Any): additional keyword arguments to `task`.

    Returns:
        Worker: worker started in a thread

    .. changed:: 2.0.4
       This function now returns a `Worker` instead of a `Thread`.
    """
    return Worker.thread(task, *args, **kwargs)
def map(task: Callable[..., Any], *args: Iterable[Any], num: Optional[int] = None, kind: Literal['process', 'thread'] = 'process') ‑> Iterator[Any]

Call a function with arguments using multiple workers.

Args

task : Task
function to call
*args : list[Any]
arguments to task. If multiple lists are provided, they will be passed to zip first.
num : int, optional
number of workers. If None, NUM_CPUS or NUM_THREADS will be used as appropriate. Defaults to None.
kind : ContextName, optional
execution context to use. Defaults to "process".

Yields

Any
results from applying the function to the arguments
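The way the arguments are combined can be illustrated with a single-threaded stand-in. The `serial_map` function below is hypothetical and does no parallel work. It only shows that the inputs are zipped together and each resulting tuple is unpacked into the call.

```python
def serial_map(task, *args):
    """Single-threaded stand-in showing how arguments are zipped and unpacked."""
    for values in zip(*args):
        yield task(*values)


print(list(serial_map(lambda x: x * 2, range(6))))  # [0, 2, 4, 6, 8, 10]
print(list(serial_map(pow, [2, 3, 4], [2, 2, 2])))  # [4, 9, 16]
print(list(serial_map(max, [1, 5], [4, 2, 9])))     # [4, 5]
```

As the last line shows, `zip` stops at the shortest input, so extra values in longer inputs are ignored.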
def map(
    task: Task,
    *args: Iterable[Any],
    num: Optional[int] = None,
    kind: ContextName = "process",
) -> Iterator[Any]:
    """Call a function with arguments using multiple workers.

    Args:
        task (Task): function to call

        *args (list[Any]): arguments to `task`. If multiple lists are provided,
            they will be passed to `zip` first.

        num (int, optional): number of workers. If `None`, `NUM_CPUS` or
            `NUM_THREADS` will be used as appropriate. Defaults to `None`.

        kind (ContextName, optional): execution context to use.
            Defaults to `"process"`.

    Yields:
        Any: results from applying the function to the arguments
    """
    q, out = Q(kind=kind), Q(kind=kind)

    def worker(_q: Q, _out: Q) -> None:
        """Internal call to `func`."""
        for msg in _q.sorted():
            _out.put(data=task(*msg.data), order=msg.order)

    if kind == "process":
        workers = [Worker.process(worker, q, out) for _ in range(num or NUM_CPUS)]
    elif kind == "thread":
        workers = [Worker.thread(worker, q, out) for _ in range(num or NUM_THREADS)]
    else:  # pragma: no cover
        raise ValueError(f"Unknown worker context: {kind}")

    for order, value in enumerate(zip(*args)):
        q.put(value, order=order)
    q.stop(workers)

    for msg in out.end().sorted():
        yield msg.data

Classes

class Msg (data: Any = None, kind: str = '', order: int = 0)

Message for a queue.

@dataclass
class Msg:
    """Message for a queue."""

    data: Any = None
    """Message data to be transmitted."""

    kind: str = ""
    """Optional marker of message type."""

    order: int = 0
    """Optional ordering of messages."""

Class variables

var data : Any

Message data to be transmitted.

var kind : str

Optional marker of message type.

var order : int

Optional ordering of messages.

class Worker (context: Union[multiprocess.context.Process, threading.Thread])

A function running in a Process or Thread.

Construct a worker from a context.

Args

context : Context
a Process or a Thread
class Worker:
    """A function running in a `Process` or `Thread`."""

    _worker: Context
    """Execution context."""

    @staticmethod
    def process(task: Task, *args: Any, **kwargs: Any) -> "Worker":
        """Create a `Process`-based `Worker`.

        Args:
            task (Task): function to run
            *args (Any): additional arguments to `task`
            **kwargs (Any): additional keyword arguments to `task`

        Returns:
            Worker: wrapped worker.
        """
        # NOTE: On MacOS, python 3.8 switched the default method
        # from "fork" to "spawn" because fork is considered dangerous.
        # Some posts say "forkserver" should be ok.
        # See:  https://bugs.python.org/issue?@action=redirect&bpo=33725
        #
        # if IS_MACOS:
        #     ctx = get_context("forkserver")
        # else:
        #     ctx = get_context()
        return Worker(Process(daemon=True, target=task, args=args, kwargs=kwargs))

    @staticmethod
    def thread(task: Task, *args: Any, **kwargs: Any) -> "Worker":
        """Create a `Thread`-based `Worker`.

        Args:
            task (Task): function to run
            *args (Any): additional arguments to `task`
            **kwargs (Any): additional keyword arguments to `task`

        Returns:
            Worker: wrapped worker.
        """
        return Worker(Thread(daemon=False, target=task, args=args, kwargs=kwargs))

    def __init__(self, context: Context):
        """Construct a worker from a context.

        Args:
            context (Context): a `Process` or a `Thread`
        """
        self._worker = context
        self._worker.start()

    def __getattr__(self, name: str) -> Any:
        """Delegate properties to the underlying task.

        Args:
            name (str): attribute name

        Returns:
            Any: attribute from the task
        """
        return getattr(self._worker, name)

Static methods

def process(task: Callable[..., Any], *args: Any, **kwargs: Any) ‑> Worker

Create a Process-based Worker.

Args

task : Task
function to run
*args : Any
additional arguments to task
**kwargs : Any
additional keyword arguments to task

Returns

Worker
wrapped worker.
@staticmethod
def process(task: Task, *args: Any, **kwargs: Any) -> "Worker":
    """Create a `Process`-based `Worker`.

    Args:
        task (Task): function to run
        *args (Any): additional arguments to `task`
        **kwargs (Any): additional keyword arguments to `task`

    Returns:
        Worker: wrapped worker.
    """
    # NOTE: On MacOS, python 3.8 switched the default method
    # from "fork" to "spawn" because fork is considered dangerous.
    # Some posts say "forkserver" should be ok.
    # See:  https://bugs.python.org/issue?@action=redirect&bpo=33725
    #
    # if IS_MACOS:
    #     ctx = get_context("forkserver")
    # else:
    #     ctx = get_context()
    return Worker(Process(daemon=True, target=task, args=args, kwargs=kwargs))
def thread(task: Callable[..., Any], *args: Any, **kwargs: Any) ‑> Worker

Create a Thread-based Worker.

Args

task : Task
function to run
*args : Any
additional arguments to task
**kwargs : Any
additional keyword arguments to task

Returns

Worker
wrapped worker.
@staticmethod
def thread(task: Task, *args: Any, **kwargs: Any) -> "Worker":
    """Create a `Thread`-based `Worker`.

    Args:
        task (Task): function to run
        *args (Any): additional arguments to `task`
        **kwargs (Any): additional keyword arguments to `task`

    Returns:
        Worker: wrapped worker.
    """
    return Worker(Thread(daemon=False, target=task, args=args, kwargs=kwargs))
class Q (kind: Literal['process', 'thread'] = 'process')

Simple message queue.

Construct a queue wrapper.

Args

kind : ContextName, optional
If "thread", construct a lighter-weight Queue that is thread-safe. Otherwise, construct a full multiprocess.Queue. Defaults to "process".
class Q:
    """Simple message queue."""

    _q: MsgQ
    """Wrapped queue."""

    _cache: Optional[List[Msg]] = None
    """Cache of queue messages when calling `.items(cache=True)`."""

    _timeout: float = 0.05
    """Time in seconds to poll the queue."""

    def __init__(self, kind: ContextName = "process"):
        """Construct a queue wrapper.

        Args:
            kind (ContextName, optional): If `"thread"`, construct a lighter-weight
                `Queue` that is thread-safe. Otherwise, construct a full
                `multiprocess.Queue`. Defaults to `"process"`.
        """
        if kind == "process":
            self._q = Queue()
        elif kind == "thread":
            self._q = ThreadSafeQueue()
        else:  # pragma: no cover
            raise ValueError(f"Unknown queue type: {kind}")

    def __getattr__(self, name: str) -> Any:
        """Delegate properties to the underlying queue.

        Args:
            name (str): name of the attribute to access

        Returns:
            Any: attribute from the queue
        """
        return getattr(self._q, name)

    def __iter__(self) -> Iterator[Msg]:
        """Iterate over messages in a queue until `END_MSG` is received.

        Yields:
            Iterator[Msg]: iterate over messages in the queue
        """
        while True:
            try:
                msg = self._q.get(block=True, timeout=self._timeout)
                if msg.kind == END_MSG.kind:
                    # We'd really like to put the `END_MSG` back in the queue
                    # to prevent reading past the end, but in practice
                    # this often creates an uncatchable `BrokenPipeError`.
                    # q.put(END_MSG)
                    break
                yield msg
            except Empty:  # pragma: no cover
                # queue might not actually be empty
                # see: https://bugs.python.org/issue20147
                continue

    def items(self, cache: bool = False, sort: bool = False) -> Iterator[Msg]:
        """End a queue and read all the current messages.

        Args:
            cache (bool, optional): if `True`, cache the messages. This allows you
                to call this method multiple times to get the same messages.
                Defaults to `False`.

            sort (bool, optional): if `True` messages are sorted by `Msg.order`.
                Defaults to `False`.

        Yields:
            Iterator[Msg]: iterate over messages in the queue
        """
        if cache:
            if self._cache is None:  # need to build a cache
                self.end()
                self._cache = list(self.sorted() if sort else self)
            return iter(self._cache)

        # not cached
        self.end()
        return self.sorted() if sort else iter(self)

    def sorted(self, start: int = 0) -> Iterator[Msg]:
        """Iterate over messages sorted by `Msg.order`.

        NOTE: `Msg.order` must be incremented by one for each message.
        If there are any gaps, messages after the gap won't be yielded
        until the end.

        Args:
            start (int, optional): initial message number. Defaults to `0`.

        Yields:
            Iterator[Msg]: message yielded in the correct order
        """
        prev = start - 1
        key = attrgetter("order")
        waiting: List[Msg] = []
        for item in self:
            if not waiting and key(item) == prev + 1:
                prev += 1
                yield item
                continue

            # items came out of order
            waiting.append(item)
            waiting.sort(key=key, reverse=True)  # sort in-place for performance
            while waiting and key(waiting[-1]) == prev + 1:
                prev += 1
                yield waiting.pop()

        # generator ended; yield any waiting items
        while waiting:
            yield waiting.pop()

    def put(self, data: Any = None, *, kind: str = "", order: int = 0) -> "Q":
        """Put a message on the queue.

        Args:
            data (Any, optional): message data. Defaults to `None`.

            kind (str, optional): kind of message. Defaults to `""`.

            order (int, optional): message order. Defaults to `0`.

        Returns:
            Self: self for chaining
        """
        if isinstance(data, Msg):
            self._q.put(data)
        else:
            self._q.put(Msg(data=data, kind=kind, order=order))
        return self

    def end(self) -> "Q":
        """Add the `END_MSG` to indicate the end of work.

        Returns:
            Self: self for chaining
        """
        self._q.put(END_MSG)
        return self

    def stop(self, workers: Union[Worker, Sequence[Worker]]) -> "Q":
        """Use this queue to notify workers to end and wait for them to join.

        Args:
            workers (Worker, Sequence[Worker]): workers to wait for

        Returns:
            Self: self for chaining
        """
        _workers = [workers] if isinstance(workers, Worker) else workers

        for _ in range(len(_workers)):
            self.end()

        for task in _workers:
            task.join()

        return self

Methods

def items(self, cache: bool = False, sort: bool = False) -> Iterator[Msg]

End the queue by adding END_MSG, then read all the messages that were put before it.

Args

cache : bool, optional
if True, cache the messages. This allows you to call this method multiple times to get the same messages. Defaults to False.
sort : bool, optional
if True, messages are sorted by Msg.order. Defaults to False.

Yields

Iterator[Msg]
iterate over messages in the queue
def items(self, cache: bool = False, sort: bool = False) -> Iterator[Msg]:
    """End a queue and read all the current messages.

    Args:
        cache (bool, optional): if `True`, cache the messages. This allows you
            to call this method multiple times to get the same messages.
            Defaults to `False`.

        sort (bool, optional): if `True` messages are sorted by `Msg.order`.
            Defaults to `False`.

    Yields:
        Iterator[Msg]: iterate over messages in the queue
    """
    if cache:
        if self._cache is None:  # need to build a cache
            self.end()
            self._cache = list(self.sorted() if sort else self)
        return iter(self._cache)

    # not cached
    self.end()
    return self.sorted() if sort else iter(self)
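
The caching behavior of items() can be illustrated without ezq itself. The following is a minimal sketch, assuming a toy class (`MiniQ`, hypothetical) backed by the standard library's `queue.Queue` and a private sentinel (`_END`, hypothetical) standing in for END_MSG. It is not the library's implementation; it only mirrors the control flow shown in the source above.

```python
import queue
from typing import Any, Iterator, List, Optional

_END = object()  # hypothetical sentinel standing in for END_MSG


class MiniQ:
    """Toy stand-in for Q, backed by a thread-safe stdlib queue."""

    def __init__(self) -> None:
        self._q: "queue.Queue[Any]" = queue.Queue()
        self._cache: Optional[List[Any]] = None

    def put(self, data: Any) -> "MiniQ":
        self._q.put(data)
        return self  # allow chaining, like Q.put()

    def __iter__(self) -> Iterator[Any]:
        while True:
            item = self._q.get()
            if item is _END:  # stop at the end marker
                break
            yield item

    def items(self, cache: bool = False) -> Iterator[Any]:
        if cache:
            if self._cache is None:  # first call: end the queue and drain it
                self._q.put(_END)
                self._cache = list(self)
            return iter(self._cache)
        self._q.put(_END)
        return iter(self)


q = MiniQ()
q.put(1).put(2).put(3)
first = list(q.items(cache=True))
second = list(q.items(cache=True))  # served from the cache; queue untouched
q.put(4)
third = list(q.items(cache=True))  # still the cached messages; 4 is not seen
```

In this sketch, once the cache is built, later calls with `cache=True` return the same messages even if more are put on the queue afterwards, which matches the `if self._cache is None` check in the source.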

def sorted(self, start: int = 0) -> Iterator[Msg]

Iterate over messages sorted by Msg.order.

NOTE: Msg.order must be incremented by one for each message. If there are any gaps, messages after the gap are held back until the queue ends and are then yielded in ascending order.

Args

start : int, optional
initial message number. Defaults to 0.

Yields

Iterator[Msg]
message yielded in the correct order
def sorted(self, start: int = 0) -> Iterator[Msg]:
    """Iterate over messages sorted by `Msg.order`.

    NOTE: `Msg.order` must be incremented by one for each message.
    If there are any gaps, messages after the gap won't be yielded
    until the end.

    Args:
        start (int, optional): initial message number. Defaults to `0`.

    Yields:
        Iterator[Msg]: message yielded in the correct order
    """
    prev = start - 1
    key = attrgetter("order")
    waiting: List[Msg] = []
    for item in self:
        if not waiting and key(item) == prev + 1:
            prev += 1
            yield item
            continue

        # items came out of order
        waiting.append(item)
        waiting.sort(key=key, reverse=True)  # sort in-place for performance
        while waiting and key(waiting[-1]) == prev + 1:
            prev += 1
            yield waiting.pop()

    # generator ended; yield any waiting items
    while waiting:
        yield waiting.pop()
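
To make the gap behavior concrete, here is a small sketch of the same reordering logic applied to plain integers instead of Msg objects. The function name `in_order` is hypothetical and used only for illustration; the loop follows the source above.

```python
from typing import Iterable, Iterator, List


def in_order(orders: Iterable[int], start: int = 0) -> Iterator[int]:
    """Yield order numbers consecutively, buffering any that arrive early."""
    prev = start - 1
    waiting: List[int] = []  # kept sorted descending, so the smallest is last
    for order in orders:
        if not waiting and order == prev + 1:
            # fast path: nothing buffered and this is the next expected number
            prev += 1
            yield order
            continue

        # arrived out of order: buffer it, then release any consecutive run
        waiting.append(order)
        waiting.sort(reverse=True)
        while waiting and waiting[-1] == prev + 1:
            prev += 1
            yield waiting.pop()

    # input exhausted: flush whatever is still buffered, smallest first
    while waiting:
        yield waiting.pop()


shuffled = list(in_order([2, 0, 1, 4, 3]))
with_gap = list(in_order([0, 3, 2, 5]))  # order 1 never arrives
print(shuffled)  # [0, 1, 2, 3, 4]
print(with_gap)  # [0, 2, 3, 5]
```

In the second call, 0 is yielded immediately, while 3, 2, and 5 stay buffered until the input ends because 1 is missing.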

def put(self, data: Any = None, *, kind: str = '', order: int = 0) -> Q

Put a message on the queue.

Args

data : Any, optional
message data. Defaults to None.
kind : str, optional
kind of message. Defaults to "".
order : int, optional
message order. Defaults to 0.

Returns

Self
self for chaining
def put(self, data: Any = None, *, kind: str = "", order: int = 0) -> "Q":
    """Put a message on the queue.

    Args:
        data (Any, optional): message data. Defaults to `None`.

        kind (str, optional): kind of message. Defaults to `""`.

        order (int, optional): message order. Defaults to `0`.

    Returns:
        Self: self for chaining
    """
    if isinstance(data, Msg):
        self._q.put(data)
    else:
        self._q.put(Msg(data=data, kind=kind, order=order))
    return self
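
One detail visible in the source is that a Msg passed as data is put on the queue unchanged, so the kind and order arguments are ignored in that case. The sketch below isolates that wrapping rule. It assumes a stand-in `Msg` dataclass with the three fields named in the source and a hypothetical helper `wrap`; neither is ezq's own definition.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Msg:
    """Hypothetical stand-in for ezq's Msg (same three fields)."""

    data: Any = None
    kind: str = ""
    order: int = 0


def wrap(data: Any = None, *, kind: str = "", order: int = 0) -> Msg:
    """Return data as-is if it is already a Msg, otherwise wrap it."""
    if isinstance(data, Msg):
        return data  # kind and order arguments are ignored here
    return Msg(data=data, kind=kind, order=order)


plain = wrap(5, kind="number", order=2)
existing = wrap(Msg(data=1, kind="a", order=3), kind="b", order=9)
```

Here `plain` carries the given kind and order, while `existing` keeps its original `kind="a"` and `order=3`.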

def end(self) -> Q

Add the END_MSG to indicate the end of work.

Returns

Self
self for chaining
def end(self) -> "Q":
    """Add the `END_MSG` to indicate the end of work.

    Returns:
        Self: self for chaining
    """
    self._q.put(END_MSG)
    return self

def stop(self, workers: Union[Worker, Sequence[Worker]]) -> Q

Notify workers to end by putting one END_MSG on this queue per worker, then wait for each worker to join.

Args

workers : Worker, Sequence[Worker]
workers to wait for

Returns

Self
self for chaining
def stop(self, workers: Union[Worker, Sequence[Worker]]) -> "Q":
    """Use this queue to notify workers to end and wait for them to join.

    Args:
        workers (Worker, Sequence[Worker]): workers to wait for

    Returns:
        Self: self for chaining
    """
    _workers = [workers] if isinstance(workers, Worker) else workers

    for _ in range(len(_workers)):
        self.end()

    for task in _workers:
        task.join()

    return self
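
The "one end marker per worker, then join" pattern used by stop() can be sketched with only the standard library. This example assumes plain `threading.Thread` workers and a hypothetical sentinel `END` in place of ezq's Worker and END_MSG; it illustrates the shutdown pattern, not the library's internals.

```python
import queue
import threading

END = object()  # hypothetical sentinel standing in for END_MSG


def worker(q: queue.Queue, out: queue.Queue) -> None:
    """Sum items until the end marker arrives, then report the total."""
    total = 0
    while True:
        item = q.get()
        if item is END:  # each worker consumes exactly one end marker
            break
        total += item
    out.put(total)


q: queue.Queue = queue.Queue()
out: queue.Queue = queue.Queue()
workers = [threading.Thread(target=worker, args=(q, out)) for _ in range(4)]
for w in workers:
    w.start()

for i in range(100):
    q.put(i)  # send work

# Equivalent of stop(): one end marker per worker, then wait for all of them.
for _ in workers:
    q.put(END)
for w in workers:
    w.join()

grand_total = sum(out.get() for _ in workers)
print(grand_total)  # 4950
```

Because each worker stops after reading a single marker, the number of markers must match the number of workers; too few would leave a worker blocked on `q.get()` and the join would never return.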