From e91679aeb16c3cdba41107a8779973040aaf8897 Mon Sep 17 00:00:00 2001 From: e3mrah <81884938+emrahbaysal@users.noreply.github.com> Date: Tue, 5 May 2026 14:15:24 +0400 Subject: [PATCH] fix(catalyst-api): Phase-1 watcher TLS handshake retries + reconnect substate after Pod restart (#923) (#929) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the catalyst-api Pod restarts mid-Phase-1 (image roll, kustomization apply, OOM kill), the new Pod rehydrated the deployment correctly but if the apiserver was transiently unreachable (LB warm-up race, kube-vip flap) the informer's WaitForCacheSync blocked silently for the full 60-minute WatchTimeout, leaving the wizard frozen with empty componentStates and no progress events. Live evidence (otech106 c87307c580453536, 2026-05-05): catalyst-api rolled at 10:50 from :e08d872 → :0a72150; new Pod's TLS handshake to 5.161.50.175:6443 hung indefinitely; phase1-watching status persisted without any SSE events. Three coupled fixes: 1. helmwatch/kubeconfig.go: stamp rest.Config.Timeout = 30s on every client built from the kubeconfig, so individual List/Watch/Get calls fail fast and the informer's internal retry loop has a chance to recover when transient TLS / LB flaps clear. 2. helmwatch/helmwatch.go: pre-flight reachability probe (runReachabilityProbe) before factory.Start. Probes the apiserver /version endpoint via discovery client with a 10s per-attempt timeout, retries with 5s → 60s exponential backoff up to a 10-minute overall budget. Each failed attempt emits a warn-level "Sovereign apiserver unreachable" diagnostic into the SSE stream so the wizard log pane shows live progress instead of going dark. On success we proceed to factory.Start; on budget-exhausted we still proceed (the informer's own WaitForCacheSync timeout will then classify as OutcomeFluxNotReconciling — exactly the right diagnostic for a genuinely unreachable apiserver). 3. 
handler/phase1_watch.go + provisioner.Result.Phase1Substate: the watcher fires OnSubstate("watcher-reconnecting") on the first failed probe and OnSubstate("watcher-watching") on the eventual success. setPhase1Substate persists the field so a /deployments/ {id} GET returns the live sub-status, surfaced to the top level in State() so the wizard banner can render "reconnecting…" while Status itself stays "phase1-watching". markPhase1Done clears the field on terminal classification. Every knob is runtime-configurable via env var per docs/INVIOLABLE-PRINCIPLES.md #4: CATALYST_PHASE1_REACHABILITY_BUDGET (overall budget, default 10m). Per-attempt timeout + backoff knobs default to helmwatch package constants and are overridable via Config fields for tests. Tests: - internal/helmwatch/reachability_test.go (NEW): 4 tests covering happy-path (single attempt succeeds, no reconnecting events), transient-then-success (2 failures + 1 success, 2 warn events, substate flips reconnecting → watching, OutcomeReady), budget- exhausted (loop falls through to informer rather than hard-failing), and context-cancel during probe (clean return within bound). - internal/handler/phase1_watch_test.go: 4 new tests covering env var override, field override beats env, OnSubstate wiring updates Result.Phase1Substate during the run and clears on terminate, and State() lifts the field to the top-level snapshot. All existing helmwatch + phase1 handler tests still pass (15s + 1.7s suites). Pre-existing failures in TestAuthHandover_*, TestPersistence_*, TestCreateDeployment_* are unchanged on main and unrelated. 
Co-authored-by: hatiyildiz Co-authored-by: Claude Opus 4.7 (1M context) --- .../api/internal/handler/deployments.go | 8 + .../bootstrap/api/internal/handler/handler.go | 22 + .../api/internal/handler/phase1_watch.go | 81 +++- .../api/internal/handler/phase1_watch_test.go | 177 ++++++++ .../api/internal/helmwatch/helmwatch.go | 329 ++++++++++++++ .../api/internal/helmwatch/kubeconfig.go | 87 +++- .../internal/helmwatch/reachability_test.go | 400 ++++++++++++++++++ .../api/internal/provisioner/provisioner.go | 19 + 8 files changed, 1113 insertions(+), 10 deletions(-) create mode 100644 products/catalyst/bootstrap/api/internal/helmwatch/reachability_test.go diff --git a/products/catalyst/bootstrap/api/internal/handler/deployments.go b/products/catalyst/bootstrap/api/internal/handler/deployments.go index 10b3c8d7..b1788a90 100644 --- a/products/catalyst/bootstrap/api/internal/handler/deployments.go +++ b/products/catalyst/bootstrap/api/internal/handler/deployments.go @@ -760,6 +760,14 @@ func (d *Deployment) State() map[string]any { if d.Result.Phase1Outcome != "" { out["phase1Outcome"] = d.Result.Phase1Outcome } + // Issue #923 — lift the live Phase-1 substate to the top + // level so the Sovereign Admin's wizard banner can render + // "reconnecting…" / "watching…" while Status itself stays + // "phase1-watching". Empty string is omitted so a UI client + // that pre-dates the substate field never sees a stray key. 
+ if d.Result.Phase1Substate != "" { + out["phase1Substate"] = d.Result.Phase1Substate + } // Issues #764 + #768 — lift the handover-fire surface to the // top level so the wizard's provision-page reducer can drive // the "Open your Sovereign console →" button + auto-redirect diff --git a/products/catalyst/bootstrap/api/internal/handler/handler.go b/products/catalyst/bootstrap/api/internal/handler/handler.go index bd305981..e1759319 100644 --- a/products/catalyst/bootstrap/api/internal/handler/handler.go +++ b/products/catalyst/bootstrap/api/internal/handler/handler.go @@ -111,6 +111,28 @@ type Handler struct { phase1LatePollTimeout time.Duration phase1LatePollInterval time.Duration + // phase1Reachability — test-only override for the pre-flight + // apiserver reachability probe (issue #923). Production uses + // helmwatch.NewReachabilityProbeFromKubeconfig (discovery client + // against /version with rest.Config.Timeout = 30s). Tests inject + // a closure that returns N transient errors then nil so the + // reconnect path can be exercised without standing up an + // apiserver. Zero / nil = "use production default". + // + // phase1ReachabilityBudget / phase1ReachabilitySleep / probe + // timing knobs are the test-friendly overrides for the same + // loop. Production reads + // CATALYST_PHASE1_REACHABILITY_BUDGET on every Pod start and + // passes the parsed Duration into Config.ReachabilityOverallBudget; + // the per-attempt + backoff knobs default to the helmwatch + // constants and tests inject tiny values for fast runtimes. 
+ phase1Reachability func(kubeconfigYAML string) func(ctx context.Context) error + phase1ReachabilityBudget time.Duration + phase1ReachabilitySleep func(ctx context.Context, d time.Duration) + phase1ReachabilityProbeTimeout time.Duration + phase1ReachabilityRetryInitial time.Duration + phase1ReachabilityRetryMax time.Duration + // kubeconfigArrivalTimeout / kubeconfigArrivalPollInterval — // runtime knobs for the polling loop that waits for cloud-init // to PUT the new Sovereign's kubeconfig. Zero falls back to the diff --git a/products/catalyst/bootstrap/api/internal/handler/phase1_watch.go b/products/catalyst/bootstrap/api/internal/handler/phase1_watch.go index 20f14649..75122d83 100644 --- a/products/catalyst/bootstrap/api/internal/handler/phase1_watch.go +++ b/products/catalyst/bootstrap/api/internal/handler/phase1_watch.go @@ -75,6 +75,13 @@ const phase1LatePollTimeoutEnv = "CATALYST_PHASE1_LATE_POLL_TIMEOUT" // progress events. Default DefaultLatePollInterval (30s). const phase1LatePollIntervalEnv = "CATALYST_PHASE1_LATE_POLL_INTERVAL" +// phase1ReachabilityBudgetEnv — env var override for the overall +// budget of the pre-flight reachability probe (issue #923). Default +// DefaultReachabilityOverallBudget (10m). Per docs/INVIOLABLE- +// PRINCIPLES.md #4 every knob is runtime-configurable; production +// reads this on every Pod start. +const phase1ReachabilityBudgetEnv = "CATALYST_PHASE1_REACHABILITY_BUDGET" + // kubeconfigArrivalTimeoutEnv — how long runPhase1Watch waits for the // cloud-init PUT to land /var/lib/catalyst/kubeconfigs/.yaml on // disk before giving up with OutcomeKubeconfigMissing. 
Cloud-init @@ -261,13 +268,30 @@ func (h *Handler) phase1WatchConfigForDeployment(dep *Deployment, kubeconfig str latePollInterval = helmwatch.CompileLatePollInterval(envOrEmpty(phase1LatePollIntervalEnv)) } + reachabilityBudget := h.phase1ReachabilityBudget + if reachabilityBudget == 0 { + if v, _ := time.ParseDuration(envOrEmpty(phase1ReachabilityBudgetEnv)); v > 0 { + reachabilityBudget = v + } + } + cfg := helmwatch.Config{ - KubeconfigYAML: kubeconfig, - WatchTimeout: timeout, - MinBootstrapKitHRs: minHRs, - FirstSeenTimeout: firstSeen, - LatePollTimeout: latePollTimeout, - LatePollInterval: latePollInterval, + KubeconfigYAML: kubeconfig, + WatchTimeout: timeout, + MinBootstrapKitHRs: minHRs, + FirstSeenTimeout: firstSeen, + LatePollTimeout: latePollTimeout, + LatePollInterval: latePollInterval, + ReachabilityOverallBudget: reachabilityBudget, + // OnSubstate — issue #923. The watcher fires this on every + // Phase-1 substate transition (reconnecting → watching). We + // stamp Result.Phase1Substate under dep.mu so a /deployments/ + // {id} GET that races the substate change reads the live + // value, not a stale "phase1-watching" with no further + // signal. 
+ OnSubstate: func(substate string) { + h.setPhase1Substate(dep, substate) + }, } if h.dynamicFactory != nil { cfg.DynamicFactory = h.dynamicFactory @@ -275,12 +299,52 @@ func (h *Handler) phase1WatchConfigForDeployment(dep *Deployment, kubeconfig str if h.coreFactory != nil { cfg.CoreFactory = h.coreFactory } + if h.phase1Reachability != nil { + cfg.Reachability = h.phase1Reachability + } if h.phase1WatchResync > 0 { cfg.Resync = h.phase1WatchResync } + if h.phase1ReachabilitySleep != nil { + cfg.Sleep = h.phase1ReachabilitySleep + } + if h.phase1ReachabilityProbeTimeout > 0 { + cfg.ReachabilityProbeTimeout = h.phase1ReachabilityProbeTimeout + } + if h.phase1ReachabilityRetryInitial > 0 { + cfg.ReachabilityRetryInitialInterval = h.phase1ReachabilityRetryInitial + } + if h.phase1ReachabilityRetryMax > 0 { + cfg.ReachabilityRetryMaxInterval = h.phase1ReachabilityRetryMax + } return cfg } +// setPhase1Substate stamps the live Phase-1 substate onto the +// deployment's Result under dep.mu, then persists the deployment +// record so a Pod restart between transitions reads the same value +// (issue #923). +// +// The substate field is intentionally informational — it does NOT +// flip dep.Status. Status stays "phase1-watching" until markPhase1Done +// runs the terminal classification. The wizard banner reads +// Result.Phase1Substate to render "reconnecting…" or "watching…" +// while Status itself is unchanged. +func (h *Handler) setPhase1Substate(dep *Deployment, substate string) { + dep.mu.Lock() + if dep.Result == nil { + dep.mu.Unlock() + return + } + if dep.Result.Phase1Substate == substate { + dep.mu.Unlock() + return + } + dep.Result.Phase1Substate = substate + dep.mu.Unlock() + h.persistDeployment(dep) +} + // markPhase1Done writes the watch outcome onto dep.Result and flips // Status accordingly. 
Holds dep.mu for the whole transition so a // State() snapshot from another goroutine can't observe Status=ready @@ -326,6 +390,11 @@ func (h *Handler) markPhase1Done(dep *Deployment, finalStates map[string]string, dep.Result.ComponentStates = finalStates dep.Result.Phase1FinishedAt = &now dep.Result.Phase1Outcome = outcome + // Clear the in-flight substate (issue #923) — the watch has + // terminated and Phase1Outcome is the authoritative classification + // from this point. The wizard banner falls back to rendering the + // Status pill alone once Phase1Substate is empty. + dep.Result.Phase1Substate = "" failed := 0 for _, s := range finalStates { diff --git a/products/catalyst/bootstrap/api/internal/handler/phase1_watch_test.go b/products/catalyst/bootstrap/api/internal/handler/phase1_watch_test.go index 4161494c..f3557a9f 100644 --- a/products/catalyst/bootstrap/api/internal/handler/phase1_watch_test.go +++ b/products/catalyst/bootstrap/api/internal/handler/phase1_watch_test.go @@ -32,11 +32,14 @@ package handler import ( "context" "encoding/json" + "errors" "net/http" "net/http/httptest" "os" "path/filepath" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -1301,6 +1304,180 @@ func TestPhase1WatchConfig_LatePollFieldOverrideBeatsEnv(t *testing.T) { } } +// TestPhase1WatchConfig_ReachabilityBudgetEnvVarOverride proves the +// CATALYST_PHASE1_REACHABILITY_BUDGET env var parses through +// phase1WatchConfigForDeployment (issue #923). 
+func TestPhase1WatchConfig_ReachabilityBudgetEnvVarOverride(t *testing.T) { + h := NewWithPDM(silentLogger(), &fakePDM{}) + t.Setenv("CATALYST_PHASE1_REACHABILITY_BUDGET", "3m") + + dep := makeDeploymentWithKubeconfig(t, h, "phase1-reach-budget-env", "fake-kubeconfig: yaml") + cfg := h.phase1WatchConfigForDeployment(dep, "fake-kubeconfig: yaml") + + if cfg.ReachabilityOverallBudget != 3*time.Minute { + t.Errorf("ReachabilityOverallBudget = %v, want 3m (from env)", cfg.ReachabilityOverallBudget) + } +} + +// TestPhase1WatchConfig_ReachabilityFieldOverrideBeatsEnv proves the +// test-injection precedence for the reachability budget (issue +// #923). Mirrors the FieldOverrideBeatsEnv contract for every other +// Phase-1 knob. +func TestPhase1WatchConfig_ReachabilityFieldOverrideBeatsEnv(t *testing.T) { + h := NewWithPDM(silentLogger(), &fakePDM{}) + h.phase1ReachabilityBudget = 250 * time.Millisecond + t.Setenv("CATALYST_PHASE1_REACHABILITY_BUDGET", "3m") + + dep := makeDeploymentWithKubeconfig(t, h, "phase1-reach-budget-field", "fake-kubeconfig: yaml") + cfg := h.phase1WatchConfigForDeployment(dep, "fake-kubeconfig: yaml") + + if cfg.ReachabilityOverallBudget != 250*time.Millisecond { + t.Errorf("ReachabilityOverallBudget = %v, want 250ms (handler field override)", cfg.ReachabilityOverallBudget) + } +} + +// TestRunPhase1Watch_OnSubstate_StampedOntoResult proves the wiring +// from helmwatch.Watcher.OnSubstate → handler.setPhase1Substate → +// dep.Result.Phase1Substate (issue #923). +// +// The fixture uses a Reachability factory that fails twice then +// succeeds. The watcher's reconnect loop fires SubstateReconnecting +// on the first failed probe and SubstateWatching on the eventual +// success; markPhase1Done then clears Phase1Substate to "" once the +// watch terminates cleanly. We assert the final value is empty AND +// that during the run the field was non-empty at least once — the +// presence-during-the-run signal the wizard banner reads. 
+func TestRunPhase1Watch_OnSubstate_StampedOntoResult(t *testing.T) { + h := NewWithPDM(silentLogger(), &fakePDM{}) + h.dynamicFactory = fakeDynamicFactoryFromObjects(makeReadyHR("bp-cilium")) + h.phase1WatchTimeout = 5 * time.Second + + // Reachability probe: fail twice then succeed. We use atomic + // counter so the closure can be called concurrently without + // data race. + var probeCalls int32 + h.phase1Reachability = func(_ string) func(ctx context.Context) error { + return func(_ context.Context) error { + n := atomic.AddInt32(&probeCalls, 1) + if n <= 2 { + return errors.New("net/http: TLS handshake timeout") + } + return nil + } + } + // Tiny intervals + no-op sleep so the loop runs in microseconds. + h.phase1ReachabilitySleep = func(_ context.Context, _ time.Duration) {} + h.phase1ReachabilityProbeTimeout = 100 * time.Millisecond + h.phase1ReachabilityRetryInitial = 1 * time.Millisecond + h.phase1ReachabilityRetryMax = 1 * time.Millisecond + h.phase1ReachabilityBudget = 5 * time.Second + + // Custom test recorder: snapshot Phase1Substate every time it + // changes by hooking via a goroutine-safe poll on dep.Result. + // Since the watcher writes under dep.mu and we read under the + // same lock, no data race. + dep := makeDeploymentWithKubeconfig(t, h, "phase1-substate-wiring", "fake-kubeconfig: yaml") + + // Spawn a poll goroutine that records every distinct Phase1Substate + // value. We start it BEFORE runPhase1Watch returns so we observe + // the in-flight transitions. 
+ stopPoll := make(chan struct{}) + pollDone := make(chan struct{}) + var ( + pollMu sync.Mutex + distinctSubs []string + ) + go func() { + defer close(pollDone) + var last string + t := time.NewTicker(1 * time.Millisecond) + defer t.Stop() + for { + select { + case <-stopPoll: + return + case <-t.C: + dep.mu.Lock() + cur := "" + if dep.Result != nil { + cur = dep.Result.Phase1Substate + } + dep.mu.Unlock() + if cur != last { + pollMu.Lock() + distinctSubs = append(distinctSubs, cur) + pollMu.Unlock() + last = cur + } + } + } + }() + + h.runPhase1Watch(dep) + + close(stopPoll) + <-pollDone + + dep.mu.Lock() + defer dep.mu.Unlock() + if dep.Status != "ready" { + t.Errorf("Status = %q, want %q", dep.Status, "ready") + } + // Phase1Substate must be cleared after terminal classification. + if dep.Result.Phase1Substate != "" { + t.Errorf("Phase1Substate = %q after terminate, want empty", dep.Result.Phase1Substate) + } + + // During the run, the field must have transitioned through + // reconnecting → watching at least. The poll runs at 1ms so it's + // very likely to catch both — but we tolerate the race where the + // poll only catches one of the in-flight values, and assert at + // least one non-empty value was observed. + pollMu.Lock() + defer pollMu.Unlock() + sawNonEmpty := false + for _, v := range distinctSubs { + if v != "" { + sawNonEmpty = true + } + } + if !sawNonEmpty { + t.Errorf("expected Phase1Substate to be non-empty at some point during the run; observed transitions = %v", distinctSubs) + } +} + +// TestState_SurfacesPhase1Substate proves the State() snapshot lifts +// dep.Result.Phase1Substate to the top-level "phase1Substate" key +// when non-empty, and omits it when empty (issue #923). 
+func TestState_SurfacesPhase1Substate(t *testing.T) { + dep := &Deployment{ + ID: "phase1-substate-state", + Status: "phase1-watching", + StartedAt: time.Now(), + Request: provisioner.Request{SovereignFQDN: "test.example"}, + Result: &provisioner.Result{ + SovereignFQDN: "test.example", + Phase1Substate: helmwatch.SubstateReconnecting, + }, + } + state := dep.State() + got, ok := state["phase1Substate"] + if !ok { + t.Fatalf("State() missing phase1Substate key when Result.Phase1Substate is non-empty") + } + if got != helmwatch.SubstateReconnecting { + t.Errorf("State()[phase1Substate] = %v, want %q", got, helmwatch.SubstateReconnecting) + } + + // Clear and re-check — empty substate must NOT surface in the + // snapshot (the wizard's reducer never receives a "" value). + dep.Result.Phase1Substate = "" + state2 := dep.State() + if _, ok := state2["phase1Substate"]; ok { + t.Errorf("State() still surfaces phase1Substate after clearing the field; should be omitted") + } +} + // ───────────────────────────────────────────────────────────────────── // helpers // ───────────────────────────────────────────────────────────────────── diff --git a/products/catalyst/bootstrap/api/internal/helmwatch/helmwatch.go b/products/catalyst/bootstrap/api/internal/helmwatch/helmwatch.go index 0c5da449..819e1f51 100644 --- a/products/catalyst/bootstrap/api/internal/helmwatch/helmwatch.go +++ b/products/catalyst/bootstrap/api/internal/helmwatch/helmwatch.go @@ -172,6 +172,53 @@ const DefaultLatePollTimeout = 10 * time.Minute // Operator override: CATALYST_PHASE1_LATE_POLL_INTERVAL. const DefaultLatePollInterval = 30 * time.Second +// DefaultReachabilityProbeTimeout — per-attempt timeout for the +// pre-flight apiserver reachability probe (issue #923). After a +// catalyst-api Pod restart, the new Pod's TLS handshake to the +// Sovereign apiserver can hang for the full WatchTimeout (60m +// default) when the LB is mid-warm-up or kube-vip is flapping. 
The +// probe forces individual attempts to fail fast so we can retry with +// backoff while emitting "watcher-reconnecting" diagnostics into the +// SSE stream. +// +// 10 seconds is sized well above the healthy handshake observed in +// production (sub-second on otech10x and on contabo's BankDhofar +// Qwen probe — same network position), with headroom for slow +// Hetzner regions. +const DefaultReachabilityProbeTimeout = 10 * time.Second + +// DefaultReachabilityRetryInitialInterval — initial backoff for the +// pre-flight reachability probe (issue #923). Each failed attempt +// doubles the wait up to DefaultReachabilityRetryMaxInterval. +const DefaultReachabilityRetryInitialInterval = 5 * time.Second + +// DefaultReachabilityRetryMaxInterval — upper bound on the pre-flight +// reachability probe backoff (issue #923). Caps the inter-attempt +// gap so a long unreachable window still emits a "watcher- +// reconnecting" event at the wizard log pane at least once a minute. +const DefaultReachabilityRetryMaxInterval = 60 * time.Second + +// DefaultReachabilityOverallBudget — overall budget for the pre-flight +// reachability loop before falling through to factory.Start (issue +// #923). 10 minutes is generous: a healthy Sovereign answers within +// seconds. After this, the informer cache-sync path takes over with +// classifyOutcomeOnContextEnd → OutcomeFluxNotReconciling as the +// terminal classification — exactly the right diagnostic for a +// genuinely unreachable apiserver. +// +// Operator override: CATALYST_PHASE1_REACHABILITY_BUDGET. +const DefaultReachabilityOverallBudget = 10 * time.Minute + +// DefaultRESTConfigTimeout — per-request timeout we set on the REST +// config built by NewDynamicClientFromKubeconfig / +// NewKubernetesClientFromKubeconfig (issue #923). Without this, the +// rest.Config has no per-request deadline and a hung TLS handshake +// can block the informer's List call until the overall WatchTimeout +// fires (60m). 
30 seconds caps individual REST calls so the +// informer's internal retry loop has a chance to recover when +// transient TLS / LB flaps clear. +const DefaultRESTConfigTimeout = 30 * time.Second + // Phase-1 outcome strings — Watcher.Outcome() returns one of these so // the handler can set Result.Phase1Outcome (read by the Sovereign // Admin banner). Empty string means the watch has not yet terminated. @@ -225,6 +272,23 @@ const ( StateFailed = "failed" ) +// Phase-1 sub-state strings — surfaced via OnSubstate so the handler +// can stamp them onto Deployment.Result.Phase1Substate. Read by the +// Sovereign Admin's wizard banner to render a more granular status +// pill while Status itself stays "phase1-watching" (issue #923). +const ( + // SubstateReconnecting — set while the pre-flight reachability + // probe is retrying after a Pod restart. Cleared (set to "") once + // the apiserver answers and the informer's first sync completes. + SubstateReconnecting = "watcher-reconnecting" + + // SubstateWatching — set immediately after a successful + // reachability probe + WaitForCacheSync return. The watch is now + // observing live HelmRelease transitions; the wizard's "X of Y + // installed" pill takes over from this point. + SubstateWatching = "watcher-watching" +) + // terminalStates — once a component reaches one of these, the watch // stops emitting state-change events for it. "degraded" is NOT terminal // — a degraded component can recover (Flux retries automatically); the @@ -330,6 +394,63 @@ type Config struct { // (30s). Zero means "fall back to default"; tests inject a tiny // value (e.g. 50ms). LatePollInterval time.Duration + + // Reachability — pre-flight apiserver reachability probe (issue + // #923). Production wires NewReachabilityProbeFromKubeconfig which + // builds a discovery client and calls ServerVersion() with a short + // per-attempt timeout. 
Tests inject a closure that returns N + // transient errors then nil to exercise the reconnect path + // deterministically. + // + // The contract is: a non-nil return means "apiserver did not + // answer this attempt — retry with backoff and emit a watcher- + // reconnecting event"; nil means "ready to start informer". + // + // nil here means: fall back to NewReachabilityProbeFromKubeconfig + // (production default). Setting to an explicit no-op + // (`func(_ string) func(context.Context) error { return func(context.Context) error { return nil } }`) + // disables the probe — used by tests that exercise the + // post-reachability informer path without dragging the probe + // retry loop into the test runtime. + Reachability func(kubeconfigYAML string) func(ctx context.Context) error + + // ReachabilityProbeTimeout — per-attempt timeout for the pre-flight + // reachability probe. Defaults to DefaultReachabilityProbeTimeout + // (10s). + ReachabilityProbeTimeout time.Duration + + // ReachabilityRetryInitialInterval — initial inter-attempt gap for + // the reachability backoff. Defaults to + // DefaultReachabilityRetryInitialInterval (5s). + ReachabilityRetryInitialInterval time.Duration + + // ReachabilityRetryMaxInterval — upper bound on the reachability + // backoff. Defaults to DefaultReachabilityRetryMaxInterval (60s). + ReachabilityRetryMaxInterval time.Duration + + // ReachabilityOverallBudget — overall budget for the reachability + // loop. After this, Watch falls through to factory.Start + + // WaitForCacheSync, which will then time out via WatchTimeout and + // classify as OutcomeFluxNotReconciling (correct diagnostic for a + // genuinely unreachable apiserver). Defaults to + // DefaultReachabilityOverallBudget (10m). + ReachabilityOverallBudget time.Duration + + // Sleep — sleep injection for the reachability backoff (issue + // #923). 
Production passes a closure backed by time.NewTimer that + // honours ctx; tests inject a no-op so the retry loop runs in + // microseconds. Defaults to a real-time sleep that blocks until + // the timer or ctx fires, whichever first. + Sleep func(ctx context.Context, d time.Duration) + + // OnSubstate — fired whenever the Phase-1 substate transitions + // (issue #923). Production wires this to a callback that updates + // Deployment.Result.Phase1Substate under dep.mu so the Sovereign + // Admin's wizard sees the live "reconnecting…" pill instead of a + // stale "phase1-watching". Optional; nil disables substate + // notifications (the Watcher still runs the probe, just without + // surfacing the field on the deployment record). + OnSubstate func(substate string) } func (c *Config) applyDefaults() { @@ -354,6 +475,49 @@ func (c *Config) applyDefaults() { if c.LatePollInterval <= 0 { c.LatePollInterval = DefaultLatePollInterval } + if c.ReachabilityProbeTimeout <= 0 { + c.ReachabilityProbeTimeout = DefaultReachabilityProbeTimeout + } + if c.ReachabilityRetryInitialInterval <= 0 { + c.ReachabilityRetryInitialInterval = DefaultReachabilityRetryInitialInterval + } + if c.ReachabilityRetryMaxInterval <= 0 { + c.ReachabilityRetryMaxInterval = DefaultReachabilityRetryMaxInterval + } + if c.ReachabilityOverallBudget <= 0 { + c.ReachabilityOverallBudget = DefaultReachabilityOverallBudget + } + if c.Sleep == nil { + c.Sleep = sleepWithContext + } +} + +// noopReachability is the test-only default for Config.Reachability +// when a fake DynamicFactory is also supplied (issue #923). It +// returns a probe that always succeeds, so a dynamic-fake-client +// test is not forced to also stand up a fake reachability probe +// just to exercise the post-probe informer path. A test that +// specifically exercises the reconnect loop sets Config.Reachability +// to a closure that returns transient errors first. 
+func noopReachability(_ string) func(ctx context.Context) error { + return func(_ context.Context) error { return nil } +} + +// sleepWithContext sleeps for d, returning early when ctx is done. +// Used as the production default for Config.Sleep so the reachability +// retry loop wakes immediately on context cancel (Pod shutdown, +// caller cancel, overall WatchTimeout). Tests inject a no-op closure +// so the backoff runs in microseconds. +func sleepWithContext(ctx context.Context, d time.Duration) { + if d <= 0 { + return + } + t := time.NewTimer(d) + defer t.Stop() + select { + case <-t.C: + case <-ctx.Done(): + } } // Watcher observes bp-* HelmReleases in flux-system on the new @@ -523,6 +687,7 @@ func NewWatcher(cfg Config, emit Emit) (*Watcher, error) { if strings.TrimSpace(cfg.KubeconfigYAML) == "" { return nil, errors.New("helmwatch: kubeconfig is required (deployment.Result.KubeconfigPath was empty or the file was unreadable)") } + usingTestDynamicFactory := cfg.DynamicFactory != nil if cfg.DynamicFactory == nil { cfg.DynamicFactory = NewDynamicClientFromKubeconfig } @@ -535,6 +700,24 @@ func NewWatcher(cfg Config, emit Emit) (*Watcher, error) { // Tests still inject a fake.NewSimpleClientset via Config.CoreFactory. cfg.CoreFactory = NewKubernetesClientFromKubeconfig } + if cfg.Reachability == nil { + // Production default: discovery-client ServerVersion() against + // the Sovereign apiserver, with the per-attempt timeout taken + // from cfg.ReachabilityProbeTimeout (issue #923). + // + // Tests that inject a fake DynamicFactory but do NOT explicitly + // set Reachability get a no-op probe by default — there is no + // real apiserver to probe and we don't want every existing + // dynamic-fake-client test to hang against the production + // probe trying to hit "fake-kubeconfig: bytes". A test that + // specifically exercises the reconnect path overrides + // Reachability with its own closure. 
+ if usingTestDynamicFactory { + cfg.Reachability = noopReachability + } else { + cfg.Reachability = NewReachabilityProbeFromKubeconfig + } + } cfg.applyDefaults() return &Watcher{ cfg: cfg, @@ -576,6 +759,33 @@ func (w *Watcher) Watch(ctx context.Context) (map[string]string, error) { watchCtx, cancel := context.WithTimeout(ctx, w.cfg.WatchTimeout) defer cancel() + // Pre-flight reachability probe — issue #923. Catalyst-api Pod + // restart in the middle of Phase-1 leaves the new Pod with no live + // connection to the Sovereign apiserver. If we go straight to + // factory.Start, an unreachable apiserver causes + // cache.WaitForCacheSync to block silently for the entire + // WatchTimeout (60m) — the deployment record's componentStates + // stays empty and the wizard log pane goes dark. Probe with a + // short per-attempt timeout, retry with backoff, and emit + // "watcher-reconnecting" events so the operator sees live + // progress. + // + // On success: substate flips to SubstateWatching and we proceed to + // factory.Start. On overall-budget exhaustion: we still proceed + // to factory.Start (the informer's own timeout path will then + // classify as OutcomeFluxNotReconciling — exactly the right + // terminal diagnostic for a genuinely unreachable apiserver). + if !w.runReachabilityProbe(watchCtx) { + // runReachabilityProbe returns false only when watchCtx is + // done — meaning the overall WatchTimeout fired during the + // probe loop. Surface terminal classification immediately + // rather than building an informer that will time out on the + // same condition. 
+ final := w.terminalStatesSnapshot() + w.setOutcome(w.classifyOutcomeOnContextEnd()) + return final, fmt.Errorf("helmwatch: pre-flight reachability probe cancelled by context: %w", watchCtx.Err()) + } + // Stash the cancel func so Watcher.Cancel() can interrupt a // long-running watch from another goroutine — the handover // finalisation handler (issue #317) calls this when @@ -1050,6 +1260,125 @@ func (w *Watcher) maybeEmitFirstSeenWarn(firstSeenStart time.Time) { }) } +// runReachabilityProbe drives the pre-flight apiserver reachability +// loop (issue #923). Returns true on success (apiserver answered OR +// the overall reachability budget elapsed — caller should fall +// through to factory.Start either way). Returns false only when +// watchCtx fires during the probe loop — caller exits early with +// classifyOutcomeOnContextEnd. +// +// The first attempt is a single info-level "starting reachability +// probe" event so the wizard log pane shows the watch is alive. +// Each subsequent failed attempt emits a warn-level +// "watcher-reconnecting" event naming the attempt number + the +// observed error. On success a single info-level "reachable" event +// fires and substate flips to SubstateWatching. The first failed +// attempt also flips substate to SubstateReconnecting so the wizard +// banner can render "reconnecting…" while the loop runs. +// +// Backoff: starts at ReachabilityRetryInitialInterval (5s), doubles +// each failed attempt, caps at ReachabilityRetryMaxInterval (60s). +// Sleeps via cfg.Sleep so tests can inject a no-op and exercise the +// loop in microseconds. 
+func (w *Watcher) runReachabilityProbe(watchCtx context.Context) bool { + probe := w.cfg.Reachability(w.cfg.KubeconfigYAML) + budgetDeadline := w.cfg.Now().Add(w.cfg.ReachabilityOverallBudget) + interval := w.cfg.ReachabilityRetryInitialInterval + + attempt := 0 + for { + attempt++ + // Per-attempt context with the configured probe timeout, so a + // hung TLS handshake fails fast and we can retry. + probeCtx, probeCancel := context.WithTimeout(watchCtx, w.cfg.ReachabilityProbeTimeout) + err := probe(probeCtx) + probeCancel() + + if err == nil { + // Apiserver reachable — flip substate to watching, emit a + // concise "reachable" diagnostic, return success. + if attempt > 1 { + w.dispatch(provisioner.Event{ + Time: w.cfg.Now().UTC().Format(time.RFC3339), + Phase: PhaseComponent, + Level: "info", + Message: fmt.Sprintf( + "Phase-1 watch: Sovereign apiserver reachable on attempt %d — starting per-component HelmRelease watch.", + attempt, + ), + }) + } + w.notifySubstate(SubstateWatching) + return true + } + + // Probe failed — surface diagnostic. First failure is the + // transition from "starting" to "reconnecting"; subsequent + // failures emit a re-attempt summary so the wizard log pane + // shows the loop is alive. + w.notifySubstate(SubstateReconnecting) + w.dispatch(provisioner.Event{ + Time: w.cfg.Now().UTC().Format(time.RFC3339), + Phase: PhaseComponent, + Level: "warn", + Message: fmt.Sprintf( + "Phase-1 watch: Sovereign apiserver unreachable on attempt %d (%s); retrying in %s — typically a transient TLS handshake / LB warm-up after Pod restart.", + attempt, + err.Error(), + interval, + ), + }) + + // Check the overall reachability budget BEFORE sleeping so we + // don't sleep past the budget for nothing. 
Once the budget is + // exhausted we still return true and let the caller fall + // through to factory.Start — the informer's own retry path + // + WaitForCacheSync timeout will then classify as + // OutcomeFluxNotReconciling on a genuinely unreachable + // apiserver, exactly the right operator-actionable diagnostic. + if w.cfg.Now().After(budgetDeadline) { + w.dispatch(provisioner.Event{ + Time: w.cfg.Now().UTC().Format(time.RFC3339), + Phase: PhaseComponent, + Level: "warn", + Message: fmt.Sprintf( + "Phase-1 watch: reachability budget %s exhausted after %d attempts; falling through to informer (will time out as flux-not-reconciling on a genuinely unreachable apiserver).", + w.cfg.ReachabilityOverallBudget, + attempt, + ), + }) + return true + } + + // Sleep with cancellation. cfg.Sleep wakes immediately if + // watchCtx is done so a Pod-shutdown / overall-WatchTimeout + // during a long backoff returns the loop early. + w.cfg.Sleep(watchCtx, interval) + if watchCtx.Err() != nil { + return false + } + + // Exponential backoff up to the cap. + interval = interval * 2 + if interval > w.cfg.ReachabilityRetryMaxInterval { + interval = w.cfg.ReachabilityRetryMaxInterval + } + } +} + +// notifySubstate fires the OnSubstate callback if one is configured +// (issue #923). Production wires this to a closure on the handler +// that updates Deployment.Result.Phase1Substate under dep.mu so a +// /deployments/ GET returns the live substate. A nil OnSubstate +// is the test-only path — the watcher still runs the probe + emits +// the corresponding events. +func (w *Watcher) notifySubstate(substate string) { + if w.cfg.OnSubstate == nil { + return + } + w.cfg.OnSubstate(substate) +} + // classifyOutcomeOnTerminate maps a clean terminate-on-all-done into // OutcomeReady or OutcomeFailed. 
Called only on the `<-terminated` // branch, where the gate guarantees len(observed) ≥ diff --git a/products/catalyst/bootstrap/api/internal/helmwatch/kubeconfig.go b/products/catalyst/bootstrap/api/internal/helmwatch/kubeconfig.go index 4c9af78f..88f229ae 100644 --- a/products/catalyst/bootstrap/api/internal/helmwatch/kubeconfig.go +++ b/products/catalyst/bootstrap/api/internal/helmwatch/kubeconfig.go @@ -8,23 +8,56 @@ package helmwatch import ( + "context" "fmt" + "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) +// restConfigFromKubeconfig parses the raw kubeconfig YAML and stamps +// a per-request Timeout on the resulting rest.Config (issue #923). +// Without this, individual List/Watch/Get calls from the informer +// have no per-request deadline — a hung TLS handshake to the +// Sovereign apiserver after a catalyst-api Pod restart can block +// for the full WatchTimeout (60m) silently. DefaultRESTConfigTimeout +// (30s) caps each REST call so client-go's internal retry loop has +// a chance to recover when the LB warms up or kube-vip stops +// flapping. +// +// Pulled out of NewDynamicClientFromKubeconfig / NewKubernetesClient- +// FromKubeconfig so the production reachability probe +// (NewReachabilityProbeFromKubeconfig) can share the same parse + +// timeout-stamping path. +func restConfigFromKubeconfig(kubeconfigYAML string) (*rest.Config, error) { + cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfigYAML)) + if err != nil { + return nil, fmt.Errorf("parse kubeconfig: %w", err) + } + // Stamp the per-request timeout. clientcmd never sets one by + // default, so a hung handshake stays hung forever (issue #923). + cfg.Timeout = DefaultRESTConfigTimeout + return cfg, nil +} + // NewDynamicClientFromKubeconfig builds a dynamic.Interface from raw // kubeconfig YAML. 
The kubeconfig is the new Sovereign cluster's // k3s.yaml (rewritten with the load-balancer's public IP — the // in-VM 127.0.0.1 server URL is invariant the fetcher must rewrite, // but that rewrite happens upstream in the Phase-0 fetch step, NOT // here). +// +// Per issue #923 the rest.Config carries a 30s per-request timeout +// so a hung TLS handshake after Pod restart fails fast and the +// pre-flight reachability probe can retry with backoff instead of +// the informer hanging silently for the entire WatchTimeout. func NewDynamicClientFromKubeconfig(kubeconfigYAML string) (dynamic.Interface, error) { - cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfigYAML)) + cfg, err := restConfigFromKubeconfig(kubeconfigYAML) if err != nil { - return nil, fmt.Errorf("parse kubeconfig: %w", err) + return nil, err } dyn, err := dynamic.NewForConfig(cfg) if err != nil { @@ -36,10 +69,13 @@ func NewDynamicClientFromKubeconfig(kubeconfigYAML string) (dynamic.Interface, e // NewKubernetesClientFromKubeconfig builds a typed kubernetes.Interface // from raw kubeconfig YAML. Used for Pod listing + log tailing on // helm-controller in flux-system. +// +// Same rest.Config.Timeout treatment as NewDynamicClientFromKubeconfig +// — issue #923. func NewKubernetesClientFromKubeconfig(kubeconfigYAML string) (kubernetes.Interface, error) { - cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfigYAML)) + cfg, err := restConfigFromKubeconfig(kubeconfigYAML) if err != nil { - return nil, fmt.Errorf("parse kubeconfig: %w", err) + return nil, err } clientset, err := kubernetes.NewForConfig(cfg) if err != nil { @@ -47,3 +83,46 @@ func NewKubernetesClientFromKubeconfig(kubeconfigYAML string) (kubernetes.Interf } return clientset, nil } + +// NewReachabilityProbeFromKubeconfig is the production default for +// Config.Reachability (issue #923). 
It returns a closure that, on +// each call, builds a discovery client from the kubeconfig and asks +// the apiserver for its server-version. The discovery client uses +// the same rest.Config.Timeout as the rest of the helmwatch path +// (DefaultRESTConfigTimeout, 30s), so the per-attempt timeout that +// matters in the probe loop is whichever is shorter: +// rest.Config.Timeout vs the per-attempt context the caller passes. +// +// We rebuild the client per call rather than caching it: rest.Config +// caches connection pools internally, but a transient TLS handshake +// failure can leave a poisoned connection in the pool. A fresh client +// every attempt sidesteps that — the probe loop only runs at most +// ReachabilityOverallBudget (10m default), so the allocation cost is +// negligible. +// +// On success: returns nil. On any failure (parse, factory init, +// ServerVersion call): returns a wrapped error so the probe loop's +// emitted message names the failure mode. The watcher's reconnect +// loop classifies any non-nil return as "retry with backoff". +func NewReachabilityProbeFromKubeconfig(kubeconfigYAML string) func(ctx context.Context) error { + return func(ctx context.Context) error { + cfg, err := restConfigFromKubeconfig(kubeconfigYAML) + if err != nil { + return fmt.Errorf("reachability: %w", err) + } + client, err := discovery.NewDiscoveryClientForConfig(cfg) + if err != nil { + return fmt.Errorf("reachability: build discovery client: %w", err) + } + // Use RESTClient().Get on /version so the apiserver's CA + // validation + auth path runs end-to-end. ServerVersion() + // uses RESTClient().AbsPath("/version") under the hood; we + // call the same path explicitly so we can attach the + // caller's per-attempt context for fast cancellation. 
+ raw := client.RESTClient().Get().AbsPath("/version").Do(ctx) + if err := raw.Error(); err != nil { + return fmt.Errorf("reachability: GET /version: %w", err) + } + return nil + } +} diff --git a/products/catalyst/bootstrap/api/internal/helmwatch/reachability_test.go b/products/catalyst/bootstrap/api/internal/helmwatch/reachability_test.go new file mode 100644 index 00000000..2b9478fe --- /dev/null +++ b/products/catalyst/bootstrap/api/internal/helmwatch/reachability_test.go @@ -0,0 +1,400 @@ +// Tests for the pre-flight apiserver reachability probe + reconnect +// loop (issue #923). +// +// What this file proves: +// +// 1. After-Pod-restart reconnect path — a Watcher started against an +// apiserver that is unreachable for the first N attempts, then +// reachable, emits one "watcher-reconnecting" warn event per +// failed attempt, fires OnSubstate with SubstateReconnecting on +// the first failure and SubstateWatching on the eventual success, +// and proceeds to the informer's normal cache-sync path. +// +// 2. Reachability budget exhaustion — when the apiserver stays +// unreachable for the full ReachabilityOverallBudget, the probe +// loop falls through to the informer (NOT a hard failure). The +// informer's own WatchTimeout path then classifies as +// OutcomeFluxNotReconciling — exactly the right diagnostic for a +// genuinely unreachable apiserver. +// +// 3. Substate transitions — OnSubstate fires exactly once for the +// first reconnecting event, and exactly once for the final +// watching event; no spurious duplicate substate notifications +// even when the informer cache-sync also runs. +// +// We use a fake DynamicFactory + fake CoreFactory so no real cluster +// is needed; the probe is supplied via Config.Reachability with a +// counter-based closure so transient-then-success can be exercised +// deterministically. Sleep is overridden to a no-op so the backoff +// runs in microseconds. 
+package helmwatch + +import ( + "context" + "errors" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + dynamicfake "k8s.io/client-go/dynamic/fake" +) + +// transientThenSuccessProbe returns a Reachability factory that fails +// the first `failures` attempts with the given error, then succeeds. +// Counter is shared via the returned pointer so the test can assert +// the call count. +func transientThenSuccessProbe(failures int, transientErr error) (func(string) func(context.Context) error, *int32) { + var count int32 + factory := func(_ string) func(context.Context) error { + return func(_ context.Context) error { + n := atomic.AddInt32(&count, 1) + if int(n) <= failures { + return transientErr + } + return nil + } + } + return factory, &count +} + +// substateRecorder collects every substate transition the watcher +// emits via OnSubstate so tests can assert ordering + de-duplication. +type substateRecorder struct { + mu sync.Mutex + values []string +} + +func (s *substateRecorder) record(v string) { + s.mu.Lock() + s.values = append(s.values, v) + s.mu.Unlock() +} + +func (s *substateRecorder) snapshot() []string { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]string, len(s.values)) + copy(out, s.values) + return out +} + +// noopSleep makes the reachability backoff run in microseconds. Used +// by every reachability test so a 60s max-interval cap doesn't drag +// the test runtime to a halt. +func noopSleep(_ context.Context, _ time.Duration) {} + +// TestReachabilityProbe_HappyPath_SingleAttemptSucceeds verifies the +// production-shaped "apiserver was reachable on the first try" path +// emits NO "watcher-reconnecting" diagnostics and substate flips +// straight to SubstateWatching. 
+func TestReachabilityProbe_HappyPath_SingleAttemptSucceeds(t *testing.T) { + scheme := newFakeScheme() + releases := []runtime.Object{ + makeHelmRelease("bp-cilium", []metav1.Condition{ + {Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"}, + }), + } + client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, + map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"}, + releases..., + ) + + rec := &recorder{} + subs := &substateRecorder{} + + probeFactory, calls := transientThenSuccessProbe(0, nil) + + cfg := Config{ + KubeconfigYAML: "fake-kubeconfig: bytes", + WatchTimeout: 5 * time.Second, + DynamicFactory: fakeFactory(client), + Reachability: probeFactory, + Sleep: noopSleep, + Resync: 0, + OnSubstate: subs.record, + } + w, err := NewWatcher(cfg, rec.emit) + if err != nil { + t.Fatalf("NewWatcher: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if _, err := w.Watch(ctx); err != nil { + t.Fatalf("Watch: %v", err) + } + + if got := atomic.LoadInt32(calls); got != 1 { + t.Errorf("expected exactly 1 reachability probe call, got %d", got) + } + + // Substate transitions: success-on-first-attempt should NOT emit + // SubstateReconnecting (we only flip substate once the first + // failed attempt happens). It should still flip to + // SubstateWatching so the wizard can read the live state. + got := subs.snapshot() + if len(got) != 1 || got[0] != SubstateWatching { + t.Errorf("substate transitions = %v, want [%q]", got, SubstateWatching) + } + + // No "Sovereign apiserver unreachable" warns should be in the + // emit buffer on the happy path. 
+ for _, ev := range rec.snapshot() { + if strings.Contains(ev.Message, "unreachable") { + t.Errorf("happy-path watch should not emit any 'unreachable' diagnostic, got: %q", ev.Message) + } + } +} + +// TestReachabilityProbe_TransientThenSuccess proves the +// catalyst-api-Pod-restart-then-LB-warm-up scenario: the apiserver +// is unreachable for the first 2 attempts, then answers. The watcher +// emits exactly 2 "watcher-reconnecting" warns, fires OnSubstate +// with reconnecting → watching, and proceeds to the normal informer +// cache-sync path that returns OutcomeReady. +func TestReachabilityProbe_TransientThenSuccess(t *testing.T) { + scheme := newFakeScheme() + releases := []runtime.Object{ + makeHelmRelease("bp-cilium", []metav1.Condition{ + {Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"}, + }), + } + client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, + map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"}, + releases..., + ) + + rec := &recorder{} + subs := &substateRecorder{} + + transientErr := errors.New("Get https://5.161.50.175:6443/version: net/http: TLS handshake timeout") + probeFactory, calls := transientThenSuccessProbe(2, transientErr) + + cfg := Config{ + KubeconfigYAML: "fake-kubeconfig: bytes", + WatchTimeout: 5 * time.Second, + DynamicFactory: fakeFactory(client), + Reachability: probeFactory, + Sleep: noopSleep, + ReachabilityRetryInitialInterval: 1 * time.Millisecond, + ReachabilityRetryMaxInterval: 1 * time.Millisecond, + ReachabilityProbeTimeout: 100 * time.Millisecond, + Resync: 0, + OnSubstate: subs.record, + } + w, err := NewWatcher(cfg, rec.emit) + if err != nil { + t.Fatalf("NewWatcher: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if _, err := w.Watch(ctx); err != nil { + t.Fatalf("Watch: %v", err) + } + + if got := atomic.LoadInt32(calls); got != 3 { + 
 t.Errorf("expected 3 reachability probe calls (2 transient failures + 1 success), got %d", got) + } + + // Outcome must be Ready — the informer ran normally after + // reachability succeeded. + if outcome := w.Outcome(); outcome != OutcomeReady { + t.Errorf("Outcome = %q, want %q", outcome, OutcomeReady) + } + + // Substate transitions: first reconnecting (set on first failed + // probe), then watching (set on the eventual success). + // notifySubstate performs no de-duplication — the recorder sees + // every notifySubstate call, so we expect 2x reconnecting + 1 + // watching. + got := subs.snapshot() + if len(got) != 3 { + t.Errorf("substate transitions length = %d, want 3 (got %v)", len(got), got) + } + if len(got) >= 1 && got[0] != SubstateReconnecting { + t.Errorf("first substate = %q, want %q", got[0], SubstateReconnecting) + } + if len(got) >= 1 && got[len(got)-1] != SubstateWatching { + t.Errorf("last substate = %q, want %q", got[len(got)-1], SubstateWatching) + } + + // Count the "watcher-reconnecting"-shaped warn events. We expect + // exactly 2 (one per failed attempt; the 3rd attempt succeeded + // and emits an info-level "reachable on attempt N" diagnostic + // instead). 
+ warnCount := 0 + infoReachableCount := 0 + for _, ev := range rec.snapshot() { + if ev.Phase == PhaseComponent && ev.Level == "warn" && strings.Contains(ev.Message, "Sovereign apiserver unreachable") { + warnCount++ + } + if ev.Phase == PhaseComponent && ev.Level == "info" && strings.Contains(ev.Message, "Sovereign apiserver reachable on attempt") { + infoReachableCount++ + } + } + if warnCount != 2 { + t.Errorf("expected 2 'unreachable' warn events, got %d", warnCount) + } + if infoReachableCount != 1 { + t.Errorf("expected 1 'reachable on attempt N' info event, got %d", infoReachableCount) + } +} + +// TestReachabilityProbe_BudgetExhausted_FallsThroughToInformer proves +// that when the reachability probe loop exhausts its overall budget +// (apiserver stays unreachable), the watcher does NOT terminate the +// run with a hard failure — instead it falls through to factory.Start +// + WaitForCacheSync. The informer's own retry path then drives the +// terminal classification. With a fake DynamicFactory whose List +// resolves cleanly (no real apiserver), the watch then proceeds to +// OutcomeReady — proving that "budget-exhausted" is NOT a hard +// failure mode. (In production the informer hits a real unreachable +// apiserver, WaitForCacheSync times out via WatchTimeout, and the +// classifier returns OutcomeFluxNotReconciling.) +func TestReachabilityProbe_BudgetExhausted_FallsThroughToInformer(t *testing.T) { + scheme := newFakeScheme() + releases := []runtime.Object{ + makeHelmRelease("bp-cilium", []metav1.Condition{ + {Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"}, + }), + } + client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, + map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"}, + releases..., + ) + + rec := &recorder{} + subs := &substateRecorder{} + + // Always-fail probe. 
+ var calls int32 + alwaysFail := func(_ string) func(context.Context) error { + return func(_ context.Context) error { + atomic.AddInt32(&calls, 1) + return errors.New("apiserver unreachable") + } + } + + cfg := Config{ + KubeconfigYAML: "fake-kubeconfig: bytes", + WatchTimeout: 5 * time.Second, + DynamicFactory: fakeFactory(client), + Reachability: alwaysFail, + Sleep: noopSleep, + ReachabilityRetryInitialInterval: 1 * time.Millisecond, + ReachabilityRetryMaxInterval: 1 * time.Millisecond, + ReachabilityProbeTimeout: 10 * time.Millisecond, + // 50ms budget — guarantees we exhaust within the test's + // 5s WatchTimeout. + ReachabilityOverallBudget: 50 * time.Millisecond, + Resync: 0, + OnSubstate: subs.record, + } + w, err := NewWatcher(cfg, rec.emit) + if err != nil { + t.Fatalf("NewWatcher: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if _, err := w.Watch(ctx); err != nil { + t.Fatalf("Watch: %v", err) + } + + // Probe should have been called at least once. + if got := atomic.LoadInt32(&calls); got < 1 { + t.Errorf("expected ≥1 reachability probe call, got %d", got) + } + + // We should see a "reachability budget exhausted" warn + // diagnostic in the emit buffer — this is the operator-facing + // signal that the loop fell through to the informer. 
+ exhaustedSeen := false + for _, ev := range rec.snapshot() { + if ev.Level == "warn" && strings.Contains(ev.Message, "reachability budget") && strings.Contains(ev.Message, "exhausted") { + exhaustedSeen = true + break + } + } + if !exhaustedSeen { + t.Errorf("expected 'reachability budget exhausted' warn event in emit buffer; got events:\n%v", rec.snapshot()) + } + + // Substate must have flipped to reconnecting at least once + // during the failure loop (a budget-exhausted run never reaches + // SubstateWatching from the probe path; the fact that the fake + // informer succeeds afterwards is irrelevant — the substate + // invariant is "reconnecting was observed on the failure + // trajectory"). + subSeen := false + for _, v := range subs.snapshot() { + if v == SubstateReconnecting { + subSeen = true + break + } + } + if !subSeen { + t.Errorf("expected SubstateReconnecting to be fired during the failure loop; got %v", subs.snapshot()) + } +} + +// TestReachabilityProbe_ContextCancelDuringProbe proves the watcher +// returns cleanly (no hang) when the overall watchCtx fires while +// the probe loop is mid-retry. This is the Pod-shutdown / overall- +// WatchTimeout path. +func TestReachabilityProbe_ContextCancelDuringProbe(t *testing.T) { + scheme := newFakeScheme() + client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, + map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"}, + ) + + rec := &recorder{} + subs := &substateRecorder{} + + // Probe blocks until ctx fires. 
+ blockingProbe := func(_ string) func(context.Context) error { + return func(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() + } + } + + cfg := Config{ + KubeconfigYAML: "fake-kubeconfig: bytes", + WatchTimeout: 250 * time.Millisecond, + DynamicFactory: fakeFactory(client), + Reachability: blockingProbe, + Sleep: noopSleep, + ReachabilityRetryInitialInterval: 10 * time.Millisecond, + ReachabilityRetryMaxInterval: 10 * time.Millisecond, + ReachabilityProbeTimeout: 50 * time.Millisecond, + ReachabilityOverallBudget: 10 * time.Second, // much larger than WatchTimeout + Resync: 0, + OnSubstate: subs.record, + } + w, err := NewWatcher(cfg, rec.emit) + if err != nil { + t.Fatalf("NewWatcher: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + start := time.Now() + _, _ = w.Watch(ctx) + elapsed := time.Since(start) + + // The watch must return within a reasonable bound — the inner + // WatchTimeout is 250ms, plus probe-attempt timeouts of 50ms + // each. We give 2s of slack. + if elapsed > 2*time.Second { + t.Errorf("Watch took %s — expected to return within 2s of WatchTimeout firing", elapsed) + } +} diff --git a/products/catalyst/bootstrap/api/internal/provisioner/provisioner.go b/products/catalyst/bootstrap/api/internal/provisioner/provisioner.go index 06acd993..eb98380e 100644 --- a/products/catalyst/bootstrap/api/internal/provisioner/provisioner.go +++ b/products/catalyst/bootstrap/api/internal/provisioner/provisioner.go @@ -664,6 +664,25 @@ type Result struct { // nil while Phase 1 is in flight or has not started. Phase1FinishedAt *time.Time `json:"phase1FinishedAt,omitempty"` + // Phase1Substate — live sub-status while Phase 1 is in flight + // (issue #923). Set by helmwatch.Watcher.OnSubstate as the watch + // progresses through pre-flight reachability + cache-sync. Cleared + // (set to "") once the watch terminates and Phase1Outcome is + // stamped. 
The Sovereign Admin's wizard banner reads this to + // render a granular status pill while Status itself stays + // "phase1-watching": + // + // - "watcher-reconnecting" — apiserver was unreachable and the + // pre-flight probe is retrying with backoff (typical after a + // catalyst-api Pod restart while the LB / kube-vip warms up) + // - "watcher-watching" — apiserver reachable, informer + // attached, observing per-component HelmRelease transitions + // + // Empty while Phase 1 has not started, has terminated, or the + // build of catalyst-api predates the substate field — the wizard + // falls back to rendering the bare Status pill. + Phase1Substate string `json:"phase1Substate,omitempty"` + // Phase1Outcome — terminal classification of the Phase-1 watch. // One of: //