
Activity Development Guide

For: Go developers writing Cascade activities
Level: Intermediate → Advanced
Time to read: 30 minutes
Examples: 15+ complete activities

This guide shows how to write production-grade activities in Go that integrate seamlessly with Cascade workflows.


Activity Fundamentals

What is an Activity?

An activity is a Go function that:

  • Performs business logic or external I/O
  • Takes structured input
  • Returns structured output
  • Can be retried automatically
  • Must be idempotent (safe to retry)
  • Is referenced by URN in CDL

Activity Lifecycle

┌──────────────────────────────────────┐
│ 1. Workflow calls activity           │
│    (Task state executes)             │
└──────────────────┬───────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 2. Activity receives input           │
│    - Deserialized from context       │
│    - Type-safe struct                │
└──────────────────┬───────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 3. Activity executes (your code)     │
│    - Perform work (I/O, compute)     │
│    - Can access databases, APIs      │
│    - May fail (recoverable)          │
└──────────────────┬───────────────────┘
                   │
         ┌─────────┴─────────┐
         │                   │
         ▼                   ▼
     ✅ SUCCESS           ❌ ERROR
         │                   │
         ├─ Return result    ├─ Return error
         │  Stored in        │  Temporal retries
         │  context          │  (if transient)
         │                   │
         └─────────┬─────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 4. Workflow continues                │
│    - Uses result from context        │
│    - Proceeds to next state          │
└──────────────────────────────────────┘

Minimal Activity

    package activities

    import (
        "context"
    )

    // Input structure
    type GreetInput struct {
        Name string `json:"name"`
    }

    // Output structure
    type GreetOutput struct {
        Message string `json:"message"`
    }

    // Activity function
    func Greet(ctx context.Context, input *GreetInput) (*GreetOutput, error) {
        return &GreetOutput{
            Message: "Hello, " + input.Name,
        }, nil
    }

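Invoke it from workflow code (this snippet uses the Temporal Go SDK's workflow package and runs inside a workflow function):

    // Requires "time" and "go.temporal.io/sdk/workflow".
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    var result GreetOutput
    // Pass a pointer so the argument matches the activity's *GreetInput parameter.
    err := workflow.ExecuteActivity(ctx, Greet, &GreetInput{Name: "Alice"}).Get(ctx, &result)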

Activity Design Patterns

Pattern 1: Stateless Computation

Use: Pure functions with no side effects

Example: Data Transformation

    package activities

    import (
        "context"
        "fmt"
        "strings"
    )

    type TransformInput struct {
        Text      string `json:"text"`
        Operation string `json:"operation"` // "upper", "lower", "title"
    }

    type TransformOutput struct {
        Result string `json:"result"`
    }

    // Pure function - no I/O, no side effects
    func TransformText(ctx context.Context, input *TransformInput) (*TransformOutput, error) {
        var result string
        switch input.Operation {
        case "upper":
            result = strings.ToUpper(input.Text)
        case "lower":
            result = strings.ToLower(input.Text)
        case "title":
            // Note: strings.Title is deprecated since Go 1.18; kept here for brevity.
            result = strings.Title(input.Text)
        default:
            return nil, fmt.Errorf("unknown operation: %s", input.Operation)
        }
        return &TransformOutput{Result: result}, nil
    }

    // Usage in workflow
    type TransformRequest struct {
        Input TransformInput `json:"input"`
    }

    // In workflow:
    // - name: TransformData
    //   type: Task
    //   resource: urn:cascade:activity:transform_text
    //   parameters:
    //     text: "{{ $.user_input }}"
    //     operation: "upper"
    //   result: $.transformed
    //   timeout: 5s

Characteristics:

  • No external dependencies
  • Deterministic (same input = same output)
  • Fast (<1ms typically)
  • Always succeeds or returns error
  • Perfectly safe to retry

Pattern 2: Database Query Activity

Use: Fetch data from database

Example: Load Customer

    package activities

    import (
        "context"
        "database/sql"
        "errors"
        "fmt"

        "github.com/cascade-platform/sdk/database"
    )

    type LoadCustomerInput struct {
        CustomerID string `json:"customer_id"`
    }

    type Customer struct {
        ID      string  `json:"id"`
        Name    string  `json:"name"`
        Email   string  `json:"email"`
        Tier    string  `json:"tier"` // gold, silver, bronze
        Balance float64 `json:"balance"`
        Status  string  `json:"status"` // active, inactive
    }

    func LoadCustomer(ctx context.Context, input *LoadCustomerInput) (*Customer, error) {
        // Get database connection from context
        db := database.FromContext(ctx)

        // Query database
        var customer Customer
        err := db.QueryRowContext(ctx,
            `SELECT id, name, email, tier, balance, status FROM customers WHERE id = $1`,
            input.CustomerID,
        ).Scan(&customer.ID, &customer.Name, &customer.Email,
            &customer.Tier, &customer.Balance, &customer.Status)

        // errors.Is also matches sql.ErrNoRows when a driver wraps it
        if errors.Is(err, sql.ErrNoRows) {
            return nil, fmt.Errorf("customer not found: %s", input.CustomerID)
        }
        if err != nil {
            return nil, fmt.Errorf("database error: %w", err)
        }
        return &customer, nil
    }

    // Usage in workflow
    // - name: GetCustomerInfo
    //   type: Task
    //   resource: urn:cascade:activity:load_customer
    //   parameters:
    //     customer_id: "{{ workflow.input.customer_id }}"
    //   result: $.customer
    //   retries:
    //     max_attempts: 2
    //   timeout: 10s
    //   next: ProcessCustomer

Best practices:

  • Use parameterized queries (prevent SQL injection)
  • Set appropriate timeout in CDL
  • Return meaningful errors for debugging
  • Consider caching for frequently accessed data

Pattern 3: Database Write Activity

Use: Insert or update data durably

Example: Create Order

    package activities

    import (
        "context"
        "fmt"
        "time"

        "github.com/cascade-platform/sdk/database"
        "github.com/google/uuid"
    )

    type CreateOrderInput struct {
        CustomerID string      `json:"customer_id"`
        Items      []OrderItem `json:"items"`
        Total      float64     `json:"total"`
    }

    type OrderItem struct {
        SKU      string  `json:"sku"`
        Quantity int     `json:"quantity"`
        Price    float64 `json:"price"`
    }

    type CreateOrderOutput struct {
        OrderID   string    `json:"order_id"`
        CreatedAt time.Time `json:"created_at"`
    }

    func CreateOrder(ctx context.Context, input *CreateOrderInput) (*CreateOrderOutput, error) {
        db := database.FromContext(ctx)

        // Generate order ID
        orderID := uuid.New().String()
        now := time.Now()

        // Start transaction
        tx, err := db.BeginTx(ctx, nil)
        if err != nil {
            return nil, fmt.Errorf("failed to start transaction: %w", err)
        }
        defer tx.Rollback() // Rollback if we don't commit (no-op after Commit)

        // Insert order header
        _, err = tx.ExecContext(ctx,
            `INSERT INTO orders (id, customer_id, total, created_at, status)
             VALUES ($1, $2, $3, $4, 'pending')`,
            orderID, input.CustomerID, input.Total, now,
        )
        if err != nil {
            return nil, fmt.Errorf("failed to insert order: %w", err)
        }

        // Insert order items
        for _, item := range input.Items {
            _, err = tx.ExecContext(ctx,
                `INSERT INTO order_items (order_id, sku, quantity, price)
                 VALUES ($1, $2, $3, $4)`,
                orderID, item.SKU, item.Quantity, item.Price,
            )
            if err != nil {
                return nil, fmt.Errorf("failed to insert order item: %w", err)
            }
        }

        // Commit transaction
        if err = tx.Commit(); err != nil {
            return nil, fmt.Errorf("failed to commit transaction: %w", err)
        }

        return &CreateOrderOutput{
            OrderID:   orderID,
            CreatedAt: now,
        }, nil
    }

    // Usage in workflow
    // - name: CreateOrder
    //   type: Task
    //   resource: urn:cascade:activity:create_order
    //   parameters:
    //     customer_id: "{{ $.customer_id }}"
    //     items: "{{ $.items }}"
    //     total: "{{ $.total }}"
    //   result: $.order
    //   timeout: 20s
    //   next: ProcessPayment

Critical: Idempotency

    // PROBLEM: CreateOrder is not idempotent.
    // If the activity retries, it generates a new ID and creates a duplicate order.

    // SOLUTION: Idempotent version.
    // Assumes CreateOrderInput gains an OrderID field (`json:"order_id"`) set by the workflow.
    // Needs "database/sql" and "errors" in addition to the imports used by CreateOrder.
    func CreateOrderIdempotent(ctx context.Context, input *CreateOrderInput) (*CreateOrderOutput, error) {
        orderID := input.OrderID // Use provided ID (same on retry)
        db := database.FromContext(ctx)

        // Check if order already exists (from previous attempt)
        var createdAt time.Time
        err := db.QueryRowContext(ctx,
            `SELECT created_at FROM orders WHERE id = $1`,
            orderID,
        ).Scan(&createdAt)
        if err == nil {
            // Order already exists, return the stored result
            return &CreateOrderOutput{OrderID: orderID, CreatedAt: createdAt}, nil
        }
        if !errors.Is(err, sql.ErrNoRows) {
            // A real database failure must not be mistaken for "order missing"
            return nil, fmt.Errorf("failed to check for existing order: %w", err)
        }

        // Order doesn't exist, create it
        // ... insert logic ...

        return &CreateOrderOutput{OrderID: orderID, CreatedAt: time.Now()}, nil
    }

    // In CDL, pass order_id from workflow:
    // - name: CreateOrder
    //   type: Task
    //   parameters:
    //     order_id: "{{ workflow.execution_id }}"  # Use unique ID
    //     customer_id: "{{ $.customer_id }}"

Pattern 4: API Integration Activity

Use: Call external services

Example: Charge Payment

    package activities

    import (
        "context"
        "errors"
        "net"
        "os"

        "github.com/stripe/stripe-go/v72"
        "github.com/stripe/stripe-go/v72/charge"
        "go.temporal.io/sdk/temporal"
    )

    type ChargePaymentInput struct {
        Amount         int64  `json:"amount"`          // Amount in cents
        CardToken      string `json:"card_token"`      // Stripe token
        IdempotencyKey string `json:"idempotency_key"` // For safe retries
    }

    type ChargePaymentOutput struct {
        ChargeID string `json:"charge_id"`
        Status   string `json:"status"`
        Amount   int64  `json:"amount"`
    }

    func ChargePayment(ctx context.Context, input *ChargePaymentInput) (*ChargePaymentOutput, error) {
        // Set API key from the environment rather than hardcoding it.
        // The variable name STRIPE_SECRET_KEY is an assumption.
        stripe.Key = os.Getenv("STRIPE_SECRET_KEY")

        // Create charge with idempotency key
        params := &stripe.ChargeParams{
            Amount:   stripe.Int64(input.Amount),
            Currency: stripe.String("usd"),
            Source:   &stripe.SourceParams{Token: stripe.String(input.CardToken)},
        }
        params.Context = ctx
        params.SetIdempotencyKey(input.IdempotencyKey) // Safe retries

        chargeResult, err := charge.New(params)
        if err != nil {
            // Check if error is transient (should retry)
            if isTransientError(err) {
                return nil, err // Temporal will retry
            }
            // Non-transient error: mark it non-retryable so retries stop.
            // Assumes Cascade honors Temporal's non-retryable application errors.
            return nil, temporal.NewNonRetryableApplicationError("charge failed", "ChargeFailed", err)
        }

        return &ChargePaymentOutput{
            ChargeID: chargeResult.ID,
            Status:   string(chargeResult.Status),
            Amount:   chargeResult.Amount,
        }, nil
    }

    func isTransientError(err error) bool {
        // Network errors and timeouts are transient
        var netErr net.Error
        if errors.As(err, &netErr) {
            return true
        }
        // Stripe rate limits (429) and server errors (5xx) are transient
        var stripeErr *stripe.Error
        if errors.As(err, &stripeErr) {
            return stripeErr.HTTPStatusCode == 429 || stripeErr.HTTPStatusCode >= 500
        }
        return false
    }

    // Usage in workflow
    // - name: ProcessPayment
    //   type: Task
    //   resource: urn:cascade:activity:charge_payment
    //   parameters:
    //     amount: "{{ $.order.total * 100 }}"
    //     card_token: "{{ workflow.input.card_token }}"
    //     idempotency_key: "{{ workflow.execution_id }}-payment"
    //   result: $.payment
    //   retries:
    //     max_attempts: 3
    //     backoff:
    //       initial_interval: 2s
    //       max_interval: 30s
    //       multiplier: 2
    //   timeout: 30s

Key points:

  • Use idempotency keys for safe retries
  • Distinguish transient vs permanent errors
  • Set reasonable timeouts
  • Log external API calls for debugging

Pattern 5: Event Publishing Activity

Use: Publish events to event bus

Example: Send Email

    package activities

    import (
        "context"
        "fmt"

        "github.com/cascade-platform/sdk/events"
    )

    type SendEmailInput struct {
        To       string `json:"to"`
        Subject  string `json:"subject"`
        Body     string `json:"body"`
        Template string `json:"template"` // optional
    }

    type SendEmailOutput struct {
        EmailID string `json:"email_id"`
        Status  string `json:"status"` // queued, sent, failed
    }

    func SendEmail(ctx context.Context, input *SendEmailInput) (*SendEmailOutput, error) {
        eventBus := events.FromContext(ctx)

        // Create event
        emailEvent := map[string]interface{}{
            "to":       input.To,
            "subject":  input.Subject,
            "body":     input.Body,
            "template": input.Template,
        }

        // Publish event
        emailID, err := eventBus.PublishEvent(ctx, "email.send", emailEvent)
        if err != nil {
            return nil, fmt.Errorf("failed to publish email event: %w", err)
        }

        return &SendEmailOutput{
            EmailID: emailID,
            Status:  "queued",
        }, nil
    }

    // Usage in workflow
    // - name: SendConfirmation
    //   type: Task
    //   resource: urn:cascade:activity:send_email
    //   parameters:
    //     to: "{{ $.customer.email }}"
    //     subject: "Order Confirmed"
    //     body: "Your order {{ $.order_id }} has been confirmed"
    //   result: $.email
    //   timeout: 5s

Real-World Activities

Activity 1: Database Query with Caching

    package activities

    import (
        "context"
        "database/sql"
        "encoding/json"
        "errors"
        "fmt"
        "time"

        "github.com/cascade-platform/sdk/cache"
        "github.com/cascade-platform/sdk/database"
    )

    type GetProductInput struct {
        SKU string `json:"sku"`
    }

    type Product struct {
        SKU      string  `json:"sku"`
        Name     string  `json:"name"`
        Price    float64 `json:"price"`
        InStock  bool    `json:"in_stock"`
        Quantity int     `json:"quantity"`
    }

    func GetProduct(ctx context.Context, input *GetProductInput) (*Product, error) {
        // Named c so the variable does not shadow the cache package
        c := cache.FromContext(ctx)
        cacheKey := fmt.Sprintf("product:%s", input.SKU)

        // Try cache first
        if cached, err := c.Get(ctx, cacheKey); err == nil {
            var product Product
            if err := json.Unmarshal(cached, &product); err == nil {
                return &product, nil
            }
        }

        // Cache miss, query database
        db := database.FromContext(ctx)
        var product Product
        err := db.QueryRowContext(ctx,
            `SELECT sku, name, price, quantity > 0 as in_stock, quantity
             FROM products WHERE sku = $1`,
            input.SKU,
        ).Scan(&product.SKU, &product.Name, &product.Price, &product.InStock, &product.Quantity)
        if errors.Is(err, sql.ErrNoRows) {
            return nil, fmt.Errorf("product not found: %s", input.SKU)
        }
        if err != nil {
            return nil, fmt.Errorf("database error: %w", err)
        }

        // Store in cache (5 min TTL); a cache failure is not fatal to the activity
        if data, err := json.Marshal(product); err == nil {
            c.Set(ctx, cacheKey, data, 5*time.Minute)
        }

        return &product, nil
    }

Activity 2: API Integration with Retry Logic

    package activities

    import (
        "context"
        "fmt"
        "time"

        "github.com/cenkalti/backoff/v4"
    )

    type VerifyAddressInput struct {
        Street string `json:"street"`
        City   string `json:"city"`
        State  string `json:"state"`
        Zip    string `json:"zip"`
    }

    type VerifyAddressOutput struct {
        Valid     bool    `json:"valid"`
        Corrected string  `json:"corrected"`
        Latitude  float64 `json:"latitude"`
        Longitude float64 `json:"longitude"`
    }

    func VerifyAddress(ctx context.Context, input *VerifyAddressInput) (*VerifyAddressOutput, error) {
        // Use exponential backoff for retries
        expBackoff := backoff.NewExponentialBackOff()
        expBackoff.MaxElapsedTime = 20 * time.Second

        var result *VerifyAddressOutput
        err := backoff.RetryNotify(func() error {
            // Call external address verification API
            output, err := callAddressAPI(ctx, input)
            if err != nil {
                return err // Retry
            }
            result = output
            return nil
        },
            // WithContext stops retrying as soon as the activity context is cancelled
            backoff.WithContext(expBackoff, ctx),
            func(err error, duration time.Duration) {
                fmt.Printf("Address verification failed, retrying in %v: %v\n", duration, err)
            })
        if err != nil {
            return nil, fmt.Errorf("address verification failed: %w", err)
        }
        return result, nil
    }

    func callAddressAPI(ctx context.Context, input *VerifyAddressInput) (*VerifyAddressOutput, error) {
        // Implementation would call SmartyStreets, USPS, etc.
        // ...
        return &VerifyAddressOutput{Valid: true}, nil
    }

Best Practices

✅ DO:

  • Make activities idempotent - Safe to retry with same input
  • Return errors explicitly - Don't swallow errors
  • Use structured I/O - JSON-serializable structs
  • Send heartbeats - Signal that a long-running activity is still alive
  • Use context properly - Pass context to all I/O operations
  • Log meaningfully - Include input/output for debugging
  • Test thoroughly - Unit + integration tests
  • Use transactions - For multi-step database operations
  • Handle timeouts - Graceful degradation

❌ DON’T:

  • Call other activities - Activities are leaf functions
  • Ignore retry safety - Use idempotency keys/deduplication
  • Create infinite loops - Add timeout conditions
  • Block indefinitely - Always set timeouts
  • Store state between calls - Activities are stateless
  • Assume external systems work - Always handle failures
  • Log sensitive data - Never log credentials/tokens
  • Use hardcoded values - Pass parameters from workflow

Testing Activities

Unit Test Pattern

    package activities

    import (
        "context"
        "testing"

        "github.com/stretchr/testify/assert"
    )

    func TestTransformText(t *testing.T) {
        tests := []struct {
            name    string
            input   TransformInput
            want    string
            wantErr bool
        }{
            {
                name:    "uppercase",
                input:   TransformInput{Text: "hello", Operation: "upper"},
                want:    "HELLO",
                wantErr: false,
            },
            {
                name:    "lowercase",
                input:   TransformInput{Text: "HELLO", Operation: "lower"},
                want:    "hello",
                wantErr: false,
            },
            {
                name:    "invalid operation",
                input:   TransformInput{Text: "hello", Operation: "invalid"},
                want:    "",
                wantErr: true,
            },
        }

        for _, tt := range tests {
            t.Run(tt.name, func(t *testing.T) {
                result, err := TransformText(context.Background(), &tt.input)
                if tt.wantErr {
                    assert.Error(t, err)
                    return
                }
                assert.NoError(t, err)
                assert.Equal(t, tt.want, result.Result)
            })
        }
    }

Integration Test Pattern

    // setupTestDB is a project-specific helper (not shown) that returns a test database.
    func TestLoadCustomer(t *testing.T) {
        // Setup test database
        db := setupTestDB(t)
        defer db.Close()

        // Insert test data
        _, err := db.Exec(`INSERT INTO customers (id, name, email, tier, balance, status)
            VALUES ('cust-123', 'Alice', 'alice@example.com', 'gold', 1000.00, 'active')`)
        assert.NoError(t, err)

        // Create mock context
        ctx := context.Background()
        ctx = database.WithContext(ctx, db)

        // Call activity
        result, err := LoadCustomer(ctx, &LoadCustomerInput{CustomerID: "cust-123"})
        if !assert.NoError(t, err) {
            return // result is nil on error; stop before dereferencing it
        }
        assert.Equal(t, "Alice", result.Name)
        assert.Equal(t, "gold", result.Tier)
    }

Activity Performance

Latency Guidelines

Activity Type    Target    Notes
Computation      <5ms      Pure functions
DB Query         <50ms     Well-indexed queries
DB Write         <100ms    With transaction
API Call         <500ms    Depends on external service
File I/O         <200ms    Local filesystem

Optimization Tips

  • Cache frequently accessed data
  • Use connection pooling
  • Index database queries
  • Batch operations when possible
  • Use goroutines for parallelism (carefully!)

Next Steps

Ready to register activities? See the Workflow Development Guide.

Need schema help? See the Schema Development Guide.

Testing strategies? See the Testing & Debugging Guide.


Updated: October 29, 2025
Version: 1.0
Patterns: 5 proven designs
