Data Flow Architecture Implementation

1. Flux/Redux Unidirectional Flow

| Concept | Syntax | Description | Role |
|---|---|---|---|
| Action | { type: 'ADD', payload: data } | Plain object describing a state change intent | Event description |
| Dispatcher | dispatch(action) | Central hub distributing actions to stores | Action distribution |
| Store | function reducer(state, action) | Holds application state, responds to actions | State container |
| View | useSelector(state => state.data) | React component subscribing to store updates | UI rendering |
| Middleware | const logger = store => next => action => {} | Intercepts actions for logging, async work, etc. (see sketch below) | Side effect handling |

Example: Complete Flux/Redux unidirectional data flow

// Actions - describe what happened
const ActionTypes = {
  ADD_TODO: 'ADD_TODO',
  TOGGLE_TODO: 'TOGGLE_TODO',
  SET_FILTER: 'SET_FILTER',
};

const addTodo = (text) => ({
  type: ActionTypes.ADD_TODO,
  payload: { id: Date.now(), text, completed: false },
});

const toggleTodo = (id) => ({
  type: ActionTypes.TOGGLE_TODO,
  payload: { id },
});

// Reducer - specifies how state changes
function todosReducer(state = [], action) {
  switch (action.type) {
    case ActionTypes.ADD_TODO:
      return [...state, action.payload];
    
    case ActionTypes.TOGGLE_TODO:
      return state.map(todo =>
        todo.id === action.payload.id
          ? { ...todo, completed: !todo.completed }
          : todo
      );
    
    default:
      return state;
  }
}

// Store - holds state, provides dispatch
import { createStore, combineReducers } from 'redux';

// Second slice reducer so combineReducers has a filter to manage
function filterReducer(state = 'ALL', action) {
  switch (action.type) {
    case ActionTypes.SET_FILTER:
      return action.payload;
    default:
      return state;
  }
}

const rootReducer = combineReducers({
  todos: todosReducer,
  filter: filterReducer,
});

const store = createStore(rootReducer);

// View - subscribes and dispatches
import { useSelector, useDispatch } from 'react-redux';

function TodoList() {
  const todos = useSelector(state => state.todos);
  const dispatch = useDispatch();

  return (
    <div>
      {todos.map(todo => (
        <div key={todo.id}>
          <input
            type="checkbox"
            checked={todo.completed}
            onChange={() => dispatch(toggleTodo(todo.id))}
          />
          {todo.text}
        </div>
      ))}
      <button onClick={() => dispatch(addTodo('New Todo'))}>
        Add Todo
      </button>
    </div>
  );
}

// Data Flow:
// 1. User clicks button → dispatch(action)
// 2. Action sent to reducer
// 3. Reducer creates new state
// 4. Store notifies subscribers
// 5. Component re-renders with new state
Note: Unidirectional flow keeps state changes predictable: state can only be modified by dispatching actions, which makes debugging with Redux DevTools (action log, time-travel) straightforward.
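
A minimal sketch of the middleware row from the table, wired in with Redux's applyMiddleware and reusing the rootReducer defined above (modern Redux would typically do the same via Redux Toolkit's configureStore):

// Middleware - intercepts every action between dispatch and the reducer
import { applyMiddleware } from 'redux';

const logger = (store) => (next) => (action) => {
  console.log('dispatching', action.type);
  const result = next(action); // pass the action along to the reducer
  console.log('next state', store.getState());
  return result;
};

const storeWithLogging = createStore(rootReducer, applyMiddleware(logger));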

2. GraphQL Apollo Client Normalization

| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| ApolloClient | new ApolloClient({ uri, cache }) | GraphQL client with intelligent caching | Client initialization |
| InMemoryCache | new InMemoryCache({ typePolicies }) | Normalized cache storing entities by ID | Cache configuration |
| useQuery | const { data, loading } = useQuery(QUERY) | Fetches data with automatic caching and updates | Data fetching |
| useMutation | const [mutate] = useMutation(MUTATION) | Executes mutations with cache updates | Data modification |
| cache.writeQuery | cache.writeQuery({ query, data }) | Manually updates the cache with new data | Optimistic updates |
| refetchQueries | { refetchQueries: [{ query: QUERY }] } | Refreshes queries after a mutation | Cache invalidation |
| @client directive | field @client | Local-only fields not fetched from the server (see sketch at the end of this section) | Client-side state |

Example: Apollo Client with normalized cache and optimistic updates

// Apollo Client setup
import { ApolloClient, InMemoryCache, gql } from '@apollo/client';

const client = new ApolloClient({
  uri: 'https://api.example.com/graphql',
  cache: new InMemoryCache({
    typePolicies: {
      Query: {
        fields: {
          posts: {
            merge(existing = [], incoming) {
              return [...existing, ...incoming];
            },
          },
        },
      },
      User: {
        fields: {
          fullName: {
            read(_, { readField }) {
              return `${readField('firstName')} ${readField('lastName')}`;
            },
          },
        },
      },
    },
  }),
});

// GraphQL queries and mutations
const GET_USERS = gql`
  query GetUsers {
    users {
      id
      name
      email
    }
  }
`;

const CREATE_USER = gql`
  mutation CreateUser($input: UserInput!) {
    createUser(input: $input) {
      id
      name
      email
    }
  }
`;

// Component with query
import { useQuery, useMutation } from '@apollo/client';

function UserList() {
  const { data, loading, error } = useQuery(GET_USERS, {
    pollInterval: 30000, // Refetch every 30s
    fetchPolicy: 'cache-and-network',
  });

  const [createUser] = useMutation(CREATE_USER, {
    // Optimistic response
    optimisticResponse: {
      createUser: {
        __typename: 'User',
        id: 'temp-id',
        name: 'New User',
        email: 'temp@example.com',
      },
    },
    // Update cache after mutation
    update(cache, { data: { createUser } }) {
      const existing = cache.readQuery({ query: GET_USERS });
      if (!existing) return; // query not in the cache yet, nothing to merge into
      cache.writeQuery({
        query: GET_USERS,
        data: {
          users: [...existing.users, createUser],
        },
      });
    },
  });

  if (loading) return <div>Loading...</div>;
  if (error) return <div>Error: {error.message}</div>;

  return (
    <div>
      {data.users.map(user => (
        <div key={user.id}>{user.name} - {user.email}</div>
      ))}
      <button onClick={() => createUser({ 
        variables: { input: { name: 'John', email: 'john@example.com' } } 
      })}>
        Add User
      </button>
    </div>
  );
}

// Cache normalization example
// Apollo automatically normalizes:
// { users: [{ id: 1, name: 'John' }, { id: 2, name: 'Jane' }] }
// Into:
// {
//   ROOT_QUERY: { users: [ref('User:1'), ref('User:2')] },
//   'User:1': { id: 1, name: 'John' },
//   'User:2': { id: 2, name: 'Jane' }
// }

3. React Query Optimistic Updates

| Feature | Syntax | Description | Use Case |
|---|---|---|---|
| useQuery | useQuery({ queryKey, queryFn }) | Declarative data fetching with caching | Server state |
| useMutation | useMutation({ mutationFn, onMutate }) | Data mutations with rollback support | Data updates |
| queryClient.setQueryData | queryClient.setQueryData(key, data) | Manually updates cached query data | Optimistic updates |
| onMutate | onMutate: async (newData) => {} | Fires before the mutation, returns rollback context | Optimistic UI |
| onError | onError: (err, vars, context) => {} | Handles mutation errors, rolls back changes | Error recovery |
| invalidateQueries | queryClient.invalidateQueries({ queryKey: ['key'] }) | Marks queries as stale, triggers refetch | Cache invalidation |

Example: React Query with optimistic updates and rollback

import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';

function TodoList() {
  const queryClient = useQueryClient();

  // Fetch todos
  const { data: todos } = useQuery({
    queryKey: ['todos'],
    queryFn: async () => {
      const res = await fetch('/api/todos');
      return res.json();
    },
  });

  // Mutation with optimistic update
  const updateTodoMutation = useMutation({
    mutationFn: async (updatedTodo) => {
      const res = await fetch(`/api/todos/${updatedTodo.id}`, {
        method: 'PUT',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(updatedTodo),
      });
      return res.json();
    },
    // Optimistic update
    onMutate: async (updatedTodo) => {
      // Cancel outgoing refetches
      await queryClient.cancelQueries({ queryKey: ['todos'] });

      // Snapshot current value
      const previousTodos = queryClient.getQueryData(['todos']);

      // Optimistically update cache
      queryClient.setQueryData(['todos'], (old) =>
        old.map((todo) =>
          todo.id === updatedTodo.id ? updatedTodo : todo
        )
      );

      // Return context with snapshot
      return { previousTodos };
    },
    // Rollback on error
    onError: (err, updatedTodo, context) => {
      queryClient.setQueryData(['todos'], context.previousTodos);
      console.error('Update failed, rolled back:', err);
    },
    // Refetch after success
    onSettled: () => {
      queryClient.invalidateQueries({ queryKey: ['todos'] });
    },
  });

  const deleteTodoMutation = useMutation({
    mutationFn: async (id) => {
      await fetch(`/api/todos/${id}`, { method: 'DELETE' });
    },
    onMutate: async (deletedId) => {
      await queryClient.cancelQueries({ queryKey: ['todos'] });
      const previousTodos = queryClient.getQueryData(['todos']);

      // Optimistically remove from UI
      queryClient.setQueryData(['todos'], (old) =>
        old.filter((todo) => todo.id !== deletedId)
      );

      return { previousTodos };
    },
    onError: (err, deletedId, context) => {
      queryClient.setQueryData(['todos'], context.previousTodos);
    },
    onSettled: () => {
      queryClient.invalidateQueries({ queryKey: ['todos'] });
    },
  });

  const handleToggle = (todo) => {
    updateTodoMutation.mutate({
      ...todo,
      completed: !todo.completed,
    });
  };

  return (
    <div>
      {todos?.map((todo) => (
        <div key={todo.id}>
          <input
            type="checkbox"
            checked={todo.completed}
            onChange={() => handleToggle(todo)}
          />
          {todo.text}
          <button onClick={() => deleteTodoMutation.mutate(todo.id)}>
            Delete
          </button>
        </div>
      ))}
    </div>
  );
}
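
The useQuery and useMutation calls above assume a QueryClientProvider at the root of the app. A minimal setup sketch; the defaultOptions values are illustrative, not required:

import { QueryClient, QueryClientProvider } from '@tanstack/react-query';

// One shared client holds the query cache for the whole app
const queryClient = new QueryClient({
  defaultOptions: {
    queries: {
      staleTime: 30_000, // treat data as fresh for 30s before background refetching
      retry: 1,
    },
  },
});

function App() {
  return (
    <QueryClientProvider client={queryClient}>
      <TodoList />
    </QueryClientProvider>
  );
}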

4. WebSocket Real-time Data Sync

| Feature | API/Method | Description | Use Case |
|---|---|---|---|
| WebSocket | new WebSocket('ws://url') | Creates a persistent bidirectional connection | Real-time communication |
| onopen | ws.onopen = () => {} | Fires when the connection is established | Connection ready |
| onmessage | ws.onmessage = (event) => {} | Receives messages from the server | Incoming data |
| send | ws.send(JSON.stringify(data)) | Sends data to the server | Outgoing messages |
| onerror | ws.onerror = (error) => {} | Handles connection errors | Error handling |
| onclose | ws.onclose = () => {} | Fires when the connection closes | Cleanup, reconnection |
| Socket.io | io('http://url') | Library with auto-reconnection, rooms, namespaces | Enhanced WebSocket |

Example: WebSocket with React hook and reconnection logic

import { useEffect, useState, useRef } from 'react';

// Custom WebSocket hook
function useWebSocket(url) {
  const [messages, setMessages] = useState([]);
  const [connectionStatus, setConnectionStatus] = useState('disconnected');
  const ws = useRef(null);
  const reconnectTimeoutRef = useRef(null);
  const shouldReconnectRef = useRef(true);

  const connect = () => {
    ws.current = new WebSocket(url);

    ws.current.onopen = () => {
      console.log('Connected');
      setConnectionStatus('connected');
    };

    ws.current.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setMessages((prev) => [...prev, data]);
    };

    ws.current.onerror = (error) => {
      console.error('WebSocket error:', error);
      setConnectionStatus('error');
    };

    ws.current.onclose = () => {
      console.log('Disconnected');
      setConnectionStatus('disconnected');

      // Auto-reconnect after 3 seconds, unless the hook is unmounting
      if (shouldReconnectRef.current) {
        reconnectTimeoutRef.current = setTimeout(() => {
          console.log('Reconnecting...');
          connect();
        }, 3000);
      }
    };
  };

  useEffect(() => {
    shouldReconnectRef.current = true;
    connect();

    return () => {
      // Stop the reconnect loop before closing, otherwise onclose schedules a new socket
      shouldReconnectRef.current = false;
      if (reconnectTimeoutRef.current) {
        clearTimeout(reconnectTimeoutRef.current);
      }
      if (ws.current) {
        ws.current.close();
      }
    };
  }, [url]);

  const sendMessage = (message) => {
    if (ws.current?.readyState === WebSocket.OPEN) {
      ws.current.send(JSON.stringify(message));
    }
  };

  return { messages, sendMessage, connectionStatus };
}

// Component using WebSocket
function ChatRoom() {
  const { messages, sendMessage, connectionStatus } = useWebSocket(
    'ws://localhost:8080'
  );
  const [input, setInput] = useState('');

  const handleSend = () => {
    sendMessage({ type: 'chat', text: input, timestamp: Date.now() });
    setInput('');
  };

  return (
    <div>
      <div>Status: {connectionStatus}</div>
      <div>
        {messages.map((msg, i) => (
          <div key={i}>{msg.text}</div>
        ))}
      </div>
      <input
        value={input}
        onChange={(e) => setInput(e.target.value)}
        onKeyDown={(e) => e.key === 'Enter' && handleSend()}
      />
      <button onClick={handleSend}>Send</button>
    </div>
  );
}

// Socket.io alternative
import io from 'socket.io-client';

function useSocketIO(url) {
  const [socket, setSocket] = useState(null);

  useEffect(() => {
    const newSocket = io(url, {
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionAttempts: 5,
    });

    newSocket.on('connect', () => {
      console.log('Socket.io connected');
    });

    setSocket(newSocket);

    return () => newSocket.close();
  }, [url]);

  return socket;
}
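
A short usage sketch for the Socket.io hook; the 'join' and 'feed:update' event names are assumptions about the server, not part of the Socket.io API:

function LiveFeed() {
  const socket = useSocketIO('http://localhost:8080');
  const [updates, setUpdates] = useState([]);

  useEffect(() => {
    if (!socket) return;

    // Ask the server to put this client in a room, then listen for pushed updates
    socket.emit('join', { room: 'feed' });
    const handleUpdate = (update) => setUpdates((prev) => [...prev, update]);
    socket.on('feed:update', handleUpdate);

    return () => {
      socket.off('feed:update', handleUpdate);
    };
  }, [socket]);

  return (
    <div>
      {updates.map((u, i) => (
        <div key={i}>{u.message}</div>
      ))}
    </div>
  );
}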

5. IndexedDB Offline Data Persistence

| Feature | API | Description | Use Case |
|---|---|---|---|
| open | indexedDB.open(name, version) | Opens the database, creating it if it doesn't exist | DB initialization |
| createObjectStore | db.createObjectStore(name, { keyPath }) | Creates a table-like storage space | Schema definition |
| transaction | db.transaction([store], 'readwrite') | Groups operations with ACID properties | Data operations |
| add | objectStore.add(data) | Inserts a new record, fails if the key exists | Insert operation |
| put | objectStore.put(data) | Updates or inserts a record | Upsert operation |
| get | objectStore.get(key) | Retrieves a single record by key | Read operation |
| getAll | objectStore.getAll() | Retrieves all records | Bulk read |
| createIndex | objectStore.createIndex(name, keyPath) | Creates a searchable index on a field (query sketch at the end of this section) | Query optimization |

Example: IndexedDB wrapper with React hook

// IndexedDB helper class
class DatabaseManager {
  constructor(dbName, version = 1) {
    this.dbName = dbName;
    this.version = version;
    this.db = null;
  }

  async open() {
    return new Promise((resolve, reject) => {
      const request = indexedDB.open(this.dbName, this.version);

      request.onerror = () => reject(request.error);
      request.onsuccess = () => {
        this.db = request.result;
        resolve(this.db);
      };

      request.onupgradeneeded = (event) => {
        const db = event.target.result;

        // Create object stores
        if (!db.objectStoreNames.contains('todos')) {
          const todoStore = db.createObjectStore('todos', { 
            keyPath: 'id', 
            autoIncrement: true 
          });
          // Note: booleans are not valid IndexedDB keys, so this index is only
          // queryable if completed is stored as 0/1 instead of true/false
          todoStore.createIndex('completed', 'completed', { unique: false });
          todoStore.createIndex('createdAt', 'createdAt', { unique: false });
        }
      };
    });
  }

  async add(storeName, data) {
    const transaction = this.db.transaction([storeName], 'readwrite');
    const store = transaction.objectStore(storeName);
    return new Promise((resolve, reject) => {
      const request = store.add(data);
      request.onsuccess = () => resolve(request.result);
      request.onerror = () => reject(request.error);
    });
  }

  async getAll(storeName) {
    const transaction = this.db.transaction([storeName], 'readonly');
    const store = transaction.objectStore(storeName);
    return new Promise((resolve, reject) => {
      const request = store.getAll();
      request.onsuccess = () => resolve(request.result);
      request.onerror = () => reject(request.error);
    });
  }

  async update(storeName, data) {
    const transaction = this.db.transaction([storeName], 'readwrite');
    const store = transaction.objectStore(storeName);
    return new Promise((resolve, reject) => {
      const request = store.put(data);
      request.onsuccess = () => resolve(request.result);
      request.onerror = () => reject(request.error);
    });
  }

  async delete(storeName, key) {
    const transaction = this.db.transaction([storeName], 'readwrite');
    const store = transaction.objectStore(storeName);
    return new Promise((resolve, reject) => {
      const request = store.delete(key);
      request.onsuccess = () => resolve();
      request.onerror = () => reject(request.error);
    });
  }
}

// React hook for IndexedDB
function useIndexedDB(dbName, storeName) {
  const [db, setDb] = useState(null);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    const initDB = async () => {
      const manager = new DatabaseManager(dbName);
      await manager.open();
      setDb(manager);
      setLoading(false);
    };
    initDB();
  }, [dbName]);

  const addItem = async (data) => {
    if (!db) return;
    return db.add(storeName, data);
  };

  const getAllItems = async () => {
    if (!db) return [];
    return db.getAll(storeName);
  };

  const updateItem = async (data) => {
    if (!db) return;
    return db.update(storeName, data);
  };

  const deleteItem = async (key) => {
    if (!db) return;
    return db.delete(storeName, key);
  };

  return { addItem, getAllItems, updateItem, deleteItem, loading };
}

// Component using IndexedDB
function TodoApp() {
  const [todos, setTodos] = useState([]);
  const { addItem, getAllItems, updateItem, deleteItem, loading } = 
    useIndexedDB('TodoDB', 'todos');

  useEffect(() => {
    if (!loading) {
      loadTodos();
    }
  }, [loading]);

  const loadTodos = async () => {
    const items = await getAllItems();
    setTodos(items);
  };

  const handleAdd = async (text) => {
    const newTodo = {
      text,
      completed: false,
      createdAt: Date.now(),
    };
    await addItem(newTodo);
    loadTodos();
  };

  const handleToggle = async (todo) => {
    await updateItem({ ...todo, completed: !todo.completed });
    loadTodos();
  };

  if (loading) return <div>Loading...</div>;

  return (
    <div>
      {todos.map(todo => (
        <div key={todo.id}>
          <input
            type="checkbox"
            checked={todo.completed}
            onChange={() => handleToggle(todo)}
          />
          {todo.text}
        </div>
      ))}
    </div>
  );
}
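
The createdAt index defined in onupgradeneeded is never queried above. A sketch of reading through an index with a key range; getByIndex is a hypothetical helper that operates on the raw IDBDatabase returned by DatabaseManager.open():

// Query records via an index instead of the primary key
async function getByIndex(db, storeName, indexName, range) {
  const transaction = db.transaction([storeName], 'readonly');
  const index = transaction.objectStore(storeName).index(indexName);
  return new Promise((resolve, reject) => {
    const request = index.getAll(range);
    request.onsuccess = () => resolve(request.result);
    request.onerror = () => reject(request.error);
  });
}

// e.g. todos created in the last 24 hours, via the 'createdAt' index
// const recent = await getByIndex(
//   rawDb,
//   'todos',
//   'createdAt',
//   IDBKeyRange.lowerBound(Date.now() - 24 * 60 * 60 * 1000)
// );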

6. Observer Pattern and Event Emitters

| Pattern | Implementation | Description | Use Case |
|---|---|---|---|
| EventEmitter | class EventEmitter { on, emit, off } | Pub/sub pattern for decoupled communication | Event-driven architecture |
| on/subscribe | emitter.on('event', callback) | Registers a listener for a specific event | Event subscription |
| emit/publish | emitter.emit('event', data) | Triggers the event, notifies all listeners | Event broadcasting |
| off/unsubscribe | emitter.off('event', callback) | Removes a specific event listener | Cleanup |
| once | emitter.once('event', callback) | Listener fires only the first time (usage sketch at the end of this section) | One-time events |
| Custom Events | new CustomEvent('type', { detail }) | Native browser event system | DOM-based communication |

Example: Event Emitter implementation with React integration

// EventEmitter class
class EventEmitter {
  constructor() {
    this.events = {};
  }

  on(event, listener) {
    if (!this.events[event]) {
      this.events[event] = [];
    }
    this.events[event].push(listener);
    
    // Return unsubscribe function
    return () => this.off(event, listener);
  }

  once(event, listener) {
    const onceWrapper = (...args) => {
      listener(...args);
      this.off(event, onceWrapper);
    };
    return this.on(event, onceWrapper);
  }

  emit(event, ...args) {
    if (!this.events[event]) return;
    this.events[event].forEach(listener => listener(...args));
  }

  off(event, listenerToRemove) {
    if (!this.events[event]) return;
    this.events[event] = this.events[event].filter(
      listener => listener !== listenerToRemove
    );
  }

  removeAllListeners(event) {
    if (event) {
      delete this.events[event];
    } else {
      this.events = {};
    }
  }
}

// Global event bus instance
const eventBus = new EventEmitter();

// React hook for event subscriptions
function useEventListener(event, handler) {
  useEffect(() => {
    const unsubscribe = eventBus.on(event, handler);
    return unsubscribe; // Cleanup on unmount
  }, [event, handler]);
}

// Components using event bus
function UserProfile() {
  const [user, setUser] = useState(null);

  // Subscribe to user updates
  useEventListener('user:updated', (updatedUser) => {
    setUser(updatedUser);
  });

  return <div>{user?.name}</div>;
}

function UpdateButton() {
  const handleUpdate = () => {
    const newUser = { id: 1, name: 'John Updated' };
    // Emit event to all subscribers
    eventBus.emit('user:updated', newUser);
  };

  return <button onClick={handleUpdate}>Update User</button>;
}

// Custom DOM Events
function NotificationSystem() {
  useEffect(() => {
    const handleNotification = (event) => {
      console.log('Notification:', event.detail);
    };

    window.addEventListener('app:notification', handleNotification);
    return () => window.removeEventListener('app:notification', handleNotification);
  }, []);

  return <div>Notification System</div>;
}

function TriggerNotification() {
  const notify = () => {
    const event = new CustomEvent('app:notification', {
      detail: {
        message: 'Hello World',
        type: 'info',
        timestamp: Date.now(),
      },
    });
    window.dispatchEvent(event);
  };

  return <button onClick={notify}>Send Notification</button>;
}

// Advanced: Typed EventEmitter with TypeScript
interface User {
  id: number;
  name: string;
}

interface Events {
  'user:login': (user: User) => void;
  'user:logout': () => void;
  'notification': (message: string) => void;
}

class TypedEventEmitter<T extends Record<string, (...args: any[]) => void>> {
  private events = new Map<keyof T, Set<T[keyof T]>>();

  on<K extends keyof T>(event: K, listener: T[K]): () => void {
    if (!this.events.has(event)) {
      this.events.set(event, new Set());
    }
    this.events.get(event)!.add(listener);
    return () => this.off(event, listener);
  }

  emit<K extends keyof T>(event: K, ...args: Parameters<T[K]>): void {
    this.events.get(event)?.forEach(listener => listener(...args));
  }

  off<K extends keyof T>(event: K, listener: T[K]): void {
    this.events.get(event)?.delete(listener);
  }
}

const typedEventBus = new TypedEventEmitter<Events>();
typedEventBus.on('user:login', (user) => console.log(user));
typedEventBus.emit('user:login', { id: 1, name: 'John' });
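
The once helper from the table is implemented above but never called; a tiny usage sketch on the untyped eventBus (the 'app:ready' event name is illustrative):

// Fires a single time, then removes itself
eventBus.once('app:ready', (info) => {
  console.log('ready at', info.startedAt);
});

eventBus.emit('app:ready', { startedAt: Date.now() }); // handled
eventBus.emit('app:ready', { startedAt: Date.now() }); // ignored, listener already removed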

Data Flow Architecture Patterns

  • Flux/Redux - Predictable unidirectional flow, ideal for complex state management with time-travel debugging
  • GraphQL - Normalized cache eliminates data duplication, automatic updates across queries
  • React Query - Optimistic updates provide instant feedback, with automatic rollback on errors
  • WebSocket - Real-time bidirectional communication, essential for chat, live updates, collaborative editing
  • IndexedDB - Offline-first architecture with large data storage capacity (GBs), transactional operations
  • Observer Pattern - Decoupled component communication, reduces prop drilling, enables event-driven architecture