From f365139319f1f7241cf98667e97c064df5bd9c8c Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Tue, 17 Oct 2023 23:31:13 +0200 Subject: [PATCH] Rework erasure.nim to include recent cleanup --- codex/blockexchange/engine/engine.nim | 21 +- codex/codex.nim | 1 + codex/erasure/erasure.nim | 373 +++++++++++++++++--------- codex/merkletree/merkletree.nim | 31 ++- codex/stores/blockstore.nim | 6 - codex/stores/cachestore.nim | 3 - codex/stores/networkstore.nim | 3 - codex/stores/repostore.nim | 12 +- codex/stores/treereader.nim | 37 +-- codex/utils.nim | 7 - codex/utils/asynciter.nim | 152 +++++++---- tests/codex/helpers.nim | 15 +- tests/codex/helpers/mockrepostore.nim | 1 - 13 files changed, 394 insertions(+), 268 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 8da80d0bb..f5c9b6ca6 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -275,24 +275,9 @@ proc requestBlocks*( without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err: return failure(err) - var - iter = AsyncIter[Block]() - index = 0 - - proc next(): Future[Block] = - if index < leavesCount: - let fut = b.requestBlock(treeReq, index, timeout) - inc index - if index >= leavesCount: - iter.finished = true - return fut - else: - let fut = newFuture[Block]("engine.requestBlocks") - fut.fail(newException(CodexError, "No more elements for tree with cid " & $treeCid)) - return fut - - iter.next = next - return success(iter) + return Iter.fromSlice(0.. b.requestBlock(treeReq, index, timeout) + ).success proc blockPresenceHandler*( b: BlockExcEngine, diff --git a/codex/codex.nim b/codex/codex.nim index 62aba38b1..f2369c82c 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -243,6 +243,7 @@ proc new*( repoDs = repoData, metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace) .expect("Should create meta data store!"), + treeReader = treeReader, quotaMaxBytes = config.storageQuota.uint, blockTtl = config.blockTtl) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 6ad61751e..10159bdc6 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -12,6 +12,7 @@ import pkg/upraises push: {.upraises: [].} import std/sequtils +import std/sugar import pkg/chronos import pkg/chronicles @@ -23,6 +24,7 @@ import ../merkletree import ../stores import ../blocktype as bt import ../utils +import ../utils/asynciter import pkg/stew/byteutils @@ -68,142 +70,242 @@ type decoderProvider*: DecoderProvider store*: BlockStore -proc encode*( - self: Erasure, - manifest: Manifest, - blocks: int, - parity: int -): Future[?!Manifest] {.async.} = - ## Encode a manifest into one that is erasure protected. - ## - ## `manifest` - the original manifest to be encoded - ## `blocks` - the number of blocks to be encoded - K - ## `parity` - the number of parity blocks to generate - M + EncodingParams = object + ecK: int + ecM: int + rounded: int + steps: int + blocksCount: int + +func indexToPos(steps, idx, step: int): int {.inline.} = + ## Convert an index to a position in the encoded + ## dataset + ## `idx` - the index to convert + ## `step` - the current step + ## `pos` - the position in the encoded dataset ## - logScope: - original_cid = manifest.cid.get() - original_len = manifest.blocksCount - blocks = blocks - parity = parity + (idx - step) div steps - trace "Erasure coding manifest", blocks, parity +proc getPendingBlocks( + self: Erasure, + manifest: Manifest, + indicies: seq[int]): AsyncIter[(?!bt.Block, int)] = + ## Get pending blocks iterator + ## - without tree =? await self.store.getTree(manifest.treeCid), err: - return err.failure + var + # request blocks from the store + pendingBlocks = indicies.map( (i: int) => + self.store.getBlock(manifest.treeCid, i, manifest.treeRoot).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K) + ) - let leaves = tree.leaves + proc isFinished(): bool = pendingBlocks.len == 0 + + proc genNext(): Future[(?!bt.Block, int)] {.async.} = + let completedFut = await one(pendingBlocks) + pendingBlocks.del(pendingBlocks.find(completedFut)) + return await completedFut + + Iter.new(genNext, isFinished) + +proc prepareEncodingData( + self: Erasure, + manifest: Manifest, + params: EncodingParams, + step: int, + data: ref seq[seq[byte]], + cids: ref seq[Cid], + emptyBlock: seq[byte]): Future[?!int] {.async.} = + ## Prepare data for encoding + ## let - rounded = roundUp(manifest.blocksCount, blocks) - steps = divUp(manifest.blocksCount, blocks) - blocksCount = rounded + (steps * parity) + indicies = toSeq(countup(step, params.rounded - 1, params.steps)) + pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount)) + + var resolved = 0 + for fut in pendingBlocksIter: + let (blkOrErr, idx) = await fut + without blk =? blkOrErr, err: + warn "Failed retreiving a block", idx, treeCid = manifest.treeCid + continue + + let pos = indexToPos(params.steps, idx, step) + shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) + cids[idx] = blk.cid - var cids = newSeq[Cid](blocksCount) + resolved.inc() + for idx in indicies.filterIt(it >= manifest.blocksCount): + let pos = indexToPos(params.steps, idx, step) + trace "Padding with empty block", idx + shallowCopy(data[pos], emptyBlock) + without emptyBlockCid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err: + return failure(err) + cids[idx] = emptyBlockCid + + success(resolved) + +proc prepareDecodingData( + self: Erasure, + encoded: Manifest, + step: int, + data: ref seq[seq[byte]], + parityData: ref seq[seq[byte]], + cids: ref seq[Cid], + emptyBlock: seq[byte]): Future[?!(int, int)] {.async.} = + ## Prepare data for decoding + ## `encoded` - the encoded manifest + ## `step` - the current step + ## `data` - the data to be prepared + ## `parityData` - the parityData to be prepared + ## `cids` - cids of prepared data + ## `emptyBlock` - the empty block to be used for padding + ## + + let + indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps)) + pendingBlocksIter = self.getPendingBlocks(encoded, indicies) - # copy original manifest blocks - for i in 0..= encoded.ecK: + break + + let (blkOrErr, idx) = await fut + without blk =? blkOrErr, err: + trace "Failed retreiving a block", idx, treeCid = encoded.treeCid + continue + + let + pos = indexToPos(encoded.steps, idx, step) + + logScope: + cid = blk.cid + idx = idx + pos = pos + step = step + empty = blk.isEmpty + + cids[idx] = blk.cid + if idx >= encoded.rounded: + trace "Retrieved parity block" + shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) + parityPieces.inc else: - without cid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err: - return err.failure - cids[i] = cid + trace "Retrieved data block" + shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) + dataPieces.inc + + resolved.inc + + return success (dataPieces, parityPieces) + +proc init(_: type EncodingParams, manifest: Manifest, ecK: int, ecM: int): ?!EncodingParams = + if ecK > manifest.blocksCount: + return failure("Unable to encode manifest, not enough blocks, ecK = " & $ecK & ", blocksCount = " & $manifest.blocksCount) + + let + rounded = roundUp(manifest.blocksCount, ecK) + steps = divUp(manifest.blocksCount, ecK) + blocksCount = rounded + (steps * ecM) + + EncodingParams( + ecK: ecK, + ecM: ecM, + rounded: rounded, + steps: steps, + blocksCount: blocksCount + ).success + +proc encodeData( + self: Erasure, + manifest: Manifest, + params: EncodingParams + ): Future[?!Manifest] {.async.} = + ## Encode blocks pointed to by the protected manifest + ## + ## `manifest` - the manifest to encode + ## logScope: - steps = steps - rounded_blocks = rounded - new_manifest = blocksCount + steps = params.steps + rounded_blocks = params.rounded + blocks_count = params.blocksCount + ecK = params.ecK + ecM = params.ecM var - encoder = self.encoderProvider(manifest.blockSize.int, blocks, parity) - var toadd = 0 - var tocount = 0 - var maxidx = 0 + cids = seq[Cid].new() + encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM) + emptyBlock = newSeq[byte](manifest.blockSize.int) + + cids[].setLen(params.blocksCount) + try: - for i in 0..= encoded.ecK) or (idxPendingBlocks.len == 0): - break - - let - done = await one(idxPendingBlocks) - idx = pendingBlocks.find(done) - - idxPendingBlocks.del(idxPendingBlocks.find(done)) - - without blk =? (await done), error: - trace "Failed retrieving block", error = error.msg - continue + data[].setLen(encoded.ecK) # set len to K + parityData[].setLen(encoded.ecM) # set len to M - if idx >= encoded.ecK: - trace "Retrieved parity block", cid = blk.cid, idx - shallowCopy(parityData[idx - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) - else: - trace "Retrieved data block", cid = blk.cid, idx - shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data) - - resolved.inc - - let - dataPieces = data.filterIt( it.len > 0 ).len - parityPieces = parityData.filterIt( it.len > 0 ).len + without (dataPieces, parityPieces) =? + (await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err: + trace "Unable to prepare data", error = err.msg + return failure(err) if dataPieces >= encoded.ecK: - trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces + trace "Retrieved all the required data blocks" continue - trace "Erasure decoding data", data = dataPieces, parity = parityPieces + trace "Erasure decoding data" if ( - let err = decoder.decode(data, parityData, recovered); + let err = decoder.decode(data[], parityData[], recovered); err.isErr): - trace "Unable to decode manifest!", err = $err.error + trace "Unable to decode data!", err = $err.error return failure($err.error) for i in 0..= leavesCount: - iter.finish - - checkLen(0) - - var index = 0 - proc next(): Future[?!Block] {.async.} = - if not iter.finished: + let iter = Iter.fromSlice(0.. 5; 6; 7; 8 - ## ``` - a .. (when b is BackwardsIndex: succ(b) else: pred(b)) when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'} diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index 7156fc6e1..ffff80d1b 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -1,13 +1,16 @@ +import std/sugar + import pkg/questionable import pkg/chronos import pkg/upraises type - MapItem*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.} - NextItem*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.} + Function*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.} + IsFinished* = proc(): bool {.upraises: [], gcsafe, closure.} + GenNext*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.} Iter*[T] = ref object - finished*: bool - next*: NextItem[T] + finished: bool + next*: GenNext[T] AsyncIter*[T] = Iter[Future[T]] proc finish*[T](self: Iter[T]): void = @@ -20,66 +23,119 @@ iterator items*[T](self: Iter[T]): T = while not self.finished: yield self.next() -proc map*[T, U](wrappedIter: Iter[T], mapItem: MapItem[T, U]): Iter[U] = - var iter = Iter[U]() - - proc checkFinish(): void = - if wrappedIter.finished: - iter.finish +proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} = + let t = await fut + fn(t) - checkFinish() +proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] = + var iter = Iter[T]() - proc next(): U {.upraises: [CatchableError].} = + proc next(): T {.upraises: [CatchableError].} = if not iter.finished: - let fut = wrappedIter.next() - checkFinish() - return mapItem(fut) + var item: T + try: + item = genNext() + except CatchableError as err: + if finishOnErr or isFinished(): + iter.finish + raise err + + if isFinished(): + iter.finish + return item else: - raise newException(CatchableError, "Iterator finished, but next element was requested") + raise newException(CatchableError, "Iterator is finished but next item was requested") + if isFinished(): + iter.finish + iter.next = next return iter -proc prefetch*[T](wrappedIter: Iter[T], n: Positive): Iter[T] = +proc fromItems*[T](_: type Iter, items: openArray[T]): Iter[T] = + ## Create new iterator from items + ## + + Iter.fromSlice(0.. items[i]) - var ringBuf = newSeq[T](n) - var wrappedLen = int.high +proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] = + ## Creates new iterator from slice + ## - var iter = Iter[T]() + Iter.fromRange(slice.a.int, slice.b.int, 1) - proc tryFetch(i: int): void = - if not wrappedIter.finished: - let res = wrappedIter.next() - ringBuf[i mod n] = res - if wrappedIter.finished: - wrappedLen = min(i + 1, wrappedLen) - else: - if i == 0: - wrappedLen = 0 +proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] = + ## Creates new iterator in range a..b with specified step (default 1) + ## + + var i = a + + proc genNext(): U = + let u = i + inc(i, step) + u + + proc isFinished(): bool = + (step > 0 and i > b) or + (step < 0 and i < b) + + Iter.new(genNext, isFinished) - proc checkLen(i: int): void = - if i >= wrappedLen: - iter.finish +proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = + Iter.new( + genNext = () => fn(iter.next()), + isFinished = () => iter.finished + ) - # initialize buf with n prefetched values - for i in 0..= iterLen + + # initialize ringBuf with n prefetched values + for j in 0..