Compare commits

...

2 Commits

Author SHA1 Message Date
hatiyildiz
b060ffa991 fix(catalyst-api): guard SSE channel send under dep.mu to close pre-existing race
The pre-existing TestRestoreFromStore_ResumesHelmwatchWhenKubeconfigPathExists
test fails deterministically under -race: resumePhase1Watch.func1
closes dep.eventsCh under dep.mu while a still-in-flight helmwatch
processEvent goroutine sends to it without holding the same lock.

Hold dep.mu around the channel send + bridge fan-out and short-circuit
on dep.done so a closed channel can never panic on send. The lazy
jobs.Bridge allocation now happens under the same lock so the next
emit doesn't allocate a fresh Bridge after close-on-resume.
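In isolation, the guarded close-vs-send pattern reduces to the following minimal sketch (hypothetical names, not the actual catalyst-api types): the close path marks done before closing the channel, and the emit path checks done and sends inside the same critical section, so a send can never hit a closed channel.

package main

import "sync"

type feed struct {
    mu   sync.Mutex
    ch   chan int      // buffered live channel, analogous to dep.eventsCh
    done chan struct{} // closed exactly once, analogous to dep.done
}

// close-on-resume path: mark done, then close the channel, both under mu.
func (f *feed) closeFeed() {
    f.mu.Lock()
    defer f.mu.Unlock()
    close(f.done)
    close(f.ch)
}

// emit path: the done check and the send share closeFeed's critical section.
func (f *feed) emit(v int) {
    f.mu.Lock()
    defer f.mu.Unlock()
    select {
    case <-f.done:
        // feed already closed: drop the live send
    default:
        select {
        case f.ch <- v:
        default:
            // live buffer full: drop on the live side only
        }
    }
}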

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 21:38:10 +02:00
hatiyildiz
18b94fea75 feat(catalyst-api): jobs/executions data model + log pagination API (closes #205)
Adds the backend data model + REST API + persistence layer that the
table-view UX in epic #204 reads. Each bp-<chart> HelmRelease the
Phase-1 helmwatch
observes maps 1:1 to a Job; helmwatch state transitions append
LogLines to the active Execution's NDJSON file. The existing SSE
events feed is unmodified — both feeds live in parallel.

New package internal/jobs:
  - types.go             Job / Execution / LogLine / Index / BatchSummary
  - store.go             flat-file persistence: index.json (atomic
                         temp+rename) + per-execution NDJSON append-only
                         log file. Pagination + race-free concurrent
                         appends.
  - helmwatch_bridge.go  helmwatch.Event → Job/Execution/LogLine
                         translation. Handles dependsOn (bp- prefix
                         strip + install- prepend), level mapping,
                         duplicate-state suppression.
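types.go itself is outside this compare view. As orientation, a sketch of the shapes it plausibly declares, inferred from how the store, bridge, handlers, and tests below use them; the field sets and JSON tags are assumptions, not the committed definitions.

package jobs

import "time"

// Sketch only: the committed types.go is authoritative.

// JobID is documented by the handler as "<deploymentId>:<jobName>".
func JobID(deploymentID, jobName string) string { return deploymentID + ":" + jobName }

type Job struct {
    ID                string     `json:"id"`
    DeploymentID      string     `json:"deploymentId"`
    JobName           string     `json:"jobName"` // "install-<chart>"
    AppID             string     `json:"appId"`   // chart name, bp- prefix stripped
    BatchID           string     `json:"batchId"`
    DependsOn         []string   `json:"dependsOn"`
    Status            string     `json:"status"` // pending|running|succeeded|failed
    StartedAt         *time.Time `json:"startedAt,omitempty"`
    FinishedAt        *time.Time `json:"finishedAt,omitempty"`
    DurationMs        int64      `json:"durationMs,omitempty"`
    LatestExecutionID string     `json:"latestExecutionId,omitempty"`
}

type Execution struct {
    ID           string     `json:"id"` // crypto/rand hex, unique across deployments
    JobID        string     `json:"jobId"`
    DeploymentID string     `json:"deploymentId"`
    Status       string     `json:"status"`
    StartedAt    time.Time  `json:"startedAt"`
    FinishedAt   *time.Time `json:"finishedAt,omitempty"`
    LineCount    int        `json:"lineCount"`
}

type LogLine struct {
    LineNumber int       `json:"lineNumber"`
    Timestamp  time.Time `json:"timestamp"`
    Level      string    `json:"level"` // INFO|WARN|ERROR|DEBUG
    Message    string    `json:"message"`
}

type LogPage struct {
    Lines             []LogLine `json:"lines"`
    Total             int       `json:"total"`
    ExecutionFinished bool      `json:"executionFinished"`
}

type BatchSummary struct {
    BatchID   string `json:"batchId"`
    Total     int    `json:"total"`
    Succeeded int    `json:"succeeded"`
    Failed    int    `json:"failed"`
    Running   int    `json:"running"`
    Pending   int    `json:"pending"`
    Finished  int    `json:"finished"` // succeeded + failed
}

type Index struct {
    DeploymentID string      `json:"deploymentId"`
    Jobs         []Job       `json:"jobs"`
    Executions   []Execution `json:"executions"`
}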

New endpoints (read-only, all return well-shaped JSON):
  GET /api/v1/deployments/{depId}/jobs               list jobs
  GET /api/v1/deployments/{depId}/jobs/{jobId}       job + executions
  GET /api/v1/deployments/{depId}/jobs/batches       per-batch progress
  GET /api/v1/actions/executions/{execId}/logs?fromLine&limit
                                                     paginated logs
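For orientation only, a minimal client-side page fetch against the logs endpoint, following the documented {lines, total, executionFinished} response shape; the package, base URL, and helper name are hypothetical, not part of this change.

package jobsclient

import (
    "encoding/json"
    "fmt"
    "net/http"
)

type logPage struct {
    Lines []struct {
        LineNumber int    `json:"lineNumber"`
        Level      string `json:"level"`
        Message    string `json:"message"`
    } `json:"lines"`
    Total             int  `json:"total"`
    ExecutionFinished bool `json:"executionFinished"`
}

// fetchLogsFrom pulls one window of an execution's log, starting at
// fromLine (1-indexed), the way a polling table view would.
func fetchLogsFrom(base, execID string, fromLine int) (*logPage, error) {
    url := fmt.Sprintf("%s/api/v1/actions/executions/%s/logs?fromLine=%d&limit=500", base, execID, fromLine)
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("logs: unexpected status %d", resp.StatusCode)
    }
    var page logPage
    if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
        return nil, err
    }
    return &page, nil
}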

Wiring:
  - emitWatchEvent (the single Phase 0/1 emit path) forwards Phase-1
    component events to a per-deployment jobs.Bridge. Bridge writes
    are best-effort + logged at warn — failure does NOT abort the
    SSE feed.
  - jobs.Store wired in handler.New() from CATALYST_EXECUTIONS_DIR
    (default /var/lib/catalyst/executions on the existing
    catalyst-api-deployments PVC). Nil-tolerant: tests without
    persistence and CI runners without write access fall back to a
    503 on the jobs endpoints.

Tests:
  - 13 store tests (round-trip, pagination, atomic write, race-free
    concurrent appends, batch summary, path-traversal rejection)
  - 8 bridge tests (seed, happy-path, failed terminal, dup
    suppression, Phase-0 filtering, level/state mapping)
  - 9 handler tests via httptest covering all 4 endpoints + 503 +
    404 paths
  - 3 integration tests proving emitWatchEvent populates the jobs
    store without breaking the SSE buffer

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 21:31:25 +02:00
11 changed files with 2771 additions and 1 deletion

View File

@@ -77,6 +77,14 @@ func main() {
// Phase 1 retries emit operator instructions per the architectural
// contract (Flux owns Phase 1 reconciliation).
r.Post("/api/v1/deployments/{id}/phases/{phase}/retry", h.RetryPhase)
// Jobs/Executions REST surface (issue #205, sub of epic #204) — the
// table-view UX reads this in parallel to the existing SSE events
// feed. The 4 endpoints are read-only; every mutation flows
// through the helmwatch bridge in internal/jobs.
r.Get("/api/v1/deployments/{depId}/jobs", h.ListJobs)
r.Get("/api/v1/deployments/{depId}/jobs/batches", h.ListBatches)
r.Get("/api/v1/deployments/{depId}/jobs/{jobId}", h.GetJob)
r.Get("/api/v1/actions/executions/{execId}/logs", h.GetExecutionLogs)
log.Info("catalyst api listening", "port", port)
if err := http.ListenAndServe(":"+port, r); err != nil {

View File

@@ -32,6 +32,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/pdm"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/store"
@@ -123,6 +124,13 @@ type Deployment struct {
// a no-op rather than racing two informers against the same
// HelmRelease list.
phase1Started bool
// jobsBridge — per-deployment helmwatch → Job/Execution/LogLine
// bridge (issue #205, sub of epic #204). Allocated on first
// component event in emitWatchEvent. Nil-tolerant: the emit path
// no-ops the forward when bridge is nil OR the Handler's jobs
// store is nil (tests without persistence).
jobsBridge *jobs.Bridge
}
// recordEvent appends ev to the durable history under the mutex, evicting
@@ -872,12 +880,62 @@ func (h *Handler) runProvisioning(dep *Deployment) {
// burst; once full we drop on the LIVE side only — the durable
// buffer still has the event so the next /events poll or SSE
// reconnect replays it.
//
// The same call also forwards Phase-1 component events to the per-
// deployment jobs.Bridge, which materialises Jobs / Executions / LogLines
// into the new /api/v1/deployments/{id}/jobs surface (issue #205, sub
// of epic #204). The bridge write is best-effort: a failure does NOT
// abort the SSE feed (the durable buffer is the contract for
// /api/v1/deployments/{id}/events; the jobs surface is a parallel
// projection). Errors are logged at warn so an operator can spot
// drift.
func (h *Handler) emitWatchEvent(dep *Deployment, ev provisioner.Event) {
recorded := h.recordEventAndPersist(dep, ev)
// Synchronise channel send with the close() path in
// resumePhase1Watch (which closes dep.eventsCh under dep.mu after
// the watch loop returns). Without this guard a helmwatch
// goroutine still in-flight when resumePhase1Watch closes
// eventsCh races the close — the race detector flags the read
// of eventsCh against the close write. Holding dep.mu here makes
// the close-vs-send linearisation point unambiguous, and the
// `done` short-circuit prevents a panic-on-closed-channel send.
dep.mu.Lock()
select {
case <-dep.done:
// resumePhase1Watch has closed (or is about to close) eventsCh,
// so skip the live send; the durable buffer already has the event.
default:
select {
case dep.eventsCh <- recorded:
default:
// live buffer full; drop on the LIVE side only.
}
}
bridge := dep.jobsBridge
if h.jobs != nil && bridge == nil {
bridge = jobs.NewBridge(h.jobs, dep.ID)
dep.jobsBridge = bridge
}
dep.mu.Unlock()
// Forward Phase-1 component events to the jobs bridge. Phase-0
// OpenTofu events have no Job analogue and are silently dropped
// inside the bridge (it filters on Phase=="component" + non-
// empty Component). The bridge write is best-effort: a failure
// does NOT abort the SSE feed.
if bridge == nil {
return
}
if err := bridge.OnProvisionerEvent(recorded); err != nil {
h.log.Warn("jobs bridge: forward failed",
"id", dep.ID,
"phase", recorded.Phase,
"component", recorded.Component,
"err", err,
)
}
}
func newID() string {

View File

@@ -12,6 +12,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/pdm"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/store"
)
@@ -56,6 +57,13 @@ type Handler struct {
// always wires this via New() reading CATALYST_DEPLOYMENTS_DIR.
store *store.Store
// jobs — Jobs/Executions/LogLines persistence (issue #205, sub of
// epic #204). Nil-tolerant the same way as `store` above: tests
// building Handler{} directly run without a jobs store; production
// New() wires this from CATALYST_EXECUTIONS_DIR. The 4 jobs HTTP
// handlers respond 503 when this is nil.
jobs *jobs.Store
// kubeconfigsDir — directory the cloud-init postback handler
// (PutKubeconfig, issue #183) writes the new Sovereign's
// kubeconfig file into, mode 0600 per file. Same PVC as the
@@ -173,6 +181,25 @@ func New(log *slog.Logger) *Handler {
h.restoreFromStore()
}
// Jobs/Executions store (issue #205) — same PVC mount, different
// subdirectory. CATALYST_EXECUTIONS_DIR overrides the default per
// docs/INVIOLABLE-PRINCIPLES.md #4. A failure to create the store
// is logged + non-fatal, mirroring the deployment store's CI
// fallback path.
jobsDir := os.Getenv(jobs.EnvDir)
if jobsDir == "" {
jobsDir = jobs.DefaultDir
}
js, err := jobs.NewStore(jobsDir)
if err != nil {
log.Warn("jobs store unavailable; jobs/executions API will return 503",
"dir", jobsDir,
"err", err,
)
} else {
h.jobs = js
}
return h
}
@@ -207,6 +234,17 @@ func NewWithStore(log *slog.Logger, client pdmClient, st *store.Store) *Handler
return h
}
// NewWithJobsStore is a test-only constructor that wires an in-memory
// Handler with a jobs.Store rooted at a t.TempDir(). The 4 jobs HTTP
// handlers can be exercised end-to-end against this construction
// without standing up the full deployment store / PDM stack.
func NewWithJobsStore(log *slog.Logger, js *jobs.Store) *Handler {
return &Handler{
log: log,
jobs: js,
}
}
// NewWithStoreAndKubeconfigsDir is the persistence-aware constructor
// the issue-#183 test suite uses to point both the deployment store
// AND the kubeconfigs directory at t.TempDir() subdirectories. The

View File

@@ -0,0 +1,239 @@
// Package handler — jobs.go: REST surface for the Jobs/Executions
// data model the Sovereign Admin's table-view UX (epic #204) reads.
//
// Four endpoints, all read-only — every mutation flows through the
// helmwatch bridge in internal/jobs.Bridge, which the Phase-1 watch
// goroutine wires up.
//
// - GET /api/v1/deployments/{depId}/jobs — list Jobs
// - GET /api/v1/deployments/{depId}/jobs/{jobId} — one Job +
// executions
// - GET /api/v1/actions/executions/{execId}/logs — paginated
// LogLines
// - GET /api/v1/deployments/{depId}/jobs/batches — per-batch
// progress
//
// Backwards compat: the existing `/api/v1/deployments/{id}/events`
// SSE feed is not modified. Both feeds live in parallel; the wizard
// reads SSE for live banner state and the new table-view UX reads
// these endpoints.
package handler
import (
"errors"
"net/http"
"strconv"
"strings"
"github.com/go-chi/chi/v5"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
)
// jobsStore returns the Handler's jobs.Store. Returns nil when
// persistence is disabled (CI runners without write access to
// /var/lib). Handlers map a nil store onto HTTP 503 so the operator
// can tell "no jobs yet" (200 with empty list) apart from "store is
// down" (503 with retry-after).
func (h *Handler) jobsStore() *jobs.Store {
return h.jobs
}
// ListJobs handles GET /api/v1/deployments/{depId}/jobs.
//
// Returns `{ "jobs": [...] }` — the slice is sorted started-at DESC
// with pending Jobs (no StartedAt) bucketed last. Empty deployment →
// empty slice (not null) so the JSON shape never breaks the
// frontend's render loop.
func (h *Handler) ListJobs(w http.ResponseWriter, r *http.Request) {
st := h.jobsStore()
if st == nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]string{
"error": "jobs-store-unavailable",
"detail": "catalyst-api is running with persistence disabled — see Pod logs",
})
return
}
depID := strings.TrimSpace(chi.URLParam(r, "depId"))
if depID == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{
"error": "missing-depId",
"detail": "deployment id path segment is required",
})
return
}
out, err := st.ListJobs(depID)
if err != nil {
h.log.Error("ListJobs: load index failed", "depId", depID, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{
"error": "store-read-failed",
})
return
}
if out == nil {
out = []jobs.Job{}
}
writeJSON(w, http.StatusOK, map[string]any{
"jobs": out,
})
}
// GetJob handles GET /api/v1/deployments/{depId}/jobs/{jobId}.
//
// jobId is the "<deploymentId>:<jobName>" stable id. Chi routes a
// colon as a literal so the parameter arrives intact; a stray segment
// is rejected before hitting the store.
//
// Returns `{ "job": {...}, "executions": [...] }`. The executions
// slice is sorted startedAt DESC so the most-recent attempt is index
// 0 — matches the wire spec in #205 and the GitLab-CI runner
// convention.
func (h *Handler) GetJob(w http.ResponseWriter, r *http.Request) {
st := h.jobsStore()
if st == nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]string{
"error": "jobs-store-unavailable",
})
return
}
depID := strings.TrimSpace(chi.URLParam(r, "depId"))
jobID := strings.TrimSpace(chi.URLParam(r, "jobId"))
if depID == "" || jobID == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{
"error": "missing-path-params",
})
return
}
job, execs, err := st.GetJob(depID, jobID)
if err != nil {
if errors.Is(err, jobs.ErrNotFound) {
writeJSON(w, http.StatusNotFound, map[string]string{
"error": "job-not-found",
})
return
}
h.log.Error("GetJob: load failed", "depId", depID, "jobId", jobID, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{
"error": "store-read-failed",
})
return
}
if execs == nil {
execs = []jobs.Execution{}
}
writeJSON(w, http.StatusOK, map[string]any{
"job": job,
"executions": execs,
})
}
// ListBatches handles GET /api/v1/deployments/{depId}/jobs/batches.
//
// Returns `{ "batches": [...] }` — one row per BatchID with progress
// counts. Empty deployment → empty slice. The current implementation
// always emits at most one batch ("bootstrap-kit") since Phase-1 is
// the only Job source; future Day-2 batches will appear automatically
// as the bridge writes them.
func (h *Handler) ListBatches(w http.ResponseWriter, r *http.Request) {
st := h.jobsStore()
if st == nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]string{
"error": "jobs-store-unavailable",
})
return
}
depID := strings.TrimSpace(chi.URLParam(r, "depId"))
if depID == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{
"error": "missing-depId",
})
return
}
out, err := st.SummarizeBatches(depID)
if err != nil {
h.log.Error("ListBatches: summarize failed", "depId", depID, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{
"error": "store-read-failed",
})
return
}
if out == nil {
out = []jobs.BatchSummary{}
}
writeJSON(w, http.StatusOK, map[string]any{
"batches": out,
})
}
// GetExecutionLogs handles GET
// /api/v1/actions/executions/{execId}/logs?fromLine=N&limit=M.
//
// Returns `{ "lines": [...], "total": N, "executionFinished": bool }`.
// Pagination contract:
//
// - fromLine — 1-indexed, default 1 (omitted / non-positive ⇒ 1).
// - limit — default DefaultLogPageSize (500), max MaxLogPageSize
// (5000). Out-of-range values are clamped silently —
// the frontend's polling loop never has to retry on
// 422.
//
// The endpoint deliberately omits the deploymentId from the URL path —
// the spec in #205 wants a flat /actions/executions/{id}/logs surface
// the GitLab-style viewer can deep-link to without juggling the
// parent deployment id. The store walks every deployment subdir to
// resolve the executionId.
func (h *Handler) GetExecutionLogs(w http.ResponseWriter, r *http.Request) {
st := h.jobsStore()
if st == nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]string{
"error": "jobs-store-unavailable",
})
return
}
execID := strings.TrimSpace(chi.URLParam(r, "execId"))
if execID == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{
"error": "missing-execId",
})
return
}
q := r.URL.Query()
fromLine, _ := strconv.Atoi(strings.TrimSpace(q.Get("fromLine")))
limit, _ := strconv.Atoi(strings.TrimSpace(q.Get("limit")))
// Resolve the deploymentID by scanning the store. The Bridge
// guarantees executionId uniqueness (16-byte hex) so first match
// wins.
exec, err := st.FindExecutionAcrossDeployments(execID)
if err != nil {
if errors.Is(err, jobs.ErrNotFound) {
writeJSON(w, http.StatusNotFound, map[string]string{
"error": "execution-not-found",
})
return
}
h.log.Error("GetExecutionLogs: lookup failed", "execId", execID, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{
"error": "store-read-failed",
})
return
}
page, err := st.PageLogs(exec.DeploymentID, execID, fromLine, limit)
if err != nil {
if errors.Is(err, jobs.ErrNotFound) {
writeJSON(w, http.StatusNotFound, map[string]string{
"error": "execution-not-found",
})
return
}
h.log.Error("GetExecutionLogs: page failed", "execId", execID, "err", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{
"error": "store-read-failed",
})
return
}
if page.Lines == nil {
page.Lines = []jobs.LogLine{}
}
writeJSON(w, http.StatusOK, page)
}
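Two store-side behaviours the doc comments above rely on (the started-at DESC ordering in ListJobs, and the fromLine/limit clamp in PageLogs) live in store.go and are not visible in the hunks of this compare. Sketches of what they presumably reduce to, with hypothetical helper names; the DefaultLogPageSize and MaxLogPageSize constants are the ones store.go declares.

// Assumed ordering for ListJobs (not the committed implementation):
// started-at DESC, with pending jobs (no StartedAt) bucketed last.
func sortJobsForList(js []Job) {
    sort.SliceStable(js, func(i, k int) bool {
        a, b := js[i].StartedAt, js[k].StartedAt
        switch {
        case a == nil:
            return false // pending sorts after any started job
        case b == nil:
            return true
        default:
            return a.After(*b)
        }
    })
}

// Assumed clamp for the /logs pagination params: non-positive fromLine
// becomes 1, a missing or non-positive limit becomes DefaultLogPageSize,
// and limit is capped at MaxLogPageSize.
func normalizeLogPageArgs(fromLine, limit int) (int, int) {
    if fromLine < 1 {
        fromLine = 1
    }
    if limit <= 0 {
        limit = DefaultLogPageSize // 500
    }
    if limit > MaxLogPageSize { // 5000
        limit = MaxLogPageSize
    }
    return fromLine, limit
}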

View File

@@ -0,0 +1,184 @@
// jobs_helmwatch_test.go — integration test that proves a HelmRelease
// component event flowing through emitWatchEvent (the single emit
// path Phase 0 + Phase 1 share) materialises a Job + Execution +
// LogLine in the jobs store. Sibling SSE feed must stay intact.
package handler
import (
"io"
"log/slog"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)
func TestEmitWatchEvent_PopulatesJobsStore(t *testing.T) {
js, err := jobs.NewStore(t.TempDir())
if err != nil {
t.Fatalf("NewStore: %v", err)
}
h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), js)
// Build a minimal Deployment with the channels emitWatchEvent
// expects. We don't run any goroutine; we just call the emit
// method directly to assert the store-side projection.
dep := &Deployment{
ID: "dep-emit",
eventsCh: make(chan provisioner.Event, 8),
eventsBuf: nil,
done: make(chan struct{}),
}
// Phase-0 event must NOT create a Job (filtered by bridge).
h.emitWatchEvent(dep, provisioner.Event{
Time: time.Now().UTC().Format(time.RFC3339),
Phase: "tofu-apply",
Level: "info",
Message: "applying...",
})
got, err := js.ListJobs("dep-emit")
if err != nil {
t.Fatal(err)
}
if len(got) != 0 {
t.Errorf("Phase-0 event must not create jobs, got %+v", got)
}
// Phase-1 component event → Job + Execution + LogLine.
t0 := time.Now().UTC()
h.emitWatchEvent(dep, provisioner.Event{
Time: t0.Format(time.RFC3339),
Phase: "component",
Level: "info",
Component: "cilium",
State: "installing",
Message: "Helm install in progress",
})
h.emitWatchEvent(dep, provisioner.Event{
Time: t0.Add(5 * time.Second).Format(time.RFC3339),
Phase: "component",
Level: "info",
Component: "cilium",
State: "installed",
Message: "Ready=True",
})
got, err = js.ListJobs("dep-emit")
if err != nil {
t.Fatal(err)
}
if len(got) != 1 {
t.Fatalf("expected 1 job, got %d", len(got))
}
job := got[0]
if job.JobName != "install-cilium" || job.AppID != "cilium" {
t.Errorf("job metadata: %+v", job)
}
if job.Status != jobs.StatusSucceeded {
t.Errorf("status: want succeeded, got %q", job.Status)
}
// Bridge must NOT break the SSE stream — eventsBuf still records
// every emit.
dep.mu.Lock()
bufLen := len(dep.eventsBuf)
dep.mu.Unlock()
if bufLen != 3 {
t.Errorf("eventsBuf length: want 3 (1 phase-0 + 2 phase-1), got %d", bufLen)
}
}
func TestEmitWatchEvent_NoStore_NoCrash(t *testing.T) {
// When the jobs store is nil (CI runner without write access)
// emitWatchEvent must still record into the SSE buffer.
h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), nil)
dep := &Deployment{
ID: "dep-nostore",
eventsCh: make(chan provisioner.Event, 4),
done: make(chan struct{}),
}
h.emitWatchEvent(dep, provisioner.Event{
Time: time.Now().UTC().Format(time.RFC3339),
Phase: "component",
Component: "cilium",
State: "installed",
Level: "info",
Message: "ok",
})
dep.mu.Lock()
bufLen := len(dep.eventsBuf)
dep.mu.Unlock()
if bufLen != 1 {
t.Errorf("expected 1 buffered event, got %d", bufLen)
}
}
func TestRouter_AllFourEndpointsWired(t *testing.T) {
// End-to-end smoke that the 4 endpoints can be routed and produce
// well-shaped JSON. Mirrors the exact route patterns main.go uses.
js, err := jobs.NewStore(t.TempDir())
if err != nil {
t.Fatal(err)
}
h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), js)
r := chi.NewRouter()
r.Get("/api/v1/deployments/{depId}/jobs", h.ListJobs)
r.Get("/api/v1/deployments/{depId}/jobs/batches", h.ListBatches)
r.Get("/api/v1/deployments/{depId}/jobs/{jobId}", h.GetJob)
r.Get("/api/v1/actions/executions/{execId}/logs", h.GetExecutionLogs)
// Seed a deployment with a finished job + a tail of log lines.
depID := "dep-router"
if err := js.UpsertJob(jobs.Job{
DeploymentID: depID,
JobName: "install-cilium",
AppID: "cilium",
BatchID: jobs.BatchBootstrapKit,
DependsOn: []string{},
Status: jobs.StatusPending,
}); err != nil {
t.Fatal(err)
}
exec, err := js.StartExecution(depID, "install-cilium", time.Now())
if err != nil {
t.Fatal(err)
}
if err := js.AppendLogLines(depID, exec.ID, []jobs.LogLine{
{Level: jobs.LevelInfo, Message: "first"},
{Level: jobs.LevelInfo, Message: "second"},
}); err != nil {
t.Fatal(err)
}
if err := js.FinishExecution(depID, exec.ID, jobs.StatusSucceeded, time.Now()); err != nil {
t.Fatal(err)
}
cases := []struct {
name string
path string
}{
{"list-jobs", "/api/v1/deployments/" + depID + "/jobs"},
{"get-job", "/api/v1/deployments/" + depID + "/jobs/" + jobs.JobID(depID, "install-cilium")},
{"batches", "/api/v1/deployments/" + depID + "/jobs/batches"},
{"logs", "/api/v1/actions/executions/" + exec.ID + "/logs?fromLine=1&limit=10"},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, c.path, nil))
if rec.Code != 200 {
t.Errorf("status %d body=%s", rec.Code, rec.Body.String())
}
if rec.Body.Len() == 0 {
t.Error("empty body")
}
})
}
}

View File

@@ -0,0 +1,274 @@
// jobs_test.go — httptest-driven handler tests for the 4 Jobs/
// Executions endpoints. Each test seeds a fresh in-memory store via
// NewWithJobsStore(t.TempDir()), wires the chi router the production
// main.go does, then asserts on the JSON shape end-to-end.
package handler
import (
"encoding/json"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
)
// newJobsAPIRouter wires the same chi routes main.go does, but only
// for the 4 endpoints under test. Avoids spinning up the full HTTP
// surface for these unit tests.
func newJobsAPIRouter(t *testing.T) (*chi.Mux, *jobs.Store, *Handler) {
t.Helper()
js, err := jobs.NewStore(t.TempDir())
if err != nil {
t.Fatalf("NewStore: %v", err)
}
h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), js)
r := chi.NewRouter()
r.Get("/api/v1/deployments/{depId}/jobs", h.ListJobs)
r.Get("/api/v1/deployments/{depId}/jobs/batches", h.ListBatches)
r.Get("/api/v1/deployments/{depId}/jobs/{jobId}", h.GetJob)
r.Get("/api/v1/actions/executions/{execId}/logs", h.GetExecutionLogs)
return r, js, h
}
func decodeJSON(t *testing.T, body io.Reader, into any) {
t.Helper()
if err := json.NewDecoder(body).Decode(into); err != nil {
t.Fatalf("decode: %v", err)
}
}
func TestHandler_ListJobs_Empty(t *testing.T) {
r, _, _ := newJobsAPIRouter(t)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/dep-empty/jobs", nil))
if rec.Code != http.StatusOK {
t.Fatalf("status: %d body=%s", rec.Code, rec.Body.String())
}
body := rec.Body.String()
var resp struct {
Jobs []jobs.Job `json:"jobs"`
}
if err := json.NewDecoder(strings.NewReader(body)).Decode(&resp); err != nil {
t.Fatalf("decode: %v body=%s", err, body)
}
if resp.Jobs == nil {
t.Fatal("jobs must be empty slice not null")
}
if len(resp.Jobs) != 0 {
t.Errorf("expected 0 jobs, got %d", len(resp.Jobs))
}
// Verify the `jobs` key is `[]` not `null` in the raw body.
if !strings.Contains(body, `"jobs"`) {
t.Errorf("missing jobs key: %s", body)
}
if !strings.Contains(body, `[]`) {
t.Errorf("expected empty array `[]`: %s", body)
}
}
func TestHandler_ListJobs_Populated(t *testing.T) {
r, st, _ := newJobsAPIRouter(t)
depID := "dep-populated"
t0 := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
jobsToSeed := []jobs.Job{
{DeploymentID: depID, JobName: "install-cilium", AppID: "cilium", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusSucceeded, StartedAt: &t0, FinishedAt: ptrTime(t0.Add(20 * time.Second))},
{DeploymentID: depID, JobName: "install-flux", AppID: "flux", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusRunning, StartedAt: ptrTime(t0.Add(time.Minute))},
{DeploymentID: depID, JobName: "install-keycloak", AppID: "keycloak", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusPending},
}
for _, j := range jobsToSeed {
if err := st.UpsertJob(j); err != nil {
t.Fatal(err)
}
}
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/"+depID+"/jobs", nil))
if rec.Code != 200 {
t.Fatalf("code: %d", rec.Code)
}
var resp struct {
Jobs []jobs.Job `json:"jobs"`
}
decodeJSON(t, rec.Body, &resp)
if len(resp.Jobs) != 3 {
t.Fatalf("expected 3 jobs, got %d", len(resp.Jobs))
}
// Started DESC: install-flux (t0+1m) first, install-cilium (t0)
// second, install-keycloak (pending) last.
wantOrder := []string{"install-flux", "install-cilium", "install-keycloak"}
for i, w := range wantOrder {
if resp.Jobs[i].JobName != w {
t.Errorf("position %d: got %q want %q", i, resp.Jobs[i].JobName, w)
}
}
}
func TestHandler_GetJob_FoundAndNotFound(t *testing.T) {
r, st, _ := newJobsAPIRouter(t)
depID := "dep-getjob"
if err := st.UpsertJob(jobs.Job{
DeploymentID: depID,
JobName: "install-cilium",
AppID: "cilium",
BatchID: jobs.BatchBootstrapKit,
Status: jobs.StatusPending,
DependsOn: []string{"install-flux"},
}); err != nil {
t.Fatal(err)
}
exec, err := st.StartExecution(depID, "install-cilium", time.Now())
if err != nil {
t.Fatal(err)
}
url := "/api/v1/deployments/" + depID + "/jobs/" + jobs.JobID(depID, "install-cilium")
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, url, nil))
if rec.Code != 200 {
t.Fatalf("status: %d body=%s", rec.Code, rec.Body.String())
}
var resp struct {
Job jobs.Job `json:"job"`
Executions []jobs.Execution `json:"executions"`
}
decodeJSON(t, rec.Body, &resp)
if resp.Job.AppID != "cilium" {
t.Errorf("appId: %q", resp.Job.AppID)
}
if len(resp.Job.DependsOn) != 1 || resp.Job.DependsOn[0] != "install-flux" {
t.Errorf("dependsOn: %+v", resp.Job.DependsOn)
}
if len(resp.Executions) != 1 || resp.Executions[0].ID != exec.ID {
t.Errorf("executions: %+v", resp.Executions)
}
// 404 path
rec404 := httptest.NewRecorder()
r.ServeHTTP(rec404, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/"+depID+"/jobs/"+jobs.JobID(depID, "install-missing"), nil))
if rec404.Code != http.StatusNotFound {
t.Errorf("404 expected, got %d", rec404.Code)
}
}
func TestHandler_GetExecutionLogs_Pagination(t *testing.T) {
r, st, _ := newJobsAPIRouter(t)
depID := "dep-logs"
if err := st.UpsertJob(jobs.Job{DeploymentID: depID, JobName: "install-x"}); err != nil {
t.Fatal(err)
}
exec, err := st.StartExecution(depID, "install-x", time.Now())
if err != nil {
t.Fatal(err)
}
lines := make([]jobs.LogLine, 50)
for i := range lines {
lines[i] = jobs.LogLine{Level: jobs.LevelInfo, Message: "log"}
}
if err := st.AppendLogLines(depID, exec.ID, lines); err != nil {
t.Fatal(err)
}
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/actions/executions/"+exec.ID+"/logs?fromLine=10&limit=5", nil))
if rec.Code != 200 {
t.Fatalf("status: %d body=%s", rec.Code, rec.Body.String())
}
var resp jobs.LogPage
decodeJSON(t, rec.Body, &resp)
if len(resp.Lines) != 5 {
t.Fatalf("lines: %d", len(resp.Lines))
}
if resp.Lines[0].LineNumber != 10 || resp.Lines[4].LineNumber != 14 {
t.Errorf("LineNumbers: %+v", resp.Lines)
}
if resp.Total != 50 {
t.Errorf("total: %d", resp.Total)
}
if resp.ExecutionFinished {
t.Errorf("ExecutionFinished should be false while running")
}
}
func TestHandler_GetExecutionLogs_NotFound(t *testing.T) {
r, _, _ := newJobsAPIRouter(t)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/actions/executions/no-such/logs", nil))
if rec.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d", rec.Code)
}
}
func TestHandler_ListBatches(t *testing.T) {
r, st, _ := newJobsAPIRouter(t)
depID := "dep-batches"
now := time.Now().UTC()
seeds := []jobs.Job{
{DeploymentID: depID, JobName: "install-a", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusSucceeded, StartedAt: &now},
{DeploymentID: depID, JobName: "install-b", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusFailed, StartedAt: &now},
{DeploymentID: depID, JobName: "install-c", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusRunning, StartedAt: &now},
{DeploymentID: depID, JobName: "install-d", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusPending},
}
for _, j := range seeds {
if err := st.UpsertJob(j); err != nil {
t.Fatal(err)
}
}
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/"+depID+"/jobs/batches", nil))
if rec.Code != 200 {
t.Fatalf("code: %d", rec.Code)
}
var resp struct {
Batches []jobs.BatchSummary `json:"batches"`
}
decodeJSON(t, rec.Body, &resp)
if len(resp.Batches) != 1 {
t.Fatalf("batches: %+v", resp.Batches)
}
bs := resp.Batches[0]
if bs.BatchID != jobs.BatchBootstrapKit ||
bs.Total != 4 ||
bs.Succeeded != 1 ||
bs.Failed != 1 ||
bs.Running != 1 ||
bs.Pending != 1 ||
bs.Finished != 2 {
t.Errorf("batch summary: %+v", bs)
}
}
func TestHandler_ListBatches_EmptySliceNotNull(t *testing.T) {
r, _, _ := newJobsAPIRouter(t)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/dep-x/jobs/batches", nil))
if rec.Code != 200 {
t.Fatalf("code: %d", rec.Code)
}
body := rec.Body.String()
if !strings.Contains(body, `"batches":[]`) {
t.Errorf("expected `\"batches\":[]`, got %s", body)
}
}
func TestHandler_NoStore_503(t *testing.T) {
h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), nil)
r := chi.NewRouter()
r.Get("/api/v1/deployments/{depId}/jobs", h.ListJobs)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/d/jobs", nil))
if rec.Code != http.StatusServiceUnavailable {
t.Errorf("expected 503, got %d", rec.Code)
}
}
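The counts TestHandler_ListBatches asserts come from store.SummarizeBatches, which the visible store.go hunk does not reach. Under the assumption that it is a plain fold over the deployment's jobs, the aggregation would look roughly like the following (helper name hypothetical); note Finished = Succeeded + Failed, matching the test.

// Assumed shape of the per-batch aggregation: one summary row per
// BatchID, statuses counted into their buckets, Finished derived.
func summarize(all []Job) []BatchSummary {
    byBatch := map[string]*BatchSummary{}
    order := []string{}
    for _, j := range all {
        bs, ok := byBatch[j.BatchID]
        if !ok {
            bs = &BatchSummary{BatchID: j.BatchID}
            byBatch[j.BatchID] = bs
            order = append(order, j.BatchID)
        }
        bs.Total++
        switch j.Status {
        case StatusSucceeded:
            bs.Succeeded++
        case StatusFailed:
            bs.Failed++
        case StatusRunning:
            bs.Running++
        default:
            bs.Pending++
        }
        bs.Finished = bs.Succeeded + bs.Failed
    }
    out := make([]BatchSummary, 0, len(order))
    for _, id := range order {
        out = append(out, *byBatch[id])
    }
    return out
}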

View File

@@ -0,0 +1,314 @@
// helmwatch_bridge.go — translates helmwatch.Watcher events into Job +
// Execution + LogLine writes against the Store.
//
// Every Store write the bridge performs is serialised under the
// Store's own mutex; the Bridge's in-memory cursor relies on the
// helmwatch event ordering described on the fields below. The
// catalyst-api constructs one Bridge per deployment and feeds it via
// OnProvisionerEvent on the same path runProvisioning already feeds
// the SSE buffer. The two feeds (jobs/executions REST + the existing
// SSE stream) live in parallel — neither is ever the source of truth
// for the other.
//
// Mapping
//
// - Each bp-<chart> HelmRelease maps to exactly one Job whose
// jobName="install-<chart>".
// - The Bridge calls UpsertJob on every component event so a new
// HelmRelease (the first time helmwatch emits it) materialises a
// pending Job row.
// - On the first transition out of helmwatch.StatePending the
// Bridge calls StartExecution to allocate a new Execution row
// and stamp the Job's StartedAt + LatestExecutionID.
// - On every component event the Bridge appends a LogLine to the
// active Execution's NDJSON file (level mapped via levelFor). The
// bridge keeps no in-memory line buffer — append is the canonical
// persistence path.
// - On a terminal helmwatch state (Installed / Failed) the Bridge
// calls FinishExecution which flips the Job's terminal status +
// stamps DurationMs.
//
// DependsOn — the bridge does not derive dependsOn from helmwatch
// events (helmwatch.Event carries only conditions). The catalyst-api
// is expected to call SeedJobs once at watch start with the bp-*
// HelmRelease specs (the bootstrap-kit YAMLs ship dependsOn metadata
// the helmwatch reader can stamp directly via the same dynamic
// informer it already runs). This file exposes the SeedJobs entry
// point; the actual wiring lives in
// internal/handler/phase1_watch.go.
package jobs
import (
"strings"
"time"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)
// Helmwatch state strings — kept here as untyped consts so the bridge
// does not import the helmwatch package and create an import cycle
// (helmwatch_test → handler → jobs would otherwise loop back the
// moment jobs needed helmwatch's types).
const (
HelmStatePending = "pending"
HelmStateInstalling = "installing"
HelmStateInstalled = "installed"
HelmStateDegraded = "degraded"
HelmStateFailed = "failed"
)
// Bridge holds the per-deployment cursor the helmwatch consumer needs:
// which Execution is currently active for which Job. The cursor is
// memory-only and is discarded on Pod restart — a resumed Phase-1
// watch starts a fresh Execution row, which is the correct behaviour
// (an Execution is "one watch attempt"; a Pod restart legitimately
// counts as a new attempt).
type Bridge struct {
store *Store
deploymentID string
// activeExecID — per-job map of the in-flight Execution.id. Set
// on the first transition out of StatePending; cleared when the
// Job reaches a terminal state. The map itself carries no mutex:
// the bridge assumes its caller (the Phase-1 watch emit path)
// never invokes it concurrently, and that state changes for a
// given component arrive strictly sequentially; every persisted
// write is serialised by the Store's own mutex.
activeExecID map[string]string
// lastState — per-job last-seen helmwatch state, so the bridge
// can suppress duplicate appends when helmwatch refires UpdateFunc
// at the informer's resync cadence without an actual transition.
lastState map[string]string
}
// NewBridge returns a fresh Bridge for the given deployment id.
// store must be non-nil; deploymentID must be non-empty.
func NewBridge(store *Store, deploymentID string) *Bridge {
return &Bridge{
store: store,
deploymentID: deploymentID,
activeExecID: map[string]string{},
lastState: map[string]string{},
}
}
// SeedJobs registers the supplied jobs against the deployment in a
// pending state. Used by the catalyst-api at Phase-1 watch start to
// pre-populate the table view with rows + dependsOn before any
// HelmRelease has reconciled.
//
// Each spec carries the chart name (without the bp- prefix) and the
// list of dependent chart names (also without the bp- prefix). The
// bridge translates them to JobName + DependsOn list using the
// install-<chart> convention.
func (b *Bridge) SeedJobs(specs []SeedSpec) error {
now := time.Now().UTC()
for _, sp := range specs {
j := Job{
DeploymentID: b.deploymentID,
JobName: JobNamePrefix + sp.Chart,
AppID: sp.Chart,
BatchID: BatchBootstrapKit,
DependsOn: dependsOnFromCharts(sp.DependsOn),
Status: StatusPending,
}
if err := b.store.UpsertJob(j); err != nil {
return err
}
_ = now // reserved: future "createdAt" field, intentionally
// unused for now — the wire spec in #205 doesn't include
// it.
}
return nil
}
// SeedSpec is the per-Job seed input: the chart name (after the bp-
// prefix is stripped) plus the list of dependent chart names (also
// post-strip). The handler builds this list from the bootstrap-kit
// YAMLs it already reads via the helmwatch dynamic informer.
type SeedSpec struct {
Chart string
DependsOn []string
}
// OnHelmReleaseEvent is the single entry point the helmwatch
// consumer calls. componentID is helmwatch.ComponentIDFromHelmRelease
// (the chart name with bp- stripped); state is one of the HelmState*
// constants; level + message map onto the LogLine. Time is the wall-
// clock instant of the event — the LogLine inherits it directly.
//
// The function is a no-op when state == previous state for the same
// componentID — helmwatch's UpdateFunc fires on every status sub-
// resource patch, including helm-controller's own observedGeneration
// touches, and we don't want to persist a fresh LogLine for each.
//
// The bridge tolerates store errors (returns them) but does not abort
// the helmwatch event loop — the handler's emit path treats this as
// a non-fatal best-effort write.
func (b *Bridge) OnHelmReleaseEvent(componentID, state, level, message string, t time.Time) error {
jobName := JobNamePrefix + componentID
prev := b.lastState[componentID]
if prev == state {
return nil
}
b.lastState[componentID] = state
// Ensure the Job row exists. If SeedJobs was called this is a
// no-op merge; if it wasn't (e.g. the bootstrap-kit hot-shipped a
// new chart helmwatch wasn't seeded with) the bridge still
// auto-creates a row so no event is dropped.
if err := b.store.UpsertJob(Job{
DeploymentID: b.deploymentID,
JobName: jobName,
AppID: componentID,
BatchID: BatchBootstrapKit,
DependsOn: []string{},
Status: jobStatusFromHelmState(state),
}); err != nil {
return err
}
// Allocate an Execution if the Job has just become non-pending and
// no Execution is active.
execID := b.activeExecID[componentID]
if execID == "" && state != HelmStatePending {
exec, err := b.store.StartExecution(b.deploymentID, jobName, t)
if err != nil {
return err
}
execID = exec.ID
b.activeExecID[componentID] = execID
}
// Append a LogLine for every observed transition. The bridge
// drops the line if there is no active execution (the Job is
// still pending) — the table view's "started" column is
// authoritative for pending rows; a LogLine without an Execution
// has no display surface.
if execID != "" {
ll := LogLine{
Timestamp: t.UTC(),
Level: mapLevel(level),
Message: buildLogMessage(componentID, state, message),
}
if err := b.store.AppendLogLines(b.deploymentID, execID, []LogLine{ll}); err != nil {
return err
}
}
// Terminal-state Job: finish the Execution + clear the cursor so
// a future re-run (Day-2 retry) gets a fresh Execution row.
if execID != "" && (state == HelmStateInstalled || state == HelmStateFailed) {
final := StatusSucceeded
if state == HelmStateFailed {
final = StatusFailed
}
if err := b.store.FinishExecution(b.deploymentID, execID, final, t); err != nil {
return err
}
delete(b.activeExecID, componentID)
}
return nil
}
// OnProvisionerEvent is a convenience adapter: the handler's emit
// path passes provisioner.Event (the same struct the SSE stream
// carries). Only PhaseComponent events are forwarded — Phase-0 OpenTofu
// events have no Job analogue and fall through silently.
//
// The function is allocation-light: it builds no event copies, just
// translates strings.
func (b *Bridge) OnProvisionerEvent(ev provisioner.Event) error {
if ev.Phase != "component" || ev.Component == "" || ev.State == "" {
return nil
}
t := parseEventTime(ev.Time)
return b.OnHelmReleaseEvent(ev.Component, ev.State, ev.Level, ev.Message, t)
}
// dependsOnFromCharts converts a list of dependent chart names (e.g.
// ["cilium", "cert-manager"]) into the install-<chart> jobName form
// the wire spec expects. Empty input yields an empty (non-nil) slice
// so the JSON shape is `[]` not `null`.
func dependsOnFromCharts(charts []string) []string {
out := make([]string, 0, len(charts))
for _, c := range charts {
c = strings.TrimSpace(c)
c = strings.TrimPrefix(c, "bp-")
if c == "" {
continue
}
out = append(out, JobNamePrefix+c)
}
return out
}
// jobStatusFromHelmState maps helmwatch's State enum onto the Job
// Status enum. The Bridge writes this through UpsertJob on every
// event so the table view reflects current state without waiting for
// a terminal transition.
func jobStatusFromHelmState(state string) string {
switch state {
case HelmStateInstalled:
return StatusSucceeded
case HelmStateFailed:
return StatusFailed
case HelmStateInstalling, HelmStateDegraded:
return StatusRunning
case HelmStatePending:
return StatusPending
}
return StatusPending
}
// mapLevel translates the helmwatch event level (info|warn|error)
// onto the LogLine wire-format level (INFO|WARN|ERROR|DEBUG). The
// wire spec is uppercase per the GitLab-CI viewer convention (#204).
func mapLevel(level string) string {
switch strings.ToLower(strings.TrimSpace(level)) {
case "error":
return LevelError
case "warn", "warning":
return LevelWarn
case "debug":
return LevelDebug
default:
return LevelInfo
}
}
// buildLogMessage formats the LogLine.Message text. We prepend a
// "[<state>]" tag so an operator scrolling the GitLab-style viewer
// can spot transitions at a glance. The original
// helm-controller message (HelmRelease.status.conditions[Ready].
// Message) is preserved unchanged after the tag.
func buildLogMessage(componentID, state, message string) string {
state = strings.TrimSpace(state)
message = strings.TrimSpace(message)
if state == "" {
return message
}
if message == "" {
return "[" + state + "] " + componentID
}
return "[" + state + "] " + message
}
// parseEventTime parses the RFC3339 timestamp helmwatch stamps onto
// every Event. A bad parse falls back to time.Now() so a malformed
// timestamp doesn't drop the LogLine.
func parseEventTime(s string) time.Time {
if s == "" {
return time.Now().UTC()
}
t, err := time.Parse(time.RFC3339, s)
if err != nil {
t, err = time.Parse(time.RFC3339Nano, s)
}
if err != nil {
return time.Now().UTC()
}
return t.UTC()
}
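The SeedJobs wiring described in the header comment lives in internal/handler/phase1_watch.go, which is not part of this compare. A rough sketch of what that call site plausibly reduces to; the helper name and input shape are hypothetical, and SeedJobs itself strips any residual bp- prefix from dependency names.

// chartDeps maps a chart name (bp- prefix already stripped) to the
// chart names it depends on, as read from the bootstrap-kit YAMLs the
// helmwatch informer already consumes.
func seedJobsAtWatchStart(bridge *Bridge, chartDeps map[string][]string) error {
    specs := make([]SeedSpec, 0, len(chartDeps))
    for chart, deps := range chartDeps {
        specs = append(specs, SeedSpec{Chart: chart, DependsOn: deps})
    }
    return bridge.SeedJobs(specs)
}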

View File

@@ -0,0 +1,236 @@
// helmwatch_bridge_test.go — assert that helmwatch component events
// translate into the Job + Execution + LogLine writes the table-view
// UX renders.
package jobs
import (
"strings"
"testing"
"time"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)
func newBridgeFixture(t *testing.T) (*Store, *Bridge, string) {
t.Helper()
st, err := NewStore(t.TempDir())
if err != nil {
t.Fatalf("NewStore: %v", err)
}
depID := "dep-bridge"
return st, NewBridge(st, depID), depID
}
func TestBridge_SeedJobs_StripsBpPrefix(t *testing.T) {
st, br, depID := newBridgeFixture(t)
if err := br.SeedJobs([]SeedSpec{
{Chart: "cilium"},
{Chart: "cert-manager", DependsOn: []string{"bp-cilium", "cilium"}},
}); err != nil {
t.Fatal(err)
}
got, err := st.ListJobs(depID)
if err != nil {
t.Fatal(err)
}
if len(got) != 2 {
t.Fatalf("expected 2 jobs, got %d", len(got))
}
var cm Job
for _, j := range got {
if j.JobName == "install-cert-manager" {
cm = j
}
}
if cm.JobName == "" {
t.Fatal("no install-cert-manager job")
}
if cm.AppID != "cert-manager" || cm.BatchID != BatchBootstrapKit || cm.Status != StatusPending {
t.Errorf("seed metadata: %+v", cm)
}
// dependsOn: bp- prefix must be stripped, then install- prepended.
want := []string{"install-cilium", "install-cilium"}
if len(cm.DependsOn) != len(want) {
t.Fatalf("dependsOn len: %v", cm.DependsOn)
}
for i, w := range want {
if cm.DependsOn[i] != w {
t.Errorf("dependsOn[%d]: got %q, want %q", i, cm.DependsOn[i], w)
}
}
}
func TestBridge_OnHelmReleaseEvent_HappyPath(t *testing.T) {
st, br, depID := newBridgeFixture(t)
t0 := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
if err := br.OnHelmReleaseEvent("cilium", HelmStatePending, "info", "observed", t0); err != nil {
t.Fatal(err)
}
got, _ := st.ListJobs(depID)
if len(got) != 1 || got[0].Status != StatusPending {
t.Fatalf("after pending: %+v", got)
}
if got[0].LatestExecutionID != "" {
t.Fatalf("Pending must not allocate an execution: %+v", got[0])
}
// Transition into installing — allocates an Execution.
t1 := t0.Add(2 * time.Second)
if err := br.OnHelmReleaseEvent("cilium", HelmStateInstalling, "info", "Helm install in progress", t1); err != nil {
t.Fatal(err)
}
got, _ = st.ListJobs(depID)
if got[0].Status != StatusRunning {
t.Errorf("status: want running, got %q", got[0].Status)
}
if got[0].LatestExecutionID == "" {
t.Fatalf("execution not allocated")
}
if got[0].StartedAt == nil || !got[0].StartedAt.Equal(t1) {
t.Errorf("StartedAt: got %v want %v", got[0].StartedAt, t1)
}
// Terminal: installed.
t2 := t1.Add(30 * time.Second)
if err := br.OnHelmReleaseEvent("cilium", HelmStateInstalled, "info", "Ready=True", t2); err != nil {
t.Fatal(err)
}
job, execs, err := st.GetJob(depID, JobID(depID, "install-cilium"))
if err != nil {
t.Fatal(err)
}
if job.Status != StatusSucceeded {
t.Errorf("final status: want succeeded, got %q", job.Status)
}
if job.FinishedAt == nil || !job.FinishedAt.Equal(t2) {
t.Errorf("FinishedAt: got %v want %v", job.FinishedAt, t2)
}
if job.DurationMs != 30000 {
t.Errorf("DurationMs: got %d want 30000", job.DurationMs)
}
if len(execs) != 1 || execs[0].Status != StatusSucceeded {
t.Errorf("executions: %+v", execs)
}
// Logs: 2 transitions (installing, installed) → 2 lines, prefixed.
page, _ := st.PageLogs(depID, execs[0].ID, 1, 100)
if page.Total != 2 || len(page.Lines) != 2 {
t.Fatalf("logs: %+v", page)
}
if !strings.HasPrefix(page.Lines[0].Message, "[installing]") {
t.Errorf("line0 message prefix: %q", page.Lines[0].Message)
}
if !strings.HasPrefix(page.Lines[1].Message, "[installed]") {
t.Errorf("line1 message prefix: %q", page.Lines[1].Message)
}
}
func TestBridge_OnHelmReleaseEvent_FailedTerminal(t *testing.T) {
st, br, depID := newBridgeFixture(t)
t0 := time.Now().UTC()
if err := br.OnHelmReleaseEvent("flux", HelmStateInstalling, "info", "first reconcile", t0); err != nil {
t.Fatal(err)
}
if err := br.OnHelmReleaseEvent("flux", HelmStateFailed, "error", "InstallFailed: chart not found", t0.Add(time.Second)); err != nil {
t.Fatal(err)
}
job, _, _ := st.GetJob(depID, JobID(depID, "install-flux"))
if job.Status != StatusFailed {
t.Errorf("status: want failed, got %q", job.Status)
}
page, _ := st.PageLogs(depID, job.LatestExecutionID, 1, 100)
hasError := false
for _, ll := range page.Lines {
if ll.Level == LevelError {
hasError = true
}
}
if !hasError {
t.Errorf("expected at least one ERROR log line, got %+v", page.Lines)
}
}
func TestBridge_DuplicateStateSuppressed(t *testing.T) {
st, br, depID := newBridgeFixture(t)
t0 := time.Now().UTC()
for i := 0; i < 5; i++ {
if err := br.OnHelmReleaseEvent("foo", HelmStateInstalling, "info", "spinning", t0.Add(time.Duration(i)*time.Second)); err != nil {
t.Fatal(err)
}
}
job, _, _ := st.GetJob(depID, JobID(depID, "install-foo"))
page, _ := st.PageLogs(depID, job.LatestExecutionID, 1, 100)
// Only the first emit registers as a transition; the next four
// repeats are suppressed by lastState.
if page.Total != 1 {
t.Errorf("expected 1 line for 5 dup events, got %d", page.Total)
}
}
func TestBridge_OnProvisionerEvent_FiltersPhase0(t *testing.T) {
st, br, depID := newBridgeFixture(t)
// Phase-0 OpenTofu event has no Component/State — bridge must drop.
if err := br.OnProvisionerEvent(provisioner.Event{
Time: time.Now().UTC().Format(time.RFC3339),
Phase: "tofu-apply",
Level: "info",
Message: "Apply complete",
}); err != nil {
t.Fatal(err)
}
got, _ := st.ListJobs(depID)
if len(got) != 0 {
t.Errorf("Phase-0 event must not create jobs, got %+v", got)
}
// Phase-1 component event creates a Job.
if err := br.OnProvisionerEvent(provisioner.Event{
Time: time.Now().UTC().Format(time.RFC3339),
Phase: "component",
Level: "info",
Component: "cilium",
State: HelmStateInstalling,
Message: "in progress",
}); err != nil {
t.Fatal(err)
}
got, _ = st.ListJobs(depID)
if len(got) != 1 || got[0].JobName != "install-cilium" {
t.Errorf("expected install-cilium job, got %+v", got)
}
}
func TestMapLevel(t *testing.T) {
cases := map[string]string{
"": LevelInfo,
"info": LevelInfo,
"warn": LevelWarn,
"warning": LevelWarn,
"error": LevelError,
"debug": LevelDebug,
"WEIRD": LevelInfo,
}
for in, want := range cases {
if got := mapLevel(in); got != want {
t.Errorf("mapLevel(%q): got %q, want %q", in, got, want)
}
}
}
func TestJobStatusFromHelmState(t *testing.T) {
cases := map[string]string{
HelmStateInstalled: StatusSucceeded,
HelmStateFailed: StatusFailed,
HelmStateInstalling: StatusRunning,
HelmStateDegraded: StatusRunning,
HelmStatePending: StatusPending,
"": StatusPending,
"unknown": StatusPending,
}
for in, want := range cases {
if got := jobStatusFromHelmState(in); got != want {
t.Errorf("jobStatusFromHelmState(%q): got %q, want %q", in, got, want)
}
}
}

View File

@@ -0,0 +1,716 @@
// store.go — flat-file persistence for Jobs + Executions + LogLines.
//
// Three on-disk artefacts per deployment:
//
// - <dir>/<deploymentId>/index.json — atomic temp+rename, holds
// the Job + Execution
// metadata.
// - <dir>/<deploymentId>/<execId>.log — append-only NDJSON, one
// LogLine per line.
// - The directory itself is created at 0o700 the first time the
// store touches a deployment.
//
// Atomicity: every persistIndex call writes to a sibling temp file then
// os.Rename. Concurrent calls are serialised under Store.mu so the
// rename is the linearisation point — a crash mid-write leaves the old
// index intact (or, on first write, a missing file the load path
// treats as "no jobs yet").
//
// NDJSON append: opened O_APPEND on every LogLines write. The store
// holds Store.mu around the open+write+close so concurrent writers
// can't interleave bytes (NDJSON is line-oriented; partial writes
// would corrupt parsing).
package jobs
import (
"bufio"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
)
// DefaultDir — the on-PVC path the chart already mounts at
// /var/lib/catalyst (see products/catalyst/chart/templates/api-deployment.yaml).
// Per docs/INVIOLABLE-PRINCIPLES.md #4 the path is configuration, not
// code; the env var CATALYST_EXECUTIONS_DIR overrides it.
const DefaultDir = "/var/lib/catalyst/executions"
// EnvDir is the env var the catalyst-api main reads to override the
// store directory. Empty / unset falls back to DefaultDir.
const EnvDir = "CATALYST_EXECUTIONS_DIR"
// indexFileName — the per-deployment metadata file.
const indexFileName = "index.json"
// MaxLogPageSize — upper bound the API enforces on the /logs
// pagination `limit` query param. The wire spec in #205 documents the
// same number. Hardcoded here so the store's pagination helper agrees
// with the handler.
const MaxLogPageSize = 5000
// DefaultLogPageSize — default `limit` when the caller omits the query
// param.
const DefaultLogPageSize = 500
// ErrNotFound is returned when the requested Job, Execution, or
// Deployment doesn't exist in the store. Callers map this onto HTTP
// 404; tests assert on errors.Is.
var ErrNotFound = errors.New("jobs: not found")
// Store is the flat-file persistence layer for Jobs + Executions +
// LogLines. Construct via NewStore; Close is a no-op (no FDs are kept
// open between calls).
//
// All writes are serialised under mu — the store is designed for
// dozens of writes/sec from a single helmwatch goroutine, not high-
// concurrency log ingestion. Reads also take mu so a partially-written
// index can never be observed by GET /jobs.
type Store struct {
dir string
mu sync.Mutex
}
// NewStore returns a Store rooted at dir, creating the directory at
// 0o700 if missing. A failure to create the root directory is fatal —
// production manifests guarantee the PVC exists, and a CI runner
// without write access surfaces an unmistakable error rather than
// silently dropping logs.
func NewStore(dir string) (*Store, error) {
dir = strings.TrimSpace(dir)
if dir == "" {
return nil, errors.New("jobs: store directory is required (set CATALYST_EXECUTIONS_DIR or pass DefaultDir)")
}
if err := os.MkdirAll(dir, 0o700); err != nil {
return nil, fmt.Errorf("jobs: create store dir %q: %w", dir, err)
}
return &Store{dir: dir}, nil
}
// Dir returns the absolute root path the Store persists to. Used when
// the handler renders store paths into operator diagnostics.
func (s *Store) Dir() string {
return s.dir
}
// deploymentDir returns the per-deployment subdirectory, ensuring it
// exists at 0o700. Called from every mutator under s.mu.
func (s *Store) deploymentDir(deploymentID string) (string, error) {
if strings.TrimSpace(deploymentID) == "" {
return "", errors.New("jobs: deploymentID is required")
}
// Disallow path-traversal — the deploymentId comes from
// CreateDeployment which uses crypto/rand hex, but defence-in-
// depth: reject any id that contains a path separator.
if strings.ContainsAny(deploymentID, "/\\") {
return "", fmt.Errorf("jobs: invalid deploymentID %q", deploymentID)
}
d := filepath.Join(s.dir, deploymentID)
if err := os.MkdirAll(d, 0o700); err != nil {
return "", fmt.Errorf("jobs: create deployment dir %q: %w", d, err)
}
return d, nil
}
// loadIndex reads <depDir>/index.json. Returns a fresh zero-Index when
// the file is missing — that's a "no jobs yet" deployment, not an
// error. Caller MUST hold s.mu.
func (s *Store) loadIndex(deploymentID string) (*Index, error) {
depDir, err := s.deploymentDir(deploymentID)
if err != nil {
return nil, err
}
path := filepath.Join(depDir, indexFileName)
raw, err := os.ReadFile(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return &Index{
DeploymentID: deploymentID,
Jobs: []Job{},
Executions: []Execution{},
}, nil
}
return nil, fmt.Errorf("jobs: read index %q: %w", path, err)
}
var idx Index
if err := json.Unmarshal(raw, &idx); err != nil {
return nil, fmt.Errorf("jobs: decode index %q: %w", path, err)
}
if idx.Jobs == nil {
idx.Jobs = []Job{}
}
if idx.Executions == nil {
idx.Executions = []Execution{}
}
idx.DeploymentID = deploymentID
return &idx, nil
}
// persistIndex writes idx to <depDir>/index.json via atomic
// temp+rename. The temp file is written at 0o600 and promoted with
// os.Rename, so concurrent readers see either the old version or the
// new one, never a partial write. Caller MUST hold s.mu.
func (s *Store) persistIndex(idx *Index) error {
depDir, err := s.deploymentDir(idx.DeploymentID)
if err != nil {
return err
}
final := filepath.Join(depDir, indexFileName)
raw, err := json.MarshalIndent(idx, "", " ")
if err != nil {
return fmt.Errorf("jobs: marshal index: %w", err)
}
tmp, err := os.CreateTemp(depDir, ".index-*.json.tmp")
if err != nil {
return fmt.Errorf("jobs: create temp index: %w", err)
}
tmpPath := tmp.Name()
cleanup := true
defer func() {
if cleanup {
_ = os.Remove(tmpPath)
}
}()
if _, err := tmp.Write(raw); err != nil {
_ = tmp.Close()
return fmt.Errorf("jobs: write temp index %q: %w", tmpPath, err)
}
if err := tmp.Sync(); err != nil {
_ = tmp.Close()
return fmt.Errorf("jobs: fsync temp index %q: %w", tmpPath, err)
}
if err := tmp.Close(); err != nil {
return fmt.Errorf("jobs: close temp index %q: %w", tmpPath, err)
}
if err := os.Chmod(tmpPath, 0o600); err != nil {
return fmt.Errorf("jobs: chmod temp index %q: %w", tmpPath, err)
}
if err := os.Rename(tmpPath, final); err != nil {
return fmt.Errorf("jobs: rename temp index → %q: %w", final, err)
}
cleanup = false
return nil
}
// UpsertJob inserts or updates the Job with id JobID(j.DeploymentID,
// j.JobName). The ID is stamped from those two fields; callers never
// set it themselves.
//
// The merge keeps StartedAt + FinishedAt monotonic and preserves a
// previously-seeded DependsOn list: a re-emission with nil StartedAt
// or an empty DependsOn won't clobber values already persisted. The
// frontend never sees a job "un-start".
func (s *Store) UpsertJob(j Job) error {
if strings.TrimSpace(j.DeploymentID) == "" {
return errors.New("jobs: UpsertJob: deploymentID is required")
}
if strings.TrimSpace(j.JobName) == "" {
return errors.New("jobs: UpsertJob: jobName is required")
}
j.ID = JobID(j.DeploymentID, j.JobName)
if j.DependsOn == nil {
j.DependsOn = []string{}
}
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(j.DeploymentID)
if err != nil {
return err
}
for i := range idx.Jobs {
if idx.Jobs[i].ID == j.ID {
merged := mergeJob(idx.Jobs[i], j)
idx.Jobs[i] = merged
return s.persistIndex(idx)
}
}
idx.Jobs = append(idx.Jobs, j)
return s.persistIndex(idx)
}
// mergeJob keeps monotonic timestamps, the latest non-empty
// LatestExecutionID, and a previously-seeded DependsOn list (the
// bridge's per-event upsert passes an empty slice and must not clobber
// the seed). The status from the new event always wins (the helmwatch
// bridge is the only writer; later state-machine events supersede
// earlier ones).
func mergeJob(prev, next Job) Job {
out := next
if next.StartedAt == nil && prev.StartedAt != nil {
out.StartedAt = prev.StartedAt
}
if next.FinishedAt == nil && prev.FinishedAt != nil {
out.FinishedAt = prev.FinishedAt
}
if next.LatestExecutionID == "" && prev.LatestExecutionID != "" {
out.LatestExecutionID = prev.LatestExecutionID
}
if len(next.DependsOn) == 0 && len(prev.DependsOn) > 0 {
out.DependsOn = prev.DependsOn
}
if out.StartedAt != nil && out.FinishedAt != nil {
out.DurationMs = out.FinishedAt.Sub(*out.StartedAt).Milliseconds()
}
return out
}
// StartExecution allocates a new Execution row for the given Job and
// stamps the Job's LatestExecutionID + StartedAt + Status=running. The
// returned Execution.ID is the path-segment component the /logs
// endpoint accepts. Caller is responsible for writing the matching
// Job upsert with appId/batchId metadata BEFORE the first
// StartExecution — the store does not back-fill those fields.
func (s *Store) StartExecution(deploymentID, jobName string, startedAt time.Time) (Execution, error) {
if strings.TrimSpace(deploymentID) == "" {
return Execution{}, errors.New("jobs: StartExecution: deploymentID is required")
}
if strings.TrimSpace(jobName) == "" {
return Execution{}, errors.New("jobs: StartExecution: jobName is required")
}
jobID := JobID(deploymentID, jobName)
execID, err := newExecutionID()
if err != nil {
return Execution{}, err
}
exec := Execution{
ID: execID,
JobID: jobID,
DeploymentID: deploymentID,
Status: StatusRunning,
StartedAt: startedAt.UTC(),
}
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(deploymentID)
if err != nil {
return Execution{}, err
}
idx.Executions = append(idx.Executions, exec)
// Stamp the Job's LatestExecutionID + flip Status=running so the
// table view reflects the in-flight attempt without a separate
// UpsertJob call from the bridge.
for i := range idx.Jobs {
if idx.Jobs[i].ID == jobID {
started := startedAt.UTC()
if idx.Jobs[i].StartedAt == nil {
idx.Jobs[i].StartedAt = &started
}
idx.Jobs[i].Status = StatusRunning
idx.Jobs[i].LatestExecutionID = execID
break
}
}
if err := s.persistIndex(idx); err != nil {
return Execution{}, err
}
return exec, nil
}
// FinishExecution stamps the Execution's terminal Status + FinishedAt
// and flips the parent Job into the same terminal state. status must be
// StatusSucceeded or StatusFailed.
func (s *Store) FinishExecution(deploymentID, execID, status string, finishedAt time.Time) error {
if !IsTerminal(status) {
return fmt.Errorf("jobs: FinishExecution: status must be terminal, got %q", status)
}
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(deploymentID)
if err != nil {
return err
}
finished := finishedAt.UTC()
var jobID string
found := false
for i := range idx.Executions {
if idx.Executions[i].ID == execID {
idx.Executions[i].Status = status
idx.Executions[i].FinishedAt = &finished
jobID = idx.Executions[i].JobID
found = true
break
}
}
if !found {
return fmt.Errorf("jobs: FinishExecution: execution %q: %w", execID, ErrNotFound)
}
for i := range idx.Jobs {
if idx.Jobs[i].ID == jobID {
idx.Jobs[i].Status = status
idx.Jobs[i].FinishedAt = &finished
if idx.Jobs[i].StartedAt != nil {
idx.Jobs[i].DurationMs = finished.Sub(*idx.Jobs[i].StartedAt).Milliseconds()
}
break
}
}
return s.persistIndex(idx)
}
// AppendLogLines appends one or more LogLines to the per-execution
// NDJSON file, stamping LineNumber 1-indexed and monotonic across
// calls. It updates the parent Execution's LineCount under the same
// lock, so the `total` field of subsequent /logs responses reflects
// the new ceiling.
//
// Lines is a slice so a bridge that emits batched events (e.g. one
// state transition + a derived "Helm install in progress" log line)
// can persist them in a single write.
func (s *Store) AppendLogLines(deploymentID, execID string, lines []LogLine) error {
if len(lines) == 0 {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(deploymentID)
if err != nil {
return err
}
var exec *Execution
for i := range idx.Executions {
if idx.Executions[i].ID == execID {
exec = &idx.Executions[i]
break
}
}
if exec == nil {
return fmt.Errorf("jobs: AppendLogLines: execution %q: %w", execID, ErrNotFound)
}
depDir, err := s.deploymentDir(deploymentID)
if err != nil {
return err
}
logPath := filepath.Join(depDir, execID+".log")
f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return fmt.Errorf("jobs: open log %q: %w", logPath, err)
}
defer f.Close()
bw := bufio.NewWriter(f)
startLine := exec.LineCount
for i := range lines {
startLine++
lines[i].LineNumber = startLine
if lines[i].Timestamp.IsZero() {
lines[i].Timestamp = time.Now().UTC()
} else {
lines[i].Timestamp = lines[i].Timestamp.UTC()
}
if lines[i].Level == "" {
lines[i].Level = LevelInfo
}
raw, err := json.Marshal(lines[i])
if err != nil {
return fmt.Errorf("jobs: marshal log line: %w", err)
}
if _, err := bw.Write(raw); err != nil {
return fmt.Errorf("jobs: write log %q: %w", logPath, err)
}
if err := bw.WriteByte('\n'); err != nil {
return fmt.Errorf("jobs: write log newline %q: %w", logPath, err)
}
}
if err := bw.Flush(); err != nil {
return fmt.Errorf("jobs: flush log %q: %w", logPath, err)
}
if err := f.Sync(); err != nil {
return fmt.Errorf("jobs: fsync log %q: %w", logPath, err)
}
exec.LineCount = startLine
return s.persistIndex(idx)
}
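// A minimal caller-side sketch of the write path (illustrative only —
// the helmwatch bridge is the real writer; executionsDir, depID and the
// chart name below are made-up placeholders):
//
//	st, _ := jobs.NewStore(executionsDir) // e.g. the CATALYST_EXECUTIONS_DIR value
//	_ = st.UpsertJob(jobs.Job{
//		DeploymentID: depID,
//		JobName:      "install-cilium",
//		AppID:        "cilium",
//		BatchID:      jobs.BatchBootstrapKit,
//		DependsOn:    []string{"install-flux"},
//		Status:       jobs.StatusPending,
//	})
//	exec, _ := st.StartExecution(depID, "install-cilium", time.Now())
//	_ = st.AppendLogLines(depID, exec.ID, []jobs.LogLine{
//		{Level: jobs.LevelInfo, Message: "HelmRelease bp-cilium: install in progress"},
//	})
//	_ = st.FinishExecution(depID, exec.ID, jobs.StatusSucceeded, time.Now())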
// ListJobs returns every Job for the deployment, sorted started-at
// DESC with pending Jobs (no StartedAt) bucketed last. The handler
// returns the slice unchanged.
func (s *Store) ListJobs(deploymentID string) ([]Job, error) {
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(deploymentID)
if err != nil {
return nil, err
}
out := make([]Job, len(idx.Jobs))
copy(out, idx.Jobs)
sort.SliceStable(out, func(i, j int) bool {
// Pending (no StartedAt) sort last.
ai, bi := out[i].StartedAt, out[j].StartedAt
switch {
case ai == nil && bi == nil:
return out[i].JobName < out[j].JobName
case ai == nil:
return false
case bi == nil:
return true
}
// started-at DESC: more-recent first.
if ai.Equal(*bi) {
return out[i].JobName < out[j].JobName
}
return ai.After(*bi)
})
return out, nil
}
// GetJob returns the Job + its Executions list. ErrNotFound if no Job
// with the given id exists for the deployment.
func (s *Store) GetJob(deploymentID, jobID string) (Job, []Execution, error) {
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(deploymentID)
if err != nil {
return Job{}, nil, err
}
for i := range idx.Jobs {
if idx.Jobs[i].ID == jobID {
execs := []Execution{}
for _, e := range idx.Executions {
if e.JobID == jobID {
execs = append(execs, e)
}
}
sort.Slice(execs, func(a, b int) bool {
return execs[a].StartedAt.After(execs[b].StartedAt)
})
return idx.Jobs[i], execs, nil
}
}
return Job{}, nil, fmt.Errorf("jobs: GetJob %q: %w", jobID, ErrNotFound)
}
// FindExecution returns the Execution metadata for execID within the
// given deployment. ErrNotFound when the deployment has no execution
// with that id.
func (s *Store) FindExecution(deploymentID, execID string) (Execution, error) {
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(deploymentID)
if err != nil {
return Execution{}, err
}
for _, e := range idx.Executions {
if e.ID == execID {
return e, nil
}
}
return Execution{}, fmt.Errorf("jobs: FindExecution %q: %w", execID, ErrNotFound)
}
// FindExecutionAcrossDeployments scans every <depId>/index.json under
// the store root for an execution with the given id. Used by the
// /api/v1/actions/executions/{execId}/logs endpoint where the URL
// does not carry the deploymentID — see the contract spec in #205.
//
// Returns the Execution + its DeploymentID. Stops scanning at the
// first match. ErrNotFound when no deployment has it.
func (s *Store) FindExecutionAcrossDeployments(execID string) (Execution, error) {
s.mu.Lock()
defer s.mu.Unlock()
entries, err := os.ReadDir(s.dir)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return Execution{}, fmt.Errorf("jobs: FindExecutionAcrossDeployments %q: %w", execID, ErrNotFound)
}
return Execution{}, fmt.Errorf("jobs: scan store dir %q: %w", s.dir, err)
}
for _, ent := range entries {
if !ent.IsDir() {
continue
}
idx, err := s.loadIndex(ent.Name())
if err != nil {
// A single corrupt deployment must not poison the lookup;
// the API returns 404 only if NO deployment matches.
continue
}
for _, e := range idx.Executions {
if e.ID == execID {
return e, nil
}
}
}
return Execution{}, fmt.Errorf("jobs: FindExecutionAcrossDeployments %q: %w", execID, ErrNotFound)
}
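// A sketch of how a /logs handler might stitch this together when the
// URL carries only the execution id (assumed wiring; store, execID,
// fromLine and limit are placeholders, error handling abbreviated):
//
//	exec, err := store.FindExecutionAcrossDeployments(execID)
//	if err != nil {
//		// errors.Is(err, jobs.ErrNotFound) → respond 404
//		return
//	}
//	page, err := store.PageLogs(exec.DeploymentID, exec.ID, fromLine, limit)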
// LogPage is the wire-contract response shape for the /logs endpoint.
// Defined in the store package so the handler doesn't have to
// re-declare it.
type LogPage struct {
Lines []LogLine `json:"lines"`
Total int `json:"total"`
ExecutionFinished bool `json:"executionFinished"`
}
// PageLogs returns a window into the Execution's NDJSON log file.
// fromLine is 1-indexed (matches LogLine.LineNumber) and clamped up to
// 1 when <= 0; limit falls back to DefaultLogPageSize when <= 0 and is
// capped at MaxLogPageSize.
func (s *Store) PageLogs(deploymentID, execID string, fromLine, limit int) (LogPage, error) {
if fromLine <= 0 {
fromLine = 1
}
if limit <= 0 {
limit = DefaultLogPageSize
}
if limit > MaxLogPageSize {
limit = MaxLogPageSize
}
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(deploymentID)
if err != nil {
return LogPage{}, err
}
var exec *Execution
for i := range idx.Executions {
if idx.Executions[i].ID == execID {
exec = &idx.Executions[i]
break
}
}
if exec == nil {
return LogPage{}, fmt.Errorf("jobs: PageLogs %q: %w", execID, ErrNotFound)
}
depDir, err := s.deploymentDir(deploymentID)
if err != nil {
return LogPage{}, err
}
logPath := filepath.Join(depDir, execID+".log")
f, err := os.Open(logPath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// No log file yet — execution started but no LogLines
// were appended. That's a valid empty-page response.
return LogPage{
Lines: []LogLine{},
Total: exec.LineCount,
ExecutionFinished: IsTerminal(exec.Status),
}, nil
}
return LogPage{}, fmt.Errorf("jobs: open log %q: %w", logPath, err)
}
defer f.Close()
out := make([]LogLine, 0, limit)
br := bufio.NewReader(f)
lineNum := 0
for {
raw, err := br.ReadBytes('\n')
if len(raw) > 0 {
lineNum++
if lineNum >= fromLine && len(out) < limit {
var ll LogLine
if uerr := json.Unmarshal(stripNewline(raw), &ll); uerr == nil {
out = append(out, ll)
}
}
if len(out) >= limit {
// Page is full. No need to drain the rest just for a
// count — exec.LineCount already carries the total;
// abort the scan early.
break
}
}
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return LogPage{}, fmt.Errorf("jobs: read log %q: %w", logPath, err)
}
}
return LogPage{
Lines: out,
Total: exec.LineCount,
ExecutionFinished: IsTerminal(exec.Status),
}, nil
}
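// A client-style pagination sketch (an illustrative assumption of how a
// reader could tail an execution; st, depID and execID are placeholders):
//
//	from := 1
//	for {
//		page, err := st.PageLogs(depID, execID, from, jobs.DefaultLogPageSize)
//		if err != nil {
//			break
//		}
//		for _, ll := range page.Lines {
//			fmt.Printf("%4d %-5s %s\n", ll.LineNumber, ll.Level, ll.Message)
//		}
//		from += len(page.Lines)
//		if page.ExecutionFinished && from > page.Total {
//			break
//		}
//		// still running or more lines pending — poll again after a short sleep
//	}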
func stripNewline(b []byte) []byte {
if n := len(b); n > 0 && b[n-1] == '\n' {
b = b[:n-1]
}
if n := len(b); n > 0 && b[n-1] == '\r' {
b = b[:n-1]
}
return b
}
// SummarizeBatches groups Jobs by BatchID and returns a per-batch
// progress row. Empty deployment → empty slice (not nil) so the JSON
// shape matches the spec's `{batches: []}` exactly.
func (s *Store) SummarizeBatches(deploymentID string) ([]BatchSummary, error) {
s.mu.Lock()
defer s.mu.Unlock()
idx, err := s.loadIndex(deploymentID)
if err != nil {
return nil, err
}
byBatch := map[string]*BatchSummary{}
order := []string{}
for _, j := range idx.Jobs {
bid := j.BatchID
if bid == "" {
bid = "(unbatched)"
}
bs, ok := byBatch[bid]
if !ok {
bs = &BatchSummary{BatchID: bid}
byBatch[bid] = bs
order = append(order, bid)
}
bs.Total++
switch j.Status {
case StatusSucceeded:
bs.Succeeded++
bs.Finished++
case StatusFailed:
bs.Failed++
bs.Finished++
case StatusRunning:
bs.Running++
case StatusPending, "":
bs.Pending++
}
}
out := make([]BatchSummary, 0, len(order))
for _, bid := range order {
out = append(out, *byBatch[bid])
}
return out, nil
}
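// For illustration, five bootstrap-kit Jobs with two succeeded, one
// failed, one running and one pending (made-up numbers) would summarise
// to:
//
//	{"batchId":"bootstrap-kit","total":5,"finished":3,"succeeded":2,"failed":1,"running":1,"pending":1}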
// newExecutionID returns the hex encoding of 16 crypto/rand bytes (a
// 32-character string). Collisions within a deployment are vanishingly
// improbable — even at the catalyst-api's maximum sustained emit rate
// (a few hundred per minute) this is overkill, but cheap.
func newExecutionID() (string, error) {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
return "", fmt.Errorf("jobs: crypto/rand: %w", err)
}
return hex.EncodeToString(b), nil
}

View File

@ -0,0 +1,456 @@
// store_test.go — round-trip + pagination + atomic-write tests for the
// Jobs/Executions store. Tests use t.TempDir() so they run without a
// PVC and clean themselves up.
package jobs
import (
"errors"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
)
func newTestStore(t *testing.T) *Store {
t.Helper()
st, err := NewStore(t.TempDir())
if err != nil {
t.Fatalf("NewStore: %v", err)
}
return st
}
func TestStore_UpsertJob_RoundTrip(t *testing.T) {
st := newTestStore(t)
depID := "dep-1"
j := Job{
DeploymentID: depID,
JobName: "install-cilium",
AppID: "cilium",
BatchID: BatchBootstrapKit,
DependsOn: []string{"install-flux"},
Status: StatusPending,
}
if err := st.UpsertJob(j); err != nil {
t.Fatalf("UpsertJob: %v", err)
}
got, err := st.ListJobs(depID)
if err != nil {
t.Fatalf("ListJobs: %v", err)
}
if len(got) != 1 {
t.Fatalf("expected 1 job, got %d", len(got))
}
if got[0].ID != JobID(depID, "install-cilium") {
t.Fatalf("ID mismatch: %q", got[0].ID)
}
if got[0].AppID != "cilium" || got[0].BatchID != BatchBootstrapKit {
t.Fatalf("metadata mismatch: %+v", got[0])
}
if len(got[0].DependsOn) != 1 || got[0].DependsOn[0] != "install-flux" {
t.Fatalf("dependsOn mismatch: %+v", got[0].DependsOn)
}
}
func TestStore_UpsertJob_MergesMonotonicTimestamps(t *testing.T) {
st := newTestStore(t)
depID := "dep-2"
started := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
if err := st.UpsertJob(Job{
DeploymentID: depID,
JobName: "install-foo",
StartedAt: &started,
Status: StatusRunning,
}); err != nil {
t.Fatal(err)
}
// Re-emit without StartedAt — the merge must preserve the prior value.
if err := st.UpsertJob(Job{
DeploymentID: depID,
JobName: "install-foo",
Status: StatusRunning,
}); err != nil {
t.Fatal(err)
}
got, _ := st.ListJobs(depID)
if got[0].StartedAt == nil || !got[0].StartedAt.Equal(started) {
t.Fatalf("StartedAt clobbered: %+v", got[0].StartedAt)
}
}
func TestStore_StartAndFinishExecution(t *testing.T) {
st := newTestStore(t)
depID := "dep-3"
if err := st.UpsertJob(Job{
DeploymentID: depID,
JobName: "install-foo",
AppID: "foo",
BatchID: BatchBootstrapKit,
Status: StatusPending,
}); err != nil {
t.Fatal(err)
}
t0 := time.Now().UTC()
exec, err := st.StartExecution(depID, "install-foo", t0)
if err != nil {
t.Fatalf("StartExecution: %v", err)
}
if exec.ID == "" || exec.Status != StatusRunning {
t.Fatalf("bad exec: %+v", exec)
}
job, execs, err := st.GetJob(depID, JobID(depID, "install-foo"))
if err != nil {
t.Fatalf("GetJob: %v", err)
}
if job.Status != StatusRunning {
t.Errorf("Job.Status: want running, got %q", job.Status)
}
if job.LatestExecutionID != exec.ID {
t.Errorf("LatestExecutionID: want %q, got %q", exec.ID, job.LatestExecutionID)
}
if len(execs) != 1 {
t.Fatalf("expected 1 exec, got %d", len(execs))
}
t1 := t0.Add(5 * time.Second)
if err := st.FinishExecution(depID, exec.ID, StatusSucceeded, t1); err != nil {
t.Fatalf("FinishExecution: %v", err)
}
job, _, _ = st.GetJob(depID, JobID(depID, "install-foo"))
if job.Status != StatusSucceeded {
t.Errorf("Job.Status: want succeeded, got %q", job.Status)
}
if job.FinishedAt == nil {
t.Fatalf("Job.FinishedAt nil")
}
if job.DurationMs != 5000 {
t.Errorf("DurationMs: want 5000, got %d", job.DurationMs)
}
}
func TestStore_FinishExecution_RejectsNonTerminal(t *testing.T) {
st := newTestStore(t)
if err := st.UpsertJob(Job{DeploymentID: "d", JobName: "install-x"}); err != nil {
t.Fatal(err)
}
exec, err := st.StartExecution("d", "install-x", time.Now())
if err != nil {
t.Fatal(err)
}
if err := st.FinishExecution("d", exec.ID, StatusRunning, time.Now()); err == nil {
t.Fatal("expected error finishing with non-terminal status")
}
}
func TestStore_FinishExecution_NotFound(t *testing.T) {
st := newTestStore(t)
err := st.FinishExecution("d", "no-such-exec", StatusSucceeded, time.Now())
if !errors.Is(err, ErrNotFound) {
t.Fatalf("want ErrNotFound, got %v", err)
}
}
func TestStore_AppendLogLines_Pagination(t *testing.T) {
st := newTestStore(t)
depID := "dep-pag"
if err := st.UpsertJob(Job{DeploymentID: depID, JobName: "install-x"}); err != nil {
t.Fatal(err)
}
exec, err := st.StartExecution(depID, "install-x", time.Now())
if err != nil {
t.Fatal(err)
}
// Append 100 lines.
lines := make([]LogLine, 100)
for i := range lines {
lines[i] = LogLine{
Level: LevelInfo,
Message: "line-" + strings.Repeat(".", i%5),
}
}
if err := st.AppendLogLines(depID, exec.ID, lines); err != nil {
t.Fatal(err)
}
page, err := st.PageLogs(depID, exec.ID, 1, 10)
if err != nil {
t.Fatal(err)
}
if len(page.Lines) != 10 || page.Total != 100 {
t.Fatalf("page1: %+v", page)
}
if page.Lines[0].LineNumber != 1 || page.Lines[9].LineNumber != 10 {
t.Fatalf("LineNumber stamping: %+v", page.Lines)
}
if page.ExecutionFinished {
t.Errorf("ExecutionFinished: want false (still running)")
}
// Page 11..20.
page2, err := st.PageLogs(depID, exec.ID, 11, 10)
if err != nil {
t.Fatal(err)
}
if len(page2.Lines) != 10 || page2.Lines[0].LineNumber != 11 {
t.Fatalf("page2: %+v", page2.Lines)
}
// fromLine past total → empty page, executionFinished still false.
pageEmpty, _ := st.PageLogs(depID, exec.ID, 200, 10)
if len(pageEmpty.Lines) != 0 {
t.Errorf("expected empty page, got %d", len(pageEmpty.Lines))
}
// Limit > MaxLogPageSize is clamped.
pageBig, _ := st.PageLogs(depID, exec.ID, 1, 99999)
if len(pageBig.Lines) > MaxLogPageSize {
t.Errorf("limit not clamped: got %d", len(pageBig.Lines))
}
// Finish exec, executionFinished flips true.
if err := st.FinishExecution(depID, exec.ID, StatusSucceeded, time.Now()); err != nil {
t.Fatal(err)
}
pageDone, _ := st.PageLogs(depID, exec.ID, 1, 5)
if !pageDone.ExecutionFinished {
t.Errorf("ExecutionFinished: want true after FinishExecution")
}
}
func TestStore_ListJobs_SortStartedAtDescPendingLast(t *testing.T) {
st := newTestStore(t)
depID := "dep-sort"
t0 := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
t1 := t0.Add(10 * time.Second)
t2 := t0.Add(20 * time.Second)
mkJob := func(name string, start *time.Time, status string) Job {
return Job{
DeploymentID: depID,
JobName: name,
Status: status,
StartedAt: start,
}
}
jobs := []Job{
mkJob("install-a", &t0, StatusSucceeded),
mkJob("install-b", &t2, StatusRunning),
mkJob("install-c", nil, StatusPending),
mkJob("install-d", &t1, StatusFailed),
}
for _, j := range jobs {
if err := st.UpsertJob(j); err != nil {
t.Fatal(err)
}
}
got, err := st.ListJobs(depID)
if err != nil {
t.Fatal(err)
}
want := []string{"install-b", "install-d", "install-a", "install-c"}
if len(got) != len(want) {
t.Fatalf("length mismatch: %d vs %d", len(got), len(want))
}
for i, w := range want {
if got[i].JobName != w {
t.Errorf("position %d: got %q, want %q", i, got[i].JobName, w)
}
}
}
func TestStore_GetJob_NotFound(t *testing.T) {
st := newTestStore(t)
_, _, err := st.GetJob("dep-x", JobID("dep-x", "install-missing"))
if !errors.Is(err, ErrNotFound) {
t.Fatalf("want ErrNotFound, got %v", err)
}
}
func TestStore_SummarizeBatches(t *testing.T) {
st := newTestStore(t)
depID := "dep-batch"
t0 := time.Now().UTC()
cases := []struct {
name string
status string
start *time.Time
}{
{"install-a", StatusSucceeded, &t0},
{"install-b", StatusFailed, &t0},
{"install-c", StatusRunning, &t0},
{"install-d", StatusPending, nil},
{"install-e", StatusSucceeded, &t0},
}
for _, c := range cases {
if err := st.UpsertJob(Job{
DeploymentID: depID,
JobName: c.name,
BatchID: BatchBootstrapKit,
Status: c.status,
StartedAt: c.start,
}); err != nil {
t.Fatal(err)
}
}
out, err := st.SummarizeBatches(depID)
if err != nil {
t.Fatal(err)
}
if len(out) != 1 || out[0].BatchID != BatchBootstrapKit {
t.Fatalf("expected one batch, got %+v", out)
}
bs := out[0]
if bs.Total != 5 || bs.Succeeded != 2 || bs.Failed != 1 || bs.Running != 1 || bs.Pending != 1 || bs.Finished != 3 {
t.Errorf("counts: %+v", bs)
}
}
func TestStore_AtomicIndexWrite_NoTempLeftBehind(t *testing.T) {
st := newTestStore(t)
depID := "dep-atomic"
for i := 0; i < 50; i++ {
j := Job{
DeploymentID: depID,
JobName: "install-x",
Status: StatusRunning,
}
if err := st.UpsertJob(j); err != nil {
t.Fatal(err)
}
}
depDir := filepath.Join(st.Dir(), depID)
entries, err := os.ReadDir(depDir)
if err != nil {
t.Fatal(err)
}
for _, e := range entries {
if strings.HasSuffix(e.Name(), ".tmp") {
t.Errorf("temp file left behind: %s", e.Name())
}
}
}
func TestStore_RaceFreeConcurrentAppends(t *testing.T) {
// Concurrent writers across N executions must not corrupt the
// per-execution NDJSON files. Each writer appends K lines to its
// own execution; we then assert each file has exactly K
// well-formed lines and the index reports the right LineCount.
st := newTestStore(t)
depID := "dep-race"
const N = 4
const K = 100
execIDs := make([]string, N)
for i := 0; i < N; i++ {
jobName := "install-" + string(rune('a'+i))
if err := st.UpsertJob(Job{
DeploymentID: depID,
JobName: jobName,
}); err != nil {
t.Fatal(err)
}
exec, err := st.StartExecution(depID, jobName, time.Now())
if err != nil {
t.Fatal(err)
}
execIDs[i] = exec.ID
}
var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
for k := 0; k < K; k++ {
if err := st.AppendLogLines(depID, execIDs[idx], []LogLine{{
Level: LevelInfo,
Message: "k=", // tiny payload
}}); err != nil {
t.Errorf("AppendLogLines: %v", err)
return
}
}
}(i)
}
wg.Wait()
for i := 0; i < N; i++ {
page, err := st.PageLogs(depID, execIDs[i], 1, MaxLogPageSize)
if err != nil {
t.Fatal(err)
}
if page.Total != K {
t.Errorf("exec %d: total want %d, got %d", i, K, page.Total)
}
if len(page.Lines) != K {
t.Errorf("exec %d: lines want %d, got %d", i, K, len(page.Lines))
}
// LineNumbers must be 1..K monotonic.
for j, ll := range page.Lines {
if ll.LineNumber != j+1 {
t.Errorf("exec %d line %d: LineNumber want %d, got %d", i, j, j+1, ll.LineNumber)
break
}
}
}
}
func TestStore_FindExecutionAcrossDeployments(t *testing.T) {
st := newTestStore(t)
for _, depID := range []string{"dep-a", "dep-b", "dep-c"} {
if err := st.UpsertJob(Job{DeploymentID: depID, JobName: "install-x"}); err != nil {
t.Fatal(err)
}
}
exec, err := st.StartExecution("dep-b", "install-x", time.Now())
if err != nil {
t.Fatal(err)
}
got, err := st.FindExecutionAcrossDeployments(exec.ID)
if err != nil {
t.Fatalf("FindExecutionAcrossDeployments: %v", err)
}
if got.DeploymentID != "dep-b" {
t.Errorf("DeploymentID: want dep-b, got %q", got.DeploymentID)
}
_, err = st.FindExecutionAcrossDeployments("nope")
if !errors.Is(err, ErrNotFound) {
t.Errorf("want ErrNotFound, got %v", err)
}
}
func TestStore_DeploymentDir_RejectsPathTraversal(t *testing.T) {
st := newTestStore(t)
if err := st.UpsertJob(Job{DeploymentID: "../etc/passwd", JobName: "install-x"}); err == nil {
t.Fatal("expected error for path-traversal id")
}
}
func TestStore_LogsForMissingExec(t *testing.T) {
st := newTestStore(t)
if err := st.UpsertJob(Job{DeploymentID: "d", JobName: "install-x"}); err != nil {
t.Fatal(err)
}
_, err := st.PageLogs("d", "no-such", 1, 10)
if !errors.Is(err, ErrNotFound) {
t.Fatalf("want ErrNotFound, got %v", err)
}
}

View File

@ -0,0 +1,247 @@
// Package jobs implements the Jobs/Executions data model + persistence
// the catalyst-api Sovereign Admin surfaces consume (issue #205, sub of
// epic #204).
//
// # Architecture
//
// Each `bp-<chart>` HelmRelease the Phase-1 helmwatch observes maps 1:1
// to a Job (jobName="install-<chart>", appId="<chart>",
// batchId="bootstrap-kit"). Each watch attempt the helmwatch emits is
// an Execution; LogLines append to the active Execution's NDJSON log
// file as the helmwatch goroutine derives them from HelmRelease
// status.conditions.
//
// Two feeds serve the same data in parallel:
//
// - The existing `/api/v1/deployments/{id}/events` SSE feed — kept
// untouched; the wizard's live banner reads it.
// - The new Jobs/Executions REST surface — the table-view UX (issue
// #204) reads it.
//
// Persistence (per docs/INVIOLABLE-PRINCIPLES.md #4: every path is
// runtime-configurable via CATALYST_EXECUTIONS_DIR; default lives on
// the same `catalyst-api-deployments` PVC mount the deployments store
// uses):
//
//	/var/lib/catalyst/executions/<deploymentId>/index.json   — Job + Execution metadata, atomic write (temp+rename).
//	/var/lib/catalyst/executions/<deploymentId>/<execId>.log — append-only NDJSON (one LogLine per line).
//
// Per docs/INVIOLABLE-PRINCIPLES.md #10 (credential hygiene) no log
// line ever carries a kubeconfig, bearer token, or other secret — the
// helmwatch package only emits HelmRelease status messages, which are
// public-cluster-state by definition.
package jobs
import "time"
// Status enums — kept in lockstep with helmwatch.State* via the
// translation in helmwatch_bridge.go. The wire contract uses these
// strings verbatim (frontend agents code against the spec in #205).
const (
StatusPending = "pending"
StatusRunning = "running"
StatusSucceeded = "succeeded"
StatusFailed = "failed"
)
// Log levels — the helmwatch bridge maps Helm condition severity onto
// these. ERROR for "failed", WARN for "degraded", INFO for everything
// else (DEBUG is reserved for future client-instrumented log feeds).
const (
LevelDebug = "DEBUG"
LevelInfo = "INFO"
LevelWarn = "WARN"
LevelError = "ERROR"
)
// BatchBootstrapKit — the only batch id Phase 1 currently emits. Future
// batches (Phase-2 component installs, Day-2 Crossplane reconciles)
// will introduce additional batch ids; this constant is the canonical
// "Phase-1 install" batch tag.
const BatchBootstrapKit = "bootstrap-kit"
// JobNamePrefix — every Phase-1 Job is named "install-<chart>". The
// helmwatch bridge derives this from the HelmRelease metadata.name
// ("bp-foo" → "install-foo").
const JobNamePrefix = "install-"
// IsTerminal reports whether a status string represents a terminal
// state (no further state transitions). The store's running→done
// transitions and the "executionFinished" pagination flag both key
// off this.
func IsTerminal(status string) bool {
switch status {
case StatusSucceeded, StatusFailed:
return true
}
return false
}
// Job is the wire-contract Job shape. The store materialises one Job
// per `bp-<chart>` HelmRelease; the helmwatch bridge keeps its state
// in sync as conditions transition.
//
// Fields use omitempty for nullable timestamps so the JSON shape the
// frontend sees matches the spec verbatim.
type Job struct {
// ID is the stable identifier "<deploymentId>:<jobName>". It is
// the URL-safe id the GET /jobs/{jobId} endpoint accepts.
ID string `json:"id"`
// DeploymentID — the parent deployment the Job belongs to.
DeploymentID string `json:"deploymentId"`
// JobName — "install-<chart>", e.g. "install-cilium".
JobName string `json:"jobName"`
// AppID — the Sovereign component id, e.g. "cilium". Equals
// helmwatch.ComponentIDFromHelmRelease(HR.metadata.name).
AppID string `json:"appId"`
// BatchID — currently always BatchBootstrapKit; reserved for
// future Day-2 batches.
BatchID string `json:"batchId"`
// DependsOn — list of jobNames this Job depends on. Derived from
// the HelmRelease's `spec.dependsOn[*].name` with the bp- prefix
// stripped and "install-" prepended (e.g. spec.dependsOn entry
// `name: bp-cilium` → DependsOn entry `install-cilium`).
DependsOn []string `json:"dependsOn"`
// Status — pending|running|succeeded|failed. See package consts.
Status string `json:"status"`
// StartedAt — UTC instant the Job first transitioned out of
// pending. nil while the Job is still pending.
StartedAt *time.Time `json:"startedAt,omitempty"`
// FinishedAt — UTC instant the Job reached a terminal state. nil
// until the Job is succeeded or failed.
FinishedAt *time.Time `json:"finishedAt,omitempty"`
// DurationMs — milliseconds between StartedAt and FinishedAt.
// Zero while either is nil.
DurationMs int64 `json:"durationMs"`
// LatestExecutionID — id of the most-recent Execution for this
// Job, empty until the first attempt starts. The frontend uses
// this to deep-link to the GitLab-style log viewer without
// having to load the full Execution list first.
LatestExecutionID string `json:"latestExecutionId,omitempty"`
}
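// An illustrative (made-up) wire shape for a finished Job, matching the
// json tags above:
//
//	{
//	  "id": "dep-42:install-cilium",
//	  "deploymentId": "dep-42",
//	  "jobName": "install-cilium",
//	  "appId": "cilium",
//	  "batchId": "bootstrap-kit",
//	  "dependsOn": ["install-flux"],
//	  "status": "succeeded",
//	  "startedAt": "2026-04-29T12:00:00Z",
//	  "finishedAt": "2026-04-29T12:00:41Z",
//	  "durationMs": 41000,
//	  "latestExecutionId": "9f86d081884c7d659a2feaa0c55ad015"
//	}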
// Execution captures one attempt of a Job. The store appends a new
// Execution every time the Job transitions back into running from a
// terminal state (Day-2 retry flows; Phase-1 installs typically have
// exactly one Execution per Job).
type Execution struct {
// ID — opaque identifier, hex-encoded random bytes; unique within a
// deployment (and, in practice, globally).
ID string `json:"id"`
// JobID — parent Job stable id ("<deploymentId>:<jobName>").
JobID string `json:"jobId"`
// DeploymentID — parent deployment id, denormalised so the
// /executions/{execId}/logs endpoint can resolve the log file
// path without a Job lookup.
DeploymentID string `json:"deploymentId"`
// Status — running|succeeded|failed (pending is reserved for the
// Job aggregate; an Execution is always at least running).
Status string `json:"status"`
// StartedAt — UTC instant the Execution attempt began.
StartedAt time.Time `json:"startedAt"`
// FinishedAt — UTC instant the Execution reached terminal. nil
// while still running.
FinishedAt *time.Time `json:"finishedAt,omitempty"`
// LineCount — total LogLines appended to this Execution's NDJSON
// log file. The /logs endpoint compares fromLine against this to
// derive `total` and pagination boundaries.
LineCount int `json:"lineCount"`
}
// LogLine is one record in an Execution's append-only NDJSON file.
// LineNumber is 1-indexed — the GitLab-style log viewer the frontend
// renders keys its line-number gutter off it.
type LogLine struct {
// LineNumber — 1-indexed. The store stamps this on append.
LineNumber int `json:"lineNumber"`
// Timestamp — RFC3339Nano in UTC.
Timestamp time.Time `json:"timestamp"`
// Level — INFO|DEBUG|WARN|ERROR. See package consts.
Level string `json:"level"`
// Message — the rendered log line. The frontend strips ANSI on
// its own; the store does not parse or re-encode.
Message string `json:"message"`
}
// Index is the on-disk shape of <depId>/index.json. Holds Job +
// Execution metadata; log lines live in the per-execution NDJSON
// files. Persisted via atomic temp+rename through Store.persistIndex.
type Index struct {
// DeploymentID — denormalised so a stray index.json file is
// self-describing.
DeploymentID string `json:"deploymentId"`
// Jobs — all Jobs for this deployment, in insertion order. The
// API handler sorts on read (started-at DESC, pending last).
Jobs []Job `json:"jobs"`
// Executions — all Executions across all Jobs. The API handler
// filters by JobID on the per-job endpoint. Stored flat so a
// per-execution mutation only rewrites this slice once, not a
// nested per-Job list.
Executions []Execution `json:"executions"`
}
// BatchSummary is the wire-contract row for the
// /api/v1/deployments/{depId}/jobs/batches endpoint. The store
// computes it on read (SummarizeBatches) by aggregating Job statuses
// keyed by BatchID.
type BatchSummary struct {
// BatchID — e.g. "bootstrap-kit".
BatchID string `json:"batchId"`
// Total — number of Jobs in this batch.
Total int `json:"total"`
// Finished — number of Jobs in a terminal state (succeeded |
// failed). Equals Succeeded + Failed.
Finished int `json:"finished"`
// Succeeded — number of Jobs with Status=succeeded.
Succeeded int `json:"succeeded"`
// Failed — number of Jobs with Status=failed.
Failed int `json:"failed"`
// Running — number of Jobs with Status=running.
Running int `json:"running"`
// Pending — number of Jobs with Status=pending.
Pending int `json:"pending"`
}
// JobID — synthesises the stable per-deployment Job id. Exported so
// the helmwatch bridge AND the API handler agree on the format.
func JobID(deploymentID, jobName string) string {
return deploymentID + ":" + jobName
}
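// For example (illustrative ids only):
//
//	JobID("dep-42", "install-cilium") // "dep-42:install-cilium"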