import { Task } from 'redux-saga';
import {
  Effect,
  call,
  cancel,
  cancelled,
  fork,
  put,
  race,
  spawn,
  take,
  takeEvery,
} from 'redux-saga/effects';
import { v4 as uuidv4 } from 'uuid';

import {
  AllTasksOutcomes,
  BaseFunnelTask,
  FunnelOutcome,
  Options,
  TaskDoneAction,
  TaskOutcome,
} from './type';

/**
 * Funnel saga utility
 * Based on original idea from @alk-sdavid
 * A funnel is a saga managing to execute a queue of tasks while taking care that the number of concurrent running tasks is bounded.
 * All tasks will be executed, the order of startings being the order of the given queue.
 * Funnel can be cancelled by dispatching the proper action, in that case unstarted scheduled tasks won't be started.
 */

const DEFAULT_MAX_CONCURRENCY = 5;

type FunnelEffectsGenerator<T> = Generator<Effect, FunnelOutcome<T>>;
type Params = {
  allTasksOutcomes: AllTasksOutcomes<unknown>;
  funnelIsCancelled?: boolean;
};
type Result = FunnelOutcome<unknown>['global'];

const computeGlobalOutcome = ({
  allTasksOutcomes,
  funnelIsCancelled = false,
}: Params): Result => {
  const someTaskFailed = Object.values(allTasksOutcomes).some(
    ({ status }) => status === 'failure',
  );
  if (funnelIsCancelled) {
    return someTaskFailed ? 'cancelled with failures' : 'cancelled';
  }
  return someTaskFailed ? 'failure' : 'success';
};

const funnel = function* <
  TaskResult,
  FunnelTask extends BaseFunnelTask<TaskResult>,
>(
  queue: FunnelTask[],
  options: Options<TaskResult, FunnelTask> = {},
): FunnelEffectsGenerator<TaskResult> {
  const {
    maxConcurrency = DEFAULT_MAX_CONCURRENCY,
    cancelActionType = uuidv4() as string, // if cancel action type is not given, this will ensure the funnel is (very) unlikely to be cancelled
    onProgress,
  } = options;

  const internalActionTypePrefix = `FUNNEL_${uuidv4()}`;
  const taskDoneActionType = `${internalActionTypePrefix}_TASK_DONE`;
  const allTasksDoneActionType = `${internalActionTypePrefix}_ALL_TASKS_DONE`;

  if (maxConcurrency < 2) {
    // 1 would work but it would be dumb to use a funnel for that
    throw new Error('maxConcurrency must be >= 2');
  }

  let funnelIsCancelledByAction = false;
  const allTasksOutcomes: AllTasksOutcomes<TaskResult> = {};

  const nbTasks = queue.length;

  function* forkInitialTasks(tasks) {
    for (const task of tasks) {
      yield fork(runTask, task);
    }
  }

  function* runTask(task: FunnelTask) {
    const { id, runner } = task;
    try {
      let outcome: TaskOutcome<TaskResult>;
      try {
        const result: TaskResult = yield call(runner);
        outcome = {
          id,
          status: 'success',
          result,
        };
      } catch (e: any) {
        outcome = { id, status: 'failure', error: e };
      }
      const action: TaskDoneAction<TaskResult> = {
        type: taskDoneActionType,
        task,
        outcome,
      };
      yield put(action);
    } finally {
      if (yield cancelled()) {
        yield put({
          type: taskDoneActionType,
          task,
          outcome: { id, status: 'cancelled' },
        });
      }
    }
  }

  function* onTaskDone(
    remainingTasks: FunnelTask[],
    taskDoneAction: TaskDoneAction<TaskResult, FunnelTask>,
  ) {
    // storing outcome of the task just done
    const { task, outcome: doneTaskOutcome } = taskDoneAction;
    allTasksOutcomes[doneTaskOutcome.id] = doneTaskOutcome;

    const nbTasksDone = Object.keys(allTasksOutcomes).length;
    if (onProgress) {
      // not forking, caller is responsible for termination of his hook
      yield spawn(onProgress, {
        task,
        outcome: doneTaskOutcome,
        nbTasksDone,
      });
    }

    if (nbTasksDone === nbTasks || funnelIsCancelledByAction) {
      // last running task is done
      yield put({ type: allTasksDoneActionType });
      return;
    }
    // the side effect of shift is essential
    // it make the initial queue copy (param of *start()) progressively decrease
    const nextTask = remainingTasks.shift();

    if (nextTask) {
      const { cancel: _cancelEffect } = yield race({
        task: call(runTask, nextTask),
        cancel: take(cancelActionType),
      });
      if (_cancelEffect) {
        funnelIsCancelledByAction = true;
      }
    }
  }

  //
  // funnel main saga here
  //

  if (nbTasks === 0) {
    return {
      global: computeGlobalOutcome({
        allTasksOutcomes,
        funnelIsCancelled: funnelIsCancelledByAction,
      }),
      tasks: allTasksOutcomes,
    };
  }

  // don't alter caller array
  const remainingQueue = [...queue];

  const tasksTerminationReactor = (yield takeEvery(
    taskDoneActionType,
    onTaskDone,
    remainingQueue,
  )) as Task;

  // side effect on purpose on remainingQueue
  const pool = remainingQueue.splice(0, maxConcurrency);
  const { cancel: cancelEffect } = (yield race({
    processing: call(forkInitialTasks, pool),
    cancel: take(cancelActionType),
  })) as { cancel: any };

  if (cancelEffect) {
    funnelIsCancelledByAction = true;
  }

  try {
    // wait for completion of all forks here
    yield take(allTasksDoneActionType);
  } finally {
    // cancelled or not, we must release the task termination saga
    yield cancel(tasksTerminationReactor);
  }

  return {
    global: computeGlobalOutcome({
      allTasksOutcomes,
      funnelIsCancelled: funnelIsCancelledByAction,
    }),
    tasks: allTasksOutcomes,
  };
};

export default funnel;
