RxJS Testing Strategies and Tools
1. TestScheduler for Virtual Time Testing
| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| TestScheduler | new TestScheduler(assertDeepEqual) |
Virtual time scheduler for synchronous testing | Testing time-based operators without waiting |
| run() | scheduler.run(callback) |
Execute tests in virtual time context | Running marble tests synchronously |
| flush() | scheduler.flush() |
Execute all scheduled actions immediately | Force completion of async operations |
| maxFrames | scheduler.maxFrames = 1000 |
Set maximum virtual time frames | Preventing infinite loops in tests |
| Virtual Time Unit | 1 frame = 1ms (virtual) | Time advances only when scheduled | Deterministic timing in tests |
Example: Basic TestScheduler setup
import { TestScheduler } from 'rxjs/testing';
describe('Observable Tests', () => {
let scheduler: TestScheduler;
beforeEach(() => {
// Create scheduler with assertion function
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
it('should test time-based operators synchronously', () => {
scheduler.run(({ cold, expectObservable }) => {
// Test debounceTime without waiting
const source$ = cold('a-b-c---|');
const expected = '----c---|';
const result$ = source$.pipe(
debounceTime(30) // 30 virtual frames
);
expectObservable(result$).toBe(expected);
});
});
});
Example: Testing asynchronous operations synchronously
it('should test interval and delay synchronously', () => {
scheduler.run(({ expectObservable }) => {
// interval emits every 10 frames
const source$ = interval(10).pipe(
take(5),
delay(5) // Add 5 frame delay
);
// Expected: delay 5, then emit at 10, 20, 30, 40, 50
const expected = '-----a-b-c-d-(e|)';
const values = { a: 0, b: 1, c: 2, d: 3, e: 4 };
expectObservable(source$).toBe(expected, values);
});
});
it('should test timer and timeout', () => {
scheduler.run(({ expectObservable, cold }) => {
const source$ = cold('-------a--|').pipe(
timeout(50)
);
const expected = '-------a--|';
expectObservable(source$).toBe(expected);
});
scheduler.run(({ expectObservable, cold }) => {
// This should timeout
const source$ = cold('---------------a--|').pipe(
timeout(50)
);
const expected = '-----#'; // Error at frame 50
expectObservable(source$).toBe(expected, undefined,
new Error('Timeout has occurred'));
});
});
2. Marble Testing Syntax and Patterns
| Symbol | Meaning | Example | Description |
|---|---|---|---|
| - | Time frame | '---a' |
Each dash = 1 virtual time frame (1ms) |
| a-z, 0-9 | Emission | 'a-b-c' |
Emit values at specific frames |
| | | Completion | 'a-b-c|' |
Observable completes |
| # | Error | 'a-b-#' |
Observable errors at this frame |
| ( ) | Grouping | '(ab)-c' |
Synchronous emissions in same frame |
| ^ | Subscription start | '^--!' |
Subscription point marker |
| ! | Unsubscription | '^--!' |
Unsubscription point marker |
| ' ' | Space (ignored) | 'a b c' |
For readability, no effect on timing |
Example: Marble diagram patterns
// Basic emission pattern
'a-b-c|' // Emit a, b, c then complete
// Frame: 0-1-2-3
// Synchronous emissions
'(abc)|' // Emit a, b, c synchronously then complete
// All at frame 0
// Multiple values with grouping
'a-(bc)-d|' // Emit a, then b+c together, then d, complete
// a at 0, b+c at 2, d at 4, complete at 5
// Error handling
'a-b-#' // Emit a, b, then error
'a-b-#', null, new Error('oops') // With specific error
// Long delays
'a 99ms b|' // Emit a, wait 99ms, emit b, complete
'a 1s b|' // Emit a, wait 1 second, emit b
// Never completes
'a-b-c-' // Emit a, b, c, never completes
// Immediate completion
'|' // Complete immediately
'(a|)' // Emit a and complete synchronously
// Empty observable
'' // Never emits, never completes
// Subscription tracking
'^---!' // Subscribe at 0, unsubscribe at 4
'---^--!' // Subscribe at 3, unsubscribe at 6
Example: Complex marble test
it('should test switchMap with marble diagrams', () => {
scheduler.run(({ cold, hot, expectObservable }) => {
// Source observable (hot)
const source$ = hot(' -a-----b-----c----|');
// Inner observables (cold)
const a$ = cold(' --1--2--3| ');
const b$ = cold(' --4--5--6|');
const c$ = cold(' --7|');
const expected = ' ---1--2---4--5---7|';
const values = {
1: 'a1', 2: 'a2',
4: 'b4', 5: 'b5', 6: 'b6',
7: 'c7'
};
const result$ = source$.pipe(
switchMap(val => {
if (val === 'a') return a$.pipe(map(n => val + n));
if (val === 'b') return b$.pipe(map(n => val + n));
if (val === 'c') return c$.pipe(map(n => val + n));
return EMPTY;
})
);
expectObservable(result$).toBe(expected, values);
});
});
3. hot() and cold() Observable Testing Helpers
| Helper | Type | Behavior | Use Case |
|---|---|---|---|
| cold() | Cold Observable | Starts emitting when subscribed | Testing operators that create new observables |
| hot() | Hot Observable | Emits regardless of subscriptions | Testing shared observables, subjects |
| Subscription Point | ^ | Default subscription at frame 0 | Controlling when subscription occurs |
| Values Object | { a: value } |
Map marble symbols to actual values | Testing with complex objects |
| Error Object | { # : error } |
Define error for # symbol | Testing error handling |
Example: cold() vs hot() observables
it('should understand cold observable behavior', () => {
scheduler.run(({ cold, expectObservable }) => {
// Cold: each subscription gets full sequence
const cold$ = cold('--a--b--c|');
// First subscription at frame 0
expectObservable(cold$).toBe('--a--b--c|');
// Second subscription also starts from beginning
expectObservable(cold$).toBe('--a--b--c|');
});
});
it('should understand hot observable behavior', () => {
scheduler.run(({ hot, expectObservable }) => {
// Hot: emissions happen regardless of subscriptions
const hot$ = hot('--a--b--c--d--e|');
// Subscribe at frame 0, see all emissions
expectObservable(
hot$,
'^--------------!'
).toBe('--a--b--c--d--e|');
// Subscribe at frame 6 (after 'b'), miss early values
expectObservable(
hot$,
'------^--------!'
).toBe('------c--d--e|');
});
});
it('should test hot observable with subscription points', () => {
scheduler.run(({ hot, expectObservable }) => {
// Subscription point marked with ^
const hot$ = hot('---^-a--b--c|');
// Will receive a, b, c (subscribed at ^)
expectObservable(hot$).toBe('--a--b--c|');
});
});
Example: Using values object for complex data
it('should test with complex objects', () => {
scheduler.run(({ cold, expectObservable }) => {
const values = {
a: { id: 1, name: 'Alice' },
b: { id: 2, name: 'Bob' },
c: { id: 3, name: 'Charlie' }
};
const source$ = cold('--a--b--c|', values);
const expected = '--a--b--c|';
const result$ = source$.pipe(
map(user => ({ ...user, processed: true }))
);
expectObservable(result$).toBe(expected, {
a: { id: 1, name: 'Alice', processed: true },
b: { id: 2, name: 'Bob', processed: true },
c: { id: 3, name: 'Charlie', processed: true }
});
});
});
it('should test error emissions', () => {
scheduler.run(({ cold, expectObservable }) => {
const error = new Error('Test error');
const source$ = cold('--a--b--#',
{ a: 1, b: 2 },
error
);
const result$ = source$.pipe(
catchError(err => of('recovered'))
);
const expected = '--a--b--(r|)';
expectObservable(result$).toBe(expected, {
a: 1, b: 2, r: 'recovered'
});
});
});
4. expectObservable() and expectSubscriptions() Assertions
| Assertion | Syntax | Description | Use Case |
|---|---|---|---|
| expectObservable() | .toBe(marble, values?, error?) |
Assert observable emissions match pattern | Verifying emission timing and values |
| expectSubscriptions() | .toBe(subscriptionMarbles) |
Assert subscription/unsubscription timing | Verifying subscription lifecycle |
| toEqual() | .toEqual(expected) |
Deep equality check | Comparing complex objects |
| Subscription Marble | '^---!' |
^ = subscribe, ! = unsubscribe | Tracking subscription timing |
| Multiple Subscriptions | ['^--!', '--^--!'] |
Array of subscription patterns | Testing multicasting, sharing |
Example: Using expectObservable()
it('should verify emission values and timing', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--c|');
const expected = '--a--b--c|';
expectObservable(source$).toBe(expected);
});
});
it('should verify with custom values', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--c|', { a: 1, b: 2, c: 3 });
const result$ = source$.pipe(
map(x => x * 10)
);
const expected = '--a--b--c|';
const values = { a: 10, b: 20, c: 30 };
expectObservable(result$).toBe(expected, values);
});
});
it('should verify error emissions', () => {
scheduler.run(({ cold, expectObservable }) => {
const error = new Error('Failed');
const source$ = cold('--a--#', { a: 1 }, error);
expectObservable(source$).toBe('--a--#', { a: 1 }, error);
});
});
Example: Using expectSubscriptions()
it('should verify subscription timing', () => {
scheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source$ = cold('--a--b--c--d--e|');
const subs = '^--------------!';
expectObservable(source$).toBe('--a--b--c--d--e|');
expectSubscriptions(source$.subscriptions).toBe(subs);
});
});
it('should verify unsubscription with takeUntil', () => {
scheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const source$ = cold('--a--b--c--d--e|');
const notifier$ = hot('-------x');
const expected = '--a--b-|';
const subs = '^------!';
const result$ = source$.pipe(
takeUntil(notifier$)
);
expectObservable(result$).toBe(expected);
expectSubscriptions(source$.subscriptions).toBe(subs);
});
});
it('should verify multiple subscriptions with share', () => {
scheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source$ = cold('--a--b--c|');
const shared$ = source$.pipe(share());
// Two subscriptions
expectObservable(shared$).toBe('--a--b--c|');
expectObservable(shared$).toBe('--a--b--c|');
// But only one subscription to source
expectSubscriptions(source$.subscriptions).toBe('^--------!');
});
});
5. flush() and getMessages() for Test Execution Control
| Method | Purpose | Description | Use Case |
|---|---|---|---|
| flush() | Execute scheduled | Force all scheduled virtual time actions | Manual test execution control |
| getMessages() | Inspect emissions | Retrieve array of notification messages | Custom assertions on messages |
| frame | Virtual time | Current virtual time frame number | Timing verification |
| Notification | Message object | { frame, notification: { kind, value/error } } | Detailed emission inspection |
Example: Using flush() for manual control
import { TestScheduler } from 'rxjs/testing';
it('should use flush for manual execution', () => {
const scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
const emissions: number[] = [];
// Create observable on the scheduler
interval(10, scheduler).pipe(
take(5)
).subscribe(x => emissions.push(x));
// Nothing emitted yet
expect(emissions).toEqual([]);
// Manually flush - executes all scheduled actions
scheduler.flush();
// Now all emissions have occurred
expect(emissions).toEqual([0, 1, 2, 3, 4]);
});
it('should control partial execution', () => {
const scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
const emissions: number[] = [];
timer(0, 10, scheduler).pipe(
take(10)
).subscribe(x => emissions.push(x));
// Advance time partially
scheduler.flush();
expect(emissions.length).toBe(10);
});
Example: Using getMessages() for custom assertions
it('should inspect messages directly', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--c|', { a: 1, b: 2, c: 3 });
const result$ = source$.pipe(
map(x => x * 10)
);
// Subscribe and get messages
const messages = scheduler.run(({ expectObservable }) => {
expectObservable(result$);
return result$;
});
});
});
// More advanced: accessing raw messages
it('should verify notification details', (done) => {
const scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
scheduler.run(({ cold }) => {
const source$ = cold('--a--b--c|', { a: 1, b: 2, c: 3 });
// Create a tracked observable
const messages: any[] = [];
source$.subscribe({
next: val => messages.push({
frame: scheduler.frame,
kind: 'N',
value: val
}),
complete: () => messages.push({
frame: scheduler.frame,
kind: 'C'
})
});
scheduler.flush();
// Custom assertions
expect(messages).toEqual([
{ frame: 20, kind: 'N', value: 1 }, // 'a' at frame 20
{ frame: 50, kind: 'N', value: 2 }, // 'b' at frame 50
{ frame: 80, kind: 'N', value: 3 }, // 'c' at frame 80
{ frame: 90, kind: 'C' } // Complete at frame 90
]);
});
done();
});
6. Mock Observers and Subscription Testing
| Technique | Implementation | Purpose | Use Case |
|---|---|---|---|
| Mock Observer | Object with next/error/complete spies | Track observer method calls | Verifying observer behavior |
| Spy Functions | jest.fn() or jasmine.createSpy() | Mock and track function calls | Assertion on call count and arguments |
| Subscription Tracking | Track add() and unsubscribe() calls | Verify subscription lifecycle | Memory leak prevention testing |
| Subject Mocking | Use real Subjects for testing | Control emission timing manually | Integration testing |
Example: Mock observer testing
describe('Mock Observer Tests', () => {
it('should track observer method calls', (done) => {
const nextSpy = jest.fn();
const errorSpy = jest.fn();
const completeSpy = jest.fn();
const observer = {
next: nextSpy,
error: errorSpy,
complete: completeSpy
};
of(1, 2, 3).subscribe(observer);
// Verify calls
expect(nextSpy).toHaveBeenCalledTimes(3);
expect(nextSpy).toHaveBeenNthCalledWith(1, 1);
expect(nextSpy).toHaveBeenNthCalledWith(2, 2);
expect(nextSpy).toHaveBeenNthCalledWith(3, 3);
expect(errorSpy).not.toHaveBeenCalled();
expect(completeSpy).toHaveBeenCalledTimes(1);
done();
});
it('should track error handling', (done) => {
const nextSpy = jest.fn();
const errorSpy = jest.fn();
const completeSpy = jest.fn();
throwError(() => new Error('Test error')).subscribe({
next: nextSpy,
error: errorSpy,
complete: completeSpy
});
expect(nextSpy).not.toHaveBeenCalled();
expect(errorSpy).toHaveBeenCalledTimes(1);
expect(errorSpy.mock.calls[0][0]).toBeInstanceOf(Error);
expect(completeSpy).not.toHaveBeenCalled();
done();
});
});
Example: Subscription lifecycle testing
describe('Subscription Testing', () => {
it('should verify subscription cleanup', () => {
const teardownSpy = jest.fn();
const source$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
// Return teardown function
return () => {
teardownSpy();
};
});
const subscription = source$.subscribe();
expect(teardownSpy).not.toHaveBeenCalled();
subscription.unsubscribe();
expect(teardownSpy).toHaveBeenCalledTimes(1);
});
it('should test subscription composition', () => {
const teardown1 = jest.fn();
const teardown2 = jest.fn();
const teardown3 = jest.fn();
const sub1 = new Subscription(teardown1);
const sub2 = new Subscription(teardown2);
const sub3 = new Subscription(teardown3);
sub1.add(sub2);
sub1.add(sub3);
sub1.unsubscribe();
// All should be called when parent unsubscribes
expect(teardown1).toHaveBeenCalledTimes(1);
expect(teardown2).toHaveBeenCalledTimes(1);
expect(teardown3).toHaveBeenCalledTimes(1);
});
it('should verify takeUntil unsubscription', () => {
scheduler.run(({ cold, hot }) => {
const sourceTeardown = jest.fn();
const source$ = cold('--a--b--c--d--e|').pipe(
finalize(() => sourceTeardown())
);
const notifier$ = hot('-------x');
source$.pipe(
takeUntil(notifier$)
).subscribe();
scheduler.flush();
expect(sourceTeardown).toHaveBeenCalledTimes(1);
});
});
});
Example: Testing with Subject mocks
describe('Subject Mock Testing', () => {
it('should test component with Subject input', () => {
const clicks$ = new Subject<MouseEvent>();
const emissions: number[] = [];
clicks$.pipe(
scan((count, _) => count + 1, 0)
).subscribe(count => emissions.push(count));
// Simulate clicks
clicks$.next({} as MouseEvent);
clicks$.next({} as MouseEvent);
clicks$.next({} as MouseEvent);
expect(emissions).toEqual([1, 2, 3]);
});
it('should test with BehaviorSubject state', () => {
const state$ = new BehaviorSubject({ count: 0 });
const emissions: any[] = [];
state$.subscribe(state => emissions.push(state));
// Initial value received immediately
expect(emissions).toEqual([{ count: 0 }]);
state$.next({ count: 1 });
state$.next({ count: 2 });
expect(emissions).toEqual([
{ count: 0 },
{ count: 1 },
{ count: 2 }
]);
});
it('should test service with injected observables', () => {
// Mock HTTP service
const mockHttp = {
get: jest.fn().mockReturnValue(of({ data: 'test' }))
};
// Service under test
class DataService {
getData() {
return mockHttp.get('/api/data').pipe(
map(response => response.data)
);
}
}
const service = new DataService();
service.getData().subscribe(data => {
expect(data).toBe('test');
});
expect(mockHttp.get).toHaveBeenCalledWith('/api/data');
expect(mockHttp.get).toHaveBeenCalledTimes(1);
});
});
Testing Best Practices:
- Use marble diagrams for visual clarity in time-based tests
- Prefer scheduler.run() over manual flush() for consistency
- Test error paths and edge cases explicitly
- Verify subscription cleanup to prevent memory leaks
- Use mock observers to verify exact call sequences
- Keep tests deterministic - avoid real timers
Common Testing Pitfalls:
- Mixing real time with virtual time - leads to flaky tests
- Not handling asynchronous completion properly (use done() or async/await)
- Forgetting to unsubscribe in tests - causes test pollution
- Over-mocking - test too far from real behavior
- Not testing unsubscription and cleanup logic
Section 14 Summary
- TestScheduler provides virtual time for synchronous testing of async operations
- Marble diagrams offer visual, declarative syntax for test expectations
- cold() creates new emissions per subscription; hot() shares emissions
- expectObservable() verifies emissions; expectSubscriptions() verifies lifecycle
- flush() executes all scheduled virtual time; getMessages() inspects raw notifications
- Mock observers and subjects enable precise behavior verification