Skip to content

Commit 6083994

Browse files
committed
Observable backup and restore - SnapshotObservable and RestoreObservable unit tests
1 parent def085a commit 6083994

File tree

6 files changed

+796
-102
lines changed

6 files changed

+796
-102
lines changed

src/Nest/Domain/Responses/RestoreObservable.cs

Lines changed: 145 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Diagnostics;
34
using System.Linq;
45
using System.Text.RegularExpressions;
@@ -13,13 +14,22 @@ public class RestoreObservable : IDisposable, IObservable<IRecoveryStatusRespons
1314
private readonly TimeSpan _interval = TimeSpan.FromSeconds(2);
1415
private Timer _timer;
1516
private bool _disposed;
16-
private string _renamePattern;
17-
private string _renameReplacement;
17+
private readonly RestoreStatusHumbleObject _restoreStatusHumbleObject;
18+
private EventHandler<RestoreNextEventArgs> _nextEventHandlers;
19+
private EventHandler<RestoreCompletedEventArgs> _completedEentHandlers;
20+
private EventHandler<RestoreErrorEventArgs> _errorEventHandlers;
1821

1922
public RestoreObservable(IElasticClient elasticClient, IRestoreRequest restoreRequest)
2023
{
24+
elasticClient.ThrowIfNull("elasticClient");
25+
restoreRequest.ThrowIfNull("restoreRequest");
26+
2127
_elasticClient = elasticClient;
2228
_restoreRequest = restoreRequest;
29+
30+
_restoreStatusHumbleObject = new RestoreStatusHumbleObject(elasticClient, restoreRequest);
31+
_restoreStatusHumbleObject.Completed += StopTimer;
32+
_restoreStatusHumbleObject.Error += StopTimer;
2333
}
2434

2535
public RestoreObservable(IElasticClient elasticClient, IRestoreRequest restoreRequest, TimeSpan interval)
@@ -43,10 +53,19 @@ public IDisposable Subscribe(IObserver<IRecoveryStatusResponse> observer)
4353
if (!restoreResponse.IsValid)
4454
throw new RestoreException(restoreResponse.ConnectionStatus);
4555

46-
_renamePattern = _restoreRequest.RenamePattern;
47-
_renameReplacement = _restoreRequest.RenameReplacement;
56+
EventHandler<RestoreNextEventArgs> onNext = (sender, args) => observer.OnNext(args.RecoveryStatusResponse);
57+
EventHandler<RestoreCompletedEventArgs> onCompleted = (sender, args) => observer.OnCompleted();
58+
EventHandler<RestoreErrorEventArgs> onError = (sender, args) => observer.OnError(args.Exception);
59+
60+
_nextEventHandlers = onNext;
61+
_completedEentHandlers = onCompleted;
62+
_errorEventHandlers = onError;
4863

49-
_timer = new Timer(Restore, observer, _interval.Milliseconds, Timeout.Infinite);
64+
_restoreStatusHumbleObject.Next += onNext;
65+
_restoreStatusHumbleObject.Completed += onCompleted;
66+
_restoreStatusHumbleObject.Error += onError;
67+
68+
_timer = new Timer(Restore, observer, (long)_interval.TotalMilliseconds, Timeout.Infinite);
5069
}
5170
catch (Exception exception)
5271
{
@@ -67,33 +86,20 @@ private void Restore(object state)
6786
var watch = new Stopwatch();
6887
watch.Start();
6988

70-
var recoveryStatus = _elasticClient.RecoveryStatus(
71-
descriptor =>
72-
descriptor.Indices(
73-
_restoreRequest.Indices.Select(
74-
x => Regex.Replace(x.Name, _renamePattern, _renameReplacement)).ToArray())
75-
.Detailed(true)
76-
);
77-
78-
if (!recoveryStatus.IsValid)
79-
throw new RestoreException(recoveryStatus.ConnectionStatus);
80-
81-
if (recoveryStatus.Indices.All(x => x.Value.Shards.All(s => s.Index.Files.Recovered == s.Index.Files.Total)))
82-
{
83-
_timer.Change(Timeout.Infinite, Timeout.Infinite);
84-
observer.OnCompleted();
85-
return;
86-
}
89+
_restoreStatusHumbleObject.CheckStatus();
8790

88-
observer.OnNext(recoveryStatus);
89-
_timer.Change(Math.Max(0, _interval.Milliseconds - watch.ElapsedMilliseconds), Timeout.Infinite);
91+
_timer.Change(Math.Max(0, (long)_interval.TotalMilliseconds - watch.ElapsedMilliseconds), Timeout.Infinite);
9092
}
9193
catch (Exception exception)
9294
{
93-
_timer.Change(Timeout.Infinite, Timeout.Infinite);
9495
observer.OnError(exception);
9596
}
9697
}
98+
99+
private void StopTimer(object sender, EventArgs restoreCompletedEventArgs)
100+
{
101+
_timer.Change(Timeout.Infinite, Timeout.Infinite);
102+
}
97103

98104
public void Dispose()
99105
{
@@ -105,6 +111,12 @@ protected virtual void Dispose(bool disposing)
105111
if (_disposed) return;
106112

107113
_timer.Dispose();
114+
_restoreStatusHumbleObject.Next -= _nextEventHandlers;
115+
_restoreStatusHumbleObject.Completed -= _completedEentHandlers;
116+
_restoreStatusHumbleObject.Error -= _errorEventHandlers;
117+
118+
_restoreStatusHumbleObject.Completed -= StopTimer;
119+
_restoreStatusHumbleObject.Error -= StopTimer;
108120

109121
_disposed = true;
110122
}
@@ -114,4 +126,112 @@ protected virtual void Dispose(bool disposing)
114126
Dispose(false);
115127
}
116128
}
129+
130+
public class RestoreNextEventArgs : EventArgs
131+
{
132+
public IRecoveryStatusResponse RecoveryStatusResponse { get; private set; }
133+
134+
public RestoreNextEventArgs(IRecoveryStatusResponse recoveryStatusResponse)
135+
{
136+
RecoveryStatusResponse = recoveryStatusResponse;
137+
}
138+
}
139+
140+
public class RestoreCompletedEventArgs : EventArgs
141+
{
142+
public IRecoveryStatusResponse RecoveryStatusResponse { get; private set; }
143+
144+
public RestoreCompletedEventArgs(IRecoveryStatusResponse recoveryStatusResponse)
145+
{
146+
RecoveryStatusResponse = recoveryStatusResponse;
147+
}
148+
}
149+
150+
public class RestoreErrorEventArgs : EventArgs
151+
{
152+
public Exception Exception { get; private set; }
153+
154+
public RestoreErrorEventArgs(Exception exception)
155+
{
156+
Exception = exception;
157+
}
158+
}
159+
160+
public class RestoreStatusHumbleObject
161+
{
162+
private readonly IElasticClient _elasticClient;
163+
private readonly IRestoreRequest _restoreRequest;
164+
private string _renamePattern;
165+
private string _renameReplacement;
166+
167+
public event EventHandler<RestoreCompletedEventArgs> Completed;
168+
public event EventHandler<RestoreErrorEventArgs> Error;
169+
public event EventHandler<RestoreNextEventArgs> Next;
170+
171+
public RestoreStatusHumbleObject(IElasticClient elasticClient, IRestoreRequest restoreRequest)
172+
{
173+
elasticClient.ThrowIfNull("elasticClient");
174+
restoreRequest.ThrowIfNull("restoreRequest");
175+
176+
_elasticClient = elasticClient;
177+
_restoreRequest = restoreRequest;
178+
179+
_renamePattern = string.IsNullOrEmpty(_restoreRequest.RenamePattern) ? string.Empty : _restoreRequest.RenamePattern;
180+
_renameReplacement = string.IsNullOrEmpty(_restoreRequest.RenameReplacement) ? string.Empty : _restoreRequest.RenameReplacement;
181+
}
182+
183+
public void CheckStatus()
184+
{
185+
try
186+
{
187+
var indices =
188+
_restoreRequest.Indices.Select(
189+
x => new IndexNameMarker
190+
{
191+
Name = Regex.Replace(x.Name, _renamePattern, _renameReplacement),
192+
Type = x.Type
193+
})
194+
.ToArray();
195+
196+
var recoveryStatus = _elasticClient.RecoveryStatus(new RecoveryStatusRequest
197+
{
198+
Detailed = true,
199+
Indices = indices
200+
});
201+
202+
if (!recoveryStatus.IsValid)
203+
throw new RestoreException(recoveryStatus.ConnectionStatus);
204+
205+
if (recoveryStatus.Indices.All(x => x.Value.Shards.All(s => s.Index.Files.Recovered == s.Index.Files.Total)))
206+
{
207+
OnCompleted(new RestoreCompletedEventArgs(recoveryStatus));
208+
return;
209+
}
210+
211+
OnNext(new RestoreNextEventArgs(recoveryStatus));
212+
}
213+
catch (Exception exception)
214+
{
215+
OnError(new RestoreErrorEventArgs(exception));
216+
}
217+
}
218+
219+
protected virtual void OnNext(RestoreNextEventArgs nextEventArgs)
220+
{
221+
var handler = Next;
222+
if (handler != null) handler(this, nextEventArgs);
223+
}
224+
225+
protected virtual void OnCompleted(RestoreCompletedEventArgs completedEventArgs)
226+
{
227+
var handler = Completed;
228+
if (handler != null) handler(this, completedEventArgs);
229+
}
230+
231+
protected virtual void OnError(RestoreErrorEventArgs errorEventArgs)
232+
{
233+
var handler = Error;
234+
if (handler != null) handler(this, errorEventArgs);
235+
}
236+
}
117237
}

0 commit comments

Comments
 (0)