Skip to content

Commit

Permalink
spanner/request: add StreamingCall checks from tests
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Nov 1, 2024
1 parent 563efb7 commit 1938e60
Showing 1 changed file with 339 additions and 2 deletions.
341 changes: 339 additions & 2 deletions spanner/request_id_header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func checkForMissingSpannerRequestIDHeader(opts []grpc.CallOption) (*requestIDSe
}
}
}

if requestID == "" {
return nil, status.Errorf(codes.InvalidArgument, "missing %q header", xSpannerRequestIDHeader)
}

if !regRequestID.MatchString(requestID) {
return nil, status.Errorf(codes.InvalidArgument, "requestID does not conform to pattern=%q", regRequestID.String())
}
Expand Down Expand Up @@ -332,7 +332,7 @@ func newInterceptorTracker() *interceptorTracker {
}
}

func TestRequestIDHeader_updatedOnRetries(t *testing.T) {
func TestRequestIDHeader_onRetriesWithFailedTransactionCommit(t *testing.T) {
interceptorTracker := newInterceptorTracker()
clientOpts := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptorTracker.unaryClientInterceptor)),
Expand Down Expand Up @@ -386,3 +386,340 @@ func TestRequestIDHeader_updatedOnRetries(t *testing.T) {
t.Fatal(err)
}
}

// Tests that SessionNotFound errors are retried.
func TestRequestIDHeader_retriesOnSessionNotFound(t *testing.T) {
t.Parallel()
ctx := context.Background()
interceptorTracker := newInterceptorTracker()
clientOpts := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptorTracker.unaryClientInterceptor)),
option.WithGRPCDialOption(grpc.WithStreamInterceptor(interceptorTracker.streamClientInterceptor)),
}
clientConfig := ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 2,
MaxOpened: 10,
WriteSessions: 0.2,
incStep: 2,
},
}
server, sc, tearDown := setupMockedTestServerWithConfigAndClientOptions(t, clientConfig, clientOpts)
t.Cleanup(tearDown)
defer sc.Close()

serverErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
server.TestSpanner.PutExecutionTime(testutil.MethodBeginTransaction,
testutil.SimulatedExecutionTime{
Errors: []error{serverErr, serverErr, serverErr},
})
server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction,
testutil.SimulatedExecutionTime{
Errors: []error{serverErr},
})

txn := sc.ReadOnlyTransaction()
defer txn.Close()

var wantErr error
if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
t.Fatalf("Expect acquire to succeed, got %v, want %v.", got, wantErr)
}

// The server error should lead to a retry of the BeginTransaction call and
// a valid session handle to be returned that will be used by the following
// requests. Note that calling txn.Query(...) does not actually send the
// query to the (mock) server. That is done at the first call to
// RowIterator.Next. The following statement only verifies that the
// transaction is in a valid state and received a valid session handle.
if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
t.Fatalf("Expect Query to succeed, got %v, want %v.", got.err, wantErr)
}

if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr)
}

wantErr = ToSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []any{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []any{int64(2), "Bar", int64(1)}),
}
_, got := sc.Apply(ctx, ms, ApplyAtLeastOnce())
if !testEqual(wantErr, got) {
t.Fatalf("Expect Apply to fail\nGot: %v\nWant: %v\n", got, wantErr)
}

if g, w := interceptorTracker.unaryCallCount(), uint64(8); g != w {
t.Errorf("unaryClientCall is incorrect; got=%d want=%d", g, w)
}
if g := interceptorTracker.streamCallCount(); g > 0 {
t.Errorf("streamClientCall was unexpectedly invoked %d times", g)
}

if err := interceptorTracker.validateRequestIDsMonotonicity(); err != nil {
t.Fatal(err)
}
}

func TestRequestIDHeader_BatchDMLWithMultipleDML(t *testing.T) {
t.Parallel()

interceptorTracker := newInterceptorTracker()
clientOpts := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptorTracker.unaryClientInterceptor)),
option.WithGRPCDialOption(grpc.WithStreamInterceptor(interceptorTracker.streamClientInterceptor)),
}
clientConfig := ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 2,
MaxOpened: 10,
WriteSessions: 0.2,
incStep: 2,
},
}

ctx := context.Background()
server, sc, tearDown := setupMockedTestServerWithConfigAndClientOptions(t, clientConfig, clientOpts)
t.Cleanup(tearDown)
defer sc.Close()

updateBarSetFoo := testutil.UpdateBarSetFoo
_, err := sc.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
if _, err = tx.Update(ctx, Statement{SQL: updateBarSetFoo}); err != nil {
return err
}
if _, err = tx.BatchUpdate(ctx, []Statement{{SQL: updateBarSetFoo}, {SQL: updateBarSetFoo}}); err != nil {
return err
}
if _, err = tx.Update(ctx, Statement{SQL: updateBarSetFoo}); err != nil {
return err
}
_, err = tx.BatchUpdate(ctx, []Statement{{SQL: updateBarSetFoo}})
return err
})
if err != nil {
t.Fatal(err)
}

gotReqs, err := shouldHaveReceived(server.TestSpanner, []any{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteBatchDmlRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.ExecuteBatchDmlRequest{},
&sppb.CommitRequest{},
})
if err != nil {
t.Fatal(err)
}
muxCreateBuffer := 0
if isMultiplexEnabled {
muxCreateBuffer = 1
}
if got, want := gotReqs[1+muxCreateBuffer].(*sppb.ExecuteSqlRequest).Seqno, int64(1); got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := gotReqs[2+muxCreateBuffer].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(2); got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := gotReqs[3+muxCreateBuffer].(*sppb.ExecuteSqlRequest).Seqno, int64(3); got != want {
t.Errorf("got %d, want %d", got, want)
}
if got, want := gotReqs[4+muxCreateBuffer].(*sppb.ExecuteBatchDmlRequest).Seqno, int64(4); got != want {
t.Errorf("got %d, want %d", got, want)
}

if g, w := interceptorTracker.unaryCallCount(), uint64(6); g != w {
t.Errorf("unaryClientCall is incorrect; got=%d want=%d", g, w)
}
if g := interceptorTracker.streamCallCount(); g > 0 {
t.Errorf("streamClientCall was unexpectedly invoked %d times", g)
}

if err := interceptorTracker.validateRequestIDsMonotonicity(); err != nil {
t.Fatal(err)
}
}

func TestRequestIDHeader_clientBatchWrite(t *testing.T) {
t.Parallel()

interceptorTracker := newInterceptorTracker()
clientOpts := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptorTracker.unaryClientInterceptor)),
option.WithGRPCDialOption(grpc.WithStreamInterceptor(interceptorTracker.streamClientInterceptor)),
}
clientConfig := ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 2,
MaxOpened: 10,
WriteSessions: 0.2,
incStep: 2,
},
}

server, sc, tearDown := setupMockedTestServerWithConfigAndClientOptions(t, clientConfig, clientOpts)
t.Cleanup(tearDown)
defer sc.Close()

mutationGroups := []*MutationGroup{
{[]*Mutation{
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []any{"foo1", 1}},
}},
}
iter := sc.BatchWrite(context.Background(), mutationGroups)
responseCount := 0
doFunc := func(r *sppb.BatchWriteResponse) error {
responseCount++
return nil
}
if err := iter.Do(doFunc); err != nil {
t.Fatal(err)
}
if responseCount != len(mutationGroups) {
t.Fatalf("Response count mismatch.\nGot: %v\nWant:%v", responseCount, len(mutationGroups))
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]any{
&sppb.BatchCreateSessionsRequest{},
&sppb.BatchWriteRequest{},
}, requests); err != nil {
t.Fatal(err)
}

if g, w := interceptorTracker.unaryCallCount(), uint64(1); g != w {
t.Errorf("unaryClientCall is incorrect; got=%d want=%d", g, w)
}
if g, w := interceptorTracker.streamCallCount(), uint64(1); g != w {
t.Errorf("streamClientCall is incorrect; got=%d want=%d", g, w)
}

if err := interceptorTracker.validateRequestIDsMonotonicity(); err != nil {
t.Fatal(err)
}
}

func TestRequestIDHeader_ClientBatchWriteWithSessionNotFound(t *testing.T) {
t.Parallel()

interceptorTracker := newInterceptorTracker()
clientOpts := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptorTracker.unaryClientInterceptor)),
option.WithGRPCDialOption(grpc.WithStreamInterceptor(interceptorTracker.streamClientInterceptor)),
}
clientConfig := ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 2,
MaxOpened: 10,
WriteSessions: 0.2,
incStep: 2,
},
}

server, sc, tearDown := setupMockedTestServerWithConfigAndClientOptions(t, clientConfig, clientOpts)
t.Cleanup(tearDown)
defer sc.Close()

server.TestSpanner.PutExecutionTime(
testutil.MethodBatchWrite,
testutil.SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
mutationGroups := []*MutationGroup{
{[]*Mutation{
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []any{"foo1", 1}},
}},
}
iter := sc.BatchWrite(context.Background(), mutationGroups)
responseCount := 0
doFunc := func(r *sppb.BatchWriteResponse) error {
responseCount++
return nil
}
if err := iter.Do(doFunc); err != nil {
t.Fatal(err)
}
if responseCount != len(mutationGroups) {
t.Fatalf("Response count mismatch.\nGot: %v\nWant:%v", responseCount, len(mutationGroups))
}

requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]any{
&sppb.BatchCreateSessionsRequest{},
&sppb.BatchWriteRequest{},
&sppb.BatchWriteRequest{},
}, requests); err != nil {
t.Fatal(err)
}

if g, w := interceptorTracker.unaryCallCount(), uint64(1); g != w {
t.Errorf("unaryClientCall is incorrect; got=%d want=%d", g, w)
}

// We had a retry for BatchWrite after the first SessionNotFound error, hence expecting 2 calls.
if g, w := interceptorTracker.streamCallCount(), uint64(2); g != w {
t.Errorf("streamClientCall is incorrect; got=%d want=%d", g, w)
}

if err := interceptorTracker.validateRequestIDsMonotonicity(); err != nil {
t.Fatal(err)
}
}

func TestRequestIDHeader_ClientBatchWriteWithError(t *testing.T) {
t.Parallel()

interceptorTracker := newInterceptorTracker()
clientOpts := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptorTracker.unaryClientInterceptor)),
option.WithGRPCDialOption(grpc.WithStreamInterceptor(interceptorTracker.streamClientInterceptor)),
}
clientConfig := ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 2,
MaxOpened: 10,
WriteSessions: 0.2,
incStep: 2,
},
}

server, sc, tearDown := setupMockedTestServerWithConfigAndClientOptions(t, clientConfig, clientOpts)
t.Cleanup(tearDown)
defer sc.Close()

injectedErr := status.Error(codes.InvalidArgument, "Invalid argument")
server.TestSpanner.PutExecutionTime(
testutil.MethodBatchWrite,
testutil.SimulatedExecutionTime{Errors: []error{injectedErr}},
)
mutationGroups := []*MutationGroup{
{[]*Mutation{
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []any{"foo1", 1}},
}},
}
iter := sc.BatchWrite(context.Background(), mutationGroups)
responseCount := 0
doFunc := func(r *sppb.BatchWriteResponse) error {
responseCount++
return nil
}
if err := iter.Do(doFunc); err != nil {
t.Fatal(err)
}
if responseCount != 0 {
t.Fatalf("Do unexpectedly called %d times", responseCount)
}

if g, w := interceptorTracker.unaryCallCount(), uint64(1); g != w {
t.Errorf("unaryClientCall is incorrect; got=%d want=%d", g, w)
}

// We had a straight-up failure after the first BatchWrite call so only 1 call.
if g, w := interceptorTracker.streamCallCount(), uint64(1); g != w {
t.Errorf("streamClientCall is incorrect; got=%d want=%d", g, w)
}

if err := interceptorTracker.validateRequestIDsMonotonicity(); err != nil {
t.Fatal(err)
}
}

0 comments on commit 1938e60

Please sign in to comment.