Skip to content

Commit

Permalink
Add OTLP message limit config and logic (#6333)
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
  • Loading branch information
SungJin1212 authored Nov 15, 2024
1 parent ee62ab2 commit b8156a0
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
* [ENHANCEMENT] S3 Bucket Client: Add a list-objects-version config to configure the list API object version. #6280
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2562,6 +2562,10 @@ ha_tracker:
# CLI flag: -distributor.max-recv-msg-size
[max_recv_msg_size: <int> | default = 104857600]
# Maximum OTLP request size in bytes that the Distributor can accept.
# CLI flag: -distributor.otlp-max-recv-msg-size
[otlp_max_recv_msg_size: <int> | default = 104857600]
# Timeout for downstream ingesters.
# CLI flag: -distributor.remote-timeout
[remote_timeout: <duration> | default = 2s]
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
Expand Down
8 changes: 5 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ type Config struct {

HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
RemoteTimeout time.Duration `yaml:"remote_timeout"`
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
OTLPMaxRecvMsgSize int `yaml:"otlp_max_recv_msg_size"`
RemoteTimeout time.Duration `yaml:"remote_timeout"`
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
Expand Down Expand Up @@ -186,6 +187,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
f.IntVar(&cfg.OTLPMaxRecvMsgSize, "distributor.otlp-max-recv-msg-size", 100<<20, "Maximum OTLP request size in bytes that the Distributor can accept.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"flag"
Expand Down Expand Up @@ -143,6 +144,7 @@ type CompressionType int
const (
NoCompression CompressionType = iota
RawSnappy
Gzip
)

// ParseProtoReader parses a compressed proto from an io.Reader.
Expand Down Expand Up @@ -215,6 +217,13 @@ func decompressFromReader(reader io.Reader, expectedSize, maxSize int, compressi
return nil, err
}
body, err = decompressFromBuffer(&buf, maxSize, RawSnappy, sp)
case Gzip:
reader, err = gzip.NewReader(reader)
if err != nil {
return nil, err
}
_, err = buf.ReadFrom(reader)
body = buf.Bytes()
}
return body, err
}
Expand Down
86 changes: 83 additions & 3 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package push

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"github.com/prometheus/prometheus/util/annotations"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor"
Expand All @@ -24,8 +28,13 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

// Content types accepted by the OTLP handler (protobuf and JSON encodings
// of the OTLP/HTTP ExportMetricsServiceRequest).
const (
	pbContentType   = "application/x-protobuf"
	jsonContentType = "application/json"
)

// OTLPHandler is a http.Handler which accepts OTLP metrics.
func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := util_log.WithContext(ctx, util_log.Logger)
Expand All @@ -42,7 +51,7 @@ func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, so
return
}

req, err := remote.DecodeOTLPWriteRequest(r)
req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -90,6 +99,64 @@ func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, so
})
}

// decodeOTLPWriteRequest reads and decodes an OTLP metrics export request from
// the HTTP request body, honoring Content-Type (protobuf or JSON) and
// Content-Encoding (gzip or identity). The decoded payload is limited to
// maxSize bytes; bodies that exceed the limit (after decompression) are
// rejected with an explicit size error.
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) {
	expectedSize := int(r.ContentLength)
	// Fast reject based on the declared Content-Length (-1 when unknown,
	// which passes this check and is enforced while reading instead).
	if expectedSize > maxSize {
		return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize)
	}

	contentType := r.Header.Get("Content-Type")
	contentEncoding := r.Header.Get("Content-Encoding")

	var compressionType util.CompressionType
	switch contentEncoding {
	case "gzip":
		compressionType = util.Gzip
	case "":
		compressionType = util.NoCompression
	default:
		return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported compression: %s, Supported compression types are \"gzip\" or '' (no compression)", contentEncoding)
	}

	var decoderFunc func(reader io.Reader) (pmetricotlp.ExportRequest, error)
	switch contentType {
	case pbContentType:
		decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
			req := pmetricotlp.NewExportRequest()
			// otlpProtoMessage adapts the ExportRequest to the proto
			// interfaces ParseProtoReader unmarshals into; size limiting
			// is handled inside ParseProtoReader.
			otlpReqProto := otlpProtoMessage{req: &req}
			return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
		}
	case jsonContentType:
		decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
			req := pmetricotlp.NewExportRequest()

			if compressionType == util.Gzip {
				gzReader, err := gzip.NewReader(reader)
				if err != nil {
					return req, err
				}
				// Best-effort close; the body itself is closed by the caller.
				defer gzReader.Close()
				reader = gzReader
			}
			// Apply the limit AFTER decompression so a small gzipped body
			// cannot expand past maxSize (decompression bomb). Allow one
			// extra byte so an over-limit body is detectable below.
			reader = io.LimitReader(reader, int64(maxSize)+1)

			var buf bytes.Buffer
			if expectedSize > 0 {
				buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
			}
			if _, err := buf.ReadFrom(reader); err != nil {
				return req, err
			}
			// Report an explicit size error instead of letting the JSON
			// decoder fail opaquely on a truncated buffer.
			if buf.Len() > maxSize {
				return req, fmt.Errorf("received message larger than max (%d vs %d)", buf.Len(), maxSize)
			}

			return req, req.UnmarshalJSON(buf.Bytes())
		}
	default:
		return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported content type: %s, supported: [%s, %s]", contentType, jsonContentType, pbContentType)
	}

	return decoderFunc(r.Body)
}

func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, error) {
promConverter := prometheusremotewrite.NewPrometheusConverter()
settings := prometheusremotewrite.Settings{
Expand Down Expand Up @@ -223,3 +290,16 @@ func joinAttributeMaps(from, to pcommon.Map) {
return true
})
}

// otlpProtoMessage implements proto.Message and proto.Unmarshaler, adapting a
// pmetricotlp.ExportRequest so it can be decoded by util.ParseProtoReader.
// Only Unmarshal does real work; the remaining methods are interface stubs.
type otlpProtoMessage struct {
	req *pmetricotlp.ExportRequest
}

// ProtoMessage marks the type as a protobuf message (no-op stub).
func (otlpProtoMessage) ProtoMessage() {}

// Reset is a no-op; each request wraps a freshly created ExportRequest.
func (otlpProtoMessage) Reset() {}

// String returns an empty string; present only to satisfy proto.Message.
func (otlpProtoMessage) String() string { return "" }

// Unmarshal decodes protobuf-encoded bytes into the wrapped ExportRequest.
func (o otlpProtoMessage) Unmarshal(data []byte) error { return o.req.UnmarshalProto(data) }
Loading

0 comments on commit b8156a0

Please sign in to comment.