@@ -88,7 +88,27 @@ struct ConnectionStateMachine {
8888        case  sendParseDescribeBindExecuteSync( PostgresQuery ) 
8989        case  sendBindExecuteSync( PSQLExecuteStatement ) 
9090        case  failQuery( EventLoopPromise < PSQLRowStream > ,  with:  PSQLError ,  cleanupContext:  CleanUpContext ? ) 
91+         /// Fail a query's execution by resuming the continuation with the given error. When `sync` is `true`, send a
92+         /// `Sync` message to the backend.
93+         case  failQueryContinuation( AnyErrorContinuation ,  with:  PSQLError ,  sync:  Bool ,  cleanupContext:  CleanUpContext ? ) 
94+         /// Fail a query's execution by resuming the continuation with the given error and send a `Sync` message to the
95+         /// backend.
9196        case  succeedQuery( EventLoopPromise < PSQLRowStream > ,  with:  QueryResult ) 
97+         /// Succeed the continuation with a void result. When `sync` is `true`, send a `Sync` message to the backend.
98+         case  succeedQueryContinuation( CheckedContinuation < Void ,  any  Error > ,  sync:  Bool ) 
99+ 
100+         /// Trigger a data transfer returning a `PostgresCopyFromWriter` to the given continuation.
101+         ///
102+         /// Once the data transfer is triggered, it will send `CopyData` messages to the backend. After that the state
103+         /// machine needs to be prodded again to send a `CopyDone` or `CopyFail` by calling
104+         /// `PostgresChannelHandler.sendCopyDone` or `PostgresChannelHandler.sendCopyFail`.
105+         case  triggerCopyData( CheckedContinuation < PostgresCopyFromWriter ,  any  Error > ) 
106+ 
107+         /// Send a `CopyDone` and `Sync` message to the backend.
108+         case  sendCopyDoneAndSync
109+ 
110+         /// Send a `CopyFail` message to the backend with the given error message.
111+         case  sendCopyFail( message:  String ) 
92112
93113        // --- streaming actions
94114        // actions if query has requested next row but we are waiting for backend
@@ -107,6 +127,14 @@ struct ConnectionStateMachine {
107127        case  failClose( CloseCommandContext ,  with:  PSQLError ,  cleanupContext:  CleanUpContext ? ) 
108128    } 
109129
130+     enum  ChannelWritabilityChangedAction  { 
131+         /// No action needs to be taken based on the writability change.
132+         case  none
133+ 
134+         /// Resume the given continuation successfully.
135+         case  succeedPromise( EventLoopPromise < Void > ) 
136+     } 
137+ 
110138    private  var  state :  State 
111139    private  let  requireBackendKeyData :  Bool 
112140    private  var  taskQueue =  CircularBuffer < PSQLTask > ( ) 
@@ -587,6 +615,8 @@ struct ConnectionStateMachine {
587615            switch  queryContext. query { 
588616            case  . executeStatement( _,  let  promise) ,  . unnamed( _,  let  promise) : 
589617                return  . failQuery( promise,  with:  psqlErrror,  cleanupContext:  nil ) 
618+             case  . copyFrom( _,  let  triggerCopy) : 
619+                 return  . failQueryContinuation( . copyFromWriter( triggerCopy) ,  with:  psqlErrror,  sync:  false ,  cleanupContext:  nil ) 
590620            case  . prepareStatement( _,  _,  _,  let  promise) : 
591621                return  . failPreparedStatementCreation( promise,  with:  psqlErrror,  cleanupContext:  nil ) 
592622            } 
@@ -660,6 +690,16 @@ struct ConnectionStateMachine {
660690            preconditionFailure ( " Invalid state:  \( self . state) " ) 
661691        } 
662692    } 
693+ 
694+     mutating  func  channelWritabilityChanged( isWritable:  Bool )  ->  ChannelWritabilityChangedAction  { 
695+         guard  case . extendedQuery( var  queryState,  let  connectionContext)  =  state else  { 
696+             return  . none
697+         } 
698+         self . state =  . modifying // avoid CoW
699+         let  action  =  queryState. channelWritabilityChanged ( isWritable:  isWritable) 
700+         self . state =  . extendedQuery( queryState,  connectionContext) 
701+         return  action
702+     } 
663703
664704    // MARK: - Running Queries -
665705
@@ -752,10 +792,55 @@ struct ConnectionStateMachine {
752792        return  self . modify ( with:  action) 
753793    } 
754794
755-     mutating  func  copyInResponseReceived( 
756-         _ copyInResponse:  PostgresBackendMessage . CopyInResponseMessage 
757-     )  ->  ConnectionAction  { 
758-         return  self . closeConnectionAndCleanup ( . unexpectedBackendMessage( . copyInResponse( copyInResponse) ) ) 
795+     mutating  func  copyInResponseReceived( _ copyInResponse:  PostgresBackendMessage . CopyInResponseMessage )  ->  ConnectionAction  { 
796+         guard  case . extendedQuery( var  queryState,  let  connectionContext)  =  self . state,  !queryState. isComplete else  { 
797+             return  self . closeConnectionAndCleanup ( . unexpectedBackendMessage( . copyInResponse( copyInResponse) ) ) 
798+         } 
799+ 
800+         self . state =  . modifying // avoid CoW
801+         let  action  =  queryState. copyInResponseReceived ( copyInResponse) 
802+         self . state =  . extendedQuery( queryState,  connectionContext) 
803+         return  self . modify ( with:  action) 
804+     } 
805+ 
806+ 
807+     /// Succeed the promise when the channel to the backend is writable and the backend is ready to receive more data.
808+     ///
809+     /// The promise may be failed if the backend indicated that it can't handle any more data by sending an
810+     /// `ErrorResponse`. This is mostly the case when malformed data is sent to it. In that case, the data transfer
811+     /// should be aborted to avoid unnecessary work.
812+     mutating  func  checkBackendCanReceiveCopyData( channelIsWritable:  Bool ,  promise:  EventLoopPromise < Void > )  { 
813+         guard  case . extendedQuery( var  queryState,  let  connectionContext)  =  self . state else  { 
814+             preconditionFailure ( " Copy mode is only supported for extended queries " ) 
815+         } 
816+ 
817+         self . state =  . modifying // avoid CoW
818+         queryState. checkBackendCanReceiveCopyData ( channelIsWritable:  channelIsWritable,  promise:  promise) 
819+         self . state =  . extendedQuery( queryState,  connectionContext) 
820+     } 
821+ 
822+     /// Put the state machine out of the copying mode and send a `CopyDone` message to the backend.
823+     mutating  func  sendCopyDone( continuation:  CheckedContinuation < Void ,  any  Error > )  ->  ConnectionAction  { 
824+         guard  case . extendedQuery( var  queryState,  let  connectionContext)  =  self . state else  { 
825+             preconditionFailure ( " Copy mode is only supported for extended queries " ) 
826+         } 
827+ 
828+         self . state =  . modifying // avoid CoW
829+         let  action  =  queryState. sendCopyDone ( continuation:  continuation) 
830+         self . state =  . extendedQuery( queryState,  connectionContext) 
831+         return  self . modify ( with:  action) 
832+     } 
833+ 
834+     /// Put the state machine out of the copying mode and send a `CopyFail` message to the backend.
835+     mutating  func  sendCopyFail( message:  String ,  continuation:  CheckedContinuation < Void ,  any  Error > )  ->  ConnectionAction  { 
836+         guard  case . extendedQuery( var  queryState,  let  connectionContext)  =  self . state else  { 
837+             preconditionFailure ( " Copy mode is only supported for extended queries " ) 
838+         } 
839+ 
840+         self . state =  . modifying // avoid CoW
841+         let  action  =  queryState. sendCopyFail ( message:  message,  continuation:  continuation) 
842+         self . state =  . extendedQuery( queryState,  connectionContext) 
843+         return  self . modify ( with:  action) 
759844    } 
760845
761846    mutating  func  emptyQueryResponseReceived( )  ->  ConnectionAction  { 
@@ -866,14 +951,21 @@ struct ConnectionStateMachine {
866951                 . forwardRows, 
867952                 . forwardStreamComplete, 
868953                 . wait, 
869-                  . read: 
954+                  . read, 
955+                  . triggerCopyData, 
956+                  . sendCopyDoneAndSync, 
957+                  . sendCopyFail, 
958+                  . succeedQueryContinuation: 
870959                preconditionFailure ( " Invalid query state machine action in state:  \( self . state) , action:  \( action) " ) 
871960
872961            case  . evaluateErrorAtConnectionLevel: 
873962                return  . closeConnectionAndCleanup( cleanupContext) 
874963
875-             case  . failQuery( let  queryContext,  with:  let  error) : 
876-                 return  . failQuery( queryContext,  with:  error,  cleanupContext:  cleanupContext) 
964+             case  . failQuery( let  promise,  with:  let  error) : 
965+                 return  . failQuery( promise,  with:  error,  cleanupContext:  cleanupContext) 
966+ 
967+             case  . failQueryContinuation( let  continuation,  with:  let  error,  let  sync) : 
968+                 return  . failQueryContinuation( continuation,  with:  error,  sync:  sync,  cleanupContext:  cleanupContext) 
877969
878970            case  . forwardStreamError( let  error,  let  read) : 
879971                return  . forwardStreamError( error,  read:  read,  cleanupContext:  cleanupContext) 
@@ -1044,8 +1136,19 @@ extension ConnectionStateMachine {
10441136        case  . failQuery( let  requestContext,  with:  let  error) : 
10451137            let  cleanupContext  =  self . setErrorAndCreateCleanupContextIfNeeded ( error) 
10461138            return  . failQuery( requestContext,  with:  error,  cleanupContext:  cleanupContext) 
1139+         case  . failQueryContinuation( let  continuation,  with:  let  error,  let  sync) : 
1140+             let  cleanupContext  =  self . setErrorAndCreateCleanupContextIfNeeded ( error) 
1141+             return  . failQueryContinuation( continuation,  with:  error,  sync:  sync,  cleanupContext:  cleanupContext) 
10471142        case  . succeedQuery( let  requestContext,  with:  let  result) : 
10481143            return  . succeedQuery( requestContext,  with:  result) 
1144+         case  . succeedQueryContinuation( let  continuation,  let  sync) : 
1145+             return  . succeedQueryContinuation( continuation,  sync:  sync) 
1146+         case  . triggerCopyData( let  triggerCopy) : 
1147+             return  . triggerCopyData( triggerCopy) 
1148+         case  . sendCopyDoneAndSync: 
1149+             return  . sendCopyDoneAndSync
1150+         case  . sendCopyFail( message:  let  message) : 
1151+             return  . sendCopyFail( message:  message) 
10491152        case  . forwardRows( let  buffer) : 
10501153            return  . forwardRows( buffer) 
10511154        case  . forwardStreamComplete( let  buffer,  let  commandTag) : 
0 commit comments