Combination Operators and Stream Merging

1. merge and mergeAll for Concurrent Combination

Operator | Syntax | Description | Emission Pattern
merge | merge(obs1$, obs2$, ...) | Combines multiple observables concurrently; emits values as they arrive from any source | Interleaved, ordered by arrival time
mergeWith | obs1$.pipe(mergeWith(obs2$, obs3$)) | Pipeable version (replaces the pipeable merge, deprecated in RxJS 7); merges the source with additional observables | Same as the merge creation operator
mergeAll | higherOrder$.pipe(mergeAll(concurrent?)) | Flattens a higher-order observable by subscribing to its inner observables concurrently, optionally capped at concurrent | Concurrent inner subscriptions

Example: Concurrent stream combination with merge

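import { merge, interval, fromEvent, from, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, mapTo, mergeAll, mergeWith, scan, take } from 'rxjs/operators';

// merge - combine multiple sources
const fast$ = interval(500).pipe(map(x => `Fast: ${x}`), take(5));
const slow$ = interval(1000).pipe(map(x => `Slow: ${x}`), take(3));
merge(fast$, slow$).subscribe(val => console.log(val));
// Typical output: Fast: 0, Slow: 0, Fast: 1, Fast: 2, Slow: 1, Fast: 3, Fast: 4, Slow: 2
// (values due at the same instant, at 1000 ms and 2000 ms, may log in either order)

// Pipeable form: mergeWith (RxJS 7+; the pipeable merge operator is deprecated,
// and the merge imported from 'rxjs' is a creation function, not an operator)
of(1, 2, 3).pipe(
  mergeWith(of(4, 5, 6), of(7, 8, 9))
).subscribe(val => console.log(val)); // 1,2,3,4,5,6,7,8,9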

// mergeAll - flatten higher-order observable
const clicks$ = fromEvent(document, 'click');
clicks$.pipe(
  map(() => interval(1000).pipe(take(3))),
  mergeAll() // Subscribe to all inner intervals concurrently
).subscribe(val => console.log(val));

// mergeAll with concurrency limit
const urls = ['url1', 'url2', 'url3', 'url4', 'url5'];
from(urls).pipe(
  map(url => ajax.get(url)),
  mergeAll(2) // Max 2 concurrent HTTP requests
).subscribe(response => console.log(response));

// Practical: combine multiple data sources
const user$ = this.http.get('/api/user');
const settings$ = this.http.get('/api/settings');
const notifications$ = this.http.get('/api/notifications');

merge(user$, settings$, notifications$).subscribe(
  data => console.log('Data arrived:', data)
);

// Practical: multiple event sources
const keyPress$ = fromEvent(document, 'keypress').pipe(mapTo('key'));
const mouseClick$ = fromEvent(document, 'click').pipe(mapTo('click'));
const touchTap$ = fromEvent(document, 'touchstart').pipe(mapTo('touch'));

merge(keyPress$, mouseClick$, touchTap$).pipe(
  scan((count) => count + 1, 0)
).subscribe(count => console.log('Total interactions:', count));
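
The effect of the concurrent argument in the mergeAll(2) example above can be reasoned about without issuing any requests. The following is a plain-JavaScript model, not RxJS code: startTimes and the sample durations are hypothetical, and it assumes every inner observable is available at time 0 (as with from(urls)) and completes after a fixed duration.

```javascript
// Model: when is each inner observable subscribed under mergeAll(concurrent)?
// durations[i] is how long inner observable i runs before completing (ms).
function startTimes(durations, concurrent = Infinity) {
  const running = []; // completion times of inner observables holding a slot
  const starts = [];
  let now = 0;
  for (const duration of durations) {
    if (running.length >= concurrent) {
      // All slots are taken: wait until the earliest one completes.
      running.sort((a, b) => a - b);
      now = Math.max(now, running.shift());
    }
    starts.push(now);
    running.push(now + duration);
  }
  return starts;
}

const durations = [300, 100, 200, 100];
console.log(JSON.stringify(startTimes(durations)));    // [0,0,0,0]       like mergeAll()
console.log(JSON.stringify(startTimes(durations, 2))); // [0,0,100,300]   like mergeAll(2)
console.log(JSON.stringify(startTimes(durations, 1))); // [0,300,400,600] like mergeAll(1)
```

With a limit of 1 the model degenerates to strictly sequential subscription, which is the behavior of concatAll.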

2. concat and concatAll for Sequential Combination

Operator | Syntax | Description | Emission Pattern
concat | concat(obs1$, obs2$, ...) | Combines observables sequentially; waits for each to complete before subscribing to the next | Sequential, preserves order
concatWith | obs1$.pipe(concatWith(obs2$, obs3$)) | Pipeable version (replaces the pipeable concat, deprecated in RxJS 7); concatenates after the source completes | Source first, then additional streams
concatAll | higherOrder$.pipe(concatAll()) | Flattens a higher-order observable by subscribing to inner observables one at a time | Queue-based sequential processing

Example: Sequential stream combination with concat

import { concat, from, of, interval } from 'rxjs';
import { concatAll, concatWith, delay, map, take } from 'rxjs/operators';

// concat - sequential execution
const first$ = of(1, 2, 3);
const second$ = of(4, 5, 6);
const third$ = of(7, 8, 9);
concat(first$, second$, third$).subscribe(val => console.log(val));
// Output: 1,2,3,4,5,6,7,8,9 (in order)

// concat waits for completion
const slow$ = interval(1000).pipe(take(3));
const fast$ = of('a', 'b', 'c');
concat(slow$, fast$).subscribe(val => console.log(val));
// Output: 0,1,2 (over 3 seconds), then immediately a,b,c

// Pipeable form: concatWith (RxJS 7+; the pipeable concat operator is deprecated,
// and the concat imported from 'rxjs' is a creation function, not an operator)
of(1, 2, 3).pipe(
  concatWith(of(4, 5, 6))
).subscribe(val => console.log(val)); // 1,2,3,4,5,6

// concatAll - sequential flattening
of(
  of(1, 2, 3),
  of(4, 5, 6),
  of(7, 8, 9)
).pipe(
  concatAll()
).subscribe(val => console.log(val)); // 1,2,3,4,5,6,7,8,9

// Practical: sequential API calls
const deleteUser = (id) => this.http.delete(`/api/users/${id}`);
const userIds = [1, 2, 3];
from(userIds).pipe(
  map(id => deleteUser(id)),
  concatAll() // Delete one at a time, in order
).subscribe({
  next: result => console.log('Deleted:', result),
  error: err => console.error('Error:', err)
});

// Practical: animation sequence
const animations = [
  animate('fadeIn', 500),
  animate('slideUp', 300),
  animate('bounce', 400)
];
concat(...animations).subscribe({
  next: animation => console.log('Playing:', animation),
  complete: () => console.log('Animation sequence complete')
});

// Practical: onboarding flow
const step1$ = showWelcome().pipe(delay(2000));
const step2$ = showFeatures().pipe(delay(3000));
const step3$ = showTutorial().pipe(delay(2000));
concat(step1$, step2$, step3$).subscribe({
  next: step => displayStep(step),
  complete: () => completeOnboarding()
});
Note: Use merge to combine streams concurrently (in parallel) and concat to combine them sequentially (as a queue). Prefer concat when order matters or operations must not overlap.
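
To make the difference in the note concrete, here is a small timeline model in plain JavaScript. It is a sketch, not RxJS code: mergeModel, concatModel, and the sample timings are illustrative assumptions, and each source is assumed to be cold (its clock starts when it is subscribed).

```javascript
// A source is modeled by the events it emits (ms after subscription)
// and the time at which it completes.
const fast = {
  events: [{ at: 500, value: 'F0' }, { at: 1000, value: 'F1' }, { at: 1500, value: 'F2' }],
  completesAt: 1500,
};
const slow = {
  events: [{ at: 800, value: 'S0' }, { at: 1600, value: 'S1' }],
  completesAt: 1600,
};

// merge: every source is subscribed at time 0; output is ordered by arrival.
function mergeModel(sources) {
  return sources
    .flatMap(src => src.events.map(e => ({ at: e.at, value: e.value })))
    .sort((a, b) => a.at - b.at);
}

// concat: each source is subscribed only when the previous one completes.
function concatModel(sources) {
  const out = [];
  let offset = 0;
  for (const src of sources) {
    for (const e of src.events) out.push({ at: offset + e.at, value: e.value });
    offset += src.completesAt;
  }
  return out;
}

const show = events => events.map(e => `${e.value}@${e.at}`).join(' ');
console.log(show(mergeModel([fast, slow])));  // F0@500 S0@800 F1@1000 F2@1500 S1@1600
console.log(show(concatModel([fast, slow]))); // F0@500 F1@1000 F2@1500 S0@2300 S1@3100
```

In the merged timeline the two sources interleave and finish by 1600 ms; in the concatenated one, slow does not even start until fast completes at 1500 ms.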

3. combineLatest and combineLatestWith for Multi-stream Coordination

Operator | Syntax | Description | Emission Trigger
combineLatest | combineLatest([obs1$, obs2$, ...]) | Waits for all sources to emit once, then emits an array with the latest value from each on any change | Any source emits (after all have emitted once)
combineLatest (dict) | combineLatest({key1: obs1$, key2: obs2$}) | Object version; emits an object with the latest values keyed by property name | Any source emits (after all have emitted once)
combineLatestWith | obs1$.pipe(combineLatestWith(obs2$, obs3$)) | Pipeable version; combines the source with additional observables | Same as combineLatest

Example: Multi-stream coordination with combineLatest

import { combineLatest, fromEvent, of, interval, BehaviorSubject } from 'rxjs';
import { combineLatestWith, debounceTime, map, switchMap, take } from 'rxjs/operators';

// combineLatest - array form
const age$ = of(25, 26, 27);
const name$ = of('Alice', 'Bob');
combineLatest([age$, name$]).subscribe(
  ([age, name]) => console.log(`${name} is ${age}`)
);
// Alice is 27, Bob is 27
// (of() emits synchronously, so age$ has already reached 27 before name$ is subscribed)

// combineLatest - object form (recommended)
const firstName$ = of('John');
const lastName$ = of('Doe', 'Smith');
const age2$ = of(30, 31);
combineLatest({
  firstName: firstName$,
  lastName: lastName$,
  age: age2$
}).subscribe(user => console.log(user));
// {firstName: 'John', lastName: 'Smith', age: 30}
// {firstName: 'John', lastName: 'Smith', age: 31}
// (sources are subscribed in key order, so lastName$ has already reached 'Smith' before age2$ emits)

// combineLatestWith - pipeable
interval(1000).pipe(
  take(3),
  combineLatestWith(interval(1500).pipe(take(2)))
).subscribe(([a, b]) => console.log(`a: ${a}, b: ${b}`));

// Practical: form validation
const username$ = fromEvent(usernameInput, 'input').pipe(
  map(e => e.target.value)
);
const password$ = fromEvent(passwordInput, 'input').pipe(
  map(e => e.target.value)
);
const terms$ = fromEvent(termsCheckbox, 'change').pipe(
  map(e => e.target.checked)
);

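// Emits only after every control has fired at least once
// (add startWith to each stream if an initial state is needed)
combineLatest({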
  username: username$,
  password: password$,
  terms: terms$
}).pipe(
  map(form => ({
    ...form,
    valid: form.username.length > 3 && 
           form.password.length > 8 && 
           form.terms
  }))
).subscribe(form => {
  submitButton.disabled = !form.valid;
});

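// Practical: derived data from multiple sources
// (all three streams derive from sensorData$, so one reading can trigger up to three combined emissions)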
const temperature$ = sensorData$.pipe(map(d => d.temp));
const humidity$ = sensorData$.pipe(map(d => d.humidity));
const pressure$ = sensorData$.pipe(map(d => d.pressure));

combineLatest({
  temp: temperature$,
  humidity: humidity$,
  pressure: pressure$
}).pipe(
  map(readings => calculateComfortIndex(readings))
).subscribe(index => displayComfortLevel(index));

// Practical: multi-filter search
const nameFilter$ = new BehaviorSubject('');
const categoryFilter$ = new BehaviorSubject('all');
const priceRange$ = new BehaviorSubject({ min: 0, max: 1000 });

combineLatest([nameFilter$, categoryFilter$, priceRange$]).pipe(
  debounceTime(300),
  switchMap(([name, category, price]) => 
    this.http.get('/api/products', { 
      params: { name, category, minPrice: price.min, maxPrice: price.max }
    })
  )
).subscribe(products => displayProducts(products));

4. withLatestFrom for Auxiliary Stream Integration

Operator | Syntax | Description | Emission Trigger
withLatestFrom | source$.pipe(withLatestFrom(other$)) | When the source emits, combines that value with the latest value from the other observable(s) | Only source emissions (not other)
Multiple sources | withLatestFrom(obs1$, obs2$, obs3$) | Combines with the latest value from multiple auxiliary streams | Only source emissions

Example: Auxiliary stream integration with withLatestFrom

import { fromEvent, interval, combineLatest, BehaviorSubject } from 'rxjs';
import { withLatestFrom, map, switchMap, take } from 'rxjs/operators';

// withLatestFrom - basic usage
const clicks$ = fromEvent(button, 'click');
const timer$ = interval(1000);
clicks$.pipe(
  withLatestFrom(timer$)
).subscribe(([click, time]) => console.log(`Click at time: ${time}`));
// Only emits when button clicked, includes latest timer value

// Compare with combineLatest
const source$ = interval(1000).pipe(take(3));
const other$ = interval(500).pipe(take(5));

// combineLatest - emits on ANY change
combineLatest([source$, other$]).subscribe(
  ([s, o]) => console.log(`combineLatest: ${s}, ${o}`)
);

// withLatestFrom - emits only on source change
source$.pipe(
  withLatestFrom(other$)
).subscribe(
  ([s, o]) => console.log(`withLatestFrom: ${s}, ${o}`)
);

// withLatestFrom multiple sources
const userAction$ = fromEvent(actionButton, 'click');
const userState$ = new BehaviorSubject({ id: 1, role: 'user' });
const appConfig$ = new BehaviorSubject({ theme: 'dark', locale: 'en' });

userAction$.pipe(
  withLatestFrom(userState$, appConfig$)
).subscribe(([event, user, config]) => {
  console.log('Action by:', user.id);
  console.log('Theme:', config.theme);
  performAction(event, user, config);
});

// Practical: save with current state
const saveButton$ = fromEvent(saveBtn, 'click');
const formState$ = new BehaviorSubject({});
const userId$ = new BehaviorSubject(null);

saveButton$.pipe(
  withLatestFrom(formState$, userId$),
  switchMap(([_, formData, userId]) => 
    this.http.post('/api/save', { userId, data: formData })
  )
).subscribe(result => showSuccessMessage(result));

// Practical: enriched analytics
const clickEvent$ = fromEvent(document, 'click');
const currentPage$ = new BehaviorSubject('/home');
const sessionData$ = new BehaviorSubject({ sessionId: 'abc123' });

clickEvent$.pipe(
  withLatestFrom(currentPage$, sessionData$),
  map(([event, page, session]) => ({
    type: 'click',
    element: event.target.tagName,
    page: page,
    sessionId: session.sessionId,
    timestamp: Date.now()
  }))
).subscribe(analyticsData => sendToAnalytics(analyticsData));

// Practical: game input with state
const keyPress$ = fromEvent(document, 'keydown');
const playerState$ = new BehaviorSubject({ 
  x: 0, y: 0, health: 100, ammo: 50 
});

keyPress$.pipe(
  withLatestFrom(playerState$),
  map(([event, state]) => processInput(event.key, state))
).subscribe(newState => playerState$.next(newState));
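Note: combineLatest emits whenever ANY source emits (once all have emitted at least once). withLatestFrom emits ONLY when the source emits, attaching the latest value from each of the others; source values that arrive before every other stream has emitted are dropped.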
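
The contrast in the note can be traced on a single ordered timeline. The sketch below is a plain-JavaScript model rather than RxJS code; combineLatestModel, withLatestFromModel, and the timeline values are hypothetical, and only two streams ("source" and "other") are modeled.

```javascript
// Events in arrival order; `from` says which stream emitted.
const timeline = [
  { from: 'source', value: 's0' },
  { from: 'other', value: 'o0' },
  { from: 'other', value: 'o1' },
  { from: 'source', value: 's1' },
  { from: 'other', value: 'o2' },
];

// combineLatest: emit on every event once both streams have a value.
function combineLatestModel(events) {
  const latest = {};
  const out = [];
  for (const { from, value } of events) {
    latest[from] = value;
    if ('source' in latest && 'other' in latest) {
      out.push([latest.source, latest.other]);
    }
  }
  return out;
}

// withLatestFrom: emit only on source events, and only if other has a value.
function withLatestFromModel(events) {
  let hasOther = false;
  let other;
  const out = [];
  for (const { from, value } of events) {
    if (from === 'other') {
      other = value;
      hasOther = true;
    } else if (hasOther) {
      out.push([value, other]);
    } // a source value that arrives before other has emitted is dropped
  }
  return out;
}

console.log(JSON.stringify(combineLatestModel(timeline)));
// [["s0","o0"],["s0","o1"],["s1","o1"],["s1","o2"]]
console.log(JSON.stringify(withLatestFromModel(timeline)));
// [["s1","o1"]]
```

The same five events yield four combined values in one case and a single value in the other, which is why withLatestFrom suits "act on this trigger, with current state" pipelines such as the save-button example.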

5. zip and zipWith for Paired Value Emission

Operator | Syntax | Description | Emission Pattern
zip | zip([obs1$, obs2$, ...]) | Pairs values by index; emits when all sources have emitted at that position | Synchronized by index, slowest source determines rate
zipWith | obs1$.pipe(zipWith(obs2$, obs3$)) | Pipeable version; zips the source with additional observables | Same as zip

Example: Paired value emission with zip

import { zip, from, interval, of } from 'rxjs';
import { zipWith, concatMap, map, mergeMap, take } from 'rxjs/operators';

// zip - pair by index
const numbers$ = of(1, 2, 3, 4, 5);
const letters$ = of('a', 'b', 'c');
zip([numbers$, letters$]).subscribe(
  ([num, letter]) => console.log(`${num}${letter}`)
);
// Output: 1a, 2b, 3c (stops when letters$ completes)

// zip with different speeds
const fast$ = interval(500).pipe(take(5));
const slow$ = interval(1000).pipe(take(3));
zip([fast$, slow$]).subscribe(
  ([f, s]) => console.log(`Fast: ${f}, Slow: ${s}`)
);
// Emits at slow rate: [0,0], [1,1], [2,2]

// zipWith - pipeable form
of(1, 2, 3).pipe(
  zipWith(of('a', 'b', 'c'), of('X', 'Y', 'Z'))
).subscribe(
  ([num, lower, upper]) => console.log(`${num}${lower}${upper}`)
); // 1aX, 2bY, 3cZ

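// Practical: parallel API calls with pairing
// concatMap keeps responses in userIds$ order; with mergeMap a slow response
// could arrive out of order and zip would pair a user with another user's posts
const userIds$ = of(1, 2, 3);
const users$ = userIds$.pipe(
  concatMap(id => this.http.get(`/api/users/${id}`))
);
const posts$ = userIds$.pipe(
  concatMap(id => this.http.get(`/api/posts/${id}`))
);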

zip([users$, posts$]).subscribe(
  ([user, posts]) => displayUserWithPosts(user, posts)
);

// Practical: coordinate animations
const element1Anim$ = animate(element1, 'fadeIn', 1000);
const element2Anim$ = animate(element2, 'slideUp', 1500);
const element3Anim$ = animate(element3, 'zoomIn', 800);

zip([element1Anim$, element2Anim$, element3Anim$]).subscribe(
  () => console.log('All animations complete')
);

// Practical: batch processing with paired data
const tasks$ = from(['task1', 'task2', 'task3']);
const resources$ = from([{ cpu: 1 }, { cpu: 2 }, { cpu: 3 }]);

zip([tasks$, resources$]).pipe(
  concatMap(([task, resource]) => processTask(task, resource))
).subscribe(result => console.log('Processed:', result));

// Practical: test data generation
const timestamps$ = interval(100).pipe(take(5));
const values$ = of(10, 20, 30, 40, 50);
const labels$ = of('A', 'B', 'C', 'D', 'E');

zip([timestamps$, values$, labels$]).pipe(
  map(([time, value, label]) => ({ time, value, label }))
).subscribe(dataPoint => chartData.push(dataPoint));
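Warning: zip completes as soon as any source has completed and its buffered values have all been paired, so the shortest observable determines the number of emissions. Unpaired values from longer observables are discarded.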
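
The discarding described in the warning can be seen in a minimal array-based model. This is a sketch under simplifying assumptions, not RxJS code: zipModel is a hypothetical helper, and each source is modeled as a finite array of the values it would emit before completing.

```javascript
// Pair values by index and report what each source had left over.
function zipModel(...sources) {
  const length = Math.min(...sources.map(s => s.length));
  const pairs = [];
  for (let i = 0; i < length; i++) {
    pairs.push(sources.map(s => s[i]));
  }
  const discarded = sources.map(s => s.slice(length));
  return { pairs, discarded };
}

const { pairs, discarded } = zipModel([1, 2, 3, 4, 5], ['a', 'b', 'c']);
console.log(JSON.stringify(pairs));     // [[1,"a"],[2,"b"],[3,"c"]]
console.log(JSON.stringify(discarded)); // [[4,5],[]]
```

With asynchronous sources the faster stream's values are buffered until a partner arrives, so pairing a fast, long-lived stream with a slow one can hold many values in memory.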

6. startWith and endWith for Stream Initialization/Termination

Operator | Syntax | Description | Use Case
startWith | startWith(...values) | Emits the specified values before the source observable's emissions begin | Initial/default values, loading states
endWith | endWith(...values) | Emits the specified values after the source observable completes | Final/sentinel values, completion markers

Example: Stream initialization and termination

import { of, from, fromEvent, interval } from 'rxjs';
import { startWith, endWith, debounceTime, map, scan, switchMap, take } from 'rxjs/operators';

// startWith - provide initial value
of(1, 2, 3).pipe(
  startWith(0)
).subscribe(val => console.log(val)); // 0, 1, 2, 3

// startWith multiple values
of('b', 'c').pipe(
  startWith('a', 'a.5')
).subscribe(val => console.log(val)); // a, a.5, b, c

// endWith - append final values
of(1, 2, 3).pipe(
  endWith(4, 5)
).subscribe(val => console.log(val)); // 1, 2, 3, 4, 5

// Combined startWith and endWith
of('middle').pipe(
  startWith('start'),
  endWith('end')
).subscribe(val => console.log(val)); // start, middle, end

// Practical: loading state management
this.http.get('/api/data').pipe(
  map(data => ({ loading: false, data })),
  startWith({ loading: true, data: null })
).subscribe(state => {
  if (state.loading) {
    showSpinner();
  } else {
    hideSpinner();
    displayData(state.data);
  }
});

// Practical: search with empty initial state
const searchQuery$ = fromEvent(searchInput, 'input').pipe(
  map(e => e.target.value),
  debounceTime(300)
);

searchQuery$.pipe(
  startWith(''), // Empty search on load
  switchMap(query => 
    query ? this.http.get(`/api/search?q=${query}`) : of([])
  )
).subscribe(results => displayResults(results));

// Practical: counter with initial value
fromEvent(incrementBtn, 'click').pipe(
  scan(count => count + 1, 0),
  startWith(0) // Show initial count
).subscribe(count => updateCountDisplay(count));

// Practical: data stream with header/footer
const dataRows$ = from([
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
  { id: 3, name: 'Charlie' }
]);

dataRows$.pipe(
  startWith({ type: 'header', columns: ['ID', 'Name'] }),
  endWith({ type: 'footer', total: 3 })
).subscribe(row => {
  if (row.type === 'header') {
    renderHeader(row.columns);
  } else if (row.type === 'footer') {
    renderFooter(row.total);
  } else {
    renderRow(row);
  }
});

// Practical: event stream with markers
interval(1000).pipe(
  take(5),
  map(n => `Event ${n}`),
  startWith('Stream started'),
  endWith('Stream completed')
).subscribe(msg => console.log(msg));

// Practical: form with defaults
const formValue$ = formControl.valueChanges;
formValue$.pipe(
  startWith({ name: '', email: '', age: 0 }) // Default values
).subscribe(value => validateForm(value));
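
One ordering detail is easy to miss: startWith prepends to whatever the pipeline has produced so far, so when several startWith calls are stacked, the values of the last call are emitted first. The generator-based model below illustrates this; it is not RxJS code, startWithModel and endWithModel are hypothetical names, and sources are modeled as finite iterables.

```javascript
function* startWithModel(source, ...values) {
  yield* values; // emitted before anything from the source
  yield* source;
}

function* endWithModel(source, ...values) {
  yield* source;
  yield* values; // emitted only after the source is exhausted (completed)
}

// Equivalent in spirit to of(1, 2).pipe(startWith('a'), startWith('b'))
const stacked = [...startWithModel(startWithModel([1, 2], 'a'), 'b')];
console.log(JSON.stringify(stacked)); // ["b","a",1,2]

// Equivalent in spirit to of('middle').pipe(startWith('start'), endWith('end'))
const wrapped = [...endWithModel(startWithModel(['middle'], 'start'), 'end')];
console.log(JSON.stringify(wrapped)); // ["start","middle","end"]
```

The model also shows why endWith never fires for a stream that does not complete, such as an unbounded event stream: its values come only after the source is exhausted.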

Section 5 Summary

  • merge combines streams concurrently (interleaved), concat combines sequentially (queued)
  • combineLatest emits array/object when ANY source changes (after all emit once)
  • withLatestFrom emits only when source emits, sampling latest from others
  • zip pairs values by index position, rate limited by slowest source
  • startWith provides initial values, endWith appends final values
  • Choose: merge (parallel), concat (sequential), combineLatest (sync all), withLatestFrom (main+aux), zip (paired)