import * as Sentry from '@sentry/browser';
import type { ScopeContext } from '@sentry/types';
import { isFunction, isNil, values } from 'lodash';
import {
  BehaviorSubject,
  EMPTY,
  Subject,
  throwError,
  timer,
  type Observable,
  type Subscription as RxJsSubscription,
} from 'rxjs';
import { filter, flatMap, repeatWhen, retryWhen, share } from 'rxjs/operators';
import { webSocket, type WebSocketSubject } from 'rxjs/webSocket';
import {
  AVA_PING,
  AVA_PONG,
  CANCEL,
  CURRENCY,
  MARKET,
  MARKET_DATA_SNAPSHOT,
  PAGE,
  SECURITY,
  SUBSCRIBE,
  TRADE_DIRECTION,
  type WL_PING,
  type WL_PONG,
} from '../tokens';

import type { RequestStream } from '../types/RequestStream';
import type { Response } from '../types/Response';
import { logger } from '../utils';
import { formattedDateForSubscription } from '../utils/date';
import { Subscription, type SubscriptionCallback } from './subscription';
import { WebsocketErrorCodeEnum } from './types';

const SENTRY_IGNORED_MESSAGE_TYPES = [CURRENCY, MARKET, MARKET_DATA_SNAPSHOT, SECURITY, TRADE_DIRECTION];
const ErrorCodesCausingInactiveSubscription = new Set([
  WebsocketErrorCodeEnum.WebsocketErrorDuplicateReqID,
  WebsocketErrorCodeEnum.WebsocketErrorInvalidRequest,
  WebsocketErrorCodeEnum.WebsocketErrorInvalidStream,
]);

export interface IWebSocketClient<TMessageType> {
  isConnected: boolean;
  sessionId: string | null;
  performance: WebSocketPerformance | null;

  closeAndReconnect(): void;
  registerPublication(request): void;
  registerSubscription(
    address: string,
    streams: RequestStream[],
    callback: Subscription<TMessageType>['callback'],
    options?: { loadAll?: boolean; overrideParticipant?: string }
  ): void;
  unregisterSubscription(address: string): void;
  updateSubscription(address, streams, callback?: unknown): void;
  pageSubscription(address: string, options?: { loadAll?: boolean }): void;
  sendMessage(message: object): void;
  sendTimedMessage(message: object): void;
  ping(data: object): void;
  pongs(): Observable<Response<{ ts: number }>>;
}

type PingPongTypes = { ping: typeof AVA_PING | typeof WL_PING; pong: typeof AVA_PONG | typeof WL_PONG };

export class WebSocketClient<TMessageType = unknown> implements IWebSocketClient<TMessageType> {
  private url: string;
  private subscriptionsByAddress: Map<string, Subscription<TMessageType>>;
  private subscriptionsByReqID: Map<number, Subscription<TMessageType>>;
  private canceledSubscriptions: Map<number | string, Subscription<TMessageType>>;
  private requestCount = 1;
  public isConnected = false;
  private retryCount = 0;
  private tryReconnect: boolean;
  public sessionId: string | null;
  private socket: WebSocketSubject<Response<TMessageType>>;
  private onOpen: () => void;
  private onError: (args: { sentryId: string | undefined }) => void;
  public performance: WebSocketPerformance | null;
  private socketSubscription: RxJsSubscription;
  private pongObservable: Observable<Response<{ ts: number }>>;
  // Customer Api and principal api have different ping/pong types.
  private pingPongTypes: PingPongTypes;

  constructor({
    url,
    initialRetryInterval = 1000,
    maxRetryInterval = 15000,
    maxRetryAttempts = 10,
    onOpen = () => {},
    onClose = _e => {},
    onError = () => {},
    performance = null,
    pingPongTypes = { ping: AVA_PING, pong: AVA_PONG },
  }: {
    url: string;
    initialRetryInterval?: number;
    maxRetryInterval?: number;
    maxRetryAttempts?: number;
    onOpen?: () => void;
    onClose?: (e: CloseEvent) => void;
    onError?: (args: { sentryId: string | undefined }) => void;
    performance?: WebSocketPerformance | null;
    pingPongTypes?: PingPongTypes;
  }) {
    this.url = url;
    this.subscriptionsByAddress = new Map();
    this.subscriptionsByReqID = new Map();
    this.canceledSubscriptions = new Map();
    this.tryReconnect = maxRetryAttempts > 0;
    this.sessionId = null;
    this.pingPongTypes = pingPongTypes;
    this.socket = webSocket({
      url,
      openObserver: {
        next: () => {
          // Collect any active subscriptions
          const activeSubscriptions = [...this.subscriptionsByAddress.entries()];

          // Re-subscribe to any active subscriptions
          for (const [address, subscription] of activeSubscriptions) {
            this.updateSubscription(address, subscription.streams, subscription.callback);
          }
          this.isConnected = true;
          this.retryCount = 0;
        },
      },
      closeObserver: {
        next: e => {
          this.isConnected = false;
          onClose(e);
        },
      },
    });
    this.onOpen = onOpen;
    this.onError = onError;
    this.performance = performance;

    let socketObservable: Observable<Response<TMessageType>> = this.socket;

    if (this.tryReconnect) {
      const exponentialBackoff = observable =>
        observable.pipe(
          flatMap(() => {
            if (maxRetryAttempts > 0 && this.retryCount >= maxRetryAttempts) {
              return throwError(new Error(`Unable to reconnect after ${maxRetryAttempts} attempts`));
            }
            if (this.tryReconnect) {
              return timer(Math.min(maxRetryInterval, initialRetryInterval * Math.pow(2, this.retryCount++)));
            }
            return EMPTY;
          })
        );

      socketObservable = socketObservable.pipe(
        retryWhen(exponentialBackoff),
        repeatWhen(observable => (this.tryReconnect ? observable : EMPTY))
      );
    }

    socketObservable = socketObservable.pipe(
      share({
        resetOnError: false,
        resetOnComplete: false,
        resetOnRefCountZero: false,
      })
    );

    this.socketSubscription = socketObservable.subscribe({
      next: json => this.handleMessage(json),
      error: error => this.handleError(error),
    });

    this.pongObservable = socketObservable.pipe(
      filter((json: any): json is Response<{ ts: number }> => json.type === AVA_PONG)
    );
  }

  closeAndReconnect() {
    this.tryReconnect = true;
    return this.socket.error({
      code: 1000,
      reason: 'Clean close',
    });
  }

  close() {
    this.tryReconnect = false;
    if (this.socketSubscription != null) {
      this.socketSubscription.unsubscribe();
    }
    if (this.isConnected) {
      this.socket.error({
        code: 1000,
        reason: 'Clean close',
      });
    }
  }

  pongs() {
    return this.pongObservable;
  }

  registerPublication(request) {
    const reqid = this.requestCount++;
    this.sendMessage({ reqid, ...request });
  }

  registerSubscription(
    address: string,
    streams: RequestStream[],
    callback: SubscriptionCallback<TMessageType>,
    options?: {
      loadAll?: boolean;
      overrideParticipant?: string;
    }
  ) {
    if (this.subscriptionsByAddress.has(address)) {
      this.logError(`Subscription address ${address} is already in use`);
    }
    performance.mark(`${address}-start`);
    const reqid = this.requestCount++;
    const overrideParticipant = options?.overrideParticipant;
    const sub = new Subscription<TMessageType>({
      address,
      streams,
      reqid,
      callback,
      loadAll: options?.loadAll,
      overrideParticipant,
    });
    this.subscriptionsByAddress.set(address, sub);
    this.subscriptionsByReqID.set(reqid, sub);

    this.sendTimedMessage({
      type: SUBSCRIBE,
      reqid,
      streams,
      overrideParticipant,
    });
    this.performance?.registerSubscription(reqid, streams);
  }

  unregisterSubscription(address: string) {
    const sub = this.subscriptionsByAddress.get(address);
    if (sub == null) {
      this.logError(`Subscription not found while trying to unregister subscription on address ${address}`);
      return;
    }

    const reqid = sub.reqid;
    // Note: we always send a `cancel` message, even if the sub is *believed* to be inactive
    // We just ignore `unknown req id` errors for any subs in our canceled subscriptions list
    // This way, we can be sure to cancel subscriptions that are still active on the server.
    this.sendTimedMessage({
      type: CANCEL,
      tag: sub.streams[0]['Tag'],
      reqid,
    });

    // Dispose to avoid memory leak
    sub.dispose();

    // add to the canceled subs map, by both reqid and address
    this.canceledSubscriptions.set(reqid, sub);
    this.canceledSubscriptions.set(address, sub);
    this.subscriptionsByAddress.delete(address);
    this.subscriptionsByReqID.delete(reqid);
    this.performance?.unregisterSubscription(reqid);
  }

  updateSubscription(address, streams, callback?: unknown) {
    const sub = this.subscriptionsByAddress.get(address);
    if (sub == null) {
      this.logError(`Subscription not found while trying to update subscription on address ${address}`);
      return;
    }

    // Since we can never quite be sure if an error response means that a subscription is active or not,
    // We amend subscriptions by cancelling the old one and sending a new subscribe request.
    // This also prevents us sending an `amend` that fails, and leaves the server with the old subscription
    // while the client side has the updated subscription details.
    // Instead, we'd now have one cancelled subscription, and one in-active subscription, which is consistent
    // with the server state.
    this.unregisterSubscription(address);
    return this.registerSubscription(address, streams, isFunction(callback) ? callback : sub?.callback, {
      loadAll: sub?.loadAll,
    });
  }

  pageSubscription(address, { loadAll = false } = {}) {
    const sub = this.subscriptionsByAddress.get(address);
    if (sub == null) {
      if (!this.canceledSubscriptions.has(address)) {
        this.logError(`Subscription not found while trying to page on address ${address}`);
      }
      return;
    }

    const { reqid, streams, next } = sub;
    if (streams.length > 1) {
      throw new Error('Paging subscriptions with multiple streams is not supported');
    }
    const stream = streams[0];
    sub.loadAll = loadAll;
    if (next != null) {
      return this.sendTimedMessage({
        type: PAGE,
        reqid,
        streams: [{ name: stream.name, after: next }],
      });
    }
  }

  sendMessage(message) {
    Sentry.addBreadcrumb({
      type: 'query',
      category: 'ws.send',
      message: `${message?.type ? `Type: ${message.type} ` : ''}${
        message.streams?.[0]?.Tag ? `Tag: ${message.streams[0].Tag} ` : ''
      }${message.streams?.[0]?.name ? `Name: ${message.streams[0].name} ` : ''}`,
      data: message,
    });
    this.socket.next(message);
  }

  sendTimedMessage(message) {
    return this.sendMessage({
      ...message,
      ts: formattedDateForSubscription(new Date()),
    });
  }

  ping(data) {
    if (this.isConnected) {
      return this.sendTimedMessage({
        type: this.pingPongTypes.ping,
        reqid: this.requestCount++,
        data: [data],
      });
    }
  }

  pong(reqid, data) {
    return this.sendTimedMessage({
      type: this.pingPongTypes.pong,
      reqid,
      data,
    });
  }

  handleMessage(json) {
    const sub = this.subscriptionsByReqID.get(json.reqid);
    const reqSub = sub ?? this.canceledSubscriptions.get(json.reqid);

    if (!SENTRY_IGNORED_MESSAGE_TYPES.includes(json.type)) {
      const subName = reqSub?.streams?.map(s => s.name).join(', ');
      const subTag = reqSub?.streams?.map(s => s.tag).join(', ');

      const typeMsg = json.type === 'error' ? `Type: ${subName} error` : json.type ? `Type: ${json.type} ` : '';
      const tagMsg = json.tag ?? subTag ? `Tag: ${json.tag ?? subTag} ` : '';
      const dataMsg = json.data?.length ? `Data: ${json.data.length} records ` : '';
      const msg = json.msg ? `Msg: ${json.msg} ` : '';
      Sentry.addBreadcrumb({
        type: 'query',
        category: 'ws.receive',
        message: `${typeMsg}${tagMsg}${dataMsg}${msg}`,
        data: {
          ...json,
          ...(json.data ? { data: json.data } : {}),
        },
      });
    }

    if (json.type === 'hello') {
      this.sessionId = json.session_id;
      if (this.performance) {
        this.performance.sessionId = this.sessionId;
      }
      return this.onOpen();
    }

    if (json.type === AVA_PING) {
      return this.pong(json.reqid, json.data);
    }

    if (json.type === AVA_PONG) {
      return;
    }

    if (this.performance) {
      this.performance.onMessage(json);
    }

    // Note: we only ignore `UnknownReqID` errors here if the subscription is in our cancelled list
    // That usually indicates that we tried to cancel a subscription that never got setup correctly
    // (e.g. invalid stream, etc.).
    // If the reqid is not in our list of cancelled subscriptions, this is definitely still an error worth reporting

    const isIgnoredErrorCode = [
      WebsocketErrorCodeEnum.WebsocketErrorUnknownReqID,
      WebsocketErrorCodeEnum.WebsocketErrorSubscriptionLimit,
      WebsocketErrorCodeEnum.WebsocketErrorPermissionDenied,
    ].includes(json?.error?.code);
    const isCancelledSub = this.canceledSubscriptions.has(json.reqid);

    const shouldLogError = json.type === 'error' && !isIgnoredErrorCode && !isCancelledSub;

    if (shouldLogError) {
      const { sentryId } = this.logError(JSON.stringify(json.error), {
        contexts: {
          subscription: {
            address: reqSub?.address,
            reqid: reqSub?.reqid,
            streams: reqSub?.streams,
            active: reqSub?.active,
            next: reqSub?.next,
            tag: reqSub?.streams?.map(s => s.tag).join(', '),
            cancelled: isNil(sub) && !isNil(reqSub),
          },
        },
      });

      // mark this sub as inactive, so we won't try to unsubscribe later
      if (sub && ErrorCodesCausingInactiveSubscription.has(json.error.code)) {
        sub.active = false;
      }

      if (json.error.code === WebsocketErrorCodeEnum.WebsocketErrorInternalServerError) {
        this.onError({ sentryId });
      }

      const callback = sub?.callback;
      if (callback != null) {
        return callback(json);
      }
    }

    if (isNil(sub) && isCancelledSub) {
      // Sometimes the backend manages to send out messages right before
      // our `cancel` message has reached it
      return;
    }

    if (isNil(sub)) {
      this.logError('Subscription not found for request id', {
        level: 'info',
        extra: { json },
      });
    } else {
      sub.active = true; // in case it was inactive previously
      const { loadMore } = sub.handleJson(json);
      if (loadMore) {
        this.pageSubscription(sub.address, { loadAll: true });
      } else {
        if (json.initial) {
          performance.mark(`${sub.address}-end`);
          performance.measure(`sub-${json.type}-${json.tag}`, `${sub.address}-start`, `${sub.address}-end`);
        }
      }
    }
  }

  handleError(error) {
    console.warn('websocket error:', error);
  }

  logError = (message: string, captureContext?: Partial<ScopeContext>) => {
    if (import.meta.env.DEV) {
      // in this case, we won't have initialized sentry at all, but it would be nice to log the message in the developer's console still
      console.error(message, captureContext);
      return {};
    } else {
      return logger.error(new Error(message), {
        contexts: {
          websocket: {
            url: this.url,
            session_id: this.sessionId,
          },
          ...captureContext?.contexts,
        },
      });
    }
  };
}

declare global {
  // typing for https://developer.mozilla.org/en-US/docs/Web/API/NetworkInformation
  // - now in TS main due to limited browser support (was added then removed)
  interface NetworkInformation {
    type?: string;
    downlink?: number;
    effectiveType?: string;
    rtt?: number;
  }
}

interface SubscriptionInfo {
  /** The first of the requestStreams, primarily used for typing to show that we always have at least one stream for this object to exist. */
  primaryStream: RequestStream;

  /** The raw request stream objects, these won't show up properly in the console table, but we can expand the object underneath the table to drill into these if needed. */
  requestStreams: RequestStream[];

  /** Total number of messages received for this subscription. */
  totalMessages: number;

  /** Data records received from this subscription if this stream is listed in the captureStreams local storage config. */
  capturedMessages: unknown[];

  /** Total number of data records received for this subscription.  */
  totalRecords: number;

  /** Number of data records received since the last timed reset (every second). */
  recordsSinceLastSecond: number;

  /** Number of data records received in the previous second. Consider changing this to a moving average. */
  recordsInLastSecond: number;

  /** Time when the subscription was first subscribed to. */
  subscribedAt: Date;

  /** Time taken for the subscription to either receive it first message or when it has loaded it's last page of data. */
  loadTimeMs?: number;

  /** Time when the subscription was unsubscribed from. */
  unsubscribedAt?: Date;
}

type SubscriptionInfos = { [reqid: string]: SubscriptionInfo };

export interface SubscriptionStats {
  reqId: string;
  sub: string;
  tag: string;
  totalMessages: number;
  totalRecords: number;
  recordsLastSecond: number;
  loadTimeMs?: number;
  subscribedTime: string;
  unsubscribedTime?: string;
  requestStreams: RequestStream[];
  isCapturedStream: boolean;
  capturedMessages: unknown[];
}
const CAPTURED_STREAMS_KEY = 'stats.capturedStreams';
const LOG_SUBSCRIPTION_INFO_KEY = 'stats.logSubscriptionInfo';
const LOG_SUBSCRIPTION_INFO_TIME_LIMIT_KEY = 'stats.logSubscriptionInfoTimeLimit';

export class WebSocketPerformance {
  sessionId: string | null = null;
  interval: ReturnType<typeof setInterval> | null = null;
  subject = new Subject<{
    messagesPerSecond: number;
    latencyPerSecond: number;
    activeSubscriptions: number;
    activeSubscriptionStats: SubscriptionStats[];
    canceledSubscriptions: number;
    canceledSubscriptionStats: SubscriptionStats[];
  }>();

  // Only send latency warnings once every 60s
  didSendLatencyWarning = false;
  latencySinceLastReset = 0;
  messagesSinceLastReset = 0;
  latencyPerSecond = 0;
  messagesPerSecond = 0;
  started = false;

  private subscriptionInfos: SubscriptionInfos = {};

  private capturedStreams: Set<string>;

  activeSubscriptions = 0;
  canceledSubscriptions = 0;

  isLogSubscriptionInfo$ = new BehaviorSubject<boolean>(this.isLogSubscriptionInfo());

  constructor() {
    this.sessionId = null;
    const capturedStreams = localStorage.getItem(CAPTURED_STREAMS_KEY);
    this.capturedStreams = new Set(capturedStreams?.split(','));
  }

  reset() {
    this.messagesSinceLastReset = 0;
    this.latencySinceLastReset = 0;
  }

  stats() {
    return this.subject;
  }

  registerSubscription(reqid: number, streams: RequestStream[]) {
    this.activeSubscriptions++;

    if (this.isLogSubscriptionInfo()) {
      if (streams.length > 0) {
        this.subscriptionInfos[reqid] = {
          primaryStream: streams[0],
          requestStreams: streams,
          totalMessages: 0,
          capturedMessages: [],
          totalRecords: 0,
          recordsInLastSecond: 0,
          recordsSinceLastSecond: 0,
          subscribedAt: new Date(),
        };
      }
    }
  }

  onMessage(json: any) {
    const { reqid, next, ts } = json;
    if (ts) {
      const latency = new Date().getTime() - Date.parse(ts);
      this.latencySinceLastReset += latency;
    }
    this.messagesSinceLastReset += 1;

    const subscriptionInfo = this.subscriptionInfos[reqid];
    // this will be undefined if isLogSubscriptionInfo() is false
    if (subscriptionInfo) {
      const recordsLength = json.data?.length ?? 0;
      subscriptionInfo.totalMessages++;
      const streamName = subscriptionInfo.primaryStream.name;
      if (this.isCapturedStream(streamName)) {
        subscriptionInfo.capturedMessages.push(json);
      }
      subscriptionInfo.totalRecords += recordsLength;
      subscriptionInfo.recordsSinceLastSecond += recordsLength;
      if (!next && !subscriptionInfo.loadTimeMs) {
        subscriptionInfo.loadTimeMs = new Date().getTime() - subscriptionInfo.subscribedAt.getTime();
      }
    }
  }

  unregisterSubscription(reqid: number) {
    this.activeSubscriptions--;
    this.canceledSubscriptions++;

    const subscription = this.subscriptionInfos[reqid];
    if (subscription) {
      subscription.unsubscribedAt = new Date();
    }
  }

  private getSubscriptionStats(unsubscribed: boolean): SubscriptionStats[] {
    if (!this.isLogSubscriptionInfo()) {
      return [];
    }
    const filteredSubscriptions = Object.entries(this.subscriptionInfos).filter(([, s]) =>
      unsubscribed ? !!s.unsubscribedAt : !s.unsubscribedAt
    );

    return filteredSubscriptions.map(([reqid, info]) => {
      return {
        reqId: reqid,
        sub: info.primaryStream.name,
        tag: info.primaryStream.tag,
        totalMessages: info.totalMessages,
        totalRecords: info.totalRecords,
        recordsLastSecond: info.recordsInLastSecond,
        loadTimeMs: info.loadTimeMs,
        subscribedTime: info.subscribedAt?.toLocaleTimeString(),
        unsubscribedTime: info.unsubscribedAt?.toLocaleTimeString(),
        requestStreams: info.requestStreams,
        isCapturedStream: this.isCapturedStream(info.primaryStream.name),
        capturedMessages: info.capturedMessages,
      };
    });
  }

  start() {
    this.interval = setInterval(() => {
      this.messagesPerSecond = this.messagesSinceLastReset;

      if (this.isLogSubscriptionInfoTimeLimitExceeded()) {
        this.setLogSubscriptionInfo(false);
      }
      if (this.isLogSubscriptionInfo()) {
        values(this.subscriptionInfos).forEach(info => {
          info.recordsInLastSecond = info.recordsSinceLastSecond;
          info.recordsSinceLastSecond = 0;
        });
      }

      if (this.messagesSinceLastReset > 0) {
        this.latencyPerSecond = Math.round(this.latencySinceLastReset / this.messagesSinceLastReset);
      } else {
        this.latencyPerSecond = 0;
      }

      this.subject.next({
        messagesPerSecond: this.messagesPerSecond,
        latencyPerSecond: this.latencyPerSecond,
        activeSubscriptions: this.activeSubscriptions,
        activeSubscriptionStats: this.getSubscriptionStats(false),
        canceledSubscriptions: this.canceledSubscriptions,
        canceledSubscriptionStats: this.getSubscriptionStats(true),
      });

      this.reset();
    }, 1000);
  }

  stop() {
    this.started = false;
    if (this.interval != null) {
      clearInterval(this.interval);
    }
  }

  isLogSubscriptionInfo() {
    // only log subscription info when feature flag is set as subscription info will grow indefinitely over time.
    return localStorage.getItem(LOG_SUBSCRIPTION_INFO_KEY) === 'true';
  }

  isLogSubscriptionInfoTimeLimitExceeded() {
    // we want to do this in case someone has accidentally left subscription logging on, which will eventually cause an OOM.
    const timeLimit = localStorage.getItem(LOG_SUBSCRIPTION_INFO_TIME_LIMIT_KEY);
    if (!timeLimit) {
      return false;
    }
    return new Date(timeLimit) < new Date();
  }

  setLogSubscriptionInfo(enabled: boolean) {
    if (!enabled) {
      this.subscriptionInfos = {};
      localStorage.removeItem(LOG_SUBSCRIPTION_INFO_KEY);
      this.capturedStreams.clear();
      localStorage.removeItem(CAPTURED_STREAMS_KEY);
      localStorage.removeItem(LOG_SUBSCRIPTION_INFO_TIME_LIMIT_KEY);
    } else {
      localStorage.setItem(LOG_SUBSCRIPTION_INFO_KEY, 'true');
      // save UTC date one hour from now to localStorage to automatically stop logging subscription info after an hour.
      const stopTime = new Date();
      stopTime.setHours(stopTime.getHours() + 1);
      localStorage.setItem(LOG_SUBSCRIPTION_INFO_TIME_LIMIT_KEY, stopTime.toISOString());
    }
    this.isLogSubscriptionInfo$.next(enabled);
  }

  isCapturedStream(stream?: string) {
    return stream ? this.capturedStreams.has(stream) ?? false : false;
  }

  setCapturedStream(stream: string, enabled: boolean) {
    if (enabled) {
      this.capturedStreams.add(stream);
    } else {
      this.capturedStreams.delete(stream);
    }
    localStorage.setItem(CAPTURED_STREAMS_KEY, [...this.capturedStreams].join(','));
  }
}
