# Usage Metering
High-throughput event ingestion for tracking customer consumption in real-time.
Usage Metering is the high-performance event ingestion system that records customer consumption of metered features. It's designed for 10,000+ events per second with batched writes, partitioned storage, and exactly-once semantics.
## Metering architecture

```text
API Call → Meter() → Ring Buffer → Batch Processor → Store
              ↓
       (Non-blocking)
```

The metering pipeline:

- Non-blocking ingestion — `Meter()` returns immediately after adding to the buffer
- Ring buffer storage — Lock-free circular buffer for high concurrency
- Automatic batching — Flush on size or time thresholds
- Persistent storage — Batch insert to the database for efficiency
## Event structure

```go
type Event struct {
	ID             id.UsageID
	SubscriptionID id.SubscriptionID
	CustomerID     id.CustomerID
	TenantID       string
	AppID          string
	Meter          string
	Value          float64
	Timestamp      time.Time
	IdempotencyKey string
	Metadata       map[string]any
}
```

### Meter names
Meters correspond to plan features:
| Meter | Type | Description |
|---|---|---|
| api-calls | Counter | Increment by 1 per request |
| storage-gb | Gauge | Current storage used in GB |
| compute-minutes | Duration | CPU time consumed |
| bandwidth-gb | Counter | Data transferred |
## Recording usage

### Meter package API

Usage events are recorded with the `meter.Record` function:
```go
import "github.com/xraph/ledger/meter"

event := &meter.Event{
	SubscriptionID: subscription.ID,
	CustomerID:     customer.ID,
	Meter:          "api-calls",
	Value:          1,
	Timestamp:      time.Now(),
	IdempotencyKey: requestID, // Prevent duplicates
}

if err := meter.Record(ctx, event); err != nil {
	log.Error("meter failed", "error", err)
}
```

### Engine-level API
The engine exposes a simplified, non-blocking metering API:
```go
// Track a single API call
engine.Meter(ctx, "api_calls", 1)

// Track storage usage
engine.Meter(ctx, "storage_gb", 2.5)

// Track bandwidth
engine.Meter(ctx, "bandwidth_gb", 0.125)

// Track compute time
engine.Meter(ctx, "compute_minutes", 15)
```

## Metering configuration
Configure batching behavior for your workload:
```go
engine := ledger.New(store,
	ledger.WithMeterConfig(
		100,           // Batch size (events)
		5*time.Second, // Flush interval
	),
)
```

Tuning guidelines:
| Workload | Batch Size | Flush Interval | Rationale |
|---|---|---|---|
| High-volume API | 1000 | 1s | Maximize throughput |
| Real-time analytics | 10 | 100ms | Low latency updates |
| Background jobs | 500 | 30s | Optimize for efficiency |
| Development | 1 | 0s | Immediate visibility |
## Event metadata
Attach metadata to usage events for detailed tracking:
```go
// Track with metadata
engine.MeterWithMetadata(ctx, meter.Event{
	Feature:  "api_calls",
	Quantity: 1,
	Metadata: map[string]string{
		"endpoint":    "/api/v1/users",
		"method":      "GET",
		"status_code": "200",
		"region":      "us-west-2",
	},
	Timestamp:      time.Now(),
	IdempotencyKey: "req_abc123", // Prevent duplicates
})
```

## Batched recording
For high-throughput scenarios, batch events before writing:
```go
events := []*meter.Event{
	{SubscriptionID: sub1, Meter: "api-calls", Value: 1},
	{SubscriptionID: sub2, Meter: "api-calls", Value: 1},
	{SubscriptionID: sub1, Meter: "bandwidth-gb", Value: 0.5},
}

if err := meter.RecordBatch(ctx, events); err != nil {
	log.Error("batch failed", "error", err)
}
```

Batches are written atomically in a single database transaction.
### Batch context
Share context across batched events using the engine API:
```go
batch := engine.NewMeterBatch(ctx)
defer batch.Flush()

for _, request := range requests {
	batch.Add("api_calls", 1)
	// Process request...
}
// Auto-flushes on defer
```

## Idempotency
Use `IdempotencyKey` to prevent duplicate recording. Keys are stored for 24 hours.
### Meter package

```go
event := &meter.Event{
	SubscriptionID: subscription.ID,
	Meter:          "api-calls",
	Value:          1,
	IdempotencyKey: fmt.Sprintf("req:%s", requestID),
}

// If called twice with the same IdempotencyKey, only records once
meter.Record(ctx, event)
meter.Record(ctx, event) // No-op
```

### Engine API
```go
// First call - recorded
engine.MeterIdempotent(ctx, "api_calls", 1, "request_123")

// Duplicate call - ignored
engine.MeterIdempotent(ctx, "api_calls", 1, "request_123")
```

## Time-series aggregation
Usage is aggregated per subscription per billing period:
```go
type Aggregation struct {
	SubscriptionID id.SubscriptionID
	Meter          string
	PeriodStart    time.Time
	PeriodEnd      time.Time
	Sum            float64
	Count          int64
	Min            float64
	Max            float64
	Avg            float64
}

agg, err := meter.Aggregate(ctx, subscription.ID, "api-calls",
	meter.WithPeriod(periodStart, periodEnd),
)
fmt.Printf("Total API calls: %.0f\n", agg.Sum)
```

### Aggregation caching
Aggregations are expensive to compute. Ledger caches them in Redis:
```go
// First call: computes from the database
agg1, _ := meter.Aggregate(ctx, subID, "api-calls", meter.WithPeriod(start, end))

// Second call: returns from cache
agg2, _ := meter.Aggregate(ctx, subID, "api-calls", meter.WithPeriod(start, end))
```

The cache is invalidated when new events are recorded for that subscription+meter+period.
### Engine-level aggregation queries

```go
// Get usage for the current billing period
usage, err := engine.GetUsage(ctx, subscription.ID, "api_calls")
fmt.Printf("Total API calls: %d\n", usage.Total)

// Get a detailed usage breakdown
breakdown, err := engine.GetUsageBreakdown(ctx,
	subscription.ID,
	time.Now().AddDate(0, -1, 0), // Start
	time.Now(),                   // End
)
for _, item := range breakdown {
	fmt.Printf("%s: %d events\n", item.Feature, item.Count)
}
```

## Querying usage
### Get current period usage

```go
currentUsage, err := meter.GetCurrentUsage(ctx, subscription.ID, "api-calls")
fmt.Printf("Used: %.0f / %d\n", currentUsage.Sum, plan.Limit)
```

### Get historical usage
```go
history, err := meter.GetUsageHistory(ctx, subscription.ID, "api-calls",
	meter.WithLast(6, meter.PeriodMonthly),
)
for _, period := range history {
	fmt.Printf("%s: %.0f calls\n", period.Month, period.Sum)
}
```

## Partitioning strategy
Usage events are partitioned by month for efficient querying and archival:
```sql
CREATE TABLE meter_events_2024_01 PARTITION OF meter_events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE meter_events_2024_02 PARTITION OF meter_events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
```

Old partitions can be archived or dropped without affecting current data.
## Real-time vs batch processing

Ledger supports two metering modes:

### Real-time (default)

Events are written immediately with sub-10ms latency:

```go
meter.Record(ctx, event) // Writes immediately
```

Use this mode for interactive billing experiences (e.g., live usage dashboards).
### Batch (high-throughput)

Events are buffered and flushed every 5 seconds:

```go
meter.StartBatchMode(ctx, meter.WithFlushInterval(5*time.Second))
defer meter.StopBatchMode(ctx)

// These are buffered
meter.Record(ctx, event1)
meter.Record(ctx, event2)
// Flushed together within 5s or when the buffer is full
```

This mode achieves 10,000+ events/sec with batch sizes of 1000.
## Backpressure handling
Handle high load gracefully:
```go
// Configure overflow behavior
engine := ledger.New(store,
	ledger.WithMeterConfig(1000, 1*time.Second),
	ledger.WithMeterOverflow(ledger.OverflowDrop), // or OverflowBlock
)

// Check whether metering succeeded
if err := engine.MeterSafe(ctx, "api_calls", 1); err != nil {
	if errors.Is(err, ledger.ErrMeterBufferFull) {
		// Buffer is full, event dropped
		log.Warn("Metering buffer full, event dropped")
	}
}
```

## Overage detection
When a metered limit is exceeded, Ledger emits an event:
```go
// Automatic overage detection
if currentUsage.Sum > plan.Limit {
	events.Emit(ctx, &events.OverageEvent{
		SubscriptionID: subscription.ID,
		Meter:          "api-calls",
		Limit:          plan.Limit,
		Current:        currentUsage.Sum,
	})
}
```

Use this to trigger notifications, soft limits, or automatic upgrades.
## Buffer monitoring
Monitor buffer health and performance:
```go
stats := engine.MeterStats()
fmt.Printf("Buffer stats:\n")
fmt.Printf("  Pending events:  %d\n", stats.Pending)
fmt.Printf("  Total processed: %d\n", stats.TotalProcessed)
fmt.Printf("  Dropped events:  %d\n", stats.Dropped)
fmt.Printf("  Flush count:     %d\n", stats.FlushCount)
fmt.Printf("  Avg flush time:  %v\n", stats.AvgFlushTime)
```

## Performance optimization
### Memory pooling
Reduce allocations with event pooling:
```go
var eventPool = sync.Pool{
	New: func() interface{} {
		return &meter.Event{}
	},
}

func trackUsage(ctx context.Context, engine *ledger.Ledger, feature string, qty float64) {
	event := eventPool.Get().(*meter.Event)
	defer eventPool.Put(event)

	event.Feature = feature
	event.Quantity = qty
	event.Timestamp = time.Now()
	engine.MeterEvent(ctx, event)
}
```

## Usage patterns
### Pattern: Rate limiting
Combine entitlements with metering for rate limiting:
```go
func HandleAPIRequest(ctx context.Context, engine *ledger.Ledger) error {
	// Check the rate limit
	result, _ := engine.Entitled(ctx, "api_calls_per_minute")
	if !result.Allowed {
		return errors.New("rate limit exceeded")
	}

	// Process request...

	// Track usage
	engine.Meter(ctx, "api_calls_per_minute", 1)
	return nil
}
```

### Pattern: Cost tracking
Track costs alongside usage:
```go
type CostTracker struct {
	engine *ledger.Ledger
}

func (c *CostTracker) TrackCompute(ctx context.Context, minutes int, instanceType string) {
	// Track usage
	c.engine.Meter(ctx, "compute_minutes", minutes)

	// Track cost based on instance type
	costPerMinute := c.getCostPerMinute(instanceType)
	c.engine.MeterWithMetadata(ctx, meter.Event{
		Feature:  "compute_cost",
		Quantity: float64(minutes) * costPerMinute,
		Metadata: map[string]string{
			"instance_type": instanceType,
		},
	})
}
```

### Pattern: Batch import
Import historical usage data:
```go
events := []meter.Event{
	{Feature: "api_calls", Quantity: 1000, Timestamp: time1},
	{Feature: "storage_gb", Quantity: 50, Timestamp: time2},
	{Feature: "bandwidth_gb", Quantity: 25, Timestamp: time3},
}

// Batch import
for _, event := range events {
	engine.MeterAt(ctx, event.Feature, event.Quantity, event.Timestamp)
}

// Force a flush
engine.FlushMeter(ctx)
```

## Store interface
```go
type Store interface {
	RecordEvent(ctx context.Context, event *Event) error
	RecordBatch(ctx context.Context, events []*Event) error
	GetEvents(ctx context.Context, filter *EventFilter) ([]*Event, error)
	Aggregate(ctx context.Context, subID id.SubscriptionID, meter string, opts ...AggOption) (*Aggregation, error)
	GetCurrentUsage(ctx context.Context, subID id.SubscriptionID, meter string) (*Aggregation, error)
	GetUsageHistory(ctx context.Context, subID id.SubscriptionID, meter string, opts ...HistoryOption) ([]*Aggregation, error)
}
```

## API routes
| Method | Path | Description |
|---|---|---|
| POST | /ledger/meter/events | Record a usage event |
| POST | /ledger/meter/events/batch | Record multiple events |
| GET | /ledger/meter/usage/{subscriptionId}/{meter} | Get current usage |
| GET | /ledger/meter/usage/{subscriptionId}/{meter}/history | Get historical usage |
| GET | /ledger/meter/events | Query events with filters |
## Troubleshooting
Common issues and solutions:
| Issue | Cause | Solution |
|---|---|---|
| Events not persisting | Buffer not flushing | Reduce flush interval or call FlushMeter() |
| High memory usage | Large batch size | Reduce batch size or flush more frequently |
| Dropped events | Buffer overflow | Increase buffer size or use blocking mode |
| Duplicate charges | Missing idempotency | Use MeterIdempotent() with unique keys |
## Best practices
- Use appropriate batch sizes — Balance between throughput and latency
- Include metadata — Track details for analytics and debugging
- Handle errors gracefully — Log dropped events in overflow scenarios
- Use idempotency — Prevent double-charging on retries
- Monitor buffer stats — Watch for dropped events or high latency
- Test under load — Verify your configuration handles peak traffic
- Use partitioned storage — Partition by month for efficient querying and archival
- Cache aggregations — Let Redis handle repeated aggregation queries
- Detect overages — Emit events when metered limits are exceeded to trigger notifications or upgrades