Observables
The observable system provides reactive state management with automatic synchronization across client and server boundaries.
Architecture Overview
The observable system synchronizes state between server and client using FiveM's emitNet/onNet events.
Ownership Model
| Observable | Server | Client | Sync Direction |
|---|---|---|---|
serverOwned | ✅ Read/Write | 🔒 Read-only | Server → Client |
clientOwned | 🔒 Read-only | ✅ Read/Write | Client → Server |
shared | ✅ Read/Write | ✅ Read/Write | Bidirectional |
serverOnly | ✅ Read/Write | — | None |
clientOnly | — | ✅ Read/Write | None |
Sync Patterns
Server-Owned (Server is authoritative):
Client-Owned (Client is authoritative):
Shared (Both can write, with conflict resolution):
Observable Types
| Type | Owner | Sync Direction | Use Case |
|---|---|---|---|
serverOnly | Server | None | Server-internal state |
clientOnly | Client | None | Client-internal state |
serverOwned | Server | Server → Clients | Authoritative server state |
clientOwned | Client | Client → Server | Client-authoritative state |
shared | Both | Bidirectional | Collaborative state |
When to Use Observables
Use Observables For
- Continuous state that changes frequently (hunger/thirst decay)
- State that needs real-time updates (HUD data)
- Bidirectional sync where both sides need current values
Use RPC Instead For
- One-time data transfer (initial spawn data)
- Request/response patterns (fetch character on login)
- Actions that happen once, not continuous state
Core Concepts
Observable Class Hierarchy
Subscription Lifecycle
Server-Owned Observables
The server has read/write access, clients have read-only:
import { serverOwned } from "@core/observable";
// Global state - broadcast to all players
const gameState = serverOwned({
id: "game:state",
initialValue: { phase: "lobby", timer: 0 },
broadcast: true,
});
// Per-player state - each player gets their own instance
const playerNeeds = serverOwned({
id: "player:needs",
initialValue: { hunger: 100, thirst: 100 },
perPlayer: true,
});
Server-Side Usage
// Update state (server only)
gameState.set({ phase: "playing", timer: 300 });
// Read state
const current = gameState.value;
// Subscribe to changes
gameState.subscribe((value) => {
console.log("Game state changed:", value);
});
// Per-player: update specific player
playerNeeds.set({ hunger: 80, thirst: 90 }, playerId);
Client-Side Usage
// Read state (client)
const state = gameState.value;
// Subscribe to updates from server
gameState.subscribe((value) => {
updateUI(value);
});
// ❌ Cannot set on client - throws error
// gameState.set({ phase: "ended" });
Client-Owned Observables
The client has read/write access, server has read-only:
import { clientOwned } from "@core/observable";
const playerInput = clientOwned({
id: "player:input",
initialValue: { x: 0, y: 0 },
});
Shared Observables
Both client and server can read/write with conflict resolution:
import { shared } from "@core/observable";
const position = shared({
id: "player:position",
initialValue: { x: 0, y: 0, z: 0 },
// Built-in strategies
conflictResolution: "last-write-wins",
// conflictResolution: "server-wins",
// conflictResolution: "client-wins",
// Or custom resolver
conflictResolution: (serverVal, clientVal, serverTime, clientTime) => {
return clientTime > serverTime ? clientVal : serverVal;
},
});
Player Subscriptions
By default, players must be manually subscribed to synchronized observables:
// Manual subscription
const inventory = serverOwned({
id: "player:inventory",
initialValue: { items: [] },
});
// Subscribe a specific player (server-side)
if (__RUNTIME__ === "server") {
inventory.subscribePlayer(playerId);
}
// Or use broadcast: true for auto-subscription
const globalState = serverOwned({
id: "game:global",
initialValue: { online: 0 },
broadcast: true, // All players receive updates
});
Player Subscription Flow
Per-Player vs Global Observables
| Option | Behavior | Use Case |
|---|---|---|
perPlayer: true | Each player has their own value | Player stats, inventory |
broadcast: true | All players receive same value | Game state, time |
| Neither | Manual subscription required | Selective sync |
// Per-player: Each player gets their own hunger value
const hunger = serverOwned({
id: "player:hunger",
initialValue: 100,
perPlayer: true, // Automatic per-player routing
});
// On server: Set for specific player
hunger.set(80, playerId);
// On client: Receives only their own value
hunger.subscribe((value) => {
console.log("My hunger:", value);
});
Lazy Registration
Observables use lazy registration - handlers are only registered when first accessed:
export class NeedsService {
// Safe to declare as field - registration deferred
private needs = serverOwned<NeedsState>({
id: "needs:player",
initialValue: { hunger: 100, thirst: 100 },
perPlayer: true,
});
@ClientOnly
initClientSide(): void {
// First access triggers registration
this.needs.subscribe((value) => {
sendHudEvent("needs:update", value);
});
}
}
Computed Observables
Derive values from other observables:
import { serverOwned, computed } from "@core/observable";
const health = serverOwned({ id: "player:health", initialValue: 100 });
const maxHealth = serverOwned({ id: "player:maxHealth", initialValue: 100 });
// Automatically updates when dependencies change
const healthPercent = computed(
"player:healthPercent",
() => (health.value / maxHealth.value) * 100,
[health, maxHealth]
);
Piping Observables
Connect observables with transformations:
const health = serverOwned({ id: "player:health", initialValue: 100 });
const healthDisplay = serverOwned({ id: "player:healthDisplay", initialValue: "100%" });
// Pipe health to display with transformation
healthDisplay.pipe(health, (h) => `${h}%`);
Lifecycle Integration
Initialize observable lifecycle in bootstrap:
// Server main.ts
import { initObservableLifecycle } from "@core/observable";
// In onStart
const observableLifecycle = initObservableLifecycle();
// In onStop
observableLifecycle.dispose();
Internal Architecture
Registry System
The observable system uses a global registry for tracking and lookup:
// Registry state
const registry = new Map<string, ObservableRegistryEntry>();
const playerSubscriptions = new Map<string, Set<PlayerId>>();
interface ObservableRegistryEntry {
id: string;
ownership: "serverOnly" | "clientOnly" | "serverOwned" | "clientOwned" | "shared";
broadcast: boolean;
observable: ReadonlyObservable<any>;
}
RPC Event Naming
Observables use specific event naming for network synchronization:
| Event Pattern | Direction | Purpose |
|---|---|---|
observable:sync:s2c:{id} | Server → Client | Push value to clients |
observable:sync:c2s:{id} | Client → Server | Push value to server |
observable:sync:sub:{id} | Client → Server | Subscribe request |
observable:sync:unsub:{id} | Client → Server | Unsubscribe request |
Sync Payload Structure
interface ObservableSyncPayload<T> {
id: string; // Observable ID
value: T; // New value
timestamp: number; // For conflict resolution
source?: PlayerId; // Source player (for client-owned/shared)
}
Debouncing
For frequently changing values, use debouncing to batch updates:
const position = serverOwned({
id: "player:position",
initialValue: { x: 0, y: 0, z: 0 },
syncDebounceMs: 50, // Batch updates within 50ms window
});
// Rapid updates are batched
position.set({ x: 1, y: 0, z: 0 });
position.set({ x: 2, y: 0, z: 0 });
position.set({ x: 3, y: 0, z: 0 });
// Only final value is synced after 50ms
Custom Equality
Control when updates are triggered:
const stats = serverOwned({
id: "player:stats",
initialValue: { hp: 100, mp: 100 },
// Deep comparison for objects
equals: (a, b) => a.hp === b.hp && a.mp === b.mp,
});
// This won't trigger an update (same values)
stats.set({ hp: 100, mp: 100 });
// This will trigger an update
stats.set({ hp: 99, mp: 100 });
Complete Example
// src/modules/needs/needs.service.ts
import { serverOwned } from "@core/observable";
import { sendHudEvent } from "@core/hud";
interface NeedsState {
hunger: number;
thirst: number;
health: number;
armor: number;
}
export class NeedsService {
private needs = serverOwned<NeedsState>({
id: "needs:player",
initialValue: { hunger: 100, thirst: 100, health: 100, armor: 0 },
perPlayer: true,
});
private decayTimer: ReturnType<typeof setInterval> | null = null;
@ServerOnly
initServerSide(): void {
// Decay needs every minute
this.decayTimer = setInterval(() => {
for (const [playerId] of this.needs.entries()) {
const current = this.needs.get(playerId);
if (current) {
this.needs.set({
...current,
hunger: Math.max(0, current.hunger - 1),
thirst: Math.max(0, current.thirst - 2),
}, playerId);
}
}
}, 60000);
}
@ClientOnly
initClientSide(): void {
// Subscribe to updates and forward to UI
this.needs.subscribe((value) => {
sendHudEvent("needs:update", value);
});
}
@ServerOnly
cleanupServerSide(): void {
if (this.decayTimer) {
clearInterval(this.decayTimer);
this.decayTimer = null;
}
}
}
API Reference
Factory Functions
| Function | Returns | Description |
|---|---|---|
serverOnly<T>(options) | WritableObservable<T> | Server-only, no sync |
clientOnly<T>(options) | WritableObservable<T> | Client-only, no sync |
serverOwned<T>(options) | ServerOwnedObservable<T> | Server R/W, client RO |
clientOwned<T>(options) | ClientOwnedObservable<T> | Client R/W, server RO |
shared<T>(options) | SharedObservable<T> | Both R/W with conflict resolution |
computed<T>(id, fn, sources) | ReadonlyObservable<T> | Derived from other observables |
Observable Options
interface ObservableOptions<T> {
id: string; // Required: unique identifier
initialValue: T; // Required: starting value
broadcast?: boolean; // Auto-subscribe all players (default: false)
perPlayer?: boolean; // Per-player instances (default: false)
equals?: (a: T, b: T) => boolean; // Custom equality check
syncDebounceMs?: number; // Debounce sync updates (default: 0)
noResponse?: boolean; // Fire-and-forget sync (default: true)
}
Observable Methods
| Method | Description |
|---|---|
.value | Get current value |
.set(value, playerId?) | Set new value (notifies if changed). For per-player observables, pass playerId to set a specific player's value |
.update(fn) | Update using function: (current) => newValue |
.subscribe(callback) | Subscribe to changes, returns Subscription |
.pipe(source, transform?) | Subscribe to another observable |
.subscribePlayer(id) | Subscribe a player (server-owned) |
.unsubscribePlayer(id) | Unsubscribe a player |
.get(playerId) | Get a specific player's value (per-player observables, server-side) |
.getPlayerValue(id) | Get player's value (client-owned, server-side) |
.entries() | Iterate all player entries as [playerId, value] pairs (per-player observables, server-side) |
.dispose() | Clean up resources |
Registry Functions
| Function | Description |
|---|---|
registerObservable(entry) | Add to registry |
unregisterObservable(id) | Remove from registry |
getObservable<T>(id) | Lookup by ID |
hasObservable(id) | Check if registered |
getAllObservables() | Get all entries |
getSubscribedPlayers(id) | Get subscribed player IDs |
addPlayerSubscription(id, playerId) | Subscribe player |
removePlayerFromAllSubscriptions(playerId) | Cleanup on disconnect |
Best Practices
- Choose the right type - Match ownership to data authority
- Use perPlayer for player state - Automatic per-player routing
- Prefer serverOwned - Server authority prevents cheating
- Use broadcast sparingly - Only for truly global state
- Clean up subscriptions - Use
DisposableStorefor cleanup - Consider RPC alternatives - Not everything needs to be reactive
- Use debouncing - For high-frequency updates like position
- Custom equality for objects - Prevent unnecessary updates
- Handle disconnects - Call
removePlayerFromAllSubscriptions