Schedulers and Execution Context Control

1. asyncScheduler for Macro-task Scheduling

Feature | Syntax | Description | Use Case
asyncScheduler | observeOn(asyncScheduler) | Schedules work on the macro-task queue, like setTimeout (implemented with setInterval internally) | Async operations, preventing blocking
schedule() | asyncScheduler.schedule(work, delay) | Schedules a callback with an optional delay (ms) | Delayed execution
Default scheduler | Used by interval, timer, delay operators | Default for time-based operators | Standard async timing

Example: asyncScheduler for async execution

import { asyncScheduler, of, range } from 'rxjs';
import { map, observeOn } from 'rxjs/operators';

// Direct scheduler usage
asyncScheduler.schedule(() => console.log('Executed async'));
console.log('Synchronous');
// Output:
// Synchronous
// Executed async

// With delay
asyncScheduler.schedule(
  () => console.log('Delayed'),
  1000 // 1 second delay
);

// Recursive scheduling
asyncScheduler.schedule(function(state) {
  console.log('Count:', state);
  if (state < 3) {
    this.schedule(state + 1, 500); // Reschedule after 500ms
  }
}, 0, 0); // Initial delay 0, initial state 0

// observeOn with asyncScheduler
range(1, 5).pipe(
  observeOn(asyncScheduler)
).subscribe(val => console.log('Async:', val));

console.log('After subscribe');
// Output:
// After subscribe
// Async: 1
// Async: 2
// Async: 3
// Async: 4
// Async: 5

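// Practical: Let the UI update between items
// Each value is delivered in its own macro-task; every expensiveCalculation call
// still runs on the main thread (expensiveCalculation and updateUI are placeholders)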
of(1, 2, 3, 4, 5).pipe(
  observeOn(asyncScheduler),
  map(n => {
    // Heavy computation
    const result = expensiveCalculation(n);
    return result;
  })
).subscribe(result => updateUI(result));

// Practical: Debounced batch processing
const batchProcessor = {
  items: [],
  scheduledWork: null,
  
  addItem(item) {
    this.items.push(item);
    
    if (this.scheduledWork) {
      this.scheduledWork.unsubscribe();
    }
    
    this.scheduledWork = asyncScheduler.schedule(() => {
      this.processBatch(this.items);
      this.items = [];
    }, 100);
  },
  
  processBatch(items) {
    console.log('Processing batch:', items);
  }
};

batchProcessor.addItem('a');
batchProcessor.addItem('b');
batchProcessor.addItem('c');
// Processes all 3 together after 100ms

// Practical: Animation frame alternative for non-visual tasks
const heavyWork$ = range(1, 1000).pipe(
  observeOn(asyncScheduler), // Yield to event loop
  map(n => n * n)
);

// Practical: Testing with scheduler
import { TestScheduler } from 'rxjs/testing';

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('--a--b--c|');
  const expected =     '--a--b--c|';
  
  expectObservable(
    source$.pipe(observeOn(asyncScheduler))
  ).toBe(expected);
});
Note: asyncScheduler schedules work on the macro-task queue, so it runs after the current call stack clears, like setTimeout (internally it uses setInterval). It is the default for most time-based RxJS operators.

2. queueScheduler for Synchronous Scheduling

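Feature | Syntax | Description | Execution Model
queueScheduler | observeOn(queueScheduler) | Schedules work synchronously in a FIFO queue | Immediate, blocking execution
Iteration control | Prevents recursive stack overflow | Trampolines recursive operations: work scheduled from a running task waits until that task finishes | Flattens call stack
Operator defaults | (none) | repeat and retry take no scheduler and do not use queueScheduler by default; pass it explicitly (for example to observeOn) when trampolining is needed | Sequential operations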

Example: queueScheduler for synchronous execution

import { asyncScheduler, queueScheduler, of, range } from 'rxjs';
import { map, observeOn, tap } from 'rxjs/operators';

// Synchronous execution
console.log('Before');
queueScheduler.schedule(() => console.log('Queued work'));
console.log('After');
// Output:
// Before
// Queued work
// After

// Compare async vs queue scheduler
console.log('Start');

of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
).subscribe(val => console.log('Async:', val));

of(1, 2, 3).pipe(
  observeOn(queueScheduler)
).subscribe(val => console.log('Queue:', val));

console.log('End');
// Output:
// Start
// Queue: 1
// Queue: 2
// Queue: 3
// End
// Async: 1
// Async: 2
// Async: 3

// Prevent stack overflow with recursive operations
queueScheduler.schedule(function(state) {
  console.log('Count:', state);
  if (state < 10000) {
    this.schedule(state + 1); // No delay, queued synchronously
  }
}, 0, 0);
// Completes without stack overflow

// Without scheduler - stack overflow risk
function recursiveSync(n) {
  console.log(n);
  if (n < 10000) {
    recursiveSync(n + 1); // Each call adds a stack frame; deep enough recursion overflows
  }
}

// Practical: Synchronous stream processing
const processSync$ = range(1, 100).pipe(
  observeOn(queueScheduler),
  map(n => n * 2)
);

processSync$.subscribe(val => {
  // All values processed before next line
  console.log(val);
});
console.log('Processing complete');

// Practical: State machine transitions
class StateMachine {
  private state = 'idle';
  
  transition(event: string) {
    queueScheduler.schedule(() => {
      this.state = this.getNextState(this.state, event);
      console.log('State:', this.state);
    });
  }
  
  getNextState(current: string, event: string): string {
    // State transition logic
    return `${current}->${event}`;
  }
}

const machine = new StateMachine();
machine.transition('start');
machine.transition('process');
machine.transition('complete');
// All transitions execute synchronously in order

// Practical: Immediate event processing
const events$ = of('click', 'hover', 'focus').pipe(
  observeOn(queueScheduler),
  tap(event => console.log('Processing:', event))
);

console.log('Before subscription');
events$.subscribe();
console.log('After subscription');
// Output:
// Before subscription
// Processing: click
// Processing: hover
// Processing: focus
// After subscription

// Practical: Testing synchronous behavior
const syncTest$ = range(1, 3).pipe(
  observeOn(queueScheduler)
);

const results = [];
syncTest$.subscribe(val => results.push(val));

console.log(results); // [1, 2, 3] immediately available
Warning: queueScheduler executes synchronously and blocks when no delay is given (with a delay greater than 0 it behaves like asyncScheduler). Use it for trampolining recursive operations or when immediate execution is required.
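
The trampolining idea behind queueScheduler can be sketched without RxJS. The class below is a simplified illustration, not the library's implementation; `TrampolineQueue` and `countTo` are hypothetical names. Work scheduled while another task is running is appended to a FIFO queue and picked up by the outer loop, so the call stack stays flat however deep the "recursion" goes.

```typescript
type Task = () => void;

// Minimal trampoline: nested schedule() calls enqueue instead of recursing.
class TrampolineQueue {
  private queue: Task[] = [];
  private running = false;

  schedule(task: Task): void {
    this.queue.push(task);
    if (this.running) {
      return; // A task is already executing; the loop below will reach this one.
    }
    this.running = true;
    try {
      while (this.queue.length > 0) {
        const next = this.queue.shift()!;
        next();
      }
    } finally {
      this.running = false;
    }
  }
}

const trampoline = new TrampolineQueue();

// Nested work runs only after the current task has finished.
const log: string[] = [];
trampoline.schedule(() => {
  log.push('outer start');
  trampoline.schedule(() => log.push('inner'));
  log.push('outer end');
});
// log is now ['outer start', 'outer end', 'inner']

// "Recursive" counting that never grows the call stack.
function countTo(limit: number): number {
  let reached = 0;
  const step = (n: number): void => {
    reached = n;
    if (n < limit) {
      trampoline.schedule(() => step(n + 1));
    }
  };
  trampoline.schedule(() => step(0));
  return reached;
}

const reached = countTo(100000);
```

The same ordering is visible with queueScheduler itself: work scheduled from inside a running action is deferred until that action returns.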

3. animationFrameScheduler for Browser Animation

Feature | Syntax | Description | Timing
animationFrameScheduler | observeOn(animationFrameScheduler) | Schedules work using requestAnimationFrame | Synced with browser repaint (typically ~60fps)
Frame timing | Executes before next browser paint | Optimal for DOM animations and visual updates | ~16.67ms intervals on a 60Hz display
Performance | Pauses when tab inactive | Battery-friendly, respects browser optimization | Efficient rendering

Example: animationFrameScheduler for smooth animations

import { animationFrameScheduler, interval, fromEvent } from 'rxjs';
import { observeOn, map, switchMap, takeUntil, takeWhile } from 'rxjs/operators';

// Schedule on animation frame
animationFrameScheduler.schedule(() => {
  console.log('Frame tick');
});

// Smooth animation loop
interval(0, animationFrameScheduler).pipe(
  takeWhile(frame => frame < 60) // 1 second at 60fps
).subscribe(frame => {
  const progress = frame / 60;
  updateAnimation(progress);
});

// Practical: Smooth element movement
const moveElement$ = interval(0, animationFrameScheduler).pipe(
  map(frame => frame * 2), // 2px per frame
  takeWhile(position => position < 500)
);

moveElement$.subscribe(position => {
  element.style.left = position + 'px';
});

// Practical: Easing animation
function animate(element, property, from, to, duration) {
  const start = Date.now();
  
  return interval(0, animationFrameScheduler).pipe(
    // Progress from 0 to 1, based on elapsed wall-clock time
    map(() => Math.min((Date.now() - start) / duration, 1)),
    // Stop after the final frame (progress === 1) has been emitted
    takeWhile(progress => progress < 1, true),
    map(progress => {
      // Ease-in-out (quadratic)
      const eased = progress < 0.5
        ? 2 * progress * progress
        : 1 - Math.pow(-2 * progress + 2, 2) / 2;
      return from + (to - from) * eased;
    })
  ).subscribe(value => {
    element.style[property] = value + 'px';
  });
}

animate(box, 'left', 0, 500, 1000); // Animate left from 0 to 500px in 1s

// Practical: Parallax scrolling
fromEvent(window, 'scroll').pipe(
  observeOn(animationFrameScheduler),
  map(() => window.scrollY)
).subscribe(scrollY => {
  background.style.transform = `translateY(${scrollY * 0.5}px)`;
  foreground.style.transform = `translateY(${scrollY * 0.8}px)`;
});

// Practical: Canvas animation
const canvas = document.getElementById('canvas');
const ctx = canvas.getContext('2d');

interval(0, animationFrameScheduler).subscribe(frame => {
  // Clear canvas
  ctx.clearRect(0, 0, canvas.width, canvas.height);
  
  // Draw animated content
  const x = (frame % canvas.width);
  const y = Math.sin(frame * 0.05) * 50 + canvas.height / 2;
  
  ctx.fillStyle = 'blue';
  ctx.fillRect(x, y, 20, 20);
});

// Practical: Drag and drop with smooth updates
const mouseMove$ = fromEvent(document, 'mousemove');

const drag$ = fromEvent(element, 'mousedown').pipe(
  switchMap(() => mouseMove$.pipe(
    observeOn(animationFrameScheduler), // Smooth updates
    takeUntil(fromEvent(document, 'mouseup'))
  ))
);

drag$.subscribe(event => {
  element.style.left = event.clientX + 'px';
  element.style.top = event.clientY + 'px';
});

// Practical: Progress bar animation
function animateProgressBar(targetWidthPx) {
  const startWidth = progressBar.offsetWidth; // Current width in pixels
  const duration = 500;
  const start = Date.now();
  
  interval(0, animationFrameScheduler).pipe(
    // Clamp progress to 1 so the bar never overshoots the target
    map(() => Math.min((Date.now() - start) / duration, 1)),
    takeWhile(progress => progress < 1, true), // Include the final frame
    map(progress => startWidth + (targetWidthPx - startWidth) * progress)
  ).subscribe(width => {
    progressBar.style.width = width + 'px'; // offsetWidth is in pixels, not percent
  });
}

// Practical: Game loop
const gameLoop$ = interval(0, animationFrameScheduler);

gameLoop$.subscribe(() => {
  updateGameState();
  detectCollisions();
  renderGame();
});

// Performance monitoring
let frameCount = 0;
let lastTime = Date.now();

interval(0, animationFrameScheduler).subscribe(() => {
  frameCount++;
  const now = Date.now();
  
  if (now - lastTime >= 1000) {
    console.log('FPS:', frameCount);
    frameCount = 0;
    lastTime = now;
  }
});
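Note: Use animationFrameScheduler for DOM animations and visual updates. Browsers pause or throttle requestAnimationFrame while the tab is inactive, which saves battery and CPU. The frame rate follows the display refresh rate, so drive animations from elapsed time rather than from a frame count.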
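
The easing examples above compute a value from elapsed time instead of a frame counter. The pure functions below isolate that calculation so it can be checked without a browser. This is a sketch: `easeInOutQuad` and `valueAt` are illustrative names, and the formula is the same quadratic ease-in-out used in the animate example.

```typescript
// Quadratic ease-in-out: slow start, fast middle, slow end. Input and output in [0, 1].
function easeInOutQuad(t: number): number {
  return t < 0.5
    ? 2 * t * t
    : 1 - Math.pow(-2 * t + 2, 2) / 2;
}

// Animated value for a given elapsed time, clamped so it never overshoots.
function valueAt(elapsedMs: number, from: number, to: number, durationMs: number): number {
  const progress = Math.min(Math.max(elapsedMs / durationMs, 0), 1);
  return from + (to - from) * easeInOutQuad(progress);
}

// The value depends only on elapsed time, so a 60Hz and a 120Hz display
// land on the same position at the same moment.
const quarter = valueAt(250, 0, 500, 1000);   // 62.5
const halfway = valueAt(500, 0, 500, 1000);   // 250
const finished = valueAt(1500, 0, 500, 1000); // 500 (clamped)
```

Inside an `interval(0, animationFrameScheduler)` pipeline, a `map(() => valueAt(Date.now() - start, from, to, duration))` step would produce these values once per frame.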

4. asapScheduler for Micro-task Scheduling

Feature | Syntax | Description | Queue Priority
asapScheduler | observeOn(asapScheduler) | Schedules work on the micro-task queue (like Promise.then) | Before macro-tasks, after the current stack
Implementation | Promise-based in RxJS 6 and 7 (Promise.resolve().then) | Runs as a micro-task; with a delay greater than 0 it behaves like asyncScheduler | Faster than setTimeout
Use case | High-priority async work before next macro-task | Critical updates, Promise-like behavior | Minimal delay execution

Example: asapScheduler for micro-task execution

import { asapScheduler, asyncScheduler, from, fromEvent, of } from 'rxjs';
import { catchError, map, observeOn, tap } from 'rxjs/operators';

// Execution order demonstration
console.log('1. Synchronous start');

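// Micro-tasks run in the order they were queued, so the Promise callback
// is queued first here to match the numbering
Promise.resolve().then(() => console.log('2. Promise (micro-task)'));

asapScheduler.schedule(() => console.log('3. ASAP (micro-task)'));

asyncScheduler.schedule(() => console.log('4. Async (macro-task)'));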

console.log('1b. Synchronous end');

// Output:
// 1. Synchronous start
// 1b. Synchronous end
// 2. Promise (micro-task)
// 3. ASAP (micro-task)
// 4. Async (macro-task)

// Compare schedulers
of(1, 2, 3).pipe(observeOn(asapScheduler))
  .subscribe(val => console.log('ASAP:', val));

of(1, 2, 3).pipe(observeOn(asyncScheduler))
  .subscribe(val => console.log('Async:', val));

console.log('After subscriptions');
// Output:
// After subscriptions
// ASAP: 1, 2, 3
// Async: 1, 2, 3

// Practical: Critical state updates
class StateManager {
  private state = { count: 0 };
  
  updateState(updates: any) {
    asapScheduler.schedule(() => {
      this.state = { ...this.state, ...updates };
      this.notifySubscribers();
    });
  }
  
  notifySubscribers() {
    // High-priority notification before next event loop
  }
}

// Practical: Promise integration
const promiseValue$ = from(Promise.resolve(42)).pipe(
  observeOn(asapScheduler)
);

// Executes in micro-task queue, similar timing to Promise

// Practical: Immediate DOM updates (before paint)
const criticalUpdate$ = of(newData).pipe(
  observeOn(asapScheduler),
  tap(data => {
    // Update DOM before next paint cycle
    element.textContent = data.value;
  })
);

// Practical: High-priority event processing
fromEvent(button, 'click').pipe(
  observeOn(asapScheduler),
  tap(() => {
    // Process click with minimal delay
    handleCriticalClick();
  })
).subscribe();

// Practical: React-like state batching
class ComponentState {
  private pendingUpdates = [];
  
  setState(update: any) {
    this.pendingUpdates.push(update);
    
    asapScheduler.schedule(() => {
      if (this.pendingUpdates.length > 0) {
        const merged = Object.assign({}, ...this.pendingUpdates);
        this.pendingUpdates = [];
        this.applyState(merged);
      }
    });
  }
  
  applyState(state: any) {
    // Apply batched updates
  }
}

// Practical: Minimal latency data processing
const realtime$ = websocketMessages$.pipe(
  observeOn(asapScheduler), // Process ASAP
  map(msg => processMessage(msg))
);

// Practical: Testing with micro-task timing
const test$ = of('value').pipe(
  observeOn(asapScheduler)
);

let result;
test$.subscribe(val => result = val);

// result is still undefined here (async)
setTimeout(() => {
  console.log(result); // 'value' available in next tick
}, 0);

// Practical: Error boundary with priority
const withErrorHandling$ = source$.pipe(
  observeOn(asapScheduler),
  catchError(err => {
    // Handle error with high priority
    logError(err);
    return of(fallbackValue);
  })
);
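
The ordering rules in this section (synchronous code first, then all queued micro-tasks, then one macro-task at a time) can be modelled deterministically. The class below is a toy model of the event loop, not how a browser or Node.js implements it; `EventLoopModel` and its method names are made up for illustration. Here `queueMicro` plays the role of asapScheduler or Promise.then, and `queueMacro` the role of asyncScheduler or setTimeout.

```typescript
type Task = () => void;

// Toy event loop: the micro-task queue is drained completely after the main
// script and after every macro-task.
class EventLoopModel {
  private microTasks: Task[] = [];
  private macroTasks: Task[] = [];

  queueMicro(task: Task): void {
    this.microTasks.push(task);
  }

  queueMacro(task: Task): void {
    this.macroTasks.push(task);
  }

  private drainMicroTasks(): void {
    while (this.microTasks.length > 0) {
      this.microTasks.shift()!();
    }
  }

  run(main: Task): void {
    main(); // Synchronous code runs to completion first
    this.drainMicroTasks();
    while (this.macroTasks.length > 0) {
      this.macroTasks.shift()!(); // One macro-task...
      this.drainMicroTasks();     // ...then every pending micro-task
    }
  }
}

const loop = new EventLoopModel();
const order: string[] = [];

loop.run(() => {
  order.push('sync start');
  loop.queueMacro(() => {
    order.push('macro 1');
    loop.queueMicro(() => order.push('micro from macro 1'));
  });
  loop.queueMacro(() => order.push('macro 2'));
  loop.queueMicro(() => order.push('micro 1'));
  loop.queueMicro(() => order.push('micro 2'));
  order.push('sync end');
});
// order: sync start, sync end, micro 1, micro 2, macro 1, micro from macro 1, macro 2
```

A micro-task queued from inside a macro-task still runs before the next macro-task, which is why asapScheduler work can overtake asyncScheduler work that was scheduled earlier.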

5. observeOn and subscribeOn for Context Control

Operator | Syntax | Description | Affects
observeOn | observeOn(scheduler) | Controls execution context for downstream operators and subscription | Emission delivery to observers
subscribeOn | subscribeOn(scheduler) | Controls execution context for subscription (source creation) | Subscribe call and source setup
Placement | observeOn: affects below, subscribeOn: affects entire chain | observeOn can be used multiple times at different points | Different scope of influence

Example: observeOn vs subscribeOn control

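import { Observable, animationFrameScheduler, asapScheduler, asyncScheduler, fromEvent, of, queueScheduler, range } from 'rxjs';
import { bufferCount, concatMap, debounceTime, map, observeOn, subscribeOn, switchMap, tap } from 'rxjs/operators';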

// observeOn affects downstream
of(1, 2, 3).pipe(
  tap(val => console.log('Before observeOn:', val)), // Sync
  observeOn(asyncScheduler),
  tap(val => console.log('After observeOn:', val)) // Async
).subscribe();

console.log('Subscribe completed');
// Output:
// Before observeOn: 1
// Before observeOn: 2
// Before observeOn: 3
// Subscribe completed
// After observeOn: 1
// After observeOn: 2
// After observeOn: 3

// subscribeOn affects subscription
of(1, 2, 3).pipe(
  tap(val => console.log('Source:', val)),
  subscribeOn(asyncScheduler)
).subscribe(val => console.log('Observer:', val));

console.log('After subscribe call');
// Output:
// After subscribe call
// Source: 1
// Observer: 1
// Source: 2
// Observer: 2
// Source: 3
// Observer: 3

// Multiple observeOn operators
of(1, 2, 3).pipe(
  tap(val => console.log('1. Sync:', val)),
  observeOn(asyncScheduler),
  tap(val => console.log('2. Async:', val)),
  observeOn(queueScheduler),
  tap(val => console.log('3. Queue:', val))
).subscribe();

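// Practical: Defer heavy work so subscribe() returns immediately
// (schedulers do not move work off the main thread; use a Web Worker for that)
const heavyWork$ = range(1, 1000).pipe(
  subscribeOn(asyncScheduler), // Start the source in a later macro-task
  map(n => {
    // Heavy computation (complexCalculation is a placeholder)
    return complexCalculation(n);
  }),
  observeOn(asyncScheduler) // Deliver each result in its own macro-task
);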

heavyWork$.subscribe(result => {
  updateUI(result);
});

// Practical: Execution-context pattern (schedulers pick a point in the event loop, not a thread)
const data$ = fetchData().pipe(
  subscribeOn(asyncScheduler), // Start the fetch in a later macro-task
  map(processData),
  observeOn(animationFrameScheduler) // Deliver results on an animation frame
);

data$.subscribe(result => {
  renderToDOM(result);
});

// Practical: Testing with schedulers
import { TestScheduler } from 'rxjs/testing';

const scheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

scheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('--a--b--c|').pipe(
    observeOn(scheduler) // Use test scheduler
  );
  
  expectObservable(source$).toBe('--a--b--c|');
});

// Practical: Event handling with context control
fromEvent(button, 'click').pipe(
  observeOn(asapScheduler), // Handle ASAP
  switchMap(() => httpRequest$.pipe(
    subscribeOn(asyncScheduler) // HTTP on async
  )),
  observeOn(animationFrameScheduler), // Render on RAF
).subscribe(data => {
  updateView(data);
});

// Practical: Web Worker integration
const workerObservable$ = new Observable(observer => {
  const worker = new Worker('worker.js');
  
  worker.onmessage = (event) => observer.next(event.data);
  worker.onerror = (error) => observer.error(error);
  
  return () => worker.terminate();
}).pipe(
  subscribeOn(asyncScheduler),
  observeOn(asyncScheduler)
);

// Practical: Batch processing with timing control
range(1, 10000).pipe(
  subscribeOn(queueScheduler), // Synchronous iteration
  bufferCount(100),
  observeOn(asyncScheduler), // Async batch processing
  concatMap(batch => processBatch(batch))
).subscribe();

// Practical: Form validation
const validationSub = fromEvent(inputElement, 'input').pipe(
  map(event => (event.target as HTMLInputElement).value),
  observeOn(asyncScheduler), // Deliver input values in a later macro-task
  debounceTime(300),
  switchMap(value => validate(value).pipe(
    subscribeOn(asyncScheduler)
  ))
).subscribe(validationResult => {
  displayErrors(validationResult);
});
Note: Use observeOn to control when emissions reach observers. Use subscribeOn to control subscription execution. observeOn is most commonly used.
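
The difference between the two operators can be reduced to where the scheduling call sits. The sketch below uses a hand-rolled function-based "observable" and a manually flushed queue in place of asyncScheduler, so the ordering is deterministic. None of this is RxJS API: `Source`, `ManualQueue`, `observeOnQueue` and `subscribeOnQueue` are hypothetical names chosen for the illustration.

```typescript
type Observer<T> = (value: T) => void;
type Source<T> = (observer: Observer<T>) => void;

// Stand-in for an async scheduler: tasks wait until flush() is called.
class ManualQueue {
  private tasks: Array<() => void> = [];

  schedule(task: () => void): void {
    this.tasks.push(task);
  }

  flush(): void {
    while (this.tasks.length > 0) {
      this.tasks.shift()!();
    }
  }
}

// Emits the given values synchronously on subscription, like of().
function ofValues<T>(...values: T[]): Source<T> {
  return observer => values.forEach(v => observer(v));
}

// Records each value as it passes, like tap().
function tapLog<T>(source: Source<T>, label: string, log: string[]): Source<T> {
  return observer => source(v => {
    log.push(`${label}:${v}`);
    observer(v);
  });
}

// observeOn: the source runs now, each delivery is scheduled.
function observeOnQueue<T>(source: Source<T>, queue: ManualQueue): Source<T> {
  return observer => source(v => queue.schedule(() => observer(v)));
}

// subscribeOn: the subscription itself is scheduled.
function subscribeOnQueue<T>(source: Source<T>, queue: ManualQueue): Source<T> {
  return observer => queue.schedule(() => source(observer));
}

const queue = new ManualQueue();

const observeLog: string[] = [];
observeOnQueue(tapLog(ofValues(1, 2), 'source', observeLog), queue)(
  v => observeLog.push(`observer:${v}`)
);
observeLog.push('subscribed');
queue.flush();
// observeLog: source:1, source:2, subscribed, observer:1, observer:2

const subscribeLog: string[] = [];
subscribeOnQueue(tapLog(ofValues(1, 2), 'source', subscribeLog), queue)(
  v => subscribeLog.push(`observer:${v}`)
);
subscribeLog.push('subscribed');
queue.flush();
// subscribeLog: subscribed, source:1, observer:1, source:2, observer:2
```

The two logs mirror the outputs of the observeOn and subscribeOn examples earlier in this section.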

6. VirtualTimeScheduler for Testing and Simulation

Feature | Syntax | Description | Use Case
VirtualTimeScheduler | new VirtualTimeScheduler() | Simulates passage of time without actual delays | Testing, time travel debugging
TestScheduler | new TestScheduler(assertDeepEqual) | Built on VirtualTimeScheduler for marble testing | RxJS testing framework
flush() | scheduler.flush() | Execute all scheduled work immediately | Fast-forward time
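
Virtual time is easiest to see in a stripped-down form. The sketch below is an illustration of the idea only, not RxJS's VirtualTimeScheduler; `VirtualClock` and `virtualInterval` are hypothetical names. Scheduled tasks are kept with a due time, and `flush()` runs them in due-time order while jumping the clock forward instead of waiting.

```typescript
interface TimedTask {
  due: number;   // Virtual time at which the task should run
  order: number; // Insertion order, used to break ties
  run: () => void;
}

class VirtualClock {
  private now = 0;
  private seq = 0;
  private tasks: TimedTask[] = [];

  get time(): number {
    return this.now;
  }

  schedule(run: () => void, delay = 0): void {
    this.tasks.push({ due: this.now + delay, order: this.seq++, run });
  }

  // Run everything, earliest first, advancing the clock without real waiting.
  flush(): void {
    while (this.tasks.length > 0) {
      this.tasks.sort((a, b) => a.due - b.due || a.order - b.order);
      const next = this.tasks.shift()!;
      this.now = next.due;
      next.run();
    }
  }
}

const clock = new VirtualClock();
const ticks: Array<[number, number]> = [];

// A 1000ms "interval" that stops after `count` ticks.
function virtualInterval(period: number, count: number): void {
  let i = 0;
  const tick = (): void => {
    ticks.push([clock.time, i++]);
    if (i < count) {
      clock.schedule(tick, period);
    }
  };
  clock.schedule(tick, period);
}

virtualInterval(1000, 3);
const ticksBeforeFlush = ticks.length; // 0: nothing has run yet
clock.flush();
// ticks: [[1000, 0], [2000, 1], [3000, 2]], produced without any real delay
```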

Example: VirtualTimeScheduler for testing

import { VirtualTimeScheduler, combineLatest, interval, of, timer } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { debounceTime, delay, map, retry, switchMap, take } from 'rxjs/operators';

// Basic VirtualTimeScheduler usage
const virtualScheduler = new VirtualTimeScheduler();

const source$ = interval(1000, virtualScheduler).pipe(take(3));

const results = [];
source$.subscribe(val => results.push(val));

// No actual time has passed yet
console.log('Before flush:', results); // []

// Fast-forward through all scheduled work
virtualScheduler.flush();

console.log('After flush:', results); // [0, 1, 2]

// TestScheduler for marble testing
const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
  // Marble syntax in run() mode: - = 1ms of virtual time, | = complete, # = error
  const source$ = cold('--a--b--c|');
  const expected =     '--a--b--c|';
  
  expectObservable(source$).toBe(expected);
});

// Testing time-based operators
testScheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('a-b-c|').pipe(
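    debounceTime(20) // 20ms of virtual time (20 frames in run mode)
  );
  // The source completes at frame 5, before the 20ms window elapses,
  // so debounceTime flushes the pending 'c' together with completion
  const expected =    '-----(c|)';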
  
  expectObservable(source$).toBe(expected);
});

// Testing with values
testScheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('--a--b--c|', { a: 1, b: 2, c: 3 }).pipe(
    map(x => x * 10)
  );
  const expected =     '--a--b--c|';
  const values = { a: 10, b: 20, c: 30 };
  
  expectObservable(source$).toBe(expected, values);
});

// Testing subscriptions
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
  const source$ = cold('--a--b--c|');
  const subs =         '^--------!';
  
  expectObservable(source$).toBe('--a--b--c|');
  expectSubscriptions(source$.subscriptions).toBe(subs);
});

// Testing async operations instantly
testScheduler.run(({ cold, expectObservable }) => {
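  // timer(1000, 1000) emits after 1s and then every 1s
  // (timer(1000) alone would emit a single value and complete)
  const source$ = timer(1000, 1000, testScheduler).pipe(
    take(3),
    map(x => x * 10)
  );
  
  // In run() mode one frame is 1ms of virtual time, so 1s spans 1000 frames
  const expected = '1s a 999ms b 999ms (c|)';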
  const values = { a: 0, b: 10, c: 20 };
  
  expectObservable(source$).toBe(expected, values);
});

// Practical: Testing debounced search
testScheduler.run(({ cold, expectObservable }) => {
  const input$ = cold('a-b-c---|');
  const search$ = input$.pipe(
    debounceTime(20, testScheduler),
    switchMap(term => cold('--r|', { r: `result:${term}` }))
  );
  
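  // The input completes at frame 8, which flushes 'c' from debounceTime;
  // the inner observable emits 'r' two frames later and then completes
  const expected = '----------r|';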
  const values = { r: 'result:c' };
  
  expectObservable(search$).toBe(expected, values);
});

// Practical: Testing retry logic
testScheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('--#', null, new Error('fail')).pipe(
    retry(2)
  );
  
  // Three attempts (1 initial + 2 retries) of 2 frames each; only the last error surfaces
  const expected = '------#';
  
  expectObservable(source$).toBe(expected, null, new Error('fail'));
});

// Practical: Testing complex timing
testScheduler.run(({ cold, hot, expectObservable }) => {
  const source1$ = hot('-a--b--c|');
  const source2$ = hot('--1--2--3|');
  
  const result$ = combineLatest([source1$, source2$]);
  
  const expected = '--x-yz-wv|';
  const values = {
    x: ['a', '1'],
    y: ['b', '1'],
    z: ['b', '2'],
    w: ['c', '2'],
    v: ['c', '3']
  };
  
  expectObservable(result$).toBe(expected, values);
});

// Practical: Testing HTTP with delays
testScheduler.run(({ cold, expectObservable }) => {
  const mockHttp = (url: string) => {
    return cold('---r|', { r: { data: url } });
  };
  
  const request$ = of('/api/users').pipe(
    delay(10, testScheduler),
    switchMap(url => mockHttp(url))
  );
  
  // 10 frames of delay, then 'r' 3 frames into the mock response, completion one frame later
  const expected = '-------------r|';
  const values = { r: { data: '/api/users' } };
  
  expectObservable(request$).toBe(expected, values);
});

// Marble diagram reference:
// '-' = one frame (1ms of virtual time inside testScheduler.run())
// '|' = completion
// '#' = error
// 'a' = emission with value 'a'
// '()' = grouping (simultaneous emissions)
// '^' = subscription start
// '!' = unsubscription
Note: TestScheduler enables marble testing - visual representation of observable streams over time. Essential for testing async RxJS code without actual delays.
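
To make the frame arithmetic in marble strings concrete, the sketch below converts a marble string into a list of timed events using the run-mode rules from the reference above. It is a simplified reader written for illustration, not the parser that TestScheduler uses; `parseMarbles` and `MarbleEvent` are hypothetical names, and only `-`, values, `|`, `#`, `^`, `(...)` groups, whitespace and time progressions such as `1s` or `999ms` are handled.

```typescript
type MarbleEvent =
  | { frame: number; kind: 'next'; value: string }
  | { frame: number; kind: 'complete' }
  | { frame: number; kind: 'error' };

const UNIT_MS: Record<string, number> = { ms: 1, s: 1000, m: 60000 };

function parseMarbles(marbles: string): MarbleEvent[] {
  const events: MarbleEvent[] = [];
  let frame = 0;       // Current virtual time in ms
  let groupStart = -1; // Frame where the open '(...)' group began, -1 if none
  let i = 0;

  while (i < marbles.length) {
    const ch = marbles[i];
    if (ch === ' ') {
      i++; // Whitespace is ignored in run mode
      continue;
    }

    // Time progression such as '1s ' or '999ms ', only at a token boundary
    if (/[0-9]/.test(ch) && (i === 0 || marbles[i - 1] === ' ')) {
      const match = /^([0-9]+(?:\.[0-9]+)?)(ms|s|m) /.exec(marbles.slice(i));
      if (match) {
        frame += Number(match[1]) * UNIT_MS[match[2]];
        i += match[0].length;
        continue;
      }
    }

    // Events inside a group all share the frame of the opening parenthesis
    const at = groupStart > -1 ? groupStart : frame;
    if (ch === '(') {
      groupStart = frame;
    } else if (ch === ')') {
      groupStart = -1;
    } else if (ch === '|') {
      events.push({ frame: at, kind: 'complete' });
    } else if (ch === '#') {
      events.push({ frame: at, kind: 'error' });
    } else if (ch !== '-' && ch !== '^') {
      events.push({ frame: at, kind: 'next', value: ch });
    }

    frame += 1; // Every non-space character occupies one frame
    i++;
  }
  return events;
}

const simple = parseMarbles('--a--b--c|').map(e => e.frame);              // [2, 5, 8, 9]
const grouped = parseMarbles('-----(c|)').map(e => e.frame);              // [5, 5]
const timed = parseMarbles('1s a 999ms b 999ms (c|)').map(e => e.frame);  // [1000, 2000, 3000, 3000]
```

Reading the `timed` result: `a` occupies frame 1000, so `999ms` after it lands `b` on frame 2000, which is why the marble string uses 999ms rather than 1s between emissions.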

Section 8 Summary

  • asyncScheduler schedules on the macro-task queue (setTimeout-like timing) - default for time-based operators
  • queueScheduler executes synchronously in FIFO queue - prevents stack overflow
  • animationFrameScheduler uses requestAnimationFrame - optimal for DOM animations, synced to the display refresh rate (typically 60fps)
  • asapScheduler uses micro-task queue (Promise-like) - high priority async execution
  • observeOn controls emission delivery, subscribeOn controls subscription execution
  • TestScheduler enables marble testing with virtual time - test async code instantly