PromptGuard fully supports streaming responses. Security scanning happens on the input before the request is forwarded, so streaming adds no per-token latency; the only overhead is the one-time input scan before the first token.

How Streaming Works

  1. Your request is sent to PromptGuard
  2. PromptGuard scans the input for threats (~150ms)
  3. If safe, the request is forwarded to the LLM provider
  4. The LLM provider streams tokens directly back through PromptGuard
  5. Tokens arrive in real time as they’re generated (see the raw-request sketch below)
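
To see these steps on the wire, you can call the API directly with a plain HTTP client instead of an SDK. A minimal sketch in Python, assuming the OpenAI-compatible /chat/completions path used by the examples below (substitute your own API key):

import requests

# Assumed OpenAI-compatible endpoint; the base URL matches the SDK examples below.
response = requests.post(
    "https://api.promptguard.co/api/v1/chat/completions",
    headers={"Authorization": "Bearer your_promptguard_api_key"},
    json={
        "model": "gpt-5-nano",
        "messages": [{"role": "user", "content": "Explain quantum computing"}],
        "stream": True,
    },
    stream=True,  # keep the connection open and read the body incrementally
)

# Nothing arrives until the input scan passes; after that, each line is an
# SSE event relayed straight from the provider (see the SSE section below).
for line in response.iter_lines():
    if line:
        print(line.decode("utf-8"))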

Using the OpenAI SDK

This is the simplest way to stream; it works with your existing OpenAI/Anthropic code.
from openai import OpenAI

client = OpenAI(
    api_key="your_promptguard_api_key",
    base_url="https://api.promptguard.co/api/v1"
)

stream = client.chat.completions.create(
    model="gpt-5-nano",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content is not None:
        print(content, end="", flush=True)

Using the PromptGuard SDK

from promptguard import PromptGuard

pg = PromptGuard(api_key="pg_xxx")

stream = pg.chat.completions.create(
    model="gpt-5-nano",
    messages=[{"role": "user", "content": "Write a short story"}],
    stream=True
)

for chunk in stream:
    content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
    if content:
        print(content, end="", flush=True)

Server-Sent Events (SSE)

When streaming is enabled, the API returns Server-Sent Events. Each event contains a JSON chunk:
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
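
If you consume the stream without an SDK, each data: line can be decoded with a few lines of Python. A sketch (extract_content is a hypothetical helper; the chunk layout matches the events above):

import json

def extract_content(sse_line: str):
    """Return the text delta carried by one SSE line, or None if there is none."""
    if not sse_line.startswith("data: "):
        return None
    payload = sse_line[len("data: "):]
    if payload == "[DONE]":  # end-of-stream sentinel
        return None
    chunk = json.loads(payload)
    return chunk["choices"][0]["delta"].get("content")

print(extract_content('data: {"choices":[{"delta":{"content":"Hello"}}]}'))  # -> Hello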

Framework Integration

FastAPI (Python)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()

client = OpenAI(
    api_key="your_promptguard_api_key",
    base_url="https://api.promptguard.co/api/v1"
)

@app.post("/chat/stream")
async def stream_chat(message: str):
    def generate():
        stream = client.chat.completions.create(
            model="gpt-5-nano",
            messages=[{"role": "user", "content": message}],
            stream=True
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
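
On the client side, the /chat/stream endpoint above can be read with any HTTP client that supports streaming. A sketch using requests, assuming the FastAPI app is running locally on port 8000 (this endpoint takes the message as a query parameter):

import requests

with requests.post(
    "http://localhost:8000/chat/stream",          # assumed local dev server
    params={"message": "Explain quantum computing"},
    stream=True,
) as response:
    for line in response.iter_lines():
        if not line:
            continue
        text = line.decode("utf-8").removeprefix("data: ")
        if text == "[DONE]":
            break
        print(text, end="", flush=True)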

Express (Node.js)

import express from 'express';
import OpenAI from 'openai';

const app = express();
app.use(express.json());

const openai = new OpenAI({
  apiKey: process.env.PROMPTGUARD_API_KEY,
  baseURL: 'https://api.promptguard.co/api/v1'
});

app.post('/chat/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const stream = await openai.chat.completions.create({
    model: 'gpt-5-nano',
    messages: [{ role: 'user', content: req.body.message }],
    stream: true
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      res.write(`data: ${JSON.stringify({ content })}\n\n`);
    }
  }

  res.write('data: [DONE]\n\n');
  res.end();
});

Next.js (React)

// app/api/chat/route.ts
import OpenAI from 'openai';

const openai = new OpenAI({
  apiKey: process.env.PROMPTGUARD_API_KEY!,
  baseURL: 'https://api.promptguard.co/api/v1'
});

export async function POST(req: Request) {
  const { message } = await req.json();

  const stream = await openai.chat.completions.create({
    model: 'gpt-5-nano',
    messages: [{ role: 'user', content: message }],
    stream: true
  });

  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content;
        if (content) {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`));
        }
      }
      controller.enqueue(encoder.encode('data: [DONE]\n\n'));
      controller.close();
    }
  });

  return new Response(readable, {
    headers: { 'Content-Type': 'text/event-stream' }
  });
}

Error Handling During Streaming

Errors during streaming are delivered as SSE error events; the SDK surfaces them as exceptions you can catch:
try:
    stream = client.chat.completions.create(
        model="gpt-5-nano",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="")
except Exception as e:
    if "policy_violation" in str(e):
        print("\nRequest blocked by security policy")
    elif "rate_limit" in str(e):
        print("\nRate limited - retry with backoff")
    else:
        print(f"\nError: {e}")
Security blocks happen before streaming begins (during input scanning). If a request passes the security check, the stream will complete normally. You won’t receive a mid-stream security block.

Performance

Metric                   Value
Input scan overhead      ~150ms (one-time, before streaming starts)
Per-token overhead       ~0ms (tokens pass through directly)
Time to first token      Same as direct provider + ~150ms

Streaming is recommended for all user-facing applications. Perceived latency is significantly lower because users see tokens appear in real time rather than waiting for the full response.
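
If you want to verify these numbers for your own workload, time to first token is easy to measure. A minimal sketch, reusing the OpenAI client configuration from earlier (results will vary by model and provider):

import time
from openai import OpenAI

client = OpenAI(
    api_key="your_promptguard_api_key",
    base_url="https://api.promptguard.co/api/v1"
)

start = time.perf_counter()
stream = client.chat.completions.create(
    model="gpt-5-nano",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True
)

first_token_at = None
for chunk in stream:
    if chunk.choices[0].delta.content:
        first_token_at = time.perf_counter()  # first content chunk received
        break

if first_token_at is not None:
    print(f"Time to first token: {first_token_at - start:.2f}s")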