Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 67 additions & 10 deletions src/Aspire.Hosting.Redis/RedisBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Globalization;
using System.Net.Http.Json;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Redis;
using Aspire.Hosting.Utils;
Expand Down Expand Up @@ -192,20 +194,50 @@ public static IResourceBuilder<RedisResource> WithRedisInsight(this IResourceBui
return builder;
}

static async Task ImportRedisDatabases(ILogger resourceLogger, IEnumerable<RedisResource> redisInstances, HttpClient client, CancellationToken ct)
static async Task ImportRedisDatabases(ILogger resourceLogger, IEnumerable<RedisResource> redisInstances, HttpClient client, CancellationToken cancellationToken)
{
var databasesPath = "/api/databases";

var pipeline = new ResiliencePipelineBuilder().AddRetry(new Polly.Retry.RetryStrategyOptions
{
Delay = TimeSpan.FromSeconds(2),
MaxRetryAttempts = 5,
}).Build();

using (var stream = new MemoryStream())
{
// As part of configuring RedisInsight we need to factor in the possibility that the
// container resource is being run with persistence turned on. In this case we need
// to get the list of existing databases because we might need to delete some.
var lookup = await pipeline.ExecuteAsync(async (ctx) =>
{
var getDatabasesResponse = await client.GetFromJsonAsync<RedisDatabaseDto[]>(databasesPath, cancellationToken).ConfigureAwait(false);
return getDatabasesResponse?.ToLookup(
i => i.Name ?? throw new InvalidDataException("Database name is missing."),
i => i.Id ?? throw new InvalidDataException("Database ID is missing."));
}, cancellationToken).ConfigureAwait(false);

var databasesToDelete = new List<Guid>();

using var writer = new Utf8JsonWriter(stream);

writer.WriteStartArray();

foreach (var redisResource in redisInstances)
{
if (lookup is { } && lookup.Contains(redisResource.Name))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we deleting the servers that are already there, only to add them again?

Wouldn't the more appropriate approach be "if the server already exists, don't add it again" ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the credentials could have changed. Using the api/databases call we don't have a mechanism to check that the credentials are the same, so we just blow it away and recreate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add this in a comment. I don't think this algorithm is intuitive.

Copy link
Member

@eerhardt eerhardt Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add this in a comment. I don't think this algorithm is intuitive.

Did this ever happen?

{
// It is possible that there are multiple databases with
// a conflicting name so we delete them all. This just keeps
// track of the specific ID that we need to delete.
databasesToDelete.AddRange(lookup[redisResource.Name]);
}

if (redisResource.PrimaryEndpoint.IsAllocated)
{
var endpoint = redisResource.PrimaryEndpoint;
writer.WriteStartObject();

writer.WriteString("host", redisResource.Name);
writer.WriteNumber("port", endpoint.TargetPort!.Value);
writer.WriteString("name", redisResource.Name);
Expand All @@ -218,7 +250,7 @@ static async Task ImportRedisDatabases(ILogger resourceLogger, IEnumerable<Redis
}
}
writer.WriteEndArray();
await writer.FlushAsync(ct).ConfigureAwait(false);
await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
stream.Seek(0, SeekOrigin.Begin);

var content = new MultipartFormDataContent();
Expand All @@ -227,23 +259,39 @@ static async Task ImportRedisDatabases(ILogger resourceLogger, IEnumerable<Redis

content.Add(fileContent, "file", "RedisInsight_connections.json");

var apiUrl = $"/api/databases/import";

var pipeline = new ResiliencePipelineBuilder().AddRetry(new Polly.Retry.RetryStrategyOptions
{
Delay = TimeSpan.FromSeconds(2),
MaxRetryAttempts = 5,
}).Build();
var apiUrl = $"{databasesPath}/import";

try
{
if (databasesToDelete.Any())
{
await pipeline.ExecuteAsync(async (ctx) =>
{
// Create a DELETE request to send to the existing instance of
// RedisInsight with the IDs of the database to delete.
var deleteContent = JsonContent.Create(new
{
ids = databasesToDelete
});

var deleteRequest = new HttpRequestMessage(HttpMethod.Delete, databasesPath)
{
Content = deleteContent
};

var deleteResponse = await client.SendAsync(deleteRequest, cancellationToken).ConfigureAwait(false);
deleteResponse.EnsureSuccessStatusCode();

}, cancellationToken).ConfigureAwait(false);
}

await pipeline.ExecuteAsync(async (ctx) =>
{
var response = await client.PostAsync(apiUrl, content, ctx)
.ConfigureAwait(false);

response.EnsureSuccessStatusCode();
}, ct).ConfigureAwait(false);
}, cancellationToken).ConfigureAwait(false);

}
catch (Exception ex)
Expand All @@ -254,6 +302,15 @@ await pipeline.ExecuteAsync(async (ctx) =>
}
}

private class RedisDatabaseDto
{
[JsonPropertyName("id")]
public Guid? Id { get; set; }

[JsonPropertyName("name")]
public string? Name { get; set; }
}

/// <summary>
/// Configures the host port that the Redis Commander resource is exposed on instead of using randomly assigned port.
/// </summary>
Expand Down
106 changes: 106 additions & 0 deletions tests/Aspire.Hosting.Redis.Tests/RedisFunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using StackExchange.Redis;
using Xunit;
using Xunit.Abstractions;
using Aspire.Hosting.Tests.Dcp;

namespace Aspire.Hosting.Redis.Tests;

Expand Down Expand Up @@ -120,6 +121,111 @@ public async Task VerifyRedisResource()
Assert.Equal("value", value);
}

[Fact]
[RequiresDocker]
public async Task VerifyDatabasesAreNotDuplicatedForPersistentRedisInsightContainer()
{
var randomResourceSuffix = Random.Shared.Next(10000).ToString();
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));

var configure = (DistributedApplicationOptions options) =>
{
options.ContainerRegistryOverride = TestConstants.AspireTestContainerRegistry;
};

using var builder1 = TestDistributedApplicationBuilder.Create(configure, testOutputHelper);
builder1.Configuration[$"DcpPublisher:ResourceNameSuffix"] = randomResourceSuffix;

IResourceBuilder<RedisInsightResource>? redisInsightBuilder = null;
var redis1 = builder1.AddRedis("redisForInsightPersistence")
.WithRedisInsight(c =>
{
redisInsightBuilder = c;
c.WithLifetime(ContainerLifetime.Persistent);
});

// Wire up an additional event subcription to ResourceReadyEvent on the RedisInsightResource
// instance. This works because the ResourceReadyEvent fires non-blocking sequential so the
// wire-up that WithRedisInsight does is guaranteed to execute before this one does. So we then
// use this to block pulling the list of databases until we know they've been updated. This
// will repeated below for the second app.
//
// Issue: https://github.com/dotnet/aspire/issues/6455
Assert.NotNull(redisInsightBuilder);
var redisInsightsReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
builder1.Eventing.Subscribe<ResourceReadyEvent>(redisInsightBuilder.Resource, (evt, ct) =>
{
redisInsightsReady.TrySetResult();
return Task.CompletedTask;
});

using var app1 = builder1.Build();

await app1.StartAsync(cts.Token);

await redisInsightsReady.Task.WaitAsync(cts.Token);

using var client1 = app1.CreateHttpClient($"{redis1.Resource.Name}-insight", "http");
var firstRunDatabases = await client1.GetFromJsonAsync<RedisInsightDatabaseModel[]>("/api/databases", cts.Token);

Assert.NotNull(firstRunDatabases);
Assert.Single(firstRunDatabases);
Assert.Equal($"{redis1.Resource.Name}", firstRunDatabases[0].Name);

await app1.StopAsync(cts.Token);

using var builder2 = TestDistributedApplicationBuilder.Create(configure, testOutputHelper);
builder2.Configuration[$"DcpPublisher:ResourceNameSuffix"] = randomResourceSuffix;

var redis2 = builder2.AddRedis("redisForInsightPersistence")
.WithRedisInsight(c =>
{
redisInsightBuilder = c;
c.WithLifetime(ContainerLifetime.Persistent);
});

// Wire up an additional event subcription to ResourceReadyEvent on the RedisInsightResource
// instance. This works because the ResourceReadyEvent fires non-blocking sequential so the
// wire-up that WithRedisInsight does is guaranteed to execute before this one does. So we then
// use this to block pulling the list of databases until we know they've been updated. This
// will repeated below for the second app.
Assert.NotNull(redisInsightBuilder);
redisInsightsReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
builder2.Eventing.Subscribe<ResourceReadyEvent>(redisInsightBuilder.Resource, (evt, ct) =>
{
redisInsightsReady.TrySetResult();
return Task.CompletedTask;
});

using var app2 = builder2.Build();
await app2.StartAsync(cts.Token);

await redisInsightsReady.Task.WaitAsync(cts.Token);

using var client2 = app2.CreateHttpClient($"{redisInsightBuilder.Resource.Name}", "http");
var secondRunDatabases = await client2.GetFromJsonAsync<RedisInsightDatabaseModel[]>("/api/databases", cts.Token);

Assert.NotNull(secondRunDatabases);
Assert.Single(secondRunDatabases);
Assert.Equal($"{redis2.Resource.Name}", secondRunDatabases[0].Name);
Assert.NotEqual(secondRunDatabases.Single().Id, firstRunDatabases.Single().Id);

// HACK: This is a workaround for the fact that ApplicationExecutor is not a public type. What I have
// done here is I get the latest event from RNS for the insights instance which gives me the resource
// name as known from a DCP perspective. I then use the ApplicationExecutorProxy (introduced with this
// change to call the ApplicationExecutor stop method. The proxy is a public type with an internal
// constructor inside the Aspire.Hosting.Tests package. This is a short term solution for 9.0 to
// make sure that we have good test coverage for WithRedisInsight behavior, but we need a better
// long term solution in 9.x for folks that will want to do things like execute commands against
// resources to stop specific containers.
var rns = app2.Services.GetRequiredService<ResourceNotificationService>();
var latestEvent = await rns.WaitForResourceHealthyAsync(redisInsightBuilder.Resource.Name, cts.Token);
var executorProxy = app2.Services.GetRequiredService<ApplicationExecutorProxy>();
await executorProxy.StopResourceAsync(latestEvent.ResourceId, cts.Token);

await app2.StopAsync(cts.Token);
}

[Fact]
[RequiresDocker]
public async Task VerifyWithRedisInsightImportDatabases()
Expand Down
20 changes: 20 additions & 0 deletions tests/Aspire.Hosting.Tests/Dcp/ApplicationExecutorProxy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.Dcp;

namespace Aspire.Hosting.Tests.Dcp;

public class ApplicationExecutorProxy
{
internal ApplicationExecutorProxy(ApplicationExecutor executor)
{
_executor = executor;
}

private readonly ApplicationExecutor _executor;

public Task StartResourceAsync(string resourceName, CancellationToken cancellationToken) => _executor.StartResourceAsync(resourceName, cancellationToken);

public Task StopResourceAsync(string resourceName, CancellationToken cancellationToken) => _executor.StopResourceAsync(resourceName, cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
using System.Reflection;
using Aspire.Components.Common.Tests;
using Aspire.Hosting.Dashboard;
using Aspire.Hosting.Dcp;
using Aspire.Hosting.Eventing;
using Aspire.Hosting.Testing;
using Aspire.Hosting.Tests.Dcp;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -76,6 +78,8 @@ private TestDistributedApplicationBuilder(Action<DistributedApplicationOptions>?
o.OtlpGrpcEndpointUrl ??= "http://localhost:4317";
});

_innerBuilder.Services.AddSingleton<ApplicationExecutorProxy>(sp => new ApplicationExecutorProxy(sp.GetRequiredService<ApplicationExecutor>()));

_innerBuilder.Services.AddHttpClient();
_innerBuilder.Services.ConfigureHttpClientDefaults(http => http.AddStandardResilienceHandler());
if (testOutputHelper is not null)
Expand Down