Skip to content

Commit

Permalink
Reusing the grpc client to perform healthcheck (#6260)
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <alanprot@gmail.com>
  • Loading branch information
alanprot authored Oct 17, 2024
1 parent 441ed1d commit e455bda
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 32 deletions.
101 changes: 70 additions & 31 deletions pkg/util/grpcclient/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package grpcclient

import (
"context"
"errors"
"flag"
"fmt"
"io"
Expand All @@ -11,41 +10,49 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/util/services"
)

var (
unhealthyErr = errors.New("instance marked as unhealthy")
unhealthyErr = status.Error(codes.Unavailable, "instance marked as unhealthy")
)

type HealthCheckConfig struct {
*HealthCheckInterceptors `yaml:"-"`

UnhealthyThreshold int `yaml:"unhealthy_threshold"`
UnhealthyThreshold int64 `yaml:"unhealthy_threshold"`
Interval time.Duration `yaml:"interval"`
Timeout time.Duration `yaml:"timeout"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *HealthCheckConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.UnhealthyThreshold, prefix+".unhealthy-threshold", 0, "The number of consecutive failed health checks required before considering a target unhealthy. 0 means disabled.")
f.Int64Var(&cfg.UnhealthyThreshold, prefix+".unhealthy-threshold", 0, "The number of consecutive failed health checks required before considering a target unhealthy. 0 means disabled.")
f.DurationVar(&cfg.Timeout, prefix+".timeout", 1*time.Second, "The amount of time during which no response from a target means a failed health check.")
f.DurationVar(&cfg.Interval, prefix+".interval", 5*time.Second, "The approximate amount of time between health checks of an individual target.")
}

type healthCheckEntry struct {
address string
clientConfig *ConfigWithHealthCheck
// healthCheckClient pairs a gRPC health client with the io.Closer that the
// client factory returns alongside it, so a single value can both issue
// health checks and be closed once it is no longer needed.
type healthCheckClient struct {
grpc_health_v1.HealthClient
io.Closer
}

sync.RWMutex
unhealthyCount int
type healthCheckEntry struct {
address string
clientConfig *ConfigWithHealthCheck
lastCheckTime atomic.Time
lastTickTime atomic.Time
unhealthyCount atomic.Int64

healthCheckClientMutex sync.RWMutex
healthCheckClient *healthCheckClient
}

type HealthCheckInterceptors struct {
Expand Down Expand Up @@ -75,18 +82,14 @@ func NewHealthCheckInterceptors(logger log.Logger) *HealthCheckInterceptors {
}

func (e *healthCheckEntry) isHealthy() bool {
e.RLock()
defer e.RUnlock()
return e.unhealthyCount < e.clientConfig.HealthCheckConfig.UnhealthyThreshold
return e.unhealthyCount.Load() < e.clientConfig.HealthCheckConfig.UnhealthyThreshold
}

func (e *healthCheckEntry) recordHealth(err error) error {
e.Lock()
defer e.Unlock()
if err != nil {
e.unhealthyCount++
e.unhealthyCount.Inc()
} else {
e.unhealthyCount = 0
e.unhealthyCount.Store(0)
}

return err
Expand All @@ -96,6 +99,51 @@ func (e *healthCheckEntry) tick() {
e.lastTickTime.Store(time.Now())
}

// close shuts down the cached health check client, if one exists, and clears
// the cached reference. It returns the error reported by the client's Close,
// or nil when there was no cached client to close.
func (e *healthCheckEntry) close() error {
	e.healthCheckClientMutex.Lock()
	defer e.healthCheckClientMutex.Unlock()

	cached := e.healthCheckClient
	if cached == nil {
		return nil
	}

	// Close first, then forget the client even if Close failed, so a later
	// getClient call builds a fresh one.
	closeErr := cached.Close()
	e.healthCheckClient = nil
	return closeErr
}

// getClient returns the health check client cached on this entry, creating
// and caching it on first use.
//
// factory converts the freshly created gRPC connection into a health client
// and the io.Closer that is stored with it. An error from building the dial
// options or from creating the connection is returned as-is, and nothing is
// cached in that case.
func (e *healthCheckEntry) getClient(factory func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer)) (*healthCheckClient, error) {
	// Fast path: an already cached client only needs the read lock.
	e.healthCheckClientMutex.RLock()
	cached := e.healthCheckClient
	e.healthCheckClientMutex.RUnlock()
	if cached != nil {
		return cached, nil
	}

	e.healthCheckClientMutex.Lock()
	defer e.healthCheckClientMutex.Unlock()

	// Another goroutine may have created the client between releasing the
	// read lock and acquiring the write lock.
	if e.healthCheckClient != nil {
		return e.healthCheckClient, nil
	}

	opts, err := e.clientConfig.Config.DialOption(nil, nil)
	if err != nil {
		return nil, err
	}

	conn, err := grpc.NewClient(e.address, opts...)
	if err != nil {
		return nil, err
	}

	hc, closer := factory(conn)
	e.healthCheckClient = &healthCheckClient{
		HealthClient: hc,
		Closer:       closer,
	}

	return e.healthCheckClient, nil
}

func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry {
h.RLock()
defer h.RUnlock()
Expand All @@ -112,6 +160,9 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
for _, instance := range h.registeredInstances() {
if time.Since(instance.lastTickTime.Load()) >= h.instanceGcTimeout {
h.Lock()
if err := instance.close(); err != nil {
level.Warn(h.logger).Log("msg", "Error closing health check", "err", err)
}
delete(h.activeInstances, instance.address)
h.Unlock()
continue
Expand All @@ -124,25 +175,13 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
instance.lastCheckTime.Store(time.Now())

go func(i *healthCheckEntry) {
dialOpts, err := i.clientConfig.Config.DialOption(nil, nil)
if err != nil {
level.Error(h.logger).Log("msg", "error creating dialOpts to perform healthcheck", "address", i.address, "err", err)
return
}
conn, err := grpc.NewClient(i.address, dialOpts...)
client, err := i.getClient(h.healthClientFactory)

if err != nil {
level.Error(h.logger).Log("msg", "error creating client to perform healthcheck", "address", i.address, "err", err)
level.Error(h.logger).Log("msg", "error creating healthcheck client to perform healthcheck", "address", i.address, "err", err)
return
}

client, closer := h.healthClientFactory(conn)

defer func() {
if err := closer.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)
}
}()

if err := i.recordHealth(healthCheck(client, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/util/grpcclient/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

utillog "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -136,7 +138,8 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
require.False(t, hMock.open.Load())

cortex_testutil.Poll(t, time.Second, true, func() interface{} {
return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr)
err := ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker)
return errors.Is(err, unhealthyErr) || status.Code(err) == codes.Unavailable
})

// Other instances should remain healthy
Expand Down

0 comments on commit e455bda

Please sign in to comment.