Task Cancellation
When a user clicks "Stop" in the chat frontend, the platform sets a userAbortedAt flag on the assistant message in the database. The toolkit provides a CancellationWatcher to detect this flag and gracefully stop long-running agent loops or tool executions.
How It Works
sequenceDiagram
participant FE as Frontend
participant B as Backend
participant DB as Database
participant A as Agent (Python)
FE->>B: User clicks "Stop"
B->>DB: Set userAbortedAt on message
A->>DB: Poll for userAbortedAt
DB-->>A: userAbortedAt is set
A->>A: Set is_cancelled = True
A->>A: Notify subscribers via event bus
A->>A: Break out of loop
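The sequence above can be simulated without the platform. The following is a minimal sketch under stated assumptions: `FakeMessageStore` and `SketchWatcher` are hypothetical stand-ins (not the toolkit's classes), and an in-memory field takes the place of the real database poll. It only illustrates the order of events: the flag is set, a poll sees it, the watcher latches `is_cancelled`, subscribers are notified once, and the loop exits.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class FakeMessageStore:
    """Hypothetical stand-in for the assistant message row in the database."""

    user_aborted_at: Optional[str] = None


class SketchWatcher:
    """Illustrative watcher; NOT the toolkit's CancellationWatcher."""

    def __init__(self, store: FakeMessageStore) -> None:
        self._store = store
        self._handlers: list = []
        self.is_cancelled = False

    def subscribe(self, handler: Callable[[], Awaitable[None]]) -> None:
        self._handlers.append(handler)

    async def poll_once(self) -> bool:
        # Latch the flag and notify subscribers only on first detection.
        if not self.is_cancelled and self._store.user_aborted_at is not None:
            self.is_cancelled = True
            for handler in self._handlers:
                await handler()
        return self.is_cancelled


async def demo() -> list:
    store = FakeMessageStore()
    watcher = SketchWatcher(store)
    log = []

    async def on_cancel() -> None:
        log.append("subscriber notified")

    watcher.subscribe(on_cancel)

    for step in range(5):
        if step == 2:
            # Simulates the backend writing userAbortedAt after "Stop".
            store.user_aborted_at = "2024-01-01T00:00:00Z"
        if await watcher.poll_once():
            log.append(f"loop stopped at step {step}")
            break
        log.append(f"step {step} done")
    return log
```

Running `asyncio.run(demo())` shows two completed steps, then the subscriber notification, then the loop exit.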
The CancellationWatcher is available on every ChatService instance via the cancellation property:
from unique_toolkit import ChatService
chat_service = ChatService(event)
watcher = chat_service.cancellation
Checking for Cancellation
Single-Shot Check
Use check_cancellation_async() to poll the database once. It returns True if the user has requested cancellation.
if await chat_service.cancellation.check_cancellation_async():
    # handle cancellation
    return
A synchronous variant is also available:
if chat_service.cancellation.check_cancellation():
    return
Reading the Flag
Once a check has detected cancellation, is_cancelled stays True for the lifetime of the watcher. Read it for lightweight checks between operations without querying the database again:
if chat_service.cancellation.is_cancelled:
    break
Running a Coroutine with Cancellation
For long-running async operations, run_with_cancellation executes a coroutine while polling for cancellation in the background:
result = await chat_service.cancellation.run_with_cancellation(
    some_long_running_coroutine(),
    poll_interval=2.0,
)
If the user cancels during execution, the coroutine is cancelled and None is returned by default. You can specify a custom return value with cancel_result to avoid None checks:
result = await chat_service.cancellation.run_with_cancellation(
    some_long_running_coroutine(),
    cancel_result=my_default_response,
)
# result is guaranteed to be the same type as the coroutine's return
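To make the behaviour concrete, here is one way a helper of this kind could be built on plain asyncio. This is a sketch, not the toolkit's implementation: `run_with_polling` and the `is_aborted` callback are hypothetical names, and the real method polls the database rather than calling a callback.

```python
import asyncio
from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar

T = TypeVar("T")


async def run_with_polling(
    coroutine: Coroutine[Any, Any, T],
    is_aborted: Callable[[], Awaitable[bool]],
    *,
    poll_interval: float = 2.0,
    cancel_result: Optional[T] = None,
) -> Optional[T]:
    """Run `coroutine`; cancel it once `is_aborted()` returns True."""
    task = asyncio.ensure_future(coroutine)
    while True:
        # Wait for the task to finish, but at most one polling interval.
        done, _ = await asyncio.wait({task}, timeout=poll_interval)
        if done:
            return task.result()
        if await is_aborted():
            task.cancel()
            # Let the task unwind before handing back the fallback value.
            await asyncio.gather(task, return_exceptions=True)
            return cancel_result


async def demo() -> tuple:
    async def slow() -> str:
        await asyncio.sleep(10)
        return "finished"

    async def fast() -> str:
        await asyncio.sleep(0)
        return "finished"

    async def always_aborted() -> bool:
        return True

    async def never_aborted() -> bool:
        return False

    cancelled = await run_with_polling(
        slow(), always_aborted, poll_interval=0.01, cancel_result="cancelled"
    )
    completed = await run_with_polling(
        fast(), never_aborted, poll_interval=0.01, cancel_result="cancelled"
    )
    return cancelled, completed
```

In this sketch the slow coroutine is cancelled after the first poll and the fallback value is returned, while the fast one completes normally before any poll is needed.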
Subscribing to Cancellation Events
The watcher exposes a TypedEventBus via on_cancellation. Subscribe a handler to be notified the moment cancellation is detected:
import logging
from unique_toolkit.chat.cancellation import CancellationEvent
logger = logging.getLogger(__name__)
async def on_cancel(event: CancellationEvent):
    logger.info(f"Cancelled: message {event.message_id}")
    # perform cleanup, save partial results, etc.
sub = chat_service.cancellation.on_cancellation.subscribe(on_cancel)
try:
    ...  # run your agent loop
finally:
    sub.cancel()
Both sync and async handlers are supported. Call sub.cancel() to remove the subscription once it is no longer needed.
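The subscribe/cancel pattern can be illustrated with a small stand-alone bus. Everything in this sketch is hypothetical (`SketchEventBus`, `Subscription`, `SketchCancellationEvent`); it is not the toolkit's TypedEventBus, only a minimal model of a bus that accepts both sync and async handlers and returns a handle for unsubscribing.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Generic, TypeVar

E = TypeVar("E")


@dataclass
class SketchCancellationEvent:
    """Hypothetical event; only message_id is taken from the text above."""

    message_id: str


class Subscription:
    def __init__(self, unsubscribe: Callable[[], None]) -> None:
        self._unsubscribe = unsubscribe

    def cancel(self) -> None:
        self._unsubscribe()


class SketchEventBus(Generic[E]):
    def __init__(self) -> None:
        self._handlers: list = []

    def subscribe(self, handler: Callable[[E], Any]) -> Subscription:
        self._handlers.append(handler)

        def unsubscribe() -> None:
            if handler in self._handlers:
                self._handlers.remove(handler)

        return Subscription(unsubscribe)

    async def publish_and_wait(self, event: E) -> None:
        # Iterate over a copy so handlers may unsubscribe while running.
        for handler in list(self._handlers):
            result = handler(event)
            if inspect.isawaitable(result):
                await result


async def demo() -> list:
    bus = SketchEventBus()
    seen = []

    def sync_handler(event: SketchCancellationEvent) -> None:
        seen.append(f"sync:{event.message_id}")

    async def async_handler(event: SketchCancellationEvent) -> None:
        seen.append(f"async:{event.message_id}")

    sub_sync = bus.subscribe(sync_handler)
    bus.subscribe(async_handler)
    await bus.publish_and_wait(SketchCancellationEvent("msg_1"))
    sub_sync.cancel()  # the sync handler no longer receives events
    await bus.publish_and_wait(SketchCancellationEvent("msg_2"))
    return seen
```

After `sub_sync.cancel()`, only the async handler sees the second event.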
Putting It Together
A typical agent loop combines all three mechanisms:
async def run(self):
    sub = self.chat_service.cancellation.on_cancellation.subscribe(
        self._on_cancellation
    )
    try:
        for i in range(max_iterations):
            # 1. Check before starting an iteration
            if await self.chat_service.cancellation.check_cancellation_async():
                break
            # 2. Run LLM call; stream abort is handled by the platform
            response = await self._stream_complete()
            # 3. Check the flag after the LLM call returns
            if self.chat_service.cancellation.is_cancelled:
                break
            # 4. Run tools with background cancellation polling
            result = await self.chat_service.cancellation.run_with_cancellation(
                self._execute_tools(response),
                cancel_result=default_result,
            )
            if self.chat_service.cancellation.is_cancelled:
                break
    finally:
        sub.cancel()
    await self.chat_service.modify_assistant_message_async(
        set_completed_at=True,
    )
The three layers provide defense in depth:
| Mechanism | When to use |
|---|---|
| check_cancellation_async() | At the start of each iteration; polls the DB |
| is_cancelled | Between operations; lightweight flag check, no DB call |
| run_with_cancellation() | Around long-running coroutines; automatic background polling |
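When unit-testing a loop built on these three layers, a test double can stand in for the watcher. The sketch below is an assumption-laden illustration: `FakeWatcher` is hypothetical, it cancels on the N-th poll, and its `run_with_cancellation` does a single up-front poll instead of real background polling. `agent_loop` is a stripped-down stand-in for an agent's `run` method.

```python
import asyncio
from typing import Any, Coroutine


class FakeWatcher:
    """Hypothetical test double: reports cancellation from the N-th poll on."""

    def __init__(self, cancel_on_poll: int) -> None:
        self._polls = 0
        self._cancel_on_poll = cancel_on_poll
        self.is_cancelled = False

    async def check_cancellation_async(self) -> bool:
        self._polls += 1  # each call models one database poll
        if self._polls >= self._cancel_on_poll:
            self.is_cancelled = True
        return self.is_cancelled

    async def run_with_cancellation(
        self, coroutine: Coroutine[Any, Any, Any], *, cancel_result: Any = None
    ) -> Any:
        # Simplification: one poll up front instead of background polling.
        if await self.check_cancellation_async():
            coroutine.close()  # avoid a "never awaited" warning
            return cancel_result
        return await coroutine


async def agent_loop(watcher: FakeWatcher, max_iterations: int = 5) -> list:
    trace = []

    async def call_tools(i: int) -> str:
        return f"tools-{i}"

    for i in range(max_iterations):
        if await watcher.check_cancellation_async():  # layer 1: DB poll
            trace.append("stopped: start-of-iteration poll")
            break
        trace.append(f"llm-{i}")
        if watcher.is_cancelled:  # layer 2: cached flag
            trace.append("stopped: flag after LLM call")
            break
        result = await watcher.run_with_cancellation(  # layer 3: wrapped call
            call_tools(i), cancel_result="skipped"
        )
        trace.append(result)
        if watcher.is_cancelled:
            trace.append("stopped: flag after tools")
            break
    return trace
```

Varying `cancel_on_poll` lets a test exercise each exit path of the loop without a database or frontend.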
API Reference
unique_toolkit.chat.cancellation.CancellationWatcher
Polls the database for userAbortedAt and publishes to an event bus.
The watcher never raises exceptions for cancellation. Instead it:
- publishes a CancellationEvent on the bus
- sets is_cancelled to True
Callers inspect is_cancelled to decide whether to stop.
Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
check_cancellation()
Synchronous single-shot check.
Returns True if the message was cancelled. Subscribers are notified via TypedEventBus.publish_and_wait (sync handlers are called inline; async handlers are scheduled as tasks when a running event loop is detected).
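The dispatch rule described here (sync handlers inline, async handlers scheduled when a loop is running) can be sketched with `asyncio.get_running_loop()`. This is an illustration only: `notify_from_sync` is a hypothetical function, and skipping async handlers when no loop is running is an assumption of the sketch, since the text above does not say what the toolkit does in that case.

```python
import asyncio
import inspect
from typing import Any, Callable, Iterable


def notify_from_sync(handlers: Iterable[Callable[[Any], Any]], event: Any) -> list:
    """Call sync handlers inline; schedule async handlers if a loop is running."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None  # called from plain synchronous code

    scheduled = []
    for handler in handlers:
        if inspect.iscoroutinefunction(handler):
            if loop is not None:
                scheduled.append(loop.create_task(handler(event)))
            # Assumption of this sketch: with no running loop,
            # async handlers are skipped.
        else:
            handler(event)
    return scheduled


def demo() -> list:
    calls = []

    def sync_handler(event: str) -> None:
        calls.append(("sync", event))

    async def async_handler(event: str) -> None:
        calls.append(("async", event))

    handlers = [sync_handler, async_handler]

    # 1. No running loop: only the sync handler runs.
    notify_from_sync(handlers, "outside-loop")

    # 2. Inside a running loop: the async handler is scheduled as a task.
    async def main() -> None:
        tasks = notify_from_sync(handlers, "inside-loop")
        await asyncio.gather(*tasks)

    asyncio.run(main())
    return calls
```

Note that a scheduled task only runs once control returns to the event loop, so an async handler may complete after the synchronous check has already returned.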
Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
check_cancellation_async() (async)
Poll the DB once. Returns True if the message was cancelled.
When cancellation is detected for the first time, all subscribers on the bus are notified (awaited) before this method returns.
Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
run_with_cancellation(coroutine, *, poll_interval=2.0, cancel_result=None) (async)
run_with_cancellation(
    coroutine: Coroutine[Any, Any, T],
    *,
    poll_interval: float = ...,
    cancel_result: T,
) -> T
run_with_cancellation(
    coroutine: Coroutine[Any, Any, T],
    *,
    poll_interval: float = ...,
) -> T | None
Run coroutine while polling for cancellation in the background.
When cancelled, subscribers are notified via the bus and is_cancelled is set to True.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| coroutine | Coroutine[Any, Any, Any] | The async coroutine to execute. | required |
| poll_interval | float | How often (in seconds) to poll for cancellation. | 2.0 |
| cancel_result | Any | Value to return when cancelled. When provided, the return type matches the coroutine's return type, so callers don't need a None check. | None |
Returns:
| Type | Description |
|---|---|
| Any | The coroutine's result on success, or cancel_result if cancelled (defaults to None). |
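The two signatures listed for this method follow the usual `typing.overload` pattern: passing a fallback narrows the return type from `T | None` to `T`. The sketch below shows that pattern in isolation; `run_or_fallback` and its `cancelled` parameter are hypothetical and exist only to keep the example self-contained.

```python
import asyncio
from typing import Any, Coroutine, Optional, TypeVar, overload

T = TypeVar("T")


@overload
async def run_or_fallback(
    coroutine: Coroutine[Any, Any, T], *, cancelled: bool, cancel_result: T
) -> T: ...


@overload
async def run_or_fallback(
    coroutine: Coroutine[Any, Any, T], *, cancelled: bool
) -> Optional[T]: ...


async def run_or_fallback(coroutine, *, cancelled, cancel_result=None):
    """Return the fallback if `cancelled`, otherwise the coroutine's result."""
    if cancelled:
        coroutine.close()  # the coroutine is never started
        return cancel_result
    return await coroutine


async def answer() -> int:
    return 42


async def demo() -> tuple:
    # A type checker infers `int` here because cancel_result is an int.
    with_default = await run_or_fallback(answer(), cancelled=True, cancel_result=0)
    # Without cancel_result the inferred type is Optional[int].
    without_default = await run_or_fallback(answer(), cancelled=True)
    completed = await run_or_fallback(answer(), cancelled=False)
    return with_default, without_default, completed
```

The overloads affect static type checking only; at runtime a single implementation handles both call shapes.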
Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
unique_toolkit.chat.cancellation.CancellationEvent (dataclass)
Published on the cancellation event bus when a user abort is detected.
Source code in unique_toolkit/unique_toolkit/chat/cancellation.py