Skip to content

Commit

Permalink
Add request/response info to MessageSubject, subprotocol support, fix…
Browse files Browse the repository at this point in the history
…ed onComplete not passing through
  • Loading branch information
mbonneau committed Feb 2, 2016
1 parent 2a8c361 commit b9587a0
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 10 deletions.
18 changes: 17 additions & 1 deletion src/Websocket/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ private function startConnection()
// TODO: Should validate response
//$cNegotiator->validateResponse($response);

$subprotoHeader = "";

$psr7Response = new \GuzzleHttp\Psr7\Response(
$response->getCode(),
$response->getHeaders(),
null,
$response->getVersion()
);

if (count($psr7Response->getHeader('Sec-WebSocket-Protocol')) == 1) {
$subprotoHeader = $psr7Response->getHeader('Sec-WebSocket-Protocol')[0];
}

parent::onNext(new MessageSubject(
new AnonymousObservable(function (ObserverInterface $observer) use ($response) {
$response->on('data', function ($data) use ($observer) {
Expand Down Expand Up @@ -106,7 +119,10 @@ function () use ($request) {
}
),
true,
$this->useMessageObject
$this->useMessageObject,
$subprotoHeader,
$cNegotiator->getRequest(),
$psr7Response
));
});

Expand Down
43 changes: 38 additions & 5 deletions src/Websocket/MessageSubject.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Rx\Websocket;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Rx\Websocket\RFC6455\Messaging\Protocol\Frame;
use Rx\Observable;
use Rx\Observable\AnonymousObservable;
Expand All @@ -27,21 +29,34 @@ class MessageSubject extends Subject
/** @var string */
private $subProtocol;

/** @var RequestInterface */
private $request;

/** @var ResponseInterface */
private $response;

/**
* ConnectionSubject constructor.
* @param ObservableInterface $rawDataIn
* @param ObserverInterface $rawDataOut
* @param bool $mask
* @param bool $useMessageObject
* @param string $subProtocol
* @param RequestInterface $request
* @param ResponseInterface $response
*/
public function __construct(
ObservableInterface $rawDataIn,
ObserverInterface $rawDataOut,
$mask = false,
$useMessageObject = false,
$subProtocol = ""
$subProtocol = "",
RequestInterface $request,
ResponseInterface $response
) {
$this->request = $request;
$this->response = $response;

$this->rawDataIn = new AnonymousObservable(function ($observer) use ($rawDataIn) {
return $rawDataIn->subscribe($observer);
});
Expand Down Expand Up @@ -92,10 +107,12 @@ function () use ($frames) {
->filter(function (Frame $frame) {
return $frame->getOpcode() === $frame::OP_PING;
})
->subscribe(new CallbackObserver(function (Frame $frame) {
$pong = new Frame($frame->getPayload(), true, Frame::OP_PONG);
$this->sendFrame($pong);
}));
->subscribe(new CallbackObserver(
function (Frame $frame) {
$pong = new Frame($frame->getPayload(), true, Frame::OP_PONG);
$this->sendFrame($pong);
}
));

$frames
->filter(function (Frame $frame) {
Expand Down Expand Up @@ -177,4 +194,20 @@ public function getSubProtocol()
{
return $this->subProtocol;
}

/**
* @return RequestInterface
*/
public function getRequest()
{
return $this->request;
}

/**
* @return ResponseInterface
*/
public function getResponse()
{
return $this->response;
}
}
26 changes: 24 additions & 2 deletions src/Websocket/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Rx\Websocket;

use Guzzle\Http\Message\RequestInterface;
use GuzzleHttp\Psr7\Uri;
use Rx\Websocket\RFC6455\Encoding\Validator;
use Rx\Websocket\RFC6455\Handshake\Negotiator;
use React\Http\Request;
Expand Down Expand Up @@ -56,12 +58,21 @@ private function startServer()

$http = new \React\Http\Server($socket);
$http->on('request', function (Request $request, Response $response) use ($negotiator) {
$uri = new Uri($request->getPath());
if (count($request->getQuery()) > 0) {
$uri = $uri->withQuery(\GuzzleHttp\Psr7\build_query($request->getQuery()));
}

$psrRequest = new \GuzzleHttp\Psr7\Request(
$request->getMethod(),
$request->getPath(),
$uri,
$request->getHeaders()
);

// cram the remote address into the header in out own X- header so
// the user will have access to it
$psrRequest = $psrRequest->withAddedHeader("X-RxWebsocket-Remote-Address", $request->remoteAddress);

$negotiatorResponse = $negotiator->handshake($psrRequest);

$response->writeHead(
Expand All @@ -77,6 +88,11 @@ private function startServer()
return;
}

$subProtocol = "";
if (count($negotiatorResponse->getHeader('Sec-WebSocket-Protocol')) > 0) {
$subProtocol = $negotiatorResponse->getHeader('Sec-WebSocket-Protocol')[0];
}

$connection = new MessageSubject(
new AnonymousObservable(
function (ObserverInterface $observer) use ($request) {
Expand All @@ -89,6 +105,9 @@ function (ObserverInterface $observer) use ($request) {
$request->on('close', function () use ($observer) {
$observer->onCompleted();
});
$request->on('end', function () use ($observer) {
$observer->onCompleted();
});

return new CallbackDisposable(
function () use ($request) {
Expand All @@ -109,7 +128,10 @@ function () use ($response) {
}
),
false,
$this->useMessageObject
$this->useMessageObject,
$subProtocol,
$psrRequest,
$negotiatorResponse
);

$this->connectionSubject->onNext($connection);
Expand Down
4 changes: 3 additions & 1 deletion src/Websocket/WebsocketFrameOperator.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ function ($data) use ($observer) {
$this->frame = new Frame();
}
}
}
},
[$observer, 'onError'],
[$observer, 'onCompleted']
));
}
}
4 changes: 3 additions & 1 deletion src/Websocket/WebsocketMessageOperator.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ function ($frame) use ($observer) {
}
$this->message = new Message();
}
}
},
[$observer, 'onError'],
[$observer, 'onCompleted']
));
}
}

0 comments on commit b9587a0

Please sign in to comment.