## Documentation Index

Fetch the complete documentation index at: https://docs.sketricgen.ai/llms.txt
Use this file to discover all available pages before exploring further.
## Overview
Python SDK for the SketricGen Chat Server API. Build AI-powered workflows, chat applications, and document analysis tools.
## Requirements

- Python 3.9+
- Dependencies: `httpx>=0.25.0`, `pydantic>=2.0.0`, `typing-extensions>=4.0.0`
## Installation

```bash
pip install sketricgen
```

Or with other package managers:

```bash
# uv
uv add sketricgen

# poetry
poetry add sketricgen
```
## Quick Start

```python
from sketricgen import SketricGenClient

# Initialize client
client = SketricGenClient(api_key="your-api-key")

# Run a workflow
response = await client.run_workflow(
    agent_id="agent-123",
    user_input="Hello, how are you?",
)
print(response.response)
```
## Features
- Run Workflows: Execute chat/workflow requests with AI agents
- Streaming: Real-time streaming responses using Server-Sent Events
- File Attachments: Attach images and PDFs to workflows
- Async & Sync: Both async and synchronous API support
- Type Safety: Full type hints for IDE support
- Error Handling: Comprehensive custom exception types
## Client Reference

### SketricGenClient

The main client for interacting with the SketricGen API.

#### Constructor

```python
client = SketricGenClient(
    api_key: str,               # Your SketricGen API key (required)
    timeout: int = 300,         # Request timeout in seconds
    upload_timeout: int = 300,  # Upload timeout for large files
    max_retries: int = 3,       # Maximum retry attempts
)
```
#### From Environment Variables

```python
# Set the SKETRICGEN_API_KEY environment variable
client = SketricGenClient.from_env()
```

| Environment Variable | Default | Description |
|---|---|---|
| `SKETRICGEN_API_KEY` | required | Your API key |
| `SKETRICGEN_TIMEOUT` | 300 | Request timeout in seconds |
| `SKETRICGEN_UPLOAD_TIMEOUT` | 300 | Upload timeout for large files |
| `SKETRICGEN_MAX_RETRIES` | 3 | Maximum retry attempts |
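For example, the same configuration can be set from the shell before starting your application (the values below are illustrative):

```shell
export SKETRICGEN_API_KEY="your-api-key"  # required
export SKETRICGEN_TIMEOUT=300             # optional overrides
export SKETRICGEN_UPLOAD_TIMEOUT=300
export SKETRICGEN_MAX_RETRIES=3
```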
### run_workflow()

Execute a workflow/chat request (async).

```python
response = await client.run_workflow(
    agent_id: str,                # Agent ID to chat with (required)
    user_input: str,              # User message, max 10000 chars (required)
    conversation_id: str = None,  # Resume a conversation
    contact_id: str = None,       # External contact ID
    file_paths: list[str] = None, # Files to attach
    stream: bool = False,         # Enable streaming
)
```

Returns: `ChatResponse` if `stream=False`, `AsyncIterator[StreamEvent]` if `stream=True`.
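The server enforces the 10000-character limit on `user_input`. If you want to fail fast before making a request, a minimal client-side guard might look like this (`check_user_input` is an illustrative helper, not part of the SDK):

```python
MAX_USER_INPUT_CHARS = 10_000  # documented limit for user_input

def check_user_input(user_input: str) -> str:
    """Return user_input unchanged, or raise ValueError if it is too long."""
    if len(user_input) > MAX_USER_INPUT_CHARS:
        raise ValueError(
            f"user_input is {len(user_input)} chars; "
            f"max is {MAX_USER_INPUT_CHARS}"
        )
    return user_input

check_user_input("Hello")            # passes through unchanged
# check_user_input("x" * 10_001)     # would raise ValueError
```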
#### Example: Non-Streaming

```python
response = await client.run_workflow(
    agent_id="agent-123",
    user_input="What is the weather like today?",
    conversation_id="conv-456",  # Optional: resume conversation
)
print(f"Response: {response.response}")
print(f"Conversation ID: {response.conversation_id}")
```
#### Example: Streaming

```python
import json

async for event in await client.run_workflow(
    agent_id="agent-123",
    user_input="Tell me a story",
    stream=True,
):
    data = json.loads(event.data)
    if data["type"] == "TEXT_MESSAGE_CONTENT":
        print(data["delta"], end="", flush=True)
    elif data["type"] == "RUN_FINISHED":
        print()
```
### run_workflow_sync()

Synchronous version of `run_workflow()`.

```python
response = client.run_workflow_sync(
    agent_id: str,
    user_input: str,
    conversation_id: str = None,
    contact_id: str = None,
    file_paths: list[str] = None,
    stream: bool = False,
)
```

Returns: `ChatResponse` if `stream=False`, `Iterator[StreamEvent]` if `stream=True`.
#### Example

```python
import json

# Non-streaming
response = client.run_workflow_sync(
    agent_id="agent-123",
    user_input="Hello!",
)
print(response.response)

# Streaming
for event in client.run_workflow_sync(
    agent_id="agent-123",
    user_input="Tell me a story",
    stream=True,
):
    data = json.loads(event.data)
    if data["type"] == "TEXT_MESSAGE_CONTENT":
        print(data["delta"], end="", flush=True)
```
## Response Models

### ChatResponse

Response from a non-streaming workflow request.

| Field | Type | Description |
|---|---|---|
| `agent_id` | `str` | Workflow/Agent ID |
| `user_id` | `str` | User identifier |
| `conversation_id` | `str` | Conversation ID for follow-up messages |
| `response` | `str` | The assistant's response text |
| `owner` | `str` | Owner of the agent |
| `error` | `bool` | Error flag (default: `False`) |
### StreamEvent

Individual event from a streaming response.

| Field | Type | Description |
|---|---|---|
| `event_type` | `str` | Type of the SSE event |
| `data` | `str` | JSON string containing event data |
| `id` | `str \| None` | Optional event ID |
## Streaming Events

The streaming API uses the AG-UI Protocol. Parse the `data` field as JSON to access event details.

### Event Types

| Event Type | Description | Key Fields |
|---|---|---|
| `RUN_STARTED` | Workflow execution started | `thread_id`, `run_id` |
| `TEXT_MESSAGE_START` | Assistant message started | `message_id`, `role` |
| `TEXT_MESSAGE_CONTENT` | Text chunk received | `message_id`, `delta` |
| `TEXT_MESSAGE_END` | Assistant message completed | `message_id` |
| `TOOL_CALL_START` | Tool/function call started | `tool_call_id`, `tool_call_name` |
| `TOOL_CALL_END` | Tool/function call completed | `tool_call_id` |
| `RUN_FINISHED` | Workflow completed | `thread_id`, `run_id` |
| `RUN_ERROR` | Workflow error occurred | `message` |
| `CUSTOM` | Custom event | varies |
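Because each event's `data` field is a JSON string of this shape, a small pure-Python helper can assemble the final message from the deltas. This is a sketch over simulated event payloads (`collect_text` and the sample data are illustrative, not part of the SDK; a real `StreamEvent.data` carries the same JSON shape):

```python
import json

def collect_text(raw_events):
    """Concatenate TEXT_MESSAGE_CONTENT deltas; raise on RUN_ERROR."""
    parts = []
    for raw in raw_events:
        data = json.loads(raw)
        if data["type"] == "TEXT_MESSAGE_CONTENT":
            parts.append(data["delta"])
        elif data["type"] == "RUN_ERROR":
            raise RuntimeError(data["message"])
    return "".join(parts)

# Simulated payloads matching the event table above
simulated = [
    json.dumps({"type": "RUN_STARTED", "run_id": "run-1"}),
    json.dumps({"type": "TEXT_MESSAGE_CONTENT", "delta": "Hello, "}),
    json.dumps({"type": "TEXT_MESSAGE_CONTENT", "delta": "world!"}),
    json.dumps({"type": "RUN_FINISHED", "run_id": "run-1"}),
]
print(collect_text(simulated))  # Hello, world!
```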
### Complete Streaming Example

```python
import json

from sketricgen import SketricGenClient

client = SketricGenClient(api_key="your-api-key")

async for event in await client.run_workflow(
    agent_id="agent-123",
    user_input="Search for info and summarize it",
    stream=True,
):
    data = json.loads(event.data)
    event_type = data["type"]

    if event_type == "RUN_STARTED":
        print(f"Started run: {data.get('run_id')}")
    elif event_type == "TEXT_MESSAGE_CONTENT":
        print(data["delta"], end="", flush=True)
    elif event_type == "TOOL_CALL_START":
        print(f"\n[Calling: {data['tool_call_name']}]")
    elif event_type == "TOOL_CALL_END":
        print("[Done]")
    elif event_type == "RUN_FINISHED":
        print("\n\nCompleted!")
    elif event_type == "RUN_ERROR":
        print(f"\nError: {data['message']}")
```
## File Attachments

Attach files to workflows for document analysis and image understanding.

### Supported File Types

| Type | MIME Types | Max Size |
|---|---|---|
| Images | `image/jpeg`, `image/png`, `image/webp`, `image/gif` | 20 MB |
| Documents | `application/pdf` | 20 MB |
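If you want to catch unsupported or oversized files before uploading, a minimal pre-flight check mirroring the table above could look like this (`validate_file` and the constants are illustrative helpers, not part of the SDK; the server performs the authoritative checks):

```python
import mimetypes
import os

# Allowed types and size limit from the table above
ALLOWED_MIME = {
    "image/jpeg", "image/png", "image/webp", "image/gif",
    "application/pdf",
}
MAX_BYTES = 20 * 1024 * 1024  # 20 MB

def validate_file(path: str) -> None:
    """Raise ValueError for unsupported types or files over 20 MB."""
    mime, _ = mimetypes.guess_type(path)
    if mime not in ALLOWED_MIME:
        raise ValueError(f"unsupported file type: {mime} ({path})")
    if os.path.getsize(path) > MAX_BYTES:
        raise ValueError(f"file exceeds 20 MB: {path}")
```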
### Single File

```python
response = await client.run_workflow(
    agent_id="agent-123",
    user_input="Summarize this document",
    file_paths=["/path/to/document.pdf"],
)
print(response.response)
```
### Multiple Files

```python
response = await client.run_workflow(
    agent_id="agent-123",
    user_input="Compare these two documents",
    file_paths=[
        "/path/to/document1.pdf",
        "/path/to/document2.pdf",
    ],
)
print(response.response)
```
### Streaming with Files

```python
import json

async for event in await client.run_workflow(
    agent_id="agent-123",
    user_input="Analyze this image",
    file_paths=["/path/to/image.png"],
    stream=True,
):
    data = json.loads(event.data)
    if data["type"] == "TEXT_MESSAGE_CONTENT":
        print(data["delta"], end="", flush=True)
```