Usage Metering

High-throughput event ingestion for tracking customer consumption in real-time.

Usage Metering is the high-performance event ingestion system that records customer consumption of metered features. It's designed for 10,000+ events per second with batched writes, partitioned storage, and exactly-once semantics.

Metering architecture

API Call → Meter() → Ring Buffer → Batch Processor → Store
           (Non-blocking)

The metering pipeline:

  1. Non-blocking ingestion — Meter() returns immediately after adding to the buffer
  2. Ring buffer storage — Lock-free circular buffer for high concurrency
  3. Automatic batching — Flush on size or time thresholds
  4. Persistent storage — Batch insert to database for efficiency

Event structure

type Event struct {
    ID             id.UsageID
    SubscriptionID id.SubscriptionID
    CustomerID     id.CustomerID
    TenantID       string
    AppID          string
    Meter          string
    Value          float64
    Timestamp      time.Time
    IdempotencyKey string
    Metadata       map[string]any
}

Meter names

Meters correspond to plan features:

| Meter           | Type     | Description                |
|-----------------|----------|----------------------------|
| api-calls       | Counter  | Increment by 1 per request |
| storage-gb      | Gauge    | Current storage used in GB |
| compute-minutes | Duration | CPU time consumed          |
| bandwidth-gb    | Counter  | Data transferred           |

Recording usage

Meter package API

Usage events are recorded with the meter.Record function:

import "github.com/xraph/ledger/meter"

event := &meter.Event{
    SubscriptionID: subscription.ID,
    CustomerID:     customer.ID,
    Meter:          "api-calls",
    Value:          1,
    Timestamp:      time.Now(),
    IdempotencyKey: requestID, // Prevent duplicates
}

if err := meter.Record(ctx, event); err != nil {
    log.Error("meter failed", "error", err)
}

Engine-level API

The engine exposes a simplified, non-blocking metering API:

// Track a single API call
engine.Meter(ctx, "api_calls", 1)

// Track storage usage
engine.Meter(ctx, "storage_gb", 2.5)

// Track bandwidth
engine.Meter(ctx, "bandwidth_gb", 0.125)

// Track compute time
engine.Meter(ctx, "compute_minutes", 15)

Metering configuration

Configure batching behavior for your workload:

engine := ledger.New(store,
    ledger.WithMeterConfig(
        100,              // Batch size (events)
        5*time.Second,    // Flush interval
    ),
)

Tuning guidelines:

| Workload            | Batch Size | Flush Interval | Rationale               |
|---------------------|------------|----------------|-------------------------|
| High-volume API     | 1000       | 1s             | Maximize throughput     |
| Real-time analytics | 10         | 100ms          | Low latency updates     |
| Background jobs     | 500        | 30s            | Optimize for efficiency |
| Development         | 1          | 0s             | Immediate visibility    |

Event metadata

Attach metadata to usage events for detailed tracking:

// Track with metadata
engine.MeterWithMetadata(ctx, meter.Event{
    Meter: "api_calls",
    Value: 1,
    Metadata: map[string]any{
        "endpoint":    "/api/v1/users",
        "method":      "GET",
        "status_code": "200",
        "region":      "us-west-2",
    },
    Timestamp:      time.Now(),
    IdempotencyKey: "req_abc123", // Prevent duplicates
})

Batched recording

For high-throughput scenarios, batch events before writing:

events := []*meter.Event{
    {SubscriptionID: sub1, Meter: "api-calls", Value: 1},
    {SubscriptionID: sub2, Meter: "api-calls", Value: 1},
    {SubscriptionID: sub1, Meter: "bandwidth-gb", Value: 0.5},
}

if err := meter.RecordBatch(ctx, events); err != nil {
    log.Error("batch failed", "error", err)
}

Batches are written atomically in a single database transaction.

Batch context

Share context across batched events using the engine API:

batch := engine.NewMeterBatch(ctx)
defer batch.Flush()

for _, request := range requests {
    batch.Add("api_calls", 1)
    // Process request...
}
// Auto-flushes on defer

Idempotency

Use IdempotencyKey to prevent duplicate recording. Keys are stored for 24 hours.

Meter package

event := &meter.Event{
    SubscriptionID: subscription.ID,
    Meter:          "api-calls",
    Value:          1,
    IdempotencyKey: fmt.Sprintf("req:%s", requestID),
}

// If called twice with same IdempotencyKey, only records once
meter.Record(ctx, event)
meter.Record(ctx, event) // No-op

Engine API

// First call - recorded
engine.MeterIdempotent(ctx, "api_calls", 1, "request_123")

// Duplicate call - ignored
engine.MeterIdempotent(ctx, "api_calls", 1, "request_123")

Time-series aggregation

Usage is aggregated per subscription per billing period:

type Aggregation struct {
    SubscriptionID id.SubscriptionID
    Meter          string
    PeriodStart    time.Time
    PeriodEnd      time.Time
    Sum            float64
    Count          int64
    Min            float64
    Max            float64
    Avg            float64
}

agg, err := meter.Aggregate(ctx, subscription.ID, "api-calls",
    meter.WithPeriod(periodStart, periodEnd),
)

fmt.Printf("Total API calls: %.0f\n", agg.Sum)

Aggregation caching

Aggregations are expensive to compute. Ledger caches them in Redis:

// First call: computes from database
agg1, _ := meter.Aggregate(ctx, subID, "api-calls", meter.WithPeriod(start, end))

// Second call: returns from cache
agg2, _ := meter.Aggregate(ctx, subID, "api-calls", meter.WithPeriod(start, end))

Cache is invalidated when new events are recorded for that subscription+meter+period.

Engine-level aggregation queries

// Get usage for current billing period
usage, err := engine.GetUsage(ctx, subscription.ID, "api_calls")
fmt.Printf("Total API calls: %d\n", usage.Total)

// Get detailed usage breakdown
breakdown, err := engine.GetUsageBreakdown(ctx,
    subscription.ID,
    time.Now().AddDate(0, -1, 0), // Start
    time.Now(),                    // End
)

for _, item := range breakdown {
    fmt.Printf("%s: %d events\n", item.Feature, item.Count)
}

Querying usage

Get current period usage

currentUsage, err := meter.GetCurrentUsage(ctx, subscription.ID, "api-calls")
fmt.Printf("Used: %.0f / %d\n", currentUsage.Sum, plan.Limit)

Get historical usage

history, err := meter.GetUsageHistory(ctx, subscription.ID, "api-calls",
    meter.WithLast(6, meter.PeriodMonthly),
)

for _, period := range history {
    fmt.Printf("%s: %.0f calls\n", period.PeriodStart.Format("2006-01"), period.Sum)
}

Partitioning strategy

Usage events are partitioned by month for efficient querying and archival:

CREATE TABLE meter_events_2024_01 PARTITION OF meter_events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE meter_events_2024_02 PARTITION OF meter_events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

Old partitions can be archived or dropped without affecting current data.

Real-time vs batch processing

Ledger supports two metering modes:

Real-time (default)

Events are written immediately with sub-10ms latency:

meter.Record(ctx, event) // Writes immediately

Use for interactive billing experiences (e.g., live usage dashboards).

Batch (high-throughput)

Events are buffered and flushed every 5 seconds:

meter.StartBatchMode(ctx, meter.WithFlushInterval(5*time.Second))
defer meter.StopBatchMode(ctx)

// These are buffered
meter.Record(ctx, event1)
meter.Record(ctx, event2)
// Flushed together in 5s or when buffer is full

Achieves 10,000+ events/sec with batch sizes of 1000.

Backpressure handling

Handle high load gracefully:

// Configure overflow behavior
engine := ledger.New(store,
    ledger.WithMeterConfig(1000, 1*time.Second),
    ledger.WithMeterOverflow(ledger.OverflowDrop), // or OverflowBlock
)

// Check if metering succeeded
if err := engine.MeterSafe(ctx, "api_calls", 1); err != nil {
    if errors.Is(err, ledger.ErrMeterBufferFull) {
        // Buffer is full, event dropped
        log.Warn("Metering buffer full, event dropped")
    }
}

Overage detection

When a metered limit is exceeded, Ledger emits an event:

// Automatic overage detection
if currentUsage.Sum > plan.Limit {
    events.Emit(ctx, &events.OverageEvent{
        SubscriptionID: subscription.ID,
        Meter:          "api-calls",
        Limit:          plan.Limit,
        Current:        currentUsage.Sum,
    })
}

Use this to trigger notifications, soft limits, or automatic upgrades.

Buffer monitoring

Monitor buffer health and performance:

stats := engine.MeterStats()

fmt.Printf("Buffer stats:\n")
fmt.Printf("  Pending events: %d\n", stats.Pending)
fmt.Printf("  Total processed: %d\n", stats.TotalProcessed)
fmt.Printf("  Dropped events: %d\n", stats.Dropped)
fmt.Printf("  Flush count: %d\n", stats.FlushCount)
fmt.Printf("  Avg flush time: %v\n", stats.AvgFlushTime)

Performance optimization

Memory pooling

Reduce allocations with event pooling:

var eventPool = sync.Pool{
    New: func() interface{} {
        return &meter.Event{}
    },
}

func trackUsage(ctx context.Context, engine *ledger.Ledger, feature string, qty float64) {
    event := eventPool.Get().(*meter.Event)
    // Note: pooling is only safe if MeterEvent copies the event
    // before returning (it must not retain the pointer).
    defer eventPool.Put(event)

    event.Meter = feature
    event.Value = qty
    event.Timestamp = time.Now()

    engine.MeterEvent(ctx, event)
}

Usage patterns

Pattern: Rate limiting

Combine entitlements with metering for rate limiting:

func HandleAPIRequest(ctx context.Context, engine *ledger.Ledger) error {
    // Check rate limit
    result, _ := engine.Entitled(ctx, "api_calls_per_minute")
    if !result.Allowed {
        return errors.New("rate limit exceeded")
    }

    // Process request...

    // Track usage
    engine.Meter(ctx, "api_calls_per_minute", 1)
    return nil
}

Pattern: Cost tracking

Track costs alongside usage:

type CostTracker struct {
    engine *ledger.Ledger
}

func (c *CostTracker) TrackCompute(ctx context.Context, minutes int, instanceType string) {
    // Track usage
    c.engine.Meter(ctx, "compute_minutes", float64(minutes))

    // Track cost based on instance type
    costPerMinute := c.getCostPerMinute(instanceType)
    c.engine.MeterWithMetadata(ctx, meter.Event{
        Meter: "compute_cost",
        Value: float64(minutes) * costPerMinute,
        Metadata: map[string]any{
            "instance_type": instanceType,
        },
    })
}

Pattern: Batch import

Import historical usage data:

events := []meter.Event{
    {Meter: "api_calls", Value: 1000, Timestamp: time1},
    {Meter: "storage_gb", Value: 50, Timestamp: time2},
    {Meter: "bandwidth_gb", Value: 25, Timestamp: time3},
}

// Batch import
for _, event := range events {
    engine.MeterAt(ctx, event.Meter, event.Value, event.Timestamp)
}

// Force flush
engine.FlushMeter(ctx)

Store interface

type Store interface {
    RecordEvent(ctx context.Context, event *Event) error
    RecordBatch(ctx context.Context, events []*Event) error
    GetEvents(ctx context.Context, filter *EventFilter) ([]*Event, error)
    Aggregate(ctx context.Context, subID id.SubscriptionID, meter string, opts ...AggOption) (*Aggregation, error)
    GetCurrentUsage(ctx context.Context, subID id.SubscriptionID, meter string) (*Aggregation, error)
    GetUsageHistory(ctx context.Context, subID id.SubscriptionID, meter string, opts ...HistoryOption) ([]*Aggregation, error)
}

API routes

| Method | Path                                                 | Description               |
|--------|------------------------------------------------------|---------------------------|
| POST   | /ledger/meter/events                                 | Record a usage event      |
| POST   | /ledger/meter/events/batch                           | Record multiple events    |
| GET    | /ledger/meter/usage/{subscriptionId}/{meter}         | Get current usage         |
| GET    | /ledger/meter/usage/{subscriptionId}/{meter}/history | Get historical usage      |
| GET    | /ledger/meter/events                                 | Query events with filters |

Troubleshooting

Common issues and solutions:

| Issue                 | Cause               | Solution                                   |
|-----------------------|---------------------|--------------------------------------------|
| Events not persisting | Buffer not flushing | Reduce flush interval or call FlushMeter() |
| High memory usage     | Large batch size    | Reduce batch size or flush more frequently |
| Dropped events        | Buffer overflow     | Increase buffer size or use blocking mode  |
| Duplicate charges     | Missing idempotency | Use MeterIdempotent() with unique keys     |

Best practices

  1. Use appropriate batch sizes — Balance between throughput and latency
  2. Include metadata — Track details for analytics and debugging
  3. Handle errors gracefully — Log dropped events in overflow scenarios
  4. Use idempotency — Prevent double-charging on retries
  5. Monitor buffer stats — Watch for dropped events or high latency
  6. Test under load — Verify your configuration handles peak traffic
  7. Use partitioned storage — Partition by month for efficient querying and archival
  8. Cache aggregations — Let Redis handle repeated aggregation queries
  9. Detect overages — Emit events when metered limits are exceeded to trigger notifications or upgrades
