Utility Operators and Stream Manipulation
1. tap for Side Effects without Affecting Stream
| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| tap | tap(observer \| nextFn) | Performs side effects without modifying stream values | Logging, debugging, analytics |
| Full observer | tap({ next, error, complete }) | Handle all notification types | Comprehensive side effects |
| Transparent | Does not transform values or affect stream | Pure observation without interference | Non-invasive monitoring |
Example: tap for debugging and side effects
import { of, interval, from, fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { tap, map, filter, take, mergeMap, switchMap } from 'rxjs/operators';
// Basic tap - logging without affecting stream
of(1, 2, 3, 4, 5).pipe(
tap(val => console.log('Before filter:', val)),
filter(val => val % 2 === 0),
tap(val => console.log('After filter:', val)),
map(val => val * 10),
tap(val => console.log('After map:', val))
).subscribe(val => console.log('Final:', val));
// Full observer syntax
interval(1000).pipe(
take(3),
tap({
next: val => console.log('Next:', val),
error: err => console.error('Error:', err),
complete: () => console.log('Complete!')
})
).subscribe();
// Practical: Analytics tracking
fromEvent(button, 'click').pipe(
tap(event => {
analytics.track('button_click', {
buttonId: event.target.id,
timestamp: Date.now()
});
}),
map(event => event.target.value)
).subscribe(value => processValue(value));
// Practical: Debug pipeline
ajax.getJSON('/api/users').pipe(
tap(response => console.log('Raw response:', response)),
map(response => response.data),
tap(data => console.log('Extracted data:', data)),
filter(data => data.active),
tap(data => console.log('Filtered data:', data)),
map(data => transformData(data)),
tap(transformed => console.log('Transformed:', transformed))
).subscribe(result => displayResult(result));
// Practical: Loading state management
// (the subscribe and finalize handlers of tap require RxJS 7.3 or later)
this.http.get('/api/data').pipe(
tap({
subscribe: () => {
// Runs when the request starts, not when the response arrives
this.isLoading = true;
this.loadingStartTime = Date.now();
},
next: () => {
const duration = Date.now() - this.loadingStartTime;
console.log(`Loaded in ${duration}ms`);
},
error: err => this.showError(err),
// Runs on completion, error, or unsubscribe
finalize: () => this.isLoading = false
})
).subscribe(data => this.data = data);
// Practical: Caching side effect
const cache = new Map();
ajax.getJSON('/api/expensive').pipe(
tap(data => {
cache.set('expensive-data', data);
cache.set('cached-at', Date.now());
})
).subscribe();
// Practical: Progress tracking
const files = [file1, file2, file3, file4, file5];
let completed = 0;
from(files).pipe(
mergeMap(file => uploadFile(file).pipe(
tap(() => {
completed++;
const progress = (completed / files.length) * 100;
updateProgressBar(progress);
console.log(`Progress: ${progress.toFixed(0)}%`);
})
), 2)
).subscribe({
complete: () => console.log('All uploads complete')
});
// Practical: Request/response logging
// (the subscribe and finalize handlers of tap require RxJS 7.3 or later)
const logRequest$ = (url: string) => ajax.getJSON(url).pipe(
tap({
subscribe: () => console.log(`[REQUEST] ${url}`),
next: data => console.log(`[RESPONSE] ${url}:`, data),
error: err => console.error(`[ERROR] ${url}:`, err),
complete: () => console.log(`[COMPLETE] ${url}`),
finalize: () => console.log(`[FINALIZE] ${url}`)
})
);
// Practical: Form validation feedback
formValue$.pipe(
tap(() => clearValidationErrors()),
switchMap(value => validateForm(value)),
tap(validation => {
if (validation.errors.length > 0) {
displayErrors(validation.errors);
}
})
).subscribe();
// Practical: Debugging slow operations
source$.pipe(
tap(() => console.time('operation')),
map(heavyComputation),
tap(() => console.timeEnd('operation'))
).subscribe();
// Practical: State synchronization
dataStream$.pipe(
tap(data => {
localStorage.setItem('lastData', JSON.stringify(data));
this.store.dispatch(updateAction(data));
})
).subscribe(data => this.currentData = data);
// Practical: Monitoring and metrics
api$.pipe(
tap({
next: () => metrics.increment('api.success'),
error: () => metrics.increment('api.error')
})
).subscribe();
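Note: tap passes every notification through unchanged and ignores the return value of its callback, which makes it well suited to
logging, debugging, analytics, and other side effects. It is not fully isolated, though: an exception thrown inside a tap callback errors the stream, and mutating an emitted object changes what downstream operators see.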
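The transparency described in the note can be checked with a synchronous source. The following is a minimal sketch, assuming RxJS 7; the array names `seen`, `results`, and `passed` are illustrative only.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const seen: number[] = [];
const results: number[] = [];

of(1, 2, 3).pipe(
  // push() returns the new array length, but tap ignores whatever its callback returns
  tap(v => seen.push(v)),
  map(v => v * 10)
).subscribe(v => results.push(v));

console.log(seen);    // [1, 2, 3]
console.log(results); // [10, 20, 30]

// An exception thrown inside tap is not swallowed: it becomes an error notification
const passed: number[] = [];
let caught: unknown = null;

of(1, 2, 3).pipe(
  tap(v => {
    if (v === 2) {
      throw new Error('side effect failed');
    }
  })
).subscribe({
  next: v => passed.push(v),
  error: e => { caught = e; }
});

console.log(passed);                  // [1]
console.log(caught instanceof Error); // true
```

The second half is the practical caveat: a logging or analytics call that can throw is usually worth wrapping in its own try/catch so that it cannot terminate the pipeline it is observing.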
2. delay and delayWhen for Time-based Stream Delays
| Operator | Syntax | Description | Delay Strategy |
|---|---|---|---|
| delay | delay(dueTime, scheduler?) | Delays all emissions by fixed duration (ms) | Constant delay for all values |
| delay (Date) | delay(new Date('2024-01-01')) | Delays emissions until specific date/time | Absolute time delay |
| delayWhen | delayWhen(delayDurationSelector) | Delays each emission by duration from selector observable | Dynamic per-value delay |
Example: delay operators for timing control
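import { of, interval, timer, from, fromEvent, Subject, throwError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { delay, delayWhen, map, take, retryWhen, mergeMap, concatMap, switchMap, takeUntil, debounceTime, tap } from 'rxjs/operators';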
// Basic delay - constant delay
of(1, 2, 3).pipe(
delay(1000) // Delay entire stream by 1s
).subscribe(val => console.log('Delayed:', val));
// Delay each emission
interval(500).pipe(
take(5),
delay(2000) // Each value delayed by 2s from original time
).subscribe(val => console.log('Value:', val));
// Delay until specific time
const futureTime = new Date(Date.now() + 5000);
of('Message').pipe(
delay(futureTime)
).subscribe(msg => console.log('Delivered at scheduled time:', msg));
// delayWhen - dynamic delay per value
of(1, 2, 3).pipe(
delayWhen(val => timer(val * 1000))
).subscribe(val => console.log('Dynamic delay:', val));
// 1 after 1s, 2 after 2s, 3 after 3s
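// Practical: Retry with exponential backoff
// (retryWhen is deprecated in RxJS 7 in favor of retry with a delay option; it still works as shown)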
function retryWithBackoff(source$, maxRetries = 3) {
return source$.pipe(
retryWhen(errors => errors.pipe(
mergeMap((error, index) => {
if (index >= maxRetries) {
return throwError(() => error);
}
const delayMs = Math.pow(2, index) * 1000;
console.log(`Retry ${index + 1} after ${delayMs}ms`);
return of(error).pipe(delay(delayMs));
})
))
);
}
retryWithBackoff(ajax.getJSON('/api/flaky')).subscribe({
next: data => console.log('Success:', data),
error: err => console.error('Failed after retries:', err)
});
// Practical: Staggered animation
const elements = [box1, box2, box3, box4];
from(elements).pipe(
delayWhen((element, index) => timer(index * 200))
).subscribe(element => {
element.classList.add('animate-in');
});
// Practical: Rate limiting with delay
const requests = [req1, req2, req3, req4, req5];
from(requests).pipe(
concatMap(req => ajax(req).pipe(
delay(200) // Holds each response for 200ms, so the next request starts 200ms after the previous one finishes
))
).subscribe(response => processResponse(response));
// Practical: Delayed notification
of('Session expiring soon').pipe(
delay(25 * 60 * 1000) // 25 minutes
).subscribe(message => showWarning(message));
// Practical: Simulated network latency (testing)
const mockAPI = (data) => of(data).pipe(
delay(Math.random() * 1000 + 500) // 500-1500ms random delay
);
mockAPI({ users: [...] }).subscribe(data => {
console.log('Mock response:', data);
});
// Practical: Tooltip delay
fromEvent(element, 'mouseenter').pipe(
switchMap(() => of(true).pipe(
delay(500), // Show tooltip after 500ms hover
takeUntil(fromEvent(element, 'mouseleave'))
))
).subscribe(() => showTooltip());
// Practical: Debounced save with confirmation delay
formValue$.pipe(
debounceTime(1000),
switchMap(value => saveData(value).pipe(
switchMap(response => of(response).pipe(
tap(() => showSaveSuccess()),
delay(2000), // Hide success message after 2s
tap(() => hideSaveSuccess())
))
))
).subscribe();
// Practical: Message queue with processing delay
const messageQueue$ = new Subject<Message>();
messageQueue$.pipe(
concatMap(message => of(message).pipe(
delay(100), // Process at max 10 msgs/second
tap(msg => processMessage(msg))
))
).subscribe();
// Practical: Scheduled task execution
const tasks = [task1, task2, task3];
const startTime = new Date(Date.now() + 10000); // Start in 10s
from(tasks).pipe(
delay(startTime),
concatMap(task => executeTask(task))
).subscribe();
// delayWhen with conditional delay
source$.pipe(
delayWhen(value =>
value.priority === 'high'
? timer(0) // No delay for high priority
: timer(5000) // 5s delay for normal
)
).subscribe();
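The difference between a fixed delay and a per-value delay is easiest to see in virtual time. The sketch below assumes RxJS 7 and uses `TestScheduler` from `rxjs/testing` so that no real waiting happens; the log arrays and the sample values 30 and 10 are illustrative.

```typescript
import { timer } from 'rxjs';
import { delay, delayWhen } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Inside run(), one marble frame equals 1 ms of virtual time.
// The assertion callback is unused because results are logged by hand.
const scheduler = new TestScheduler(() => {});

const fixedLog: string[] = [];
const dynamicLog: string[] = [];

scheduler.run(({ cold }) => {
  // 30 is emitted at 0 ms, 10 at 1 ms; the source completes at 2 ms
  const source$ = cold('ab|', { a: 30, b: 10 });

  // delay: every value is shifted by the same 5 ms, so the order is preserved
  source$.pipe(delay(5))
    .subscribe(v => fixedLog.push(`${scheduler.now()}ms -> ${v}`));

  // delayWhen: each value waits for its own timer, so a short delay can overtake a long one
  source$.pipe(delayWhen(v => timer(v)))
    .subscribe(v => dynamicLog.push(`${scheduler.now()}ms -> ${v}`));
});

console.log(fixedLog);   // ['5ms -> 30', '6ms -> 10']
console.log(dynamicLog); // ['11ms -> 10', '30ms -> 30']
```

The reordering in the second log is worth keeping in mind for cases such as the priority-based delayWhen example: values with shorter delays are delivered first, regardless of their order in the source.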
3. repeat and repeatWhen for Stream Repetition
| Operator | Syntax | Description | Repetition Control |
|---|---|---|---|
| repeat | repeat(count?) | Re-subscribes to source when it completes | Fixed count or infinite |
| repeatWhen | repeatWhen(notifier) | Re-subscribes based on notifier observable emissions | Dynamic, conditional repetition |
| Behavior | Only repeats on completion, not on error | For errors, use retry/retryWhen | Success repetition only |
Example: repeat operators for stream repetition
import { of, interval, fromEvent, timer } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { repeat, repeatWhen, take, delay, delayWhen, tap, switchMap, takeUntil, takeWhile } from 'rxjs/operators';
// Basic repeat - repeat 3 times
of(1, 2, 3).pipe(
tap(val => console.log('Value:', val)),
repeat(3)
).subscribe({
complete: () => console.log('All repetitions complete')
});
// Output: 1,2,3, 1,2,3, 1,2,3
// Infinite repeat
interval(1000).pipe(
take(3),
tap(val => console.log('Interval:', val)),
repeat() // Infinite - careful!
).subscribe();
// Output: 0,1,2, 0,1,2, 0,1,2... forever
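// repeatWhen - conditional repetition
// (repeatWhen is deprecated in RxJS 7 in favor of repeat with a delay option; it still works as shown)
of('Attempt').pipe(
tap(val => console.log(val)),
repeatWhen(notifications => notifications.pipe(
delay(1000), // Wait 1s between repetitions
take(3) // 3 re-subscriptions after the first run, so 4 runs in total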
))
).subscribe({
complete: () => console.log('Done repeating')
});
// Practical: Polling
function pollAPI(url: string, intervalMs: number) {
return ajax.getJSON(url).pipe(
repeatWhen(notifications => notifications.pipe(
delay(intervalMs)
))
);
}
pollAPI('/api/status', 5000).pipe(
takeUntil(stopPolling$)
).subscribe(status => updateStatus(status));
// Practical: Heartbeat
of('ping').pipe(
tap(() => console.log('Heartbeat sent')),
switchMap(() => ajax.post('/api/heartbeat')),
repeatWhen(notifications => notifications.pipe(
delay(30000) // Every 30 seconds
)),
takeUntil(disconnect$)
).subscribe();
// Practical: Game loop
function gameLoop() {
return of(null).pipe(
tap(() => {
updateGameState();
renderFrame();
}),
repeatWhen(notifications => notifications.pipe(
delay(16) // ~60 FPS
))
);
}
gameLoop().pipe(
takeUntil(fromEvent(stopButton, 'click'))
).subscribe();
// Practical: Re-poll with increasing delay
// (repeatWhen re-subscribes only after successful completion; an error still ends the stream)
let attemptCount = 0;
ajax.getJSON('/api/data').pipe(
tap(() => attemptCount++),
repeatWhen(notifications => notifications.pipe(
delayWhen(() => timer(Math.min(attemptCount * 1000, 10000))),
take(5) // At most 5 repeats after the first request
))
).subscribe({
next: data => console.log('Data:', data),
error: err => console.error('Failed:', err)
});
// Practical: Scheduled tasks
of('Task execution').pipe(
tap(() => executeScheduledTask()),
repeatWhen(notifications => notifications.pipe(
delay(60 * 60 * 1000) // Every hour
)),
takeUntil(shutdownSignal$)
).subscribe();
// Practical: Animation loop with repeat
function animateElement(element, duration) {
return of(element).pipe(
tap(el => el.classList.add('animate')),
delay(duration),
tap(el => el.classList.remove('animate')),
delay(500), // Pause between animations
repeat(3) // Animate 3 times
);
}
animateElement(box, 1000).subscribe({
complete: () => console.log('Animation complete')
});
// Practical: Data sync with conditional repeat
function syncData() {
return ajax.post('/api/sync', localChanges).pipe(
tap(response => {
if (response.hasMore) {
console.log('More data to sync');
}
}),
repeatWhen(notifications => notifications.pipe(
delayWhen(() => timer(1000)),
takeWhile(() => hasLocalChanges())
))
);
}
// Practical: Queue processor with repeat
const queue = [];
of(null).pipe(
tap(() => {
if (queue.length > 0) {
const item = queue.shift();
processItem(item);
}
}),
repeatWhen(notifications => notifications.pipe(
delay(100)
)),
takeUntil(stopProcessing$)
).subscribe();
// Practical: Metrics collection
of(null).pipe(
switchMap(() => collectMetrics()),
tap(metrics => sendToAnalytics(metrics)),
repeatWhen(notifications => notifications.pipe(
delay(10000) // Collect every 10 seconds
))
).subscribe();
// Practical: Keep-alive connection
of('keep-alive').pipe(
switchMap(() => ajax.post('/api/keep-alive')),
repeatWhen(notifications => notifications.pipe(
delay(45000) // Every 45 seconds
)),
takeUntil(connectionClosed$)
).subscribe({
next: () => console.log('Keep-alive sent'),
error: err => console.error('Keep-alive failed:', err)
});
Note: repeat only triggers on completion, not errors. Use with
takeUntil to stop infinite repetition. For errors, use retry/retryWhen instead.
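Two points from the note can be verified synchronously: the count passed to repeat is the total number of subscriptions, and an error bypasses repeat entirely. This is a minimal sketch assuming RxJS 7; `failing$` and the counters are illustrative names.

```typescript
import { Observable, of } from 'rxjs';
import { repeat } from 'rxjs/operators';

// repeat(3) subscribes three times in total (not the first run plus three repeats)
const emitted: number[] = [];
of(1, 2).pipe(repeat(3)).subscribe(v => emitted.push(v));
console.log(emitted); // [1, 2, 1, 2, 1, 2]

// A source that emits once and then fails, counting how often it is subscribed
let subscriptions = 0;
const failing$ = new Observable<number>(subscriber => {
  subscriptions++;
  subscriber.next(subscriptions);
  subscriber.error(new Error('boom'));
});

let failure = '';
failing$.pipe(repeat(3)).subscribe({
  error: (e: Error) => { failure = e.message; }
});

// The error passes straight through repeat, so there is no second subscription
console.log(subscriptions, failure); // 1 boom
```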
4. sample and sampleTime for Periodic Sampling
| Operator | Syntax | Description | Sampling Strategy |
|---|---|---|---|
| sample | sample(notifier$) | Emits most recent value when notifier emits | Event-based sampling |
| sampleTime | sampleTime(period) | Emits most recent value periodically (ms) | Time-based sampling |
| Behavior | Only emits if source emitted since last sample | Ignores periods with no source emissions | Latest value at each sample tick |
Example: sample operators for value sampling
import { interval, fromEvent, Subject, BehaviorSubject } from 'rxjs';
import { sample, sampleTime, map, take, filter, scan, startWith, pairwise } from 'rxjs/operators';
// Basic sample - sample on click
const source$ = interval(500).pipe(take(20));
const clicks$ = fromEvent(document, 'click');
source$.pipe(
sample(clicks$)
).subscribe(val => console.log('Sampled on click:', val));
// Emits latest interval value when user clicks
// sampleTime - periodic sampling
interval(100).pipe(
sampleTime(1000)
).subscribe(val => console.log('Sample:', val));
// Samples every 1 second (emits latest value from 100ms intervals)
// Practical: Mouse position sampling
const mouseMove$ = fromEvent(document, 'mousemove').pipe(
map(e => ({ x: e.clientX, y: e.clientY }))
);
mouseMove$.pipe(
sampleTime(200) // Sample every 200ms
).subscribe(pos => {
updatePositionDisplay(pos);
// Reduces updates from potentially thousands to 5/second
});
// Practical: Sensor data throttling
const sensorData$ = interval(10).pipe(
map(() => ({
temperature: readTemperature(),
humidity: readHumidity(),
timestamp: Date.now()
}))
);
sensorData$.pipe(
sampleTime(5000) // Sample every 5 seconds
).subscribe(data => {
logSensorData(data);
// Reduces logging from 100 entries per second to one entry every 5 seconds
});
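// Practical: Form value sampling on button click
// (sample emits only if the form changed since the last click; use withLatestFrom to resubmit an unchanged value)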
const formValue$ = formControl.valueChanges;
const submitClick$ = fromEvent(submitBtn, 'click');
formValue$.pipe(
sample(submitClick$)
).subscribe(value => {
console.log('Form value on submit:', value);
submitForm(value);
});
// Practical: Window resize sampling
fromEvent(window, 'resize').pipe(
sampleTime(500) // Sample every 500ms
).subscribe(() => {
recalculateLayout();
// Prevents excessive recalculations during resize
});
// Practical: Scroll position sampling
fromEvent(window, 'scroll').pipe(
map(() => window.scrollY),
sampleTime(100)
).subscribe(scrollY => {
updateScrollIndicator(scrollY);
checkLazyLoadImages(scrollY);
});
// Practical: Real-time chart updates
const dataStream$ = websocket.pipe(
map(msg => msg.value)
);
dataStream$.pipe(
sampleTime(1000) // Update chart every second
).subscribe(value => {
addDataPointToChart(value);
// Smooth chart updates instead of jerky real-time
});
// Practical: Network bandwidth sampling
const bytesTransferred$ = new Subject<number>();
bytesTransferred$.pipe(
scan((acc, bytes) => acc + bytes, 0), // Running total of bytes
sampleTime(1000), // Skips any second in which no new bytes arrived
startWith(0),
pairwise(), // [previous total, current total]
// KB since the previous sample; this covers more than 1s if a tick was skipped
map(([previous, total]) => (total - previous) / 1024)
).subscribe(bandwidth => {
displayBandwidth(`${bandwidth.toFixed(2)} KB/s`);
});
// Practical: Game state sampling
const gameState$ = new BehaviorSubject(initialState);
gameState$.pipe(
sampleTime(16) // Sample at ~60 FPS
).subscribe(state => {
renderGame(state);
});
// Practical: Audio level meter
const audioLevel$ = new Subject<number>();
audioLevel$.pipe(
sampleTime(50) // Update 20 times per second
).subscribe(level => {
updateAudioMeter(level);
});
// Practical: Sample on interval
const rapidSource$ = interval(50);
const sampler$ = interval(1000);
rapidSource$.pipe(
sample(sampler$)
).subscribe(val => console.log('Sampled:', val));
// Samples every 1 second instead of every 50ms
// Practical: Price ticker sampling
const priceUpdates$ = websocket.pipe(
filter(msg => msg.type === 'price'),
map(msg => msg.price)
);
priceUpdates$.pipe(
sampleTime(500) // Update UI twice per second
).subscribe(price => {
updatePriceDisplay(price);
});
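The rule that sample only emits when the source has produced something new since the last tick has a visible consequence for click-driven sampling. The sketch below, assuming RxJS 7, contrasts it with withLatestFrom using two hand-driven Subjects; `value$` and `click$` are illustrative stand-ins for a form stream and a button stream.

```typescript
import { Subject } from 'rxjs';
import { map, sample, withLatestFrom } from 'rxjs/operators';

const value$ = new Subject<string>();
const click$ = new Subject<void>();

const sampled: string[] = [];
const latest: string[] = [];

// sample: the source is the value stream, the clicks are the notifier
value$.pipe(sample(click$)).subscribe(v => sampled.push(v));

// withLatestFrom: the clicks are the source and always pick up the latest value
click$.pipe(
  withLatestFrom(value$),
  map(([, v]) => v)
).subscribe(v => latest.push(v));

click$.next();         // no value yet: neither pipeline emits
value$.next('draft');
value$.next('final');
click$.next();         // both emit 'final'
click$.next();         // sample stays silent (nothing new); withLatestFrom emits 'final' again

console.log(sampled);  // ['final']
console.log(latest);   // ['final', 'final']
```

Which behavior is preferable depends on the use case: sample avoids duplicate work when nothing changed, while withLatestFrom reacts to every trigger.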
5. audit and auditTime for Trailing Edge Throttling
| Operator | Syntax | Description | Edge |
|---|---|---|---|
| audit | audit(durationSelector) | Emits most recent value when the duration observable emits | Trailing edge |
| auditTime | auditTime(duration) | Emits most recent value after fixed duration (ms) | Trailing edge |
| vs throttle | audit emits trailing (last), throttle emits leading (first) by default | Opposite edge sampling | Different use cases |
Example: audit operators for trailing edge throttling
import { fromEvent, interval, timer, Subject } from 'rxjs';
import { audit, auditTime, map, throttleTime } from 'rxjs/operators';
// Basic auditTime
fromEvent(button, 'click').pipe(
auditTime(1000)
).subscribe(() => console.log('Click processed (trailing)'));
// Emits the most recent click 1s after the first click of each burst
// audit with dynamic duration
fromEvent(button, 'click').pipe(
audit(() => timer(1000))
).subscribe(() => console.log('Audited click'));
// Practical: Search input (trailing edge)
fromEvent(searchInput, 'input').pipe(
map(e => e.target.value),
auditTime(500)
).subscribe(searchTerm => {
performSearch(searchTerm);
// Searches with the latest value at most once every 500ms while typing; use debounceTime to wait for a pause
});
// Practical: Window resize (final dimensions)
fromEvent(window, 'resize').pipe(
auditTime(300)
).subscribe(() => {
const width = window.innerWidth;
const height = window.innerHeight;
recalculateLayout(width, height);
// Uses the latest dimensions, at most once every 300ms while resizing
});
// Practical: Scroll position (final position)
fromEvent(window, 'scroll').pipe(
map(() => window.scrollY),
auditTime(200)
).subscribe(finalScrollY => {
updateScrollBasedContent(finalScrollY);
// Updates based on final scroll position
});
// Practical: Button click flood prevention
fromEvent(actionBtn, 'click').pipe(
auditTime(2000)
).subscribe(() => {
performExpensiveAction();
// Executes with last click in 2-second window
});
// Practical: Form field validation (final value)
fromEvent(emailInput, 'input').pipe(
map(e => e.target.value),
auditTime(800)
).subscribe(email => {
validateEmail(email);
// Validates the latest value at most once every 800ms while typing
});
// Practical: Drag end position
fromEvent(element, 'drag').pipe(
map(e => ({ x: e.clientX, y: e.clientY })),
auditTime(100)
).subscribe(finalPosition => {
updateElementPosition(finalPosition);
});
// Compare: throttleTime (leading) vs auditTime (trailing)
const clicks$ = fromEvent(button, 'click');
// throttleTime - emits first, ignores rest
clicks$.pipe(
throttleTime(1000)
).subscribe(() => console.log('Throttle: First click'));
// auditTime - emits last after period
clicks$.pipe(
auditTime(1000)
).subscribe(() => console.log('Audit: Last click'));
// Practical: Auto-save (save final changes)
formValue$.pipe(
auditTime(3000) // Save the latest value 3s after the first change in each burst
).subscribe(value => {
autoSave(value);
showSaveIndicator();
});
// Practical: Live preview update (final content)
fromEvent(textarea, 'input').pipe(
map(e => e.target.value),
auditTime(500)
).subscribe(content => {
updatePreview(content);
// Preview shows the latest content, refreshed at most once every 500ms
});
// Practical: API rate limiting (trailing requests)
const apiRequests$ = new Subject<Request>();
apiRequests$.pipe(
auditTime(1000) // Max 1 request per second (last one wins)
).subscribe(request => {
sendRequest(request);
});
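// Practical: Periodic reaction while the mouse moves
fromEvent(element, 'mousemove').pipe(
auditTime(1000)
).subscribe(() => {
// Runs 1s after movement starts, then at most once per second while it continues;
// detecting that the mouse has stopped requires debounceTime instead
console.log('Mouse moved during the last second');
showContextMenu();
});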
// audit with custom duration selector
clicks$.pipe(
audit(click => {
// Longer delay for rapid clicks
return timer(rapidClickDetected ? 2000 : 500);
})
).subscribe();
// Practical: Zoom level adjustment (final zoom)
fromEvent(canvas, 'wheel').pipe(
map(e => e.deltaY),
auditTime(150)
).subscribe(delta => {
adjustZoom(delta);
renderAtNewZoom();
});
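Note: auditTime emits the trailing (last) value of each window, unlike
throttleTime, which by default emits the leading (first) value. The window opens at the first value and closes after the given duration even if values keep arriving, so audit caps the update rate while keeping the latest state. To wait until activity has stopped, use debounceTime.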
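The three rate-limiting operators are easy to confuse, so it can help to run the same burst through each of them in virtual time. This is a sketch assuming RxJS 7 and `TestScheduler` from `rxjs/testing`; the burst timing, the 5 ms window, and the helper `record` are illustrative choices.

```typescript
import { MonoTypeOperatorFunction } from 'rxjs';
import { auditTime, debounceTime, throttleTime } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Inside run(), one marble frame equals 1 ms of virtual time.
// The assertion callback is unused because results are logged by hand.
const scheduler = new TestScheduler(() => {});

const throttled: string[] = [];
const audited: string[] = [];
const debounced: string[] = [];

scheduler.run(({ cold }) => {
  // A burst: a at 0 ms, b at 2 ms, c at 4 ms, d at 6 ms, then silence until completion at 16 ms
  const burst$ = cold<string>('a-b-c-d---------|');

  // Subscribe with the given operator and log each value with its virtual timestamp
  const record = (op: MonoTypeOperatorFunction<string>, into: string[]) =>
    burst$.pipe(op).subscribe(v => into.push(`${v}@${scheduler.now()}ms`));

  record(throttleTime(5), throttled); // first value, then ignore for 5 ms
  record(auditTime(5), audited);      // latest value 5 ms after each window opens
  record(debounceTime(5), debounced); // latest value after 5 ms of silence
});

console.log(throttled); // ['a@0ms', 'd@6ms']
console.log(audited);   // ['c@5ms', 'd@11ms']
console.log(debounced); // ['d@11ms']
```

Note that auditTime emitted `c` at 5 ms while the burst was still in progress, whereas debounceTime stayed silent until the burst had ended.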
6. materialize and dematerialize for Notification Objects
| Operator | Syntax | Description | Transformation |
|---|---|---|---|
| materialize | materialize() | Converts emissions/errors/completion to Notification objects | Observable<T> → Observable<Notification<T>> |
| dematerialize | dematerialize() | Converts Notification objects back to normal emissions | Observable<Notification<T>> → Observable<T> |
| Notification types | next, error, complete | Metadata about stream events | Reify stream semantics |
Example: materialize and dematerialize for metadata handling
import { of, from, timer, throwError, EMPTY } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { materialize, dematerialize, map, delay, tap, filter, scan, expand, mergeMap, switchMap } from 'rxjs/operators';
// Basic materialize - convert to notifications
of(1, 2, 3).pipe(
materialize()
).subscribe(notification => {
console.log('Kind:', notification.kind);
console.log('Value:', notification.value);
console.log('Has value:', notification.hasValue);
});
// Notifications received, in order (each is logged field by field):
// { kind: 'N', value: 1, hasValue: true }
// { kind: 'N', value: 2, hasValue: true }
// { kind: 'N', value: 3, hasValue: true }
// { kind: 'C', hasValue: false }
// Error notification
throwError(() => new Error('Oops')).pipe(
materialize()
).subscribe(notification => {
console.log('Kind:', notification.kind); // 'E'
console.log('Error:', notification.error);
});
// dematerialize - convert back
of(
{ kind: 'N', value: 1 },
{ kind: 'N', value: 2 },
{ kind: 'C' }
).pipe(
dematerialize()
).subscribe({
next: val => console.log('Value:', val),
complete: () => console.log('Complete')
});
// Practical: Error logging via notifications
// (dematerialize rethrows the error afterwards, so the stream still terminates)
ajax.getJSON('/api/data').pipe(
materialize(),
tap(notification => {
if (notification.kind === 'E') {
console.error('API Error:', notification.error);
logError(notification.error);
}
}),
dematerialize()
).subscribe();
// Practical: Delay errors
source$.pipe(
materialize(),
delay(1000), // Delay all notifications including errors
dematerialize()
).subscribe();
// Practical: Convert errors to values
source$.pipe(
materialize(),
map(notification => {
if (notification.kind === 'E') {
return { kind: 'N', value: { error: notification.error } };
}
return notification;
}),
dematerialize()
).subscribe(result => {
if (result.error) {
handleError(result.error);
} else {
handleSuccess(result);
}
});
// Practical: Stream replay with timing
const recorded = [];
const recordingStart = Date.now();
source$.pipe(
materialize(),
tap(n => recorded.push({
notification: n,
offset: Date.now() - recordingStart // ms since recording began
}))
).subscribe();
// Later, replay with original timing
function replayStream() {
from(recorded).pipe(
// Each notification fires at its recorded offset from the start of the replay
mergeMap(item => of(item.notification).pipe(delay(item.offset))),
dematerialize()
).subscribe();
}
// Practical: Error recovery with metadata
source$.pipe(
materialize(),
scan((acc, notification) => {
if (notification.kind === 'E') {
acc.errors.push(notification.error);
return { ...acc, lastError: notification.error };
}
if (notification.kind === 'N') {
acc.values.push(notification.value);
}
return acc;
}, { values: [], errors: [], lastError: null }),
map(state => {
// Convert back to notification
if (state.lastError) {
return { kind: 'N', value: { error: state.lastError } };
}
return { kind: 'N', value: state.values[state.values.length - 1] };
}),
dematerialize()
).subscribe();
// Practical: Notification filtering
source$.pipe(
materialize(),
filter(notification => {
// Drop ignorable errors; the stream then completes normally instead of failing
if (notification.kind === 'E') {
return !isIgnorableError(notification.error);
}
return true;
}),
dematerialize()
).subscribe();
// Practical: Testing observable behavior
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--#', { a: 1, b: 2 }, new Error('fail'));
const materialized$ = source$.pipe(materialize());
expectObservable(materialized$).toBe('--a--b--(c|)', {
a: { kind: 'N', value: 1, hasValue: true },
b: { kind: 'N', value: 2, hasValue: true },
c: { kind: 'E', error: new Error('fail'), hasValue: false }
});
});
// Practical: Stream introspection
source$.pipe(
materialize(),
tap(notification => {
metrics.increment(`stream.${notification.kind}`);
if (notification.kind === 'N') {
console.log('Value emitted:', notification.value);
} else if (notification.kind === 'E') {
console.error('Error occurred:', notification.error);
} else if (notification.kind === 'C') {
console.log('Stream completed');
}
}),
dematerialize()
).subscribe();
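// Practical: Conditional retry based on error type
source$.pipe(
materialize(),
expand(notification =>
notification.kind === 'E' && isRetryable(notification.error)
// Re-subscribe after 1s; timer is used because delay does not postpone errors
? timer(1000).pipe(switchMap(() => source$.pipe(materialize())))
: EMPTY
),
// Drop retryable error notifications so dematerialize does not rethrow them
filter(n => !(n.kind === 'E' && isRetryable(n.error))),
dematerialize()
).subscribe();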
Note: materialize/dematerialize convert stream events to/from
Notification objects. Useful for delaying errors, logging stream behavior, or manipulating stream semantics.
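The round trip described in the note can be observed with a small synchronous source that emits two values and then fails. This is a minimal sketch assuming RxJS 7; `source$`, `kinds`, and `events` are illustrative names.

```typescript
import { concat, of, throwError } from 'rxjs';
import { dematerialize, materialize } from 'rxjs/operators';

// Two values followed by an error
const source$ = concat(of(1, 2), throwError(() => new Error('boom')));

// materialize turns the error into a regular value and then completes normally
const kinds: string[] = [];
source$.pipe(materialize()).subscribe({
  next: n => kinds.push(n.kind),
  complete: () => kinds.push('done')
});
console.log(kinds); // ['N', 'N', 'E', 'done']

// dematerialize restores the original events, including the error
const events: string[] = [];
source$.pipe(materialize(), dematerialize()).subscribe({
  next: v => events.push(`next ${v}`),
  error: (e: Error) => events.push(`error ${e.message}`)
});
console.log(events); // ['next 1', 'next 2', 'error boom']
```

Because the error travels as an ordinary value between the two operators, anything placed there (delay, filter, map) can act on it; this is what the delayed-error and error-to-value examples rely on.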
Section 10 Summary
- tap performs side effects without modifying stream - perfect for logging, analytics, debugging
- delay delays all emissions by fixed duration, delayWhen applies dynamic per-value delays
- repeat re-subscribes on completion, repeatWhen enables conditional repetition with delays
- sample/sampleTime emit the most recent value on each notifier or timer tick - thinning high-frequency streams
- audit/auditTime emit the trailing edge (the last value of each window) - the opposite of throttle's default leading edge; use debounceTime to wait for silence
- materialize/dematerialize convert stream events to and from Notification objects for metadata manipulation