EventSource Configuration
Configure event sources to generate events from various systems
Defines an event source that generates events from external systems, internal operations, or scheduled triggers. Event sources are the entry points for event-driven integrations.
Schema Properties
| Property | Type | Required | Default | Description |
|---|---|---|---|---|
| Type | string | Yes | - | Event source type (webhook, http-poll, database, file-watch, custom) |
| EventType | string | Yes | - | CloudEvents event type identifier |
| EventSource | string | Yes | - | CloudEvents event source identifier |
| Subject | string | No | - | CloudEvents subject for event categorization |
| Parameters | Dictionary<string, string> | No | {} | Source-specific configuration parameters |
| Variables | List<Dictionary<string, string>> | No | [] | Variable mappings for event data |
| Requirements | Dictionary<string, string> | No | {} | Python package requirements for source implementation |
YAML Examples
Webhook Event Source
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: github-webhooks
spec:
  Type: webhook
  EventType: com.github.push
  EventSource: github-repository
  Subject: /repos/myorg/myrepo
  Parameters:
    Path: /webhooks/github
    Secret: ${github-webhook-secret}
    ValidateSignature: "true"
HTTP Polling Source
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: api-poller
spec:
  Type: http-poll
  EventType: com.example.api.update
  EventSource: external-api
  Parameters:
    Url: https://api.example.com/changes
    Method: GET
    Headers: |
      {
        "Authorization": "Bearer ${api-token}",
        "Accept": "application/json"
      }
    PollingInterval: "60"
    ResponsePath: $.data.items
Database Event Source
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: order-changes
spec:
  Type: database
  EventType: com.example.orders.changed
  EventSource: orders-database
  Subject: /tables/orders
  Parameters:
    DatabaseType: postgresql
    Host: db.example.com
    Port: "5432"
    Database: ecommerce
    Username: weikio_user
    Password: ${database-password}
    Schema: public
    Table: orders
    TrackInserts: "true"
    TrackUpdates: "true"
    TrackDeletes: "true"
    PollingInterval: "10"
  Variables:
    - SourceColumn: order_id
      TargetVariable: OrderId
    - SourceColumn: customer_id
      TargetVariable: CustomerId
    - SourceColumn: total
      TargetVariable: OrderTotal
File Watch Source
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: file-upload-watcher
spec:
  Type: file-watch
  EventType: com.example.file.uploaded
  EventSource: upload-directory
  Subject: /uploads
  Parameters:
    Path: /data/uploads
    Pattern: "*.csv"
    IncludeSubdirectories: "true"
    NotifyFilters: Created,Modified
Custom Event Source
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: custom-sensor-data
spec:
  Type: custom
  EventType: com.example.sensor.reading
  EventSource: iot-sensors
  Subject: /devices/sensor-001
  Parameters:
    Implementation: ./sources/sensor_reader.py
    DeviceId: sensor-001
    SampleInterval: "5"
  Requirements:
    pyserial: ">=3.5"
    paho-mqtt: ">=1.6.0"
Multiple Event Sources
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: stripe-webhooks
spec:
  Type: webhook
  EventType: com.stripe.payment.succeeded
  EventSource: stripe
  Subject: /payments
  Parameters:
    Path: /webhooks/stripe
    Secret: ${stripe-webhook-secret}
---
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: customer-changes
spec:
  Type: database
  EventType: com.crm.customer.changed
  EventSource: crm-database
  Subject: /customers
  Parameters:
    DatabaseType: postgresql
    Host: ${database-host}
    Database: crm
    Table: customers
    TrackInserts: "true"
    TrackUpdates: "true"
---
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: price-updates-poll
spec:
  Type: http-poll
  EventType: com.supplier.price.updated
  EventSource: supplier-api
  Parameters:
    Url: https://supplier.example.com/api/prices
    PollingInterval: "300"
    Method: GET
Complex Webhook with Transformation
apiVersion: weik.io/v1
kind: EventSource
metadata:
  name: shopify-orders
spec:
  Type: webhook
  EventType: com.shopify.orders.create
  EventSource: shopify-store
  Subject: /orders
  Parameters:
    Path: /webhooks/shopify/orders
    Secret: ${shopify-webhook-secret}
    ValidateSignature: "true"
    TransformPayload: |
      def transform(payload):
          return {
              'orderId': payload['id'],
              'orderNumber': payload['order_number'],
              'customer': {
                  'id': payload['customer']['id'],
                  'email': payload['customer']['email']
              },
              'total': payload['total_price'],
              'currency': payload['currency'],
              'items': [
                  {
                      'productId': item['product_id'],
                      'quantity': item['quantity'],
                      'price': item['price']
                  }
                  for item in payload['line_items']
              ]
          }
  Variables:
    - SourceColumn: orderId
      TargetVariable: OrderId
    - SourceColumn: customer.email
      TargetVariable: CustomerEmail
    - SourceColumn: total
      TargetVariable: OrderTotal
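The transformation in this example can be exercised locally before deployment. The sketch below repeats the transform function from the TransformPayload parameter and runs it on a small hand-written payload. The sample payload is a hypothetical, trimmed-down order rather than a complete Shopify payload, and how the platform itself invokes transform is not covered here.

```python
def transform(payload):
    """Same function as in the TransformPayload parameter of the example."""
    return {
        'orderId': payload['id'],
        'orderNumber': payload['order_number'],
        'customer': {
            'id': payload['customer']['id'],
            'email': payload['customer']['email']
        },
        'total': payload['total_price'],
        'currency': payload['currency'],
        'items': [
            {
                'productId': item['product_id'],
                'quantity': item['quantity'],
                'price': item['price']
            }
            for item in payload['line_items']
        ]
    }


# Hypothetical sample payload containing only the fields that transform reads.
sample_payload = {
    'id': 1001,
    'order_number': 42,
    'customer': {'id': 7, 'email': 'jane@example.com'},
    'total_price': '19.98',
    'currency': 'EUR',
    'line_items': [
        {'product_id': 501, 'quantity': 2, 'price': '9.99'},
    ],
}

event_data = transform(sample_payload)
print(event_data['customer']['email'])
```

A payload that lacks any of these keys raises KeyError, so it may be worth testing with the smallest payload the external system can send.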
Event Source Types
Webhook
Receives HTTP webhook calls:
Type: webhook
Parameters:
  Path: /webhooks/endpoint
  Secret: webhook-secret
  ValidateSignature: "true"
  AllowedIPs: "10.0.0.0/8,192.168.0.0/16"
Use cases: GitHub, Stripe, Shopify webhooks, custom integrations
HTTP Poll
Polls HTTP endpoints at intervals:
Type: http-poll
Parameters:
  Url: https://api.example.com/endpoint
  Method: GET
  Headers: '{"Authorization": "Bearer token"}'
  PollingInterval: "60"
  ResponsePath: $.data
Use cases: APIs without webhooks, legacy systems, RSS feeds
Database
Monitors database changes:
Type: database
Parameters:
  DatabaseType: postgresql
  Host: localhost
  Database: mydb
  Table: tablename
  TrackInserts: "true"
  TrackUpdates: "true"
  TrackDeletes: "true"
Use cases: Database change tracking, CDC, data synchronization
File Watch
Monitors file system changes:
Type: file-watch
Parameters:
  Path: /data/directory
  Pattern: "*.csv"
  NotifyFilters: Created,Modified,Deleted
Use cases: File uploads, batch processing, FTP drops
Custom
Runs a custom source implementation:
Type: custom
Parameters:
  Implementation: ./path/to/source.py
  CustomParam1: value1
Use cases: IoT devices, message queues, proprietary protocols
CloudEvents Properties
EventSource uses the CloudEvents specification:
- EventType: Reverse DNS notation (e.g., com.example.orders.created)
- EventSource: URI identifying the source (e.g., order-api, /services/orders)
- Subject: Optional resource identifier (e.g., /orders/12345)
These properties help categorize and route events consistently.
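As an illustration of how these three properties relate to the CloudEvents attributes type, source, and subject, the following sketch assembles an event envelope by hand. The function name build_cloud_event is hypothetical, and the id and time values are generated locally for the example; the platform's own envelope construction may differ.

```python
import json
import uuid
from datetime import datetime, timezone


def build_cloud_event(event_type, event_source, data, subject=None):
    """Assemble a CloudEvents 1.0 style envelope as a plain dictionary."""
    event = {
        "specversion": "1.0",
        "type": event_type,        # from the EventType property
        "source": event_source,    # from the EventSource property
        "id": str(uuid.uuid4()),   # unique per event
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "datacontenttype": "application/json",
        "data": data,
    }
    if subject is not None:
        event["subject"] = subject  # Subject is optional
    return event


event = build_cloud_event(
    "com.example.orders.created",
    "order-api",
    {"orderId": "12345"},
    subject="/orders/12345",
)
print(json.dumps(event, indent=2))
```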
Usage Notes
EventSource configurations define how events enter the Weik.io platform. Sources generate CloudEvents-compliant events that flow through channels to subscribers.
Event Flow
EventSource → EventChannel → EventSubscription → IntegrationFlow
- Source generates event
- Event is published to channel
- Subscribers receive event
- Integration flows process event
Webhook Configuration
Webhooks expose HTTP endpoints:
- Endpoint: https://your-host/webhooks/{Path}
- Verify signatures using Secret
- Restrict access with AllowedIPs
Configure the webhook URL in the external system:
https://your-weikio-host/webhooks/github
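The two checks behind Secret and AllowedIPs can be sketched in a few lines. This is a minimal illustration, not the platform's implementation: it assumes the sender signs the raw request body with HMAC-SHA256 and transmits the hex digest, which varies by provider, and it treats AllowedIPs as a comma-separated list of CIDR ranges as in the example above. The function names are hypothetical.

```python
import hashlib
import hmac
import ipaddress


def signature_is_valid(secret: str, body: bytes, received_hex: str) -> bool:
    """Recompute the HMAC-SHA256 hex digest of the body and compare it.

    Assumption: the sender uses HMAC-SHA256 with a hex-encoded digest.
    """
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(expected, received_hex)


def ip_is_allowed(client_ip: str, allowed_ips: str) -> bool:
    """Check a client address against a comma-separated list of CIDR ranges."""
    address = ipaddress.ip_address(client_ip)
    networks = [
        ipaddress.ip_network(cidr.strip())
        for cidr in allowed_ips.split(",")
        if cidr.strip()
    ]
    return any(address in network for network in networks)
```

Signature validation should run on the raw bytes of the request body, before any JSON parsing, because re-serialized JSON rarely matches the signed bytes exactly.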
Polling Configuration
HTTP polling retrieves data periodically:
- PollingInterval in seconds
- ResponsePath uses JSONPath to extract events
- One event per item in response array
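To make the "one event per item" behavior concrete, here is a small sketch that resolves a path such as $.data.items against a parsed response and emits one event payload per array element. It handles only the root symbol and dotted keys, which is a small subset of JSONPath; the function name extract_items is hypothetical, and the platform presumably uses a full JSONPath evaluator.

```python
import json


def extract_items(document, response_path):
    """Resolve a simple dotted path such as '$.data.items'.

    Only '$' and dotted keys are supported; this is not full JSONPath.
    A non-list result is wrapped so the caller always gets a list.
    """
    if response_path == "$":
        node = document
    elif response_path.startswith("$."):
        node = document
        for key in response_path[2:].split("."):
            node = node[key]
    else:
        raise ValueError(f"unsupported path: {response_path}")
    return node if isinstance(node, list) else [node]


body = json.loads('{"data": {"items": [{"id": 1}, {"id": 2}]}}')
events = [{"data": item} for item in extract_items(body, "$.data.items")]
print(len(events))  # 2
```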
Database Change Tracking
Monitor database tables:
- Supports PostgreSQL, MySQL, SQL Server, Oracle
- Tracks inserts, updates, deletes
- Generates event per change
- Use Variables to map columns to event data
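How changes are detected internally is not described here. One way to picture polling-based tracking is a comparison of two snapshots of the table, keyed by primary key, taken one PollingInterval apart. The sketch below shows that comparison only; the function name diff_snapshots and the snapshot shape are assumptions made for illustration.

```python
def diff_snapshots(previous, current):
    """Compare two snapshots (primary key -> row dict) and list the changes.

    Returns (change_type, key, row) tuples, one per changed row.
    """
    changes = []
    for key, row in current.items():
        if key not in previous:
            changes.append(("insert", key, row))
        elif previous[key] != row:
            changes.append(("update", key, row))
    for key, row in previous.items():
        if key not in current:
            changes.append(("delete", key, row))
    return changes


before = {1: {"total": 10}, 2: {"total": 20}}
after = {1: {"total": 15}, 3: {"total": 30}}
for change in diff_snapshots(before, after):
    print(change)
```

Each returned tuple corresponds to one event, which matches the "event per change" behavior listed above.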
File Watching
Monitor directories for changes:
- Supports patterns (glob)
- Watches subdirectories optionally
- Generates event per file change
- Event includes file path and metadata
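The effect of Pattern and IncludeSubdirectories can be previewed with the standard library before a source is deployed. This sketch lists the files a pattern would match under a directory; it does not watch for changes, and the function name matching_files is hypothetical.

```python
import fnmatch
import os


def matching_files(root, pattern, include_subdirectories=False):
    """Return the files under root whose names match a glob pattern."""
    matches = []
    for directory, subdirs, files in os.walk(root):
        for name in files:
            if fnmatch.fnmatch(name, pattern):
                matches.append(os.path.join(directory, name))
        if not include_subdirectories:
            # Emptying the list in place stops os.walk from descending.
            subdirs.clear()
    return sorted(matches)
```

For example, matching_files("/data/uploads", "*.csv", include_subdirectories=True) lists the files that the file-watch example above would react to.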
Variable Mapping
Map source data to event variables:
Variables:
  - SourceColumn: field_name
    TargetVariable: VariableName
Variables are accessible in subscribing flows using ${VariableName} syntax.
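The mapping step can be pictured as a lookup from each SourceColumn to a value in the event data, stored under the TargetVariable name. The sketch below assumes that a dotted SourceColumn such as customer.email (used in the Shopify example) refers to a nested field; that interpretation and the function names are assumptions, not documented behavior.

```python
def resolve(data, path):
    """Follow a dotted path such as 'customer.email' through nested dicts."""
    node = data
    for key in path.split("."):
        node = node[key]
    return node


def apply_variable_mappings(data, mappings):
    """Build a {TargetVariable: value} dictionary from a Variables list."""
    return {
        mapping["TargetVariable"]: resolve(data, mapping["SourceColumn"])
        for mapping in mappings
    }


variables = apply_variable_mappings(
    {"orderId": 1001, "customer": {"email": "jane@example.com"}},
    [
        {"SourceColumn": "orderId", "TargetVariable": "OrderId"},
        {"SourceColumn": "customer.email", "TargetVariable": "CustomerEmail"},
    ],
)
print(variables)
```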
Security
Protect event sources:
- Use Secret for webhook signature validation
- Restrict webhook access with AllowedIPs
- Store credentials in Variables with IsSecret: true
- Use HTTPS for webhook endpoints
- Implement authentication for HTTP polling
- Secure database connections with SSL
Error Handling
Handle errors in event sources:
- Implement retry logic for transient failures
- Log errors for troubleshooting
- Dead-letter failed events
- Alert on persistent failures
- Monitor source health
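Retry, logging, and dead-lettering from the list above can be combined in one small helper. This is a generic sketch for a custom source or handler, not a description of built-in platform behavior; the function name, the send callable, and the list used as a dead-letter store are all hypothetical.

```python
import time


def deliver_with_retry(event, send, dead_letters,
                       max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Try to send an event, backing off exponentially between attempts.

    Returns True on success. After max_attempts failures the event is
    appended to dead_letters and False is returned.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            send(event)
            return True
        except Exception as error:
            # Log each failure so persistent problems can be diagnosed.
            print(f"attempt {attempt} failed: {error}")
            if attempt < max_attempts:
                sleep(base_delay * 2 ** (attempt - 1))
    dead_letters.append(event)
    return False
```

The sleep parameter exists so tests can pass a no-op and avoid real delays. In production code it is usually better to catch only the exception types that indicate a transient failure.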
Performance
Optimize event source performance:
- Set appropriate polling intervals
- Batch database changes when possible
- Limit file watch scope with patterns
- Monitor event generation rate
- Scale webhook endpoints as needed
Best Practices
- Use a descriptive EventType in reverse DNS format
- Set meaningful EventSource identifiers
- Implement signature validation for webhooks
- Choose polling intervals that match how often the data changes; overly frequent polling adds load without producing new events
- Use database change tracking for real-time data sync
- Monitor source health and event flow
- Document event schemas
- Test sources before production deployment
- Implement idempotency in event handlers
- Use CloudEvents consistently
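Idempotency in event handlers usually means recognizing an event that has already been processed. In CloudEvents, the combination of source and id identifies a distinct event, so it can serve as a deduplication key. The wrapper below is a minimal sketch with a hypothetical class name; the in-memory set stands in for whatever durable store a real handler would use.

```python
class IdempotentHandler:
    """Wrap a handler so each (source, id) pair is processed at most once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # stand-in for a durable store

    def handle(self, event):
        key = (event["source"], event["id"])
        if key in self._seen:
            return False  # duplicate delivery, skipped
        self._handler(event)
        # Record the key only after the handler succeeds, so a failed
        # event can be retried.
        self._seen.add(key)
        return True


processed = []
handler = IdempotentHandler(processed.append)
event = {"source": "order-api", "id": "abc-1", "data": {"orderId": "12345"}}
print(handler.handle(event))  # True
print(handler.handle(event))  # False
```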
Event Schema
Document event structure:
Example of a generated event:
{
  "specversion": "1.0",
  "type": "com.example.orders.created",
  "source": "order-api",
  "subject": "/orders/12345",
  "id": "unique-event-id",
  "time": "2025-01-15T10:30:00Z",
  "datacontenttype": "application/json",
  "data": {
    "orderId": "12345",
    "customerId": "67890",
    "total": 99.99
  }
}
Monitoring
Monitor event sources:
weikio eventsource ls
weikio eventsourceinstance ls
weikio eventsourceinstance status <instance-id>
Track metrics:
- Event generation rate
- Error rate
- Processing latency
- Source availability
Testing
Test event sources:
- Create source configuration
- Deploy to test environment
- Generate test events
- Verify event delivery
- Validate event data
- Test error scenarios
- Monitor performance under load
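The "validate event data" step can start with a structural check of the envelope. CloudEvents 1.0 requires the attributes specversion, id, source, and type, so a test can assert that they are present and non-empty. The helper below is a sketch with a hypothetical name; it checks only the envelope and does not validate the data payload against any schema.

```python
REQUIRED_ATTRIBUTES = ("specversion", "id", "source", "type")


def validate_event(event):
    """Return a list of problems found in a CloudEvents style dictionary.

    An empty list means the required attributes look well formed.
    """
    problems = []
    for name in REQUIRED_ATTRIBUTES:
        value = event.get(name)
        if not isinstance(value, str) or not value:
            problems.append(f"missing or empty attribute: {name}")
    if "specversion" in event and event["specversion"] != "1.0":
        problems.append(f"unsupported specversion: {event['specversion']!r}")
    return problems


sample = {
    "specversion": "1.0",
    "type": "com.example.orders.created",
    "source": "order-api",
    "id": "unique-event-id",
    "data": {"orderId": "12345"},
}
print(validate_event(sample))  # []
```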