Skip to content

[connectors] configure storage for datafusion instances used by connectors#6170

Open
swanandx wants to merge 1 commit into
mainfrom
issue6153
Open

[connectors] configure storage for datafusion instances used by connectors#6170
swanandx wants to merge 1 commit into
mainfrom
issue6153

Conversation

@swanandx
Copy link
Copy Markdown
Member

@swanandx swanandx commented May 1, 2026

see commit

Fix #6153

Describe Manual Test Plan

Created big ( ~15M rows ) CDC Delta table and verified that delta connector cdc mode spillds to disk.

on main branch it uses over 8GB memory where as with this fix we use only about 2GB ( / around what we configure )

Verified there are datafusion session files on disk, if we configure pipeline with low memory we get:

Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes caused by Resources exhausted: Failed to allocate additional 64.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 5.7 MB remain available for the total pool

though this was same behavior prev ig, but now we log warning if allocated memory is too low like:

=WARN feldera_adapterlib::utils::datafusion:  DataFusion memory pool is 64 MB; sort-heavy ad-hoc queries (ORDER BY, EXCEPT, hash joins) need at least 256 MB (4 workers x 64 MB reservation per worker). Such queries may fail at first allocation with 'Resources exhausted'. Increase 'datafusion_memory_mb' or reduce 'workers'.

pipeline sql:

CREATE TABLE seeds (
    id BIGINT NOT NULL
) WITH (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "datagen",
            "config": {
                "plan": [
                    {
                        "limit": 5000000,
                        "rate": 10000,
                        "fields": {
                            "id": {
                                "strategy": "increment"
                            }
                        }
                    }
                ]
            }
        }
    }]'
);

run select * from seeds order by id;.

for delta cdc:

CREATE TABLE big_cdc (
    id BIGINT NOT NULL,
    payload VARCHAR,
    "__feldera_op" VARCHAR,
    "__feldera_ts" BIGINT
) WITH (
    'connectors' = '[{
        "transport": {
            "name": "delta_table_input",
            "config": {
                "uri": "/tmp/pr6170-validation/cdc_table",
                "mode": "cdc",
                "version": 0,
                "cdc_delete_filter": "__feldera_op = ''d''",
                "cdc_order_by": "__feldera_ts",
                "max_concurrent_readers": 1
            }
        }
    }]'
);

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

not a breaking change

Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two hard blocks (manual test plan + missing tests for the actual behavior change) and a couple of design points worth a look. Findings inline.

On the PR description: ### Describe Manual Test Plan literally says TODO: verify if we're actually spilling to disk. The whole point of this PR is to make Delta/Iceberg connectors spill so they stop OOMing on #6153 — please actually verify spilling happens (e.g. run a Delta CDC scan with a tight memory_mb_max against data large enough to force a spill, point at a storage path, observe files appearing under <storage>/delta-tmp-*/) and document those steps. Without that, neither you nor a reviewer can claim this fixes the bug.

Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapterlib/src/utils/datafusion.rs
Comment thread crates/adapters/src/integrated/delta_table/input.rs Outdated
Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Architectural fix is in good shape — single shared RuntimeEnv, no more unwrap, sanitize_path_component retired, scratch dir centralized. One soft follow-up inline.

Comment thread crates/adapterlib/src/utils/datafusion.rs
@swanandx swanandx requested a review from ryzhyk May 2, 2026 07:54
@ryzhyk ryzhyk requested a review from gz May 2, 2026 19:05
Copy link
Copy Markdown
Contributor

@ryzhyk ryzhyk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice. I didn't know about shared runtime environments.
I requested @gz 's review, since he has the most experience with datafusion config.

Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
Comment thread crates/adapters/src/integrated/delta_table/input.rs Outdated
Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM — only change since my prior approval (a93909b) is the regenerated openapi.json. Trivial.

@swanandx swanandx force-pushed the issue6153 branch 2 times, most recently from caf694c to 7cd8572 Compare May 20, 2026 10:44
@swanandx
Copy link
Copy Markdown
Member Author

Seems to work as expected, needs feedback / input on:


1. Delta CDC target_partitions:

if max_concurrent_readers isn't set, Delta input connector falls bach to DEFAULT_MAX_CONCURRENT_READERS=6.
In that case we would need atleaset 6*64MB ( ~385MB ) for datafusion for it to work. Maybe we can:

  • cap target_partition based on available memory in poll ( memory / 64 + some buffer )
  • reduce SORT_SPILL_RESERVATION_BYTES from 64MB to lower value
  • increase default floor to allocated around that much

if user specifies max_concurrent_readers, then it's upto them to increase datafusion memory as well if we don't cap.


2. Logs spam:

When reading from spill, logs get spammed with:

WARN datafusion_physical_plan::spill: Record batch memory usage (3485712 bytes) exceeds the expected limit (3481600 bytes)
by more than the allowed tolerance (4096 bytes).
This likely indicates a bug in memory accounting during spilling.
Please report this issue in https://github.com/apache/datafusion/issues/17340. log.target="datafusion_physical_plan::spill" log.module_path="datafusion_physical_plan::spill" log.file="/Users/swanx/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/datafusion-physical-plan-52.5.0/src/spill/mod.rs" log.line=157

The issue is open at datafusion, one way would be to set RUST_LOG="datafusion_physical_plan::spill=error" but then we might lose some other warning?

@mythical-fred
Copy link
Copy Markdown

On the two open questions:

1. target_partitions vs datafusion budget.

I'd avoid silently expanding partitions past what the budget can sustain — the existing max_concurrent_readers = 6 default already burned us once (see the silent-regression note on #6007). Two options I'd consider:

  • Cap from below: effective_target_partitions = min(io_workers.unwrap_or(workers), max(1, datafusion_budget_mb / 80)) (80 ≈ 64 MB SORT_SPILL_RESERVATION_BYTES + ~25% headroom). Predictable: budget bounds parallelism, no surprise OOMs.
  • Cap from above (floor bump): lift the floor to ~512 MB so 6 partitions × 64 MB + slack always fits. Cheap but wasteful on tiny pipelines, and still breaks the moment a user bumps max_concurrent_readers to 12.

The first is the principled fix. Floor-bump-only papers over the symptom.

If you cap and the resulting partition count differs from io_workers, please log::info! it once with the budget arithmetic so an operator can see "I asked for 12, you gave me 4 because budget was 320 MB". Non-actionable silent caps are the same anti-pattern we've been flagging elsewhere.

Don't drop SORT_SPILL_RESERVATION_BYTES from 64 MB — that's datafusion's contract for sort-merge and lowering it tends to push the OOM into the sort operator itself rather than fixing anything.

2. Spill-log spam.

Target-scoped filter is the right call — datafusion_physical_plan::spill=error only mutes that one module. Either:

I'd take option (a) — single line of code, dies when upstream fixes the accounting. Global RUST_LOG knob is the wrong place; it leaves the next user to discover the spam.

Both are nice-to-have follow-ups, not blockers on this PR — my approval still stands.

@swanandx
Copy link
Copy Markdown
Member Author

re 2: spill-log spam

level was changed from warn! to -> debug! in apache/datafusion#19885 [ and was released in v53.0.0 ]

and to use datafusion 53, we would need to update iceberg & delta-rs ( which we need to do anyways for fixing dependabot security alerts, but waiting for iceberg next release )

@swanandx
Copy link
Copy Markdown
Member Author

re 1: changed it to use worker same as ad-hoc query, but with improved error message:

[df] 2026-05-21T09:33:29.043303Z  WARN dbsp_adapters::integrated::delta_table::input:  delta_table big_cdc.unnamed-0: error retrieving CDC transaction with 11 adds ["delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00000-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00001-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00002-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00003-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00004-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00005-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00006-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00007-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00010-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00008-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet", "delta-rs://file---private-tmp-pr6170-validation-cdc_table-/part-00009-10fefcd4-0970-420c-8eaa-51003c4cd239-c000.snappy.parquet"] and 0 removes [] after 4 attempts (current table version: 0): error retrieving batch 0: Context("Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes", ResourcesExhausted("Failed to allocate additional 61.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 11.4 MB remain available for the total pool"))
DataFusion memory pool is exhausted. Consider increasing 'datafusion_memory_mb' in the pipeline runtime config. If raising the budget is not an option, reduce 'io_workers' / 'workers' or set the env var 'DELTA_DF_TARGET_PARTITIONS=1' to lower per-scan parallelism.
; retrying in 4.187s

…ctors

Delta Lake and Iceberg connectors built bare SessionContexts with no
memory pool or spill path, so large scans (e.g. ORDER BY in Delta CDC)
could OOM the pipeline.

Build one Arc<RuntimeEnv> per pipeline with a FairSpillPool and a
spill directory at {storage.path}/datafusion-tmp/, shared by the ad-hoc
engine and every DataFusion-using connector. Add
RuntimeConfig.datafusion_memory_mb (default: 5% of the effective
memory budget, capped at 2 GB) and subtract it from the
DBSP circuit's RSS limit so the two no longer double-book RAM.

Rename ADHOC_TEMP_DIR -> DATAFUSION_TEMP_DIR ("datafusion-tmp") so
checkpointer::gc_startup keeps the new directory. Stale adhoc-tmp/
from prior releases is removed by the existing GC sweep.

Expose create_runtime_env / create_session_context[_with] in
feldera_adapterlib::utils::datafusion; the _with variant preserves
Delta's schema_force_view_types override.

Emit a startup `warn!` when the pool can't satisfy the ad-hoc sort
reservation (`workers * 64 MiB`), so an under-sized pool no longer
fails ORDER BY silently.

Fix #6153

Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-review on 6ca60eedb7 — APPROVE stands. Solid pass on both open questions:

1. target_partitions vs budget. You went with neither cap-from-below nor floor-bump, and instead made the failure mode loud and actionable. That's a reasonable call:

  • warn_if_pool_too_small_for_adhoc_sort fires at startup when pool_mb ≤ workers × 64 MB, with the exact arithmetic in the message.
  • format_datafusion_error appends the 'datafusion_memory_mb' / 'io_workers' / DELTA_DF_TARGET_PARTITIONS=1 hint when the root cause is ResourcesExhausted, walking past Context(...) wrappers via find_root so it survives nested errors. The log you pasted reads exactly the way a user-actionable error should.
  • min_pool_mb_for_adhoc_sort_matches_reservation_times_workers pins the threshold to SORT_SPILL_RESERVATION_BYTES, so the warning stays correct if the constant ever moves.

What I'd still gently push for as a follow-up: a real cap, not just a warning. A user who blows past the threshold sees the WARN at startup, ignores it (or it's drowned in pod logs), then gets a per-query failure later. Capping target_partitions = min(io_workers ∨ workers, max(1, pool_mb / 64)) would make the failure impossible instead of merely actionable. Tracking issue or follow-up PR is fine.

2. Spill-log spam. Agreed — apache/datafusion#19885 already demoted to debug! in v53.0.0 and the iceberg / delta-rs bump is the right cleanup path. Nothing to do here.

Small things, all non-blocking:

  • clean_stale_scratch_entries is per-process best-effort. Two pipelines sharing the same storage path would race here (and would already have other problems — just worth a doc note that the scratch dir is single-tenant).
  • format_datafusion_error is delta-only; iceberg input.rs will produce the same error class on the same memory pool but won't get the hint. Cheap to share via feldera_adapterlib::utils::datafusion.
  • create_session_context_with is a nice generalization, but the only remaining caller-customised setting is schema_force_view_types = false for serde_arrow. A // TODO: remove when serde_arrow supports Utf8View next to it would prevent it from becoming load-bearing.

Thanks for the careful turnaround.

/// the pipeline's effective memory budget, which would leave no memory
/// for the DBSP circuit.
DatafusionMemoryExceedsBudget {
datafusion_memory_mb: u64,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than throwing an error, you should just do min(datafusion_memory_mb, max_rss_mb) and log a warning?

use tracing::warn;

/// In-memory sort threshold; above this, sorts spill to disk. 64 MB.
const SORT_IN_PLACE_THRESHOLD_BYTES: usize = 64 * 1_000_000;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason we can't we do MiB here

I get it k8s decided to be stupid and use MB, but computers and os allocators ultimately deal with pages and ranges that are in powers of two (4KiB, 64KiB, 2MiB, for 1 GiB) and a good way is to just think in base 2 for this if you want things that naturally fit in a range the OS or CPU deals with anyways

/// typically a handful of bytes over a 4 KB tolerance -- upstream
/// accounting drift, tracked at
/// <https://github.com/apache/datafusion/issues/17340> Not a query failure
const SORT_SPILL_RESERVATION_BYTES: usize = 64 * 1_000_000;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same.. and you can write it cooler: 1<<26

/// from the pool *before* sorting any rows. Both the pool and the
/// reservation are decimal MB, so this is straight multiplication.
fn min_pool_mb_for_adhoc_sort(workers: u64) -> u64 {
let reservation_mb = (SORT_SPILL_RESERVATION_BYTES as u64) / 1_000_000;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another nice thing about using powers of two is that divisions are very quick for computers.

/// process is gone by the time we get here, so anything still in the dir is
/// orphaned. Spill files are per-query and never need to survive a restart.
/// Errors only logged: a stuck file should not block startup.
fn clean_stale_scratch_entries(scratch_dir: &Path) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to have a safe guard here such that Path can't be / (and/or has to have osme well formed pattern like startswith(datafusion) and rejected otherwise

/// Upper bound on the default DataFusion pool size, in MB.
///
/// Spill-to-disk handles overflow; reserving more starves the circuit.
pub const DEFAULT_DATAFUSION_MEMORY_MB_CEILING: u64 = 2048;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey look, a power of two

pub const DEFAULT_DATAFUSION_MEMORY_MB_CEILING: u64 = 2048;

/// Default DataFusion pool sizing fraction, as the divisor `effective / N`.
pub const DEFAULT_DATAFUSION_MEMORY_FRACTION_DIVISOR: u64 = 20;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is 20

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Configure storage for datafusion instances used by connectors.

4 participants