Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Commit

Permalink
fix: REST bridge performance (#39)
Browse files Browse the repository at this point in the history
* fix: REST bridge performance

During a benchmark test using `wrk` it was revealed that a REST
bridged service was significantly slower than comparable servers such
as Spring Boot. Profiling the server revealed that the ListenOnce()
method was causing a significant amount of overheads as it was being
called per HTTP request, thus generating a new message handler and
invoking once.Do() call involving mutex every single time.

The fix involves using the ListenStream() method to stop generating
excessive time and space overheads during the bridge process.
See the attached screenshots for comparsion between before and after
fix.

Signed-off-by: Josh Kim <kjosh@vmware.com>

* new: support noop io.Writer for logs

Signed-off-by: Josh Kim <kjosh@vmware.com>

* fix broken unit tests

Signed-off-by: Josh Kim <kjosh@vmware.com>

* isolate bus instance throughout platformServer

Signed-off-by: Josh Kim <kjosh@vmware.com>
  • Loading branch information
jooskim authored Nov 1, 2021
1 parent b944e02 commit c8a42e8
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 98 deletions.
13 changes: 8 additions & 5 deletions plank/pkg/server/core_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/vmware/transport-go/plank/utils"
"github.com/vmware/transport-go/service"
"github.com/vmware/transport-go/stompserver"
"io"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -66,22 +67,24 @@ type PlatformServer interface {
type platformServer struct {
HttpServer *http.Server // Http server instance
SyscallChan chan os.Signal // syscall channel to receive SIGINT, SIGKILL events
eventbus bus.EventBus // event bus pointer
serverConfig *PlatformServerConfig // server config instance
middlewareManager middleware.MiddlewareManager // middleware maanger instance
router *mux.Router // *mux.Router instance
routerConcurrencyProtection *int32 // atomic int32 to protect the main router being concurrently written to
out *os.File // platform log output pointer
out io.Writer // platform log output pointer
endpointHandlerMap map[string]http.HandlerFunc // internal map to store rest endpoint -handler mappings
serviceChanToBridgeEndpoints map[string][]string // internal map to store service channel - endpoint handler key mappings
fabricConn stompserver.RawConnectionListener // WebSocket listener instance
ServerAvailability *ServerAvailability // server availability (not much used other than for internal monitoring for now)
lock sync.Mutex // lock
messageBridgeMap map[string]*MessageBridge
}

// TransportChannelResponse wraps Transport *Message.Message with an error object for easier transfer
type TransportChannelResponse struct {
Message *model.Message // wrapper object that contains the payload
Err error // error object if there is any
// MessageBridge is a conduit used for returning service responses as HTTP responses
type MessageBridge struct {
ServiceListenStream bus.MessageHandler // message handler returned by bus.ListenStream responsible for relaying back messages as HTTP responses
payloadChannel chan *model.Message // internal golang channel used for passing bus responses/errors across goroutines
}

// ServerAvailability contains boolean fields to indicate what components of the system are available or not
Expand Down
35 changes: 8 additions & 27 deletions plank/pkg/server/endpointer_handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/sirupsen/logrus"
"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/model"
"github.com/vmware/transport-go/plank/utils"
"github.com/vmware/transport-go/service"
Expand All @@ -16,7 +14,7 @@ import (

// buildEndpointHandler builds a http.HandlerFunc that wraps Transport Bus operations in an HTTP request-response cycle.
// service channel, request builder and rest bridge timeout are passed as parameters.
func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, restBridgeTimeout time.Duration) http.HandlerFunc {
func (ps *platformServer) buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, restBridgeTimeout time.Duration, msgChan chan *model.Message) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -28,23 +26,10 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder,
// set context that expires after the provided amount of time in restBridgeTimeout to prevent requests from hanging forever
ctx, cancelFn := context.WithTimeout(context.Background(), restBridgeTimeout)
defer cancelFn()
h, err := bus.GetBus().ListenOnce(svcChannel)
if err != nil {
panic(err)
}

// set up a channel through which to receive the raw response from transport channel
// handler function runs in another thread so we need to utilize channel to use the correct writer.
chanReturn := make(chan *TransportChannelResponse)
h.Handle(func(message *model.Message) {
chanReturn <- &TransportChannelResponse{Message: message}
}, func(err error) {
chanReturn <- &TransportChannelResponse{Err: err}
})

// relay the request to transport channel
reqModel := reqBuilder(w, r)
err = bus.GetBus().SendRequestMessage(svcChannel, reqModel, reqModel.Id)
err := ps.eventbus.SendRequestMessage(svcChannel, reqModel, reqModel.Id)

// get a response from the channel, render the results using ResponseWriter and log the data/error
// to the console as well.
Expand All @@ -53,14 +38,14 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder,
http.Error(
w,
fmt.Sprintf("No response received from service channel in %s, request timed out", restBridgeTimeout.String()), 500)
case chanResponse := <-chanReturn:
if chanResponse.Err != nil {
utils.Log.WithError(chanResponse.Err).Errorf(
case msg := <-msgChan:
if msg.Error != nil {
utils.Log.WithError(msg.Error).Errorf(
"Error received from channel %s:", svcChannel)
http.Error(w, chanResponse.Err.Error(), 500)
http.Error(w, msg.Error.Error(), 500)
} else {
// only send the actual user payload not wrapper information
response := chanResponse.Message.Payload.(*model.Response)
// only send the actual user payloadChannel not wrapper information
response := msg.Payload.(*model.Response)
var respBody interface{}
if response.Error {
if response.Payload != nil {
Expand All @@ -72,10 +57,6 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder,
respBody = response.Payload
}

utils.Log.WithFields(logrus.Fields{
//"payload": respBody, // don't show this, we may be sending around big byte arrays
}).Debugf("Response received from channel %s:", svcChannel)

// if our Message is an error and it has a code, lets send that back to the client.
if response.Error {

Expand Down
65 changes: 48 additions & 17 deletions plank/pkg/server/endpointer_handler_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,60 +9,81 @@ import (
"github.com/vmware/transport-go/model"
"github.com/vmware/transport-go/service"
"net/http"
"os"
"testing"
"time"
)

func TestBuildEndpointHandler_Timeout(t *testing.T) {
b := bus.ResetBus()
service.ResetServiceRegistry()
msgChan := make(chan *model.Message, 1)
_ = b.GetChannelManager().CreateChannel("test-chan")
assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
port := GetTestPort()
config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true)
ps := NewPlatformServer(config).(*platformServer)
ps.eventbus = b
assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
return model.Request{
Payload: nil,
Request: "test-request",
}
}, 5*time.Millisecond), "GET", "http://localhost", nil, "request timed out")
}, 5*time.Millisecond, msgChan), "GET", "http://localhost", nil, "request timed out")
}

func TestBuildEndpointHandler_ChanResponseErr(t *testing.T) {
b := bus.ResetBus()
service.ResetServiceRegistry()
msgChan := make(chan *model.Message, 1)
_ = b.GetChannelManager().CreateChannel("test-chan")
assert.HTTPErrorf(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
port := GetTestPort()
config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true)
ps := NewPlatformServer(config).(*platformServer)
ps.eventbus = b
assert.HTTPErrorf(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
uId := &uuid.UUID{}
_ = b.SendErrorMessage("test-chan", fmt.Errorf("test error"), uId)
msgChan <- &model.Message{Error: fmt.Errorf("test error")}
return model.Request{
Id: uId,
Payload: nil,
Request: "test-request",
}
}, 5*time.Second), "GET", "http://localhost", nil, "test error")
}, 5*time.Second, msgChan), "GET", "http://localhost", nil, "test error")
}

func TestBuildEndpointHandler_SuccessResponse(t *testing.T) {
b := bus.ResetBus()
service.ResetServiceRegistry()
msgChan := make(chan *model.Message, 1)
_ = b.GetChannelManager().CreateChannel("test-chan")
assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
port := GetTestPort()
config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true)
ps := NewPlatformServer(config).(*platformServer)
ps.eventbus = b
assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
uId := &uuid.UUID{}
_ = b.SendResponseMessage("test-chan", &model.Response{
msgChan <- &model.Message{Payload: &model.Response{
Id: uId,
Payload: []byte("{\"error\": false}"),
}, uId)
}}
return model.Request{
Id: uId,
Payload: nil,
Request: "test-request",
}
}, 5*time.Second), "GET", "http://localhost", nil, "{\"error\": false}")
}, 5*time.Second, msgChan), "GET", "http://localhost", nil, "{\"error\": false}")
}

func TestBuildEndpointHandler_ErrorResponse(t *testing.T) {
b := bus.ResetBus()
service.ResetServiceRegistry()
_ = b.GetChannelManager().CreateChannel("test-chan")
port := GetTestPort()
config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true)
ps := NewPlatformServer(config).(*platformServer)
ps.eventbus = b

msgChan := make(chan *model.Message, 1)
uId := &uuid.UUID{}
rsp := &model.Response{
Id: uId,
Expand All @@ -72,21 +93,26 @@ func TestBuildEndpointHandler_ErrorResponse(t *testing.T) {
}
expected, _ := json.Marshal(rsp.Payload)

assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
_ = b.SendResponseMessage("test-chan", rsp, uId)
assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
msgChan <- &model.Message{Payload: rsp}
return model.Request{
Id: uId,
Payload: nil,
Request: "test-request",
}

}, 5*time.Second), "GET", "http://localhost", nil, string(expected))
}, 5*time.Second, msgChan), "GET", "http://localhost", nil, string(expected))
}

func TestBuildEndpointHandler_ErrorResponseAlternative(t *testing.T) {
b := bus.ResetBus()
service.ResetServiceRegistry()
msgChan := make(chan *model.Message, 1)
_ = b.GetChannelManager().CreateChannel("test-chan")
port := GetTestPort()
config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true)
ps := NewPlatformServer(config).(*platformServer)
ps.eventbus = b

uId := &uuid.UUID{}
rsp := &model.Response{
Expand All @@ -95,26 +121,31 @@ func TestBuildEndpointHandler_ErrorResponseAlternative(t *testing.T) {
Error: true,
}

assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
_ = b.SendResponseMessage("test-chan", rsp, uId)
assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
msgChan <- &model.Message{Payload: rsp}
return model.Request{
Id: uId,
Payload: nil,
Request: "test-request",
}

}, 5*time.Second), "GET", "http://localhost", nil, "418")
}, 5*time.Second, msgChan), "GET", "http://localhost", nil, "418")
}

func TestBuildEndpointHandler_CatchPanic(t *testing.T) {
b := bus.ResetBus()
service.ResetServiceRegistry()
msgChan := make(chan *model.Message, 1)
_ = b.GetChannelManager().CreateChannel("test-chan")
assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
port := GetTestPort()
config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true)
ps := NewPlatformServer(config).(*platformServer)
ps.eventbus = b
assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request {
panic("peekaboo")
return model.Request{
Payload: nil,
Request: "test-request",
}
}, 5*time.Second), "GET", "http://localhost", nil, "Internal Server Error")
}, 5*time.Second, msgChan), "GET", "http://localhost", nil, "Internal Server Error")
}
23 changes: 16 additions & 7 deletions plank/pkg/server/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"github.com/vmware/transport-go/stompserver"
"log"
"net/http"
_ "net/http/pprof"
"path/filepath"
"reflect"
"runtime"
"time"
)

Expand All @@ -25,12 +27,11 @@ func (ps *platformServer) initialize() {
var err error

// initialize core components
var busInstance = bus.GetBus()
var serviceRegistryInstance = service.GetServiceRegistry()
var svcLifecycleManager = service.GetServiceLifecycleManager()

// create essential bus channels
busInstance.GetChannelManager().CreateChannel(PLANK_SERVER_ONLINE_CHANNEL)
ps.eventbus.GetChannelManager().CreateChannel(PLANK_SERVER_ONLINE_CHANNEL)

// initialize HTTP endpoint handlers map
ps.endpointHandlerMap = map[string]http.HandlerFunc{}
Expand All @@ -49,10 +50,18 @@ func (ps *platformServer) initialize() {
utils.Log.SetFormatter(formatter)
utils.Log.SetOutput(ps.out)

// if debug flag is provided enable extra logging
// if debug flag is provided enable extra logging. also, enable profiling at port 6060
if ps.serverConfig.Debug {
utils.Log.SetLevel(logrus.DebugLevel)
utils.Log.Debugln("Debug logging enabled")
go func() {
runtime.SetBlockProfileRate(1) // capture traces of all possible contended mutex holders
profilerRouter := mux.NewRouter()
profilerRouter.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)
if err := http.ListenAndServe(":6060", profilerRouter); err != nil {
panic(err)
}
}()
utils.Log.Debugln("Debug logging and profiling enabled. Available types of profiles at http://localhost:6060/debug/pprof")
}

// set a new route handler
Expand Down Expand Up @@ -92,7 +101,7 @@ func (ps *platformServer) initialize() {
}

// set up a listener to receive REST bridge configs for services and set them up according to their specs
lcmChanHandler, err := busInstance.ListenStreamForDestination(service.LifecycleManagerChannelName, busInstance.GetId())
lcmChanHandler, err := ps.eventbus.ListenStreamForDestination(service.LifecycleManagerChannelName, ps.eventbus.GetId())
if err != nil {
utils.Log.Fatalln(err)
}
Expand All @@ -104,7 +113,7 @@ func (ps *platformServer) initialize() {
}

fabricSvc, _ := serviceRegistryInstance.GetService(request.ServiceChannel)
svcReadyStore := busInstance.GetStoreManager().GetStore(service.ServiceReadyStore)
svcReadyStore := ps.eventbus.GetStoreManager().GetStore(service.ServiceReadyStore)
hooks := svcLifecycleManager.GetServiceHooks(request.ServiceChannel)

if request.Override {
Expand Down Expand Up @@ -135,7 +144,7 @@ func (ps *platformServer) initialize() {

// create an internal bus channel to notify significant changes in sessions such as disconnect
if ps.serverConfig.FabricConfig != nil {
channelManager := busInstance.GetChannelManager()
channelManager := ps.eventbus.GetChannelManager()
channelManager.CreateChannel(bus.STOMP_SESSION_NOTIFY_CHANNEL)
}

Expand Down
9 changes: 5 additions & 4 deletions plank/pkg/server/initialize_rest_bridge_override_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestInitialize_DebugLogging(t *testing.T) {

func TestInitialize_RestBridgeOverride(t *testing.T) {
// arrange
bus.ResetBus()
newBus := bus.ResetBus()
service.ResetServiceRegistry()
testRoot := filepath.Join(os.TempDir(), "plank-tests")
_ = os.MkdirAll(testRoot, 0755)
Expand All @@ -47,6 +47,7 @@ func TestInitialize_RestBridgeOverride(t *testing.T) {
cfg := GetBasicTestServerConfig(testRoot, "stdout", "stdout", "stderr", GetTestPort(), true)
baseUrl, _, testServerInterface := CreateTestServer(cfg)
testServer := testServerInterface.(*platformServer)
testServer.eventbus = newBus

// register ping pong service with default bridge points of /rest/ping-pong, /rest/ping-pong2 and /rest/ping-pong/{from}/{to}/{message}
testServerInterface.RegisterService(services.NewPingPongService(), services.PingPongServiceChan)
Expand All @@ -62,8 +63,8 @@ func TestInitialize_RestBridgeOverride(t *testing.T) {
oldRouter := testServer.router

// assert
RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) {
_ = bus.GetBus().SendResponseMessage(service.LifecycleManagerChannelName, &service.SetupRESTBridgeRequest{
RunWhenServerReady(t, newBus, func(t2 *testing.T) {
_ = newBus.SendResponseMessage(service.LifecycleManagerChannelName, &service.SetupRESTBridgeRequest{
ServiceChannel: services.PingPongServiceChan,
Override: true,
Config: []*service.RESTBridgeConfig{
Expand All @@ -76,7 +77,7 @@ func TestInitialize_RestBridgeOverride(t *testing.T) {
},
},
},
}, bus.GetBus().GetId())
}, newBus.GetId())

// router instance should have been swapped
time.Sleep(1 * time.Second)
Expand Down
Loading

0 comments on commit c8a42e8

Please sign in to comment.