Best Practices: State Management
For: Architects designing workflow data flows
Level: Advanced
Time to read: 30 minutes
Patterns: 8+ production patterns
This guide covers context passing, data consistency, and state management strategies.
Workflow State Fundamentals
Context Data Model
┌──────────────────────────────────────┐
│      Workflow Execution Context      │
├──────────────────────────────────────┤
│                                      │
│  $ (root)                            │
│  ├─ workflow                         │
│  │  ├─ execution_id                  │
│  │  ├─ input                         │
│  │  └─ started_at                    │
│  │                                   │
│  ├─ customer (from Step 1)           │
│  │  ├─ id                            │
│  │  ├─ name                          │
│  │  └─ tier                          │
│  │                                   │
│  └─ order (from Step 2)              │
│     ├─ id                            │
│     ├─ total                         │
│     └─ items                         │
│                                      │
└──────────────────────────────────────┘

State Passing Patterns
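To make the `$.customer.id` style of reference concrete, here is a minimal sketch of how such a path could be resolved against a nested context like the one in the diagram. The `Context` type and the `Resolve` function are hypothetical names used only for illustration; they are not part of any Cascade API, and the real engine may resolve paths differently.

```go
package main

import (
	"fmt"
	"strings"
)

// Context is a hypothetical in-memory stand-in for the workflow execution context.
type Context map[string]any

// Resolve looks up a dotted path such as "$.customer.id" in the context.
// It returns false if the path is malformed, a segment is missing,
// or an intermediate value is not an object.
func Resolve(ctx Context, path string) (any, bool) {
	if path == "$" {
		return ctx, true
	}
	if !strings.HasPrefix(path, "$.") {
		return nil, false
	}
	var current any = ctx
	for _, key := range strings.Split(strings.TrimPrefix(path, "$."), ".") {
		obj, ok := asObject(current)
		if !ok {
			return nil, false
		}
		current, ok = obj[key]
		if !ok {
			return nil, false
		}
	}
	return current, true
}

// asObject reports whether v is a JSON-like object and returns it as a map.
func asObject(v any) (map[string]any, bool) {
	switch m := v.(type) {
	case Context:
		return m, true
	case map[string]any:
		return m, true
	}
	return nil, false
}

func main() {
	ctx := Context{
		"workflow": map[string]any{"execution_id": "exec-1"},
		"customer": map[string]any{"id": "c-42", "tier": "gold"},
	}
	id, ok := Resolve(ctx, "$.customer.id")
	fmt.Println(id, ok) // prints "c-42 true"

	_, ok = Resolve(ctx, "$.order.total")
	fmt.Println(ok) // prints "false": no step has written $.order yet
}
```

Returning a boolean rather than a zero value keeps "path not yet written" distinguishable from "path written with an empty value", which matters when a later step depends on an earlier result.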
Pattern 1: Result Path Chaining
states:
  - name: LoadCustomer
    type: Task
    resource: urn:cascade:activity:load_customer
    parameters:
      customer_id: "{{ workflow.input.customer_id }}"
    result: $.customer  # Store at $.customer
    next: LoadOrders
  - name: LoadOrders
    type: Task
    resource: urn:cascade:activity:load_orders
    parameters:
      customer_id: "{{ $.customer.id }}"  # Access from previous result
    result: $.orders  # Store at $.orders
    next: ProcessAll
  - name: ProcessAll
    type: Task
    resource: urn:cascade:activity:process
    parameters:
      customer: "{{ $.customer }}"  # Available in context
      orders: "{{ $.orders }}"  # Available in context
    end: true

Data Flow:
Input → LoadCustomer → $.customer
            ↓
        LoadOrders using $.customer → $.orders
            ↓
        ProcessAll with both $.customer & $.orders

Pattern 2: Nested Context Organization
# ✅ Good: Organized structure
states:
  - name: GatherData
    type: Parallel
    branches:
      - name: CustomerBranch
        type: Task
        result: $.customer
      - name: OrderBranch
        type: Task
        result: $.order
      - name: InventoryBranch
        type: Task
        result: $.inventory
    next: Analyze
  - name: Analyze
    type: Task
    parameters:
      customer: "{{ $.customer }}"
      order: "{{ $.order }}"
      inventory: "{{ $.inventory }}"
    end: true
# Context structure:
# {
#   "customer": {...},
#   "order": {...},
#   "inventory": {...}
# }

Pattern 3: Result Path Filtering
# ✅ Good: Only keep needed fields
- name: GetUserDetails
  type: Task
  resource: urn:cascade:activity:get_user
  result_path: $.user_profile
  output_path: "$.user_profile"  # Filter output to only this path
  next: ProcessUser
# Reduces context size:
# Large API response (1MB) → filtered to $.user_profile (50KB)

Data Size Management
Context Size Optimization
❌ Bad: Large context (10MB+)
  workflow.input: 5MB (full customer history)
  previous_results: 3MB (all previous outputs)
  intermediate_data: 2MB (temporary calculations)
  Total: 10MB → slow serialization, memory issues
✅ Good: Minimal context (<1MB)
  workflow.input: 50KB (ID + essential fields)
  customer: 10KB (name, tier, email)
  order: 20KB (ID, items, total)
  Total: 80KB → fast, efficient

Pruning Context
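The size budget above can be checked mechanically. The following is a minimal sketch, assuming the context is JSON-serializable and that serialized size is a fair proxy for what the engine stores; `Prune`, `SerializedSize`, and `maxContextBytes` are hypothetical names, and the 1MB constant simply mirrors the guideline in this guide rather than a documented engine limit.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxContextBytes mirrors the <1MB guideline from this guide (an assumption,
// not a documented engine limit).
const maxContextBytes = 1 << 20

// Prune returns a shallow copy of ctx containing only the listed top-level keys.
// Keys that are not present in ctx are skipped.
func Prune(ctx map[string]any, keep ...string) map[string]any {
	out := make(map[string]any, len(keep))
	for _, k := range keep {
		if v, ok := ctx[k]; ok {
			out[k] = v
		}
	}
	return out
}

// SerializedSize reports how many bytes ctx occupies when encoded as JSON.
func SerializedSize(ctx map[string]any) (int, error) {
	b, err := json.Marshal(ctx)
	if err != nil {
		return 0, err
	}
	return len(b), nil
}

func main() {
	ctx := map[string]any{
		"customer":     map[string]any{"id": "c-42", "tier": "gold"},
		"all_products": make([]int, 50000), // stand-in for a large dataset
	}
	before, err := SerializedSize(ctx)
	if err != nil {
		panic(err)
	}
	after, err := SerializedSize(Prune(ctx, "customer"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("before=%d bytes, after=%d bytes, within budget=%v\n",
		before, after, after <= maxContextBytes)
}
```

A check like this could run in tests or as a guard before a step writes its result, so that growth in context size is noticed before it affects serialization time.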
# Before step: Large context
context:
  full_customer_history: [1000 orders]
  all_products: [50000 SKUs]
  cached_data: [2MB]
# Use input_path to start clean
- name: Step2
  type: Task
  input_path: "$.customer"  # Only pass customer data
  result_path: "$.result"
  next: Step3

Data Consistency Patterns
Pattern 1: Read Consistent View
states:
  - name: CheckInventory
    type: Task
    resource: urn:cascade:activity:check_stock
    parameters:
      items: "{{ $.order.items }}"
    result: $.inventory_check
    next: IfInventoryAvailable
  - name: IfInventoryAvailable
    type: Choice
    choices:
      - condition: "{{ $.inventory_check.all_available == true }}"
        next: ReserveInventory
    default: OutOfStock
  - name: ReserveInventory
    type: Task
    parameters:
      items: "{{ $.order.items }}"  # Same items from inventory check

Pattern 2: Transactional Semantics
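// Workflow: two database operations kept consistent through compensation
// (saga-style), not through a single database transaction
type TransferFundsWorkflow struct{}

func (w *TransferFundsWorkflow) Execute(ctx context.Context, input *TransferInput) error {
	// Step 1: Debit source
	var debitResult DebitResult
	err := workflow.ExecuteActivity(ctx, DebitAccount, input.FromAccount, input.Amount).Get(ctx, &debitResult)
	if err != nil {
		return err // Fail, no changes yet
	}

	// Step 2: Credit destination
	var creditResult CreditResult
	err = workflow.ExecuteActivity(ctx, CreditAccount, input.ToAccount, input.Amount).Get(ctx, &creditResult)
	if err != nil {
		// Compensate: reverse the debit and wait for the refund to finish,
		// so a failed refund is surfaced instead of silently dropped.
		// Passing nil to Get assumes the SDK allows discarding the result.
		refundErr := workflow.ExecuteActivity(ctx, RefundAccount, input.FromAccount, input.Amount).Get(ctx, nil)
		if refundErr != nil {
			return fmt.Errorf("credit failed (%v) and refund failed: %w", err, refundErr)
		}
		return err
	}
	return nil
}

State Recovery Patterns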
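Recovery depends on being able to snapshot the context and restore it later. As a rough illustration of what checkpoint save and load activities might do internally, the sketch below keeps serialized snapshots in memory, keyed by phase. `CheckpointStore` is a hypothetical type; a production implementation would persist snapshots to durable storage.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CheckpointStore is a hypothetical in-memory store keyed by phase name.
type CheckpointStore struct {
	snapshots map[string][]byte
}

// NewCheckpointStore returns an empty store.
func NewCheckpointStore() *CheckpointStore {
	return &CheckpointStore{snapshots: make(map[string][]byte)}
}

// Save serializes the context so that later mutations cannot alter the snapshot.
func (s *CheckpointStore) Save(phase string, ctx map[string]any) error {
	b, err := json.Marshal(ctx)
	if err != nil {
		return fmt.Errorf("save checkpoint %q: %w", phase, err)
	}
	s.snapshots[phase] = b
	return nil
}

// Load returns a fresh copy of the context saved for the given phase.
func (s *CheckpointStore) Load(phase string) (map[string]any, error) {
	b, ok := s.snapshots[phase]
	if !ok {
		return nil, fmt.Errorf("no checkpoint for phase %q", phase)
	}
	var ctx map[string]any
	if err := json.Unmarshal(b, &ctx); err != nil {
		return nil, fmt.Errorf("load checkpoint %q: %w", phase, err)
	}
	return ctx, nil
}

func main() {
	store := NewCheckpointStore()
	ctx := map[string]any{"phase1_result": "ok"}
	if err := store.Save("phase_1", ctx); err != nil {
		panic(err)
	}

	// Simulate a later step corrupting the live context.
	ctx["phase1_result"] = "corrupted by a failed step"

	recovered, err := store.Load("phase_1")
	if err != nil {
		panic(err)
	}
	fmt.Println(recovered["phase1_result"]) // prints "ok"
}
```

Serializing on save, rather than storing a reference to the live map, is what isolates the checkpoint from later changes to the context.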
Pattern 1: Checkpoint Strategy
# Save state at critical points
states:
  - name: Step1
    type: Task
    result: $.phase1_result
    next: Checkpoint1
  - name: Checkpoint1
    type: Task
    resource: urn:cascade:activity:save_checkpoint
    parameters:
      phase: "phase_1"
      state: "{{ $ }}"  # Save entire context
    next: Step2
  - name: Step2
    type: Task
    catch:
      - error_equals: ["States.ALL"]
        next: LoadCheckpoint
    next: Success
  - name: LoadCheckpoint
    type: Task
    resource: urn:cascade:activity:load_checkpoint
    parameters:
      phase: "phase_1"
    result: $.recovered_state
    next: Step2  # Retry from checkpoint
# Note: Step2 → LoadCheckpoint → Step2 loops for as long as Step2 keeps failing;
# bound the number of attempts so a permanent failure cannot retry forever.

Pattern 2: State Reconciliation
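Reconciliation can go further than comparing how many transactions each side holds, since two lists of equal length may still contain different entries. The sketch below compares transaction IDs and reports what each side is missing. `DiffTransactions` is a hypothetical helper, and it assumes every transaction has a unique string ID.

```go
package main

import (
	"fmt"
	"sort"
)

// DiffTransactions compares transaction IDs recorded in the database with those
// held in the workflow context and reports what each side is missing.
// Results are sorted so the output is deterministic.
func DiffTransactions(dbIDs, contextIDs []string) (missingInContext, missingInDB []string) {
	inDB := make(map[string]bool, len(dbIDs))
	for _, id := range dbIDs {
		inDB[id] = true
	}
	inContext := make(map[string]bool, len(contextIDs))
	for _, id := range contextIDs {
		inContext[id] = true
	}
	for id := range inDB {
		if !inContext[id] {
			missingInContext = append(missingInContext, id)
		}
	}
	for id := range inContext {
		if !inDB[id] {
			missingInDB = append(missingInDB, id)
		}
	}
	sort.Strings(missingInContext)
	sort.Strings(missingInDB)
	return missingInContext, missingInDB
}

func main() {
	// Both sides hold two transactions, so a count comparison alone would pass.
	missingInContext, missingInDB := DiffTransactions(
		[]string{"t1", "t2"},
		[]string{"t1", "t3"},
	)
	fmt.Println(missingInContext, missingInDB) // prints "[t2] [t3]"
}
```

Reporting the differing IDs also gives the operator something actionable, where a bare "count mismatch" only signals that a problem exists.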
// Verify state consistency after failure recovery
func ReconcileState(ctx context.Context, state *WorkflowState) error {
	// Check: Do database records match workflow context?
	customer, err := db.GetCustomer(ctx, state.CustomerID)
	if err != nil {
		// Without this check a failed lookup would be compared as if it were real data
		return fmt.Errorf("load customer for reconciliation: %w", err)
	}
	if customer.Status != state.ExpectedStatus {
		return fmt.Errorf("state mismatch: DB=%s, context=%s",
			customer.Status, state.ExpectedStatus)
	}

	// Check: Are all transactions recorded?
	dbTransactions := db.GetTransactions(ctx, state.OrderID)
	contextTransactions := state.Transactions
	if len(dbTransactions) != len(contextTransactions) {
		return fmt.Errorf("transaction count mismatch: DB=%d, context=%d",
			len(dbTransactions), len(contextTransactions))
	}
	return nil
}

Versioning State
State Schema Evolution
# V1: Initial schema
input:
  customer_id: string
  order_id: string
# V2: Added validation status (backward compatible)
input:
  customer_id: string
  order_id: string
  validation_status: string  # Optional
# V3: Restructured (breaking change)
input:
  customer:
    id: string
  order:
    id: string

Handling State Evolution
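// Normalize old state to the latest format by applying migrations in sequence
func NormalizeState(rawState map[string]interface{}) map[string]interface{} {
	// A checked assertion avoids a panic when _version is absent or not a string.
	// Assumption: state without a _version predates versioning and is treated as V1.
	version, ok := rawState["_version"].(string)
	if !ok {
		version = "v1"
	}
	switch version {
	case "v1":
		// Convert V1 to V2, then keep migrating until the latest version is reached
		return NormalizeState(map[string]interface{}{
			"customer_id":       rawState["customer_id"],
			"order_id":          rawState["order_id"],
			"validation_status": "pending", // Default for V2
			"_version":          "v2",
		})
	case "v2":
		// Convert V2 to V3
		return map[string]interface{}{
			"customer": map[string]interface{}{
				"id": rawState["customer_id"],
			},
			"order": map[string]interface{}{
				"id": rawState["order_id"],
			},
			"_version": "v3",
		}
	default:
		// Already at the latest known version (or newer): return unchanged
		return rawState
	}
}

State Debugging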
Context Inspection
# View current context at execution point
cascade process inspect {execution_id} --json | jq '.context'
# Filter specific path
cascade process inspect {execution_id} --json | jq '.context.customer'
# Track context changes through execution
cascade trace {execution_id} --show-context-changes

Context Logging
import (
	"context"

	"go.uber.org/zap"
)

func ProcessOrder(ctx context.Context, input *OrderInput) error {
	// log.FromContext is assumed to be a project helper that returns a *zap.Logger
	logger := log.FromContext(ctx)

	// Log initial context
	logger.Info("Starting process",
		zap.Any("input", input),
	)

	result, err := doWork(ctx, input)

	// Log intermediate state (zap.Error is a no-op field when err is nil)
	logger.Info("Work completed",
		zap.Any("result", result),
		zap.Error(err),
	)
	return err
}

Best Practices
✅ DO:
- Organize context hierarchically
- Keep context small (<1MB)
- Use result_path for filtering
- Checkpoint at critical points
- Version state schemas
- Validate data consistency
- Document context shape
- Monitor context size
❌ DON’T:
- Store large datasets
- Mix concerns in context
- Lose intermediate results
- Assume state immutability
- Ignore schema evolution
- Skip state reconciliation
- Over-normalize context
Checklist: State Management
- Context organized by logical groups
- Context size < 1MB per execution
- Result paths filter large data
- Checkpoints at critical transitions
- State versioning strategy defined
- Data consistency validated
- State recovery tested
- Context logged for debugging
Updated: October 29, 2025
Version: 1.0