import { Injectable } from "@angular/core";
import { AssistantEvent, LongRunningTask, WebsocketNotification } from "@visoryplatform/threads";
import { Observable } from "rxjs";
import { filter } from "rxjs/operators";
import { WebsocketService } from "./websocket.service";

enum WebsocketAction {
    Observe = "observe",
    Unobserve = "unobserve",
    MarkAsSeen = "markAsSeen",
}

@Injectable({ providedIn: "root" })
export class ThreadsWebsocketService {
    private threadEvents: Observable<WebsocketNotification>;
    private assistantEvents: Observable<AssistantEvent>;
    private longRunningTaskEvents: Observable<LongRunningTask>;

    constructor(private websocket: WebsocketService) {
        this.threadEvents = this.websocket
            .getEvents()
            .pipe(filter((event): event is WebsocketNotification => this.isWebsocketNotification(event)));

        this.assistantEvents = this.websocket
            .getEvents()
            .pipe(filter((event): event is AssistantEvent => this.isAssistantEvent(event)));

        this.longRunningTaskEvents = this.websocket
            .getEvents()
            .pipe(filter((event): event is LongRunningTask => this.isLongRunningTaskEvent(event)));
    }

    observe(threadId: string): void {
        this.websocket.send(WebsocketAction.Observe, { objectId: threadId });
    }

    unobserve(threadId: string): void {
        this.websocket.send(WebsocketAction.Unobserve, { objectId: threadId });
    }

    markAsSeen(threadId: string, cardId: string, participantId: string): void {
        this.websocket.send(WebsocketAction.MarkAsSeen, { threadId, cardId, participantId });
    }

    watchThreadId(threadId: string): Observable<WebsocketNotification> {
        return this.threadEvents.pipe(filter((event) => event.threadId === threadId));
    }

    watchThreadParticipantId(participantId: string): Observable<WebsocketNotification> {
        return this.threadEvents.pipe(filter((event) => event.participantId === participantId));
    }

    watchCardId(threadId: string, cardId?: string): Observable<WebsocketNotification> {
        return this.watchThreadId(threadId).pipe(filter((event) => !cardId || event.cardId === cardId));
    }

    watchAssistantChat(chatId: string): Observable<AssistantEvent> {
        return this.assistantEvents.pipe(filter((event) => event.chatId === chatId));
    }

    watchLongRunningTask(taskId: string): Observable<LongRunningTask> {
        return this.longRunningTaskEvents.pipe(filter((event) => event.id === taskId));
    }

    connectAllEvents(): Observable<WebsocketNotification> {
        return this.threadEvents;
    }

    private isWebsocketNotification(event: unknown): event is WebsocketNotification {
        return event && !!(event as WebsocketNotification).threadId;
    }

    private isAssistantEvent(event: unknown): event is AssistantEvent {
        return event && typeof event === "object" && "chatId" in event;
    }

    private isLongRunningTaskEvent(event: unknown): event is LongRunningTask {
        return event && typeof event === "object" && "id" in event && "status" in event && "params" in event;
    }
}
