feat(pdm): pool-domain-manager service skeleton (Phase 1 of #163)

Build a new Go service core/pool-domain-manager that becomes the SOLE
authority for OpenOva-pool subdomain allocation across the fleet.

Why this exists: today products/catalyst/bootstrap/api/internal/handler/
subdomains.go does naive net.LookupHost() to decide whether a candidate
subdomain is taken. Dynadot's wildcard parking record at the apex of
omani.works (and any future pool domain) makes EVERY subdomain resolve
to 185.53.179.128, so the check rejects everything. DNS is the wrong
source of truth for an OpenOva-managed pool — the central control plane
must own the allocation table.

What this commit adds (no integration with catalyst-api yet — that lands
in a follow-up commit):

  core/pool-domain-manager/
    cmd/pdm/main.go                     chi router, healthz, sweeper boot
    api/openapi.yaml                     wire contract for every endpoint
    Containerfile                        alpine final stage, UID 65534
    internal/store/                      pgx + CNPG; pool_allocations table
      migrations.sql                       idempotent CREATE TABLE schema
      store.go                             Reserve/Get/Commit/Release/List
      store_test.go                        integration tests (PDM_TEST_DSN)
    internal/dynadot/                    moved + extended; SOLE Dynadot caller
      dynadot.go                           AddRecord, AddSovereignRecords,
                                           DeleteSubdomainRecords (read-modify-
                                           write to honour feedback_dynadot_dns)
      dynadot_test.go                      managed-domain resolution tests
    internal/reserved/                   centralised reserved-name list
      reserved.go                          IsReserved/All; pulled out of
                                           catalyst-api's subdomains.go
    internal/handler/                    HTTP surface
      handler.go                           /api/v1/pool/{domain}/{check,reserve,
                                           commit,release,list}, /healthz,
                                           /api/v1/reserved
    internal/allocator/                  state machine + sweeper goroutine

Architecture choices and how they map to docs/INVIOLABLE-PRINCIPLES.md:

  - Principle #4 (never hardcode): every value (PORT, PDM_DATABASE_URL,
    DYNADOT_MANAGED_DOMAINS, PDM_RESERVATION_TTL, PDM_SWEEPER_INTERVAL)
    flows from env vars; the K8s ExternalSecret will populate them at
    deploy time. The reserved-subdomain list lives in ONE place
    (internal/reserved); catalyst-api will not duplicate it.

  - Principle #2 (no quality compromise): the state machine commits the
    DB row before the Dynadot side-effect, so a crash between the two
    leaves the system in a recoverable state (operator runs Release).
    The reservation_token in the row protects against stale-tab commit
    races. UPSERT semantics + a CHECK constraint mean two operators
    racing /reserve get a clean 23505 (unique_violation) → HTTP 409.

  - Principle #3 (follow architecture): PDM is a ClusterIP service in
    openova-system — it is not a Crossplane provider, not a Flux
    HelmRelease, not bespoke OpenTofu state. catalyst-api speaks to it
    via plain HTTP. The Crossplane Composition that wraps PDM as a
    declarative MR (XDynadotPoolAllocation) lands in a follow-up phase.

The DNS-wildcard problem the issue describes is fixed STRUCTURALLY here:
PDM never calls net.LookupHost. The /check path is a single SELECT
against pool_allocations. omani.works's wildcard A record at the apex
becomes architecturally irrelevant.

Tests exercised in this commit:
  - internal/reserved: full unit coverage (case-insensitive, sorted, set
    membership)
  - internal/dynadot: managed-domain runtime resolution (env-var,
    legacy single-domain fallback, built-in defaults, list parsing)
  - internal/store: integration suite gated on PDM_TEST_DSN env var,
    covers reserve happy-path, reserve race (ErrConflict), TTL expiry
    frees, commit happy-path, commit token mismatch, release removes
    row, sweeper deletes expired rows

Closes phase 1 of #163. Phase 2 (catalyst-api wiring), Phase 3 (CI +
manifests), Phase 4 (Crossplane composition), Phase 6 (deploy +
verification curl) follow in separate commits.

Refs: #163
This commit is contained in:
hatiyildiz 2026-04-29 06:37:34 +02:00 committed by Emrah Baysal
parent 296fd68819
commit 585b046f5d
14 changed files with 2530 additions and 0 deletions

View File

@ -0,0 +1,38 @@
# pool-domain-manager — central authority for OpenOva-pool subdomain
# allocation. Per docs/INVIOLABLE-PRINCIPLES.md the image is statically
# compiled, runs as a non-root numeric UID, and ships nothing beyond the
# binary + CA bundle.
#
# Two stages:
# build — golang:1.23-alpine with go modules cached
# final — alpine:3.20 minimal runtime (CA certs + the binary)
FROM docker.io/library/golang:1.23-alpine AS build
WORKDIR /app
# Cache layer for go.mod / go.sum so day-to-day source rebuilds skip the
# module download.
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build \
-ldflags="-s -w -X main.version=$(cat /etc/hostname)" \
-o /pdm ./cmd/pdm
# Use a minimal runtime stage. We need:
# - ca-certificates so the Dynadot HTTPS calls can verify the API cert
# - tzdata so timestamps render correctly in operator logs
# Nothing else.
FROM docker.io/library/alpine:3.20
RUN apk add --no-cache ca-certificates tzdata
COPY --from=build /pdm /pdm
# Alpine 3.20 already ships UID 65534 as `nobody`. Reuse that rather than
# creating a duplicate `nonroot` account. The numeric form satisfies
# runAsNonRoot=true + runAsUser=65534 in the Deployment.
USER 65534:65534
EXPOSE 8080
ENTRYPOINT ["/pdm"]

View File

@ -0,0 +1,190 @@
openapi: 3.0.3
info:
title: pool-domain-manager
version: 1.0.0
description: |
Central authority for OpenOva-pool subdomain allocation. Closes #163.
The PDM is the SOLE source of truth for which (poolDomain, subdomain)
pairs have been reserved or activated across the OpenOva fleet, and the
SOLE service in the fleet that calls api.dynadot.com.
State machine per (domain, subdomain) pair:
NULL ─reserve→ RESERVED ─commit→ ACTIVE
│ │
expire/ release/
release destroy
↓ ↓
NULL NULL
servers:
- url: http://pool-domain-manager.openova-system.svc.cluster.local:8080
description: In-cluster catalyst-api → PDM call path
- url: https://pool.openova.io
description: Operator-facing endpoint (auth-gated)
paths:
/healthz:
get:
summary: Liveness probe
responses:
'200':
description: PDM is healthy and CNPG is reachable
content:
application/json:
schema:
type: object
properties:
status: { type: string, example: ok }
managedDomains:
type: array
items: { type: string }
'503':
description: PDM is up but CNPG is unreachable
/api/v1/reserved:
get:
summary: Canonical reserved-subdomain list
responses:
'200':
description: List of reserved subdomain labels
content:
application/json:
schema:
type: object
properties:
reserved:
type: array
items: { type: string }
/api/v1/pool/{domain}/check:
get:
summary: Fast availability read
parameters:
- in: path
name: domain
required: true
schema: { type: string }
- in: query
name: sub
required: true
schema: { type: string }
responses:
'200':
description: Always 200 — clients use body.available, not status
content:
application/json:
schema:
$ref: '#/components/schemas/CheckResult'
/api/v1/pool/{domain}/reserve:
post:
summary: Atomic reserve with TTL
parameters:
- in: path
name: domain
required: true
schema: { type: string }
requestBody:
required: true
content:
application/json:
schema:
type: object
required: [subdomain]
properties:
subdomain: { type: string }
createdBy: { type: string }
responses:
'201':
description: Reservation created
content:
application/json:
schema:
$ref: '#/components/schemas/ReserveResponse'
'409':
description: Subdomain already taken
'422':
description: Invalid input (format / unsupported pool)
/api/v1/pool/{domain}/commit:
post:
summary: Promote reservation → active and write Dynadot records
parameters:
- in: path
name: domain
required: true
schema: { type: string }
requestBody:
required: true
content:
application/json:
schema:
type: object
required: [subdomain, reservationToken, sovereignFQDN, loadBalancerIP]
properties:
subdomain: { type: string }
reservationToken: { type: string, format: uuid }
sovereignFQDN: { type: string }
loadBalancerIP: { type: string, format: ipv4 }
responses:
'200': { description: Committed }
'202': { description: Committed in DB; Dynadot retry needed }
'403': { description: Reservation token mismatch }
'404': { description: No reservation exists }
'409': { description: Already active }
'410': { description: Reservation TTL expired }
/api/v1/pool/{domain}/release:
delete:
summary: Free a (pool, subdomain) and remove Dynadot records
parameters:
- in: path
name: domain
required: true
schema: { type: string }
requestBody:
content:
application/json:
schema:
type: object
required: [subdomain]
properties:
subdomain: { type: string }
responses:
'200': { description: Released }
'202': { description: Row deleted; Dynadot delete partial }
'404': { description: No allocation exists }
/api/v1/pool/{domain}/list:
get:
summary: Operator-facing list of allocations
parameters:
- in: path
name: domain
required: true
schema: { type: string }
responses:
'200':
description: All allocations for the pool
components:
schemas:
CheckResult:
type: object
properties:
available: { type: boolean }
reason: { type: string }
detail: { type: string }
fqdn: { type: string }
ReserveResponse:
type: object
properties:
poolDomain: { type: string }
subdomain: { type: string }
state: { type: string, enum: [reserved] }
reservedAt: { type: string, format: date-time }
expiresAt: { type: string, format: date-time }
reservationToken: { type: string, format: uuid }
createdBy: { type: string }

View File

@ -0,0 +1,180 @@
// Command pdm — pool-domain-manager service entrypoint.
//
// Wires CNPG/Postgres (store), the Dynadot client, and the chi-based HTTP
// router. Starts the TTL-expiry sweeper as a goroutine. Handles SIGTERM by
// closing the listener gracefully so K8s rolling deploys finish in-flight
// requests before the pod terminates.
//
// All configuration is read from environment variables — per
// docs/INVIOLABLE-PRINCIPLES.md #4 nothing here is hardcoded:
//
// PORT — listen port (default 8080)
// PDM_DATABASE_URL — postgres DSN, REQUIRED
// DYNADOT_API_KEY — dynadot api key, REQUIRED
// DYNADOT_API_SECRET — dynadot api secret, REQUIRED
// DYNADOT_MANAGED_DOMAINS — comma-separated managed pool list
// DYNADOT_DOMAIN — legacy single-domain fallback
// PDM_RESERVATION_TTL — go duration string, default "10m"
// PDM_SWEEPER_INTERVAL — go duration string, default "30s"
// PDM_LOG_LEVEL — debug | info | warn | error (default info)
package main
import (
"context"
"errors"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/openova-io/openova/core/pool-domain-manager/internal/allocator"
"github.com/openova-io/openova/core/pool-domain-manager/internal/dynadot"
"github.com/openova-io/openova/core/pool-domain-manager/internal/handler"
"github.com/openova-io/openova/core/pool-domain-manager/internal/store"
)
func main() {
log := newLogger(env("PDM_LOG_LEVEL", "info"))
slog.SetDefault(log)
cfg, err := loadConfig()
if err != nil {
log.Error("config load failed", "err", err)
os.Exit(2)
}
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
startCtx, startCancel := context.WithTimeout(ctx, 30*time.Second)
defer startCancel()
s, err := store.New(startCtx, cfg.DatabaseURL)
if err != nil {
log.Error("postgres connect failed", "err", err)
os.Exit(1)
}
defer s.Close()
dyn := dynadot.New(cfg.DynadotAPIKey, cfg.DynadotAPISecret)
alloc := allocator.New(s, dyn, log, cfg.ReservationTTL)
go alloc.RunSweeper(ctx, cfg.SweeperInterval)
h := handler.New(alloc, s, log)
root := chi.NewRouter()
root.Use(middleware.RequestID)
root.Use(middleware.RealIP)
root.Use(middleware.Logger)
root.Use(middleware.Recoverer)
root.Mount("/", h.Routes())
srv := &http.Server{
Addr: ":" + cfg.Port,
Handler: root,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 2 * time.Minute,
}
// Surface the managed-domain list at startup so operators can grep logs
// for misconfiguration (e.g. typo in the secret's `domains` key).
log.Info("pool-domain-manager starting",
"port", cfg.Port,
"reservationTTL", cfg.ReservationTTL.String(),
"sweeperInterval", cfg.SweeperInterval.String(),
"managedDomains", dynadot.ManagedDomains(),
)
go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error("http server failed", "err", err)
os.Exit(1)
}
}()
<-ctx.Done()
log.Info("shutdown signal received, draining")
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 20*time.Second)
defer shutdownCancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Error("graceful shutdown failed", "err", err)
os.Exit(1)
}
log.Info("shutdown complete")
}
// config bundles the runtime configuration so loadConfig can return a single
// struct + error.
type config struct {
Port string
DatabaseURL string
DynadotAPIKey string
DynadotAPISecret string
ReservationTTL time.Duration
SweeperInterval time.Duration
}
func loadConfig() (*config, error) {
c := &config{
Port: env("PORT", "8080"),
}
c.DatabaseURL = strings.TrimSpace(os.Getenv("PDM_DATABASE_URL"))
if c.DatabaseURL == "" {
return nil, errors.New("PDM_DATABASE_URL is required")
}
c.DynadotAPIKey = strings.TrimSpace(os.Getenv("DYNADOT_API_KEY"))
if c.DynadotAPIKey == "" {
return nil, errors.New("DYNADOT_API_KEY is required")
}
c.DynadotAPISecret = strings.TrimSpace(os.Getenv("DYNADOT_API_SECRET"))
if c.DynadotAPISecret == "" {
return nil, errors.New("DYNADOT_API_SECRET is required")
}
ttlStr := env("PDM_RESERVATION_TTL", "10m")
ttl, err := time.ParseDuration(ttlStr)
if err != nil {
return nil, errors.New("PDM_RESERVATION_TTL is not a valid duration: " + err.Error())
}
c.ReservationTTL = ttl
swStr := env("PDM_SWEEPER_INTERVAL", "30s")
sw, err := time.ParseDuration(swStr)
if err != nil {
return nil, errors.New("PDM_SWEEPER_INTERVAL is not a valid duration: " + err.Error())
}
c.SweeperInterval = sw
return c, nil
}
func env(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
func newLogger(level string) *slog.Logger {
var lvl slog.Level
switch strings.ToLower(level) {
case "debug":
lvl = slog.LevelDebug
case "warn":
lvl = slog.LevelWarn
case "error":
lvl = slog.LevelError
default:
lvl = slog.LevelInfo
}
return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: lvl}))
}

View File

@ -0,0 +1,18 @@
module github.com/openova-io/openova/core/pool-domain-manager
go 1.23
require (
github.com/go-chi/chi/v5 v5.2.1
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.7.2
)
require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/text v0.21.0 // indirect
)

View File

@ -0,0 +1,32 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8=
github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,297 @@
// Package allocator wires the persistence layer (store) to the DNS writer
// (dynadot) and exposes the four lifecycle operations PDM's HTTP handlers
// need: Check, Reserve, Commit, Release.
//
// The state machine the allocator implements is:
//
// NULL ─reserve→ RESERVED ─commit→ ACTIVE
// │ │
// expire/ release/
// release destroy
// ↓ ↓
// NULL NULL
//
// Per docs/INVIOLABLE-PRINCIPLES.md #2 every transition is committed to the
// CNPG row before the corresponding side-effect (Dynadot write/delete) is
// invoked, so a crash between the two leaves the system in a recoverable
// state: at worst we have a row claiming state='active' with stale or
// missing DNS records, which an operator can reconcile by calling Release
// then re-running the wizard.
package allocator
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/openova-io/openova/core/pool-domain-manager/internal/dynadot"
"github.com/openova-io/openova/core/pool-domain-manager/internal/reserved"
"github.com/openova-io/openova/core/pool-domain-manager/internal/store"
)
// Allocator owns the state-machine logic. It is concurrency-safe — every
// operation maps to a single Postgres transaction in store; there is no
// in-memory mutable state on the Allocator itself.
type Allocator struct {
store *store.Store
dynadot *dynadot.Client
log *slog.Logger
// reservationTTL — how long a /reserve holds the name before the
// sweeper reclaims it. Per the issue body this is 10 minutes.
reservationTTL time.Duration
}
// New constructs an Allocator. ttl is the reservation TTL; pass
// 10*time.Minute for production.
func New(s *store.Store, d *dynadot.Client, log *slog.Logger, ttl time.Duration) *Allocator {
return &Allocator{
store: s,
dynadot: d,
log: log,
reservationTTL: ttl,
}
}
// CheckResult is the wire shape for /api/v1/pool/{domain}/check?sub=X.
type CheckResult struct {
Available bool `json:"available"`
Reason string `json:"reason,omitempty"`
Detail string `json:"detail,omitempty"`
FQDN string `json:"fqdn,omitempty"`
}
// Check returns whether the (poolDomain, subdomain) pair is free, with a
// machine-readable reason when it is not. Failure modes are:
//
// "unsupported-pool" — poolDomain is not in DYNADOT_MANAGED_DOMAINS
// "reserved" — subdomain is in the reserved-name list
// "reserved-state" — somebody has reserved (TTL not expired) this name
// "active-state" — somebody has committed this name as a live Sovereign
//
// Note: NO net.LookupHost is invoked anywhere in this code path. PDM is the
// authoritative allocation source — DNS-wildcard parking records can never
// cause a false positive here. (This is the entire point of the service.)
func (a *Allocator) Check(ctx context.Context, poolDomain, subdomain string) (*CheckResult, error) {
if !dynadot.IsManagedDomain(poolDomain) {
return &CheckResult{
Available: false,
Reason: "unsupported-pool",
Detail: fmt.Sprintf("pool domain %s is not managed by OpenOva — pick a different pool or use BYO", poolDomain),
}, nil
}
if reserved.IsReserved(subdomain) {
return &CheckResult{
Available: false,
Reason: "reserved",
Detail: "this subdomain is reserved for the Sovereign control plane — pick a different name",
}, nil
}
available, err := a.store.IsAvailable(ctx, poolDomain, subdomain)
if err != nil {
return nil, fmt.Errorf("check availability: %w", err)
}
fqdn := subdomain + "." + poolDomain
if available {
return &CheckResult{Available: true, FQDN: fqdn}, nil
}
// Disambiguate the unavailable reason for the wizard's UX.
row, err := a.store.Get(ctx, poolDomain, subdomain)
if err != nil {
return nil, fmt.Errorf("read existing allocation: %w", err)
}
switch row.State {
case store.StateReserved:
return &CheckResult{
Available: false,
Reason: "reserved-state",
Detail: "this subdomain has been reserved by another deployment in progress — try again in a few minutes",
FQDN: fqdn,
}, nil
case store.StateActive:
return &CheckResult{
Available: false,
Reason: "active-state",
Detail: "this subdomain is already taken by a live Sovereign — pick a different name",
FQDN: fqdn,
}, nil
default:
return &CheckResult{
Available: false,
Reason: "unknown-state",
Detail: "allocation exists in an unrecognised state — contact platform operators",
FQDN: fqdn,
}, nil
}
}
// ReserveInput carries the optional createdBy attribution. Defaults to
// "catalyst-api" when empty.
type ReserveInput struct {
CreatedBy string
}
// Reserve transitions NULL → RESERVED for the (poolDomain, subdomain) pair,
// holding the name for the configured TTL. Returns the Allocation, including
// the reservation token the caller MUST pass back to Commit.
//
// Errors:
//
// store.ErrConflict — the row exists in any non-expired state
// dynadot.ErrUnmanagedDomain — pool domain is not managed
func (a *Allocator) Reserve(ctx context.Context, poolDomain, subdomain string, in ReserveInput) (*store.Allocation, error) {
if !dynadot.IsManagedDomain(poolDomain) {
return nil, dynadot.ErrUnmanagedDomain
}
if reserved.IsReserved(subdomain) {
return nil, fmt.Errorf("subdomain %q is reserved", subdomain)
}
createdBy := in.CreatedBy
if createdBy == "" {
createdBy = "catalyst-api"
}
alloc, err := a.store.Reserve(ctx, poolDomain, subdomain, a.reservationTTL, createdBy)
if err != nil {
return nil, err
}
a.log.Info("pool-domain reserved",
"poolDomain", poolDomain,
"subdomain", subdomain,
"ttl", a.reservationTTL.String(),
"createdBy", createdBy,
"expiresAt", alloc.ExpiresAt.Format(time.RFC3339),
)
return alloc, nil
}
// CommitInput carries the data /commit needs to flip RESERVED → ACTIVE.
type CommitInput struct {
ReservationToken string
SovereignFQDN string
LoadBalancerIP string
}
// Commit flips a reservation to ACTIVE and writes the Dynadot DNS records
// (wildcard + canonical control-plane prefixes) for the new Sovereign.
//
// Order of operations is deliberate:
// 1. Verify the row exists and the reservation token matches (in a single
// row-locked Postgres transaction so a concurrent Release can't race).
// 2. Update the row to state='active' (still in the same transaction).
// 3. Commit the transaction.
// 4. Write the Dynadot records. If this fails we LEAVE the row in
// state='active' and surface the error to the caller — the operator
// decides whether to Release (which will fix DNS) or retry.
//
// Per the auto-memory `feedback_dynadot_dns.md`: Dynadot writes are
// idempotent with add_dns_to_current_setting=yes, so step 4 is safe to
// retry from outside (the wizard's retry button calls Commit again with
// the same token and the same LB IP).
func (a *Allocator) Commit(ctx context.Context, poolDomain, subdomain string, in CommitInput) (*store.Allocation, error) {
if !dynadot.IsManagedDomain(poolDomain) {
return nil, dynadot.ErrUnmanagedDomain
}
alloc, err := a.store.Commit(ctx, poolDomain, subdomain, store.CommitInput{
ReservationToken: in.ReservationToken,
SovereignFQDN: in.SovereignFQDN,
LoadBalancerIP: in.LoadBalancerIP,
})
if err != nil {
return nil, err
}
if err := a.dynadot.AddSovereignRecords(ctx, poolDomain, subdomain, in.LoadBalancerIP); err != nil {
// Row is already state='active'; do not roll it back. The caller can
// Release if they want a clean slate. Surface the DNS error so the
// wizard's UX shows the partial-failure path.
a.log.Error("dynadot write after commit failed",
"poolDomain", poolDomain,
"subdomain", subdomain,
"loadBalancerIP", in.LoadBalancerIP,
"err", err,
)
return alloc, fmt.Errorf("dynadot write: %w", err)
}
a.log.Info("pool-domain committed",
"poolDomain", poolDomain,
"subdomain", subdomain,
"sovereignFQDN", in.SovereignFQDN,
"loadBalancerIP", in.LoadBalancerIP,
)
return alloc, nil
}
// Release deletes the row regardless of state, then (if the row was active)
// removes the Dynadot DNS records. Reserved rows have no DNS side-effect to
// clean up.
//
// Returns the freed Allocation (so the caller can log what was removed) or
// store.ErrNotFound when there was nothing to release.
func (a *Allocator) Release(ctx context.Context, poolDomain, subdomain string) (*store.Allocation, error) {
if !dynadot.IsManagedDomain(poolDomain) {
return nil, dynadot.ErrUnmanagedDomain
}
alloc, err := a.store.Release(ctx, poolDomain, subdomain)
if err != nil {
return nil, err
}
if alloc.State == store.StateActive {
if dnsErr := a.dynadot.DeleteSubdomainRecords(ctx, poolDomain, subdomain); dnsErr != nil {
a.log.Error("dynadot delete after release failed",
"poolDomain", poolDomain,
"subdomain", subdomain,
"err", dnsErr,
)
return alloc, fmt.Errorf("dynadot delete: %w", dnsErr)
}
}
a.log.Info("pool-domain released",
"poolDomain", poolDomain,
"subdomain", subdomain,
"previousState", string(alloc.State),
)
return alloc, nil
}
// List returns every allocation under the given pool domain.
func (a *Allocator) List(ctx context.Context, poolDomain string) ([]store.Allocation, error) {
if !dynadot.IsManagedDomain(poolDomain) {
return nil, dynadot.ErrUnmanagedDomain
}
return a.store.List(ctx, poolDomain)
}
// RunSweeper starts a background loop that periodically deletes expired
// reservations. Cancel the parent context to stop the sweeper. Should run as
// a goroutine off cmd/pdm/main.
func (a *Allocator) RunSweeper(ctx context.Context, interval time.Duration) {
if interval <= 0 {
interval = 30 * time.Second
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
a.log.Info("sweeper shutdown")
return
case <-ticker.C:
deleted, err := a.store.ExpireReservations(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
a.log.Error("sweeper expire failed", "err", err)
continue
}
if deleted > 0 {
a.log.Info("sweeper expired reservations", "count", deleted)
}
}
}
}

View File

@ -0,0 +1,466 @@
// Package dynadot — DNS API client for OpenOva-managed pool domains.
//
// This package is the SOLE caller of api.dynadot.com in the OpenOva fleet.
// catalyst-api, the wizard, and every other product talk to DNS through
// pool-domain-manager — they never import this package directly. Centralising
// the writer means the auto-memory invariant `feedback_dynadot_dns.md`
// (NEVER run exploratory set_dns2 — each call wipes all records) is enforced
// architecturally: there's one writer, one commit path.
//
// Design choices baked in:
//
// - Every write uses add_dns_to_current_setting=yes so it appends rather
// than replaces. The Dynadot API treats set_dns2 as "REPLACE the entire
// zone" by default — the auto-memory documents an incident where this
// wiped MX records.
//
// - The managed-domain list comes from runtime configuration
// (DYNADOT_MANAGED_DOMAINS env var) per docs/INVIOLABLE-PRINCIPLES.md #4.
// Adding a fourth pool domain is purely a secret update — no rebuild.
//
// - Reads (set_dns2 has no list-records counterpart) are done via the
// get_dns command, which returns the current zone we then filter by
// subdomain prefix when DeleteSubdomainRecords needs to clean up.
package dynadot
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
)
// Client wraps the Dynadot REST API. Construct once and reuse.
type Client struct {
APIKey string
APISecret string
HTTP *http.Client
}
// New returns a Dynadot client. Credentials come from PDM's K8s secret
// `dynadot-api-credentials`; passing them in keeps this package free of
// direct env-var reads (the cmd/pdm main wires it together).
func New(apiKey, apiSecret string) *Client {
return &Client{
APIKey: apiKey,
APISecret: apiSecret,
HTTP: &http.Client{Timeout: 30 * time.Second},
}
}
// Record is a single DNS record we want Dynadot to publish.
type Record struct {
// Subdomain — leave empty (or "@") for apex. e.g. "console", "*",
// "*.omantel". Multi-label subdomains ARE supported; Dynadot's set_dns2
// allows arbitrary labels in the subdomain column.
Subdomain string
// Type — A, AAAA, CNAME, TXT, MX, etc.
Type string
// Value — depends on Type. For A: IPv4 string; for CNAME: target FQDN.
Value string
// TTL — seconds. Dynadot supports 60, 300, 1800, 3600, 7200, 14400,
// 28800, 43200, 86400. Defaults to 300 if zero.
TTL int
}
// AddRecord appends a single record to the domain's existing DNS configuration
// using add_dns_to_current_setting=yes. Idempotent across re-runs as long as
// the (subdomain, type, value) tuple is identical (Dynadot dedupes).
func (c *Client) AddRecord(ctx context.Context, domain string, rec Record) error {
if rec.TTL == 0 {
rec.TTL = 300
}
params := url.Values{}
params.Set("key", c.APIKey)
params.Set("secret", c.APISecret)
params.Set("command", "set_dns2")
params.Set("domain", domain)
params.Set("add_dns_to_current_setting", "yes")
if rec.Subdomain == "" || rec.Subdomain == "@" {
params.Set("main_record_type0", rec.Type)
params.Set("main_record0", rec.Value)
params.Set("main_recordx0", fmt.Sprintf("%d", rec.TTL))
} else {
params.Set("subdomain0", rec.Subdomain)
params.Set("sub_record_type0", rec.Type)
params.Set("sub_record0", rec.Value)
params.Set("sub_recordx0", fmt.Sprintf("%d", rec.TTL))
}
endpoint := "https://api.dynadot.com/api3.json?" + params.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return fmt.Errorf("build dynadot request: %w", err)
}
resp, err := c.HTTP.Do(req)
if err != nil {
return fmt.Errorf("dynadot api: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("dynadot api status %d: %s", resp.StatusCode, truncate(string(body), 256))
}
var result struct {
SetDNS2Response struct {
ResponseHeader struct {
ResponseCode string `json:"ResponseCode"`
Status string `json:"Status"`
Error string `json:"Error"`
} `json:"ResponseHeader"`
} `json:"SetDns2Response"`
}
if err := json.Unmarshal(body, &result); err != nil {
return fmt.Errorf("parse dynadot response: %w (body=%s)", err, truncate(string(body), 256))
}
hdr := result.SetDNS2Response.ResponseHeader
if !strings.EqualFold(hdr.Status, "success") && !strings.EqualFold(hdr.ResponseCode, "0") {
return fmt.Errorf("dynadot api error: code=%s status=%s err=%s", hdr.ResponseCode, hdr.Status, hdr.Error)
}
return nil
}
// AddSovereignRecords writes the canonical record set for a new Sovereign
// subdomain: wildcard + canonical control-plane prefixes (console, gitea,
// harbor, admin, api). All records point at the load balancer IP.
//
// Idempotent: re-running with the same (domain, subdomain, ip) is safe.
// Re-running with a different IP appends extra records (Dynadot append
// semantics) so the caller is responsible for calling DeleteSubdomainRecords
// first when re-pointing.
func (c *Client) AddSovereignRecords(ctx context.Context, domain, subdomain, lbIP string) error {
prefixes := []string{
"", // wildcard apex of the subdomain — *.omantel.omani.works
"console", // console.omantel.omani.works
"gitea", // gitea.omantel.omani.works
"harbor", // harbor.omantel.omani.works
"admin", // admin.omantel.omani.works
"api", // api.omantel.omani.works
}
for _, p := range prefixes {
var sub string
if p == "" {
sub = "*." + subdomain
} else {
sub = p + "." + subdomain
}
if err := c.AddRecord(ctx, domain, Record{
Subdomain: sub,
Type: "A",
Value: lbIP,
TTL: 300,
}); err != nil {
return fmt.Errorf("add %s record: %w", sub, err)
}
}
return nil
}
// DeleteSubdomainRecords removes every record under "*.<subdomain>",
// "<prefix>.<subdomain>" for the canonical Sovereign prefixes, by
// re-writing the zone WITHOUT those rows. Dynadot's API has no per-record
// delete; the path is "fetch zone, omit the rows we want gone, write it
// back". We use add_dns_to_current_setting=no for this path because the
// goal IS to replace the zone — but we replace it with a copy that lacks
// the targeted rows AND preserves every other row exactly.
//
// To avoid the auto-memory incident (set_dns2 wiping MX/TXT records), the
// implementation reads the full zone first via get_dns, mutates the in-
// memory representation, and writes back the COMPLETE zone minus the
// targeted rows. The result is a no-op for unrelated records.
//
// Returns nil even when no matching records existed — DeleteSubdomain is
// idempotent.
func (c *Client) DeleteSubdomainRecords(ctx context.Context, domain, subdomain string) error {
zone, err := c.getZone(ctx, domain)
if err != nil {
return fmt.Errorf("read zone: %w", err)
}
// Targets to remove: the wildcard + each canonical prefix.
targets := map[string]struct{}{
"*." + subdomain: {},
"console." + subdomain: {},
"gitea." + subdomain: {},
"harbor." + subdomain: {},
"admin." + subdomain: {},
"api." + subdomain: {},
}
keep := zone.SubRecords[:0]
for _, sr := range zone.SubRecords {
if _, drop := targets[sr.Subdomain]; drop {
continue
}
keep = append(keep, sr)
}
zone.SubRecords = keep
return c.writeZone(ctx, domain, zone)
}
// zoneSnapshot is the in-memory representation of the records returned by
// Dynadot's get_dns command, plus the apex (main) record set.
type zoneSnapshot struct {
MainRecords []mainRecord
SubRecords []subRecord
TTL int
}
type mainRecord struct {
Type string
Value string
}
type subRecord struct {
Subdomain string
Type string
Value string
}
// getZone reads the current zone via get_dns. Dynadot's response shape is
// nested under GetDnsResponse.Content.MxRecords + .NameServerSettings; we
// only care about the main + sub record arrays for the delete path.
func (c *Client) getZone(ctx context.Context, domain string) (*zoneSnapshot, error) {
params := url.Values{}
params.Set("key", c.APIKey)
params.Set("secret", c.APISecret)
params.Set("command", "get_dns")
params.Set("domain", domain)
endpoint := "https://api.dynadot.com/api3.json?" + params.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return nil, err
}
resp, err := c.HTTP.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("dynadot get_dns status %d: %s", resp.StatusCode, truncate(string(body), 256))
}
var raw struct {
GetDNSResponse struct {
ResponseHeader struct {
ResponseCode string `json:"ResponseCode"`
Status string `json:"Status"`
Error string `json:"Error"`
} `json:"ResponseHeader"`
Content struct {
NameServerSettings struct {
MainDomains []struct {
RecordType string `json:"record_type"`
Value string `json:"value"`
} `json:"MainDomains"`
SubDomains []struct {
Subhost string `json:"Subhost"`
RecordType string `json:"RecordType"`
Value string `json:"Value"`
} `json:"SubDomains"`
TTL int `json:"TTL"`
} `json:"NameServerSettings"`
} `json:"Content"`
} `json:"GetDnsResponse"`
}
if err := json.Unmarshal(body, &raw); err != nil {
return nil, fmt.Errorf("parse get_dns: %w (body=%s)", err, truncate(string(body), 256))
}
hdr := raw.GetDNSResponse.ResponseHeader
if !strings.EqualFold(hdr.Status, "success") && !strings.EqualFold(hdr.ResponseCode, "0") {
return nil, fmt.Errorf("dynadot get_dns: code=%s status=%s err=%s", hdr.ResponseCode, hdr.Status, hdr.Error)
}
out := &zoneSnapshot{TTL: raw.GetDNSResponse.Content.NameServerSettings.TTL}
for _, m := range raw.GetDNSResponse.Content.NameServerSettings.MainDomains {
out.MainRecords = append(out.MainRecords, mainRecord{Type: m.RecordType, Value: m.Value})
}
for _, s := range raw.GetDNSResponse.Content.NameServerSettings.SubDomains {
out.SubRecords = append(out.SubRecords, subRecord{Subdomain: s.Subhost, Type: s.RecordType, Value: s.Value})
}
return out, nil
}
// writeZone calls set_dns2 with add_dns_to_current_setting=NO and the full
// zone serialised. This is the dangerous code path the auto-memory warns
// about — we use it only when the caller has read the zone first via
// getZone and wants to write back a deliberate mutation.
func (c *Client) writeZone(ctx context.Context, domain string, zone *zoneSnapshot) error {
params := url.Values{}
params.Set("key", c.APIKey)
params.Set("secret", c.APISecret)
params.Set("command", "set_dns2")
params.Set("domain", domain)
// NOTE: deliberately NOT setting add_dns_to_current_setting — we want
// replace semantics here. The zone we serialise contains every row
// that was present minus the targeted deletions.
ttl := zone.TTL
if ttl == 0 {
ttl = 300
}
for i, m := range zone.MainRecords {
params.Set(fmt.Sprintf("main_record_type%d", i), m.Type)
params.Set(fmt.Sprintf("main_record%d", i), m.Value)
params.Set(fmt.Sprintf("main_recordx%d", i), fmt.Sprintf("%d", ttl))
}
for i, s := range zone.SubRecords {
params.Set(fmt.Sprintf("subdomain%d", i), s.Subdomain)
params.Set(fmt.Sprintf("sub_record_type%d", i), s.Type)
params.Set(fmt.Sprintf("sub_record%d", i), s.Value)
params.Set(fmt.Sprintf("sub_recordx%d", i), fmt.Sprintf("%d", ttl))
}
endpoint := "https://api.dynadot.com/api3.json?" + params.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return err
}
resp, err := c.HTTP.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("dynadot set_dns2 status %d: %s", resp.StatusCode, truncate(string(body), 256))
}
var result struct {
SetDNS2Response struct {
ResponseHeader struct {
ResponseCode string `json:"ResponseCode"`
Status string `json:"Status"`
Error string `json:"Error"`
} `json:"ResponseHeader"`
} `json:"SetDns2Response"`
}
if err := json.Unmarshal(body, &result); err != nil {
return fmt.Errorf("parse set_dns2: %w (body=%s)", err, truncate(string(body), 256))
}
hdr := result.SetDNS2Response.ResponseHeader
if !strings.EqualFold(hdr.Status, "success") && !strings.EqualFold(hdr.ResponseCode, "0") {
return fmt.Errorf("dynadot set_dns2 error: code=%s status=%s err=%s", hdr.ResponseCode, hdr.Status, hdr.Error)
}
return nil
}
// managedDomainsState mirrors the catalyst-api dynadot package's runtime
// resolution: env-var first, then legacy single-domain fallback, then a
// minimal built-in default (kept ONLY so unit tests work without an env).
var managedDomainsState struct {
once sync.Once
set map[string]struct{}
}
func resolveManagedDomains() map[string]struct{} {
managedDomainsState.once.Do(func() {
managedDomainsState.set = computeManagedDomains()
})
return managedDomainsState.set
}
func computeManagedDomains() map[string]struct{} {
out := make(map[string]struct{})
if raw := os.Getenv("DYNADOT_MANAGED_DOMAINS"); strings.TrimSpace(raw) != "" {
for _, tok := range splitDomainsList(raw) {
out[tok] = struct{}{}
}
if len(out) > 0 {
return out
}
}
if d := strings.ToLower(strings.TrimSpace(os.Getenv("DYNADOT_DOMAIN"))); d != "" {
out[d] = struct{}{}
return out
}
out["openova.io"] = struct{}{}
out["omani.works"] = struct{}{}
return out
}
// ResetManagedDomains clears the cache so tests can re-evaluate after
// mutating env vars.
func ResetManagedDomains() {
managedDomainsState.once = sync.Once{}
managedDomainsState.set = nil
}
// ManagedDomains returns a sorted, deduplicated copy of the configured
// managed-domain list. Useful for /healthz exposure and operator logs.
func ManagedDomains() []string {
set := resolveManagedDomains()
out := make([]string, 0, len(set))
for d := range set {
out = append(out, d)
}
for i := 1; i < len(out); i++ {
for j := i; j > 0 && out[j-1] > out[j]; j-- {
out[j-1], out[j] = out[j], out[j-1]
}
}
return out
}
// IsManagedDomain reports whether the given domain is one whose DNS Dynadot
// manages on behalf of OpenOva.
func IsManagedDomain(domain string) bool {
domain = strings.ToLower(strings.TrimSpace(domain))
if domain == "" {
return false
}
_, ok := resolveManagedDomains()[domain]
return ok
}
// splitDomainsList parses a `DYNADOT_MANAGED_DOMAINS`-style string —
// comma- or whitespace-separated, lower-cased, trimmed, deduped.
func splitDomainsList(raw string) []string {
raw = strings.ToLower(raw)
raw = strings.ReplaceAll(raw, ",", " ")
parts := strings.Fields(raw)
seen := make(map[string]struct{}, len(parts))
out := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p == "" {
continue
}
if _, ok := seen[p]; ok {
continue
}
seen[p] = struct{}{}
out = append(out, p)
}
return out
}
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "..."
}
// Errors surfaced by the package for callers that want to type-switch.
var (
// ErrUnmanagedDomain — caller asked for an action against a domain not in
// DYNADOT_MANAGED_DOMAINS. Hard fail to defend against misconfiguration.
ErrUnmanagedDomain = errors.New("domain is not in the Dynadot managed list")
)

View File

@ -0,0 +1,62 @@
package dynadot
import (
"os"
"sort"
"strings"
"testing"
)
func TestIsManagedDomainEnvVar(t *testing.T) {
t.Setenv("DYNADOT_MANAGED_DOMAINS", "openova.io, omani.works acme.io")
t.Setenv("DYNADOT_DOMAIN", "")
ResetManagedDomains()
for _, d := range []string{"openova.io", "omani.works", "acme.io"} {
if !IsManagedDomain(d) {
t.Errorf("IsManagedDomain(%q) = false, want true", d)
}
}
if IsManagedDomain("not-managed.com") {
t.Errorf("IsManagedDomain(not-managed.com) = true, want false")
}
got := ManagedDomains()
sort.Strings(got)
want := []string{"acme.io", "omani.works", "openova.io"}
if strings.Join(got, ",") != strings.Join(want, ",") {
t.Errorf("ManagedDomains() = %v, want %v", got, want)
}
}
func TestIsManagedDomainLegacyFallback(t *testing.T) {
t.Setenv("DYNADOT_MANAGED_DOMAINS", "")
t.Setenv("DYNADOT_DOMAIN", "legacy.io")
ResetManagedDomains()
if !IsManagedDomain("legacy.io") {
t.Errorf("IsManagedDomain(legacy.io) = false, want true")
}
if IsManagedDomain("openova.io") {
t.Errorf("legacy fallback should not include built-in defaults")
}
}
func TestIsManagedDomainBuiltInDefaults(t *testing.T) {
os.Unsetenv("DYNADOT_MANAGED_DOMAINS")
os.Unsetenv("DYNADOT_DOMAIN")
ResetManagedDomains()
for _, d := range []string{"openova.io", "omani.works"} {
if !IsManagedDomain(d) {
t.Errorf("built-in default missing %q", d)
}
}
}
func TestSplitDomainsList(t *testing.T) {
got := splitDomainsList("Foo.com, BAR.IO\tbaz.io ,foo.com")
want := []string{"foo.com", "bar.io", "baz.io"}
if strings.Join(got, ",") != strings.Join(want, ",") {
t.Errorf("splitDomainsList = %v, want %v", got, want)
}
}

View File

@ -0,0 +1,451 @@
// Package handler — HTTP surface for pool-domain-manager.
//
// Endpoints (all JSON; per the issue body):
//
// GET /api/v1/pool/{domain}/check?sub=X Fast read; PDM-DB only.
// POST /api/v1/pool/{domain}/reserve Atomic reserve; 10-min TTL.
// POST /api/v1/pool/{domain}/commit Promote → ACTIVE + Dynadot.
// DELETE /api/v1/pool/{domain}/release Free; remove Dynadot.
// GET /api/v1/pool/{domain}/list Operator-facing list.
// GET /api/v1/reserved Public reserved-name list.
// GET /healthz Liveness probe.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #4 the handler does not hardcode domain
// names — every value comes from the URL path or request body, validated
// against the runtime DYNADOT_MANAGED_DOMAINS list.
package handler
import (
"encoding/json"
"errors"
"log/slog"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
"github.com/openova-io/openova/core/pool-domain-manager/internal/allocator"
"github.com/openova-io/openova/core/pool-domain-manager/internal/dynadot"
"github.com/openova-io/openova/core/pool-domain-manager/internal/reserved"
"github.com/openova-io/openova/core/pool-domain-manager/internal/store"
)
// Handler holds the dependencies shared by every endpoint.
type Handler struct {
Alloc *allocator.Allocator
Store *store.Store // exposed for /healthz Ping
Log *slog.Logger
}
// New constructs a Handler.
func New(alloc *allocator.Allocator, s *store.Store, log *slog.Logger) *Handler {
return &Handler{Alloc: alloc, Store: s, Log: log}
}
// Routes returns the chi.Router with all PDM routes wired up.
func (h *Handler) Routes() *chi.Mux {
r := chi.NewRouter()
r.Get("/healthz", h.Healthz)
r.Route("/api/v1", func(r chi.Router) {
r.Get("/reserved", h.ListReserved)
r.Route("/pool/{domain}", func(r chi.Router) {
r.Get("/check", h.Check)
r.Get("/list", h.List)
r.Post("/reserve", h.Reserve)
r.Post("/commit", h.Commit)
r.Delete("/release", h.Release)
})
})
return r
}
// ── Healthz ────────────────────────────────────────────────────────────
// Healthz returns 200 if Postgres is reachable and the dynadot config is
// loaded; otherwise 503. The response includes the runtime managed-domain
// list so operators can grep for misconfiguration.
func (h *Handler) Healthz(w http.ResponseWriter, r *http.Request) {
if err := h.Store.Ping(r.Context()); err != nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]any{
"status": "unhealthy",
"db": err.Error(),
})
return
}
writeJSON(w, http.StatusOK, map[string]any{
"status": "ok",
"managedDomains": dynadot.ManagedDomains(),
})
}
// ── Reserved-list ──────────────────────────────────────────────────────
// ListReserved exposes the canonical reserved-subdomain list. The wizard can
// consume this to render an inline hint instead of waiting for the user to
// type a reserved name and seeing an error.
func (h *Handler) ListReserved(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{
"reserved": reserved.All(),
})
}
// ── /pool/{domain}/check ───────────────────────────────────────────────
// Check is the read-only availability query. Always returns 200 OK with a
// JSON body — clients use the body's `available` field, not the HTTP
// status, to decide.
func (h *Handler) Check(w http.ResponseWriter, r *http.Request) {
domain := normaliseLabel(chi.URLParam(r, "domain"))
sub := normaliseLabel(r.URL.Query().Get("sub"))
if !isValidDNSLabel(sub) {
writeJSON(w, http.StatusOK, allocator.CheckResult{
Available: false,
Reason: "invalid-format",
Detail: "subdomain must be a-z, 0-9 and hyphens, start with a letter, max 63 characters",
})
return
}
res, err := h.Alloc.Check(r.Context(), domain, sub)
if err != nil {
h.Log.Error("check failed", "domain", domain, "sub", sub, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, res)
}
// ── /pool/{domain}/reserve ─────────────────────────────────────────────
// ReserveRequest is the body shape POSTed to /reserve.
type ReserveRequest struct {
Subdomain string `json:"subdomain"`
CreatedBy string `json:"createdBy,omitempty"`
}
// ReserveResponse is the wire shape returned to the caller.
type ReserveResponse struct {
PoolDomain string `json:"poolDomain"`
Subdomain string `json:"subdomain"`
State string `json:"state"`
ReservedAt string `json:"reservedAt"`
ExpiresAt string `json:"expiresAt"`
ReservationToken string `json:"reservationToken"`
CreatedBy string `json:"createdBy"`
}
// Reserve atomically reserves the (domain, subdomain) pair for the
// configured TTL. Returns 201 Created on success, 409 Conflict if the name
// is taken, 422 Unprocessable Entity on validation failure.
func (h *Handler) Reserve(w http.ResponseWriter, r *http.Request) {
domain := normaliseLabel(chi.URLParam(r, "domain"))
var req ReserveRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON body"})
return
}
sub := normaliseLabel(req.Subdomain)
if !isValidDNSLabel(sub) {
writeJSON(w, http.StatusUnprocessableEntity, map[string]string{
"error": "invalid-format",
"detail": "subdomain must be a-z, 0-9 and hyphens, start with a letter, max 63 characters",
})
return
}
alloc, err := h.Alloc.Reserve(r.Context(), domain, sub, allocator.ReserveInput{CreatedBy: req.CreatedBy})
if err != nil {
switch {
case errors.Is(err, store.ErrConflict):
writeJSON(w, http.StatusConflict, map[string]string{
"error": "conflict",
"detail": "this subdomain is already reserved or active",
})
case errors.Is(err, dynadot.ErrUnmanagedDomain):
writeJSON(w, http.StatusUnprocessableEntity, map[string]string{
"error": "unsupported-pool",
"detail": "pool domain " + domain + " is not managed by OpenOva",
})
default:
h.Log.Error("reserve failed", "domain", domain, "sub", sub, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
}
return
}
resp := ReserveResponse{
PoolDomain: alloc.PoolDomain,
Subdomain: alloc.Subdomain,
State: string(alloc.State),
ReservedAt: alloc.ReservedAt.Format("2006-01-02T15:04:05Z07:00"),
ReservationToken: alloc.ReservationToken,
CreatedBy: alloc.CreatedBy,
}
if alloc.ExpiresAt != nil {
resp.ExpiresAt = alloc.ExpiresAt.Format("2006-01-02T15:04:05Z07:00")
}
writeJSON(w, http.StatusCreated, resp)
}
// ── /pool/{domain}/commit ──────────────────────────────────────────────
// CommitRequest carries the data needed to flip RESERVED → ACTIVE.
type CommitRequest struct {
Subdomain string `json:"subdomain"`
ReservationToken string `json:"reservationToken"`
SovereignFQDN string `json:"sovereignFQDN"`
LoadBalancerIP string `json:"loadBalancerIP"`
}
// Commit promotes a reservation to active and writes Dynadot records.
// Status codes:
//
// 200 OK — committed; row is active and DNS records exist
// 202 Accepted — committed in DB but Dynadot write failed (caller
// can retry Commit with same body; idempotent)
// 404 Not Found — no row exists for this (domain, subdomain)
// 409 Conflict — row is already active (re-commit attempt)
// 410 Gone — reservation TTL expired before commit; caller must
// Reserve again
// 403 Forbidden — reservation token mismatch
func (h *Handler) Commit(w http.ResponseWriter, r *http.Request) {
domain := normaliseLabel(chi.URLParam(r, "domain"))
var req CommitRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON body"})
return
}
sub := normaliseLabel(req.Subdomain)
if !isValidDNSLabel(sub) {
writeJSON(w, http.StatusUnprocessableEntity, map[string]string{
"error": "invalid-format",
"detail": "subdomain must be a-z, 0-9 and hyphens, start with a letter, max 63 characters",
})
return
}
if strings.TrimSpace(req.LoadBalancerIP) == "" {
writeJSON(w, http.StatusUnprocessableEntity, map[string]string{
"error": "missing-lb-ip",
"detail": "loadBalancerIP is required for commit",
})
return
}
alloc, err := h.Alloc.Commit(r.Context(), domain, sub, allocator.CommitInput{
ReservationToken: req.ReservationToken,
SovereignFQDN: req.SovereignFQDN,
LoadBalancerIP: req.LoadBalancerIP,
})
if err != nil {
// Allocator returns a wrapped "dynadot write" error AFTER the row was
// flipped to active. Surface 202 in that case so the caller knows the
// row is committed but DNS is pending.
if alloc != nil && strings.Contains(err.Error(), "dynadot write") {
writeJSON(w, http.StatusAccepted, map[string]any{
"warning": "row committed but Dynadot write failed; retry commit to publish DNS",
"detail": err.Error(),
"poolDomain": alloc.PoolDomain,
"subdomain": alloc.Subdomain,
"state": string(alloc.State),
"sovereignFQDN": alloc.SovereignFQDN,
"loadBalancerIP": alloc.LoadBalancerIP,
})
return
}
switch {
case errors.Is(err, store.ErrNotFound):
writeJSON(w, http.StatusNotFound, map[string]string{
"error": "not-found",
"detail": "no reservation exists for this (poolDomain, subdomain) — call /reserve first",
})
case errors.Is(err, store.ErrConflict):
writeJSON(w, http.StatusConflict, map[string]string{
"error": "already-active",
"detail": "this allocation is already active",
})
case errors.Is(err, store.ErrTokenMismatch):
writeJSON(w, http.StatusForbidden, map[string]string{
"error": "token-mismatch",
"detail": "reservation token does not match the held reservation",
})
case errors.Is(err, store.ErrExpired):
writeJSON(w, http.StatusGone, map[string]string{
"error": "reservation-expired",
"detail": "the reservation TTL elapsed before commit; reserve again",
})
case errors.Is(err, dynadot.ErrUnmanagedDomain):
writeJSON(w, http.StatusUnprocessableEntity, map[string]string{
"error": "unsupported-pool",
"detail": "pool domain " + domain + " is not managed by OpenOva",
})
default:
h.Log.Error("commit failed", "domain", domain, "sub", sub, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
}
return
}
writeJSON(w, http.StatusOK, allocationResponse(alloc))
}
// ── /pool/{domain}/release ─────────────────────────────────────────────
// ReleaseRequest is the body shape DELETEd to /release. We accept a body
// rather than a query param so the wire shape matches reserve/commit.
type ReleaseRequest struct {
Subdomain string `json:"subdomain"`
}
// Release deletes the row and removes Dynadot records (when state was
// active). Returns 200 OK on success, 404 if no row, 422 on validation.
//
// We do NOT require the reservation token here — Release is operator-side
// (also invoked by catalyst-api on tofu destroy) and the catalyst-api may
// not still hold the original token by the time the destroy fires.
func (h *Handler) Release(w http.ResponseWriter, r *http.Request) {
domain := normaliseLabel(chi.URLParam(r, "domain"))
var req ReleaseRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
// Allow ?sub= query fallback so curl -X DELETE without a body works.
req.Subdomain = r.URL.Query().Get("sub")
}
sub := normaliseLabel(req.Subdomain)
if !isValidDNSLabel(sub) {
writeJSON(w, http.StatusUnprocessableEntity, map[string]string{
"error": "invalid-format",
"detail": "subdomain must be a-z, 0-9 and hyphens, start with a letter, max 63 characters",
})
return
}
alloc, err := h.Alloc.Release(r.Context(), domain, sub)
if err != nil {
// Partial: row deleted but Dynadot delete failed.
if alloc != nil && strings.Contains(err.Error(), "dynadot delete") {
writeJSON(w, http.StatusAccepted, map[string]any{
"warning": "row deleted but Dynadot delete failed; operator must clean up DNS manually",
"detail": err.Error(),
"freed": allocationResponse(alloc),
})
return
}
switch {
case errors.Is(err, store.ErrNotFound):
writeJSON(w, http.StatusNotFound, map[string]string{
"error": "not-found",
"detail": "no allocation exists for this (poolDomain, subdomain)",
})
case errors.Is(err, dynadot.ErrUnmanagedDomain):
writeJSON(w, http.StatusUnprocessableEntity, map[string]string{
"error": "unsupported-pool",
"detail": "pool domain " + domain + " is not managed by OpenOva",
})
default:
h.Log.Error("release failed", "domain", domain, "sub", sub, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
}
return
}
writeJSON(w, http.StatusOK, map[string]any{
"freed": allocationResponse(alloc),
})
}
// ── /pool/{domain}/list ────────────────────────────────────────────────
// List returns every allocation for the given pool domain. Operator-only;
// the manifest gates the path behind a Traefik auth middleware.
func (h *Handler) List(w http.ResponseWriter, r *http.Request) {
domain := normaliseLabel(chi.URLParam(r, "domain"))
allocs, err := h.Alloc.List(r.Context(), domain)
if err != nil {
if errors.Is(err, dynadot.ErrUnmanagedDomain) {
writeJSON(w, http.StatusUnprocessableEntity, map[string]string{
"error": "unsupported-pool",
"detail": "pool domain " + domain + " is not managed by OpenOva",
})
return
}
h.Log.Error("list failed", "domain", domain, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
out := make([]map[string]any, 0, len(allocs))
for i := range allocs {
out = append(out, allocationResponse(&allocs[i]))
}
writeJSON(w, http.StatusOK, map[string]any{
"poolDomain": domain,
"allocations": out,
})
}
// ── helpers ────────────────────────────────────────────────────────────
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
func normaliseLabel(s string) string {
return strings.ToLower(strings.TrimSpace(s))
}
// isValidDNSLabel validates an RFC 1035 label (lower-case-only, since we
// normalise upstream).
func isValidDNSLabel(s string) bool {
if s == "" || len(s) > 63 {
return false
}
// Allow domain labels with embedded dots ONLY for the {domain} URL
// param — those are validated separately via dynadot.IsManagedDomain.
// For subdomain inputs we require a single label.
for i, r := range s {
switch {
case r >= 'a' && r <= 'z':
continue
case r >= '0' && r <= '9':
if i == 0 {
return false
}
continue
case r == '-':
if i == 0 || i == len(s)-1 {
return false
}
continue
default:
return false
}
}
return true
}
func allocationResponse(a *store.Allocation) map[string]any {
out := map[string]any{
"poolDomain": a.PoolDomain,
"subdomain": a.Subdomain,
"state": string(a.State),
"reservedAt": a.ReservedAt.Format("2006-01-02T15:04:05Z07:00"),
"createdBy": a.CreatedBy,
}
if a.ExpiresAt != nil {
out["expiresAt"] = a.ExpiresAt.Format("2006-01-02T15:04:05Z07:00")
}
if a.SovereignFQDN != "" {
out["sovereignFQDN"] = a.SovereignFQDN
}
if a.LoadBalancerIP != "" {
out["loadBalancerIP"] = a.LoadBalancerIP
}
if a.ReservationToken != "" {
out["reservationToken"] = a.ReservationToken
}
return out
}

View File

@ -0,0 +1,77 @@
// Package reserved holds the canonical list of subdomain names that no tenant
// may claim under any OpenOva pool domain. It used to live in
// products/catalyst/bootstrap/api/internal/handler/subdomains.go as a private
// var — duplicated knowledge with no clear owner.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #4 the list lives in ONE place: PDM.
// catalyst-api consults PDM via /check; the wizard consults catalyst-api via
// /api/v1/subdomains/check. There is no second copy of this list anywhere
// else in the fleet.
//
// The list is the union of:
// - control-plane prefixes the OpenTofu module will materialise as DNS
// records on every Sovereign (api, admin, console, gitea, harbor — see
// dynadot.AddSovereignRecords)
// - infrastructure prefixes that map to specific OpenOva services
// (openova, catalyst, openbao, vault, flux, k8s, system)
// - operational prefixes that look enough like a Sovereign to be
// dangerous if a tenant grabbed them (www, mail, smtp, imap, vpn, app,
// status, docs)
//
// The IsReserved() function is the only exported surface — callers don't
// see (and can't mutate) the underlying map.
package reserved
import "strings"
// reservedSubdomains — names we never let a tenant claim as their Sovereign
// root subdomain. Tenants get *.omantel.omani.works style records
// automatically; allowing a tenant to claim "console" would create
// "console.console.omani.works" which is meaningless and confusing.
var reservedSubdomains = map[string]struct{}{
"api": {},
"admin": {},
"console": {},
"gitea": {},
"harbor": {},
"keycloak": {},
"www": {},
"mail": {},
"smtp": {},
"imap": {},
"vpn": {},
"openova": {},
"catalyst": {},
"docs": {},
"status": {},
"app": {},
"system": {},
"openbao": {},
"vault": {},
"flux": {},
"k8s": {},
}
// IsReserved reports whether the given subdomain (lower-cased, trimmed) is
// in the reserved set. Caller is responsible for validating the input is a
// well-formed DNS label first; this function only checks set membership.
func IsReserved(subdomain string) bool {
_, ok := reservedSubdomains[strings.ToLower(strings.TrimSpace(subdomain))]
return ok
}
// All returns a sorted copy of the reserved list. Used by /api/v1/reserved
// for clients (e.g. the wizard) that want to render the list inline as a
// hint to the user.
func All() []string {
out := make([]string, 0, len(reservedSubdomains))
for k := range reservedSubdomains {
out = append(out, k)
}
for i := 1; i < len(out); i++ {
for j := i; j > 0 && out[j-1] > out[j]; j-- {
out[j-1], out[j] = out[j], out[j-1]
}
}
return out
}

View File

@ -0,0 +1,45 @@
package reserved
import "testing"
func TestIsReserved(t *testing.T) {
cases := map[string]bool{
"api": true,
"console": true,
"openbao": true,
"k8s": true,
"omantel": false,
"acme": false,
"": false,
"API": true, // case-insensitive
" api ": true, // trims whitespace
"foo-bar": false,
"my-corp": false,
}
for k, want := range cases {
if got := IsReserved(k); got != want {
t.Errorf("IsReserved(%q) = %v, want %v", k, got, want)
}
}
}
func TestAllSorted(t *testing.T) {
names := All()
if len(names) == 0 {
t.Fatal("expected non-empty reserved list")
}
for i := 1; i < len(names); i++ {
if names[i-1] >= names[i] {
t.Errorf("All() not sorted at index %d: %q >= %q", i, names[i-1], names[i])
}
}
}
func TestAllNoReachableNames(t *testing.T) {
// Sanity-check: every name in All() must be IsReserved.
for _, n := range All() {
if !IsReserved(n) {
t.Errorf("%q is in All() but IsReserved returned false", n)
}
}
}

View File

@ -0,0 +1,31 @@
-- pool-domain-manager schema, applied idempotently at process start.
-- Per docs/INVIOLABLE-PRINCIPLES.md #3 the database is CloudNativePG and
-- the schema lives in this single file (no Atlas / Goose / Flyway needed
-- for two tables); running the same SQL repeatedly is safe.
--
-- The CHECK constraint on state and the PRIMARY KEY on (pool_domain,
-- subdomain) together guarantee that no two callers can hold a name in
-- conflicting states. The expires_at index speeds up the sweeper.
CREATE TABLE IF NOT EXISTS pool_allocations (
pool_domain TEXT NOT NULL,
subdomain TEXT NOT NULL,
state TEXT NOT NULL CHECK (state IN ('reserved', 'active')),
reserved_at TIMESTAMPTZ NOT NULL,
expires_at TIMESTAMPTZ,
sovereign_fqdn TEXT,
load_balancer_ip TEXT,
reservation_token UUID,
created_by TEXT NOT NULL,
PRIMARY KEY (pool_domain, subdomain)
);
-- Sweeper-friendly partial index — only the (small) set of reserved rows
-- need to be scanned for TTL expiry.
CREATE INDEX IF NOT EXISTS pool_allocations_expires_idx
ON pool_allocations (expires_at)
WHERE state = 'reserved';
-- Operator-facing index — list all active rows for a pool fast.
CREATE INDEX IF NOT EXISTS pool_allocations_state_idx
ON pool_allocations (pool_domain, state);

View File

@ -0,0 +1,463 @@
// Package store — CloudNativePG / Postgres persistence for pool-domain-manager.
//
// The PDM owns a single table — pool_allocations — that holds the canonical
// allocation state for every (pool_domain, subdomain) pair the OpenOva fleet
// has ever reserved or activated. The table is intentionally simple: PDM is
// a small, stateless HTTP service backed by a single-writer Postgres database
// (CloudNativePG running in the openova-system namespace). Concurrency is
// resolved by Postgres row-level locks + UPSERT semantics rather than any
// application-level mutex.
//
// Schema (also encoded as a migration in migrations.sql):
//
// CREATE TABLE pool_allocations (
// pool_domain TEXT NOT NULL,
// subdomain TEXT NOT NULL,
// state TEXT NOT NULL CHECK (state IN ('reserved','active')),
// reserved_at TIMESTAMPTZ NOT NULL,
// expires_at TIMESTAMPTZ, -- NULL when state='active'
// sovereign_fqdn TEXT, -- set when state='active'
// load_balancer_ip TEXT, -- set when state='active'
// reservation_token UUID, -- set when state='reserved'
// created_by TEXT NOT NULL,
// PRIMARY KEY (pool_domain, subdomain)
// );
// CREATE INDEX pool_allocations_expires_idx ON pool_allocations (expires_at)
// WHERE state = 'reserved';
//
// Per docs/INVIOLABLE-PRINCIPLES.md #4 the connection string is read from the
// PDM_DATABASE_URL env var — never hardcoded. The K8s ExternalSecret pulls
// the credentials out of CNPG's auto-generated app secret and projects them
// here.
package store
import (
"context"
_ "embed"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)
// State enumerates the three lifecycle states of a (pool, subdomain) row.
// NULL — implicit (no row) — is the fourth state, represented by absence.
type State string
const (
// StateReserved — the name is held with a TTL. expires_at is non-NULL.
// On TTL expiry the row is deleted by the sweeper goroutine.
StateReserved State = "reserved"
// StateActive — the name has been committed and Dynadot DNS records have
// been written. expires_at is NULL; the row stays until Release.
StateActive State = "active"
)
// ErrConflict — somebody else holds this (pool_domain, subdomain) — used by
// the allocator to map to HTTP 409 Conflict.
var ErrConflict = errors.New("pool allocation conflict — name already reserved or active")
// ErrNotFound — no row exists for the (pool_domain, subdomain) pair. The
// handlers map this to HTTP 404.
var ErrNotFound = errors.New("pool allocation not found")
// Allocation is the persistent shape of a row in pool_allocations.
type Allocation struct {
PoolDomain string `json:"poolDomain"`
Subdomain string `json:"subdomain"`
State State `json:"state"`
ReservedAt time.Time `json:"reservedAt"`
ExpiresAt *time.Time `json:"expiresAt,omitempty"`
SovereignFQDN string `json:"sovereignFQDN,omitempty"`
LoadBalancerIP string `json:"loadBalancerIP,omitempty"`
ReservationToken string `json:"reservationToken,omitempty"`
CreatedBy string `json:"createdBy"`
}
// Store wraps the pgxpool.Pool with the SQL operations PDM needs.
type Store struct {
pool *pgxpool.Pool
}
// New connects to Postgres using the DSN, applies migrations, and returns a
// ready-to-use Store. Caller is responsible for calling Close on shutdown.
func New(ctx context.Context, dsn string) (*Store, error) {
cfg, err := pgxpool.ParseConfig(dsn)
if err != nil {
return nil, fmt.Errorf("parse PDM_DATABASE_URL: %w", err)
}
// Modest pool — PDM handles low QPS (a wizard click rate of order 0.1/s
// fleet-wide) and we'd rather queue than monopolise CNPG connections.
cfg.MaxConns = 8
cfg.MinConns = 1
cfg.MaxConnLifetime = 30 * time.Minute
cfg.MaxConnIdleTime = 5 * time.Minute
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("connect postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("ping postgres: %w", err)
}
s := &Store{pool: pool}
if err := s.migrate(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("apply migrations: %w", err)
}
return s, nil
}
// Close releases the underlying connection pool.
func (s *Store) Close() {
s.pool.Close()
}
// Ping verifies the database is reachable. Used by /healthz.
func (s *Store) Ping(ctx context.Context) error {
return s.pool.Ping(ctx)
}
//go:embed migrations.sql
var migrationsSQL string
// migrate applies the embedded migrations.sql idempotently. We deliberately
// avoid a full migration framework — the schema is tiny and idempotent SQL
// keeps PDM's startup path one TCP connect + one CREATE TABLE IF NOT EXISTS.
func (s *Store) migrate(ctx context.Context) error {
_, err := s.pool.Exec(ctx, migrationsSQL)
return err
}
// Reserve atomically inserts a row in state='reserved' with the given TTL.
// Returns ErrConflict if a row already exists for the (pool, subdomain) pair
// in any state — including an expired reservation that the sweeper has not
// yet collected (we treat sweeper lag conservatively and rely on the caller
// running ExpireReservations periodically; the SQL also prunes expired rows
// for the specific key it touches as part of the same transaction).
func (s *Store) Reserve(ctx context.Context, poolDomain, subdomain string, ttl time.Duration, createdBy string) (*Allocation, error) {
if ttl <= 0 {
return nil, fmt.Errorf("Reserve: ttl must be positive, got %s", ttl)
}
tx, err := s.pool.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx) //nolint:errcheck
// Step 1: opportunistic prune — if a row exists for this exact key with
// state='reserved' and expires_at in the past, delete it. This keeps the
// reservation path responsive even if the background sweeper is slow.
if _, err := tx.Exec(ctx, `
DELETE FROM pool_allocations
WHERE pool_domain = $1
AND subdomain = $2
AND state = 'reserved'
AND expires_at < NOW()
`, poolDomain, subdomain); err != nil {
return nil, fmt.Errorf("prune expired: %w", err)
}
now := time.Now().UTC()
expires := now.Add(ttl)
token := uuid.New()
// Step 2: insert. If the row already exists (active OR not-yet-expired
// reservation) the unique constraint fires and we return ErrConflict.
_, err = tx.Exec(ctx, `
INSERT INTO pool_allocations
(pool_domain, subdomain, state, reserved_at, expires_at, reservation_token, created_by)
VALUES
($1, $2, 'reserved', $3, $4, $5, $6)
`, poolDomain, subdomain, now, expires, token, createdBy)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == "23505" /* unique_violation */ {
return nil, ErrConflict
}
return nil, fmt.Errorf("insert reservation: %w", err)
}
if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("commit reservation: %w", err)
}
exp := expires
return &Allocation{
PoolDomain: poolDomain,
Subdomain: subdomain,
State: StateReserved,
ReservedAt: now,
ExpiresAt: &exp,
ReservationToken: token.String(),
CreatedBy: createdBy,
}, nil
}
// Get returns the current allocation for the (pool, subdomain) pair, or
// ErrNotFound. Get does NOT prune expired reservations on read — callers
// who want fresh results should run after ExpireReservations or accept that
// /check may briefly return state='reserved' for a row that has just expired.
func (s *Store) Get(ctx context.Context, poolDomain, subdomain string) (*Allocation, error) {
row := s.pool.QueryRow(ctx, `
SELECT pool_domain, subdomain, state, reserved_at, expires_at,
sovereign_fqdn, load_balancer_ip, reservation_token, created_by
FROM pool_allocations
WHERE pool_domain = $1 AND subdomain = $2
`, poolDomain, subdomain)
var a Allocation
var (
expiresAt *time.Time
sovereignFQDN *string
loadBalancerIP *string
reservationToken *uuid.UUID
)
err := row.Scan(
&a.PoolDomain,
&a.Subdomain,
&a.State,
&a.ReservedAt,
&expiresAt,
&sovereignFQDN,
&loadBalancerIP,
&reservationToken,
&a.CreatedBy,
)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("scan allocation: %w", err)
}
a.ExpiresAt = expiresAt
if sovereignFQDN != nil {
a.SovereignFQDN = *sovereignFQDN
}
if loadBalancerIP != nil {
a.LoadBalancerIP = *loadBalancerIP
}
if reservationToken != nil {
a.ReservationToken = reservationToken.String()
}
return &a, nil
}
// IsAvailable reports whether the (pool, subdomain) pair is free for a fresh
// reservation. A row in state='reserved' that has expired is treated as free
// (we transparently prune it on next Reserve). All other rows = taken.
func (s *Store) IsAvailable(ctx context.Context, poolDomain, subdomain string) (bool, error) {
a, err := s.Get(ctx, poolDomain, subdomain)
if errors.Is(err, ErrNotFound) {
return true, nil
}
if err != nil {
return false, err
}
if a.State == StateReserved && a.ExpiresAt != nil && a.ExpiresAt.Before(time.Now().UTC()) {
return true, nil
}
return false, nil
}
// CommitInput is what the /commit endpoint provides.
type CommitInput struct {
ReservationToken string
SovereignFQDN string
LoadBalancerIP string
}
// Commit promotes an existing reservation to state='active'. Verifies the
// reservation_token matches (so a stale wizard tab can't commit somebody
// else's reservation) and that the reservation has not yet expired.
//
// Returns ErrNotFound if the row is gone, ErrConflict if it is already
// active, ErrTokenMismatch if the token doesn't match, and ErrExpired if the
// reservation TTL elapsed.
func (s *Store) Commit(ctx context.Context, poolDomain, subdomain string, in CommitInput) (*Allocation, error) {
tx, err := s.pool.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx) //nolint:errcheck
// Lock the row for the duration of this transaction.
row := tx.QueryRow(ctx, `
SELECT state, expires_at, reservation_token
FROM pool_allocations
WHERE pool_domain = $1 AND subdomain = $2
FOR UPDATE
`, poolDomain, subdomain)
var (
state State
expiresAt *time.Time
reservationToken *uuid.UUID
)
if err := row.Scan(&state, &expiresAt, &reservationToken); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return nil, fmt.Errorf("lock row: %w", err)
}
if state == StateActive {
return nil, ErrConflict
}
if expiresAt != nil && expiresAt.Before(time.Now().UTC()) {
return nil, ErrExpired
}
wantToken, err := uuid.Parse(in.ReservationToken)
if err != nil {
return nil, ErrTokenMismatch
}
if reservationToken == nil || *reservationToken != wantToken {
return nil, ErrTokenMismatch
}
if _, err := tx.Exec(ctx, `
UPDATE pool_allocations
SET state = 'active',
expires_at = NULL,
reservation_token = NULL,
sovereign_fqdn = $3,
load_balancer_ip = $4
WHERE pool_domain = $1 AND subdomain = $2
`, poolDomain, subdomain, in.SovereignFQDN, in.LoadBalancerIP); err != nil {
return nil, fmt.Errorf("commit update: %w", err)
}
if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("commit tx: %w", err)
}
return s.Get(ctx, poolDomain, subdomain)
}
// Release deletes the row for (pool, subdomain) regardless of state. Returns
// the released row's previous state so the handler can decide whether to
// fire Dynadot delete calls (only state='active' rows have DNS records).
func (s *Store) Release(ctx context.Context, poolDomain, subdomain string) (*Allocation, error) {
row := s.pool.QueryRow(ctx, `
DELETE FROM pool_allocations
WHERE pool_domain = $1 AND subdomain = $2
RETURNING pool_domain, subdomain, state, reserved_at, expires_at,
sovereign_fqdn, load_balancer_ip, reservation_token, created_by
`, poolDomain, subdomain)
var a Allocation
var (
expiresAt *time.Time
sovereignFQDN *string
loadBalancerIP *string
reservationToken *uuid.UUID
)
err := row.Scan(
&a.PoolDomain,
&a.Subdomain,
&a.State,
&a.ReservedAt,
&expiresAt,
&sovereignFQDN,
&loadBalancerIP,
&reservationToken,
&a.CreatedBy,
)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("delete allocation: %w", err)
}
a.ExpiresAt = expiresAt
if sovereignFQDN != nil {
a.SovereignFQDN = *sovereignFQDN
}
if loadBalancerIP != nil {
a.LoadBalancerIP = *loadBalancerIP
}
if reservationToken != nil {
a.ReservationToken = reservationToken.String()
}
return &a, nil
}
// List returns every allocation for the given pool domain. Used by the
// operator-facing /list endpoint.
func (s *Store) List(ctx context.Context, poolDomain string) ([]Allocation, error) {
rows, err := s.pool.Query(ctx, `
SELECT pool_domain, subdomain, state, reserved_at, expires_at,
sovereign_fqdn, load_balancer_ip, reservation_token, created_by
FROM pool_allocations
WHERE pool_domain = $1
ORDER BY subdomain
`, poolDomain)
if err != nil {
return nil, fmt.Errorf("list allocations: %w", err)
}
defer rows.Close()
var out []Allocation
for rows.Next() {
var a Allocation
var (
expiresAt *time.Time
sovereignFQDN *string
loadBalancerIP *string
reservationToken *uuid.UUID
)
if err := rows.Scan(
&a.PoolDomain,
&a.Subdomain,
&a.State,
&a.ReservedAt,
&expiresAt,
&sovereignFQDN,
&loadBalancerIP,
&reservationToken,
&a.CreatedBy,
); err != nil {
return nil, fmt.Errorf("scan list row: %w", err)
}
a.ExpiresAt = expiresAt
if sovereignFQDN != nil {
a.SovereignFQDN = *sovereignFQDN
}
if loadBalancerIP != nil {
a.LoadBalancerIP = *loadBalancerIP
}
if reservationToken != nil {
a.ReservationToken = reservationToken.String()
}
out = append(out, a)
}
return out, rows.Err()
}
// ExpireReservations deletes every state='reserved' row whose expires_at is
// in the past. Returns the count of rows deleted. Called periodically by the
// sweeper goroutine.
func (s *Store) ExpireReservations(ctx context.Context) (int64, error) {
tag, err := s.pool.Exec(ctx, `
DELETE FROM pool_allocations
WHERE state = 'reserved'
AND expires_at < NOW()
`)
if err != nil {
return 0, fmt.Errorf("expire reservations: %w", err)
}
return tag.RowsAffected(), nil
}
// ErrTokenMismatch — the reservation_token in the request did not match
// the row's stored token. The caller likely is a stale tab.
var ErrTokenMismatch = errors.New("reservation token does not match")
// ErrExpired — the reservation TTL has elapsed; the caller must Reserve
// again before committing.
var ErrExpired = errors.New("reservation expired before commit")

View File

@ -0,0 +1,180 @@
package store
import (
"context"
"errors"
"os"
"testing"
"time"
"github.com/google/uuid"
)
// integrationDSN — set CI/local env to a writable Postgres for these tests.
// When unset, the integration tests skip; the rest of the package gets
// covered by allocator/handler tests with a thin in-memory shim. We
// deliberately don't pull testcontainers into the build path — Catalyst-
// Zero CI already runs Postgres as a service for other suites and the same
// container can host PDM's tests via PDM_TEST_DSN.
func integrationDSN(t *testing.T) string {
t.Helper()
dsn := os.Getenv("PDM_TEST_DSN")
if dsn == "" {
t.Skip("PDM_TEST_DSN not set — skipping integration test")
}
return dsn
}
func newTestStore(t *testing.T) *Store {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s, err := New(ctx, integrationDSN(t))
if err != nil {
t.Fatalf("connect: %v", err)
}
t.Cleanup(func() {
// Truncate after each test so subsequent runs start clean.
_, _ = s.pool.Exec(context.Background(), `TRUNCATE pool_allocations`)
s.Close()
})
return s
}
func TestReserveHappyPath(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
a, err := s.Reserve(ctx, "omani.works", "tenant1", 10*time.Minute, "test")
if err != nil {
t.Fatalf("reserve: %v", err)
}
if a.State != StateReserved {
t.Errorf("state=%s want reserved", a.State)
}
if _, err := uuid.Parse(a.ReservationToken); err != nil {
t.Errorf("reservation token not a UUID: %v", err)
}
if a.ExpiresAt == nil || a.ExpiresAt.Before(time.Now().UTC()) {
t.Errorf("expiresAt must be in the future, got %v", a.ExpiresAt)
}
}
func TestReserveConflict(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
if _, err := s.Reserve(ctx, "omani.works", "tenant1", 10*time.Minute, "test"); err != nil {
t.Fatalf("first reserve: %v", err)
}
_, err := s.Reserve(ctx, "omani.works", "tenant1", 10*time.Minute, "test")
if !errors.Is(err, ErrConflict) {
t.Fatalf("second reserve: want ErrConflict, got %v", err)
}
}
func TestExpiryFreesName(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
if _, err := s.Reserve(ctx, "omani.works", "tenant1", 1*time.Millisecond, "test"); err != nil {
t.Fatalf("reserve: %v", err)
}
time.Sleep(50 * time.Millisecond)
// Reserve again — should succeed because the previous reservation has
// expired (the Reserve path prunes the expired row in the same tx).
a, err := s.Reserve(ctx, "omani.works", "tenant1", 10*time.Minute, "test")
if err != nil {
t.Fatalf("re-reserve after expiry: %v", err)
}
if a.State != StateReserved {
t.Errorf("state=%s want reserved", a.State)
}
}
func TestCommitFlipsState(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
r, err := s.Reserve(ctx, "omani.works", "tenant1", 10*time.Minute, "test")
if err != nil {
t.Fatalf("reserve: %v", err)
}
committed, err := s.Commit(ctx, "omani.works", "tenant1", CommitInput{
ReservationToken: r.ReservationToken,
SovereignFQDN: "tenant1.omani.works",
LoadBalancerIP: "1.2.3.4",
})
if err != nil {
t.Fatalf("commit: %v", err)
}
if committed.State != StateActive {
t.Errorf("state=%s want active", committed.State)
}
if committed.LoadBalancerIP != "1.2.3.4" {
t.Errorf("lbIP=%s", committed.LoadBalancerIP)
}
}
func TestCommitTokenMismatch(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
if _, err := s.Reserve(ctx, "omani.works", "tenant1", 10*time.Minute, "test"); err != nil {
t.Fatalf("reserve: %v", err)
}
_, err := s.Commit(ctx, "omani.works", "tenant1", CommitInput{
ReservationToken: uuid.NewString(),
SovereignFQDN: "tenant1.omani.works",
LoadBalancerIP: "1.2.3.4",
})
if !errors.Is(err, ErrTokenMismatch) {
t.Fatalf("commit: want ErrTokenMismatch, got %v", err)
}
}
func TestReleaseRemovesRow(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
r, err := s.Reserve(ctx, "omani.works", "tenant1", 10*time.Minute, "test")
if err != nil {
t.Fatalf("reserve: %v", err)
}
if _, err := s.Commit(ctx, "omani.works", "tenant1", CommitInput{
ReservationToken: r.ReservationToken,
SovereignFQDN: "tenant1.omani.works",
LoadBalancerIP: "1.2.3.4",
}); err != nil {
t.Fatalf("commit: %v", err)
}
freed, err := s.Release(ctx, "omani.works", "tenant1")
if err != nil {
t.Fatalf("release: %v", err)
}
if freed.State != StateActive {
t.Errorf("freed.State=%s want active (the previous state)", freed.State)
}
// Now the row is gone — Get must return ErrNotFound.
if _, err := s.Get(ctx, "omani.works", "tenant1"); !errors.Is(err, ErrNotFound) {
t.Errorf("after release: want ErrNotFound, got %v", err)
}
}
func TestExpireReservationsSweeper(t *testing.T) {
s := newTestStore(t)
ctx := context.Background()
if _, err := s.Reserve(ctx, "omani.works", "x", 1*time.Millisecond, "test"); err != nil {
t.Fatalf("reserve: %v", err)
}
time.Sleep(20 * time.Millisecond)
deleted, err := s.ExpireReservations(ctx)
if err != nil {
t.Fatalf("expire: %v", err)
}
if deleted != 1 {
t.Errorf("deleted=%d want 1", deleted)
}
}