import { logger, wrapForErrorHandling, type WebsocketRequest } from '@talos/kyoko';
import { useMemo } from 'react';
import { map, pipe } from 'rxjs';
import type { useRequestStateTagging } from '../../useRequestStateTagging';
import type { ExtraStateType, FromRiskObsValueMin, ToObsValue } from '../PortfolioManagementProvider.types';
import { useSubscriptionDataConverter } from './useSubscriptionDataConverter';

/**
 * Builds a memoized RxJS operator that rewrites the `data` of each incoming
 * message using the converter returned by `useSubscriptionDataConverter`.
 *
 * Each message must carry a truthy `tag`; the tag is passed to the converter
 * together with the message's `data`. A message without a tag makes the mapping
 * step throw. The mapping step is handed to `wrapForErrorHandling` together
 * with a handler that logs the failure through `logger.error`, attaching the
 * original error as `cause`.
 *
 * NOTE(review): the log text says processing continues after an error; that
 * behavior lives in `wrapForErrorHandling`, which is not visible here — confirm.
 *
 * @typeParam TRequest - Request shape (a `WebsocketRequest` without its `tag`).
 * @param tagMapRef - The `tagMapRef` produced by `useRequestStateTagging`; forwarded
 *   unchanged to `useSubscriptionDataConverter`.
 * @returns The operator; it is rebuilt only when the converter function identity changes.
 */
export function usePortfolioRiskGridDataConverterPipe<TRequest extends Omit<WebsocketRequest, 'tag'>>(
  tagMapRef: ReturnType<typeof useRequestStateTagging<TRequest, ExtraStateType>>['tagMapRef']
) {
  const convertSubToDataResults = useSubscriptionDataConverter<TRequest>(tagMapRef);

  return useMemo(() => {
    // Logs a wrapped error; the original failure is preserved as `cause`.
    const reportConversionError = (error: unknown): void => {
      const detail = error instanceof Error ? error.message : error;
      const wrapped = new Error(
        `Portfolio Management batch reader error (processing will continue): ${detail}`,
        { cause: error }
      );
      logger.error(wrapped);
    };

    const convertingOperator = wrapForErrorHandling<FromRiskObsValueMin, ToObsValue>({
      wrappedPipe: pipe(
        map(message => {
          const { tag } = message;
          if (tag) {
            // Copy the message and swap in the converted payload for this tag.
            return { ...message, data: convertSubToDataResults(message.data, tag) };
          }
          throw new Error(`all PMS messages should have a tag`);
        })
      ),
      errorHandler: reportConversionError,
    });

    return pipe(convertingOperator);
  }, [convertSubToDataResults]);
}
