Utility Operators and Stream Manipulation

1. tap for Side Effects without Affecting Stream

Feature | Syntax | Description | Use Case
tap | tap(observer) or tap(nextFn) | Performs side effects without modifying stream values | Logging, debugging, analytics
Full observer | tap({ next, error, complete }) | Handle all notification types | Comprehensive side effects
Transparent | Does not transform values or affect stream | Pure observation without interference | Non-invasive monitoring

Example: tap for debugging and side effects

import { of, interval, from, fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { tap, map, filter, take, mergeMap, switchMap } from 'rxjs/operators';

// Basic tap - logging without affecting stream
of(1, 2, 3, 4, 5).pipe(
  tap(val => console.log('Before filter:', val)),
  filter(val => val % 2 === 0),
  tap(val => console.log('After filter:', val)),
  map(val => val * 10),
  tap(val => console.log('After map:', val))
).subscribe(val => console.log('Final:', val));

// Full observer syntax
interval(1000).pipe(
  take(3),
  tap({
    next: val => console.log('Next:', val),
    error: err => console.error('Error:', err),
    complete: () => console.log('Complete!')
  })
).subscribe();

// Practical: Analytics tracking
fromEvent(button, 'click').pipe(
  tap(event => {
    analytics.track('button_click', {
      buttonId: event.target.id,
      timestamp: Date.now()
    });
  }),
  map(event => event.target.value)
).subscribe(value => processValue(value));

// Practical: Debug pipeline
ajax.getJSON('/api/users').pipe(
  tap(response => console.log('Raw response:', response)),
  map(response => response.data),
  tap(data => console.log('Extracted data:', data)),
  filter(data => data.active),
  tap(data => console.log('Filtered data:', data)),
  map(data => transformData(data)),
  tap(transformed => console.log('Transformed:', transformed))
).subscribe(result => displayResult(result));

// Practical: Loading state management
// The loading flag must be set when the request starts, not when data arrives,
// so it belongs in the subscribe handler (RxJS 7.3+) rather than in a next callback.
this.http.get('/api/data').pipe(
  tap({
    subscribe: () => {
      this.isLoading = true;
      this.loadingStartTime = Date.now();
    },
    next: () => {
      const duration = Date.now() - this.loadingStartTime;
      console.log(`Loaded in ${duration}ms`);
    },
    error: err => this.showError(err),
    finalize: () => this.isLoading = false // Runs on complete, error, or unsubscribe
  })
).subscribe(data => this.data = data);

// Practical: Caching side effect
const cache = new Map();

ajax.getJSON('/api/expensive').pipe(
  tap(data => {
    cache.set('expensive-data', data);
    cache.set('cached-at', Date.now());
  })
).subscribe();

// Practical: Progress tracking
const files = [file1, file2, file3, file4, file5];
let completed = 0;

from(files).pipe(
  mergeMap(file => uploadFile(file).pipe(
    tap(() => {
      completed++;
      const progress = (completed / files.length) * 100;
      updateProgressBar(progress);
      console.log(`Progress: ${progress.toFixed(0)}%`);
    })
  ), 2)
).subscribe({
  complete: () => console.log('All uploads complete')
});

// Practical: Request/response logging (the subscribe and finalize handlers require RxJS 7.3+)
const logRequest$ = (url: string) => ajax.getJSON(url).pipe(
  tap({
    subscribe: () => console.log(`[REQUEST] ${url}`),
    next: data => console.log(`[RESPONSE] ${url}:`, data),
    error: err => console.error(`[ERROR] ${url}:`, err),
    complete: () => console.log(`[COMPLETE] ${url}`),
    finalize: () => console.log(`[FINALIZE] ${url}`)
  })
);

// Practical: Form validation feedback
formValue$.pipe(
  tap(() => clearValidationErrors()),
  switchMap(value => validateForm(value)),
  tap(validation => {
    if (validation.errors.length > 0) {
      displayErrors(validation.errors);
    }
  })
).subscribe();

// Practical: Debugging slow operations
source$.pipe(
  tap(() => console.time('operation')),
  map(heavyComputation),
  tap(() => console.timeEnd('operation'))
).subscribe();

// Practical: State synchronization
dataStream$.pipe(
  tap(data => {
    localStorage.setItem('lastData', JSON.stringify(data));
    this.store.dispatch(updateAction(data));
  })
).subscribe(data => this.currentData = data);

// Practical: Monitoring and metrics
api$.pipe(
  tap({
    next: () => metrics.increment('api.success'),
    error: () => metrics.increment('api.error')
  })
).subscribe();
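Note: tap is transparent: it forwards every value, error, and completion unchanged, which suits logging, debugging, analytics, and other side effects. The one caveat: an exception thrown inside a tap callback is sent downstream as an error notification.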
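
A second caveat on transparency is easier to see in code: tap forwards the same value it received, so it is only non-invasive if the callback does not mutate that value. The sketch below is a dependency-free model over a plain array, not the RxJS implementation; tapArray and the sample records are hypothetical names used only for illustration.

```typescript
// Dependency-free model of tap over an array (not the RxJS implementation).
// tapArray is a hypothetical helper used only for illustration.
function tapArray<T>(input: T[], sideEffect: (value: T) => void): T[] {
  return input.map(value => {
    sideEffect(value); // observe the value
    return value;      // forward the very same value (same reference)
  });
}

interface UserRecord { id: string; seen?: boolean }

const userRecords: UserRecord[] = [{ id: 'ada' }, { id: 'linus' }];
const seenIds: string[] = [];

// Read-only side effect: downstream receives the original objects untouched.
const observedRecords = tapArray(userRecords, u => { seenIds.push(u.id); });

// Mutating side effect: downstream sees the change because the reference is shared.
const mutatedRecords = tapArray(userRecords, u => { u.seen = true; });

console.log(seenIds.join(','));                     // ada,linus
console.log(observedRecords[0] === userRecords[0]); // true
console.log(mutatedRecords[1].seen);                // true
```

If a side effect needs a modified copy, building it inside map rather than mutating inside tap keeps the pipeline easier to reason about.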

2. delay and delayWhen for Time-based Stream Delays

Operator | Syntax | Description | Delay Strategy
delay | delay(dueTime, scheduler?) | Delays all emissions by fixed duration (ms) | Constant delay for all values
delay (Date) | delay(new Date('2024-01-01')) | Delays emissions until specific date/time | Absolute time delay
delayWhen | delayWhen(delayDurationSelector) | Delays each emission by duration from selector observable | Dynamic per-value delay

Example: delay operators for timing control

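import { of, from, fromEvent, interval, timer, throwError, Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { delay, delayWhen, map, take, tap, mergeMap, concatMap, switchMap, takeUntil, debounceTime, retryWhen } from 'rxjs/operators';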

// Basic delay - constant delay
of(1, 2, 3).pipe(
  delay(1000) // Delay entire stream by 1s
).subscribe(val => console.log('Delayed:', val));

// Delay each emission
interval(500).pipe(
  take(5),
  delay(2000) // Each value delayed by 2s from original time
).subscribe(val => console.log('Value:', val));

// Delay until specific time
const futureTime = new Date(Date.now() + 5000);
of('Message').pipe(
  delay(futureTime)
).subscribe(msg => console.log('Delivered at scheduled time:', msg));

// delayWhen - dynamic delay per value
of(1, 2, 3).pipe(
  delayWhen(val => timer(val * 1000))
).subscribe(val => console.log('Dynamic delay:', val));
// 1 after 1s, 2 after 2s, 3 after 3s

// Practical: Retry with exponential backoff
// (retryWhen is deprecated in RxJS 7 in favor of retry with a delay option)
function retryWithBackoff(source$, maxRetries = 3) {
  return source$.pipe(
    retryWhen(errors => errors.pipe(
      mergeMap((error, index) => {
        if (index >= maxRetries) {
          return throwError(() => error);
        }
        const delayMs = Math.pow(2, index) * 1000;
        console.log(`Retry ${index + 1} after ${delayMs}ms`);
        return of(error).pipe(delay(delayMs));
      })
    ))
  );
}

retryWithBackoff(ajax.getJSON('/api/flaky')).subscribe({
  next: data => console.log('Success:', data),
  error: err => console.error('Failed after retries:', err)
});

// Practical: Staggered animation
const elements = [box1, box2, box3, box4];

from(elements).pipe(
  delayWhen((element, index) => timer(index * 200))
).subscribe(element => {
  element.classList.add('animate-in');
});

// Practical: Rate limiting with delay
const requests = [req1, req2, req3, req4, req5];

from(requests).pipe(
  concatMap(req => ajax(req).pipe(
    delay(200) // 200ms between requests
  ))
).subscribe(response => processResponse(response));

// Practical: Delayed notification
of('Session expiring soon').pipe(
  delay(25 * 60 * 1000) // 25 minutes
).subscribe(message => showWarning(message));

// Practical: Simulated network latency (testing)
const mockAPI = (data) => of(data).pipe(
  delay(Math.random() * 1000 + 500) // 500-1500ms random delay
);

mockAPI({ users: [...] }).subscribe(data => {
  console.log('Mock response:', data);
});

// Practical: Tooltip delay
fromEvent(element, 'mouseenter').pipe(
  switchMap(() => of(true).pipe(
    delay(500), // Show tooltip after 500ms hover
    takeUntil(fromEvent(element, 'mouseleave'))
  ))
).subscribe(() => showTooltip());

// Practical: Debounced save with confirmation delay
formValue$.pipe(
  debounceTime(1000),
  switchMap(value => saveData(value).pipe(
    switchMap(response => of(response).pipe(
      tap(() => showSaveSuccess()),
      delay(2000), // Hide success message after 2s
      tap(() => hideSaveSuccess())
    ))
  ))
).subscribe();

// Practical: Message queue with processing delay
const messageQueue$ = new Subject<Message>();

messageQueue$.pipe(
  concatMap(message => of(message).pipe(
    delay(100), // Process at max 10 msgs/second
    tap(msg => processMessage(msg))
  ))
).subscribe();

// Practical: Scheduled task execution
const tasks = [task1, task2, task3];
const startTime = new Date(Date.now() + 10000); // Start in 10s

from(tasks).pipe(
  delay(startTime),
  concatMap(task => executeTask(task))
).subscribe();

// delayWhen with conditional delay
source$.pipe(
  delayWhen(value => 
    value.priority === 'high' 
      ? timer(0) // No delay for high priority
      : timer(5000) // 5s delay for normal
  )
).subscribe();
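
The difference between delay and delayWhen comes down to arithmetic on emission times. The sketch below models that arithmetic without RxJS, under the assumption that each delayWhen duration observable emits once after a given number of milliseconds (as timer(ms) does). TimedValue, delayTimes, and delayWhenTimes are hypothetical names.

```typescript
// Dependency-free timing model (not the RxJS implementation). Times are in ms.
interface TimedValue<T> { time: number; value: T }

// delay(ms): every value is shifted by the same amount, so order and spacing are kept.
function delayTimes<T>(input: TimedValue<T>[], ms: number): TimedValue<T>[] {
  return input.map(e => ({ time: e.time + ms, value: e.value }));
}

// delayWhen(v => timer(durationFor(v))): each value gets its own shift,
// so a later value can overtake an earlier one.
function delayWhenTimes<T>(
  input: TimedValue<T>[],
  durationFor: (value: T, index: number) => number
): TimedValue<T>[] {
  return input
    .map((e, i) => ({ time: e.time + durationFor(e.value, i), value: e.value }))
    .sort((a, b) => a.time - b.time);
}

const timeline: TimedValue<string>[] = [
  { time: 0, value: 'low' },
  { time: 100, value: 'high' },
];

const fixedShift = delayTimes(timeline, 1000);
const perValueShift = delayWhenTimes(timeline, v => (v === 'high' ? 0 : 5000));

console.log(fixedShift.map(e => `${e.value}@${e.time}`).join(' '));    // low@1000 high@1100
console.log(perValueShift.map(e => `${e.value}@${e.time}`).join(' ')); // high@100 low@5000
```

The reordering in the second result is why the priority example above lets high-priority values pass normal ones that were emitted earlier.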

3. repeat and repeatWhen for Stream Repetition

Operator | Syntax | Description | Repetition Control
repeat | repeat(count?) | Re-subscribes to source when it completes | Fixed count or infinite
repeatWhen | repeatWhen(notifier) | Re-subscribes based on notifier observable emissions | Dynamic, conditional repetition
Behavior | Only repeats on completion, not on error | For errors, use retry/retryWhen | Success repetition only

Example: repeat operators for stream repetition

import { of, interval, fromEvent, timer } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { repeat, repeatWhen, take, delay, delayWhen, tap, switchMap, takeUntil, takeWhile } from 'rxjs/operators';

// Basic repeat - repeat 3 times
of(1, 2, 3).pipe(
  tap(val => console.log('Value:', val)),
  repeat(3)
).subscribe({
  complete: () => console.log('All repetitions complete')
});
// Output: 1,2,3, 1,2,3, 1,2,3

// Infinite repeat
interval(1000).pipe(
  take(3),
  tap(val => console.log('Interval:', val)),
  repeat() // Infinite - careful!
).subscribe();
// Output: 0,1,2, 0,1,2, 0,1,2... forever

// repeatWhen - conditional repetition
of('Attempt').pipe(
  tap(val => console.log(val)),
  repeatWhen(notifications => notifications.pipe(
    delay(1000), // Wait 1s between repetitions
    take(3) // Repeat 3 times
  ))
).subscribe({
  complete: () => console.log('Done repeating')
});

// Practical: Polling
function pollAPI(url: string, intervalMs: number) {
  return ajax.getJSON(url).pipe(
    repeatWhen(notifications => notifications.pipe(
      delay(intervalMs)
    ))
  );
}

pollAPI('/api/status', 5000).pipe(
  takeUntil(stopPolling$)
).subscribe(status => updateStatus(status));

// Practical: Heartbeat
of('ping').pipe(
  tap(() => console.log('Heartbeat sent')),
  switchMap(() => ajax.post('/api/heartbeat')),
  repeatWhen(notifications => notifications.pipe(
    delay(30000) // Every 30 seconds
  )),
  takeUntil(disconnect$)
).subscribe();

// Practical: Game loop
function gameLoop() {
  return of(null).pipe(
    tap(() => {
      updateGameState();
      renderFrame();
    }),
    repeatWhen(notifications => notifications.pipe(
      delay(16) // ~60 FPS
    ))
  );
}

gameLoop().pipe(
  takeUntil(fromEvent(stopButton, 'click'))
).subscribe();

// Practical: Polling with increasing delay
// (repeatWhen re-subscribes only after a successful completion; failed requests need retry)
let attemptCount = 0;

ajax.getJSON('/api/data').pipe(
  tap(() => attemptCount++),
  repeatWhen(notifications => notifications.pipe(
    delayWhen(() => timer(Math.min(attemptCount * 1000, 10000))),
    take(5) // At most 5 repeats after the initial request
  ))
).subscribe({
  next: data => console.log('Data:', data),
  error: err => console.error('Failed:', err)
});

// Practical: Scheduled tasks
of('Task execution').pipe(
  tap(() => executeScheduledTask()),
  repeatWhen(notifications => notifications.pipe(
    delay(60 * 60 * 1000) // Every hour
  )),
  takeUntil(shutdownSignal$)
).subscribe();

// Practical: Animation loop with repeat
function animateElement(element, duration) {
  return of(element).pipe(
    tap(el => el.classList.add('animate')),
    delay(duration),
    tap(el => el.classList.remove('animate')),
    delay(500), // Pause between animations
    repeat(3) // Animate 3 times
  );
}

animateElement(box, 1000).subscribe({
  complete: () => console.log('Animation complete')
});

// Practical: Data sync with conditional repeat
function syncData() {
  return ajax.post('/api/sync', localChanges).pipe(
    tap(response => {
      if (response.hasMore) {
        console.log('More data to sync');
      }
    }),
    repeatWhen(notifications => notifications.pipe(
      delayWhen(() => timer(1000)),
      takeWhile(() => hasLocalChanges())
    ))
  );
}

// Practical: Queue processor with repeat
const queue = [];

of(null).pipe(
  tap(() => {
    if (queue.length > 0) {
      const item = queue.shift();
      processItem(item);
    }
  }),
  repeatWhen(notifications => notifications.pipe(
    delay(100)
  )),
  takeUntil(stopProcessing$)
).subscribe();

// Practical: Metrics collection
of(null).pipe(
  switchMap(() => collectMetrics()),
  tap(metrics => sendToAnalytics(metrics)),
  repeatWhen(notifications => notifications.pipe(
    delay(10000) // Collect every 10 seconds
  ))
).subscribe();

// Practical: Keep-alive connection
of('keep-alive').pipe(
  switchMap(() => ajax.post('/api/keep-alive')),
  repeatWhen(notifications => notifications.pipe(
    delay(45000) // Every 45 seconds
  )),
  takeUntil(connectionClosed$)
).subscribe({
  next: () => console.log('Keep-alive sent'),
  error: err => console.error('Keep-alive failed:', err)
});
Note: repeat only triggers on completion, not errors. Use with takeUntil to stop infinite repetition. For errors, use retry/retryWhen instead.
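
Two details of repeat(count) are easy to miss: the count is the total number of subscriptions (not the number of extra ones), and every repetition re-runs the source, including any side effect it performs on subscription. The sketch below models this for a synchronous cold source without using RxJS; repeatModel and coldNumbers are hypothetical names.

```typescript
// Model of repeat(count) for a synchronous cold source (not the RxJS implementation).
// A "cold source" here is just a function that produces its values each time it is called.
function repeatModel<T>(coldSource: () => T[], count: number): T[] {
  const emitted: T[] = [];
  for (let i = 0; i < count; i++) {
    for (const v of coldSource()) {
      emitted.push(v);
    }
  }
  return emitted;
}

let subscriptionCount = 0;
const coldNumbers = (): number[] => {
  subscriptionCount++; // side effect runs on every (re)subscription
  return [1, 2, 3];
};

const repeatedValues = repeatModel(coldNumbers, 3);

console.log(repeatedValues.join(',')); // 1,2,3,1,2,3,1,2,3
console.log(subscriptionCount);        // 3
```

For a source that wraps an HTTP request, this means repeat(3) issues three requests in total.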

4. sample and sampleTime for Periodic Sampling

Operator | Syntax | Description | Sampling Strategy
sample | sample(notifier$) | Emits most recent value when notifier emits | Event-based sampling
sampleTime | sampleTime(period) | Emits most recent value periodically (ms) | Time-based sampling
Behavior | Only emits if source emitted since last sample | Ignores periods with no source emissions | Latest value at each sample point

Example: sample operators for value sampling

import { interval, fromEvent, Subject, BehaviorSubject } from 'rxjs';
import { sample, sampleTime, map, take, filter, scan, startWith, pairwise } from 'rxjs/operators';

// Basic sample - sample on click
const source$ = interval(500).pipe(take(20));
const clicks$ = fromEvent(document, 'click');

source$.pipe(
  sample(clicks$)
).subscribe(val => console.log('Sampled on click:', val));
// Emits latest interval value when user clicks

// sampleTime - periodic sampling
interval(100).pipe(
  sampleTime(1000)
).subscribe(val => console.log('Sample:', val));
// Samples every 1 second (emits latest value from 100ms intervals)

// Practical: Mouse position sampling
const mouseMove$ = fromEvent(document, 'mousemove').pipe(
  map(e => ({ x: e.clientX, y: e.clientY }))
);

mouseMove$.pipe(
  sampleTime(200) // Sample every 200ms
).subscribe(pos => {
  updatePositionDisplay(pos);
  // Reduces updates from potentially thousands to 5/second
});

// Practical: Sensor data throttling
const sensorData$ = interval(10).pipe(
  map(() => ({
    temperature: readTemperature(),
    humidity: readHumidity(),
    timestamp: Date.now()
  }))
);

sensorData$.pipe(
  sampleTime(5000) // Sample every 5 seconds
).subscribe(data => {
  logSensorData(data);
  // Reduces logging from 100/sec to 1/5sec
});

// Practical: Form value sampling on button click
const formValue$ = formControl.valueChanges;
const submitClick$ = fromEvent(submitBtn, 'click');

formValue$.pipe(
  sample(submitClick$)
).subscribe(value => {
  console.log('Form value on submit:', value);
  submitForm(value);
});

// Practical: Window resize sampling
fromEvent(window, 'resize').pipe(
  sampleTime(500) // Sample every 500ms
).subscribe(() => {
  recalculateLayout();
  // Prevents excessive recalculations during resize
});

// Practical: Scroll position sampling
fromEvent(window, 'scroll').pipe(
  map(() => window.scrollY),
  sampleTime(100)
).subscribe(scrollY => {
  updateScrollIndicator(scrollY);
  checkLazyLoadImages(scrollY);
});

// Practical: Real-time chart updates
const dataStream$ = websocket.pipe(
  map(msg => msg.value)
);

dataStream$.pipe(
  sampleTime(1000) // Update chart every second
).subscribe(value => {
  addDataPointToChart(value);
  // Smooth chart updates instead of jerky real-time
});

// Practical: Network bandwidth sampling
const bytesTransferred$ = new Subject<number>();

bytesTransferred$.pipe(
  scan((acc, bytes) => acc + bytes, 0), // Running total of bytes
  sampleTime(1000),
  startWith(0),
  pairwise(), // [previous total, current total]; startWith and pairwise come from 'rxjs/operators'
  // KB since the previous sample (equals KB/s while traffic is continuous)
  map(([previous, total]) => (total - previous) / 1024)
).subscribe(bandwidth => {
  displayBandwidth(`${bandwidth.toFixed(2)} KB/s`);
});

// Practical: Game state sampling
const gameState$ = new BehaviorSubject(initialState);

gameState$.pipe(
  sampleTime(16) // Sample at ~60 FPS
).subscribe(state => {
  renderGame(state);
});

// Practical: Audio level meter
const audioLevel$ = new Subject<number>();

audioLevel$.pipe(
  sampleTime(50) // Update 20 times per second
).subscribe(level => {
  updateAudioMeter(level);
});

// Practical: Sample on interval
const rapidSource$ = interval(50);
const sampler$ = interval(1000);

rapidSource$.pipe(
  sample(sampler$)
).subscribe(val => console.log('Sampled:', val));
// Samples every 1 second instead of every 50ms

// Practical: Price ticker sampling
const priceUpdates$ = websocket.pipe(
  filter(msg => msg.type === 'price'),
  map(msg => msg.price)
);

priceUpdates$.pipe(
  sampleTime(500) // Update UI twice per second
).subscribe(price => {
  updatePriceDisplay(price);
});
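
The rule "only emits if the source emitted since the last sample" can be checked on a small timeline. The sketch below is a dependency-free model of that rule, not the RxJS implementation; SourceTick and sampleAt are hypothetical names, and a source value that lands exactly on a sample time is treated as arriving before the sample (an assumption of this model).

```typescript
// Dependency-free model of sample/sampleTime (not the RxJS implementation). Times are in ms.
interface SourceTick<T> { time: number; value: T }

function sampleAt<T>(input: SourceTick<T>[], sampleTimes: number[]): SourceTick<T>[] {
  const sampled: SourceTick<T>[] = [];
  let nextIndex = 0; // index of the next source emission not yet consumed
  for (const t of sampleTimes) {
    let latest: SourceTick<T> | undefined;
    // Consume everything the source emitted up to this sample point.
    while (nextIndex < input.length && input[nextIndex].time <= t) {
      latest = input[nextIndex];
      nextIndex++;
    }
    // Emit only if something new arrived since the previous sample.
    if (latest !== undefined) {
      sampled.push({ time: t, value: latest.value });
    }
  }
  return sampled;
}

const sourceTicks: SourceTick<string>[] = [
  { time: 100, value: 'a' },
  { time: 250, value: 'b' },
  { time: 1200, value: 'c' },
];

const sampledTicks = sampleAt(sourceTicks, [500, 1000, 1500]);

console.log(sampledTicks.map(s => `${s.value}@${s.time}`).join(' ')); // b@500 c@1500
```

Nothing is emitted at 1000 because the source was silent between 500 and 1000, and 'a' is never emitted because 'b' replaced it before the first sample.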

5. audit and auditTime for Trailing Edge Throttling

Operator | Syntax | Description | Edge
audit | audit(durationSelector) | Emits most recent value when the duration observable emits | Trailing edge
auditTime | auditTime(duration) | Emits most recent value after fixed duration | Trailing edge
vs throttle | audit emits trailing (last), throttle emits leading (first) | Opposite edge sampling | Different use cases

Example: audit operators for trailing edge throttling

import { fromEvent, interval, timer, Subject } from 'rxjs';
import { audit, auditTime, debounceTime, map, throttleTime } from 'rxjs/operators';

// Basic auditTime
fromEvent(button, 'click').pipe(
  auditTime(1000)
).subscribe(() => console.log('Click processed (trailing)'));
// A click starts a 1s window; the latest click in that window is emitted when it ends

// audit with dynamic duration
fromEvent(button, 'click').pipe(
  audit(() => timer(1000))
).subscribe(() => console.log('Audited click'));

// Practical: Search input (trailing edge)
fromEvent(searchInput, 'input').pipe(
  map(e => e.target.value),
  auditTime(500)
).subscribe(searchTerm => {
  performSearch(searchTerm);
  // Searches with the latest value at most every 500ms while the user types;
  // to wait until typing stops, use debounceTime instead
});

// Practical: Window resize (final dimensions)
fromEvent(window, 'resize').pipe(
  auditTime(300)
).subscribe(() => {
  const width = window.innerWidth;
  const height = window.innerHeight;
  recalculateLayout(width, height);
  // Uses the latest dimensions, at most once every 300ms during a resize
});

// Practical: Scroll position (final position)
fromEvent(window, 'scroll').pipe(
  map(() => window.scrollY),
  auditTime(200)
).subscribe(finalScrollY => {
  updateScrollBasedContent(finalScrollY);
  // Updates based on final scroll position
});

// Practical: Button click flood prevention
fromEvent(actionBtn, 'click').pipe(
  auditTime(2000)
).subscribe(() => {
  performExpensiveAction();
  // Executes with last click in 2-second window
});

// Practical: Form field validation (final value)
fromEvent(emailInput, 'input').pipe(
  map(e => e.target.value),
  auditTime(800)
).subscribe(email => {
  validateEmail(email);
  // Validates the latest value at most every 800ms while the user types
});

// Practical: Drag end position
fromEvent(element, 'drag').pipe(
  map(e => ({ x: e.clientX, y: e.clientY })),
  auditTime(100)
).subscribe(finalPosition => {
  updateElementPosition(finalPosition);
});

// Compare: throttleTime (leading) vs auditTime (trailing)
const clicks$ = fromEvent(button, 'click');

// throttleTime - emits first, ignores rest
clicks$.pipe(
  throttleTime(1000)
).subscribe(() => console.log('Throttle: First click'));

// auditTime - emits last after period
clicks$.pipe(
  auditTime(1000)
).subscribe(() => console.log('Audit: Last click'));

// Practical: Auto-save (save final changes)
formValue$.pipe(
  auditTime(3000) // Save the latest value at most every 3s while changes continue
).subscribe(value => {
  autoSave(value);
  showSaveIndicator();
});

// Practical: Live preview update (final content)
fromEvent(textarea, 'input').pipe(
  map(e => e.target.value),
  auditTime(500)
).subscribe(content => {
  updatePreview(content);
  // Preview shows the latest content, refreshed at most every 500ms
});

// Practical: API rate limiting (trailing requests)
const apiRequests$ = new Subject<Request>();

apiRequests$.pipe(
  auditTime(1000) // Max 1 request per second (last one wins)
).subscribe(request => {
  sendRequest(request);
});

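// Practical: Mouse stop detection
// auditTime would fire 1s after the first move even if the mouse is still moving,
// so detecting a pause needs debounceTime (from 'rxjs/operators') instead
fromEvent(element, 'mousemove').pipe(
  debounceTime(1000)
).subscribe(() => {
  console.log('Mouse stopped moving');
  showContextMenu();
});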

// audit with custom duration selector
clicks$.pipe(
  audit(click => {
    // Longer delay for rapid clicks
    return timer(rapidClickDetected ? 2000 : 500);
  })
).subscribe();

// Practical: Zoom level adjustment (final zoom)
fromEvent(canvas, 'wheel').pipe(
  map(e => e.deltaY),
  auditTime(150)
).subscribe(delta => {
  adjustZoom(delta);
  renderAtNewZoom();
});
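Note: auditTime emits the trailing (last) value of each window, unlike throttleTime which emits the leading (first). The window starts at the first source value and has a fixed length, so audit keeps emitting during continuous activity; to wait until activity settles, use debounceTime.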
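
The three rate-limiting rules are easy to confuse, so the sketch below runs the same keystroke timeline through a simplified model of each. It is not the RxJS implementation: it assumes default options (throttleTime with leading only), assumes the source stays open long enough for pending timers to fire, and avoids events that land exactly on a window boundary. TimedEvent and the three model functions are hypothetical names.

```typescript
// Dependency-free timing models (not the RxJS implementations). Times are in ms.
interface TimedEvent<T> { time: number; value: T }

// throttleTime(d): emit the first event, then ignore events for d ms.
function throttleModel<T>(input: TimedEvent<T>[], d: number): TimedEvent<T>[] {
  const emitted: TimedEvent<T>[] = [];
  let silentUntil = -Infinity;
  for (const e of input) {
    if (e.time >= silentUntil) {
      emitted.push(e);
      silentUntil = e.time + d;
    }
  }
  return emitted;
}

// auditTime(d): the first event opens a d ms window; when it ends, emit the latest event seen.
function auditModel<T>(input: TimedEvent<T>[], d: number): TimedEvent<T>[] {
  const emitted: TimedEvent<T>[] = [];
  let windowEnd = -Infinity;
  let latest: TimedEvent<T> | undefined;
  for (const e of input) {
    if (latest !== undefined && e.time >= windowEnd) {
      emitted.push({ time: windowEnd, value: latest.value }); // window closed before this event
      latest = undefined;
    }
    if (latest === undefined) {
      windowEnd = e.time + d; // this event opens a new window
    }
    latest = e;
  }
  if (latest !== undefined) {
    emitted.push({ time: windowEnd, value: latest.value }); // last open window closes
  }
  return emitted;
}

// debounceTime(d): emit an event only if no other event follows within d ms.
function debounceModel<T>(input: TimedEvent<T>[], d: number): TimedEvent<T>[] {
  const emitted: TimedEvent<T>[] = [];
  input.forEach((e, i) => {
    const following = input[i + 1];
    if (following === undefined || following.time - e.time >= d) {
      emitted.push({ time: e.time + d, value: e.value });
    }
  });
  return emitted;
}

// One keystroke every 200ms, with a 500ms duration for each operator.
const keystrokes: TimedEvent<string>[] = [
  { time: 0, value: 'r' },
  { time: 200, value: 'rx' },
  { time: 400, value: 'rxj' },
  { time: 600, value: 'rxjs' },
  { time: 800, value: 'rxjs!' },
];

const show = (list: TimedEvent<string>[]): string =>
  list.map(e => `${e.value}@${e.time}`).join(' ');

console.log(show(throttleModel(keystrokes, 500))); // r@0 rxjs@600
console.log(show(auditModel(keystrokes, 500)));    // rxj@500 rxjs!@1100
console.log(show(debounceModel(keystrokes, 500))); // rxjs!@1300
```

Only the debounce model waits for typing to stop; the audit model emits an intermediate value at 500ms even though keystrokes are still arriving.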

6. materialize and dematerialize for Notification Objects

Operator | Syntax | Description | Transformation
materialize | materialize() | Converts emissions/errors/completion to Notification objects | Observable<T> → Observable<Notification<T>>
dematerialize | dematerialize() | Converts Notification objects back to normal emissions | Observable<Notification<T>> → Observable<T>
Notification types | next, error, complete | Metadata about stream events | Reify stream semantics

Example: materialize and dematerialize for metadata handling

import { of, from, throwError, EMPTY } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { materialize, dematerialize, map, delay, tap, filter, scan, concatMap, expand } from 'rxjs/operators';

// Basic materialize - convert to notifications
of(1, 2, 3).pipe(
  materialize()
).subscribe(notification => {
  console.log('Kind:', notification.kind);
  console.log('Value:', notification.value);
  console.log('Has value:', notification.hasValue);
});
// Output:
// { kind: 'N', value: 1, hasValue: true }
// { kind: 'N', value: 2, hasValue: true }
// { kind: 'N', value: 3, hasValue: true }
// { kind: 'C', hasValue: false }

// Error notification
throwError(() => new Error('Oops')).pipe(
  materialize()
).subscribe(notification => {
  console.log('Kind:', notification.kind); // 'E'
  console.log('Error:', notification.error);
});

// dematerialize - convert back
of(
  { kind: 'N', value: 1 },
  { kind: 'N', value: 2 },
  { kind: 'C' }
).pipe(
  dematerialize()
).subscribe({
  next: val => console.log('Value:', val),
  complete: () => console.log('Complete')
});

// Practical: Error logging (dematerialize re-emits the error afterwards, so the stream still fails)
ajax.getJSON('/api/data').pipe(
  materialize(),
  tap(notification => {
    if (notification.kind === 'E') {
      console.error('API Error:', notification.error);
      logError(notification.error);
    }
  }),
  dematerialize()
).subscribe();

// Practical: Delay errors
source$.pipe(
  materialize(),
  delay(1000), // Delay all notifications including errors
  dematerialize()
).subscribe();

// Practical: Convert errors to values
source$.pipe(
  materialize(),
  map(notification => {
    if (notification.kind === 'E') {
      return { kind: 'N', value: { error: notification.error } };
    }
    return notification;
  }),
  dematerialize()
).subscribe(result => {
  if (result.error) {
    handleError(result.error);
  } else {
    handleSuccess(result);
  }
});

// Practical: Stream replay with timing
const notifications = [];

source$.pipe(
  materialize(),
  tap(n => notifications.push({
    notification: n,
    timestamp: Date.now()
  }))
).subscribe();

// Later, replay with original timing
function replayStream() {
  from(notifications).pipe(
    concatMap((item, index) => {
      // Gap since the previous recorded notification (0 for the first one)
      const gapMs = index === 0 ? 0 : item.timestamp - notifications[index - 1].timestamp;
      return of(item.notification).pipe(delay(gapMs));
    }),
    dematerialize()
  ).subscribe();
}

// Practical: Error recovery with metadata
source$.pipe(
  materialize(),
  // Skip the completion notification so the last value is not emitted twice
  filter(notification => notification.kind !== 'C'),
  scan((acc, notification) => {
    if (notification.kind === 'E') {
      return { ...acc, errors: [...acc.errors, notification.error], lastError: notification.error };
    }
    return { ...acc, values: [...acc.values, notification.value] };
  }, { values: [], errors: [], lastError: null }),
  map(state => {
    // Convert back to notification
    if (state.lastError) {
      return { kind: 'N', value: { error: state.lastError } };
    }
    return { kind: 'N', value: state.values[state.values.length - 1] };
  }),
  dematerialize()
).subscribe();

// Practical: Notification filtering
source$.pipe(
  materialize(),
  filter(notification => {
    // Filter out specific errors
    if (notification.kind === 'E') {
      return !isIgnorableError(notification.error);
    }
    return true;
  }),
  dematerialize()
).subscribe();

// Practical: Testing observable behavior
testScheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('--a--b--#', { a: 1, b: 2 }, new Error('fail'));
  
  const materialized$ = source$.pipe(materialize());
  
  expectObservable(materialized$).toBe('--a--b--(c|)', {
    a: { kind: 'N', value: 1, hasValue: true },
    b: { kind: 'N', value: 2, hasValue: true },
    c: { kind: 'E', error: new Error('fail'), hasValue: false }
  });
});

// Practical: Stream introspection
source$.pipe(
  materialize(),
  tap(notification => {
    metrics.increment(`stream.${notification.kind}`);
    
    if (notification.kind === 'N') {
      console.log('Value emitted:', notification.value);
    } else if (notification.kind === 'E') {
      console.error('Error occurred:', notification.error);
    } else if (notification.kind === 'C') {
      console.log('Stream completed');
    }
  }),
  dematerialize()
).subscribe();

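// Practical: Conditional retry based on error type
source$.pipe(
  materialize(),
  expand(notification => {
    if (notification.kind === 'E' && isRetryable(notification.error)) {
      return source$.pipe(
        materialize(),
        delay(1000) // Delays every notification, errors included, so retries are spaced out
      );
    }
    return EMPTY;
  }),
  // expand also forwards the notification that triggered the retry; drop it,
  // otherwise dematerialize would rethrow the error before the retry runs
  filter(notification => !(notification.kind === 'E' && isRetryable(notification.error))),
  dematerialize()
).subscribe();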
Note: materialize/dematerialize convert stream events to/from Notification objects. Useful for delaying errors, logging stream behavior, or manipulating stream semantics.
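
The idea of turning stream events into data and back can be sketched without RxJS. The model below represents notifications as a discriminated union and works on a synchronous producer; it is an illustration under those assumptions, not the library's Notification class, and StreamNote, SimpleObserver, materializeModel, and dematerializeModel are hypothetical names.

```typescript
// Dependency-free model of materialize/dematerialize (not the RxJS implementation).
type StreamNote<T> =
  | { kind: 'N'; value: T }
  | { kind: 'E'; error: unknown }
  | { kind: 'C' };

interface SimpleObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// materialize: run a synchronous producer and record values, a thrown error,
// or normal completion as plain data.
function materializeModel<T>(produce: (emit: (value: T) => void) => void): StreamNote<T>[] {
  const notes: StreamNote<T>[] = [];
  try {
    produce(value => { notes.push({ kind: 'N', value }); });
    notes.push({ kind: 'C' });
  } catch (error) {
    notes.push({ kind: 'E', error });
  }
  return notes;
}

// dematerialize: replay recorded notifications against an observer.
// An E or C notification ends the replay; running out of notifications also
// completes, mirroring the materialized stream itself completing.
function dematerializeModel<T>(notes: StreamNote<T>[], observer: SimpleObserver<T>): void {
  for (const n of notes) {
    if (n.kind === 'N') {
      observer.next(n.value);
    } else if (n.kind === 'E') {
      observer.error(n.error);
      return;
    } else {
      observer.complete();
      return;
    }
  }
  observer.complete();
}

// A producer that emits two values and then fails.
const recordedNotes = materializeModel<number>(emit => {
  emit(1);
  emit(2);
  throw new Error('Oops');
});

// Because the error is now data, it can be mapped to a fallback value.
const recoveredNotes = recordedNotes.map((n): StreamNote<number> =>
  n.kind === 'E' ? { kind: 'N', value: -1 } : n
);

const eventLog: string[] = [];
dematerializeModel(recoveredNotes, {
  next: value => { eventLog.push(`next:${value}`); },
  error: err => { eventLog.push('error:' + String(err)); },
  complete: () => { eventLog.push('complete'); },
});

console.log(recordedNotes.map(n => n.kind).join(',')); // N,N,E
console.log(eventLog.join(' '));                       // next:1 next:2 next:-1 complete
```

The same shape underlies the "convert errors to values" example above: once the error is an ordinary notification object, map, filter, and delay can treat it like any other value.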

Section 10 Summary

  • tap performs side effects without modifying stream - perfect for logging, analytics, debugging
  • delay delays all emissions by fixed duration, delayWhen applies dynamic per-value delays
  • repeat re-subscribes on completion, repeatWhen enables conditional repetition with delays
  • sample/sampleTime emits most recent value periodically - reduces high-frequency streams
  • audit/auditTime emits trailing edge (last value of each fixed window) - opposite of throttle; use debounceTime to wait for silence
  • materialize/dematerialize converts stream events to Notification objects for metadata manipulation