Package ezq
Simple wrapper for python `multiprocessing` and `threading`.
Why?
`ezq` makes it easy to connect subprocesses and threads (both considered "workers") using queues with a simpler API than `concurrent.futures`, `multiprocessing`, or `threading`.
Install
python -m pip install ezq
Example: Quick Start
If you just want to apply a function to some inputs, you can use `map()` to run it on all available CPUs and get the results back.
import ezq
print(list(ezq.map(lambda x: x * 2, range(6))))
# => [0, 2, 4, 6, 8, 10]
Example: Sum Messages
Here's a simple example of a worker that reads from an input queue, sums up the messages, and puts the result on an output queue.
import ezq
def worker(q, out):
    """Add up all the messages."""
    total = 0
    for msg in q:  # read a message from the queue
        total += msg.data
    # after reading all the messages, write the total
    out.put(total)

def main():
    """Run several workers."""
    # Step 1: Create the queues and start the workers.
    q, out = ezq.Q(), ezq.Q()  # input & output queues
    workers = [ezq.run(worker, q, out) for _ in range(ezq.NUM_CPUS)]
    # workers are all running

    # Step 2: Send work to the workers.
    for i in range(1000):
        q.put(i)  # send work

    # Step 3: Tell the workers to finish.
    q.stop(workers)
    # workers are all stopped

    # Step 4: Process the results.
    want = sum(range(1000))
    have = sum(msg.data for msg in out.items())
    assert have == want
    print(have)

if __name__ == "__main__":
    main()
Typical worker lifecycle
- The main process creates queues with `Q`.
- The main process creates workers with `run()` (alias for `Worker.process()`) or `run_thread()` (alias for `Worker.thread()`).
- The main process sends data using `Q.put()`.
- The worker iterates over the queue.
- The main process ends the queue with `Q.stop()`.
- The worker returns when it reaches the end of the queue.
- (Optional) The main process processes the results.
`Process` vs `Thread`
`ezq` supports two kinds of workers: `Process` and `Thread`. There is a lot of existing discussion about when to use which approach, but a general rule of thumb is:
- `Process` is for parallelism so you can use multiple CPUs at once. Ideal for CPU-bound tasks like doing lots of mathematical calculations.
- `Thread` is for concurrency so you can use a single CPU to do multiple things. Ideal for I/O-bound tasks like waiting for a disk, database, or network.
Some more differences:
- Shared memory: Each `Process` worker has data sent to it via `pickle` (actually `dill`, a `pickle` replacement) and it doesn't share data with other workers. By contrast, each `Thread` worker shares its memory with all other `Thread` workers in the same process, so it can accidentally change global state.
- Queue overhead: `Q` has more overhead for `Process` workers than `Thread` workers.
- Creating sub-workers: `Process` and `Thread` workers can create additional `Thread` workers, but they cannot create additional `Process` workers.
Create queues
In the main process, create the queues you'll need. Here are my common situations:
- 0 queues: I'm using a simple function and can ask `map()` to make the queues for me.
- 1 queue: the worker reads from an input queue and persists the result somewhere else (e.g., writing to disk, making a network call, running some other program).
- 2 queues (most common): the worker reads from an input queue and writes the results to an output queue.
- 3 queues: multiple stages of work are happening where workers are reading from one queue and writing to another queue for another worker to process.
NOTE: If you're using `Thread` workers, you can save some overhead by passing `Q("thread")`. This lightweight queue also doesn't use `pickle`, so you can use it to pass hard-to-pickle things (e.g., a database connection).
q, out = ezq.Q(), ezq.Q() # most common
q2 = ezq.Q("thread")  # only ok for Thread workers
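The three-queue situation above can be sketched with the standard library alone. This is a minimal illustration, not `ezq` code: the `END` sentinel object, the `stage()` helper, and `run_pipeline()` are hypothetical names, and plain `queue.Queue` objects with `threading.Thread` workers stand in for `Q` and `run_thread()`.

```python
import queue
import threading

END = object()  # hypothetical sentinel; ezq itself uses a Msg whose kind is "END"


def stage(task, inbox, outbox):
    """Apply `task` to each item from `inbox` until END, then forward END."""
    while True:
        item = inbox.get()
        if item is END:
            outbox.put(END)  # tell the next stage that no more work is coming
            return
        outbox.put(task(item))


def run_pipeline(values):
    """Push values through two stages connected by three queues."""
    q1, q2, q3 = queue.Queue(), queue.Queue(), queue.Queue()
    workers = [
        threading.Thread(target=stage, args=(lambda x: x + 1, q1, q2)),
        threading.Thread(target=stage, args=(lambda x: x * 2, q2, q3)),
    ]
    for w in workers:
        w.start()
    for value in values:
        q1.put(value)
    q1.put(END)  # end the first queue; each stage passes the END along
    for w in workers:
        w.join()
    results = []
    while True:
        item = q3.get()
        if item is END:
            break
        results.append(item)
    return results


print(run_pipeline(range(5)))  # [2, 4, 6, 8, 10]
```

With one worker per stage the results come out in input order; with several workers per stage you would need something like `Msg.order` to restore the order.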
A worker task is just a function
In general, there's nothing special about a worker function, but note:
- If you're using `Process` workers, all arguments are passed through `pickle` first.
- We don't currently do anything with the return value of this function (unless you use `map()`). You'll need an output queue to return data back to the main process/thread.
Create workers
In the main process, create workers using `run()` or `run_thread()`, which take a function and any additional parameters. Typically, you'll pass the queues you created to the workers at this point.
NOTE: `Process` and `Thread` workers can create additional `Thread` workers, but they cannot create additional `Process` workers.
Send data
Once you've created the workers, you send them data with `Q.put()`, which creates `Msg` objects and puts them in the queue. Each message has three attributes (all optional):
- `data: Any` - This is the data you want the worker to work on.
- `kind: str` - You can use this to send multiple kinds of work to the same worker. Note that the special `END` kind is used to indicate the end of a queue.
- `order: int` - This is the message order, which can help you reorder results or ensure that messages from a queue are read in a particular order (that's what `Q.sorted()` uses).
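To make the three attributes concrete, here is a small sketch of a handler that dispatches on `kind` and carries `order` through to its reply. The `Msg` dataclass below is a local stand-in that mirrors the fields listed above; the `handle()` function and the `"SQUARE"` / `"NEGATE"` kinds are made up for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Msg:
    """Local stand-in with the same three optional fields."""

    data: Any = None
    kind: str = ""
    order: int = 0


def handle(msg: Msg) -> Msg:
    """Pick the work to do from `kind` and keep `order` on the reply."""
    if msg.kind == "SQUARE":
        result = msg.data ** 2
    elif msg.kind == "NEGATE":
        result = -msg.data
    else:
        raise ValueError(f"Unknown kind: {msg.kind!r}")
    return Msg(data=result, kind=msg.kind, order=msg.order)


inbox = [Msg(3, "SQUARE", 1), Msg(4, "NEGATE", 0)]
replies = sorted((handle(m) for m in inbox), key=lambda m: m.order)
print([m.data for m in replies])  # [-4, 9]
```

Copying `order` from the request to the reply is what lets the reader of the output queue put results back in the original order.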
Beware pickle
If you are using `Process` workers, everything passed to the worker (arguments, messages) is first passed to `pickle` (actually, `dill`). Anything that cannot be pickled with `dill` (e.g., database connections) cannot be passed to `Process` workers. Note that `dill` can serialize many more types than `pickle` (e.g., `lambda` functions).
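One way to find out early whether a value will survive the trip to a `Process` worker is to try serializing it yourself. The sketch below uses the standard-library `pickle` as a stand-in, so it is stricter than `dill`: as noted above, `dill` accepts lambdas that `pickle` rejects. The `can_pickle()` helper is a hypothetical name, not part of `ezq`.

```python
import pickle
import sqlite3


def can_pickle(obj) -> bool:
    """Return True if the standard-library pickle can serialize `obj`."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


conn = sqlite3.connect(":memory:")  # an in-memory database connection
try:
    print(can_pickle({"id": 1, "tags": ["a", "b"]}))  # True: plain data
    print(can_pickle(conn))  # False: database connections cannot be pickled
    print(can_pickle(lambda x: x * 2))  # False with pickle (dill can handle it)
finally:
    conn.close()
```

A common workaround is to pass the worker what it needs to open its own connection (a path or URL) rather than the connection object itself.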
Beware shared state
If you are using `Thread` workers, workers can share certain variables, so you need to be careful about how variables are accessed to avoid accidentally corrupting data.
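A common way to protect shared state is to guard every update with a `threading.Lock`. The sketch below uses only the standard library; the `Counter` class and `count_with_threads()` function are hypothetical names chosen for illustration.

```python
import threading


class Counter:
    """A counter that is safe to update from several threads."""

    def __init__(self) -> None:
        self.value = 0
        self._lock = threading.Lock()

    def add(self, amount: int) -> None:
        # The read-modify-write below is not atomic, so only one thread
        # at a time is allowed to run it.
        with self._lock:
            self.value += amount


def count_with_threads(num_threads: int = 8, per_thread: int = 10_000) -> int:
    """Have every thread increment the same counter and return the total."""
    counter = Counter()

    def work() -> None:
        for _ in range(per_thread):
            counter.add(1)

    threads = [threading.Thread(target=work) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


print(count_with_threads())  # 80000
```

Sending results through an output queue instead of mutating shared variables avoids the need for a lock altogether.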
Iterate over messages
Inside the worker, iterate over the queue to read each message until the queue ends (see below). If the messages need to be processed in order, use `Q.sorted()`.
for msg in q:  # read each message until the queue ends
    ...
for msg in q.sorted():  # read each message in order
    ...
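The reordering idea behind sorted iteration can be sketched independently of any queue: hold back items that arrive early and release them once the next expected order number shows up. This is an illustration under assumptions, not the library's code. The `in_order()` function is a hypothetical name, it works on plain `(order, data)` tuples, and it uses a `heapq` buffer where the module source keeps a sorted list.

```python
import heapq
from itertools import count
from typing import Any, Iterable, Iterator, List, Tuple


def in_order(
    items: Iterable[Tuple[int, Any]], start: int = 0
) -> Iterator[Tuple[int, Any]]:
    """Yield (order, data) pairs by increasing order, buffering early arrivals."""
    expected = start
    tie = count()  # tie-breaker so the heap never has to compare `data`
    waiting: List[Tuple[int, int, Any]] = []
    for order, data in items:
        heapq.heappush(waiting, (order, next(tie), data))
        # Release everything that is now contiguous with what was yielded.
        while waiting and waiting[0][0] == expected:
            ready_order, _, ready_data = heapq.heappop(waiting)
            yield ready_order, ready_data
            expected += 1
    # Input ended: flush whatever is still held back behind a gap.
    while waiting:
        ready_order, _, ready_data = heapq.heappop(waiting)
        yield ready_order, ready_data


arrivals = [(2, "c"), (0, "a"), (3, "d"), (1, "b")]
print(list(in_order(arrivals)))  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
```

As in the `Q.sorted()` docstring, a gap in the order numbers means everything after the gap is held until the input ends.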
End the queue
After the main process has sent all the data to the workers, it needs to indicate that there's no additional work to be done. This is done by calling `Q.stop()` on the input queue that the workers are reading from and passing the list of workers to wait for.
In some rare situations, you can use `Q.end()` to explicitly end the queue.
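The stopping pattern can be sketched with the standard library: put one end marker on the queue per worker (each worker consumes exactly one and then returns), and then join every worker. The names below (`END`, `worker()`, `sum_with_workers()`) are hypothetical, and `queue.Queue` with `threading.Thread` stands in for `Q` and `run_thread()`.

```python
import queue
import threading

END = object()  # hypothetical end marker; ezq uses a Msg whose kind is "END"


def worker(inbox: queue.Queue, outbox: queue.Queue) -> None:
    """Sum items until an END marker arrives, then report the subtotal."""
    total = 0
    while True:
        item = inbox.get()
        if item is END:
            break  # the marker is consumed, so each worker needs its own
        total += item
    outbox.put(total)


def sum_with_workers(values, num_workers: int = 4) -> int:
    """Distribute values across workers and add up their subtotals."""
    inbox: queue.Queue = queue.Queue()
    outbox: queue.Queue = queue.Queue()
    workers = [
        threading.Thread(target=worker, args=(inbox, outbox))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()
    for value in values:
        inbox.put(value)
    for _ in workers:  # one END per worker
        inbox.put(END)
    for w in workers:  # wait for every worker to finish
        w.join()
    return sum(outbox.get() for _ in workers)


print(sum_with_workers(range(1000)))  # 499500
```

Sending fewer markers than there are workers would leave some workers waiting forever, which is why the worker list is passed when stopping.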
Process results
If you have an output queue, you may want to process the results. You can use `Q.items()` to end the queue and read the current messages.
import ezq
out = ezq.Q()
...
result = [msg.data for msg in out.items()]
# OR
result = [msg.data for msg in out.items(sort=True)] # sorted by Msg.order
# OR
result = [msg.data for msg in out.items(cache=True)] # cache the messages
Example: Read and Write Queues
In this example, several workers read from a queue, process data, and then write to a different queue. A single worker reads that second queue and prints to the screen, sorting the results as it goes along.
Note that we use a single writer to avoid clashes or overwriting.
import ezq
def printer(out: ezq.Q) -> None:
    """Print results in increasing order."""
    for msg in out.sorted():
        print(msg.data)

def collatz(q: ezq.Q, out: ezq.Q) -> None:
    """Read numbers and compute values."""
    for msg in q:
        num = float(msg.data)
        if msg.kind == "EVEN":
            out.put((num, num / 2), order=msg.order)
        elif msg.kind == "ODD":
            out.put((num, 3 * num + 1), order=msg.order)

def main() -> None:
    """Run several threads with a subprocess for printing."""
    q, out = ezq.Q("thread"), ezq.Q()
    readers = [ezq.run_thread(collatz, q, out) for _ in range(ezq.NUM_THREADS)]
    writer = ezq.run(printer, out)
    for num in range(40):
        kind = "EVEN" if num % 2 == 0 else "ODD"
        q.put(num, kind=kind, order=num)
    q.stop(readers)
    out.stop(writer)

if __name__ == "__main__":
    main()
License
"""Simple wrapper for python `multiprocessing` and `threading`.
.. include:: ../../README.md
:start-line: 2
"""
# native
from dataclasses import dataclass
from operator import attrgetter
from os import cpu_count
from platform import system
from queue import Empty
from queue import Queue as ThreadSafeQueue
from threading import Thread
from typing import Any
from typing import Callable
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Literal
from typing import Optional
from typing import Sequence
from typing import TYPE_CHECKING
from typing import Union
# lib
from multiprocess import Process # type: ignore
from multiprocess import Queue
__all__ = (
    "__version__",
    "Task",
    "Context",
    "ContextName",
    "Msg",
    "END_MSG",
    "MsgQ",
    "NUM_CPUS",
    "NUM_THREADS",
    "IS_MACOS",
    "Worker",
    "Q",
    "run",
    "run_thread",
    "map",
)
__version__ = "3.0.4"
Task = Callable[..., Any]
"""Task function signature (any `Callable`)."""
Context = Union[Process, Thread]
"""Execution contexts (`Process`, `Thread`)."""
ContextName = Literal["process", "thread"]
"""Execution context names (`"process"`, `"thread"`)."""
@dataclass
class Msg:
    """Message for a queue."""

    data: Any = None
    """Message data to be transmitted."""

    kind: str = ""
    """Optional marker of message type."""

    order: int = 0
    """Optional ordering of messages."""

# NOTE: The python `queue.Queue` is not properly a generic.
# See: https://stackoverflow.com/a/48554601
# TODO [2024-10-14]: @ py3.8 EOL remove this conditional
if TYPE_CHECKING:  # pragma: no cover
    MsgQ = Union[Queue[Msg], ThreadSafeQueue]  # pylint: disable=unsubscriptable-object
else:
    MsgQ = Queue

END_MSG: Msg = Msg(kind="END")
"""Message that indicates no future messages will be sent."""
## Hardware-Specific Information ##
NUM_CPUS: int = cpu_count() or 1
"""Number of CPUs on this machine."""
NUM_THREADS: int = min(32, NUM_CPUS + 4)
"""Default number of threads (up to 32).
See: [CPython's default for this value][1].
[1]: https://github.com/python/cpython/blob/a635d6386041a2971cf1d39837188ffb8139bcc7/Lib/concurrent/futures/thread.py#L142
"""
IS_MACOS: bool = system().lower().startswith("darwin")
"""`True` if we're running on MacOS.
Currently, we only use this value for testing, but there are certain features that
do not work properly on MacOS.
See: [Example of MacOS-specific issues][1].
[1]: https://github.com/python/cpython/blob/c5b670efd1e6dabc94b6308734d63f762480b80f/Lib/multiprocessing/queues.py#L125
"""
class Worker:
    """A function running in a `Process` or `Thread`."""

    _worker: Context
    """Execution context."""

    @staticmethod
    def process(task: Task, *args: Any, **kwargs: Any) -> "Worker":
        """Create a `Process`-based `Worker`.

        Args:
            task (Task): function to run
            *args (Any): additional arguments to `task`
            **kwargs (Any): additional keyword arguments to `task`

        Returns:
            Worker: wrapped worker.
        """
        # NOTE: On MacOS, python 3.8 switched the default method
        # from "fork" to "spawn" because fork is considered dangerous.
        # Some posts say "forkserver" should be ok.
        # See: https://bugs.python.org/issue?@action=redirect&bpo=33725
        #
        # if IS_MACOS:
        #     ctx = get_context("forkserver")
        # else:
        #     ctx = get_context()
        return Worker(Process(daemon=True, target=task, args=args, kwargs=kwargs))

    @staticmethod
    def thread(task: Task, *args: Any, **kwargs: Any) -> "Worker":
        """Create a `Thread`-based `Worker`.

        Args:
            task (Task): function to run
            *args (Any): additional arguments to `task`
            **kwargs (Any): additional keyword arguments to `task`

        Returns:
            Worker: wrapped worker.
        """
        return Worker(Thread(daemon=False, target=task, args=args, kwargs=kwargs))

    def __init__(self, context: Context):
        """Construct a worker from a context.

        Args:
            context (Context): a `Process` or a `Thread`
        """
        self._worker = context
        self._worker.start()

    def __getattr__(self, name: str) -> Any:
        """Delegate properties to the underlying task.

        Args:
            name (str): attribute name

        Returns:
            Any: attribute from the task
        """
        return getattr(self._worker, name)
class Q:
    """Simple message queue."""

    _q: MsgQ
    """Wrapped queue."""

    _cache: Optional[List[Msg]] = None
    """Cache of queue messages when calling `.items(cache=True)`."""

    _timeout: float = 0.05
    """Time in seconds to poll the queue."""

    def __init__(self, kind: ContextName = "process"):
        """Construct a queue wrapper.

        Args:
            kind (ContextName, optional): If `"thread"`, construct a lighter-weight
                `Queue` that is thread-safe. Otherwise, construct a full
                `multiprocess.Queue`. Defaults to `"process"`.
        """
        if kind == "process":
            self._q = Queue()
        elif kind == "thread":
            self._q = ThreadSafeQueue()
        else:  # pragma: no cover
            raise ValueError(f"Unknown queue type: {kind}")

    def __getattr__(self, name: str) -> Any:
        """Delegate properties to the underlying queue.

        Args:
            name (str): name of the attribute to access

        Returns:
            Any: attribute from the queue
        """
        return getattr(self._q, name)

    def __iter__(self) -> Iterator[Msg]:
        """Iterate over messages in a queue until `END_MSG` is received.

        Yields:
            Iterator[Msg]: iterate over messages in the queue
        """
        while True:
            try:
                msg = self._q.get(block=True, timeout=self._timeout)
                if msg.kind == END_MSG.kind:
                    # We'd really like to put the `END_MSG` back in the queue
                    # to prevent reading past the end, but in practice
                    # this often creates an uncatchable `BrokenPipeError`.
                    # q.put(END_MSG)
                    break
                yield msg
            except Empty:  # pragma: no cover
                # queue might not actually be empty
                # see: https://bugs.python.org/issue20147
                continue

    def items(self, cache: bool = False, sort: bool = False) -> Iterator[Msg]:
        """End a queue and read all the current messages.

        Args:
            cache (bool, optional): if `True`, cache the messages. This allows you
                to call this method multiple times to get the same messages.
                Defaults to `False`.
            sort (bool, optional): if `True` messages are sorted by `Msg.order`.
                Defaults to `False`.

        Yields:
            Iterator[Msg]: iterate over messages in the queue
        """
        if cache:
            if self._cache is None:  # need to build a cache
                self.end()
                self._cache = list(self.sorted() if sort else self)
            return iter(self._cache)
        # not cached
        self.end()
        return self.sorted() if sort else iter(self)

    def sorted(self, start: int = 0) -> Iterator[Msg]:
        """Iterate over messages sorted by `Msg.order`.

        NOTE: `Msg.order` must be incremented by one for each message.
        If there are any gaps, messages after the gap won't be yielded
        until the end.

        Args:
            start (int, optional): initial message number. Defaults to `0`.

        Yields:
            Iterator[Msg]: message yielded in the correct order
        """
        prev = start - 1
        key = attrgetter("order")
        waiting: List[Msg] = []
        for item in self:
            if not waiting and key(item) == prev + 1:
                prev += 1
                yield item
                continue
            # items came out of order
            waiting.append(item)
            waiting.sort(key=key, reverse=True)  # sort in-place for performance
            while waiting and key(waiting[-1]) == prev + 1:
                prev += 1
                yield waiting.pop()
        # generator ended; yield any waiting items
        while waiting:
            yield waiting.pop()

    def put(self, data: Any = None, *, kind: str = "", order: int = 0) -> "Q":
        """Put a message on the queue.

        Args:
            data (Any, optional): message data. Defaults to `None`.
            kind (str, optional): kind of message. Defaults to `""`.
            order (int, optional): message order. Defaults to `0`.

        Returns:
            Self: self for chaining
        """
        if isinstance(data, Msg):
            self._q.put(data)
        else:
            self._q.put(Msg(data=data, kind=kind, order=order))
        return self

    def end(self) -> "Q":
        """Add the `END_MSG` to indicate the end of work.

        Returns:
            Self: self for chaining
        """
        self._q.put(END_MSG)
        return self

    def stop(self, workers: Union[Worker, Sequence[Worker]]) -> "Q":
        """Use this queue to notify workers to end and wait for them to join.

        Args:
            workers (Worker, Sequence[Worker]): workers to wait for

        Returns:
            Self: self for chaining
        """
        _workers = [workers] if isinstance(workers, Worker) else workers
        for _ in range(len(_workers)):
            self.end()
        for task in _workers:
            task.join()
        return self
def run(task: Task, *args: Any, **kwargs: Any) -> Worker:
    """Run a function as a subprocess.

    Args:
        task (Task): function to run in each subprocess
        *args (Any): additional positional arguments to `task`.
        **kwargs (Any): additional keyword arguments to `task`.

    Returns:
        Worker: worker started in a subprocess

    .. changed:: 2.0.4
        This function now returns a `Worker` instead of a `Process`.
    """
    return Worker.process(task, *args, **kwargs)

def run_thread(task: Task, *args: Any, **kwargs: Any) -> Worker:
    """Run a function as a thread.

    Args:
        task (Task): function to run in each thread
        *args (Any): additional positional arguments to `task`.
        **kwargs (Any): additional keyword arguments to `task`.

    Returns:
        Worker: worker started in a thread

    .. changed:: 2.0.4
        This function now returns a `Worker` instead of a `Thread`.
    """
    return Worker.thread(task, *args, **kwargs)

def map(
    task: Task,
    *args: Iterable[Any],
    num: Optional[int] = None,
    kind: ContextName = "process",
) -> Iterator[Any]:
    """Call a function with arguments using multiple workers.

    Args:
        task (Task): function to call
        *args (list[Any]): arguments to `task`. If multiple lists are provided,
            they will be passed to `zip` first.
        num (int, optional): number of workers. If `None`, `NUM_CPUS` or
            `NUM_THREADS` will be used as appropriate. Defaults to `None`.
        kind (ContextName, optional): execution context to use.
            Defaults to `"process"`.

    Yields:
        Any: results from applying the function to the arguments
    """
    q, out = Q(kind=kind), Q(kind=kind)

    def worker(_q: Q, _out: Q) -> None:
        """Internal call to `task`."""
        for msg in _q.sorted():
            _out.put(data=task(*msg.data), order=msg.order)

    if kind == "process":
        workers = [Worker.process(worker, q, out) for _ in range(num or NUM_CPUS)]
    elif kind == "thread":
        workers = [Worker.thread(worker, q, out) for _ in range(num or NUM_THREADS)]
    else:  # pragma: no cover
        raise ValueError(f"Unknown worker context: {kind}")

    for order, value in enumerate(zip(*args)):
        q.put(value, order=order)
    q.stop(workers)
    for msg in out.end().sorted():
        yield msg.data
Global variables
var Task
- Task function signature (any `Callable`).
var Context
- Execution contexts (`Process`, `Thread`).
var ContextName
- Execution context names (`"process"`, `"thread"`).
var END_MSG : Msg
- Message that indicates no future messages will be sent.
var NUM_CPUS : int
- Number of CPUs on this machine.
var NUM_THREADS : int
- Default number of threads (up to 32).
var IS_MACOS : bool
- `True` if we're running on MacOS. Currently, we only use this value for testing, but there are certain features that do not work properly on MacOS.
Functions
def MsgQ(maxsize=0)
- Returns a queue object
def Queue(self, maxsize=0):
    '''Returns a queue object'''
    from .queues import Queue
    return Queue(maxsize, ctx=self.get_context())
def run(task: Callable[..., Any], *args: Any, **kwargs: Any) ‑> Worker
- Run a function as a subprocess.
Args
task : Task
- function to run in each subprocess
*args : Any
- additional positional arguments to `task`.
**kwargs : Any
- additional keyword arguments to `task`.
Returns
Worker
- worker started in a subprocess
Changed: 2.0.4
This function now returns a `Worker` instead of a `Process`.
def run_thread(task: Callable[..., Any], *args: Any, **kwargs: Any) ‑> Worker
- Run a function as a thread.
Args
task : Task
- function to run in each thread
*args : Any
- additional positional arguments to `task`.
**kwargs : Any
- additional keyword arguments to `task`.
Returns
Worker
- worker started in a thread
Changed: 2.0.4
This function now returns a `Worker` instead of a `Thread`.
def map(task: Callable[..., Any], *args: Iterable[Any], num: Optional[int] = None, kind: Literal['process', 'thread'] = 'process') ‑> Iterator[Any]
- Call a function with arguments using multiple workers.
Args
task : Task
- function to call
*args : list[Any]
- arguments to `task`. If multiple lists are provided, they will be passed to `zip` first.
num : int, optional
- number of workers. If `None`, `NUM_CPUS` or `NUM_THREADS` will be used as appropriate. Defaults to `None`.
kind : ContextName, optional
- execution context to use. Defaults to `"process"`.
Yields
Any
- results from applying the function to the arguments
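The way the positional iterables are combined can be shown with a sequential sketch: the inputs are zipped, and each resulting tuple is unpacked into the function's arguments. The `zipped_calls()` helper below is a hypothetical name that runs everything in the calling thread; it only illustrates the argument handling, not the distribution of work across workers.

```python
from typing import Any, Callable, Iterable, List


def zipped_calls(task: Callable[..., Any], *args: Iterable[Any]) -> List[Any]:
    """Call `task` once per tuple produced by zipping the input iterables."""
    return [task(*value) for value in zip(*args)]


# One iterable: each call receives a single argument.
print(zipped_calls(lambda x: x * 2, range(6)))  # [0, 2, 4, 6, 8, 10]

# Two iterables: each call receives one item from each, in step.
print(zipped_calls(pow, [2, 3, 4], [3, 2, 1]))  # [8, 9, 4]
```

Because `zip` stops at the shortest input, extra items in a longer iterable are ignored.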
Classes
class Msg (data: Any = None, kind: str = '', order: int = 0)
- Message for a queue.
Class variables
var data : Any
- Message data to be transmitted.
var kind : str
- Optional marker of message type.
var order : int
- Optional ordering of messages.
class Worker (context: Union[multiprocess.context.Process, threading.Thread])
- A function running in a `Process` or `Thread`.
Construct a worker from a context.
Args
context : Context
- a `Process` or a `Thread`
Static methods
def process(task: Callable[..., Any], *args: Any, **kwargs: Any) ‑> Worker
- Create a `Process`-based `Worker`.
Args
task : Task
- function to run
*args : Any
- additional arguments to `task`
**kwargs : Any
- additional keyword arguments to `task`
Returns
Worker
- wrapped worker.
def thread(task: Callable[..., Any], *args: Any, **kwargs: Any) ‑> Worker
- Create a `Thread`-based `Worker`.
Args
task : Task
- function to run
*args : Any
- additional arguments to `task`
**kwargs : Any
- additional keyword arguments to `task`
Returns
Worker
- wrapped worker.
class Q (kind: Literal['process', 'thread'] = 'process')
- Simple message queue.
Construct a queue wrapper.
Args
kind : ContextName, optional
- If `"thread"`, construct a lighter-weight `Queue` that is thread-safe. Otherwise, construct a full `multiprocess.Queue`. Defaults to `"process"`.
Methods
def items(self, cache: bool = False, sort: bool = False) ‑> Iterator[Msg]
- End a queue and read all the current messages.
Args
cache : bool, optional
- if `True`, cache the messages. This allows you to call this method multiple times to get the same messages. Defaults to `False`.
sort : bool, optional
- if `True` messages are sorted by `Msg.order`. Defaults to `False`.
Yields
Iterator[Msg]
- iterate over messages in the queue
def sorted(self, start: int = 0) ‑> Iterator[Msg]
- Iterate over messages sorted by `Msg.order`.
NOTE: `Msg.order` must be incremented by one for each message. If there are any gaps, messages after the gap won't be yielded until the end.
Args
start : int, optional
- initial message number. Defaults to `0`.
Yields
Iterator[Msg]
- message yielded in the correct order
def put(self, data: Any = None, *, kind: str = '', order: int = 0) ‑> Q
- Put a message on the queue.
Args
data : Any, optional
- message data. Defaults to `None`.
kind : str, optional
- kind of message. Defaults to `""`.
order : int, optional
- message order. Defaults to `0`.
Returns
Self
- self for chaining
def end(self) ‑> Q
- Add the `END_MSG` to indicate the end of work.
Returns
Self
- self for chaining
def stop(self, workers: Union[Worker, Sequence[Worker]]) ‑> Q
- Use this queue to notify workers to end and wait for them to join.
Args
workers : Worker, Sequence[Worker]
- workers to wait for
Returns
Self
- self for chaining