feat(k8scache): subscribe to PolicyReport + 5 custom evaluators (slice W, #1096) (#1139)

W1: extend `internal/k8scache/kinds.go` `DefaultKinds` with
`wgpolicyk8s.io/v1alpha2/PolicyReport` (namespaced) and
`ClusterPolicyReport` (cluster-scoped). Reports flow through the
existing `Factory.dispatch` → `fanout` → SSE subscribers — no special
treatment. Test coverage: `TestPolicyReport_FlowsThroughSSEFanout`
applies a synthetic PolicyReport + ClusterPolicyReport via the fake
dynamic client and asserts both ADD events arrive at a kind-filtered
subscriber.
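The shape of the registry change can be sketched as follows. `kindEntry` and its field names are illustrative stand-ins — the real entry type in `internal/k8scache/kinds.go` is not shown in this diff and may differ:

```go
package main

import "fmt"

// kindEntry approximates a DefaultKinds registry entry (hypothetical shape).
type kindEntry struct {
	Group      string
	Version    string
	Kind       string
	Namespaced bool
}

// defaultKinds shows only the two entries slice W1 adds.
var defaultKinds = map[string]kindEntry{
	"policyreport": {
		Group: "wgpolicyk8s.io", Version: "v1alpha2",
		Kind: "PolicyReport", Namespaced: true,
	},
	"clusterpolicyreport": {
		Group: "wgpolicyk8s.io", Version: "v1alpha2",
		Kind: "ClusterPolicyReport", Namespaced: false,
	},
}

func main() {
	// Deterministic order for readability; real dispatch is event-driven.
	for _, name := range []string{"policyreport", "clusterpolicyreport"} {
		k := defaultKinds[name]
		fmt.Printf("%s/%s %s namespaced=%v\n", k.Group, k.Version, k.Kind, k.Namespaced)
	}
}
```

Because the reports ride the existing dispatch path, no evaluator-specific plumbing is needed for W1 itself.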

W2: new package `internal/k8scache/evaluators/` shipping 5 custom
evaluators that emit synthetic PolicyReport-shaped rows on the
`compliance-evaluator` SSE channel:

  - hpa.go     — HPA `spec.minReplicas` vs `status.currentReplicas`,
                 with Pod → ReplicaSet → Deployment owner chain.
  - otel.go    — OTel collector sidecar OR Pod auto-inject annotation
                 + namespace Instrumentation CR.
  - hubble.go  — Hubble Observer flow check (DEFERRED: cilium/cilium
                 client not pulled by current deps; evaluator emits
                 skip when `Config.HubbleEnabled=false`, follow-up
                 slice wires the gRPC client).
  - harbor.go  — image starts with `<HarborDomain>/...` or operator-
                 supplied allow-list prefix; fail on docker.io / ghcr.io
                 direct refs.
  - flux.go    — `app.kubernetes.io/managed-by: flux` label OR Flux
                 ownerRef on the Pod or its controller.
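The harbor rule is the simplest of the five to state precisely; a minimal sketch of its prefix semantics (function name and the example domain are illustrative, not the evaluator's real API):

```go
package main

import (
	"fmt"
	"strings"
)

// imageAllowed mirrors the harbor evaluator's rule as described above: an
// image passes when it starts with "<HarborDomain>/" or with any
// operator-supplied allow-list prefix; direct docker.io / ghcr.io refs fail.
func imageAllowed(image, harborDomain string, allowed []string) bool {
	if harborDomain != "" && strings.HasPrefix(image, harborDomain+"/") {
		return true
	}
	for _, p := range allowed {
		if strings.HasPrefix(image, p) {
			return true
		}
	}
	return false
}

func main() {
	domain := "harbor.example.internal" // placeholder; the real value is runtime config
	fmt.Println(imageAllowed(domain+"/proxy-ghcr/app:1.0", domain, nil))                    // true
	fmt.Println(imageAllowed("mirror.example/side:1", domain, []string{"mirror.example/"})) // true
	fmt.Println(imageAllowed("docker.io/library/redis:7.0", domain, nil))                   // false
}
```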

Engine architecture (per ADR-0001 §5):
  - Subscribes to Pod ADD/MODIFY events from the watcher.
  - 30s ticker re-evaluates over the in-process Indexer (no apiserver
    polling — pure cache reads).
  - Publishes synthetic events via the new exported
    `Factory.Publish(Event)` method which re-uses the same fanout the
    architecture-graph subscribers consume.
  - `KindComplianceEvaluator = "compliance-evaluator"` constant for
    the score aggregator (slice S1) to subscribe to.
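The kind-filtering semantics of that shared fanout can be illustrated with a stripped-down, hypothetical sketch (the real `Factory` fanout is channel-based and concurrent; only the filtering behavior is shown here):

```go
package main

import "fmt"

// event is a simplified stand-in for k8scache's event envelope.
type event struct {
	Kind   string
	Policy string
}

// fanout delivers only the events whose Kind the subscriber asked for —
// the same mechanism lets the S1 aggregator subscribe solely to the
// "compliance-evaluator" kind while graph subscribers watch pods etc.
func fanout(stream []event, kinds map[string]struct{}) []event {
	var out []event
	for _, ev := range stream {
		if _, ok := kinds[ev.Kind]; ok {
			out = append(out, ev)
		}
	}
	return out
}

func main() {
	stream := []event{
		{Kind: "pod"},
		{Kind: "compliance-evaluator", Policy: "hpa-effective"},
		{Kind: "compliance-evaluator", Policy: "flux-managed"},
	}
	sub := fanout(stream, map[string]struct{}{"compliance-evaluator": {}})
	fmt.Println(len(sub)) // 2
}
```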

Per INVIOLABLE-PRINCIPLES #4: every threshold (HPA min replicas,
Hubble lookback, Harbor regex, OTel annotation prefix, Flux label
key/value) is a Config field — no hardcoded values.

Tests (28 unit cases spanning the per-evaluator pass/fail/skip
matrices, the engine event/tick/shutdown paths, and the helpers):
  - go test -count=1 -race ./internal/k8scache/...  → CLEAN
  - go vet ./... → CLEAN

Co-authored-by: hatiyildiz <hatice@openova.io>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
e3mrah 2026-05-09 02:02:43 +04:00, committed by GitHub
parent d74e0d5e5a
commit a987748b42
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
10 changed files with 2280 additions and 0 deletions


@ -0,0 +1,575 @@
// Package evaluators — synthetic PolicyReport producers for compliance
// signals Kyverno cannot evaluate at admission.
//
// EPIC-1 (#1096) Slice W2 — five evaluators ship in this package:
//
// - hpa : HPA spec.minReplicas vs status.currentReplicas on the owning workload
// - otel : Pod has otel-collector sidecar OR namespace has Instrumentation CR
// - hubble : Cilium Hubble has observed flow to/from this Pod (deferred — no client dep)
// - harbor : Container image refs harbor.<sovereign>/...
// - flux : Resource has app.kubernetes.io/managed-by=flux OR Flux ownerRef
//
// Architecture (per docs/EPICS-1-6-unified-design.md §4.1, brief
// `02-W-watcher-extension.md`):
//
//  k8scache.Factory                        evaluators.Engine
//  ─────────────────                       ──────────────────
//  [informer events] ──Pod ADDED────────→  Subscribe(kinds={pod})
//                                                   │
//                                          ┌────────┴────────┐
//                                          │ EvaluateAll(...)│
//                                          └────────┬────────┘
//                                                   │
//                                     synthetic SyntheticReport rows
//                                                   │
//                        Factory.Publish(Event{Kind:"compliance-evaluator"})
//                                                   │
//  [SSE fanout] ←───────────────────────────────────┘
//      re-enters the same fanout the architecture-graph subscribers consume
//
// Per ADR-0001 §5: the 30s ticker re-evaluates over the in-process
// Indexer, NOT against the apiserver. Evaluators are pure functions
// over a Snapshot read-interface — they never make REST calls.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #4: every threshold (lookback
// window, regex pattern, registry domain template) is a Config field
// — no hardcoded values.
package evaluators

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
)
// Result is the canonical outcome — string-typed so it round-trips
// 1:1 through the synthetic PolicyReport JSON wire format the score
// aggregator (slice S1) consumes.
type Result string
const (
	// ResultPass — workload satisfies the policy.
	ResultPass Result = "pass"
	// ResultFail — workload violates the policy.
	ResultFail Result = "fail"
	// ResultSkip — policy not applicable to this workload (e.g. HPA
	// not present, Hubble UI disabled). Slice S1's normalizer drops
	// skip rows from the denominator.
	ResultSkip Result = "skip"
	// ResultWarn — informational, does not affect the score.
	ResultWarn Result = "warn"
)
// SyntheticReport mirrors the wgpolicyk8s.io PolicyReport row schema so
// the score aggregator's join is uniform across Kyverno-emitted rows
// and evaluator-emitted ones.
//
// Wire-shape: encoded directly into the SSE event's Object. The score
// aggregator deserialises this struct from each event's
// .object.results[0] field — keep field names stable across releases.
type SyntheticReport struct {
// Policy — short canonical policy name, matching the EPIC-1 §4.3
// table ("hpa-effective", "otel-injected", "harbor-proxy-pull",
// "flux-managed", "hubble-flows-seen").
Policy string `json:"policy"`
// Rule — for parity with Kyverno's per-rule reporting; equal to
// Policy for the simple one-rule evaluators in this package.
Rule string `json:"rule"`
// Result — pass / fail / skip / warn.
Result Result `json:"result"`
// Resource — back-pointer to the offending K8s object. Populated
// from the workload metadata (apiVersion, kind, name, namespace,
// uid). Skip rows still set Resource so the UI drill-down shows
// the workload that was inspected.
Resource metav1.OwnerReference `json:"resource"`
// Namespace — copied from the workload for the namespace-aware
// drill-down filter.
Namespace string `json:"namespace,omitempty"`
// Message — human-readable explanation.
Message string `json:"message,omitempty"`
// Properties — arbitrary key-value details (e.g. for HPA the
// minReplicas + currentReplicas observed values; for Harbor the
// rejected image ref).
Properties map[string]string `json:"properties,omitempty"`
// Time — server-side timestamp the row was produced. Driven by
// the Engine's clock for deterministic test fixtures.
Time metav1.Time `json:"time"`
}
// Snapshot is the read-side interface evaluators consume. Backed in
// production by k8scache.Factory.List which reads the in-process
// Indexer (no apiserver calls). Tests inject a fake.
//
// Cluster-scope: every evaluator works against ONE Sovereign at a
// time — the cluster id is passed at Engine.Run time. Snapshot.List
// returns objects from that single cluster's Indexer.
type Snapshot interface {
// List returns every object of `kindName` currently in the cache,
// optionally filtered by a label selector. An empty selector
// returns all objects. Returns an error when the kind is not
// registered.
List(kindName string, sel labels.Selector) ([]*unstructured.Unstructured, error)
}
// Evaluator is a pure function: given a snapshot of the cluster and a
// target workload (Pod / Deployment / etc.), produce zero or more
// SyntheticReport rows.
//
// Evaluators MUST NOT block on I/O — they read from `snap` (the local
// Indexer) and return. Long-running reachability checks (e.g. Hubble
// flow query) belong on the engine's tick goroutine, not in
// Evaluate.
type Evaluator interface {
// Name returns the canonical policy name. Used for logging,
// metrics, and the SyntheticReport.Policy field.
Name() string
// Evaluate applies the policy to the target. May return:
// - one row (typical)
// - zero rows (target not in scope — skipped silently rather
// than emitting a skip row; e.g. otel evaluator skips
// non-application workloads)
// - multiple rows (one per container, etc.)
Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport
}
// Config — runtime knobs for the engine and individual evaluators.
// Per INVIOLABLE-PRINCIPLES #4 every threshold is a config var.
type Config struct {
// Logger — required.
Logger *slog.Logger
// TickInterval — how often the engine re-evaluates over the
// snapshot. Defaults to 30s. Setting to 0 disables the ticker (the
// engine still reacts to Pod ADD/MODIFY events).
TickInterval time.Duration
// HPAMinReplicas — only inspected by hpa.go; the floor under
// which the evaluator emits FAIL even when HPA is happy. Default
// 1 (any positive replica count is acceptable).
HPAMinReplicas int32
// HubbleEnabled — when false the hubble evaluator emits skip
// without trying to talk to the Hubble Observer API. Wired from
// the bp-cilium chart's `hubble.relay.enabled` value.
HubbleEnabled bool
// HubbleLookbackWindow — how far back in time the hubble
// evaluator searches for flows touching the Pod. Default 5min.
HubbleLookbackWindow time.Duration
// HarborDomain — the per-Sovereign Harbor host used by the harbor
// evaluator's prefix check (e.g. `harbor.omantel.omani.works`).
// Empty disables the harbor evaluator (skip everywhere). Per
// `feedback_never_hardcode_urls.md` this is a runtime config —
// the value is sourced from the Sovereign's bp-harbor chart, NOT
// from Go.
HarborDomain string
// HarborAllowedPrefixes — additional registry prefixes that are
// permitted (e.g. internal mirrors). Default empty. The harbor
// evaluator passes if image starts with `<HarborDomain>/` OR any
// of these prefixes.
HarborAllowedPrefixes []string
// FluxManagedByLabel — label key whose value `flux` indicates
// Flux ownership. Default `app.kubernetes.io/managed-by`.
FluxManagedByLabel string
// FluxManagedByValue — label value indicating Flux ownership.
// Default `flux`.
FluxManagedByValue string
// OTelInjectAnnotationPrefix — annotation prefix that signals
// OTel auto-instrumentation (Pod-level). Default
// `instrumentation.opentelemetry.io/inject-`. The evaluator
// checks any annotation key with this prefix whose value is
// `true`.
OTelInjectAnnotationPrefix string
// OTelSidecarImageMatch — substring matched against each
// container's image to detect an OTel collector sidecar.
// Default `opentelemetry-collector`.
OTelSidecarImageMatch string
// OTelInstrumentationKind — the kind name (in the k8scache
// registry) for the namespace-scoped Instrumentation CR. When
// the kind is not registered the evaluator falls back to
// sidecar + annotation checks only. Default
// `instrumentation.opentelemetry.io`.
OTelInstrumentationKind string
// Now — time source for SyntheticReport.Time. Defaults to
// time.Now. Overridden by tests for deterministic timestamps.
Now func() time.Time
}
func (c *Config) defaults() {
	if c.TickInterval == 0 {
		c.TickInterval = 30 * time.Second
	}
	if c.HPAMinReplicas == 0 {
		c.HPAMinReplicas = 1
	}
	if c.HubbleLookbackWindow == 0 {
		c.HubbleLookbackWindow = 5 * time.Minute
	}
	if c.FluxManagedByLabel == "" {
		c.FluxManagedByLabel = "app.kubernetes.io/managed-by"
	}
	if c.FluxManagedByValue == "" {
		c.FluxManagedByValue = "flux"
	}
	if c.OTelInjectAnnotationPrefix == "" {
		c.OTelInjectAnnotationPrefix = "instrumentation.opentelemetry.io/inject-"
	}
	if c.OTelSidecarImageMatch == "" {
		c.OTelSidecarImageMatch = "opentelemetry-collector"
	}
	if c.OTelInstrumentationKind == "" {
		c.OTelInstrumentationKind = "instrumentation.opentelemetry.io"
	}
	if c.Now == nil {
		c.Now = time.Now
	}
}
// Publisher is the write-side interface — the Engine pushes synthetic
// events through this. Production wires it to k8scache.Factory.Publish;
// tests inject a recorder.
type Publisher interface {
// Publish emits one synthetic event. The Engine wraps every
// SyntheticReport into the wire envelope before calling.
Publish(clusterID string, report SyntheticReport)
}
// EvaluateAll runs every evaluator against the target and returns the
// concatenated results. Convenience wrapper used by the engine and
// tests; never returns nil — empty slice on no findings.
//
// Order is deterministic — evaluators are applied in the order
// supplied. Tests rely on this; do not parallelise here.
func EvaluateAll(ctx context.Context, snap Snapshot, target *unstructured.Unstructured, evals []Evaluator) []SyntheticReport {
	if target == nil {
		return nil
	}
	out := make([]SyntheticReport, 0, len(evals))
	for _, e := range evals {
		rows := e.Evaluate(ctx, snap, target)
		out = append(out, rows...)
	}
	return out
}
// Engine subscribes to the watcher's Pod events, runs every registered
// evaluator on each event, and publishes the synthetic reports back
// through the SSE fanout.
//
// Lifecycle:
// - NewEngine validates Config + evaluator set
// - Run blocks until ctx is cancelled; spawns one goroutine per
// subscribed cluster + one ticker goroutine
// - Multiple invocations are NOT supported on the same Engine; create
// a new Engine for re-runs
type Engine struct {
cfg Config
evaluators []Evaluator
pub Publisher
// resolveSnapshot maps clusterID → Snapshot. The factory has one
// Indexer per cluster; the engine looks up by id on every event /
// tick.
resolveSnapshot func(clusterID string) (Snapshot, []string, error)
// subscribe maps a kind set to an event channel plus an
// unsubscribe func. Production wires to Factory.Subscribe.
subscribe func(kinds map[string]struct{}) (<-chan eventLite, func())
mu sync.Mutex
started bool
}
// eventLite is the engine's internal copy of k8scache.Event minus the
// EventType — the engine only cares about ADD / MODIFY for the trigger
// path. DELETE is handled implicitly: when a target disappears from
// the Indexer, the next tick stops emitting reports for it.
type eventLite struct {
Cluster string
Kind string
Object *unstructured.Unstructured
}
// NewEngine wires an Engine without starting it.
func NewEngine(cfg Config, evals []Evaluator, pub Publisher,
resolveSnapshot func(clusterID string) (Snapshot, []string, error),
subscribe func(kinds map[string]struct{}) (<-chan eventLite, func()),
) (*Engine, error) {
if cfg.Logger == nil {
return nil, errors.New("evaluators: Config.Logger is required")
}
if pub == nil {
return nil, errors.New("evaluators: Publisher is required")
}
if resolveSnapshot == nil {
return nil, errors.New("evaluators: resolveSnapshot is required")
}
if subscribe == nil {
return nil, errors.New("evaluators: subscribe is required")
}
if len(evals) == 0 {
return nil, errors.New("evaluators: at least one Evaluator must be registered")
}
cfg.defaults()
return &Engine{
cfg: cfg,
evaluators: evals,
pub: pub,
resolveSnapshot: resolveSnapshot,
subscribe: subscribe,
}, nil
}
// Run starts the engine and blocks until ctx is cancelled. On exit it
// closes the watcher subscription cleanly.
func (e *Engine) Run(ctx context.Context) error {
e.mu.Lock()
if e.started {
e.mu.Unlock()
return errors.New("evaluators: Engine already started")
}
e.started = true
e.mu.Unlock()
// Subscribe to Pod events from the underlying watcher.
ch, unsub := e.subscribe(map[string]struct{}{"pod": {}})
defer unsub()
// Periodic ticker — pure cache reads, no apiserver polling. Per
// ADR-0001 §5 this is acceptable because evaluators compute over
// data already in the in-process Indexer.
var tick <-chan time.Time
if e.cfg.TickInterval > 0 {
t := time.NewTicker(e.cfg.TickInterval)
defer t.Stop()
tick = t.C
}
for {
select {
case <-ctx.Done():
return nil
case ev, ok := <-ch:
if !ok {
// Subscription closed by the factory (Stop). Exit
// cleanly.
return nil
}
e.evaluateOne(ctx, ev.Cluster, ev.Object)
case <-tick:
e.evaluateAllClusters(ctx)
}
}
}
// evaluateOne runs every evaluator against a single target and
// publishes the resulting rows.
func (e *Engine) evaluateOne(ctx context.Context, clusterID string, target *unstructured.Unstructured) {
if target == nil {
return
}
snap, _, err := e.resolveSnapshot(clusterID)
if err != nil {
e.cfg.Logger.Warn("evaluators: snapshot unavailable for event",
"cluster", clusterID, "err", err)
return
}
rows := EvaluateAll(ctx, snap, target, e.evaluators)
for i := range rows {
if rows[i].Time.IsZero() {
rows[i].Time = metav1.NewTime(e.cfg.Now())
}
e.pub.Publish(clusterID, rows[i])
}
}
// evaluateAllClusters fans the tick across every cluster the
// resolveSnapshot function knows about. Each cluster's Pod list is
// pulled from its Indexer and every evaluator runs against every
// target. Cost is O(clusters × pods × evaluators) but all reads are
// from the local cache.
func (e *Engine) evaluateAllClusters(ctx context.Context) {
// resolveSnapshot serves double duty: called with clusterID=""
// it returns the list of known cluster ids in its second return
// value; called with a concrete id it returns that cluster's
// Snapshot.
_, clusters, err := e.resolveSnapshot("")
if err != nil || len(clusters) == 0 {
return
}
for _, id := range clusters {
snap, _, err := e.resolveSnapshot(id)
if err != nil {
continue
}
pods, err := snap.List("pod", labels.Everything())
if err != nil {
continue
}
for _, p := range pods {
rows := EvaluateAll(ctx, snap, p, e.evaluators)
for i := range rows {
if rows[i].Time.IsZero() {
rows[i].Time = metav1.NewTime(e.cfg.Now())
}
e.pub.Publish(id, rows[i])
}
}
}
}
// helpers --------------------------------------------------------
// resourceFor builds an OwnerReference from the target's metadata.
// All five evaluators populate SyntheticReport.Resource via this so
// the wire shape is uniform.
func resourceFor(target *unstructured.Unstructured) metav1.OwnerReference {
if target == nil {
return metav1.OwnerReference{}
}
gv := target.GetAPIVersion()
return metav1.OwnerReference{
APIVersion: gv,
Kind: target.GetKind(),
Name: target.GetName(),
UID: target.GetUID(),
}
}
// containerImages returns every container image string in the Pod
// (initContainers + containers + ephemeralContainers). Empty slice
// when the target is not a Pod or has no containers. Used by harbor
// + otel evaluators.
func containerImages(target *unstructured.Unstructured) []string {
if target == nil {
return nil
}
out := []string{}
for _, group := range []string{"containers", "initContainers", "ephemeralContainers"} {
raw, found, _ := unstructured.NestedSlice(target.Object, "spec", group)
if !found {
continue
}
for _, item := range raw {
c, ok := item.(map[string]any)
if !ok {
continue
}
if img, ok := c["image"].(string); ok && img != "" {
out = append(out, img)
}
}
}
return out
}
// containerNames returns container names alongside images — needed by
// the otel evaluator's "is there a sidecar named otel-*" branch.
func containerNames(target *unstructured.Unstructured) []string {
if target == nil {
return nil
}
out := []string{}
for _, group := range []string{"containers", "initContainers"} {
raw, found, _ := unstructured.NestedSlice(target.Object, "spec", group)
if !found {
continue
}
for _, item := range raw {
c, ok := item.(map[string]any)
if !ok {
continue
}
if n, ok := c["name"].(string); ok && n != "" {
out = append(out, n)
}
}
}
return out
}
// hasOwnerOfKind returns true when the target has at least one
// ownerReference whose Kind matches and whose APIVersion contains
// apiGroupSuffix. An empty apiGroupSuffix matches any group (used
// by the flux evaluator's HelmRelease / Kustomization detection).
func hasOwnerOfKind(target *unstructured.Unstructured, apiGroupSuffix, kind string) bool {
if target == nil {
return false
}
for _, ref := range target.GetOwnerReferences() {
if !strings.EqualFold(ref.Kind, kind) {
continue
}
if apiGroupSuffix == "" || strings.Contains(ref.APIVersion, apiGroupSuffix) {
return true
}
}
return false
}
// isPod returns true when the target is a v1 Pod.
func isPod(target *unstructured.Unstructured) bool {
if target == nil {
return false
}
return target.GetKind() == "Pod"
}
// reportTime — produce a metav1.Time from the engine's clock; used by
// evaluators that may emit before the engine fills it in.
func reportTime(now func() time.Time) metav1.Time {
if now == nil {
now = time.Now
}
return metav1.NewTime(now())
}
// formatLabelMap stringifies a label set for human-readable
// SyntheticReport.Properties values.
func formatLabelMap(m map[string]string) string {
if len(m) == 0 {
return ""
}
parts := make([]string, 0, len(m))
for k, v := range m {
parts = append(parts, fmt.Sprintf("%s=%s", k, v))
}
return strings.Join(parts, ",")
}
// SubscribeAdapter wraps a k8scache.Factory.Subscribe call into the
// engine's eventLite channel. Production code in the catalyst-api
// `cmd/api/main.go` wires this. Exposed here so tests can construct
// an Engine without depending on the full Factory.
type SubscribeAdapter func(kinds map[string]struct{}) (<-chan eventLite, func())
// EventLiteFromUnstructured is a test convenience to build eventLite
// values without exposing the unexported field. Used by the table
// tests in evaluators_test.go.
func EventLiteFromUnstructured(cluster, kind string, obj *unstructured.Unstructured) eventLite {
return eventLite{Cluster: cluster, Kind: kind, Object: obj}
}


@ -0,0 +1,759 @@
// evaluators_test.go — unit tests for the 5 evaluators + the engine.
//
// EPIC-1 (#1096) Slice W2.
//
// Coverage matrix per evaluator:
//
// hpa : pass (HPA satisfies floor) + fail (currentReplicas < min)
// + skip (no HPA) + skip (Job-owned Pod)
// otel : pass (sidecar) + pass (auto-inject + Instrumentation CR)
// + fail (no sidecar, no annotation) + fail (annotation but no CR)
// hubble : skip (Hubble disabled) + pass (FlowsSeen=true)
// + fail (FlowsSeen=false) + warn (Probe error)
// harbor : pass (Harbor-prefixed) + pass (allowed-prefix)
// + fail (docker.io) + skip (HarborDomain empty)
// + skip (no containers)
// flux : pass (managed-by label) + pass (HelmRelease ownerRef)
// + pass (controller is Flux-owned) + fail (neither)
//
// Engine:
// - subscribe + tick path emits events to the Publisher
// - resolveSnapshot error path is logged + suppressed
// - cancellation cleanly shuts the engine down
//
// Plus 1 EvaluateAll concatenation test.
package evaluators

import (
	"context"
	"errors"
	"io"
	"log/slog"
	"sync"
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
)
// quietLogger discards log output.
func quietLogger() *slog.Logger {
return slog.New(slog.NewJSONHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
}
// fakeSnapshot is an in-memory Snapshot keyed by canonical kind name.
type fakeSnapshot struct {
by map[string][]*unstructured.Unstructured
}
func (f *fakeSnapshot) List(kind string, _ labels.Selector) ([]*unstructured.Unstructured, error) {
if f.by == nil {
return nil, nil
}
v, ok := f.by[kind]
if !ok {
// Mirror the production Factory.List behaviour — unknown
// kind returns an error (the evaluator must skip rather
// than crash).
return nil, errKindNotRegistered
}
return v, nil
}
var errKindNotRegistered = errors.New("kind not registered")
// recorder is a Publisher that captures every emitted (cluster, report) pair.
type recorder struct {
mu sync.Mutex
entries []recordedEntry
}
type recordedEntry struct {
cluster string
report SyntheticReport
}
func (r *recorder) Publish(cluster string, report SyntheticReport) {
r.mu.Lock()
defer r.mu.Unlock()
r.entries = append(r.entries, recordedEntry{cluster: cluster, report: report})
}
func (r *recorder) Snapshot() []recordedEntry {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]recordedEntry, len(r.entries))
copy(out, r.entries)
return out
}
// ── builders ─────────────────────────────────────────────────────
func uPod(ns, name string, opts ...podOpt) *unstructured.Unstructured {
p := &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]any{
"namespace": ns,
"name": name,
"uid": "uid-" + ns + "-" + name,
},
"spec": map[string]any{
"containers": []any{},
},
}}
for _, o := range opts {
o(p)
}
return p
}
type podOpt func(*unstructured.Unstructured)
func withOwner(kind, name string) podOpt {
return func(p *unstructured.Unstructured) {
owners := p.GetOwnerReferences()
owners = append(owners, metav1.OwnerReference{Kind: kind, Name: name, APIVersion: "apps/v1"})
p.SetOwnerReferences(owners)
}
}
func withFluxOwner() podOpt {
return func(p *unstructured.Unstructured) {
owners := p.GetOwnerReferences()
owners = append(owners, metav1.OwnerReference{Kind: "HelmRelease", Name: "x", APIVersion: "helm.toolkit.fluxcd.io/v2"})
p.SetOwnerReferences(owners)
}
}
func withLabels(m map[string]string) podOpt {
return func(p *unstructured.Unstructured) { p.SetLabels(m) }
}
func withAnnotations(m map[string]string) podOpt {
return func(p *unstructured.Unstructured) { p.SetAnnotations(m) }
}
func withContainerImages(images ...string) podOpt {
return func(p *unstructured.Unstructured) {
raw := []any{}
for i, img := range images {
raw = append(raw, map[string]any{
"name": imageContainerName(i),
"image": img,
})
}
_ = unstructured.SetNestedSlice(p.Object, raw, "spec", "containers")
}
}
func imageContainerName(i int) string {
if i == 0 {
return "main"
}
return "side"
}
func uHPA(ns, name, targetKind, targetName string, minReplicas, currentReplicas int64) *unstructured.Unstructured {
return &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "autoscaling/v2",
"kind": "HorizontalPodAutoscaler",
"metadata": map[string]any{
"namespace": ns,
"name": name,
},
"spec": map[string]any{
"minReplicas": minReplicas,
"scaleTargetRef": map[string]any{
"kind": targetKind,
"name": targetName,
},
},
"status": map[string]any{
"currentReplicas": currentReplicas,
},
}}
}
func uReplicaSet(ns, name, deploymentName string) *unstructured.Unstructured {
rs := &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "apps/v1",
"kind": "ReplicaSet",
"metadata": map[string]any{
"namespace": ns,
"name": name,
},
}}
if deploymentName != "" {
rs.SetOwnerReferences([]metav1.OwnerReference{{
Kind: "Deployment",
Name: deploymentName,
APIVersion: "apps/v1",
}})
}
return rs
}
func uDeployment(ns, name string, fluxLabel bool) *unstructured.Unstructured {
d := &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]any{
"namespace": ns,
"name": name,
},
}}
if fluxLabel {
d.SetLabels(map[string]string{"app.kubernetes.io/managed-by": "flux"})
}
return d
}
func uInstrumentation(ns, name string) *unstructured.Unstructured {
return &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "opentelemetry.io/v1alpha1",
"kind": "Instrumentation",
"metadata": map[string]any{
"namespace": ns,
"name": name,
},
}}
}
// ── HPA evaluator ────────────────────────────────────────────────
func TestHPA_Pass_HPASatisfiesFloor(t *testing.T) {
hpa := uHPA("acme", "frontend-hpa", "Deployment", "frontend", 3, 5)
rs := uReplicaSet("acme", "frontend-7c5f", "frontend")
pod := uPod("acme", "frontend-7c5f-abc", withOwner("ReplicaSet", "frontend-7c5f"))
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
"horizontalpodautoscaler": {hpa},
"replicaset": {rs},
}}
rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod)
if len(rep) != 1 {
t.Fatalf("want 1 row got %d", len(rep))
}
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message)
}
}
func TestHPA_Fail_CurrentBelowMin(t *testing.T) {
hpa := uHPA("acme", "frontend-hpa", "Deployment", "frontend", 3, 1)
rs := uReplicaSet("acme", "frontend-7c5f", "frontend")
pod := uPod("acme", "frontend-7c5f-abc", withOwner("ReplicaSet", "frontend-7c5f"))
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
"horizontalpodautoscaler": {hpa},
"replicaset": {rs},
}}
rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultFail {
t.Fatalf("want fail got %s (%s)", rep[0].Result, rep[0].Message)
}
if rep[0].Properties["minReplicas"] != "3" || rep[0].Properties["currentReplicas"] != "1" {
t.Fatalf("properties not populated: %+v", rep[0].Properties)
}
}
func TestHPA_Skip_NoHPA(t *testing.T) {
rs := uReplicaSet("acme", "control-plane-rs", "control-plane")
pod := uPod("acme", "control-plane-pod", withOwner("ReplicaSet", "control-plane-rs"))
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
"horizontalpodautoscaler": {},
"replicaset": {rs},
}}
rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultSkip {
t.Fatalf("want skip got %s", rep[0].Result)
}
}
func TestHPA_Skip_JobOwnedPod(t *testing.T) {
pod := uPod("acme", "batch-pod", withOwner("Job", "nightly-cron"))
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{}}
rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultSkip {
t.Fatalf("want skip for Job-owned Pod got %s", rep[0].Result)
}
}
func newHPAEvaluator() *HPAEvaluator {
cfg := Config{Logger: quietLogger()}
cfg.defaults()
return NewHPAEvaluator(cfg)
}
// ── OTel evaluator ───────────────────────────────────────────────
func TestOTel_Pass_Sidecar(t *testing.T) {
pod := uPod("acme", "app", withContainerImages(
"docker.io/library/nginx:1.25",
"otel/opentelemetry-collector-contrib:0.95",
))
snap := &fakeSnapshot{}
rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s", rep[0].Result)
}
if rep[0].Properties["detection"] != "sidecar" {
t.Fatalf("want sidecar detection got %v", rep[0].Properties)
}
}
func TestOTel_Pass_AutoInjectAnnotation(t *testing.T) {
pod := uPod("acme", "app",
withContainerImages("docker.io/library/nginx:1.25"),
withAnnotations(map[string]string{
"instrumentation.opentelemetry.io/inject-go": "true",
}),
)
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
"instrumentation.opentelemetry.io": {uInstrumentation("acme", "default")},
}}
rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message)
}
if rep[0].Properties["detection"] != "auto-inject" {
t.Fatalf("want auto-inject got %v", rep[0].Properties)
}
}
func TestOTel_Fail_NoSidecarNoAnnotation(t *testing.T) {
pod := uPod("acme", "app", withContainerImages("docker.io/library/nginx:1.25"))
snap := &fakeSnapshot{}
rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultFail {
t.Fatalf("want fail got %s", rep[0].Result)
}
}
func TestOTel_Fail_AnnotationButNoInstrumentationCR(t *testing.T) {
pod := uPod("acme", "app",
withContainerImages("docker.io/library/nginx:1.25"),
withAnnotations(map[string]string{
"instrumentation.opentelemetry.io/inject-go": "true",
}),
)
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
"instrumentation.opentelemetry.io": {}, // operator installed, no CR in this ns
}}
rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultFail {
t.Fatalf("want fail (orphan annotation) got %s", rep[0].Result)
}
if rep[0].Properties["detection"] != "auto-inject-orphan" {
t.Fatalf("want auto-inject-orphan got %v", rep[0].Properties)
}
}
func newOTelEvaluator() *OTelEvaluator {
cfg := Config{Logger: quietLogger()}
cfg.defaults()
return NewOTelEvaluator(cfg)
}
// ── Hubble evaluator ─────────────────────────────────────────────
func TestHubble_Skip_Disabled(t *testing.T) {
pod := uPod("acme", "app")
rep := newHubbleEvaluator(false, nil).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultSkip {
t.Fatalf("want skip when disabled got %s", rep[0].Result)
}
}
func TestHubble_Pass_FlowsSeen(t *testing.T) {
pod := uPod("acme", "app")
probe := &fakeHubbleProbe{seen: true}
rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s", rep[0].Result)
}
}
func TestHubble_Fail_NoFlows(t *testing.T) {
pod := uPod("acme", "app")
probe := &fakeHubbleProbe{seen: false}
rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultFail {
t.Fatalf("want fail got %s", rep[0].Result)
}
}
func TestHubble_Warn_ProbeError(t *testing.T) {
pod := uPod("acme", "app")
probe := &fakeHubbleProbe{err: errors.New("connection refused")}
rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultWarn {
t.Fatalf("want warn got %s", rep[0].Result)
}
}
type fakeHubbleProbe struct {
seen bool
err error
}
func (f *fakeHubbleProbe) FlowsSeen(_ context.Context, _ *unstructured.Unstructured, _ int64) (bool, error) {
if f.err != nil {
return false, f.err
}
return f.seen, nil
}
func newHubbleEvaluator(enabled bool, probe HubbleProbe) *HubbleEvaluator {
cfg := Config{Logger: quietLogger(), HubbleEnabled: enabled}
cfg.defaults()
e := NewHubbleEvaluator(cfg)
e.Probe = probe
return e
}
// ── Harbor evaluator ─────────────────────────────────────────────
func TestHarbor_Pass_HarborPrefixed(t *testing.T) {
pod := uPod("acme", "app", withContainerImages(
"harbor.omantel.omani.works/proxy-ghcr/openova-io/openova/website:abc123",
))
rep := newHarborEvaluator("harbor.omantel.omani.works", nil).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message)
}
}
func TestHarbor_Pass_AllowedPrefix(t *testing.T) {
pod := uPod("acme", "app", withContainerImages(
"mirror.openova.io/internal/sidecar:1.0",
))
rep := newHarborEvaluator("harbor.omantel.omani.works",
[]string{"mirror.openova.io/"}).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s", rep[0].Result)
}
}
func TestHarbor_Fail_DockerHub(t *testing.T) {
pod := uPod("acme", "app", withContainerImages(
"harbor.omantel.omani.works/proxy-ghcr/openova-io/openova/api:abc",
"docker.io/library/redis:7.0",
))
rep := newHarborEvaluator("harbor.omantel.omani.works", nil).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultFail {
t.Fatalf("want fail got %s", rep[0].Result)
}
if rep[0].Properties["rejectedImages"] != "docker.io/library/redis:7.0" {
t.Fatalf("rejectedImages wrong: %v", rep[0].Properties)
}
}
func TestHarbor_Skip_DomainEmpty(t *testing.T) {
pod := uPod("acme", "app", withContainerImages("docker.io/library/nginx:1.25"))
rep := newHarborEvaluator("", nil).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultSkip {
t.Fatalf("want skip got %s", rep[0].Result)
}
}
func TestHarbor_Skip_NoContainers(t *testing.T) {
pod := uPod("acme", "app") // empty containers slice
rep := newHarborEvaluator("harbor.openova.io", nil).Evaluate(context.Background(), nil, pod)
if rep[0].Result != ResultSkip {
t.Fatalf("want skip got %s", rep[0].Result)
}
}
func newHarborEvaluator(domain string, allowed []string) *HarborEvaluator {
cfg := Config{
Logger: quietLogger(),
HarborDomain: domain,
HarborAllowedPrefixes: allowed,
}
cfg.defaults()
return NewHarborEvaluator(cfg)
}
// ── Flux evaluator ───────────────────────────────────────────────
func TestFlux_Pass_LabelOnPod(t *testing.T) {
pod := uPod("acme", "app", withLabels(map[string]string{
"app.kubernetes.io/managed-by": "flux",
}))
snap := &fakeSnapshot{}
rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s", rep[0].Result)
}
}
func TestFlux_Pass_HelmReleaseOwnerRef(t *testing.T) {
pod := uPod("acme", "app", withFluxOwner())
snap := &fakeSnapshot{}
rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s", rep[0].Result)
}
}
func TestFlux_Pass_ControllerIsFluxOwned(t *testing.T) {
dep := uDeployment("acme", "frontend", true)
rs := uReplicaSet("acme", "frontend-7c5f", "frontend")
pod := uPod("acme", "frontend-pod", withOwner("ReplicaSet", "frontend-7c5f"))
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
"deployment": {dep},
"replicaset": {rs},
}}
rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultPass {
t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message)
}
if rep[0].Properties["detection"] != "controller-flux-owned" {
t.Fatalf("want controller-flux-owned detection got %v", rep[0].Properties)
}
}
func TestFlux_Fail_NoLabelNoOwnerRef(t *testing.T) {
dep := uDeployment("acme", "frontend", false)
rs := uReplicaSet("acme", "frontend-7c5f", "frontend")
pod := uPod("acme", "frontend-pod", withOwner("ReplicaSet", "frontend-7c5f"))
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
"deployment": {dep},
"replicaset": {rs},
}}
rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod)
if rep[0].Result != ResultFail {
t.Fatalf("want fail got %s", rep[0].Result)
}
}
func newFluxEvaluator() *FluxEvaluator {
cfg := Config{Logger: quietLogger()}
cfg.defaults()
return NewFluxEvaluator(cfg)
}
// ── EvaluateAll concatenation ────────────────────────────────────
func TestEvaluateAll_ConcatenatesAndPreservesOrder(t *testing.T) {
pod := uPod("acme", "app", withContainerImages("harbor.openova.io/proxy/x:1"))
snap := &fakeSnapshot{}
cfg := Config{Logger: quietLogger(), HarborDomain: "harbor.openova.io"}
cfg.defaults()
evals := []Evaluator{
NewHarborEvaluator(cfg),
NewFluxEvaluator(cfg),
}
rows := EvaluateAll(context.Background(), snap, pod, evals)
if len(rows) != 2 {
t.Fatalf("want 2 rows got %d", len(rows))
}
if rows[0].Policy != "harbor-proxy-pull" || rows[1].Policy != "flux-managed" {
t.Fatalf("order not preserved: %v %v", rows[0].Policy, rows[1].Policy)
}
}
func TestEvaluateAll_SkipsNonPodTargets(t *testing.T) {
dep := uDeployment("acme", "x", false)
cfg := Config{Logger: quietLogger(), HarborDomain: "harbor.openova.io"}
cfg.defaults()
rows := EvaluateAll(context.Background(), &fakeSnapshot{}, dep, []Evaluator{
NewHarborEvaluator(cfg),
NewFluxEvaluator(cfg),
})
if len(rows) != 0 {
t.Fatalf("Pod-only evaluators should ignore Deployments — got %d rows", len(rows))
}
}
// ── Engine ───────────────────────────────────────────────────────
func TestEngine_PublishesOnSubscribedEvent(t *testing.T) {
rec := &recorder{}
pod := uPod("acme", "app", withContainerImages("harbor.openova.io/proxy/x:1"))
eventCh := make(chan eventLite, 4)
cfg := Config{
Logger: quietLogger(),
HarborDomain: "harbor.openova.io",
TickInterval: 0, // event-only path for this test
Now: func() time.Time { return time.Unix(1700000000, 0) },
}
eng, err := NewEngine(cfg,
[]Evaluator{NewHarborEvaluator(cfg)},
rec,
func(_ string) (Snapshot, []string, error) { return &fakeSnapshot{}, nil, nil },
func(_ map[string]struct{}) (<-chan eventLite, func()) {
return eventCh, func() {}
},
)
if err != nil {
t.Fatalf("NewEngine: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() { _ = eng.Run(ctx); close(done) }()
eventCh <- eventLite{Cluster: "alpha", Kind: "pod", Object: pod}
deadline := time.After(2 * time.Second)
for {
select {
case <-deadline:
t.Fatalf("recorder never saw the event-driven publish")
default:
if len(rec.Snapshot()) >= 1 {
cancel()
<-done
if rec.Snapshot()[0].cluster != "alpha" {
t.Fatalf("cluster id not propagated")
}
if rec.Snapshot()[0].report.Policy != "harbor-proxy-pull" {
t.Fatalf("unexpected policy %q", rec.Snapshot()[0].report.Policy)
}
if rec.Snapshot()[0].report.Time.IsZero() {
t.Fatalf("report timestamp not stamped by engine")
}
return
}
time.Sleep(10 * time.Millisecond)
}
}
}
func TestEngine_TickerEvaluatesAllPodsAcrossClusters(t *testing.T) {
rec := &recorder{}
pod1 := uPod("acme", "p1", withContainerImages("harbor.openova.io/proxy/x:1"))
pod2 := uPod("widgets", "p2", withContainerImages("docker.io/library/nginx:1"))
snaps := map[string]*fakeSnapshot{
"alpha": {by: map[string][]*unstructured.Unstructured{"pod": {pod1}}},
"beta": {by: map[string][]*unstructured.Unstructured{"pod": {pod2}}},
}
eventCh := make(chan eventLite, 1)
cfg := Config{
Logger: quietLogger(),
HarborDomain: "harbor.openova.io",
TickInterval: 50 * time.Millisecond,
Now: func() time.Time { return time.Unix(1700000000, 0) },
}
eng, err := NewEngine(cfg,
[]Evaluator{NewHarborEvaluator(cfg)},
rec,
func(id string) (Snapshot, []string, error) {
if id == "" {
return nil, []string{"alpha", "beta"}, nil
}
s, ok := snaps[id]
if !ok {
return nil, nil, errors.New("no snap")
}
return s, nil, nil
},
func(_ map[string]struct{}) (<-chan eventLite, func()) {
return eventCh, func() {}
},
)
if err != nil {
t.Fatalf("NewEngine: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
defer cancel()
done := make(chan struct{})
go func() { _ = eng.Run(ctx); close(done) }()
deadline := time.After(1 * time.Second)
for {
select {
case <-deadline:
t.Fatalf("ticker never produced both pass + fail rows; got %v", rec.Snapshot())
default:
entries := rec.Snapshot()
seenPass, seenFail := false, false
for _, e := range entries {
if e.cluster == "alpha" && e.report.Result == ResultPass {
seenPass = true
}
if e.cluster == "beta" && e.report.Result == ResultFail {
seenFail = true
}
}
if seenPass && seenFail {
cancel()
<-done
return
}
time.Sleep(20 * time.Millisecond)
}
}
}
func TestEngine_RejectsInvalidConfig(t *testing.T) {
cases := map[string]Config{
"missing-logger": {},
}
for name, cfg := range cases {
t.Run(name, func(t *testing.T) {
_, err := NewEngine(cfg, []Evaluator{}, &recorder{}, nil, nil)
if err == nil {
t.Fatalf("want error for %q, got nil", name)
}
})
}
// Logger-OK but missing other deps → still error.
cfgOK := Config{Logger: quietLogger()}
if _, err := NewEngine(cfgOK, []Evaluator{}, nil, nil, nil); err == nil {
t.Fatalf("missing publisher should error")
}
if _, err := NewEngine(cfgOK, []Evaluator{}, &recorder{}, nil, nil); err == nil {
t.Fatalf("missing resolveSnapshot should error")
}
if _, err := NewEngine(cfgOK, []Evaluator{}, &recorder{},
func(string) (Snapshot, []string, error) { return nil, nil, nil },
nil); err == nil {
t.Fatalf("missing subscribe should error")
}
if _, err := NewEngine(cfgOK, nil, &recorder{},
func(string) (Snapshot, []string, error) { return nil, nil, nil },
func(map[string]struct{}) (<-chan eventLite, func()) { return nil, nil }); err == nil {
t.Fatalf("empty evaluator slice should error")
}
}
func TestEngine_DoubleRunRefused(t *testing.T) {
cfg := Config{Logger: quietLogger()}
cfg.defaults()
rec := &recorder{}
eventCh := make(chan eventLite)
eng, err := NewEngine(cfg, []Evaluator{NewFluxEvaluator(cfg)}, rec,
func(string) (Snapshot, []string, error) { return &fakeSnapshot{}, nil, nil },
func(map[string]struct{}) (<-chan eventLite, func()) {
return eventCh, func() {}
},
)
if err != nil {
t.Fatalf("NewEngine: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
go func() { _ = eng.Run(ctx) }()
time.Sleep(20 * time.Millisecond)
if err := eng.Run(context.Background()); err == nil {
t.Fatalf("second Run should error")
}
cancel()
}
// Sanity check that the package's helper conversions don't drop fields.
func TestEventLiteFromUnstructured(t *testing.T) {
pod := uPod("ns", "n")
ev := EventLiteFromUnstructured("alpha", "pod", pod)
if ev.Cluster != "alpha" || ev.Kind != "pod" || ev.Object != pod {
t.Fatalf("EventLiteFromUnstructured dropped a field: %+v", ev)
}
}


@@ -0,0 +1,165 @@
// flux.go — Flux-managed evaluator.
//
// EPIC-1 (#1096) §4.3 row "Flux-managed (GitOps)".
//
// Logic (per `02-W-watcher-extension.md` brief):
//
// - Pass if the target carries the well-known
// `app.kubernetes.io/managed-by: flux` label.
// - Pass if any ownerReference has APIVersion containing
// `helm.toolkit.fluxcd.io` (HelmRelease) or
// `kustomize.toolkit.fluxcd.io` (Kustomization).
// - Fail otherwise.
//
// The evaluator runs against Pods (the per-event trigger path) but
// returns a SyntheticReport that points at the Pod's controller
// owner (Deployment / StatefulSet / DaemonSet) when one is
// resolvable. This matches the rest of the pipeline — score
// aggregation rolls up by workload, not by individual Pod.
//
// Pods that are themselves Flux-owned also pass (rare, but Flux can
// apply a bare Pod manifest directly).
//
// Both label key + Flux-owner-suffix are configurable via
// Config.FluxManagedByLabel / Config.FluxManagedByValue. Per
// docs/INVIOLABLE-PRINCIPLES.md #4.
package evaluators
import (
"context"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
)
// FluxEvaluator implements `policy=flux-managed`.
type FluxEvaluator struct {
ManagedByLabel string
ManagedByValue string
}
// NewFluxEvaluator builds a FluxEvaluator from cfg.
func NewFluxEvaluator(cfg Config) *FluxEvaluator {
return &FluxEvaluator{
ManagedByLabel: cfg.FluxManagedByLabel,
ManagedByValue: cfg.FluxManagedByValue,
}
}
func (FluxEvaluator) Name() string { return "flux-managed" }
func (f *FluxEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport {
if !isPod(target) {
return nil
}
// 1. Direct check on the Pod itself.
if f.targetIsFluxOwned(target) {
return []SyntheticReport{{
Policy: f.Name(),
Rule: f.Name(),
Result: ResultPass,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Pod carries flux managed-by label or HelmRelease / Kustomization owner",
Properties: map[string]string{"detection": "pod-direct"},
}}
}
// 2. Pod isn't directly Flux-managed — chase its controller.
owner := f.lookupController(snap, target)
if owner != nil && f.targetIsFluxOwned(owner) {
return []SyntheticReport{{
Policy: f.Name(),
Rule: f.Name(),
Result: ResultPass,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Pod's controller " + owner.GetKind() + "/" + owner.GetName() + " is Flux-managed",
Properties: map[string]string{
"detection": "controller-flux-owned",
"controller": owner.GetKind() + "/" + owner.GetName(),
"controllerNs": owner.GetNamespace(),
},
}}
}
return []SyntheticReport{{
Policy: f.Name(),
Rule: f.Name(),
Result: ResultFail,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Pod and its controller carry no flux managed-by label and no Flux ownerRef",
}}
}
// targetIsFluxOwned applies the label + ownerRef check to a single
// object. Pure function — no snapshot reads.
func (f *FluxEvaluator) targetIsFluxOwned(target *unstructured.Unstructured) bool {
if target == nil {
return false
}
// Label check.
if v, ok := target.GetLabels()[f.ManagedByLabel]; ok && strings.EqualFold(v, f.ManagedByValue) {
return true
}
// ownerRef check — HelmRelease / Kustomization from
// helm.toolkit.fluxcd.io / kustomize.toolkit.fluxcd.io.
for _, ref := range target.GetOwnerReferences() {
if strings.Contains(ref.APIVersion, "fluxcd.io") {
return true
}
}
return false
}
// lookupController follows the Pod's controller ownerRef one hop
// (Pod → ReplicaSet / StatefulSet / DaemonSet) and where applicable
// chases the next hop (ReplicaSet → Deployment). Returns nil when
// the controller can't be located in the snapshot.
func (f *FluxEvaluator) lookupController(snap Snapshot, pod *unstructured.Unstructured) *unstructured.Unstructured {
for _, ref := range pod.GetOwnerReferences() {
switch ref.Kind {
case "Deployment":
return findInList(snap, "deployment", pod.GetNamespace(), ref.Name)
case "StatefulSet":
return findInList(snap, "statefulset", pod.GetNamespace(), ref.Name)
case "DaemonSet":
return findInList(snap, "daemonset", pod.GetNamespace(), ref.Name)
case "ReplicaSet":
rs := findInList(snap, "replicaset", pod.GetNamespace(), ref.Name)
if rs == nil {
return nil
}
// First check the RS itself — sometimes Flux
// produces ReplicaSets directly without a Deployment.
if f.targetIsFluxOwned(rs) {
return rs
}
for _, rsOwner := range rs.GetOwnerReferences() {
if rsOwner.Kind == "Deployment" {
return findInList(snap, "deployment", pod.GetNamespace(), rsOwner.Name)
}
}
return rs
}
}
return nil
}
// findInList walks the snapshot and returns the first object matching
// (kind, namespace, name). Returns nil on miss or list error.
func findInList(snap Snapshot, kindName, namespace, name string) *unstructured.Unstructured {
list, err := snap.List(kindName, labels.Everything())
if err != nil {
return nil
}
for _, obj := range list {
if obj.GetNamespace() == namespace && obj.GetName() == name {
return obj
}
}
return nil
}


@@ -0,0 +1,124 @@
// harbor.go — image-via-Harbor-proxy evaluator.
//
// EPIC-1 (#1096) §4.3 row "Images via Harbor proxy".
//
// Logic (per `02-W-watcher-extension.md` brief):
//
// - For each container in the Pod (containers + initContainers +
// ephemeralContainers), parse the image reference.
// - Pass if image starts with `<HarborDomain>/proxy-ghcr/`,
// `<HarborDomain>/<org>/`, or any of the operator-supplied
// HarborAllowedPrefixes (e.g. internal mirrors).
// - Fail if the image references docker.io, ghcr.io, quay.io, etc.
// directly — every image must traverse the Sovereign's Harbor
// proxy for cosign verification + air-gap fallback.
// - When Config.HarborDomain is empty (Sovereign without Harbor
// enabled) the evaluator skips every Pod.
//
// Per `feedback_never_hardcode_urls.md` the Harbor domain comes from
// runtime config — the Sovereign provisions Harbor at
// `harbor.<sovereign-domain>` and stamps the same value into the
// catalyst-api's CATALYST_HARBOR_DOMAIN env. Tests inject directly
// via Config.HarborDomain.
package evaluators
import (
"context"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// HarborEvaluator implements `policy=harbor-proxy-pull`.
type HarborEvaluator struct {
Domain string
AllowedPrefixes []string
}
// NewHarborEvaluator builds a HarborEvaluator from cfg.
func NewHarborEvaluator(cfg Config) *HarborEvaluator {
return &HarborEvaluator{
Domain: cfg.HarborDomain,
AllowedPrefixes: append([]string(nil), cfg.HarborAllowedPrefixes...),
}
}
func (HarborEvaluator) Name() string { return "harbor-proxy-pull" }
func (h *HarborEvaluator) Evaluate(ctx context.Context, _ Snapshot, target *unstructured.Unstructured) []SyntheticReport {
if !isPod(target) {
return nil
}
if h.Domain == "" {
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultSkip,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Harbor not enabled on this Sovereign — evaluator skipped",
}}
}
imgs := containerImages(target)
if len(imgs) == 0 {
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultSkip,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Pod has no containers — nothing to check",
}}
}
domainPrefix := h.Domain + "/"
rejected := []string{}
for _, img := range imgs {
if h.imageAccepted(img, domainPrefix) {
continue
}
rejected = append(rejected, img)
}
if len(rejected) == 0 {
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultPass,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "all container images via Harbor proxy",
}}
}
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultFail,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "container images bypass Harbor proxy: " + strings.Join(rejected, ", "),
Properties: map[string]string{
"rejectedImages": strings.Join(rejected, ","),
"harborDomain": h.Domain,
},
}}
}
// imageAccepted returns true when the image string starts with the
// Harbor domain prefix or any of the allowed-prefix entries.
//
// Acceptance is BYTE-PREFIX, not substring, and the domain prefix
// always carries a trailing slash, so a lookalike host such as
// `harbor.openova.io.evil.com/` can never match `harbor.openova.io/`.
// The check is case-sensitive.
func (h *HarborEvaluator) imageAccepted(img, harborPrefix string) bool {
if strings.HasPrefix(img, harborPrefix) {
return true
}
for _, p := range h.AllowedPrefixes {
if p != "" && strings.HasPrefix(img, p) {
return true
}
}
return false
}


@@ -0,0 +1,207 @@
// hpa.go — HPA-effective evaluator.
//
// EPIC-1 (#1096) §4.3 row "Autoscaler (HPA/VPA) effective".
//
// Logic (per `02-W-watcher-extension.md` brief):
// - If the target is a Pod, walk owner chain Pod → ReplicaSet →
// Deployment to identify the workload owner (StatefulSet /
// DaemonSet are also acceptable owners). Pods owned by Jobs /
// CronJobs / standalone are out-of-scope (skip).
// - Find an HPA (autoscaling/v2) whose spec.scaleTargetRef points
// at the workload owner.
// - No HPA → result=skip (HPA isn't applicable to this workload —
// e.g. a singleton control-plane pod).
// - HPA present but currentReplicas < minReplicas → result=fail
// (the autoscaler isn't keeping the floor).
// - HPA present + currentReplicas >= minReplicas → result=pass.
//
// The score aggregator (slice S1) drops `skip` rows from the
// denominator so this evaluator does NOT punish workloads that
// legitimately have no HPA.
package evaluators
import (
"context"
"fmt"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
)
// HPAEvaluator implements `policy=hpa-effective`.
type HPAEvaluator struct {
// MinFloor — synthetic floor applied on top of spec.minReplicas:
// the effective minimum is max(spec.minReplicas, MinFloor), so the
// evaluator can emit FAIL even when the HPA itself is satisfied.
// Defaults to Config.HPAMinReplicas at engine wiring.
MinFloor int32
}
// NewHPAEvaluator constructs an HPAEvaluator with values copied from cfg.
func NewHPAEvaluator(cfg Config) *HPAEvaluator {
return &HPAEvaluator{MinFloor: cfg.HPAMinReplicas}
}
// Name — canonical policy id.
func (HPAEvaluator) Name() string { return "hpa-effective" }
func (h *HPAEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport {
if !isPod(target) {
return nil
}
// 1. Resolve workload owner via Pod → ReplicaSet → Deployment chain.
ownerKind, ownerName, ownerNamespace := resolveWorkloadOwner(snap, target)
if ownerKind == "" || ownerName == "" {
// Standalone Pod / Job-owned — out of scope.
return []SyntheticReport{newSkip(h.Name(), target, "pod has no controller workload owner")}
}
// 2. Find an HPA whose scaleTargetRef matches.
hpa := findHPAFor(snap, ownerKind, ownerName, ownerNamespace)
if hpa == nil {
return []SyntheticReport{newSkip(h.Name(), target, fmt.Sprintf("no HPA targets %s/%s in %s", ownerKind, ownerName, ownerNamespace))}
}
// 3. Compare currentReplicas vs minReplicas (and the synthetic
// floor MinFloor).
min, _ := hpaMinReplicas(hpa)
current, _ := hpaCurrentReplicas(hpa)
if min < h.MinFloor {
min = h.MinFloor
}
props := map[string]string{
"hpaName": hpa.GetName(),
"hpaNamespace": hpa.GetNamespace(),
"minReplicas": fmt.Sprintf("%d", min),
"currentReplicas": fmt.Sprintf("%d", current),
"workloadKind": ownerKind,
"workloadName": ownerName,
"workloadNamespace": ownerNamespace,
}
if current < min {
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultFail,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: fmt.Sprintf("HPA %s/%s reports currentReplicas=%d below minReplicas=%d", hpa.GetNamespace(), hpa.GetName(), current, min),
Properties: props,
}}
}
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultPass,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: fmt.Sprintf("HPA %s/%s satisfies minReplicas=%d (current=%d)", hpa.GetNamespace(), hpa.GetName(), min, current),
Properties: props,
}}
}
// resolveWorkloadOwner walks Pod → controller → (ReplicaSet →
// Deployment) and returns (kind, name, namespace) of the top-level
// workload. Returns ("","","") for standalone or Job-owned Pods.
func resolveWorkloadOwner(snap Snapshot, pod *unstructured.Unstructured) (string, string, string) {
if pod == nil {
return "", "", ""
}
ns := pod.GetNamespace()
for _, ref := range pod.GetOwnerReferences() {
// Direct workload owners — StatefulSet, DaemonSet are
// terminal here.
switch ref.Kind {
case "StatefulSet", "DaemonSet", "Deployment":
return ref.Kind, ref.Name, ns
case "ReplicaSet":
// Hop through ReplicaSet to Deployment if the RS itself
// has a Deployment owner. We look up the RS in the
// snapshot — if it isn't cached, fall back to "ReplicaSet"
// directly (HPA can target ReplicaSet too, rarely).
rsList, err := snap.List("replicaset", labels.Everything())
if err == nil {
for _, rs := range rsList {
if rs.GetName() == ref.Name && rs.GetNamespace() == ns {
for _, rsOwner := range rs.GetOwnerReferences() {
if rsOwner.Kind == "Deployment" {
return "Deployment", rsOwner.Name, ns
}
}
}
}
}
return "ReplicaSet", ref.Name, ns
case "Job":
// Job-owned pods are out of scope — return empty and let
// the caller emit skip.
return "", "", ""
}
}
return "", "", ""
}
// findHPAFor scans every HPA in the snapshot and returns the first
// one whose spec.scaleTargetRef matches (kind, name, namespace).
// Returns nil when no match.
//
// HPA is a namespace-scoped resource; we iterate every namespace's
// HPA in the cache. Cost is O(hpa-count); typical clusters have
// O(10) HPAs so this is cheap.
//
// Kind name "horizontalpodautoscaler" — the canonical k8scache name
// for autoscaling/v2 HPAs (registered by the operator via the kinds
// ConfigMap on Sovereigns that opt-in; absent on others). When the
// kind is not registered the Snapshot.List returns an error and we
// gracefully report nil (caller emits skip).
func findHPAFor(snap Snapshot, ownerKind, ownerName, ownerNamespace string) *unstructured.Unstructured {
hpaList, err := snap.List("horizontalpodautoscaler", labels.Everything())
if err != nil {
return nil
}
for _, hpa := range hpaList {
if hpa.GetNamespace() != ownerNamespace {
continue
}
ref, found, _ := unstructured.NestedMap(hpa.Object, "spec", "scaleTargetRef")
if !found {
continue
}
kind, _ := ref["kind"].(string)
name, _ := ref["name"].(string)
if strings.EqualFold(kind, ownerKind) && name == ownerName {
return hpa
}
}
return nil
}
// hpaMinReplicas extracts spec.minReplicas (int32 default 1).
func hpaMinReplicas(hpa *unstructured.Unstructured) (int32, bool) {
v, found, err := unstructured.NestedInt64(hpa.Object, "spec", "minReplicas")
if err != nil || !found {
return 1, false
}
return int32(v), true
}
// hpaCurrentReplicas extracts status.currentReplicas (int32 default 0).
func hpaCurrentReplicas(hpa *unstructured.Unstructured) (int32, bool) {
v, found, err := unstructured.NestedInt64(hpa.Object, "status", "currentReplicas")
if err != nil || !found {
return 0, false
}
return int32(v), true
}
// newSkip — small helper so each branch above stays readable.
func newSkip(policy string, target *unstructured.Unstructured, msg string) SyntheticReport {
return SyntheticReport{
Policy: policy,
Rule: policy,
Result: ResultSkip,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: msg,
}
}


@@ -0,0 +1,127 @@
// hubble.go — Hubble flow-observed evaluator.
//
// EPIC-1 (#1096) §4.3 row "Hubble flows observed (last 5m)".
//
// **Status — DEFERRED HUBBLE CLIENT**:
//
// The brief permits this evaluator to defer when the Hubble Observer
// gRPC client (`github.com/cilium/cilium/api/v1/observer`) is not in
// the catalyst-api dependency graph. As of this slice (W2) the
// catalyst-api has NOT pulled `github.com/cilium/cilium` — adding it
// would import the full Cilium operator graph (~30 transitive Go
// modules; ~80MB on `go mod download`). The Coordinator's standing
// rule on dep weight is to land the synthetic-row plumbing first +
// wire the Hubble client in a follow-up slice once the score
// aggregator has a real consumer.
//
// Behaviour TODAY:
//
// - When Config.HubbleEnabled == false (the default), the evaluator
// emits result=skip for every Pod. Score aggregator drops skip
// rows from the denominator → no false negatives on the score.
// - When Config.HubbleEnabled == true (after the follow-up wires
// the client), the evaluator queries the Observer API for the
// last Config.HubbleLookbackWindow (default 5min) of flows
// touching the Pod's IP. Pass when ≥1 flow seen, fail otherwise.
//
// Wiring of the actual Hubble client lives in the follow-up issue;
// this file documents the contract so the consumer (slice S1) can
// plan around the skip-then-pass transition without re-tooling.
package evaluators
import (
"context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// HubbleEvaluator implements `policy=hubble-flows-seen`.
type HubbleEvaluator struct {
Enabled bool
// LookbackSeconds — forwarded to Probe.FlowsSeen on every call;
// only exercised once Enabled is true and a Probe is wired.
LookbackSeconds int64
// Probe — pluggable client used in the Enabled branch.
// Production wiring (follow-up slice) sets this to a thin
// wrapper around the cilium/cilium observer gRPC client. Tests
// inject a fake to exercise pass/fail without the Hubble dep.
Probe HubbleProbe
}
// HubbleProbe is the pluggable interface the evaluator calls when
// Enabled. The implementation queries the Hubble Observer API for
// flows touching the given Pod within the given lookback window.
type HubbleProbe interface {
// FlowsSeen returns true if Hubble has observed at least one flow
// to/from the Pod's IP/UID in the last `lookbackSeconds` seconds.
// Errors surface as result=warn (inconclusive): the evaluator does
// not punish a Pod for an unreachable Hubble Observer; ops can
// detect Hubble outages via the cluster-level alarm.
FlowsSeen(ctx context.Context, pod *unstructured.Unstructured, lookbackSeconds int64) (bool, error)
}
// NewHubbleEvaluator builds a HubbleEvaluator from cfg.
func NewHubbleEvaluator(cfg Config) *HubbleEvaluator {
return &HubbleEvaluator{
Enabled: cfg.HubbleEnabled,
LookbackSeconds: int64(cfg.HubbleLookbackWindow.Seconds()),
}
}
func (HubbleEvaluator) Name() string { return "hubble-flows-seen" }
func (h *HubbleEvaluator) Evaluate(ctx context.Context, _ Snapshot, target *unstructured.Unstructured) []SyntheticReport {
if !isPod(target) {
return nil
}
if !h.Enabled || h.Probe == nil {
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultSkip,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Hubble not enabled on this Sovereign — evaluator skipped",
Properties: map[string]string{
"reason": "hubble-disabled",
},
}}
}
seen, err := h.Probe.FlowsSeen(ctx, target, h.LookbackSeconds)
if err != nil {
// Probe error is inconclusive — surface as warn so the score
// isn't depressed by a transient Hubble Observer outage.
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultWarn,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Hubble Observer unreachable — flow check inconclusive",
Properties: map[string]string{
"err": err.Error(),
},
}}
}
if seen {
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultPass,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Hubble observed flows to/from Pod within lookback window",
}}
}
return []SyntheticReport{{
Policy: h.Name(),
Rule: h.Name(),
Result: ResultFail,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "Hubble observed no flows to/from Pod within lookback window",
}}
}


@@ -0,0 +1,130 @@
// otel.go — OTel auto-instrumentation evaluator.
//
// EPIC-1 (#1096) §4.3 row "OTel auto-instrumentation present".
//
// Logic (per `02-W-watcher-extension.md` brief):
//
// 1. For each Pod: check container list for an `otel-collector`
// sidecar (heuristic: image substring matches
// Config.OTelSidecarImageMatch — default `opentelemetry-collector`).
// If found → result=pass.
// 2. OTel Operator auto-injection: check if the Pod has an annotation
// with prefix `instrumentation.opentelemetry.io/inject-` whose
// value is `true`. Combined with an Instrumentation CR existing in
// the Pod's namespace → result=pass.
// 3. Neither → result=fail.
//
// The Instrumentation CR is a namespaced opentelemetry.io/v1alpha1
// resource. When the kind is not registered in the k8scache (the
// bp-otel-operator chart isn't installed on this Sovereign) the
// evaluator falls back to sidecar-only detection — annotation alone
// without the operator running is meaningless.
package evaluators
import (
"context"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
)
// OTelEvaluator implements `policy=otel-injected`.
type OTelEvaluator struct {
SidecarImageMatch string
InjectAnnotationPrefix string
InstrumentationKindName string
}
// NewOTelEvaluator builds an OTelEvaluator from cfg.
func NewOTelEvaluator(cfg Config) *OTelEvaluator {
return &OTelEvaluator{
SidecarImageMatch: cfg.OTelSidecarImageMatch,
InjectAnnotationPrefix: cfg.OTelInjectAnnotationPrefix,
InstrumentationKindName: cfg.OTelInstrumentationKind,
}
}
func (OTelEvaluator) Name() string { return "otel-injected" }
func (o *OTelEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport {
if !isPod(target) {
return nil
}
// 1. Sidecar check — substring match against container images.
for _, img := range containerImages(target) {
if o.SidecarImageMatch != "" && strings.Contains(img, o.SidecarImageMatch) {
return []SyntheticReport{{
Policy: o.Name(),
Rule: o.Name(),
Result: ResultPass,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "OTel collector sidecar detected (image=" + img + ")",
Properties: map[string]string{
"detection": "sidecar",
"image": img,
},
}}
}
}
// 2. Auto-inject path — Pod annotation + Instrumentation CR in
// the same namespace.
annots := target.GetAnnotations()
injectAnnotation := ""
for k, v := range annots {
if strings.HasPrefix(k, o.InjectAnnotationPrefix) && strings.EqualFold(v, "true") {
injectAnnotation = k
break
}
}
if injectAnnotation != "" {
// Look up Instrumentation CR in the same namespace.
instList, err := snap.List(o.InstrumentationKindName, labels.Everything())
if err == nil {
for _, inst := range instList {
if inst.GetNamespace() == target.GetNamespace() {
return []SyntheticReport{{
Policy: o.Name(),
Rule: o.Name(),
Result: ResultPass,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "OTel auto-inject annotation present + Instrumentation CR in namespace",
Properties: map[string]string{
"detection": "auto-inject",
"annotation": injectAnnotation,
"instrumentationName": inst.GetName(),
},
}}
}
}
}
// Annotation present but no Instrumentation CR — operator
// not installed. Surface as fail with a hint.
return []SyntheticReport{{
Policy: o.Name(),
Rule: o.Name(),
Result: ResultFail,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "OTel auto-inject annotation set but no Instrumentation CR in namespace — operator missing",
Properties: map[string]string{
"detection": "auto-inject-orphan",
"annotation": injectAnnotation,
},
}}
}
// 3. Neither path matched.
return []SyntheticReport{{
Policy: o.Name(),
Rule: o.Name(),
Result: ResultFail,
Resource: resourceFor(target),
Namespace: target.GetNamespace(),
Message: "no OTel collector sidecar and no auto-inject annotation",
}}
}


@@ -72,6 +72,19 @@ const (
EventDeleted EventType = "DELETED"
)
// KindComplianceEvaluator is the canonical kind name carried on every
// synthetic PolicyReport-shaped event published by evaluators in
// internal/k8scache/evaluators (EPIC-1 #1096 slice W2). The score
// aggregator (slice S1) subscribes to this kind alongside the upstream
// `policyreport` / `clusterpolicyreport` kinds — a single SSE stream,
// uniform shape.
//
// The synthetic kind is NOT registered as a watch target — it has no
// GVR and the factory never LISTs a real K8s resource for it.
// Subscribers filter on it via the same `?kinds=` query parameter that
// gates pods, services, etc.
const KindComplianceEvaluator = "compliance-evaluator"
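The synthetic kind rides the same `?kinds=` filter path as watch-derived kinds. A standalone sketch of that kind-set filter, with the hypothetical helper name `wantKind` and the illustrative assumption that an empty filter admits every kind:

```go
package main

import "fmt"

// wantKind is a hypothetical sketch of the `?kinds=` filter an SSE
// subscriber applies. The synthetic compliance-evaluator kind passes
// through it exactly like watch-derived kinds; the "empty filter
// admits all kinds" rule is an assumption for illustration.
func wantKind(filter map[string]struct{}, kind string) bool {
	if len(filter) == 0 {
		return true // assumed: no filter means all kinds
	}
	_, ok := filter[kind]
	return ok
}

func main() {
	filter := map[string]struct{}{
		"policyreport":         {},
		"clusterpolicyreport":  {},
		"compliance-evaluator": {},
	}
	fmt.Println(wantKind(filter, "compliance-evaluator"), wantKind(filter, "pod")) // true false
}
```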
// Event is a single K8s state-change notification. Encoded directly
// into the SSE frame — field names are part of the public wire
// contract and must stay stable.
@@ -637,6 +650,21 @@ func (f *Factory) dispatch(cs *clusterState, k Kind, t EventType, obj any) {
f.fanout(ev)
}
// Publish injects a non-watch-derived Event onto the SSE fanout. Used
// by internal/k8scache/evaluators (EPIC-1 #1096 slice W2) to emit
// synthetic PolicyReport-shaped events on the
// `compliance-evaluator` kind. The event MUST set Cluster, Kind,
// Type, Object, At — the factory does NOT mutate or redact synthetic
// payloads (the producer owns them).
//
// Concurrency: safe for concurrent callers; the underlying fanout
// holds subMu briefly to snapshot the subscriber set, then sends
// non-blocking with the same drop-oldest backpressure as informer
// events.
func (f *Factory) Publish(ev Event) {
f.fanout(ev)
}
// fanout — non-blocking send to each subscriber. On full channel we
// drop the OLDEST event (drain one then push) so the consumer
// catches up automatically without the producer waiting.
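The drop-oldest behavior described above can be sketched as a self-contained, single-producer Go program (`dropOldestSend` is a hypothetical helper, not the factory's actual code):

```go
package main

import "fmt"

// dropOldestSend pushes ev onto ch without blocking the producer. If
// ch is full it drains one buffered event (the oldest) and then
// pushes, so a slow consumer loses stale events instead of stalling
// the producer. Single-producer sketch only: with concurrent senders
// the final send could still block.
func dropOldestSend(ch chan int, ev int) (dropped bool) {
	// Fast path: room in the buffer.
	select {
	case ch <- ev:
		return false
	default:
	}
	// Buffer full: drop the oldest, then push the new event.
	select {
	case <-ch:
		dropped = true
	default:
	}
	ch <- ev
	return dropped
}

func main() {
	ch := make(chan int, 2)
	dropOldestSend(ch, 1)
	dropOldestSend(ch, 2)
	dropped := dropOldestSend(ch, 3) // buffer full: event 1 is dropped
	fmt.Println(dropped, <-ch, <-ch) // true 2 3
}
```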


@@ -116,6 +116,8 @@ func TestDefaultKinds_GraphAndDashboardSurface(t *testing.T) {
"persistentvolume", "replicaset", "endpointslice",
// dashboard color_by=utilization depends on this (#1084)
"podmetrics",
// EPIC-1 (#1096) compliance — Kyverno PolicyReports.
"policyreport", "clusterpolicyreport",
}
for _, name := range mandatory {
if _, ok := r.Get(name); !ok {
@@ -603,3 +605,152 @@ func itoa(n int) string {
// keep corev1 referenced to prevent the import being elided when this
// file changes.
var _ = corev1.NamespaceDefault
// ── EPIC-1 (#1096) — PolicyReport SSE fanout ─────────────────────
// newPolicyReport returns a tiny unstructured wgpolicyk8s.io PolicyReport.
// Mirrors the schema produced by the Kyverno reports controller — fields
// kept minimal because the score aggregator (slice S1) reads only
// `metadata.{name,namespace,labels,ownerReferences}` and `results[]`.
//
// Results pass through DeepCopyJSON so the slice element type must be
// `any` (not `map[string]any`) — DeepCopyJSON refuses unknown concrete
// slice element types.
func newPolicyReport(ns, name string, results []any) *unstructured.Unstructured {
return &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "wgpolicyk8s.io/v1alpha2",
"kind": "PolicyReport",
"metadata": map[string]any{
"namespace": ns,
"name": name,
"resourceVersion": "1",
},
"results": results,
}}
}
func newClusterPolicyReport(name string, results []any) *unstructured.Unstructured {
return &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "wgpolicyk8s.io/v1alpha2",
"kind": "ClusterPolicyReport",
"metadata": map[string]any{
"name": name,
"resourceVersion": "1",
},
"results": results,
}}
}
// policyReportRegistry — minimal registry with the two PolicyReport
// kinds. Used by the W1 fanout test below.
func policyReportRegistry() *Registry {
r := NewRegistry()
_ = r.Add(Kind{
Name: "policyreport",
GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"},
Namespaced: true,
})
_ = r.Add(Kind{
Name: "clusterpolicyreport",
GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"},
Namespaced: false,
})
return r
}
// fakePolicyReportClients — like fakeClients but seeds the runtime
// scheme with the wgpolicyk8s.io types and list kinds so the dynamic
// informer's LIST + WATCH succeed against the in-memory store.
func fakePolicyReportClients(objs ...runtime.Object) (*dynamicfake.FakeDynamicClient, clientgokubernetes.Interface) {
scheme := runtime.NewScheme()
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "PolicyReportList"}, &unstructured.UnstructuredList{})
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "PolicyReport"}, &unstructured.Unstructured{})
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "ClusterPolicyReportList"}, &unstructured.UnstructuredList{})
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "ClusterPolicyReport"}, &unstructured.Unstructured{})
gvrToListKind := map[schema.GroupVersionResource]string{
{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}: "PolicyReportList",
{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}: "ClusterPolicyReportList",
}
dyn := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind, objs...)
core := kfake.NewSimpleClientset()
return dyn, core
}
// TestPolicyReport_FlowsThroughSSEFanout asserts W1 — applying a
// PolicyReport CR fires the same ADD event the architecture-graph kinds
// fire, with no special-case handling. Coverage:
// - PolicyReport (namespace-scoped)
// - ClusterPolicyReport (cluster-scoped) on the same factory
// - Subscriber filtered to compliance kinds receives both
func TestPolicyReport_FlowsThroughSSEFanout(t *testing.T) {
dyn, core := fakePolicyReportClients()
cfg := Config{
Logger: quietLogger(),
Registry: policyReportRegistry(),
Clusters: []ClusterRef{
{ID: "alpha", DynamicClient: dyn, CoreClient: core},
},
}
f, err := NewFactory(cfg)
if err != nil {
t.Fatalf("NewFactory: %v", err)
}
defer f.Stop()
ch, unsub := f.Subscribe("operator", map[string]struct{}{
"policyreport": {},
"clusterpolicyreport": {},
})
defer unsub()
if err := f.Start(context.Background()); err != nil {
t.Fatalf("Start: %v", err)
}
// Apply a namespaced PolicyReport.
results := []any{
map[string]any{
"policy": "multi-replica-drainability",
"rule": "multi-replica-drainability",
"result": "fail",
"message": "Deployment has only 1 replica",
},
}
prGVR := schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}
if _, err := dyn.Resource(prGVR).Namespace("acme").Create(context.Background(), newPolicyReport("acme", "pod-frontend-7c5f", results), metav1.CreateOptions{}); err != nil {
t.Fatalf("create PolicyReport: %v", err)
}
// Apply a cluster-scoped ClusterPolicyReport.
cprGVR := schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}
if _, err := dyn.Resource(cprGVR).Create(context.Background(), newClusterPolicyReport("ns-acme", results), metav1.CreateOptions{}); err != nil {
t.Fatalf("create ClusterPolicyReport: %v", err)
}
gotPR, gotCPR := false, false
timeout := time.After(2 * time.Second)
for !(gotPR && gotCPR) {
select {
case ev, ok := <-ch:
if !ok {
t.Fatalf("subscriber channel closed")
}
if ev.Type != EventAdded {
continue
}
if ev.Kind == "policyreport" && ev.Object.GetName() == "pod-frontend-7c5f" {
// Sanity check — body survives unmodified (PolicyReport
// is non-sensitive so redact is a no-op).
if results, _, _ := unstructured.NestedSlice(ev.Object.Object, "results"); len(results) != 1 {
t.Fatalf("PolicyReport.results not preserved through fanout")
}
gotPR = true
}
if ev.Kind == "clusterpolicyreport" && ev.Object.GetName() == "ns-acme" {
gotCPR = true
}
case <-timeout:
t.Fatalf("never received PolicyReport (got=%v) + ClusterPolicyReport (got=%v) ADD events", gotPR, gotCPR)
}
}
}


@@ -120,6 +120,20 @@ var DefaultKinds = []Kind{
// real Sovereign ships bp-metrics-server in the platform bundle,
// so the utilization gradient renders out of the box.
{Name: "podmetrics", GVR: schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "pods"}, Namespaced: true},
// EPIC-1 (#1096) — Compliance: Kyverno PolicyReports.
//
// `wgpolicyk8s.io/v1alpha2/PolicyReport` is the namespace-scoped
// per-resource compliance report Kyverno emits for every Pod /
// Workload it audits. `ClusterPolicyReport` is the cluster-scoped
// equivalent for cluster-scoped resources (Namespaces, Nodes,
// CRDs, …). The score aggregator (slice S1) consumes both via the
// same SSE fanout the architecture graph already uses — no special
// path. The reports themselves carry no secret material (Kyverno
// omits the offending object's data fields by design) so
// Sensitive=false is correct.
{Name: "policyreport", GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}, Namespaced: true},
{Name: "clusterpolicyreport", GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}, Namespaced: false},
}
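A consumer such as the score aggregator (slice S1) reads little more than `results[]` from these reports. A minimal, self-contained sketch of tallying result values from a PolicyReport-shaped map (`tallyResults` is a hypothetical helper, not part of this slice):

```go
package main

import "fmt"

// tallyResults is a hypothetical sketch of how a consumer like the
// score aggregator (slice S1) might count result values in a
// PolicyReport-shaped map. Field names follow the
// wgpolicyk8s.io/v1alpha2 schema; the aggregator's real logic is
// outside this slice.
func tallyResults(report map[string]any) map[string]int {
	counts := map[string]int{}
	results, _ := report["results"].([]any)
	for _, r := range results {
		m, ok := r.(map[string]any)
		if !ok {
			continue
		}
		if res, ok := m["result"].(string); ok {
			counts[res]++
		}
	}
	return counts
}

func main() {
	report := map[string]any{
		"apiVersion": "wgpolicyk8s.io/v1alpha2",
		"kind":       "PolicyReport",
		"results": []any{
			map[string]any{"policy": "multi-replica-drainability", "result": "fail"},
			map[string]any{"policy": "otel-injected", "result": "pass"},
		},
	}
	counts := tallyResults(report)
	fmt.Println(counts["pass"], counts["fail"]) // 1 1
}
```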
// Registry is a runtime-mutable lookup keyed by the short Name. It