import { Observable, type MonoTypeOperatorFunction } from 'rxjs';
import type { MinimalSubscriptionResponse } from '../types';

/** Options accepted by `wsWaitForAllPages`. */
interface WSWaitForAllPagesParams<T> {
  /**
   * Returns the identity of a single data item as a string.
   *
   * When buffered messages are coalesced, items are stored in a map under this key, so
   * of several items sharing a key only the one processed last is kept.
   */
  getUniqueKey: (item: T) => string;
}

/**
 * wsWaitForAllPages can be attached to a websocket stream to ensure that downstream consumers only ever
 * receive complete (fully paginated) updates.
 *
 * This is useful when a computation is only valid once _all_ pageable records have been received, e.g.
 * asserting the existence or non-existence of a specific record in a data set.
 *
 * Behavior:
 * - The operator starts in buffering mode, and re-enters it (discarding anything buffered so far) whenever
 *   an `initial: true` message arrives.
 * - While buffering, page messages (`initial` or `page` set) and delta messages (live updates, neither flag
 *   set) are collected in two separate buffers.
 * - A page message without `next` is treated as the last page. At that point the buffers are released as
 *   ONE message: if only a single message was buffered it is emitted untouched; otherwise the data of all
 *   pages followed by all deltas is deduplicated via `getUniqueKey` (later entries win, so live updates are
 *   never overridden by historical pages) and emitted using the first buffered page message (normally the
 *   `initial: true` message) as the template, with `next` cleared.
 * - Once released, messages are forwarded unchanged until the next `initial: true` message.
 * - Upstream `error` and `complete` notifications are forwarded to the consumer. Messages that are still
 *   buffered at that moment are NOT emitted, because they do not form a complete data set.
 *
 * The operator never stops paginating on its own: it relies on the upstream eventually sending a page
 * without `next`.
 *
 * @param params.getUniqueKey Maps a data item to the key used for deduplication while coalescing.
 * @returns An operator that emits coalesced initial messages followed by pass-through live updates.
 */
export function wsWaitForAllPages<T>({
  getUniqueKey,
}: WSWaitForAllPagesParams<T>): MonoTypeOperatorFunction<MinimalSubscriptionResponse<T>> {
  return source =>
    new Observable(output => {
      let buffering = true; // always start true
      // Page messages ("initial" or "page" set) received since buffering started
      let paginationBuffer: MinimalSubscriptionResponse<T>[] = [];
      // Delta messages (neither flag set) received since buffering started
      let deltaMessagesBuffer: MinimalSubscriptionResponse<T>[] = [];

      const sub = source.subscribe({
        next: json => {
          if (json.initial) {
            // A fresh initial message restarts pagination: drop whatever was collected before.
            buffering = true;
            paginationBuffer = [];
            deltaMessagesBuffer = [];
          }

          if (!buffering) {
            // just forward the message, we're not buffering
            output.next(json);
            return;
          }

          const isPage = json.initial || json.page; // the initial: true message is technically a page
          if (isPage) {
            paginationBuffer.push(json);
          } else {
            deltaMessagesBuffer.push(json);
          }

          if (!isPage || json.next) {
            // Either a live update, or a page announcing more pages: keep buffering.
            return;
          }

          // We have received the last page. Release the buffer.
          buffering = false;

          // Historical pages first, live updates last, so that live data overrides historical data.
          const buffer = [...paginationBuffer, ...deltaMessagesBuffer];

          if (buffer.length === 1) {
            // if there's only one item in the buffer, just send it along
            output.next(buffer[0]);
          } else {
            // Several messages are buffered but only one may be emitted: coalesce by unique key.
            const uniqueUpdates = new Map<string, T>();
            for (const update of buffer) {
              for (const datapoint of update.data) {
                uniqueUpdates.set(getUniqueKey(datapoint), datapoint);
              }
            }
            const coalescedData: T[] = [...uniqueUpdates.values()];

            output.next({
              ...paginationBuffer[0], // first buffered page message (normally the initial: true one) is the template
              next: undefined, // every page has been consumed already, so there is no "next" anymore
              data: coalescedData, // and just override with the coalesced data
            });
          }

          // After releasing all buffered messages, reset the buffers
          paginationBuffer = [];
          deltaMessagesBuffer = [];
        },
        // Propagate terminal notifications; without this, downstream error handling and
        // completion logic would never run and source errors would surface as unhandled.
        error: err => output.error(err),
        complete: () => output.complete(),
      });

      return () => sub.unsubscribe();
    });
}
