feat(openova-flow): server (HTTP+SSE event router) + flux adapter (K8s informer sidecar) (#1390)

Agent #2 of 3 for OpenovaFlow. Ships the Go backend independently of
Agent #1's TS packages (@openova/flow-core + @openova/flow-canvas);
the FlowMessage JSON contract is locked between agents.

Two Go modules (separate go.mod each so the dep graphs stay decoupled):

- products/openova-flow/server/ — stateless HTTP+SSE event router.
  Map<flowId, RingBuffer<FlowMessage>>, in-memory, no DB. Endpoints:
  POST /v1/flows/{flowId}/events, GET /v1/flows/{flowId}/snapshot,
  GET /v1/flows/{flowId}/stream (SSE with 15s heartbeats + Last-Event-ID
  seq stamping), DELETE /v1/flows/{flowId}, GET /healthz, /readyz.
  Zero external Go deps (stdlib net/http). Ring cap default 4096
  (env-overridable). Locked schema validation rejects unknown envelope
  variants with 400.
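
The per-flow ring semantics can be sketched in a few lines of Go. This is a sketch only: the `FlowMessage` field names below are illustrative stand-ins for the locked contract owned by Agent #1's packages.

```go
package main

import "fmt"

// FlowMessage stands in for the locked envelope contract; field names
// here are illustrative only.
type FlowMessage struct {
	Seq  int
	Type string
}

// Ring is a fixed-capacity FIFO per flowId: the oldest envelope is
// dropped on overflow so memory stays bounded.
type Ring struct {
	buf  []FlowMessage
	size int
	next int // monotonically increasing sequence stamp
}

func NewRing(capacity int) *Ring { return &Ring{size: capacity} }

func (r *Ring) Append(m FlowMessage) {
	m.Seq = r.next
	r.next++
	if len(r.buf) == r.size {
		r.buf = r.buf[1:] // FIFO drop on overflow
	}
	r.buf = append(r.buf, m)
}

// Snapshot returns every buffered envelope with a sequence number
// greater than afterSeq; pass -1 for the full snapshot.
func (r *Ring) Snapshot(afterSeq int) []FlowMessage {
	var out []FlowMessage
	for _, m := range r.buf {
		if m.Seq > afterSeq {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	r := NewRing(2)
	r.Append(FlowMessage{Type: "a"})
	r.Append(FlowMessage{Type: "b"})
	r.Append(FlowMessage{Type: "c"}) // capacity 2: "a" is dropped
	fmt.Println(len(r.Snapshot(-1)), r.Snapshot(0)[0].Type) // 2 b
}
```

Last-Event-ID replay on the SSE stream can reduce to the same operation: the client resends the last sequence it saw, and the server tails from there.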

- products/openova-flow/adapter-flux/ — DaemonSet sidecar that watches
  helm.toolkit.fluxcd.io/v2 HelmRelease + source.toolkit.fluxcd.io
  HelmChart CRs via
  client-go's dynamicinformer.NewFilteredDynamicSharedInformerFactory
  (canonical seam: products/catalyst/bootstrap/api/internal/k8scache/factory.go),
  maps each event to FlowMessage via a pure-transform mapper, POSTs to
  the configured openova-flow-server with exponential-backoff retry.
  Status mapping: Ready=True → succeeded, InstallFailed/UpgradeFailed/
  RetriesExhausted → failed, Progressing/Unknown/other-False → running,
  no Ready yet → pending. FlowNode.id format "{REGION_KEY}/{hrName}"
  so multi-region renders correctly. Region-aware: a synthetic region
  parent FlowNode is emitted on bootstrap; dependsOn entries fan out
  into finish-to-start relationships.

Two wrapper charts under platform/openova-flow-{server,emitter}/chart/
(canonical seam: platform/qa-app/chart/ for the simple
Deployment+Service+SA shape; platform/k8s-ws-proxy/chart/ for the
DaemonSet+ClusterRole+ClusterRoleBinding shape). MIRROR-EVERYTHING:
image refs go through harbor.openova.io/proxy-ghcr/openova-io/...
Image tag and required runtime config fail fast at chart render via
_helpers.tpl, so a silent ImagePullBackOff / boot crash is impossible.

Two bootstrap-kit HRs added (slots 56 + 57):
- 56-bp-openova-flow-server (dependsOn: bp-cilium, bp-cert-manager) —
  installs on primary cluster only; Cilium Gateway HTTPRoute at
  openova-flow.<sovereignFQDN> for cross-cluster ingest.
- 57-bp-openova-flow-emitter (dependsOn: bp-flux) — DaemonSet, runs
  on every cluster (mother + Sovereign + every secondary region).

scripts/expected-bootstrap-deps.yaml updated; check-bootstrap-deps.sh
audit passes (drift=0, cycles=0).

Tests (all green):
- server contract_test.go — every FlowMessage variant round-trips JSON,
  unknown/malformed variants are rejected. Cross-flow Triggerer/ToFlowID
  preserved.
- server server_test.go — full HTTP surface, including SSE replay+tail
  with a real httptest.Server.
- adapter mapper_test.go — every HelmRelease.status.conditions[Ready]
  transition + multi-dependsOn fan-out + family-label/heuristic + region
  fallback.
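
The locked-schema rejection those contract tests exercise can be approximated with `encoding/json`'s `DisallowUnknownFields`; the `Envelope` struct below is a hypothetical stand-in for the real FlowMessage contract.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Envelope stands in for the locked FlowMessage contract; the real
// field set is defined in Agent #1's @openova/flow-core package.
type Envelope struct {
	Kind   string `json:"kind"`
	FlowID string `json:"flowId"`
}

// decodeStrict rejects any payload carrying fields outside the locked
// schema; the server turns such errors into a 400 response.
func decodeStrict(raw []byte) (Envelope, error) {
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields()
	var e Envelope
	err := dec.Decode(&e)
	return e, err
}

func main() {
	_, err := decodeStrict([]byte(`{"kind":"node","flowId":"f1"}`))
	fmt.Println(err == nil) // true
	_, err = decodeStrict([]byte(`{"kind":"node","surprise":1}`))
	fmt.Println(err != nil) // true: unknown field rejected
}
```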

Verification done locally:
- (cd products/openova-flow/server && go build ./... && go test ./...) — PASS
- (cd products/openova-flow/adapter-flux && go build ./... && go test ./...) — PASS
- helm template platform/openova-flow-server/chart/ — renders cleanly
- helm template platform/openova-flow-emitter/chart/ — renders cleanly
- bash scripts/check-bootstrap-deps.sh — PASS (drift=0)

Agent #3 follow-ups (called out in slot 57's HelmRelease comments):
- Thread SOVEREIGN_DEPLOYMENT_ID + REGION_KEY into the
  postBuild.substitute env in infra/hetzner/cloudinit-control-plane.tftpl
  so the emitter's flowId/regionKey become per-deployment + per-region
  automatically. Today the slot uses SOVEREIGN_FQDN as the flowId
  fallback and "primary" as the regionKey default; per-Sovereign overlays
  can override pre-Agent-#3.
- catalyst-api proxy at /sovereign/api/v1/flows/{id}/stream so the
  Sovereign Console canvas hits a single in-tree origin.

Co-authored-by: e3mrah <1234567+e3mrah@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
e3mrah 2026-05-11 15:36:54 +04:00 committed by GitHub
parent 16ec3399e9
commit aaaaadf8bc
GPG Key ID: B5690EEEBB952194
40 changed files with 3498 additions and 0 deletions

@@ -0,0 +1,97 @@
# bp-openova-flow-server — Catalyst bootstrap-kit Blueprint slot 56
# (Observability / OpenovaFlow event router).
#
# Stateless HTTP+SSE event router for OpenovaFlow. Emitters
# (bp-openova-flow-emitter on every cluster, catalyst-api proxy on the
# mother) POST FlowMessage envelopes; consumers (Sovereign Console
# canvas) GET snapshots and subscribe to the SSE stream.
#
# Architecture:
# - Primary-cluster only — one Service per Sovereign, reached
# cross-region via Cilium Gateway HTTPRoute over public HTTPS.
# No NetBird required for v1.
# - In-memory ring buffer per flowId (default 4096 envelopes).
# State is lost on Pod restart; emitters re-emit snapshot on
# reconnect.
# - Workload: single Deployment, ClusterIP Service, optional
# HTTPRoute for cross-cluster reachability.
#
# Wrapper chart: platform/openova-flow-server/chart/
# Catalyst-curated values: platform/openova-flow-server/chart/values.yaml
# Reconciled by: Flux on the new Sovereign's k3s control plane.
#
# dependsOn:
# - bp-cilium — Pod network + Gateway API for the operator-
# facing HTTPRoute.
# - bp-cert-manager — TLS for openova-flow.<sovereign-fqdn>.
#
# Per docs/INVIOLABLE-PRINCIPLES.md #1 (target-state) the chart ships
# the real workload. Per #4 (never hardcode) the hostname,
# ringCapacity, and image tag are operator-driven.
---
apiVersion: source.toolkit.fluxcd.io/v1beta2
kind: HelmRepository
metadata:
  name: bp-openova-flow-server
  namespace: flux-system
spec:
  type: oci
  interval: 15m
  url: oci://ghcr.io/openova-io
  secretRef:
    name: ghcr-pull
---
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-openova-flow-server
  namespace: flux-system
spec:
  interval: 15m
  releaseName: openova-flow-server
  # Lands in catalyst-system co-located with the rest of the
  # Catalyst control-plane stack (catalyst-api / catalyst-ui / etc.).
  targetNamespace: catalyst-system
  dependsOn:
    - name: bp-cilium
    - name: bp-cert-manager
  chart:
    spec:
      chart: bp-openova-flow-server
      version: 0.1.0
      sourceRef:
        kind: HelmRepository
        name: bp-openova-flow-server
        namespace: flux-system
  # Event-driven install: openova-flow-server is a single Deployment +
  # Service + ServiceAccount. Helm install completes when manifests
  # apply; readiness signalled via Flux dependsOn, never via
  # spec.timeout watchdogs.
  install:
    timeout: 15m
    disableWait: true
    remediation:
      retries: 3
  upgrade:
    timeout: 15m
    disableWait: true
    remediation:
      retries: 3
  # Per-Sovereign overlay surface. The Sovereign's FQDN is interpolated
  # at Flux apply time via the bootstrap-kit Kustomization's
  # postBuild.substitute env hook — `${SOVEREIGN_FQDN}` is replaced
  # with the concrete sovereign FQDN before the HR bytes land in the
  # cluster.
  values:
    flowServer:
      enabled: true
      httproute:
        # Default ON — cross-cluster emitters reach this server's
        # public HTTPS endpoint via the Cilium Gateway. Per-Sovereign
        # overlay disables when only the in-cluster Service is needed.
        enabled: true
        hostname: openova-flow.${SOVEREIGN_FQDN}
        gatewayRef:
          name: catalyst-gateway
          namespace: kube-system

@@ -0,0 +1,99 @@
# bp-openova-flow-emitter — Catalyst bootstrap-kit Blueprint slot 57
# (Observability / OpenovaFlow Flux adapter).
#
# Region-aware DaemonSet sidecar that watches HelmRelease + HelmChart
# CRs on the LOCAL cluster's Flux and POSTs FlowMessage envelopes to
# the configured openova-flow-server (slot 56, primary cluster only).
#
# Topology — runs on EVERY cluster (mother + primary Sovereign + every
# secondary region). The receiving server sits on the primary cluster;
# cross-cluster reachability is via the Cilium Gateway HTTPRoute over
# public HTTPS.
#
# Wrapper chart: platform/openova-flow-emitter/chart/
# Catalyst-curated values: platform/openova-flow-emitter/chart/values.yaml
# Reconciled by: Flux on the new Sovereign's k3s control plane.
#
# dependsOn:
# - bp-flux — informer needs Flux's helmrelease CRDs.
#
# Per docs/INVIOLABLE-PRINCIPLES.md #1 (target-state) the emitter runs
# from first cut on every cluster. Per #4 (never hardcode) the
# FLOW_SERVER_URL, FLOW_ID, and REGION_KEY all flow from the
# per-Sovereign overlay's substitute env.
---
apiVersion: source.toolkit.fluxcd.io/v1beta2
kind: HelmRepository
metadata:
  name: bp-openova-flow-emitter
  namespace: flux-system
spec:
  type: oci
  interval: 15m
  url: oci://ghcr.io/openova-io
  secretRef:
    name: ghcr-pull
---
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-openova-flow-emitter
  namespace: flux-system
spec:
  interval: 15m
  releaseName: openova-flow-emitter
  targetNamespace: catalyst-system
  dependsOn:
    - name: bp-flux
  chart:
    spec:
      chart: bp-openova-flow-emitter
      version: 0.1.0
      sourceRef:
        kind: HelmRepository
        name: bp-openova-flow-emitter
        namespace: flux-system
  install:
    timeout: 15m
    disableWait: true
    remediation:
      retries: 3
  upgrade:
    timeout: 15m
    disableWait: true
    remediation:
      retries: 3
  # Per-Sovereign overlay surface. ${SOVEREIGN_FQDN} is provided today
  # by the bootstrap-kit Kustomization's postBuild.substitute env hook
  # (see infra/hetzner/cloudinit-control-plane.tftpl).
  #
  # ${SOVEREIGN_DEPLOYMENT_ID} and ${REGION_KEY} are added by Agent #3's
  # integration PR — they need to be threaded into the
  # postBuild.substitute block in cloudinit-control-plane.tftpl alongside
  # SOVEREIGN_FQDN / SOVEREIGN_LB_IP. Until Agent #3 lands that, the
  # bootstrap-kit slot can be locally overridden in per-Sovereign Kustomize
  # overlays to inject concrete values.
  #
  # FlowID — falls back to SOVEREIGN_FQDN-derived id when DEPLOYMENT_ID is
  # not substituted. Agent #3 swaps this to the canonical deployment id.
  # RegionKey — defaults to "primary" pre-Agent-#3; per-Sovereign overlay
  # patches the value to the concrete region (e.g. "fsn1", "hel1").
  values:
    flowEmitter:
      enabled: true
      # The mothership-Harbor proxied server URL. Resolves to the
      # primary cluster's HTTPRoute through the Cilium Gateway.
      flowServerUrl: https://openova-flow.${SOVEREIGN_FQDN}
      # Per the OpenovaFlow contract, FlowID is the runtime
      # FlowInstance id — the Sovereign's deployment id is the
      # natural choice (one flow per deployment). Agent #3 substitutes
      # ${SOVEREIGN_DEPLOYMENT_ID} for the literal string below once the
      # postBuild env wiring is in place.
      flowId: ${SOVEREIGN_FQDN}
      # Region the adapter operates on. Per-Sovereign overlay patches
      # the value to the concrete region (e.g. "fsn1", "hel1") via the
      # secondary's bootstrap-kit Kustomize overlay; Agent #3's
      # integration adds substitution wiring for multi-region.
      regionKey: primary
      namespaceFilter: flux-system

@@ -71,6 +71,16 @@ resources:
# clustermesh-apiserver could not migrate from NodePort to LB on
# omantel multi-region (qa-loop iter-12 Fix #53D).
- 55-bp-hcloud-ccm.yaml
# OpenovaFlow observability cohort — slots 56/57. Three-agent split
# (Agent #1: TS @openova/flow-core + @openova/flow-canvas, Agent #2:
# Go server + flux adapter, Agent #3: bootstrap-kit + catalyst-api
# proxy integration). Slot 56 (server) installs on PRIMARY clusters
# only; per-Sovereign overlay disables on secondaries. Slot 57
# (emitter) is a DaemonSet — runs on every cluster (mother + every
# Sovereign + every secondary region) so each region's Flux events
# land in the same per-deployment flow.
- 56-bp-openova-flow-server.yaml
- 57-bp-openova-flow-emitter.yaml
# bp-newapi (slot 80) — multi-tenant LLM marketplace gateway. Sequenced
# after the W2.K1 dependency wave (cnpg/keycloak/openbao Ready) so
# NewAPI's ExternalSecret + DSN dependencies resolve on first reconcile.

@@ -0,0 +1,34 @@
apiVersion: v2
name: bp-openova-flow-emitter
# Catalyst-authored umbrella with no upstream Helm chart dependency —
# ships only the in-repo openova-flow-adapter-flux DaemonSet.
# Opt out of the hollow-chart guard per docs/BLUEPRINT-AUTHORING.md §11.1.
annotations:
  catalyst.openova.io/no-upstream: "true"
version: 0.1.0
appVersion: "0.1.0"
description: |
  Catalyst Blueprint chart for the openova-flow-adapter-flux emitter
  — a region-aware DaemonSet sidecar that watches Flux HelmRelease +
  HelmChart CRs on the local cluster and POSTs FlowMessage envelopes
  to a configured openova-flow-server.

  Runs on EVERY cluster (mother + primary Sovereign + every secondary
  region). The receiving openova-flow-server runs only on the primary
  cluster (slot 56). Cross-cluster reachability is via the Cilium
  Gateway HTTPRoute over public HTTPS.

  Per docs/INVIOLABLE-PRINCIPLES.md #4 (never hardcode) the
  FLOW_SERVER_URL, FLOW_ID, REGION_KEY, and NAMESPACE_FILTER are
  operator-driven via values.yaml; the chart fails fast when any
  required value is empty.

  Per MIRROR-EVERYTHING the image reference goes through the
  mothership Harbor proxy at harbor.openova.io/proxy-ghcr/...
keywords: [catalyst, blueprint, openova-flow, observability, flux]
home: https://docs.openova.io/openova-flow
sources:
  - https://github.com/openova-io/openova/tree/main/products/openova-flow/adapter-flux
maintainers:
  - name: openova-platform
    email: catalyst@openova.io

@@ -0,0 +1,61 @@
{{- define "bp-openova-flow-emitter.name" -}}
{{- default (.Chart.Name | trimPrefix "bp-") .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- define "bp-openova-flow-emitter.workloadName" -}}
{{- default "openova-flow-emitter" .Values.flowEmitter.workloadName -}}
{{- end -}}
{{- define "bp-openova-flow-emitter.labels" -}}
helm.sh/chart: {{ printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
app.kubernetes.io/name: {{ include "bp-openova-flow-emitter.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
catalyst.openova.io/blueprint: bp-openova-flow-emitter
{{- end -}}
{{- define "bp-openova-flow-emitter.selectorLabels" -}}
app.kubernetes.io/name: {{ include "bp-openova-flow-emitter.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end -}}
{{- define "bp-openova-flow-emitter.serviceAccountName" -}}
{{- if .Values.flowEmitter.serviceAccount.create -}}
{{- default (include "bp-openova-flow-emitter.workloadName" .) .Values.flowEmitter.serviceAccount.name -}}
{{- else -}}
{{- default "default" .Values.flowEmitter.serviceAccount.name -}}
{{- end -}}
{{- end -}}
{{/*
Image-tag fail-fast — INVIOLABLE-PRINCIPLES #4a.
*/}}
{{- define "bp-openova-flow-emitter.image" -}}
{{- $tag := .Values.flowEmitter.image.tag -}}
{{- if not $tag -}}
{{- fail "bp-openova-flow-emitter: .Values.flowEmitter.image.tag is empty — SHA-pinned image required (CI populates this)" -}}
{{- end -}}
{{- printf "%s:%s" .Values.flowEmitter.image.repository $tag -}}
{{- end -}}
{{/*
Required-config fail-fast — INVIOLABLE-PRINCIPLES #1 (target-state).
The adapter must have all three of FLOW_SERVER_URL, FLOW_ID,
REGION_KEY at boot or it fails immediately. Failing at chart render
gives the operator a clear error instead of an ImagePullBackOff
silence.
*/}}
{{- define "bp-openova-flow-emitter.requireConfig" -}}
{{- if not .Values.flowEmitter.flowServerUrl -}}
{{- fail "bp-openova-flow-emitter: .Values.flowEmitter.flowServerUrl is required" -}}
{{- end -}}
{{- if not .Values.flowEmitter.flowId -}}
{{- fail "bp-openova-flow-emitter: .Values.flowEmitter.flowId is required" -}}
{{- end -}}
{{- if not .Values.flowEmitter.regionKey -}}
{{- fail "bp-openova-flow-emitter: .Values.flowEmitter.regionKey is required" -}}
{{- end -}}
{{- end -}}

@@ -0,0 +1,70 @@
{{- if .Values.flowEmitter.enabled -}}
{{- include "bp-openova-flow-emitter.requireConfig" . -}}
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: {{ include "bp-openova-flow-emitter.workloadName" . }}
  namespace: {{ .Release.Namespace }}
  labels: {{- include "bp-openova-flow-emitter.labels" . | nindent 4 }}
spec:
  selector:
    matchLabels: {{- include "bp-openova-flow-emitter.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      labels: {{- include "bp-openova-flow-emitter.labels" . | nindent 8 }}
    spec:
      serviceAccountName: {{ include "bp-openova-flow-emitter.serviceAccountName" . }}
      securityContext:
        {{- toYaml .Values.flowEmitter.podSecurityContext | nindent 8 }}
      {{- with .Values.flowEmitter.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.flowEmitter.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.flowEmitter.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      containers:
        - name: openova-flow-adapter-flux
          image: {{ include "bp-openova-flow-emitter.image" . | quote }}
          imagePullPolicy: {{ .Values.flowEmitter.image.pullPolicy | default "IfNotPresent" }}
          ports:
            - name: health
              containerPort: {{ .Values.flowEmitter.healthPort | default 8081 }}
              protocol: TCP
          env:
            - name: FLOW_SERVER_URL
              value: {{ .Values.flowEmitter.flowServerUrl | quote }}
            - name: FLOW_ID
              value: {{ .Values.flowEmitter.flowId | quote }}
            - name: REGION_KEY
              value: {{ .Values.flowEmitter.regionKey | quote }}
            - name: NAMESPACE_FILTER
              value: {{ .Values.flowEmitter.namespaceFilter | quote }}
            - name: EMIT_INTERVAL
              value: {{ .Values.flowEmitter.emitInterval | quote }}
            - name: POST_TIMEOUT
              value: {{ .Values.flowEmitter.postTimeout | quote }}
            - name: HEALTH_LISTEN_ADDR
              value: ":{{ .Values.flowEmitter.healthPort | default 8081 }}"
          livenessProbe:
            httpGet:
              path: /healthz
              port: health
            initialDelaySeconds: 5
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /readyz
              port: health
            initialDelaySeconds: 2
            periodSeconds: 5
          resources:
            {{- toYaml .Values.flowEmitter.resources | nindent 12 }}
          securityContext:
            {{- toYaml .Values.flowEmitter.containerSecurityContext | nindent 12 }}
{{- end -}}

@@ -0,0 +1,37 @@
{{- if .Values.flowEmitter.enabled -}}
{{- if .Values.flowEmitter.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ include "bp-openova-flow-emitter.serviceAccountName" . }}
  namespace: {{ .Release.Namespace }}
  labels: {{- include "bp-openova-flow-emitter.labels" . | nindent 4 }}
---
{{- end }}
# ClusterRole — the emitter needs read-only access to HelmRelease +
# HelmChart CRs across all namespaces (informer scope is configurable
# via NAMESPACE_FILTER but the SA still needs cluster-wide read).
# Note: HelmRelease lives in helm.toolkit.fluxcd.io; HelmChart lives
# in source.toolkit.fluxcd.io, so two rules are needed.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: {{ include "bp-openova-flow-emitter.workloadName" . }}
  labels: {{- include "bp-openova-flow-emitter.labels" . | nindent 4 }}
rules:
  - apiGroups: ["helm.toolkit.fluxcd.io"]
    resources: ["helmreleases"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["source.toolkit.fluxcd.io"]
    resources: ["helmcharts"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: {{ include "bp-openova-flow-emitter.workloadName" . }}
  labels: {{- include "bp-openova-flow-emitter.labels" . | nindent 4 }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: {{ include "bp-openova-flow-emitter.workloadName" . }}
subjects:
  - kind: ServiceAccount
    name: {{ include "bp-openova-flow-emitter.serviceAccountName" . }}
    namespace: {{ .Release.Namespace }}
{{- end -}}

@@ -0,0 +1,83 @@
# bp-openova-flow-emitter — Catalyst Blueprint chart values.
#
# Per docs/INVIOLABLE-PRINCIPLES.md:
# #1 waterfall — full DaemonSet + Service + RBAC ships at first cut.
# #4 never hardcode — flowServerUrl, flowId, regionKey, image tag.
# #4a SHA-pinned image tag; the chart fails fast on empty tag.
#
# Per MIRROR-EVERYTHING the image reference goes through the mothership
# Harbor proxy at harbor.openova.io/proxy-ghcr/...
catalystBlueprint:
  upstream:
    chart: ""
    version: ""
    repo: ""

flowEmitter:
  enabled: true
  # Canonical workload name. Override only when running >1 emitter
  # per cluster (rare).
  workloadName: openova-flow-emitter
  image:
    repository: harbor.openova.io/proxy-ghcr/openova-io/openova/openova-flow-adapter-flux
    # SHA-pinned per INVIOLABLE-PRINCIPLES #4a. CI populates this via
    # build-openova-flow-adapter-flux.yaml on every push to
    # products/openova-flow/adapter-flux/** or this chart.
    # Empty fails chart render via _helpers.tpl image-fail-fast.
    tag: ""
    pullPolicy: IfNotPresent
  # ── Runtime configuration ──────────────────────────────────────────
  # Per docs/INVIOLABLE-PRINCIPLES.md #4 every value is operator-driven.
  # The chart fails fast when any required value is empty.
  #
  # Per-Sovereign overlay sets these via the bootstrap-kit HelmRelease's
  # spec.values block: see clusters/_template/bootstrap-kit/57-bp-openova-flow-emitter.yaml.
  flowServerUrl: "" # e.g. "https://openova-flow.<sov-fqdn>"
  flowId: "" # typically the deployment id; falls back to SOVEREIGN_FQDN-derived
  regionKey: "" # e.g. "fsn1"
  namespaceFilter: flux-system
  emitInterval: "200ms"
  postTimeout: "10s"
  healthPort: 8081
  resources:
    requests:
      cpu: 25m
      memory: 64Mi
    limits:
      cpu: 250m
      memory: 192Mi
  podSecurityContext:
    runAsNonRoot: true
    runAsUser: 1001
    runAsGroup: 1001
    seccompProfile:
      type: RuntimeDefault
  containerSecurityContext:
    allowPrivilegeEscalation: false
    readOnlyRootFilesystem: true
    capabilities:
      drop: [ALL]
  imagePullSecrets:
    - name: ghcr-pull
  serviceAccount:
    create: true
    name: ""
  nodeSelector:
    kubernetes.io/os: linux
  tolerations:
    # DaemonSet semantics: run on every Linux node including the
    # control-plane (the control-plane runs Flux on Sovereigns).
    - key: node-role.kubernetes.io/control-plane
      operator: Exists
      effect: NoSchedule
    - key: node-role.kubernetes.io/master
      operator: Exists
      effect: NoSchedule

@@ -0,0 +1,35 @@
apiVersion: v2
name: bp-openova-flow-server
# Catalyst-authored umbrella with no upstream Helm chart dependency —
# ships only the in-repo openova-flow-server Deployment+Service.
# Opt out of the hollow-chart guard per docs/BLUEPRINT-AUTHORING.md §11.1.
annotations:
  catalyst.openova.io/no-upstream: "true"
version: 0.1.0
appVersion: "0.1.0"
description: |
  Catalyst Blueprint chart for the openova-flow-server — the
  stateless HTTP+SSE event router that backs OpenovaFlow. Holds an
  in-memory ring buffer of FlowMessage envelopes per flowId; emitters
  POST events, consumers GET the snapshot or subscribe to the SSE
  stream.

  Per docs/INVIOLABLE-PRINCIPLES.md #1 (target-state) the chart ships
  the real workload, not a stub. Per #4 (never hardcode) every
  operational knob (listen port, ring capacity, image tag, replicas,
  HTTPRoute hostname) flows through values.yaml.

  Per MIRROR-EVERYTHING the image reference goes through the
  mothership Harbor proxy at harbor.openova.io/proxy-ghcr/... — never
  direct ghcr.io references.

  Wired into the bootstrap-kit at slot 56 on the PRIMARY cluster
  only; cross-cluster emitters reach this server's Service via the
  Cilium Gateway HTTPRoute (public HTTPS — no NetBird required for v1).
keywords: [catalyst, blueprint, openova-flow, observability]
home: https://docs.openova.io/openova-flow
sources:
  - https://github.com/openova-io/openova/tree/main/products/openova-flow/server
maintainers:
  - name: openova-platform
    email: catalyst@openova.io

@@ -0,0 +1,50 @@
{{- define "bp-openova-flow-server.name" -}}
{{- default (.Chart.Name | trimPrefix "bp-") .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- define "bp-openova-flow-server.fullname" -}}
{{- if .Values.fullnameOverride -}}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}}
{{- else -}}
{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- end -}}
{{- define "bp-openova-flow-server.workloadName" -}}
{{- default "openova-flow-server" .Values.flowServer.workloadName -}}
{{- end -}}
{{- define "bp-openova-flow-server.labels" -}}
helm.sh/chart: {{ printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
app.kubernetes.io/name: {{ include "bp-openova-flow-server.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
catalyst.openova.io/blueprint: bp-openova-flow-server
{{- end -}}
{{- define "bp-openova-flow-server.selectorLabels" -}}
app.kubernetes.io/name: {{ include "bp-openova-flow-server.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end -}}
{{- define "bp-openova-flow-server.serviceAccountName" -}}
{{- if .Values.flowServer.serviceAccount.create -}}
{{- default (include "bp-openova-flow-server.workloadName" .) .Values.flowServer.serviceAccount.name -}}
{{- else -}}
{{- default "default" .Values.flowServer.serviceAccount.name -}}
{{- end -}}
{{- end -}}
{{/*
Image-tag fail-fast — INVIOLABLE-PRINCIPLES #4a.
*/}}
{{- define "bp-openova-flow-server.image" -}}
{{- $tag := .Values.flowServer.image.tag -}}
{{- if not $tag -}}
{{- fail "bp-openova-flow-server: .Values.flowServer.image.tag is empty — SHA-pinned image required (CI populates this)" -}}
{{- end -}}
{{- printf "%s:%s" .Values.flowServer.image.repository $tag -}}
{{- end -}}

@@ -0,0 +1,60 @@
{{- if .Values.flowServer.enabled -}}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "bp-openova-flow-server.workloadName" . }}
  namespace: {{ .Release.Namespace }}
  labels: {{- include "bp-openova-flow-server.labels" . | nindent 4 }}
spec:
  replicas: {{ .Values.flowServer.replicas | default 1 }}
  selector:
    matchLabels: {{- include "bp-openova-flow-server.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      labels: {{- include "bp-openova-flow-server.labels" . | nindent 8 }}
    spec:
      serviceAccountName: {{ include "bp-openova-flow-server.serviceAccountName" . }}
      securityContext:
        {{- toYaml .Values.flowServer.podSecurityContext | nindent 8 }}
      {{- with .Values.flowServer.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.flowServer.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.flowServer.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      containers:
        - name: openova-flow-server
          image: {{ include "bp-openova-flow-server.image" . | quote }}
          imagePullPolicy: {{ .Values.flowServer.image.pullPolicy | default "IfNotPresent" }}
          ports:
            - name: http
              containerPort: {{ .Values.flowServer.port }}
              protocol: TCP
          env:
            - name: FLOW_SERVER_LISTEN_ADDR
              value: ":{{ .Values.flowServer.port }}"
            - name: FLOW_SERVER_RING_CAPACITY
              value: {{ .Values.flowServer.ringCapacity | quote }}
          livenessProbe:
            httpGet:
              path: /healthz
              port: http
            initialDelaySeconds: 5
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /readyz
              port: http
            initialDelaySeconds: 2
            periodSeconds: 5
          resources:
            {{- toYaml .Values.flowServer.resources | nindent 12 }}
          securityContext:
            {{- toYaml .Values.flowServer.containerSecurityContext | nindent 12 }}
{{- end -}}

@@ -0,0 +1,28 @@
{{- if and .Values.flowServer.enabled .Values.flowServer.httproute.enabled -}}
{{- if not .Values.flowServer.httproute.hostname }}
{{- fail "bp-openova-flow-server: .Values.flowServer.httproute.hostname is required when httproute.enabled=true" }}
{{- end }}
{{- if not .Values.flowServer.httproute.gatewayRef.name }}
{{- fail "bp-openova-flow-server: .Values.flowServer.httproute.gatewayRef.name is required when httproute.enabled=true" }}
{{- end }}
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
  name: {{ include "bp-openova-flow-server.workloadName" . }}
  namespace: {{ .Release.Namespace }}
  labels: {{- include "bp-openova-flow-server.labels" . | nindent 4 }}
spec:
  parentRefs:
    - name: {{ .Values.flowServer.httproute.gatewayRef.name }}
      namespace: {{ .Values.flowServer.httproute.gatewayRef.namespace | default "kube-system" }}
  hostnames:
    - {{ .Values.flowServer.httproute.hostname | quote }}
  rules:
    - matches:
        - path:
            type: PathPrefix
            value: /
      backendRefs:
        - name: {{ include "bp-openova-flow-server.workloadName" . }}
          port: {{ .Values.flowServer.service.port | default 80 }}
{{- end -}}

@@ -0,0 +1,16 @@
{{- if .Values.flowServer.enabled -}}
apiVersion: v1
kind: Service
metadata:
  name: {{ include "bp-openova-flow-server.workloadName" . }}
  namespace: {{ .Release.Namespace }}
  labels: {{- include "bp-openova-flow-server.labels" . | nindent 4 }}
spec:
  type: {{ .Values.flowServer.service.type | default "ClusterIP" }}
  selector: {{- include "bp-openova-flow-server.selectorLabels" . | nindent 4 }}
  ports:
    - name: http
      port: {{ .Values.flowServer.service.port | default 80 }}
      targetPort: {{ .Values.flowServer.service.targetPort | default 8080 }}
      protocol: TCP
{{- end -}}

@@ -0,0 +1,8 @@
{{- if and .Values.flowServer.enabled .Values.flowServer.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ include "bp-openova-flow-server.serviceAccountName" . }}
  namespace: {{ .Release.Namespace }}
  labels: {{- include "bp-openova-flow-server.labels" . | nindent 4 }}
{{- end -}}

@@ -0,0 +1,83 @@
# bp-openova-flow-server — Catalyst Blueprint chart values.
#
# Per docs/INVIOLABLE-PRINCIPLES.md:
# #4 never hardcode — image tag, replicas, ring capacity, hostname
# #4a SHA-pinned image tag; the chart fails fast on empty tag
#
# Per MIRROR-EVERYTHING the image reference goes through the mothership
# Harbor proxy at harbor.openova.io/proxy-ghcr/...
catalystBlueprint:
  upstream:
    chart: ""
    version: ""
    repo: ""

flowServer:
  enabled: true
  # Canonical workload name. Override only when running >1 server in
  # the same namespace (rare; one per primary cluster per ADR-0001 §11).
  workloadName: openova-flow-server
  replicas: 1
  image:
    # Mothership Harbor pull-through proxy. The Sovereign-local Harbor
    # rewrites this prefix to its own proxy at registry-pivot time per
    # platform/self-sovereign-cutover/chart/.
    repository: harbor.openova.io/proxy-ghcr/openova-io/openova/openova-flow-server
    # SHA-pinned per INVIOLABLE-PRINCIPLES #4a. CI populates this via
    # build-openova-flow-server.yaml on every push to
    # products/openova-flow/server/** or platform/openova-flow-server/chart/**.
    # Empty fails chart render via _helpers.tpl image-fail-fast.
    tag: ""
    pullPolicy: IfNotPresent
  port: 8080
  # Per-flow ring buffer capacity (FlowMessage envelopes). The server
  # folds the ring to a snapshot on demand; FIFO drop on overflow.
  ringCapacity: 4096
  service:
    type: ClusterIP
    port: 80
    targetPort: 8080
  # Cilium Gateway HTTPRoute. Disabled by default; per-Sovereign
  # overlay flips on with hostname `openova-flow.<sov-fqdn>`. Server
  # is reachable cross-cluster via this gateway over public HTTPS.
  httproute:
    enabled: false
    hostname: ""
    gatewayRef:
      name: ""
      namespace: kube-system
  resources:
    requests:
      cpu: 25m
      memory: 64Mi
    limits:
      cpu: 500m
      memory: 256Mi
  podSecurityContext:
    runAsNonRoot: true
    runAsUser: 1001
    runAsGroup: 1001
    seccompProfile:
      type: RuntimeDefault
  containerSecurityContext:
    allowPrivilegeEscalation: false
    readOnlyRootFilesystem: true
    capabilities:
      drop: [ALL]
  imagePullSecrets:
    - name: ghcr-pull
  serviceAccount:
    create: true
    name: ""
  nodeSelector:
    kubernetes.io/os: linux
  tolerations: []

@@ -0,0 +1,23 @@
# openova-flow-adapter-flux — DaemonSet sidecar that watches Flux
# HelmRelease CRs and POSTs FlowMessage envelopes to openova-flow-server.
#
# Per docs/INVIOLABLE-PRINCIPLES.md #4a images are built by GitHub
# Actions and pulled through the per-Sovereign Harbor proxy. Never
# `docker build`d locally for shipment.
FROM golang:1.22-alpine AS build
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY cmd ./cmd
COPY internal ./internal
COPY test ./test
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o /openova-flow-adapter-flux ./cmd/openova-flow-adapter-flux
FROM scratch
LABEL org.opencontainers.image.source="https://github.com/openova-io/openova"
LABEL org.opencontainers.image.description="OpenovaFlow Flux adapter — HelmRelease informer to FlowMessage emitter"
LABEL org.opencontainers.image.licenses="Apache-2.0"
COPY --from=build /openova-flow-adapter-flux /openova-flow-adapter-flux
USER 1001:1001
EXPOSE 8081
ENTRYPOINT ["/openova-flow-adapter-flux"]

@@ -0,0 +1,84 @@
# openova-flow-adapter-flux
Kubernetes informer sidecar that watches Flux HelmRelease + HelmChart
CRs in the local cluster, maps each state change into a FlowMessage
envelope, and POSTs to a configured `openova-flow-server`.
This is Agent #2's region-aware emitter — runs as a DaemonSet on every
cluster (mother + every Sovereign + every secondary). The receiving
server is on the primary cluster only; cross-cluster reachability comes
from Cilium Gateway over public HTTPS (no NetBird required for v1).
## Status mapping
Flux's `HelmRelease.status.conditions[type=Ready]` folds into the
`FlowNode.status` palette:

| Ready.status | Ready.reason | FlowNode.status |
|--------------|--------------------------|-----------------|
| True | *any* | `succeeded` |
| False | InstallFailed | `failed` |
| False | UpgradeFailed | `failed` |
| False | RetriesExhausted | `failed` |
| False | Progressing | `running` |
| False | *(other)* | `running` |
| Unknown | *any* | `running` |
| *(no Ready)* | — | `pending` |
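
The folding rule in the table above can be sketched as a small pure function (a hypothetical helper — the real mapper lives in `internal/informer`):

```go
package main

import "fmt"

// foldReady folds a HelmRelease Ready condition (presence, status,
// reason) into the FlowNode.status palette, per the table above.
func foldReady(found bool, status, reason string) string {
	if !found {
		return "pending" // no Ready condition observed yet
	}
	switch {
	case status == "True":
		return "succeeded"
	case status == "False" && (reason == "InstallFailed" ||
		reason == "UpgradeFailed" || reason == "RetriesExhausted"):
		return "failed"
	default:
		// Unknown, or False with a transient reason (Progressing etc.)
		return "running"
	}
}

func main() {
	fmt.Println(foldReady(true, "False", "UpgradeFailed")) // failed
}
```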
## Identity & topology
- **FlowNode.id** = `{REGION_KEY}/{hr.metadata.name}` — region-aware so
multi-region renders correctly when N adapter sidecars (one per
cluster) all post to the same flowId.
- **FlowNode.family** — reads `metadata.labels[catalyst.openova.io/family]`
when present; otherwise falls back to the HR name with any `bp-`
prefix stripped (so `bp-cert-manager` → `cert-manager`).
- **FlowNode.region** = `REGION_KEY` env.
- **Synthetic region node** — on startup the adapter emits a
`FlowNode` whose ID == `REGION_KEY` (label "fsn1", "hel1", etc.) so
the canvas has a stable container parent. Each HR then emits a
`contains` relationship FROM the region node TO itself.
- **DependsOn relationships** — one per entry in `hr.spec.dependsOn[]`,
type `finish-to-start`, condition `on-success`.
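
The identity and topology rules above can be sketched as follows. `Rel` is a simplified stand-in for the real relationship struct, and the edge direction (dependency → dependent) is an assumption consistent with `finish-to-start` semantics:

```go
package main

import "fmt"

// Rel is a simplified stand-in for the FlowMessage relationship
// shape (field names hypothetical).
type Rel struct{ From, To, Type string }

// hrTopology derives the region-prefixed node ID, the `contains`
// edge from the synthetic region node, and one `finish-to-start`
// edge per dependsOn entry.
func hrTopology(regionKey, hrName string, dependsOn []string) (string, []Rel) {
	id := regionKey + "/" + hrName
	rels := []Rel{{From: regionKey, To: id, Type: "contains"}}
	for _, dep := range dependsOn {
		// Dependencies are sibling HRs in the same region, so they
		// get the same region prefix.
		rels = append(rels, Rel{From: regionKey + "/" + dep, To: id, Type: "finish-to-start"})
	}
	return id, rels
}

func main() {
	id, rels := hrTopology("fsn1", "bp-vault", []string{"bp-cert-manager"})
	fmt.Println(id, len(rels)) // fsn1/bp-vault 2
}
```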
## Env
| Name | Default | Required | Purpose |
|---------------------|----------------|----------|---------|
| `FLOW_SERVER_URL` | — | yes | Base URL of openova-flow-server. |
| `FLOW_ID` | — | yes | Runtime FlowInstance id this adapter binds to. |
| `REGION_KEY` | — | yes | Region id ("fsn1", "hel1", etc.). |
| `NAMESPACE_FILTER` | `flux-system` | no | Informer namespace scope. |
| `EMIT_INTERVAL` | `200ms` | no | Min delay between (id,status) duplicates. |
| `POST_TIMEOUT` | `10s` | no | Per-POST wall clock cap. |
| `HEALTH_LISTEN_ADDR`| `:8081` | no | Liveness/readiness listener. |
Per `docs/INVIOLABLE-PRINCIPLES.md` #4 every operational knob is
env-driven; per #4a the container image is built by GitHub Actions
and pulled through Harbor.
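
For local development, the knobs can be exported before running the binary (values illustrative):

```bash
# Required
export FLOW_SERVER_URL="https://openova-flow.sov.example.com"
export FLOW_ID="dep-abc123"
export REGION_KEY="fsn1"
# Optional (defaults shown)
export NAMESPACE_FILTER="flux-system"
export EMIT_INTERVAL="200ms"
export POST_TIMEOUT="10s"
export HEALTH_LISTEN_ADDR=":8081"
```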
## Build
```bash
cd products/openova-flow/adapter-flux
go build ./...
go test ./...
```
CI image: `harbor.openova.io/proxy-ghcr/openova-io/openova/openova-flow-adapter-flux:<sha>`.
## Canonical patterns this code follows
- **Informer factory** — `products/catalyst/bootstrap/api/internal/k8scache/factory.go`
is the seam for `dynamicinformer.NewDynamicSharedInformerFactory` +
per-resource event handler dispatch.
- **Chart layout** — `platform/qa-app/chart/` is the seam for a simple
in-house Deployment+Service+ServiceAccount chart (the mirror chart
for openova-flow-server). The DaemonSet+ClusterRole+ClusterRoleBinding
shape (mirror chart for this adapter) follows
`platform/k8s-ws-proxy/chart/`.
## Tests
- `test/mapper_test.go` — every Ready condition state + every dependsOn
fan-out + every family-label / heuristic path is asserted.

@@ -0,0 +1,102 @@
// openova-flow-adapter-flux — DaemonSet sidecar that watches
// HelmRelease + HelmChart CRs in the local cluster and POSTs
// FlowMessage envelopes to a configured openova-flow-server.
//
// See products/openova-flow/adapter-flux/README.md for the contract.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #4 every parameter is env-driven.
package main
import (
"context"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/openova-io/openova/products/openova-flow/adapter-flux/internal/config"
"github.com/openova-io/openova/products/openova-flow/adapter-flux/internal/informer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
cfg, err := config.FromEnv()
if err != nil {
log.Error("config", "err", err)
os.Exit(1)
}
restCfg, err := loadRestConfig(log)
if err != nil {
log.Error("kubeconfig", "err", err)
os.Exit(1)
}
// Match catalyst-api factory's QPS/Burst — the cold-LIST on a
// fresh Sovereign is multi-kind and starves on client-go
// defaults.
restCfg.QPS = 25
restCfg.Burst = 50
rt, err := informer.NewRuntime(cfg, restCfg, log)
if err != nil {
log.Error("informer init", "err", err)
os.Exit(1)
}
// Tiny health endpoint so the chart's livenessProbe has a target.
mux := http.NewServeMux()
mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok\n"))
})
mux.HandleFunc("/readyz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok\n"))
})
healthAddr := envDefault("HEALTH_LISTEN_ADDR", ":8081")
healthSrv := &http.Server{Addr: healthAddr, Handler: mux, ReadHeaderTimeout: 5 * time.Second}
go func() {
log.Info("adapter-flux health listening", "addr", healthAddr)
if err := healthSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error("health listen", "err", err)
}
}()
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()
if err := rt.Start(ctx); err != nil {
log.Error("informer", "err", err)
os.Exit(1)
}
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = healthSrv.Shutdown(shutdownCtx)
}
func loadRestConfig(log *slog.Logger) (*rest.Config, error) {
// Prefer in-cluster (production DaemonSet); fall back to
// $KUBECONFIG or ~/.kube/config for local development.
if cfg, err := rest.InClusterConfig(); err == nil {
return cfg, nil
}
kc := os.Getenv("KUBECONFIG")
rules := clientcmd.NewDefaultClientConfigLoadingRules()
if kc != "" {
rules.ExplicitPath = kc
}
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
rules, &clientcmd.ConfigOverrides{}).ClientConfig()
}
func envDefault(k, def string) string {
if v := os.Getenv(k); v != "" {
return v
}
return def
}

@@ -0,0 +1,52 @@
module github.com/openova-io/openova/products/openova-flow/adapter-flux
go 1.22.0
toolchain go1.22.10
require (
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
)
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.31.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

@@ -0,0 +1,158 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE=
github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogBU=
github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM=
github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA=
golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4=
gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.31.1 h1:Xe1hX/fPW3PXYYv8BlozYqw63ytA92snr96zMW9gWTU=
k8s.io/api v0.31.1/go.mod h1:sbN1g6eY6XVLeqNsZGLnI5FwVseTrZX7Fv3O26rhAaI=
k8s.io/apimachinery v0.31.1 h1:mhcUBbj7KUjaVhyXILglcVjuS4nYXiwC+KKFBgIVy7U=
k8s.io/apimachinery v0.31.1/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/client-go v0.31.1 h1:f0ugtWSbWpxHR7sjVpQwuvw9a3ZKLXX0u0itkFXufb0=
k8s.io/client-go v0.31.1/go.mod h1:sKI8871MJN2OyeqRlmA4W4KM9KBdBUpDLu/43eGemCg=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=

@@ -0,0 +1,82 @@
// Package config — runtime knobs from env vars. Per
// docs/INVIOLABLE-PRINCIPLES.md #4 (never hardcode) every parameter
// the adapter needs at boot is operator-driven.
package config
import (
"errors"
"fmt"
"os"
"strings"
"time"
)
// Config — resolved at process start. Immutable thereafter.
type Config struct {
// FlowServerURL — base URL of the openova-flow-server, e.g.
// "https://openova-flow.<sov-fqdn>". The adapter appends
// /v1/flows/<FlowID>/events to POST events.
FlowServerURL string
// FlowID — runtime FlowInstance id this adapter sidecar binds to.
// Typically the Sovereign deployment id ("dep-abc123") on
// post-handover chroots, or the cluster id on the mother.
FlowID string
// RegionKey — region the adapter operates on, e.g. "fsn1".
// Region-aware so multi-region renders correctly (one bubble per
// region per HR-named family).
RegionKey string
// NamespaceFilter — informer scope. Empty == all namespaces.
// Defaults to "flux-system" (canonical Flux namespace).
NamespaceFilter string
// EmitInterval — minimum delay between successive POSTs for the
// SAME (node ID, status) tuple. The informer fires per event;
// the emitter coalesces consecutive identical states into one
// POST.
EmitInterval time.Duration
// PostTimeout — max wall-clock per HTTP POST to the flow server.
PostTimeout time.Duration
}
// FromEnv parses Config out of process env. Returns an error when a
// required value is missing.
func FromEnv() (Config, error) {
c := Config{
FlowServerURL: strings.TrimRight(os.Getenv("FLOW_SERVER_URL"), "/"),
FlowID: os.Getenv("FLOW_ID"),
RegionKey: os.Getenv("REGION_KEY"),
NamespaceFilter: envDefault("NAMESPACE_FILTER", "flux-system"),
}
if c.FlowServerURL == "" {
return c, errors.New("config: FLOW_SERVER_URL is required")
}
if c.FlowID == "" {
return c, errors.New("config: FLOW_ID is required")
}
if c.RegionKey == "" {
return c, errors.New("config: REGION_KEY is required")
}
emit, err := time.ParseDuration(envDefault("EMIT_INTERVAL", "200ms"))
if err != nil {
return c, fmt.Errorf("config: bad EMIT_INTERVAL: %w", err)
}
c.EmitInterval = emit
post, err := time.ParseDuration(envDefault("POST_TIMEOUT", "10s"))
if err != nil {
return c, fmt.Errorf("config: bad POST_TIMEOUT: %w", err)
}
c.PostTimeout = post
return c, nil
}
func envDefault(k, def string) string {
v := os.Getenv(k)
if v == "" {
return def
}
return v
}

@@ -0,0 +1,89 @@
// Package emit — POST FlowMessage envelopes to the configured
// openova-flow-server with exponential backoff + jitter.
package emit
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"math/rand"
"net/http"
"time"
"github.com/openova-io/openova/products/openova-flow/adapter-flux/internal/types"
)
// Client posts FlowMessage envelopes. Construct one per process.
type Client struct {
baseURL string
flowID string
http *http.Client
maxRetry int
log *slog.Logger
}
// NewClient — baseURL like "https://openova-flow.<sov-fqdn>". The
// client appends "/v1/flows/<flowID>/events" on every POST.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #4 the baseURL is operator-driven
// (env-supplied at boot); the FlowID is similarly env-driven.
func NewClient(baseURL, flowID string, timeout time.Duration, log *slog.Logger) *Client {
return &Client{
baseURL: baseURL,
flowID: flowID,
http: &http.Client{Timeout: timeout},
maxRetry: 5,
log: log,
}
}
// Emit POSTs one envelope, retrying with exponential backoff on
// 5xx / network error. Returns the final error (nil on success).
func (c *Client) Emit(ctx context.Context, m types.FlowMessage) error {
body, err := json.Marshal(m)
if err != nil {
return fmt.Errorf("emit: marshal: %w", err)
}
url := fmt.Sprintf("%s/v1/flows/%s/events", c.baseURL, c.flowID)
var lastErr error
for attempt := 0; attempt < c.maxRetry; attempt++ {
if ctx.Err() != nil {
return ctx.Err()
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("emit: build req: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.http.Do(req)
if err != nil {
lastErr = err
c.log.Warn("emit: POST failed; will retry", "url", url, "attempt", attempt, "err", err)
} else {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
// Schema error — retrying won't help.
return fmt.Errorf("emit: %s -> %d (no retry)", url, resp.StatusCode)
}
lastErr = fmt.Errorf("emit: %s -> %d", url, resp.StatusCode)
c.log.Warn("emit: POST 5xx; will retry", "url", url, "attempt", attempt, "status", resp.StatusCode)
}
		// Backoff: 200ms, 400ms, 800ms, 1.6s, 3.2s with up to +25%
		// jitter (rand.Int63n only ever adds).
		if attempt == c.maxRetry-1 {
			break // last attempt already failed; don't sleep before returning
		}
		base := time.Duration(200*(1<<attempt)) * time.Millisecond
		jitter := time.Duration(rand.Int63n(int64(base / 4)))
		sleep := base + jitter
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(sleep):
}
}
return fmt.Errorf("emit: gave up after %d attempts: %w", c.maxRetry, lastErr)
}

@@ -0,0 +1,223 @@
// hr_informer.go — watches HelmRelease + HelmChart CRs in the
// configured namespace, maps each state change into a FlowMessage via
// mapper.go, and posts to the openova-flow-server via the emit.Client.
//
// Mirror canonical pattern at products/catalyst/bootstrap/api/internal/k8scache/factory.go:
// - dynamicinformer.NewDynamicSharedInformerFactory keyed off
// dynamic.NewForConfig(restCfg).
// - One informer per GVR; event handler dispatches per
// ADDED/UPDATED/DELETED.
// - Resync set to 0 — event-driven only (per
// docs/INVIOLABLE-PRINCIPLES.md #3 event-driven, no polling).
package informer
import (
"context"
"log/slog"
"sync"
"time"
"github.com/openova-io/openova/products/openova-flow/adapter-flux/internal/config"
"github.com/openova-io/openova/products/openova-flow/adapter-flux/internal/emit"
"github.com/openova-io/openova/products/openova-flow/adapter-flux/internal/types"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// HRGVR — Flux v2 HelmRelease.
var HRGVR = schema.GroupVersionResource{
Group: "helm.toolkit.fluxcd.io",
Version: "v2",
Resource: "helmreleases",
}
// HCGVR — Flux HelmChart (one per HelmRelease). Note HelmChart is
// served by source-controller under source.toolkit.fluxcd.io, not
// helm.toolkit.fluxcd.io. Currently unused by the mapper but watched
// so the adapter could surface chart-pull progress in a future slice.
var HCGVR = schema.GroupVersionResource{
	Group:    "source.toolkit.fluxcd.io",
	Version:  "v1",
	Resource: "helmcharts",
}
// Runtime wraps the informer goroutine + the emit client. Constructed
// from a Config + a *rest.Config (in-cluster or test fake).
type Runtime struct {
cfg config.Config
dyn dynamic.Interface
emit *emit.Client
log *slog.Logger
factory dynamicinformer.DynamicSharedInformerFactory
// dedupe — last-emitted status keyed by node ID. The informer
// fires on every Update including no-op resyncs; dedupe collapses
// (same id, same status) into a single POST.
mu sync.Mutex
lastStatus map[string]string
}
// NewRuntime constructs the adapter runtime. The caller supplies the
// REST config (typically rest.InClusterConfig() in prod, a fake in tests).
func NewRuntime(cfg config.Config, restCfg *rest.Config, log *slog.Logger) (*Runtime, error) {
dyn, err := dynamic.NewForConfig(restCfg)
if err != nil {
return nil, err
}
return &Runtime{
cfg: cfg,
dyn: dyn,
emit: emit.NewClient(cfg.FlowServerURL, cfg.FlowID, cfg.PostTimeout, log),
log: log,
factory: dynamicinformer.NewFilteredDynamicSharedInformerFactory(dyn, 0, cfg.NamespaceFilter, nil),
lastStatus: map[string]string{},
}, nil
}
// NewRuntimeForTest is the dynamic-client-injection seam used by
// mapper_test.go-adjacent integration tests. Production calls
// NewRuntime.
func NewRuntimeForTest(cfg config.Config, dyn dynamic.Interface, log *slog.Logger) *Runtime {
return &Runtime{
cfg: cfg,
dyn: dyn,
emit: emit.NewClient(cfg.FlowServerURL, cfg.FlowID, cfg.PostTimeout, log),
log: log,
factory: dynamicinformer.NewFilteredDynamicSharedInformerFactory(dyn, 0, cfg.NamespaceFilter, nil),
lastStatus: map[string]string{},
}
}
// Start kicks off the HR informer + emits the synthetic region node
// envelope so the server has a parent bubble before the first HR
// arrives. Blocks on the supplied context — returns when ctx is cancelled.
func (r *Runtime) Start(ctx context.Context) error {
// 1) Emit synthetic FlowInstance + region node up-front. The
// canvas needs the per-region container before the first HR
// upsert lands.
if err := r.bootstrap(ctx); err != nil {
r.log.Warn("informer: bootstrap emit failed; continuing", "err", err)
}
// 2) Spawn the HR informer.
hrInf := r.factory.ForResource(HRGVR).Informer()
_, err := hrInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) { r.handle(ctx, obj, false) },
UpdateFunc: func(_, obj any) { r.handle(ctx, obj, false) },
DeleteFunc: func(obj any) { r.handle(ctx, obj, true) },
})
if err != nil {
return err
}
stop := make(chan struct{})
r.factory.Start(stop)
r.factory.WaitForCacheSync(stop)
r.log.Info("informer: started",
"namespace", r.cfg.NamespaceFilter,
"flowServerURL", r.cfg.FlowServerURL,
"flowID", r.cfg.FlowID,
"regionKey", r.cfg.RegionKey)
<-ctx.Done()
close(stop)
return nil
}
// bootstrap emits the FlowInstance + the per-region synthetic
// FlowNode so the canvas has a stable parent for the HR nodes
// before any HR event has flowed.
func (r *Runtime) bootstrap(ctx context.Context) error {
region := r.cfg.RegionKey
now := time.Now().UnixMilli()
flow := &types.FlowInstance{
ID: r.cfg.FlowID,
Status: "running",
StartedAt: now,
Meta: map[string]interface{}{
"emittedBy": "openova-flow-adapter-flux",
"region": region,
},
}
if err := r.emit.Emit(ctx, types.FlowMessage{Type: types.TypeUpsertFlow, Flow: flow}); err != nil {
return err
}
regionNode := BuildRegionNode(region)
regionNode.FlowID = r.cfg.FlowID
return r.emit.Emit(ctx, types.FlowMessage{
Type: types.TypeUpsertNodes,
Nodes: []types.FlowNode{regionNode},
})
}
// handle is invoked on every HR informer event. Maps to FlowMessage
// envelopes and emits, with same-status dedupe so a noisy informer
// (Flux churns conditions on every reconcile) doesn't spam the
// server.
func (r *Runtime) handle(ctx context.Context, obj any, deleted bool) {
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
u, ok := obj.(*unstructured.Unstructured)
if !ok {
r.log.Warn("informer: non-unstructured event payload")
return
}
if deleted {
nodeID := r.cfg.RegionKey + "/" + u.GetName()
r.mu.Lock()
delete(r.lastStatus, nodeID)
r.mu.Unlock()
if err := r.emit.Emit(ctx, types.FlowMessage{
Type: types.TypeDeleteNodes,
IDs: []string{nodeID},
}); err != nil {
r.log.Warn("informer: delete emit failed", "err", err, "id", nodeID)
}
return
}
res, ok := BuildFromHR(u, r.cfg.RegionKey)
if !ok {
return
}
res.Node.FlowID = r.cfg.FlowID
// Dedupe per (id, status).
r.mu.Lock()
last, seen := r.lastStatus[res.Node.ID]
if seen && last == res.Node.Status {
r.mu.Unlock()
// Relationships are still re-emitted on every event, even when
// the node status is unchanged, because dependsOn can mutate;
// only the node update itself is deduped. (Server is idempotent
// on upsert-rels.)
if err := r.emit.Emit(ctx, types.FlowMessage{
Type: types.TypeUpsertRels,
Relationships: res.Relationships,
}); err != nil {
r.log.Warn("informer: rel emit failed", "err", err)
}
return
}
r.lastStatus[res.Node.ID] = res.Node.Status
r.mu.Unlock()
if err := r.emit.Emit(ctx, types.FlowMessage{
Type: types.TypeUpsertNodes,
Nodes: []types.FlowNode{res.Node},
}); err != nil {
r.log.Warn("informer: node emit failed", "err", err, "id", res.Node.ID)
}
if err := r.emit.Emit(ctx, types.FlowMessage{
Type: types.TypeUpsertRels,
Relationships: res.Relationships,
}); err != nil {
r.log.Warn("informer: rel emit failed", "err", err)
}
}

View File

@ -0,0 +1,184 @@
// Package informer maps Flux HelmRelease state changes to FlowMessage
// envelopes the openova-flow-server understands.
//
// This file contains the PURE TRANSFORM — no I/O, no client-go calls.
// It is unit-tested against fixture HelmRelease YAML in test/.
//
// Status mapping per the locked OpenovaFlow contract (Agent #2 brief):
//
// Ready=True → "succeeded"
// Ready=False, Reason=InstallFailed|UpgradeFailed|RetriesExhausted → "failed"
// Ready=False, any other reason (incl. Progressing) → "running"
// Ready=Unknown → "running"
// No Ready condition yet → "pending"
//
// FlowNode.id = "{regionKey}/{hrName}" — region-aware so multi-region
// renders correctly when the canvas pulls bubbles from N adapter sidecars.
package informer
import (
"strings"
"github.com/openova-io/openova/products/openova-flow/adapter-flux/internal/types"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// LabelFamily — when the operator sets this label on the HR, the
// adapter uses it verbatim as FlowNode.family. Mirrors the
// "label-driven config" rule (#4 never-hardcode).
const LabelFamily = "catalyst.openova.io/family"
// RegionContainsType — relationship type the adapter emits to group
// each HR FlowNode under the per-region synthetic parent node.
const RegionContainsType = "contains"
// DependsOnType — relationship type the adapter emits for every
// HelmRelease.spec.dependsOn entry.
const DependsOnType = "finish-to-start"
// MapResult — what BuildFromHR returns: the FlowNode for this HR + the
// edges (dependsOn fan-in + the region-contains edge).
type MapResult struct {
Node types.FlowNode
Relationships []types.Relationship
}
// BuildFromHR converts one HelmRelease (as unstructured) into a
// FlowNode + Relationship set. regionKey is the env-driven cluster id
// (e.g. "fsn1"); the FlowNode id becomes "{regionKey}/{hr.metadata.name}".
//
// Pure: no I/O, no clock, deterministic given the input.
func BuildFromHR(hr *unstructured.Unstructured, regionKey string) (MapResult, bool) {
if hr == nil {
return MapResult{}, false
}
name := hr.GetName()
if name == "" {
return MapResult{}, false
}
region := strings.TrimSpace(regionKey)
if region == "" {
region = "default"
}
node := types.FlowNode{
ID: region + "/" + name,
FlowID: "", // populated by emitter with the deployment id
Label: name,
Status: statusFromConditions(hr),
Family: ptr(familyFor(hr, name)),
Region: ptr(region),
}
// Relationship #1: contains under the synthetic region node.
rels := []types.Relationship{
{
FromID: region,
ToID: node.ID,
Type: RegionContainsType,
Condition: "always",
},
}
// Relationships from spec.dependsOn — one per entry.
deps, _, _ := unstructured.NestedSlice(hr.Object, "spec", "dependsOn")
for _, d := range deps {
m, ok := d.(map[string]interface{})
if !ok {
continue
}
depName, _ := m["name"].(string)
if strings.TrimSpace(depName) == "" {
continue
}
rels = append(rels, types.Relationship{
FromID: region + "/" + depName,
ToID: node.ID,
Type: DependsOnType,
Condition: "on-success",
})
}
return MapResult{Node: node, Relationships: rels}, true
}
// BuildRegionNode emits the synthetic per-region parent FlowNode the
// adapter sends on startup so the canvas has a stable container for
// the region's HRs. Status stays "running" — the region itself has
// no terminal lifecycle.
func BuildRegionNode(regionKey string) types.FlowNode {
if strings.TrimSpace(regionKey) == "" {
regionKey = "default"
}
r := regionKey
return types.FlowNode{
ID: regionKey,
Label: regionKey,
Status: "running",
Family: ptr("region"),
Region: &r,
}
}
// statusFromConditions — maps the HR's Ready condition to the
// FlowNode.status palette. Reads from the standard
// .status.conditions[] shape Flux v2 uses (ObservedGeneration,
// LastTransitionTime, Type, Status, Reason, Message).
func statusFromConditions(hr *unstructured.Unstructured) string {
conds, found, _ := unstructured.NestedSlice(hr.Object, "status", "conditions")
if !found || len(conds) == 0 {
return "pending"
}
for _, c := range conds {
m, ok := c.(map[string]interface{})
if !ok {
continue
}
typ, _ := m["type"].(string)
if typ != "Ready" {
continue
}
status, _ := m["status"].(string)
reason, _ := m["reason"].(string)
switch status {
case "True":
return "succeeded"
case "False":
switch reason {
case "InstallFailed", "UpgradeFailed", "RetriesExhausted":
return "failed"
case "Progressing":
return "running"
default:
// Default to "running" — a False Ready that's not in
// the failed-list typically means in-flight Flux
// retry, which the operator wants to see as
// non-terminal.
return "running"
}
case "Unknown":
return "running"
}
}
return "pending"
}
// familyFor reads the operator-set label first; falls back to the
// heuristic of dropping the "bp-" name prefix only when the label
// is absent.
//
// Examples:
//
// bp-cert-manager → "cert-manager" (heuristic: drop "bp-")
// bp-hcloud-ccm → "hcloud-ccm" (heuristic)
// with label "platform" set → "platform"
func familyFor(hr *unstructured.Unstructured, name string) string {
labels := hr.GetLabels()
if v, ok := labels[LabelFamily]; ok && strings.TrimSpace(v) != "" {
return strings.TrimSpace(v)
}
return strings.TrimPrefix(name, "bp-")
}
func ptr[T any](v T) *T { return &v }

View File

@ -0,0 +1,62 @@
// Package types mirrors the locked FlowMessage wire contract, scoped
// to the subset the adapter EMITS (upsert-flow, upsert-nodes,
// upsert-rels, delete-nodes). The receiving end (openova-flow-server)
// owns the master copy in products/openova-flow/server/internal/types.
//
// We do NOT depend on the server module across the OCI image boundary
// because (a) two Go modules side-by-side keep the dep graphs decoupled,
// and (b) Agent #1 already locked the wire contract — re-declaring
// the types here keeps the producer side honest.
package types
// FlowInstance — runtime instance the adapter MAY emit (one per
// deployment in v1 — emitter overlay decides).
type FlowInstance struct {
ID string `json:"id"`
DefinitionID *string `json:"definitionId,omitempty"`
ParentFlowID *string `json:"parentFlowId,omitempty"`
Status string `json:"status"`
StartedAt int64 `json:"startedAt"`
EndedAt *int64 `json:"endedAt,omitempty"`
Meta map[string]interface{} `json:"meta,omitempty"`
}
// FlowNode — one node within the flow.
type FlowNode struct {
ID string `json:"id"`
FlowID string `json:"flowId"`
Label string `json:"label"`
Status string `json:"status"`
Family *string `json:"family,omitempty"`
Region *string `json:"region,omitempty"`
StartedAt *int64 `json:"startedAt,omitempty"`
EndedAt *int64 `json:"endedAt,omitempty"`
Meta map[string]interface{} `json:"meta,omitempty"`
}
// Relationship — directed edge.
type Relationship struct {
FromID string `json:"fromId"`
ToID string `json:"toId"`
FromFlowID *string `json:"fromFlowId,omitempty"`
ToFlowID *string `json:"toFlowId,omitempty"`
Type string `json:"type"`
Condition string `json:"condition,omitempty"`
Lag int64 `json:"lag,omitempty"`
}
// FlowMessage envelope variants the adapter emits.
type FlowMessage struct {
Type string `json:"type"`
Flow *FlowInstance `json:"flow,omitempty"`
Nodes []FlowNode `json:"nodes,omitempty"`
Relationships []Relationship `json:"relationships,omitempty"`
IDs []string `json:"ids,omitempty"`
}
const (
TypeUpsertFlow = "upsert-flow"
TypeUpsertNodes = "upsert-nodes"
TypeUpsertRels = "upsert-rels"
TypeDeleteNodes = "delete-nodes"
)

View File

@ -0,0 +1,256 @@
// mapper_test.go — covers every HelmRelease state → FlowNode.status
// transition that the adapter has to ship. Fixtures are inline YAML
// (no testdata files) so the test is self-contained.
package test
import (
"strings"
"testing"
"github.com/openova-io/openova/products/openova-flow/adapter-flux/internal/informer"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/yaml"
)
func parseHR(t *testing.T, raw string) *unstructured.Unstructured {
t.Helper()
js, err := yaml.YAMLToJSON([]byte(strings.TrimSpace(raw)))
if err != nil {
t.Fatalf("yaml->json: %v", err)
}
u := &unstructured.Unstructured{}
if err := u.UnmarshalJSON(js); err != nil {
t.Fatalf("unmarshal: %v", err)
}
return u
}
func TestMapper_Ready_True(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-cert-manager
  namespace: flux-system
spec:
  dependsOn:
    - name: bp-cilium
status:
  conditions:
    - type: Ready
      status: "True"
      reason: ReconciliationSucceeded
`)
res, ok := informer.BuildFromHR(hr, "fsn1")
if !ok {
t.Fatal("BuildFromHR returned not-ok")
}
if res.Node.Status != "succeeded" {
t.Fatalf("status: %s", res.Node.Status)
}
if res.Node.ID != "fsn1/bp-cert-manager" {
t.Fatalf("id: %s", res.Node.ID)
}
if res.Node.Family == nil || *res.Node.Family != "cert-manager" {
t.Fatalf("family: %+v", res.Node.Family)
}
if res.Node.Region == nil || *res.Node.Region != "fsn1" {
t.Fatalf("region: %+v", res.Node.Region)
}
// One "contains" rel + one dependsOn rel.
if len(res.Relationships) != 2 {
t.Fatalf("rels=%d want 2: %+v", len(res.Relationships), res.Relationships)
}
var hasContains, hasDep bool
for _, r := range res.Relationships {
if r.Type == "contains" && r.FromID == "fsn1" && r.ToID == "fsn1/bp-cert-manager" {
hasContains = true
}
if r.Type == "finish-to-start" && r.FromID == "fsn1/bp-cilium" && r.ToID == "fsn1/bp-cert-manager" {
hasDep = true
}
}
if !hasContains || !hasDep {
t.Fatalf("missing rels: %+v", res.Relationships)
}
}
func TestMapper_Ready_False_Progressing(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-trivy
status:
  conditions:
    - type: Ready
      status: "False"
      reason: Progressing
`)
res, _ := informer.BuildFromHR(hr, "hel1")
if res.Node.Status != "running" {
t.Fatalf("status: %s", res.Node.Status)
}
}
func TestMapper_Ready_False_InstallFailed(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-openbao
status:
  conditions:
    - type: Ready
      status: "False"
      reason: InstallFailed
`)
res, _ := informer.BuildFromHR(hr, "fsn1")
if res.Node.Status != "failed" {
t.Fatalf("status: %s", res.Node.Status)
}
}
func TestMapper_Ready_False_UpgradeFailed(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-keycloak
status:
  conditions:
    - type: Ready
      status: "False"
      reason: UpgradeFailed
`)
res, _ := informer.BuildFromHR(hr, "fsn1")
if res.Node.Status != "failed" {
t.Fatalf("status: %s", res.Node.Status)
}
}
func TestMapper_Ready_False_RetriesExhausted(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-grafana
status:
  conditions:
    - type: Ready
      status: "False"
      reason: RetriesExhausted
`)
res, _ := informer.BuildFromHR(hr, "fsn1")
if res.Node.Status != "failed" {
t.Fatalf("status: %s", res.Node.Status)
}
}
func TestMapper_Ready_Unknown(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-cnpg
status:
  conditions:
    - type: Ready
      status: Unknown
      reason: Reconciling
`)
res, _ := informer.BuildFromHR(hr, "fsn1")
if res.Node.Status != "running" {
t.Fatalf("status: %s", res.Node.Status)
}
}
func TestMapper_NoReadyConditionYet(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-newapi
`)
res, _ := informer.BuildFromHR(hr, "fsn1")
if res.Node.Status != "pending" {
t.Fatalf("status: %s", res.Node.Status)
}
}
func TestMapper_FamilyLabelOverride(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-stalwart-sovereign
  labels:
    catalyst.openova.io/family: mail
status:
  conditions:
    - type: Ready
      status: "True"
`)
res, _ := informer.BuildFromHR(hr, "fsn1")
if res.Node.Family == nil || *res.Node.Family != "mail" {
t.Fatalf("family: %+v", res.Node.Family)
}
}
func TestMapper_RegionFallback(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-cilium
`)
res, _ := informer.BuildFromHR(hr, "")
if res.Node.ID != "default/bp-cilium" {
t.Fatalf("id: %s", res.Node.ID)
}
if res.Node.Region == nil || *res.Node.Region != "default" {
t.Fatalf("region: %+v", res.Node.Region)
}
}
func TestMapper_MultiDependsOn(t *testing.T) {
hr := parseHR(t, `
apiVersion: helm.toolkit.fluxcd.io/v2
kind: HelmRelease
metadata:
  name: bp-guacamole
spec:
  dependsOn:
    - name: bp-cilium
    - name: bp-cert-manager
    - name: bp-keycloak
    - name: bp-sealed-secrets
    - name: bp-seaweedfs
    - name: bp-k8s-ws-proxy
status:
  conditions:
    - type: Ready
      status: "True"
`)
res, _ := informer.BuildFromHR(hr, "fsn1")
// 1 "contains" + 6 finish-to-start.
if len(res.Relationships) != 7 {
t.Fatalf("rels=%d want 7", len(res.Relationships))
}
}
func TestMapper_RegionNodeBootstrap(t *testing.T) {
n := informer.BuildRegionNode("hel1")
if n.ID != "hel1" {
t.Fatalf("id: %s", n.ID)
}
if n.Region == nil || *n.Region != "hel1" {
t.Fatalf("region: %+v", n.Region)
}
if n.Status != "running" {
t.Fatalf("status: %s", n.Status)
}
if n.Family == nil || *n.Family != "region" {
t.Fatalf("family: %+v", n.Family)
}
}

View File

@ -0,0 +1,22 @@
# openova-flow-server — stateless HTTP+SSE event router.
#
# Per docs/INVIOLABLE-PRINCIPLES.md #4a images are built by GitHub
# Actions and pulled through harbor.openova.io / per-Sovereign Harbor.
# This Dockerfile is the CI build recipe; images are never built
# locally with `docker build` for shipment.
FROM golang:1.22-alpine AS build
WORKDIR /src
COPY go.mod ./
COPY cmd ./cmd
COPY internal ./internal
COPY test ./test
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o /openova-flow-server ./cmd/openova-flow-server
FROM scratch
LABEL org.opencontainers.image.source="https://github.com/openova-io/openova"
LABEL org.opencontainers.image.description="OpenovaFlow event router — HTTP ingest + SSE replay"
LABEL org.opencontainers.image.licenses="Apache-2.0"
COPY --from=build /openova-flow-server /openova-flow-server
USER 1001:1001
EXPOSE 8080
ENTRYPOINT ["/openova-flow-server"]

View File

@ -0,0 +1,86 @@
# openova-flow-server
Stateless HTTP+SSE event router for OpenovaFlow. Holds an in-memory ring
buffer of `FlowMessage` envelopes per `flowId` and replays them to SSE
subscribers. Emitters POST envelopes; consumers GET the snapshot or
subscribe to the stream.
This is Agent #2's Go backend; Agent #1's TypeScript `@openova/flow-core`
+ `@openova/flow-canvas` packages are the canvas consumer.
## Wire contract
All endpoints use the locked `FlowMessage` JSON shape (see
`internal/types/flow.go` for the Go definition + the brief for the
locked schema). Schema version 1.
| Method | Path | Purpose |
|--------|-----------------------------------|---------|
| POST | `/v1/flows/{flowId}/events` | Ingest one FlowMessage. 200 on accept, 400 on schema violation. |
| GET | `/v1/flows/{flowId}/snapshot` | Current folded state. 404 when the flow has never been ingested. |
| GET | `/v1/flows/{flowId}/stream` | SSE: synthetic snapshot frame, then live tail. Heartbeats every 15s. |
| DELETE | `/v1/flows/{flowId}` | Purge a flow's state. Idempotent, returns 204. |
| GET | `/healthz` | Liveness. |
| GET | `/readyz` | Readiness. |
### SSE format
```
event: snapshot
id: 7
data: {"type":"snapshot",...}
event: upsert-nodes
id: 8
data: {"type":"upsert-nodes",...}
event: heartbeat
data: {}
```
The `id:` line carries the server-assigned monotonic sequence number.
Resuming via the EventSource client's `Last-Event-ID` header (replaying
the buffer from `Last-Event-ID + 1` on reconnect) is a backlog item for
a future slice; stamping the sequence now means the wire format will not
need to change when resume lands.
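
When a full EventSource client is overkill, the frame layout above can be consumed with a small hand-rolled parser. A sketch (simplified: multi-line `data:` fields and comment lines are not handled):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// sseFrame is one parsed server-sent event from the stream above.
type sseFrame struct {
	Event string
	ID    string
	Data  string
}

// parseSSE splits a raw SSE stream into frames on blank lines,
// reading the event:/id:/data: fields the server emits.
func parseSSE(raw string) []sseFrame {
	var frames []sseFrame
	var cur sseFrame
	sc := bufio.NewScanner(strings.NewReader(raw))
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			if cur != (sseFrame{}) {
				frames = append(frames, cur)
				cur = sseFrame{}
			}
		case strings.HasPrefix(line, "event: "):
			cur.Event = strings.TrimPrefix(line, "event: ")
		case strings.HasPrefix(line, "id: "):
			cur.ID = strings.TrimPrefix(line, "id: ")
		case strings.HasPrefix(line, "data: "):
			cur.Data = strings.TrimPrefix(line, "data: ")
		}
	}
	if cur != (sseFrame{}) {
		frames = append(frames, cur)
	}
	return frames
}

func main() {
	stream := "event: snapshot\nid: 7\ndata: {\"type\":\"snapshot\"}\n\nevent: heartbeat\ndata: {}\n\n"
	for _, f := range parseSSE(stream) {
		fmt.Printf("%s seq=%s %s\n", f.Event, f.ID, f.Data)
	}
}
```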
## Behavior rules
- **Ring buffer per flow.** Default 4096 envelopes (env
`FLOW_SERVER_RING_CAPACITY`). FIFO drop on overflow. Snapshot folds
the buffer to current state on demand.
- **Concurrency.** Lock per flow, not global. Two flows mutate in
parallel.
- **SSE backpressure.** 16-slot channel per client. A slow client
drops its oldest queued events (mirrors catalyst-api k8scache fanout).
- **Validation.** Envelope `type` discriminates the variant; unknown
variants reject with 400. Missing required fields (e.g. `snapshot`
without a `flow`) reject with 400.
- **Storage.** None. State is in-memory, lost on restart. Replay
relies on emitters re-emitting `snapshot` on reconnect.
## Env
| Name | Default | Purpose |
|------------------------------|-------------|---------|
| `FLOW_SERVER_LISTEN_ADDR` | `:8080` | Listen address. |
| `FLOW_SERVER_RING_CAPACITY` | `4096` | Per-flow ring buffer size. |
Per `docs/INVIOLABLE-PRINCIPLES.md` #4 every parameter is env-driven.
## Build
```bash
cd products/openova-flow/server
go build ./...
go test ./...
```
CI image: `harbor.openova.io/proxy-ghcr/openova-io/openova/openova-flow-server:<sha>`
per the MIRROR-EVERYTHING rule. Never built/pushed from a workstation.
## Tests
- `test/contract_test.go` — every FlowMessage variant round-trips JSON
unchanged + every unknown / malformed variant rejects.
- `test/server_test.go` — full HTTP surface (health, ingest, snapshot,
delete, SSE replay+tail).

View File

@ -0,0 +1,72 @@
// openova-flow-server — stateless HTTP+SSE event router for
// OpenovaFlow. See products/openova-flow/server/README.md for the
// wire contract.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #4 every operational knob is env-
// driven; no hardcoded port, no hardcoded ring capacity.
package main
import (
"context"
"errors"
"log/slog"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/openova-io/openova/products/openova-flow/server/internal/api"
"github.com/openova-io/openova/products/openova-flow/server/internal/store"
)
func main() {
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
addr := envDefault("FLOW_SERVER_LISTEN_ADDR", ":8080")
bufCap, err := strconv.Atoi(envDefault("FLOW_SERVER_RING_CAPACITY", "4096"))
if err != nil || bufCap <= 0 {
log.Warn("invalid FLOW_SERVER_RING_CAPACITY, falling back to 4096",
"raw", os.Getenv("FLOW_SERVER_RING_CAPACITY"), "err", err)
bufCap = 4096
}
s := store.NewStore(bufCap)
srv := &http.Server{
Addr: addr,
Handler: api.NewRouter(s),
ReadHeaderTimeout: 10 * time.Second,
// SSE streams are long-lived; no overall write timeout.
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()
go func() {
log.Info("openova-flow-server listening",
"addr", addr, "ringCapacity", bufCap)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error("listen failed", "err", err)
os.Exit(1)
}
}()
<-ctx.Done()
log.Info("shutting down")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Warn("graceful shutdown failed", "err", err)
}
}
func envDefault(k, def string) string {
v := os.Getenv(k)
if v == "" {
return def
}
return v
}

View File

@ -0,0 +1,3 @@
module github.com/openova-io/openova/products/openova-flow/server
go 1.22

View File

@ -0,0 +1,46 @@
package api
import (
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/openova-io/openova/products/openova-flow/server/internal/store"
"github.com/openova-io/openova/products/openova-flow/server/internal/types"
)
// MaxIngestBody — guard against an emitter shipping a runaway
// payload. Hard cap of 1 MiB per envelope. Snapshots that need more
// must split across N upsert-* messages.
const MaxIngestBody = 1 << 20
// HandleIngest accepts one FlowMessage envelope on POST. Returns
// 200 on accept, 400 on schema violation, 405 on bad method.
//
// Response body — {"seq":<assigned-sequence>}. Useful for emitters
// that want to correlate POSTs with the SSE id they'll see.
func HandleIngest(s *store.Store, flowID string, w http.ResponseWriter, r *http.Request) {
if flowID == "" {
http.Error(w, "flowId required", http.StatusBadRequest)
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, MaxIngestBody+1))
if err != nil {
http.Error(w, fmt.Sprintf("read body: %v", err), http.StatusBadRequest)
return
}
if len(body) > MaxIngestBody {
http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
return
}
m, err := types.DecodeFlowMessage(body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
seq := s.Append(flowID, m)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(map[string]any{"seq": seq})
}
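
`types.DecodeFlowMessage` itself is outside this hunk. A minimal sketch of the shape it implies, with local mirror types and illustrative lowercase names, rejecting unknown envelope variants per the locked-schema 400 rule:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// flowMessage is a local mirror of the envelope (subset, illustrative).
type flowMessage struct {
	Type  string            `json:"type"`
	Nodes []json.RawMessage `json:"nodes,omitempty"`
}

// knownTypes lists the envelope variants the server folds in Snapshot.
var knownTypes = map[string]bool{
	"snapshot": true, "upsert-flow": true, "upsert-nodes": true,
	"upsert-rels": true, "delete-nodes": true, "delete-rels": true,
}

// decodeFlowMessage returns an error for malformed JSON or an unknown
// type discriminator; the caller maps the error to HTTP 400.
func decodeFlowMessage(body []byte) (*flowMessage, error) {
	var m flowMessage
	if err := json.Unmarshal(body, &m); err != nil {
		return nil, fmt.Errorf("malformed envelope: %w", err)
	}
	if !knownTypes[m.Type] {
		return nil, fmt.Errorf("unknown envelope type %q", m.Type)
	}
	return &m, nil
}

func main() {
	if _, err := decodeFlowMessage([]byte(`{"type":"evil"}`)); err != nil {
		fmt.Println("rejected:", err)
	}
}
```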

View File

@ -0,0 +1,75 @@
// Package api wires the HTTP+SSE endpoints. Uses stdlib net/http
// servemux + a tiny path-param helper — we do NOT pull chi in here
// because the surface is five routes and zero middleware chains.
//
// Wire contract — locked across the three OpenovaFlow agents:
//
// POST /v1/flows/{flowId}/events ingest one FlowMessage
// GET /v1/flows/{flowId}/snapshot current FlowInstance + nodes + rels
// GET /v1/flows/{flowId}/stream SSE: replay snapshot + tail
// DELETE /v1/flows/{flowId} purge a flow
// GET /healthz liveness
// GET /readyz readiness
//
// Per docs/INVIOLABLE-PRINCIPLES.md #3 the implementation is
// event-driven — POST appends to the per-flow ring, fanout pushes onto
// every subscriber channel, the SSE handler reads from the channel.
// No polling loops.
package api
import (
"net/http"
"strings"
"github.com/openova-io/openova/products/openova-flow/server/internal/store"
)
// NewRouter returns an http.Handler routing the OpenovaFlow surface.
// The store is a long-lived process-global; multiple HTTP servers
// (the main listener and any test server) share the same store.
func NewRouter(s *store.Store) http.Handler {
mux := http.NewServeMux()
// Health endpoints are first-class — no auth, no flowId.
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok\n"))
})
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok\n"))
})
mux.HandleFunc("/v1/flows/", func(w http.ResponseWriter, r *http.Request) {
// Parse: /v1/flows/{flowId}[/sub]
path := strings.TrimPrefix(r.URL.Path, "/v1/flows/")
if path == "" {
http.NotFound(w, r)
return
}
parts := strings.SplitN(path, "/", 2)
flowID := parts[0]
if flowID == "" {
http.NotFound(w, r)
return
}
sub := ""
if len(parts) == 2 {
sub = parts[1]
}
switch {
case sub == "events" && r.Method == http.MethodPost:
HandleIngest(s, flowID, w, r)
case sub == "snapshot" && r.Method == http.MethodGet:
HandleSnapshot(s, flowID, w, r)
case sub == "stream" && r.Method == http.MethodGet:
HandleStream(s, flowID, w, r)
case sub == "" && r.Method == http.MethodDelete:
HandleDelete(s, flowID, w, r)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
})
return mux
}

View File

@ -0,0 +1,35 @@
package api
import (
"encoding/json"
"net/http"
"github.com/openova-io/openova/products/openova-flow/server/internal/store"
"github.com/openova-io/openova/products/openova-flow/server/internal/types"
)
// HandleSnapshot folds the per-flow ring into a `snapshot` envelope
// and writes it as a single JSON object. 404 when the flow id has
// never been ingested (or its buffer has folded to an empty state).
func HandleSnapshot(s *store.Store, flowID string, w http.ResponseWriter, r *http.Request) {
flow, nodes, rels := s.Snapshot(flowID)
if flow == nil && len(nodes) == 0 && len(rels) == 0 {
http.NotFound(w, r)
return
}
msg := types.FlowMessage{
Type: types.TypeSnapshot,
Flow: flow,
Nodes: nodes,
Relationships: rels,
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(msg)
}
// HandleDelete drops a flow's state. 204 on success regardless of
// whether the flow existed (idempotent).
func HandleDelete(s *store.Store, flowID string, w http.ResponseWriter, r *http.Request) {
s.Drop(flowID)
w.WriteHeader(http.StatusNoContent)
}

View File

@ -0,0 +1,109 @@
package api
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/openova-io/openova/products/openova-flow/server/internal/store"
"github.com/openova-io/openova/products/openova-flow/server/internal/types"
)
// HeartbeatInterval — sent to every SSE client every 15s so the
// connection survives an intermediate proxy's idle-timeout (most
// public ingresses cut at 60s).
const HeartbeatInterval = 15 * time.Second
// HandleStream serves SSE for a flowId:
// 1. Write a synthetic `snapshot` event from the current state.
// 2. Subscribe to the per-flow fanout.
// 3. Tail the channel forever (until client disconnects or DELETE
// drops the flow).
//
// Wire format mirrors @openova/flow-core's expected SSE stream:
//
// event: snapshot
// id: <seq>
// data: {"type":"snapshot",...}
//
// event: upsert-nodes
// id: <seq>
// data: {"type":"upsert-nodes",...}
//
// event: heartbeat
// data: {}
func HandleStream(s *store.Store, flowID string, w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
// SSE headers.
h := w.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache, no-transform")
h.Set("Connection", "keep-alive")
h.Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
// Subscribe BEFORE folding the snapshot so any event that
// arrives during the fold is queued for delivery after the
// snapshot (no events lost in the gap).
sub, cancel := s.Subscribe(flowID)
defer cancel()
// Initial snapshot. May be empty (no prior events) — in that
// case we emit a placeholder so the client knows the stream is
// alive but unseeded.
flow, nodes, rels := s.Snapshot(flowID)
snapMsg := types.FlowMessage{
Type: types.TypeSnapshot,
Flow: flow,
Nodes: nodes,
Relationships: rels,
}
if err := writeSSE(w, "snapshot", s.SeqForFlow(flowID), snapMsg); err != nil {
return
}
flusher.Flush()
heartbeat := time.NewTicker(HeartbeatInterval)
defer heartbeat.Stop()
for {
select {
case <-r.Context().Done():
return
case <-heartbeat.C:
if _, err := fmt.Fprint(w, "event: heartbeat\ndata: {}\n\n"); err != nil {
return
}
flusher.Flush()
case ev, ok := <-sub.Ch:
if !ok {
// Channel closed — flow was dropped.
return
}
if ev.Msg == nil {
continue
}
if err := writeSSE(w, string(ev.Msg.Type), ev.Seq, *ev.Msg); err != nil {
return
}
flusher.Flush()
}
}
}
func writeSSE(w http.ResponseWriter, event string, seq uint64, payload types.FlowMessage) error {
body, err := json.Marshal(payload)
if err != nil {
return err
}
if _, err := fmt.Fprintf(w, "event: %s\nid: %d\ndata: %s\n\n", event, seq, body); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,286 @@
package store
import (
"sync"
"github.com/openova-io/openova/products/openova-flow/server/internal/types"
)
// Store — Map<flowId, *flowState>. Each flowState owns:
// - a RingBuffer of raw FlowMessages (for SSE replay),
// - a folded current FlowInstance + nodes + relationships (built
// lazily from the buffer when /snapshot is hit).
//
// Concurrency: per-flow lock; no global lock across flows, so multiple
// flows are mutated in parallel. Map mutations (Append's lazy create,
// Drop) are guarded by the top-level RWMutex.
type Store struct {
mu sync.RWMutex
flows map[string]*flowState
bufCap int
// fanout — per-flow set of subscriber channels. Each channel
// receives the assigned sequence + envelope after append. Backed
// by a 16-slot buffer per subscriber per the SSE spec.
subMu sync.Mutex
subs map[string]map[int64]*Subscriber
next int64
}
type flowState struct {
mu sync.RWMutex
buf *RingBuffer
}
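
`RingBuffer` (`NewRingBuffer`, `Append`, `Slice`) is not included in this hunk. A minimal fixed-capacity sketch consistent with its call sites, where `Append` returns the assigned monotonic sequence and `Slice` copies oldest-first, with the element type simplified to `any` for illustration:

```go
package main

import "fmt"

// RingBuffer — fixed-capacity FIFO with monotonic sequence numbers.
type RingBuffer struct {
	items []any
	cap   int
	next  uint64 // last assigned sequence (1-based)
}

func NewRingBuffer(capacity int) *RingBuffer {
	return &RingBuffer{cap: capacity}
}

// Append stores v, dropping the oldest entry on overflow, and
// returns the assigned sequence number.
func (b *RingBuffer) Append(v any) uint64 {
	if len(b.items) == b.cap {
		b.items = b.items[1:] // FIFO drop
	}
	b.items = append(b.items, v)
	b.next++
	return b.next
}

// Slice returns a copy of the buffered items, oldest first.
func (b *RingBuffer) Slice() []any {
	out := make([]any, len(b.items))
	copy(out, b.items)
	return out
}

func main() {
	rb := NewRingBuffer(2)
	rb.Append("a")
	rb.Append("b")
	seq := rb.Append("c") // "a" dropped on overflow
	fmt.Println(seq, rb.Slice())
}
```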
// Subscriber — one SSE client. The handler reads from Ch.
type Subscriber struct {
ID int64
FlowID string
Ch chan SubEvent
// LastSeq is the last sequence the subscriber has acked (its
// cursor through the ring).
LastSeq uint64
}
// SubEvent — what flows down a Subscriber's channel: the assigned
// sequence + the envelope. The SSE handler renders both as the
// `id: <seq>` line + the JSON data payload.
type SubEvent struct {
Seq uint64
Msg *types.FlowMessage
}
// NewStore — empty store with the per-flow ring capacity.
func NewStore(bufCap int) *Store {
if bufCap <= 0 {
bufCap = 4096
}
return &Store{
flows: map[string]*flowState{},
bufCap: bufCap,
subs: map[string]map[int64]*Subscriber{},
}
}
// Append ingests a FlowMessage for the named flow. Lazily creates the
// per-flow state on first ingest. Returns the assigned sequence number.
func (s *Store) Append(flowID string, m *types.FlowMessage) uint64 {
if flowID == "" {
return 0
}
s.mu.Lock()
fs, ok := s.flows[flowID]
if !ok {
fs = &flowState{buf: NewRingBuffer(s.bufCap)}
s.flows[flowID] = fs
}
s.mu.Unlock()
seq := fs.buf.Append(m)
s.fanout(flowID, seq, m)
return seq
}
// BufferSlice — copy of the per-flow ring (oldest first). Empty when
// the flow id has never been ingested.
func (s *Store) BufferSlice(flowID string) []*types.FlowMessage {
s.mu.RLock()
fs, ok := s.flows[flowID]
s.mu.RUnlock()
if !ok {
return nil
}
return fs.buf.Slice()
}
// Snapshot folds the per-flow ring into the current FlowInstance + the
// current set of FlowNodes (keyed by (flowId,id)) + Relationships
// (keyed by (fromId,toId,type)). Mirrors @openova/flow-core's
// reducer — the wire contract is the same.
//
// Returns nils when the flow id has never been ingested.
func (s *Store) Snapshot(flowID string) (*types.FlowInstance, []types.FlowNode, []types.Relationship) {
msgs := s.BufferSlice(flowID)
if len(msgs) == 0 {
return nil, nil, nil
}
var flow *types.FlowInstance
nodes := map[string]types.FlowNode{}
rels := map[string]types.Relationship{}
for _, m := range msgs {
switch m.Type {
case types.TypeSnapshot:
// A snapshot resets state — drop any prior state, then
// seed from this envelope.
if m.Flow != nil {
f := *m.Flow
flow = &f
}
nodes = map[string]types.FlowNode{}
for _, n := range m.Nodes {
nodes[nodeKey(n)] = n
}
rels = map[string]types.Relationship{}
for _, r := range m.Relationships {
rels[relKey(r)] = r
}
case types.TypeUpsertFlow:
if m.Flow != nil {
f := *m.Flow
flow = &f
}
case types.TypeUpsertNodes:
for _, n := range m.Nodes {
nodes[nodeKey(n)] = n
}
case types.TypeUpsertRels:
for _, r := range m.Relationships {
rels[relKey(r)] = r
}
case types.TypeDeleteNodes:
for _, id := range m.IDs {
			// Delete by exact node-id match across every flowId in
			// the fold; the adapter typically scopes ids to a single
			// flow, but cross-flow deletes are honoured too.
for k, n := range nodes {
if n.ID == id {
delete(nodes, k)
}
}
}
case types.TypeDeleteRels:
for _, p := range m.Pairs {
k := relKeyFromPair(p)
delete(rels, k)
}
}
}
nodeSlice := make([]types.FlowNode, 0, len(nodes))
for _, n := range nodes {
nodeSlice = append(nodeSlice, n)
}
relSlice := make([]types.Relationship, 0, len(rels))
for _, r := range rels {
relSlice = append(relSlice, r)
}
return flow, nodeSlice, relSlice
}
// Drop removes a flow's state entirely. Called by DELETE /v1/flows/{id}.
func (s *Store) Drop(flowID string) {
s.mu.Lock()
delete(s.flows, flowID)
s.mu.Unlock()
// Tear down every subscriber on the flow as well so SSE clients
// see EOF on their stream.
s.subMu.Lock()
subs, ok := s.subs[flowID]
if ok {
for _, sub := range subs {
close(sub.Ch)
}
delete(s.subs, flowID)
}
s.subMu.Unlock()
}
// Subscribe registers a new SSE consumer for the flow. The returned
// channel emits SubEvent values whose Seq is monotonic. The cancel
// func tears down the registration. The fanout is non-blocking with
// drop-oldest semantics — slow consumers lose events but the ingest
// path never stalls.
func (s *Store) Subscribe(flowID string) (*Subscriber, func()) {
s.subMu.Lock()
s.next++
sub := &Subscriber{
ID: s.next,
FlowID: flowID,
Ch: make(chan SubEvent, 16),
}
if _, ok := s.subs[flowID]; !ok {
s.subs[flowID] = map[int64]*Subscriber{}
}
s.subs[flowID][sub.ID] = sub
s.subMu.Unlock()
return sub, func() {
s.subMu.Lock()
if m, ok := s.subs[flowID]; ok {
if _, ok2 := m[sub.ID]; ok2 {
delete(m, sub.ID)
close(sub.Ch)
}
if len(m) == 0 {
delete(s.subs, flowID)
}
}
s.subMu.Unlock()
}
}
// fanout — non-blocking delivery to every subscriber on the flow. A
// slow subscriber drops its oldest queued event to make room (16-slot
// buffer per client + drop-oldest). This mirrors the catalyst-api
// k8scache pattern.
//
// Delivery happens with subMu held: every send is non-blocking, and
// holding the lock guarantees that Drop and the Subscribe cancel func
// can never close a channel mid-send.
func (s *Store) fanout(flowID string, seq uint64, m *types.FlowMessage) {
	ev := SubEvent{Seq: seq, Msg: m}
	s.subMu.Lock()
	defer s.subMu.Unlock()
	for _, sub := range s.subs[flowID] {
		select {
		case sub.Ch <- ev:
		default:
			// Drop oldest, push new — never block the writer.
			select {
			case <-sub.Ch:
			default:
			}
			select {
			case sub.Ch <- ev:
			default:
			}
		}
	}
}
// SeqForFlow returns the most-recently-assigned sequence number for
// the given flowId, or 0 when the flow has never been ingested. Used
// by the SSE handler to stamp the initial snapshot's `id:` line.
func (s *Store) SeqForFlow(flowID string) uint64 {
s.mu.RLock()
fs, ok := s.flows[flowID]
s.mu.RUnlock()
if !ok {
return 0
}
return fs.buf.Seq()
}
// FlowIDs — debug accessor; returns the currently-known flow ids.
func (s *Store) FlowIDs() []string {
s.mu.RLock()
defer s.mu.RUnlock()
out := make([]string, 0, len(s.flows))
for id := range s.flows {
out = append(out, id)
}
return out
}
func nodeKey(n types.FlowNode) string {
return n.FlowID + "\x00" + n.ID
}
func relKey(r types.Relationship) string {
return r.FromID + "\x00" + r.ToID + "\x00" + r.Type
}
func relKeyFromPair(p types.RelPair) string {
return p.FromID + "\x00" + p.ToID + "\x00" + p.Type
}
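A note on the `"\x00"` separator in these key helpers: naive concatenation would map distinct (flowId, id) pairs to the same key. A tiny illustration, assuming ids never contain a NUL byte (which holds for the Kubernetes-derived names used here):

```go
package main

import "fmt"

func main() {
	// Without a separator, ("a", "bc") and ("ab", "c") collide.
	fmt.Println("a"+"bc" == "ab"+"c") // prints: true
	// The NUL separator keeps the pair boundary unambiguous.
	fmt.Println("a"+"\x00"+"bc" == "ab"+"\x00"+"c") // prints: false
}
```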


@ -0,0 +1,89 @@
// Package store — in-memory state per flowId. No persistence.
package store
import (
"sync"
"github.com/openova-io/openova/products/openova-flow/server/internal/types"
)
// RingBuffer is a fixed-capacity FIFO ring of FlowMessage envelopes.
// On overflow the oldest entry is dropped — emitters are expected to
// re-emit a snapshot on reconnect, so durability per-event is not the
// invariant; the invariant is "the buffer holds the most recent N
// events, in order, for SSE catch-up".
//
// Concurrency: safe for concurrent Append + Slice + Seq/Len.
type RingBuffer struct {
mu sync.RWMutex
cap int
data []*types.FlowMessage
head int // next write index
count int
// monotonic counter so SSE subscribers can request "everything
// after sequence N" — the cursor sits between calls.
seq uint64
}
// NewRingBuffer constructs a buffer with the supplied capacity
// (non-positive values are clamped to 1).
func NewRingBuffer(capacity int) *RingBuffer {
if capacity <= 0 {
capacity = 1
}
return &RingBuffer{
cap: capacity,
data: make([]*types.FlowMessage, capacity),
}
}
// Append pushes one envelope onto the buffer. Returns the assigned
// monotonic sequence number; the SSE handler emits this in the SSE
// `id:` field so reconnecting clients can resume from Last-Event-ID.
func (rb *RingBuffer) Append(m *types.FlowMessage) uint64 {
rb.mu.Lock()
defer rb.mu.Unlock()
rb.seq++
rb.data[rb.head] = m
rb.head = (rb.head + 1) % rb.cap
if rb.count < rb.cap {
rb.count++
}
return rb.seq
}
// Slice returns a copy of the buffered envelopes in insertion order
// (oldest first). The returned slice itself is the caller's to
// mutate; the pointed-to FlowMessages are shared and must not be
// modified.
func (rb *RingBuffer) Slice() []*types.FlowMessage {
rb.mu.RLock()
defer rb.mu.RUnlock()
out := make([]*types.FlowMessage, 0, rb.count)
if rb.count < rb.cap {
// data[0..count) is the entire ring contiguous from index 0.
for i := 0; i < rb.count; i++ {
out = append(out, rb.data[i])
}
return out
}
// Full ring: oldest is at head.
for i := 0; i < rb.cap; i++ {
idx := (rb.head + i) % rb.cap
out = append(out, rb.data[idx])
}
return out
}
// Seq returns the latest assigned sequence number.
func (rb *RingBuffer) Seq() uint64 {
rb.mu.RLock()
defer rb.mu.RUnlock()
return rb.seq
}
// Len returns the number of currently buffered envelopes.
func (rb *RingBuffer) Len() int {
rb.mu.RLock()
defer rb.mu.RUnlock()
return rb.count
}


@ -0,0 +1,163 @@
// Package types defines the FlowMessage wire contract — the JSON
// envelopes the openova-flow-server accepts on POST /v1/flows/{flowId}/events
// and replays on GET /v1/flows/{flowId}/stream (SSE).
//
// Schema version 1. Locked across all three OpenovaFlow agents:
// - Agent #1 ships matching TypeScript types in @openova/flow-core.
// - Agent #2 (this module) defines the Go-side shape.
// - Agent #3 wires the emitters (catalyst-api proxy + flux adapter
// sidecar) to POST these envelopes.
//
// Per docs/INVIOLABLE-PRINCIPLES.md:
//
// #1 (waterfall) — every envelope variant defined in the locked
// contract is implemented at first cut; there is no "subset for
// v1" carve-out.
// #4 (never hardcode) — message variants are open strings (Status,
// Family, Region) so per-deployment overlays can drive the
// palette without re-rolling this binary.
//
// FlowMessage decodes in a single pass: one Unmarshal populates the
// flat envelope, then Validate enforces which variant-specific
// fields must be non-empty for the given Type discriminator. See
// DecodeFlowMessage below and ingest.go.
package types
import (
"encoding/json"
"errors"
"fmt"
)
// Triggerer describes one "this flow caused that flow" edge across
// the runtime instance graph. Always a soft (no-FK) reference — the
// referenced flowId may have rolled out of the ring buffer or never
// been seen.
type Triggerer struct {
FlowID string `json:"flowId"`
// When ∈ {"success", "failure", "always"}.
When string `json:"when"`
}
// FlowInstance — one runtime instance of a flow (e.g. one Temporal
// workflow execution, one bootstrap-kit reconcile, one CI job tree).
type FlowInstance struct {
ID string `json:"id"`
DefinitionID *string `json:"definitionId,omitempty"`
ParentFlowID *string `json:"parentFlowId,omitempty"`
TriggeredBy []Triggerer `json:"triggeredBy,omitempty"`
Status string `json:"status"`
StartedAt int64 `json:"startedAt"`
EndedAt *int64 `json:"endedAt,omitempty"`
Meta map[string]interface{} `json:"meta,omitempty"`
}
// FlowNode — one node within a FlowInstance. The (flowId,id) pair is
// the natural key.
type FlowNode struct {
ID string `json:"id"`
FlowID string `json:"flowId"`
Label string `json:"label"`
Status string `json:"status"`
Family *string `json:"family,omitempty"`
Region *string `json:"region,omitempty"`
StartedAt *int64 `json:"startedAt,omitempty"`
EndedAt *int64 `json:"endedAt,omitempty"`
Meta map[string]interface{} `json:"meta,omitempty"`
}
// Relationship — directed edge between two nodes. Cross-flow edges
// set ToFlowID; intra-flow edges leave both nullable flow-ids unset.
type Relationship struct {
FromID string `json:"fromId"`
ToID string `json:"toId"`
FromFlowID *string `json:"fromFlowId,omitempty"`
ToFlowID *string `json:"toFlowId,omitempty"`
// Type ∈ {"contains", "finish-to-start", "start-to-start",
// "finish-to-finish", "start-to-finish", "triggers"}.
Type string `json:"type"`
// Condition ∈ {"on-success", "on-failure", "always"}.
Condition string `json:"condition,omitempty"`
// Lag in seconds (>= 0).
Lag int64 `json:"lag,omitempty"`
}
// RelPair — minimal identity for a Relationship used by
// delete-rels envelopes.
type RelPair struct {
FromID string `json:"fromId"`
ToID string `json:"toId"`
Type string `json:"type"`
}
// MessageType discriminator. New variants land here; envelopes whose
// Type is unknown fail validation at ingest.
type MessageType string
const (
TypeSnapshot MessageType = "snapshot"
TypeUpsertFlow MessageType = "upsert-flow"
TypeUpsertNodes MessageType = "upsert-nodes"
TypeUpsertRels MessageType = "upsert-rels"
TypeDeleteNodes MessageType = "delete-nodes"
TypeDeleteRels MessageType = "delete-rels"
)
// FlowMessage is the wire envelope. Fields beyond Type are optional;
// which subset is non-empty depends on Type (validated by Validate).
type FlowMessage struct {
Type MessageType `json:"type"`
Flow *FlowInstance `json:"flow,omitempty"`
Nodes []FlowNode `json:"nodes,omitempty"`
Relationships []Relationship `json:"relationships,omitempty"`
IDs []string `json:"ids,omitempty"`
Pairs []RelPair `json:"pairs,omitempty"`
}
// Validate enforces the per-variant required-fields contract. Returns
// nil for a well-formed envelope, otherwise an error suitable for an
// HTTP 400 body.
func (m *FlowMessage) Validate() error {
switch m.Type {
case TypeSnapshot:
if m.Flow == nil {
return errors.New("snapshot: flow is required")
}
case TypeUpsertFlow:
if m.Flow == nil {
return errors.New("upsert-flow: flow is required")
}
case TypeUpsertNodes:
if len(m.Nodes) == 0 {
return errors.New("upsert-nodes: nodes must be non-empty")
}
case TypeUpsertRels:
if len(m.Relationships) == 0 {
return errors.New("upsert-rels: relationships must be non-empty")
}
case TypeDeleteNodes:
if len(m.IDs) == 0 {
return errors.New("delete-nodes: ids must be non-empty")
}
case TypeDeleteRels:
if len(m.Pairs) == 0 {
return errors.New("delete-rels: pairs must be non-empty")
}
default:
return fmt.Errorf("unknown message type %q", m.Type)
}
return nil
}
// DecodeFlowMessage parses raw JSON bytes into a FlowMessage and
// validates the variant. Returns the typed envelope or a 400-eligible
// error.
func DecodeFlowMessage(raw []byte) (*FlowMessage, error) {
var m FlowMessage
if err := json.Unmarshal(raw, &m); err != nil {
return nil, fmt.Errorf("decode FlowMessage: %w", err)
}
if err := m.Validate(); err != nil {
return nil, err
}
return &m, nil
}


@ -0,0 +1,148 @@
// Contract tests — every FlowMessage variant from the locked schema
// must round-trip through JSON unchanged, validate cleanly, and fold
// to the expected snapshot. Mirror file at @openova/flow-core in
// Agent #1's package; the wire shape MUST stay byte-identical.
package test
import (
"encoding/json"
"strings"
"testing"
"github.com/openova-io/openova/products/openova-flow/server/internal/types"
)
func ptrString(s string) *string { return &s }
func TestFlowMessage_RoundTrip(t *testing.T) {
cases := []struct {
name string
raw string
}{
{
name: "snapshot",
raw: `{
"type":"snapshot",
"flow":{"id":"f1","status":"running","startedAt":1234567890123},
"nodes":[{"id":"n1","flowId":"f1","label":"hello","status":"succeeded"}],
"relationships":[{"fromId":"n1","toId":"n2","type":"finish-to-start","condition":"on-success"}]
}`,
},
{
name: "upsert-flow",
raw: `{"type":"upsert-flow","flow":{"id":"f1","status":"running","startedAt":1}}`,
},
{
name: "upsert-nodes",
raw: `{"type":"upsert-nodes","nodes":[{"id":"n1","flowId":"f1","label":"x","status":"running"}]}`,
},
{
name: "upsert-rels",
raw: `{"type":"upsert-rels","relationships":[{"fromId":"a","toId":"b","type":"contains"}]}`,
},
{
name: "delete-nodes",
raw: `{"type":"delete-nodes","ids":["n1","n2"]}`,
},
{
name: "delete-rels",
raw: `{"type":"delete-rels","pairs":[{"fromId":"a","toId":"b","type":"finish-to-start"}]}`,
},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
m, err := types.DecodeFlowMessage([]byte(tc.raw))
if err != nil {
t.Fatalf("decode: %v", err)
}
out, err := json.Marshal(m)
if err != nil {
t.Fatalf("marshal: %v", err)
}
// Re-decode after round-trip — ensures both directions
// honour the schema.
m2, err := types.DecodeFlowMessage(out)
if err != nil {
t.Fatalf("round-trip decode: %v\noutput=%s", err, out)
}
if m2.Type != m.Type {
t.Fatalf("Type changed: %s -> %s", m.Type, m2.Type)
}
})
}
}
func TestFlowMessage_ValidateRejectsUnknown(t *testing.T) {
cases := []string{
`{"type":"banana"}`,
`{"type":"snapshot"}`, // missing flow
`{"type":"upsert-nodes"}`,
`{"type":"delete-nodes"}`,
`{"type":"delete-rels"}`,
}
for _, raw := range cases {
raw := raw
t.Run(strings.TrimPrefix(raw, `{"type":"`), func(t *testing.T) {
if _, err := types.DecodeFlowMessage([]byte(raw)); err == nil {
t.Fatalf("expected error for %s", raw)
}
})
}
}
// TestFlowMessage_CrossFlowRelationship — the cross-flow case from
// the locked contract (Triggerer + ToFlowID). Must survive
// round-trip.
func TestFlowMessage_CrossFlowRelationship(t *testing.T) {
raw := `{
"type":"upsert-rels",
"relationships":[
{"fromId":"a","toId":"b","toFlowId":"sister","type":"triggers","condition":"on-success","lag":30}
]
}`
m, err := types.DecodeFlowMessage([]byte(raw))
if err != nil {
t.Fatalf("decode: %v", err)
}
if len(m.Relationships) != 1 {
t.Fatalf("expected 1 rel, got %d", len(m.Relationships))
}
r := m.Relationships[0]
if r.ToFlowID == nil || *r.ToFlowID != "sister" {
t.Fatalf("ToFlowID not preserved: %+v", r.ToFlowID)
}
if r.Type != "triggers" {
t.Fatalf("Type changed: %s", r.Type)
}
if r.Lag != 30 {
t.Fatalf("Lag changed: %d", r.Lag)
}
}
// TestFlowMessage_TriggeredByChain — Triggerer arrays on FlowInstance
// (multi-source triggers) must serialise.
func TestFlowMessage_TriggeredByChain(t *testing.T) {
fi := types.FlowInstance{
ID: "child",
Status: "running",
StartedAt: 1,
TriggeredBy: []types.Triggerer{
{FlowID: "parentA", When: "success"},
{FlowID: "parentB", When: "failure"},
},
DefinitionID: ptrString("def-1"),
ParentFlowID: ptrString("parentA"),
}
body, err := json.Marshal(types.FlowMessage{Type: types.TypeUpsertFlow, Flow: &fi})
if err != nil {
t.Fatalf("marshal: %v", err)
}
m2, err := types.DecodeFlowMessage(body)
if err != nil {
t.Fatalf("decode: %v", err)
}
if m2.Flow == nil || len(m2.Flow.TriggeredBy) != 2 {
t.Fatalf("TriggeredBy lost: %+v", m2.Flow)
}
}


@ -0,0 +1,261 @@
// HTTP-level tests — every endpoint with a happy path and a sad path.
package test
import (
"bufio"
"bytes"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/openova-io/openova/products/openova-flow/server/internal/api"
"github.com/openova-io/openova/products/openova-flow/server/internal/store"
"github.com/openova-io/openova/products/openova-flow/server/internal/types"
)
func newServer(t *testing.T) (*httptest.Server, *store.Store) {
t.Helper()
s := store.NewStore(64)
ts := httptest.NewServer(api.NewRouter(s))
t.Cleanup(ts.Close)
return ts, s
}
func post(t *testing.T, ts *httptest.Server, flowID string, body string) *http.Response {
t.Helper()
req, err := http.NewRequest(http.MethodPost,
ts.URL+"/v1/flows/"+flowID+"/events",
strings.NewReader(body))
if err != nil {
t.Fatalf("new req: %v", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("do: %v", err)
}
return resp
}
func TestHealth(t *testing.T) {
ts, _ := newServer(t)
for _, p := range []string{"/healthz", "/readyz"} {
resp, err := http.Get(ts.URL + p)
if err != nil {
t.Fatalf("get %s: %v", p, err)
}
if resp.StatusCode != 200 {
t.Fatalf("%s status %d", p, resp.StatusCode)
}
resp.Body.Close()
}
}
func TestIngestAndSnapshot(t *testing.T) {
ts, _ := newServer(t)
resp := post(t, ts, "f1", `{
"type":"upsert-flow",
"flow":{"id":"f1","status":"running","startedAt":1}
}`)
if resp.StatusCode != 200 {
t.Fatalf("upsert-flow status %d", resp.StatusCode)
}
resp.Body.Close()
resp = post(t, ts, "f1", `{
"type":"upsert-nodes",
"nodes":[{"id":"n1","flowId":"f1","label":"hi","status":"running"}]
}`)
if resp.StatusCode != 200 {
t.Fatalf("upsert-nodes status %d", resp.StatusCode)
}
resp.Body.Close()
resp = post(t, ts, "f1", `{
"type":"upsert-rels",
"relationships":[{"fromId":"n1","toId":"n2","type":"finish-to-start"}]
}`)
if resp.StatusCode != 200 {
t.Fatalf("upsert-rels status %d", resp.StatusCode)
}
resp.Body.Close()
// Snapshot must aggregate all three.
resp, err := http.Get(ts.URL + "/v1/flows/f1/snapshot")
if err != nil {
t.Fatalf("get snapshot: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("snapshot status %d", resp.StatusCode)
}
var msg types.FlowMessage
if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil {
t.Fatalf("decode: %v", err)
}
if msg.Type != types.TypeSnapshot {
t.Fatalf("type %s", msg.Type)
}
if msg.Flow == nil || msg.Flow.ID != "f1" {
t.Fatalf("flow missing: %+v", msg.Flow)
}
if len(msg.Nodes) != 1 || msg.Nodes[0].ID != "n1" {
t.Fatalf("nodes wrong: %+v", msg.Nodes)
}
if len(msg.Relationships) != 1 || msg.Relationships[0].Type != "finish-to-start" {
t.Fatalf("rels wrong: %+v", msg.Relationships)
}
}
func TestSnapshot404(t *testing.T) {
ts, _ := newServer(t)
resp, err := http.Get(ts.URL + "/v1/flows/nope/snapshot")
if err != nil {
t.Fatalf("get: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != 404 {
t.Fatalf("status %d", resp.StatusCode)
}
}
func TestIngest400OnBadJSON(t *testing.T) {
ts, _ := newServer(t)
resp := post(t, ts, "f1", `{not json}`)
defer resp.Body.Close()
if resp.StatusCode != 400 {
t.Fatalf("status %d", resp.StatusCode)
}
}
func TestIngest400OnUnknownType(t *testing.T) {
ts, _ := newServer(t)
resp := post(t, ts, "f1", `{"type":"unknown-thing"}`)
defer resp.Body.Close()
if resp.StatusCode != 400 {
t.Fatalf("status %d", resp.StatusCode)
}
}
func TestDelete(t *testing.T) {
ts, _ := newServer(t)
post(t, ts, "f1", `{"type":"upsert-flow","flow":{"id":"f1","status":"running","startedAt":1}}`).Body.Close()
req, _ := http.NewRequest(http.MethodDelete, ts.URL+"/v1/flows/f1", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("do: %v", err)
}
if resp.StatusCode != 204 {
t.Fatalf("status %d", resp.StatusCode)
}
resp.Body.Close()
	// Subsequent snapshot must 404.
	resp, err = http.Get(ts.URL + "/v1/flows/f1/snapshot")
	if err != nil {
		t.Fatalf("get after delete: %v", err)
	}
	if resp.StatusCode != 404 {
		t.Fatalf("after delete status %d", resp.StatusCode)
	}
	resp.Body.Close()
}
// TestStreamReplayAndTail — connect to /stream, expect snapshot
// frame, then post a new event and expect it on the wire.
func TestStreamReplayAndTail(t *testing.T) {
ts, _ := newServer(t)
post(t, ts, "f2", `{"type":"upsert-flow","flow":{"id":"f2","status":"running","startedAt":1}}`).Body.Close()
resp, err := http.Get(ts.URL + "/v1/flows/f2/stream")
if err != nil {
t.Fatalf("stream: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("stream status %d", resp.StatusCode)
}
// Read the snapshot frame.
frame := readSSEFrame(t, resp.Body, 2*time.Second)
if frame.event != "snapshot" {
t.Fatalf("first frame event=%s", frame.event)
}
if !strings.Contains(frame.data, `"f2"`) {
t.Fatalf("snapshot missing flowId: %s", frame.data)
}
// Post a follow-up event in a goroutine.
go func() {
time.Sleep(100 * time.Millisecond)
post(t, ts, "f2", `{"type":"upsert-nodes","nodes":[{"id":"n5","flowId":"f2","label":"x","status":"running"}]}`).Body.Close()
}()
frame = readSSEFrame(t, resp.Body, 2*time.Second)
if frame.event != "upsert-nodes" {
t.Fatalf("second frame event=%s data=%s", frame.event, frame.data)
}
if !strings.Contains(frame.data, `"n5"`) {
t.Fatalf("upsert-nodes frame missing id: %s", frame.data)
}
}
type sseFrame struct {
event string
id string
data string
}
// readSSEFrame consumes lines until a blank line, returning the
// parsed event/id/data triple. Times out cleanly.
func readSSEFrame(t *testing.T, body io.Reader, timeout time.Duration) sseFrame {
t.Helper()
type result struct {
f sseFrame
err error
}
ch := make(chan result, 1)
go func() {
sc := bufio.NewScanner(body)
sc.Buffer(make([]byte, 1<<20), 1<<20)
var f sseFrame
var dataBuf bytes.Buffer
for sc.Scan() {
line := sc.Text()
if line == "" {
f.data = strings.TrimRight(dataBuf.String(), "\n")
// Skip heartbeats — caller usually wants real frames.
if f.event == "heartbeat" {
f = sseFrame{}
dataBuf.Reset()
continue
}
ch <- result{f: f}
return
}
switch {
case strings.HasPrefix(line, "event:"):
f.event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
case strings.HasPrefix(line, "id:"):
f.id = strings.TrimSpace(strings.TrimPrefix(line, "id:"))
case strings.HasPrefix(line, "data:"):
if dataBuf.Len() > 0 {
dataBuf.WriteByte('\n')
}
dataBuf.WriteString(strings.TrimSpace(strings.TrimPrefix(line, "data:")))
}
}
ch <- result{err: sc.Err()}
}()
select {
case r := <-ch:
if r.err != nil {
t.Fatalf("sse scan: %v", r.err)
}
return r.f
case <-time.After(timeout):
t.Fatalf("sse read timeout")
}
return sseFrame{}
}


@ -388,6 +388,23 @@ slots:
depends_on: []
wave: present
# ---- Slots 56/57 — OpenovaFlow observability cohort.
# Stateless HTTP+SSE event router (slot 56, primary cluster only) +
# region-aware Flux adapter DaemonSet (slot 57, every cluster).
# The adapter watches helm.toolkit.fluxcd.io/HelmRelease CRs and
# POSTs FlowMessage envelopes to the configured openova-flow-server.
# Three-agent build: Agent #1 (TS @openova/flow-core +
# @openova/flow-canvas), Agent #2 (this Go server + flux adapter),
# Agent #3 (catalyst-api proxy + bootstrap-kit integration).
- slot: 56
name: bp-openova-flow-server
depends_on: [bp-cilium, bp-cert-manager]
wave: present
- slot: 57
name: bp-openova-flow-emitter
depends_on: [bp-flux]
wave: present
# ---- Slot 80 — bp-newapi multi-tenant LLM marketplace gateway. Issue #799.
# Sequenced past the W2.K4 numbering plan (slots 36-48) so it never
# collides with the AI-runtime / observability / livekit cohort. The