Activity Development Guide
For: Go developers writing Cascade activities
Level: Intermediate → Advanced
Time to read: 30 minutes
Examples: 15+ complete activities
This guide shows how to write production-grade activities in Go that integrate seamlessly with Cascade workflows.
Activity Fundamentals
What is an Activity?
An activity is a Go function that:
- Performs business logic or external I/O
- Takes structured input
- Returns structured output
- Can be retried automatically
- Must be idempotent (safe to retry)
- Is referenced by URN in CDL
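The "structured input" and "structured output" points can be made concrete with a small round trip. The sketch below assumes payloads cross the workflow boundary as JSON, which the `json` struct tags used throughout this guide suggest. `ScaleInput`, `ScaleOutput`, `Scale`, and `invokeJSON` are hypothetical names used only for illustration, not part of any SDK.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// ScaleInput is a hypothetical activity input; the json tags define the wire names.
type ScaleInput struct {
	Value  int `json:"value"`
	Factor int `json:"factor"`
}

// ScaleOutput is the matching hypothetical output.
type ScaleOutput struct {
	Result int `json:"result"`
}

// Scale has the shape of an activity: structured input in, structured output out.
func Scale(ctx context.Context, in *ScaleInput) (*ScaleOutput, error) {
	return &ScaleOutput{Result: in.Value * in.Factor}, nil
}

// invokeJSON mimics what a runtime might do around an activity: decode the
// payload into the input struct, call the function, encode the output.
func invokeJSON(payload string) (string, error) {
	var in ScaleInput
	if err := json.Unmarshal([]byte(payload), &in); err != nil {
		return "", fmt.Errorf("decode input: %w", err)
	}
	out, err := Scale(context.Background(), &in)
	if err != nil {
		return "", err
	}
	encoded, err := json.Marshal(out)
	if err != nil {
		return "", fmt.Errorf("encode output: %w", err)
	}
	return string(encoded), nil
}

func main() {
	out, err := invokeJSON(`{"value": 6, "factor": 7}`)
	fmt.Println(out, err) // {"result":42} <nil>
}
```

Because the function depends only on its input struct, calling it again with the same payload yields the same output, which is what makes a retry harmless.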
Activity Lifecycle
┌──────────────────────────────────────┐
│ 1. Workflow calls activity           │
│    (Task state executes)             │
└──────────────────┬───────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 2. Activity receives input           │
│    - Deserialized from context       │
│    - Type-safe struct                │
└──────────────────┬───────────────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 3. Activity executes (your code)     │
│    - Perform work (I/O, compute)     │
│    - Can access databases, APIs      │
│    - May fail (recoverable)          │
└──────────────────┬───────────────────┘
                   │
         ┌─────────┴─────────┐
         │                   │
         ▼                   ▼
    ✅ SUCCESS           ❌ ERROR
         │                   │
         ├─ Return result    ├─ Return error
         │  Stored in        │  Temporal retries
         │  context          │  (if transient)
         │                   │
         └─────────┬─────────┘
                   │
                   ▼
┌──────────────────────────────────────┐
│ 4. Workflow continues                │
│    - Uses result from context        │
│    - Proceeds to next state          │
└──────────────────────────────────────┘

Minimal Activity
package activities

import (
	"context"
)

// Input structure
type GreetInput struct {
	Name string `json:"name"`
}

// Output structure
type GreetOutput struct {
	Message string `json:"message"`
}

// Activity function
func Greet(ctx context.Context, input *GreetInput) (*GreetOutput, error) {
	return &GreetOutput{
		Message: "Hello, " + input.Name,
	}, nil
}

Call it from a workflow:

// Inside a workflow function, where ctx is a workflow.Context
activityOptions := workflow.ActivityOptions{
	StartToCloseTimeout: 30 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityOptions)

var result GreetOutput
err := workflow.ExecuteActivity(ctx, Greet, &GreetInput{Name: "Alice"}).Get(ctx, &result)

Activity Design Patterns
Pattern 1: Stateless Computation
Use: Pure functions with no side effects
Example: Data Transformation
package activities

import (
	"context"
	"fmt"
	"strings"
)

type TransformInput struct {
	Text      string `json:"text"`
	Operation string `json:"operation"` // "upper", "lower", "title"
}

type TransformOutput struct {
	Result string `json:"result"`
}

// Pure function - no I/O, no side effects
func TransformText(ctx context.Context, input *TransformInput) (*TransformOutput, error) {
	var result string
	switch input.Operation {
	case "upper":
		result = strings.ToUpper(input.Text)
	case "lower":
		result = strings.ToLower(input.Text)
	case "title":
		// strings.Title is deprecated since Go 1.18; golang.org/x/text/cases is the suggested replacement
		result = strings.Title(input.Text)
	default:
		return nil, fmt.Errorf("unknown operation: %s", input.Operation)
	}
	return &TransformOutput{Result: result}, nil
}

// Usage in workflow
type TransformRequest struct {
	Input TransformInput `json:"input"`
}

// In workflow:
// - name: TransformData
//   type: Task
//   resource: urn:cascade:activity:transform_text
//   parameters:
//     text: "{{ $.user_input }}"
//     operation: "upper"
//   result: $.transformed
//   timeout: 5s

Characteristics:
- No external dependencies
- Deterministic (same input = same output)
- Fast (<1ms typically)
- Always succeeds or returns error
- Perfectly safe to retry
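One caveat for the `title` branch of `TransformText` above: `strings.Title` has been deprecated since Go 1.18 because it does not handle Unicode word boundaries correctly, and the standard library documentation points to `golang.org/x/text/cases` instead. Where adding that module is not wanted, a small pure helper keeps the activity deterministic. The sketch below is a simplified stand-in, not an equivalent replacement: `titleCase` is a hypothetical name, it splits on whitespace only, and it collapses repeated spaces.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// titleCase upper-cases the first rune of each whitespace-separated word.
// strings.Fields never returns empty strings, so indexing r[0] is safe.
// Runs of whitespace are collapsed to a single space in the result.
func titleCase(s string) string {
	words := strings.Fields(s)
	for i, w := range words {
		r := []rune(w)
		r[0] = unicode.ToUpper(r[0])
		words[i] = string(r)
	}
	return strings.Join(words, " ")
}

func main() {
	fmt.Println(titleCase("hello cascade world")) // Hello Cascade World
}
```

The helper reads nothing but its argument, so it keeps the properties listed above: no external dependencies and the same output for the same input.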
Pattern 2: Database Query Activity
Use: Fetch data from database
Example: Load Customer
package activities

import (
	"context"
	"database/sql"
	"errors"
	"fmt"

	"github.com/cascade-platform/sdk/database"
)

type LoadCustomerInput struct {
	CustomerID string `json:"customer_id"`
}

type Customer struct {
	ID      string  `json:"id"`
	Name    string  `json:"name"`
	Email   string  `json:"email"`
	Tier    string  `json:"tier"` // gold, silver, bronze
	Balance float64 `json:"balance"`
	Status  string  `json:"status"` // active, inactive
}

func LoadCustomer(ctx context.Context, input *LoadCustomerInput) (*Customer, error) {
	// Get database connection from context
	db := database.FromContext(ctx)

	// Query database
	var customer Customer
	err := db.QueryRowContext(ctx,
		`SELECT id, name, email, tier, balance, status
		 FROM customers WHERE id = $1`,
		input.CustomerID,
	).Scan(&customer.ID, &customer.Name, &customer.Email, &customer.Tier, &customer.Balance, &customer.Status)
	// errors.Is also matches when a driver or wrapper has wrapped sql.ErrNoRows
	if errors.Is(err, sql.ErrNoRows) {
		return nil, fmt.Errorf("customer not found: %s", input.CustomerID)
	}
	if err != nil {
		return nil, fmt.Errorf("database error: %w", err)
	}
	return &customer, nil
}

// Usage in workflow
// - name: GetCustomerInfo
//   type: Task
//   resource: urn:cascade:activity:load_customer
//   parameters:
//     customer_id: "{{ workflow.input.customer_id }}"
//   result: $.customer
//   retries:
//     max_attempts: 2
//   timeout: 10s
//   next: ProcessCustomer

Best practices:
- Use parameterized queries (prevent SQL injection)
- Set appropriate timeout in CDL
- Return meaningful errors for debugging
- Consider caching for frequently accessed data
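"Return meaningful errors" is easier to act on when the error is also machine-checkable. One common Go approach is a sentinel error wrapped with context, which callers test with `errors.Is`. The sketch below is illustrative only: `ErrCustomerNotFound`, `lookupName`, `classify`, and the in-memory `customers` map are hypothetical stand-ins for the database-backed code above.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrCustomerNotFound is a sentinel that callers can test for with errors.Is.
var ErrCustomerNotFound = errors.New("customer not found")

// customers is a hypothetical in-memory stand-in for the customers table.
var customers = map[string]string{"cust-123": "Alice"}

// lookupName wraps the sentinel with the ID, so the message stays useful
// for debugging while the error remains machine-checkable.
func lookupName(id string) (string, error) {
	name, ok := customers[id]
	if !ok {
		return "", fmt.Errorf("%w: %s", ErrCustomerNotFound, id)
	}
	return name, nil
}

// classify maps an error to a coarse category a caller could branch on.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrCustomerNotFound):
		return "not_found"
	default:
		return "internal"
	}
}

func main() {
	_, err := lookupName("cust-999")
	fmt.Println(classify(err), "-", err) // not_found - customer not found: cust-999
}
```

A missing customer is a permanent condition, while a dropped connection is not, so keeping the two distinguishable helps when deciding which failures are worth retrying.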
Pattern 3: Database Write Activity
Use: Insert or update data durably
Example: Create Order
package activities

import (
	"context"
	"fmt"
	"time"

	"github.com/cascade-platform/sdk/database"
	"github.com/google/uuid"
)

type CreateOrderInput struct {
	CustomerID string      `json:"customer_id"`
	Items      []OrderItem `json:"items"`
	Total      float64     `json:"total"`
}

type OrderItem struct {
	SKU      string  `json:"sku"`
	Quantity int     `json:"quantity"`
	Price    float64 `json:"price"`
}

type CreateOrderOutput struct {
	OrderID   string    `json:"order_id"`
	CreatedAt time.Time `json:"created_at"`
}

func CreateOrder(ctx context.Context, input *CreateOrderInput) (*CreateOrderOutput, error) {
	db := database.FromContext(ctx)

	// Generate order ID
	orderID := uuid.New().String()
	now := time.Now()

	// Start transaction
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to start transaction: %w", err)
	}
	defer tx.Rollback() // Rolls back on early return; has no effect after a successful Commit

	// Insert order header
	_, err = tx.ExecContext(ctx,
		`INSERT INTO orders (id, customer_id, total, created_at, status)
		 VALUES ($1, $2, $3, $4, 'pending')`,
		orderID, input.CustomerID, input.Total, now,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to insert order: %w", err)
	}

	// Insert order items
	for _, item := range input.Items {
		_, err = tx.ExecContext(ctx,
			`INSERT INTO order_items (order_id, sku, quantity, price)
			 VALUES ($1, $2, $3, $4)`,
			orderID, item.SKU, item.Quantity, item.Price,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to insert order item: %w", err)
		}
	}

	// Commit transaction
	if err = tx.Commit(); err != nil {
		return nil, fmt.Errorf("failed to commit transaction: %w", err)
	}
	return &CreateOrderOutput{
		OrderID:   orderID,
		CreatedAt: now,
	}, nil
}

// Usage in workflow
// - name: CreateOrder
//   type: Task
//   resource: urn:cascade:activity:create_order
//   parameters:
//     customer_id: "{{ $.customer_id }}"
//     items: "{{ $.items }}"
//     total: "{{ $.total }}"
//   result: $.order
//   timeout: 20s
//   next: ProcessPayment

Critical: Idempotency
// PROBLEM: CreateOrder above is not idempotent.
// It generates a fresh order ID on every attempt, so a retry creates a duplicate order.

// SOLUTION: Idempotent version.
// Assumes CreateOrderInput gains one extra field supplied by the workflow:
//     OrderID string `json:"order_id"`
// Additional imports needed: "database/sql", "errors"
func CreateOrderIdempotent(ctx context.Context, input *CreateOrderInput) (*CreateOrderOutput, error) {
	orderID := input.OrderID // Use provided ID (same on retry)
	db := database.FromContext(ctx)

	// Check if order already exists (from previous attempt)
	var createdAt time.Time
	err := db.QueryRowContext(ctx,
		`SELECT created_at FROM orders WHERE id = $1`,
		orderID,
	).Scan(&createdAt)
	if err == nil {
		// Order already exists, return the original result
		return &CreateOrderOutput{OrderID: orderID, CreatedAt: createdAt}, nil
	}
	if !errors.Is(err, sql.ErrNoRows) {
		// A real database failure, not a missing row
		return nil, fmt.Errorf("failed to check for existing order: %w", err)
	}

	// Order doesn't exist, create it
	now := time.Now()
	// ... insert logic ...
	return &CreateOrderOutput{OrderID: orderID, CreatedAt: now}, nil
}

// In CDL, pass order_id from workflow:
// - name: CreateOrder
//   type: Task
//   parameters:
//     order_id: "{{ workflow.execution_id }}" # Use unique ID
//     customer_id: "{{ $.customer_id }}"

Pattern 4: API Integration Activity
Use: Call external services
Example: Charge Payment
package activities

import (
	"context"
	"errors"
	"fmt"
	"net"

	"github.com/stripe/stripe-go/v72"
	"github.com/stripe/stripe-go/v72/charge"
)

type ChargePaymentInput struct {
	Amount         int64  `json:"amount"`          // Amount in cents
	CardToken      string `json:"card_token"`      // Stripe token
	IdempotencyKey string `json:"idempotency_key"` // For safe retries
}

type ChargePaymentOutput struct {
	ChargeID string `json:"charge_id"`
	Status   string `json:"status"`
	Amount   int64  `json:"amount"`
}

func ChargePayment(ctx context.Context, input *ChargePaymentInput) (*ChargePaymentOutput, error) {
	// Set API key (in production, load it from configuration or a secret store instead of hardcoding)
	stripe.Key = "sk_test_..."

	// Create charge with idempotency key
	params := &stripe.ChargeParams{
		Amount:   stripe.Int64(input.Amount),
		Currency: stripe.String("usd"),
		Source:   &stripe.SourceParams{Token: stripe.String(input.CardToken)},
	}
	params.SetIdempotencyKey(input.IdempotencyKey) // Safe retries

	chargeResult, err := charge.New(params)
	if err != nil {
		// Check if error is transient (should retry)
		if isTransientError(err) {
			return nil, err // Temporal will retry
		}
		// Non-transient error. Wrapping alone does not stop retries:
		// also mark it non-retryable in the retry policy or via the SDK's error type.
		return nil, fmt.Errorf("charge failed: %w", err)
	}
	return &ChargePaymentOutput{
		ChargeID: chargeResult.ID,
		Status:   string(chargeResult.Status),
		Amount:   chargeResult.Amount,
	}, nil
}

func isTransientError(err error) bool {
	// Network timeouts are transient
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return true
	}
	// Rate limits (HTTP 429) and server-side faults (5xx) are transient
	var stripeErr *stripe.Error
	if errors.As(err, &stripeErr) {
		return stripeErr.HTTPStatusCode == 429 || stripeErr.HTTPStatusCode >= 500
	}
	return false
}

// Usage in workflow
// - name: ProcessPayment
//   type: Task
//   resource: urn:cascade:activity:charge_payment
//   parameters:
//     amount: "{{ $.order.total * 100 }}"
//     card_token: "{{ workflow.input.card_token }}"
//     idempotency_key: "{{ workflow.execution_id }}-payment"
//   result: $.payment
//   retries:
//     max_attempts: 3
//     backoff:
//       initial_interval: 2s
//       max_interval: 30s
//       multiplier: 2
//   timeout: 30s

Key points:
- Use idempotency keys for safe retries
- Distinguish transient vs permanent errors
- Set reasonable timeouts
- Log external API calls for debugging
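The `retries` block in the CDL above can be read as a schedule of waits between attempts. The sketch below assumes the usual capped exponential interpretation: each wait is the previous one times `multiplier`, never above `max_interval`. That is how Temporal retry policies behave, but whether Cascade applies exactly this rule is an assumption, and `backoffSeconds` is a hypothetical helper written for this illustration.

```go
package main

import "fmt"

// backoffSeconds returns the wait before each retry, in seconds.
// With maxAttempts total attempts there are maxAttempts-1 waits.
func backoffSeconds(initial, maxInterval, multiplier float64, maxAttempts int) []float64 {
	var waits []float64
	wait := initial
	for attempt := 1; attempt < maxAttempts; attempt++ {
		if wait > maxInterval {
			wait = maxInterval // cap the interval
		}
		waits = append(waits, wait)
		wait *= multiplier
	}
	return waits
}

func main() {
	// Values from the CDL above: 2s initial, 30s cap, multiplier 2, 3 attempts.
	fmt.Println(backoffSeconds(2, 30, 2, 3)) // [2 4]
	// A longer policy shows the cap taking effect.
	fmt.Println(backoffSeconds(2, 30, 2, 7)) // [2 4 8 16 30 30]
}
```

Under that reading, the payment example waits at most 6 seconds in total across its two retries, which leaves room inside the 30s timeout only if that timeout applies per attempt; check how Cascade scopes `timeout` before relying on it.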
Pattern 5: Event Publishing Activity
Use: Publish events to event bus
Example: Send Email
package activities

import (
	"context"
	"fmt"

	"github.com/cascade-platform/sdk/events"
)

type SendEmailInput struct {
	To       string `json:"to"`
	Subject  string `json:"subject"`
	Body     string `json:"body"`
	Template string `json:"template"` // optional
}

type SendEmailOutput struct {
	EmailID string `json:"email_id"`
	Status  string `json:"status"` // queued, sent, failed
}

func SendEmail(ctx context.Context, input *SendEmailInput) (*SendEmailOutput, error) {
	eventBus := events.FromContext(ctx)

	// Create event
	emailEvent := map[string]interface{}{
		"to":       input.To,
		"subject":  input.Subject,
		"body":     input.Body,
		"template": input.Template,
	}

	// Publish event
	emailID, err := eventBus.PublishEvent(ctx, "email.send", emailEvent)
	if err != nil {
		return nil, fmt.Errorf("failed to publish email event: %w", err)
	}
	return &SendEmailOutput{
		EmailID: emailID,
		Status:  "queued",
	}, nil
}

// Usage in workflow
// - name: SendConfirmation
//   type: Task
//   resource: urn:cascade:activity:send_email
//   parameters:
//     to: "{{ $.customer.email }}"
//     subject: "Order Confirmed"
//     body: "Your order {{ $.order_id }} has been confirmed"
//   result: $.email
//   timeout: 5s

Real-World Activities
Activity 1: Database Query with Caching
package activities

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/cascade-platform/sdk/cache"
	"github.com/cascade-platform/sdk/database"
)

type GetProductInput struct {
	SKU string `json:"sku"`
}

type Product struct {
	SKU      string  `json:"sku"`
	Name     string  `json:"name"`
	Price    float64 `json:"price"`
	InStock  bool    `json:"in_stock"`
	Quantity int     `json:"quantity"`
}

func GetProduct(ctx context.Context, input *GetProductInput) (*Product, error) {
	c := cache.FromContext(ctx) // named c so the cache package is not shadowed
	cacheKey := fmt.Sprintf("product:%s", input.SKU)

	// Try cache first
	if cached, err := c.Get(ctx, cacheKey); err == nil {
		var product Product
		if err := json.Unmarshal(cached, &product); err == nil {
			return &product, nil
		}
	}

	// Cache miss, query database
	db := database.FromContext(ctx)
	var product Product
	err := db.QueryRowContext(ctx,
		`SELECT sku, name, price, quantity > 0 as in_stock, quantity
		 FROM products WHERE sku = $1`,
		input.SKU,
	).Scan(&product.SKU, &product.Name, &product.Price, &product.InStock, &product.Quantity)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, fmt.Errorf("product not found: %s", input.SKU)
	}
	if err != nil {
		return nil, fmt.Errorf("database error: %w", err)
	}

	// Store in cache (5 min TTL); caching is best effort, so a failure here is not fatal
	if data, err := json.Marshal(product); err == nil {
		c.Set(ctx, cacheKey, data, 5*time.Minute)
	}
	return &product, nil
}

Activity 2: API Integration with Retry Logic
package activities

import (
	"context"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

type VerifyAddressInput struct {
	Street string `json:"street"`
	City   string `json:"city"`
	State  string `json:"state"`
	Zip    string `json:"zip"`
}

type VerifyAddressOutput struct {
	Valid     bool    `json:"valid"`
	Corrected string  `json:"corrected"`
	Latitude  float64 `json:"latitude"`
	Longitude float64 `json:"longitude"`
}

func VerifyAddress(ctx context.Context, input *VerifyAddressInput) (*VerifyAddressOutput, error) {
	// Use exponential backoff for retries.
	// Keep MaxElapsedTime below the activity timeout set in CDL.
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.MaxElapsedTime = 20 * time.Second
	// Stop retrying as soon as the activity context is cancelled
	retryPolicy := backoff.WithContext(expBackoff, ctx)

	var result *VerifyAddressOutput
	err := backoff.RetryNotify(func() error {
		// Call external address verification API
		output, err := callAddressAPI(ctx, input)
		if err != nil {
			return err // Retry
		}
		result = output
		return nil
	}, retryPolicy, func(err error, duration time.Duration) {
		fmt.Printf("Address verification failed, retrying in %v: %v\n", duration, err)
	})
	if err != nil {
		return nil, fmt.Errorf("address verification failed: %w", err)
	}
	return result, nil
}

func callAddressAPI(ctx context.Context, input *VerifyAddressInput) (*VerifyAddressOutput, error) {
	// Implementation would call SmartyStreets, USPS, etc.
	// ...
	return &VerifyAddressOutput{Valid: true}, nil
}

Best Practices
✅ DO:
- Make activities idempotent - Safe to retry with same input
- Return errors explicitly - Don’t swallow errors
- Use structured I/O - JSON-serializable structs
- Set heartbeats - Signal activity is alive (long-running)
- Use context properly - Pass context to all I/O operations
- Log meaningfully - Include input/output for debugging
- Test thoroughly - Unit + integration tests
- Use transactions - For multi-step database operations
- Handle timeouts - Graceful degradation
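The heartbeat and context items above tend to appear together in long-running activities: the loop checks for cancellation between units of work and reports progress as it goes. The sketch below uses only the standard library. The `heartbeat` callback is a hypothetical stand-in for the SDK's heartbeat call (in the Temporal Go SDK that role is played by `activity.RecordHeartbeat`; whether Cascade exposes it directly is an assumption), and `processItems` and `runDemo` are illustrative names.

```go
package main

import (
	"context"
	"fmt"
)

// processItems handles items one at a time. Before each item it checks for
// cancellation, and after each item it reports progress through heartbeat.
// It returns how many items were completed.
func processItems(ctx context.Context, items []string, heartbeat func(done int)) (int, error) {
	for i, item := range items {
		select {
		case <-ctx.Done():
			return i, ctx.Err() // stop promptly, reporting partial progress
		default:
		}
		_ = item // placeholder for the real per-item work
		heartbeat(i + 1)
	}
	return len(items), nil
}

// runDemo cancels the context after the second heartbeat to show the early exit.
func runDemo() (processed int, beats int, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	heartbeat := func(done int) {
		beats++
		if done == 2 {
			cancel() // simulate the workflow cancelling the activity
		}
	}
	processed, err = processItems(ctx, []string{"a", "b", "c", "d"}, heartbeat)
	return processed, beats, err
}

func main() {
	fmt.Println(runDemo()) // 2 2 context canceled
}
```

Returning the count of completed items alongside the error gives a retry the information it needs to resume rather than start over, provided the work itself is idempotent.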
❌ DON’T:
- Call other activities - Activities are leaf functions
- Ignore retry safety - Use idempotency keys/deduplication
- Create infinite loops - Add timeout conditions
- Block indefinitely - Always set timeouts
- Store state between calls - Activities are stateless
- Assume external systems work - Always handle failures
- Log sensitive data - Never log credentials/tokens
- Use hardcoded values - Pass parameters from workflow
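The "idempotency keys/deduplication" advice can be reduced to a small mechanism: remember the result produced for a key and replay it when the same key arrives again. The sketch below keeps results in memory purely for illustration; a real activity would need a durable store shared across workers. `resultStore`, `newResultStore`, and `once` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// resultStore remembers the result produced for each idempotency key.
// The map is for illustration; it does not survive a process restart.
type resultStore struct {
	mu      sync.Mutex
	results map[string]string
}

func newResultStore() *resultStore {
	return &resultStore{results: make(map[string]string)}
}

// once runs fn the first time a key is seen and replays the stored result
// afterwards. Holding the lock while fn runs is simple but serializes all keys.
func (s *resultStore) once(key string, fn func() (string, error)) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if res, ok := s.results[key]; ok {
		return res, nil
	}
	res, err := fn()
	if err != nil {
		return "", err // failures are not recorded, so a retry runs fn again
	}
	s.results[key] = res
	return res, nil
}

func main() {
	store := newResultStore()
	charges := 0
	charge := func() (string, error) {
		charges++
		return fmt.Sprintf("charge-%d", charges), nil
	}
	first, _ := store.once("exec-42-payment", charge)
	retry, _ := store.once("exec-42-payment", charge)
	fmt.Println(first, retry, charges) // charge-1 charge-1 1
}
```

The key must be stable across retries of the same logical operation, which is why the CDL examples in this guide derive it from the workflow execution ID.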
Testing Activities
Unit Test Pattern
package activities

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestTransformText(t *testing.T) {
	tests := []struct {
		name    string
		input   TransformInput
		want    string
		wantErr bool
	}{
		{
			name:    "uppercase",
			input:   TransformInput{Text: "hello", Operation: "upper"},
			want:    "HELLO",
			wantErr: false,
		},
		{
			name:    "lowercase",
			input:   TransformInput{Text: "HELLO", Operation: "lower"},
			want:    "hello",
			wantErr: false,
		},
		{
			name:    "invalid operation",
			input:   TransformInput{Text: "hello", Operation: "invalid"},
			want:    "",
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result, err := TransformText(context.Background(), &tt.input)
			if tt.wantErr {
				assert.Error(t, err)
				return
			}
			assert.NoError(t, err)
			assert.Equal(t, tt.want, result.Result)
		})
	}
}

Integration Test Pattern
func TestLoadCustomer(t *testing.T) {
	// Setup test database
	db := setupTestDB(t)
	defer db.Close()

	// Insert test data
	_, err := db.Exec(`INSERT INTO customers (id, name, email, tier, balance, status)
		VALUES ('cust-123', 'Alice', 'alice@example.com', 'gold', 1000.00, 'active')`)
	assert.NoError(t, err)

	// Create mock context
	ctx := context.Background()
	ctx = database.WithContext(ctx, db)

	// Call activity
	result, err := LoadCustomer(ctx, &LoadCustomerInput{CustomerID: "cust-123"})
	assert.NoError(t, err)
	assert.Equal(t, "Alice", result.Name)
	assert.Equal(t, "gold", result.Tier)
}

Activity Performance
Latency Guidelines
| Activity Type | Target | Notes |
|---|---|---|
| Computation | <5ms | Pure functions |
| DB Query | <50ms | Well-indexed queries |
| DB Write | <100ms | With transaction |
| API Call | <500ms | Depends on external service |
| File I/O | <200ms | Local filesystem |
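The targets in the table can be checked mechanically during development. The sketch below is one minimal way to do that with the standard library: time a call and compare it against the budget for its activity type. The map keys (`computation`, `db_query`, and so on) and the helpers `overBudget` and `timed` are hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"time"
)

// budgets mirrors the targets in the table above.
var budgets = map[string]time.Duration{
	"computation": 5 * time.Millisecond,
	"db_query":    50 * time.Millisecond,
	"db_write":    100 * time.Millisecond,
	"api_call":    500 * time.Millisecond,
	"file_io":     200 * time.Millisecond,
}

// overBudget reports whether elapsed meets or exceeds the target for kind.
// Unknown kinds are never flagged.
func overBudget(kind string, elapsed time.Duration) bool {
	budget, ok := budgets[kind]
	return ok && elapsed >= budget
}

// timed runs fn and returns how long it took.
func timed(fn func()) time.Duration {
	start := time.Now()
	fn()
	return time.Since(start)
}

func main() {
	// Simulate a 10ms piece of work and compare it against two budgets.
	elapsed := timed(func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println("computation over budget:", overBudget("computation", elapsed))
	fmt.Println("db_query over budget:", overBudget("db_query", elapsed))
}
```

A single measurement is noisy, so a check like this is more useful when applied to a percentile over many runs than to one call.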
Optimization Tips
- Cache frequently accessed data
- Use connection pooling
- Index database queries
- Batch operations when possible
- Use goroutines for parallelism (carefully!)
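The "carefully" in the goroutine tip usually means two things: bound the number of goroutines, and avoid shared writes. The sketch below shows one common way to do both with the standard library, using a buffered channel as a counting semaphore. `mapBounded` is a hypothetical helper; it assumes `limit` is at least 1, because a zero-capacity semaphore would block forever.

```go
package main

import (
	"fmt"
	"sync"
)

// mapBounded applies fn to every input using at most limit goroutines at a time.
// Results keep the input order because each goroutine writes only to its own index.
// limit must be at least 1.
func mapBounded(inputs []int, limit int, fn func(int) int) []int {
	results := make([]int, len(inputs))
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	for i, in := range inputs {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit goroutines are already running
		go func(i, in int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			results[i] = fn(in)
		}(i, in)
	}
	wg.Wait()
	return results
}

func main() {
	squares := mapBounded([]int{1, 2, 3, 4, 5}, 2, func(n int) int { return n * n })
	fmt.Println(squares) // [1 4 9 16 25]
}
```

The bound matters most when each unit of work holds a scarce resource such as a pooled database connection, where unbounded fan-out would simply queue on the pool.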
Next Steps
Ready to register activities? → Workflow Development Guide
Need schema help? → Schema Development Guide
Testing strategies? → Testing & Debugging Guide
Updated: October 29, 2025
Version: 1.0
Patterns: 5 proven designs