Compare commits: 04bc7bde0e ... b060ffa991

2 commits: b060ffa991, 18b94fea75
@@ -77,6 +77,14 @@ func main() {
    // Phase 1 retries emit operator instructions per the architectural
    // contract (Flux owns Phase 1 reconciliation).
    r.Post("/api/v1/deployments/{id}/phases/{phase}/retry", h.RetryPhase)
    // Jobs/Executions REST surface (issue #205, sub of epic #204) — the
    // table-view UX reads this in parallel to the existing SSE events
    // feed. The 4 endpoints are read-only; every mutation flows
    // through the helmwatch bridge in internal/jobs.
    r.Get("/api/v1/deployments/{depId}/jobs", h.ListJobs)
    r.Get("/api/v1/deployments/{depId}/jobs/batches", h.ListBatches)
    r.Get("/api/v1/deployments/{depId}/jobs/{jobId}", h.GetJob)
    r.Get("/api/v1/actions/executions/{execId}/logs", h.GetExecutionLogs)

    log.Info("catalyst api listening", "port", port)
    if err := http.ListenAndServe(":"+port, r); err != nil {
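As a quick orientation before the handler files below, here is a minimal client-side sketch of consuming the list endpoint. The `{ "jobs": [...] }` envelope is the shape the ListJobs handler documents; the host, deployment id, and JSON field tags are illustrative assumptions, not part of this diff.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

// job mirrors only the fields this sketch needs; the real wire struct
// is jobs.Job in internal/jobs, whose tags are not shown in this diff.
type job struct {
    JobName string `json:"jobName"`
    Status  string `json:"status"`
}

func main() {
    // Hypothetical host and deployment id, for illustration only.
    resp, err := http.Get("http://localhost:8080/api/v1/deployments/dep-123/jobs")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var out struct {
        Jobs []job `json:"jobs"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
        panic(err)
    }
    for _, j := range out.Jobs {
        fmt.Printf("%-24s %s\n", j.JobName, j.Status)
    }
}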
@@ -32,6 +32,7 @@ import (

    "github.com/go-chi/chi/v5"

    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/pdm"
    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/store"
@@ -123,6 +124,13 @@ type Deployment struct {
    // a no-op rather than racing two informers against the same
    // HelmRelease list.
    phase1Started bool

    // jobsBridge — per-deployment helmwatch → Job/Execution/LogLine
    // bridge (issue #205, sub of epic #204). Allocated on first
    // component event in emitWatchEvent. Nil-tolerant: the emit path
    // no-ops the forward when bridge is nil OR the Handler's jobs
    // store is nil (tests without persistence).
    jobsBridge *jobs.Bridge
}

// recordEvent appends ev to the durable history under the mutex, evicting
@@ -872,13 +880,63 @@ func (h *Handler) runProvisioning(dep *Deployment) {
// burst; once full we drop on the LIVE side only — the durable
// buffer still has the event so the next /events poll or SSE
// reconnect replays it.
//
// The same call also forwards Phase-1 component events to the per-
// deployment jobs.Bridge, which materialises Jobs / Executions / LogLines
// into the new /api/v1/deployments/{id}/jobs surface (issue #205, sub
// of epic #204). The bridge write is best-effort: a failure does NOT
// abort the SSE feed (the durable buffer is the contract for
// /api/v1/deployments/{id}/events; the jobs surface is a parallel
// projection). Errors are logged at warn so an operator can spot
// drift.
func (h *Handler) emitWatchEvent(dep *Deployment, ev provisioner.Event) {
    recorded := h.recordEventAndPersist(dep, ev)

    // Synchronise channel send with the close() path in
    // resumePhase1Watch (which closes dep.eventsCh under dep.mu after
    // the watch loop returns). Without this guard a helmwatch
    // goroutine still in-flight when resumePhase1Watch closes
    // eventsCh races the close — the race detector flags the read
    // of eventsCh against the close write. Holding dep.mu here makes
    // the close-vs-send linearisation point unambiguous, and the
    // `done` short-circuit prevents a panic-on-closed-channel send.
    dep.mu.Lock()
    closed := false
    select {
    case <-dep.done:
        closed = true
    default:
    }
    if !closed {
        select {
        case dep.eventsCh <- recorded:
        default:
        }
    }
    bridge := dep.jobsBridge
    if h.jobs != nil && bridge == nil {
        bridge = jobs.NewBridge(h.jobs, dep.ID)
        dep.jobsBridge = bridge
    }
    dep.mu.Unlock()

    // Forward Phase-1 component events to the jobs bridge. Phase-0
    // OpenTofu events have no Job analogue and are silently dropped
    // inside the bridge (it filters on Phase=="component" + non-
    // empty Component). The bridge write is best-effort: a failure
    // does NOT abort the SSE feed.
    if bridge == nil {
        return
    }
    if err := bridge.OnProvisionerEvent(recorded); err != nil {
        h.log.Warn("jobs bridge: forward failed",
            "id", dep.ID,
            "phase", recorded.Phase,
            "component", recorded.Component,
            "err", err,
        )
    }
}

func newID() string {
    b := make([]byte, 8)
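The guard above is a reusable Go idiom: check a done channel, then attempt a non-blocking send, under the same lock the closer holds. A self-contained sketch of just the pattern (safeSend is a hypothetical name; the real code inlines it in emitWatchEvent and holds dep.mu around both steps):

package main

import "fmt"

// safeSend delivers v on ch unless done is already closed, and never
// blocks: a full channel drops the value, because the durable buffer,
// not the live channel, is the replay contract. The caller must hold
// the same mutex the closer holds, so the done check and the send
// cannot interleave with close(ch).
func safeSend[T any](done <-chan struct{}, ch chan<- T, v T) bool {
    select {
    case <-done:
        return false // closer already won; sending would panic
    default:
    }
    select {
    case ch <- v:
        return true
    default:
        return false // live side full; drop, the durable buffer replays
    }
}

func main() {
    done := make(chan struct{})
    ch := make(chan int, 1)
    fmt.Println(safeSend(done, ch, 1)) // true
    fmt.Println(safeSend(done, ch, 2)) // false: buffer full, dropped
    close(done)
    fmt.Println(safeSend(done, ch, 3)) // false: done closed
}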
@@ -12,6 +12,7 @@ import (
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/kubernetes"

    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/pdm"
    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/store"
)
@@ -56,6 +57,13 @@ type Handler struct {
    // always wires this via New() reading CATALYST_DEPLOYMENTS_DIR.
    store *store.Store

    // jobs — Jobs/Executions/LogLines persistence (issue #205, sub of
    // epic #204). Nil-tolerant the same way as `store` above: tests
    // building Handler{} directly run without a jobs store; production
    // New() wires this from CATALYST_EXECUTIONS_DIR. The 4 jobs HTTP
    // handlers respond 503 when this is nil.
    jobs *jobs.Store

    // kubeconfigsDir — directory the cloud-init postback handler
    // (PutKubeconfig, issue #183) writes the new Sovereign's
    // kubeconfig file into, mode 0600 per file. Same PVC as the
@@ -173,6 +181,25 @@ func New(log *slog.Logger) *Handler {
        h.restoreFromStore()
    }

    // Jobs/Executions store (issue #205) — same PVC mount, different
    // subdirectory. CATALYST_EXECUTIONS_DIR overrides the default per
    // docs/INVIOLABLE-PRINCIPLES.md #4. A failure to create the store
    // is logged + non-fatal, mirroring the deployment store's CI
    // fallback path.
    jobsDir := os.Getenv(jobs.EnvDir)
    if jobsDir == "" {
        jobsDir = jobs.DefaultDir
    }
    js, err := jobs.NewStore(jobsDir)
    if err != nil {
        log.Warn("jobs store unavailable; jobs/executions API will return 503",
            "dir", jobsDir,
            "err", err,
        )
    } else {
        h.jobs = js
    }

    return h
}
@@ -207,6 +234,17 @@ func NewWithStore(log *slog.Logger, client pdmClient, st *store.Store) *Handler
    return h
}

// NewWithJobsStore is a test-only constructor that wires an in-memory
// Handler with a jobs.Store rooted at a t.TempDir(). The 4 jobs HTTP
// handlers can be exercised end-to-end against this construction
// without standing up the full deployment store / PDM stack.
func NewWithJobsStore(log *slog.Logger, js *jobs.Store) *Handler {
    return &Handler{
        log:  log,
        jobs: js,
    }
}

// NewWithStoreAndKubeconfigsDir is the persistence-aware constructor
// the issue-#183 test suite uses to point both the deployment store
// AND the kubeconfigs directory at t.TempDir() subdirectories. The
products/catalyst/bootstrap/api/internal/handler/jobs.go (new file, 239 lines)
@@ -0,0 +1,239 @@
// Package handler — jobs.go: REST surface for the Jobs/Executions
// data model the Sovereign Admin's table-view UX (epic #204) reads.
//
// Four endpoints, all read-only — every mutation flows through the
// helmwatch bridge in internal/jobs.Bridge, which the Phase-1 watch
// goroutine wires up.
//
//   - GET /api/v1/deployments/{depId}/jobs — list Jobs
//   - GET /api/v1/deployments/{depId}/jobs/{jobId} — one Job + executions
//   - GET /api/v1/actions/executions/{execId}/logs — paginated LogLines
//   - GET /api/v1/deployments/{depId}/jobs/batches — per-batch progress
//
// Backwards compat: the existing `/api/v1/deployments/{id}/events`
// SSE feed is not modified. Both feeds live in parallel; the wizard
// reads SSE for live banner state and the new table-view UX reads
// these endpoints.
package handler

import (
    "errors"
    "net/http"
    "strconv"
    "strings"

    "github.com/go-chi/chi/v5"

    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
)

// jobsStore returns the Handler's jobs.Store. Returns nil when
// persistence is disabled (CI runners without write access to
// /var/lib). Handlers map a nil store onto HTTP 503 so the operator
// can tell "no jobs yet" (200 with empty list) apart from "store is
// down" (503 with retry-after).
func (h *Handler) jobsStore() *jobs.Store {
    return h.jobs
}

// ListJobs handles GET /api/v1/deployments/{depId}/jobs.
//
// Returns `{ "jobs": [...] }` — the slice is sorted started-at DESC
// with pending Jobs (no StartedAt) bucketed last. Empty deployment →
// empty slice (not null) so the JSON shape never breaks the
// frontend's render loop.
func (h *Handler) ListJobs(w http.ResponseWriter, r *http.Request) {
    st := h.jobsStore()
    if st == nil {
        writeJSON(w, http.StatusServiceUnavailable, map[string]string{
            "error":  "jobs-store-unavailable",
            "detail": "catalyst-api is running with persistence disabled — see Pod logs",
        })
        return
    }
    depID := strings.TrimSpace(chi.URLParam(r, "depId"))
    if depID == "" {
        writeJSON(w, http.StatusBadRequest, map[string]string{
            "error":  "missing-depId",
            "detail": "deployment id path segment is required",
        })
        return
    }
    out, err := st.ListJobs(depID)
    if err != nil {
        h.log.Error("ListJobs: load index failed", "depId", depID, "err", err)
        writeJSON(w, http.StatusInternalServerError, map[string]string{
            "error": "store-read-failed",
        })
        return
    }
    if out == nil {
        out = []jobs.Job{}
    }
    writeJSON(w, http.StatusOK, map[string]any{
        "jobs": out,
    })
}

// GetJob handles GET /api/v1/deployments/{depId}/jobs/{jobId}.
//
// jobId is the "<deploymentId>:<jobName>" stable id. Chi routes a
// colon as a literal so the parameter arrives intact; a stray segment
// is rejected before hitting the store.
//
// Returns `{ "job": {...}, "executions": [...] }`. The executions
// slice is sorted startedAt DESC so the most-recent attempt is index
// 0 — matches the wire spec in #205 and the GitLab-CI runner
// convention.
func (h *Handler) GetJob(w http.ResponseWriter, r *http.Request) {
    st := h.jobsStore()
    if st == nil {
        writeJSON(w, http.StatusServiceUnavailable, map[string]string{
            "error": "jobs-store-unavailable",
        })
        return
    }
    depID := strings.TrimSpace(chi.URLParam(r, "depId"))
    jobID := strings.TrimSpace(chi.URLParam(r, "jobId"))
    if depID == "" || jobID == "" {
        writeJSON(w, http.StatusBadRequest, map[string]string{
            "error": "missing-path-params",
        })
        return
    }
    job, execs, err := st.GetJob(depID, jobID)
    if err != nil {
        if errors.Is(err, jobs.ErrNotFound) {
            writeJSON(w, http.StatusNotFound, map[string]string{
                "error": "job-not-found",
            })
            return
        }
        h.log.Error("GetJob: load failed", "depId", depID, "jobId", jobID, "err", err)
        writeJSON(w, http.StatusInternalServerError, map[string]string{
            "error": "store-read-failed",
        })
        return
    }
    if execs == nil {
        execs = []jobs.Execution{}
    }
    writeJSON(w, http.StatusOK, map[string]any{
        "job":        job,
        "executions": execs,
    })
}

// ListBatches handles GET /api/v1/deployments/{depId}/jobs/batches.
//
// Returns `{ "batches": [...] }` — one row per BatchID with progress
// counts. Empty deployment → empty slice. The current implementation
// always emits at most one batch ("bootstrap-kit") since Phase-1 is
// the only Job source; future Day-2 batches will appear automatically
// as the bridge writes them.
func (h *Handler) ListBatches(w http.ResponseWriter, r *http.Request) {
    st := h.jobsStore()
    if st == nil {
        writeJSON(w, http.StatusServiceUnavailable, map[string]string{
            "error": "jobs-store-unavailable",
        })
        return
    }
    depID := strings.TrimSpace(chi.URLParam(r, "depId"))
    if depID == "" {
        writeJSON(w, http.StatusBadRequest, map[string]string{
            "error": "missing-depId",
        })
        return
    }
    out, err := st.SummarizeBatches(depID)
    if err != nil {
        h.log.Error("ListBatches: summarize failed", "depId", depID, "err", err)
        writeJSON(w, http.StatusInternalServerError, map[string]string{
            "error": "store-read-failed",
        })
        return
    }
    if out == nil {
        out = []jobs.BatchSummary{}
    }
    writeJSON(w, http.StatusOK, map[string]any{
        "batches": out,
    })
}

// GetExecutionLogs handles GET
// /api/v1/actions/executions/{execId}/logs?fromLine=N&limit=M.
//
// Returns `{ "lines": [...], "total": N, "executionFinished": bool }`.
// Pagination contract:
//
//   - fromLine — 1-indexed, default 1 (omitted / non-positive ⇒ 1).
//   - limit — default DefaultLogPageSize (500), max MaxLogPageSize
//     (5000). Out-of-range values are clamped silently — the
//     frontend's polling loop never has to retry on 422.
//
// The endpoint deliberately omits the deploymentId from the URL path —
// the spec in #205 wants a flat /actions/executions/{id}/logs surface
// the GitLab-style viewer can deep-link to without juggling the
// parent deployment id. The store walks every deployment subdir to
// resolve the executionId.
func (h *Handler) GetExecutionLogs(w http.ResponseWriter, r *http.Request) {
    st := h.jobsStore()
    if st == nil {
        writeJSON(w, http.StatusServiceUnavailable, map[string]string{
            "error": "jobs-store-unavailable",
        })
        return
    }
    execID := strings.TrimSpace(chi.URLParam(r, "execId"))
    if execID == "" {
        writeJSON(w, http.StatusBadRequest, map[string]string{
            "error": "missing-execId",
        })
        return
    }
    q := r.URL.Query()
    fromLine, _ := strconv.Atoi(strings.TrimSpace(q.Get("fromLine")))
    limit, _ := strconv.Atoi(strings.TrimSpace(q.Get("limit")))

    // Resolve the deploymentID by scanning the store. The Bridge
    // guarantees executionId uniqueness (16-byte hex) so first match
    // wins.
    exec, err := st.FindExecutionAcrossDeployments(execID)
    if err != nil {
        if errors.Is(err, jobs.ErrNotFound) {
            writeJSON(w, http.StatusNotFound, map[string]string{
                "error": "execution-not-found",
            })
            return
        }
        h.log.Error("GetExecutionLogs: lookup failed", "execId", execID, "err", err)
        writeJSON(w, http.StatusInternalServerError, map[string]string{
            "error": "store-read-failed",
        })
        return
    }
    page, err := st.PageLogs(exec.DeploymentID, execID, fromLine, limit)
    if err != nil {
        if errors.Is(err, jobs.ErrNotFound) {
            writeJSON(w, http.StatusNotFound, map[string]string{
                "error": "execution-not-found",
            })
            return
        }
        h.log.Error("GetExecutionLogs: page failed", "execId", execID, "err", err)
        writeJSON(w, http.StatusInternalServerError, map[string]string{
            "error": "store-read-failed",
        })
        return
    }
    if page.Lines == nil {
        page.Lines = []jobs.LogLine{}
    }
    writeJSON(w, http.StatusOK, page)
}
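Note that GetExecutionLogs passes the raw Atoi results straight through (a parse failure yields 0), so the clamping the contract promises has to happen store-side in PageLogs. A sketch of the normalisation presumably applied there (normalizePage is a hypothetical name; only the 1 / 500 / 5000 bounds come from the spec above):

const (
    defaultLogPageSize = 500  // matches jobs.DefaultLogPageSize
    maxLogPageSize     = 5000 // matches jobs.MaxLogPageSize
)

// normalizePage maps raw query-param values onto the documented
// contract: fromLine is 1-indexed, with omitted or non-positive
// inputs coerced to 1; limit defaults to defaultLogPageSize and is
// capped at maxLogPageSize. No input ever produces an error, so the
// frontend's polling loop never sees a 422.
func normalizePage(fromLine, limit int) (int, int) {
    if fromLine < 1 {
        fromLine = 1
    }
    if limit <= 0 {
        limit = defaultLogPageSize
    }
    if limit > maxLogPageSize {
        limit = maxLogPageSize
    }
    return fromLine, limit
}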
products/catalyst/bootstrap/api/internal/handler/jobs_helmwatch_test.go (new file, 184 lines)

@@ -0,0 +1,184 @@
// jobs_helmwatch_test.go — integration test that proves a HelmRelease
// component event flowing through emitWatchEvent (the single emit
// path Phase 0 + Phase 1 share) materialises a Job + Execution +
// LogLine in the jobs store. Sibling SSE feed must stay intact.
package handler

import (
    "io"
    "log/slog"
    "net/http"
    "net/http/httptest"
    "testing"
    "time"

    "github.com/go-chi/chi/v5"

    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)

func TestEmitWatchEvent_PopulatesJobsStore(t *testing.T) {
    js, err := jobs.NewStore(t.TempDir())
    if err != nil {
        t.Fatalf("NewStore: %v", err)
    }
    h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), js)

    // Build a minimal Deployment with the channels emitWatchEvent
    // expects. We don't run any goroutine; we just call the emit
    // method directly to assert the store-side projection.
    dep := &Deployment{
        ID:        "dep-emit",
        eventsCh:  make(chan provisioner.Event, 8),
        eventsBuf: nil,
        done:      make(chan struct{}),
    }

    // Phase-0 event must NOT create a Job (filtered by bridge).
    h.emitWatchEvent(dep, provisioner.Event{
        Time:    time.Now().UTC().Format(time.RFC3339),
        Phase:   "tofu-apply",
        Level:   "info",
        Message: "applying...",
    })

    got, err := js.ListJobs("dep-emit")
    if err != nil {
        t.Fatal(err)
    }
    if len(got) != 0 {
        t.Errorf("Phase-0 event must not create jobs, got %+v", got)
    }

    // Phase-1 component event → Job + Execution + LogLine.
    t0 := time.Now().UTC()
    h.emitWatchEvent(dep, provisioner.Event{
        Time:      t0.Format(time.RFC3339),
        Phase:     "component",
        Level:     "info",
        Component: "cilium",
        State:     "installing",
        Message:   "Helm install in progress",
    })
    h.emitWatchEvent(dep, provisioner.Event{
        Time:      t0.Add(5 * time.Second).Format(time.RFC3339),
        Phase:     "component",
        Level:     "info",
        Component: "cilium",
        State:     "installed",
        Message:   "Ready=True",
    })

    got, err = js.ListJobs("dep-emit")
    if err != nil {
        t.Fatal(err)
    }
    if len(got) != 1 {
        t.Fatalf("expected 1 job, got %d", len(got))
    }
    job := got[0]
    if job.JobName != "install-cilium" || job.AppID != "cilium" {
        t.Errorf("job metadata: %+v", job)
    }
    if job.Status != jobs.StatusSucceeded {
        t.Errorf("status: want succeeded, got %q", job.Status)
    }

    // Bridge must NOT break the SSE stream — eventsBuf still records
    // every emit.
    dep.mu.Lock()
    bufLen := len(dep.eventsBuf)
    dep.mu.Unlock()
    if bufLen != 3 {
        t.Errorf("eventsBuf length: want 3 (1 phase-0 + 2 phase-1), got %d", bufLen)
    }
}

func TestEmitWatchEvent_NoStore_NoCrash(t *testing.T) {
    // When the jobs store is nil (CI runner without write access)
    // emitWatchEvent must still record into the SSE buffer.
    h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), nil)
    dep := &Deployment{
        ID:       "dep-nostore",
        eventsCh: make(chan provisioner.Event, 4),
        done:     make(chan struct{}),
    }
    h.emitWatchEvent(dep, provisioner.Event{
        Time:      time.Now().UTC().Format(time.RFC3339),
        Phase:     "component",
        Component: "cilium",
        State:     "installed",
        Level:     "info",
        Message:   "ok",
    })
    dep.mu.Lock()
    bufLen := len(dep.eventsBuf)
    dep.mu.Unlock()
    if bufLen != 1 {
        t.Errorf("expected 1 buffered event, got %d", bufLen)
    }
}

func TestRouter_AllFourEndpointsWired(t *testing.T) {
    // End-to-end smoke that the 4 endpoints can be routed and produce
    // well-shaped JSON. Mirrors the exact route patterns main.go uses.
    js, err := jobs.NewStore(t.TempDir())
    if err != nil {
        t.Fatal(err)
    }
    h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), js)
    r := chi.NewRouter()
    r.Get("/api/v1/deployments/{depId}/jobs", h.ListJobs)
    r.Get("/api/v1/deployments/{depId}/jobs/batches", h.ListBatches)
    r.Get("/api/v1/deployments/{depId}/jobs/{jobId}", h.GetJob)
    r.Get("/api/v1/actions/executions/{execId}/logs", h.GetExecutionLogs)

    // Seed a deployment with a finished job + a tail of log lines.
    depID := "dep-router"
    if err := js.UpsertJob(jobs.Job{
        DeploymentID: depID,
        JobName:      "install-cilium",
        AppID:        "cilium",
        BatchID:      jobs.BatchBootstrapKit,
        DependsOn:    []string{},
        Status:       jobs.StatusPending,
    }); err != nil {
        t.Fatal(err)
    }
    exec, err := js.StartExecution(depID, "install-cilium", time.Now())
    if err != nil {
        t.Fatal(err)
    }
    if err := js.AppendLogLines(depID, exec.ID, []jobs.LogLine{
        {Level: jobs.LevelInfo, Message: "first"},
        {Level: jobs.LevelInfo, Message: "second"},
    }); err != nil {
        t.Fatal(err)
    }
    if err := js.FinishExecution(depID, exec.ID, jobs.StatusSucceeded, time.Now()); err != nil {
        t.Fatal(err)
    }

    cases := []struct {
        name string
        path string
    }{
        {"list-jobs", "/api/v1/deployments/" + depID + "/jobs"},
        {"get-job", "/api/v1/deployments/" + depID + "/jobs/" + jobs.JobID(depID, "install-cilium")},
        {"batches", "/api/v1/deployments/" + depID + "/jobs/batches"},
        {"logs", "/api/v1/actions/executions/" + exec.ID + "/logs?fromLine=1&limit=10"},
    }
    for _, c := range cases {
        t.Run(c.name, func(t *testing.T) {
            rec := httptest.NewRecorder()
            r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, c.path, nil))
            if rec.Code != 200 {
                t.Errorf("status %d body=%s", rec.Code, rec.Body.String())
            }
            if rec.Body.Len() == 0 {
                t.Error("empty body")
            }
        })
    }
}
products/catalyst/bootstrap/api/internal/handler/jobs_test.go (new file, 274 lines)
@@ -0,0 +1,274 @@
// jobs_test.go — httptest-driven handler tests for the 4 Jobs/
// Executions endpoints. Each test seeds a fresh in-memory store via
// NewWithJobsStore(t.TempDir()), wires the chi router the production
// main.go does, then asserts on the JSON shape end-to-end.
package handler

import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "net/http/httptest"
    "strings"
    "testing"
    "time"

    "github.com/go-chi/chi/v5"

    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
)

// newJobsAPIRouter wires the same chi routes main.go does, but only
// for the 4 endpoints under test. Avoids spinning up the full HTTP
// surface for these unit tests.
func newJobsAPIRouter(t *testing.T) (*chi.Mux, *jobs.Store, *Handler) {
    t.Helper()
    js, err := jobs.NewStore(t.TempDir())
    if err != nil {
        t.Fatalf("NewStore: %v", err)
    }
    h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), js)
    r := chi.NewRouter()
    r.Get("/api/v1/deployments/{depId}/jobs", h.ListJobs)
    r.Get("/api/v1/deployments/{depId}/jobs/batches", h.ListBatches)
    r.Get("/api/v1/deployments/{depId}/jobs/{jobId}", h.GetJob)
    r.Get("/api/v1/actions/executions/{execId}/logs", h.GetExecutionLogs)
    return r, js, h
}

func decodeJSON(t *testing.T, body io.Reader, into any) {
    t.Helper()
    if err := json.NewDecoder(body).Decode(into); err != nil {
        t.Fatalf("decode: %v", err)
    }
}

func TestHandler_ListJobs_Empty(t *testing.T) {
    r, _, _ := newJobsAPIRouter(t)
    rec := httptest.NewRecorder()
    r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/dep-empty/jobs", nil))
    if rec.Code != http.StatusOK {
        t.Fatalf("status: %d body=%s", rec.Code, rec.Body.String())
    }
    body := rec.Body.String()
    var resp struct {
        Jobs []jobs.Job `json:"jobs"`
    }
    if err := json.NewDecoder(strings.NewReader(body)).Decode(&resp); err != nil {
        t.Fatalf("decode: %v body=%s", err, body)
    }
    if resp.Jobs == nil {
        t.Fatal("jobs must be empty slice not null")
    }
    if len(resp.Jobs) != 0 {
        t.Errorf("expected 0 jobs, got %d", len(resp.Jobs))
    }
    // Verify the `jobs` key is `[]` not `null` in the raw body.
    if !strings.Contains(body, `"jobs"`) {
        t.Errorf("missing jobs key: %s", body)
    }
    if !strings.Contains(body, `[]`) {
        t.Errorf("expected empty array `[]`: %s", body)
    }
}

func TestHandler_ListJobs_Populated(t *testing.T) {
    r, st, _ := newJobsAPIRouter(t)
    depID := "dep-populated"

    t0 := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
    jobsToSeed := []jobs.Job{
        {DeploymentID: depID, JobName: "install-cilium", AppID: "cilium", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusSucceeded, StartedAt: &t0, FinishedAt: ptrTime(t0.Add(20 * time.Second))},
        {DeploymentID: depID, JobName: "install-flux", AppID: "flux", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusRunning, StartedAt: ptrTime(t0.Add(time.Minute))},
        {DeploymentID: depID, JobName: "install-keycloak", AppID: "keycloak", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusPending},
    }
    for _, j := range jobsToSeed {
        if err := st.UpsertJob(j); err != nil {
            t.Fatal(err)
        }
    }

    rec := httptest.NewRecorder()
    r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/"+depID+"/jobs", nil))
    if rec.Code != 200 {
        t.Fatalf("code: %d", rec.Code)
    }
    var resp struct {
        Jobs []jobs.Job `json:"jobs"`
    }
    decodeJSON(t, rec.Body, &resp)
    if len(resp.Jobs) != 3 {
        t.Fatalf("expected 3 jobs, got %d", len(resp.Jobs))
    }
    // Started DESC: install-flux (t0+1m) first, install-cilium (t0)
    // second, install-keycloak (pending) last.
    wantOrder := []string{"install-flux", "install-cilium", "install-keycloak"}
    for i, w := range wantOrder {
        if resp.Jobs[i].JobName != w {
            t.Errorf("position %d: got %q want %q", i, resp.Jobs[i].JobName, w)
        }
    }
}

func TestHandler_GetJob_FoundAndNotFound(t *testing.T) {
    r, st, _ := newJobsAPIRouter(t)
    depID := "dep-getjob"

    if err := st.UpsertJob(jobs.Job{
        DeploymentID: depID,
        JobName:      "install-cilium",
        AppID:        "cilium",
        BatchID:      jobs.BatchBootstrapKit,
        Status:       jobs.StatusPending,
        DependsOn:    []string{"install-flux"},
    }); err != nil {
        t.Fatal(err)
    }
    exec, err := st.StartExecution(depID, "install-cilium", time.Now())
    if err != nil {
        t.Fatal(err)
    }

    url := "/api/v1/deployments/" + depID + "/jobs/" + jobs.JobID(depID, "install-cilium")
    rec := httptest.NewRecorder()
    r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, url, nil))
    if rec.Code != 200 {
        t.Fatalf("status: %d body=%s", rec.Code, rec.Body.String())
    }
    var resp struct {
        Job        jobs.Job         `json:"job"`
        Executions []jobs.Execution `json:"executions"`
    }
    decodeJSON(t, rec.Body, &resp)
    if resp.Job.AppID != "cilium" {
        t.Errorf("appId: %q", resp.Job.AppID)
    }
    if len(resp.Job.DependsOn) != 1 || resp.Job.DependsOn[0] != "install-flux" {
        t.Errorf("dependsOn: %+v", resp.Job.DependsOn)
    }
    if len(resp.Executions) != 1 || resp.Executions[0].ID != exec.ID {
        t.Errorf("executions: %+v", resp.Executions)
    }

    // 404 path
    rec404 := httptest.NewRecorder()
    r.ServeHTTP(rec404, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/"+depID+"/jobs/"+jobs.JobID(depID, "install-missing"), nil))
    if rec404.Code != http.StatusNotFound {
        t.Errorf("404 expected, got %d", rec404.Code)
    }
}

func TestHandler_GetExecutionLogs_Pagination(t *testing.T) {
    r, st, _ := newJobsAPIRouter(t)
    depID := "dep-logs"
    if err := st.UpsertJob(jobs.Job{DeploymentID: depID, JobName: "install-x"}); err != nil {
        t.Fatal(err)
    }
    exec, err := st.StartExecution(depID, "install-x", time.Now())
    if err != nil {
        t.Fatal(err)
    }
    lines := make([]jobs.LogLine, 50)
    for i := range lines {
        lines[i] = jobs.LogLine{Level: jobs.LevelInfo, Message: "log"}
    }
    if err := st.AppendLogLines(depID, exec.ID, lines); err != nil {
        t.Fatal(err)
    }

    rec := httptest.NewRecorder()
    r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/actions/executions/"+exec.ID+"/logs?fromLine=10&limit=5", nil))
    if rec.Code != 200 {
        t.Fatalf("status: %d body=%s", rec.Code, rec.Body.String())
    }
    var resp jobs.LogPage
    decodeJSON(t, rec.Body, &resp)
    if len(resp.Lines) != 5 {
        t.Fatalf("lines: %d", len(resp.Lines))
    }
    if resp.Lines[0].LineNumber != 10 || resp.Lines[4].LineNumber != 14 {
        t.Errorf("LineNumbers: %+v", resp.Lines)
    }
    if resp.Total != 50 {
        t.Errorf("total: %d", resp.Total)
    }
    if resp.ExecutionFinished {
        t.Errorf("ExecutionFinished should be false while running")
    }
}

func TestHandler_GetExecutionLogs_NotFound(t *testing.T) {
    r, _, _ := newJobsAPIRouter(t)
    rec := httptest.NewRecorder()
    r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/actions/executions/no-such/logs", nil))
    if rec.Code != http.StatusNotFound {
        t.Errorf("expected 404, got %d", rec.Code)
    }
}

func TestHandler_ListBatches(t *testing.T) {
    r, st, _ := newJobsAPIRouter(t)
    depID := "dep-batches"

    now := time.Now().UTC()
    seeds := []jobs.Job{
        {DeploymentID: depID, JobName: "install-a", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusSucceeded, StartedAt: &now},
        {DeploymentID: depID, JobName: "install-b", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusFailed, StartedAt: &now},
        {DeploymentID: depID, JobName: "install-c", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusRunning, StartedAt: &now},
        {DeploymentID: depID, JobName: "install-d", BatchID: jobs.BatchBootstrapKit, Status: jobs.StatusPending},
    }
    for _, j := range seeds {
        if err := st.UpsertJob(j); err != nil {
            t.Fatal(err)
        }
    }

    rec := httptest.NewRecorder()
    r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/"+depID+"/jobs/batches", nil))
    if rec.Code != 200 {
        t.Fatalf("code: %d", rec.Code)
    }
    var resp struct {
        Batches []jobs.BatchSummary `json:"batches"`
    }
    decodeJSON(t, rec.Body, &resp)
    if len(resp.Batches) != 1 {
        t.Fatalf("batches: %+v", resp.Batches)
    }
    bs := resp.Batches[0]
    if bs.BatchID != jobs.BatchBootstrapKit ||
        bs.Total != 4 ||
        bs.Succeeded != 1 ||
        bs.Failed != 1 ||
        bs.Running != 1 ||
        bs.Pending != 1 ||
        bs.Finished != 2 {
        t.Errorf("batch summary: %+v", bs)
    }
}

func TestHandler_ListBatches_EmptySliceNotNull(t *testing.T) {
    r, _, _ := newJobsAPIRouter(t)
    rec := httptest.NewRecorder()
    r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/dep-x/jobs/batches", nil))
    if rec.Code != 200 {
        t.Fatalf("code: %d", rec.Code)
    }
    body := rec.Body.String()
    if !strings.Contains(body, `"batches":[]`) {
        t.Errorf("expected `\"batches\":[]`, got %s", body)
    }
}

func TestHandler_NoStore_503(t *testing.T) {
    h := NewWithJobsStore(slog.New(slog.NewJSONHandler(io.Discard, nil)), nil)
    r := chi.NewRouter()
    r.Get("/api/v1/deployments/{depId}/jobs", h.ListJobs)

    rec := httptest.NewRecorder()
    r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/v1/deployments/d/jobs", nil))
    if rec.Code != http.StatusServiceUnavailable {
        t.Errorf("expected 503, got %d", rec.Code)
    }
}
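Note: TestHandler_ListJobs_Populated uses a ptrTime helper that is not part of this diff; it presumably lives in another _test.go file in the package. A minimal version, for reference:

// ptrTime returns a pointer to t, convenient for seeding the
// *time.Time StartedAt/FinishedAt fields in table-driven tests.
func ptrTime(t time.Time) *time.Time { return &t }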
products/catalyst/bootstrap/api/internal/jobs/helmwatch_bridge.go (new file, 314 lines)

@@ -0,0 +1,314 @@
// helmwatch_bridge.go — translates helmwatch.Watcher events into Job +
// Execution + LogLine writes against the Store.
//
// The bridge is a goroutine-safe object (every method takes the
// store's mutex implicitly through the Store API). The catalyst-api
// constructs one Bridge per deployment and feeds it via OnEvent on the
// same path runProvisioning already feeds the SSE buffer. The two
// feeds (jobs/executions REST + the existing SSE stream) live in
// parallel — neither is ever the source of truth for the other.
//
// Mapping:
//
//   - Each bp-<chart> HelmRelease maps to exactly one Job whose
//     jobName="install-<chart>".
//   - The Bridge calls UpsertJob on every component event so a new
//     HelmRelease (the first time helmwatch emits it) materialises a
//     pending Job row.
//   - On the first transition out of helmwatch.StatePending the
//     Bridge calls StartExecution to allocate a new Execution row
//     and stamp the Job's StartedAt + LatestExecutionID.
//   - On every component event the Bridge appends a LogLine to the
//     active Execution's NDJSON file (level mapped via mapLevel). The
//     bridge keeps no in-memory line buffer — append is the canonical
//     persistence path.
//   - On a terminal helmwatch state (Installed / Failed) the Bridge
//     calls FinishExecution which flips the Job's terminal status +
//     stamps DurationMs.
//
// DependsOn — the bridge does not derive dependsOn from helmwatch
// events (helmwatch.Event carries only conditions). The catalyst-api
// is expected to call SeedJobs once at watch start with the bp-*
// HelmRelease specs (the bootstrap-kit YAMLs ship dependsOn metadata
// the helmwatch reader can stamp directly via the same dynamic
// informer it already runs). This file exposes the SeedJobs entry
// point; the actual wiring lives in internal/handler/phase1_watch.go.
package jobs

import (
    "strings"
    "time"

    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)

// Helmwatch state strings — kept here as untyped consts so the bridge
// does not import the helmwatch package and create an import cycle
// (helmwatch_test → handler → jobs would otherwise close back into
// jobs needing helmwatch types).
const (
    HelmStatePending    = "pending"
    HelmStateInstalling = "installing"
    HelmStateInstalled  = "installed"
    HelmStateDegraded   = "degraded"
    HelmStateFailed     = "failed"
)

// Bridge holds the per-deployment cursor the helmwatch consumer needs:
// which Execution is currently active for which Job. The cursor is
// memory-only and is discarded on Pod restart — a resumed Phase-1
// watch starts a fresh Execution row, which is the correct behaviour
// (an Execution is "one watch attempt"; a Pod restart legitimately
// counts as a new attempt).
type Bridge struct {
    store        *Store
    deploymentID string

    // activeExecID — per-job map of the in-flight Execution.id. Set
    // on the first transition out of StatePending; cleared when the
    // Job reaches a terminal state. Concurrent OnEvent calls for
    // different Jobs are race-free because the Store serialises
    // every write under its own mutex; however, the per-Job cursor
    // itself is not accessed concurrently for the same Job by
    // design (helmwatch emits state changes for a given component
    // strictly sequentially).
    activeExecID map[string]string

    // lastState — per-job last-seen helmwatch state, so the bridge
    // can suppress duplicate appends when helmwatch refires UpdateFunc
    // at the informer's resync cadence without an actual transition.
    lastState map[string]string
}

// NewBridge returns a fresh Bridge for the given deployment id.
// store must be non-nil; deploymentID must be non-empty.
func NewBridge(store *Store, deploymentID string) *Bridge {
    return &Bridge{
        store:        store,
        deploymentID: deploymentID,
        activeExecID: map[string]string{},
        lastState:    map[string]string{},
    }
}

// SeedJobs registers the supplied jobs against the deployment in a
// pending state. Used by the catalyst-api at Phase-1 watch start to
// pre-populate the table view with rows + dependsOn before any
// HelmRelease has reconciled.
//
// Each spec carries the chart name (without the bp- prefix) and the
// list of dependent chart names (also without the bp- prefix). The
// bridge translates them to JobName + DependsOn list using the
// install-<chart> convention.
func (b *Bridge) SeedJobs(specs []SeedSpec) error {
    now := time.Now().UTC()
    for _, sp := range specs {
        j := Job{
            DeploymentID: b.deploymentID,
            JobName:      JobNamePrefix + sp.Chart,
            AppID:        sp.Chart,
            BatchID:      BatchBootstrapKit,
            DependsOn:    dependsOnFromCharts(sp.DependsOn),
            Status:       StatusPending,
        }
        if err := b.store.UpsertJob(j); err != nil {
            return err
        }
        _ = now // reserved: future "createdAt" field, intentionally
        // unused for now — the wire spec in #205 doesn't include it.
    }
    return nil
}

// SeedSpec is the per-Job seed input: the chart name (after the bp-
// prefix is stripped) plus the list of dependent chart names (also
// post-strip). The handler builds this list from the bootstrap-kit
// YAMLs it already reads via the helmwatch dynamic informer.
type SeedSpec struct {
    Chart     string
    DependsOn []string
}

// OnHelmReleaseEvent is the single entry point the helmwatch
// consumer calls. componentID is helmwatch.ComponentIDFromHelmRelease
// (the chart name with bp- stripped); state is one of the HelmState*
// constants; level + message map onto the LogLine. Time is the wall-
// clock instant of the event — the LogLine inherits it directly.
//
// The function is a no-op when state == previous state for the same
// componentID — helmwatch's UpdateFunc fires on every status sub-
// resource patch, including helm-controller's own observedGeneration
// touches, and we don't want to persist a fresh LogLine for each.
//
// The bridge tolerates store errors (returns them) but does not abort
// the helmwatch event loop — the handler's emit path treats this as
// a non-fatal best-effort write.
func (b *Bridge) OnHelmReleaseEvent(componentID, state, level, message string, t time.Time) error {
    jobName := JobNamePrefix + componentID

    prev := b.lastState[componentID]
    if prev == state {
        return nil
    }
    b.lastState[componentID] = state

    // Ensure the Job row exists. If SeedJobs was called this is a
    // no-op merge; if it wasn't (e.g. the bootstrap-kit hot-shipped a
    // new chart helmwatch wasn't seeded with) the bridge still
    // auto-creates a row so no event is dropped.
    if err := b.store.UpsertJob(Job{
        DeploymentID: b.deploymentID,
        JobName:      jobName,
        AppID:        componentID,
        BatchID:      BatchBootstrapKit,
        DependsOn:    []string{},
        Status:       jobStatusFromHelmState(state),
    }); err != nil {
        return err
    }

    // Allocate an Execution if the Job has just become non-pending and
    // no Execution is active.
    execID := b.activeExecID[componentID]
    if execID == "" && state != HelmStatePending {
        exec, err := b.store.StartExecution(b.deploymentID, jobName, t)
        if err != nil {
            return err
        }
        execID = exec.ID
        b.activeExecID[componentID] = execID
    }

    // Append a LogLine for every observed transition. The bridge
    // drops the line if there is no active execution (the Job is
    // still pending) — the table view's "started" column is
    // authoritative for pending rows; a LogLine without an Execution
    // has no display surface.
    if execID != "" {
        ll := LogLine{
            Timestamp: t.UTC(),
            Level:     mapLevel(level),
            Message:   buildLogMessage(componentID, state, message),
        }
        if err := b.store.AppendLogLines(b.deploymentID, execID, []LogLine{ll}); err != nil {
            return err
        }
    }

    // Terminal-state Job: finish the Execution + clear the cursor so
    // a future re-run (Day-2 retry) gets a fresh Execution row.
    if execID != "" && (state == HelmStateInstalled || state == HelmStateFailed) {
        final := StatusSucceeded
        if state == HelmStateFailed {
            final = StatusFailed
        }
        if err := b.store.FinishExecution(b.deploymentID, execID, final, t); err != nil {
            return err
        }
        delete(b.activeExecID, componentID)
    }

    return nil
}

// OnProvisionerEvent is a convenience adapter: the handler's emit
// path passes provisioner.Event (the same struct the SSE stream
// carries). Only PhaseComponent events are forwarded — Phase-0 OpenTofu
// events have no Job analogue and fall through silently.
//
// The function is allocation-light: it builds no event copies, just
// translates strings.
func (b *Bridge) OnProvisionerEvent(ev provisioner.Event) error {
    if ev.Phase != "component" || ev.Component == "" || ev.State == "" {
        return nil
    }
    t := parseEventTime(ev.Time)
    return b.OnHelmReleaseEvent(ev.Component, ev.State, ev.Level, ev.Message, t)
}

// dependsOnFromCharts converts a list of dependent chart names (e.g.
// ["cilium", "cert-manager"]) into the install-<chart> jobName form
// the wire spec expects. Empty input yields an empty (non-nil) slice
// so the JSON shape is `[]` not `null`.
func dependsOnFromCharts(charts []string) []string {
    out := make([]string, 0, len(charts))
    for _, c := range charts {
        c = strings.TrimSpace(c)
        c = strings.TrimPrefix(c, "bp-")
        if c == "" {
            continue
        }
        out = append(out, JobNamePrefix+c)
    }
    return out
}

// jobStatusFromHelmState maps helmwatch's State enum onto the Job
// Status enum. The Bridge writes this through UpsertJob on every
// event so the table view reflects current state without waiting for
// a terminal transition.
func jobStatusFromHelmState(state string) string {
    switch state {
    case HelmStateInstalled:
        return StatusSucceeded
    case HelmStateFailed:
        return StatusFailed
    case HelmStateInstalling, HelmStateDegraded:
        return StatusRunning
    case HelmStatePending:
        return StatusPending
    }
    return StatusPending
}

// mapLevel translates the helmwatch event level (info|warn|error)
// onto the LogLine wire-format level (INFO|WARN|ERROR|DEBUG). The
// wire spec is uppercase per the GitLab-CI viewer convention (#204).
func mapLevel(level string) string {
    switch strings.ToLower(strings.TrimSpace(level)) {
    case "error":
        return LevelError
    case "warn", "warning":
        return LevelWarn
    case "debug":
        return LevelDebug
    default:
        return LevelInfo
    }
}

// buildLogMessage formats the LogLine.Message text. We prepend a
// "[<state>]" tag so an operator scrolling the GitLab-style viewer
// can scan transitions without horizontal scrolling. The original
// helm-controller message (HelmRelease.status.conditions[Ready].
// Message) is preserved unchanged after the tag.
func buildLogMessage(componentID, state, message string) string {
    state = strings.TrimSpace(state)
    message = strings.TrimSpace(message)
    if state == "" {
        return message
    }
    if message == "" {
        return "[" + state + "] " + componentID
    }
    return "[" + state + "] " + message
}

// parseEventTime parses the RFC3339 timestamp helmwatch stamps onto
// every Event. A bad parse falls back to time.Now() so a malformed
// timestamp doesn't drop the LogLine.
func parseEventTime(s string) time.Time {
    if s == "" {
        return time.Now().UTC()
    }
    t, err := time.Parse(time.RFC3339, s)
    if err != nil {
        t, err = time.Parse(time.RFC3339Nano, s)
    }
    if err != nil {
        return time.Now().UTC()
    }
    return t.UTC()
}
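Tying the contract together, a short sketch of the call sequence a helmwatch consumer drives against this API (the directory and ids are illustrative; SeedJobs, OnHelmReleaseEvent, and the HelmState* constants are the exported surface defined above):

package main

import (
    "log"
    "path/filepath"
    "os"
    "time"

    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
)

func main() {
    // Illustrative store directory; production uses CATALYST_EXECUTIONS_DIR.
    st, err := jobs.NewStore(filepath.Join(os.TempDir(), "executions-demo"))
    if err != nil {
        log.Fatal(err)
    }
    br := jobs.NewBridge(st, "dep-demo")

    // Pre-populate pending rows + dependsOn at watch start.
    if err := br.SeedJobs([]jobs.SeedSpec{
        {Chart: "cilium"},
        {Chart: "cert-manager", DependsOn: []string{"cilium"}},
    }); err != nil {
        log.Fatal(err)
    }

    // Drive one component through its lifecycle: the first non-pending
    // state allocates an Execution; the terminal state finishes it.
    t0 := time.Now().UTC()
    if err := br.OnHelmReleaseEvent("cilium", jobs.HelmStateInstalling, "info", "Helm install in progress", t0); err != nil {
        log.Fatal(err)
    }
    if err := br.OnHelmReleaseEvent("cilium", jobs.HelmStateInstalled, "info", "Ready=True", t0.Add(30*time.Second)); err != nil {
        log.Fatal(err)
    }
}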
products/catalyst/bootstrap/api/internal/jobs/helmwatch_bridge_test.go (new file, 236 lines)

@@ -0,0 +1,236 @@
// helmwatch_bridge_test.go — assert that helmwatch component events
// translate into the Job + Execution + LogLine writes the table-view
// UX renders.
package jobs

import (
    "strings"
    "testing"
    "time"

    "github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)

func newBridgeFixture(t *testing.T) (*Store, *Bridge, string) {
    t.Helper()
    st, err := NewStore(t.TempDir())
    if err != nil {
        t.Fatalf("NewStore: %v", err)
    }
    depID := "dep-bridge"
    return st, NewBridge(st, depID), depID
}

func TestBridge_SeedJobs_StripsBpPrefix(t *testing.T) {
    st, br, depID := newBridgeFixture(t)
    if err := br.SeedJobs([]SeedSpec{
        {Chart: "cilium"},
        {Chart: "cert-manager", DependsOn: []string{"bp-cilium", "cilium"}},
    }); err != nil {
        t.Fatal(err)
    }
    got, err := st.ListJobs(depID)
    if err != nil {
        t.Fatal(err)
    }
    if len(got) != 2 {
        t.Fatalf("expected 2 jobs, got %d", len(got))
    }
    var cm Job
    for _, j := range got {
        if j.JobName == "install-cert-manager" {
            cm = j
        }
    }
    if cm.JobName == "" {
        t.Fatal("no install-cert-manager job")
    }
    if cm.AppID != "cert-manager" || cm.BatchID != BatchBootstrapKit || cm.Status != StatusPending {
        t.Errorf("seed metadata: %+v", cm)
    }
    // dependsOn: bp- prefix must be stripped, then install- prepended.
    want := []string{"install-cilium", "install-cilium"}
    if len(cm.DependsOn) != len(want) {
        t.Fatalf("dependsOn len: %v", cm.DependsOn)
    }
    for i, w := range want {
        if cm.DependsOn[i] != w {
            t.Errorf("dependsOn[%d]: got %q, want %q", i, cm.DependsOn[i], w)
        }
    }
}

func TestBridge_OnHelmReleaseEvent_HappyPath(t *testing.T) {
    st, br, depID := newBridgeFixture(t)

    t0 := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
    if err := br.OnHelmReleaseEvent("cilium", HelmStatePending, "info", "observed", t0); err != nil {
        t.Fatal(err)
    }
    got, _ := st.ListJobs(depID)
    if len(got) != 1 || got[0].Status != StatusPending {
        t.Fatalf("after pending: %+v", got)
    }
    if got[0].LatestExecutionID != "" {
        t.Fatalf("Pending must not allocate an execution: %+v", got[0])
    }

    // Transition into installing — allocates an Execution.
    t1 := t0.Add(2 * time.Second)
    if err := br.OnHelmReleaseEvent("cilium", HelmStateInstalling, "info", "Helm install in progress", t1); err != nil {
        t.Fatal(err)
    }
    got, _ = st.ListJobs(depID)
    if got[0].Status != StatusRunning {
        t.Errorf("status: want running, got %q", got[0].Status)
    }
    if got[0].LatestExecutionID == "" {
        t.Fatalf("execution not allocated")
    }
    if got[0].StartedAt == nil || !got[0].StartedAt.Equal(t1) {
        t.Errorf("StartedAt: got %v want %v", got[0].StartedAt, t1)
    }

    // Terminal: installed.
    t2 := t1.Add(30 * time.Second)
    if err := br.OnHelmReleaseEvent("cilium", HelmStateInstalled, "info", "Ready=True", t2); err != nil {
        t.Fatal(err)
    }
    job, execs, err := st.GetJob(depID, JobID(depID, "install-cilium"))
    if err != nil {
        t.Fatal(err)
    }
    if job.Status != StatusSucceeded {
        t.Errorf("final status: want succeeded, got %q", job.Status)
    }
    if job.FinishedAt == nil || !job.FinishedAt.Equal(t2) {
        t.Errorf("FinishedAt: got %v want %v", job.FinishedAt, t2)
    }
    if job.DurationMs != 30000 {
        t.Errorf("DurationMs: got %d want 30000", job.DurationMs)
    }
    if len(execs) != 1 || execs[0].Status != StatusSucceeded {
        t.Errorf("executions: %+v", execs)
    }

    // Logs: 2 transitions (installing, installed) → 2 lines, prefixed.
    page, _ := st.PageLogs(depID, execs[0].ID, 1, 100)
    if page.Total != 2 || len(page.Lines) != 2 {
        t.Fatalf("logs: %+v", page)
    }
    if !strings.HasPrefix(page.Lines[0].Message, "[installing]") {
        t.Errorf("line0 message prefix: %q", page.Lines[0].Message)
    }
    if !strings.HasPrefix(page.Lines[1].Message, "[installed]") {
        t.Errorf("line1 message prefix: %q", page.Lines[1].Message)
    }
}

func TestBridge_OnHelmReleaseEvent_FailedTerminal(t *testing.T) {
    st, br, depID := newBridgeFixture(t)
    t0 := time.Now().UTC()
    if err := br.OnHelmReleaseEvent("flux", HelmStateInstalling, "info", "first reconcile", t0); err != nil {
        t.Fatal(err)
    }
    if err := br.OnHelmReleaseEvent("flux", HelmStateFailed, "error", "InstallFailed: chart not found", t0.Add(time.Second)); err != nil {
        t.Fatal(err)
    }
    job, _, _ := st.GetJob(depID, JobID(depID, "install-flux"))
    if job.Status != StatusFailed {
        t.Errorf("status: want failed, got %q", job.Status)
    }
    page, _ := st.PageLogs(depID, job.LatestExecutionID, 1, 100)
    hasError := false
    for _, ll := range page.Lines {
        if ll.Level == LevelError {
            hasError = true
        }
    }
    if !hasError {
        t.Errorf("expected at least one ERROR log line, got %+v", page.Lines)
    }
}

func TestBridge_DuplicateStateSuppressed(t *testing.T) {
    st, br, depID := newBridgeFixture(t)
    t0 := time.Now().UTC()
    for i := 0; i < 5; i++ {
        if err := br.OnHelmReleaseEvent("foo", HelmStateInstalling, "info", "spinning", t0.Add(time.Duration(i)*time.Second)); err != nil {
            t.Fatal(err)
        }
    }
    job, _, _ := st.GetJob(depID, JobID(depID, "install-foo"))
    page, _ := st.PageLogs(depID, job.LatestExecutionID, 1, 100)
    // Only the first emit registers as a transition; the next four
    // repeats are suppressed by lastState.
    if page.Total != 1 {
        t.Errorf("expected 1 line for 5 dup events, got %d", page.Total)
    }
}

func TestBridge_OnProvisionerEvent_FiltersPhase0(t *testing.T) {
    st, br, depID := newBridgeFixture(t)
    // Phase-0 OpenTofu event has no Component/State — bridge must drop.
    if err := br.OnProvisionerEvent(provisioner.Event{
        Time:    time.Now().UTC().Format(time.RFC3339),
        Phase:   "tofu-apply",
        Level:   "info",
        Message: "Apply complete",
    }); err != nil {
        t.Fatal(err)
    }
    got, _ := st.ListJobs(depID)
    if len(got) != 0 {
        t.Errorf("Phase-0 event must not create jobs, got %+v", got)
    }

    // Phase-1 component event creates a Job.
    if err := br.OnProvisionerEvent(provisioner.Event{
        Time:      time.Now().UTC().Format(time.RFC3339),
        Phase:     "component",
        Level:     "info",
        Component: "cilium",
        State:     HelmStateInstalling,
        Message:   "in progress",
    }); err != nil {
        t.Fatal(err)
    }
    got, _ = st.ListJobs(depID)
    if len(got) != 1 || got[0].JobName != "install-cilium" {
        t.Errorf("expected install-cilium job, got %+v", got)
    }
}

func TestMapLevel(t *testing.T) {
    cases := map[string]string{
        "":        LevelInfo,
        "info":    LevelInfo,
        "warn":    LevelWarn,
        "warning": LevelWarn,
        "error":   LevelError,
        "debug":   LevelDebug,
        "WEIRD":   LevelInfo,
    }
    for in, want := range cases {
        if got := mapLevel(in); got != want {
            t.Errorf("mapLevel(%q): got %q, want %q", in, got, want)
        }
    }
}

func TestJobStatusFromHelmState(t *testing.T) {
    cases := map[string]string{
        HelmStateInstalled:  StatusSucceeded,
        HelmStateFailed:     StatusFailed,
        HelmStateInstalling: StatusRunning,
        HelmStateDegraded:   StatusRunning,
        HelmStatePending:    StatusPending,
        "":                  StatusPending,
        "unknown":           StatusPending,
    }
    for in, want := range cases {
        if got := jobStatusFromHelmState(in); got != want {
            t.Errorf("jobStatusFromHelmState(%q): got %q, want %q", in, got, want)
        }
    }
}
products/catalyst/bootstrap/api/internal/jobs/store.go (new file, 716 lines)
@ -0,0 +1,716 @@
// store.go — flat-file persistence for Jobs + Executions + LogLines.
//
// Three on-disk artefacts per deployment:
//
//   - <dir>/<deploymentId>/index.json — atomic temp+rename, holds
//     the Job + Execution metadata.
//   - <dir>/<deploymentId>/<execId>.log — append-only NDJSON, one
//     LogLine per line.
//   - The directory itself is created at 0o700 the first time the
//     store touches a deployment.
//
// Atomicity: every persistIndex call writes to a sibling temp file then
// os.Rename. Concurrent calls are serialised under Store.mu so the
// rename is the linearisation point — a crash mid-write leaves the old
// index intact (or, on first write, a missing file the load path
// treats as "no jobs yet").
//
// NDJSON append: opened O_APPEND on every LogLines write. The store
// holds Store.mu around the open+write+close so concurrent writers
// can't interleave bytes (NDJSON is line-oriented; partial writes
// would corrupt parsing).
package jobs

import (
	"bufio"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"
)

// DefaultDir — the on-PVC path the chart already mounts at
// /var/lib/catalyst (see products/catalyst/chart/templates/api-deployment.yaml).
// Per docs/INVIOLABLE-PRINCIPLES.md #4 the path is configuration, not
// code; the env var CATALYST_EXECUTIONS_DIR overrides it.
const DefaultDir = "/var/lib/catalyst/executions"

// EnvDir is the env var the catalyst-api main reads to override the
// store directory. Empty / unset falls back to DefaultDir.
const EnvDir = "CATALYST_EXECUTIONS_DIR"

// indexFileName — the per-deployment metadata file.
const indexFileName = "index.json"

// MaxLogPageSize — upper bound the API enforces on the /logs
// pagination `limit` query param. The wire spec in #205 documents the
// same number. Hardcoded here so the store's pagination helper agrees
// with the handler.
const MaxLogPageSize = 5000

// DefaultLogPageSize — default `limit` when the caller omits the query
// param.
const DefaultLogPageSize = 500

// ErrNotFound is returned when the requested Job, Execution, or
// Deployment doesn't exist in the store. Callers map this onto HTTP
// 404; tests assert on errors.Is.
var ErrNotFound = errors.New("jobs: not found")

// Store is the flat-file persistence layer for Jobs + Executions +
// LogLines. Construct via NewStore; Close is a no-op (no FDs are kept
// open between calls).
//
// All writes are serialised under mu — the store is designed for
// dozens of writes/sec from a single helmwatch goroutine, not high-
// concurrency log ingestion. Reads also take mu so a partially-written
// index can never be observed by GET /jobs.
type Store struct {
	dir string

	mu sync.Mutex
}

// NewStore returns a Store rooted at dir, creating the directory at
// 0o700 if missing. A failure to create the root directory is fatal —
// production manifests guarantee the PVC exists, and a CI runner
// without write access surfaces an unmistakable error rather than
// silently dropping logs.
func NewStore(dir string) (*Store, error) {
	dir = strings.TrimSpace(dir)
	if dir == "" {
		return nil, errors.New("jobs: store directory is required (set CATALYST_EXECUTIONS_DIR or pass DefaultDir)")
	}
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return nil, fmt.Errorf("jobs: create store dir %q: %w", dir, err)
	}
	return &Store{dir: dir}, nil
}

// Dir returns the absolute root path the Store persists to. Used for
// the log paths the handler renders into operator diagnostics.
func (s *Store) Dir() string {
	return s.dir
}

// deploymentDir returns the per-deployment subdirectory, ensuring it
// exists at 0o700. Called from every mutator under s.mu.
func (s *Store) deploymentDir(deploymentID string) (string, error) {
	if strings.TrimSpace(deploymentID) == "" {
		return "", errors.New("jobs: deploymentID is required")
	}
	// Disallow path-traversal — the deploymentId comes from
	// CreateDeployment which uses crypto/rand hex, but defence-in-
	// depth: reject any id that contains a path separator.
	if strings.ContainsAny(deploymentID, "/\\") {
		return "", fmt.Errorf("jobs: invalid deploymentID %q", deploymentID)
	}
	d := filepath.Join(s.dir, deploymentID)
	if err := os.MkdirAll(d, 0o700); err != nil {
		return "", fmt.Errorf("jobs: create deployment dir %q: %w", d, err)
	}
	return d, nil
}

// loadIndex reads <depDir>/index.json. Returns a fresh zero-Index when
// the file is missing — that's a "no jobs yet" deployment, not an
// error. Caller MUST hold s.mu.
func (s *Store) loadIndex(deploymentID string) (*Index, error) {
	depDir, err := s.deploymentDir(deploymentID)
	if err != nil {
		return nil, err
	}
	path := filepath.Join(depDir, indexFileName)
	raw, err := os.ReadFile(path)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return &Index{
				DeploymentID: deploymentID,
				Jobs:         []Job{},
				Executions:   []Execution{},
			}, nil
		}
		return nil, fmt.Errorf("jobs: read index %q: %w", path, err)
	}
	var idx Index
	if err := json.Unmarshal(raw, &idx); err != nil {
		return nil, fmt.Errorf("jobs: decode index %q: %w", path, err)
	}
	if idx.Jobs == nil {
		idx.Jobs = []Job{}
	}
	if idx.Executions == nil {
		idx.Executions = []Execution{}
	}
	idx.DeploymentID = deploymentID
	return &idx, nil
}

// persistIndex writes idx to <depDir>/index.json via atomic
// temp+rename; concurrent readers either see the old version or the
// new one, never a partial write. The temp file is chmodded to 0o600
// before the rename. Caller MUST hold s.mu.
func (s *Store) persistIndex(idx *Index) error {
	depDir, err := s.deploymentDir(idx.DeploymentID)
	if err != nil {
		return err
	}
	final := filepath.Join(depDir, indexFileName)

	raw, err := json.MarshalIndent(idx, "", " ")
	if err != nil {
		return fmt.Errorf("jobs: marshal index: %w", err)
	}

	tmp, err := os.CreateTemp(depDir, ".index-*.json.tmp")
	if err != nil {
		return fmt.Errorf("jobs: create temp index: %w", err)
	}
	tmpPath := tmp.Name()
	cleanup := true
	defer func() {
		if cleanup {
			_ = os.Remove(tmpPath)
		}
	}()

	if _, err := tmp.Write(raw); err != nil {
		_ = tmp.Close()
		return fmt.Errorf("jobs: write temp index %q: %w", tmpPath, err)
	}
	if err := tmp.Sync(); err != nil {
		_ = tmp.Close()
		return fmt.Errorf("jobs: fsync temp index %q: %w", tmpPath, err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("jobs: close temp index %q: %w", tmpPath, err)
	}
	if err := os.Chmod(tmpPath, 0o600); err != nil {
		return fmt.Errorf("jobs: chmod temp index %q: %w", tmpPath, err)
	}
	if err := os.Rename(tmpPath, final); err != nil {
		return fmt.Errorf("jobs: rename temp index → %q: %w", final, err)
	}
	cleanup = false
	return nil
}

// UpsertJob inserts or updates the Job with id JobID(deploymentID,
// jobName). The supplied Job's ID + DeploymentID are stamped from the
// arguments — callers don't have to spell them.
//
// The merge keeps StartedAt + FinishedAt monotonic: a re-emission with
// nil StartedAt won't clobber a previously-stamped one. The frontend
// never sees a job "un-start".
func (s *Store) UpsertJob(j Job) error {
	if strings.TrimSpace(j.DeploymentID) == "" {
		return errors.New("jobs: UpsertJob: deploymentID is required")
	}
	if strings.TrimSpace(j.JobName) == "" {
		return errors.New("jobs: UpsertJob: jobName is required")
	}
	j.ID = JobID(j.DeploymentID, j.JobName)
	if j.DependsOn == nil {
		j.DependsOn = []string{}
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(j.DeploymentID)
	if err != nil {
		return err
	}
	for i := range idx.Jobs {
		if idx.Jobs[i].ID == j.ID {
			idx.Jobs[i] = mergeJob(idx.Jobs[i], j)
			return s.persistIndex(idx)
		}
	}
	idx.Jobs = append(idx.Jobs, j)
	return s.persistIndex(idx)
}

// mergeJob keeps monotonic timestamps + the latest non-empty
// LatestExecutionID. The status from the new event always wins (the
// helmwatch bridge is the only writer; later state-machine events
// supersede earlier ones).
func mergeJob(prev, next Job) Job {
	out := next
	if next.StartedAt == nil && prev.StartedAt != nil {
		out.StartedAt = prev.StartedAt
	}
	if next.FinishedAt == nil && prev.FinishedAt != nil {
		out.FinishedAt = prev.FinishedAt
	}
	if next.LatestExecutionID == "" && prev.LatestExecutionID != "" {
		out.LatestExecutionID = prev.LatestExecutionID
	}
	if out.StartedAt != nil && out.FinishedAt != nil {
		out.DurationMs = out.FinishedAt.Sub(*out.StartedAt).Milliseconds()
	}
	return out
}

// StartExecution allocates a new Execution row for the given Job and
// stamps the Job's LatestExecutionID + StartedAt + Status=running. The
// returned Execution.ID is the path-segment component the /logs
// endpoint accepts. The caller is responsible for writing the matching
// Job upsert with appId/batchId metadata BEFORE the first
// StartExecution — the store does not back-fill those fields.
func (s *Store) StartExecution(deploymentID, jobName string, startedAt time.Time) (Execution, error) {
	if strings.TrimSpace(deploymentID) == "" {
		return Execution{}, errors.New("jobs: StartExecution: deploymentID is required")
	}
	if strings.TrimSpace(jobName) == "" {
		return Execution{}, errors.New("jobs: StartExecution: jobName is required")
	}

	jobID := JobID(deploymentID, jobName)
	execID, err := newExecutionID()
	if err != nil {
		return Execution{}, err
	}

	exec := Execution{
		ID:           execID,
		JobID:        jobID,
		DeploymentID: deploymentID,
		Status:       StatusRunning,
		StartedAt:    startedAt.UTC(),
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(deploymentID)
	if err != nil {
		return Execution{}, err
	}
	idx.Executions = append(idx.Executions, exec)
	// Stamp the Job's LatestExecutionID + flip Status=running so the
	// table view reflects the in-flight attempt without a separate
	// UpsertJob call from the bridge.
	for i := range idx.Jobs {
		if idx.Jobs[i].ID == jobID {
			started := startedAt.UTC()
			if idx.Jobs[i].StartedAt == nil {
				idx.Jobs[i].StartedAt = &started
			}
			idx.Jobs[i].Status = StatusRunning
			idx.Jobs[i].LatestExecutionID = execID
			break
		}
	}
	if err := s.persistIndex(idx); err != nil {
		return Execution{}, err
	}
	return exec, nil
}

// FinishExecution stamps an Execution's Status + FinishedAt and flips
// the parent Job into the corresponding terminal state. status must be
// StatusSucceeded or StatusFailed.
func (s *Store) FinishExecution(deploymentID, execID, status string, finishedAt time.Time) error {
	if !IsTerminal(status) {
		return fmt.Errorf("jobs: FinishExecution: status must be terminal, got %q", status)
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(deploymentID)
	if err != nil {
		return err
	}
	finished := finishedAt.UTC()
	var jobID string
	found := false
	for i := range idx.Executions {
		if idx.Executions[i].ID == execID {
			idx.Executions[i].Status = status
			idx.Executions[i].FinishedAt = &finished
			jobID = idx.Executions[i].JobID
			found = true
			break
		}
	}
	if !found {
		return fmt.Errorf("jobs: FinishExecution: execution %q: %w", execID, ErrNotFound)
	}
	for i := range idx.Jobs {
		if idx.Jobs[i].ID == jobID {
			idx.Jobs[i].Status = status
			idx.Jobs[i].FinishedAt = &finished
			if idx.Jobs[i].StartedAt != nil {
				idx.Jobs[i].DurationMs = finished.Sub(*idx.Jobs[i].StartedAt).Milliseconds()
			}
			break
		}
	}
	return s.persistIndex(idx)
}

// AppendLogLines appends one or more LogLines to the per-execution
// NDJSON file. Stamps LineNumber 1-indexed, monotonic across calls.
// Updates the parent Execution's LineCount under the same lock so a
// subsequent /logs read reflects the new ceiling in `total`.
//
// Lines is a slice so a bridge that emits batched events (e.g. one
// state transition + a derived "Helm install in progress" log line)
// can persist them in a single write.
func (s *Store) AppendLogLines(deploymentID, execID string, lines []LogLine) error {
	if len(lines) == 0 {
		return nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(deploymentID)
	if err != nil {
		return err
	}
	var exec *Execution
	for i := range idx.Executions {
		if idx.Executions[i].ID == execID {
			exec = &idx.Executions[i]
			break
		}
	}
	if exec == nil {
		return fmt.Errorf("jobs: AppendLogLines: execution %q: %w", execID, ErrNotFound)
	}

	depDir, err := s.deploymentDir(deploymentID)
	if err != nil {
		return err
	}
	logPath := filepath.Join(depDir, execID+".log")
	f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
	if err != nil {
		return fmt.Errorf("jobs: open log %q: %w", logPath, err)
	}
	defer f.Close()

	bw := bufio.NewWriter(f)
	startLine := exec.LineCount
	for i := range lines {
		startLine++
		lines[i].LineNumber = startLine
		if lines[i].Timestamp.IsZero() {
			lines[i].Timestamp = time.Now().UTC()
		} else {
			lines[i].Timestamp = lines[i].Timestamp.UTC()
		}
		if lines[i].Level == "" {
			lines[i].Level = LevelInfo
		}
		raw, err := json.Marshal(lines[i])
		if err != nil {
			return fmt.Errorf("jobs: marshal log line: %w", err)
		}
		if _, err := bw.Write(raw); err != nil {
			return fmt.Errorf("jobs: write log %q: %w", logPath, err)
		}
		if err := bw.WriteByte('\n'); err != nil {
			return fmt.Errorf("jobs: write log newline %q: %w", logPath, err)
		}
	}
	if err := bw.Flush(); err != nil {
		return fmt.Errorf("jobs: flush log %q: %w", logPath, err)
	}
	if err := f.Sync(); err != nil {
		return fmt.Errorf("jobs: fsync log %q: %w", logPath, err)
	}
	exec.LineCount = startLine
	return s.persistIndex(idx)
}

// ListJobs returns every Job for the deployment, sorted started-at
// DESC with pending Jobs (no StartedAt) bucketed last. The handler
// returns the slice unchanged.
func (s *Store) ListJobs(deploymentID string) ([]Job, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(deploymentID)
	if err != nil {
		return nil, err
	}
	out := make([]Job, len(idx.Jobs))
	copy(out, idx.Jobs)
	sort.SliceStable(out, func(i, j int) bool {
		// Pending (no StartedAt) sort last.
		ai, bi := out[i].StartedAt, out[j].StartedAt
		switch {
		case ai == nil && bi == nil:
			return out[i].JobName < out[j].JobName
		case ai == nil:
			return false
		case bi == nil:
			return true
		}
		// started-at DESC: more-recent first.
		if ai.Equal(*bi) {
			return out[i].JobName < out[j].JobName
		}
		return ai.After(*bi)
	})
	return out, nil
}

// GetJob returns the Job + its Executions list. ErrNotFound if no Job
// with the given id exists for the deployment.
func (s *Store) GetJob(deploymentID, jobID string) (Job, []Execution, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(deploymentID)
	if err != nil {
		return Job{}, nil, err
	}
	for i := range idx.Jobs {
		if idx.Jobs[i].ID == jobID {
			execs := []Execution{}
			for _, e := range idx.Executions {
				if e.JobID == jobID {
					execs = append(execs, e)
				}
			}
			sort.Slice(execs, func(a, b int) bool {
				return execs[a].StartedAt.After(execs[b].StartedAt)
			})
			return idx.Jobs[i], execs, nil
		}
	}
	return Job{}, nil, fmt.Errorf("jobs: GetJob %q: %w", jobID, ErrNotFound)
}

// FindExecution returns the Execution metadata for the given
// deployment, used to resolve the log file path. The /logs endpoint
// uses this so the URL only carries the executionID, not the
// deployment id (matching the spec's
// /api/v1/actions/executions/{execId}/logs shape).
func (s *Store) FindExecution(deploymentID, execID string) (Execution, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(deploymentID)
	if err != nil {
		return Execution{}, err
	}
	for _, e := range idx.Executions {
		if e.ID == execID {
			return e, nil
		}
	}
	return Execution{}, fmt.Errorf("jobs: FindExecution %q: %w", execID, ErrNotFound)
}

// FindExecutionAcrossDeployments scans every <depId>/index.json under
// the store root for an execution with the given id. Used by the
// /api/v1/actions/executions/{execId}/logs endpoint where the URL
// does not carry the deploymentID — see the contract spec in #205.
//
// Returns the Execution (whose DeploymentID field identifies the
// parent deployment). Stops scanning at the first match. ErrNotFound
// when no deployment has it.
func (s *Store) FindExecutionAcrossDeployments(execID string) (Execution, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	entries, err := os.ReadDir(s.dir)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return Execution{}, fmt.Errorf("jobs: FindExecutionAcrossDeployments %q: %w", execID, ErrNotFound)
		}
		return Execution{}, fmt.Errorf("jobs: scan store dir %q: %w", s.dir, err)
	}
	for _, ent := range entries {
		if !ent.IsDir() {
			continue
		}
		idx, err := s.loadIndex(ent.Name())
		if err != nil {
			// A single corrupt deployment must not poison the lookup;
			// the API returns 404 only if NO deployment matches.
			continue
		}
		for _, e := range idx.Executions {
			if e.ID == execID {
				return e, nil
			}
		}
	}
	return Execution{}, fmt.Errorf("jobs: FindExecutionAcrossDeployments %q: %w", execID, ErrNotFound)
}

// LogPage is the wire-contract response shape for the /logs endpoint.
// Defined in the store package so the handler doesn't have to
// re-declare it.
type LogPage struct {
	Lines             []LogLine `json:"lines"`
	Total             int       `json:"total"`
	ExecutionFinished bool      `json:"executionFinished"`
}

// PageLogs returns a window into the Execution's NDJSON log file.
// fromLine is 1-indexed (matches LogLine.LineNumber) and snaps to 1
// when <= 0; limit falls back to DefaultLogPageSize when <= 0 and is
// clamped to MaxLogPageSize.
func (s *Store) PageLogs(deploymentID, execID string, fromLine, limit int) (LogPage, error) {
	if fromLine <= 0 {
		fromLine = 1
	}
	if limit <= 0 {
		limit = DefaultLogPageSize
	}
	if limit > MaxLogPageSize {
		limit = MaxLogPageSize
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(deploymentID)
	if err != nil {
		return LogPage{}, err
	}
	var exec *Execution
	for i := range idx.Executions {
		if idx.Executions[i].ID == execID {
			exec = &idx.Executions[i]
			break
		}
	}
	if exec == nil {
		return LogPage{}, fmt.Errorf("jobs: PageLogs %q: %w", execID, ErrNotFound)
	}

	depDir, err := s.deploymentDir(deploymentID)
	if err != nil {
		return LogPage{}, err
	}
	logPath := filepath.Join(depDir, execID+".log")
	f, err := os.Open(logPath)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			// No log file yet — execution started but no LogLines
			// were appended. That's a valid empty-page response.
			return LogPage{
				Lines:             []LogLine{},
				Total:             exec.LineCount,
				ExecutionFinished: IsTerminal(exec.Status),
			}, nil
		}
		return LogPage{}, fmt.Errorf("jobs: open log %q: %w", logPath, err)
	}
	defer f.Close()

	out := make([]LogLine, 0, limit)
	br := bufio.NewReader(f)
	lineNum := 0
	for {
		raw, err := br.ReadBytes('\n')
		if len(raw) > 0 {
			lineNum++
			if lineNum >= fromLine && len(out) < limit {
				var ll LogLine
				if uerr := json.Unmarshal(stripNewline(raw), &ll); uerr == nil {
					out = append(out, ll)
				}
			}
			if len(out) >= limit {
				// No need to drain the rest of the file for a count —
				// exec.LineCount already supplies Total; stop early.
				break
			}
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return LogPage{}, fmt.Errorf("jobs: read log %q: %w", logPath, err)
		}
	}
	return LogPage{
		Lines:             out,
		Total:             exec.LineCount,
		ExecutionFinished: IsTerminal(exec.Status),
	}, nil
}

func stripNewline(b []byte) []byte {
	if n := len(b); n > 0 && b[n-1] == '\n' {
		b = b[:n-1]
	}
	if n := len(b); n > 0 && b[n-1] == '\r' {
		b = b[:n-1]
	}
	return b
}

// SummarizeBatches groups Jobs by BatchID and returns a per-batch
// progress row. Empty deployment → empty slice (not nil) so the JSON
// shape matches the spec's `{batches: []}` exactly.
func (s *Store) SummarizeBatches(deploymentID string) ([]BatchSummary, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	idx, err := s.loadIndex(deploymentID)
	if err != nil {
		return nil, err
	}
	byBatch := map[string]*BatchSummary{}
	order := []string{}
	for _, j := range idx.Jobs {
		bid := j.BatchID
		if bid == "" {
			bid = "(unbatched)"
		}
		bs, ok := byBatch[bid]
		if !ok {
			bs = &BatchSummary{BatchID: bid}
			byBatch[bid] = bs
			order = append(order, bid)
		}
		bs.Total++
		switch j.Status {
		case StatusSucceeded:
			bs.Succeeded++
			bs.Finished++
		case StatusFailed:
			bs.Failed++
			bs.Finished++
		case StatusRunning:
			bs.Running++
		case StatusPending, "":
			bs.Pending++
		}
	}
	out := make([]BatchSummary, 0, len(order))
	for _, bid := range order {
		out = append(out, *byBatch[bid])
	}
	return out, nil
}

// newExecutionID returns a 16-byte (32 hex char) random id. Unique
// within a deployment with vanishing collision probability — even at
// the catalyst-api's maximum sustained emit rate (a few hundred per
// minute) this is overkill, but cheap.
func newExecutionID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", fmt.Errorf("jobs: crypto/rand: %w", err)
	}
	return hex.EncodeToString(b), nil
}
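
Taken together, the mutators above compose into one small lifecycle per install attempt. A hypothetical end-to-end sketch, written in-package (the function name and literal values are illustrative; every call it makes is defined in this file):

// demoLifecycle drives one install attempt through the store:
// upsert the Job, open an Execution, append a line, finish, page.
// Sketch only: not part of this diff.
func demoLifecycle(st *Store) error {
	dep := "dep-demo"
	if err := st.UpsertJob(Job{
		DeploymentID: dep,
		JobName:      "install-cilium",
		AppID:        "cilium",
		BatchID:      BatchBootstrapKit,
		Status:       StatusPending,
	}); err != nil {
		return err
	}
	exec, err := st.StartExecution(dep, "install-cilium", time.Now())
	if err != nil {
		return err
	}
	if err := st.AppendLogLines(dep, exec.ID, []LogLine{
		{Level: LevelInfo, Message: "Helm install in progress"},
	}); err != nil {
		return err
	}
	if err := st.FinishExecution(dep, exec.ID, StatusSucceeded, time.Now()); err != nil {
		return err
	}
	page, err := st.PageLogs(dep, exec.ID, 1, 10)
	if err != nil {
		return err
	}
	_ = page.Total // 1; page.ExecutionFinished is now true
	return nil
}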
456 products/catalyst/bootstrap/api/internal/jobs/store_test.go (new file)
@ -0,0 +1,456 @@
// store_test.go — round-trip + pagination + atomic-write tests for the
// Jobs/Executions store. Tests use t.TempDir() so they run without a
// PVC and clean themselves up.
package jobs

import (
	"errors"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"
)

func newTestStore(t *testing.T) *Store {
	t.Helper()
	st, err := NewStore(t.TempDir())
	if err != nil {
		t.Fatalf("NewStore: %v", err)
	}
	return st
}

func TestStore_UpsertJob_RoundTrip(t *testing.T) {
	st := newTestStore(t)
	depID := "dep-1"

	j := Job{
		DeploymentID: depID,
		JobName:      "install-cilium",
		AppID:        "cilium",
		BatchID:      BatchBootstrapKit,
		DependsOn:    []string{"install-flux"},
		Status:       StatusPending,
	}
	if err := st.UpsertJob(j); err != nil {
		t.Fatalf("UpsertJob: %v", err)
	}

	got, err := st.ListJobs(depID)
	if err != nil {
		t.Fatalf("ListJobs: %v", err)
	}
	if len(got) != 1 {
		t.Fatalf("expected 1 job, got %d", len(got))
	}
	if got[0].ID != JobID(depID, "install-cilium") {
		t.Fatalf("ID mismatch: %q", got[0].ID)
	}
	if got[0].AppID != "cilium" || got[0].BatchID != BatchBootstrapKit {
		t.Fatalf("metadata mismatch: %+v", got[0])
	}
	if len(got[0].DependsOn) != 1 || got[0].DependsOn[0] != "install-flux" {
		t.Fatalf("dependsOn mismatch: %+v", got[0].DependsOn)
	}
}

func TestStore_UpsertJob_MergesMonotonicTimestamps(t *testing.T) {
	st := newTestStore(t)
	depID := "dep-2"

	started := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
	if err := st.UpsertJob(Job{
		DeploymentID: depID,
		JobName:      "install-foo",
		StartedAt:    &started,
		Status:       StatusRunning,
	}); err != nil {
		t.Fatal(err)
	}
	// Re-emit without StartedAt — the merge must preserve the prior value.
	if err := st.UpsertJob(Job{
		DeploymentID: depID,
		JobName:      "install-foo",
		Status:       StatusRunning,
	}); err != nil {
		t.Fatal(err)
	}
	got, _ := st.ListJobs(depID)
	if got[0].StartedAt == nil || !got[0].StartedAt.Equal(started) {
		t.Fatalf("StartedAt clobbered: %+v", got[0].StartedAt)
	}
}

func TestStore_StartAndFinishExecution(t *testing.T) {
	st := newTestStore(t)
	depID := "dep-3"

	if err := st.UpsertJob(Job{
		DeploymentID: depID,
		JobName:      "install-foo",
		AppID:        "foo",
		BatchID:      BatchBootstrapKit,
		Status:       StatusPending,
	}); err != nil {
		t.Fatal(err)
	}

	t0 := time.Now().UTC()
	exec, err := st.StartExecution(depID, "install-foo", t0)
	if err != nil {
		t.Fatalf("StartExecution: %v", err)
	}
	if exec.ID == "" || exec.Status != StatusRunning {
		t.Fatalf("bad exec: %+v", exec)
	}

	job, execs, err := st.GetJob(depID, JobID(depID, "install-foo"))
	if err != nil {
		t.Fatalf("GetJob: %v", err)
	}
	if job.Status != StatusRunning {
		t.Errorf("Job.Status: want running, got %q", job.Status)
	}
	if job.LatestExecutionID != exec.ID {
		t.Errorf("LatestExecutionID: want %q, got %q", exec.ID, job.LatestExecutionID)
	}
	if len(execs) != 1 {
		t.Fatalf("expected 1 exec, got %d", len(execs))
	}

	t1 := t0.Add(5 * time.Second)
	if err := st.FinishExecution(depID, exec.ID, StatusSucceeded, t1); err != nil {
		t.Fatalf("FinishExecution: %v", err)
	}

	job, _, _ = st.GetJob(depID, JobID(depID, "install-foo"))
	if job.Status != StatusSucceeded {
		t.Errorf("Job.Status: want succeeded, got %q", job.Status)
	}
	if job.FinishedAt == nil {
		t.Fatalf("Job.FinishedAt nil")
	}
	if job.DurationMs != 5000 {
		t.Errorf("DurationMs: want 5000, got %d", job.DurationMs)
	}
}

func TestStore_FinishExecution_RejectsNonTerminal(t *testing.T) {
	st := newTestStore(t)
	if err := st.UpsertJob(Job{DeploymentID: "d", JobName: "install-x"}); err != nil {
		t.Fatal(err)
	}
	exec, err := st.StartExecution("d", "install-x", time.Now())
	if err != nil {
		t.Fatal(err)
	}
	if err := st.FinishExecution("d", exec.ID, StatusRunning, time.Now()); err == nil {
		t.Fatal("expected error finishing with non-terminal status")
	}
}

func TestStore_FinishExecution_NotFound(t *testing.T) {
	st := newTestStore(t)
	err := st.FinishExecution("d", "no-such-exec", StatusSucceeded, time.Now())
	if !errors.Is(err, ErrNotFound) {
		t.Fatalf("want ErrNotFound, got %v", err)
	}
}

func TestStore_AppendLogLines_Pagination(t *testing.T) {
	st := newTestStore(t)
	depID := "dep-pag"
	if err := st.UpsertJob(Job{DeploymentID: depID, JobName: "install-x"}); err != nil {
		t.Fatal(err)
	}
	exec, err := st.StartExecution(depID, "install-x", time.Now())
	if err != nil {
		t.Fatal(err)
	}

	// Append 100 lines.
	lines := make([]LogLine, 100)
	for i := range lines {
		lines[i] = LogLine{
			Level:   LevelInfo,
			Message: "line-" + strings.Repeat(".", i%5),
		}
	}
	if err := st.AppendLogLines(depID, exec.ID, lines); err != nil {
		t.Fatal(err)
	}

	page, err := st.PageLogs(depID, exec.ID, 1, 10)
	if err != nil {
		t.Fatal(err)
	}
	if len(page.Lines) != 10 || page.Total != 100 {
		t.Fatalf("page1: %+v", page)
	}
	if page.Lines[0].LineNumber != 1 || page.Lines[9].LineNumber != 10 {
		t.Fatalf("LineNumber stamping: %+v", page.Lines)
	}
	if page.ExecutionFinished {
		t.Errorf("ExecutionFinished: want false (still running)")
	}

	// Page 11..20.
	page2, err := st.PageLogs(depID, exec.ID, 11, 10)
	if err != nil {
		t.Fatal(err)
	}
	if len(page2.Lines) != 10 || page2.Lines[0].LineNumber != 11 {
		t.Fatalf("page2: %+v", page2.Lines)
	}

	// fromLine past total → empty page, executionFinished still false.
	pageEmpty, _ := st.PageLogs(depID, exec.ID, 200, 10)
	if len(pageEmpty.Lines) != 0 {
		t.Errorf("expected empty page, got %d", len(pageEmpty.Lines))
	}

	// Limit > MaxLogPageSize is clamped.
	pageBig, _ := st.PageLogs(depID, exec.ID, 1, 99999)
	if len(pageBig.Lines) > MaxLogPageSize {
		t.Errorf("limit not clamped: got %d", len(pageBig.Lines))
	}

	// Finish exec, executionFinished flips true.
	if err := st.FinishExecution(depID, exec.ID, StatusSucceeded, time.Now()); err != nil {
		t.Fatal(err)
	}
	pageDone, _ := st.PageLogs(depID, exec.ID, 1, 5)
	if !pageDone.ExecutionFinished {
		t.Errorf("ExecutionFinished: want true after FinishExecution")
	}
}

func TestStore_ListJobs_SortStartedAtDescPendingLast(t *testing.T) {
	st := newTestStore(t)
	depID := "dep-sort"

	t0 := time.Date(2026, 4, 29, 12, 0, 0, 0, time.UTC)
	t1 := t0.Add(10 * time.Second)
	t2 := t0.Add(20 * time.Second)

	mkJob := func(name string, start *time.Time, status string) Job {
		return Job{
			DeploymentID: depID,
			JobName:      name,
			Status:       status,
			StartedAt:    start,
		}
	}

	jobs := []Job{
		mkJob("install-a", &t0, StatusSucceeded),
		mkJob("install-b", &t2, StatusRunning),
		mkJob("install-c", nil, StatusPending),
		mkJob("install-d", &t1, StatusFailed),
	}
	for _, j := range jobs {
		if err := st.UpsertJob(j); err != nil {
			t.Fatal(err)
		}
	}

	got, err := st.ListJobs(depID)
	if err != nil {
		t.Fatal(err)
	}
	want := []string{"install-b", "install-d", "install-a", "install-c"}
	if len(got) != len(want) {
		t.Fatalf("length mismatch: %d vs %d", len(got), len(want))
	}
	for i, w := range want {
		if got[i].JobName != w {
			t.Errorf("position %d: got %q, want %q", i, got[i].JobName, w)
		}
	}
}

func TestStore_GetJob_NotFound(t *testing.T) {
	st := newTestStore(t)
	_, _, err := st.GetJob("dep-x", JobID("dep-x", "install-missing"))
	if !errors.Is(err, ErrNotFound) {
		t.Fatalf("want ErrNotFound, got %v", err)
	}
}

func TestStore_SummarizeBatches(t *testing.T) {
	st := newTestStore(t)
	depID := "dep-batch"

	t0 := time.Now().UTC()
	cases := []struct {
		name   string
		status string
		start  *time.Time
	}{
		{"install-a", StatusSucceeded, &t0},
		{"install-b", StatusFailed, &t0},
		{"install-c", StatusRunning, &t0},
		{"install-d", StatusPending, nil},
		{"install-e", StatusSucceeded, &t0},
	}
	for _, c := range cases {
		if err := st.UpsertJob(Job{
			DeploymentID: depID,
			JobName:      c.name,
			BatchID:      BatchBootstrapKit,
			Status:       c.status,
			StartedAt:    c.start,
		}); err != nil {
			t.Fatal(err)
		}
	}
	out, err := st.SummarizeBatches(depID)
	if err != nil {
		t.Fatal(err)
	}
	if len(out) != 1 || out[0].BatchID != BatchBootstrapKit {
		t.Fatalf("expected one batch, got %+v", out)
	}
	bs := out[0]
	if bs.Total != 5 || bs.Succeeded != 2 || bs.Failed != 1 || bs.Running != 1 || bs.Pending != 1 || bs.Finished != 3 {
		t.Errorf("counts: %+v", bs)
	}
}

func TestStore_AtomicIndexWrite_NoTempLeftBehind(t *testing.T) {
	st := newTestStore(t)
	depID := "dep-atomic"

	for i := 0; i < 50; i++ {
		j := Job{
			DeploymentID: depID,
			JobName:      "install-x",
			Status:       StatusRunning,
		}
		if err := st.UpsertJob(j); err != nil {
			t.Fatal(err)
		}
	}
	depDir := filepath.Join(st.Dir(), depID)
	entries, err := os.ReadDir(depDir)
	if err != nil {
		t.Fatal(err)
	}
	for _, e := range entries {
		if strings.HasSuffix(e.Name(), ".tmp") {
			t.Errorf("temp file left behind: %s", e.Name())
		}
	}
}

func TestStore_RaceFreeConcurrentAppends(t *testing.T) {
	// Concurrent writers across N executions must not corrupt the
	// per-execution NDJSON files. Each writer appends K lines to its
	// own execution; we then assert each file has exactly K
	// well-formed lines and the index reports the right LineCount.
	st := newTestStore(t)
	depID := "dep-race"

	const N = 4
	const K = 100
	execIDs := make([]string, N)
	for i := 0; i < N; i++ {
		jobName := "install-" + string(rune('a'+i))
		if err := st.UpsertJob(Job{
			DeploymentID: depID,
			JobName:      jobName,
		}); err != nil {
			t.Fatal(err)
		}
		exec, err := st.StartExecution(depID, jobName, time.Now())
		if err != nil {
			t.Fatal(err)
		}
		execIDs[i] = exec.ID
	}

	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			for k := 0; k < K; k++ {
				if err := st.AppendLogLines(depID, execIDs[idx], []LogLine{{
					Level:   LevelInfo,
					Message: "k=", // tiny payload
				}}); err != nil {
					t.Errorf("AppendLogLines: %v", err)
					return
				}
			}
		}(i)
	}
	wg.Wait()

	for i := 0; i < N; i++ {
		page, err := st.PageLogs(depID, execIDs[i], 1, MaxLogPageSize)
		if err != nil {
			t.Fatal(err)
		}
		if page.Total != K {
			t.Errorf("exec %d: total want %d, got %d", i, K, page.Total)
		}
		if len(page.Lines) != K {
			t.Errorf("exec %d: lines want %d, got %d", i, K, len(page.Lines))
		}
		// LineNumbers must be 1..K monotonic.
		for j, ll := range page.Lines {
			if ll.LineNumber != j+1 {
				t.Errorf("exec %d line %d: LineNumber want %d, got %d", i, j, j+1, ll.LineNumber)
				break
			}
		}
	}
}

func TestStore_FindExecutionAcrossDeployments(t *testing.T) {
	st := newTestStore(t)

	for _, depID := range []string{"dep-a", "dep-b", "dep-c"} {
		if err := st.UpsertJob(Job{DeploymentID: depID, JobName: "install-x"}); err != nil {
			t.Fatal(err)
		}
	}
	exec, err := st.StartExecution("dep-b", "install-x", time.Now())
	if err != nil {
		t.Fatal(err)
	}

	got, err := st.FindExecutionAcrossDeployments(exec.ID)
	if err != nil {
		t.Fatalf("FindExecutionAcrossDeployments: %v", err)
	}
	if got.DeploymentID != "dep-b" {
		t.Errorf("DeploymentID: want dep-b, got %q", got.DeploymentID)
	}

	_, err = st.FindExecutionAcrossDeployments("nope")
	if !errors.Is(err, ErrNotFound) {
		t.Errorf("want ErrNotFound, got %v", err)
	}
}

func TestStore_DeploymentDir_RejectsPathTraversal(t *testing.T) {
	st := newTestStore(t)
	if err := st.UpsertJob(Job{DeploymentID: "../etc/passwd", JobName: "install-x"}); err == nil {
		t.Fatal("expected error for path-traversal id")
	}
}

func TestStore_LogsForMissingExec(t *testing.T) {
	st := newTestStore(t)
	if err := st.UpsertJob(Job{DeploymentID: "d", JobName: "install-x"}); err != nil {
		t.Fatal(err)
	}
	_, err := st.PageLogs("d", "no-such", 1, 10)
	if !errors.Is(err, ErrNotFound) {
		t.Fatalf("want ErrNotFound, got %v", err)
	}
}
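
The pagination test above also fixes the client contract: page forward by fromLine, take Total from the index rather than a file scan, and treat ExecutionFinished as the tail signal. A sketch of a consumer loop written in-package against the store (a real client would poll the /logs endpoint instead; the function name and shape are illustrative):

// drainLogs pages an execution's log to completion. Sketch only: a
// live client would re-poll while the second return value is false.
func drainLogs(st *Store, depID, execID string) ([]LogLine, bool, error) {
	var all []LogLine
	from := 1
	for {
		page, err := st.PageLogs(depID, execID, from, DefaultLogPageSize)
		if err != nil {
			return nil, false, err
		}
		all = append(all, page.Lines...)
		from += len(page.Lines)
		if len(page.Lines) < DefaultLogPageSize {
			// Short page: we've read everything currently on disk.
			return all, page.ExecutionFinished, nil
		}
	}
}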
247 products/catalyst/bootstrap/api/internal/jobs/types.go (new file)
@ -0,0 +1,247 @@
// Package jobs implements the Jobs/Executions data model + persistence
// the catalyst-api Sovereign Admin surfaces consume (issue #205, sub of
// epic #204).
//
// # Architecture
//
// Each `bp-<chart>` HelmRelease the Phase-1 helmwatch observes maps 1:1
// to a Job (jobName="install-<chart>", appId="<chart>",
// batchId="bootstrap-kit"). Each watch attempt the helmwatch emits is
// an Execution; LogLines append to the active Execution's NDJSON log
// file as the helmwatch goroutine derives them from HelmRelease
// status.conditions.
//
// Two parallel feeds exist for the same data:
//
//   - The existing `/api/v1/deployments/{id}/events` SSE feed — kept
//     untouched, the wizard's live banner reads it.
//   - The new Jobs/Executions REST surface — the table-view UX (issue
//     #204) reads it.
//
// Persistence (per docs/INVIOLABLE-PRINCIPLES.md #4: every path is
// runtime-configurable via CATALYST_EXECUTIONS_DIR; default lives on
// the same `catalyst-api-deployments` PVC mount the deployments store
// uses):
//
//	/var/lib/catalyst/executions/<deploymentId>/index.json
//	    Job + Execution metadata, atomic write (temp+rename).
//	/var/lib/catalyst/executions/<deploymentId>/<execId>.log
//	    Append-only NDJSON (one LogLine per line).
//
// Per docs/INVIOLABLE-PRINCIPLES.md #10 (credential hygiene) no log
// line ever carries a kubeconfig, bearer token, or other secret — the
// helmwatch package only emits HelmRelease status messages, which are
// public-cluster-state by definition.
package jobs

import "time"

// Status enums — kept in lockstep with helmwatch.State* via the
// translation in helmwatch_bridge.go. The wire contract uses these
// strings verbatim (frontend agents code against the spec in #205).
const (
	StatusPending   = "pending"
	StatusRunning   = "running"
	StatusSucceeded = "succeeded"
	StatusFailed    = "failed"
)

// Log levels — the helmwatch bridge maps Helm condition severity onto
// these. ERROR for "failed", WARN for "degraded", INFO for everything
// else (DEBUG is reserved for future client-instrumented log feeds).
const (
	LevelDebug = "DEBUG"
	LevelInfo  = "INFO"
	LevelWarn  = "WARN"
	LevelError = "ERROR"
)

// BatchBootstrapKit — the only batch the bootstrap-kit currently
// emits. Future batches (Phase-2 component installs, Day-2 Crossplane
// reconciles) will introduce additional batch ids; this constant is
// the canonical "Phase-1 install" batch tag.
const BatchBootstrapKit = "bootstrap-kit"

// JobNamePrefix — every Phase-1 Job is named "install-<chart>". The
// helmwatch bridge derives this from the HelmRelease metadata.name
// ("bp-foo" → "install-foo").
const JobNamePrefix = "install-"

// IsTerminal reports whether a status string represents a terminal
// state (no further state transitions). The store's running→done
// transitions and the "executionFinished" pagination flag both key
// off this.
func IsTerminal(status string) bool {
	switch status {
	case StatusSucceeded, StatusFailed:
		return true
	}
	return false
}

// Job is the wire-contract Job shape. The store materialises one Job
// per `bp-<chart>` HelmRelease; the helmwatch bridge keeps its state
// in sync as conditions transition.
//
// Fields use omitempty for nullable timestamps so the JSON shape the
// frontend sees matches the spec verbatim.
type Job struct {
	// ID is the stable identifier "<deploymentId>:<jobName>". It is
	// the URL-safe id the GET /jobs/{jobId} endpoint accepts.
	ID string `json:"id"`

	// DeploymentID — the parent deployment the Job belongs to.
	DeploymentID string `json:"deploymentId"`

	// JobName — "install-<chart>", e.g. "install-cilium".
	JobName string `json:"jobName"`

	// AppID — the Sovereign component id, e.g. "cilium". Equals
	// helmwatch.ComponentIDFromHelmRelease(HR.metadata.name).
	AppID string `json:"appId"`

	// BatchID — currently always BatchBootstrapKit; reserved for
	// future Day-2 batches.
	BatchID string `json:"batchId"`

	// DependsOn — list of jobNames this Job depends on. Derived from
	// the HelmRelease's `spec.dependsOn[*].name` with the bp- prefix
	// stripped and "install-" prepended (e.g. spec.dependsOn entry
	// `name: bp-cilium` → DependsOn entry `install-cilium`).
	DependsOn []string `json:"dependsOn"`

	// Status — pending|running|succeeded|failed. See package consts.
	Status string `json:"status"`

	// StartedAt — UTC instant the Job first transitioned out of
	// pending. nil while the Job is still pending.
	StartedAt *time.Time `json:"startedAt,omitempty"`

	// FinishedAt — UTC instant the Job reached a terminal state. nil
	// until the Job is succeeded or failed.
	FinishedAt *time.Time `json:"finishedAt,omitempty"`

	// DurationMs — milliseconds between StartedAt and FinishedAt.
	// Zero while either is nil.
	DurationMs int64 `json:"durationMs"`

	// LatestExecutionID — id of the most-recent Execution for this
	// Job, empty until the first attempt starts. The frontend uses
	// this to deep-link to the GitLab-style log viewer without
	// having to load the full Execution list first.
	LatestExecutionID string `json:"latestExecutionId,omitempty"`
}

// Execution captures one attempt of a Job. The store appends a new
// Execution every time the Job transitions back into running from a
// terminal state (Day-2 retry flows; Phase-1 installs typically have
// exactly one Execution per Job).
type Execution struct {
	// ID — opaque identifier, hex-encoded random bytes. Unique
	// within a deployment.
	ID string `json:"id"`

	// JobID — parent Job stable id ("<deploymentId>:<jobName>").
	JobID string `json:"jobId"`

	// DeploymentID — parent deployment id, denormalised so the
	// /executions/{execId}/logs endpoint can resolve the log file
	// path without a Job lookup.
	DeploymentID string `json:"deploymentId"`

	// Status — running|succeeded|failed (pending is reserved for the
	// Job aggregate; an Execution is always at least running).
	Status string `json:"status"`

	// StartedAt — UTC instant the Execution attempt began.
	StartedAt time.Time `json:"startedAt"`

	// FinishedAt — UTC instant the Execution reached a terminal
	// state. nil while still running.
	FinishedAt *time.Time `json:"finishedAt,omitempty"`

	// LineCount — total LogLines appended to this Execution's NDJSON
	// log file. The /logs endpoint compares fromLine against this to
	// derive `total` and pagination boundaries.
	LineCount int `json:"lineCount"`
}

// LogLine is one record in an Execution's append-only NDJSON file.
// LineNumber is 1-indexed — the GitLab-CI-style viewer the frontend
// renders keys off it for the gutter.
type LogLine struct {
	// LineNumber — 1-indexed. The store stamps this on append.
	LineNumber int `json:"lineNumber"`

	// Timestamp — RFC3339Nano in UTC.
	Timestamp time.Time `json:"timestamp"`

	// Level — INFO|DEBUG|WARN|ERROR. See package consts.
	Level string `json:"level"`

	// Message — the rendered log line. The frontend strips ANSI on
	// its own; the store does not parse or re-encode.
	Message string `json:"message"`
}

// Index is the on-disk shape of <depId>/index.json. Holds Job +
// Execution metadata; log lines live in the per-execution NDJSON
// files. Persisted via atomic temp+rename through Store.persistIndex.
type Index struct {
	// DeploymentID — denormalised so a stray index.json file is
	// self-describing.
	DeploymentID string `json:"deploymentId"`

	// Jobs — all Jobs for this deployment, in insertion order. The
	// API handler sorts on read (started-at DESC, pending last).
	Jobs []Job `json:"jobs"`

	// Executions — all Executions across all Jobs. The API handler
	// filters by JobID on the per-job endpoint. Stored flat so a
	// per-execution mutation only rewrites this slice once, not a
	// nested per-Job list.
	Executions []Execution `json:"executions"`
}

// BatchSummary is the wire-contract row for the
// /api/v1/deployments/{depId}/jobs/batches endpoint. The handler
// computes this on read by aggregating Job statuses keyed by BatchID.
type BatchSummary struct {
	// BatchID — e.g. "bootstrap-kit".
	BatchID string `json:"batchId"`

	// Total — number of Jobs in this batch.
	Total int `json:"total"`

	// Finished — number of Jobs in a terminal state (succeeded |
	// failed). Equals Succeeded + Failed.
	Finished int `json:"finished"`

	// Succeeded — number of Jobs with Status=succeeded.
	Succeeded int `json:"succeeded"`

	// Failed — number of Jobs with Status=failed.
	Failed int `json:"failed"`

	// Running — number of Jobs with Status=running.
	Running int `json:"running"`

	// Pending — number of Jobs with Status=pending.
	Pending int `json:"pending"`
}

// JobID synthesises the stable per-deployment Job id. Exported so
// the helmwatch bridge AND the API handler agree on the format.
func JobID(deploymentID, jobName string) string {
	return deploymentID + ":" + jobName
}
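
For orientation, a populated index.json under the struct tags above serialises roughly as follows (all values illustrative):

{
  "deploymentId": "dep-1",
  "jobs": [
    {
      "id": "dep-1:install-cilium",
      "deploymentId": "dep-1",
      "jobName": "install-cilium",
      "appId": "cilium",
      "batchId": "bootstrap-kit",
      "dependsOn": ["install-flux"],
      "status": "succeeded",
      "startedAt": "2026-04-29T12:00:00Z",
      "finishedAt": "2026-04-29T12:00:05Z",
      "durationMs": 5000,
      "latestExecutionId": "9f86d081884c7d659a2feaa0c55ad015"
    }
  ],
  "executions": [
    {
      "id": "9f86d081884c7d659a2feaa0c55ad015",
      "jobId": "dep-1:install-cilium",
      "deploymentId": "dep-1",
      "status": "succeeded",
      "startedAt": "2026-04-29T12:00:00Z",
      "finishedAt": "2026-04-29T12:00:05Z",
      "lineCount": 42
    }
  ]
}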