
Building an Event-Driven Agentic Table Application

This tutorial shows how to build an Agentic Table application that responds to user interactions through event handlers. An Agentic Table is an interactive spreadsheet component in the Unique platform that can be automated using Python event handlers.

What You'll Learn

  • Understanding the event-driven architecture of Agentic Tables
  • Setting up a FastAPI application to receive and route table events
  • Implementing handlers for different table lifecycle events
  • Using the AgenticTableService to interact with tables programmatically
  • Working with file metadata to build intelligent routing logic
  • Creating clickable references that link table content to source documents

Understanding the Event Flow

Agentic Tables work through an event-driven architecture. When users interact with a table in the Unique platform, events are sent to your webhook server. Here are the main events you'll handle:

Table Lifecycle Events

  1. SHEET_CREATED

    • Triggered when a user creates a new table sheet
    • Use this to initialize your table structure (headers, column styles)
  2. ADD_META_DATA

    • Triggered when a user adds question files or source files
    • Use this to process uploaded files and populate table data
    • Implement custom logic for handling different file types
  3. UPDATE_CELL

    • Triggered when a user edits an editable cell
    • Use this to implement business rules and automation
    • React to specific column changes with custom workflows
  4. GENERATE_ARTIFACT

    • Triggered when a user requests an export to generate a document
    • Use this to create exportable reports or documents from table data
  5. SHEET_COMPLETED

    • Triggered when a user marks a sheet as completed
    • Use this for final validation, archival, or triggering downstream processes

Here's how the event flow works:

sequenceDiagram
    participant User
    participant UniquePlatform
    participant YourWebhook
    participant AgenticTableService

    User->>UniquePlatform: Interacts with table
    UniquePlatform->>YourWebhook: Sends event webhook
    YourWebhook->>UniquePlatform: Register agent (lock table)
    YourWebhook->>YourWebhook: Route to specific handler
    YourWebhook->>AgenticTableService: Process event (update cells, status, etc.)
    AgenticTableService->>UniquePlatform: Apply changes
    YourWebhook->>UniquePlatform: Deregister agent (unlock table)
    UniquePlatform->>User: Display updated table

Essential Setup: The Event Handler and Application

Before diving into individual handlers, let's set up the core infrastructure. The main event handler receives all events and routes them to specialized handlers:

import logging

from unique_sdk.api_resources._agentic_table import ActivityStatus
from unique_toolkit.agentic_table.schemas import MagicTableAction, MagicTableEvent
from unique_toolkit.agentic_table.service import AgenticTableService

from .agentic_table_helper_functions import (
    get_augmented_text_with_references_fn,
    get_downloader_fn,
    get_file_content_getter_fn,
    get_uploader_fn,
)

# The handle_* functions used below are implemented in the handler sections of
# this tutorial and imported from their respective modules.

logger = logging.getLogger(__name__)


async def agentic_table_event_handler(event: MagicTableEvent) -> int:
    """
    Main event handler that routes table events to specialized handlers.
    """
    # Initialize the service to interact with the table
    at_service = AgenticTableService(
        user_id=event.user_id,
        company_id=event.company_id,
        table_id=event.payload.table_id,
    )

    # Register agent - locks the table during processing
    await at_service.register_agent()

    try:
        # Route events based on action type
        if event.payload.action == MagicTableAction.SHEET_CREATED:
            await handle_sheet_created(at_service, event.payload)

        elif event.payload.action == MagicTableAction.ADD_META_DATA:
            downloader = get_downloader_fn(event.user_id, event.company_id, event.payload.chat_id)
            file_content_getter = get_file_content_getter_fn(event.user_id, event.company_id, event.payload.chat_id)
            reference_builder = get_augmented_text_with_references_fn(
                event.user_id, event.company_id, event.payload.chat_id, event.payload.assistant_id
            )
            await handle_metadata_added(
                at_service, 
                event.payload, 
                downloader,
                file_content_getter,
                reference_builder
            )

        elif event.payload.action == MagicTableAction.UPDATE_CELL:
            await handle_cell_updated(at_service, event.payload)

        elif event.payload.action == MagicTableAction.GENERATE_ARTIFACT:
            uploader = get_uploader_fn(event.user_id, event.company_id, event.payload.chat_id)
            await handle_artifact_generated(at_service, event.payload, uploader)

        elif event.payload.action == MagicTableAction.SHEET_COMPLETED:
            logger.info(f"Sheet completed: {event.payload.sheet_name}")
            # Implement your completion logic here

        else:
            await at_service.set_activity(
                activity=event.payload.action,
                status=ActivityStatus.FAILED,
                text=f"Unknown action: {event.payload.action}",
            )
            raise Exception(f"Unknown action: {event.payload.action}")

        return 0  # Success

    except Exception as e:
        logger.error(f"Error in handler: {e}")
        return 1  # Failure

    finally:
        # Always deregister - unlocks the table
        await at_service.deregister_agent()

Now create the FastAPI application and run it:

from pathlib import Path
from unique_toolkit.agentic_table.schemas import MagicTableEventTypes
from unique_toolkit.app.fast_api_factory import build_unique_custom_app
from unique_toolkit.app.unique_settings import UniqueSettings

# Initialize settings
_SETTINGS = UniqueSettings.from_env(env_file=Path(__file__).parent / "unique.env")
_SETTINGS.init_sdk()

# Create app with the event handler
_MINIMAL_APP = build_unique_custom_app(
    title="Unique Minimal Agentic Table App",
    settings=_SETTINGS,
    event_handler=agentic_table_event_handler,
    event_constructor=MagicTableEvent,
    subscribed_event_names=[ev.value for ev in MagicTableEventTypes],
)

# Run the application
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "fastapi_app_agentic_table:_MINIMAL_APP",
        host="0.0.0.0",
        port=5001,
        reload=True,
        log_level="debug",
    )

Run with: python fastapi_app_agentic_table.py

Defining Your Table Structure

Before implementing the handlers, we need to define what our table looks like. This example demonstrates a configuration pattern for defining columns. While this is hardcoded here, you can easily initialize it from a configuration payload, database, or user settings.

The column definitions specify:

  • Order: Position of the column (0-indexed)
  • Name: Column header text
  • Width: Column width in pixels
  • Renderer: Type of cell renderer (dropdown, checkbox, collaborator selector, etc.)
  • Editable: Whether users can edit the column
Column Definition Example
from enum import StrEnum

from pydantic import BaseModel
from unique_sdk import CellRendererTypes


class ExampleColumnNames(StrEnum):
    QUESTION = "Question"
    SECTION = "Section"
    ANSWER = "Answer"
    CRITICAL_CONSISTENCY = "Critical Consistency"
    STATUS = "Status"
    REVIEWER = "Reviewer"

class ColumnDefinition(BaseModel):
    """
    Defines a single table column's structure and styling.

    Attributes:
        order: Column position (0-indexed)
        name: Column header text
        width: Column width in pixels
        renderer: Optional cell renderer type (dropdown, checkbox, etc.)
        editable: Whether the column is editable
    """

    order: int
    name: ExampleColumnNames
    width: int
    renderer: CellRendererTypes | None = None
    editable: bool = True


class ColumnDefinitions(BaseModel):
    """
    Container for all column definitions in the table.

    Provides helper methods to access columns by name.
    """

    columns: list[ColumnDefinition]

    @property
    def column_map(self) -> dict[str, ColumnDefinition]:
        """Map of column names to their definitions."""
        return {column.name: column for column in self.columns}

    def get_column_by_name(self, name: str) -> ColumnDefinition:
        """Get column definition by name."""
        return self.column_map[name]

    def get_column_name_by_order(self, order: int) -> ExampleColumnNames:
        """Get column definition by order."""
        return self.columns[order].name

    def get_column_names(self) -> list[str]:
        """Get list of all column names."""
        return list(self.column_map.keys())

example_configuration = {
    "columns": [
        {
            "order": 0,
            "name": ExampleColumnNames.QUESTION,
            "width": 300,
            "renderer": None,
            "editable": False,
        },
        {
            "order": 1,
            "name": ExampleColumnNames.SECTION,
            "width": 150,
            "renderer": None,
            "editable": False,
        },
        {
            "order": 2,
            "name": ExampleColumnNames.ANSWER,
            "width": 400,
            "renderer": CellRendererTypes.SELECTABLE_CELL_RENDERER,
            "editable": True,
        },
        {
            "order": 3,
            "name": ExampleColumnNames.CRITICAL_CONSISTENCY,
            "width": 200,
            "renderer": None,
            "editable": True,
        },
        {
            "order": 4,
            "name": ExampleColumnNames.STATUS,
            "width": 150,
            "renderer": CellRendererTypes.REVIEW_STATUS_DROPDOWN,
            "editable": True,
        },
        {
            "order": 5,
            "name": ExampleColumnNames.REVIEWER,
            "width": 150,
            "renderer": CellRendererTypes.COLLABORATOR_DROPDOWN,
            "editable": True,
        },
    ]
}

example_column_definitions = ColumnDefinitions.model_validate(example_configuration)

This creates a typed interface for your table that ensures consistency across all operations. You'll reference these definitions in your handlers to access columns by name.
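
For example, handlers can resolve column positions by name instead of hardcoding indices. A minimal sketch using the definitions above:

# Look up a column's position and width by name
answer_col = example_column_definitions.get_column_by_name(ExampleColumnNames.ANSWER)
print(answer_col.order, answer_col.width)  # 2 400

# Map an incoming column index back to its name (useful in UPDATE_CELL handlers)
assert (
    example_column_definitions.get_column_name_by_order(2) == ExampleColumnNames.ANSWER
)

# Because ColumnDefinitions is a Pydantic model, the same structure could be
# loaded from an external source instead of the hardcoded dict, e.g.:
# ColumnDefinitions.model_validate_json(Path("columns.json").read_text())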

Implementing Event Handlers

Now let's implement each handler. Each one uses the AgenticTableService (at_service) to interact with the table.

Handler 1: Sheet Created Event

When: Triggered when a user creates a new table sheet.

Goal: Initialize the table by setting up column headers and applying styling.

What we do with at_service:

  • set_activity() - Shows status messages to users (e.g., "Initializing table schema...")
  • set_cell() - Sets individual cell values (used here to write column headers in row 0)
  • set_column_style() - Applies styling to columns (width, renderer type, editability)

This handler prepares the empty table structure so it's ready to receive data. After this runs, users will see a properly formatted table with headers and the correct column configurations.

Sheet Created Handler Implementation
from logging import getLogger

from unique_sdk.api_resources._agentic_table import ActivityStatus

from unique_toolkit.agentic_table.schemas import MagicTableSheetCreatedPayload
from unique_toolkit.agentic_table.service import AgenticTableService

from .agentic_table_example_column_definition import example_column_definitions

logger = getLogger(__name__)


async def handle_sheet_created(
    at_service: AgenticTableService, payload: MagicTableSheetCreatedPayload
) -> None:
    """
    Example handler for the sheet creation event.

    This demo shows how to initialize a new table by:
    - Setting column headers in row 0
    - Applying column styles (width, renderer type, editability)

    The table is ready to receive data after initialization.

    Args:
        at_service: Service instance for table operations
        payload: Event payload with table_id and sheet_name
    """
    logger.info(f"Initializing Source of Wealth table: {payload.sheet_name}")

    # Set activity status
    await at_service.set_activity(
        text="Initializing table schema...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    # Set headers in row 0
    for col_def in example_column_definitions.columns:
        await at_service.set_cell(row=0, column=col_def.order, text=col_def.name.value)

    logger.info(f"Set {len(example_column_definitions.columns)} column headers")

    # Apply column styles
    for col_def in example_column_definitions.columns:
        await at_service.set_column_style(
            column=col_def.order,
            width=col_def.width,
            cell_renderer=col_def.renderer,
            editable=col_def.editable,
        )

    logger.info("Applied column styles with all CellRendererTypes")

    # Set completion status
    await at_service.set_activity(
        text="Table schema initialized successfully",
        activity=payload.action,
        status=ActivityStatus.COMPLETED,
    )

Handler 2: Metadata Added Event

When: Triggered when a user uploads a question file or source file to the table.

Goal: Process the uploaded file and populate the table with data. This handler demonstrates two powerful framework capabilities:

  1. CSV Processing: Parse question files and batch-populate cells
  2. Content Metadata & References: Retrieve file metadata and create clickable references

Part A: Processing Question Files (CSV)

The first part handles question files by downloading and parsing CSV data:

What we do with at_service:

  • set_activity() - Updates users on progress through multiple stages ("Downloading CSV...", "Parsing...", "Populating...")
  • set_multiple_cells() - Batch operation to set many cells at once (much more efficient than individual updates)

This showcases how to automate data entry. You could extend this to:

  • Use AI agents to generate answers for questions in the CSV
  • Validate or enrich data before populating the table
  • Implement custom parsing logic for different file types (see the sketch below)
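
A hypothetical sketch of that last point, dispatching on the file name (assumed to be available, e.g., via the Content object's title) and falling back to CSV:

import io

import pandas as pd


def parse_tabular_file(file_bytes: bytes, file_name: str) -> pd.DataFrame:
    """Sketch: pick a parser by file extension; adapt to your file types."""
    stream = io.BytesIO(file_bytes)
    if file_name.lower().endswith((".xlsx", ".xls")):
        df = pd.read_excel(stream)  # requires openpyxl for .xlsx files
    else:
        df = pd.read_csv(stream)
    return df.fillna("")  # normalize NA values, as in the handler below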

Part B: Processing Source Files with Metadata

The second part demonstrates two core framework capabilities that are essential for building sophisticated applications:

1. Retrieving File Content and Metadata

When users upload source files, you can retrieve the full Content objects which include:

  • content.id: Unique identifier
  • content.metadata: Custom key-value pairs (e.g., {"section": "Finance", "department": "Legal"})
  • content.title: File name or title
  • content.text: Extracted text content
  • content.chunks: List of ContentChunk objects for chunked documents

Example: Organizing Files by Metadata

# Retrieve Content objects for uploaded files
content = file_content_getter_fn(file_id)

# Access metadata
if content.metadata:
    section = content.metadata.get("section")
    department = content.metadata.get("department")

# EXAMPLE: Use ContentRegistry to group files by metadata keys
# (This is just one approach - implement your own filtering logic!)
content_registry = ContentRegistry(
    keys=["Finance", "Legal", "Technical"],
    contents=all_contents
)

# Retrieve all files with specific metadata key
finance_files = content_registry.get_contents_by_metadata_key("Finance")

# Alternative: Implement your own filtering
finance_files = [
    c for c in all_contents 
    if c.metadata and c.metadata.get("department") == "Finance"
]

Use Cases:

  • Route different source files to different table rows based on categories
  • Filter content by department, section, or custom tags
  • Build conditional logic based on file properties

2. Creating Clickable References

The framework provides a reference system that converts inline citations into clickable links in the UI. This is crucial when AI agents generate text with citations to source documents.

Why This Matters:

  • Users can click references to view source documents
  • Creates audit trails for AI-generated content
  • Improves transparency and trust in automated workflows
  • Enables verification of information

The Reference Workflow:

Step 1: AI/Logic generates text with inline citations
  "According to the Q3 report [chunk_abc123], revenue increased [chunk_xyz789]."

Step 2: Create reference registry mapping IDs to Content objects
  {
    "chunk_abc123": <Content object for Q3 report>,
    "chunk_xyz789": <Content object for financial data>
  }

Step 3: Convert citations to numbered references
  "According to the Q3 report [1], revenue increased [2]."

Step 4: Frontend renders as clickable links
  "According to the Q3 report [1]↗, revenue increased [2]↗."
  (clicking [1] opens the Q3 report document)

Implementation Example:

# Step 1: Create temporary IDs for your content items
reference_registry = create_id_map(relevant_contents, prefix="chunk")
# Returns: {"chunk_a1b2c3d4": content1, "chunk_x9y8z7w6": content2, ...}

# Step 2: Generate text with inline citations (from AI or your logic)
text_with_citations = "Based on the analysis [chunk_a1b2c3d4], we conclude..."

# Step 3: Convert to clickable references
augmented_text = augmented_text_with_references_fn(
    text_with_citations,
    reference_registry,
    prefix="chunk",
    citation_pattern=r"\[chunk_([a-zA-Z0-9\-]+)\]"
)
# Returns: "Based on the analysis [1&message_123], we conclude..."
# Frontend renders as: "Based on the analysis [1]↗, we conclude..." (clickable)

Use Cases:

  • Link AI-generated answers to source documents
  • Create audit trails for compliance and review
  • Enable fact-checking and verification workflows
  • Build transparent, explainable AI systems
  • Track provenance of information across your pipeline

What we do with at_service:

  • set_activity() - Provides progress updates during processing
  • get_sheet() - Retrieves table data to understand what's already populated
  • get_num_rows() - Gets the current number of rows
  • set_multiple_cells() - Batch updates cells with referenced content

The key takeaway is the batch operation pattern - when dealing with large datasets, always use set_multiple_cells() instead of individual set_cell() calls.

Metadata Added Handler Implementation
import io
from logging import getLogger
from typing import Callable

import pandas as pd
from unique_sdk.api_resources._agentic_table import ActivityStatus

from unique_toolkit.agentic_table.schemas import (
    MagicTableAddMetadataPayload,
    MagicTableCell,
)
from unique_toolkit.agentic_table.service import AgenticTableService
from unique_toolkit.content import Content, ContentChunk

from .agentic_table_example_column_definition import (
    ExampleColumnNames,
    example_column_definitions,
)
from .agentic_table_helper_functions import ContentRegistry, create_id_map

logger = getLogger(__name__)


async def handle_question_files(
    at_service: AgenticTableService,
    payload: MagicTableAddMetadataPayload,
    downloader_fn: Callable[[str], bytes],
) -> int:
    """
    Handle question files by downloading and parsing CSV to populate the table.

    Args:
        at_service: Service instance for table operations
        payload: Event payload with metadata and file IDs
        downloader_fn: Function to download file contents

    Returns:
        Number of rows added to the table

    Raises:
        Exception: If CSV processing fails
    """
    # Check if question files were provided
    if not payload.metadata.question_file_ids:
        logger.warning("No question files provided in metadata")
        return 0

    await at_service.set_activity(
        text="Downloading CSV file...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    # Get the first question file (CSV)
    file_id = payload.metadata.question_file_ids[0]

    logger.info(f"Downloading file: {file_id}")
    # Download file content
    file_content = downloader_fn(file_id)

    await at_service.set_activity(
        text="Parsing CSV file...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    file_content_stream = io.BytesIO(file_content)

    # Parse CSV file
    df = pd.read_csv(file_content_stream)
    df = df.fillna("")  # Convert NA values to empty strings
    logger.info(f"Parsed CSV with {len(df)} rows and {len(df.columns)} columns")
    logger.info(df.head())

    await at_service.set_activity(
        text=f"Populating table with {len(df)} rows...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    # Create batch cells
    cells = []
    for row_idx, row_data in df.iterrows():
        for col_def in example_column_definitions.columns:
            cell_value = row_data.get(col_def.name.value, "")
            if not cell_value:
                continue
            cells.append(
                MagicTableCell(
                    row_order=int(row_idx) + 1,  # type: ignore[arg-type]  # +1 for header row
                    column_order=col_def.order,
                    text=str(cell_value),
                    sheet_id=payload.table_id,
                )
            )

    logger.info(f"Created {len(cells)} cells for batch upload")

    # Batch upload all cells
    await at_service.set_multiple_cells(cells=cells)

    logger.info(f"Successfully populated table with {len(df)} rows")

    return len(df)


async def handle_source_files(
    at_service: AgenticTableService,
    payload: MagicTableAddMetadataPayload,
    file_content_getter_fn: Callable[[str], Content | None],
    augmented_text_with_references_fn: Callable[
        [str, dict[str, Content | ContentChunk], str, str], str
    ],
) -> int:
    """
    Handle source files by retrieving content and organizing by metadata.

    This handler demonstrates two key framework capabilities:
    1. Retrieving file content and accessing metadata
    2. Creating clickable references that link table cells to source documents

    The example shows:
    - How to fetch Content objects for uploaded files
    - How to use ContentRegistry to group files by metadata keys
    - How to generate text with inline citations and convert them to clickable references
    - How to populate table cells with referenced content

    Args:
        at_service: Service instance for table operations
        payload: Event payload with metadata and file IDs
        file_content_getter_fn: Function to retrieve file content objects
        augmented_text_with_references_fn: Function to convert citations to references

    Returns:
        Number of content items processed
    """
    # Check if source files were provided
    if not payload.metadata.source_file_ids:
        logger.warning("No source files provided in metadata")
        return 0

    await at_service.set_activity(
        text="Processing source files metadata...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )
    cells_to_update: list[MagicTableCell] = []

    num_rows = await at_service.get_num_rows()

    # STEP 1: Retrieve Content objects for all source files
    # Each Content object contains:
    # - content.id: Unique identifier
    # - content.metadata: Custom key-value pairs (e.g., {"section": "Finance"})
    # - content.title: File name or title
    # - content.text: Extracted text content
    # - content.chunks: List of ContentChunk objects for chunked documents
    all_contents = []
    for file_id in payload.metadata.source_file_ids:
        content = file_content_getter_fn(file_id)
        if content is None:
            logger.warning(f"No content found for file: {file_id}")
            continue
        if content.metadata is None:
            logger.warning(f"No metadata found for file: {file_id}")
            continue
        all_contents.append(content)

    # STEP 2: Organize content by metadata keys
    # This example assumes source files have metadata like:
    # {"Team": "true"}, {"Finance": "true"}, {"Technical": "true"}, etc.
    sections_of_interest = [
        "Team",
        "Finance",
        "Technical",
        "Planning",
    ]

    # ContentRegistry groups files by metadata keys
    # You can later retrieve all files tagged with "Finance", "Team", etc.
    content_registry = ContentRegistry(keys=sections_of_interest, contents=all_contents)

    # STEP 3: Process each row in the table
    # This demonstrates row-by-row processing where each row might need different source files
    for row_index in range(1, num_rows + 1):
        # Retrieve the current row to check what data it has
        row_cells = await at_service.get_sheet(
            start_row=row_index, end_row=row_index + 1
        )
        retrieved_cells: dict[ExampleColumnNames, MagicTableCell] = {
            example_column_definitions.get_column_name_by_order(cell.column_order): cell
            for cell in row_cells.magic_table_cells
        }

        logger.info(f"Retrieved cells: {retrieved_cells}")

        answer_cell = retrieved_cells.get(ExampleColumnNames.ANSWER)

        # If the answer cell already exists, the answer was generated previously
        if answer_cell is not None:
            logger.info(f"Answer found for row {row_index}: {answer_cell.text}")
        else:
            # Get the section for this row (e.g., "Finance", "Team")
            section_name = retrieved_cells.get(ExampleColumnNames.SECTION)

            if section_name is None:
                logger.warning(f"No section found for row {row_index}")
                continue

            # STEP 4: Retrieve relevant content based on row metadata
            # Use the ContentRegistry to get all files tagged with this section
            relevant_contents = content_registry.get_contents_by_metadata_key(
                section_name.text
            )

            if len(relevant_contents) == 0:
                logger.warning(f"No contents found for section '{section_name.text}'")
                continue

            logger.info(
                f"Found {len(relevant_contents)} content items for section '{section_name.text}'"
            )

            # STEP 5: Create a reference registry for citation mapping
            # This creates temporary IDs like "chunk_a1b2c3d4" for each content item
            # These IDs will be used in inline citations: [chunk_a1b2c3d4]
            chunk_prefix = "chunk"
            reference_registry = create_id_map(relevant_contents, chunk_prefix)

            logger.info(f"Reference registry: {reference_registry.keys()}")

            # STEP 6: Generate text with inline citations
            # In a real application, this would be AI-generated text with citations
            # Here we simulate it by listing the content titles with citation markers
            simulated_text_generation_with_references = (
                "The following are the contents of the section: \n"
            )
            for chunk_id, content in reference_registry.items():
                # Add inline citation in format [chunk_xxx]
                simulated_text_generation_with_references += (
                    f"{content.title} [{chunk_id}]\n"
                )

            # STEP 7: Convert inline citations to clickable references
            # This transforms [chunk_a1b2c3d4] into numbered references like [1], [2]
            # The frontend will render these as clickable links to the source files
            augmented_text = augmented_text_with_references_fn(
                simulated_text_generation_with_references,
                reference_registry,  # type: ignore[arg-type]
                chunk_prefix,
                r"\[chunk_([a-zA-Z0-9\-]+)\]",  # Citation pattern to match
            )

            # STEP 8: Update the table cell with referenced text
            cells_to_update.append(
                MagicTableCell(
                    row_order=row_index,
                    column_order=example_column_definitions.get_column_by_name(
                        ExampleColumnNames.ANSWER
                    ).order,
                    text=augmented_text,
                    sheet_id=payload.table_id,
                )
            )

    # Apply any cell updates
    if cells_to_update:
        await at_service.set_multiple_cells(cells=cells_to_update)

    await at_service.set_activity(
        text=f"Successfully processed {len(all_contents)} source files",
        activity=payload.action,
        status=ActivityStatus.COMPLETED,
    )

    return len(all_contents)


async def handle_metadata_added(
    at_service: AgenticTableService,
    payload: MagicTableAddMetadataPayload,
    downloader_fn: Callable[[str], bytes],
    file_content_getter_fn: Callable[[str], Content | None],
    augmented_text_with_references_fn: Callable[
        [str, dict[str, Content | ContentChunk], str, str], str
    ],
) -> None:
    """
    Example handler for the metadata addition event.

    This demo shows how to populate a table from uploaded files:
    - Process question files: Downloads CSV files and populates the table
    - Process source files: Retrieves content and groups by metadata

    Args:
        at_service: Service instance for table operations
        payload: Event payload with metadata and file IDs
        downloader_fn: Function to download file contents
        file_content_getter_fn: Function to retrieve file content objects
        augmented_text_with_references_fn: Function to convert citations to references
    """
    logger.info(f"Processing metadata for sheet: {payload.sheet_name}")

    try:
        # Handle question files (CSV processing)
        num_question_rows = await handle_question_files(
            at_service=at_service,
            payload=payload,
            downloader_fn=downloader_fn,
        )

        # Handle source files (content and metadata processing)
        num_source_rows = await handle_source_files(
            at_service=at_service,
            payload=payload,
            file_content_getter_fn=file_content_getter_fn,
            augmented_text_with_references_fn=augmented_text_with_references_fn,
        )

        # This is different from the LogEntry which shows in the cell history
        await at_service.set_activity(
            text=f"Successfully loaded {num_question_rows} rows from CSV and {num_source_rows} source file metadata rows",
            activity=payload.action,
            status=ActivityStatus.COMPLETED,
        )

    except Exception as e:
        logger.exception(f"Error processing files: {e}", exc_info=True)
        await at_service.set_activity(
            text=f"Failed to process files: {str(e)}",
            activity=payload.action,
            status=ActivityStatus.FAILED,
        )
        raise

Handler 3: Cell Updated Event

When: Triggered when a user edits an editable cell.

Goal: Implement business rules that react to cell changes. In this example, we monitor a specific column and lock rows when they reach a certain state.

What we do with at_service:

  • set_cell() - Updates the cell with additional context (log entries)
  • update_row_verification_status() - Changes the verification status of entire rows (can be used to lock/unlock or mark as verified)

This handler demonstrates workflow automation. The example checks if the "Critical Consistency" column is changed to "Consistent" and then:

  1. Adds a log entry documenting the change (creates an audit trail)
  2. Marks the row as verified (which can trigger visual indicators or prevent further edits)

You could extend this pattern to:

  • Regenerate AI responses when questions are modified
  • Trigger validation workflows
  • Update dependent cells automatically
  • Send notifications or trigger external systems
  • Implement approval chains or review processes

The power here is in defining your business logic - the framework just provides the hooks.
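
For instance, a minimal sketch of the "update dependent cells" idea: clear a stale Answer whenever its Question changes so it can be regenerated. (In this example configuration the Question column is read-only, so treat this purely as a pattern.)

# Hypothetical extension inside handle_cell_updated
question_col = example_column_definitions.get_column_by_name(ExampleColumnNames.QUESTION)
answer_col = example_column_definitions.get_column_by_name(ExampleColumnNames.ANSWER)

if payload.column_order == question_col.order:
    # The question changed, so the existing answer is stale - clear it
    await at_service.set_cell(
        row=payload.row_order,
        column=answer_col.order,
        text="",  # an AI agent could regenerate the answer here instead
    )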

Cell Updated Handler Implementation
from unique_toolkit.agentic_table.schemas import MagicTableUpdateCellPayload
from unique_toolkit.agentic_table.service import AgenticTableService
from logging import getLogger
from datetime import datetime
from unique_toolkit.language_model.schemas import LanguageModelMessageRole
from unique_sdk import RowVerificationStatus
from unique_toolkit.agentic_table.schemas import LogEntry
from .agentic_table_example_column_definition import (
    example_column_definitions,
    ExampleColumnNames,
)

logger = getLogger(__name__)


async def handle_cell_updated(
    at_service: AgenticTableService, payload: MagicTableUpdateCellPayload
) -> None:
    """
    Example handler for the cell update event.

    This demo shows a simple workflow automation: when the Critical Consistency column
    changes to "Consistent", it adds a log entry and updates
    the row verification status.

    Args:
        at_service: Service instance for table operations
        payload: Event payload with row, column, and new value
    """
    logger.info(
        f"Cell updated at row {payload.row_order}, "
        f"column {payload.column_order}: {payload.data}"
    )

    critical_consistency_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.CRITICAL_CONSISTENCY
    )

    # Check if the Critical Consistency column was updated
    if payload.column_order == critical_consistency_col.order:
        status_value = payload.data.strip()

        logger.info(f"Status changed to: {status_value}")

        # Lock the row once the status reaches "Consistent"
        if status_value.lower() == "consistent":
            logger.info(
                f"Locking row {payload.row_order} due to status: {status_value}"
            )

            # Note: Column-level locking affects all rows. In a production system,
            # you might track locked rows in metadata and validate edits server-side.
            # Here we demonstrate the pattern with a log entry.

            # Add log entry to document the status change and locking
            log_entries = [
                LogEntry(
                    text=f"Row {payload.row_order} marked as {status_value}. Further edits should be restricted.",
                    created_at=datetime.now().isoformat(),
                    actor_type=LanguageModelMessageRole.ASSISTANT,
                )
            ]

            await at_service.set_cell(
                row=payload.row_order,
                column=payload.column_order,
                text=status_value,
                log_entries=log_entries,
            )

            # Update row verification status
            await at_service.update_row_verification_status(
                row_orders=[payload.row_order], status=RowVerificationStatus.VERIFIED
            )

            logger.info(f"Row {payload.row_order} verified and logged")

Handler 4: Artifact Generation Event

When: Triggered when a user clicks a button to generate a document or report.

Goal: Create exportable artifacts from table data. In this example, we generate a formatted Word document, but you could create any type of output.

What we do with at_service:

  • set_activity() - Provides progress updates during the potentially long-running generation process
  • get_sheet() - Retrieves all table data (you can specify row ranges for large tables)
  • set_artifact() - Links the generated file back to the table so users can easily find and download it

This handler shows the complete artifact generation workflow:

  1. Read table data
  2. Process/transform it (here we group by sections and format as markdown)
  3. Generate output file (DOCX in this case)
  4. Upload to the content system
  5. Link back to the table as an artifact

You could make this much more sophisticated:

  • Use AI agents to synthesize intelligent summaries
  • Generate multiple artifact types (PDF, Excel, PowerPoint)
  • Include charts, visualizations, or analytics
  • Apply custom templates and branding
  • Add conditional logic based on table content

The example demonstrates the basic pattern - reading data, transforming it, and creating a downloadable result.

Artifact Generation Handler Implementation
from datetime import datetime
from logging import getLogger
from typing import Callable

from unique_sdk.api_resources._agentic_table import ActivityStatus

from unique_toolkit._common.docx_generator import (
    DocxGeneratorConfig,
    DocxGeneratorService,
)
from unique_toolkit.agentic_table.schemas import (
    MagicTableGenerateArtifactPayload,
    MagicTableSheet,
)
from unique_toolkit.agentic_table.service import AgenticTableService
from unique_toolkit.content.schemas import Content

from .agentic_table_example_column_definition import (
    ExampleColumnNames,
    example_column_definitions,
)

logger = getLogger(__name__)


async def handle_artifact_generated(
    at_service: AgenticTableService,
    payload: MagicTableGenerateArtifactPayload,
    uploader_fn: Callable[[bytes, str, str], Content],
) -> None:
    """
    Example handler for the artifact generation event.

    This demo shows how to export table data as a Word document:
    - Fetches all table data
    - Organizes it by sections
    - Generates a markdown report
    - Converts to DOCX and uploads it
    - Links the artifact back to the table

    Args:
        at_service: Service instance for table operations
        payload: Event payload with artifact type
        uploader_fn: Function to upload the generated file
    """
    logger.info(f"Generating artifact of type: {payload.data.artifact_type}")

    await at_service.set_activity(
        text="Starting report generation...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    try:
        # Read and organize data
        sheet = await at_service.get_sheet(start_row=0, end_row=None)
        rows_data = organize_sheet_data(sheet)

        # Build markdown report
        markdown = build_markdown_report(rows_data)

        # Generate DOCX
        await at_service.set_activity(
            text="Generating document...",
            activity=payload.action,
            status=ActivityStatus.IN_PROGRESS,
        )

        docx_generator = DocxGeneratorService(
            config=DocxGeneratorConfig(
                template_content_id="content-template-generic",
            )
        )

        content_fields = docx_generator.parse_markdown_to_list_content_fields(markdown)
        docx_file = docx_generator.generate_from_template(content_fields)

        if not docx_file:
            raise Exception("Failed to generate DOCX file")

        # Upload to chat
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"Table_Report_{timestamp}.docx"

        content = uploader_fn(
            docx_file,
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            filename,
        )

        # Set artifact reference
        await at_service.set_artifact(
            artifact_type=payload.data.artifact_type,
            content_id=content.id,
            mime_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            name=filename,
        )

        await at_service.set_activity(
            text=f"Report generated successfully: {filename}",
            activity=payload.action,
            status=ActivityStatus.COMPLETED,
        )

    except Exception as e:
        logger.error(f"Error generating artifact: {e}")
        await at_service.set_activity(
            text=f"Report generation failed: {str(e)}",
            activity=payload.action,
            status=ActivityStatus.FAILED,
        )
        raise


def organize_sheet_data(sheet: MagicTableSheet) -> dict[int, dict[int, str]]:
    """
    Convert flat cell list to nested dictionary structure.

    Returns:
        Dictionary with structure {row_order: {column_order: cell_text}}
    """
    rows_data: dict[int, dict[int, str]] = {}

    for cell in sheet.magic_table_cells:
        if cell.row_order not in rows_data:
            rows_data[cell.row_order] = {}
        rows_data[cell.row_order][cell.column_order] = cell.text

    return rows_data


def build_markdown_report(rows_data: dict[int, dict[int, str]]) -> str:
    """
    Build a markdown report grouped by sections.

    Returns:
        Markdown string with sections and question details
    """
    markdown_lines = [
        "# Table Report",
        "",
        f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "---",
        "",
    ]

    # Get column indices
    question_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.QUESTION
    ).order
    section_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.SECTION
    ).order
    answer_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.ANSWER
    ).order
    consistency_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.CRITICAL_CONSISTENCY
    ).order
    status_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.STATUS
    ).order
    reviewer_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.REVIEWER
    ).order

    # Get data rows (excluding header row 0)
    data_rows = {k: v for k, v in rows_data.items() if k > 0}

    # Group by section
    sections: dict[str, list[dict[int, str]]] = {}
    for row_data in data_rows.values():
        section = row_data.get(section_col, "General")
        if section not in sections:
            sections[section] = []
        sections[section].append(row_data)

    # Add each section
    for section_name, section_rows in sections.items():
        markdown_lines.extend(
            [
                f"## {section_name}",
                "",
            ]
        )

        for row_data in section_rows:
            question = row_data.get(question_col, "N/A")
            answer = row_data.get(answer_col, "N/A")
            consistency = row_data.get(consistency_col, "N/A")
            status = row_data.get(status_col, "N/A")
            reviewer = row_data.get(reviewer_col, "Unassigned")

            markdown_lines.extend(
                [
                    f"**Question:** {question}",
                    "",
                    f"**Answer:** {answer}",
                    "",
                    f"**Consistency:** {consistency}",
                    "",
                    f"**Status:** {status}",
                    "",
                    f"**Reviewer:** {reviewer}",
                    "",
                    "---",
                    "",
                ]
            )

    return "\n".join(markdown_lines)

Framework Utilities: Helper Functions

The tutorial examples use several helper functions that encapsulate common patterns for working with files, metadata, and references. These are available in the toolkit and demonstrate best practices for your own applications.

File Content Retrieval

Function: get_file_content_getter_fn(user_id, company_id, chat_id)

Creates a function to retrieve full Content objects by file ID. This is essential for accessing file metadata and properties.

# Create the getter function with authentication context
file_content_getter = get_file_content_getter_fn(user_id, company_id, chat_id)

# Retrieve content by ID
content = file_content_getter(file_id)

# Access properties
if content:
    metadata = content.metadata  # Custom key-value pairs
    title = content.title        # File name
    text = content.text          # Extracted text
    chunks = content.chunks      # ContentChunk objects

Use in your handlers: Pass this function to handlers that need to access source file metadata.

File Download

Function: get_downloader_fn(user_id, company_id, chat_id)

Creates a function to download raw file bytes. Useful for processing CSV, Excel, or other binary formats.

# Create the downloader with authentication context
downloader = get_downloader_fn(user_id, company_id, chat_id)

# Download file content
file_bytes = downloader(file_id)

# Process the bytes (e.g., parse CSV)
csv_data = pd.read_csv(io.BytesIO(file_bytes))

Use in your handlers: Pass this function to handlers that need to download and process file contents.

File Upload

Function: get_uploader_fn(user_id, company_id, chat_id)

Creates a function to upload files to the chat. Used for uploading generated artifacts.

# Create the uploader with authentication context
uploader = get_uploader_fn(user_id, company_id, chat_id)

# Upload a file
content = uploader(
    file_bytes,
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "report.docx"
)

# Use the returned content ID
content_id = content.id

Use in your handlers: Pass this function to artifact generation handlers.

Reference Creation

Function: get_augmented_text_with_references_fn(user_id, company_id, chat_id, assistant_id)

Creates a function to convert inline citations into clickable references. This is the core utility for building traceable, source-linked content.

# Create the reference builder with authentication context
reference_builder = get_augmented_text_with_references_fn(
    user_id, company_id, chat_id, assistant_id
)

# Create a reference registry
reference_registry = create_id_map(content_items, prefix="chunk")

# Convert citations to references
text_with_citations = "According to the report [chunk_abc123]..."
augmented_text = reference_builder(
    text_with_citations,
    reference_registry,
    prefix="chunk",
    citation_pattern=r"\[chunk_([a-zA-Z0-9\-]+)\]"
)
# Result: "According to the report [1]..." (clickable in UI)

Use in your handlers: Pass this function to handlers that generate AI content with source citations.

Content Organization

Class: ContentRegistry(keys, contents)

An example utility class demonstrating how to organize Content objects by metadata keys. This is just one approach - you should implement your own filtering logic based on your specific needs.

Example usage:

# Create a registry with metadata keys
content_registry = ContentRegistry(
    keys=["Finance", "Legal", "Technical"],
    contents=all_content_objects
)

# Retrieve files by metadata key
finance_files = content_registry.get_contents_by_metadata_key("Finance")
legal_files = content_registry.get_contents_by_metadata_key("Legal")

This example shows: Grouping by metadata key existence (e.g., file has {"Finance": "true"})

Your implementation might:

  • Filter by metadata values instead of keys (e.g., {"status": "approved"})
  • Support complex queries (AND/OR conditions, ranges, regex)
  • Combine multiple metadata attributes (e.g., section AND department)
  • Implement scoring/ranking logic for content relevance
  • Use caching strategies for large content sets

Key point: Build your own registry class that matches your business logic and metadata structure!
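
For example, a value-based variant that combines two attributes might look like this (a sketch, assuming metadata entries such as {"section": "Finance", "status": "approved"}):

from unique_toolkit.content.schemas import Content


def approved_in_section(contents: list[Content], section: str) -> list[Content]:
    """Sketch: filter by metadata VALUES, combining two attributes (AND)."""
    return [
        c
        for c in contents
        if c.metadata
        and c.metadata.get("section") == section
        and c.metadata.get("status") == "approved"
    ]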

ID Mapping

Function: create_id_map(items, prefix)

Generates temporary IDs for a list of items. Essential for creating citation systems.

# Create unique IDs for content items
id_map = create_id_map(content_list, prefix="chunk")
# Returns: {"chunk_a1b2c3d4": content1, "chunk_x9y8z7w6": content2, ...}

# Use in text generation
text = f"See source [{list(id_map.keys())[0]}]"

Pattern: Always use consistent prefixes and citation patterns throughout your application.
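
One way to keep them in sync is to derive the citation pattern from the prefix, so the two can never drift apart (a small sketch):

import re

prefix = "chunk"
# Derive the regex from the prefix instead of hardcoding both
citation_pattern = rf"\[{re.escape(prefix)}_([a-zA-Z0-9\-]+)\]"

assert re.findall(citation_pattern, "See source [chunk_a1b2c3d4]") == ["a1b2c3d4"]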

Helper Functions Implementation
from collections import defaultdict
from logging import getLogger
from typing import Callable, TypeVar

from unique_toolkit.chat import ChatMessageRole
from unique_toolkit.content import ContentChunk
from unique_toolkit.content.schemas import Content, ContentReference

logger = getLogger(__name__)


def get_file_content_getter_fn(
    user_id: str, company_id: str, chat_id: str
) -> Callable[[str], Content | None]:
    """
    Factory function to create a content retriever with authentication context.

    The returned function allows you to:
    - Retrieve full Content objects by file ID
    - Access file metadata (custom key-value pairs attached during upload)
    - Access file chunks (for chunked documents)
    - Access file text and other properties

    Returns:
        A function that retrieves Content by file_id, or None if not found
    """
    from unique_toolkit.content.functions import search_contents

    def get_content_fn(file_id: str) -> Content | None:
        # Search for content by exact ID match
        where = {"id": {"equals": file_id}}
        contents = search_contents(
            user_id=user_id, company_id=company_id, chat_id=chat_id, where=where
        )
        assert len(contents) <= 1

        if len(contents) == 0:
            logger.warning(f"No content info found for file: {file_id}")
            return None

        if contents[0].metadata is None:
            logger.warning(f"No metadata found for file: {file_id}")

        logger.info(f"Metadata for file {file_id}: {contents[0].metadata}")
        return contents[0]

    return get_content_fn


def get_downloader_fn(
    user_id: str, company_id: str, chat_id: str
) -> Callable[[str], bytes]:
    """
    Factory function to create a file downloader with authentication context.

    Returns a function that downloads files by content_id.
    """
    from unique_toolkit.content.functions import download_content_to_bytes

    return lambda file_id: download_content_to_bytes(
        user_id=user_id, company_id=company_id, chat_id=chat_id, content_id=file_id
    )


def get_uploader_fn(
    user_id: str, company_id: str, chat_id: str
) -> Callable[[bytes, str, str], Content]:
    """
    Factory function to create a file uploader with authentication context.

    Returns a function that uploads files to the chat.
    """
    from unique_toolkit.content.functions import upload_content_from_bytes

    def uploader(content: bytes, mime_type: str, content_name: str) -> Content:
        return upload_content_from_bytes(
            user_id=user_id,
            company_id=company_id,
            content=content,
            mime_type=mime_type,
            content_name=content_name,
            chat_id=chat_id,
            skip_ingestion=True,
        )

    return uploader


def convert_content_chunk_to_reference(
    *,
    message_id: str,
    content_or_chunk: Content | ContentChunk,
    sequence_number: int | None = None,
    start_page: int | None = None,
    end_page: int | None = None,
) -> ContentReference:
    title = content_or_chunk.title or content_or_chunk.key or content_or_chunk.id

    page_suffix = None
    if start_page:
        if end_page:
            page_suffix = f": {start_page} - {end_page}"
        else:
            page_suffix = f": {start_page}"

    title = f"{title}{page_suffix}" if page_suffix else title

    return ContentReference(
        message_id=message_id,
        url=f"unique://content/{content_or_chunk.id}",
        source_id=content_or_chunk.id,
        name=title,
        sequence_number=sequence_number or 0,
        source="agentic-table",
    )


def get_augmented_text_with_references_fn(
    user_id: str, company_id: str, chat_id: str, assistant_id: str
) -> Callable[[str, dict[str, Content | ContentChunk], str, str], str]:
    """
    Factory function to create a reference builder with authentication context.

    The returned function converts inline citations in text (e.g., [chunk_abc123]) into 
    clickable references in the Unique UI. These references:
    - Appear as numbered citations (e.g., [1], [2]) in the frontend
    - Are clickable and navigate to the source content
    - Include metadata like title, page numbers, and source ID

    This is useful when:
    - AI agents generate text with citations to source documents
    - You want to create audit trails linking table cells to source files
    - You need to show provenance of data in the table

    Returns:
        A function that converts inline citations to numbered references
    """

    import re

    from unique_toolkit.chat.functions import create_message, modify_message

    # Default pattern matches citations like [chunk_abc123] or [chunk_xyz-456]
    _DEFAULT_CITATION_PATTERN = r"\[chunk_([a-zA-Z0-9\-]+)\]"

    def reference_builder(
        text: str,
        reference_registry: dict[str, Content | ContentChunk],
        prefix: str = "chunk",
        citation_pattern: str = _DEFAULT_CITATION_PATTERN,
    ) -> str:
        """
        Converts inline citations in text to numbered references with full content metadata.

        This function:
        1. Extracts all citation IDs from the text (e.g., [chunk_abc123])
        2. Looks up each citation in the reference registry
        3. Converts them to numbered references (e.g., [1&message_id])
        4. Creates a message with the processed text and reference metadata

        Args:
            text: The text containing inline citations in format [chunk_xxx].
            reference_registry: Dictionary mapping citation IDs to their full Content or ContentChunk objects.
            prefix: Prefix used when building registry keys (default "chunk").
            citation_pattern: Regex pattern to extract citation IDs from text (default matches [chunk_xxx]).

        Returns:
            The processed text with inline citations converted to numbered references.
        """

        # Create a new assistant message to hold the references
        message = create_message(
            user_id=user_id,
            company_id=company_id,
            chat_id=chat_id,
            assistant_id=assistant_id,
            role=ChatMessageRole.ASSISTANT,
        )
        assert message.id is not None

        # Extract all citation IDs from the text (e.g., "abc123" from "[chunk_abc123]")
        chunk_ids = re.findall(citation_pattern, text)

        logger.info(f"Found {len(chunk_ids)} chunk IDs in text")
        logger.info(f"Chunk IDs: {chunk_ids}")

        # Track which citations we've already processed to avoid duplicates
        processed_citations = {}

        # Collect all reference metadata to attach to the message
        message_references = []

        # Process each citation found in the text
        for chunk_id in chunk_ids:
            # Check if we've already processed this citation
            if chunk_id in processed_citations:
                # Reuse the same reference notation for duplicate citations
                reference_notation = processed_citations[chunk_id]
            else:
                # Look up the full content/chunk object for this citation
                referenced_content = reference_registry.get(f"{prefix}_{chunk_id}")

                if referenced_content:
                    # This is a valid citation - create a numbered reference
                    sequence_number = len(processed_citations) + 1

                    # Add the reference metadata to the message
                    message_references.append(
                        convert_content_chunk_to_reference(
                            message_id=message.id,
                            content_or_chunk=referenced_content,
                            sequence_number=sequence_number,
                        )
                    )

                    # Format: [sequence_number&message_id] (e.g., [1&msg_123])
                    reference_notation = f"[{sequence_number}&{message.id}]"
                    processed_citations[chunk_id] = reference_notation
                else:
                    # Citation ID not found in registry - mark as invalid
                    reference_notation = "[???]"

            # Replace the inline citation with the reference notation
            text = text.replace(f"[{prefix}_{chunk_id}]", reference_notation)

        # Update the message with the processed text and all references
        modify_message(
            assistant_message_id=message.id,
            user_message_id=message.id,
            user_message_text=text,
            assistant=True,
            user_id=user_id,
            company_id=company_id,
            chat_id=chat_id,
            references=message_references,
            content=text,
        )

        return text

    return reference_builder


class ContentRegistry:
    """
    An EXAMPLE utility class for organizing Content objects by metadata keys.

    This demonstrates ONE WAY to manage content with metadata. You should implement
    your own filtering logic based on your specific requirements.

    Example use case:
        If your source files have metadata like {"section": "Finance"} or {"section": "Legal"},
        this class groups them by those keys so you can retrieve all Finance-related files
        when processing a Finance row in your table.

    This is intentionally simple to show the pattern. For production use, consider:
    - Filtering by metadata VALUES, not just keys (e.g., {"status": "approved"})
    - Complex queries (AND/OR conditions, ranges, regex patterns)
    - Multiple metadata attributes (e.g., section AND department)
    - Caching strategies for large content sets
    - Custom scoring/ranking logic for content relevance

    Build your own registry class that fits your business logic!
    """

    def __init__(self, keys: list[str], contents: list[Content]):
        """
        Initialize with metadata keys and a list of Content objects.

        This example implementation groups content by checking if metadata keys exist.
        Your implementation might filter by metadata values, use complex queries,
        or implement completely different logic.

        Args:
            keys: List of metadata keys to group by (e.g., ["Finance", "Legal"])
            contents: List of Content objects to organize
        """
        self.keys = keys
        self.contents = contents

        grouped: dict[str, list[Content]] = defaultdict(list)

        # Group content by metadata keys
        for content in self.contents:
            if content.metadata is None:
                logger.warning(f"No metadata found for content: {content.id}")
                continue

            # Check if any of our target keys exist in this content's metadata
            for key in keys:
                if key in content.metadata:
                    logger.info(f"Found metadata key: {key} for content: {content.id}")
                    grouped[key].append(content)

        self.contents_by_key = dict(grouped)

    def get_contents_by_metadata_key(self, key: str) -> list[Content]:
        """
        Retrieve all content items that have the specified metadata key.

        Args:
            key: The metadata key to filter by

        Returns:
            List of Content objects with that metadata key, or empty list if none found
        """
        return self.contents_by_key.get(key, [])
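
# Example usage of ContentRegistry (a sketch; assumes `all_contents` holds
# Content objects whose metadata includes keys like {"Finance": "true"}):
#
#   registry = ContentRegistry(keys=["Finance", "Legal"], contents=all_contents)
#   finance_docs = registry.get_contents_by_metadata_key("Finance")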


T = TypeVar("T")


def create_id_map(items: list[T], prefix: str) -> dict[str, T]:
    """
    Create a mapping of generated IDs to items for use in reference systems.

    This helper generates unique IDs for a list of items (Content or ContentChunk objects)
    so they can be cited in text and later resolved back to their full objects.

    Args:
        items: List of items to create IDs for (typically Content or ContentChunk objects)
        prefix: Prefix for generated IDs (e.g., "chunk" creates IDs like "chunk_a1b2c3d4")

    Returns:
        Dictionary mapping generated IDs to items

    Example:
        >>> contents = [content1, content2, content3]
        >>> id_map = create_id_map(contents, "chunk")
        >>> # Returns: {"chunk_a1b2c3d4": content1, "chunk_x9y8z7w6": content2, ...}
    """
    from uuid import uuid4

    return {f"{prefix}_{uuid4().hex[:8]}": item for item in items}

Key Concepts

Agent Registration

Before processing any event, you must call at_service.register_agent(). This locks the table to prevent concurrent modifications. Always deregister in a finally block to ensure proper cleanup even if errors occur.

Activity Status Updates

Use at_service.set_activity() liberally to communicate with users. These updates appear as status banners in the UI and are essential for long-running operations. Include (see the sketch after this list):

  • IN_PROGRESS status with descriptive text during processing
  • COMPLETED status when successful
  • FAILED status with error messages if something goes wrong
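
A minimal sketch of this pattern (assuming at_service and payload are in scope, as in the full handlers below):

await at_service.set_activity(
    text="Processing rows...",
    activity=payload.action,
    status=ActivityStatus.IN_PROGRESS,
)
try:
    ...  # do the actual work here
    await at_service.set_activity(
        text="Processing finished",
        activity=payload.action,
        status=ActivityStatus.COMPLETED,
    )
except Exception as e:
    await at_service.set_activity(
        text=f"Processing failed: {e}",
        activity=payload.action,
        status=ActivityStatus.FAILED,
    )
    raise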

Batch Operations

The set_multiple_cells() method is crucial for performance when dealing with multiple cell updates. It's dramatically faster than individual set_cell() calls and reduces network overhead.
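
For example, a minimal sketch (assuming at_service and payload are in scope, with MagicTableCell imported from unique_toolkit.agentic_table.schemas):

cells = [
    MagicTableCell(
        row_order=row,
        column_order=0,
        text=f"Value for row {row}",
        sheet_id=payload.table_id,
    )
    for row in range(1, 101)
]

# One batched call instead of 100 individual set_cell() round trips
await at_service.set_multiple_cells(cells=cells)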

File Operations and Metadata Patterns

The tutorial demonstrates factory functions that encapsulate authentication context. This pattern:

  • Keeps authentication logic centralized
  • Makes handlers more testable
  • Simplifies the handler function signatures
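
A hedged sketch of the factory shape (the real implementations live in the tutorial's helper module; the download body is elided here):

from typing import Callable

def get_downloader_fn(user_id: str, company_id: str, chat_id: str) -> Callable[[str], bytes]:
    def download(file_id: str) -> bytes:
        # The closure captures the authentication context,
        # so handlers only ever pass a file_id
        ...

    return download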

File Download Pattern

Use get_downloader_fn() for downloading raw file bytes (CSV, Excel, binary formats):

downloader = get_downloader_fn(user_id, company_id, chat_id)
file_bytes = downloader(file_id)

Content Retrieval Pattern

Use get_file_content_getter_fn() for accessing file metadata and properties:

content_getter = get_file_content_getter_fn(user_id, company_id, chat_id)
content = content_getter(file_id)

# Access metadata
if content.metadata:
    section = content.metadata.get("section")

File Upload Pattern

Use get_uploader_fn() for uploading generated artifacts:

uploader = get_uploader_fn(user_id, company_id, chat_id)
content = uploader(file_bytes, mime_type, filename)

Reference Creation Pattern

Use get_augmented_text_with_references_fn() for creating clickable source links:

reference_builder = get_augmented_text_with_references_fn(
    user_id, company_id, chat_id, assistant_id
)

# Create ID mapping for your content
id_map = create_id_map(content_items, prefix="chunk")

# Convert citations to references
augmented_text = reference_builder(
    text_with_citations,
    id_map,
    prefix="chunk",
    citation_pattern=r"\[chunk_([a-zA-Z0-9\-]+)\]"
)

Key Benefits:

  • Citations become clickable links in the UI
  • Users can verify source information
  • Creates audit trails for AI-generated content
  • Improves transparency and trust

Flexibility is Key

Remember that all handlers in this tutorial are examples. The framework provides the events and tools; how you use them depends on your business requirements. You can integrate with any external system, use AI models, implement complex workflows, or create custom validation rules.

Summary of Core Capabilities

This tutorial covered the essential building blocks for Agentic Table applications:

Event Handling

  • SHEET_CREATED: Initialize table structure with headers and styling
  • ADD_META_DATA: Process uploaded files (questions and sources)
  • UPDATE_CELL: React to user edits with business logic
  • GENERATE_ARTIFACT: Export table data as documents
  • SHEET_COMPLETED: Finalize and validate completed tables

Data Operations

  • Batch updates: Use set_multiple_cells() for performance
  • Row operations: Read, update, and verify individual rows
  • Status management: Communicate progress with set_activity()
  • Artifact linking: Connect generated files back to tables

File Management

  • Download files: Process CSV, Excel, and binary formats
  • Retrieve content: Access file metadata and properties
  • Upload artifacts: Create and link generated documents

Advanced Capabilities

  • Metadata filtering: Organize files by custom categories
  • Reference creation: Convert citations to clickable links
  • Content routing: Direct files to appropriate table rows
  • Audit trails: Track provenance of data and decisions

Integration Patterns

All helper functions use the factory pattern with authentication context:

  • Centralizes authentication logic
  • Makes handlers testable
  • Simplifies function signatures
  • Enables dependency injection

Key Takeaway: The framework provides the infrastructure (events, table operations, content management) while you implement the business logic (AI agents, validation rules, custom workflows).

Next Steps

Full Example Files
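
# --- File: fastapi_app_agentic_table.py ---
# (module name taken from the uvicorn.run target at the bottom of this file)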
import logging
from pathlib import Path

from unique_sdk.api_resources._agentic_table import ActivityStatus

from docs.examples_from_docs.agentic_table_example_artifact_generated_event_handler import (
    handle_artifact_generated,
)
from docs.examples_from_docs.agentic_table_example_cell_updated_event_handler import (
    handle_cell_updated,
)
from docs.examples_from_docs.agentic_table_example_metadata_added_event_handler import (
    handle_metadata_added,
)
from docs.examples_from_docs.agentic_table_example_sheet_created_event_handler import (
    handle_sheet_created,
)
from docs.examples_from_docs.agentic_table_helper_functions import (
    get_augmented_text_with_references_fn,
    get_downloader_fn,
    get_file_content_getter_fn,
    get_uploader_fn,
)
from unique_toolkit.agentic_table.schemas import (
    MagicTableAction,
    MagicTableEvent,
    MagicTableEventTypes,
)
from unique_toolkit.agentic_table.service import AgenticTableService
from unique_toolkit.app.fast_api_factory import build_unique_custom_app
from unique_toolkit.app.unique_settings import UniqueSettings

# Configure logging at module level so it works regardless of how the app is started
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


# Default event handler
async def agentic_table_event_handler(event: MagicTableEvent) -> int:
    """
    Default event handler that serves as a controller for the Agentic Table.
    """
    # Initialize Agentic Table Service to interact with agentic table.
    at_service = AgenticTableService(
        user_id=event.user_id,
        company_id=event.company_id,
        table_id=event.payload.table_id,
    )

    # Initialize the configuration from the event for your custom application
    # config = YourConfigClass.model_validate(event.payload.configuration)

    # You can now potentially use this configuration to initialize any other services needed for your application.
    ...

    # Register the agent
    # This locks the table from any modifications until the agent is completed.
    # Once registered the sheet status is shown as "Updating"
    await at_service.register_agent()

    try:
        # We use if-else statements here instead of match/case because it enables
        # more precise typing and type narrowing for the payload based on the
        # action (which functions as a discriminator).
        # Depending on the action received, run the corresponding functionality
        if event.payload.action == MagicTableAction.SHEET_CREATED:
            # This event is triggered when a new sheet is created.
            # You can use this for housekeeping tasks like displaying the table headers, etc.
            #
            # Payload type (MagicTableSheetCreatedPayload):
            logger.info(f"Sheet created: {event.payload.sheet_name}")

            # In a standard workflow the user selects sources and a question file
            # right after creating the sheet. This triggers the ADD_META_DATA event,
            # which can arrive very quickly after SHEET_CREATED, so we deregister
            # the agent early to unblock those upcoming events.
            await at_service.deregister_agent()

            # Handle the sheet creation event (usually to set up the headers and column styles)
            await handle_sheet_created(at_service, event.payload)

        elif event.payload.action == MagicTableAction.ADD_META_DATA:
            # This event is triggered when a new question or question file or source file is added.
            #
            # Payload type (MagicTableAddMetadataPayload):
            logger.info(f"Metadata added: {event.payload.metadata}")

            downloader_fn = get_downloader_fn(
                event.user_id, event.company_id, event.payload.chat_id
            )
            file_content_getter_fn = get_file_content_getter_fn(
                event.user_id, event.company_id, event.payload.chat_id
            )
            augmented_text_with_references_fn = get_augmented_text_with_references_fn(
                event.user_id, event.company_id, event.payload.chat_id, event.payload.assistant_id
            )
            await handle_metadata_added(
                at_service, event.payload, downloader_fn, file_content_getter_fn, augmented_text_with_references_fn
            )

        elif event.payload.action == MagicTableAction.UPDATE_CELL:
            # This event is triggered when a cell is updated.
            #
            # Payload type (MagicTableUpdateCellPayload):
            logger.info(
                f"Cell updated: {event.payload.column_order}, {event.payload.row_order}, {event.payload.data}"
            )

            await handle_cell_updated(at_service, event.payload)

        elif event.payload.action == MagicTableAction.GENERATE_ARTIFACT:
            # This event is triggered when a report generation button is clicked.
            #
            # Payload type (MagicTableGenerateArtifactPayload):
            logger.info(f"Artifact generated: {event.payload.data}")

            uploader_fn = get_uploader_fn(
                event.user_id, event.company_id, event.payload.chat_id
            )

            await handle_artifact_generated(at_service, event.payload, uploader_fn)

        elif event.payload.action == MagicTableAction.SHEET_COMPLETED:
            # This event is triggered when the sheet is marked as completed.
            #
            # Payload type (MagicTableSheetCompletedPayload):
            logger.info(f"Sheet completed: {event.payload.sheet_name}")

            # Here you can call a handler function that will handle the sheet completion event.

        elif event.payload.action == MagicTableAction.LIBRARY_SHEET_ROW_VERIFIED:
            # This event is triggered when a row in a "Library" sheet is verified.
            # This is a special sheet type and is only relevant within the context of the RFP Agent.
            # You can ignore this event/block if you are not working with the library feature.
            #
            # Payload type (MagicTableLibrarySheetRowVerifiedPayload):
            logger.info(
                f"Library sheet row verified: {event.payload.metadata.row_order}"
            )

            # Here you can call a handler function that will handle the library sheet row verified event.

        else:
            logger.error(f"Unknown action: {event.payload.action}")
            await at_service.set_activity(
                activity=event.payload.action,
                status=ActivityStatus.FAILED,
                text=f"Unknown action: {event.payload.action}",
            )
            raise Exception(f"Unknown action: {event.payload.action}")

        return 0  # Success

    except Exception as e:
        logger.error(f"Error in agentic table event handler: {e}")
        return 1  # Failure

    finally:
        # De-register the agent
        await at_service.deregister_agent()


# Create the default app instance at module level
# This MUST be at module level so uvicorn can find it when importing

_SETTINGS = UniqueSettings.from_env(env_file=Path(__file__).parent / "unique.env")
_SETTINGS.init_sdk()

# Create app using factory
_MINIMAL_APP = build_unique_custom_app(
    title="Unique Minimal Agentic Table App",
    settings=_SETTINGS,
    event_handler=agentic_table_event_handler,
    event_constructor=MagicTableEvent,
    subscribed_event_names=[ev.value for ev in MagicTableEventTypes],
)

if __name__ == "__main__":
    import uvicorn

    # Enable debug logging. force=True is required because the module-level
    # basicConfig above already configured the root logger at INFO, which
    # would otherwise make this call a no-op.
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        force=True,
    )

    # Run the server
    uvicorn.run(
        "fastapi_app_agentic_table:_MINIMAL_APP",
        host="0.0.0.0",
        port=5001,
        reload=True,
        log_level="debug",
    )
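
# --- File: agentic_table_example_column_definition.py ---
# (file name inferred from the import paths in the app module above)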
from enum import StrEnum

from pydantic import BaseModel
from unique_sdk import CellRendererTypes


class ExampleColumnNames(StrEnum):
    QUESTION = "Question"
    SECTION = "Section"
    ANSWER = "Answer"
    CRITICAL_CONSISTENCY = "Critical Consistency"
    STATUS = "Status"
    REVIEWER = "Reviewer"

class ColumnDefinition(BaseModel):
    """
    Defines a single table column's structure and styling.

    Attributes:
        order: Column position (0-indexed)
        name: Column header text
        width: Column width in pixels
        renderer: Optional cell renderer type (dropdown, checkbox, etc.)
        editable: Whether the column is editable
    """

    order: int
    name: ExampleColumnNames
    width: int
    renderer: CellRendererTypes | None = None
    editable: bool = True


class ColumnDefinitions(BaseModel):
    """
    Container for all column definitions in the table.

    Provides helper methods to access columns by name.
    """

    columns: list[ColumnDefinition]

    @property
    def column_map(self) -> dict[str, ColumnDefinition]:
        """Map of column names to their definitions."""
        return {column.name: column for column in self.columns}

    def get_column_by_name(self, name: str) -> ColumnDefinition:
        """Get column definition by name."""
        return self.column_map[name]

    def get_column_name_by_order(self, order: int) -> ExampleColumnNames:
        """Get column name by order."""
        return next(c.name for c in self.columns if c.order == order)

    def get_column_names(self) -> list[str]:
        """Get list of all column names."""
        return list(self.column_map.keys())





example_configuration = {
    "columns": [
        {
            "order": 0,
            "name": ExampleColumnNames.QUESTION,
            "width": 300,
            "renderer": None,
            "editable": False,
        },
        {
            "order": 1,
            "name": ExampleColumnNames.SECTION,
            "width": 150,
            "renderer": None,
            "editable": False,
        },
        {
            "order": 2,
            "name": ExampleColumnNames.ANSWER,
            "width": 400,
            "renderer": CellRendererTypes.SELECTABLE_CELL_RENDERER,
            "editable": True,
        },
        {
            "order": 3,
            "name": ExampleColumnNames.CRITICAL_CONSISTENCY,
            "width": 200,
            "renderer": None,
            "editable": True,
        },
        {
            "order": 4,
            "name": ExampleColumnNames.STATUS,
            "width": 150,
            "renderer": CellRendererTypes.REVIEW_STATUS_DROPDOWN,
            "editable": True,
        },
        {
            "order": 5,
            "name": ExampleColumnNames.REVIEWER,
            "width": 150,
            "renderer": CellRendererTypes.COLLABORATOR_DROPDOWN,
            "editable": True,
        },
    ]
}

example_column_definitions = ColumnDefinitions.model_validate(example_configuration)
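
# --- File: agentic_table_example_sheet_created_event_handler.py ---
# (file name inferred from the import paths in the app module above)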
from logging import getLogger

from unique_sdk.api_resources._agentic_table import ActivityStatus

from unique_toolkit.agentic_table.schemas import MagicTableSheetCreatedPayload
from unique_toolkit.agentic_table.service import AgenticTableService

from .agentic_table_example_column_definition import example_column_definitions

logger = getLogger(__name__)


async def handle_sheet_created(
    at_service: AgenticTableService, payload: MagicTableSheetCreatedPayload
) -> None:
    """
    Example handler for the sheet creation event.

    This demo shows how to initialize a new table by:
    - Setting column headers in row 0
    - Applying column styles (width, renderer type, editability)

    The table is ready to receive data after initialization.

    Args:
        at_service: Service instance for table operations
        payload: Event payload with table_id and sheet_name
    """
    logger.info(f"Initializing table: {payload.sheet_name}")

    # Set activity status
    await at_service.set_activity(
        text="Initializing table schema...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    # Set headers in row 0
    for col_def in example_column_definitions.columns:
        await at_service.set_cell(row=0, column=col_def.order, text=col_def.name.value)

    logger.info(f"Set {len(example_column_definitions.columns)} column headers")

    # Apply column styles
    for col_def in example_column_definitions.columns:
        await at_service.set_column_style(
            column=col_def.order,
            width=col_def.width,
            cell_renderer=col_def.renderer,
            editable=col_def.editable,
        )

    logger.info("Applied column styles")

    # Set completion status
    await at_service.set_activity(
        text="Table schema initialized successfully",
        activity=payload.action,
        status=ActivityStatus.COMPLETED,
    )
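
# --- File: agentic_table_example_metadata_added_event_handler.py ---
# (file name inferred from the import paths in the app module above)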
import io
from logging import getLogger
from typing import Callable

import pandas as pd
from unique_sdk.api_resources._agentic_table import ActivityStatus

from unique_toolkit.agentic_table.schemas import (
    MagicTableAddMetadataPayload,
    MagicTableCell,
)
from unique_toolkit.agentic_table.service import AgenticTableService
from unique_toolkit.content import Content, ContentChunk

from .agentic_table_example_column_definition import (
    ExampleColumnNames,
    example_column_definitions,
)
from .agentic_table_helper_functions import ContentRegistry, create_id_map

logger = getLogger(__name__)


async def handle_question_files(
    at_service: AgenticTableService,
    payload: MagicTableAddMetadataPayload,
    downloader_fn: Callable[[str], bytes],
) -> int:
    """
    Handle question files by downloading and parsing CSV to populate the table.

    Args:
        at_service: Service instance for table operations
        payload: Event payload with metadata and file IDs
        downloader_fn: Function to download file contents

    Returns:
        Number of rows added to the table

    Raises:
        Exception: If CSV processing fails
    """
    # Check if question files were provided
    if not payload.metadata.question_file_ids:
        logger.warning("No question files provided in metadata")
        return 0

    await at_service.set_activity(
        text="Downloading CSV file...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    # Get the first question file (CSV)
    file_id = payload.metadata.question_file_ids[0]

    logger.info(f"Downloading file: {file_id}")
    # Download file content
    file_content = downloader_fn(file_id)

    await at_service.set_activity(
        text="Parsing CSV file...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    file_content_stream = io.BytesIO(file_content)

    # Parse CSV file
    df = pd.read_csv(file_content_stream)
    df = df.fillna("")  # Convert NA values to empty strings
    logger.info(f"Parsed CSV with {len(df)} rows and {len(df.columns)} columns")
    logger.info(df.head())

    await at_service.set_activity(
        text=f"Populating table with {len(df)} rows...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    # Create batch cells
    cells = []
    for row_idx, row_data in df.iterrows():
        for col_def in example_column_definitions.columns:
            cell_value = row_data.get(col_def.name.value, "")
            if not cell_value:
                continue
            cells.append(
                MagicTableCell(
                    row_order=int(row_idx) + 1,  # type: ignore[arg-type]  # +1 to skip the header row
                    column_order=col_def.order,
                    text=str(cell_value),
                    sheet_id=payload.table_id,
                )
            )

    logger.info(f"Created {len(cells)} cells for batch upload")

    # Batch upload all cells
    await at_service.set_multiple_cells(cells=cells)

    logger.info(f"Successfully populated table with {len(df)} rows")

    return len(df)


async def handle_source_files(
    at_service: AgenticTableService,
    payload: MagicTableAddMetadataPayload,
    file_content_getter_fn: Callable[[str], Content | None],
    augmented_text_with_references_fn: Callable[
        [str, dict[str, Content | ContentChunk], str, str], str
    ],
) -> int:
    """
    Handle source files by retrieving content and organizing by metadata.

    This handler demonstrates two key framework capabilities:
    1. Retrieving file content and accessing metadata
    2. Creating clickable references that link table cells to source documents

    The example shows:
    - How to fetch Content objects for uploaded files
    - How to use ContentRegistry to group files by metadata keys
    - How to generate text with inline citations and convert them to clickable references
    - How to populate table cells with referenced content

    Args:
        at_service: Service instance for table operations
        payload: Event payload with metadata and file IDs
        file_content_getter_fn: Function to retrieve file content objects
        augmented_text_with_references_fn: Function to convert citations to references

    Returns:
        Number of content items processed
    """
    # Check if source files were provided
    if not payload.metadata.source_file_ids:
        logger.warning("No source files provided in metadata")
        return 0

    await at_service.set_activity(
        text="Processing source files metadata...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )
    cells_to_update: list[MagicTableCell] = []

    num_rows = await at_service.get_num_rows()

    # STEP 1: Retrieve Content objects for all source files
    # Each Content object contains:
    # - content.id: Unique identifier
    # - content.metadata: Custom key-value pairs (e.g., {"section": "Finance"})
    # - content.title: File name or title
    # - content.text: Extracted text content
    # - content.chunks: List of ContentChunk objects for chunked documents
    all_contents = []
    for file_id in payload.metadata.source_file_ids:
        content = file_content_getter_fn(file_id)
        if content is None:
            logger.warning(f"No content found for file: {file_id}")
            continue
        if content.metadata is None:
            logger.warning(f"No metadata found for file: {file_id}")
            continue
        all_contents.append(content)

    # STEP 2: Organize content by metadata keys
    # This example assumes source files have metadata like:
    # {"Team": "true"}, {"Finance": "true"}, {"Technical": "true"}, etc.
    sections_of_interest = [
        "Team",
        "Finance",
        "Technical",
        "Planning",
    ]

    # ContentRegistry groups files by metadata keys
    # You can later retrieve all files tagged with "Finance", "Team", etc.
    content_registry = ContentRegistry(keys=sections_of_interest, contents=all_contents)

    # STEP 3: Process each row in the table
    # This demonstrates row-by-row processing where each row might need different source files
    for row_index in range(1, num_rows + 1):
        # Retrieve the current row to check what data it has
        row_cells = await at_service.get_sheet(
            start_row=row_index, end_row=row_index + 1
        )
        retrieved_cells: dict[ExampleColumnNames, MagicTableCell] = {
            example_column_definitions.get_column_name_by_order(cell.column_order): cell
            for cell in row_cells.magic_table_cells
        }

        logger.info(f"Retrieved cells: {retrieved_cells}")

        answer_cell = retrieved_cells.get(ExampleColumnNames.ANSWER)

        # Check if the answer cell exists. This means that the answer was already generated.
        if answer_cell is not None:
            logger.info(f"Answer found for row {row_index}: {answer_cell.text}")
        else:
            # Get the section for this row (e.g., "Finance", "Team")
            section_name = retrieved_cells.get(ExampleColumnNames.SECTION)

            if section_name is None:
                logger.warning(f"No section found for row {row_index}")
                continue

            # STEP 4: Retrieve relevant content based on row metadata
            # Use the ContentRegistry to get all files tagged with this section
            relevant_contents = content_registry.get_contents_by_metadata_key(
                section_name.text
            )

            if len(relevant_contents) == 0:
                logger.warning(f"No contents found for section '{section_name.text}'")
                continue

            logger.info(
                f"Found {len(relevant_contents)} content items for section '{section_name.text}'"
            )

            # STEP 5: Create a reference registry for citation mapping
            # This creates temporary IDs like "chunk_a1b2c3d4" for each content item
            # These IDs will be used in inline citations: [chunk_a1b2c3d4]
            chunk_prefix = "chunk"
            reference_registry = create_id_map(relevant_contents, chunk_prefix)

            logger.info(f"Reference registry: {reference_registry.keys()}")

            # STEP 6: Generate text with inline citations
            # In a real application, this would be AI-generated text with citations
            # Here we simulate it by listing the content titles with citation markers
            simulated_text_generation_with_references = (
                "The following are the contents of the section: \n"
            )
            for chunk_id, content in reference_registry.items():
                # Add inline citation in format [chunk_xxx]
                simulated_text_generation_with_references += (
                    f"{content.title} [{chunk_id}]\n"
                )

            # STEP 7: Convert inline citations to clickable references
            # This transforms [chunk_a1b2c3d4] into numbered references like [1], [2]
            # The frontend will render these as clickable links to the source files
            augmented_text = augmented_text_with_references_fn(
                simulated_text_generation_with_references,
                reference_registry,  # type: ignore[arg-type]
                chunk_prefix,
                r"\[chunk_([a-zA-Z0-9\-]+)\]",  # Citation pattern to match
            )

            # STEP 8: Update the table cell with referenced text
            cells_to_update.append(
                MagicTableCell(
                    row_order=row_index,
                    column_order=example_column_definitions.get_column_by_name(
                        ExampleColumnNames.ANSWER
                    ).order,
                    text=augmented_text,
                    sheet_id=payload.table_id,
                )
            )

    # Apply any cell updates
    if cells_to_update:
        await at_service.set_multiple_cells(cells=cells_to_update)

    await at_service.set_activity(
        text=f"Successfully processed {len(all_contents)} source files",
        activity=payload.action,
        status=ActivityStatus.COMPLETED,
    )

    return len(all_contents)


async def handle_metadata_added(
    at_service: AgenticTableService,
    payload: MagicTableAddMetadataPayload,
    downloader_fn: Callable[[str], bytes],
    file_content_getter_fn: Callable[[str], Content | None],
    augmented_text_with_references_fn: Callable[
        [str, dict[str, Content | ContentChunk], str, str], str
    ],
) -> None:
    """
    Example handler for the metadata addition event.

    This demo shows how to populate a table from uploaded files:
    - Process question files: Downloads CSV files and populates the table
    - Process source files: Retrieves content and groups by metadata

    Args:
        at_service: Service instance for table operations
        payload: Event payload with metadata and file IDs
        downloader_fn: Function to download file contents
        file_content_getter_fn: Function to retrieve file content objects
        augmented_text_with_references_fn: Function to convert inline citations to references
    """
    logger.info(f"Processing metadata for sheet: {payload.sheet_name}")

    try:
        # Handle question files (CSV processing)
        num_question_rows = await handle_question_files(
            at_service=at_service,
            payload=payload,
            downloader_fn=downloader_fn,
        )

        # Handle source files (content and metadata processing)
        num_source_rows = await handle_source_files(
            at_service=at_service,
            payload=payload,
            file_content_getter_fn=file_content_getter_fn,
            augmented_text_with_references_fn=augmented_text_with_references_fn,
        )

        # This is different from the LogEntry which shows in the cell history
        await at_service.set_activity(
            text=f"Successfully loaded {num_question_rows} rows from CSV and {num_source_rows} source file metadata rows",
            activity=payload.action,
            status=ActivityStatus.COMPLETED,
        )

    except Exception as e:
        logger.exception(f"Error processing files: {e}")
        await at_service.set_activity(
            text=f"Failed to process files: {str(e)}",
            activity=payload.action,
            status=ActivityStatus.FAILED,
        )
        raise
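
# --- File: agentic_table_example_cell_updated_event_handler.py ---
# (file name inferred from the import paths in the app module above)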
from unique_toolkit.agentic_table.schemas import MagicTableUpdateCellPayload
from unique_toolkit.agentic_table.service import AgenticTableService
from logging import getLogger
from datetime import datetime
from unique_toolkit.language_model.schemas import LanguageModelMessageRole
from unique_sdk import RowVerificationStatus
from unique_toolkit.agentic_table.schemas import LogEntry
from .agentic_table_example_column_definition import (
    example_column_definitions,
    ExampleColumnNames,
)

logger = getLogger(__name__)


async def handle_cell_updated(
    at_service: AgenticTableService, payload: MagicTableUpdateCellPayload
) -> None:
    """
    Example handler for the cell update event.

    This demo shows a simple workflow automation: when the Critical Consistency column
    changes to "Consistent", it adds a log entry and updates
    the row verification status.

    Args:
        at_service: Service instance for table operations
        payload: Event payload with row, column, and new value
    """
    logger.info(
        f"Cell updated at row {payload.row_order}, "
        f"column {payload.column_order}: {payload.data}"
    )

    critical_consistency_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.CRITICAL_CONSISTENCY
    )

    # Check if the Critical Consistency column was updated
    if payload.column_order == critical_consistency_col.order:
        status_value = payload.data.strip()

        logger.info(f"Status changed to: {status_value}")

        # Lock the row once the status becomes "Consistent"
        if status_value.lower() == "consistent":
            logger.info(
                f"Locking row {payload.row_order} due to status: {status_value}"
            )

            # Note: Column-level locking affects all rows. In a production system,
            # you might track locked rows in metadata and validate edits server-side.
            # Here we demonstrate the pattern with a log entry.

            # Add log entry to document the status change and locking
            log_entries = [
                LogEntry(
                    text=f"Row {payload.row_order} marked as {status_value}. Further edits should be restricted.",
                    created_at=datetime.now().isoformat(),
                    actor_type=LanguageModelMessageRole.ASSISTANT,
                )
            ]

            await at_service.set_cell(
                row=payload.row_order,
                column=payload.column_order,
                text=status_value,
                log_entries=log_entries,
            )

            # Update row verification status
            await at_service.update_row_verification_status(
                row_orders=[payload.row_order], status=RowVerificationStatus.VERIFIED
            )

            logger.info(f"Row {payload.row_order} verified and logged")
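
# --- File: agentic_table_example_artifact_generated_event_handler.py ---
# (file name inferred from the import paths in the app module above)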
from datetime import datetime
from logging import getLogger
from typing import Callable

from unique_sdk.api_resources._agentic_table import ActivityStatus

from unique_toolkit._common.docx_generator import (
    DocxGeneratorConfig,
    DocxGeneratorService,
)
from unique_toolkit.agentic_table.schemas import (
    MagicTableGenerateArtifactPayload,
    MagicTableSheet,
)
from unique_toolkit.agentic_table.service import AgenticTableService
from unique_toolkit.content.schemas import Content

from .agentic_table_example_column_definition import (
    ExampleColumnNames,
    example_column_definitions,
)

logger = getLogger(__name__)


async def handle_artifact_generated(
    at_service: AgenticTableService,
    payload: MagicTableGenerateArtifactPayload,
    uploader_fn: Callable[[bytes, str, str], Content],
) -> None:
    """
    Example handler for the artifact generation event.

    This demo shows how to export table data as a Word document:
    - Fetches all table data
    - Organizes it by sections
    - Generates a markdown report
    - Converts to DOCX and uploads it
    - Links the artifact back to the table

    Args:
        at_service: Service instance for table operations
        payload: Event payload with artifact type
        uploader_fn: Function to upload the generated file
    """
    logger.info(f"Generating artifact of type: {payload.data.artifact_type}")

    await at_service.set_activity(
        text="Starting report generation...",
        activity=payload.action,
        status=ActivityStatus.IN_PROGRESS,
    )

    try:
        # Read and organize data
        sheet = await at_service.get_sheet(start_row=0, end_row=None)
        rows_data = organize_sheet_data(sheet)

        # Build markdown report
        markdown = build_markdown_report(rows_data)

        # Generate DOCX
        await at_service.set_activity(
            text="Generating document...",
            activity=payload.action,
            status=ActivityStatus.IN_PROGRESS,
        )

        docx_generator = DocxGeneratorService(
            config=DocxGeneratorConfig(
                template_content_id="content-template-generic",
            )
        )

        content_fields = docx_generator.parse_markdown_to_list_content_fields(markdown)
        docx_file = docx_generator.generate_from_template(content_fields)

        if not docx_file:
            raise Exception("Failed to generate DOCX file")

        # Upload to chat
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"Table_Report_{timestamp}.docx"

        content = uploader_fn(
            docx_file,
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            filename,
        )

        # Set artifact reference
        await at_service.set_artifact(
            artifact_type=payload.data.artifact_type,
            content_id=content.id,
            mime_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            name=filename,
        )

        await at_service.set_activity(
            text=f"Report generated successfully: {filename}",
            activity=payload.action,
            status=ActivityStatus.COMPLETED,
        )

    except Exception as e:
        logger.error(f"Error generating artifact: {e}")
        await at_service.set_activity(
            text=f"Report generation failed: {str(e)}",
            activity=payload.action,
            status=ActivityStatus.FAILED,
        )
        raise


def organize_sheet_data(sheet: MagicTableSheet) -> dict[int, dict[int, str]]:
    """
    Convert flat cell list to nested dictionary structure.

    Returns:
        Dictionary with structure {row_order: {column_order: cell_text}}
    """
    rows_data: dict[int, dict[int, str]] = {}

    for cell in sheet.magic_table_cells:
        if cell.row_order not in rows_data:
            rows_data[cell.row_order] = {}
        rows_data[cell.row_order][cell.column_order] = cell.text

    return rows_data
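
# For a sheet with a header row 0 and a single data row, this yields e.g.:
#   {0: {0: "Question", 1: "Section"}, 1: {0: "What is X?", 1: "Finance"}}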


def build_markdown_report(rows_data: dict[int, dict[int, str]]) -> str:
    """
    Build a markdown report grouped by sections.

    Returns:
        Markdown string with sections and question details
    """
    markdown_lines = [
        "# Table Report",
        "",
        f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "---",
        "",
    ]

    # Get column indices
    question_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.QUESTION
    ).order
    section_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.SECTION
    ).order
    answer_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.ANSWER
    ).order
    consistency_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.CRITICAL_CONSISTENCY
    ).order
    status_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.STATUS
    ).order
    reviewer_col = example_column_definitions.get_column_by_name(
        ExampleColumnNames.REVIEWER
    ).order

    # Get data rows (excluding header row 0)
    data_rows = {k: v for k, v in rows_data.items() if k > 0}

    # Group by section
    sections: dict[str, list[dict[int, str]]] = {}
    for row_data in data_rows.values():
        section = row_data.get(section_col, "General")
        if section not in sections:
            sections[section] = []
        sections[section].append(row_data)

    # Add each section
    for section_name, section_rows in sections.items():
        markdown_lines.extend(
            [
                f"## {section_name}",
                "",
            ]
        )

        for row_data in section_rows:
            question = row_data.get(question_col, "N/A")
            answer = row_data.get(answer_col, "N/A")
            consistency = row_data.get(consistency_col, "N/A")
            status = row_data.get(status_col, "N/A")
            reviewer = row_data.get(reviewer_col, "Unassigned")

            markdown_lines.extend(
                [
                    f"**Question:** {question}",
                    "",
                    f"**Answer:** {answer}",
                    "",
                    f"**Consistency:** {consistency}",
                    "",
                    f"**Status:** {status}",
                    "",
                    f"**Reviewer:** {reviewer}",
                    "",
                    "---",
                    "",
                ]
            )

    return "\n".join(markdown_lines)