Skip to content

Commit

Permalink
Don't propagate cancel signal to the Prometheus rules manager context (
Browse files Browse the repository at this point in the history
…#6326)

* Don't propagate cancel signal to the Prometheus rules manager context

This change allows the rules that are still  executing queries to complete
before cortex if sully shutdown.

Signed-off-by: Raphael Silva <rapphil@gmail.com>

* Make ruler unit tests to run faster

Signed-off-by: Raphael Silva <rapphil@gmail.com>

* Avoid tests to fail due to race condition

Use atomic counter to keep track of the successful queries

Signed-off-by: Raphael Silva <rapphil@gmail.com>

---------

Signed-off-by: Raphael Silva <rapphil@gmail.com>
  • Loading branch information
rapphil authored Nov 9, 2024
1 parent db4ec12 commit 10d1517
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276
* [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326

## 1.18.1 2024-10-14

Expand Down
8 changes: 4 additions & 4 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestRuler_rules(t *testing.T) {
Alerts: []*Alert{},
},
},
Interval: 60,
Interval: 10,
},
},
},
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestRuler_rules_special_characters(t *testing.T) {
Alerts: []*Alert{},
},
},
Interval: 60,
Interval: 10,
},
},
},
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestRuler_rules_limit(t *testing.T) {
Alerts: []*Alert{},
},
},
Interval: 60,
Interval: 10,
},
},
},
Expand Down Expand Up @@ -342,7 +342,7 @@ func TestRuler_DeleteNamespace(t *testing.T) {

router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
require.Equal(t, "name: group1\ninterval: 1m\nrules:\n - record: UP_RULE\n expr: up\n - alert: UP_ALERT\n expr: up < 1\n", w.Body.String())
require.Equal(t, "name: group1\ninterval: 10s\nrules:\n - record: UP_RULE\n expr: up\n - alert: UP_ALERT\n expr: up < 1\n", w.Body.String())

// Delete namespace1
req = requestFor(t, http.MethodDelete, "https://localhost:8080/api/v1/rules/namespace1", nil, "user1")
Expand Down
6 changes: 5 additions & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,15 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
queryFunc = metricsQueryFunc
}

// We let the Prometheus rules manager control the context so that there is a chance
// for graceful shutdown of rules that are still in execution even in case the cortex context is canceled.
prometheusContext := user.InjectOrgID(context.WithoutCancel(ctx), userID)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Queryable: q,
QueryFunc: queryFunc,
Context: user.InjectOrgID(ctx, userID),
Context: prometheusContext,
ExternalURL: cfg.ExternalURL.URL,
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
Logger: log.With(logger, "user", userID),
Expand Down
136 changes: 135 additions & 1 deletion pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,55 @@ func (e emptyQuerier) Select(ctx context.Context, sortSeries bool, hints *storag
return storage.EmptySeriesSet()
}

func fixedQueryable(querier storage.Querier) storage.Queryable {
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
return querier, nil
})
}

type blockingQuerier struct {
queryStarted chan struct{}
queryFinished chan struct{}
queryBlocker chan struct{}
successfulQueries *atomic.Int64
}

func (s *blockingQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}

func (s *blockingQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, nil
}

func (s *blockingQuerier) Close() error {
return nil
}

func (s *blockingQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (returnSeries storage.SeriesSet) {
select {
case <-s.queryStarted:
default:
close(s.queryStarted)
}

select {
case <-ctx.Done():
returnSeries = storage.ErrSeriesSet(ctx.Err())
case <-s.queryBlocker:
s.successfulQueries.Add(1)
returnSeries = storage.EmptySeriesSet()
}

select {
case <-s.queryFinished:
default:
close(s.queryFinished)
}

return returnSeries
}

func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Registerer, logger log.Logger) storage.QueryableFunc {
if querierTestConfig != nil {
// disable active query tracking for test
Expand All @@ -158,10 +207,15 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg
func testSetup(t *testing.T, querierTestConfig *querier.TestConfig) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, prometheus.Registerer) {
tracker := promql.NewActiveQueryTracker(t.TempDir(), 20, log.NewNopLogger())

timeout := time.Minute * 2

if querierTestConfig != nil && querierTestConfig.Cfg.Timeout != 0 {
timeout = querierTestConfig.Cfg.Timeout
}
engine := promql.NewEngine(promql.EngineOpts{
MaxSamples: 1e6,
ActiveQueryTracker: tracker,
Timeout: 2 * time.Minute,
Timeout: timeout,
})

// Mock the pusher
Expand Down Expand Up @@ -322,6 +376,86 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
`), "prometheus_notifications_dropped_total"))
}

func TestRuler_TestShutdown(t *testing.T) {
tests := []struct {
name string
shutdownFn func(*blockingQuerier, *Ruler)
}{
{
name: "successful query after shutdown",
shutdownFn: func(querier *blockingQuerier, ruler *Ruler) {
// Wait query to start
<-querier.queryStarted

// The following cancel the context of the ruler service.
ruler.StopAsync()

// Simulate the completion of the query
close(querier.queryBlocker)

// Wait query to finish
<-querier.queryFinished

require.GreaterOrEqual(t, querier.successfulQueries.Load(), int64(1), "query failed to complete successfully failed to complete")
},
},
{
name: "query timeout while shutdown",
shutdownFn: func(querier *blockingQuerier, ruler *Ruler) {
// Wait query to start
<-querier.queryStarted

// The following cancel the context of the ruler service.
ruler.StopAsync()

// Wait query to finish
<-querier.queryFinished

require.Equal(t, querier.successfulQueries.Load(), int64(0), "query should not be succesfull")
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)
mockQuerier := &blockingQuerier{
queryBlocker: make(chan struct{}),
queryStarted: make(chan struct{}),
queryFinished: make(chan struct{}),
successfulQueries: atomic.NewInt64(0),
}
sleepQueriable := fixedQueryable(mockQuerier)

d := &querier.MockDistributor{}

d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{},
}, nil)
d.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Panic("This should not be called for the ruler use-cases.")

r := newTestRuler(t, cfg, store, &querier.TestConfig{
Distributor: d,
Stores: []querier.QueryableWithFilter{
querier.UseAlwaysQueryable(sleepQueriable),
},
Cfg: querier.Config{Timeout: time.Second * 1},
})

test.shutdownFn(mockQuerier, r)

err := r.AwaitTerminated(context.Background())
require.NoError(t, err)

e := r.FailureCase()
require.NoError(t, e)
})
}

}

func TestRuler_Rules(t *testing.T) {
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/store_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type mockRuleStore struct {

var (
delim = "/"
interval, _ = time.ParseDuration("1m")
interval, _ = time.ParseDuration("10s")
mockRulesNamespaces = map[string]rulespb.RuleGroupList{
"user1": {
&rulespb.RuleGroupDesc{
Expand Down

0 comments on commit 10d1517

Please sign in to comment.