RxJS Testing Strategies and Tools

1. TestScheduler for Virtual Time Testing

Feature Syntax Description Use Case
TestScheduler new TestScheduler(assertDeepEqual) Virtual time scheduler for synchronous testing Testing time-based operators without waiting
run() scheduler.run(callback) Execute tests in virtual time context Running marble tests synchronously
flush() scheduler.flush() Execute all scheduled actions immediately Force completion of async operations
maxFrames scheduler.maxFrames = 1000 Set maximum virtual time frames Preventing infinite loops in tests
Virtual Time Unit 1 frame = 1ms (virtual) Time advances only when scheduled Deterministic timing in tests

Example: Basic TestScheduler setup

import { TestScheduler } from 'rxjs/testing';

describe('Observable Tests', () => {
  let scheduler: TestScheduler;
  
  beforeEach(() => {
    // Create scheduler with assertion function
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });
  
  it('should test time-based operators synchronously', () => {
    scheduler.run(({ cold, expectObservable }) => {
      // Test debounceTime without waiting
      const source$ = cold('a-b-c---|');
      const expected =     '----c---|';
      
      const result$ = source$.pipe(
        debounceTime(30) // 30 virtual frames
      );
      
      expectObservable(result$).toBe(expected);
    });
  });
});

Example: Testing asynchronous operations synchronously

it('should test interval and delay synchronously', () => {
  scheduler.run(({ expectObservable }) => {
    // interval emits every 10 frames
    const source$ = interval(10).pipe(
      take(5),
      delay(5) // Add 5 frame delay
    );
    
    // Expected: delay 5, then emit at 10, 20, 30, 40, 50
    const expected = '-----a-b-c-d-(e|)';
    const values = { a: 0, b: 1, c: 2, d: 3, e: 4 };
    
    expectObservable(source$).toBe(expected, values);
  });
});

it('should test timer and timeout', () => {
  scheduler.run(({ expectObservable, cold }) => {
    const source$ = cold('-------a--|').pipe(
      timeout(50)
    );
    
    const expected =     '-------a--|';
    
    expectObservable(source$).toBe(expected);
  });
  
  scheduler.run(({ expectObservable, cold }) => {
    // This should timeout
    const source$ = cold('---------------a--|').pipe(
      timeout(50)
    );
    
    const expected =     '-----#'; // Error at frame 50
    
    expectObservable(source$).toBe(expected, undefined, 
      new Error('Timeout has occurred'));
  });
});

2. Marble Testing Syntax and Patterns

Symbol Meaning Example Description
- Time frame '---a' Each dash = 1 virtual time frame (1ms)
a-z, 0-9 Emission 'a-b-c' Emit values at specific frames
| Completion 'a-b-c|' Observable completes
# Error 'a-b-#' Observable errors at this frame
( ) Grouping '(ab)-c' Synchronous emissions in same frame
^ Subscription start '^--!' Subscription point marker
! Unsubscription '^--!' Unsubscription point marker
' ' Space (ignored) 'a b c' For readability, no effect on timing

Example: Marble diagram patterns

// Basic emission pattern
'a-b-c|'        // Emit a, b, c then complete
// Frame: 0-1-2-3

// Synchronous emissions
'(abc)|'        // Emit a, b, c synchronously then complete
// All at frame 0

// Multiple values with grouping
'a-(bc)-d|'     // Emit a, then b+c together, then d, complete
// a at 0, b+c at 2, d at 4, complete at 5

// Error handling
'a-b-#'         // Emit a, b, then error
'a-b-#', null, new Error('oops')  // With specific error

// Long delays
'a 99ms b|'     // Emit a, wait 99ms, emit b, complete
'a 1s b|'       // Emit a, wait 1 second, emit b

// Never completes
'a-b-c-'        // Emit a, b, c, never completes

// Immediate completion
'|'             // Complete immediately
'(a|)'          // Emit a and complete synchronously

// Empty observable
''              // Never emits, never completes

// Subscription tracking
'^---!'         // Subscribe at 0, unsubscribe at 4
'---^--!'       // Subscribe at 3, unsubscribe at 6

Example: Complex marble test

it('should test switchMap with marble diagrams', () => {
  scheduler.run(({ cold, hot, expectObservable }) => {
    // Source observable (hot)
    const source$ = hot('  -a-----b-----c----|');
    
    // Inner observables (cold)
    const a$ =      cold('   --1--2--3|       ');
    const b$ =      cold('         --4--5--6|');
    const c$ =      cold('               --7|');
    
    const expected =    '  ---1--2---4--5---7|';
    const values = { 
      1: 'a1', 2: 'a2', 
      4: 'b4', 5: 'b5', 6: 'b6',
      7: 'c7'
    };
    
    const result$ = source$.pipe(
      switchMap(val => {
        if (val === 'a') return a$.pipe(map(n => val + n));
        if (val === 'b') return b$.pipe(map(n => val + n));
        if (val === 'c') return c$.pipe(map(n => val + n));
        return EMPTY;
      })
    );
    
    expectObservable(result$).toBe(expected, values);
  });
});

3. hot() and cold() Observable Testing Helpers

Helper Type Behavior Use Case
cold() Cold Observable Starts emitting when subscribed Testing operators that create new observables
hot() Hot Observable Emits regardless of subscriptions Testing shared observables, subjects
Subscription Point ^ Default subscription at frame 0 Controlling when subscription occurs
Values Object { a: value } Map marble symbols to actual values Testing with complex objects
Error Object { # : error } Define error for # symbol Testing error handling

Example: cold() vs hot() observables

it('should understand cold observable behavior', () => {
  scheduler.run(({ cold, expectObservable }) => {
    // Cold: each subscription gets full sequence
    const cold$ = cold('--a--b--c|');
    
    // First subscription at frame 0
    expectObservable(cold$).toBe('--a--b--c|');
    
    // Second subscription also starts from beginning
    expectObservable(cold$).toBe('--a--b--c|');
  });
});

it('should understand hot observable behavior', () => {
  scheduler.run(({ hot, expectObservable }) => {
    // Hot: emissions happen regardless of subscriptions
    const hot$ = hot('--a--b--c--d--e|');
    
    // Subscribe at frame 0, see all emissions
    expectObservable(
      hot$,
      '^--------------!'
    ).toBe('--a--b--c--d--e|');
    
    // Subscribe at frame 6 (after 'b'), miss early values
    expectObservable(
      hot$,
      '------^--------!'
    ).toBe('------c--d--e|');
  });
});

it('should test hot observable with subscription points', () => {
  scheduler.run(({ hot, expectObservable }) => {
    // Subscription point marked with ^
    const hot$ = hot('---^-a--b--c|');
    
    // Will receive a, b, c (subscribed at ^)
    expectObservable(hot$).toBe('--a--b--c|');
  });
});

Example: Using values object for complex data

it('should test with complex objects', () => {
  scheduler.run(({ cold, expectObservable }) => {
    const values = {
      a: { id: 1, name: 'Alice' },
      b: { id: 2, name: 'Bob' },
      c: { id: 3, name: 'Charlie' }
    };
    
    const source$ = cold('--a--b--c|', values);
    const expected =     '--a--b--c|';
    
    const result$ = source$.pipe(
      map(user => ({ ...user, processed: true }))
    );
    
    expectObservable(result$).toBe(expected, {
      a: { id: 1, name: 'Alice', processed: true },
      b: { id: 2, name: 'Bob', processed: true },
      c: { id: 3, name: 'Charlie', processed: true }
    });
  });
});

it('should test error emissions', () => {
  scheduler.run(({ cold, expectObservable }) => {
    const error = new Error('Test error');
    
    const source$ = cold('--a--b--#', 
      { a: 1, b: 2 }, 
      error
    );
    
    const result$ = source$.pipe(
      catchError(err => of('recovered'))
    );
    
    const expected = '--a--b--(r|)';
    
    expectObservable(result$).toBe(expected, { 
      a: 1, b: 2, r: 'recovered' 
    });
  });
});

4. expectObservable() and expectSubscriptions() Assertions

Assertion Syntax Description Use Case
expectObservable() .toBe(marble, values?, error?) Assert observable emissions match pattern Verifying emission timing and values
expectSubscriptions() .toBe(subscriptionMarbles) Assert subscription/unsubscription timing Verifying subscription lifecycle
toEqual() .toEqual(expected) Deep equality check Comparing complex objects
Subscription Marble '^---!' ^ = subscribe, ! = unsubscribe Tracking subscription timing
Multiple Subscriptions ['^--!', '--^--!'] Array of subscription patterns Testing multicasting, sharing

Example: Using expectObservable()

it('should verify emission values and timing', () => {
  scheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('--a--b--c|');
    const expected =     '--a--b--c|';
    
    expectObservable(source$).toBe(expected);
  });
});

it('should verify with custom values', () => {
  scheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('--a--b--c|', { a: 1, b: 2, c: 3 });
    
    const result$ = source$.pipe(
      map(x => x * 10)
    );
    
    const expected = '--a--b--c|';
    const values = { a: 10, b: 20, c: 30 };
    
    expectObservable(result$).toBe(expected, values);
  });
});

it('should verify error emissions', () => {
  scheduler.run(({ cold, expectObservable }) => {
    const error = new Error('Failed');
    const source$ = cold('--a--#', { a: 1 }, error);
    
    expectObservable(source$).toBe('--a--#', { a: 1 }, error);
  });
});

Example: Using expectSubscriptions()

it('should verify subscription timing', () => {
  scheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
    const source$ = cold('--a--b--c--d--e|');
    const subs =         '^--------------!';
    
    expectObservable(source$).toBe('--a--b--c--d--e|');
    expectSubscriptions(source$.subscriptions).toBe(subs);
  });
});

it('should verify unsubscription with takeUntil', () => {
  scheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
    const source$ = cold('--a--b--c--d--e|');
    const notifier$ = hot('-------x');
    const expected =      '--a--b-|';
    const subs =          '^------!';
    
    const result$ = source$.pipe(
      takeUntil(notifier$)
    );
    
    expectObservable(result$).toBe(expected);
    expectSubscriptions(source$.subscriptions).toBe(subs);
  });
});

it('should verify multiple subscriptions with share', () => {
  scheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
    const source$ = cold('--a--b--c|');
    const shared$ = source$.pipe(share());
    
    // Two subscriptions
    expectObservable(shared$).toBe('--a--b--c|');
    expectObservable(shared$).toBe('--a--b--c|');
    
    // But only one subscription to source
    expectSubscriptions(source$.subscriptions).toBe('^--------!');
  });
});

5. flush() and getMessages() for Test Execution Control

Method Purpose Description Use Case
flush() Execute scheduled Force all scheduled virtual time actions Manual test execution control
getMessages() Inspect emissions Retrieve array of notification messages Custom assertions on messages
frame Virtual time Current virtual time frame number Timing verification
Notification Message object { frame, notification: { kind, value/error } } Detailed emission inspection

Example: Using flush() for manual control

import { TestScheduler } from 'rxjs/testing';

it('should use flush for manual execution', () => {
  const scheduler = new TestScheduler((actual, expected) => {
    expect(actual).toEqual(expected);
  });
  
  const emissions: number[] = [];
  
  // Create observable on the scheduler
  interval(10, scheduler).pipe(
    take(5)
  ).subscribe(x => emissions.push(x));
  
  // Nothing emitted yet
  expect(emissions).toEqual([]);
  
  // Manually flush - executes all scheduled actions
  scheduler.flush();
  
  // Now all emissions have occurred
  expect(emissions).toEqual([0, 1, 2, 3, 4]);
});

it('should control partial execution', () => {
  const scheduler = new TestScheduler((actual, expected) => {
    expect(actual).toEqual(expected);
  });
  
  const emissions: number[] = [];
  
  timer(0, 10, scheduler).pipe(
    take(10)
  ).subscribe(x => emissions.push(x));
  
  // Advance time partially
  scheduler.flush();
  
  expect(emissions.length).toBe(10);
});

Example: Using getMessages() for custom assertions

it('should inspect messages directly', () => {
  scheduler.run(({ cold, expectObservable }) => {
    const source$ = cold('--a--b--c|', { a: 1, b: 2, c: 3 });
    
    const result$ = source$.pipe(
      map(x => x * 10)
    );
    
    // Subscribe and get messages
    const messages = scheduler.run(({ expectObservable }) => {
      expectObservable(result$);
      return result$;
    });
  });
});

// More advanced: accessing raw messages
it('should verify notification details', (done) => {
  const scheduler = new TestScheduler((actual, expected) => {
    expect(actual).toEqual(expected);
  });
  
  scheduler.run(({ cold }) => {
    const source$ = cold('--a--b--c|', { a: 1, b: 2, c: 3 });
    
    // Create a tracked observable
    const messages: any[] = [];
    
    source$.subscribe({
      next: val => messages.push({ 
        frame: scheduler.frame, 
        kind: 'N', 
        value: val 
      }),
      complete: () => messages.push({ 
        frame: scheduler.frame, 
        kind: 'C' 
      })
    });
    
    scheduler.flush();
    
    // Custom assertions
    expect(messages).toEqual([
      { frame: 20, kind: 'N', value: 1 },  // 'a' at frame 20
      { frame: 50, kind: 'N', value: 2 },  // 'b' at frame 50
      { frame: 80, kind: 'N', value: 3 },  // 'c' at frame 80
      { frame: 90, kind: 'C' }             // Complete at frame 90
    ]);
  });
  
  done();
});

6. Mock Observers and Subscription Testing

Technique Implementation Purpose Use Case
Mock Observer Object with next/error/complete spies Track observer method calls Verifying observer behavior
Spy Functions jest.fn() or jasmine.createSpy() Mock and track function calls Assertion on call count and arguments
Subscription Tracking Track add() and unsubscribe() calls Verify subscription lifecycle Memory leak prevention testing
Subject Mocking Use real Subjects for testing Control emission timing manually Integration testing

Example: Mock observer testing

describe('Mock Observer Tests', () => {
  it('should track observer method calls', (done) => {
    const nextSpy = jest.fn();
    const errorSpy = jest.fn();
    const completeSpy = jest.fn();
    
    const observer = {
      next: nextSpy,
      error: errorSpy,
      complete: completeSpy
    };
    
    of(1, 2, 3).subscribe(observer);
    
    // Verify calls
    expect(nextSpy).toHaveBeenCalledTimes(3);
    expect(nextSpy).toHaveBeenNthCalledWith(1, 1);
    expect(nextSpy).toHaveBeenNthCalledWith(2, 2);
    expect(nextSpy).toHaveBeenNthCalledWith(3, 3);
    expect(errorSpy).not.toHaveBeenCalled();
    expect(completeSpy).toHaveBeenCalledTimes(1);
    
    done();
  });
  
  it('should track error handling', (done) => {
    const nextSpy = jest.fn();
    const errorSpy = jest.fn();
    const completeSpy = jest.fn();
    
    throwError(() => new Error('Test error')).subscribe({
      next: nextSpy,
      error: errorSpy,
      complete: completeSpy
    });
    
    expect(nextSpy).not.toHaveBeenCalled();
    expect(errorSpy).toHaveBeenCalledTimes(1);
    expect(errorSpy.mock.calls[0][0]).toBeInstanceOf(Error);
    expect(completeSpy).not.toHaveBeenCalled();
    
    done();
  });
});

Example: Subscription lifecycle testing

describe('Subscription Testing', () => {
  it('should verify subscription cleanup', () => {
    const teardownSpy = jest.fn();
    
    const source$ = new Observable(subscriber => {
      subscriber.next(1);
      subscriber.next(2);
      
      // Return teardown function
      return () => {
        teardownSpy();
      };
    });
    
    const subscription = source$.subscribe();
    
    expect(teardownSpy).not.toHaveBeenCalled();
    
    subscription.unsubscribe();
    
    expect(teardownSpy).toHaveBeenCalledTimes(1);
  });
  
  it('should test subscription composition', () => {
    const teardown1 = jest.fn();
    const teardown2 = jest.fn();
    const teardown3 = jest.fn();
    
    const sub1 = new Subscription(teardown1);
    const sub2 = new Subscription(teardown2);
    const sub3 = new Subscription(teardown3);
    
    sub1.add(sub2);
    sub1.add(sub3);
    
    sub1.unsubscribe();
    
    // All should be called when parent unsubscribes
    expect(teardown1).toHaveBeenCalledTimes(1);
    expect(teardown2).toHaveBeenCalledTimes(1);
    expect(teardown3).toHaveBeenCalledTimes(1);
  });
  
  it('should verify takeUntil unsubscription', () => {
    scheduler.run(({ cold, hot }) => {
      const sourceTeardown = jest.fn();
      
      const source$ = cold('--a--b--c--d--e|').pipe(
        finalize(() => sourceTeardown())
      );
      
      const notifier$ = hot('-------x');
      
      source$.pipe(
        takeUntil(notifier$)
      ).subscribe();
      
      scheduler.flush();
      
      expect(sourceTeardown).toHaveBeenCalledTimes(1);
    });
  });
});

Example: Testing with Subject mocks

describe('Subject Mock Testing', () => {
  it('should test component with Subject input', () => {
    const clicks$ = new Subject<MouseEvent>();
    const emissions: number[] = [];
    
    clicks$.pipe(
      scan((count, _) => count + 1, 0)
    ).subscribe(count => emissions.push(count));
    
    // Simulate clicks
    clicks$.next({} as MouseEvent);
    clicks$.next({} as MouseEvent);
    clicks$.next({} as MouseEvent);
    
    expect(emissions).toEqual([1, 2, 3]);
  });
  
  it('should test with BehaviorSubject state', () => {
    const state$ = new BehaviorSubject({ count: 0 });
    const emissions: any[] = [];
    
    state$.subscribe(state => emissions.push(state));
    
    // Initial value received immediately
    expect(emissions).toEqual([{ count: 0 }]);
    
    state$.next({ count: 1 });
    state$.next({ count: 2 });
    
    expect(emissions).toEqual([
      { count: 0 },
      { count: 1 },
      { count: 2 }
    ]);
  });
  
  it('should test service with injected observables', () => {
    // Mock HTTP service
    const mockHttp = {
      get: jest.fn().mockReturnValue(of({ data: 'test' }))
    };
    
    // Service under test
    class DataService {
      getData() {
        return mockHttp.get('/api/data').pipe(
          map(response => response.data)
        );
      }
    }
    
    const service = new DataService();
    
    service.getData().subscribe(data => {
      expect(data).toBe('test');
    });
    
    expect(mockHttp.get).toHaveBeenCalledWith('/api/data');
    expect(mockHttp.get).toHaveBeenCalledTimes(1);
  });
});
Testing Best Practices:
  • Use marble diagrams for visual clarity in time-based tests
  • Prefer scheduler.run() over manual flush() for consistency
  • Test error paths and edge cases explicitly
  • Verify subscription cleanup to prevent memory leaks
  • Use mock observers to verify exact call sequences
  • Keep tests deterministic - avoid real timers
Common Testing Pitfalls:
  • Mixing real time with virtual time - leads to flaky tests
  • Not handling asynchronous completion properly (use done() or async/await)
  • Forgetting to unsubscribe in tests - causes test pollution
  • Over-mocking - test too far from real behavior
  • Not testing unsubscription and cleanup logic

Section 14 Summary

  • TestScheduler provides virtual time for synchronous testing of async operations
  • Marble diagrams offer visual, declarative syntax for test expectations
  • cold() creates new emissions per subscription; hot() shares emissions
  • expectObservable() verifies emissions; expectSubscriptions() verifies lifecycle
  • flush() executes all scheduled virtual time; getMessages() inspects raw notifications
  • Mock observers and subjects enable precise behavior verification