From 37dce6ec69a4cf3811bc9d7c449d116c786c8ed8 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Tue, 24 Mar 2026 18:38:00 +0000 Subject: [PATCH 1/5] Add new network route via cursor.execute() --- .../adbc_driver_manager/dbapi.py | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py index d3cb157503..ea65ffa751 100644 --- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py +++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py @@ -40,6 +40,7 @@ import abc import datetime +import logging import os import pathlib import threading @@ -72,6 +73,8 @@ # ---------------------------------------------------------- # Globals +_logger = logging.getLogger(__name__) + #: The DB-API API level (2.0). apilevel = "2.0" #: The thread safety level (connections may not be shared). @@ -921,10 +924,22 @@ def execute(self, operation: Union[bytes, str], parameters=None) -> "Self": self._clear() self._prepare_execute(operation, parameters) - handle, self._rowcount = _blocking_call( - self._stmt.execute_query, (), {}, self._stmt.cancel - ) - self._results = _RowIterator(self._stmt, handle, self._backend) + is_update = False + try: + val = self._stmt.get_option("adbc.flight.sql.is_update") + is_update = val.lower() == "true" + except (adbc_driver_manager.NotSupportedError, adbc_driver_manager.ProgrammingError) as e: + _logger.debug("adbc.flight.sql.is_update option not available: %s", e) + + if is_update: + self._rowcount = _blocking_call( + self._stmt.execute_update, (), {}, self._stmt.cancel + ) + else: + handle, self._rowcount = _blocking_call( + self._stmt.execute_query, (), {}, self._stmt.cancel + ) + self._results = _RowIterator(self._stmt, handle, self._backend) return self def executemany(self, operation: Union[bytes, str], seq_of_parameters) -> None: From 9b1aaef91277df8c1d0f160965a1d542b5e05f8b Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Tue, 31 Mar 2026 20:39:21 +0100 Subject: [PATCH 2/5] Expose new option to read the value of the field is_update in ActionCreatePreparedStatementResult --- .../driver/flightsql/flightsql_statement.go | 24 +++++++++++++++++++ .../adbc_driver_flightsql/__init__.py | 8 +++++++ 2 files changed, 32 insertions(+) diff --git a/go/adbc/driver/flightsql/flightsql_statement.go b/go/adbc/driver/flightsql/flightsql_statement.go index 85f750a0c9..f6b425b6c0 100644 --- a/go/adbc/driver/flightsql/flightsql_statement.go +++ b/go/adbc/driver/flightsql/flightsql_statement.go @@ -46,6 +46,12 @@ const ( // so this is not entirely necessary depending on the version // of substrait and the capabilities of the server. OptionStatementSubstraitVersion = "adbc.flight.sql.substrait.version" + // OptionStatementIsUpdate is a read-only option that indicates whether a + // prepared statement performs an update (DML) rather than returning a + // result set. The value is "true" or "false". If the server did not + // include the hint in its CreatePreparedStatement response, reading this + // option returns StatusNotFound. + OptionStatementIsUpdate = "adbc.flight.sql.is_update" ) func atomicLoadFloat64(x *float64) float64 { @@ -256,6 +262,24 @@ func (s *statement) GetOption(key string) (string, error) { return adbc.OptionValueEnabled, nil } return adbc.OptionValueDisabled, nil + case OptionStatementIsUpdate: + if s.prepared == nil { + return "", adbc.Error{ + Msg: "[Flight SQL] adbc.flight.sql.is_update is only available after Prepare()", + Code: adbc.StatusNotFound, + } + } + isUpdate := s.prepared.IsUpdate() + if isUpdate == nil { + return "", adbc.Error{ + Msg: "[Flight SQL] server did not provide is_update hint for this prepared statement", + Code: adbc.StatusNotFound, + } + } + if *isUpdate { + return adbc.OptionValueEnabled, nil + } + return adbc.OptionValueDisabled, nil } if strings.HasPrefix(key, OptionRPCCallHeaderPrefix) { diff --git a/python/adbc_driver_flightsql/adbc_driver_flightsql/__init__.py b/python/adbc_driver_flightsql/adbc_driver_flightsql/__init__.py index 14a086b3a5..cbf7232dd3 100644 --- a/python/adbc_driver_flightsql/adbc_driver_flightsql/__init__.py +++ b/python/adbc_driver_flightsql/adbc_driver_flightsql/__init__.py @@ -215,6 +215,14 @@ class ConnectionOptions(enum.Enum): class StatementOptions(enum.Enum): """Statement options specific to the Flight SQL driver.""" + #: Whether the prepared statement performs an update (DML) rather than + #: returning a result set. + #: + #: Read-only. Only available after the statement has been prepared. + #: Returns "true" or "false". If the server did not include this hint + #: in its CreatePreparedStatement response, reading this option raises + #: an error. + IS_UPDATE = "adbc.flight.sql.is_update" #: The latest FlightInfo value. #: #: Thread-safe. Mostly useful when using incremental execution, where an From 0a921bb4c57b2386caf62769d6395ecb42a66abf Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Fri, 3 Apr 2026 00:52:29 +0100 Subject: [PATCH 3/5] Revert changes on Python drivers --- .../driver/flightsql/flightsql_statement.go | 24 ------------------- .../adbc_driver_flightsql/__init__.py | 8 ------- .../adbc_driver_manager/dbapi.py | 23 ++++-------------- 3 files changed, 4 insertions(+), 51 deletions(-) diff --git a/go/adbc/driver/flightsql/flightsql_statement.go b/go/adbc/driver/flightsql/flightsql_statement.go index f6b425b6c0..85f750a0c9 100644 --- a/go/adbc/driver/flightsql/flightsql_statement.go +++ b/go/adbc/driver/flightsql/flightsql_statement.go @@ -46,12 +46,6 @@ const ( // so this is not entirely necessary depending on the version // of substrait and the capabilities of the server. OptionStatementSubstraitVersion = "adbc.flight.sql.substrait.version" - // OptionStatementIsUpdate is a read-only option that indicates whether a - // prepared statement performs an update (DML) rather than returning a - // result set. The value is "true" or "false". If the server did not - // include the hint in its CreatePreparedStatement response, reading this - // option returns StatusNotFound. - OptionStatementIsUpdate = "adbc.flight.sql.is_update" ) func atomicLoadFloat64(x *float64) float64 { @@ -262,24 +256,6 @@ func (s *statement) GetOption(key string) (string, error) { return adbc.OptionValueEnabled, nil } return adbc.OptionValueDisabled, nil - case OptionStatementIsUpdate: - if s.prepared == nil { - return "", adbc.Error{ - Msg: "[Flight SQL] adbc.flight.sql.is_update is only available after Prepare()", - Code: adbc.StatusNotFound, - } - } - isUpdate := s.prepared.IsUpdate() - if isUpdate == nil { - return "", adbc.Error{ - Msg: "[Flight SQL] server did not provide is_update hint for this prepared statement", - Code: adbc.StatusNotFound, - } - } - if *isUpdate { - return adbc.OptionValueEnabled, nil - } - return adbc.OptionValueDisabled, nil } if strings.HasPrefix(key, OptionRPCCallHeaderPrefix) { diff --git a/python/adbc_driver_flightsql/adbc_driver_flightsql/__init__.py b/python/adbc_driver_flightsql/adbc_driver_flightsql/__init__.py index cbf7232dd3..14a086b3a5 100644 --- a/python/adbc_driver_flightsql/adbc_driver_flightsql/__init__.py +++ b/python/adbc_driver_flightsql/adbc_driver_flightsql/__init__.py @@ -215,14 +215,6 @@ class ConnectionOptions(enum.Enum): class StatementOptions(enum.Enum): """Statement options specific to the Flight SQL driver.""" - #: Whether the prepared statement performs an update (DML) rather than - #: returning a result set. - #: - #: Read-only. Only available after the statement has been prepared. - #: Returns "true" or "false". If the server did not include this hint - #: in its CreatePreparedStatement response, reading this option raises - #: an error. - IS_UPDATE = "adbc.flight.sql.is_update" #: The latest FlightInfo value. #: #: Thread-safe. Mostly useful when using incremental execution, where an diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py index ea65ffa751..d3cb157503 100644 --- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py +++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py @@ -40,7 +40,6 @@ import abc import datetime -import logging import os import pathlib import threading @@ -73,8 +72,6 @@ # ---------------------------------------------------------- # Globals -_logger = logging.getLogger(__name__) - #: The DB-API API level (2.0). apilevel = "2.0" #: The thread safety level (connections may not be shared). @@ -924,22 +921,10 @@ def execute(self, operation: Union[bytes, str], parameters=None) -> "Self": self._clear() self._prepare_execute(operation, parameters) - is_update = False - try: - val = self._stmt.get_option("adbc.flight.sql.is_update") - is_update = val.lower() == "true" - except (adbc_driver_manager.NotSupportedError, adbc_driver_manager.ProgrammingError) as e: - _logger.debug("adbc.flight.sql.is_update option not available: %s", e) - - if is_update: - self._rowcount = _blocking_call( - self._stmt.execute_update, (), {}, self._stmt.cancel - ) - else: - handle, self._rowcount = _blocking_call( - self._stmt.execute_query, (), {}, self._stmt.cancel - ) - self._results = _RowIterator(self._stmt, handle, self._backend) + handle, self._rowcount = _blocking_call( + self._stmt.execute_query, (), {}, self._stmt.cancel + ) + self._results = _RowIterator(self._stmt, handle, self._backend) return self def executemany(self, operation: Union[bytes, str], seq_of_parameters) -> None: From 0ab4e00d99b0f9f10139d5c72b298febf53d0b97 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Fri, 3 Apr 2026 02:35:34 +0100 Subject: [PATCH 4/5] Add new network flow to ADBC Go Statement.ExecuteQuery() for prepared statements --- go/adbc/driver/flightsql/flightsql_statement.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go/adbc/driver/flightsql/flightsql_statement.go b/go/adbc/driver/flightsql/flightsql_statement.go index 85f750a0c9..3e12bf52fd 100644 --- a/go/adbc/driver/flightsql/flightsql_statement.go +++ b/go/adbc/driver/flightsql/flightsql_statement.go @@ -514,6 +514,14 @@ func (s *statement) ExecuteQuery(ctx context.Context) (rdr array.RecordReader, n var header, trailer metadata.MD opts := append([]grpc.CallOption{}, grpc.Header(&header), grpc.Trailer(&trailer), s.timeouts) if s.prepared != nil { + if isUpdate := s.prepared.IsUpdate(); isUpdate != nil && *isUpdate { + nrec, err = s.prepared.ExecuteUpdate(ctx, opts...) + if err != nil { + return nil, -1, adbcFromFlightStatusWithDetails(err, header, trailer, "ExecuteUpdate") + } + rdr, err = array.NewRecordReader(arrow.NewSchema(nil, nil), nil) + return + } info, err = s.prepared.Execute(ctx, opts...) } else { info, err = s.query.execute(ctx, s.cnxn, opts...) From 6d5a113181e4684989bc211ef1256d899a8f97bf Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Fri, 3 Apr 2026 19:17:27 +0100 Subject: [PATCH 5/5] Add tests --- .../flightsql/flightsql_adbc_server_test.go | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go index 6567c0008b..c27813648e 100644 --- a/go/adbc/driver/flightsql/flightsql_adbc_server_test.go +++ b/go/adbc/driver/flightsql/flightsql_adbc_server_test.go @@ -3098,3 +3098,158 @@ func (suite *BulkIngestTests) TestBulkIngestViaExecuteQuery() { suite.Require().Len(ingestedData, 1) suite.Equal(int64(2), ingestedData[0].NumRows()) } + +// ---- IsUpdate Prepared Statement Tests -------------------- + +type IsUpdateTestServer struct { + flightsql.BaseServer + + getFlightInfoPreparedCalled bool + doPutPreparedStatementUpdCalled bool + doGetPreparedStatementCalled bool + doGetPreparedStatementHandle []byte +} + +func (srv *IsUpdateTestServer) CreatePreparedStatement(_ context.Context, req flightsql.ActionCreatePreparedStatementRequest) (flightsql.ActionCreatePreparedStatementResult, error) { + result := flightsql.ActionCreatePreparedStatementResult{ + Handle: []byte(req.GetQuery()), + } + switch req.GetQuery() { + case "UPDATE t SET x = 1": + t := true + result.IsUpdate = &t + case "SELECT 1": + f := false + result.IsUpdate = &f + } + // default: IsUpdate remains nil + return result, nil +} + +func (srv *IsUpdateTestServer) ClosePreparedStatement(_ context.Context, _ flightsql.ActionClosePreparedStatementRequest) error { + return nil +} + +func (srv *IsUpdateTestServer) GetFlightInfoPreparedStatement(_ context.Context, cmd flightsql.PreparedStatementQuery, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) { + srv.getFlightInfoPreparedCalled = true + + query := &flightproto.CommandPreparedStatementQuery{ + PreparedStatementHandle: cmd.GetPreparedStatementHandle(), + } + var ticket anypb.Any + if err := ticket.MarshalFrom(query); err != nil { + return nil, err + } + tkt, err := proto.Marshal(&ticket) + if err != nil { + return nil, err + } + return &flight.FlightInfo{ + FlightDescriptor: desc, + Endpoint: []*flight.FlightEndpoint{ + {Ticket: &flight.Ticket{Ticket: tkt}}, + }, + TotalRecords: 0, + TotalBytes: -1, + }, nil +} + +func (srv *IsUpdateTestServer) DoGetPreparedStatement(_ context.Context, cmd flightsql.PreparedStatementQuery) (*arrow.Schema, <-chan flight.StreamChunk, error) { + srv.doGetPreparedStatementCalled = true + srv.doGetPreparedStatementHandle = cmd.GetPreparedStatementHandle() + sc := arrow.NewSchema([]arrow.Field{}, nil) + ch := make(chan flight.StreamChunk) + close(ch) + return sc, ch, nil +} + +func (srv *IsUpdateTestServer) DoPutPreparedStatementUpdate(_ context.Context, _ flightsql.PreparedStatementUpdate, _ flight.MessageReader) (int64, error) { + srv.doPutPreparedStatementUpdCalled = true + return 42, nil +} + +type IsUpdateTests struct { + ServerBasedTests + + srv *IsUpdateTestServer +} + +func (suite *IsUpdateTests) SetupSuite() { + suite.srv = &IsUpdateTestServer{} + suite.srv.Alloc = memory.DefaultAllocator + suite.DoSetupSuite(suite.srv, nil, nil) +} + +func (suite *IsUpdateTests) SetupTest() { + suite.srv.getFlightInfoPreparedCalled = false + suite.srv.doPutPreparedStatementUpdCalled = false + suite.srv.doGetPreparedStatementCalled = false + suite.srv.doGetPreparedStatementHandle = nil + suite.ServerBasedTests.SetupTest() +} + +// TestIsUpdateNil verifies that when IsUpdate is nil (not set by server), +// the driver calls GetFlightInfoPreparedStatement (Execute path). +func (suite *IsUpdateTests) TestIsUpdateNil() { + stmt, err := suite.cnxn.NewStatement() + suite.Require().NoError(err) + defer validation.CheckedClose(suite.T(), stmt) + + suite.Require().NoError(stmt.SetSqlQuery("SELECT *")) + suite.Require().NoError(stmt.Prepare(context.Background())) + + rdr, _, err := stmt.ExecuteQuery(context.Background()) + suite.Require().NoError(err) + defer rdr.Release() + + suite.True(suite.srv.getFlightInfoPreparedCalled, "Expected GetFlightInfoPreparedStatement to be called when IsUpdate is nil") + suite.False(suite.srv.doPutPreparedStatementUpdCalled, "Expected DoPutPreparedStatementUpdate NOT to be called when IsUpdate is nil") + suite.True(suite.srv.doGetPreparedStatementCalled, "Expected DoGetPreparedStatement to be called when IsUpdate is nil") + suite.Equal([]byte("SELECT *"), suite.srv.doGetPreparedStatementHandle, "Expected DoGetPreparedStatement to receive the correct prepared statement handle") +} + +// TestIsUpdateFalse verifies that when IsUpdate is explicitly false, +// the driver calls GetFlightInfoPreparedStatement (Execute path). +func (suite *IsUpdateTests) TestIsUpdateFalse() { + stmt, err := suite.cnxn.NewStatement() + suite.Require().NoError(err) + defer validation.CheckedClose(suite.T(), stmt) + + suite.Require().NoError(stmt.SetSqlQuery("SELECT 1")) + suite.Require().NoError(stmt.Prepare(context.Background())) + + rdr, _, err := stmt.ExecuteQuery(context.Background()) + suite.Require().NoError(err) + defer rdr.Release() + + suite.True(suite.srv.getFlightInfoPreparedCalled, "Expected GetFlightInfoPreparedStatement to be called when IsUpdate is false") + suite.False(suite.srv.doPutPreparedStatementUpdCalled, "Expected DoPutPreparedStatementUpdate NOT to be called when IsUpdate is false") + suite.True(suite.srv.doGetPreparedStatementCalled, "Expected DoGetPreparedStatement to be called when IsUpdate is false") + suite.Equal([]byte("SELECT 1"), suite.srv.doGetPreparedStatementHandle, "Expected DoGetPreparedStatement to receive the correct prepared statement handle") +} + +// TestIsUpdateTrue verifies that when IsUpdate is explicitly true, +// the driver calls DoPutPreparedStatementUpdate (ExecuteUpdate path) +// instead of GetFlightInfoPreparedStatement, even when ExecuteQuery is called. +func (suite *IsUpdateTests) TestIsUpdateTrue() { + stmt, err := suite.cnxn.NewStatement() + suite.Require().NoError(err) + defer validation.CheckedClose(suite.T(), stmt) + + suite.Require().NoError(stmt.SetSqlQuery("UPDATE t SET x = 1")) + suite.Require().NoError(stmt.Prepare(context.Background())) + + rdr, nrec, err := stmt.ExecuteQuery(context.Background()) + suite.Require().NoError(err) + defer rdr.Release() + + suite.EqualValues(42, nrec, "Expected ExecuteQuery to return the number of rows affected by DoPutPreparedStatementUpdate") + suite.False(rdr.Next(), "Expected empty record reader when IsUpdate is true") + suite.False(suite.srv.getFlightInfoPreparedCalled, "Expected GetFlightInfoPreparedStatement NOT to be called when IsUpdate is true") + suite.True(suite.srv.doPutPreparedStatementUpdCalled, "Expected DoPutPreparedStatementUpdate to be called when IsUpdate is true") + suite.False(suite.srv.doGetPreparedStatementCalled, "Expected DoGetPreparedStatement NOT to be called when IsUpdate is true") +} + +func TestIsUpdate(t *testing.T) { + suite.Run(t, &IsUpdateTests{}) +}