import type { UnaryFunction } from 'rxjs';
import { Observable } from 'rxjs';

import { useObservable } from './useObservable';

/** Pipe creation process for batch updating, based on a batch complete condition and a mapper
 * @param obsToBatch The observable to batch using the condition
 * @param batchCondition The condition to check if the batch is complete
 * @param batchMapper The initial mapper pipe to apply to the batched data
 */
export function useBatchedPipe<TOutputValue, TInputValue = TOutputValue>({
  input,
  batchCondition,
  batchHandlerPipe,
}: {
  /** The observable to batch using the condition */
  input: Observable<TInputValue>;
  /** The condition to check if the batch is complete */
  batchCondition: (value: TInputValue) => boolean;
  /** The initial mapper pipe to apply to the batched data */
  batchHandlerPipe: UnaryFunction<Observable<TInputValue[]>, Observable<TOutputValue>>;
}): Observable<TOutputValue> {
  return useObservable<TOutputValue>(() => {
    // rxjs's built-in buffer operator can do a similar job,
    // but it can't be used with immediate values (for our test case),
    // so this modified operator does the trick
    const inputBatcher = (source: Observable<TInputValue>) =>
      new Observable<TInputValue[]>(observer => {
        let buffer: TInputValue[] = [];

        return source.subscribe({
          next(value) {
            buffer.push(value);

            if (batchCondition(value)) {
              observer.next(buffer);
              buffer = [];
            }
          },
          error(err) {
            observer.error(err);
          },
          complete() {
            observer.complete();
          },
        });
      });
    return input.pipe(inputBatcher, batchHandlerPipe);
  }, [batchCondition, batchHandlerPipe, input]);
}
