Skip to content

Switch ad-hoc queries to arrow_ipc format#4240

Open
Karakatiza666 wants to merge 1 commit into
mainfrom
webconsole-arrow
Open

Switch ad-hoc queries to arrow_ipc format#4240
Karakatiza666 wants to merge 1 commit into
mainfrom
webconsole-arrow

Conversation

@Karakatiza666
Copy link
Copy Markdown
Contributor

@Karakatiza666 Karakatiza666 commented Jun 27, 2025

Changes:

  • Switch to arrow-ipc. It also carries column types, so they are now mapped to ISO SQL types to show in the header
  • To return SQL ARRAY in ad-hoc responces, Arrow's List is used; for DeltaLake output LargeList is still used

Testing: manual, added unit test for formatting

Part of #4219: Deprecate the JSON format for ad-hoc queries

@Karakatiza666 Karakatiza666 requested a review from gz June 27, 2025 08:49
@Karakatiza666 Karakatiza666 added Web Console Related to the browser based UI javascript Pull requests that update Javascript code adhoc Issue related to ad hoc query processing labels Jun 27, 2025
@Karakatiza666 Karakatiza666 force-pushed the webconsole-arrow branch 2 times, most recently from 56b7606 to 9c0a916 Compare June 27, 2025 09:04
@gz
Copy link
Copy Markdown
Contributor

gz commented Jun 27, 2025

It doesn't look like it answers queries (I tried fraud-detection), revision 9c0a916
https://github.com/user-attachments/assets/50f2fbc1-e59f-4638-ab97-31acd2746cad

@Karakatiza666
Copy link
Copy Markdown
Contributor Author

I think you encountered this one: #4239

Copy link
Copy Markdown
Contributor

@gz gz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm make sure to do
some testing before merging (different pipelines with different types (variant map etc.) and queries)

@gz
Copy link
Copy Markdown
Contributor

gz commented Jul 2, 2025

@Karakatiza666 can we merge this?

@Karakatiza666
Copy link
Copy Markdown
Contributor Author

Haven't had the chance to do a few tests with complex types, hopefully tomorrow

@Karakatiza666 Karakatiza666 force-pushed the webconsole-arrow branch 2 times, most recently from 94e2e00 to 726dedf Compare July 6, 2025 18:02
@Karakatiza666
Copy link
Copy Markdown
Contributor Author

Blocked by #4287
PR functions correctly, but triggers the above bug

Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Web-console behavioral changes without tests. Blocked by #4287 per author — but tests are also needed before this lands.

import BigNumber from 'bignumber.js'
import Dayjs from 'dayjs'

const arrowIpcValueToJS = <T extends DataType<Type, any>>(arrowType: Field<T>, value: any) => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New Arrow IPC value conversion logic covers a lot of type cases (int64, timestamps, structs, maps, lists). This is exactly the kind of logic that deserves unit tests — each type case can go wrong independently. Per Gerd's directive (2026-03-04): behavioral changes require tests. Setup: npm install -D vitest @testing-library/svelte jsdom. The pure conversion functions here are ideal for unit testing without any DOM or component setup.

@mihaibudiu
Copy link
Copy Markdown
Contributor

@abhizer can we close this?

@abhizer
Copy link
Copy Markdown
Contributor

abhizer commented Apr 22, 2026

This is for the UI, cc @Karakatiza666

gz added a commit that referenced this pull request May 13, 2026
 revert)

stream_arrow_query used a synchronous Arrow IPC StreamWriter to an
async mpsc by spawning one tokio task per std::io::Write::write call:

    let handle = TOKIO.spawn(async move { tx.send(bytes).await });
    self.handles.push(handle);

Each StreamWriter::write(&batch) makes ~6 sequential write_all calls
The spawned tasks have no ordering relation; on a multi-thread tokio
runtime they race to send into the receiver, so bytes arrive in
arbitrary order and the resulting Arrow IPC stream gets corrupted.

The fix is to not call sync Write from inside an async future at
all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains
the buffer between batches via std::mem::take(writer.get_mut()), then
yields a single ordered Bytes chunk per batch. Memory cost is bounded
by one record batch; behaviour matches stream_json_query, which has
always used this shape.

ChannelWriter retains its AsyncFileWriter impl for the parquet path
(AsyncArrowWriter awaits each write future before issuing the next,
so ordering there is already safe); the racy std::io::Write impl, the
handles vec, and the cfg(test) reordering shim are all removed.

Refs: #3923 #3792 #4287 #4226 #5814 #4240
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
gz added a commit that referenced this pull request May 13, 2026
 revert)

stream_arrow_query used a synchronous Arrow IPC StreamWriter to an
async mpsc by spawning one tokio task per std::io::Write::write call:

    let handle = TOKIO.spawn(async move { tx.send(bytes).await });
    self.handles.push(handle);

Each StreamWriter::write(&batch) makes ~6 sequential write_all calls
The spawned tasks have no ordering relation; on a multi-thread tokio
runtime they race to send into the receiver, so bytes arrive in
arbitrary order and the resulting Arrow IPC stream gets corrupted.

The fix is to not call sync Write from inside an async future at
all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains
the buffer between batches via std::mem::take(writer.get_mut()), then
yields a single ordered Bytes chunk per batch. Memory cost is bounded
by one record batch; behaviour matches stream_json_query, which has
always used this shape.

ChannelWriter retains its AsyncFileWriter impl for the parquet path
(AsyncArrowWriter awaits each write future before issuing the next,
so ordering there is already safe); the racy std::io::Write impl, the
handles vec, and the cfg(test) reordering shim are all removed.

Refs: #3923 #3792 #4287 #4226 #5814 #4240
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
gz added a commit that referenced this pull request May 13, 2026
 revert)

stream_arrow_query used a synchronous Arrow IPC StreamWriter to an
async mpsc by spawning one tokio task per std::io::Write::write call:

    let handle = TOKIO.spawn(async move { tx.send(bytes).await });
    self.handles.push(handle);

Each StreamWriter::write(&batch) makes ~6 sequential write_all calls
The spawned tasks have no ordering relation; on a multi-thread tokio
runtime they race to send into the receiver, so bytes arrive in
arbitrary order and the resulting Arrow IPC stream gets corrupted.

The fix is to not call sync Write from inside an async future at
all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains
the buffer between batches via std::mem::take(writer.get_mut()), then
yields a single ordered Bytes chunk per batch. Memory cost is bounded
by one record batch; behaviour matches stream_json_query, which has
always used this shape.

ChannelWriter retains its AsyncFileWriter impl for the parquet path
(AsyncArrowWriter awaits each write future before issuing the next,
so ordering there is already safe); the racy std::io::Write impl, the
handles vec, and the cfg(test) reordering shim are all removed.

Refs: #3923 #3792 #4287 #4226 #5814 #4240
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
@gz
Copy link
Copy Markdown
Contributor

gz commented May 13, 2026

@Karakatiza666 we shoudl be able to fix/merge this now

mihaibudiu pushed a commit to mihaibudiu/dbsp that referenced this pull request May 14, 2026
…eldera#4226 feldera#5814 revert)

stream_arrow_query used a synchronous Arrow IPC StreamWriter to an
async mpsc by spawning one tokio task per std::io::Write::write call:

    let handle = TOKIO.spawn(async move { tx.send(bytes).await });
    self.handles.push(handle);

Each StreamWriter::write(&batch) makes ~6 sequential write_all calls
The spawned tasks have no ordering relation; on a multi-thread tokio
runtime they race to send into the receiver, so bytes arrive in
arbitrary order and the resulting Arrow IPC stream gets corrupted.

The fix is to not call sync Write from inside an async future at
all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains
the buffer between batches via std::mem::take(writer.get_mut()), then
yields a single ordered Bytes chunk per batch. Memory cost is bounded
by one record batch; behaviour matches stream_json_query, which has
always used this shape.

ChannelWriter retains its AsyncFileWriter impl for the parquet path
(AsyncArrowWriter awaits each write future before issuing the next,
so ordering there is already safe); the racy std::io::Write impl, the
handles vec, and the cfg(test) reordering shim are all removed.

Refs: feldera#3923 feldera#3792 feldera#4287 feldera#4226 feldera#5814 feldera#4240
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
@Karakatiza666 Karakatiza666 marked this pull request as draft May 15, 2026 17:25
@Karakatiza666 Karakatiza666 force-pushed the webconsole-arrow branch 5 times, most recently from fada2e5 to 8255106 Compare May 19, 2026 00:52
@Karakatiza666 Karakatiza666 requested a review from mihaibudiu May 19, 2026 00:53
@Karakatiza666 Karakatiza666 marked this pull request as ready for review May 19, 2026 00:53
assert_eq!(expected_buffer, buffer_copy);
}

/// `relation_to_arrow_fields` picks the array layout based on the consumer:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically it's not "consumer", but the "delta_lake" boolean field, which happens to be correlated with the consumer.

if delta_lake {
DataType::LargeList(element)
} else {
DataType::List(element)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO here to remove this once that issue is resolved instead of the comment above.

adhocQueries[tenantName][pipelineName].queries[i].result.columns.length === 0 &&
isDataRow(input[0])
) {
// Limit result size behavior - ignore all but first bufferSize rows
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the comment before -.

enumValue(Type.Float16),
enumValue(Type.Float32),
enumValue(Type.Float64),
enumValue(Type.DenseUnion),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are these? will they even work?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These values? All arrow types recognized by this library

const field = batch.schema.fields[j]
const column = batch.getChildAt(j)

if (column && column.isValid(i)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it mean not being valid?
is this what is supposed to happen?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is arrow's semantics, just field not being NULL. Slightly adjusted the implementation

),
list_col: buildVector(new List(new Field('item', new Int32(), true)), [10, 20, 30]),
map_col: buildVector(
new Map_(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We support maps with (almost) arbitrary key types (and value types), so maybe you should add a test for a case with integer keys too.

expect(nullValue).toBeNull()
expect((struct as { toJSON(): unknown }).toJSON()).toEqual({ a: 7, b: 'foo' })
expect((list as { toJSON(): unknown }).toJSON()).toEqual([10, 20, 30])
expect((map as { toJSON(): unknown }).toJSON()).toEqual({ k1: 42 })
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for other maps this kind of test won't work

adhocQueries[tenantName][pipelineName].queries[i].result
.rows()
.push(...rows.slice(0, bufferSize - previousLength))
reclosureKey(adhocQueries[tenantName][pipelineName].queries[i].result, 'rows')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't guess from this function name what it does
(I know it's not added in this PR)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh yes, that's one of a pair of helpers for advanced state reactivity management in Svelte 5

Make ARRAYs serialize as arrow List for ad-hoc queries

Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
@Karakatiza666 Karakatiza666 enabled auto-merge May 19, 2026 12:45
@Karakatiza666 Karakatiza666 added this pull request to the merge queue May 19, 2026
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks May 19, 2026
@Karakatiza666
Copy link
Copy Markdown
Contributor Author

Karakatiza666 commented May 19, 2026

Switching from LargeList to List for ad-hoc queries causes invalidation of all checksums we use in QA. I'll try to add support for LargeList to the upstream JS package first

@Karakatiza666
Copy link
Copy Markdown
Contributor Author

Tracked in apache/arrow-js#438

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

adhoc Issue related to ad hoc query processing javascript Pull requests that update Javascript code Web Console Related to the browser based UI

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants