Filtering Operators Complete Guide

1. filter for Conditional Value Filtering

Operator | Syntax | Description | Use Case
filter | filter(predicate) | Emits only values for which the predicate function returns true | Conditional filtering, validation, data cleanup
filter with index | filter((val, idx) => condition) | Predicate receives value and index for position-based filtering | Skip every nth item, index-based logic

Example: Conditional filtering patterns

import { of, fromEvent } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Basic filter - even numbers only
of(1, 2, 3, 4, 5, 6).pipe(
  filter(n => n % 2 === 0)
).subscribe(val => console.log(val)); // 2, 4, 6

// Filter objects by property
of(
  { name: 'Alice', age: 25 },
  { name: 'Bob', age: 17 },
  { name: 'Charlie', age: 30 }
).pipe(
  filter(user => user.age >= 18)
).subscribe(user => console.log(user.name)); // Alice, Charlie

// Filter with index
of(10, 20, 30, 40, 50).pipe(
  filter((val, idx) => idx % 2 === 0) // Every other item
).subscribe(val => console.log(val)); // 10, 30, 50

// Filter null/undefined values
of(1, null, 2, undefined, 3, null).pipe(
  filter(val => val != null) // != catches both null and undefined
).subscribe(val => console.log(val)); // 1, 2, 3

// Practical: filter valid keyboard inputs
fromEvent(input, 'keyup').pipe(
  map(e => e.target.value),
  filter(text => text.length >= 3), // Min 3 characters
  filter(text => /^[a-zA-Z0-9]+$/.test(text)) // Alphanumeric only
).subscribe(validInput => search(validInput));

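// Practical: filter successful HTTP responses
// Assuming Angular's HttpClient: observe: 'response' is needed to get status and body
this.http.get('/api/data', { observe: 'response' }).pipe(
  filter(response => response.status === 200),
  map(response => response.body)
).subscribe(data => processData(data));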
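
Example: Type-narrowing filter (sketch)

In TypeScript, a plain boolean predicate such as val => val != null removes the values at runtime but leaves the stream typed as number | null | undefined. A minimal sketch, assuming the code is compiled as TypeScript, is to write the predicate as a type guard so the element type narrows to number. The name present is only illustrative.

```typescript
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

// Collected values; typed as number[] to show that narrowing took effect
const present: number[] = [];

of(1, null, 2, undefined, 3, null).pipe(
  // "v is number" marks the predicate as a type guard,
  // so everything downstream sees Observable<number>
  filter((v: number | null | undefined): v is number => v != null)
).subscribe(n => present.push(n));

console.log(present); // [1, 2, 3]
```

Without the "v is number" return type the push into a number[] would not type-check, even though the runtime behavior is identical.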

2. take, takeUntil, and takeWhile Operators

Operator | Syntax | Description | Completion
take | take(count) | Emits the first N values, then completes | After N emissions
takeUntil | takeUntil(notifier$) | Emits until the notifier observable emits, then completes | When notifier emits
takeWhile | takeWhile(predicate, inclusive?) | Emits while the predicate is true; completes on the first false | When predicate returns false
takeLast | takeLast(count) | Emits the last N values when the source completes | When source completes

Example: Take operator variants

import { of, interval, fromEvent, Subject } from 'rxjs';
import { map, take, takeUntil, takeWhile, takeLast } from 'rxjs/operators';

// take - first N values
interval(1000).pipe(
  take(5)
).subscribe(val => console.log(val)); // 0, 1, 2, 3, 4, then completes

// takeUntil - common cleanup pattern
const destroy$ = new Subject<void>();
interval(500).pipe(
  takeUntil(destroy$)
).subscribe(val => console.log(val));
// Later: destroy$.next() stops the interval

// takeWhile - conditional stopping
interval(500).pipe(
  takeWhile(n => n < 5)
).subscribe(val => console.log(val)); // 0, 1, 2, 3, 4

// takeWhile with inclusive flag
interval(500).pipe(
  takeWhile(n => n < 5, true) // Includes the failing value
).subscribe(val => console.log(val)); // 0, 1, 2, 3, 4, 5

// takeLast - buffer last N
of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
  takeLast(3)
).subscribe(val => console.log(val)); // 8, 9, 10

// Practical: stop on button click
const stop$ = fromEvent(stopButton, 'click');
interval(1000).pipe(
  takeUntil(stop$)
).subscribe(tick => updateTimer(tick));

// Practical: take until condition met
const temps$ = interval(1000).pipe(
  map(() => readTemperature())
);
temps$.pipe(
  takeWhile(temp => temp < 100, true) // Include threshold value
).subscribe(temp => console.log('Temp:', temp));

// Practical: Component lifecycle
class Component {
  private destroy$ = new Subject<void>();
  
  ngOnInit() {
    this.dataService.updates$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => this.render(data));
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
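
Example: Observing takeUntil completion (sketch)

The timer-based examples above are hard to inspect by eye. The sketch below swaps interval and DOM events for plain Subjects so it runs synchronously and shows that the subscriber receives a complete notification when the notifier emits. The names source$, stop$, and log are illustrative only.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>();
const stop$ = new Subject<void>();
const log: string[] = [];

source$.pipe(
  takeUntil(stop$)
).subscribe({
  next: v => log.push(`next ${v}`),
  complete: () => log.push('complete'),
});

source$.next(1);
source$.next(2);
stop$.next();    // notifier emits: the piped stream completes
source$.next(3); // never reaches the subscriber

console.log(log); // ['next 1', 'next 2', 'complete']
```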

3. first, last, and single Value Extraction

Operator | Syntax | Description | Error Condition
first | first(predicate?, defaultValue?) | Emits the first value (or the first matching the predicate), then completes | Errors if empty and no default
last | last(predicate?, defaultValue?) | Emits the last value (or the last matching the predicate) when the source completes | Errors if empty and no default
single | single(predicate?) | Emits a single value; errors on zero or multiple emissions | Errors if not exactly one emission

Example: Value extraction operators

import { of, EMPTY } from 'rxjs';
import { first, last, single } from 'rxjs/operators';

// first - get first emission
of(1, 2, 3, 4, 5).pipe(
  first()
).subscribe(val => console.log(val)); // 1

// first with predicate
of(1, 2, 3, 4, 5).pipe(
  first(n => n > 3)
).subscribe(val => console.log(val)); // 4

// first with default (prevents error on empty)
EMPTY.pipe(
  first(undefined, 'default')
).subscribe(val => console.log(val)); // 'default'

// last - get last emission
of(1, 2, 3, 4, 5).pipe(
  last()
).subscribe(val => console.log(val)); // 5

// last with predicate
of(1, 2, 3, 4, 5).pipe(
  last(n => n < 4)
).subscribe(val => console.log(val)); // 3

// single - must have exactly one emission
of(42).pipe(
  single()
).subscribe(val => console.log(val)); // 42

// single - errors on multiple values
// (observer object used here; positional error callbacks are deprecated in RxJS 7)
of(1, 2, 3).pipe(
  single()
).subscribe({
  next: val => console.log(val),
  error: err => console.error('Error: Multiple values') // Triggered
});

// single with predicate
of(1, 2, 3, 4, 5).pipe(
  single(n => n === 3)
).subscribe(val => console.log(val)); // 3

// Practical: find first matching user
this.users$.pipe(
  first(user => user.id === targetId)
).subscribe(user => displayUser(user));

// Practical: get final calculation result
calculations$.pipe(
  last()
).subscribe(finalResult => saveResult(finalResult));

// Practical: ensure unique result
this.http.get('/api/config').pipe(
  single() // Validates exactly one response
).subscribe(config => applyConfig(config));
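
Warning: first() and last() emit an EmptyError notification when the source completes without a matching value and no default is given. Pass a default value for safety. defaultIfEmpty() placed before them also works, but only covers a completely empty source, not a predicate that never matches.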
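
Example: Handling empty streams (sketch)

To make the warning concrete, the sketch below records what a subscriber sees in four cases: an empty source, a predicate that never matches, a default value, and defaultIfEmpty() placed upstream. The helper observer and the log array are illustrative names, not part of RxJS.

```typescript
import { EMPTY, EmptyError, Observable, of } from 'rxjs';
import { defaultIfEmpty, first, last } from 'rxjs/operators';

const log: string[] = [];
const empty$: Observable<number> = EMPTY;

// Illustrative helper: builds an observer that records values and error kinds
const observer = (tag: string) => ({
  next: (v: number) => { log.push(`${tag}: next ${v}`); },
  error: (err: unknown) => {
    log.push(`${tag}: ${err instanceof EmptyError ? 'EmptyError' : 'other error'}`);
  },
});

// 1. Empty source, no default -> error notification
empty$.pipe(first()).subscribe(observer('empty'));

// 2. Non-empty source, but the predicate never matches -> same error
of(1, 2, 3).pipe(first((n: number) => n > 10)).subscribe(observer('no match'));

// 3. Default value supplied -> default is emitted instead of an error
of(1, 2, 3).pipe(first((n: number) => n > 10, -1)).subscribe(observer('default'));

// 4. defaultIfEmpty upstream -> last() always has something to emit
empty$.pipe(defaultIfEmpty(0), last()).subscribe(observer('defaultIfEmpty'));

console.log(log);
// ['empty: EmptyError', 'no match: EmptyError',
//  'default: next -1', 'defaultIfEmpty: next 0']
```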

4. skip, skipUntil, and skipWhile Operators

Operator | Syntax | Description | Skip Criteria
skip | skip(count) | Skips the first N values, emits all subsequent values | First N emissions
skipUntil | skipUntil(notifier$) | Skips all values until the notifier emits, then emits all | Until notifier emits
skipWhile | skipWhile(predicate) | Skips values while the predicate is true, emits the rest (opposite of takeWhile) | While predicate is true
skipLast | skipLast(count) | Skips the last N values when the source completes | Last N emissions

Example: Skip operator patterns

import { of, interval, fromEvent, timer } from 'rxjs';
import { skip, skipUntil, skipWhile, skipLast, take, toArray } from 'rxjs/operators';

// skip - ignore first N
of(1, 2, 3, 4, 5).pipe(
  skip(2)
).subscribe(val => console.log(val)); // 3, 4, 5

// skipUntil - start emitting after trigger
const trigger$ = timer(3000); // Emit after 3 seconds
interval(500).pipe(
  skipUntil(trigger$),
  take(5)
).subscribe(val => console.log(val)); // Starts after 3s

// skipWhile - skip until condition false
of(1, 2, 3, 4, 5, 1, 2).pipe(
  skipWhile(n => n < 4)
).subscribe(val => console.log(val)); // 4, 5, 1, 2

// skipLast - omit last N
of(1, 2, 3, 4, 5, 6, 7, 8).pipe(
  skipLast(3)
).subscribe(val => console.log(val)); // 1, 2, 3, 4, 5

// Practical: skip initial loading state
this.dataService.state$.pipe(
  skip(1) // Skip initial/default state
).subscribe(state => updateUI(state));

// Practical: wait for user authentication
const authComplete$ = this.authService.authenticated$;
this.appData$.pipe(
  skipUntil(authComplete$)
).subscribe(data => loadUserData(data));

// Practical: skip warmup values
sensorReadings$.pipe(
  skipWhile(reading => reading.temperature < 20) // Skip until warmed up
).subscribe(reading => processReading(reading));

// Practical: pagination - skip previous pages
const pageSize = 10;
const page = 3;
allItems$.pipe(
  skip(pageSize * (page - 1)),
  take(pageSize),
  toArray() // Collect the page into one array (toArray is exported from 'rxjs/operators')
).subscribe(items => displayPage(items));
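
Example: Reusable page operator (sketch)

skip and take emit items one at a time, so a subscriber that wants a whole page as a single array needs toArray at the end. One possible way to package the skip/take arithmetic is a small custom operator. The name page and its 1-based pageNumber parameter are assumptions made for this sketch, not an RxJS API.

```typescript
import { Observable, OperatorFunction, range } from 'rxjs';
import { skip, take, toArray } from 'rxjs/operators';

// Hypothetical helper: emits one array holding the requested page (1-based)
function page<T>(pageNumber: number, pageSize: number): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => source.pipe(
    skip(pageSize * (pageNumber - 1)), // drop the items of earlier pages
    take(pageSize),                    // keep at most one page of items
    toArray()                          // gather them into a single emission
  );
}

let thirdPage: number[] = [];

// 25 items, numbered 1..25; page 3 of size 10 is the short final page
range(1, 25).pipe(
  page<number>(3, 10)
).subscribe(items => { thirdPage = items; });

console.log(thirdPage); // [21, 22, 23, 24, 25]
```

Note that toArray only emits once the source completes (or take has collected a full page), so this fits finite sources or pages that are guaranteed to fill.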

5. distinct and distinctUntilChanged Deduplication

Operator | Syntax | Description | Memory Impact
distinct | distinct(keySelector?, flushes?) | Emits only values never seen before in the entire stream | High - stores all seen values
distinctUntilChanged | distinctUntilChanged(compare?) | Emits only when the current value differs from the previous one (drops consecutive duplicates) | Low - only stores previous value
distinctUntilKeyChanged | distinctUntilKeyChanged(key, compare?) | Like distinctUntilChanged but compares a specific object property | Low - stores previous property value

Example: Deduplication strategies

import { of, fromEvent, interval } from 'rxjs';
import { distinct, distinctUntilChanged, distinctUntilKeyChanged, map, debounceTime, switchMap } from 'rxjs/operators';

// distinct - all unique values in entire stream
of(1, 2, 2, 3, 1, 4, 3, 5).pipe(
  distinct()
).subscribe(val => console.log(val)); // 1, 2, 3, 4, 5

// distinct with key selector
of(
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
  { id: 1, name: 'Alice Again' } // Duplicate id
).pipe(
  distinct(user => user.id)
).subscribe(user => console.log(user.name)); // Alice, Bob

// distinctUntilChanged - consecutive duplicates only
of(1, 1, 2, 2, 2, 3, 1, 1, 4).pipe(
  distinctUntilChanged()
).subscribe(val => console.log(val)); // 1, 2, 3, 1, 4

// distinctUntilChanged with custom comparator
of(
  { value: 1 },
  { value: 1 }, // Same value
  { value: 2 }
).pipe(
  distinctUntilChanged((prev, curr) => prev.value === curr.value)
).subscribe(obj => console.log(obj)); // First and third only

// distinctUntilKeyChanged - compare object property
of(
  { status: 'loading' },
  { status: 'loading' }, // Duplicate
  { status: 'success' },
  { status: 'success' }  // Duplicate
).pipe(
  distinctUntilKeyChanged('status')
).subscribe(state => console.log(state.status)); // loading, success

// Practical: search input deduplication
fromEvent(searchInput, 'input').pipe(
  map(e => e.target.value),
  debounceTime(300), // Wait for a pause in typing first
  distinctUntilChanged() // Then skip the query if it equals the last one sent
).subscribe(query => search(query));

// Practical: state change detection
this.store.select('user').pipe(
  distinctUntilKeyChanged('userId'), // Only when userId changes
  switchMap(user => this.loadUserData(user.userId))
).subscribe(data => updateUI(data));

// Practical: unique IDs from stream
eventStream$.pipe(
  map(event => event.id),
  distinct() // Collect all unique IDs
).subscribe(uniqueId => processUniqueId(uniqueId));

// Performance note: distinct with flush
const flush$ = interval(10000); // Clear cache every 10s
dataStream$.pipe(
  distinct(item => item.id, flush$) // Periodically clear stored values
).subscribe(item => processItem(item));

Note: Prefer distinctUntilChanged in most cases, since it keeps only the previous value. Use distinct only when you need uniqueness across the whole stream, and bound its memory with a flush notifier or a finite source.
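
Example: Effect of the flush notifier (sketch)

Flushing trades strict global uniqueness for bounded memory: once the cache is cleared, a value seen earlier can pass through again. The sketch below uses Subjects in place of a real data stream and timer so the effect can be observed synchronously. The names ids$, flush$, and emitted are illustrative.

```typescript
import { Subject } from 'rxjs';
import { distinct } from 'rxjs/operators';

const ids$ = new Subject<number>();
const flush$ = new Subject<void>();
const emitted: number[] = [];

ids$.pipe(
  distinct((id: number) => id, flush$) // flush$ clears the set of seen keys
).subscribe(id => emitted.push(id));

ids$.next(1);
ids$.next(2);
ids$.next(1);  // dropped: 1 is still in the cache
flush$.next(); // cache cleared
ids$.next(1);  // emitted again: the cache no longer remembers 1
ids$.next(3);
ids$.next(1);  // dropped: 1 was cached again after the flush

console.log(emitted); // [1, 2, 1, 3]
```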

6. debounceTime and throttleTime Rate Limiting

Operator | Syntax | Behavior | Use Case
debounceTime | debounceTime(duration) | Emits a value after a silence period (no new values for duration ms) | Search input, resize events, autocomplete
throttleTime | throttleTime(duration, scheduler?, config?) | Emits the first value, ignores subsequent values for duration ms, then repeats | Scroll events, button clicks, API rate limiting
auditTime | auditTime(duration) | Emits the most recent value after duration ms (trailing edge) | Mouse move tracking, continuous updates
sampleTime | sampleTime(period) | Periodically emits the most recent value at fixed intervals | Periodic sampling, monitoring

Example: Rate limiting strategies

import { fromEvent, interval } from 'rxjs';
import { debounceTime, throttleTime, auditTime, sampleTime, map, filter, distinctUntilChanged, switchMap } from 'rxjs/operators';

// debounceTime - wait for pause in emissions
fromEvent(searchInput, 'input').pipe(
  map(e => e.target.value),
  debounceTime(500) // Wait 500ms after user stops typing
).subscribe(query => searchAPI(query));

// throttleTime - first value, then silence
fromEvent(button, 'click').pipe(
  throttleTime(2000) // First click, ignore for 2s, repeat
).subscribe(() => console.log('Throttled click'));

// throttleTime with trailing edge
// The 2nd argument is the scheduler; undefined keeps the default (asyncScheduler)
fromEvent(window, 'scroll').pipe(
  throttleTime(1000, undefined, { leading: true, trailing: true })
).subscribe(() => checkScrollPosition());

// auditTime - most recent value after period
fromEvent(document, 'mousemove').pipe(
  auditTime(1000) // Emit latest mouse position every 1s
).subscribe(event => updateTooltip(event));

// sampleTime - periodic sampling
const temperature$ = interval(100).pipe(
  map(() => Math.random() * 100)
);
temperature$.pipe(
  sampleTime(1000) // Sample every 1 second
).subscribe(temp => displayTemperature(temp));

// Practical comparison - resize handling
const resize$ = fromEvent(window, 'resize');

// debounceTime - wait for resize to finish
resize$.pipe(
  debounceTime(300)
).subscribe(() => recalculateLayout()); // After resize complete

// throttleTime - periodic updates during resize
resize$.pipe(
  throttleTime(100)
).subscribe(() => updatePreview()); // Every 100ms while resizing

// Practical: autocomplete with debounce
fromEvent(input, 'input').pipe(
  map(e => e.target.value),
  filter(text => text.length >= 3),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => this.http.get(`/api/search?q=${query}`))
).subscribe(results => displaySuggestions(results));

// Practical: save button with throttle
fromEvent(saveBtn, 'click').pipe(
  throttleTime(3000) // Prevent rapid saves
).subscribe(() => saveDocument());

// Practical: scroll position tracking
fromEvent(window, 'scroll').pipe(
  auditTime(100), // Latest position every 100ms
  map(() => window.scrollY)
).subscribe(scrollY => updateScrollIndicator(scrollY));
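
Example: Comparing the four strategies on one timeline (sketch)

The differences between the four operators are easiest to see on the same input. The code below is not RxJS: it is a simplified, dependency-free model of each operator's default behavior, written only for illustration. It assumes the source stays open long enough for pending timers to fire and that no value lands exactly on a timer boundary. All names (Timed, debounceModel, and so on) are hypothetical.

```typescript
// One timestamped value: t is milliseconds since subscription
interface Timed<T> { t: number; v: T; }

// debounceTime model: a value is emitted d ms after it arrives,
// unless a newer value arrives within that time
function debounceModel<T>(events: Timed<T>[], d: number): Timed<T>[] {
  return events
    .filter((e, i) => i === events.length - 1 || events[i + 1].t - e.t >= d)
    .map(e => ({ t: e.t + d, v: e.v }));
}

// throttleTime model (leading edge only): pass a value, then ignore for d ms
function throttleModel<T>(events: Timed<T>[], d: number): Timed<T>[] {
  const out: Timed<T>[] = [];
  let openAt = -Infinity; // time at which the next value is allowed through
  for (const e of events) {
    if (e.t >= openAt) {
      out.push(e);
      openAt = e.t + d;
    }
  }
  return out;
}

// auditTime model: the first value opens a d ms window;
// the latest value seen in the window is emitted when it closes
function auditModel<T>(events: Timed<T>[], d: number): Timed<T>[] {
  const out: Timed<T>[] = [];
  let pending: Timed<T> | undefined; // t = window end, v = latest value
  for (const e of events) {
    if (pending && e.t < pending.t) {
      pending.v = e.v; // still inside the window: remember the latest value
    } else {
      if (pending) out.push(pending); // previous window closed earlier
      pending = { t: e.t + d, v: e.v };
    }
  }
  if (pending) out.push(pending);
  return out;
}

// sampleTime model: at every tick, emit the latest value
// if a new one arrived since the previous tick
function sampleModel<T>(events: Timed<T>[], period: number): Timed<T>[] {
  const out: Timed<T>[] = [];
  if (events.length === 0) return out;
  const lastT = events[events.length - 1].t;
  let i = 0;
  for (let tick = period; tick - period <= lastT; tick += period) {
    let latest: Timed<T> | undefined;
    while (i < events.length && events[i].t < tick) { latest = events[i]; i++; }
    if (latest) out.push({ t: tick, v: latest.v });
  }
  return out;
}

// A burst of three values, a pause, then a burst of two
const input: Timed<string>[] = [
  { t: 0, v: 'a' }, { t: 50, v: 'b' }, { t: 100, v: 'c' },
  { t: 410, v: 'd' }, { t: 450, v: 'e' },
];
const fmt = (xs: Timed<string>[]) => xs.map(e => `${e.v}@${e.t}`).join(' ');

const debounced = fmt(debounceModel(input, 200)); // c@300 e@650
const throttled = fmt(throttleModel(input, 200)); // a@0 d@410
const audited = fmt(auditModel(input, 200));      // c@200 e@610
const sampled = fmt(sampleModel(input, 200));     // c@200 e@600
console.log({ debounced, throttled, audited, sampled });
```

Throttle reports the first value of each burst immediately, while debounce and audit report the last value of each burst after a delay. Sample follows its own clock regardless of when bursts start.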

Section 4 Summary

  • filter emits only values satisfying predicate condition
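  • take/takeUntil/takeWhile keep values from the start of the stream and then complete; skip/skipUntil/skipWhile drop values from the start and emit the rest (takeLast/skipLast act on the end)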
  • first/last/single extract specific values with optional predicates
  • distinctUntilChanged removes consecutive duplicates (low memory cost)
  • debounceTime for user input (wait for pause), throttleTime for events (periodic limiting)
  • Rate limiting: debounce (trailing), throttle (leading), audit (trailing periodic), sample (fixed interval)