-
Notifications
You must be signed in to change notification settings - Fork 47
WIP: OCPBUGS-62517 ha replicas #709
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,9 @@ | ||
| package serverutil | ||
|
|
||
| import ( | ||
| "context" | ||
| "crypto/tls" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "net" | ||
|
|
@@ -13,7 +15,7 @@ import ( | |
| "github.com/klauspost/compress/gzhttp" | ||
| ctrl "sigs.k8s.io/controller-runtime" | ||
| "sigs.k8s.io/controller-runtime/pkg/certwatcher" | ||
| "sigs.k8s.io/controller-runtime/pkg/manager" | ||
| "sigs.k8s.io/controller-runtime/pkg/healthz" | ||
|
|
||
| catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics" | ||
| "github.com/operator-framework/operator-controller/internal/catalogd/storage" | ||
|
|
@@ -27,49 +29,115 @@ type CatalogServerConfig struct { | |
| LocalStorage storage.Instance | ||
| } | ||
|
|
||
| func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFileWatcher *certwatcher.CertWatcher) error { | ||
| listener, err := net.Listen("tcp", cfg.CatalogAddr) | ||
| // AddCatalogServerToManager adds the catalog HTTP server to the manager and registers | ||
| // a readiness check that passes once the server has started serving. Because | ||
| // NeedLeaderElection returns false, Start() is called on every pod immediately, so all | ||
| // replicas bind the catalog port and become ready. Non-leader pods serve requests but | ||
| // return 404 (empty local cache); callers are expected to retry. | ||
| func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, cw *certwatcher.CertWatcher) error { | ||
| shutdownTimeout := 30 * time.Second | ||
| r := &catalogServerRunnable{ | ||
| cfg: cfg, | ||
| cw: cw, | ||
| server: &http.Server{ | ||
| Addr: cfg.CatalogAddr, | ||
| Handler: storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg), | ||
| ReadTimeout: 5 * time.Second, | ||
| WriteTimeout: 5 * time.Minute, | ||
| }, | ||
| shutdownTimeout: shutdownTimeout, | ||
| ready: make(chan struct{}), | ||
| } | ||
|
|
||
| if err := mgr.Add(r); err != nil { | ||
| return fmt.Errorf("error adding catalog server to manager: %w", err) | ||
| } | ||
|
|
||
| // Register a readiness check that passes once Start() has been called and the | ||
| // server is actively serving. All pods reach Start() (NeedLeaderElection=false), | ||
| // so all replicas become ready and receive traffic; non-leaders return 404 until | ||
| // they win the leader lease and populate their local cache. | ||
| if err := mgr.AddReadyzCheck("catalog-server", r.readyzCheck()); err != nil { | ||
| return fmt.Errorf("error adding catalog server readiness check: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // catalogServerRunnable is a leader-only Runnable that binds the catalog HTTP port | ||
| // lazily inside Start(), so non-leader pods never hold the listen socket. | ||
| type catalogServerRunnable struct { | ||
| cfg CatalogServerConfig | ||
| cw *certwatcher.CertWatcher | ||
| server *http.Server | ||
| shutdownTimeout time.Duration | ||
| // ready is closed by Start() once the server is about to begin serving. | ||
| ready chan struct{} | ||
| } | ||
|
|
||
| // NeedLeaderElection returns false so the catalog server starts on every pod | ||
| // immediately, regardless of leadership. This is required for rolling updates: | ||
| // if Start() were gated on leadership, a new pod could not win the leader lease | ||
| // (held by the still-running old pod) and therefore could never pass the | ||
| // catalog-server readiness check, deadlocking the rollout. | ||
| // | ||
| // Non-leader pods serve the catalog HTTP port but have an empty local cache | ||
| // (only the leader's reconciler downloads catalog content), so requests to a | ||
| // non-leader return 404. Callers are expected to retry. | ||
| func (r *catalogServerRunnable) NeedLeaderElection() bool { return false } | ||
|
Comment on lines
+78
to
+87
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for catalog client code and retry/backoff patterns
rg -n -A5 'catalogd-service|/api/v1|ClusterCatalog' --type go | head -100
# Look for retry logic in catalog client
ast-grep --pattern $'func $FUNC($$$) {
$$$
retry($$$)
$$$
}'
Repository: openshift/operator-framework-operator-controller
Length of output: 6971
🏁 Script executed:
#!/bin/bash
# Find files related to catalog client implementation
fd -e go | xargs rg -l 'catalogd' | grep -E '(client|catalog)' | head -20
# Search for HTTP client code making requests to catalogd
rg -n 'http.*Get|http\.Client' --type go | grep -i catalog | head -20
# Look for where catalog data is fetched
rg -n 'catalogd-service|/api/v1' --type go --no-heading | head -30
Repository: openshift/operator-framework-operator-controller
Length of output: 7441
🏁 Script executed:
#!/bin/bash
# Look for the actual catalog client implementation
find . -path ./test -prune -o -name '*catalog*.go' -type f -print | head -20
# Search for retry logic and backoff patterns
rg -n 'retry|backoff|exponential|MaxRetries' --type go | head -30
Repository: openshift/operator-framework-operator-controller
Length of output: 5753
🏁 Script executed:
#!/bin/bash
# Search in internal/operator-controller for catalog fetching logic
rg -n 'catalogd\|/api/v1/catalogs\|ClusterCatalog' internal/operator-controller --type go -B2 -A5 | head -100
Repository: openshift/operator-framework-operator-controller
Length of output: 74
🏁 Script executed:
#!/bin/bash
# Read the catalog client implementation
cat -n internal/operator-controller/catalogmetadata/client/client.go
# Also check the test file to understand expected behavior
cat -n internal/operator-controller/catalogmetadata/client/client_test.go | head -300
Repository: openshift/operator-framework-operator-controller
Length of output: 17978
Client lacks retry logic for 404 responses from non-leader pods.
The catalog client in `internal/operator-controller/catalogmetadata/client/client.go` does not retry when a non-leader pod returns 404.
🤖 Prompt for AI Agents |
||
|
|
||
| func (r *catalogServerRunnable) Start(ctx context.Context) error { | ||
| listener, err := net.Listen("tcp", r.cfg.CatalogAddr) | ||
| if err != nil { | ||
| return fmt.Errorf("error creating catalog server listener: %w", err) | ||
| } | ||
|
|
||
| if cfg.CertFile != "" && cfg.KeyFile != "" { | ||
| // Use the passed certificate watcher instead of creating a new one | ||
| if r.cfg.CertFile != "" && r.cfg.KeyFile != "" { | ||
| config := &tls.Config{ | ||
| GetCertificate: tlsFileWatcher.GetCertificate, | ||
| GetCertificate: r.cw.GetCertificate, | ||
| MinVersion: tls.VersionTLS12, | ||
| } | ||
| listener = tls.NewListener(listener, config) | ||
| } | ||
|
|
||
| shutdownTimeout := 30 * time.Second | ||
| catalogServer := manager.Server{ | ||
| Name: "catalogs", | ||
| OnlyServeWhenLeader: true, | ||
| Server: &http.Server{ | ||
| Addr: cfg.CatalogAddr, | ||
| Handler: storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg), | ||
| ReadTimeout: 5 * time.Second, | ||
| // TODO: Revert this to 10 seconds if/when the API | ||
| // evolves to have significantly smaller responses | ||
| WriteTimeout: 5 * time.Minute, | ||
| }, | ||
| ShutdownTimeout: &shutdownTimeout, | ||
| Listener: listener, | ||
| } | ||
| // Signal readiness before blocking on Serve so the readiness probe passes promptly. | ||
| close(r.ready) | ||
|
|
||
| err = mgr.Add(&catalogServer) | ||
| if err != nil { | ||
| return fmt.Errorf("error adding catalog server to manager: %w", err) | ||
| } | ||
| go func() { | ||
| <-ctx.Done() | ||
| shutdownCtx := context.Background() | ||
| if r.shutdownTimeout > 0 { | ||
| var cancel context.CancelFunc | ||
| shutdownCtx, cancel = context.WithTimeout(shutdownCtx, r.shutdownTimeout) | ||
| defer cancel() | ||
| } | ||
| if err := r.server.Shutdown(shutdownCtx); err != nil { | ||
| // Shutdown errors (e.g. context deadline exceeded) are not actionable; | ||
| // the process is terminating regardless. | ||
| _ = err | ||
| } | ||
| }() | ||
|
|
||
| if err := r.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // readyzCheck returns a healthz.Checker that passes once Start() has been called. | ||
| func (r *catalogServerRunnable) readyzCheck() healthz.Checker { | ||
| return func(_ *http.Request) error { | ||
| select { | ||
| case <-r.ready: | ||
| return nil | ||
| default: | ||
| return fmt.Errorf("catalog server not yet started") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler { | ||
| return handlers.CustomLoggingHandler(nil, handler, func(_ io.Writer, params handlers.LogFormatterParams) { | ||
| // extract parameters used in apache common log format, but then log using `logr` to remain consistent | ||
| // with other loggers used in this codebase. | ||
| username := "-" | ||
| if params.URL.User != nil { | ||
| if name := params.URL.User.Username(); name != "" { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stale comment contradicts implementation.
The comment says "leader-only Runnable" but `NeedLeaderElection()` returns `false` at line 87, meaning this runs on all pods. The comment should be updated to match the implementation.
📝 Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents