diff --git a/plank/pkg/server/core_models.go b/plank/pkg/server/core_models.go index 1c1d771..2e5bd2f 100644 --- a/plank/pkg/server/core_models.go +++ b/plank/pkg/server/core_models.go @@ -12,6 +12,7 @@ import ( "github.com/vmware/transport-go/plank/utils" "github.com/vmware/transport-go/service" "github.com/vmware/transport-go/stompserver" + "io" "net/http" "os" "sync" @@ -66,22 +67,24 @@ type PlatformServer interface { type platformServer struct { HttpServer *http.Server // Http server instance SyscallChan chan os.Signal // syscall channel to receive SIGINT, SIGKILL events + eventbus bus.EventBus // event bus pointer serverConfig *PlatformServerConfig // server config instance middlewareManager middleware.MiddlewareManager // middleware maanger instance router *mux.Router // *mux.Router instance routerConcurrencyProtection *int32 // atomic int32 to protect the main router being concurrently written to - out *os.File // platform log output pointer + out io.Writer // platform log output pointer endpointHandlerMap map[string]http.HandlerFunc // internal map to store rest endpoint -handler mappings serviceChanToBridgeEndpoints map[string][]string // internal map to store service channel - endpoint handler key mappings fabricConn stompserver.RawConnectionListener // WebSocket listener instance ServerAvailability *ServerAvailability // server availability (not much used other than for internal monitoring for now) lock sync.Mutex // lock + messageBridgeMap map[string]*MessageBridge } -// TransportChannelResponse wraps Transport *Message.Message with an error object for easier transfer -type TransportChannelResponse struct { - Message *model.Message // wrapper object that contains the payload - Err error // error object if there is any +// MessageBridge is a conduit used for returning service responses as HTTP responses +type MessageBridge struct { + ServiceListenStream bus.MessageHandler // message handler returned by bus.ListenStream responsible for relaying back messages as HTTP responses + payloadChannel chan *model.Message // internal golang channel used for passing bus responses/errors across goroutines } // ServerAvailability contains boolean fields to indicate what components of the system are available or not diff --git a/plank/pkg/server/endpointer_handler_factory.go b/plank/pkg/server/endpointer_handler_factory.go index 3f7978d..19e0156 100644 --- a/plank/pkg/server/endpointer_handler_factory.go +++ b/plank/pkg/server/endpointer_handler_factory.go @@ -4,8 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/sirupsen/logrus" - "github.com/vmware/transport-go/bus" "github.com/vmware/transport-go/model" "github.com/vmware/transport-go/plank/utils" "github.com/vmware/transport-go/service" @@ -16,7 +14,7 @@ import ( // buildEndpointHandler builds a http.HandlerFunc that wraps Transport Bus operations in an HTTP request-response cycle. // service channel, request builder and rest bridge timeout are passed as parameters. -func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, restBridgeTimeout time.Duration) http.HandlerFunc { +func (ps *platformServer) buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, restBridgeTimeout time.Duration, msgChan chan *model.Message) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { defer func() { if r := recover(); r != nil { @@ -28,23 +26,10 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, // set context that expires after the provided amount of time in restBridgeTimeout to prevent requests from hanging forever ctx, cancelFn := context.WithTimeout(context.Background(), restBridgeTimeout) defer cancelFn() - h, err := bus.GetBus().ListenOnce(svcChannel) - if err != nil { - panic(err) - } - - // set up a channel through which to receive the raw response from transport channel - // handler function runs in another thread so we need to utilize channel to use the correct writer. - chanReturn := make(chan *TransportChannelResponse) - h.Handle(func(message *model.Message) { - chanReturn <- &TransportChannelResponse{Message: message} - }, func(err error) { - chanReturn <- &TransportChannelResponse{Err: err} - }) // relay the request to transport channel reqModel := reqBuilder(w, r) - err = bus.GetBus().SendRequestMessage(svcChannel, reqModel, reqModel.Id) + err := ps.eventbus.SendRequestMessage(svcChannel, reqModel, reqModel.Id) // get a response from the channel, render the results using ResponseWriter and log the data/error // to the console as well. @@ -53,14 +38,14 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, http.Error( w, fmt.Sprintf("No response received from service channel in %s, request timed out", restBridgeTimeout.String()), 500) - case chanResponse := <-chanReturn: - if chanResponse.Err != nil { - utils.Log.WithError(chanResponse.Err).Errorf( + case msg := <-msgChan: + if msg.Error != nil { + utils.Log.WithError(msg.Error).Errorf( "Error received from channel %s:", svcChannel) - http.Error(w, chanResponse.Err.Error(), 500) + http.Error(w, msg.Error.Error(), 500) } else { - // only send the actual user payload not wrapper information - response := chanResponse.Message.Payload.(*model.Response) + // only send the actual user payloadChannel not wrapper information + response := msg.Payload.(*model.Response) var respBody interface{} if response.Error { if response.Payload != nil { @@ -72,10 +57,6 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, respBody = response.Payload } - utils.Log.WithFields(logrus.Fields{ - //"payload": respBody, // don't show this, we may be sending around big byte arrays - }).Debugf("Response received from channel %s:", svcChannel) - // if our Message is an error and it has a code, lets send that back to the client. if response.Error { diff --git a/plank/pkg/server/endpointer_handler_factory_test.go b/plank/pkg/server/endpointer_handler_factory_test.go index c7b2497..b4fb94e 100644 --- a/plank/pkg/server/endpointer_handler_factory_test.go +++ b/plank/pkg/server/endpointer_handler_factory_test.go @@ -9,6 +9,7 @@ import ( "github.com/vmware/transport-go/model" "github.com/vmware/transport-go/service" "net/http" + "os" "testing" "time" ) @@ -16,53 +17,73 @@ import ( func TestBuildEndpointHandler_Timeout(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { return model.Request{ Payload: nil, Request: "test-request", } - }, 5*time.Millisecond), "GET", "http://localhost", nil, "request timed out") + }, 5*time.Millisecond, msgChan), "GET", "http://localhost", nil, "request timed out") } func TestBuildEndpointHandler_ChanResponseErr(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") - assert.HTTPErrorf(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + assert.HTTPErrorf(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { uId := &uuid.UUID{} - _ = b.SendErrorMessage("test-chan", fmt.Errorf("test error"), uId) + msgChan <- &model.Message{Error: fmt.Errorf("test error")} return model.Request{ Id: uId, Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, "test error") + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, "test error") } func TestBuildEndpointHandler_SuccessResponse(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { uId := &uuid.UUID{} - _ = b.SendResponseMessage("test-chan", &model.Response{ + msgChan <- &model.Message{Payload: &model.Response{ Id: uId, Payload: []byte("{\"error\": false}"), - }, uId) + }} return model.Request{ Id: uId, Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, "{\"error\": false}") + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, "{\"error\": false}") } func TestBuildEndpointHandler_ErrorResponse(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() _ = b.GetChannelManager().CreateChannel("test-chan") + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + msgChan := make(chan *model.Message, 1) uId := &uuid.UUID{} rsp := &model.Response{ Id: uId, @@ -72,21 +93,26 @@ func TestBuildEndpointHandler_ErrorResponse(t *testing.T) { } expected, _ := json.Marshal(rsp.Payload) - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { - _ = b.SendResponseMessage("test-chan", rsp, uId) + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + msgChan <- &model.Message{Payload: rsp} return model.Request{ Id: uId, Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, string(expected)) + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, string(expected)) } func TestBuildEndpointHandler_ErrorResponseAlternative(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b uId := &uuid.UUID{} rsp := &model.Response{ @@ -95,26 +121,31 @@ func TestBuildEndpointHandler_ErrorResponseAlternative(t *testing.T) { Error: true, } - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { - _ = b.SendResponseMessage("test-chan", rsp, uId) + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + msgChan <- &model.Message{Payload: rsp} return model.Request{ Id: uId, Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, "418") + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, "418") } func TestBuildEndpointHandler_CatchPanic(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { panic("peekaboo") return model.Request{ Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, "Internal Server Error") + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, "Internal Server Error") } diff --git a/plank/pkg/server/initialize.go b/plank/pkg/server/initialize.go index 7bb2742..276af09 100644 --- a/plank/pkg/server/initialize.go +++ b/plank/pkg/server/initialize.go @@ -14,8 +14,10 @@ import ( "github.com/vmware/transport-go/stompserver" "log" "net/http" + _ "net/http/pprof" "path/filepath" "reflect" + "runtime" "time" ) @@ -25,12 +27,11 @@ func (ps *platformServer) initialize() { var err error // initialize core components - var busInstance = bus.GetBus() var serviceRegistryInstance = service.GetServiceRegistry() var svcLifecycleManager = service.GetServiceLifecycleManager() // create essential bus channels - busInstance.GetChannelManager().CreateChannel(PLANK_SERVER_ONLINE_CHANNEL) + ps.eventbus.GetChannelManager().CreateChannel(PLANK_SERVER_ONLINE_CHANNEL) // initialize HTTP endpoint handlers map ps.endpointHandlerMap = map[string]http.HandlerFunc{} @@ -49,10 +50,18 @@ func (ps *platformServer) initialize() { utils.Log.SetFormatter(formatter) utils.Log.SetOutput(ps.out) - // if debug flag is provided enable extra logging + // if debug flag is provided enable extra logging. also, enable profiling at port 6060 if ps.serverConfig.Debug { utils.Log.SetLevel(logrus.DebugLevel) - utils.Log.Debugln("Debug logging enabled") + go func() { + runtime.SetBlockProfileRate(1) // capture traces of all possible contended mutex holders + profilerRouter := mux.NewRouter() + profilerRouter.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) + if err := http.ListenAndServe(":6060", profilerRouter); err != nil { + panic(err) + } + }() + utils.Log.Debugln("Debug logging and profiling enabled. Available types of profiles at http://localhost:6060/debug/pprof") } // set a new route handler @@ -92,7 +101,7 @@ func (ps *platformServer) initialize() { } // set up a listener to receive REST bridge configs for services and set them up according to their specs - lcmChanHandler, err := busInstance.ListenStreamForDestination(service.LifecycleManagerChannelName, busInstance.GetId()) + lcmChanHandler, err := ps.eventbus.ListenStreamForDestination(service.LifecycleManagerChannelName, ps.eventbus.GetId()) if err != nil { utils.Log.Fatalln(err) } @@ -104,7 +113,7 @@ func (ps *platformServer) initialize() { } fabricSvc, _ := serviceRegistryInstance.GetService(request.ServiceChannel) - svcReadyStore := busInstance.GetStoreManager().GetStore(service.ServiceReadyStore) + svcReadyStore := ps.eventbus.GetStoreManager().GetStore(service.ServiceReadyStore) hooks := svcLifecycleManager.GetServiceHooks(request.ServiceChannel) if request.Override { @@ -135,7 +144,7 @@ func (ps *platformServer) initialize() { // create an internal bus channel to notify significant changes in sessions such as disconnect if ps.serverConfig.FabricConfig != nil { - channelManager := busInstance.GetChannelManager() + channelManager := ps.eventbus.GetChannelManager() channelManager.CreateChannel(bus.STOMP_SESSION_NOTIFY_CHANNEL) } diff --git a/plank/pkg/server/initialize_rest_bridge_override_test.go b/plank/pkg/server/initialize_rest_bridge_override_test.go index 4658d04..204e9b6 100644 --- a/plank/pkg/server/initialize_rest_bridge_override_test.go +++ b/plank/pkg/server/initialize_rest_bridge_override_test.go @@ -37,7 +37,7 @@ func TestInitialize_DebugLogging(t *testing.T) { func TestInitialize_RestBridgeOverride(t *testing.T) { // arrange - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() testRoot := filepath.Join(os.TempDir(), "plank-tests") _ = os.MkdirAll(testRoot, 0755) @@ -47,6 +47,7 @@ func TestInitialize_RestBridgeOverride(t *testing.T) { cfg := GetBasicTestServerConfig(testRoot, "stdout", "stdout", "stderr", GetTestPort(), true) baseUrl, _, testServerInterface := CreateTestServer(cfg) testServer := testServerInterface.(*platformServer) + testServer.eventbus = newBus // register ping pong service with default bridge points of /rest/ping-pong, /rest/ping-pong2 and /rest/ping-pong/{from}/{to}/{message} testServerInterface.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) @@ -62,8 +63,8 @@ func TestInitialize_RestBridgeOverride(t *testing.T) { oldRouter := testServer.router // assert - RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) { - _ = bus.GetBus().SendResponseMessage(service.LifecycleManagerChannelName, &service.SetupRESTBridgeRequest{ + RunWhenServerReady(t, newBus, func(t2 *testing.T) { + _ = newBus.SendResponseMessage(service.LifecycleManagerChannelName, &service.SetupRESTBridgeRequest{ ServiceChannel: services.PingPongServiceChan, Override: true, Config: []*service.RESTBridgeConfig{ @@ -76,7 +77,7 @@ func TestInitialize_RestBridgeOverride(t *testing.T) { }, }, }, - }, bus.GetBus().GetId()) + }, newBus.GetId()) // router instance should have been swapped time.Sleep(1 * time.Second) diff --git a/plank/pkg/server/server.go b/plank/pkg/server/server.go index 5c79fe2..82f02a3 100644 --- a/plank/pkg/server/server.go +++ b/plank/pkg/server/server.go @@ -11,6 +11,7 @@ import ( "fmt" "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/vmware/transport-go/model" "io/ioutil" "net" "net/http" @@ -46,6 +47,8 @@ func NewPlatformServer(config *PlatformServerConfig) PlatformServer { ps.serverConfig = config ps.ServerAvailability = &ServerAvailability{} ps.routerConcurrencyProtection = new(int32) + ps.messageBridgeMap = make(map[string]*MessageBridge) + ps.eventbus = bus.GetBus() ps.initialize() return ps } @@ -73,6 +76,7 @@ func NewPlatformServerFromConfig(configPath string) (PlatformServer, error) { } ps := new(platformServer) + ps.eventbus = bus.GetBus() sanitizeConfigRootPath(&config) // ensure references to file system paths are relative to config.RootDir @@ -165,7 +169,7 @@ func (ps *platformServer) StartServer(syschan chan os.Signal) { utils.Log.Infof("[plank] Starting Transport broker at %s:%d%s", ps.serverConfig.Host, ps.serverConfig.Port, ps.serverConfig.FabricConfig.FabricEndpoint) ps.ServerAvailability.Fabric = true - if err := bus.GetBus().StartFabricEndpoint(ps.fabricConn, *ps.serverConfig.FabricConfig.EndpointConfig); err != nil { + if err := ps.eventbus.StartFabricEndpoint(ps.fabricConn, *ps.serverConfig.FabricConfig.EndpointConfig); err != nil { panic(err) } }() @@ -175,7 +179,7 @@ func (ps *platformServer) StartServer(syschan chan os.Signal) { go func() { <-ps.SyscallChan // notify subscribers that the server is shutting down - _ = bus.GetBus().SendResponseMessage(PLANK_SERVER_ONLINE_CHANNEL, false, nil) + _ = ps.eventbus.SendResponseMessage(PLANK_SERVER_ONLINE_CHANNEL, false, nil) ps.StopServer() close(connClosed) }() @@ -190,7 +194,7 @@ func (ps *platformServer) StartServer(syschan chan os.Signal) { utils.Log.Debugln("waiting for http server to be ready to accept connections") continue } - _ = bus.GetBus().SendResponseMessage(PLANK_SERVER_ONLINE_CHANNEL, true, nil) + _ = ps.eventbus.SendResponseMessage(PLANK_SERVER_ONLINE_CHANNEL, true, nil) break } @@ -284,7 +288,7 @@ func (ps *platformServer) RegisterService(svc service.FabricService, svcChannel var hooks service.ServiceLifecycleHookEnabled if hooks = svcLifecycleManager.GetServiceHooks(svcChannel); hooks == nil { // if service has no lifecycle hooks mark the channel as ready straight up - storeManager := bus.GetBus().GetStoreManager() + storeManager := ps.eventbus.GetStoreManager() store := storeManager.GetStore(service.ServiceReadyStore) store.Put(svcChannel, true, service.ServiceInitStateChange) utils.Log.Infof("[plank] Service '%s' initialized successfully", svcType.String()) @@ -293,7 +297,7 @@ func (ps *platformServer) RegisterService(svc service.FabricService, svcChannel return err } -// SetHttpChannelBridge establishes a conduit between the the transport service channel and an HTTP endpoint +// SetHttpChannelBridge establishes a conduit between the transport service channel and an HTTP endpoint // that allows a client to invoke the service via REST. func (ps *platformServer) SetHttpChannelBridge(bridgeConfig *service.RESTBridgeConfig) { ps.lock.Lock() @@ -312,11 +316,24 @@ func (ps *platformServer) SetHttpChannelBridge(bridgeConfig *service.RESTBridgeC ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel] = make([]string, 0) } + if _, exists := ps.messageBridgeMap[bridgeConfig.ServiceChannel]; !exists { + handler, _ := ps.eventbus.ListenStream(bridgeConfig.ServiceChannel) + handler.Handle(func(message *model.Message) { + ps.messageBridgeMap[bridgeConfig.ServiceChannel].payloadChannel <- message + }, func(err error) {}) + + ps.messageBridgeMap[bridgeConfig.ServiceChannel] = &MessageBridge{ + ServiceListenStream: handler, + payloadChannel: make(chan *model.Message, 100), + } + } + // build endpoint handler - ps.endpointHandlerMap[endpointHandlerKey] = buildEndpointHandler( + ps.endpointHandlerMap[endpointHandlerKey] = ps.buildEndpointHandler( bridgeConfig.ServiceChannel, bridgeConfig.FabricRequestBuilder, - ps.serverConfig.RestBridgeTimeout) + ps.serverConfig.RestBridgeTimeout, + ps.messageBridgeMap[bridgeConfig.ServiceChannel].payloadChannel) ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel] = append( ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel], endpointHandlerKey) @@ -349,7 +366,7 @@ func (ps *platformServer) SetHttpChannelBridge(bridgeConfig *service.RESTBridgeC bridgeConfig.ServiceChannel, bridgeConfig.Uri, bridgeConfig.Method) } -// SetHttpPathPrefixChannelBridge establishes a conduit between the the transport service channel and a path prefix +// SetHttpPathPrefixChannelBridge establishes a conduit between the transport service channel and a path prefix // every request on this prefix will be sent through to the target service, all methods, all sub paths, lock, stock and barrel. func (ps *platformServer) SetHttpPathPrefixChannelBridge(bridgeConfig *service.RESTBridgeConfig) { ps.lock.Lock() @@ -368,11 +385,24 @@ func (ps *platformServer) SetHttpPathPrefixChannelBridge(bridgeConfig *service.R ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel] = make([]string, 0) } + if _, exists := ps.messageBridgeMap[bridgeConfig.ServiceChannel]; !exists { + handler, _ := ps.eventbus.ListenStream(bridgeConfig.ServiceChannel) + handler.Handle(func(message *model.Message) { + ps.messageBridgeMap[bridgeConfig.ServiceChannel].payloadChannel <- message + }, func(err error) {}) + + ps.messageBridgeMap[bridgeConfig.ServiceChannel] = &MessageBridge{ + ServiceListenStream: handler, + payloadChannel: make(chan *model.Message, 100), + } + } + // build endpoint handler - ps.endpointHandlerMap[endpointHandlerKey] = buildEndpointHandler( + ps.endpointHandlerMap[endpointHandlerKey] = ps.buildEndpointHandler( bridgeConfig.ServiceChannel, bridgeConfig.FabricRequestBuilder, - ps.serverConfig.RestBridgeTimeout) + ps.serverConfig.RestBridgeTimeout, + ps.messageBridgeMap[bridgeConfig.ServiceChannel].payloadChannel) ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel] = append( ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel], endpointHandlerKey) @@ -492,3 +522,9 @@ func (ps *platformServer) checkPortAvailability() { ps.serverConfig.Host, ps.serverConfig.Port) } } + +func (ps *platformServer) setEventBusRef(evtBus bus.EventBus) { + ps.lock.Lock() + ps.eventbus = evtBus + ps.lock.Unlock() +} diff --git a/plank/pkg/server/server_smoke_test.go b/plank/pkg/server/server_smoke_test.go index 45476ad..ed65144 100644 --- a/plank/pkg/server/server_smoke_test.go +++ b/plank/pkg/server/server_smoke_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/stretchr/testify/assert" "github.com/vmware/transport-go/bus" + "github.com/vmware/transport-go/service" "io/ioutil" "net/http" "os" @@ -13,6 +14,8 @@ import ( ) func TestSmokeTests(t *testing.T) { + newBus := bus.ResetBus() + service.ResetServiceRegistry() testRoot := filepath.Join(os.TempDir(), "plank-tests") //testOutFile := filepath.Join(testRoot, "plank-server-tests.log") _ = os.MkdirAll(testRoot, 0755) @@ -32,6 +35,7 @@ func TestSmokeTests(t *testing.T) { }, } baseUrl, _, testServer := CreateTestServer(cfg) + testServer.(*platformServer).eventbus = newBus assert.EqualValues(t, fmt.Sprintf("http://localhost:%d", port), baseUrl) @@ -39,7 +43,7 @@ func TestSmokeTests(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go testServer.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t *testing.T) { + RunWhenServerReady(t, newBus, func(t *testing.T) { // root url - 404 t.Run("404 on root", func(t2 *testing.T) { cl := http.DefaultClient @@ -63,6 +67,8 @@ func TestSmokeTests(t *testing.T) { } func TestSmokeTests_NoFabric(t *testing.T) { + newBus := bus.ResetBus() + service.ResetServiceRegistry() testRoot := filepath.Join(os.TempDir(), "plank-tests") _ = os.MkdirAll(testRoot, 0755) defer os.RemoveAll(testRoot) @@ -71,6 +77,7 @@ func TestSmokeTests_NoFabric(t *testing.T) { cfg := GetBasicTestServerConfig(testRoot, "stdout", "stdout", "stderr", port, true) cfg.FabricConfig = nil baseUrl, _, testServer := CreateTestServer(cfg) + testServer.(*platformServer).eventbus = newBus assert.EqualValues(t, fmt.Sprintf("http://localhost:%d", port), baseUrl) @@ -78,7 +85,7 @@ func TestSmokeTests_NoFabric(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go testServer.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t *testing.T) { + RunWhenServerReady(t, newBus, func(t *testing.T) { // fabric - 404 t.Run("404 on fabric endpoint", func(t2 *testing.T) { cl := http.DefaultClient @@ -94,6 +101,8 @@ func TestSmokeTests_NoFabric(t *testing.T) { } func TestSmokeTests_HealthEndpoint(t *testing.T) { + newBus := bus.ResetBus() + service.ResetServiceRegistry() testRoot := filepath.Join(os.TempDir(), "plank-tests") _ = os.MkdirAll(testRoot, 0755) defer os.RemoveAll(testRoot) @@ -102,6 +111,7 @@ func TestSmokeTests_HealthEndpoint(t *testing.T) { cfg := GetBasicTestServerConfig(testRoot, "stdout", "stdout", "stderr", port, true) cfg.FabricConfig = nil baseUrl, _, testServer := CreateTestServer(cfg) + testServer.(*platformServer).eventbus = newBus assert.EqualValues(t, fmt.Sprintf("http://localhost:%d", port), baseUrl) @@ -109,7 +119,7 @@ func TestSmokeTests_HealthEndpoint(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go testServer.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(*testing.T) { + RunWhenServerReady(t, newBus, func(*testing.T) { t.Run("/health returns OK", func(t2 *testing.T) { cl := http.DefaultClient rsp, err := cl.Get(fmt.Sprintf("%s/health", baseUrl)) diff --git a/plank/pkg/server/server_test.go b/plank/pkg/server/server_test.go index 992f828..1459da8 100644 --- a/plank/pkg/server/server_test.go +++ b/plank/pkg/server/server_test.go @@ -4,6 +4,7 @@ package server import ( + "context" "fmt" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -11,10 +12,12 @@ import ( "github.com/vmware/transport-go/model" "github.com/vmware/transport-go/plank/services" "github.com/vmware/transport-go/service" + "golang.org/x/net/context/ctxhttp" "io/ioutil" "net/http" "os" "path/filepath" + "strings" "sync" "testing" ) @@ -43,25 +46,27 @@ func TestNewPlatformServer_FileLog(t *testing.T) { _ = os.Remove(filepath.Join(os.TempDir(), "testlog.log")) }() - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() newConfig := GetBasicTestServerConfig(os.TempDir(), filepath.Join(os.TempDir(), "testlog.log"), "stdout", "stderr", port, true) - NewPlatformServer(newConfig) + ps := NewPlatformServer(newConfig) + ps.(*platformServer).eventbus = newBus assert.FileExists(t, filepath.Join(os.TempDir(), "testlog.log")) } func TestPlatformServer_StartServer(t *testing.T) { - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus syschan := make(chan os.Signal, 1) wg := sync.WaitGroup{} wg.Add(1) go ps.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) { + RunWhenServerReady(t, newBus, func(t2 *testing.T) { rsp, err := http.Get(fmt.Sprintf("http://localhost:%d", port)) assert.Nil(t, err) @@ -76,29 +81,92 @@ func TestPlatformServer_StartServer(t *testing.T) { } func TestPlatformServer_RegisterService(t *testing.T) { - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus err := ps.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) assert.Nil(t, err) } +func TestPlatformServer_SetHttpPathPrefixChannelBridge(t *testing.T) { + // get a new bus instance and create a new platform server instance + newBus := bus.ResetBus() + service.ResetServiceRegistry() + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus + + // register a service + _ = ps.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) + + // set PathPrefix bridge + bridgeConfig := &service.RESTBridgeConfig{ + ServiceChannel: services.PingPongServiceChan, + Uri: "/ping-pong", + FabricRequestBuilder: func(w http.ResponseWriter, r *http.Request) model.Request { + return model.Request{ + Payload: "hello", + Request: "ping-get", + } + }, + } + ps.SetHttpPathPrefixChannelBridge(bridgeConfig) + + syschan := make(chan os.Signal, 1) + wg := sync.WaitGroup{} + wg.Add(1) + go ps.StartServer(syschan) + RunWhenServerReady(t, newBus, func(t2 *testing.T) { + // GET + rsp, err := http.Get(fmt.Sprintf("http://localhost:%d/ping-pong", port)) + assert.Nil(t, err) + + body, err := ioutil.ReadAll(rsp.Body) + assert.Nil(t, err) + assert.Contains(t, string(body), "hello") + + // POST + rsp, err = http.Post(fmt.Sprintf("http://localhost:%d/ping-pong", port), "application/json", strings.NewReader("")) + assert.Nil(t, err) + body, err = ioutil.ReadAll(rsp.Body) + assert.Nil(t, err) + assert.Contains(t, string(body), "hello") + + // DELETE + req, _ := http.NewRequest("DELETE", fmt.Sprintf("http://localhost:%d/ping-pong", port), strings.NewReader("")) + rsp, err = ctxhttp.Do(context.Background(), http.DefaultClient, req) + assert.Nil(t, err) + body, err = ioutil.ReadAll(rsp.Body) + assert.Nil(t, err) + assert.Contains(t, string(body), "hello") + + ps.StopServer() + service.GetServiceRegistry().UnregisterService(services.PingPongServiceChan) + wg.Done() + }) + + wg.Wait() +} + func TestPlatformServer_SetHttpChannelBridge(t *testing.T) { - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus _ = ps.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) syschan := make(chan os.Signal, 1) wg := sync.WaitGroup{} wg.Add(1) go ps.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) { + RunWhenServerReady(t, newBus, func(t2 *testing.T) { rsp, err := http.Get(fmt.Sprintf("http://localhost:%d/rest/ping-pong2?message=hello", port)) assert.Nil(t, err) @@ -114,11 +182,12 @@ func TestPlatformServer_SetHttpChannelBridge(t *testing.T) { } func TestPlatformServer_UnknownRequest(t *testing.T) { - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus _ = ps.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) defer service.GetServiceRegistry().UnregisterService(services.PingPongServiceChan) setupBridge(ps, "/ping", "GET", services.PingPongServiceChan, "bubble") @@ -127,7 +196,7 @@ func TestPlatformServer_UnknownRequest(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go ps.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) { + RunWhenServerReady(t, newBus, func(t2 *testing.T) { rsp, err := http.Get(fmt.Sprintf("http://localhost:%d/ping?msg=hello", port)) assert.Nil(t, err) diff --git a/plank/pkg/server/test_suite_harness.go b/plank/pkg/server/test_suite_harness.go index e60883a..f839131 100644 --- a/plank/pkg/server/test_suite_harness.go +++ b/plank/pkg/server/test_suite_harness.go @@ -12,7 +12,7 @@ import ( "github.com/vmware/transport-go/bus" "github.com/vmware/transport-go/model" "github.com/vmware/transport-go/plank/utils" - "github.com/vmware/transport-go/service" + svc "github.com/vmware/transport-go/service" "io/ioutil" "net" "os" @@ -81,7 +81,7 @@ func GetBasicTestServerConfig(rootDir, outLog, accessLog, errLog string, port in // SetupPlankTestSuite will boot a new instance of plank on your chosen port and will also fire up your service // Ready to be tested. This always runs on localhost. -func SetupPlankTestSuite(service service.FabricService, serviceChannel string, port int, +func SetupPlankTestSuite(service svc.FabricService, serviceChannel string, port int, config *PlatformServerConfig) (*PlankIntegrationTestSuite, error) { s := &PlankIntegrationTestSuite{} @@ -103,7 +103,8 @@ func SetupPlankTestSuite(service service.FabricService, serviceChannel string, p s.Syschan = make(chan os.Signal, 1) go s.PlatformServer.StartServer(s.Syschan) - s.EventBus = bus.GetBus() + s.EventBus = bus.ResetBus() + svc.ResetServiceRegistry() // get a pointer to the channel manager s.ChannelManager = s.EventBus.GetChannelManager() diff --git a/plank/pkg/server/test_suite_harness_test.go b/plank/pkg/server/test_suite_harness_test.go index fdf7063..ab4927d 100644 --- a/plank/pkg/server/test_suite_harness_test.go +++ b/plank/pkg/server/test_suite_harness_test.go @@ -35,7 +35,8 @@ func (m *testPlankTestIntegration) SetBus(eventBus bus.EventBus) { } func TestSetupPlankTestSuiteForTest(t *testing.T) { - b := bus.GetBus() + b := bus.ResetBus() + service.ResetServiceRegistry() cm := b.GetChannelManager() pit := &PlankIntegrationTestSuite{ Suite: suite.Suite{}, diff --git a/plank/utils/cli.go b/plank/utils/cli.go index 2b486ea..b7c2a82 100644 --- a/plank/utils/cli.go +++ b/plank/utils/cli.go @@ -71,17 +71,17 @@ var PlatformServerFlagConstants = map[string]map[string]string{ "OutputLog": { "FlagName": "output-log", "ShortFlag": "l", - "Description": "Platform log output", + "Description": "Platform log output. Possible values: stdout, stderr, null, or path to a file", }, "AccessLog": { "FlagName": "access-log", "ShortFlag": "a", - "Description": "HTTP server access log output", + "Description": "HTTP server access log output. Possible values: stdout, stderr, null, or path to a file", }, "ErrorLog": { "FlagName": "error-log", "ShortFlag": "e", - "Description": "HTTP server error log output", + "Description": "HTTP server error log output. Possible values: stdout, stderr, null, or path to a file", }, "Debug": { "FlagName": "debug", diff --git a/plank/utils/logger.go b/plank/utils/logger.go index 493168f..f986e0d 100644 --- a/plank/utils/logger.go +++ b/plank/utils/logger.go @@ -5,6 +5,7 @@ package utils import ( "github.com/sirupsen/logrus" + "io" "os" "path" ) @@ -17,9 +18,9 @@ type LogConfig struct { OutputLog string `json:"output_log"` FormatOptions *LogFormatOption `json:"format_options"` Root string `json:"root"` - accessLogFp *os.File `json:"-"` - errorLogFp *os.File `json:"-"` - outputLogFp *os.File `json:"-"` + accessLogFp io.Writer `json:"-"` + errorLogFp io.Writer `json:"-"` + outputLogFp io.Writer `json:"-"` } // LogFormatOption is merely a wrapper of logrus.TextFormatter because TextFormatter does not allow serializing @@ -82,7 +83,7 @@ type LogFormatOption struct { } func (lc *LogConfig) PrepareLogFiles() error { - var fp *os.File + var fp io.Writer var err error if fp, err = lc.prepareLogFilePointer(lc.AccessLog); err != nil { return err @@ -102,23 +103,25 @@ func (lc *LogConfig) PrepareLogFiles() error { return nil } -func (lc *LogConfig) GetAccessLogFilePointer() *os.File { +func (lc *LogConfig) GetAccessLogFilePointer() io.Writer { return lc.accessLogFp } -func (lc *LogConfig) GetErrorLogFilePointer() *os.File { +func (lc *LogConfig) GetErrorLogFilePointer() io.Writer { return lc.errorLogFp } -func (lc *LogConfig) GetPlatformLogFilePointer() *os.File { +func (lc *LogConfig) GetPlatformLogFilePointer() io.Writer { return lc.outputLogFp } -func (lc *LogConfig) prepareLogFilePointer(target string) (fp *os.File, err error) { +func (lc *LogConfig) prepareLogFilePointer(target string) (fp io.Writer, err error) { if target == "stdout" { fp = os.Stdout } else if target == "stderr" { fp = os.Stderr + } else if target == "null" { + fp = &noopWriter{} } else { logFilePath := JoinBasePathIfRelativeRegularFilePath(lc.Root, target) fp, err = GetNewLogFilePointer(logFilePath) @@ -213,6 +216,17 @@ func (l *PlankLogger) Panicf(format string, args ...interface{}) { l.setCommonFields().Panicf(format, args...) } +// noopWriter does absolutely nothing and return immediately. for references +// to those who wonder why this is here then in the first place, this is so +// this no-op writer instance can be passed as an access logger to not bother +// logging HTTP access logs which may not be necessary for some applications +// that want as lowest IO bottlenecks as possible. +type noopWriter struct{} + +func (noopWriter *noopWriter) Write(p []byte) (n int, err error) { + return 0, nil +} + func init() { Log = &PlankLogger{logrus.New()} } diff --git a/plank/utils/paths.go b/plank/utils/paths.go index bc20ee0..ecc3bda 100644 --- a/plank/utils/paths.go +++ b/plank/utils/paths.go @@ -55,7 +55,7 @@ func DeriveStaticURIFromPath(input string) (string, string) { func JoinBasePathIfRelativeRegularFilePath(base string, in string) (out string) { out = in - if in == "stdout" || in == "stderr" { + if in == "stdout" || in == "stderr" || in == "null" { return }