openova/core/services/metering-sidecar/main.go
e3mrah 9645a9044a
feat(metering): NewAPI NATS publisher + sme-billing subscriber + POST /metering/record (#798) (#818)
* feat(metering): NewAPI NATS publisher + sme-billing subscriber + POST /metering/record (#798)

Per #795 [Q-mine-3] (NATS not RedPanda) + [Q-mine-4] (one ledger), add
the SME-2 metering integration end-to-end. NewAPI is consumed as the
upstream image `ghcr.io/openova-io/openova/newapi-mirror` (a pinned
mirror, not a fork) — the metering envelope is produced by a Go sidecar
that observes the OpenAI-style `usage.total_tokens` field on every
2xx /v1/* response. This avoids forking the upstream binary while still
producing the canonical envelope shape on `catalyst.usage.recorded`.

A) NewAPI metering sidecar — core/services/metering-sidecar/
   - Transparent reverse proxy in front of NewAPI on its own port; the
     bp-newapi Service routes the cluster-fronting port to the sidecar,
     which forwards to NewAPI on the pod's loopback.
   - Observes successful /v1/* JSON responses, parses
     `usage.{prompt_tokens,completion_tokens,total_tokens}`, computes
     amount_micro_omr = -tokens * priceMicroOMRPerToken, and publishes
     one envelope on `catalyst.usage.recorded` per completed request.
   - Failed (non-2xx), non-JSON, and admin-path requests are NOT billed.
   - Customer-facing latency is NEVER blocked on metering: the response
     body is restored before publish; on NATS unreachable the envelope
     is persisted to disk and retried by a background drain loop.
   - 14 unit tests (proxy + publisher + safeFilename guards).

B) sme-billing NATS subscriber — core/services/billing/handlers/
   metering_consumer.go
   - JetStream durable consumer `sme-billing-metering` on stream
     `CATALYST_USAGE` (provisioned by sme-billing on startup).
   - Idempotent on metadata.request_id via a UNIQUE partial index on
     credit_ledger.external_ref; redelivery from the broker collapses
     to a single ledger row.
   - Customer auto-create on cold start (the rbac sme.user.created
     envelope may land AFTER the first metered request; we don't strand
     usage waiting for it).
   - 11 unit tests covering happy-path, idempotency, malformed-payload
     poison-pill, missing-request-id, non-negative amount guard,
     resolver error → Nak, derive-micro-OMR-from-OMR, DB-error → Nak.

C) HTTP handler POST /billing/metering/record — handlers/metering.go
   - Synchronous validate → INSERT credit_ledger → return
     {ledger_entry_id, balance_after_omr, balance_after_micro_omr,
     duplicate}. Same payload + idempotency guard as the NATS path.
   - Auth: superadmin OR sovereign-admin (operator-admin model;
     end-user LLM traffic flows through the sidecar, never this URL).
   - 8 unit tests covering happy-path, idempotency, role gating,
     malformed-JSON, positive-amount rejection, customer-not-found.

D) Schema — core/services/billing/store/store.go
   - ALTER TABLE credit_ledger ADD COLUMN amount_micro_omr BIGINT
     (1 OMR = 1,000,000 micro-OMR; -0.000234 OMR = -234 micro-OMR
     exact integer — preserves precision at metering rates).
   - ADD COLUMN external_ref TEXT + UNIQUE partial index for
     idempotency dedup.
   - ADD COLUMN metadata JSONB for the raw envelope.
   - GetCreditBalance projects both amount_omr (legacy) and
     amount_micro_omr (new) into the integer-OMR view.
   - GetCreditBalanceMicroOMR returns canonical precision.
   - RecordUsage method: ON CONFLICT DO UPDATE … RETURNING (xmax<>0)
     distinguishes fresh insert from duplicate without a follow-up
     SELECT.

E) Wiring
   - core/services/shared/events/nats.go — minimal NATS JetStream
     publisher + subscriber surface; legacy RedPanda producer/consumer
     in events.go untouched per [Q-mine-3].
   - core/services/billing/main.go — NATS_URL env; subscriber wired
     in parallel with the existing RedPanda tenant-events consumer.
   - middleware/jwt.go — exported test helper WithClaims so handler
     tests can construct an authenticated context without minting a
     real signed token.
   - .github/workflows/services-build.yaml — metering-sidecar added
     to the build matrix; deploy job skips it (image consumed by the
     bp-newapi chart, not products/catalyst sme-services).

F) bp-newapi chart (1.0.0 → 1.1.0)
   - meteringSidecar block in values.yaml: image, port, NATS URL,
     priceMicroOMRPerToken (default 156 = 0.000156 OMR/token), spool
     dir, header names, resources, securityContext (read-only-rootfs).
   - deployment.yaml renders the sidecar container + emptyDir spool
     volume when meteringSidecar.enabled (default true).
   - service.yaml routes the cluster-fronting :3000 to the sidecar
     when enabled, exposes a separate :3001 → NewAPI direct port for
     bp-catalyst-platform admin-API traffic (ADR-0003 §3.2).
   - networkpolicy.yaml allows the sidecar's port + nats-system
     egress for JetStream publish.

Tests: 33 new (14 sidecar + 11 subscriber + 8 HTTP handler), all green.
Helm template renders cleanly with sidecar enabled and disabled.

Closes #798

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(billing/store): cast SUM to BIGINT so lib/pq scans into int64 (#798)

Postgres returns `SUM(int) + SUM(bigint)/integer` as `numeric`, which
lib/pq presents as a `[]uint8` decimal string ("50.000000000000000000000000")
that does NOT scan directly into Go int64 — the integration test
TestVoucherLifecycle_IssueRedeemAndCreditApplied caught this in CI on
the post-redeem balance read.

Wrap the SUM expressions in CAST(... AS BIGINT) so the column type is
unambiguously bigint and Scan target stays uniform across pre-#798 rows
(amount_omr only) and post-#798 rows (amount_micro_omr present).

Affects:
  - GetCreditBalance
  - GetCreditBalanceMicroOMR
  - RecordUsage's running-balance read

Test mocks updated to match the new SQL prefix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: hatiyildiz <hatice.yildiz@openova.io>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 22:32:42 +04:00

181 lines · 6.4 KiB · Go

// Package main is the catalyst-metering-sidecar — a transparent
// reverse proxy in front of NewAPI that emits one
// catalyst.usage.recorded NATS envelope per completed LLM request.
//
// Why a sidecar instead of patching NewAPI source:
//
// NewAPI is consumed as the upstream image
// ghcr.io/openova-io/openova/newapi-mirror — a pinned mirror of the
// upstream Go binary, NOT a fork we own. Patching its source would
// fork the upstream and create a long-tail rebase debt for every
// subsequent upstream release. Per #795 [Q-mine-3] + ADR-0001 §6 the
// metering surface MUST be NATS JetStream, but the metering itself
// doesn't need to live inside NewAPI's process — observing the
// request/response pair at the network edge is sufficient because the
// OpenAI-compatible /v1/* response carries `usage.{prompt_tokens,
// completion_tokens, total_tokens}` for every successful call.
//
// Deployment shape: bp-newapi (#799) renders this sidecar as a second
// container in the NewAPI Pod. Customer traffic flows
// ingress :443 → sidecar :8086 → newapi :3000
// and the response body is observed on its way back. Failed requests
// (non-2xx, network errors) are NOT billed — only successfully-completed
// LLM calls produce a NATS envelope.
//
// At-least-once delivery: the sidecar publishes via JetStream synchronous
// Publish (broker-acked). On NATS unreachable >5s, the envelope is
// persisted to /var/lib/metering-sidecar/spool/<request_id>.json and
// retried in the background. The customer-facing LLM call is NEVER
// blocked on metering — billing is observability for the response, not
// in the critical path.
package main

import (
	"context"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/openova-io/openova/core/services/metering-sidecar/handlers"
	"github.com/openova-io/openova/core/services/metering-sidecar/proxy"
	"github.com/openova-io/openova/core/services/shared/events"
	"github.com/openova-io/openova/core/services/shared/health"
)

func main() {
	natsURL := getEnv("NATS_URL", "nats://nats-jetstream.nats-system.svc.cluster.local:4222")
	upstreamURL := getEnv("NEWAPI_UPSTREAM_URL", "http://localhost:3000")
	listenPort := getEnv("LISTEN_PORT", "8086")
	spoolDir := getEnv("SPOOL_DIR", "/var/lib/metering-sidecar/spool")
	priceMicroOMRPerToken := mustParseInt64(getEnv("PRICE_MICRO_OMR_PER_TOKEN", "156"))
	publishTimeout := mustParseDuration(getEnv("NATS_PUBLISH_TIMEOUT", "5s"))
	tenantIDFromHeader := getEnv("TENANT_ID_HEADER", "x-tenant-id")
	customerIDFromHeader := getEnv("CUSTOMER_ID_HEADER", "x-customer-id")

	upstream, err := url.Parse(upstreamURL)
	if err != nil {
		slog.Error("invalid NEWAPI_UPSTREAM_URL", "url", upstreamURL, "error", err)
		os.Exit(1)
	}
	if err := os.MkdirAll(spoolDir, 0o700); err != nil {
		slog.Error("failed to create spool directory", "dir", spoolDir, "error", err)
		os.Exit(1)
	}

	natsConn, err := events.ConnectNATS(natsURL)
	if err != nil {
		// Sidecar startup MUST NOT crash on NATS being unreachable —
		// the customer-facing LLM proxy must come up regardless. We
		// log loudly and run with a nil NATS connection; envelopes
		// spool to disk and a background retry drains them once NATS
		// recovers.
		slog.Warn("NATS unavailable at startup — metering will spool to disk",
			"url", natsURL, "error", err)
	} else {
		// Per ADR-0001 §6 the canonical consumer (sme-billing) owns
		// the Stream lifecycle. The sidecar does NOT call
		// EnsureUsageStream — if the Stream is missing, sme-billing
		// will create it on its next startup and the spool drain
		// will succeed afterward.
		defer natsConn.Close()
	}

	publisher := &proxy.MeteringPublisher{
		NATS:           natsConn,
		PublishTimeout: publishTimeout,
		SpoolDir:       spoolDir,
	}

	// Background spool drain: every 30 seconds, attempt to publish
	// any envelopes persisted during a prior NATS outage. Per
	// docs/INVIOLABLE-PRINCIPLES.md #1 (event-driven, never polling)
	// this is a localised retry loop, not a CronJob — the sidecar
	// owns its own spool and no external trigger is needed.
	drainCtx, cancelDrain := context.WithCancel(context.Background())
	defer cancelDrain()
	go publisher.DrainSpoolLoop(drainCtx, 30*time.Second)

	revProxy := &proxy.MeteringProxy{
		Upstream:              upstream,
		Publisher:             publisher,
		PriceMicroOMRPerToken: priceMicroOMRPerToken,
		TenantIDHeader:        strings.ToLower(tenantIDFromHeader),
		CustomerIDHeader:      strings.ToLower(customerIDFromHeader),
	}

	mux := http.NewServeMux()
	mux.HandleFunc("GET /healthz", health.Handler())
	mux.HandleFunc("GET /metrics", handlers.MetricsHandler(publisher))
	// Everything else proxies through to NewAPI.
	mux.Handle("/", revProxy)

	server := &http.Server{
		Addr:              ":" + listenPort,
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second,
		// Long client read/write timeouts — LLM requests can be slow.
		ReadTimeout:  300 * time.Second,
		WriteTimeout: 300 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	go func() {
		slog.Info("metering sidecar listening",
			"addr", server.Addr,
			"upstream", upstream.String(),
			"price_micro_omr_per_token", priceMicroOMRPerToken,
			"nats_url", natsURL)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			slog.Error("server error", "error", err)
			os.Exit(1)
		}
	}()

	// Graceful shutdown: SIGTERM from K8s drains the spool one last
	// time so envelopes generated during the final 30s of the pod's
	// life are not lost.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	slog.Info("shutdown signal received — draining spool one last time")
	cancelDrain()

	drainOnce, cancelOnce := context.WithTimeout(context.Background(), 10*time.Second)
	publisher.DrainSpoolOnce(drainOnce)
	cancelOnce()

	shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelShutdown()
	if err := server.Shutdown(shutdownCtx); err != nil {
		slog.Error("graceful shutdown failed", "error", err)
	}
}

func getEnv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func mustParseInt64(s string) int64 {
	v, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		slog.Error("invalid integer env value", "value", s, "error", err)
		os.Exit(1)
	}
	return v
}

func mustParseDuration(s string) time.Duration {
	d, err := time.ParseDuration(s)
	if err != nil {
		slog.Error("invalid duration env value", "value", s, "error", err)
		os.Exit(1)
	}
	return d
}