Skip to content

db2: add CGO-free IBM DB2 CDC input connector#4414

Open
emaxerrno wants to merge 10 commits into
mainfrom
db2-ai
Open

db2: add CGO-free IBM DB2 CDC input connector#4414
emaxerrno wants to merge 10 commits into
mainfrom
db2-ai

Conversation

@emaxerrno
Copy link
Copy Markdown
Contributor

@emaxerrno emaxerrno commented May 8, 2026

Summary

A new enterprise db2_cdc input connector for IBM DB2 LUW. Uses CGO-free DB2 CLI bindings via github.com/ebitengine/purego — the IBM DB2 shared library (libdb2.so.1 / libdb2.dylib / db2cli.dll) is loaded at runtime via dlopen/LoadLibrary, so no C toolchain or IBM SDK is needed at build time.

What it does

  • Streams INSERT/UPDATE/DELETE events from IBM DB2 SQL Replication (ASNCDC change tables)
  • Full-table initial snapshot and Debezium-parity incremental snapshot with crash-safe resume
  • Emits Debezium-compatible JSON envelopes (op, before, after, source, ts_ms) for drop-in compatibility with existing Debezium DB2 connector consumers
  • Checkpoint persistence in a DB2 table or benthos cache; auto-created on first run
  • Multi-schema streaming: one connector instance monitors multiple DB2 schemas

Architecture (layered, 6 commits)

db2cli/          ← purego ODBC wrappers (SQLAllocHandle, SQLFetch, SQLGetData, SQLCancel, …)
driver.go        ← column value decoding (BIGINT→json.Number, BLOB streaming, VARGRAPHIC UTF-16LE→UTF-8)
replication/     ← CDC streaming engine + snapshot engine
input_db2_cdc.go ← Redpanda Connect input: checkpoint, signals, envelope, modes

Output format

Each message is a Debezium-compatible JSON envelope:

{
  "op": "u",
  "before": {"EMP_ID": 7, "SALARY": 90000},
  "after":  {"EMP_ID": 7, "SALARY": 95000},
  "source": {
    "connector": "db2",
    "name": "db2_cdc",
    "schema": "DB2INST1",
    "table": "EMPLOYEES",
    "snapshot": "false",
    "commit_lsn": "CSN:000000000000C350",
    "ts_ms": 1747526400000
  },
  "ts_ms": 1747526400000
}

op values: c (insert), u (update with before populated), d (delete), r (snapshot read), heartbeat, schema_change.

Message metadata keys: db2_op, db2_table, db2_schema, db2_csn, db2_intent_seq, db2_snapshot, db2_connector.

Key features

Snapshot modes

Mode Behaviour
initial Full snapshot of all tables, then stream CDC (default)
never Stream CDC from checkpoint CSN only
when_needed Initial snapshot if no checkpoint exists, otherwise stream
initial_only Snapshot then stop cleanly (no streaming phase)
no_data Skip existing rows; stream only future changes

Incremental snapshot (Debezium parity)

Implements the Netflix DBLog / Debezium open-window deduplication algorithm for DB2:

  1. lowCSN = MAX(SYNCHPOINT) before chunk SELECT
  2. SELECT * FROM table WHERE pk > lastKey ORDER BY pk FETCH FIRST N ROWS
  3. highCSN = MAX(SYNCHPOINT) after chunk SELECT
  4. CDC events with CSN in (lowCSN, highCSN] evict their rows from the window
  5. Surviving rows emitted as op=r; checkpoint saves LastEmittedPK after each chunk
  6. On restart, resumes from last checkpoint — no full re-scan

Correctness: composite (CSN, IntentSeq) watermark

Unlike Debezium DB2, polling uses (IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ) as a composite watermark. This prevents event loss when a single transaction writes more rows than poll_batch_size — a gap Debezium does not close.

Signal table

Supports Debezium-compatible execute-snapshot signals plus extended lifecycle controls:

Signal type Action
execute-snapshot Queue tables for incremental snapshot
stop-snapshot Abort in-progress snapshot
pause-snapshot Pause snapshot between chunks
resume-snapshot Resume a paused snapshot

Multi-schema streaming

schemas: ["DB2INST1", "FINANCE", "HR"]

One connector polls change tables across all schemas in a single query; changeTables keyed as SCHEMA.TABLE.

Observability

  • db2_cdc_lag_ms gauge: milliseconds behind the latest DB2 commit timestamp
  • Per-chunk snapshot progress log: incremental snapshot chunk: table=… chunkRows=… evicted=… csnLow=… csnHigh=…
  • Idempotency key per message: schema:table:csn:intentSeq

Debezium gap matrix

Dimension Debezium DB2 This connector Status
CGO-free build No (JDBC/JVM) Yes (purego dlopen) ⬆ Ahead
(CSN, IntentSeq) pagination No (CSN only, drops rows on large txn) Yes ⬆ Ahead
Multi-schema single connector No (separate connectors) Yes (schemas: [...]) ⬆ Ahead
initial_only mode No Yes ⬆ Ahead
Snapshot parallelism 1 (sequential) Configurable (default 4) ⬆ Ahead
stop/pause/resume signals No Yes ⬆ Ahead
Incremental snapshot open-window dedup Yes Yes ✅ Parity
Per-chunk checkpoint + crash resume Yes Yes ✅ Parity
Signal deduplication Yes Yes ✅ Parity
UPDATE event with before populated Yes (op=u) Yes (op=u) ✅ Parity
Heartbeat events Yes Yes (configurable interval) ✅ Parity
Schema change events Yes (separate topic) Yes (inline op=schema_change) ✅ Parity
Lag metric Yes (JMX) Yes (db2_cdc_lag_ms gauge) ✅ Parity
BIGINT >2^53 precision Yes (Java Long) Yes (json.Number) ✅ Parity
BLOB streaming (no 4 KB truncation) Yes (JDBC streams) Yes (32 KB SQL_SUCCESS_WITH_INFO loop) ✅ Parity
VARGRAPHIC / DBCS decoding Yes (JDBC transparent) Yes (SQL_C_WCHAR UTF-16LE→UTF-8) ✅ Parity
Context-aware SQL cancel Yes Yes (SQLCancel registered; QueryerContext + ExecerContext on db2Conn) ✅ Parity

Files

File Description
internal/impl/db2/db2cli/ CGO-free DB2 CLI bindings via purego (constants, types, functions, errors)
internal/impl/db2/driver.go ODBC column value driver (type mapping, binary decoding, BIGINT/BLOB/VARGRAPHIC)
internal/impl/db2/replication/types.go CSN, OpType, ChangeEvent, Version types
internal/impl/db2/replication/stream.go CDC streaming engine (change tables, composite CSN pagination, LEAD/LAG event pairing)
internal/impl/db2/replication/snapshot.go Initial full-table snapshot (parallel, configurable isolation)
internal/impl/db2/replication/incremental.go Incremental snapshot engine (open-window dedup, checkpoint, Logger, Len)
internal/impl/db2/input_db2_cdc.go Redpanda Connect input — snapshot modes, multi-schema, signals, checkpoint KV, envelope, metrics
internal/impl/db2/db2test/db2test.go Shared test helper (DB2 container, TestMain, DSN)
internal/impl/db2/integration_test.go Integration tests: CDC round-trip, multi-schema, incremental snapshot (multi-chunk, concurrent DML, resume-after-restart)
scripts/run-db2-macos-integration-tests.sh macOS native integration test runner via IBM clidriver (no docker-exec)
public/components/db2/package.go Enterprise bundle registration
docs/modules/components/pages/inputs/db2_cdc.adoc Generated reference docs

Test plan

Unit tests (always run, no Docker required):

go test ./internal/impl/db2/... -run '^Test[^I]' -count=1 -timeout 60s

Integration tests — macOS (IBM clidriver, ~8 min first run, ~30s subsequent):

./scripts/run-db2-macos-integration-tests.sh --keep
# Single test:
./scripts/run-db2-macos-integration-tests.sh --keep '-test.run=^TestIntegrationDB2CDCIncrementalSnapshotMultiChunk$'

Integration tests — Linux:

INTEGRATION_TESTS=1 go test -v -run '^TestIntegration' ./internal/impl/db2/ -timeout 40m

Requires Docker and icr.io/db2_community/db2:latest (IBM license accepted via LICENSE=accept). Container runs with --privileged for DB2's IPC/shared-memory subsystem.

Integration test coverage:

  • TestIntegrationDB2CDCSnapshotAndStreaming — full CDC round-trip (insert/update/delete)
  • TestIntegrationDB2CDCMultiSchema — two schemas in one connector instance
  • TestIntegrationDB2CDCInitialSnapshot — keyset-paginated full snapshot
  • TestIntegrationDB2CDCIncrementalSnapshotMultiChunk — 20 rows, chunk=4, verifies no duplicates
  • TestIntegrationDB2CDCIncrementalSnapshotWithConcurrentInsert — new rows outside window pass through
  • TestIntegrationDB2CDCIncrementalSnapshotWithConcurrentUpdate — CDC dedup evicts in-window updates
  • TestIntegrationDB2CDCIncrementalSnapshotWithConcurrentDelete — CDC dedup evicts in-window deletes
  • TestIntegrationDB2CDCIncrementalSnapshotResumeAfterRestart — crash at chunk 2, resume, no duplicates

@CLAassistant
Copy link
Copy Markdown

CLAassistant commented May 8, 2026

CLA assistant check
All committers have signed the CLA.

Comment thread internal/impl/db2/replication/snapshot.go Outdated
Comment thread internal/impl/db2/replication/snapshot.go Outdated
Comment thread go.mod Outdated
Comment thread internal/impl/db2/replication/snapshot.go Outdated
Comment thread internal/impl/db2/input_db2_cdc.go
Comment thread internal/impl/db2/replication/types_test.go
Comment thread internal/impl/db2/input_db2_cdc.go
Comment thread internal/impl/db2/input_db2_cdc.go Outdated
Comment thread internal/plugins/info.csv Outdated
Comment thread internal/impl/db2/driver.go
Comment thread internal/impl/db2/input_db2_cdc.go
Comment thread internal/impl/db2/replication/stream.go
Comment thread internal/impl/db2/integration_test.go Outdated
Comment thread internal/impl/db2/input_db2_cdc.go Outdated
Comment thread internal/impl/db2/input_db2_cdc.go
Comment thread internal/impl/db2/input_db2_cdc_test.go Outdated
Comment thread internal/impl/db2/input_db2_cdc.go Outdated
Comment thread internal/impl/db2/input_db2_cdc.go
Comment thread internal/impl/db2/input_db2_cdc.go
Comment thread internal/impl/db2/input_db2_cdc.go Outdated
Comment thread internal/impl/db2/replication/stream.go Outdated
@redpanda-data redpanda-data deleted a comment from claude Bot May 9, 2026
@redpanda-data redpanda-data deleted a comment from claude Bot May 9, 2026
@redpanda-data redpanda-data deleted a comment from claude Bot May 9, 2026
@redpanda-data redpanda-data deleted a comment from claude Bot May 9, 2026
@redpanda-data redpanda-data deleted a comment from claude Bot May 9, 2026
@redpanda-data redpanda-data deleted a comment from claude Bot May 9, 2026
@redpanda-data redpanda-data deleted a comment from claude Bot May 9, 2026
Comment thread internal/impl/db2/input_db2_cdc.go
emaxerrno and others added 3 commits May 19, 2026 20:47
Pure-Go ODBC bindings for the IBM DB2 CLI shared library using purego.
Registers all required SQL functions (SQLConnect, SQLExecDirect, SQLFetch,
SQLGetData, etc.) at runtime without CGo. Supports both macOS (dylib)
and Linux (so) library paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Defines ChangeEvent, OpType, CSN (commit sequence number), and Version.
CSN is a variable-length big-endian byte sequence (10 bytes on DB2 ≤11.x,
16 bytes on DB2 12.1+). Provides SQL hex literal encoding, comparison,
and null-CSN sentinel for watermark tracking.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Reads all rows from configured tables inside a single read-only
DB2 RR (Repeatable Read) transaction, ensuring cross-table consistency.
Supports configurable batch size, parallelism, isolation level, and
table filter functions. Captures the starting CSN from
ASNCDC.IBMSNAP_REGISTER before the first row so streaming can resume
from the exact snapshot point.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented May 20, 2026

Commits
LGTM

Review
Reviewed the new db2_cdc enterprise input connector across internal/impl/db2/ (input, db2cli/ purego bindings, replication/ snapshot+stream+incremental engines, driver.go) plus tests and registration. The connector is carefully written: RCL license headers throughout, correct info.csv row (enterprise, n, n), proper public wrapper, identifier validation before every SQL interpolation, sync.WaitGroup-tracked goroutine lifecycle, idempotent Close() guarded by closed flag, consistent runtime.KeepAlive calls in the FFI layer, and credential sanitization in driver error messages.

LGTM

Comment thread internal/impl/db2/input_db2_cdc.go Outdated
Comment thread internal/impl/db2/input_db2_cdc.go
Comment thread internal/impl/db2/replication/snapshot.go
Comment thread internal/impl/db2/replication/snapshot.go Outdated
Comment thread internal/impl/db2/replication/snapshot.go Outdated
@josephwoodward
Copy link
Copy Markdown
Contributor

I see the integration tests are currently being skipped with the following reason:

skipping DB2 integration test: shared container unavailable: loading DB2 library: create dir for db2systm: mkdir /database: permission denied

I haven't looked into it yet so I'm not sure if possible or not, but it'd be good to get these running in the CI pipeline.

Comment thread internal/impl/db2/driver.go
@claude
Copy link
Copy Markdown

claude Bot commented May 21, 2026

Commits

  1. Commit f57ed8a — subject line ends with a literal Unicode ellipsis (db2: add CDC streaming engine with change-table polling and event ord…) and the message body begins with …ering. The subject was truncated mid-word and is incomprehensible on its own (e.g. in git log --oneline, in changelogs, in PR / issue back-references). Please rephrase to a shorter complete sentence, e.g. db2: add CDC streaming engine or db2: add CDC streaming with change-table polling, and move the detail to the body.

Review

Large, well-structured enterprise connector with thorough internal validation (identifier validation against SQL injection, sanitized error logging, mutex/atomic discipline, idempotent close path), unit + integration test coverage, and proper registration (public/components/db2/package.go, public/components/all/package.go, internal/plugins/info.csv row with support=enterprise, cloud=n).

LGTM

emaxerrno and others added 3 commits May 20, 2026 23:05
…ering

Polls ASNCDC change tables for rows beyond the current watermark CSN.
Merges events from all registered tables, sorts by (CSN, IBMSNAP_INTENTSEQ)
for total order, and pairs D+I opcode pairs into UPDATE events.
Auto-detects IBMSNAP_COMMITSEQ byte length (10 for DB2 ≤11.x, 16 for
DB2 12.1+). Supports configurable poll batch size, backoff interval,
and table filter. Column metadata is cached per change table and
invalidated on schema changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements the Debezium/Netflix DBLog open-window algorithm for
exactly-once incremental snapshotting alongside a live CDC stream.
Each chunk is bracketed by (lowCSN, highCSN] watermarks; any CDC event
arriving within the window evicts the corresponding snapshot row,
guaranteeing each row is delivered exactly once as op="r" or superseded
by its CDC event. Includes chunk-level checkpoint persistence so a
connector restart resumes from the last fully-emitted primary key.
Supports NULL PK fast-fail, per-chunk progress logging, and a Logger
interface for observability.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements driver.Driver, driver.Conn, driver.Stmt, and driver.Rows
on top of the purego db2cli bindings, making the IBM DB2 CLI accessible
via the standard database/sql interface. Includes DSN credential
sanitization in error messages to prevent password leaks in logs.
Supports context-aware Prepare/Exec/Query paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread internal/impl/db2/input_db2_cdc.go Outdated
Comment thread internal/impl/db2/input_db2_cdc.go Outdated
@claude
Copy link
Copy Markdown

claude Bot commented May 21, 2026

Commits
LGTM

Review
New enterprise db2_cdc input connector for IBM DB2 LUW built on a CGO-free purego DB2 CLI binding. Layering looks clean: db2cli (raw ODBC), driver.go (database/sql adapter), replication/ (CDC engine + snapshot + incremental snapshot), input_db2_cdc.go (connector). License headers, info.csv classification (enterprise, not cloud-safe), bundle registration (public/components/all), and integration test packages entry all look correct.

The connector logic has clearly received careful attention to concurrency/lifecycle (shutdown signaller reset on reconnect, single auxDB for control-path, mutex discipline around d.db / d.streamer, wg.Wait before closing handles), watermark safety (composite (CSN, IntentSeq) pagination, computeSafeCSN handling truncated batches, pending D-row carry-over across polls), and SQL identifier hygiene (uppercase-only validators, QuoteDB2Identifier, defense-in-depth re-validation of catalog/checkpoint-sourced names before SQL embedding).

Two issues raised inline:

  1. Validation rejects tables: [] + table_include_regex: [] even though the tables field documentation says empty means dynamic discovery — users following the docs will hit an unexpected startup error.
  2. The comment above sql.Open("db2-cli", ...) claims the driver has no context-aware cancellation and links to a placeholder issue URL, but QueryContext/ExecContext with SQLCancel are implemented in this same PR.

Neither blocks merge; both are doc/comment fixes.

emaxerrno and others added 4 commits May 21, 2026 09:54
Full-featured IBM DB2 CDC connector built on SQL Replication (ASNCDC).
Supports initial full-table snapshot, live CDC streaming, and incremental
on-demand snapshots via a signal table (compatible with Debezium's
execute-snapshot protocol). Key capabilities:

- Snapshot modes: initial, always, never, no_data
- Configurable snapshot isolation (DB2 RR or CS), batch size, parallelism
- Per-chunk checkpoint persistence for crash-safe incremental snapshot resume
- Signal table: execute-snapshot, stop-snapshot, pause/resume-snapshot
- Table regex include/exclude filters
- Debezium-envelope output (op c/u/d/r, before/after, source metadata)
- Schema-change events with column-metadata cache invalidation
- Heartbeat events for consumer lag detection
- checkpoint_cache support (Redpanda Connect cache resource or DB2 table)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Shared test helpers (db2test package) manage the DB2 Docker container
lifecycle for integration tests, including ASNCDC pre-warm and
per-test table cleanup. TestMain in the integration suite shares a
single container across all tests to avoid 8-minute Docker init
overhead. The macOS runner script builds a native test binary that
uses the local IBM clidriver dylib instead of a Linux container,
enabling fast local iteration on Apple Silicon.

Integration tests cover: snapshot+streaming, multi-chunk incremental
snapshot, concurrent INSERT/UPDATE/DELETE during snapshot, and
resume-after-crash checkpoint recovery.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds db2_cdc to the enterprise component bundle and registers it in
the component info CSV (enterprise, not cloud-safe). Wires into the
integration test packages list.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Generated component documentation covering all configuration fields,
snapshot modes, signal table protocol, checkpoint cache options,
and example configurations.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment on lines +88 to +96
{
name: "empty tables rejected",
configYAML: `
dsn: "DATABASE=SAMPLE;HOSTNAME=db2host;PORT=50000;PROTOCOL=TCPIP;UID=db2inst1;PWD=secret"
schema: "DB2ADMIN"
tables: []
`,
errContains: "either tables or table_include_regex must be specified",
},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These test cases (and "neither tables nor include_regex rejected" at lines 355-362) expect newDB2CDCInput to return an error containing "either tables or table_include_regex must be specified", but newDB2CDCInput in input_db2_cdc.go does not perform that validation — an empty tables list (with no table_include_regex) passes through silently. Both test cases will fail with expected an error, got nil.

This also contradicts the tables field description which explicitly says: "When empty, all CDC-registered tables in the schema are discovered dynamically from ASNCDC.IBMSNAP_REGISTER" — so empty tables is an intentional auto-discovery feature.

Either remove these two test cases (matching the documented behavior) or add the missing validation in newDB2CDCInput and update the field description accordingly.

@claude
Copy link
Copy Markdown

claude Bot commented May 21, 2026

Commits
LGTM

Review
New db2_cdc enterprise input connector built on a CGO-free purego DB2 CLI binding, with layered architecture (db2cli ↔ driver ↔ replication ↔ input) and clean separation between snapshot, streaming, and incremental snapshot engines. Component registration, license headers, bundle import, and info.csv entry are all correctly wired.

  1. Unit tests at input_db2_cdc_test.go#L88-L96 and #L355-L362 expect a validation error that the implementation never produces (empty tables is documented as auto-discovery). See inline comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants