diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e77b62020..a86b2cd9cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129 * [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245 * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 +* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333 * [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280 * [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255 * [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 42da5e4a6f..211ad25711 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2562,6 +2562,10 @@ ha_tracker: # CLI flag: -distributor.max-recv-msg-size [max_recv_msg_size: | default = 104857600] +# Maximum OTLP request size in bytes that the Distributor can accept. +# CLI flag: -distributor.otlp-max-recv-msg-size +[otlp_max_recv_msg_size: | default = 104857600] + # Timeout for downstream ingesters. 
# CLI flag: -distributor.remote-timeout [remote_timeout: | default = 2s] diff --git a/pkg/api/api.go b/pkg/api/api.go index 9de60b9bc4..13843c3e64 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -278,7 +278,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib distributorpb.RegisterDistributorServer(a.server.GRPC, d) a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") - a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics") diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 8f6b97aa5d..4725f6dff1 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -135,9 +135,10 @@ type Config struct { HATrackerConfig HATrackerConfig `yaml:"ha_tracker"` - MaxRecvMsgSize int `yaml:"max_recv_msg_size"` - RemoteTimeout time.Duration `yaml:"remote_timeout"` - ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"` + MaxRecvMsgSize int `yaml:"max_recv_msg_size"` + OTLPMaxRecvMsgSize int `yaml:"otlp_max_recv_msg_size"` + RemoteTimeout time.Duration `yaml:"remote_timeout"` + ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"` ShardingStrategy string `yaml:"sharding_strategy"` ShardByAllLabels bool `yaml:"shard_by_all_labels"` @@ -186,6 +187,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.DistributorRing.RegisterFlags(f) f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size 
(bytes).") + f.IntVar(&cfg.OTLPMaxRecvMsgSize, "distributor.otlp-max-recv-msg-size", 100<<20, "Maximum OTLP request size in bytes that the Distributor can accept.") f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.") diff --git a/pkg/util/http.go b/pkg/util/http.go index 09fb3df38c..09b6aea9fe 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -2,6 +2,7 @@ package util import ( "bytes" + "compress/gzip" "context" "encoding/json" "flag" @@ -143,6 +144,7 @@ type CompressionType int const ( NoCompression CompressionType = iota RawSnappy + Gzip ) // ParseProtoReader parses a compressed proto from an io.Reader. @@ -215,6 +217,13 @@ func decompressFromReader(reader io.Reader, expectedSize, maxSize int, compressi return nil, err } body, err = decompressFromBuffer(&buf, maxSize, RawSnappy, sp) + case Gzip: + reader, err = gzip.NewReader(reader) + if err != nil { + return nil, err + } + _, err = buf.ReadFrom(reader) + body = buf.Bytes() } return body, err } diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index e6e7fef6ce..9d0bcb1fba 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -1,20 +1,24 @@ package push import ( + "bytes" + "compress/gzip" "context" + "fmt" + "io" "net/http" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" "github.com/prometheus/prometheus/util/annotations" "github.com/weaveworks/common/httpgrpc" 
"github.com/weaveworks/common/middleware" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/distributor" @@ -24,8 +28,13 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) +const ( + pbContentType = "application/x-protobuf" + jsonContentType = "application/json" +) + // OTLPHandler is a http.Handler which accepts OTLP metrics. -func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := util_log.WithContext(ctx, util_log.Logger) @@ -42,7 +51,7 @@ func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, so return } - req, err := remote.DecodeOTLPWriteRequest(r) + req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -90,6 +99,64 @@ func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, so }) } +func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) { + expectedSize := int(r.ContentLength) + if expectedSize > maxSize { + return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize) + } + + contentType := r.Header.Get("Content-Type") + contentEncoding := r.Header.Get("Content-Encoding") + + var compressionType util.CompressionType + switch contentEncoding { + case "gzip": + compressionType = util.Gzip + case "": + compressionType = util.NoCompression 
+ default: + return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported compression: %s, Supported compression types are \"gzip\" or '' (no compression)", contentEncoding) + } + + var decoderFunc func(reader io.Reader) (pmetricotlp.ExportRequest, error) + switch contentType { + case pbContentType: + decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + otlpReqProto := otlpProtoMessage{req: &req} + return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType) + } + case jsonContentType: + decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + + reader = io.LimitReader(reader, int64(maxSize)+1) + if compressionType == util.Gzip { + var err error + reader, err = gzip.NewReader(reader) + if err != nil { + return req, err + } + } + + var buf bytes.Buffer + if expectedSize > 0 { + buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation + } + _, err := buf.ReadFrom(reader) + if err != nil { + return req, err + } + + return req, req.UnmarshalJSON(buf.Bytes()) + } + default: + return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported content type: %s, supported: [%s, %s]", contentType, jsonContentType, pbContentType) + } + + return decoderFunc(r.Body) +} + func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, error) { promConverter := prometheusremotewrite.NewPrometheusConverter() settings := prometheusremotewrite.Settings{ @@ -223,3 +290,16 @@ func joinAttributeMaps(from, to pcommon.Map) { return true }) } + +// otlpProtoMessage implements proto.Message, proto.Unmarshaler +type otlpProtoMessage struct { + req *pmetricotlp.ExportRequest +} + +func (otlpProtoMessage) ProtoMessage() {} + +func (otlpProtoMessage) Reset() {} + +func (otlpProtoMessage) 
String() string { return "" } + +func (o otlpProtoMessage) Unmarshal(data []byte) error { return o.req.UnmarshalProto(data) } diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index 40b42f3fee..0adc65135a 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -2,7 +2,9 @@ package push import ( "bytes" + "compress/gzip" "context" + "io" "net/http" "net/http/httptest" "testing" @@ -226,50 +228,144 @@ func TestOTLPWriteHandler(t *testing.T) { exportRequest := generateOTLPWriteRequest(t) - t.Run("Test proto format write", func(t *testing.T) { - buf, err := exportRequest.MarshalProto() - require.NoError(t, err) + tests := []struct { + description string + maxRecvMsgSize int + format string + expectedStatusCode int + expectedErrMsg string + gzipCompression bool + encodingType string + }{ + { + description: "Test proto format write with no compression", + maxRecvMsgSize: 10000, + format: pbContentType, + expectedStatusCode: http.StatusOK, + }, + { + description: "Test proto format write with gzip", + maxRecvMsgSize: 10000, + format: pbContentType, + expectedStatusCode: http.StatusOK, + encodingType: "gzip", + gzipCompression: true, + }, + { + description: "Test json format write with no compression", + maxRecvMsgSize: 10000, + format: jsonContentType, + expectedStatusCode: http.StatusOK, + }, + { + description: "Test json format write with gzip", + maxRecvMsgSize: 10000, + format: jsonContentType, + expectedStatusCode: http.StatusOK, + encodingType: "gzip", + gzipCompression: true, + }, + { + description: "request too big than maxRecvMsgSize (proto) with no compression", + maxRecvMsgSize: 10, + format: pbContentType, + expectedStatusCode: http.StatusBadRequest, + expectedErrMsg: "received message larger than max", + }, + { + description: "request too big than maxRecvMsgSize (proto) with gzip", + maxRecvMsgSize: 10, + format: pbContentType, + expectedStatusCode: http.StatusBadRequest, + expectedErrMsg: "received message larger than 
max", + encodingType: "gzip", + gzipCompression: true, + }, + { + description: "request too big than maxRecvMsgSize (json) with no compression", + maxRecvMsgSize: 10, + format: jsonContentType, + expectedStatusCode: http.StatusBadRequest, + expectedErrMsg: "received message larger than max", + }, + { + description: "request too big than maxRecvMsgSize (json) with gzip", + maxRecvMsgSize: 10, + format: jsonContentType, + expectedStatusCode: http.StatusBadRequest, + expectedErrMsg: "received message larger than max", + encodingType: "gzip", + gzipCompression: true, + }, + { + description: "invalid encoding type: snappy", + maxRecvMsgSize: 10000, + format: jsonContentType, + expectedStatusCode: http.StatusBadRequest, + encodingType: "snappy", + }, + } - ctx := context.Background() - ctx = user.InjectOrgID(ctx, "user-1") + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + var req *http.Request + + compressionFunc := func(t *testing.T, body []byte) []byte { + var b bytes.Buffer + gz := gzip.NewWriter(&b) + _, err := gz.Write(body) + require.NoError(t, err) + require.NoError(t, gz.Close()) + + return b.Bytes() + } - req, err := http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf)) - require.NoError(t, err) - req.Header.Set("Content-Type", "application/x-protobuf") + if test.format == pbContentType { + buf, err := exportRequest.MarshalProto() + require.NoError(t, err) - push := verifyOTLPWriteRequestHandler(t, cortexpb.API) - overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) - require.NoError(t, err) - handler := OTLPHandler(overrides, cfg, nil, push) + if test.gzipCompression { + buf = compressionFunc(t, buf) + } - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + req, err = http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf)) + require.NoError(t, err) + req.Header.Set("Content-Type", pbContentType) + 
req.Header.Set("Content-Encoding", test.encodingType) + } else { + buf, err := exportRequest.MarshalJSON() + require.NoError(t, err) - resp := recorder.Result() - require.Equal(t, http.StatusOK, resp.StatusCode) - }) - t.Run("Test json format write", func(t *testing.T) { - buf, err := exportRequest.MarshalJSON() - require.NoError(t, err) + if test.gzipCompression { + buf = compressionFunc(t, buf) + } - ctx := context.Background() - ctx = user.InjectOrgID(ctx, "user-1") + req, err = http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf)) + require.NoError(t, err) + req.Header.Set("Content-Type", jsonContentType) + req.Header.Set("Content-Encoding", test.encodingType) + } - req, err := http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf)) - require.NoError(t, err) - req.Header.Set("Content-Type", "application/json") + push := verifyOTLPWriteRequestHandler(t, cortexpb.API) + overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) + require.NoError(t, err) + handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push) - push := verifyOTLPWriteRequestHandler(t, cortexpb.API) - overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) - require.NoError(t, err) - handler := OTLPHandler(overrides, cfg, nil, push) + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + resp := recorder.Result() + require.Equal(t, test.expectedStatusCode, resp.StatusCode) - resp := recorder.Result() - require.Equal(t, http.StatusOK, resp.StatusCode) - }) + if test.expectedErrMsg != "" { + b, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Contains(t, string(b), test.expectedErrMsg) + } + }) + } } func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest {