This example shows how to protect streaming LLM responses, detecting threats and PII as tokens arrive rather than waiting for the complete response.

Overview

Streaming presents unique security challenges:
  • Responses arrive token-by-token
  • Threats may span multiple chunks
  • Users see partial content before full analysis
PromptGuard handles streaming with both end-of-stream and real-time scanning.

How Streaming Protection Works

PromptGuard scans the prompt before the request goes out, then scans the response either once the stream completes (with scan_responses=True) or incrementally, chunk by chunk, using GuardClient directly. The sections below show both approaches.

Implementation

import promptguard
promptguard.init(
    api_key="pg_xxx",
    mode="enforce",
    scan_responses=True,
)

from openai import OpenAI
client = OpenAI()

# Streaming works exactly as before - protection is automatic
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story about a hacker"}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

With Response Scanning

Enable scan_responses=True to scan the complete response after streaming:
promptguard.init(
    api_key="pg_xxx",
    mode="enforce",
    scan_responses=True,  # Scan after stream completes
)

# If the full response contains threats, an error is raised
# after the stream completes
try:
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    )

    full_response = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        full_response += content
        print(content, end="", flush=True)

    # Response scan happens here (at stream end)
    print("\n\nStream completed safely!")

except promptguard.PromptGuardBlockedError as e:
    print(f"\n\nResponse contained: {e.decision.threat_type}")

Real-Time Chunk Scanning

For immediate threat detection during streaming:
from promptguard import GuardClient

guard = GuardClient(api_key="pg_xxx")

def secure_stream(messages: list):
    """Stream with real-time scanning."""
    from openai import OpenAI
    client = OpenAI()

    stream = client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True,
    )

    buffer = ""
    chunk_size = 50  # Scan every 50 characters

    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        buffer += content

        # Scan when buffer reaches threshold
        if len(buffer) >= chunk_size:
            decision = guard.scan(
                messages=[{"role": "assistant", "content": buffer}],
                direction="output",
            )

            if decision.blocked:
                yield "[CONTENT BLOCKED]"
                return

            if decision.redacted:
                yield decision.redacted_messages[0]["content"]
            else:
                yield buffer

            buffer = ""

    # Scan remaining buffer
    if buffer:
        decision = guard.scan(
            messages=[{"role": "assistant", "content": buffer}],
            direction="output",
        )

        if decision.blocked:
            yield "[CONTENT BLOCKED]"
        elif decision.redacted:
            yield decision.redacted_messages[0]["content"]
        else:
            yield buffer

# Usage
for safe_chunk in secure_stream([{"role": "user", "content": "Tell me a story"}]):
    print(safe_chunk, end="", flush=True)

Node.js Streaming

Auto-Instrumentation

import { init } from 'promptguard-sdk';
import OpenAI from 'openai';

init({
  apiKey: 'pg_xxx',
  mode: 'enforce',
  scanResponses: true,
});

const client = new OpenAI();

async function streamChat() {
  const stream = await client.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: 'Write a poem' }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);
  }
}

streamChat();

With Vercel AI SDK

import { init, promptGuardMiddleware } from 'promptguard-sdk';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

init({ apiKey: 'pg_xxx' });

async function handler(req: Request) {
  const { messages } = await req.json();

  const result = await streamText({
    model: openai('gpt-4'),
    messages,
    experimental_middleware: promptGuardMiddleware({
      scanResponses: true,
    }),
  });

  return result.toDataStreamResponse();
}

Server-Sent Events (SSE)

import { init, GuardClient } from 'promptguard-sdk';
import OpenAI from 'openai';
import { Response } from 'express';

init({ apiKey: 'pg_xxx' });

async function streamSSE(res: Response, messages: any[]) {
  const client = new OpenAI();
  const guard = new GuardClient({ apiKey: 'pg_xxx' });

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  const stream = await client.chat.completions.create({
    model: 'gpt-4',
    messages,
    stream: true,
  });

  let buffer = '';

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    buffer += content;

    // Scan periodically
    if (buffer.length >= 100) {
      const decision = await guard.scanAsync({
        messages: [{ role: 'assistant', content: buffer }],
        direction: 'output',
      });

      if (decision.blocked) {
        res.write('data: [BLOCKED]\n\n');
        res.end();
        return;
      }

      const safeContent = decision.redacted
        ? decision.redactedMessages[0].content
        : buffer;

      res.write(`data: ${JSON.stringify({ content: safeContent })}\n\n`);
      buffer = '';
    }
  }

  // Send remaining
  if (buffer) {
    res.write(`data: ${JSON.stringify({ content: buffer })}\n\n`);
  }

  res.write('data: [DONE]\n\n');
  res.end();
}

FastAPI Streaming

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import promptguard
from openai import OpenAI

promptguard.init(api_key="pg_xxx", scan_responses=True)

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def stream_chat(request: dict):
    messages = request.get("messages", [])

    async def generate():
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            stream=True,
        )

        for chunk in stream:
            content = chunk.choices[0].delta.content or ""
            yield f"data: {content}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

Handling Blocked Streams

When a threat is detected mid-stream:
import logging

from promptguard import PromptGuardBlockedError

def safe_stream_handler(messages):
    try:
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            stream=True,
        )

        for chunk in stream:
            yield chunk.choices[0].delta.content or ""

    except PromptGuardBlockedError as e:
        # Stream was blocked - notify user
        yield "\n\n[Response interrupted for security reasons]"

        # Log the event
        logging.warning(f"Stream blocked: {e.decision.event_id}")

Performance Considerations

Mode                   Latency               Security
Input-only scanning    Minimal               Good
Full response scan     +50-100ms at end      Better
Real-time chunk scan   +20-50ms per chunk    Best

Recommendations

  1. For chat interfaces: Use input scanning + end-of-stream response scan
  2. For sensitive data: Use real-time chunk scanning
  3. For low-latency needs: Use input-only scanning with async response analysis (see the sketch below)
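
Below is a minimal sketch of recommendation 3, assuming input scanning stays enabled via promptguard.init() and reusing the GuardClient.scan call from the real-time example above; the analyze_response helper and the thread-based background analysis are illustrative choices, not SDK features.

import logging
import threading

from openai import OpenAI
from promptguard import GuardClient

client = OpenAI()
guard = GuardClient(api_key="pg_xxx")

def analyze_response(full_response: str):
    """Illustrative: scan the finished response off the request path and log findings."""
    decision = guard.scan(
        messages=[{"role": "assistant", "content": full_response}],
        direction="output",
    )
    if decision.blocked:
        logging.warning("Post-hoc scan flagged response: %s", decision.threat_type)

def low_latency_stream(messages: list):
    """Stream tokens immediately; defer response analysis to a background thread."""
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True,
    )

    full_response = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        full_response += content
        yield content  # tokens reach the user with no scanning delay

    # Analyze the complete response off the hot path once streaming ends
    threading.Thread(target=analyze_response, args=(full_response,), daemon=True).start()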

Best Practices

  1. Buffer appropriately - Don’t scan every token; batch into meaningful chunks (see the sketch after this list)
  2. Handle interruptions gracefully - Users may see partial content
  3. Log blocked streams - Track for security analysis
  4. Consider UX - Decide if you show partial content before blocking
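
As a sketch of practice 1, the helper below batches streamed tokens into scan-sized chunks, flushing on a sentence boundary once a minimum size is reached or unconditionally past a maximum; the buffered_chunks name and the thresholds are illustrative, not SDK defaults.

def buffered_chunks(token_iter, min_size: int = 50, max_size: int = 400):
    """Group streamed tokens into chunks worth scanning."""
    buffer = ""
    for token in token_iter:
        buffer += token
        # Prefer flushing at sentence boundaries so scans see coherent text
        at_boundary = buffer.endswith((".", "!", "?", "\n"))
        if (len(buffer) >= min_size and at_boundary) or len(buffer) >= max_size:
            yield buffer
            buffer = ""
    if buffer:
        yield buffer

Each yielded chunk can then go through guard.scan() exactly as in the real-time example, trading a small amount of buffering latency for fewer, more meaningful scans.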

Next Steps