Entity Store

Intelligent change detection for polling-based integrations

The Entity Store tracks changes by comparing each pushed record with its stored version and emitting changefeed events only when the data has actually changed. It suits polling-based integrations where the source system provides neither update timestamps nor webhooks.

What is Entity Store?

Entity Store is a change detection system designed for polling-based integrations. You continuously push data to the Entity Store, and it automatically compares the data to detect real changes. Only when actual changes are detected does it emit changefeed events that trigger downstream integration flows.

This solves the common problem of polling large amounts of data from systems that lack proper change tracking, update timestamps, or webhook capabilities.

Key Features

  • Intelligent Change Detection - Automatically detects actual data changes, not just new polls
  • Changefeed Events - Only emits events when data has truly changed
  • Polling Pattern Support - Designed for systems without webhooks or update timestamps
  • History Tracking - Maintains up to 64 versions of entity history (the default) for audit trails
  • Deduplication - Prevents duplicate processing of unchanged data
  • Size Management - Monitor storage usage with configurable limits

How It Works

The Entity Store change detection pattern works in two stages:

┌─────────────────────────────────────────────────────────────────────┐
│                    STAGE 1: POLL AND STORE                          │
│                      (Producer Flow)                                │
└─────────────────────────────────────────────────────────────────────┘

  ┌──────────────┐         ┌──────────────┐         ┌──────────────┐
  │   External   │  Poll   │  Integration │  Push   │    Entity    │
  │    System    │ ───────>│     Flow     │ ───────>│    Store     │
  │  (API/DB)    │  Every  │  (Producer)  │   All   │              │
  └──────────────┘  5 min  └──────────────┘   Data  └──────┬───────┘

                                                             │ Compare
                                                             │ Detect Changes

                                                             v
                                                    ┌────────────────┐
                                                    │  Changefeed    │
                                                    │  (Only actual  │
                                                    │   changes)     │
                                                    └────────┬───────┘

┌─────────────────────────────────────────────────────────────────────┐
│                  STAGE 2: PROCESS CHANGES                           │
│                     (Consumer Flow)                                 │
└─────────────────────────────────────────────────────────────────────┘

                                                             v
                                                    ┌────────────────┐
                                                    │  Integration   │
                                                    │     Flow       │
                                                    │  (Consumer)    │
                                                    └────────┬───────┘

                                                             v
                                              ┌──────────────────────────┐
                                              │  Process Only Changes:   │
                                              │  • Update downstream     │
                                              │  • Send notifications    │
                                              │  • Execute business logic│
                                              └──────────────────────────┘

Stage 1: Poll and Store (Producer Flow)

Create an integration flow that polls data from your source system and pushes it to the Entity Store:

  1. Poll Source System - Regularly fetch data from your external system (e.g., every minute)
  2. Push to Entity Store - Send all fetched data to the Entity Store
  3. Automatic Comparison - Entity Store compares new data with stored versions
  4. Change Detection - Only actual changes trigger changefeed events
  5. No Duplicate Events - Unchanged data is stored but doesn’t emit events
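
The comparison in steps 3 to 5 can be pictured with a minimal Python sketch. It assumes change detection compares a fingerprint of the canonicalized payload; the source does not state the actual mechanism, and `fingerprint`, `ChangeDetector`, and `push` are hypothetical names, not Weik.io APIs. Detecting `Deleted` entities is left out because the source does not describe how deletions are recognized.

```python
import hashlib
import json


def fingerprint(data):
    """Hash a canonical JSON form so key order does not count as a change."""
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class ChangeDetector:
    """Hypothetical stand-in for the Entity Store comparison step."""

    def __init__(self):
        self._fingerprints = {}  # (entity_type, entity_id) -> last fingerprint

    def push(self, entity_type, entity_id, data):
        """Return a changefeed-style event, or None when nothing changed."""
        key = (entity_type, entity_id)
        new_fp = fingerprint(data)
        old_fp = self._fingerprints.get(key)
        if old_fp == new_fp:
            return None  # unchanged data: no event is emitted
        self._fingerprints[key] = new_fp
        operation = "Created" if old_fp is None else "Updated"
        return {"EntityType": entity_type, "EntityId": entity_id,
                "Operation": operation, "Data": data}


detector = ChangeDetector()
first = detector.push("customer", "12345",
                      {"name": "John Doe", "email": "john@example.com"})
# Same content with keys in a different order: treated as unchanged.
repeat = detector.push("customer", "12345",
                       {"email": "john@example.com", "name": "John Doe"})
update = detector.push("customer", "12345",
                       {"name": "John Doe", "email": "jd@example.com"})
```

Here `first` is a `Created` event, `repeat` is `None`, and `update` is an `Updated` event. Sorting keys before hashing is one way to keep cosmetic differences between polls from registering as changes.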

Stage 2: Process Changes (Consumer Flow)

Create a separate integration flow that listens to the Entity Store changefeed:

  1. Subscribe to Changefeed - Flow waits for change events from Entity Store
  2. Receive Only Changes - Gets notified only when data has actually changed
  3. Process Changes - Execute business logic, sync to other systems, send notifications, etc.

This two-stage pattern decouples polling frequency from change processing, allowing you to poll frequently while only processing actual changes.
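
As a rough illustration of that decoupling, the sketch below models the changefeed as an in-memory queue. This is an assumption made for illustration only (the Kamelets are described as using NATS), and `producer_poll` and `consumer_drain` are hypothetical helper names.

```python
import queue

changefeed = queue.Queue()  # stand-in for the changefeed between the two flows
stored = {}                 # entity id -> last pushed data


def producer_poll(records):
    """Stage 1: push every polled record; enqueue only the ones that differ."""
    for record in records:
        entity_id = record["customerId"]
        if stored.get(entity_id) != record:
            stored[entity_id] = dict(record)
            changefeed.put(entity_id)


def consumer_drain():
    """Stage 2: process whatever change events are waiting."""
    processed = []
    while not changefeed.empty():
        processed.append(changefeed.get())
    return processed


producer_poll([{"customerId": "1", "name": "Ann"},
               {"customerId": "2", "name": "Bob"}])
first_batch = consumer_drain()

# Two more polls happen before the consumer runs again; only one record changed.
producer_poll([{"customerId": "1", "name": "Ann"},
               {"customerId": "2", "name": "Bobby"}])
producer_poll([{"customerId": "1", "name": "Ann"},
               {"customerId": "2", "name": "Bobby"}])
second_batch = consumer_drain()
```

The producer ran three times, but the consumer saw two events in `first_batch` and a single event in `second_batch`: the amount of consumer work follows the number of changes, not the number of polls.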

Example Data Flow

Time: 10:00 - Producer polls API, gets 100 customers
              └─> Push to Entity Store
                  └─> 5 customers changed → 5 changefeed events
                      └─> Consumer processes 5 customers

Time: 10:05 - Producer polls API, gets 100 customers
              └─> Push to Entity Store
                  └─> 2 customers changed → 2 changefeed events
                      └─> Consumer processes 2 customers

Time: 10:10 - Producer polls API, gets 100 customers
              └─> Push to Entity Store
                  └─> 0 customers changed → 0 changefeed events
                      └─> Consumer processes nothing

Result: The producer runs every 5 minutes and polls 100 records each time (300 in total across the three polls), but the consumer processes only the 7 records that actually changed, saving 293 downstream API calls.
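
The arithmetic behind this example can be checked with a few lines of Python. The sketch assumes one downstream API call per processed record, which the example implies but does not state.

```python
# Figures from the three polls in the example data flow.
polls = [
    {"time": "10:00", "polled": 100, "changed": 5},
    {"time": "10:05", "polled": 100, "changed": 2},
    {"time": "10:10", "polled": 100, "changed": 0},
]

total_polled = sum(p["polled"] for p in polls)   # records a direct poll would process
total_events = sum(p["changed"] for p in polls)  # records the consumer processes
calls_saved = total_polled - total_events
reduction = calls_saved / total_polled

print(f"polled={total_polled} processed={total_events} "
      f"saved={calls_saved} reduction={reduction:.1%}")
# prints: polled=300 processed=7 saved=293 reduction=97.7%
```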

Use Cases

Polling APIs Without Timestamps

Many systems don’t provide “last modified” timestamps or “changed since” query parameters:

# Producer: Poll external API every 5 minutes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-customer-api
spec:
  Schedule: "*/5 * * * *"  # Every 5 minutes
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "https://api.example.com/customers"
          - split:
              simple: "${body}"
              steps:
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "customer"
                      entityId: "${body[customerId]}"
                      source: "https://api.example.com/customers"
---
# Consumer: Process only actual changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-customer-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "customer"
        steps:
          - log: "Customer changed: ${body}"
          - to: "salesforce:update-customer"

The producer polls every 5 minutes and pushes all data to the Entity Store. The consumer only receives events when customer data has actually changed, preventing unnecessary processing and downstream API calls.

Polling Databases Without CDC

For databases where Change Data Capture (CDC) isn’t available or practical:

# Producer: Poll database every minute
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-legacy-db
spec:
  Schedule: "* * * * *"  # Every minute
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "sql:SELECT * FROM orders?dataSource=#myDataSource"
          - split:
              simple: "${body}"
              steps:
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "order"
                      entityId: "${body[order_id]}"
                      source: "legacy-database"
---
# Consumer: React to actual order changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-order-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "order"
        steps:
          - choice:
              when:
                - simple: "${body[Operation]} == 'Created'"
                  steps:
                    - to: "direct:send-order-confirmation"
                - simple: "${body[Operation]} == 'Updated'"
                  steps:
                    - to: "direct:update-shipping-status"

File System Monitoring

Monitor file systems or S3 buckets for content changes:

# Producer: Poll S3 bucket for files
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-s3-files
spec:
  Schedule: "*/10 * * * *"  # Every 10 minutes
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "aws2-s3://my-bucket?operation=listObjects"
          - split:
              simple: "${body}"
              steps:
                - to: "aws2-s3://my-bucket?operation=getObject"
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "s3-file"
                      entityId: "${body[key]}"
                      source: "s3://my-bucket"
---
# Consumer: Process only changed files
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-file-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "s3-file"
        steps:
          - log: "File changed: ${body[EntityId]}"
          - to: "direct:process-file"

IoT Sensor Data Deduplication

Poll IoT sensors and only react to actual state changes:

# Producer: Poll sensors frequently
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-sensors
spec:
  Schedule: "*/30 * * * * *"  # Every 30 seconds
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "mqtt:sensors/temperature/#"
          - split:
              simple: "${body}"
              steps:
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "sensor-reading"
                      entityId: "${body[sensorId]}"
                      source: "mqtt://sensors"
---
# Consumer: Alert only on actual temperature changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: temperature-alerts
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "sensor-reading"
        steps:
          - filter:
              simple: "${body[Data][temperature]} > 80"
              steps:
                - to: "direct:send-alert"

Configuration

Required Kamelets

Entity Store requires two Kamelets to be available in your Weik.io installation:

  • entitystore-sink - Pushes data to Entity Store
  • entitystore-source - Consumes changefeed events from Entity Store

These Kamelets are provided with Weik.io and use NATS as the underlying message broker.

Entity Types

Entity types categorize your data and allow multiple producers and consumers to work with different data types:

# Producer for customers
parameters:
  entityType: "customer"

# Consumer for customers
parameters:
  entityType: "customer"

# Different producer for orders
parameters:
  entityType: "order"

Each entity type maintains its own changefeed stream, allowing independent processing.
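
A small Python model may help picture per-type streams. It is a sketch under the assumption that events are routed purely by their `EntityType`; the `Changefeed` class and its methods are hypothetical and do not correspond to a Weik.io API.

```python
from collections import defaultdict


class Changefeed:
    """Hypothetical in-memory model: one event stream per entity type."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # entity_type -> callbacks

    def subscribe(self, entity_type, callback):
        self._subscribers[entity_type].append(callback)

    def publish(self, event):
        # Deliver only to consumers registered for this event's type.
        for callback in self._subscribers[event["EntityType"]]:
            callback(event)


feed = Changefeed()
customer_events, order_events = [], []
feed.subscribe("customer", customer_events.append)
feed.subscribe("order", order_events.append)

feed.publish({"EntityType": "customer", "EntityId": "12345", "Operation": "Created"})
feed.publish({"EntityType": "order", "EntityId": "A-1", "Operation": "Updated"})
```

After the two `publish` calls, each list holds exactly one event: the customer consumer never sees order events, and vice versa.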

Changefeed Event Format

When consuming from the Entity Store changefeed using kamelet:entitystore-source, events are structured as follows:

{
  "EntityType": "customer",
  "EntityId": "12345",
  "Timestamp": "2025-01-13T10:30:00Z",
  "Operation": "Created",
  "Data": {
    "customerId": "12345",
    "name": "John Doe",
    "email": "john@example.com"
  }
}

Event Fields:

  • EntityType - The entity type specified in the sink
  • EntityId - The unique identifier for the entity
  • Timestamp - When the change was detected
  • Operation - Type of change: Created, Updated, or Deleted
  • Data - The actual entity data that was pushed to the sink

Accessing Data in Routes:

# Access operation type
- simple: "${body[Operation]} == 'Created'"

# Access entity ID
- simple: "${body[EntityId]}"

# Access entity data fields
- simple: "${body[Data][customerId]}"
- simple: "${body[Data][email]}"
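
For consumers that hand events to custom code, the same fields can be read from the JSON payload. The Python sketch below assumes the event arrives as JSON text shaped like the sample above; the `describe` helper is hypothetical.

```python
import json
from datetime import datetime

raw_event = """
{
  "EntityType": "customer",
  "EntityId": "12345",
  "Timestamp": "2025-01-13T10:30:00Z",
  "Operation": "Created",
  "Data": {"customerId": "12345", "name": "John Doe", "email": "john@example.com"}
}
"""


def describe(event):
    """Return a short action string based on the Operation field."""
    operation = event["Operation"]
    entity = f'{event["EntityType"]} {event["EntityId"]}'
    if operation == "Created":
        return f"create {entity}"
    if operation == "Updated":
        return f"update {entity}"
    if operation == "Deleted":
        return f"archive {entity}"
    raise ValueError(f"unexpected Operation: {operation!r}")


event = json.loads(raw_event)
# Replace the trailing "Z" so fromisoformat also works on Python < 3.11.
detected_at = datetime.fromisoformat(event["Timestamp"].replace("Z", "+00:00"))
action = describe(event)
email = event["Data"]["email"]
```

Raising on an unknown `Operation` value is a design choice for the sketch: it surfaces unexpected event types instead of silently ignoring them.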

Storage Statistics

The Entity Store provides real-time statistics:

  • Entities Stored - Current number of unique entities
  • Entities with History - Total entities including historical versions
  • History Kept - Number of historical versions maintained (default: 64)
  • Current Size - Storage space used
  • Maximum Age - Data retention period (default: unlimited)
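
To make the relationship between these counters concrete, here is a minimal Python sketch of a bounded history. It assumes that "Entities with History" counts every retained version and that the limit of 64 includes the current version; neither detail is confirmed by the source, and `record_version` is a hypothetical helper.

```python
from collections import defaultdict, deque

HISTORY_KEPT = 64  # default from the statistics list above

# (entity_type, entity_id) -> bounded history; the oldest versions drop off first
history = defaultdict(lambda: deque(maxlen=HISTORY_KEPT))


def record_version(entity_type, entity_id, data):
    """Append a version; deque discards the oldest one beyond the limit."""
    history[(entity_type, entity_id)].append(data)


# One entity changes 100 times, another is stored once.
for revision in range(100):
    record_version("customer", "12345", {"revision": revision})
record_version("customer", "67890", {"revision": 0})

entities_stored = len(history)                                         # unique entities
entities_with_history = sum(len(versions) for versions in history.values())
oldest_kept = history[("customer", "12345")][0]["revision"]
```

With these inputs, `entities_stored` is 2, `entities_with_history` is 65 (64 retained versions plus 1), and the oldest retained revision of the first entity is 36, because revisions 0 to 35 were discarded.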

Querying Entities

Search for stored entities using:

  • Entity Type - Filter by entity type or category
  • Entity ID - Look up specific entities by identifier
  • Changed Since - Find entities modified after a specific date
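
The three filters combine naturally, as the Python sketch below suggests. It operates on an in-memory list rather than the real store; the `query` function and the `ChangedAt` field are hypothetical names chosen for illustration.

```python
from datetime import datetime, timezone

entities = [
    {"EntityType": "customer", "EntityId": "12345",
     "ChangedAt": datetime(2025, 1, 13, 10, 30, tzinfo=timezone.utc)},
    {"EntityType": "customer", "EntityId": "67890",
     "ChangedAt": datetime(2025, 1, 10, 8, 0, tzinfo=timezone.utc)},
    {"EntityType": "order", "EntityId": "A-1",
     "ChangedAt": datetime(2025, 1, 14, 9, 15, tzinfo=timezone.utc)},
]


def query(items, entity_type=None, entity_id=None, changed_since=None):
    """Apply whichever filters are given; None means 'do not filter'."""
    results = []
    for item in items:
        if entity_type is not None and item["EntityType"] != entity_type:
            continue
        if entity_id is not None and item["EntityId"] != entity_id:
            continue
        # "Changed since" keeps only entities modified strictly after the date.
        if changed_since is not None and item["ChangedAt"] <= changed_since:
            continue
        results.append(item)
    return results


recent_customers = query(entities, entity_type="customer",
                         changed_since=datetime(2025, 1, 12, tzinfo=timezone.utc))
```

`recent_customers` contains only customer `12345`: customer `67890` was last changed before the cutoff, and order `A-1` has a different entity type.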

Benefits Over Direct Polling

Without Entity Store

# Direct polling - processes everything every time
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: direct-poll
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "https://api.example.com/customers"
          - split:
              simple: "${body}"
              steps:
                - to: "salesforce:sync"  # Called for ALL customers every 5 minutes

Problems:

  • Processes unchanged data repeatedly
  • Wastes API quotas on downstream systems
  • Increases costs and latency
  • Difficult to implement proper change detection logic

With Entity Store

# Producer: Poll and detect changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-customers
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "https://api.example.com/customers"
          - split:
              simple: "${body}"
              steps:
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "customer"
                      entityId: "${body[customerId]}"
                      source: "https://api.example.com/customers"
---
# Consumer: Process only changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: sync-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "customer"
        steps:
          - to: "salesforce:sync"  # Only called for changed customers

Benefits:

  • Only processes actual changes
  • Reduces downstream API calls in proportion to the share of unchanged data (often 90%+ when most polled records are unchanged)
  • Built-in change detection logic
  • Maintains audit trail automatically
  • Decouples polling from processing

Best Practices

Entity Identification

Ensure each entity has a unique identifier for proper change tracking:

# Use natural keys from your source system
- to:
    uri: "kamelet:entitystore-sink"
    parameters:
      entityType: "customer"
      entityId: "${body[customerId]}"  # Natural key
      source: "my-api"

# Or use composite identifiers
- to:
    uri: "kamelet:entitystore-sink"
    parameters:
      entityType: "order"
      entityId: "${body[orderId]}-${body[region]}"  # Composite key
      source: "order-api"
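
One caveat with composite identifiers: joining parts with a delimiter that can also occur inside a part may map two different entities to the same ID. The Python sketch below demonstrates the collision and one possible remedy; `naive_key` and `safe_key` are hypothetical helpers, and length-prefixing is only one of several ways to avoid the ambiguity.

```python
def naive_key(order_id, region):
    """Join with '-', as in the composite example above."""
    return f"{order_id}-{region}"


def safe_key(*parts):
    """Prefix each part with its length so the boundaries stay unambiguous."""
    return "|".join(f"{len(part)}:{part}" for part in parts)


# Two different (orderId, region) pairs...
pair_a = ("A-1", "EU")
pair_b = ("A", "1-EU")

naive_collision = naive_key(*pair_a) == naive_key(*pair_b)  # both give "A-1-EU"
safe_collision = safe_key(*pair_a) == safe_key(*pair_b)     # "3:A-1|2:EU" vs "1:A|4:1-EU"
```

`naive_collision` is `True` and `safe_collision` is `False`. If the source system guarantees that the delimiter never appears in any part, the simple join is sufficient.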

Polling Frequency

Choose polling frequency based on your requirements:

  • High-frequency polling (every minute) - Entity Store prevents downstream overload
  • Low-frequency polling (every hour) - Entity Store accumulates changes for batch processing
  • Mixed frequency - Different producers can poll at different rates

Error Handling

Implement error handling in both producer and consumer flows:

# Producer with error handling
- route:
    from:
      uri: timer:poll
    steps:
      - doTry:
          steps:
            - to: "https://api.example.com/data"
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "data"
                        entityId: "${body[id]}"
                        source: "api"
          doCatch:
            - exception: java.lang.Exception
              steps:
                - log: "Polling failed: ${exception.message}"
                - to: "direct:alert"

Comparison with Database Change Tracking

Feature          | Entity Store                        | Database Change Tracking
-----------------|-------------------------------------|---------------------------------
Source           | Any system (APIs, files, databases) | Databases only
Change Detection | Application-level data comparison   | Database-level CDC
Setup            | No database configuration needed    | Requires CDC enabled on database
Latency          | Depends on polling frequency        | Near real-time
Overhead         | Minimal (storage only)              | Database performance impact
Use Case         | Systems without CDC/webhooks        | Databases with CDC support

Use Entity Store when:

  • Source system doesn’t support CDC or webhooks
  • Polling is the only available option
  • You need change detection across multiple diverse sources
  • Source is not a database (API, file, message queue, etc.)

Use Database Change Tracking when:

  • Source is a database with CDC support
  • Near real-time change detection is required
  • You control the database configuration
  • Minimal latency is critical

See Database Change Tracking for database-specific CDC patterns.

Complete Example

Here’s a complete working example that polls a REST API and processes only changes:

# poll-products.yaml
# Producer: Polls product API every 5 minutes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-products
  description: Poll products from external API
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          # Fetch products from API
          - to:
              uri: "https://api.example.com/products"
              parameters:
                httpMethod: GET

          # Parse JSON response
          - unmarshal:
              json: {}

          # Split into individual products
          - split:
              simple: "${body}"
              steps:
                # Push each product to Entity Store
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "product"
                      entityId: "${body[productId]}"
                      source: "https://api.example.com/products"
---
# process-product-changes.yaml
# Consumer: Processes only changed products
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-product-changes
  description: Process product changes and sync to warehouse
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "product"
        steps:
          # Log the change
          - log:
              message: "Product ${body[Operation]}: ${body[EntityId]}"

          # Route based on operation type
          - choice:
              when:
                # New product created
                - simple: "${body[Operation]} == 'Created'"
                  steps:
                    - log: "Creating new product in warehouse"
                    - to: "direct:create-warehouse-product"

                # Product updated
                - simple: "${body[Operation]} == 'Updated'"
                  steps:
                    - log: "Updating product in warehouse"
                    - to: "direct:update-warehouse-product"

                # Product deleted
                - simple: "${body[Operation]} == 'Deleted'"
                  steps:
                    - log: "Archiving product in warehouse"
                    - to: "direct:archive-warehouse-product"

Result: Suppose the producer polls 1000 products in a given 5-minute cycle and Entity Store detects that only 3 of them changed. The consumer processes only those 3 products, saving 997 unnecessary warehouse API calls for that cycle.

What’s Next