import { channel, take, put } from "@/lib/utils/channel";
import type { IAsyncIterable } from "./async-iterable";
import type { CancellableStream } from "./cancellable-stream";
import { CancelledStreamError } from "./cancellable-stream";

/**
 * Adapts a UTF-8 byte stream into an async generator of decoded string chunks.
 *
 * The bytes are piped through a `TextDecoderStream`, so a multi-byte character that is split
 * across two byte chunks is still decoded correctly.
 *
 * @param stream - Byte stream to read. It is locked by the pipe for the lifetime of the generator.
 * @param cancelSignal - Polled before every read; once it returns `true` the reader is cancelled
 *    and the generator finishes without yielding further chunks.
 * @returns An async generator yielding decoded text chunks (chunk boundaries are arbitrary).
 * @throws Whatever `reader.read()` rejects with, or whatever `reader.cancel()` rejects with on the
 *    cancel-signal path.
 */
async function* readableStreamIterable(
   stream: ReadableStream<Uint8Array>,
   cancelSignal: () => boolean,
): AsyncGenerator<string> {
   const decoder = new TextDecoderStream();
   const reader = stream.pipeThrough(decoder).getReader();
   // True once the reader reached a terminal state through this function's own control flow
   // (cancel signal or natural end of stream), i.e. no extra cleanup is required.
   let settled = false;
   try {
      while (true) {
         if (cancelSignal()) {
            settled = true;
            return reader.cancel();
         }
         const { done, value } = await reader.read();
         if (done) {
            settled = true;
            return;
         }
         yield value as string;
      }
   } finally {
      if (!settled) {
         // Reached when the consumer stopped iterating early (break / throw inside `for await`)
         // or a read failed. Cancel so the underlying source is not left open; a failure here is
         // swallowed so it cannot mask the error that caused the exit.
         await reader.cancel().catch(() => undefined);
      }
   }
}

/**
 * Adapts a byte stream into an async generator of raw `Uint8Array` chunks, without any text
 * decoding. Intended for runtimes where `TextDecoderStream` is unavailable.
 *
 * @param stream - Byte stream to read. It is locked by the reader for the lifetime of the generator.
 * @param cancelSignal - Polled before every read; once it returns `true` the reader is cancelled
 *    and the generator finishes without yielding further chunks.
 * @returns An async generator yielding the stream's byte chunks as they arrive.
 * @throws Whatever `reader.read()` rejects with, or whatever `reader.cancel()` rejects with on the
 *    cancel-signal path.
 */
async function* readableStreamIterableFirefox(
   stream: ReadableStream<Uint8Array>,
   cancelSignal: () => boolean,
): AsyncGenerator<Uint8Array> {
   const reader = stream.getReader();
   // True once the reader reached a terminal state through this function's own control flow
   // (cancel signal or natural end of stream), i.e. no extra cleanup is required.
   let settled = false;
   try {
      while (true) {
         if (cancelSignal()) {
            settled = true;
            return reader.cancel();
         }
         const { done, value } = await reader.read();
         if (done) {
            settled = true;
            return;
         }
         yield value as Uint8Array;
      }
   } finally {
      if (!settled) {
         // Reached when the consumer stopped iterating early (break / throw inside `for await`)
         // or a read failed. Cancel so the underlying source is not left open; a failure here is
         // swallowed so it cannot mask the error that caused the exit.
         await reader.cancel().catch(() => undefined);
      }
   }
}

/**
 * Either a value of type `T` or a `symbol`. Used as the element type of the channel inside
 * `JsonStream`, where a symbol is put on the channel to mark the end of the stream.
 */
export type SymbolTerminated<T> = T | symbol;

/**
 * Asynchronous stream of JSON values parsed from a readable stream of UTF-8 encoded bytes.
 * Expects the stream to contain serialized JSON values separated by a newline (\n); a final value
 * that is not followed by a newline is parsed as well.
 * Uses a symbol (`JsonStream.CHANNEL_TERMINATOR`) to terminate the stream.
 * The stream can only be read once.
 */
export class JsonStream<T> implements CancellableStream<T>, IAsyncIterable<T> {
   public static readonly CHANNEL_TERMINATOR: symbol = Symbol.for("end");

   /** Set once the consumer has taken the terminator off the channel without a captured error. */
   private hasCompleted = false;
   /** Set by `cancel()` or when reading fails with an `AbortError`. */
   private cancelled = false;
   /** Hand-off point between the background reader (producer) and `next()` (consumer). */
   private channel = channel<SymbolTerminated<T>>();
   /** Error raised while reading or parsing; surfaced to the consumer through `next()`. */
   private capturedError: Error | null = null;

   /**
    * Calling the constructor will immediately begin reading the stream.
    * The stream is parsed lazily, as callers call `next` or iterate over the data.
    *
    * @param stream - Byte stream containing newline-delimited JSON.
    * @param cancelFn - Invoked by `cancel()` to stop the underlying source.
    */
   constructor(
      private readonly stream: ReadableStream<Uint8Array>,
      private readonly cancelFn: () => void,
   ) {
      // Fire-and-forget: failures are captured and reported through `next()`.
      void this.readStream();
   }

   [Symbol.asyncIterator](): AsyncIterator<T, symbol, undefined> {
      return this;
   }

   /** Picks the decoding strategy based on whether `TextDecoderStream` exists in this runtime. */
   private async readStream(): Promise<void> {
      if (typeof TextDecoderStream !== "undefined") {
         await this.readStreamWithCorrectDecoding();
      } else {
         await this.readStreamFirefox();
      }
   }

   /**
    * Supports decoding multi-byte characters using TextDecoderStream.
    */
   private async readStreamWithCorrectDecoding(): Promise<void> {
      await this.parseLines(readableStreamIterable(this.stream, () => this.cancelled));
   }

   /**
    * Fallback for runtimes without TextDecoderStream (historically Firefox). Bytes are decoded
    * with a streaming `TextDecoder`, so multi-byte characters split across chunks stay intact.
    */
   private async readStreamFirefox(): Promise<void> {
      const byteChunks = readableStreamIterableFirefox(this.stream, () => this.cancelled);
      await this.parseLines(JsonStream.decodeUtf8(byteChunks));
   }

   /** Decodes UTF-8 byte chunks into text chunks, carrying partial characters across chunks. */
   private static async *decodeUtf8(
      byteChunks: AsyncIterable<Uint8Array>,
   ): AsyncGenerator<string> {
      const decoder = new TextDecoder();
      for await (const bytes of byteChunks) {
         // `stream: true` keeps an incomplete trailing byte sequence buffered in the decoder.
         yield decoder.decode(bytes, { stream: true });
      }
      // Flush whatever the decoder still holds once the input has ended.
      const tail = decoder.decode();
      if (tail !== "") yield tail;
   }

   /**
    * Splits incoming text into newline-delimited records, parses each one and puts it on the
    * channel, then always puts the terminator. Any failure is stored in `capturedError`.
    */
   private async parseLines(chunks: AsyncIterable<string>): Promise<void> {
      let remainder = "";
      try {
         for await (const chunk of chunks) {
            if (this.cancelled) break;
            const lines = (remainder + chunk).split("\n");

            // The last element may be an incomplete record, so defer parsing it until more
            // text arrives (it is an empty string when the chunk ended with a newline).
            remainder = lines.pop() ?? "";

            for (const line of lines) {
               // Awaiting handles 'back pressure' or 'lazy' parsing.
               // If the caller doesn't take, there is no reason to parse more than it can handle.
               await put<SymbolTerminated<T>>(this.channel, JSON.parse(line));
            }
         }

         // A stream that ends without a trailing newline still carries one last record.
         // After a cancel the remainder is most likely a partial record, so it is skipped.
         if (!this.cancelled && remainder.trim() !== "") {
            await put<SymbolTerminated<T>>(this.channel, JSON.parse(remainder));
         }
      } catch (err: unknown) {
         this.captureError(err);
      }

      // Not awaited: nothing follows, and the consumer may never take the terminator.
      void put(this.channel, JsonStream.CHANNEL_TERMINATOR);
   }

   /** Records a read/parse failure; an `AbortError` is reported as a cancelled stream. */
   private captureError(err: unknown): void {
      if (err instanceof Error && err.name === "AbortError") {
         this.cancelled = true;
         this.capturedError = new CancelledStreamError();
      } else {
         this.capturedError = err as Error;
      }
   }

   /**
    * Issues a call to the stream to stop further reading. Cancel calls are async in nature.
    * Therefore cancels are not immediate nor do they guarantee that the stream has not been read already.
    * The only guarantee is that the reading of the stream by this instance is cancelled eventually.
    * Does nothing once the stream has been fully consumed.
    */
   cancel(): void {
      if (!this.hasCompleted) {
         this.cancelFn();
         this.cancelled = true;
      }
   }

   isCancelled(): boolean {
      return this.cancelled;
   }

   /**
    * Returns the next value in the asynchronous stream. The last value in the stream is always a symbol.
    * Once the terminating symbol has been observed, further calls resolve immediately with
    * `done: true`. Calls issued concurrently *before* the terminator was observed may still
    * never settle, so callers should await each call or use the for await...of statement.
    *
    * @returns The next parsed value, or `{ done: true, value: CHANNEL_TERMINATOR }` at the end.
    * @throws (as a rejected promise) the error captured while reading or parsing the stream,
    *    which is a `CancelledStreamError` when reading was aborted.
    */
   async next(): Promise<IteratorResult<T, symbol>> {
      if (this.capturedError) throw this.capturedError;
      if (this.hasCompleted) {
         return { done: true, value: JsonStream.CHANNEL_TERMINATOR };
      }

      const value = await take(this.channel);
      if (typeof value === "symbol") {
         if (this.capturedError) throw this.capturedError;
         this.hasCompleted = true;
         return {
            done: true,
            value,
         };
      }
      return {
         done: false,
         value: value as T,
      };
   }

   /**
    * Convenience method for retrieving the rest of the values in the stream.
    * Since the stream can only be read once, whenever this function is called it will start from where it left off.
    * Thus it does not guarantee an index for where it started.
    * The consumer will not be called if the stream has already been consumed.
    *
    * @param consumer - Called once for every remaining value, in stream order.
    * @returns A promise that resolves when the stream has been completed.
    */
   async forEachRemaining(consumer: (t: T) => void): Promise<void> {
      if (this.hasCompleted) return;
      for await (const json of this) {
         consumer(json);
      }
   }
}
