/** Function returned by `subscribe`; calling it removes the subscriber again. */
type Subscription = () => void;

/** Callback invoked with a value of type `T`. */
type Handler<T> = (t: T) => void;

/**
 * A callable value holder with a subscription hook.
 *
 * - Call signature: takes an optional `val` and returns a `T`.
 * - `subscribe`: registers a callback and returns a `Subscription` that undoes the registration.
 */
type Atom<T = any> = {
   (val?: T): T;
   subscribe: (subscriber: Handler<T>) => Subscription;
};

/**
 * Creates an observable value holder seeded with `value`.
 *
 * The returned function reads the held value when called with no argument (or `undefined`),
 * and replaces it when called with any other value, notifying every subscriber.
 * Its `subscribe` property registers a handler and returns a function that removes it.
 *
 * Because `undefined` means "read", the held value can never be set to `undefined`.
 */
function atom<T = any>(value: T) {
   let current: T = value;
   const listeners = new Set<Handler<T>>();

   return Object.assign(
      function (val: T | undefined = undefined): T {
         // No argument (or an explicit undefined) is a plain read.
         if (val === undefined) {
            return current;
         }
         current = val;
         // Each listener is handed whatever is held at the moment it is called, so a
         // listener that writes to the atom changes what the listeners after it see.
         listeners.forEach((listener) => listener(current));
         return current;
      },
      {
         subscribe: (handler: Handler<T>) => {
            listeners.add(handler);
            return () => {
               listeners.delete(handler);
            };
         },
      },
   );
}

/**
 * Handle for a channel: a single atom, `port`, whose value is an array of `T`.
 * `put` appends to the end of that array and `take` removes from its front.
 */
export type Channel<T> = {
   port: Atom<T[]>;
};

/**
 * Creates a channel (a queue, or 'stream', of values) whose port starts out as an empty array.
 * It is intended for both async and synchronous applications; every action on it is exposed as an async operation.
 * The returned object does nothing on its own and is meant to be used in tandem with the following operations:
 *  @see put operator
 *  @see take operator
 */
export function channel<T>(): Channel<T> {
   return { port: atom<T[]>([]) };
}

/**
 * 'Put' calls on the channel immediately append the value to the end of the channel's queue. Returns a promise.
 * The promise resolves once the value has been taken off the channel by a taker, i.e. as soon as
 * every item that was queued up to and including this value has been removed, or the queue is empty.
 *
 * WARNING:
 * A 'put' on a channel that never gets a taker results in a promise that never settles,
 * and the internal subscription stays registered on the port for as long as the value is queued.
 * If the promise is still referenced this will prevent the promise from being garbage collected.
 * Thus careful state management is deferred to the user of the channel and their respective application.
 *
 * Maybe Future Additions:
 * Adding timeouts to 'put' calls to prevent these leaks.
 *
 * @param channel channel whose port holds the queue of pending values
 * @param value item to append to the queue
 * @returns a promise that resolves (with no value) once the item has been taken
 */
export function put<T>(channel: Channel<T>, value: T): Promise<void> {
   const { port } = channel;
   return new Promise<void>((resolve) => {
      const queue = [...(port() ?? []), value];
      // Number of queued items that must be removed before our own value is gone
      // (everything ahead of it plus the value itself).
      let remaining = queue.length;
      // Queue length at the previous notification; a shrink means items were removed.
      let lastLength = queue.length;

      const unsubscribe = port.subscribe((items: T[]) => {
         const removed = lastLength - items.length;
         lastLength = items.length;
         if (removed > 0) {
            remaining -= removed;
         }
         // An empty queue always means our value is gone, whatever the bookkeeping says.
         if (remaining <= 0 || items.length === 0) {
            unsubscribe();
            resolve();
         }
      });
      // Publish only after subscribing so a waiting taker that consumes the value
      // during this very call is observed by the subscription above.
      port(queue);
   });
}

/**
 * 'Take' calls on a channel return a promise of the next item in the stream.
 * If an item is already queued it is removed from the channel synchronously, during the call.
 * Otherwise the call waits (async) until the channel's port publishes a non-empty queue, and takes its first item.
 *
 * WARNING:
 * A 'take' on a channel that never gets a putter results in a promise that never settles.
 * If the promise is still referenced this will prevent the promise from being garbage collected.
 * Thus careful state management is deferred to the user of the channel and their respective application.
 *
 * Maybe Future Additions:
 * Adding timeouts to 'take' calls to prevent never-ending promise leaks.
 * Adding transformers/transducers to map values in the stream.
 *
 * @param channel channel to take the next item from
 * @returns a promise that resolves with the item removed from the front of the queue
 */
export function take<T>(channel: Channel<T>): Promise<T> {
   const { port } = channel;
   return new Promise<T>((resolve) => {
      // Removes the head of the queue, hands it to the caller and publishes the shortened queue.
      const takeOffPort = (subscription?: Subscription): void => {
         const queue: T[] = [...(port() ?? [])];
         resolve(queue.shift() as T);
         // Unsubscribe before publishing, so this taker is not notified about its own update.
         if (subscription) subscription();
         port(queue);
      };

      // Parenthesised on purpose: the comparison must apply to the defaulted length,
      // not to the literal 0 alone.
      if ((port()?.length ?? 0) > 0) {
         takeOffPort();
         return;
      }

      // Nothing queued yet: wait for the first notification that carries an item.
      const subscription = port.subscribe((queue: T[]) => {
         if (queue.length > 0) {
            takeOffPort(subscription);
         }
      });
   });
}
