import { Injectable } from '@angular/core';
import { AppConfig } from '@app/app.config';
import {
  catchError,
  filter,
  finalize,
  Observable,
  retry,
  Subject,
  Subscription,
  switchMap,
  timeout,
  timer,
} from 'rxjs';
import { SseClient } from '../sse/sse-client.service';
import { ChannelsService } from './channels.service';
import { EventType } from './event-type.enum';

/**
 * Application-wide gateway to the notification server-sent-events stream.
 *
 * A single stream subscription (opened by {@link EventsService.connect}) feeds an
 * internal subject; {@link EventsService.listen} and {@link EventsService.channel}
 * expose filtered, JSON-decoded views of that subject.
 */
@Injectable({ providedIn: 'root' })
export class EventsService {
  /** Active subscription to the SSE stream; `undefined` while disconnected. */
  private subscription?: Subscription;

  /** Fan-out point for every non-error event received from the stream. */
  private readonly onmessage$ = new Subject<MessageEvent>();

  constructor(
    private readonly sseClient: SseClient,
    private readonly channelsService: ChannelsService
  ) {}

  /**
   * Opens the event stream, replacing any stream subscription opened earlier.
   *
   * Events whose `type` is not `'error'` are forwarded to listeners. Error events
   * are only logged, and only when they carry an `error.status` greater than zero.
   */
  connect(): void {
    const url = `${AppConfig.API_URL_NOTIFICATION}/eventos/source`;

    // Drop the previous stream first so only one stream feeds the subject.
    this.subscription?.unsubscribe();

    const stream$ = this.sseClient.stream(url, { keepAlive: true, reconnectionDelay: 5_000 });
    this.subscription = stream$.subscribe(event => {
      if (event.type === 'error') {
        const errorEvent = event as ErrorEvent;
        // NOTE(review): errors without a positive status are ignored on purpose here;
        // presumably they are connection drops handled by the client's reconnection — confirm.
        if (errorEvent.error?.status > 0) {
          console.error(errorEvent.message, errorEvent.error);
        }
        return;
      }

      this.onmessage$.next(event as MessageEvent);
    });
  }

  /** Closes the event stream, if one is open. Safe to call when already disconnected. */
  disconnect(): void {
    this.subscription?.unsubscribe();
    this.subscription = undefined;
  }

  /**
   * Listens for events of a given type.
   *
   * @param type Event type to match against the event's `type` field.
   * @returns An observable emitting the JSON-decoded `data` of each matching event
   * received after subscription. The payload is cast to `T` without validation.
   */
  listen<T extends object = any>(type: EventType): Observable<T> {
    return this.fromMessages<T>(event => event.type === type);
  }

  /**
   * Listens for the events of a channel.
   * - Adds the logged-in user to the channel and listens for the channel's events.
   * - If no channel exists with the given ID, a new one is created.
   *
   * The channel subscription request is retried up to 3 times, waiting
   * `attempt * 2s` before each retry. When the returned observable is finalized
   * (unsubscribed, completed or errored) a best-effort channel unsubscription
   * request is sent.
   *
   * @param channelId UUID of the channel.
   * @returns An observable emitting the JSON-decoded `data` of each event whose
   * `lastEventId` equals `channelId`.
   */
  channel<T extends object = any>(channelId: string): Observable<T> {
    const join$ = this.channelsService.subscribe(channelId).pipe(
      retry({
        count: 3,
        delay: (_, retryAttempt: number) => timer(retryAttempt * 2_000),
        resetOnSuccess: true,
      })
    );

    return join$.pipe(
      switchMap(() => this.listenChannel<T>(channelId)),
      finalize(() => {
        // Fire-and-forget: failures and timeouts while leaving the channel are
        // deliberately ignored so teardown never throws.
        this.channelsService
          .unsubscribe(channelId)
          .pipe(
            timeout(15_000),
            catchError(() => [])
          )
          .subscribe();
      })
    );
  }

  /** Emits the decoded payload of every event whose `lastEventId` equals `channelId`. */
  private listenChannel<T extends object = any>(channelId: string): Observable<T> {
    return this.fromMessages<T>(event => event.lastEventId === channelId);
  }

  /**
   * Shared implementation behind `listen` and `listenChannel`: emits the
   * JSON-decoded `data` of each incoming event accepted by `matches`.
   *
   * NOTE(review): `JSON.parse` throws on malformed payloads and the exception is
   * raised inside the subscriber callback rather than sent to `observer.error`;
   * confirm that this is the desired way to surface bad payloads.
   *
   * @param matches Predicate selecting which events are delivered.
   */
  private fromMessages<T extends object>(matches: (event: MessageEvent) => boolean): Observable<T> {
    return new Observable<T>(observer => {
      const inner = this.onmessage$.pipe(filter(matches)).subscribe(event => {
        observer.next(JSON.parse(event.data) as T);
      });

      return () => inner.unsubscribe();
    });
  }
}
