Subjects and Multicasting Patterns
1. Subject for Multicast and Imperative Emission
| Type | Syntax | Description | Characteristics |
|---|---|---|---|
| Subject | new Subject<T>() | Both Observable and Observer - multicasts to multiple subscribers | Hot observable, no initial value, no replay |
| next() | subject.next(value) | Imperative emission to all active subscribers | Push values manually |
| error() | subject.error(err) | Emit error to all subscribers, terminates subject | Cannot emit after error |
| complete() | subject.complete() | Notify completion to all subscribers, terminates subject | Cannot emit after complete |
Example: Subject basic usage and multicasting
import { Subject, interval } from 'rxjs';
import { filter, take } from 'rxjs/operators';
// Create Subject
const subject = new Subject<number>();
// Subscribe multiple observers
subject.subscribe(val => console.log('Observer A:', val));
subject.subscribe(val => console.log('Observer B:', val));
// Emit values imperatively
subject.next(1);
subject.next(2);
// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2
// Late subscriber misses previous emissions
subject.next(3);
subject.subscribe(val => console.log('Observer C:', val));
subject.next(4);
// Output:
// Observer A: 3, Observer B: 3
// Observer A: 4, Observer B: 4, Observer C: 4
// Subject as Observer
const observable$ = interval(1000).pipe(take(3));
observable$.subscribe(subject); // Forward next/error/complete into the subject (it completes when observable$ completes)
subject.subscribe(val => console.log('Via subject:', val));
// Practical: Event bus
class EventBus {
private subject = new Subject<any>();
emit(event: any) {
this.subject.next(event);
}
on(eventType: string) {
return this.subject.pipe(
filter(event => event.type === eventType)
);
}
}
const bus = new EventBus();
bus.on('user:login').subscribe(event => console.log('User logged in:', event.data));
bus.emit({ type: 'user:login', data: { userId: 123 } });
// Practical: Component communication
class DataService {
private dataUpdated = new Subject<any>();
dataUpdated$ = this.dataUpdated.asObservable(); // Expose as Observable
updateData(data: any) {
this.processData(data);
this.dataUpdated.next(data); // Notify subscribers
}
}
// Practical: Click stream
const clicks = new Subject<MouseEvent>();
clicks.subscribe(event => console.log('Clicked at:', event.clientX, event.clientY));
document.addEventListener('click', event => clicks.next(event));
// Complete subject to stop emissions
subject.complete();
subject.next(5); // This will not emit - subject is completed
Note: Subjects are hot observables - late subscribers don't receive previous emissions. Always call complete() to properly terminate and release resources.
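The termination rules from the table can be checked with a short synchronous sketch. It assumes RxJS is installed; the `log` array and the subscriber labels are illustration devices only, not part of the example above.

```typescript
import { Subject } from 'rxjs';

const log: string[] = [];
const subject = new Subject<number>();

subject.subscribe({
  next: value => log.push(`A next ${value}`),
  complete: () => log.push('A complete'),
});

subject.next(1);
subject.complete();
subject.next(2); // ignored: the subject has already completed

// A subscriber that arrives after completion receives only the complete notification.
subject.subscribe({
  next: value => log.push(`B next ${value}`),
  complete: () => log.push('B complete'),
});

console.log(log); // ['A next 1', 'A complete', 'B complete']
```

Because a completed subject cannot be reused, a fresh Subject is needed if emissions have to resume later.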
2. BehaviorSubject for State Management with Initial Value
| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| BehaviorSubject | new BehaviorSubject<T>(initialValue) | Requires initial value, replays current value to new subscribers | State management, current values |
| getValue() | subject.getValue() | Synchronously returns current value (not available on regular Subject) | Imperative state access |
| value property | subject.value | Read-only property for current value (same as getValue()) | State inspection |
Example: BehaviorSubject for state management
import { BehaviorSubject, Observable } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';
// Create with initial value
const count$ = new BehaviorSubject<number>(0);
// New subscriber immediately receives current value
count$.subscribe(val => console.log('Subscriber A:', val)); // 0
count$.next(1);
count$.next(2);
// Output: Subscriber A: 1, Subscriber A: 2
// Late subscriber gets latest value immediately
count$.subscribe(val => console.log('Subscriber B:', val)); // 2
count$.next(3);
// Output: Subscriber A: 3, Subscriber B: 3
// Synchronous value access
console.log('Current value:', count$.getValue()); // 3
console.log('Via property:', count$.value); // 3
// Practical: User authentication state
class AuthService {
private isAuthenticatedSubject = new BehaviorSubject<boolean>(false);
isAuthenticated$ = this.isAuthenticatedSubject.asObservable();
login(credentials: any) {
// Perform login...
this.isAuthenticatedSubject.next(true);
}
logout() {
this.isAuthenticatedSubject.next(false);
}
get isAuthenticated(): boolean {
return this.isAuthenticatedSubject.value;
}
}
const auth = new AuthService();
auth.isAuthenticated$.subscribe(status =>
console.log('Auth status:', status)
); // Immediately: false
auth.login({ username: 'alice' }); // Emits: true
// Practical: Application state store
class Store<T> {
private state$: BehaviorSubject<T>;
constructor(initialState: T) {
this.state$ = new BehaviorSubject<T>(initialState);
}
select<K extends keyof T>(key: K): Observable<T[K]> {
return this.state$.pipe(
map(state => state[key]),
distinctUntilChanged()
);
}
update(partialState: Partial<T>) {
this.state$.next({ ...this.state$.value, ...partialState });
}
get snapshot(): T {
return this.state$.value;
}
}
const store = new Store({ user: null, loading: false, error: null });
store.select('loading').subscribe(loading => toggleSpinner(loading));
store.update({ loading: true });
// Practical: Form state
class FormState {
private formValue$ = new BehaviorSubject<any>({
name: '',
email: '',
age: 0
});
value$ = this.formValue$.asObservable();
updateField(field: string, value: any) {
const current = this.formValue$.value;
this.formValue$.next({ ...current, [field]: value });
}
get currentValue() {
return this.formValue$.value;
}
reset() {
this.formValue$.next({ name: '', email: '', age: 0 });
}
}
// Practical: Theme management
const theme$ = new BehaviorSubject<'light' | 'dark'>('light');
theme$.subscribe(theme => document.body.className = theme);
theme$.next('dark'); // Switch theme
// Practical: Derived state
const items$ = new BehaviorSubject<any[]>([]);
const itemCount$ = items$.pipe(map(items => items.length));
const hasItems$ = items$.pipe(map(items => items.length > 0));
itemCount$.subscribe(count => console.log('Count:', count)); // 0
items$.next([1, 2, 3]); // Count: 3
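The select pattern used by the store relies on distinctUntilChanged to suppress emissions when an unrelated part of the state changes. A minimal sketch of that behaviour, assuming RxJS is installed; the `AppState` shape and `loadingLog` array are hypothetical names chosen for illustration:

```typescript
import { BehaviorSubject } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';

interface AppState {
  loading: boolean;
  count: number;
}

const state$ = new BehaviorSubject<AppState>({ loading: false, count: 0 });
const loadingLog: boolean[] = [];

// Observe only the `loading` slice, ignoring repeats of the same value.
state$.pipe(
  map(state => state.loading),
  distinctUntilChanged()
).subscribe(loading => loadingLog.push(loading));

state$.next({ ...state$.value, count: 1 });      // loading unchanged: no emission
state$.next({ ...state$.value, loading: true }); // loading changed: emits true
state$.next({ ...state$.value, count: 2 });      // loading unchanged: no emission

console.log(loadingLog);   // [false, true]
console.log(state$.value); // { loading: true, count: 2 }
```

Without distinctUntilChanged the slice observer would have fired four times, once per state update plus the initial value.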
3. ReplaySubject for Historical Value Replay
| Configuration | Syntax | Description | Replay Behavior |
|---|---|---|---|
| Buffer size | new ReplaySubject<T>(bufferSize) | Replays last N emissions to new subscribers | Stores N most recent values |
| Time window | new ReplaySubject<T>(bufferSize, windowTime) | Replays values emitted within time window (ms) | Age-based filtering |
| Infinite replay | new ReplaySubject<T>() | Replays all values to new subscribers (memory warning!) | Complete history |
Example: ReplaySubject value replay
import { ReplaySubject } from 'rxjs';
import { scan } from 'rxjs/operators';
// Replay last 3 values
const replay$ = new ReplaySubject<number>(3);
replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.next(4);
// New subscriber receives last 3 values
replay$.subscribe(val => console.log('Subscriber A:', val));
// Output: Subscriber A: 2, 3, 4 (buffer size is 3, so 1 has been dropped)
replay$.next(5);
// Output: Subscriber A: 5
// Another late subscriber
replay$.subscribe(val => console.log('Subscriber B:', val));
// Output: Subscriber B: 3, 4, 5
// ReplaySubject with time window
const replayTime$ = new ReplaySubject<number>(100, 1000); // 100 items, 1 second
replayTime$.next(1);
setTimeout(() => replayTime$.next(2), 500);
setTimeout(() => replayTime$.next(3), 1500);
setTimeout(() => {
replayTime$.subscribe(val => console.log('Timed:', val));
// Receives only values emitted within the last second: here just 3 (emitted at 1500 ms)
}, 2000);
// Practical: Chat message history
class ChatService {
private messages$ = new ReplaySubject<Message>(50); // Last 50 messages
messages = this.messages$.asObservable();
sendMessage(message: Message) {
this.messages$.next(message);
this.saveToServer(message);
}
// New users joining see recent chat history
joinChat() {
return this.messages; // Replays last 50 messages without exposing next()
}
}
// Practical: Activity log
class ActivityLogger {
private log$ = new ReplaySubject<LogEntry>(100, 5 * 60 * 1000); // Last 5 minutes
logActivity(entry: LogEntry) {
this.log$.next({ ...entry, timestamp: Date.now() });
}
getRecentActivity() {
return this.log$.asObservable();
}
}
// Practical: Undo/Redo functionality
class UndoRedoService {
private actions$ = new ReplaySubject<Action>(20); // Last 20 actions
recordAction(action: Action) {
this.actions$.next(action);
}
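getHistory() {
const history: Action[] = [];
// Buffered values are replayed synchronously during subscribe()
const sub = this.actions$.subscribe(action => history.push(action));
sub.unsubscribe(); // Avoid leaving a new subscriber attached on every call
return history;
}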
}
// Practical: Real-time sensor data
class SensorMonitor {
private readings$ = new ReplaySubject<Reading>(10, 60000); // Last 10, max 1 min old
recordReading(reading: Reading) {
this.readings$.next(reading);
}
getRecentReadings() {
return this.readings$.pipe(
scan((acc, reading) => [...acc, reading], [])
);
}
}
// Warning: Infinite replay (memory intensive)
const infiniteReplay$ = new ReplaySubject<number>(); // All values!
for (let i = 0; i < 10000; i++) {
infiniteReplay$.next(i);
}
// New subscriber receives ALL 10,000 values - memory leak risk!
// Practical: Command history
class CommandHistory {
private commands$ = new ReplaySubject<Command>(30);
execute(command: Command) {
command.execute();
this.commands$.next(command);
}
getHistory(): Command[] {
const history: Command[] = [];
const sub = this.commands$.subscribe(cmd => history.push(cmd));
sub.unsubscribe();
return history;
}
}
Warning: ReplaySubject without buffer size stores ALL values in memory. Always specify buffer size to prevent memory leaks.
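The difference between a bounded and an unbounded buffer can be seen directly by counting what a late subscriber receives. This is a small sketch assuming RxJS is installed; `TOTAL` and the array names are illustrative only.

```typescript
import { ReplaySubject } from 'rxjs';

const TOTAL = 10000;

// Bounded: keeps only the 3 most recent values, however many are emitted.
const bounded$ = new ReplaySubject<number>(3);
// Unbounded: keeps every value for the lifetime of the subject.
const unbounded$ = new ReplaySubject<number>();

for (let i = 1; i <= TOTAL; i++) {
  bounded$.next(i);
  unbounded$.next(i);
}

const fromBounded: number[] = [];
const fromUnbounded: number[] = [];
bounded$.subscribe(value => fromBounded.push(value));
unbounded$.subscribe(value => fromUnbounded.push(value));

console.log(fromBounded);          // [9998, 9999, 10000]
console.log(fromUnbounded.length); // 10000
```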
4. AsyncSubject for Last Value Emission
| Feature | Syntax | Description | Emission Pattern |
|---|---|---|---|
| AsyncSubject | new AsyncSubject<T>() | Emits only the last value when completed | Single emission on complete |
| Behavior | Buffers last value until complete() | Similar to Promise - emits once when "resolved" | Late subscribers get same final value |
Example: AsyncSubject final value emission
import { AsyncSubject, Observable } from 'rxjs';
// Create AsyncSubject
const async$ = new AsyncSubject<number>();
// Subscribe before emissions
async$.subscribe(val => console.log('Subscriber A:', val));
async$.next(1);
async$.next(2);
async$.next(3);
// No output yet - waiting for complete()
async$.complete(); // Triggers emission of last value
// Output: Subscriber A: 3
// Late subscriber still gets last value
async$.subscribe(val => console.log('Subscriber B:', val));
// Output: Subscriber B: 3
// Practical: One-time calculation result
class Calculator {
calculate(operation: string): Observable<number> {
const result$ = new AsyncSubject<number>();
setTimeout(() => {
const result = this.performCalculation(operation);
result$.next(result);
result$.complete(); // Emit final result
}, 1000);
return result$.asObservable();
}
}
// Practical: Async initialization
class AppInitializer {
private initialized$ = new AsyncSubject<boolean>();
init() {
Promise.all([
this.loadConfig(),
this.loadUserData(),
this.connectServices()
]).then(() => {
this.initialized$.next(true);
this.initialized$.complete();
});
return this.initialized$.asObservable();
}
}
const initializer = new AppInitializer();
initializer.init().subscribe(ready => {
if (ready) console.log('App ready!');
});
// Practical: Final test result
class TestRunner {
runTests(): Observable<TestResult> {
const result$ = new AsyncSubject<TestResult>();
this.executeTests().then(results => {
const finalResult = this.aggregateResults(results);
result$.next(finalResult);
result$.complete();
});
return result$;
}
}
// Practical: Migration completion
class DataMigration {
migrate(): Observable<MigrationStatus> {
const status$ = new AsyncSubject<MigrationStatus>();
this.performMigration()
.then(result => {
status$.next({
success: true,
recordsMigrated: result.count
});
status$.complete();
})
.catch(err => {
status$.error(err);
});
return status$;
}
}
// Compare with Promise
const promise = new Promise(resolve => {
setTimeout(() => resolve('Promise result'), 1000);
});
const asyncSubject = new AsyncSubject();
setTimeout(() => {
asyncSubject.next('AsyncSubject result');
asyncSubject.complete();
}, 1000);
// Both emit once when resolved/completed
promise.then(val => console.log(val));
asyncSubject.subscribe(val => console.log(val));
// Error handling
const errorAsync$ = new AsyncSubject<number>();
errorAsync$.subscribe({
next: val => console.log('Value:', val),
error: err => console.error('Error:', err)
});
errorAsync$.next(1);
errorAsync$.next(2);
errorAsync$.error(new Error('Failed'));
// Output: Error: Failed (no value emission)
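Two edge cases are worth seeing alongside the examples above: an AsyncSubject that completes without ever receiving a value, and a subscriber that arrives after completion. The sketch below assumes RxJS is installed; the `events` array is only there to record what each subscriber sees.

```typescript
import { AsyncSubject } from 'rxjs';

const events: string[] = [];

// Case 1: completes without ever receiving a value.
const empty$ = new AsyncSubject<number>();
empty$.subscribe({
  next: value => events.push(`empty next ${value}`),
  complete: () => events.push('empty complete'),
});
empty$.complete();

// Case 2: a subscriber that arrives after completion.
const done$ = new AsyncSubject<number>();
done$.next(1);
done$.next(2);
done$.complete();
done$.subscribe({
  next: value => events.push(`late next ${value}`),
  complete: () => events.push('late complete'),
});

console.log(events);
// ['empty complete', 'late next 2', 'late complete']
```

Unlike a Promise, which always settles with a value or a rejection, an AsyncSubject can therefore complete with no value at all.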
5. share and shareReplay for Reference Counting
| Operator | Syntax | Description | Replay Behavior |
|---|---|---|---|
| share | share(config?) | Multicasts observable, shares single subscription among all subscribers | No replay - late subscribers miss previous values |
| shareReplay | shareReplay(bufferSize?, windowTime?) | Like share but replays buffered values to new subscribers | Replays last N values or time window |
| shareReplay (config) | shareReplay({ bufferSize, refCount }) | Configurable replay with reference counting control | refCount: true unsubscribes from the source when the subscriber count drops to zero |
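The effect of the refCount flag is easiest to see on a source that never completes. The following is a sketch under stated assumptions: RxJS is installed, and `trackedSource` is a hypothetical helper (not an RxJS API) that counts how many source subscriptions are currently open.

```typescript
import { Observable, Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical helper: wraps `ticks` in a cold Observable that tracks open subscriptions.
function trackedSource(ticks: Subject<number>) {
  const stats = { active: 0 };
  const source$ = new Observable<number>(subscriber => {
    stats.active++;
    const inner = ticks.subscribe(subscriber);
    return () => {
      stats.active--;
      inner.unsubscribe();
    };
  });
  return { source$, stats };
}

const ticks = new Subject<number>();

const kept = trackedSource(ticks);
const keptSub = kept.source$
  .pipe(shareReplay({ bufferSize: 1, refCount: false }))
  .subscribe();

const counted = trackedSource(ticks);
const countedSub = counted.source$
  .pipe(shareReplay({ bufferSize: 1, refCount: true }))
  .subscribe();

keptSub.unsubscribe();
countedSub.unsubscribe();

console.log(kept.stats.active);    // 1: the source subscription stays open
console.log(counted.stats.active); // 0: the source subscription was torn down
```

For long-lived sources such as intervals or sockets, refCount: false can therefore keep work running after every consumer has gone.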
Example: Sharing and replay strategies
import { interval, of } from 'rxjs';
import { filter, share, shareReplay, take, tap } from 'rxjs/operators';
// WITHOUT share - each subscriber creates new subscription
const cold$ = interval(1000).pipe(
take(3),
tap(val => console.log('Source emission:', val))
);
cold$.subscribe(val => console.log('Sub A:', val));
cold$.subscribe(val => console.log('Sub B:', val));
// Creates 2 separate interval instances!
// WITH share - single shared subscription
const hot$ = interval(1000).pipe(
take(3),
tap(val => console.log('Source emission:', val)),
share()
);
hot$.subscribe(val => console.log('Sub A:', val));
hot$.subscribe(val => console.log('Sub B:', val));
// Single interval shared between subscribers
// shareReplay - late subscribers get history
const replay$ = interval(1000).pipe(
take(5),
tap(val => console.log('Source:', val)),
shareReplay(2) // Buffer last 2 values
);
replay$.subscribe(val => console.log('Early:', val));
setTimeout(() => {
replay$.subscribe(val => console.log('Late:', val));
// Receives last 2 buffered values immediately
}, 3500);
// Practical: Expensive HTTP call sharing
const users$ = this.http.get('/api/users').pipe(
shareReplay(1) // Cache result, share with all subscribers
);
// Multiple components can subscribe without duplicate requests
users$.subscribe(users => this.displayInComponent1(users));
users$.subscribe(users => this.displayInComponent2(users));
users$.subscribe(users => this.displayInComponent3(users));
// Only ONE HTTP request made!
// shareReplay with refCount
const data$ = this.http.get('/api/data').pipe(
shareReplay({ bufferSize: 1, refCount: true })
);
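// If the last subscriber unsubscribes before the source completes, the source subscription is torn down and the buffer is reset
// (a source that has already completed, such as a finished HTTP response, stays cached)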
// Practical: WebSocket connection sharing
class WebSocketService {
private connection$ = this.connectWebSocket().pipe(
share() // Share single WebSocket connection
);
messages$ = this.connection$.pipe(
filter(msg => msg.type === 'message')
);
notifications$ = this.connection$.pipe(
filter(msg => msg.type === 'notification')
);
}
// Practical: Current user sharing
class UserService {
currentUser$ = this.http.get('/api/current-user').pipe(
shareReplay({ bufferSize: 1, refCount: false })
// Keeps cached even if all unsubscribe
);
}
// share vs shareReplay comparison
const source$ = interval(1000).pipe(take(4));
// share - no replay
const shared$ = source$.pipe(share());
shared$.subscribe(val => console.log('A:', val));
setTimeout(() => {
shared$.subscribe(val => console.log('B:', val)); // Misses early values
}, 2500);
// shareReplay - with replay
const sharedReplay$ = source$.pipe(shareReplay(2));
sharedReplay$.subscribe(val => console.log('A:', val));
setTimeout(() => {
sharedReplay$.subscribe(val => console.log('B:', val)); // Gets last 2 values
}, 2500);
// Practical: Configuration sharing
const config$ = this.http.get('/api/config').pipe(
shareReplay(1) // All components get same config
);
// Multiple feature modules
config$.subscribe(config => this.featureA.init(config));
config$.subscribe(config => this.featureB.init(config));
config$.subscribe(config => this.featureC.init(config));
Note: Use share() for hot observables without replay. Use shareReplay(1) for caching expensive operations like HTTP requests.
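The caching claim can be verified by counting how often the source actually runs. This sketch assumes RxJS is installed; `expensive$` and the `executions` counter stand in for a real HTTP call and are hypothetical.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let executions = 0;

// Stand-in for an expensive one-shot operation such as an HTTP request.
const expensive$ = new Observable<number>(subscriber => {
  executions++;
  subscriber.next(42);
  subscriber.complete();
});

// Cold: every subscription re-runs the operation.
expensive$.subscribe();
expensive$.subscribe();
const coldExecutions = executions;

// Cached: the first subscription runs it, later ones replay the stored result.
executions = 0;
const cached$ = expensive$.pipe(shareReplay(1));
const received: number[] = [];
cached$.subscribe(value => received.push(value));
cached$.subscribe(value => received.push(value));
const cachedExecutions = executions;

console.log(coldExecutions);   // 2
console.log(cachedExecutions); // 1
console.log(received);         // [42, 42]
```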
6. multicast and publish Operators for Custom Multicasting
| Operator | Syntax | Description | Connection |
|---|---|---|---|
| multicast | multicast(subjectFactory) | Multicasts using provided Subject/BehaviorSubject/ReplaySubject factory | Returns ConnectableObservable - manual connect() |
| publish | publish() | Shorthand for multicast(() => new Subject()) | Requires connect() to start |
| publishReplay | publishReplay(bufferSize?) | Shorthand for multicast(() => new ReplaySubject(bufferSize)) | Replays to late subscribers |
| refCount | publish().refCount() | Auto-connects on first subscription, disconnects when count reaches 0 | Automatic connection management |
Example: Custom multicasting patterns
import { ConnectableObservable, interval, of, ReplaySubject, Subject, Subscription } from 'rxjs';
import { map, multicast, publish, publishReplay, refCount, share, shareReplay, take, tap } from 'rxjs/operators';
// multicast with Subject
// pipe() is typed as returning Observable, so cast to ConnectableObservable to call connect()
const source$ = interval(1000).pipe(
take(4),
tap(val => console.log('Source:', val)),
multicast(new Subject<number>())
) as ConnectableObservable<number>;
source$.subscribe(val => console.log('A:', val));
source$.subscribe(val => console.log('B:', val));
// Must call connect() to start
source$.connect();
// publish - simpler syntax
const published$ = interval(1000).pipe(
take(3),
publish()
) as ConnectableObservable<number>;
const sub1 = published$.subscribe(val => console.log('Sub1:', val));
const sub2 = published$.subscribe(val => console.log('Sub2:', val));
published$.connect(); // Start emissions
// publishReplay - with buffering
const publishedReplay$ = interval(1000).pipe(
take(5),
publishReplay(2) // Buffer last 2
) as ConnectableObservable<number>;
publishedReplay$.subscribe(val => console.log('Early:', val));
publishedReplay$.connect();
setTimeout(() => {
publishedReplay$.subscribe(val => console.log('Late:', val));
// Gets buffered values
}, 3500);
// refCount - automatic connection management
const autoCounted$ = interval(1000).pipe(
take(5),
tap(val => console.log('Source:', val)),
publish(),
refCount() // Auto-connect on first sub, auto-disconnect on zero subs
);
const subscription1 = autoCounted$.subscribe(val => console.log('A:', val));
// Source starts here (first subscriber)
// Declared outside the timeouts so the last callback can reach it (Subscription is exported from 'rxjs')
let subscription2: Subscription | undefined;
setTimeout(() => {
subscription2 = autoCounted$.subscribe(val => console.log('B:', val));
// Joins existing stream
}, 2000);
setTimeout(() => {
subscription1.unsubscribe();
// Source still running (subscription2 active)
}, 3000);
setTimeout(() => {
subscription2?.unsubscribe();
// Source stops (zero subscribers)
}, 4000);
// Practical: Shared expensive operation
const expensiveCalc$ = of(null).pipe(
tap(() => console.log('Expensive calculation...')),
map(() => performExpensiveCalculation()),
publishReplay(1),
refCount()
);
expensiveCalc$.subscribe(result => console.log('User 1:', result));
expensiveCalc$.subscribe(result => console.log('User 2:', result));
// Only one calculation performed
// Practical: Controlled broadcast
class NotificationService {
private notifications$ = new Subject<Notification>();
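// Cast needed because pipe() is typed as returning Observable (ConnectableObservable is exported from 'rxjs')
private broadcast$ = this.notifications$.pipe(
multicast(new ReplaySubject<Notification>(5))
) as ConnectableObservable<Notification>;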
constructor() {
this.broadcast$.connect(); // Start broadcasting
}
send(notification: Notification) {
this.notifications$.next(notification);
}
subscribe() {
return this.broadcast$; // Subscribers get last 5 notifications
}
}
// Modern alternative: share() is preferred
// Instead of: publish().refCount()
// Use: share()
const modern$ = interval(1000).pipe(
take(5),
share() // Replaces publish().refCount()
);
// Instead of: publishReplay(1).refCount()
// Use: shareReplay({ bufferSize: 1, refCount: true })
const modernReplay$ = interval(1000).pipe(
take(5),
// Closest replacement; plain shareReplay(1) defaults to refCount: false and stays subscribed until the source completes
shareReplay({ bufferSize: 1, refCount: true })
);
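The reference-counting behaviour described in the table (connect on the first subscriber, disconnect at zero) can be observed synchronously with share(). This is a sketch assuming RxJS is installed; `input`, `connections` and `open` are hypothetical names used to instrument the source.

```typescript
import { Observable, Subject } from 'rxjs';
import { share } from 'rxjs/operators';

const input = new Subject<number>();
let connections = 0; // how many times the source was subscribed
let open = 0;        // how many source subscriptions are currently open

const source$ = new Observable<number>(subscriber => {
  connections++;
  open++;
  const inner = input.subscribe(subscriber);
  return () => {
    open--;
    inner.unsubscribe();
  };
});

const shared$ = source$.pipe(share());

const a: number[] = [];
const b: number[] = [];
const subA = shared$.subscribe(value => a.push(value)); // first subscriber connects
const subB = shared$.subscribe(value => b.push(value)); // joins the same connection
input.next(1);

subA.unsubscribe();
input.next(2);      // only B is still listening
subB.unsubscribe(); // count reaches zero: the source subscription is closed
const openAfterLastUnsubscribe = open;

shared$.subscribe(); // a new subscriber triggers a fresh connection

console.log(a, b);                     // [1] [1, 2]
console.log(openAfterLastUnsubscribe); // 0
console.log(connections);              // 2
```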
Section 7 Summary
- Subject is both Observable and Observer - multicast without replay
- BehaviorSubject requires initial value, replays current value to new subscribers
- ReplaySubject buffers N values, replays to late subscribers (specify buffer to avoid leaks)
- AsyncSubject emits only last value on completion (Promise-like behavior)
- share() for hot multicasting, shareReplay(1) for caching expensive operations
- Modern pattern: prefer share/shareReplay over publish/multicast operators
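The differences summarised above can be compared side by side by feeding the same sequence to each subject type and recording what a late subscriber receives. A minimal sketch, assuming RxJS is installed; `lateSubscriberSees` is a hypothetical helper written for this comparison.

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

// Emits 1, 2, 3, then subscribes, then emits 4 and completes.
// Returns everything the late subscriber received.
function lateSubscriberSees(subject: Subject<number>): number[] {
  subject.next(1);
  subject.next(2);
  subject.next(3);
  const seen: number[] = [];
  subject.subscribe(value => seen.push(value));
  subject.next(4);
  subject.complete();
  return seen;
}

const plain = lateSubscriberSees(new Subject<number>());
const behavior = lateSubscriberSees(new BehaviorSubject<number>(0));
const replay = lateSubscriberSees(new ReplaySubject<number>(2));
const asyncLast = lateSubscriberSees(new AsyncSubject<number>());

console.log(plain);     // [4]        no replay
console.log(behavior);  // [3, 4]     current value, then new values
console.log(replay);    // [2, 3, 4]  last 2 buffered values, then new values
console.log(asyncLast); // [4]        only the final value, delivered on complete
```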