Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions docs/source/guides/using-aiqtoolkit-ui-and-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,15 @@ result back to the client. The transaction schema is defined by the workflow.
```json
"data": { "value": "No, 4 + 4 (which is 8) is not greater than the current hour of the day (which is 15)." }
```
### Generate Streaming Raw Transaction
- **Route:** `/generate/stream/full`
### Generate Streaming Full Transaction
- **Route:** `/generate/full`
- **Description:** Same as `/generate/stream` but provides raw `IntermediateStep` objects
without any step adaptor translations. This endpoint is used for remote evaluation of workflows.
without any step adaptor translations. Use the `filter_steps` query parameter to filter
steps by type (comma-separated list), or set it to `none` to suppress all intermediate steps.
- **HTTP Request Example:**
```bash
curl --request POST \
--url http://localhost:8000/generate/stream/full \
--url http://localhost:8000/generate/full \
--header 'Content-Type: application/json' \
--data '{
"input_message": "Is 4 + 4 greater than the current hour of the day"
Expand All @@ -132,6 +133,23 @@ result back to the client. The transaction schema is defined by the workflow.
```json
"data": {"value":"No, 4 + 4 (which is 8) is not greater than the current hour of the day (which is 11)."}
```
- **HTTP Request Example with Filter:**
By default, all intermediate steps are streamed. Use the `filter_steps` query parameter to filter steps by type (comma-separated list), or set it to `none` to suppress all intermediate steps.

Suppress all intermediate steps (only get final output):
```bash
curl --request POST \
--url 'http://localhost:8000/generate/full?filter_steps=none' \
--header 'Content-Type: application/json' \
--data '{"input_message": "Is 4 + 4 greater than the current hour of the day"}'
```
Get only specific step types:
```bash
curl --request POST \
--url 'http://localhost:8000/generate/full?filter_steps=LLM_END,TOOL_END' \
--header 'Content-Type: application/json' \
--data '{"input_message": "Is 4 + 4 greater than the current hour of the day"}'
```

### Chat Non-Streaming Transaction
- **Route:** `/chat`
Expand Down
2 changes: 1 addition & 1 deletion src/aiq/eval/remote_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async def run_workflow_remote_single(self, session: aiohttp.ClientSession, item:

try:
# Use the streaming endpoint
endpoint = f"{self.config.endpoint}/generate/stream/full"
endpoint = f"{self.config.endpoint}/generate/full"
async with session.post(endpoint, json=payload) as response:
response.raise_for_status() # Raise an exception for HTTP errors

Expand Down
41 changes: 25 additions & 16 deletions src/aiq/front_ends/fastapi/fastapi_front_end_plugin_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from aiq.front_ends.fastapi.job_store import JobStore
from aiq.front_ends.fastapi.response_helpers import generate_single_response
from aiq.front_ends.fastapi.response_helpers import generate_streaming_response_as_str
from aiq.front_ends.fastapi.response_helpers import generate_streaming_response_raw_as_str
from aiq.front_ends.fastapi.response_helpers import generate_streaming_response_full_as_str
from aiq.front_ends.fastapi.step_adaptor import StepAdaptor
from aiq.front_ends.fastapi.websocket import AIQWebSocket
from aiq.runtime.session import AIQSessionManager
Expand Down Expand Up @@ -396,14 +396,16 @@ async def get_stream():

def get_streaming_raw_endpoint(streaming: bool, result_type: type | None, output_type: type | None):

async def get_stream():
async def get_stream(filter_steps: str | None = None):

return StreamingResponse(headers={"Content-Type": "text/event-stream; charset=utf-8"},
content=generate_streaming_response_raw_as_str(None,
session_manager=session_manager,
streaming=streaming,
result_type=result_type,
output_type=output_type))
content=generate_streaming_response_full_as_str(
None,
session_manager=session_manager,
streaming=streaming,
result_type=result_type,
output_type=output_type,
filter_steps=filter_steps))

return get_stream

Expand Down Expand Up @@ -443,14 +445,16 @@ def post_streaming_raw_endpoint(request_type: type,
Stream raw intermediate steps without any step adaptor translations.
"""

async def post_stream(payload: request_type):
async def post_stream(payload: request_type, filter_steps: str | None = None):

return StreamingResponse(headers={"Content-Type": "text/event-stream; charset=utf-8"},
content=generate_streaming_response_raw_as_str(payload,
session_manager=session_manager,
streaming=streaming,
result_type=result_type,
output_type=output_type))
content=generate_streaming_response_full_as_str(
payload,
session_manager=session_manager,
streaming=streaming,
result_type=result_type,
output_type=output_type,
filter_steps=filter_steps))

return post_stream

Expand Down Expand Up @@ -478,11 +482,14 @@ async def post_stream(payload: request_type):
)

app.add_api_route(
path=f"{endpoint.path}/stream/full",
path=f"{endpoint.path}/full",
endpoint=get_streaming_raw_endpoint(streaming=True,
result_type=GenerateStreamResponseType,
output_type=GenerateStreamResponseType),
methods=[endpoint.method],
description="Stream raw intermediate steps without any step adaptor translations.\n"
"Use filter_steps query parameter to filter steps by type (comma-separated list) or\
set to 'none' to suppress all intermediate steps.",
)

elif (endpoint.method == "POST"):
Expand Down Expand Up @@ -510,14 +517,16 @@ async def post_stream(payload: request_type):
)

app.add_api_route(
path=f"{endpoint.path}/stream/full",
path=f"{endpoint.path}/full",
endpoint=post_streaming_raw_endpoint(request_type=GenerateBodyType,
streaming=True,
result_type=GenerateStreamResponseType,
output_type=GenerateStreamResponseType),
methods=[endpoint.method],
response_model=GenerateStreamResponseType,
description="Stream raw intermediate steps without any step adaptor translations",
description="Stream raw intermediate steps without any step adaptor translations.\n"
"Use filter_steps query parameter to filter steps by type (comma-separated list) or \
set to 'none' to suppress all intermediate steps.",
responses={500: response_500},
)

Expand Down
52 changes: 33 additions & 19 deletions src/aiq/front_ends/fastapi/response_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,26 @@ async def generate_single_response(
return await runner.result(to_type=result_type)


async def generate_streaming_response_raw(payload: typing.Any,
*,
session_manager: AIQSessionManager,
streaming: bool,
result_type: type | None = None,
output_type: type | None = None) -> AsyncGenerator[AIQResponseSerializable]:
async def generate_streaming_response_full(payload: typing.Any,
*,
session_manager: AIQSessionManager,
streaming: bool,
result_type: type | None = None,
output_type: type | None = None,
filter_steps: str | None = None) -> AsyncGenerator[AIQResponseSerializable]:
"""
Similar to generate_streaming_response but provides raw AIQResponseIntermediateStep objects
without any step adaptor translations.
"""
# Parse filter_steps into a set of allowed types if provided
# Special case: if filter_steps is "none", suppress all steps
allowed_types = None
if filter_steps:
if filter_steps.lower() == "none":
allowed_types = set() # Empty set means no steps allowed
else:
allowed_types = set(filter_steps.split(','))

async with session_manager.run(payload) as runner:
q: AsyncIOProducerConsumerQueue[AIQResponseSerializable] = AsyncIOProducerConsumerQueue()

Expand All @@ -150,7 +160,9 @@ async def pull_result():

async for item in q:
if (isinstance(item, AIQResponseIntermediateStep)):
yield item
# Filter intermediate steps if filter_steps is provided
if allowed_types is None or item.type in allowed_types:
yield item
else:
yield AIQResponsePayloadOutput(payload=item)
except Exception as e:
Expand All @@ -160,20 +172,22 @@ async def pull_result():
await q.close()


async def generate_streaming_response_raw_as_str(payload: typing.Any,
*,
session_manager: AIQSessionManager,
streaming: bool,
result_type: type | None = None,
output_type: type | None = None) -> AsyncGenerator[str]:
async def generate_streaming_response_full_as_str(payload: typing.Any,
*,
session_manager: AIQSessionManager,
streaming: bool,
result_type: type | None = None,
output_type: type | None = None,
filter_steps: str | None = None) -> AsyncGenerator[str]:
"""
Similar to generate_streaming_response_raw but converts the response to a string format.
Similar to generate_streaming_response_full but converts the response to a string format.
"""
async for item in generate_streaming_response_raw(payload,
session_manager=session_manager,
streaming=streaming,
result_type=result_type,
output_type=output_type):
async for item in generate_streaming_response_full(payload,
session_manager=session_manager,
streaming=streaming,
result_type=result_type,
output_type=output_type,
filter_steps=filter_steps):
if (isinstance(item, AIQResponseIntermediateStep) or isinstance(item, AIQResponsePayloadOutput)):
yield item.get_stream_data()
else:
Expand Down
6 changes: 3 additions & 3 deletions tests/aiq/eval/test_remote_evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def rag_streamed_intermediate_payloads(rag_intermediate_steps) -> list[str]:
@pytest.fixture
def stream_response_app(rag_eval_input, rag_streamed_intermediate_payloads):
"""
Returns an aiohttp app with a /generate/stream/full route that simulates streaming:
Returns an aiohttp app with a /generate/full route that simulates streaming:
- One final output (data line)
- Several intermediate steps (intermediate_data lines)
"""
Expand All @@ -72,7 +72,7 @@ async def stream_response(request):
return resp

app = web.Application()
app.router.add_post("/generate/stream/full", stream_response)
app.router.add_post("/generate/full", stream_response)
return app


Expand Down Expand Up @@ -140,7 +140,7 @@ async def stream_response(request):
return resp

app = web.Application()
app.router.add_post("/generate/stream/full", stream_response)
app.router.add_post("/generate/full", stream_response)
server = TestServer(app)
await server.start_server()

Expand Down
Loading