W1: extend `internal/k8scache/kinds.go` `DefaultKinds` with
`wgpolicyk8s.io/v1alpha2/PolicyReport` (namespaced) and
`ClusterPolicyReport` (cluster-scoped). Reports flow through the
existing `Factory.dispatch` → `fanout` → SSE subscribers — no special
treatment. Test coverage: `TestPolicyReport_FlowsThroughSSEFanout`
applies a synthetic PolicyReport + ClusterPolicyReport via the fake
dynamic client and asserts both ADD events arrive at a kind-filtered
subscriber.
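A minimal sketch of the W1 registration, assuming `DefaultKinds` is a slice of
per-kind entries with a namespaced flag; the entry type and field names here
are hypothetical, since `kinds.go` itself is not part of this diff:

```go
// Hypothetical shape; the real DefaultKinds entry type in
// internal/k8scache/kinds.go may differ.
var wgpolicyKinds = []KindSpec{
	{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "PolicyReport", Namespaced: true},
	{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "ClusterPolicyReport", Namespaced: false},
}
```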
W2: new package `internal/k8scache/evaluators/` shipping five custom
evaluators that emit synthetic PolicyReport-shaped rows on the
`compliance-evaluator` SSE channel (a representative row is sketched
after this list):
- hpa.go — HPA `spec.minReplicas` vs `status.currentReplicas`,
with Pod → ReplicaSet → Deployment owner chain.
- otel.go — OTel collector sidecar OR Pod auto-inject annotation
+ namespace Instrumentation CR.
- hubble.go — Hubble Observer flow check (DEFERRED: cilium/cilium
client not pulled by current deps; evaluator emits
skip when `Config.HubbleEnabled=false`, follow-up
slice wires the gRPC client).
- harbor.go — image starts with `<HarborDomain>/...` or operator-
supplied allow-list prefix; fail on docker.io / ghcr.io
direct refs.
- flux.go — `app.kubernetes.io/managed-by: flux` label OR Flux
ownerRef on the Pod or its controller.
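Each evaluator emits rows shaped like the `SyntheticReport` struct defined in
this package. A representative fail row from the harbor evaluator looks
roughly like the sketch below (values are illustrative; the exact Message text
is produced by harbor.go at runtime, while the Properties key matches what the
tests assert on):

```go
var exampleRow = evaluators.SyntheticReport{
	Policy:    "harbor-proxy-pull",
	Rule:      "harbor-proxy-pull",
	Result:    evaluators.ResultFail,
	Namespace: "acme",
	Message:   "image pulled directly from docker.io instead of the Harbor proxy",
	Properties: map[string]string{
		"rejectedImages": "docker.io/library/redis:7.0",
	},
	// Resource and Time are filled in by resourceFor() and the Engine's clock.
}
```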
Engine architecture (per ADR-0001 §5; wiring sketch after this list):
- Subscribes to Pod ADD/MODIFY events from the watcher.
- 30s ticker re-evaluates over the in-process Indexer (no apiserver
polling — pure cache reads).
- Publishes synthetic events via the new exported
`Factory.Publish(Event)` method which re-uses the same fanout the
architecture-graph subscribers consume.
- `KindComplianceEvaluator = "compliance-evaluator"` constant for
the score aggregator (slice S1) to subscribe to.
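A hedged wiring sketch of the lifecycle above. The `publisher`,
`resolveSnapshot` and `subscribe` arguments are the Factory glue that lands in
`cmd/api/main.go` and is not part of this commit, so those names are
placeholders:

```go
cfg := evaluators.Config{Logger: logger, HarborDomain: harborDomain}
evals := []evaluators.Evaluator{
	evaluators.NewHPAEvaluator(cfg),
	evaluators.NewOTelEvaluator(cfg),
	evaluators.NewHubbleEvaluator(cfg),
	evaluators.NewHarborEvaluator(cfg),
	evaluators.NewFluxEvaluator(cfg),
}
eng, err := evaluators.NewEngine(cfg, evals, publisher, resolveSnapshot, subscribe)
if err != nil {
	return err
}
// Run blocks until ctx is cancelled; it subscribes to Pod ADD/MODIFY events
// and re-evaluates every cluster's cached Pods on the ticker.
go func() { _ = eng.Run(ctx) }()
```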
Per INVIOLABLE-PRINCIPLES #4: every threshold (HPA min replicas,
Hubble lookback, Harbor regex, OTel annotation prefix, Flux label
key/value) is a Config field — no hardcoded values.
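For illustration, the knobs named above all live on `evaluators.Config`. The
fragment below shows example values only (the registry mirror prefix is a
placeholder); unset fields fall back to the defaults in `defaults()`:

```go
cfg := evaluators.Config{
	Logger:                     logger,
	TickInterval:               30 * time.Second,
	HPAMinReplicas:             2,
	HubbleEnabled:              false, // deferred slice; evaluator emits skip
	HubbleLookbackWindow:       5 * time.Minute,
	HarborDomain:               os.Getenv("CATALYST_HARBOR_DOMAIN"),
	HarborAllowedPrefixes:      []string{"mirror.example.internal/"},
	FluxManagedByLabel:         "app.kubernetes.io/managed-by",
	FluxManagedByValue:         "flux",
	OTelInjectAnnotationPrefix: "instrumentation.opentelemetry.io/inject-",
	OTelSidecarImageMatch:      "opentelemetry-collector",
}
```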
Tests (28 unit cases covering the pass/fail/skip matrix for each
evaluator, the EvaluateAll wrapper, the engine lifecycle, and one helper):
- go test -count=1 -race ./internal/k8scache/... → CLEAN
- go vet ./... → CLEAN
Co-authored-by: hatiyildiz <hatice@openova.io>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent d74e0d5e5a
commit a987748b42
@@ -0,0 +1,575 @@
// Package evaluators — synthetic PolicyReport producers for compliance
// signals Kyverno cannot evaluate at admission.
//
// EPIC-1 (#1096) Slice W2 — five evaluators ship in this package:
//
//   - hpa    : HPA min replicas vs Deployment.replicas
//   - otel   : Pod has otel-collector sidecar OR namespace has Instrumentation CR
//   - hubble : Cilium Hubble has observed flow to/from this Pod (deferred — no client dep)
//   - harbor : Container image refs harbor.<sovereign>/...
//   - flux   : Resource has app.kubernetes.io/managed-by=flux OR Flux ownerRef
//
// Architecture (per docs/EPICS-1-6-unified-design.md §4.1, brief
// `02-W-watcher-extension.md`):
//
//	k8scache.Factory                     evaluators.Engine
//	─────────────────                    ──────────────────
//	[informer events] ─Pod ADDED ──→     Subscribe(kinds={pod})
//	                                              │
//	                                     ┌────────┴────────┐
//	                                     │ EvaluateAll(...) │
//	                                     └────────┬────────┘
//	                                              │
//	                                  synthetic SyntheticReport rows
//	                                              │
//	                   Factory.Publish(Event{Kind:"compliance-evaluator"})
//	                                              │
//	[SSE fanout] ←─────────────      re-enters the same fanout the
//	                                 architecture-graph subscribers consume
//
// Per ADR-0001 §5: the 30s ticker re-evaluates over the in-process
// Indexer, NOT against the apiserver. Evaluators are pure functions
// over a Snapshot read-interface — they never make REST calls.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #4: every threshold (lookback
// window, regex pattern, registry domain template) is a Config field
// — no hardcoded values.
package evaluators

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
)
|
||||
|
||||
// Result is the canonical outcome — string-typed so it round-trips
|
||||
// 1:1 through the synthetic PolicyReport JSON wire format the score
|
||||
// aggregator (slice S1) consumes.
|
||||
type Result string
|
||||
|
||||
const (
|
||||
// ResultPass — workload satisfies the policy.
|
||||
ResultPass Result = "pass"
|
||||
// ResultFail — workload violates the policy.
|
||||
ResultFail Result = "fail"
|
||||
// ResultSkip — policy not applicable to this workload (e.g. HPA
|
||||
// not present, Hubble UI disabled). Slice S1's normalizer drops
|
||||
// skip rows from the denominator.
|
||||
ResultSkip Result = "skip"
|
||||
// ResultWarn — informational, does not affect the score.
|
||||
ResultWarn Result = "warn"
|
||||
)
|
||||
|
||||
// SyntheticReport mirrors the wgpolicyk8s.io PolicyReport row schema so
|
||||
// the score aggregator's join is uniform across Kyverno-emitted rows
|
||||
// and evaluator-emitted ones.
|
||||
//
|
||||
// Wire-shape: encoded directly into the SSE event's Object. The score
|
||||
// aggregator deserialises this struct from each event's
|
||||
// .object.results[0] field — keep field names stable across releases.
|
||||
type SyntheticReport struct {
|
||||
// Policy — short canonical policy name, matching the EPIC-1 §4.3
|
||||
// table ("hpa-effective", "otel-injected", "harbor-proxy-pull",
|
||||
// "flux-managed", "hubble-flows-seen").
|
||||
Policy string `json:"policy"`
|
||||
|
||||
// Rule — for parity with Kyverno's per-rule reporting; equal to
|
||||
// Policy for the simple one-rule evaluators in this package.
|
||||
Rule string `json:"rule"`
|
||||
|
||||
// Result — pass / fail / skip / warn.
|
||||
Result Result `json:"result"`
|
||||
|
||||
// Resource — back-pointer to the offending K8s object. Populated
|
||||
// from the workload metadata (apiVersion, kind, name, namespace,
|
||||
// uid). Skip rows still set Resource so the UI drill-down shows
|
||||
// the workload that was inspected.
|
||||
Resource metav1.OwnerReference `json:"resource"`
|
||||
|
||||
// Namespace — copied from the workload for the namespace-aware
|
||||
// drill-down filter.
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
|
||||
// Message — human-readable explanation.
|
||||
Message string `json:"message,omitempty"`
|
||||
|
||||
// Properties — arbitrary key-value details (e.g. for HPA the
|
||||
// minReplicas + currentReplicas observed values; for Harbor the
|
||||
// rejected image ref).
|
||||
Properties map[string]string `json:"properties,omitempty"`
|
||||
|
||||
// Time — server-side timestamp the row was produced. Driven by
|
||||
// the Engine's clock for deterministic test fixtures.
|
||||
Time metav1.Time `json:"time"`
|
||||
}
|
||||
|
||||
// Snapshot is the read-side interface evaluators consume. Backed in
|
||||
// production by k8scache.Factory.List which reads the in-process
|
||||
// Indexer (no apiserver calls). Tests inject a fake.
|
||||
//
|
||||
// Cluster-scope: every evaluator works against ONE Sovereign at a
|
||||
// time — the cluster id is passed at Engine.Run time. Snapshot.List
|
||||
// returns objects from that single cluster's Indexer.
|
||||
type Snapshot interface {
|
||||
// List returns every object of `kindName` currently in the cache,
|
||||
// optionally filtered by a label selector. An empty selector
|
||||
// returns all objects. Returns an error when the kind is not
|
||||
// registered.
|
||||
List(kindName string, sel labels.Selector) ([]*unstructured.Unstructured, error)
|
||||
}
|
||||
|
||||
// Evaluator is a pure function: given a snapshot of the cluster and a
|
||||
// target workload (Pod / Deployment / etc.), produce zero or more
|
||||
// SyntheticReport rows.
|
||||
//
|
||||
// Evaluators MUST NOT block on I/O — they read from `snap` (the local
|
||||
// Indexer) and return. Long-running reachability checks (e.g. Hubble
|
||||
// flow query) belong on the engine's tick goroutine, not in
|
||||
// Evaluate.
|
||||
type Evaluator interface {
|
||||
// Name returns the canonical policy name. Used for logging,
|
||||
// metrics, and the SyntheticReport.Policy field.
|
||||
Name() string
|
||||
|
||||
// Evaluate applies the policy to the target. May return:
|
||||
// - one row (typical)
|
||||
// - zero rows (target not in scope — skipped silently rather
|
||||
// than emitting a skip row; e.g. otel evaluator skips
|
||||
// non-application workloads)
|
||||
// - multiple rows (one per container, etc.)
|
||||
Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport
|
||||
}
|
||||
|
||||
// Config — runtime knobs for the engine and individual evaluators.
|
||||
// Per INVIOLABLE-PRINCIPLES #4 every threshold is a config var.
|
||||
type Config struct {
|
||||
// Logger — required.
|
||||
Logger *slog.Logger
|
||||
|
||||
// TickInterval — how often the engine re-evaluates over the
|
||||
// snapshot. Defaults to 30s. Setting to 0 disables the ticker (the
|
||||
// engine still reacts to Pod ADD/MODIFY events).
|
||||
TickInterval time.Duration
|
||||
|
||||
// HPAMinReplicas — only inspected by hpa.go; the floor under
|
||||
// which the evaluator emits FAIL even when HPA is happy. Default
|
||||
// 1 (any positive replica count is acceptable).
|
||||
HPAMinReplicas int32
|
||||
|
||||
// HubbleEnabled — when false the hubble evaluator emits skip
|
||||
// without trying to talk to the Hubble Observer API. Wired from
|
||||
// the bp-cilium chart's `hubble.relay.enabled` value.
|
||||
HubbleEnabled bool
|
||||
|
||||
// HubbleLookbackWindow — how far back in time the hubble
|
||||
// evaluator searches for flows touching the Pod. Default 5min.
|
||||
HubbleLookbackWindow time.Duration
|
||||
|
||||
// HarborDomain — the per-Sovereign Harbor host used by the harbor
|
||||
// evaluator's prefix check (e.g. `harbor.omantel.omani.works`).
|
||||
// Empty disables the harbor evaluator (skip everywhere). Per
|
||||
// `feedback_never_hardcode_urls.md` this is a runtime config —
|
||||
// the value is sourced from the Sovereign's bp-harbor chart, NOT
|
||||
// from Go.
|
||||
HarborDomain string
|
||||
|
||||
// HarborAllowedPrefixes — additional registry prefixes that are
|
||||
// permitted (e.g. internal mirrors). Default empty. The harbor
|
||||
// evaluator passes if image starts with `<HarborDomain>/` OR any
|
||||
// of these prefixes.
|
||||
HarborAllowedPrefixes []string
|
||||
|
||||
// FluxManagedByLabel — label key whose value `flux` indicates
|
||||
// Flux ownership. Default `app.kubernetes.io/managed-by`.
|
||||
FluxManagedByLabel string
|
||||
|
||||
// FluxManagedByValue — label value indicating Flux ownership.
|
||||
// Default `flux`.
|
||||
FluxManagedByValue string
|
||||
|
||||
// OTelInjectAnnotationPrefix — annotation prefix that signals
|
||||
// OTel auto-instrumentation (Pod-level). Default
|
||||
// `instrumentation.opentelemetry.io/inject-`. The evaluator
|
||||
// checks any annotation key with this prefix whose value is
|
||||
// `true`.
|
||||
OTelInjectAnnotationPrefix string
|
||||
|
||||
// OTelSidecarImageMatch — substring matched against each
|
||||
// container's image to detect an OTel collector sidecar.
|
||||
// Default `opentelemetry-collector`.
|
||||
OTelSidecarImageMatch string
|
||||
|
||||
// OTelInstrumentationKind — the kind name (in the k8scache
|
||||
// registry) for the namespace-scoped Instrumentation CR. When
|
||||
// the kind is not registered the evaluator falls back to
|
||||
// sidecar + annotation checks only. Default
|
||||
// `instrumentation.opentelemetry.io`.
|
||||
OTelInstrumentationKind string
|
||||
|
||||
// Now — time source for SyntheticReport.Time. Defaults to
|
||||
// time.Now. Overridden by tests for deterministic timestamps.
|
||||
Now func() time.Time
|
||||
}
|
||||
|
||||
func (c *Config) defaults() {
|
||||
if c.TickInterval == 0 {
|
||||
c.TickInterval = 30 * time.Second
|
||||
}
|
||||
if c.HPAMinReplicas == 0 {
|
||||
c.HPAMinReplicas = 1
|
||||
}
|
||||
if c.HubbleLookbackWindow == 0 {
|
||||
c.HubbleLookbackWindow = 5 * time.Minute
|
||||
}
|
||||
if c.FluxManagedByLabel == "" {
|
||||
c.FluxManagedByLabel = "app.kubernetes.io/managed-by"
|
||||
}
|
||||
if c.FluxManagedByValue == "" {
|
||||
c.FluxManagedByValue = "flux"
|
||||
}
|
||||
if c.OTelInjectAnnotationPrefix == "" {
|
||||
c.OTelInjectAnnotationPrefix = "instrumentation.opentelemetry.io/inject-"
|
||||
}
|
||||
if c.OTelSidecarImageMatch == "" {
|
||||
c.OTelSidecarImageMatch = "opentelemetry-collector"
|
||||
}
|
||||
if c.OTelInstrumentationKind == "" {
|
||||
c.OTelInstrumentationKind = "instrumentation.opentelemetry.io"
|
||||
}
|
||||
if c.Now == nil {
|
||||
c.Now = time.Now
|
||||
}
|
||||
}
|
||||
|
||||
// Publisher is the write-side interface — the Engine pushes synthetic
|
||||
// events through this. Production wires it to k8scache.Factory.Publish;
|
||||
// tests inject a recorder.
|
||||
type Publisher interface {
|
||||
// Publish emits one synthetic event. The Engine wraps every
|
||||
// SyntheticReport into the wire envelope before calling.
|
||||
Publish(clusterID string, report SyntheticReport)
|
||||
}
|
||||
|
||||
// EvaluateAll runs every evaluator against the target and returns the
|
||||
// concatenated results. Convenience wrapper used by the engine and
|
||||
// tests; never returns nil — empty slice on no findings.
|
||||
//
|
||||
// Order is deterministic — evaluators are applied in the order
|
||||
// supplied. Tests rely on this; do not parallelise here.
|
||||
func EvaluateAll(ctx context.Context, snap Snapshot, target *unstructured.Unstructured, evals []Evaluator) []SyntheticReport {
|
||||
if target == nil {
|
||||
return nil
|
||||
}
|
||||
out := make([]SyntheticReport, 0, len(evals))
|
||||
for _, e := range evals {
|
||||
rows := e.Evaluate(ctx, snap, target)
|
||||
out = append(out, rows...)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Engine subscribes to the watcher's Pod events, runs every registered
|
||||
// evaluator on each event, and publishes the synthetic reports back
|
||||
// through the SSE fanout.
|
||||
//
|
||||
// Lifecycle:
|
||||
// - NewEngine validates Config + evaluator set
|
||||
// - Run blocks until ctx is cancelled; spawns one goroutine per
|
||||
// subscribed cluster + one ticker goroutine
|
||||
// - Multiple invocations are NOT supported on the same Engine; create
|
||||
// a new Engine for re-runs
|
||||
type Engine struct {
|
||||
cfg Config
|
||||
evaluators []Evaluator
|
||||
pub Publisher
|
||||
|
||||
// resolveSnapshot maps clusterID → Snapshot. The factory has one
|
||||
// Indexer per cluster; the engine looks up by id on every event /
|
||||
// tick.
|
||||
resolveSnapshot func(clusterID string) (Snapshot, []string, error)
|
||||
|
||||
// subscribe maps a (user, kinds) pair to a channel. Production
|
||||
// wires to Factory.Subscribe.
|
||||
subscribe func(kinds map[string]struct{}) (<-chan eventLite, func())
|
||||
|
||||
mu sync.Mutex
|
||||
started bool
|
||||
}
|
||||
|
||||
// eventLite is the engine's internal copy of k8scache.Event minus the
|
||||
// EventType — the engine only cares about ADD / MODIFY for the trigger
|
||||
// path. DELETE is handled implicitly: when a target disappears from
|
||||
// the Indexer, the next tick stops emitting reports for it.
|
||||
type eventLite struct {
|
||||
Cluster string
|
||||
Kind string
|
||||
Object *unstructured.Unstructured
|
||||
}
|
||||
|
||||
// NewEngine wires an Engine without starting it.
|
||||
func NewEngine(cfg Config, evals []Evaluator, pub Publisher,
|
||||
resolveSnapshot func(clusterID string) (Snapshot, []string, error),
|
||||
subscribe func(kinds map[string]struct{}) (<-chan eventLite, func()),
|
||||
) (*Engine, error) {
|
||||
if cfg.Logger == nil {
|
||||
return nil, errors.New("evaluators: Config.Logger is required")
|
||||
}
|
||||
if pub == nil {
|
||||
return nil, errors.New("evaluators: Publisher is required")
|
||||
}
|
||||
if resolveSnapshot == nil {
|
||||
return nil, errors.New("evaluators: resolveSnapshot is required")
|
||||
}
|
||||
if subscribe == nil {
|
||||
return nil, errors.New("evaluators: subscribe is required")
|
||||
}
|
||||
if len(evals) == 0 {
|
||||
return nil, errors.New("evaluators: at least one Evaluator must be registered")
|
||||
}
|
||||
cfg.defaults()
|
||||
return &Engine{
|
||||
cfg: cfg,
|
||||
evaluators: evals,
|
||||
pub: pub,
|
||||
resolveSnapshot: resolveSnapshot,
|
||||
subscribe: subscribe,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run starts the engine and blocks until ctx is cancelled. On exit it
|
||||
// closes the watcher subscription cleanly.
|
||||
func (e *Engine) Run(ctx context.Context) error {
|
||||
e.mu.Lock()
|
||||
if e.started {
|
||||
e.mu.Unlock()
|
||||
return errors.New("evaluators: Engine already started")
|
||||
}
|
||||
e.started = true
|
||||
e.mu.Unlock()
|
||||
|
||||
// Subscribe to Pod events from the underlying watcher.
|
||||
ch, unsub := e.subscribe(map[string]struct{}{"pod": {}})
|
||||
defer unsub()
|
||||
|
||||
// Periodic ticker — pure cache reads, no apiserver polling. Per
|
||||
// ADR-0001 §5 this is acceptable because evaluators compute over
|
||||
// data already in the in-process Indexer.
|
||||
var tick <-chan time.Time
|
||||
if e.cfg.TickInterval > 0 {
|
||||
t := time.NewTicker(e.cfg.TickInterval)
|
||||
defer t.Stop()
|
||||
tick = t.C
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case ev, ok := <-ch:
|
||||
if !ok {
|
||||
// Subscription closed by the factory (Stop). Exit
|
||||
// cleanly.
|
||||
return nil
|
||||
}
|
||||
e.evaluateOne(ctx, ev.Cluster, ev.Object)
|
||||
case <-tick:
|
||||
e.evaluateAllClusters(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// evaluateOne runs every evaluator against a single target and
|
||||
// publishes the resulting rows.
|
||||
func (e *Engine) evaluateOne(ctx context.Context, clusterID string, target *unstructured.Unstructured) {
|
||||
if target == nil {
|
||||
return
|
||||
}
|
||||
snap, _, err := e.resolveSnapshot(clusterID)
|
||||
if err != nil {
|
||||
e.cfg.Logger.Warn("evaluators: snapshot unavailable for event",
|
||||
"cluster", clusterID, "err", err)
|
||||
return
|
||||
}
|
||||
rows := EvaluateAll(ctx, snap, target, e.evaluators)
|
||||
for i := range rows {
|
||||
if rows[i].Time.IsZero() {
|
||||
rows[i].Time = metav1.NewTime(e.cfg.Now())
|
||||
}
|
||||
e.pub.Publish(clusterID, rows[i])
|
||||
}
|
||||
}
|
||||
|
||||
// evaluateAllClusters fans the tick across every cluster the
|
||||
// resolveSnapshot function knows about. Each cluster's Pod list is
|
||||
// pulled from its Indexer and every evaluator runs against every
|
||||
// target. Cost is O(clusters × pods × evaluators) but all reads are
|
||||
// from the local cache.
|
||||
func (e *Engine) evaluateAllClusters(ctx context.Context) {
|
||||
// resolveSnapshot returns the cluster's Snapshot AND the list of
|
||||
// known cluster ids when the second return is non-empty. Callers
|
||||
// can pass clusterID="" to mean "give me the list of known
|
||||
// clusters".
|
||||
_, clusters, err := e.resolveSnapshot("")
|
||||
if err != nil || len(clusters) == 0 {
|
||||
return
|
||||
}
|
||||
for _, id := range clusters {
|
||||
snap, _, err := e.resolveSnapshot(id)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
pods, err := snap.List("pod", labels.Everything())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, p := range pods {
|
||||
rows := EvaluateAll(ctx, snap, p, e.evaluators)
|
||||
for i := range rows {
|
||||
if rows[i].Time.IsZero() {
|
||||
rows[i].Time = metav1.NewTime(e.cfg.Now())
|
||||
}
|
||||
e.pub.Publish(id, rows[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// helpers --------------------------------------------------------
|
||||
|
||||
// resourceFor builds an OwnerReference from the target's metadata.
|
||||
// All five evaluators populate SyntheticReport.Resource via this so
|
||||
// the wire shape is uniform.
|
||||
func resourceFor(target *unstructured.Unstructured) metav1.OwnerReference {
|
||||
if target == nil {
|
||||
return metav1.OwnerReference{}
|
||||
}
|
||||
gv := target.GetAPIVersion()
|
||||
return metav1.OwnerReference{
|
||||
APIVersion: gv,
|
||||
Kind: target.GetKind(),
|
||||
Name: target.GetName(),
|
||||
UID: target.GetUID(),
|
||||
}
|
||||
}
|
||||
|
||||
// containerImages returns every container image string in the Pod
|
||||
// (initContainers + containers + ephemeralContainers). Empty slice
|
||||
// when the target is not a Pod or has no containers. Used by harbor
|
||||
// + otel evaluators.
|
||||
func containerImages(target *unstructured.Unstructured) []string {
|
||||
if target == nil {
|
||||
return nil
|
||||
}
|
||||
out := []string{}
|
||||
for _, group := range []string{"containers", "initContainers", "ephemeralContainers"} {
|
||||
raw, found, _ := unstructured.NestedSlice(target.Object, "spec", group)
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
for _, item := range raw {
|
||||
c, ok := item.(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if img, ok := c["image"].(string); ok && img != "" {
|
||||
out = append(out, img)
|
||||
}
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// containerNames returns container names alongside images — needed by
|
||||
// the otel evaluator's "is there a sidecar named otel-*" branch.
|
||||
func containerNames(target *unstructured.Unstructured) []string {
|
||||
if target == nil {
|
||||
return nil
|
||||
}
|
||||
out := []string{}
|
||||
for _, group := range []string{"containers", "initContainers"} {
|
||||
raw, found, _ := unstructured.NestedSlice(target.Object, "spec", group)
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
for _, item := range raw {
|
||||
c, ok := item.(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if n, ok := c["name"].(string); ok && n != "" {
|
||||
out = append(out, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// hasOwnerOfKind returns true when the target has at least one
|
||||
// ownerReference whose APIVersion and Kind match. Empty apiVersion
|
||||
// matches any group (used by the flux evaluator's HelmRelease /
|
||||
// Kustomization detection).
|
||||
func hasOwnerOfKind(target *unstructured.Unstructured, apiGroupSuffix, kind string) bool {
|
||||
if target == nil {
|
||||
return false
|
||||
}
|
||||
for _, ref := range target.GetOwnerReferences() {
|
||||
if !strings.EqualFold(ref.Kind, kind) {
|
||||
continue
|
||||
}
|
||||
if apiGroupSuffix == "" || strings.Contains(ref.APIVersion, apiGroupSuffix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// isPod returns true when the target is a v1 Pod.
|
||||
func isPod(target *unstructured.Unstructured) bool {
|
||||
if target == nil {
|
||||
return false
|
||||
}
|
||||
return target.GetKind() == "Pod"
|
||||
}
|
||||
|
||||
// reportTime — produce a metav1.Time from the engine's clock; used by
|
||||
// evaluators that may emit before the engine fills it in.
|
||||
func reportTime(now func() time.Time) metav1.Time {
|
||||
if now == nil {
|
||||
now = time.Now
|
||||
}
|
||||
return metav1.NewTime(now())
|
||||
}
|
||||
|
||||
// formatLabelMap stringifies a label set for human-readable
|
||||
// SyntheticReport.Properties values.
|
||||
func formatLabelMap(m map[string]string) string {
|
||||
if len(m) == 0 {
|
||||
return ""
|
||||
}
|
||||
parts := make([]string, 0, len(m))
|
||||
for k, v := range m {
|
||||
parts = append(parts, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
|
||||
// SubscribeAdapter wraps a k8scache.Factory.Subscribe call into the
|
||||
// engine's eventLite channel. Production code in the catalyst-api
|
||||
// `cmd/api/main.go` wires this. Exposed here so tests can construct
|
||||
// an Engine without depending on the full Factory.
|
||||
type SubscribeAdapter func(kinds map[string]struct{}) (<-chan eventLite, func())
|
||||
|
||||
// EventLiteFromUnstructured is a test convenience to build eventLite
|
||||
// values without exposing the unexported field. Used by the table
|
||||
// tests in evaluators_test.go.
|
||||
func EventLiteFromUnstructured(cluster, kind string, obj *unstructured.Unstructured) eventLite {
|
||||
return eventLite{Cluster: cluster, Kind: kind, Object: obj}
|
||||
}
|
||||
@@ -0,0 +1,759 @@
// evaluators_test.go — unit tests for the 5 evaluators + the engine.
//
// EPIC-1 (#1096) Slice W2.
//
// Coverage matrix per evaluator:
//
//	hpa    : pass (HPA satisfies floor) + fail (currentReplicas < min)
//	         + skip (no HPA) + skip (Job-owned Pod)
//	otel   : pass (sidecar) + pass (auto-inject + Instrumentation CR)
//	         + fail (no sidecar, no annotation) + fail (annotation but no CR)
//	hubble : skip (Hubble disabled) + pass (FlowsSeen=true)
//	         + fail (FlowsSeen=false) + warn (Probe error)
//	harbor : pass (Harbor-prefixed) + pass (allowed-prefix)
//	         + fail (docker.io) + skip (HarborDomain empty)
//	         + skip (no containers)
//	flux   : pass (managed-by label) + pass (HelmRelease ownerRef)
//	         + pass (controller is Flux-owned) + fail (neither)
//
// Engine:
//   - subscribe + tick path emits events to the Publisher
//   - resolveSnapshot error path is logged + suppressed
//   - cancellation cleanly shuts the engine down
//
// Plus 2 EvaluateAll tests (concatenation order + non-Pod skip).
package evaluators

import (
	"context"
	"errors"
	"io"
	"log/slog"
	"sync"
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
)
|
||||
|
||||
// quietLogger discards log output.
|
||||
func quietLogger() *slog.Logger {
|
||||
return slog.New(slog.NewJSONHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
|
||||
}
|
||||
|
||||
// fakeSnapshot is an in-memory Snapshot keyed by canonical kind name.
|
||||
type fakeSnapshot struct {
|
||||
by map[string][]*unstructured.Unstructured
|
||||
}
|
||||
|
||||
func (f *fakeSnapshot) List(kind string, _ labels.Selector) ([]*unstructured.Unstructured, error) {
|
||||
if f.by == nil {
|
||||
return nil, nil
|
||||
}
|
||||
v, ok := f.by[kind]
|
||||
if !ok {
|
||||
// Mirror the production Factory.List behaviour — unknown
|
||||
// kind returns an error (the evaluator must skip rather
|
||||
// than crash).
|
||||
return nil, errKindNotRegistered
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
var errKindNotRegistered = errors.New("kind not registered")
|
||||
|
||||
// recorder is a Publisher that captures every emitted (cluster, report) pair.
|
||||
type recorder struct {
|
||||
mu sync.Mutex
|
||||
entries []recordedEntry
|
||||
}
|
||||
|
||||
type recordedEntry struct {
|
||||
cluster string
|
||||
report SyntheticReport
|
||||
}
|
||||
|
||||
func (r *recorder) Publish(cluster string, report SyntheticReport) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.entries = append(r.entries, recordedEntry{cluster: cluster, report: report})
|
||||
}
|
||||
|
||||
func (r *recorder) Snapshot() []recordedEntry {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
out := make([]recordedEntry, len(r.entries))
|
||||
copy(out, r.entries)
|
||||
return out
|
||||
}
|
||||
|
||||
// ── builders ─────────────────────────────────────────────────────
|
||||
|
||||
func uPod(ns, name string, opts ...podOpt) *unstructured.Unstructured {
|
||||
p := &unstructured.Unstructured{Object: map[string]any{
|
||||
"apiVersion": "v1",
|
||||
"kind": "Pod",
|
||||
"metadata": map[string]any{
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
"uid": "uid-" + ns + "-" + name,
|
||||
},
|
||||
"spec": map[string]any{
|
||||
"containers": []any{},
|
||||
},
|
||||
}}
|
||||
for _, o := range opts {
|
||||
o(p)
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
type podOpt func(*unstructured.Unstructured)
|
||||
|
||||
func withOwner(kind, name string) podOpt {
|
||||
return func(p *unstructured.Unstructured) {
|
||||
owners := p.GetOwnerReferences()
|
||||
owners = append(owners, metav1.OwnerReference{Kind: kind, Name: name, APIVersion: "apps/v1"})
|
||||
p.SetOwnerReferences(owners)
|
||||
}
|
||||
}
|
||||
|
||||
func withFluxOwner() podOpt {
|
||||
return func(p *unstructured.Unstructured) {
|
||||
owners := p.GetOwnerReferences()
|
||||
owners = append(owners, metav1.OwnerReference{Kind: "HelmRelease", Name: "x", APIVersion: "helm.toolkit.fluxcd.io/v2"})
|
||||
p.SetOwnerReferences(owners)
|
||||
}
|
||||
}
|
||||
|
||||
func withLabels(m map[string]string) podOpt {
|
||||
return func(p *unstructured.Unstructured) { p.SetLabels(m) }
|
||||
}
|
||||
|
||||
func withAnnotations(m map[string]string) podOpt {
|
||||
return func(p *unstructured.Unstructured) { p.SetAnnotations(m) }
|
||||
}
|
||||
|
||||
func withContainerImages(images ...string) podOpt {
|
||||
return func(p *unstructured.Unstructured) {
|
||||
raw := []any{}
|
||||
for i, img := range images {
|
||||
raw = append(raw, map[string]any{
|
||||
"name": imageContainerName(i),
|
||||
"image": img,
|
||||
})
|
||||
}
|
||||
_ = unstructured.SetNestedSlice(p.Object, raw, "spec", "containers")
|
||||
}
|
||||
}
|
||||
|
||||
func imageContainerName(i int) string {
|
||||
if i == 0 {
|
||||
return "main"
|
||||
}
|
||||
return "side"
|
||||
}
|
||||
|
||||
func uHPA(ns, name, targetKind, targetName string, minReplicas, currentReplicas int64) *unstructured.Unstructured {
|
||||
return &unstructured.Unstructured{Object: map[string]any{
|
||||
"apiVersion": "autoscaling/v2",
|
||||
"kind": "HorizontalPodAutoscaler",
|
||||
"metadata": map[string]any{
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
},
|
||||
"spec": map[string]any{
|
||||
"minReplicas": minReplicas,
|
||||
"scaleTargetRef": map[string]any{
|
||||
"kind": targetKind,
|
||||
"name": targetName,
|
||||
},
|
||||
},
|
||||
"status": map[string]any{
|
||||
"currentReplicas": currentReplicas,
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
func uReplicaSet(ns, name, deploymentName string) *unstructured.Unstructured {
|
||||
rs := &unstructured.Unstructured{Object: map[string]any{
|
||||
"apiVersion": "apps/v1",
|
||||
"kind": "ReplicaSet",
|
||||
"metadata": map[string]any{
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
},
|
||||
}}
|
||||
if deploymentName != "" {
|
||||
rs.SetOwnerReferences([]metav1.OwnerReference{{
|
||||
Kind: "Deployment",
|
||||
Name: deploymentName,
|
||||
APIVersion: "apps/v1",
|
||||
}})
|
||||
}
|
||||
return rs
|
||||
}
|
||||
|
||||
func uDeployment(ns, name string, fluxLabel bool) *unstructured.Unstructured {
|
||||
d := &unstructured.Unstructured{Object: map[string]any{
|
||||
"apiVersion": "apps/v1",
|
||||
"kind": "Deployment",
|
||||
"metadata": map[string]any{
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
},
|
||||
}}
|
||||
if fluxLabel {
|
||||
d.SetLabels(map[string]string{"app.kubernetes.io/managed-by": "flux"})
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
func uInstrumentation(ns, name string) *unstructured.Unstructured {
|
||||
return &unstructured.Unstructured{Object: map[string]any{
|
||||
"apiVersion": "opentelemetry.io/v1alpha1",
|
||||
"kind": "Instrumentation",
|
||||
"metadata": map[string]any{
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// ── HPA evaluator ────────────────────────────────────────────────
|
||||
|
||||
func TestHPA_Pass_HPASatisfiesFloor(t *testing.T) {
|
||||
hpa := uHPA("acme", "frontend-hpa", "Deployment", "frontend", 3, 5)
|
||||
rs := uReplicaSet("acme", "frontend-7c5f", "frontend")
|
||||
pod := uPod("acme", "frontend-7c5f-abc", withOwner("ReplicaSet", "frontend-7c5f"))
|
||||
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
|
||||
"horizontalpodautoscaler": {hpa},
|
||||
"replicaset": {rs},
|
||||
}}
|
||||
rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if len(rep) != 1 {
|
||||
t.Fatalf("want 1 row got %d", len(rep))
|
||||
}
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHPA_Fail_CurrentBelowMin(t *testing.T) {
|
||||
hpa := uHPA("acme", "frontend-hpa", "Deployment", "frontend", 3, 1)
|
||||
rs := uReplicaSet("acme", "frontend-7c5f", "frontend")
|
||||
pod := uPod("acme", "frontend-7c5f-abc", withOwner("ReplicaSet", "frontend-7c5f"))
|
||||
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
|
||||
"horizontalpodautoscaler": {hpa},
|
||||
"replicaset": {rs},
|
||||
}}
|
||||
rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultFail {
|
||||
t.Fatalf("want fail got %s (%s)", rep[0].Result, rep[0].Message)
|
||||
}
|
||||
if rep[0].Properties["minReplicas"] != "3" || rep[0].Properties["currentReplicas"] != "1" {
|
||||
t.Fatalf("properties not populated: %+v", rep[0].Properties)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHPA_Skip_NoHPA(t *testing.T) {
|
||||
rs := uReplicaSet("acme", "control-plane-rs", "control-plane")
|
||||
pod := uPod("acme", "control-plane-pod", withOwner("ReplicaSet", "control-plane-rs"))
|
||||
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
|
||||
"horizontalpodautoscaler": {},
|
||||
"replicaset": {rs},
|
||||
}}
|
||||
rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultSkip {
|
||||
t.Fatalf("want skip got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHPA_Skip_JobOwnedPod(t *testing.T) {
|
||||
pod := uPod("acme", "batch-pod", withOwner("Job", "nightly-cron"))
|
||||
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{}}
|
||||
rep := newHPAEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultSkip {
|
||||
t.Fatalf("want skip for Job-owned Pod got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func newHPAEvaluator() *HPAEvaluator {
|
||||
cfg := Config{Logger: quietLogger()}
|
||||
cfg.defaults()
|
||||
return NewHPAEvaluator(cfg)
|
||||
}
|
||||
|
||||
// ── OTel evaluator ───────────────────────────────────────────────
|
||||
|
||||
func TestOTel_Pass_Sidecar(t *testing.T) {
|
||||
pod := uPod("acme", "app", withContainerImages(
|
||||
"docker.io/library/nginx:1.25",
|
||||
"otel/opentelemetry-collector-contrib:0.95",
|
||||
))
|
||||
snap := &fakeSnapshot{}
|
||||
rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s", rep[0].Result)
|
||||
}
|
||||
if rep[0].Properties["detection"] != "sidecar" {
|
||||
t.Fatalf("want sidecar detection got %v", rep[0].Properties)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOTel_Pass_AutoInjectAnnotation(t *testing.T) {
|
||||
pod := uPod("acme", "app",
|
||||
withContainerImages("docker.io/library/nginx:1.25"),
|
||||
withAnnotations(map[string]string{
|
||||
"instrumentation.opentelemetry.io/inject-go": "true",
|
||||
}),
|
||||
)
|
||||
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
|
||||
"instrumentation.opentelemetry.io": {uInstrumentation("acme", "default")},
|
||||
}}
|
||||
rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message)
|
||||
}
|
||||
if rep[0].Properties["detection"] != "auto-inject" {
|
||||
t.Fatalf("want auto-inject got %v", rep[0].Properties)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOTel_Fail_NoSidecarNoAnnotation(t *testing.T) {
|
||||
pod := uPod("acme", "app", withContainerImages("docker.io/library/nginx:1.25"))
|
||||
snap := &fakeSnapshot{}
|
||||
rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultFail {
|
||||
t.Fatalf("want fail got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOTel_Fail_AnnotationButNoInstrumentationCR(t *testing.T) {
|
||||
pod := uPod("acme", "app",
|
||||
withContainerImages("docker.io/library/nginx:1.25"),
|
||||
withAnnotations(map[string]string{
|
||||
"instrumentation.opentelemetry.io/inject-go": "true",
|
||||
}),
|
||||
)
|
||||
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
|
||||
"instrumentation.opentelemetry.io": {}, // operator installed, no CR in this ns
|
||||
}}
|
||||
rep := newOTelEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultFail {
|
||||
t.Fatalf("want fail (orphan annotation) got %s", rep[0].Result)
|
||||
}
|
||||
if rep[0].Properties["detection"] != "auto-inject-orphan" {
|
||||
t.Fatalf("want auto-inject-orphan got %v", rep[0].Properties)
|
||||
}
|
||||
}
|
||||
|
||||
func newOTelEvaluator() *OTelEvaluator {
|
||||
cfg := Config{Logger: quietLogger()}
|
||||
cfg.defaults()
|
||||
return NewOTelEvaluator(cfg)
|
||||
}
|
||||
|
||||
// ── Hubble evaluator ─────────────────────────────────────────────
|
||||
|
||||
func TestHubble_Skip_Disabled(t *testing.T) {
|
||||
pod := uPod("acme", "app")
|
||||
rep := newHubbleEvaluator(false, nil).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultSkip {
|
||||
t.Fatalf("want skip when disabled got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubble_Pass_FlowsSeen(t *testing.T) {
|
||||
pod := uPod("acme", "app")
|
||||
probe := &fakeHubbleProbe{seen: true}
|
||||
rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubble_Fail_NoFlows(t *testing.T) {
|
||||
pod := uPod("acme", "app")
|
||||
probe := &fakeHubbleProbe{seen: false}
|
||||
rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultFail {
|
||||
t.Fatalf("want fail got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubble_Warn_ProbeError(t *testing.T) {
|
||||
pod := uPod("acme", "app")
|
||||
probe := &fakeHubbleProbe{err: errors.New("connection refused")}
|
||||
rep := newHubbleEvaluator(true, probe).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultWarn {
|
||||
t.Fatalf("want warn got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
type fakeHubbleProbe struct {
|
||||
seen bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *fakeHubbleProbe) FlowsSeen(_ context.Context, _ *unstructured.Unstructured, _ int64) (bool, error) {
|
||||
if f.err != nil {
|
||||
return false, f.err
|
||||
}
|
||||
return f.seen, nil
|
||||
}
|
||||
|
||||
func newHubbleEvaluator(enabled bool, probe HubbleProbe) *HubbleEvaluator {
|
||||
cfg := Config{Logger: quietLogger(), HubbleEnabled: enabled}
|
||||
cfg.defaults()
|
||||
e := NewHubbleEvaluator(cfg)
|
||||
e.Probe = probe
|
||||
return e
|
||||
}
|
||||
|
||||
// ── Harbor evaluator ─────────────────────────────────────────────
|
||||
|
||||
func TestHarbor_Pass_HarborPrefixed(t *testing.T) {
|
||||
pod := uPod("acme", "app", withContainerImages(
|
||||
"harbor.omantel.omani.works/proxy-ghcr/openova-io/openova/website:abc123",
|
||||
))
|
||||
rep := newHarborEvaluator("harbor.omantel.omani.works", nil).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHarbor_Pass_AllowedPrefix(t *testing.T) {
|
||||
pod := uPod("acme", "app", withContainerImages(
|
||||
"mirror.openova.io/internal/sidecar:1.0",
|
||||
))
|
||||
rep := newHarborEvaluator("harbor.omantel.omani.works",
|
||||
[]string{"mirror.openova.io/"}).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHarbor_Fail_DockerHub(t *testing.T) {
|
||||
pod := uPod("acme", "app", withContainerImages(
|
||||
"harbor.omantel.omani.works/proxy-ghcr/openova-io/openova/api:abc",
|
||||
"docker.io/library/redis:7.0",
|
||||
))
|
||||
rep := newHarborEvaluator("harbor.omantel.omani.works", nil).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultFail {
|
||||
t.Fatalf("want fail got %s", rep[0].Result)
|
||||
}
|
||||
if rep[0].Properties["rejectedImages"] != "docker.io/library/redis:7.0" {
|
||||
t.Fatalf("rejectedImages wrong: %v", rep[0].Properties)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHarbor_Skip_DomainEmpty(t *testing.T) {
|
||||
pod := uPod("acme", "app", withContainerImages("docker.io/library/nginx:1.25"))
|
||||
rep := newHarborEvaluator("", nil).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultSkip {
|
||||
t.Fatalf("want skip got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHarbor_Skip_NoContainers(t *testing.T) {
|
||||
pod := uPod("acme", "app") // empty containers slice
|
||||
rep := newHarborEvaluator("harbor.openova.io", nil).Evaluate(context.Background(), nil, pod)
|
||||
if rep[0].Result != ResultSkip {
|
||||
t.Fatalf("want skip got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func newHarborEvaluator(domain string, allowed []string) *HarborEvaluator {
|
||||
cfg := Config{
|
||||
Logger: quietLogger(),
|
||||
HarborDomain: domain,
|
||||
HarborAllowedPrefixes: allowed,
|
||||
}
|
||||
cfg.defaults()
|
||||
return NewHarborEvaluator(cfg)
|
||||
}
|
||||
|
||||
// ── Flux evaluator ───────────────────────────────────────────────
|
||||
|
||||
func TestFlux_Pass_LabelOnPod(t *testing.T) {
|
||||
pod := uPod("acme", "app", withLabels(map[string]string{
|
||||
"app.kubernetes.io/managed-by": "flux",
|
||||
}))
|
||||
snap := &fakeSnapshot{}
|
||||
rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlux_Pass_HelmReleaseOwnerRef(t *testing.T) {
|
||||
pod := uPod("acme", "app", withFluxOwner())
|
||||
snap := &fakeSnapshot{}
|
||||
rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlux_Pass_ControllerIsFluxOwned(t *testing.T) {
|
||||
dep := uDeployment("acme", "frontend", true)
|
||||
rs := uReplicaSet("acme", "frontend-7c5f", "frontend")
|
||||
pod := uPod("acme", "frontend-pod", withOwner("ReplicaSet", "frontend-7c5f"))
|
||||
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
|
||||
"deployment": {dep},
|
||||
"replicaset": {rs},
|
||||
}}
|
||||
rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultPass {
|
||||
t.Fatalf("want pass got %s (%s)", rep[0].Result, rep[0].Message)
|
||||
}
|
||||
if rep[0].Properties["detection"] != "controller-flux-owned" {
|
||||
t.Fatalf("want controller-flux-owned detection got %v", rep[0].Properties)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlux_Fail_NoLabelNoOwnerRef(t *testing.T) {
|
||||
dep := uDeployment("acme", "frontend", false)
|
||||
rs := uReplicaSet("acme", "frontend-7c5f", "frontend")
|
||||
pod := uPod("acme", "frontend-pod", withOwner("ReplicaSet", "frontend-7c5f"))
|
||||
snap := &fakeSnapshot{by: map[string][]*unstructured.Unstructured{
|
||||
"deployment": {dep},
|
||||
"replicaset": {rs},
|
||||
}}
|
||||
rep := newFluxEvaluator().Evaluate(context.Background(), snap, pod)
|
||||
if rep[0].Result != ResultFail {
|
||||
t.Fatalf("want fail got %s", rep[0].Result)
|
||||
}
|
||||
}
|
||||
|
||||
func newFluxEvaluator() *FluxEvaluator {
|
||||
cfg := Config{Logger: quietLogger()}
|
||||
cfg.defaults()
|
||||
return NewFluxEvaluator(cfg)
|
||||
}
|
||||
|
||||
// ── EvaluateAll concatenation ────────────────────────────────────
|
||||
|
||||
func TestEvaluateAll_ConcatenatesAndPreservesOrder(t *testing.T) {
|
||||
pod := uPod("acme", "app", withContainerImages("harbor.openova.io/proxy/x:1"))
|
||||
snap := &fakeSnapshot{}
|
||||
|
||||
cfg := Config{Logger: quietLogger(), HarborDomain: "harbor.openova.io"}
|
||||
cfg.defaults()
|
||||
evals := []Evaluator{
|
||||
NewHarborEvaluator(cfg),
|
||||
NewFluxEvaluator(cfg),
|
||||
}
|
||||
rows := EvaluateAll(context.Background(), snap, pod, evals)
|
||||
if len(rows) != 2 {
|
||||
t.Fatalf("want 2 rows got %d", len(rows))
|
||||
}
|
||||
if rows[0].Policy != "harbor-proxy-pull" || rows[1].Policy != "flux-managed" {
|
||||
t.Fatalf("order not preserved: %v %v", rows[0].Policy, rows[1].Policy)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAll_SkipsNonPodTargets(t *testing.T) {
|
||||
dep := uDeployment("acme", "x", false)
|
||||
cfg := Config{Logger: quietLogger(), HarborDomain: "harbor.openova.io"}
|
||||
cfg.defaults()
|
||||
rows := EvaluateAll(context.Background(), &fakeSnapshot{}, dep, []Evaluator{
|
||||
NewHarborEvaluator(cfg),
|
||||
NewFluxEvaluator(cfg),
|
||||
})
|
||||
if len(rows) != 0 {
|
||||
t.Fatalf("Pod-only evaluators should ignore Deployments — got %d rows", len(rows))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Engine ───────────────────────────────────────────────────────
|
||||
|
||||
func TestEngine_PublishesOnSubscribedEvent(t *testing.T) {
|
||||
rec := &recorder{}
|
||||
pod := uPod("acme", "app", withContainerImages("harbor.openova.io/proxy/x:1"))
|
||||
|
||||
eventCh := make(chan eventLite, 4)
|
||||
cfg := Config{
|
||||
Logger: quietLogger(),
|
||||
HarborDomain: "harbor.openova.io",
|
||||
TickInterval: 0, // event-only path for this test
|
||||
Now: func() time.Time { return time.Unix(1700000000, 0) },
|
||||
}
|
||||
eng, err := NewEngine(cfg,
|
||||
[]Evaluator{NewHarborEvaluator(cfg)},
|
||||
rec,
|
||||
func(_ string) (Snapshot, []string, error) { return &fakeSnapshot{}, nil, nil },
|
||||
func(_ map[string]struct{}) (<-chan eventLite, func()) {
|
||||
return eventCh, func() {}
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEngine: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() { _ = eng.Run(ctx); close(done) }()
|
||||
|
||||
eventCh <- eventLite{Cluster: "alpha", Kind: "pod", Object: pod}
|
||||
|
||||
deadline := time.After(2 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-deadline:
|
||||
t.Fatalf("recorder never saw the event-driven publish")
|
||||
default:
|
||||
if len(rec.Snapshot()) >= 1 {
|
||||
cancel()
|
||||
<-done
|
||||
if rec.Snapshot()[0].cluster != "alpha" {
|
||||
t.Fatalf("cluster id not propagated")
|
||||
}
|
||||
if rec.Snapshot()[0].report.Policy != "harbor-proxy-pull" {
|
||||
t.Fatalf("unexpected policy %q", rec.Snapshot()[0].report.Policy)
|
||||
}
|
||||
if rec.Snapshot()[0].report.Time.IsZero() {
|
||||
t.Fatalf("report timestamp not stamped by engine")
|
||||
}
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_TickerEvaluatesAllPodsAcrossClusters(t *testing.T) {
|
||||
rec := &recorder{}
|
||||
pod1 := uPod("acme", "p1", withContainerImages("harbor.openova.io/proxy/x:1"))
|
||||
pod2 := uPod("widgets", "p2", withContainerImages("docker.io/library/nginx:1"))
|
||||
|
||||
snaps := map[string]*fakeSnapshot{
|
||||
"alpha": {by: map[string][]*unstructured.Unstructured{"pod": {pod1}}},
|
||||
"beta": {by: map[string][]*unstructured.Unstructured{"pod": {pod2}}},
|
||||
}
|
||||
eventCh := make(chan eventLite, 1)
|
||||
|
||||
cfg := Config{
|
||||
Logger: quietLogger(),
|
||||
HarborDomain: "harbor.openova.io",
|
||||
TickInterval: 50 * time.Millisecond,
|
||||
Now: func() time.Time { return time.Unix(1700000000, 0) },
|
||||
}
|
||||
eng, err := NewEngine(cfg,
|
||||
[]Evaluator{NewHarborEvaluator(cfg)},
|
||||
rec,
|
||||
func(id string) (Snapshot, []string, error) {
|
||||
if id == "" {
|
||||
return nil, []string{"alpha", "beta"}, nil
|
||||
}
|
||||
s, ok := snaps[id]
|
||||
if !ok {
|
||||
return nil, nil, errors.New("no snap")
|
||||
}
|
||||
return s, nil, nil
|
||||
},
|
||||
func(_ map[string]struct{}) (<-chan eventLite, func()) {
|
||||
return eventCh, func() {}
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEngine: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
|
||||
defer cancel()
|
||||
done := make(chan struct{})
|
||||
go func() { _ = eng.Run(ctx); close(done) }()
|
||||
|
||||
deadline := time.After(1 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-deadline:
|
||||
t.Fatalf("ticker never produced both pass + fail rows; got %v", rec.Snapshot())
|
||||
default:
|
||||
entries := rec.Snapshot()
|
||||
seenPass, seenFail := false, false
|
||||
for _, e := range entries {
|
||||
if e.cluster == "alpha" && e.report.Result == ResultPass {
|
||||
seenPass = true
|
||||
}
|
||||
if e.cluster == "beta" && e.report.Result == ResultFail {
|
||||
seenFail = true
|
||||
}
|
||||
}
|
||||
if seenPass && seenFail {
|
||||
cancel()
|
||||
<-done
|
||||
return
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_RejectsInvalidConfig(t *testing.T) {
|
||||
cases := map[string]Config{
|
||||
"missing-logger": {},
|
||||
}
|
||||
for name, cfg := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
_, err := NewEngine(cfg, []Evaluator{}, &recorder{}, nil, nil)
|
||||
if err == nil {
|
||||
t.Fatalf("want error for %q, got nil", name)
|
||||
}
|
||||
})
|
||||
}
|
||||
// Logger-OK but missing other deps → still error.
|
||||
cfgOK := Config{Logger: quietLogger()}
|
||||
if _, err := NewEngine(cfgOK, []Evaluator{}, nil, nil, nil); err == nil {
|
||||
t.Fatalf("missing publisher should error")
|
||||
}
|
||||
if _, err := NewEngine(cfgOK, []Evaluator{}, &recorder{}, nil, nil); err == nil {
|
||||
t.Fatalf("missing resolveSnapshot should error")
|
||||
}
|
||||
if _, err := NewEngine(cfgOK, []Evaluator{}, &recorder{},
|
||||
func(string) (Snapshot, []string, error) { return nil, nil, nil },
|
||||
nil); err == nil {
|
||||
t.Fatalf("missing subscribe should error")
|
||||
}
|
||||
if _, err := NewEngine(cfgOK, nil, &recorder{},
|
||||
func(string) (Snapshot, []string, error) { return nil, nil, nil },
|
||||
func(map[string]struct{}) (<-chan eventLite, func()) { return nil, nil }); err == nil {
|
||||
t.Fatalf("empty evaluator slice should error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_DoubleRunRefused(t *testing.T) {
|
||||
cfg := Config{Logger: quietLogger()}
|
||||
cfg.defaults()
|
||||
rec := &recorder{}
|
||||
eventCh := make(chan eventLite)
|
||||
eng, err := NewEngine(cfg, []Evaluator{NewFluxEvaluator(cfg)}, rec,
|
||||
func(string) (Snapshot, []string, error) { return &fakeSnapshot{}, nil, nil },
|
||||
func(map[string]struct{}) (<-chan eventLite, func()) {
|
||||
return eventCh, func() {}
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("NewEngine: %v", err)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() { _ = eng.Run(ctx) }()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
if err := eng.Run(context.Background()); err == nil {
|
||||
t.Fatalf("second Run should error")
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
|
||||
// Sanity that the package's helper conversions don't drop fields.
|
||||
func TestEventLiteFromUnstructured(t *testing.T) {
|
||||
pod := uPod("ns", "n")
|
||||
ev := EventLiteFromUnstructured("alpha", "pod", pod)
|
||||
if ev.Cluster != "alpha" || ev.Kind != "pod" || ev.Object != pod {
|
||||
t.Fatalf("EventLiteFromUnstructured dropped a field: %+v", ev)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,165 @@
// flux.go — Flux-managed evaluator.
//
// EPIC-1 (#1096) §4.3 row "Flux-managed (GitOps)".
//
// Logic (per `02-W-watcher-extension.md` brief):
//
//   - Pass if the target carries the well-known
//     `app.kubernetes.io/managed-by: flux` label.
//   - Pass if any ownerReference has APIVersion containing
//     `helm.toolkit.fluxcd.io` (HelmRelease) or
//     `kustomize.toolkit.fluxcd.io` (Kustomization).
//   - Fail otherwise.
//
// The evaluator runs against Pods (the per-event trigger path) but
// returns a SyntheticReport that points at the Pod's controller
// owner (Deployment / StatefulSet / DaemonSet) when one is
// resolvable. This matches the rest of the pipeline — score
// aggregation rolls up by workload, not by individual Pod.
//
// Pods that are themselves Flux-owned (rare but possible — Flux can
// manage a static Pod manifest) also pass.
//
// Both label key + Flux-owner-suffix are configurable via
// Config.FluxManagedByLabel / Config.FluxManagedByValue. Per
// docs/INVIOLABLE-PRINCIPLES.md #4.
package evaluators

import (
	"context"
	"strings"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
)
|
||||
|
||||
// FluxEvaluator implements `policy=flux-managed`.
|
||||
type FluxEvaluator struct {
|
||||
ManagedByLabel string
|
||||
ManagedByValue string
|
||||
}
|
||||
|
||||
// NewFluxEvaluator builds a FluxEvaluator from cfg.
|
||||
func NewFluxEvaluator(cfg Config) *FluxEvaluator {
|
||||
return &FluxEvaluator{
|
||||
ManagedByLabel: cfg.FluxManagedByLabel,
|
||||
ManagedByValue: cfg.FluxManagedByValue,
|
||||
}
|
||||
}
|
||||
|
||||
func (FluxEvaluator) Name() string { return "flux-managed" }
|
||||
|
||||
func (f *FluxEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport {
|
||||
if !isPod(target) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 1. Direct check on the Pod itself.
|
||||
if f.targetIsFluxOwned(target) {
|
||||
return []SyntheticReport{{
|
||||
Policy: f.Name(),
|
||||
Rule: f.Name(),
|
||||
Result: ResultPass,
|
||||
Resource: resourceFor(target),
|
||||
Namespace: target.GetNamespace(),
|
||||
Message: "Pod carries flux managed-by label or HelmRelease / Kustomization owner",
|
||||
Properties: map[string]string{"detection": "pod-direct"},
|
||||
}}
|
||||
}
|
||||
|
||||
// 2. Pod isn't directly Flux-managed — chase its controller.
|
||||
owner := f.lookupController(snap, target)
|
||||
if owner != nil && f.targetIsFluxOwned(owner) {
|
||||
return []SyntheticReport{{
|
||||
Policy: f.Name(),
|
||||
Rule: f.Name(),
|
||||
Result: ResultPass,
|
||||
Resource: resourceFor(target),
|
||||
Namespace: target.GetNamespace(),
|
||||
Message: "Pod's controller " + owner.GetKind() + "/" + owner.GetName() + " is Flux-managed",
|
||||
Properties: map[string]string{
|
||||
"detection": "controller-flux-owned",
|
||||
"controller": owner.GetKind() + "/" + owner.GetName(),
|
||||
"controllerNs": owner.GetNamespace(),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
return []SyntheticReport{{
|
||||
Policy: f.Name(),
|
||||
Rule: f.Name(),
|
||||
Result: ResultFail,
|
||||
Resource: resourceFor(target),
|
||||
Namespace: target.GetNamespace(),
|
||||
Message: "Pod and its controller carry no flux managed-by label and no Flux ownerRef",
|
||||
}}
|
||||
}
|
||||
|
||||
// targetIsFluxOwned applies the label + ownerRef check to a single
|
||||
// object. Pure function — no snapshot reads.
|
||||
func (f *FluxEvaluator) targetIsFluxOwned(target *unstructured.Unstructured) bool {
|
||||
if target == nil {
|
||||
return false
|
||||
}
|
||||
// Label check.
|
||||
if v, ok := target.GetLabels()[f.ManagedByLabel]; ok && strings.EqualFold(v, f.ManagedByValue) {
|
||||
return true
|
||||
}
|
||||
// ownerRef check — HelmRelease / Kustomization from
|
||||
// helm.toolkit.fluxcd.io / kustomize.toolkit.fluxcd.io.
|
||||
for _, ref := range target.GetOwnerReferences() {
|
||||
if strings.Contains(ref.APIVersion, "fluxcd.io") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// lookupController follows the Pod's controller ownerRef one hop
|
||||
// (Pod → ReplicaSet / StatefulSet / DaemonSet) and where applicable
|
||||
// chases the next hop (ReplicaSet → Deployment). Returns nil when
|
||||
// the controller can't be located in the snapshot.
|
||||
func (f *FluxEvaluator) lookupController(snap Snapshot, pod *unstructured.Unstructured) *unstructured.Unstructured {
|
||||
for _, ref := range pod.GetOwnerReferences() {
|
||||
switch ref.Kind {
|
||||
case "Deployment":
|
||||
return findInList(snap, "deployment", pod.GetNamespace(), ref.Name)
|
||||
case "StatefulSet":
|
||||
return findInList(snap, "statefulset", pod.GetNamespace(), ref.Name)
|
||||
case "DaemonSet":
|
||||
return findInList(snap, "daemonset", pod.GetNamespace(), ref.Name)
|
||||
case "ReplicaSet":
|
||||
rs := findInList(snap, "replicaset", pod.GetNamespace(), ref.Name)
|
||||
if rs == nil {
|
||||
return nil
|
||||
}
|
||||
// First check the RS itself — sometimes Flux
|
||||
// produces ReplicaSets directly without a Deployment.
|
||||
if f.targetIsFluxOwned(rs) {
|
||||
return rs
|
||||
}
|
||||
for _, rsOwner := range rs.GetOwnerReferences() {
|
||||
if rsOwner.Kind == "Deployment" {
|
||||
return findInList(snap, "deployment", pod.GetNamespace(), rsOwner.Name)
|
||||
}
|
||||
}
|
||||
return rs
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// findInList walks the snapshot and returns the first object matching
|
||||
// (kind, namespace, name). Returns nil on miss or list error.
|
||||
func findInList(snap Snapshot, kindName, namespace, name string) *unstructured.Unstructured {
|
||||
list, err := snap.List(kindName, labels.Everything())
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
for _, obj := range list {
|
||||
if obj.GetNamespace() == namespace && obj.GetName() == name {
|
||||
return obj
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
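For orientation, a rough in-package sketch of the direct-label pass path above. The evaluator fields, Pod fixture, label key/value and namespace are hypothetical test values, and a nil Snapshot is assumed to be acceptable here because lookupController is never reached on this branch.

// Illustrative only — fixture values are hypothetical, not defaults.
flux := &FluxEvaluator{
    ManagedByLabel: "app.kubernetes.io/managed-by",
    ManagedByValue: "flux",
}
pod := &unstructured.Unstructured{Object: map[string]any{
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": map[string]any{
        "namespace": "acme",
        "name": "frontend-7c5f",
        "labels": map[string]any{"app.kubernetes.io/managed-by": "flux"},
    },
}}
// Direct label match, so the snapshot is never consulted on this branch.
reports := flux.Evaluate(context.Background(), nil, pod)
// Expected: reports[0].Result == ResultPass, Properties["detection"] == "pod-direct".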
@ -0,0 +1,124 @@
// harbor.go — image-via-Harbor-proxy evaluator.
//
// EPIC-1 (#1096) §4.3 row "Images via Harbor proxy".
//
// Logic (per `02-W-watcher-extension.md` brief):
//
// - For each container in the Pod (containers + initContainers +
//   ephemeralContainers), parse the image reference.
// - Pass if image starts with `<HarborDomain>/proxy-ghcr/`,
//   `<HarborDomain>/<org>/`, or any of the operator-supplied
//   HarborAllowedPrefixes (e.g. internal mirrors).
// - Fail if the image references docker.io, ghcr.io, quay.io, etc.
//   directly — every image must traverse the Sovereign's Harbor
//   proxy for cosign verification + air-gap fallback.
// - When Config.HarborDomain is empty (Sovereign without Harbor
//   enabled) the evaluator skips every Pod.
//
// Per `feedback_never_hardcode_urls.md` the Harbor domain comes from
// runtime config — the Sovereign provisions Harbor at
// `harbor.<sovereign-domain>` and stamps the same value into the
// catalyst-api's CATALYST_HARBOR_DOMAIN env. Tests inject directly
// via Config.HarborDomain.
package evaluators

import (
    "context"
    "strings"

    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// HarborEvaluator implements `policy=harbor-proxy-pull`.
type HarborEvaluator struct {
    Domain string
    AllowedPrefixes []string
}

// NewHarborEvaluator builds a HarborEvaluator from cfg.
func NewHarborEvaluator(cfg Config) *HarborEvaluator {
    return &HarborEvaluator{
        Domain: cfg.HarborDomain,
        AllowedPrefixes: append([]string(nil), cfg.HarborAllowedPrefixes...),
    }
}

func (HarborEvaluator) Name() string { return "harbor-proxy-pull" }

func (h *HarborEvaluator) Evaluate(ctx context.Context, _ Snapshot, target *unstructured.Unstructured) []SyntheticReport {
    if !isPod(target) {
        return nil
    }
    if h.Domain == "" {
        return []SyntheticReport{{
            Policy: h.Name(),
            Rule: h.Name(),
            Result: ResultSkip,
            Resource: resourceFor(target),
            Namespace: target.GetNamespace(),
            Message: "Harbor not enabled on this Sovereign — evaluator skipped",
        }}
    }

    imgs := containerImages(target)
    if len(imgs) == 0 {
        return []SyntheticReport{{
            Policy: h.Name(),
            Rule: h.Name(),
            Result: ResultSkip,
            Resource: resourceFor(target),
            Namespace: target.GetNamespace(),
            Message: "Pod has no containers — nothing to check",
        }}
    }

    domainPrefix := h.Domain + "/"
    rejected := []string{}
    for _, img := range imgs {
        if h.imageAccepted(img, domainPrefix) {
            continue
        }
        rejected = append(rejected, img)
    }
    if len(rejected) == 0 {
        return []SyntheticReport{{
            Policy: h.Name(),
            Rule: h.Name(),
            Result: ResultPass,
            Resource: resourceFor(target),
            Namespace: target.GetNamespace(),
            Message: "all container images via Harbor proxy",
        }}
    }
    return []SyntheticReport{{
        Policy: h.Name(),
        Rule: h.Name(),
        Result: ResultFail,
        Resource: resourceFor(target),
        Namespace: target.GetNamespace(),
        Message: "container images bypass Harbor proxy: " + strings.Join(rejected, ", "),
        Properties: map[string]string{
            "rejectedImages": strings.Join(rejected, ","),
            "harborDomain": h.Domain,
        },
    }}
}

// imageAccepted returns true when the image string starts with the
// Harbor domain prefix or any of the allowed-prefix entries.
//
// Acceptance is a BYTE-PREFIX check, not a substring match, and the
// Harbor prefix always carries the trailing slash: an image such as
// `harbor.openova.io.evil.com/app` never matches `harbor.openova.io/`,
// and the Harbor domain appearing somewhere later inside an image ref
// does not count either. The check is case-sensitive (image refs are
// case-sensitive in OCI).
func (h *HarborEvaluator) imageAccepted(img, harborPrefix string) bool {
    if strings.HasPrefix(img, harborPrefix) {
        return true
    }
    for _, p := range h.AllowedPrefixes {
        if p != "" && strings.HasPrefix(img, p) {
            return true
        }
    }
    return false
}
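A small in-package sketch of the acceptance semantics, using a hypothetical Sovereign domain and image refs; acceptance is by byte prefix including the trailing slash.

// Hypothetical domain and image refs — illustration of imageAccepted.
h := &HarborEvaluator{Domain: "harbor.sovereign.example"}
prefix := h.Domain + "/"

// true: pulled through the Harbor proxy project.
h.imageAccepted("harbor.sovereign.example/proxy-ghcr/openova/catalyst-api:v1", prefix)
// false: direct upstream registry ref.
h.imageAccepted("ghcr.io/openova/catalyst-api:v1", prefix)
// false: prefix check, not substring, so a look-alike host never matches.
h.imageAccepted("harbor.sovereign.example.evil.com/app:v1", prefix)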
@ -0,0 +1,207 @@
// hpa.go — HPA-effective evaluator.
//
// EPIC-1 (#1096) §4.3 row "Autoscaler (HPA/VPA) effective".
//
// Logic (per `02-W-watcher-extension.md` brief):
// - If the target is a Pod, walk owner chain Pod → ReplicaSet →
//   Deployment to identify the workload owner (StatefulSet /
//   DaemonSet are also acceptable owners). Pods owned by Jobs /
//   CronJobs / standalone are out-of-scope (skip).
// - Find an HPA (autoscaling/v2) whose spec.scaleTargetRef points
//   at the workload owner.
// - No HPA → result=skip (HPA isn't applicable to this workload —
//   e.g. a singleton control-plane pod).
// - HPA present but currentReplicas < minReplicas → result=fail
//   (the autoscaler isn't keeping the floor).
// - HPA present + currentReplicas >= minReplicas → result=pass.
//
// The score aggregator (slice S1) drops `skip` rows from the
// denominator so this evaluator does NOT punish workloads that
// legitimately have no HPA.
package evaluators

import (
    "context"
    "fmt"
    "strings"

    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/labels"
)

// HPAEvaluator implements `policy=hpa-effective`.
type HPAEvaluator struct {
    // MinFloor — synthetic floor below which the evaluator emits FAIL
    // even when the HPA reports happiness. Defaults to Config.HPAMinReplicas
    // at engine wiring.
    MinFloor int32
}

// NewHPAEvaluator constructs an HPAEvaluator with values copied from cfg.
func NewHPAEvaluator(cfg Config) *HPAEvaluator {
    return &HPAEvaluator{MinFloor: cfg.HPAMinReplicas}
}

// Name — canonical policy id.
func (HPAEvaluator) Name() string { return "hpa-effective" }

func (h *HPAEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport {
    if !isPod(target) {
        return nil
    }
    // 1. Resolve workload owner via Pod → ReplicaSet → Deployment chain.
    ownerKind, ownerName, ownerNamespace := resolveWorkloadOwner(snap, target)
    if ownerKind == "" || ownerName == "" {
        // Standalone Pod / Job-owned — out of scope.
        return []SyntheticReport{newSkip(h.Name(), target, "pod has no controller workload owner")}
    }

    // 2. Find an HPA whose scaleTargetRef matches.
    hpa := findHPAFor(snap, ownerKind, ownerName, ownerNamespace)
    if hpa == nil {
        return []SyntheticReport{newSkip(h.Name(), target, fmt.Sprintf("no HPA targets %s/%s in %s", ownerKind, ownerName, ownerNamespace))}
    }

    // 3. Compare currentReplicas vs minReplicas (and the synthetic
    // floor MinFloor).
    min, _ := hpaMinReplicas(hpa)
    current, _ := hpaCurrentReplicas(hpa)
    if min < h.MinFloor {
        min = h.MinFloor
    }
    props := map[string]string{
        "hpaName": hpa.GetName(),
        "hpaNamespace": hpa.GetNamespace(),
        "minReplicas": fmt.Sprintf("%d", min),
        "currentReplicas": fmt.Sprintf("%d", current),
        "workloadKind": ownerKind,
        "workloadName": ownerName,
        "workloadNamespace": ownerNamespace,
    }
    if current < min {
        return []SyntheticReport{{
            Policy: h.Name(),
            Rule: h.Name(),
            Result: ResultFail,
            Resource: resourceFor(target),
            Namespace: target.GetNamespace(),
            Message: fmt.Sprintf("HPA %s/%s reports currentReplicas=%d below minReplicas=%d", hpa.GetNamespace(), hpa.GetName(), current, min),
            Properties: props,
        }}
    }
    return []SyntheticReport{{
        Policy: h.Name(),
        Rule: h.Name(),
        Result: ResultPass,
        Resource: resourceFor(target),
        Namespace: target.GetNamespace(),
        Message: fmt.Sprintf("HPA %s/%s satisfies minReplicas=%d (current=%d)", hpa.GetNamespace(), hpa.GetName(), min, current),
        Properties: props,
    }}
}

// resolveWorkloadOwner walks Pod → controller → (ReplicaSet →
// Deployment) and returns (kind, name, namespace) of the top-level
// workload. Returns ("","","") for standalone or Job-owned Pods.
func resolveWorkloadOwner(snap Snapshot, pod *unstructured.Unstructured) (string, string, string) {
    if pod == nil {
        return "", "", ""
    }
    ns := pod.GetNamespace()
    for _, ref := range pod.GetOwnerReferences() {
        // Direct workload owners — StatefulSet, DaemonSet are
        // terminal here.
        switch ref.Kind {
        case "StatefulSet", "DaemonSet", "Deployment":
            return ref.Kind, ref.Name, ns
        case "ReplicaSet":
            // Hop through ReplicaSet to Deployment if the RS itself
            // has a Deployment owner. We look up the RS in the
            // snapshot — if it isn't cached, fall back to "ReplicaSet"
            // directly (HPA can target ReplicaSet too, rarely).
            rsList, err := snap.List("replicaset", labels.Everything())
            if err == nil {
                for _, rs := range rsList {
                    if rs.GetName() == ref.Name && rs.GetNamespace() == ns {
                        for _, rsOwner := range rs.GetOwnerReferences() {
                            if rsOwner.Kind == "Deployment" {
                                return "Deployment", rsOwner.Name, ns
                            }
                        }
                    }
                }
            }
            return "ReplicaSet", ref.Name, ns
        case "Job":
            // Job-owned pods are out of scope — return empty and let
            // the caller emit skip.
            return "", "", ""
        }
    }
    return "", "", ""
}

// findHPAFor scans every HPA in the snapshot and returns the first
// one whose spec.scaleTargetRef matches (kind, name, namespace).
// Returns nil when no match.
//
// HPA is a namespace-scoped resource; we iterate every namespace's
// HPA in the cache. Cost is O(hpa-count); typical clusters have
// O(10) HPAs so this is cheap.
//
// Kind name "horizontalpodautoscaler" — the canonical k8scache name
// for autoscaling/v2 HPAs (registered by the operator via the kinds
// ConfigMap on Sovereigns that opt-in; absent on others). When the
// kind is not registered the Snapshot.List returns an error and we
// gracefully report nil (caller emits skip).
func findHPAFor(snap Snapshot, ownerKind, ownerName, ownerNamespace string) *unstructured.Unstructured {
    hpaList, err := snap.List("horizontalpodautoscaler", labels.Everything())
    if err != nil {
        return nil
    }
    for _, hpa := range hpaList {
        if hpa.GetNamespace() != ownerNamespace {
            continue
        }
        ref, found, _ := unstructured.NestedMap(hpa.Object, "spec", "scaleTargetRef")
        if !found {
            continue
        }
        kind, _ := ref["kind"].(string)
        name, _ := ref["name"].(string)
        if strings.EqualFold(kind, ownerKind) && name == ownerName {
            return hpa
        }
    }
    return nil
}

// hpaMinReplicas extracts spec.minReplicas (int32 default 1).
func hpaMinReplicas(hpa *unstructured.Unstructured) (int32, bool) {
    v, found, err := unstructured.NestedInt64(hpa.Object, "spec", "minReplicas")
    if err != nil || !found {
        return 1, false
    }
    return int32(v), true
}

// hpaCurrentReplicas extracts status.currentReplicas (int32 default 0).
func hpaCurrentReplicas(hpa *unstructured.Unstructured) (int32, bool) {
    v, found, err := unstructured.NestedInt64(hpa.Object, "status", "currentReplicas")
    if err != nil || !found {
        return 0, false
    }
    return int32(v), true
}

// newSkip — small helper so each branch above stays readable.
func newSkip(policy string, target *unstructured.Unstructured, msg string) SyntheticReport {
    return SyntheticReport{
        Policy: policy,
        Rule: policy,
        Result: ResultSkip,
        Resource: resourceFor(target),
        Namespace: target.GetNamespace(),
        Message: msg,
    }
}
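For reference, a minimal sketch of the unstructured HPA shape the helpers above read; workload names, namespace and replica counts are hypothetical, and NestedInt64 only accepts int64-typed values.

// Minimal autoscaling/v2 HPA as seen by findHPAFor, hpaMinReplicas and
// hpaCurrentReplicas; every other field on the object is ignored.
hpa := &unstructured.Unstructured{Object: map[string]any{
    "apiVersion": "autoscaling/v2",
    "kind": "HorizontalPodAutoscaler",
    "metadata": map[string]any{"namespace": "acme", "name": "frontend"},
    "spec": map[string]any{
        "scaleTargetRef": map[string]any{"kind": "Deployment", "name": "frontend"},
        "minReplicas": int64(2), // must be int64 for NestedInt64 to find it
    },
    "status": map[string]any{"currentReplicas": int64(1)},
}}
// hpaMinReplicas(hpa) → 2, hpaCurrentReplicas(hpa) → 1,
// so current < min and the evaluator emits ResultFail for the owning Pods.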
@ -0,0 +1,127 @@
// hubble.go — Hubble flow-observed evaluator.
//
// EPIC-1 (#1096) §4.3 row "Hubble flows observed (last 5m)".
//
// **Status — DEFERRED HUBBLE CLIENT**:
//
// The brief permits this evaluator to defer when the Hubble Observer
// gRPC client (`github.com/cilium/cilium/api/v1/observer`) is not in
// the catalyst-api dependency graph. As of this slice (W2) the
// catalyst-api has NOT pulled `github.com/cilium/cilium` — adding it
// would import the full Cilium operator graph (~30 transitive Go
// modules; ~80MB on `go mod download`). The Coordinator's standing
// rule on dep weight is to land the synthetic-row plumbing first +
// wire the Hubble client in a follow-up slice once the score
// aggregator has a real consumer.
//
// Behaviour TODAY:
//
// - When Config.HubbleEnabled == false (the default), the evaluator
//   emits result=skip for every Pod. Score aggregator drops skip
//   rows from the denominator → no false negatives on the score.
// - When Config.HubbleEnabled == true (after the follow-up wires
//   the client), the evaluator queries the Observer API for the
//   last Config.HubbleLookbackWindow (default 5min) of flows
//   touching the Pod's IP. Pass when ≥1 flow seen, fail otherwise.
//
// Wiring of the actual Hubble client lives in the follow-up issue;
// this file documents the contract so the consumer (slice S1) can
// plan around the skip-then-pass transition without re-tooling.
package evaluators

import (
    "context"

    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// HubbleEvaluator implements `policy=hubble-flows-seen`.
type HubbleEvaluator struct {
    Enabled bool

    // Lookback — passed to the Observer API client when Enabled.
    // Stored so a future implementation reads it; not used today.
    LookbackSeconds int64

    // Probe — pluggable client used in the Enabled branch.
    // Production wiring (follow-up slice) sets this to a thin
    // wrapper around the cilium/cilium observer gRPC client. Tests
    // inject a fake to exercise pass/fail without the Hubble dep.
    Probe HubbleProbe
}

// HubbleProbe is the pluggable interface the evaluator calls when
// Enabled. The implementation queries the Hubble Observer API for
// flows touching the given Pod within the given lookback window.
type HubbleProbe interface {
    // FlowsSeen returns true if Hubble has observed at least one flow
    // to/from the Pod's IP/UID in the last `lookbackSeconds` seconds.
    // Errors fail open: the evaluator surfaces them as result=warn
    // rather than fail, so a Pod is not punished for an unreachable
    // Hubble Observer; ops can detect Hubble outages via the
    // cluster-level alarm.
    FlowsSeen(ctx context.Context, pod *unstructured.Unstructured, lookbackSeconds int64) (bool, error)
}

// NewHubbleEvaluator builds a HubbleEvaluator from cfg.
func NewHubbleEvaluator(cfg Config) *HubbleEvaluator {
    return &HubbleEvaluator{
        Enabled: cfg.HubbleEnabled,
        LookbackSeconds: int64(cfg.HubbleLookbackWindow.Seconds()),
    }
}

func (HubbleEvaluator) Name() string { return "hubble-flows-seen" }

func (h *HubbleEvaluator) Evaluate(ctx context.Context, _ Snapshot, target *unstructured.Unstructured) []SyntheticReport {
    if !isPod(target) {
        return nil
    }
    if !h.Enabled || h.Probe == nil {
        return []SyntheticReport{{
            Policy: h.Name(),
            Rule: h.Name(),
            Result: ResultSkip,
            Resource: resourceFor(target),
            Namespace: target.GetNamespace(),
            Message: "Hubble UI not enabled on this Sovereign — evaluator skipped",
            Properties: map[string]string{
                "reason": "hubble-disabled",
            },
        }}
    }

    seen, err := h.Probe.FlowsSeen(ctx, target, h.LookbackSeconds)
    if err != nil {
        // Fail open — surface as warn so the score isn't depressed
        // by a transient Hubble Observer outage.
        return []SyntheticReport{{
            Policy: h.Name(),
            Rule: h.Name(),
            Result: ResultWarn,
            Resource: resourceFor(target),
            Namespace: target.GetNamespace(),
            Message: "Hubble Observer unreachable — flow check inconclusive",
            Properties: map[string]string{
                "err": err.Error(),
            },
        }}
    }
    if seen {
        return []SyntheticReport{{
            Policy: h.Name(),
            Rule: h.Name(),
            Result: ResultPass,
            Resource: resourceFor(target),
            Namespace: target.GetNamespace(),
            Message: "Hubble observed flows to/from Pod within lookback window",
        }}
    }
    return []SyntheticReport{{
        Policy: h.Name(),
        Rule: h.Name(),
        Result: ResultFail,
        Resource: resourceFor(target),
        Namespace: target.GetNamespace(),
        Message: "Hubble observed no flows to/from Pod within lookback window",
    }}
}
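A minimal sketch of the kind of test double the Enabled branch can be exercised with once the follow-up lands; the type name and field layout here are invented for illustration.

// fakeProbe — hypothetical in-package test double satisfying HubbleProbe.
type fakeProbe struct {
    seen bool
    err  error
}

func (p fakeProbe) FlowsSeen(ctx context.Context, pod *unstructured.Unstructured, lookbackSeconds int64) (bool, error) {
    return p.seen, p.err
}

// h := &HubbleEvaluator{Enabled: true, LookbackSeconds: 300, Probe: fakeProbe{seen: true}}
// → Evaluate emits ResultPass; a probe error → ResultWarn; seen=false → ResultFail.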
@ -0,0 +1,130 @@
// otel.go — OTel auto-instrumentation evaluator.
//
// EPIC-1 (#1096) §4.3 row "OTel auto-instrumentation present".
//
// Logic (per `02-W-watcher-extension.md` brief):
//
// 1. For each Pod: check container list for an `otel-collector`
//    sidecar (heuristic: image substring matches
//    Config.OTelSidecarImageMatch — default `opentelemetry-collector`).
//    If found → result=pass.
// 2. OTel Operator auto-injection: check if the Pod has an annotation
//    with prefix `instrumentation.opentelemetry.io/inject-` whose
//    value is `true`. Combined with an Instrumentation CR existing in
//    the Pod's namespace → result=pass.
// 3. Neither → result=fail.
//
// The Instrumentation CR is a namespaced opentelemetry.io/v1alpha1
// resource. When the kind is not registered in the k8scache (the
// bp-otel-operator chart isn't installed on this Sovereign) the
// evaluator falls back to sidecar-only detection — annotation alone
// without the operator running is meaningless.
package evaluators

import (
    "context"
    "strings"

    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/labels"
)

// OTelEvaluator implements `policy=otel-injected`.
type OTelEvaluator struct {
    SidecarImageMatch string
    InjectAnnotationPrefix string
    InstrumentationKindName string
}

// NewOTelEvaluator builds an OTelEvaluator from cfg.
func NewOTelEvaluator(cfg Config) *OTelEvaluator {
    return &OTelEvaluator{
        SidecarImageMatch: cfg.OTelSidecarImageMatch,
        InjectAnnotationPrefix: cfg.OTelInjectAnnotationPrefix,
        InstrumentationKindName: cfg.OTelInstrumentationKind,
    }
}

func (OTelEvaluator) Name() string { return "otel-injected" }

func (o *OTelEvaluator) Evaluate(ctx context.Context, snap Snapshot, target *unstructured.Unstructured) []SyntheticReport {
    if !isPod(target) {
        return nil
    }

    // 1. Sidecar check — substring match against container images.
    for _, img := range containerImages(target) {
        if o.SidecarImageMatch != "" && strings.Contains(img, o.SidecarImageMatch) {
            return []SyntheticReport{{
                Policy: o.Name(),
                Rule: o.Name(),
                Result: ResultPass,
                Resource: resourceFor(target),
                Namespace: target.GetNamespace(),
                Message: "OTel collector sidecar detected (image=" + img + ")",
                Properties: map[string]string{
                    "detection": "sidecar",
                    "image": img,
                },
            }}
        }
    }

    // 2. Auto-inject path — Pod annotation + Instrumentation CR in
    // the same namespace.
    annots := target.GetAnnotations()
    injectAnnotation := ""
    for k, v := range annots {
        if strings.HasPrefix(k, o.InjectAnnotationPrefix) && strings.EqualFold(v, "true") {
            injectAnnotation = k
            break
        }
    }
    if injectAnnotation != "" {
        // Look up Instrumentation CR in the same namespace.
        instList, err := snap.List(o.InstrumentationKindName, labels.Everything())
        if err == nil {
            for _, inst := range instList {
                if inst.GetNamespace() == target.GetNamespace() {
                    return []SyntheticReport{{
                        Policy: o.Name(),
                        Rule: o.Name(),
                        Result: ResultPass,
                        Resource: resourceFor(target),
                        Namespace: target.GetNamespace(),
                        Message: "OTel auto-inject annotation present + Instrumentation CR in namespace",
                        Properties: map[string]string{
                            "detection": "auto-inject",
                            "annotation": injectAnnotation,
                            "instrumentationName": inst.GetName(),
                        },
                    }}
                }
            }
        }
        // Annotation present but no Instrumentation CR — operator
        // not installed. Surface as fail with a hint.
        return []SyntheticReport{{
            Policy: o.Name(),
            Rule: o.Name(),
            Result: ResultFail,
            Resource: resourceFor(target),
            Namespace: target.GetNamespace(),
            Message: "OTel auto-inject annotation set but no Instrumentation CR in namespace — operator missing",
            Properties: map[string]string{
                "detection": "auto-inject-orphan",
                "annotation": injectAnnotation,
            },
        }}
    }

    // 3. Neither path matched.
    return []SyntheticReport{{
        Policy: o.Name(),
        Rule: o.Name(),
        Result: ResultFail,
        Resource: resourceFor(target),
        Namespace: target.GetNamespace(),
        Message: "no OTel collector sidecar and no auto-inject annotation",
    }}
}
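A hypothetical fixture for the auto-inject path above; the annotation key assumes the default `instrumentation.opentelemetry.io/inject-` prefix, and an Instrumentation CR still has to exist in the Pod's namespace (via the snapshot) for the pass branch to be taken.

// Illustrative Pod that the annotation loop above picks up; namespace,
// name and the language suffix on the annotation key are made up.
pod := &unstructured.Unstructured{Object: map[string]any{
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": map[string]any{
        "namespace": "acme",
        "name": "checkout-5d9f",
        "annotations": map[string]any{
            "instrumentation.opentelemetry.io/inject-java": "true",
        },
    },
}}
// With an Instrumentation CR cached in "acme": ResultPass, detection=auto-inject.
// Without one: ResultFail, detection=auto-inject-orphan (operator missing).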
@ -72,6 +72,19 @@ const (
    EventDeleted EventType = "DELETED"
)

// KindComplianceEvaluator is the canonical kind name carried on every
// synthetic PolicyReport-shaped event published by evaluators in
// internal/k8scache/evaluators (EPIC-1 #1096 slice W2). The score
// aggregator (slice S1) subscribes to this kind alongside the upstream
// `policyreport` / `clusterpolicyreport` kinds — a single SSE stream,
// uniform shape.
//
// The synthetic kind is NOT registered as a watch target — it has no
// GVR and the factory never LISTs a real K8s resource for it.
// Subscribers filter on it via the same `?kinds=` query parameter that
// gates pods, services, etc.
const KindComplianceEvaluator = "compliance-evaluator"

// Event is a single K8s state-change notification. Encoded directly
// into the SSE frame — field names are part of the public wire
// contract and must stay stable.
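A sketch of how an S1-style consumer could filter on the synthetic kind, reusing the same Subscribe call the package tests use; the subscriber id is made up here.

// Hypothetical score-aggregator subscription: the synthetic kind plus the
// two upstream report kinds arrive on one channel with a uniform shape.
ch, unsub := f.Subscribe("score-aggregator", map[string]struct{}{
    KindComplianceEvaluator: {},
    "policyreport": {},
    "clusterpolicyreport": {},
})
defer unsub()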
@ -637,6 +650,21 @@ func (f *Factory) dispatch(cs *clusterState, k Kind, t EventType, obj any) {
    f.fanout(ev)
}

// Publish injects a non-watch-derived Event onto the SSE fanout. Used
// by internal/k8scache/evaluators (EPIC-1 #1096 slice W2) to emit
// synthetic PolicyReport-shaped events on the
// `compliance-evaluator` kind. The event MUST set Cluster, Kind,
// Type, Object, At — the factory does NOT mutate or redact synthetic
// payloads (the producer owns them).
//
// Concurrency: safe for concurrent callers; the underlying fanout
// holds subMu briefly to snapshot the subscriber set, then sends
// non-blocking with the same drop-oldest backpressure as informer
// events.
func (f *Factory) Publish(ev Event) {
    f.fanout(ev)
}

// fanout — non-blocking send to each subscriber. On full channel we
// drop the OLDEST event (drain one then push) so the consumer
// catches up automatically without the producer waiting.
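And a sketch of the producer side, assuming the Event fields named in the Publish comment above (Cluster, Kind, Type, Object, At); the cluster id and the synthetic object variable are hypothetical.

// Hypothetical evaluator-engine call: wrap a synthetic PolicyReport-shaped
// unstructured object and re-enter the same fanout the informers use.
f.Publish(Event{
    Cluster: "alpha",
    Kind: KindComplianceEvaluator,
    Type: EventAdded,
    Object: syntheticReportObj, // *unstructured.Unstructured built by the engine
    At: time.Now(),
})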
@ -116,6 +116,8 @@ func TestDefaultKinds_GraphAndDashboardSurface(t *testing.T) {
        "persistentvolume", "replicaset", "endpointslice",
        // dashboard color_by=utilization depends on this (#1084)
        "podmetrics",
        // EPIC-1 (#1096) compliance — Kyverno PolicyReports.
        "policyreport", "clusterpolicyreport",
    }
    for _, name := range mandatory {
        if _, ok := r.Get(name); !ok {
@ -603,3 +605,152 @@ func itoa(n int) string {
// keep corev1 referenced to prevent the import being elided when this
// file changes.
var _ = corev1.NamespaceDefault

// ── EPIC-1 (#1096) — PolicyReport SSE fanout ─────────────────────

// newPolicyReport returns a tiny unstructured wgpolicyk8s.io PolicyReport.
// Mirrors the schema produced by the Kyverno reports controller — fields
// kept minimal because the score aggregator (slice S1) reads only
// `metadata.{name,namespace,labels,ownerReferences}` and `results[]`.
//
// Results pass through DeepCopyJSON so the slice must be typed `[]any`
// (not `[]map[string]any`) — DeepCopyJSON refuses unknown concrete
// slice element types.
func newPolicyReport(ns, name string, results []any) *unstructured.Unstructured {
    return &unstructured.Unstructured{Object: map[string]any{
        "apiVersion": "wgpolicyk8s.io/v1alpha2",
        "kind": "PolicyReport",
        "metadata": map[string]any{
            "namespace": ns,
            "name": name,
            "resourceVersion": "1",
        },
        "results": results,
    }}
}

func newClusterPolicyReport(name string, results []any) *unstructured.Unstructured {
    return &unstructured.Unstructured{Object: map[string]any{
        "apiVersion": "wgpolicyk8s.io/v1alpha2",
        "kind": "ClusterPolicyReport",
        "metadata": map[string]any{
            "name": name,
            "resourceVersion": "1",
        },
        "results": results,
    }}
}

// policyReportRegistry — minimal registry with the two PolicyReport
// kinds. Used by the W1 fanout test below.
func policyReportRegistry() *Registry {
    r := NewRegistry()
    _ = r.Add(Kind{
        Name: "policyreport",
        GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"},
        Namespaced: true,
    })
    _ = r.Add(Kind{
        Name: "clusterpolicyreport",
        GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"},
        Namespaced: false,
    })
    return r
}

// fakePolicyReportClients — like fakeClients but seeds the discovery
// scheme with the wgpolicyk8s.io types so the dynamic informer's LIST +
// WATCH succeed against the in-memory store.
func fakePolicyReportClients(objs ...runtime.Object) (*dynamicfake.FakeDynamicClient, clientgokubernetes.Interface) {
    scheme := runtime.NewScheme()
    scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "PolicyReportList"}, &unstructured.UnstructuredList{})
    scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "PolicyReport"}, &unstructured.Unstructured{})
    scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "ClusterPolicyReportList"}, &unstructured.UnstructuredList{})
    scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "wgpolicyk8s.io", Version: "v1alpha2", Kind: "ClusterPolicyReport"}, &unstructured.Unstructured{})
    gvrToListKind := map[schema.GroupVersionResource]string{
        {Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}: "PolicyReportList",
        {Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}: "ClusterPolicyReportList",
    }
    dyn := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind, objs...)
    core := kfake.NewSimpleClientset()
    return dyn, core
}

// TestPolicyReport_FlowsThroughSSEFanout asserts W1 — applying a
// PolicyReport CR fires the same ADD event the architecture-graph kinds
// fire, with no special-case handling. Coverage:
// - PolicyReport (namespace-scoped)
// - ClusterPolicyReport (cluster-scoped) on the same factory
// - Subscriber filtered to compliance kinds receives both
func TestPolicyReport_FlowsThroughSSEFanout(t *testing.T) {
    dyn, core := fakePolicyReportClients()
    cfg := Config{
        Logger: quietLogger(),
        Registry: policyReportRegistry(),
        Clusters: []ClusterRef{
            {ID: "alpha", DynamicClient: dyn, CoreClient: core},
        },
    }
    f, err := NewFactory(cfg)
    if err != nil {
        t.Fatalf("NewFactory: %v", err)
    }
    defer f.Stop()

    ch, unsub := f.Subscribe("operator", map[string]struct{}{
        "policyreport": {},
        "clusterpolicyreport": {},
    })
    defer unsub()

    if err := f.Start(context.Background()); err != nil {
        t.Fatalf("Start: %v", err)
    }

    // Apply a namespaced PolicyReport.
    results := []any{
        map[string]any{
            "policy": "multi-replica-drainability",
            "rule": "multi-replica-drainability",
            "result": "fail",
            "message": "Deployment has only 1 replica",
        },
    }
    prGVR := schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}
    if _, err := dyn.Resource(prGVR).Namespace("acme").Create(context.Background(), newPolicyReport("acme", "pod-frontend-7c5f", results), metav1.CreateOptions{}); err != nil {
        t.Fatalf("create PolicyReport: %v", err)
    }

    // Apply a cluster-scoped ClusterPolicyReport.
    cprGVR := schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}
    if _, err := dyn.Resource(cprGVR).Create(context.Background(), newClusterPolicyReport("ns-acme", results), metav1.CreateOptions{}); err != nil {
        t.Fatalf("create ClusterPolicyReport: %v", err)
    }

    gotPR, gotCPR := false, false
    timeout := time.After(2 * time.Second)
    for !(gotPR && gotCPR) {
        select {
        case ev, ok := <-ch:
            if !ok {
                t.Fatalf("subscriber channel closed")
            }
            if ev.Type != EventAdded {
                continue
            }
            if ev.Kind == "policyreport" && ev.Object.GetName() == "pod-frontend-7c5f" {
                // Sanity check — body survives unmodified (PolicyReport
                // is non-sensitive so redact is a no-op).
                if results, _, _ := unstructured.NestedSlice(ev.Object.Object, "results"); len(results) != 1 {
                    t.Fatalf("PolicyReport.results not preserved through fanout")
                }
                gotPR = true
            }
            if ev.Kind == "clusterpolicyreport" && ev.Object.GetName() == "ns-acme" {
                gotCPR = true
            }
        case <-timeout:
            t.Fatalf("never received PolicyReport (got=%v) + ClusterPolicyReport (got=%v) ADD events", gotPR, gotCPR)
        }
    }
}
@ -120,6 +120,20 @@ var DefaultKinds = []Kind{
    // real Sovereign ships bp-metrics-server in the platform bundle,
    // so the utilization gradient renders out of the box.
    {Name: "podmetrics", GVR: schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "pods"}, Namespaced: true},

    // EPIC-1 (#1096) — Compliance: Kyverno PolicyReports.
    //
    // `wgpolicyk8s.io/v1alpha2/PolicyReport` is the namespace-scoped
    // per-resource compliance report Kyverno emits for every Pod /
    // Workload it audits. `ClusterPolicyReport` is the cluster-scoped
    // equivalent for cluster-scoped resources (Namespaces, Nodes,
    // CRDs, …). The score aggregator (slice S1) consumes both via the
    // same SSE fanout the architecture graph already uses — no special
    // path. The reports themselves carry no secret material (Kyverno
    // omits the offending object's data fields by design) so
    // Sensitive=false is correct.
    {Name: "policyreport", GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "policyreports"}, Namespaced: true},
    {Name: "clusterpolicyreport", GVR: schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha2", Resource: "clusterpolicyreports"}, Namespaced: false},
}

// Registry is a runtime-mutable lookup keyed by the short Name. It