import { Observable, Subscriber } from 'rxjs';

/**
  A: Source observable type
  B: Subscriber observable type
*/

type processFn<A, B> = {
  (sourceValue: A, subscriber: Subscriber<B>): void;
};

export class GenericOperator<A, B> {
  /**
  The processFn receives the source value (from the previous operator)
  and the subscriber (for calling the next operator).

  Is responsible for calling subscriber.next() (or .error/.completed).
  */
  processFn: processFn<A, B>;

  constructor(processFn: processFn<A, B>) {
    this.processFn = processFn;
  }

  process = (sourceValue: A, subscriber: Subscriber<B>) => {
    this.processFn(sourceValue, subscriber);
  };

  observer = (subscriber: Subscriber<B>) => {
    return {
      next: (sourceValue: A) => {
        this.process(sourceValue, subscriber);
      },
      error: (err: any) => {
        subscriber.error(err);
      },
      complete: () => {
        subscriber.complete();
      },
    };
  };

  observable = (source$: Observable<A>) => {
    return new Observable<B>((subscriber) => {
      const subscription = source$.subscribe(this.observer(subscriber));

      const onDestroy = () => {
        subscription.unsubscribe();
      };

      return onDestroy;
    });
  };

  operator = () => {
    return (source$: Observable<A>) => {
      return this.observable(source$);
    };
  };
}
