-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(spanner): implement generation and propagation of "x-goog-spanner-request-id" Header #11048
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1318,10 +1318,21 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup | |
return &BatchWriteResponseIterator{meterTracerFactory: c.metricsTracerFactory, err: err} | ||
} | ||
|
||
nRPCs := uint64(0) | ||
rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) { | ||
var md metadata.MD | ||
sh.updateLastUseTime() | ||
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{ | ||
nRPCs++ | ||
|
||
// Firstly set the number of retries as the RPCID. | ||
client := sh.getClient() | ||
gcl, ok := client.(*grpcSpannerClient) | ||
if ok { | ||
gcl.setRPCID(nRPCs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to assume that there will be only one active request for a gRPC client at the same time. That does not seem correct for two reasons:
|
||
defer gcl.setOrResetRPCID() | ||
} | ||
|
||
stream, rpcErr := client.BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{ | ||
Session: sh.getID(), | ||
MutationGroups: mgsPb, | ||
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag), | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -18,14 +18,19 @@ | |||
|
||||
import ( | ||||
"context" | ||||
"fmt" | ||||
"math/rand" | ||||
"strings" | ||||
"sync/atomic" | ||||
"time" | ||||
|
||||
vkit "cloud.google.com/go/spanner/apiv1" | ||||
"cloud.google.com/go/spanner/apiv1/spannerpb" | ||||
"cloud.google.com/go/spanner/internal" | ||||
"github.com/googleapis/gax-go/v2" | ||||
"google.golang.org/api/option" | ||||
"google.golang.org/grpc" | ||||
"google.golang.org/grpc/metadata" | ||||
"google.golang.org/grpc/peer" | ||||
"google.golang.org/grpc/status" | ||||
) | ||||
|
@@ -65,10 +70,43 @@ | |||
// grpcSpannerClient is the gRPC API implementation of the transport-agnostic | ||||
// spannerClient interface. | ||||
type grpcSpannerClient struct { | ||||
id uint64 | ||||
raw *vkit.Client | ||||
metricsTracerFactory *builtinMetricsTracerFactory | ||||
|
||||
// These fields are used to uniquely track x-goog-spanner-request-id | ||||
// grpc.ClientConn is presumed to be the channel, hence channelID | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||||
// is redundant. However, is it correct to presume that raw.Connection() | ||||
// will always be the same throughout the lifetime of a grcpSpannerClient? | ||||
channelID uint64 | ||||
// nthRequest shall always be incremented on every fresh request. | ||||
nthRequest *atomic.Uint32 | ||||
// This id uniquely defines the RPC being issued and in | ||||
// the case of retries it should be incremented. | ||||
rpcID *atomic.Uint64 | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) setOrResetRPCID() { | ||||
if g.rpcID == nil { | ||||
g.rpcID = new(atomic.Uint64) | ||||
} | ||||
g.rpcID.Store(1) | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) setRPCID(rpcID uint64) { | ||||
g.rpcID.Store(rpcID) | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) prepareRequestIDTrackers() { | ||||
g.id = nGRPCClient.Add(1) | ||||
g.nthRequest = new(atomic.Uint32) | ||||
g.channelID = 1 // Assuming that .raw.Connection() never changes. | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should not be fixed at For multiplexed sessions, we do that here: google-cloud-go/spanner/session.go Line 1102 in 45e1ce7
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above also shows why the current strategy of assuming that a |
||||
g.nthRequest = new(atomic.Uint32) | ||||
g.setOrResetRPCID() | ||||
} | ||||
|
||||
var nGRPCClient = new(atomic.Uint64) | ||||
|
||||
var ( | ||||
// Ensure that grpcSpannerClient implements spannerClient. | ||||
_ spannerClient = (*grpcSpannerClient)(nil) | ||||
|
@@ -83,6 +121,8 @@ | |||
} | ||||
|
||||
g := &grpcSpannerClient{raw: raw, metricsTracerFactory: sc.metricsTracerFactory} | ||||
g.prepareRequestIDTrackers() | ||||
|
||||
clientInfo := []string{"gccl", internal.Version} | ||||
if sc.userAgent != "" { | ||||
agentWithVersion := strings.SplitN(sc.userAgent, "/", 2) | ||||
|
@@ -118,7 +158,7 @@ | |||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.CreateSession(ctx, req, opts...) | ||||
resp, err := g.raw.CreateSession(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
|
@@ -128,7 +168,7 @@ | |||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.BatchCreateSessions(ctx, req, opts...) | ||||
resp, err := g.raw.BatchCreateSessions(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
|
@@ -138,45 +178,67 @@ | |||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.GetSession(ctx, req, opts...) | ||||
resp, err := g.raw.GetSession(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) ListSessions(ctx context.Context, req *spannerpb.ListSessionsRequest, opts ...gax.CallOption) *vkit.SessionIterator { | ||||
return g.raw.ListSessions(ctx, req, opts...) | ||||
return g.raw.ListSessions(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) DeleteSession(ctx context.Context, req *spannerpb.DeleteSessionRequest, opts ...gax.CallOption) error { | ||||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
err := g.raw.DeleteSession(ctx, req, opts...) | ||||
err := g.raw.DeleteSession(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return err | ||||
} | ||||
|
||||
var randIdForProcess uint32 | ||||
|
||||
func init() { | ||||
randIdForProcess = rand.New(rand.NewSource(time.Now().UnixNano())).Uint32() | ||||
} | ||||
|
||||
const xSpannerRequestIDHeader = "x-goog-spanner-request-id" | ||||
|
||||
// optsWithNextRequestID bundles priors with a new header "x-goog-spanner-request-id" | ||||
func (g *grpcSpannerClient) optsWithNextRequestID(priors []gax.CallOption) []gax.CallOption { | ||||
// TODO: Decide if each field should be padded and to what width or | ||||
// should we just let fields fill up so as to reduce bandwidth? | ||||
// Go creates grpc.ClientConn which is presumed to be a channel, so channelID is going to be redundant. | ||||
requestID := fmt.Sprintf("%d.%d.%d.%d.%d", randIdForProcess, g.id, g.nextNthRequest(), g.channelID, g.rpcID.Load()) | ||||
md := metadata.MD{xSpannerRequestIDHeader: []string{requestID}} | ||||
return append(priors, gax.WithGRPCOptions(grpc.Header(&md))) | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) nextNthRequest() uint32 { | ||||
return g.nthRequest.Add(1) | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (*spannerpb.ResultSet, error) { | ||||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.ExecuteSql(ctx, req, opts...) | ||||
resp, err := g.raw.ExecuteSql(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) ExecuteStreamingSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (spannerpb.Spanner_ExecuteStreamingSqlClient, error) { | ||||
return g.raw.ExecuteStreamingSql(peer.NewContext(ctx, &peer.Peer{}), req, opts...) | ||||
return g.raw.ExecuteStreamingSql(peer.NewContext(ctx, &peer.Peer{}), req, g.optsWithNextRequestID(opts)...) | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) ExecuteBatchDml(ctx context.Context, req *spannerpb.ExecuteBatchDmlRequest, opts ...gax.CallOption) (*spannerpb.ExecuteBatchDmlResponse, error) { | ||||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.ExecuteBatchDml(ctx, req, opts...) | ||||
resp, err := g.raw.ExecuteBatchDml(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
|
@@ -186,21 +248,21 @@ | |||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.Read(ctx, req, opts...) | ||||
resp, err := g.raw.Read(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) StreamingRead(ctx context.Context, req *spannerpb.ReadRequest, opts ...gax.CallOption) (spannerpb.Spanner_StreamingReadClient, error) { | ||||
return g.raw.StreamingRead(peer.NewContext(ctx, &peer.Peer{}), req, opts...) | ||||
return g.raw.StreamingRead(peer.NewContext(ctx, &peer.Peer{}), req, g.optsWithNextRequestID(opts)...) | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest, opts ...gax.CallOption) (*spannerpb.Transaction, error) { | ||||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.BeginTransaction(ctx, req, opts...) | ||||
resp, err := g.raw.BeginTransaction(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
|
@@ -210,7 +272,7 @@ | |||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.Commit(ctx, req, opts...) | ||||
resp, err := g.raw.Commit(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
|
@@ -220,7 +282,7 @@ | |||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
err := g.raw.Rollback(ctx, req, opts...) | ||||
err := g.raw.Rollback(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return err | ||||
|
@@ -230,7 +292,7 @@ | |||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.PartitionQuery(ctx, req, opts...) | ||||
resp, err := g.raw.PartitionQuery(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
|
@@ -240,12 +302,12 @@ | |||
mt := g.newBuiltinMetricsTracer(ctx) | ||||
defer recordOperationCompletion(mt) | ||||
ctx = context.WithValue(ctx, metricsTracerKey, mt) | ||||
resp, err := g.raw.PartitionRead(ctx, req, opts...) | ||||
resp, err := g.raw.PartitionRead(ctx, req, g.optsWithNextRequestID(opts)...) | ||||
statusCode, _ := status.FromError(err) | ||||
mt.currOp.setStatus(statusCode.Code().String()) | ||||
return resp, err | ||||
} | ||||
|
||||
func (g *grpcSpannerClient) BatchWrite(ctx context.Context, req *spannerpb.BatchWriteRequest, opts ...gax.CallOption) (spannerpb.Spanner_BatchWriteClient, error) { | ||||
return g.raw.BatchWrite(peer.NewContext(ctx, &peer.Peer{}), req, opts...) | ||||
return g.raw.BatchWrite(peer.NewContext(ctx, &peer.Peer{}), req, g.optsWithNextRequestID(opts)...) | ||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean with this?
PartitionQuery
retries if it receives anUNAVAILABLE
error (same as most unary RPCs). See https://github.com/googleapis/googleapis/blob/master/google/spanner/v1/spanner_grpc_service_config.json for the default RPC configuration.