import { Injectable } from "@angular/core";
import { OidcSecurityService } from "angular-auth-oidc-client";
import { Observable, Subject, map, mergeMap, of } from "rxjs";
import { finalize } from "rxjs/operators";

@Injectable({
    providedIn: 'root'
})
export class StreamingService {

    private readonly INVALID_JSON_OBJECT_RESPONSE = 'INVALID_JSON_OBJECT_RESPONSE';

    accessToken$: Observable<string> = of('');

    constructor(
        private oidcSecurityService: OidcSecurityService,
    ) {
        this.accessToken$ = this.oidcSecurityService.getAccessToken();
    }

    fetch = <T extends object>(
        url: string,
        props: RequestInit,
        classType: { new(...args: any[]): T }
    ): Observable<T> => {
        const abortController = new AbortController();
        props.signal = abortController.signal;
        return this.accessToken$.pipe(
            map((token) => props.headers = {
                ...props.headers,
                "Accept": "text/event-stream",
                "Authorization": `Bearer ${token}`,
            }),
            mergeMap(() => fetch(url, props)),
            mergeMap((response: Response) => this.readAllChunks(response, classType)),
            finalize(() => abortController?.abort())
        );
    };

    private isValidResponseObject = (response: any): boolean => {
        return response.parseStatus !== this.INVALID_JSON_OBJECT_RESPONSE;
    };

    private readAllChunks = <T extends object>(
        response: Response,
        classType: { new(...args: any[]): T }
    ): Observable<T> => {
        const stream = response.body;
        const reader = stream?.getReader();
        const subject = new Subject<T>();

        if (!response.ok) {
          subject.error(response)
        }

        return this.readChunk(reader!, subject, classType);
    };

    private readChunk =  <T extends object>(
        reader: ReadableStreamDefaultReader<Uint8Array>,
        subject: Subject<T>,
        classType: { new(...args: any[]): T }
    ): Observable<T> => {
        reader?.read()
            .then(({ value, done }) => {
                if (done) {
                    return subject.complete();
                }

                const decodedValue = new TextDecoder().decode(value).toString();

                const rawResponseValue = decodedValue
                    .split('\n')
                    .filter((line) => line.trim() !== '' && line.startsWith('data:'))
                    .map((line) => line.replaceAll('data:', ''))
                    .filter((line) => line.trim() !== '');

                rawResponseValue
                    .map((line) => this.parseResponseToClass(line, classType))
                    .filter((response) => this.isValidResponseObject(response))
                    .forEach((line) => subject.next(line));

                this.readChunk(reader, subject, classType);
            })
            .catch((error) => {
                console.error('Error reading chunk', error);
                subject.error(error)
            });

        return subject.asObservable();
    };

    private parseResponseToClass = <T extends object>(
        rawResponse: string,
        classType: { new(...args: any[]): T }
    ): T => {
        let response: any;

        try {
            response = JSON.parse(rawResponse);
        } catch {
            response = { parseStatus: this.INVALID_JSON_OBJECT_RESPONSE };
        }

        return Object.assign(new classType(), response);
    };
}
