IntegrationFlow Configuration
Configure integration flows to orchestrate data and processes
Defines an integration flow that orchestrates data movement and processing across systems. Integration flows are the core building blocks of Weik.io integrations.
Schema Properties
| Property | Type | Required | Default | Description |
|---|---|---|---|---|
| Description | string | No | - | Human-readable description of the flow’s purpose |
| InitialState | IntegrationStatusEnum | No | Stopped | Initial state when the flow is created (Stopped, Started, Unknown) |
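The table can be read as a small validation rule set. The sketch below assumes the `spec` section has already been parsed into a plain dictionary; `normalize_flow_spec` is a hypothetical helper written for illustration, not part of Weik.io.

```python
from typing import Any

# Allowed values, taken from the IntegrationStatusEnum values listed in this document.
ALLOWED_STATES = {"Stopped", "Started", "Unknown"}


def normalize_flow_spec(spec: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `spec` with defaults applied, or raise ValueError.

    Hypothetical helper for illustration only.
    """
    description = spec.get("Description")
    if description is not None and not isinstance(description, str):
        raise ValueError("Description must be a string")

    # InitialState is optional and defaults to Stopped.
    state = spec.get("InitialState", "Stopped")
    if not isinstance(state, str) or state not in ALLOWED_STATES:
        raise ValueError(f"InitialState must be one of {sorted(ALLOWED_STATES)}")

    normalized: dict[str, Any] = {"InitialState": state}
    if description is not None:
        normalized["Description"] = description
    return normalized
```

A check like this could run in a pre-deployment script so that a misspelled state is caught before the configuration is applied.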
YAML Examples
Basic Integration Flow
apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: customer-sync
spec:
  Description: Synchronize customer data from CRM to data warehouse
  InitialState: Stopped
Auto-Starting Flow
apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: order-processing
spec:
  Description: Process incoming orders from webhook events
  InitialState: Started
Complete Integration Flow Example
apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: multi-system-sync
spec:
  Description: |
    Comprehensive integration flow that:
    - Receives customer updates from CRM
    - Validates and enriches data
    - Updates ERP system
    - Sends notifications
  InitialState: Stopped
---
# Supporting configurations
apiVersion: weik.io/v1
kind: EventSubscription
metadata:
  name: crm-customer-subscription
spec:
  Source: crm-changes
  Channel: customer-events
---
apiVersion: weik.io/v1
kind: IntegrationService
metadata:
  name: customer-validator
spec:
  Title: Customer Data Validator
  Specification: |
    def validate_customer(data):
        required_fields = ['id', 'email', 'name']
        if not all(field in data for field in required_fields):
            raise ValueError("Missing required fields")
        return data
---
apiVersion: weik.io/v1
kind: Variable
metadata:
  name: erp-api-key
spec:
  Value: ${ERP_API_KEY}
  Type: string
  IsSecret: true
Event-Driven Flow
apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: payment-processor
spec:
  Description: Process payment events and update order status
  InitialState: Started
Scheduled Flow
apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: nightly-report-generator
spec:
  Description: Generate and distribute daily reports
  InitialState: Stopped
---
apiVersion: weik.io/v1
kind: Schedule
metadata:
  name: nightly-schedule
spec:
  Cron: "0 2 * * *"
  Command: python
  Args:
    - generate_reports.py
Complex Multi-Step Flow
apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: product-catalog-sync
spec:
  Description: |
    Product catalog synchronization workflow:
    1. Extract products from PIM system
    2. Transform product data
    3. Enrich with pricing from ERP
    4. Load to e-commerce platform
    5. Update search index
  InitialState: Stopped
---
apiVersion: weik.io/v1
kind: IntegrationService
metadata:
  name: product-transformer
spec:
  Title: Product Data Transformer
  Specification: |
    def transform_product(product):
        return {
            'id': product['product_id'],
            'sku': product['sku'],
            'name': product['name'],
            'description': product['description'],
            'category': product.get('category', 'General'),
            'attributes': parse_attributes(product.get('attributes', {}))
        }

    def parse_attributes(attrs):
        return {k: v for k, v in attrs.items() if v is not None}
---
apiVersion: weik.io/v1
kind: IntegrationService
metadata:
  name: pricing-enricher
spec:
  Title: Product Pricing Enrichment
  Specification: |
    import requests

    def enrich_pricing(product, erp_api_url, api_key):
        response = requests.get(
            f"{erp_api_url}/pricing/{product['sku']}",
            headers={"Authorization": f"Bearer {api_key}"}
        )
        pricing = response.json()
        product['price'] = pricing['unit_price']
        product['currency'] = pricing['currency']
        return product
  Requirements:
    requests: ">=2.28.0"
Integration Flow States
IntegrationStatusEnum Values
| State | Description |
|---|---|
| Stopped | Flow is not running and will not process events or triggers |
| Started | Flow is running and actively processing |
| Unknown | Flow state is not determined |
State Management
Control flow state using the CLI:
# Start a flow
weikio integration flows start <flow-name>
# Stop a flow
weikio integration flows stop <flow-name>
# Check flow status
weikio integration flows status <flow-name>
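For scripted deployments, the same commands can be driven from Python. This is a minimal sketch that assumes the `weikio` executable is on the PATH; `build_flow_command` and `run_flow_command` are hypothetical helper names, and only the three subcommands listed above are used.

```python
import subprocess

# Subcommands taken from the CLI listing above.
FLOW_ACTIONS = {"start", "stop", "status"}


def build_flow_command(action: str, flow_name: str) -> list[str]:
    """Build the argument list for a flow command (hypothetical helper)."""
    if action not in FLOW_ACTIONS:
        raise ValueError(f"Unsupported action: {action!r}")
    if not flow_name:
        raise ValueError("flow_name must not be empty")
    return ["weikio", "integration", "flows", action, flow_name]


def run_flow_command(action: str, flow_name: str) -> str:
    """Run the command and return its standard output.

    Assumes `weikio` is installed and on PATH. Raises
    subprocess.CalledProcessError if the CLI exits with a non-zero status.
    """
    result = subprocess.run(
        build_flow_command(action, flow_name),
        check=True,
        capture_output=True,
        text=True,
    )
    return result.stdout
```

Passing the arguments as a list rather than a shell string avoids quoting problems when a flow name contains unusual characters.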
Usage Notes
IntegrationFlow configurations define the metadata and lifecycle of integration flows. The actual flow logic is typically defined in the flow’s Python implementation or through service orchestration.
Flow Components
Integration flows typically include:
- Event subscriptions - Trigger flows based on events
- Services - Reusable business logic
- Variables - Configuration and secrets
- Schedules - Time-based execution
- API endpoints - Expose flow operations
Flow Lifecycle
- Creation - Flow is created with specified initial state
- Starting - Flow transitions to active state, begins processing
- Running - Flow processes events, executes logic
- Stopping - Flow gracefully shuts down, completes in-flight operations
- Stopped - Flow is inactive
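The lifecycle above can be modeled as a small state machine. This is an illustrative sketch only: the phase names come from the list above (they are lifecycle phases, not IntegrationStatusEnum values), and the transition table is an assumption read off the order of that list, not a documented Weik.io API.

```python
# Allowed transitions, assumed from the order of the lifecycle list above.
TRANSITIONS = {
    "Stopped": {"Starting"},
    "Starting": {"Running"},
    "Running": {"Stopping"},
    "Stopping": {"Stopped"},
}


def advance(current: str, target: str) -> str:
    """Return `target` if the transition is allowed, else raise ValueError."""
    if target not in TRANSITIONS.get(current, set()):
        raise ValueError(f"Cannot move from {current} to {target}")
    return target


# Walk one full cycle, starting from a flow created with InitialState: Stopped.
state = "Stopped"
for next_state in ("Starting", "Running", "Stopping", "Stopped"):
    state = advance(state, next_state)
```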
Initial State Considerations
Set InitialState based on deployment strategy:
- Stopped - Safe default, allows testing before activation
- Started - For flows that should run immediately after deployment
- Unknown - Rarely used, system will determine appropriate state
Flow Organization
Use descriptive naming and documentation:
- Choose clear, purpose-indicating names
- Write comprehensive descriptions
- Document dependencies and requirements
- Group related flows by naming convention
Best Practices
- Start with InitialState: Stopped for new flows
- Test flows thoroughly before setting to auto-start
- Document flow purpose and behavior in Description
- Use version control for flow definitions
- Implement proper error handling in flow logic
- Monitor flow execution and performance
- Log important operations and decisions
- Handle transient failures with retry logic
- Validate input data before processing
- Use transactions for data consistency
Flow Development
Typical development workflow:
- Create flow configuration with InitialState: Stopped
- Define supporting resources (services, subscriptions, etc.)
- Deploy to development environment
- Test flow manually
- Start flow for integration testing
- Monitor execution and refine
- Promote to production
- Set to auto-start if appropriate
Flow Monitoring
Monitor flow execution:
# List all flows
weikio integration flows ls
# View flow details
weikio integration flows view <flow-name>
# Check flow status
weikio integration flows status <flow-name>
# View flow logs
weikio integration flows logs <flow-name>
Error Handling
Implement error handling in flows:
- Catch and log exceptions
- Implement retry logic for transient failures
- Use dead-letter queues for failed events
- Alert on critical errors
- Track error rates and patterns
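The points above can be combined in one handler wrapper. This is a minimal sketch under stated assumptions: `TransientError` is a hypothetical exception your own code would raise for retryable failures, and the dead-letter queue is stood in for by a plain list; how Weik.io itself delivers events or stores failed ones is not covered here.

```python
import logging
import time
from typing import Any, Callable

logger = logging.getLogger("flow")


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (for example a timeout)."""


def process_with_retry(
    event: Any,
    handler: Callable[[Any], Any],
    dead_letters: list,
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> Any:
    """Call `handler(event)`, retrying transient failures with exponential backoff.

    Events that still fail are appended to `dead_letters` and None is returned.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return handler(event)
        except TransientError as exc:
            logger.warning("attempt %d/%d failed: %s", attempt, max_attempts, exc)
            if attempt < max_attempts:
                # Wait base_delay, then 2x, 4x, ... before the next attempt.
                time.sleep(base_delay * 2 ** (attempt - 1))
        except Exception:
            # Anything else is treated as permanent: log it and stop retrying.
            logger.exception("permanent failure; not retrying")
            break
    dead_letters.append(event)
    return None
```

Separating transient from permanent failures keeps retries from masking genuine bugs, and the dead-letter list preserves the failed event for later inspection.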
Performance Considerations
- Design flows to handle expected load
- Implement batching for high-volume scenarios
- Use async processing where appropriate
- Monitor resource usage
- Optimize expensive operations
- Consider scaling strategies
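As an illustration of batching for high-volume scenarios, the sketch below groups any iterable of records into fixed-size chunks so that each downstream call handles many records at once. `batched` is a hypothetical helper name, and the right batch size depends on the target system.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of up to `size` items from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    # islice pulls at most `size` items per pass; an empty list ends the loop.
    while batch := list(islice(iterator, size)):
        yield batch


# Example: five records in batches of two; the final batch is shorter.
batches = list(batched(range(5), 2))
```

Because the helper is a generator, it never holds more than one batch in memory, which suits large extracts.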
Security
- Store secrets in Variables with IsSecret: true
- Validate and sanitize input data
- Use appropriate authentication for external calls
- Implement authorization checks
- Audit sensitive operations
- Follow principle of least privilege
Dependencies
Declare flow dependencies clearly:
- Document required services
- Specify event subscriptions
- List external system dependencies
- Note package requirements
- Version dependencies appropriately