import type { OperatorFunction } from 'rxjs';
import { Observable, pipe } from 'rxjs';

/** Pipe creation process for batch creation, based on a batch complete condition
 * @param obsToBatch The observable to batch using the condition
 * @param batchCondition The condition to check if the batch is complete
 */
export function getBatchingPipe<TInputValue>({
  batchCondition,
}: {
  /** The condition to check if the batch is complete */
  batchCondition: (value: TInputValue) => boolean;
}): OperatorFunction<TInputValue, TInputValue[]> {
  const inputBatcher = (source: Observable<TInputValue>) =>
    new Observable<TInputValue[]>(observer => {
      let buffer: TInputValue[] = [];

      return source.subscribe({
        next(value) {
          buffer.push(value);

          if (batchCondition(value)) {
            observer.next(buffer);
            buffer = [];
          }
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        },
      });
    });

  return pipe(inputBatcher);
}
