Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ proc requestStorage*(
proc onStore(
self: CodexNodeRef,
request: StorageRequest,
expiry: SecondsSince1970,
slotIdx: uint64,
blocksCb: BlocksCb,
isRepairing: bool = false,
Expand Down Expand Up @@ -651,8 +652,6 @@ proc onStore(
trace "Unable to create slots builder", err = err.msg
return failure(err)

let expiry = request.expiry

if slotIdx > manifest.slotRoots.high.uint64:
trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest"))
Expand All @@ -663,7 +662,7 @@ proc onStore(
trace "Updating expiry for blocks", blocks = blocks.len

let ensureExpiryFutures =
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970))
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry))

let res = await allFinishedFailed[?!void](ensureExpiryFutures)
if res.failure.len > 0:
Expand Down Expand Up @@ -789,11 +788,12 @@ proc start*(self: CodexNodeRef) {.async.} =
if hostContracts =? self.contracts.host:
hostContracts.sales.onStore = proc(
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
onBatch: BatchProc,
isRepairing: bool = false,
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
self.onStore(request, slot, onBatch, isRepairing)
self.onStore(request, expiry, slot, onBatch, isRepairing)

hostContracts.sales.onExpiryUpdate = proc(
rootCid: Cid, expiry: SecondsSince1970
Expand Down
31 changes: 9 additions & 22 deletions codex/sales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -148,26 +148,12 @@ proc cleanUp(

# Re-add items back into the queue to prevent small availabilities from
# draining the queue. Seen items will be ordered last.
if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request:
let res =
await noCancel sales.context.market.slotCollateral(data.requestId, data.slotIndex)
if res.isErr:
error "Failed to re-add item back to the slot queue: unable to calculate collateral",
error = res.error.msg
else:
let collateral = res.get()
let queue = sales.context.slotQueue
var seenItem = SlotQueueItem.init(
data.requestId,
data.slotIndex.uint16,
data.ask,
request.expiry,
seen = true,
collateral = collateral,
)
trace "pushing ignored item to queue, marked as seen"
if err =? queue.push(seenItem).errorOption:
error "failed to readd slot to queue", errorType = $(type err), error = err.msg
if reprocessSlot and request =? data.request and var item =? agent.data.slotQueueItem:
let queue = sales.context.slotQueue
item.seen = true
trace "pushing ignored item to queue, marked as seen"
if err =? queue.push(item).errorOption:
error "failed to readd slot to queue", errorType = $(type err), error = err.msg

let fut = sales.remove(agent)
sales.trackedFutures.track(fut)
Expand All @@ -181,8 +167,9 @@ proc processSlot(
) {.async: (raises: [CancelledError]).} =
debug "Processing slot from queue", requestId = item.requestId, slot = item.slotIndex

let agent =
newSalesAgent(sales.context, item.requestId, item.slotIndex, none StorageRequest)
let agent = newSalesAgent(
sales.context, item.requestId, item.slotIndex, none StorageRequest, some item
)

let completed = newAsyncEvent()

Expand Down
9 changes: 8 additions & 1 deletion codex/sales/salesagent.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import ./statemachine
import ./salescontext
import ./salesdata
import ./reservations
import ./slotqueue

export reservations

Expand Down Expand Up @@ -42,10 +43,16 @@ proc newSalesAgent*(
requestId: RequestId,
slotIndex: uint64,
request: ?StorageRequest,
slotQueueItem = SlotQueueItem.none,
): SalesAgent =
var agent = SalesAgent.new()
agent.context = context
agent.data = SalesData(requestId: requestId, slotIndex: slotIndex, request: request)
agent.data = SalesData(
requestId: requestId,
slotIndex: slotIndex,
request: request,
slotQueueItem: slotQueueItem,
)
return agent

proc retrieveRequest*(agent: SalesAgent) {.async.} =
Expand Down
6 changes: 5 additions & 1 deletion codex/sales/salescontext.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ type
gcsafe, async: (raises: [CancelledError])
.}
OnStore* = proc(
request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool
request: StorageRequest,
expiry: SecondsSince1970,
slot: uint64,
blocksCb: BlocksCb,
isRepairing: bool,
): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
gcsafe, async: (raises: [CancelledError])
Expand Down
2 changes: 2 additions & 0 deletions codex/sales/salesdata.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import pkg/chronos
import ../contracts/requests
import ../market
import ./reservations
import ./slotqueue

type SalesData* = ref object
requestId*: RequestId
Expand All @@ -10,3 +11,4 @@ type SalesData* = ref object
slotIndex*: uint64
cancelled*: Future[void]
reservation*: ?Reservation
slotQueueItem*: ?SlotQueueItem
36 changes: 30 additions & 6 deletions codex/sales/slotqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type
duration: uint64
pricePerBytePerSecond: UInt256
collateral: UInt256 # Collateral computed
expiry: uint64
expiry: ?uint64
seen: bool

# don't need to -1 to prevent overflow when adding 1 (to always allow push)
Expand Down Expand Up @@ -89,8 +89,9 @@ proc `<`*(a, b: SlotQueueItem): bool =
scoreA.addIf(a.collateral < b.collateral, 2)
scoreB.addIf(a.collateral > b.collateral, 2)

scoreA.addIf(a.expiry > b.expiry, 1)
scoreB.addIf(a.expiry < b.expiry, 1)
if expiryA =? a.expiry and expiryB =? b.expiry:
scoreA.addIf(expiryA > expiryB, 1)
scoreB.addIf(expiryA < expiryB, 1)

return scoreA > scoreB

Expand Down Expand Up @@ -124,7 +125,7 @@ proc init*(
requestId: RequestId,
slotIndex: uint16,
ask: StorageAsk,
expiry: uint64,
expiry: ?uint64,
collateral: UInt256,
seen = false,
): SlotQueueItem =
Expand All @@ -139,6 +140,17 @@ proc init*(
seen: seen,
)

proc init*(
_: type SlotQueueItem,
requestId: RequestId,
slotIndex: uint16,
ask: StorageAsk,
expiry: uint64,
collateral: UInt256,
seen = false,
): SlotQueueItem =
SlotQueueItem.init(requestId, slotIndex, ask, some expiry, collateral, seen)

proc init*(
_: type SlotQueueItem,
request: StorageRequest,
Expand All @@ -151,7 +163,7 @@ proc init*(
_: type SlotQueueItem,
requestId: RequestId,
ask: StorageAsk,
expiry: uint64,
expiry: ?uint64,
collateral: UInt256,
): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} =
if not ask.slots.inRange:
Expand All @@ -167,10 +179,19 @@ proc init*(
Rng.instance.shuffle(items)
return items

proc init*(
_: type SlotQueueItem,
requestId: RequestId,
ask: StorageAsk,
expiry: uint64,
collateral: UInt256,
): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} =
SlotQueueItem.init(requestId, ask, some expiry, collateral)

proc init*(
_: type SlotQueueItem, request: StorageRequest, collateral: UInt256
): seq[SlotQueueItem] =
return SlotQueueItem.init(request.id, request.ask, request.expiry, collateral)
return SlotQueueItem.init(request.id, request.ask, uint64.none, collateral)

proc inRange*(val: SomeUnsignedInt): bool =
val.uint16 in SlotQueueSize.low .. SlotQueueSize.high
Expand All @@ -196,6 +217,9 @@ proc collateralPerByte*(self: SlotQueueItem): UInt256 =
proc seen*(self: SlotQueueItem): bool =
self.seen

proc `seen=`*(self: var SlotQueueItem, seen: bool) =
self.seen = seen

proc running*(self: SlotQueue): bool =
self.running

Expand Down
17 changes: 14 additions & 3 deletions codex/sales/states/downloading.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
let market = context.market
let reservations = context.reservations

without onStore =? context.onStore:
Expand Down Expand Up @@ -69,11 +70,21 @@
return await reservations.release(reservation.id, reservation.availabilityId, bytes)

try:
let slotId = slotId(request.id, data.slotIndex)
let isRepairing = (await context.market.slotState(slotId)) == SlotState.Repair
let requestId = request.id

Check warning on line 73 in codex/sales/states/downloading.nim

View check run for this annotation

Codecov / codecov/patch

codex/sales/states/downloading.nim#L73

Added line #L73 was not covered by tests
let slotId = slotId(requestId, data.slotIndex)
let requestState = await market.requestState(requestId)
let isRepairing = (await market.slotState(slotId)) == SlotState.Repair

trace "Retrieving expiry"
var expiry: SecondsSince1970
if state =? requestState and state == RequestState.Started:
expiry = await market.getRequestEnd(requestId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon the filled state, this will get "re-updated" to the same value, but I guess it is not a problem except a bit of wasted CPU cycles.

else:
expiry = await market.requestExpiresAt(requestId)

trace "Starting download"
if err =? (await onStore(request, data.slotIndex, onBlocks, isRepairing)).errorOption:
if err =?
(await onStore(request, expiry, data.slotIndex, onBlocks, isRepairing)).errorOption:
return some State(SaleErrored(error: err, reprocessSlot: false))

trace "Download complete"
Expand Down
33 changes: 22 additions & 11 deletions tests/codex/helpers/mockmarket.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import codex/clock

import ../examples
import ./mockclock

export market
export tables
Expand Down Expand Up @@ -51,7 +52,7 @@
errorOnFillSlot*: ?(ref MarketError)
errorOnFreeSlot*: ?(ref MarketError)
errorOnGetHost*: ?(ref MarketError)
clock: ?Clock
clock: Clock

Fulfillment* = object
requestId*: RequestId
Expand All @@ -63,7 +64,7 @@
host*: Address
slotIndex*: uint64
proof*: Groth16Proof
timestamp: ?SecondsSince1970
timestamp: SecondsSince1970
collateral*: UInt256

Subscriptions = object
Expand Down Expand Up @@ -119,7 +120,7 @@
proc hash*(requestId: RequestId): Hash =
hash(requestId.toArray)

proc new*(_: type MockMarket, clock: ?Clock = Clock.none): MockMarket =
proc new*(_: type MockMarket, clock: Clock = MockClock.new()): MockMarket =
## Create a new mocked Market instance
##
let config = MarketplaceConfig(
Expand Down Expand Up @@ -181,10 +182,15 @@
method requestStorage*(
market: MockMarket, request: StorageRequest
) {.async: (raises: [CancelledError, MarketError]).} =
let now = market.clock.now()
let requestExpiresAt = now + request.expiry.toSecondsSince1970
let requestEndsAt = now + request.ask.duration.toSecondsSince1970
market.requested.add(request)
market.requestExpiry[request.id] = requestExpiresAt
market.requestEnds[request.id] = requestEndsAt
var subscriptions = market.subscriptions.onRequest
for subscription in subscriptions:
subscription.callback(request.id, request.ask, request.expiry)
subscription.callback(request.id, request.ask, requestExpiresAt.uint64)

method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} =
return market.activeRequests[market.signer]
Expand Down Expand Up @@ -308,7 +314,7 @@
slotIndex: slotIndex,
proof: proof,
host: host,
timestamp: market.clock .? now,
timestamp: market.clock.now,
collateral: collateral,
)
market.filled.add(slot)
Expand Down Expand Up @@ -541,15 +547,23 @@
): Future[seq[StorageRequested]] {.async.} =
return market.requested.map(
request =>
StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry)
StorageRequested(
requestId: request.id,
ask: request.ask,
expiry: market.requestExpiry[request.id].uint64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the expiry should be the duration no ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no sorry, it is ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's really confusing keeping these two uint64 values apart, so that's why in #1196 I created different types for durations and timestamps. But I didn't want to extract all of that into this PR.

)

Check warning on line 554 in tests/codex/helpers/mockmarket.nim

View check run for this annotation

Codecov / codecov/patch

tests/codex/helpers/mockmarket.nim#L550-L554

Added lines #L550 - L554 were not covered by tests
)

method queryPastStorageRequestedEvents*(
market: MockMarket, blocksAgo: int
): Future[seq[StorageRequested]] {.async.} =
return market.requested.map(
request =>
StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry)
StorageRequested(
requestId: request.id,
ask: request.ask,
expiry: market.requestExpiry[request.id].uint64,
)

Check warning on line 566 in tests/codex/helpers/mockmarket.nim

View check run for this annotation

Codecov / codecov/patch

tests/codex/helpers/mockmarket.nim#L562-L566

Added lines #L562 - L566 were not covered by tests
)

method queryPastSlotFilledEvents*(
Expand All @@ -571,10 +585,7 @@
): Future[seq[SlotFilled]] {.async.} =
let filtered = market.filled.filter(
proc(slot: MockSlot): bool =
if timestamp =? slot.timestamp:
return timestamp >= fromTime
else:
true
return slot.timestamp >= fromTime
)
return filtered.map(
slot => SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex)
Expand Down
7 changes: 3 additions & 4 deletions tests/codex/node/testcontracts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ asyncchecksuite "Test Node - Host contracts":
let onStore = !sales.onStore
var request = StorageRequest.example
request.content.cid = verifiableBlock.cid
request.expiry =
(getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.uint64
let expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix
var fetchedBytes: uint = 0

let onBlocks = proc(
Expand All @@ -127,7 +126,7 @@ asyncchecksuite "Test Node - Host contracts":
fetchedBytes += blk.data.len.uint
return success()

(await onStore(request, 1.uint64, onBlocks, isRepairing = false)).tryGet()
(await onStore(request, expiry, 1.uint64, onBlocks, isRepairing = false)).tryGet()
check fetchedBytes == 12 * DefaultBlockSize.uint

let indexer = verifiable.protectedStrategy.init(
Expand All @@ -141,4 +140,4 @@ asyncchecksuite "Test Node - Host contracts":
bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet

check blkMd.expiry == request.expiry.toSecondsSince1970
check blkMd.expiry == expiry
Loading