Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
frhnfrq committed Nov 17, 2018
0 parents commit 7e719ca
Show file tree
Hide file tree
Showing 17 changed files with 1,016 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*.iml
/.idea/
/out
/src/xyz/farhanfarooqui/main
Binary file added lib/gson-2.8.5.jar
Binary file not shown.
Binary file added lib/org.json.jar
Binary file not shown.
126 changes: 126 additions & 0 deletions src/xyz/farhanfarooqui/JRocket/Client.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package xyz.farhanfarooqui.JRocket;

import com.sun.istack.internal.NotNull;
import org.json.JSONObject;

import java.io.IOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;

/**
* Represents each client connected to the server. Communicate with the client through an instance of this class. A new client object will be created if the client reconnects
* after disconnecting.
*/
public class Client {
private String mId;
private Communicator mCommunicator;
private HashMap<String, Object> mDatas;

private Client(String id, Communicator communicator) {
mId = id;
mCommunicator = communicator;
mDatas = new HashMap<>();
}

/**
* Creates a client instance for the client which just connected to the server.
*
* @param id Unique Id for each client.
* @param rocketServer Instance of {@link JRocketServer} with which the client is connected to the server.
* @param socket Instance of {@link Socket} over which the client is connected to the server.
* @param executorService All thread operations are performed on this executor service.
*/
static Client createClient(String id, JRocketServer rocketServer, Socket socket, ExecutorService executorService) throws IOException {
Communicator communicator = new Communicator(rocketServer, socket, executorService);
Client client = new Client(id, communicator);

client.mCommunicator.setClientListener(new ClientListener() {
@Override
public void onEventReceive(JRocket JRocket, String event, JSONObject data) {
((JRocketServer) JRocket).onReceiveEvent(event, data, client);
}

@Override
public void onClientDisconnect(JRocket JRocket) {
((JRocketServer) JRocket).onDisconnect(client);
}
});

client.mCommunicator.start();

return client;
}

/**
* @return Unique Id of the client.
*/
public String getId() {
return mId;
}

/**
* Store client datas with unique keys.
*
* @param key key with which the specified data is to be associated.
* @param data data to be associated with the specified key.
*/
public void put(String key, Object data) {
mDatas.put(key, data);
}

/**
* Returns data associated with the key.
*
* @param key The Key with which the data is stored.
* @return The data with which the specified key is stored.
*/
public Object get(String key) {
return mDatas.get(key);
}

/**
* Sends an event to the client with payload.
*
* @param event The event which will be sent to the client.
* @param data The data payload which will be sent to the client. Payloads must be stored in JSON format.
*/
public void send(@NotNull String event, @NotNull JSONObject data) {
mCommunicator.send(event, data);
}

/**
* Event is broadcasted to every client except the calling client
*
* @param event The event which will be sent to the clients.
* @param data The data payload which will be sent to the clients. Payloads must be stored in JSON format.
*/
public void broadCast(@NotNull String event, @NotNull JSONObject data) {
mCommunicator.broadCast(event, data, this);
}

/**
* Disconnects the client from the server.
*/
void disconnect() {
mCommunicator.close();
}

interface ClientListener {
void onEventReceive(JRocket JRocket, String event, JSONObject data);

void onClientDisconnect(JRocket JRocket);
}

@Override
public boolean equals(Object obj) {
return obj instanceof Client && ((Client) obj).getId().equals(getId());
}

@Override
public int hashCode() {
int hashCode = 1;
hashCode = 37 * hashCode + mId.hashCode();
return hashCode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package xyz.farhanfarooqui.JRocket.ClientListeners;

import org.json.JSONObject;

public interface OnReceiveListener {
void onReceive(JSONObject data);
}
41 changes: 41 additions & 0 deletions src/xyz/farhanfarooqui/JRocket/ClientReceiver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package xyz.farhanfarooqui.JRocket;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;

/**
* ClientReceiver handles new Socket connections
* it accepts a connection, and creates a Communicator for that.
*/

class ClientReceiver extends Thread {
private ServerSocket serverSocket;
private JRocketServer rocketServer;
private ExecutorService executorService;

ClientReceiver(JRocketServer rocketServer, ServerSocket serverSocket, ExecutorService executorService) {
this.serverSocket = serverSocket;
this.rocketServer = rocketServer;
this.executorService = executorService;
}

@Override
public void run() {
super.run();
while (!serverSocket.isClosed()) {
try {
long t1 = System.currentTimeMillis();
Socket socket = serverSocket.accept();
socket.setSoTimeout(rocketServer.getHeartBeatRate());
Client client = Client.createClient(Utils.createID(), rocketServer, socket, executorService);
System.out.println(System.currentTimeMillis() - t1);
rocketServer.onConnect(client);
} catch (IOException e) {
e.printStackTrace();
rocketServer.onServerStop();
}
}
}
}
199 changes: 199 additions & 0 deletions src/xyz/farhanfarooqui/JRocket/Communicator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package xyz.farhanfarooqui.JRocket;

import org.json.JSONException;
import org.json.JSONObject;

import java.io.*;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

import static xyz.farhanfarooqui.JRocket.Constants.*;

/**
* Communicator class handles all the I/O between the client
* and the Server.
*/

class Communicator {
private JRocket mJRocket;
private Socket mSocket;
private Receiver mReceiver;
private Sender mSender;
private boolean hasRun = false;
private Client.ClientListener mClientListener;
private ExecutorService mExecutorService;
private volatile boolean running;
private LinkedBlockingQueue<JSONObject> mQueue;

void setClientListener(Client.ClientListener clientListener) {
this.mClientListener = clientListener;
}

Communicator(JRocket JRocket, Socket socket, ExecutorService executorService) throws IOException {
mJRocket = JRocket;
mSocket = socket;
mExecutorService = executorService;
mReceiver = new Receiver(socket);
mSender = new Sender(socket);
mQueue = new LinkedBlockingQueue<>();
}

private JRocket getJRocket() {
return mJRocket;
}

void start() {
if (!hasRun) {
try {
mExecutorService.execute(mSender);
mExecutorService.execute(mReceiver);
} catch (RejectedExecutionException e) {
e.printStackTrace();
disconnect();
}
this.running = true;
hasRun = true;
}
}

void close() {
disconnect();
}

/**
* Broadcasts to other clients
* <br>
* PS. broadcasts to <b>all of the clients except the client who's broadcasting</b>.
*/

void broadCast(String event, JSONObject data, Client client) {
((JRocketServer) getJRocket()).broadCast(event, data, client);
}

/**
* Sends data
*/
void send(String event, JSONObject data) {
try {
JSONObject jsonObject = new JSONObject();
jsonObject.put(EVENT, event);
jsonObject.put(DATA, data);
mQueue.put(jsonObject);
} catch (JSONException | InterruptedException e) {
e.printStackTrace();
}
}

private class Sender implements Runnable {

private Socket socket;
private OutputStreamWriter outputStreamWriter;

Sender(Socket socket) throws IOException {
this.socket = socket;
outputStreamWriter = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
}

@Override
public void run() {
while (!socket.isClosed()) {
try {
JSONObject jsonObject = mQueue.poll(mJRocket.getHeartBeatRate(), TimeUnit.MILLISECONDS);

if (jsonObject == null) {
send("heartbeat", new JSONObject());
continue;
}

outputStreamWriter.write(jsonObject.toString().length());
outputStreamWriter.write(jsonObject.toString());
outputStreamWriter.flush();
} catch (IOException e) {
try {
outputStreamWriter.close();
} catch (IOException e1) {
e1.printStackTrace();
}
e.printStackTrace();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
disconnect();
}
}

private class Receiver implements Runnable {
private Socket socket;
private BufferedReader bufferedReader;

Receiver(Socket socket) throws IOException {
this.socket = socket;
bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
}

@Override
public void run() {
StringBuilder stringBuilder = new StringBuilder();
int retry = 0;

while (!socket.isClosed()) {
try {

if (retry > 2) {
throw new IOException("Maximum retries reached");
}

int c;
char ch;
int length = bufferedReader.read();
for (int i = 0; i < length; i++) {
c = bufferedReader.read();
ch = (char) c;
stringBuilder.append(ch);
}

JSONObject jsonObject = new JSONObject(stringBuilder.toString());
String event = jsonObject.getString(EVENT);
JSONObject data = jsonObject.getJSONObject(DATA);

mClientListener.onEventReceive(getJRocket(), event, data);
stringBuilder.setLength(0);
retry = 0;
} catch (SocketTimeoutException s) {
retry++;
System.out.println("Time out on read. Trying again " + retry);
} catch (IOException e) {
try {
bufferedReader.close();
} catch (IOException e1) {
e1.printStackTrace();
}
e.printStackTrace();
break;
} catch (JSONException e) {
e.printStackTrace();
break;
}
}
disconnect();
}
}

private void disconnect() {
if (running) {
if (!mSocket.isClosed()) {
try {
mSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
mClientListener.onClientDisconnect(getJRocket());
running = false;
}
}
}
6 changes: 6 additions & 0 deletions src/xyz/farhanfarooqui/JRocket/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package xyz.farhanfarooqui.JRocket;

class Constants {
static final String EVENT = "event";
static final String DATA = "data";
}
Loading

0 comments on commit 7e719ca

Please sign in to comment.