Schedulers and Execution Context Control
1. asyncScheduler for Macro-task Scheduling
| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| asyncScheduler | observeOn(asyncScheduler) | Schedules work on the macro-task queue via a timer (setTimeout-like timing; RxJS uses setInterval internally) | Async operations, preventing blocking |
| schedule() | asyncScheduler.schedule(work, delay) | Schedule callback with optional delay (ms) | Delayed execution |
| Default scheduler | Used by interval, timer, delay operators | Default for time-based operators | Standard async timing |
Example: asyncScheduler for async execution
import { asyncScheduler, of, range } from 'rxjs';
import { map, observeOn } from 'rxjs/operators';
// Direct scheduler usage
asyncScheduler.schedule(() => console.log('Executed async'));
console.log('Synchronous');
// Output:
// Synchronous
// Executed async
// With delay
asyncScheduler.schedule(
() => console.log('Delayed'),
1000 // 1 second delay
);
// Recursive scheduling
asyncScheduler.schedule(function(state) {
console.log('Count:', state);
if (state < 3) {
this.schedule(state + 1, 500); // Reschedule after 500ms
}
}, 0, 0); // Initial delay 0, initial state 0
// observeOn with asyncScheduler
range(1, 5).pipe(
observeOn(asyncScheduler)
).subscribe(val => console.log('Async:', val));
console.log('After subscribe');
// Output:
// After subscribe
// Async: 1
// Async: 2
// Async: 3
// Async: 4
// Async: 5
// Practical: Prevent blocking UI
of(1, 2, 3, 4, 5).pipe(
observeOn(asyncScheduler),
map(n => {
// Heavy computation
const result = expensiveCalculation(n);
return result;
})
).subscribe(result => updateUI(result));
// Practical: Debounced batch processing
const batchProcessor = {
items: [],
scheduledWork: null,
addItem(item) {
this.items.push(item);
if (this.scheduledWork) {
this.scheduledWork.unsubscribe();
}
this.scheduledWork = asyncScheduler.schedule(() => {
this.processBatch(this.items);
this.items = [];
}, 100);
},
processBatch(items) {
console.log('Processing batch:', items);
}
};
batchProcessor.addItem('a');
batchProcessor.addItem('b');
batchProcessor.addItem('c');
// Processes all 3 together after 100ms
// Practical: Animation frame alternative for non-visual tasks
const heavyWork$ = range(1, 1000).pipe(
observeOn(asyncScheduler), // Yield to event loop
map(n => n * n)
);
// Practical: Testing with scheduler
import { TestScheduler } from 'rxjs/testing';
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--c|');
const expected = '--a--b--c|';
expectObservable(
source$.pipe(observeOn(asyncScheduler))
).toBe(expected);
});
Note: asyncScheduler defers work with a timer, so it runs on the macro-task queue after the current call stack clears (setTimeout-like timing; RxJS uses setInterval internally). It is the default scheduler for most time-based RxJS operators.
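The macro-task behaviour described in the note can be reproduced without RxJS. The following is a minimal sketch that assumes plain setTimeout as a stand-in for the scheduler; processInChunks and chunkLog are hypothetical names introduced for illustration, not part of any library.

```typescript
// Hypothetical helper: processes `items` a chunk at a time and yields to the
// event loop between chunks by re-queuing itself with setTimeout(…, 0).
function processInChunks(
  items: number[],
  chunkSize: number,
  onChunk: (chunk: number[]) => void,
  onDone: () => void
): void {
  let index = 0;
  const step = (): void => {
    onChunk(items.slice(index, index + chunkSize));
    index += chunkSize;
    if (index < items.length) {
      setTimeout(step, 0); // next chunk runs in a later macro-task
    } else {
      onDone();
    }
  };
  setTimeout(step, 0); // even the first chunk waits for the current stack to clear
}

const chunkLog: string[] = [];
processInChunks(
  [1, 2, 3, 4, 5],
  2,
  chunk => {
    chunkLog.push(`chunk [${chunk.join(', ')}]`);
  },
  () => {
    chunkLog.push('done');
  }
);
chunkLog.push('scheduled'); // recorded first: nothing has run yet
```

Once the timers have fired, chunkLog reads: scheduled, chunk [1, 2], chunk [3, 4], chunk [5], done. Other event-loop work (input handling, rendering) can run between the chunks, which is the effect observeOn(asyncScheduler) is used for above.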
2. queueScheduler for Synchronous Scheduling
| Feature | Syntax | Description | Execution Model |
|---|---|---|---|
| queueScheduler | observeOn(queueScheduler) | Schedules work synchronously in FIFO queue | Immediate, blocking execution |
| Iteration control | Prevents recursive stack overflow | Trampolines recursive operations | Flattens call stack |
| Typical use | Iteration-style and recursive operations | Keeps synchronous repetition in order | Sequential operations |
Example: queueScheduler for synchronous execution
import { asyncScheduler, queueScheduler, of, range } from 'rxjs';
import { map, observeOn, tap } from 'rxjs/operators';
// Synchronous execution
console.log('Before');
queueScheduler.schedule(() => console.log('Queued work'));
console.log('After');
// Output:
// Before
// Queued work
// After
// Compare async vs queue scheduler
console.log('Start');
of(1, 2, 3).pipe(
observeOn(asyncScheduler)
).subscribe(val => console.log('Async:', val));
of(1, 2, 3).pipe(
observeOn(queueScheduler)
).subscribe(val => console.log('Queue:', val));
console.log('End');
// Output:
// Start
// Queue: 1
// Queue: 2
// Queue: 3
// End
// Async: 1
// Async: 2
// Async: 3
// Prevent stack overflow with recursive operations
queueScheduler.schedule(function(state) {
console.log('Count:', state);
if (state < 10000) {
this.schedule(state + 1); // No delay, queued synchronously
}
}, 0, 0);
// Completes without stack overflow
// Without scheduler - stack overflow risk
function recursiveSync(n) {
console.log(n);
if (n < 10000) {
recursiveSync(n + 1); // Stack overflow!
}
}
// Practical: Synchronous stream processing
const processSync$ = range(1, 100).pipe(
observeOn(queueScheduler),
map(n => n * 2)
);
processSync$.subscribe(val => {
// All values processed before next line
console.log(val);
});
console.log('Processing complete');
// Practical: State machine transitions
class StateMachine {
private state = 'idle';
transition(event: string) {
queueScheduler.schedule(() => {
this.state = this.getNextState(this.state, event);
console.log('State:', this.state);
});
}
getNextState(current: string, event: string): string {
// State transition logic
return `${current}->${event}`;
}
}
const machine = new StateMachine();
machine.transition('start');
machine.transition('process');
machine.transition('complete');
// All transitions execute synchronously in order
// Practical: Immediate event processing
const events$ = of('click', 'hover', 'focus').pipe(
observeOn(queueScheduler),
tap(event => console.log('Processing:', event))
);
console.log('Before subscription');
events$.subscribe();
console.log('After subscription');
// Output:
// Before subscription
// Processing: click
// Processing: hover
// Processing: focus
// After subscription
// Practical: Testing synchronous behavior
const syncTest$ = range(1, 3).pipe(
observeOn(queueScheduler)
);
const results = [];
syncTest$.subscribe(val => results.push(val));
console.log(results); // [1, 2, 3] immediately available
Warning: queueScheduler executes synchronously and blocks. Use for trampolining recursive
operations or when immediate execution is required.
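Trampolining, as mentioned in the warning, can be sketched in a few lines. This is an illustrative model only, not RxJS's actual implementation; TrampolineQueue, countTo and the depth counters are hypothetical names.

```typescript
// Hypothetical trampoline: work scheduled while the queue is draining is
// appended and picked up by the outer loop instead of being called recursively.
class TrampolineQueue {
  private queue: Array<() => void> = [];
  private draining = false;

  schedule(work: () => void): void {
    this.queue.push(work);
    if (this.draining) {
      return; // the loop below, further up the stack, will run it
    }
    this.draining = true;
    try {
      while (this.queue.length > 0) {
        this.queue.shift()!();
      }
    } finally {
      this.draining = false;
    }
  }
}

const trampoline = new TrampolineQueue();
let currentDepth = 0; // how many countTo frames are on the stack right now
let maxDepth = 0;     // deepest nesting observed
let lastCount = -1;

function countTo(limit: number, current: number): void {
  currentDepth++;
  maxDepth = Math.max(maxDepth, currentDepth);
  lastCount = current;
  if (current < limit) {
    // "Recursive" step: queued, not called directly
    trampoline.schedule(() => countTo(limit, current + 1));
  }
  currentDepth--;
}

trampoline.schedule(() => countTo(100000, 0));
// Everything has run synchronously by this point:
// lastCount is 100000 and maxDepth is 1
```

The whole count completes before schedule() returns, so the execution is still blocking; the only thing the trampoline changes is that the call stack stays flat.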
3. animationFrameScheduler for Browser Animation
| Feature | Syntax | Description | Timing |
|---|---|---|---|
| animationFrameScheduler | observeOn(animationFrameScheduler) | Schedules work using requestAnimationFrame | ~60fps, synced with browser repaint |
| Frame timing | Executes before next browser paint | Optimal for DOM animations and visual updates | 16.67ms intervals (60Hz) |
| Performance | Pauses when tab inactive | Battery-friendly, respects browser optimization | Efficient rendering |
Example: animationFrameScheduler for smooth animations
import { animationFrameScheduler, interval, fromEvent } from 'rxjs';
import { observeOn, map, switchMap, takeUntil, takeWhile } from 'rxjs/operators';
// Schedule on animation frame
animationFrameScheduler.schedule(() => {
console.log('Frame tick');
});
// Smooth animation loop
interval(0, animationFrameScheduler).pipe(
takeWhile(frame => frame < 60) // 1 second at 60fps
).subscribe(frame => {
const progress = frame / 60;
updateAnimation(progress);
});
// Practical: Smooth element movement
const moveElement$ = interval(0, animationFrameScheduler).pipe(
map(frame => frame * 2), // 2px per frame
takeWhile(position => position < 500)
);
moveElement$.subscribe(position => {
element.style.left = position + 'px';
});
// Practical: Easing animation
function animate(element, property, from, to, duration) {
const start = Date.now();
return interval(0, animationFrameScheduler).pipe(
// Normalized progress, clamped to the range 0..1
map(() => Math.min((Date.now() - start) / duration, 1)),
// Stop once progress reaches 1, but still emit that final value
takeWhile(progress => progress < 1, true),
map(progress => {
// Ease-in-out function
const eased = progress < 0.5
? 2 * progress * progress
: 1 - Math.pow(-2 * progress + 2, 2) / 2;
return from + (to - from) * eased;
})
).subscribe(value => {
element.style[property] = value + 'px';
});
}
animate(box, 'left', 0, 500, 1000); // Animate left from 0 to 500px in 1s
// Practical: Parallax scrolling
fromEvent(window, 'scroll').pipe(
observeOn(animationFrameScheduler),
map(() => window.scrollY)
).subscribe(scrollY => {
background.style.transform = `translateY(${scrollY * 0.5}px)`;
foreground.style.transform = `translateY(${scrollY * 0.8}px)`;
});
// Practical: Canvas animation
const canvas = document.getElementById('canvas');
const ctx = canvas.getContext('2d');
interval(0, animationFrameScheduler).subscribe(frame => {
// Clear canvas
ctx.clearRect(0, 0, canvas.width, canvas.height);
// Draw animated content
const x = (frame % canvas.width);
const y = Math.sin(frame * 0.05) * 50 + canvas.height / 2;
ctx.fillStyle = 'blue';
ctx.fillRect(x, y, 20, 20);
});
// Practical: Drag and drop with smooth updates
const mouseMove$ = fromEvent(document, 'mousemove');
const drag$ = fromEvent(element, 'mousedown').pipe(
switchMap(() => mouseMove$.pipe(
observeOn(animationFrameScheduler), // Smooth updates
takeUntil(fromEvent(document, 'mouseup'))
))
);
drag$.subscribe(event => {
element.style.left = event.clientX + 'px';
element.style.top = event.clientY + 'px';
});
// Practical: Progress bar animation
function animateProgressBar(targetWidthPx) {
const startWidth = progressBar.offsetWidth; // current width in px
const duration = 500;
const start = Date.now();
interval(0, animationFrameScheduler).pipe(
// Clamp so the final frame lands exactly on the target
map(() => Math.min((Date.now() - start) / duration, 1)),
takeWhile(progress => progress < 1, true),
map(progress => startWidth + (targetWidthPx - startWidth) * progress)
).subscribe(width => {
progressBar.style.width = width + 'px'; // offsetWidth is in px, so write px back
});
}
// Practical: Game loop
const gameLoop$ = interval(0, animationFrameScheduler);
gameLoop$.subscribe(() => {
updateGameState();
detectCollisions();
renderGame();
});
// Performance monitoring
let frameCount = 0;
let lastTime = Date.now();
interval(0, animationFrameScheduler).subscribe(() => {
frameCount++;
const now = Date.now();
if (now - lastTime >= 1000) {
console.log('FPS:', frameCount);
frameCount = 0;
lastTime = now;
}
});
Note: Use animationFrameScheduler for DOM animations and visual updates. Browsers pause requestAnimationFrame callbacks while the tab is inactive, which saves battery and CPU.
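The timing math behind the easing example can be checked separately from the frame loop. The sketch below is a pure-function version under that assumption; easeInOutQuad and valueAt are hypothetical helper names, and the elapsed times are supplied by hand instead of coming from requestAnimationFrame.

```typescript
// Quadratic ease-in-out: slow start, fast middle, slow end.
function easeInOutQuad(progress: number): number {
  return progress < 0.5
    ? 2 * progress * progress
    : 1 - Math.pow(-2 * progress + 2, 2) / 2;
}

// Hypothetical helper: the animated value after `elapsedMs` of a
// `durationMs`-long animation running from `from` to `to`.
function valueAt(from: number, to: number, elapsedMs: number, durationMs: number): number {
  // Clamp so late frames never overshoot the target
  const progress = Math.min(Math.max(elapsedMs / durationMs, 0), 1);
  return from + (to - from) * easeInOutQuad(progress);
}

// Sample a 1000ms animation from 0px to 500px at a few points in time
const samples = [0, 250, 500, 750, 1000, 1200].map(t => valueAt(0, 500, t, 1000));
// samples: [0, 62.5, 250, 437.5, 500, 500]
```

Clamping progress before easing means a late frame, such as the 1200ms sample, still lands exactly on the target value.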
4. asapScheduler for Micro-task Scheduling
| Feature | Syntax | Description | Queue Priority |
|---|---|---|---|
| asapScheduler | observeOn(asapScheduler) | Schedules work in micro-task queue (like Promise.then) | Before macro-tasks, after current stack |
| Implementation | Uses Promise.resolve().then in current RxJS versions (older releases used a setImmediate-style polyfill) | Platform-specific micro-task scheduling | Faster than setTimeout |
| Use case | High-priority async work before next macro-task | Critical updates, Promise-like behavior | Minimal delay execution |
Example: asapScheduler for micro-task execution
import { asapScheduler, asyncScheduler, from, fromEvent, of } from 'rxjs';
import { catchError, map, observeOn, tap } from 'rxjs/operators';
// Execution order demonstration
console.log('1. Synchronous start');
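Promise.resolve().then(() => console.log('2. Promise (micro-task)'));
asapScheduler.schedule(() => console.log('3. ASAP (micro-task)'));
asyncScheduler.schedule(() => console.log('4. Async (macro-task)'));
console.log('1b. Synchronous end');
// Output (micro-tasks run in the order they were queued, so the Promise
// callback is queued first here to stay ahead of the ASAP work):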
// 1. Synchronous start
// 1b. Synchronous end
// 2. Promise (micro-task)
// 3. ASAP (micro-task)
// 4. Async (macro-task)
// Compare schedulers
of(1, 2, 3).pipe(observeOn(asapScheduler))
.subscribe(val => console.log('ASAP:', val));
of(1, 2, 3).pipe(observeOn(asyncScheduler))
.subscribe(val => console.log('Async:', val));
console.log('After subscriptions');
// Output:
// After subscriptions
// ASAP: 1, 2, 3
// Async: 1, 2, 3
// Practical: Critical state updates
class StateManager {
private state = { count: 0 };
updateState(updates: any) {
asapScheduler.schedule(() => {
this.state = { ...this.state, ...updates };
this.notifySubscribers();
});
}
notifySubscribers() {
// High-priority notification before next event loop
}
}
// Practical: Promise integration
const promiseValue$ = from(Promise.resolve(42)).pipe(
observeOn(asapScheduler)
);
// Executes in micro-task queue, similar timing to Promise
// Practical: Immediate DOM updates (before paint)
const criticalUpdate$ = of(newData).pipe(
observeOn(asapScheduler),
tap(data => {
// Update DOM before next paint cycle
element.textContent = data.value;
})
);
// Practical: High-priority event processing
fromEvent(button, 'click').pipe(
observeOn(asapScheduler),
tap(() => {
// Process click with minimal delay
handleCriticalClick();
})
).subscribe();
// Practical: React-like state batching
class ComponentState {
private pendingUpdates = [];
setState(update: any) {
this.pendingUpdates.push(update);
asapScheduler.schedule(() => {
if (this.pendingUpdates.length > 0) {
const merged = Object.assign({}, ...this.pendingUpdates);
this.pendingUpdates = [];
this.applyState(merged);
}
});
}
applyState(state: any) {
// Apply batched updates
}
}
// Practical: Minimal latency data processing
const realtime$ = websocketMessages$.pipe(
observeOn(asapScheduler), // Process ASAP
map(msg => processMessage(msg))
);
// Practical: Testing with micro-task timing
const test$ = of('value').pipe(
observeOn(asapScheduler)
);
let result;
test$.subscribe(val => result = val);
// result is still undefined here (async)
setTimeout(() => {
console.log(result); // 'value' available in next tick
}, 0);
// Practical: Error boundary with priority
const withErrorHandling$ = source$.pipe(
observeOn(asapScheduler),
catchError(err => {
// Handle error with high priority
logError(err);
return of(fallbackValue);
})
);
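The queue priorities this section relies on can be observed with platform primitives alone. This is a minimal sketch that assumes Promise.then as the micro-task source and setTimeout as the macro-task source; taskOrder is a hypothetical name.

```typescript
const taskOrder: string[] = [];

taskOrder.push('sync start');

// Macro-task: queued first, but runs only after all pending micro-tasks
setTimeout(() => {
  taskOrder.push('macro-task (setTimeout)');
}, 0);

// Micro-task: runs as soon as the current synchronous code finishes
Promise.resolve().then(() => {
  taskOrder.push('micro-task (Promise.then)');
});

taskOrder.push('sync end');
// At this point only the two synchronous entries exist.
// Later: sync start, sync end, micro-task (Promise.then), macro-task (setTimeout)
```

Work scheduled on asapScheduler lands in the same phase as the Promise callback here, while asyncScheduler work lands with the setTimeout callback.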
5. observeOn and subscribeOn for Context Control
| Operator | Syntax | Description | Affects |
|---|---|---|---|
| observeOn | observeOn(scheduler) | Controls execution context for downstream operators and subscription | Emission delivery to observers |
| subscribeOn | subscribeOn(scheduler) | Controls execution context for subscription (source creation) | Subscribe call and source setup |
| Placement | observeOn: affects below, subscribeOn: affects entire chain | observeOn can be used multiple times at different points | Different scope of influence |
Example: observeOn vs subscribeOn control
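import { Observable, animationFrameScheduler, asapScheduler, asyncScheduler, fromEvent, of, queueScheduler, range } from 'rxjs';
import { bufferCount, concatMap, debounceTime, map, observeOn, subscribeOn, switchMap, tap } from 'rxjs/operators';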
// observeOn affects downstream
of(1, 2, 3).pipe(
tap(val => console.log('Before observeOn:', val)), // Sync
observeOn(asyncScheduler),
tap(val => console.log('After observeOn:', val)) // Async
).subscribe();
console.log('Subscribe completed');
// Output:
// Before observeOn: 1
// Before observeOn: 2
// Before observeOn: 3
// Subscribe completed
// After observeOn: 1
// After observeOn: 2
// After observeOn: 3
// subscribeOn affects subscription
of(1, 2, 3).pipe(
tap(val => console.log('Source:', val)),
subscribeOn(asyncScheduler)
).subscribe(val => console.log('Observer:', val));
console.log('After subscribe call');
// Output:
// After subscribe call
// Source: 1
// Observer: 1
// Source: 2
// Observer: 2
// Source: 3
// Observer: 3
// Multiple observeOn operators
of(1, 2, 3).pipe(
tap(val => console.log('1. Sync:', val)),
observeOn(asyncScheduler),
tap(val => console.log('2. Async:', val)),
observeOn(queueScheduler),
tap(val => console.log('3. Queue:', val))
).subscribe();
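// Practical: Keep heavy computation from monopolizing the main thread
// (schedulers do not move work to another thread; they only decide when it runs)
const heavyWork$ = range(1, 1000).pipe(
subscribeOn(asyncScheduler), // Defer the start of the source
observeOn(asyncScheduler), // Deliver each value in its own macro-task
map(n => {
// Heavy computation now runs one value per task, yielding in between
return complexCalculation(n);
})
);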
heavyWork$.subscribe(result => {
updateUI(result);
});
// Practical: Execution context pattern (schedulers pick a queue, not a thread)
const data$ = fetchData().pipe(
subscribeOn(asyncScheduler), // Fetch on async
map(processData),
observeOn(animationFrameScheduler), // Update UI on animation frame
);
data$.subscribe(result => {
renderToDOM(result);
});
// Practical: Testing with schedulers
import { TestScheduler } from 'rxjs/testing';
const scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--c|').pipe(
observeOn(scheduler) // Use test scheduler
);
expectObservable(source$).toBe('--a--b--c|');
});
// Practical: Event handling with context control
fromEvent(button, 'click').pipe(
observeOn(asapScheduler), // Handle ASAP
switchMap(() => httpRequest$.pipe(
subscribeOn(asyncScheduler) // HTTP on async
)),
observeOn(animationFrameScheduler), // Render on RAF
).subscribe(data => {
updateView(data);
});
// Practical: Web Worker integration
const workerObservable$ = new Observable(observer => {
const worker = new Worker('worker.js');
worker.onmessage = (event) => observer.next(event.data);
worker.onerror = (error) => observer.error(error);
return () => worker.terminate();
}).pipe(
subscribeOn(asyncScheduler),
observeOn(asyncScheduler)
);
// Practical: Batch processing with timing control
range(1, 10000).pipe(
subscribeOn(queueScheduler), // Synchronous iteration
bufferCount(100),
observeOn(asyncScheduler), // Async batch processing
concatMap(batch => processBatch(batch))
).subscribe();
// Practical: Form validation
const validationSub = fromEvent(inputElement, 'input').pipe( // .subscribe() returns a Subscription
map(event => event.target.value),
observeOn(asyncScheduler), // Don't block typing
debounceTime(300),
switchMap(value => validate(value).pipe(
subscribeOn(asyncScheduler)
))
).subscribe(validationResult => {
displayErrors(validationResult);
});
Note: Use observeOn to control when emissions reach observers, and subscribeOn to control when the subscription itself runs. observeOn is the more commonly used of the two.
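The difference in scope between the two operators can be modelled with a toy push source and a manually flushed task queue. Everything here (ToySource, observeOnLike, subscribeOnLike, manualSchedule) is a hypothetical stand-in written for illustration; it is not the RxJS API or its implementation.

```typescript
type ValueObserver<T> = (value: T) => void;
type ToySource<T> = (observer: ValueObserver<T>) => void;
type ScheduleFn = (task: () => void) => void;

// Manual scheduler: tasks wait in a queue until flushTasks() is called
const pendingTasks: Array<() => void> = [];
const manualSchedule: ScheduleFn = task => {
  pendingTasks.push(task);
};
function flushTasks(): void {
  while (pendingTasks.length > 0) {
    pendingTasks.shift()!();
  }
}

// A source that emits 1 and 2 synchronously when subscribed
function makeSource(log: string[]): ToySource<number> {
  return observer => {
    for (const n of [1, 2]) {
      log.push(`source ${n}`);
      observer(n);
    }
  };
}

// observeOn-like: the source runs immediately, each delivery is deferred
function observeOnLike<T>(src: ToySource<T>, schedule: ScheduleFn): ToySource<T> {
  return observer => src(value => schedule(() => observer(value)));
}

// subscribeOn-like: the subscribe call itself is deferred
function subscribeOnLike<T>(src: ToySource<T>, schedule: ScheduleFn): ToySource<T> {
  return observer => schedule(() => src(observer));
}

const observeLog: string[] = [];
observeOnLike(makeSource(observeLog), manualSchedule)(n => {
  observeLog.push(`observer ${n}`);
});
observeLog.push('after subscribe');
flushTasks();
// observeLog: source 1, source 2, after subscribe, observer 1, observer 2

const subscribeLog: string[] = [];
subscribeOnLike(makeSource(subscribeLog), manualSchedule)(n => {
  subscribeLog.push(`observer ${n}`);
});
subscribeLog.push('after subscribe');
flushTasks();
// subscribeLog: after subscribe, source 1, observer 1, source 2, observer 2
```

The two logs mirror the outputs shown in the observeOn and subscribeOn examples above: deferring delivery leaves the source synchronous, while deferring subscription moves the whole chain.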
6. VirtualTimeScheduler for Testing and Simulation
| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| VirtualTimeScheduler | new VirtualTimeScheduler() | Simulates passage of time without actual delays | Testing, time travel debugging |
| TestScheduler | new TestScheduler(assertDeepEqual) | Built on VirtualTimeScheduler for marble testing | RxJS testing framework |
| flush() | scheduler.flush() | Execute all scheduled work immediately | Fast-forward time |
Example: VirtualTimeScheduler for testing
import { VirtualTimeScheduler, combineLatest, interval, of, timer } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { debounceTime, delay, map, retry, switchMap, take } from 'rxjs/operators';
// Basic VirtualTimeScheduler usage
const virtualScheduler = new VirtualTimeScheduler();
const source$ = interval(1000, virtualScheduler).pipe(take(3));
const results = [];
source$.subscribe(val => results.push(val));
// No actual time has passed yet
console.log('Before flush:', results); // []
// Fast-forward through all scheduled work
virtualScheduler.flush();
console.log('After flush:', results); // [0, 1, 2]
// TestScheduler for marble testing
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
// Marble syntax inside run(): - = 1ms of virtual time, | = complete, # = error
const source$ = cold('--a--b--c|');
const expected = '--a--b--c|';
expectObservable(source$).toBe(expected);
});
// Testing time-based operators
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('a-b-c|').pipe(
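debounceTime(20) // 20ms = 20 frames inside run()
);
// The source completes at frame 5, before the debounce window elapses,
// so debounceTime flushes the pending 'c' together with the completion
const expected = '-----(c|)';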
expectObservable(source$).toBe(expected);
});
// Testing with values
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--c|', { a: 1, b: 2, c: 3 }).pipe(
map(x => x * 10)
);
const expected = '--a--b--c|';
const values = { a: 10, b: 20, c: 30 };
expectObservable(source$).toBe(expected, values);
});
// Testing subscriptions
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source$ = cold('--a--b--c|');
const subs = '^--------!';
expectObservable(source$).toBe('--a--b--c|');
expectSubscriptions(source$.subscriptions).toBe(subs);
});
// Testing async operations instantly
testScheduler.run(({ cold, expectObservable }) => {
const source$ = timer(1000, 1000, testScheduler).pipe( // first value after 1s, then every 1s
take(3),
map(x => x * 10)
);
// Inside run(), 1s is 1000 virtual frames of 1ms; no real time passes
const expected = '1s a 999ms b 999ms (c|)';
const values = { a: 0, b: 10, c: 20 };
expectObservable(source$).toBe(expected, values);
});
// Practical: Testing debounced search
testScheduler.run(({ cold, expectObservable }) => {
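const input$ = cold('a-b-c 30ms |');
const search$ = input$.pipe(
debounceTime(20, testScheduler),
switchMap(term => cold('--r|', { r: `result:${term}` }))
);
// 'c' arrives at frame 4, so the debounce fires at frame 24; the inner
// result follows 2 frames later and the source completes at frame 35
const expected = '26ms r 8ms |';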
const values = { r: 'result:c' };
expectObservable(search$).toBe(expected, values);
});
// Practical: Testing retry logic
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--#', null, new Error('fail')).pipe(
retry(2)
);
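// retry(2) resubscribes immediately after each of the first two errors,
// so only the third error (frame 6) reaches the observer
const expected = '------#';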
expectObservable(source$).toBe(expected, null, new Error('fail'));
});
// Practical: Testing complex timing
testScheduler.run(({ cold, hot, expectObservable }) => {
const source1$ = hot('-a--b--c|');
const source2$ = hot('--1--2--3|');
const result$ = combineLatest([source1$, source2$]);
const expected = '--x-yz-wv|';
const values = {
x: ['a', '1'],
y: ['b', '1'],
z: ['b', '2'],
w: ['c', '2'],
v: ['c', '3']
};
expectObservable(result$).toBe(expected, values);
});
// Practical: Testing HTTP with delays
testScheduler.run(({ cold, expectObservable }) => {
const mockHttp = (url: string) => {
return cold('---r|', { r: { data: url } });
};
const request$ = of('/api/users').pipe(
delay(10, testScheduler),
switchMap(url => mockHttp(url))
);
// The URL is delayed by 10 frames; the mock response arrives 3 frames later
const expected = '10ms ---r|';
const values = { r: { data: '/api/users' } };
expectObservable(request$).toBe(expected, values);
});
// Marble diagram reference:
// '-' = 1ms of virtual time inside run() (10ms per frame outside run mode)
// '10ms', '1s' = time progression (run() only)
// '|' = completion
// '#' = error
// 'a' = emission with value 'a'
// '()' = grouping (simultaneous emissions)
// '^' = subscription start
// '!' = unsubscription
Note: TestScheduler enables marble testing - visual representation of observable streams over
time. Essential for testing async RxJS code without actual delays.
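The idea of virtual time can be illustrated with a small standalone scheduler. This is a sketch of the concept only, assuming a sorted task list and a manually advanced clock; SketchVirtualScheduler and its members are hypothetical and simpler than RxJS's VirtualTimeScheduler.

```typescript
interface VirtualTask {
  due: number;   // virtual timestamp (ms) at which the task should run
  order: number; // insertion order, used to break ties between equal due times
  work: () => void;
}

class SketchVirtualScheduler {
  private tasks: VirtualTask[] = [];
  private counter = 0;
  private clock = 0;

  // Current virtual time in ms
  now(): number {
    return this.clock;
  }

  schedule(work: () => void, delay = 0): void {
    this.tasks.push({ due: this.clock + delay, order: this.counter++, work });
  }

  // Run every task in due-time order, jumping the clock instead of waiting
  flush(): void {
    while (this.tasks.length > 0) {
      this.tasks.sort((a, b) => a.due - b.due || a.order - b.order);
      const task = this.tasks.shift()!;
      this.clock = task.due;
      task.work();
    }
  }
}

const virtualScheduler = new SketchVirtualScheduler();
const virtualLog: string[] = [];

// An interval-like task that reschedules itself every 1000 virtual ms
function tick(count: number): void {
  virtualLog.push(`tick ${count} @ ${virtualScheduler.now()}ms`);
  if (count < 2) {
    virtualScheduler.schedule(() => tick(count + 1), 1000);
  }
}

virtualScheduler.schedule(() => tick(0), 1000);
virtualScheduler.schedule(() => {
  virtualLog.push(`timeout @ ${virtualScheduler.now()}ms`);
}, 1500);

virtualScheduler.flush(); // returns immediately; no real time passes
// virtualLog: tick 0 @ 1000ms, timeout @ 1500ms, tick 1 @ 2000ms, tick 2 @ 3000ms
```

Three seconds of scheduled work complete in a single synchronous flush() call, which is what makes time-based assertions like the marble tests above fast and deterministic.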
Section 8 Summary
- asyncScheduler uses timer-based macro-task scheduling (setTimeout-like) - default for time-based operators
- queueScheduler executes synchronously in FIFO queue - prevents stack overflow
- animationFrameScheduler uses requestAnimationFrame - optimal for DOM animations at the display refresh rate (typically 60fps)
- asapScheduler uses micro-task queue (Promise-like) - high priority async execution
- observeOn controls emission delivery, subscribeOn controls subscription execution
- TestScheduler enables marble testing with virtual time - test async code instantly