Skip to content

Commit b7c5f1a

Browse files
authored
Adapt InternalUploadFile for async (#1653)
The recently added UploadFileAsync effectively calls stream.CopyToAsync(SftpFileStream). This is slower than the sync UploadFile (by about 4x in a local test) because the sync version sends multiple write requests concurrently, without waiting for each response in turn like the stream-based version does. This change adapts the sync code for async and uses it to bring the performance of UploadFileAsync in line with that of UploadFile.
1 parent d40bc43 commit b7c5f1a

File tree

4 files changed

+190
-92
lines changed

4 files changed

+190
-92
lines changed

src/Renci.SshNet/.editorconfig

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,6 @@ dotnet_diagnostic.MA0042.severity = none
194194

195195
# S3236: Caller information arguments should not be provided explicitly
196196
dotnet_diagnostic.S3236.severity = none
197+
198+
# S3358: Ternary operators should not be nested
199+
dotnet_diagnostic.S3358.severity = none

src/Renci.SshNet/ISubsystemSession.cs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Threading;
3+
using System.Threading.Tasks;
34

45
using Microsoft.Extensions.Logging;
56

@@ -49,15 +50,40 @@ internal interface ISubsystemSession : IDisposable
4950
void Disconnect();
5051

5152
/// <summary>
52-
/// Waits a specified time for a given <see cref="WaitHandle"/> to get signaled.
53+
/// Waits a specified time for a given <see cref="WaitHandle"/> to be signaled.
5354
/// </summary>
5455
/// <param name="waitHandle">The handle to wait for.</param>
55-
/// <param name="millisecondsTimeout">The number of milliseconds wait for <paramref name="waitHandle"/> to get signaled, or <c>-1</c> to wait indefinitely.</param>
56+
/// <param name="millisecondsTimeout">The number of milliseconds to wait for <paramref name="waitHandle"/> to be signaled, or <c>-1</c> to wait indefinitely.</param>
5657
/// <exception cref="SshException">The connection was closed by the server.</exception>
5758
/// <exception cref="SshException">The channel was closed.</exception>
5859
/// <exception cref="SshOperationTimeoutException">The handle did not get signaled within the specified timeout.</exception>
5960
void WaitOnHandle(WaitHandle waitHandle, int millisecondsTimeout);
6061

62+
/// <summary>
63+
/// Asynchronously waits for a given <see cref="WaitHandle"/> to be signaled.
64+
/// </summary>
65+
/// <param name="waitHandle">The handle to wait for.</param>
66+
/// <param name="millisecondsTimeout">The number of milliseconds to wait for <paramref name="waitHandle"/> to be signaled, or <c>-1</c> to wait indefinitely.</param>
67+
/// <param name="cancellationToken">The cancellation token to observe.</param>
68+
/// <exception cref="SshException">The connection was closed by the server.</exception>
69+
/// <exception cref="SshException">The channel was closed.</exception>
70+
/// <exception cref="SshOperationTimeoutException">The handle did not get signaled within the specified timeout.</exception>
71+
/// <returns>A <see cref="Task"/> representing the wait.</returns>
72+
Task WaitOnHandleAsync(WaitHandle waitHandle, int millisecondsTimeout, CancellationToken cancellationToken);
73+
74+
/// <summary>
75+
/// Asynchronously waits for a given <see cref="TaskCompletionSource{T}"/> to complete.
76+
/// </summary>
77+
/// <typeparam name="T">The type of the result which is being awaited.</typeparam>
78+
/// <param name="tcs">The handle to wait for.</param>
79+
/// <param name="millisecondsTimeout">The number of milliseconds to wait for <paramref name="tcs"/> to complete, or <c>-1</c> to wait indefinitely.</param>
80+
/// <param name="cancellationToken">The cancellation token to observe.</param>
81+
/// <exception cref="SshException">The connection was closed by the server.</exception>
82+
/// <exception cref="SshException">The channel was closed.</exception>
83+
/// <exception cref="SshOperationTimeoutException">The handle did not get signaled within the specified timeout.</exception>
84+
/// <returns>A <see cref="Task"/> representing the wait.</returns>
85+
Task<T> WaitOnHandleAsync<T>(TaskCompletionSource<T> tcs, int millisecondsTimeout, CancellationToken cancellationToken);
86+
6187
/// <summary>
6288
/// Blocks the current thread until the specified <see cref="WaitHandle"/> gets signaled, using a
6389
/// 32-bit signed integer to specify the time interval in milliseconds.

src/Renci.SshNet/SftpClient.cs

Lines changed: 91 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,8 @@ public void UploadFile(Stream input, string path, Action<ulong>? uploadCallback
10271027
/// <inheritdoc/>
10281028
public void UploadFile(Stream input, string path, bool canOverride, Action<ulong>? uploadCallback = null)
10291029
{
1030+
ThrowHelper.ThrowIfNull(input);
1031+
ThrowHelper.ThrowIfNullOrWhiteSpace(path);
10301032
CheckDisposed();
10311033

10321034
var flags = Flags.Write | Flags.Truncate;
@@ -1040,15 +1042,31 @@ public void UploadFile(Stream input, string path, bool canOverride, Action<ulong
10401042
flags |= Flags.CreateNew;
10411043
}
10421044

1043-
InternalUploadFile(input, path, flags, asyncResult: null, uploadCallback);
1045+
InternalUploadFile(
1046+
input,
1047+
path,
1048+
flags,
1049+
asyncResult: null,
1050+
uploadCallback,
1051+
isAsync: false,
1052+
default).GetAwaiter().GetResult();
10441053
}
10451054

10461055
/// <inheritdoc />
10471056
public Task UploadFileAsync(Stream input, string path, CancellationToken cancellationToken = default)
10481057
{
1058+
ThrowHelper.ThrowIfNull(input);
1059+
ThrowHelper.ThrowIfNullOrWhiteSpace(path);
10491060
CheckDisposed();
10501061

1051-
return InternalUploadFileAsync(input, path, cancellationToken);
1062+
return InternalUploadFile(
1063+
input,
1064+
path,
1065+
Flags.Write | Flags.Truncate | Flags.CreateNewOrOpen,
1066+
asyncResult: null,
1067+
uploadCallback: null,
1068+
isAsync: true,
1069+
cancellationToken);
10521070
}
10531071

10541072
/// <summary>
@@ -1163,9 +1181,9 @@ public IAsyncResult BeginUploadFile(Stream input, string path, AsyncCallback? as
11631181
/// </remarks>
11641182
public IAsyncResult BeginUploadFile(Stream input, string path, bool canOverride, AsyncCallback? asyncCallback, object? state, Action<ulong>? uploadCallback = null)
11651183
{
1166-
CheckDisposed();
11671184
ThrowHelper.ThrowIfNull(input);
11681185
ThrowHelper.ThrowIfNullOrWhiteSpace(path);
1186+
CheckDisposed();
11691187

11701188
var flags = Flags.Write | Flags.Truncate;
11711189

@@ -1180,19 +1198,28 @@ public IAsyncResult BeginUploadFile(Stream input, string path, bool canOverride,
11801198

11811199
var asyncResult = new SftpUploadAsyncResult(asyncCallback, state);
11821200

1183-
ThreadAbstraction.ExecuteThread(() =>
1201+
_ = DoUploadAndSetResult();
1202+
1203+
async Task DoUploadAndSetResult()
11841204
{
11851205
try
11861206
{
1187-
InternalUploadFile(input, path, flags, asyncResult, uploadCallback);
1207+
await InternalUploadFile(
1208+
input,
1209+
path,
1210+
flags,
1211+
asyncResult,
1212+
uploadCallback,
1213+
isAsync: true,
1214+
CancellationToken.None).ConfigureAwait(false);
11881215

11891216
asyncResult.SetAsCompleted(exception: null, completedSynchronously: false);
11901217
}
11911218
catch (Exception exp)
11921219
{
1193-
asyncResult.SetAsCompleted(exception: exp, completedSynchronously: false);
1220+
asyncResult.SetAsCompleted(exp, completedSynchronously: false);
11941221
}
1195-
});
1222+
}
11961223

11971224
return asyncResult;
11981225
}
@@ -2284,11 +2311,16 @@ private List<FileInfo> InternalSynchronizeDirectories(string sourcePath, string
22842311
var remoteFileName = string.Format(CultureInfo.InvariantCulture, @"{0}/{1}", destinationPath, localFile.Name);
22852312
try
22862313
{
2287-
#pragma warning disable CA2000 // Dispose objects before losing scope; false positive
22882314
using (var file = File.OpenRead(localFile.FullName))
2289-
#pragma warning restore CA2000 // Dispose objects before losing scope; false positive
22902315
{
2291-
InternalUploadFile(file, remoteFileName, uploadFlag, asyncResult: null, uploadCallback: null);
2316+
InternalUploadFile(
2317+
file,
2318+
remoteFileName,
2319+
uploadFlag,
2320+
asyncResult: null,
2321+
uploadCallback: null,
2322+
isAsync: false,
2323+
CancellationToken.None).GetAwaiter().GetResult();
22922324
}
22932325

22942326
uploadedFiles.Add(localFile);
@@ -2455,37 +2487,42 @@ private async Task InternalDownloadFileAsync(string path, Stream output, Cancell
24552487
}
24562488
}
24572489

2458-
/// <summary>
2459-
/// Internals the upload file.
2460-
/// </summary>
2461-
/// <param name="input">The input.</param>
2462-
/// <param name="path">The path.</param>
2463-
/// <param name="flags">The flags.</param>
2464-
/// <param name="asyncResult">An <see cref="IAsyncResult"/> that references the asynchronous request.</param>
2465-
/// <param name="uploadCallback">The upload callback.</param>
2466-
/// <exception cref="ArgumentNullException"><paramref name="input" /> is <see langword="null"/>.</exception>
2467-
/// <exception cref="ArgumentException"><paramref name="path" /> is <see langword="null"/> or contains whitespace.</exception>
2468-
/// <exception cref="SshConnectionException">Client not connected.</exception>
2469-
private void InternalUploadFile(Stream input, string path, Flags flags, SftpUploadAsyncResult? asyncResult, Action<ulong>? uploadCallback)
2490+
#pragma warning disable S6966 // Awaitable method should be used
2491+
private async Task InternalUploadFile(
2492+
Stream input,
2493+
string path,
2494+
Flags flags,
2495+
SftpUploadAsyncResult? asyncResult,
2496+
Action<ulong>? uploadCallback,
2497+
bool isAsync,
2498+
CancellationToken cancellationToken)
24702499
{
2471-
ThrowHelper.ThrowIfNull(input);
2472-
ThrowHelper.ThrowIfNullOrWhiteSpace(path);
2500+
Debug.Assert(isAsync || cancellationToken == default);
24732501

24742502
if (_sftpSession is null)
24752503
{
24762504
throw new SshConnectionException("Client not connected.");
24772505
}
24782506

2479-
var fullPath = _sftpSession.GetCanonicalPath(path);
2507+
string fullPath;
2508+
byte[] handle;
24802509

2481-
var handle = _sftpSession.RequestOpen(fullPath, flags);
2510+
if (isAsync)
2511+
{
2512+
fullPath = await _sftpSession.GetCanonicalPathAsync(path, cancellationToken).ConfigureAwait(false);
2513+
handle = await _sftpSession.RequestOpenAsync(fullPath, flags, cancellationToken).ConfigureAwait(false);
2514+
}
2515+
else
2516+
{
2517+
fullPath = _sftpSession.GetCanonicalPath(path);
2518+
handle = _sftpSession.RequestOpen(fullPath, flags);
2519+
}
24822520

24832521
ulong offset = 0;
24842522

24852523
// create buffer of optimal length
24862524
var buffer = new byte[_sftpSession.CalculateOptimalWriteLength(_bufferSize, handle)];
24872525

2488-
int bytesRead;
24892526
var expectedResponses = 0;
24902527

24912528
// We will send out all the write requests without waiting for each response.
@@ -2495,8 +2532,21 @@ private void InternalUploadFile(Stream input, string path, Flags flags, SftpUplo
24952532

24962533
ExceptionDispatchInfo? exception = null;
24972534

2498-
while ((bytesRead = input.Read(buffer, 0, buffer.Length)) != 0)
2535+
while (true)
24992536
{
2537+
var bytesRead = isAsync
2538+
#if NET
2539+
? await input.ReadAsync(buffer, cancellationToken).ConfigureAwait(false)
2540+
#else
2541+
? await input.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)
2542+
#endif
2543+
: input.Read(buffer, 0, buffer.Length);
2544+
2545+
if (bytesRead == 0)
2546+
{
2547+
break;
2548+
}
2549+
25002550
if (asyncResult is not null && asyncResult.IsUploadCanceled)
25012551
{
25022552
break;
@@ -2555,34 +2605,28 @@ private void InternalUploadFile(Stream input, string path, Flags flags, SftpUplo
25552605

25562606
if (Volatile.Read(ref expectedResponses) != 0)
25572607
{
2558-
_sftpSession.WaitOnHandle(mres.WaitHandle, _operationTimeout);
2608+
if (isAsync)
2609+
{
2610+
await _sftpSession.WaitOnHandleAsync(mres.WaitHandle, _operationTimeout, cancellationToken).ConfigureAwait(false);
2611+
}
2612+
else
2613+
{
2614+
_sftpSession.WaitOnHandle(mres.WaitHandle, _operationTimeout);
2615+
}
25592616
}
25602617

25612618
exception?.Throw();
25622619

2563-
_sftpSession.RequestClose(handle);
2564-
}
2565-
2566-
private async Task InternalUploadFileAsync(Stream input, string path, CancellationToken cancellationToken)
2567-
{
2568-
ThrowHelper.ThrowIfNull(input);
2569-
ThrowHelper.ThrowIfNullOrWhiteSpace(path);
2570-
2571-
if (_sftpSession is null)
2620+
if (isAsync)
25722621
{
2573-
throw new SshConnectionException("Client not connected.");
2622+
await _sftpSession.RequestCloseAsync(handle, cancellationToken).ConfigureAwait(false);
25742623
}
2575-
2576-
cancellationToken.ThrowIfCancellationRequested();
2577-
2578-
var fullPath = await _sftpSession.GetCanonicalPathAsync(path, cancellationToken).ConfigureAwait(false);
2579-
var openStreamTask = SftpFileStream.OpenAsync(_sftpSession, fullPath, FileMode.Create, FileAccess.Write, (int)_bufferSize, cancellationToken);
2580-
2581-
using (var output = await openStreamTask.ConfigureAwait(false))
2624+
else
25822625
{
2583-
await input.CopyToAsync(output, 81920, cancellationToken).ConfigureAwait(false);
2626+
_sftpSession.RequestClose(handle);
25842627
}
25852628
}
2629+
#pragma warning restore S6966 // Awaitable method should be used
25862630

25872631
/// <summary>
25882632
/// Called when client is connected to the server.

0 commit comments

Comments
 (0)