ARROW-17688: [C++][Java][FlightRPC] Substrait, transaction, cancellation for Flight SQL#13492
Conversation
|
Thanks for opening a pull request! If this is not a minor PR. Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW Opening JIRAs ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename pull request title in the following format? or See also: |
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
what is the difference between transcations and save points? Are there docs someplace?
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
I haven't been keeping up but is serialized plan sufficient, or are yaml plugins necessary?
There was a problem hiding this comment.
The root plan contains extension definitions, and I believe they're intended to be self-contained as a result, but I'll seek some clarification: https://github.com/substrait-io/substrait/blob/1080f06298d8e50abcd6acfaa6c425326a7e0579/proto/substrait/plan.proto#L24-L45
There was a problem hiding this comment.
If we're expecting the Protobuf serialized plan here, would it make more sense to just import the substrait proto definition and reference the object directly rather than having to serialize the plan and then stick the bytes inside another serialized protobuf?
There was a problem hiding this comment.
I'd like to keep code dependencies on other Protobuf messages out because Windows/Protobuf has issues with those when they're in different DLLs (as they are with Flight/Flight SQL, and as they would be here). As seen below with the CancelQuery message I already ran into linking issues and I think they're insurmountable unless protoc itself is modified.
There was a problem hiding this comment.
On the wire, the actual encoding is the same either way.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
should server timeout be something that the client has the option of specifying? Can it be introspected?
There was a problem hiding this comment.
I chose to follow prepared statements in this regard. I'll add a SqlInfo value to retrieve the timeout.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
is this misplaced should it be above name?
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
is transaction_id optional here for a new transaction? it seems like in most cases this should be server assigned?
There was a problem hiding this comment.
Hopefully it's clearer now that it's split. The ID is always server assigned. To begin a savepoint, you must provide the ID of the transaction it falls under.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
what are the semantics relative to savepoint here?
There was a problem hiding this comment.
I clarified the docstring, though I'm not sure what you're referring to here.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
transaction + savepoint?
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
from an RPC optimization standpoint, it seems that maybe we want a way of specifying this should be considered the first action inside of a transaction?
There was a problem hiding this comment.
It seems like maybe there should be something like:
message TransactionDetails {
oneof transaction {
ActionBeginTranscation begin_transaction = 1;
bytes existing_transaction_id = 2;
}
}
that can be included.
There was a problem hiding this comment.
The intent was to mimic prepared statements: the server assigns the transaction ID and gives it to the client. So here there's not a great way to return the transaction ID to the client. It would be good if FlightInfo could gain an app_metadata field for such things (since Tickets are not meant to be client-introspected).
There was a problem hiding this comment.
I suppose we could use schema metadata for that, though.
There was a problem hiding this comment.
it seems app_metadata would probably be generally useful. It seems like a hack to force the info onto the schema.
There was a problem hiding this comment.
I'll propose that separately when I get a chance (and I'll see if I can set aside some time to help with the small-result optimization stuff)
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
does there need to be a response associated with this?
There was a problem hiding this comment.
Could state. CANCELLED, NOT_CANCELLABLE, ALREADY_DONE, CANCELLING. Not sure this is useful, so feel free to ignore.
There was a problem hiding this comment.
If we declare the command idempotent, that could be useful. I suppose the server could just synchronously block until the query is cancelled but then it'd be unclear how to recover from a transient failure.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
should this be some sort of ID instead?
There was a problem hiding this comment.
What would it be? Since most of the data inside FlightInfo isn't meant to be introspected, and we haven't specified what the contents of Ticket should be, there's no consistent 'query ID' concept right now. So I chose FlightInfo since presumably that has all information the server needs to identify the query.
There was a problem hiding this comment.
would this be another place where app_metadata would be useful? Otherwise, it seems like the only implementation path for the server would be to introspect one of the tickets? (it would seem lfight descriptors might very commonly be non-unique). This might be fine, I just want to confirm my understanding.
There was a problem hiding this comment.
Yes - if we had FlightInfo.app_metadata, we could return an explicit cancellation token. But given the server is generating the tickets, it should be OK for the server to also introspect them, so long as the client doesn't.
There was a problem hiding this comment.
yeah, I think introspecting the tickets for now should be fine.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
are there semantics that need to be considered relative to transcations?
There was a problem hiding this comment.
I'll update it to specify that the transaction is not rolled back/this is only to terminate reading of the result set (CC @jduo is that in line with what you were thinking?)
There was a problem hiding this comment.
This is what I was thinking in terms of read, but also think it should terminate a running write (same as in ODBC/JDBC).
There was a problem hiding this comment.
Thanks - updated. The semantics around cancelling a write are a little unclear, I suppose in autocommit mode it gets committed, and otherwise the client can commit or rollback.
120d649 to
8326857
Compare
|
Generally seems OK to me. |
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
So "Flight SQL" is generic enough that it might support things other than actual SQL?
There was a problem hiding this comment.
Yes, the naming is unfortunate, but there's no real reason why we need to be tied to SQL specifically, and no reason why many of the existing concepts can't map to Substrait.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Er... which unit is that? Seconds? Can we make it a real/float instead, if protobuf allows that?
There was a problem hiding this comment.
I made it milliseconds, unless we prefer floating point seconds?
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Just for my understanding, savepoints are for two-phase commits, right?
There was a problem hiding this comment.
Hmm, not in the distributed systems sense, a savepoint is really just a nested transaction. I included it for parity with JDBC but it's not essential.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Hmm... should there be two separate commands for starting a transaction and a savepoint, so that they can take different parameters?
There was a problem hiding this comment.
Split into separate commands (though the response messasge and EndTransaction are still shared)
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Does the "default" happen if transaction_id is left unset?
There was a problem hiding this comment.
If so, perhaps replace "by default" with "if unset, " for clarity?
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Should we specify that the timeout is refreshed when the handle is "used"?
There was a problem hiding this comment.
I think so, we should probably provide any semantics necessary for when a timeout is refreshed.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
These transaction_id fields should be explicitly marked optional
There was a problem hiding this comment.
technically, by definition in proto3 all fields are optional and it's unnecessary to explicitly mark any as such.
There was a problem hiding this comment.
Flight SQL uses explicit optional elsewhere though (just to make it clear what is expected), so Flight SQL actually has a higher protoc minimum version as a result
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Hopefully it's clearer now that it's split. The ID is always server assigned. To begin a savepoint, you must provide the ID of the transaction it falls under.
fcaadae to
eae9ee1
Compare
|
I've been pushing various tweaks to the spec while implementing it in C++. One thing I will need to change: transaction IDs should be supplied when creating a prepared statement, not when executing them (since generally existing APIs associate the statement with a particular connection). |
|
There's now implementations in C++ and Java. TODOs:
|
bb77bbd to
78453af
Compare
|
Hmm, Windows builds fail because of a similar issue to #13434 - Protobuf and DLLs don't interact well, since you can't get The easiest thing might be to just punt on CancelQuery for now. Or else, it would have to be message ActionCancelQueryRequest {
// XXX(ARROW-16902): A serialized FlightInfo
bytes info = 1;
}and then rely on |
|
@lidavidm Since this is a draft, are you looking for a detailed review or more for general opinions? |
|
The Protobuf definitions deserve more scrutiny; for the code, I'm just looking for general opinions. |
83241ca to
6431071
Compare
|
No worries, just want to make sure! |
zeroshade
left a comment
There was a problem hiding this comment.
Added some comments for clarification
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
I think so, we should probably provide any semantics necessary for when a timeout is refreshed.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Is there a functional difference here? Or are we just including an "Unknown" so that it will be the default? In most cases developers are likely to treat "unknown" the same as "none" when it comes to transaction support: (ie. don't try calling them)
There was a problem hiding this comment.
Not really - I guess in that case let's just fold them together since there's no point. (I suppose normally in protobuf you'd distinguish the two, but that doesn't apply here.)
There was a problem hiding this comment.
yea, I agree that in protobuf you'd distinguish the two in many cases, but my typical litmus test is if a consumer of the protobuf would treat the "unknown" case differently than the "none" case. And if there isn't any functional difference, it's not necessary to distinguish them.
There was a problem hiding this comment.
Yup - I ended up folding them together
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
If we're expecting the Protobuf serialized plan here, would it make more sense to just import the substrait proto definition and reference the object directly rather than having to serialize the plan and then stick the bytes inside another serialized protobuf?
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Should this be a serialized flightinfo? or should it actually be a Ticket?
There was a problem hiding this comment.
It should be a FlightInfo because I assume the server needs the information of all endpoints in order to fully cancel a query. Also in the event that we do update FlightInfo with an application metadata field, it would automatically get passed back to the server
There was a problem hiding this comment.
Gotcha, that makes sense then.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
If the server returns CANCEL_RESULT_CANCELLING is a client supposed to poll with subsequent cancel requests until it receives CANCELLED? Or is there a different way to determine when the cancellation is completed?
There was a problem hiding this comment.
That was the intent. I'll document the variants.
There was a problem hiding this comment.
Updated - is this clearer?
265f37a to
0ac1408
Compare
|
Updated again, since @jvanstraten pointed out that the server may want to know the client's Substrait release version since otherwise it may be unclear how to interpret the plan (even if it parses properly). Also, adds some validation for SqlInfo values to the integration test + adds SqlInfo values so the server can report Substrait version support. |
There was a problem hiding this comment.
Hmm, I think I needed it originally, but maybe now that we aren't referencing Protobuf files from each other it's not needed anymore - removed
There was a problem hiding this comment.
Should perhaps use a URI placeholder and produce the proper URI programmatically?
| "uri_file": "file://FILENAME_PLACEHOLDER", | |
| "uri_file": "URI_PLACEHOLDER", |
There was a problem hiding this comment.
Perhaps expose a server_->connect_uri() instead?
There was a problem hiding this comment.
I was trying not to add new methods (MakeAceroServer just returns the base FlightSqlServerBase) and a server doesn't necessarily know what its 'public' address is (even here, we bind to 0.0.0.0 but assume that it's accessible on 'localhost')
There was a problem hiding this comment.
and a server doesn't necessarily know what its 'public' address is (even here, we bind to 0.0.0.0 but assume that it's accessible on 'localhost')
Hmm... if you mean the server might reside behind a NAT, sure, but at least from a local point of view it should know on which addresses (plural, ideally :-)) it is reachable (perhaps only localhost, or perhaps one interface, or perhaps all/many interfaces...).
There was a problem hiding this comment.
Updated to use server_->location() and bind to localhost instead of 0.0.0.0 (should be OK/preferable in test code anyways)
There was a problem hiding this comment.
Can you perhaps make parameters more explicit?
| client_->GetSqlInfo({}, { | |
| client_->GetSqlInfo(/*abc=*/{}, /*def=*/{ |
There was a problem hiding this comment.
Same here (at least for the first one :-)).
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Do you plan to remove the "experimental" markers like the above?
There was a problem hiding this comment.
I think not in this RFC, but we should do it in the near future (perhaps after the JDBC driver has seen some use)
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Is there a particular rationale for adding this option in some message definitions but not all of them?
There was a problem hiding this comment.
No, I just missed it :) I'll fix that.
lidavidm
left a comment
There was a problem hiding this comment.
Thanks for the comments, I'll update this soon. (I guess these are mostly minor changes and we shouldn't need to restart the vote?)
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
As I understand, the backwards compatibility story is not yet worked out, so in the future, this may be less useful, but for now, it's the only way to reliably determine whether a plan can really be executed.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Maybe @jduo can chime in; this timeout existed implicitly before, but something here is necessary because unlike JDBC/ODBC which can tie these to the lifetime of an actual connection, Flight SQL makes fewer assumptions about state being tied to the gRPC connection (which are more disposable).
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
I think not in this RFC, but we should do it in the near future (perhaps after the JDBC driver has seen some use)
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
No, I just missed it :) I'll fix that.
format/FlightSql.proto
Outdated
There was a problem hiding this comment.
Hmm, I think that makes sense, though current clients are going to assume SQL support.
format/FlightSql.proto
Outdated
Yes, definitely. Also, sorry, the review contains C++ comments that I did some days/weeks ago but had forgotten to submit apparently :-S |
58f748e to
5823b75
Compare
|
Rebased + updated (minus the timeout since we'd have to change the result set schema for GetSqlInfo to add floating point) |
a8ff5fb to
1a8af54
Compare
pitrou
left a comment
There was a problem hiding this comment.
I took a quick look at the C++ parts again.
There was a problem hiding this comment.
Use checked_cast here and below?
There was a problem hiding this comment.
Would be nice to add docstrings/comments explaining each non-trivial helper class here.
There was a problem hiding this comment.
Nit, but the logic to create the FlightInfo from an encoded substrait plan could perhaps be factored out in a dedicated helper method? (since GetFlightInfoPreparedStatement has the same logic inside)
There was a problem hiding this comment.
Nit
| std::string db_uri_; | |
| const std::string db_uri_; |
There was a problem hiding this comment.
I see that accesses to prepared_statements_ are never mutex-protected, is it right?
There was a problem hiding this comment.
Not sure how efficient you want this to be, but you might release the lock around these lines (and call open_transactions_.erase before?).
|
Updated, thanks Antoine! |
274a3d8 to
6c48fdd
Compare
|
CI failures here are addressed/fixed elsewhere |
|
Benchmark runs are scheduled for baseline = d571e93 and contender = 3ce4014. 3ce4014 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
|
['Python', 'R'] benchmarks have high level of regressions. |
…ion for Flight SQL (apache#13492) "[VOTE] Substrait for Flight SQL" https://lists.apache.org/thread/3k3np6314dwb0n7n1hrfwony5fcy7kzl Authored-by: David Li <li.davidm96@gmail.com> Signed-off-by: David Li <li.davidm96@gmail.com>
This PR bumps Apache Arrow version from 9.0.0 to 10.0.0. Main changes related to PyAmber: ## Java/Scala side: - JDBC Driver for Arrow Flight SQL ([13800](apache/arrow#13800)) - Initial implementation of immutable Table API ([14316](apache/arrow#14316)) - Substrait, transaction, cancellation for Flight SQL ([13492](apache/arrow#13492)) - Read Arrow IPC, CSV, and ORC files by NativeDatasetFactory ([13811](apache/arrow#13811), [13973](apache/arrow#13973), [14182](apache/arrow#14182)) - Add utility to bind Arrow data to JDBC parameters ([13589](apache/arrow#13589)) ## Python side: - The batch_readahead and fragment_readahead arguments for scanning Datasets are exposed in Python ([ARROW-17299](https://issues.apache.org/jira/browse/ARROW-17299)). - ExtensionArrays can now be created from a storage array through the pa.array(..) constructor ([ARROW-17834](https://issues.apache.org/jira/browse/ARROW-17834)). - Converting ListArrays containing ExtensionArray values to numpy or pandas works by falling back to the storage array ([ARROW-17813](https://issues.apache.org/jira/browse/ARROW-17813)). - Casting Tables to a new schema now honors the nullability flag in the target schema ([ARROW-16651](https://issues.apache.org/jira/browse/ARROW-16651)).
…ion for Flight SQL (apache#13492) "[VOTE] Substrait for Flight SQL" https://lists.apache.org/thread/3k3np6314dwb0n7n1hrfwony5fcy7kzl Authored-by: David Li <li.davidm96@gmail.com> Signed-off-by: David Li <li.davidm96@gmail.com>
"[VOTE] Substrait for Flight SQL"
https://lists.apache.org/thread/3k3np6314dwb0n7n1hrfwony5fcy7kzl