Skip to content

Commit 7278243

Browse files
author
David Engel
authored
Address MARS TDS header contained errors (#490)
1 parent c5a825b commit 7278243

File tree

4 files changed

+267
-5
lines changed

4 files changed

+267
-5
lines changed

src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2935,15 +2935,16 @@ private bool TryProcessDone(SqlCommand cmd, SqlDataReader reader, ref RunBehavio
29352935
}
29362936
}
29372937

2938+
// _attentionSent set by 'SendAttention'
29382939
// _pendingData set by e.g. 'TdsExecuteSQLBatch'
29392940
// _hasOpenResult always set to true by 'WriteMarsHeader'
29402941
//
2941-
if (!stateObj.HasPendingData && stateObj.HasOpenResult)
2942+
if (!stateObj._attentionSent && !stateObj.HasPendingData && stateObj.HasOpenResult)
29422943
{
29432944
/*
29442945
Debug.Assert(!((sqlTransaction != null && _distributedTransaction != null) ||
2945-
(_userStartedLocalTransaction != null && _distributedTransaction != null))
2946-
, "ProcessDone - have both distributed and local transactions not null!");
2946+
(_userStartedLocalTransaction != null && _distributedTransaction != null))
2947+
, "ProcessDone - have both distributed and local transactions not null!");
29472948
*/
29482949
// WebData 112722
29492950

@@ -8706,7 +8707,7 @@ internal Task TdsExecuteSQLBatch(string text, int timeout, SqlNotificationReques
87068707
{
87078708
Debug.Assert(!sync, "Should not have gotten a Task when writing in sync mode");
87088709

8709-
// Need to wait for flush - continuation will unlock the connection
8710+
// Need to wait for flush - continuation will unlock the connection
87108711
bool taskReleaseConnectionLock = releaseConnectionLock;
87118712
releaseConnectionLock = false;
87128713
return executeTask.ContinueWith(

src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3326,10 +3326,11 @@ private bool TryProcessDone(SqlCommand cmd, SqlDataReader reader, ref RunBehavio
33263326
}
33273327
}
33283328

3329+
// _attentionSent set by 'SendAttention'
33293330
// _pendingData set by e.g. 'TdsExecuteSQLBatch'
33303331
// _hasOpenResult always set to true by 'WriteMarsHeader'
33313332
//
3332-
if (!stateObj._pendingData && stateObj._hasOpenResult)
3333+
if (!stateObj._attentionSent && !stateObj._pendingData && stateObj._hasOpenResult)
33333334
{
33343335
/*
33353336
Debug.Assert(!((sqlTransaction != null && _distributedTransaction != null) ||

src/Microsoft.Data.SqlClient/tests/ManualTests/Microsoft.Data.SqlClient.ManualTesting.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
<Compile Include="ProviderAgnostic\MultipleResultsTest\MultipleResultsTest.cs" />
100100
<Compile Include="ProviderAgnostic\ReaderTest\ReaderTest.cs" />
101101
<Compile Include="SQL\AsyncTest\AsyncTest.cs" />
102+
<Compile Include="SQL\AsyncTest\AsyncCancelledConnectionsTest.cs" />
102103
<Compile Include="SQL\SqlBulkCopyTest\DataConversionErrorMessageTest.cs" />
103104
<Compile Include="SQL\SqlCommand\SqlCommandCompletedTest.cs" />
104105
<Compile Include="SQL\SqlCommand\SqlCommandCancelTest.cs" />
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Xunit;
9+
using Xunit.Abstractions;
10+
11+
namespace Microsoft.Data.SqlClient.ManualTesting.Tests
12+
{
13+
public class AsyncCancelledConnectionsTest
14+
{
15+
private readonly ITestOutputHelper _output;
16+
private const int NumberOfTasks = 100; // How many attempts to poison the connection pool we will try
17+
private const int NumberOfNonPoisoned = 10; // Number of normal requests for each attempt
18+
19+
public AsyncCancelledConnectionsTest(ITestOutputHelper output)
20+
{
21+
this._output = output;
22+
}
23+
24+
[ActiveIssue(490)] // This test seems to fail regularly in pipelines due to deadlocks. But it's still useful for local testing.
25+
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
26+
public void CancelAsyncConnections()
27+
{
28+
string connectionString = DataTestUtility.TCPConnectionString;
29+
_watch = Stopwatch.StartNew();
30+
_random = new Random(4); // chosen via fair dice role.
31+
ParallelLoopResult results = new ParallelLoopResult();
32+
try
33+
{
34+
// Setup a timer so that we can see what is going on while our tasks run
35+
using (new Timer(TimerCallback, state: null, dueTime: TimeSpan.FromSeconds(5), period: TimeSpan.FromSeconds(5)))
36+
{
37+
results = Parallel.For(
38+
fromInclusive: 0,
39+
toExclusive: NumberOfTasks,
40+
(int i) => DoManyAsync(connectionString).GetAwaiter().GetResult());
41+
}
42+
}
43+
catch (Exception ex)
44+
{
45+
_output.WriteLine(ex.ToString());
46+
}
47+
while (!results.IsCompleted)
48+
{
49+
Thread.Sleep(50);
50+
}
51+
DisplaySummary();
52+
foreach (var detail in _exceptionDetails)
53+
{
54+
_output.WriteLine(detail);
55+
}
56+
Assert.Empty(_exceptionDetails);
57+
}
58+
59+
// Display one row every 5'ish seconds
60+
private void TimerCallback(object state)
61+
{
62+
lock (_lockObject)
63+
{
64+
DisplaySummary();
65+
}
66+
}
67+
68+
private void DisplaySummary()
69+
{
70+
int count;
71+
lock (_exceptionDetails)
72+
{
73+
count = _exceptionDetails.Count;
74+
}
75+
76+
_output.WriteLine($"{_watch.Elapsed} {_continue} Started:{_start} Done:{_done} InFlight:{_inFlight} RowsRead:{_rowsRead} ResultRead:{_resultRead} PoisonedEnded:{_poisonedEnded} nonPoisonedExceptions:{_nonPoisonedExceptions} PoisonedCleanupExceptions:{_poisonCleanUpExceptions} Count:{count} Found:{_found}");
77+
}
78+
79+
// This is the the main body that our Tasks run
80+
private async Task DoManyAsync(string connectionString)
81+
{
82+
Interlocked.Increment(ref _start);
83+
Interlocked.Increment(ref _inFlight);
84+
85+
// First poison
86+
await DoOneAsync(connectionString, poison: true);
87+
88+
for (int i = 0; i < NumberOfNonPoisoned && _continue; i++)
89+
{
90+
// now run some without poisoning
91+
await DoOneAsync(connectionString);
92+
}
93+
94+
Interlocked.Decrement(ref _inFlight);
95+
Interlocked.Increment(ref _done);
96+
}
97+
98+
// This will do our work, open a connection, and run a query (that returns 4 results sets)
99+
// if we are poisoning we will
100+
// 1 - Interject some sleeps in the sql statement so that it will run long enough that we can cancel it
101+
// 2 - Setup a time bomb task that will cancel the command a random amount of time later
102+
private async Task DoOneAsync(string connectionString, bool poison = false)
103+
{
104+
try
105+
{
106+
using (var connection = new SqlConnection(connectionString))
107+
{
108+
StringBuilder builder = new StringBuilder();
109+
for (int i = 0; i < 4; i++)
110+
{
111+
builder.AppendLine("SELECT name FROM sys.tables");
112+
if (poison && i < 3)
113+
{
114+
builder.AppendLine("WAITFOR DELAY '00:00:01'");
115+
}
116+
}
117+
118+
int rowsRead = 0;
119+
int resultRead = 0;
120+
121+
try
122+
{
123+
await connection.OpenAsync();
124+
using (var command = connection.CreateCommand())
125+
{
126+
Task timeBombTask = default;
127+
try
128+
{
129+
// Setup our time bomb
130+
if (poison)
131+
{
132+
timeBombTask = TimeBombAsync(command);
133+
}
134+
135+
command.CommandText = builder.ToString();
136+
137+
// Attempt to read all of the data
138+
using (var reader = await command.ExecuteReaderAsync())
139+
{
140+
try
141+
{
142+
do
143+
{
144+
resultRead++;
145+
while (await reader.ReadAsync() && _continue)
146+
{
147+
rowsRead++;
148+
}
149+
}
150+
while (await reader.NextResultAsync() && _continue);
151+
}
152+
catch when (poison)
153+
{
154+
// This looks a little strange, we failed to read above so this should fail too
155+
// But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic)
156+
try
157+
{
158+
while (await reader.NextResultAsync())
159+
{
160+
}
161+
}
162+
catch
163+
{
164+
Interlocked.Increment(ref _poisonCleanUpExceptions);
165+
}
166+
167+
throw;
168+
}
169+
}
170+
}
171+
finally
172+
{
173+
// Make sure to clean up our time bomb
174+
// It is unlikely, but the timebomb may get delayed in the Task Queue
175+
// And we don't want it running after we dispose the command
176+
if (timeBombTask != default)
177+
{
178+
await timeBombTask;
179+
}
180+
}
181+
}
182+
}
183+
finally
184+
{
185+
Interlocked.Add(ref _rowsRead, rowsRead);
186+
Interlocked.Add(ref _resultRead, resultRead);
187+
if (poison)
188+
{
189+
Interlocked.Increment(ref _poisonedEnded);
190+
}
191+
}
192+
}
193+
}
194+
catch (Exception ex)
195+
{
196+
if (!poison)
197+
{
198+
Interlocked.Increment(ref _nonPoisonedExceptions);
199+
200+
string details = ex.ToString();
201+
details = details.Substring(0, Math.Min(200, details.Length));
202+
lock (_exceptionDetails)
203+
{
204+
_exceptionDetails.Add(details);
205+
}
206+
}
207+
208+
if (ex.Message.Contains("The MARS TDS header contained errors."))
209+
{
210+
_continue = false;
211+
if (_found == 0) // This check is not really safe we may list more than one.
212+
{
213+
lock (_lockObject)
214+
{
215+
// You will notice that poison will be likely be false here, it is the normal commands that suffer
216+
// Once we have successfully poisoned the connection pool, we may start to see some other request to poison fail just like the normal requests
217+
_output.WriteLine($"{poison} {DateTime.UtcNow.ToString("O")}");
218+
_output.WriteLine(ex.ToString());
219+
}
220+
}
221+
Interlocked.Increment(ref _found);
222+
}
223+
}
224+
}
225+
226+
private async Task TimeBombAsync(SqlCommand command)
227+
{
228+
await SleepAsync(100, 3000);
229+
command.Cancel();
230+
}
231+
232+
private async Task SleepAsync(int minMs, int maxMs)
233+
{
234+
int delayMs;
235+
lock (_random)
236+
{
237+
delayMs = _random.Next(minMs, maxMs);
238+
}
239+
await Task.Delay(delayMs);
240+
}
241+
242+
private Stopwatch _watch;
243+
244+
private int _inFlight;
245+
private int _start;
246+
private int _done;
247+
private int _rowsRead;
248+
private int _resultRead;
249+
private int _nonPoisonedExceptions;
250+
private int _poisonedEnded;
251+
private int _poisonCleanUpExceptions;
252+
private bool _continue = true;
253+
private int _found;
254+
private Random _random;
255+
private object _lockObject = new object();
256+
257+
private HashSet<string> _exceptionDetails = new HashSet<string>();
258+
}
259+
}

0 commit comments

Comments
 (0)