Task Cancellation

When a user clicks "Stop" in the chat frontend, the platform sets a userAbortedAt flag on the assistant message in the database. The toolkit provides a CancellationWatcher to detect this flag and gracefully stop long-running agent loops or tool executions.

How It Works

sequenceDiagram
    participant FE as Frontend
    participant B as Backend
    participant DB as Database
    participant A as Agent (Python)

    FE->>B: User clicks "Stop"
    B->>DB: Set userAbortedAt on message
    A->>DB: Poll for userAbortedAt
    DB-->>A: userAbortedAt is set
    A->>A: Set is_cancelled = True
    A->>A: Notify subscribers via event bus
    A->>A: Break out of loop

The CancellationWatcher is available on every ChatService instance via the cancellation property:

#cancellation-init-watcher
from unique_toolkit import ChatService

chat_service = ChatService(event)
watcher = chat_service.cancellation

Checking for Cancellation

Single-Shot Check

Use check_cancellation_async() to poll the database once. It returns True if the user has requested cancellation. If the lookup itself fails, the error is logged as a warning and the method returns False.

#cancellation-check-async
if await chat_service.cancellation.check_cancellation_async():
    # handle cancellation
    return

A synchronous variant is also available:

#cancellation-check-sync
if chat_service.cancellation.check_cancellation():
    return

Reading the Flag

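Once a check has detected cancellation, is_cancelled stays True for the lifetime of the watcher. Reading it never queries the database, so it only changes when check_cancellation(), check_cancellation_async(), or the background polling in run_with_cancellation() runs. Use it for lightweight checks between operations: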

#cancellation-is-cancelled-flag
if chat_service.cancellation.is_cancelled:
    break

Running a Coroutine with Cancellation

For long-running async operations, run_with_cancellation executes a coroutine while polling for cancellation in the background:

#cancellation-run-with-cancellation
result = await chat_service.cancellation.run_with_cancellation(
    some_long_running_coroutine(),
    poll_interval=2.0,
)

If the user cancels during execution, the coroutine is cancelled and None is returned by default. You can specify a custom return value with cancel_result to avoid None checks:

#cancellation-run-with-cancel-result
result = await chat_service.cancellation.run_with_cancellation(
    some_long_running_coroutine(),
    cancel_result=my_default_response,
)
# result has the same static type as the coroutine's return value, so no None check is needed
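
The pattern behind run_with_cancellation can be tried out without a database. The following is a minimal sketch, not the toolkit's implementation: run_with_flag, is_aborted, and press_stop are hypothetical names, and an in-memory flag stands in for the userAbortedAt lookup.

```python
import asyncio
from typing import Any, Awaitable, Callable, Coroutine


async def run_with_flag(
    coroutine: Coroutine[Any, Any, Any],
    is_aborted: Callable[[], Awaitable[bool]],  # hypothetical stand-in for the DB poll
    *,
    poll_interval: float = 0.01,
    cancel_result: Any = None,
) -> Any:
    """Run `coroutine`, cancelling it once `is_aborted()` returns True."""
    task = asyncio.create_task(coroutine)

    async def _poll() -> None:
        # Keep polling until the work finishes or an abort is seen.
        while not task.done():
            if await is_aborted():
                task.cancel()
                return
            await asyncio.sleep(poll_interval)

    poller = asyncio.create_task(_poll())
    try:
        return await task
    except asyncio.CancelledError:
        return cancel_result
    finally:
        # Stop the poller whether the work finished or was cancelled.
        poller.cancel()
        try:
            await poller
        except asyncio.CancelledError:
            pass


async def demo() -> tuple[str, str]:
    aborted = False

    async def is_aborted() -> bool:
        return aborted

    async def fast() -> str:
        return "finished"

    async def slow() -> str:
        await asyncio.sleep(5)
        return "finished"

    async def press_stop() -> None:
        # Simulates the user clicking "Stop" shortly after the work starts.
        nonlocal aborted
        await asyncio.sleep(0.05)
        aborted = True

    first = await run_with_flag(fast(), is_aborted, cancel_result="stopped")
    stopper = asyncio.create_task(press_stop())
    second = await run_with_flag(slow(), is_aborted, cancel_result="stopped")
    await stopper
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('finished', 'stopped')
```

Because cancel_result has the same type as the coroutine's result, the caller handles both outcomes with one code path.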

Subscribing to Cancellation Events

The watcher exposes a TypedEventBus via on_cancellation. Subscribe a handler to be notified as soon as a check detects cancellation:

#cancellation-subscribe-event-bus
import logging

from unique_toolkit.chat.cancellation import CancellationEvent

logger = logging.getLogger(__name__)


async def on_cancel(event: CancellationEvent) -> None:
    logger.info("Cancelled: message %s", event.message_id)
    # perform cleanup, save partial results, etc.

sub = chat_service.cancellation.on_cancellation.subscribe(on_cancel)
try:
    ...  # run your agent loop here
finally:
    sub.cancel()  # always unsubscribe, even if the loop raises

Both sync and async handlers are supported. The subscription is cleaned up by calling sub.cancel().
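
To make the subscribe and cancel lifecycle concrete, here is a small self-contained sketch. MiniBus and Subscription are hypothetical stand-ins written for this example; they are not the toolkit's TypedEventBus, whose internals are not shown on this page. Only the method names subscribe, cancel, and publish_and_wait_async are taken from the snippets above.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class CancellationEvent:
    message_id: str


class Subscription:
    """Handle returned by subscribe(); cancel() removes the handler."""

    def __init__(self, handlers: list, handler: Callable[[Any], Any]) -> None:
        self._handlers = handlers
        self._handler = handler

    def cancel(self) -> None:
        # Safe to call more than once.
        if self._handler in self._handlers:
            self._handlers.remove(self._handler)


class MiniBus:
    """Hypothetical stand-in for an event bus accepting sync and async handlers."""

    def __init__(self) -> None:
        self._handlers: list[Callable[[Any], Any]] = []

    def subscribe(self, handler: Callable[[Any], Any]) -> Subscription:
        self._handlers.append(handler)
        return Subscription(self._handlers, handler)

    async def publish_and_wait_async(self, event: Any) -> None:
        # Call every handler; await the ones that return an awaitable.
        for handler in list(self._handlers):
            outcome = handler(event)
            if inspect.isawaitable(outcome):
                await outcome


async def demo() -> list[str]:
    seen: list[str] = []
    bus = MiniBus()

    def sync_handler(event: CancellationEvent) -> None:
        seen.append(f"sync:{event.message_id}")

    async def async_handler(event: CancellationEvent) -> None:
        await asyncio.sleep(0)
        seen.append(f"async:{event.message_id}")

    sync_sub = bus.subscribe(sync_handler)
    async_sub = bus.subscribe(async_handler)
    try:
        await bus.publish_and_wait_async(CancellationEvent("msg_1"))
        sync_sub.cancel()  # the sync handler no longer receives events
        await bus.publish_and_wait_async(CancellationEvent("msg_2"))
    finally:
        sync_sub.cancel()
        async_sub.cancel()
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

After sync_sub.cancel(), only the async handler sees the second event, which is why the agent loop unsubscribes in a finally block.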

Putting It Together

A typical agent loop combines all three mechanisms:

#cancellation-agent-loop-combined
async def run(self):
    sub = self.chat_service.cancellation.on_cancellation.subscribe(
        self._on_cancellation
    )
    try:
        for i in range(max_iterations):
            # 1. Check before starting an iteration
            if await self.chat_service.cancellation.check_cancellation_async():
                break

            # 2. Run LLM call — stream abort is handled by the platform
            response = await self._stream_complete()

            # 3. Check the flag after the LLM call returns
            if self.chat_service.cancellation.is_cancelled:
                break

            # 4. Run tools with background cancellation polling
            result = await self.chat_service.cancellation.run_with_cancellation(
                self._execute_tools(response),
                cancel_result=default_result,
            )

            if self.chat_service.cancellation.is_cancelled:
                break
    finally:
        sub.cancel()
        await self.chat_service.modify_assistant_message_async(
            set_completed_at=True,
        )

The three layers provide defense in depth:

| Mechanism | When to use |
| --- | --- |
| check_cancellation_async() | At the start of each iteration; polls the DB |
| is_cancelled | Between operations; lightweight flag check, no DB call |
| run_with_cancellation() | Around long-running coroutines; automatic background polling |
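
One way to exercise such a loop in a unit test is to swap the real watcher for a test double. The sketch below is an assumption-laden illustration: StubWatcher and agent_loop are hypothetical names, and the stub only mimics the two members used here (check_cancellation_async and is_cancelled), including the latching behaviour of the flag.

```python
import asyncio


class StubWatcher:
    """Hypothetical test double: reports cancellation from the Nth poll onwards."""

    def __init__(self, abort_on_poll: int) -> None:
        self._abort_on_poll = abort_on_poll
        self._cancelled = False
        self.polls = 0  # counts simulated database lookups

    @property
    def is_cancelled(self) -> bool:
        return self._cancelled

    async def check_cancellation_async(self) -> bool:
        if self._cancelled:
            return True  # latched: no further simulated lookups
        self.polls += 1
        if self.polls >= self._abort_on_poll:
            self._cancelled = True
        return self._cancelled


async def agent_loop(watcher: StubWatcher, max_iterations: int = 10) -> list[str]:
    steps: list[str] = []
    for i in range(max_iterations):
        # 1. Poll before starting an iteration.
        if await watcher.check_cancellation_async():
            steps.append("stopped")
            break
        steps.append(f"llm-{i}")
        # 2. Cheap flag check between operations.
        if watcher.is_cancelled:
            break
        steps.append(f"tools-{i}")
    return steps


if __name__ == "__main__":
    watcher = StubWatcher(abort_on_poll=3)
    print(asyncio.run(agent_loop(watcher)))
    print(watcher.polls)
```

With abort_on_poll=3 the loop completes two full iterations and stops at the start of the third, after exactly three simulated lookups.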

API Reference

unique_toolkit.chat.cancellation.CancellationWatcher

Polls the database for userAbortedAt and publishes to an event bus.

The watcher never raises exceptions for cancellation. Instead it:

- publishes a CancellationEvent on the bus
- sets is_cancelled to True

Callers inspect is_cancelled to decide whether to stop.

Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
class CancellationWatcher:
    """Polls the database for ``userAbortedAt`` and publishes to an event bus.

    The watcher never raises exceptions for cancellation. Instead it:
    - publishes a :class:`CancellationEvent` on the bus
    - sets :attr:`is_cancelled` to ``True``

    Callers inspect :attr:`is_cancelled` to decide whether to stop.
    """

    def __init__(
        self,
        *,
        user_id: str,
        company_id: str,
        chat_id: str,
        assistant_message_id: str,
    ) -> None:
        self._bus: TypedEventBus[CancellationEvent] = TypedEventBus()
        self._user_id = user_id
        self._company_id = company_id
        self._chat_id = chat_id
        self._assistant_message_id = assistant_message_id
        self._cancelled = False

    @property
    def is_cancelled(self) -> bool:
        return self._cancelled

    @property
    def on_cancellation(self) -> TypedEventBus[CancellationEvent]:
        return self._bus

    async def check_cancellation_async(self) -> bool:
        """Poll the DB once.  Returns ``True`` if the message was cancelled.

        When cancellation is detected for the first time, all subscribers
        on the bus are notified (awaited) before this method returns.
        """
        if self._cancelled:
            return True
        try:
            raw_msg = await unique_sdk.Message.retrieve_async(
                user_id=self._user_id,
                company_id=self._company_id,
                id=self._assistant_message_id,
                chatId=self._chat_id,
            )
            user_aborted_at = getattr(raw_msg, "userAbortedAt", None)
            if user_aborted_at is not None:
                self._cancelled = True
                event = CancellationEvent(message_id=self._assistant_message_id)
                await self._bus.publish_and_wait_async(event)
                return True
        except Exception as exc:
            logger.warning(
                "Failed to check cancellation: %s: %s", type(exc).__name__, exc
            )
        return False

    def check_cancellation(self) -> bool:
        """Synchronous single-shot check.

        Returns ``True`` if the message was cancelled.  Subscribers are
        notified via :meth:`TypedEventBus.publish_and_wait` (sync handlers
        are called inline; async handlers are scheduled as tasks when a
        running event loop is detected).
        """
        if self._cancelled:
            return True
        try:
            raw_msg = unique_sdk.Message.retrieve(
                user_id=self._user_id,
                company_id=self._company_id,
                id=self._assistant_message_id,
                chatId=self._chat_id,
            )
            user_aborted_at = getattr(raw_msg, "userAbortedAt", None)
            if user_aborted_at is not None:
                self._cancelled = True
                event = CancellationEvent(message_id=self._assistant_message_id)
                self._bus.publish_and_wait(event)
                return True
        except Exception as exc:
            logger.warning(
                "Failed to check cancellation: %s: %s", type(exc).__name__, exc
            )
        return False

    @overload
    async def run_with_cancellation(
        self,
        coroutine: Coroutine[Any, Any, T],
        *,
        poll_interval: float = ...,
        cancel_result: T,
    ) -> T: ...

    @overload
    async def run_with_cancellation(
        self,
        coroutine: Coroutine[Any, Any, T],
        *,
        poll_interval: float = ...,
    ) -> T | None: ...

    async def run_with_cancellation(
        self,
        coroutine: Coroutine[Any, Any, Any],
        *,
        poll_interval: float = 2.0,
        cancel_result: Any = None,
    ) -> Any:
        """Run *coroutine* while polling for cancellation in the background.

        When cancelled, subscribers are notified via the bus and
        :attr:`is_cancelled` is set to ``True``.

        Args:
            coroutine: The async coroutine to execute.
            poll_interval: How often (in seconds) to poll for cancellation.
            cancel_result: Value to return when cancelled.  When provided
                the return type matches the coroutine's return type so
                callers don't need a ``None`` check.

        Returns:
            The coroutine's result on success, or *cancel_result* if
            cancelled (defaults to ``None``).
        """
        task = asyncio.create_task(coroutine)

        async def _watcher() -> None:
            while not task.done():
                cancelled = await self.check_cancellation_async()
                if cancelled:
                    task.cancel()
                    return
                await asyncio.sleep(poll_interval)

        watcher = asyncio.create_task(_watcher())
        try:
            return await task
        except asyncio.CancelledError:
            return cancel_result
        finally:
            watcher.cancel()
            try:
                await watcher
            except asyncio.CancelledError:
                pass

check_cancellation()

Synchronous single-shot check.

Returns True if the message was cancelled. Subscribers are notified via TypedEventBus.publish_and_wait (sync handlers are called inline; async handlers are scheduled as tasks when a running event loop is detected).

Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
def check_cancellation(self) -> bool:
    """Synchronous single-shot check.

    Returns ``True`` if the message was cancelled.  Subscribers are
    notified via :meth:`TypedEventBus.publish_and_wait` (sync handlers
    are called inline; async handlers are scheduled as tasks when a
    running event loop is detected).
    """
    if self._cancelled:
        return True
    try:
        raw_msg = unique_sdk.Message.retrieve(
            user_id=self._user_id,
            company_id=self._company_id,
            id=self._assistant_message_id,
            chatId=self._chat_id,
        )
        user_aborted_at = getattr(raw_msg, "userAbortedAt", None)
        if user_aborted_at is not None:
            self._cancelled = True
            event = CancellationEvent(message_id=self._assistant_message_id)
            self._bus.publish_and_wait(event)
            return True
    except Exception as exc:
        logger.warning(
            "Failed to check cancellation: %s: %s", type(exc).__name__, exc
        )
    return False

check_cancellation_async() async

Poll the DB once. Returns True if the message was cancelled.

When cancellation is detected for the first time, all subscribers on the bus are notified (awaited) before this method returns.

Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
async def check_cancellation_async(self) -> bool:
    """Poll the DB once.  Returns ``True`` if the message was cancelled.

    When cancellation is detected for the first time, all subscribers
    on the bus are notified (awaited) before this method returns.
    """
    if self._cancelled:
        return True
    try:
        raw_msg = await unique_sdk.Message.retrieve_async(
            user_id=self._user_id,
            company_id=self._company_id,
            id=self._assistant_message_id,
            chatId=self._chat_id,
        )
        user_aborted_at = getattr(raw_msg, "userAbortedAt", None)
        if user_aborted_at is not None:
            self._cancelled = True
            event = CancellationEvent(message_id=self._assistant_message_id)
            await self._bus.publish_and_wait_async(event)
            return True
    except Exception as exc:
        logger.warning(
            "Failed to check cancellation: %s: %s", type(exc).__name__, exc
        )
    return False

run_with_cancellation(coroutine, *, poll_interval=2.0, cancel_result=None) async

run_with_cancellation(
    coroutine: Coroutine[Any, Any, T],
    *,
    poll_interval: float = ...,
    cancel_result: T,
) -> T
run_with_cancellation(
    coroutine: Coroutine[Any, Any, T],
    *,
    poll_interval: float = ...,
) -> T | None

Run coroutine while polling for cancellation in the background.

When cancelled, subscribers are notified via the bus and is_cancelled is set to True.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| coroutine | Coroutine[Any, Any, Any] | The async coroutine to execute. | required |
| poll_interval | float | How often (in seconds) to poll for cancellation. | 2.0 |
| cancel_result | Any | Value to return when cancelled. When provided, the return type matches the coroutine's return type so callers don't need a None check. | None |

Returns:

| Type | Description |
| --- | --- |
| Any | The coroutine's result on success, or cancel_result if cancelled (defaults to None). |

Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
async def run_with_cancellation(
    self,
    coroutine: Coroutine[Any, Any, Any],
    *,
    poll_interval: float = 2.0,
    cancel_result: Any = None,
) -> Any:
    """Run *coroutine* while polling for cancellation in the background.

    When cancelled, subscribers are notified via the bus and
    :attr:`is_cancelled` is set to ``True``.

    Args:
        coroutine: The async coroutine to execute.
        poll_interval: How often (in seconds) to poll for cancellation.
        cancel_result: Value to return when cancelled.  When provided
            the return type matches the coroutine's return type so
            callers don't need a ``None`` check.

    Returns:
        The coroutine's result on success, or *cancel_result* if
        cancelled (defaults to ``None``).
    """
    task = asyncio.create_task(coroutine)

    async def _watcher() -> None:
        while not task.done():
            cancelled = await self.check_cancellation_async()
            if cancelled:
                task.cancel()
                return
            await asyncio.sleep(poll_interval)

    watcher = asyncio.create_task(_watcher())
    try:
        return await task
    except asyncio.CancelledError:
        return cancel_result
    finally:
        watcher.cancel()
        try:
            await watcher
        except asyncio.CancelledError:
            pass

unique_toolkit.chat.cancellation.CancellationEvent dataclass

Published on the cancellation event bus when a user abort is detected.

Source code in unique_toolkit/unique_toolkit/chat/cancellation.py
@dataclass(frozen=True, slots=True)
class CancellationEvent:
    """Published on the cancellation event bus when a user abort is detected."""

    message_id: str