Skip to content

Commit

Permalink
spanner/request-id: complete TODOs and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Nov 5, 2024
1 parent 921c239 commit 3d50ba7
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 18 deletions.
8 changes: 6 additions & 2 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,13 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
ParamTypes: paramTypes,
}
sh.updateLastUseTime()
// TODO: (@odeke-em) retrieve the requestID and increment the RPC number
// then send it along in every call per retry.

// PartitionQuery does not retry automatically so we don't need to retrieve
// the injected requestID to increment the RPC number on retries.
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), req, gax.WithGRPCOptions(grpc.Header(&md)))
if gcl, ok := client.(*grpcSpannerClient); ok {
gcl.setOrResetRPCID()
}

if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions spanner/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ func (e *Error) decorate(info string) {
// APIError error having given error code as its status.
func spannerErrorf(code codes.Code, format string, args ...interface{}) error {
msg := fmt.Sprintf(format, args...)
return spannerError(code, msg)
}

func spannerError(code codes.Code, msg string) error {
wrapped, _ := apierror.FromError(status.Error(code, msg))
return &Error{
Code: code,
Expand Down
4 changes: 4 additions & 0 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
MethodExecuteBatchDml string = "EXECUTE_BATCH_DML"
MethodStreamingRead string = "EXECUTE_STREAMING_READ"
MethodBatchWrite string = "BATCH_WRITE"
MethodPartitionQuery string = "PARTITION_QUERY"
)

// StatementResult represents a mocked result on the test server. The result is
Expand Down Expand Up @@ -1107,6 +1108,9 @@ func (s *inMemSpannerServer) Rollback(ctx context.Context, req *spannerpb.Rollba
}

func (s *inMemSpannerServer) PartitionQuery(ctx context.Context, req *spannerpb.PartitionQueryRequest) (*spannerpb.PartitionResponse, error) {
if err := s.simulateExecutionTime(MethodPartitionQuery, req); err != nil {
return nil, err
}
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
Expand Down
13 changes: 9 additions & 4 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,19 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
var md metadata.MD
sh.updateLastUseTime()
// Begin transaction.
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
client := sh.getClient()
res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
ExcludeTxnFromChangeStreams: options.ExcludeTxnFromChangeStreams,
},
})
// This function is invoked afresh on every retry and it retrieves a fresh client
// each time hence does not need an extraction and increment of the injected spanner requestId.
if gcl, ok := client.(*grpcSpannerClient); ok {
defer gcl.setOrResetRPCID()
}
if err != nil {
return 0, ToSpannerError(err)
}
Expand All @@ -126,9 +132,8 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
}

sh.updateLastUseTime()
// TODO: (@odeke-em) retrieve the requestID and increment the RPC number
// then send it along in every call per retry.
resultSet, err := sh.getClient().ExecuteSql(ctx, req, gax.WithGRPCOptions(grpc.Header(&md)))

resultSet, err := client.ExecuteSql(ctx, req, gax.WithGRPCOptions(grpc.Header(&md)))
if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil {
err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
if err != nil {
Expand Down
Loading

0 comments on commit 3d50ba7

Please sign in to comment.