import { BehaviorSubject, Observable, Subject, Subscription } from "rxjs";
import { delayWhen, filter, mapTo, take } from "rxjs/operators";

/*
    Extracted from threads-frontend. Could be moved to common Findex library
*/

export class WebsocketService {
    private reconnectDelay = 1000;
    private reconnect = true;
    private socket: WebSocket;
    private socketState = new BehaviorSubject<boolean>(false);
    private subject = new Subject<any>();
    private sendQueue = new Subject<any>();
    private sendSubscription: Subscription;

    constructor(
        private authHeaders: Observable<any>,
        private websocketEndpoint: string,
    ) {
        this.createSocket();
        this.sendSubscription = this.sendQueue
            .pipe(delayWhen(() => this.waitForSocket()))
            .subscribe((message) => this.socketSend(message));
    }

    getEvents<EventData>(): Observable<EventData> {
        return this.subject;
    }

    send(action: string, data: any) {
        if (!action) {
            return;
        }
        this.sendQueue.next({ action, data });
    }

    close() {
        this.subject.complete();
        this.socketState.complete();
        this.sendQueue.complete();

        this.reconnect = false;

        if (this.sendSubscription) {
            this.sendSubscription.unsubscribe();
        }

        if (this.socket) {
            this.socket.close();
        }
    }

    private waitForSocket(): Observable<void> {
        return this.socketState.pipe(
            filter((state) => state === true),
            mapTo(null),
        );
    }

    private async waitForAuthHeaders(): Promise<any> {
        return await this.authHeaders.pipe(take(1)).toPromise();
    }

    private async createSocket() {
        const headers = await this.waitForAuthHeaders();
        if (!headers) {
            return;
        }
        const queryString = Object.entries(headers)
            .map(([key, val]) => `${key}=${encodeURIComponent(val as string)}`)
            .join("&");

        const url = `${this.websocketEndpoint}?${queryString}`;
        this.socket = new WebSocket(url);

        this.socket.onopen = this.socketOpened;
        this.socket.onmessage = this.socketMessage;
        this.socket.onerror = this.socketError;
        this.socket.onclose = this.socketClosed;
    }

    private socketSend(message: any) {
        console.log("Sending websocket message", message);
        this.socket.send(JSON.stringify(message));
    }

    private socketClosed = (event: CloseEvent) => {
        console.info("Socket closed", event.code);
        this.socketState.next(false);

        if (!this.reconnect) {
            return;
        }
        setTimeout(() => this.createSocket(), this.reconnectDelay);
    };

    private socketOpened = (_event: any) => {
        console.info("Socket opened");
        this.socketState.next(true);
    };

    private socketMessage = (event: any) => {
        try {
            const data = JSON.parse(event.data);
            this.subject.next(data);
        } catch (err) {
            console.error("Error processing event", event, err);
        }
    };

    private socketError = (event: any) => {
        console.error("socket error", event);
        this.socket.close();
    };
}
