import { Observable, type Subscription } from 'rxjs';
import type { MinimalSubscriptionResponse } from '../types/SubscriptionResponse';

interface WSMergeParams<T> {
  sources: Observable<MinimalSubscriptionResponse<T>>[];
}

/**
 * `wsMerge` allows you to merge an array of source SubscriptionResponse Observables into one stream of SubscriptionResponse Observable
 *
 * The pipe handles all complexity around having several streams all sending their own `initial: true` messages. It does
 * this by, when some initial message comes in, starting to buffer all messages and waiting until all sources
 * have provided their own initial message. At this point, once all sources are "done buffering", `wsMerge` will emit
 * one true `initial: true` message, and then immediately release all buffered messages from all sources (setting `initial: false`)
 *
 * **Note: Assumptions!**
 *
 * This pipe assumes that, when some source emits an initial: true message, all other sources _should_ also emit their own initial: true messages.
 * This means that when you update one subscription request, you also need to ensure that all of them update. Otherwise, this pipe will stall.
 */
export function wsMerge<T>({ sources }: WSMergeParams<T>) {
  return new Observable<MinimalSubscriptionResponse<T>>(output => {
    // An array of arrays where each nested array represents the buffer of the source of that index
    const buffersBySourceIndex: MinimalSubscriptionResponse<T>[][] = sources.map(() => []);
    const subscriptions: Subscription[] = [];

    sources.forEach((source, sourceIndex) => {
      const sub = source.subscribe(message => {
        const myBuffer = buffersBySourceIndex[sourceIndex];
        if (myBuffer.length > 0) {
          // We're buffering our messages, waiting for the other sources.
          // Two cases depending on message.initial
          if (message.initial) {
            // If there is a buffer, but this source receives an initial: true message, we clear the buffer.
            buffersBySourceIndex[sourceIndex] = [message];
          } else {
            // Else just add ourselves to the end of the buffer
            buffersBySourceIndex[sourceIndex].push(message);
          }
        } else {
          // Else, buffer length is 0.
          if (message.initial) {
            // We got an initial message while the buffer is empty. Let's add it to the buffer.
            myBuffer.push(message);

            // A source is done buffering if it has a length of 1 or more.
            const areAllSourcesDoneBuffering = buffersBySourceIndex.every(buffer => buffer.length > 0);
            if (areAllSourcesDoneBuffering) {
              // All sources are done buffering. We now need to release all the buffers at once.
              // Important here is that only the very first message can have initial: true set, so lets use our own buffer for that.
              output.next(message);
              buffersBySourceIndex[sourceIndex] = [];

              // Now lets release all the other buffers too, setting initial: false on all of them
              buffersBySourceIndex.forEach((buffer, index) => {
                buffer.forEach(message => output.next({ ...message, initial: false }));
                buffersBySourceIndex[index] = [];
              });
            }
          } else {
            const isAnyoneBuffering = buffersBySourceIndex.some(buffer => buffer.length !== 0);
            // If no one is buffering, we just forward our message.
            // If anyone is buffering, and our message is not initial, ignore the message since this source has to return a initial: true
            if (!isAnyoneBuffering) {
              output.next(message);
            }
          }
        }
      });
      subscriptions.push(sub);
    });

    // Cleanup function, unsubscribe to all our sources
    return () => {
      subscriptions.forEach(subscription => subscription.unsubscribe());
    };
  });
}
