Entity Store
Intelligent change detection for polling-based integrations
The Entity Store solves a very specific, very annoying problem: having to poll a dumb API every five minutes just to see if anything changed.
If an external system doesn’t support webhooks and doesn’t give you a reliable “updated_at” timestamp to filter by, the Entity Store figures out the delta for you.
What is the Entity Store?
It’s a state-tracking database for your polling integrations. You fetch the entire dataset from the external system and dump it into the Entity Store. The store compares the new payload against what it already has. If a record is new, or if any fields changed, it emits a change event. If the record is exactly the same as the last time you polled it, the Entity Store quietly drops it.
This means your downstream flows only run when actual work needs to be done.
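To make the comparison step concrete, here is a minimal sketch of the idea in Python. It is not the Entity Store's actual implementation: the `fingerprint` function and `ChangeDetector` class are hypothetical names, and hashing canonical JSON is just one plausible way to decide whether a record changed. Deletion handling is left out because the mechanism for it isn't described here.

```python
import hashlib
import json
from typing import Dict, Optional


def fingerprint(record: dict) -> str:
    """Hash a record's canonical JSON form so key order does not matter."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class ChangeDetector:
    """Toy stand-in for the store: remembers one fingerprint per entity ID."""

    def __init__(self) -> None:
        self._seen: Dict[str, str] = {}

    def push(self, entity_id: str, record: dict) -> Optional[str]:
        """Return "Created", "Updated", or None when nothing changed."""
        new_fp = fingerprint(record)
        old_fp = self._seen.get(entity_id)
        if old_fp == new_fp:
            return None  # identical to the last poll: drop it quietly
        self._seen[entity_id] = new_fp
        return "Created" if old_fp is None else "Updated"


detector = ChangeDetector()
first = detector.push("12345", {"name": "John Doe", "email": "john@example.com"})
# Same data, different key order: still counts as unchanged.
repeat = detector.push("12345", {"email": "john@example.com", "name": "John Doe"})
changed = detector.push("12345", {"name": "John Doe", "email": "jd@example.com"})
print(first, repeat, changed)
```

Only the first and third pushes would produce an event; the second is dropped because its fingerprint matches the stored one.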
Why use it?
- Save your API limits - Stop making 10,000 downstream API calls when only 3 records actually changed.
- Audit history - It keeps the last 64 versions of an entity, so you can see exactly how a record mutated over time.
- Decoupled architecture - You separate the messy business of polling from the actual business logic of processing the changes.
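As a rough illustration of what "keeps the last 64 versions" means, the sketch below uses a bounded queue. How the Entity Store actually stores history is not documented here, so treat the `deque` and the `HISTORY_LIMIT` name as assumptions made for the example.

```python
from collections import deque

HISTORY_LIMIT = 64  # mirrors the documented default of 64 versions

# A bounded queue silently discards the oldest version once the limit is hit.
history = deque(maxlen=HISTORY_LIMIT)
for version in range(1, 101):
    history.append({"version": version, "status": f"state-{version}"})

oldest_kept = history[0]["version"]
newest_kept = history[-1]["version"]
print(len(history), oldest_kept, newest_kept)
```

After 100 writes, versions 37 through 100 remain and versions 1 through 36 are gone, which is the trade-off of a fixed-size audit trail.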
How It Works
The pattern requires two separate integration flows:
┌─────────────────────────────────────────────────────────────────────┐
│                       STAGE 1: POLL AND STORE                       │
│                           (Producer Flow)                           │
└─────────────────────────────────────────────────────────────────────┘
┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│  External    │  Poll   │ Integration  │  Push   │   Entity     │
│  System      │ ───────>│    Flow      │ ───────>│   Store      │
│  (API/DB)    │  Every  │  (Producer)  │  All    │              │
└──────────────┘  5 min  └──────────────┘  Data   └──────┬───────┘
                                                         │
                                                         │ Compare
                                                         │ Detect Changes
                                                         │
                                                         v
                                                ┌────────────────┐
                                                │   Changefeed   │
                                                │  (Only actual  │
                                                │    changes)    │
                                                └────────┬───────┘
                                                         │
┌─────────────────────────────────────────────────────────────────────┐
│                      STAGE 2: PROCESS CHANGES                       │
│                           (Consumer Flow)                           │
└─────────────────────────────────────────────────────────────────────┘
                                                         │
                                                         v
                                                ┌────────────────┐
                                                │  Integration   │
                                                │     Flow       │
                                                │   (Consumer)   │
                                                └────────┬───────┘
                                                         │
                                                         v
                                           ┌──────────────────────────┐
                                           │  Process Only Changes:   │
                                           │  • Update downstream     │
                                           │  • Send notifications    │
                                           │  • Execute business logic│
                                           └──────────────────────────┘
Stage 1: Poll and Store (The Producer)
Your first flow just blindly fetches data and throws it at the wall:
- Poll the external API or database on a timer.
- Push every single record to the Entity Store.
- The Entity Store does the heavy lifting of comparing the JSON.
Stage 2: Process Changes (The Consumer)
Your second flow actually does the work, but only when told to:
- Subscribe to the Entity Store changefeed.
- Wake up only when a record is created, updated, or deleted.
- Run your business logic (sync to Salesforce, send an email, etc.).
A Practical Example
Let’s say you’re syncing a list of 100 customers:
- 10:00 AM - You poll the API. All 100 customers are new. The Entity Store emits 100 Created events. Your downstream flow runs 100 times.
- 10:05 AM - You poll the API again, getting all 100 customers. Only 2 of them have changes. The Entity Store emits 2 Updated events. Your downstream flow runs 2 times.
- 10:10 AM - You poll the API again. Nobody changed anything. The Entity Store emits 0 events. Your downstream flow stays asleep.
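Result: Across these three polls the producer pushes 300 records (100 every 5 minutes), but the consumer runs only 102 times: 100 for the initial load and 2 for the real updates. After that first sync, 198 of the next 200 polled records are dropped, so 198 downstream API calls never happen.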
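Tallying the three polls from the timeline can be simulated in a few lines of Python. This is a sketch only: `diff_poll` is a hypothetical helper that compares two snapshots keyed by entity ID, and the customer data is made up for the example.

```python
def diff_poll(previous: dict, current: dict) -> list:
    """Compare two snapshots keyed by entity ID and list the changes."""
    events = []
    for entity_id, record in current.items():
        if entity_id not in previous:
            events.append(("Created", entity_id))
        elif previous[entity_id] != record:
            events.append(("Updated", entity_id))
    return events


customers = {f"c{i}": {"name": f"Customer {i}", "tier": "basic"} for i in range(100)}

poll_1 = dict(customers)                          # 10:00, everything is new
poll_2 = dict(customers)                          # 10:05, two customers change
poll_2["c7"] = {**poll_2["c7"], "tier": "gold"}
poll_2["c42"] = {**poll_2["c42"], "tier": "gold"}
poll_3 = dict(poll_2)                             # 10:10, nothing changes

stored = {}
counts = []
for snapshot in (poll_1, poll_2, poll_3):
    counts.append(len(diff_poll(stored, snapshot)))
    stored = dict(snapshot)  # the store now holds the latest state

records_polled = 100 * len(counts)
print(counts, sum(counts), records_polled - sum(counts))
```

The per-poll event counts come out as 100, 2, and 0, so 102 of the 300 polled records reach the consumer and 198 are dropped.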
Use Cases
Polling APIs Without Timestamps
Many systems don’t provide “last modified” timestamps or “changed since” query parameters:
# Producer: Poll external API every 5 minutes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-customer-api
spec:
  Schedule: "*/5 * * * *" # Every 5 minutes
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "https://api.example.com/customers"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "customer"
                        entityId: "${body[customerId]}"
                        source: "https://api.example.com/customers"

# Consumer: Process only actual changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-customer-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "customer"
          steps:
            - log: "Customer changed: ${body}"
            - to: "salesforce:update-customer"
The producer polls every 5 minutes and pushes all data to the Entity Store. The consumer only receives events when customer data has actually changed, preventing unnecessary processing and downstream API calls.
Polling Databases Without CDC
For databases where Change Data Capture (CDC) isn’t available or practical:
# Producer: Poll database every minute
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-legacy-db
spec:
  Schedule: "* * * * *" # Every minute
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "sql:SELECT * FROM orders?dataSource=#myDataSource"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "order"
                        entityId: "${body[order_id]}"
                        source: "legacy-database"

# Consumer: React to actual order changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-order-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "order"
          steps:
            - choice:
                when:
                  - simple: "${body[Operation]} == 'Created'"
                    steps:
                      - to: "direct:send-order-confirmation"
                  - simple: "${body[Operation]} == 'Updated'"
                    steps:
                      - to: "direct:update-shipping-status"
File System Monitoring
Monitor file systems or S3 buckets for content changes:
# Producer: Poll S3 bucket for files
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-s3-files
spec:
  Schedule: "*/10 * * * *" # Every 10 minutes
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "aws2-s3://my-bucket?operation=listObjects"
            - split:
                simple: "${body}"
                steps:
                  - to: "aws2-s3://my-bucket?operation=getObject"
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "s3-file"
                        entityId: "${body[key]}"
                        source: "s3://my-bucket"

# Consumer: Process only changed files
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-file-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "s3-file"
          steps:
            - log: "File changed: ${body[EntityId]}"
            - to: "direct:process-file"
IoT Sensor Data Deduplication
Poll IoT sensors and only react to actual state changes:
# Producer: Poll sensors frequently
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-sensors
spec:
  Schedule: "*/30 * * * * *" # Every 30 seconds
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "mqtt:sensors/temperature/#"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "sensor-reading"
                        entityId: "${body[sensorId]}"
                        source: "mqtt://sensors"

# Consumer: Alert only on actual temperature changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: temperature-alerts
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "sensor-reading"
          steps:
            - filter:
                simple: "${body[Data][temperature]} > 80"
                steps:
                  - to: "direct:send-alert"
Configuration
Required Kamelets
Entity Store requires two Kamelets to be available in your Weik.io installation:
- entitystore-sink - Pushes data to Entity Store
- entitystore-source - Consumes changefeed events from Entity Store
These Kamelets are provided with Weik.io and use NATS as the underlying message broker.
Entity Types
Entity types categorize your data and allow multiple producers and consumers to work with different data types:
# Producer for customers
parameters:
  entityType: "customer"

# Consumer for customers
parameters:
  entityType: "customer"

# Different producer for orders
parameters:
  entityType: "order"
Each entity type maintains its own changefeed stream, allowing independent processing.
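One way to picture "one changefeed per entity type" is a map from type name to its own ordered list of events. The Python below is purely a mental model: `feeds` and `publish` are invented for this sketch and say nothing about how the NATS-backed streams are really laid out.

```python
from collections import defaultdict

# entity type -> ordered list of change events for that type only
feeds = defaultdict(list)


def publish(event: dict) -> None:
    """Append the event to the feed that matches its EntityType."""
    feeds[event["EntityType"]].append(event)


publish({"EntityType": "customer", "EntityId": "12345", "Operation": "Created"})
publish({"EntityType": "order", "EntityId": "A-1", "Operation": "Created"})
publish({"EntityType": "customer", "EntityId": "12345", "Operation": "Updated"})

customer_ops = [e["Operation"] for e in feeds["customer"]]
order_ops = [e["Operation"] for e in feeds["order"]]
print(customer_ops, order_ops)
```

A consumer subscribed to "customer" sees the two customer events in order and never sees the order event, which is why the two types can be processed independently.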
Changefeed Event Format
When consuming from the Entity Store changefeed using kamelet:entitystore-source, events are structured as follows:
{
  "EntityType": "customer",
  "EntityId": "12345",
  "Timestamp": "2025-01-13T10:30:00Z",
  "Operation": "Created",
  "Data": {
    "customerId": "12345",
    "name": "John Doe",
    "email": "john@example.com"
  }
}
Event Fields:
- EntityType - The entity type specified in the sink
- EntityId - The unique identifier for the entity
- Timestamp - When the change was detected
- Operation - Type of change: Created, Updated, or Deleted
- Data - The actual entity data that was pushed to the sink
Accessing Data in Routes:
# Access operation type
- simple: "${body[Operation]} == 'Created'"
# Access entity ID
- simple: "${body[EntityId]}"
# Access entity data fields
- simple: "${body[Data][customerId]}"
- simple: "${body[Data][email]}"
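If an event ends up in code outside a route (say, a downstream service that receives the JSON body), the same fields can be read with any JSON parser. The Python sketch below reuses the sample event from this section; `route_event` and the handler names are hypothetical and only show the branching on Operation.

```python
import json

# The sample changefeed event shown above, as a raw JSON string.
SAMPLE_EVENT = """
{
  "EntityType": "customer",
  "EntityId": "12345",
  "Timestamp": "2025-01-13T10:30:00Z",
  "Operation": "Created",
  "Data": {"customerId": "12345", "name": "John Doe", "email": "john@example.com"}
}
"""


def route_event(event: dict) -> str:
    """Map the Operation field to a (hypothetical) handler label."""
    handlers = {"Created": "create", "Updated": "update", "Deleted": "archive"}
    operation = event["Operation"]
    if operation not in handlers:
        raise ValueError(f"Unexpected operation: {operation}")
    return f"{handlers[operation]}:{event['EntityType']}:{event['EntityId']}"


event = json.loads(SAMPLE_EVENT)
target = route_event(event)
email = event["Data"]["email"]  # same path as ${body[Data][email]}
print(target, email)
```

Failing loudly on an unknown Operation value is a deliberate choice here, so that a new event type doesn't get silently ignored.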
Storage Statistics
The Entity Store provides real-time statistics:
- Entities Stored - Current number of unique entities
- Entities with History - Total entities including historical versions
- History Kept - Number of historical versions maintained (default: 64)
- Current Size - Storage space used
- Maximum Age - Data retention period (default: unlimited)
Querying Entities
Search for stored entities using:
- Entity Type - Filter by entity type or category
- Entity ID - Look up specific entities by identifier
- Changed Since - Find entities modified after a specific date
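The three filters combine naturally as "all conditions must match". The following Python sketch shows that logic over an in-memory list; it is not the Entity Store's query API. The `query` function, its parameter names, and the sample records are assumptions made for illustration.

```python
from datetime import datetime, timezone


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp with a trailing Z as UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


ENTITIES = [
    {"EntityType": "customer", "EntityId": "12345", "Timestamp": "2025-01-13T10:30:00Z"},
    {"EntityType": "customer", "EntityId": "67890", "Timestamp": "2025-01-10T08:00:00Z"},
    {"EntityType": "order", "EntityId": "A-1", "Timestamp": "2025-01-13T11:00:00Z"},
]


def query(entities, entity_type=None, entity_id=None, changed_since=None):
    """Return entities matching every filter that was supplied."""
    results = []
    for entity in entities:
        if entity_type is not None and entity["EntityType"] != entity_type:
            continue
        if entity_id is not None and entity["EntityId"] != entity_id:
            continue
        if changed_since is not None and parse_ts(entity["Timestamp"]) <= changed_since:
            continue
        results.append(entity)
    return results


cutoff = datetime(2025, 1, 12, tzinfo=timezone.utc)
recent_customers = [e["EntityId"] for e in query(ENTITIES, "customer", changed_since=cutoff)]
by_id = [e["EntityType"] for e in query(ENTITIES, entity_id="A-1")]
print(recent_customers, by_id)
```

Here "changed since" is treated as strictly after the cutoff, which is one reasonable reading of "modified after a specific date".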
Benefits Over Direct Polling
Without Entity Store
# Direct polling - processes everything every time
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: direct-poll
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "https://api.example.com/customers"
            - split:
                simple: "${body}"
                steps:
                  - to: "salesforce:sync" # Called for ALL customers every 5 minutes
Problems:
- Processes unchanged data repeatedly
- Wastes API quotas on downstream systems
- Increases costs and latency
- Difficult to implement proper change detection logic
With Entity Store
# Producer: Poll and detect changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-customers
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            - to: "https://api.example.com/customers"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "customer"
                        entityId: "${body[customerId]}"
                        source: "https://api.example.com/customers"

# Consumer: Process only changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: sync-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "customer"
          steps:
            - to: "salesforce:sync" # Only called for changed customers
Benefits:
- Only processes actual changes
- Can cut downstream API calls by 90% or more when only a small share of records changes between polls
- Built-in change detection logic
- Maintains audit trail automatically
- Decouples polling from processing
Best Practices
Entity Identification
Ensure each entity has a unique identifier for proper change tracking:
# Use natural keys from your source system
- to:
    uri: "kamelet:entitystore-sink"
    parameters:
      entityType: "customer"
      entityId: "${body[customerId]}" # Natural key
      source: "my-api"

# Or use composite identifiers
- to:
    uri: "kamelet:entitystore-sink"
    parameters:
      entityType: "order"
      entityId: "${body[orderId]}-${body[region]}" # Composite key
      source: "order-api"
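One thing to watch with composite identifiers: if a key part can itself contain the separator, two different records may collapse into the same ID. The Python sketch below demonstrates the collision and one possible guard. `composite_id` is a hypothetical helper, not something the Entity Store provides.

```python
def composite_id(*parts: str, sep: str = "-") -> str:
    """Join key parts, rejecting values that already contain the separator."""
    for part in parts:
        if sep in part:
            raise ValueError(f"Key part {part!r} contains separator {sep!r}")
    return sep.join(parts)


# Naive joining: two different (orderId, region) pairs produce the same ID.
naive_a = "-".join(["A-1", "EU"])
naive_b = "-".join(["A", "1-EU"])
collision = naive_a == naive_b

safe_key = composite_id("1001", "EU")

try:
    composite_id("A-1", "EU")
    rejected = False
except ValueError:
    rejected = True

print(collision, safe_key, rejected)
```

If your source keys can contain hyphens, picking a separator that never appears in the data (or validating as above) keeps each entity's history from getting mixed with another's.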
Polling Frequency
Choose polling frequency based on your requirements:
- High-frequency polling (every minute) - Entity Store prevents downstream overload
- Low-frequency polling (every hour) - Entity Store accumulates changes for batch processing
- Mixed frequency - Different producers can poll at different rates
Error Handling
Implement error handling in both producer and consumer flows:
# Producer with error handling
- route:
    from:
      uri: timer:poll
      steps:
        - doTry:
            steps:
              - to: "https://api.example.com/data"
              - split:
                  simple: "${body}"
                  steps:
                    - to:
                        uri: "kamelet:entitystore-sink"
                        parameters:
                          entityType: "data"
                          entityId: "${body[id]}"
                          source: "api"
            doCatch:
              - exception: java.lang.Exception
                steps:
                  - log: "Polling failed: ${exception.message}"
                  - to: "direct:alert"
Comparison with Database Change Tracking
| Feature | Entity Store | Database Change Tracking |
|---|---|---|
| Source | Any system (APIs, files, databases) | Databases only |
| Change Detection | Application-level data comparison | Database-level CDC |
| Setup | No database configuration needed | Requires CDC enabled on database |
| Latency | Depends on polling frequency | Near real-time |
| Overhead | Minimal (storage only) | Database performance impact |
| Use Case | Systems without CDC/webhooks | Databases with CDC support |
Use Entity Store when:
- Source system doesn’t support CDC or webhooks
- Polling is the only available option
- You need change detection across multiple diverse sources
- Source is not a database (API, file, message queue, etc.)
Use Database Change Tracking when:
- Source is a database with CDC support
- Near real-time change detection is required
- You control the database configuration
- Minimal latency is critical
See Database Change Tracking for database-specific CDC patterns.
Complete Example
Here’s a complete working example that polls a REST API and processes only changes:
# poll-products.yaml
# Producer: Polls product API every 5 minutes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-products
  description: Poll products from external API
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
          steps:
            # Fetch products from API
            - to:
                uri: "https://api.example.com/products"
                parameters:
                  httpMethod: GET
            # Parse JSON response
            - unmarshal:
                json: {}
            # Split into individual products
            - split:
                simple: "${body}"
                steps:
                  # Push each product to Entity Store
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "product"
                        entityId: "${body[productId]}"
                        source: "https://api.example.com/products"

# process-product-changes.yaml
# Consumer: Processes only changed products
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-product-changes
  description: Process product changes and sync to warehouse
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "product"
          steps:
            # Log the change
            - log:
                message: "Product ${body[Operation]}: ${body[EntityId]}"
            # Route based on operation type
            - choice:
                when:
                  # New product created
                  - simple: "${body[Operation]} == 'Created'"
                    steps:
                      - log: "Creating new product in warehouse"
                      - to: "direct:create-warehouse-product"
                  # Product updated
                  - simple: "${body[Operation]} == 'Updated'"
                    steps:
                      - log: "Updating product in warehouse"
                      - to: "direct:update-warehouse-product"
                  # Product deleted
                  - simple: "${body[Operation]} == 'Deleted'"
                    steps:
                      - log: "Archiving product in warehouse"
                      - to: "direct:archive-warehouse-product"
Result: The producer polls 1000 products every 5 minutes. Entity Store detects that only 3 products changed. The consumer processes only those 3 products, saving 997 unnecessary warehouse API calls.
What’s Next
- Database Change Tracking - Real-time database change detection with CDC
- Integration Overview - Learn about integrations
- Using Variables - Use variables in entity configurations
- Integration Flows - Define integration flows