Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions playground/publishers/Publishers.AppHost/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ services:
ports:
- "8002:8001"
- "8004:8003"
depends_on:
pg:
condition: "service_started"
networks:
- "aspire"
api:
Expand All @@ -37,6 +40,11 @@ services:
ports:
- "8006:8005"
- "8008:8007"
depends_on:
pg:
condition: "service_started"
dbsetup:
condition: "service_completed_successfully"
networks:
- "aspire"
sqlserver:
Expand Down Expand Up @@ -70,6 +78,13 @@ services:
ports:
- "8011:8010"
- "8013:8012"
depends_on:
api:
condition: "service_started"
networks:
- "aspire"
mycontainer:
image: "${MYCONTAINER_IMAGE}"
networks:
- "aspire"
networks:
Expand Down
12 changes: 5 additions & 7 deletions src/Aspire.Cli/Commands/PublishCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ protected override async Task<int> ExecuteAsync(ParseResult parseResult, Cancell

var backchannelCompletionSource = new TaskCompletionSource<AppHostBackchannel>();

var launchingAppHostTask = context.AddTask(":play_button: Launching apphost");
var launchingAppHostTask = context.AddTask(":play_button: Launching apphost");
launchingAppHostTask.IsIndeterminate();
launchingAppHostTask.StartTask();

Expand All @@ -167,7 +167,7 @@ protected override async Task<int> ExecuteAsync(ParseResult parseResult, Cancell

var backchannel = await backchannelCompletionSource.Task.ConfigureAwait(false);

launchingAppHostTask.Description = $":check_mark: Launching apphost";
launchingAppHostTask.Description = $":check_mark: Launching apphost";
launchingAppHostTask.Value = 100;
launchingAppHostTask.StopTask();

Expand All @@ -185,17 +185,17 @@ protected override async Task<int> ExecuteAsync(ParseResult parseResult, Cancell
progressTasks.Add(publishingActivity.Id, progressTask);
}

progressTask.Description = $":play_button: {publishingActivity.StatusText}";
progressTask.Description = $":play_button: {publishingActivity.StatusText}";

if (publishingActivity.IsComplete && !publishingActivity.IsError)
{
progressTask.Description = $":check_mark: {publishingActivity.StatusText}";
progressTask.Description = $":check_mark: {publishingActivity.StatusText}";
progressTask.Value = 100;
progressTask.StopTask();
}
else if (publishingActivity.IsError)
{
progressTask.Description = $"[red bold]:cross_mark: {publishingActivity.StatusText}[/]";
progressTask.Description = $"[red bold]:cross_mark: {publishingActivity.StatusText}[/]";
progressTask.Value = 0;
break;
}
Expand All @@ -205,8 +205,6 @@ protected override async Task<int> ExecuteAsync(ParseResult parseResult, Cancell
}
}

await backchannel.RequestStopAsync(cancellationToken).ConfigureAwait(false);

// When we are running in publish mode we don't want the app host to
// stop itself while we might still be streaming data back across
// the RPC backchannel. So we need to take responsibility for stopping
Expand Down
14 changes: 7 additions & 7 deletions src/Aspire.Hosting/Backchannel/AppHostRpcTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ IHostApplicationLifetime lifetime
{
while (cancellationToken.IsCancellationRequested == false)
{
var publishingActivity = await activityReporter.ActivitiyUpdated.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
var publishingActivityStatus = await activityReporter.ActivityStatusUpdated.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);

if (publishingActivity == null)
if (publishingActivityStatus == null)
{
// If the publishing activity is null, it means that the activity has been removed.
// This can happen if the activity is complete or an error occurred.
yield break;
}

yield return (
publishingActivity.Id,
publishingActivity.StatusMessage,
publishingActivity.IsComplete,
publishingActivity.IsError
publishingActivityStatus.Activity.Id,
publishingActivityStatus.StatusText,
publishingActivityStatus.IsComplete,
publishingActivityStatus.IsError
);

if ( publishingActivity.IsPrimary &&(publishingActivity.IsComplete || publishingActivity.IsError))
if ( publishingActivityStatus.Activity.IsPrimary &&(publishingActivityStatus.IsComplete || publishingActivityStatus.IsError))
{
// If the activity is complete or an error and it is the primary activity,
// we can stop listening for updates.
Expand Down
12 changes: 8 additions & 4 deletions src/Aspire.Hosting/DistributedApplicationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ await eventing.PublishAsync<AfterPublishEvent>(
new AfterPublishEvent(serviceProvider, model), stoppingToken
).ConfigureAwait(false);

publishingActivity.IsComplete = true;
await activityReporter.UpdateActivityAsync(publishingActivity, stoppingToken).ConfigureAwait(false);
await activityReporter.UpdateActivityStatusAsync(
publishingActivity,
(status) => status with { IsComplete = true },
stoppingToken).ConfigureAwait(false);

// If we are running in publish mode and a backchannel is being
// used then we don't want to stop the app host. Instead the
Expand All @@ -65,8 +67,10 @@ await eventing.PublishAsync<AfterPublishEvent>(
catch (Exception ex)
{
logger.LogError(ex, "Failed to publish the distributed application.");
publishingActivity.IsError = true;
await activityReporter.UpdateActivityAsync(publishingActivity, stoppingToken).ConfigureAwait(false);
await activityReporter.UpdateActivityStatusAsync(
publishingActivity,
(status) => status with { IsError = true },
stoppingToken).ConfigureAwait(false);

if (!backchannelService.IsBackchannelExpected)
{
Expand Down
88 changes: 70 additions & 18 deletions src/Aspire.Hosting/Publishing/PublishingActivityProgressReporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ namespace Aspire.Hosting.Publishing;
[Experimental("ASPIREPUBLISHERS001")]
public sealed class PublishingActivity
{
internal PublishingActivity(string id, string initialStatusText, bool isPrimary = false)
internal PublishingActivity(string id, bool isPrimary = false)
{
Id = id;
StatusMessage = initialStatusText;
IsPrimary = isPrimary;
}

Expand All @@ -27,25 +26,41 @@ internal PublishingActivity(string id, string initialStatusText, bool isPrimary
public string Id { get; private set; }

/// <summary>
/// Status message of the publishing activity.
/// Indicates whether the publishing activity is the primary activity.
/// </summary>
public string StatusMessage { get; set; }
public bool IsPrimary { get; private set; }

/// <summary>
/// Indicates whether the publishing activity is complete.
/// The status text of the publishing activity.
/// </summary>
public bool IsComplete { get; set; }
public PublishingActivityStatus? LastStatus { get; internal set; }
}

/// <summary>
/// Represents the status of a publishing activity.
/// </summary>
[Experimental("ASPIREPUBLISHERS001")]
public sealed record PublishingActivityStatus
{
/// <summary>
/// Indicates whether the publishing activity is the primary activity.
/// The publishing activity associated with this status.
/// </summary>
public bool IsPrimary { get; private set; }
public required PublishingActivity Activity { get; init; }

/// <summary>
/// Indicates whether the publishing activity has encountered an error.
/// The status text of the publishing activity.
/// </summary>
public bool IsError { get; set; }
public required string StatusText { get; init; }

/// <summary>
/// Indicates whether the publishing activity is complete.
/// </summary>
public required bool IsComplete { get; init; }

/// <summary>
/// Indicates whether the publishing activity encountered an error.
/// </summary>
public required bool IsError { get; init; }
}

/// <summary>
Expand Down Expand Up @@ -73,31 +88,68 @@ public interface IPublishingActivityProgressReporter
/// Updates the status of an existing publishing activity.
/// </summary>
/// <param name="publishingActivity">The activity with updated properties.</param>
/// <param name="statusUpdate"></param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
Task UpdateActivityAsync(PublishingActivity publishingActivity, CancellationToken cancellationToken);
Task UpdateActivityStatusAsync(PublishingActivity publishingActivity, Func<PublishingActivityStatus, PublishingActivityStatus> statusUpdate, CancellationToken cancellationToken);
}

internal sealed class PublishingActivityProgressReporter : IPublishingActivityProgressReporter
{
public async Task<PublishingActivity> CreateActivityAsync(string id, string initialStatusText, bool isPrimary, CancellationToken cancellationToken)
{
var publishingActivity = new PublishingActivity(id, initialStatusText, isPrimary);
await ActivitiyUpdated.Writer.WriteAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
var publishingActivity = new PublishingActivity(id, isPrimary);
await UpdateActivityStatusAsync(
publishingActivity,
(status) => status with
{
StatusText = initialStatusText,
IsComplete = false,
IsError = false
},
cancellationToken
).ConfigureAwait(false);

return publishingActivity;
}

public async Task UpdateActivityAsync(PublishingActivity publishingActivity, CancellationToken cancellationToken)
private readonly object _updateLock = new object();

public async Task UpdateActivityStatusAsync(PublishingActivity publishingActivity, Func<PublishingActivityStatus, PublishingActivityStatus> statusUpdate, CancellationToken cancellationToken)
{
await ActivitiyUpdated.Writer.WriteAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
PublishingActivityStatus? lastStatus;
PublishingActivityStatus? newStatus;

lock (_updateLock)
{
lastStatus = publishingActivity.LastStatus ?? new PublishingActivityStatus
{
Activity = publishingActivity,
StatusText = string.Empty,
IsComplete = false,
IsError = false
};

newStatus = statusUpdate(lastStatus);
publishingActivity.LastStatus = newStatus;
}

if (lastStatus == newStatus)
{
throw new DistributedApplicationException(
$"The status of the publishing activity '{publishingActivity.Id}' was not updated. The status update function must return a new instance of the status."
);
}

await ActivityStatusUpdated.Writer.WriteAsync(newStatus, cancellationToken).ConfigureAwait(false);

if (publishingActivity.IsPrimary && (publishingActivity.IsComplete || publishingActivity.IsError))
if (publishingActivity.IsPrimary && (newStatus.IsComplete || newStatus.IsError))
{
// If the activity is complete or an error and it is the primary activity,
// we can stop listening for updates.
ActivitiyUpdated.Writer.Complete();
ActivityStatusUpdated.Writer.Complete();
}
}

internal Channel<PublishingActivity> ActivitiyUpdated { get; } = Channel.CreateUnbounded<PublishingActivity>();
internal Channel<PublishingActivityStatus> ActivityStatusUpdated { get; } = Channel.CreateUnbounded<PublishingActivityStatus>();
}
20 changes: 12 additions & 8 deletions src/Aspire.Hosting/Publishing/ResourceContainerImageBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,17 @@ private async Task<string> BuildProjectContainerImageAsync(IResource resource, C
stdout,
stderr);

publishingActivity.IsError = true;
await activityReporter.UpdateActivityAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
await activityReporter.UpdateActivityStatusAsync(
publishingActivity, (status) => status with { IsError = true },
cancellationToken).ConfigureAwait(false);

throw new DistributedApplicationException($"Failed to build container image, stdout: {stdout}, stderr: {stderr}");
}
else
{
publishingActivity.IsComplete = true;
await activityReporter.UpdateActivityAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
await activityReporter.UpdateActivityStatusAsync(
publishingActivity, (status) => status with { IsComplete = true },
cancellationToken).ConfigureAwait(false);

logger.LogDebug(
".NET CLI completed with exit code: {ExitCode}",
Expand Down Expand Up @@ -171,17 +173,19 @@ private async Task<string> BuildContainerImageFromDockerfileAsync(string resourc
imageName,
cancellationToken).ConfigureAwait(false);

publishingActivity.IsComplete = true;
await activityReporter.UpdateActivityAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
await activityReporter.UpdateActivityStatusAsync(
publishingActivity, (status) => status with { IsComplete = true },
cancellationToken).ConfigureAwait(false);

return image;
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to build container image from Dockerfile.");

publishingActivity.IsError = true;
await activityReporter.UpdateActivityAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
await activityReporter.UpdateActivityStatusAsync(
publishingActivity, (status) => status with { IsError = true },
cancellationToken).ConfigureAwait(false);

throw;
}
Expand Down
Loading