import {Injectable} from '@angular/core';
import {Client} from '@stomp/stompjs';
import {environment} from "../../environments/environment";
import {BehaviorSubject, mergeWith, Observable, ReplaySubject, Subject, switchMap} from "rxjs";
import {filter, map} from "rxjs/operators";
import {WebsocketTopicOptions} from "./websocket-topic-options";
import {observableTopic} from "./topic.observable";
import {IMessage} from "@stomp/stompjs/src/i-message";

export const defaultTopicOptions: WebsocketTopicOptions = {
  observe: 'body'
}

@Injectable({
  providedIn: 'root'
})
export class WebsocketService {

  private _client: Client | undefined;
  private _connected: Subject<boolean> = new BehaviorSubject(false);
  private error$: Subject<any> = new ReplaySubject();

  constructor() {}

  private get client(): Client {
    // NOTE: initialize the first time we need to subscribe a topic
    if (!this._client) {
      this._client = new Client({
        brokerURL: environment.wsUrl,
        onConnect: () => this._connected.next(true),
        onStompError: error => this.error$.error(error),
        onWebSocketError: error => this.error$.error(error)
      });
    }
    return this._client;
  }

  private get whenConnected(): Observable<void> {
    return this._connected.pipe(
      filter(connected => connected),
      map(() => undefined)
    );
  }

  createTopic(name: string, options: Omit<WebsocketTopicOptions, 'observe'> & { observe: 'message'}): Observable<IMessage>
  createTopic<T>(name: string, options: Omit<WebsocketTopicOptions, 'observe'> & { observe: 'body'}): Observable<T>
  createTopic<T>(name: string, options: WebsocketTopicOptions = defaultTopicOptions): Observable<IMessage | T> {
    if (!this.client.active) {
      this.client.activate();
    }
    return this.whenConnected.pipe(
      switchMap(() => {
        let topic$ = observableTopic(this.client, name);
        if (options.observe === 'body') {
          return topic$.pipe(map(message => JSON.parse(message.body) as T))
        }
        return topic$;
      }),
      mergeWith(this.error$),
    );
  }
}
