Agent Integration: LLM-Powered Workflows
Status: MVP (v5.1) | Maturity: Beta | Tests: 4 (Agent: 2, Memory: 1, Integration: 1)
Providers: LangChain (production), CrewAI (planned) | Memory: KV, Conversation, Semantic (v5.1)
Cascade Platform integrates large language models as workflow activities, enabling AI-powered decisions while maintaining workflow determinism and auditability.
What is Agent Integration?
Agent integration allows you to embed LLM intelligence into workflows as activities. Agents can analyze text, make decisions, generate content, and interact with external systems—all within deterministic workflows.
Key features:
- ✅ LLM agents as activities (not replacing workflows)
- ✅ Deterministic execution (same input = same output)
- ✅ Complete audit trail (see exactly what the agent did)
- ✅ Memory types: KV, Conversation, Semantic
- ✅ Cost monitoring and rate limiting
- ✅ Fallback to human review
Use cases:
- Sentiment analysis
- Document classification
- Chatbot integration
- Content generation
- Recommendation engines
- Anomaly detection
Architecture: Why Agents as Activities?
The Right Way: Agents as Activities
CDL Workflow (deterministic)
├── State 1: Input Collection (HumanTask)
├── State 2: Analyze with Agent (Task → Agent Activity)
│ └─ Agent: Analyze sentiment, extract entities
├── State 3: Route based on result (Choice)
└── State 4: Process & respond (Task)
Benefits:
- ✅ Reproducible: Same input always produces same output
- ✅ Auditable: See agent decisions in execution trace
- ✅ Versioned: Control agent behavior via CDL versions
- ✅ Retryable: Failed agent calls retry automatically
- ✅ Monitorable: Track agent costs and performance
The Wrong Way: Workflows Inside Agents
Agent (non-deterministic)
├── Make API call
├── Maybe get confused
├── Call another API
├── Make unpredictable decision
└── User confusion
Problems:
- ❌ Non-deterministic (different results each time)
- ❌ Not auditable (hidden decisions)
- ❌ Hard to debug (agent behavior unpredictable)
- ❌ Can’t retry properly
- ❌ Uncontrolled costs
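Wrapping agent calls as activities is what makes the "Retryable" benefit work: a failed call is just a returned error that the engine re-runs with backoff. An illustrative sketch only (the flaky-agent stand-in and the backoff loop below are hypothetical, not the platform's actual scheduler):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// makeFlakyAgent is a stand-in for an LLM-backed activity: typed input in,
// typed output or error out. This one fails twice, then succeeds.
func makeFlakyAgent() func(input string) (string, error) {
	attempts := 0
	return func(input string) (string, error) {
		attempts++
		if attempts < 3 {
			return "", errors.New("transient provider error")
		}
		return "sentiment: positive", nil
	}
}

// retry re-runs an activity up to maxAttempts times with exponential backoff.
func retry(maxAttempts int, fn func(string) (string, error), input string) (string, error) {
	backoff := 10 * time.Millisecond
	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		out, err := fn(input)
		if err == nil {
			return out, nil
		}
		lastErr = err
		time.Sleep(backoff)
		backoff *= 2
	}
	return "", fmt.Errorf("gave up after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	out, err := retry(3, makeFlakyAgent(), "I love this product")
	fmt.Println(out, err)
}
```

Because the activity is a pure input-to-output function, retrying it is safe; the workflow never sees the intermediate failures, only the final result or the exhausted-retries error.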
Basic Integration: Using Agents in Workflows
Simple Agent Activity
CDL with agent call:
- name: AnalyzeSentiment
type: Task
resource: urn:cascade:activity:analyze_sentiment
parameters:
text: "{{ $.customer_message }}"
language: "{{ $.language | default: 'en' }}"
result: $.sentiment_analysis
timeout: 30s
retries:
max_attempts: 2
next: RouteBySentiment
Agent Activity Implementation (Go):
package activities
import (
"context"
"encoding/json"
"fmt"
"github.com/cascade-platform/sdk/agents"
)
type SentimentInput struct {
Text string `json:"text"`
Language string `json:"language"`
}
type SentimentOutput struct {
Sentiment string `json:"sentiment"` // positive, negative, neutral
Confidence float64 `json:"confidence"` // 0.0-1.0
Score float64 `json:"score"` // -1.0 to 1.0
Emotions []string `json:"emotions"`
Keywords []string `json:"keywords"`
}
func AnalyzeSentiment(ctx context.Context, input *SentimentInput) (*SentimentOutput, error) {
// Create LLM agent
agent := agents.NewLanguageAgent(
agents.WithModel("gpt-4-turbo"),
agents.WithProvider("openai"),
)
// Define the prompt
prompt := fmt.Sprintf(`Analyze the following text and provide:
1. Sentiment: positive, negative, or neutral
2. Confidence: 0.0-1.0
3. Score: -1.0 to 1.0
4. Emotions detected
5. Key topics
Text: "%s"
Language: %s
Respond as JSON.`, input.Text, input.Language)
// Call agent (deterministic with seed)
response, err := agent.Execute(ctx, agents.Request{
Prompt: prompt,
Temperature: 0.1, // Low temperature = near-deterministic
MaxTokens: 500,
ResponseFormat: "json",
})
if err != nil {
return nil, err // The workflow engine will retry
}
// Parse response
var output SentimentOutput
err = json.Unmarshal([]byte(response), &output)
return &output, err
}
Using in workflow:
- name: RouteBySentiment
type: Choice
choices:
- condition: "{{ $.sentiment_analysis.sentiment == 'negative' && $.sentiment_analysis.confidence > 0.9 }}"
next: EscalateToAgent
description: "Definitely negative, needs human"
- condition: "{{ $.sentiment_analysis.sentiment == 'positive' }}"
next: AutoRespond
description: "Positive feedback, auto response"
default: ManagerReview
Memory Types: Persistence for Agents
Memory Architecture
┌─────────────────────────────────────────┐
│ Workflow Context │
├─────────────────────────────────────────┤
│ Contains: │
│ - Input data │
│ - Activity results │
│ - Agent memory references │
│ - User information │
└────────────┬────────────────────────────┘
│
├────────────────────┬────────────────────┐
▼ ▼ ▼
┌──────────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ KV Store (Redis) │ │ Conversation Log │ │ Vector Database │
│ (Key-Value Memory) │ │ (Chat History) │ │ (Semantic Mem) │
├──────────────────────┤ ├──────────────────┤ ├─────────────────┤
│ user:123.last_order │ │ user:123:history │ │ user:123.similar│
│ product:456.viewed │ │ [msg1, msg2...] │ │ [embedding1...]│
│ state.temp_data │ │ 100 messages │ │ 1000 vectors │
└──────────────────────┘ └──────────────────┘ └─────────────────┘
1. KV Memory (Stateless Facts)
Store simple facts the agent needs to remember:
- name: ExtractCustomerInfo
type: Task
resource: urn:cascade:activity:extract_customer_info
parameters:
conversation: "{{ $.customer_messages }}"
memory_key: "customer:{{ $.customer_id }}"
result: $.extracted_info
next: ProcessExtraction
Activity implementation:
func ExtractCustomerInfo(ctx context.Context, input ExtractInput) (Output, error) {
agent := agents.NewLanguageAgent()
// Get existing facts from memory
existingFacts, _ := agents.MemoryKV.Get(ctx, input.MemoryKey)
// Analyze conversation
response, _ := agent.Execute(ctx, agents.Request{
Prompt: fmt.Sprintf(`
Conversation:
%s
Existing facts:
%s
Extract and update facts about customer.
Return JSON with: {preferences, location, needs, budget}
`, input.Conversation, existingFacts),
})
// Store updated facts
output := parseResponse(response)
agents.MemoryKV.Set(ctx, input.MemoryKey, output, 24*time.Hour)
return output, nil
}
2. Conversation Memory (Chat History)
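Conversation memory is an append-only message log read back through a fixed window (the memory_window parameter used below). A minimal sketch of that windowing, with a hypothetical ConversationLog type:

```go
package main

import "fmt"

type Message struct {
	Role    string
	Content string
}

// ConversationLog is a toy stand-in for the conversation store:
// Add appends a message, Window returns the last n messages.
type ConversationLog struct {
	messages []Message
}

func (c *ConversationLog) Add(m Message) { c.messages = append(c.messages, m) }

func (c *ConversationLog) Window(n int) []Message {
	if len(c.messages) <= n {
		return c.messages
	}
	return c.messages[len(c.messages)-n:]
}

func main() {
	log := &ConversationLog{}
	for i := 1; i <= 25; i++ {
		log.Add(Message{Role: "user", Content: fmt.Sprintf("msg %d", i)})
	}
	w := log.Window(20)
	fmt.Println(len(w), w[0].Content) // 20 "msg 6"
}
```

Windowing keeps prompt size (and token cost) bounded no matter how long the conversation runs.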
Maintain conversation history across interactions:
- name: RespondToCustomer
type: Task
resource: urn:cascade:activity:customer_response
parameters:
customer_message: "{{ $.current_message }}"
customer_id: "{{ $.customer_id }}"
memory_window: 20 # Remember last 20 messages
result: $.agent_response
next: SendResponse
Activity implementation:
func RespondToCustomer(ctx context.Context, input CustomerInput) (Output, error) {
// Get conversation history (last N messages)
history, _ := agents.MemoryConversation.GetWindow(
ctx,
"customer:" + input.CustomerID,
input.MemoryWindow, // Last 20 messages
)
// Create agent prompt with context
messages := []agents.Message{
{Role: "system", Content: "You are a helpful customer service agent."},
}
// Add conversation history
for _, msg := range history {
messages = append(messages, agents.Message{
Role: msg.Role, // "user" or "assistant"
Content: msg.Content,
})
}
// Add current message
messages = append(messages, agents.Message{
Role: "user",
Content: input.CustomerMessage,
})
// Get response from agent
response, _ := agents.NewLanguageAgent().Chat(ctx, messages)
// Store in conversation history
agents.MemoryConversation.Add(ctx, "customer:"+input.CustomerID, agents.Message{
Role: "user",
Content: input.CustomerMessage,
Timestamp: time.Now(),
})
agents.MemoryConversation.Add(ctx, "customer:"+input.CustomerID, agents.Message{
Role: "assistant",
Content: response,
Timestamp: time.Now(),
})
return Output{Response: response}, nil
}
3. Semantic Memory (Vector Store)
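Semantic memory ranks stored embeddings by similarity to a query vector instead of matching exact keys. A self-contained sketch using cosine similarity (the two-dimensional vectors are hand-made for illustration; real embeddings have hundreds of dimensions):

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// cosine returns the cosine similarity of two equal-length vectors.
func cosine(a, b []float64) float64 {
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

type Item struct {
	ID     string
	Vector []float64
}

// search returns the IDs of the k stored items most similar to the query.
func search(items []Item, query []float64, k int) []string {
	sort.Slice(items, func(i, j int) bool {
		return cosine(items[i].Vector, query) > cosine(items[j].Vector, query)
	})
	ids := []string{}
	for i := 0; i < k && i < len(items); i++ {
		ids = append(ids, items[i].ID)
	}
	return ids
}

func main() {
	items := []Item{
		{"tent", []float64{0.9, 0.1}},
		{"laptop", []float64{0.1, 0.9}},
		{"sleeping-bag", []float64{0.8, 0.2}},
	}
	// Query embedding for "outdoor gear" (hand-made for the example).
	fmt.Println(search(items, []float64{1, 0}, 2)) // [tent sleeping-bag]
}
```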
Remember conceptual similarities, not just exact matches:
- name: RecommendProducts
type: Task
resource: urn:cascade:activity:product_recommendation
parameters:
customer_interests: "{{ $.customer.interests }}"
customer_id: "{{ $.customer_id }}"
result: $.recommendations
timeout: 60s
next: PresentRecommendations
Activity implementation:
func RecommendProducts(ctx context.Context, input RecInput) (Output, error) {
// Convert customer interests to embedding
embedding, _ := agents.EmbeddingModel.Encode(
ctx,
input.CustomerInterests,
)
// Search for similar products in vector store
similarProducts, _ := agents.MemorySemantic.Search(
ctx,
embedding,
"products", // Collection name
10, // Top 10
)
// Use agent to rank and explain recommendations
agent := agents.NewLanguageAgent()
productsJSON, _ := json.Marshal(similarProducts)
response, _ := agent.Execute(ctx, agents.Request{
Prompt: fmt.Sprintf(`
Customer interests: %s
Similar products found:
%s
Select best 3 products and explain why they match the customer.
Return JSON: {recommendations: [{id, name, reason}]}
`, input.CustomerInterests, string(productsJSON)),
})
return parseRecommendations(response), nil
}
Advanced Patterns
Pattern 1: Multi-Agent Workflows
Different agents for different tasks:
- name: AnalyzeDocument
type: Parallel
branches:
- name: ExtractEntities
type: Task
resource: urn:cascade:activity:extract_entities
parameters:
document: "{{ $.document }}"
result: $.entities
- name: ClassifyDocument
type: Task
resource: urn:cascade:activity:classify_document
parameters:
document: "{{ $.document }}"
result: $.classification
- name: SummarizeDocument
type: Task
resource: urn:cascade:activity:summarize_document
parameters:
document: "{{ $.document }}"
result: $.summary
completion_strategy: ALL
next: CombineAnalysis
Pattern 2: Agent with Fallback
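This pattern gates the agent's output on its self-reported confidence. The same thresholds used in the Choice state below, written as a plain function for illustration (state names taken from the CDL example):

```go
package main

import "fmt"

// routeByConfidence mirrors the Choice state: high confidence applies
// automatically, medium goes to a manager, anything else escalates.
func routeByConfidence(confidence float64) string {
	switch {
	case confidence >= 0.9:
		return "ApplyDecision"
	case confidence >= 0.7:
		return "ManagerReview"
	default:
		return "EscalateToExpert"
	}
}

func main() {
	for _, c := range []float64{0.95, 0.8, 0.4} {
		fmt.Println(c, routeByConfidence(c))
	}
}
```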
If the agent is uncertain, escalate to a human:
- name: MakeDecision
type: Task
resource: urn:cascade:activity:make_recommendation
parameters:
data: "{{ $.analysis_data }}"
result: $.agent_decision
next: CheckConfidence
- name: CheckConfidence
type: Choice
choices:
- condition: "{{ $.agent_decision.confidence >= 0.9 }}"
next: ApplyDecision
description: "High confidence, apply"
- condition: "{{ $.agent_decision.confidence >= 0.7 }}"
next: ManagerReview
description: "Medium confidence, review"
default: EscalateToExpert
Pattern 3: Agent with Tool Use
The agent can call tools to gather information:
// Define tools agent can use
tools := []agents.Tool{
{
Name: "search_database",
Description: "Search customer database",
Function: func(ctx context.Context, query string) string {
results := db.Search(query)
return fmt.Sprintf("Found %d results", len(results))
},
},
{
Name: "get_inventory",
Description: "Check product inventory",
Function: func(ctx context.Context, productID string) string {
count := inventory.Check(productID)
return fmt.Sprintf("In stock: %d units", count)
},
},
}
// Create agent with tools
agent := agents.NewLanguageAgent(
agents.WithTools(tools),
agents.WithThinkingEnabled(true), // Let agent think
)
// Agent will automatically use tools as needed
response, _ := agent.Execute(ctx, agents.Request{
Prompt: "What products match customer interest in 'outdoor gear' under $100?",
})
// Agent calls: search_database("outdoor gear") → get_inventory("item-123")
Real-World Example: Customer Service Workflow
Complete customer service with AI:
workflows:
- name: HandleCustomerQuery
start: ReceiveQuery
states:
# Step 1: Collect customer message
- name: ReceiveQuery
type: HumanTask
description: "Customer submits query"
ui:
schema: urn:cascade:schema:customer_query_form
target: appsmith
timeout: 30m
next: AnalyzeQuery
# Step 2: AI analyzes and classifies
- name: AnalyzeQuery
type: Task
resource: urn:cascade:activity:classify_and_analyze
parameters:
query: "{{ $.customer_message }}"
customer_id: "{{ $.customer_id }}"
history_window: 10
result: $.analysis
timeout: 30s
retries:
max_attempts: 2
next: RouteByType
# Step 3: Route based on classification
- name: RouteByType
type: Choice
choices:
- condition: "{{ $.analysis.category == 'technical_support' && $.analysis.confidence > 0.9 }}"
next: TechnicalSupport
- condition: "{{ $.analysis.category == 'billing' && $.analysis.confidence > 0.9 }}"
next: BillingSupport
- condition: "{{ $.analysis.category == 'product_recommendation' }}"
next: RecommendProducts
- condition: "{{ $.analysis.confidence < 0.7 }}"
next: EscalateToHuman
description: "Unclear, needs human"
default: GeneralResponse
# Path 1: Technical support with AI
- name: TechnicalSupport
type: Task
resource: urn:cascade:activity:provide_tech_solution
parameters:
query: "{{ $.analysis.query }}"
product: "{{ $.analysis.product_mentioned }}"
customer_history: "{{ $.analysis.previous_issues }}"
result: $.solution
timeout: 60s
next: ConfirmSolution
# Path 2: Billing support
- name: BillingSupport
type: Task
resource: urn:cascade:activity:handle_billing_query
parameters:
query: "{{ $.analysis.query }}"
customer_id: "{{ $.customer_id }}"
result: $.billing_response
next: ConfirmResponse
# Path 3: Product recommendation
- name: RecommendProducts
type: Task
resource: urn:cascade:activity:recommend_products
parameters:
interests: "{{ $.analysis.extracted_interests }}"
budget: "{{ $.analysis.budget }}"
customer_id: "{{ $.customer_id }}"
result: $.recommendations
next: PresentRecommendations
# Path 4: General response
- name: GeneralResponse
type: Task
resource: urn:cascade:activity:generate_response
parameters:
query: "{{ $.customer_message }}"
context: "{{ $.analysis.context }}"
result: $.response
next: ConfirmResponse
# Path 5: Escalate to human
- name: EscalateToHuman
type: HumanTask
description: "Agent escalates unclear query"
ui:
schema: urn:cascade:schema:agent_escalation_form
pre_populate:
customer_message: "{{ $.customer_message }}"
agent_analysis: "{{ $.analysis }}"
assignee:
role: customer_service_agent
timeout: 4h
next: SendFinalResponse
# Response confirmation
- name: ConfirmSolution
type: HumanTask
description: "Confirm technical solution"
ui:
schema: urn:cascade:schema:confirm_solution
pre_populate:
solution: "{{ $.solution }}"
timeout: 1h
next: SendFinalResponse
- name: ConfirmResponse
type: Task
resource: urn:cascade:activity:format_response
parameters:
content: "{{ $.response || $.billing_response }}"
result: $.formatted_response
next: SendFinalResponse
- name: PresentRecommendations
type: HumanTask
description: "Approve recommendations"
ui:
schema: urn:cascade:schema:recommendations_view
pre_populate:
recommendations: "{{ $.recommendations }}"
timeout: 30m
next: SendFinalResponse
# Final step: Send response
- name: SendFinalResponse
type: Task
resource: urn:cascade:activity:send_customer_response
parameters:
customer_id: "{{ $.customer_id }}"
response: "{{ $.formatted_response || $.solution || $.response }}"
resolution_type: "{{ $.analysis.category }}"
end: true
Ensuring Determinism
Problem: LLMs are Non-Deterministic
Prompt: "What is 2+2?"
Run 1: "2+2 equals 4"
Run 2: "The answer is 4"
Run 3: "2+2 = 4" ← Different format!
Solution: Structured Output
// ✅ GOOD: Structured output
response, _ := agent.Execute(ctx, agents.Request{
Prompt: "Calculate 2+2",
ResponseFormat: "json", // Force JSON
JSONSchema: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"result": map[string]interface{}{"type": "integer"},
"explanation": map[string]interface{}{"type": "string"},
},
"required": []string{"result"},
},
})
// Response always: {"result": 4, "explanation": "2+2 equals 4"}
// ❌ BAD: Freeform output
response, _ := agent.Execute(ctx, agents.Request{
Prompt: "Calculate 2+2",
// No structure → anything goes
})
Solution: Low Temperature
// ✅ GOOD: Temperature 0.1 = near-deterministic
response, _ := agent.Execute(ctx, agents.Request{
Prompt: prompt,
Temperature: 0.1, // More deterministic (0.0-1.0)
})
// ❌ BAD: Temperature 1.0 = creative/random
response, _ := agent.Execute(ctx, agents.Request{
Prompt: prompt,
Temperature: 1.0, // Very creative!
})
Solution: Fixed Seed
// ✅ GOOD: Same seed = same result
response1, _ := agent.Execute(ctx, agents.Request{
Prompt: prompt,
Seed: 12345,
})
response2, _ := agent.Execute(ctx, agents.Request{
Prompt: prompt,
Seed: 12345,
})
// response1 == response2 (seed support is best-effort with some providers)
Cost Management
Monitoring Agent Costs
# Prometheus metrics
cascade_agent_api_calls_total # Calls to LLM
cascade_agent_tokens_used_total # Total tokens
cascade_agent_cost_usd_total # Cost in USD
cascade_agent_latency_seconds # Response time
cascade_agent_errors_total # Failed calls
Rate Limiting
agent := agents.NewLanguageAgent(
agents.WithRateLimit(
agents.RateLimit{
MaxCallsPerSecond: 10,
MaxTokensPerMinute: 90000,
MaxCostPerDay: 100, // $100/day limit
},
),
)
Cost Analysis
# Show cost breakdown
cascade agents costs --period day --group-by model
# gpt-4-turbo: $42.50 (1200 calls, 500K tokens)
# gpt-3.5-turbo: $15.20 (4500 calls, 250K tokens)
# Total: $57.70
# Alert on cost threshold
cascade agents costs --alert-threshold 100 # Alert if >$100/day
Troubleshooting
Agent Timeout
Symptom: Agent execution timeout exceeded
Solution:
- name: SlowAgent
type: Task
resource: urn:cascade:activity:slow_analysis
timeout: 120s # Increase from default 30s
retries:
max_attempts: 1 # Don't retry slow operations
Non-Deterministic Output
Symptom: Same input produces different results
Solution:
agent := agents.NewLanguageAgent(
agents.WithTemperature(0.0), // As deterministic as the provider allows
agents.WithSeed(42), // Fixed seed
agents.WithResponseFormat("json"), // Structured output
)
Memory Overflow
Symptom: Agent memory grows too large
Solution:
# Set memory retention
memory:
kv_retention: 30d # Delete KV entries after 30 days
conversation_retention: 7d # Delete chats after 7 days
semantic_retention: 90d # Keep vectors for 90 days
max_conversation_length: 100 # Keep only last 100 messages
Best Practices
✅ DO:
- Use structured output (JSON schema)
- Set temperature to 0.0-0.3 (deterministic)
- Test agents thoroughly before production
- Monitor costs and set budgets
- Use appropriate timeout values
- Version agents like code
- Audit all agent decisions
❌ DON’T:
- Use creative/random temperatures (>0.7)
- Forget to handle agent failures
- Store sensitive data in agent memory
- Ignore cost implications
- Deploy untested agent changes
- Let agents make critical decisions alone
Future: More Providers
Planned for Q1 2026:
- CrewAI (multi-agent orchestration)
- Anthropic Claude integration
- Open-source models (Llama, Mistral)
- Custom LLM providers
Next Steps
Ready to use agents? → Agent Development Guide
Want to understand memory? → Memory Systems Guide
Need cost optimization? → Agent Optimization Guide
Production deployment? → Agent Operations
Updated: October 29, 2025
Version: 5.1
Providers: LangChain 0.1.0+, OpenAI API v1+
Maturity: Beta (Production Ready for non-critical paths)