EventSource Configuration

Configure event sources to generate events from various systems

Defines an event source that generates events from external systems, internal operations, or scheduled triggers. Event sources are the entry points for event-driven integrations.

Schema Properties

Property     | Type                             | Required | Default | Description
-------------|----------------------------------|----------|---------|------------
Type         | string                           | Yes      | -       | Event source type (webhook, http-poll, database, file-watch, custom)
EventType    | string                           | Yes      | -       | CloudEvents event type identifier
EventSource  | string                           | Yes      | -       | CloudEvents event source identifier
Subject      | string                           | No       | -       | CloudEvents subject for event categorization
Parameters   | Dictionary<string, string>       | No       | {}      | Source-specific configuration parameters
Variables    | List<Dictionary<string, string>> | No       | []      | Variable mappings for event data
Requirements | Dictionary<string, string>       | No       | {}      | Python package requirements for source implementation

YAML Examples

Webhook Event Source

apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: github-webhooks
spec:
  Type: webhook
  EventType: com.github.push
  EventSource: github-repository
  Subject: /repos/myorg/myrepo
  Parameters:
    Path: /webhooks/github
    Secret: ${github-webhook-secret}
    ValidateSignature: "true"

HTTP Polling Source

apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: api-poller
spec:
  Type: http-poll
  EventType: com.example.api.update
  EventSource: external-api
  Parameters:
    Url: https://api.example.com/changes
    Method: GET
    Headers: |
      {
        "Authorization": "Bearer ${api-token}",
        "Accept": "application/json"
      }
    PollingInterval: "60"
    ResponsePath: $.data.items

Database Event Source

apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: order-changes
spec:
  Type: database
  EventType: com.example.orders.changed
  EventSource: orders-database
  Subject: /tables/orders
  Parameters:
    DatabaseType: postgresql
    Host: db.example.com
    Port: "5432"
    Database: ecommerce
    Username: weikio_user
    Password: ${database-password}
    Schema: public
    Table: orders
    TrackInserts: "true"
    TrackUpdates: "true"
    TrackDeletes: "true"
    PollingInterval: "10"
  Variables:
    - SourceColumn: order_id
      TargetVariable: OrderId
    - SourceColumn: customer_id
      TargetVariable: CustomerId
    - SourceColumn: total
      TargetVariable: OrderTotal

File Watch Source

apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: file-upload-watcher
spec:
  Type: file-watch
  EventType: com.example.file.uploaded
  EventSource: upload-directory
  Subject: /uploads
  Parameters:
    Path: /data/uploads
    Pattern: "*.csv"
    IncludeSubdirectories: "true"
    NotifyFilters: Created,Modified

Custom Event Source

apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: custom-sensor-data
spec:
  Type: custom
  EventType: com.example.sensor.reading
  EventSource: iot-sensors
  Subject: /devices/sensor-001
  Parameters:
    Implementation: ./sources/sensor_reader.py
    DeviceId: sensor-001
    SampleInterval: "5"
  Requirements:
    pyserial: ">=3.5"
    paho-mqtt: ">=1.6.0"

Multiple Event Sources

apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: stripe-webhooks
spec:
  Type: webhook
  EventType: com.stripe.payment.succeeded
  EventSource: stripe
  Subject: /payments
  Parameters:
    Path: /webhooks/stripe
    Secret: ${stripe-webhook-secret}
---
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: customer-changes
spec:
  Type: database
  EventType: com.crm.customer.changed
  EventSource: crm-database
  Subject: /customers
  Parameters:
    DatabaseType: postgresql
    Host: ${database-host}
    Database: crm
    Table: customers
    TrackInserts: "true"
    TrackUpdates: "true"
---
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: price-updates-poll
spec:
  Type: http-poll
  EventType: com.supplier.price.updated
  EventSource: supplier-api
  Parameters:
    Url: https://supplier.example.com/api/prices
    PollingInterval: "300"
    Method: GET

Complex Webhook with Transformation

apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: shopify-orders
spec:
  Type: webhook
  EventType: com.shopify.orders.create
  EventSource: shopify-store
  Subject: /orders
  Parameters:
    Path: /webhooks/shopify/orders
    Secret: ${shopify-webhook-secret}
    ValidateSignature: "true"
    TransformPayload: |
      def transform(payload):
          return {
              'orderId': payload['id'],
              'orderNumber': payload['order_number'],
              'customer': {
                  'id': payload['customer']['id'],
                  'email': payload['customer']['email']
              },
              'total': payload['total_price'],
              'currency': payload['currency'],
              'items': [
                  {
                      'productId': item['product_id'],
                      'quantity': item['quantity'],
                      'price': item['price']
                  }
                  for item in payload['line_items']
              ]
          }
  Variables:
    - SourceColumn: orderId
      TargetVariable: OrderId
    - SourceColumn: customer.email
      TargetVariable: CustomerEmail
    - SourceColumn: total
      TargetVariable: OrderTotal
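
Before deploying, the TransformPayload function can be exercised locally against a sample payload. The sketch below repeats the function from the example and runs it on made-up data; the sample values are hypothetical and only cover the keys the function reads. How the platform invokes the function at runtime is not shown here.

```python
def transform(payload):
    # Same mapping as the TransformPayload function in the example above.
    return {
        'orderId': payload['id'],
        'orderNumber': payload['order_number'],
        'customer': {
            'id': payload['customer']['id'],
            'email': payload['customer']['email'],
        },
        'total': payload['total_price'],
        'currency': payload['currency'],
        'items': [
            {
                'productId': item['product_id'],
                'quantity': item['quantity'],
                'price': item['price'],
            }
            for item in payload['line_items']
        ],
    }


# Hypothetical sample payload, reduced to the fields the function reads.
sample_payload = {
    'id': 1001,
    'order_number': 'A-1001',
    'customer': {'id': 42, 'email': 'jane@example.com'},
    'total_price': '59.90',
    'currency': 'EUR',
    'line_items': [
        {'product_id': 7, 'quantity': 2, 'price': '29.95'},
    ],
}

transformed = transform(sample_payload)
print(transformed['customer']['email'])
```

Note that the Variables entries in the example (orderId, customer.email, total) refer to keys of the transformed payload, not to the original Shopify field names.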

Event Source Types

Webhook

Receives HTTP webhook calls:

Type: webhook
Parameters:
  Path: /webhooks/endpoint
  Secret: webhook-secret
  ValidateSignature: "true"
  AllowedIPs: "10.0.0.0/8,192.168.0.0/16"

Use cases: GitHub, Stripe, Shopify webhooks, custom integrations
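
To illustrate what an AllowedIPs value expresses, the following sketch checks a client address against a comma-separated list of CIDR ranges using Python's standard ipaddress module. The function name is hypothetical, and this is only an assumption about the semantics, not the platform's actual implementation.

```python
import ipaddress


def is_allowed(client_ip: str, allowed_ips: str) -> bool:
    """Return True if client_ip falls inside any CIDR range in allowed_ips."""
    networks = [
        ipaddress.ip_network(part.strip())
        for part in allowed_ips.split(",")
        if part.strip()
    ]
    address = ipaddress.ip_address(client_ip)
    return any(address in network for network in networks)


allowed = "10.0.0.0/8,192.168.0.0/16"
print(is_allowed("10.1.2.3", allowed))
print(is_allowed("8.8.8.8", allowed))
```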

HTTP Poll

Polls HTTP endpoints at intervals:

Type: http-poll
Parameters:
  Url: https://api.example.com/endpoint
  Method: GET
  Headers: '{"Authorization": "Bearer token"}'
  PollingInterval: "60"
  ResponsePath: $.data

Use cases: APIs without webhooks, legacy systems, RSS feeds

Database

Monitors database changes:

Type: database
Parameters:
  DatabaseType: postgresql
  Host: localhost
  Database: mydb
  Table: tablename
  TrackInserts: "true"
  TrackUpdates: "true"
  TrackDeletes: "true"

Use cases: Database change tracking, CDC, data synchronization

File Watch

Monitors file system changes:

Type: file-watch
Parameters:
  Path: /data/directory
  Pattern: "*.csv"
  NotifyFilters: Created,Modified,Deleted

Use cases: File uploads, batch processing, FTP drops

Custom

Runs a user-provided implementation, referenced by the Implementation parameter:

Type: custom
Parameters:
  Implementation: ./path/to/source.py
  CustomParam1: value1

Use cases: IoT devices, message queues, proprietary protocols

CloudEvents Properties

EventSource follows the CloudEvents specification:

  • EventType: Reverse DNS notation (e.g., com.example.orders.created)
  • EventSource: URI identifying the source (e.g., order-api, /services/orders)
  • Subject: Optional resource identifier (e.g., /orders/12345)

These properties help categorize and route events consistently.

Usage Notes

EventSource configurations define how events enter the Weik.io platform. Sources generate CloudEvents-compliant events that flow through channels to subscribers.

Event Flow

EventSource → EventChannel → EventSubscription → IntegrationFlow
  1. The source generates an event
  2. The event is published to a channel
  3. Subscribers receive the event
  4. Integration flows process the event
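
The flow above can be pictured with a toy in-memory model. This is purely a conceptual sketch: the EventChannel class and its methods are hypothetical names, and the platform's real channels and subscriptions are configured declaratively rather than coded like this.

```python
from collections import defaultdict


class EventChannel:
    """Toy in-memory channel that routes events to handlers by event type."""

    def __init__(self):
        self._subscriptions = defaultdict(list)

    def subscribe(self, event_type, flow):
        # A subscription binds an event type to an integration flow (a callable here).
        self._subscriptions[event_type].append(flow)

    def publish(self, event):
        # Deliver the event to every flow subscribed to its type.
        handlers = self._subscriptions.get(event["type"], [])
        for flow in handlers:
            flow(event)
        return len(handlers)


processed = []
channel = EventChannel()
channel.subscribe("com.example.orders.created", processed.append)

# Step 1-2: a source generates an event and publishes it to the channel.
delivered = channel.publish({"type": "com.example.orders.created", "id": "evt-1"})
# An event with no matching subscription is delivered to nobody.
ignored = channel.publish({"type": "com.example.other", "id": "evt-2"})
```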

Webhook Configuration

Webhooks expose HTTP endpoints:

  • Endpoint: https://your-weikio-host{Path}, where Path is the value of the Path parameter (for example, /webhooks/github)
  • Verify signatures using Secret
  • Restrict access with AllowedIPs

Configure the webhook URL in the external system:

https://your-weikio-host/webhooks/github
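
Signature validation usually means recomputing an HMAC over the raw request body with the shared Secret and comparing it to a header sent by the caller. The sketch below assumes the GitHub convention (an X-Hub-Signature-256 header of the form sha256=<hex digest>); other providers use different headers and encodings, and the exact scheme the platform applies is not documented here.

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Check a GitHub-style 'sha256=<hex>' HMAC signature over the raw body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    expected = "sha256=" + digest
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(
        expected.encode("utf-8"), signature_header.encode("utf-8")
    )


# Simulate a sender signing a payload with the shared secret.
secret = "example-secret"
body = b'{"ref": "refs/heads/main"}'
header = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()

print(verify_signature(secret, body, header))
print(verify_signature(secret, b'{"ref": "tampered"}', header))
```

The comparison must run over the raw bytes of the request body; re-serializing parsed JSON typically changes whitespace and breaks the signature.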

Polling Configuration

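HTTP polling retrieves data periodically:

  • PollingInterval is given in seconds, as a quoted string (for example, "60")
  • ResponsePath is a JSONPath expression that selects the events in the response
  • One event is generated per item in the selected response array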
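
As a rough illustration of how ResponsePath selects events, the sketch below resolves a simple dotted path such as $.data.items against a JSON response and returns one entry per array item. It handles only plain $.key.key paths, not full JSONPath (no filters, wildcards, or indexes), and the function name is hypothetical.

```python
import json


def extract_items(response_text: str, response_path: str) -> list:
    """Resolve a simple '$.a.b' path and return the selected items as a list."""
    node = json.loads(response_text)
    # Drop the leading '$' and split on dots; empty segments are ignored.
    keys = [key for key in response_path.lstrip("$").split(".") if key]
    for key in keys:
        node = node[key]
    # A single object is treated as a one-item list.
    return node if isinstance(node, list) else [node]


response = '{"data": {"items": [{"id": 1}, {"id": 2}]}}'
for item in extract_items(response, "$.data.items"):
    print(item)
```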

Database Change Tracking

Monitor database tables:

  • Supports PostgreSQL, MySQL, SQL Server, and Oracle
  • Tracks inserts, updates, and deletes (TrackInserts, TrackUpdates, TrackDeletes)
  • Generates one event per change
  • Use Variables to map columns to event data

File Watching

Monitor directories for changes:

  • Supports glob patterns (Pattern), for example "*.csv"
  • Optionally watches subdirectories (IncludeSubdirectories)
  • Generates one event per file change
  • Each event includes the file path and metadata
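
The combined effect of Path, Pattern, and IncludeSubdirectories can be approximated as follows. This is a sketch of the matching rule only, assuming the pattern is applied to the file name; the function name is hypothetical and the watcher's real matching behavior may differ.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath


def is_watched(file_path: str, root: str, pattern: str,
               include_subdirectories: bool) -> bool:
    """Decide whether file_path would fall under a watch on root."""
    path = PurePosixPath(file_path)
    try:
        relative = path.relative_to(PurePosixPath(root))
    except ValueError:
        return False  # the file is outside the watched directory
    # Without subdirectories, only direct children of root qualify.
    if not include_subdirectories and len(relative.parts) != 1:
        return False
    # The glob pattern is matched against the file name only (case-sensitive).
    return fnmatchcase(path.name, pattern)


print(is_watched("/data/uploads/report.csv", "/data/uploads", "*.csv", False))
print(is_watched("/data/uploads/2025/report.csv", "/data/uploads", "*.csv", False))
```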

Variable Mapping

Map source data to event variables:

Variables:
  - SourceColumn: field_name
    TargetVariable: VariableName

Variables are accessible in subscribing flows using ${VariableName} syntax.
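
The mapping can be thought of as a lookup from each SourceColumn into the source record, stored under the TargetVariable name. The sketch below assumes that a dotted SourceColumn such as customer.email walks into nested data, as the Shopify example suggests; the function name is hypothetical.

```python
def map_variables(record: dict, mappings: list) -> dict:
    """Build {TargetVariable: value} from a record and a list of mappings."""
    variables = {}
    for mapping in mappings:
        value = record
        # Assumption: dots in SourceColumn address nested fields.
        for key in mapping["SourceColumn"].split("."):
            value = value[key]
        variables[mapping["TargetVariable"]] = value
    return variables


record = {"order_id": 12345, "customer": {"email": "jane@example.com"}}
mappings = [
    {"SourceColumn": "order_id", "TargetVariable": "OrderId"},
    {"SourceColumn": "customer.email", "TargetVariable": "CustomerEmail"},
]
variables = map_variables(record, mappings)
print(variables)
```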

Security

Protect event sources:

  • Use Secret for webhook signature validation
  • Restrict webhook access with AllowedIPs
  • Store credentials in Variables with IsSecret: true
  • Use HTTPS for webhook endpoints
  • Implement authentication for HTTP polling
  • Secure database connections with SSL

Error Handling

Handle errors in event sources:

  • Implement retry logic for transient failures
  • Log errors for troubleshooting
  • Dead-letter failed events
  • Alert on persistent failures
  • Monitor source health
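
One common way to combine retries, logging, and dead-lettering is exponential backoff with a bounded number of attempts. The sketch below is a generic pattern, not a platform feature: the function name, the in-memory dead-letter list, and the default limits are all assumptions chosen for illustration.

```python
import logging
import time

logger = logging.getLogger("event_source")


def deliver_with_retry(event, handler, dead_letters,
                       max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call handler(event), retrying with exponential backoff on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            handler(event)
            return True
        except Exception as error:
            logger.warning("attempt %d/%d failed for event %s: %s",
                           attempt, max_attempts, event.get("id"), error)
            if attempt < max_attempts:
                # Delays grow as base_delay, 2x, 4x, ...
                sleep(base_delay * 2 ** (attempt - 1))
    # All attempts failed: park the event for later inspection.
    dead_letters.append(event)
    return False
```

A caller would pass its own handler and a durable dead-letter destination; the sleep parameter exists so tests can skip the real delays.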

Performance

Optimize event source performance:

  • Set appropriate polling intervals
  • Batch database changes when possible
  • Limit file watch scope with patterns
  • Monitor event generation rate
  • Scale webhook endpoints as needed

Best Practices

  • Use descriptive EventType in reverse DNS format
  • Set meaningful EventSource identifiers
  • Implement signature validation for webhooks
  • Choose polling intervals that balance data freshness against load on the polled system
  • Use database change tracking for real-time data sync
  • Monitor source health and event flow
  • Document event schemas
  • Test sources before production deployment
  • Implement idempotency in event handlers
  • Use CloudEvents consistently
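
Idempotent handling typically relies on the CloudEvents id attribute: a handler records the ids it has processed and skips repeats. The following is a minimal in-memory sketch with a hypothetical decorator name; a real deployment would need a durable store shared across instances.

```python
def make_idempotent(handler):
    """Wrap handler so each event id is processed at most once."""
    seen_ids = set()

    def wrapper(event):
        if event["id"] in seen_ids:
            return False  # duplicate delivery, skipped
        handler(event)
        # Record the id only after the handler succeeds, so failures can be retried.
        seen_ids.add(event["id"])
        return True

    return wrapper


handled = []
process = make_idempotent(handled.append)
first = process({"id": "evt-1", "type": "com.example.orders.created"})
second = process({"id": "evt-1", "type": "com.example.orders.created"})
```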

Event Schema

Document the event structure. Example of a generated event:

{
  "specversion": "1.0",
  "type": "com.example.orders.created",
  "source": "order-api",
  "subject": "/orders/12345",
  "id": "unique-event-id",
  "time": "2025-01-15T10:30:00Z",
  "datacontenttype": "application/json",
  "data": {
    "orderId": "12345",
    "customerId": "67890",
    "total": 99.99
  }
}
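
For testing or for custom sources, an envelope of this shape can be assembled by hand. The sketch below builds a CloudEvents 1.0 style dictionary with a generated id and a UTC timestamp; the build_event helper is hypothetical and not part of any platform SDK.

```python
import uuid
from datetime import datetime, timezone


def build_event(event_type, source, data, subject=None):
    """Assemble a CloudEvents 1.0 style envelope as a plain dictionary."""
    event = {
        "specversion": "1.0",
        "type": event_type,
        "source": source,
        "id": str(uuid.uuid4()),
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "datacontenttype": "application/json",
        "data": data,
    }
    # subject is optional in CloudEvents, so it is added only when given.
    if subject is not None:
        event["subject"] = subject
    return event


event = build_event(
    "com.example.orders.created",
    "order-api",
    {"orderId": "12345", "customerId": "67890", "total": 99.99},
    subject="/orders/12345",
)
```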

Monitoring

Monitor event sources:

weikio eventsource ls
weikio eventsourceinstance ls
weikio eventsourceinstance status <instance-id>

Track metrics:

  • Event generation rate
  • Error rate
  • Processing latency
  • Source availability

Testing

Test event sources:

  1. Create source configuration
  2. Deploy to test environment
  3. Generate test events
  4. Verify event delivery
  5. Validate event data
  6. Test error scenarios
  7. Monitor performance under load