import { NEVER, Observable, Subject, throwError } from 'rxjs';
import { switchMap, take, takeUntil, tap } from 'rxjs/operators';
import { SessionManagerService } from '@nida-web/api/rest/authentication';
import { MqttConnectionState } from '@meddv/ngx-mqtt';
import notify from 'devextreme/ui/notify';
import { ConfigService } from '@nida-web/api/rest/config';
import { MQTTSubscribeListenerService } from '@nida-web/api-mqtt-wrapper';
import { NotificationBrokerService } from './notification-broker.service';

export abstract class NotificationServable {
  protected startedDataSubscription = false;
  protected unsubscribe$: Subject<void> = new Subject();

  protected constructor(
    protected sessionManagerService: SessionManagerService,
    protected appConfigservice: ConfigService,
    protected mqttService: MQTTSubscribeListenerService,
    protected notificationService: NotificationBrokerService,
    protected topicConfigKey: string
  ) {
    this.prepareMqttDataReceiving(topicConfigKey)
      .pipe(
        switchMap((mqttMessage) => {
          return this.handleMQTTMessage(mqttMessage);
        })
      )
      .subscribe();
  }

  /**
   * Always listens for changes of the login status.
   * Activates the mqtt connection based on the login status.
   * @param configKey
   * @private
   */
  protected prepareMqttDataReceiving(configKey: string): Observable<string> {
    return this.sessionManagerService.onSessionInformationChanged().pipe(
      switchMap((sessionInformation) => {
        // move set of clinics to checkAssignment() because they do not get updated.
        //this.clinicIds = sessionInformation.clinics ? sessionInformation.clinics : [];
        if (sessionInformation.loggedIn) {
          return this.startMqttDataReceiving(configKey);
        } else {
          this.stopMqttDataReceiving();
          return NEVER;
        }
      })
    );
  }

  /**
   * Always listen for changes of the mqtt connection status.
   * Builts the filter based on the connection status.
   * @param configKey
   * @private
   */
  protected startMqttDataReceiving(configKey: string): Observable<never> | Observable<string> {
    return this.appConfigservice.getConfigsByGroup('mqtt').pipe(
      take(1),
      takeUntil(this.unsubscribe$),
      switchMap((configs) =>
        // this subscription should not be cancelled until the user is logging out
        this.mqttService.getConnectionStatusObservable().pipe(
          switchMap((state) => {
            switch (state) {
              case MqttConnectionState.CONNECTED: {
                //find configKey
                const config = configs.data.filter((config) => config.key === configKey && config.value);

                //configKey should only be included once, because otherwise the configuration is messed up
                if (config.length === 1 && config[0].value) {
                  this.loadFilterSetting();
                  return this.mqttService.registerTopic(config[0].value).pipe(
                    tap(() => (this.startedDataSubscription = true)),
                    takeUntil(this.unsubscribe$)
                  );
                } else {
                  this.stopMqttDataReceiving();
                  console.error('configKey was not found or included multiple times.');
                  notify({
                    message: 'Beim Initialisieren der Benachrichtigungen ist ein Fehler aufgetreten.',
                    type: 'Error',
                    displayTime: 5000,
                  });
                  throw throwError('configKey was not found or included multiple times.');
                }
              }
              case MqttConnectionState.CONNECTING: {
                return NEVER;
              }
              case MqttConnectionState.CLOSED: {
                this.stopMqttDataReceiving();
                return NEVER;
              }
            }
          })
        )
      )
    );
  }

  protected stopMqttDataReceiving(): void {
    if (this.startedDataSubscription) {
      this.unsubscribe$.next();
      this.unsubscribe$ = new Subject();
      this.notificationService.resetFilterFieldList();
      this.notificationService.setFilterInitializedStatus(false);
      this.startedDataSubscription = false;
    }
  }

  protected abstract loadFilterSetting();
  protected abstract handleMQTTMessage(mqttMessage: string): Observable<any>;
}
