import { invariant } from '@epic-web/invariant';
import { getBatchingPipe, logger, wrapForErrorHandling, wsSubscriptionCache } from '@talos/kyoko';
import { map, pipe, type OperatorFunction } from 'rxjs';
import type { FromRiskObsValue, FromRiskObsValueMin } from './PortfolioManagementProvider.types';

/**
 * Tells the batching pipe whether a message completes the current batch.
 *
 * A message is terminal when it carries no data at all, or when its final data entry
 * explicitly reports `BatchHasMore === false`.
 */
function batchFilterCondition(item: FromRiskObsValue): boolean {
  const entries = item.data;
  if (entries.length === 0) {
    // Nothing was delivered, so there is nothing further to wait for
    return true;
  }
  const finalEntry = entries.at(-1);
  return finalEntry?.BatchHasMore === false;
}

/**
 * Checks that the data rows of a batch arrive in the expected order.
 *
 * The `data` arrays of all messages are concatenated in arrival order, and every row's
 * `BatchIndex` must equal its position in that concatenation (0, 1, 2, ...).
 * An empty batch is accepted without any check.
 *
 * @param messages - the messages that make up one batch
 * @throws Error when a row's `BatchIndex` does not match its position
 */
function validateMessages(messages: FromRiskObsValue[]): void {
  // An empty batch flattens to zero rows, so the loop below simply does not run
  const rows = messages.flatMap(message => message.data);
  for (const [position, row] of rows.entries()) {
    if (row.BatchIndex !== position) {
      throw new Error(`BatchIndex is not in order, expected ${position} but got ${row.BatchIndex}`);
    }
  }
}

/**
 * Builds the operator that batches, validates and caches risk data for the portfolio
 * management provider, so that downstream components can access the data.
 *
 * Stages, in order:
 *  1. The batching pipe collects incoming messages until `batchFilterCondition` flags the
 *     final one, which ensures a response is complete before it is aggregated.
 *  2. The batch is validated and merged into a single message: the fields of the first
 *     message combined with the concatenated `data` of every message in the batch.
 *  3. The merged message is handed to `wsSubscriptionCache`, keyed per item by `SubAccount`
 *     and `Asset`, which resets the data for late/changed subscribers.
 *
 * Stages 2 and 3 run inside `wrapForErrorHandling`; errors raised there are logged by the
 * error handler rather than rethrown.
 *
 * @returns an RxJS operator from `FromRiskObsValue` to `FromRiskObsValueMin`
 */
export function getPortfolioRiskCachePipe(): OperatorFunction<FromRiskObsValue, FromRiskObsValueMin> {
  const batchHandlerPipe = pipe(
    map((messages: FromRiskObsValue[]): FromRiskObsValue => {
      validateMessages(messages);
      // validateMessages accepts an empty batch, so fail here with a descriptive error
      // instead of an opaque TypeError when reading `.tag` of undefined
      const firstMessage = messages.at(0);
      invariant(firstMessage, 'Cannot merge an empty batch of risk messages');
      invariant(firstMessage.tag, `due to tag-based processing, all messages should have a tag`);
      const result: FromRiskObsValue = {
        ...firstMessage,
        data: messages.flatMap(message => message.data),
      };
      return result;
    })
  );
  const batchingPipe = getBatchingPipe({
    batchCondition: batchFilterCondition,
  });
  return pipe(
    // batching needs to be done outside of the error handling, as the
    // mergemap separates each message into a separate observable
    batchingPipe,
    wrapForErrorHandling({
      wrappedPipe: pipe(
        batchHandlerPipe,
        wsSubscriptionCache(item => `${item.SubAccount}@@@@@${item.Asset}`, undefined)
      ),
      errorHandler: error => {
        // Wrap the original error as `cause` so the log keeps the full context
        logger.error(
          new Error(
            `Portfolio Management batch reader error (processing will continue): ${
              error instanceof Error ? error.message : error
            }`,
            {
              cause: error,
            }
          )
        );
      },
    })
  );
}
