diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..e347195 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,5 @@ +root=true + +[*.rs] +indent_style = tab +indent_size = 4 \ No newline at end of file diff --git a/.gitignore b/.gitignore index ad90359..b85e5ce 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target cache +store diff --git a/Cargo.lock b/Cargo.lock index 03f09bd..b269b5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -177,6 +188,17 @@ dependencies = [ "syn", ] +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "base64" version = "0.22.1" @@ -227,9 +249,9 @@ checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" [[package]] name = "bytes" -version = "1.10.1" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "cc" @@ -249,6 +271,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.42" @@ -327,10 +355,13 @@ name = "common" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "chrono", "flate2", + "futures", "hex", "lzma-rust2", + "opendal", "serde", "sha2", "tokio", @@ -356,6 +387,32 @@ version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a9b614a5787ef0c8802a55766480563cb3a93b435898c422ed2a359cf811582" +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -386,6 +443,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -395,6 +461,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-common" version = "0.1.6" @@ -412,9 +484,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", + "subtle", +] + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "find-msvc-tools" version = "0.1.4" @@ -447,6 +547,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -454,6 +569,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -462,6 +578,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -480,10 +624,16 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -503,8 +653,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -514,11 +666,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasip2", + "wasm-bindgen", +] + +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "heck" version = "0.5.0" @@ -531,6 +703,24 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "http" version = "1.3.1" @@ -596,6 +786,24 @@ dependencies = [ "pin-utils", "smallvec", "tokio", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +dependencies = [ + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots", ] [[package]] @@ -604,14 +812,22 @@ version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" dependencies = [ + "base64", "bytes", + "futures-channel", "futures-core", + "futures-util", "http", "http-body", "hyper", + "ipnet", + "libc", + "percent-encoding", "pin-project-lite", + "socket2", "tokio", "tower-service", + "tracing", ] [[package]] @@ -638,6 +854,124 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + +[[package]] +name = "ipnet" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" + +[[package]] +name = "iri-string" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -691,6 +1025,12 @@ dependencies = [ "zlib-rs", ] +[[package]] +name = "litemap" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" + [[package]] name = "lock_api" version = "0.4.14" @@ -706,6 +1046,12 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "lzma-rust2" version = "0.15.1" @@ -722,6 +1068,16 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.6" @@ -776,6 +1132,44 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "opendal" +version = "0.54.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42afda58fa2cf50914402d132cc1caacff116a85d10c72ab2082bb7c50021754" +dependencies = [ + "anyhow", + "backon", + "base64", + "bytes", + "chrono", + "crc32c", + "futures", + "getrandom 0.2.16", + "http", + "http-body", + "log", + "md-5", + "percent-encoding", + "quick-xml 0.38.3", + "reqsign", + "reqwest", + "serde", + "serde_json", + "tokio", + "uuid", +] + +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -823,6 +1217,24 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "potential_utf" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +dependencies = [ + "zerovec", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.103" @@ -833,19 +1245,153 @@ dependencies = [ ] [[package]] -name = "quote" -version = "1.0.42" +name = "quick-xml" +version = "0.37.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" dependencies = [ - "proc-macro2", + "memchr", + "serde", ] [[package]] -name = "r-efi" -version = "5.3.0" +name = "quick-xml" +version = "0.38.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +dependencies = [ + "bytes", + "getrandom 0.3.4", + "lru-slab", + "rand 0.9.2", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.60.2", +] + +[[package]] +name = "quote" +version = "1.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.4", +] [[package]] name = "redox_syscall" @@ -856,6 +1402,76 @@ dependencies = [ "bitflags", ] +[[package]] +name = "reqsign" +version = "0.16.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701" +dependencies = [ + "anyhow", + "async-trait", + "base64", + "chrono", + "form_urlencoded", + "getrandom 0.2.16", + "hex", + "hmac", + "home", + "http", + "log", + "percent-encoding", + "quick-xml 0.37.5", + "rand 0.8.5", + "reqwest", + "rust-ini", + "serde", + "serde_json", + "sha1", + "sha2", + "tokio", +] + +[[package]] +name = "reqwest" +version = "0.12.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tokio-util", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots", +] + [[package]] name = "ring" version = "0.17.14" @@ -870,6 +1486,31 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rust-ini" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "796e8d2b6696392a43bea58116b667fb4c29727dc5abd27d6acf338bb4f688c7" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustls" version = "0.23.35" @@ -891,6 +1532,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" dependencies = [ + "web-time", "zeroize", ] @@ -923,6 +1565,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.228" @@ -996,9 +1644,11 @@ dependencies = [ "axum", "clap", "common", + "futures", "futures-core", "http-body", "lazy_static", + "opendal", "sha2", "tokio", "tokio-stream", @@ -1006,6 +1656,17 @@ dependencies = [ "tower-http", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -1038,6 +1699,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + [[package]] name = "smallvec" version = "1.15.1" @@ -1054,6 +1721,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "strsim" version = "0.11.1" @@ -1082,6 +1755,74 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + +[[package]] +name = "tinystr" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +dependencies = [ + "displaydoc", + "zerovec", +] + +[[package]] +name = "tinyvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" @@ -1111,6 +1852,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.17" @@ -1161,11 +1912,14 @@ dependencies = [ "bitflags", "bytes", "futures-core", + "futures-util", "http", "http-body", + "iri-string", "pin-project-lite", "tokio", "tokio-util", + "tower", "tower-layer", "tower-service", ] @@ -1202,6 +1956,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.19.0" @@ -1249,24 +2009,63 @@ dependencies = [ "log", ] +[[package]] +name = "url" +version = "2.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + [[package]] name = "utf-8" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +dependencies = [ + "getrandom 0.3.4", + "js-sys", + "serde", + "wasm-bindgen", +] + [[package]] name = "version_check" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1295,6 +2094,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "551f88106c6d5e7ccc7cd9a16f312dd3b5d36ea8b4954304657d5dfba115d4a0" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.105" @@ -1327,6 +2139,39 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "web-sys" +version = "0.3.82" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a1f95c0d03a47f4ae1f7a64643a6bb97465d9b740f0fa8f90ea33915c99a9a1" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "1.0.4" @@ -1557,12 +2402,115 @@ version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +[[package]] +name = "writeable" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" + +[[package]] +name = "yoke" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerocopy" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zeroize" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +[[package]] +name = "zerotrie" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zlib-rs" version = "0.5.2" diff --git a/common/Cargo.toml b/common/Cargo.toml index dcc12be..04b85b8 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -5,10 +5,13 @@ edition = "2021" [dependencies] anyhow = "1.0.100" +bytes = "1.11.1" chrono = "0.4.41" flate2 = { version = "1.1.5", features = ["zlib-rs"] } +futures = "0.3.31" hex = "0.4.3" lzma-rust2 = {version = "0.15.1", features = ["std"] } +opendal = "0.54.1" serde = { version = "1.0.228", features = ["serde_derive"] } sha2 = "0.10.9" tokio = { version = "1.48.0", features = ["full"] } diff --git a/common/src/archive.rs b/common/src/archive.rs index 6ac228d..09adc5f 100644 --- a/common/src/archive.rs +++ b/common/src/archive.rs @@ -1,10 +1,21 @@ -use std::{fs::File, io::{BufRead, BufReader, Read, Write}, num::NonZero, path::PathBuf, str::FromStr}; +use std::{ + fs::File, + io::{BufRead, BufReader, Read, Write}, + num::NonZero, + path::PathBuf, + str::FromStr, +}; -use anyhow::{Error, anyhow}; +use anyhow::{anyhow, Error}; +use bytes::Bytes; +use futures::{AsyncRead, AsyncReadExt, Stream, TryStream}; use sha2::{Digest, Sha512}; use crate::{ - Hash, Header, object_body::{Index, Object}, pipe, read_header_and_body + object_body::{Index, Object}, + pipe, + store::Store, + Hash, }; pub const HEADER: [u8; 4] = [b'a', b'r', b'x', b'a']; @@ -27,7 +38,7 @@ impl FromStr for Compression { "gzip" => Ok(Compression::Gzip), "deflate" => Ok(Compression::Deflate), "lzma2" => Ok(Compression::LZMA2), - _ => Err(anyhow!("Invalid Compression Type")) + _ => Err(anyhow!("Invalid Compression Type")), } } } @@ -71,21 +82,28 @@ where match self.compression { Compression::None => self.body.to_data(writer)?, Compression::Gzip => { - let mut gz_encoder = flate2::write::GzEncoder::new(writer, flate2::Compression::default()); + let mut gz_encoder = + flate2::write::GzEncoder::new(writer, flate2::Compression::default()); self.body.to_data(&mut gz_encoder)?; gz_encoder.finish()?.flush()?; - - }, + } Compression::Deflate => { - let mut gz_encoder = flate2::write::DeflateEncoder::new(writer, flate2::Compression::default()); + let mut gz_encoder = + flate2::write::DeflateEncoder::new(writer, flate2::Compression::default()); self.body.to_data(&mut gz_encoder)?; gz_encoder.finish()?.flush()?; - }, - Compression::LZMA2 => self.body.to_data(&mut lzma_rust2::Lzma2WriterMt::new( - writer, - lzma_rust2::Lzma2Options { lzma_options: Default::default(), chunk_size: NonZero::new(1024 * 64) }, - std::thread::available_parallelism().unwrap().get() as u32, - )?.auto_finish())?, + } + Compression::LZMA2 => self.body.to_data( + &mut lzma_rust2::Lzma2WriterMt::new( + writer, + lzma_rust2::Lzma2Options { + lzma_options: Default::default(), + chunk_size: NonZero::new(1024 * 64), + }, + std::thread::available_parallelism().unwrap().get() as u32, + )? + .auto_finish(), + )?, } Ok(()) @@ -116,8 +134,12 @@ where let body = match compression { Compression::None => ArchiveBody::::from_data(&mut reader)?, - Compression::Gzip => ArchiveBody::::from_data(&mut flate2::read::GzDecoder::new(&mut reader))?, - Compression::Deflate => ArchiveBody::::from_data(&mut flate2::read::DeflateDecoder::new(&mut reader))?, + Compression::Gzip => ArchiveBody::::from_data( + &mut flate2::read::GzDecoder::new(&mut reader), + )?, + Compression::Deflate => ArchiveBody::::from_data( + &mut flate2::read::DeflateDecoder::new(&mut reader), + )?, Compression::LZMA2 => ArchiveBody::::from_data({ &mut lzma_rust2::Lzma2ReaderMt::new( &mut reader, @@ -125,7 +147,7 @@ where None, std::thread::available_parallelism().unwrap().get() as u32, ) - })? + })?, }; Ok(Archive { @@ -138,6 +160,34 @@ where } } +// /// Create a new `Body` from a [`Stream`]. +// /// +// /// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html +// pub fn from_stream(stream: S) -> Self +// where +// S: TryStream + Send + 'static, +// S::Ok: Into, +// S::Error: Into, +// { +// Self::new(StreamBody { +// stream: SyncWrapper::new(stream), +// }) +// } + +impl Stream for Archive +where + T: ArchiveEntryData + Unpin, +{ + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + + } +} + pub struct ArchiveHeaderEntry { pub hash: Hash, pub index: u64, @@ -155,7 +205,6 @@ impl ArchiveEntryData for RawEntryData { self.0 } } - pub struct ReaderEntryData(T) where T: Read; @@ -193,6 +242,23 @@ impl ArchiveEntryData for FileEntryData { } } +pub struct StoreEntryData { + pub store: Store, + pub hash: Hash, +} + +impl<'a> ArchiveEntryData for StoreEntryData { + fn turn_into_vec(self) -> Vec { + let mut object = futures::executor::block_on(self.store.get_object(&self.hash)) + .expect("Object to be available in store"); + + let mut data: Vec = Vec::new(); + futures::executor::block_on(object.read_to_end(&mut data)).expect("Reading to work"); + + data + } +} + pub struct ArchiveBody where T: ArchiveEntryData, diff --git a/common/src/header.rs b/common/src/header.rs index 6954d3f..917da66 100644 --- a/common/src/header.rs +++ b/common/src/header.rs @@ -1,7 +1,7 @@ use std::{io::{Read, Write}, str::from_utf8}; use anyhow::{Result, anyhow}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use crate::ObjectType; @@ -62,7 +62,7 @@ impl Header { Self::from_data(&buffer) } - pub async fn read_from_async(reader: &mut (impl AsyncRead + std::marker::Unpin)) -> Result { + pub async fn read_from_async(reader: &mut (impl AsyncRead + AsyncSeek + std::marker::Unpin)) -> Result { let mut buffer = [0u8; 32]; let bytes_read = reader.read(&mut buffer).await?; @@ -71,7 +71,15 @@ impl Header { } let buffer = &buffer[..bytes_read]; - Self::from_buf(&buffer) + + // Find the null marker of the header. If its not available then we just gotta assume the whole buffer is a valid utf8 header + let null_position = buffer.iter().position(|x| *x == 0).unwrap_or(buffer.len()); + let buffer = &buffer[..null_position]; + + // We set the reader position to after the null byte so the body doesn't contain it + reader.seek(std::io::SeekFrom::Start(null_position as u64 + 1)).await?; + + Self::from_data(&buffer) } pub fn read_from(reader: &mut impl Read) -> Result { diff --git a/common/src/lib.rs b/common/src/lib.rs index 7c5dbf5..cdf42fb 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,10 +2,12 @@ use std::{ collections::HashMap, fs::File, io::{BufRead, BufReader, Read, Write}, path::PathBuf, str::from_utf8 }; +use futures::AsyncReadExt; + pub use crate::constants::{BLOB_KEY, INDEX_KEY, TREE_KEY}; pub use crate::hash::Hash; pub use crate::header::Header; -use crate::object_body::{Object as ObjectTrait}; +use crate::{object_body::Object as ObjectTrait, store::Store}; pub use crate::primitives::{Mode, ObjectType}; pub use crate::object::Object; @@ -52,42 +54,40 @@ pub fn read_header_from_file(reader: &mut BufReader) -> Option
{ read_header_from_slice(&vec[..vec.len() - 1]) } -pub fn read_object_into_headers( - cache: &PathBuf, +pub async fn read_object_into_headers( + store: &Store, headers: &mut HashMap, object_hash: &Hash, ) -> anyhow::Result<()> { - let object_path = object_hash.get_path(&cache); - assert!(object_path.exists()); + let mut stack = vec![object_hash.clone()]; - // We assume that if our hash already exists, We probably have already collected all children - if headers.contains_key(object_hash) { - return Ok(()); - } + while let Some(current_hash) = stack.pop() { + if headers.contains_key(¤t_hash) { + continue; + } - let file = File::open(object_path).expect("file to exist"); - let mut reader = BufReader::new(file); - let mut data = Vec::new(); - let bytes_read = reader - .read_until(0, &mut data) - .expect("File to be readable"); + let mut object = store.get_object(¤t_hash).await?; - let header = - read_header_from_slice(&data[..bytes_read - 1]).expect("File to be correctly formatted"); - assert!(header.object_type != ObjectType::Index); + if object.header.object_type == ObjectType::Index { + return Err(anyhow::anyhow!("Indexes cannot exist within a tree. Likely a hash collision 😳")); + } - headers.insert(object_hash.clone(), header.clone()); - if header.object_type == ObjectType::Blob { - return Ok(()); - } + headers.insert(current_hash.clone(), object.header.clone()); + + if object.header.object_type == ObjectType::Blob { + continue; + } - reader.read_to_end(&mut data)?; + let mut data = Vec::new(); + let bytes_read = object.read_to_end(&mut data).await?; - // println!("Reading Tree {object_hash}"); - let tree = crate::object_body::Tree::from_data(&data[bytes_read..]); + assert!(bytes_read as u64 == object.header.size, "Read size must match header size"); + + let tree = crate::object_body::Tree::from_data(&data); - for entry in &tree.contents { - read_object_into_headers(cache, headers, &entry.hash)?; + for entry in &tree.contents { + stack.push(entry.hash.clone()); + } } Ok(()) diff --git a/common/src/store.rs b/common/src/store.rs index ed2361b..1708d79 100644 --- a/common/src/store.rs +++ b/common/src/store.rs @@ -1,23 +1,130 @@ -use anyhow::Result; -use crate::{Hash, Object}; +use crate::{header, Hash, Header}; +use anyhow::{anyhow, Result}; +use futures::io::copy; +use futures::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWriteExt, FutureExt}; +use futures::{AsyncReadExt, AsyncSeekExt}; +use opendal::{Builder, FuturesAsyncReader, Operator}; +pub struct StoreObject +where + T: AsyncBufRead + AsyncRead + Unpin, +{ + pub header: Header, + body: T, +} + +impl StoreObject +where + T: AsyncBufRead + AsyncRead + Unpin, +{ + // pub async fn new(mut reader: T) -> Result + // { + // let mut buffer = [0u8; 32]; + // let bytes_read = reader.read(&mut buffer).await?; + // let data = &buffer[..bytes_read]; + + // let Some(header_end) = data.iter().position(|x| *x == 0) else { + // return Err(anyhow!( + // "Invalid header. No null byte in the first 32 bytes" + // )); + // }; + // let header = Header::from_data(&data[..header_end])?; + // reader + // .seek(std::io::SeekFrom::Start(header_end as u64)) + // .await?; + + // Ok(Self { + // header, + // body: reader, + // }) + // } -pub trait Store { - fn get_object(&self, hash: &Hash) -> Option; + pub fn new_with_header(header: Header, reader: T) -> Self { + Self { + header, + body: reader, + } + } +} - fn put_object(&mut self, hash: &Hash, object: &Object) -> Result<()>; +impl AsyncRead for StoreObject +where + T: AsyncBufRead + AsyncRead + Unpin +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let this = self.get_mut(); + std::pin::Pin::new(&mut this.body).poll_read(cx, buf) + } } -pub struct InMemoryStore { +impl AsyncBufRead for StoreObject +where + T: AsyncBufRead + AsyncRead + Unpin +{ + fn poll_fill_buf( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + std::pin::Pin::new(&mut this.body).poll_fill_buf(cx) + } + + fn consume(self: std::pin::Pin<&mut Self>, amt: usize) { + let this = self.get_mut(); + std::pin::Pin::new(&mut this.body).consume(amt); + } +} +#[derive(Clone)] +pub struct Store { + operator: Operator, } -impl Store for InMemoryStore { - fn get_object(&self, hash: &Hash) -> Option { - todo!() +impl Store { + pub fn new(operator: Operator) -> Self { + Self { operator } } - fn put_object(&mut self, hash: &Hash, object: &Object) -> Result<()> { - todo!() + pub fn from_builder(builder: impl Builder) -> Result { + Ok(Self::new(Operator::new(builder)?.finish())) + } + + pub async fn exists(&self, hash: &Hash) -> Result { + Ok(self.operator.exists(hash.as_str()).await?) } + + pub async fn get_object(&self, hash: &Hash) -> Result> { + let mut reader = self + .operator + .reader(hash.as_str()) + .await? + .into_futures_async_read(..) + .await?; + + let header = Header::read_from_async(&mut reader).await?; + + Ok(StoreObject::new_with_header(header, reader)) + } + + pub async fn put_object(&self, hash: &Hash, mut object: StoreObject) -> Result<()> + where + T: AsyncBufRead + AsyncRead + Unpin, + { + let mut writer = self + .operator + .writer(hash.as_str()) + .await? + .into_futures_async_write(); + + object.header.write_to_async(&mut writer).await?; + copy(&mut object.body, &mut writer).await?; + + writer.close().await?; + + Ok(()) + } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 833ed7a..f29ac58 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -7,9 +7,11 @@ edition = "2021" axum = { version = "0.8.4", features = ["macros"] } clap = { version = "4.5.51", features = ["derive"] } common = { path = "../common" } +futures = "0.3.31" futures-core = "0.3.31" http-body = "1.0.1" lazy_static = "1.5.0" +opendal = { version = "0.54.1", features = ["services-fs", "services-s3"] } sha2 = "0.10.9" tokio = { version = "1.45.1", features = ["full"] } tokio-stream = "0.1.17" diff --git a/server/src/main.rs b/server/src/main.rs index 84febd1..4ba31c8 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,89 +1,106 @@ use axum::{ - Router, body::{Body, BodyDataStream}, debug_handler, extract::{Path as AxumPath, Request, State}, http::{HeaderMap, HeaderValue, StatusCode}, response::Response, routing::{get, put} + body::{Body, BodyDataStream}, + debug_handler, + extract::{Path as AxumPath, Request, State}, + http::{HeaderMap, HeaderValue, Response, StatusCode}, + routing::{get, put}, + Router, }; use clap::Parser; -use common::{Hash, Header, ObjectType, archive::{Archive, ArchiveBody, ArchiveEntryData, ArchiveHeaderEntry, FileEntryData, HEADER}, object_body::{Index, Object}, read_object_into_headers}; +use common::{ + Hash, Header, ObjectType, archive::{ + Archive, ArchiveBody, ArchiveEntryData, ArchiveHeaderEntry, AsyncReaderEntryData, + FileEntryData, HEADER, StoreEntryData, + }, object_body::{Index, Object}, read_object_into_headers, store::{Store, StoreObject} +}; use lazy_static::lazy_static; +use opendal::{Builder, Operator}; use sha2::{Digest, Sha512}; -use tower_http::compression::CompressionLayer; use std::{ - collections::{HashMap, HashSet}, fs::{self, create_dir, remove_file}, io::Write, path::{Path, PathBuf}, str::from_utf8, sync::RwLock + collections::{HashMap, HashSet}, + fs::{self, create_dir, remove_file}, + io::Write, + path::{Path, PathBuf}, + str::from_utf8, + sync::RwLock, }; -use tokio::{fs::File, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter}}; -use tokio_stream::StreamExt; -use tokio_util::io::ReaderStream; - -lazy_static! { - static ref INDEXES: RwLock> = Default::default(); - static ref TREES: RwLock> = Default::default(); - static ref BLOBS: RwLock> = Default::default(); -} +use tower_http::compression::CompressionLayer; -async fn read_cache>(path: P) { - let mut indexes = INDEXES.try_write().unwrap(); - let mut trees = TREES.try_write().unwrap(); - let mut blobs = BLOBS.try_write().unwrap(); +use futures::{AsyncReadExt, AsyncSeekExt, StreamExt, TryStreamExt, future::join_all}; - let mut total_size: u128 = 0; +// lazy_static! { +// static ref INDEXES: RwLock> = Default::default(); +// static ref TREES: RwLock> = Default::default(); +// static ref BLOBS: RwLock> = Default::default(); +// } - for entry in fs::read_dir(path).unwrap().filter_map(|x| x.ok()) { - let Ok(metadata) = entry.metadata() else { - continue; - }; +// async fn read_cache(store: &Store) { +// let mut indexes = INDEXES.try_write().unwrap(); +// let mut trees = TREES.try_write().unwrap(); +// let mut blobs = BLOBS.try_write().unwrap(); - if metadata.is_file() { - continue; - } +// store. - let prefix = entry.file_name(); +// let mut total_size: u128 = 0; - for entry in fs::read_dir(entry.path()).unwrap().filter_map(|x| x.ok()) { - let Ok(metadata) = entry.metadata() else { - continue; - }; +// for entry in fs::read_dir(path).unwrap().filter_map(|x| x.ok()) { +// let Ok(metadata) = entry.metadata() else { +// continue; +// }; - if !metadata.is_file() { - continue; - } +// if metadata.is_file() { +// continue; +// } - let name = format!( - "{}{}", - prefix.to_string_lossy(), - entry.file_name().to_string_lossy() - ); - let hash = Hash::try_from(name).expect("Valid Hash"); - - let Ok(file) = File::open(entry.path()).await else { - continue; - }; - let mut reader = BufReader::new(file); - - let Ok(Header { object_type, size }) = Header::read_from_async(&mut reader).await else { - panic!("Corrupt file {:?}", entry.path()); - }; - - total_size += size as u128; - - match object_type { - common::ObjectType::Blob => &mut blobs, - common::ObjectType::Tree => &mut trees, - common::ObjectType::Index => &mut indexes, - } - .insert(hash); - } - } +// let prefix = entry.file_name(); - println!("Loaded {} blobs", blobs.len()); - println!("Loaded {} trees", trees.len()); - println!("Loaded {} indexes", indexes.len()); - println!("Total Size: {} bytes", total_size); +// for entry in fs::read_dir(entry.path()).unwrap().filter_map(|x| x.ok()) { +// let Ok(metadata) = entry.metadata() else { +// continue; +// }; - indexes.iter().for_each(|i| println!("Index {i}")); -} +// if !metadata.is_file() { +// continue; +// } + +// let name = format!( +// "{}{}", +// prefix.to_string_lossy(), +// entry.file_name().to_string_lossy() +// ); +// let hash = Hash::try_from(name).expect("Valid Hash"); + +// let Ok(file) = File::open(entry.path()).await else { +// continue; +// }; +// let mut reader = BufReader::new(file); + +// let Ok(Header { object_type, size }) = Header::read_from_async(&mut reader).await else { +// panic!("Corrupt file {:?}", entry.path()); +// }; + +// total_size += size as u128; + +// match object_type { +// common::ObjectType::Blob => &mut blobs, +// common::ObjectType::Tree => &mut trees, +// common::ObjectType::Index => &mut indexes, +// } +// .insert(hash); +// } +// } + +// println!("Loaded {} blobs", blobs.len()); +// println!("Loaded {} trees", trees.len()); +// println!("Loaded {} indexes", indexes.len()); +// println!("Total Size: {} bytes", total_size); + +// indexes.iter().for_each(|i| println!("Index {i}")); +// } #[derive(Clone)] struct ServerState { - cache_path: PathBuf, + store: Store, } enum ErrorResult { @@ -114,76 +131,70 @@ impl From for ErrorResult { } } -async fn write_body_to_file( - path: &PathBuf, - hash: &Hash, - object_type: ObjectType, - object_size: u64, - mut body: BodyDataStream, -) -> Result<(), ErrorResult> { - assert!( - !path.exists(), - "Race condition. Someone else has created this file before us" - ); - - let header = Header::new(object_type, object_size); - - let file = File::create(path).await?; - let mut writer = BufWriter::new(file); - let mut hasher = Sha512::new(); - - header.write_to(&mut hasher)?; - header.write_to_async(&mut writer).await?; - - let mut length: u64 = 0; - - while let Some(chunk) = body.next().await { - let chunk = match chunk { - Ok(v) => v, - Err(err) => return Err(ErrorResult::InternalError(err.to_string())), - }; - - hasher.write_all(&chunk)?; - writer.write_all(&chunk).await?; - length += chunk.len() as u64; - } - - if object_size != length { - writer.flush().await?; - remove_file(path)?; - return Err(ErrorResult::LengthDoesntMatch); - } - - let new_hash = Hash::from(hasher); - - // the hashes don't match - if *hash != new_hash { - writer.flush().await?; - remove_file(path)?; - return Err(ErrorResult::HashDoesntMatch); - } - - Ok(()) -} +// async fn write_body_to_file( +// path: &PathBuf, +// hash: &Hash, +// object_type: ObjectType, +// object_size: u64, +// mut body: BodyDataStream, +// ) -> Result<(), ErrorResult> { +// assert!( +// !path.exists(), +// "Race condition. Someone else has created this file before us" +// ); + +// let header = Header::new(object_type, object_size); + +// let file = File::create(path).await?; +// let mut writer = BufWriter::new(file); +// let mut hasher = Sha512::new(); + +// header.write_to(&mut hasher)?; +// header.write_to_async(&mut writer).await?; + +// let mut length: u64 = 0; + +// while let Some(chunk) = body.next().await { +// let chunk = match chunk { +// Ok(v) => v, +// Err(err) => return Err(ErrorResult::InternalError(err.to_string())), +// }; + +// hasher.write_all(&chunk)?; +// writer.write_all(&chunk).await?; +// length += chunk.len() as u64; +// } + +// if object_size != length { +// writer.flush().await?; +// remove_file(path)?; +// return Err(ErrorResult::LengthDoesntMatch); +// } + +// let new_hash = Hash::from(hasher); + +// // the hashes don't match +// if *hash != new_hash { +// writer.flush().await?; +// remove_file(path)?; +// return Err(ErrorResult::HashDoesntMatch); +// } + +// Ok(()) +// } #[debug_handler] async fn put_object( AxumPath(object_hash): AxumPath, - State(state): State, + State(ServerState { store }): State, headers: HeaderMap, request: Request, ) -> Result { - let object_path = object_hash.get_path(&state.cache_path); - - if object_path.exists() { - return Err((StatusCode::OK, "Object already exists".into())); - } - - let Some(parent) = object_path.parent() else { - return Err((StatusCode::INTERNAL_SERVER_ERROR, "Impossible state".into())); - }; - - create_dir(parent).map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + match store.exists(&object_hash).await { + Ok(false) => Ok(()), + Ok(true) => Err((StatusCode::OK, "Object already exists".into())), + Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())), + }?; let Some(object_type) = headers.get("Object-Type").and_then(|v| v.to_str().ok()) else { return Err((StatusCode::BAD_REQUEST, "Missing Object-Type Header".into())); @@ -200,14 +211,22 @@ async fn put_object( let Some(object_size): Option = object_size.parse().ok() else { return Err((StatusCode::BAD_REQUEST, "Invalid Object-Size Header".into())); }; - + let header = Header::new(object_type, object_size); let data_stream = request.into_body().into_data_stream(); - if let Err(err) = - write_body_to_file(&object_path, &object_hash, object_type, object_size, data_stream).await - { - return Err(err.get_response()); - } + let buffered_reader = data_stream.map(|result| { + result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + }).into_async_read(); + + let store_object = StoreObject::new_with_header(header, buffered_reader); + + store + .put_object( + &object_hash, + store_object, + ) + .await + .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; Ok(StatusCode::CREATED) } @@ -215,27 +234,43 @@ async fn put_object( #[debug_handler] async fn get_object( AxumPath(object_hash): AxumPath, - State(state): State, + State(ServerState { store }): State, ) -> Result, (StatusCode, String)> { - let object_path = object_hash.get_path(&state.cache_path); - - if !object_path.exists() { - return Err(( - StatusCode::NO_CONTENT, - "No object with this hash exists".into(), - )); - } - - let file = tokio::fs::File::open(object_path).await.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; - - let mut reader = BufReader::new(file); - let Header { object_type, size } = Header::read_from_async(&mut reader).await.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?; - - reader.rewind().await.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, format!("Unable to read file: \"{err}\"")))?; - - let reader = ReaderStream::new(reader); - let s = Body::from_stream(reader); - let mut response = Response::new(s); + match store.exists(&object_hash).await { + Ok(true) => Ok(()), + Ok(false) => Err((StatusCode::NO_CONTENT, "no object".into())), + Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())), + }?; + + // let file = tokio::fs::File::open(object_path).await.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; + + // let mut reader = BufReader::new(file); + // let Header { object_type, size } = Header::read_from_async(&mut reader).await.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?; + + // reader.rewind().await.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, format!("Unable to read file: \"{err}\"")))?; + + // let reader = ReaderStream::new(reader); + // let s = Body::from_stream(reader); + // let mut response = Response::new(s); + + let object = store + .get_object(&object_hash) + .await + .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; + let Header { object_type, size } = object.header.clone(); + + let reader_stream = futures::stream::unfold(object, |mut reader| async move { + let mut buffer = vec![0u8; 8192]; + match reader.read(&mut buffer).await { + Ok(0) => None, + Ok(n) => { + buffer.truncate(n); + Some((Ok(buffer), reader)) + } + Err(e) => Some((Err(e), reader)), + } + }); + let mut response = Response::new(Body::from_stream(reader_stream)); let headers = response.headers_mut(); headers.insert("Object-Type", object_type.to_str().parse().unwrap()); @@ -247,39 +282,40 @@ async fn get_object( #[debug_handler] async fn get_bundle( AxumPath(index_hash): AxumPath, - State(ServerState { cache_path }): State, + State(ServerState { store }): State, ) -> Result, (StatusCode, String)> { - let object_path = index_hash.get_path(&cache_path); - - if !object_path.exists() { - return Err(( - StatusCode::NO_CONTENT, - "No object with this hash exists".into(), - )); - } - - let file = tokio::fs::File::open(object_path).await.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; - - let mut reader = BufReader::new(file); - let header = Header::read_from_async(&mut reader).await.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?; - - if header.object_type != ObjectType::Index { + match store.exists(&index_hash).await { + Ok(true) => Ok(()), + Ok(false) => Err((StatusCode::NO_CONTENT, "no object".into())), + Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())), + }?; + + let mut object = store + .get_object(&index_hash) + .await + .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; + + if object.header.object_type != ObjectType::Index { return Err(( StatusCode::BAD_REQUEST, - "Object requested is not an index".into() - )) + "Object requested is not an index".into(), + )); } - reader.seek(std::io::SeekFrom::Start((header.to_string().len() ) as u64)).await.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; - let mut index_data = Vec::new(); - reader.read_to_end(&mut index_data).await.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; + + object + .read_to_end(&mut index_data) + .await + .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; let index = Index::from_data(&index_data); let mut headers = HashMap::new(); - read_object_into_headers(&cache_path, &mut headers, &index.tree); + println!("Reading objects for index {}", index_hash); + read_object_into_headers(&store, &mut headers, &index.tree).await.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; + println!("Finished reading {} objects from index", headers.len()); //TODO: Surely there is an algorithm to more efficiently lay out this data let mut i = 0; @@ -298,7 +334,6 @@ async fn get_bundle( i += total_length; } - let archive = Archive { header: HEADER, compression: common::archive::Compression::None, @@ -306,18 +341,29 @@ async fn get_bundle( index: index, body: ArchiveBody { header: header_entries, - entries: headers.into_iter().map(|(hash, header)| FileEntryData(hash.get_path(&cache_path))).collect() - } + entries: headers + .iter() + .map(|(hash, header)| StoreEntryData { + store: store.clone(), + hash: hash.clone(), + }) + .collect(), + }, }; - let mut body = Vec::new(); - - archive.to_data(&mut body).map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?; - - let mut response = Response::new(body.into()); + let mut response = Response::new(Body::from_stream(archive)); let headers = response.headers_mut(); - headers.insert("Content-Type", HeaderValue::from_str("application/arc").unwrap()); - headers.insert("Content-Disposition", HeaderValue::from_str(&format!("Content-Disposition: attachment; filename=\"{index_hash}.ar\"")).unwrap()); + headers.insert( + "Content-Type", + HeaderValue::from_str("application/arc").unwrap(), + ); + headers.insert( + "Content-Disposition", + HeaderValue::from_str(&format!( + "Content-Disposition: attachment; filename=\"{index_hash}.ar\"" + )) + .unwrap(), + ); return Ok(response); } @@ -337,14 +383,15 @@ pub struct Cli { #[tokio::main] async fn main() { - - let Cli { store, port, ..} = Cli::parse(); + let Cli { store, port, .. } = Cli::parse(); if !store.exists() { create_dir(&store).expect("Cache directory to exist"); } - read_cache(&store).await; + let store = opendal::services::Fs::default().root(store.to_str().expect("valid path")); + + // read_cache(&store).await; let comression_layer: CompressionLayer = CompressionLayer::new() .br(true) @@ -358,12 +405,14 @@ async fn main() { .route("/object/{object_id}", get(get_object)) .route("/bundle/{index_id}", get(get_bundle)) .with_state(ServerState { - cache_path: store, + store: Store::from_builder(store).expect("S"), }) .layer(comression_layer) .route("/", get(|| async { "Hello, World!" })); - let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)).await.unwrap(); + let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)) + .await + .unwrap(); println!("Listening at http://{}", listener.local_addr().unwrap()); axum::serve(listener, app).await.unwrap();