Entity Store
Intelligent change detection for polling-based integrations
The Entity Store tracks changes by comparing each pushed record with its stored version, and it emits changefeed events only when the data actually differs. It suits polled source systems that provide neither update timestamps nor webhooks.
What is Entity Store?
Entity Store is a change detection system designed for polling-based integrations. You continuously push data to the Entity Store, and it automatically compares the data to detect real changes. Only when actual changes are detected does it emit changefeed events that trigger downstream integration flows.
This solves the common problem of polling large amounts of data from systems that lack proper change tracking, update timestamps, or webhook capabilities.
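One way to picture the comparison step is a content fingerprint kept per entity. The sketch below is an illustrative in-memory model, not the Weik.io implementation: the `ChangeDetector` class, its `push` method, and the choice of SHA-256 over canonical JSON are all assumptions made for this example.

```python
import hashlib
import json


class ChangeDetector:
    """Toy in-memory model of the comparison step (hypothetical, for illustration)."""

    def __init__(self):
        # Maps (entity_type, entity_id) to the fingerprint of the last pushed data.
        self._fingerprints = {}

    @staticmethod
    def _fingerprint(data):
        # Canonical JSON, so that key order does not affect the hash.
        canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def push(self, entity_type, entity_id, data):
        """Return 'Created', 'Updated', or None when the data is unchanged."""
        key = (entity_type, entity_id)
        new = self._fingerprint(data)
        old = self._fingerprints.get(key)
        if old == new:
            return None  # unchanged: no changefeed event
        self._fingerprints[key] = new
        return "Created" if old is None else "Updated"


detector = ChangeDetector()
first = detector.push("customer", "12345", {"name": "John Doe", "email": "john@example.com"})
# Same content with a different key order: treated as unchanged.
second = detector.push("customer", "12345", {"email": "john@example.com", "name": "John Doe"})
third = detector.push("customer", "12345", {"name": "John Doe", "email": "john.doe@example.com"})
print(first, second, third)
```

In this model, pushing identical content any number of times yields no event, which is the behavior the polling pattern relies on.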
Key Features
- Intelligent Change Detection - Automatically detects actual data changes, not just new polls
- Changefeed Events - Only emits events when data has truly changed
- Polling Pattern Support - Designed for systems without webhooks or update timestamps
- History Tracking - Maintains up to 64 versions of entity history for audit trails
- Deduplication - Prevents duplicate processing of unchanged data
- Size Management - Monitor storage usage with configurable limits
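The History Tracking limit can be pictured as a bounded buffer per entity. The following is a minimal sketch under stated assumptions: the `EntityHistory` class is hypothetical, and only the figure of 64 versions comes from this page; how Entity Store actually stores history is not described here.

```python
from collections import deque

HISTORY_LIMIT = 64  # documented maximum number of versions kept


class EntityHistory:
    """Keeps the most recent versions of one entity (hypothetical model)."""

    def __init__(self, limit=HISTORY_LIMIT):
        # A deque with maxlen silently drops the oldest item when full.
        self._versions = deque(maxlen=limit)

    def record(self, data):
        """Store a new version only when it differs from the latest one."""
        if self._versions and self._versions[-1] == data:
            return False
        self._versions.append(data)
        return True

    def versions(self):
        return list(self._versions)


history = EntityHistory()
for price in range(100):
    history.record({"price": price})
duplicate_recorded = history.record({"price": 99})  # same as the latest version

print(len(history.versions()), history.versions()[0], duplicate_recorded)
```

After 100 distinct versions, only the newest 64 remain, and re-pushing the latest version adds nothing.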
How It Works
The Entity Store change detection pattern works in two stages:
┌─────────────────────────────────────────────────────────────────────┐
│                       STAGE 1: POLL AND STORE                       │
│                           (Producer Flow)                           │
└─────────────────────────────────────────────────────────────────────┘

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│   External   │  Poll   │ Integration  │  Push   │    Entity    │
│    System    │ ───────>│     Flow     │ ───────>│    Store     │
│   (API/DB)   │  Every  │  (Producer)  │   All   │              │
└──────────────┘  5 min  └──────────────┘  Data   └──────┬───────┘
                                                         │
                                                         │ Compare
                                                         │ Detect Changes
                                                         │
                                                         v
                                                ┌────────────────┐
                                                │   Changefeed   │
                                                │  (Only actual  │
                                                │    changes)    │
                                                └────────┬───────┘
                                                         │
┌─────────────────────────────────────────────────────────────────────┐
│                      STAGE 2: PROCESS CHANGES                       │
│                           (Consumer Flow)                           │
└─────────────────────────────────────────────────────────────────────┘
                                                         │
                                                         v
                                                ┌────────────────┐
                                                │  Integration   │
                                                │      Flow      │
                                                │   (Consumer)   │
                                                └────────┬───────┘
                                                         │
                                                         v
                                            ┌──────────────────────────┐
                                            │  Process Only Changes:   │
                                            │  • Update downstream     │
                                            │  • Send notifications    │
                                            │  • Execute business logic│
                                            └──────────────────────────┘
Stage 1: Poll and Store (Producer Flow)
Create an integration flow that polls data from your source system and pushes it to the Entity Store:
- Poll Source System - Regularly fetch data from your external system (e.g., every minute)
- Push to Entity Store - Send all fetched data to the Entity Store
- Automatic Comparison - Entity Store compares new data with stored versions
- Change Detection - Only actual changes trigger changefeed events
- No Duplicate Events - Unchanged data is stored but doesn’t emit events
Stage 2: Process Changes (Consumer Flow)
Create a separate integration flow that listens to the Entity Store changefeed:
- Subscribe to Changefeed - Flow waits for change events from Entity Store
- Receive Only Changes - Gets notified only when data has actually changed
- Process Changes - Execute business logic, sync to other systems, send notifications, etc.
This two-stage pattern decouples polling frequency from change processing, allowing you to poll frequently while only processing actual changes.
Example Data Flow
Time: 10:00 - Producer polls API, gets 100 customers
  └─> Push to Entity Store
      └─> 5 customers changed → 5 changefeed events
          └─> Consumer processes 5 customers

Time: 10:05 - Producer polls API, gets 100 customers
  └─> Push to Entity Store
      └─> 2 customers changed → 2 changefeed events
          └─> Consumer processes 2 customers

Time: 10:10 - Producer polls API, gets 100 customers
  └─> Push to Entity Store
      └─> 0 customers changed → 0 changefeed events
          └─> Consumer processes nothing
Result: The producer polls 100 records every 5 minutes, 300 records across the three polls shown, but the consumer processes only the 7 records that actually changed (5 + 2 + 0), avoiding 293 downstream API calls.
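The timeline above can be replayed with a small simulation that counts polled records against emitted events. This is a sketch only: `detect_changes` and `snapshot` are hypothetical helpers, the store is a plain dictionary, and a `revision` field stands in for whatever customer data changes between polls.

```python
def detect_changes(store, records):
    """Return the records that differ from the stored copy, updating the store."""
    changed = []
    for record in records:
        key = record["customerId"]
        if store.get(key) != record:
            store[key] = dict(record)
            changed.append(record)
    return changed


def snapshot(revisions):
    """Build one poll result of 100 customers; `revisions` overrides revision 0."""
    return [{"customerId": i, "revision": revisions.get(i, 0)} for i in range(100)]


store = {}
detect_changes(store, snapshot({}))  # baseline load before 10:00

after_first = {i: 1 for i in range(5)}      # 10:00: customers 0-4 changed
after_second = {**after_first, 7: 1, 8: 1}  # 10:05: customers 7 and 8 changed
polls = [snapshot(after_first), snapshot(after_second), snapshot(after_second)]

events_per_poll = [len(detect_changes(store, poll)) for poll in polls]
polled = sum(len(poll) for poll in polls)
processed = sum(events_per_poll)
avoided = polled - processed

print(events_per_poll, polled, processed, avoided)
```

The simulation reports 5, 2, and 0 events for the three polls, so 7 of the 300 polled records reach the consumer.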
Use Cases
Polling APIs Without Timestamps
Many systems don’t provide “last modified” timestamps or “changed since” query parameters:
# Producer: Poll external API every 5 minutes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-customer-api
spec:
  Schedule: "*/5 * * * *" # Every 5 minutes
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "https://api.example.com/customers"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "customer"
                        entityId: "${body[customerId]}"
                        source: "https://api.example.com/customers"
---
# Consumer: Process only actual changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-customer-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "customer"
          steps:
            - log: "Customer changed: ${body}"
            - to: "salesforce:update-customer"
The producer polls every 5 minutes and pushes all data to the Entity Store. The consumer only receives events when customer data has actually changed, preventing unnecessary processing and downstream API calls.
Polling Databases Without CDC
For databases where Change Data Capture (CDC) isn’t available or practical:
# Producer: Poll database every minute
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-legacy-db
spec:
  Schedule: "* * * * *" # Every minute
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "sql:SELECT * FROM orders?dataSource=#myDataSource"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "order"
                        entityId: "${body[order_id]}"
                        source: "legacy-database"
---
# Consumer: React to actual order changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-order-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "order"
          steps:
            - choice:
                when:
                  - simple: "${body[Operation]} == 'Created'"
                    steps:
                      - to: "direct:send-order-confirmation"
                  - simple: "${body[Operation]} == 'Updated'"
                    steps:
                      - to: "direct:update-shipping-status"
File System Monitoring
Monitor file systems or S3 buckets for content changes:
# Producer: Poll S3 bucket for files
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-s3-files
spec:
  Schedule: "*/10 * * * *" # Every 10 minutes
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "aws2-s3://my-bucket?operation=listObjects"
            - split:
                simple: "${body}"
                steps:
                  - to: "aws2-s3://my-bucket?operation=getObject"
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "s3-file"
                        entityId: "${body[key]}"
                        source: "s3://my-bucket"
---
# Consumer: Process only changed files
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-file-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "s3-file"
          steps:
            - log: "File changed: ${body[EntityId]}"
            - to: "direct:process-file"
IoT Sensor Data Deduplication
Poll IoT sensors and only react to actual state changes:
# Producer: Poll sensors frequently
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-sensors
spec:
  Schedule: "*/30 * * * * *" # Every 30 seconds
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "mqtt:sensors/temperature/#"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "sensor-reading"
                        entityId: "${body[sensorId]}"
                        source: "mqtt://sensors"
---
# Consumer: Alert only on actual temperature changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: temperature-alerts
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "sensor-reading"
          steps:
            - filter:
                simple: "${body[Data][temperature]} > 80"
                steps:
                  - to: "direct:send-alert"
Configuration
Required Kamelets
Entity Store requires two Kamelets to be available in your Weik.io installation:
- entitystore-sink - Pushes data to Entity Store
- entitystore-source - Consumes changefeed events from Entity Store
These Kamelets are provided with Weik.io and use NATS as the underlying message broker.
Entity Types
Entity types categorize your data and allow multiple producers and consumers to work with different data types:
# Producer for customers
parameters:
  entityType: "customer"

# Consumer for customers
parameters:
  entityType: "customer"

# Different producer for orders
parameters:
  entityType: "order"
Each entity type maintains its own changefeed stream, allowing independent processing.
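The effect of per-type streams can be modeled as subscriber lists keyed by entity type. The sketch below is an assumption-laden illustration: the `Changefeed` class and its `subscribe` and `publish` methods are hypothetical names, and the real streams run over NATS rather than in-process callbacks.

```python
class Changefeed:
    """In-process model of one changefeed stream per entity type (hypothetical)."""

    def __init__(self):
        self._subscribers = {}  # entity type -> list of callbacks

    def subscribe(self, entity_type, callback):
        self._subscribers.setdefault(entity_type, []).append(callback)

    def publish(self, event):
        """Deliver the event to subscribers of its entity type; return the count."""
        callbacks = self._subscribers.get(event["EntityType"], [])
        for callback in callbacks:
            callback(event)
        return len(callbacks)


customer_events = []
order_events = []

feed = Changefeed()
feed.subscribe("customer", customer_events.append)
feed.subscribe("order", order_events.append)

feed.publish({"EntityType": "customer", "EntityId": "12345", "Operation": "Updated"})
feed.publish({"EntityType": "order", "EntityId": "1001", "Operation": "Created"})
unrouted = feed.publish({"EntityType": "product", "EntityId": "9", "Operation": "Created"})

print(len(customer_events), len(order_events), unrouted)
```

Each consumer sees only the events for the entity type it subscribed to, so a slow order consumer does not hold up customer processing in this model.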
Changefeed Event Format
When consuming from the Entity Store changefeed using kamelet:entitystore-source, events are structured as follows:
{
  "EntityType": "customer",
  "EntityId": "12345",
  "Timestamp": "2025-01-13T10:30:00Z",
  "Operation": "Created",
  "Data": {
    "customerId": "12345",
    "name": "John Doe",
    "email": "john@example.com"
  }
}
Event Fields:
- EntityType - The entity type specified in the sink
- EntityId - The unique identifier for the entity
- Timestamp - When the change was detected
- Operation - Type of change: Created, Updated, or Deleted
- Data - The actual entity data that was pushed to the sink
Accessing Data in Routes:
# Access operation type
- simple: "${body[Operation]} == 'Created'"
# Access entity ID
- simple: "${body[EntityId]}"
# Access entity data fields
- simple: "${body[Data][customerId]}"
- simple: "${body[Data][email]}"
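For consumers written outside a route, the same fields can be read from the event JSON directly. The following sketch parses the sample event shown earlier and dispatches on `Operation`; the `dispatch_event` function and the handler table are hypothetical, and only the field names come from the event format above.

```python
import json


def dispatch_event(raw_event, handlers):
    """Parse one changefeed event and call the handler for its Operation."""
    event = json.loads(raw_event)
    operation = event["Operation"]
    if operation not in handlers:
        raise ValueError(f"no handler for operation {operation!r}")
    return handlers[operation](event["EntityId"], event["Data"])


# Hypothetical handlers; real ones would call downstream systems.
handlers = {
    "Created": lambda entity_id, data: f"create {entity_id} ({data['email']})",
    "Updated": lambda entity_id, data: f"update {entity_id} ({data['email']})",
    "Deleted": lambda entity_id, data: f"archive {entity_id}",
}

sample_event = """
{
  "EntityType": "customer",
  "EntityId": "12345",
  "Timestamp": "2025-01-13T10:30:00Z",
  "Operation": "Created",
  "Data": {
    "customerId": "12345",
    "name": "John Doe",
    "email": "john@example.com"
  }
}
"""

result = dispatch_event(sample_event, handlers)
print(result)
```

Raising on an unknown operation is a design choice for the sketch; a production consumer might prefer to log and skip such events.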
Storage Statistics
The Entity Store provides real-time statistics:
- Entities Stored - Current number of unique entities
- Entities with History - Total entities including historical versions
- History Kept - Number of historical versions maintained (default: 64)
- Current Size - Storage space used
- Maximum Age - Data retention period (default: unlimited)
Querying Entities
Search for stored entities using:
- Entity Type - Filter by entity type or category
- Entity ID - Look up specific entities by identifier
- Changed Since - Find entities modified after a specific date
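The three filters combine naturally as optional criteria. This sketch models them over an in-memory list; `StoredEntity` and `query_entities` are hypothetical names, and the actual query interface of Entity Store is not specified on this page.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class StoredEntity:
    entity_type: str
    entity_id: str
    changed_at: datetime


def query_entities(entities, entity_type=None, entity_id=None, changed_since=None):
    """Return entities matching every filter that was supplied."""
    matches = []
    for entity in entities:
        if entity_type is not None and entity.entity_type != entity_type:
            continue
        if entity_id is not None and entity.entity_id != entity_id:
            continue
        # "Changed since" keeps entities modified strictly after the cutoff.
        if changed_since is not None and entity.changed_at <= changed_since:
            continue
        matches.append(entity)
    return matches


entities = [
    StoredEntity("customer", "12345", datetime(2025, 1, 13, 10, 30, tzinfo=timezone.utc)),
    StoredEntity("customer", "67890", datetime(2025, 1, 10, 8, 0, tzinfo=timezone.utc)),
    StoredEntity("order", "1001", datetime(2025, 1, 14, 9, 15, tzinfo=timezone.utc)),
]

cutoff = datetime(2025, 1, 12, tzinfo=timezone.utc)
recent_customers = query_entities(entities, entity_type="customer", changed_since=cutoff)
print([entity.entity_id for entity in recent_customers])
```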
Benefits Over Direct Polling
Without Entity Store
# Direct polling - processes everything every time
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: direct-poll
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "https://api.example.com/customers"
            - split:
                simple: "${body}"
                steps:
                  - to: "salesforce:sync" # Called for ALL customers every 5 minutes
Problems:
- Processes unchanged data repeatedly
- Wastes API quotas on downstream systems
- Increases costs and latency
- Difficult to implement proper change detection logic
With Entity Store
# Producer: Poll and detect changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-customers
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "https://api.example.com/customers"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "customer"
                        entityId: "${body[customerId]}"
                        source: "https://api.example.com/customers"
---
# Consumer: Process only changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: sync-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "customer"
          steps:
            - to: "salesforce:sync" # Only called for changed customers
Benefits:
- Only processes actual changes
- Reduces downstream API calls in proportion to the share of unchanged records (by 90% or more when fewer than one record in ten changes between polls)
- Built-in change detection logic
- Maintains audit trail automatically
- Decouples polling from processing
Best Practices
Entity Identification
Ensure each entity has a unique identifier for proper change tracking:
# Use natural keys from your source system
- to:
    uri: "kamelet:entitystore-sink"
    parameters:
      entityType: "customer"
      entityId: "${body[customerId]}" # Natural key
      source: "my-api"

# Or use composite identifiers
- to:
    uri: "kamelet:entitystore-sink"
    parameters:
      entityType: "order"
      entityId: "${body[orderId]}-${body[region]}" # Composite key
      source: "order-api"
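When the identifier is assembled in code before the data reaches the sink, it helps to reject missing key parts early, since an empty part would make two different entities collide on one ID. The helper below is a hypothetical sketch (`composite_entity_id` is not part of Weik.io) that mirrors the `orderId`-`region` composite key above.

```python
def composite_entity_id(*parts, separator="-"):
    """Join key parts into one stable identifier, rejecting missing parts."""
    if not parts:
        raise ValueError("at least one key part is required")
    cleaned = []
    for part in parts:
        if part is None or not str(part).strip():
            raise ValueError("every key part must be non-empty")
        cleaned.append(str(part).strip())
    return separator.join(cleaned)


order = {"orderId": 1001, "region": "EU"}
entity_id = composite_entity_id(order["orderId"], order["region"])
print(entity_id)
```

If a key part can itself contain the separator, a different separator or an escaping scheme would be needed to keep identifiers unambiguous.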
Polling Frequency
Choose polling frequency based on your requirements:
- High-frequency polling (every minute) - Entity Store prevents downstream overload
- Low-frequency polling (every hour) - Entity Store accumulates changes for batch processing
- Mixed frequency - Different producers can poll at different rates
Error Handling
Implement error handling in both producer and consumer flows:
# Producer with error handling
- route:
    from:
      uri: timer:poll
      steps:
        - doTry:
            steps:
              - to: "https://api.example.com/data"
              - split:
                  simple: "${body}"
                  steps:
                    - to:
                        uri: "kamelet:entitystore-sink"
                        parameters:
                          entityType: "data"
                          entityId: "${body[id]}"
                          source: "api"
            doCatch:
              - exception: java.lang.Exception
                steps:
                  - log: "Polling failed: ${exception.message}"
                  - to: "direct:alert"
Comparison with Database Change Tracking
| Feature | Entity Store | Database Change Tracking |
|---|---|---|
| Source | Any system (APIs, files, databases) | Databases only |
| Change Detection | Application-level data comparison | Database-level CDC |
| Setup | No database configuration needed | Requires CDC enabled on database |
| Latency | Depends on polling frequency | Near real-time |
| Overhead | Minimal (storage only) | Database performance impact |
| Use Case | Systems without CDC/webhooks | Databases with CDC support |
Use Entity Store when:
- Source system doesn’t support CDC or webhooks
- Polling is the only available option
- You need change detection across multiple diverse sources
- Source is not a database (API, file, message queue, etc.)
Use Database Change Tracking when:
- Source is a database with CDC support
- Near real-time change detection is required
- You control the database configuration
- Minimal latency is critical
See Database Change Tracking for database-specific CDC patterns.
Complete Example
Here’s a complete working example that polls a REST API and processes only changes:
# poll-products.yaml
# Producer: Polls product API every 5 minutes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-products
  description: Poll products from external API
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            # Fetch products from API
            - to:
                uri: "https://api.example.com/products"
                parameters:
                  httpMethod: GET
            # Parse JSON response
            - unmarshal:
                json: {}
            # Split into individual products
            - split:
                simple: "${body}"
                steps:
                  # Push each product to Entity Store
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "product"
                        entityId: "${body[productId]}"
                        source: "https://api.example.com/products"
---
# process-product-changes.yaml
# Consumer: Processes only changed products
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-product-changes
  description: Process product changes and sync to warehouse
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "product"
          steps:
            # Log the change
            - log:
                message: "Product ${body[Operation]}: ${body[EntityId]}"
            # Route based on operation type
            - choice:
                when:
                  # New product created
                  - simple: "${body[Operation]} == 'Created'"
                    steps:
                      - log: "Creating new product in warehouse"
                      - to: "direct:create-warehouse-product"
                  # Product updated
                  - simple: "${body[Operation]} == 'Updated'"
                    steps:
                      - log: "Updating product in warehouse"
                      - to: "direct:update-warehouse-product"
                  # Product deleted
                  - simple: "${body[Operation]} == 'Deleted'"
                    steps:
                      - log: "Archiving product in warehouse"
                      - to: "direct:archive-warehouse-product"
Result: The producer polls 1000 products every 5 minutes. Entity Store detects that only 3 products changed. The consumer processes only those 3 products, saving 997 unnecessary warehouse API calls.
What’s Next
- Database Change Tracking - Real-time database change detection with CDC
- Integration Overview - Learn about integrations
- Using Variables - Use variables in entity configurations
- Integration Flows - Define integration flows