This example shows how to protect streaming LLM responses, detecting threats and PII as tokens arrive rather than waiting for the complete response.
Overview
Streaming presents unique security challenges:
- Responses arrive token-by-token
- Threats may span multiple chunks
- Users see partial content before full analysis
PromptGuard handles streaming with real-time scanning.
How Streaming Protection Works

PromptGuard offers three levels of streaming protection: scanning the input prompt before the stream starts, scanning the full response once the stream completes, and scanning buffered chunks in real time as they arrive. Each level trades a little extra latency for stronger coverage (see the comparison table further down).
Implementation
Auto-Instrumentation (Recommended)
```python
import promptguard

promptguard.init(
    api_key="pg_xxx",
    mode="enforce",
    scan_responses=True,
)

from openai import OpenAI

client = OpenAI()

# Streaming works exactly as before - protection is automatic
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story about a hacker"}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
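Because the prompt is scanned on the way in, a flagged input in enforce mode is rejected before any tokens are generated. A minimal sketch of handling that case, assuming the instrumented client raises `promptguard.PromptGuardBlockedError` for a blocked prompt just as it does for a blocked response (the injection-style prompt is only illustrative):

```python
import promptguard
from openai import OpenAI

client = OpenAI()  # already instrumented by promptguard.init() above

try:
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Ignore all previous instructions..."}],
        stream=True,
    )
    for chunk in stream:
        print(chunk.choices[0].delta.content or "", end="", flush=True)
except promptguard.PromptGuardBlockedError as e:
    # Assumption: a blocked prompt is rejected before the provider call,
    # so the user has not seen any partial content yet
    print(f"Prompt blocked: {e.decision.threat_type}")
```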
With Response Scanning
Enable `scan_responses=True` to scan the complete response after streaming:

```python
promptguard.init(
    api_key="pg_xxx",
    mode="enforce",
    scan_responses=True,  # Scan after stream completes
)

# If the full response contains threats, an error is raised
# after the stream completes
try:
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    )

    full_response = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        full_response += content
        print(content, end="", flush=True)

    # Response scan happens here (at stream end)
    print("\n\nStream completed safely!")
except promptguard.PromptGuardBlockedError as e:
    print(f"\n\nResponse contained: {e.decision.threat_type}")
```
Real-Time Chunk Scanning
For immediate threat detection during streaming:
```python
from promptguard import GuardClient

guard = GuardClient(api_key="pg_xxx")

def secure_stream(messages: list):
    """Stream with real-time scanning."""
    from openai import OpenAI

    client = OpenAI()
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True,
    )

    buffer = ""
    chunk_size = 50  # Scan every 50 characters

    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        buffer += content

        # Scan when buffer reaches threshold
        if len(buffer) >= chunk_size:
            decision = guard.scan(
                messages=[{"role": "assistant", "content": buffer}],
                direction="output",
            )
            if decision.blocked:
                yield "[CONTENT BLOCKED]"
                return
            if decision.redacted:
                yield decision.redacted_messages[0]["content"]
            else:
                yield buffer
            buffer = ""

    # Scan remaining buffer
    if buffer:
        decision = guard.scan(
            messages=[{"role": "assistant", "content": buffer}],
            direction="output",
        )
        if decision.blocked:
            yield "[CONTENT BLOCKED]"
        elif decision.redacted:
            yield decision.redacted_messages[0]["content"]
        else:
            yield buffer

# Usage
for safe_chunk in secure_stream([{"role": "user", "content": "Tell me a story"}]):
    print(safe_chunk, end="", flush=True)
```
Node.js Streaming
Auto-Instrumentation
```typescript
import { init } from 'promptguard-sdk';
import OpenAI from 'openai';

init({
  apiKey: 'pg_xxx',
  mode: 'enforce',
  scanResponses: true,
});

const client = new OpenAI();

async function streamChat() {
  const stream = await client.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: 'Write a poem' }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);
  }
}

streamChat();
```
With Vercel AI SDK
```typescript
import { init, promptGuardMiddleware } from 'promptguard-sdk';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

init({ apiKey: 'pg_xxx' });

async function handler(req: Request) {
  const { messages } = await req.json();

  const result = await streamText({
    model: openai('gpt-4'),
    messages,
    experimental_middleware: promptGuardMiddleware({
      scanResponses: true,
    }),
  });

  return result.toDataStreamResponse();
}
```
Server-Sent Events (SSE)
```typescript
import { init, GuardClient } from 'promptguard-sdk';
import OpenAI from 'openai';
import { Response } from 'express';

init({ apiKey: 'pg_xxx' });

async function streamSSE(res: Response, messages: any[]) {
  const client = new OpenAI();
  const guard = new GuardClient({ apiKey: 'pg_xxx' });

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  const stream = await client.chat.completions.create({
    model: 'gpt-4',
    messages,
    stream: true,
  });

  let buffer = '';

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    buffer += content;

    // Scan periodically
    if (buffer.length >= 100) {
      const decision = await guard.scanAsync({
        messages: [{ role: 'assistant', content: buffer }],
        direction: 'output',
      });

      if (decision.blocked) {
        res.write('data: [BLOCKED]\n\n');
        res.end();
        return;
      }

      const safeContent = decision.redacted
        ? decision.redactedMessages[0].content
        : buffer;
      res.write(`data: ${JSON.stringify({ content: safeContent })}\n\n`);
      buffer = '';
    }
  }

  // Scan and send any remaining buffered content
  if (buffer) {
    const decision = await guard.scanAsync({
      messages: [{ role: 'assistant', content: buffer }],
      direction: 'output',
    });
    if (!decision.blocked) {
      const safeContent = decision.redacted
        ? decision.redactedMessages[0].content
        : buffer;
      res.write(`data: ${JSON.stringify({ content: safeContent })}\n\n`);
    }
  }

  res.write('data: [DONE]\n\n');
  res.end();
}
```
FastAPI Streaming
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import promptguard
from openai import OpenAI

promptguard.init(api_key="pg_xxx", scan_responses=True)

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def stream_chat(request: dict):
    messages = request.get("messages", [])

    async def generate():
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            stream=True,
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content or ""
            yield f"data: {content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
```
Handling Blocked Streams
When a threat is detected mid-stream:
```python
import logging

from promptguard import PromptGuardBlockedError

def safe_stream_handler(messages):
    try:
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            stream=True,
        )
        for chunk in stream:
            yield chunk.choices[0].delta.content or ""
    except PromptGuardBlockedError as e:
        # Stream was blocked - notify user
        yield "\n\n[Response interrupted for security reasons]"
        # Log the event
        logging.warning(f"Stream blocked: {e.decision.event_id}")
```
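The generator above slots straight into the FastAPI endpoint from the previous section; a minimal sketch (the `/chat/stream-safe` route name is illustrative) that relays its output as SSE, so a blocked stream ends with the interruption notice rather than an unhandled error:

```python
from fastapi.responses import StreamingResponse

@app.post("/chat/stream-safe")
async def stream_chat_safe(request: dict):
    messages = request.get("messages", [])

    def generate():
        # safe_stream_handler already converts a blocked stream into a
        # user-facing interruption message, so nothing propagates here
        for text in safe_stream_handler(messages):
            yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```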
Performance and coverage of each mode:

| Mode | Latency | Security |
|---|---|---|
| Input-only scanning | Minimal | Good |
| Full response scan | +50-100ms at end | Better |
| Real-time chunk scan | +20-50ms per chunk | Best |
Recommendations
- For chat interfaces: Use input scanning + end-of-stream response scan
- For sensitive data: Use real-time chunk scanning
- For low-latency needs: Use input-only scanning with asynchronous response analysis (see the sketch below)
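A minimal sketch of the low-latency pattern, assuming that `scan_responses=False` leaves only prompt scanning on the instrumented client; the finished response is then analyzed off the hot path with `GuardClient.scan`, so the user never waits on an output scan (the `analyze_later` helper and background thread are illustrative):

```python
import threading

import promptguard
from promptguard import GuardClient
from openai import OpenAI

# Input-only enforcement: prompts are scanned before the stream starts.
# Assumption: scan_responses=False disables the end-of-stream response scan.
promptguard.init(api_key="pg_xxx", mode="enforce", scan_responses=False)

client = OpenAI()
guard = GuardClient(api_key="pg_xxx")

def analyze_later(full_response: str) -> None:
    """Scan the finished response off the hot path, for logging/alerting only."""
    decision = guard.scan(
        messages=[{"role": "assistant", "content": full_response}],
        direction="output",
    )
    if decision.blocked:
        print("\n[post-hoc] response flagged by PromptGuard")

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
)

full_response = ""
for chunk in stream:
    content = chunk.choices[0].delta.content or ""
    full_response += content
    print(content, end="", flush=True)

# Analyze the response in the background without blocking the caller
threading.Thread(target=analyze_later, args=(full_response,), daemon=True).start()
```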
Best Practices
- Buffer appropriately - Don't scan every token; batch into meaningful chunks (see the sketch after this list)
- Handle interruptions gracefully - Users may see partial content
- Log blocked streams - Track for security analysis
- Consider UX - Decide if you show partial content before blocking
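One way to batch into meaningful chunks is to scan at rough sentence boundaries instead of a fixed character count. A minimal sketch reusing the `GuardClient` output scan from the real-time example; the boundary regex and the `flush_safe`/`sentence_buffered` helpers are illustrative, not part of the SDK:

```python
import re

from promptguard import GuardClient

guard = GuardClient(api_key="pg_xxx")

SENTENCE_END = re.compile(r"[.!?]\s")  # crude boundary heuristic (illustrative)

def flush_safe(buffer: str):
    """Scan a buffered span and return the text that is safe to emit."""
    decision = guard.scan(
        messages=[{"role": "assistant", "content": buffer}],
        direction="output",
    )
    if decision.blocked:
        return None  # caller should stop the stream
    if decision.redacted:
        return decision.redacted_messages[0]["content"]
    return buffer

def sentence_buffered(chunks):
    """Yield scanned text roughly one sentence at a time."""
    buffer = ""
    for content in chunks:
        buffer += content
        # Flush once at least one complete sentence has accumulated
        if SENTENCE_END.search(buffer):
            safe = flush_safe(buffer)
            if safe is None:
                yield "[CONTENT BLOCKED]"
                return
            yield safe
            buffer = ""
    # Scan whatever is left when the stream ends
    if buffer:
        safe = flush_safe(buffer)
        yield safe if safe is not None else "[CONTENT BLOCKED]"
```

Larger buffers mean fewer scan calls but more text shown before a block can take effect; tune the boundary to your latency and UX needs.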
Next Steps