All files / src/app/core/utils operators.ts

100% Statements 49/49
100% Branches 5/5
100% Functions 32/32
100% Lines 39/39

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110457x   457x 457x                     457x             457x 77x 77x   31x 19x         457x 938x 938x   330x   1x   329x 329x         457x 3448x     457x 2222x     457x 3070x 3070x 515x         457x 3174x     457x 151x           457x 1x 1x   1x                       457x       319x 319x                   457x 135x 135x 103x      
import { ofType } from '@ngrx/effects';
import { Action, ActionCreator } from '@ngrx/store';
import { isEqual } from 'lodash-es';
import {
  MonoTypeOperatorFunction,
  NEVER,
  Observable,
  OperatorFunction,
  combineLatest,
  concat,
  connect,
  of,
  throwError,
} from 'rxjs';
import { buffer, catchError, distinctUntilChanged, filter, map, mergeAll, take, withLatestFrom } from 'rxjs/operators';
 
import { HttpError } from 'ish-core/models/http-error/http-error.model';
 
/**
 * compare the current stream with the latest value from given observable distinctively and fire only when value is different than the observable value and the last fired value
 */
export function distinctCompareWith<T>(observable: Observable<T>): OperatorFunction<T, T> {
  return (source$: Observable<T>) =>
    source$.pipe(
      withLatestFrom(observable),
      filter(([newVal, oldVal]) => newVal !== oldVal),
      map(([newVal]) => newVal),
      distinctUntilChanged()
    );
}
 
export function mapErrorToAction<S, T>(actionType: (props: { error: HttpError }) => T, extras?: object) {
  return (source$: Observable<S | T>) =>
    source$.pipe(
      catchError((error: HttpError) => {
        if (error.name !== 'HttpErrorResponse') {
          // rethrow runtime errors
          return throwError(() => error);
        }
        const errorAction = actionType({ error, ...extras });
        return of(errorAction);
      })
    );
}
 
export function mapToProperty<T, K extends keyof T>(property: K) {
  return (source$: Observable<T>) => source$.pipe<T[K]>(map(x => (x ? x[property] : undefined)));
}
 
export function mapToPayload<T>(): OperatorFunction<{ payload: T } & Action, T> {
  return (source$: Observable<{ payload: T }>) => source$.pipe(map(action => action.payload));
}
 
export function mapToPayloadProperty<T, K extends keyof T>(key: K): OperatorFunction<{ payload: T } & Action, T[K]> {
  return (source$: Observable<{ payload: T }>) =>
    source$.pipe(
      map(action => action.payload),
      mapToProperty(key)
    );
}
 
export function whenTruthy<T>(): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) => source$.pipe(filter(x => !!x));
}
 
export function whenFalsy<T>(): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) => source$.pipe(filter(x => !x));
}
 
/**
 * Stream will fire only emissions when the current value T of the source$ stream provides the value V for property T[K]
 */
export function whenPropertyHasValue<T, K extends keyof T, V>(property: K, value: V) {
  return (source$: Observable<T>) =>
    source$.pipe(
      whenTruthy(),
      filter(x => !!x[property] && isEqual(x[property], value))
    );
}
 
/**
 * Operator that maps to an observable when the stream contains the specified action.
 * Uses combineLatest so it will emit after both the action is fired and the observable emits.
 *
 * @param observable$ the observable that will be mapped to
 * @param action the action to listen for
 * @returns a stream containing only the values of the provided observable
 */
export function useCombinedObservableOnAction<T>(
  observable$: Observable<T>,
  action: ActionCreator
): OperatorFunction<Action, T> {
  return (source$: Observable<Action>) =>
    combineLatest([observable$, source$.pipe(ofType(action))]).pipe(map(([obs, _]) => obs));
}
 
/**
 * Delays emissions until the notifier emits.
 * Taken from https://ncjamieson.com/how-to-write-delayuntil/
 *
 * @param notifier the observable that will be waited for
 * @returns an observable that starts emitting only after the notifier emits
 */
export function delayUntil<T>(notifier: Observable<unknown>): OperatorFunction<T, T> {
  return source =>
    source.pipe(
      connect(connected => concat(concat(connected, NEVER).pipe(buffer(notifier), take(1), mergeAll()), connected))
    );
}