diff --git a/Cargo.lock b/Cargo.lock index 6a0c242a..112ba837 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,9 +10,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "0.7.20" +version = "0.7.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e" dependencies = [ "memchr", ] @@ -28,9 +28,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.68" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" +checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "assert-json-diff" @@ -44,9 +44,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.3.15" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" +checksum = "345fd392ab01f746c717b1357165b76f0b67a60192007b234058c9045fdcf695" dependencies = [ "flate2", "futures-core", @@ -74,9 +74,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "base64" -version = "0.13.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "bitflags" @@ -98,21 +98,21 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.11.1" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" [[package]] name = "bytes" -version = "1.3.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" +checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" [[package]] name = "cc" -version = "1.0.78" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" [[package]] name = "cfg-if" @@ -122,16 +122,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.23" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" +checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" dependencies = [ "iana-time-zone", "js-sys", "num-integer", "num-traits", "serde", - "time 0.1.45", + "time 0.1.44", "wasm-bindgen", "winapi", ] @@ -147,16 +147,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "codespan-reporting" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" -dependencies = [ - "termcolor", - "unicode-width", -] - [[package]] name = "colored" version = "2.0.0" @@ -170,15 +160,16 @@ dependencies = [ [[package]] name = "console" -version = "0.15.3" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5556015fe3aad8b968e5d4124980fbe2f6aaee7aeec6b749de1faaa2ca5d0a4c" +checksum = "89eab4d20ce20cea182308bca13088fecea9c05f6776cf287205d41a0ed3c847" dependencies = [ "encode_unicode 0.3.6", - "lazy_static", "libc", + "once_cell", + "terminal_size", "unicode-width", - "windows-sys 0.42.0", + "winapi", ] [[package]] @@ -230,54 +221,10 @@ dependencies = [ [[package]] name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn", -] - -[[package]] -name = "cxx" -version = "1.0.85" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5add3fc1717409d029b20c5b6903fc0c0b02fa6741d820054f4a2efa5e5816fd" +checksum = "cdffe87e1d521a10f9696f833fe502293ea446d7f256c06128293a4119bdf4cb" dependencies = [ - "cc", - "cxxbridge-flags", - "cxxbridge-macro", - "link-cplusplus", -] - -[[package]] -name = "cxx-build" -version = "1.0.85" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c87959ba14bc6fbc61df77c3fcfe180fc32b93538c4f1031dd802ccb5f2ff0" -dependencies = [ - "cc", - "codespan-reporting", - "once_cell", - "proc-macro2", - "quote", - "scratch", - "syn", -] - -[[package]] -name = "cxxbridge-flags" -version = "1.0.85" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69a3e162fde4e594ed2b07d0f83c6c67b745e7f28ce58c6df5e6b6bef99dfb59" - -[[package]] -name = "cxxbridge-macro" -version = "1.0.85" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e7e2adeb6a0d4a282e581096b06e1791532b7d576dcde5ccd9382acf55db8e6" -dependencies = [ - "proc-macro2", "quote", "syn", ] @@ -430,9 +377,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.25" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" dependencies = [ "crc32fast", "miniz_oxide", @@ -468,6 +415,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.25" @@ -475,6 +437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -483,12 +446,34 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +[[package]] +name = "futures-executor" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +[[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.25" @@ -507,8 +492,11 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -518,9 +506,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.8" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ "cfg-if", "libc", @@ -529,9 +517,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.15" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" +checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be" dependencies = [ "bytes", "fnv", @@ -593,7 +581,7 @@ checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ "bytes", "fnv", - "itoa 1.0.5", + "itoa 1.0.3", ] [[package]] @@ -627,9 +615,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.23" +version = "0.14.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" +checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" dependencies = [ "bytes", "futures-channel", @@ -640,7 +628,7 @@ dependencies = [ "http-body", "httparse", "httpdate", - "itoa 1.0.5", + "itoa 1.0.3", "pin-project-lite", "socket2", "tokio", @@ -664,28 +652,17 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.53" +version = "0.1.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64c122667b287044802d6ce17ee2ddf13207ed924c712de9a66a5814d5b64765" +checksum = "3bbaead50122b06e9a973ac20bc7445074d99ad9a0a0654934876908a9cec82c" dependencies = [ "android_system_properties", "core-foundation-sys", - "iana-time-zone-haiku", "js-sys", "wasm-bindgen", "winapi", ] -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" -dependencies = [ - "cxx", - "cxx-build", -] - [[package]] name = "ident_case" version = "1.0.1" @@ -704,9 +681,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.9.2" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", "hashbrown", @@ -715,9 +692,9 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.17.2" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4295cbb7573c16d310e99e713cf9e75101eb190ab31fccd35f2d2691b4352b19" +checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729" dependencies = [ "console", "number_prefix", @@ -736,9 +713,9 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" +checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" dependencies = [ "libc", "windows-sys 0.42.0", @@ -746,9 +723,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.7.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b0d96e660696543b251e58030cf9787df56da39dab19ad60eae7353040917e" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" [[package]] name = "is-terminal" @@ -770,9 +747,9 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] name = "itoa" -version = "1.0.5" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" +checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" [[package]] name = "js-sys" @@ -791,18 +768,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.139" +version = "0.2.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" - -[[package]] -name = "link-cplusplus" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5" -dependencies = [ - "cc", -] +checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966" [[package]] name = "linux-raw-sys" @@ -849,30 +817,30 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.6.2" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +checksum = "96590ba8f175222643a85693f33d26e9c8a015f599c216509b1a6894af675d34" dependencies = [ "adler", ] [[package]] name = "mio" -version = "0.8.5" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" +checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" dependencies = [ "libc", "log", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys 0.42.0", + "windows-sys 0.36.1", ] [[package]] name = "mockito" -version = "0.31.1" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80f9fece9bd97ab74339fe19f4bcaf52b76dcc18e5364c7977c1838f76b38de9" +checksum = "401edc088069634afaa5f4a29617b36dba683c0c16fe4435a86debad23fa2f1a" dependencies = [ "assert-json-diff", "colored", @@ -888,9 +856,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.11" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" dependencies = [ "lazy_static", "libc", @@ -924,12 +892,11 @@ dependencies = [ ] [[package]] -name = "num_cpus" -version = "1.15.0" +name = "num_threads" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" dependencies = [ - "hermit-abi 0.2.6", "libc", ] @@ -947,9 +914,9 @@ checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" [[package]] name = "openssl" -version = "0.10.45" +version = "0.10.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1" +checksum = "618febf65336490dfcf20b73f885f5651a0c89c64c2d4a8c3662585a70bf5bd0" dependencies = [ "bitflags", "cfg-if", @@ -979,18 +946,18 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "111.24.0+1.1.1s" +version = "111.22.0+1.1.1q" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3498f259dab01178c6228c6b00dcef0ed2a2d5e20d648c017861227773ea4abd" +checksum = "8f31f0d509d1c1ae9cada2f9539ff8f37933831fd5098879e482aa687d659853" dependencies = [ "cc", ] [[package]] name = "openssl-sys" -version = "0.9.80" +version = "0.9.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23bbbf7854cd45b83958ebe919f0e8e516793727652e27fda10a8384cfc790b7" +checksum = "e5f9bd0c2710541a3cda73d6f9ac4f1b240de4ae261065d309dbe73d9dceb42f" dependencies = [ "autocfg", "cc", @@ -1002,9 +969,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "3.4.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d84eb1409416d254e4a9c8fa56cc24701755025b458f0fcd8e59e1f5f40c23bf" +checksum = "1f74e330193f90ec45e2b257fa3ef6df087784157ac1ad2c1e71c62837b03aa7" dependencies = [ "num-traits", "serde", @@ -1039,9 +1006,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.26" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" [[package]] name = "portable-atomic" @@ -1051,9 +1018,9 @@ checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "pretty_assertions" @@ -1107,18 +1074,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.49" +version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.23" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" dependencies = [ "proc-macro2", ] @@ -1175,9 +1142,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a" +checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" dependencies = [ "aho-corasick", "memchr", @@ -1192,9 +1159,9 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" [[package]] name = "regex-syntax" -version = "0.6.28" +version = "0.6.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" +checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" [[package]] name = "reinfer-cli" @@ -1205,6 +1172,7 @@ dependencies = [ "colored", "dirs", "env_logger", + "futures", "indicatif", "log", "maplit", @@ -1217,6 +1185,7 @@ dependencies = [ "serde", "serde_json", "structopt", + "tokio", "url", "uuid", ] @@ -1226,6 +1195,7 @@ name = "reinfer-client" version = "0.12.1" dependencies = [ "chrono", + "futures", "log", "mockito", "once_cell", @@ -1236,6 +1206,7 @@ dependencies = [ "serde_json", "serde_with", "thiserror", + "tokio", "url", ] @@ -1250,9 +1221,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.13" +version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c" +checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" dependencies = [ "async-compression", "base64", @@ -1290,9 +1261,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.36.6" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549" +checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03" dependencies = [ "bitflags", "errno", @@ -1304,15 +1275,15 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.11" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" +checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" [[package]] name = "ryu" -version = "1.0.12" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" +checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" [[package]] name = "schannel" @@ -1324,12 +1295,6 @@ dependencies = [ "windows-sys 0.36.1", ] -[[package]] -name = "scratch" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" - [[package]] name = "security-framework" version = "2.7.0" @@ -1355,18 +1320,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.152" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.152" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" dependencies = [ "proc-macro2", "quote", @@ -1375,11 +1340,11 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.91" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" +checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" dependencies = [ - "itoa 1.0.5", + "itoa 1.0.3", "ryu", "serde", ] @@ -1391,16 +1356,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa 1.0.5", + "itoa 1.0.3", "ryu", "serde", ] [[package]] name = "serde_with" -version = "2.1.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25bf4a5a814902cd1014dbccfa4d4560fb8432c779471e96e035602519f82eef" +checksum = "368f2d60d049ea019a84dcd6687b0d1e0030fe663ae105039bdf967ed5e6a9a7" dependencies = [ "base64", "chrono", @@ -1409,14 +1374,14 @@ dependencies = [ "serde", "serde_json", "serde_with_macros", - "time 0.3.17", + "time 0.3.16", ] [[package]] name = "serde_with_macros" -version = "2.1.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3452b4c0f6c1e357f73fdb87cd1efabaa12acf328c7a528e252893baeb3f4aa" +checksum = "1ccadfacf6cf10faad22bbadf55986bdd0856edfb5d9210aa1dcf1f516e84e93" dependencies = [ "darling", "proc-macro2", @@ -1426,9 +1391,9 @@ dependencies = [ [[package]] name = "similar" -version = "2.2.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" +checksum = "62ac7f900db32bf3fd12e0117dd3dc4da74bc52ebaac97f39668446d89694803" [[package]] name = "slab" @@ -1481,9 +1446,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.107" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" +checksum = "52205623b1b0f064a4e71182c3b18ae902267282930c6d5462c91b859668426e" dependencies = [ "proc-macro2", "quote", @@ -1524,6 +1489,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "terminal_size" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -1535,18 +1510,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.38" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.38" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" dependencies = [ "proc-macro2", "quote", @@ -1555,9 +1530,9 @@ dependencies = [ [[package]] name = "time" -version = "0.1.45" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", "wasi 0.10.0+wasi-snapshot-preview1", @@ -1566,11 +1541,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.17" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" +checksum = "0fab5c8b9980850e06d92ddbe3ab839c062c801f3927c0fb8abd6fc8e918fbca" dependencies = [ - "itoa 1.0.5", + "itoa 1.0.3", + "libc", + "num_threads", "serde", "time-core", "time-macros", @@ -1584,9 +1561,9 @@ checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" [[package]] name = "time-macros" -version = "0.2.6" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" +checksum = "65bb801831d812c562ae7d2bfb531f26e66e4e1f6b17307ba4149c5064710e5b" dependencies = [ "time-core", ] @@ -1608,19 +1585,30 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.23.0" +version = "1.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" +checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" dependencies = [ "autocfg", "bytes", "libc", "memchr", "mio", - "num_cpus", "pin-project-lite", "socket2", - "windows-sys 0.42.0", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1655,9 +1643,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" dependencies = [ "cfg-if", "pin-project-lite", @@ -1666,9 +1654,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.30" +version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" dependencies = [ "once_cell", ] @@ -1696,9 +1684,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" [[package]] name = "unicode-ident" -version = "1.0.6" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" +checksum = "dcc811dc4066ac62f84f11307873c4850cb653bfa9b1719cee2bd2204a4bc5dd" [[package]] name = "unicode-normalization" @@ -1735,9 +1723,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.2.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" +checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" dependencies = [ "getrandom", ] @@ -1903,19 +1891,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ "windows_aarch64_gnullvm", - "windows_aarch64_msvc 0.42.0", - "windows_i686_gnu 0.42.0", - "windows_i686_msvc 0.42.0", - "windows_x86_64_gnu 0.42.0", + "windows_aarch64_msvc 0.42.1", + "windows_i686_gnu 0.42.1", + "windows_i686_msvc 0.42.1", + "windows_x86_64_gnu 0.42.1", "windows_x86_64_gnullvm", - "windows_x86_64_msvc 0.42.0", + "windows_x86_64_msvc 0.42.1", ] [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" +checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" [[package]] name = "windows_aarch64_msvc" @@ -1925,9 +1913,9 @@ checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" [[package]] name = "windows_aarch64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" +checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" [[package]] name = "windows_i686_gnu" @@ -1937,9 +1925,9 @@ checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" [[package]] name = "windows_i686_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" +checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" [[package]] name = "windows_i686_msvc" @@ -1949,9 +1937,9 @@ checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" [[package]] name = "windows_i686_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" +checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" [[package]] name = "windows_x86_64_gnu" @@ -1961,15 +1949,15 @@ checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" [[package]] name = "windows_x86_64_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" +checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" +checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" [[package]] name = "windows_x86_64_msvc" @@ -1979,9 +1967,9 @@ checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" [[package]] name = "windows_x86_64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" +checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" [[package]] name = "winreg" diff --git a/api/Cargo.toml b/api/Cargo.toml index 750875f9..d148861a 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -11,18 +11,25 @@ edition = "2021" [lib] name = "reinfer_client" +[features] +default = ["native"] +native = ["tokio"] + [dependencies] chrono = { version = "0.4.22", features = ["serde"] } +futures = "0.3.25" log = "0.4.17" once_cell = "1.16.0" ordered-float = { version = "3.3.0", features = ["serde"] } regex = "1.6.0" -reqwest = { version = "0.11.12", default-features = false, features = ["blocking", "gzip", "json", "multipart", "native-tls-vendored"] } +reqwest = { version = "0.11.12", default-features = false, features = ["gzip", "json", "multipart", "native-tls-vendored"] } serde = { version = "1.0.147", features = ["derive"] } serde_json = "1.0.87" serde_with = "2.0.1" thiserror = "1.0.37" +tokio = { version = "1.21.2", default-features = false, features = ["fs"], optional = true } url = "2.3.1" [dev-dependencies] mockito = "0.31.0" +tokio = { version = "1.21.2", default-features = false, features = ["rt", "macros"] } diff --git a/api/src/lib.rs b/api/src/lib.rs index 0c8708bb..1a2baee7 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -4,17 +4,16 @@ pub mod resources; pub mod retry; use chrono::{DateTime, Utc}; -use log::debug; +use futures::Stream; use once_cell::sync::Lazy; use reqwest::{ - blocking::{multipart::Form, Client as HttpClient, Response as HttpResponse}, header::{self, HeaderMap, HeaderValue}, - IntoUrl, Proxy, Result as ReqwestResult, + Client as HttpClient, IntoUrl, Response as HttpResponse, Result as ReqwestResult, }; use resources::project::ForceDeleteProject; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{cell::Cell, fmt::Display, path::Path}; +use std::{cell::Cell, fmt::Display, future::Future}; use url::Url; use crate::resources::{ @@ -104,13 +103,14 @@ pub use crate::{ }, }; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Token(pub String); pub struct Config { pub endpoint: Url, pub token: Token, pub accept_invalid_certificates: bool, + #[cfg(feature = "native")] pub proxy: Option, /// Retry settings to use, if any. This will apply to all requests except for POST requests /// which are not idempotent (as they cannot be naively retried). @@ -123,6 +123,7 @@ impl Default for Config { endpoint: DEFAULT_ENDPOINT.clone(), token: Token("".to_owned()), accept_invalid_certificates: false, + #[cfg(feature = "native")] proxy: None, retry_config: None, } @@ -163,7 +164,22 @@ pub struct GetCommentsIterPageQuery<'a> { impl Client { /// Create a new API client. pub fn new(config: Config) -> Result { - let http_client = build_http_client(&config)?; + let http_client = { + let builder = HttpClient::builder(); + #[cfg(feature = "native")] + let builder = { + let mut builder = builder + .gzip(true) + .danger_accept_invalid_certs(config.accept_invalid_certificates); + if let Some(proxy) = config.proxy.clone() { + builder = + builder.proxy(reqwest::Proxy::all(proxy).map_err(Error::BuildHttpClient)?); + } + builder + }; + builder.build().map_err(Error::BuildHttpClient)? + }; + let headers = build_headers(&config)?; let endpoints = Endpoints::new(config.endpoint)?; let retrier = config.retry_config.map(Retrier::new); @@ -176,38 +192,42 @@ impl Client { } /// List all visible sources. - pub fn get_sources(&self) -> Result> { + pub async fn get_sources(&self) -> Result> { Ok(self - .get::<_, GetAvailableSourcesResponse>(self.endpoints.sources.clone())? + .get::<_, GetAvailableSourcesResponse>(self.endpoints.sources.clone()) + .await? .sources) } /// Get a source by either id or name. - pub fn get_user(&self, user: impl Into) -> Result { + pub async fn get_user(&self, user: impl Into) -> Result { Ok(match user.into() { UserIdentifier::Id(user_id) => { - self.get::<_, GetUserResponse>(self.endpoints.user_by_id(&user_id)?)? + self.get::<_, GetUserResponse>(self.endpoints.user_by_id(&user_id)?) + .await? .user } }) } /// Get a source by either id or name. - pub fn get_source(&self, source: impl Into) -> Result { + pub async fn get_source(&self, source: impl Into) -> Result { Ok(match source.into() { SourceIdentifier::Id(source_id) => { - self.get::<_, GetSourceResponse>(self.endpoints.source_by_id(&source_id)?)? + self.get::<_, GetSourceResponse>(self.endpoints.source_by_id(&source_id)?) + .await? .source } SourceIdentifier::FullName(source_name) => { - self.get::<_, GetSourceResponse>(self.endpoints.source_by_name(&source_name)?)? + self.get::<_, GetSourceResponse>(self.endpoints.source_by_name(&source_name)?) + .await? .source } }) } /// Create a new source. - pub fn create_source( + pub async fn create_source( &self, source_name: &SourceFullName, options: NewSource<'_>, @@ -216,12 +236,13 @@ impl Client { .put::<_, _, CreateSourceResponse>( self.endpoints.source_by_name(source_name)?, CreateSourceRequest { source: options }, - )? + ) + .await? .source) } /// Update a source. - pub fn update_source( + pub async fn update_source( &self, source_name: &SourceFullName, options: UpdateSource<'_>, @@ -231,43 +252,45 @@ impl Client { self.endpoints.source_by_name(source_name)?, UpdateSourceRequest { source: options }, Retry::Yes, - )? + ) + .await? .source) } /// Delete a source. - pub fn delete_source(&self, source: impl Into) -> Result<()> { + pub async fn delete_source(&self, source: impl Into) -> Result<()> { let source_id = match source.into() { SourceIdentifier::Id(source_id) => source_id, - source @ SourceIdentifier::FullName(_) => self.get_source(source)?.id, + source @ SourceIdentifier::FullName(_) => self.get_source(source).await?.id, }; - self.delete(self.endpoints.source_by_id(&source_id)?) + self.delete(self.endpoints.source_by_id(&source_id)?).await } /// Delete a user. - pub fn delete_user(&self, user: impl Into) -> Result<()> { + pub async fn delete_user(&self, user: impl Into) -> Result<()> { let UserIdentifier::Id(user_id) = user.into(); - self.delete(self.endpoints.user_by_id(&user_id)?) + self.delete(self.endpoints.user_by_id(&user_id)?).await } /// Delete comments by id in a source. - pub fn delete_comments( + pub async fn delete_comments( &self, source: impl Into, comments: &[CommentId], ) -> Result<()> { let source_full_name = match source.into() { - source @ SourceIdentifier::Id(_) => self.get_source(source)?.full_name(), + source @ SourceIdentifier::Id(_) => self.get_source(source).await?.full_name(), SourceIdentifier::FullName(source_full_name) => source_full_name, }; self.delete_query( self.endpoints.comments_v1(&source_full_name)?, Some(&id_list_query(comments.iter().map(|uid| &uid.0))), ) + .await } /// Get a page of comments from a source. - pub fn get_comments_iter_page( + pub async fn get_comments_iter_page( &self, source_name: &SourceFullName, continuation: Option<&ContinuationKind>, @@ -294,30 +317,67 @@ impl Client { limit, }; self.get_query(self.endpoints.comments(source_name)?, Some(&query_params)) + .await } /// Iterate through all comments in a source. - pub fn get_comments_iter<'a>( + pub fn get_comments<'a>( &'a self, source_name: &'a SourceFullName, page_size: Option, timerange: CommentsIterTimerange, - ) -> CommentsIter<'a> { - CommentsIter::new(self, source_name, page_size, timerange) + ) -> impl Stream>> + 'a { + // Default number of comments per page to request from API. + const DEFAULT_PAGE_SIZE: usize = 64; + + struct CommentCursor { + continuation: Option, + page_size: usize, + to_timestamp: Option>, + done: bool, + } + let cursor = CommentCursor { + continuation: timerange.from.map(ContinuationKind::Timestamp), + page_size: page_size.unwrap_or(DEFAULT_PAGE_SIZE), + to_timestamp: timerange.to, + done: false, + }; + futures::stream::unfold(cursor, |mut cursor| async { + if cursor.done { + return None; + } + + let comments_page = self + .get_comments_iter_page( + source_name, + cursor.continuation.as_ref(), + cursor.to_timestamp, + cursor.page_size, + ) + .await + .map(|page| { + cursor.continuation = page.continuation.map(ContinuationKind::Continuation); + cursor.done = cursor.continuation.is_none(); + page.comments + }); + + Some((comments_page, cursor)) + }) } /// Get a single comment by id. - pub fn get_comment<'a>( + pub async fn get_comment<'a>( &'a self, source_name: &'a SourceFullName, comment_id: &'a CommentId, ) -> Result { Ok(self - .get::<_, GetCommentResponse>(self.endpoints.comment_by_id(source_name, comment_id)?)? + .get::<_, GetCommentResponse>(self.endpoints.comment_by_id(source_name, comment_id)?) + .await? .comment) } - pub fn put_comments( + pub async fn put_comments( &self, source_name: &SourceFullName, comments: &[NewComment], @@ -326,9 +386,10 @@ impl Client { self.endpoints.comments(source_name)?, PutCommentsRequest { comments }, ) + .await } - pub fn sync_comments( + pub async fn sync_comments( &self, source_name: &SourceFullName, comments: &[NewComment], @@ -338,9 +399,10 @@ impl Client { SyncCommentsRequest { comments }, Retry::Yes, ) + .await } - pub fn put_emails( + pub async fn put_emails( &self, bucket_name: &BucketFullName, emails: &[NewEmail], @@ -349,34 +411,53 @@ impl Client { self.endpoints.put_emails(bucket_name)?, PutEmailsRequest { emails }, ) + .await } - pub fn post_user(&self, user_id: &UserId, user: UpdateUser) -> Result { + pub async fn post_user(&self, user_id: &UserId, user: UpdateUser) -> Result { self.post( self.endpoints.post_user(user_id)?, PostUserRequest { user: &user }, Retry::Yes, ) + .await } - pub fn put_comment_audio( + #[cfg(feature = "native")] + pub async fn put_comment_audio( &self, source_id: &SourceId, comment_id: &CommentId, - audio_path: impl AsRef, + audio_path: impl AsRef, ) -> Result<()> { - let form = Form::new() - .file("file", audio_path) - .map_err(|source| Error::Unknown { - message: "PUT comment audio operation failed".to_owned(), - source: source.into(), - })?; + use reqwest::multipart::{Form, Part}; + + let audio_part = { + let file_name: String = audio_path + .as_ref() + .file_name() + .map(std::ffi::OsStr::to_string_lossy) + .unwrap_or_else(|| "unknown".into()) + .to_string(); // it's a Cow + + let audio_content = + tokio::fs::read(audio_path) + .await + .map_err(|source| Error::Unknown { + message: "Could not read audio file {audio_path}".to_owned(), + source: source.into(), + })?; + + Part::bytes(audio_content).file_name(file_name) + }; + let http_response = self .http_client .put(self.endpoints.comment_audio(source_id, comment_id)?) .headers(self.headers.clone()) - .multipart(form) + .multipart(Form::new().part("file", audio_part)) .send() + .await .map_err(|source| Error::ReqwestError { message: "PUT comment audio operation failed".to_owned(), source, @@ -384,35 +465,39 @@ impl Client { let status = http_response.status(); http_response .json::>() + .await .map_err(Error::BadJsonResponse)? .into_result(status)?; Ok(()) } - pub fn get_datasets(&self) -> Result> { + pub async fn get_datasets(&self) -> Result> { Ok(self - .get::<_, GetAvailableDatasetsResponse>(self.endpoints.datasets.clone())? + .get::<_, GetAvailableDatasetsResponse>(self.endpoints.datasets.clone()) + .await? .datasets) } - pub fn get_dataset(&self, dataset: IdentifierT) -> Result + pub async fn get_dataset(&self, dataset: IdentifierT) -> Result where IdentifierT: Into, { Ok(match dataset.into() { DatasetIdentifier::Id(dataset_id) => { - self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_id(&dataset_id)?)? + self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_id(&dataset_id)?) + .await? .dataset } DatasetIdentifier::FullName(dataset_name) => { - self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_name(&dataset_name)?)? + self.get::<_, GetDatasetResponse>(self.endpoints.dataset_by_name(&dataset_name)?) + .await? .dataset } }) } /// Create a dataset. - pub fn create_dataset( + pub async fn create_dataset( &self, dataset_name: &DatasetFullName, options: NewDataset<'_>, @@ -421,38 +506,48 @@ impl Client { .put::<_, _, CreateDatasetResponse>( self.endpoints.dataset_by_name(dataset_name)?, CreateDatasetRequest { dataset: options }, - )? + ) + .await? .dataset) } /// Update a dataset. - pub fn update_dataset( + pub async fn update_dataset( &self, dataset_name: &DatasetFullName, options: UpdateDataset<'_>, ) -> Result { + eprintln!( + "{:?}", + UpdateDatasetRequest { + dataset: options.clone() + } + ); + Ok(self .post::<_, _, UpdateDatasetResponse>( self.endpoints.dataset_by_name(dataset_name)?, UpdateDatasetRequest { dataset: options }, Retry::Yes, - )? + ) + .await? .dataset) } - pub fn delete_dataset(&self, dataset: IdentifierT) -> Result<()> + pub async fn delete_dataset(&self, dataset: IdentifierT) -> Result<()> where IdentifierT: Into, { let dataset_id = match dataset.into() { DatasetIdentifier::Id(dataset_id) => dataset_id, - dataset @ DatasetIdentifier::FullName(_) => self.get_dataset(dataset)?.id, + dataset @ DatasetIdentifier::FullName(_) => self.get_dataset(dataset).await?.id, }; self.delete(self.endpoints.dataset_by_id(&dataset_id)?) + .await } /// Get labellings for a given a dataset and a list of comment UIDs. - pub fn get_labellings<'a>( + pub async fn get_labellings<'a>( &self, dataset_name: &DatasetFullName, comment_uids: impl Iterator, @@ -461,7 +556,8 @@ impl Client { .get_query::<_, _, GetAnnotationsResponse>( self.endpoints.get_labellings(dataset_name)?, Some(&id_list_query(comment_uids.into_iter().map(|id| &id.0))), - )? + ) + .await? .results) } @@ -472,12 +568,53 @@ impl Client { source_id: &'a SourceId, return_predictions: bool, limit: Option, - ) -> LabellingsIter<'a> { - LabellingsIter::new(self, dataset_name, source_id, return_predictions, limit) + ) -> impl Stream>> + 'a { + struct LabellingsCursor { + after: Option, + return_predictions: bool, + limit: Option, + done: bool, + } + let cursor = LabellingsCursor { + after: None, + return_predictions, + limit, + done: false, + }; + + futures::stream::unfold(cursor, |mut cursor| async { + if cursor.done { + return None; + } + let page_result = self + .get_labellings_in_bulk( + dataset_name, + GetLabellingsInBulk { + source_id, + return_predictions: &cursor.return_predictions, + after: &cursor.after, + limit: &cursor.limit, + }, + ) + .await + .map(|page| { + if cursor.after == page.after && !page.results.is_empty() { + panic!("Labellings API did not increment pagination continuation"); + } + + cursor.after = page.after; + if page.results.is_empty() { + cursor.done = true; + } + + page.results + }); + Some((page_result, cursor)) + }) } /// Get reviewed comments in bulk - pub fn get_labellings_in_bulk( + pub async fn get_labellings_in_bulk( &self, dataset_name: &DatasetFullName, query_parameters: GetLabellingsInBulk<'_>, @@ -486,10 +623,11 @@ impl Client { self.endpoints.get_labellings(dataset_name)?, Some(&query_parameters), ) + .await } /// Update labellings for a given a dataset and comment UID. - pub fn update_labelling( + pub async fn update_labelling( &self, dataset_name: &DatasetFullName, comment_uid: &CommentUid, @@ -506,10 +644,11 @@ impl Client { }, Retry::Yes, ) + .await } /// Get predictions for a given a dataset, a model version, and a list of comment UIDs. - pub fn get_comment_predictions<'a>( + pub async fn get_comment_predictions<'a>( &self, dataset_name: &DatasetFullName, model_version: &ModelVersion, @@ -524,17 +663,19 @@ impl Client { "uids": comment_uids.into_iter().map(|id| id.0.as_str()).collect::>(), }), Retry::Yes, - )? + ) + .await? .predictions) } - pub fn get_triggers(&self, dataset_name: &DatasetFullName) -> Result> { + pub async fn get_triggers(&self, dataset_name: &DatasetFullName) -> Result> { Ok(self - .get::<_, GetTriggersResponse>(self.endpoints.triggers(dataset_name)?)? + .get::<_, GetTriggersResponse>(self.endpoints.triggers(dataset_name)?) + .await? .triggers) } - pub fn get_recent_comments( + pub async fn get_recent_comments( &self, dataset_name: &DatasetFullName, filter: &CommentFilter, @@ -550,50 +691,56 @@ impl Client { }, Retry::No, ) + .await } - pub fn get_current_user(&self) -> Result { + pub async fn get_current_user(&self) -> Result { Ok(self - .get::<_, GetCurrentUserResponse>(self.endpoints.current_user.clone())? + .get::<_, GetCurrentUserResponse>(self.endpoints.current_user.clone()) + .await? .user) } - pub fn get_users(&self) -> Result> { + pub async fn get_users(&self) -> Result> { Ok(self - .get::<_, GetAvailableUsersResponse>(self.endpoints.users.clone())? + .get::<_, GetAvailableUsersResponse>(self.endpoints.users.clone()) + .await? .users) } - pub fn create_user(&self, user: NewUser<'_>) -> Result { + pub async fn create_user(&self, user: NewUser<'_>) -> Result { Ok(self .put::<_, _, CreateUserResponse>( self.endpoints.users.clone(), CreateUserRequest { user }, - )? + ) + .await? .user) } - pub fn send_welcome_email(&self, user_id: UserId) -> Result<()> { + pub async fn send_welcome_email(&self, user_id: UserId) -> Result<()> { self.post::<_, _, WelcomeEmailResponse>( self.endpoints.welcome_email(&user_id)?, json!({}), Retry::No, - )?; + ) + .await?; Ok(()) } - pub fn get_statistics(&self, dataset_name: &DatasetFullName) -> Result { + pub async fn get_statistics(&self, dataset_name: &DatasetFullName) -> Result { Ok(self .post::<_, _, GetStatisticsResponse>( self.endpoints.statistics(dataset_name)?, json!({}), Retry::No, - )? + ) + .await? .statistics) } /// Create a new bucket. - pub fn create_bucket( + pub async fn create_bucket( &self, bucket_name: &BucketFullName, options: NewBucket<'_>, @@ -602,44 +749,48 @@ impl Client { .put::<_, _, CreateBucketResponse>( self.endpoints.bucket_by_name(bucket_name)?, CreateBucketRequest { bucket: options }, - )? + ) + .await? .bucket) } - pub fn get_buckets(&self) -> Result> { + pub async fn get_buckets(&self) -> Result> { Ok(self - .get::<_, GetAvailableBucketsResponse>(self.endpoints.buckets.clone())? + .get::<_, GetAvailableBucketsResponse>(self.endpoints.buckets.clone()) + .await? .buckets) } - pub fn get_bucket(&self, bucket: IdentifierT) -> Result + pub async fn get_bucket(&self, bucket: IdentifierT) -> Result where IdentifierT: Into, { Ok(match bucket.into() { BucketIdentifier::Id(bucket_id) => { - self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_id(&bucket_id)?)? + self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_id(&bucket_id)?) + .await? .bucket } BucketIdentifier::FullName(bucket_name) => { - self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_name(&bucket_name)?)? + self.get::<_, GetBucketResponse>(self.endpoints.bucket_by_name(&bucket_name)?) + .await? .bucket } }) } - pub fn delete_bucket(&self, bucket: IdentifierT) -> Result<()> + pub async fn delete_bucket(&self, bucket: IdentifierT) -> Result<()> where IdentifierT: Into, { let bucket_id = match bucket.into() { BucketIdentifier::Id(bucket_id) => bucket_id, - bucket @ BucketIdentifier::FullName(_) => self.get_bucket(bucket)?.id, + bucket @ BucketIdentifier::FullName(_) => self.get_bucket(bucket).await?.id, }; - self.delete(self.endpoints.bucket_by_id(&bucket_id)?) + self.delete(self.endpoints.bucket_by_id(&bucket_id)?).await } - pub fn fetch_trigger_comments( + pub async fn fetch_trigger_comments( &self, trigger_name: &TriggerFullName, size: u32, @@ -649,9 +800,10 @@ impl Client { TriggerFetchRequest { size }, Retry::No, ) + .await } - pub fn advance_trigger( + pub async fn advance_trigger( &self, trigger_name: &TriggerFullName, sequence_id: TriggerSequenceId, @@ -660,11 +812,12 @@ impl Client { self.endpoints.trigger_advance(trigger_name)?, TriggerAdvanceRequest { sequence_id }, Retry::No, - )?; + ) + .await?; Ok(()) } - pub fn reset_trigger( + pub async fn reset_trigger( &self, trigger_name: &TriggerFullName, to_comment_created_at: DateTime, @@ -675,40 +828,45 @@ impl Client { to_comment_created_at, }, Retry::No, - )?; + ) + .await?; Ok(()) } - pub fn tag_trigger_exceptions( + pub async fn tag_trigger_exceptions<'a>( &self, trigger_name: &TriggerFullName, - exceptions: &[TriggerException], + exceptions: &[TriggerException<'a>], ) -> Result<()> { self.put::<_, _, serde::de::IgnoredAny>( self.endpoints.trigger_exceptions(trigger_name)?, TagTriggerExceptionsRequest { exceptions }, - )?; + ) + .await?; Ok(()) } /// Gets a project. - pub fn get_project(&self, project_name: &ProjectName) -> Result { - let response = - self.get::<_, GetProjectResponse>(self.endpoints.project_by_name(project_name)?)?; + pub async fn get_project(&self, project_name: &ProjectName) -> Result { + let response = self + .get::<_, GetProjectResponse>(self.endpoints.project_by_name(project_name)?) + .await?; Ok(response.project) } /// Gets all projects. - pub fn get_projects(&self) -> Result> { - let response = self.get::<_, GetProjectsResponse>(self.endpoints.projects.clone())?; + pub async fn get_projects(&self) -> Result> { + let response = self + .get::<_, GetProjectsResponse>(self.endpoints.projects.clone()) + .await?; Ok(response.projects) } /// Creates a new project. - pub fn create_project( + pub async fn create_project<'a>( &self, project_name: &ProjectName, - options: NewProject, + options: NewProject<'a>, user_ids: &[UserId], ) -> Result { Ok(self @@ -718,50 +876,53 @@ impl Client { project: options, user_ids, }, - )? + ) + .await? .project) } /// Updates an existing project. - pub fn update_project( + pub async fn update_project<'request>( &self, project_name: &ProjectName, - options: UpdateProject, + options: UpdateProject<'request>, ) -> Result { Ok(self .post::<_, _, UpdateProjectResponse>( self.endpoints.project_by_name(project_name)?, UpdateProjectRequest { project: options }, Retry::Yes, - )? + ) + .await? .project) } /// Deletes an existing project. - pub fn delete_project( + pub async fn delete_project( &self, project_name: &ProjectName, force_delete: ForceDeleteProject, ) -> Result<()> { let endpoint = self.endpoints.project_by_name(project_name)?; match force_delete { - ForceDeleteProject::No => self.delete(endpoint)?, + ForceDeleteProject::No => self.delete(endpoint).await?, ForceDeleteProject::Yes => { - self.delete_query(endpoint, Some(&json!({ "force": true })))? + self.delete_query(endpoint, Some(&json!({ "force": true }))) + .await? } }; Ok(()) } - fn get(&self, url: LocationT) -> Result + async fn get(&self, url: LocationT) -> Result where LocationT: IntoUrl + Display + Clone, for<'de> SuccessT: Deserialize<'de>, { - self.get_query::(url, None) + self.get_query::(url, None).await } - fn get_query( + async fn get_query( &self, url: LocationT, query: Option<&QueryT>, @@ -771,7 +932,7 @@ impl Client { QueryT: Serialize, for<'de> SuccessT: Deserialize<'de>, { - debug!("Attempting GET `{}`", url); + log::debug!("Attempting GET `{}`", url); let http_response = self .with_retries(|| { let mut request = self @@ -783,6 +944,7 @@ impl Client { } request.send() }) + .await .map_err(|source| Error::ReqwestError { source, message: "GET operation failed.".to_owned(), @@ -790,23 +952,28 @@ impl Client { let status = http_response.status(); http_response .json::>() + .await .map_err(Error::BadJsonResponse)? .into_result(status) } - fn delete(&self, url: LocationT) -> Result<()> + async fn delete(&self, url: LocationT) -> Result<()> where LocationT: IntoUrl + Display + Clone, { - self.delete_query::(url, None) + self.delete_query::(url, None).await } - fn delete_query(&self, url: LocationT, query: Option<&QueryT>) -> Result<()> + async fn delete_query( + &self, + url: LocationT, + query: Option<&QueryT>, + ) -> Result<()> where LocationT: IntoUrl + Display + Clone, QueryT: Serialize, { - debug!("Attempting DELETE `{}`", url); + log::debug!("Attempting DELETE `{}`", url); let attempts = Cell::new(0); let http_response = self @@ -822,6 +989,7 @@ impl Client { } request.send() }) + .await .map_err(|source| Error::ReqwestError { source, message: "DELETE operation failed.".to_owned(), @@ -829,6 +997,7 @@ impl Client { let status = http_response.status(); http_response .json::>() + .await .map_err(Error::BadJsonResponse)? .into_result(status) .map_or_else( @@ -845,7 +1014,7 @@ impl Client { ) } - fn post( + async fn post( &self, url: LocationT, request: RequestT, @@ -856,7 +1025,7 @@ impl Client { RequestT: Serialize, for<'de> SuccessT: Deserialize<'de>, { - debug!("Attempting POST `{}`", url); + log::debug!("Attempting POST `{}`", url); let do_request = || { self.http_client .post(url.clone()) @@ -865,8 +1034,8 @@ impl Client { .send() }; let result = match retry { - Retry::Yes => self.with_retries(do_request), - Retry::No => do_request(), + Retry::Yes => self.with_retries(do_request).await, + Retry::No => do_request().await, }; let http_response = result.map_err(|source| Error::ReqwestError { @@ -876,11 +1045,12 @@ impl Client { let status = http_response.status(); http_response .json::>() + .await .map_err(Error::BadJsonResponse)? .into_result(status) } - fn put( + async fn put( &self, url: LocationT, request: RequestT, @@ -890,7 +1060,7 @@ impl Client { RequestT: Serialize, for<'de> SuccessT: Deserialize<'de>, { - debug!("Attempting PUT `{}`", url); + log::debug!("Attempting PUT `{}`", url); let http_response = self .with_retries(|| { self.http_client @@ -899,6 +1069,7 @@ impl Client { .json(&request) .send() }) + .await .map_err(|source| Error::ReqwestError { source, message: "PUT operation failed.".to_owned(), @@ -906,148 +1077,43 @@ impl Client { let status = http_response.status(); http_response .json::>() + .await .map_err(Error::BadJsonResponse)? .into_result(status) } - fn with_retries( + async fn with_retries( &self, - send_request: impl Fn() -> ReqwestResult, - ) -> ReqwestResult { + send_request: CallbackT, + ) -> ReqwestResult + where + CallbackT: Fn() -> ResponseT, + ResponseT: Future>, + { match &self.retrier { - Some(retrier) => retrier.with_retries(send_request), - None => send_request(), + Some(retrier) => retrier.with_retries(send_request).await, + None => send_request().await, } } } +#[derive(Copy, Clone, Debug)] enum Retry { Yes, No, } +#[derive(Clone, Debug)] pub enum ContinuationKind { Timestamp(DateTime), Continuation(Continuation), } -pub struct CommentsIter<'a> { - client: &'a Client, - source_name: &'a SourceFullName, - continuation: Option, - done: bool, - page_size: usize, - to_timestamp: Option>, -} - -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct CommentsIterTimerange { pub from: Option>, pub to: Option>, } -impl<'a> CommentsIter<'a> { - // Default number of comments per page to request from API. - pub const DEFAULT_PAGE_SIZE: usize = 64; - // Maximum number of comments per page which can be requested from the API. - pub const MAX_PAGE_SIZE: usize = 256; - - fn new( - client: &'a Client, - source_name: &'a SourceFullName, - page_size: Option, - timerange: CommentsIterTimerange, - ) -> Self { - let (from_timestamp, to_timestamp) = (timerange.from, timerange.to); - Self { - client, - source_name, - to_timestamp, - continuation: from_timestamp.map(ContinuationKind::Timestamp), - done: false, - page_size: page_size.unwrap_or(Self::DEFAULT_PAGE_SIZE), - } - } -} - -impl<'a> Iterator for CommentsIter<'a> { - type Item = Result>; - - fn next(&mut self) -> Option { - if self.done { - return None; - } - let response = self.client.get_comments_iter_page( - self.source_name, - self.continuation.as_ref(), - self.to_timestamp, - self.page_size, - ); - Some(response.map(|page| { - self.continuation = page.continuation.map(ContinuationKind::Continuation); - self.done = self.continuation.is_none(); - page.comments - })) - } -} - -pub struct LabellingsIter<'a> { - client: &'a Client, - dataset_name: &'a DatasetFullName, - source_id: &'a SourceId, - return_predictions: bool, - after: Option, - limit: Option, - done: bool, -} - -impl<'a> LabellingsIter<'a> { - fn new( - client: &'a Client, - dataset_name: &'a DatasetFullName, - source_id: &'a SourceId, - return_predictions: bool, - limit: Option, - ) -> Self { - Self { - client, - dataset_name, - source_id, - return_predictions, - after: None, - limit, - done: false, - } - } -} - -impl<'a> Iterator for LabellingsIter<'a> { - type Item = Result>; - - fn next(&mut self) -> Option { - if self.done { - return None; - } - let response = self.client.get_labellings_in_bulk( - self.dataset_name, - GetLabellingsInBulk { - source_id: self.source_id, - return_predictions: &self.return_predictions, - after: &self.after, - limit: &self.limit, - }, - ); - Some(response.map(|page| { - if self.after == page.after && !page.results.is_empty() { - panic!("Labellings API did not increment pagination continuation"); - } - self.after = page.after; - if page.results.is_empty() { - self.done = true; - } - page.results - })) - } -} #[derive(Debug)] struct Endpoints { @@ -1225,6 +1291,7 @@ impl Endpoints { ) } + #[cfg(feature = "native")] fn comment_audio(&self, source_id: &SourceId, comment_id: &CommentId) -> Result { construct_endpoint( &self.base, @@ -1332,16 +1399,6 @@ impl Endpoints { } } -fn build_http_client(config: &Config) -> Result { - let mut builder = HttpClient::builder() - .gzip(true) - .danger_accept_invalid_certs(config.accept_invalid_certificates); - if let Some(proxy) = config.proxy.clone() { - builder = builder.proxy(Proxy::all(proxy).map_err(Error::BuildHttpClient)?); - } - builder.build().map_err(Error::BuildHttpClient) -} - fn build_headers(config: &Config) -> Result { let mut headers = HeaderMap::new(); headers.insert( @@ -1389,3 +1446,6 @@ mod tests { ); } } + +/// Maximum number of comments per page which can be requested from the API. +pub const MAX_PAGE_SIZE: usize = 256; diff --git a/api/src/retry.rs b/api/src/retry.rs index f29ff7d6..ef63a6cc 100644 --- a/api/src/retry.rs +++ b/api/src/retry.rs @@ -1,7 +1,10 @@ -use reqwest::{blocking::Response, Result}; -use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; -use std::thread::sleep; -use std::time::Duration; +use reqwest::{Response, Result}; +use std::{ + future::Future, + sync::atomic::{AtomicBool, Ordering::SeqCst}, + thread::sleep, + time::Duration, +}; /// Strategy to use if retrying . #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -42,11 +45,18 @@ impl Retrier { } } - pub fn with_retries(&self, send_request: impl Fn() -> Result) -> Result { + pub async fn with_retries( + &self, + send_request: CallbackT, + ) -> Result + where + CallbackT: Fn() -> ResponseT, + ResponseT: Future>, + { if self.is_first_request.swap(false, SeqCst) && self.config.strategy == RetryStrategy::Automatic { - return send_request(); + return send_request().await; } for i_retry in 0..self.config.max_retry_count { @@ -59,7 +69,7 @@ impl Retrier { }}; } - match send_request() { + match send_request().await { Ok(response) if response.status().is_server_error() => { warn_and_sleep!(format!("{} for {}", response.status(), response.url())) } @@ -70,7 +80,7 @@ impl Retrier { } // On last retry don't handle the error, just propagate all errors. - send_request() + send_request().await } } @@ -78,12 +88,12 @@ impl Retrier { mod tests { use super::{Retrier, RetryConfig, RetryStrategy}; use mockito::{mock, server_address}; - use reqwest::blocking::{get, Client}; + use reqwest::{get, Client}; use std::thread::sleep; use std::time::Duration; - #[test] - fn test_always_retry() { + #[tokio::test] + async fn test_always_retry() { let mut handler = Retrier::new(RetryConfig { strategy: RetryStrategy::Always, max_retry_count: 5, @@ -95,7 +105,8 @@ mod tests { let ok = mock("GET", "/").expect(1).create(); assert!( handler - .with_retries(|| get(format!("http://{}", server_address()))) + .with_retries(|| async { get(&format!("http://{}", server_address())).await }) + .await .unwrap() .status() == 200 @@ -111,7 +122,8 @@ mod tests { handler.config.max_retry_count = i_retry; assert!( handler - .with_retries(|| get(format!("http://{}", server_address()))) + .with_retries(|| async { get(&format!("http://{}", server_address())).await }) + .await .unwrap() .status() == 500 @@ -120,8 +132,8 @@ mod tests { } } - #[test] - fn test_automatic_retry() { + #[tokio::test] + async fn test_automatic_retry() { let mut handler = Retrier::new(RetryConfig { strategy: RetryStrategy::Automatic, max_retry_count: 5, @@ -133,7 +145,8 @@ mod tests { let err = mock("GET", "/").with_status(500).expect(1).create(); assert!( handler - .with_retries(|| get(format!("http://{}", server_address()))) + .with_retries(|| async { get(&format!("http://{}", server_address())).await }) + .await .unwrap() .status() == 500 @@ -144,7 +157,8 @@ mod tests { let ok = mock("GET", "/").expect(1).create(); assert!( handler - .with_retries(|| get(format!("http://{}", server_address()))) + .with_retries(|| async { get(&format!("http://{}", server_address())).await }) + .await .unwrap() .status() == 200 @@ -160,7 +174,8 @@ mod tests { handler.config.max_retry_count = i_retry; assert!( handler - .with_retries(|| get(format!("http://{}", server_address()))) + .with_retries(|| async { get(&format!("http://{}", server_address())).await }) + .await .unwrap() .status() == 500 @@ -169,8 +184,8 @@ mod tests { } } - #[test] - fn test_timeout_retry() { + #[tokio::test] + async fn test_timeout_retry() { let handler = Retrier::new(RetryConfig { strategy: RetryStrategy::Always, max_retry_count: 1, @@ -188,15 +203,17 @@ mod tests { .create(); let client = Client::new(); assert!(handler - .with_retries(|| client - .get(format!("http://{}", server_address())) - .timeout(Duration::from_secs_f64(0.1)) - .send() - .and_then(|r| { - // This is a bit of a hack to force a timeout - let _ = r.text()?; - unreachable!() - })) + .with_retries(|| async { + let response = client + .get(&format!("http://{}", server_address())) + .timeout(Duration::from_secs_f64(0.1)) + .send() + .await?; + // This is a bit of a hack to force a timeout + let _ = response.text().await?; + unreachable!() + }) + .await .unwrap_err() .is_timeout()); timeout.assert(); diff --git a/cli/Cargo.toml b/cli/Cargo.toml index e80c1ed1..accd8555 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -22,6 +22,7 @@ chrono = "0.4.22" colored = "2.0.0" dirs = "4.0.0" env_logger = "0.10.0" +futures = "0.3.25" indicatif = "0.17.0" log = { version = "0.4.17", default-features = false, features = ["release_max_level_info"] } maplit = "1.0.2" @@ -32,6 +33,7 @@ reqwest = { version = "0.11.12", default-features = false } serde = { version = "1.0.147", features = ["derive"] } serde_json = "1.0.87" structopt = { version = "0.3.26", default-features = false } +tokio = { version = "1.21.2", default-features = false, features = ["rt", "macros"] } url = { version = "2.3.1", features = ["serde"] } reinfer-client = { version = "0.12.1", path = "../api" } diff --git a/cli/src/commands/config.rs b/cli/src/commands/config.rs index 44f52c4c..1c0488ae 100644 --- a/cli/src/commands/config.rs +++ b/cli/src/commands/config.rs @@ -117,7 +117,7 @@ pub fn run( table.printstd(); } ConfigArgs::ListContexts { .. } => { - info!("No available contexts."); + info!("No available contexts"); } ConfigArgs::AddContext { name, @@ -139,17 +139,17 @@ pub fn run( ConfigArgs::UseContext { name } => { if !config.set_current_context(name) { return Err(anyhow!( - "No such context `{}` exists in `{}`.", + "No such context `{}` exists in `{}`", name, config_path.as_ref().display(), )); } else { config::write_reinfer_config(config_path, &config)?; - info!("Switched to context `{}`.", name); + info!("Switched to context `{}`", name); } } ConfigArgs::CurrentContext => config.get_current_context().map_or_else( - || info!("There is no default context in use."), + || info!("There is no default context in use"), |current_context| println!("{}", current_context.name), ), ConfigArgs::GetToken { name } => match name.as_ref() { @@ -158,10 +158,10 @@ pub fn run( "{}", config .get_current_context() - .ok_or_else(|| anyhow!("There is no default context in use."))? + .ok_or_else(|| anyhow!("There is no default context in use"))? .token .as_ref() - .ok_or_else(|| anyhow!("The default context has no stored token."))? + .ok_or_else(|| anyhow!("The default context has no stored token"))? ); } Some(name) => { @@ -169,10 +169,10 @@ pub fn run( "{}", config .get_context(name) - .ok_or_else(|| anyhow!("No such context `{}`.", name))? + .ok_or_else(|| anyhow!("No such context `{}`", name))? .token .as_ref() - .ok_or_else(|| anyhow!("The context `{}` has no stored token.", name))? + .ok_or_else(|| anyhow!("The context `{}` has no stored token", name))? ); } }, @@ -181,13 +181,13 @@ pub fn run( if config.delete_context(name) { config::write_reinfer_config(&config_path, &config)?; info!( - "Deleted context `{}` from `{}`.", + "Deleted context `{}` from `{}`", name, config_path.as_ref().display() ); } else { return Err(anyhow!( - "No such context `{}` exists in `{}`.", + "No such context `{}` exists in `{}`", name, config_path.as_ref().display() )); @@ -216,15 +216,15 @@ fn add_or_edit_context( if !name.is_empty() { break name; } else { - error!("Context name cannot be empty."); + error!("Context name cannot be empty"); } }; let existing_context = config.get_context(&name); if existing_context.is_some() { - info!("Context `{}` already exists, it will be modified.", name); + info!("Context `{}` already exists, it will be modified", name); } else { - info!("A new context `{}` will be created.", name); + info!("A new context `{}` will be created", name); } // Get API token (either argument or from stdin) @@ -239,7 +239,7 @@ fn add_or_edit_context( )); } else { warn!( - "Be careful, API tokens are stored in cleartext in {}.", + "Be careful, API tokens are stored in cleartext in {}", config_path.as_ref().display() ); } @@ -281,16 +281,16 @@ fn add_or_edit_context( let update_existing = existing_context.is_some(); let is_new_context = !config.set_context(context); if is_new_context && config.num_contexts() == 1 { - info!("Default context set to `{}`.", name); + info!("Default context set to `{}`", name); config.set_current_context(&name); } config::write_reinfer_config(config_path, &config)?; if update_existing { - info!("Context `{}` was updated.", name); + info!("Context `{}` was updated", name); } else { - info!("New context `{}` was created.", name); + info!("New context `{}` was created", name); } Ok(()) diff --git a/cli/src/commands/create/annotations.rs b/cli/src/commands/create/annotations.rs index 1d7226e1..2f1b063a 100644 --- a/cli/src/commands/create/annotations.rs +++ b/cli/src/commands/create/annotations.rs @@ -43,14 +43,16 @@ pub struct CreateAnnotationsArgs { use_moon_forms: bool, } -pub fn create(client: &Client, args: &CreateAnnotationsArgs) -> Result<()> { +pub async fn create(client: &Client, args: &CreateAnnotationsArgs) -> Result<()> { let source = client .get_source(args.source.clone()) + .await .with_context(|| format!("Unable to get source {}", args.source))?; let source_name = source.full_name(); let dataset = client .get_dataset(args.dataset.clone()) + .await .with_context(|| format!("Unable to get dataset {}", args.dataset))?; let dataset_name = dataset.full_name(); @@ -87,7 +89,8 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs) -> Result<()> { &statistics, &dataset_name, args.use_moon_forms, - )?; + ) + .await?; if let Some(mut progress) = progress { progress.done(); } @@ -107,7 +110,8 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs) -> Result<()> { &statistics, &dataset_name, args.use_moon_forms, - )?; + ) + .await?; statistics } }; @@ -120,7 +124,7 @@ pub fn create(client: &Client, args: &CreateAnnotationsArgs) -> Result<()> { } #[allow(clippy::too_many_arguments)] -fn upload_annotations_from_reader( +async fn upload_annotations_from_reader( client: &Client, source: &Source, annotations: impl BufRead, @@ -132,14 +136,12 @@ fn upload_annotations_from_reader( let new_comment = read_comment_result?; if new_comment.has_annotations() { let comment_uid = CommentUid(format!("{}.{}", source.id.0, new_comment.comment.id.0)); + let new_labellings = new_comment.labelling.map(Into::>::into); (if !use_moon_forms { client.update_labelling( dataset_name, &comment_uid, - new_comment - .labelling - .map(Into::>::into) - .as_deref(), + new_labellings.as_deref(), new_comment.entities.as_ref(), None, ) @@ -152,6 +154,7 @@ fn upload_annotations_from_reader( new_comment.moon_forms.as_deref(), ) }) + .await .with_context(|| { format!( "Could not update labelling for comment `{}`", diff --git a/cli/src/commands/create/bucket.rs b/cli/src/commands/create/bucket.rs index 89206420..d1a43296 100644 --- a/cli/src/commands/create/bucket.rs +++ b/cli/src/commands/create/bucket.rs @@ -24,7 +24,7 @@ pub struct CreateBucketArgs { transform_tag: TransformTag, } -pub fn create(client: &Client, args: &CreateBucketArgs, printer: &Printer) -> Result<()> { +pub async fn create(client: &Client, args: &CreateBucketArgs, printer: &Printer) -> Result<()> { let CreateBucketArgs { name, title, @@ -41,6 +41,7 @@ pub fn create(client: &Client, args: &CreateBucketArgs, printer: &Printer) -> Re transform_tag, }, ) + .await .context("Operation to create a bucket has failed")?; info!( "New bucket `{}` [id: {}] created successfully", diff --git a/cli/src/commands/create/comments.rs b/cli/src/commands/create/comments.rs index c8ef420e..5fb7df78 100644 --- a/cli/src/commands/create/comments.rs +++ b/cli/src/commands/create/comments.rs @@ -58,9 +58,10 @@ pub struct CreateCommentsArgs { use_moon_forms: bool, } -pub fn create(client: &Client, args: &CreateCommentsArgs) -> Result<()> { +pub async fn create(client: &Client, args: &CreateCommentsArgs) -> Result<()> { let source = client .get_source(args.source.clone()) + .await .with_context(|| format!("Unable to get source {}", args.source))?; let source_name = source.full_name(); @@ -68,6 +69,7 @@ pub fn create(client: &Client, args: &CreateCommentsArgs) -> Result<()> { Some(dataset_ident) => Some( client .get_dataset(dataset_ident.clone()) + .await .with_context(|| format!("Unable to get dataset {}", args.source))? .full_name(), ), @@ -127,7 +129,8 @@ pub fn create(client: &Client, args: &CreateCommentsArgs) -> Result<()> { args.overwrite, args.allow_duplicates, args.use_moon_forms, - )?; + ) + .await?; if let Some(mut progress) = progress { progress.done(); } @@ -153,7 +156,8 @@ pub fn create(client: &Client, args: &CreateCommentsArgs) -> Result<()> { args.overwrite, args.allow_duplicates, args.use_moon_forms, - )?; + ) + .await?; statistics } }; @@ -240,7 +244,7 @@ type Annotation = ( ); #[allow(clippy::too_many_arguments)] -fn upload_batch( +async fn upload_batch( client: &Client, source: &Source, dataset_name: Option<&DatasetFullName>, @@ -259,6 +263,7 @@ fn upload_batch( if !comments_to_put.is_empty() { client .put_comments(&source.full_name(), comments_to_put) + .await .context("Could not put batch of comments")?; uploaded += comments_to_put.len(); @@ -267,6 +272,7 @@ fn upload_batch( if !comments_to_sync.is_empty() { let result = client .sync_comments(&source.full_name(), comments_to_sync) + .await .context("Could not sync batch of comments")?; uploaded += comments_to_sync.len(); @@ -293,6 +299,7 @@ fn upload_batch( entities.as_ref(), moon_forms.as_deref(), ) + .await .with_context(|| { format!("Could not update labelling for comment `{}`", comment_uid.0,) })?; @@ -304,6 +311,7 @@ fn upload_batch( for (comment_id, audio_path) in audio_paths.iter() { client .put_comment_audio(&source.id, comment_id, audio_path) + .await .with_context(|| { format!( "Could not upload audio file at `{}` for comment id `{}", @@ -317,7 +325,7 @@ fn upload_batch( } #[allow(clippy::too_many_arguments)] -fn upload_comments_from_reader( +async fn upload_comments_from_reader( client: &Client, source: &Source, comments: impl BufRead, @@ -386,7 +394,8 @@ fn upload_comments_from_reader( &comments_to_sync, &annotations, &audio_paths, - )?; + ) + .await?; comments_to_put.clear(); comments_to_sync.clear(); annotations.clear(); @@ -404,7 +413,8 @@ fn upload_comments_from_reader( &comments_to_sync, &annotations, &audio_paths, - )?; + ) + .await?; } Ok(()) diff --git a/cli/src/commands/create/dataset.rs b/cli/src/commands/create/dataset.rs index ad748e7f..930f1400 100644 --- a/cli/src/commands/create/dataset.rs +++ b/cli/src/commands/create/dataset.rs @@ -55,7 +55,7 @@ pub struct CreateDatasetArgs { copy_annotations_from: Option, } -pub fn create(client: &Client, args: &CreateDatasetArgs, printer: &Printer) -> Result<()> { +pub async fn create(client: &Client, args: &CreateDatasetArgs, printer: &Printer) -> Result<()> { let CreateDatasetArgs { name, title, @@ -75,6 +75,7 @@ pub fn create(client: &Client, args: &CreateDatasetArgs, printer: &Printer) -> R source_ids.push( client .get_source(source.clone()) + .await .context("Operation to get source has failed")? .id, ); @@ -114,6 +115,7 @@ pub fn create(client: &Client, args: &CreateDatasetArgs, printer: &Printer) -> R copy_annotations_from: copy_annotations_from.as_deref(), }, ) + .await .context("Operation to create a dataset has failed.")?; info!( "New dataset `{}` [id: {}] created successfully", diff --git a/cli/src/commands/create/emails.rs b/cli/src/commands/create/emails.rs index 2c96e420..9f9d3d82 100644 --- a/cli/src/commands/create/emails.rs +++ b/cli/src/commands/create/emails.rs @@ -34,9 +34,10 @@ pub struct CreateEmailsArgs { no_progress: bool, } -pub fn create(client: &Client, args: &CreateEmailsArgs) -> Result<()> { +pub async fn create(client: &Client, args: &CreateEmailsArgs) -> Result<()> { let bucket = client .get_bucket(args.bucket.clone()) + .await .with_context(|| format!("Unable to get bucket {}", args.bucket))?; let statistics = match &args.emails_path { @@ -63,7 +64,7 @@ pub fn create(client: &Client, args: &CreateEmailsArgs) -> Result<()> { } else { Some(progress_bar(file_metadata.len(), &statistics)) }; - upload_emails_from_reader(client, &bucket, file, args.batch_size, &statistics)?; + upload_emails_from_reader(client, &bucket, file, args.batch_size, &statistics).await?; if let Some(mut progress) = progress { progress.done(); } @@ -82,7 +83,8 @@ pub fn create(client: &Client, args: &CreateEmailsArgs) -> Result<()> { BufReader::new(io::stdin()), args.batch_size, &statistics, - )?; + ) + .await?; statistics } }; @@ -95,7 +97,7 @@ pub fn create(client: &Client, args: &CreateEmailsArgs) -> Result<()> { Ok(()) } -fn upload_emails_from_reader( +async fn upload_emails_from_reader( client: &Client, bucket: &Bucket, mut emails: impl BufRead, @@ -131,6 +133,7 @@ fn upload_emails_from_reader( // Upload emails client .put_emails(&bucket.full_name(), &batch) + .await .context("Could not upload batch of emails")?; statistics.add_emails(StatisticsUpdate { uploaded: batch.len(), diff --git a/cli/src/commands/create/mod.rs b/cli/src/commands/create/mod.rs index 7c2f0539..2a7c523b 100644 --- a/cli/src/commands/create/mod.rs +++ b/cli/src/commands/create/mod.rs @@ -57,18 +57,20 @@ pub enum CreateArgs { TriggerException(CreateTriggerExceptionArgs), } -pub fn run(create_args: &CreateArgs, client: Client, printer: &Printer) -> Result<()> { +pub async fn run(create_args: &CreateArgs, client: Client, printer: &Printer) -> Result<()> { match create_args { - CreateArgs::Bucket(bucket_args) => bucket::create(&client, bucket_args, printer), - CreateArgs::Source(source_args) => source::create(&client, source_args, printer), - CreateArgs::Dataset(dataset_args) => dataset::create(&client, dataset_args, printer), - CreateArgs::Project(project_args) => project::create(&client, project_args, printer), - CreateArgs::Comments(comments_args) => comments::create(&client, comments_args), - CreateArgs::Annotations(annotations_args) => annotations::create(&client, annotations_args), - CreateArgs::Emails(emails_args) => emails::create(&client, emails_args), - CreateArgs::User(user_args) => user::create(&client, user_args, printer), + CreateArgs::Bucket(bucket_args) => bucket::create(&client, bucket_args, printer).await, + CreateArgs::Source(source_args) => source::create(&client, source_args, printer).await, + CreateArgs::Dataset(dataset_args) => dataset::create(&client, dataset_args, printer).await, + CreateArgs::Project(project_args) => project::create(&client, project_args, printer).await, + CreateArgs::Comments(comments_args) => comments::create(&client, comments_args).await, + CreateArgs::Annotations(annotations_args) => { + annotations::create(&client, annotations_args).await + } + CreateArgs::Emails(emails_args) => emails::create(&client, emails_args).await, + CreateArgs::User(user_args) => user::create(&client, user_args, printer).await, CreateArgs::TriggerException(trigger_exception_args) => { - trigger_exception::create(&client, trigger_exception_args, printer) + trigger_exception::create(&client, trigger_exception_args, printer).await } } } diff --git a/cli/src/commands/create/project.rs b/cli/src/commands/create/project.rs index 1cfb034e..ab5695a1 100644 --- a/cli/src/commands/create/project.rs +++ b/cli/src/commands/create/project.rs @@ -23,7 +23,7 @@ pub struct CreateProjectArgs { user_ids: Vec, } -pub fn create(client: &Client, args: &CreateProjectArgs, printer: &Printer) -> Result<()> { +pub async fn create(client: &Client, args: &CreateProjectArgs, printer: &Printer) -> Result<()> { let CreateProjectArgs { name, title, @@ -40,6 +40,7 @@ pub fn create(client: &Client, args: &CreateProjectArgs, printer: &Printer) -> R }, user_ids, ) + .await .context("Operation to create a project has failed")?; info!("New project `{}` created successfully", project.name.0,); printer.print_resources(&[project])?; diff --git a/cli/src/commands/create/source.rs b/cli/src/commands/create/source.rs index a95a4e49..03eb44e8 100644 --- a/cli/src/commands/create/source.rs +++ b/cli/src/commands/create/source.rs @@ -41,7 +41,7 @@ pub struct CreateSourceArgs { transform_tag: Option, } -pub fn create(client: &Client, args: &CreateSourceArgs, printer: &Printer) -> Result<()> { +pub async fn create(client: &Client, args: &CreateSourceArgs, printer: &Printer) -> Result<()> { let CreateSourceArgs { name, title, @@ -58,6 +58,7 @@ pub fn create(client: &Client, args: &CreateSourceArgs, printer: &Printer) -> Re Some(full_name @ BucketIdentifier::FullName(_)) => Some( client .get_bucket(full_name) + .await .context("Fetching bucket for id.")? .id, ), @@ -78,6 +79,7 @@ pub fn create(client: &Client, args: &CreateSourceArgs, printer: &Printer) -> Re transform_tag: transform_tag.as_ref(), }, ) + .await .context("Operation to create a source has failed")?; info!( "New source `{}` [id: {}] created successfully", diff --git a/cli/src/commands/create/trigger_exception.rs b/cli/src/commands/create/trigger_exception.rs index 546f81a5..0e7e934b 100644 --- a/cli/src/commands/create/trigger_exception.rs +++ b/cli/src/commands/create/trigger_exception.rs @@ -21,7 +21,7 @@ pub struct CreateTriggerExceptionArgs { uid: CommentUid, } -pub fn create( +pub async fn create( client: &Client, args: &CreateTriggerExceptionArgs, _printer: &Printer, @@ -40,6 +40,7 @@ pub fn create( uid, }], ) + .await .context("Operation to create a trigger exception has failed")?; info!("New trigger exception created successfully"); Ok(()) diff --git a/cli/src/commands/create/user.rs b/cli/src/commands/create/user.rs index 951030eb..5bd48c99 100644 --- a/cli/src/commands/create/user.rs +++ b/cli/src/commands/create/user.rs @@ -34,7 +34,7 @@ pub struct CreateUserArgs { send_welcome_email: bool, } -pub fn create(client: &Client, args: &CreateUserArgs, printer: &Printer) -> Result<()> { +pub async fn create(client: &Client, args: &CreateUserArgs, printer: &Printer) -> Result<()> { let CreateUserArgs { username, email, @@ -63,6 +63,7 @@ pub fn create(client: &Client, args: &CreateUserArgs, printer: &Printer) -> Resu global_permissions, project_permissions: &project_permissions, }) + .await .context("Operation to create a user has failed")?; log::info!( "New user `{}` with email `{}` [id: {}] created successfully", @@ -74,6 +75,7 @@ pub fn create(client: &Client, args: &CreateUserArgs, printer: &Printer) -> Resu if *send_welcome_email { client .send_welcome_email(user.id.clone()) + .await .context("Operation to send welcome email failed")?; log::info!("Welcome email sent for user '{}'", user.username.0); } diff --git a/cli/src/commands/delete.rs b/cli/src/commands/delete.rs index d8ff3a78..65174f52 100644 --- a/cli/src/commands/delete.rs +++ b/cli/src/commands/delete.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use colored::Colorize; +use futures::stream::StreamExt; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -8,9 +9,9 @@ use std::sync::{ use structopt::StructOpt; use reinfer_client::{ - resources::project::ForceDeleteProject, BucketIdentifier, Client, CommentId, CommentsIter, + resources::project::ForceDeleteProject, BucketIdentifier, Client, CommentId, CommentsIterTimerange, DatasetIdentifier, ProjectName, Source, SourceIdentifier, - UserIdentifier, + UserIdentifier, MAX_PAGE_SIZE, }; use crate::progress::{Options as ProgressOptions, Progress}; @@ -103,23 +104,26 @@ pub enum DeleteArgs { }, } -pub fn run(delete_args: &DeleteArgs, client: Client) -> Result<()> { +pub async fn run(delete_args: &DeleteArgs, client: Client) -> Result<()> { match delete_args { DeleteArgs::Source { source } => { client .delete_source(source.clone()) + .await .context("Operation to delete source has failed.")?; log::info!("Deleted source."); } DeleteArgs::User { user } => { client .delete_user(user.clone()) + .await .context("Operation to delete user has failed.")?; log::info!("Deleted user."); } DeleteArgs::Comments { source, comments } => { client .delete_comments(source.clone(), comments) + .await .context("Operation to delete comments has failed.")?; log::info!("Deleted comments."); } @@ -130,7 +134,7 @@ pub fn run(delete_args: &DeleteArgs, client: Client) -> Result<()> { to_timestamp, no_progress, } => { - let source = client.get_source(source_identifier.clone())?; + let source = client.get_source(source_identifier.clone()).await?; let show_progress = !no_progress; delete_comments_in_period( &client, @@ -142,17 +146,20 @@ pub fn run(delete_args: &DeleteArgs, client: Client) -> Result<()> { }, show_progress, ) + .await .context("Operation to delete comments has failed.")?; } DeleteArgs::Dataset { dataset } => { client .delete_dataset(dataset.clone()) + .await .context("Operation to delete dataset has failed.")?; log::info!("Deleted dataset."); } DeleteArgs::Bucket { bucket } => { client .delete_bucket(bucket.clone()) + .await .context("Operation to delete bucket has failed.")?; log::info!("Deleted bucket."); } @@ -164,6 +171,7 @@ pub fn run(delete_args: &DeleteArgs, client: Client) -> Result<()> { }; client .delete_project(project, force_delete) + .await .context("Operation to delete project has failed.")?; log::info!("Deleted project."); } @@ -171,7 +179,7 @@ pub fn run(delete_args: &DeleteArgs, client: Client) -> Result<()> { Ok(()) } -fn delete_comments_in_period( +async fn delete_comments_in_period( client: &Client, source: Source, include_annotated: bool, @@ -201,54 +209,52 @@ fn delete_comments_in_period( // This is the maximum number of comments which the API permits deleting in a single call. const DELETION_BATCH_SIZE: usize = 128; + // Buffer to store comment IDs to delete - allow it to be slightly larger than the deletion // batch size so that if there's an incomplete page it'll increase the counts. - let mut comments_to_delete = - Vec::with_capacity(DELETION_BATCH_SIZE + CommentsIter::MAX_PAGE_SIZE); + let mut comments_to_delete = Vec::with_capacity(DELETION_BATCH_SIZE + MAX_PAGE_SIZE); - let delete_batch = |comment_ids: Vec| -> Result<()> { - client - .delete_comments(&source, &comment_ids) - .context("Operation to delete comments failed")?; - statistics.increment_deleted(comment_ids.len()); - Ok(()) - }; + let source_name = source.full_name(); + let comments = client.get_comments(&source_name, Some(MAX_PAGE_SIZE), timerange.clone()); + futures::pin_mut!(comments); // pin the comment stream to the stack - client - .get_comments_iter( - &source.full_name(), - Some(CommentsIter::MAX_PAGE_SIZE), - timerange, - ) - .try_for_each(|page| -> Result<()> { - let page = page.context("Operation to get comments failed")?; - let num_comments = page.len(); - let comment_ids = page - .into_iter() - .filter_map(|comment| { - if !include_annotated && comment.has_annotations { - None - } else { - Some(comment.id) - } - }) - .collect::>(); - - let num_skipped = num_comments - comment_ids.len(); - statistics.increment_skipped(num_skipped); - - comments_to_delete.extend(comment_ids); - while comments_to_delete.len() >= DELETION_BATCH_SIZE { - let remainder = comments_to_delete.split_off(DELETION_BATCH_SIZE); - delete_batch(std::mem::replace(&mut comments_to_delete, remainder))?; - } - Ok(()) - })?; + while let Some(page_result) = comments.next().await { + let page = page_result.context("Operation to get comments failed")?; + let num_comments = page.len(); + let comment_ids = page + .into_iter() + .filter_map(|comment| { + if !include_annotated && comment.has_annotations { + None + } else { + Some(comment.id) + } + }) + .collect::>(); + + let num_skipped = num_comments - comment_ids.len(); + statistics.increment_skipped(num_skipped); + + comments_to_delete.extend(comment_ids); + while comments_to_delete.len() >= DELETION_BATCH_SIZE { + let remainder = comments_to_delete.split_off(DELETION_BATCH_SIZE); + let comment_ids = std::mem::replace(&mut comments_to_delete, remainder); + client + .delete_comments(&source, &comment_ids) + .await + .context("Operation to delete comments failed")?; + statistics.increment_deleted(comment_ids.len()); + } + } // Delete any comments left over in any potential last partial batch. if !comments_to_delete.is_empty() { assert!(comments_to_delete.len() < DELETION_BATCH_SIZE); - delete_batch(comments_to_delete)?; + client + .delete_comments(&source, &comments_to_delete) + .await + .context("Operation to delete comments failed")?; + statistics.increment_deleted(comments_to_delete.len()); } } log::info!( @@ -256,6 +262,7 @@ fn delete_comments_in_period( statistics.deleted(), statistics.skipped() ); + Ok(()) } diff --git a/cli/src/commands/get/buckets.rs b/cli/src/commands/get/buckets.rs index 25a37686..abe209ed 100644 --- a/cli/src/commands/get/buckets.rs +++ b/cli/src/commands/get/buckets.rs @@ -11,15 +11,17 @@ pub struct GetBucketsArgs { bucket: Option, } -pub fn get(client: &Client, args: &GetBucketsArgs, printer: &Printer) -> Result<()> { +pub async fn get(client: &Client, args: &GetBucketsArgs, printer: &Printer) -> Result<()> { let GetBucketsArgs { bucket } = args; let buckets = if let Some(bucket) = bucket { vec![client .get_bucket(bucket.clone()) + .await .context("Operation to list buckets has failed.")?] } else { let mut buckets = client .get_buckets() + .await .context("Operation to list buckets has failed.")?; buckets.sort_unstable_by(|lhs, rhs| { (&lhs.owner.0, &lhs.name.0).cmp(&(&rhs.owner.0, &rhs.name.0)) diff --git a/cli/src/commands/get/comments.rs b/cli/src/commands/get/comments.rs index c898ee1d..6ed3d672 100644 --- a/cli/src/commands/get/comments.rs +++ b/cli/src/commands/get/comments.rs @@ -1,6 +1,7 @@ use anyhow::{bail, Context, Result}; use chrono::{DateTime, Utc}; use colored::Colorize; +use futures::stream::{StreamExt, TryStreamExt}; use reinfer_client::{ AnnotatedComment, Client, CommentId, CommentsIterTimerange, DatasetFullName, DatasetIdentifier, Entities, HasAnnotations, LabelName, Labelling, ModelVersion, PredictedLabel, Source, @@ -76,7 +77,7 @@ pub struct GetManyCommentsArgs { path: Option, } -pub fn get_single(client: &Client, args: &GetSingleCommentArgs) -> Result<()> { +pub async fn get_single(client: &Client, args: &GetSingleCommentArgs) -> Result<()> { let GetSingleCommentArgs { source, comment_id, @@ -95,8 +96,9 @@ pub fn get_single(client: &Client, args: &GetSingleCommentArgs) -> Result<()> { let mut writer: Box = file.unwrap_or_else(|| Box::new(stdout.lock())); let source = client .get_source(source.to_owned()) + .await .context("Operation to get source has failed.")?; - let comment = client.get_comment(&source.full_name(), comment_id)?; + let comment = client.get_comment(&source.full_name(), comment_id).await?; print_resources_as_json( std::iter::once(AnnotatedComment { comment, @@ -109,7 +111,7 @@ pub fn get_single(client: &Client, args: &GetSingleCommentArgs) -> Result<()> { ) } -pub fn get_many(client: &Client, args: &GetManyCommentsArgs) -> Result<()> { +pub async fn get_many(client: &Client, args: &GetManyCommentsArgs) -> Result<()> { let GetManyCommentsArgs { source, dataset, @@ -162,7 +164,7 @@ pub fn get_many(client: &Client, args: &GetManyCommentsArgs) -> Result<()> { }; if let Some(file) = file { - download_comments(client, source.clone(), file, download_options) + download_comments(client, source.clone(), file, download_options).await } else { download_comments( client, @@ -170,6 +172,7 @@ pub fn get_many(client: &Client, args: &GetManyCommentsArgs) -> Result<()> { io::stdout().lock(), download_options, ) + .await } } @@ -183,7 +186,27 @@ struct CommentDownloadOptions { show_progress: bool, } -fn download_comments( +async fn make_progress( + client: &Client, + statistics: &Arc, + dataset_name: Option<&DatasetFullName>, +) -> Result { + Ok(get_comments_progress_bar( + if let Some(dataset_name) = dataset_name { + *client + .get_statistics(dataset_name) + .await + .context("Operation to get comment count has failed..")? + .num_comments as u64 + } else { + 0 + }, + statistics, + dataset_name.is_some(), + )) +} + +async fn download_comments( client: &Client, source_identifier: SourceIdentifier, mut writer: impl Write, @@ -191,30 +214,17 @@ fn download_comments( ) -> Result<()> { let source = client .get_source(source_identifier) + .await .context("Operation to get source has failed.")?; let statistics = Arc::new(Statistics::new()); - let make_progress = |dataset_name: Option<&DatasetFullName>| -> Result { - Ok(get_comments_progress_bar( - if let Some(dataset_name) = dataset_name { - *client - .get_statistics(dataset_name) - .context("Operation to get comment count has failed..")? - .num_comments as u64 - } else { - 0 - }, - &statistics, - dataset_name.is_some(), - )) - }; - if let Some(dataset_identifier) = options.dataset_identifier { let dataset = client .get_dataset(dataset_identifier) + .await .context("Operation to get dataset has failed.")?; let dataset_name = dataset.full_name(); let _progress = if options.show_progress { - Some(make_progress(Some(&dataset_name))?) + Some(make_progress(client, &statistics, Some(&dataset_name)).await?) } else { None }; @@ -227,7 +237,8 @@ fn download_comments( &statistics, options.include_predictions, writer, - )?; + ) + .await?; } else { get_comments_from_uids( client, @@ -238,20 +249,21 @@ fn download_comments( options.model_version, writer, options.timerange, - )?; + ) + .await?; } } else { let _progress = if options.show_progress { - Some(make_progress(None)?) + Some(make_progress(client, &statistics, None).await?) } else { None }; client - .get_comments_iter(&source.full_name(), None, options.timerange) + .get_comments(&source.full_name(), None, options.timerange) + .map(|page| page.context("Operation to get comments has failed.")) .try_for_each(|page| { - let page = page.context("Operation to get comments has failed.")?; statistics.add_comments(page.len()); - print_resources_as_json( + futures::future::ready(print_resources_as_json( page.into_iter().map(|comment| AnnotatedComment { comment, labelling: None, @@ -260,8 +272,9 @@ fn download_comments( moon_forms: None, }), &mut writer, - ) - })?; + )) + }) + .await?; } log::info!( "Successfully downloaded {} comments [{} annotated].", @@ -272,7 +285,7 @@ fn download_comments( } #[allow(clippy::too_many_arguments)] -fn get_comments_from_uids( +async fn get_comments_from_uids( client: &Client, dataset_name: DatasetFullName, source: Source, @@ -282,84 +295,42 @@ fn get_comments_from_uids( mut writer: impl Write, timerange: CommentsIterTimerange, ) -> Result<()> { - client - .get_comments_iter(&source.full_name(), None, timerange) - .try_for_each(|page| { - let page = page.context("Operation to get comments has failed.")?; - if page.is_empty() { - return Ok(()); - } + let source_full_name = &source.full_name(); + let comments = client.get_comments(source_full_name, None, timerange); + futures::pin_mut!(comments); + while let Some(page_result) = comments.next().await { + let page = page_result.context("Operation to get labellings has failed")?; + + if page.is_empty() { + return Ok(()); + } + + statistics.add_comments(page.len()); + let annotations = client + .get_labellings(&dataset_name, page.iter().map(|comment| &comment.uid)) + .await + .context("Operation to get annotations has failed")?; + + let comments = + page.into_iter() + .zip(annotations.into_iter()) + .map(|(comment, mut annotated)| { + if !include_predictions { + annotated = annotated.without_predictions(); + } + annotated.comment = comment; + if annotated.has_annotations() { + statistics.add_annotated(1); + } + annotated + }); + print_resources_as_json(comments, &mut writer)?; + } - statistics.add_comments(page.len()); - - if let Some(model_version) = &model_version { - let model_version = model_version - .parse::() - .context("Invalid model version")?; - let predictions = client - .get_comment_predictions( - &dataset_name, - &ModelVersion(model_version), - page.iter().map(|comment| &comment.uid), - ) - .context("Operation to get predictions has failed.")?; - // since predict-comments endpoint doesn't return some fields, - // they are set to None or [] here - let comments = - page.into_iter() - .zip(predictions.into_iter()) - .map(|(comment, prediction)| AnnotatedComment { - comment, - labelling: Some(vec![Labelling { - group: DEFAULT_LABEL_GROUP_NAME.clone(), - assigned: Vec::new(), - dismissed: Vec::new(), - predicted: prediction.labels.map(|auto_threshold_labels| { - auto_threshold_labels - .iter() - .map(|auto_threshold_label| PredictedLabel { - name: LabelName(auto_threshold_label.name.join(" > ")), - sentiment: None, - probability: auto_threshold_label.probability, - auto_thresholds: Some( - auto_threshold_label.auto_thresholds.to_vec(), - ), - }) - .collect() - }), - }]), - entities: Some(Entities { - assigned: Vec::new(), - dismissed: Vec::new(), - predicted: prediction.entities, - }), - thread_properties: None, - moon_forms: None, - }); - print_resources_as_json(comments, &mut writer) - } else { - let annotations = client - .get_labellings(&dataset_name, page.iter().map(|comment| &comment.uid)) - .context("Operation to get labellings has failed.")?; - let comments = page.into_iter().zip(annotations.into_iter()).map( - |(comment, mut annotated)| { - if !include_predictions { - annotated = annotated.without_predictions(); - } - annotated.comment = comment; - if annotated.has_annotations() { - statistics.add_annotated(1); - } - annotated - }, - ); - print_resources_as_json(comments, &mut writer) - } - })?; Ok(()) } -fn get_reviewed_comments_in_bulk( +async fn get_reviewed_comments_in_bulk( client: &Client, dataset_name: DatasetFullName, source: Source, @@ -367,21 +338,22 @@ fn get_reviewed_comments_in_bulk( include_predictions: bool, mut writer: impl Write, ) -> Result<()> { - client - .get_labellings_iter(&dataset_name, &source.id, include_predictions, None) - .try_for_each(|page| { - let page = page.context("Operation to get labellings has failed.")?; - statistics.add_comments(page.len()); - statistics.add_annotated(page.len()); - let comments = page.into_iter().map(|comment| { - if !include_predictions { - comment.without_predictions() - } else { - comment - } - }); - print_resources_as_json(comments, &mut writer) - })?; + let comments = client.get_labellings_iter(&dataset_name, &source.id, include_predictions, None); + futures::pin_mut!(comments); + while let Some(page_result) = comments.next().await { + let page = page_result.context("Operation to get a comment page has failed")?; + statistics.add_comments(page.len()); + statistics.add_annotated(page.len()); + + let comments = page.into_iter().map(|comment| { + if !include_predictions { + comment.without_predictions() + } else { + comment + } + }); + print_resources_as_json(comments, &mut writer)?; + } Ok(()) } diff --git a/cli/src/commands/get/datasets.rs b/cli/src/commands/get/datasets.rs index 13f5b0ef..a336e844 100644 --- a/cli/src/commands/get/datasets.rs +++ b/cli/src/commands/get/datasets.rs @@ -11,15 +11,17 @@ pub struct GetDatasetsArgs { dataset: Option, } -pub fn get(client: &Client, args: &GetDatasetsArgs, printer: &Printer) -> Result<()> { +pub async fn get(client: &Client, args: &GetDatasetsArgs, printer: &Printer) -> Result<()> { let GetDatasetsArgs { dataset } = args; let datasets = if let Some(dataset) = dataset { vec![client .get_dataset(dataset.clone()) + .await .context("Operation to list datasets has failed.")?] } else { let mut datasets = client .get_datasets() + .await .context("Operation to list datasets has failed.")?; datasets.sort_unstable_by(|lhs, rhs| { (&lhs.owner.0, &lhs.name.0).cmp(&(&rhs.owner.0, &rhs.name.0)) diff --git a/cli/src/commands/get/mod.rs b/cli/src/commands/get/mod.rs index 0297c957..7933566d 100644 --- a/cli/src/commands/get/mod.rs +++ b/cli/src/commands/get/mod.rs @@ -64,17 +64,17 @@ pub enum GetArgs { CurrentUser, } -pub fn run(args: &GetArgs, client: Client, printer: &Printer) -> Result<()> { +pub async fn run(args: &GetArgs, client: Client, printer: &Printer) -> Result<()> { match args { - GetArgs::Buckets(args) => buckets::get(&client, args, printer), - GetArgs::Comment(args) => comments::get_single(&client, args), - GetArgs::Comments(args) => comments::get_many(&client, args), - GetArgs::Datasets(args) => datasets::get(&client, args, printer), - GetArgs::Projects(args) => projects::get(&client, args, printer), - GetArgs::Sources(args) => sources::get(&client, args, printer), - GetArgs::Triggers(args) => triggers::get(&client, args, printer), - GetArgs::TriggerComments(args) => triggers::get_trigger_comments(&client, args), - GetArgs::Users(args) => users::get(&client, args, printer), - GetArgs::CurrentUser => users::get_current_user(&client, printer), + GetArgs::Buckets(args) => buckets::get(&client, args, printer).await, + GetArgs::Comment(args) => comments::get_single(&client, args).await, + GetArgs::Comments(args) => comments::get_many(&client, args).await, + GetArgs::Datasets(args) => datasets::get(&client, args, printer).await, + GetArgs::Projects(args) => projects::get(&client, args, printer).await, + GetArgs::Sources(args) => sources::get(&client, args, printer).await, + GetArgs::Triggers(args) => triggers::get(&client, args, printer).await, + GetArgs::TriggerComments(args) => triggers::get_trigger_comments(&client, args).await, + GetArgs::Users(args) => users::get(&client, args, printer).await, + GetArgs::CurrentUser => users::get_current_user(&client, printer).await, } } diff --git a/cli/src/commands/get/projects.rs b/cli/src/commands/get/projects.rs index b349b188..30aa691d 100644 --- a/cli/src/commands/get/projects.rs +++ b/cli/src/commands/get/projects.rs @@ -11,15 +11,17 @@ pub struct GetProjectsArgs { project: Option, } -pub fn get(client: &Client, args: &GetProjectsArgs, printer: &Printer) -> Result<()> { +pub async fn get(client: &Client, args: &GetProjectsArgs, printer: &Printer) -> Result<()> { let GetProjectsArgs { project } = args; let projects = if let Some(project) = project { vec![client .get_project(project) + .await .context("Operation to list projects has failed.")?] } else { let mut projects = client .get_projects() + .await .context("Operation to list projects has failed.")?; projects.sort_unstable_by(|lhs, rhs| lhs.name.0.cmp(&rhs.name.0)); projects diff --git a/cli/src/commands/get/sources.rs b/cli/src/commands/get/sources.rs index 973d34b1..e10a51a2 100644 --- a/cli/src/commands/get/sources.rs +++ b/cli/src/commands/get/sources.rs @@ -12,15 +12,17 @@ pub struct GetSourcesArgs { source: Option, } -pub fn get(client: &Client, args: &GetSourcesArgs, printer: &Printer) -> Result<()> { +pub async fn get(client: &Client, args: &GetSourcesArgs, printer: &Printer) -> Result<()> { let GetSourcesArgs { source } = args; let sources = if let Some(source) = source { vec![client .get_source(source.clone()) + .await .context("Operation to list sources has failed.")?] } else { let mut sources = client .get_sources() + .await .context("Operation to list sources has failed.")?; sources.sort_unstable_by(|lhs, rhs| { (&lhs.owner.0, &lhs.name.0).cmp(&(&rhs.owner.0, &rhs.name.0)) @@ -30,6 +32,7 @@ pub fn get(client: &Client, args: &GetSourcesArgs, printer: &Printer) -> Result< let buckets: HashMap<_, _> = client .get_buckets() + .await .context("Operation to list buckets has failed.")? .into_iter() .map(|bucket| (bucket.id.clone(), bucket)) diff --git a/cli/src/commands/get/triggers.rs b/cli/src/commands/get/triggers.rs index 4787f28c..bfae2436 100644 --- a/cli/src/commands/get/triggers.rs +++ b/cli/src/commands/get/triggers.rs @@ -31,20 +31,22 @@ pub struct GetTriggerCommentsArgs { individual_advance: bool, } -pub fn get(client: &Client, args: &GetTriggersArgs, printer: &Printer) -> Result<()> { +pub async fn get(client: &Client, args: &GetTriggersArgs, printer: &Printer) -> Result<()> { let GetTriggersArgs { dataset } = args; let dataset_name = client .get_dataset(dataset.clone()) + .await .context("Operation to get dataset has failed.")? .full_name(); let mut triggers = client .get_triggers(&dataset_name) + .await .context("Operation to list triggers has failed.")?; triggers.sort_unstable_by(|lhs, rhs| lhs.name.0.cmp(&rhs.name.0)); printer.print_resources(&triggers) } -pub fn get_trigger_comments(client: &Client, args: &GetTriggerCommentsArgs) -> Result<()> { +pub async fn get_trigger_comments(client: &Client, args: &GetTriggerCommentsArgs) -> Result<()> { let GetTriggerCommentsArgs { trigger, size, @@ -56,6 +58,7 @@ pub fn get_trigger_comments(client: &Client, args: &GetTriggerCommentsArgs) -> R Some(delay) => loop { let batch = client .fetch_trigger_comments(trigger, *size) + .await .context("Operation to fetch trigger comments failed.")?; if batch.results.is_empty() { if batch.filtered == 0 { @@ -63,6 +66,7 @@ pub fn get_trigger_comments(client: &Client, args: &GetTriggerCommentsArgs) -> R } else { client .advance_trigger(trigger, batch.sequence_id) + .await .context("Operation to advance trigger for batch failed.")?; } continue; @@ -75,18 +79,21 @@ pub fn get_trigger_comments(client: &Client, args: &GetTriggerCommentsArgs) -> R if *individual_advance { client .advance_trigger(trigger, result.sequence_id) + .await .context("Operation to advance trigger for comment failed.")?; } } if needs_final_advance { client .advance_trigger(trigger, batch.sequence_id) + .await .context("Operation to advance trigger for batch failed.")?; } }, None => { let batch = client .fetch_trigger_comments(trigger, *size) + .await .context("Operation to fetch trigger comments failed.")?; print_resources_as_json(Some(&batch), io::stdout().lock()) } diff --git a/cli/src/commands/get/users.rs b/cli/src/commands/get/users.rs index 0db45cfa..220d4acc 100644 --- a/cli/src/commands/get/users.rs +++ b/cli/src/commands/get/users.rs @@ -11,27 +11,30 @@ pub struct GetUsersArgs { user: Option, } -pub fn get(client: &Client, args: &GetUsersArgs, printer: &Printer) -> Result<()> { +pub async fn get(client: &Client, args: &GetUsersArgs, printer: &Printer) -> Result<()> { let GetUsersArgs { user } = args; match user { Some(user_id) => { let user = client .get_user(user_id.clone()) + .await .context("Operation to get user has failed.")?; printer.print_resources(&[user]) } None => { let users = client .get_users() + .await .context("Operation to list users has failed.")?; printer.print_resources(&users) } } } -pub fn get_current_user(client: &Client, printer: &Printer) -> Result<()> { +pub async fn get_current_user(client: &Client, printer: &Printer) -> Result<()> { let user = client .get_current_user() + .await .context("Operation to get the current user has failed.")?; printer.print_resources(&[user]) } diff --git a/cli/src/commands/update/dataset.rs b/cli/src/commands/update/dataset.rs index fc25d5ca..43e8b66d 100644 --- a/cli/src/commands/update/dataset.rs +++ b/cli/src/commands/update/dataset.rs @@ -1,11 +1,11 @@ use crate::printer::Printer; use anyhow::{Context, Result}; use log::info; -use reinfer_client::{Client, DatasetIdentifier, SourceId, SourceIdentifier, UpdateDataset}; +use reinfer_client::{Client, DatasetIdentifier, SourceIdentifier, UpdateDataset}; use structopt::StructOpt; /// Update a dataset. -#[derive(Debug, StructOpt)] +#[derive(Clone, Debug, StructOpt)] pub struct UpdateDatasetArgs { #[structopt(name = "dataset")] /// Name or id of the dataset to delete @@ -24,29 +24,30 @@ pub struct UpdateDatasetArgs { sources: Option>, } -pub fn update(client: &Client, args: &UpdateDatasetArgs, printer: &Printer) -> Result<()> { +pub async fn update(client: &Client, args: UpdateDatasetArgs, printer: &Printer) -> Result<()> { let UpdateDatasetArgs { dataset, title, description, - sources, + mut sources, } = args; - let source_ids = sources - .as_ref() - .map::>, _>(|sources| { - sources - .iter() - .map(|source| Ok(client.get_source(source.clone())?.id)) - .collect() - }) - .transpose() - .context("Operation to get sources failed")?; + let source_ids = match sources.take() { + Some(sources) => Some( + futures::future::try_join_all(sources.into_iter().map(|identifier| async { + let source_id: Result<_> = Ok(client.get_source(identifier).await?.id); + source_id + })) + .await?, + ), + None => None, + }; let dataset_full_name = match dataset { DatasetIdentifier::FullName(name) => name.to_owned(), dataset @ DatasetIdentifier::Id(_) => client .get_dataset(dataset.to_owned()) + .await .context("Fetching dataset id.")? .full_name(), }; @@ -60,6 +61,7 @@ pub fn update(client: &Client, args: &UpdateDatasetArgs, printer: &Printer) -> R description: description.as_deref(), }, ) + .await .context("Operation to update a dataset has failed.")?; info!( "Dataset `{}` [id: {}] updated successfully", diff --git a/cli/src/commands/update/mod.rs b/cli/src/commands/update/mod.rs index 2784118a..baaefde3 100644 --- a/cli/src/commands/update/mod.rs +++ b/cli/src/commands/update/mod.rs @@ -12,7 +12,7 @@ use anyhow::Result; use reinfer_client::Client; use structopt::StructOpt; -#[derive(Debug, StructOpt)] +#[derive(Clone, Debug, StructOpt)] pub enum UpdateArgs { #[structopt(name = "source")] /// Update an existing source @@ -31,11 +31,11 @@ pub enum UpdateArgs { Users(UpdateUsersArgs), } -pub fn run(update_args: &UpdateArgs, client: Client, printer: &Printer) -> Result<()> { +pub async fn run(update_args: UpdateArgs, client: Client, printer: &Printer) -> Result<()> { match update_args { - UpdateArgs::Source(source_args) => source::update(&client, source_args, printer), - UpdateArgs::Dataset(dataset_args) => dataset::update(&client, dataset_args, printer), - UpdateArgs::Project(project_args) => project::update(&client, project_args, printer), - UpdateArgs::Users(users_args) => users::update(&client, users_args), + UpdateArgs::Source(source_args) => source::update(&client, source_args, printer).await, + UpdateArgs::Dataset(dataset_args) => dataset::update(&client, dataset_args, printer).await, + UpdateArgs::Project(project_args) => project::update(&client, project_args, printer).await, + UpdateArgs::Users(users_args) => users::update(&client, &users_args).await, } } diff --git a/cli/src/commands/update/project.rs b/cli/src/commands/update/project.rs index 5688f3b4..1966b988 100644 --- a/cli/src/commands/update/project.rs +++ b/cli/src/commands/update/project.rs @@ -4,7 +4,7 @@ use log::info; use reinfer_client::{Client, ProjectName, UpdateProject}; use structopt::StructOpt; -#[derive(Debug, StructOpt)] +#[derive(Clone, Debug, StructOpt)] pub struct UpdateProjectArgs { #[structopt(name = "project-name")] /// Full name of the project @@ -19,7 +19,7 @@ pub struct UpdateProjectArgs { description: Option, } -pub fn update(client: &Client, args: &UpdateProjectArgs, printer: &Printer) -> Result<()> { +pub async fn update(client: &Client, args: UpdateProjectArgs, printer: &Printer) -> Result<()> { let UpdateProjectArgs { name, title, @@ -28,12 +28,13 @@ pub fn update(client: &Client, args: &UpdateProjectArgs, printer: &Printer) -> R let project = client .update_project( - name, + &name, UpdateProject { title: title.as_deref(), description: description.as_deref(), }, ) + .await .context("Operation to update a project has failed")?; info!("Project `{}` updated successfully", project.name.0,); printer.print_resources(&[project])?; diff --git a/cli/src/commands/update/source.rs b/cli/src/commands/update/source.rs index 7d8459dc..0ea56826 100644 --- a/cli/src/commands/update/source.rs +++ b/cli/src/commands/update/source.rs @@ -4,7 +4,7 @@ use log::info; use reinfer_client::{BucketIdentifier, Client, SourceIdentifier, TransformTag, UpdateSource}; use structopt::StructOpt; -#[derive(Debug, StructOpt)] +#[derive(Clone, Debug, StructOpt)] pub struct UpdateSourceArgs { #[structopt(name = "source")] /// Id or full name of the source to update @@ -31,7 +31,7 @@ pub struct UpdateSourceArgs { transform_tag: Option, } -pub fn update(client: &Client, args: &UpdateSourceArgs, printer: &Printer) -> Result<()> { +pub async fn update(client: &Client, args: UpdateSourceArgs, printer: &Printer) -> Result<()> { let UpdateSourceArgs { source, title, @@ -46,6 +46,7 @@ pub fn update(client: &Client, args: &UpdateSourceArgs, printer: &Printer) -> Re Some(full_name @ BucketIdentifier::FullName(_)) => Some( client .get_bucket(full_name) + .await .context("Fetching bucket for id.")? .id, ), @@ -56,6 +57,7 @@ pub fn update(client: &Client, args: &UpdateSourceArgs, printer: &Printer) -> Re SourceIdentifier::FullName(name) => name, source @ SourceIdentifier::Id(_) => client .get_source(source) + .await .context("Fetching source id.")? .full_name(), }; @@ -66,12 +68,13 @@ pub fn update(client: &Client, args: &UpdateSourceArgs, printer: &Printer) -> Re UpdateSource { title: title.as_deref(), description: description.as_deref(), - should_translate: *should_translate, + should_translate, bucket_id, sensitive_properties: None, transform_tag: transform_tag.as_ref(), }, ) + .await .context("Operation to update a source has failed")?; info!( "Source `{}` [id: {}] updated successfully", diff --git a/cli/src/commands/update/users.rs b/cli/src/commands/update/users.rs index 77d8f2dc..afc58baf 100644 --- a/cli/src/commands/update/users.rs +++ b/cli/src/commands/update/users.rs @@ -15,7 +15,7 @@ use structopt::StructOpt; use crate::progress::{Options as ProgressOptions, Progress}; -#[derive(Debug, StructOpt)] +#[derive(Clone, Debug, StructOpt)] pub struct UpdateUsersArgs { #[structopt(short = "f", long = "file", parse(from_os_str))] /// Path to JSON file with users. If not specified, stdin will be used. @@ -26,7 +26,7 @@ pub struct UpdateUsersArgs { no_progress: bool, } -pub fn update(client: &Client, args: &UpdateUsersArgs) -> Result<()> { +pub async fn update(client: &Client, args: &UpdateUsersArgs) -> Result<()> { let statistics = match &args.input_file { Some(input_file) => { info!("Processing users from file `{}`", input_file.display(),); @@ -43,7 +43,7 @@ pub fn update(client: &Client, args: &UpdateUsersArgs) -> Result<()> { } else { Some(progress_bar(file_metadata.len(), &statistics)) }; - update_users_from_reader(client, file, &statistics)?; + update_users_from_reader(client, file, &statistics).await?; if let Some(mut progress) = progress { progress.done(); } @@ -52,7 +52,7 @@ pub fn update(client: &Client, args: &UpdateUsersArgs) -> Result<()> { None => { info!("Processing users from stdin",); let statistics = Statistics::new(); - update_users_from_reader(client, BufReader::new(io::stdin()), &statistics)?; + update_users_from_reader(client, BufReader::new(io::stdin()), &statistics).await?; statistics } }; @@ -74,7 +74,7 @@ struct UserLine { update: UpdateUser, } -fn update_users_from_reader( +async fn update_users_from_reader( client: &Client, mut users: impl BufRead, statistics: &Statistics, @@ -103,6 +103,7 @@ fn update_users_from_reader( // Upload users client .post_user(&user_line.id, user_line.update) + .await .context("Could not update user")?; statistics.add_user(); diff --git a/cli/src/main.rs b/cli/src/main.rs index d2e9e5c9..19e59aa3 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -14,6 +14,7 @@ use reinfer_client::{ }; use std::{fs, io, path::PathBuf, process}; use structopt::{clap::Shell as ClapShell, StructOpt}; +use tokio::runtime::Builder as RuntimeBuilder; use crate::{ args::{Args, Command, Shell}, @@ -22,7 +23,7 @@ use crate::{ printer::Printer, }; -fn run(args: Args) -> Result<()> { +async fn run(args: Args) -> Result<()> { let config_path = find_configuration(&args)?; let config = config::read_reinfer_config(&config_path)?; let printer = Printer::new(args.output); @@ -41,16 +42,21 @@ fn run(args: Args) -> Result<()> { Ok(()) } Command::Get { get_args } => { - get::run(get_args, client_from_args(&args, &config)?, &printer) + get::run(get_args, client_from_args(&args, &config)?, &printer).await } Command::Delete { delete_args } => { - delete::run(delete_args, client_from_args(&args, &config)?) + delete::run(delete_args, client_from_args(&args, &config)?).await } Command::Create { create_args } => { - create::run(create_args, client_from_args(&args, &config)?, &printer) + create::run(create_args, client_from_args(&args, &config)?, &printer).await } Command::Update { update_args } => { - update::run(update_args, client_from_args(&args, &config)?, &printer) + update::run( + update_args.clone(), + client_from_args(&args, &config)?, + &printer, + ) + .await } } } @@ -148,7 +154,12 @@ fn main() { let args = Args::from_args(); utils::init_env_logger(args.verbose); - if let Err(error) = run(args) { + if let Err(error) = RuntimeBuilder::new_current_thread() + .enable_all() + .build() + .context("Could not create tokio runtime") + .and_then(|runtime| runtime.block_on(run(args))) + { error!("An error occurred:"); for cause in error.chain() { error!(" |- {}", cause);