Database Change Tracking
Monitor and capture database changes in real-time
Database Change Tracking enables real-time monitoring of database changes using Apache Camel with Debezium connectors. Changes are automatically published to NATS for event-driven integration patterns.
What is Database Change Tracking?
Database Change Tracking captures INSERT, UPDATE, and DELETE operations from databases using Change Data Capture (CDC) technology. Weik.io leverages Apache Camel’s Debezium integration to provide reliable, low-latency change detection across multiple database systems.
Supported Databases
- PostgreSQL
- MySQL/MariaDB
- SQL Server
- Oracle
- MongoDB
Additional databases are supported through other Debezium connectors.
Key Features
- Real-time Change Capture - Detect database changes as they happen using CDC
- Automatic NATS Publishing - Changes automatically published to NATS streams
- Transaction Support - Maintain transaction boundaries in change events
- Schema Evolution - Handle database schema changes automatically
- Multiple Tables - Monitor multiple tables with separate configurations
How It Works
- Configure Change Tracking - Define DatabaseChangeTracking resource with database connection
- Automatic Integration Flow - Weik.io creates Apache Camel integration flow with Debezium connector
- Automatic NATS Stream - NATS JetStream stream created for change events
- Publish Changes - Database changes captured and published to NATS
- Consume Events - Integration flows subscribe to NATS subjects to process changes
Basic Configuration
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: postgres-production
  description: Track changes on production database
spec:
  type: postgres
  parameters:
    databaseHostname: db.example.com
    databasePort: "5432"
    databaseDbname: productiondb
    databaseUser: weikio_user
    databasePassword: secure_password
    tableIncludeList: public.customers
This configuration:
- Creates a Debezium connector for PostgreSQL
- Monitors the public.customers table
- Creates NATS stream dbchangetracking-postgres-production-stream
- Publishes changes to dbchangetracking.postgres-production
Using Variables for Credentials
Variables provide secure credential management using environment variable substitution:
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: postgres-secure
spec:
  type: postgres
  parameters:
    databaseHostname: "{{dbHost}}"
    databasePort: "5432"
    databaseDbname: "{{dbName}}"
    databaseUser: "{{dbUser}}"
    databasePassword: "{{dbPassword}}"
    tableIncludeList: public.orders
  variables:
    - dbHost: "{{sys:DB_HOST}}"
    - dbName: "{{sys:DB_NAME}}"
    - dbUser: "{{sys:DB_USERNAME}}"
    - dbPassword: "{{sys:DB_PASSWORD}}"
Set environment variables before starting Weik.io:
export DB_HOST="db.production.com"
export DB_NAME="productiondb"
export DB_USERNAME="weikio_user"
export DB_PASSWORD="secure_password"
Variables use double-brace syntax {{variableName}} and can reference system environment variables with the sys: prefix.
See Using Variables for detailed examples.
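The two-level lookup described above (a named variable that in turn points at a sys: environment value) can be illustrated with a short Python sketch. This is not Weik.io's implementation: the resolve function, the flat variables dictionary, and the explicit env argument are all assumptions made for illustration, and the real resolver may treat whitespace, missing values, or nesting differently.

```python
import os
import re
from typing import Mapping, Optional

# Matches {{name}} and {{sys:NAME}} placeholders.
PLACEHOLDER = re.compile(r"\{\{\s*([^{}]+?)\s*\}\}")


def resolve(value: str,
            variables: Mapping[str, str],
            env: Optional[Mapping[str, str]] = None) -> str:
    """Replace every {{...}} placeholder in value (hypothetical helper)."""
    env = os.environ if env is None else env

    def lookup(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name.startswith("sys:"):
            # sys: placeholders read straight from the environment.
            return env[name[len("sys:"):]]
        # A named variable may itself contain a sys: placeholder.
        return resolve(variables[name], variables, env)

    return PLACEHOLDER.sub(lookup, value)


# The environment is passed in explicitly so the example runs anywhere.
variables = {"dbHost": "{{sys:DB_HOST}}", "dbName": "{{sys:DB_NAME}}"}
env = {"DB_HOST": "db.production.com", "DB_NAME": "productiondb"}
print(resolve("{{dbHost}}", variables, env))
```

An unknown variable or unset environment variable raises KeyError in this sketch, which makes a missing credential fail loudly rather than produce an empty password.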
Database-Specific Examples
PostgreSQL
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: postgres-orders
spec:
  type: postgres
  parameters:
    databaseHostname: postgres.example.com
    databasePort: "5432"
    databaseDbname: ecommerce
    databaseUser: weikio
    databasePassword: "{{sys:POSTGRES_PASSWORD}}"
    schemaIncludeList: public
    tableIncludeList: public.orders,public.order_items
Parameters:
- Uses logical replication with pgoutput plugin (automatically configured)
- tableIncludeList format: schema.table (comma-separated)
- Requires database with logical replication enabled
Debezium PostgreSQL Documentation
MySQL
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: mysql-inventory
spec:
  type: mysql
  parameters:
    databaseHostname: mysql.example.com
    databasePort: "3306"
    databaseUser: weikio
    databasePassword: "{{sys:MYSQL_PASSWORD}}"
    databaseServerName: mysql-prod
    databaseIncludeList: inventory
    tableIncludeList: inventory.products,inventory.stock
Parameters:
- Uses binary log (binlog) parsing
- databaseServerName is required and must be unique
- tableIncludeList format: database.table
- Requires binary logging enabled
SQL Server
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: sqlserver-customers
spec:
  type: sqlserver
  parameters:
    databaseHostname: sqlserver.example.com
    databasePort: "1433"
    databaseUser: sa
    databasePassword: "{{sys:SQLSERVER_PASSWORD}}"
    databaseDbname: CRM
    tableIncludeList: dbo.Customers,dbo.Contacts
Parameters:
- Uses SQL Server CDC
- tableIncludeList format: schema.table
- Requires CDC enabled on database and tables
Debezium SQL Server Documentation
MongoDB
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: mongodb-events
spec:
  type: mongodb
  parameters:
    mongodbConnectionString: "{{connectionString}}"
    collectionIncludeList: mydb.events,mydb.notifications
  variables:
    - connectionString: "{{sys:MONGODB_URI}}"
Parameters:
- Uses MongoDB change streams
- collectionIncludeList format: database.collection
- Requires replica set or sharded cluster
Debezium MongoDB Documentation
Oracle
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: oracle-crm
spec:
  type: oracle
  parameters:
    databaseHostname: oracle.example.com
    databasePort: "1521"
    databaseUser: weikio
    databasePassword: "{{sys:ORACLE_PASSWORD}}"
    databaseDbname: ORCL
    schemaIncludeList: CRM_SCHEMA
    tableIncludeList: CRM_SCHEMA.CUSTOMERS
Parameters:
- Uses LogMiner or XStream
- tableIncludeList format: SCHEMA.TABLE (uppercase recommended)
- Requires supplemental logging enabled
NATS Integration
DatabaseChangeTracking automatically creates and manages NATS resources. No additional NATS configuration is required.
Automatic NATS Resources
For a DatabaseChangeTracking named postgres-orders:
NATS Stream: dbchangetracking-postgres-orders-stream
- Storage: File-based persistence
- Max messages: 512
- Retention: Limits-based (oldest messages discarded)
NATS Subject: dbchangetracking.postgres-orders
- Change events published here
Republish Subject: dbchangetrackingprocessing.postgres-orders
- Events automatically republished for processing
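The resource names above follow a visible pattern built from the tracking name. The Python sketch below derives them for any tracking resource. The pattern is inferred from the examples on this page (including the {tracking-name}-flow naming used in the Monitoring section), not from a documented guarantee, and derived_names is a hypothetical helper.

```python
from typing import Dict


def derived_names(tracking_name: str) -> Dict[str, str]:
    """Build the resource names this page shows for a tracking resource.

    The patterns are inferred from the page's examples (an assumption).
    """
    return {
        "stream": f"dbchangetracking-{tracking_name}-stream",
        "subject": f"dbchangetracking.{tracking_name}",
        "republish_subject": f"dbchangetrackingprocessing.{tracking_name}",
        "flow": f"{tracking_name}-flow",
    }


for kind, name in derived_names("postgres-orders").items():
    print(f"{kind}: {name}")
```

A helper like this is handy in scripts that call `nats stream info` or subscribe to the change subject, because the names never need to be typed by hand.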
Consuming Change Events
Subscribe to the NATS subject to process changes:
apiVersion: weik.io/v1alpha1
kind: IntegrationFlow
metadata:
  name: process-order-changes
spec:
  Description: Process order changes from PostgreSQL
  # Subscribe to: dbchangetracking.postgres-orders
Or use Apache Camel routes:
- route:
    from:
      uri: nats:dbchangetracking.postgres-orders
      parameters:
        servers: localhost
      steps:
        - log:
            message: "Received change: ${body}"
Change Event Structure
Change events are JSON-formatted Debezium events containing:
{
  "op": "c",
  "ts_ms": 1678901234567,
  "before": null,
  "after": {
    "id": 123,
    "name": "John Doe",
    "email": "john@example.com",
    "created_at": "2025-01-01T00:00:00Z"
  },
  "source": {
    "version": "2.3.0.Final",
    "connector": "postgresql",
    "name": "postgres-orders",
    "ts_ms": 1678901234567,
    "db": "ecommerce",
    "schema": "public",
    "table": "customers"
  }
}
Operation Types:
- c - CREATE (INSERT)
- u - UPDATE
- d - DELETE
- r - READ (initial snapshot)
Event Fields:
- op - Operation type
- before - Row state before change (UPDATE, DELETE)
- after - Row state after change (INSERT, UPDATE)
- ts_ms - Timestamp of change
- source - Metadata about the change source
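A consumer usually starts by decoding the event and picking the right row image for the operation. The following Python sketch shows one way to do that, assuming the flat event shape from the sample above. The describe_change helper and the operation labels are illustrative names, and the table name is read defensively because not every connector reports a schema.

```python
import json
from typing import Optional, Tuple

# Operation codes as listed above, mapped to illustrative labels.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot"}


def describe_change(raw: str) -> Tuple[str, str, Optional[dict]]:
    """Return (operation, table, row) for one JSON change event."""
    event = json.loads(raw)
    operation = OPERATIONS[event["op"]]
    # Deletes only have a "before" image; other operations carry "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    source = event.get("source", {})
    # Join whichever of schema/table is present, e.g. "public.customers".
    table = ".".join(p for p in (source.get("schema"), source.get("table")) if p)
    return operation, table, row


sample = json.dumps({
    "op": "c",
    "before": None,
    "after": {"id": 123, "name": "John Doe"},
    "source": {"schema": "public", "table": "customers"},
})
print(describe_change(sample))
```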
Use Cases
Real-time Data Synchronization
Sync data changes to external systems:
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: crm-sync
spec:
  type: postgres
  parameters:
    databaseHostname: "{{sys:CRM_DB_HOST}}"
    databaseDbname: crm
    databaseUser: "{{sys:DB_USER}}"
    databasePassword: "{{sys:DB_PASSWORD}}"
    tableIncludeList: public.customers,public.contacts
Audit Logging
Track all changes for compliance:
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: audit-trail
spec:
  type: sqlserver
  parameters:
    databaseHostname: "{{sys:DB_HOST}}"
    databaseDbname: Financial
    databaseUser: "{{sys:DB_USER}}"
    databasePassword: "{{sys:DB_PASSWORD}}"
    tableIncludeList: dbo.Transactions,dbo.Accounts
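For an audit trail, the interesting part of an event is often which columns changed. The Python sketch below compares the before and after images of an already-decoded event. The changed_fields helper and the old/new record layout are hypothetical, and the column names in the example are made up for illustration.

```python
from typing import Any, Dict


def changed_fields(event: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Map each changed column to its old and new value."""
    # Inserts have no "before" image and deletes have no "after" image,
    # so a missing side is treated as an empty row.
    before = event.get("before") or {}
    after = event.get("after") or {}
    return {
        column: {"old": before.get(column), "new": after.get(column)}
        for column in sorted(before.keys() | after.keys())
        if before.get(column) != after.get(column)
    }


update = {
    "op": "u",
    "before": {"id": 1, "balance": 100},
    "after": {"id": 1, "balance": 80},
}
print(changed_fields(update))
```

The comparison is only as complete as the before image the connector delivers, so it is worth checking the Debezium documentation for your database to see when full row images are captured.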
Cache Invalidation
Invalidate caches when data changes:
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: cache-invalidation
spec:
  type: mysql
  parameters:
    databaseHostname: "{{sys:DB_HOST}}"
    databaseUser: "{{sys:DB_USER}}"
    databasePassword: "{{sys:DB_PASSWORD}}"
    databaseServerName: cache-source
    databaseIncludeList: products
    tableIncludeList: products.catalog
Search Index Updates
Keep search indexes in sync:
apiVersion: weik.io/v1alpha1
kind: DatabaseChangeTracking
metadata:
  name: search-indexing
spec:
  type: postgres
  parameters:
    databaseHostname: "{{sys:DB_HOST}}"
    databaseDbname: ecommerce
    databaseUser: "{{sys:DB_USER}}"
    databasePassword: "{{sys:DB_PASSWORD}}"
    tableIncludeList: public.products,public.categories
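On the consuming side, each change event has to be turned into an index operation. A minimal Python sketch of that mapping follows. It is not tied to any particular search engine: the index_action helper, the action dictionary layout, and the id primary-key column are all assumptions for illustration.

```python
from typing import Any, Dict


def index_action(event: Dict[str, Any], id_field: str = "id") -> Dict[str, Any]:
    """Translate a decoded change event into an index operation.

    id_field names the primary-key column (an assumed name).
    """
    if event["op"] == "d":
        # A delete removes the document identified by the old row.
        return {"action": "delete", "id": event["before"][id_field]}
    # Inserts, updates, and snapshot reads all write the new row state.
    return {
        "action": "upsert",
        "id": event["after"][id_field],
        "document": event["after"],
    }


print(index_action({"op": "u", "before": None, "after": {"id": 5, "title": "Lamp"}}))
```

Treating snapshot reads as upserts means the index is populated from the initial snapshot and then kept current by the same code path.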
Best Practices
Security
- Use variables with environment variables for credentials
- Never hardcode passwords in configuration files
- Use database users with minimal required permissions (read-only when possible)
- Enable SSL/TLS for database connections
- Rotate credentials regularly
- Monitor who subscribes to change event streams
Performance
- Monitor NATS stream sizes and message rates
- Consider database performance impact of CDC
- Use specific table include lists (avoid monitoring entire databases)
- Test in development before production deployment
- Monitor the generated integration flow performance
Configuration
- Use descriptive names for tracking configurations
- Document which tables are being monitored and why
- Use Debezium’s filtering capabilities to reduce event volume
- Review database-specific requirements in Debezium documentation
- Plan for database maintenance windows (CDC may need reconfiguration)
Monitoring
Monitor the automatically created integration flows:
# List integration flows
weikio integration flows ls
# Check specific flow (named {tracking-name}-flow)
weikio integration flows status postgres-orders-flow
# View flow logs
weikio integration flows logs postgres-orders-flow
Monitor NATS streams:
# Check NATS stream status
nats stream info dbchangetracking-postgres-orders-stream
Debezium Parameters
All parameters in the parameters section are passed directly to the Debezium connector. Parameter names are the camelCase form of Debezium’s property names, as used by Apache Camel’s Debezium components:
- databaseHostname (not Host or hostname)
- databaseDbname (not Database)
- databaseUser (not Username)
- databasePassword (not Password)
- tableIncludeList (not Table)
Refer to the Debezium documentation for your database type for complete parameter lists and advanced configuration options.
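The Debezium documentation lists connector properties in dotted lowercase form (for example database.hostname), so it helps to translate a camelCase parameter name before searching for it. The Python sketch below does this, assuming each uppercase letter marks a word boundary. That rule holds for the parameters shown on this page but is an assumption, not a documented guarantee, and to_debezium_property is a hypothetical helper.

```python
import re


def to_debezium_property(name: str) -> str:
    """Convert a camelCase parameter name to dotted lowercase form."""
    # Insert a dot before each uppercase letter except a leading one,
    # then lowercase the result.
    return re.sub(r"(?<!^)(?=[A-Z])", ".", name).lower()


for parameter in ("databaseHostname", "databaseDbname", "tableIncludeList"):
    print(parameter, "->", to_debezium_property(parameter))
```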
Integration with Other Features
- Integration Flows - Process change events with custom logic
- Variables - Secure credential management
- NATS - Automatic event publishing and streaming
- Entity Store - Store change history for audit trails
What’s Next
- Configuration Reference - Complete parameter documentation
- Using Variables - Variable substitution patterns
- Integration Flows - Define flows to process changes
- Integration Overview - Learn about other integration features