Lifecycle & Policies
Queues have a well-defined lifecycle: create, start, submit tasks, run, and shut down. This guide covers context managers, manual lifecycle, reset and reuse, modes, failure policies, shutdown behavior, and hooks.
Context Managers (Recommended)
A context manager is the simplest way to manage a queue's lifecycle.
On normal exit, the context manager drains the queue gracefully. If the block
raises an exception, it cancels all work with force=True.
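The drain-on-success, cancel-on-exception behavior can be illustrated with a small stand-in built only on asyncio. This is a sketch of the pattern, not osiiso's implementation: `MiniQueue`, `exit_mode`, `work`, `demo_normal`, and `demo_error` are hypothetical names introduced for the example.

```python
import asyncio


class MiniQueue:
    """Toy stand-in (not osiiso) that mimics the exit behavior described above."""

    def __init__(self):
        self._tasks = []
        self.exit_mode = None  # records which path __aexit__ took

    def submit(self, fn, *args):
        # Schedule the coroutine immediately and keep a reference to the task.
        task = asyncio.create_task(fn(*args))
        self._tasks.append(task)
        return task

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Normal exit: wait for every submitted task to finish.
            self.exit_mode = "drain"
        else:
            # Exception inside the block: cancel whatever is still outstanding.
            self.exit_mode = "cancel"
            for task in self._tasks:
                task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        return False  # never swallow the caller's exception


async def work(n):
    await asyncio.sleep(0.01)
    return n * 2


async def demo_normal():
    async with MiniQueue() as q:
        task = q.submit(work, 21)
    return q.exit_mode, task.result()


async def demo_error():
    q = MiniQueue()
    try:
        async with q:
            task = q.submit(work, 21)
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    return q.exit_mode, task.cancelled()
```

Running `asyncio.run(demo_normal())` takes the drain path and the task's result is available afterwards; `asyncio.run(demo_error())` takes the cancel path and the task ends up cancelled.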
Manual Lifecycle
For more control, manage the lifecycle explicitly:
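import osiiso

# Run inside a coroutine; top-level await is shown for brevity.
q = osiiso.AsyncQueue(workers=4)
await q.start()           # start the queue
q.submit(work, 1)         # queue a call to work with argument 1
summary = await q.run()   # process submitted tasks and collect the summary
await q.shutdown()        # shut the queue down when finished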
Use shutdown(force=True) to cancel all pending and active work immediately.
Reset and Reuse
After run() returns, call reset() to reuse the same queue object:
async with osiiso.AsyncQueue(workers=4) as q:
    q.submit(fetch, "first-batch")
    await q.run()
    q.reset()
    q.submit(fetch, "second-batch")
    await q.run()
What reset() clears
reset() clears results, handles, and internal state. It reopens the queue
for new submissions. You cannot call reset() while run() is in progress.
Use clear_results() to free stored result history without resetting the entire
queue.
Queue Modes
Finite Mode (Default)
mode="finite" drains all submitted tasks and then stops.
This is the default: workers process every submitted task, and then run() returns.
Infinite Mode
mode="infinite" keeps workers alive until shutdown or timeout. Use this when
producers keep adding work while workers are active.
In infinite mode, run() blocks until:
- shutdown() is called, or
- The timeout expires
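The "block until shutdown or timeout" behavior can be modeled with plain asyncio. The sketch below is an illustration under stated assumptions, not osiiso code: `worker`, `run_until`, `demo`, and the `stop` event are hypothetical names, and the stand-in uses an `asyncio.Event` where the real queue would use shutdown().

```python
import asyncio


async def worker(inbox, processed):
    # Stay alive and keep pulling items; only cancellation ends this loop.
    while True:
        item = await inbox.get()
        processed.append(item)
        inbox.task_done()


async def run_until(inbox, stop, timeout, workers=2):
    """Block until `stop` is set or `timeout` seconds elapse, whichever comes first."""
    processed = []
    tasks = [asyncio.create_task(worker(inbox, processed)) for _ in range(workers)]
    try:
        await asyncio.wait_for(stop.wait(), timeout)
        reason = "shutdown"
    except asyncio.TimeoutError:
        reason = "timeout"
    finally:
        # Either way, the long-lived workers are stopped before returning.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return reason, sorted(processed)


async def demo(signal_shutdown):
    inbox, stop = asyncio.Queue(), asyncio.Event()

    async def producer():
        # A producer that keeps adding work while the workers are active.
        for i in range(3):
            await inbox.put(i)
            await asyncio.sleep(0.01)
        if signal_shutdown:
            stop.set()

    producer_task = asyncio.create_task(producer())
    result = await run_until(inbox, stop, timeout=0.2)
    await producer_task
    return result
```

With `demo(True)` the producer signals shutdown once it has finished adding work; with `demo(False)` nothing signals shutdown, so the timeout is what ends the run.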
Failure Policies
Continue (Default)
fail_policy="continue" records failures and keeps processing remaining tasks:
q = osiiso.ThreadQueue(fail_policy="continue")
summary = q.run()
# summary.failed may be > 0, but all tasks were attempted
Fail First
fail_policy="fail_first" cancels remaining eligible work after the first failure.
You can set the policy at construction time or override it per-run:
q = osiiso.AsyncQueue(fail_policy="continue")
summary = await q.run(fail_policy="fail_first") # Override for this run
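The difference between the two policies can be sketched with a sequential runner. This is a simplified model rather than the library's scheduler: `run_tasks`, `ok`, `boom`, and the summary keys are hypothetical, and every task after the first failure is treated as eligible for cancellation.

```python
def run_tasks(tasks, fail_policy="continue"):
    """Run callables in order and tally outcomes under the given policy."""
    if fail_policy not in ("continue", "fail_first"):
        raise ValueError(f"unknown fail_policy: {fail_policy!r}")
    summary = {"succeeded": 0, "failed": 0, "cancelled": 0}
    stop = False
    for task in tasks:
        if stop:
            # fail_first already tripped: remaining work is never attempted.
            summary["cancelled"] += 1
            continue
        try:
            task()
            summary["succeeded"] += 1
        except Exception:
            summary["failed"] += 1
            if fail_policy == "fail_first":
                stop = True
    return summary


def ok():
    return "ok"


def boom():
    raise ValueError("boom")
```

For the batch `[ok, boom, ok, ok]`, the "continue" policy attempts all four tasks and records one failure, while "fail_first" stops after `boom` and counts the last two tasks as cancelled.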
Shutdown Behavior
The on_exit parameter controls what happens when a timeout or forced shutdown
occurs:
Complete Priority (Default)
on_exit="complete_priority" cancels ordinary pending work but lets
must_complete=True tasks finish:
q.submit(save_checkpoint, data, must_complete=True, priority=0)
q.submit(optional_cleanup, priority=5)
summary = q.run(timeout=5)
# save_checkpoint finishes; optional_cleanup may be cancelled
Cancel
on_exit="cancel" cancels everything it can, including must_complete
tasks.
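The two on_exit settings differ only in how must_complete work is treated when a timeout or forced shutdown hits. A minimal sketch of that decision, assuming a hypothetical `Pending` record and `split_on_exit` helper (neither is part of osiiso), and ignoring tasks that are already running:

```python
from dataclasses import dataclass


@dataclass
class Pending:
    name: str
    must_complete: bool = False


def split_on_exit(pending, on_exit="complete_priority"):
    """Return (kept, cancelled) task names for pending work at exit time."""
    if on_exit == "cancel":
        # Everything pending is cancelled, must_complete or not.
        return [], [t.name for t in pending]
    if on_exit == "complete_priority":
        # Ordinary pending work is cancelled; must_complete tasks are kept.
        kept = [t.name for t in pending if t.must_complete]
        cancelled = [t.name for t in pending if not t.must_complete]
        return kept, cancelled
    raise ValueError(f"unknown on_exit: {on_exit!r}")
```

With a must_complete `save_checkpoint` and an ordinary `optional_cleanup` pending, "complete_priority" keeps the checkpoint and cancels the cleanup, whereas "cancel" drops both.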
Lifecycle Hooks
Hooks are synchronous callbacks that fire at key points in task execution. Exceptions raised inside hooks are logged but never crash the queue.
def on_start(handle):
    print(f"Starting: {handle.name}")

def on_complete(result):
    print(f"Done: {result.name} → {result.status} ({result.duration:.3f}s)")

def on_retry(handle, exc):
    print(f"Retrying: {handle.name} after {exc}")

q = osiiso.AsyncQueue(
    on_start=on_start,
    on_complete=on_complete,
    on_retry=on_retry,
)
| Hook | Signature | When |
|---|---|---|
| on_start | (handle) -> None | Just before a task begins executing |
| on_complete | (result) -> None | Immediately after a task finishes (any status) |
| on_retry | (handle, exception) -> None | Before a retry attempt |
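The guarantee that a failing hook is logged but never crashes the queue is commonly implemented by wrapping each hook call in a try/except. The sketch below shows that pattern with the standard logging module; `call_hook` and `run_task` are hypothetical helpers, and the hooks here receive a plain name and status instead of the handle and result objects described above.

```python
import logging

logger = logging.getLogger("hooks_sketch")


def call_hook(hook, *args):
    """Invoke an optional hook; log and swallow anything it raises."""
    if hook is None:
        return True
    try:
        hook(*args)
        return True
    except Exception:
        # The traceback is logged, but the error does not propagate.
        logger.exception("hook %r raised; continuing", getattr(hook, "__name__", hook))
        return False


def run_task(name, fn, on_start=None, on_complete=None):
    call_hook(on_start, name)
    try:
        value, status = fn(), "success"
    except Exception as exc:
        value, status = exc, "failed"
    # on_complete fires for any status, mirroring the table above.
    call_hook(on_complete, name, status)
    return status, value
```

Even if `on_start` raises, `run_task` still executes the task and still calls `on_complete`; the hook failure only shows up in the log.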
Lifecycle Diagram
stateDiagram-v2
[*] --> Created: __init__()
Created --> Started: start() / __enter__
Started --> Running: run()
Running --> Started: run() returns
Started --> Reset: reset()
Reset --> Started: start()
Started --> Shutdown: shutdown() / __exit__
Running --> Shutdown: timeout / error
Shutdown --> [*]
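The diagram can also be read as a transition table. The sketch below encodes it as a dictionary and rejects moves the diagram does not allow. State names are taken from the diagram; the event labels (`run_returns`, `timeout`, `error`) and the helpers `step` and `replay` are simplifications introduced here, not part of osiiso.

```python
# Transitions copied from the diagram; each edge gets one simplified event label.
TRANSITIONS = {
    ("Created", "start"): "Started",
    ("Started", "run"): "Running",
    ("Running", "run_returns"): "Started",
    ("Started", "reset"): "Reset",
    ("Reset", "start"): "Started",
    ("Started", "shutdown"): "Shutdown",
    ("Running", "timeout"): "Shutdown",
    ("Running", "error"): "Shutdown",
}


def step(state, event):
    """Return the next state, or raise if the diagram has no such edge."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"{event!r} is not allowed in state {state!r}") from None


def replay(events, state="Created"):
    """Apply a sequence of events starting from `state`."""
    for event in events:
        state = step(state, event)
    return state
```

For example, `replay(["start", "run", "run_returns", "reset", "start", "shutdown"])` walks the reset-and-reuse path and ends in "Shutdown", while `step("Created", "run")` raises because the diagram has no edge from Created to Running.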
Next Steps
- Bound Tasks — Decorator-based task registration
- Results & Summaries — Inspecting execution outcomes