Framework Integration Patterns
1. Angular RxJS Integration and async Pipe
| Feature | Syntax | Description | Benefit |
|---|---|---|---|
| async Pipe | {{ observable$ | async }} |
Subscribe/unsubscribe automatically in template | No manual subscription management |
| HttpClient | http.get<T>(url) |
Returns observables for HTTP requests | Built-in RxJS integration |
| Reactive Forms | formControl.valueChanges |
Observable streams for form values | Reactive form validation |
| Router Events | router.events |
Observable stream of navigation events | React to route changes |
| Event Emitters | @Output() event = new EventEmitter() |
Component communication with observables | Type-safe event handling |
| Service State | BehaviorSubject in services | Centralized state management | Share state across components |
Example: async pipe for automatic subscription management
// Component
@Component({
selector: 'app-user-list',
template: `
<div *ngIf="users$ | async as users; else loading">
<div *ngFor="let user of users">
{{ user.name }}
</div>
</div>
<ng-template #loading>
<div>Loading...</div>
</ng-template>
<!-- Error handling -->
<div *ngIf="error$ | async as error" class="error">
{{ error }}
</div>
`
})
export class UserListComponent {
users$: Observable<User[]>;
error$ = new Subject<string>();
constructor(private userService: UserService) {
this.users$ = this.userService.getUsers().pipe(
catchError(err => {
this.error$.next(err.message);
return of([]);
})
);
}
// No ngOnDestroy needed - async pipe handles cleanup!
}
Example: Reactive forms with RxJS
@Component({
selector: 'app-search',
template: `
<input [formControl]="searchControl" placeholder="Search...">
<div *ngFor="let result of results$ | async">
{{ result.title }}
</div>
`
})
export class SearchComponent implements OnInit {
searchControl = new FormControl('');
results$: Observable<SearchResult[]>;
constructor(private searchService: SearchService) {}
ngOnInit() {
this.results$ = this.searchControl.valueChanges.pipe(
debounceTime(300), // Wait for user to stop typing
distinctUntilChanged(), // Only if value changed
filter(term => term.length >= 3), // Minimum 3 characters
switchMap(term =>
this.searchService.search(term).pipe(
catchError(() => of([])) // Handle errors gracefully
)
),
shareReplay({ bufferSize: 1, refCount: true }) // Cache results
);
}
}
Example: Angular service with state management
// User service with BehaviorSubject
@Injectable({ providedIn: 'root' })
export class UserService {
private currentUserSubject = new BehaviorSubject<User | null>(null);
public currentUser$ = this.currentUserSubject.asObservable();
private loadingSubject = new BehaviorSubject<boolean>(false);
public loading$ = this.loadingSubject.asObservable();
constructor(private http: HttpClient) {}
loadCurrentUser() {
this.loadingSubject.next(true);
return this.http.get<User>('/api/user/current').pipe(
tap(user => {
this.currentUserSubject.next(user);
this.loadingSubject.next(false);
}),
catchError(err => {
this.loadingSubject.next(false);
return throwError(() => err);
})
);
}
updateUser(user: User) {
return this.http.put<User>(`/api/users/${user.id}`, user).pipe(
tap(updatedUser => this.currentUserSubject.next(updatedUser))
);
}
logout() {
this.currentUserSubject.next(null);
}
}
// Component using the service
@Component({
template: `
<div *ngIf="user$ | async as user">
Welcome, {{ user.name }}!
</div>
<div *ngIf="loading$ | async">Loading...</div>
`
})
export class HeaderComponent {
user$ = this.userService.currentUser$;
loading$ = this.userService.loading$;
constructor(private userService: UserService) {}
}
2. React Hooks Integration with RxJS
| Hook Pattern | Purpose | Implementation | Use Case |
|---|---|---|---|
| useObservable | Subscribe to observable | useState + useEffect with observable | Render observable values |
| useSubject | Create managed subject | useRef for subject + cleanup | Event streams in components |
| useEventObservable | Convert events to observable | fromEvent with cleanup | DOM event handling |
| useObservableState | Observable-driven state | BehaviorSubject + useState | Complex state management |
| useRxEffect | Side effects with observables | useEffect with observable pipeline | API calls, subscriptions |
Example: Custom useObservable hook
// useObservable.ts
function useObservable<T>(
observable$: Observable<T>,
initialValue: T
): T {
const [value, setValue] = useState<T>(initialValue);
useEffect(() => {
const subscription = observable$.subscribe({
next: setValue,
error: err => console.error('Observable error:', err)
});
return () => subscription.unsubscribe();
}, [observable$]);
return value;
}
// Usage in component
function UserProfile({ userId }: { userId: string }) {
const user$ = useMemo(
() => userService.getUser(userId),
[userId]
);
const user = useObservable(user$, null);
if (!user) return <div>Loading...</div>;
return (
<div>
<h1>{user.name}</h1>
<p>{user.email}</p>
</div>
);
}
Example: useSubject for event handling
// useSubject.ts
function useSubject<T>(): [Observable<T>, (value: T) => void] {
const subjectRef = useRef<Subject<T>>();
if (!subjectRef.current) {
subjectRef.current = new Subject<T>();
}
useEffect(() => {
return () => {
subjectRef.current?.complete();
};
}, []);
const emit = useCallback((value: T) => {
subjectRef.current?.next(value);
}, []);
return [subjectRef.current.asObservable(), emit];
}
// Usage: Search with debounce
function SearchComponent() {
const [search$, emitSearch] = useSubject<string>();
const results = useObservable(
useMemo(() => search$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(term => searchAPI(term))
), [search$]),
[]
);
return (
<>
<input
onChange={e => emitSearch(e.target.value)}
placeholder="Search..."
/>
{results.map(r => <div key={r.id}>{r.title}</div>)}
</>
);
}
Example: Advanced observable state management
// useObservableState.ts
function useObservableState<T>(
initialState: T
): [T, (value: T | ((prev: T) => T)) => void, Observable<T>] {
const [state$] = useState(() => new BehaviorSubject<T>(initialState));
const [state, setState] = useState<T>(initialState);
useEffect(() => {
const sub = state$.subscribe(setState);
return () => {
sub.unsubscribe();
state$.complete();
};
}, [state$]);
const updateState = useCallback((value: T | ((prev: T) => T)) => {
const newValue = typeof value === 'function'
? (value as (prev: T) => T)(state$.value)
: value;
state$.next(newValue);
}, [state$]);
return [state, updateState, state$.asObservable()];
}
// Usage: Complex state with derived values
function TodoList() {
const [todos, setTodos, todos$] = useObservableState<Todo[]>([]);
const stats = useObservable(
useMemo(() => todos$.pipe(
map(todos => ({
total: todos.length,
completed: todos.filter(t => t.done).length,
pending: todos.filter(t => !t.done).length
}))
), [todos$]),
{ total: 0, completed: 0, pending: 0 }
);
const addTodo = (text: string) => {
setTodos(prev => [...prev, { id: Date.now(), text, done: false }]);
};
return (
<div>
<div>Total: {stats.total}, Completed: {stats.completed}</div>
{todos.map(todo => <TodoItem key={todo.id} todo={todo} />)}
</div>
);
}
3. Vue.js Composition API and RxJS
| Composable | Purpose | Implementation | Use Case |
|---|---|---|---|
| useObservable | Observable to ref | ref + watchEffect with subscription | Reactive observable values |
| useSubscription | Manage subscription lifecycle | onUnmounted cleanup | Side effect subscriptions |
| fromRef | Ref to observable | Observable from watch | Convert reactive refs |
| useRxState | BehaviorSubject state | Subject + ref sync | Observable state management |
Example: Vue composables for RxJS
// useObservable.ts
import { ref, onUnmounted, Ref } from 'vue';
import { Observable } from 'rxjs';
export function useObservable<T>(
observable$: Observable<T>,
initialValue?: T
): Ref<T | undefined> {
const value = ref<T | undefined>(initialValue);
const subscription = observable$.subscribe({
next: (v) => { value.value = v; },
error: (err) => console.error('Observable error:', err)
});
onUnmounted(() => {
subscription.unsubscribe();
});
return value;
}
// useSubscription.ts
export function useSubscription() {
const subscriptions: Subscription[] = [];
const add = (subscription: Subscription) => {
subscriptions.push(subscription);
};
onUnmounted(() => {
subscriptions.forEach(sub => sub.unsubscribe());
});
return { add };
}
// Usage in Vue component
export default {
setup() {
const userService = inject('userService');
// Convert observable to ref
const users = useObservable(
userService.getUsers(),
[]
);
const searchTerm = ref('');
// Create observable from ref
const searchResults = useObservable(
computed(() => searchTerm.value).pipe(
debounceTime(300),
switchMap(term => searchAPI(term))
),
[]
);
return {
users,
searchTerm,
searchResults
};
}
};
Example: Vue RxJS state management
// composables/useRxState.ts
export function useRxState<T>(initialValue: T) {
const subject = new BehaviorSubject<T>(initialValue);
const state = ref<T>(initialValue);
const subscription = subject.subscribe(value => {
state.value = value;
});
onUnmounted(() => {
subscription.unsubscribe();
subject.complete();
});
const setState = (value: T | ((prev: T) => T)) => {
const newValue = typeof value === 'function'
? value(subject.value)
: value;
subject.next(newValue);
};
return {
state: readonly(state),
setState,
state$: subject.asObservable()
};
}
// Usage
export default {
setup() {
const { state: todos, setState: setTodos, state$ } = useRxState<Todo[]>([]);
// Derived state
const completedCount = useObservable(
state$.pipe(
map(todos => todos.filter(t => t.completed).length)
),
0
);
const addTodo = (text: string) => {
setTodos(prev => [...prev, {
id: Date.now(),
text,
completed: false
}]);
};
return {
todos,
completedCount,
addTodo
};
}
};
4. Node.js Streams Integration
| Integration | Method | Description | Use Case |
|---|---|---|---|
| Readable to Observable | fromEvent(stream, 'data') |
Convert Node.js readable stream | File reading, HTTP response |
| Observable to Writable | Subscribe and write to stream | Pipe observable to writable stream | File writing, HTTP request |
| Transform Stream | Observable operators as Transform | Use RxJS operators on streams | Data processing pipelines |
| EventEmitter | fromEvent(emitter, 'event') |
Convert EventEmitter to observable | Event-driven architectures |
Example: Node.js stream to observable
import { fromEvent, merge } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';
import * as fs from 'fs';
import * as readline from 'readline';
// Read file line by line with RxJS
function readFileLines(filePath: string): Observable<string> {
return new Observable(subscriber => {
const readStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: readStream,
crlfDelay: Infinity
});
rl.on('line', line => subscriber.next(line));
rl.on('close', () => subscriber.complete());
rl.on('error', err => subscriber.error(err));
return () => {
rl.close();
readStream.destroy();
};
});
}
// Usage: Process large files
readFileLines('./large-file.txt').pipe(
map(line => line.toUpperCase()),
filter(line => line.includes('ERROR')),
take(100)
).subscribe({
next: line => console.log(line),
complete: () => console.log('Done processing')
});
// Convert stream events to observable
const readStream = fs.createReadStream('./data.txt');
const data$ = fromEvent(readStream, 'data');
const end$ = fromEvent(readStream, 'end');
const error$ = fromEvent(readStream, 'error');
data$.pipe(
map((chunk: Buffer) => chunk.toString()),
takeUntil(merge(end$, error$))
).subscribe({
next: chunk => processChunk(chunk),
error: err => console.error('Stream error:', err),
complete: () => console.log('Stream ended')
});
Example: Observable to Node.js stream
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';
import * as fs from 'fs';
// Write observable data to file
function observableToStream<T>(
observable$: Observable<T>,
writeStream: fs.WriteStream
): Promise<void> {
return new Promise((resolve, reject) => {
const subscription = observable$.subscribe({
next: value => {
const canWrite = writeStream.write(JSON.stringify(value) + '\n');
if (!canWrite) {
// Backpressure handling
subscription.unsubscribe();
writeStream.once('drain', () => {
// Resume when buffer drained
});
}
},
error: err => {
writeStream.end();
reject(err);
},
complete: () => {
writeStream.end();
resolve();
}
});
writeStream.on('error', err => {
subscription.unsubscribe();
reject(err);
});
});
}
// Usage: Stream data to file
const writeStream = fs.createWriteStream('./output.jsonl');
const data$ = interval(1000).pipe(
take(100),
map(n => ({ id: n, timestamp: Date.now() }))
);
observableToStream(data$, writeStream)
.then(() => console.log('Write complete'))
.catch(err => console.error('Write error:', err));
5. WebSocket Integration Patterns
| Pattern | Implementation | Description | Feature |
|---|---|---|---|
| webSocket() Function | webSocket(url) |
RxJS built-in WebSocket subject | Bidirectional communication |
| Reconnection | retryWhen() with delay |
Auto-reconnect on disconnect | Connection resilience |
| Message Queue | Buffer messages during disconnect | Queue messages until reconnected | Guaranteed delivery |
| Heartbeat | Periodic ping/pong | Keep connection alive | Connection monitoring |
| multiplexing | Share single WebSocket | Multiple logical channels | Resource efficiency |
Example: Basic WebSocket observable
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
// Create WebSocket observable
const ws$: WebSocketSubject<any> = webSocket({
url: 'ws://localhost:8080',
openObserver: {
next: () => console.log('WebSocket connected')
},
closeObserver: {
next: () => console.log('WebSocket disconnected')
}
});
// Subscribe to messages
ws$.subscribe({
next: msg => console.log('Message:', msg),
error: err => console.error('WebSocket error:', err),
complete: () => console.log('WebSocket completed')
});
// Send messages
ws$.next({ type: 'subscribe', channel: 'updates' });
ws$.next({ type: 'message', data: 'Hello Server!' });
// Unsubscribe closes connection
// ws$.complete();
Example: WebSocket with auto-reconnect
import { webSocket } from 'rxjs/webSocket';
import { retryWhen, delay, tap } from 'rxjs/operators';
function createWebSocketWithReconnect(url: string) {
return webSocket({
url,
openObserver: {
next: () => console.log('Connected to WebSocket')
},
closeObserver: {
next: () => console.log('Disconnected from WebSocket')
}
}).pipe(
retryWhen(errors => errors.pipe(
tap(err => console.log('Connection lost, reconnecting...')),
delay(5000) // Wait 5s before reconnect
))
);
}
// Usage with automatic reconnection
const ws$ = createWebSocketWithReconnect('ws://localhost:8080');
ws$.subscribe({
next: msg => handleMessage(msg),
error: err => console.error('Fatal error:', err)
});
// Advanced: Exponential backoff
function createResilientWebSocket(url: string) {
return webSocket(url).pipe(
retryWhen(errors => errors.pipe(
scan((retryCount, error) => {
if (retryCount >= 10) throw error;
console.log(`Retry ${retryCount + 1}/10`);
return retryCount + 1;
}, 0),
delayWhen(retryCount =>
timer(Math.min(1000 * Math.pow(2, retryCount), 30000))
)
))
);
}
Example: WebSocket with message queue and multiplexing
// WebSocket service with message queue
class WebSocketService {
private ws$: WebSocketSubject<any>;
private connectionState$ = new BehaviorSubject<'connected' | 'disconnected'>('disconnected');
private messageQueue: any[] = [];
constructor(url: string) {
this.ws$ = webSocket({
url,
openObserver: {
next: () => {
this.connectionState$.next('connected');
this.flushQueue();
}
},
closeObserver: {
next: () => this.connectionState$.next('disconnected')
}
});
// Auto-reconnect
this.ws$.pipe(
retryWhen(errors => errors.pipe(delay(5000)))
).subscribe();
}
// Send message (queue if disconnected)
send(message: any) {
if (this.connectionState$.value === 'connected') {
this.ws$.next(message);
} else {
this.messageQueue.push(message);
}
}
// Flush queued messages
private flushQueue() {
while (this.messageQueue.length > 0) {
const msg = this.messageQueue.shift();
this.ws$.next(msg);
}
}
// Subscribe to specific message type
on<T>(messageType: string): Observable<T> {
return this.ws$.pipe(
filter((msg: any) => msg.type === messageType),
map((msg: any) => msg.data as T)
);
}
// Multiplexing - multiple logical channels
channel<T>(channelName: string): Observable<T> {
return this.ws$.pipe(
filter((msg: any) => msg.channel === channelName),
map((msg: any) => msg.data as T)
);
}
}
// Usage
const wsService = new WebSocketService('ws://localhost:8080');
// Subscribe to user updates
wsService.on<User>('user-update').subscribe(
user => console.log('User updated:', user)
);
// Subscribe to notifications
wsService.on<Notification>('notification').subscribe(
notification => showNotification(notification)
);
// Send messages
wsService.send({ type: 'subscribe', channel: 'users' });
wsService.send({ type: 'message', data: 'Hello' });
6. Service Worker and RxJS Integration
| Pattern | Implementation | Description | Use Case |
|---|---|---|---|
| Message Channel | fromEvent on message port | Observable messaging with SW | Background sync, push notifications |
| Cache Strategy | Observable cache operators | Implement cache-first, network-first | Offline-first apps |
| Background Sync | Queue with retry logic | Sync data when online | Offline data persistence |
| Push Notifications | fromEvent on push events | Handle push notifications | Real-time updates |
Example: Service Worker messaging with RxJS
// In main app
class ServiceWorkerService {
private messageSubject = new Subject<any>();
public messages$ = this.messageSubject.asObservable();
constructor() {
if ('serviceWorker' in navigator) {
navigator.serviceWorker.ready.then(registration => {
// Listen for messages from SW
fromEvent<MessageEvent>(navigator.serviceWorker, 'message')
.pipe(
map(event => event.data)
)
.subscribe(message => this.messageSubject.next(message));
});
}
}
// Send message to SW
async postMessage(message: any) {
const registration = await navigator.serviceWorker.ready;
registration.active?.postMessage(message);
}
// Observable for specific message types
on<T>(type: string): Observable<T> {
return this.messages$.pipe(
filter(msg => msg.type === type),
map(msg => msg.data as T)
);
}
}
// Usage
const swService = new ServiceWorkerService();
// Listen for sync complete
swService.on<{ count: number }>('sync-complete').subscribe(
data => console.log(`Synced ${data.count} items`)
);
// Request background sync
swService.postMessage({
type: 'background-sync',
data: { items: [...] }
});
Example: Cache strategies with RxJS
// Offline-first HTTP service
class OfflineHttpService {
constructor(private http: HttpClient) {}
// Cache-first strategy
cacheFirst<T>(url: string): Observable<T> {
return from(caches.match(url)).pipe(
mergeMap(cached => {
if (cached) {
return from(cached.json() as Promise<T>);
}
// Not in cache, fetch from network
return this.http.get<T>(url).pipe(
tap(data => {
// Store in cache
caches.open('api-cache').then(cache => {
cache.put(url, new Response(JSON.stringify(data)));
});
})
);
})
);
}
// Network-first with cache fallback
networkFirst<T>(url: string): Observable<T> {
return this.http.get<T>(url).pipe(
timeout(5000),
tap(data => {
// Update cache
caches.open('api-cache').then(cache => {
cache.put(url, new Response(JSON.stringify(data)));
});
}),
catchError(() =>
// Network failed, try cache
from(caches.match(url)).pipe(
mergeMap(cached => {
if (cached) {
return from(cached.json() as Promise<T>);
}
return throwError(() => new Error('No cached data'));
})
)
)
);
}
// Stale-while-revalidate
staleWhileRevalidate<T>(url: string): Observable<T> {
const cache$ = from(caches.match(url)).pipe(
filter(response => !!response),
mergeMap(response => from(response!.json() as Promise<T>))
);
const network$ = this.http.get<T>(url).pipe(
tap(data => {
caches.open('api-cache').then(cache => {
cache.put(url, new Response(JSON.stringify(data)));
});
})
);
// Return cache immediately, then network
return concat(cache$, network$).pipe(
distinctUntilChanged()
);
}
}
// Usage
const offlineHttp = new OfflineHttpService(http);
// Get data with cache-first
offlineHttp.cacheFirst<User[]>('/api/users')
.subscribe(users => displayUsers(users));
Framework Integration Best Practices:
- Angular: Use async pipe for automatic subscription management
- React: Create custom hooks for observable integration and cleanup
- Vue: Use composables with onUnmounted for proper cleanup
- Node.js: Handle backpressure when converting streams
- WebSocket: Implement reconnection logic and message queuing
- Service Workers: Use RxJS for offline-first patterns
Section 16 Summary
- Angular async pipe provides automatic subscription lifecycle management
- React hooks require custom implementation for observable integration and cleanup
- Vue composables with onUnmounted enable reactive observable patterns
- Node.js streams convert to/from observables for unified data processing
- WebSocket subjects enable bidirectional communication with retry logic
- Service Workers with RxJS implement sophisticated offline-first strategies