Polar offers an ingestion framework to work with Polar’s event ingestion API.

Want to report events regarding Large Language Model usage, S3 file uploads or something else? Our Ingestion strategies are customized to make it as seamless as possible to fire ingestion events for complex needs.

Javascript SDK

LLM Strategy

Wrap any LLM model from the @ai-sdk/* library, to automatically fire prompt- & completion tokens used by every model call.

pnpm add @polar-sh/ingestion ai @ai-sdk/openai
import { Ingestion } from "@polar-sh/ingestion";
import { LLMStrategy } from "@polar-sh/ingestion/strategies/LLM";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";

// Setup the LLM Ingestion Strategy
const llmIngestion = Ingestion({ accessToken: process.env.POLAR_ACCESS_TOKEN })
  .strategy(new LLMStrategy(openai("gpt-4o")))
  .ingest("openai-usage");

export async function POST(req: Request) {
  const { prompt }: { prompt: string } = await req.json();

  // Get the wrapped LLM model with ingestion capabilities
  // Pass Customer Id to properly annotate the ingestion events with a specific customer
  const model = llmIngestion.client({
    customerId: request.headers.get("X-Polar-Customer-Id") ?? "",
  });

  const { text } = await generateText({
    model,
    system: "You are a helpful assistant.",
    prompt,
  });

  return Response.json({ text });
}

Ingestion Payload

{
  "customerId": "123",
  "name": "openai-usage",
  "metadata": {
    "promptTokens": 100,
    "completionTokens": 200
  }
}

S3 Strategy

Wrap the official AWS S3 Client with our S3 Ingestion Strategy to automatically ingest bytes uploaded.

pnpm add @polar-sh/ingestion @aws-sdk/client-s3
import { Ingestion } from "@polar-sh/ingestion";
import { S3Strategy } from "@polar-sh/ingestion/strategies/S3";
import { PutObjectCommand, S3Client } from "@aws-sdk/client-s3";

const s3Client = new S3Client({
  region: process.env.AWS_REGION,
  endpoint: process.env.AWS_ENDPOINT_URL,
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
  },
});

// Setup the S3 Ingestion Strategy
const s3Ingestion = Ingestion({ accessToken: process.env.POLAR_ACCESS_TOKEN })
  .strategy(new S3Strategy(s3Client))
  .ingest("s3-uploads");

export async function POST(request: Request) {
  try {
    // Get the wrapped S3 Client
    // Pass Customer Id to properly annotate the ingestion events with a specific customer
    const s3 = s3Ingestion.client({
      customerId: request.headers.get("X-Polar-Customer-Id") ?? "",
    });

    await s3.send(
      new PutObjectCommand({
        Bucket: process.env.AWS_BUCKET_NAME,
        Key: "a-random-key",
        Body: JSON.stringify({
          name: "John Doe",
          age: 30,
        }),
        ContentType: "application/json",
      }),
    );

    return Response.json({});
  } catch (error) {
    return Response.json({ error: error.message });
  }
}

Ingestion Payload

{
  "customerId": "123",
  "name": "s3-uploads",
  "metadata": {
    "bytes": 100,
    "bucket": "my-bucket",
    "key": "my-key",
    "contentType": "application/text"
  }
}

Stream Strategy

Wrap any Readable or Writable stream of choice to automatically ingest the bytes consumed.

pnpm add @polar-sh/ingestion
import { Ingestion } from '@polar-sh/ingestion';
import { StreamStrategy } from '@polar-sh/ingestion/strategies/Stream';

const myReadstream = createReadStream(...);

// Setup the Stream Ingestion Strategy
const streamIngestion = Ingestion({ accessToken: process.env.POLAR_ACCESS_TOKEN })
  .strategy(new StreamStrategy(myReadstream))
  .ingest("my-stream");

export async function GET(request: Request) {
  try {

    // Get the wrapped stream
    // Pass Customer Id to properly annotate the ingestion events with a specific customer
    const stream = streamIngestion.client({
      customerId: request.headers.get("X-Polar-Customer-Id") ?? ""
    });

    // Consume stream...
    stream.on('data', () => ...)

    return Response.json({});
  } catch (error) {
    return Response.json({ error: error.message });
  }
}

Ingestion Payload

{
  "customerId": "123",
  "name": "my-stream",
  "metadata": {
    "bytes": 100
  }
}

DeltaTime Strategy

Ingest delta time of arbitrary execution. Bring your own now-resolver.

pnpm add @polar-sh/ingestion
import { Ingestion } from "@polar-sh/ingestion";
import { DeltaTimeStrategy } from "@polar-sh/ingestion/strategies/DeltaTime";

const nowResolver = () => performance.now();
// const nowResolver = () => Number(hrtime.bigint())
// const nowResolver = () => Date.now()

// Setup the Delta Time Ingestion Strategy
const deltaTimeIngestion = Ingestion({
  accessToken: process.env.POLAR_ACCESS_TOKEN,
})
  .strategy(new DeltaTimeStrategy(nowResolver))
  .ingest("execution-time");

export async function GET(request: Request) {
  try {
    // Get the wrapped start clock function
    // Pass Customer Id to properly annotate the ingestion events with a specific customer
    const start = deltaTimeIngestion.client({
      customerId: request.headers.get("X-Polar-Customer-Id") ?? "",
    });

    const stop = start();

    await sleep(1000);

    // { deltaTime: xxx } is automatically ingested to Polar
    const delta = stop();

    return Response.json({ delta });
  } catch (error) {
    return Response.json({ error: error.message });
  }
}

Ingestion Payload

{
  "customerId": "123",
  "name": "execution-time",
  "metadata": {
    "deltaTime": 1000
  }
}

Help us improve

We’re always looking for ways to improve our ingestion strategies. Feel free to contribute — Polar Ingestion SDK.

Python SDK

Our Python SDK includes an ingestion helper and strategies for common use cases. It’s installed as part of the Polar SDK.

pip install polar-sdk

Ingestion helper

The ingestion helper is a simple wrapper around the Polar events ingestion API. It takes care of batching and sending events to Polar in the background, without blocking your main thread.

import os
from polar_sdk.ingestion import Ingestion

ingestion = Ingestion(os.getenv("POLAR_ACCESS_TOKEN"))

ingestion.ingest({
    "name": "my-event",
    "external_customer_id": "CUSTOMER_ID",
    "metadata": {
        "usage": 13.37,
    }
})

PydanticAI Strategy

PydanticAI is an AI agent framework for Python. A common use-case with AI applications is to track the usage of LLMs, like the number of input and output tokens, and bill the customer accordingly.

With our PydanticAI strategy, you can easily track the usage of LLMs and send the data to Polar for billing.

import os
from polar_sdk.ingestion import Ingestion
from polar_sdk.ingestion.strategies import PydanticAIStrategy
from pydantic import BaseModel
from pydantic_ai import Agent


ingestion = Ingestion(os.getenv("POLAR_ACCESS_TOKEN"))
strategy = ingestion.strategy(PydanticAIStrategy, "ai_usage")


class MyModel(BaseModel):
    city: str
    country: str


agent = Agent("gpt-4.1-nano", output_type=MyModel)

if __name__ == '__main__':
    result = agent.run_sync("The windy city in the US of A.")
    print(result.output)
    strategy.ingest("CUSTOMER_ID", result)

This example is inspired from the Pydantic Model example of PydanticAI documentation.

Ingestion Payload

{
  "name": "ai_usage",
  "external_customer_id": "CUSTOMER_ID",
  "metadata": {
    "requests": 1,
    "total_tokens": 78,
    "request_tokens": 58,
    "response_tokens": 20
  }
}

Was this page helpful?