import { Subject } from "rxjs";
import { filter } from "rxjs/operators";

import * as http from "skCommon/core/http";
import { getDefaultConfig } from "skCommon/api/config";
import * as client from "skCommon/api/client/simple";
import * as paging from "skCommon/api/client/pagination";
import { generateUniqueId } from "skCommon/utils/uniqueId";
import { sleep } from "skCommon/utils/delay";
import { SkError, SkErrorWrap } from "skCommon/core/error";
import { ApiObject, API_CONFIG } from "skCommon/api/apiConfig";
import { PipelineStatus, TaskStatusResponse, TaskResponse } from "skCommon/tasking/pipeline";
import { Logger } from "skCommon/utils/logger";

const REQUEST_CANCELLED = Symbol("REQUEST_CANCELLED");

export abstract class TaskedClient extends client.Client {
    public readonly TASKING_API = "tasking-api";
    public readonly TASKING_GET_STATUS_METHOD = "status";

    /**
     * @param passive Stop the tasked request if the state subject has no
     *  subscriptions (task doesn't resume on subscription).
     */
    public passive: boolean;

    constructor(options?: TaskedClientConstructorOptions) {
        super();

        this.passive = options && options.passive;
    }

    public getDefaultOptions(): TaskedClientOptions {
        return super.getDefaultOptions();
    }

    public taskedCall<T>(options: TaskedClientOptions): TaskedResult<T>;
    public taskedCall<T>(options: PaginatedTaskedClientOptions): PaginatedTaskedResult<T>;
    public taskedCall<T>(
        options: TaskedClientOptions | PaginatedTaskedClientOptions,
    ): TaskedResult<T> | PaginatedTaskedResult<T> {

        const result = "paginator" in options
            ? new PaginatedTaskedResult<T>(this.apiObject, options, this.log)
            : new TaskedResult<T>(this.apiObject, options, this.log);

        if (!options.trace) {
            options.trace = generateUniqueId(12);
        }
        result.promise = <any>this
            .urlForCall({
                ...options,
                urlSuffix: TaskedClientAction.Initiate,
            })
            .then<T | T[]>((url) => this.makeOrContinueOrRetryCall(url, result))
            .catch(
                (e: Error) => this.throwTaskedClientError(e, result, options),
            );

        return result;
    }

    public async urlForTaskStatus() {
        const urlUtil = getDefaultConfig().url;

        return urlUtil.get(API_CONFIG[this.TASKING_API], this.TASKING_GET_STATUS_METHOD);
    }

    private async makeOrContinueOrRetryCall<T>(
        url: string,
        result: PaginatedTaskedResult<T> | TaskedResult<T>,
    ): Promise<T | T[]> {
        try {
            return await this.makeOrContinueInCall(url, result);
        } catch (e) {
            if (e instanceof FailedPipelineError && result.pipelineRetries) {
                result.pipelineRetries--;
                result.pipelineId = void 0;

                return this.makeOrContinueOrRetryCall(url, result);
            } else {
                throw e;
            }
        }
    }

    private makeOrContinueInCall<T>(
        url: string,
        result: PaginatedTaskedResult<T> | TaskedResult<T>,
    ): Promise<T> | Promise<T[]> {
        const options = result.requestOptions;
        const pipelineId = result.pipelineId;

        if (!!pipelineId) {

            this.log.debug("Resume API call", {
                endpoint: options.endpoint,
                pipeline: pipelineId,
                async: true,
                paginated: "paginator" in options,
                trace: options.trace,
            });

            if (result instanceof PaginatedTaskedResult) {
                return this.resumeTaskedPaginatedCall<T>(
                    result,
                    url,
                    result.requestOptions,
                );
            } else {
                return this.resumeTaskedCall<T>(result, options);
            }
        } else {

            this.log.debug("API call", {
                endpoint: options.endpoint,
                async: true,
                paginated: "paginator" in options,
                trace: options.trace,
            });

            if (result instanceof PaginatedTaskedResult) {
                return this.makeTaskedPaginatedCall<T>(
                    result,
                    url,
                    result.requestOptions,
                );
            } else {
                return this.makeTaskedCall<T>(
                    result,
                    url,
                    options,
                );
            }
        }
    }

    private async makeTaskedCall<T>(
        result: TaskedResult<T>,
        url: string,
        options: TaskedClientOptions,
    ): Promise<T> {
        const innerResult = new client.Result<TaskResponse>(
            result.api,
            result.requestOptions,
            result.log,
        );

        const response = await this.makeCall(
            innerResult,
            url,
            options,
        );

        this.log.debug("Task meta-data received", {
            endpoint: options.endpoint,
            pipeline: response.pipelineId,
            status: response.status,
            trace: options.trace,
        });

        result.pipelineId = response.pipelineId;
        result.state.next({
            type: client.ResultStateType.PIPELINE_ID_RECEIVED,
        });

        if (response.status !== PipelineStatus.Resolved) {
            const cancelled = await this.waitForTaskToComplete(
                result,
                response.pipelineId,
                response.nextTry * 1000,
            );

            if (cancelled) {
                return;
            }
        }

        return this.getTaskResult(result, response.pipelineId, options);
    }

    private makeTaskedPaginatedCall<T>(
        result: PaginatedTaskedResult<T>,
        url: string,
        options: PaginatedTaskedClientOptions,
    ) {
        let call: Promise<T[]>,
            paginatedData: T[] = [];

        const innerResult = new TaskedResult<paging.SkCursorPaginatedResponse<T>>(
            result.api,
            result.requestOptions,
            result.log,
        );

        // pass the event from inside the call to the above result whenever
        // pipelineId changes.
        innerResult.state.pipe(
            filter(e => e.type === client.ResultStateType.PIPELINE_ID_RECEIVED),
        ).subscribe((data) => {
            result.pipelineId = innerResult.pipelineId;
            result.state.next({
                type: data.type,
            });
        });

        call = this
            .makeTaskedCall<paging.SkCursorPaginatedResponse<T>>(
                innerResult,
                url,
                options,
            )
            .then(async (response) => {
                const paginator = options.paginator;
                let pageResult: T[];

                if (!paginator) {
                    return;
                }

                if (!result.data) {
                    result.data = new Array();
                }

                if (!response) {
                    // Cancelled
                    return;
                }

                paginator.setFromResponse(response);

                pageResult = options.paginator
                    .getResultsFromResponse(response);

                result.data = (result.data).concat(pageResult);
                paginatedData = (paginatedData).concat(pageResult);

                this.log.debug("Page received", {
                    endpoint: options.endpoint,
                    pipeline: result.pipelineId,
                    count: (pageResult instanceof Array)
                        ? (pageResult).length
                        : "N/A",
                    trace: options.trace,
                });

                result.state.next({
                    type: client.ResultStateType.PAGE_RECEIVED,
                    pageResult,
                });

                if (paginator.hasMore()) {
                    const nextPageParams = paginator.getNextPageParams();

                    this.log.debug("Next page", {
                        endpoint: options.endpoint,
                        pipeline: result.pipelineId,
                        pageParams: nextPageParams,
                        trace: options.trace,
                    });

                    result.nextPageParams = nextPageParams;
                    result.state.next({
                        type: client.ResultStateType.NEXT_PAGE,
                        pageParams: nextPageParams,
                    });

                    Object.assign(options.body, nextPageParams);

                    const nextPageResult = await this.makeTaskedPaginatedCall<T>(
                        result,
                        url,
                        options,
                    );

                    paginatedData = (paginatedData).concat(nextPageResult);
                } else {
                    result.nextPageParams = null;
                }

                return paginatedData;
            });

        return call;
    }

    private async resumeTaskedCall<T>(
        result: TaskedResult<T>,
        options: TaskedClientOptions,
    ) {
        const cancelled = await this.waitForTaskToComplete(
            result,
            options.pipelineId,
        );

        if (!cancelled) {
            return this.getTaskResult<T>(result, options.pipelineId, options);
        }
    }

    private resumeTaskedPaginatedCall<T>(
        result: PaginatedTaskedResult<T>,
        url: string,
        options: PaginatedTaskedClientOptions,
    ) {
        let paginatedData: T[] = [];

        const innerResult = new TaskedResult<paging.SkCursorPaginatedResponse<T>>(
            result.api,
            result.requestOptions,
            result.log,
        );

        return this
            .resumeTaskedCall(innerResult, options)
            .then(async (response) => {
                const paginator = options.paginator;
                let pageResult: T[];

                if (!paginator) {
                    return;
                }

                if (!result.data) {
                    result.data = new Array();
                }

                paginator.setFromResponse(response);

                pageResult = options.paginator
                    .getResultsFromResponse(response);

                result.data = (result.data).concat(pageResult);
                paginatedData = (paginatedData).concat(pageResult);

                result.state.next({
                    type: client.ResultStateType.PAGE_RECEIVED,
                    pageResult,
                });

                if (paginator.hasMore()) {
                    const nextPageParams = paginator.getNextPageParams();

                    result.nextPageParams = nextPageParams;
                    result.state.next({
                        type: client.ResultStateType.NEXT_PAGE,
                        pageParams: nextPageParams,
                    });

                    Object.assign(options.body, nextPageParams);

                    const nextPageResult = await this.makeTaskedPaginatedCall<T>(
                        result,
                        url,
                        options,
                    );

                    paginatedData = (paginatedData).concat(nextPageResult);
                } else {
                    result.nextPageParams = null;
                }

                return paginatedData;
            });
    }

    private async waitForTaskToComplete<T>(
        result: TaskedResult<T>,
        pipelineId: string,
        delay = 0,
    ): Promise<Symbol | null> {
        await sleep(delay);

        if (this.passive && !result.state.observers.length) {
            return REQUEST_CANCELLED;
        }

        this.log.debug("Task status check", {
            endpoint: result.requestOptions.endpoint,
            pipeline: pipelineId,
            trace: result.requestOptions.trace,
        });

        const response = await this.makeCall(
            new client.Result<TaskStatusResponse>(
                result.api,
                result.requestOptions,
                result.log,
            ),
            await this.urlForTaskStatus(),
            {
                body: { pipelineId },
                method: http.Method.Post,
                endpoint: null,
                trace: result.requestOptions.trace,
            },
        );
        const status = response.status;

        this.log.debug("Task status received", {
            endpoint: result.requestOptions.endpoint,
            pipeline: pipelineId,
            status,
            trace: result.requestOptions.trace,
        });

        if (status === PipelineStatus.Failed) {
            const failureReason = await this.retrieveError(
                result,
                pipelineId,
                result.requestOptions,
            );

            throw new FailedPipelineError(
                // FIXME: api should not be object (#1762)
                this.api as any,
                result.requestOptions.endpoint,
                pipelineId,
                failureReason,
                result.requestOptions.trace,
            );
        } else if (status !== PipelineStatus.Resolved) {
            return this.waitForTaskToComplete(
                result,
                pipelineId,
                response.nextTry * 1000,
            );
        }
    }

    private async getTaskResult<T>(result: TaskedResult<T>, pipelineId: string,
        options: TaskedClientOptions) {
        let call: Promise<T>;

        this.log.debug("Retrieve task result", {
            endpoint: options.endpoint,
            pipeline: pipelineId,
            trace: options.trace,
        });

        call = this
            .urlForCall({
                ...options,
                urlSuffix: TaskedClientAction.Retrieve,
            })
            .then((url) => {
                return this.makeCall<T>(
                    result,
                    url,
                    {
                        body: { pipelineId },
                        endpoint: null,
                        trace: options.trace,
                    },
                );
            });

        return call;
    }

    private retrieveError(result: TaskedResult<any>, pipelineId: string,
        options: TaskedClientOptions): Promise<http.ErrorResponse> {

        this.log.debug("Retrieve task error", {
            endpoint: options.endpoint,
            pipeline: pipelineId,
            trace: options.trace,
        });

        return this.getTaskResult<http.ErrorResponse>(
            result,
            pipelineId,
            options,
        );
    }

    private throwTaskedClientError(
        err: Error,
        result: TaskedResult<any>,
        options: TaskedClientOptions,
    ) {
        if (err instanceof http.HttpResponseError) {
            const clientError = new TaskedClientError(
                err,
                this.api as any,
                options.endpoint,
                result.pipelineId,
            );
            clientError.trace = options.trace;
            throw clientError;
        }

        if (err instanceof SkError) {
            err.trace = options.trace;
            throw err;
        }

        throw new SkErrorWrap(err, {
            trace: options.trace,
            pipelineId: result.pipelineId,
        });
    }
}

export class TaskedResult<T, O extends TaskedClientOptions = TaskedClientOptions>
    extends client.Result<T, O> {

    public state: Subject<client.ResultState<T>> = new Subject<client.ResultState<T>>();
    public pipelineId: string;

    public pipelineRetries: number = 0;

    constructor(
        api: ApiObject,
        requestOptions: O,
        log: Logger,
    ) {
        super(api, requestOptions, log);

        if (requestOptions.pipelineRetries) {
            this.pipelineRetries = requestOptions.pipelineRetries;
        }

        if (requestOptions.pipelineId) {
            this.pipelineId = requestOptions.pipelineId;
        }
    }
}

export class PaginatedTaskedResult<T> extends TaskedResult<T[], PaginatedTaskedClientOptions> {
    public nextPageParams: {};
}

///
///
/// Errors
///
///

/**
 * Error thrown when pipeline finishes without an actual error, but finishes
 * with failure.
 */
export class FailedPipelineError extends SkError {

    public get dataToLog(): Object {
        return {
            pipelineId: this.pipelineId,
            api: this.api,
            endpoint: this.endpoint,
            error: this.reason.error,
            errorMessage: this.reason.errorMessage,
        };
    }

    constructor(
        private readonly api: string,
        private readonly endpoint: string,
        public readonly pipelineId: string,
        private readonly reason: http.ErrorResponse,
        trace: string,
    ) {
        super(
            "FailedPipelineError",
            reason.errorMessage || "Tasked request's pipeline failed",
        );

        this._trace = trace;
    }
}

/**
 * Error extending SimpleClientError which also logs pipelineId
 */
export class TaskedClientError extends client.SimpleClientError {

    public get dataToLog(): any {
        return {
            ...super.dataToLog,
            pipelineId: this.pipelineId,
        };
    }

    constructor(
        originalError: http.HttpResponseError,
        api: string,
        endpoint: string,
        private pipelineId: string,
    ) {
        super(originalError, api, endpoint);
    }
}

///
///
/// Interfaces & Types
///
///

export enum TaskedClientAction {
    Initiate = "initiate",
    Retrieve = "retrieve",
}

// Result state type

declare module "skCommon/api/client/simple" {
    export enum ResultStateType {
        PIPELINE_ID_RECEIVED = "PipelineIdReceived",
    }
}

// hack to extend the enum
Object.assign(client.ResultStateType, { PIPELINE_ID_RECEIVED: "PipelineIdReceived" });

export interface TaskedClientOptions extends client.ClientOptions {
    pipelineId?: string;
    pipelineRetries?: number;
}

export interface PaginatedTaskedClientOptions extends client.PaginatedOptions,
    TaskedClientOptions { }

export interface TaskedClientConstructorOptions {
    passive: boolean;
}
