From 3e415c00e1af67ab16fbfc1a26f9e2e713bee5ce Mon Sep 17 00:00:00 2001 From: habema Date: Thu, 14 Aug 2025 15:55:53 +0300 Subject: [PATCH 1/2] implement structured storage with additional tables for messages and tool calls --- docs/sessions.md | 32 +++ examples/basic/structured_session_demo.py | 128 ++++++++++ src/agents/memory/session.py | 162 ++++++++++++- tests/test_structured_session.py | 273 ++++++++++++++++++++++ 4 files changed, 587 insertions(+), 8 deletions(-) create mode 100644 examples/basic/structured_session_demo.py create mode 100644 tests/test_structured_session.py diff --git a/docs/sessions.md b/docs/sessions.md index c66cb85ae..b37495c0e 100644 --- a/docs/sessions.md +++ b/docs/sessions.md @@ -141,6 +141,38 @@ result = await Runner.run( ) ``` +### Structured storage + +By default, SQLiteSession stores all conversation events as JSON blobs in a single table. You can enable structured storage to create additional tables for messages and tool calls: + +```python +from agents import SQLiteSession + +# Enable structured storage +session = SQLiteSession( + "user_123", + "conversations.db", + structured=True +) + +# This creates additional tables: +# - agent_conversation_messages: stores user, assistant, system messages +# - agent_tool_calls: stores tool call requests and outputs +``` + +With structured storage enabled, you can query conversations using standard SQL: + +```sql +-- Get all user messages in a session +SELECT content FROM agent_conversation_messages +WHERE session_id = 'user_123' AND role = 'user'; + +-- Get all tool calls and their results +SELECT tool_name, arguments, output, status +FROM agent_tool_calls +WHERE session_id = 'user_123'; +``` + ### Multiple sessions ```python diff --git a/examples/basic/structured_session_demo.py b/examples/basic/structured_session_demo.py new file mode 100644 index 000000000..2e8fceb03 --- /dev/null +++ b/examples/basic/structured_session_demo.py @@ -0,0 +1,128 @@ +"""A script to test and demonstrate the structured session storage feature.""" + +import asyncio +import random +import sqlite3 + +from agents import Agent, Runner, SQLiteSession, function_tool + + +async def main(): + # Create a tool + @function_tool + def get_random_number(max_val: int) -> int: + """Get a random number between 0 and max_val.""" + return random.randint(0, max_val) + + # Create an agent + agent = Agent( + name="Assistant", + instructions="Reply very concisely. When using tools, explain what you're doing.", + tools=[get_random_number], + ) + + # Create a session with structured storage enabled + db_path = "structured_conversation_demo.db" + session = SQLiteSession("demo_session", db_path, structured=True) + + print("=== Structured Session Storage Demo ===") + print("This demo shows structured storage that makes conversations easy to query.\n") + + # First turn + print("First turn:") + print("User: Pick a random number between 0 and 100") + result = await Runner.run( + agent, + "Pick a random number between 0 and 100", + session=session + ) + print(f"Assistant: {result.final_output}") + print() + + # Second turn - the agent will remember the previous conversation + print("Second turn:") + print("User: What number did you pick for me?") + result = await Runner.run( + agent, + "What number did you pick for me?", + session=session + ) + print(f"Assistant: {result.final_output}") + print() + + # Third turn - another tool call + print("Third turn:") + print("User: Now pick a number between 0 and 50") + result = await Runner.run( + agent, + "Now pick a number between 0 and 50", + session=session + ) + print(f"Assistant: {result.final_output}") + print() + + print("=== Conversation Complete ===") + print(f"Data stored in: {db_path}") + print() + + # Now demonstrate the structured storage benefits + print("=== Structured Storage Analysis ===") + print("With structured storage, you can easily query the conversation:") + print() + + conn = sqlite3.connect(db_path) + + # Show all messages + print("1. All conversation messages:") + cursor = conn.execute(""" + SELECT role, content FROM agent_conversation_messages + WHERE session_id = 'demo_session' + ORDER BY created_at + """) + for role, content in cursor.fetchall(): + content_preview = content[:60] + "..." if len(content) > 60 else content + print(f" {role}: {content_preview}") + print() + + # Show all tool calls + print("2. All tool calls and results:") + cursor = conn.execute(""" + SELECT tool_name, arguments, output, status + FROM agent_tool_calls + WHERE session_id = 'demo_session' + ORDER BY created_at + """) + for tool_name, arguments, output, status in cursor.fetchall(): + print(f" Tool: {tool_name}") + print(f" Args: {arguments}") + print(f" Result: {output}") + print(f" Status: {status}") + print() + + # Show message count by role + print("3. Message count by role:") + cursor = conn.execute(""" + SELECT role, COUNT(*) as count + FROM agent_conversation_messages + WHERE session_id = 'demo_session' + GROUP BY role + """) + for role, count in cursor.fetchall(): + print(f" {role}: {count} messages") + print() + + conn.close() + session.close() + + print("=== Query Examples ===") + print("You can now run SQL queries like:") + print("• SELECT * FROM agent_conversation_messages WHERE role = 'user';") + print("• SELECT tool_name, COUNT(*) FROM agent_tool_calls GROUP BY tool_name;") + print("• SELECT * FROM agent_tool_calls WHERE status = 'completed';") + print() + print("This makes conversation analysis, debugging, and building editing") + print("tools much easier than parsing JSON blobs!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/agents/memory/session.py b/src/agents/memory/session.py index 8db0971eb..f95987c25 100644 --- a/src/agents/memory/session.py +++ b/src/agents/memory/session.py @@ -118,6 +118,10 @@ def __init__( db_path: str | Path = ":memory:", sessions_table: str = "agent_sessions", messages_table: str = "agent_messages", + *, + structured: bool = False, + conversation_table: str = "agent_conversation_messages", + tool_calls_table: str = "agent_tool_calls", ): """Initialize the SQLite session. @@ -127,11 +131,20 @@ def __init__( sessions_table: Name of the table to store session metadata. Defaults to 'agent_sessions' messages_table: Name of the table to store message data. Defaults to 'agent_messages' + structured: If True, enables structured storage mode, creating + additional tables for messages and tool calls. Defaults to False. + conversation_table: Name for the structured conversation messages table. + Defaults to 'agent_conversation_messages'. + tool_calls_table: Name for the structured tool calls table. + Defaults to 'agent_tool_calls'. """ self.session_id = session_id self.db_path = db_path self.sessions_table = sessions_table self.messages_table = messages_table + self.structured = structured + self.conversation_table = conversation_table + self.tool_calls_table = tool_calls_table self._local = threading.local() self._lock = threading.Lock() @@ -141,11 +154,13 @@ def __init__( if self._is_memory_db: self._shared_connection = sqlite3.connect(":memory:", check_same_thread=False) self._shared_connection.execute("PRAGMA journal_mode=WAL") + self._shared_connection.execute("PRAGMA foreign_keys=ON") self._init_db_for_connection(self._shared_connection) else: # For file databases, initialize the schema once since it persists init_conn = sqlite3.connect(str(self.db_path), check_same_thread=False) init_conn.execute("PRAGMA journal_mode=WAL") + init_conn.execute("PRAGMA foreign_keys=ON") self._init_db_for_connection(init_conn) init_conn.close() @@ -162,6 +177,7 @@ def _get_connection(self) -> sqlite3.Connection: check_same_thread=False, ) self._local.connection.execute("PRAGMA journal_mode=WAL") + self._local.connection.execute("PRAGMA foreign_keys=ON") assert isinstance(self._local.connection, sqlite3.Connection), ( f"Expected sqlite3.Connection, got {type(self._local.connection)}" ) @@ -201,6 +217,63 @@ def _init_db_for_connection(self, conn: sqlite3.Connection) -> None: conn.commit() + # Create additional structured tables if enabled + if getattr(self, "structured", False): + # Conversation messages table + conn.execute( + f""" + CREATE TABLE IF NOT EXISTS {self.conversation_table} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + raw_event_id INTEGER NOT NULL, + role TEXT, + content TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (session_id) REFERENCES {self.sessions_table} (session_id) + ON DELETE CASCADE, + FOREIGN KEY (raw_event_id) REFERENCES {self.messages_table} (id) + ON DELETE CASCADE + ) + """ + ) + + conn.execute( + f""" + CREATE INDEX IF NOT EXISTS idx_{self.conversation_table}_session_id + ON {self.conversation_table} (session_id, created_at) + """ + ) + + # Tool calls table + conn.execute( + f""" + CREATE TABLE IF NOT EXISTS {self.tool_calls_table} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + raw_event_id INTEGER NOT NULL, + call_id TEXT, + tool_name TEXT, + arguments JSON, + output JSON, + status TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (session_id) REFERENCES {self.sessions_table} (session_id) + ON DELETE CASCADE, + FOREIGN KEY (raw_event_id) REFERENCES {self.messages_table} (id) + ON DELETE CASCADE + ) + """ + ) + + conn.execute( + f""" + CREATE INDEX IF NOT EXISTS idx_{self.tool_calls_table}_session_id + ON {self.tool_calls_table} (session_id, created_at) + """ + ) + + conn.commit() + async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: """Retrieve the conversation history for this session. @@ -278,13 +351,86 @@ def _add_items_sync(): ) # Add items - message_data = [(self.session_id, json.dumps(item)) for item in items] - conn.executemany( - f""" - INSERT INTO {self.messages_table} (session_id, message_data) VALUES (?, ?) - """, - message_data, - ) + if not self.structured: + # Flat storage: bulk insert for performance + message_data = [(self.session_id, json.dumps(item)) for item in items] + conn.executemany( + f""" + INSERT INTO {self.messages_table} (session_id, message_data) VALUES (?, ?) + """, + message_data, + ) + else: + # Structured storage: insert each item individually so we can capture rowid + for item in items: + raw_json = json.dumps(item) + cursor = conn.execute( + f""" + INSERT INTO {self.messages_table} (session_id, message_data) + VALUES (?, ?) + RETURNING id + """, + (self.session_id, raw_json), + ) + raw_event_id = cursor.fetchone()[0] + + # Handle structured inserts + if "role" in item: + role = item.get("role") + content_val = item.get("content") + try: + content_str = ( + json.dumps(content_val) + if content_val is not None + else None + ) + except TypeError: + content_str = str(content_val) + + conn.execute( + f""" + INSERT INTO {self.conversation_table} + (session_id, raw_event_id, role, content) + VALUES (?, ?, ?, ?) + """, + (self.session_id, raw_event_id, role, content_str), + ) + + event_type = item.get("type") + if event_type == "function_call": + call_id = item.get("call_id") + tool_name = item.get("name") + arguments_val = item.get("arguments") + conn.execute( + f""" + INSERT INTO {self.tool_calls_table} + (session_id, raw_event_id, call_id, tool_name, arguments, status) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + self.session_id, + raw_event_id, + call_id, + tool_name, + arguments_val, + item.get("status"), + ), + ) + elif event_type == "function_call_output": + call_id = item.get("call_id") + output_val = item.get("output") + conn.execute( + f""" + UPDATE {self.tool_calls_table} + SET output = ?, status = 'completed' + WHERE session_id = ? AND call_id = ? + """, + ( + json.dumps(output_val) if output_val is not None else None, + self.session_id, + call_id, + ), + ) # Update session timestamp conn.execute( @@ -326,6 +472,7 @@ def _pop_item_sync(): ) result = cursor.fetchone() + conn.commit() if result: @@ -334,7 +481,6 @@ def _pop_item_sync(): item = json.loads(message_data) return item except json.JSONDecodeError: - # Return None for corrupted JSON entries (already deleted) return None return None diff --git a/tests/test_structured_session.py b/tests/test_structured_session.py new file mode 100644 index 000000000..28d4cbbfa --- /dev/null +++ b/tests/test_structured_session.py @@ -0,0 +1,273 @@ +"""Tests for structured session storage functionality.""" + +import sqlite3 +import tempfile +from pathlib import Path + +import pytest + +from agents import Agent, Runner, SQLiteSession, function_tool +from agents.items import TResponseInputItem + +from .fake_model import FakeModel +from .test_responses import get_text_message + + +@pytest.mark.asyncio +async def test_structured_session_creation(): + """Test that structured session creates the additional tables.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_structured.db" + session = SQLiteSession("test_session", db_path, structured=True) + + # Check that the structured tables were created + conn = sqlite3.connect(str(db_path)) + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" + ) + tables = [row[0] for row in cursor.fetchall()] + conn.close() + + expected_tables = [ + "agent_conversation_messages", + "agent_messages", + "agent_sessions", + "agent_tool_calls", + ] + for table in expected_tables: + assert table in tables + + session.close() + + +@pytest.mark.asyncio +async def test_structured_session_disabled_by_default(): + """Test that structured tables are not created when structured=False.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_flat.db" + session = SQLiteSession("test_session", db_path, structured=False) + + # Check that only the basic tables were created + conn = sqlite3.connect(str(db_path)) + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" + ) + tables = [row[0] for row in cursor.fetchall()] + conn.close() + + expected_tables = ["agent_messages", "agent_sessions"] + for table in expected_tables: + assert table in tables + + # Structured tables should not exist + assert "agent_conversation_messages" not in tables + assert "agent_tool_calls" not in tables + + session.close() + + +@pytest.mark.asyncio +async def test_structured_session_conversation_flow(): + """Test a full conversation flow with structured storage.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_conversation.db" + session = SQLiteSession("test_session", db_path, structured=True) + + # Create a simple tool for testing + @function_tool + def get_test_number(max_val: int = 100) -> int: + """Get a test number.""" + return 42 + + model = FakeModel() + agent = Agent(name="test", model=model, tools=[get_test_number]) + + # Simulate a simple message without tool calls for this test + model.set_next_output([get_text_message("I'll pick a random number: 42")]) + + await Runner.run( + agent, + "Pick a random number", + session=session + ) + + # Check that data was stored in structured tables + conn = sqlite3.connect(str(db_path)) + + # Check conversation messages table + cursor = conn.execute( + """SELECT role, content FROM agent_conversation_messages + WHERE session_id = ? ORDER BY created_at""", + ("test_session",) + ) + conversation_rows = cursor.fetchall() + + # Should have user message and potentially assistant message + assert len(conversation_rows) >= 1 + assert conversation_rows[0][0] == "user" # First should be user role + assert "Pick a random number" in conversation_rows[0][1] + + # Check tool calls table (should be empty for this simple message test) + cursor = conn.execute( + "SELECT COUNT(*) FROM agent_tool_calls WHERE session_id = ?", + ("test_session",) + ) + tool_call_count = cursor.fetchone()[0] + assert tool_call_count == 0 # No tool calls in this simple test + + conn.close() + session.close() + + +@pytest.mark.asyncio +async def test_structured_session_backward_compatibility(): + """Test that structured=True doesn't break existing functionality.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_compat.db" + session = SQLiteSession("test_session", db_path, structured=True) + + model = FakeModel() + agent = Agent(name="test", model=model) + + # First turn + model.set_next_output([get_text_message("Hello!")]) + result1 = await Runner.run(agent, "Hi there", session=session) + assert result1.final_output == "Hello!" + + # Second turn - should have conversation history + model.set_next_output([get_text_message("I remember you said hi")]) + result2 = await Runner.run(agent, "What did I say?", session=session) + assert result2.final_output == "I remember you said hi" + + # Verify conversation history is working + items = await session.get_items() + assert len(items) >= 2 # Should have multiple items from the conversation + + session.close() + + +@pytest.mark.asyncio +async def test_structured_session_pop_item(): + """Test that pop_item works correctly with structured storage.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_pop.db" + session = SQLiteSession("test_session", db_path, structured=True) + + # Add some test items + items: list[TResponseInputItem] = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ] + await session.add_items(items) + + # Pop the last item + popped = await session.pop_item() + assert popped is not None + assert popped.get("role") == "assistant" + assert popped.get("content") == "Hi there!" + + # Check that structured tables are also cleaned up + conn = sqlite3.connect(str(db_path)) + cursor = conn.execute( + "SELECT COUNT(*) FROM agent_conversation_messages WHERE session_id = ?", + ("test_session",) + ) + count = cursor.fetchone()[0] + conn.close() + + # Should only have 1 message left (the user message) + assert count == 1 + + session.close() + + +@pytest.mark.asyncio +async def test_structured_session_clear(): + """Test that clear_session works correctly with structured storage.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_clear.db" + session = SQLiteSession("test_session", db_path, structured=True) + + # Add some test items + items: list[TResponseInputItem] = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + { + "type": "function_call", + "call_id": "call_123", + "name": "test_tool", + "arguments": '{"param": "value"}', + "status": "completed" + } + ] + await session.add_items(items) + + # Clear the session + await session.clear_session() + + # Check that all tables are empty + conn = sqlite3.connect(str(db_path)) + + cursor = conn.execute( + "SELECT COUNT(*) FROM agent_messages WHERE session_id = ?", + ("test_session",) + ) + assert cursor.fetchone()[0] == 0 + + cursor = conn.execute( + "SELECT COUNT(*) FROM agent_conversation_messages WHERE session_id = ?", + ("test_session",) + ) + assert cursor.fetchone()[0] == 0 + + cursor = conn.execute( + "SELECT COUNT(*) FROM agent_tool_calls WHERE session_id = ?", + ("test_session",) + ) + assert cursor.fetchone()[0] == 0 + + conn.close() + session.close() + + +@pytest.mark.asyncio +async def test_flat_vs_structured_storage_equivalence(): + """Test that flat and structured storage produce equivalent get_items results.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path_flat = Path(temp_dir) / "test_flat.db" + db_path_structured = Path(temp_dir) / "test_structured.db" + + session_flat = SQLiteSession("test_session", db_path_flat, structured=False) + session_structured = SQLiteSession("test_session", db_path_structured, structured=True) + + # Add the same items to both sessions + items: list[TResponseInputItem] = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + { + "type": "function_call", + "call_id": "call_123", + "name": "test_tool", + "arguments": '{"param": "value"}', + "status": "completed" + }, + { + "type": "function_call_output", + "call_id": "call_123", + "output": "result" + } + ] + + await session_flat.add_items(items) + await session_structured.add_items(items) + + # Get items from both sessions + items_flat = await session_flat.get_items() + items_structured = await session_structured.get_items() + + # Should be identical + assert len(items_flat) == len(items_structured) + assert items_flat == items_structured + + session_flat.close() + session_structured.close() From 2e450b360ab7119416477cf1779c01235ca1b8df Mon Sep 17 00:00:00 2001 From: habema Date: Thu, 14 Aug 2025 15:56:47 +0300 Subject: [PATCH 2/2] fix mypy --- examples/realtime/app/server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/realtime/app/server.py b/examples/realtime/app/server.py index 73fcf3e56..04f3def43 100644 --- a/examples/realtime/app/server.py +++ b/examples/realtime/app/server.py @@ -4,11 +4,12 @@ import logging import struct from contextlib import asynccontextmanager -from typing import TYPE_CHECKING, Any, assert_never +from typing import TYPE_CHECKING, Any from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles +from typing_extensions import assert_never from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent