Skip to content

feat(metrics): add SessionContext.memoryUsage and runtimeStats#85

Open
LantaoJin wants to merge 1 commit into
apache:mainfrom
LantaoJin:feat/query-memory-runtime-metrics
Open

feat(metrics): add SessionContext.memoryUsage and runtimeStats#85
LantaoJin wants to merge 1 commit into
apache:mainfrom
LantaoJin:feat/query-memory-runtime-metrics

Conversation

@LantaoJin
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

Multi-tenant DataFusion deployments need two operational signals that the Java binding currently does not expose:

  1. Per-session memory. SessionContextBuilder.memoryLimit(...) (PR feat: configure SessionContext and RuntimeEnv via builder #28) caps the global pool, but if a tenant blows past their fair-share allocation there is no way to attribute the bytes back to a session. Without per-session attribution, fair-share scheduling, abuse detection, and OOM root-causing all fall back to runtime restart.

  2. Tokio runtime stats. The JNI library drives a single shared multi-threaded Tokio runtime in lib.rs. Embedders that surface node-level health -- e.g. an OpenSearch _nodes/stats endpoint -- need worker count, busy time, queue depth, etc. Today they have to hand-roll a parallel native bridge.

Both share an FFI snapshot pattern: read a small struct of counters across the boundary on demand. They are bundled here so the design conversation only happens once.

What changes are included in this PR?

Two new accessors on SessionContext, two new immutable POJOs:

public final class MemoryUsage { long currentBytes(); long peakBytes(); }
public final class RuntimeStats {
  int  numWorkers();
  long liveTasksCount(), globalQueueDepth(),
       elapsedNanos(), totalBusyNanos(),
       totalParkCount(), totalPollsCount(), totalNoopCount(),
       totalStealCount(), totalLocalScheduleCount(), totalOverflowCount();
}

ctx.memoryUsage();   // always-on; thread-safe; pollable while queries run
ctx.runtimeStats();  // requires `runtime-metrics` Cargo feature

Per-session memory tracking

native/src/memory.rs introduces TrackingMemoryPool, a thin wrapper around any Arc<dyn MemoryPool> that intercepts grow/try_grow/shrink to maintain two AtomicU64 counters: total bytes currently held and the peak observed since session creation. Pool semantics (limits, eviction, spilling) are unchanged because try_grow still defers to the inner pool.

The wrapper is layered on automatically by both createSessionContext and createSessionContextWithOptions -- callers don't opt in. If SessionContextBuilder.memoryLimit(...) configured a GreedyMemoryPool or TrackConsumersPool, the tracker wraps that. If it didn't, the tracker wraps DataFusion's default UnboundedMemoryPool.

Java callers can't downcast Arc<dyn MemoryPool> back to the concrete tracker type (the trait does not require Any), so a process-wide Mutex<HashMap<jlong, Arc<TrackingMemoryPool>>> keyed by the JNI handle gives the snapshot path a way to find the right tracker. Inserted at session create, drained at session close; no extra failure modes.

Per-session, not per-DataFrame. A cross-engine survey (pandas / Polars / Spark / DuckDB / DataFusion-Rust + Python) confirmed that no engine ships per-DataFrame in-flight memory accounting. What pandas/Polars expose as memory_usage / estimated_size is data-at-rest sizing of materialised columns -- a different feature. Multi-tenant attribution in DataFusion is conventionally one session per tenant, which matches the OpenSearch prior art (QueryMemoryPool keyed off context_id). Per-DataFrame attribution would need a side-channel registry hooked into operator-time consumer creation; not blocked by this PR, can land later if requested.

Tokio runtime metrics

native/src/runtime_metrics.rs is gated behind a default-off runtime-metrics Cargo feature because tokio-metrics requires --cfg tokio_unstable at build time. tokio_metrics::RuntimeMonitor::intervals() is a delta iterator -- each next() returns metrics covering the period since the previous call -- so the module owns a single process-wide RuntimeAccumulator that maintains running totals for documented-monotonic fields. Snapshot (point-in-time) fields (workers_count, live_tasks_count, global_queue_depth) pass through without accumulation.

[features]
runtime-metrics = ["dep:tokio-metrics"]

[dependencies]
tokio-metrics = { version = "0.5", optional = true }

Build matrix:

invocation runtime-metrics build prereqs
cargo build (default) off (stub handler) none
RUSTFLAGS="--cfg tokio_unstable" cargo build --features runtime-metrics on --cfg tokio_unstable

The Java surface is unchanged either way -- SessionContext.runtimeStats() is always present; calls just throw a clear "datafusion-jni was built without the runtime-metrics Cargo feature; rebuild the native crate with RUSTFLAGS=\"--cfg tokio_unstable\" cargo build --features runtime-metrics" error from the JVM if the feature was compiled off. SessionContextRuntimeStatsTest detects this case and skips itself via JUnit's Assumptions.assumeFalse(...), so make test stays green either way.

A new make native-runtime-metrics target makes the opt-in build a one-liner.

This is intentionally similar to PR #75's substrait feature handling: a heavy / build-prereq-bearing dependency stays out of the default build, the Java surface is unchanged, and a feature-off compile substitutes a stub handler that throws clearly.

Are these changes tested?

Yes -- 9 new tests across SessionContextMemoryUsageTest and SessionContextRuntimeStatsTest.

Are there any user-facing changes?

Yes -- purely additive. New public API:

  • org.apache.datafusion.MemoryUsage (immutable value class)
  • org.apache.datafusion.RuntimeStats (immutable value class)
  • SessionContext.memoryUsage() -> MemoryUsage
  • SessionContext.runtimeStats() -> RuntimeStats

No API removals, no deprecations, no behavior change for existing callers. The default cargo build does not pull in tokio-metrics and adds no new build prerequisites. SessionContext.memoryUsage() is always available; runtimeStats() is present but throws "feature not enabled" at runtime unless rebuilt with the feature.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(metrics): per-query memory accounting and Tokio runtime metrics

1 participant