merge: per-component SSE events from HelmRelease watch (Phase 1)

This commit is contained in:
hatiyildiz 2026-04-29 17:38:09 +02:00
commit c6f3705d4b
14 changed files with 3055 additions and 66 deletions

View File

@ -47,6 +47,12 @@ func main() {
// covers the same path, but the GET is a stateless fast-path test
// + reconnect target.
r.Get("/api/v1/deployments/{id}/events", h.GetDeploymentEvents)
// Kubeconfig endpoint — wizard StepSuccess "Download kubeconfig"
// button + Sovereign Admin break-glass download + the source the
// internal/helmwatch HelmRelease watcher reads from when the
// catalyst-api Pod cold-starts mid-Phase-1 and has to reattach
// to a deployment whose kubeconfig is on the PVC.
r.Get("/api/v1/deployments/{id}/kubeconfig", h.GetKubeconfig)
// Registrar proxy — wizard's BYO Flow B (#169). /validate is called
// pre-submit so a typo'd token surfaces at the prompt; /set-ns is
// called from CreateDeployment when domainMode == byo-api.

View File

@ -1,8 +1,51 @@
module github.com/openova-io/openova/products/catalyst/bootstrap/api
go 1.23
go 1.26.0
require (
github.com/go-chi/chi/v5 v5.2.1
github.com/go-chi/cors v1.2.1
k8s.io/api v0.36.0
k8s.io/apimachinery v0.36.0
k8s.io/client-go v0.36.0
)
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.13.0 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/spf13/pflag v1.0.9 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/oauth2 v0.34.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/term v0.39.0 // indirect
golang.org/x/text v0.33.0 // indirect
golang.org/x/time v0.14.0 // indirect
google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af // indirect
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.140.0 // indirect
k8s.io/kube-openapi v0.0.0-20260317180543-43fb72c5454a // indirect
k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 // indirect
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v6 v6.3.2 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect
)

View File

@ -1,4 +1,121 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.13.0 h1:C4Bl2xDndpU6nJ4bc1jXd+uTmYPVUwkD6bFY/oTyCes=
github.com/emicklei/go-restful/v3 v3.13.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM=
github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ=
github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8=
github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE=
github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE=
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8=
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw=
golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY=
golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af h1:+5/Sw3GsDNlEmu7TfklWKPdQ0Ykja5VEmq2i817+jbI=
google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/evanphx/json-patch.v4 v4.13.0 h1:czT3CmqEaQ1aanPc5SdlgQrrEIb8w/wwCvWWnfEbYzo=
gopkg.in/evanphx/json-patch.v4 v4.13.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.36.0 h1:SgqDhZzHdOtMk40xVSvCXkP9ME0H05hPM3p9AB1kL80=
k8s.io/api v0.36.0/go.mod h1:m1LVrGPNYax5NBHdO+QuAedXyuzTt4RryI/qnmNvs34=
k8s.io/apimachinery v0.36.0 h1:jZyPzhd5Z+3h9vJLt0z9XdzW9VzNzWAUw+P1xZ9PXtQ=
k8s.io/apimachinery v0.36.0/go.mod h1:FklypaRJt6n5wUIwWXIP6GJlIpUizTgfo1T/As+Tyxc=
k8s.io/client-go v0.36.0 h1:pOYi7C4RHChYjMiHpZSpSbIM6ZxVbRXBy7CuiIwqA3c=
k8s.io/client-go v0.36.0/go.mod h1:ZKKcpwF0aLYfkHFCjillCKaTK/yBkEDHTDXCFY6AS9Y=
k8s.io/klog/v2 v2.140.0 h1:Tf+J3AH7xnUzZyVVXhTgGhEKnFqye14aadWv7bzXdzc=
k8s.io/klog/v2 v2.140.0/go.mod h1:o+/RWfJ6PwpnFn7OyAG3QnO47BFsymfEfrz6XyYSSp0=
k8s.io/kube-openapi v0.0.0-20260317180543-43fb72c5454a h1:xCeOEAOoGYl2jnJoHkC3hkbPJgdATINPMAxaynU2Ovg=
k8s.io/kube-openapi v0.0.0-20260317180543-43fb72c5454a/go.mod h1:uGBT7iTA6c6MvqUvSXIaYZo9ukscABYi2btjhvgKGZ0=
k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 h1:AZYQSJemyQB5eRxqcPky+/7EdBj0xi3g0ZcxxJ7vbWU=
k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2/go.mod h1:xDxuJ0whA3d0I4mf/C4ppKHxXynQ+fxnkmQH0vTHnuk=
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg=
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU=
sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
sigs.k8s.io/structured-merge-diff/v6 v6.3.2 h1:kwVWMx5yS1CrnFWA/2QHyRVJ8jM6dBA80uLmm0wJkk8=
sigs.k8s.io/structured-merge-diff/v6 v6.3.2/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE=
sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs=
sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4=

View File

@ -5,10 +5,16 @@
// handler invokes `tofu apply` against the canonical infra/hetzner/ module
// and streams the output to the wizard via SSE.
//
// Phase 1 hand-off (Crossplane adopting day-2 management) and bootstrap-kit
// installation (Cilium → cert-manager → Flux → Crossplane → ... → bp-catalyst-platform)
// happen INSIDE the cluster via Flux reconciling clusters/<sovereign-fqdn>/
// in the public OpenOva monorepo. The handler does not orchestrate that.
// Phase 1 — bootstrap-kit installation (Cilium → cert-manager → Flux →
// Crossplane → ... → bp-catalyst-platform) — runs INSIDE the new
// Sovereign cluster via Flux reconciling clusters/<sovereign-fqdn>/ in
// the public OpenOva monorepo. catalyst-api does NOT orchestrate that
// (per Lesson #24, never call helm/kubectl), but it DOES OBSERVE Phase
// 1's HelmRelease state via a read-only client-go dynamic informer
// against the new Sovereign's kubeconfig (internal/helmwatch). The
// observed state flows back through the same SSE buffer Phase 0 used,
// surfacing per-component pills ("cilium installing → installed") for
// the Sovereign Admin's app cards.
package handler
import (
@ -40,39 +46,44 @@ const EventBufferCap = 10000
//
// Event flow lives in two parallel structures:
//
// - eventsCh — the live SSE channel. runProvisioning closes this when the
// provisioning goroutine finishes, which is what the existing StreamLogs
// loop watches for `event: done`.
// - eventsBuf — a bounded, mutex-guarded slice of every event ever emitted
// for this deployment. StreamLogs reads this on first connection so a
// browser that lands on the page AFTER provisioning finished still
// renders the full history. GET /events surfaces the same slice as JSON
// for any client that wants a one-shot snapshot.
// - eventsCh — the live SSE channel. runProvisioning closes this when
// BOTH the Phase-0 OpenTofu provisioning AND (when launched) the
// Phase-1 HelmRelease watch have finished. StreamLogs ranges over
// it; when it closes, the SSE stream emits `event: done`.
// - eventsBuf — a bounded, mutex-guarded slice of every event ever
// emitted for this deployment. StreamLogs reads this on first
// connection so a browser that lands on the page AFTER
// provisioning finished still renders the full history. GET
// /events surfaces the same slice as JSON for any client that
// wants a one-shot snapshot.
//
// done is closed once runProvisioning has finished and the terminal state
// (Status, Result, Error, FinishedAt) is committed. StreamLogs uses it to
// know when a deployment is already complete (replay-then-emit-done) versus
// still running (replay-then-tail-channel).
// done is closed once runProvisioning has finished AND the Phase-1
// watch has either terminated or was never launched (Phase-0 failure).
// StreamLogs uses it to know when a deployment is fully complete
// (replay-then-emit-done) versus still running (replay-then-tail-
// channel).
type Deployment struct {
ID string
Status string // pending | provisioning | tofu-applying | flux-bootstrapping | ready | failed
ID string
Status string // pending | provisioning | tofu-applying | flux-bootstrapping | phase1-watching | ready | failed
Request provisioner.Request
Result *provisioner.Result
Error string
StartedAt time.Time
FinishedAt time.Time
// eventsCh carries live events to the active SSE consumer. runProvisioning
// emits to this channel; StreamLogs ranges over it. Closed by
// runProvisioning when the goroutine finishes.
// eventsCh carries live events to the active SSE consumer.
// runProvisioning + the Phase-1 watch goroutine both emit through
// it; closed once both have finished.
eventsCh chan provisioner.Event
// eventsBuf is the durable history every emitted event lands in. Mutex
// guarded by mu. Bounded at EventBufferCap with FIFO eviction.
eventsBuf []provisioner.Event
// done is closed when runProvisioning has finished and the terminal
// fields (Status, Result, Error, FinishedAt) are committed under mu.
// done is closed when runProvisioning's full lifecycle (Phase 0 +
// optional Phase 1 watch) has finished and the terminal fields
// (Status, Result, Error, FinishedAt, ComponentStates,
// Phase1FinishedAt) are committed under mu.
done chan struct{}
mu sync.Mutex
@ -212,7 +223,7 @@ func fromRecord(rec store.Record) *Deployment {
// isInFlightStatus reports whether s is a non-terminal deployment status —
// i.e. the deployment is still progressing and has not reached "ready" or
// "failed". "phase1-watching" counts as in-flight: Phase 0 succeeded but the
// Phase-1 HelmRelease watch has not yet terminated, so consumers polling the
// deployment must keep treating it as live.
//
// Note: the diff previously showed both the old and the new case line stacked;
// duplicate string constants in a switch are a compile error in Go, so only
// the post-change case list (including "phase1-watching") is kept.
func isInFlightStatus(s string) bool {
	switch s {
	case "pending", "provisioning", "tofu-applying", "flux-bootstrapping", "phase1-watching":
		return true
	}
	return false
}
@ -321,6 +332,11 @@ func (h *Handler) restoreFromStore() {
// numEvents surfaces the buffer size so callers polling /deployments/{id}
// can confirm the catalyst-api is recording progress even before they open
// the SSE stream. ProvisionPage uses this in its diagnostic readout.
//
// componentStates + phase1FinishedAt surface the Phase-1 HelmRelease
// watch outcome to the Sovereign Admin shell so its top-level pill
// can render "X of Y components installed" without having to walk
// the full event buffer.
func (d *Deployment) State() map[string]any {
d.mu.Lock()
defer d.mu.Unlock()
@ -341,6 +357,15 @@ func (d *Deployment) State() map[string]any {
}
if d.Result != nil {
out["result"] = d.Result
// Lift the Phase-1 fields to the top level too — the
// Sovereign Admin polls /deployments/<id> and reads them
// without unwrapping result.
if len(d.Result.ComponentStates) > 0 {
out["componentStates"] = d.Result.ComponentStates
}
if d.Result.Phase1FinishedAt != nil {
out["phase1FinishedAt"] = d.Result.Phase1FinishedAt.UTC().Format(time.RFC3339)
}
}
return out
}
@ -535,36 +560,20 @@ func (h *Handler) GetDeploymentEvents(w http.ResponseWriter, r *http.Request) {
}
func (h *Handler) runProvisioning(dep *Deployment) {
// Tee — provisioner.Provision writes events into producer; this goroutine
// records every event in the durable buffer AND forwards it to the live
// SSE channel. recordEvent is the single emit path, so the buffer and
// the live stream cannot diverge.
// Tee — provisioner.Provision writes events into producer; this
// goroutine records every event in the durable buffer AND forwards
// it to the live SSE channel. recordEvent is the single emit path,
// so the buffer and the live stream cannot diverge. The Phase-1
// watch (when launched) shares the same emit path via
// h.emitWatchEvent so per-component events flow through identical
// plumbing.
producer := make(chan provisioner.Event, 256)
teeDone := make(chan struct{})
go func() {
defer close(teeDone)
for ev := range producer {
// recordEventAndPersist appends to the durable buffer AND
// rewrites the on-disk record. The user-reported regression
// (deployment id 5cd1bceaaacb71f6 disappeared after a Pod
// restart at 12:57) is closed by this single line: every
// event the wizard sees has also hit ext4 by the time the
// goroutine moves to the next iteration, so a Pod kill
// loses at most the in-flight event the producer hadn't
// emitted yet.
recorded := h.recordEventAndPersist(dep, ev)
// Non-blocking send to the live channel — if no SSE consumer is
// attached, the eventsCh buffer (256) absorbs the burst; once
// full we drop on the live side ONLY (the durable buffer still
// has the event, so the next reconnect replays it). This
// preserves the existing channel-buffer-overflow semantics
// while guaranteeing history retention.
select {
case dep.eventsCh <- recorded:
default:
}
h.emitWatchEvent(dep, ev)
}
close(dep.eventsCh)
}()
prov := provisioner.New()
@ -575,16 +584,17 @@ func (h *Handler) runProvisioning(dep *Deployment) {
close(producer)
<-teeDone
// Capture Phase-0 outcome under the lock.
dep.mu.Lock()
dep.FinishedAt = time.Now()
if err != nil {
dep.FinishedAt = time.Now()
dep.Status = "failed"
dep.Error = err.Error()
h.log.Error("provision failed", "id", dep.ID, "err", err)
} else {
dep.Status = "ready"
dep.Status = "phase1-watching"
dep.Result = result
h.log.Info("provision complete",
h.log.Info("phase 0 complete; phase 1 watch starting",
"id", dep.ID,
"sovereignFQDN", result.SovereignFQDN,
"controlPlaneIP", result.ControlPlaneIP,
@ -592,21 +602,42 @@ func (h *Handler) runProvisioning(dep *Deployment) {
)
}
dep.mu.Unlock()
close(dep.done)
// Terminal-state persist. The recordEventAndPersist loop above
// already wrote the last per-event snapshot, but the terminal
// status / Result / FinishedAt mutations happen after the producer
// channel closed — so we need one more Save to capture them. This
// is the line that guarantees a `status: ready` deployment on
// disk after a clean run, instead of "still provisioning" frozen
// at the last emitted event.
// Persist the Phase-0 terminal state (ready or failed). This is
// the line that guarantees a `status: phase1-watching` (or
// `failed`) deployment on disk before the Phase-1 watch starts,
// so a Pod kill in the gap between Phase 0 and Phase 1 can be
// resumed/diagnosed correctly.
h.persistDeployment(dep)
// PDM lifecycle: on success, /commit with the LB IP; on failure, /release
// so the reservation TTL doesn't have to expire to free the name. PDM is
// the single owner of the Dynadot side-effect (it is also responsible for
// AddSovereignRecords on commit; catalyst-api never writes DNS itself).
// Phase 1 — HelmRelease watch. Only runs on Phase-0 success and
// only when a kubeconfig is available. The watch emits per-
// component events into the same SSE buffer + live channel; when
// it terminates, it writes ComponentStates + Phase1FinishedAt
// onto dep.Result and flips Status to ready (or leaves failed
// alone if Phase 0 already failed).
if err == nil && result != nil {
h.runPhase1Watch(dep)
}
// Close the SSE live channel + done signal AFTER both phases
// have settled. Existing tests that drive runProvisioning's
// failure-fast path (no real tofu) still hit close because the
// Phase-0 error skips the watch.
close(dep.eventsCh)
close(dep.done)
// Final persist — captures Phase 1 terminal state when the watch
// ran, or is a no-op for the Phase 0 failure path (already
// persisted above).
h.persistDeployment(dep)
// PDM lifecycle: on success, /commit with the LB IP; on failure,
// /release so the reservation TTL doesn't have to expire to free
// the name. PDM is the single owner of the Dynadot side-effect
// (it is also responsible for AddSovereignRecords on commit;
// catalyst-api never writes DNS itself). The commit happens
// post-Phase-0 because the LB IP is the only data PDM needs;
// the Phase-1 watch outcome does NOT change DNS routing.
if dep.pdmReservationToken != "" && h.pdm != nil {
pdmCtx, pdmCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer pdmCancel()
@ -646,6 +677,21 @@ func (h *Handler) runProvisioning(dep *Deployment) {
}
}
// emitWatchEvent is the single emit path shared by Phase 0 and Phase 1:
// every event is first appended to the durable buffer (which also persists
// it to disk via recordEventAndPersist), then offered to the live SSE
// channel without blocking. When no consumer is attached, eventsCh's
// buffer (256) absorbs the burst; once that fills, the event is dropped on
// the LIVE side only — the durable buffer still holds it, so a later
// /events poll or SSE reconnect replays the full history.
func (h *Handler) emitWatchEvent(dep *Deployment, ev provisioner.Event) {
	persisted := h.recordEventAndPersist(dep, ev)
	select {
	case dep.eventsCh <- persisted:
		// Delivered to the live channel (or absorbed by its buffer).
	default:
		// Live channel full — drop here; history retention is unaffected.
	}
}
func newID() string {
b := make([]byte, 8)
_, _ = rand.Read(b)

View File

@ -7,6 +7,10 @@ import (
"net/http"
"os"
"sync"
"time"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/pdm"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/store"
@ -51,6 +55,19 @@ type Handler struct {
// NewWithPDM without a directory) keep working unchanged. Production
// always wires this via New() reading CATALYST_DEPLOYMENTS_DIR.
store *store.Store
// Phase-1 watch knobs — every field below is a test-only override
// for internal/helmwatch.Config. Production reads
// CATALYST_PHASE1_WATCH_TIMEOUT from env and uses
// helmwatch.NewDynamicClientFromKubeconfig /
// NewKubernetesClientFromKubeconfig. Tests inject a
// fake.NewSimpleDynamicClient via dynamicFactory and a tiny
// timeout via phase1WatchTimeout so termination behaviour is
// deterministic.
dynamicFactory func(string) (dynamic.Interface, error)
coreFactory func(string) (kubernetes.Interface, error)
phase1WatchTimeout time.Duration
phase1WatchResync time.Duration
}
// defaultDeploymentsDir is the on-PVC path the chart mounts. A separate

View File

@ -0,0 +1,91 @@
// GET /api/v1/deployments/{id}/kubeconfig — returns the new
// Sovereign cluster's k3s kubeconfig as application/yaml.
//
// Producer of the bytes: Phase 0 OpenTofu finishes, an out-of-band
// fetch reads /etc/rancher/k3s/k3s.yaml from the control-plane node
// (rewriting the server URL from 127.0.0.1 to the load-balancer's
// public IP) and writes it onto Deployment.Result.Kubeconfig before
// runProvisioning persists the deployment. The HelmRelease watch
// loop reads from the same field at Phase-1 start.
//
// Consumers:
//
// - The wizard's StepSuccess.tsx "Download kubeconfig" button hits
// /api/v1/deployments/<id>/kubeconfig and triggers a browser
// download named `<sovereignFQDN>-kubeconfig.yaml`.
// - Operators running `kubectl --kubeconfig=$(curl .../kubeconfig)`
// ad-hoc against a fresh Sovereign.
// - The Sovereign Admin shell uses the same endpoint to give the
// operator a one-click download for break-glass access.
//
// Failure modes:
//
// - 404 — deployment not found
// - 409 — deployment exists but no kubeconfig has been captured
// yet (Phase 0 still in flight, or Phase 0 failed before the
// fetch step). Body is a JSON envelope so the wizard can
// surface the not-implemented / not-yet states distinctly.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #10 (credential hygiene): the
// kubeconfig contains a long-lived k3s service-account token until
// Phase 2 swaps it for a SPIFFE-issued identity. The endpoint is
// intentionally NOT authenticated at the catalyst-api edge — it
// inherits whatever auth the franchise console attaches. The
// Sovereign Admin shell sits behind the franchise SSO; ad-hoc
// curl-from-the-command-line MUST go through the same SSO. The
// catalyst-api never logs the kubeconfig bytes; it only logs the
// deployment id and the byte length.
package handler
import (
"net/http"
"github.com/go-chi/chi/v5"
)
// GetKubeconfig — GET /api/v1/deployments/{id}/kubeconfig.
//
// Serves the deployment's stored kubeconfig as an application/yaml
// attachment named `<sovereignFQDN>-kubeconfig.yaml`. When no kubeconfig
// has been captured yet (Phase 0 still in flight, or it failed before the
// capture step), responds HTTP 409 with {"error": "not-implemented"} —
// the contract the wizard's StepSuccess.tsx already handles by rendering
// its SSH-fetch runbook fallback instead of a generic error.
func (h *Handler) GetKubeconfig(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "id")
	stored, found := h.deployments.Load(id)
	if !found {
		http.Error(w, "deployment not found", http.StatusNotFound)
		return
	}
	dep := stored.(*Deployment)

	// Snapshot the kubeconfig under the lock; hold it no longer than the read.
	dep.mu.Lock()
	cfg := ""
	if dep.Result != nil {
		cfg = dep.Result.Kubeconfig
	}
	dep.mu.Unlock()

	if cfg == "" {
		// 409 keeps the wizard's existing "not-implemented" branch
		// working unchanged (StepSuccess.test.tsx asserts a 409
		// triggers the SSH-fetch runbook fallback card).
		writeJSON(w, http.StatusConflict, map[string]string{
			"error":  "not-implemented",
			"detail": "kubeconfig has not been captured for this deployment yet. Operator can fetch it via SSH per docs/RUNBOOK-PROVISIONING.md §Fetch kubeconfig via SSH; programmatic capture lands in a follow-on ticket.",
		})
		return
	}

	w.Header().Set("Content-Type", "application/yaml")
	w.Header().Set("Content-Disposition",
		`attachment; filename="`+dep.Request.SovereignFQDN+`-kubeconfig.yaml"`)
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(cfg))

	// Never log the kubeconfig bytes themselves — only the id and length.
	h.log.Info("kubeconfig served",
		"id", id,
		"sovereignFQDN", dep.Request.SovereignFQDN,
		"bytes", len(cfg),
	)
}

View File

@ -0,0 +1,176 @@
// Phase-1 HelmRelease watch wiring.
//
// runPhase1Watch is the entry point runProvisioning calls after Phase 0
// ("flux-bootstrap") completes successfully. It builds an
// internal/helmwatch.Watcher against the deployment's persisted
// kubeconfig, runs the watch until termination, and writes the final
// per-component states + Phase1FinishedAt onto dep.Result.
//
// Per docs/INVIOLABLE-PRINCIPLES.md #3 the watch is read-only —
// internal/helmwatch never patches/applies/deletes any resource. Its
// only job is to read HelmRelease.status.conditions and turn each
// observed transition into a provisioner.Event the SSE buffer carries.
//
// Lifecycle:
// - Skipped when dep.Result.Kubeconfig is empty. The Sovereign Admin
// surfaces the missing-kubeconfig case via a single warn event so
// the operator can fall back to docs/RUNBOOK-PROVISIONING.md
// §"Fetch kubeconfig via SSH" and retry.
// - Times out per CATALYST_PHASE1_WATCH_TIMEOUT (default 60m).
// - On termination, dep.Status flips to "ready" if every observed
// component reached "installed" OR there were no components and
// the watch ran clean. If any component ended in "failed", Status
// stays "phase1-watching" and Error captures the count — the
// wizard's FailureCard renders the per-component breakdown.
// - Result.ComponentStates + Result.Phase1FinishedAt get written
// under dep.mu so a concurrent State() snapshot is consistent.
package handler
import (
"context"
"fmt"
"os"
"time"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/helmwatch"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)
// phase1WatchTimeoutEnv — env var override for the watch budget. The
// default DefaultWatchTimeout (60 minutes) is sized for bp-catalyst-
// platform's worst-observed install on the omantel.omani.works DoD run.
// Tests inject a much shorter value via Handler.phase1WatchTimeout.
const phase1WatchTimeoutEnv = "CATALYST_PHASE1_WATCH_TIMEOUT"
// runPhase1Watch builds a helmwatch.Watcher from the deployment's
// persisted kubeconfig and runs it until it terminates. Every event
// the watcher derives is routed through h.emitWatchEvent, so the
// durable buffer + SSE channel receive per-component events the same
// way they receive Phase-0 events.
//
// The call is synchronous: runProvisioning blocks here before closing
// dep.done, keeping the "deployment finished" semantics consistent —
// a deployment is done only when both Phase 0 AND the Phase-1 watch
// have terminated.
func (h *Handler) runPhase1Watch(dep *Deployment) {
	// Snapshot the kubeconfig under the lock; everything that follows
	// runs without holding dep.mu.
	dep.mu.Lock()
	var kubeconfig string
	if res := dep.Result; res != nil {
		kubeconfig = res.Kubeconfig
	}
	dep.mu.Unlock()

	if kubeconfig == "" {
		// Short-circuit: nothing to watch. Emit a single warn so the
		// operator knows why, and still mark the phase done so Status
		// doesn't stay in-flight forever.
		h.emitWatchEvent(dep, provisioner.Event{
			Time:    time.Now().UTC().Format(time.RFC3339),
			Phase:   helmwatch.PhaseComponent,
			Level:   "warn",
			Message: "Phase-1 watch skipped: no kubeconfig is available on the catalyst-api side. Operator must fetch the kubeconfig via SSH (see docs/RUNBOOK-PROVISIONING.md §Fetch kubeconfig via SSH) and re-run the deployment with the kubeconfig pre-populated to observe per-component install state.",
		})
		h.markPhase1Done(dep, nil)
		return
	}

	watcher, err := helmwatch.NewWatcher(
		h.phase1WatchConfigForDeployment(dep, kubeconfig),
		func(ev provisioner.Event) { h.emitWatchEvent(dep, ev) },
	)
	if err != nil {
		// Watcher construction failed — surface it as an error event
		// and close out the phase rather than leaving it in flight.
		h.emitWatchEvent(dep, provisioner.Event{
			Time:    time.Now().UTC().Format(time.RFC3339),
			Phase:   helmwatch.PhaseComponent,
			Level:   "error",
			Message: fmt.Sprintf("Phase-1 watch could not start: %v — Sovereign cluster is up (Phase 0 succeeded) but per-component state will not stream from this catalyst-api. Operator may run `kubectl get helmrelease -n flux-system` against the new Sovereign for ad-hoc diagnostics.", err),
		})
		h.markPhase1Done(dep, nil)
		return
	}

	// Background context: a finished HTTP request from the caller must
	// not cancel a multi-minute Phase-1 watch. The watch bounds itself
	// via the configured cfg.WatchTimeout.
	finalStates, watchErr := watcher.Watch(context.Background())
	if watchErr != nil {
		h.log.Error("phase 1 watch returned error",
			"id", dep.ID,
			"err", watchErr,
		)
	}
	h.markPhase1Done(dep, finalStates)
}
// phase1WatchConfigForDeployment builds the helmwatch.Config that
// runPhase1Watch hands to helmwatch.NewWatcher. Pulled out so tests
// can verify env-var parsing + factory wiring without standing up a
// real cluster.
//
// Timeout precedence: the test-only h.phase1WatchTimeout field wins
// when set; otherwise CATALYST_PHASE1_WATCH_TIMEOUT is parsed
// (production path).
func (h *Handler) phase1WatchConfigForDeployment(dep *Deployment, kubeconfig string) helmwatch.Config {
	cfg := helmwatch.Config{
		KubeconfigYAML: kubeconfig,
		WatchTimeout:   h.phase1WatchTimeout,
	}
	if cfg.WatchTimeout == 0 {
		cfg.WatchTimeout = helmwatch.CompileWatchTimeout(envOrEmpty(phase1WatchTimeoutEnv))
	}
	// Factory and resync overrides are test seams; production leaves
	// them unset.
	if h.dynamicFactory != nil {
		cfg.DynamicFactory = h.dynamicFactory
	}
	if h.coreFactory != nil {
		cfg.CoreFactory = h.coreFactory
	}
	if h.phase1WatchResync > 0 {
		cfg.Resync = h.phase1WatchResync
	}
	return cfg
}
// markPhase1Done writes the watch outcome onto dep.Result and flips
// Status accordingly. Holds dep.mu for the whole transition so a
// State() snapshot from another goroutine can't observe Status=ready
// without ComponentStates yet being committed.
//
// finalStates may be nil (kubeconfig-skip and watcher-start-error
// paths); the deployment then ends "ready" with no component map.
func (h *Handler) markPhase1Done(dep *Deployment, finalStates map[string]string) {
	// Single clock read so Result.Phase1FinishedAt and dep.FinishedAt
	// record the same UTC instant. (Previously FinishedAt took a
	// second, local-time time.Now() — two slightly different stamps.)
	now := time.Now().UTC()
	dep.mu.Lock()
	defer dep.mu.Unlock()
	if dep.Result == nil {
		// Phase 0 already failed and runProvisioning skipped the
		// watch — markPhase1Done shouldn't have been called, but
		// defend against a future caller anyway.
		return
	}
	dep.Result.ComponentStates = finalStates
	dep.Result.Phase1FinishedAt = &now
	// Count terminal failures; any one of them fails the deployment.
	failed := 0
	for _, s := range finalStates {
		if s == helmwatch.StateFailed {
			failed++
		}
	}
	dep.FinishedAt = now
	switch {
	case failed > 0:
		dep.Status = "failed"
		dep.Error = fmt.Sprintf("Phase 1 finished with %d failed component(s); see ComponentStates for the per-component breakdown", failed)
	default:
		// Covers all-installed, no-components, and timeout-with-no-
		// failures (partial states, but nothing terminal-failed).
		dep.Status = "ready"
	}
	h.log.Info("phase 1 watch terminated",
		"id", dep.ID,
		"componentCount", len(finalStates),
		"failedCount", failed,
		"finalStatus", dep.Status,
	)
}
// envOrEmpty — small helper so the tests don't have to set every
// env var the package reads. Returns "" if unset.
func envOrEmpty(key string) string {
return os.Getenv(key)
}

View File

@ -0,0 +1,640 @@
// Tests for the Phase-1 HelmRelease watch wiring in the handler.
//
// What this file proves (matches the GATES checklist for the
// per-component-SSE task):
//
// 1. runPhase1Watch wires the helmwatch.Watcher against the
// deployment's persisted Kubeconfig and the per-component
// events flow into the same eventsBuf the Phase-0 events use,
// so /events replay sees them.
// 2. markPhase1Done writes ComponentStates + Phase1FinishedAt
// onto Deployment.Result and flips Status to "ready" when
// every component installed.
// 3. A failed component flips Status to "failed" with an error
// message naming the count.
// 4. An empty Kubeconfig short-circuits the watch with a single
// warn event and still calls markPhase1Done so Status doesn't
// stay "phase1-watching" forever.
// 5. Pod-restart resume — a deployment loaded from disk with
// Status="phase1-watching" gets rewritten to "failed" by
// fromRecord (existing contract) so a Pod kill mid-watch
// surfaces as the wizard's FailureCard, not a stuck pill.
// 6. CATALYST_PHASE1_WATCH_TIMEOUT env var parses through
// phase1WatchConfigForDeployment.
// 7. The on-disk store record JSON includes ComponentStates +
// Phase1FinishedAt so a Pod restart rehydrates them.
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/go-chi/chi/v5"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/helmwatch"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/store"
)
// ─────────────────────────────────────────────────────────────────────
// Test fixtures shared between the handler tests below.
// ─────────────────────────────────────────────────────────────────────
// helmReleaseListGVK_handler — registered with the fake dynamic client
// so List+Watch resolve. Same rationale as in helmwatch's tests; we
// duplicate locally to keep this file independently runnable.
// Group/version mirror helmwatch.HelmReleaseGVR; the Kind is the list
// kind the fake client serves for that resource.
var helmReleaseListGVK_handler = schema.GroupVersionKind{
	Group:   "helm.toolkit.fluxcd.io",
	Version: "v2",
	Kind:    "HelmReleaseList",
}
// newFakeSchemeForHandler returns a runtime.Scheme that knows only the
// HelmReleaseList kind — the minimum the fake dynamic client needs to
// serve List+Watch in these tests.
func newFakeSchemeForHandler() *runtime.Scheme {
	s := runtime.NewScheme()
	s.AddKnownTypeWithName(helmReleaseListGVK_handler, &unstructured.UnstructuredList{})
	return s
}
// makeReadyHR builds a bp-* HelmRelease with Ready=True. Used by the
// "all installed" path so the watch terminates immediately.
// name doubles as the chart name in spec.chart.spec.chart (see
// makeHRWithReady).
func makeReadyHR(name string) *unstructured.Unstructured {
	return makeHRWithReady(name, metav1.ConditionTrue, "ReconciliationSucceeded", "Helm install succeeded")
}
// makeFailedHR builds a bp-* HelmRelease with Ready=False reason=
// InstallFailed so the watch sees a terminal failure.
// msg becomes the Ready condition's message field.
func makeFailedHR(name, msg string) *unstructured.Unstructured {
	return makeHRWithReady(name, metav1.ConditionFalse, "InstallFailed", msg)
}
// makeHRWithReady builds an unstructured HelmRelease in FluxNamespace
// named name, carrying a single Ready condition with the given
// status/reason/message (stamped with the current UTC time). name is
// also used as the chart name under spec.chart.spec.chart.
func makeHRWithReady(name string, status metav1.ConditionStatus, reason, message string) *unstructured.Unstructured {
	readyCondition := map[string]any{
		"type":               "Ready",
		"status":             string(status),
		"reason":             reason,
		"message":            message,
		"lastTransitionTime": time.Now().UTC().Format(time.RFC3339),
	}
	hr := &unstructured.Unstructured{
		Object: map[string]any{
			"apiVersion": "helm.toolkit.fluxcd.io/v2",
			"kind":       "HelmRelease",
			"metadata": map[string]any{
				"name":      name,
				"namespace": helmwatch.FluxNamespace,
			},
			"spec": map[string]any{
				"chart": map[string]any{
					"spec": map[string]any{"chart": name},
				},
			},
			"status": map[string]any{
				"conditions": []any{readyCondition},
			},
		},
	}
	hr.SetGroupVersionKind(schema.GroupVersionKind{
		Group:   "helm.toolkit.fluxcd.io",
		Version: "v2",
		Kind:    "HelmRelease",
	})
	return hr
}
// fakeDynamicFactoryFromObjects returns a factory closure that ignores
// its kubeconfig argument and produces a fake dynamic client seeded
// with the given HelmReleases. Tests use it to inject a deterministic
// apiserver into runPhase1Watch.
func fakeDynamicFactoryFromObjects(objs ...runtime.Object) func(string) (dynamic.Interface, error) {
	return func(string) (dynamic.Interface, error) {
		listKinds := map[schema.GroupVersionResource]string{
			helmwatch.HelmReleaseGVR: "HelmReleaseList",
		}
		return dynamicfake.NewSimpleDynamicClientWithCustomListKinds(
			newFakeSchemeForHandler(),
			listKinds,
			objs...,
		), nil
	}
}
// makeDeploymentWithKubeconfig — analogous to makeDeployment in
// deployments_events_test.go but with Result.Kubeconfig pre-populated
// so runPhase1Watch picks it up. The deployment is registered in
// h.deployments under id before returning.
func makeDeploymentWithKubeconfig(t *testing.T, h *Handler, id, kubeconfig string) *Deployment {
	t.Helper()
	fqdn := "test." + id + ".example"
	dep := &Deployment{
		ID:        id,
		StartedAt: time.Now(),
		Status:    "phase1-watching",
		done:      make(chan struct{}),
		eventsCh:  make(chan provisioner.Event, 256),
		Request: provisioner.Request{
			Region:        "fsn1",
			SovereignFQDN: fqdn,
		},
		Result: &provisioner.Result{
			Kubeconfig:    kubeconfig,
			SovereignFQDN: fqdn,
		},
	}
	h.deployments.Store(id, dep)
	return dep
}
// ─────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────
// TestRunPhase1Watch_AllInstalledFlowsThroughEventsBuf proves the
// per-component events that helmwatch emits land in the durable
// eventsBuf, so /events replay sees them and a browser landing on
// the page after Phase 1 completes still renders per-app pills.
func TestRunPhase1Watch_AllInstalledFlowsThroughEventsBuf(t *testing.T) {
	h := NewWithPDM(silentLogger(), &fakePDM{})
	// Three already-Ready bp-* releases: every observed component is
	// terminal, so the watch exits immediately and the test stays fast.
	h.dynamicFactory = fakeDynamicFactoryFromObjects(
		makeReadyHR("bp-cilium"),
		makeReadyHR("bp-cert-manager"),
		makeReadyHR("bp-flux"),
	)
	h.phase1WatchTimeout = 5 * time.Second
	dep := makeDeploymentWithKubeconfig(t, h, "phase1-all-installed", "fake-kubeconfig: yaml")
	// Synchronous: returns only after the watch terminated and
	// markPhase1Done committed the outcome.
	h.runPhase1Watch(dep)
	// All assertions read dep under its mutex, mirroring how State()
	// snapshots do.
	dep.mu.Lock()
	defer dep.mu.Unlock()
	if dep.Status != "ready" {
		t.Errorf("Status = %q, want %q (all components installed)", dep.Status, "ready")
	}
	if dep.Result == nil {
		t.Fatalf("Result is nil")
	}
	if dep.Result.Phase1FinishedAt == nil {
		t.Errorf("Phase1FinishedAt was not set")
	}
	if got := len(dep.Result.ComponentStates); got != 3 {
		t.Errorf("ComponentStates length = %d, want 3 (got=%v)", got, dep.Result.ComponentStates)
	}
	// Component keys are the HelmRelease names seeded above minus the
	// bp- prefix.
	for _, comp := range []string{"cilium", "cert-manager", "flux"} {
		if dep.Result.ComponentStates[comp] != helmwatch.StateInstalled {
			t.Errorf("ComponentStates[%q] = %q, want %q", comp, dep.Result.ComponentStates[comp], helmwatch.StateInstalled)
		}
	}
	// Per-component events landed in the durable buffer.
	var componentEvents []provisioner.Event
	for _, ev := range dep.eventsBuf {
		if ev.Phase == helmwatch.PhaseComponent && ev.Component != "" {
			componentEvents = append(componentEvents, ev)
		}
	}
	if got := len(componentEvents); got != 3 {
		t.Errorf("durable eventsBuf component events = %d, want 3:\n%+v", got, componentEvents)
	}
	for _, ev := range componentEvents {
		if ev.State != helmwatch.StateInstalled {
			t.Errorf("eventsBuf event for %q State=%q, want %q", ev.Component, ev.State, helmwatch.StateInstalled)
		}
	}
}
// TestRunPhase1Watch_FailedComponentFlipsStatusToFailed proves a
// component ending in "failed" propagates to Deployment.Status =
// "failed" with an error message naming the count.
func TestRunPhase1Watch_FailedComponentFlipsStatusToFailed(t *testing.T) {
	h := NewWithPDM(silentLogger(), &fakePDM{})
	// One Ready release + one Ready=False/InstallFailed — both
	// terminal, so the watch exits without waiting for the timeout.
	h.dynamicFactory = fakeDynamicFactoryFromObjects(
		makeReadyHR("bp-cilium"),
		makeFailedHR("bp-cert-manager", "chart load failed: 401"),
	)
	h.phase1WatchTimeout = 5 * time.Second
	dep := makeDeploymentWithKubeconfig(t, h, "phase1-failed", "fake-kubeconfig: yaml")
	h.runPhase1Watch(dep)
	dep.mu.Lock()
	defer dep.mu.Unlock()
	if dep.Status != "failed" {
		t.Errorf("Status = %q, want %q", dep.Status, "failed")
	}
	if !strings.Contains(dep.Error, "1 failed component") {
		t.Errorf("Error = %q, want it to mention the failed count", dep.Error)
	}
	// The mixed outcome stays per-component — one failed, one
	// installed — rather than being collapsed to a single state.
	if dep.Result.ComponentStates["cert-manager"] != helmwatch.StateFailed {
		t.Errorf("ComponentStates[cert-manager] = %q, want %q",
			dep.Result.ComponentStates["cert-manager"], helmwatch.StateFailed)
	}
	if dep.Result.ComponentStates["cilium"] != helmwatch.StateInstalled {
		t.Errorf("ComponentStates[cilium] = %q, want %q",
			dep.Result.ComponentStates["cilium"], helmwatch.StateInstalled)
	}
}
// TestRunPhase1Watch_EmptyKubeconfigShortCircuits proves that a
// deployment with no captured kubeconfig surfaces a single warn
// event and still calls markPhase1Done so Status leaves
// "phase1-watching".
func TestRunPhase1Watch_EmptyKubeconfigShortCircuits(t *testing.T) {
	h := NewWithPDM(silentLogger(), &fakePDM{})
	// Empty kubeconfig triggers the skip path before any watcher is
	// built, so no dynamicFactory is needed here.
	dep := makeDeploymentWithKubeconfig(t, h, "phase1-no-kubeconfig", "")
	h.runPhase1Watch(dep)
	dep.mu.Lock()
	defer dep.mu.Unlock()
	if dep.Status == "phase1-watching" {
		t.Errorf("Status stuck at phase1-watching after short-circuit")
	}
	// Result.Phase1FinishedAt is set even though no watch ran.
	if dep.Result == nil || dep.Result.Phase1FinishedAt == nil {
		t.Errorf("Phase1FinishedAt should be set even on short-circuit; result=%+v", dep.Result)
	}
	// At least one warn event in the buffer (the "skipped" message).
	// The assertion is >=1 rather than ==1 so unrelated warn events
	// from other code paths can't flake this test.
	warns := 0
	for _, ev := range dep.eventsBuf {
		if ev.Phase == helmwatch.PhaseComponent && ev.Level == "warn" {
			warns++
		}
	}
	if warns < 1 {
		t.Errorf("expected at least 1 warn event for the kubeconfig-skipped path, got: %+v", dep.eventsBuf)
	}
}
// TestGetDeployment_SurfacesComponentStatesAtTopLevel proves the
// State() snapshot lifts ComponentStates + Phase1FinishedAt to the
// top of the response so the Sovereign Admin can read them without
// unwrapping result.
func TestGetDeployment_SurfacesComponentStatesAtTopLevel(t *testing.T) {
	h := NewWithPDM(silentLogger(), &fakePDM{})
	// Hand-built, already-finished deployment: one component still
	// "installing" so both terminal and non-terminal values surface.
	dep := &Deployment{
		ID:        "phase1-state-surface",
		Status:    "ready",
		StartedAt: time.Now(),
		eventsCh:  make(chan provisioner.Event),
		done:      make(chan struct{}),
		Request: provisioner.Request{
			SovereignFQDN: "test.example",
			Region:        "fsn1",
		},
		Result: &provisioner.Result{
			SovereignFQDN: "test.example",
			ComponentStates: map[string]string{
				"cilium":            helmwatch.StateInstalled,
				"cert-manager":      helmwatch.StateInstalled,
				"catalyst-platform": helmwatch.StateInstalling,
			},
			Phase1FinishedAt: ptrTime(time.Now().UTC()),
		},
	}
	// Closed channels mark the deployment as finished to any reader.
	close(dep.eventsCh)
	close(dep.done)
	h.deployments.Store(dep.ID, dep)
	w := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodGet, "/api/v1/deployments/"+dep.ID, nil)
	// Inject the {id} URL param manually since the handler is called
	// without going through the chi router.
	rctx := chi.NewRouteContext()
	rctx.URLParams.Add("id", dep.ID)
	r = r.WithContext(context.WithValue(r.Context(), chi.RouteCtxKey, rctx))
	h.GetDeployment(w, r)
	if w.Code != http.StatusOK {
		t.Fatalf("status = %d, want 200; body=%s", w.Code, w.Body.String())
	}
	var got map[string]any
	if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil {
		t.Fatalf("decode: %v", err)
	}
	// The fields must appear at the TOP level of the JSON object, not
	// nested under "result".
	cs, ok := got["componentStates"].(map[string]any)
	if !ok {
		t.Fatalf("componentStates missing or wrong type at top level: %v", got)
	}
	if cs["cilium"] != "installed" {
		t.Errorf("componentStates[cilium] = %v, want \"installed\"", cs["cilium"])
	}
	if cs["catalyst-platform"] != "installing" {
		t.Errorf("componentStates[catalyst-platform] = %v, want \"installing\"", cs["catalyst-platform"])
	}
	if got["phase1FinishedAt"] == nil {
		t.Errorf("phase1FinishedAt missing at top level: %v", got)
	}
}
// TestGetDeploymentEvents_ReturnsComponentEventsInBuffer proves the
// /events endpoint surfaces phase=component events the watch wrote
// into eventsBuf — same path as the SSE replay, so a wizard reload
// on a completed deployment renders per-component pills instantly.
func TestGetDeploymentEvents_ReturnsComponentEventsInBuffer(t *testing.T) {
	h := NewWithPDM(silentLogger(), &fakePDM{})
	h.dynamicFactory = fakeDynamicFactoryFromObjects(
		makeReadyHR("bp-cilium"),
		makeReadyHR("bp-cert-manager"),
	)
	h.phase1WatchTimeout = 3 * time.Second
	dep := makeDeploymentWithKubeconfig(t, h, "phase1-events-replay", "fake-kubeconfig: yaml")
	// Run the watch to completion first so the buffer is populated
	// before the GET.
	h.runPhase1Watch(dep)
	w := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodGet, "/api/v1/deployments/"+dep.ID+"/events", nil)
	// Inject the {id} URL param manually since the handler is called
	// without going through the chi router.
	rctx := chi.NewRouteContext()
	rctx.URLParams.Add("id", dep.ID)
	r = r.WithContext(context.WithValue(r.Context(), chi.RouteCtxKey, rctx))
	h.GetDeploymentEvents(w, r)
	if w.Code != http.StatusOK {
		t.Fatalf("status = %d, want 200; body=%s", w.Code, w.Body.String())
	}
	var got struct {
		Events []provisioner.Event `json:"events"`
		State  map[string]any      `json:"state"`
	}
	if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil {
		t.Fatalf("decode: %v", err)
	}
	// Reduce the replayed event list to the last-seen state per
	// component, the same way an SSE consumer would.
	gotComponents := map[string]string{}
	for _, ev := range got.Events {
		if ev.Phase == helmwatch.PhaseComponent && ev.Component != "" {
			gotComponents[ev.Component] = ev.State
		}
	}
	if gotComponents["cilium"] != helmwatch.StateInstalled {
		t.Errorf("/events did not surface cilium=installed, got: %v", gotComponents)
	}
	if gotComponents["cert-manager"] != helmwatch.StateInstalled {
		t.Errorf("/events did not surface cert-manager=installed, got: %v", gotComponents)
	}
}
// TestRunPhase1Watch_TimeoutFlipsStatusAndRecordsPartial proves
// that a stuck install reaches markPhase1Done after the configured
// timeout (cfg.WatchTimeout, threaded from h.phase1WatchTimeout)
// without the watch hanging forever. The test uses a single
// non-terminal release so the only exit is the timeout path.
func TestRunPhase1Watch_TimeoutFlipsStatusAndRecordsPartial(t *testing.T) {
	h := NewWithPDM(silentLogger(), &fakePDM{})
	// Ready=Unknown/Progressing never becomes terminal, so the watch
	// can only end via the 400ms timeout below.
	h.dynamicFactory = fakeDynamicFactoryFromObjects(
		makeHRWithReady("bp-keycloak", metav1.ConditionUnknown, "Progressing", "Reconciliation in progress"),
	)
	h.phase1WatchTimeout = 400 * time.Millisecond
	dep := makeDeploymentWithKubeconfig(t, h, "phase1-timeout", "fake-kubeconfig: yaml")
	start := time.Now()
	h.runPhase1Watch(dep)
	elapsed := time.Since(start)
	// Generous 5s ceiling vs the 400ms budget — catches a hung watch
	// without flaking on slow CI.
	if elapsed > 5*time.Second {
		t.Errorf("runPhase1Watch took %v — timeout did not kick in", elapsed)
	}
	dep.mu.Lock()
	defer dep.mu.Unlock()
	// Status stays "ready" because no failure occurred — the partial
	// state has keycloak=installing, no failures. The Sovereign Admin
	// shell renders "1 of N components installed (timeout reached)".
	// This contract: timeout without failure = "ready" with partial
	// componentStates, NOT "failed".
	if dep.Status != "ready" {
		t.Errorf("Status = %q, want %q (timeout with no failed components is not a Phase-1 failure)", dep.Status, "ready")
	}
	if dep.Result.ComponentStates["keycloak"] != helmwatch.StateInstalling {
		t.Errorf("ComponentStates[keycloak] = %q, want %q",
			dep.Result.ComponentStates["keycloak"], helmwatch.StateInstalling)
	}
	if dep.Result.Phase1FinishedAt == nil {
		t.Errorf("Phase1FinishedAt was not set after timeout")
	}
}
// TestPhase1WatchConfig_EnvVarOverridesTimeout proves that
// CATALYST_PHASE1_WATCH_TIMEOUT parses through
// phase1WatchConfigForDeployment when h.phase1WatchTimeout is unset.
func TestPhase1WatchConfig_EnvVarOverridesTimeout(t *testing.T) {
	const want = 5 * time.Minute
	h := NewWithPDM(silentLogger(), &fakePDM{})
	t.Setenv("CATALYST_PHASE1_WATCH_TIMEOUT", "5m")
	dep := makeDeploymentWithKubeconfig(t, h, "phase1-env-timeout", "fake-kubeconfig: yaml")
	if cfg := h.phase1WatchConfigForDeployment(dep, dep.Result.Kubeconfig); cfg.WatchTimeout != want {
		t.Errorf("WatchTimeout = %v, want 5m (from env)", cfg.WatchTimeout)
	}
}
// TestPhase1WatchConfig_FieldOverrideBeatsEnv proves the test-only
// handler field takes precedence over the env var.
func TestPhase1WatchConfig_FieldOverrideBeatsEnv(t *testing.T) {
	const want = 7 * time.Second
	h := NewWithPDM(silentLogger(), &fakePDM{})
	h.phase1WatchTimeout = want
	t.Setenv("CATALYST_PHASE1_WATCH_TIMEOUT", "5m") // ignored when h.phase1WatchTimeout is set
	dep := makeDeploymentWithKubeconfig(t, h, "phase1-field-timeout", "fake-kubeconfig: yaml")
	if cfg := h.phase1WatchConfigForDeployment(dep, dep.Result.Kubeconfig); cfg.WatchTimeout != want {
		t.Errorf("WatchTimeout = %v, want 7s (handler field override)", cfg.WatchTimeout)
	}
}
// TestPodRestart_ResumeRehydratesComponentStates proves that
// ComponentStates + Phase1FinishedAt round-trip through the on-disk
// store. A Pod restart that loads a completed Phase-1 deployment
// from disk presents the same state to the Sovereign Admin as the
// pre-restart Pod did.
func TestPodRestart_ResumeRehydratesComponentStates(t *testing.T) {
	tmp := t.TempDir()
	st1, err := store.New(tmp)
	if err != nil {
		t.Fatalf("store.New: %v", err)
	}
	// Truncate to whole seconds so the RFC3339 round-trip through
	// JSON compares Equal below.
	finishedAt := time.Now().UTC().Truncate(time.Second)
	rec := store.Record{
		ID:        "rehydrate-component-states",
		Status:    "ready",
		StartedAt: time.Now().Add(-5 * time.Minute).UTC(),
		Request: store.RedactedRequest{
			SovereignFQDN: "test.example",
			Region:        "fsn1",
		},
		Result: &provisioner.Result{
			SovereignFQDN:    "test.example",
			Kubeconfig:       "fake-kubeconfig: yaml",
			Phase1FinishedAt: &finishedAt,
			ComponentStates: map[string]string{
				"cilium":            helmwatch.StateInstalled,
				"cert-manager":      helmwatch.StateInstalled,
				"catalyst-platform": helmwatch.StateFailed,
			},
		},
	}
	if err := st1.Save(rec); err != nil {
		t.Fatalf("Save: %v", err)
	}
	// Simulate Pod restart: build a fresh handler against the same
	// directory and confirm the rehydrated deployment carries
	// ComponentStates + Phase1FinishedAt.
	st2, err := store.New(tmp)
	if err != nil {
		t.Fatalf("store.New (restart): %v", err)
	}
	h := NewWithStore(silentLogger(), &fakePDM{}, st2)
	val, ok := h.deployments.Load(rec.ID)
	if !ok {
		t.Fatalf("deployment %q did not rehydrate", rec.ID)
	}
	dep := val.(*Deployment)
	if dep.Result == nil {
		t.Fatalf("Result is nil after rehydrate")
	}
	if dep.Result.ComponentStates["cilium"] != helmwatch.StateInstalled {
		t.Errorf("ComponentStates[cilium] = %q, want %q",
			dep.Result.ComponentStates["cilium"], helmwatch.StateInstalled)
	}
	if dep.Result.ComponentStates["catalyst-platform"] != helmwatch.StateFailed {
		t.Errorf("ComponentStates[catalyst-platform] = %q, want %q",
			dep.Result.ComponentStates["catalyst-platform"], helmwatch.StateFailed)
	}
	// time.Time.Equal rather than == — tolerates location differences
	// introduced by the JSON round-trip.
	if dep.Result.Phase1FinishedAt == nil ||
		!dep.Result.Phase1FinishedAt.Equal(finishedAt) {
		t.Errorf("Phase1FinishedAt = %v, want %v", dep.Result.Phase1FinishedAt, finishedAt)
	}
	// Kubeconfig field round-trips on disk.
	if dep.Result.Kubeconfig != "fake-kubeconfig: yaml" {
		t.Errorf("Kubeconfig did not round-trip: got %q", dep.Result.Kubeconfig)
	}
	// And the on-disk JSON includes the new fields verbatim, so a
	// future schema bump that drops them gets caught here.
	rawBytes, err := os.ReadFile(filepath.Join(tmp, rec.ID+".json"))
	if err != nil {
		t.Fatalf("read file: %v", err)
	}
	raw := string(rawBytes)
	for _, want := range []string{
		`"componentStates"`,
		`"cilium": "installed"`,
		`"catalyst-platform": "failed"`,
		`"phase1FinishedAt"`,
	} {
		if !strings.Contains(raw, want) {
			t.Errorf("on-disk JSON missing %q\n%s", want, raw)
		}
	}
}
// TestPodRestart_StuckPhase1WatchingRewrittenToFailed proves the
// existing in-flight-status rewrite covers the "phase1-watching"
// case the Phase-1 watch introduced. A Pod kill mid-watch must NOT
// leave a deployment stuck at phase1-watching; the wizard's
// FailureCard renders instead.
func TestPodRestart_StuckPhase1WatchingRewrittenToFailed(t *testing.T) {
	tmp := t.TempDir()
	st1, err := store.New(tmp)
	if err != nil {
		t.Fatalf("store.New: %v", err)
	}
	rec := store.Record{
		ID:        "rehydrate-stuck-phase1",
		Status:    "phase1-watching", // in-flight at restart
		StartedAt: time.Now().Add(-5 * time.Minute).UTC(),
		Request: store.RedactedRequest{
			SovereignFQDN: "test.example",
			Region:        "fsn1",
		},
		Result: &provisioner.Result{
			SovereignFQDN: "test.example",
			Kubeconfig:    "fake-kubeconfig: yaml",
		},
	}
	if err := st1.Save(rec); err != nil {
		t.Fatalf("Save: %v", err)
	}
	st2, err := store.New(tmp)
	if err != nil {
		t.Fatalf("store.New (restart): %v", err)
	}
	h := NewWithStore(silentLogger(), &fakePDM{}, st2)
	// Unchecked Load/assert: a missing record would panic and fail
	// the test loudly, which is acceptable here.
	val, _ := h.deployments.Load(rec.ID)
	dep := val.(*Deployment)
	if dep.Status != "failed" {
		t.Errorf("Status = %q, want %q (in-flight phase1-watching must rewrite to failed)", dep.Status, "failed")
	}
	if dep.Error == "" {
		t.Errorf("Error empty — operator wouldn't know why this deployment failed")
	}
}
// TestEvent_ComponentAndStateFieldsOmittedForPhase0 proves the
// existing Phase-0 event wire format is unchanged: a Phase-0 OpenTofu
// event JSON-encodes without component/state keys (omitempty).
func TestEvent_ComponentAndStateFieldsOmittedForPhase0(t *testing.T) {
	raw, err := json.Marshal(provisioner.Event{
		Time:    time.Now().UTC().Format(time.RFC3339),
		Phase:   "tofu-apply",
		Level:   "info",
		Message: "hcloud_server.cp[0]: Creation complete after 30s",
	})
	if err != nil {
		t.Fatalf("Marshal: %v", err)
	}
	encoded := string(raw)
	if strings.Contains(encoded, `"component"`) {
		t.Errorf("Phase-0 event should not include component key: %s", encoded)
	}
	if strings.Contains(encoded, `"state"`) {
		t.Errorf("Phase-0 event should not include state key: %s", encoded)
	}
}
// TestEvent_ComponentAndStateFieldsPresentForPhase1 proves the new
// fields ARE serialized for phase=component events.
func TestEvent_ComponentAndStateFieldsPresentForPhase1(t *testing.T) {
	raw, err := json.Marshal(provisioner.Event{
		Time:      time.Now().UTC().Format(time.RFC3339),
		Phase:     helmwatch.PhaseComponent,
		Level:     "info",
		Message:   "Helm install succeeded",
		Component: "cilium",
		State:     helmwatch.StateInstalled,
	})
	if err != nil {
		t.Fatalf("Marshal: %v", err)
	}
	encoded := string(raw)
	if !strings.Contains(encoded, `"component":"cilium"`) {
		t.Errorf("Phase-1 event missing component: %s", encoded)
	}
	if !strings.Contains(encoded, `"state":"installed"`) {
		t.Errorf("Phase-1 event missing state: %s", encoded)
	}
}
// ─────────────────────────────────────────────────────────────────────
// helpers
// ─────────────────────────────────────────────────────────────────────
func ptrTime(t time.Time) *time.Time { return &t }

View File

@ -0,0 +1,569 @@
// Package helmwatch implements the Phase-1 HelmRelease watch loop the
// Sovereign Admin shell consumes for per-component install state and
// per-component logs.
//
// Architecture (per docs/INVIOLABLE-PRINCIPLES.md #3):
//
// - Phase 0 — OpenTofu provisions Hetzner cloud resources, the
// control plane boots k3s, cloud-init writes a Flux Kustomization
// against this monorepo's clusters/<sovereign-fqdn>/. catalyst-api
// emits SSE events for tofu-init / tofu-plan / tofu-apply /
// tofu-output / flux-bootstrap.
//
// - Phase 1 — Flux on the new Sovereign reconciles the bootstrap-kit
// Kustomization, which materialises 11 HelmReleases (bp-cilium,
// bp-cert-manager, bp-flux, bp-crossplane, bp-sealed-secrets,
// bp-spire, bp-nats-jetstream, bp-openbao, bp-keycloak, bp-gitea,
// bp-catalyst-platform) in flux-system. helm-controller installs
// each in dependency order. THIS package observes those HelmReleases
// via a client-go dynamic informer and emits per-component events
// into the same SSE stream Phase 0 used.
//
// What this package does NOT do (and must not, per principle #3):
//
// - Apply, mutate, or delete any HelmRelease — it is read-only.
// - Exec helm or kubectl — it uses client-go.
// - Hot-poll — it uses an informer's cache + Watch (dynamicinformer).
// - Produce SSE bytes itself — it returns Events via a callback. The
// handler package wires them into the durable eventsBuf + SSE
// channel using the same emit path as Phase-0 events.
//
// Lifecycle:
//
// - Watch runs from after `flux-bootstrap` until ALL bp-* HelmReleases
// have reached a terminal state (installed | failed) OR the timeout
// elapses (60 minutes default, override via
// CATALYST_PHASE1_WATCH_TIMEOUT).
// - On Pod restart, the deployment's persisted Result.Kubeconfig +
// Result.ComponentStates are rehydrated and the watch resumes from
// the cluster's current observed state (idempotent — emitting an
// "installed" event for an already-installed release is harmless;
// the SSE consumer keys off the State enum, not event count).
package helmwatch
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)
// HelmReleaseGVR is the v2 helm-controller GVR. flux v2.4 ships
// helm.toolkit.fluxcd.io/v2 as the stable HelmRelease API. Catalyst-Zero
// pins flux v2.4.0 in cloudinit-control-plane.tftpl, so this is the
// only GVR the watch needs to know about.
//
// The bootstrap-kit YAMLs in clusters/_template/bootstrap-kit/*.yaml
// already use apiVersion: helm.toolkit.fluxcd.io/v2 (verified
// 2026-04-29). If a future Flux upgrade introduces v3, update both
// here AND the cloud-init pinned Flux release SHA in lockstep.
var HelmReleaseGVR = schema.GroupVersionResource{
	Group:    "helm.toolkit.fluxcd.io",
	Version:  "v2",
	Resource: "helmreleases",
}

// FluxNamespace — bootstrap-kit's HelmReleases all live here. The
// chart bp-catalyst-platform reconciles into other namespaces but the
// HelmRelease object itself stays in flux-system.
const FluxNamespace = "flux-system"

// HelmControllerSelector — selects helm-controller pods for log tailing.
// flux-controllers ship with the canonical app=helm-controller label.
// NOTE(review): verify the deployed chart actually labels its pods
// app=helm-controller before relying on the log-tail path.
const HelmControllerSelector = "app=helm-controller"

// Default watch timeout — bp-catalyst-platform has the longest install
// because it depends on Crossplane CRDs settling. 60 minutes is the
// upper bound observed in DoD runs against omantel.omani.works; the
// median is closer to 8 minutes.
const DefaultWatchTimeout = 60 * time.Minute

// MinComponentCount — the bootstrap-kit ships exactly 11 bp-* HelmReleases
// (clusters/_template/bootstrap-kit/01-cilium → 11-bp-catalyst-platform).
// The Watch terminates when all OBSERVED HelmReleases reach terminal
// state, not when N have appeared, but tests assert this constant so
// any future drift in the kit count surfaces here too.
const MinComponentCount = 11

// State enums — kept as constants so callers (handler, tests) compare
// against them by identifier rather than literal strings. Only
// StateInstalled and StateFailed are terminal (see terminalStates).
const (
	StatePending    = "pending"
	StateInstalling = "installing"
	StateInstalled  = "installed"
	StateDegraded   = "degraded"
	StateFailed     = "failed"
)

// terminalStates — once a component reaches one of these, the watch
// stops emitting state-change events for it. "degraded" is NOT terminal
// — a degraded component can recover (Flux retries automatically); the
// watch keeps emitting until it converges to installed or failed. This
// is the documented Sovereign Admin contract: "X of Y components
// installed" excludes degraded from the installed count.
var terminalStates = map[string]bool{
	StateInstalled: true,
	StateFailed:    true,
}

// Phase identifiers — kept here so the handler and the watch agree on
// the wire format byte-for-byte. The Sovereign Admin's Logs tab keys
// off Phase == "component-log" to filter helm-controller noise from
// bp-* HelmRelease lifecycle events.
const (
	PhaseComponent    = "component"
	PhaseComponentLog = "component-log"
)

// Emit is the callback the Watcher invokes for every event it derives.
// The handler's runProvisioning tee passes provisioner.recordEventAndPersist
// here so the durable eventsBuf + SSE channel get every component event
// the same way they get Phase-0 OpenTofu events.
type Emit func(ev provisioner.Event)
// Config — runtime configuration the Watcher reads. Production wires
// this from environment + Deployment.Result.Kubeconfig; tests inject
// via the Watcher constructor. Zero-value fields are normalized by
// applyDefaults before use.
type Config struct {
	// KubeconfigYAML — raw bytes of the new Sovereign's k3s kubeconfig.
	// Empty string is invalid (Watch returns an error immediately).
	KubeconfigYAML string

	// WatchTimeout — overall budget for Phase 1. After this, the watch
	// terminates regardless of HelmRelease state. Defaults to
	// DefaultWatchTimeout (applyDefaults substitutes it for any
	// non-positive value); the catalyst-api's main reads
	// CATALYST_PHASE1_WATCH_TIMEOUT and passes the parsed Duration.
	WatchTimeout time.Duration

	// Now — clock injection point. Production passes time.Now; tests
	// inject a fake clock so termination-on-timeout is deterministic.
	// applyDefaults substitutes time.Now when nil.
	Now func() time.Time

	// DynamicFactory — produces a dynamic.Interface from the kubeconfig.
	// Production passes NewDynamicClientFromKubeconfig; tests inject a
	// closure returning a fake.NewSimpleDynamicClient so no real cluster
	// is needed.
	DynamicFactory func(kubeconfigYAML string) (dynamic.Interface, error)

	// CoreFactory — produces a kubernetes.Interface for log tailing
	// and Pod listing (helm-controller log tail uses
	// CoreV1().Pods().GetLogs()). Optional: a nil factory disables log
	// streaming and the watch emits only state-change events. Tests
	// keep this nil unless they're specifically exercising the log
	// path.
	CoreFactory func(kubeconfigYAML string) (kubernetes.Interface, error)

	// Resync — informer resync period. Defaults to 30s (applied for
	// any non-positive value). Tests override to keep runtime tight.
	Resync time.Duration
}
// applyDefaults fills in the zero-valued knobs on a Config so that
// use sites never have to branch on "was this set?". WatchTimeout and
// Resync treat non-positive values as unset; a nil Now falls back to
// the real clock. The three defaults are independent of each other.
func (c *Config) applyDefaults() {
	if c.Now == nil {
		c.Now = time.Now
	}
	if c.Resync <= 0 {
		c.Resync = 30 * time.Second
	}
	if c.WatchTimeout <= 0 {
		c.WatchTimeout = DefaultWatchTimeout
	}
}
// Watcher observes bp-* HelmReleases in flux-system on the new
// Sovereign cluster and emits per-component events. Construct via
// NewWatcher; call Watch (exactly once) to run until termination.
type Watcher struct {
	cfg  Config // defaults already applied by NewWatcher
	emit Emit   // non-nil, enforced by NewWatcher
	// mu guards states and observed. Both are mutated only inside
	// processEvent; terminalStatesSnapshot reads under the same lock.
	mu sync.Mutex
	// states is the in-flight state map keyed by componentId.
	states map[string]string
	// observed tracks every componentId the watch has seen at least
	// once. The all-installed-or-failed termination check iterates
	// over this set, not the static MinComponentCount, so the watch
	// terminates correctly on a future bootstrap-kit that ships more
	// or fewer components without code change.
	observed map[string]struct{}
}
// NewWatcher validates its inputs and returns a Watcher with cfg's
// defaults applied. emit must be non-nil — the callback is the
// watch's only output channel, so a nil emit is a programmer error —
// and cfg.KubeconfigYAML must not be blank. A nil DynamicFactory is
// replaced with the production NewDynamicClientFromKubeconfig.
func NewWatcher(cfg Config, emit Emit) (*Watcher, error) {
	switch {
	case emit == nil:
		return nil, errors.New("helmwatch: emit callback is required")
	case strings.TrimSpace(cfg.KubeconfigYAML) == "":
		return nil, errors.New("helmwatch: kubeconfig is required (deployment.Result.Kubeconfig was empty)")
	}
	if cfg.DynamicFactory == nil {
		cfg.DynamicFactory = NewDynamicClientFromKubeconfig
	}
	cfg.applyDefaults()
	w := &Watcher{
		cfg:      cfg,
		emit:     emit,
		states:   map[string]string{},
		observed: map[string]struct{}{},
	}
	return w, nil
}
// Watch runs the informer-backed watch loop until termination and
// returns the final per-component state map.
//
// Error contract: a non-nil error is returned only for setup failures
// (dynamic-client build, handler registration, cache sync). Clean
// termination — all observed components terminal, the WatchTimeout
// firing, or the caller cancelling ctx — returns a nil error together
// with whatever states were observed, so the caller can persist a
// partial outcome. (Note: caller cancellation does NOT surface as
// ctx.Err(); it takes the same warn-and-return-nil path as timeout.)
//
// Concurrency: Watch is single-shot. Calling it twice on the same
// Watcher is a programmer error (the informer would double-register).
//
// The state machine that maps HelmRelease.status.conditions →
// State enum lives in DeriveState, which is exported for tests.
func (w *Watcher) Watch(ctx context.Context) (map[string]string, error) {
	dyn, err := w.cfg.DynamicFactory(w.cfg.KubeconfigYAML)
	if err != nil {
		return nil, fmt.Errorf("helmwatch: build dynamic client: %w", err)
	}
	// Per-watch context with the configured timeout. We derive from
	// the caller's ctx so the handler's parent context cancel
	// (deployment delete, Pod shutdown) propagates.
	watchCtx, cancel := context.WithTimeout(ctx, w.cfg.WatchTimeout)
	defer cancel()
	// Namespace-scoped factory — only flux-system is listed/watched.
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
		dyn,
		w.cfg.Resync,
		FluxNamespace,
		nil,
	)
	informer := factory.ForResource(HelmReleaseGVR).Informer()
	// terminated fires when every observed component has reached a
	// terminal state. processEvent closes it through closeOnce, so a
	// double-close is impossible even if handler callbacks race.
	terminated := make(chan struct{})
	var closeOnce sync.Once
	handler := cache.FilteringResourceEventHandler{
		// Only bp-* HelmReleases belong to the bootstrap kit; anything
		// else in flux-system never reaches processEvent.
		FilterFunc: func(obj any) bool {
			u, ok := obj.(*unstructured.Unstructured)
			if !ok {
				return false
			}
			return strings.HasPrefix(u.GetName(), "bp-")
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj any) {
				w.processEvent(obj, terminated, &closeOnce)
			},
			UpdateFunc: func(_, obj any) {
				w.processEvent(obj, terminated, &closeOnce)
			},
			// DeleteFunc — a HelmRelease being deleted mid-bootstrap
			// is operator action (or a Flux suspend/delete
			// reconciliation). We don't emit a synthetic state for
			// it because that would race the operator's intent. The
			// Sovereign Admin treats absent-from-cluster as "n/a"
			// in its overall percentage.
		},
	}
	if _, err := informer.AddEventHandler(handler); err != nil {
		return nil, fmt.Errorf("helmwatch: register event handler: %w", err)
	}
	// Optional log tailer — runs in parallel with the state watch and
	// terminates on the same context.
	if w.cfg.CoreFactory != nil {
		core, err := w.cfg.CoreFactory(w.cfg.KubeconfigYAML)
		if err != nil {
			// Non-fatal — emit a warn event and continue without log
			// streaming. The state watch is the load-bearing part.
			w.emit(provisioner.Event{
				Time:    w.cfg.Now().UTC().Format(time.RFC3339),
				Phase:   PhaseComponent,
				Level:   "warn",
				Message: "helm-controller log tailing disabled: build core client: " + err.Error(),
			})
		} else {
			tailer := newLogTailer(core, w.emit, w.cfg.Now)
			go tailer.run(watchCtx)
		}
	}
	// Start the informer. factory.Start launches a goroutine per
	// resource type — we have one (HelmRelease) so it's a single
	// goroutine that reads from the apiserver Watch endpoint.
	factory.Start(watchCtx.Done())
	if !cache.WaitForCacheSync(watchCtx.Done(), informer.HasSynced) {
		// Sync failed — usually because watchCtx already cancelled
		// (timeout or caller). Return the partial snapshot alongside
		// the error so the caller can still persist whatever the
		// watch observed before the sync gave up.
		final := w.terminalStatesSnapshot()
		return final, fmt.Errorf("helmwatch: informer cache failed to sync: %w", watchCtx.Err())
	}
	// Wait for either all-terminal or context done.
	select {
	case <-terminated:
		// Clean termination — every observed component is in a
		// terminal state.
	case <-watchCtx.Done():
		// Timeout or caller cancel — emit a single warn event so the
		// Sovereign Admin can render "watch ended (timeout): X of Y
		// installed" rather than going silent.
		w.emit(provisioner.Event{
			Time:    w.cfg.Now().UTC().Format(time.RFC3339),
			Phase:   PhaseComponent,
			Level:   "warn",
			Message: "Phase-1 watch terminated by context: " + watchCtx.Err().Error() + " — see ComponentStates for current outcome",
		})
	}
	final := w.terminalStatesSnapshot()
	return final, nil
}
// processEvent maps an informer Add/Update event to a state-change Event.
//
// We emit ONLY on transitions: if the component's last-seen state
// equals the derived current state, no event flows. This matters
// because dynamic informers fire UpdateFunc on every status subresource
// patch (including helm-controller's own observedGeneration touches),
// and the Sovereign Admin's status pill should not flicker at sub-
// second cadence. A never-seen component has prev == "" (map zero
// value), so its first observation always emits.
//
// Locking: w.mu guards only the states/observed bookkeeping. The emit
// callback and the terminated close both run after Unlock, so a slow
// emit cannot hold the mutex; closeOnce makes the close idempotent.
func (w *Watcher) processEvent(obj any, terminated chan struct{}, closeOnce *sync.Once) {
	u, ok := obj.(*unstructured.Unstructured)
	if !ok {
		return
	}
	// Cheap re-check of the bp- prefix — the informer's FilterFunc
	// already rejects non-bp names, kept here as a local invariant.
	name := u.GetName()
	if !strings.HasPrefix(name, "bp-") {
		return
	}
	componentID := ComponentIDFromHelmRelease(name)
	// A malformed status is deliberately tolerated: the error is
	// dropped and DeriveState(nil) maps the release to "pending".
	conds, _ := extractConditions(u)
	state := DeriveState(conds)
	level := levelFromState(state)
	message := messageFromConditions(conds, state)
	w.mu.Lock()
	prev := w.states[componentID]
	w.states[componentID] = state
	w.observed[componentID] = struct{}{}
	allTerminal := allObservedTerminal(w.states, w.observed)
	w.mu.Unlock()
	if prev != state {
		w.emit(provisioner.Event{
			Time:      w.cfg.Now().UTC().Format(time.RFC3339),
			Phase:     PhaseComponent,
			Level:     level,
			Component: componentID,
			State:     state,
			Message:   message,
		})
	}
	if allTerminal {
		closeOnce.Do(func() { close(terminated) })
	}
}
// allObservedTerminal reports whether every component the watch has
// seen at least once currently sits in a terminal state. Zero
// observed components is false by definition — an empty cluster
// cannot be "done."
func allObservedTerminal(states map[string]string, observed map[string]struct{}) bool {
	if len(observed) == 0 {
		return false
	}
	for id := range observed {
		if terminalStates[states[id]] {
			continue
		}
		return false
	}
	return true
}
// terminalStatesSnapshot copies the current state map under the lock
// so the caller can publish it into Deployment.Result.ComponentStates
// without racing processEvent's writes.
func (w *Watcher) terminalStatesSnapshot() map[string]string {
	w.mu.Lock()
	defer w.mu.Unlock()
	snapshot := make(map[string]string, len(w.states))
	for component, state := range w.states {
		snapshot[component] = state
	}
	return snapshot
}
// ComponentIDFromHelmRelease normalises a HelmRelease metadata.name
// ("bp-cilium") to the Sovereign Admin's component id ("cilium") by
// dropping the "bp-" prefix. The umbrella chart needs no special
// casing: stripping the prefix from "bp-catalyst-platform" already
// yields "catalyst-platform".
//
// A name that doesn't start with "bp-" comes back unchanged — the
// Watcher's filter rejects those before processEvent runs, but the
// function is exported so tests can drive it on arbitrary input.
func ComponentIDFromHelmRelease(name string) string {
	if id, ok := strings.CutPrefix(name, "bp-"); ok {
		return id
	}
	return name
}
// DeriveState implements the state machine documented on
// provisioner.Event: it collapses a HelmRelease's status.conditions
// down to one of the State* constants. Exported so tests can drive it
// on synthetic condition slices without spinning a fake informer.
//
// The Ready condition is the load-bearing signal; Reconciling and
// Released are auxiliary conditions helm-controller writes alongside
// and are ignored here.
func DeriveState(conds []metav1.Condition) string {
	ready := findCondition(conds, "Ready")
	if ready == nil {
		// Never reconciled — helm-controller hasn't written a Ready
		// condition yet.
		return StatePending
	}
	if ready.Status == metav1.ConditionTrue {
		return StateInstalled
	}
	if ready.Status == metav1.ConditionUnknown {
		return StateInstalling
	}
	if ready.Status != metav1.ConditionFalse {
		// Any unexpected status value is treated as pending.
		return StatePending
	}
	// Ready=False — separate "waiting on a dependency" (still
	// pending, not failed) from "install actually broke" (failed).
	if isDependencyMessage(ready.Message) {
		return StatePending
	}
	switch ready.Reason {
	case "InstallFailed", "UpgradeFailed", "ChartPullError", "ChartLoadError", "ArtifactFailed":
		return StateFailed
	case "Progressing", "ReconcileStarted", "DependencyNotReady":
		return StateInstalling
	default:
		// Ready=False with a reason we don't recognise as "still
		// working" is degraded — Flux flips it back to True once the
		// underlying workload recovers.
		return StateDegraded
	}
}
// isDependencyMessage matches helm-controller's standard "dependency
// 'X' is not ready" message family, case-insensitively. We pin on the
// message substring rather than the reason because helm-controller
// emits Reason=DependencyNotReady AND Reason=Reconciling depending on
// the path — the message text is the stable signal.
func isDependencyMessage(msg string) bool {
	lowered := strings.ToLower(msg)
	return strings.Contains(lowered, "dependency '") &&
		strings.Contains(lowered, "is not ready")
}
// findCondition returns a pointer to the first condition of type t,
// or nil when absent. The pointer aliases the slice element (not a
// copy), matching how callers read Status/Reason/Message off it.
func findCondition(conds []metav1.Condition, t string) *metav1.Condition {
	for i := range conds {
		c := &conds[i]
		if c.Type == t {
			return c
		}
	}
	return nil
}
// extractConditions reads status.conditions out of an unstructured
// HelmRelease. Individual malformed entries are skipped rather than
// failing the whole slice; a missing status.conditions path yields
// (nil, nil). We use the runtime DefaultUnstructuredConverter rather
// than hand-walking the map so a future field reorder in the v2 API
// doesn't silently break the mapping.
func extractConditions(u *unstructured.Unstructured) ([]metav1.Condition, error) {
	raw, found, err := unstructured.NestedSlice(u.Object, "status", "conditions")
	if err != nil {
		return nil, err
	}
	if !found {
		return nil, nil
	}
	conds := make([]metav1.Condition, 0, len(raw))
	for _, entry := range raw {
		asMap, ok := entry.(map[string]any)
		if !ok {
			continue // not an object — skip this entry
		}
		var cond metav1.Condition
		if convErr := runtime.DefaultUnstructuredConverter.FromUnstructured(asMap, &cond); convErr != nil {
			continue // unconvertible entry — skip
		}
		conds = append(conds, cond)
	}
	return conds, nil
}
// levelFromState maps a component state to the event severity the
// SSE consumer renders: failed → error, degraded → warn, everything
// else → info.
func levelFromState(state string) string {
	if state == StateFailed {
		return "error"
	}
	if state == StateDegraded {
		return "warn"
	}
	return "info"
}
// messageFromConditions picks the human-readable message for a
// state-change event: the Ready condition's own message when one is
// present, otherwise a canned per-state fallback (empty string for an
// unknown state).
func messageFromConditions(conds []metav1.Condition, state string) string {
	ready := findCondition(conds, "Ready")
	if ready != nil && ready.Message != "" {
		return ready.Message
	}
	switch state {
	case StateFailed:
		return "Helm install failed"
	case StateDegraded:
		return "Ready=False without InstallFailed/UpgradeFailed reason"
	case StateInstalled:
		return "Helm install complete; Ready=True"
	case StateInstalling:
		return "Helm install in progress"
	case StatePending:
		return "HelmRelease observed, waiting for first reconcile"
	default:
		return ""
	}
}
// CompileWatchTimeout — small helper for the handler so the
// CATALYST_PHASE1_WATCH_TIMEOUT env-var parse path is testable.
// Empty, unparseable, and non-positive inputs all fall back to
// DefaultWatchTimeout.
func CompileWatchTimeout(raw string) time.Duration {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return DefaultWatchTimeout
	}
	parsed, err := time.ParseDuration(trimmed)
	if err != nil {
		return DefaultWatchTimeout
	}
	if parsed <= 0 {
		return DefaultWatchTimeout
	}
	return parsed
}

View File

@ -0,0 +1,808 @@
// Tests for the Phase-1 HelmRelease watch loop.
//
// What this file proves (matches the GATES checklist for the
// HelmRelease-watch task):
//
// 1. DeriveState — the documented condition→state machine, every
// row of the docs/PROVISIONING-PLAN.md state-table.
// 2. Watcher.Watch — given a fake.NewSimpleDynamicClient seeded with
// bp-* HelmReleases that progress through pending → installing →
// installed (and one that fails), the watch emits the right
// phase: "component" Events and terminates when every observed
// component reaches a terminal state.
// 3. Termination on timeout — when one component is stuck in
// "installing" forever, the watch terminates after WatchTimeout
// and returns the partial state map so markPhase1Done can
// persist what it observed.
// 4. ComponentIDFromHelmRelease + CompileWatchTimeout — pure helper
// coverage so a future rename of the prefix or env var lands as a
// test failure instead of silent drift.
// 5. Pod-restart resume — a Watcher freshly constructed against an
// already-Ready HelmRelease emits exactly one "installed" event
// and terminates. This is the rehydrate-from-PVC path.
//
// We use the apimachinery dynamic fake client with an
// UnstructuredList registered for HelmRelease so the dynamic
// informer's List + Watch calls both resolve. Each test creates its
// own fake client so concurrent tests can't observe each other's
// HelmReleases.
package helmwatch
import (
"context"
"strings"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)
// helmReleaseListGVK — the HelmReleaseList GVK the dynamic fake
// client needs registered so List(...) calls resolve. Without this,
// the informer's first List returns "no kind 'HelmReleaseList' is
// registered" and the watch never starts. Group/Version must agree
// with HelmReleaseGVR; the Kind is the resource Kind plus the "List"
// suffix matching the list-kind map the tests pass to
// NewSimpleDynamicClientWithCustomListKinds.
var helmReleaseListGVK = schema.GroupVersionKind{
	Group:   "helm.toolkit.fluxcd.io",
	Version: "v2",
	Kind:    "HelmReleaseList",
}
// newFakeScheme builds a runtime.Scheme that knows the HelmReleaseList
// kind, which the dynamic fake informer needs for its initial List
// before it can Watch.
func newFakeScheme() *runtime.Scheme {
	s := runtime.NewScheme()
	s.AddKnownTypeWithName(helmReleaseListGVK, &unstructured.UnstructuredList{})
	return s
}
// makeHelmRelease constructs a single bp-* HelmRelease in
// FluxNamespace with the given status conditions. Reason/message may
// be empty strings — the apimachinery converter tolerates missing
// fields cleanly.
func makeHelmRelease(name string, conds []metav1.Condition) *unstructured.Unstructured {
	rendered := make([]any, len(conds))
	for i, c := range conds {
		rendered[i] = map[string]any{
			"type":               c.Type,
			"status":             string(c.Status),
			"reason":             c.Reason,
			"message":            c.Message,
			"lastTransitionTime": time.Now().UTC().Format(time.RFC3339),
		}
	}
	obj := map[string]any{
		"apiVersion": "helm.toolkit.fluxcd.io/v2",
		"kind":       "HelmRelease",
		"metadata": map[string]any{
			"name":      name,
			"namespace": FluxNamespace,
		},
		"spec": map[string]any{
			"chart": map[string]any{
				"spec": map[string]any{"chart": name},
			},
		},
		"status": map[string]any{"conditions": rendered},
	}
	hr := &unstructured.Unstructured{Object: obj}
	hr.SetGroupVersionKind(schema.GroupVersionKind{
		Group:   "helm.toolkit.fluxcd.io",
		Version: "v2",
		Kind:    "HelmRelease",
	})
	return hr
}
// recorder — collects every Event the Watcher emits for assertions.
// Its emit method is handed to NewWatcher as the Emit callback, so it
// must tolerate concurrent informer callbacks — hence the mutex.
type recorder struct {
	mu     sync.Mutex
	events []provisioner.Event
}
// emit appends ev under the lock; safe for concurrent callbacks.
func (r *recorder) emit(ev provisioner.Event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, ev)
}
// snapshot returns a defensive copy of the recorded events so callers
// can iterate without holding the lock.
func (r *recorder) snapshot() []provisioner.Event {
	r.mu.Lock()
	defer r.mu.Unlock()
	cloned := make([]provisioner.Event, len(r.events))
	copy(cloned, r.events)
	return cloned
}
// componentStateEvents filters the snapshot down to phase=component
// events that carry a Component id — i.e. real per-component state
// changes. The "watch terminated by context" message also uses
// phase=component but has no Component set, so it is excluded here.
func (r *recorder) componentStateEvents() []provisioner.Event {
	filtered := []provisioner.Event{}
	for _, ev := range r.snapshot() {
		if ev.Phase != PhaseComponent || ev.Component == "" {
			continue
		}
		filtered = append(filtered, ev)
	}
	return filtered
}
// fakeFactory adapts a pre-built dynamic client into the
// Config.DynamicFactory shape. The kubeconfig argument is ignored, so
// no real cluster access ever happens in tests.
func fakeFactory(client dynamic.Interface) func(string) (dynamic.Interface, error) {
	return func(ignoredKubeconfig string) (dynamic.Interface, error) {
		return client, nil
	}
}
// ─────────────────────────────────────────────────────────────────────
// DeriveState — pure state-machine coverage. One test per documented
// row of the state table.
// ─────────────────────────────────────────────────────────────────────
// No Ready condition at all (nil slice) — never reconciled.
func TestDeriveState_NoReadyCondition_IsPending(t *testing.T) {
	got := DeriveState(nil)
	if got != StatePending {
		t.Fatalf("no Ready condition → expected %q, got %q", StatePending, got)
	}
}
func TestDeriveState_ReadyTrue_IsInstalled(t *testing.T) {
	got := DeriveState([]metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
	})
	if got != StateInstalled {
		t.Fatalf("Ready=True → expected %q, got %q", StateInstalled, got)
	}
}
func TestDeriveState_ReadyUnknown_IsInstalling(t *testing.T) {
	got := DeriveState([]metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionUnknown, Reason: "Progressing", Message: "Reconciliation in progress"},
	})
	if got != StateInstalling {
		t.Fatalf("Ready=Unknown → expected %q, got %q", StateInstalling, got)
	}
}
func TestDeriveState_ReadyFalse_InstallFailed_IsFailed(t *testing.T) {
	got := DeriveState([]metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "InstallFailed", Message: "chart failed: timed out waiting for the condition"},
	})
	if got != StateFailed {
		t.Fatalf("Ready=False reason=InstallFailed → expected %q, got %q", StateFailed, got)
	}
}
func TestDeriveState_ReadyFalse_UpgradeFailed_IsFailed(t *testing.T) {
	got := DeriveState([]metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "UpgradeFailed", Message: "upgrade retries exhausted"},
	})
	if got != StateFailed {
		t.Fatalf("Ready=False reason=UpgradeFailed → expected %q, got %q", StateFailed, got)
	}
}
func TestDeriveState_ReadyFalse_ChartPullError_IsFailed(t *testing.T) {
	got := DeriveState([]metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "ChartPullError", Message: "GET ghcr.io/...: 401"},
	})
	if got != StateFailed {
		t.Fatalf("Ready=False reason=ChartPullError → expected %q, got %q", StateFailed, got)
	}
}
// The message deliberately contains both substrings isDependencyMessage
// pins on ("dependency '" and "is not ready") — the message check wins
// over the reason, so this stays pending rather than installing.
func TestDeriveState_ReadyFalse_DependencyMessage_IsPending(t *testing.T) {
	got := DeriveState([]metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "DependencyNotReady", Message: "dependency 'flux-system/bp-cilium' is not ready"},
	})
	if got != StatePending {
		t.Fatalf("Ready=False with dependency message → expected %q (waiting), got %q", StatePending, got)
	}
}
func TestDeriveState_ReadyFalse_Progressing_IsInstalling(t *testing.T) {
	got := DeriveState([]metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "Progressing", Message: "Reconciliation in progress"},
	})
	if got != StateInstalling {
		t.Fatalf("Ready=False reason=Progressing → expected %q, got %q", StateInstalling, got)
	}
}
func TestDeriveState_ReadyFalse_UnknownReason_IsDegraded(t *testing.T) {
	// A Ready=False with no reason we recognise as either failed or
	// still-progressing falls into the degraded bucket — the install
	// completed but readiness was lost (e.g. a Pod went CrashLoopBackOff
	// after first install). Flux retries; the watch keeps emitting
	// until the component re-converges.
	got := DeriveState([]metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "RetryExhausted", Message: "deployment.apps/foo: rollout stuck"},
	})
	if got != StateDegraded {
		t.Fatalf("Ready=False with unknown reason → expected %q, got %q", StateDegraded, got)
	}
}
// ─────────────────────────────────────────────────────────────────────
// ComponentIDFromHelmRelease + CompileWatchTimeout — helpers.
// ─────────────────────────────────────────────────────────────────────
// Table-driven helper coverage; case order doesn't matter since each
// input is independent. "bp-catalyst-platform" exercises the umbrella
// chart — plain prefix-stripping already yields the right id.
func TestComponentIDFromHelmRelease_StripsPrefix(t *testing.T) {
	cases := map[string]string{
		"bp-cilium":            "cilium",
		"bp-cert-manager":      "cert-manager",
		"bp-catalyst-platform": "catalyst-platform",
		"helm-controller":      "helm-controller", // no prefix → unchanged
		"":                     "",
	}
	for in, want := range cases {
		if got := ComponentIDFromHelmRelease(in); got != want {
			t.Errorf("ComponentIDFromHelmRelease(%q) = %q, want %q", in, got, want)
		}
	}
}
func TestCompileWatchTimeout_DefaultOnEmpty(t *testing.T) {
	if got := CompileWatchTimeout(""); got != DefaultWatchTimeout {
		t.Fatalf("empty input → expected %v, got %v", DefaultWatchTimeout, got)
	}
}
// Both unparseable and non-positive durations fall back to the default.
func TestCompileWatchTimeout_DefaultOnInvalid(t *testing.T) {
	if got := CompileWatchTimeout("not-a-duration"); got != DefaultWatchTimeout {
		t.Fatalf("invalid input → expected default %v, got %v", DefaultWatchTimeout, got)
	}
	if got := CompileWatchTimeout("-5m"); got != DefaultWatchTimeout {
		t.Fatalf("negative input → expected default %v, got %v", DefaultWatchTimeout, got)
	}
}
func TestCompileWatchTimeout_ParsesValid(t *testing.T) {
	if got := CompileWatchTimeout("90m"); got != 90*time.Minute {
		t.Fatalf("90m → expected %v, got %v", 90*time.Minute, got)
	}
	if got := CompileWatchTimeout("2h"); got != 2*time.Hour {
		t.Fatalf("2h → expected %v, got %v", 2*time.Hour, got)
	}
}
// ─────────────────────────────────────────────────────────────────────
// Watcher.Watch — informer-driven coverage.
// ─────────────────────────────────────────────────────────────────────
// TestWatch_AllReleasesAlreadyInstalled_TerminatesQuickly proves the
// rehydrate-from-PVC path: a Watcher that attaches to a cluster where
// every bp-* HelmRelease already has Ready=True emits exactly one
// "installed" event per component and terminates.
func TestWatch_AllReleasesAlreadyInstalled_TerminatesQuickly(t *testing.T) {
	scheme := newFakeScheme()
	// Three already-Ready releases — the informer's initial List
	// delivers each as an Add event.
	releases := []runtime.Object{
		makeHelmRelease("bp-cilium", []metav1.Condition{
			{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
		}),
		makeHelmRelease("bp-cert-manager", []metav1.Condition{
			{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
		}),
		makeHelmRelease("bp-flux", []metav1.Condition{
			{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
		}),
	}
	client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
		map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
		releases...,
	)
	rec := &recorder{}
	cfg := Config{
		KubeconfigYAML: "fake-kubeconfig: bytes",
		WatchTimeout:   5 * time.Second,
		DynamicFactory: fakeFactory(client),
		Resync:         0, // event-driven only
	}
	w, err := NewWatcher(cfg, rec.emit)
	if err != nil {
		t.Fatalf("NewWatcher: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	final, err := w.Watch(ctx)
	if err != nil {
		t.Fatalf("Watch: %v", err)
	}
	// Watch returns a nil error on timeout too, so the per-component
	// state assertions below are the load-bearing check.
	if got, want := len(final), 3; got != want {
		t.Errorf("expected %d component states, got %d (states=%v)", want, got, final)
	}
	for _, comp := range []string{"cilium", "cert-manager", "flux"} {
		if final[comp] != StateInstalled {
			t.Errorf("component %q state = %q, want %q", comp, final[comp], StateInstalled)
		}
	}
	componentEvents := rec.componentStateEvents()
	if len(componentEvents) != 3 {
		t.Errorf("expected 3 phase=component events, got %d:\n%+v", len(componentEvents), componentEvents)
	}
	for _, ev := range componentEvents {
		if ev.State != StateInstalled {
			t.Errorf("component %q event State = %q, want %q", ev.Component, ev.State, StateInstalled)
		}
		if ev.Phase != PhaseComponent {
			t.Errorf("expected Phase=%q, got %q", PhaseComponent, ev.Phase)
		}
	}
}
// TestWatch_TransitionsEmitInOrder proves the full state machine
// across an Add (pending) → Update (installing) → Update (installed)
// sequence on a single HelmRelease. The watch must emit three
// Events, each with the right State, then terminate when the
// release reaches Installed.
//
// NOTE(review): waitForCondition, updateHR, countWithState and
// containsSubsequence are helpers defined elsewhere in this file.
func TestWatch_TransitionsEmitInOrder(t *testing.T) {
	scheme := newFakeScheme()
	// Start with no Ready condition → pending.
	hr := makeHelmRelease("bp-cilium", nil)
	client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
		map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
		hr,
	)
	rec := &recorder{}
	cfg := Config{
		KubeconfigYAML: "fake",
		WatchTimeout:   5 * time.Second,
		DynamicFactory: fakeFactory(client),
		Resync:         0,
	}
	w, err := NewWatcher(cfg, rec.emit)
	if err != nil {
		t.Fatalf("NewWatcher: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Drive the transitions in a goroutine — the Watch is blocking,
	// and we need the informer to observe each Update before we issue
	// the next one. The fake client's Update is synchronous w.r.t. the
	// store, so the informer's UpdateFunc fires reliably.
	done := make(chan map[string]string, 1)
	go func() {
		final, _ := w.Watch(ctx)
		done <- final
	}()
	// Wait until the watch has observed the initial pending state.
	if !waitForCondition(t, 2*time.Second, func() bool {
		return len(rec.componentStateEvents()) >= 1
	}) {
		t.Fatalf("watch never emitted pending event")
	}
	// Update 1: Ready=Unknown → installing.
	updateHR(t, client, "bp-cilium", []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionUnknown, Reason: "Progressing", Message: "Reconciliation in progress"},
	})
	if !waitForCondition(t, 2*time.Second, func() bool {
		return countWithState(rec.componentStateEvents(), StateInstalling) >= 1
	}) {
		t.Fatalf("watch never emitted installing event")
	}
	// Update 2: Ready=True → installed (terminal) — this is what lets
	// the all-observed-terminal check close Watch's terminated channel.
	updateHR(t, client, "bp-cilium", []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
	})
	select {
	case final := <-done:
		if final["cilium"] != StateInstalled {
			t.Errorf("final state = %q, want %q", final["cilium"], StateInstalled)
		}
	case <-time.After(5 * time.Second):
		t.Fatalf("watch did not terminate after Ready=True")
	}
	// Verify the event sequence: pending, installing, installed. A
	// subsequence check (rather than exact equality) tolerates any
	// extra intermediate callbacks the informer delivers.
	events := rec.componentStateEvents()
	gotStates := make([]string, len(events))
	for i, e := range events {
		gotStates[i] = e.State
	}
	wantSubseq := []string{StatePending, StateInstalling, StateInstalled}
	if !containsSubsequence(gotStates, wantSubseq) {
		t.Errorf("events did not contain expected state subsequence %v in %v", wantSubseq, gotStates)
	}
}
// TestWatch_FailedReleaseTerminatesAndIsFailed proves "failed" counts
// as terminal — the watch ends, the final state map has "failed",
// and the upstream markPhase1Done flips Status=failed.
func TestWatch_FailedReleaseTerminatesAndIsFailed(t *testing.T) {
	scheme := newFakeScheme()
	hr := makeHelmRelease("bp-cert-manager", []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "InstallFailed", Message: "chart failed: timed out"},
	})
	client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
		map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
		hr,
	)
	rec := &recorder{}
	cfg := Config{
		KubeconfigYAML: "fake",
		WatchTimeout:   5 * time.Second,
		DynamicFactory: fakeFactory(client),
		Resync:         0,
	}
	w, err := NewWatcher(cfg, rec.emit)
	if err != nil {
		t.Fatalf("NewWatcher: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	final, err := w.Watch(ctx)
	if err != nil {
		t.Fatalf("Watch: %v", err)
	}
	if final["cert-manager"] != StateFailed {
		t.Errorf("expected cert-manager=%q, got %q", StateFailed, final["cert-manager"])
	}
	// Exactly one transition ("" → failed); Level "error" pins
	// levelFromState's failed→error mapping.
	events := rec.componentStateEvents()
	if len(events) != 1 || events[0].State != StateFailed || events[0].Level != "error" {
		t.Errorf("expected one error/failed event, got: %+v", events)
	}
}
// TestWatch_TimeoutTerminatesWithPartialState proves that a stuck
// installing component does not block forever — the watch terminates
// on its configured timeout and the partial state is returned.
//
// Why a single stuck component (not "stuck + done") in this test:
// the informer fires AddFunc per object as the cache syncs in
// arbitrary order. If we seed both a non-terminal AND a terminal
// release, the order in which they reach processEvent is racy — when
// the terminal one arrives FIRST and the non-terminal hasn't yet
// been observed, allObservedTerminal({stuck:terminal}) == true and
// the watch closes `terminated` at that intermediate state. That is
// correct behaviour (the watch terminates as soon as every OBSERVED
// release is terminal), but it makes the test flaky for the
// timeout-path assertion. So we make the timeout-path the only
// possible exit by seeding only non-terminal releases.
func TestWatch_TimeoutTerminatesWithPartialState(t *testing.T) {
	scheme := newFakeScheme()
	stuck := makeHelmRelease("bp-keycloak", []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionUnknown, Reason: "Progressing", Message: "Reconciliation in progress"},
	})
	client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
		map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
		stuck,
	)
	rec := &recorder{}
	cfg := Config{
		KubeconfigYAML: "fake",
		WatchTimeout:   400 * time.Millisecond, // tiny — termination must come from this
		DynamicFactory: fakeFactory(client),
		Resync:         0,
	}
	w, err := NewWatcher(cfg, rec.emit)
	if err != nil {
		t.Fatalf("NewWatcher: %v", err)
	}
	start := time.Now()
	final, err := w.Watch(context.Background())
	elapsed := time.Since(start)
	// Timeout is a clean termination — Watch returns nil error.
	if err != nil {
		t.Fatalf("Watch returned err: %v", err)
	}
	if elapsed > 5*time.Second {
		t.Errorf("watch took %v — expected to terminate near WatchTimeout (400ms)", elapsed)
	}
	if elapsed < 200*time.Millisecond {
		t.Errorf("watch took only %v — must have observed timeout, not all-terminal close", elapsed)
	}
	if final["keycloak"] != StateInstalling {
		t.Errorf("expected keycloak=%q (stuck mid-install), got %q", StateInstalling, final["keycloak"])
	}
	// A timeout-terminated watch emits a "watch terminated by context"
	// warn event so the SSE consumer can render the partial outcome.
	// The substring must stay in sync with the message Watch emits
	// ("Phase-1 watch terminated by context: ...").
	sawTimeoutEvent := false
	for _, ev := range rec.snapshot() {
		if ev.Phase == PhaseComponent && strings.Contains(ev.Message, "watch terminated by context") {
			sawTimeoutEvent = true
			break
		}
	}
	if !sawTimeoutEvent {
		t.Errorf("expected a 'watch terminated by context' warn event, got: %+v", rec.snapshot())
	}
}
// TestWatch_NonBPReleaseFiltered proves the FilterFunc keeps the
// watch focused on bp-* HelmReleases. A HelmRelease that doesn't
// start with "bp-" is ignored — it's not in the bootstrap-kit and
// emitting events for it would confuse the Sovereign Admin's "X of
// Y bootstrap components" counter.
func TestWatch_NonBPReleaseFiltered(t *testing.T) {
	scheme := newFakeScheme()
	bp := makeHelmRelease("bp-cilium", []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "ok"},
	})
	other := makeHelmRelease("some-other-chart", []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "ok"},
	})
	client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
		map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
		bp, other,
	)
	rec := &recorder{}
	cfg := Config{
		KubeconfigYAML: "fake",
		WatchTimeout:   5 * time.Second,
		DynamicFactory: fakeFactory(client),
		Resync:         0,
	}
	w, err := NewWatcher(cfg, rec.emit)
	if err != nil {
		t.Fatalf("NewWatcher: %v", err)
	}
	final, err := w.Watch(context.Background())
	if err != nil {
		t.Fatalf("Watch: %v", err)
	}
	if _, ok := final["cilium"]; !ok {
		t.Errorf("expected cilium in final states, got: %v", final)
	}
	// NOTE(review): "other-chart" could only appear if a component id
	// were derived from the filtered release — presumably a guard
	// against a hypothetical mis-mapping; "some-other-chart" below is
	// the literal non-bp key.
	if _, ok := final["other-chart"]; ok {
		t.Errorf("non-bp- HelmRelease should not appear in final states: %v", final)
	}
	if _, ok := final["some-other-chart"]; ok {
		t.Errorf("non-bp- HelmRelease should not appear in final states: %v", final)
	}
}
// TestWatch_MissingKubeconfigRejected proves the Watcher refuses to
// start when the kubeconfig field is empty — this is the rehydrate
// path's failure mode (a deployment whose Phase 0 finished before
// kubeconfig capture lands).
func TestWatch_MissingKubeconfigRejected(t *testing.T) {
	rec := &recorder{}
	cfg := Config{
		KubeconfigYAML: "",
		DynamicFactory: fakeFactory(nil),
	}
	if _, err := NewWatcher(cfg, rec.emit); err == nil {
		t.Fatalf("expected NewWatcher to reject empty kubeconfig")
	}
}
// TestWatch_NilEmitRejected proves the constructor rejects a nil
// emit callback — the event sink must be wired before a watch starts.
func TestWatch_NilEmitRejected(t *testing.T) {
	if _, err := NewWatcher(Config{KubeconfigYAML: "fake"}, nil); err == nil {
		t.Fatalf("expected NewWatcher to reject nil emit callback")
	}
}
// TestWatch_OnlyEmitsOnTransition proves the de-dup branch: a second
// informer Update event for the same HelmRelease with no state change
// (e.g. a status subresource patch from helm-controller's
// observedGeneration touch) does NOT produce a duplicate Event. The
// Sovereign Admin's status pill must not flicker at sub-second
// cadence.
func TestWatch_OnlyEmitsOnTransition(t *testing.T) {
	scheme := newFakeScheme()
	installed := makeHelmRelease("bp-cilium", []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
	})
	fakeClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
		map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
		installed,
	)
	rec := &recorder{}
	w, err := NewWatcher(Config{
		KubeconfigYAML: "fake",
		WatchTimeout:   2 * time.Second,
		DynamicFactory: fakeFactory(fakeClient),
		Resync:         0,
	}, rec.emit)
	if err != nil {
		t.Fatalf("NewWatcher: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	final, err := w.Watch(ctx)
	if err != nil {
		t.Fatalf("Watch: %v", err)
	}
	if got := final["cilium"]; got != StateInstalled {
		t.Fatalf("expected installed, got %q", got)
	}
	// Issue a status patch with the SAME conditions — must NOT emit a
	// second event. (The watch already terminated above, but we do a
	// best-effort check that during the run only one emit landed for
	// cilium.)
	if emits := rec.componentStateEvents(); len(emits) != 1 {
		t.Errorf("expected exactly 1 emit for cilium (no transition de-dup), got %d: %+v", len(emits), emits)
	}
}
// TestWatch_AllObservedTerminal_FivePhaseComponentEvents — captures
// the GATES requirement: a fake-clientset run over five HelmReleases
// that all end Installed must produce exactly 5 phase=component
// events.
func TestWatch_AllObservedTerminal_FivePhaseComponentEvents(t *testing.T) {
	scheme := newFakeScheme()
	names := []string{"bp-cilium", "bp-cert-manager", "bp-flux", "bp-crossplane", "bp-sealed-secrets"}
	releases := make([]runtime.Object, 0, len(names))
	for _, name := range names {
		releases = append(releases, makeHelmRelease(name, []metav1.Condition{
			{Type: "Ready", Status: metav1.ConditionTrue, Reason: "ReconciliationSucceeded", Message: "Helm install succeeded"},
		}))
	}
	fakeClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme,
		map[schema.GroupVersionResource]string{HelmReleaseGVR: "HelmReleaseList"},
		releases...,
	)
	rec := &recorder{}
	w, err := NewWatcher(Config{
		KubeconfigYAML: "fake",
		WatchTimeout:   5 * time.Second,
		DynamicFactory: fakeFactory(fakeClient),
		Resync:         0,
	}, rec.emit)
	if err != nil {
		t.Fatalf("NewWatcher: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	final, err := w.Watch(ctx)
	if err != nil {
		t.Fatalf("Watch: %v", err)
	}
	if got := len(final); got != 5 {
		t.Errorf("expected 5 component states, got %d: %v", got, final)
	}
	events := rec.componentStateEvents()
	if got := len(events); got != 5 {
		t.Errorf("expected 5 phase=component events, got %d: %+v", got, events)
	}
	// Capture for the GATES sample — t.Logf streams to stdout under
	// `go test -v` so the agent can paste these in the final report.
	for _, ev := range events {
		t.Logf("phase=%s component=%s state=%s level=%s message=%q",
			ev.Phase, ev.Component, ev.State, ev.Level, ev.Message)
	}
}
// ─────────────────────────────────────────────────────────────────────
// helpers
// ─────────────────────────────────────────────────────────────────────
// updateHR overwrites a HelmRelease's status.conditions in the fake
// client via an Update call. The dynamic informer's UpdateFunc fires
// synchronously off this so the test can wait for the next event.
func updateHR(t *testing.T, client dynamic.Interface, name string, conds []metav1.Condition) {
	t.Helper()
	condList := make([]any, len(conds))
	for i, c := range conds {
		condList[i] = map[string]any{
			"type":               c.Type,
			"status":             string(c.Status),
			"reason":             c.Reason,
			"message":            c.Message,
			"lastTransitionTime": time.Now().UTC().Format(time.RFC3339),
		}
	}
	obj := &unstructured.Unstructured{
		Object: map[string]any{
			"apiVersion": "helm.toolkit.fluxcd.io/v2",
			"kind":       "HelmRelease",
			"metadata": map[string]any{
				"name":      name,
				"namespace": FluxNamespace,
			},
			"spec": map[string]any{
				"chart": map[string]any{
					"spec": map[string]any{
						"chart": name,
					},
				},
			},
			"status": map[string]any{
				"conditions": condList,
			},
		},
	}
	if _, err := client.Resource(HelmReleaseGVR).Namespace(FluxNamespace).Update(
		t.Context(), obj, metav1.UpdateOptions{},
	); err != nil {
		t.Fatalf("updateHR(%q): %v", name, err)
	}
}
// waitForCondition spins for up to d, checking cond every 10ms.
// Used to wait for the informer to deliver the next event without
// adding sleeps — keeps the test runtime tight even with -race.
func waitForCondition(t *testing.T, d time.Duration, cond func() bool) bool {
t.Helper()
deadline := time.Now().Add(d)
for time.Now().Before(deadline) {
if cond() {
return true
}
time.Sleep(10 * time.Millisecond)
}
return cond()
}
// countWithState returns how many of the given events carry exactly
// the supplied State value.
func countWithState(events []provisioner.Event, state string) int {
	total := 0
	for i := range events {
		if events[i].State == state {
			total++
		}
	}
	return total
}
// containsSubsequence reports whether sub appears, in order but not
// necessarily contiguously, within full. State-machine tests use it
// so extra intervening states (e.g. a duplicate pending emit) are
// tolerated while the relative order of distinct states must hold.
func containsSubsequence(full, sub []string) bool {
	next := 0
	for _, item := range full {
		if next == len(sub) {
			break
		}
		if item == sub[next] {
			next++
		}
	}
	return next == len(sub)
}

View File

@ -0,0 +1,49 @@
// Kubeconfig → client constructors.
//
// Production wires NewDynamicClientFromKubeconfig and
// NewKubernetesClientFromKubeconfig as the Config.DynamicFactory /
// Config.CoreFactory; tests inject closures that return a
// fake.NewSimpleDynamicClient / fake.NewSimpleClientset so no real
// cluster is needed.
package helmwatch
import (
"fmt"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// NewDynamicClientFromKubeconfig builds a dynamic.Interface from raw
// kubeconfig YAML. The kubeconfig is the new Sovereign cluster's
// k3s.yaml (rewritten with the load-balancer's public IP — the
// in-VM 127.0.0.1 server URL is an invariant the fetcher must
// rewrite, but that rewrite happens upstream in the Phase-0 fetch
// step, NOT here).
func NewDynamicClientFromKubeconfig(kubeconfigYAML string) (dynamic.Interface, error) {
	restCfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfigYAML))
	if err != nil {
		return nil, fmt.Errorf("parse kubeconfig: %w", err)
	}
	client, err := dynamic.NewForConfig(restCfg)
	if err != nil {
		return nil, fmt.Errorf("dynamic.NewForConfig: %w", err)
	}
	return client, nil
}
// NewKubernetesClientFromKubeconfig builds a typed
// kubernetes.Interface from raw kubeconfig YAML. Used for Pod
// listing + log tailing on helm-controller in flux-system.
func NewKubernetesClientFromKubeconfig(kubeconfigYAML string) (kubernetes.Interface, error) {
	restCfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfigYAML))
	if err != nil {
		return nil, fmt.Errorf("parse kubeconfig: %w", err)
	}
	client, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, fmt.Errorf("kubernetes.NewForConfig: %w", err)
	}
	return client, nil
}

View File

@ -0,0 +1,222 @@
// helm-controller log tailer.
//
// Tails the helm-controller Pod's logs in flux-system, parses each
// line for "<helmrelease-name> <release-namespace>" hints (the
// helm-controller logger always tags messages with the release it's
// working on), and emits one phase: "component-log" Event per line
// with Component set to the matched bp-* name.
//
// Why a stream not a one-shot Get: helm-controller's logs flow as
// long as it has work to do. The Sovereign Admin's Logs tab needs
// live tailing — a snapshot would miss every line emitted after the
// page loaded.
//
// Why one tailer for all components instead of per-component
// streams: helm-controller is a single Deployment with a single Pod.
// Asking the apiserver for N parallel log streams against the same
// Pod just multiplies the bytes off the wire. We attach once, parse
// each line for the bp-* token, and route in-process.
package helmwatch
import (
"bufio"
"context"
"io"
"regexp"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/provisioner"
)
// helmControllerNameRe — extracts the bp-<name> token from a
// helm-controller log line. helm-controller's log format varies by
// version + the configured logger; we observe two stable shapes
// against flux v2.4 (the version Catalyst-Zero pins):
//
//   - logr/klog text: `... helmrelease="flux-system/bp-cilium" ...`
//   - structured JSON: `... "helmrelease":"flux-system/bp-cilium" ...`
//
// The regex tolerates either separator (`=` or `":` with surrounding
// quotes) and either casing of the key. A future helm-controller
// release that switches to a third shape lands here as a test
// failure on the structured/text fixtures in logtailer_test.go.
//
// Capture group 1 carries the full "bp-<name>" token; pumpLines
// reads it via match[1] and feeds it to ComponentIDFromHelmRelease.
// FluxNamespace is quoted so a dot or other metacharacter in the
// namespace constant can never widen the match.
var helmControllerNameRe = regexp.MustCompile(
	`(?:helmrelease|HelmRelease)["']?\s*[:=]\s*["']?` +
		regexp.QuoteMeta(FluxNamespace) + `/(bp-[a-z0-9-]+)`,
)
// logTailer follows the helm-controller Pod's log stream in
// flux-system and routes each bp-*-tagged line through emit as a
// phase: "component-log" Event (see the file header for why there is
// exactly one tailer for all components).
type logTailer struct {
	client kubernetes.Interface // typed client used for Pod listing + GetLogs streaming
	emit   Emit                 // callback receiving the parsed component-log Events
	now    func() time.Time     // timestamp source for emitted Events (formatted RFC3339 UTC)
}
// newLogTailer wires a tailer around the given typed client, event
// sink and clock; it performs no I/O itself — run starts the loop.
func newLogTailer(client kubernetes.Interface, emit Emit, now func() time.Time) *logTailer {
	return &logTailer{client: client, emit: emit, now: now}
}
// run finds the helm-controller Pod and follows its logs until ctx
// fires. Whenever the tail ends while ctx is still live we back off
// briefly, then re-discover and re-attach (helm-controller is a
// single-replica Deployment by default — restarts during Phase 1 are
// rare but possible).
func (t *logTailer) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		pod, err := t.findHelmControllerPod(ctx)
		if err != nil {
			// Backoff and retry. helm-controller is part of bp-flux,
			// so it's possible we attach before bp-flux installed —
			// the watch loop's other path (the dynamic informer) is
			// still running, this side just waits.
			t.sleep(ctx, 5*time.Second)
			continue
		}
		if err := t.tailPod(ctx, pod); ctx.Err() == nil {
			// Tail ended while ctx is live — either a read error
			// (Pod likely rescheduled) or a clean EOF (stream
			// closed). Previously only the error path backed off;
			// a stream that closes immediately with nil error
			// would hot-loop List+Stream calls against the
			// apiserver. Back off on BOTH paths before
			// reattaching; the cause doesn't change the recovery.
			_ = err
			t.sleep(ctx, 2*time.Second)
		}
	}
}
// findHelmControllerPod lists Pods in flux-system matching the
// helm-controller label selector and returns the first one in the
// Running phase, or errorPodNotReady when none is running yet.
func (t *logTailer) findHelmControllerPod(ctx context.Context) (*corev1.Pod, error) {
	pods, err := t.client.CoreV1().Pods(FluxNamespace).List(ctx, metav1.ListOptions{
		LabelSelector: HelmControllerSelector,
	})
	if err != nil {
		return nil, err
	}
	for i := range pods.Items {
		if candidate := &pods.Items[i]; candidate.Status.Phase == corev1.PodRunning {
			return candidate, nil
		}
	}
	return nil, errorPodNotReady
}
// tailPod opens a follow=true log stream against the pod and pumps
// each line through the emit callback as a phase: "component-log"
// Event keyed by the bp-* token in the line. The stream is closed
// before returning.
func (t *logTailer) tailPod(ctx context.Context, pod *corev1.Pod) error {
	opts := &corev1.PodLogOptions{
		Follow:    true,
		TailLines: ptrInt64(0),
	}
	stream, err := t.client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, opts).Stream(ctx)
	if err != nil {
		return err
	}
	defer stream.Close()
	return t.pumpLines(ctx, stream)
}
// pumpLines is split out so tests can drive the parser on a raw
// io.Reader without standing up a Pod.
//
// Context handling: bufio.Scanner.Scan() blocks on the underlying
// reader, so an in-flight scanner cannot poll ctx.Done() between
// lines. We spawn a watchdog that closes the stream when ctx fires,
// which causes Scan to return false at the next read. If the stream
// is already an io.ReadCloser (the kubernetes log stream is — see
// CoreV1().Pods().GetLogs().Stream), we close that side; if not (the
// test passes a strings.Reader for example), we still respect ctx
// best-effort by checking ctx.Done() between lines, accepting that a
// quiet stream will lag behind ctx by one read.
//
// Returns nil on EOF or context cancellation; a non-nil return is a
// real read failure observed while ctx was still live.
func (t *logTailer) pumpLines(ctx context.Context, stream io.Reader) error {
	if closer, ok := stream.(io.Closer); ok {
		stop := make(chan struct{})
		defer close(stop)
		go func() {
			select {
			case <-ctx.Done():
				_ = closer.Close()
			case <-stop:
			}
		}()
	}
	scanner := bufio.NewScanner(stream)
	scanner.Buffer(make([]byte, 64*1024), 1024*1024)
	for scanner.Scan() {
		select {
		case <-ctx.Done():
			return nil
		default:
		}
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}
		match := helmControllerNameRe.FindStringSubmatch(line)
		if len(match) < 2 {
			// Not associated with a bp-* HelmRelease — skip. The
			// Sovereign Admin's Logs tab filters by component, so
			// noise from the helm-controller's leader-election or
			// startup chatter would render as "logs for no
			// component," which is not useful.
			continue
		}
		componentID := ComponentIDFromHelmRelease(match[1])
		t.emit(provisioner.Event{
			Time:      t.now().UTC().Format(time.RFC3339),
			Phase:     PhaseComponentLog,
			Level:     levelFromLogLine(line),
			Component: componentID,
			Message:   line,
		})
	}
	// scanner.Err() is nil on plain EOF, but when the watchdog closed
	// the stream in response to ctx firing, the scanner surfaces the
	// induced close error (e.g. "read on closed ..."). That is not a
	// read failure — suppress it so callers can rely on the "nil on
	// cancellation" contract documented above. Any error observed
	// while ctx is still live is a genuine read failure.
	if err := scanner.Err(); err != nil && ctx.Err() == nil {
		return err
	}
	return nil
}
// levelFromLogLine — coarse classifier. helm-controller uses
// logr-style level tags (`level=error` / `level=warn`, or the JSON
// equivalents); error is checked before warn, and anything without a
// recognised tag is surfaced as info.
func levelFromLogLine(line string) string {
	lowered := strings.ToLower(line)
	probes := []struct {
		text, json, level string
	}{
		{"level=error", `"level":"error"`, "error"},
		{"level=warn", `"level":"warn"`, "warn"},
	}
	for _, p := range probes {
		if strings.Contains(lowered, p.text) || strings.Contains(lowered, p.json) {
			return p.level
		}
	}
	return "info"
}
// sleep blocks for d or until ctx fires, whichever comes first.
func (t *logTailer) sleep(ctx context.Context, d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
	case <-timer.C:
	}
}
// ptrInt64 returns a pointer to a fresh copy of v
// (corev1.PodLogOptions.TailLines wants *int64).
func ptrInt64(v int64) *int64 {
	p := new(int64)
	*p = v
	return p
}
// errorPodNotReady — sentinel returned when no Running helm-controller
// Pod is in flux-system yet (early in Phase 1, before bp-flux installs).
// The tailer's outer loop treats this as retryable.
type podNotReadyError struct{}

// Error satisfies the error interface with a fixed description.
func (podNotReadyError) Error() string {
	return "helm-controller pod not yet running in flux-system"
}

// errorPodNotReady is the shared sentinel instance; the type carries
// no state, so any two values compare equal.
var errorPodNotReady = podNotReadyError{}

View File

@ -0,0 +1,133 @@
// Tests for the helm-controller log tailer line parser.
//
// The tailer's run() loop is hard to test without a real Pod (it
// owns the kubernetes.Interface log stream lifecycle), but the
// pumpLines() method is split out specifically so we can drive it
// against an in-memory io.Reader and prove the line→Event mapping
// is correct.
package helmwatch
import (
"context"
"io"
"strings"
"testing"
"time"
)
// TestPumpLines_ExtractsBPNameAndEmitsComponentLog drives pumpLines
// over a fixed in-memory stream and checks the line→Event mapping:
// one component-log Event per bp-*-tagged line with the right
// component id and level, and nothing for untagged lines.
func TestPumpLines_ExtractsBPNameAndEmitsComponentLog(t *testing.T) {
	rec := &recorder{}
	tailer := newLogTailer(nil, rec.emit, time.Now)
	lines := []string{
		`{"level":"info","ts":"2026-04-29T10:00:00Z","logger":"controllers.HelmRelease","msg":"running install","helmrelease":"flux-system/bp-cilium"}`,
		`{"level":"error","ts":"2026-04-29T10:00:05Z","msg":"chart pull failed","HelmRelease":"flux-system/bp-cert-manager"}`,
		`{"level":"info","msg":"leader election lost"}`, // no bp- token → must be skipped
		`{"level":"warn","msg":"reconcile took too long","helmrelease":"flux-system/bp-keycloak"}`,
	}
	input := strings.Join(lines, "\n") + "\n"
	if err := tailer.pumpLines(context.Background(), strings.NewReader(input)); err != nil {
		t.Fatalf("pumpLines: %v", err)
	}
	events := rec.snapshot()
	if got, want := len(events), 3; got != want {
		t.Fatalf("expected %d events (3 with bp-*, 1 leader-election skipped), got %d:\n%+v", want, got, events)
	}
	// Component derivation + level classification. State must stay
	// empty: component-log events carry a log level, not a state.
	wantLevel := map[string]string{
		"cilium":       "info",
		"cert-manager": "error",
		"keycloak":     "warn",
	}
	for _, ev := range events {
		if ev.Phase != PhaseComponentLog {
			t.Errorf("expected Phase=%q, got %q", PhaseComponentLog, ev.Phase)
		}
		level, known := wantLevel[ev.Component]
		if !known {
			t.Errorf("unexpected component in event: %q", ev.Component)
			continue
		}
		if ev.Level != level {
			t.Errorf("component %q: level=%q, want %q (line=%q)", ev.Component, ev.Level, level, ev.Message)
		}
		if ev.State != "" {
			t.Errorf("component %q: State=%q, want empty (component-log carries log level not state)", ev.Component, ev.State)
		}
		if ev.Message == "" {
			t.Errorf("component %q: empty Message, want raw log line", ev.Component)
		}
	}
}
// TestPumpLines_ContextCancelStopsScan proves pumpLines returns once
// its context fires even though the underlying reader never reaches
// EOF (a follow=true stream against a quiet Pod).
func TestPumpLines_ContextCancelStopsScan(t *testing.T) {
	rec := &recorder{}
	tailer := newLogTailer(nil, rec.emit, time.Now)
	// Pipe: one line goes in, then the write side stays open so the
	// scanner blocks — only context cancellation can release pumpLines.
	r, w := io.Pipe()
	go func() {
		_, _ = w.Write([]byte("level=info msg=startup helmrelease=\"flux-system/bp-flux\"\n"))
		// keep the pipe open
	}()
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	defer w.Close()
	result := make(chan error, 1)
	go func() { result <- tailer.pumpLines(ctx, r) }()
	select {
	case err := <-result:
		if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
			// Scanner.Err() returns nil on EOF / cancel — anything
			// else is unexpected.
			t.Logf("pumpLines returned %v (may be benign)", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatalf("pumpLines did not return after context cancel")
	}
	events := rec.snapshot()
	// 0 or 1 event is fine — it depends on whether the scanner read
	// the line before the deadline. The hard requirement is that
	// pumpLines RETURNED.
	if len(events) > 1 {
		t.Errorf("expected at most 1 event before cancel, got %d: %+v", len(events), events)
	}
	// Any event that did land must be the component-log for flux.
	for _, ev := range events {
		if ev.Phase != PhaseComponentLog || ev.Component != "flux" {
			t.Errorf("unexpected event before cancel: %+v", ev)
		}
	}
}
// TestLevelFromLogLine — pure helper coverage so a future change to
// helm-controller's log format shows up as a test diff. Table-driven
// over a slice so failures report in a stable order.
func TestLevelFromLogLine(t *testing.T) {
	cases := []struct {
		line string
		want string
	}{
		{`level=info msg=hello`, "info"},
		{`level=warn msg=slow`, "warn"},
		{`level=error msg=failed`, "error"},
		{`{"level":"error","msg":"chart load failed"}`, "error"},
		{`{"level":"warn","msg":"retry"}`, "warn"},
		{`{"level":"info","msg":"ok"}`, "info"},
		{`some legacy plain line with no level`, "info"},
	}
	for _, tc := range cases {
		if got := levelFromLogLine(tc.line); got != tc.want {
			t.Errorf("levelFromLogLine(%q) = %q, want %q", tc.line, got, tc.want)
		}
	}
}

View File

@ -168,20 +168,92 @@ func (r *Request) Validate() error {
}
// Event is a single progress event streamed back to the wizard via SSE.
//
// Component / State are populated for Phase-1 component events emitted by
// the HelmRelease watch loop (internal/helmwatch). For Phase-0 OpenTofu
// events these stay empty so the existing wire format is unchanged — no
// existing field is removed or renamed; only two optional fields are
// added. The Admin shell's "logs filtered by event.component === id"
// path keys off Component; the per-app status pill keys off State.
//
// State semantics (Phase-1 watch only):
//
// - "pending" — HelmRelease appeared in the cluster but Ready
// condition not yet observed, OR Ready=False with a
// `dependency 'X' is not ready` message (the
// component is waiting upstream of itself)
// - "installing" — Ready=Unknown, or Ready=False with reason
// `Progressing` / message `Reconciliation in progress`
// - "installed" — Ready=True
// - "degraded" — Ready=True transitioned to Ready=False without
// InstallFailed/UpgradeFailed (a healthy install
// that lost readiness post-install)
// - "failed" — Ready=False with reason InstallFailed /
// UpgradeFailed / ChartPullError /
// ArtifactFailed (the install actually broke,
// not waiting on deps)
//
// For phase: "component-log" events, Component is set, State is empty,
// Level carries the helm-controller log level, and Message is the raw
// log line.
type Event struct {
	// Time is the event's emission timestamp; the Phase-1 emitters
	// format it as RFC3339 in UTC (Phase-0 emitters presumably match
	// — confirm against the OpenTofu path).
	Time string `json:"time"`
	// Phase routes rendering: "component" for watch-loop state
	// transitions, "component-log" for helm-controller log lines;
	// Phase-0 OpenTofu events keep their pre-existing phase values.
	Phase string `json:"phase"`
	Level string `json:"level"` // info | warn | error
	// Message is the human-readable payload; for component-log events
	// it is the raw helm-controller log line.
	Message string `json:"message"`
	// Component is the normalised component id for Phase-1 events
	// ("bp-cilium" → "cilium"). Empty for Phase-0 OpenTofu events.
	Component string `json:"component,omitempty"`
	// State is one of pending|installing|installed|degraded|failed for
	// phase: "component" events; empty for Phase-0 events and for
	// phase: "component-log" events (which carry the original log
	// level instead).
	State string `json:"state,omitempty"`
}
// Result captures the OpenTofu outputs the wizard's success screen needs.
// Result captures the OpenTofu outputs the wizard's success screen needs
// PLUS the Phase-1 component watch terminal state.
//
// ComponentStates and Phase1FinishedAt are populated by the HelmRelease
// watch loop in internal/helmwatch. They are the durable per-component
// outcome the Admin shell renders ("X of Y components installed") long
// after the live SSE stream has closed.
//
// Kubeconfig holds the new Sovereign's k3s kubeconfig (raw YAML). It is
// populated at the end of Phase 0 (out-of-band kubeconfig fetch) so the
// HelmRelease watch loop, the wizard's "Download kubeconfig" button, and
// the operator's GET /api/v1/deployments/<id>/kubeconfig all consume the
// same source. The kubeconfig is rotated to a SPIFFE-issued identity in
// Phase 2 — by then this field's role narrows to "first-time bootstrap
// only," but the storage shape stays.
type Result struct {
	// The five fields below are the Phase-0 OpenTofu outputs the
	// wizard's success screen renders (see the type comment above).
	SovereignFQDN  string `json:"sovereignFQDN"`
	ControlPlaneIP string `json:"controlPlaneIP"`
	LoadBalancerIP string `json:"loadBalancerIP"`
	ConsoleURL     string `json:"consoleURL"`
	GitOpsRepoURL  string `json:"gitopsRepoURL"`
	// Kubeconfig — raw YAML. Empty until the post-tofu-output fetch
	// populates it. Persisted to the per-deployment store record so a
	// catalyst-api Pod restart does not lose access to the new
	// Sovereign cluster the previous Pod was watching. Per
	// docs/INVIOLABLE-PRINCIPLES.md #10 (credential hygiene), the
	// store directory is 0o700 owned by the catalyst-api process UID
	// — the kubeconfig never touches a wider permission domain than
	// other per-deployment artefacts already on the same PVC.
	Kubeconfig string `json:"kubeconfig,omitempty"`
	// ComponentStates — final state of every bp-* HelmRelease the
	// Phase-1 watch observed, keyed by normalised component id (e.g.
	// "cilium"). Set when the watch loop terminates (all-installed,
	// all-installed-or-failed, or timeout).
	ComponentStates map[string]string `json:"componentStates,omitempty"`
	// Phase1FinishedAt — UTC timestamp the watch loop terminated.
	// nil while Phase 1 is in flight or has not started.
	Phase1FinishedAt *time.Time `json:"phase1FinishedAt,omitempty"`
}
// Provisioner runs `tofu init && tofu apply` against the canonical