diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlDataReader.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlDataReader.cs index 5e4208b7e6..0932bccb51 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlDataReader.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlDataReader.cs @@ -3766,7 +3766,7 @@ private TdsOperationStatus TryNextResult(out bool more) if (result != TdsOperationStatus.Done) { more = false; - return TdsOperationStatus.Done; + return result; } // In the case of not closing the reader, null out the metadata AFTER @@ -4525,7 +4525,12 @@ private TdsOperationStatus TryResetBlobState() #if DEBUG else { - Debug.Assert((_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1) && _stateObj._longlen == 0, "Haven't read header yet, but column is partially read?"); + Debug.Assert( + (_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1) + && + (_stateObj._longlen == 0 || _stateObj.IsSnapshotContinuing()), + "Haven't read header yet, but column is partially read?" + ); } #endif diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs index a03a92ad67..43ad123d4a 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs @@ -5556,6 +5556,12 @@ private TdsOperationStatus TryProcessColumnHeaderNoNBC(SqlMetaDataPriv col, TdsP { if (col.metaType.IsLong && !col.metaType.IsPlp) { + if (stateObj.IsSnapshotContinuing()) + { + length = (ulong)stateObj.GetSnapshotStorageLength(); + isNull = length == 0; + return TdsOperationStatus.Done; + } // // we don't care about TextPtrs, simply go after the data after it // @@ -5998,34 +6004,17 @@ private TdsOperationStatus TryReadSqlStringValue(SqlBuffer value, byte type, int if (isPlp) { - char[] cc = null; - bool buffIsRented = false; - result = TryReadPlpUnicodeChars(ref cc, 0, length >> 1, stateObj, out length, supportRentedBuff: true, rentedBuff: ref buffIsRented); + result = TryReadPlpUnicodeCharsWithContinue( + stateObj, + length, + out string resultString + ); if (result == TdsOperationStatus.Done) { - if (length > 0) - { - s = new string(cc, 0, length); - } - else - { - s = string.Empty; - } - } - - if (buffIsRented) - { - // do not use clearArray:true on the rented array because it can be massively larger - // than the space we've used and we would incur performance clearing memory that - // we haven't used and can't leak out information. - // clear only the length that we know we have used. - cc.AsSpan(0, length).Clear(); - ArrayPool.Shared.Return(cc, clearArray: false); - cc = null; + s = resultString; } - - if (result != TdsOperationStatus.Done) + else { return result; } @@ -6379,9 +6368,7 @@ internal TdsOperationStatus TryReadSqlValue(SqlBuffer value, SqlMetaDataPriv md, } else { - //Debug.Assert(length > 0 && length < (long)(Int32.MaxValue), "Bad length for column"); - b = new byte[length]; - result = stateObj.TryReadByteArray(b, length); + result = TryReadByteArrayWithContinue(stateObj, length, out b); if (result != TdsOperationStatus.Done) { return result; @@ -6502,6 +6489,48 @@ internal TdsOperationStatus TryReadSqlValue(SqlBuffer value, SqlMetaDataPriv md, return TdsOperationStatus.Done; } + private TdsOperationStatus TryReadByteArrayWithContinue(TdsParserStateObject stateObj, int length, out byte[] bytes) + { + bytes = null; + int offset = 0; + byte[] temp = null; + (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + if (isAvailable) + { + if (isContinuing || isStarting) + { + temp = stateObj.TryTakeSnapshotStorage() as byte[]; + Debug.Assert(bytes == null || bytes.Length == length, "stored buffer length must be null or must have been created with the correct length"); + } + if (temp != null) + { + offset = stateObj.GetSnapshotTotalSize(); + } + } + + + if (temp == null) + { + temp = new byte[length]; + } + + TdsOperationStatus result = stateObj.TryReadByteArray(temp, length, out _, offset, isStarting || isContinuing); + + if (result == TdsOperationStatus.Done) + { + bytes = temp; + } + else if (result == TdsOperationStatus.NeedMoreData) + { + if (isStarting || isContinuing) + { + stateObj.SetSnapshotStorage(temp); + } + } + + return result; + } + private TdsOperationStatus TryReadSqlDateTime(SqlBuffer value, byte tdsType, int length, byte scale, TdsParserStateObject stateObj) { Span datetimeBuffer = ((uint)length <= 16) ? stackalloc byte[16] : new byte[length]; @@ -8071,14 +8100,22 @@ internal TdsOperationStatus TryGetTokenLength(byte token, TdsParserStateObject s case TdsEnums.SQLVarCnt: if (0 != (token & 0x80)) { - ushort value; - result = stateObj.TryReadUInt16(out value); - if (result != TdsOperationStatus.Done) + if (stateObj.IsSnapshotContinuing()) { - tokenLength = 0; - return result; + tokenLength = stateObj.GetSnapshotStorageLength(); + Debug.Assert(tokenLength != 0, "stored buffer length on continue must contain the length of the data required for the token"); + } + else + { + ushort value; + result = stateObj.TryReadUInt16(out value); + if (result != TdsOperationStatus.Done) + { + tokenLength = 0; + return result; + } + tokenLength = value; } - tokenLength = value; return TdsOperationStatus.Done; } else if (0 == (token & 0x0c)) @@ -12872,6 +12909,85 @@ internal int ReadPlpUnicodeChars(ref char[] buff, int offst, int len, TdsParserS return charsRead; } + internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObject stateObj, int length, out string resultString) + { + resultString = null; + char[] temp = null; + bool buffIsRented = false; + int startOffset = 0; + (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + + if (isAvailable) + { + if (isContinuing || isStarting) + { + temp = stateObj.TryTakeSnapshotStorage() as char[]; + Debug.Assert(temp == null || length == int.MaxValue || temp.Length == length, "stored buffer length must be null or must have been created with the correct length"); + } + if (temp != null) + { + startOffset = stateObj.GetSnapshotTotalSize(); + } + } + + TdsOperationStatus result = TryReadPlpUnicodeChars( + ref temp, + 0, + length >> 1, + stateObj, + out length, + supportRentedBuff: !isAvailable, // do not use the arraypool if we are going to keep the buffer in the snapshot + rentedBuff: ref buffIsRented, + startOffset, + isStarting || isContinuing + ); + + if (result == TdsOperationStatus.Done) + { + if (length > 0) + { + resultString = new string(temp, 0, length); + } + else + { + resultString = string.Empty; + } + + if (buffIsRented) + { + // do not use clearArray:true on the rented array because it can be massively larger + // than the space we've used and we would incur performance clearing memory that + // we haven't used and can't leak out information. + // clear only the length that we know we have used. + temp.AsSpan(0, length).Clear(); + ArrayPool.Shared.Return(temp, clearArray: false); + temp = null; + } + } + else if (result == TdsOperationStatus.NeedMoreData) + { + if (isStarting || isContinuing) + { + stateObj.SetSnapshotStorage(temp); + } + } + + return result; + } + + internal TdsOperationStatus TryReadPlpUnicodeChars( + ref char[] buff, + int offst, + int len, + TdsParserStateObject stateObj, + out int totalCharsRead, + bool supportRentedBuff, + ref bool rentedBuff + ) + { + return TryReadPlpUnicodeChars(ref buff, offst, len, stateObj, out totalCharsRead, supportRentedBuff, ref rentedBuff, 0, false); + } + // Reads the requested number of chars from a plp data stream, or the entire data if // requested length is -1 or larger than the actual length of data. First call to this method // should be preceeded by a call to ReadPlpLength or ReadDataLength. @@ -12883,11 +12999,13 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( TdsParserStateObject stateObj, out int totalCharsRead, bool supportRentedBuff, - ref bool rentedBuff) + ref bool rentedBuff, + int startOffsetByteCount, + bool writeDataSizeToSnapshot + ) { int charsRead = 0; int charsLeft = 0; - char[] newbuf; if (stateObj._longlen == 0) { @@ -12897,8 +13015,14 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( } Debug.Assert((ulong)stateObj._longlen != TdsEnums.SQL_PLP_NULL, "Out of sync plp read request"); - - Debug.Assert((buff == null && offst == 0) || (buff.Length >= offst + len), "Invalid length sent to ReadPlpUnicodeChars()!"); + Debug.Assert( + (buff == null && offst == 0) + || + (buff.Length >= offst + len) + || + (buff.Length == (startOffsetByteCount >> 1) + 1), + "Invalid length sent to ReadPlpUnicodeChars()!" + ); charsLeft = len; // If total length is known up front, the length isn't specified as unknown @@ -12919,6 +13043,11 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( } TdsOperationStatus result; + + bool partialReadInProgress = (startOffsetByteCount & 0x1) == 1; + bool restartingDataSizeCount = startOffsetByteCount == 0; + int currentPacketId = 0; + if (stateObj._longlenleft == 0) { result = stateObj.TryReadPlpLength(false, out _); @@ -12934,63 +13063,100 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( } } - totalCharsRead = 0; + totalCharsRead = (startOffsetByteCount >> 1); + charsLeft -= totalCharsRead; + offst = totalCharsRead; + + while (charsLeft > 0) { - charsRead = (int)Math.Min((stateObj._longlenleft + 1) >> 1, (ulong)charsLeft); - if ((buff == null) || (buff.Length < (offst + charsRead))) + if (!partialReadInProgress) { - bool returnRentedBufferAfterCopy = rentedBuff; - if (supportRentedBuff && (offst + charsRead) < 1073741824) // 1 Gib + charsRead = (int)Math.Min((stateObj._longlenleft + 1) >> 1, (ulong)charsLeft); + if ((buff == null) || (buff.Length < (offst + charsRead))) { - newbuf = ArrayPool.Shared.Rent(offst + charsRead); - rentedBuff = true; + char[] newbuf; + bool returnRentedBufferAfterCopy = rentedBuff; + if (supportRentedBuff && (offst + charsRead) < 1073741824) // 1 Gib + { + newbuf = ArrayPool.Shared.Rent(offst + charsRead); + rentedBuff = true; + } + else + { + newbuf = new char[offst + charsRead]; + rentedBuff = false; + } + + if (buff != null) + { + Buffer.BlockCopy(buff, 0, newbuf, 0, offst * 2); + if (returnRentedBufferAfterCopy) + { + buff.AsSpan(0, offst).Clear(); + ArrayPool.Shared.Return(buff, clearArray: false); + } + } + buff = newbuf; + newbuf = null; } - else + if (charsRead > 0) { - newbuf = new char[offst + charsRead]; - rentedBuff = false; - } + result = TryReadPlpUnicodeCharsChunk(buff, offst, charsRead, stateObj, out charsRead); + if (result != TdsOperationStatus.Done) + { + return result; + } + charsLeft -= charsRead; + offst += charsRead; + totalCharsRead += charsRead; - if (buff != null) - { - Buffer.BlockCopy(buff, 0, newbuf, 0, offst * 2); - if (returnRentedBufferAfterCopy) + if (writeDataSizeToSnapshot) { - buff.AsSpan(0, offst).Clear(); - ArrayPool.Shared.Return(buff, clearArray: false); + currentPacketId = IncrementSnapshotDataSize(stateObj, restartingDataSizeCount, currentPacketId, charsRead * 2); } } - buff = newbuf; } - if (charsRead > 0) + // Special case single byte + if ( + (stateObj._longlenleft == 1 || partialReadInProgress) + && (charsLeft > 0) + ) { - result = TryReadPlpUnicodeCharsChunk(buff, offst, charsRead, stateObj, out charsRead); - if (result != TdsOperationStatus.Done) + byte b1 = 0; + byte b2 = 0; + if (partialReadInProgress) { - return result; + partialReadInProgress = false; + // we're resuming with a partial char in the buffer so we need to load the byte + // from the char buffer and put it into b1 so we can combine it with the second + // half later + b1 = (byte)(buff[offst] & 0x00ff); } - charsLeft -= charsRead; - offst += charsRead; - totalCharsRead += charsRead; - } - // Special case single byte left - if (stateObj._longlenleft == 1 && (charsLeft > 0)) - { - byte b1; - result = stateObj.TryReadByte(out b1); - if (result != TdsOperationStatus.Done) - { - return result; - } - stateObj._longlenleft--; - result = stateObj.TryReadPlpLength(false, out _); - if (result != TdsOperationStatus.Done) + else { - return result; + result = stateObj.TryReadByte(out b1); + if (result != TdsOperationStatus.Done) + { + return result; + } + stateObj._longlenleft--; + if (writeDataSizeToSnapshot) + { + // we need to write the single b1 byte to the array because we may run out of data + // and need to wait for another packet + buff[offst] = (char)((b1 & 0xff)); + currentPacketId = IncrementSnapshotDataSize(stateObj, restartingDataSizeCount, currentPacketId, 1); + } + + result = stateObj.TryReadPlpLength(false, out _); + if (result != TdsOperationStatus.Done) + { + return result; + } + Debug.Assert((stateObj._longlenleft != 0), "ReadPlpUnicodeChars: Odd byte left at the end!"); } - Debug.Assert((stateObj._longlenleft != 0), "ReadPlpUnicodeChars: Odd byte left at the end!"); - byte b2; + result = stateObj.TryReadByte(out b2); if (result != TdsOperationStatus.Done) { @@ -13003,6 +13169,11 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( charsRead++; charsLeft--; totalCharsRead++; + + if (writeDataSizeToSnapshot) + { + currentPacketId = IncrementSnapshotDataSize(stateObj, restartingDataSizeCount, currentPacketId, 1); + } } if (stateObj._longlenleft == 0) { @@ -13015,11 +13186,44 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( } if (stateObj._longlenleft == 0) // Data read complete + { break; + } } return TdsOperationStatus.Done; + + static int IncrementSnapshotDataSize(TdsParserStateObject stateObj, bool resetting, int previousPacketId, int value) + { + int current = 0; + if (resetting) + { + int currentPacketId = stateObj.GetSnapshotPacketID(); + if (previousPacketId == currentPacketId) + { + // we have already reset it the first time we saw it so just add normally + current = stateObj.GetSnapshotDataSize(); + } + else + { + // a packet we haven't seen before, reset the size + current = 0; + } + + stateObj.SetSnapshotDataSize(current + value); + + // return new packetid so next time we see this packet we know it isn't new + return currentPacketId; + } + else + { + current = stateObj.GetSnapshotDataSize(); + stateObj.SetSnapshotDataSize(current + value); + return previousPacketId; + } + } } + internal int ReadPlpAnsiChars(ref char[] buff, int offst, int len, SqlMetaDataPriv metadata, TdsParserStateObject stateObj) { int charsRead = 0; diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlDataReader.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlDataReader.cs index 60a8cf6153..4f308d3c5f 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlDataReader.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlDataReader.cs @@ -4524,7 +4524,12 @@ private TdsOperationStatus TryResetBlobState() #if DEBUG else { - Debug.Assert((_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1) && _stateObj._longlen == 0, "Haven't read header yet, but column is partially read?"); + Debug.Assert( + (_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1) + && + (_stateObj._longlen == 0 || _stateObj.IsSnapshotContinuing()), + "Haven't read header yet, but column is partially read?" + ); } #endif diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs index 2b41165983..6dc02c7505 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs @@ -6019,6 +6019,12 @@ private TdsOperationStatus TryProcessColumnHeaderNoNBC(SqlMetaDataPriv col, TdsP { if (col.metaType.IsLong && !col.metaType.IsPlp) { + if (stateObj.IsSnapshotContinuing()) + { + length = (ulong)stateObj.GetSnapshotStorageLength(); + isNull = length == 0; + return TdsOperationStatus.Done; + } // // we don't care about TextPtrs, simply go after the data after it // @@ -6470,34 +6476,17 @@ private TdsOperationStatus TryReadSqlStringValue(SqlBuffer value, byte type, int if (isPlp) { - char[] cc = null; - bool buffIsRented = false; - result = TryReadPlpUnicodeChars(ref cc, 0, length >> 1, stateObj, out length, supportRentedBuff: true, rentedBuff: ref buffIsRented); + result = TryReadPlpUnicodeCharsWithContinue( + stateObj, + length, + out string resultString + ); if (result == TdsOperationStatus.Done) { - if (length > 0) - { - s = new string(cc, 0, length); - } - else - { - s = string.Empty; - } - } - - if (buffIsRented) - { - // do not use clearArray:true on the rented array because it can be massively larger - // than the space we've used and we would incur performance clearing memory that - // we haven't used and can't leak out information. - // clear only the length that we know we have used. - cc.AsSpan(0, length).Clear(); - ArrayPool.Shared.Return(cc, clearArray: false); - cc = null; + s = resultString; } - - if (result != TdsOperationStatus.Done) + else { return result; } @@ -6856,9 +6845,7 @@ internal TdsOperationStatus TryReadSqlValue(SqlBuffer value, } else { - //Debug.Assert(length > 0 && length < (long)(Int32.MaxValue), "Bad length for column"); - b = new byte[length]; - result = stateObj.TryReadByteArray(b, length); + result = TryReadByteArrayWithContinue(stateObj, length, out b); if (result != TdsOperationStatus.Done) { return result; @@ -6979,6 +6966,48 @@ internal TdsOperationStatus TryReadSqlValue(SqlBuffer value, return TdsOperationStatus.Done; } + private TdsOperationStatus TryReadByteArrayWithContinue(TdsParserStateObject stateObj, int length, out byte[] bytes) + { + bytes = null; + int offset = 0; + byte[] temp = null; + (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + if (isAvailable) + { + if (isContinuing || isStarting) + { + temp = stateObj.TryTakeSnapshotStorage() as byte[]; + Debug.Assert(bytes == null || bytes.Length == length, "stored buffer length must be null or must have been created with the correct length"); + } + if (temp != null) + { + offset = stateObj.GetSnapshotTotalSize(); + } + } + + + if (temp == null) + { + temp = new byte[length]; + } + + TdsOperationStatus result = stateObj.TryReadByteArray(temp, length, out _, offset, isStarting || isContinuing); + + if (result == TdsOperationStatus.Done) + { + bytes = temp; + } + else if (result == TdsOperationStatus.NeedMoreData) + { + if (isStarting || isContinuing) + { + stateObj.SetSnapshotStorage(temp); + } + } + + return result; + } + private TdsOperationStatus TryReadSqlDateTime(SqlBuffer value, byte tdsType, int length, byte scale, TdsParserStateObject stateObj) { Span datetimeBuffer = ((uint)length <= 16) ? stackalloc byte[16] : new byte[length]; @@ -8541,14 +8570,22 @@ internal TdsOperationStatus TryGetTokenLength(byte token, TdsParserStateObject s case TdsEnums.SQLVarCnt: if (0 != (token & 0x80)) { - ushort value; - result = stateObj.TryReadUInt16(out value); - if (result != TdsOperationStatus.Done) + if (stateObj.IsSnapshotContinuing()) { - tokenLength = 0; - return result; + tokenLength = stateObj.GetSnapshotStorageLength(); + Debug.Assert(tokenLength != 0, "stored buffer length on continue must contain the length of the data required for the token"); + } + else + { + ushort value; + result = stateObj.TryReadUInt16(out value); + if (result != TdsOperationStatus.Done) + { + tokenLength = 0; + return result; + } + tokenLength = value; } - tokenLength = value; return TdsOperationStatus.Done; } else if (0 == (token & 0x0c)) @@ -13415,6 +13452,85 @@ internal int ReadPlpUnicodeChars(ref char[] buff, int offst, int len, TdsParserS return charsRead; } + internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObject stateObj, int length, out string resultString) + { + resultString = null; + char[] temp = null; + bool buffIsRented = false; + int startOffset = 0; + (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + + if (isAvailable) + { + if (isContinuing || isStarting) + { + temp = stateObj.TryTakeSnapshotStorage() as char[]; + Debug.Assert(temp == null || length == int.MaxValue || temp.Length == length, "stored buffer length must be null or must have been created with the correct length"); + } + if (temp != null) + { + startOffset = stateObj.GetSnapshotTotalSize(); + } + } + + TdsOperationStatus result = TryReadPlpUnicodeChars( + ref temp, + 0, + length >> 1, + stateObj, + out length, + supportRentedBuff: !isAvailable, // do not use the arraypool if we are going to keep the buffer in the snapshot + rentedBuff: ref buffIsRented, + startOffset, + isStarting || isContinuing + ); + + if (result == TdsOperationStatus.Done) + { + if (length > 0) + { + resultString = new string(temp, 0, length); + } + else + { + resultString = string.Empty; + } + + if (buffIsRented) + { + // do not use clearArray:true on the rented array because it can be massively larger + // than the space we've used and we would incur performance clearing memory that + // we haven't used and can't leak out information. + // clear only the length that we know we have used. + temp.AsSpan(0, length).Clear(); + ArrayPool.Shared.Return(temp, clearArray: false); + temp = null; + } + } + else if (result == TdsOperationStatus.NeedMoreData) + { + if (isStarting || isContinuing) + { + stateObj.SetSnapshotStorage(temp); + } + } + + return result; + } + + internal TdsOperationStatus TryReadPlpUnicodeChars( + ref char[] buff, + int offst, + int len, + TdsParserStateObject stateObj, + out int totalCharsRead, + bool supportRentedBuff, + ref bool rentedBuff + ) + { + return TryReadPlpUnicodeChars(ref buff, offst, len, stateObj, out totalCharsRead, supportRentedBuff, ref rentedBuff, 0, false); + } + // Reads the requested number of chars from a plp data stream, or the entire data if // requested length is -1 or larger than the actual length of data. First call to this method // should be preceeded by a call to ReadPlpLength or ReadDataLength. @@ -13426,12 +13542,14 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( TdsParserStateObject stateObj, out int totalCharsRead, bool supportRentedBuff, - ref bool rentedBuff) + ref bool rentedBuff, + int startOffsetByteCount, + bool writeDataSizeToSnapshot + ) { int charsRead = 0; int charsLeft = 0; - char[] newbuf; - + if (stateObj._longlen == 0) { Debug.Assert(stateObj._longlenleft == 0); @@ -13440,8 +13558,14 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( } Debug.Assert((ulong)stateObj._longlen != TdsEnums.SQL_PLP_NULL, "Out of sync plp read request"); - - Debug.Assert((buff == null && offst == 0) || (buff.Length >= offst + len), "Invalid length sent to ReadPlpUnicodeChars()!"); + Debug.Assert( + (buff == null && offst == 0) + || + (buff.Length >= offst + len) + || + (buff.Length == (startOffsetByteCount >> 1) + 1), + "Invalid length sent to ReadPlpUnicodeChars()!" + ); charsLeft = len; // If total length is known up front, the length isn't specified as unknown @@ -13462,6 +13586,11 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( } TdsOperationStatus result; + + bool partialReadInProgress = (startOffsetByteCount & 0x1) == 1; + bool restartingDataSizeCount = startOffsetByteCount == 0; + int currentPacketId = 0; + if (stateObj._longlenleft == 0) { result = stateObj.TryReadPlpLength(false, out _); @@ -13477,63 +13606,101 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( } } - totalCharsRead = 0; + totalCharsRead = (startOffsetByteCount >> 1); + charsLeft -= totalCharsRead; + offst = totalCharsRead; + + while (charsLeft > 0) { - charsRead = (int)Math.Min((stateObj._longlenleft + 1) >> 1, (ulong)charsLeft); - if ((buff == null) || (buff.Length < (offst + charsRead))) + if (!partialReadInProgress) { - bool returnRentedBufferAfterCopy = rentedBuff; - if (supportRentedBuff && (offst + charsRead) < 1073741824) // 1 Gib + charsRead = (int)Math.Min((stateObj._longlenleft + 1) >> 1, (ulong)charsLeft); + if ((buff == null) || (buff.Length < (offst + charsRead))) { - newbuf = ArrayPool.Shared.Rent(offst + charsRead); - rentedBuff = true; - } - else - { - newbuf = new char[offst + charsRead]; - rentedBuff = false; - } + char[] newbuf; + bool returnRentedBufferAfterCopy = rentedBuff; + if (supportRentedBuff && (offst + charsRead) < 1073741824) // 1 Gib + { + newbuf = ArrayPool.Shared.Rent(offst + charsRead); + rentedBuff = true; + } + else + { + newbuf = new char[offst + charsRead]; + rentedBuff = false; + } - if (buff != null) - { - Buffer.BlockCopy(buff, 0, newbuf, 0, offst * 2); - if (returnRentedBufferAfterCopy) + if (buff != null) { - buff.AsSpan(0, offst).Clear(); - ArrayPool.Shared.Return(buff, clearArray: false); + Buffer.BlockCopy(buff, 0, newbuf, 0, offst * 2); + if (returnRentedBufferAfterCopy) + { + buff.AsSpan(0, offst).Clear(); + ArrayPool.Shared.Return(buff, clearArray: false); + } } + buff = newbuf; + newbuf = null; } - buff = newbuf; - } - if (charsRead > 0) - { - result = TryReadPlpUnicodeCharsChunk(buff, offst, charsRead, stateObj, out charsRead); - if (result != TdsOperationStatus.Done) + if (charsRead > 0) { - return result; + result = TryReadPlpUnicodeCharsChunk(buff, offst, charsRead, stateObj, out charsRead); + if (result != TdsOperationStatus.Done) + { + return result; + } + charsLeft -= charsRead; + offst += charsRead; + totalCharsRead += charsRead; + + if (writeDataSizeToSnapshot) + { + currentPacketId = IncrementSnapshotDataSize(stateObj, restartingDataSizeCount, currentPacketId, charsRead * 2); + } } - charsLeft -= charsRead; - offst += charsRead; - totalCharsRead += charsRead; } - // Special case single byte left - if (stateObj._longlenleft == 1 && (charsLeft > 0)) + + // Special case single byte + if ( + (stateObj._longlenleft == 1 || partialReadInProgress ) + && (charsLeft > 0) + ) { - byte b1; - result = stateObj.TryReadByte(out b1); - if (result != TdsOperationStatus.Done) + byte b1=0; + byte b2=0; + if (partialReadInProgress) { - return result; + partialReadInProgress = false; + // we're resuming with a partial char in the buffer so we need to load the byte + // from the char buffer and put it into b1 so we can combine it with the second + // half later + b1 = (byte)(buff[offst] & 0x00ff); } - stateObj._longlenleft--; - result = stateObj.TryReadPlpLength(false, out _); - if (result != TdsOperationStatus.Done) + else { - return result; + result = stateObj.TryReadByte(out b1); + if (result != TdsOperationStatus.Done) + { + return result; + } + stateObj._longlenleft--; + if (writeDataSizeToSnapshot) + { + // we need to write the single b1 byte to the array because we may run out of data + // and need to wait for another packet + buff[offst] = (char)((b1 & 0xff)); + currentPacketId = IncrementSnapshotDataSize(stateObj, restartingDataSizeCount, currentPacketId, 1); + } + + result = stateObj.TryReadPlpLength(false, out _); + if (result != TdsOperationStatus.Done) + { + return result; + } + Debug.Assert((stateObj._longlenleft != 0), "ReadPlpUnicodeChars: Odd byte left at the end!"); } - Debug.Assert((stateObj._longlenleft != 0), "ReadPlpUnicodeChars: Odd byte left at the end!"); - byte b2; + result = stateObj.TryReadByte(out b2); if (result != TdsOperationStatus.Done) { @@ -13546,6 +13713,11 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( charsRead++; charsLeft--; totalCharsRead++; + + if (writeDataSizeToSnapshot) + { + currentPacketId = IncrementSnapshotDataSize(stateObj, restartingDataSizeCount, currentPacketId, 1); + } } if (stateObj._longlenleft == 0) { @@ -13558,9 +13730,41 @@ internal TdsOperationStatus TryReadPlpUnicodeChars( } if (stateObj._longlenleft == 0) // Data read complete + { break; + } } return TdsOperationStatus.Done; + + static int IncrementSnapshotDataSize(TdsParserStateObject stateObj, bool resetting, int previousPacketId, int value) + { + int current = 0; + if (resetting) + { + int currentPacketId = stateObj.GetSnapshotPacketID(); + if (previousPacketId == currentPacketId) + { + // we have already reset it the first time we saw it so just add normally + current = stateObj.GetSnapshotDataSize(); + } + else + { + // a packet we haven't seen before, reset the size + current = 0; + } + + stateObj.SetSnapshotDataSize(current + value); + + // return new packetid so next time we see this packet we know it isn't new + return currentPacketId; + } + else + { + current = stateObj.GetSnapshotDataSize(); + stateObj.SetSnapshotDataSize(current + value); + return previousPacketId; + } + } } internal int ReadPlpAnsiChars(ref char[] buff, int offst, int len, SqlMetaDataPriv metadata, TdsParserStateObject stateObj) diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs index b66154a2ae..23eae838ae 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs @@ -21,6 +21,7 @@ private enum Tristate : byte internal const string UseMinimumLoginTimeoutString = @"Switch.Microsoft.Data.SqlClient.UseOneSecFloorInTimeoutCalculationDuringLogin"; internal const string LegacyVarTimeZeroScaleBehaviourString = @"Switch.Microsoft.Data.SqlClient.LegacyVarTimeZeroScaleBehaviour"; internal const string UseCompatibilityProcessSniString = @"Switch.Microsoft.Data.SqlClient.UseCompatibilityProcessSni"; + internal const string UseCompatibilityAsyncBehaviourString = @"Switch.Microsoft.Data.SqlClient.UseCompatibilityAsyncBehaviour"; // this field is accessed through reflection in tests and should not be renamed or have the type changed without refactoring NullRow related tests private static Tristate s_legacyRowVersionNullBehavior; @@ -30,6 +31,7 @@ private enum Tristate : byte // this field is accessed through reflection in Microsoft.Data.SqlClient.Tests.SqlParameterTests and should not be renamed or have the type changed without refactoring related tests private static Tristate s_legacyVarTimeZeroScaleBehaviour; private static Tristate s_useCompatProcessSni; + private static Tristate s_useCompatAsyncBehaviour; #if NET static LocalAppContextSwitches() @@ -85,6 +87,12 @@ public static bool DisableTNIRByDefault } } #endif + /// + /// In TdsParser the ProcessSni function changed significantly when the packet + /// multiplexing code needed for high speed multi-packet column values was added. + /// In case of compatibility problems this switch will change TdsParser to use + /// the previous version of the function. + /// public static bool UseCompatibilityProcessSni { get @@ -104,6 +112,42 @@ public static bool UseCompatibilityProcessSni } } + /// + /// In TdsParser the async multi-packet column value fetch behaviour is capable of + /// using a continue snapshot state in addition to the original replay from start + /// logic. + /// This switch disables use of the continue snapshot state. This switch will always + /// return true if is enabled because the + /// continue state is not stable without the multiplexer. + /// + public static bool UseCompatibilityAsyncBehaviour + { + get + { + if (UseCompatibilityProcessSni) + { + // If ProcessSni compatibility mode has been enabled then the packet + // multiplexer has been disabled. The new async behaviour using continue + // point capture is only stable if the multiplexer is enabled so we must + // return true to enable compatibility async behaviour using only restarts. + return true; + } + + if (s_useCompatAsyncBehaviour == Tristate.NotInitialized) + { + if (AppContext.TryGetSwitch(UseCompatibilityAsyncBehaviourString, out bool returnedValue) && returnedValue) + { + s_useCompatAsyncBehaviour = Tristate.True; + } + else + { + s_useCompatAsyncBehaviour = Tristate.False; + } + } + return s_useCompatAsyncBehaviour == Tristate.True; + } + } + /// /// When using Encrypt=false in the connection string, a security warning is output to the console if the TLS version is 1.2 or lower. /// This warning can be suppressed by enabling this AppContext switch. diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs index 2b656501a5..1f1670b97f 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs @@ -37,11 +37,26 @@ private SqlCachedBuffer(List cachedBytes) /// internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser parser, TdsParserStateObject stateObj, out SqlCachedBuffer buffer) { - byte[] byteArr; - - List cachedBytes = new(); buffer = null; + (bool isAvailable, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses(); + + List cachedBytes = null; + if (isAvailable) + { + cachedBytes = stateObj.TryTakeSnapshotStorage() as List; + if (cachedBytes != null && !isStarting && !isContinuing) + { + stateObj.SetSnapshotStorage(null); + } + } + + if (cachedBytes == null) + { + cachedBytes = new List(); + } + + // the very first length is already read. TdsOperationStatus result = parser.TryPlpBytesLeft(stateObj, out ulong plplength); if (result != TdsOperationStatus.Done) @@ -49,6 +64,7 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser return result; } + // For now we only handle Plp data from the parser directly. Debug.Assert(metadata.metaType.IsPlp, "SqlCachedBuffer call on a non-plp data"); do @@ -59,13 +75,25 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser } do { + bool returnAfterAdd = false; int cb = (plplength > (ulong)MaxChunkSize) ? MaxChunkSize : (int)plplength; - byteArr = new byte[cb]; - result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb); + byte[] byteArr = new byte[cb]; + // pass false for the writeDataSizeToSnapshot parameter because we want to only take data + // from the current packet and not try to do a continue-capable multi packet read + result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, writeDataSizeToSnapshot: false, compatibilityMode: false); if (result != TdsOperationStatus.Done) { - return result; + if (result == TdsOperationStatus.NeedMoreData && isAvailable && cb == byteArr.Length) + { + // succeeded in getting the data but failed to find the next plp length + returnAfterAdd = true; + } + else + { + return result; + } } + Debug.Assert(cb == byteArr.Length); if (cachedBytes.Count == 0) { @@ -74,6 +102,16 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser } cachedBytes.Add(byteArr); plplength -= (ulong)cb; + + if (returnAfterAdd) + { + if (isStarting || isContinuing) + { + stateObj.SetSnapshotStorage(cachedBytes); + } + return result; + } + } while (plplength > 0); result = parser.TryPlpBytesLeft(stateObj, out plplength); diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs index 6cd3900906..e9a3552008 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @@ -69,7 +69,8 @@ private enum SnapshotStatus { NotActive, ReplayStarting, - ReplayRunning + ReplayRunning, + ContinueRunning } private const int AttentionTimeoutSeconds = 5; @@ -1199,12 +1200,17 @@ internal TdsOperationStatus TryPeekByte(out byte value) // bytes from the in buffer. public TdsOperationStatus TryReadByteArray(Span buff, int len) { - return TryReadByteArray(buff, len, out _); + return TryReadByteArray(buff, len, out _, 0, false); + } + + public TdsOperationStatus TryReadByteArray(Span buff, int len, out int totalRead) + { + return TryReadByteArray(buff, len, out totalRead, 0, false); } // NOTE: This method must be retriable WITHOUT replaying a snapshot // Every time you call this method increment the offset and decrease len by the value of totalRead - public TdsOperationStatus TryReadByteArray(Span buff, int len, out int totalRead) + public TdsOperationStatus TryReadByteArray(Span buff, int len, out int totalRead, int startOffset, bool writeDataSizeToSnapshot) { totalRead = 0; @@ -1229,6 +1235,10 @@ public TdsOperationStatus TryReadByteArray(Span buff, int len, out int tot Debug.Assert(buff.IsEmpty || buff.Length >= len, "Invalid length sent to ReadByteArray()!"); + + totalRead += startOffset; + len -= startOffset; + // loop through and read up to array length while (len > 0) { @@ -1255,6 +1265,11 @@ public TdsOperationStatus TryReadByteArray(Span buff, int len, out int tot _inBytesPacket -= bytesToRead; len -= bytesToRead; + if (writeDataSizeToSnapshot) + { + SetSnapshotDataSize(bytesToRead); + } + AssertValidState(); } @@ -1669,6 +1684,7 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En } byte[] buf = null; int offset = 0; + (bool isAvailable, bool isStarting, bool isContinuing) = GetSnapshotStatuses(); if (isPlp) { @@ -1685,21 +1701,40 @@ internal TdsOperationStatus TryReadStringWithEncoding(int length, System.Text.En { if (((_inBytesUsed + length) > _inBytesRead) || (_inBytesPacket < length)) { - if (_bTmp == null || _bTmp.Length < length) + int startOffset = 0; + if (isAvailable) { - _bTmp = new byte[length]; + if (isContinuing || isStarting) + { + buf = TryTakeSnapshotStorage() as byte[]; + Debug.Assert(buf == null || buf.Length == length, "stored buffer length must be null or must have been created with the correct length"); + } + if (buf != null) + { + startOffset = GetSnapshotTotalSize(); + } + } + + if (buf == null || buf.Length < length) + { + buf = new byte[length]; } - TdsOperationStatus result = TryReadByteArray(_bTmp, length); + TdsOperationStatus result = TryReadByteArray(buf, length, out _, startOffset, isAvailable); + if (result != TdsOperationStatus.Done) { + if (result == TdsOperationStatus.NeedMoreData) + { + if (isStarting || isContinuing) + { + SetSnapshotStorage(buf); + } + } value = null; return result; } - // assign local to point to parser scratch buffer - buf = _bTmp; - AssertValidState(); } else @@ -1813,18 +1848,26 @@ internal int ReadPlpBytesChunk(byte[] buff, int offset, int len) return value; } + internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead) + { + bool isStarting = false; + bool isContinuing = false; + bool compatibilityMode = LocalAppContextSwitches.UseCompatibilityAsyncBehaviour; + if (!compatibilityMode) + { + (_, isStarting, isContinuing) = GetSnapshotStatuses(); + } + return TryReadPlpBytes(ref buff, offset, len, out totalBytesRead, isStarting || isContinuing, compatibilityMode); + } // Reads the requested number of bytes from a plp data stream, or the entire data if // requested length is -1 or larger than the actual length of data. First call to this method // should be preceeded by a call to ReadPlpLength or ReadDataLength. // Returns the actual bytes read. // NOTE: This method must be retriable WITHOUT replaying a snapshot // Every time you call this method increment the offset and decrease len by the value of totalBytesRead - internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead) + internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len, out int totalBytesRead, bool writeDataSizeToSnapshot, bool compatibilityMode) { totalBytesRead = 0; - int bytesRead; - int bytesLeft; - byte[] newbuf; if (_longlen == 0) { @@ -1842,17 +1885,28 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len Debug.Assert(_longlen != TdsEnums.SQL_PLP_NULL, "Out of sync plp read request"); Debug.Assert((buff == null && offset == 0) || (buff.Length >= offset + len), "Invalid length sent to ReadPlpBytes()!"); - bytesLeft = len; + int bytesLeft = len; // If total length is known up front, allocate the whole buffer in one shot instead of realloc'ing and copying over each time if (buff == null && _longlen != TdsEnums.SQL_PLP_UNKNOWNLEN) { - if (_snapshot != null && _snapshotStatus != SnapshotStatus.NotActive) + if (writeDataSizeToSnapshot) + { + // if there is a snapshot and it contains a stored plp buffer take it + // and try to use it if it is the right length + buff = TryTakeSnapshotStorage() as byte[]; + if (buff != null) + { + offset = _snapshot.GetPacketDataOffset(); + totalBytesRead = offset; + } + } + else if (compatibilityMode && _snapshot != null && _snapshotStatus != SnapshotStatus.NotActive) { + // legacy replay path perf optimization // if there is a snapshot and it contains a stored plp buffer take it // and try to use it if it is the right length - buff = _snapshot._plpBuffer; - _snapshot._plpBuffer = null; + buff = TryTakeSnapshotStorage() as byte[]; } if ((ulong)(buff?.Length ?? 0) != _longlen) @@ -1882,18 +1936,20 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len { buff = new byte[_longlenleft]; } + while (bytesLeft > 0) { int bytesToRead = (int)Math.Min(_longlenleft, (ulong)bytesLeft); if (buff.Length < (offset + bytesToRead)) { // Grow the array - newbuf = new byte[offset + bytesToRead]; + byte[] newbuf = new byte[offset + bytesToRead]; Buffer.BlockCopy(buff, 0, newbuf, 0, offset); buff = newbuf; + newbuf = null; } - TdsOperationStatus result = TryReadByteArray(buff.AsSpan(offset), bytesToRead, out bytesRead); + TdsOperationStatus result = TryReadByteArray(buff.AsSpan(offset), bytesToRead, out int bytesRead); Debug.Assert(bytesRead <= bytesLeft, "Read more bytes than we needed"); Debug.Assert((ulong)bytesRead <= _longlenleft, "Read more bytes than is available"); @@ -1903,11 +1959,20 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len _longlenleft -= (ulong)bytesRead; if (result != TdsOperationStatus.Done) { - if (_snapshot != null) + if (writeDataSizeToSnapshot) + { + // a partial read has happened so store the target buffer in the snapshot + // so it can be re-used when another packet arrives and we read again + SetSnapshotStorage(buff); + SetSnapshotDataSize(bytesRead); + + } + else if (compatibilityMode && _snapshot != null) { + // legacy replay path perf optimization // a partial read has happened so store the target buffer in the snapshot // so it can be re-used when another packet arrives and we read again - _snapshot._plpBuffer = buff; + SetSnapshotStorage(buff); } return result; } @@ -1918,11 +1983,19 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len result = TryReadPlpLength(false, out _); if (result != TdsOperationStatus.Done) { - if (_snapshot != null) + if (writeDataSizeToSnapshot) + { + if (result == TdsOperationStatus.NeedMoreData) + { + SetSnapshotStorage(buff); + SetSnapshotDataSize(bytesRead); + } + } + else if (compatibilityMode && _snapshot != null) { // a partial read has happened so store the target buffer in the snapshot // so it can be re-used when another packet arrives and we read again - _snapshot._plpBuffer = buff; + SetSnapshotStorage(buff); } return result; } @@ -1932,7 +2005,9 @@ internal TdsOperationStatus TryReadPlpBytes(ref byte[] buff, int offset, int len // Catch the point where we read the entire plp data stream and clean up state if (_longlenleft == 0) // Data read complete + { break; + } } return TdsOperationStatus.Done; } @@ -1997,6 +2072,18 @@ internal TdsOperationStatus TryReadNetworkPacket() stackTrace = Environment.StackTrace; } #endif + bool capturedAsContinue = false; + if (_snapshotStatus == SnapshotStatus.ReplayRunning || _snapshotStatus == SnapshotStatus.ReplayStarting) + { + if (_bTmpRead == 0 && _partialHeaderBytesRead == 0 && _longlenleft == 0 && _snapshot.ContinueEnabled) + { + // no temp between packets + // mark this point as continue-able + _snapshot.CaptureAsContinue(this); + capturedAsContinue = true; + } + } + if (_snapshot.MoveNext()) { #if DEBUG @@ -2015,6 +2102,13 @@ internal TdsOperationStatus TryReadNetworkPacket() _lastStack = stackTrace; } #endif + if (_bTmpRead == 0 && _partialHeaderBytesRead == 0 && _longlenleft == 0 && _snapshot.ContinueEnabled && !capturedAsContinue) + { + // no temp between packets + // mark this point as continue-able + _snapshot.CaptureAsContinue(this); + capturedAsContinue = true; + } } } @@ -2063,7 +2157,10 @@ internal TdsOperationStatus TryReadNetworkPacket() internal void PrepareReplaySnapshot() { _networkPacketTaskSource = null; - _snapshot.MoveToStart(); + if (!_snapshot.MoveToContinue()) + { + _snapshot.MoveToStart(); + } } internal void ReadSniSyncOverAsync() @@ -2755,6 +2852,7 @@ internal void SetSnapshot() snapshot.Clear(); } _snapshot = snapshot; + Debug.Assert(_snapshot._storage == null); _snapshot.CaptureAsStart(this); _snapshotStatus = SnapshotStatus.NotActive; } @@ -2765,13 +2863,103 @@ internal void ResetSnapshot() { StateSnapshot snapshot = _snapshot; _snapshot = null; + Debug.Assert(snapshot._storage == null); snapshot.Clear(); Interlocked.CompareExchange(ref _cachedSnapshot, snapshot, null); } _snapshotStatus = SnapshotStatus.NotActive; } - sealed partial class StateSnapshot + internal bool IsSnapshotAvailable() + { + return _snapshot != null && _snapshot.ContinueEnabled; + } + /// + /// Returns true if the state object is in the state of continuing from a previously stored snapshot packet + /// meaning that consumers should resume from the point where they last needed more data instead of beginning + /// to process packets in the snapshot from the beginning again + /// + /// + internal bool IsSnapshotContinuing() + { + return _snapshot != null && + _snapshot.ContinueEnabled && + _snapshotStatus == TdsParserStateObject.SnapshotStatus.ContinueRunning; + } + + internal (bool IsAvailable, bool IsStarting, bool IsContinuing) GetSnapshotStatuses() + { + bool isAvailable = _snapshot != null && _snapshot.ContinueEnabled; + bool isStarting = false; + bool isContinuing = false; + if (isAvailable) + { + isStarting = _snapshotStatus == SnapshotStatus.ReplayStarting; + isContinuing = _snapshotStatus == SnapshotStatus.ContinueRunning; + } + return (isAvailable, isStarting, isContinuing); + } + + internal int GetSnapshotStorageLength() + { + Debug.Assert(_snapshot != null && _snapshot.ContinueEnabled, "should not access snapshot accessor functions without first checking that the snapshot is available"); + return (_snapshot?._storage as IList)?.Count ?? 0; + } + + internal object TryTakeSnapshotStorage() + { + Debug.Assert(_snapshot != null, "should not access snapshot accessor functions without first checking that the snapshot is present"); + object buffer = null; + if (_snapshot != null) + { + buffer = _snapshot._storage; + _snapshot._storage = null; + } + return buffer; + } + + internal void SetSnapshotStorage(object buffer) + { + Debug.Assert(_snapshot != null, "should not access snapshot accessor functions without first checking that the snapshot is available"); + Debug.Assert(_snapshot._storage == null, "should not overwrite snapshot stored buffer"); + if (_snapshot != null) + { + _snapshot._storage = buffer; + } + } + + /// + /// stores the countOfBytesCopiedFromCurrentPacket of bytes copied from the current packet in the snapshot allowing the total + /// countOfBytesCopiedFromCurrentPacket to be calculated + /// + /// + internal void SetSnapshotDataSize(int countOfBytesCopiedFromCurrentPacket) + { + Debug.Assert(_snapshot != null && _snapshot.ContinueEnabled, "_snapshot must exist to store packet data size"); + _snapshot.SetPacketDataSize(countOfBytesCopiedFromCurrentPacket); + } + + internal int GetSnapshotTotalSize() + { + Debug.Assert(_snapshot != null && _snapshot.ContinueEnabled, "_snapshot must exist to read total size"); + Debug.Assert(_snapshotStatus != SnapshotStatus.NotActive, "_snapshot must be active read total size"); + return _snapshot.GetPacketDataOffset(); + } + + internal int GetSnapshotDataSize() + { + Debug.Assert(_snapshot != null && _snapshot.ContinueEnabled, "_snapshot must exist to read packet data size"); + Debug.Assert(_snapshotStatus != SnapshotStatus.NotActive, "_snapshot must be active read packet data size"); + return _snapshot.GetPacketDataSize(); + } + + internal int GetSnapshotPacketID() + { + Debug.Assert(_snapshot != null && _snapshot.ContinueEnabled, "_snapshot must exist to read packet data size"); + return _snapshot.GetPacketID(); + } + + internal sealed partial class StateSnapshot { private sealed partial class PacketData { @@ -2780,6 +2968,33 @@ private sealed partial class PacketData public PacketData NextPacket; public PacketData PrevPacket; + /// + /// Stores the data size of the total snapshot so far so that enumeration is not needed + /// to get the offset of the previous packet data in the stored buffer + /// + public int RunningDataSize; + + public int PacketID => Packet.GetIDFromHeader(Buffer.AsSpan(0, TdsEnums.HEADER_LEN)); + + internal int GetPacketDataOffset() + { + int previous = 0; + if (PrevPacket != null) + { + previous = PrevPacket.RunningDataSize; + } + return previous; + } + internal int GetPacketDataSize() + { + int previous = 0; + if (PrevPacket != null) + { + previous = PrevPacket.RunningDataSize; + } + return Math.Max(RunningDataSize - previous, 0); + } + internal void Clear() { Buffer = null; @@ -2926,6 +3141,8 @@ public string Status public ReadOnlySpan Data => _data.Buffer.AsSpan(TdsEnums.HEADER_LEN); + public int RunningDataSize => _data.RunningDataSize; + public PacketData NextPacket => _data.NextPacket; public PacketData PrevPacket => _data.PrevPacket; } @@ -2934,8 +3151,6 @@ public string Status public string Stack; public byte[] Hash; - public int PacketID => Packet.GetIDFromHeader(Buffer.AsSpan(0, TdsEnums.HEADER_LEN)); - public int SPID => Packet.GetSpidFromHeader(Buffer.AsSpan(0, TdsEnums.HEADER_LEN)); public bool IsEOM => Packet.GetIsEOMFromHeader(Buffer.AsSpan(0, TdsEnums.HEADER_LEN)); @@ -2990,6 +3205,11 @@ partial void CheckDebugDataHashImpl() } } } + + public override string ToString() + { + return $"{PacketID}({GetPacketDataOffset()},{GetPacketDataSize()})"; + } } #endif @@ -3078,12 +3298,14 @@ internal void Restore(TdsParserStateObject stateObj) private TdsParserStateObject _stateObj; private StateObjectData _replayStateData; + private StateObjectData _continueStateData; - internal byte[] _plpBuffer; + internal object _storage; private PacketData _lastPacket; private PacketData _firstPacket; private PacketData _current; + private PacketData _continuePacket; private PacketData _sparePacket; #if DEBUG @@ -3127,6 +3349,7 @@ internal void CheckStack(string trace) } } #endif + public bool ContinueEnabled => !LocalAppContextSwitches.UseCompatibilityAsyncBehaviour; internal void CloneNullBitmapInfo() { @@ -3204,6 +3427,10 @@ internal bool MoveNext() } else if (_current.NextPacket != null) { + if (_stateObj._snapshotStatus == SnapshotStatus.ContinueRunning) + { + moveToMode = SnapshotStatus.ContinueRunning; + } _current = _current.NextPacket; moved = true; } @@ -3211,7 +3438,6 @@ internal bool MoveNext() if (moved) { _stateObj.SetBuffer(_current.Buffer, 0, _current.Read); - _current.CheckDebugDataHash(); _stateObj._snapshotStatus = moveToMode; retval = true; } @@ -3228,6 +3454,22 @@ internal void MoveToStart() _stateObj.AssertValidState(); } + internal bool MoveToContinue() + { + if (ContinueEnabled) + { + if (_continuePacket != null && _continuePacket != _current) + { + _continueStateData.Restore(_stateObj); + _stateObj.SetBuffer(_current.Buffer, 0, _current.Read); + _stateObj._snapshotStatus = SnapshotStatus.ContinueRunning; + _stateObj.AssertValidState(); + return true; + } + } + return false; + } + internal void CaptureAsStart(TdsParserStateObject stateObj) { _firstPacket = null; @@ -3248,6 +3490,76 @@ internal void CaptureAsStart(TdsParserStateObject stateObj) AppendPacketData(stateObj._inBuff, stateObj._inBytesRead); } + internal void CaptureAsContinue(TdsParserStateObject stateObj) + { + if (ContinueEnabled) + { + Debug.Assert(_stateObj == stateObj); + if (_current is not null) + { + _continueStateData ??= new StateObjectData(); + _continueStateData.Capture(stateObj, trackStack: false); + _continuePacket = _current; + } + } + } + + internal void SetPacketDataSize(int size) + { + PacketData target = _current; + // special case for the start of a snapshot when we expect to have only a single packet + // but have no current packet because we haven't started to replay yet. + if ( + target == null && + _firstPacket != null && + _firstPacket == _lastPacket + ) + { + target = _firstPacket; + } + + if (target == null) + { + throw new InvalidOperationException(); + } + int total = 0; + if (target.PrevPacket != null) + { + total = target.PrevPacket.RunningDataSize; + } + target.RunningDataSize = total + size; + } + + internal int GetPacketDataOffset() + { + int offset = 0; + if (_current != null) + { + offset = _current.GetPacketDataOffset(); + } + return offset; + } + + internal int GetPacketDataSize() + { + int offset = 0; + if (_current != null) + { + offset = _current.GetPacketDataSize(); + } + return offset; + } + + internal int GetPacketID() + { + int id = 0; + if (_current != null) + { + id = _current.PacketID; + } + return id; + } + internal void Clear() { ClearState(); @@ -3259,6 +3571,7 @@ private void ClearPackets() PacketData packet = _firstPacket; _firstPacket = null; _lastPacket = null; + _continuePacket = null; _current = null; packet.Clear(); _sparePacket = packet; @@ -3266,7 +3579,10 @@ private void ClearPackets() private void ClearState() { + Debug.Assert(_storage == null); + _storage = null; _replayStateData.Clear(_stateObj); + _continueStateData?.Clear(_stateObj, trackStack: false); #if DEBUG _rollingPend = 0; _rollingPendCount = 0; diff --git a/src/Microsoft.Data.SqlClient/tests/FunctionalTests/TdsParserStateObject.TestHarness.cs b/src/Microsoft.Data.SqlClient/tests/FunctionalTests/TdsParserStateObject.TestHarness.cs index eded4d1986..b746e7c0ad 100644 --- a/src/Microsoft.Data.SqlClient/tests/FunctionalTests/TdsParserStateObject.TestHarness.cs +++ b/src/Microsoft.Data.SqlClient/tests/FunctionalTests/TdsParserStateObject.TestHarness.cs @@ -159,6 +159,16 @@ public static bool UseCompatibilityProcessSni return (bool)switchesType.GetProperty(nameof(UseCompatibilityProcessSni), BindingFlags.Public | BindingFlags.Static).GetValue(null); } } + + public static bool UseCompatibilityAsyncBehaviour + { + get + { + var switchesType = typeof(SqlCommand).Assembly.GetType("Microsoft.Data.SqlClient.LocalAppContextSwitches"); + + return (bool)switchesType.GetProperty(nameof(UseCompatibilityAsyncBehaviour), BindingFlags.Public | BindingFlags.Static).GetValue(null); + } + } } #if NETFRAMEWORK diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/Common/SystemDataInternals/TdsParserStateObjectHelper.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/Common/SystemDataInternals/TdsParserStateObjectHelper.cs index 2d6e234f55..9f484093b1 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/Common/SystemDataInternals/TdsParserStateObjectHelper.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/Common/SystemDataInternals/TdsParserStateObjectHelper.cs @@ -3,22 +3,38 @@ // See the LICENSE file in the project root for more information. using System; +using System.Diagnostics; using System.Reflection; namespace Microsoft.Data.SqlClient.ManualTesting.Tests.SystemDataInternals { internal static class TdsParserStateObjectHelper { - private static readonly Assembly s_systemDotData = typeof(Microsoft.Data.SqlClient.SqlConnection).GetTypeInfo().Assembly; - private static readonly Type s_tdsParserStateObject = s_systemDotData.GetType("Microsoft.Data.SqlClient.TdsParserStateObject"); - private static readonly FieldInfo s_forceAllPends = s_tdsParserStateObject.GetField("s_forceAllPends", BindingFlags.Static | BindingFlags.NonPublic); - private static readonly FieldInfo s_skipSendAttention = s_tdsParserStateObject.GetField("s_skipSendAttention", BindingFlags.Static | BindingFlags.NonPublic); - private static readonly FieldInfo s_forceSyncOverAsyncAfterFirstPend = s_tdsParserStateObject.GetField("s_forceSyncOverAsyncAfterFirstPend", BindingFlags.Static | BindingFlags.NonPublic); - private static readonly FieldInfo s_failAsyncPends = s_tdsParserStateObject.GetField("s_failAsyncPends", BindingFlags.Static | BindingFlags.NonPublic); - private static readonly FieldInfo s_forcePendingReadsToWaitForUser = s_tdsParserStateObject.GetField("s_forcePendingReadsToWaitForUser", BindingFlags.Static | BindingFlags.NonPublic); - private static readonly Type s_tdsParserStateObjectManaged = s_systemDotData.GetType("Microsoft.Data.SqlClient.SNI.TdsParserStateObjectManaged"); - private static readonly FieldInfo s_tdsParserStateObjectManagedSessionHandle = s_tdsParserStateObjectManaged.GetField("_sessionHandle", BindingFlags.Instance | BindingFlags.NonPublic); + private static readonly Assembly s_systemDotData; + private static readonly Type s_tdsParserStateObject; + private static readonly FieldInfo s_forceAllPends; + private static readonly FieldInfo s_skipSendAttention; + private static readonly FieldInfo s_forceSyncOverAsyncAfterFirstPend; + private static readonly FieldInfo s_failAsyncPends; + private static readonly FieldInfo s_forcePendingReadsToWaitForUser; + private static readonly Type s_tdsParserStateObjectManaged; + private static readonly FieldInfo s_tdsParserStateObjectManagedSessionHandle; + static TdsParserStateObjectHelper() + { + s_systemDotData = typeof(Microsoft.Data.SqlClient.SqlConnection).GetTypeInfo().Assembly; + s_tdsParserStateObject = s_systemDotData.GetType("Microsoft.Data.SqlClient.TdsParserStateObject"); + s_forceAllPends = s_tdsParserStateObject.GetField("s_forceAllPends", BindingFlags.Static | BindingFlags.NonPublic); + s_skipSendAttention = s_tdsParserStateObject.GetField("s_skipSendAttention", BindingFlags.Static | BindingFlags.NonPublic); + s_forceSyncOverAsyncAfterFirstPend = s_tdsParserStateObject.GetField("s_forceSyncOverAsyncAfterFirstPend", BindingFlags.Static | BindingFlags.NonPublic); + s_failAsyncPends = s_tdsParserStateObject.GetField("s_failAsyncPends", BindingFlags.Static | BindingFlags.NonPublic); + s_forcePendingReadsToWaitForUser = s_tdsParserStateObject.GetField("s_forcePendingReadsToWaitForUser", BindingFlags.Static | BindingFlags.NonPublic); + s_tdsParserStateObjectManaged = s_systemDotData.GetType("Microsoft.Data.SqlClient.SNI.TdsParserStateObjectManaged"); + if (s_tdsParserStateObjectManaged != null) + { + s_tdsParserStateObjectManagedSessionHandle = s_tdsParserStateObjectManaged.GetField("_sessionHandle", BindingFlags.Instance | BindingFlags.NonPublic); + } + } internal static bool ForceAllPends { get { return (bool)s_forceAllPends.GetValue(null); } @@ -52,9 +68,17 @@ internal static bool FailAsyncPends private static void VerifyObjectIsTdsParserStateObject(object stateObject) { if (stateObject == null) + { throw new ArgumentNullException(nameof(stateObject)); + } + if (s_tdsParserStateObjectManaged == null) + { + throw new ArgumentException("Library being tested does not implement TdsParserStateObjectManaged", nameof(stateObject)); + } if (!s_tdsParserStateObjectManaged.IsInstanceOfType(stateObject)) + { throw new ArgumentException("Object provided was not a TdsParserStateObjectManaged", nameof(stateObject)); + } } internal static object GetSessionHandle(object stateObject) diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderStreamsTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderStreamsTest.cs index a180c21b3b..adc20b5e92 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderStreamsTest.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderStreamsTest.cs @@ -20,7 +20,7 @@ public static class DataReaderStreamsTest [MemberData(nameof(GetCommandBehavioursAndIsAsync))] public static async Task GetFieldValueAsync_OfStream(CommandBehavior behavior, bool isExecuteAsync) { - const int PacketSize = 512; // force minimun packet size so that the test data spans multiple packets to test sequential access spanning + const int PacketSize = 512; // force minimum packet size so that the test data spans multiple packets to test sequential access spanning string connectionString = SetConnectionStringPacketSize(DataTestUtility.TCPConnectionString, PacketSize); byte[] originalData = CreateBinaryData(PacketSize, forcedPacketCount: 4); string query = CreateBinaryDataQuery(originalData); diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs index d00ea1d226..8a04ba7e66 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/DataReaderTest/DataReaderTest.cs @@ -9,6 +9,7 @@ using System.Reflection; using System.Text; using System.Threading; +using System.Threading.Tasks; using Xunit; namespace Microsoft.Data.SqlClient.ManualTesting.Tests @@ -300,6 +301,105 @@ public static void CheckNullRowVersionIsBDNull() } } + [ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))] + public static int CanReadEmployeesTableCompletely() + { + int counter = 0; + + using (var conn = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + using (var cmd = new SqlCommand("SELECT EmployeeID,LastName,FirstName,Title,TitleOfCourtesy,BirthDate,HireDate,Address,City,Region,PostalCode,Country,HomePhone,Extension,Photo,Notes,ReportsTo,PhotoPath FROM Employees WHERE ReportsTo = @p0 OR (ReportsTo IS NULL AND @p0 IS NULL)", conn)) + { + cmd.Parameters.AddWithValue("@p0", 5); + + conn.Open(); + + using (var reader = cmd.ExecuteReader()) + { + if (reader.Read()) + { + for (int index = 0; index < reader.FieldCount; index++) + { + if (!reader.IsDBNull(index)) + { + object value = reader[index]; + counter += 1; + } + } + } + } + } + } + + return counter; + } + + [ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))] + public static async Task CanReadEmployeesTableCompletelyAsync() + { + int counter = 0; + + using (var conn = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + using (var cmd = new SqlCommand("SELECT EmployeeID,LastName,FirstName,Title,TitleOfCourtesy,BirthDate,HireDate,Address,City,Region,PostalCode,Country,HomePhone,Extension,Photo,Notes,ReportsTo,PhotoPath FROM Employees WHERE ReportsTo = @p0 OR (ReportsTo IS NULL AND @p0 IS NULL)", conn)) + { + cmd.Parameters.AddWithValue("@p0", 5); + + await conn.OpenAsync(); + + using (var reader = await cmd.ExecuteReaderAsync()) + { + if (await reader.ReadAsync()) + { + for (int index = 0; index < reader.FieldCount; index++) + { + if (!await reader.IsDBNullAsync(index)) + { + object value = reader[index]; + counter += 1; + } + } + } + } + } + } + + return counter; + } + + [ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))] + public static async Task CanReadEmployeesTableCompletelyWithNullImageAsync() + { + int counter = 0; + + using (var conn = new SqlConnection(DataTestUtility.TCPConnectionString)) + { + using (var cmd = new SqlCommand("SELECT EmployeeID,LastName,FirstName,Title,TitleOfCourtesy,BirthDate,HireDate,Address,City,Region,PostalCode,Country,HomePhone,Extension,convert(image,NULL) as Photo,Notes,ReportsTo,PhotoPath FROM Employees WHERE ReportsTo = @p0 OR (ReportsTo IS NULL AND @p0 IS NULL)", conn)) + { + cmd.Parameters.AddWithValue("@p0", 5); + + await conn.OpenAsync(); + + using (var reader = await cmd.ExecuteReaderAsync()) + { + if (await reader.ReadAsync()) + { + for (int index = 0; index < reader.FieldCount; index++) + { + if (!await reader.IsDBNullAsync(index)) + { + object value = reader[index]; + counter += 1; + } + } + } + } + } + } + + return counter; + } + // Synapse: Cannot find data type 'rowversion'. [ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup), nameof(DataTestUtility.IsNotAzureSynapse))] public static void CheckLegacyNullRowVersionIsEmptyArray()