fix(catalyst-api): Phase-1 watcher TLS handshake retries + reconnect substate after Pod restart (#923) (#929)

When the catalyst-api Pod restarts mid-Phase-1 (image roll, kustomization
apply, OOM kill), the new Pod rehydrates the deployment correctly, but if
the apiserver is transiently unreachable (LB warm-up race, kube-vip
flap) the informer's WaitForCacheSync blocks silently for the full
60-minute WatchTimeout, leaving the wizard frozen with empty
componentStates and no progress events.
Live evidence (otech106 c87307c580453536, 2026-05-05): catalyst-api
rolled at 10:50 from :e08d872 → :0a72150; new Pod's TLS handshake to
5.161.50.175:6443 hung indefinitely; phase1-watching status persisted
without any SSE events.
Three coupled fixes:
1. helmwatch/kubeconfig.go: stamp rest.Config.Timeout = 30s on every
client built from the kubeconfig, so individual List/Watch/Get
calls fail fast and the informer's internal retry loop has a chance
to recover when transient TLS / LB flaps clear.
2. helmwatch/helmwatch.go: pre-flight reachability probe
(runReachabilityProbe) before factory.Start. Probes the apiserver
/version endpoint via discovery client with a 10s per-attempt
timeout, retries with 5s → 60s exponential backoff up to a
10-minute overall budget. Each failed attempt emits a
warn-level "Sovereign apiserver unreachable" diagnostic into the
SSE stream so the wizard log pane shows live progress instead of
going dark. On success we proceed to factory.Start; on budget
exhaustion we still proceed (the informer's own WaitForCacheSync
timeout then classifies the run as OutcomeFluxNotReconciling,
exactly the right diagnostic for a genuinely unreachable
apiserver).
3. handler/phase1_watch.go + provisioner.Result.Phase1Substate: the
watcher fires OnSubstate("watcher-reconnecting") on the first
failed probe and OnSubstate("watcher-watching") on the eventual
success. setPhase1Substate persists the field so a /deployments/
{id} GET returns the live sub-status, surfaced to the top level
in State() so the wizard banner can render "reconnecting…" while
Status itself stays "phase1-watching". markPhase1Done clears the
field on terminal classification; a client-side sketch of this
contract follows the list.
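A client-side sketch of that contract, for illustration only: the
snapshot struct, the bannerLabel helper, and the "status" key name are
assumptions, while "phase1Substate" and the two substate strings are
the values State() actually emits.

package wizard

// Sketch only (issue #923): deploymentSnapshot, bannerLabel, and the
// "status" key are hypothetical; the "phase1Substate" key is real and
// is omitted from the JSON while empty.
type deploymentSnapshot struct {
    Status         string `json:"status"`
    Phase1Substate string `json:"phase1Substate,omitempty"`
}

func bannerLabel(s deploymentSnapshot) string {
    switch s.Phase1Substate {
    case "watcher-reconnecting": // helmwatch.SubstateReconnecting
        return "reconnecting…"
    case "watcher-watching": // helmwatch.SubstateWatching
        return "watching…"
    default:
        // Empty: Phase 1 not started, already terminated, or an
        // older catalyst-api build; fall back to the Status pill.
        return s.Status
    }
}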
Per docs/INVIOLABLE-PRINCIPLES.md #4 every knob is
runtime-configurable: the overall budget reads the
CATALYST_PHASE1_REACHABILITY_BUDGET env var on every Pod start
(default 10m), while the per-attempt timeout + backoff knobs default
to helmwatch package constants and are overridable via Config fields
for tests. The resolution order is sketched below.
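The resolution order in miniature (a sketch: resolveReachabilityBudget
is a hypothetical standalone version of the logic in
phase1WatchConfigForDeployment, with os.Getenv standing in for the
repo's envOrEmpty helper):

package handler

import (
    "os"
    "time"
)

// Sketch only: the handler field (test override) beats the env var,
// and the env var beats the package default; zero means "not set at
// this layer" and Config.applyDefaults later fills in
// DefaultReachabilityOverallBudget (10m).
func resolveReachabilityBudget(fieldOverride time.Duration) time.Duration {
    if fieldOverride > 0 {
        return fieldOverride
    }
    if v, err := time.ParseDuration(os.Getenv("CATALYST_PHASE1_REACHABILITY_BUDGET")); err == nil && v > 0 {
        return v
    }
    return 0
}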
Tests:
- internal/helmwatch/reachability_test.go (NEW): 4 tests covering
happy-path (single attempt succeeds, no reconnecting events),
transient-then-success (2 failures + 1 success, 2 warn events,
substate flips reconnecting → watching, OutcomeReady), budget-
exhausted (loop falls through to informer rather than hard-failing),
and context-cancel during probe (clean return within bound).
- internal/handler/phase1_watch_test.go: 4 new tests covering env
var override, field override beats env, OnSubstate wiring updates
Result.Phase1Substate during the run and clears on terminate, and
State() lifts the field to the top-level snapshot.
All existing helmwatch + phase1 handler tests still pass (15s + 1.7s
suites). Pre-existing failures in TestAuthHandover_*, TestPersistence_*,
TestCreateDeployment_* are unchanged on main and unrelated.
Co-authored-by: hatiyildiz <hati@openova.io>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 650eea59d6 · commit e91679aeb1
@@ -760,6 +760,14 @@ func (d *Deployment) State() map[string]any {
    if d.Result.Phase1Outcome != "" {
        out["phase1Outcome"] = d.Result.Phase1Outcome
    }
    // Issue #923 — lift the live Phase-1 substate to the top
    // level so the Sovereign Admin's wizard banner can render
    // "reconnecting…" / "watching…" while Status itself stays
    // "phase1-watching". Empty string is omitted so a UI client
    // that pre-dates the substate field never sees a stray key.
    if d.Result.Phase1Substate != "" {
        out["phase1Substate"] = d.Result.Phase1Substate
    }
    // Issues #764 + #768 — lift the handover-fire surface to the
    // top level so the wizard's provision-page reducer can drive
    // the "Open your Sovereign console →" button + auto-redirect
@@ -111,6 +111,28 @@ type Handler struct {
    phase1LatePollTimeout  time.Duration
    phase1LatePollInterval time.Duration

    // phase1Reachability — test-only override for the pre-flight
    // apiserver reachability probe (issue #923). Production uses
    // helmwatch.NewReachabilityProbeFromKubeconfig (discovery client
    // against /version with rest.Config.Timeout = 30s). Tests inject
    // a closure that returns N transient errors then nil so the
    // reconnect path can be exercised without standing up an
    // apiserver. Zero / nil = "use production default".
    //
    // phase1ReachabilityBudget / phase1ReachabilitySleep / probe
    // timing knobs are the test-friendly overrides for the same
    // loop. Production reads
    // CATALYST_PHASE1_REACHABILITY_BUDGET on every Pod start and
    // passes the parsed Duration into Config.ReachabilityOverallBudget;
    // the per-attempt + backoff knobs default to the helmwatch
    // constants and tests inject tiny values for fast runtimes.
    phase1Reachability             func(kubeconfigYAML string) func(ctx context.Context) error
    phase1ReachabilityBudget       time.Duration
    phase1ReachabilitySleep        func(ctx context.Context, d time.Duration)
    phase1ReachabilityProbeTimeout time.Duration
    phase1ReachabilityRetryInitial time.Duration
    phase1ReachabilityRetryMax     time.Duration

    // kubeconfigArrivalTimeout / kubeconfigArrivalPollInterval —
    // runtime knobs for the polling loop that waits for cloud-init
    // to PUT the new Sovereign's kubeconfig. Zero falls back to the
@@ -75,6 +75,13 @@ const phase1LatePollTimeoutEnv = "CATALYST_PHASE1_LATE_POLL_TIMEOUT"
// progress events. Default DefaultLatePollInterval (30s).
const phase1LatePollIntervalEnv = "CATALYST_PHASE1_LATE_POLL_INTERVAL"

// phase1ReachabilityBudgetEnv — env var override for the overall
// budget of the pre-flight reachability probe (issue #923). Default
// DefaultReachabilityOverallBudget (10m). Per docs/INVIOLABLE-
// PRINCIPLES.md #4 every knob is runtime-configurable; production
// reads this on every Pod start.
const phase1ReachabilityBudgetEnv = "CATALYST_PHASE1_REACHABILITY_BUDGET"

// kubeconfigArrivalTimeoutEnv — how long runPhase1Watch waits for the
// cloud-init PUT to land /var/lib/catalyst/kubeconfigs/<id>.yaml on
// disk before giving up with OutcomeKubeconfigMissing. Cloud-init
@@ -261,13 +268,30 @@ func (h *Handler) phase1WatchConfigForDeployment(dep *Deployment, kubeconfig str
        latePollInterval = helmwatch.CompileLatePollInterval(envOrEmpty(phase1LatePollIntervalEnv))
    }

    reachabilityBudget := h.phase1ReachabilityBudget
    if reachabilityBudget == 0 {
        if v, _ := time.ParseDuration(envOrEmpty(phase1ReachabilityBudgetEnv)); v > 0 {
            reachabilityBudget = v
        }
    }

    cfg := helmwatch.Config{
-       KubeconfigYAML:     kubeconfig,
-       WatchTimeout:       timeout,
-       MinBootstrapKitHRs: minHRs,
-       FirstSeenTimeout:   firstSeen,
-       LatePollTimeout:    latePollTimeout,
-       LatePollInterval:   latePollInterval,
+       KubeconfigYAML:            kubeconfig,
+       WatchTimeout:              timeout,
+       MinBootstrapKitHRs:        minHRs,
+       FirstSeenTimeout:          firstSeen,
+       LatePollTimeout:           latePollTimeout,
+       LatePollInterval:          latePollInterval,
+       ReachabilityOverallBudget: reachabilityBudget,
        // OnSubstate — issue #923. The watcher fires this on every
        // Phase-1 substate transition (reconnecting → watching). We
        // stamp Result.Phase1Substate under dep.mu so a /deployments/
        // {id} GET that races the substate change reads the live
        // value, not a stale "phase1-watching" with no further
        // signal.
        OnSubstate: func(substate string) {
            h.setPhase1Substate(dep, substate)
        },
    }
    if h.dynamicFactory != nil {
        cfg.DynamicFactory = h.dynamicFactory
@@ -275,12 +299,52 @@ func (h *Handler) phase1WatchConfigForDeployment(dep *Deployment, kubeconfig str
    if h.coreFactory != nil {
        cfg.CoreFactory = h.coreFactory
    }
    if h.phase1Reachability != nil {
        cfg.Reachability = h.phase1Reachability
    }
    if h.phase1WatchResync > 0 {
        cfg.Resync = h.phase1WatchResync
    }
    if h.phase1ReachabilitySleep != nil {
        cfg.Sleep = h.phase1ReachabilitySleep
    }
    if h.phase1ReachabilityProbeTimeout > 0 {
        cfg.ReachabilityProbeTimeout = h.phase1ReachabilityProbeTimeout
    }
    if h.phase1ReachabilityRetryInitial > 0 {
        cfg.ReachabilityRetryInitialInterval = h.phase1ReachabilityRetryInitial
    }
    if h.phase1ReachabilityRetryMax > 0 {
        cfg.ReachabilityRetryMaxInterval = h.phase1ReachabilityRetryMax
    }
    return cfg
}

// setPhase1Substate stamps the live Phase-1 substate onto the
// deployment's Result under dep.mu, then persists the deployment
// record so a Pod restart between transitions reads the same value
// (issue #923).
//
// The substate field is intentionally informational — it does NOT
// flip dep.Status. Status stays "phase1-watching" until markPhase1Done
// runs the terminal classification. The wizard banner reads
// Result.Phase1Substate to render "reconnecting…" or "watching…"
// while Status itself is unchanged.
func (h *Handler) setPhase1Substate(dep *Deployment, substate string) {
    dep.mu.Lock()
    if dep.Result == nil {
        dep.mu.Unlock()
        return
    }
    if dep.Result.Phase1Substate == substate {
        dep.mu.Unlock()
        return
    }
    dep.Result.Phase1Substate = substate
    dep.mu.Unlock()
    h.persistDeployment(dep)
}

// markPhase1Done writes the watch outcome onto dep.Result and flips
// Status accordingly. Holds dep.mu for the whole transition so a
// State() snapshot from another goroutine can't observe Status=ready
@@ -326,6 +390,11 @@ func (h *Handler) markPhase1Done(dep *Deployment, finalStates map[string]string,
    dep.Result.ComponentStates = finalStates
    dep.Result.Phase1FinishedAt = &now
    dep.Result.Phase1Outcome = outcome
    // Clear the in-flight substate (issue #923) — the watch has
    // terminated and Phase1Outcome is the authoritative classification
    // from this point. The wizard banner falls back to rendering the
    // Status pill alone once Phase1Substate is empty.
    dep.Result.Phase1Substate = ""

    failed := 0
    for _, s := range finalStates {
@@ -32,11 +32,14 @@ package handler
import (
    "context"
    "encoding/json"
    "errors"
    "net/http"
    "net/http/httptest"
    "os"
    "path/filepath"
    "strings"
    "sync"
    "sync/atomic"
    "testing"
    "time"
@@ -1301,6 +1304,180 @@ func TestPhase1WatchConfig_LatePollFieldOverrideBeatsEnv(t *testing.T) {
    }
}

// TestPhase1WatchConfig_ReachabilityBudgetEnvVarOverride proves the
// CATALYST_PHASE1_REACHABILITY_BUDGET env var parses through
// phase1WatchConfigForDeployment (issue #923).
func TestPhase1WatchConfig_ReachabilityBudgetEnvVarOverride(t *testing.T) {
    h := NewWithPDM(silentLogger(), &fakePDM{})
    t.Setenv("CATALYST_PHASE1_REACHABILITY_BUDGET", "3m")

    dep := makeDeploymentWithKubeconfig(t, h, "phase1-reach-budget-env", "fake-kubeconfig: yaml")
    cfg := h.phase1WatchConfigForDeployment(dep, "fake-kubeconfig: yaml")

    if cfg.ReachabilityOverallBudget != 3*time.Minute {
        t.Errorf("ReachabilityOverallBudget = %v, want 3m (from env)", cfg.ReachabilityOverallBudget)
    }
}

// TestPhase1WatchConfig_ReachabilityFieldOverrideBeatsEnv proves the
// test-injection precedence for the reachability budget (issue
// #923). Mirrors the FieldOverrideBeatsEnv contract for every other
// Phase-1 knob.
func TestPhase1WatchConfig_ReachabilityFieldOverrideBeatsEnv(t *testing.T) {
    h := NewWithPDM(silentLogger(), &fakePDM{})
    h.phase1ReachabilityBudget = 250 * time.Millisecond
    t.Setenv("CATALYST_PHASE1_REACHABILITY_BUDGET", "3m")

    dep := makeDeploymentWithKubeconfig(t, h, "phase1-reach-budget-field", "fake-kubeconfig: yaml")
    cfg := h.phase1WatchConfigForDeployment(dep, "fake-kubeconfig: yaml")

    if cfg.ReachabilityOverallBudget != 250*time.Millisecond {
        t.Errorf("ReachabilityOverallBudget = %v, want 250ms (handler field override)", cfg.ReachabilityOverallBudget)
    }
}

// TestRunPhase1Watch_OnSubstate_StampedOntoResult proves the wiring
// from helmwatch.Watcher.OnSubstate → handler.setPhase1Substate →
// dep.Result.Phase1Substate (issue #923).
//
// The fixture uses a Reachability factory that fails twice then
// succeeds. The watcher's reconnect loop fires SubstateReconnecting
// on the first failed probe and SubstateWatching on the eventual
// success; markPhase1Done then clears Phase1Substate to "" once the
// watch terminates cleanly. We assert the final value is empty AND
// that during the run the field was non-empty at least once — the
// presence-during-the-run signal the wizard banner reads.
func TestRunPhase1Watch_OnSubstate_StampedOntoResult(t *testing.T) {
    h := NewWithPDM(silentLogger(), &fakePDM{})
    h.dynamicFactory = fakeDynamicFactoryFromObjects(makeReadyHR("bp-cilium"))
    h.phase1WatchTimeout = 5 * time.Second

    // Reachability probe: fail twice then succeed. We use an atomic
    // counter so the closure can be called concurrently without a
    // data race.
    var probeCalls int32
    h.phase1Reachability = func(_ string) func(ctx context.Context) error {
        return func(_ context.Context) error {
            n := atomic.AddInt32(&probeCalls, 1)
            if n <= 2 {
                return errors.New("net/http: TLS handshake timeout")
            }
            return nil
        }
    }
    // Tiny intervals + no-op sleep so the loop runs in microseconds.
    h.phase1ReachabilitySleep = func(_ context.Context, _ time.Duration) {}
    h.phase1ReachabilityProbeTimeout = 100 * time.Millisecond
    h.phase1ReachabilityRetryInitial = 1 * time.Millisecond
    h.phase1ReachabilityRetryMax = 1 * time.Millisecond
    h.phase1ReachabilityBudget = 5 * time.Second

    // Custom test recorder: snapshot Phase1Substate every time it
    // changes by hooking via a goroutine-safe poll on dep.Result.
    // Since the watcher writes under dep.mu and we read under the
    // same lock, no data race.
    dep := makeDeploymentWithKubeconfig(t, h, "phase1-substate-wiring", "fake-kubeconfig: yaml")

    // Spawn a poll goroutine that records every distinct Phase1Substate
    // value. We start it BEFORE runPhase1Watch returns so we observe
    // the in-flight transitions.
    stopPoll := make(chan struct{})
    pollDone := make(chan struct{})
    var (
        pollMu       sync.Mutex
        distinctSubs []string
    )
    go func() {
        defer close(pollDone)
        var last string
        t := time.NewTicker(1 * time.Millisecond)
        defer t.Stop()
        for {
            select {
            case <-stopPoll:
                return
            case <-t.C:
                dep.mu.Lock()
                cur := ""
                if dep.Result != nil {
                    cur = dep.Result.Phase1Substate
                }
                dep.mu.Unlock()
                if cur != last {
                    pollMu.Lock()
                    distinctSubs = append(distinctSubs, cur)
                    pollMu.Unlock()
                    last = cur
                }
            }
        }
    }()

    h.runPhase1Watch(dep)

    close(stopPoll)
    <-pollDone

    dep.mu.Lock()
    defer dep.mu.Unlock()
    if dep.Status != "ready" {
        t.Errorf("Status = %q, want %q", dep.Status, "ready")
    }
    // Phase1Substate must be cleared after terminal classification.
    if dep.Result.Phase1Substate != "" {
        t.Errorf("Phase1Substate = %q after terminate, want empty", dep.Result.Phase1Substate)
    }

    // During the run, the field must have transitioned through
    // reconnecting → watching at least. The poll runs at 1ms so it's
    // very likely to catch both — but we tolerate the race where the
    // poll only catches one of the in-flight values, and assert at
    // least one non-empty value was observed.
    pollMu.Lock()
    defer pollMu.Unlock()
    sawNonEmpty := false
    for _, v := range distinctSubs {
        if v != "" {
            sawNonEmpty = true
        }
    }
    if !sawNonEmpty {
        t.Errorf("expected Phase1Substate to be non-empty at some point during the run; observed transitions = %v", distinctSubs)
    }
}

// TestState_SurfacesPhase1Substate proves the State() snapshot lifts
// dep.Result.Phase1Substate to the top-level "phase1Substate" key
// when non-empty, and omits it when empty (issue #923).
func TestState_SurfacesPhase1Substate(t *testing.T) {
    dep := &Deployment{
        ID:        "phase1-substate-state",
        Status:    "phase1-watching",
        StartedAt: time.Now(),
        Request:   provisioner.Request{SovereignFQDN: "test.example"},
        Result: &provisioner.Result{
            SovereignFQDN:  "test.example",
            Phase1Substate: helmwatch.SubstateReconnecting,
        },
    }
    state := dep.State()
    got, ok := state["phase1Substate"]
    if !ok {
        t.Fatalf("State() missing phase1Substate key when Result.Phase1Substate is non-empty")
    }
    if got != helmwatch.SubstateReconnecting {
        t.Errorf("State()[phase1Substate] = %v, want %q", got, helmwatch.SubstateReconnecting)
    }

    // Clear and re-check — empty substate must NOT surface in the
    // snapshot (the wizard's reducer never receives a "" value).
    dep.Result.Phase1Substate = ""
    state2 := dep.State()
    if _, ok := state2["phase1Substate"]; ok {
        t.Errorf("State() still surfaces phase1Substate after clearing the field; should be omitted")
    }
}

// ─────────────────────────────────────────────────────────────────────
// helpers
// ─────────────────────────────────────────────────────────────────────
@@ -172,6 +172,53 @@ const DefaultLatePollTimeout = 10 * time.Minute
// Operator override: CATALYST_PHASE1_LATE_POLL_INTERVAL.
const DefaultLatePollInterval = 30 * time.Second

// DefaultReachabilityProbeTimeout — per-attempt timeout for the
// pre-flight apiserver reachability probe (issue #923). After a
// catalyst-api Pod restart, the new Pod's TLS handshake to the
// Sovereign apiserver can hang for the full WatchTimeout (60m
// default) when the LB is mid-warm-up or kube-vip is flapping. The
// probe forces individual attempts to fail fast so we can retry with
// backoff while emitting "watcher-reconnecting" diagnostics into the
// SSE stream.
//
// 10 seconds is sized well above the healthy handshake observed in
// production (sub-second on otech10x and on contabo's BankDhofar
// Qwen probe — same network position), with headroom for slow
// Hetzner regions.
const DefaultReachabilityProbeTimeout = 10 * time.Second

// DefaultReachabilityRetryInitialInterval — initial backoff for the
// pre-flight reachability probe (issue #923). Each failed attempt
// doubles the wait up to DefaultReachabilityRetryMaxInterval.
const DefaultReachabilityRetryInitialInterval = 5 * time.Second

// DefaultReachabilityRetryMaxInterval — upper bound on the pre-flight
// reachability probe backoff (issue #923). Caps the inter-attempt
// gap so a long unreachable window still emits a "watcher-
// reconnecting" event at the wizard log pane at least once a minute.
const DefaultReachabilityRetryMaxInterval = 60 * time.Second

// DefaultReachabilityOverallBudget — overall budget for the pre-flight
// reachability loop before falling through to factory.Start (issue
// #923). 10 minutes is generous: a healthy Sovereign answers within
// seconds. After this, the informer cache-sync path takes over with
// classifyOutcomeOnContextEnd → OutcomeFluxNotReconciling as the
// terminal classification — exactly the right diagnostic for a
// genuinely unreachable apiserver.
//
// Operator override: CATALYST_PHASE1_REACHABILITY_BUDGET.
const DefaultReachabilityOverallBudget = 10 * time.Minute

// DefaultRESTConfigTimeout — per-request timeout we set on the REST
// config built by NewDynamicClientFromKubeconfig /
// NewKubernetesClientFromKubeconfig (issue #923). Without this, the
// rest.Config has no per-request deadline and a hung TLS handshake
// can block the informer's List call until the overall WatchTimeout
// fires (60m). 30 seconds caps individual REST calls so the
// informer's internal retry loop has a chance to recover when
// transient TLS / LB flaps clear.
const DefaultRESTConfigTimeout = 30 * time.Second

// Phase-1 outcome strings — Watcher.Outcome() returns one of these so
// the handler can set Result.Phase1Outcome (read by the Sovereign
// Admin banner). Empty string means the watch has not yet terminated.
@@ -225,6 +272,23 @@ const (
    StateFailed = "failed"
)

// Phase-1 sub-state strings — surfaced via OnSubstate so the handler
// can stamp them onto Deployment.Result.Phase1Substate. Read by the
// Sovereign Admin's wizard banner to render a more granular status
// pill while Status itself stays "phase1-watching" (issue #923).
const (
    // SubstateReconnecting — set while the pre-flight reachability
    // probe is retrying after a Pod restart. Cleared (set to "") once
    // the apiserver answers and the informer's first sync completes.
    SubstateReconnecting = "watcher-reconnecting"

    // SubstateWatching — set immediately after a successful
    // reachability probe + WaitForCacheSync return. The watch is now
    // observing live HelmRelease transitions; the wizard's "X of Y
    // installed" pill takes over from this point.
    SubstateWatching = "watcher-watching"
)

// terminalStates — once a component reaches one of these, the watch
// stops emitting state-change events for it. "degraded" is NOT terminal
// — a degraded component can recover (Flux retries automatically); the
@@ -330,6 +394,63 @@ type Config struct {
    // (30s). Zero means "fall back to default"; tests inject a tiny
    // value (e.g. 50ms).
    LatePollInterval time.Duration

    // Reachability — pre-flight apiserver reachability probe (issue
    // #923). Production wires NewReachabilityProbeFromKubeconfig which
    // builds a discovery client and calls ServerVersion() with a short
    // per-attempt timeout. Tests inject a closure that returns N
    // transient errors then nil to exercise the reconnect path
    // deterministically.
    //
    // The contract is: a non-nil return means "apiserver did not
    // answer this attempt — retry with backoff and emit a watcher-
    // reconnecting event"; nil means "ready to start informer".
    //
    // nil here means: fall back to NewReachabilityProbeFromKubeconfig
    // (production default). Setting to an explicit no-op
    // (`func(_ string) func(context.Context) error { return func(context.Context) error { return nil } }`)
    // disables the probe — used by tests that exercise the
    // post-reachability informer path without dragging the probe
    // retry loop into the test runtime.
    Reachability func(kubeconfigYAML string) func(ctx context.Context) error

    // ReachabilityProbeTimeout — per-attempt timeout for the pre-flight
    // reachability probe. Defaults to DefaultReachabilityProbeTimeout
    // (10s).
    ReachabilityProbeTimeout time.Duration

    // ReachabilityRetryInitialInterval — initial inter-attempt gap for
    // the reachability backoff. Defaults to
    // DefaultReachabilityRetryInitialInterval (5s).
    ReachabilityRetryInitialInterval time.Duration

    // ReachabilityRetryMaxInterval — upper bound on the reachability
    // backoff. Defaults to DefaultReachabilityRetryMaxInterval (60s).
    ReachabilityRetryMaxInterval time.Duration

    // ReachabilityOverallBudget — overall budget for the reachability
    // loop. After this, Watch falls through to factory.Start +
    // WaitForCacheSync, which will then time out via WatchTimeout and
    // classify as OutcomeFluxNotReconciling (correct diagnostic for a
    // genuinely unreachable apiserver). Defaults to
    // DefaultReachabilityOverallBudget (10m).
    ReachabilityOverallBudget time.Duration

    // Sleep — sleep injection for the reachability backoff (issue
    // #923). Production passes a closure backed by time.NewTimer that
    // honours ctx; tests inject a no-op so the retry loop runs in
    // microseconds. Defaults to a real-time sleep that blocks until
    // the timer or ctx fires, whichever first.
    Sleep func(ctx context.Context, d time.Duration)

    // OnSubstate — fired whenever the Phase-1 substate transitions
    // (issue #923). Production wires this to a callback that updates
    // Deployment.Result.Phase1Substate under dep.mu so the Sovereign
    // Admin's wizard sees the live "reconnecting…" pill instead of a
    // stale "phase1-watching". Optional; nil disables substate
    // notifications (the Watcher still runs the probe, just without
    // surfacing the field on the deployment record).
    OnSubstate func(substate string)
}

func (c *Config) applyDefaults() {
@@ -354,6 +475,49 @@ func (c *Config) applyDefaults() {
    if c.LatePollInterval <= 0 {
        c.LatePollInterval = DefaultLatePollInterval
    }
    if c.ReachabilityProbeTimeout <= 0 {
        c.ReachabilityProbeTimeout = DefaultReachabilityProbeTimeout
    }
    if c.ReachabilityRetryInitialInterval <= 0 {
        c.ReachabilityRetryInitialInterval = DefaultReachabilityRetryInitialInterval
    }
    if c.ReachabilityRetryMaxInterval <= 0 {
        c.ReachabilityRetryMaxInterval = DefaultReachabilityRetryMaxInterval
    }
    if c.ReachabilityOverallBudget <= 0 {
        c.ReachabilityOverallBudget = DefaultReachabilityOverallBudget
    }
    if c.Sleep == nil {
        c.Sleep = sleepWithContext
    }
}

// noopReachability is the test-only default for Config.Reachability
// when a fake DynamicFactory is also supplied (issue #923). It
// returns a probe that always succeeds, so a dynamic-fake-client
// test is not forced to also stand up a fake reachability probe
// just to exercise the post-probe informer path. A test that
// specifically exercises the reconnect loop sets Config.Reachability
// to a closure that returns transient errors first.
func noopReachability(_ string) func(ctx context.Context) error {
    return func(_ context.Context) error { return nil }
}

// sleepWithContext sleeps for d, returning early when ctx is done.
// Used as the production default for Config.Sleep so the reachability
// retry loop wakes immediately on context cancel (Pod shutdown,
// caller cancel, overall WatchTimeout). Tests inject a no-op closure
// so the backoff runs in microseconds.
func sleepWithContext(ctx context.Context, d time.Duration) {
    if d <= 0 {
        return
    }
    t := time.NewTimer(d)
    defer t.Stop()
    select {
    case <-t.C:
    case <-ctx.Done():
    }
}

// Watcher observes bp-* HelmReleases in flux-system on the new
@@ -523,6 +687,7 @@ func NewWatcher(cfg Config, emit Emit) (*Watcher, error) {
    if strings.TrimSpace(cfg.KubeconfigYAML) == "" {
        return nil, errors.New("helmwatch: kubeconfig is required (deployment.Result.KubeconfigPath was empty or the file was unreadable)")
    }
    usingTestDynamicFactory := cfg.DynamicFactory != nil
    if cfg.DynamicFactory == nil {
        cfg.DynamicFactory = NewDynamicClientFromKubeconfig
    }
@@ -535,6 +700,24 @@ func NewWatcher(cfg Config, emit Emit) (*Watcher, error) {
        // Tests still inject a fake.NewSimpleClientset via Config.CoreFactory.
        cfg.CoreFactory = NewKubernetesClientFromKubeconfig
    }
    if cfg.Reachability == nil {
        // Production default: discovery-client ServerVersion() against
        // the Sovereign apiserver, with the per-attempt timeout taken
        // from cfg.ReachabilityProbeTimeout (issue #923).
        //
        // Tests that inject a fake DynamicFactory but do NOT explicitly
        // set Reachability get a no-op probe by default — there is no
        // real apiserver to probe and we don't want every existing
        // dynamic-fake-client test to hang against the production
        // probe trying to hit "fake-kubeconfig: bytes". A test that
        // specifically exercises the reconnect path overrides
        // Reachability with its own closure.
        if usingTestDynamicFactory {
            cfg.Reachability = noopReachability
        } else {
            cfg.Reachability = NewReachabilityProbeFromKubeconfig
        }
    }
    cfg.applyDefaults()
    return &Watcher{
        cfg: cfg,
@@ -576,6 +759,33 @@ func (w *Watcher) Watch(ctx context.Context) (map[string]string, error) {
    watchCtx, cancel := context.WithTimeout(ctx, w.cfg.WatchTimeout)
    defer cancel()

    // Pre-flight reachability probe — issue #923. Catalyst-api Pod
    // restart in the middle of Phase-1 leaves the new Pod with no live
    // connection to the Sovereign apiserver. If we go straight to
    // factory.Start, an unreachable apiserver causes
    // cache.WaitForCacheSync to block silently for the entire
    // WatchTimeout (60m) — the deployment record's componentStates
    // stays empty and the wizard log pane goes dark. Probe with a
    // short per-attempt timeout, retry with backoff, and emit
    // "watcher-reconnecting" events so the operator sees live
    // progress.
    //
    // On success: substate flips to SubstateWatching and we proceed to
    // factory.Start. On overall-budget exhaustion: we still proceed
    // to factory.Start (the informer's own timeout path will then
    // classify as OutcomeFluxNotReconciling — exactly the right
    // terminal diagnostic for a genuinely unreachable apiserver).
    if !w.runReachabilityProbe(watchCtx) {
        // runReachabilityProbe returns false only when watchCtx is
        // done — meaning the overall WatchTimeout fired during the
        // probe loop. Surface terminal classification immediately
        // rather than building an informer that will time out on the
        // same condition.
        final := w.terminalStatesSnapshot()
        w.setOutcome(w.classifyOutcomeOnContextEnd())
        return final, fmt.Errorf("helmwatch: pre-flight reachability probe cancelled by context: %w", watchCtx.Err())
    }

    // Stash the cancel func so Watcher.Cancel() can interrupt a
    // long-running watch from another goroutine — the handover
    // finalisation handler (issue #317) calls this when
@@ -1050,6 +1260,125 @@ func (w *Watcher) maybeEmitFirstSeenWarn(firstSeenStart time.Time) {
    })
}

// runReachabilityProbe drives the pre-flight apiserver reachability
// loop (issue #923). Returns true on success (apiserver answered OR
// the overall reachability budget elapsed — caller should fall
// through to factory.Start either way). Returns false only when
// watchCtx fires during the probe loop — caller exits early with
// classifyOutcomeOnContextEnd.
//
// Each failed attempt emits a warn-level "Sovereign apiserver
// unreachable" event naming the attempt number + the observed
// error. On a success after at least one failure, a single
// info-level "reachable" event fires and substate flips to
// SubstateWatching. The first failed attempt also flips substate
// to SubstateReconnecting so the wizard banner can render
// "reconnecting…" while the loop runs.
//
// Backoff: starts at ReachabilityRetryInitialInterval (5s), doubles
// each failed attempt, caps at ReachabilityRetryMaxInterval (60s).
// Sleeps via cfg.Sleep so tests can inject a no-op and exercise the
// loop in microseconds.
func (w *Watcher) runReachabilityProbe(watchCtx context.Context) bool {
    probe := w.cfg.Reachability(w.cfg.KubeconfigYAML)
    budgetDeadline := w.cfg.Now().Add(w.cfg.ReachabilityOverallBudget)
    interval := w.cfg.ReachabilityRetryInitialInterval

    attempt := 0
    for {
        attempt++
        // Per-attempt context with the configured probe timeout, so a
        // hung TLS handshake fails fast and we can retry.
        probeCtx, probeCancel := context.WithTimeout(watchCtx, w.cfg.ReachabilityProbeTimeout)
        err := probe(probeCtx)
        probeCancel()

        if err == nil {
            // Apiserver reachable — flip substate to watching, emit a
            // concise "reachable" diagnostic, return success.
            if attempt > 1 {
                w.dispatch(provisioner.Event{
                    Time:  w.cfg.Now().UTC().Format(time.RFC3339),
                    Phase: PhaseComponent,
                    Level: "info",
                    Message: fmt.Sprintf(
                        "Phase-1 watch: Sovereign apiserver reachable on attempt %d — starting per-component HelmRelease watch.",
                        attempt,
                    ),
                })
            }
            w.notifySubstate(SubstateWatching)
            return true
        }

        // Probe failed — surface diagnostic. First failure is the
        // transition from "starting" to "reconnecting"; subsequent
        // failures emit a re-attempt summary so the wizard log pane
        // shows the loop is alive.
        w.notifySubstate(SubstateReconnecting)
        w.dispatch(provisioner.Event{
            Time:  w.cfg.Now().UTC().Format(time.RFC3339),
            Phase: PhaseComponent,
            Level: "warn",
            Message: fmt.Sprintf(
                "Phase-1 watch: Sovereign apiserver unreachable on attempt %d (%s); retrying in %s — typically a transient TLS handshake / LB warm-up after Pod restart.",
                attempt,
                err.Error(),
                interval,
            ),
        })

        // Check the overall reachability budget BEFORE sleeping so we
        // don't sleep past the budget for nothing. Once the budget is
        // exhausted we still return true and let the caller fall
        // through to factory.Start — the informer's own retry path
        // + WaitForCacheSync timeout will then classify as
        // OutcomeFluxNotReconciling on a genuinely unreachable
        // apiserver, exactly the right operator-actionable diagnostic.
        if w.cfg.Now().After(budgetDeadline) {
            w.dispatch(provisioner.Event{
                Time:  w.cfg.Now().UTC().Format(time.RFC3339),
                Phase: PhaseComponent,
                Level: "warn",
                Message: fmt.Sprintf(
                    "Phase-1 watch: reachability budget %s exhausted after %d attempts; falling through to informer (will time out as flux-not-reconciling on a genuinely unreachable apiserver).",
                    w.cfg.ReachabilityOverallBudget,
                    attempt,
                ),
            })
            return true
        }

        // Sleep with cancellation. cfg.Sleep wakes immediately if
        // watchCtx is done so a Pod-shutdown / overall-WatchTimeout
        // during a long backoff returns the loop early.
        w.cfg.Sleep(watchCtx, interval)
        if watchCtx.Err() != nil {
            return false
        }

        // Exponential backoff up to the cap.
        interval = interval * 2
        if interval > w.cfg.ReachabilityRetryMaxInterval {
            interval = w.cfg.ReachabilityRetryMaxInterval
        }
    }
}

// notifySubstate fires the OnSubstate callback if one is configured
// (issue #923). Production wires this to a closure on the handler
// that updates Deployment.Result.Phase1Substate under dep.mu so a
// /deployments/<id> GET returns the live substate. A nil OnSubstate
// is the test-only path — the watcher still runs the probe + emits
// the corresponding events.
func (w *Watcher) notifySubstate(substate string) {
    if w.cfg.OnSubstate == nil {
        return
    }
    w.cfg.OnSubstate(substate)
}

// classifyOutcomeOnTerminate maps a clean terminate-on-all-done into
// OutcomeReady or OutcomeFailed. Called only on the `<-terminated`
// branch, where the gate guarantees len(observed) ≥
@@ -8,23 +8,56 @@
package helmwatch

import (
    "context"
    "fmt"

    "k8s.io/client-go/discovery"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

// restConfigFromKubeconfig parses the raw kubeconfig YAML and stamps
// a per-request Timeout on the resulting rest.Config (issue #923).
// Without this, individual List/Watch/Get calls from the informer
// have no per-request deadline — a hung TLS handshake to the
// Sovereign apiserver after a catalyst-api Pod restart can block
// for the full WatchTimeout (60m) silently. DefaultRESTConfigTimeout
// (30s) caps each REST call so client-go's internal retry loop has
// a chance to recover when the LB warms up or kube-vip stops
// flapping.
//
// Pulled out of NewDynamicClientFromKubeconfig / NewKubernetesClient-
// FromKubeconfig so the production reachability probe
// (NewReachabilityProbeFromKubeconfig) can share the same parse +
// timeout-stamping path.
func restConfigFromKubeconfig(kubeconfigYAML string) (*rest.Config, error) {
    cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfigYAML))
    if err != nil {
        return nil, fmt.Errorf("parse kubeconfig: %w", err)
    }
    // Stamp the per-request timeout. clientcmd never sets one by
    // default, so a hung handshake stays hung forever (issue #923).
    cfg.Timeout = DefaultRESTConfigTimeout
    return cfg, nil
}

// NewDynamicClientFromKubeconfig builds a dynamic.Interface from raw
// kubeconfig YAML. The kubeconfig is the new Sovereign cluster's
// k3s.yaml (rewritten with the load-balancer's public IP — the
// in-VM 127.0.0.1 server URL is an invariant the fetcher must
// rewrite, but that rewrite happens upstream in the Phase-0 fetch
// step, NOT here).
//
// Per issue #923 the rest.Config carries a 30s per-request timeout
// so a hung TLS handshake after Pod restart fails fast and the
// pre-flight reachability probe can retry with backoff instead of
// the informer hanging silently for the entire WatchTimeout.
func NewDynamicClientFromKubeconfig(kubeconfigYAML string) (dynamic.Interface, error) {
-   cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfigYAML))
+   cfg, err := restConfigFromKubeconfig(kubeconfigYAML)
    if err != nil {
-       return nil, fmt.Errorf("parse kubeconfig: %w", err)
+       return nil, err
    }
    dyn, err := dynamic.NewForConfig(cfg)
    if err != nil {

@@ -36,10 +69,13 @@ func NewDynamicClientFromKubeconfig(kubeconfigYAML string) (dynamic.Interface, e
// NewKubernetesClientFromKubeconfig builds a typed kubernetes.Interface
// from raw kubeconfig YAML. Used for Pod listing + log tailing on
// helm-controller in flux-system.
//
// Same rest.Config.Timeout treatment as NewDynamicClientFromKubeconfig
// — issue #923.
func NewKubernetesClientFromKubeconfig(kubeconfigYAML string) (kubernetes.Interface, error) {
-   cfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfigYAML))
+   cfg, err := restConfigFromKubeconfig(kubeconfigYAML)
    if err != nil {
-       return nil, fmt.Errorf("parse kubeconfig: %w", err)
+       return nil, err
    }
    clientset, err := kubernetes.NewForConfig(cfg)
    if err != nil {

@@ -47,3 +83,46 @@ func NewKubernetesClientFromKubeconfig(kubeconfigYAML string) (kubernetes.Interf
    }
    return clientset, nil
}

// NewReachabilityProbeFromKubeconfig is the production default for
// Config.Reachability (issue #923). It returns a closure that, on
// each call, builds a discovery client from the kubeconfig and asks
// the apiserver for its server-version. The discovery client uses
// the same rest.Config.Timeout as the rest of the helmwatch path
// (DefaultRESTConfigTimeout, 30s), so the per-attempt timeout that
// matters in the probe loop is whichever is shorter:
// rest.Config.Timeout vs the per-attempt context the caller passes.
//
// We rebuild the client per call rather than caching it: rest.Config
// caches connection pools internally, but a transient TLS handshake
// failure can leave a poisoned connection in the pool. A fresh client
// every attempt sidesteps that — the probe loop only runs at most
// ReachabilityOverallBudget (10m default), so the allocation cost is
// negligible.
//
// On success: returns nil. On any failure (parse, factory init,
// ServerVersion call): returns a wrapped error so the probe loop's
// emitted message names the failure mode. The watcher's reconnect
// loop classifies any non-nil return as "retry with backoff".
func NewReachabilityProbeFromKubeconfig(kubeconfigYAML string) func(ctx context.Context) error {
    return func(ctx context.Context) error {
        cfg, err := restConfigFromKubeconfig(kubeconfigYAML)
        if err != nil {
            return fmt.Errorf("reachability: %w", err)
        }
        client, err := discovery.NewDiscoveryClientForConfig(cfg)
        if err != nil {
            return fmt.Errorf("reachability: build discovery client: %w", err)
        }
        // Use RESTClient().Get on /version so the apiserver's CA
        // validation + auth path runs end-to-end. ServerVersion()
        // uses RESTClient().AbsPath("/version") under the hood; we
        // call the same path explicitly so we can attach the
        // caller's per-attempt context for fast cancellation.
        raw := client.RESTClient().Get().AbsPath("/version").Do(ctx)
        if err := raw.Error(); err != nil {
            return fmt.Errorf("reachability: GET /version: %w", err)
        }
        return nil
    }
}
@@ -0,0 +1,400 @@
// Tests for the pre-flight apiserver reachability probe + reconnect
// loop (issue #923).
//
// What this file proves:
//
// 1. After-Pod-restart reconnect path — a Watcher started against an
//    apiserver that is unreachable for the first N attempts, then
//    reachable, emits one "watcher-reconnecting" warn event per
//    failed attempt, fires OnSubstate with SubstateReconnecting on
//    the first failure and SubstateWatching on the eventual success,
//    and proceeds to the informer's normal cache-sync path.
//
// 2. Reachability budget exhaustion — when the apiserver stays
//    unreachable for the full ReachabilityOverallBudget, the probe
//    loop falls through to the informer (NOT a hard failure). The
//    informer's own WatchTimeout path then classifies as
//    OutcomeFluxNotReconciling — exactly the right diagnostic for a
//    genuinely unreachable apiserver.
//
// 3. Substate transitions — OnSubstate fires SubstateReconnecting on
//    every failed attempt and SubstateWatching once on the final
//    success; de-duplication of repeated values is the handler's job
//    (setPhase1Substate no-ops when the value is unchanged).
//
// We use a fake DynamicFactory + fake CoreFactory so no real cluster
// is needed; the probe is supplied via Config.Reachability with a
// counter-based closure so transient-then-success can be exercised
// deterministically. Sleep is overridden to a no-op so the backoff
// runs in microseconds.
package helmwatch

import (
    "context"
    "errors"
    "strings"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    dynamicfake "k8s.io/client-go/dynamic/fake"
)

// transientThenSuccessProbe returns a Reachability factory that fails
// the first `failures` attempts with the given error, then succeeds.
// The counter is shared via the returned pointer so the test can
// assert the call count.
func transientThenSuccessProbe(failures int, transientErr error) (func(string) func(context.Context) error, *int32) {
    var count int32
    factory := func(_ string) func(context.Context) error {
        return func(_ context.Context) error {
            n := atomic.AddInt32(&count, 1)
            if int(n) <= failures {
                return transientErr
            }
            return nil
        }
    }
    return factory, &count
}

// substateRecorder collects every substate transition the watcher
// emits via OnSubstate so tests can assert ordering + de-duplication.
type substateRecorder struct {
    mu     sync.Mutex
    values []string
}

func (s *substateRecorder) record(v string) {
    s.mu.Lock()
    s.values = append(s.values, v)
    s.mu.Unlock()
}

func (s *substateRecorder) snapshot() []string {
    s.mu.Lock()
    defer s.mu.Unlock()
    out := make([]string, len(s.values))
    copy(out, s.values)
    return out
}

// noopSleep makes the reachability backoff run in microseconds. Used
// by every reachability test so a 60s max-interval cap doesn't drag
// the test runtime to a halt.
func noopSleep(_ context.Context, _ time.Duration) {}

// TestReachabilityProbe_HappyPath_SingleAttemptSucceeds verifies the
// production-shaped "apiserver was reachable on the first try" path
// emits NO "watcher-reconnecting" diagnostics and substate flips
// straight to SubstateWatching.
func TestReachabilityProbe_HappyPath_SingleAttemptSucceeds(t *testing.T) {
    scheme := newFakeScheme()
    releases := []runtime.Object{
        makeHelmRelease("bp-cilium", []metav1.Condition{
            {Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
        }),
    }
    client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
        map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
        releases...,
    )

    rec := &recorder{}
    subs := &substateRecorder{}

    probeFactory, calls := transientThenSuccessProbe(0, nil)

    cfg := Config{
        KubeconfigYAML: "fake-kubeconfig: bytes",
        WatchTimeout:   5 * time.Second,
        DynamicFactory: fakeFactory(client),
        Reachability:   probeFactory,
        Sleep:          noopSleep,
        Resync:         0,
        OnSubstate:     subs.record,
    }
    w, err := NewWatcher(cfg, rec.emit)
    if err != nil {
        t.Fatalf("NewWatcher: %v", err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if _, err := w.Watch(ctx); err != nil {
        t.Fatalf("Watch: %v", err)
    }

    if got := atomic.LoadInt32(calls); got != 1 {
        t.Errorf("expected exactly 1 reachability probe call, got %d", got)
    }

    // Substate transitions: success-on-first-attempt should NOT emit
    // SubstateReconnecting (we only flip substate once the first
    // failed attempt happens). It should still flip to
    // SubstateWatching so the wizard can read the live state.
    got := subs.snapshot()
    if len(got) != 1 || got[0] != SubstateWatching {
        t.Errorf("substate transitions = %v, want [%q]", got, SubstateWatching)
    }

    // No "Sovereign apiserver unreachable" warns should be in the
    // emit buffer on the happy path.
    for _, ev := range rec.snapshot() {
        if strings.Contains(ev.Message, "unreachable") {
            t.Errorf("happy-path watch should not emit any 'unreachable' diagnostic, got: %q", ev.Message)
        }
    }
}

// TestReachabilityProbe_TransientThenSuccess proves the
// catalyst-api-Pod-restart-then-LB-warm-up scenario: the apiserver
// is unreachable for the first 2 attempts, then answers. The watcher
// emits exactly 2 "watcher-reconnecting" warns, fires OnSubstate
// with reconnecting → watching, and proceeds to the normal informer
// cache-sync path that returns OutcomeReady.
func TestReachabilityProbe_TransientThenSuccess(t *testing.T) {
    scheme := newFakeScheme()
    releases := []runtime.Object{
        makeHelmRelease("bp-cilium", []metav1.Condition{
            {Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
        }),
    }
    client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
        map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
        releases...,
    )

    rec := &recorder{}
    subs := &substateRecorder{}

    transientErr := errors.New("Get https://5.161.50.175:6443/version: net/http: TLS handshake timeout")
    probeFactory, calls := transientThenSuccessProbe(2, transientErr)

    cfg := Config{
        KubeconfigYAML:                   "fake-kubeconfig: bytes",
        WatchTimeout:                     5 * time.Second,
        DynamicFactory:                   fakeFactory(client),
        Reachability:                     probeFactory,
        Sleep:                            noopSleep,
        ReachabilityRetryInitialInterval: 1 * time.Millisecond,
        ReachabilityRetryMaxInterval:     1 * time.Millisecond,
        ReachabilityProbeTimeout:         100 * time.Millisecond,
        Resync:                           0,
        OnSubstate:                       subs.record,
    }
    w, err := NewWatcher(cfg, rec.emit)
    if err != nil {
        t.Fatalf("NewWatcher: %v", err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if _, err := w.Watch(ctx); err != nil {
        t.Fatalf("Watch: %v", err)
    }

    if got := atomic.LoadInt32(calls); got != 3 {
        t.Errorf("expected 3 reachability probe calls (2 transient failures + 1 success), got %d", got)
    }

    // Outcome must be Ready — the informer ran normally after
    // reachability succeeded.
    if outcome := w.Outcome(); outcome != OutcomeReady {
        t.Errorf("Outcome = %q, want %q", outcome, OutcomeReady)
    }

    // Substate transitions: reconnecting first (fired on each failed
    // probe), then watching (fired on the eventual success). The
    // watcher does not de-duplicate (that is the handler's job in
    // setPhase1Substate), so the raw recorder sees every
    // notifySubstate call: 2x reconnecting + 1x watching.
    got := subs.snapshot()
    if len(got) != 3 {
        t.Errorf("substate transitions length = %d, want 3 (got %v)", len(got), got)
    }
    if len(got) >= 1 && got[0] != SubstateReconnecting {
        t.Errorf("first substate = %q, want %q", got[0], SubstateReconnecting)
    }
    if len(got) >= 1 && got[len(got)-1] != SubstateWatching {
        t.Errorf("last substate = %q, want %q", got[len(got)-1], SubstateWatching)
    }

    // Count the "watcher-reconnecting"-shaped warn events. We expect
    // exactly 2 (one per failed attempt; the 3rd attempt succeeded
    // and emits an info-level "reachable on attempt N" diagnostic
    // instead).
    warnCount := 0
    infoReachableCount := 0
    for _, ev := range rec.snapshot() {
        if ev.Phase == PhaseComponent && ev.Level == "warn" && strings.Contains(ev.Message, "Sovereign apiserver unreachable") {
            warnCount++
        }
        if ev.Phase == PhaseComponent && ev.Level == "info" && strings.Contains(ev.Message, "Sovereign apiserver reachable on attempt") {
            infoReachableCount++
        }
    }
    if warnCount != 2 {
        t.Errorf("expected 2 'unreachable' warn events, got %d", warnCount)
    }
    if infoReachableCount != 1 {
        t.Errorf("expected 1 'reachable on attempt N' info event, got %d", infoReachableCount)
    }
}

// TestReachabilityProbe_BudgetExhausted_FallsThroughToInformer proves
// that when the reachability probe loop exhausts its overall budget
// (apiserver stays unreachable), the watcher does NOT terminate the
// run with a hard failure — instead it falls through to factory.Start
// + WaitForCacheSync. The informer's own retry path then drives the
// terminal classification. With a fake DynamicFactory whose List
// resolves cleanly (no real apiserver), the watch then proceeds to
// OutcomeReady — proving that "budget-exhausted" is NOT a hard
// failure mode. (In production the informer hits a real unreachable
// apiserver, WaitForCacheSync times out via WatchTimeout, and the
// classifier returns OutcomeFluxNotReconciling.)
func TestReachabilityProbe_BudgetExhausted_FallsThroughToInformer(t *testing.T) {
    scheme := newFakeScheme()
    releases := []runtime.Object{
        makeHelmRelease("bp-cilium", []metav1.Condition{
            {Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
        }),
    }
    client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
        map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
        releases...,
    )

    rec := &recorder{}
    subs := &substateRecorder{}

    // Always-fail probe.
    var calls int32
    alwaysFail := func(_ string) func(context.Context) error {
        return func(_ context.Context) error {
            atomic.AddInt32(&calls, 1)
            return errors.New("apiserver unreachable")
        }
    }

    cfg := Config{
        KubeconfigYAML:                   "fake-kubeconfig: bytes",
        WatchTimeout:                     5 * time.Second,
        DynamicFactory:                   fakeFactory(client),
        Reachability:                     alwaysFail,
        Sleep:                            noopSleep,
        ReachabilityRetryInitialInterval: 1 * time.Millisecond,
        ReachabilityRetryMaxInterval:     1 * time.Millisecond,
        ReachabilityProbeTimeout:         10 * time.Millisecond,
        // 50ms budget — guarantees we exhaust within the test's
        // 5s WatchTimeout.
        ReachabilityOverallBudget: 50 * time.Millisecond,
        Resync:                    0,
        OnSubstate:                subs.record,
    }
    w, err := NewWatcher(cfg, rec.emit)
    if err != nil {
        t.Fatalf("NewWatcher: %v", err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if _, err := w.Watch(ctx); err != nil {
        t.Fatalf("Watch: %v", err)
    }

    // Probe should have been called at least once.
    if got := atomic.LoadInt32(&calls); got < 1 {
        t.Errorf("expected ≥1 reachability probe call, got %d", got)
    }

    // We should see a "reachability budget exhausted" warn
    // diagnostic in the emit buffer — this is the operator-facing
    // signal that the loop fell through to the informer.
    exhaustedSeen := false
    for _, ev := range rec.snapshot() {
        if ev.Level == "warn" && strings.Contains(ev.Message, "reachability budget") && strings.Contains(ev.Message, "exhausted") {
            exhaustedSeen = true
            break
        }
    }
    if !exhaustedSeen {
        t.Errorf("expected 'reachability budget exhausted' warn event in emit buffer; got events:\n%v", rec.snapshot())
    }

    // Substate must have flipped to reconnecting at least once
    // during the failure loop (a budget-exhausted run never reaches
    // SubstateWatching from the probe path; the fact that the fake
    // informer succeeds afterwards is irrelevant — the substate
    // invariant is "reconnecting was observed on the failure
    // trajectory").
    subSeen := false
    for _, v := range subs.snapshot() {
        if v == SubstateReconnecting {
            subSeen = true
            break
        }
    }
    if !subSeen {
        t.Errorf("expected SubstateReconnecting to be fired during the failure loop; got %v", subs.snapshot())
    }
}

// TestReachabilityProbe_ContextCancelDuringProbe proves the watcher
// returns cleanly (no hang) when the overall watchCtx fires while
// the probe loop is mid-retry. This is the Pod-shutdown / overall-
// WatchTimeout path.
func TestReachabilityProbe_ContextCancelDuringProbe(t *testing.T) {
    scheme := newFakeScheme()
    client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
        map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
    )

    rec := &recorder{}
    subs := &substateRecorder{}

    // Probe blocks until ctx fires.
    blockingProbe := func(_ string) func(context.Context) error {
        return func(ctx context.Context) error {
            <-ctx.Done()
            return ctx.Err()
        }
    }

    cfg := Config{
        KubeconfigYAML:                   "fake-kubeconfig: bytes",
        WatchTimeout:                     250 * time.Millisecond,
        DynamicFactory:                   fakeFactory(client),
        Reachability:                     blockingProbe,
        Sleep:                            noopSleep,
        ReachabilityRetryInitialInterval: 10 * time.Millisecond,
        ReachabilityRetryMaxInterval:     10 * time.Millisecond,
        ReachabilityProbeTimeout:         50 * time.Millisecond,
        ReachabilityOverallBudget:        10 * time.Second, // much larger than WatchTimeout
        Resync:                           0,
        OnSubstate:                       subs.record,
    }
    w, err := NewWatcher(cfg, rec.emit)
    if err != nil {
        t.Fatalf("NewWatcher: %v", err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    start := time.Now()
    _, _ = w.Watch(ctx)
    elapsed := time.Since(start)

    // The watch must return within a reasonable bound — the inner
    // WatchTimeout is 250ms, plus probe-attempt timeouts of 50ms
    // each. We give 2s of slack.
    if elapsed > 2*time.Second {
        t.Errorf("Watch took %s — expected to return within 2s of WatchTimeout firing", elapsed)
    }
}
@@ -664,6 +664,25 @@ type Result struct {
    // nil while Phase 1 is in flight or has not started.
    Phase1FinishedAt *time.Time `json:"phase1FinishedAt,omitempty"`

    // Phase1Substate — live sub-status while Phase 1 is in flight
    // (issue #923). Set by helmwatch.Watcher.OnSubstate as the watch
    // progresses through pre-flight reachability + cache-sync. Cleared
    // (set to "") once the watch terminates and Phase1Outcome is
    // stamped. The Sovereign Admin's wizard banner reads this to
    // render a granular status pill while Status itself stays
    // "phase1-watching":
    //
    //   - "watcher-reconnecting" — apiserver was unreachable and the
    //     pre-flight probe is retrying with backoff (typical after a
    //     catalyst-api Pod restart while the LB / kube-vip warms up)
    //   - "watcher-watching" — apiserver reachable, informer
    //     attached, observing per-component HelmRelease transitions
    //
    // Empty while Phase 1 has not started, has terminated, or the
    // build of catalyst-api predates the substate field — the wizard
    // falls back to rendering the bare Status pill.
    Phase1Substate string `json:"phase1Substate,omitempty"`

    // Phase1Outcome — terminal classification of the Phase-1 watch.
    // One of:
    //