Advanced Operators and Higher-Order Observables

1. switchMap for Latest Value Switching

Feature Syntax Description Behavior
switchMap switchMap(project: (value, index) => ObservableInput) Maps to inner observable, cancels previous inner observable on new emission Only latest inner observable emits
Cancellation Automatically unsubscribes from previous inner observable Prevents memory leaks and race conditions Cancel-on-new pattern
Use case HTTP requests, type-ahead search, latest value only When only most recent result matters Discard outdated responses

Example: switchMap for request cancellation

import { fromEvent, interval } from 'rxjs';
import { switchMap, map, ajax } from 'rxjs';

// Basic switchMap - cancels previous
interval(1000).pipe(
  switchMap(n => interval(300).pipe(
    map(i => `Outer: ${n}, Inner: ${i}`)
  ))
).subscribe(console.log);
// Each outer emission cancels previous inner observable

// Practical: Type-ahead search with cancellation
const searchBox = document.getElementById('search');

fromEvent(searchBox, 'input').pipe(
  map(event => event.target.value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term => {
    if (term.length < 2) return of([]);
    
    return ajax.getJSON(`/api/search?q=${term}`);
    // Previous search cancelled if user types again
  })
).subscribe(results => {
  displayResults(results);
});

// Practical: Auto-save with latest value only
const input$ = fromEvent(inputElement, 'input').pipe(
  map(e => e.target.value),
  debounceTime(500),
  switchMap(value => 
    ajax.post('/api/save', { value }).pipe(
      catchError(err => {
        console.error('Save failed:', err);
        return of({ error: true });
      })
    )
  )
);

input$.subscribe(response => {
  if (!response.error) {
    showSaveSuccess();
  }
});

// Practical: Dynamic data refresh
const refreshButton$ = fromEvent(refreshBtn, 'click');

refreshButton$.pipe(
  switchMap(() => ajax.getJSON('/api/data').pipe(
    catchError(err => of({ error: err }))
  ))
).subscribe(data => updateView(data));

// Practical: Tab switching
const tabClick$ = fromEvent(document, 'click').pipe(
  filter(e => e.target.classList.contains('tab')),
  map(e => e.target.dataset.tabId)
);

tabClick$.pipe(
  switchMap(tabId => ajax.getJSON(`/api/tabs/${tabId}/content`))
).subscribe(content => renderTabContent(content));

// Practical: Cascading dropdowns
const countrySelect$ = fromEvent(countryDropdown, 'change').pipe(
  map(e => e.target.value)
);

countrySelect$.pipe(
  switchMap(countryId => 
    ajax.getJSON(`/api/countries/${countryId}/states`)
  )
).subscribe(states => {
  populateStateDropdown(states);
});

// Avoid race conditions
// BAD: without switchMap
searchInput$.pipe(
  debounceTime(300),
  mergeMap(term => ajax.getJSON(`/api/search?q=${term}`))
  // Old requests can return after new ones!
).subscribe(results => display(results));

// GOOD: with switchMap
searchInput$.pipe(
  debounceTime(300),
  switchMap(term => ajax.getJSON(`/api/search?q=${term}`))
  // Old requests cancelled, only latest matters
).subscribe(results => display(results));

// Practical: Polling with cancellation
const startPolling$ = fromEvent(startBtn, 'click');
const stopPolling$ = fromEvent(stopBtn, 'click');

startPolling$.pipe(
  switchMap(() => 
    interval(5000).pipe(
      switchMap(() => ajax.getJSON('/api/status')),
      takeUntil(stopPolling$)
    )
  )
).subscribe(status => updateStatus(status));

// Practical: Real-time validation
fromEvent(emailInput, 'input').pipe(
  map(e => e.target.value),
  debounceTime(500),
  distinctUntilChanged(),
  switchMap(email => 
    ajax.post('/api/validate-email', { email }).pipe(
      map(response => ({ valid: response.valid, email })),
      catchError(() => of({ valid: false, email }))
    )
  )
).subscribe(result => {
  if (result.valid) {
    showValidIcon();
  } else {
    showInvalidIcon();
  }
});
Note: switchMap is perfect for search/autocomplete - automatically cancels outdated requests. Use when only the latest result matters.

2. mergeMap for Concurrent Inner Observable Processing

Feature Syntax Description Concurrency
mergeMap mergeMap(project, concurrent?) Maps to inner observable, subscribes to all concurrently All inner observables run in parallel
Concurrent limit mergeMap(project, 3) Limit number of concurrent inner subscriptions Control resource usage
Alias flatMap (deprecated name) Same as mergeMap Use mergeMap instead

Example: mergeMap for parallel processing

import { of, from, interval } from 'rxjs';
import { mergeMap, map, delay } from 'rxjs';

// Basic mergeMap - all run concurrently
of(1, 2, 3).pipe(
  mergeMap(n => 
    interval(1000).pipe(
      map(i => `n=${n}, i=${i}`),
      take(3)
    )
  )
).subscribe(console.log);
// All 3 inner observables run simultaneously

// Practical: Parallel HTTP requests
const userIds = [1, 2, 3, 4, 5];

from(userIds).pipe(
  mergeMap(id => ajax.getJSON(`/api/users/${id}`))
).subscribe(user => {
  console.log('User loaded:', user);
  // All 5 requests made in parallel
});

// Practical: Concurrent limit for resource control
from(userIds).pipe(
  mergeMap(
    id => ajax.getJSON(`/api/users/${id}`),
    3 // Max 3 concurrent requests
  )
).subscribe(user => processUser(user));

// Practical: File upload queue
const fileList = [file1, file2, file3, file4, file5];

from(fileList).pipe(
  mergeMap(
    file => uploadFile(file).pipe(
      map(response => ({ file, response })),
      catchError(err => of({ file, error: err }))
    ),
    2 // Upload 2 files at a time
  )
).subscribe(result => {
  if (result.error) {
    console.error('Upload failed:', result.file.name);
  } else {
    console.log('Uploaded:', result.file.name);
  }
});

// Practical: Batch processing
const items = Array.from({ length: 100 }, (_, i) => i);

from(items).pipe(
  mergeMap(
    item => processItem(item).pipe(
      delay(Math.random() * 1000) // Simulate variable processing time
    ),
    10 // Process 10 items concurrently
  )
).subscribe(result => console.log('Processed:', result));

// Practical: Multi-source data aggregation
const dataSources = [
  '/api/source1',
  '/api/source2',
  '/api/source3'
];

from(dataSources).pipe(
  mergeMap(url => ajax.getJSON(url).pipe(
    catchError(err => of({ error: err, url }))
  ))
).subscribe(data => {
  if (!data.error) {
    aggregateData(data);
  }
});

// Practical: Click handler with async operation
fromEvent(buttons, 'click').pipe(
  mergeMap(event => {
    const button = event.target;
    button.disabled = true;
    
    return ajax.post('/api/action', { id: button.dataset.id }).pipe(
      finalize(() => button.disabled = false)
    );
  })
).subscribe(response => {
  showNotification('Action completed');
});

// Practical: WebSocket message handling
const messages$ = new WebSocketSubject('ws://api.example.com');

messages$.pipe(
  mergeMap(message => {
    // Process each message independently
    return processMessage(message).pipe(
      catchError(err => {
        console.error('Message processing failed:', err);
        return of(null);
      })
    );
  })
).subscribe();

// Practical: Rate-limited API calls
const requests = Array.from({ length: 50 }, (_, i) => i);

from(requests).pipe(
  mergeMap(
    id => ajax.getJSON(`/api/items/${id}`).pipe(
      delay(100) // Add delay to respect rate limits
    ),
    5 // Max 5 concurrent
  )
).subscribe(item => storeItem(item));

// mergeMap vs switchMap comparison
// mergeMap: All searches complete
searchInput$.pipe(
  mergeMap(term => search(term))
).subscribe(results => {
  // Might show old results after new ones
});

// switchMap: Only latest search completes
searchInput$.pipe(
  switchMap(term => search(term))
).subscribe(results => {
  // Always shows latest results
});
Note: Use mergeMap when all operations should complete. Specify concurrent limit to prevent overwhelming server/client resources.

3. concatMap for Sequential Inner Observable Processing

Feature Syntax Description Ordering
concatMap concatMap(project) Maps to inner observable, waits for each to complete before starting next Sequential, FIFO order preserved
Queuing Buffers outer emissions while inner observable active Processes all emissions in order No cancellation, no concurrency
Use case Ordered operations, sequential API calls, queue processing When order matters Guaranteed sequence

Example: concatMap for sequential processing

import { of, from, interval } from 'rxjs';
import { concatMap, map, delay } from 'rxjs';

// Basic concatMap - sequential execution
of(1, 2, 3).pipe(
  concatMap(n => 
    of(n).pipe(
      delay(1000),
      map(val => val * 10)
    )
  )
).subscribe(console.log);
// Output (1s apart): 10, 20, 30

// Practical: Sequential API updates
const updates = [
  { id: 1, value: 'first' },
  { id: 2, value: 'second' },
  { id: 3, value: 'third' }
];

from(updates).pipe(
  concatMap(update => 
    ajax.post('/api/update', update).pipe(
      tap(() => console.log('Updated:', update.id))
    )
  )
).subscribe(
  response => console.log('Response:', response),
  err => console.error('Error:', err),
  () => console.log('All updates complete')
);

// Practical: Animation sequence
const animationSteps = [
  { element: box1, property: 'left', to: 200 },
  { element: box2, property: 'top', to: 100 },
  { element: box3, property: 'opacity', to: 0 }
];

from(animationSteps).pipe(
  concatMap(step => 
    animate(step.element, step.property, step.to, 500)
  )
).subscribe({
  complete: () => console.log('Animation sequence complete')
});

// Practical: Queue processor
class TaskQueue {
  private tasks$ = new Subject<Task>();
  
  constructor() {
    this.tasks$.pipe(
      concatMap(task => this.executeTask(task))
    ).subscribe(result => {
      console.log('Task completed:', result);
    });
  }
  
  enqueue(task: Task) {
    this.tasks$.next(task);
  }
  
  private executeTask(task: Task): Observable<any> {
    return of(task).pipe(
      delay(task.duration),
      map(() => ({ id: task.id, status: 'completed' }))
    );
  }
}

const queue = new TaskQueue();
queue.enqueue({ id: 1, duration: 1000 });
queue.enqueue({ id: 2, duration: 500 });
queue.enqueue({ id: 3, duration: 800 });
// Executes in order: 1, then 2, then 3

// Practical: Form submission queue
const submitButton$ = fromEvent(submitBtn, 'click');

submitButton$.pipe(
  concatMap(() => {
    const formData = getFormData();
    return ajax.post('/api/submit', formData).pipe(
      tap(() => showSuccess()),
      catchError(err => {
        showError(err);
        return of({ error: err });
      })
    );
  })
).subscribe();
// Multiple rapid clicks processed sequentially

// Practical: Database migration steps
const migrationSteps = [
  () => createTables(),
  () => insertSeedData(),
  () => createIndexes(),
  () => updateConstraints()
];

from(migrationSteps).pipe(
  concatMap(step => from(step()))
).subscribe({
  next: result => console.log('Step completed:', result),
  error: err => console.error('Migration failed:', err),
  complete: () => console.log('Migration complete')
});

// Practical: Log file writing
const logEntries$ = new Subject<string>();

logEntries$.pipe(
  concatMap(entry => 
    ajax.post('/api/log', { entry, timestamp: Date.now() })
  )
).subscribe();

// Ensures logs written in order
logEntries$.next('User logged in');
logEntries$.next('User viewed page');
logEntries$.next('User clicked button');

// Practical: Sequential file processing
const files = ['file1.txt', 'file2.txt', 'file3.txt'];

from(files).pipe(
  concatMap(filename => 
    readFile(filename).pipe(
      concatMap(content => processContent(content)),
      concatMap(processed => writeFile(filename + '.out', processed))
    )
  )
).subscribe({
  complete: () => console.log('All files processed')
});

// Order guarantee comparison
// concatMap: Guaranteed order
clicks$.pipe(
  concatMap(() => saveData())
).subscribe(); // Saves execute in click order

// mergeMap: No order guarantee
clicks$.pipe(
  mergeMap(() => saveData())
).subscribe(); // Faster save might complete first

// switchMap: Only last click
clicks$.pipe(
  switchMap(() => saveData())
).subscribe(); // Only last click's save completes
Warning: concatMap queues all emissions - can cause memory issues if inner observables are slow and outer emits rapidly. Consider mergeMap with concurrency limit instead.

4. exhaustMap for Ignore-while-busy Pattern

Feature Syntax Description Behavior
exhaustMap exhaustMap(project) Maps to inner observable, ignores new emissions while inner is active Drop new emissions while busy
Ignore pattern Only accepts new outer emission after inner completes Prevents overlapping operations First-wins strategy
Use case Login buttons, form submission, prevent double-click When operation shouldn't overlap Debounce for async ops

Example: exhaustMap to prevent duplicate operations

import { fromEvent, of } from 'rxjs';
import { exhaustMap, delay, tap } from 'rxjs';

// Basic exhaustMap - ignores while busy
const clicks$ = fromEvent(button, 'click');

clicks$.pipe(
  exhaustMap(() => 
    of('Processing...').pipe(
      delay(2000),
      tap(() => console.log('Done'))
    )
  )
).subscribe(console.log);
// Rapid clicks ignored while processing

// Practical: Login button (prevent double submission)
const loginButton$ = fromEvent(loginBtn, 'click');

loginButton$.pipe(
  exhaustMap(() => {
    loginBtn.disabled = true;
    showSpinner();
    
    return ajax.post('/api/login', {
      username: usernameInput.value,
      password: passwordInput.value
    }).pipe(
      tap(response => {
        if (response.success) {
          redirectToDashboard();
        } else {
          showError('Invalid credentials');
        }
      }),
      catchError(err => {
        showError('Login failed');
        return of({ error: err });
      }),
      finalize(() => {
        loginBtn.disabled = false;
        hideSpinner();
      })
    );
  })
).subscribe();
// Multiple clicks ignored while login in progress

// Practical: Save button (prevent concurrent saves)
const saveButton$ = fromEvent(saveBtn, 'click');

saveButton$.pipe(
  exhaustMap(() => {
    const data = getCurrentFormData();
    
    return ajax.post('/api/save', data).pipe(
      tap(() => showSuccessMessage()),
      catchError(err => {
        showErrorMessage(err);
        return of(null);
      })
    );
  })
).subscribe();
// Ignores save attempts while save in progress

// Practical: Refresh button
const refreshButton$ = fromEvent(refreshBtn, 'click');

refreshButton$.pipe(
  exhaustMap(() => {
    showRefreshSpinner();
    
    return ajax.getJSON('/api/data').pipe(
      tap(data => updateView(data)),
      finalize(() => hideRefreshSpinner())
    );
  })
).subscribe();
// Ignores refresh clicks while refresh in progress

// Practical: Infinite scroll (prevent overlapping loads)
const scroll$ = fromEvent(window, 'scroll').pipe(
  filter(() => isNearBottom()),
  debounceTime(200)
);

scroll$.pipe(
  exhaustMap(() => 
    ajax.getJSON(`/api/items?page=${currentPage++}`).pipe(
      tap(items => appendItems(items))
    )
  )
).subscribe();
// Won't load next page until current page loaded

// Practical: File download (prevent duplicate downloads)
const downloadLinks$ = fromEvent(document, 'click').pipe(
  filter(e => e.target.classList.contains('download-link')),
  map(e => e.target.href)
);

downloadLinks$.pipe(
  exhaustMap(url => {
    showDownloadProgress();
    
    return ajax({
      url,
      method: 'GET',
      responseType: 'blob'
    }).pipe(
      tap(response => saveFile(response.response)),
      finalize(() => hideDownloadProgress())
    );
  })
).subscribe();

// Practical: Modal open (prevent opening multiple)
const openModalButton$ = fromEvent(openBtn, 'click');

openModalButton$.pipe(
  exhaustMap(() => {
    return showModal().pipe(
      // Modal returns observable that completes when closed
      tap(() => console.log('Modal opened')),
      switchMap(() => modalClosed$)
    );
  })
).subscribe(() => console.log('Modal closed'));
// Can't open new modal while one is open

// Practical: Payment processing
const payButton$ = fromEvent(payBtn, 'click');

payButton$.pipe(
  exhaustMap(() => {
    const paymentData = getPaymentData();
    payBtn.disabled = true;
    
    return ajax.post('/api/payment', paymentData).pipe(
      tap(response => {
        if (response.success) {
          showPaymentSuccess();
        } else {
          showPaymentError();
        }
      }),
      catchError(err => {
        showPaymentError(err);
        return of({ error: err });
      }),
      finalize(() => payBtn.disabled = false)
    );
  })
).subscribe();
// Critical: prevents double payment

// Comparison with other operators
// exhaustMap: Ignore new while busy
loginBtn$.pipe(
  exhaustMap(() => login())
); // Clicks during login ignored

// switchMap: Cancel old, start new
searchInput$.pipe(
  switchMap(term => search(term))
); // Old search cancelled

// concatMap: Queue all
submitBtn$.pipe(
  concatMap(() => submit())
); // All clicks queued and processed

// mergeMap: Run all concurrently
uploadBtn$.pipe(
  mergeMap(() => upload())
); // All clicks trigger concurrent uploads
Note: exhaustMap is perfect for login/submit buttons - ignores rapid clicks while operation in progress. Prevents duplicate submissions naturally.

5. expand for Recursive Observable Generation

Feature Syntax Description Pattern
expand expand(project, concurrent?, scheduler?) Recursively projects each emission to observable, subscribes to results Breadth-first expansion
Recursion Each emission fed back as input to projection function Continues until EMPTY returned Tree traversal, pagination
Termination Return EMPTY to stop recursion Must have base case to prevent infinite loop Conditional completion

Example: expand for recursive operations

import { of, EMPTY } from 'rxjs';
import { expand, map, delay, take } from 'rxjs';

// Basic expand - countdown
of(5).pipe(
  expand(n => n > 0 ? of(n - 1).pipe(delay(1000)) : EMPTY)
).subscribe(console.log);
// Output (1s apart): 5, 4, 3, 2, 1, 0

// Practical: Paginated API loading
function loadAllPages(page = 1) {
  return ajax.getJSON(`/api/items?page=${page}`).pipe(
    expand(response => 
      response.hasMore 
        ? ajax.getJSON(`/api/items?page=${response.nextPage}`)
        : EMPTY
    )
  );
}

loadAllPages().subscribe(
  response => {
    appendItems(response.items);
    console.log('Loaded page:', response.page);
  },
  err => console.error('Error:', err),
  () => console.log('All pages loaded')
);

// Practical: Exponential backoff retry
function retryWithBackoff(source$, maxRetries = 5) {
  return of({ attempt: 0 }).pipe(
    expand(state => 
      state.attempt < maxRetries
        ? source$.pipe(
            catchError(err => {
              const delay = Math.pow(2, state.attempt) * 1000;
              console.log(`Retry ${state.attempt + 1} in ${delay}ms`);
              return of({ attempt: state.attempt + 1 }).pipe(
                delay(delay)
              );
            })
          )
        : EMPTY
    ),
    filter(result => !result.attempt) // Only emit actual results
  );
}

retryWithBackoff(
  ajax.getJSON('/api/unreliable')
).subscribe(data => console.log('Success:', data));

// Practical: Tree traversal (file system)
interface FileNode {
  name: string;
  type: 'file' | 'folder';
  children?: FileNode[];
}

function traverseFileTree(node: FileNode): Observable<FileNode> {
  return of(node).pipe(
    expand(n => 
      n.type === 'folder' && n.children
        ? from(n.children)
        : EMPTY
    )
  );
}

const root = {
  name: 'root',
  type: 'folder',
  children: [
    { name: 'file1.txt', type: 'file' },
    {
      name: 'subfolder',
      type: 'folder',
      children: [
        { name: 'file2.txt', type: 'file' }
      ]
    }
  ]
};

traverseFileTree(root).subscribe(node => {
  console.log('Found:', node.name);
});

// Practical: Polling with increasing intervals
of(1000).pipe(
  expand(interval => {
    return ajax.getJSON('/api/status').pipe(
      delay(interval),
      map(() => Math.min(interval * 1.5, 30000)) // Max 30s
    );
  }),
  take(10) // Limit total polls
).subscribe();

// Practical: Fibonacci sequence generator
of([0, 1]).pipe(
  expand(([prev, curr]) => 
    curr < 100
      ? of([curr, prev + curr]).pipe(delay(100))
      : EMPTY
  ),
  map(([_, curr]) => curr)
).subscribe(n => console.log('Fibonacci:', n));

// Practical: Recursive API data collection
function fetchUserWithFriends(userId: number): Observable<User> {
  return ajax.getJSON(`/api/users/${userId}`).pipe(
    expand(user => 
      user.friends && user.friends.length > 0
        ? from(user.friends).pipe(
            mergeMap(friendId => 
              ajax.getJSON(`/api/users/${friendId}`)
            )
          )
        : EMPTY
    ),
    take(50) // Prevent infinite expansion
  );
}

// Practical: Breadth-first search
interface Node {
  id: number;
  neighbors: number[];
}

function bfs(startId: number, graph: Map<number, Node>): Observable<Node> {
  const visited = new Set<number>();
  
  return of(graph.get(startId)).pipe(
    expand(node => {
      visited.add(node.id);
      return from(node.neighbors).pipe(
        filter(id => !visited.has(id)),
        map(id => graph.get(id)),
        filter(n => n !== undefined)
      );
    })
  );
}

// Practical: Dynamic form field generation
of({ level: 1, maxLevel: 5 }).pipe(
  expand(state => 
    state.level < state.maxLevel
      ? ajax.getJSON(`/api/form-fields?level=${state.level}`).pipe(
          tap(fields => renderFormFields(fields)),
          map(() => ({ level: state.level + 1, maxLevel: state.maxLevel }))
        )
      : EMPTY
  )
).subscribe({
  complete: () => console.log('All form fields loaded')
});

// Practical: Power calculation with logging
of(2).pipe(
  expand(n => n < 1024 ? of(n * 2) : EMPTY),
  tap(n => console.log('Power of 2:', n))
).subscribe({
  complete: () => console.log('Complete')
});
// Output: 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024
Warning: Always include a termination condition (return EMPTY) in expand to prevent infinite loops. Use take() or similar as a safety limit.

6. window and windowTime for Grouped Emission Windows

Operator Syntax Description Output
window window(windowBoundaries$) Groups emissions into nested observables based on boundary emissions Observable of Observables
windowTime windowTime(windowTimeSpan, windowCreationInterval?) Groups emissions into time-based windows Observable of Observables
windowCount windowCount(windowSize, startWindowEvery?) Groups emissions into count-based windows Observable of Observables

Example: window operators for grouped processing

import { interval, fromEvent } from 'rxjs';
import { window, windowTime, windowCount, mergeAll, take, map, reduce } from 'rxjs';

// Basic window with boundary observable
const source$ = interval(1000);
const boundary$ = interval(3000);

source$.pipe(
  take(10),
  window(boundary$),
  mergeAll() // Flatten windows
).subscribe(console.log);

// Process each window
source$.pipe(
  take(10),
  window(boundary$),
  map(window$ => window$.pipe(
    reduce((acc, val) => acc + val, 0)
  )),
  mergeAll()
).subscribe(sum => console.log('Window sum:', sum));

// windowTime - time-based grouping
interval(500).pipe(
  take(10),
  windowTime(2000), // 2-second windows
  map((window$, index) => {
    console.log('Window', index, 'opened');
    return window$.pipe(
      reduce((acc, val) => [...acc, val], [])
    );
  }),
  mergeAll()
).subscribe(values => {
  console.log('Window values:', values);
});

// Practical: Batch click processing
const clicks$ = fromEvent(document, 'click');

clicks$.pipe(
  windowTime(1000), // 1-second windows
  map(window$ => window$.pipe(
    reduce((count) => count + 1, 0)
  )),
  mergeAll()
).subscribe(count => {
  if (count > 0) {
    console.log(`${count} clicks in last second`);
  }
});

// windowCount - count-based grouping
interval(1000).pipe(
  take(10),
  windowCount(3), // Groups of 3
  map((window$, index) => {
    console.log('Batch', index);
    return window$.pipe(
      reduce((acc, val) => [...acc, val], [])
    );
  }),
  mergeAll()
).subscribe(batch => {
  console.log('Processing batch:', batch);
});

// Practical: Real-time metrics aggregation
const metrics$ = interval(100).pipe(
  map(() => ({
    cpu: Math.random() * 100,
    memory: Math.random() * 100,
    timestamp: Date.now()
  }))
);

metrics$.pipe(
  windowTime(5000), // 5-second windows
  map(window$ => window$.pipe(
    reduce((acc, metric) => ({
      avgCpu: (acc.avgCpu + metric.cpu) / 2,
      avgMemory: (acc.avgMemory + metric.memory) / 2,
      count: acc.count + 1
    }), { avgCpu: 0, avgMemory: 0, count: 0 })
  )),
  mergeAll()
).subscribe(stats => {
  console.log('5s stats:', stats);
  sendToMonitoring(stats);
});

// Practical: Message batching for bulk API
const messages$ = new Subject<Message>();

messages$.pipe(
  windowTime(2000), // Batch every 2 seconds
  mergeMap(window$ => window$.pipe(
    reduce((acc, msg) => [...acc, msg], [])
  )),
  filter(batch => batch.length > 0),
  mergeMap(batch => 
    ajax.post('/api/messages/bulk', { messages: batch })
  )
).subscribe(() => {
  console.log('Batch sent');
});

// Send individual messages
messages$.next({ text: 'Hello' });
messages$.next({ text: 'World' });
// Automatically batched and sent every 2s

// Practical: Analytics event batching
const analyticsEvents$ = new Subject<AnalyticsEvent>();

analyticsEvents$.pipe(
  windowTime(10000), // 10-second windows
  mergeMap(window$ => window$.pipe(
    reduce((events, event) => [...events, event], [])
  )),
  filter(events => events.length > 0)
).subscribe(events => {
  sendAnalyticsBatch(events);
  console.log(`Sent ${events.length} analytics events`);
});

// Practical: Sliding window for moving average
interval(1000).pipe(
  take(20),
  map(() => Math.random() * 100),
  windowTime(5000, 1000), // 5s window, new window every 1s
  mergeMap(window$ => window$.pipe(
    reduce((acc, val) => [...acc, val], []),
    map(values => {
      const sum = values.reduce((a, b) => a + b, 0);
      return sum / values.length;
    })
  ))
).subscribe(avg => {
  console.log('5-second moving average:', avg.toFixed(2));
});

// Practical: Request rate limiting
const requests$ = new Subject<Request>();

requests$.pipe(
  windowTime(1000), // 1-second windows
  mergeMap(window$ => window$.pipe(
    take(10) // Max 10 requests per second
  ))
).subscribe(request => {
  processRequest(request);
});

// Practical: Grouped error tracking
const errors$ = new Subject<Error>();

errors$.pipe(
  windowTime(60000), // 1-minute windows
  mergeMap(window$ => window$.pipe(
    reduce((acc, error) => {
      acc[error.type] = (acc[error.type] || 0) + 1;
      return acc;
    }, {})
  ))
).subscribe(errorCounts => {
  console.log('Error counts (last minute):', errorCounts);
  if (Object.values(errorCounts).some(count => count > 10)) {
    triggerAlert('High error rate detected');
  }
});
Note: window operators return Observable of Observables (higher-order). Use mergeAll, switchAll, or map+mergeAll to process windows. Similar to buffer but with nested observables.

Section 9 Summary

  • switchMap cancels previous inner observable - perfect for search/autocomplete (only latest matters)
  • mergeMap runs all inner observables concurrently - use concurrent parameter to limit parallelism
  • concatMap queues and processes sequentially in order - when sequence matters
  • exhaustMap ignores new emissions while busy - ideal for login/submit buttons
  • expand recursively applies projection - pagination, tree traversal, always return EMPTY to terminate
  • window/windowTime groups emissions into nested observables - batch processing, metrics aggregation