Skip to content

Commit

Permalink
Merge pull request #17 from SloMR/FixAttachments
Browse files Browse the repository at this point in the history
Fix attachments multithread, improve GUI
  • Loading branch information
SloMR authored Nov 2, 2024
2 parents 279f3ee + d364e1a commit 65563a3
Show file tree
Hide file tree
Showing 11 changed files with 777 additions and 541 deletions.
383 changes: 239 additions & 144 deletions client/src/app/core/services/file-transfer.service.ts

Large diffs are not rendered by default.

55 changes: 35 additions & 20 deletions client/src/app/core/services/webrtc.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@angular/core';
import { Injectable, NgZone } from '@angular/core';
import { BehaviorSubject, Subject } from 'rxjs';
import { LoggerService } from './logger.service';
import { WebSocketConnectionService } from './websocket-connection.service';
Expand All @@ -14,6 +14,7 @@ import {
MAX_RECONNECT_ATTEMPTS,
RECONNECT_DELAY,
} from '../../utils/constants';
import { MatSnackBar } from '@angular/material/snack-bar';

interface SignalMessage {
type: SignalMessageType;
Expand Down Expand Up @@ -46,20 +47,23 @@ export class WebRTCService {
fromUser: string;
}>();
public fileResponses$ = new Subject<{ accepted: boolean; fromUser: string }>();
public incomingData$ = new Subject<ArrayBuffer>();
public bufferedAmountLow$ = new Subject<void>();
public incomingData$ = new Subject<{ data: ArrayBuffer; fromUser: string }>();
public fileTransferCancelled$ = new Subject<{ fromUser: string }>();
public bufferedAmountLow$ = new Subject<string>();

private peerConnections = new Map<string, RTCPeerConnection>();
private dataChannels = new Map<string, RTCDataChannel>();
private candidateQueues = new Map<string, RTCIceCandidateInit[]>();
private messageQueues = new Map<string, (DataChannelMessage | ArrayBuffer)[]>();

private reconnectAttempts = 0;
private reconnectAttempts = new Map<string, number>();

constructor(
private logger: LoggerService,
private wsService: WebSocketConnectionService,
private userService: UserService
private userService: UserService,
private zone: NgZone,
private snackBar: MatSnackBar
) {
this.wsService.signalMessages$.subscribe((message) => {
if (message) {
Expand Down Expand Up @@ -99,21 +103,16 @@ export class WebRTCService {
}

private reconnect(targetUser: string) {
this.logger.info('Reconnecting WebRTC...');
this.logger.info(`Reconnecting WebRTC with ${targetUser}...`);

const peerConnection = this.peerConnections.get(targetUser);
if (!peerConnection) {
this.logger.warn(`PeerConnection does not exist for user: ${targetUser}`);
return;
}

if (peerConnection) {
peerConnection.close();
this.peerConnections.delete(targetUser);
}

this.initiateConnection(targetUser);
this.reconnectAttempts = 0;
this.reconnectAttempts.set(targetUser, 0);
}

private createPeerConnection(targetUser: string): RTCPeerConnection {
Expand Down Expand Up @@ -167,6 +166,7 @@ export class WebRTCService {

private setupDataChannel(channel: RTCDataChannel, targetUser: string): void {
channel.binaryType = 'arraybuffer';

channel.onopen = () => {
this.logger.info(`Data channel with ${targetUser} is open`);
this.dataChannelOpen$.next(true);
Expand Down Expand Up @@ -206,15 +206,18 @@ export class WebRTCService {

channel.bufferedAmountLowThreshold = BUFFERED_AMOUNT_LOW_THRESHOLD;
channel.onbufferedamountlow = () => {
this.bufferedAmountLow$.next();
this.bufferedAmountLow$.next(targetUser);
};
}

private handleDisconnection(targetUser: string) {
if (this.reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
this.reconnectAttempts++;
const attempts = this.reconnectAttempts.get(targetUser) || 0;
if (attempts < MAX_RECONNECT_ATTEMPTS) {
this.reconnectAttempts.set(targetUser, attempts + 1);
this.logger.warn(
`Attempt ${this.reconnectAttempts}: Reconnecting in ${RECONNECT_DELAY / 1000} seconds...`
`Attempt ${attempts + 1}: Reconnecting to ${targetUser} in ${
RECONNECT_DELAY / 1000
} seconds...`
);

setTimeout(() => {
Expand All @@ -223,8 +226,12 @@ export class WebRTCService {
}
}, RECONNECT_DELAY);
} else {
this.logger.error('Max reconnection attempts reached. Could not reconnect.');
alert('Could not reconnect to the user. Please try again later.');
this.logger.error(
`Max reconnection attempts reached for ${targetUser}. Could not reconnect.`
);
this.snackBar.open('Could not reconnect to the user. Please try again later.', 'Close', {
duration: 5000,
});
this.closePeerConnection(targetUser);
}
}
Expand Down Expand Up @@ -252,13 +259,19 @@ export class WebRTCService {
this.logger.info(`Received file decline from ${targetUser}`);
this.fileResponses$.next({ accepted: false, fromUser: targetUser });
break;
case FILE_TRANSFER_MESSAGE_TYPES.FILE_CANCEL:
this.logger.info(`Received file cancellation from ${targetUser}`);
this.fileTransferCancelled$.next({ fromUser: targetUser });
break;
default:
this.logger.warn(`Unknown message type: ${message.type}`);
}
} else if (data instanceof ArrayBuffer) {
this.incomingData$.next(data);
this.zone.run(() => {
this.incomingData$.next({ data, fromUser: targetUser });
});
} else {
this.logger.warn(`Unknown data type received: ${data}`);
this.logger.warn(`Unknown data type received from ${targetUser}: ${data}`);
}
}

Expand Down Expand Up @@ -453,5 +466,7 @@ export class WebRTCService {
this.dataChannels.clear();
this.peerConnections.clear();
this.candidateQueues.clear();
this.messageQueues.clear();
this.reconnectAttempts.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { BehaviorSubject } from 'rxjs';
import { environment } from '../../environments/environment';
import { LoggerService } from './logger.service';

// WebSocket Message Interface
export interface WebSocketMessage {
type: string;
payload: any;
Expand Down
Loading

0 comments on commit 65563a3

Please sign in to comment.