import { ofType } from 'redux-observable';
import {
  of,
  catchError,
  take,
  combineLatestWith,
  map,
  mergeMap,
  filter,
} from 'rxjs';
import { MQTT_CONNECTED, MQTT_RX, MQTT_SUBSCRIBE_ } from './mqtt';
import { markets as market } from 'liquibook-lib';
import { decodeProtobuf } from '../utils/utils';
import { SESSION_FETCHED } from './session';
import { MarketDataApiService, MarketsApiService } from '../services';

// action types
export const MARKETS_FETCH = 'MARKETS:FETCH';
export const MARKETS_FETCHED = 'MARKETS:FETCHED';
export const MARKETS_FETCHED_ERROR = 'MARKETS:FETCHED_ERROR';
export const MARKETS_QUOTE_FETCH = 'MARKETS:QUOTE_FETCH';
export const MARKETS_QUOTE_FETCH_ERROR = 'MARKETS:QUOTE_FETCH_ERROR';
export const MARKETS_QUOTE = 'MARKETS:QUOTE';

// action creators
export const MARKETS_FETCH_ = () => ({ type: MARKETS_FETCH });
export const MARKETS_FETCHED_ = (markets) => ({
  type: MARKETS_FETCHED,
  markets,
});
export const MARKETS_FETCHED_ERROR_ = (status, message) => ({
  type: MARKETS_FETCHED_ERROR,
  status,
  message,
});
export const MARKETS_QUOTE_FETCH_ = (symbol) => ({
  type: MARKETS_QUOTE_FETCH,
  symbol,
});
export const MARKETS_QUOTE_FETCH_ERROR_ = (error) => ({
  type: MARKETS_QUOTE_FETCH_ERROR,
  error,
});
export const MARKETS_QUOTE_ = (quote) => ({ type: MARKETS_QUOTE, quote });

// reducer
export const markets = (state = {}, action) => {
  switch (action.type) {
    default:
      return state;
    case MARKETS_FETCHED:
      var newState = {};
      action.markets.map(
        (o) =>
          (newState[o.symbol] = {
            ...o,
            quote: {},
          })
      );
      return newState;
    case MARKETS_QUOTE:
      return {
        ...state,
        [action.quote.symbol]: {
          ...state[action.quote.symbol],
          quote: action.quote,
        },
      };
  }
};

// epics
export const epics = [
  // fetch markets
  //
  (action$, state$) =>
    action$.pipe(
      ofType(SESSION_FETCHED),
      filter((action) => action.session.isAuthorized),
      take(1), // since this rarely changes
      mergeMap(() =>
        new MarketsApiService(state$.value.session.access).getAll$().pipe(
          map((response) => MARKETS_FETCHED_(response)),
          catchError((error) => {
            of(MARKETS_FETCHED_ERROR_(error.status, error.message));
          })
        )
      )
    ),

  // subscribe to market quotes
  //
  (action$, state$) =>
    action$.pipe(
      combineLatestWith(
        action$.pipe(ofType(MARKETS_FETCHED), take(1)),
        action$.pipe(ofType(MQTT_CONNECTED), take(1))
      ),
      take(1),
      mergeMap(() =>
        Object.keys(state$.value.markets).map((subject) =>
          MQTT_SUBSCRIBE_(`markets/${subject}`)
        )
      )
    ),

  // handle market quotes
  //
  (action$) =>
    action$.pipe(
      ofType(MQTT_RX),
      filter((action) => action.topic[0] === 'markets'),
      map((action) => MARKETS_QUOTE_(market.Summary.decode(action.message)))
    ),

  // dispatch initial fetch of market quotes
  //
  (action$) =>
    action$.pipe(
      ofType(MARKETS_FETCHED),
      mergeMap((action) =>
        action.markets.map((m) => MARKETS_QUOTE_FETCH_(m.symbol))
      )
    ),

  // fetch a market quote
  //
  (action$, state$) =>
    action$.pipe(
      ofType(MARKETS_QUOTE_FETCH),
      mergeMap((action) =>
        new MarketDataApiService(state$.value.session.access)
          .getSummary$(`${action.symbol}`)
          .pipe(
            map((response) =>
              MARKETS_QUOTE_(decodeProtobuf(market.Summary, response))
            ),
            catchError((error) => of(MARKETS_QUOTE_FETCH_ERROR_(error)))
          )
      )
    ),
];
