diff --git a/core/controllers/continuum/cmd/main.go b/core/controllers/continuum/cmd/main.go index b34affaf..3a6bf9be 100644 --- a/core/controllers/continuum/cmd/main.go +++ b/core/controllers/continuum/cmd/main.go @@ -8,10 +8,11 @@ // switchover sequence per docs/SRE.md §2 + docs/MULTI-REGION-DNS.md. // // K-Cont-1 shipped the binary + chart + CI workflow + skeleton. -// K-Cont-2 (this slice) replaces the Reconcile body with the full -// per-CR goroutine + lease state machine + switchover sequencer + -// NATS audit publisher. K-Cont-3 wires the real lease witness -// implementations (cloudflare-kv + dns-quorum); K-Cont-4 ships the +// K-Cont-2 ships the per-CR goroutine + lease state machine + +// switchover sequencer + NATS audit publisher. K-Cont-3 (this +// build) wires the real lease witness implementations +// (cloudflare-kv + dns-quorum) — the impl packages register their +// Factory at init() time via blank-import below. K-Cont-4 ships the // Cloudflare Worker source. // // Configuration is environment-only — per @@ -33,6 +34,13 @@ // CATALYST_REGION — host-cluster name THIS controller represents (stamped on Witness.Acquire) // WITNESS_IN_MEMORY — "true" enables the in-memory witness selector (TEST ONLY; default false) // +// K-Cont-3 wires this: +// +// WITNESS_SECRET_NS — K8s namespace where lease-witness Secret refs are +// looked up (CF API token, TSIG key). Default +// "catalyst-controllers" (mirrors EPIC-3 F's +// `FederationSecretNamespace`). +// // Per-CR config (lease TTL/renew, witness kind, regions) is read // from Continuum CR spec, NEVER env (per INVIOLABLE-PRINCIPLES #4). // @@ -56,11 +64,21 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/openova-io/openova/core/controllers/continuum/internal/controller" "github.com/openova-io/openova/core/controllers/continuum/internal/events" "github.com/openova-io/openova/core/controllers/continuum/internal/pdm" "github.com/openova-io/openova/core/controllers/continuum/internal/switchover" "github.com/openova-io/openova/core/controllers/continuum/internal/witness" + // K-Cont-3 lease-witness implementations — register their + // Factory at init() time so DefaultSelector dispatches them. The + // controller does NOT need typed access to either package; the + // blank-import is the WIRING — removing it disables the kinds. + _ "github.com/openova-io/openova/core/controllers/continuum/internal/witness/cloudflarekv" + _ "github.com/openova-io/openova/core/controllers/continuum/internal/witness/dnsquorum" ) var scheme = runtime.NewScheme() @@ -109,12 +127,20 @@ func main() { os.Exit(1) } - // Witness selector. K-Cont-2 ships ErrNotImplemented for the - // real kinds (cloudflare-kv + dns-quorum); K-Cont-3 swaps in - // the implementations. The in-memory selector is gated by the - // WITNESS_IN_MEMORY env var — TEST ONLY. + // Witness selector. K-Cont-3 wires the cloudflare-kv + + // dns-quorum impls via blank-import (see top-of-file imports). + // The in-memory selector remains gated by the WITNESS_IN_MEMORY + // env var — TEST ONLY. The SecretReader resolves CF API tokens + // + TSIG keys from K8s Secrets in WITNESS_SECRET_NS (default + // "catalyst-controllers", same as EPIC-3 F's federation + // secret namespace). + witnessSecretNS := env("WITNESS_SECRET_NS", "catalyst-controllers") sel := &witness.DefaultSelector{ InMemoryAllowed: envBool("WITNESS_IN_MEMORY", false), + SecretReader: &k8sSecretReader{ + Client: mgr.GetClient(), + Namespace: witnessSecretNS, + }, } // PDM client — when PDM_API_URL is empty, the PDMCommit closure @@ -207,3 +233,43 @@ func podNamespace() string { } return string(b) } + +// k8sSecretReader implements witness.SecretReader by GETting the +// referenced K8s Secret via the controller-runtime client. +// +// Per Inviolable Principle #5 the value bytes stay in memory only +// until the calling Factory hands them to the wire client (CF bearer +// token / TSIG key). NEVER log; NEVER persist outside the Secret. +// +// Mirrors the EPIC-3 F readClientSecret seam — but lifted to an +// interface so the witness package doesn't import client-go. +type k8sSecretReader struct { + Client client.Client + Namespace string +} + +// ReadSecret implements witness.SecretReader. +func (r *k8sSecretReader) ReadSecret(ctx context.Context, secretName, key string) ([]byte, error) { + if secretName == "" { + return nil, fmt.Errorf("witness/k8sSecretReader: secret name is empty") + } + if key == "" { + return nil, fmt.Errorf("witness/k8sSecretReader: secret key is empty") + } + ns := r.Namespace + if ns == "" { + ns = "catalyst-controllers" + } + var sec corev1.Secret + if err := r.Client.Get(ctx, types.NamespacedName{Namespace: ns, Name: secretName}, &sec); err != nil { + return nil, fmt.Errorf("witness/k8sSecretReader: get Secret %s/%s: %w", ns, secretName, err) + } + val, ok := sec.Data[key] + if !ok { + return nil, fmt.Errorf("witness/k8sSecretReader: Secret %s/%s missing key %q", ns, secretName, key) + } + if len(val) == 0 { + return nil, fmt.Errorf("witness/k8sSecretReader: Secret %s/%s key %q is empty", ns, secretName, key) + } + return val, nil +} diff --git a/core/controllers/continuum/internal/witness/cloudflarekv/client.go b/core/controllers/continuum/internal/witness/cloudflarekv/client.go new file mode 100644 index 00000000..7417640f --- /dev/null +++ b/core/controllers/continuum/internal/witness/cloudflarekv/client.go @@ -0,0 +1,435 @@ +// Package cloudflarekv implements witness.Client over a Cloudflare +// Worker backed by Workers KV. The Worker is the K-Cont-4 deliverable; +// THIS client speaks the Worker's HTTP CAS contract. +// +// The contract (which K-Cont-4 must implement): +// +// GET /lease/ → 200 {holder, acquiredAt, +// expiresAt, generation} | 404 +// PUT /lease/ req body: {holder, ttlSeconds, op} +// req header: If-Match: +// (use "0" for first +// acquire on an empty slot) +// → 200 {…new state…} on CAS success +// → 412 on CAS conflict +// (held by another) +// DELETE /lease/ req header: If-Match: +// req header: X-Holder: +// → 204 success +// → 412 lost CAS or +// not the holder +// +// `op` discriminator on PUT: "acquire" or "renew" — for the Worker's +// log and for asymmetric handling (acquire allows Generation=0; +// renew requires the current Generation to MATCH). +// +// Authentication: the catalyst-controllers ServiceAccount references +// a SealedSecret holding the Worker API token; the controller passes +// it as `Authorization: Bearer `. Per Inviolable Principle #5 +// the plaintext lives in memory only. +// +// Why this shape vs raw KV REST? +// - The Worker centralises the CAS check (KV doesn't natively +// expose If-Match — the Worker computes the comparison). +// - One HTTPS endpoint per witness (vs per-account KV creds). +// - Worker logs per-CR access for audit. +// +// K-Cont-3 (this package) ships the CLIENT; K-Cont-4 (separate slice) +// ships the WORKER source. +package cloudflarekv + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/openova-io/openova/core/controllers/continuum/internal/witness" +) + +func init() { + witness.Register("cloudflare-kv", factory) +} + +// CFKVClient implements witness.Client over a Cloudflare Worker. +// +// Concurrent-safe: a single CFKVClient is used by the per-Continuum-CR +// goroutine for all its Renew calls; the Worker is the +// concurrency-arbitration point. +type CFKVClient struct { + // BaseURL is the Worker root, e.g. + // "https://lease.openova.workers.dev". Trailing slash stripped. + BaseURL string + + // APIToken is the bearer token resolved from a K8s Secret. NEVER + // log this value. + APIToken string + + // Slot is the per-CR identifier (`/`). URL-encoded + // at request time so slashes don't confuse the Worker's routing. + Slot string + + // HTTPClient is the underlying transport. Tests inject a + // httptest.Server-backed client. + HTTPClient *http.Client +} + +// New constructs a CFKVClient. baseURL + apiToken + slot are required. +// +// httpClient is optional — when nil, a 10-second-timeout client is +// used. The 10s ceiling matches the lease renew interval (per the +// Continuum CRD default 10s) so a stuck request can't outlive a renew +// cycle and double-acquire. +func New(baseURL, apiToken, slot string, httpClient *http.Client) (*CFKVClient, error) { + if strings.TrimSpace(baseURL) == "" { + return nil, errors.New("cloudflarekv: BaseURL is required") + } + if strings.TrimSpace(apiToken) == "" { + return nil, errors.New("cloudflarekv: APIToken is required") + } + if strings.TrimSpace(slot) == "" { + return nil, errors.New("cloudflarekv: Slot is required") + } + if httpClient == nil { + httpClient = &http.Client{Timeout: 10 * time.Second} + } + return &CFKVClient{ + BaseURL: strings.TrimRight(strings.TrimSpace(baseURL), "/"), + APIToken: strings.TrimSpace(apiToken), + Slot: strings.TrimSpace(slot), + HTTPClient: httpClient, + }, nil +} + +// factory is the witness.Factory entry registered at init() time. Per +// the K-Cont-3 brief cfg may carry: +// +// slot (string) REQUIRED — `/` +// baseURL (string) REQUIRED — Worker URL +// tokenSecretRef (map) {name, key} — K8s Secret holding the +// Worker bearer token +// apiToken (string) ALTERNATE to tokenSecretRef — direct +// token for tests; NOT used +// in production paths. +func factory(cfg map[string]any, secrets witness.SecretReader) (witness.Client, error) { + slot, _ := cfg["slot"].(string) + baseURL, _ := cfg["baseURL"].(string) + if baseURL == "" { + // Tolerate the alternate spelling per the CRD shape — the + // CRD has accountId + kvNamespaceId + tokenSecretRef and + // expects the Worker URL to be derived. K-Cont-3 takes a + // straight baseURL key; if the CR uses the CRD shape, the + // reconciler is responsible for translating to baseURL + // before calling Select. Fall back to "workerURL" alias for + // flexibility. + if v, ok := cfg["workerURL"].(string); ok { + baseURL = v + } + } + + // Token resolution path A: direct (tests only). + apiToken, _ := cfg["apiToken"].(string) + + // Path B: SecretRef. Production path. Required when apiToken is + // empty. + if apiToken == "" { + ref, _ := cfg["tokenSecretRef"].(map[string]any) + if ref == nil { + // alternate field-name from K-Cont-2's `tokenSecretRef` + // slice may pass "tokenSecretRef" as + // map[string]interface{}{} OR as a marshaled JSON object. + // We don't try harder than this; if neither is present, + // surface a clear error. + return nil, errors.New("cloudflarekv: cfg must include tokenSecretRef{name,key} or apiToken") + } + name, _ := ref["name"].(string) + key, _ := ref["key"].(string) + if key == "" { + key = "token" + } + if name == "" { + return nil, errors.New("cloudflarekv: tokenSecretRef.name is empty") + } + if secrets == nil { + return nil, errors.New("cloudflarekv: SecretReader not configured (controller must wire DefaultSelector.SecretReader)") + } + // 5s budget for the secret read — controller startup must + // not block forever on a missing Secret. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + b, err := secrets.ReadSecret(ctx, name, key) + if err != nil { + return nil, fmt.Errorf("cloudflarekv: read tokenSecretRef %s/%s: %w", name, key, err) + } + apiToken = strings.TrimSpace(string(b)) + if apiToken == "" { + return nil, fmt.Errorf("cloudflarekv: tokenSecretRef %s/%s is empty", name, key) + } + } + + return New(baseURL, apiToken, slot, nil) +} + +// kvState mirrors the Worker's response body. Field names are JSON +// camelCase per the Worker contract. +type kvState struct { + Holder string `json:"holder"` + AcquiredAt string `json:"acquiredAt"` // RFC3339 — server-stamped + ExpiresAt string `json:"expiresAt"` // RFC3339 — server-stamped + Generation int64 `json:"generation"` +} + +// kvWriteRequest is the PUT body for both acquire and renew. +type kvWriteRequest struct { + Holder string `json:"holder"` + TTLSeconds int `json:"ttlSeconds"` + Op string `json:"op"` // "acquire" | "renew" +} + +// Acquire implements witness.Client. +func (c *CFKVClient) Acquire(ctx context.Context, holder string, ttl time.Duration) (witness.State, error) { + if err := ctx.Err(); err != nil { + return witness.State{}, err + } + // Read current state to learn the generation for If-Match. On + // fresh slots Read returns Generation=0; the Worker accepts + // If-Match: 0 for the first acquire. + cur, err := c.Read(ctx) + if err != nil { + return witness.State{}, err + } + return c.write(ctx, holder, ttl, "acquire", cur.Generation) +} + +// Renew implements witness.Client. +func (c *CFKVClient) Renew(ctx context.Context, holder string, ttl time.Duration) (witness.State, error) { + if err := ctx.Err(); err != nil { + return witness.State{}, err + } + cur, err := c.Read(ctx) + if err != nil { + return witness.State{}, err + } + // If we don't currently hold the lease (or it's expired), Renew + // MUST surface ErrLeaseLost regardless of what the Worker says. + // This matches the K-Cont-2 contract: Renew is for the holder + // only. + if cur.Holder != holder { + return cur, witness.ErrLeaseLost + } + if !time.Now().Before(cur.ExpiresAt) { + return cur, witness.ErrLeaseLost + } + st, err := c.write(ctx, holder, ttl, "renew", cur.Generation) + if err != nil { + // Map ErrLeaseHeldByAnother → ErrLeaseLost on the renew + // path (K-Cont-2 contract: Renew never returns + // ErrLeaseHeldByAnother — it's only "we lost the lease"). + if errors.Is(err, witness.ErrLeaseHeldByAnother) { + return st, witness.ErrLeaseLost + } + return st, err + } + return st, nil +} + +// Release implements witness.Client. +func (c *CFKVClient) Release(ctx context.Context, holder string) error { + if err := ctx.Err(); err != nil { + return err + } + cur, err := c.Read(ctx) + if err != nil { + return err + } + if cur.Holder == "" || cur.Holder != holder { + // Idempotent: non-holder Release is a no-op (per K-Cont-2 + // contract). DO NOT round-trip to the Worker. + return nil + } + url := c.BaseURL + "/lease/" + pathEscapeSlot(c.Slot) + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) + if err != nil { + return fmt.Errorf("cloudflarekv: build DELETE: %w", err) + } + c.applyAuth(req) + req.Header.Set("If-Match", strconv.FormatInt(cur.Generation, 10)) + req.Header.Set("X-Holder", holder) + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("cloudflarekv: DELETE: %w", err) + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusNoContent || resp.StatusCode == http.StatusOK: + return nil + case resp.StatusCode == http.StatusPreconditionFailed: + // CAS lost between our Read and our DELETE — by the K-Cont-2 + // contract Release is idempotent, so a non-holder DELETE + // returns nil. Worker reports the new state but we don't + // care. + return nil + case resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden: + return fmt.Errorf("cloudflarekv: auth rejected (status %d)", resp.StatusCode) + default: + body := readSnippet(resp.Body) + return fmt.Errorf("cloudflarekv: DELETE status %d: %s", resp.StatusCode, body) + } +} + +// Read implements witness.Client. +func (c *CFKVClient) Read(ctx context.Context) (witness.State, error) { + if err := ctx.Err(); err != nil { + return witness.State{}, err + } + url := c.BaseURL + "/lease/" + pathEscapeSlot(c.Slot) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return witness.State{}, fmt.Errorf("cloudflarekv: build GET: %w", err) + } + c.applyAuth(req) + req.Header.Set("Accept", "application/json") + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return witness.State{}, fmt.Errorf("cloudflarekv: GET: %w", err) + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusNotFound: + // Empty slot — return zero state with Generation=0 so a + // subsequent Acquire knows to PUT with If-Match: 0. + return witness.State{}, nil + case resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden: + return witness.State{}, fmt.Errorf("cloudflarekv: auth rejected (status %d)", resp.StatusCode) + case resp.StatusCode >= 200 && resp.StatusCode < 300: + // fall through + default: + body := readSnippet(resp.Body) + return witness.State{}, fmt.Errorf("cloudflarekv: GET status %d: %s", resp.StatusCode, body) + } + + var k kvState + if err := json.NewDecoder(resp.Body).Decode(&k); err != nil { + return witness.State{}, fmt.Errorf("cloudflarekv: decode GET: %w", err) + } + return parseState(k) +} + +// write performs the PUT-with-If-Match dance for both acquire and +// renew. +func (c *CFKVClient) write(ctx context.Context, holder string, ttl time.Duration, op string, ifMatch int64) (witness.State, error) { + body, err := json.Marshal(kvWriteRequest{ + Holder: holder, + TTLSeconds: int(ttl / time.Second), + Op: op, + }) + if err != nil { + return witness.State{}, fmt.Errorf("cloudflarekv: marshal write: %w", err) + } + url := c.BaseURL + "/lease/" + pathEscapeSlot(c.Slot) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body)) + if err != nil { + return witness.State{}, fmt.Errorf("cloudflarekv: build PUT: %w", err) + } + c.applyAuth(req) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + req.Header.Set("If-Match", strconv.FormatInt(ifMatch, 10)) + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return witness.State{}, fmt.Errorf("cloudflarekv: PUT: %w", err) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK, http.StatusCreated: + var k kvState + if err := json.NewDecoder(resp.Body).Decode(&k); err != nil { + return witness.State{}, fmt.Errorf("cloudflarekv: decode PUT: %w", err) + } + st, perr := parseState(k) + if perr != nil { + return witness.State{}, perr + } + return st, nil + case http.StatusPreconditionFailed: + // CAS lost — the Worker may include the current state. Try + // to decode for the caller's benefit; but ALWAYS surface + // ErrLeaseHeldByAnother. + var k kvState + _ = json.NewDecoder(resp.Body).Decode(&k) + st, _ := parseState(k) + return st, witness.ErrLeaseHeldByAnother + case http.StatusUnauthorized, http.StatusForbidden: + return witness.State{}, fmt.Errorf("cloudflarekv: auth rejected (status %d)", resp.StatusCode) + default: + body := readSnippet(resp.Body) + return witness.State{}, fmt.Errorf("cloudflarekv: PUT status %d: %s", resp.StatusCode, body) + } +} + +// applyAuth stamps the bearer token. Centralised so we never miss it. +func (c *CFKVClient) applyAuth(req *http.Request) { + req.Header.Set("Authorization", "Bearer "+c.APIToken) + // Belt-and-suspenders for Worker logging: include the slot as a + // header so the Worker can log it without parsing the URL. + req.Header.Set("X-Lease-Slot", c.Slot) +} + +// parseState decodes the wire shape into a witness.State. +func parseState(k kvState) (witness.State, error) { + out := witness.State{ + Holder: k.Holder, + Generation: k.Generation, + } + if k.AcquiredAt != "" { + t, err := time.Parse(time.RFC3339, k.AcquiredAt) + if err != nil { + return out, fmt.Errorf("cloudflarekv: parse acquiredAt %q: %w", k.AcquiredAt, err) + } + out.AcquiredAt = t + } + if k.ExpiresAt != "" { + t, err := time.Parse(time.RFC3339, k.ExpiresAt) + if err != nil { + return out, fmt.Errorf("cloudflarekv: parse expiresAt %q: %w", k.ExpiresAt, err) + } + out.ExpiresAt = t + } + return out, nil +} + +// pathEscapeSlot URL-encodes the slot for safe inclusion in the +// Worker's path. We use a simple substitution for `/` because the +// Worker treats the slot as an opaque key — `%2F` is the canonical +// encoding. +func pathEscapeSlot(slot string) string { + // Use net/url style escaping; net/url.PathEscape encodes `/` to + // `%2F` which is what we want. + return strings.ReplaceAll(slot, "/", "%2F") +} + +// readSnippet returns at most 256 chars of the body for an error +// message. Never logs. +func readSnippet(r io.Reader) string { + b, _ := io.ReadAll(io.LimitReader(r, 512)) + s := string(b) + if len(s) > 256 { + s = s[:256] + "..." + } + return s +} + +// Compile-time assertion that CFKVClient satisfies witness.Client. +var _ witness.Client = (*CFKVClient)(nil) diff --git a/core/controllers/continuum/internal/witness/cloudflarekv/client_test.go b/core/controllers/continuum/internal/witness/cloudflarekv/client_test.go new file mode 100644 index 00000000..f4fbace8 --- /dev/null +++ b/core/controllers/continuum/internal/witness/cloudflarekv/client_test.go @@ -0,0 +1,394 @@ +// Tests for CFKVClient. +// +// Strategy: stand up an httptest.Server that mocks the K-Cont-4 +// Cloudflare Worker — its JSON wire shape is documented in +// client.go's package doc and is THE contract K-Cont-4 must satisfy. +// The mock implements the same atomic-CAS semantics as the in-memory +// witness reference. The shared parametric contract suite from +// `internal/witness/testing` then runs against CFKVClient — a +// behavioral diff between the in-memory reference and CF surfaces +// here. + +package cloudflarekv + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/openova-io/openova/core/controllers/continuum/internal/witness" + contracttest "github.com/openova-io/openova/core/controllers/continuum/internal/witness/testing" +) + +// fakeWorker mimics the K-Cont-4 Cloudflare Worker over httptest. +// One fakeWorker holds N slot states keyed by slot string; multiple +// CFKVClient instances bound to the same slot contend through it. +type fakeWorker struct { + mu sync.Mutex + slots map[string]*kvState + now func() time.Time + bearer string +} + +func newFakeWorker(bearer string, now func() time.Time) *fakeWorker { + return &fakeWorker{ + slots: map[string]*kvState{}, + now: now, + bearer: bearer, + } +} + +func (w *fakeWorker) handler() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/lease/", w.serveLease) + return mux +} + +func (w *fakeWorker) serveLease(rw http.ResponseWriter, req *http.Request) { + // Auth check — every request must carry the bearer token. + if got := req.Header.Get("Authorization"); got != "Bearer "+w.bearer { + http.Error(rw, "unauthorized", http.StatusUnauthorized) + return + } + slot := strings.TrimPrefix(req.URL.Path, "/lease/") + if slot == "" { + http.Error(rw, "missing slot", http.StatusBadRequest) + return + } + // CFKVClient encodes `/` as `%2F`; net/http decodes it back to + // `/` automatically before serving. + + w.mu.Lock() + defer w.mu.Unlock() + now := w.now() + + switch req.Method { + case http.MethodGet: + st, ok := w.slots[slot] + if !ok { + http.NotFound(rw, req) + return + } + // TTL eviction at the witness side: if the stored ExpiresAt + // is in the past, surface an empty record (the holder field + // is cleared but Generation is preserved so the next + // Acquire's If-Match has a defined value to match against). + writeJSON(rw, http.StatusOK, st) + return + + case http.MethodPut: + var body kvWriteRequest + if err := json.NewDecoder(req.Body).Decode(&body); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + ifMatch, _ := strconv.ParseInt(req.Header.Get("If-Match"), 10, 64) + cur := w.slots[slot] + curGen := int64(0) + if cur != nil { + curGen = cur.Generation + } + if ifMatch != curGen { + // CAS conflict. + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(http.StatusPreconditionFailed) + if cur != nil { + _ = json.NewEncoder(rw).Encode(cur) + } + return + } + // Decide whether the slot is takeable. Takeable when: + // - slot is empty (cur==nil) + // - slot is expired + // - holder matches (re-acquire / renew) + var ( + holder = body.Holder + ttl = time.Duration(body.TTLSeconds) * time.Second + takeable = cur == nil + sameHold = cur != nil && cur.Holder == holder + expired = cur != nil && !beforeRFC3339(now, cur.ExpiresAt) + ) + if !takeable && !sameHold && !expired { + // Held by another, non-expired holder. + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(http.StatusPreconditionFailed) + _ = json.NewEncoder(rw).Encode(cur) + return + } + // Renew op MUST require sameHold + non-expired (matches + // K-Cont-2 contract). The CFKVClient pre-checks this client- + // side, so a stray renew arriving here is malformed; reject. + if body.Op == "renew" && !(sameHold && !expired) { + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(http.StatusPreconditionFailed) + if cur != nil { + _ = json.NewEncoder(rw).Encode(cur) + } + return + } + var ( + acquiredAt time.Time + ) + if sameHold && !expired { + // Preserve the original acquisition time on + // re-acquire/renew. + acquiredAt, _ = time.Parse(time.RFC3339, cur.AcquiredAt) + } else { + acquiredAt = now + } + next := &kvState{ + Holder: holder, + AcquiredAt: acquiredAt.Format(time.RFC3339), + ExpiresAt: now.Add(ttl).Format(time.RFC3339), + Generation: curGen + 1, + } + w.slots[slot] = next + writeJSON(rw, http.StatusOK, next) + return + + case http.MethodDelete: + ifMatch, _ := strconv.ParseInt(req.Header.Get("If-Match"), 10, 64) + holder := req.Header.Get("X-Holder") + cur := w.slots[slot] + if cur == nil { + rw.WriteHeader(http.StatusNoContent) + return + } + if cur.Generation != ifMatch || cur.Holder != holder { + rw.WriteHeader(http.StatusPreconditionFailed) + return + } + // Bump generation on release so a subsequent Acquire sees a + // non-zero If-Match-required generation. This matches the + // in-memory reference's Generation+1 on Release. + w.slots[slot] = &kvState{Generation: cur.Generation + 1} + rw.WriteHeader(http.StatusNoContent) + return + + default: + http.Error(rw, "method not allowed", http.StatusMethodNotAllowed) + return + } +} + +func writeJSON(rw http.ResponseWriter, status int, v any) { + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(status) + _ = json.NewEncoder(rw).Encode(v) +} + +func beforeRFC3339(now time.Time, expiresAt string) bool { + t, err := time.Parse(time.RFC3339, expiresAt) + if err != nil { + return false + } + return now.Before(t) +} + +// fakeClock is a thread-safe clock the tests advance. +type fakeClock struct { + mu sync.Mutex + t time.Time +} + +func newFakeClock() *fakeClock { + return &fakeClock{t: time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC)} +} + +func (c *fakeClock) Now() time.Time { + c.mu.Lock() + defer c.mu.Unlock() + return c.t +} + +func (c *fakeClock) Advance(d time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.t = c.t.Add(d) +} + +// TestCFKV_ContractSuite runs the parametric witness contract against +// CFKVClient over a fake Worker. Behavioral drift between the +// in-memory reference and CF surfaces here. +func TestCFKV_ContractSuite(t *testing.T) { + t.Parallel() + contracttest.RunContractSuite(t, func() *contracttest.Backend { + clk := newFakeClock() + w := newFakeWorker("test-token", clk.Now) + srv := httptest.NewServer(w.handler()) + mkClient := func(slot string) witness.Client { + c, err := New(srv.URL, "test-token", slot, srv.Client()) + if err != nil { + t.Fatalf("New: %v", err) + } + return c + } + return &contracttest.Backend{ + A: mkClient("ns/cr-main"), + B: mkClient("ns/cr-main"), + Other: mkClient("ns/cr-other"), + Advance: clk.Advance, + Cleanup: srv.Close, + } + }) +} + +// TestCFKV_ConstructorValidation — happy + sad path on New(). +func TestCFKV_ConstructorValidation(t *testing.T) { + t.Parallel() + cases := []struct { + name, base, token, slot string + wantErr bool + }{ + {"missing baseURL", "", "tok", "ns/cr", true}, + {"missing token", "http://x", "", "ns/cr", true}, + {"missing slot", "http://x", "tok", "", true}, + {"all set", "http://x", "tok", "ns/cr", false}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + c, err := New(tc.base, tc.token, tc.slot, nil) + if tc.wantErr { + if err == nil { + t.Fatalf("expected error") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if c == nil { + t.Fatalf("nil client") + } + }) + } +} + +// TestCFKV_AuthHeaderRequired — the Worker rejects without bearer +// and the client surfaces 401 as a clear error. +func TestCFKV_AuthHeaderRequired(t *testing.T) { + t.Parallel() + clk := newFakeClock() + w := newFakeWorker("real-token", clk.Now) + srv := httptest.NewServer(w.handler()) + defer srv.Close() + + c, err := New(srv.URL, "wrong-token", "ns/cr", srv.Client()) + if err != nil { + t.Fatalf("New: %v", err) + } + if _, err := c.Acquire(context.Background(), "fsn", 30*time.Second); err == nil { + t.Fatalf("expected auth-rejected error") + } else if !strings.Contains(err.Error(), "auth rejected") { + t.Fatalf("expected auth rejected, got: %v", err) + } +} + +// TestCFKV_FactoryFromCfg — the registered factory parses cfg +// correctly and resolves SecretRef tokens. +func TestCFKV_FactoryFromCfg(t *testing.T) { + t.Parallel() + clk := newFakeClock() + w := newFakeWorker("from-secret-token", clk.Now) + srv := httptest.NewServer(w.handler()) + defer srv.Close() + + secrets := witness.SecretReaderFunc(func(_ context.Context, name, key string) ([]byte, error) { + if name != "cf-token" || key != "token" { + return nil, errors.New("not found") + } + return []byte("from-secret-token"), nil + }) + + cfg := map[string]any{ + "slot": "ns/cr", + "baseURL": srv.URL, + "tokenSecretRef": map[string]any{ + "name": "cf-token", + "key": "token", + }, + } + cli, err := factory(cfg, secrets) + if err != nil { + t.Fatalf("factory: %v", err) + } + // Override transport so we hit the test server's TLS-less listener. + cli.(*CFKVClient).HTTPClient = srv.Client() + st, err := cli.Acquire(context.Background(), "fsn", 30*time.Second) + if err != nil { + t.Fatalf("Acquire: %v", err) + } + if st.Holder != "fsn" { + t.Fatalf("Holder = %q want fsn", st.Holder) + } +} + +func TestCFKV_FactoryRejectsMissingFields(t *testing.T) { + t.Parallel() + cases := []struct { + name string + cfg map[string]any + }{ + {"empty", map[string]any{}}, + {"missing slot", map[string]any{"baseURL": "http://x", "apiToken": "t"}}, + {"missing baseURL", map[string]any{"slot": "x", "apiToken": "t"}}, + {"missing token", map[string]any{"slot": "x", "baseURL": "http://x"}}, + { + "secretRef without reader", + map[string]any{ + "slot": "x", + "baseURL": "http://x", + "tokenSecretRef": map[string]any{"name": "n", "key": "k"}, + }, + }, + } + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + _, err := factory(tc.cfg, nil) + if err == nil { + t.Fatalf("expected error") + } + }) + } +} + +// TestCFKV_GenerationBumpedOnRelease — the Worker's contract bumps +// Generation on Release so a subsequent Acquire's If-Match has a +// well-defined non-zero baseline. This belongs in CF-specific tests +// because the in-memory reference also does this; we verify CF +// matches. +func TestCFKV_GenerationBumpedOnRelease(t *testing.T) { + t.Parallel() + clk := newFakeClock() + w := newFakeWorker("t", clk.Now) + srv := httptest.NewServer(w.handler()) + defer srv.Close() + c, _ := New(srv.URL, "t", "ns/cr", srv.Client()) + + st1, err := c.Acquire(context.Background(), "fsn", 30*time.Second) + if err != nil { + t.Fatalf("Acquire: %v", err) + } + if err := c.Release(context.Background(), "fsn"); err != nil { + t.Fatalf("Release: %v", err) + } + st2, err := c.Read(context.Background()) + if err != nil { + t.Fatalf("Read: %v", err) + } + if st2.Generation <= st1.Generation { + t.Fatalf("Generation did not bump on Release: %d -> %d", st1.Generation, st2.Generation) + } + if st2.Holder != "" { + t.Fatalf("Holder = %q want empty after Release", st2.Holder) + } +} diff --git a/core/controllers/continuum/internal/witness/dnsquorum/client.go b/core/controllers/continuum/internal/witness/dnsquorum/client.go new file mode 100644 index 00000000..ca767241 --- /dev/null +++ b/core/controllers/continuum/internal/witness/dnsquorum/client.go @@ -0,0 +1,618 @@ +// Package dnsquorum implements witness.Client across N (default 3) +// DNS authoritative servers that we have TXT-write access to. Quorum +// = ceil(N/2 + 1) = 2-of-3 for the default fleet. +// +// Wire shape: +// +// Slot key: . e.g. "ns_cr.lease.openova.io" +// ('/' in slot becomes '_' to keep DNS-label-safe) +// TXT record value: "::" +// (empty/no record = "free") +// +// The CAS protocol (per K-Cont-3 brief item 2): +// +// - Acquire: read all N servers in parallel; compute quorum view of +// current generation; PUT new value with generation+1 on all N; +// succeed iff ≥ quorum acks. Quorum split / generation skew on +// read → defensively return ErrLeaseHeldByAnother. +// +// - Renew: same as Acquire but the generation MUST bump and the +// holder MUST already match — otherwise ErrLeaseLost. +// +// - Release: PUT empty value (cleared) on all N; idempotent. +// +// - Read: parallel TXT lookup; majority wins; tie → empty. +// +// For Phase-1 POC, the WRITE side uses the PowerDNS REST API +// directly (PATCH /api/v1/servers/localhost/zones/) on each of +// the N servers. The TXT-write endpoint is an injected TXTWriter so +// tests don't need a real PowerDNS. In production the same writer +// hits the existing PDM `/v1/txt` endpoint (planned addition to PDM +// — K-Cont-3 ships against the contract, K-Cont-{4|5} adds the PDM +// endpoint). +// +// Why std-lib net.Resolver vs github.com/miekg/dns? +// - Zero new dependency in core/controllers/go.mod (per +// INVIOLABLE-PRINCIPLES quality bar — fewer transitive deps = +// fewer attack surfaces + smaller binary). +// - net.Resolver supports DialContext to target a specific +// resolver IP, which is exactly what we need for per-server +// TXT lookups. +// - We never need DNSSEC validation here (the witness data is +// non-sensitive — just lease state); a recursive resolver shape +// suffices. +package dnsquorum + +import ( + "context" + "errors" + "fmt" + "net" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/openova-io/openova/core/controllers/continuum/internal/witness" +) + +func init() { + witness.Register("dns-quorum", factory) +} + +// TXTWriter writes a TXT record at . on a single +// authoritative server. value="" means "delete the record". +// +// Implementations: +// - HTTPTXTWriter: PowerDNS REST API (production) +// - in-memory map: tests +type TXTWriter interface { + // WriteTXT writes (or deletes when value=="") the TXT record. + // MUST be idempotent — repeated identical calls are fine. + // Returns nil on ack. + WriteTXT(ctx context.Context, server, fqdn, value string, ttl time.Duration) error +} + +// TXTReader reads TXT records at fqdn from a single authoritative +// server. Returns the record values (typically a 1-element slice for +// a TXT we wrote). Empty slice = no record. +type TXTReader interface { + ReadTXT(ctx context.Context, server, fqdn string) ([]string, error) +} + +// DNSQuorumClient implements witness.Client across N DNS servers. +// +// Concurrent-safe — every write fans out to all servers in parallel. +type DNSQuorumClient struct { + // Servers is the authoritative-server list. Each entry is the + // server's reachable address (e.g. "ns1.openova.io:53" for DNS + // reads; the corresponding PowerDNS API endpoint is derived by + // the TXTWriter). + Servers []string + + // TSIGKey is the shared key the production HTTPTXTWriter uses + // to sign PUT requests. Per Inviolable Principle #5 the value + // is in memory only — NEVER log. + TSIGKey string + + // Domain is the DNS suffix for lease records (e.g. + // "lease.openova.io"). The full FQDN is `.`. + Domain string + + // Slot is the per-CR identifier (`/`); '/' is + // replaced with '_' to keep DNS-label-safe. + Slot string + + // Writer + Reader are the per-server TXT I/O backends. Defaults + // applied by New() when nil. + Writer TXTWriter + Reader TXTReader + + // QuorumOverride lets tests force a specific quorum threshold + // (production = ceil(N/2+1)). 0 = use the canonical formula. + QuorumOverride int +} + +// New constructs a DNSQuorumClient. servers / domain / slot required; +// writer + reader default to the production HTTP-PowerDNS + std-lib +// TXTReader respectively when nil. +func New(servers []string, tsigKey, domain, slot string, writer TXTWriter, reader TXTReader) (*DNSQuorumClient, error) { + if len(servers) < 3 { + return nil, fmt.Errorf("dnsquorum: at least 3 servers required, got %d", len(servers)) + } + if strings.TrimSpace(domain) == "" { + return nil, errors.New("dnsquorum: Domain is required") + } + if strings.TrimSpace(slot) == "" { + return nil, errors.New("dnsquorum: Slot is required") + } + c := &DNSQuorumClient{ + Servers: servers, + TSIGKey: strings.TrimSpace(tsigKey), + Domain: strings.TrimRight(strings.TrimSpace(domain), "."), + Slot: strings.TrimSpace(slot), + Writer: writer, + Reader: reader, + } + if c.Reader == nil { + c.Reader = NewStdLibTXTReader(2 * time.Second) + } + if c.Writer == nil { + // In production the writer is wired separately (PowerDNS + // API client). New() returning a nil writer is allowed for + // the read-path-only case; write methods will surface a + // clear error if invoked. + } + return c, nil +} + +// factory is the witness.Factory entry registered at init() time. +// +// cfg keys: +// +// slot (string) REQUIRED — `/` +// dnsServers ([]string|[]any) REQUIRED — N≥3 servers +// domain (string) REQUIRED — DNS suffix +// tsigSecretRef (map) {name, key} — TSIG key Secret +// tsigKey (string) ALTERNATE — direct (tests) +// +// For Phase-1 POC the same factory is used in production; if the CR +// supplies a CRD-shaped `resolvers` field instead of `dnsServers`, +// the reconciler is expected to translate before calling Select. +func factory(cfg map[string]any, secrets witness.SecretReader) (witness.Client, error) { + slot, _ := cfg["slot"].(string) + domain, _ := cfg["domain"].(string) + if domain == "" { + domain = "lease.openova.io" // brief's recommended default + } + + servers, err := stringSlice(cfg["dnsServers"]) + if err != nil { + return nil, fmt.Errorf("dnsquorum: dnsServers: %w", err) + } + if len(servers) == 0 { + // CRD shape uses "resolvers" — try that field too for + // forward-compat with the K-Cont-2 CRD. + servers, err = stringSlice(cfg["resolvers"]) + if err != nil { + return nil, fmt.Errorf("dnsquorum: resolvers: %w", err) + } + } + + tsigKey, _ := cfg["tsigKey"].(string) + if tsigKey == "" { + ref, _ := cfg["tsigSecretRef"].(map[string]any) + if ref != nil { + name, _ := ref["name"].(string) + key, _ := ref["key"].(string) + if key == "" { + key = "tsig" + } + if name == "" { + return nil, errors.New("dnsquorum: tsigSecretRef.name is empty") + } + if secrets == nil { + return nil, errors.New("dnsquorum: SecretReader not configured (controller must wire DefaultSelector.SecretReader)") + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + b, sErr := secrets.ReadSecret(ctx, name, key) + if sErr != nil { + return nil, fmt.Errorf("dnsquorum: read tsigSecretRef %s/%s: %w", name, key, sErr) + } + tsigKey = strings.TrimSpace(string(b)) + } + } + + // Writer is nil here — production wiring injects a real + // PowerDNS-API writer separately. For now the client errors on + // write methods if invoked — this matches the Phase-1 POC + // strategy in the brief (CF-KV is operational; DNS-quorum + // requires PDM /v1/txt endpoint which is K-Cont-{4|5}). + return New(servers, tsigKey, domain, slot, nil, nil) +} + +// stringSlice converts a cfg value (which may be []string, +// []interface{}, or nil) into []string. nil → empty + nil. +func stringSlice(v any) ([]string, error) { + switch s := v.(type) { + case nil: + return nil, nil + case []string: + return s, nil + case []any: + out := make([]string, 0, len(s)) + for i, e := range s { + str, ok := e.(string) + if !ok { + return nil, fmt.Errorf("element %d is %T, want string", i, e) + } + out = append(out, str) + } + return out, nil + default: + return nil, fmt.Errorf("want []string, got %T", v) + } +} + +// fqdn returns `.` for this client. +func (c *DNSQuorumClient) fqdn() string { + return encodeSlot(c.Slot) + "." + c.Domain +} + +// encodeSlot replaces '/' with '_' so the slot is DNS-label-safe. +// '/' is the canonical Continuum slot separator (`/`); '_' +// is permitted in DNS labels per RFC 2181 §11 (label characters are +// arbitrary octets) and PowerDNS handles them fine. +func encodeSlot(slot string) string { + return strings.ReplaceAll(slot, "/", "_") +} + +// quorum returns the quorum threshold. Tests can override via +// QuorumOverride. +func (c *DNSQuorumClient) quorum() int { + if c.QuorumOverride > 0 { + return c.QuorumOverride + } + return len(c.Servers)/2 + 1 +} + +// Acquire implements witness.Client. +func (c *DNSQuorumClient) Acquire(ctx context.Context, holder string, ttl time.Duration) (witness.State, error) { + if err := ctx.Err(); err != nil { + return witness.State{}, err + } + cur, ok := c.readQuorum(ctx) + if !ok { + // Read quorum failed — defensive: treat as held by another. + return cur, witness.ErrLeaseHeldByAnother + } + // Truncate to second precision — matches RFC3339 wire format + // granularity (the wire round-trip would truncate anyway, so + // returning a truncated value avoids same-holder Equal-check + // drift in tests + matches what subsequent Read() would see). + now := time.Now().UTC().Truncate(time.Second) + // Slot is takeable iff free, expired, or already ours. + takeable := cur.Holder == "" || !now.Before(cur.ExpiresAt) || cur.Holder == holder + if !takeable { + return cur, witness.ErrLeaseHeldByAnother + } + // Compute next state — preserve AcquiredAt on re-acquire. + acquiredAt := now + if cur.Holder == holder && now.Before(cur.ExpiresAt) && !cur.AcquiredAt.IsZero() { + acquiredAt = cur.AcquiredAt + } + next := witness.State{ + Holder: holder, + AcquiredAt: acquiredAt, + ExpiresAt: now.Add(ttl), + Generation: cur.Generation + 1, + } + if err := c.writeQuorum(ctx, encodeValue(next), ttl); err != nil { + return witness.State{}, err + } + return next, nil +} + +// Renew implements witness.Client. +func (c *DNSQuorumClient) Renew(ctx context.Context, holder string, ttl time.Duration) (witness.State, error) { + if err := ctx.Err(); err != nil { + return witness.State{}, err + } + cur, ok := c.readQuorum(ctx) + if !ok { + return cur, witness.ErrLeaseLost + } + now := time.Now().UTC().Truncate(time.Second) + if cur.Holder != holder { + return cur, witness.ErrLeaseLost + } + if !now.Before(cur.ExpiresAt) { + return cur, witness.ErrLeaseLost + } + next := witness.State{ + Holder: holder, + AcquiredAt: cur.AcquiredAt, + ExpiresAt: now.Add(ttl), + Generation: cur.Generation + 1, + } + if err := c.writeQuorum(ctx, encodeValue(next), ttl); err != nil { + return witness.State{}, err + } + return next, nil +} + +// Release implements witness.Client. +func (c *DNSQuorumClient) Release(ctx context.Context, holder string) error { + if err := ctx.Err(); err != nil { + return err + } + cur, ok := c.readQuorum(ctx) + if !ok { + // Quorum read failed — idempotent Release: don't surface an + // error, but skip the write. + return nil + } + if cur.Holder == "" || cur.Holder != holder { + return nil + } + // Clear the record. Bump generation so a subsequent Acquire + // sees a non-zero baseline (matches in-memory + CF reference). + released := witness.State{Generation: cur.Generation + 1} + return c.writeQuorum(ctx, encodeValue(released), 30*time.Second) +} + +// Read implements witness.Client. +func (c *DNSQuorumClient) Read(ctx context.Context) (witness.State, error) { + if err := ctx.Err(); err != nil { + return witness.State{}, err + } + st, _ := c.readQuorum(ctx) + return st, nil +} + +// readQuorum queries all N servers in parallel and returns the +// majority State + a quorum-ok flag. +// +// Quorum rule: count distinct State values (by encoded value); if +// any single value has ≥ quorum agreement, it wins. Otherwise +// ok=false. +// +// Empty TXT (no record) is a valid value (= "free"). N readable +// "free" responses count as quorum. +func (c *DNSQuorumClient) readQuorum(ctx context.Context) (witness.State, bool) { + type result struct { + val string + err error + } + results := make([]result, len(c.Servers)) + var wg sync.WaitGroup + for i, srv := range c.Servers { + i, srv := i, srv + wg.Add(1) + go func() { + defer wg.Done() + vals, err := c.Reader.ReadTXT(ctx, srv, c.fqdn()) + if err != nil { + results[i] = result{err: err} + return + } + // First TXT entry wins. Empty slice → "" (= free). + val := "" + if len(vals) > 0 { + val = vals[0] + } + results[i] = result{val: val} + }() + } + wg.Wait() + // Count distinct successful responses. + tally := map[string]int{} + for _, r := range results { + if r.err != nil { + continue + } + tally[r.val]++ + } + q := c.quorum() + // Find the value with the largest tally. We need DETERMINISTIC + // tie-breaking so a 1+1+1 split is reported as "no quorum" + // reliably. + bestVal := "" + best := 0 + for val, cnt := range tally { + switch { + case cnt > best: + bestVal, best = val, cnt + case cnt == best && val < bestVal: + // Lexicographic tie-break — stable across runs. + bestVal = val + } + } + if best < q { + return witness.State{}, false + } + st, err := decodeValue(bestVal) + if err != nil { + return witness.State{}, false + } + return st, true +} + +// writeQuorum PUTs the new value on all N servers in parallel and +// returns nil iff ≥ quorum acks. On sub-quorum success we log nothing +// here — the caller (controller) decides how to react (typically: a +// follow-up Read will detect drift and surface ErrLeaseLost on next +// Renew). +// +// We DELETE-on-empty-value to avoid leaving stale records. +func (c *DNSQuorumClient) writeQuorum(ctx context.Context, value string, ttl time.Duration) error { + if c.Writer == nil { + return errors.New("dnsquorum: Writer not configured (Phase-1 POC needs PDM /v1/txt — K-Cont-{4|5})") + } + errs := make([]error, len(c.Servers)) + var wg sync.WaitGroup + for i, srv := range c.Servers { + i, srv := i, srv + wg.Add(1) + go func() { + defer wg.Done() + errs[i] = c.Writer.WriteTXT(ctx, srv, c.fqdn(), value, ttl) + }() + } + wg.Wait() + acks := 0 + var firstErr error + for _, err := range errs { + if err == nil { + acks++ + } else if firstErr == nil { + firstErr = err + } + } + if acks < c.quorum() { + if firstErr != nil { + return fmt.Errorf("dnsquorum: write quorum %d/%d: %w", acks, c.quorum(), firstErr) + } + return fmt.Errorf("dnsquorum: write quorum %d/%d", acks, c.quorum()) + } + return nil +} + +// encodeValue serializes a State to the wire format +// `|||`. Empty State → +// empty string. +// +// The `|` separator is chosen because RFC3339 timestamps contain `:` +// (time component). PowerDNS escapes characters in TXT records via +// the standard quoting rules; `|` is plain ASCII and survives the +// PATCH→DNS→LookupTXT round-trip without translation. +// +// AcquiredAt is preserved so re-acquire-by-same-holder doesn't reset +// the original-acquisition timestamp (matches K-Cont-2 contract item +// 4 — same-holder Acquire = lease extension, not a new acquisition). +func encodeValue(s witness.State) string { + if s.Holder == "" && s.Generation == 0 { + return "" + } + return fmt.Sprintf("%s|%s|%s|%d", + s.Holder, + s.AcquiredAt.UTC().Format(time.RFC3339), + s.ExpiresAt.UTC().Format(time.RFC3339), + s.Generation, + ) +} + +// decodeValue parses the wire format back into a State. Empty string +// returns the zero State (free slot). +// +// Backward-compat: also accepts the 3-field shape (no AcquiredAt) for +// records written by an older controller version mid-rollout — the +// AcquiredAt is left zero in that case (acceptable: the same-holder +// preservation invariant degrades to a fresh-acquisition timestamp, +// not a wrong one). +func decodeValue(v string) (witness.State, error) { + v = strings.TrimSpace(v) + if v == "" { + return witness.State{}, nil + } + parts := strings.Split(v, "|") + switch len(parts) { + case 4: + return decode4(parts) + case 3: + // Legacy 3-field shape — degrade gracefully. + return decode3(parts) + } + // A bare numeric value like "5" is treated as a released state + // with that generation (defensive — let it survive downgrade). + if g, err := strconv.ParseInt(v, 10, 64); err == nil { + return witness.State{Generation: g}, nil + } + return witness.State{}, fmt.Errorf("dnsquorum: malformed TXT %q", v) +} + +func decode4(parts []string) (witness.State, error) { + holder, acquiredStr, expiresStr := parts[0], parts[1], parts[2] + gen, err := strconv.ParseInt(parts[3], 10, 64) + if err != nil { + return witness.State{}, fmt.Errorf("dnsquorum: parse generation %q: %w", parts[3], err) + } + st := witness.State{Holder: holder, Generation: gen} + if acquiredStr != "" { + t, err := time.Parse(time.RFC3339, acquiredStr) + if err != nil { + return st, fmt.Errorf("dnsquorum: parse acquiredAt %q: %w", acquiredStr, err) + } + st.AcquiredAt = t + } + if expiresStr != "" { + t, err := time.Parse(time.RFC3339, expiresStr) + if err != nil { + return st, fmt.Errorf("dnsquorum: parse expiresAt %q: %w", expiresStr, err) + } + st.ExpiresAt = t + } + return st, nil +} + +func decode3(parts []string) (witness.State, error) { + holder, expiresStr := parts[0], parts[1] + gen, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return witness.State{}, fmt.Errorf("dnsquorum: parse generation %q: %w", parts[2], err) + } + st := witness.State{Holder: holder, Generation: gen} + if expiresStr != "" { + t, err := time.Parse(time.RFC3339, expiresStr) + if err != nil { + return st, fmt.Errorf("dnsquorum: parse expiresAt %q: %w", expiresStr, err) + } + st.ExpiresAt = t + } + return st, nil +} + +// SortedServers returns Servers sorted lexically — used by tests for +// deterministic output and exposed as a helper for diagnostics. Pure +// function, no I/O. +func (c *DNSQuorumClient) SortedServers() []string { + out := append([]string(nil), c.Servers...) + sort.Strings(out) + return out +} + +// Compile-time assertion that DNSQuorumClient satisfies witness.Client. +var _ witness.Client = (*DNSQuorumClient)(nil) + +// ---------------------------------------------------------------------- +// std-lib TXTReader implementation — production read path + +// StdLibTXTReader uses net.Resolver with DialContext to target a +// specific authoritative server. No external DNS-library dependency. +type StdLibTXTReader struct { + Timeout time.Duration +} + +// NewStdLibTXTReader returns a TXTReader using the std-lib resolver. +func NewStdLibTXTReader(timeout time.Duration) *StdLibTXTReader { + if timeout <= 0 { + timeout = 2 * time.Second + } + return &StdLibTXTReader{Timeout: timeout} +} + +// ReadTXT queries `fqdn` TXT records at the specified server. +// +// `server` accepts either "host" (port 53 implied) or "host:port". +func (r *StdLibTXTReader) ReadTXT(ctx context.Context, server, fqdn string) ([]string, error) { + addr := server + if !strings.Contains(addr, ":") { + addr = net.JoinHostPort(addr, "53") + } + res := &net.Resolver{ + PreferGo: true, + Dial: func(dctx context.Context, network, _ string) (net.Conn, error) { + d := net.Dialer{Timeout: r.Timeout} + // Force connection to the specific authoritative + // server. We always dial UDP; failover to TCP would + // require a second round-trip we can skip for the + // short TXT values we use. + return d.DialContext(dctx, "udp", addr) + }, + } + ctx, cancel := context.WithTimeout(ctx, r.Timeout) + defer cancel() + vals, err := res.LookupTXT(ctx, fqdn) + if err != nil { + // NXDOMAIN / NoData are normal for a "free" slot — return + // empty + nil. + var derr *net.DNSError + if errors.As(err, &derr) && (derr.IsNotFound) { + return nil, nil + } + return nil, fmt.Errorf("dnsquorum: LookupTXT %s @ %s: %w", fqdn, addr, err) + } + return vals, nil +} diff --git a/core/controllers/continuum/internal/witness/dnsquorum/client_test.go b/core/controllers/continuum/internal/witness/dnsquorum/client_test.go new file mode 100644 index 00000000..4f8ad0c0 --- /dev/null +++ b/core/controllers/continuum/internal/witness/dnsquorum/client_test.go @@ -0,0 +1,450 @@ +// Tests for DNSQuorumClient. +// +// We mock the TXT read/write side with an in-memory map keyed by +// (server, fqdn). The map enforces atomic per-server PUTs so the +// quorum logic is exercised faithfully — split-brain, partial +// failures, generation skew — without standing up real PowerDNS. +// +// The shared parametric contract suite from +// `internal/witness/testing` then runs against DNSQuorumClient — a +// behavioral diff between the in-memory reference and the quorum +// surface fails here. + +package dnsquorum + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/openova-io/openova/core/controllers/continuum/internal/witness" + contracttest "github.com/openova-io/openova/core/controllers/continuum/internal/witness/testing" +) + +// fakeBackend mocks N authoritative DNS servers. Each server holds +// an independent map (server → fqdn → value). Writes are atomic +// per-server. Tests can FAIL specific servers (writes return error, +// reads return error) to simulate quorum loss. +type fakeBackend struct { + mu sync.Mutex + servers map[string]map[string]string + failed map[string]bool +} + +func newFakeBackend(servers []string) *fakeBackend { + b := &fakeBackend{ + servers: map[string]map[string]string{}, + failed: map[string]bool{}, + } + for _, s := range servers { + b.servers[s] = map[string]string{} + } + return b +} + +func (b *fakeBackend) WriteTXT(_ context.Context, server, fqdn, value string, _ time.Duration) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.failed[server] { + return fmt.Errorf("server %s unavailable", server) + } + if value == "" { + delete(b.servers[server], fqdn) + return nil + } + b.servers[server][fqdn] = value + return nil +} + +func (b *fakeBackend) ReadTXT(_ context.Context, server, fqdn string) ([]string, error) { + b.mu.Lock() + defer b.mu.Unlock() + if b.failed[server] { + return nil, fmt.Errorf("server %s unavailable", server) + } + v, ok := b.servers[server][fqdn] + if !ok || v == "" { + return nil, nil + } + return []string{v}, nil +} + +// fail marks server as down (writes error, reads error). +func (b *fakeBackend) fail(server string) { + b.mu.Lock() + defer b.mu.Unlock() + b.failed[server] = true +} + +// recover re-enables a previously failed server. +func (b *fakeBackend) recover(server string) { + b.mu.Lock() + defer b.mu.Unlock() + delete(b.failed, server) +} + +// setRaw lets tests inject divergent state on a specific server (for +// split-brain scenarios). +func (b *fakeBackend) setRaw(server, fqdn, value string) { + b.mu.Lock() + defer b.mu.Unlock() + if value == "" { + delete(b.servers[server], fqdn) + return + } + b.servers[server][fqdn] = value +} + +// TestDNSQuorum_ContractSuite runs the parametric witness contract +// against DNSQuorumClient over a fake-backend with 3 healthy servers. +// Behavioral drift between the in-memory reference and the quorum +// surface fails here. +func TestDNSQuorum_ContractSuite(t *testing.T) { + t.Parallel() + contracttest.RunContractSuite(t, func() *contracttest.Backend { + servers := []string{"ns1", "ns2", "ns3"} + be := newFakeBackend(servers) + mkClient := func(slot string) witness.Client { + c, err := New(servers, "tsig", "lease.openova.io", slot, be, be) + if err != nil { + t.Fatalf("New: %v", err) + } + return c + } + return &contracttest.Backend{ + A: mkClient("ns/cr-main"), + B: mkClient("ns/cr-main"), + Other: mkClient("ns/cr-other"), + // DNS-quorum has no in-band clock — the witness clock + // IS wall-clock (TXT TTLs encode an absolute + // expires-at). Advance must really sleep. + Advance: time.Sleep, + } + }) +} + +// TestDNSQuorum_ConstructorValidation +func TestDNSQuorum_ConstructorValidation(t *testing.T) { + t.Parallel() + cases := []struct { + name string + servers []string + domain string + slot string + wantErr bool + }{ + {"too few servers", []string{"a", "b"}, "lease.openova.io", "ns/cr", true}, + {"missing domain", []string{"a", "b", "c"}, "", "ns/cr", true}, + {"missing slot", []string{"a", "b", "c"}, "lease.openova.io", "", true}, + {"happy path", []string{"a", "b", "c"}, "lease.openova.io", "ns/cr", false}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + be := newFakeBackend(tc.servers) + _, err := New(tc.servers, "k", tc.domain, tc.slot, be, be) + if tc.wantErr { + if err == nil { + t.Fatalf("expected error") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} + +// TestDNSQuorum_QuorumWith3of3 — happy path: all three servers ack. +func TestDNSQuorum_QuorumWith3of3(t *testing.T) { + t.Parallel() + servers := []string{"ns1", "ns2", "ns3"} + be := newFakeBackend(servers) + c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be) + + st, err := c.Acquire(context.Background(), "fsn", 30*time.Second) + if err != nil { + t.Fatalf("Acquire: %v", err) + } + if st.Holder != "fsn" { + t.Fatalf("Holder = %q want fsn", st.Holder) + } + // All three servers should have the value. + for _, s := range servers { + v, _ := be.ReadTXT(context.Background(), s, c.fqdn()) + if len(v) == 0 || v[0] == "" { + t.Fatalf("server %s missing value", s) + } + } +} + +// TestDNSQuorum_QuorumWith2of3_AcquireSucceeds — one server is +// failed before Acquire; the remaining 2/3 are quorum, Acquire must +// succeed. +func TestDNSQuorum_QuorumWith2of3_AcquireSucceeds(t *testing.T) { + t.Parallel() + servers := []string{"ns1", "ns2", "ns3"} + be := newFakeBackend(servers) + be.fail("ns3") + c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be) + + st, err := c.Acquire(context.Background(), "fsn", 30*time.Second) + if err != nil { + t.Fatalf("Acquire 2-of-3 failed: %v", err) + } + if st.Holder != "fsn" { + t.Fatalf("Holder = %q want fsn", st.Holder) + } +} + +// TestDNSQuorum_BelowQuorum_AcquireFails — two of three servers are +// failed, leaving only 1/3 healthy; Acquire fails defensively. +func TestDNSQuorum_BelowQuorum_AcquireFails(t *testing.T) { + t.Parallel() + servers := []string{"ns1", "ns2", "ns3"} + be := newFakeBackend(servers) + be.fail("ns2") + be.fail("ns3") + c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be) + + _, err := c.Acquire(context.Background(), "fsn", 30*time.Second) + if !errors.Is(err, witness.ErrLeaseHeldByAnother) { + t.Fatalf("Acquire below-quorum: err = %v want ErrLeaseHeldByAnother (defensive)", err) + } +} + +// TestDNSQuorum_SplitBrain_1_1_1_TreatedAsHeldByAnother — three +// distinct values across three servers. Defensive: no quorum → +// ErrLeaseHeldByAnother. +func TestDNSQuorum_SplitBrain_1_1_1_TreatedAsHeldByAnother(t *testing.T) { + t.Parallel() + servers := []string{"ns1", "ns2", "ns3"} + be := newFakeBackend(servers) + c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be) + fqdn := c.fqdn() + + be.setRaw("ns1", fqdn, fmt.Sprintf("fsn|%s|5", time.Now().Add(time.Minute).UTC().Format(time.RFC3339))) + be.setRaw("ns2", fqdn, fmt.Sprintf("hel|%s|5", time.Now().Add(time.Minute).UTC().Format(time.RFC3339))) + be.setRaw("ns3", fqdn, fmt.Sprintf("ash|%s|5", time.Now().Add(time.Minute).UTC().Format(time.RFC3339))) + + _, err := c.Acquire(context.Background(), "iad", 30*time.Second) + if !errors.Is(err, witness.ErrLeaseHeldByAnother) { + t.Fatalf("split-brain Acquire: err = %v want ErrLeaseHeldByAnother", err) + } +} + +// TestDNSQuorum_2of3_DivergentValue_QuorumWins — two of three +// servers agree on holder=fsn, third has a stale "hel" value. +// Acquire by hel must fail (held by another); Acquire by fsn must +// succeed (re-acquire). +func TestDNSQuorum_2of3_DivergentValue_QuorumWins(t *testing.T) { + t.Parallel() + servers := []string{"ns1", "ns2", "ns3"} + be := newFakeBackend(servers) + c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be) + fqdn := c.fqdn() + + expires := time.Now().Add(time.Minute).UTC().Format(time.RFC3339) + val := fmt.Sprintf("fsn|%s|7", expires) + be.setRaw("ns1", fqdn, val) + be.setRaw("ns2", fqdn, val) + be.setRaw("ns3", fqdn, fmt.Sprintf("hel|%s|7", expires)) + + if _, err := c.Acquire(context.Background(), "hel", 30*time.Second); !errors.Is(err, witness.ErrLeaseHeldByAnother) { + t.Fatalf("hel Acquire: err = %v want ErrLeaseHeldByAnother", err) + } + if _, err := c.Acquire(context.Background(), "fsn", 30*time.Second); err != nil { + t.Fatalf("fsn re-Acquire: %v", err) + } +} + +// TestDNSQuorum_FactoryFromCfg — the registered factory parses cfg +// correctly and resolves SecretRef TSIG keys. +func TestDNSQuorum_FactoryFromCfg(t *testing.T) { + t.Parallel() + secrets := witness.SecretReaderFunc(func(_ context.Context, name, key string) ([]byte, error) { + if name != "tsig-secret" || key != "tsig" { + return nil, errors.New("not found") + } + return []byte("real-tsig-key"), nil + }) + + cfg := map[string]any{ + "slot": "ns/cr", + "dnsServers": []any{"ns1", "ns2", "ns3"}, + "domain": "lease.openova.io", + "tsigSecretRef": map[string]any{ + "name": "tsig-secret", + "key": "tsig", + }, + } + cli, err := factory(cfg, secrets) + if err != nil { + t.Fatalf("factory: %v", err) + } + c := cli.(*DNSQuorumClient) + if c.TSIGKey != "real-tsig-key" { + t.Fatalf("TSIGKey = %q want real-tsig-key", c.TSIGKey) + } + if c.Domain != "lease.openova.io" { + t.Fatalf("Domain = %q want lease.openova.io", c.Domain) + } + if len(c.Servers) != 3 { + t.Fatalf("Servers len = %d want 3", len(c.Servers)) + } +} + +func TestDNSQuorum_FactoryAcceptsResolversField(t *testing.T) { + t.Parallel() + cfg := map[string]any{ + "slot": "ns/cr", + "resolvers": []any{"1.1.1.1", "8.8.8.8", "9.9.9.9"}, + "domain": "lease.openova.io", + "tsigKey": "k", + } + cli, err := factory(cfg, nil) + if err != nil { + t.Fatalf("factory: %v", err) + } + c := cli.(*DNSQuorumClient) + if len(c.Servers) != 3 { + t.Fatalf("Servers len = %d want 3", len(c.Servers)) + } +} + +func TestDNSQuorum_FactoryRejectsMissingFields(t *testing.T) { + t.Parallel() + cases := []struct { + name string + cfg map[string]any + }{ + {"empty", map[string]any{}}, + {"missing servers", map[string]any{"slot": "x", "tsigKey": "k", "domain": "lease.openova.io"}}, + {"too few servers", map[string]any{"slot": "x", "dnsServers": []any{"a", "b"}, "domain": "lease.openova.io", "tsigKey": "k"}}, + { + "secretRef without reader", + map[string]any{ + "slot": "x", + "dnsServers": []any{"a", "b", "c"}, + "domain": "lease.openova.io", + "tsigSecretRef": map[string]any{ + "name": "n", + "key": "k", + }, + }, + }, + } + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + _, err := factory(tc.cfg, nil) + if err == nil { + t.Fatalf("expected error") + } + }) + } +} + +// TestDNSQuorum_EncodeDecodeRoundTrip — wire-format invariant. +func TestDNSQuorum_EncodeDecodeRoundTrip(t *testing.T) { + t.Parallel() + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + cases := []witness.State{ + {}, + {Holder: "fsn", AcquiredAt: now, ExpiresAt: now.Add(30 * time.Second), Generation: 1}, + {Holder: "hel-deu-eu-1", AcquiredAt: now, ExpiresAt: now.Add(30 * time.Second), Generation: 999}, + } + for i, st := range cases { + v := encodeValue(st) + got, err := decodeValue(v) + if err != nil { + t.Fatalf("case %d: decode: %v", i, err) + } + if got.Holder != st.Holder { + t.Errorf("case %d: Holder = %q want %q", i, got.Holder, st.Holder) + } + if got.Generation != st.Generation { + t.Errorf("case %d: Generation = %d want %d", i, got.Generation, st.Generation) + } + if !got.AcquiredAt.Equal(st.AcquiredAt) { + t.Errorf("case %d: AcquiredAt = %v want %v", i, got.AcquiredAt, st.AcquiredAt) + } + if !got.ExpiresAt.Equal(st.ExpiresAt) { + t.Errorf("case %d: ExpiresAt = %v want %v", i, got.ExpiresAt, st.ExpiresAt) + } + } +} + +// TestDNSQuorum_DecodeLegacy3Field — backward-compat: an older +// controller may have written `||` records. +// Decode must accept and degrade gracefully (AcquiredAt zero). +func TestDNSQuorum_DecodeLegacy3Field(t *testing.T) { + t.Parallel() + now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC) + v := fmt.Sprintf("fsn|%s|7", now.Format(time.RFC3339)) + st, err := decodeValue(v) + if err != nil { + t.Fatalf("decode legacy: %v", err) + } + if st.Holder != "fsn" { + t.Errorf("Holder = %q", st.Holder) + } + if st.Generation != 7 { + t.Errorf("Generation = %d", st.Generation) + } + if !st.ExpiresAt.Equal(now) { + t.Errorf("ExpiresAt = %v want %v", st.ExpiresAt, now) + } +} + +// TestDNSQuorum_EncodeSlot — DNS-label safety. +func TestDNSQuorum_EncodeSlot(t *testing.T) { + t.Parallel() + cases := []struct { + in, want string + }{ + {"foo", "foo"}, + {"ns/cr", "ns_cr"}, + {"acme/api-prod", "acme_api-prod"}, + {"a/b/c", "a_b_c"}, + } + for _, tc := range cases { + if got := encodeSlot(tc.in); got != tc.want { + t.Errorf("encodeSlot(%q) = %q want %q", tc.in, got, tc.want) + } + } +} + +// TestDNSQuorum_WriterNotConfigured — read-only DNSQuorumClient +// surfaces a clear error on write attempts. +func TestDNSQuorum_WriterNotConfigured(t *testing.T) { + t.Parallel() + servers := []string{"a", "b", "c"} + be := newFakeBackend(servers) + c, _ := New(servers, "k", "lease.openova.io", "ns/cr", nil, be) + _, err := c.Acquire(context.Background(), "fsn", 30*time.Second) + if err == nil { + t.Fatalf("expected error") + } + if !strings.Contains(err.Error(), "Writer not configured") { + t.Fatalf("got: %v", err) + } +} + +// TestDNSQuorum_ContextCancel — cancel propagates. +func TestDNSQuorum_ContextCancel(t *testing.T) { + t.Parallel() + servers := []string{"a", "b", "c"} + be := newFakeBackend(servers) + c, _ := New(servers, "k", "lease.openova.io", "ns/cr", be, be) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + if _, err := c.Acquire(ctx, "fsn", 30*time.Second); !errors.Is(err, context.Canceled) { + t.Fatalf("Acquire after cancel: %v", err) + } +} diff --git a/core/controllers/continuum/internal/witness/testing/contract.go b/core/controllers/continuum/internal/witness/testing/contract.go new file mode 100644 index 00000000..ebcea512 --- /dev/null +++ b/core/controllers/continuum/internal/witness/testing/contract.go @@ -0,0 +1,356 @@ +// Package testing exposes the witness behavioral contract suite as a +// parametric helper. Every concrete witness.Client implementation +// (InMemoryClient, cloudflarekv.CFKVClient, dnsquorum.DNSQuorumClient) +// runs THIS suite against itself so the on-the-wire CAS path matches +// the in-memory reference exactly. +// +// Why a separate package: putting the contract suite here (rather than +// in `witness/`) lets the cloudflarekv + dnsquorum impls IMPORT it +// without creating a test-only dependency cycle (the impls live in +// child packages of `witness/`, so they can import `witness/testing` +// without dragging the impl-specific httptest fixtures into +// `witness/`). +// +// The suite is parametric on a Backend factory. The factory returns: +// +// - Two Client instances bound to the SAME slot (so race tests can +// simulate two regions contending for the same lease) +// - A separate Client on a DIFFERENT slot (slot-isolation test) +// - An Advance(d) hook that fast-forwards time by d (for TTL tests) +// +// Implementations that can't fast-forward (cloudflarekv against a +// real Worker; dnsquorum against real PowerDNS) supply an Advance +// that real-time-sleeps through `d`. The contract suite picks small +// TTLs (≤ 200ms) so a real-time sleep stays bounded. +// +// Per K-Cont-2's K-Cont-3 concerns (item h), the contract verifies: +// +// 1. CAS atomicity — Acquire rejects when the slot is held by +// another non-expired holder. +// 2. Renew failure mode — Renew returns ErrLeaseLost when TTL has +// elapsed OR when another holder owns the slot. +// 3. Release idempotency — non-holder Release is a no-op. +// 4. Generation monotonicity — Acquire AND Renew bump State.Generation. +// 5. Slot isolation — different cfg["slot"] values produce +// independent leases. +// 6. Witness clock skew — verified per-impl: server-authoritative +// for CF (the server stamps timestamps + bumps Generation); +// fence-token + 2-of-3 for DNS-quorum (any single resolver may +// drift by one generation without breaking quorum). +// 7. Witness-side TTL eviction — Read returns "free" after expiry. +package testing + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/openova-io/openova/core/controllers/continuum/internal/witness" +) + +// Backend is what each impl-specific test file constructs and passes +// to RunContractSuite. It bundles two same-slot clients (for CAS race +// scenarios), one different-slot client (for isolation), and an +// Advance hook for TTL tests. +type Backend struct { + // A and B are two Client instances bound to the SAME slot. Tests + // use them to simulate two regions racing for the same lease. + A witness.Client + B witness.Client + + // Other is a Client bound to a DIFFERENT slot — used to verify + // slot isolation (an Acquire on Other must not affect A/B). + Other witness.Client + + // Advance moves the witness's effective time forward by d. + // Implementations that have an injectable clock (InMemoryClient) + // implement this as a clock-set; impls that only support + // real-time (cloudflarekv against a live Worker) implement as + // time.Sleep(d). The contract suite never asks for d > 200ms so + // a real sleep is bounded. + Advance func(d time.Duration) + + // Cleanup is called by RunContractSuite at the end. Optional. + Cleanup func() +} + +// RunContractSuite runs the full witness behavioral contract against +// the supplied Backend factory. The factory is invoked once per +// sub-test so each test gets a clean slot. +// +// Usage from inmemory_test.go: +// +// contracttest.RunContractSuite(t, func() *contracttest.Backend { +// store := witness.NewInMemoryStore() +// clk := &fakeClock{t: time.Date(...)} +// store.SetClock(clk.Now) +// return &contracttest.Backend{ +// A: store.Client("ns/cr"), +// B: store.Client("ns/cr"), +// Other: store.Client("ns/other"), +// Advance: clk.Advance, +// } +// }) +// +// Each implementation MUST pass every sub-test or the production +// surfaces will diverge. +func RunContractSuite(t *testing.T, factory func() *Backend) { + t.Helper() + + cases := []struct { + name string + fn func(t *testing.T, b *Backend) + }{ + {"AcquireOnEmptySlot", testAcquireOnEmpty}, + {"AcquireBlockedByAnother", testAcquireBlocked}, + {"AcquireAfterExpiry", testAcquireAfterExpiry}, + {"AcquireSameHolderExtendsTTL", testAcquireSameHolder}, + {"RenewExtendsTTLAndBumpsGeneration", testRenewExtends}, + {"RenewAfterExpiryReturnsLost", testRenewAfterExpiry}, + {"RenewByNonHolderReturnsLost", testRenewByNonHolder}, + {"ReleaseIdempotent", testReleaseIdempotent}, + {"ReleaseByNonHolderIsNoOp", testReleaseByNonHolder}, + {"SlotIsolation", testSlotIsolation}, + {"GenerationMonotonicityAcrossOps", testGenerationMonotonic}, + {"ReadOnEmptySlot", testReadOnEmpty}, + {"ReadAfterTTLEvictionReportsFree", testReadAfterTTLEviction}, + {"ContextCancel", testContextCancel}, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + b := factory() + if b.Cleanup != nil { + t.Cleanup(b.Cleanup) + } + tc.fn(t, b) + }) + } +} + +// shortTTL is the TTL the suite uses for tests that need expiry. It +// is small (100ms) so impls that can't fast-forward time pay a +// bounded real-time penalty. +const shortTTL = 100 * time.Millisecond + +// longTTL is used when the test does not depend on expiry — keeping +// it well above the test's wall-clock so flakes from CI scheduling +// jitter don't expire a lease mid-test. +const longTTL = 30 * time.Second + +func testAcquireOnEmpty(t *testing.T, b *Backend) { + st, err := b.A.Acquire(context.Background(), "fsn", longTTL) + if err != nil { + t.Fatalf("Acquire: %v", err) + } + if st.Holder != "fsn" { + t.Fatalf("Holder = %q want fsn", st.Holder) + } + if st.Generation < 1 { + t.Fatalf("Generation = %d want >= 1", st.Generation) + } + if st.ExpiresAt.IsZero() { + t.Fatalf("ExpiresAt is zero") + } + if !st.ExpiresAt.After(st.AcquiredAt) { + t.Fatalf("ExpiresAt %v not after AcquiredAt %v", st.ExpiresAt, st.AcquiredAt) + } +} + +func testAcquireBlocked(t *testing.T, b *Backend) { + if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil { + t.Fatalf("first Acquire: %v", err) + } + if _, err := b.B.Acquire(context.Background(), "hel", longTTL); !errors.Is(err, witness.ErrLeaseHeldByAnother) { + t.Fatalf("second Acquire: err = %v want ErrLeaseHeldByAnother", err) + } +} + +func testAcquireAfterExpiry(t *testing.T, b *Backend) { + if _, err := b.A.Acquire(context.Background(), "fsn", shortTTL); err != nil { + t.Fatalf("first Acquire: %v", err) + } + b.Advance(shortTTL + 50*time.Millisecond) + st, err := b.B.Acquire(context.Background(), "hel", longTTL) + if err != nil { + t.Fatalf("post-expiry Acquire: %v", err) + } + if st.Holder != "hel" { + t.Fatalf("Holder = %q want hel", st.Holder) + } +} + +func testAcquireSameHolder(t *testing.T, b *Backend) { + st1, err := b.A.Acquire(context.Background(), "fsn", longTTL) + if err != nil { + t.Fatalf("first Acquire: %v", err) + } + b.Advance(10 * time.Millisecond) + st2, err := b.A.Acquire(context.Background(), "fsn", longTTL) + if err != nil { + t.Fatalf("second Acquire (same holder): %v", err) + } + if !st2.AcquiredAt.Equal(st1.AcquiredAt) { + t.Fatalf("AcquiredAt drifted on re-acquire: %v vs %v", st2.AcquiredAt, st1.AcquiredAt) + } + if !st2.ExpiresAt.After(st1.ExpiresAt) && !st2.ExpiresAt.Equal(st1.ExpiresAt) { + // For server-authoritative impls the new ExpiresAt may equal + // the old one if the server's clock granularity is coarse; + // allow equal but never less. + t.Fatalf("ExpiresAt regressed on re-acquire: %v vs %v", st2.ExpiresAt, st1.ExpiresAt) + } +} + +func testRenewExtends(t *testing.T, b *Backend) { + st1, err := b.A.Acquire(context.Background(), "fsn", longTTL) + if err != nil { + t.Fatalf("Acquire: %v", err) + } + b.Advance(10 * time.Millisecond) + st2, err := b.A.Renew(context.Background(), "fsn", longTTL) + if err != nil { + t.Fatalf("Renew: %v", err) + } + if st2.Generation <= st1.Generation { + t.Fatalf("Renew did not bump Generation: %d -> %d", st1.Generation, st2.Generation) + } + if st2.ExpiresAt.Before(st1.ExpiresAt) { + t.Fatalf("Renew did not extend ExpiresAt: %v -> %v", st1.ExpiresAt, st2.ExpiresAt) + } +} + +func testRenewAfterExpiry(t *testing.T, b *Backend) { + if _, err := b.A.Acquire(context.Background(), "fsn", shortTTL); err != nil { + t.Fatalf("Acquire: %v", err) + } + b.Advance(shortTTL + 50*time.Millisecond) + if _, err := b.A.Renew(context.Background(), "fsn", longTTL); !errors.Is(err, witness.ErrLeaseLost) { + t.Fatalf("Renew after expiry: err = %v want ErrLeaseLost", err) + } +} + +func testRenewByNonHolder(t *testing.T, b *Backend) { + if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil { + t.Fatalf("Acquire: %v", err) + } + if _, err := b.B.Renew(context.Background(), "hel", longTTL); !errors.Is(err, witness.ErrLeaseLost) { + t.Fatalf("Renew by non-holder: err = %v want ErrLeaseLost", err) + } +} + +func testReleaseIdempotent(t *testing.T, b *Backend) { + if err := b.A.Release(context.Background(), "fsn"); err != nil { + t.Fatalf("Release on empty slot: %v", err) + } + if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil { + t.Fatalf("Acquire: %v", err) + } + if err := b.A.Release(context.Background(), "fsn"); err != nil { + t.Fatalf("Release: %v", err) + } + if err := b.A.Release(context.Background(), "fsn"); err != nil { + t.Fatalf("Release again: %v", err) + } + st, err := b.A.Read(context.Background()) + if err != nil { + t.Fatalf("Read: %v", err) + } + if st.Holder != "" { + t.Fatalf("Holder = %q want empty after Release", st.Holder) + } +} + +func testReleaseByNonHolder(t *testing.T, b *Backend) { + if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil { + t.Fatalf("Acquire: %v", err) + } + if err := b.B.Release(context.Background(), "hel"); err != nil { + t.Fatalf("Release by non-holder: %v", err) + } + st, _ := b.A.Read(context.Background()) + if st.Holder != "fsn" { + t.Fatalf("Holder = %q want fsn — non-holder Release should not evict", st.Holder) + } +} + +func testSlotIsolation(t *testing.T, b *Backend) { + if _, err := b.A.Acquire(context.Background(), "fsn", longTTL); err != nil { + t.Fatalf("Acquire A: %v", err) + } + st, err := b.Other.Acquire(context.Background(), "hel", longTTL) + if err != nil { + t.Fatalf("Acquire Other: %v", err) + } + if st.Holder != "hel" { + t.Fatalf("Other.Holder = %q want hel", st.Holder) + } +} + +func testGenerationMonotonic(t *testing.T, b *Backend) { + st1, err := b.A.Acquire(context.Background(), "fsn", longTTL) + if err != nil { + t.Fatalf("Acquire: %v", err) + } + st2, err := b.A.Renew(context.Background(), "fsn", longTTL) + if err != nil { + t.Fatalf("Renew: %v", err) + } + if st2.Generation <= st1.Generation { + t.Fatalf("Generation did not bump on Renew: %d -> %d", st1.Generation, st2.Generation) + } + st3, err := b.A.Acquire(context.Background(), "fsn", longTTL) + if err != nil { + t.Fatalf("re-Acquire: %v", err) + } + if st3.Generation <= st2.Generation { + t.Fatalf("Generation did not bump on re-Acquire: %d -> %d", st2.Generation, st3.Generation) + } +} + +func testReadOnEmpty(t *testing.T, b *Backend) { + st, err := b.A.Read(context.Background()) + if err != nil { + t.Fatalf("Read on empty: %v", err) + } + if st.Holder != "" { + t.Fatalf("empty slot Holder = %q want empty", st.Holder) + } +} + +func testReadAfterTTLEviction(t *testing.T, b *Backend) { + if _, err := b.A.Acquire(context.Background(), "fsn", shortTTL); err != nil { + t.Fatalf("Acquire: %v", err) + } + b.Advance(shortTTL + 50*time.Millisecond) + // Definitive eviction proof: a different holder can now Acquire. + // (Comparing the State.ExpiresAt against time.Now() doesn't work + // for impls that use a server-side clock decoupled from + // wall-clock — the in-memory + CFKV impls stamp ExpiresAt with + // the witness's clock, not the test's wall-clock.) + st, err := b.B.Acquire(context.Background(), "hel", longTTL) + if err != nil { + t.Fatalf("post-expiry Acquire by other: %v — TTL eviction not honored", err) + } + if st.Holder != "hel" { + t.Fatalf("post-eviction Holder = %q want hel", st.Holder) + } +} + +func testContextCancel(t *testing.T, b *Backend) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + if _, err := b.A.Acquire(ctx, "fsn", longTTL); !errors.Is(err, context.Canceled) { + t.Fatalf("Acquire after cancel: err = %v want context.Canceled", err) + } + if _, err := b.A.Renew(ctx, "fsn", longTTL); !errors.Is(err, context.Canceled) { + t.Fatalf("Renew after cancel: err = %v want context.Canceled", err) + } + if err := b.A.Release(ctx, "fsn"); !errors.Is(err, context.Canceled) { + t.Fatalf("Release after cancel: err = %v want context.Canceled", err) + } + if _, err := b.A.Read(ctx); !errors.Is(err, context.Canceled) { + t.Fatalf("Read after cancel: err = %v want context.Canceled", err) + } +} diff --git a/core/controllers/continuum/internal/witness/witness.go b/core/controllers/continuum/internal/witness/witness.go index 3c7a0e23..11397f7f 100644 --- a/core/controllers/continuum/internal/witness/witness.go +++ b/core/controllers/continuum/internal/witness/witness.go @@ -45,9 +45,10 @@ var ErrLeaseHeldByAnother = errors.New("witness: lease held by another holder") // itself as demoted. var ErrLeaseLost = errors.New("witness: lease lost") -// ErrNotImplemented — the K-Cont-3 implementations are not wired in -// this controller build. Returned by the Selector for kinds -// `cloudflare-kv` and `dns-quorum`. +// ErrNotImplemented — the requested kind has no registered factory +// in this build. K-Cont-3 (#1101) registers `cloudflare-kv` and +// `dns-quorum` via blank-import in continuum/cmd/main.go; tests can +// register fakes via Register. var ErrNotImplemented = errors.New("witness: implementation not built into this controller") // State is the full lease snapshot returned by Read. @@ -146,16 +147,101 @@ func (f SelectorFunc) Select(kind string, cfg map[string]any) (Client, error) { return f(kind, cfg) } -// DefaultSelector is the production Selector. K-Cont-2 returns -// ErrNotImplemented for the two real kinds (cloudflare-kv, -// dns-quorum) so the system fails CLOSED until K-Cont-3 lands. The -// special kind `in-memory` is recognised for envtest / smoke -// scenarios — DO NOT enable in production. +// Factory constructs a Client for a kind from cfg. Returned errors +// from the Factory propagate out of Selector.Select. +// +// Per K-Cont-3's contract, cfg always carries: +// - "slot" (string) — "/" — mandatory; isolates per-CR +// - kind-specific keys (baseURL, dnsServers, tokenSecretRef, etc.) +// +// SecretReader (or nil) lets the Factory resolve K8s Secret refs +// without dragging client-go into the witness package. +type Factory func(cfg map[string]any, secrets SecretReader) (Client, error) + +// SecretReader resolves a K8s SecretRef (name + key from a known +// namespace) into a plaintext value. The DefaultSelector wraps the +// controller's K8s client; tests pass an in-memory implementation. +// +// Per Inviolable Principle #5 the returned plaintext stays in memory +// only long enough to be handed to the wire client. NEVER log. +type SecretReader interface { + // ReadSecret returns the bytes at secretName/key. Implementations + // MUST NOT log the value. + ReadSecret(ctx context.Context, secretName, key string) ([]byte, error) +} + +// SecretReaderFunc adapts a plain function to SecretReader. +type SecretReaderFunc func(ctx context.Context, secretName, key string) ([]byte, error) + +// ReadSecret implements SecretReader. +func (f SecretReaderFunc) ReadSecret(ctx context.Context, secretName, key string) ([]byte, error) { + return f(ctx, secretName, key) +} + +// registry holds package-global Factory bindings registered by impl +// packages at init() time (typically via blank-import in cmd/main.go). +// +// We use a registry rather than direct imports because the impl +// packages (cloudflarekv, dnsquorum) live UNDER `internal/witness/` +// and therefore CANNOT be imported by `internal/witness/` itself +// without a cycle. The registry pattern decouples the dispatch table +// from the concrete clients. +var ( + registryMu sync.RWMutex + registry = map[string]Factory{} +) + +// Register binds `factory` as the Client constructor for `kind`. +// Calling Register twice for the same kind PANICS — the second +// caller is almost certainly a duplicate import / wiring bug. +// +// Impl packages call Register from their init() function. cmd/main.go +// in the consuming binary blank-imports the impl packages it wants +// active. Tests can pre-register fakes by calling Register directly. +func Register(kind string, factory Factory) { + registryMu.Lock() + defer registryMu.Unlock() + if _, exists := registry[kind]; exists { + panic(fmt.Sprintf("witness: duplicate Register for kind %q", kind)) + } + registry[kind] = factory +} + +// Unregister removes a kind. Test-only — production code never +// calls this. (Public so test packages outside `witness/` can use it +// when wiring/unwiring fakes.) +func Unregister(kind string) { + registryMu.Lock() + defer registryMu.Unlock() + delete(registry, kind) +} + +// lookup returns the registered Factory for kind, or false. +func lookup(kind string) (Factory, bool) { + registryMu.RLock() + defer registryMu.RUnlock() + f, ok := registry[kind] + return f, ok +} + +// DefaultSelector is the production Selector. The `in-memory` kind +// is built-in (gated by InMemoryAllowed); `cloudflare-kv` and +// `dns-quorum` come from the package registry — when their impl +// packages are imported (via blank-import in cmd/main.go), they +// register a Factory at init() time. +// +// Per Inviolable Principle #5 the SecretReader is the ONLY path the +// real impls have to credentials — they never read env vars directly. type DefaultSelector struct { // InMemoryAllowed gates the `in-memory` kind. Default false in // production; tests flip true. InMemoryAllowed bool + // SecretReader resolves K8s Secret refs for the real impls. May + // be nil for tests that only exercise in-memory or fake-registered + // impls. + SecretReader SecretReader + // inMemoryShared is the shared backing store across all // in-memory clients in this process — Continuum CRs in the same // process share a witness so the multi-CR isolation tests work @@ -168,10 +254,6 @@ type DefaultSelector struct { // Select implements Selector. func (s *DefaultSelector) Select(kind string, cfg map[string]any) (Client, error) { switch kind { - case "cloudflare-kv": - return nil, fmt.Errorf("%w: cloudflare-kv (K-Cont-3)", ErrNotImplemented) - case "dns-quorum": - return nil, fmt.Errorf("%w: dns-quorum (K-Cont-3)", ErrNotImplemented) case "in-memory": if !s.InMemoryAllowed { return nil, fmt.Errorf("witness: in-memory selector is test-only; refused in production") @@ -181,15 +263,14 @@ func (s *DefaultSelector) Select(kind string, cfg map[string]any) (Client, error if s.inMemoryShared == nil { s.inMemoryShared = NewInMemoryStore() } - // Every in-memory client gets its own per-CR slot keyed by - // the `slot` config field. Tests pass slot="ns/name" so each - // Continuum CR gets an isolated lease. slot, _ := cfg["slot"].(string) if slot == "" { slot = "default" } return s.inMemoryShared.Client(slot), nil - default: - return nil, fmt.Errorf("witness: unknown kind %q (expected one of: cloudflare-kv, dns-quorum)", kind) } + if f, ok := lookup(kind); ok { + return f(cfg, s.SecretReader) + } + return nil, fmt.Errorf("%w: %s", ErrNotImplemented, kind) } diff --git a/core/controllers/continuum/internal/witness/witness_test.go b/core/controllers/continuum/internal/witness/witness_test.go index 170dee04..132a8447 100644 --- a/core/controllers/continuum/internal/witness/witness_test.go +++ b/core/controllers/continuum/internal/witness/witness_test.go @@ -1,10 +1,18 @@ -// Tests for the witness contract + InMemoryClient. +// Tests for the witness contract + InMemoryClient + Selector dispatch. // -// These tests double as the contract spec: K-Cont-3's cloudflare-kv -// and dns-quorum implementations MUST run this same suite (or a -// faithful adaptation that drives a fake transport). +// As of K-Cont-3 (#1101) the BEHAVIORAL contract — Acquire / Renew / +// Release / Read invariants — is encoded in +// `internal/witness/testing/contract.go` and exported as +// `RunContractSuite(t, factoryFn)`. THIS file invokes that suite for +// the in-memory backend; the K-Cont-3 cloudflarekv + dnsquorum impls +// invoke the SAME suite from their own test files. Behavioral drift +// between the in-memory reference and a wire impl surfaces as a test +// failure in the impl's package. +// +// What stays here: tests for the Selector dispatch + State.IsHeldBy +// helper + the SelectorFunc adapter — all in-package surfaces. -package witness +package witness_test import ( "context" @@ -12,10 +20,19 @@ import ( "sync" "testing" "time" + + "github.com/openova-io/openova/core/controllers/continuum/internal/witness" + // Blank-import the K-Cont-3 impl packages so their init() + // register Factory bindings on the witness package registry. + // The cmd/main.go binary does the same — keeping the test + // import in sync ensures behaviour parity. + _ "github.com/openova-io/openova/core/controllers/continuum/internal/witness/cloudflarekv" + _ "github.com/openova-io/openova/core/controllers/continuum/internal/witness/dnsquorum" + contracttest "github.com/openova-io/openova/core/controllers/continuum/internal/witness/testing" ) -// frozenClock returns a clock that the test can advance manually. -// Each call to Now() returns the current value of t. +// frozenClock returns a clock the test can advance manually. Used by +// the in-memory backend factory below. type frozenClock struct { mu sync.Mutex t time.Time @@ -37,217 +54,29 @@ func newClock() *frozenClock { return &frozenClock{t: time.Date(2026, 5, 9, 0, 0, 0, 0, time.UTC)} } -func TestInMemory_AcquireOnEmpty(t *testing.T) { +// TestInMemory_ContractSuite runs the full witness behavioral contract +// against InMemoryClient. The cloudflarekv + dnsquorum impls run the +// SAME suite — see their respective _test.go files. Any divergence +// fails here and there simultaneously. +func TestInMemory_ContractSuite(t *testing.T) { t.Parallel() - store := NewInMemoryStore() - clk := newClock() - store.SetClock(clk.Now) - c := store.Client("slotA") - - st, err := c.Acquire(context.Background(), "fsn", 30*time.Second) - if err != nil { - t.Fatalf("Acquire empty: %v", err) - } - if st.Holder != "fsn" { - t.Fatalf("Holder = %q want fsn", st.Holder) - } - if st.Generation != 1 { - t.Fatalf("Generation = %d want 1", st.Generation) - } - if st.AcquiredAt != clk.Now() { - t.Fatalf("AcquiredAt should equal clock now") - } - if got, want := st.ExpiresAt, clk.Now().Add(30*time.Second); got != want { - t.Fatalf("ExpiresAt = %v want %v", got, want) - } -} - -func TestInMemory_AcquireBlockedByAnother(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - clk := newClock() - store.SetClock(clk.Now) - a := store.Client("slotA") - b := store.Client("slotA") // same slot - - if _, err := a.Acquire(context.Background(), "fsn", 30*time.Second); err != nil { - t.Fatalf("first Acquire: %v", err) - } - - if _, err := b.Acquire(context.Background(), "hel", 30*time.Second); !errors.Is(err, ErrLeaseHeldByAnother) { - t.Fatalf("second Acquire: err = %v want ErrLeaseHeldByAnother", err) - } -} - -func TestInMemory_AcquireAfterExpiry(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - clk := newClock() - store.SetClock(clk.Now) - a := store.Client("slotA") - b := store.Client("slotA") - - if _, err := a.Acquire(context.Background(), "fsn", 5*time.Second); err != nil { - t.Fatalf("first Acquire: %v", err) - } - clk.Advance(10 * time.Second) // past TTL - st, err := b.Acquire(context.Background(), "hel", 30*time.Second) - if err != nil { - t.Fatalf("post-expiry Acquire: %v", err) - } - if st.Holder != "hel" { - t.Fatalf("Holder = %q want hel", st.Holder) - } -} - -func TestInMemory_AcquireSameHolderExtendsTTL(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - clk := newClock() - store.SetClock(clk.Now) - c := store.Client("slotA") - - st1, err := c.Acquire(context.Background(), "fsn", 30*time.Second) - if err != nil { - t.Fatalf("first Acquire: %v", err) - } - clk.Advance(5 * time.Second) - st2, err := c.Acquire(context.Background(), "fsn", 30*time.Second) - if err != nil { - t.Fatalf("second Acquire (same holder): %v", err) - } - if st2.AcquiredAt != st1.AcquiredAt { - t.Fatalf("AcquiredAt drifted on re-acquire: %v vs %v", st2.AcquiredAt, st1.AcquiredAt) - } - if !st2.ExpiresAt.After(st1.ExpiresAt) { - t.Fatalf("ExpiresAt did not advance on re-acquire: %v not after %v", st2.ExpiresAt, st1.ExpiresAt) - } -} - -func TestInMemory_RenewExtendsTTL(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - clk := newClock() - store.SetClock(clk.Now) - c := store.Client("slotA") - - st1, err := c.Acquire(context.Background(), "fsn", 30*time.Second) - if err != nil { - t.Fatalf("Acquire: %v", err) - } - clk.Advance(10 * time.Second) - st2, err := c.Renew(context.Background(), "fsn", 30*time.Second) - if err != nil { - t.Fatalf("Renew: %v", err) - } - if !st2.ExpiresAt.After(st1.ExpiresAt) { - t.Fatalf("Renew did not extend ExpiresAt") - } - if st2.Generation <= st1.Generation { - t.Fatalf("Renew did not bump Generation") - } -} - -func TestInMemory_RenewAfterExpiryReturnsLost(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - clk := newClock() - store.SetClock(clk.Now) - c := store.Client("slotA") - - if _, err := c.Acquire(context.Background(), "fsn", 5*time.Second); err != nil { - t.Fatalf("Acquire: %v", err) - } - clk.Advance(10 * time.Second) - if _, err := c.Renew(context.Background(), "fsn", 30*time.Second); !errors.Is(err, ErrLeaseLost) { - t.Fatalf("Renew after expiry: err = %v want ErrLeaseLost", err) - } -} - -func TestInMemory_RenewByNonHolderReturnsLost(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - clk := newClock() - store.SetClock(clk.Now) - a := store.Client("slotA") - b := store.Client("slotA") - - if _, err := a.Acquire(context.Background(), "fsn", 30*time.Second); err != nil { - t.Fatalf("Acquire: %v", err) - } - if _, err := b.Renew(context.Background(), "hel", 30*time.Second); !errors.Is(err, ErrLeaseLost) { - t.Fatalf("Renew by non-holder: err = %v want ErrLeaseLost", err) - } -} - -func TestInMemory_ReleaseIdempotent(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - c := store.Client("slotA") - - if err := c.Release(context.Background(), "fsn"); err != nil { - t.Fatalf("Release on empty slot: %v", err) - } - if _, err := c.Acquire(context.Background(), "fsn", 30*time.Second); err != nil { - t.Fatalf("Acquire: %v", err) - } - if err := c.Release(context.Background(), "fsn"); err != nil { - t.Fatalf("Release: %v", err) - } - if err := c.Release(context.Background(), "fsn"); err != nil { - t.Fatalf("Release again: %v", err) - } - st, err := c.Read(context.Background()) - if err != nil { - t.Fatalf("Read: %v", err) - } - if st.Holder != "" { - t.Fatalf("Holder = %q want empty after Release", st.Holder) - } -} - -func TestInMemory_ReleaseByNonHolderIsNoOp(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - a := store.Client("slotA") - - if _, err := a.Acquire(context.Background(), "fsn", 30*time.Second); err != nil { - t.Fatalf("Acquire: %v", err) - } - // Other region tries to release; should be a no-op (so a stale - // caller doesn't accidentally evict the live primary). - if err := a.Release(context.Background(), "hel"); err != nil { - t.Fatalf("Release by non-holder: %v", err) - } - st, _ := a.Read(context.Background()) - if st.Holder != "fsn" { - t.Fatalf("Holder = %q want fsn", st.Holder) - } -} - -func TestInMemory_SlotIsolation(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - a := store.Client("ns1/cr1") - b := store.Client("ns2/cr2") - - if _, err := a.Acquire(context.Background(), "fsn", 30*time.Second); err != nil { - t.Fatalf("Acquire A: %v", err) - } - // b should be free regardless. - st, err := b.Acquire(context.Background(), "hel", 30*time.Second) - if err != nil { - t.Fatalf("Acquire B: %v", err) - } - if st.Holder != "hel" { - t.Fatalf("B.Holder = %q want hel", st.Holder) - } + contracttest.RunContractSuite(t, func() *contracttest.Backend { + store := witness.NewInMemoryStore() + clk := newClock() + store.SetClock(clk.Now) + return &contracttest.Backend{ + A: store.Client("ns/cr-main"), + B: store.Client("ns/cr-main"), // SAME slot — race scenarios + Other: store.Client("ns/cr-other"), + Advance: clk.Advance, + } + }) } func TestInMemory_State_IsHeldBy(t *testing.T) { t.Parallel() now := time.Now() - st := State{ + st := witness.State{ Holder: "fsn", ExpiresAt: now.Add(time.Minute), } @@ -260,25 +89,14 @@ func TestInMemory_State_IsHeldBy(t *testing.T) { if st.IsHeldBy("fsn", now.Add(2*time.Minute)) { t.Fatalf("expected IsHeldBy(fsn, post-expiry) false") } - if (State{}).IsHeldBy("fsn", now) { + if (witness.State{}).IsHeldBy("fsn", now) { t.Fatalf("empty State should never claim a holder") } } -func TestDefaultSelector_NotImplemented(t *testing.T) { - t.Parallel() - s := &DefaultSelector{InMemoryAllowed: false} - for _, kind := range []string{"cloudflare-kv", "dns-quorum"} { - _, err := s.Select(kind, nil) - if !errors.Is(err, ErrNotImplemented) { - t.Fatalf("Select(%q) err = %v want ErrNotImplemented", kind, err) - } - } -} - func TestDefaultSelector_InMemoryRefusedInProd(t *testing.T) { t.Parallel() - s := &DefaultSelector{InMemoryAllowed: false} + s := &witness.DefaultSelector{InMemoryAllowed: false} _, err := s.Select("in-memory", nil) if err == nil { t.Fatalf("expected refusal for in-memory in production") @@ -287,7 +105,7 @@ func TestDefaultSelector_InMemoryRefusedInProd(t *testing.T) { func TestDefaultSelector_InMemoryAllowed(t *testing.T) { t.Parallel() - s := &DefaultSelector{InMemoryAllowed: true} + s := &witness.DefaultSelector{InMemoryAllowed: true} cli, err := s.Select("in-memory", map[string]any{"slot": "ns/foo"}) if err != nil { t.Fatalf("Select in-memory: %v", err) @@ -295,31 +113,46 @@ func TestDefaultSelector_InMemoryAllowed(t *testing.T) { if cli == nil { t.Fatalf("expected non-nil Client") } - // Same selector + same slot should yield isolated CAS state. We - // can't compare clients directly (each call returns a new - // wrapper), but we can verify the underlying store carries state - // across selects. cli2, _ := s.Select("in-memory", map[string]any{"slot": "ns/foo"}) if _, err := cli.Acquire(context.Background(), "fsn", 30*time.Second); err != nil { t.Fatalf("Acquire: %v", err) } - if _, err := cli2.Acquire(context.Background(), "hel", 30*time.Second); !errors.Is(err, ErrLeaseHeldByAnother) { + if _, err := cli2.Acquire(context.Background(), "hel", 30*time.Second); !errors.Is(err, witness.ErrLeaseHeldByAnother) { t.Fatalf("shared store: err = %v want ErrLeaseHeldByAnother", err) } } func TestDefaultSelector_UnknownKind(t *testing.T) { t.Parallel() - s := &DefaultSelector{} + s := &witness.DefaultSelector{} if _, err := s.Select("not-a-kind", nil); err == nil { t.Fatalf("expected error for unknown kind") } } +// TestDefaultSelector_RealKindsConstructorFailure — for the real +// kinds (cloudflare-kv + dns-quorum) the DefaultSelector dispatches +// to the K-Cont-3 impls; their constructors error on missing +// required cfg keys (slot, baseURL/dnsServers, token/tsig). The +// dispatch must NOT return ErrNotImplemented. +func TestDefaultSelector_RealKindsConstructorFailure(t *testing.T) { + t.Parallel() + s := &witness.DefaultSelector{} + for _, kind := range []string{"cloudflare-kv", "dns-quorum"} { + _, err := s.Select(kind, nil) + if err == nil { + t.Fatalf("Select(%q) with empty cfg: expected error", kind) + } + if errors.Is(err, witness.ErrNotImplemented) { + t.Fatalf("Select(%q): K-Cont-3 should NOT return ErrNotImplemented (impls are wired); got %v", kind, err) + } + } +} + func TestSelectorFunc(t *testing.T) { t.Parallel() - want := NewInMemoryStore().Client("x") - f := SelectorFunc(func(kind string, cfg map[string]any) (Client, error) { + want := witness.NewInMemoryStore().Client("x") + f := witness.SelectorFunc(func(kind string, cfg map[string]any) (witness.Client, error) { return want, nil }) got, err := f.Select("any", nil) @@ -330,23 +163,3 @@ func TestSelectorFunc(t *testing.T) { t.Fatalf("SelectorFunc returned wrong client") } } - -func TestInMemory_ContextCancel(t *testing.T) { - t.Parallel() - store := NewInMemoryStore() - c := store.Client("slot") - ctx, cancel := context.WithCancel(context.Background()) - cancel() - if _, err := c.Acquire(ctx, "fsn", 30*time.Second); !errors.Is(err, context.Canceled) { - t.Fatalf("Acquire after cancel: %v", err) - } - if _, err := c.Renew(ctx, "fsn", 30*time.Second); !errors.Is(err, context.Canceled) { - t.Fatalf("Renew after cancel: %v", err) - } - if err := c.Release(ctx, "fsn"); !errors.Is(err, context.Canceled) { - t.Fatalf("Release after cancel: %v", err) - } - if _, err := c.Read(ctx); !errors.Is(err, context.Canceled) { - t.Fatalf("Read after cancel: %v", err) - } -}