Entity Store

Intelligent change detection for polling-based integrations

The Entity Store solves a very specific, very annoying problem: having to poll a dumb API every five minutes just to see if anything changed.

If an external system doesn’t support webhooks and doesn’t give you a reliable “updated_at” timestamp to filter by, the Entity Store figures out the delta for you.

What is the Entity Store?

It’s a state-tracking database for your polling integrations. You fetch the entire dataset from the external system and dump it into the Entity Store. The store compares the new payload against what it already has. If a record is new, or if any fields changed, it emits a change event. If the record is exactly the same as the last time you polled it, the Entity Store quietly drops it.

This means your downstream flows only run when actual work needs to be done.
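
To make the comparison step concrete, here is a minimal Python sketch of the idea. It is not how the Entity Store is actually implemented: the in-memory `store` dict, the `detect_change` function, and the choice to fingerprint records by hashing canonical JSON are all assumptions made for illustration.

```python
import hashlib
import json

# Hypothetical in-memory stand-in for the store: (entity_type, entity_id) -> fingerprint
store = {}

def fingerprint(record):
    """Hash a canonical JSON form so that key order does not affect the result."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def detect_change(entity_type, entity_id, record):
    """Return 'Created', 'Updated', or None when the record is unchanged."""
    key = (entity_type, entity_id)
    new_print = fingerprint(record)
    old_print = store.get(key)
    if old_print == new_print:
        return None  # identical to the last poll: nothing to emit
    store[key] = new_print
    return "Created" if old_print is None else "Updated"

first = detect_change("customer", "12345", {"name": "John Doe", "email": "john@example.com"})
second = detect_change("customer", "12345", {"email": "john@example.com", "name": "John Doe"})
third = detect_change("customer", "12345", {"name": "John Doe", "email": "jd@example.com"})
```

In this sketch, reordering keys does not count as a change because the JSON is serialized with sorted keys before hashing. Whether the real store treats key order the same way is not stated here.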

Why use it?

  • Conserve API quota - Stop making 10,000 downstream API calls when only 3 records actually changed.
  • Audit history - It keeps the last 64 versions of an entity, so you can see exactly how a record mutated over time.
  • Decoupled architecture - You separate the messy business of polling from the actual business logic of processing the changes.
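
A bounded history like the one in the second bullet behaves much like a fixed-size queue: once the limit is reached, recording a new version pushes out the oldest one. A minimal Python sketch, assuming oldest-first eviction (the `EntityHistory` class is hypothetical, and how the store actually prunes versions is not described here):

```python
from collections import deque

HISTORY_LIMIT = 64  # matches the documented number of versions kept

class EntityHistory:
    """Hypothetical per-entity version log that drops the oldest version when full."""

    def __init__(self, limit=HISTORY_LIMIT):
        self.versions = deque(maxlen=limit)

    def record(self, data):
        # Store a copy so later mutation of the caller's dict cannot rewrite history
        self.versions.append(dict(data))

    def latest(self):
        return self.versions[-1] if self.versions else None

history = EntityHistory()
for revision in range(100):
    history.record({"customerId": "12345", "revision": revision})
```

After 100 revisions, only the most recent 64 remain (revisions 36 through 99 in this example).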

How It Works

The pattern requires two separate integration flows:

┌─────────────────────────────────────────────────────────────────────┐
│                    STAGE 1: POLL AND STORE                          │
│                      (Producer Flow)                                │
└─────────────────────────────────────────────────────────────────────┘

  ┌──────────────┐         ┌──────────────┐         ┌──────────────┐
  │   External   │  Poll   │  Integration │  Push   │    Entity    │
  │    System    │ ───────>│     Flow     │ ───────>│    Store     │
  │  (API/DB)    │  Every  │  (Producer)  │   All   │              │
  └──────────────┘  5 min  └──────────────┘   Data  └──────┬───────┘

                                                             │ Compare
                                                             │ Detect Changes

                                                             v
                                                    ┌────────────────┐
                                                    │  Changefeed    │
                                                    │  (Only actual  │
                                                    │   changes)     │
                                                    └────────┬───────┘

┌─────────────────────────────────────────────────────────────────────┐
│                  STAGE 2: PROCESS CHANGES                           │
│                     (Consumer Flow)                                 │
└─────────────────────────────────────────────────────────────────────┘

                                                             v
                                                    ┌────────────────┐
                                                    │  Integration   │
                                                    │     Flow       │
                                                    │  (Consumer)    │
                                                    └────────┬───────┘

                                                             v
                                              ┌──────────────────────────┐
                                              │  Process Only Changes:   │
                                              │  • Update downstream     │
                                              │  • Send notifications    │
                                              │  • Execute business logic│
                                              └──────────────────────────┘

Stage 1: Poll and Store (The Producer)

Your first flow just blindly fetches data and throws it at the wall:

  1. Poll the external API or database on a timer.
  2. Push every single record to the Entity Store.
  3. The Entity Store does the heavy lifting of comparing the JSON.

Stage 2: Process Changes (The Consumer)

Your second flow actually does the work, but only when told to:

  1. Subscribe to the Entity Store changefeed.
  2. Wake up only when a record is created, updated, or deleted.
  3. Run your business logic (sync to Salesforce, send an email, etc.).
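
Step 3 boils down to routing each event by its Operation value. A rough Python equivalent, assuming events arrive as dicts shaped like the changefeed event format documented later on this page (the handler functions and `handle_event` are hypothetical names, not part of Weik.io):

```python
def on_created(event):
    return f"sync new record {event['EntityId']}"

def on_updated(event):
    return f"sync changed record {event['EntityId']}"

def on_deleted(event):
    return f"remove record {event['EntityId']}"

# One handler per operation type the changefeed can report
HANDLERS = {"Created": on_created, "Updated": on_updated, "Deleted": on_deleted}

def handle_event(event):
    """Route one changefeed event to the handler for its operation type."""
    handler = HANDLERS.get(event["Operation"])
    if handler is None:
        raise ValueError(f"Unknown operation: {event['Operation']}")
    return handler(event)

result = handle_event({"EntityType": "customer", "EntityId": "12345", "Operation": "Updated"})
```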

A Practical Example

Let’s say you’re syncing a list of 100 customers:

  • 10:00 AM - You poll the API. All 100 customers are new. The Entity Store emits 100 Created events. Your downstream flow runs 100 times.
  • 10:05 AM - You poll the API again, getting all 100 customers. Only 2 of them have changes. The Entity Store emits 2 Updated events. Your downstream flow runs 2 times.
  • 10:10 AM - You poll the API again. Nobody changed anything. The Entity Store emits 0 events. Your downstream flow stays asleep.

Result: Across the three polls the producer pushes 300 records, but the consumer processes only 102 events (100 Created, 2 Updated). After the initial load, just 2 of the 200 polled records trigger downstream work, saving 198 downstream API calls.
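
The timeline can be replayed in a few lines of Python. The snapshot data and the `count_events` helper are made up for illustration, and deletions are left out because the example has none.

```python
def count_events(previous, current):
    """Count records that are new or differ from the previous snapshot."""
    return sum(1 for key, value in current.items() if previous.get(key) != value)

# Hypothetical data: 100 customers keyed by id
poll_10_00 = {i: {"name": f"Customer {i}", "tier": "basic"} for i in range(100)}
# At 10:05 two customers have changed
poll_10_05 = {**poll_10_00,
              7: {"name": "Customer 7", "tier": "gold"},
              42: {"name": "Customer 42", "tier": "gold"}}
# At 10:10 nothing has changed
poll_10_10 = dict(poll_10_05)

snapshots = [{}, poll_10_00, poll_10_05, poll_10_10]
events_per_poll = [count_events(a, b) for a, b in zip(snapshots, snapshots[1:])]
total_polled = sum(len(s) for s in snapshots[1:])

print(events_per_poll, sum(events_per_poll), total_polled)  # [100, 2, 0] 102 300
```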

Use Cases

Polling APIs Without Timestamps

Many systems don’t provide “last modified” timestamps or “changed since” query parameters:

# Producer: Poll external API every 5 minutes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-customer-api
spec:
  Schedule: "*/5 * * * *"  # Every 5 minutes
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "https://api.example.com/customers"
          # Parse JSON response before splitting
          - unmarshal:
              json: {}
          - split:
              simple: "${body}"
              steps:
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "customer"
                      entityId: "${body[customerId]}"
                      source: "https://api.example.com/customers"
---
# Consumer: Process only actual changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-customer-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "customer"
        steps:
          - log: "Customer changed: ${body}"
          - to: "salesforce:update-customer"

The producer polls every 5 minutes and pushes all data to the Entity Store. The consumer only receives events when customer data has actually changed, preventing unnecessary processing and downstream API calls.

Polling Databases Without CDC

For databases where Change Data Capture (CDC) isn’t available or practical:

# Producer: Poll database every minute
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-legacy-db
spec:
  Schedule: "* * * * *"  # Every minute
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "sql:SELECT * FROM orders?dataSource=#myDataSource"
          - split:
              simple: "${body}"
              steps:
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "order"
                      entityId: "${body[order_id]}"
                      source: "legacy-database"
---
# Consumer: React to actual order changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-order-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "order"
        steps:
          - choice:
              when:
                - simple: "${body[Operation]} == 'Created'"
                  steps:
                    - to: "direct:send-order-confirmation"
                - simple: "${body[Operation]} == 'Updated'"
                  steps:
                    - to: "direct:update-shipping-status"

File System Monitoring

Monitor file systems or S3 buckets for content changes:

# Producer: Poll S3 bucket for files
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-s3-files
spec:
  Schedule: "*/10 * * * *"  # Every 10 minutes
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "aws2-s3://my-bucket?operation=listObjects"
          - split:
              simple: "${body}"
              steps:
                - to: "aws2-s3://my-bucket?operation=getObject"
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "s3-file"
                      entityId: "${body[key]}"
                      source: "s3://my-bucket"
---
# Consumer: Process only changed files
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-file-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "s3-file"
        steps:
          - log: "File changed: ${body[EntityId]}"
          - to: "direct:process-file"

IoT Sensor Data Deduplication

Poll IoT sensors and only react to actual state changes:

# Producer: Poll sensors frequently
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-sensors
spec:
  Schedule: "*/30 * * * * *"  # Every 30 seconds
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "mqtt:sensors/temperature/#"
          - split:
              simple: "${body}"
              steps:
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "sensor-reading"
                      entityId: "${body[sensorId]}"
                      source: "mqtt://sensors"
---
# Consumer: Alert only on actual temperature changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: temperature-alerts
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "sensor-reading"
        steps:
          - filter:
              simple: "${body[Data][temperature]} > 80"
              steps:
                - to: "direct:send-alert"

Configuration

Required Kamelets

Entity Store requires two Kamelets to be available in your Weik.io installation:

  • entitystore-sink - Pushes data to Entity Store
  • entitystore-source - Consumes changefeed events from Entity Store

These Kamelets are provided with Weik.io and use NATS as the underlying message broker.

Entity Types

Entity types categorize your data and allow multiple producers and consumers to work with different data types:

# Producer for customers
parameters:
  entityType: "customer"

# Consumer for customers
parameters:
  entityType: "customer"

# Different producer for orders
parameters:
  entityType: "order"

Each entity type maintains its own changefeed stream, allowing independent processing.
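
One way to picture per-type changefeeds is a set of independent subscriber lists keyed by entity type. The sketch below is an in-memory illustration only: the `ChangefeedHub` class is hypothetical, the real changefeed runs over NATS, and its subject layout is not described here.

```python
class ChangefeedHub:
    """Hypothetical in-memory hub with one independent feed per entity type."""

    def __init__(self):
        self._feeds = {}  # entity type -> list of subscriber callbacks

    def subscribe(self, entity_type, callback):
        self._feeds.setdefault(entity_type, []).append(callback)

    def publish(self, event):
        # Only subscribers of the event's own entity type are notified
        for callback in self._feeds.get(event["EntityType"], ()):
            callback(event)

hub = ChangefeedHub()
customer_events, order_events = [], []
hub.subscribe("customer", customer_events.append)
hub.subscribe("order", order_events.append)

hub.publish({"EntityType": "customer", "EntityId": "12345", "Operation": "Created"})
hub.publish({"EntityType": "order", "EntityId": "A-1", "Operation": "Updated"})
hub.publish({"EntityType": "invoice", "EntityId": "I-9", "Operation": "Created"})  # no subscribers
```

The customer consumer never sees order events, and vice versa, which is what lets each type be processed independently.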

Changefeed Event Format

When consuming from the Entity Store changefeed using kamelet:entitystore-source, events are structured as follows:

{
  "EntityType": "customer",
  "EntityId": "12345",
  "Timestamp": "2025-01-13T10:30:00Z",
  "Operation": "Created",
  "Data": {
    "customerId": "12345",
    "name": "John Doe",
    "email": "john@example.com"
  }
}

Event Fields:

  • EntityType - The entity type specified in the sink
  • EntityId - The unique identifier for the entity
  • Timestamp - When the change was detected
  • Operation - Type of change: Created, Updated, or Deleted
  • Data - The actual entity data that was pushed to the sink

Accessing Data in Routes:

# Access operation type
- simple: "${body[Operation]} == 'Created'"

# Access entity ID
- simple: "${body[EntityId]}"

# Access entity data fields
- simple: "${body[Data][customerId]}"
- simple: "${body[Data][email]}"
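
Outside a route, for instance in a script or test that receives the same JSON, the event can be parsed into a typed object. A minimal Python sketch using the sample event above (the `ChangeEvent` class and `parse_event` function are hypothetical helpers, not part of Weik.io):

```python
import json
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ChangeEvent:
    entity_type: str
    entity_id: str
    timestamp: datetime
    operation: str
    data: dict

def parse_event(raw):
    """Parse one changefeed event from its JSON text."""
    doc = json.loads(raw)
    return ChangeEvent(
        entity_type=doc["EntityType"],
        entity_id=doc["EntityId"],
        # fromisoformat() accepts a trailing "Z" only from Python 3.11, so normalise it
        timestamp=datetime.fromisoformat(doc["Timestamp"].replace("Z", "+00:00")),
        operation=doc["Operation"],
        data=doc["Data"],
    )

raw = """
{
  "EntityType": "customer",
  "EntityId": "12345",
  "Timestamp": "2025-01-13T10:30:00Z",
  "Operation": "Created",
  "Data": {"customerId": "12345", "name": "John Doe", "email": "john@example.com"}
}
"""
event = parse_event(raw)
```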

Storage Statistics

The Entity Store provides real-time statistics:

  • Entities Stored - Current number of unique entities
  • Entities with History - Total entities including historical versions
  • History Kept - Number of historical versions maintained (default: 64)
  • Current Size - Storage space used
  • Maximum Age - Data retention period (default: unlimited)

Querying Entities

Search for stored entities using:

  • Entity Type - Filter by entity type or category
  • Entity ID - Look up specific entities by identifier
  • Changed Since - Find entities modified after a specific date
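
The three filters can be thought of as optional predicates over the stored entities. The sketch below only illustrates those semantics on an in-memory list. The `query` function, the assumption that filters combine with AND, and the reuse of the changefeed field names for stored records are illustrative choices, not the Entity Store's query interface.

```python
from datetime import datetime, timezone

def query(entities, entity_type=None, entity_id=None, changed_since=None):
    """Filter entity records; every filter is optional and filters combine with AND."""
    results = []
    for entity in entities:
        if entity_type is not None and entity["EntityType"] != entity_type:
            continue
        if entity_id is not None and entity["EntityId"] != entity_id:
            continue
        # "Changed since" keeps only entities modified strictly after the given moment
        if changed_since is not None and entity["Timestamp"] <= changed_since:
            continue
        results.append(entity)
    return results

# Hypothetical stored entities
stored = [
    {"EntityType": "customer", "EntityId": "12345",
     "Timestamp": datetime(2025, 1, 13, 10, 30, tzinfo=timezone.utc)},
    {"EntityType": "customer", "EntityId": "67890",
     "Timestamp": datetime(2025, 1, 10, 8, 0, tzinfo=timezone.utc)},
    {"EntityType": "order", "EntityId": "A-1",
     "Timestamp": datetime(2025, 1, 14, 9, 15, tzinfo=timezone.utc)},
]

recent_customers = query(stored, entity_type="customer",
                         changed_since=datetime(2025, 1, 12, tzinfo=timezone.utc))
```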

Benefits Over Direct Polling

Without Entity Store

# Direct polling - processes everything every time
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: direct-poll
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "https://api.example.com/customers"
          # Parse JSON response before splitting
          - unmarshal:
              json: {}
          - split:
              simple: "${body}"
              steps:
                - to: "salesforce:sync"  # Called for ALL customers every 5 minutes

Problems:

  • Processes unchanged data repeatedly
  • Wastes API quotas on downstream systems
  • Increases costs and latency
  • Difficult to implement proper change detection logic

With Entity Store

# Producer: Poll and detect changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-customers
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          - to: "https://api.example.com/customers"
          # Parse JSON response before splitting
          - unmarshal:
              json: {}
          - split:
              simple: "${body}"
              steps:
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "customer"
                      entityId: "${body[customerId]}"
                      source: "https://api.example.com/customers"
---
# Consumer: Process only changes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: sync-changes
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "customer"
        steps:
          - to: "salesforce:sync"  # Only called for changed customers

Benefits:

  • Only processes actual changes
  • Cuts downstream API calls in proportion to how little changes between polls (2 calls instead of 100 in the customer example above)
  • Built-in change detection logic
  • Maintains audit trail automatically
  • Decouples polling from processing

Best Practices

Entity Identification

Ensure each entity has a unique identifier for proper change tracking:

# Use natural keys from your source system
- to:
    uri: "kamelet:entitystore-sink"
    parameters:
      entityType: "customer"
      entityId: "${body[customerId]}"  # Natural key
      source: "my-api"

# Or use composite identifiers
- to:
    uri: "kamelet:entitystore-sink"
    parameters:
      entityType: "order"
      entityId: "${body[orderId]}-${body[region]}"  # Composite key
      source: "order-api"
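
Why the composite key matters is easy to see with two orders that share an orderId across regions. A small Python sketch with made-up data (the `composite_id` helper is hypothetical and simply mirrors the orderId-region expression used above):

```python
def composite_id(record, *fields):
    # Join the named fields with "-" to form one entity ID
    return "-".join(str(record[field]) for field in fields)

orders = [
    {"orderId": 1001, "region": "EU", "total": 50},
    {"orderId": 1001, "region": "US", "total": 75},  # same orderId, different region
]

# Keyed by orderId alone, the second order overwrites the first
by_order_id = {composite_id(o, "orderId"): o for o in orders}

# Keyed by orderId and region, both orders are tracked separately
by_composite = {composite_id(o, "orderId", "region"): o for o in orders}
```

With the single-field key, the two records would look like one entity flipping back and forth on every poll, producing a stream of spurious update events.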

Polling Frequency

Choose polling frequency based on your requirements:

  • High-frequency polling (every minute) - Entity Store prevents downstream overload
  • Low-frequency polling (every hour) - Entity Store accumulates changes for batch processing
  • Mixed frequency - Different producers can poll at different rates

Error Handling

Implement error handling in both producer and consumer flows:

# Producer with error handling
- route:
    from:
      uri: timer:poll
    steps:
      - doTry:
          steps:
            - to: "https://api.example.com/data"
            # Parse JSON response before splitting
            - unmarshal:
                json: {}
            - split:
                simple: "${body}"
                steps:
                  - to:
                      uri: "kamelet:entitystore-sink"
                      parameters:
                        entityType: "data"
                        entityId: "${body[id]}"
                        source: "api"
          doCatch:
            - exception: java.lang.Exception
              steps:
                - log: "Polling failed: ${exception.message}"
                - to: "direct:alert"

Comparison with Database Change Tracking

Feature           Entity Store                         Database Change Tracking
----------------  -----------------------------------  --------------------------------
Source            Any system (APIs, files, databases)  Databases only
Change Detection  Application-level data comparison    Database-level CDC
Setup             No database configuration needed     Requires CDC enabled on database
Latency           Depends on polling frequency         Near real-time
Overhead          Minimal (storage only)               Database performance impact
Use Case          Systems without CDC/webhooks         Databases with CDC support

Use Entity Store when:

  • Source system doesn’t support CDC or webhooks
  • Polling is the only available option
  • You need change detection across multiple diverse sources
  • Source is not a database (API, file, message queue, etc.)

Use Database Change Tracking when:

  • Source is a database with CDC support
  • Near real-time change detection is required
  • You control the database configuration
  • Minimal latency is critical

See Database Change Tracking for database-specific CDC patterns.

Complete Example

Here’s a complete working example that polls a REST API and processes only changes:

# poll-products.yaml
# Producer: Polls product API every 5 minutes
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: poll-products
  description: Poll products from external API
spec:
  Schedule: "*/5 * * * *"
  Route:
    - route:
        from:
          uri: timer:poll
        steps:
          # Fetch products from API
          - to:
              uri: "https://api.example.com/products"
              parameters:
                httpMethod: GET

          # Parse JSON response
          - unmarshal:
              json: {}

          # Split into individual products
          - split:
              simple: "${body}"
              steps:
                # Push each product to Entity Store
                - to:
                    uri: "kamelet:entitystore-sink"
                    parameters:
                      entityType: "product"
                      entityId: "${body[productId]}"
                      source: "https://api.example.com/products"
---
# process-product-changes.yaml
# Consumer: Processes only changed products
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-product-changes
  description: Process product changes and sync to warehouse
spec:
  Route:
    - route:
        from:
          uri: "kamelet:entitystore-source"
          parameters:
            entityType: "product"
        steps:
          # Log the change
          - log:
              message: "Product ${body[Operation]}: ${body[EntityId]}"

          # Route based on operation type
          - choice:
              when:
                # New product created
                - simple: "${body[Operation]} == 'Created'"
                  steps:
                    - log: "Creating new product in warehouse"
                    - to: "direct:create-warehouse-product"

                # Product updated
                - simple: "${body[Operation]} == 'Updated'"
                  steps:
                    - log: "Updating product in warehouse"
                    - to: "direct:update-warehouse-product"

                # Product deleted
                - simple: "${body[Operation]} == 'Deleted'"
                  steps:
                    - log: "Archiving product in warehouse"
                    - to: "direct:archive-warehouse-product"

Result: The producer polls 1000 products every 5 minutes. If Entity Store detects that only 3 products changed in a given cycle, the consumer processes only those 3, saving 997 unnecessary warehouse API calls for that cycle.

What’s Next