Combination Operators and Stream Merging
1. merge and mergeAll for Concurrent Combination
| Operator | Syntax | Description | Emission Pattern |
|---|---|---|---|
| merge | merge(obs1$, obs2$, ...) | Combines multiple observables concurrently - emits values as they arrive from any source | Interleaved, ordered by arrival time |
| mergeWith | obs1$.pipe(mergeWith(obs2$, obs3$)) | Pipeable version - merges source with additional observables (replaces the deprecated pipeable merge) | Same as merge creation operator |
| mergeAll | higherOrder$.pipe(mergeAll(concurrent?)) | Flattens higher-order observable by subscribing to inner observables concurrently, optionally capped at concurrent | Concurrent inner subscriptions |
Example: Concurrent stream combination with merge
import { merge, interval, fromEvent, from, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, mapTo, mergeAll, mergeWith, scan, take } from 'rxjs/operators';
// merge - combine multiple sources
const fast$ = interval(500).pipe(map(x => `Fast: ${x}`), take(5));
const slow$ = interval(1000).pipe(map(x => `Slow: ${x}`), take(3));
merge(fast$, slow$).subscribe(val => console.log(val));
// Output: Fast: 0, Slow: 0, Fast: 1, Fast: 2, Slow: 1, Fast: 3, Fast: 4, Slow: 2
// (values that fall on the same tick, e.g. at 1000 ms, may arrive in either order)
// Pipeable form - mergeWith replaces the deprecated pipeable merge
of(1, 2, 3).pipe(
  mergeWith(of(4, 5, 6), of(7, 8, 9))
).subscribe(val => console.log(val)); // 1,2,3,4,5,6,7,8,9
// mergeAll - flatten higher-order observable
const clicks$ = fromEvent(document, 'click');
clicks$.pipe(
map(() => interval(1000).pipe(take(3))),
mergeAll() // Subscribe to all inner intervals concurrently
).subscribe(val => console.log(val));
// mergeAll with concurrency limit
const urls = ['url1', 'url2', 'url3', 'url4', 'url5'];
from(urls).pipe(
map(url => ajax.get(url)),
mergeAll(2) // Max 2 concurrent HTTP requests
).subscribe(response => console.log(response));
// Practical: combine multiple data sources
const user$ = this.http.get('/api/user');
const settings$ = this.http.get('/api/settings');
const notifications$ = this.http.get('/api/notifications');
merge(user$, settings$, notifications$).subscribe(
data => console.log('Data arrived:', data)
);
// Practical: multiple event sources
const keyPress$ = fromEvent(document, 'keypress').pipe(mapTo('key'));
const mouseClick$ = fromEvent(document, 'click').pipe(mapTo('click'));
const touchTap$ = fromEvent(document, 'touchstart').pipe(mapTo('touch'));
merge(keyPress$, mouseClick$, touchTap$).pipe(
scan((count) => count + 1, 0)
).subscribe(count => console.log('Total interactions:', count));
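The concurrent argument of mergeAll listed in the table above can be pictured as a fixed number of slots. The sketch below is a dependency-free TypeScript model, not RxJS code: it assumes the outer observable hands over every inner observable at t = 0 and that each inner one runs for a known duration. The function name scheduleWithLimit and the durations are made up for illustration.

```typescript
// Model of mergeAll(limit): at most `limit` inner observables are subscribed
// at once; the rest wait in arrival order until a running one completes.
// Assumes limit >= 1.
interface InnerRun {
  start: number; // ms at which the inner observable is subscribed
  end: number;   // ms at which it completes
}

function scheduleWithLimit(durations: number[], limit: number): InnerRun[] {
  const runningEnds: number[] = []; // completion times of active inners
  const runs: InnerRun[] = [];
  for (const duration of durations) {
    let start = 0;
    if (runningEnds.length >= limit) {
      // All slots are busy: wait for the earliest completion.
      runningEnds.sort((x, y) => x - y);
      start = runningEnds.shift() as number;
    }
    const end = start + duration;
    runningEnds.push(end);
    runs.push({ start, end });
  }
  return runs;
}

// Five hypothetical requests with made-up durations in milliseconds.
const requestDurations = [300, 100, 200, 100, 50];

const twoAtATime = scheduleWithLimit(requestDurations, 2).map(r => r.start);
const oneAtATime = scheduleWithLimit(requestDurations, 1).map(r => r.start);

console.log(twoAtATime); // [0, 0, 100, 300, 300]
console.log(oneAtATime); // [0, 300, 400, 600, 700]
```

With a limit of 1 the model becomes strictly sequential, which is the behavior concatAll provides.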
2. concat and concatAll for Sequential Combination
| Operator | Syntax | Description | Emission Pattern |
|---|---|---|---|
| concat | concat(obs1$, obs2$, ...) | Combines observables sequentially - waits for each to complete before subscribing to the next | Sequential, preserves order |
| concatWith | obs1$.pipe(concatWith(obs2$, obs3$)) | Pipeable version - concatenates after source completes (replaces the deprecated pipeable concat) | Source first, then additional streams |
| concatAll | higherOrder$.pipe(concatAll()) | Flattens higher-order observable by subscribing to inner observables one at a time | Queue-based sequential processing |
Example: Sequential stream combination with concat
import { concat, from, of, interval } from 'rxjs';
import { concatAll, concatWith, delay, map, take } from 'rxjs/operators';
// concat - sequential execution
const first$ = of(1, 2, 3);
const second$ = of(4, 5, 6);
const third$ = of(7, 8, 9);
concat(first$, second$, third$).subscribe(val => console.log(val));
// Output: 1,2,3,4,5,6,7,8,9 (in order)
// concat waits for completion
const slow$ = interval(1000).pipe(take(3));
const fast$ = of('a', 'b', 'c');
concat(slow$, fast$).subscribe(val => console.log(val));
// Output: 0,1,2 (over 3 seconds), then immediately a,b,c
// Pipeable form - concatWith replaces the deprecated pipeable concat
of(1, 2, 3).pipe(
  concatWith(of(4, 5, 6))
).subscribe(val => console.log(val)); // 1,2,3,4,5,6
// concatAll - sequential flattening
of(
of(1, 2, 3),
of(4, 5, 6),
of(7, 8, 9)
).pipe(
concatAll()
).subscribe(val => console.log(val)); // 1,2,3,4,5,6,7,8,9
// Practical: sequential API calls
const deleteUser = (id) => this.http.delete(`/api/users/${id}`);
const userIds = [1, 2, 3];
from(userIds).pipe(
map(id => deleteUser(id)),
concatAll() // Delete one at a time, in order
).subscribe({
  next: result => console.log('Deleted:', result),
  error: err => console.error('Error:', err)
});
// Practical: animation sequence
const animations = [
animate('fadeIn', 500),
animate('slideUp', 300),
animate('bounce', 400)
];
concat(...animations).subscribe({
  next: animation => console.log('Playing:', animation),
  complete: () => console.log('Animation sequence complete')
});
// Practical: onboarding flow
const step1$ = showWelcome().pipe(delay(2000));
const step2$ = showFeatures().pipe(delay(3000));
const step3$ = showTutorial().pipe(delay(2000));
concat(step1$, step2$, step3$).subscribe({
  next: step => displayStep(step),
  complete: () => completeOnboarding()
});
Note: Use merge to run sources concurrently (parallel) and concat to run them one after another (queue). Prefer concat when order matters or operations must not overlap.
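To make the difference in emission order concrete, here is a small dependency-free TypeScript model (it does not use RxJS). It treats each finite stream as a list of timestamped emissions plus a completion time. The type names, the functions mergeModel and concatModel, and the sample timings are all assumptions made up for illustration.

```typescript
// One emission: `t` is milliseconds after the stream was subscribed.
interface Emission {
  t: number;
  value: string;
}

// A finite cold stream: its emissions and the time at which it completes.
interface Timeline {
  emissions: Emission[];
  completesAt: number;
}

// merge: subscribe to every source at t = 0 and interleave by arrival time.
function mergeModel(sources: Timeline[]): Emission[] {
  let all: Emission[] = [];
  for (const source of sources) {
    all = all.concat(source.emissions);
  }
  return all.sort((x, y) => x.t - y.t);
}

// concat: subscribe to a source only after the previous one has completed,
// so later sources are shifted by the accumulated completion time.
function concatModel(sources: Timeline[]): Emission[] {
  const out: Emission[] = [];
  let offset = 0;
  for (const source of sources) {
    for (const e of source.emissions) {
      out.push({ t: e.t + offset, value: e.value });
    }
    offset += source.completesAt;
  }
  return out;
}

// Hypothetical streams: ticksA emits every 200 ms (3 values),
// ticksB emits every 500 ms (2 values).
const ticksA: Timeline = {
  emissions: [200, 400, 600].map((t, i) => ({ t, value: 'a' + i })),
  completesAt: 600,
};
const ticksB: Timeline = {
  emissions: [500, 1000].map((t, i) => ({ t, value: 'b' + i })),
  completesAt: 1000,
};

const mergedOrder = mergeModel([ticksA, ticksB]).map(e => e.value);
const concatOrder = concatModel([ticksA, ticksB]).map(e => e.value + '@' + e.t);

console.log(mergedOrder); // ['a0', 'a1', 'b0', 'a2', 'b1']
console.log(concatOrder); // ['a0@200', 'a1@400', 'a2@600', 'b0@1100', 'b1@1600']
```

In the model, merge finishes when the slowest source finishes (1000 ms), while concat takes the sum of all durations (1600 ms).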
3. combineLatest and combineLatestWith for Multi-stream Coordination
| Operator | Syntax | Description | Emission Trigger |
|---|---|---|---|
| combineLatest | combineLatest([obs1$, obs2$, ...]) | Waits for all sources to emit once, then emits array with latest from each on any change | Any source emits (after all emitted once) |
| combineLatest (dict) | combineLatest({key1: obs1$, key2: obs2$}) | Object version - emits object with latest values keyed by property names | Any source emits (after all emitted once) |
| combineLatestWith | obs1$.pipe(combineLatestWith(obs2$, obs3$)) | Pipeable version - combines source with additional observables | Same as combineLatest |
Example: Multi-stream coordination with combineLatest
import { combineLatest, of, interval, fromEvent, BehaviorSubject } from 'rxjs';
import { combineLatestWith, debounceTime, map, switchMap, take } from 'rxjs/operators';
// combineLatest - array form
const age$ = of(25, 26, 27);
const name$ = of('Alice', 'Bob');
combineLatest([age$, name$]).subscribe(
([age, name]) => console.log(`${name} is ${age}`)
);
// Output: Alice is 27, Bob is 27
// (of() emits synchronously, so age$ has already reached 27 before name$ is subscribed)
// combineLatest - object form (recommended)
const firstName$ = of('John');
const lastName$ = of('Doe', 'Smith');
const age2$ = of(30, 31);
combineLatest({
firstName: firstName$,
lastName: lastName$,
age: age2$
}).subscribe(user => console.log(user));
// {firstName: 'John', lastName: 'Smith', age: 30}
// {firstName: 'John', lastName: 'Smith', age: 31}
// (lastName$ has already emitted both values before age2$ is subscribed)
// combineLatestWith - pipeable
interval(1000).pipe(
take(3),
combineLatestWith(interval(1500).pipe(take(2)))
).subscribe(([a, b]) => console.log(`a: ${a}, b: ${b}`));
// Practical: form validation (nothing is emitted until every control has fired at least once)
const username$ = fromEvent(usernameInput, 'input').pipe(
map(e => e.target.value)
);
const password$ = fromEvent(passwordInput, 'input').pipe(
map(e => e.target.value)
);
const terms$ = fromEvent(termsCheckbox, 'change').pipe(
map(e => e.target.checked)
);
combineLatest({
username: username$,
password: password$,
terms: terms$
}).pipe(
map(form => ({
...form,
valid: form.username.length > 3 &&
form.password.length > 8 &&
form.terms
}))
).subscribe(form => {
submitButton.disabled = !form.valid;
});
// Practical: derived data from multiple sources
const temperature$ = sensorData$.pipe(map(d => d.temp));
const humidity$ = sensorData$.pipe(map(d => d.humidity));
const pressure$ = sensorData$.pipe(map(d => d.pressure));
combineLatest({
temp: temperature$,
humidity: humidity$,
pressure: pressure$
}).pipe(
map(readings => calculateComfortIndex(readings))
).subscribe(index => displayComfortLevel(index));
// Practical: multi-filter search
const nameFilter$ = new BehaviorSubject('');
const categoryFilter$ = new BehaviorSubject('all');
const priceRange$ = new BehaviorSubject({ min: 0, max: 1000 });
combineLatest([nameFilter$, categoryFilter$, priceRange$]).pipe(
debounceTime(300),
switchMap(([name, category, price]) =>
this.http.get('/api/products', {
params: { name, category, minPrice: price.min, maxPrice: price.max }
})
)
).subscribe(products => displayProducts(products));
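One consequence of the "after all emitted once" rule is easy to miss with synchronous sources such as of(...): each source is subscribed in turn and delivers all of its values before the next one is subscribed, so only the last source appears to "change". The following dependency-free TypeScript model (not RxJS itself) sketches that behavior. The function name combineLatestSyncModel is hypothetical, and the model assumes every source is synchronous and finite.

```typescript
// Model of combineLatest over synchronous sources such as of(...):
// sources are subscribed in order, and each one emits all of its values
// before the next source is subscribed.
function combineLatestSyncModel(sources: unknown[][]): unknown[][] {
  const latest: unknown[] = [];
  const hasValue: boolean[] = sources.map(() => false);
  const output: unknown[][] = [];
  sources.forEach((values, index) => {
    for (const value of values) {
      latest[index] = value;
      hasValue[index] = true;
      // Emit only once every source has produced at least one value.
      if (hasValue.every(seen => seen)) {
        output.push(latest.slice());
      }
    }
  });
  return output;
}

const ageThenName = combineLatestSyncModel([[25, 26, 27], ['Alice', 'Bob']]);
console.log(JSON.stringify(ageThenName)); // [[27,"Alice"],[27,"Bob"]]

// Swapping the order changes which source is stuck on its last value.
const nameThenAge = combineLatestSyncModel([['Alice', 'Bob'], [25, 26, 27]]);
console.log(JSON.stringify(nameThenAge)); // [["Bob",25],["Bob",26],["Bob",27]]
```

With asynchronous sources (events, HTTP, timers) the interleaving depends on arrival time instead, which is the case the practical examples in this section rely on.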
4. withLatestFrom for Auxiliary Stream Integration
| Operator | Syntax | Description | Emission Trigger |
|---|---|---|---|
| withLatestFrom | source$.pipe(withLatestFrom(other$)) | When source emits, combine with latest value from other observable(s) | Only source emissions (not other) |
| Multiple sources | withLatestFrom(obs1$, obs2$, obs3$) | Combines with latest from multiple auxiliary streams | Only source emissions |
Example: Auxiliary stream integration with withLatestFrom
import { fromEvent, interval, combineLatest, BehaviorSubject } from 'rxjs';
import { withLatestFrom, map, switchMap, take } from 'rxjs/operators';
// withLatestFrom - basic usage
const clicks$ = fromEvent(button, 'click');
const timer$ = interval(1000);
clicks$.pipe(
withLatestFrom(timer$)
).subscribe(([click, time]) => console.log(`Click at time: ${time}`));
// Only emits when button clicked, includes latest timer value
// Compare with combineLatest
const source$ = interval(1000).pipe(take(3));
const other$ = interval(500).pipe(take(5));
// combineLatest - emits on ANY change
combineLatest([source$, other$]).subscribe(
([s, o]) => console.log(`combineLatest: ${s}, ${o}`)
);
// withLatestFrom - emits only on source change
source$.pipe(
withLatestFrom(other$)
).subscribe(
([s, o]) => console.log(`withLatestFrom: ${s}, ${o}`)
);
// withLatestFrom multiple sources
const userAction$ = fromEvent(actionButton, 'click');
const userState$ = new BehaviorSubject({ id: 1, role: 'user' });
const appConfig$ = new BehaviorSubject({ theme: 'dark', locale: 'en' });
userAction$.pipe(
withLatestFrom(userState$, appConfig$)
).subscribe(([event, user, config]) => {
console.log('Action by:', user.id);
console.log('Theme:', config.theme);
performAction(event, user, config);
});
// Practical: save with current state
const saveButton$ = fromEvent(saveBtn, 'click');
const formState$ = new BehaviorSubject({});
const userId$ = new BehaviorSubject(null);
saveButton$.pipe(
withLatestFrom(formState$, userId$),
switchMap(([_, formData, userId]) =>
this.http.post('/api/save', { userId, data: formData })
)
).subscribe(result => showSuccessMessage(result));
// Practical: enriched analytics
const clickEvent$ = fromEvent(document, 'click');
const currentPage$ = new BehaviorSubject('/home');
const sessionData$ = new BehaviorSubject({ sessionId: 'abc123' });
clickEvent$.pipe(
withLatestFrom(currentPage$, sessionData$),
map(([event, page, session]) => ({
type: 'click',
element: event.target.tagName,
page: page,
sessionId: session.sessionId,
timestamp: Date.now()
}))
).subscribe(analyticsData => sendToAnalytics(analyticsData));
// Practical: game input with state
const keyPress$ = fromEvent(document, 'keydown');
const playerState$ = new BehaviorSubject({
x: 0, y: 0, health: 100, ammo: 50
});
keyPress$.pipe(
withLatestFrom(playerState$),
map(([event, state]) => processInput(event.key, state))
).subscribe(newState => playerState$.next(newState));
Note: combineLatest emits when ANY source changes. withLatestFrom emits ONLY when the source emits, sampling others.
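The contrast in the note can be traced by hand on a short timeline. Below is a dependency-free TypeScript model (not RxJS code) that replays a list of timestamped values and records what each operator would emit. The Tick type, the replayTicks function, and the sample values are assumptions for illustration, and the timestamps are chosen so that no two values arrive at the same instant.

```typescript
interface Tick {
  t: number;                  // arrival time in ms
  origin: 'source' | 'other'; // which stream produced the value
  value: number;
}

function replayTicks(ticks: Tick[]): { combineLatest: string[]; withLatestFrom: string[] } {
  const combineLatestOut: string[] = [];
  const withLatestFromOut: string[] = [];
  let latestSource: number | undefined;
  let latestOther: number | undefined;
  const ordered = ticks.slice().sort((x, y) => x.t - y.t);
  for (const tick of ordered) {
    if (tick.origin === 'source') {
      latestSource = tick.value;
    } else {
      latestOther = tick.value;
    }
    // Neither operator can emit until both streams have a value.
    if (latestSource === undefined || latestOther === undefined) {
      continue;
    }
    const pair = latestSource + ',' + latestOther;
    combineLatestOut.push(pair);      // any arrival triggers an emission
    if (tick.origin === 'source') {
      withLatestFromOut.push(pair);   // only source arrivals trigger an emission
    }
  }
  return { combineLatest: combineLatestOut, withLatestFrom: withLatestFromOut };
}

const sampleTicks: Tick[] = [
  { t: 50, origin: 'source', value: 0 },  // arrives before other$ has a value
  { t: 100, origin: 'other', value: 10 },
  { t: 250, origin: 'source', value: 1 },
  { t: 300, origin: 'other', value: 11 },
  { t: 500, origin: 'source', value: 2 },
];

const replayed = replayTicks(sampleTicks);
console.log(replayed.combineLatest);  // ['0,10', '1,10', '1,11', '2,11']
console.log(replayed.withLatestFrom); // ['1,10', '2,11']
```

Note the first source value (0): combineLatest remembers it and emits it once other$ produces 10, whereas withLatestFrom drops it because other$ had no value yet when it arrived.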
5. zip and zipWith for Paired Value Emission
| Operator | Syntax | Description | Emission Pattern |
|---|---|---|---|
| zip | zip([obs1$, obs2$, ...]) | Pairs values by index - emits when all sources have emitted at that position | Synchronized by index, slowest determines rate |
| zipWith | obs1$.pipe(zipWith(obs2$, obs3$)) | Pipeable version - zips source with additional observables | Same as zip |
Example: Paired value emission with zip
import { zip, interval, from, of } from 'rxjs';
import { zipWith, concatMap, mergeMap, take, map } from 'rxjs/operators';
// zip - pair by index
const numbers$ = of(1, 2, 3, 4, 5);
const letters$ = of('a', 'b', 'c');
zip([numbers$, letters$]).subscribe(
([num, letter]) => console.log(`${num}${letter}`)
);
// Output: 1a, 2b, 3c (stops when letters$ completes)
// zip with different speeds
const fast$ = interval(500).pipe(take(5));
const slow$ = interval(1000).pipe(take(3));
zip([fast$, slow$]).subscribe(
([f, s]) => console.log(`Fast: ${f}, Slow: ${s}`)
);
// Emits at slow rate: [0,0], [1,1], [2,2]
// zipWith - pipeable form
of(1, 2, 3).pipe(
zipWith(of('a', 'b', 'c'), of('X', 'Y', 'Z'))
).subscribe(
([num, lower, upper]) => console.log(`${num}${lower}${upper}`)
); // 1aX, 2bY, 3cZ
// Practical: parallel API calls with pairing
// zip pairs by arrival order, so each user is zipped with its own posts inside
// mergeMap; zipping two independent mergeMap streams could mismatch responses.
const userIds$ = of(1, 2, 3);
userIds$.pipe(
  mergeMap(id => zip([
    this.http.get(`/api/users/${id}`),
    this.http.get(`/api/posts/${id}`)
  ]))
).subscribe(
  ([user, posts]) => displayUserWithPosts(user, posts)
);
// Practical: coordinate animations
const element1Anim$ = animate(element1, 'fadeIn', 1000);
const element2Anim$ = animate(element2, 'slideUp', 1500);
const element3Anim$ = animate(element3, 'zoomIn', 800);
zip([element1Anim$, element2Anim$, element3Anim$]).subscribe(
() => console.log('All animations complete')
);
// Practical: batch processing with paired data
const tasks$ = from(['task1', 'task2', 'task3']);
const resources$ = from([{ cpu: 1 }, { cpu: 2 }, { cpu: 3 }]);
zip([tasks$, resources$]).pipe(
concatMap(([task, resource]) => processTask(task, resource))
).subscribe(result => console.log('Processed:', result));
// Practical: test data generation
const timestamps$ = interval(100).pipe(take(5));
const values$ = of(10, 20, 30, 40, 50);
const labels$ = of('A', 'B', 'C', 'D', 'E');
zip([timestamps$, values$, labels$]).pipe(
map(([time, value, label]) => ({ time, value, label }))
).subscribe(dataPoint => chartData.push(dataPoint));
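Warning: zip completes as soon as any source completes with nothing left in its buffer, so the output is never longer than the shortest source. Unpaired values from longer sources are discarded, and values from a faster source are buffered until the slower one catches up.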
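The buffering behind this warning can be sketched with one queue per source. The snippet below is a dependency-free TypeScript model rather than RxJS code: it replays arrivals in order and emits a tuple whenever every queue holds at least one value. The names Arrival and zipModel and the arrival sequence are made up for illustration, and completion is not modeled; leftover simply shows what would be discarded once the shorter source completes.

```typescript
interface Arrival {
  source: number; // index of the stream that emitted
  value: string;
}

function zipModel(
  sourceCount: number,
  arrivals: Arrival[]
): { emitted: string[][]; leftover: string[][] } {
  // One FIFO buffer per source.
  const buffers: string[][] = [];
  for (let i = 0; i < sourceCount; i++) {
    buffers.push([]);
  }
  const emitted: string[][] = [];
  for (const arrival of arrivals) {
    buffers[arrival.source].push(arrival.value);
    // Emit only when every source has a value waiting at the front.
    if (buffers.every(buffer => buffer.length > 0)) {
      emitted.push(buffers.map(buffer => buffer.shift() as string));
    }
  }
  return { emitted, leftover: buffers };
}

// Source 0 is fast (five values), source 1 is slow (three values).
const arrivalOrder: Arrival[] = [
  { source: 0, value: '1' },
  { source: 0, value: '2' },
  { source: 1, value: 'a' },
  { source: 0, value: '3' },
  { source: 1, value: 'b' },
  { source: 0, value: '4' },
  { source: 0, value: '5' },
  { source: 1, value: 'c' },
];

const zipped = zipModel(2, arrivalOrder);
console.log(JSON.stringify(zipped.emitted));  // [["1","a"],["2","b"],["3","c"]]
console.log(JSON.stringify(zipped.leftover)); // [["4","5"],[]]
```

The leftover buffer also hints at the memory cost: a fast source paired with a much slower one keeps accumulating values until they are matched or the stream ends.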
6. startWith and endWith for Stream Initialization/Termination
| Operator | Syntax | Description | Use Case |
|---|---|---|---|
| startWith | startWith(...values) | Emits specified values before source observable emissions begin | Initial/default values, loading states |
| endWith | endWith(...values) | Emits specified values after source observable completes | Final/sentinel values, completion markers |
Example: Stream initialization and termination
import { of, from, fromEvent, interval } from 'rxjs';
import { startWith, endWith, map, take, scan, debounceTime, switchMap } from 'rxjs/operators';
// startWith - provide initial value
of(1, 2, 3).pipe(
startWith(0)
).subscribe(val => console.log(val)); // 0, 1, 2, 3
// startWith multiple values
of('b', 'c').pipe(
startWith('a', 'a.5')
).subscribe(val => console.log(val)); // a, a.5, b, c
// endWith - append final values
of(1, 2, 3).pipe(
endWith(4, 5)
).subscribe(val => console.log(val)); // 1, 2, 3, 4, 5
// Combined startWith and endWith
of('middle').pipe(
startWith('start'),
endWith('end')
).subscribe(val => console.log(val)); // start, middle, end
// Practical: loading state management
this.http.get('/api/data').pipe(
map(data => ({ loading: false, data })),
startWith({ loading: true, data: null })
).subscribe(state => {
if (state.loading) {
showSpinner();
} else {
hideSpinner();
displayData(state.data);
}
});
// Practical: search with empty initial state
const searchQuery$ = fromEvent(searchInput, 'input').pipe(
map(e => e.target.value),
debounceTime(300)
);
searchQuery$.pipe(
startWith(''), // Empty search on load
switchMap(query =>
  // Encode the query so characters such as & or # do not break the URL
  query ? this.http.get(`/api/search?q=${encodeURIComponent(query)}`) : of([])
)
).subscribe(results => displayResults(results));
// Practical: counter with initial value
fromEvent(incrementBtn, 'click').pipe(
scan(count => count + 1, 0),
startWith(0) // Show initial count
).subscribe(count => updateCountDisplay(count));
// Practical: data stream with header/footer
const dataRows$ = from([
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' },
{ id: 3, name: 'Charlie' }
]);
dataRows$.pipe(
startWith({ type: 'header', columns: ['ID', 'Name'] }),
endWith({ type: 'footer', total: 3 })
).subscribe(row => {
if (row.type === 'header') {
renderHeader(row.columns);
} else if (row.type === 'footer') {
renderFooter(row.total);
} else {
renderRow(row);
}
});
// Practical: event stream with markers
interval(1000).pipe(
take(5),
map(n => `Event ${n}`),
startWith('Stream started'),
endWith('Stream completed')
).subscribe(msg => console.log(msg));
// Practical: form with defaults
const formValue$ = formControl.valueChanges;
formValue$.pipe(
startWith({ name: '', email: '', age: 0 }) // Default values
).subscribe(value => validateForm(value));
Section 5 Summary
- merge combines streams concurrently (interleaved), concat combines sequentially (queued)
- combineLatest emits array/object when ANY source changes (after all emit once)
- withLatestFrom emits only when source emits, sampling latest from others
- zip pairs values by index position, rate limited by slowest source
- startWith provides initial values, endWith appends final values
- Choose: merge (parallel), concat (sequential), combineLatest (sync all), withLatestFrom (main+aux), zip (paired)