Skip to content

Commit

Permalink
cleaning up merge
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov committed Nov 14, 2023
1 parent 50c1c70 commit bad6531
Show file tree
Hide file tree
Showing 15 changed files with 166 additions and 204 deletions.
61 changes: 12 additions & 49 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -214,46 +214,6 @@ proc requestBlock*(
): Future[Block] =
b.requestBlock(BlockAddress.init(cid))

proc requestBlock(
b: BlockExcEngine,
treeReq: TreeReq,
index: Natural,
timeout = DefaultBlockTimeout
): Future[Block] {.async.} =
let blockFuture = b.pendingBlocks.getWantHandle(address, timeout)

if b.pendingBlocks.isInFlight(address):
return await blockFuture

let peers = b.peers.selectCheapest(address)
if peers.len == 0:
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])

let maybePeer =
if peers.len > 0:
peers[hash(address) mod peers.len].some
elif b.peers.len > 0:
toSeq(b.peers)[hash(address) mod b.peers.len].some
else:
BlockExcPeerCtx.none

if peer =? maybePeer:
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
b.pendingBlocks.setInFlight(address)
await b.sendWantBlock(address, peer)
codexBlockExchangeWantBlockListsSent.inc()
await b.sendWantHave(address, peer, toSeq(b.peers))
codexBlockExchangeWantHaveListsSent.inc()

return await blockFuture

proc requestBlock*(
b: BlockExcEngine,
cid: Cid,
timeout = DefaultBlockTimeout
): Future[Block] =
b.requestBlock(BlockAddress.init(cid))

proc blockPresenceHandler*(
b: BlockExcEngine,
peer: PeerId,
Expand Down Expand Up @@ -389,21 +349,24 @@ proc blocksDeliveryHandler*(

var validatedBlocksDelivery: seq[BlockDelivery]
for bd in blocksDelivery:
logScope:
peer = peer
address = bd.address

if err =? b.validateBlockDelivery(bd).errorOption:
warn "Block validation failed", address = bd.address, msg = err.msg
warn "Block validation failed", msg = err.msg
continue

if err =? (await b.localStore.putBlock(bd.blk)).errorOption:
error "Unable to store block", address = bd.address, err = err.msg
error "Unable to store block", err = err.msg
continue

if bd.address.leaf:
without proof =? bd.proof:
error "Proof expected for a leaf block delivery", address = bd.address
error "Proof expected for a leaf block delivery"
continue
if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption:
error "Unable to store proof and cid for a block", address = bd.address
error "Unable to store proof and cid for a block"
continue

validatedBlocksDelivery.add(bd)
Expand Down Expand Up @@ -438,11 +401,11 @@ proc wantListHandler*(

logScope:
peer = peerCtx.id
# cid = e.cid
address = e.address
wantType = $e.wantType

if idx < 0: # updating entry
trace "Processing new want list entry", address = e.address
trace "Processing new want list entry"

let
have = await e.address in b.localStore
Expand All @@ -454,21 +417,21 @@ proc wantListHandler*(
codex_block_exchange_want_have_lists_received.inc()

if not have and e.sendDontHave:
trace "Adding dont have entry to presence response", address = e.address
trace "Adding dont have entry to presence response"
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.DontHave,
price: price))
elif have and e.wantType == WantType.WantHave:
trace "Adding have entry to presence response", address = e.address
trace "Adding have entry to presence response"
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.Have,
price: price))
elif e.wantType == WantType.WantBlock:
trace "Added entry to peer's want blocks list", address = e.address
trace "Added entry to peer's want blocks list"
peerCtx.peerWants.add(e)
codex_block_exchange_want_block_lists_received.inc()
else:
Expand Down
20 changes: 10 additions & 10 deletions codex/blocktype.nim
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func new*(
codec = multiCodec("raw")
): ?!Block =
## creates a new block for both storage and network IO
##
##

let
hash = ? MultiHash.digest($mcodec, data).mapFailure
Expand Down Expand Up @@ -132,25 +132,25 @@ func new*(

proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!Cid =
## Returns cid representing empty content, given cid version, hash codec and data codec
##
##

const
Sha256 = multiCodec("sha2-256")
Raw = multiCodec("raw")
DagPB = multiCodec("dag-pb")
DagJson = multiCodec("dag-json")

var index {.global, threadvar.}: Table[(CIDv0, Sha256, DagPB), Result[Cid, CidError]]
var index {.global, threadvar.}: Table[(CidVersion, MultiCodec, MultiCodec), Cid]
once:
index = {
# source https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_empty
(CIDv0, Sha256, DagPB): Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"),
(CIDv1, Sha256, DagPB): Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi"), # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku
(CIDv1, Sha256, DagJson): Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP"), # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta
(CIDv1, Sha256, Raw): Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW"),
(CIDv0, Sha256, DagPB): ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure,
(CIDv1, Sha256, DagPB): ? Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi").mapFailure, # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku
(CIDv1, Sha256, DagJson): ? Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP").mapFailure, # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta
(CIDv1, Sha256, Raw): ? Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW").mapFailure,
}.toTable

index[(version, hcodec, dcodec)].catch.flatMap((a: Result[Cid, CidError]) => a.mapFailure)
index[(version, hcodec, dcodec)].catch

proc emptyDigest*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!MultiHash =
emptyCid(version, hcodec, dcodec)
Expand All @@ -161,11 +161,11 @@ proc emptyBlock*(version: CidVersion, hcodec: MultiCodec): ?!Block =
.flatMap((cid: Cid) => Block.new(cid = cid, data = @[]))

proc emptyBlock*(cid: Cid): ?!Block =
cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
emptyBlock(cid.cidver, mhash.mcodec))

proc isEmpty*(cid: Cid): bool =
success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
emptyCid(cid.cidver, mhash.mcodec, cid.mcodec))

proc isEmpty*(blk: Block): bool =
Expand Down
3 changes: 0 additions & 3 deletions codex/codex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ proc new*(
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)

treeReader = TreeReader.new()

repoData = case config.repoKind
of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
.expect("Should create repo file data store!"))
Expand All @@ -243,7 +241,6 @@ proc new*(
repoDs = repoData,
metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create meta data store!"),
treeReader = treeReader,
quotaMaxBytes = config.storageQuota.uint,
blockTtl = config.blockTtl)

Expand Down
21 changes: 12 additions & 9 deletions codex/erasure/erasure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,17 @@ proc getPendingBlocks(

proc isFinished(): bool = pendingBlocks.len == 0

proc genNext(): Future[(?!bt.Block, int)] {.async.} =
proc genNext(): Future[(?!bt.Block, int)] {.async.} =
let completedFut = await one(pendingBlocks)
pendingBlocks.del(pendingBlocks.find(completedFut))
return await completedFut
if (let i = pendingBlocks.find(completedFut); i >= 0):
pendingBlocks.del(i)
return await completedFut
else:
let (_, index) = await completedFut
raise newException(CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index)

Iter.new(genNext, isFinished)

proc prepareEncodingData(
self: Erasure,
manifest: Manifest,
Expand All @@ -128,9 +132,9 @@ proc prepareEncodingData(
for fut in pendingBlocksIter:
let (blkOrErr, idx) = await fut
without blk =? blkOrErr, err:
warn "Failed retreiving a block", idx, treeCid = manifest.treeCid
warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg
continue

let pos = indexToPos(params.steps, idx, step)
shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
cids[idx] = blk.cid
Expand Down Expand Up @@ -164,7 +168,7 @@ proc prepareDecodingData(
## `emptyBlock` - the empty block to be used for padding
##

let
let
indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps))
pendingBlocksIter = self.getPendingBlocks(encoded, indicies)

Expand All @@ -180,7 +184,7 @@ proc prepareDecodingData(

let (blkOrErr, idx) = await fut
without blk =? blkOrErr, err:
trace "Failed retreiving a block", idx, treeCid = encoded.treeCid
trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg
continue

let
Expand Down Expand Up @@ -368,7 +372,6 @@ proc decode*(
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
resolved = 0

data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M
Expand Down
20 changes: 10 additions & 10 deletions codex/manifest/manifest.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ export types

type
Manifest* = ref object of RootObj
treeCid: Cid # Root of the merkle tree
datasetSize: NBytes # Total size of all blocks
blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
version: CidVersion # Cid version
hcodec: MultiCodec # Multihash codec
codec: MultiCodec # Data set codec
case protected: bool # Protected datasets have erasure coded info
treeCid {.serialize.}: Cid # Root of the merkle tree
datasetSize {.serialize.}: NBytes # Total size of all blocks
blockSize {.serialize.}: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
version: CidVersion # Cid version
hcodec: MultiCodec # Multihash codec
codec: MultiCodec # Data set codec
case protected {.serialize.}: bool # Protected datasets have erasure coded info
of true:
ecK: int # Number of blocks to encode
ecM: int # Number of resulting parity blocks
originalTreeCid: Cid # The original root of the dataset being erasure coded
ecK: int # Number of blocks to encode
ecM: int # Number of resulting parity blocks
originalTreeCid: Cid # The original root of the dataset being erasure coded
originalDatasetSize: NBytes
else:
discard
Expand Down
Loading

0 comments on commit bad6531

Please sign in to comment.