feat(continuum): K-Cont-3 — Cloudflare KV + DNS-quorum lease witness impls (#1101) (#1158)

Adds two production witness.Client implementations behind the K-Cont-2
WitnessClient interface, plus a parametric contract test suite that
both impls (and InMemoryClient) run against.

- internal/witness/cloudflarekv: HTTP CAS client over the K-Cont-4
  Cloudflare Worker (PUT/GET/DELETE on /lease/<slot> with If-Match
  generation header; 412 → ErrLeaseHeldByAnother). Bearer-token auth
  via K8s SecretRef.
- internal/witness/dnsquorum: 2-of-3 quorum read/write across N
  authoritative DNS servers. TXT records at <slot>.<domain> with
  pipe-delimited <holder>|<acquired>|<expires>|<gen> wire format.
  Std-lib net.Resolver with DialContext targets each server (no new
  go.mod dep). TSIG/TXT-write done through an injected TXTWriter
  interface (production wiring against PDM /v1/txt is K-Cont-{4|5}).
- internal/witness/testing: parametric RunContractSuite(t, factory)
  exported helper. Backend factory yields {A,B,Other,Advance} so the
  same 14 sub-tests cover CAS atomicity, ErrLeaseLost paths, Release
  idempotency, Generation monotonicity, slot isolation, TTL eviction,
  and ctx cancel for every Client impl.
- internal/witness: Selector dispatch refactored to a Register()
  registry pattern (impls register Factory at init() time via
  blank-import in cmd/main.go). Adds SecretReader interface so impls
  resolve K8s Secret refs without dragging client-go into the witness
  package.
- cmd/main.go: blank-imports cloudflarekv + dnsquorum to wire the
  registry; adds k8sSecretReader (mirrors EPIC-3 F's readClientSecret
  seam) using mgr.GetClient(); WITNESS_SECRET_NS env (default
  catalyst-controllers).

Tests:
- contract suite × 3 backends (in-memory + CFKV httptest + DNS-quorum
  fakeBackend) all green under -race.
- impl-specific tests cover constructor validation, factory cfg
  parsing (incl. SecretRef resolution), auth rejection, split-brain
  (1+1+1 → ErrLeaseHeldByAnother), 2-of-3 quorum, sub-quorum failure,
  encode/decode round-trip incl. legacy 3-field shape.

Pre-existing CI failures triaged per canon §7 (PR #1132 +
#1156): TestPinIssue + TestBootstrapKit/gitea + UI cosmetic-guards +
StepComponents — none touched by this slice.

Co-authored-by: hatiyildiz <hati.yildiz@openova.io>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
e3mrah 2026-05-09 07:41:19 +04:00 committed by GitHub
parent c2b93e8165
commit 9c2233867b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 2492 additions and 279 deletions

View File

@ -8,10 +8,11 @@
// switchover sequence per docs/SRE.md §2 + docs/MULTI-REGION-DNS.md.
//
// K-Cont-1 shipped the binary + chart + CI workflow + skeleton.
// K-Cont-2 (this slice) replaces the Reconcile body with the full
// per-CR goroutine + lease state machine + switchover sequencer +
// NATS audit publisher. K-Cont-3 wires the real lease witness
// implementations (cloudflare-kv + dns-quorum); K-Cont-4 ships the
// K-Cont-2 ships the per-CR goroutine + lease state machine +
// switchover sequencer + NATS audit publisher. K-Cont-3 (this
// build) wires the real lease witness implementations
// (cloudflare-kv + dns-quorum) — the impl packages register their
// Factory at init() time via blank-import below. K-Cont-4 ships the
// Cloudflare Worker source.
//
// Configuration is environment-only — per
@ -33,6 +34,13 @@
// CATALYST_REGION — host-cluster name THIS controller represents (stamped on Witness.Acquire)
// WITNESS_IN_MEMORY — "true" enables the in-memory witness selector (TEST ONLY; default false)
//
// K-Cont-3 wires this:
//
// WITNESS_SECRET_NS — K8s namespace where lease-witness Secret refs are
// looked up (CF API token, TSIG key). Default
// "catalyst-controllers" (mirrors EPIC-3 F's
// `FederationSecretNamespace`).
//
// Per-CR config (lease TTL/renew, witness kind, regions) is read
// from Continuum CR spec, NEVER env (per INVIOLABLE-PRINCIPLES #4).
//
@ -56,11 +64,21 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/openova-io/openova/core/controllers/continuum/internal/controller"
"github.com/openova-io/openova/core/controllers/continuum/internal/events"
"github.com/openova-io/openova/core/controllers/continuum/internal/pdm"
"github.com/openova-io/openova/core/controllers/continuum/internal/switchover"
"github.com/openova-io/openova/core/controllers/continuum/internal/witness"
// K-Cont-3 lease-witness implementations — register their
// Factory at init() time so DefaultSelector dispatches them. The
// controller does NOT need typed access to either package; the
// blank-import is the WIRING — removing it disables the kinds.
_ "github.com/openova-io/openova/core/controllers/continuum/internal/witness/cloudflarekv"
_ "github.com/openova-io/openova/core/controllers/continuum/internal/witness/dnsquorum"
)
var scheme = runtime.NewScheme()
@ -109,12 +127,20 @@ func main() {
os.Exit(1)
}
// Witness selector. K-Cont-2 ships ErrNotImplemented for the
// real kinds (cloudflare-kv + dns-quorum); K-Cont-3 swaps in
// the implementations. The in-memory selector is gated by the
// WITNESS_IN_MEMORY env var — TEST ONLY.
// Witness selector. K-Cont-3 wires the cloudflare-kv +
// dns-quorum impls via blank-import (see top-of-file imports).
// The in-memory selector remains gated by the WITNESS_IN_MEMORY
// env var — TEST ONLY. The SecretReader resolves CF API tokens
// + TSIG keys from K8s Secrets in WITNESS_SECRET_NS (default
// "catalyst-controllers", same as EPIC-3 F's federation
// secret namespace).
witnessSecretNS := env("WITNESS_SECRET_NS", "catalyst-controllers")
sel := &witness.DefaultSelector{
InMemoryAllowed: envBool("WITNESS_IN_MEMORY", false),
SecretReader: &k8sSecretReader{
Client: mgr.GetClient(),
Namespace: witnessSecretNS,
},
}
// PDM client — when PDM_API_URL is empty, the PDMCommit closure
@ -207,3 +233,43 @@ func podNamespace() string {
}
return string(b)
}
// k8sSecretReader implements witness.SecretReader by GETting the
// referenced K8s Secret via the controller-runtime client.
//
// Per Inviolable Principle #5 the value bytes stay in memory only
// until the calling Factory hands them to the wire client (CF bearer
// token / TSIG key). NEVER log; NEVER persist outside the Secret.
//
// Mirrors the EPIC-3 F readClientSecret seam — but lifted to an
// interface so the witness package doesn't import client-go.
type k8sSecretReader struct {
	// Client is the controller-runtime client used to GET Secrets
	// (mgr.GetClient() is wired in main()).
	Client client.Client
	// Namespace is where referenced Secrets are resolved; when empty,
	// ReadSecret falls back to "catalyst-controllers" (mirrors the
	// WITNESS_SECRET_NS default documented above).
	Namespace string
}
// ReadSecret implements witness.SecretReader.
//
// It fetches the K8s Secret <namespace>/<secretName> and returns the
// bytes stored under key. Empty names/keys, a missing Secret, a
// missing key, and an empty value all yield distinct, prefixed errors
// so factory failures are attributable. The returned bytes are never
// logged (Inviolable Principle #5).
func (r *k8sSecretReader) ReadSecret(ctx context.Context, secretName, key string) ([]byte, error) {
	switch {
	case secretName == "":
		return nil, fmt.Errorf("witness/k8sSecretReader: secret name is empty")
	case key == "":
		return nil, fmt.Errorf("witness/k8sSecretReader: secret key is empty")
	}
	namespace := r.Namespace
	if namespace == "" {
		// Default mirrors the WITNESS_SECRET_NS env default.
		namespace = "catalyst-controllers"
	}
	var secret corev1.Secret
	ref := types.NamespacedName{Namespace: namespace, Name: secretName}
	if err := r.Client.Get(ctx, ref, &secret); err != nil {
		return nil, fmt.Errorf("witness/k8sSecretReader: get Secret %s/%s: %w", namespace, secretName, err)
	}
	value, present := secret.Data[key]
	if !present {
		return nil, fmt.Errorf("witness/k8sSecretReader: Secret %s/%s missing key %q", namespace, secretName, key)
	}
	if len(value) == 0 {
		return nil, fmt.Errorf("witness/k8sSecretReader: Secret %s/%s key %q is empty", namespace, secretName, key)
	}
	return value, nil
}

View File

@ -0,0 +1,435 @@
// Package cloudflarekv implements witness.Client over a Cloudflare
// Worker backed by Workers KV. The Worker is the K-Cont-4 deliverable;
// THIS client speaks the Worker's HTTP CAS contract.
//
// The contract (which K-Cont-4 must implement):
//
// GET /lease/<slot> → 200 {holder, acquiredAt,
// expiresAt, generation} | 404
// PUT /lease/<slot> req body: {holder, ttlSeconds, op}
// req header: If-Match: <generation>
// (use "0" for first
// acquire on an empty slot)
// → 200 {…new state…} on CAS success
// → 412 on CAS conflict
// (held by another)
// DELETE /lease/<slot> req header: If-Match: <generation>
// req header: X-Holder: <holder>
// → 204 success
// → 412 lost CAS or
// not the holder
//
// `op` discriminator on PUT: "acquire" or "renew" — for the Worker's
// log and for asymmetric handling (acquire allows Generation=0;
// renew requires the current Generation to MATCH).
//
// Authentication: the catalyst-controllers ServiceAccount references
// a SealedSecret holding the Worker API token; the controller passes
// it as `Authorization: Bearer <token>`. Per Inviolable Principle #5
// the plaintext lives in memory only.
//
// Why this shape vs raw KV REST?
// - The Worker centralises the CAS check (KV doesn't natively
// expose If-Match — the Worker computes the comparison).
// - One HTTPS endpoint per witness (vs per-account KV creds).
// - Worker logs per-CR access for audit.
//
// K-Cont-3 (this package) ships the CLIENT; K-Cont-4 (separate slice)
// ships the WORKER source.
package cloudflarekv
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/openova-io/openova/core/controllers/continuum/internal/witness"
)
// init registers the "cloudflare-kv" witness kind so that
// DefaultSelector can dispatch to this package; the blank-import in
// cmd/main.go triggers this registration at program start.
func init() {
	witness.Register("cloudflare-kv", factory)
}
// CFKVClient implements witness.Client over a Cloudflare Worker.
//
// Concurrent-safe: a single CFKVClient is used by the per-Continuum-CR
// goroutine for all its Renew calls; the Worker is the
// concurrency-arbitration point. All fields are set once by New and
// never mutated afterwards.
type CFKVClient struct {
	// BaseURL is the Worker root, e.g.
	// "https://lease.openova.workers.dev". Trailing slash stripped
	// by New so path concatenation is unambiguous.
	BaseURL string
	// APIToken is the bearer token resolved from a K8s Secret. NEVER
	// log this value.
	APIToken string
	// Slot is the per-CR identifier (`<namespace>/<name>`). URL-encoded
	// at request time so slashes don't confuse the Worker's routing.
	Slot string
	// HTTPClient is the underlying transport. Tests inject a
	// httptest.Server-backed client; New defaults it when nil.
	HTTPClient *http.Client
}
// New constructs a CFKVClient. baseURL + apiToken + slot are required
// (whitespace-only values count as missing).
//
// httpClient is optional — when nil, a 10-second-timeout client is
// used. The 10s ceiling matches the lease renew interval (per the
// Continuum CRD default 10s) so a stuck request can't outlive a renew
// cycle and double-acquire.
func New(baseURL, apiToken, slot string, httpClient *http.Client) (*CFKVClient, error) {
	base := strings.TrimSpace(baseURL)
	token := strings.TrimSpace(apiToken)
	leaseSlot := strings.TrimSpace(slot)
	switch {
	case base == "":
		return nil, errors.New("cloudflarekv: BaseURL is required")
	case token == "":
		return nil, errors.New("cloudflarekv: APIToken is required")
	case leaseSlot == "":
		return nil, errors.New("cloudflarekv: Slot is required")
	}
	hc := httpClient
	if hc == nil {
		hc = &http.Client{Timeout: 10 * time.Second}
	}
	return &CFKVClient{
		BaseURL:    strings.TrimRight(base, "/"),
		APIToken:   token,
		Slot:       leaseSlot,
		HTTPClient: hc,
	}, nil
}
// factory is the witness.Factory entry registered at init() time. Per
// the K-Cont-3 brief cfg may carry:
//
//	slot           (string) REQUIRED — `<namespace>/<name>`
//	baseURL        (string) REQUIRED — Worker URL ("workerURL" accepted
//	                                   as an alias)
//	tokenSecretRef (map)    {name, key} — K8s Secret holding the
//	                                   Worker bearer token
//	apiToken       (string) ALTERNATE to tokenSecretRef — direct
//	                                   token for tests; NOT used
//	                                   in production paths.
func factory(cfg map[string]any, secrets witness.SecretReader) (witness.Client, error) {
	slot, _ := cfg["slot"].(string)
	baseURL, _ := cfg["baseURL"].(string)
	if baseURL == "" {
		// Tolerate the alternate spelling per the CRD shape — the CRD
		// has accountId + kvNamespaceId + tokenSecretRef and expects
		// the Worker URL to be derived. K-Cont-3 takes a straight
		// baseURL key; if the CR uses the CRD shape, the reconciler is
		// responsible for translating to baseURL before calling
		// Select. Fall back to "workerURL" for flexibility.
		if alias, ok := cfg["workerURL"].(string); ok {
			baseURL = alias
		}
	}

	// Token resolution path A: direct (tests only). When present, skip
	// the SecretRef machinery entirely.
	if direct, _ := cfg["apiToken"].(string); direct != "" {
		return New(baseURL, direct, slot, nil)
	}

	// Path B: SecretRef — the production path.
	ref, _ := cfg["tokenSecretRef"].(map[string]any)
	if ref == nil {
		// If neither token form is present, surface a clear error.
		return nil, errors.New("cloudflarekv: cfg must include tokenSecretRef{name,key} or apiToken")
	}
	name, _ := ref["name"].(string)
	key, _ := ref["key"].(string)
	if key == "" {
		key = "token"
	}
	if name == "" {
		return nil, errors.New("cloudflarekv: tokenSecretRef.name is empty")
	}
	if secrets == nil {
		return nil, errors.New("cloudflarekv: SecretReader not configured (controller must wire DefaultSelector.SecretReader)")
	}
	// 5s budget for the secret read — controller startup must not
	// block forever on a missing Secret.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	raw, err := secrets.ReadSecret(ctx, name, key)
	if err != nil {
		return nil, fmt.Errorf("cloudflarekv: read tokenSecretRef %s/%s: %w", name, key, err)
	}
	apiToken := strings.TrimSpace(string(raw))
	if apiToken == "" {
		return nil, fmt.Errorf("cloudflarekv: tokenSecretRef %s/%s is empty", name, key)
	}
	return New(baseURL, apiToken, slot, nil)
}
// kvState mirrors the Worker's response body. Field names are JSON
// camelCase per the Worker contract documented in the package comment.
// Timestamps travel as RFC3339 strings and are parsed into time.Time
// by parseState.
type kvState struct {
	Holder     string `json:"holder"`
	AcquiredAt string `json:"acquiredAt"` // RFC3339 — server-stamped
	ExpiresAt  string `json:"expiresAt"`  // RFC3339 — server-stamped
	Generation int64  `json:"generation"`
}
// kvWriteRequest is the PUT body for both acquire and renew. The
// CAS generation travels in the If-Match header, not in this body.
type kvWriteRequest struct {
	Holder     string `json:"holder"`
	TTLSeconds int    `json:"ttlSeconds"`
	Op         string `json:"op"` // "acquire" | "renew"
}
// Acquire implements witness.Client.
//
// It reads the slot first to learn the CAS baseline, then PUTs with
// op=acquire and If-Match set to that generation.
func (c *CFKVClient) Acquire(ctx context.Context, holder string, ttl time.Duration) (witness.State, error) {
	if err := ctx.Err(); err != nil {
		return witness.State{}, err
	}
	// Fresh slots read as Generation=0; the Worker accepts
	// If-Match: 0 for the first acquire on an empty slot.
	current, err := c.Read(ctx)
	if err != nil {
		return witness.State{}, err
	}
	return c.write(ctx, holder, ttl, "acquire", current.Generation)
}
// Renew implements witness.Client.
//
// Renew is holder-only per the K-Cont-2 contract: if we don't
// currently hold the lease, or the held lease has expired, this
// returns ErrLeaseLost regardless of what the Worker would say. A CAS
// conflict on the PUT is likewise mapped to ErrLeaseLost — Renew
// never surfaces ErrLeaseHeldByAnother.
func (c *CFKVClient) Renew(ctx context.Context, holder string, ttl time.Duration) (witness.State, error) {
	if err := ctx.Err(); err != nil {
		return witness.State{}, err
	}
	current, err := c.Read(ctx)
	if err != nil {
		return witness.State{}, err
	}
	notHolder := current.Holder != holder
	expired := !time.Now().Before(current.ExpiresAt)
	if notHolder || expired {
		return current, witness.ErrLeaseLost
	}
	st, werr := c.write(ctx, holder, ttl, "renew", current.Generation)
	if errors.Is(werr, witness.ErrLeaseHeldByAnother) {
		// Losing the CAS on the renew path means "we lost the lease".
		return st, witness.ErrLeaseLost
	}
	return st, werr
}
// Release implements witness.Client.
//
// Release is idempotent per the K-Cont-2 contract: releasing a slot
// we don't hold (including an empty slot, or losing the CAS between
// the Read and the DELETE) is a successful no-op.
func (c *CFKVClient) Release(ctx context.Context, holder string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	cur, err := c.Read(ctx)
	if err != nil {
		return err
	}
	if cur.Holder == "" || cur.Holder != holder {
		// Non-holder Release is a no-op — DO NOT round-trip to the
		// Worker.
		return nil
	}
	endpoint := c.BaseURL + "/lease/" + pathEscapeSlot(c.Slot)
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, endpoint, nil)
	if err != nil {
		return fmt.Errorf("cloudflarekv: build DELETE: %w", err)
	}
	c.applyAuth(req)
	req.Header.Set("If-Match", strconv.FormatInt(cur.Generation, 10))
	req.Header.Set("X-Holder", holder)
	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return fmt.Errorf("cloudflarekv: DELETE: %w", err)
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusNoContent, http.StatusOK:
		return nil
	case http.StatusPreconditionFailed:
		// CAS lost between our Read and our DELETE — by the idempotency
		// contract this still counts as released; the Worker may report
		// the new state but we don't care.
		return nil
	case http.StatusUnauthorized, http.StatusForbidden:
		return fmt.Errorf("cloudflarekv: auth rejected (status %d)", resp.StatusCode)
	default:
		return fmt.Errorf("cloudflarekv: DELETE status %d: %s", resp.StatusCode, readSnippet(resp.Body))
	}
}
// Read implements witness.Client.
//
// A 404 from the Worker means the slot is empty and is reported as the
// zero State (Generation=0) rather than an error, so Acquire can use
// it directly as the If-Match baseline.
func (c *CFKVClient) Read(ctx context.Context) (witness.State, error) {
	if err := ctx.Err(); err != nil {
		return witness.State{}, err
	}
	endpoint := c.BaseURL + "/lease/" + pathEscapeSlot(c.Slot)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return witness.State{}, fmt.Errorf("cloudflarekv: build GET: %w", err)
	}
	c.applyAuth(req)
	req.Header.Set("Accept", "application/json")
	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return witness.State{}, fmt.Errorf("cloudflarekv: GET: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusNotFound {
		// Empty slot — zero state with Generation=0 so a subsequent
		// Acquire knows to PUT with If-Match: 0.
		return witness.State{}, nil
	}
	if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
		return witness.State{}, fmt.Errorf("cloudflarekv: auth rejected (status %d)", resp.StatusCode)
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return witness.State{}, fmt.Errorf("cloudflarekv: GET status %d: %s", resp.StatusCode, readSnippet(resp.Body))
	}
	var payload kvState
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return witness.State{}, fmt.Errorf("cloudflarekv: decode GET: %w", err)
	}
	return parseState(payload)
}
// write performs the PUT-with-If-Match dance shared by Acquire and
// Renew. op is "acquire" or "renew"; ifMatch is the generation the
// caller observed via Read. A 412 from the Worker is surfaced as
// ErrLeaseHeldByAnother, with the Worker-reported state decoded on a
// best-effort basis for the caller's benefit.
func (c *CFKVClient) write(ctx context.Context, holder string, ttl time.Duration, op string, ifMatch int64) (witness.State, error) {
	payload, err := json.Marshal(kvWriteRequest{
		Holder:     holder,
		TTLSeconds: int(ttl / time.Second),
		Op:         op,
	})
	if err != nil {
		return witness.State{}, fmt.Errorf("cloudflarekv: marshal write: %w", err)
	}
	endpoint := c.BaseURL + "/lease/" + pathEscapeSlot(c.Slot)
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, bytes.NewReader(payload))
	if err != nil {
		return witness.State{}, fmt.Errorf("cloudflarekv: build PUT: %w", err)
	}
	c.applyAuth(req)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")
	req.Header.Set("If-Match", strconv.FormatInt(ifMatch, 10))
	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return witness.State{}, fmt.Errorf("cloudflarekv: PUT: %w", err)
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusOK, http.StatusCreated:
		var body kvState
		if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
			return witness.State{}, fmt.Errorf("cloudflarekv: decode PUT: %w", err)
		}
		return parseState(body)
	case http.StatusPreconditionFailed:
		// CAS lost — the Worker may include the current state; decode
		// best-effort but ALWAYS surface ErrLeaseHeldByAnother.
		var body kvState
		_ = json.NewDecoder(resp.Body).Decode(&body)
		st, _ := parseState(body)
		return st, witness.ErrLeaseHeldByAnother
	case http.StatusUnauthorized, http.StatusForbidden:
		return witness.State{}, fmt.Errorf("cloudflarekv: auth rejected (status %d)", resp.StatusCode)
	default:
		return witness.State{}, fmt.Errorf("cloudflarekv: PUT status %d: %s", resp.StatusCode, readSnippet(resp.Body))
	}
}
// applyAuth stamps the bearer token on req. Centralised so no request
// path can forget it.
func (c *CFKVClient) applyAuth(req *http.Request) {
	h := req.Header
	h.Set("Authorization", "Bearer "+c.APIToken)
	// Belt-and-suspenders for Worker logging: the slot also rides in a
	// header so the Worker can log it without parsing the URL.
	h.Set("X-Lease-Slot", c.Slot)
}
// parseState decodes the wire shape into a witness.State. Empty
// timestamp strings map to the zero time.Time; malformed timestamps
// are an error (Holder/Generation are still populated on the returned
// value in that case).
func parseState(k kvState) (witness.State, error) {
	out := witness.State{
		Holder:     k.Holder,
		Generation: k.Generation,
	}
	parse := func(field, value string) (time.Time, error) {
		if value == "" {
			return time.Time{}, nil
		}
		t, err := time.Parse(time.RFC3339, value)
		if err != nil {
			return time.Time{}, fmt.Errorf("cloudflarekv: parse %s %q: %w", field, value, err)
		}
		return t, nil
	}
	var err error
	if out.AcquiredAt, err = parse("acquiredAt", k.AcquiredAt); err != nil {
		return out, err
	}
	if out.ExpiresAt, err = parse("expiresAt", k.ExpiresAt); err != nil {
		return out, err
	}
	return out, nil
}
// pathEscapeSlot URL-encodes the slot for safe inclusion as a single
// path segment of the Worker URL. The Worker treats the slot as an
// opaque key, so `/` must arrive as `%2F` (the canonical encoding)
// rather than as a path separator.
//
// url.PathEscape is used instead of the previous hand-rolled
// `/`→`%2F` substitution: it additionally percent-encodes `%`,
// whitespace, and other reserved characters, so a malformed slot
// cannot smuggle extra path structure into the request. For the
// normal DNS-safe `<namespace>/<name>` slot shape the output is
// byte-identical to the old substitution.
func pathEscapeSlot(slot string) string {
	return url.PathEscape(slot)
}
// readSnippet returns at most 256 chars of the body for an error
// message. Never logs.
func readSnippet(r io.Reader) string {
b, _ := io.ReadAll(io.LimitReader(r, 512))
s := string(b)
if len(s) > 256 {
s = s[:256] + "..."
}
return s
}
// Compile-time assertion that CFKVClient satisfies witness.Client.
var _ witness.Client = (*CFKVClient)(nil)

View File

@ -0,0 +1,394 @@
// Tests for CFKVClient.
//
// Strategy: stand up an httptest.Server that mocks the K-Cont-4
// Cloudflare Worker — its JSON wire shape is documented in
// client.go's package doc and is THE contract K-Cont-4 must satisfy.
// The mock implements the same atomic-CAS semantics as the in-memory
// witness reference. The shared parametric contract suite from
// `internal/witness/testing` then runs against CFKVClient — a
// behavioral diff between the in-memory reference and CF surfaces
// here.
package cloudflarekv
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/openova-io/openova/core/controllers/continuum/internal/witness"
contracttest "github.com/openova-io/openova/core/controllers/continuum/internal/witness/testing"
)
// fakeWorker mimics the K-Cont-4 Cloudflare Worker over httptest.
// One fakeWorker holds N slot states keyed by slot string; multiple
// CFKVClient instances bound to the same slot contend through it.
type fakeWorker struct {
	mu    sync.Mutex           // guards slots; serializes CAS decisions
	slots map[string]*kvState  // per-slot lease state, keyed by decoded slot path
	now   func() time.Time     // injected clock (fakeClock.Now in tests)
	bearer string              // expected bearer token; mismatches → 401
}
// newFakeWorker builds an empty fakeWorker that rejects any request
// not carrying the given bearer token, timestamped by the injected
// clock.
func newFakeWorker(bearer string, now func() time.Time) *fakeWorker {
	w := &fakeWorker{now: now, bearer: bearer}
	w.slots = make(map[string]*kvState)
	return w
}
// handler exposes the fake Worker as an http.Handler rooted at
// /lease/, matching the real Worker's routing.
func (w *fakeWorker) handler() http.Handler {
	mux := http.NewServeMux()
	mux.Handle("/lease/", http.HandlerFunc(w.serveLease))
	return mux
}
// serveLease implements the Worker's /lease/<slot> contract documented
// in client.go: GET reads, PUT is the CAS write (acquire/renew), and
// DELETE releases. All state transitions happen under w.mu, which is
// the fake's concurrency-arbitration point.
func (w *fakeWorker) serveLease(rw http.ResponseWriter, req *http.Request) {
	// Auth check — every request must carry the bearer token.
	if got := req.Header.Get("Authorization"); got != "Bearer "+w.bearer {
		http.Error(rw, "unauthorized", http.StatusUnauthorized)
		return
	}
	slot := strings.TrimPrefix(req.URL.Path, "/lease/")
	if slot == "" {
		http.Error(rw, "missing slot", http.StatusBadRequest)
		return
	}
	// CFKVClient encodes `/` as `%2F`; net/http decodes it back to
	// `/` automatically before serving.
	w.mu.Lock()
	defer w.mu.Unlock()
	now := w.now()
	switch req.Method {
	case http.MethodGet:
		st, ok := w.slots[slot]
		if !ok {
			http.NotFound(rw, req)
			return
		}
		// The stored record is returned verbatim, even when its
		// ExpiresAt is already in the past — expiry is enforced
		// client-side (CFKVClient.Renew) and on the PUT path below.
		// NOTE(review): an earlier comment here claimed the holder
		// field is cleared on expiry, but no such eviction happens;
		// confirm the real Worker also returns expired state as-is.
		writeJSON(rw, http.StatusOK, st)
		return
	case http.MethodPut:
		var body kvWriteRequest
		if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
			http.Error(rw, err.Error(), http.StatusBadRequest)
			return
		}
		ifMatch, _ := strconv.ParseInt(req.Header.Get("If-Match"), 10, 64)
		cur := w.slots[slot]
		curGen := int64(0)
		if cur != nil {
			curGen = cur.Generation
		}
		if ifMatch != curGen {
			// CAS conflict: caller's generation is stale.
			rw.Header().Set("Content-Type", "application/json")
			rw.WriteHeader(http.StatusPreconditionFailed)
			if cur != nil {
				_ = json.NewEncoder(rw).Encode(cur)
			}
			return
		}
		// Decide whether the slot is takeable. Takeable when:
		//   - slot is empty (cur==nil)
		//   - slot is expired
		//   - holder matches (re-acquire / renew)
		var (
			holder   = body.Holder
			ttl      = time.Duration(body.TTLSeconds) * time.Second
			takeable = cur == nil
			sameHold = cur != nil && cur.Holder == holder
			expired  = cur != nil && !beforeRFC3339(now, cur.ExpiresAt)
		)
		if !takeable && !sameHold && !expired {
			// Held by another, non-expired holder.
			rw.Header().Set("Content-Type", "application/json")
			rw.WriteHeader(http.StatusPreconditionFailed)
			_ = json.NewEncoder(rw).Encode(cur)
			return
		}
		// Renew op MUST require sameHold + non-expired (matches the
		// K-Cont-2 contract). The CFKVClient pre-checks this client-
		// side, so a stray renew arriving here is malformed; reject.
		if body.Op == "renew" && !(sameHold && !expired) {
			rw.Header().Set("Content-Type", "application/json")
			rw.WriteHeader(http.StatusPreconditionFailed)
			if cur != nil {
				_ = json.NewEncoder(rw).Encode(cur)
			}
			return
		}
		var (
			acquiredAt time.Time
		)
		if sameHold && !expired {
			// Preserve the original acquisition time on
			// re-acquire/renew.
			acquiredAt, _ = time.Parse(time.RFC3339, cur.AcquiredAt)
		} else {
			acquiredAt = now
		}
		next := &kvState{
			Holder:     holder,
			AcquiredAt: acquiredAt.Format(time.RFC3339),
			ExpiresAt:  now.Add(ttl).Format(time.RFC3339),
			Generation: curGen + 1,
		}
		w.slots[slot] = next
		writeJSON(rw, http.StatusOK, next)
		return
	case http.MethodDelete:
		ifMatch, _ := strconv.ParseInt(req.Header.Get("If-Match"), 10, 64)
		holder := req.Header.Get("X-Holder")
		cur := w.slots[slot]
		if cur == nil {
			// Releasing an empty slot is an idempotent success.
			rw.WriteHeader(http.StatusNoContent)
			return
		}
		if cur.Generation != ifMatch || cur.Holder != holder {
			// Lost CAS or not the holder → 412 per the contract.
			rw.WriteHeader(http.StatusPreconditionFailed)
			return
		}
		// Bump generation on release so a subsequent Acquire sees a
		// non-zero If-Match-required generation. This matches the
		// in-memory reference's Generation+1 on Release.
		w.slots[slot] = &kvState{Generation: cur.Generation + 1}
		rw.WriteHeader(http.StatusNoContent)
		return
	default:
		http.Error(rw, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
}
func writeJSON(rw http.ResponseWriter, status int, v any) {
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(status)
_ = json.NewEncoder(rw).Encode(v)
}
func beforeRFC3339(now time.Time, expiresAt string) bool {
t, err := time.Parse(time.RFC3339, expiresAt)
if err != nil {
return false
}
return now.Before(t)
}
// fakeClock is a thread-safe clock the tests advance.
type fakeClock struct {
mu sync.Mutex
t time.Time
}
func newFakeClock() *fakeClock {
return &fakeClock{t: time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC)}
}
func (c *fakeClock) Now() time.Time {
c.mu.Lock()
defer c.mu.Unlock()
return c.t
}
func (c *fakeClock) Advance(d time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.t = c.t.Add(d)
}
// TestCFKV_ContractSuite runs the parametric witness contract against
// CFKVClient over a fake Worker. Behavioral drift between the
// in-memory reference and CF surfaces here.
//
// Each factory invocation builds an isolated world: a fresh clock, a
// fresh fakeWorker, and a fresh httptest server. A and B share a slot
// (so they contend through the Worker's CAS), Other is on a separate
// slot (slot-isolation sub-tests), and Advance drives the fake clock
// for TTL-eviction sub-tests. Cleanup tears down the test server.
func TestCFKV_ContractSuite(t *testing.T) {
	t.Parallel()
	contracttest.RunContractSuite(t, func() *contracttest.Backend {
		clk := newFakeClock()
		w := newFakeWorker("test-token", clk.Now)
		srv := httptest.NewServer(w.handler())
		mkClient := func(slot string) witness.Client {
			c, err := New(srv.URL, "test-token", slot, srv.Client())
			if err != nil {
				t.Fatalf("New: %v", err)
			}
			return c
		}
		return &contracttest.Backend{
			A:       mkClient("ns/cr-main"),
			B:       mkClient("ns/cr-main"),
			Other:   mkClient("ns/cr-other"),
			Advance: clk.Advance,
			Cleanup: srv.Close,
		}
	})
}
// TestCFKV_ConstructorValidation — happy + sad path on New(): each of
// the three required arguments is rejected when missing, and the
// fully-specified case yields a non-nil client.
func TestCFKV_ConstructorValidation(t *testing.T) {
	t.Parallel()
	cases := []struct {
		name, base, token, slot string
		wantErr                 bool
	}{
		{"missing baseURL", "", "tok", "ns/cr", true},
		{"missing token", "http://x", "", "ns/cr", true},
		{"missing slot", "http://x", "tok", "", true},
		{"all set", "http://x", "tok", "ns/cr", false},
	}
	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			got, err := New(tc.base, tc.token, tc.slot, nil)
			switch {
			case tc.wantErr && err == nil:
				t.Fatalf("expected error")
			case !tc.wantErr && err != nil:
				t.Fatalf("unexpected error: %v", err)
			case !tc.wantErr && got == nil:
				t.Fatalf("nil client")
			}
		})
	}
}
// TestCFKV_AuthHeaderRequired — the Worker rejects a wrong bearer
// token and the client surfaces the 401 as a clear "auth rejected"
// error.
func TestCFKV_AuthHeaderRequired(t *testing.T) {
	t.Parallel()
	clk := newFakeClock()
	srv := httptest.NewServer(newFakeWorker("real-token", clk.Now).handler())
	defer srv.Close()
	c, err := New(srv.URL, "wrong-token", "ns/cr", srv.Client())
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	_, acquireErr := c.Acquire(context.Background(), "fsn", 30*time.Second)
	if acquireErr == nil {
		t.Fatalf("expected auth-rejected error")
	}
	if !strings.Contains(acquireErr.Error(), "auth rejected") {
		t.Fatalf("expected auth rejected, got: %v", acquireErr)
	}
}
// TestCFKV_FactoryFromCfg — the registered factory parses cfg
// correctly and resolves SecretRef tokens: the stub SecretReader only
// answers for {cf-token, token}, so a successful Acquire against the
// fake Worker proves the factory went through the SecretRef path and
// obtained the exact bearer the Worker expects.
func TestCFKV_FactoryFromCfg(t *testing.T) {
	t.Parallel()
	clk := newFakeClock()
	w := newFakeWorker("from-secret-token", clk.Now)
	srv := httptest.NewServer(w.handler())
	defer srv.Close()
	secrets := witness.SecretReaderFunc(func(_ context.Context, name, key string) ([]byte, error) {
		if name != "cf-token" || key != "token" {
			return nil, errors.New("not found")
		}
		return []byte("from-secret-token"), nil
	})
	cfg := map[string]any{
		"slot":    "ns/cr",
		"baseURL": srv.URL,
		"tokenSecretRef": map[string]any{
			"name": "cf-token",
			"key":  "token",
		},
	}
	cli, err := factory(cfg, secrets)
	if err != nil {
		t.Fatalf("factory: %v", err)
	}
	// The factory builds its own default HTTP client; swap in the test
	// server's client so the request goes to the httptest listener.
	cli.(*CFKVClient).HTTPClient = srv.Client()
	st, err := cli.Acquire(context.Background(), "fsn", 30*time.Second)
	if err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	if st.Holder != "fsn" {
		t.Fatalf("Holder = %q want fsn", st.Holder)
	}
}
// TestCFKV_FactoryRejectsMissingFields — every under-specified cfg
// shape (and a SecretRef without a wired SecretReader) is rejected by
// the factory with an error.
func TestCFKV_FactoryRejectsMissingFields(t *testing.T) {
	t.Parallel()
	cases := map[string]map[string]any{
		"empty":           {},
		"missing slot":    {"baseURL": "http://x", "apiToken": "t"},
		"missing baseURL": {"slot": "x", "apiToken": "t"},
		"missing token":   {"slot": "x", "baseURL": "http://x"},
		"secretRef without reader": {
			"slot":           "x",
			"baseURL":        "http://x",
			"tokenSecretRef": map[string]any{"name": "n", "key": "k"},
		},
	}
	for name, cfg := range cases {
		name, cfg := name, cfg
		t.Run(name, func(t *testing.T) {
			if _, err := factory(cfg, nil); err == nil {
				t.Fatalf("expected error")
			}
		})
	}
}
// TestCFKV_GenerationBumpedOnRelease — the Worker's contract bumps
// Generation on Release so a subsequent Acquire's If-Match has a
// well-defined non-zero baseline. This belongs in CF-specific tests
// because the in-memory reference also does this; we verify CF
// matches.
func TestCFKV_GenerationBumpedOnRelease(t *testing.T) {
	t.Parallel()
	clk := newFakeClock()
	srv := httptest.NewServer(newFakeWorker("t", clk.Now).handler())
	defer srv.Close()
	c, _ := New(srv.URL, "t", "ns/cr", srv.Client())
	ctx := context.Background()
	before, err := c.Acquire(ctx, "fsn", 30*time.Second)
	if err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	if err := c.Release(ctx, "fsn"); err != nil {
		t.Fatalf("Release: %v", err)
	}
	after, err := c.Read(ctx)
	if err != nil {
		t.Fatalf("Read: %v", err)
	}
	if after.Generation <= before.Generation {
		t.Fatalf("Generation did not bump on Release: %d -> %d", before.Generation, after.Generation)
	}
	if after.Holder != "" {
		t.Fatalf("Holder = %q want empty after Release", after.Holder)
	}
}

View File

@ -0,0 +1,618 @@
// Package dnsquorum implements witness.Client across N (default 3)
// DNS authoritative servers that we have TXT-write access to. Quorum
// = floor(N/2) + 1, i.e. 2-of-3 for the default fleet.
//
// Wire shape:
//
// Slot key: <slot>.<domain> e.g. "ns_cr.lease.openova.io"
// ('/' in slot becomes '_' to keep DNS-label-safe)
// TXT record value: "<holder>|<acquired-at-rfc3339>|<expires-at-rfc3339>|<generation>"
// (empty/no record = "free")
//
// The CAS protocol (per K-Cont-3 brief item 2):
//
// - Acquire: read all N servers in parallel; compute quorum view of
// current generation; PUT new value with generation+1 on all N;
// succeed iff ≥ quorum acks. Quorum split / generation skew on
// read → defensively return ErrLeaseHeldByAnother.
//
// - Renew: same as Acquire but the generation MUST bump and the
// holder MUST already match — otherwise ErrLeaseLost.
//
// - Release: PUT empty value (cleared) on all N; idempotent.
//
// - Read: parallel TXT lookup; majority wins; tie → empty.
//
// For Phase-1 POC, the WRITE side uses the PowerDNS REST API
// directly (PATCH /api/v1/servers/localhost/zones/<zone>) on each of
// the N servers. The TXT-write endpoint is an injected TXTWriter so
// tests don't need a real PowerDNS. In production the same writer
// hits the existing PDM `/v1/txt` endpoint (planned addition to PDM
// — K-Cont-3 ships against the contract, K-Cont-{4|5} adds the PDM
// endpoint).
//
// Why std-lib net.Resolver vs github.com/miekg/dns?
// - Zero new dependency in core/controllers/go.mod (per
// INVIOLABLE-PRINCIPLES quality bar — fewer transitive deps =
// fewer attack surfaces + smaller binary).
// - net.Resolver supports DialContext to target a specific
// resolver IP, which is exactly what we need for per-server
// TXT lookups.
// - We never need DNSSEC validation here (the witness data is
// non-sensitive — just lease state); a recursive resolver shape
// suffices.
package dnsquorum
import (
"context"
"errors"
"fmt"
"net"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/openova-io/openova/core/controllers/continuum/internal/witness"
)
// init registers the DNS-quorum factory under the "dns-quorum"
// backend name so the witness Selector registry can construct this
// client from CR configuration at dispatch time.
func init() {
	witness.Register("dns-quorum", factory)
}
// TXTWriter writes a TXT record at <slot>.<domain> on a single
// authoritative server. value="" means "delete the record".
//
// Implementations:
//   - HTTPTXTWriter: PowerDNS REST API (production)
//   - in-memory map: tests
type TXTWriter interface {
	// WriteTXT writes (or deletes when value=="") the TXT record.
	// MUST be idempotent — repeated identical calls are fine.
	// Returns nil on ack.
	// NOTE(review): ttl is passed straight from the lease TTL by
	// writeQuorum; lease expiry itself travels inside value, so ttl
	// presumably maps to the DNS record TTL — confirm against the
	// production writer.
	WriteTXT(ctx context.Context, server, fqdn, value string, ttl time.Duration) error
}
// TXTReader reads TXT records at fqdn from a single authoritative
// server. Returns the record values (typically a 1-element slice for
// a TXT we wrote). Empty slice = no record, which the quorum logic
// interprets as a "free" slot.
type TXTReader interface {
	ReadTXT(ctx context.Context, server, fqdn string) ([]string, error)
}
// DNSQuorumClient implements witness.Client across N DNS servers.
//
// Concurrent-safe — every write fans out to all servers in parallel;
// the struct itself holds no mutable state after construction.
type DNSQuorumClient struct {
	// Servers is the authoritative-server list. Each entry is the
	// server's reachable address (e.g. "ns1.openova.io:53" for DNS
	// reads; the corresponding PowerDNS API endpoint is derived by
	// the TXTWriter).
	Servers []string
	// TSIGKey is the shared key the production HTTPTXTWriter uses
	// to sign PUT requests. Per Inviolable Principle #5 the value
	// is in memory only — NEVER log.
	TSIGKey string
	// Domain is the DNS suffix for lease records (e.g.
	// "lease.openova.io"). The full FQDN is `<encodedSlot>.<domain>`.
	// New() strips any trailing dot.
	Domain string
	// Slot is the per-CR identifier (`<namespace>/<name>`); '/' is
	// replaced with '_' to keep DNS-label-safe.
	Slot string
	// Writer + Reader are the per-server TXT I/O backends. Reader
	// defaults to the std-lib implementation in New() when nil;
	// Writer may stay nil for a read-only client (write methods
	// then error — see writeQuorum).
	Writer TXTWriter
	Reader TXTReader
	// QuorumOverride lets tests force a specific quorum threshold
	// (production = floor(N/2)+1, see quorum()). 0 = use the
	// canonical formula.
	QuorumOverride int
}
// New constructs a DNSQuorumClient. servers (N ≥ 3), domain, and slot
// are required. reader defaults to the std-lib TXTReader when nil.
// writer may be nil for a read-only client — write methods then
// return a clear "Writer not configured" error (see writeQuorum);
// production wiring injects the PowerDNS-API writer separately.
func New(servers []string, tsigKey, domain, slot string, writer TXTWriter, reader TXTReader) (*DNSQuorumClient, error) {
	if len(servers) < 3 {
		return nil, fmt.Errorf("dnsquorum: at least 3 servers required, got %d", len(servers))
	}
	if strings.TrimSpace(domain) == "" {
		return nil, errors.New("dnsquorum: Domain is required")
	}
	if strings.TrimSpace(slot) == "" {
		return nil, errors.New("dnsquorum: Slot is required")
	}
	c := &DNSQuorumClient{
		Servers: servers,
		TSIGKey: strings.TrimSpace(tsigKey),
		// Trailing dot stripped so fqdn() concatenation stays canonical.
		Domain: strings.TrimRight(strings.TrimSpace(domain), "."),
		Slot:   strings.TrimSpace(slot),
		Writer: writer,
		Reader: reader,
	}
	if c.Reader == nil {
		c.Reader = NewStdLibTXTReader(2 * time.Second)
	}
	// A nil Writer is intentionally left nil (no default): the
	// read-path-only case needs no writer, and write methods surface
	// a clear error if invoked without one. (The former empty
	// `if c.Writer == nil {}` branch tripped staticcheck SA9003.)
	return c, nil
}
// factory is the witness.Factory entry registered at init() time.
//
// cfg keys:
//
//	slot          (string)          REQUIRED — `<ns>/<name>` (validated by New)
//	dnsServers    ([]string|[]any)  REQUIRED — N≥3 servers; "resolvers" accepted as fallback
//	domain        (string)          optional — DNS suffix; defaults to "lease.openova.io"
//	tsigSecretRef (map)             {name, key} — TSIG key Secret (key defaults to "tsig")
//	tsigKey       (string)          ALTERNATE — direct (tests)
//
// For Phase-1 POC the same factory is used in production; if the CR
// supplies a CRD-shaped `resolvers` field instead of `dnsServers`,
// the reconciler is expected to translate before calling Select.
func factory(cfg map[string]any, secrets witness.SecretReader) (witness.Client, error) {
	slot, _ := cfg["slot"].(string)
	domain, _ := cfg["domain"].(string)
	if domain == "" {
		domain = "lease.openova.io" // brief's recommended default
	}
	servers, err := stringSlice(cfg["dnsServers"])
	if err != nil {
		return nil, fmt.Errorf("dnsquorum: dnsServers: %w", err)
	}
	if len(servers) == 0 {
		// CRD shape uses "resolvers" — try that field too for
		// forward-compat with the K-Cont-2 CRD.
		servers, err = stringSlice(cfg["resolvers"])
		if err != nil {
			return nil, fmt.Errorf("dnsquorum: resolvers: %w", err)
		}
	}
	// TSIG key: a directly supplied tsigKey wins; otherwise resolve
	// tsigSecretRef through the injected SecretReader.
	tsigKey, _ := cfg["tsigKey"].(string)
	if tsigKey == "" {
		ref, _ := cfg["tsigSecretRef"].(map[string]any)
		if ref != nil {
			name, _ := ref["name"].(string)
			key, _ := ref["key"].(string)
			if key == "" {
				key = "tsig"
			}
			if name == "" {
				return nil, errors.New("dnsquorum: tsigSecretRef.name is empty")
			}
			if secrets == nil {
				return nil, errors.New("dnsquorum: SecretReader not configured (controller must wire DefaultSelector.SecretReader)")
			}
			// Bounded lookup — factory runs during Select, not in a
			// request path with a caller-supplied context.
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			b, sErr := secrets.ReadSecret(ctx, name, key)
			if sErr != nil {
				return nil, fmt.Errorf("dnsquorum: read tsigSecretRef %s/%s: %w", name, key, sErr)
			}
			tsigKey = strings.TrimSpace(string(b))
		}
	}
	// Writer is nil here — production wiring injects a real
	// PowerDNS-API writer separately. For now the client errors on
	// write methods if invoked — this matches the Phase-1 POC
	// strategy in the brief (CF-KV is operational; DNS-quorum
	// requires PDM /v1/txt endpoint which is K-Cont-{4|5}).
	return New(servers, tsigKey, domain, slot, nil, nil)
}
// stringSlice normalizes a cfg value into []string. Accepted inputs:
// nil (→ nil slice, no error), []string (passed through), and []any
// whose elements are all strings. Anything else is an error.
func stringSlice(v any) ([]string, error) {
	if v == nil {
		return nil, nil
	}
	if ss, ok := v.([]string); ok {
		return ss, nil
	}
	raw, ok := v.([]any)
	if !ok {
		return nil, fmt.Errorf("want []string, got %T", v)
	}
	converted := make([]string, 0, len(raw))
	for i, elem := range raw {
		s, ok := elem.(string)
		if !ok {
			return nil, fmt.Errorf("element %d is %T, want string", i, elem)
		}
		converted = append(converted, s)
	}
	return converted, nil
}
// fqdn returns `<encodedSlot>.<Domain>` — the DNS name under which
// this slot's lease TXT record lives. Domain had any trailing dot
// stripped by New, so the result carries no root dot.
func (c *DNSQuorumClient) fqdn() string {
	return encodeSlot(c.Slot) + "." + c.Domain
}
// encodeSlot maps a Continuum slot (`<ns>/<name>`) to a DNS-label-safe
// form by substituting '_' for every '/'. '/' is the canonical slot
// separator; '_' is permitted in DNS labels per RFC 2181 §11 (label
// characters are arbitrary octets) and PowerDNS serves it fine.
func encodeSlot(slot string) string {
	var b strings.Builder
	b.Grow(len(slot))
	for _, r := range slot {
		if r == '/' {
			b.WriteByte('_')
		} else {
			b.WriteRune(r)
		}
	}
	return b.String()
}
// quorum returns the agreement threshold used by both the read and
// write paths: a positive test-supplied QuorumOverride wins,
// otherwise the canonical majority floor(N/2)+1 over the configured
// server count.
func (c *DNSQuorumClient) quorum() int {
	if override := c.QuorumOverride; override > 0 {
		return override
	}
	return 1 + len(c.Servers)/2
}
// Acquire implements witness.Client. It takes the quorum view of the
// slot and, when the slot is free, expired, or already ours, writes a
// fresh lease with a bumped generation. A failed quorum read is
// treated defensively as "held by another".
func (c *DNSQuorumClient) Acquire(ctx context.Context, holder string, ttl time.Duration) (witness.State, error) {
	if err := ctx.Err(); err != nil {
		return witness.State{}, err
	}
	view, quorumOK := c.readQuorum(ctx)
	if !quorumOK {
		// No majority agreement across servers — refuse the lease.
		return view, witness.ErrLeaseHeldByAnother
	}
	// Second precision matches the RFC3339 wire granularity, so the
	// value we return equals what a subsequent Read() would decode.
	now := time.Now().UTC().Truncate(time.Second)
	// Reject only when a DIFFERENT holder owns a non-expired lease.
	heldByOther := view.Holder != "" && view.Holder != holder && now.Before(view.ExpiresAt)
	if heldByOther {
		return view, witness.ErrLeaseHeldByAnother
	}
	// Same-holder re-acquire keeps the original acquisition time.
	acquired := now
	if view.Holder == holder && now.Before(view.ExpiresAt) && !view.AcquiredAt.IsZero() {
		acquired = view.AcquiredAt
	}
	next := witness.State{
		Holder:     holder,
		AcquiredAt: acquired,
		ExpiresAt:  now.Add(ttl),
		Generation: view.Generation + 1,
	}
	if err := c.writeQuorum(ctx, encodeValue(next), ttl); err != nil {
		return witness.State{}, err
	}
	return next, nil
}
// Renew implements witness.Client. The caller must still hold a
// non-expired lease under the quorum view; a holder mismatch, expiry,
// or failed quorum read is ErrLeaseLost. Success extends the TTL and
// bumps the generation.
func (c *DNSQuorumClient) Renew(ctx context.Context, holder string, ttl time.Duration) (witness.State, error) {
	if err := ctx.Err(); err != nil {
		return witness.State{}, err
	}
	view, quorumOK := c.readQuorum(ctx)
	if !quorumOK {
		return view, witness.ErrLeaseLost
	}
	now := time.Now().UTC().Truncate(time.Second)
	switch {
	case view.Holder != holder:
		return view, witness.ErrLeaseLost
	case !now.Before(view.ExpiresAt):
		return view, witness.ErrLeaseLost
	}
	renewed := witness.State{
		Holder:     holder,
		AcquiredAt: view.AcquiredAt,
		ExpiresAt:  now.Add(ttl),
		Generation: view.Generation + 1,
	}
	if err := c.writeQuorum(ctx, encodeValue(renewed), ttl); err != nil {
		return witness.State{}, err
	}
	return renewed, nil
}
// Release implements witness.Client. Idempotent: releasing a slot we
// don't hold — or one whose quorum view is unreadable — is a silent
// no-op. A real release clears the holder while bumping the
// generation, so the next Acquire starts from a non-zero baseline
// (matches the in-memory + CF reference).
func (c *DNSQuorumClient) Release(ctx context.Context, holder string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	view, quorumOK := c.readQuorum(ctx)
	if !quorumOK {
		// Can't establish the current holder — stay idempotent and
		// skip the write rather than surfacing an error.
		return nil
	}
	if view.Holder == "" || view.Holder != holder {
		return nil
	}
	cleared := witness.State{Generation: view.Generation + 1}
	return c.writeQuorum(ctx, encodeValue(cleared), 30*time.Second)
}
// Read implements witness.Client. Returns the majority view of the
// slot; when no quorum agrees, the zero State (free slot) is
// reported without an error.
func (c *DNSQuorumClient) Read(ctx context.Context) (witness.State, error) {
	if err := ctx.Err(); err != nil {
		return witness.State{}, err
	}
	view, _ := c.readQuorum(ctx)
	return view, nil
}
// readQuorum queries all N servers in parallel and returns the
// majority State + a quorum-ok flag.
//
// Quorum rule: count distinct State values (by encoded value); if
// any single value has ≥ quorum agreement, it wins. Otherwise
// ok=false.
//
// Empty TXT (no record) is a valid value (= "free"). N readable
// "free" responses count as quorum.
func (c *DNSQuorumClient) readQuorum(ctx context.Context) (witness.State, bool) {
	type result struct {
		val string
		err error
	}
	// Indexed writes into a pre-sized slice — each goroutine owns
	// results[i] exclusively, so no mutex is needed.
	results := make([]result, len(c.Servers))
	var wg sync.WaitGroup
	for i, srv := range c.Servers {
		i, srv := i, srv
		wg.Add(1)
		go func() {
			defer wg.Done()
			vals, err := c.Reader.ReadTXT(ctx, srv, c.fqdn())
			if err != nil {
				results[i] = result{err: err}
				return
			}
			// First TXT entry wins. Empty slice → "" (= free).
			val := ""
			if len(vals) > 0 {
				val = vals[0]
			}
			results[i] = result{val: val}
		}()
	}
	wg.Wait()
	// Count distinct successful responses. Failed servers are simply
	// excluded, so they can never contribute to (or block) a tally.
	tally := map[string]int{}
	for _, r := range results {
		if r.err != nil {
			continue
		}
		tally[r.val]++
	}
	q := c.quorum()
	// Find the value with the largest tally. We need DETERMINISTIC
	// tie-breaking so a 1+1+1 split is reported as "no quorum"
	// reliably. (Map iteration order is random, so without the
	// lexicographic tie-break the selected bestVal could vary run
	// to run even when best itself is stable.)
	bestVal := ""
	best := 0
	for val, cnt := range tally {
		switch {
		case cnt > best:
			bestVal, best = val, cnt
		case cnt == best && val < bestVal:
			// Lexicographic tie-break — stable across runs.
			bestVal = val
		}
	}
	if best < q {
		return witness.State{}, false
	}
	// A malformed winning value is treated as no-quorum rather than
	// propagated — callers only see decodable States.
	st, err := decodeValue(bestVal)
	if err != nil {
		return witness.State{}, false
	}
	return st, true
}
// writeQuorum fans the value out to every server in parallel and
// returns nil iff at least quorum() servers ack. Sub-quorum partial
// writes are not rolled back here — the caller (controller) decides
// how to react (typically a follow-up Read detects the drift and the
// next Renew surfaces ErrLeaseLost). An empty value deletes the
// record (TXTWriter contract) so no stale entries linger.
func (c *DNSQuorumClient) writeQuorum(ctx context.Context, value string, ttl time.Duration) error {
	if c.Writer == nil {
		return errors.New("dnsquorum: Writer not configured (Phase-1 POC needs PDM /v1/txt — K-Cont-{4|5})")
	}
	writeErrs := make([]error, len(c.Servers))
	var wg sync.WaitGroup
	wg.Add(len(c.Servers))
	for i, server := range c.Servers {
		go func(i int, server string) {
			defer wg.Done()
			writeErrs[i] = c.Writer.WriteTXT(ctx, server, c.fqdn(), value, ttl)
		}(i, server)
	}
	wg.Wait()
	var (
		acks     int
		firstErr error
	)
	for _, werr := range writeErrs {
		if werr == nil {
			acks++
			continue
		}
		if firstErr == nil {
			firstErr = werr
		}
	}
	if need := c.quorum(); acks < need {
		if firstErr != nil {
			return fmt.Errorf("dnsquorum: write quorum %d/%d: %w", acks, need, firstErr)
		}
		return fmt.Errorf("dnsquorum: write quorum %d/%d", acks, need)
	}
	return nil
}
// encodeValue serializes a State into the TXT wire format
// `<holder>|<acquired-at>|<expires-at>|<generation>`. The zero State
// (no holder, generation 0) encodes as the empty string ("free").
//
// '|' is the field separator because RFC3339 timestamps already
// contain ':' (the time component). PowerDNS escapes characters in
// TXT records via the standard quoting rules; '|' is plain ASCII and
// survives the PATCH→DNS→LookupTXT round-trip without translation.
//
// AcquiredAt travels on the wire so a same-holder re-acquire keeps
// the original acquisition timestamp (K-Cont-2 contract item 4 —
// same-holder Acquire = lease extension, not a new acquisition).
func encodeValue(s witness.State) string {
	if s.Holder == "" && s.Generation == 0 {
		return ""
	}
	fields := []string{
		s.Holder,
		s.AcquiredAt.UTC().Format(time.RFC3339),
		s.ExpiresAt.UTC().Format(time.RFC3339),
		strconv.FormatInt(s.Generation, 10),
	}
	return strings.Join(fields, "|")
}
// decodeValue parses the TXT wire format back into a State. An empty
// (or whitespace-only) value is the zero State — a free slot.
//
// Accepted shapes, for rollout compatibility:
//   - 4 fields: <holder>|<acquired>|<expires>|<gen> (current)
//   - 3 fields: <holder>|<expires>|<gen> (legacy; AcquiredAt stays zero,
//     degrading the same-holder preservation invariant to a fresh
//     timestamp — acceptable, never wrong)
//   - bare integer: a released record carrying only a generation
//     (defensive — lets it survive a downgrade)
func decodeValue(v string) (witness.State, error) {
	trimmed := strings.TrimSpace(v)
	if trimmed == "" {
		return witness.State{}, nil
	}
	fields := strings.Split(trimmed, "|")
	switch len(fields) {
	case 4:
		return decode4(fields)
	case 3:
		return decode3(fields)
	}
	if gen, perr := strconv.ParseInt(trimmed, 10, 64); perr == nil {
		return witness.State{Generation: gen}, nil
	}
	return witness.State{}, fmt.Errorf("dnsquorum: malformed TXT %q", trimmed)
}
// decode4 parses the current 4-field wire shape
// <holder>|<acquired>|<expires>|<gen>. Empty timestamp fields are
// tolerated and leave the corresponding State field at its zero value.
func decode4(parts []string) (witness.State, error) {
	gen, err := strconv.ParseInt(parts[3], 10, 64)
	if err != nil {
		return witness.State{}, fmt.Errorf("dnsquorum: parse generation %q: %w", parts[3], err)
	}
	st := witness.State{Holder: parts[0], Generation: gen}
	if raw := parts[1]; raw != "" {
		ts, perr := time.Parse(time.RFC3339, raw)
		if perr != nil {
			return st, fmt.Errorf("dnsquorum: parse acquiredAt %q: %w", raw, perr)
		}
		st.AcquiredAt = ts
	}
	if raw := parts[2]; raw != "" {
		ts, perr := time.Parse(time.RFC3339, raw)
		if perr != nil {
			return st, fmt.Errorf("dnsquorum: parse expiresAt %q: %w", raw, perr)
		}
		st.ExpiresAt = ts
	}
	return st, nil
}
// decode3 parses the legacy 3-field shape <holder>|<expires>|<gen>
// written by pre-AcquiredAt controllers; AcquiredAt stays zero.
func decode3(parts []string) (witness.State, error) {
	gen, err := strconv.ParseInt(parts[2], 10, 64)
	if err != nil {
		return witness.State{}, fmt.Errorf("dnsquorum: parse generation %q: %w", parts[2], err)
	}
	st := witness.State{Holder: parts[0], Generation: gen}
	if raw := parts[1]; raw != "" {
		ts, perr := time.Parse(time.RFC3339, raw)
		if perr != nil {
			return st, fmt.Errorf("dnsquorum: parse expiresAt %q: %w", raw, perr)
		}
		st.ExpiresAt = ts
	}
	return st, nil
}
// SortedServers returns a lexically sorted copy of Servers — used by
// tests for deterministic output and exposed as a diagnostics helper.
// Pure function, no I/O; the receiver's slice is never mutated.
func (c *DNSQuorumClient) SortedServers() []string {
	sorted := make([]string, len(c.Servers))
	copy(sorted, c.Servers)
	sort.Strings(sorted)
	return sorted
}

// Compile-time assertion that DNSQuorumClient satisfies witness.Client.
var _ witness.Client = (*DNSQuorumClient)(nil)
// ----------------------------------------------------------------------
// std-lib TXTReader implementation — production read path

// StdLibTXTReader uses net.Resolver with DialContext to target a
// specific authoritative server. No external DNS-library dependency.
//
// Construct via NewStdLibTXTReader: with a zero Timeout, the
// per-lookup context.WithTimeout deadline is already in the past, so
// every lookup would fail immediately.
type StdLibTXTReader struct {
	// Timeout bounds both the UDP dial and the overall TXT lookup.
	Timeout time.Duration
}

// NewStdLibTXTReader returns a TXTReader using the std-lib resolver.
// Non-positive timeouts fall back to a 2-second default.
func NewStdLibTXTReader(timeout time.Duration) *StdLibTXTReader {
	if timeout <= 0 {
		timeout = 2 * time.Second
	}
	return &StdLibTXTReader{Timeout: timeout}
}

// normalizeServerAddr ensures addr carries an explicit port,
// defaulting to 53. Probing with net.SplitHostPort (rather than the
// former strings.Contains(addr, ":") check) keeps bare IPv6 literals
// such as "::1" from being misread as already having a port — they
// are correctly bracketed to "[::1]:53".
func normalizeServerAddr(server string) string {
	if _, _, err := net.SplitHostPort(server); err == nil {
		return server // already host:port
	}
	return net.JoinHostPort(server, "53")
}

// ReadTXT queries `fqdn` TXT records at the specified server.
//
// `server` accepts "host", "host:port", or a bare IPv6 literal (port
// 53 implied when absent). NXDOMAIN / NoData is the normal shape for
// a "free" slot and returns (nil, nil) rather than an error.
func (r *StdLibTXTReader) ReadTXT(ctx context.Context, server, fqdn string) ([]string, error) {
	addr := normalizeServerAddr(server)
	res := &net.Resolver{
		PreferGo: true,
		Dial: func(dctx context.Context, network, _ string) (net.Conn, error) {
			d := net.Dialer{Timeout: r.Timeout}
			// Force the connection to the specific authoritative
			// server. We always dial UDP; failover to TCP would
			// require a second round-trip we can skip for the
			// short TXT values we use.
			return d.DialContext(dctx, "udp", addr)
		},
	}
	ctx, cancel := context.WithTimeout(ctx, r.Timeout)
	defer cancel()
	vals, err := res.LookupTXT(ctx, fqdn)
	if err != nil {
		// NXDOMAIN / NoData are normal for a "free" slot — return
		// empty + nil.
		var derr *net.DNSError
		if errors.As(err, &derr) && derr.IsNotFound {
			return nil, nil
		}
		return nil, fmt.Errorf("dnsquorum: LookupTXT %s @ %s: %w", fqdn, addr, err)
	}
	return vals, nil
}

View File

@ -0,0 +1,450 @@
// Tests for DNSQuorumClient.
//
// We mock the TXT read/write side with an in-memory map keyed by
// (server, fqdn). The map enforces atomic per-server PUTs so the
// quorum logic is exercised faithfully — split-brain, partial
// failures, generation skew — without standing up real PowerDNS.
//
// The shared parametric contract suite from
// `internal/witness/testing` then runs against DNSQuorumClient — a
// behavioral diff between the in-memory reference and the quorum
// surface fails here.
package dnsquorum
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/openova-io/openova/core/controllers/continuum/internal/witness"
contracttest "github.com/openova-io/openova/core/controllers/continuum/internal/witness/testing"
)
// fakeBackend mocks N authoritative DNS servers. Each server holds
// an independent map (server → fqdn → value). Writes are atomic
// per-server. Tests can FAIL specific servers (writes return error,
// reads return error) to simulate quorum loss.
type fakeBackend struct {
	mu sync.Mutex // guards servers + failed; every method locks it
	// servers maps server name → fqdn → raw TXT value.
	servers map[string]map[string]string
	// failed marks servers whose reads AND writes error out.
	failed map[string]bool
}
// newFakeBackend builds a healthy fakeBackend with an empty record
// map for each named server and no failures.
func newFakeBackend(servers []string) *fakeBackend {
	be := &fakeBackend{
		servers: make(map[string]map[string]string, len(servers)),
		failed:  map[string]bool{},
	}
	for _, name := range servers {
		be.servers[name] = map[string]string{}
	}
	return be
}
// WriteTXT implements TXTWriter against the in-memory store. An empty
// value deletes the record; a failed server returns an error.
func (b *fakeBackend) WriteTXT(_ context.Context, server, fqdn, value string, _ time.Duration) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.failed[server] {
		return fmt.Errorf("server %s unavailable", server)
	}
	switch value {
	case "":
		delete(b.servers[server], fqdn)
	default:
		b.servers[server][fqdn] = value
	}
	return nil
}
// ReadTXT implements TXTReader against the in-memory store. Missing
// or empty records read as nil — a "free" slot.
func (b *fakeBackend) ReadTXT(_ context.Context, server, fqdn string) ([]string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.failed[server] {
		return nil, fmt.Errorf("server %s unavailable", server)
	}
	if v := b.servers[server][fqdn]; v != "" {
		return []string{v}, nil
	}
	return nil, nil
}
// fail marks server as down: subsequent writes AND reads against it
// return an error until recover() is called.
func (b *fakeBackend) fail(server string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failed[server] = true
}
// recover re-enables a previously failed server. Its stored records
// are untouched, so a recovered server can expose stale (divergent)
// state — exactly what split-brain tests need.
func (b *fakeBackend) recover(server string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.failed, server)
}
// setRaw injects divergent state directly on a specific server (for
// split-brain scenarios), bypassing the fail gate. value=="" removes
// the record.
func (b *fakeBackend) setRaw(server, fqdn, value string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	switch value {
	case "":
		delete(b.servers[server], fqdn)
	default:
		b.servers[server][fqdn] = value
	}
}
// TestDNSQuorum_ContractSuite runs the parametric witness contract
// against DNSQuorumClient over a fake-backend with 3 healthy servers.
// Behavioral drift between the in-memory reference and the quorum
// surface fails here.
//
// NOTE(review): mkClient calls t.Fatalf from inside the factory
// closure — if RunContractSuite ever invokes the factory from a
// sub-test goroutine, Fatalf on this t would violate the testing.T
// rule that FailNow must run in the test's own goroutine. Confirm
// against the suite's call pattern.
func TestDNSQuorum_ContractSuite(t *testing.T) {
	t.Parallel()
	contracttest.RunContractSuite(t, func() *contracttest.Backend {
		servers := []string{"ns1", "ns2", "ns3"}
		be := newFakeBackend(servers)
		mkClient := func(slot string) witness.Client {
			c, err := New(servers, "tsig", "lease.openova.io", slot, be, be)
			if err != nil {
				t.Fatalf("New: %v", err)
			}
			return c
		}
		return &contracttest.Backend{
			A:     mkClient("ns/cr-main"),
			B:     mkClient("ns/cr-main"),
			Other: mkClient("ns/cr-other"),
			// DNS-quorum has no in-band clock — the witness clock
			// IS wall-clock (TXT TTLs encode an absolute
			// expires-at). Advance must really sleep.
			Advance: time.Sleep,
		}
	})
}
// TestDNSQuorum_ConstructorValidation table-tests New's argument
// checks: server count (≥3), non-empty domain, non-empty slot.
func TestDNSQuorum_ConstructorValidation(t *testing.T) {
	t.Parallel()
	cases := []struct {
		name    string
		servers []string
		domain  string
		slot    string
		wantErr bool
	}{
		{"too few servers", []string{"a", "b"}, "lease.openova.io", "ns/cr", true},
		{"missing domain", []string{"a", "b", "c"}, "", "ns/cr", true},
		{"missing slot", []string{"a", "b", "c"}, "lease.openova.io", "", true},
		{"happy path", []string{"a", "b", "c"}, "lease.openova.io", "ns/cr", false},
	}
	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			be := newFakeBackend(tc.servers)
			_, err := New(tc.servers, "k", tc.domain, tc.slot, be, be)
			switch {
			case tc.wantErr && err == nil:
				t.Fatalf("expected error")
			case !tc.wantErr && err != nil:
				t.Fatalf("unexpected error: %v", err)
			}
		})
	}
}
// TestDNSQuorum_QuorumWith3of3 — happy path: all three servers ack
// the Acquire and each one ends up holding the lease record.
func TestDNSQuorum_QuorumWith3of3(t *testing.T) {
	t.Parallel()
	servers := []string{"ns1", "ns2", "ns3"}
	be := newFakeBackend(servers)
	// Check constructor + backend errors — the original discarded
	// both, which turns a fixture bug into a nil-deref or a silent
	// false pass.
	c, err := New(servers, "k", "lease.openova.io", "ns/cr", be, be)
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	st, err := c.Acquire(context.Background(), "fsn", 30*time.Second)
	if err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	if st.Holder != "fsn" {
		t.Fatalf("Holder = %q want fsn", st.Holder)
	}
	// All three servers should have the value.
	for _, s := range servers {
		v, rerr := be.ReadTXT(context.Background(), s, c.fqdn())
		if rerr != nil {
			t.Fatalf("ReadTXT %s: %v", s, rerr)
		}
		if len(v) == 0 || v[0] == "" {
			t.Fatalf("server %s missing value", s)
		}
	}
}
// TestDNSQuorum_QuorumWith2of3_AcquireSucceeds — ns3 is down before
// Acquire; the remaining 2/3 still form quorum, so Acquire succeeds.
func TestDNSQuorum_QuorumWith2of3_AcquireSucceeds(t *testing.T) {
	t.Parallel()
	servers := []string{"ns1", "ns2", "ns3"}
	be := newFakeBackend(servers)
	be.fail("ns3")
	c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be)
	got, err := c.Acquire(context.Background(), "fsn", 30*time.Second)
	switch {
	case err != nil:
		t.Fatalf("Acquire 2-of-3 failed: %v", err)
	case got.Holder != "fsn":
		t.Fatalf("Holder = %q want fsn", got.Holder)
	}
}
// TestDNSQuorum_BelowQuorum_AcquireFails — with only 1/3 servers
// healthy there is no quorum; Acquire fails defensively with
// ErrLeaseHeldByAnother.
func TestDNSQuorum_BelowQuorum_AcquireFails(t *testing.T) {
	t.Parallel()
	servers := []string{"ns1", "ns2", "ns3"}
	be := newFakeBackend(servers)
	be.fail("ns2")
	be.fail("ns3")
	c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be)
	if _, err := c.Acquire(context.Background(), "fsn", 30*time.Second); !errors.Is(err, witness.ErrLeaseHeldByAnother) {
		t.Fatalf("Acquire below-quorum: err = %v want ErrLeaseHeldByAnother (defensive)", err)
	}
}
// TestDNSQuorum_SplitBrain_1_1_1_TreatedAsHeldByAnother — three
// distinct values across three servers. Defensive: no quorum →
// ErrLeaseHeldByAnother.
func TestDNSQuorum_SplitBrain_1_1_1_TreatedAsHeldByAnother(t *testing.T) {
	t.Parallel()
	servers := []string{"ns1", "ns2", "ns3"}
	be := newFakeBackend(servers)
	c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be)
	fqdn := c.fqdn()
	expires := time.Now().Add(time.Minute).UTC().Format(time.RFC3339)
	for server, holder := range map[string]string{"ns1": "fsn", "ns2": "hel", "ns3": "ash"} {
		be.setRaw(server, fqdn, fmt.Sprintf("%s|%s|5", holder, expires))
	}
	if _, err := c.Acquire(context.Background(), "iad", 30*time.Second); !errors.Is(err, witness.ErrLeaseHeldByAnother) {
		t.Fatalf("split-brain Acquire: err = %v want ErrLeaseHeldByAnother", err)
	}
}
// TestDNSQuorum_2of3_DivergentValue_QuorumWins — two of three
// servers agree on holder=fsn while the third carries a stale "hel"
// record. The quorum value wins: Acquire by hel fails (held by
// another); Acquire by fsn succeeds (re-acquire).
func TestDNSQuorum_2of3_DivergentValue_QuorumWins(t *testing.T) {
	t.Parallel()
	servers := []string{"ns1", "ns2", "ns3"}
	be := newFakeBackend(servers)
	c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be)
	fqdn := c.fqdn()
	expires := time.Now().Add(time.Minute).UTC().Format(time.RFC3339)
	majority := fmt.Sprintf("fsn|%s|7", expires)
	be.setRaw("ns1", fqdn, majority)
	be.setRaw("ns2", fqdn, majority)
	be.setRaw("ns3", fqdn, fmt.Sprintf("hel|%s|7", expires))
	_, err := c.Acquire(context.Background(), "hel", 30*time.Second)
	if !errors.Is(err, witness.ErrLeaseHeldByAnother) {
		t.Fatalf("hel Acquire: err = %v want ErrLeaseHeldByAnother", err)
	}
	if _, err := c.Acquire(context.Background(), "fsn", 30*time.Second); err != nil {
		t.Fatalf("fsn re-Acquire: %v", err)
	}
}
// TestDNSQuorum_FactoryFromCfg — the registered factory parses cfg
// correctly and resolves the TSIG key through the SecretReader.
func TestDNSQuorum_FactoryFromCfg(t *testing.T) {
	t.Parallel()
	secrets := witness.SecretReaderFunc(func(_ context.Context, name, key string) ([]byte, error) {
		if name == "tsig-secret" && key == "tsig" {
			return []byte("real-tsig-key"), nil
		}
		return nil, errors.New("not found")
	})
	cfg := map[string]any{
		"slot":       "ns/cr",
		"dnsServers": []any{"ns1", "ns2", "ns3"},
		"domain":     "lease.openova.io",
		"tsigSecretRef": map[string]any{
			"name": "tsig-secret",
			"key":  "tsig",
		},
	}
	built, err := factory(cfg, secrets)
	if err != nil {
		t.Fatalf("factory: %v", err)
	}
	dq := built.(*DNSQuorumClient)
	switch {
	case dq.TSIGKey != "real-tsig-key":
		t.Fatalf("TSIGKey = %q want real-tsig-key", dq.TSIGKey)
	case dq.Domain != "lease.openova.io":
		t.Fatalf("Domain = %q want lease.openova.io", dq.Domain)
	case len(dq.Servers) != 3:
		t.Fatalf("Servers len = %d want 3", len(dq.Servers))
	}
}
// TestDNSQuorum_FactoryAcceptsResolversField — the CRD-shaped
// "resolvers" key is accepted when "dnsServers" is absent.
func TestDNSQuorum_FactoryAcceptsResolversField(t *testing.T) {
	t.Parallel()
	built, err := factory(map[string]any{
		"slot":      "ns/cr",
		"resolvers": []any{"1.1.1.1", "8.8.8.8", "9.9.9.9"},
		"domain":    "lease.openova.io",
		"tsigKey":   "k",
	}, nil)
	if err != nil {
		t.Fatalf("factory: %v", err)
	}
	if got := len(built.(*DNSQuorumClient).Servers); got != 3 {
		t.Fatalf("Servers len = %d want 3", got)
	}
}
// TestDNSQuorum_FactoryRejectsMissingFields verifies the factory
// rejects incomplete cfg shapes: no servers, too few servers, and a
// tsigSecretRef supplied with no SecretReader wired.
func TestDNSQuorum_FactoryRejectsMissingFields(t *testing.T) {
	t.Parallel()
	badCfgs := []struct {
		name string
		cfg  map[string]any
	}{
		{"empty", map[string]any{}},
		{"missing servers", map[string]any{"slot": "x", "tsigKey": "k", "domain": "lease.openova.io"}},
		{"too few servers", map[string]any{"slot": "x", "dnsServers": []any{"a", "b"}, "domain": "lease.openova.io", "tsigKey": "k"}},
		{
			"secretRef without reader",
			map[string]any{
				"slot":       "x",
				"dnsServers": []any{"a", "b", "c"},
				"domain":     "lease.openova.io",
				"tsigSecretRef": map[string]any{
					"name": "n",
					"key":  "k",
				},
			},
		},
	}
	for _, bad := range badCfgs {
		bad := bad
		t.Run(bad.name, func(t *testing.T) {
			if _, err := factory(bad.cfg, nil); err == nil {
				t.Fatalf("expected error")
			}
		})
	}
}
// TestDNSQuorum_EncodeDecodeRoundTrip — encode→decode must be the
// identity for the zero State and representative held States.
func TestDNSQuorum_EncodeDecodeRoundTrip(t *testing.T) {
	t.Parallel()
	now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC)
	cases := []witness.State{
		{},
		{Holder: "fsn", AcquiredAt: now, ExpiresAt: now.Add(30 * time.Second), Generation: 1},
		{Holder: "hel-deu-eu-1", AcquiredAt: now, ExpiresAt: now.Add(30 * time.Second), Generation: 999},
	}
	for i, want := range cases {
		got, err := decodeValue(encodeValue(want))
		if err != nil {
			t.Fatalf("case %d: decode: %v", i, err)
		}
		if got.Holder != want.Holder {
			t.Errorf("case %d: Holder = %q want %q", i, got.Holder, want.Holder)
		}
		if got.Generation != want.Generation {
			t.Errorf("case %d: Generation = %d want %d", i, got.Generation, want.Generation)
		}
		if !got.AcquiredAt.Equal(want.AcquiredAt) {
			t.Errorf("case %d: AcquiredAt = %v want %v", i, got.AcquiredAt, want.AcquiredAt)
		}
		if !got.ExpiresAt.Equal(want.ExpiresAt) {
			t.Errorf("case %d: ExpiresAt = %v want %v", i, got.ExpiresAt, want.ExpiresAt)
		}
	}
}
// TestDNSQuorum_DecodeLegacy3Field — backward-compat: an older
// controller may have written `<holder>|<expires>|<gen>` records.
// Decode must accept and degrade gracefully (AcquiredAt zero).
func TestDNSQuorum_DecodeLegacy3Field(t *testing.T) {
t.Parallel()
now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC)
v := fmt.Sprintf("fsn|%s|7", now.Format(time.RFC3339))
st, err := decodeValue(v)
if err != nil {
t.Fatalf("decode legacy: %v", err)
}
if st.Holder != "fsn" {
t.Errorf("Holder = %q", st.Holder)
}
if st.Generation != 7 {
t.Errorf("Generation = %d", st.Generation)
}
if !st.ExpiresAt.Equal(now) {
t.Errorf("ExpiresAt = %v want %v", st.ExpiresAt, now)
}
}
// TestDNSQuorum_EncodeSlot — DNS-label safety: every '/' becomes '_'.
func TestDNSQuorum_EncodeSlot(t *testing.T) {
	t.Parallel()
	expected := map[string]string{
		"foo":           "foo",
		"ns/cr":         "ns_cr",
		"acme/api-prod": "acme_api-prod",
		"a/b/c":         "a_b_c",
	}
	for in, want := range expected {
		if got := encodeSlot(in); got != want {
			t.Errorf("encodeSlot(%q) = %q want %q", in, got, want)
		}
	}
}
// TestDNSQuorum_WriterNotConfigured — a read-only DNSQuorumClient
// (nil Writer) surfaces a clear error on write attempts.
func TestDNSQuorum_WriterNotConfigured(t *testing.T) {
	t.Parallel()
	servers := []string{"a", "b", "c"}
	be := newFakeBackend(servers)
	c, _ := New(servers, "k", "lease.openova.io", "ns/cr", nil, be)
	_, err := c.Acquire(context.Background(), "fsn", 30*time.Second)
	switch {
	case err == nil:
		t.Fatalf("expected error")
	case !strings.Contains(err.Error(), "Writer not configured"):
		t.Fatalf("got: %v", err)
	}
}
// TestDNSQuorum_ContextCancel — a pre-cancelled context short-circuits
// Acquire with context.Canceled before any backend I/O.
func TestDNSQuorum_ContextCancel(t *testing.T) {
	t.Parallel()
	servers := []string{"a", "b", "c"}
	be := newFakeBackend(servers)
	c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be)
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := c.Acquire(ctx, "fsn", 30*time.Second)
	if !errors.Is(err, context.Canceled) {
		t.Fatalf("Acquire after cancel: %v", err)
	}
}

View File

@ -0,0 +1,356 @@
// Package testing exposes the witness behavioral contract suite as a
// parametric helper. Every concrete witness.Client implementation
// (InMemoryClient, cloudflarekv.CFKVClient, dnsquorum.DNSQuorumClient)
// runs THIS suite against itself so the on-the-wire CAS path matches
// the in-memory reference exactly.
//
// Why a separate package: putting the contract suite here (rather than
// in `witness/`) lets the cloudflarekv + dnsquorum impls IMPORT it
// without creating a test-only dependency cycle (the impls live in
// child packages of `witness/`, so they can import `witness/testing`
// without dragging the impl-specific httptest fixtures into
// `witness/`).
//
// The suite is parametric on a Backend factory. The factory returns:
//
// - Two Client instances bound to the SAME slot (so race tests can
// simulate two regions contending for the same lease)
// - A separate Client on a DIFFERENT slot (slot-isolation test)
// - An Advance(d) hook that fast-forwards time by d (for TTL tests)
//
// Implementations that can't fast-forward (cloudflarekv against a
// real Worker; dnsquorum against real PowerDNS) supply an Advance
// that real-time-sleeps through `d`. The contract suite picks small
// TTLs (≤ 200ms) so a real-time sleep stays bounded.
//
// Per K-Cont-2's K-Cont-3 concerns (item h), the contract verifies:
//
// 1. CAS atomicity — Acquire rejects when the slot is held by
// another non-expired holder.
// 2. Renew failure mode — Renew returns ErrLeaseLost when TTL has
// elapsed OR when another holder owns the slot.
// 3. Release idempotency — non-holder Release is a no-op.
// 4. Generation monotonicity — Acquire AND Renew bump State.Generation.
// 5. Slot isolation — different cfg["slot"] values produce
// independent leases.
// 6. Witness clock skew — verified per-impl: server-authoritative
// for CF (the server stamps timestamps + bumps Generation);
// fence-token + 2-of-3 for DNS-quorum (any single resolver may
// drift by one generation without breaking quorum).
// 7. Witness-side TTL eviction — Read returns "free" after expiry.
package testing
import (
"context"
"errors"
"testing"
"time"
"github.com/openova-io/openova/core/controllers/continuum/internal/witness"
)
// Backend is what each impl-specific test file constructs and passes
// to RunContractSuite. It bundles two same-slot clients (for CAS race
// scenarios), one different-slot client (for isolation), and an
// Advance hook for TTL tests.
type Backend struct {
	// A and B are two Client instances bound to the SAME slot. Tests
	// use them to simulate two regions racing for the same lease.
	A witness.Client
	B witness.Client
	// Other is a Client bound to a DIFFERENT slot — used to verify
	// slot isolation (an Acquire on Other must not affect A/B).
	Other witness.Client
	// Advance moves the witness's effective time forward by d.
	// Implementations that have an injectable clock (InMemoryClient)
	// implement this as a clock-set; impls that only support
	// real-time (cloudflarekv against a live Worker) implement as
	// time.Sleep(d). The contract suite never asks for d > 200ms so
	// a real sleep is bounded. Must be non-nil: the suite calls it
	// unconditionally in expiry tests.
	Advance func(d time.Duration)
	// Cleanup is called by RunContractSuite at the end of each
	// sub-test (via t.Cleanup). Optional; may be nil.
	Cleanup func()
}
// RunContractSuite runs the full witness behavioral contract against
// the supplied Backend factory. The factory is invoked once per
// sub-test so each test gets a clean slot.
//
// Usage from inmemory_test.go:
//
//	contracttest.RunContractSuite(t, func() *contracttest.Backend {
//		store := witness.NewInMemoryStore()
//		clk := &fakeClock{t: time.Date(...)}
//		store.SetClock(clk.Now)
//		return &contracttest.Backend{
//			A:       store.Client("ns/cr"),
//			B:       store.Client("ns/cr"),
//			Other:   store.Client("ns/other"),
//			Advance: clk.Advance,
//		}
//	})
//
// Each implementation MUST pass every sub-test or the production
// surfaces will diverge.
func RunContractSuite(t *testing.T, factory func() *Backend) {
	t.Helper()
	type contractCase struct {
		name string
		run  func(t *testing.T, b *Backend)
	}
	suite := []contractCase{
		{"AcquireOnEmptySlot", testAcquireOnEmpty},
		{"AcquireBlockedByAnother", testAcquireBlocked},
		{"AcquireAfterExpiry", testAcquireAfterExpiry},
		{"AcquireSameHolderExtendsTTL", testAcquireSameHolder},
		{"RenewExtendsTTLAndBumpsGeneration", testRenewExtends},
		{"RenewAfterExpiryReturnsLost", testRenewAfterExpiry},
		{"RenewByNonHolderReturnsLost", testRenewByNonHolder},
		{"ReleaseIdempotent", testReleaseIdempotent},
		{"ReleaseByNonHolderIsNoOp", testReleaseByNonHolder},
		{"SlotIsolation", testSlotIsolation},
		{"GenerationMonotonicityAcrossOps", testGenerationMonotonic},
		{"ReadOnEmptySlot", testReadOnEmpty},
		{"ReadAfterTTLEvictionReportsFree", testReadAfterTTLEviction},
		{"ContextCancel", testContextCancel},
	}
	for _, c := range suite {
		c := c // loop-capture safety for pre-1.22 toolchains
		t.Run(c.name, func(t *testing.T) {
			// Fresh backend per sub-test → every test starts from a
			// clean slot; Cleanup (if supplied) is deferred to the
			// sub-test's teardown.
			backend := factory()
			if backend.Cleanup != nil {
				t.Cleanup(backend.Cleanup)
			}
			c.run(t, backend)
		})
	}
}
// shortTTL is the TTL the suite uses for tests that need expiry. It
// is small (100ms) so impls that can't fast-forward time pay a
// bounded real-time penalty.
const shortTTL = 100 * time.Millisecond
// longTTL is used when the test does not depend on expiry — keeping
// it well above the test's wall-clock so flakes from CI scheduling
// jitter don't expire a lease mid-test.
const longTTL = 30 * time.Second
// testAcquireOnEmpty: a free slot grants the lease and returns a
// fully-populated State.
func testAcquireOnEmpty(t *testing.T, b *Backend) {
	got, err := b.A.Acquire(context.Background(), "fsn", longTTL)
	if err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	switch {
	case got.Holder != "fsn":
		t.Fatalf("Holder = %q want fsn", got.Holder)
	case got.Generation < 1:
		t.Fatalf("Generation = %d want >= 1", got.Generation)
	case got.ExpiresAt.IsZero():
		t.Fatalf("ExpiresAt is zero")
	case !got.ExpiresAt.After(got.AcquiredAt):
		t.Fatalf("ExpiresAt %v not after AcquiredAt %v", got.ExpiresAt, got.AcquiredAt)
	}
}
// testAcquireBlocked: a held, unexpired slot rejects a competing
// holder with ErrLeaseHeldByAnother (CAS atomicity).
func testAcquireBlocked(t *testing.T, b *Backend) {
	if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil {
		t.Fatalf("first Acquire: %v", err)
	}
	_, err := b.B.Acquire(context.Background(), "hel", longTTL)
	if !errors.Is(err, witness.ErrLeaseHeldByAnother) {
		t.Fatalf("second Acquire: err = %v want ErrLeaseHeldByAnother", err)
	}
}
// testAcquireAfterExpiry: once the TTL elapses the slot is free and a
// different holder may take it.
func testAcquireAfterExpiry(t *testing.T, b *Backend) {
	if _, err := b.A.Acquire(context.Background(), "fsn", shortTTL); err != nil {
		t.Fatalf("first Acquire: %v", err)
	}
	// Push past the TTL (plus margin for coarse server clocks).
	b.Advance(shortTTL + 50*time.Millisecond)
	claimed, err := b.B.Acquire(context.Background(), "hel", longTTL)
	if err != nil {
		t.Fatalf("post-expiry Acquire: %v", err)
	}
	if claimed.Holder != "hel" {
		t.Fatalf("Holder = %q want hel", claimed.Holder)
	}
}
// testAcquireSameHolder: re-Acquire by the current holder is an
// upsert — AcquiredAt is preserved and ExpiresAt never regresses.
func testAcquireSameHolder(t *testing.T, b *Backend) {
	first, err := b.A.Acquire(context.Background(), "fsn", longTTL)
	if err != nil {
		t.Fatalf("first Acquire: %v", err)
	}
	b.Advance(10 * time.Millisecond)
	second, err := b.A.Acquire(context.Background(), "fsn", longTTL)
	if err != nil {
		t.Fatalf("second Acquire (same holder): %v", err)
	}
	if !second.AcquiredAt.Equal(first.AcquiredAt) {
		t.Fatalf("AcquiredAt drifted on re-acquire: %v vs %v", second.AcquiredAt, first.AcquiredAt)
	}
	// For server-authoritative impls the new ExpiresAt may equal the
	// old one if the server's clock granularity is coarse; allow
	// equal but never less. `Before` is the idiomatic spelling of the
	// original `!After && !Equal` double-negative.
	if second.ExpiresAt.Before(first.ExpiresAt) {
		t.Fatalf("ExpiresAt regressed on re-acquire: %v vs %v", second.ExpiresAt, first.ExpiresAt)
	}
}
// testRenewExtends: a holder's Renew extends ExpiresAt and bumps the
// fence Generation.
func testRenewExtends(t *testing.T, b *Backend) {
	before, err := b.A.Acquire(context.Background(), "fsn", longTTL)
	if err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	b.Advance(10 * time.Millisecond)
	after, err := b.A.Renew(context.Background(), "fsn", longTTL)
	if err != nil {
		t.Fatalf("Renew: %v", err)
	}
	if after.Generation <= before.Generation {
		t.Fatalf("Renew did not bump Generation: %d -> %d", before.Generation, after.Generation)
	}
	if after.ExpiresAt.Before(before.ExpiresAt) {
		t.Fatalf("Renew did not extend ExpiresAt: %v -> %v", before.ExpiresAt, after.ExpiresAt)
	}
}
// testRenewAfterExpiry: an elapsed lease cannot be renewed — Renew
// must report ErrLeaseLost rather than silently resurrecting it.
func testRenewAfterExpiry(t *testing.T, b *Backend) {
	if _, err := b.A.Acquire(context.Background(), "fsn", shortTTL); err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	b.Advance(shortTTL + 50*time.Millisecond)
	_, err := b.A.Renew(context.Background(), "fsn", longTTL)
	if !errors.Is(err, witness.ErrLeaseLost) {
		t.Fatalf("Renew after expiry: err = %v want ErrLeaseLost", err)
	}
}
// testRenewByNonHolder: only the current holder may renew; anyone
// else gets ErrLeaseLost.
func testRenewByNonHolder(t *testing.T, b *Backend) {
	if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	_, err := b.B.Renew(context.Background(), "hel", longTTL)
	if !errors.Is(err, witness.ErrLeaseLost) {
		t.Fatalf("Renew by non-holder: err = %v want ErrLeaseLost", err)
	}
}
// testReleaseIdempotent: Release is safe on an empty slot and safe to
// repeat, and a released slot reads back as free.
func testReleaseIdempotent(t *testing.T, b *Backend) {
	ctx := context.Background()
	if err := b.A.Release(ctx, "fsn"); err != nil {
		t.Fatalf("Release on empty slot: %v", err)
	}
	if _, err := b.A.Acquire(ctx, "fsn", longTTL); err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	if err := b.A.Release(ctx, "fsn"); err != nil {
		t.Fatalf("Release: %v", err)
	}
	// Second Release of the same lease must also succeed (no-op).
	if err := b.A.Release(ctx, "fsn"); err != nil {
		t.Fatalf("Release again: %v", err)
	}
	st, err := b.A.Read(ctx)
	if err != nil {
		t.Fatalf("Read: %v", err)
	}
	if st.Holder != "" {
		t.Fatalf("Holder = %q want empty after Release", st.Holder)
	}
}
// testReleaseByNonHolder: Release by a non-holder is a safe no-op —
// it must not evict the live holder.
func testReleaseByNonHolder(t *testing.T, b *Backend) {
	if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	if err := b.B.Release(context.Background(), "hel"); err != nil {
		t.Fatalf("Release by non-holder: %v", err)
	}
	// Previously the Read error was discarded; a failing Read yields a
	// zero State and a misleading "Release evicted the holder" failure.
	st, err := b.A.Read(context.Background())
	if err != nil {
		t.Fatalf("Read: %v", err)
	}
	if st.Holder != "fsn" {
		t.Fatalf("Holder = %q want fsn — non-holder Release should not evict", st.Holder)
	}
}
// testSlotIsolation: leases on different slots are fully independent
// — Other can acquire while A holds, and acquiring Other leaves A's
// lease untouched.
func testSlotIsolation(t *testing.T, b *Backend) {
	if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil {
		t.Fatalf("Acquire A: %v", err)
	}
	st, err := b.Other.Acquire(context.Background(), "hel", longTTL)
	if err != nil {
		t.Fatalf("Acquire Other: %v", err)
	}
	if st.Holder != "hel" {
		t.Fatalf("Other.Holder = %q want hel", st.Holder)
	}
	// The Backend doc says "an Acquire on Other must not affect A/B",
	// but that direction was previously unasserted — verify A's lease
	// survived.
	back, err := b.A.Read(context.Background())
	if err != nil {
		t.Fatalf("Read A: %v", err)
	}
	if back.Holder != "fsn" {
		t.Fatalf("A.Holder = %q want fsn after Other.Acquire", back.Holder)
	}
}
// testGenerationMonotonic: every successful mutating op (Renew AND
// same-holder re-Acquire) strictly increases the fence Generation.
func testGenerationMonotonic(t *testing.T, b *Backend) {
	g1, err := b.A.Acquire(context.Background(), "fsn", longTTL)
	if err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	g2, err := b.A.Renew(context.Background(), "fsn", longTTL)
	if err != nil {
		t.Fatalf("Renew: %v", err)
	}
	if g2.Generation <= g1.Generation {
		t.Fatalf("Generation did not bump on Renew: %d -> %d", g1.Generation, g2.Generation)
	}
	g3, err := b.A.Acquire(context.Background(), "fsn", longTTL)
	if err != nil {
		t.Fatalf("re-Acquire: %v", err)
	}
	if g3.Generation <= g2.Generation {
		t.Fatalf("Generation did not bump on re-Acquire: %d -> %d", g2.Generation, g3.Generation)
	}
}
// testReadOnEmpty: reading a never-held slot succeeds and reports no
// holder.
func testReadOnEmpty(t *testing.T, b *Backend) {
	st, err := b.A.Read(context.Background())
	switch {
	case err != nil:
		t.Fatalf("Read on empty: %v", err)
	case st.Holder != "":
		t.Fatalf("empty slot Holder = %q want empty", st.Holder)
	}
}
// testReadAfterTTLEviction: the witness must treat an expired lease
// as free.
func testReadAfterTTLEviction(t *testing.T, b *Backend) {
	if _, err := b.A.Acquire(context.Background(), "fsn", shortTTL); err != nil {
		t.Fatalf("Acquire: %v", err)
	}
	b.Advance(shortTTL + 50*time.Millisecond)
	// Definitive eviction proof: a different holder can now Acquire.
	// (Comparing the State.ExpiresAt against time.Now() doesn't work
	// for impls that use a server-side clock decoupled from
	// wall-clock — the in-memory + CFKV impls stamp ExpiresAt with
	// the witness's clock, not the test's wall-clock.)
	claimed, err := b.B.Acquire(context.Background(), "hel", longTTL)
	if err != nil {
		t.Fatalf("post-expiry Acquire by other: %v — TTL eviction not honored", err)
	}
	if claimed.Holder != "hel" {
		t.Fatalf("post-eviction Holder = %q want hel", claimed.Holder)
	}
}
// testContextCancel: every Client surface observes a pre-cancelled
// context and reports context.Canceled.
func testContextCancel(t *testing.T, b *Backend) {
	cancelled, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front — every call below must notice
	if _, err := b.A.Acquire(cancelled, "fsn", longTTL); !errors.Is(err, context.Canceled) {
		t.Fatalf("Acquire after cancel: err = %v want context.Canceled", err)
	}
	if _, err := b.A.Renew(cancelled, "fsn", longTTL); !errors.Is(err, context.Canceled) {
		t.Fatalf("Renew after cancel: err = %v want context.Canceled", err)
	}
	if err := b.A.Release(cancelled, "fsn"); !errors.Is(err, context.Canceled) {
		t.Fatalf("Release after cancel: err = %v want context.Canceled", err)
	}
	if _, err := b.A.Read(cancelled); !errors.Is(err, context.Canceled) {
		t.Fatalf("Read after cancel: err = %v want context.Canceled", err)
	}
}

View File

@ -45,9 +45,10 @@ var ErrLeaseHeldByAnother = errors.New("witness: lease held by another holder")
// itself as demoted.
var ErrLeaseLost = errors.New("witness: lease lost")
// ErrNotImplemented — the K-Cont-3 implementations are not wired in
// this controller build. Returned by the Selector for kinds
// `cloudflare-kv` and `dns-quorum`.
// ErrNotImplemented — the requested kind has no registered factory
// in this build. K-Cont-3 (#1101) registers `cloudflare-kv` and
// `dns-quorum` via blank-import in continuum/cmd/main.go; tests can
// register fakes via Register.
var ErrNotImplemented = errors.New("witness: implementation not built into this controller")
// State is the full lease snapshot returned by Read.
@ -146,16 +147,101 @@ func (f SelectorFunc) Select(kind string, cfg map[string]any) (Client, error) {
return f(kind, cfg)
}
// DefaultSelector is the production Selector. K-Cont-2 returns
// ErrNotImplemented for the two real kinds (cloudflare-kv,
// dns-quorum) so the system fails CLOSED until K-Cont-3 lands. The
// special kind `in-memory` is recognised for envtest / smoke
// scenarios — DO NOT enable in production.
// Factory constructs a Client for a kind from cfg. Returned errors
// from the Factory propagate out of Selector.Select unchanged.
//
// Per K-Cont-3's contract, cfg always carries:
//   - "slot" (string) — "<ns>/<name>" — mandatory; isolates per-CR
//   - kind-specific keys (baseURL, dnsServers, tokenSecretRef, etc.)
//
// SecretReader (or nil) lets the Factory resolve K8s Secret refs
// without dragging client-go into the witness package. A Factory that
// needs secrets must tolerate a nil reader by returning an error.
type Factory func(cfg map[string]any, secrets SecretReader) (Client, error)
// SecretReader resolves a K8s SecretRef (name + key from a known
// namespace) into a plaintext value. The DefaultSelector wraps the
// controller's K8s client; tests pass an in-memory implementation.
//
// Per Inviolable Principle #5 the returned plaintext stays in memory
// only long enough to be handed to the wire client. NEVER log.
type SecretReader interface {
	// ReadSecret returns the bytes at secretName/key. Implementations
	// MUST NOT log the value. The ctx parameter allows callers to
	// bound/cancel the underlying lookup.
	ReadSecret(ctx context.Context, secretName, key string) ([]byte, error)
}
// SecretReaderFunc adapts a plain function to the SecretReader
// interface (the usual single-method func-adapter pattern, cf.
// http.HandlerFunc).
type SecretReaderFunc func(ctx context.Context, secretName, key string) ([]byte, error)

// ReadSecret implements SecretReader by delegating to f.
func (f SecretReaderFunc) ReadSecret(ctx context.Context, secretName, key string) ([]byte, error) {
	return f(ctx, secretName, key)
}
// registry holds package-global Factory bindings registered by impl
// packages at init() time (typically via blank-import in cmd/main.go).
//
// We use a registry rather than direct imports because the impl
// packages (cloudflarekv, dnsquorum) live UNDER `internal/witness/`
// and therefore CANNOT be imported by `internal/witness/` itself
// without a cycle. The registry pattern decouples the dispatch table
// from the concrete clients.
var (
	registryMu sync.RWMutex // guards registry (writes via Register/Unregister, reads via lookup)
	registry   = map[string]Factory{}
)
// Register binds factory as the Client constructor for kind.
// Registering the same kind twice PANICS — the second caller is
// almost certainly a duplicate import / wiring bug, and failing
// loudly at init() time beats silently shadowing a backend.
//
// Impl packages call Register from their init() function; cmd/main.go
// in the consuming binary blank-imports the impl packages it wants
// active. Tests can pre-register fakes by calling Register directly.
func Register(kind string, factory Factory) {
	registryMu.Lock()
	defer registryMu.Unlock()
	if _, dup := registry[kind]; dup {
		panic(fmt.Sprintf("witness: duplicate Register for kind %q", kind))
	}
	registry[kind] = factory
}
// Unregister removes a kind from the registry. Test-only — production
// code never calls this. (Public so test packages outside `witness/`
// can use it when wiring/unwiring fakes.)
func Unregister(kind string) {
	registryMu.Lock()
	delete(registry, kind) // deleting a missing key is a no-op
	registryMu.Unlock()
}
// lookup returns the Factory registered for kind and whether one
// exists.
func lookup(kind string) (Factory, bool) {
	registryMu.RLock()
	f, ok := registry[kind]
	registryMu.RUnlock()
	return f, ok
}
// DefaultSelector is the production Selector. The `in-memory` kind
// is built-in (gated by InMemoryAllowed); `cloudflare-kv` and
// `dns-quorum` come from the package registry — when their impl
// packages are imported (via blank-import in cmd/main.go), they
// register a Factory at init() time.
//
// Per Inviolable Principle #5 the SecretReader is the ONLY path the
// real impls have to credentials — they never read env vars directly.
type DefaultSelector struct {
// InMemoryAllowed gates the `in-memory` kind. Default false in
// production; tests flip true.
InMemoryAllowed bool
// SecretReader resolves K8s Secret refs for the real impls. May
// be nil for tests that only exercise in-memory or fake-registered
// impls.
SecretReader SecretReader
// inMemoryShared is the shared backing store across all
// in-memory clients in this process — Continuum CRs in the same
// process share a witness so the multi-CR isolation tests work
@ -168,10 +254,6 @@ type DefaultSelector struct {
// Select implements Selector.
func (s *DefaultSelector) Select(kind string, cfg map[string]any) (Client, error) {
switch kind {
case "cloudflare-kv":
return nil, fmt.Errorf("%w: cloudflare-kv (K-Cont-3)", ErrNotImplemented)
case "dns-quorum":
return nil, fmt.Errorf("%w: dns-quorum (K-Cont-3)", ErrNotImplemented)
case "in-memory":
if !s.InMemoryAllowed {
return nil, fmt.Errorf("witness: in-memory selector is test-only; refused in production")
@ -181,15 +263,14 @@ func (s *DefaultSelector) Select(kind string, cfg map[string]any) (Client, error
if s.inMemoryShared == nil {
s.inMemoryShared = NewInMemoryStore()
}
// Every in-memory client gets its own per-CR slot keyed by
// the `slot` config field. Tests pass slot="ns/name" so each
// Continuum CR gets an isolated lease.
slot, _ := cfg["slot"].(string)
if slot == "" {
slot = "default"
}
return s.inMemoryShared.Client(slot), nil
default:
return nil, fmt.Errorf("witness: unknown kind %q (expected one of: cloudflare-kv, dns-quorum)", kind)
}
if f, ok := lookup(kind); ok {
return f(cfg, s.SecretReader)
}
return nil, fmt.Errorf("%w: %s", ErrNotImplemented, kind)
}

View File

@ -1,10 +1,18 @@
// Tests for the witness contract + InMemoryClient.
// Tests for the witness contract + InMemoryClient + Selector dispatch.
//
// These tests double as the contract spec: K-Cont-3's cloudflare-kv
// and dns-quorum implementations MUST run this same suite (or a
// faithful adaptation that drives a fake transport).
// As of K-Cont-3 (#1101) the BEHAVIORAL contract — Acquire / Renew /
// Release / Read invariants — is encoded in
// `internal/witness/testing/contract.go` and exported as
// `RunContractSuite(t, factoryFn)`. THIS file invokes that suite for
// the in-memory backend; the K-Cont-3 cloudflarekv + dnsquorum impls
// invoke the SAME suite from their own test files. Behavioral drift
// between the in-memory reference and a wire impl surfaces as a test
// failure in the impl's package.
//
// What stays here: tests for the Selector dispatch + State.IsHeldBy
// helper + the SelectorFunc adapter — all in-package surfaces.
package witness
package witness_test
import (
"context"
@ -12,10 +20,19 @@ import (
"sync"
"testing"
"time"
"github.com/openova-io/openova/core/controllers/continuum/internal/witness"
// Blank-import the K-Cont-3 impl packages so their init()
// register Factory bindings on the witness package registry.
// The cmd/main.go binary does the same — keeping the test
// import in sync ensures behaviour parity.
_ "github.com/openova-io/openova/core/controllers/continuum/internal/witness/cloudflarekv"
_ "github.com/openova-io/openova/core/controllers/continuum/internal/witness/dnsquorum"
contracttest "github.com/openova-io/openova/core/controllers/continuum/internal/witness/testing"
)
// frozenClock returns a clock that the test can advance manually.
// Each call to Now() returns the current value of t.
// frozenClock returns a clock the test can advance manually. Used by
// the in-memory backend factory below.
type frozenClock struct {
mu sync.Mutex
t time.Time
@ -37,217 +54,29 @@ func newClock() *frozenClock {
return &frozenClock{t: time.Date(2026, 5, 9, 0, 0, 0, 0, time.UTC)}
}
func TestInMemory_AcquireOnEmpty(t *testing.T) {
// TestInMemory_ContractSuite runs the full witness behavioral contract
// against InMemoryClient. The cloudflarekv + dnsquorum impls run the
// SAME suite — see their respective _test.go files. Any divergence
// fails here and there simultaneously.
func TestInMemory_ContractSuite(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
clk := newClock()
store.SetClock(clk.Now)
c := store.Client("slotA")
st, err := c.Acquire(context.Background(), "fsn", 30*time.Second)
if err != nil {
t.Fatalf("Acquire empty: %v", err)
}
if st.Holder != "fsn" {
t.Fatalf("Holder = %q want fsn", st.Holder)
}
if st.Generation != 1 {
t.Fatalf("Generation = %d want 1", st.Generation)
}
if st.AcquiredAt != clk.Now() {
t.Fatalf("AcquiredAt should equal clock now")
}
if got, want := st.ExpiresAt, clk.Now().Add(30*time.Second); got != want {
t.Fatalf("ExpiresAt = %v want %v", got, want)
}
}
func TestInMemory_AcquireBlockedByAnother(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
clk := newClock()
store.SetClock(clk.Now)
a := store.Client("slotA")
b := store.Client("slotA") // same slot
if _, err := a.Acquire(context.Background(), "fsn", 30*time.Second); err != nil {
t.Fatalf("first Acquire: %v", err)
}
if _, err := b.Acquire(context.Background(), "hel", 30*time.Second); !errors.Is(err, ErrLeaseHeldByAnother) {
t.Fatalf("second Acquire: err = %v want ErrLeaseHeldByAnother", err)
}
}
func TestInMemory_AcquireAfterExpiry(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
clk := newClock()
store.SetClock(clk.Now)
a := store.Client("slotA")
b := store.Client("slotA")
if _, err := a.Acquire(context.Background(), "fsn", 5*time.Second); err != nil {
t.Fatalf("first Acquire: %v", err)
}
clk.Advance(10 * time.Second) // past TTL
st, err := b.Acquire(context.Background(), "hel", 30*time.Second)
if err != nil {
t.Fatalf("post-expiry Acquire: %v", err)
}
if st.Holder != "hel" {
t.Fatalf("Holder = %q want hel", st.Holder)
}
}
func TestInMemory_AcquireSameHolderExtendsTTL(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
clk := newClock()
store.SetClock(clk.Now)
c := store.Client("slotA")
st1, err := c.Acquire(context.Background(), "fsn", 30*time.Second)
if err != nil {
t.Fatalf("first Acquire: %v", err)
}
clk.Advance(5 * time.Second)
st2, err := c.Acquire(context.Background(), "fsn", 30*time.Second)
if err != nil {
t.Fatalf("second Acquire (same holder): %v", err)
}
if st2.AcquiredAt != st1.AcquiredAt {
t.Fatalf("AcquiredAt drifted on re-acquire: %v vs %v", st2.AcquiredAt, st1.AcquiredAt)
}
if !st2.ExpiresAt.After(st1.ExpiresAt) {
t.Fatalf("ExpiresAt did not advance on re-acquire: %v not after %v", st2.ExpiresAt, st1.ExpiresAt)
}
}
func TestInMemory_RenewExtendsTTL(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
clk := newClock()
store.SetClock(clk.Now)
c := store.Client("slotA")
st1, err := c.Acquire(context.Background(), "fsn", 30*time.Second)
if err != nil {
t.Fatalf("Acquire: %v", err)
}
clk.Advance(10 * time.Second)
st2, err := c.Renew(context.Background(), "fsn", 30*time.Second)
if err != nil {
t.Fatalf("Renew: %v", err)
}
if !st2.ExpiresAt.After(st1.ExpiresAt) {
t.Fatalf("Renew did not extend ExpiresAt")
}
if st2.Generation <= st1.Generation {
t.Fatalf("Renew did not bump Generation")
}
}
func TestInMemory_RenewAfterExpiryReturnsLost(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
clk := newClock()
store.SetClock(clk.Now)
c := store.Client("slotA")
if _, err := c.Acquire(context.Background(), "fsn", 5*time.Second); err != nil {
t.Fatalf("Acquire: %v", err)
}
clk.Advance(10 * time.Second)
if _, err := c.Renew(context.Background(), "fsn", 30*time.Second); !errors.Is(err, ErrLeaseLost) {
t.Fatalf("Renew after expiry: err = %v want ErrLeaseLost", err)
}
}
func TestInMemory_RenewByNonHolderReturnsLost(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
clk := newClock()
store.SetClock(clk.Now)
a := store.Client("slotA")
b := store.Client("slotA")
if _, err := a.Acquire(context.Background(), "fsn", 30*time.Second); err != nil {
t.Fatalf("Acquire: %v", err)
}
if _, err := b.Renew(context.Background(), "hel", 30*time.Second); !errors.Is(err, ErrLeaseLost) {
t.Fatalf("Renew by non-holder: err = %v want ErrLeaseLost", err)
}
}
func TestInMemory_ReleaseIdempotent(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
c := store.Client("slotA")
if err := c.Release(context.Background(), "fsn"); err != nil {
t.Fatalf("Release on empty slot: %v", err)
}
if _, err := c.Acquire(context.Background(), "fsn", 30*time.Second); err != nil {
t.Fatalf("Acquire: %v", err)
}
if err := c.Release(context.Background(), "fsn"); err != nil {
t.Fatalf("Release: %v", err)
}
if err := c.Release(context.Background(), "fsn"); err != nil {
t.Fatalf("Release again: %v", err)
}
st, err := c.Read(context.Background())
if err != nil {
t.Fatalf("Read: %v", err)
}
if st.Holder != "" {
t.Fatalf("Holder = %q want empty after Release", st.Holder)
}
}
func TestInMemory_ReleaseByNonHolderIsNoOp(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
a := store.Client("slotA")
if _, err := a.Acquire(context.Background(), "fsn", 30*time.Second); err != nil {
t.Fatalf("Acquire: %v", err)
}
// Other region tries to release; should be a no-op (so a stale
// caller doesn't accidentally evict the live primary).
if err := a.Release(context.Background(), "hel"); err != nil {
t.Fatalf("Release by non-holder: %v", err)
}
st, _ := a.Read(context.Background())
if st.Holder != "fsn" {
t.Fatalf("Holder = %q want fsn", st.Holder)
}
}
func TestInMemory_SlotIsolation(t *testing.T) {
t.Parallel()
store := NewInMemoryStore()
a := store.Client("ns1/cr1")
b := store.Client("ns2/cr2")
if _, err := a.Acquire(context.Background(), "fsn", 30*time.Second); err != nil {
t.Fatalf("Acquire A: %v", err)
}
// b should be free regardless.
st, err := b.Acquire(context.Background(), "hel", 30*time.Second)
if err != nil {
t.Fatalf("Acquire B: %v", err)
}
if st.Holder != "hel" {
t.Fatalf("B.Holder = %q want hel", st.Holder)
}
contracttest.RunContractSuite(t, func() *contracttest.Backend {
store := witness.NewInMemoryStore()
clk := newClock()
store.SetClock(clk.Now)
return &contracttest.Backend{
A: store.Client("ns/cr-main"),
B: store.Client("ns/cr-main"), // SAME slot — race scenarios
Other: store.Client("ns/cr-other"),
Advance: clk.Advance,
}
})
}
func TestInMemory_State_IsHeldBy(t *testing.T) {
t.Parallel()
now := time.Now()
st := State{
st := witness.State{
Holder: "fsn",
ExpiresAt: now.Add(time.Minute),
}
@ -260,25 +89,14 @@ func TestInMemory_State_IsHeldBy(t *testing.T) {
if st.IsHeldBy("fsn", now.Add(2*time.Minute)) {
t.Fatalf("expected IsHeldBy(fsn, post-expiry) false")
}
if (State{}).IsHeldBy("fsn", now) {
if (witness.State{}).IsHeldBy("fsn", now) {
t.Fatalf("empty State should never claim a holder")
}
}
// TestDefaultSelector_NotImplemented pins the K-Cont-2 fail-closed
// behaviour: with no registered impls, Select on the real kinds must
// surface ErrNotImplemented. (NOTE(review): the surrounding diff
// removes this test once K-Cont-3 registers real factories — see
// TestDefaultSelector_RealKindsConstructorFailure.)
func TestDefaultSelector_NotImplemented(t *testing.T) {
	t.Parallel()
	s := &DefaultSelector{InMemoryAllowed: false}
	for _, kind := range []string{"cloudflare-kv", "dns-quorum"} {
		_, err := s.Select(kind, nil)
		if !errors.Is(err, ErrNotImplemented) {
			t.Fatalf("Select(%q) err = %v want ErrNotImplemented", kind, err)
		}
	}
}
func TestDefaultSelector_InMemoryRefusedInProd(t *testing.T) {
t.Parallel()
s := &DefaultSelector{InMemoryAllowed: false}
s := &witness.DefaultSelector{InMemoryAllowed: false}
_, err := s.Select("in-memory", nil)
if err == nil {
t.Fatalf("expected refusal for in-memory in production")
@ -287,7 +105,7 @@ func TestDefaultSelector_InMemoryRefusedInProd(t *testing.T) {
func TestDefaultSelector_InMemoryAllowed(t *testing.T) {
t.Parallel()
s := &DefaultSelector{InMemoryAllowed: true}
s := &witness.DefaultSelector{InMemoryAllowed: true}
cli, err := s.Select("in-memory", map[string]any{"slot": "ns/foo"})
if err != nil {
t.Fatalf("Select in-memory: %v", err)
@ -295,31 +113,46 @@ func TestDefaultSelector_InMemoryAllowed(t *testing.T) {
if cli == nil {
t.Fatalf("expected non-nil Client")
}
// Same selector + same slot should yield isolated CAS state. We
// can't compare clients directly (each call returns a new
// wrapper), but we can verify the underlying store carries state
// across selects.
cli2, _ := s.Select("in-memory", map[string]any{"slot": "ns/foo"})
if _, err := cli.Acquire(context.Background(), "fsn", 30*time.Second); err != nil {
t.Fatalf("Acquire: %v", err)
}
if _, err := cli2.Acquire(context.Background(), "hel", 30*time.Second); !errors.Is(err, ErrLeaseHeldByAnother) {
if _, err := cli2.Acquire(context.Background(), "hel", 30*time.Second); !errors.Is(err, witness.ErrLeaseHeldByAnother) {
t.Fatalf("shared store: err = %v want ErrLeaseHeldByAnother", err)
}
}
func TestDefaultSelector_UnknownKind(t *testing.T) {
t.Parallel()
s := &DefaultSelector{}
s := &witness.DefaultSelector{}
if _, err := s.Select("not-a-kind", nil); err == nil {
t.Fatalf("expected error for unknown kind")
}
}
// TestDefaultSelector_RealKindsConstructorFailure — for the real
// kinds (cloudflare-kv + dns-quorum) the DefaultSelector dispatches
// to the K-Cont-3 impls; their constructors error on missing
// required cfg keys (slot, baseURL/dnsServers, token/tsig). The
// dispatch must NOT return ErrNotImplemented.
func TestDefaultSelector_RealKindsConstructorFailure(t *testing.T) {
	t.Parallel()
	sel := &witness.DefaultSelector{}
	for _, kind := range []string{"cloudflare-kv", "dns-quorum"} {
		_, err := sel.Select(kind, nil)
		switch {
		case err == nil:
			t.Fatalf("Select(%q) with empty cfg: expected error", kind)
		case errors.Is(err, witness.ErrNotImplemented):
			t.Fatalf("Select(%q): K-Cont-3 should NOT return ErrNotImplemented (impls are wired); got %v", kind, err)
		}
	}
}
func TestSelectorFunc(t *testing.T) {
t.Parallel()
want := NewInMemoryStore().Client("x")
f := SelectorFunc(func(kind string, cfg map[string]any) (Client, error) {
want := witness.NewInMemoryStore().Client("x")
f := witness.SelectorFunc(func(kind string, cfg map[string]any) (witness.Client, error) {
return want, nil
})
got, err := f.Select("any", nil)
@ -330,23 +163,3 @@ func TestSelectorFunc(t *testing.T) {
t.Fatalf("SelectorFunc returned wrong client")
}
}
// TestInMemory_ContextCancel verifies every Client surface observes a
// pre-cancelled context and returns context.Canceled.
// (NOTE(review): the surrounding diff removes this test — the same
// coverage moves into the shared contract suite's ContextCancel case.)
func TestInMemory_ContextCancel(t *testing.T) {
	t.Parallel()
	store := NewInMemoryStore()
	c := store.Client("slot")
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	// Every method must fail fast with the ctx error.
	if _, err := c.Acquire(ctx, "fsn", 30*time.Second); !errors.Is(err, context.Canceled) {
		t.Fatalf("Acquire after cancel: %v", err)
	}
	if _, err := c.Renew(ctx, "fsn", 30*time.Second); !errors.Is(err, context.Canceled) {
		t.Fatalf("Renew after cancel: %v", err)
	}
	if err := c.Release(ctx, "fsn"); !errors.Is(err, context.Canceled) {
		t.Fatalf("Release after cancel: %v", err)
	}
	if _, err := c.Read(ctx); !errors.Is(err, context.Canceled) {
		t.Fatalf("Read after cancel: %v", err)
	}
}