Advanced Operators and Higher-Order Observables
1. switchMap for Latest Value Switching
| Feature | Syntax | Description | Behavior |
|---|---|---|---|
| switchMap | switchMap(project: (value, index) => ObservableInput) | Maps to inner observable, cancels previous inner observable on new emission | Only latest inner observable emits |
| Cancellation | Automatically unsubscribes from previous inner observable | Prevents memory leaks and race conditions | Cancel-on-new pattern |
| Use case | HTTP requests, type-ahead search, latest value only | When only most recent result matters | Discard outdated responses |
Example: switchMap for request cancellation
import { fromEvent, interval, of } from 'rxjs';
import { switchMap, map, debounceTime, distinctUntilChanged, catchError, filter, takeUntil } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Basic switchMap - cancels previous
interval(1000).pipe(
switchMap(n => interval(300).pipe(
map(i => `Outer: ${n}, Inner: ${i}`)
))
).subscribe(console.log);
// Each outer emission cancels previous inner observable
// Practical: Type-ahead search with cancellation
const searchBox = document.getElementById('search');
fromEvent(searchBox, 'input').pipe(
map(event => event.target.value),
debounceTime(300),
distinctUntilChanged(),
switchMap(term => {
if (term.length < 2) return of([]);
return ajax.getJSON(`/api/search?q=${encodeURIComponent(term)}`);
// Previous search cancelled if user types again
})
).subscribe(results => {
displayResults(results);
});
// Practical: Auto-save with latest value only
const input$ = fromEvent(inputElement, 'input').pipe(
map(e => e.target.value),
debounceTime(500),
switchMap(value =>
ajax.post('/api/save', { value }).pipe(
catchError(err => {
console.error('Save failed:', err);
return of({ error: true });
})
)
)
);
input$.subscribe(response => {
if (!response.error) {
showSaveSuccess();
}
});
// Practical: Dynamic data refresh
const refreshButton$ = fromEvent(refreshBtn, 'click');
refreshButton$.pipe(
switchMap(() => ajax.getJSON('/api/data').pipe(
catchError(err => of({ error: err }))
))
).subscribe(data => updateView(data));
// Practical: Tab switching
const tabClick$ = fromEvent(document, 'click').pipe(
filter(e => e.target.classList.contains('tab')),
map(e => e.target.dataset.tabId)
);
tabClick$.pipe(
switchMap(tabId => ajax.getJSON(`/api/tabs/${tabId}/content`))
).subscribe(content => renderTabContent(content));
// Practical: Cascading dropdowns
const countrySelect$ = fromEvent(countryDropdown, 'change').pipe(
map(e => e.target.value)
);
countrySelect$.pipe(
switchMap(countryId =>
ajax.getJSON(`/api/countries/${countryId}/states`)
)
).subscribe(states => {
populateStateDropdown(states);
});
// Avoid race conditions
// BAD: without switchMap
searchInput$.pipe(
debounceTime(300),
mergeMap(term => ajax.getJSON(`/api/search?q=${encodeURIComponent(term)}`))
// Old requests can return after new ones!
).subscribe(results => display(results));
// GOOD: with switchMap
searchInput$.pipe(
debounceTime(300),
switchMap(term => ajax.getJSON(`/api/search?q=${encodeURIComponent(term)}`))
// Old requests cancelled, only latest matters
).subscribe(results => display(results));
// Practical: Polling with cancellation
const startPolling$ = fromEvent(startBtn, 'click');
const stopPolling$ = fromEvent(stopBtn, 'click');
startPolling$.pipe(
switchMap(() =>
interval(5000).pipe(
switchMap(() => ajax.getJSON('/api/status')),
takeUntil(stopPolling$)
)
)
).subscribe(status => updateStatus(status));
// Practical: Real-time validation
fromEvent(emailInput, 'input').pipe(
map(e => e.target.value),
debounceTime(500),
distinctUntilChanged(),
switchMap(email =>
ajax.post('/api/validate-email', { email }).pipe(
// ajax.post emits an AjaxResponse; the parsed body is in its .response property
map(({ response }) => ({ valid: response.valid, email })),
catchError(() => of({ valid: false, email }))
)
)
).subscribe(result => {
if (result.valid) {
showValidIcon();
} else {
showInvalidIcon();
}
});
Note: switchMap is well suited to search/autocomplete because it automatically unsubscribes from outdated requests. Use it when only the latest result matters.
2. mergeMap for Concurrent Inner Observable Processing
| Feature | Syntax | Description | Concurrency |
|---|---|---|---|
| mergeMap | mergeMap(project, concurrent?) | Maps to inner observable, subscribes to all concurrently | All inner observables run in parallel |
| Concurrent limit | mergeMap(project, 3) | Limit number of concurrent inner subscriptions | Control resource usage |
| Alias | flatMap (deprecated name) | Same as mergeMap | Use mergeMap instead |
Example: mergeMap for parallel processing
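import { of, from, fromEvent, interval } from 'rxjs';
import { mergeMap, map, delay, take, catchError, finalize } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { WebSocketSubject } from 'rxjs/webSocket';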
// Basic mergeMap - all run concurrently
of(1, 2, 3).pipe(
mergeMap(n =>
interval(1000).pipe(
map(i => `n=${n}, i=${i}`),
take(3)
)
)
).subscribe(console.log);
// All 3 inner observables run simultaneously
// Practical: Parallel HTTP requests
const userIds = [1, 2, 3, 4, 5];
from(userIds).pipe(
mergeMap(id => ajax.getJSON(`/api/users/${id}`))
).subscribe(user => {
console.log('User loaded:', user);
// All 5 requests made in parallel
});
// Practical: Concurrent limit for resource control
from(userIds).pipe(
mergeMap(
id => ajax.getJSON(`/api/users/${id}`),
3 // Max 3 concurrent requests
)
).subscribe(user => processUser(user));
// Practical: File upload queue
const fileList = [file1, file2, file3, file4, file5];
from(fileList).pipe(
mergeMap(
file => uploadFile(file).pipe(
map(response => ({ file, response })),
catchError(err => of({ file, error: err }))
),
2 // Upload 2 files at a time
)
).subscribe(result => {
if (result.error) {
console.error('Upload failed:', result.file.name);
} else {
console.log('Uploaded:', result.file.name);
}
});
// Practical: Batch processing
const items = Array.from({ length: 100 }, (_, i) => i);
from(items).pipe(
mergeMap(
item => processItem(item).pipe(
delay(Math.random() * 1000) // Simulate variable processing time
),
10 // Process 10 items concurrently
)
).subscribe(result => console.log('Processed:', result));
// Practical: Multi-source data aggregation
const dataSources = [
'/api/source1',
'/api/source2',
'/api/source3'
];
from(dataSources).pipe(
mergeMap(url => ajax.getJSON(url).pipe(
catchError(err => of({ error: err, url }))
))
).subscribe(data => {
if (!data.error) {
aggregateData(data);
}
});
// Practical: Click handler with async operation
fromEvent(buttons, 'click').pipe(
mergeMap(event => {
const button = event.target;
button.disabled = true;
return ajax.post('/api/action', { id: button.dataset.id }).pipe(
finalize(() => button.disabled = false)
);
})
).subscribe(response => {
showNotification('Action completed');
});
// Practical: WebSocket message handling
const messages$ = new WebSocketSubject('ws://api.example.com');
messages$.pipe(
mergeMap(message => {
// Process each message independently
return processMessage(message).pipe(
catchError(err => {
console.error('Message processing failed:', err);
return of(null);
})
);
})
).subscribe();
// Practical: Rate-limited API calls
const requests = Array.from({ length: 50 }, (_, i) => i);
from(requests).pipe(
mergeMap(
id => ajax.getJSON(`/api/items/${id}`).pipe(
delay(100) // Keeps each slot occupied about 100 ms longer: a crude throttle, not a strict rate limit
),
5 // Max 5 concurrent
)
).subscribe(item => storeItem(item));
// mergeMap vs switchMap comparison
// mergeMap: All searches complete
searchInput$.pipe(
mergeMap(term => search(term))
).subscribe(results => {
// Might show old results after new ones
});
// switchMap: Only latest search completes
searchInput$.pipe(
switchMap(term => search(term))
).subscribe(results => {
// Always shows latest results
});
Note: Use mergeMap when every operation should run to completion and result order does not matter. Specify a concurrency limit to avoid overwhelming server or client resources.
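To make the effect of the concurrency argument concrete, the sketch below models it without RxJS: a source that emits all items at once (as from(array) does) and inner tasks of known duration. The names simulateConcurrencyLimit and completionOrder, and the time units, are hypothetical and chosen only for illustration. The model mirrors the rule that at most `limit` inner subscriptions are active while the remaining values wait in FIFO order.

```typescript
// Hypothetical model, not RxJS: task i takes durations[i] time units.
interface TaskTiming {
  index: number;
  start: number;
  end: number;
}

function simulateConcurrencyLimit(durations: number[], limit: number): TaskTiming[] {
  if (limit < 1) {
    throw new Error('limit must be at least 1');
  }
  // End times of the tasks currently occupying a slot.
  const running: number[] = [];
  const timings: TaskTiming[] = [];

  durations.forEach((duration, index) => {
    let start = 0;
    if (running.length >= limit) {
      // All slots busy: wait for the earliest running task to finish.
      running.sort((a, b) => a - b);
      start = running.shift() as number;
    }
    const end = start + duration;
    running.push(end);
    timings.push({ index, start, end });
  });

  return timings;
}

// Order in which results would arrive (ties broken by source order).
function completionOrder(timings: TaskTiming[]): number[] {
  return [...timings]
    .sort((a, b) => a.end - b.end || a.index - b.index)
    .map(t => t.index);
}

const timings = simulateConcurrencyLimit([3, 1, 2, 1], 2);
console.log(timings.map(t => `${t.index}: ${t.start}-${t.end}`).join(', '));
// 0: 0-3, 1: 0-1, 2: 1-3, 3: 3-4
console.log(completionOrder(timings));
// [ 1, 0, 2, 3 ]
```

With a limit of 2, the third task cannot start until the short second task frees its slot, and results arrive in completion order rather than source order.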
3. concatMap for Sequential Inner Observable Processing
| Feature | Syntax | Description | Ordering |
|---|---|---|---|
| concatMap | concatMap(project) | Maps to inner observable, waits for each to complete before starting next | Sequential, FIFO order preserved |
| Queuing | Buffers outer emissions while inner observable active | Processes all emissions in order | No cancellation, no concurrency |
| Use case | Ordered operations, sequential API calls, queue processing | When order matters | Guaranteed sequence |
Example: concatMap for sequential processing
import { of, from, fromEvent, Subject, Observable } from 'rxjs';
import { concatMap, mergeMap, switchMap, map, delay, tap, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Basic concatMap - sequential execution
of(1, 2, 3).pipe(
concatMap(n =>
of(n).pipe(
delay(1000),
map(val => val * 10)
)
)
).subscribe(console.log);
// Output (1s apart): 10, 20, 30
// Practical: Sequential API updates
const updates = [
{ id: 1, value: 'first' },
{ id: 2, value: 'second' },
{ id: 3, value: 'third' }
];
from(updates).pipe(
concatMap(update =>
ajax.post('/api/update', update).pipe(
tap(() => console.log('Updated:', update.id))
)
)
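).subscribe({
// Observer object form; separate callback arguments are deprecated in RxJS 7
next: response => console.log('Response:', response),
error: err => console.error('Error:', err),
complete: () => console.log('All updates complete')
});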
// Practical: Animation sequence
const animationSteps = [
{ element: box1, property: 'left', to: 200 },
{ element: box2, property: 'top', to: 100 },
{ element: box3, property: 'opacity', to: 0 }
];
from(animationSteps).pipe(
concatMap(step =>
animate(step.element, step.property, step.to, 500)
)
).subscribe({
complete: () => console.log('Animation sequence complete')
});
// Practical: Queue processor
interface Task {
id: number;
duration: number; // Simulated processing time in ms
}
class TaskQueue {
private tasks$ = new Subject<Task>();
constructor() {
this.tasks$.pipe(
concatMap(task => this.executeTask(task))
).subscribe(result => {
console.log('Task completed:', result);
});
}
enqueue(task: Task) {
this.tasks$.next(task);
}
private executeTask(task: Task): Observable<any> {
return of(task).pipe(
delay(task.duration),
map(() => ({ id: task.id, status: 'completed' }))
);
}
}
const queue = new TaskQueue();
queue.enqueue({ id: 1, duration: 1000 });
queue.enqueue({ id: 2, duration: 500 });
queue.enqueue({ id: 3, duration: 800 });
// Executes in order: 1, then 2, then 3
// Practical: Form submission queue
const submitButton$ = fromEvent(submitBtn, 'click');
submitButton$.pipe(
concatMap(() => {
const formData = getFormData();
return ajax.post('/api/submit', formData).pipe(
tap(() => showSuccess()),
catchError(err => {
showError(err);
return of({ error: err });
})
);
})
).subscribe();
// Multiple rapid clicks processed sequentially
// Practical: Database migration steps
const migrationSteps = [
() => createTables(),
() => insertSeedData(),
() => createIndexes(),
() => updateConstraints()
];
from(migrationSteps).pipe(
concatMap(step => from(step()))
).subscribe({
next: result => console.log('Step completed:', result),
error: err => console.error('Migration failed:', err),
complete: () => console.log('Migration complete')
});
// Practical: Log file writing
const logEntries$ = new Subject<string>();
logEntries$.pipe(
concatMap(entry =>
ajax.post('/api/log', { entry, timestamp: Date.now() })
)
).subscribe();
// Ensures logs written in order
logEntries$.next('User logged in');
logEntries$.next('User viewed page');
logEntries$.next('User clicked button');
// Practical: Sequential file processing
const files = ['file1.txt', 'file2.txt', 'file3.txt'];
from(files).pipe(
concatMap(filename =>
readFile(filename).pipe(
concatMap(content => processContent(content)),
concatMap(processed => writeFile(filename + '.out', processed))
)
)
).subscribe({
complete: () => console.log('All files processed')
});
// Order guarantee comparison
// concatMap: Guaranteed order
clicks$.pipe(
concatMap(() => saveData())
).subscribe(); // Saves execute in click order
// mergeMap: No order guarantee
clicks$.pipe(
mergeMap(() => saveData())
).subscribe(); // Faster save might complete first
// switchMap: Only last click
clicks$.pipe(
switchMap(() => saveData())
).subscribe(); // Only last click's save completes
Warning: concatMap queues every outer emission, so the queue can grow without bound (and exhaust memory) when inner observables are slow and the outer observable emits rapidly. If strict ordering is not required, consider mergeMap with a concurrency limit instead.
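A rough way to see the queue growth described in this warning is to count how many earlier values are still unfinished each time a new outer value arrives. The sketch below is a dependency-free model under simplifying assumptions (outer emissions at a fixed period, every inner observable taking the same time). backlogAtEachEmission is a hypothetical helper, not an RxJS API.

```typescript
// Hypothetical model: the outer source emits `count` values, one every
// `emitEveryMs`; each inner observable needs `innerMs` and they run one at a time.
function backlogAtEachEmission(count: number, emitEveryMs: number, innerMs: number): number[] {
  const backlog: number[] = [];
  const endTimes: number[] = []; // completion time of each inner, in source order

  for (let i = 0; i < count; i++) {
    const arrival = i * emitEveryMs;
    const previousEnd = i > 0 ? endTimes[i - 1] : 0;
    // Sequential processing: start once the value has arrived AND the previous inner finished.
    const start = Math.max(arrival, previousEnd);
    endTimes.push(start + innerMs);
    // Earlier values that are still queued or running at this arrival.
    const pending = endTimes.slice(0, i).filter(end => end > arrival).length;
    backlog.push(pending);
  }

  return backlog;
}

// Inner work (300 ms) slower than the source (100 ms): the backlog keeps growing.
console.log(backlogAtEachEmission(6, 100, 300)); // [ 0, 1, 2, 2, 3, 4 ]

// Inner work faster than the source: nothing ever waits.
console.log(backlogAtEachEmission(6, 300, 100)); // [ 0, 0, 0, 0, 0, 0 ]
```

In the first case the backlog rises for as long as the source keeps emitting, which is the situation where a bounded strategy is worth considering.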
4. exhaustMap for Ignore-while-busy Pattern
| Feature | Syntax | Description | Behavior |
|---|---|---|---|
| exhaustMap | exhaustMap(project) | Maps to inner observable, ignores new emissions while inner is active | Drop new emissions while busy |
| Ignore pattern | Only accepts new outer emission after inner completes | Prevents overlapping operations | First-wins strategy |
| Use case | Login buttons, form submission, prevent double-click | When operation shouldn't overlap | Debounce for async ops |
Example: exhaustMap to prevent duplicate operations
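import { fromEvent, of } from 'rxjs';
import { exhaustMap, switchMap, map, filter, delay, tap, take, debounceTime, catchError, finalize } from 'rxjs';
import { ajax } from 'rxjs/ajax';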
// Basic exhaustMap - ignores while busy
const clicks$ = fromEvent(button, 'click');
clicks$.pipe(
exhaustMap(() =>
of('Processing...').pipe(
delay(2000),
tap(() => console.log('Done'))
)
)
).subscribe(console.log);
// Rapid clicks ignored while processing
// Practical: Login button (prevent double submission)
const loginButton$ = fromEvent(loginBtn, 'click');
loginButton$.pipe(
exhaustMap(() => {
loginBtn.disabled = true;
showSpinner();
return ajax.post('/api/login', {
username: usernameInput.value,
password: passwordInput.value
}).pipe(
tap(({ response }) => { // The parsed body is in AjaxResponse.response
if (response.success) {
redirectToDashboard();
} else {
showError('Invalid credentials');
}
}),
catchError(err => {
showError('Login failed');
return of({ error: err });
}),
finalize(() => {
loginBtn.disabled = false;
hideSpinner();
})
);
})
).subscribe();
// Multiple clicks ignored while login in progress
// Practical: Save button (prevent concurrent saves)
const saveButton$ = fromEvent(saveBtn, 'click');
saveButton$.pipe(
exhaustMap(() => {
const data = getCurrentFormData();
return ajax.post('/api/save', data).pipe(
tap(() => showSuccessMessage()),
catchError(err => {
showErrorMessage(err);
return of(null);
})
);
})
).subscribe();
// Ignores save attempts while save in progress
// Practical: Refresh button
const refreshButton$ = fromEvent(refreshBtn, 'click');
refreshButton$.pipe(
exhaustMap(() => {
showRefreshSpinner();
return ajax.getJSON('/api/data').pipe(
tap(data => updateView(data)),
finalize(() => hideRefreshSpinner())
);
})
).subscribe();
// Ignores refresh clicks while refresh in progress
// Practical: Infinite scroll (prevent overlapping loads)
const scroll$ = fromEvent(window, 'scroll').pipe(
filter(() => isNearBottom()),
debounceTime(200)
);
scroll$.pipe(
exhaustMap(() =>
ajax.getJSON(`/api/items?page=${currentPage++}`).pipe(
tap(items => appendItems(items))
)
)
).subscribe();
// Won't load next page until current page loaded
// Practical: File download (prevent duplicate downloads)
const downloadLinks$ = fromEvent(document, 'click').pipe(
filter(e => e.target.classList.contains('download-link')),
map(e => e.target.href)
);
downloadLinks$.pipe(
exhaustMap(url => {
showDownloadProgress();
return ajax({
url,
method: 'GET',
responseType: 'blob'
}).pipe(
tap(response => saveFile(response.response)),
finalize(() => hideDownloadProgress())
);
})
).subscribe();
// Practical: Modal open (prevent opening multiple)
const openModalButton$ = fromEvent(openBtn, 'click');
openModalButton$.pipe(
exhaustMap(() => {
return showModal().pipe(
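// showModal() emits once the modal is open; then wait for the first close event
tap(() => console.log('Modal opened')),
switchMap(() => modalClosed$.pipe(take(1))) // take(1) completes the inner so the modal can be reopened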
);
})
).subscribe(() => console.log('Modal closed'));
// Can't open new modal while one is open
// Practical: Payment processing
const payButton$ = fromEvent(payBtn, 'click');
payButton$.pipe(
exhaustMap(() => {
const paymentData = getPaymentData();
payBtn.disabled = true;
return ajax.post('/api/payment', paymentData).pipe(
tap(({ response }) => { // The parsed body is in AjaxResponse.response
if (response.success) {
showPaymentSuccess();
} else {
showPaymentError();
}
}),
catchError(err => {
showPaymentError(err);
return of({ error: err });
}),
finalize(() => payBtn.disabled = false)
);
})
).subscribe();
// Critical: prevents double payment
// Comparison with other operators
// exhaustMap: Ignore new while busy
loginBtn$.pipe(
exhaustMap(() => login())
); // Clicks during login ignored
// switchMap: Cancel old, start new
searchInput$.pipe(
switchMap(term => search(term))
); // Old search cancelled
// concatMap: Queue all
submitBtn$.pipe(
concatMap(() => submit())
); // All clicks queued and processed
// mergeMap: Run all concurrently
uploadBtn$.pipe(
mergeMap(() => upload())
); // All clicks trigger concurrent uploads
Note: exhaustMap is well suited to login/submit buttons because it ignores rapid clicks while an operation is in progress, which prevents duplicate submissions.
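The four flattening strategies compared above can be lined up on a single timeline. The sketch below is a simplified, dependency-free model rather than RxJS itself: each outer emission starts one inner task that delivers exactly one result after innerMs. The names simulate, Strategy, and Delivery, the sample timings, and the handling of exact ties are all illustrative assumptions.

```typescript
type Strategy = 'switch' | 'merge' | 'concat' | 'exhaust';

interface Delivery {
  source: number; // index of the outer emission that produced the result
  at: number;     // time (ms) at which the result is delivered
}

// Hypothetical model: outer emissions arrive at `arrivals` (ascending, ms);
// each inner task delivers one result `innerMs` after it starts.
function simulate(strategy: Strategy, arrivals: number[], innerMs: number): Delivery[] {
  const deliveries: Delivery[] = [];
  let busyUntil = -Infinity; // end time of the last started inner task (concat/exhaust)

  arrivals.forEach((arrival, source) => {
    const next = arrivals[source + 1];
    switch (strategy) {
      case 'merge':
        // Every inner task runs; nothing is cancelled or delayed.
        deliveries.push({ source, at: arrival + innerMs });
        break;
      case 'switch':
        // Delivered only if no newer outer value arrives before it finishes.
        if (next === undefined || next >= arrival + innerMs) {
          deliveries.push({ source, at: arrival + innerMs });
        }
        break;
      case 'concat': {
        // Starts once the previous inner task has finished.
        const start = Math.max(arrival, busyUntil);
        busyUntil = start + innerMs;
        deliveries.push({ source, at: busyUntil });
        break;
      }
      case 'exhaust':
        // Ignored while an earlier inner task is still running.
        if (arrival >= busyUntil) {
          busyUntil = arrival + innerMs;
          deliveries.push({ source, at: busyUntil });
        }
        break;
    }
  });

  return deliveries.sort((a, b) => a.at - b.at || a.source - b.source);
}

const clicks = [0, 100, 250, 900];
for (const s of ['switch', 'merge', 'concat', 'exhaust'] as Strategy[]) {
  const line = simulate(s, clicks, 300).map(d => `#${d.source}@${d.at}`).join(' ');
  console.log(`${s}: ${line}`);
}
// switch: #2@550 #3@1200
// merge: #0@300 #1@400 #2@550 #3@1200
// concat: #0@300 #1@600 #2@900 #3@1200
// exhaust: #0@300 #3@1200
```

Reading the four lines side by side shows the trade-off: switch and exhaust drop work (the older or the newer request respectively), while merge and concat keep everything and differ only in whether tasks overlap.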
5. expand for Recursive Observable Generation
| Feature | Syntax | Description | Pattern |
|---|---|---|---|
| expand | expand(project, concurrent?, scheduler?) | Recursively projects each emission to an observable and merges the results into the output | Recursive expansion (emission order depends on inner timing and the concurrent limit) |
| Recursion | Each emission fed back as input to projection function | Continues until EMPTY returned | Tree traversal, pagination |
| Termination | Return EMPTY to stop recursion | Must have base case to prevent infinite loop | Conditional completion |
Example: expand for recursive operations
import { of, from, EMPTY, Observable } from 'rxjs';
import { expand, map, delay, take, filter, tap, mergeMap, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Basic expand - countdown
of(5).pipe(
expand(n => n > 0 ? of(n - 1).pipe(delay(1000)) : EMPTY)
).subscribe(console.log);
// Output (1s apart): 5, 4, 3, 2, 1, 0
// Practical: Paginated API loading
function loadAllPages(page = 1) {
return ajax.getJSON(`/api/items?page=${page}`).pipe(
expand(response =>
response.hasMore
? ajax.getJSON(`/api/items?page=${response.nextPage}`)
: EMPTY
)
);
}
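loadAllPages().subscribe({
// Observer object form; separate callback arguments are deprecated in RxJS 7
next: response => {
appendItems(response.items);
console.log('Loaded page:', response.page);
},
error: err => console.error('Error:', err),
complete: () => console.log('All pages loaded')
});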
// Practical: Exponential backoff retry
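// Marker object so retry bookkeeping can be told apart from real results
const retryState = attempt => ({ isRetryState: true, attempt });
function retryWithBackoff(source$, maxRetries = 5) {
  return of(retryState(0)).pipe(
    expand(state =>
      state?.isRetryState && state.attempt < maxRetries
        ? source$.pipe(
            catchError(err => {
              // Named backoffMs so it does not shadow the delay operator
              const backoffMs = Math.pow(2, state.attempt) * 1000;
              console.log(`Retry ${state.attempt + 1} in ${backoffMs}ms`);
              return of(retryState(state.attempt + 1)).pipe(
                delay(backoffMs)
              );
            })
          )
        : EMPTY // Real results and exhausted retries end the recursion
    ),
    // Only emit actual results; the initial seed and retry markers are dropped.
    // If every attempt fails, the stream completes without emitting.
    filter(result => !result?.isRetryState)
  );
}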
retryWithBackoff(
ajax.getJSON('/api/unreliable')
).subscribe(data => console.log('Success:', data));
// Practical: Tree traversal (file system)
interface FileNode {
name: string;
type: 'file' | 'folder';
children?: FileNode[];
}
function traverseFileTree(node: FileNode): Observable<FileNode> {
return of(node).pipe(
expand(n =>
n.type === 'folder' && n.children
? from(n.children)
: EMPTY
)
);
}
const root: FileNode = {
name: 'root',
type: 'folder',
children: [
{ name: 'file1.txt', type: 'file' },
{
name: 'subfolder',
type: 'folder',
children: [
{ name: 'file2.txt', type: 'file' }
]
}
]
};
traverseFileTree(root).subscribe(node => {
console.log('Found:', node.name);
});
// Practical: Polling with increasing intervals
of(1000).pipe(
expand(interval => {
return ajax.getJSON('/api/status').pipe(
delay(interval),
map(() => Math.min(interval * 1.5, 30000)) // Max 30s
);
}),
take(10) // Limit total polls
).subscribe();
// Practical: Fibonacci sequence generator
of([0, 1]).pipe(
expand(([prev, curr]) =>
curr < 100
? of([curr, prev + curr]).pipe(delay(100))
: EMPTY
),
map(([_, curr]) => curr)
).subscribe(n => console.log('Fibonacci:', n));
// Practical: Recursive API data collection
function fetchUserWithFriends(userId: number): Observable<User> {
return ajax.getJSON(`/api/users/${userId}`).pipe(
expand(user =>
user.friends && user.friends.length > 0
? from(user.friends).pipe(
mergeMap(friendId =>
ajax.getJSON(`/api/users/${friendId}`)
)
)
: EMPTY
),
take(50) // Prevent infinite expansion
);
}
// Practical: Breadth-first search
interface Node {
id: number;
neighbors: number[];
}
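function bfs(startId: number, graph: Map<number, Node>): Observable<Node> {
  const start = graph.get(startId);
  if (!start) return EMPTY;
  // Mark nodes when they are discovered so each one is emitted only once
  const visited = new Set<number>([startId]);
  return of(start).pipe(
    expand(
      node => from(node.neighbors).pipe(
        filter(id => !visited.has(id) && graph.has(id)),
        tap(id => visited.add(id)),
        map(id => graph.get(id) as Node)
      ),
      1 // One expansion at a time: pending nodes wait in a FIFO queue, giving level order
    )
  );
}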
// Practical: Dynamic form field generation
of({ level: 1, maxLevel: 5 }).pipe(
expand(state =>
state.level <= state.maxLevel // Load levels 1 through maxLevel inclusive
? ajax.getJSON(`/api/form-fields?level=${state.level}`).pipe(
tap(fields => renderFormFields(fields)),
map(() => ({ level: state.level + 1, maxLevel: state.maxLevel }))
)
: EMPTY
)
).subscribe({
complete: () => console.log('All form fields loaded')
});
// Practical: Power calculation with logging
of(2).pipe(
expand(n => n < 1024 ? of(n * 2) : EMPTY),
tap(n => console.log('Power of 2:', n))
).subscribe({
complete: () => console.log('Complete')
});
// Output: 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024
Warning: Always include a termination condition (return EMPTY) in expand to prevent infinite recursion. Use take() or a similar operator as a safety limit.
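For the common case where each step yields at most one next value (countdown, pagination cursor, doubling), the recursion reduces to a simple unfold loop. The sketch below is a dependency-free model of that single-chain case only, with undefined standing in for EMPTY and maxSteps playing the role of take(). The helper name unfold is hypothetical and not an RxJS function.

```typescript
// Hypothetical model of single-chain expand: emit the seed, then keep feeding
// the latest value back into `next` until it returns undefined (our "EMPTY")
// or the safety limit is reached (our "take").
function unfold<T>(seed: T, next: (value: T) => T | undefined, maxSteps: number): T[] {
  const emitted: T[] = [];
  let current: T | undefined = seed;
  while (current !== undefined && emitted.length < maxSteps) {
    emitted.push(current);
    current = next(current);
  }
  return emitted;
}

// Base case present: stops by itself once the value reaches 1024.
const powers = unfold(2, n => (n < 1024 ? n * 2 : undefined), 100);
console.log(powers); // [ 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024 ]

// No base case: only the safety limit stops the loop.
const runaway = unfold(1, n => n + 1, 5);
console.log(runaway); // [ 1, 2, 3, 4, 5 ]
```

The second call shows why a safety limit is worth keeping even when a base case exists: a bug in the termination check would otherwise recurse indefinitely.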
6. window and windowTime for Grouped Emission Windows
| Operator | Syntax | Description | Output |
|---|---|---|---|
| window | window(windowBoundaries$) | Groups emissions into nested observables based on boundary emissions | Observable of Observables |
| windowTime | windowTime(windowTimeSpan, windowCreationInterval?) | Groups emissions into time-based windows | Observable of Observables |
| windowCount | windowCount(windowSize, startWindowEvery?) | Groups emissions into count-based windows | Observable of Observables |
Example: window operators for grouped processing
import { interval, fromEvent, Subject } from 'rxjs';
import { window, windowTime, windowCount, mergeAll, mergeMap, take, map, filter, reduce } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Basic window with boundary observable
const source$ = interval(1000);
const boundary$ = interval(3000);
source$.pipe(
take(10),
window(boundary$),
mergeAll() // Flatten windows
).subscribe(console.log);
// Process each window
source$.pipe(
take(10),
window(boundary$),
map(window$ => window$.pipe(
reduce((acc, val) => acc + val, 0)
)),
mergeAll()
).subscribe(sum => console.log('Window sum:', sum));
// windowTime - time-based grouping
interval(500).pipe(
take(10),
windowTime(2000), // 2-second windows
map((window$, index) => {
console.log('Window', index, 'opened');
return window$.pipe(
reduce((acc, val) => [...acc, val], [])
);
}),
mergeAll()
).subscribe(values => {
console.log('Window values:', values);
});
// Practical: Batch click processing
const clicks$ = fromEvent(document, 'click');
clicks$.pipe(
windowTime(1000), // 1-second windows
map(window$ => window$.pipe(
reduce((count) => count + 1, 0)
)),
mergeAll()
).subscribe(count => {
if (count > 0) {
console.log(`${count} clicks in last second`);
}
});
// windowCount - count-based grouping
interval(1000).pipe(
take(10),
windowCount(3), // Groups of 3
map((window$, index) => {
console.log('Batch', index);
return window$.pipe(
reduce((acc, val) => [...acc, val], [])
);
}),
mergeAll()
).subscribe(batch => {
console.log('Processing batch:', batch);
});
// Practical: Real-time metrics aggregation
const metrics$ = interval(100).pipe(
map(() => ({
cpu: Math.random() * 100,
memory: Math.random() * 100,
timestamp: Date.now()
}))
);
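metrics$.pipe(
  windowTime(5000), // 5-second windows
  map(window$ => window$.pipe(
    // Accumulate sums and a count; averaging pairwise would overweight recent samples
    reduce((acc, metric) => ({
      cpuSum: acc.cpuSum + metric.cpu,
      memorySum: acc.memorySum + metric.memory,
      count: acc.count + 1
    }), { cpuSum: 0, memorySum: 0, count: 0 }),
    filter(acc => acc.count > 0), // Skip empty windows to avoid dividing by zero
    map(acc => ({
      avgCpu: acc.cpuSum / acc.count,
      avgMemory: acc.memorySum / acc.count,
      count: acc.count
    }))
  )),
  mergeAll()
).subscribe(stats => {
  console.log('5s stats:', stats);
  sendToMonitoring(stats);
});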
// Practical: Message batching for bulk API
const messages$ = new Subject<Message>();
messages$.pipe(
windowTime(2000), // Batch every 2 seconds
mergeMap(window$ => window$.pipe(
reduce((acc, msg) => [...acc, msg], [])
)),
filter(batch => batch.length > 0),
mergeMap(batch =>
ajax.post('/api/messages/bulk', { messages: batch })
)
).subscribe(() => {
console.log('Batch sent');
});
// Send individual messages
messages$.next({ text: 'Hello' });
messages$.next({ text: 'World' });
// Automatically batched and sent every 2s
// Practical: Analytics event batching
const analyticsEvents$ = new Subject<AnalyticsEvent>();
analyticsEvents$.pipe(
windowTime(10000), // 10-second windows
mergeMap(window$ => window$.pipe(
reduce((events, event) => [...events, event], [])
)),
filter(events => events.length > 0)
).subscribe(events => {
sendAnalyticsBatch(events);
console.log(`Sent ${events.length} analytics events`);
});
// Practical: Sliding window for moving average
interval(1000).pipe(
take(20),
map(() => Math.random() * 100),
windowTime(5000, 1000), // 5s window, new window every 1s
mergeMap(window$ => window$.pipe(
reduce((acc, val) => [...acc, val], []),
filter(values => values.length > 0), // Skip empty windows (0 / 0 would give NaN)
map(values => {
const sum = values.reduce((a, b) => a + b, 0);
return sum / values.length;
})
))
).subscribe(avg => {
console.log('5-second moving average:', avg.toFixed(2));
});
// Practical: Request rate limiting
const requests$ = new Subject<Request>();
requests$.pipe(
windowTime(1000), // 1-second windows
mergeMap(window$ => window$.pipe(
take(10) // Forward at most 10 requests per window; the rest are dropped, not delayed
))
).subscribe(request => {
processRequest(request);
});
// Practical: Grouped error tracking
const errors$ = new Subject<{ type: string }>(); // The built-in Error has no `type` field
errors$.pipe(
windowTime(60000), // 1-minute windows
mergeMap(window$ => window$.pipe(
reduce((acc, error) => {
acc[error.type] = (acc[error.type] || 0) + 1;
return acc;
}, {})
))
).subscribe(errorCounts => {
console.log('Error counts (last minute):', errorCounts);
if (Object.values(errorCounts).some(count => count > 10)) {
triggerAlert('High error rate detected');
}
});
Note: window operators return an Observable of Observables (higher-order). Use mergeAll, switchAll, or mergeMap to flatten and process the windows. They are similar to the buffer operators, but emit nested observables instead of arrays.
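The overlapping windows produced by a call such as windowTime(5000, 1000) can be easier to reason about with plain arrays. The sketch below is a dependency-free model, not RxJS: windows open at 0, every, 2 * every, and so on, each stays open for span, and boundaries are treated as half-open intervals, which simplifies real scheduler timing. The names slidingAverages and TimedSample are hypothetical.

```typescript
interface TimedSample {
  at: number;    // emission time in ms
  value: number;
}

// Hypothetical model of overlapping time windows: a sample belongs to every
// window whose half-open range [start, start + span) contains its timestamp.
function slidingAverages(
  samples: TimedSample[],
  span: number,
  every: number,
  until: number
): { start: number; average: number }[] {
  const result: { start: number; average: number }[] = [];
  for (let start = 0; start < until; start += every) {
    const inWindow = samples.filter(s => s.at >= start && s.at < start + span);
    if (inWindow.length === 0) {
      continue; // Skip empty windows so the average is never 0 / 0
    }
    const sum = inWindow.reduce((total, s) => total + s.value, 0);
    result.push({ start, average: sum / inWindow.length });
  }
  return result;
}

const samples: TimedSample[] = [
  { at: 0, value: 10 },
  { at: 1000, value: 20 },
  { at: 2000, value: 30 },
  { at: 3000, value: 40 }
];

// 2-second windows opening every second: each sample (except the first) lands in two windows.
console.log(slidingAverages(samples, 2000, 1000, 4000).map(w => `${w.start}: ${w.average}`).join(', '));
// 0: 15, 1000: 25, 2000: 35, 3000: 40
```

Because consecutive windows share samples, the averages move smoothly. With non-overlapping windows (every equal to span) each sample would contribute to exactly one average.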
Section 9 Summary
- switchMap cancels previous inner observable - perfect for search/autocomplete (only latest matters)
- mergeMap runs all inner observables concurrently - use concurrent parameter to limit parallelism
- concatMap queues and processes sequentially in order - when sequence matters
- exhaustMap ignores new emissions while busy - ideal for login/submit buttons
- expand recursively applies projection - pagination, tree traversal, always return EMPTY to terminate
- window/windowTime groups emissions into nested observables - batch processing, metrics aggregation