This guide provides detailed instructions and examples for creating flows in FlowRunner.
FlowRunner uses YAML to define workflows. A flow consists of a series of connected nodes, each performing a specific task. Nodes are linked through their `next` mappings, so the same structure supports both simple linear chains and complex branching workflows.
A flow definition consists of two main sections:
- Metadata: Contains information about the flow
- Nodes: Defines the nodes in the flow and their connections

```yaml
metadata:
  name: "My Flow"
  description: "A description of what this flow does"
  version: "1.0.0"

nodes:
  node_name:
    type: "node_type"
    params:
      # Parameters specific to the node type
    next:
      default: "next_node"
      error: "error_node"
    batch:
      # Batch processing configuration
    retry:
      # Retry configuration
    hooks:
      # JavaScript hooks
```

This simple flow makes an HTTP request and logs the response.

```yaml
metadata:
  name: "Hello World Flow"
  description: "A simple flow that makes an HTTP request"
  version: "1.0.0"

nodes:
  start:
    type: "http.request"
    params:
      url: "https://httpbin.org/get"
      method: "GET"
    next:
      default: "log"
      error: "error"

  log:
    type: "transform"
    params:
      script: |
        console.log("Response:", input);
        return input;
    next:
      default: "end"

  error:
    type: "transform"
    params:
      script: |
        console.log("Error:", input);
        return { error: input };
    next:
      default: "end"

  end:
    type: "transform"
    params:
      script: |
        console.log("Flow completed");
        return { status: "completed" };
```

This flow fetches data from an API, transforms it, and sends it to another API.

```yaml
metadata:
  name: "Data Transformation Flow"
  description: "Fetches data, transforms it, and sends it to another API"
  version: "1.0.0"

nodes:
  fetch_data:
    type: "http.request"
    params:
      url: "https://api.example.com/data"
      method: "GET"
      headers:
        Authorization: "Bearer ${secrets.API_KEY}"
    next:
      default: "transform_data"
      error: "handle_error"

  transform_data:
    type: "transform"
    params:
      script: |
        // Extract the items from the response
        const items = input.body.items || [];

        // Transform each item
        const transformed = items.map(item => ({
          id: item.id,
          name: item.name.toUpperCase(),
          created: new Date(item.created_at).toISOString(),
          status: item.active ? "ACTIVE" : "INACTIVE"
        }));

        return { items: transformed };
    next:
      default: "send_data"

  send_data:
    type: "http.request"
    params:
      url: "https://api.example.com/submit"
      method: "POST"
      headers:
        Content-Type: "application/json"
        Authorization: "Bearer ${secrets.API_KEY}"
      body: "${input.items}"
    next:
      default: "success"
      error: "handle_error"

  success:
    type: "transform"
    params:
      script: |
        return { status: "success", message: "Data processed successfully" };

  handle_error:
    type: "transform"
    params:
      script: |
        console.log("Error:", input);
        return { status: "error", message: "An error occurred", error: input };
```

This flow demonstrates conditional branching based on input data.

```yaml
metadata:
  name: "Conditional Flow"
  description: "Demonstrates conditional branching"
  version: "1.0.0"

nodes:
  start:
    type: "condition"
    params:
      conditions:
        - condition: "input.status == 'success'"
          action: "success"
        - condition: "input.status == 'error'"
          action: "error"
        - condition: "input.status == 'pending'"
          action: "pending"
      default_action: "unknown"
    next:
      success: "handle_success"
      error: "handle_error"
      pending: "handle_pending"
      unknown: "handle_unknown"

  handle_success:
    type: "transform"
    params:
      script: |
        console.log("Success case");
        return { result: "success" };
    next:
      default: "end"

  handle_error:
    type: "transform"
    params:
      script: |
        console.log("Error case");
        return { result: "error" };
    next:
      default: "end"

  handle_pending:
    type: "transform"
    params:
      script: |
        console.log("Pending case");
        return { result: "pending" };
    next:
      default: "end"

  handle_unknown:
    type: "transform"
    params:
      script: |
        console.log("Unknown case");
        return { result: "unknown" };
    next:
      default: "end"

  end:
    type: "transform"
    params:
      script: |
        console.log("Flow completed with result:", input.result);
        return input;
```

This flow uses an LLM to generate content based on a prompt.

```yaml
metadata:
  name: "LLM Content Generator"
  description: "Generates content using an LLM"
  version: "1.0.0"

nodes:
  get_topic:
    type: "http.request"
    params:
      url: "https://api.example.com/topics/random"
      method: "GET"
    next:
      default: "generate_content"
      error: "handle_error"

  generate_content:
    type: "llm"
    params:
      provider: "openai"
      api_key: "${secrets.OPENAI_API_KEY}"
      model: "gpt-3.5-turbo"
      template: "Write a 300-word blog post about {{.topic}}. The tone should be informative and engaging."
      variables:
        topic: "${input.body.topic}"
      temperature: 0.7
      max_tokens: 500
    next:
      default: "format_content"
      error: "handle_error"

  format_content:
    type: "transform"
    params:
      script: |
        // Extract the content from the LLM response
        const content = input.content;

        // Format the content
        const formatted = {
          title: `Blog Post: ${input.input.body.topic}`,
          content: content,
          word_count: content.split(/\s+/).length,
          created_at: new Date().toISOString()
        };

        return formatted;
    next:
      default: "store_content"

  store_content:
    type: "http.request"
    params:
      url: "https://api.example.com/content"
      method: "POST"
      headers:
        Content-Type: "application/json"
        Authorization: "Bearer ${secrets.API_KEY}"
      body: "${input}"
    next:
      default: "success"
      error: "handle_error"

  success:
    type: "transform"
    params:
      script: |
        return { status: "success", message: "Content generated and stored successfully" };

  handle_error:
    type: "transform"
    params:
      script: |
        console.log("Error:", input);
        return { status: "error", message: "An error occurred", error: input };
```

This flow monitors an email inbox, processes incoming emails, and sends responses.

```yaml
metadata:
  name: "Email Processor"
  description: "Processes incoming emails and sends responses"
  version: "1.0.0"

nodes:
  check_emails:
    type: "email.receive"
    params:
      imap_host: "imap.gmail.com"
      imap_port: 993
      username: "${secrets.EMAIL_USERNAME}"
      password: "${secrets.EMAIL_PASSWORD}"
      folder: "INBOX"
      limit: 10
      unseen: true
      with_body: true
      mark_as_read: true
    next:
      default: "process_emails"
      error: "handle_error"

  process_emails:
    type: "transform"
    params:
      script: |
        // Check if we have any emails
        if (!Array.isArray(input) || input.length === 0) {
          console.log("No new emails");
          return { status: "no_emails" };
        }

        // Helper function to categorize emails
        function categorizeEmail(subject, body) {
          const subjectLower = subject.toLowerCase();
          const bodyLower = body.toLowerCase();
          if (subjectLower.includes("support") || bodyLower.includes("help")) {
            return "support";
          } else if (subjectLower.includes("order") || bodyLower.includes("purchase")) {
            return "order";
          } else {
            return "general";
          }
        }

        // Process each email
        const processed = input.map(email => ({
          id: email.messageId,
          from: email.from,
          subject: email.subject,
          body: email.body,
          category: categorizeEmail(email.subject, email.body)
        }));

        return { emails: processed };
    next:
      default: "categorize_emails"

  categorize_emails:
    type: "condition"
    params:
      conditions:
        - condition: "input.status == 'no_emails'"
          action: "no_emails"
      default_action: "has_emails"
    next:
      no_emails: "end"
      has_emails: "send_responses"

  send_responses:
    type: "transform"
    params:
      script: |
        // Prepare responses for each email
        const responses = input.emails.map(email => {
          let responseTemplate;
          switch (email.category) {
            case "support":
              responseTemplate = "Thank you for contacting support. Your request has been received and we will get back to you shortly. Reference: {{id}}";
              break;
            case "order":
              responseTemplate = "Thank you for your order inquiry. Our team will review your request and respond within 24 hours. Reference: {{id}}";
              break;
            default:
              responseTemplate = "Thank you for your message. We have received it and will respond soon. Reference: {{id}}";
          }
          return {
            to: email.from,
            subject: `Re: ${email.subject}`,
            body: responseTemplate.replace("{{id}}", email.id)
          };
        });
        return { responses };
    next:
      default: "send_emails"

  send_emails:
    type: "transform"
    params:
      script: |
        // This is a placeholder for sending multiple emails
        // In a real implementation, you would use a batch node or loop
        console.log(`Preparing to send ${input.responses.length} email responses`);
        return input;
    next:
      default: "send_email_batch"

  send_email_batch:
    type: "email.send"
    params:
      smtp_host: "smtp.gmail.com"
      smtp_port: 587
      username: "${secrets.EMAIL_USERNAME}"
      password: "${secrets.EMAIL_PASSWORD}"
      from: "${secrets.EMAIL_USERNAME}"
      to: "${input.responses[0].to}"
      subject: "${input.responses[0].subject}"
      body: "${input.responses[0].body}"
    next:
      default: "success"
      error: "handle_error"

  success:
    type: "transform"
    params:
      script: |
        return { status: "success", message: "Emails processed and responses sent" };
    next:
      default: "end"

  handle_error:
    type: "transform"
    params:
      script: |
        console.log("Error:", input);
        return { status: "error", message: "An error occurred", error: input };
    next:
      default: "end"

  end:
    type: "transform"
    params:
      script: |
        console.log("Flow completed");
        return { status: "completed" };
```

This flow demonstrates a data pipeline that extracts data from an API, transforms it, and loads it into a database.

```yaml
metadata:
  name: "ETL Pipeline"
  description: "Extracts data from an API, transforms it, and loads it into a database"
  version: "1.0.0"

nodes:
  extract:
    type: "http.request"
    params:
      url: "https://api.example.com/data"
      method: "GET"
      headers:
        Authorization: "Bearer ${secrets.API_KEY}"
    next:
      default: "transform"
      error: "handle_error"

  transform:
    type: "transform"
    params:
      script: |
        // Extract the items from the response
        const items = input.body.items || [];

        // Transform each item
        const transformed = items.map(item => ({
          id: item.id,
          name: item.name,
          email: item.email,
          created_at: new Date(item.created_at).toISOString(),
          status: item.active ? "active" : "inactive"
        }));

        return { items: transformed };
    next:
      default: "load"

  load:
    type: "postgres"
    params:
      operation: "batch_insert"
      table: "users"
      items: "${input.items}"
      columns: ["id", "name", "email", "created_at", "status"]
      on_conflict: "id"
      on_conflict_action: "update"
    next:
      default: "success"
      error: "handle_error"

  success:
    type: "transform"
    params:
      script: |
        return {
          status: "success",
          message: "Data pipeline executed successfully",
          items_processed: input.items.length
        };

  handle_error:
    type: "transform"
    params:
      script: |
        console.log("Error:", input);
        return { status: "error", message: "An error occurred", error: input };
```

- Start with a clear goal: Define what the flow should accomplish before starting to write YAML.
- Keep flows focused: Each flow should have a single responsibility.
- Use descriptive node names: Names like `fetch_user_data` are better than `http1`.
- Add comments: Use YAML comments (`# Comment`) to explain complex logic.
- Handle errors: Always include error handling for each node.
- Group related nodes: Keep related functionality together.
- Use a consistent naming convention: For example, use verb-noun format (`fetch_data`, `transform_data`, `store_result`).
- Limit flow complexity: If a flow becomes too complex, consider splitting it into multiple flows.
- Keep scripts simple: Complex logic should be moved to external services.
- Use proper error handling: Wrap code in try-catch blocks for better error reporting.
- Validate inputs: Check that inputs match expected formats before processing.
- Limit script size: Large scripts are harder to maintain and debug.
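
The try-catch and input-validation advice can be combined in one pattern. Below is a sketch of a defensive transform script, written as a standalone function so it can run outside FlowRunner; inside a node, the function body would be the `script` itself, with `input` supplied by the engine (an assumption about the runtime contract):

```javascript
// Defensive transform pattern: validate the input shape first, wrap the
// work in try-catch, and return a structured error instead of throwing.
function safeTransform(input) {
  try {
    // Validate inputs before processing.
    if (!input || !Array.isArray(input.items)) {
      return { status: "error", message: "Expected input.items to be an array" };
    }
    // The actual (deliberately small) transformation.
    const items = input.items.map(item => ({
      id: item.id,
      name: String(item.name).trim()
    }));
    return { status: "ok", items };
  } catch (err) {
    // Surface the failure so a downstream error node can react to it.
    return { status: "error", message: err.message };
  }
}
```

Returning a `{ status, message }` object on failure keeps the node's output shape predictable, which makes downstream `condition` nodes easier to write.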
- Use secrets for sensitive data: Never hardcode API keys, passwords, or other sensitive information.
- Validate external inputs: Always validate and sanitize data from external sources.
- Limit permissions: Use the principle of least privilege for database operations and API calls.
- Audit flows: Regularly review flows for security issues.
- Use batch processing: For operations on multiple items.
- Implement caching: For frequently accessed data.
- Limit payload sizes: Large payloads can slow down execution.
- Use appropriate timeouts: Set reasonable timeouts for external services.
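
For batch processing and retries, the `batch` and `retry` blocks from the flow structure are the natural home for these settings. The key names below are illustrative assumptions, not FlowRunner's confirmed schema; check your version's reference before relying on them:

```yaml
# Hypothetical sketch -- `items`, `concurrency`, `max_retries`, and `delay`
# are assumed key names, not confirmed FlowRunner configuration.
process_items:
  type: "http.request"
  params:
    url: "https://api.example.com/items"
    method: "GET"
  batch:
    items: "${input.items}"   # fan out over a collection (assumed)
    concurrency: 5            # cap parallel requests (assumed)
  retry:
    max_retries: 3            # retry transient failures (assumed)
    delay: 1000               # delay between attempts, in ms (assumed)
  next:
    default: "done"
    error: "handle_error"
```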
- Start with simple flows: Test basic functionality before adding complexity.
- Use logging nodes: Add transform nodes with `console.log()` statements to debug issues.
- Test with sample data: Create sample inputs for testing.
- Monitor executions: Use the WebSocket API to monitor flow execution in real-time.
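
One lightweight way to test with sample data is to run a transform script outside the engine. The harness below assumes the runtime contract is simply "evaluate the script body as a function of `input`"; the real engine may inject more context, so treat this as a sketch rather than FlowRunner's actual API:

```javascript
// Minimal local harness for exercising a transform script with sample data.
// Assumption: the engine evaluates the script body as a function of `input`.
function runTransform(scriptBody, input) {
  const fn = new Function("input", scriptBody);
  return fn(input);
}

// Sample input mirroring the http.request response shape used in this guide.
const sampleInput = { body: { items: [{ id: 1, name: "ada" }, { id: 2, name: "lin" }] } };

const script = `
  const items = input.body.items || [];
  return { items: items.map(item => ({ id: item.id, name: item.name.toUpperCase() })) };
`;

// Logs the transformed items with upper-cased names.
console.log(runTransform(script, sampleInput));
```

Keeping a few such sample inputs next to each flow makes regressions in script logic easy to catch before deploying a change.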