RxJS Error Handling (catchError, retry, throwError)

1. catchError - Exception Handling and Recovery

Operator Syntax Description Return Behavior
catchError catchError((err, caught) => obs$) Catches errors and returns recovery observable - prevents stream termination Must return observable (fallback, retry, or EMPTY)
caught argument catchError((err, caught) => caught) Second parameter is source observable - enables restart/retry patterns Return caught to retry from beginning

Example: Error catching and recovery

import { of, throwError, EMPTY } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

// Basic error catching with fallback
of(1, 2, 3, 4, 5).pipe(
  map(n => {
    if (n === 3) throw new Error('Error at 3');
    return n * 10;
  }),
  catchError(err => {
    console.error('Caught:', err.message);
    return of(999); // Fallback value
  })
).subscribe(val => console.log(val)); // 10, 20, 999, completes

// catchError with EMPTY - suppress error, complete silently
throwError(() => new Error('Fail')).pipe(
  catchError(err => {
    console.error('Error suppressed:', err.message);
    return EMPTY; // Complete without emitting
  })
).subscribe({
  next: val => console.log(val),
  complete: () => console.log('Completed')
});

// catchError in HTTP request
this.http.get('/api/data').pipe(
  catchError(err => {
    console.error('API Error:', err);
    return of({ data: [], fromCache: true }); // Fallback data
  })
).subscribe(response => displayData(response.data));

// Multiple catchError for different error types
this.http.get('/api/users').pipe(
  map(response => response.data),
  catchError(err => {
    if (err.status === 404) {
      return of([]); // Empty array for not found
    } else if (err.status === 401) {
      this.router.navigate(['/login']);
      return EMPTY;
    } else {
      return throwError(() => err); // Re-throw other errors
    }
  })
).subscribe(users => displayUsers(users));

// Practical: graceful degradation
const primaryApi$ = this.http.get('/api/primary');
const fallbackApi$ = this.http.get('/api/fallback');

primaryApi$.pipe(
  catchError(primaryErr => {
    console.warn('Primary failed, trying fallback');
    return fallbackApi$.pipe(
      catchError(fallbackErr => {
        console.error('Both failed');
        return of({ error: 'Service unavailable' });
      })
    );
  })
).subscribe(data => processData(data));

// Practical: error with user notification
this.userService.deleteUser(userId).pipe(
  catchError(err => {
    this.notificationService.showError('Failed to delete user');
    this.logger.error('Delete error:', err);
    return EMPTY; // Suppress error after handling
  })
).subscribe(() => this.notificationService.showSuccess('User deleted'));

// Using caught parameter for retry
let retryCount = 0;
const source$ = throwError(() => new Error('Fail'));
source$.pipe(
  catchError((err, caught) => {
    retryCount++;
    if (retryCount < 3) {
      console.log(`Retry ${retryCount}`);
      return caught; // Retry by returning source
    }
    return of('Failed after retries');
  })
).subscribe(val => console.log(val));
Note: catchError must return an Observable. Use EMPTY to complete silently, of(value) for fallback, or throwError to re-throw.

2. retry and retryWhen - Automatic Retry Logic

Operator Syntax Description Strategy
retry retry(count?) Resubscribes to source on error, up to count times (infinite if omitted) Immediate retry, no delay
retry (config) retry({ count, delay, resetOnSuccess }) Configurable retry with delay and reset options Delayed retry with backoff support
retryWhen retryWhen(notifier => notifier.pipe(...)) Custom retry logic based on error notification stream Fully customizable (exponential backoff, conditional)

Example: Retry strategies

import { throwError, timer, of } from 'rxjs';
import { retry, retryWhen, mergeMap, tap, delay } from 'rxjs/operators';

// retry - simple retry N times
let attempt = 0;
const unstable$ = new Observable(subscriber => {
  attempt++;
  if (attempt < 3) {
    subscriber.error(new Error(`Attempt ${attempt} failed`));
  } else {
    subscriber.next('Success!');
    subscriber.complete();
  }
});

unstable$.pipe(
  retry(2) // Retry up to 2 times (3 total attempts)
).subscribe({
  next: val => console.log(val),
  error: err => console.error('Final error:', err)
});

// retry with delay configuration
this.http.get('/api/data').pipe(
  retry({
    count: 3,
    delay: 1000, // Wait 1s between retries
    resetOnSuccess: true
  })
).subscribe(data => processData(data));

// retryWhen - exponential backoff
this.http.get('/api/users').pipe(
  retryWhen(errors => 
    errors.pipe(
      mergeMap((err, index) => {
        const retryAttempt = index + 1;
        if (retryAttempt > 3) {
          return throwError(() => err); // Max retries exceeded
        }
        const delayTime = Math.pow(2, retryAttempt) * 1000; // Exponential backoff
        console.log(`Retry ${retryAttempt} after ${delayTime}ms`);
        return timer(delayTime);
      })
    )
  )
).subscribe(
  users => displayUsers(users),
  err => showError('Failed after retries')
);

// retryWhen - conditional retry based on error type
this.http.post('/api/save', data).pipe(
  retryWhen(errors => 
    errors.pipe(
      mergeMap((err, index) => {
        // Retry only on network errors, not client errors
        if (err.status >= 500 && index < 3) {
          return timer((index + 1) * 2000); // 2s, 4s, 6s
        }
        return throwError(() => err); // Don't retry client errors
      })
    )
  )
).subscribe(
  response => handleSuccess(response),
  err => handleError(err)
);

// Practical: retry with max delay cap
const maxDelay = 10000; // Cap at 10 seconds
apiCall$.pipe(
  retryWhen(errors => 
    errors.pipe(
      mergeMap((err, index) => {
        if (index >= 5) return throwError(() => err);
        const backoff = Math.min(Math.pow(2, index) * 1000, maxDelay);
        return timer(backoff);
      })
    )
  )
).subscribe(data => processData(data));

// Practical: retry with user notification
this.http.get('/api/critical-data').pipe(
  retryWhen(errors => 
    errors.pipe(
      tap(() => this.notificationService.show('Retrying...')),
      delay(2000),
      take(3),
      concat(throwError(() => new Error('Max retries reached')))
    )
  ),
  catchError(err => {
    this.notificationService.showError('Unable to load data');
    return of(null);
  })
).subscribe(data => {
  if (data) {
    this.notificationService.showSuccess('Data loaded');
    processData(data);
  }
});

// Practical: smart retry - wait for connection
const checkOnline$ = interval(5000).pipe(
  filter(() => navigator.onLine),
  take(1)
);

apiCall$.pipe(
  retryWhen(errors => 
    errors.pipe(
      switchMap(err => {
        if (!navigator.onLine) {
          console.log('Waiting for connection...');
          return checkOnline$;
        }
        return timer(2000);
      }),
      take(5)
    )
  )
).subscribe(data => syncData(data));

3. throwError - Error Emission Operator

Function Syntax Description Use Case
throwError throwError(() => error) Creates observable that immediately emits error notification Testing, error propagation, fallback errors
Factory function throwError(() => new Error(msg)) Factory ensures fresh error instance per subscription Prevents shared error object mutations

Example: Error emission patterns

import { throwError, of, iif } from 'rxjs';
import { catchError, mergeMap } from 'rxjs/operators';

// Basic throwError
throwError(() => new Error('Something went wrong')).subscribe({
  next: val => console.log(val),
  error: err => console.error('Error:', err.message)
});

// Conditional error throwing
const isValid = false;
iif(
  () => isValid,
  of('Valid data'),
  throwError(() => new Error('Validation failed'))
).subscribe({
  next: val => console.log(val),
  error: err => console.error(err.message)
});

// throwError in catchError chain
this.http.get('/api/data').pipe(
  mergeMap(response => {
    if (!response.data) {
      return throwError(() => new Error('No data in response'));
    }
    return of(response.data);
  }),
  catchError(err => {
    this.logger.error(err);
    return throwError(() => new Error('Data processing failed'));
  })
).subscribe({
  next: data => processData(data),
  error: err => showErrorMessage(err.message)
});

// Practical: validation with throwError
function validateUser(user) {
  if (!user.email) {
    return throwError(() => new Error('Email required'));
  }
  if (!user.name) {
    return throwError(() => new Error('Name required'));
  }
  return of(user);
}

const user = { name: 'Alice' };
validateUser(user).subscribe({
  next: validUser => saveUser(validUser),
  error: err => showValidationError(err.message)
});

// Practical: custom error types
class ValidationError extends Error {
  constructor(public field: string, message: string) {
    super(message);
    this.name = 'ValidationError';
  }
}

class NetworkError extends Error {
  constructor(public statusCode: number, message: string) {
    super(message);
    this.name = 'NetworkError';
  }
}

this.http.get('/api/users').pipe(
  catchError(err => {
    if (err.status === 0) {
      return throwError(() => new NetworkError(0, 'No internet connection'));
    }
    return throwError(() => new NetworkError(err.status, err.message));
  })
).subscribe({
  error: err => {
    if (err instanceof NetworkError) {
      handleNetworkError(err);
    } else {
      handleGenericError(err);
    }
  }
});

// Practical: error transformation
apiCall$.pipe(
  catchError(err => 
    throwError(() => ({
      originalError: err,
      timestamp: Date.now(),
      context: 'API Call Failed',
      userMessage: 'Please try again later'
    }))
  )
).subscribe({
  error: errorInfo => {
    logError(errorInfo);
    showMessage(errorInfo.userMessage);
  }
});

4. onErrorResumeNext - Alternative Stream Switching

Operator Syntax Description Behavior
onErrorResumeNext onErrorResumeNext(obs1$, obs2$, ...) Continues with next observable on error OR completion, ignoring errors Errors are swallowed, continues to next stream

Example: Alternative stream switching

import { onErrorResumeNext, of, throwError } from 'rxjs';
import { map } from 'rxjs/operators';

// onErrorResumeNext - continue despite errors
const first$ = throwError(() => new Error('First failed'));
const second$ = of(2, 3, 4);
const third$ = throwError(() => new Error('Third failed'));
const fourth$ = of(5, 6);

onErrorResumeNext(first$, second$, third$, fourth$).subscribe({
  next: val => console.log(val), // 2, 3, 4, 5, 6
  error: err => console.error('Error:', err), // Never called
  complete: () => console.log('Complete')
});

// Compare with catchError
// catchError - handles error and decides next action
of(1, 2, 3).pipe(
  map(n => {
    if (n === 2) throw new Error('Error at 2');
    return n;
  }),
  catchError(err => of(999))
).subscribe(val => console.log(val)); // 1, 999

// onErrorResumeNext - silently moves to next observable
onErrorResumeNext(
  of(1, 2, 3).pipe(
    map(n => {
      if (n === 2) throw new Error('Error');
      return n;
    })
  ),
  of(4, 5, 6)
).subscribe(val => console.log(val)); // 1, 4, 5, 6 (error silently skipped)

// Practical: try multiple data sources
const cache$ = this.cacheService.get('data').pipe(
  map(data => {
    if (!data) throw new Error('No cache');
    return data;
  })
);
const api$ = this.http.get('/api/data');
const fallback$ = of({ data: [], offline: true });

onErrorResumeNext(cache$, api$, fallback$).subscribe(
  data => displayData(data)
);

// Practical: best-effort data collection
const source1$ = this.http.get('/api/source1').pipe(
  catchError(() => EMPTY)
);
const source2$ = this.http.get('/api/source2').pipe(
  catchError(() => EMPTY)
);
const source3$ = this.http.get('/api/source3').pipe(
  catchError(() => EMPTY)
);

onErrorResumeNext(source1$, source2$, source3$).pipe(
  scan((acc, data) => [...acc, data], [])
).subscribe(allData => processCollectedData(allData));

// Note: Prefer catchError for error handling
// onErrorResumeNext silently swallows errors which can hide issues
Warning: onErrorResumeNext silently swallows errors. Prefer catchError for explicit error handling and recovery logic.

5. timeout and timeoutWith - Time-based Errors

Operator Syntax Description On Timeout
timeout timeout(duration) Errors if no emission within specified time (ms) Emits TimeoutError
timeout (config) timeout({ first, each, with }) Configurable timeout for first emission and each subsequent Error or switch to 'with' observable
timeoutWith timeoutWith(duration, fallback$) Switches to fallback observable on timeout (deprecated, use timeout config) Switch to fallback stream

Example: Timeout handling

import { of, throwError, timer } from 'rxjs';
import { timeout, delay, catchError } from 'rxjs/operators';

// Basic timeout
of('data').pipe(
  delay(3000),
  timeout(2000) // Timeout after 2 seconds
).subscribe({
  next: val => console.log(val),
  error: err => console.error('Timeout:', err.name) // TimeoutError
});

// timeout with different limits
const slow$ = timer(5000);
slow$.pipe(
  timeout({
    first: 3000, // First value must arrive within 3s
    each: 1000   // Subsequent values within 1s of previous
  })
).subscribe({
  error: err => console.error('Timeout error')
});

// timeout with fallback observable
of('slow data').pipe(
  delay(5000),
  timeout({
    first: 2000,
    with: () => of('fallback data') // Switch to this on timeout
  })
).subscribe(val => console.log(val)); // 'fallback data'

// Practical: API call timeout
this.http.get('/api/users').pipe(
  timeout(5000), // 5 second timeout
  catchError(err => {
    if (err.name === 'TimeoutError') {
      this.notificationService.showError('Request timed out');
      return of([]);
    }
    return throwError(() => err);
  })
).subscribe(users => displayUsers(users));

// Practical: timeout with retry
this.http.post('/api/save', data).pipe(
  timeout(10000),
  retry({
    count: 2,
    delay: (error) => {
      if (error.name === 'TimeoutError') {
        return timer(1000); // Retry after 1s on timeout
      }
      return throwError(() => error);
    }
  }),
  catchError(err => {
    if (err.name === 'TimeoutError') {
      return of({ error: 'Operation timed out after retries' });
    }
    return throwError(() => err);
  })
).subscribe(response => handleResponse(response));

// Practical: real-time data with timeout fallback
const liveData$ = this.websocket.messages$.pipe(
  timeout({
    first: 5000,
    each: 3000,
    with: () => this.http.get('/api/fallback-data') // Use REST as fallback
  })
);

liveData$.subscribe(
  data => updateUI(data),
  err => console.error('Stream error:', err)
);

// Practical: user interaction timeout
const userInput$ = fromEvent(input, 'input');
userInput$.pipe(
  timeout({
    first: 30000, // User must start typing within 30s
    with: () => {
      showMessage('Session expired due to inactivity');
      return EMPTY;
    }
  })
).subscribe(event => processInput(event));

// Practical: progressive timeout
const progressiveTimeout$ = this.http.get('/api/large-dataset').pipe(
  timeout({
    first: 10000,  // 10s for first response
    each: 5000,    // 5s between chunks
    with: () => {
      // Try faster endpoint
      return this.http.get('/api/large-dataset/summary').pipe(
        map(summary => ({ partial: true, data: summary }))
      );
    }
  })
);

progressiveTimeout$.subscribe(
  data => {
    if (data.partial) {
      showPartialData(data.data);
    } else {
      showFullData(data);
    }
  }
);

6. finalize - Cleanup Operations (like finally)

Operator Syntax Description Execution
finalize finalize(() => cleanup()) Executes callback when observable completes, errors, or unsubscribes Always runs on stream termination (success/error/cancel)

Example: Cleanup operations with finalize

import { of, throwError, interval } from 'rxjs';
import { finalize, take, delay } from 'rxjs/operators';

// finalize on completion
of(1, 2, 3).pipe(
  finalize(() => console.log('Cleanup on complete'))
).subscribe(val => console.log(val));
// Output: 1, 2, 3, 'Cleanup on complete'

// finalize on error
throwError(() => new Error('Fail')).pipe(
  finalize(() => console.log('Cleanup on error'))
).subscribe({
  error: err => console.error(err.message)
});
// Output: 'Fail', 'Cleanup on error'

// finalize on unsubscribe
const subscription = interval(1000).pipe(
  finalize(() => console.log('Cleanup on unsubscribe'))
).subscribe(val => console.log(val));

setTimeout(() => subscription.unsubscribe(), 3500);
// Output: 0, 1, 2, 3, 'Cleanup on unsubscribe'

// Practical: loading spinner
this.http.get('/api/data').pipe(
  tap(() => this.showSpinner()),
  finalize(() => this.hideSpinner()) // Always hide spinner
).subscribe(
  data => this.displayData(data),
  err => this.showError(err)
);

// Practical: resource cleanup
const connection$ = this.openWebSocket().pipe(
  finalize(() => {
    console.log('Closing WebSocket connection');
    this.closeWebSocket();
  })
);

connection$.subscribe(
  message => handleMessage(message),
  err => handleError(err)
);

// Practical: temporary UI state
fromEvent(button, 'click').pipe(
  tap(() => button.disabled = true),
  switchMap(() => this.saveData()),
  finalize(() => button.disabled = false) // Re-enable button
).subscribe(
  result => showSuccess(result),
  err => showError(err)
);

// Practical: analytics tracking
this.processPayment(paymentData).pipe(
  finalize(() => {
    this.analytics.track('payment_attempt_completed', {
      success: this.paymentSuccess,
      timestamp: Date.now()
    });
  })
).subscribe(
  result => {
    this.paymentSuccess = true;
    showConfirmation(result);
  },
  err => {
    this.paymentSuccess = false;
    showError(err);
  }
);

// Practical: file upload cleanup
this.uploadFile(file).pipe(
  tap(() => this.uploadProgress = 0),
  finalize(() => {
    this.uploadProgress = null;
    this.cleanupTempFiles();
    console.log('Upload process completed');
  })
).subscribe(
  response => handleUploadSuccess(response),
  err => handleUploadError(err)
);

// Multiple finalize operators
of(1, 2, 3).pipe(
  finalize(() => console.log('First cleanup')),
  map(x => x * 10),
  finalize(() => console.log('Second cleanup'))
).subscribe(val => console.log(val));
// Output: 10, 20, 30, 'Second cleanup', 'First cleanup' (reverse order)

// Practical: database transaction
this.database.beginTransaction().pipe(
  switchMap(tx => this.performOperations(tx)),
  finalize(() => this.database.commit()) // Always commit/rollback
).subscribe(
  result => console.log('Transaction completed'),
  err => {
    this.database.rollback();
    console.error('Transaction failed');
  }
);
Note: finalize is guaranteed to run on completion, error, or unsubscribe - perfect for cleanup like hiding loaders, closing connections, or releasing resources.

Section 6 Summary

  • catchError handles errors and returns recovery observable (fallback, EMPTY, or retry)
  • retry resubscribes on error, retryWhen enables custom retry strategies (exponential backoff)
  • throwError creates error-emitting observable for testing and error propagation
  • timeout errors if no emission within time limit, can provide fallback observable
  • finalize executes cleanup on complete/error/unsubscribe - always runs
  • Error handling pattern: try → retry → catchError → finalize for robust pipelines