Overview

Anannas supports streaming responses via Server-Sent Events (SSE). Set stream: true in your request and the API returns incremental text deltas as tokens are generated, enabling real-time chat interfaces.

Enabling Streaming

Set the stream parameter to true:
{
  "model": "openai/gpt-5-mini",
  "messages": [
    {"role": "user", "content": "Explain quantum computing"}
  ],
  "stream": true,
  "max_tokens": 500
}

Response Format

Streaming responses use Server-Sent Events (SSE) with Content-Type: text/event-stream. Each event contains a JSON object following the OpenAI streaming format.

Event Structure

Each SSE event has this format:
data: <json_object>

The JSON object structure:
type StreamChunk = {
  id: string;
  object: "chat.completion.chunk";
  created: number;
  model: string;
  choices: Array<{
    index: number;
    delta: {
      role?: "assistant";
      content?: string;
      tool_calls?: ToolCall[];
    };
    finish_reason?: "stop" | "length" | "tool_calls" | "content_filter" | null;
  }>;
};

Example Stream

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"openai/gpt-5-mini","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"openai/gpt-5-mini","choices":[{"index":0,"delta":{"content":"Quantum"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"openai/gpt-5-mini","choices":[{"index":0,"delta":{"content":" computing"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"openai/gpt-5-mini","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

Implementation Examples

Python

import requests
import json

response = requests.post(
  "https://api.anannas.ai/v1/chat/completions",
  headers={
    "Authorization": "Bearer <ANANNAS_API_KEY>",
    "Content-Type": "application/json",
  },
  json={
    "model": "openai/gpt-5-mini",
    "messages": [{"role": "user", "content": "Count to 5"}],
    "stream": True
  },
  stream=True
)

for line in response.iter_lines():
  if line:
    line = line.decode('utf-8')
    if line.startswith('data: '):
      data = line[6:]  # Remove 'data: ' prefix
      if data == '[DONE]':
        break
      try:
        chunk = json.loads(data)
        choices = chunk.get('choices') or []  # guard against usage-only chunks with empty choices
        if choices and choices[0].get('delta', {}).get('content'):
          print(choices[0]['delta']['content'], end='', flush=True)
      except json.JSONDecodeError:
        pass

TypeScript/JavaScript

const response = await fetch("https://api.anannas.ai/v1/chat/completions", {
  method: "POST",
  headers: {
    "Authorization": "Bearer <ANANNAS_API_KEY>",
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    model: "openai/gpt-5-mini",
    messages: [{ role: "user", content: "Count to 5" }],
    stream: true,
  }),
});

const reader = response.body?.getReader();
const decoder = new TextDecoder();

let buffer = "";
let finished = false;

while (!finished) {
  const { done, value } = await reader!.read();
  if (done) break;

  // Buffer partial lines: a network chunk may end mid-event
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? '';  // keep the trailing partial line for the next read

  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = line.slice(6);
      if (data === '[DONE]') {
        finished = true;  // a bare `break` here would only exit the inner loop
        break;
      }

      try {
        const json = JSON.parse(data);
        const content = json.choices[0]?.delta?.content;
        if (content) {
          process.stdout.write(content);
        }
      } catch (e) {
        // Ignore parse errors
      }
    }
  }
}

Using OpenAI SDK

The OpenAI SDK handles streaming automatically:
from openai import OpenAI

client = OpenAI(
  base_url="https://api.anannas.ai/v1",
  api_key="<ANANNAS_API_KEY>"
)

stream = client.chat.completions.create(
  model="openai/gpt-5-mini",
  messages=[{"role": "user", "content": "Count to 5"}],
  stream=True
)

for chunk in stream:
  if chunk.choices[0].delta.content:
    print(chunk.choices[0].delta.content, end='', flush=True)

SSE Comments

The stream may include comment lines for keep-alive:
: ANANNAS
Per the SSE specification, lines beginning with a colon are comments and can be ignored; here they serve as connection keep-alive signals.
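A line-classification helper makes it easy to skip these comments while parsing. The function below is a hypothetical sketch, not part of any SDK:

```python
def parse_sse_line(line: str):
    """Classify one SSE line as a comment, a data payload, or something else."""
    if line.startswith(':'):
        return ('comment', line[1:].strip())  # keep-alive, safe to ignore
    if line.startswith('data: '):
        return ('data', line[6:])
    return ('other', line)

# A keep-alive comment is recognized and skipped; data payloads pass through:
print(parse_sse_line(': ANANNAS'))     # ('comment', 'ANANNAS')
print(parse_sse_line('data: [DONE]'))  # ('data', '[DONE]')
```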

Cancellation

Cancel streaming requests by closing the connection. Billing stops immediately for supported providers when the connection is closed.

Python

# Close the response stream
response.close()

TypeScript

const controller = new AbortController();

fetch(url, {
  signal: controller.signal,
  // ... other options
});

// Cancel
controller.abort();

Tool Calls in Streams

Tool calls are streamed incrementally. The delta.tool_calls array contains partial tool call information:
{
  "choices": [{
    "delta": {
      "tool_calls": [{
        "index": 0,
        "id": "call_abc123",
        "type": "function",
        "function": {
          "name": "get_weather",
          "arguments": "{\"location\":\""
        }
      }]
    }
  }]
}
Accumulate the arguments fragments across chunks, keyed by each tool call's index, to reconstruct the complete JSON.
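The accumulation can be sketched as follows. The accumulate_tool_calls helper is illustrative, assuming chunks shaped like the example above:

```python
import json

def accumulate_tool_calls(chunks):
    """Merge partial delta.tool_calls entries into complete tool calls."""
    calls = {}  # index -> {"id", "name", "arguments"}
    for chunk in chunks:
        choices = chunk.get('choices') or [{}]
        for tc in choices[0].get('delta', {}).get('tool_calls', []):
            slot = calls.setdefault(tc['index'], {'id': '', 'name': '', 'arguments': ''})
            if tc.get('id'):
                slot['id'] = tc['id']          # id arrives once, on the first fragment
            fn = tc.get('function', {})
            if fn.get('name'):
                slot['name'] = fn['name']      # name also arrives once
            slot['arguments'] += fn.get('arguments', '')  # arguments arrive in pieces

    return calls

# Two partial chunks carrying one argument string split across the stream:
chunks = [
    {'choices': [{'delta': {'tool_calls': [{'index': 0, 'id': 'call_abc123',
        'type': 'function', 'function': {'name': 'get_weather', 'arguments': '{"location":"'}}]}}]},
    {'choices': [{'delta': {'tool_calls': [{'index': 0,
        'function': {'arguments': 'Paris"}'}}]}}]},
]
calls = accumulate_tool_calls(chunks)
print(json.loads(calls[0]['arguments']))  # {'location': 'Paris'}
```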

Error Handling

Errors in streaming responses are sent as regular SSE events:
data: {"error": {"message": "Rate limit exceeded", "type": "rate_limit_error"}}
Handle errors by checking for the error field in parsed JSON objects.
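One way to fold this check into a parsing loop is to classify each data payload before touching choices. The parse_stream_event function is a hypothetical helper, not an SDK API:

```python
import json

def parse_stream_event(data: str):
    """Return ('done', None), ('error', message), or ('chunk', obj) for one data payload."""
    if data == '[DONE]':
        return ('done', None)
    obj = json.loads(data)
    if 'error' in obj:
        # Error events replace normal chunks, so check before reading choices
        return ('error', obj['error'].get('message', 'unknown error'))
    return ('chunk', obj)

print(parse_stream_event('{"error": {"message": "Rate limit exceeded", "type": "rate_limit_error"}}'))
# ('error', 'Rate limit exceeded')
```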

Finish Reasons

The final chunk includes finish_reason:
  • stop: Natural completion or stop sequence
  • length: Reached max_tokens
  • tool_calls: Model requested tool execution
  • content_filter: Content filtered by safety systems
  • null: Stream still in progress (appears on non-final chunks)

Usage Tracking

Streaming responses may include usage information in the final chunk or in a separate trailing event. Capture it when it appears for accurate usage tracking.
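A minimal sketch of capturing it, assuming an OpenAI-style usage field attached to one of the chunks (exact placement may vary by provider):

```python
def extract_usage(chunks):
    """Pick up the usage object from whichever chunk carries it (typically the last)."""
    usage = None
    for chunk in chunks:
        if chunk.get('usage'):
            usage = chunk['usage']  # later chunks override earlier partials, if any
    return usage

# A content chunk followed by a final usage-bearing chunk:
chunks = [
    {'choices': [{'delta': {'content': 'Hi'}}]},
    {'choices': [], 'usage': {'prompt_tokens': 12, 'completion_tokens': 5, 'total_tokens': 17}},
]
print(extract_usage(chunks)['total_tokens'])  # 17
```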

Best Practices

  1. Buffer Management: Accumulate deltas client-side for display
  2. Error Recovery: Implement retry logic for network failures
  3. Connection Management: Handle connection drops gracefully
  4. Token Counting: Track usage from final chunk or usage event
  5. Cancellation: Always cancel streams when user navigates away
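For error recovery (item 2), one common pattern is exponential backoff when opening the stream. This is an illustrative sketch; with_retries and the flaky opener are hypothetical, not part of any SDK:

```python
import time

def with_retries(start_stream, max_attempts=3, base_delay=1.0):
    """Retry a stream-opening callable with exponential backoff on network errors."""
    for attempt in range(max_attempts):
        try:
            return start_stream()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...

# A flaky opener that fails twice, then succeeds:
attempts = {'n': 0}
def flaky():
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise ConnectionError('dropped')
    return 'stream'

print(with_retries(flaky, base_delay=0))  # 'stream'
```

Note that retrying mid-stream is harder than retrying the initial connection: already-printed deltas cannot be un-sent, so a mid-stream retry usually means restarting the request.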

See Also