IntegrationFlow Configuration

Configure integration flows to orchestrate data and processes

IntegrationFlow Configuration

Defines an integration flow that orchestrates data movement and processing across systems. Integration flows are the core building blocks of Weik.io integrations.

Schema Properties

PropertyTypeRequiredDefaultDescription
DescriptionstringNo-Human-readable description of the flow’s purpose
InitialStateIntegrationStatusEnumNoStoppedInitial state when the flow is created (Stopped, Started, Unknown)

YAML Examples

Basic Integration Flow

apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: customer-sync
spec:
  Description: Synchronize customer data from CRM to data warehouse
  InitialState: Stopped

Auto-Starting Flow

apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: order-processing
spec:
  Description: Process incoming orders from webhook events
  InitialState: Started

Complete Integration Flow Example

apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: multi-system-sync
spec:
  Description: |
    Comprehensive integration flow that:
    - Receives customer updates from CRM
    - Validates and enriches data
    - Updates ERP system
    - Sends notifications
  InitialState: Stopped
---
# Supporting configurations
apiVersion: weik.io/v1
kind: EventSubscription
metadata:
  name: crm-customer-subscription
spec:
  Source: crm-changes
  Channel: customer-events
---
apiVersion: weik.io/v1
kind: IntegrationService
metadata:
  name: customer-validator
spec:
  Title: Customer Data Validator
  Specification: |
    def validate_customer(data):
        required_fields = ['id', 'email', 'name']
        if not all(field in data for field in required_fields):
            raise ValueError("Missing required fields")
        return data
---
apiVersion: weik.io/v1
kind: Variable
metadata:
  name: erp-api-key
spec:
  Value: ${ERP_API_KEY}
  Type: string
  IsSecret: true

Event-Driven Flow

apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: payment-processor
spec:
  Description: Process payment events and update order status
  InitialState: Started

Scheduled Flow

apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: nightly-report-generator
spec:
  Description: Generate and distribute daily reports
  InitialState: Stopped
---
apiVersion: weik.io/v1
kind: Schedule
metadata:
  name: nightly-schedule
spec:
  Cron: "0 2 * * *"
  Command: python
  Args:
    - generate_reports.py

Complex Multi-Step Flow

apiVersion: weik.io/v1
kind: IntegrationFlow
metadata:
  name: product-catalog-sync
spec:
  Description: |
    Product catalog synchronization workflow:
    1. Extract products from PIM system
    2. Transform product data
    3. Enrich with pricing from ERP
    4. Load to e-commerce platform
    5. Update search index
  InitialState: Stopped
---
apiVersion: weik.io/v1
kind: IntegrationService
metadata:
  name: product-transformer
spec:
  Title: Product Data Transformer
  Specification: |
    def transform_product(product):
        return {
            'id': product['product_id'],
            'sku': product['sku'],
            'name': product['name'],
            'description': product['description'],
            'category': product.get('category', 'General'),
            'attributes': parse_attributes(product.get('attributes', {}))
        }

    def parse_attributes(attrs):
        return {k: v for k, v in attrs.items() if v is not None}
---
apiVersion: weik.io/v1
kind: IntegrationService
metadata:
  name: pricing-enricher
spec:
  Title: Product Pricing Enrichment
  Specification: |
    import requests

    def enrich_pricing(product, erp_api_url, api_key):
        response = requests.get(
            f"{erp_api_url}/pricing/{product['sku']}",
            headers={"Authorization": f"Bearer {api_key}"}
        )
        pricing = response.json()
        product['price'] = pricing['unit_price']
        product['currency'] = pricing['currency']
        return product
  Requirements:
    requests: ">=2.28.0"

Integration Flow States

IntegrationStatusEnum Values

StateDescription
StoppedFlow is not running and will not process events or triggers
StartedFlow is running and actively processing
UnknownFlow state is not determined

State Management

Control flow state using CLI:

# Start a flow
weikio integration flows start <flow-name>

# Stop a flow
weikio integration flows stop <flow-name>

# Check flow status
weikio integration flows status <flow-name>

Usage Notes

IntegrationFlow configurations define the metadata and lifecycle of integration flows. The actual flow logic is typically defined in the flow’s Python implementation or through service orchestration.

Flow Components

Integration flows typically include:

  • Event subscriptions - Trigger flows based on events
  • Services - Reusable business logic
  • Variables - Configuration and secrets
  • Schedules - Time-based execution
  • API endpoints - Expose flow operations

Flow Lifecycle

  1. Creation - Flow is created with specified initial state
  2. Starting - Flow transitions to active state, begins processing
  3. Running - Flow processes events, executes logic
  4. Stopping - Flow gracefully shuts down, completes in-flight operations
  5. Stopped - Flow is inactive

Initial State Considerations

Set InitialState based on deployment strategy:

  • Stopped - Safe default, allows testing before activation
  • Started - For flows that should run immediately after deployment
  • Unknown - Rarely used, system will determine appropriate state

Flow Organization

Use descriptive naming and documentation:

  • Choose clear, purpose-indicating names
  • Write comprehensive descriptions
  • Document dependencies and requirements
  • Group related flows by naming convention

Best Practices

  • Start with InitialState: Stopped for new flows
  • Test flows thoroughly before setting to auto-start
  • Document flow purpose and behavior in Description
  • Use version control for flow definitions
  • Implement proper error handling in flow logic
  • Monitor flow execution and performance
  • Log important operations and decisions
  • Handle transient failures with retry logic
  • Validate input data before processing
  • Use transactions for data consistency

Flow Development

Typical development workflow:

  1. Create flow configuration with InitialState: Stopped
  2. Define supporting resources (services, subscriptions, etc.)
  3. Deploy to development environment
  4. Test flow manually
  5. Start flow for integration testing
  6. Monitor execution and refine
  7. Promote to production
  8. Set to auto-start if appropriate

Flow Monitoring

Monitor flow execution:

# List all flows
weikio integration flows ls

# View flow details
weikio integration flows view <flow-name>

# Check flow status
weikio integration flows status <flow-name>

# View flow logs
weikio integration flows logs <flow-name>

Error Handling

Implement error handling in flows:

  • Catch and log exceptions
  • Implement retry logic for transient failures
  • Use dead-letter queues for failed events
  • Alert on critical errors
  • Track error rates and patterns

Performance Considerations

  • Design flows to handle expected load
  • Implement batching for high-volume scenarios
  • Use async processing where appropriate
  • Monitor resource usage
  • Optimize expensive operations
  • Consider scaling strategies

Security

  • Store secrets in Variables with IsSecret: true
  • Validate and sanitize input data
  • Use appropriate authentication for external calls
  • Implement authorization checks
  • Audit sensitive operations
  • Follow principle of least privilege

Dependencies

Declare flow dependencies clearly:

  • Document required services
  • Specify event subscriptions
  • List external system dependencies
  • Note package requirements
  • Version dependencies appropriately