Database Change Tracking

Monitor and capture database changes in real time

Database Change Tracking enables real-time monitoring of database changes using Apache Camel with Debezium connectors. Changes are automatically published to NATS for event-driven integration patterns.

What is Database Change Tracking?

Database Change Tracking captures INSERT, UPDATE, and DELETE operations from databases using Change Data Capture (CDC) technology. Weik.io leverages Apache Camel’s Debezium integration to provide reliable, low-latency change detection across multiple database systems.

Supported Databases

  • PostgreSQL
  • MySQL/MariaDB
  • SQL Server
  • Oracle
  • MongoDB

Additional databases are supported through Debezium connectors.

Key Features

  • Real-time Change Capture - Detect database changes as they happen using CDC
  • Automatic NATS Publishing - Changes automatically published to NATS streams
  • Transaction Support - Maintain transaction boundaries in change events
  • Schema Evolution - Handle database schema changes automatically
  • Multiple Tables - Monitor multiple tables with separate configurations

How It Works

  1. Configure Change Tracking - Define a DatabaseChangeTracking resource with the database connection details
  2. Automatic Integration Flow - Weik.io creates an Apache Camel integration flow with a Debezium connector
  3. Automatic NATS Stream - A NATS JetStream stream is created for change events
  4. Publish Changes - Database changes are captured and published to NATS
  5. Consume Events - Integration flows subscribe to NATS subjects to process changes

Basic Configuration

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: postgres-production
  description: Track changes on production database
spec:
  type: postgres
  parameters:
    databaseHostname: db.example.com
    databasePort: "5432"
    databaseDbname: productiondb
    databaseUser: weikio_user
    databasePassword: secure_password
    tableIncludeList: public.customers

This configuration:

  • Creates a Debezium connector for PostgreSQL
  • Monitors the public.customers table
  • Creates NATS stream dbchangetracking-postgres-production-stream
  • Publishes changes to dbchangetracking.postgres-production

Using Variables for Credentials

Variables keep credentials out of configuration files by substituting values from environment variables:

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: postgres-secure
spec:
  type: postgres
  parameters:
    databaseHostname: "{{dbHost}}"
    databasePort: "5432"
    databaseDbname: "{{dbName}}"
    databaseUser: "{{dbUser}}"
    databasePassword: "{{dbPassword}}"
    tableIncludeList: public.orders
  variables:
    - dbHost: "{{sys:DB_HOST}}"
    - dbName: "{{sys:DB_NAME}}"
    - dbUser: "{{sys:DB_USERNAME}}"
    - dbPassword: "{{sys:DB_PASSWORD}}"

Set environment variables before starting Weik.io:

export DB_HOST="db.production.com"
export DB_NAME="productiondb"
export DB_USERNAME="weikio_user"
export DB_PASSWORD="secure_password"

Variables use double-brace syntax {{variableName}} and can reference system environment variables with the sys: prefix.
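
The substitution rules above can be illustrated with a small resolver. This is only a sketch of the idea, not Weik.io's implementation: the `resolve` function and its arguments are hypothetical, and it assumes a variable's value may itself contain a `{{sys:...}}` reference, as in the `variables` section of the example.

```python
import os
import re

# Matches {{name}} and {{sys:NAME}} placeholders.
PLACEHOLDER = re.compile(r"\{\{\s*(sys:)?([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def resolve(value, variables, env=None):
    """Expand {{name}} from `variables` and {{sys:NAME}} from `env` (hypothetical helper)."""
    env = os.environ if env is None else env

    def substitute(match):
        is_system, name = match.group(1), match.group(2)
        if is_system:
            return env[name]  # raises KeyError if the environment variable is unset
        # A variable's own value may contain a {{sys:...}} reference, so resolve it too.
        return resolve(variables[name], variables, env)

    return PLACEHOLDER.sub(substitute, value)


variables = {"dbHost": "{{sys:DB_HOST}}", "dbPassword": "{{sys:DB_PASSWORD}}"}
env = {"DB_HOST": "db.production.com", "DB_PASSWORD": "secure_password"}

print(resolve("{{dbHost}}", variables, env))
```

Passing `env` explicitly keeps the sketch testable; leaving it out falls back to the process environment.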

See Using Variables for detailed examples.

Database-Specific Examples

PostgreSQL

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: postgres-orders
spec:
  type: postgres
  parameters:
    databaseHostname: postgres.example.com
    databasePort: "5432"
    databaseDbname: ecommerce
    databaseUser: weikio
    databasePassword: "{{sys:POSTGRES_PASSWORD}}"
    schemaIncludeList: public
    tableIncludeList: public.orders,public.order_items

Parameters:

  • Uses logical replication with pgoutput plugin (automatically configured)
  • tableIncludeList format: schema.table (comma-separated)
  • Requires database with logical replication enabled
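
As a quick sanity check on the `schema.table` format, a configuration value can be split and validated before it is deployed. The helper below is a hypothetical sketch that assumes plain names only; Debezium itself treats each entry as a regular expression matched against the fully-qualified table name, which this sketch does not attempt to handle.

```python
def parse_include_list(value):
    """Split 'schema.table,schema.table' into (schema, table) pairs."""
    pairs = []
    for entry in value.split(","):
        entry = entry.strip()
        schema, dot, table = entry.partition(".")
        # Exactly one dot is expected, with a non-empty name on each side.
        if not (schema and dot and table) or "." in table:
            raise ValueError(f"expected schema.table, got {entry!r}")
        pairs.append((schema, table))
    return pairs


print(parse_include_list("public.orders,public.order_items"))
```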

Debezium PostgreSQL Documentation

MySQL

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: mysql-inventory
spec:
  type: mysql
  parameters:
    databaseHostname: mysql.example.com
    databasePort: "3306"
    databaseUser: weikio
    databasePassword: "{{sys:MYSQL_PASSWORD}}"
    databaseServerName: mysql-prod
    databaseIncludeList: inventory
    tableIncludeList: inventory.products,inventory.stock

Parameters:

  • Uses binary log (binlog) parsing
  • databaseServerName is required and must be unique
  • tableIncludeList format: database.table
  • Requires binary logging enabled

Debezium MySQL Documentation

SQL Server

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: sqlserver-customers
spec:
  type: sqlserver
  parameters:
    databaseHostname: sqlserver.example.com
    databasePort: "1433"
    databaseUser: sa
    databasePassword: "{{sys:SQLSERVER_PASSWORD}}"
    databaseDbname: CRM
    tableIncludeList: dbo.Customers,dbo.Contacts

Parameters:

  • Uses SQL Server CDC
  • tableIncludeList format: schema.table
  • Requires CDC enabled on database and tables

Debezium SQL Server Documentation

MongoDB

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: mongodb-events
spec:
  type: mongodb
  parameters:
    mongodbConnectionString: "{{connectionString}}"
    collectionIncludeList: mydb.events,mydb.notifications
  variables:
    - connectionString: "{{sys:MONGODB_URI}}"

Parameters:

  • Uses MongoDB change streams
  • collectionIncludeList format: database.collection
  • Requires replica set or sharded cluster

Debezium MongoDB Documentation

Oracle

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: oracle-crm
spec:
  type: oracle
  parameters:
    databaseHostname: oracle.example.com
    databasePort: "1521"
    databaseUser: weikio
    databasePassword: "{{sys:ORACLE_PASSWORD}}"
    databaseDbname: ORCL
    schemaIncludeList: CRM_SCHEMA
    tableIncludeList: CRM_SCHEMA.CUSTOMERS

Parameters:

  • Uses LogMiner or XStream
  • tableIncludeList format: SCHEMA.TABLE (uppercase recommended)
  • Requires supplemental logging enabled

Debezium Oracle Documentation

NATS Integration

DatabaseChangeTracking automatically creates and manages NATS resources. No additional NATS configuration is required.

Automatic NATS Resources

For a DatabaseChangeTracking named postgres-orders:

  1. NATS Stream: dbchangetracking-postgres-orders-stream

    • Storage: File-based persistence
    • Max messages: 512
    • Retention: Limits-based (oldest messages discarded)
  2. NATS Subject: dbchangetracking.postgres-orders

    • Change events published here
  3. Republish Subject: dbchangetrackingprocessing.postgres-orders

    • Events automatically republished for processing
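
The naming pattern above can be captured in a few lines, which is handy when a consumer needs to derive the subject from a tracking name. The function name is hypothetical, and the patterns are inferred from the examples in this document rather than from a published specification.

```python
def nats_resources(tracking_name):
    """Derive the NATS names used for a DatabaseChangeTracking resource."""
    return {
        "stream": f"dbchangetracking-{tracking_name}-stream",
        "subject": f"dbchangetracking.{tracking_name}",
        "republish_subject": f"dbchangetrackingprocessing.{tracking_name}",
    }


print(nats_resources("postgres-orders")["subject"])
```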

Consuming Change Events

Subscribe to the NATS subject to process changes:

apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-order-changes
spec:
  Description: Process order changes from PostgreSQL
  # Subscribe to: dbchangetracking.postgres-orders

Or use Apache Camel routes:

- route:
    from:
      uri: nats:dbchangetracking.postgres-orders
      parameters:
        servers: localhost
    steps:
      - log:
          message: "Received change: ${body}"

Change Event Structure

Change events are JSON-formatted Debezium events containing:

{
  "op": "c",
  "ts_ms": 1678901234567,
  "before": null,
  "after": {
    "id": 123,
    "name": "John Doe",
    "email": "john@example.com",
    "created_at": "2025-01-01T00:00:00Z"
  },
  "source": {
    "version": "2.3.0.Final",
    "connector": "postgresql",
    "name": "postgres-orders",
    "ts_ms": 1678901234567,
    "db": "ecommerce",
    "schema": "public",
    "table": "customers"
  }
}

Operation Types:

  • c - CREATE (INSERT)
  • u - UPDATE
  • d - DELETE
  • r - READ (initial snapshot)

Event Fields:

  • op - Operation type
  • before - Row state before change (UPDATE, DELETE)
  • after - Row state after change (INSERT, UPDATE)
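  • ts_ms - Time at which the connector processed the event, in milliseconds since the Unix epoch (source.ts_ms records when the change was made in the database)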
  • source - Metadata about the change source
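
A consumer typically branches on `op` and reads the row from `before` or `after` accordingly. The sketch below assumes the flat event layout shown above (no surrounding schema/payload envelope) and a PostgreSQL-style `source` block with a `schema` field; `describe_change` is a hypothetical helper name.

```python
import json

OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot"}


def describe_change(raw):
    """Summarize one JSON-encoded Debezium change event."""
    event = json.loads(raw)
    operation = OPERATIONS.get(event["op"], "unknown")
    # Deletes carry the row in "before"; the other operations carry it in "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    source = event["source"]
    return {
        "operation": operation,
        "table": f"{source['schema']}.{source['table']}",
        "row": row,
    }


raw = json.dumps({
    "op": "c",
    "before": None,
    "after": {"id": 123, "name": "John Doe"},
    "source": {"schema": "public", "table": "customers"},
})
print(describe_change(raw))
```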

Use Cases

Real-time Data Synchronization

Sync data changes to external systems:

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: crm-sync
spec:
  type: postgres
  parameters:
    databaseHostname: "{{sys:CRM_DB_HOST}}"
    databaseDbname: crm
    databaseUser: "{{sys:DB_USER}}"
    databasePassword: "{{sys:DB_PASSWORD}}"
    tableIncludeList: public.customers,public.contacts

Audit Logging

Track all changes for compliance:

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: audit-trail
spec:
  type: sqlserver
  parameters:
    databaseHostname: "{{sys:DB_HOST}}"
    databaseDbname: Financial
    databaseUser: "{{sys:DB_USER}}"
    databasePassword: "{{sys:DB_PASSWORD}}"
    tableIncludeList: dbo.Transactions,dbo.Accounts
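
On the consuming side, an audit entry can be derived by comparing the `before` and `after` row states. This is a minimal sketch: `audit_record`, the output field names, and the `AccountId` and `Balance` columns are hypothetical, and it assumes `before` is populated for updates.

```python
def audit_record(event):
    """Build a flat audit entry from one Debezium change event."""
    before = event.get("before") or {}
    after = event.get("after") or {}
    # Keep only the columns whose value differs between the two row states.
    changes = {
        column: {"old": before.get(column), "new": after.get(column)}
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }
    source = event["source"]
    return {
        "table": f"{source['schema']}.{source['table']}",
        "operation": event["op"],
        "changed_at_ms": source["ts_ms"],
        "changes": changes,
    }


event = {
    "op": "u",
    "before": {"AccountId": 7, "Balance": 100},
    "after": {"AccountId": 7, "Balance": 250},
    "source": {"schema": "dbo", "table": "Accounts", "ts_ms": 1678901234567},
}
print(audit_record(event))
```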

Cache Invalidation

Invalidate caches when data changes:

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: cache-invalidation
spec:
  type: mysql
  parameters:
    databaseHostname: "{{sys:DB_HOST}}"
    databaseUser: "{{sys:DB_USER}}"
    databasePassword: "{{sys:DB_PASSWORD}}"
    databaseServerName: cache-source
    databaseIncludeList: products
    tableIncludeList: products.catalog
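
A consumer of these events might translate each change into the cache keys to evict. The sketch below is illustrative only: the `table:id` key format, the `id` key column, and the function name are assumptions, and snapshot reads are skipped because they do not represent a change.

```python
def cache_keys_to_invalidate(event, key_column="id"):
    """Return the cache keys affected by one change event."""
    if event["op"] == "r":  # initial snapshot rows are not changes
        return []
    table = event["source"]["table"]
    keys = set()
    # Check both row states so a changed key evicts the old and the new entry.
    for state in (event.get("before"), event.get("after")):
        if state and key_column in state:
            keys.add(f"{table}:{state[key_column]}")
    return sorted(keys)


event = {
    "op": "u",
    "before": {"id": 5, "price": 10},
    "after": {"id": 5, "price": 12},
    "source": {"db": "products", "table": "catalog"},
}
print(cache_keys_to_invalidate(event))
```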

Search Index Updates

Keep search indexes in sync:

apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: search-indexing
spec:
  type: postgres
  parameters:
    databaseHostname: "{{sys:DB_HOST}}"
    databaseDbname: ecommerce
    databaseUser: "{{sys:DB_USER}}"
    databasePassword: "{{sys:DB_PASSWORD}}"
    tableIncludeList: public.products,public.categories
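
For indexing, each event maps naturally to an upsert or a delete. The following sketch returns a neutral action dictionary rather than calling any particular search engine's API; the action shape, the `id` column, and the use of the table name as the index name are assumptions made for illustration.

```python
def to_index_action(event, id_column="id"):
    """Translate one change event into a search index action."""
    op = event["op"]
    index = event["source"]["table"]
    if op == "d":
        # Deleted rows only have a "before" state.
        return {"action": "delete", "index": index, "id": event["before"][id_column]}
    if op in ("c", "u", "r"):
        document = event["after"]
        return {"action": "upsert", "index": index, "id": document[id_column], "document": document}
    raise ValueError(f"unsupported operation: {op!r}")


event = {
    "op": "c",
    "before": None,
    "after": {"id": 42, "title": "Desk lamp"},
    "source": {"schema": "public", "table": "products"},
}
print(to_index_action(event))
```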

Best Practices

Security

  • Use variables with environment variables for credentials
  • Never hardcode passwords in configuration files
  • Use database users with minimal required permissions (read-only when possible)
  • Enable SSL/TLS for database connections
  • Rotate credentials regularly
  • Monitor who subscribes to change event streams

Performance

  • Monitor NATS stream sizes and message rates
  • Consider database performance impact of CDC
  • Use specific table include lists (avoid monitoring entire databases)
  • Test in development before production deployment
  • Monitor the generated integration flow performance

Configuration

  • Use descriptive names for tracking configurations
  • Document which tables are being monitored and why
  • Use Debezium’s filtering capabilities to reduce event volume
  • Review database-specific requirements in Debezium documentation
  • Plan for database maintenance windows (CDC may need reconfiguration)

Monitoring

Monitor the automatically created integration flows:

# List integration flows
weikio integration flows ls

# Check specific flow (named {tracking-name}-flow)
weikio integration flows status postgres-orders-flow

# View flow logs
weikio integration flows logs postgres-orders-flow

Monitor NATS streams:

# Check NATS stream status
nats stream info dbchangetracking-postgres-orders-stream

Debezium Parameters

All parameters in the parameters section are passed directly to the Debezium connector. Parameter names use the camelCase form of Debezium’s dotted property names (for example, databaseHostname corresponds to database.hostname):

  • databaseHostname (not Host or hostname)
  • databaseDbname (not Database)
  • databaseUser (not Username)
  • databasePassword (not Password)
  • tableIncludeList (not Table)
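
When looking a parameter up in the Debezium documentation, it helps to know that Debezium lists properties in dotted form, such as `database.hostname`. The conversion below is a rough sketch for the names used in this document; the function name is hypothetical, and parameters containing acronyms or other irregular capitalization may not map this cleanly.

```python
import re


def to_debezium_property(camel_name):
    """Convert a camelCase parameter name to dotted, lowercase form."""
    # Insert a dot before every uppercase letter except at the start, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", ".", camel_name).lower()


for name in ("databaseHostname", "databaseDbname", "tableIncludeList"):
    print(name, "->", to_debezium_property(name))
```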

Refer to the Debezium documentation for your database type for complete parameter lists and advanced configuration options.

Integration with Other Features

  • Integration Flows - Process change events with custom logic
  • Variables - Secure credential management
  • NATS - Automatic event publishing and streaming
  • Entity Store - Store change history for audit trails

What’s Next