diff --git a/src/Websocket/Client.php b/src/Websocket/Client.php index 87e1589..d212309 100644 --- a/src/Websocket/Client.php +++ b/src/Websocket/Client.php @@ -65,6 +65,19 @@ private function startConnection() // TODO: Should validate response //$cNegotiator->validateResponse($response); + $subprotoHeader = ""; + + $psr7Response = new \GuzzleHttp\Psr7\Response( + $response->getCode(), + $response->getHeaders(), + null, + $response->getVersion() + ); + + if (count($psr7Response->getHeader('Sec-WebSocket-Protocol')) == 1) { + $subprotoHeader = $psr7Response->getHeader('Sec-WebSocket-Protocol')[0]; + } + parent::onNext(new MessageSubject( new AnonymousObservable(function (ObserverInterface $observer) use ($response) { $response->on('data', function ($data) use ($observer) { @@ -106,7 +119,10 @@ function () use ($request) { } ), true, - $this->useMessageObject + $this->useMessageObject, + $subprotoHeader, + $cNegotiator->getRequest(), + $psr7Response )); }); diff --git a/src/Websocket/MessageSubject.php b/src/Websocket/MessageSubject.php index ea7a0ef..229b858 100644 --- a/src/Websocket/MessageSubject.php +++ b/src/Websocket/MessageSubject.php @@ -2,6 +2,8 @@ namespace Rx\Websocket; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; use Rx\Websocket\RFC6455\Messaging\Protocol\Frame; use Rx\Observable; use Rx\Observable\AnonymousObservable; @@ -27,6 +29,12 @@ class MessageSubject extends Subject /** @var string */ private $subProtocol; + /** @var RequestInterface */ + private $request; + + /** @var ResponseInterface */ + private $response; + /** * ConnectionSubject constructor. * @param ObservableInterface $rawDataIn @@ -34,14 +42,21 @@ class MessageSubject extends Subject * @param bool $mask * @param bool $useMessageObject * @param string $subProtocol + * @param RequestInterface $request + * @param ResponseInterface $response */ public function __construct( ObservableInterface $rawDataIn, ObserverInterface $rawDataOut, $mask = false, $useMessageObject = false, - $subProtocol = "" + $subProtocol = "", + RequestInterface $request, + ResponseInterface $response ) { + $this->request = $request; + $this->response = $response; + $this->rawDataIn = new AnonymousObservable(function ($observer) use ($rawDataIn) { return $rawDataIn->subscribe($observer); }); @@ -92,10 +107,12 @@ function () use ($frames) { ->filter(function (Frame $frame) { return $frame->getOpcode() === $frame::OP_PING; }) - ->subscribe(new CallbackObserver(function (Frame $frame) { - $pong = new Frame($frame->getPayload(), true, Frame::OP_PONG); - $this->sendFrame($pong); - })); + ->subscribe(new CallbackObserver( + function (Frame $frame) { + $pong = new Frame($frame->getPayload(), true, Frame::OP_PONG); + $this->sendFrame($pong); + } + )); $frames ->filter(function (Frame $frame) { @@ -177,4 +194,20 @@ public function getSubProtocol() { return $this->subProtocol; } + + /** + * @return RequestInterface + */ + public function getRequest() + { + return $this->request; + } + + /** + * @return ResponseInterface + */ + public function getResponse() + { + return $this->response; + } } diff --git a/src/Websocket/Server.php b/src/Websocket/Server.php index a4af2cc..3458478 100644 --- a/src/Websocket/Server.php +++ b/src/Websocket/Server.php @@ -2,6 +2,8 @@ namespace Rx\Websocket; +use Guzzle\Http\Message\RequestInterface; +use GuzzleHttp\Psr7\Uri; use Rx\Websocket\RFC6455\Encoding\Validator; use Rx\Websocket\RFC6455\Handshake\Negotiator; use React\Http\Request; @@ -56,12 +58,21 @@ private function startServer() $http = new \React\Http\Server($socket); $http->on('request', function (Request $request, Response $response) use ($negotiator) { + $uri = new Uri($request->getPath()); + if (count($request->getQuery()) > 0) { + $uri = $uri->withQuery(\GuzzleHttp\Psr7\build_query($request->getQuery())); + } + $psrRequest = new \GuzzleHttp\Psr7\Request( $request->getMethod(), - $request->getPath(), + $uri, $request->getHeaders() ); + // cram the remote address into the header in out own X- header so + // the user will have access to it + $psrRequest = $psrRequest->withAddedHeader("X-RxWebsocket-Remote-Address", $request->remoteAddress); + $negotiatorResponse = $negotiator->handshake($psrRequest); $response->writeHead( @@ -77,6 +88,11 @@ private function startServer() return; } + $subProtocol = ""; + if (count($negotiatorResponse->getHeader('Sec-WebSocket-Protocol')) > 0) { + $subProtocol = $negotiatorResponse->getHeader('Sec-WebSocket-Protocol')[0]; + } + $connection = new MessageSubject( new AnonymousObservable( function (ObserverInterface $observer) use ($request) { @@ -89,6 +105,9 @@ function (ObserverInterface $observer) use ($request) { $request->on('close', function () use ($observer) { $observer->onCompleted(); }); + $request->on('end', function () use ($observer) { + $observer->onCompleted(); + }); return new CallbackDisposable( function () use ($request) { @@ -109,7 +128,10 @@ function () use ($response) { } ), false, - $this->useMessageObject + $this->useMessageObject, + $subProtocol, + $psrRequest, + $negotiatorResponse ); $this->connectionSubject->onNext($connection); diff --git a/src/Websocket/WebsocketFrameOperator.php b/src/Websocket/WebsocketFrameOperator.php index d43b610..47bcca2 100644 --- a/src/Websocket/WebsocketFrameOperator.php +++ b/src/Websocket/WebsocketFrameOperator.php @@ -76,7 +76,9 @@ function ($data) use ($observer) { $this->frame = new Frame(); } } - } + }, + [$observer, 'onError'], + [$observer, 'onCompleted'] )); } } diff --git a/src/Websocket/WebsocketMessageOperator.php b/src/Websocket/WebsocketMessageOperator.php index a472261..73af33f 100644 --- a/src/Websocket/WebsocketMessageOperator.php +++ b/src/Websocket/WebsocketMessageOperator.php @@ -59,7 +59,9 @@ function ($frame) use ($observer) { } $this->message = new Message(); } - } + }, + [$observer, 'onError'], + [$observer, 'onCompleted'] )); } }