Swap one import — your OpenAI, Anthropic, Bedrock, Gemini, or Mistral client becomes a drop-in that attributes every call to the agent and run that made it. Construction and call sites stay the same — no proxy, no prompt changes.
spaturzu sits at the call site, not the invoice. It records token counts and cost as each request happens — your keys still call the provider directly, and prompts never touch us.
The Python SDK is published on PyPI as spaturzu, and the Node SDK on npm as @spaturzu/sdk. Install it alongside the provider client you already use.
# published on npm
pnpm add @spaturzu/sdk openai
# published on PyPI
pip install spaturzu openai
The SDK reads these environment variables on construction. Set them in your shell, your process manager, or a .env file.
SPATURZU_API_KEY: Your project key. Create or copy it from the Onboarding page in your dashboard; the SDK authenticates every call to the gateway with it.
SPATURZU_BASE_URL: Your spaturzu gateway URL. The SDK posts token and cost metadata here; it falls back to https://spaturzu-api.superchiu.org if unset.
OPENAI_API_KEY: Your provider key (or ANTHROPIC_API_KEY). The drop-in client calls OpenAI or Anthropic directly with it, exactly as the real client does, and it never reaches the spaturzu gateway.
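To make the fallback rule concrete, here is a minimal sketch of how the two gateway variables might be resolved. The helper name `resolve_gateway_config` is hypothetical and not part of the SDK, and treating a missing project key as an error is an assumption.

```python
import os

# Default gateway named in the text above.
DEFAULT_BASE_URL = "https://spaturzu-api.superchiu.org"


def resolve_gateway_config(env=None):
    """Hypothetical helper: resolve the gateway settings from the environment."""
    env = os.environ if env is None else env
    api_key = env.get("SPATURZU_API_KEY")
    if not api_key:
        # Assumption: a missing project key is a configuration error.
        raise RuntimeError("SPATURZU_API_KEY is not set")
    # An unset or empty SPATURZU_BASE_URL falls back to the default gateway.
    base_url = env.get("SPATURZU_BASE_URL") or DEFAULT_BASE_URL
    return {"api_key": api_key, "base_url": base_url.rstrip("/")}
```

The provider key is deliberately absent from this sketch: per the description above, it is read by the provider client and is never sent to the gateway.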
Change the import specifier (openai → @spaturzu/sdk/openai in Node, spaturzu.openai in Python) — construction and call sites are unchanged. Then tag any call with the agent that made it: .withAgent("name") — no wrapper, no closure. To group several calls into one unit of work, wrap them in run(...) (see Attribution).
// Swap "openai" for "@spaturzu/sdk/openai" — construction + calls unchanged.
import OpenAI from "@spaturzu/sdk/openai";
import { flush } from "@spaturzu/sdk";
// SPATURZU_API_KEY + SPATURZU_BASE_URL are read from env; OpenAI() reads
// OPENAI_API_KEY, exactly like the real client.
const openai = new OpenAI();
async function main() {
// Tag the call with the agent that made it - no wrapper, no closure.
const res = await openai.withAgent("support-triage").chat.completions.create({
model: "gpt-4o-mini",
messages: [{ role: "user", content: "Summarise this ticket" }],
});
console.log(res.choices[0]?.message?.content);
// CLIs, Lambdas, CI: flush queued metadata before the process exits.
await flush();
}
main();
# Swap "openai" for "spaturzu.openai" — construction + calls unchanged.
from spaturzu.openai import OpenAI
from spaturzu import flush
# SPATURZU_API_KEY + SPATURZU_BASE_URL are read from env; OpenAI() reads
# OPENAI_API_KEY, exactly like the real client.
openai = OpenAI()
# Tag the call with the agent that made it - no wrapper, no with-block.
res = openai.with_agent("support-triage").chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Summarise this ticket"}],
)
print(res.choices[0].message.content)
# CLIs, Lambdas, CI: flush queued metadata before the process exits.
flush()
Need a custom-configured or injected client (Azure, a proxy baseURL, a client you built elsewhere)? The explicit new Spaturzu() + sp.wrapOpenAI(client) path still works — see Providers → Bring your own client.
npx tsx agent.ts
# Node 22.6+: node --experimental-strip-types agent.ts
python agent.py
Within a couple of seconds of the first call, the agent, its run, and the cost appear in your dashboard. Short-lived processes (CLIs, Lambdas, CI jobs) must call flush() before exit; otherwise queued metadata is dropped when the process ends.
spaturzu ships a drop-in module per provider — swap the import and your client is auto-instrumented from env, construction and call sites unchanged. Each instruments only the inference methods listed below; everything else on the client passes through untouched.
Same in Python — from spaturzu.openai import OpenAI, spaturzu.anthropic, spaturzu.google (Client), spaturzu.mistral, spaturzu.bedrock (BedrockRuntime). See the Python SDK reference for full Python signatures.
| Provider | Drop-in import | Methods intercepted | Install |
|---|---|---|---|
| OpenAI | @spaturzu/sdk/openai | chat.completions.create | pnpm add openai |
| Anthropic | @spaturzu/sdk/anthropic | messages.create | pnpm add @anthropic-ai/sdk |
| Bedrock | @spaturzu/sdk/bedrock | converse, converseStream | pnpm add @aws-sdk/client-bedrock-runtime |
spaturzu attributes every call to a hierarchy of context: process-wide tags from configure(), frame-scoped tags from run(...), per-call agents from .withAgent(...), and (for nested frames) an agentPath array tracking the parent → child chain. The hierarchy is built per process; nothing leaves your servers besides token counts, cost, and the tags you set.
Same API in Python — run() is a context manager importable from spaturzu: with run("nightly-report"): for sync code, async with run("nightly-report"): for async. Frame-scoped tags pass as a tags= kwarg: async with run("synthesize", tags={"phase": "draft"}):.
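The hierarchy can be pictured with a small, self-contained sketch built on `contextvars`. This is an illustration of the idea only, not the SDK's implementation: the `run` below is a local stand-in, and `_process_tags`, `_frame`, and `current_attribution` are hypothetical names. It assumes inner tags win on key conflict.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical stand-ins for SDK state; not the real API.
_process_tags = {"env": "prod"}
_frame = contextvars.ContextVar("frame", default=None)


@contextmanager
def run(name, tags=None):
    """Open a frame: extend the agent path and layer tags over the parent's."""
    parent = _frame.get()
    path = (parent["agent_path"] if parent else []) + [name]
    merged = {**(parent["tags"] if parent else _process_tags), **(tags or {})}
    token = _frame.set({"agent_path": path, "tags": merged})
    try:
        yield
    finally:
        _frame.reset(token)  # restore the parent frame on exit


def current_attribution():
    """What a metered call would record at this point in the code."""
    frame = _frame.get()
    if frame is None:
        return {"agent_path": [], "tags": dict(_process_tags)}
    return {"agent_path": list(frame["agent_path"]), "tags": dict(frame["tags"])}


with run("research", tags={"phase": "gather"}):
    outer = current_attribution()
    with run("synthesize", tags={"phase": "draft"}):
        inner = current_attribution()

print(outer["agent_path"], outer["tags"])
print(inner["agent_path"], inner["tags"])
```

A context variable is a natural fit here because each thread and each asyncio task sees its own value, so concurrent runs do not overwrite each other's frame.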
.withAgent()
The simplest tag is per-call: .withAgent(name) attributes a single call to the agent that made it, with no run() block and no wrapper. It nests under any enclosing run() as a sub-agent and is concurrency-safe, because each call gets its own frame. In Python it is .with_agent(name).
// Standalone - its own run, agentPath = ["writer"]
await openai.withAgent("writer").chat.completions.create({ /* ... */ });
// Inside a run() - nests as a sub-agent, agentPath = ["planner", "writer"]
await run("planner", async () => {
await openai.withAgent("writer").chat.completions.create({ /* ... */ });
});
# Standalone - its own run, agent_path = ["writer"]
openai.with_agent("writer").chat.completions.create(...)
# Inside a run() - nests as a sub-agent, agent_path = ["planner", "writer"]
with run("planner"):
    openai.with_agent("writer").chat.completions.create(...)
Process-wide tags are set once at startup with configure(), before constructing any drop-in client, and are merged into every logged call. Use them for dimensions that don't change inside the process: env, deployment, region, version, pod name. Values are coerced to strings at the SDK boundary so the wire format stays Record<string, string>. Per-run tags override these on key conflict. (On the explicit path, pass tags to the Spaturzu constructor instead.)
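As a rough illustration of the string coercion and the override rule, the sketch below uses a hypothetical `coerce_tags` helper. How the SDK itself renders non-string values (for example booleans) is not stated here, so the use of Python's `str()` is an assumption.

```python
def coerce_tags(tags):
    """Hypothetical helper: stringify tags into a flat str -> str mapping."""
    return {str(key): str(value) for key, value in tags.items()}


process_tags = coerce_tags({"env": "prod", "replicas": 3, "canary": False})
run_tags = coerce_tags({"env": "staging"})

# Per-run tags win on key conflict, so "env" ends up as "staging".
merged = {**process_tags, **run_tags}
```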
The same client that meters cost can also enforce it. Pass { budget: { hardCap: true } } as the drop-in client's second argument and it throws BudgetExceededError before the call hits the provider when any applicable hard-cap budget is breached — no token cost incurred for refused calls. Enforcement is in-process; no proxy in the request path.
Same API in Python — pass a spaturzu= kwarg to the drop-in: OpenAI(spaturzu={"budget": {"hard_cap": True, "on_breach": "throw"}}). The error class is spaturzu.BudgetExceededError; field names match.
Budgets are configured in the dashboard's Budgets page, scoped to a project or to a specific agent name. The SDK keeps an in-process policy cache that refreshes via SSE within seconds of a budget being hit; if the SSE connection drops, a polling backstop keeps the cache fresh.
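The pre-call check can be sketched as follows. Everything here is illustrative: `BudgetExceeded`, `check_budgets`, `guarded_call`, and the policy dictionary shape are hypothetical stand-ins, and treating "current cost at or above the limit" as a breach is an assumption.

```python
from decimal import Decimal


class BudgetExceeded(Exception):
    """Hypothetical stand-in for the SDK's BudgetExceededError."""

    def __init__(self, scope, period, current_cost, limit_cost, agent_name=None):
        super().__init__(f"{scope} {period} budget exceeded: {current_cost} >= {limit_cost}")
        self.scope, self.period = scope, period
        self.current_cost, self.limit_cost = current_cost, limit_cost
        self.agent_name = agent_name


def check_budgets(policies, agent_name):
    """Raise if any applicable hard-cap policy in the local cache is breached."""
    for p in policies:
        applies = p["scope"] == "project" or p.get("agent_name") == agent_name
        breached = Decimal(p["current_cost"]) >= Decimal(p["limit_cost"])
        if applies and p["hard_cap"] and breached:
            raise BudgetExceeded(p["scope"], p["period"], p["current_cost"],
                                 p["limit_cost"], p.get("agent_name"))


def guarded_call(policies, agent_name, call_provider):
    """Check the cached policies first; refused calls never reach the provider."""
    check_budgets(policies, agent_name)
    return call_provider()
```

Because the check reads only an in-process cache, it adds no network hop to the request path; the cache refresh is what keeps the decision current.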
import OpenAI from "@spaturzu/sdk/openai";
// Second arg carries spaturzu options; the first is the usual client config.
const openai = new OpenAI({}, {
budget: { hardCap: true, onBreach: "throw" }, // 'warn' to log + proceed
});
from spaturzu.openai import OpenAI
# spaturzu= carries budget/fallback; other kwargs go to the real client.
openai = OpenAI(spaturzu={"budget": {"hard_cap": True, "on_breach": "throw"}})
BudgetExceededError exposes the scope, period, current cost, limit cost, and agent name. Re-throw anything else so the upstream call's own errors continue to surface normally.
import { BudgetExceededError } from "@spaturzu/sdk";
try {
await run("nightly-report", async () => {
await openai.chat.completions.create({
model: "gpt-4o",
messages: [{ role: "user", content: "..." }],
});
});
} catch (err) {
if (err instanceof BudgetExceededError) {
// err.scope — "project" | "agent"
// err.period — "daily" | "monthly"
// err.limitCost — string, e.g. "10.00" (USD)
// err.currentCost — string, e.g. "10.42"
// err.agentName — string | null
console.error(`Budget cap hit: ${err.message}`);
return;
}
throw err;
}
from spaturzu import BudgetExceededError, run
try:
    with run("nightly-report"):
        openai.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "..."}],
        )
except BudgetExceededError as err:
    # err.scope: "project" | "agent"
    # err.period: "daily" | "monthly"
    # err.limit_cost: str, e.g. "10.00" (USD)
    # err.current_cost: str, e.g. "10.42"
    # err.agent_name: str | None
    print(f"Budget cap hit: {err}")
# Other exceptions propagate normally; only this type is caught.
Three knobs handle the realities of production LLM traffic: cross-provider fallback when a provider returns a 429 or 5xx, streaming usage capture across heterogeneous provider response formats, and constructor options that shape how the metering plane itself behaves under load.
Same API in Python — spaturzu.flush(timeout_s=None) and spaturzu.shutdown() are sync. Fallback config is a list of dicts. Constructor option names use snake_case but with two divergences from Node: timeout is timeout_s (seconds, float — not timeout_ms), and on_error is (exc, entry) → None (Node passes only the error).
On a retryable upstream error (429, 5xx, connection-class), the wrap walks the chain. Request and response shapes are translated automatically; your code keeps receiving OpenAI-shaped responses even when an Anthropic or Bedrock fallback served the call. All 20 directional pairs (5 providers × 4 other-providers) are supported.
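A minimal sketch of walking a fallback chain is shown below. It leaves out the request and response translation entirely, and the names `UpstreamError`, `is_retryable`, and `call_with_fallback` are hypothetical. Raising the first error once the chain is exhausted is an assumption about the behavior, not something stated above.

```python
class UpstreamError(Exception):
    """Hypothetical error carrying an HTTP status (None for connection failures)."""

    def __init__(self, status=None):
        super().__init__(f"upstream error (status={status})")
        self.status = status


def is_retryable(err):
    # 429, any 5xx, or a connection-class failure (no status at all).
    return err.status is None or err.status == 429 or 500 <= err.status <= 599


def call_with_fallback(primary, fallbacks):
    """Try the primary, then each fallback in order, on retryable errors only."""
    first_error = None
    for attempt in [primary, *fallbacks]:
        try:
            return attempt()
        except UpstreamError as err:
            if not is_retryable(err):
                raise  # e.g. a 400 surfaces immediately
            first_error = first_error or err
    # Assumption: once the chain is exhausted, the original error is raised.
    raise first_error
```

Each entry in the chain is a zero-argument callable here; in a real wrap it would be the translated request bound to a specific provider client and model.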
import OpenAI from "@spaturzu/sdk/openai";
import Anthropic from "@anthropic-ai/sdk";
import { BedrockRuntime } from "@aws-sdk/client-bedrock-runtime";
// Fallback targets are raw provider clients, passed as the second arg.
const openai = new OpenAI({}, {
fallback: [
{
provider: "anthropic",
client: new Anthropic(),
model: "claude-3-5-haiku-20241022",
},
{
provider: "bedrock",
client: new BedrockRuntime({ region: "us-east-1" }),
model: "anthropic.claude-3-5-haiku-20241022-v1:0",
},
],
});
from spaturzu.openai import OpenAI
from anthropic import Anthropic
import boto3
# spaturzu= carries the fallback chain; targets are raw provider clients.
openai = OpenAI(spaturzu={
    "fallback": [
        {
            "provider": "anthropic",
            "client": Anthropic(),
            "model": "claude-3-5-haiku-20241022",
        },
        {
            "provider": "bedrock",
            "client": boto3.client("bedrock-runtime", region_name="us-east-1"),
            "model": "anthropic.claude-3-5-haiku-20241022-v1:0",
        },
    ],
})
v1 fallback limits: non-streaming only, text content only, no tools, no response_format. Calls that use any of those features quietly skip fallback and surface the original error.
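To check ahead of time whether a request falls within these limits, something like the following could be used. `fallback_eligible` is a hypothetical helper, and it assumes an OpenAI-shaped request dictionary in which non-text content appears as a list of parts rather than a plain string.

```python
def fallback_eligible(request):
    """Hypothetical check: True only for requests within the v1 fallback limits."""
    if request.get("stream"):
        return False  # non-streaming only
    if request.get("tools") or request.get("response_format"):
        return False  # no tools, no response_format
    # Text content only: every message's content must be a plain string.
    return all(isinstance(m.get("content"), str) for m in request.get("messages", []))
```

Since ineligible calls skip fallback quietly, a check like this in tests or at startup can make the gap visible before a provider outage does.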
| Provider | Drop-in import | Methods intercepted | Install |
|---|---|---|---|
| Gemini | @spaturzu/sdk/google | models.generateContent, models.generateContentStream | pnpm add @google/genai |
| Mistral | @spaturzu/sdk/mistral | chat.complete, chat.stream | pnpm add @mistralai/mistralai |
import OpenAI from "@spaturzu/sdk/openai";
const openai = new OpenAI();
await run("support-triage", async () => {
await openai.chat.completions.create({
model: "gpt-4o-mini",
messages: [{ role: "user", content: "Summarise this ticket" }],
});
});
from spaturzu import run
from spaturzu.openai import OpenAI
openai = OpenAI()
with run("support-triage"):
    openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Summarise this ticket"}],
    )
import Anthropic from "@spaturzu/sdk/anthropic";
const anthropic = new Anthropic();
await run("researcher", async () => {
await anthropic.messages.create({
model: "claude-3-5-sonnet-20241022",
max_tokens: 256,
messages: [{ role: "user", content: "Hello" }],
});
});
from spaturzu import run
from spaturzu.anthropic import Anthropic
anthropic = Anthropic()
with run("researcher"):
    anthropic.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": "Hello"}],
    )
Note: Anthropic requires max_tokens on every messages.create call.
import { BedrockRuntime } from "@spaturzu/sdk/bedrock";
const bedrock = new BedrockRuntime({ region: "us-east-1" });
await run("triage", async () => {
await bedrock.converse({
modelId: "anthropic.claude-3-5-sonnet-20241022-v2:0",
messages: [{ role: "user", content: [{ text: "Hello" }] }],
});
});
from spaturzu import run
from spaturzu.bedrock import BedrockRuntime
bedrock = BedrockRuntime(region_name="us-east-1")
with run("triage"):
    bedrock.converse(
        modelId="anthropic.claude-3-5-sonnet-20241022-v2:0",
        messages=[{"role": "user", "content": [{"text": "Hello"}]}],
    )
The Node drop-in targets BedrockRuntime, the aggregated client with the named methods converse / converseStream, not the low-level BedrockRuntimeClient. The Command form (client.send(new ConverseCommand(...))) is not instrumented in v1.
import { GoogleGenAI } from "@spaturzu/sdk/google";
const gemini = new GoogleGenAI({ apiKey: process.env.GEMINI_API_KEY });
await run("summariser", async () => {
await gemini.models.generateContent({
model: "gemini-2.5-pro",
contents: [{ role: "user", parts: [{ text: "Hello" }] }],
});
});
import os
from spaturzu import run
from spaturzu.google import Client
gemini = Client(api_key=os.environ["GEMINI_API_KEY"])
with run("summariser"):
    gemini.models.generate_content(
        model="gemini-2.5-pro",
        contents=[{"role": "user", "parts": [{"text": "Hello"}]}],
    )
import { Mistral } from "@spaturzu/sdk/mistral";
const mistral = new Mistral({ apiKey: process.env.MISTRAL_API_KEY });
await run("classifier", async () => {
await mistral.chat.complete({
model: "mistral-large-latest",
messages: [{ role: "user", content: "Hello" }],
});
});
import os
from spaturzu import run
from spaturzu.mistral import Mistral
mistral = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
with run("classifier"):
    mistral.chat.complete(
        model="mistral-large-latest",
        messages=[{"role": "user", "content": "Hello"}],
    )
chat.complete and chat.stream are separate methods; there is no stream: true flag.
Instrumentation duck-types against the runtime shape of chat.completions.create, not a specific SDK build. Any OpenAI-compatible endpoint (vLLM, Together, Groq, any OpenAI-API proxy) works — pass its baseURL to the constructor: new OpenAI({ baseURL }). Cost is computed per the model name in the response, so use a mainstream model id or add a model-registry entry first.
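The dependence on the model name can be illustrated with a toy registry lookup. The registry shape, the model name `example-small`, and the prices are placeholders invented for this sketch, and returning `None` for an unknown model is an assumption about how a missing entry might surface.

```python
from decimal import Decimal

# Hypothetical registry: USD per million tokens. Names and prices are placeholders.
MODEL_REGISTRY = {
    "example-small": {"input": Decimal("0.50"), "output": Decimal("1.50")},
}


def cost_usd(model, input_tokens, output_tokens):
    """Return the call's cost, or None when the model has no registry entry."""
    price = MODEL_REGISTRY.get(model)
    if price is None:
        return None
    million = Decimal(1_000_000)
    return (price["input"] * input_tokens + price["output"] * output_tokens) / million
```

With a self-hosted or fine-tuned model, the name in the response is whatever the endpoint reports, which is why an entry has to exist for that exact string.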
When you need to construct the client yourself — Azure OpenAI, a custom baseURL, or a client your framework injects — use the explicit path: build a Spaturzu instance and wrap the client you already have. The drop-in modules are a thin shortcut over exactly this, so every feature below works the same way.
import { Spaturzu } from "@spaturzu/sdk";
import OpenAI from "openai";
const sp = new Spaturzu({ tags: { env: "prod" } });
const openai = sp.wrapOpenAI(
new OpenAI({ baseURL: "https://api.groq.com/openai/v1" }),
);
await sp.run("support-triage", async () => {
await openai.chat.completions.create({ /* ... */ });
});
from spaturzu import spaturzu
from openai import OpenAI
sp = spaturzu(tags={"env": "prod"})
openai = sp.wrap_openai(OpenAI(base_url="https://api.groq.com/openai/v1"))
with sp.run("support-triage"):
    openai.chat.completions.create(...)
import { configure } from "@spaturzu/sdk";
// Call once at startup, before constructing any drop-in client.
configure({ tags: { env: "prod", region: "us-east-1", version: "2.1.0" } });
from spaturzu import configure
# Call once at startup, before constructing any drop-in client.
configure(tags={"env": "prod", "region": "us-east-1", "version": "2.1.0"})
The three-arg form of run() applies tags to every LLM call inside the frame (and inside any nested run()s, unless they override the same key). Merge order: process tags → frame tags → nested-frame tags; inner scope wins on key conflict.
await run("nightly-report", { tags: { team: "search", phase: "draft" } }, async () => {
await openai.chat.completions.create({
model: "gpt-4o",
messages: [{ role: "user", content: "..." }],
});
});
with run("nightly-report", tags={"team": "search", "phase": "draft"}):
    openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "..."}],
    )
run() and agentPath
Nested run() calls share the parent's runId and extend agentPath. The dashboard shows the path as a research / synthesize crumb on every call. Tags merge per the rule above.
await run("research", async () => {
await openai.chat.completions.create({ /* ... */ });
// agentPath = ["research"]
await run("synthesize", { tags: { phase: "draft" } }, async () => {
await anthropic.messages.create({ /* ... */ });
// agentPath = ["research", "synthesize"], tags.phase = "draft"
});
});
with run("research"):
    openai.chat.completions.create(...)  # agent_path = ["research"]
    with run("synthesize", tags={"phase": "draft"}):
        anthropic.messages.create(...)  # agent_path = ["research", "synthesize"], tags.phase = "draft"
Per-provider source for the token-usage event on streamed calls, useful if you're seeing zero token counts on a streamed response:
OpenAI: pass stream_options: { include_usage: true } when streaming; usage arrives in the final delta.
Anthropic: message_delta event at end-of-stream.
Bedrock: metadata event.
Gemini: usageMetadata.
Mistral: data.usage.
If a stream completes without a usage event, the SDK falls back to a local tiktoken estimate and flags the row with usageSource: 'tiktoken'. Estimates are rough for non-OpenAI tokenizers.
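The fallback decision can be sketched in a few lines. This is not the SDK's code: `resolve_usage` is a hypothetical name, the `"provider"` and `"estimate"` labels are illustrative, and the four-characters-per-token rule is a crude stand-in for a real tokenizer such as tiktoken.

```python
def resolve_usage(stream_usage, completion_text):
    """Prefer provider-reported usage; otherwise estimate and flag the row."""
    if stream_usage is not None:
        return {"output_tokens": stream_usage["output_tokens"], "usage_source": "provider"}
    # Crude stand-in for a tokenizer estimate: roughly four characters per token.
    estimate = max(1, len(completion_text) // 4)
    return {"output_tokens": estimate, "usage_source": "estimate"}
```

Flagging the source on each row is what lets you separate exact counts from approximations later, for example when reconciling against a provider invoice.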
Pass these to configure({ ... }) at startup (drop-in) or to the Spaturzu constructor. They shape how the SDK posts metadata to the gateway — never the provider call.
| Option | Default | What it controls |
|---|---|---|
| timeoutMs | 10000 | Per-attempt POST timeout to the gateway. |
| backoffMs | [1000, 2000, 4000, 8000, 16000] | Retry backoff schedule. Length = retries after the initial send. Set [] to disable retries. |
| maxConcurrent | 50 | Max in-flight log POSTs at once. Excess calls queue FIFO. |
| onError | silent (no-op) | Called when a log POST fails after all retries. The default swallows the failure so your app never crashes because the metering plane hiccupped. |
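How the backoff schedule and the error callback interact can be shown with a small sketch. `post_with_retries` is a hypothetical function, not the SDK's; it takes the schedule in milliseconds as in the table and accepts an injectable `sleep` so the logic can be exercised without waiting.

```python
import time


def post_with_retries(send, backoff_ms=(1000, 2000, 4000, 8000, 16000),
                      on_error=lambda exc: None, sleep=time.sleep):
    """Attempt once, then once more per backoff entry; never raise to the caller."""
    delays = [None, *backoff_ms]  # None marks the initial, undelayed attempt
    last_exc = None
    for delay in delays:
        if delay is not None:
            sleep(delay / 1000)
        try:
            send()
            return True
        except Exception as exc:  # metering failures must not crash the app
            last_exc = exc
    on_error(last_exc)  # called once, only after every attempt has failed
    return False
```

With the default schedule this gives six attempts in total (one initial send plus five retries), and an empty schedule gives exactly one.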
flush() vs shutdown()
Long-running servers can ignore both; the connection cleans up on process termination. Short-lived processes (CLIs, Lambdas, CI scripts) must call one of them; otherwise queued log POSTs are dropped and (with budgets enabled) the open SSE socket keeps the event loop alive past your process.exit().
import { flush } from "@spaturzu/sdk";
// CLIs, Lambdas, CI scripts — flush queued metadata before exit
await flush();
from spaturzu import flush
# CLIs, Lambdas, CI scripts: flush queued metadata before exit
flush()
If you enabled a hard cap ({ budget: { hardCap: true } }), prefer shutdown() instead. It subsumes flush() and also tears down the BudgetGuard's SSE socket so the event loop can exit cleanly.
import { shutdown } from "@spaturzu/sdk";
// Budget-enabled clients: prefer this over flush()
await shutdown();
from spaturzu import shutdown
# Budget-enabled clients: prefer this over flush()
shutdown()