diff --git a/products/catalyst/bootstrap/api/internal/k8scache/evaluators/evaluators.go b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/evaluators.go
new file mode 100644
index 00000000..562f861f
--- /dev/null
+++ b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/evaluators.go
@@ -0,0 +1,575 @@
+// Package evaluators — synthetic PolicyReport producers for compliance
+// signals Kyverno cannot evaluate at admission.
+//
+// EPIC-1 (#1096) Slice W2 — five evaluators ship in this package:
+//
+//   - hpa    : HPA status.currentReplicas vs spec.minReplicas on the Pod's workload owner
+//   - otel   : Pod has otel-collector sidecar OR namespace has Instrumentation CR
+//   - hubble : Cilium Hubble has observed flow to/from this Pod (deferred — no client dep)
+//   - harbor : Container image refs harbor.<domain>/...
+//   - flux   : Resource has app.kubernetes.io/managed-by=flux OR Flux ownerRef
+//
+// Architecture (per docs/EPICS-1-6-unified-design.md §4.1, brief
+// `02-W-watcher-extension.md`):
+//
+//	k8scache.Factory                    evaluators.Engine
+//	─────────────────                   ──────────────────
+//	[informer events] ─Pod ADDED──→ Subscribe(kinds={pod})
+//	                                      │
+//	                             ┌────────┴─────────┐
+//	                             │ EvaluateAll(...) │
+//	                             └────────┬─────────┘
+//	                                      │
+//	                        synthetic SyntheticReport rows
+//	                                      │
+//	               Factory.Publish(Event{Kind:"compliance-evaluator"})
+//	                                      │
+//	[SSE fanout] ←───────────── re-enters the same fanout the
+//	                            architecture-graph subscribers consume
+//
+// Per ADR-0001 §5: the 30s ticker re-evaluates over the in-process
+// Indexer, NOT against the apiserver. Evaluators are pure functions
+// over a Snapshot read-interface — they never make REST calls.
+//
+// Per docs/INVIOLABLE-PRINCIPLES.md #4: every threshold (lookback
+// window, regex pattern, registry domain template) is a Config field
+// — no hardcoded values.
+package evaluators
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"strings"
+	"sync"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/labels"
+)
+
+// Result is the canonical outcome — string-typed so it round-trips
+// 1:1 through the synthetic PolicyReport JSON wire format the score
+// aggregator (slice S1) consumes.
+type Result string
+
+const (
+	// ResultPass — workload satisfies the policy.
+	ResultPass Result = "pass"
+	// ResultFail — workload violates the policy.
+	ResultFail Result = "fail"
+	// ResultSkip — policy not applicable to this workload (e.g. HPA
+	// not present, Hubble UI disabled). Slice S1's normalizer drops
+	// skip rows from the denominator.
+	ResultSkip Result = "skip"
+	// ResultWarn — informational, does not affect the score.
+	ResultWarn Result = "warn"
+)
+
+// SyntheticReport mirrors the wgpolicyk8s.io PolicyReport row schema so
+// the score aggregator's join is uniform across Kyverno-emitted rows
+// and evaluator-emitted ones.
+//
+// Wire-shape: encoded directly into the SSE event's Object. The score
+// aggregator deserialises this struct from each event's
+// .object.results[0] field — keep field names stable across releases.
+type SyntheticReport struct {
+	// Policy — short canonical policy name, matching the EPIC-1 §4.3
+	// table ("hpa-effective", "otel-injected", "harbor-proxy-pull",
+	// "flux-managed", "hubble-flows-seen").
+	Policy string `json:"policy"`
+
+	// Rule — for parity with Kyverno's per-rule reporting; equal to
+	// Policy for the simple one-rule evaluators in this package.
+	Rule string `json:"rule"`
+
+	// Result — pass / fail / skip / warn.
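+	//
+	// Illustrative wire shape (an assumption about the envelope, not
+	// copied from the aggregator) as slice S1 would read it from
+	// .object.results[0]:
+	//
+	//	{"policy":"harbor-proxy-pull","rule":"harbor-proxy-pull",
+	//	 "result":"fail","namespace":"acme",
+	//	 "resource":{"apiVersion":"v1","kind":"Pod","name":"app","uid":"uid-acme-app"},
+	//	 "time":"2023-11-14T22:13:20Z"}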
+	Result Result `json:"result"`
+
+	// Resource — back-pointer to the offending K8s object. Populated
+	// from the workload metadata (apiVersion, kind, name, uid); the
+	// namespace travels in the separate Namespace field below. Skip
+	// rows still set Resource so the UI drill-down shows the workload
+	// that was inspected.
+	Resource metav1.OwnerReference `json:"resource"`
+
+	// Namespace — copied from the workload for the namespace-aware
+	// drill-down filter.
+	Namespace string `json:"namespace,omitempty"`
+
+	// Message — human-readable explanation.
+	Message string `json:"message,omitempty"`
+
+	// Properties — arbitrary key-value details (e.g. for HPA the
+	// minReplicas + currentReplicas observed values; for Harbor the
+	// rejected image ref).
+	Properties map[string]string `json:"properties,omitempty"`
+
+	// Time — server-side timestamp the row was produced. Driven by
+	// the Engine's clock for deterministic test fixtures.
+	Time metav1.Time `json:"time"`
+}
+
+// Snapshot is the read-side interface evaluators consume. Backed in
+// production by k8scache.Factory.List which reads the in-process
+// Indexer (no apiserver calls). Tests inject a fake.
+//
+// Cluster-scope: every evaluator works against ONE Sovereign at a
+// time — the cluster id is passed at Engine.Run time. Snapshot.List
+// returns objects from that single cluster's Indexer.
+type Snapshot interface {
+	// List returns every object of `kindName` currently in the cache,
+	// optionally filtered by a label selector. An empty selector
+	// returns all objects. Returns an error when the kind is not
+	// registered.
+	List(kindName string, sel labels.Selector) ([]*unstructured.Unstructured, error)
+}
+
+// Evaluator is a pure function: given a snapshot of the cluster and a
+// target workload (Pod / Deployment / etc.), produce zero or more
+// SyntheticReport rows.
+//
+// Evaluators MUST NOT block on I/O — they read from `snap` (the local
+// Indexer) and return. Long-running reachability checks (e.g. Hubble
+// flow query) belong on the engine's tick goroutine, not in
+// Evaluate.
+type Evaluator interface {
+	// Name returns the canonical policy name. Used for logging,
+	// metrics, and the SyntheticReport.Policy field.
+	Name() string
+
+	// Evaluate applies the policy to the target. May return:
+	//   - one row (typical)
+	//   - zero rows (target not in scope — skipped silently rather
+	//     than emitting a skip row; e.g. otel evaluator skips
+	//     non-application workloads)
+	//   - multiple rows (one per container, etc.)
+	Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport
+}
+
+// Config — runtime knobs for the engine and individual evaluators.
+// Per INVIOLABLE-PRINCIPLES #4 every threshold is a config var.
+type Config struct {
+	// Logger — required.
+	Logger *slog.Logger
+
+	// TickInterval — how often the engine re-evaluates over the
+	// snapshot. Defaults to 30s when left zero; set a negative value
+	// to disable the ticker (the engine still reacts to Pod
+	// ADD/MODIFY events).
+	TickInterval time.Duration
+
+	// HPAMinReplicas — only inspected by hpa.go; the floor under
+	// which the evaluator emits FAIL even when HPA is happy. Default
+	// 1 (any positive replica count is acceptable).
+	HPAMinReplicas int32
+
+	// HubbleEnabled — when false the hubble evaluator emits skip
+	// without trying to talk to the Hubble Observer API. Wired from
+	// the bp-cilium chart's `hubble.relay.enabled` value.
+	HubbleEnabled bool
+
+	// HubbleLookbackWindow — how far back in time the hubble
+	// evaluator searches for flows touching the Pod. Default 5min.
+	HubbleLookbackWindow time.Duration
+
+	// HarborDomain — the per-Sovereign Harbor host used by the harbor
+	// evaluator's prefix check (e.g. `harbor.omantel.omani.works`).
+	// Empty disables the harbor evaluator (skip everywhere). Per
+	// `feedback_never_hardcode_urls.md` this is a runtime config —
+	// the value is sourced from the Sovereign's bp-harbor chart, NOT
+	// from Go.
+	HarborDomain string
+
+	// HarborAllowedPrefixes — additional registry prefixes that are
+	// permitted (e.g. internal mirrors). Default empty. The harbor
+	// evaluator passes if image starts with `<HarborDomain>/` OR any
+	// of these prefixes.
+	HarborAllowedPrefixes []string
+
+	// FluxManagedByLabel — label key whose value `flux` indicates
+	// Flux ownership. Default `app.kubernetes.io/managed-by`.
+	FluxManagedByLabel string
+
+	// FluxManagedByValue — label value indicating Flux ownership.
+	// Default `flux`.
+	FluxManagedByValue string
+
+	// OTelInjectAnnotationPrefix — annotation prefix that signals
+	// OTel auto-instrumentation (Pod-level). Default
+	// `instrumentation.opentelemetry.io/inject-`. The evaluator
+	// checks any annotation key with this prefix whose value is
+	// `true`.
+	OTelInjectAnnotationPrefix string
+
+	// OTelSidecarImageMatch — substring matched against each
+	// container's image to detect an OTel collector sidecar.
+	// Default `opentelemetry-collector`.
+	OTelSidecarImageMatch string
+
+	// OTelInstrumentationKind — the kind name (in the k8scache
+	// registry) for the namespace-scoped Instrumentation CR. When
+	// the kind is not registered the evaluator falls back to
+	// sidecar + annotation checks only. Default
+	// `instrumentation.opentelemetry.io`.
+	OTelInstrumentationKind string
+
+	// Now — time source for SyntheticReport.Time. Defaults to
+	// time.Now. Overridden by tests for deterministic timestamps.
+	Now func() time.Time
+}
+
+func (c *Config) defaults() {
+	if c.TickInterval == 0 {
+		c.TickInterval = 30 * time.Second
+	}
+	if c.HPAMinReplicas == 0 {
+		c.HPAMinReplicas = 1
+	}
+	if c.HubbleLookbackWindow == 0 {
+		c.HubbleLookbackWindow = 5 * time.Minute
+	}
+	if c.FluxManagedByLabel == "" {
+		c.FluxManagedByLabel = "app.kubernetes.io/managed-by"
+	}
+	if c.FluxManagedByValue == "" {
+		c.FluxManagedByValue = "flux"
+	}
+	if c.OTelInjectAnnotationPrefix == "" {
+		c.OTelInjectAnnotationPrefix = "instrumentation.opentelemetry.io/inject-"
+	}
+	if c.OTelSidecarImageMatch == "" {
+		c.OTelSidecarImageMatch = "opentelemetry-collector"
+	}
+	if c.OTelInstrumentationKind == "" {
+		c.OTelInstrumentationKind = "instrumentation.opentelemetry.io"
+	}
+	if c.Now == nil {
+		c.Now = time.Now
+	}
+}
+
+// Publisher is the write-side interface — the Engine pushes synthetic
+// events through this. Production wires it to k8scache.Factory.Publish;
+// tests inject a recorder.
+type Publisher interface {
+	// Publish emits one synthetic event. The Engine wraps every
+	// SyntheticReport into the wire envelope before calling.
+	Publish(clusterID string, report SyntheticReport)
+}
+
+// EvaluateAll runs every evaluator against the target and returns the
+// concatenated results. Convenience wrapper used by the engine and
+// tests; never returns nil for a non-nil target — empty slice on no
+// findings.
+//
+// Order is deterministic — evaluators are applied in the order
+// supplied. Tests rely on this; do not parallelise here.
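+//
+// A minimal usage sketch (wiring illustrative; the domain is an
+// assumed placeholder, not a real Sovereign value):
+//
+//	cfg := Config{Logger: slog.Default(), HarborDomain: "harbor.example.com"}
+//	cfg.defaults()
+//	rows := EvaluateAll(ctx, snap, pod, []Evaluator{
+//		NewHarborEvaluator(cfg),
+//		NewFluxEvaluator(cfg),
+//	})
+//	// rows[0] is the harbor row, rows[1] the flux row — same order
+//	// as the evaluator slice.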
+func EvaluateAll(ctx context.Context, snap Snapshot, target *unstructured.Unstructured, evals []Evaluator) []SyntheticReport { + if target == nil { + return nil + } + out := make([]SyntheticReport, 0, len(evals)) + for _, e := range evals { + rows := e.Evaluate(ctx, snap, target) + out = append(out, rows...) + } + return out +} + +// Engine subscribes to the watcher's Pod events, runs every registered +// evaluator on each event, and publishes the synthetic reports back +// through the SSE fanout. +// +// Lifecycle: +// - NewEngine validates Config + evaluator set +// - Run blocks until ctx is cancelled; spawns one goroutine per +// subscribed cluster + one ticker goroutine +// - Multiple invocations are NOT supported on the same Engine; create +// a new Engine for re-runs +type Engine struct { + cfg Config + evaluators []Evaluator + pub Publisher + + // resolveSnapshot maps clusterID → Snapshot. The factory has one + // Indexer per cluster; the engine looks up by id on every event / + // tick. + resolveSnapshot func(clusterID string) (Snapshot, []string, error) + + // subscribe maps a (user, kinds) pair to a channel. Production + // wires to Factory.Subscribe. + subscribe func(kinds map[string]struct{}) (<-chan eventLite, func()) + + mu sync.Mutex + started bool +} + +// eventLite is the engine's internal copy of k8scache.Event minus the +// EventType — the engine only cares about ADD / MODIFY for the trigger +// path. DELETE is handled implicitly: when a target disappears from +// the Indexer, the next tick stops emitting reports for it. +type eventLite struct { + Cluster string + Kind string + Object *unstructured.Unstructured +} + +// NewEngine wires an Engine without starting it. +func NewEngine(cfg Config, evals []Evaluator, pub Publisher, + resolveSnapshot func(clusterID string) (Snapshot, []string, error), + subscribe func(kinds map[string]struct{}) (<-chan eventLite, func()), +) (*Engine, error) { + if cfg.Logger == nil { + return nil, errors.New("evaluators: Config.Logger is required") + } + if pub == nil { + return nil, errors.New("evaluators: Publisher is required") + } + if resolveSnapshot == nil { + return nil, errors.New("evaluators: resolveSnapshot is required") + } + if subscribe == nil { + return nil, errors.New("evaluators: subscribe is required") + } + if len(evals) == 0 { + return nil, errors.New("evaluators: at least one Evaluator must be registered") + } + cfg.defaults() + return &Engine{ + cfg: cfg, + evaluators: evals, + pub: pub, + resolveSnapshot: resolveSnapshot, + subscribe: subscribe, + }, nil +} + +// Run starts the engine and blocks until ctx is cancelled. On exit it +// closes the watcher subscription cleanly. +func (e *Engine) Run(ctx context.Context) error { + e.mu.Lock() + if e.started { + e.mu.Unlock() + return errors.New("evaluators: Engine already started") + } + e.started = true + e.mu.Unlock() + + // Subscribe to Pod events from the underlying watcher. + ch, unsub := e.subscribe(map[string]struct{}{"pod": {}}) + defer unsub() + + // Periodic ticker — pure cache reads, no apiserver polling. Per + // ADR-0001 §5 this is acceptable because evaluators compute over + // data already in the in-process Indexer. + var tick <-chan time.Time + if e.cfg.TickInterval > 0 { + t := time.NewTicker(e.cfg.TickInterval) + defer t.Stop() + tick = t.C + } + + for { + select { + case <-ctx.Done(): + return nil + case ev, ok := <-ch: + if !ok { + // Subscription closed by the factory (Stop). Exit + // cleanly. 
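+				// (Same contract as the ctx.Done() branch — both
+				// shutdown paths return nil, so callers treat a
+				// factory Stop as a clean exit, not an error.)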
+ return nil + } + e.evaluateOne(ctx, ev.Cluster, ev.Object) + case <-tick: + e.evaluateAllClusters(ctx) + } + } +} + +// evaluateOne runs every evaluator against a single target and +// publishes the resulting rows. +func (e *Engine) evaluateOne(ctx context.Context, clusterID string, target *unstructured.Unstructured) { + if target == nil { + return + } + snap, _, err := e.resolveSnapshot(clusterID) + if err != nil { + e.cfg.Logger.Warn("evaluators: snapshot unavailable for event", + "cluster", clusterID, "err", err) + return + } + rows := EvaluateAll(ctx, snap, target, e.evaluators) + for i := range rows { + if rows[i].Time.IsZero() { + rows[i].Time = metav1.NewTime(e.cfg.Now()) + } + e.pub.Publish(clusterID, rows[i]) + } +} + +// evaluateAllClusters fans the tick across every cluster the +// resolveSnapshot function knows about. Each cluster's Pod list is +// pulled from its Indexer and every evaluator runs against every +// target. Cost is O(clusters × pods × evaluators) but all reads are +// from the local cache. +func (e *Engine) evaluateAllClusters(ctx context.Context) { + // resolveSnapshot returns the cluster's Snapshot AND the list of + // known cluster ids when the second return is non-empty. Callers + // can pass clusterID="" to mean "give me the list of known + // clusters". + _, clusters, err := e.resolveSnapshot("") + if err != nil || len(clusters) == 0 { + return + } + for _, id := range clusters { + snap, _, err := e.resolveSnapshot(id) + if err != nil { + continue + } + pods, err := snap.List("pod", labels.Everything()) + if err != nil { + continue + } + for _, p := range pods { + rows := EvaluateAll(ctx, snap, p, e.evaluators) + for i := range rows { + if rows[i].Time.IsZero() { + rows[i].Time = metav1.NewTime(e.cfg.Now()) + } + e.pub.Publish(id, rows[i]) + } + } + } +} + +// helpers -------------------------------------------------------- + +// resourceFor builds an OwnerReference from the target's metadata. +// All five evaluators populate SyntheticReport.Resource via this so +// the wire shape is uniform. +func resourceFor(target *unstructured.Unstructured) metav1.OwnerReference { + if target == nil { + return metav1.OwnerReference{} + } + gv := target.GetAPIVersion() + return metav1.OwnerReference{ + APIVersion: gv, + Kind: target.GetKind(), + Name: target.GetName(), + UID: target.GetUID(), + } +} + +// containerImages returns every container image string in the Pod +// (initContainers + containers + ephemeralContainers). Empty slice +// when the target is not a Pod or has no containers. Used by harbor +// + otel evaluators. +func containerImages(target *unstructured.Unstructured) []string { + if target == nil { + return nil + } + out := []string{} + for _, group := range []string{"containers", "initContainers", "ephemeralContainers"} { + raw, found, _ := unstructured.NestedSlice(target.Object, "spec", group) + if !found { + continue + } + for _, item := range raw { + c, ok := item.(map[string]any) + if !ok { + continue + } + if img, ok := c["image"].(string); ok && img != "" { + out = append(out, img) + } + } + } + return out +} + +// containerNames returns container names alongside images — needed by +// the otel evaluator's "is there a sidecar named otel-*" branch. 
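+//
+// e.g. (illustrative names) a Pod whose spec lists containers "main"
+// and "otel-agent" plus initContainer "setup" yields
+// ["main", "otel-agent", "setup"] — containers first, then
+// initContainers.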
+func containerNames(target *unstructured.Unstructured) []string { + if target == nil { + return nil + } + out := []string{} + for _, group := range []string{"containers", "initContainers"} { + raw, found, _ := unstructured.NestedSlice(target.Object, "spec", group) + if !found { + continue + } + for _, item := range raw { + c, ok := item.(map[string]any) + if !ok { + continue + } + if n, ok := c["name"].(string); ok && n != "" { + out = append(out, n) + } + } + } + return out +} + +// hasOwnerOfKind returns true when the target has at least one +// ownerReference whose APIVersion and Kind match. Empty apiVersion +// matches any group (used by the flux evaluator's HelmRelease / +// Kustomization detection). +func hasOwnerOfKind(target *unstructured.Unstructured, apiGroupSuffix, kind string) bool { + if target == nil { + return false + } + for _, ref := range target.GetOwnerReferences() { + if !strings.EqualFold(ref.Kind, kind) { + continue + } + if apiGroupSuffix == "" || strings.Contains(ref.APIVersion, apiGroupSuffix) { + return true + } + } + return false +} + +// isPod returns true when the target is a v1 Pod. +func isPod(target *unstructured.Unstructured) bool { + if target == nil { + return false + } + return target.GetKind() == "Pod" +} + +// reportTime — produce a metav1.Time from the engine's clock; used by +// evaluators that may emit before the engine fills it in. +func reportTime(now func() time.Time) metav1.Time { + if now == nil { + now = time.Now + } + return metav1.NewTime(now()) +} + +// formatLabelMap stringifies a label set for human-readable +// SyntheticReport.Properties values. +func formatLabelMap(m map[string]string) string { + if len(m) == 0 { + return "" + } + parts := make([]string, 0, len(m)) + for k, v := range m { + parts = append(parts, fmt.Sprintf("%s=%s", k, v)) + } + return strings.Join(parts, ",") +} + +// SubscribeAdapter wraps a k8scache.Factory.Subscribe call into the +// engine's eventLite channel. Production code in the catalyst-api +// `cmd/api/main.go` wires this. Exposed here so tests can construct +// an Engine without depending on the full Factory. +type SubscribeAdapter func(kinds map[string]struct{}) (<-chan eventLite, func()) + +// EventLiteFromUnstructured is a test convenience to build eventLite +// values without exposing the unexported field. Used by the table +// tests in evaluators_test.go. +func EventLiteFromUnstructured(cluster, kind string, obj *unstructured.Unstructured) eventLite { + return eventLite{Cluster: cluster, Kind: kind, Object: obj} +} diff --git a/products/catalyst/bootstrap/api/internal/k8scache/evaluators/evaluators_test.go b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/evaluators_test.go new file mode 100644 index 00000000..3d8e7af6 --- /dev/null +++ b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/evaluators_test.go @@ -0,0 +1,759 @@ +// evaluators_test.go — unit tests for the 5 evaluators + the engine. +// +// EPIC-1 (#1096) Slice W2. 
+// +// Coverage matrix per evaluator: +// +// hpa : pass (HPA satisfies floor) + fail (currentReplicas < min) +// + skip (no HPA) + skip (Job-owned Pod) +// otel : pass (sidecar) + pass (auto-inject + Instrumentation CR) +// + fail (no sidecar, no annotation) + fail (annotation but no CR) +// hubble : skip (Hubble disabled) + pass (FlowsSeen=true) +// + fail (FlowsSeen=false) + warn (Probe error) +// harbor : pass (Harbor-prefixed) + pass (allowed-prefix) +// + fail (docker.io) + skip (HarborDomain empty) +// + skip (no containers) +// flux : pass (managed-by label) + pass (HelmRelease ownerRef) +// + pass (controller is Flux-owned) + fail (neither) +// +// Engine: +// - subscribe + tick path emits events to the Publisher +// - resolveSnapshot error path is logged + suppressed +// - cancellation cleanly shuts the engine down +// +// Plus 1 EvaluateAll concatenation test. +package evaluators + +import ( + "context" + "errors" + "io" + "log/slog" + "sync" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" +) + +// quietLogger discards log output. +func quietLogger() *slog.Logger { + return slog.New(slog.NewJSONHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})) +} + +// fakeSnapshot is an in-memory Snapshot keyed by canonical kind name. +type fakeSnapshot struct { + by map[string][]*unstructured.Unstructured +} + +func (f *fakeSnapshot) List(kind string, _ labels.Selector) ([]*unstructured.Unstructured, error) { + if f.by == nil { + return nil, nil + } + v, ok := f.by[kind] + if !ok { + // Mirror the production Factory.List behaviour — unknown + // kind returns an error (the evaluator must skip rather + // than crash). + return nil, errKindNotRegistered + } + return v, nil +} + +var errKindNotRegistered = errors.New("kind not registered") + +// recorder is a Publisher that captures every emitted (cluster, report) pair. 
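+//
+// Typical use in the engine tests below (illustrative):
+//
+//	rec := &recorder{}
+//	rec.Publish("alpha", SyntheticReport{Policy: "flux-managed"})
+//	got := rec.Snapshot() // mutex-guarded copy, safe to inspect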
+type recorder struct { + mu sync.Mutex + entries []recordedEntry +} + +type recordedEntry struct { + cluster string + report SyntheticReport +} + +func (r *recorder) Publish(cluster string, report SyntheticReport) { + r.mu.Lock() + defer r.mu.Unlock() + r.entries = append(r.entries, recordedEntry{cluster: cluster, report: report}) +} + +func (r *recorder) Snapshot() []recordedEntry { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]recordedEntry, len(r.entries)) + copy(out, r.entries) + return out +} + +// ── builders ───────────────────────────────────────────────────── + +func uPod(ns, name string, opts ...podOpt) *unstructured.Unstructured { + p := &unstructured.Unstructured{Object: map[string]any{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]any{ + "namespace": ns, + "name": name, + "uid": "uid-" + ns + "-" + name, + }, + "spec": map[string]any{ + "containers": []any{}, + }, + }} + for _, o := range opts { + o(p) + } + return p +} + +type podOpt func(*unstructured.Unstructured) + +func withOwner(kind, name string) podOpt { + return func(p *unstructured.Unstructured) { + owners := p.GetOwnerReferences() + owners = append(owners, metav1.OwnerReference{Kind: kind, Name: name, APIVersion: "apps/v1"}) + p.SetOwnerReferences(owners) + } +} + +func withFluxOwner() podOpt { + return func(p *unstructured.Unstructured) { + owners := p.GetOwnerReferences() + owners = append(owners, metav1.OwnerReference{Kind: "HelmRelease", Name: "x", APIVersion: "helm.toolkit.fluxcd.io/v2"}) + p.SetOwnerReferences(owners) + } +} + +func withLabels(m map[string]string) podOpt { + return func(p *unstructured.Unstructured) { p.SetLabels(m) } +} + +func withAnnotations(m map[string]string) podOpt { + return func(p *unstructured.Unstructured) { p.SetAnnotations(m) } +} + +func withContainerImages(images ...string) podOpt { + return func(p *unstructured.Unstructured) { + raw := []any{} + for i, img := range images { + raw = append(raw, map[string]any{ + "name": imageContainerName(i), + "image": img, + }) + } + _ = unstructured.SetNestedSlice(p.Object, raw, "spec", "containers") + } +} + +func imageContainerName(i int) string { + if i == 0 { + return "main" + } + return "side" +} + +func uHPA(ns, name, targetKind, targetName string, minReplicas, currentReplicas int64) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: map[string]any{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "metadata": map[string]any{ + "namespace": ns, + "name": name, + }, + "spec": map[string]any{ + "minReplicas": minReplicas, + "scaleTargetRef": map[string]any{ + "kind": targetKind, + "name": targetName, + }, + }, + "status": map[string]any{ + "currentReplicas": currentReplicas, + }, + }} +} + +func uReplicaSet(ns, name, deploymentName string) *unstructured.Unstructured { + rs := &unstructured.Unstructured{Object: map[string]any{ + "apiVersion": "apps/v1", + "kind": "ReplicaSet", + "metadata": map[string]any{ + "namespace": ns, + "name": name, + }, + }} + if deploymentName != "" { + rs.SetOwnerReferences([]metav1.OwnerReference{{ + Kind: "Deployment", + Name: deploymentName, + APIVersion: "apps/v1", + }}) + } + return rs +} + +func uDeployment(ns, name string, fluxLabel bool) *unstructured.Unstructured { + d := &unstructured.Unstructured{Object: map[string]any{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]any{ + "namespace": ns, + "name": name, + }, + }} + if fluxLabel { + d.SetLabels(map[string]string{"app.kubernetes.io/managed-by": "flux"}) 
+ } + return d +} + +func uInstrumentation(ns, name string) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: map[string]any{ + "apiVersion": "opentelemetry.io/v1alpha1", + "kind": "Instrumentation", + "metadata": map[string]any{ + "namespace": ns, + "name": name, + }, + }} +} + +// ── HPA evaluator ──────────────────────────────────────────────── + +func TestHPA_Pass_HPASatisfiesFloor(t *testing.T) { + hpa := uHPA("acme", "frontend-hpa", "Deployment", "frontend", 3, 5) + rs := uReplicaSet("acme", "frontend-7c5f", "frontend") + pod := uPod("acme", "frontend-7c5f-abc", withOwner("ReplicaSet", "frontend-7c5f")) + snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{ + "horizontalpodautoscaler": {hpa}, + "replicaset": {rs}, + }} + rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod) + if len(rep) != 1 { + t.Fatalf("want 1 row got %d", len(rep)) + } + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message) + } +} + +func TestHPA_Fail_CurrentBelowMin(t *testing.T) { + hpa := uHPA("acme", "frontend-hpa", "Deployment", "frontend", 3, 1) + rs := uReplicaSet("acme", "frontend-7c5f", "frontend") + pod := uPod("acme", "frontend-7c5f-abc", withOwner("ReplicaSet", "frontend-7c5f")) + snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{ + "horizontalpodautoscaler": {hpa}, + "replicaset": {rs}, + }} + rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultFail { + t.Fatalf("want fail got %s (%s)", rep[0].Result, rep[0].Message) + } + if rep[0].Properties["minReplicas"] != "3" || rep[0].Properties["currentReplicas"] != "1" { + t.Fatalf("properties not populated: %+v", rep[0].Properties) + } +} + +func TestHPA_Skip_NoHPA(t *testing.T) { + rs := uReplicaSet("acme", "control-plane-rs", "control-plane") + pod := uPod("acme", "control-plane-pod", withOwner("ReplicaSet", "control-plane-rs")) + snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{ + "horizontalpodautoscaler": {}, + "replicaset": {rs}, + }} + rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultSkip { + t.Fatalf("want skip got %s", rep[0].Result) + } +} + +func TestHPA_Skip_JobOwnedPod(t *testing.T) { + pod := uPod("acme", "batch-pod", withOwner("Job", "nightly-cron")) + snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{}} + rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultSkip { + t.Fatalf("want skip for Job-owned Pod got %s", rep[0].Result) + } +} + +func newHPAEvaluator() *HPAEvaluator { + cfg := Config{Logger: quietLogger()} + cfg.defaults() + return NewHPAEvaluator(cfg) +} + +// ── OTel evaluator ─────────────────────────────────────────────── + +func TestOTel_Pass_Sidecar(t *testing.T) { + pod := uPod("acme", "app", withContainerImages( + "docker.io/library/nginx:1.25", + "otel/opentelemetry-collector-contrib:0.95", + )) + snap := &fakeSnapshot{} + rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s", rep[0].Result) + } + if rep[0].Properties["detection"] != "sidecar" { + t.Fatalf("want sidecar detection got %v", rep[0].Properties) + } +} + +func TestOTel_Pass_AutoInjectAnnotation(t *testing.T) { + pod := uPod("acme", "app", + withContainerImages("docker.io/library/nginx:1.25"), + withAnnotations(map[string]string{ + "instrumentation.opentelemetry.io/inject-go": "true", + }), + ) + snap := 
&fakeSnapshot{by: map[string][]*unstructured.Unstructured{ + "instrumentation.opentelemetry.io": {uInstrumentation("acme", "default")}, + }} + rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message) + } + if rep[0].Properties["detection"] != "auto-inject" { + t.Fatalf("want auto-inject got %v", rep[0].Properties) + } +} + +func TestOTel_Fail_NoSidecarNoAnnotation(t *testing.T) { + pod := uPod("acme", "app", withContainerImages("docker.io/library/nginx:1.25")) + snap := &fakeSnapshot{} + rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultFail { + t.Fatalf("want fail got %s", rep[0].Result) + } +} + +func TestOTel_Fail_AnnotationButNoInstrumentationCR(t *testing.T) { + pod := uPod("acme", "app", + withContainerImages("docker.io/library/nginx:1.25"), + withAnnotations(map[string]string{ + "instrumentation.opentelemetry.io/inject-go": "true", + }), + ) + snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{ + "instrumentation.opentelemetry.io": {}, // operator installed, no CR in this ns + }} + rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultFail { + t.Fatalf("want fail (orphan annotation) got %s", rep[0].Result) + } + if rep[0].Properties["detection"] != "auto-inject-orphan" { + t.Fatalf("want auto-inject-orphan got %v", rep[0].Properties) + } +} + +func newOTelEvaluator() *OTelEvaluator { + cfg := Config{Logger: quietLogger()} + cfg.defaults() + return NewOTelEvaluator(cfg) +} + +// ── Hubble evaluator ───────────────────────────────────────────── + +func TestHubble_Skip_Disabled(t *testing.T) { + pod := uPod("acme", "app") + rep := newHubbleEvaluator(false, nil).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultSkip { + t.Fatalf("want skip when disabled got %s", rep[0].Result) + } +} + +func TestHubble_Pass_FlowsSeen(t *testing.T) { + pod := uPod("acme", "app") + probe := &fakeHubbleProbe{seen: true} + rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s", rep[0].Result) + } +} + +func TestHubble_Fail_NoFlows(t *testing.T) { + pod := uPod("acme", "app") + probe := &fakeHubbleProbe{seen: false} + rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultFail { + t.Fatalf("want fail got %s", rep[0].Result) + } +} + +func TestHubble_Warn_ProbeError(t *testing.T) { + pod := uPod("acme", "app") + probe := &fakeHubbleProbe{err: errors.New("connection refused")} + rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultWarn { + t.Fatalf("want warn got %s", rep[0].Result) + } +} + +type fakeHubbleProbe struct { + seen bool + err error +} + +func (f *fakeHubbleProbe) FlowsSeen(_ context.Context, _ *unstructured.Unstructured, _ int64) (bool, error) { + if f.err != nil { + return false, f.err + } + return f.seen, nil +} + +func newHubbleEvaluator(enabled bool, probe HubbleProbe) *HubbleEvaluator { + cfg := Config{Logger: quietLogger(), HubbleEnabled: enabled} + cfg.defaults() + e := NewHubbleEvaluator(cfg) + e.Probe = probe + return e +} + +// ── Harbor evaluator ───────────────────────────────────────────── + +func TestHarbor_Pass_HarborPrefixed(t *testing.T) { + pod := uPod("acme", "app", withContainerImages( + 
"harbor.omantel.omani.works/proxy-ghcr/openova-io/openova/website:abc123", + )) + rep := newHarborEvaluator("harbor.omantel.omani.works", nil).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message) + } +} + +func TestHarbor_Pass_AllowedPrefix(t *testing.T) { + pod := uPod("acme", "app", withContainerImages( + "mirror.openova.io/internal/sidecar:1.0", + )) + rep := newHarborEvaluator("harbor.omantel.omani.works", + []string{"mirror.openova.io/"}).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s", rep[0].Result) + } +} + +func TestHarbor_Fail_DockerHub(t *testing.T) { + pod := uPod("acme", "app", withContainerImages( + "harbor.omantel.omani.works/proxy-ghcr/openova-io/openova/api:abc", + "docker.io/library/redis:7.0", + )) + rep := newHarborEvaluator("harbor.omantel.omani.works", nil).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultFail { + t.Fatalf("want fail got %s", rep[0].Result) + } + if rep[0].Properties["rejectedImages"] != "docker.io/library/redis:7.0" { + t.Fatalf("rejectedImages wrong: %v", rep[0].Properties) + } +} + +func TestHarbor_Skip_DomainEmpty(t *testing.T) { + pod := uPod("acme", "app", withContainerImages("docker.io/library/nginx:1.25")) + rep := newHarborEvaluator("", nil).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultSkip { + t.Fatalf("want skip got %s", rep[0].Result) + } +} + +func TestHarbor_Skip_NoContainers(t *testing.T) { + pod := uPod("acme", "app") // empty containers slice + rep := newHarborEvaluator("harbor.openova.io", nil).Evaluate(context.Background(), nil, pod) + if rep[0].Result != ResultSkip { + t.Fatalf("want skip got %s", rep[0].Result) + } +} + +func newHarborEvaluator(domain string, allowed []string) *HarborEvaluator { + cfg := Config{ + Logger: quietLogger(), + HarborDomain: domain, + HarborAllowedPrefixes: allowed, + } + cfg.defaults() + return NewHarborEvaluator(cfg) +} + +// ── Flux evaluator ─────────────────────────────────────────────── + +func TestFlux_Pass_LabelOnPod(t *testing.T) { + pod := uPod("acme", "app", withLabels(map[string]string{ + "app.kubernetes.io/managed-by": "flux", + })) + snap := &fakeSnapshot{} + rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s", rep[0].Result) + } +} + +func TestFlux_Pass_HelmReleaseOwnerRef(t *testing.T) { + pod := uPod("acme", "app", withFluxOwner()) + snap := &fakeSnapshot{} + rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s", rep[0].Result) + } +} + +func TestFlux_Pass_ControllerIsFluxOwned(t *testing.T) { + dep := uDeployment("acme", "frontend", true) + rs := uReplicaSet("acme", "frontend-7c5f", "frontend") + pod := uPod("acme", "frontend-pod", withOwner("ReplicaSet", "frontend-7c5f")) + snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{ + "deployment": {dep}, + "replicaset": {rs}, + }} + rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultPass { + t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message) + } + if rep[0].Properties["detection"] != "controller-flux-owned" { + t.Fatalf("want controller-flux-owned detection got %v", rep[0].Properties) + } +} + +func TestFlux_Fail_NoLabelNoOwnerRef(t *testing.T) { + dep := uDeployment("acme", "frontend", false) + rs := uReplicaSet("acme", 
"frontend-7c5f", "frontend") + pod := uPod("acme", "frontend-pod", withOwner("ReplicaSet", "frontend-7c5f")) + snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{ + "deployment": {dep}, + "replicaset": {rs}, + }} + rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod) + if rep[0].Result != ResultFail { + t.Fatalf("want fail got %s", rep[0].Result) + } +} + +func newFluxEvaluator() *FluxEvaluator { + cfg := Config{Logger: quietLogger()} + cfg.defaults() + return NewFluxEvaluator(cfg) +} + +// ── EvaluateAll concatenation ──────────────────────────────────── + +func TestEvaluateAll_ConcatenatesAndPreservesOrder(t *testing.T) { + pod := uPod("acme", "app", withContainerImages("harbor.openova.io/proxy/x:1")) + snap := &fakeSnapshot{} + + cfg := Config{Logger: quietLogger(), HarborDomain: "harbor.openova.io"} + cfg.defaults() + evals := []Evaluator{ + NewHarborEvaluator(cfg), + NewFluxEvaluator(cfg), + } + rows := EvaluateAll(context.Background(), snap, pod, evals) + if len(rows) != 2 { + t.Fatalf("want 2 rows got %d", len(rows)) + } + if rows[0].Policy != "harbor-proxy-pull" || rows[1].Policy != "flux-managed" { + t.Fatalf("order not preserved: %v %v", rows[0].Policy, rows[1].Policy) + } +} + +func TestEvaluateAll_SkipsNonPodTargets(t *testing.T) { + dep := uDeployment("acme", "x", false) + cfg := Config{Logger: quietLogger(), HarborDomain: "harbor.openova.io"} + cfg.defaults() + rows := EvaluateAll(context.Background(), &fakeSnapshot{}, dep, []Evaluator{ + NewHarborEvaluator(cfg), + NewFluxEvaluator(cfg), + }) + if len(rows) != 0 { + t.Fatalf("Pod-only evaluators should ignore Deployments — got %d rows", len(rows)) + } +} + +// ── Engine ─────────────────────────────────────────────────────── + +func TestEngine_PublishesOnSubscribedEvent(t *testing.T) { + rec := &recorder{} + pod := uPod("acme", "app", withContainerImages("harbor.openova.io/proxy/x:1")) + + eventCh := make(chan eventLite, 4) + cfg := Config{ + Logger: quietLogger(), + HarborDomain: "harbor.openova.io", + TickInterval: 0, // event-only path for this test + Now: func() time.Time { return time.Unix(1700000000, 0) }, + } + eng, err := NewEngine(cfg, + []Evaluator{NewHarborEvaluator(cfg)}, + rec, + func(_ string) (Snapshot, []string, error) { return &fakeSnapshot{}, nil, nil }, + func(_ map[string]struct{}) (<-chan eventLite, func()) { + return eventCh, func() {} + }, + ) + if err != nil { + t.Fatalf("NewEngine: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { _ = eng.Run(ctx); close(done) }() + + eventCh <- eventLite{Cluster: "alpha", Kind: "pod", Object: pod} + + deadline := time.After(2 * time.Second) + for { + select { + case <-deadline: + t.Fatalf("recorder never saw the event-driven publish") + default: + if len(rec.Snapshot()) >= 1 { + cancel() + <-done + if rec.Snapshot()[0].cluster != "alpha" { + t.Fatalf("cluster id not propagated") + } + if rec.Snapshot()[0].report.Policy != "harbor-proxy-pull" { + t.Fatalf("unexpected policy %q", rec.Snapshot()[0].report.Policy) + } + if rec.Snapshot()[0].report.Time.IsZero() { + t.Fatalf("report timestamp not stamped by engine") + } + return + } + time.Sleep(10 * time.Millisecond) + } + } +} + +func TestEngine_TickerEvaluatesAllPodsAcrossClusters(t *testing.T) { + rec := &recorder{} + pod1 := uPod("acme", "p1", withContainerImages("harbor.openova.io/proxy/x:1")) + pod2 := uPod("widgets", "p2", withContainerImages("docker.io/library/nginx:1")) + + snaps := map[string]*fakeSnapshot{ + 
"alpha": {by: map[string][]*unstructured.Unstructured{"pod": {pod1}}}, + "beta": {by: map[string][]*unstructured.Unstructured{"pod": {pod2}}}, + } + eventCh := make(chan eventLite, 1) + + cfg := Config{ + Logger: quietLogger(), + HarborDomain: "harbor.openova.io", + TickInterval: 50 * time.Millisecond, + Now: func() time.Time { return time.Unix(1700000000, 0) }, + } + eng, err := NewEngine(cfg, + []Evaluator{NewHarborEvaluator(cfg)}, + rec, + func(id string) (Snapshot, []string, error) { + if id == "" { + return nil, []string{"alpha", "beta"}, nil + } + s, ok := snaps[id] + if !ok { + return nil, nil, errors.New("no snap") + } + return s, nil, nil + }, + func(_ map[string]struct{}) (<-chan eventLite, func()) { + return eventCh, func() {} + }, + ) + if err != nil { + t.Fatalf("NewEngine: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond) + defer cancel() + done := make(chan struct{}) + go func() { _ = eng.Run(ctx); close(done) }() + + deadline := time.After(1 * time.Second) + for { + select { + case <-deadline: + t.Fatalf("ticker never produced both pass + fail rows; got %v", rec.Snapshot()) + default: + entries := rec.Snapshot() + seenPass, seenFail := false, false + for _, e := range entries { + if e.cluster == "alpha" && e.report.Result == ResultPass { + seenPass = true + } + if e.cluster == "beta" && e.report.Result == ResultFail { + seenFail = true + } + } + if seenPass && seenFail { + cancel() + <-done + return + } + time.Sleep(20 * time.Millisecond) + } + } +} + +func TestEngine_RejectsInvalidConfig(t *testing.T) { + cases := map[string]Config{ + "missing-logger": {}, + } + for name, cfg := range cases { + t.Run(name, func(t *testing.T) { + _, err := NewEngine(cfg, []Evaluator{}, &recorder{}, nil, nil) + if err == nil { + t.Fatalf("want error for %q, got nil", name) + } + }) + } + // Logger-OK but missing other deps → still error. + cfgOK := Config{Logger: quietLogger()} + if _, err := NewEngine(cfgOK, []Evaluator{}, nil, nil, nil); err == nil { + t.Fatalf("missing publisher should error") + } + if _, err := NewEngine(cfgOK, []Evaluator{}, &recorder{}, nil, nil); err == nil { + t.Fatalf("missing resolveSnapshot should error") + } + if _, err := NewEngine(cfgOK, []Evaluator{}, &recorder{}, + func(string) (Snapshot, []string, error) { return nil, nil, nil }, + nil); err == nil { + t.Fatalf("missing subscribe should error") + } + if _, err := NewEngine(cfgOK, nil, &recorder{}, + func(string) (Snapshot, []string, error) { return nil, nil, nil }, + func(map[string]struct{}) (<-chan eventLite, func()) { return nil, nil }); err == nil { + t.Fatalf("empty evaluator slice should error") + } +} + +func TestEngine_DoubleRunRefused(t *testing.T) { + cfg := Config{Logger: quietLogger()} + cfg.defaults() + rec := &recorder{} + eventCh := make(chan eventLite) + eng, err := NewEngine(cfg, []Evaluator{NewFluxEvaluator(cfg)}, rec, + func(string) (Snapshot, []string, error) { return &fakeSnapshot{}, nil, nil }, + func(map[string]struct{}) (<-chan eventLite, func()) { + return eventCh, func() {} + }, + ) + if err != nil { + t.Fatalf("NewEngine: %v", err) + } + ctx, cancel := context.WithCancel(context.Background()) + go func() { _ = eng.Run(ctx) }() + time.Sleep(20 * time.Millisecond) + if err := eng.Run(context.Background()); err == nil { + t.Fatalf("second Run should error") + } + cancel() +} + +// Sanity that the package's helper conversions don't drop fields. 
+func TestEventLiteFromUnstructured(t *testing.T) {
+	pod := uPod("ns", "n")
+	ev := EventLiteFromUnstructured("alpha", "pod", pod)
+	if ev.Cluster != "alpha" || ev.Kind != "pod" || ev.Object != pod {
+		t.Fatalf("EventLiteFromUnstructured dropped a field: %+v", ev)
+	}
+}
diff --git a/products/catalyst/bootstrap/api/internal/k8scache/evaluators/flux.go b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/flux.go
new file mode 100644
index 00000000..9325d87e
--- /dev/null
+++ b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/flux.go
@@ -0,0 +1,165 @@
+// flux.go — Flux-managed evaluator.
+//
+// EPIC-1 (#1096) §4.3 row "Flux-managed (GitOps)".
+//
+// Logic (per `02-W-watcher-extension.md` brief):
+//
+//   - Pass if the target carries the well-known
+//     `app.kubernetes.io/managed-by: flux` label.
+//   - Pass if any ownerReference has an APIVersion containing
+//     `helm.toolkit.fluxcd.io` (HelmRelease) or
+//     `kustomize.toolkit.fluxcd.io` (Kustomization).
+//   - Fail otherwise.
+//
+// The evaluator runs against Pods (the per-event trigger path). When
+// the Pod itself carries no Flux marker it chases the Pod's
+// controller owner (Deployment / StatefulSet / DaemonSet) and, on a
+// match, records that controller in the report's Message and
+// Properties — score aggregation rolls up by workload, not by
+// individual Pod.
+//
+// Pods that are themselves Flux-owned (rare but possible — Flux can
+// manage a static Pod manifest) also pass.
+//
+// The label key and value are configurable via
+// Config.FluxManagedByLabel / Config.FluxManagedByValue (per
+// docs/INVIOLABLE-PRINCIPLES.md #4); the ownerRef check matches any
+// APIVersion containing `fluxcd.io`.
+package evaluators
+
+import (
+	"context"
+	"strings"
+
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/labels"
+)
+
+// FluxEvaluator implements `policy=flux-managed`.
+type FluxEvaluator struct {
+	ManagedByLabel string
+	ManagedByValue string
+}
+
+// NewFluxEvaluator builds a FluxEvaluator from cfg.
+func NewFluxEvaluator(cfg Config) *FluxEvaluator {
+	return &FluxEvaluator{
+		ManagedByLabel: cfg.FluxManagedByLabel,
+		ManagedByValue: cfg.FluxManagedByValue,
+	}
+}
+
+func (FluxEvaluator) Name() string { return "flux-managed" }
+
+func (f *FluxEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport {
+	if !isPod(target) {
+		return nil
+	}
+
+	// 1. Direct check on the Pod itself.
+	if f.targetIsFluxOwned(target) {
+		return []SyntheticReport{{
+			Policy:     f.Name(),
+			Rule:       f.Name(),
+			Result:     ResultPass,
+			Resource:   resourceFor(target),
+			Namespace:  target.GetNamespace(),
+			Message:    "Pod carries flux managed-by label or HelmRelease / Kustomization owner",
+			Properties: map[string]string{"detection": "pod-direct"},
+		}}
+	}
+
+	// 2. Pod isn't directly Flux-managed — chase its controller.
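+	// Illustrative chain (names hypothetical): Pod frontend-7c5f-abc
+	// → ReplicaSet frontend-7c5f → Deployment frontend; the
+	// Deployment's labels and ownerRefs are then re-checked.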
+ owner := f.lookupController(snap, target) + if owner != nil && f.targetIsFluxOwned(owner) { + return []SyntheticReport{{ + Policy: f.Name(), + Rule: f.Name(), + Result: ResultPass, + Resource: resourceFor(target), + Namespace: target.GetNamespace(), + Message: "Pod's controller " + owner.GetKind() + "/" + owner.GetName() + " is Flux-managed", + Properties: map[string]string{ + "detection": "controller-flux-owned", + "controller": owner.GetKind() + "/" + owner.GetName(), + "controllerNs": owner.GetNamespace(), + }, + }} + } + + return []SyntheticReport{{ + Policy: f.Name(), + Rule: f.Name(), + Result: ResultFail, + Resource: resourceFor(target), + Namespace: target.GetNamespace(), + Message: "Pod and its controller carry no flux managed-by label and no Flux ownerRef", + }} +} + +// targetIsFluxOwned applies the label + ownerRef check to a single +// object. Pure function — no snapshot reads. +func (f *FluxEvaluator) targetIsFluxOwned(target *unstructured.Unstructured) bool { + if target == nil { + return false + } + // Label check. + if v, ok := target.GetLabels()[f.ManagedByLabel]; ok && strings.EqualFold(v, f.ManagedByValue) { + return true + } + // ownerRef check — HelmRelease / Kustomization from + // helm.toolkit.fluxcd.io / kustomize.toolkit.fluxcd.io. + for _, ref := range target.GetOwnerReferences() { + if strings.Contains(ref.APIVersion, "fluxcd.io") { + return true + } + } + return false +} + +// lookupController follows the Pod's controller ownerRef one hop +// (Pod → ReplicaSet / StatefulSet / DaemonSet) and where applicable +// chases the next hop (ReplicaSet → Deployment). Returns nil when +// the controller can't be located in the snapshot. +func (f *FluxEvaluator) lookupController(snap Snapshot, pod *unstructured.Unstructured) *unstructured.Unstructured { + for _, ref := range pod.GetOwnerReferences() { + switch ref.Kind { + case "Deployment": + return findInList(snap, "deployment", pod.GetNamespace(), ref.Name) + case "StatefulSet": + return findInList(snap, "statefulset", pod.GetNamespace(), ref.Name) + case "DaemonSet": + return findInList(snap, "daemonset", pod.GetNamespace(), ref.Name) + case "ReplicaSet": + rs := findInList(snap, "replicaset", pod.GetNamespace(), ref.Name) + if rs == nil { + return nil + } + // First check the RS itself — sometimes Flux + // produces ReplicaSets directly without a Deployment. + if f.targetIsFluxOwned(rs) { + return rs + } + for _, rsOwner := range rs.GetOwnerReferences() { + if rsOwner.Kind == "Deployment" { + return findInList(snap, "deployment", pod.GetNamespace(), rsOwner.Name) + } + } + return rs + } + } + return nil +} + +// findInList walks the snapshot and returns the first object matching +// (kind, namespace, name). Returns nil on miss or list error. +func findInList(snap Snapshot, kindName, namespace, name string) *unstructured.Unstructured { + list, err := snap.List(kindName, labels.Everything()) + if err != nil { + return nil + } + for _, obj := range list { + if obj.GetNamespace() == namespace && obj.GetName() == name { + return obj + } + } + return nil +} diff --git a/products/catalyst/bootstrap/api/internal/k8scache/evaluators/harbor.go b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/harbor.go new file mode 100644 index 00000000..8423e795 --- /dev/null +++ b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/harbor.go @@ -0,0 +1,124 @@ +// harbor.go — image-via-Harbor-proxy evaluator. +// +// EPIC-1 (#1096) §4.3 row "Images via Harbor proxy". 
+//
+// Logic (per `02-W-watcher-extension.md` brief):
+//
+//   - For each container in the Pod (containers + initContainers +
+//     ephemeralContainers), parse the image reference.
+//   - Pass if image starts with `<harbor-domain>/proxy-ghcr/`,
+//     `<harbor-domain>/<project>/`, or any of the operator-supplied
+//     HarborAllowedPrefixes (e.g. internal mirrors).
+//   - Fail if the image references docker.io, ghcr.io, quay.io, etc.
+//     directly — every image must traverse the Sovereign's Harbor
+//     proxy for cosign verification + air-gap fallback.
+//   - When Config.HarborDomain is empty (Sovereign without Harbor
+//     enabled) the evaluator skips every Pod.
+//
+// Per `feedback_never_hardcode_urls.md` the Harbor domain comes from
+// runtime config — the Sovereign provisions Harbor at
+// `harbor.<base-domain>` and stamps the same value into the
+// catalyst-api's CATALYST_HARBOR_DOMAIN env. Tests inject directly
+// via Config.HarborDomain.
+package evaluators
+
+import (
+	"context"
+	"strings"
+
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+)
+
+// HarborEvaluator implements `policy=harbor-proxy-pull`.
+type HarborEvaluator struct {
+	Domain          string
+	AllowedPrefixes []string
+}
+
+// NewHarborEvaluator builds a HarborEvaluator from cfg.
+func NewHarborEvaluator(cfg Config) *HarborEvaluator {
+	return &HarborEvaluator{
+		Domain:          cfg.HarborDomain,
+		AllowedPrefixes: append([]string(nil), cfg.HarborAllowedPrefixes...),
+	}
+}
+
+func (HarborEvaluator) Name() string { return "harbor-proxy-pull" }
+
+func (h *HarborEvaluator) Evaluate(ctx context.Context, _ Snapshot, target *unstructured.Unstructured) []SyntheticReport {
+	if !isPod(target) {
+		return nil
+	}
+	if h.Domain == "" {
+		return []SyntheticReport{{
+			Policy:    h.Name(),
+			Rule:      h.Name(),
+			Result:    ResultSkip,
+			Resource:  resourceFor(target),
+			Namespace: target.GetNamespace(),
+			Message:   "Harbor not enabled on this Sovereign — evaluator skipped",
+		}}
+	}
+
+	imgs := containerImages(target)
+	if len(imgs) == 0 {
+		return []SyntheticReport{{
+			Policy:    h.Name(),
+			Rule:      h.Name(),
+			Result:    ResultSkip,
+			Resource:  resourceFor(target),
+			Namespace: target.GetNamespace(),
+			Message:   "Pod has no containers — nothing to check",
+		}}
+	}
+
+	domainPrefix := h.Domain + "/"
+	rejected := []string{}
+	for _, img := range imgs {
+		if h.imageAccepted(img, domainPrefix) {
+			continue
+		}
+		rejected = append(rejected, img)
+	}
+	if len(rejected) == 0 {
+		return []SyntheticReport{{
+			Policy:    h.Name(),
+			Rule:      h.Name(),
+			Result:    ResultPass,
+			Resource:  resourceFor(target),
+			Namespace: target.GetNamespace(),
+			Message:   "all container images via Harbor proxy",
+		}}
+	}
+	return []SyntheticReport{{
+		Policy:    h.Name(),
+		Rule:      h.Name(),
+		Result:    ResultFail,
+		Resource:  resourceFor(target),
+		Namespace: target.GetNamespace(),
+		Message:   "container images bypass Harbor proxy: " + strings.Join(rejected, ", "),
+		Properties: map[string]string{
+			"rejectedImages": strings.Join(rejected, ","),
+			"harborDomain":   h.Domain,
+		},
+	}}
+}
+
+// imageAccepted returns true when the image string starts with the
+// Harbor domain prefix or any of the allowed-prefix entries.
+//
+// Acceptance is BYTE-PREFIX, not substring — and the prefix carries a
+// trailing slash, so `harbor.openova.io.evil.com/img` cannot match
+// `harbor.openova.io/`. The check is case-sensitive (image refs are
+// case-sensitive in OCI).
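+//
+// Worked example (domain assumed for illustration): with harborPrefix
+// "harbor.example.com/",
+//
+//	harbor.example.com/proxy-ghcr/app:v1    → accepted
+//	harbor.example.com.evil.io/app:v1       → rejected (diverges at the slash)
+//	docker.io/library/app:v1                → rejected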
+func (h *HarborEvaluator) imageAccepted(img, harborPrefix string) bool { + if strings.HasPrefix(img, harborPrefix) { + return true + } + for _, p := range h.AllowedPrefixes { + if p != "" && strings.HasPrefix(img, p) { + return true + } + } + return false +} diff --git a/products/catalyst/bootstrap/api/internal/k8scache/evaluators/hpa.go b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/hpa.go new file mode 100644 index 00000000..ba7c7197 --- /dev/null +++ b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/hpa.go @@ -0,0 +1,207 @@ +// hpa.go — HPA-effective evaluator. +// +// EPIC-1 (#1096) §4.3 row "Autoscaler (HPA/VPA) effective". +// +// Logic (per `02-W-watcher-extension.md` brief): +// - If the target is a Pod, walk owner chain Pod → ReplicaSet → +// Deployment to identify the workload owner (StatefulSet / +// DaemonSet are also acceptable owners). Pods owned by Jobs / +// CronJobs / standalone are out-of-scope (skip). +// - Find an HPA (autoscaling/v2) whose spec.scaleTargetRef points +// at the workload owner. +// - No HPA → result=skip (HPA isn't applicable to this workload — +// e.g. a singleton control-plane pod). +// - HPA present but currentReplicas < minReplicas → result=fail +// (the autoscaler isn't keeping the floor). +// - HPA present + currentReplicas >= minReplicas → result=pass. +// +// The score aggregator (slice S1) drops `skip` rows from the +// denominator so this evaluator does NOT punish workloads that +// legitimately have no HPA. +package evaluators + +import ( + "context" + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" +) + +// HPAEvaluator implements `policy=hpa-effective`. +type HPAEvaluator struct { + // MinFloor — synthetic floor below which the evaluator emits FAIL + // even when the HPA reports happiness. Defaults to Config.HPAMinReplicas + // at engine wiring. + MinFloor int32 +} + +// NewHPAEvaluator constructs an HPAEvaluator with values copied from cfg. +func NewHPAEvaluator(cfg Config) *HPAEvaluator { + return &HPAEvaluator{MinFloor: cfg.HPAMinReplicas} +} + +// Name — canonical policy id. +func (HPAEvaluator) Name() string { return "hpa-effective" } + +func (h *HPAEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport { + if !isPod(target) { + return nil + } + // 1. Resolve workload owner via Pod → ReplicaSet → Deployment chain. + ownerKind, ownerName, ownerNamespace := resolveWorkloadOwner(snap, target) + if ownerKind == "" || ownerName == "" { + // Standalone Pod / Job-owned — out of scope. + return []SyntheticReport{newSkip(h.Name(), target, "pod has no controller workload owner")} + } + + // 2. Find an HPA whose scaleTargetRef matches. + hpa := findHPAFor(snap, ownerKind, ownerName, ownerNamespace) + if hpa == nil { + return []SyntheticReport{newSkip(h.Name(), target, fmt.Sprintf("no HPA targets %s/%s in %s", ownerKind, ownerName, ownerNamespace))} + } + + // 3. Compare currentReplicas vs minReplicas (and the synthetic + // floor MinFloor). 
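+	// Illustrative values: minReplicas=3, currentReplicas=1 → fail;
+	// minReplicas=3, currentReplicas=5 → pass. With minReplicas
+	// absent and MinFloor=1, the effective floor is 1.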
+ min, _ := hpaMinReplicas(hpa) + current, _ := hpaCurrentReplicas(hpa) + if min < h.MinFloor { + min = h.MinFloor + } + props := map[string]string{ + "hpaName": hpa.GetName(), + "hpaNamespace": hpa.GetNamespace(), + "minReplicas": fmt.Sprintf("%d", min), + "currentReplicas": fmt.Sprintf("%d", current), + "workloadKind": ownerKind, + "workloadName": ownerName, + "workloadNamespace": ownerNamespace, + } + if current < min { + return []SyntheticReport{{ + Policy: h.Name(), + Rule: h.Name(), + Result: ResultFail, + Resource: resourceFor(target), + Namespace: target.GetNamespace(), + Message: fmt.Sprintf("HPA %s/%s reports currentReplicas=%d below minReplicas=%d", hpa.GetNamespace(), hpa.GetName(), current, min), + Properties: props, + }} + } + return []SyntheticReport{{ + Policy: h.Name(), + Rule: h.Name(), + Result: ResultPass, + Resource: resourceFor(target), + Namespace: target.GetNamespace(), + Message: fmt.Sprintf("HPA %s/%s satisfies minReplicas=%d (current=%d)", hpa.GetNamespace(), hpa.GetName(), min, current), + Properties: props, + }} +} + +// resolveWorkloadOwner walks Pod → controller → (ReplicaSet → +// Deployment) and returns (kind, name, namespace) of the top-level +// workload. Returns ("","","") for standalone or Job-owned Pods. +func resolveWorkloadOwner(snap Snapshot, pod *unstructured.Unstructured) (string, string, string) { + if pod == nil { + return "", "", "" + } + ns := pod.GetNamespace() + for _, ref := range pod.GetOwnerReferences() { + // Direct workload owners — StatefulSet, DaemonSet are + // terminal here. + switch ref.Kind { + case "StatefulSet", "DaemonSet", "Deployment": + return ref.Kind, ref.Name, ns + case "ReplicaSet": + // Hop through ReplicaSet to Deployment if the RS itself + // has a Deployment owner. We look up the RS in the + // snapshot — if it isn't cached, fall back to "ReplicaSet" + // directly (HPA can target ReplicaSet too, rarely). + rsList, err := snap.List("replicaset", labels.Everything()) + if err == nil { + for _, rs := range rsList { + if rs.GetName() == ref.Name && rs.GetNamespace() == ns { + for _, rsOwner := range rs.GetOwnerReferences() { + if rsOwner.Kind == "Deployment" { + return "Deployment", rsOwner.Name, ns + } + } + } + } + } + return "ReplicaSet", ref.Name, ns + case "Job": + // Job-owned pods are out of scope — return empty and let + // the caller emit skip. + return "", "", "" + } + } + return "", "", "" +} + +// findHPAFor scans every HPA in the snapshot and returns the first +// one whose spec.scaleTargetRef matches (kind, name, namespace). +// Returns nil when no match. +// +// HPA is a namespace-scoped resource; we iterate every namespace's +// HPA in the cache. Cost is O(hpa-count); typical clusters have +// O(10) HPAs so this is cheap. +// +// Kind name "horizontalpodautoscaler" — the canonical k8scache name +// for autoscaling/v2 HPAs (registered by the operator via the kinds +// ConfigMap on Sovereigns that opt-in; absent on others). When the +// kind is not registered the Snapshot.List returns an error and we +// gracefully report nil (caller emits skip). 
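+//
+// Illustrative match (values hypothetical): an HPA in namespace
+// "acme" whose spec.scaleTargetRef is {kind: "Deployment", name:
+// "frontend"} is returned for ownerKind="Deployment",
+// ownerName="frontend", ownerNamespace="acme" — kind comparison is
+// case-insensitive, name comparison exact.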
+func findHPAFor(snap Snapshot, ownerKind, ownerName, ownerNamespace string) *unstructured.Unstructured { + hpaList, err := snap.List("horizontalpodautoscaler", labels.Everything()) + if err != nil { + return nil + } + for _, hpa := range hpaList { + if hpa.GetNamespace() != ownerNamespace { + continue + } + ref, found, _ := unstructured.NestedMap(hpa.Object, "spec", "scaleTargetRef") + if !found { + continue + } + kind, _ := ref["kind"].(string) + name, _ := ref["name"].(string) + if strings.EqualFold(kind, ownerKind) && name == ownerName { + return hpa + } + } + return nil +} + +// hpaMinReplicas extracts spec.minReplicas (int32 default 1). +func hpaMinReplicas(hpa *unstructured.Unstructured) (int32, bool) { + v, found, err := unstructured.NestedInt64(hpa.Object, "spec", "minReplicas") + if err != nil || !found { + return 1, false + } + return int32(v), true +} + +// hpaCurrentReplicas extracts status.currentReplicas (int32 default 0). +func hpaCurrentReplicas(hpa *unstructured.Unstructured) (int32, bool) { + v, found, err := unstructured.NestedInt64(hpa.Object, "status", "currentReplicas") + if err != nil || !found { + return 0, false + } + return int32(v), true +} + +// newSkip — small helper so each branch above stays readable. +func newSkip(policy string, target *unstructured.Unstructured, msg string) SyntheticReport { + return SyntheticReport{ + Policy: policy, + Rule: policy, + Result: ResultSkip, + Resource: resourceFor(target), + Namespace: target.GetNamespace(), + Message: msg, + } +} diff --git a/products/catalyst/bootstrap/api/internal/k8scache/evaluators/hubble.go b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/hubble.go new file mode 100644 index 00000000..a6cf137a --- /dev/null +++ b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/hubble.go @@ -0,0 +1,127 @@ +// hubble.go — Hubble flow-observed evaluator. +// +// EPIC-1 (#1096) §4.3 row "Hubble flows observed (last 5m)". +// +// **Status — DEFERRED HUBBLE CLIENT**: +// +// The brief permits this evaluator to defer when the Hubble Observer +// gRPC client (`github.com/cilium/cilium/api/v1/observer`) is not in +// the catalyst-api dependency graph. As of this slice (W2) the +// catalyst-api has NOT pulled `github.com/cilium/cilium` — adding it +// would import the full Cilium operator graph (~30 transitive Go +// modules; ~80MB on `go mod download`). The Coordinator's standing +// rule on dep weight is to land the synthetic-row plumbing first + +// wire the Hubble client in a follow-up slice once the score +// aggregator has a real consumer. +// +// Behaviour TODAY: +// +// - When Config.HubbleEnabled == false (the default), the evaluator +// emits result=skip for every Pod. Score aggregator drops skip +// rows from the denominator → no false negatives on the score. +// - When Config.HubbleEnabled == true (after the follow-up wires +// the client), the evaluator queries the Observer API for the +// last Config.HubbleLookbackWindow (default 5min) of flows +// touching the Pod's IP. Pass when ≥1 flow seen, fail otherwise. +// +// Wiring of the actual Hubble client lives in the follow-up issue; +// this file documents the contract so the consumer (slice S1) can +// plan around the skip-then-pass transition without re-tooling. +package evaluators + +import ( + "context" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// HubbleEvaluator implements `policy=hubble-flows-seen`. +type HubbleEvaluator struct { + Enabled bool + + // Lookback — passed to the Observer API client when Enabled. 
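+ // Measured in seconds; NewHubbleEvaluator derives it from
+ // Config.HubbleLookbackWindow via Duration.Seconds().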
+ // Stored so a future implementation reads it; not used today.
+ LookbackSeconds int64
+
+ // Probe — pluggable client used in the Enabled branch.
+ // Production wiring (follow-up slice) sets this to a thin
+ // wrapper around the cilium/cilium observer gRPC client. Tests
+ // inject a fake to exercise pass/fail without the Hubble dep.
+ Probe HubbleProbe
+}
+
+// HubbleProbe is the pluggable interface the evaluator calls when
+// Enabled. The implementation queries the Hubble Observer API for
+// flows touching the given Pod within the given lookback window.
+type HubbleProbe interface {
+ // FlowsSeen returns true if Hubble has observed at least one flow
+ // to/from the Pod's IP/UID in the last `lookbackSeconds` seconds.
+ // Errors fail open — the evaluator surfaces them as an
+ // inconclusive warn row rather than punishing a Pod for an
+ // unreachable Hubble Observer; ops can detect Hubble outages via
+ // the cluster-level alarm.
+ FlowsSeen(ctx context.Context, pod *unstructured.Unstructured, lookbackSeconds int64) (bool, error)
+}
+
+// NewHubbleEvaluator builds a HubbleEvaluator from cfg.
+func NewHubbleEvaluator(cfg Config) *HubbleEvaluator {
+ return &HubbleEvaluator{
+ Enabled: cfg.HubbleEnabled,
+ LookbackSeconds: int64(cfg.HubbleLookbackWindow.Seconds()),
+ }
+}
+
+func (HubbleEvaluator) Name() string { return "hubble-flows-seen" }
+
+func (h *HubbleEvaluator) Evaluate(ctx context.Context, _ Snapshot, target *unstructured.Unstructured) []SyntheticReport {
+ if !isPod(target) {
+ return nil
+ }
+ if !h.Enabled || h.Probe == nil {
+ return []SyntheticReport{{
+ Policy: h.Name(),
+ Rule: h.Name(),
+ Result: ResultSkip,
+ Resource: resourceFor(target),
+ Namespace: target.GetNamespace(),
+ Message: "Hubble disabled or observer client not wired on this Sovereign — evaluator skipped",
+ Properties: map[string]string{
+ "reason": "hubble-disabled",
+ },
+ }}
+ }
+
+ seen, err := h.Probe.FlowsSeen(ctx, target, h.LookbackSeconds)
+ if err != nil {
+ // Fail open — surface as warn so the score isn't depressed
+ // by a transient Hubble Observer outage.
+ return []SyntheticReport{{
+ Policy: h.Name(),
+ Rule: h.Name(),
+ Result: ResultWarn,
+ Resource: resourceFor(target),
+ Namespace: target.GetNamespace(),
+ Message: "Hubble Observer unreachable — flow check inconclusive",
+ Properties: map[string]string{
+ "err": err.Error(),
+ },
+ }}
+ }
+ if seen {
+ return []SyntheticReport{{
+ Policy: h.Name(),
+ Rule: h.Name(),
+ Result: ResultPass,
+ Resource: resourceFor(target),
+ Namespace: target.GetNamespace(),
+ Message: "Hubble observed flows to/from Pod within lookback window",
+ }}
+ }
+ return []SyntheticReport{{
+ Policy: h.Name(),
+ Rule: h.Name(),
+ Result: ResultFail,
+ Resource: resourceFor(target),
+ Namespace: target.GetNamespace(),
+ Message: "Hubble observed no flows to/from Pod within lookback window",
+ }}
+}
diff --git a/products/catalyst/bootstrap/api/internal/k8scache/evaluators/otel.go b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/otel.go
new file mode 100644
index 00000000..1d1b660b
--- /dev/null
+++ b/products/catalyst/bootstrap/api/internal/k8scache/evaluators/otel.go
@@ -0,0 +1,130 @@
+// otel.go — OTel auto-instrumentation evaluator.
+//
+// EPIC-1 (#1096) §4.3 row "OTel auto-instrumentation present".
+//
+// Logic (per `02-W-watcher-extension.md` brief):
+//
+// 1. For each Pod: check container list for an `otel-collector`
+// sidecar (heuristic: image substring matches
+// Config.OTelSidecarImageMatch — default `opentelemetry-collector`).
+// If found → result=pass.
+// 2. OTel Operator auto-injection: check if the Pod has an annotation
+// with prefix `instrumentation.opentelemetry.io/inject-` whose
+// value is `true`. Combined with an Instrumentation CR existing in
+// the Pod's namespace → result=pass.
+// 3. Neither → result=fail.
+//
+// The Instrumentation CR is a namespaced opentelemetry.io/v1alpha1
+// resource. When the kind is not registered in the k8scache (the
+// bp-otel-operator chart isn't installed on this Sovereign) the
+// evaluator falls back to sidecar-only detection — annotation alone
+// without the operator running is meaningless.
+package evaluators
+
+import (
+ "context"
+ "strings"
+
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/labels"
+)
+
+// OTelEvaluator implements `policy=otel-injected`.
+type OTelEvaluator struct {
+ SidecarImageMatch string
+ InjectAnnotationPrefix string
+ InstrumentationKindName string
+}
+
+// NewOTelEvaluator builds an OTelEvaluator from cfg.
+func NewOTelEvaluator(cfg Config) *OTelEvaluator {
+ return &OTelEvaluator{
+ SidecarImageMatch: cfg.OTelSidecarImageMatch,
+ InjectAnnotationPrefix: cfg.OTelInjectAnnotationPrefix,
+ InstrumentationKindName: cfg.OTelInstrumentationKind,
+ }
+}
+
+func (OTelEvaluator) Name() string { return "otel-injected" }
+
+func (o *OTelEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport {
+ if !isPod(target) {
+ return nil
+ }
+
+ // 1. Sidecar check — substring match against container images.
+ for _, img := range containerImages(target) {
+ if o.SidecarImageMatch != "" && strings.Contains(img, o.SidecarImageMatch) {
+ return []SyntheticReport{{
+ Policy: o.Name(),
+ Rule: o.Name(),
+ Result: ResultPass,
+ Resource: resourceFor(target),
+ Namespace: target.GetNamespace(),
+ Message: "OTel collector sidecar detected (image=" + img + ")",
+ Properties: map[string]string{
+ "detection": "sidecar",
+ "image": img,
+ },
+ }}
+ }
+ }
+
+ // 2. Auto-inject path — Pod annotation + Instrumentation CR in
+ // the same namespace.
+ annots := target.GetAnnotations()
+ injectAnnotation := ""
+ for k, v := range annots {
+ if strings.HasPrefix(k, o.InjectAnnotationPrefix) && strings.EqualFold(v, "true") {
+ injectAnnotation = k
+ break
+ }
+ }
+ if injectAnnotation != "" {
+ // Look up Instrumentation CR in the same namespace.
+ instList, err := snap.List(o.InstrumentationKindName, labels.Everything())
+ if err == nil {
+ for _, inst := range instList {
+ if inst.GetNamespace() == target.GetNamespace() {
+ return []SyntheticReport{{
+ Policy: o.Name(),
+ Rule: o.Name(),
+ Result: ResultPass,
+ Resource: resourceFor(target),
+ Namespace: target.GetNamespace(),
+ Message: "OTel auto-inject annotation present + Instrumentation CR in namespace",
+ Properties: map[string]string{
+ "detection": "auto-inject",
+ "annotation": injectAnnotation,
+ "instrumentationName": inst.GetName(),
+ },
+ }}
+ }
+ }
+ }
+ // Annotation present but no Instrumentation CR — operator
+ // not installed. Surface as fail with a hint.
+ return []SyntheticReport{{
+ Policy: o.Name(),
+ Rule: o.Name(),
+ Result: ResultFail,
+ Resource: resourceFor(target),
+ Namespace: target.GetNamespace(),
+ Message: "OTel auto-inject annotation set but no Instrumentation CR in namespace — operator missing",
+ Properties: map[string]string{
+ "detection": "auto-inject-orphan",
+ "annotation": injectAnnotation,
+ },
+ }}
+ }
+
+ // 3. Neither path matched.
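+ // e.g. a stock nginx Pod with no collector container image and no
+ // instrumentation.opentelemetry.io/inject-* annotation falls
+ // through to here (illustrative example, not from the brief).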
+ return []SyntheticReport{{ + Policy: o.Name(), + Rule: o.Name(), + Result: ResultFail, + Resource: resourceFor(target), + Namespace: target.GetNamespace(), + Message: "no OTel collector sidecar and no auto-inject annotation", + }} +} diff --git a/products/catalyst/bootstrap/api/internal/k8scache/factory.go b/products/catalyst/bootstrap/api/internal/k8scache/factory.go index 5f66e6d5..b6508a15 100644 --- a/products/catalyst/bootstrap/api/internal/k8scache/factory.go +++ b/products/catalyst/bootstrap/api/internal/k8scache/factory.go @@ -72,6 +72,19 @@ const ( EventDeleted EventType = "DELETED" ) +// KindComplianceEvaluator is the canonical kind name carried on every +// synthetic PolicyReport-shaped event published by evaluators in +// internal/k8scache/evaluators (EPIC-1 #1096 slice W2). The score +// aggregator (slice S1) subscribes to this kind alongside the upstream +// `policyreport` / `clusterpolicyreport` kinds — a single SSE stream, +// uniform shape. +// +// The synthetic kind is NOT registered as a watch target — it has no +// GVR and the factory never LISTs a real K8s resource for it. +// Subscribers filter on it via the same `?kinds=` query parameter that +// gates pods, services, etc. +const KindComplianceEvaluator = "compliance-evaluator" + // Event is a single K8s state-change notification. Encoded directly // into the SSE frame — field names are part of the public wire // contract and must stay stable. @@ -637,6 +650,21 @@ func (f *Factory) dispatch(cs *clusterState, k Kind, t EventType, obj any) { f.fanout(ev) } +// Publish injects a non-watch-derived Event onto the SSE fanout. Used +// by internal/k8scache/evaluators (EPIC-1 #1096 slice W2) to emit +// synthetic PolicyReport-shaped events on the +// `compliance-evaluator` kind. The event MUST set Cluster, Kind, +// Type, Object, At — the factory does NOT mutate or redact synthetic +// payloads (the producer owns them). +// +// Concurrency: safe for concurrent callers; the underlying fanout +// holds subMu briefly to snapshot the subscriber set, then sends +// non-blocking with the same drop-oldest backpressure as informer +// events. +func (f *Factory) Publish(ev Event) { + f.fanout(ev) +} + // fanout — non-blocking send to each subscriber. On full channel we // drop the OLDEST event (drain one then push) so the consumer // catches up automatically without the producer waiting. diff --git a/products/catalyst/bootstrap/api/internal/k8scache/k8scache_test.go b/products/catalyst/bootstrap/api/internal/k8scache/k8scache_test.go index 6a0058a2..c4de6713 100644 --- a/products/catalyst/bootstrap/api/internal/k8scache/k8scache_test.go +++ b/products/catalyst/bootstrap/api/internal/k8scache/k8scache_test.go @@ -116,6 +116,8 @@ func TestDefaultKinds_GraphAndDashboardSurface(t *testing.T) { "persistentvolume", "replicaset", "endpointslice", // dashboard color_by=utilization depends on this (#1084) "podmetrics", + // EPIC-1 (#1096) compliance — Kyverno PolicyReports. + "policyreport", "clusterpolicyreport", } for _, name := range mandatory { if _, ok := r.Get(name); !ok { @@ -603,3 +605,152 @@ func itoa(n int) string { // keep corev1 referenced to prevent the import being elided when this // file changes. var _ = corev1.NamespaceDefault + +// ── EPIC-1 (#1096) — PolicyReport SSE fanout ───────────────────── + +// newPolicyReport returns a tiny unstructured wgpolicyk8s.io PolicyReport. 
+// Mirrors the schema produced by the Kyverno reports controller — fields +// kept minimal because the score aggregator (slice S1) reads only +// `metadata.{name,namespace,labels,ownerReferences}` and `results[]`. +// +// Results pass through DeepCopyJSON so the slice element type must be +// `any` (not `map[string]any`) — DeepCopyJSON refuses unknown concrete +// slice element types. +func newPolicyReport(ns, name string, results []any) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: map[string]any{ + "apiVersion": "wgpolicyk8s.io/v1alpha2", + "kind": "PolicyReport", + "metadata": map[string]any{ + "namespace": ns, + "name": name, + "resourceVersion": "1", + }, + "results": results, + }} +} + +func newClusterPolicyReport(name string, results []any) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: map[string]any{ + "apiVersion": "wgpolicyk8s.io/v1alpha2", + "kind": "ClusterPolicyReport", + "metadata": map[string]any{ + "name": name, + "resourceVersion": "1", + }, + "results": results, + }} +} + +// policyReportRegistry — minimal registry with the two PolicyReport +// kinds. Used by the W1 fanout test below. +func policyReportRegistry() *Registry { + r := NewRegistry() + _ = r.Add(Kind{ + Name: "policyreport", + GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}, + Namespaced: true, + }) + _ = r.Add(Kind{ + Name: "clusterpolicyreport", + GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}, + Namespaced: false, + }) + return r +} + +// fakePolicyReportClients — like fakeClients but seeds the discovery +// scheme with the wgpolicyk8s.io types so the dynamic informer's LIST + +// WATCH succeed against the in-memory store. +func fakePolicyReportClients(objs ...runtime.Object) (*dynamicfake.FakeDynamicClient, clientgokubernetes.Interface) { + scheme := runtime.NewScheme() + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "PolicyReportList"}, &unstructured.UnstructuredList{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "PolicyReport"}, &unstructured.Unstructured{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "ClusterPolicyReportList"}, &unstructured.UnstructuredList{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "ClusterPolicyReport"}, &unstructured.Unstructured{}) + gvrToListKind := map[schema.GroupVersionResource]string{ + {Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}: "PolicyReportList", + {Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}: "ClusterPolicyReportList", + } + dyn := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind, objs...) + core := kfake.NewSimpleClientset() + return dyn, core +} + +// TestPolicyReport_FlowsThroughSSEFanout asserts W1 — applying a +// PolicyReport CR fires the same ADD event the architecture-graph kinds +// fire, with no special-case handling. 
+// Coverage:
+// - PolicyReport (namespace-scoped)
+// - ClusterPolicyReport (cluster-scoped) on the same factory
+// - Subscriber filtered to compliance kinds receives both
+func TestPolicyReport_FlowsThroughSSEFanout(t *testing.T) {
+ dyn, core := fakePolicyReportClients()
+ cfg := Config{
+ Logger: quietLogger(),
+ Registry: policyReportRegistry(),
+ Clusters: []ClusterRef{
+ {ID: "alpha", DynamicClient: dyn, CoreClient: core},
+ },
+ }
+ f, err := NewFactory(cfg)
+ if err != nil {
+ t.Fatalf("NewFactory: %v", err)
+ }
+ defer f.Stop()
+
+ ch, unsub := f.Subscribe("operator", map[string]struct{}{
+ "policyreport": {},
+ "clusterpolicyreport": {},
+ })
+ defer unsub()
+
+ if err := f.Start(context.Background()); err != nil {
+ t.Fatalf("Start: %v", err)
+ }
+
+ // Apply a namespaced PolicyReport.
+ results := []any{
+ map[string]any{
+ "policy": "multi-replica-drainability",
+ "rule": "multi-replica-drainability",
+ "result": "fail",
+ "message": "Deployment has only 1 replica",
+ },
+ }
+ prGVR := schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}
+ if _, err := dyn.Resource(prGVR).Namespace("acme").Create(context.Background(), newPolicyReport("acme", "pod-frontend-7c5f", results), metav1.CreateOptions{}); err != nil {
+ t.Fatalf("create PolicyReport: %v", err)
+ }
+
+ // Apply a cluster-scoped ClusterPolicyReport.
+ cprGVR := schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}
+ if _, err := dyn.Resource(cprGVR).Create(context.Background(), newClusterPolicyReport("ns-acme", results), metav1.CreateOptions{}); err != nil {
+ t.Fatalf("create ClusterPolicyReport: %v", err)
+ }
+
+ gotPR, gotCPR := false, false
+ timeout := time.After(2 * time.Second)
+ for !(gotPR && gotCPR) {
+ select {
+ case ev, ok := <-ch:
+ if !ok {
+ t.Fatalf("subscriber channel closed")
+ }
+ if ev.Type != EventAdded {
+ continue
+ }
+ if ev.Kind == "policyreport" && ev.Object.GetName() == "pod-frontend-7c5f" {
+ // Sanity check — body survives unmodified (PolicyReport
+ // is non-sensitive so redact is a no-op).
+ if results, _, _ := unstructured.NestedSlice(ev.Object.Object, "results"); len(results) != 1 {
+ t.Fatalf("PolicyReport.results not preserved through fanout")
+ }
+ gotPR = true
+ }
+ if ev.Kind == "clusterpolicyreport" && ev.Object.GetName() == "ns-acme" {
+ gotCPR = true
+ }
+ case <-timeout:
+ t.Fatalf("never received PolicyReport (got=%v) + ClusterPolicyReport (got=%v) ADD events", gotPR, gotCPR)
+ }
+ }
+}
diff --git a/products/catalyst/bootstrap/api/internal/k8scache/kinds.go b/products/catalyst/bootstrap/api/internal/k8scache/kinds.go
index 690dfaba..c132f3f1 100644
--- a/products/catalyst/bootstrap/api/internal/k8scache/kinds.go
+++ b/products/catalyst/bootstrap/api/internal/k8scache/kinds.go
@@ -120,6 +120,20 @@ var DefaultKinds = []Kind{
 // real Sovereign ships bp-metrics-server in the platform bundle,
 // so the utilization gradient renders out of the box.
 {Name: "podmetrics", GVR: schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "pods"}, Namespaced: true},
+
+ // EPIC-1 (#1096) — Compliance: Kyverno PolicyReports.
+ //
+ // `wgpolicyk8s.io/v1alpha2/PolicyReport` is the namespace-scoped
+ // per-resource compliance report Kyverno emits for every Pod /
+ // Workload it audits. `ClusterPolicyReport` is the cluster-scoped
+ // equivalent for cluster-scoped resources (Namespaces, Nodes,
+ // CRDs, …).
+ // The score aggregator (slice S1) consumes both via the
+ // same SSE fanout the architecture graph already uses — no special
+ // path. The reports themselves carry no secret material (Kyverno
+ // omits the offending object's data fields by design) so
+ // Sensitive=false is correct.
+ {Name: "policyreport", GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}, Namespaced: true},
+ {Name: "clusterpolicyreport", GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}, Namespaced: false},
 }
 
 // Registry is a runtime-mutable lookup keyed by the short Name. It