This example shows how to protect streaming LLM responses, detecting threats and PII as tokens arrive rather than waiting for the complete response.
## Overview

Streaming presents unique security challenges:

- Responses arrive token-by-token
- Threats may span multiple chunks
- Users see partial content before full analysis

PromptGuard handles streaming with real-time scanning.
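As a toy illustration of why chunk-spanning matters (plain Python, no PromptGuard calls; the split phrase is just an example), a malicious pattern can be invisible in any single chunk yet obvious once chunks are buffered together:

```python
# Toy illustration: an injection phrase split across stream chunks.
chunks = ["Sure! First, ig", "nore all previous ins", "tructions and..."]

# Scanning each chunk in isolation misses the pattern:
assert not any("ignore all previous instructions" in c for c in chunks)

# Buffering chunks before scanning reveals it:
assert "ignore all previous instructions" in "".join(chunks)
```

This is why the real-time scanning examples below accumulate chunks into a buffer before each scan rather than scanning token-by-token.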
## How Streaming Protection Works

PromptGuard protects a stream at up to three points: the input is scanned before the request is sent, buffered chunks can be scanned in real time as they arrive, and the complete response can be scanned once the stream ends. The sections below show each approach.

## Implementation

### Auto-Instrumentation (Recommended)
```python
import promptguard

promptguard.init(
    api_key="pg_xxx",
    mode="enforce",
    scan_responses=True,
)

from openai import OpenAI

client = OpenAI()

# Streaming works exactly as before - protection is automatic
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story about a hacker"}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
### With Response Scanning

Enable `scan_responses=True` to scan the complete response after streaming:
```python
promptguard.init(
    api_key="pg_xxx",
    mode="enforce",
    scan_responses=True,  # Scan after stream completes
)

# If the full response contains threats, an error is raised
# after the stream completes
try:
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    )

    full_response = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        full_response += content
        print(content, end="", flush=True)

    # Response scan happens here (at stream end)
    print("\n\nStream completed safely!")

except promptguard.PromptGuardBlockedError as e:
    print(f"\n\nResponse contained: {e.decision.threat_type}")
```
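Keep in mind that with end-of-stream scanning the user has already seen every chunk by the time the error is raised. If you render chunks as they arrive, be prepared to retract or replace the displayed text when `PromptGuardBlockedError` fires.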
### Real-Time Chunk Scanning

For immediate threat detection during streaming:
```python
from promptguard import GuardClient

guard = GuardClient(api_key="pg_xxx")

def secure_stream(messages: list):
    """Stream with real-time scanning."""
    from openai import OpenAI

    client = OpenAI()
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True,
    )

    buffer = ""
    chunk_size = 50  # Scan every 50 characters

    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        buffer += content

        # Scan when buffer reaches threshold
        if len(buffer) >= chunk_size:
            decision = guard.scan(
                messages=[{"role": "assistant", "content": buffer}],
                direction="output",
            )
            if decision.blocked:
                yield "[CONTENT BLOCKED]"
                return
            if decision.redacted:
                yield decision.redacted_messages[0]["content"]
            else:
                yield buffer
            buffer = ""

    # Scan remaining buffer
    if buffer:
        decision = guard.scan(
            messages=[{"role": "assistant", "content": buffer}],
            direction="output",
        )
        if decision.blocked:
            yield "[CONTENT BLOCKED]"
        elif decision.redacted:
            yield decision.redacted_messages[0]["content"]
        else:
            yield buffer

# Usage
for safe_chunk in secure_stream([{"role": "user", "content": "Tell me a story"}]):
    print(safe_chunk, end="", flush=True)
```
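The `chunk_size` threshold is a latency/cost trade-off: a smaller buffer surfaces threats sooner but issues more scan calls, while a larger buffer amortizes scanning overhead at the price of letting more text through between checks. The examples in this guide use 50-100 characters, which suits chat-style output.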
## Node.js Streaming

### Auto-Instrumentation
```typescript
import { init } from 'promptguard-sdk';
import OpenAI from 'openai';

init({
  apiKey: 'pg_xxx',
  mode: 'enforce',
  scanResponses: true,
});

const client = new OpenAI();

async function streamChat() {
  const stream = await client.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: 'Write a poem' }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);
  }
}

streamChat();
```
### With Vercel AI SDK
```typescript
import { init, promptGuardMiddleware } from 'promptguard-sdk';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

init({ apiKey: 'pg_xxx' });

async function handler(req: Request) {
  const { messages } = await req.json();

  const result = await streamText({
    model: openai('gpt-4'),
    messages,
    experimental_middleware: promptGuardMiddleware({
      scanResponses: true,
    }),
  });

  return result.toDataStreamResponse();
}
```
### Server-Sent Events (SSE)
```typescript
import { init, GuardClient } from 'promptguard-sdk';
import OpenAI from 'openai';
import { Response } from 'express';

init({ apiKey: 'pg_xxx' });

async function streamSSE(res: Response, messages: any[]) {
  const client = new OpenAI();
  const guard = new GuardClient({ apiKey: 'pg_xxx' });

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  const stream = await client.chat.completions.create({
    model: 'gpt-4',
    messages,
    stream: true,
  });

  let buffer = '';

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    buffer += content;

    // Scan periodically
    if (buffer.length >= 100) {
      const decision = await guard.scanAsync({
        messages: [{ role: 'assistant', content: buffer }],
        direction: 'output',
      });

      if (decision.blocked) {
        res.write('data: [BLOCKED]\n\n');
        res.end();
        return;
      }

      const safeContent = decision.redacted
        ? decision.redactedMessages[0].content
        : buffer;

      res.write(`data: ${JSON.stringify({ content: safeContent })}\n\n`);
      buffer = '';
    }
  }

  // Scan and send any remaining buffer (don't skip the final scan -
  // otherwise the tail of the response goes out unchecked)
  if (buffer) {
    const decision = await guard.scanAsync({
      messages: [{ role: 'assistant', content: buffer }],
      direction: 'output',
    });

    if (decision.blocked) {
      res.write('data: [BLOCKED]\n\n');
    } else {
      const safeContent = decision.redacted
        ? decision.redactedMessages[0].content
        : buffer;
      res.write(`data: ${JSON.stringify({ content: safeContent })}\n\n`);
    }
  }

  res.write('data: [DONE]\n\n');
  res.end();
}
```
## FastAPI Streaming
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import promptguard
from openai import OpenAI

promptguard.init(api_key="pg_xxx", scan_responses=True)

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def stream_chat(request: dict):
    messages = request.get("messages", [])

    async def generate():
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            stream=True,
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content or ""
            yield f"data: {content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )
```
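To add real-time scanning to a FastAPI endpoint, you can reuse the `secure_stream` generator from the Real-Time Chunk Scanning section above. This is a sketch; the `/chat/stream-guarded` route name is illustrative, and it assumes `secure_stream`, `app`, and `StreamingResponse` from the previous examples are in scope:

```python
@app.post("/chat/stream-guarded")
async def stream_chat_guarded(request: dict):
    messages = request.get("messages", [])

    def generate():
        # Each chunk yielded by secure_stream has already been
        # scanned (and possibly redacted) before it reaches the client.
        for safe_chunk in secure_stream(messages):
            yield f"data: {safe_chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```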
## Handling Blocked Streams

When a threat is detected mid-stream:
```python
import logging

from promptguard import PromptGuardBlockedError

def safe_stream_handler(messages):
    try:
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            stream=True,
        )
        for chunk in stream:
            yield chunk.choices[0].delta.content or ""
    except PromptGuardBlockedError as e:
        # Stream was blocked - notify user
        yield "\n\n[Response interrupted for security reasons]"
        # Log the event
        logging.warning(f"Stream blocked: {e.decision.event_id}")
```
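Consuming the handler works like any other generator; a blocked stream simply ends with the interruption notice:

```python
for chunk in safe_stream_handler([{"role": "user", "content": "Hello"}]):
    print(chunk, end="", flush=True)
```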
Each protection mode trades latency against security coverage:

| Mode | Latency | Security |
|------|---------|----------|
| Input-only scanning | Minimal | Good |
| Full response scan | +50-100ms at end | Better |
| Real-time chunk scan | +20-50ms per chunk | Best |
## Recommendations

- **For chat interfaces**: Use input scanning plus an end-of-stream response scan
- **For sensitive data**: Use real-time chunk scanning
- **For low-latency needs**: Use input-only scanning with async response analysis (see the sketch after this list)
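A minimal sketch of async response analysis, assuming the scan result is only needed for logging. The thread-based offload and `analyze_async` helper are illustrative, not a documented PromptGuard pattern; the `guard.scan` call and decision fields match the examples above:

```python
import logging
import threading

from promptguard import GuardClient

guard = GuardClient(api_key="pg_xxx")

def analyze_async(full_response: str):
    """Scan the completed response off the hot path, for logging only."""
    def _scan():
        decision = guard.scan(
            messages=[{"role": "assistant", "content": full_response}],
            direction="output",
        )
        if decision.blocked:
            # Too late to block the stream - record it for review instead
            logging.warning("Post-hoc scan flagged response: %s",
                            decision.threat_type)

    threading.Thread(target=_scan, daemon=True).start()
```

The stream itself is never delayed; the cost is that flagged content has already been delivered, so this suits monitoring rather than enforcement.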
## Best Practices

- **Buffer appropriately** - Don't scan every token; batch into meaningful chunks
- **Handle interruptions gracefully** - Users may see partial content
- **Log blocked streams** - Track them for security analysis
- **Consider UX** - Decide whether to show partial content before blocking
## Next Steps

- **Streaming Reference**: Full streaming documentation
- **Error Handling**: Handle errors gracefully