From 10d1517ee994cd0620ab145516f4a0840011f25d Mon Sep 17 00:00:00 2001 From: Raphael Philipe Mendes da Silva Date: Fri, 8 Nov 2024 16:24:41 -0800 Subject: [PATCH] Don't propagate cancel signal to the Prometheus rules manager context (#6326) * Don't propagate cancel signal to the Prometheus rules manager context This change allows the rules that are still executing queries to complete before cortex if sully shutdown. Signed-off-by: Raphael Silva * Make ruler unit tests to run faster Signed-off-by: Raphael Silva * Avoid tests to fail due to race condition Use atomic counter to keep track of the successful queries Signed-off-by: Raphael Silva --------- Signed-off-by: Raphael Silva --- CHANGELOG.md | 1 + pkg/ruler/api_test.go | 8 +-- pkg/ruler/compat.go | 6 +- pkg/ruler/ruler_test.go | 136 ++++++++++++++++++++++++++++++++++- pkg/ruler/store_mock_test.go | 2 +- 5 files changed, 146 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1832fc50a1..4e77b62020 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276 * [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 +* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326 ## 1.18.1 2024-10-14 diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 3ef611a47e..46477f47e2 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -67,7 +67,7 @@ func TestRuler_rules(t *testing.T) { Alerts: []*Alert{}, }, }, - Interval: 60, + Interval: 10, }, }, }, @@ -123,7 +123,7 @@ func TestRuler_rules_special_characters(t *testing.T) { Alerts: []*Alert{}, }, }, - Interval: 60, + Interval: 10, }, }, }, @@ -178,7 +178,7 @@ func TestRuler_rules_limit(t *testing.T) { Alerts: []*Alert{}, }, }, - Interval: 60, + Interval: 10, }, }, }, @@ -342,7 +342,7 @@ func TestRuler_DeleteNamespace(t *testing.T) { router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) - require.Equal(t, "name: group1\ninterval: 1m\nrules:\n - record: UP_RULE\n expr: up\n - alert: UP_ALERT\n expr: up < 1\n", w.Body.String()) + require.Equal(t, "name: group1\ninterval: 10s\nrules:\n - record: UP_RULE\n expr: up\n - alert: UP_ALERT\n expr: up < 1\n", w.Body.String()) // Delete namespace1 req = requestFor(t, http.MethodDelete, "https://localhost:8080/api/v1/rules/namespace1", nil, "user1") diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 235c07d41c..b370e34d20 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -341,11 +341,15 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi queryFunc = metricsQueryFunc } + // We let the Prometheus rules manager control the context so that there is a chance + // for graceful shutdown of rules that are still in execution even in case the cortex context is canceled. + prometheusContext := user.InjectOrgID(context.WithoutCancel(ctx), userID) + return rules.NewManager(&rules.ManagerOptions{ Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites), Queryable: q, QueryFunc: queryFunc, - Context: user.InjectOrgID(ctx, userID), + Context: prometheusContext, ExternalURL: cfg.ExternalURL.URL, NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()), Logger: log.With(logger, "user", userID), diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 8519fa9d90..8daf23e7b6 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -138,6 +138,55 @@ func (e emptyQuerier) Select(ctx context.Context, sortSeries bool, hints *storag return storage.EmptySeriesSet() } +func fixedQueryable(querier storage.Querier) storage.Queryable { + return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { + return querier, nil + }) +} + +type blockingQuerier struct { + queryStarted chan struct{} + queryFinished chan struct{} + queryBlocker chan struct{} + successfulQueries *atomic.Int64 +} + +func (s *blockingQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (s *blockingQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (s *blockingQuerier) Close() error { + return nil +} + +func (s *blockingQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (returnSeries storage.SeriesSet) { + select { + case <-s.queryStarted: + default: + close(s.queryStarted) + } + + select { + case <-ctx.Done(): + returnSeries = storage.ErrSeriesSet(ctx.Err()) + case <-s.queryBlocker: + s.successfulQueries.Add(1) + returnSeries = storage.EmptySeriesSet() + } + + select { + case <-s.queryFinished: + default: + close(s.queryFinished) + } + + return returnSeries +} + func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Registerer, logger log.Logger) storage.QueryableFunc { if querierTestConfig != nil { // disable active query tracking for test @@ -158,10 +207,15 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg func testSetup(t *testing.T, querierTestConfig *querier.TestConfig) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, prometheus.Registerer) { tracker := promql.NewActiveQueryTracker(t.TempDir(), 20, log.NewNopLogger()) + timeout := time.Minute * 2 + + if querierTestConfig != nil && querierTestConfig.Cfg.Timeout != 0 { + timeout = querierTestConfig.Cfg.Timeout + } engine := promql.NewEngine(promql.EngineOpts{ MaxSamples: 1e6, ActiveQueryTracker: tracker, - Timeout: 2 * time.Minute, + Timeout: timeout, }) // Mock the pusher @@ -322,6 +376,86 @@ func TestNotifierSendsUserIDHeader(t *testing.T) { `), "prometheus_notifications_dropped_total")) } +func TestRuler_TestShutdown(t *testing.T) { + tests := []struct { + name string + shutdownFn func(*blockingQuerier, *Ruler) + }{ + { + name: "successful query after shutdown", + shutdownFn: func(querier *blockingQuerier, ruler *Ruler) { + // Wait query to start + <-querier.queryStarted + + // The following cancel the context of the ruler service. + ruler.StopAsync() + + // Simulate the completion of the query + close(querier.queryBlocker) + + // Wait query to finish + <-querier.queryFinished + + require.GreaterOrEqual(t, querier.successfulQueries.Load(), int64(1), "query failed to complete successfully failed to complete") + }, + }, + { + name: "query timeout while shutdown", + shutdownFn: func(querier *blockingQuerier, ruler *Ruler) { + // Wait query to start + <-querier.queryStarted + + // The following cancel the context of the ruler service. + ruler.StopAsync() + + // Wait query to finish + <-querier.queryFinished + + require.Equal(t, querier.successfulQueries.Load(), int64(0), "query should not be succesfull") + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + store := newMockRuleStore(mockRules, nil) + cfg := defaultRulerConfig(t) + mockQuerier := &blockingQuerier{ + queryBlocker: make(chan struct{}), + queryStarted: make(chan struct{}), + queryFinished: make(chan struct{}), + successfulQueries: atomic.NewInt64(0), + } + sleepQueriable := fixedQueryable(mockQuerier) + + d := &querier.MockDistributor{} + + d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + &client.QueryStreamResponse{ + Chunkseries: []client.TimeSeriesChunk{}, + }, nil) + d.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Panic("This should not be called for the ruler use-cases.") + + r := newTestRuler(t, cfg, store, &querier.TestConfig{ + Distributor: d, + Stores: []querier.QueryableWithFilter{ + querier.UseAlwaysQueryable(sleepQueriable), + }, + Cfg: querier.Config{Timeout: time.Second * 1}, + }) + + test.shutdownFn(mockQuerier, r) + + err := r.AwaitTerminated(context.Background()) + require.NoError(t, err) + + e := r.FailureCase() + require.NoError(t, e) + }) + } + +} + func TestRuler_Rules(t *testing.T) { store := newMockRuleStore(mockRules, nil) cfg := defaultRulerConfig(t) diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go index 30a53fdab1..75f38432a4 100644 --- a/pkg/ruler/store_mock_test.go +++ b/pkg/ruler/store_mock_test.go @@ -19,7 +19,7 @@ type mockRuleStore struct { var ( delim = "/" - interval, _ = time.ParseDuration("1m") + interval, _ = time.ParseDuration("10s") mockRulesNamespaces = map[string]rulespb.RuleGroupList{ "user1": { &rulespb.RuleGroupDesc{