Data Flow Architecture Implementation
1. Flux Redux Unidirectional Flow
| Concept | Component | Description | Role |
|---|---|---|---|
| Action | { type: 'ADD', payload: data } | Plain object describing state change intent | Event description |
| Dispatcher | dispatch(action) | Central hub distributing actions to stores | Action distribution |
| Store | function reducer(state, action) | Holds application state, responds to actions | State container |
| View | useSelector(state => state.data) | React component subscribing to store updates | UI rendering |
| Middleware | const logger = store => next => action => {} | Intercepts actions for logging, async, etc. | Side effect handling |
Example: Complete Flux/Redux unidirectional data flow
// Actions - describe what happened
const ActionTypes = {
ADD_TODO: 'ADD_TODO',
TOGGLE_TODO: 'TOGGLE_TODO',
SET_FILTER: 'SET_FILTER',
};
const addTodo = (text) => ({
type: ActionTypes.ADD_TODO,
payload: { id: Date.now(), text, completed: false },
});
const toggleTodo = (id) => ({
type: ActionTypes.TOGGLE_TODO,
payload: { id },
});
// Reducer - specifies how state changes
function todosReducer(state = [], action) {
switch (action.type) {
case ActionTypes.ADD_TODO:
return [...state, action.payload];
case ActionTypes.TOGGLE_TODO:
return state.map(todo =>
todo.id === action.payload.id
? { ...todo, completed: !todo.completed }
: todo
);
default:
return state;
}
}
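// Filter reducer - minimal placeholder so rootReducer is complete
// (assumes SET_FILTER carries the filter name as its payload)
function filterReducer(state = 'all', action) {
return action.type === ActionTypes.SET_FILTER ? action.payload : state;
}
// Store - holds state, provides dispatch
// (createStore still works but is marked deprecated since Redux 4.2; Redux Toolkit's configureStore is the recommended replacement)
import { createStore, combineReducers } from 'redux';
const rootReducer = combineReducers({
todos: todosReducer,
filter: filterReducer,
});
const store = createStore(rootReducer);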
// View - subscribes and dispatches
import { useSelector, useDispatch } from 'react-redux';
function TodoList() {
const todos = useSelector(state => state.todos);
const dispatch = useDispatch();
return (
<div>
{todos.map(todo => (
<div key={todo.id}>
<input
type="checkbox"
checked={todo.completed}
onChange={() => dispatch(toggleTodo(todo.id))}
/>
{todo.text}
</div>
))}
<button onClick={() => dispatch(addTodo('New Todo'))}>
Add Todo
</button>
</div>
);
}
// Data Flow:
// 1. User clicks button → dispatch(action)
// 2. Action sent to reducer
// 3. Reducer creates new state
// 4. Store notifies subscribers
// 5. Component re-renders with new state
Note: Unidirectional flow keeps state changes predictable: state can only be modified by dispatching actions, which makes debugging easier with Redux DevTools.
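The middleware signature listed in the table (`store => next => action => {}`) is not exercised in the example above. The following is a minimal sketch of how such a chain wraps `dispatch`. Note that `createMiniStore` and `counterReducer` are hypothetical stand-ins written for this illustration, not the Redux API; in a real application the middleware would be registered through Redux's `applyMiddleware`.

```javascript
// Hypothetical miniature store, written only to show how middleware wraps dispatch.
function createMiniStore(reducer, middlewares = []) {
  let state = reducer(undefined, { type: '@@INIT' });
  const listeners = new Set();

  // Base dispatch: run the reducer, then notify subscribers.
  const baseDispatch = (action) => {
    state = reducer(state, action);
    listeners.forEach((listener) => listener());
    return action;
  };

  const store = {
    getState: () => state,
    subscribe: (listener) => {
      listeners.add(listener);
      return () => listeners.delete(listener);
    },
    // Always routes through the full middleware chain built below.
    dispatch: (action) => dispatch(action),
  };

  // Compose right-to-left so the first middleware in the list sees the action first.
  const dispatch = middlewares
    .map((middleware) => middleware(store))
    .reduceRight((next, middleware) => middleware(next), baseDispatch);

  return store;
}

// Logger middleware: records the action before the reducer runs and the state after.
const log = [];
const logger = (store) => (next) => (action) => {
  log.push(`dispatching ${action.type}`);
  const result = next(action);
  log.push(`next state ${JSON.stringify(store.getState())}`);
  return result;
};

// Hypothetical reducer used only for this demonstration.
function counterReducer(state = 0, action) {
  return action.type === 'INCREMENT' ? state + 1 : state;
}

const store = createMiniStore(counterReducer, [logger]);
store.dispatch({ type: 'INCREMENT' });
console.log(store.getState(), log);
```

Because each middleware receives `next`, it can run code on both sides of the reducer call, which is what makes logging and async handling possible without touching the reducer itself.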
2. GraphQL Apollo Client Normalization
| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| ApolloClient | new ApolloClient({ uri, cache }) | GraphQL client with intelligent caching | Client initialization |
| InMemoryCache | new InMemoryCache({ typePolicies }) | Normalized cache storing entities by ID | Cache configuration |
| useQuery | const { data, loading } = useQuery(QUERY) | Fetches data with automatic caching and updates | Data fetching |
| useMutation | const [mutate] = useMutation(MUTATION) | Executes mutations with cache updates | Data modification |
| cache.writeQuery | cache.writeQuery({ query, data }) | Manually updates cache with new data | Optimistic updates |
| refetchQueries | { refetchQueries: [{ query: QUERY }] } | Refreshes queries after mutation | Cache invalidation |
| @client directive | field @client | Local-only fields not fetched from server | Client-side state |
Example: Apollo Client with normalized cache and optimistic updates
// Apollo Client setup
import { ApolloClient, InMemoryCache, gql } from '@apollo/client';
const client = new ApolloClient({
uri: 'https://api.example.com/graphql',
cache: new InMemoryCache({
typePolicies: {
Query: {
fields: {
posts: {
merge(existing = [], incoming) {
return [...existing, ...incoming];
},
},
},
},
User: {
fields: {
fullName: {
read(_, { readField }) {
return `${readField('firstName')} ${readField('lastName')}`;
},
},
},
},
},
}),
});
// GraphQL queries and mutations
const GET_USERS = gql`
query GetUsers {
users {
id
name
email
}
}
`;
const CREATE_USER = gql`
mutation CreateUser($input: UserInput!) {
createUser(input: $input) {
id
name
email
}
}
`;
// Component with query
import { useQuery, useMutation } from '@apollo/client';
function UserList() {
const { data, loading, error } = useQuery(GET_USERS, {
pollInterval: 30000, // Refetch every 30s
fetchPolicy: 'cache-and-network',
});
const [createUser] = useMutation(CREATE_USER, {
// Optimistic response
optimisticResponse: {
createUser: {
__typename: 'User',
id: 'temp-id',
name: 'New User',
email: 'temp@example.com',
},
},
// Update cache after mutation
update(cache, { data: { createUser } }) {
// readQuery returns null when GET_USERS is not in the cache yet
const existing = cache.readQuery({ query: GET_USERS });
cache.writeQuery({
query: GET_USERS,
data: {
users: [...(existing?.users ?? []), createUser],
},
});
},
});
if (loading) return <div>Loading...</div>;
if (error) return <div>Error: {error.message}</div>;
return (
<div>
{data.users.map(user => (
<div key={user.id}>{user.name} - {user.email}</div>
))}
<button onClick={() => createUser({
variables: { input: { name: 'John', email: 'john@example.com' } }
})}>
Add User
</button>
</div>
);
}
// Cache normalization example
// Apollo automatically normalizes (objects need __typename and id):
// { users: [{ __typename: 'User', id: 1, name: 'John' }, { __typename: 'User', id: 2, name: 'Jane' }] }
// Into:
// {
// ROOT_QUERY: { users: [{ __ref: 'User:1' }, { __ref: 'User:2' }] },
// 'User:1': { __typename: 'User', id: 1, name: 'John' },
// 'User:2': { __typename: 'User', id: 2, name: 'Jane' }
// }
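To make the normalization step concrete, here is a rough sketch of the idea in plain JavaScript. It approximates the concept only and is not Apollo's actual implementation; the `normalize` helper and the `entities` map are hypothetical names introduced for this illustration.

```javascript
// Hypothetical helper: flattens objects carrying __typename and id into an entity map
// and replaces them in the tree with { __ref } pointers.
function normalize(value, entities = {}) {
  if (Array.isArray(value)) {
    return value.map((item) => normalize(item, entities));
  }
  if (value === null || typeof value !== 'object') {
    return value; // scalars are stored as-is
  }
  const fields = {};
  for (const [key, child] of Object.entries(value)) {
    fields[key] = normalize(child, entities);
  }
  if (value.__typename && value.id != null) {
    const cacheId = `${value.__typename}:${value.id}`;
    // Merge so the same entity seen in two queries shares one record.
    entities[cacheId] = { ...entities[cacheId], ...fields };
    return { __ref: cacheId };
  }
  return fields;
}

const entities = {};
const root = normalize(
  {
    users: [
      { __typename: 'User', id: 1, name: 'John' },
      { __typename: 'User', id: 2, name: 'Jane' },
    ],
  },
  entities
);
console.log(root, entities);
```

Since every query result points at the same `User:1` record, updating that record once is enough for all queries that reference it to see the change.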
3. React Query Optimistic Updates
| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| useQuery | useQuery({ queryKey, queryFn }) | Declarative data fetching with caching | Server state |
| useMutation | useMutation({ mutationFn, onMutate }) | Data mutations with rollback support | Data updates |
| queryClient.setQueryData | queryClient.setQueryData(key, data) | Manually updates cached query data | Optimistic updates |
| onMutate | onMutate: async (newData) => {} | Fires before mutation, returns rollback context | Optimistic UI |
| onError | onError: (err, vars, context) => {} | Handles mutation errors, rolls back changes | Error recovery |
| invalidateQueries | queryClient.invalidateQueries({ queryKey: ['key'] }) | Marks queries as stale, triggers refetch | Cache invalidation |
Example: React Query with optimistic updates and rollback
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
function TodoList() {
const queryClient = useQueryClient();
// Fetch todos
const { data: todos } = useQuery({
queryKey: ['todos'],
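queryFn: async () => {
const res = await fetch('/api/todos');
// fetch only rejects on network failure, so surface HTTP errors explicitly
if (!res.ok) throw new Error(`Failed to load todos: ${res.status}`);
return res.json();
},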
});
// Mutation with optimistic update
const updateTodoMutation = useMutation({
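mutationFn: async (updatedTodo) => {
const res = await fetch(`/api/todos/${updatedTodo.id}`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(updatedTodo),
});
// Throw on HTTP errors so onError runs and the optimistic update is rolled back
if (!res.ok) throw new Error(`Update failed: ${res.status}`);
return res.json();
},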
// Optimistic update
onMutate: async (updatedTodo) => {
// Cancel outgoing refetches
await queryClient.cancelQueries({ queryKey: ['todos'] });
// Snapshot current value
const previousTodos = queryClient.getQueryData(['todos']);
// Optimistically update cache
// old is undefined if the query has not been fetched yet
queryClient.setQueryData(['todos'], (old) =>
old?.map((todo) =>
todo.id === updatedTodo.id ? updatedTodo : todo
)
);
// Return context with snapshot
return { previousTodos };
},
// Rollback on error
onError: (err, updatedTodo, context) => {
queryClient.setQueryData(['todos'], context.previousTodos);
console.error('Update failed, rolled back:', err);
},
// Refetch after success
onSettled: () => {
queryClient.invalidateQueries({ queryKey: ['todos'] });
},
});
const deleteTodoMutation = useMutation({
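mutationFn: async (id) => {
const res = await fetch(`/api/todos/${id}`, { method: 'DELETE' });
// Throw on HTTP errors so onError can restore the removed item
if (!res.ok) throw new Error(`Delete failed: ${res.status}`);
},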
onMutate: async (deletedId) => {
await queryClient.cancelQueries({ queryKey: ['todos'] });
const previousTodos = queryClient.getQueryData(['todos']);
// Optimistically remove from UI
queryClient.setQueryData(['todos'], (old) =>
old?.filter((todo) => todo.id !== deletedId)
);
return { previousTodos };
},
onError: (err, deletedId, context) => {
queryClient.setQueryData(['todos'], context.previousTodos);
},
onSettled: () => {
queryClient.invalidateQueries({ queryKey: ['todos'] });
},
});
const handleToggle = (todo) => {
updateTodoMutation.mutate({
...todo,
completed: !todo.completed,
});
};
return (
<div>
{todos?.map((todo) => (
<div key={todo.id}>
<input
type="checkbox"
checked={todo.completed}
onChange={() => handleToggle(todo)}
/>
{todo.text}
<button onClick={() => deleteTodoMutation.mutate(todo.id)}>
Delete
</button>
</div>
))}
</div>
);
}
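Stripped of the library, the optimistic-update flow used above is a snapshot, an immediate write, and a restore on failure. The sketch below shows that sequence in plain JavaScript. The `cache` map, `optimisticUpdate`, and `toggleFirst` are hypothetical names for illustration only and are not part of React Query.

```javascript
// Hypothetical in-memory cache standing in for the query cache.
const cache = new Map([
  ['todos', [{ id: 1, text: 'Write docs', completed: false }]],
]);

// Applies a change immediately, then rolls back if the server call rejects.
// Resolves to true when the mutation succeeded and false when it was rolled back.
async function optimisticUpdate(key, applyChange, sendToServer) {
  const snapshot = cache.get(key); // 1. snapshot the current value
  cache.set(key, applyChange(snapshot)); // 2. update the cache optimistically
  try {
    await sendToServer(); // 3. perform the real mutation
    return true;
  } catch (err) {
    cache.set(key, snapshot); // 4. restore the snapshot on failure
    return false;
  }
}

// Example change: toggle the todo with id 1 without mutating the old array.
const toggleFirst = (todos) =>
  todos.map((todo) =>
    todo.id === 1 ? { ...todo, completed: !todo.completed } : todo
  );
```

Calling `optimisticUpdate('todos', toggleFirst, request)` with a `request` function that rejects leaves the cache as it was before the call, which mirrors what `onMutate` and `onError` do together in the example above.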
4. WebSocket Real-time Data Sync
| Feature | API/Method | Description | Use Case |
|---|---|---|---|
| WebSocket | new WebSocket('ws://url') | Creates persistent bidirectional connection | Real-time communication |
| onopen | ws.onopen = () => {} | Fires when connection established | Connection ready |
| onmessage | ws.onmessage = (event) => {} | Receives messages from server | Incoming data |
| send | ws.send(JSON.stringify(data)) | Sends data to server | Outgoing messages |
| onerror | ws.onerror = (error) => {} | Handles connection errors | Error handling |
| onclose | ws.onclose = () => {} | Fires when connection closes | Cleanup, reconnection |
| Socket.io | io('http://url') | Library with auto-reconnection, rooms, namespaces | Enhanced WebSocket |
Example: WebSocket with React hook and reconnection logic
import { useEffect, useState, useRef } from 'react';
// Custom WebSocket hook
function useWebSocket(url) {
const [messages, setMessages] = useState([]);
const [connectionStatus, setConnectionStatus] = useState('disconnected');
const ws = useRef(null);
const reconnectTimeoutRef = useRef(null);
const connect = () => {
ws.current = new WebSocket(url);
ws.current.onopen = () => {
console.log('Connected');
setConnectionStatus('connected');
};
ws.current.onmessage = (event) => {
const data = JSON.parse(event.data);
setMessages((prev) => [...prev, data]);
};
ws.current.onerror = (error) => {
console.error('WebSocket error:', error);
setConnectionStatus('error');
};
ws.current.onclose = () => {
console.log('Disconnected');
setConnectionStatus('disconnected');
// Auto-reconnect after 3 seconds
reconnectTimeoutRef.current = setTimeout(() => {
console.log('Reconnecting...');
connect();
}, 3000);
};
};
useEffect(() => {
connect();
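return () => {
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
}
if (ws.current) {
// Detach onclose first; otherwise closing here would schedule another reconnect
ws.current.onclose = null;
ws.current.close();
}
};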
}, [url]);
const sendMessage = (message) => {
if (ws.current?.readyState === WebSocket.OPEN) {
ws.current.send(JSON.stringify(message));
}
};
return { messages, sendMessage, connectionStatus };
}
// Component using WebSocket
function ChatRoom() {
const { messages, sendMessage, connectionStatus } = useWebSocket(
'ws://localhost:8080'
);
const [input, setInput] = useState('');
const handleSend = () => {
sendMessage({ type: 'chat', text: input, timestamp: Date.now() });
setInput('');
};
return (
<div>
<div>Status: {connectionStatus}</div>
<div>
{messages.map((msg, i) => (
<div key={i}>{msg.text}</div>
))}
</div>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && handleSend()}
/>
<button onClick={handleSend}>Send</button>
</div>
);
}
// Socket.io alternative
import io from 'socket.io-client';
function useSocketIO(url) {
const [socket, setSocket] = useState(null);
useEffect(() => {
const newSocket = io(url, {
reconnection: true,
reconnectionDelay: 1000,
reconnectionAttempts: 5,
});
newSocket.on('connect', () => {
console.log('Socket.io connected');
});
setSocket(newSocket);
return () => newSocket.close();
}, [url]);
return socket;
}
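The hook above retries at a fixed 3-second interval. A common refinement is to lengthen the delay after each failed attempt so a struggling server is not hit repeatedly. The following sketch computes such a delay; `backoffDelay` and its options (`baseMs`, `maxMs`, `jitter`) are hypothetical names and defaults chosen for illustration.

```javascript
// Hypothetical helper: delay in milliseconds before reconnect attempt `attempt` (0-based).
function backoffDelay(attempt, { baseMs = 1000, maxMs = 30000, jitter = 0 } = {}) {
  // Double the delay on each attempt, capped at maxMs.
  const exponential = Math.min(maxMs, baseMs * 2 ** attempt);
  // Optional jitter spreads clients out; jitter = 0.2 means up to 20% above or below.
  const spread = exponential * jitter * (Math.random() * 2 - 1);
  return Math.round(exponential + spread);
}

console.log([0, 1, 2, 3].map((attempt) => backoffDelay(attempt)));
```

One way to use it in the hook would be to keep an attempt counter in a ref, pass it to `backoffDelay` inside `onclose`, and reset it to zero in `onopen`. That counter is an addition and not part of the original code.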
5. IndexedDB Offline Data Persistence
| Feature | API | Description | Use Case |
|---|---|---|---|
| open | indexedDB.open(name, version) | Opens database, creates it if it doesn't exist | DB initialization |
| createObjectStore | db.createObjectStore(name, { keyPath }) | Creates table-like storage space | Schema definition |
| transaction | db.transaction([store], 'readwrite') | Groups operations with ACID properties | Data operations |
| add | objectStore.add(data) | Inserts new record, fails if key exists | Insert operation |
| put | objectStore.put(data) | Updates or inserts record | Upsert operation |
| get | objectStore.get(key) | Retrieves single record by key | Read operation |
| getAll | objectStore.getAll() | Retrieves all records | Bulk read |
| Index | objectStore.createIndex(name, keyPath) | Creates searchable index on field | Query optimization |
Example: IndexedDB wrapper with React hook
// IndexedDB helper class
class DatabaseManager {
constructor(dbName, version = 1) {
this.dbName = dbName;
this.version = version;
this.db = null;
}
async open() {
return new Promise((resolve, reject) => {
const request = indexedDB.open(this.dbName, this.version);
request.onerror = () => reject(request.error);
request.onsuccess = () => {
this.db = request.result;
resolve(this.db);
};
request.onupgradeneeded = (event) => {
const db = event.target.result;
// Create object stores
if (!db.objectStoreNames.contains('todos')) {
const todoStore = db.createObjectStore('todos', {
keyPath: 'id',
autoIncrement: true
});
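// Caution: booleans are not valid IndexedDB keys, so records whose 'completed' is true/false are left out of this index; store 0/1 instead if you need to query by it
todoStore.createIndex('completed', 'completed', { unique: false });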
todoStore.createIndex('createdAt', 'createdAt', { unique: false });
}
};
});
}
async add(storeName, data) {
const transaction = this.db.transaction([storeName], 'readwrite');
const store = transaction.objectStore(storeName);
return new Promise((resolve, reject) => {
const request = store.add(data);
request.onsuccess = () => resolve(request.result);
request.onerror = () => reject(request.error);
});
}
async getAll(storeName) {
const transaction = this.db.transaction([storeName], 'readonly');
const store = transaction.objectStore(storeName);
return new Promise((resolve, reject) => {
const request = store.getAll();
request.onsuccess = () => resolve(request.result);
request.onerror = () => reject(request.error);
});
}
async update(storeName, data) {
const transaction = this.db.transaction([storeName], 'readwrite');
const store = transaction.objectStore(storeName);
return new Promise((resolve, reject) => {
const request = store.put(data);
request.onsuccess = () => resolve(request.result);
request.onerror = () => reject(request.error);
});
}
async delete(storeName, key) {
const transaction = this.db.transaction([storeName], 'readwrite');
const store = transaction.objectStore(storeName);
return new Promise((resolve, reject) => {
const request = store.delete(key);
request.onsuccess = () => resolve();
request.onerror = () => reject(request.error);
});
}
}
// React hook for IndexedDB
import { useEffect, useState } from 'react';
function useIndexedDB(dbName, storeName) {
const [db, setDb] = useState(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
const initDB = async () => {
const manager = new DatabaseManager(dbName);
await manager.open();
setDb(manager);
setLoading(false);
};
initDB();
}, [dbName]);
const addItem = async (data) => {
if (!db) return;
return db.add(storeName, data);
};
const getAllItems = async () => {
if (!db) return [];
return db.getAll(storeName);
};
const updateItem = async (data) => {
if (!db) return;
return db.update(storeName, data);
};
const deleteItem = async (key) => {
if (!db) return;
return db.delete(storeName, key);
};
return { addItem, getAllItems, updateItem, deleteItem, loading };
}
// Component using IndexedDB
function TodoApp() {
const [todos, setTodos] = useState([]);
const { addItem, getAllItems, updateItem, deleteItem, loading } =
useIndexedDB('TodoDB', 'todos');
useEffect(() => {
if (!loading) {
loadTodos();
}
}, [loading]);
const loadTodos = async () => {
const items = await getAllItems();
setTodos(items);
};
const handleAdd = async (text) => {
const newTodo = {
text,
completed: false,
createdAt: Date.now(),
};
await addItem(newTodo);
loadTodos();
};
const handleToggle = async (todo) => {
await updateItem({ ...todo, completed: !todo.completed });
loadTodos();
};
if (loading) return <div>Loading...</div>;
return (
<div>
{todos.map(todo => (
<div key={todo.id}>
<input
type="checkbox"
checked={todo.completed}
onChange={() => handleToggle(todo)}
/>
{todo.text}
</div>
))}
</div>
);
}
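The example creates a `createdAt` index but never reads through it. As a sketch of how an index can narrow a query, the function below fetches only todos created at or after a given timestamp. `getTodosCreatedSince` is a hypothetical helper; it assumes `db` is an open `IDBDatabase` with the `todos` store and `createdAt` index defined in `onupgradeneeded` above, and it relies on the browser globals `IDBKeyRange` and IndexedDB.

```javascript
// Hypothetical helper: reads todos with createdAt >= since via the 'createdAt' index.
// Assumes `db` is an open IDBDatabase (for example, the `db` property of DatabaseManager).
function getTodosCreatedSince(db, since) {
  return new Promise((resolve, reject) => {
    const store = db.transaction(['todos'], 'readonly').objectStore('todos');
    // lowerBound(since) matches every key greater than or equal to `since`.
    const range = IDBKeyRange.lowerBound(since);
    const request = store.index('createdAt').getAll(range);
    request.onsuccess = () => resolve(request.result);
    request.onerror = () => reject(request.error);
  });
}
```

Results from an index query come back ordered by the indexed key, so this also returns the todos sorted by creation time without an extra sort step.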
6. Observer Pattern Event Emitters
| Pattern | Implementation | Description | Use Case |
|---|---|---|---|
| EventEmitter | class EventEmitter { on, emit, off } | Pub/sub pattern for decoupled communication | Event-driven architecture |
| on/subscribe | emitter.on('event', callback) | Registers listener for specific event | Event subscription |
| emit/publish | emitter.emit('event', data) | Triggers event, notifies all listeners | Event broadcasting |
| off/unsubscribe | emitter.off('event', callback) | Removes specific event listener | Cleanup |
| once | emitter.once('event', callback) | Listener fires only the first time | One-time events |
| Custom Events | new CustomEvent('type', { detail }) | Native browser event system | DOM-based communication |
Example: Event Emitter implementation with React integration
// EventEmitter class
class EventEmitter {
constructor() {
this.events = {};
}
on(event, listener) {
if (!this.events[event]) {
this.events[event] = [];
}
this.events[event].push(listener);
// Return unsubscribe function
return () => this.off(event, listener);
}
once(event, listener) {
const onceWrapper = (...args) => {
// Unsubscribe before calling so the listener is removed even if it throws
this.off(event, onceWrapper);
listener(...args);
};
return this.on(event, onceWrapper);
}
emit(event, ...args) {
if (!this.events[event]) return;
this.events[event].forEach(listener => listener(...args));
}
off(event, listenerToRemove) {
if (!this.events[event]) return;
this.events[event] = this.events[event].filter(
listener => listener !== listenerToRemove
);
}
removeAllListeners(event) {
if (event) {
delete this.events[event];
} else {
this.events = {};
}
}
}
// Global event bus instance
const eventBus = new EventEmitter();
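// React hook for event subscriptions
import { useEffect, useRef, useState } from 'react';
function useEventListener(event, handler) {
// Keep the latest handler in a ref so inline callbacks do not force a resubscribe on every render
const handlerRef = useRef(handler);
useEffect(() => {
handlerRef.current = handler;
});
useEffect(() => {
const unsubscribe = eventBus.on(event, (...args) => handlerRef.current(...args));
return unsubscribe; // Cleanup on unmount
}, [event]);
}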
// Components using event bus
function UserProfile() {
const [user, setUser] = useState(null);
// Subscribe to user updates
useEventListener('user:updated', (updatedUser) => {
setUser(updatedUser);
});
return <div>{user?.name}</div>;
}
function UpdateButton() {
const handleUpdate = () => {
const newUser = { id: 1, name: 'John Updated' };
// Emit event to all subscribers
eventBus.emit('user:updated', newUser);
};
return <button onClick={handleUpdate}>Update User</button>;
}
// Custom DOM Events
function NotificationSystem() {
useEffect(() => {
const handleNotification = (event) => {
console.log('Notification:', event.detail);
};
window.addEventListener('app:notification', handleNotification);
return () => window.removeEventListener('app:notification', handleNotification);
}, []);
return <div>Notification System</div>;
}
function TriggerNotification() {
const notify = () => {
const event = new CustomEvent('app:notification', {
detail: {
message: 'Hello World',
type: 'info',
timestamp: Date.now(),
},
});
window.dispatchEvent(event);
};
return <button onClick={notify}>Send Notification</button>;
}
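// Advanced: Typed EventEmitter with TypeScript
// User shape assumed from the emit call at the end of this example
interface User {
id: number;
name: string;
}
interface Events {
'user:login': (user: User) => void;
'user:logout': () => void;
'notification': (message: string) => void;
}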
class TypedEventEmitter<T extends Record<string, (...args: any[]) => void>> {
private events = new Map<keyof T, Set<T[keyof T]>>();
on<K extends keyof T>(event: K, listener: T[K]): () => void {
if (!this.events.has(event)) {
this.events.set(event, new Set());
}
this.events.get(event)!.add(listener);
return () => this.off(event, listener);
}
emit<K extends keyof T>(event: K, ...args: Parameters<T[K]>): void {
this.events.get(event)?.forEach(listener => listener(...args));
}
off<K extends keyof T>(event: K, listener: T[K]): void {
this.events.get(event)?.delete(listener);
}
}
const typedEventBus = new TypedEventEmitter<Events>();
typedEventBus.on('user:login', (user) => console.log(user));
typedEventBus.emit('user:login', { id: 1, name: 'John' });
Data Flow Architecture Patterns
- Flux/Redux - Predictable unidirectional flow, ideal for complex state management with time-travel debugging
- GraphQL - Normalized cache eliminates data duplication, automatic updates across queries
- React Query - Optimistic updates provide instant feedback, with automatic rollback on errors
- WebSocket - Real-time bidirectional communication, essential for chat, live updates, collaborative editing
- IndexedDB - Offline-first architecture with large storage capacity (the quota is browser-dependent but typically well beyond localStorage limits), transactional operations
- Observer Pattern - Decoupled component communication, reduces prop drilling, enables event-driven architecture