Transport is a full stack, simple, fast, expandable application event bus for your applications. It can transport anything you want around your application, as well extend different channels to brokers and destinations. Transport makes it easy to move your bits around localized or distribiuted apps, without worrying about all the wiring.
Transport is an event bus, that allows application developers to build components that can talk to one another, really easily. It provides a standardized and simple API, implemented in multiple languages, to allow any individual component inside your applications to talk to one another.
It really comes to life when you use it to send messages, requests, responses and events around your backend and front-end. Your backend can stream messages to your UI, or other services or agents, as if they were sitting right next to each other, You can build one to one, many to one and many to many topologies with ease.
Channels can be extended to major brokers like Kafka or RabbitMQ, so Transport becomes an 'on/off-ramp' for your main sources of truth.
plank is a micro platform for building just about anything you can think of, it exposes RESTful and AsyncAPI & Pub/Sub endpoints
for services that can do just about anything. Read more about plank
Install transport
go get -u github.com/vmware/transport-go
To create an instance of the bus
import "github.com/vmware/transport-go/bus"
var transport = bus.GetBus()
Transport is a singleton, there is (should) only ever a single instance of the bus in your application.
The ChannelManager
interface on the EventBus
interface facilitates all Channel operations.
channelManager := transport.GetChannelManager()
The CreateChannel
method will create a new channel with the name "some-channel". It will return a pointer to a
Channel
object. You don't need to hold on to that pointer if you don't want to. The channel will still exist.
channel := channelManager.CreateChanel("some-channel")
A simple ping pong looks a little like this.
// listen for a single request on 'some-channel'
ts := bus.GetBus()
channel := "some-channel"
ts.GetChannelManager().CreateChannel(channel)
// listen for a single request on 'some-channel'
requestHandler, _ := ts.ListenRequestStream(channel)
requestHandler.Handle(
func(msg *model.Message) {
pingContent := msg.Payload.(string)
fmt.Printf("\nPing: %s\n", pingContent)
// send a response back.
ts.SendResponseMessage(channel, pingContent, msg.DestinationId)
},
func(err error) {
// something went wrong...
})
// send a request to 'some-channel' and handle a single response
responseHandler, _ := ts.RequestOnce(channel, "Woo!")
responseHandler.Handle(
func(msg *model.Message) {
fmt.Printf("Pong: %s\n", msg.Payload.(string))
},
func(err error) {
// something went wrong...
})
// fire the request.
responseHandler.Fire()
This will output:
π Transport booted with id [e495e5d5-2b72-46dd-8013-d49049bd4800]
Ping: Woo!
Pong: Woo!
You can see this all working live in some of our interactive demos for Transport TypeScript.
it shows Transport acting as both client and server, in which we use plank
to run
services, and the UI subscribes to those services and talks to them.
We have a live and running instance of plank
operating at
transport-bus.io. You can try the example code below to use the sample
simple stream service and
see how simple it is for yourself.
// get a pointer to the bus.
b := bus.GetBus()
// get a pointer to the channel manager
cm := b.GetChannelManager()
// create a broker connector config and connect to
// transport-bus.io over WebSocket using TLS.
config := &bridge.BrokerConnectorConfig{
Username: "guest", // not required for demo, but our API requires it.
Password: "guest", // ^^ same.
ServerAddr: "transport-bus.io", // our live broker running plank and demo services.
UseWS: true, // connect over websockets
WebSocketConfig: &bridge.WebSocketConfig{ // configure websocket
WSPath: "/ws", // websocket endpoint
UseTLS: true,
// use TLS/HTTPS. When using TLS, you can supply your own TLSConfig value, or we can
// generate a basic one for you if you leave TLSConfig empty. In most cases,
// you won't need to supply one.
}}
// connect to transport-bus.io demo broker
c, err := b.ConnectBroker(config)
if err != nil {
utils.Log.Fatalf("unable to connect to transport-bus.io, error: %v", err.Error())
}
// create a local channel on the bus.
myLocalChan := "my-stream"
cm.CreateChannel(myLocalChan)
// listen to stream of messages coming in on channel, a handler is returned
// that allows you to add in lambdas that handle your success messages, and your errors.
handler, _ := b.ListenStream(myLocalChan)
// mark our local 'my-stream' myLocalChan as galactic and map it to our connection and
// the /topic/simple-stream service
err = cm.MarkChannelAsGalactic(myLocalChan, "/topic/simple-stream", c)
if err != nil {
utils.Log.Fatalf("unable to map local channel to broker destination: %e", err)
}
// collect the streamed values in a slice
var streamedValues []string
// create a wait group that will wait 10 times before completing.
var wg sync.WaitGroup
wg.Add(10)
// keep listening
handler.Handle(
func(msg *model.Message) {
// unmarshal the message payload into a model.Response object
// this is a wrapper transport uses when being used as a server,
// it encapsulates a rich set of data about the message,
// but you only really care about the payload (body)
r := &model.Response{}
d := msg.Payload.([]byte)
err := json.Unmarshal(d, &r)
if err != nil {
utils.Log.Errorf("error unmarshalling request: %v", err.Error())
return
}
// the value we want is in the payload of our model.Response
value := r.Payload.(string)
// log it and save it to our streamedValues
utils.Log.Infof("stream ticked: %s", value)
streamedValues = append(streamedValues, value)
wg.Done()
},
func(err error) {
utils.Log.Errorf("error received on channel: %e", err)
})
// wait for 10 ticks of the stream, then we're done.
wg.Wait()
// close our handler, we're done.
handler.Close()
// mark channel as local (unsubscribe from all mappings)
err = cm.MarkChannelAsLocal(myLocalChan)
if err != nil {
utils.Log.Fatalf("unable to unsubscribe, error: %e", err)
}
// disconnect
err = c.Disconnect()
if err != nil {
utils.Log.Fatalf("unable to disconnect, error: %e", err)
}
// return what we got from the stream.
return streamedValues
You can see this simple example here
plank
is 'just enough' of a platform for building just about anything you want. Run RESTful and AsyncAPIs with the
same code, build simple or complex services that can be exposed to any client in any manner. Talk over WebSockets and pub/sub
and streaming, or call the same APIs via REST mappings. plank
can do it all. It's tiny, super-fast and runs on any platform. Runs in
just a few megabytes of memory, and can be compiled down to the same. It can be used for micro-services, daemons, agents,
local UI helper applications... anything!
plank
is only available in transport-go
The transport-go project team welcomes contributions from the community. Before you start working with transport-go, please read our Developer Certificate of Origin. All contributions to this repository must be signed as described on that page. Your signature certifies that you wrote the patch or have the right to pass it on as an open-source patch. For more detailed information, refer to CONTRIBUTING.md.
BSD-2-Clause
Copyright (c) 2016-2021, VMware, Inc.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.