Skip to content

Commit 3a41614

Browse files
Added complete implementation for migrating incompatible indexes during datastore registration
1 parent 05f755f commit 3a41614

File tree

4 files changed

+231
-15
lines changed

4 files changed

+231
-15
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ targets: [
5151
### Road to 0.1 Betas
5252

5353
As this project matures towards its first beta, a number of features still need to be fleshed out:
54-
- Migrating entries
54+
- Typed IndexName, remove use of "key" in datastore
55+
- Index deletion
5556

5657
The above list will be kept up to date during development and will likely see additions during that process.
5758

5859
### Road to 1.0
5960

6061
Once an initial beta is released, the project will start focussing on the functionality and work listed below:
62+
- Force migration methods
6163
- Composite indexes (via macros?)
6264
- Cleaning up old resources in memory
6365
- Cleaning up old resources on disk

Sources/CodableDatastore/Datastore/Datastore.swift

Lines changed: 218 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -146,22 +146,215 @@ extension Datastore {
146146
}
147147

148148
func registerAndMigrate(with transaction: DatastoreInterfaceProtocol) async throws {
149-
let descriptor = try await transaction.register(datastore: self)
150-
print("\(String(describing: descriptor))")
149+
let persistedDescriptor = try await transaction.register(datastore: self)
151150

152151
/// Only operate on read-write datastores beyond this point.
153-
guard let self = self as? Datastore<Version, CodedType, IdentifierType, ReadWrite> else { return }
154-
print("\(self)")
152+
guard let self = self as? Datastore<Version, CodedType, IdentifierType, ReadWrite>
153+
else { return }
155154

155+
/// Make sure we have a descriptor, and that there is at least one entry, otherwise stop here.
156+
guard let persistedDescriptor, persistedDescriptor.size > 0
157+
else { return }
158+
159+
/// Check the version to see if the current one is greater or equal to the one in the existing descriptor. If we can't decode it, stop here and throw an error — the data store is unsupported.
160+
let persistedVersion = try Version(persistedDescriptor.version)
161+
guard persistedVersion.rawValue <= version.rawValue
162+
else { throw DatastoreError.incompatibleVersion(version: String(describing: persistedVersion)) }
163+
164+
/// Notify progress handlers we are evaluating for possible migrations.
156165
for handler in warmupProgressHandlers {
157166
handler(.evaluating)
158167
}
159168

160-
// TODO: Migrate any incompatible indexes by calling the internal methods below as needed.
161-
await Task.yield() // The "work"
169+
var newDescriptor: DatastoreDescriptor?
170+
171+
let primaryIndex = load(IndexRange(), order: .ascending, awaitWarmup: false)
172+
173+
var rebuildPrimaryIndex = false
174+
var directIndexesToBuild: Set<String> = []
175+
var secondaryIndexesToBuild: Set<String> = []
176+
var index = 0
177+
178+
let versionData = try Data(self.version)
179+
180+
for try await (idenfifier, instance) in primaryIndex {
181+
defer { index += 1 }
182+
/// Use the first index to grab an up-to-date descriptor
183+
if newDescriptor == nil {
184+
let updatedDescriptor = try updatedDescriptor(for: instance)
185+
newDescriptor = updatedDescriptor
186+
187+
/// Check the primary index for compatibility.
188+
if persistedDescriptor.identifierType != updatedDescriptor.identifierType {
189+
try await transaction.resetPrimaryIndex(datastoreKey: key)
190+
rebuildPrimaryIndex = true
191+
}
192+
193+
/// Check existing direct indexes for compatibility
194+
for (indexKey, persistedIndex) in persistedDescriptor.directIndexes {
195+
if let updatedIndex = updatedDescriptor.directIndexes[indexKey] {
196+
/// If the index still exists, make sure it is compatible by checking their types, or checking if the primary index must be re-built.
197+
if persistedIndex.indexType != updatedIndex.indexType || rebuildPrimaryIndex {
198+
/// They were not compatible, so delete the bad index, and queue it to be re-built.
199+
try await transaction.deleteDirectIndex(indexName: persistedIndex.key, datastoreKey: key)
200+
directIndexesToBuild.insert(indexKey)
201+
}
202+
} else {
203+
/// The index is no longer needed, delete it.
204+
try await transaction.deleteDirectIndex(indexName: persistedIndex.key, datastoreKey: key)
205+
}
206+
}
207+
208+
/// Check for new direct indexes to build
209+
for (indexKey, _) in updatedDescriptor.directIndexes {
210+
guard persistedDescriptor.directIndexes[indexKey] == nil else { continue }
211+
/// The index does not yet exist, so queue it to be built.
212+
directIndexesToBuild.insert(indexKey)
213+
}
214+
215+
/// Check existing secondary indexes for compatibility
216+
for (indexKey, persistedIndex) in persistedDescriptor.secondaryIndexes {
217+
if let updatedIndex = updatedDescriptor.secondaryIndexes[indexKey] {
218+
/// If the index still exists, make sure it is compatible
219+
if persistedIndex.indexType != updatedIndex.indexType {
220+
/// They were not compatible, so delete the bad index, and queue it to be re-built.
221+
try await transaction.deleteDirectIndex(indexName: persistedIndex.key, datastoreKey: key)
222+
secondaryIndexesToBuild.insert(indexKey)
223+
}
224+
} else {
225+
/// The index is no longer needed, delete it.
226+
try await transaction.deleteDirectIndex(indexName: persistedIndex.key, datastoreKey: key)
227+
}
228+
}
229+
230+
/// Check for new secondary indexes to build
231+
for (indexKey, _) in updatedDescriptor.secondaryIndexes {
232+
guard persistedDescriptor.secondaryIndexes[indexKey] == nil else { continue }
233+
/// The index does not yet exist, so queue it to be built.
234+
secondaryIndexesToBuild.insert(indexKey)
235+
}
236+
237+
/// Remove any direct indexes from the secondary ones we may have requested.
238+
secondaryIndexesToBuild.subtract(directIndexesToBuild)
239+
240+
/// If we don't need to migrate anything, stop here.
241+
if rebuildPrimaryIndex == false, directIndexesToBuild.isEmpty, secondaryIndexesToBuild.isEmpty {
242+
break
243+
}
244+
245+
/// Create any missing indexes and prime the datastore for writing.
246+
try await transaction.apply(descriptor: updatedDescriptor, for: key)
247+
}
248+
249+
/// Notify progress handlers we are starting an entry.
250+
for handler in warmupProgressHandlers {
251+
handler(.working(current: index, total: persistedDescriptor.size))
252+
}
253+
254+
let instanceData = try await encoder(instance)
255+
256+
if rebuildPrimaryIndex {
257+
let insertionCursor = try await transaction.primaryIndexCursor(inserting: idenfifier, datastoreKey: key)
258+
259+
try await transaction.persistPrimaryIndexEntry(
260+
versionData: versionData,
261+
identifierValue: idenfifier,
262+
instanceData: instanceData,
263+
cursor: insertionCursor,
264+
datastoreKey: key
265+
)
266+
}
267+
268+
var queriedIndexes: Set<String> = []
269+
270+
/// Persist the direct indexes with full copies
271+
for indexPath in directIndexes {
272+
let indexName = indexPath.path
273+
guard
274+
directIndexesToBuild.contains(indexName),
275+
!queriedIndexes.contains(indexName)
276+
else { continue }
277+
queriedIndexes.insert(indexName)
278+
279+
let updatedValue = instance[keyPath: indexPath]
280+
281+
/// Grab a cursor to insert the new value in the index.
282+
let updatedValueCursor = try await transaction.directIndexCursor(
283+
inserting: updatedValue.indexed,
284+
identifier: idenfifier,
285+
indexName: indexName,
286+
datastoreKey: key
287+
)
288+
289+
/// Insert it.
290+
try await transaction.persistDirectIndexEntry(
291+
versionData: versionData,
292+
indexValue: updatedValue.indexed,
293+
identifierValue: idenfifier,
294+
instanceData: instanceData,
295+
cursor: updatedValueCursor,
296+
indexName: indexName,
297+
datastoreKey: key
298+
)
299+
}
300+
301+
/// Next, go through any remaining computed indexes as secondary indexes.
302+
for indexPath in computedIndexes {
303+
let indexName = indexPath.path
304+
guard
305+
secondaryIndexesToBuild.contains(indexName),
306+
!queriedIndexes.contains(indexName)
307+
else { continue }
308+
queriedIndexes.insert(indexName)
309+
310+
let updatedValue = instance[keyPath: indexPath]
311+
312+
/// Grab a cursor to insert the new value in the index.
313+
let updatedValueCursor = try await transaction.secondaryIndexCursor(
314+
inserting: updatedValue.indexed,
315+
identifier: idenfifier,
316+
indexName: indexName,
317+
datastoreKey: self.key
318+
)
319+
320+
/// Insert it.
321+
try await transaction.persistSecondaryIndexEntry(
322+
indexValue: updatedValue.indexed,
323+
identifierValue: idenfifier,
324+
cursor: updatedValueCursor,
325+
indexName: indexName,
326+
datastoreKey: self.key
327+
)
328+
}
329+
330+
/// Re-insert any remaining indexed values into the new index.
331+
try await Mirror.indexedChildren(from: instance, assertIdentifiable: true) { indexName, value in
332+
guard
333+
secondaryIndexesToBuild.contains(indexName),
334+
!queriedIndexes.contains(indexName)
335+
else { return }
336+
337+
/// Grab a cursor to insert the new value in the index.
338+
let updatedValueCursor = try await transaction.secondaryIndexCursor(
339+
inserting: value,
340+
identifier: idenfifier,
341+
indexName: indexName,
342+
datastoreKey: self.key
343+
)
344+
345+
/// Insert it.
346+
try await transaction.persistSecondaryIndexEntry(
347+
indexValue: value,
348+
identifierValue: idenfifier,
349+
cursor: updatedValueCursor,
350+
indexName: indexName,
351+
datastoreKey: self.key
352+
)
353+
}
354+
}
162355

163356
for handler in warmupProgressHandlers {
164-
handler(.complete(total: 0))
357+
handler(.complete(total: persistedDescriptor.size))
165358
}
166359

167360
warmupProgressHandlers.removeAll()
@@ -288,12 +481,15 @@ extension Datastore {
288481
}
289482
}
290483

291-
nonisolated public func load(
484+
nonisolated func load(
292485
_ range: some IndexRangeExpression<IdentifierType>,
293-
order: RangeOrder = .ascending
294-
) -> some TypedAsyncSequence<CodedType> {
486+
order: RangeOrder,
487+
awaitWarmup: Bool
488+
) -> some TypedAsyncSequence<(id: IdentifierType, instance: CodedType)> {
295489
AsyncThrowingBackpressureStream { provider in
296-
try await self.warmupIfNeeded()
490+
if awaitWarmup {
491+
try await self.warmupIfNeeded()
492+
}
297493

298494
try await self.persistence._withTransaction(
299495
actionName: nil,
@@ -302,14 +498,22 @@ extension Datastore {
302498
try await transaction.primaryIndexScan(range: range.applying(order), datastoreKey: self.key) { versionData, instanceData in
303499
let entryVersion = try Version(versionData)
304500
let decoder = try await self.decoder(for: entryVersion)
305-
let instance = try await decoder(instanceData).instance
501+
let decodedValue = try await decoder(instanceData)
306502

307-
try await provider.yield(instance)
503+
try await provider.yield(decodedValue)
308504
}
309505
}
310506
}
311507
}
312508

509+
nonisolated public func load(
510+
_ range: some IndexRangeExpression<IdentifierType>,
511+
order: RangeOrder = .ascending
512+
) -> some TypedAsyncSequence<CodedType> {
513+
load(range, order: order, awaitWarmup: true)
514+
.map { $0.instance }
515+
}
516+
313517
@_disfavoredOverload
314518
public nonisolated func load(
315519
_ range: IndexRange<IdentifierType>,
@@ -631,7 +835,7 @@ extension Datastore where AccessMode == ReadWrite {
631835
)
632836
}
633837

634-
/// Re-insert those indexes from the new index.
838+
/// Re-insert those indexes into the new index.
635839
try await Mirror.indexedChildren(from: instance, assertIdentifiable: true) { indexName, value in
636840
guard !queriedIndexes.contains(indexName) else { return }
637841

Sources/CodableDatastore/Datastore/DatastoreError.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,17 @@ public enum DatastoreError: LocalizedError {
1313
/// A decoder was missing for the specified version.
1414
case missingDecoder(version: String)
1515

16+
/// The persisted version is incompatible with the one supported by the datastore.
17+
case incompatibleVersion(version: String?)
18+
1619
public var errorDescription: String? {
1720
switch self {
1821
case .missingDecoder(let version):
1922
return "The decoder for version \(version) is missing."
23+
case .incompatibleVersion(.some(let version)):
24+
return "The persisted version \(version) is newer than the one supported by the datastore."
25+
case .incompatibleVersion(.none):
26+
return "The persisted version is incompatible with the one supported by the datastore."
2027
}
2128
}
2229
}

Sources/CodableDatastore/Persistence/TransactionOptions.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,7 @@ public struct UnsafeTransactionOptions: OptionSet {
5151

5252
/// The transaction should skip emitting observations. This is useful when the transaction must enumerate and modify the entire data set, which would cause each modified entry to be kept in memory for the duration of the transaction.
5353
public static let skipObservations = Self(rawValue: 1 << 16)
54+
55+
/// The transaction should persist to storage even if it is a child transaction. Note that this must be the _first_ non-readonly child transaction of a parent transaction to succeed.
56+
public static let enforceDurability = Self(rawValue: 1 << 17)
5457
}

0 commit comments

Comments
 (0)