From 55d4327f6e0333011945740920461bbba18fb1e7 Mon Sep 17 00:00:00 2001 From: Wenhai Zhu Date: Fri, 15 Jul 2022 11:42:30 -0700 Subject: [PATCH 1/3] Expose GCP reader Summary: Expose GCP reader Differential Revision: D37727313 fbshipit-source-id: df0676f78c8d42535b3dff24fa4672644c124b90 --- fbpcf/gcp/GCSUtil.cpp | 4 ++++ fbpcf/gcp/GCSUtil.h | 3 +-- fbpcf/io/cloud_util/CloudFileUtil.cpp | 6 +++++- fbpcf/io/cloud_util/GCSFileReader.cpp | 15 ++++++++------- fbpcf/io/cloud_util/GCSFileReader.h | 5 ++--- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/fbpcf/gcp/GCSUtil.cpp b/fbpcf/gcp/GCSUtil.cpp index 6f17306d..ee67b21c 100644 --- a/fbpcf/gcp/GCSUtil.cpp +++ b/fbpcf/gcp/GCSUtil.cpp @@ -58,4 +58,8 @@ GCSObjectReference uriToObjectReference(std::string url) { return GCSObjectReference{bucket, path.substr(pos + 1)}; } +std::unique_ptr createGCSClient() { + return std::make_unique(); +} + } // namespace fbpcf::gcp diff --git a/fbpcf/gcp/GCSUtil.h b/fbpcf/gcp/GCSUtil.h index 69839b41..6820c30c 100644 --- a/fbpcf/gcp/GCSUtil.h +++ b/fbpcf/gcp/GCSUtil.h @@ -22,6 +22,5 @@ struct GCSObjectReference { }; GCSObjectReference uriToObjectReference(std::string url); -std::unique_ptr createGCSClient( - const GCSClientOption& option); +std::unique_ptr createGCSClient(); } // namespace fbpcf::gcp diff --git a/fbpcf/io/cloud_util/CloudFileUtil.cpp b/fbpcf/io/cloud_util/CloudFileUtil.cpp index 2f39cba9..ebe58daa 100644 --- a/fbpcf/io/cloud_util/CloudFileUtil.cpp +++ b/fbpcf/io/cloud_util/CloudFileUtil.cpp @@ -9,6 +9,8 @@ #include #include "fbpcf/aws/S3Util.h" #include "fbpcf/exception/PcfException.h" +#include "fbpcf/gcp/GCSUtil.h" +#include "fbpcf/io/cloud_util/GCSFileReader.h" #include "fbpcf/io/cloud_util/S3Client.h" #include "fbpcf/io/cloud_util/S3FileReader.h" #include "fbpcf/io/cloud_util/S3FileUploader.h" @@ -58,8 +60,10 @@ std::unique_ptr getCloudFileReader(const std::string& filePath) { fbpcf::cloudio::S3Client::getInstance( fbpcf::aws::S3ClientOption{.region = 
ref.region}) .getS3Client()); + } else if (fileType == CloudFileType::GCS) { + return std::make_unique(fbpcf::gcp::createGCSClient()); } else { - return nullptr; + throw fbpcf::PcfException("Not supported yet."); } } diff --git a/fbpcf/io/cloud_util/GCSFileReader.cpp b/fbpcf/io/cloud_util/GCSFileReader.cpp index e88cc377..b18d7229 100644 --- a/fbpcf/io/cloud_util/GCSFileReader.cpp +++ b/fbpcf/io/cloud_util/GCSFileReader.cpp @@ -12,8 +12,8 @@ namespace fbpcf::cloudio { -template -std::string GCSFileReader::readBytes( +// template +std::string GCSFileReader::readBytes( const std::string& filePath, std::size_t start, std::size_t end) { @@ -29,15 +29,16 @@ std::string GCSFileReader::readBytes( return ss.str(); } -template -size_t GCSFileReader::getFileContentLength( - const std::string& filePath) { +// template +size_t GCSFileReader::getFileContentLength(const std::string& filePath) { const auto& ref = fbpcf::gcp::uriToObjectReference(filePath); auto outcome = GCSClient_->GetObjectMetadata(ref.bucket, ref.key); if (!outcome) { - throw GcpException{"Error getting object metadata for object " + ref.key}; + throw GcpException{ + "Error getting object metadata for object " + ref.key + + " Reason: " + outcome.status().message()}; } - return outcome.size(); + return outcome->size(); } } // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSFileReader.h b/fbpcf/io/cloud_util/GCSFileReader.h index fa77e639..441ca81b 100644 --- a/fbpcf/io/cloud_util/GCSFileReader.h +++ b/fbpcf/io/cloud_util/GCSFileReader.h @@ -14,10 +14,9 @@ #include "fbpcf/io/cloud_util/IFileReader.h" namespace fbpcf::cloudio { -template class GCSFileReader : public IFileReader { public: - explicit GCSFileReader(std::shared_ptr client) + explicit GCSFileReader(std::shared_ptr client) : GCSClient_{std::move(client)} {} std::string readBytes( @@ -28,7 +27,7 @@ class GCSFileReader : public IFileReader { size_t getFileContentLength(const std::string& filePath) override; private: - std::shared_ptr 
GCSClient_; + std::shared_ptr GCSClient_; }; } // namespace fbpcf::cloudio From b19781ba2360b6434ee2778666f88ecff1418d3a Mon Sep 17 00:00:00 2001 From: Achyut Agarwal Date: Fri, 15 Jul 2022 11:42:30 -0700 Subject: [PATCH 2/3] Support GCP Uploader (#274) Summary: Pull Request resolved: https://github.com/facebookresearch/fbpcf/pull/274 Implement resumable uploads for GCS, because multipart upload through a C++ library is currently not available on GCP. The earlier implementation, which used AWS S3 multipart upload, is changed to use GCP's resumable upload instead. Differential Revision: D37804966 fbshipit-source-id: 792447aee94939dfd040518f546758214f76c325 --- fbpcf/io/cloud_util/CloudFileUtil.cpp | 25 +++++++++----- fbpcf/io/cloud_util/GCSFileUploader.cpp | 45 +++++++++++++++++++++++++ fbpcf/io/cloud_util/GCSFileUploader.h | 35 +++++++++++++++++++ 3 files changed, 96 insertions(+), 9 deletions(-) create mode 100644 fbpcf/io/cloud_util/GCSFileUploader.cpp create mode 100644 fbpcf/io/cloud_util/GCSFileUploader.h diff --git a/fbpcf/io/cloud_util/CloudFileUtil.cpp b/fbpcf/io/cloud_util/CloudFileUtil.cpp index ebe58daa..8097c751 100644 --- a/fbpcf/io/cloud_util/CloudFileUtil.cpp +++ b/fbpcf/io/cloud_util/CloudFileUtil.cpp @@ -6,11 +6,13 @@ */ #include "fbpcf/io/cloud_util/CloudFileUtil.h" +#include #include #include "fbpcf/aws/S3Util.h" #include "fbpcf/exception/PcfException.h" #include "fbpcf/gcp/GCSUtil.h" #include "fbpcf/io/cloud_util/GCSFileReader.h" +#include "fbpcf/io/cloud_util/GCSFileUploader.h" #include "fbpcf/io/cloud_util/S3Client.h" #include "fbpcf/io/cloud_util/S3FileReader.h" #include "fbpcf/io/cloud_util/S3FileUploader.h" @@ -18,15 +20,17 @@ namespace fbpcf::cloudio { CloudFileType getCloudFileType(const std::string& filePath) { - // S3 file format: - // 1. https://bucket-name.s3.region.amazonaws.com/key-name - // 2. https://bucket-name.s3-region.amazonaws.com/key-name - // 3. s3://bucket-name/key-name - // GCS file format: - // 1. 
https://storage.cloud.google.com/bucket-name/key-name - // 2. https://bucket-name.storage.googleapis.com/key-name - // 3. https://storage.googleapis.com/bucket-name/key-name - // 4. gs://bucket-name/key-name + /* + * S3 file format: + * 1. https://bucket-name.s3.region.amazonaws.com/key-name + * 2. https://bucket-name.s3-region.amazonaws.com/key-name + * 3. s3://bucket-name/key-name + * GCS file format: + * 1. https://storage.cloud.google.com/bucket-name/key-name + * 2. https://bucket-name.storage.googleapis.com/key-name + * 3. https://storage.googleapis.com/bucket-name/key-name + * 4. gs://bucket-name/key-name + */ static const re2::RE2 s3Regex1( "https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+"); static const re2::RE2 s3Regex2( @@ -77,6 +81,9 @@ std::unique_ptr getCloudFileUploader( fbpcf::aws::S3ClientOption{.region = ref.region}) .getS3Client(), filePath); + } else if (fileType == CloudFileType::GCS) { + return std::make_unique( + fbpcf::gcp::createGCSClient(), filePath); } else { throw fbpcf::PcfException("Not supported yet."); } diff --git a/fbpcf/io/cloud_util/GCSFileUploader.cpp b/fbpcf/io/cloud_util/GCSFileUploader.cpp new file mode 100644 index 00000000..cb1ab287 --- /dev/null +++ b/fbpcf/io/cloud_util/GCSFileUploader.cpp @@ -0,0 +1,45 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include "fbpcf/exception/GcpException.h" +#include "fbpcf/gcp/GCSUtil.h" +#include "fbpcf/io/cloud_util/GCSFileUploader.h" + +namespace fbpcf::cloudio { +static const std::string FILE_TYPE = "text/csv"; + +void GCSFileUploader::init() {} + +int32_t GCSFileUploader::upload(std::vector& buf) { + XLOG(INFO) << "Start resumable upload. 
"; + const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_); + std::string bucket_ = ref.bucket; + std::string object_ = ref.key; + + namespace gcs = ::google::cloud::storage; + using ::google::cloud::StatusOr; + std::string str(buf.begin(), buf.end()); + + StatusOr object_metadata = gcsClient_->InsertObject( + bucket_, object_, str, gcs::ContentType(FILE_TYPE)); + + if (!object_metadata) { + throw GcpException{ + "Resumable upload failed: " + object_metadata.status().message()}; + return 0; + } + XLOG(INFO) << " Resumable upload successful "; + XLOG(INFO) << "Bucket: " << bucket_ << ", Object Name: " << object_; + return str.size(); +} + +int GCSFileUploader::complete() { + return 0; +} +} // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSFileUploader.h b/fbpcf/io/cloud_util/GCSFileUploader.h new file mode 100644 index 00000000..c099ec00 --- /dev/null +++ b/fbpcf/io/cloud_util/GCSFileUploader.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#pragma once + +#include +#include + +#include +#include "fbpcf/io/cloud_util/IFileUploader.h" + +namespace fbpcf::cloudio { +class GCSFileUploader : public IFileUploader { + public: + explicit GCSFileUploader( + std::shared_ptr gcsClient, + const std::string& filePath) + : gcsClient_{std::move(gcsClient)}, filePath_{filePath} { + init(); + } + int upload(std::vector& buf) override; + int complete() override; + + private: + void init() override; + + std::shared_ptr gcsClient_; + const std::string filePath_; +}; + +} // namespace fbpcf::cloudio From 3f468d717f51e1d7655d8c846de17156d52b30d0 Mon Sep 17 00:00:00 2001 From: Achyut Agarwal Date: Fri, 15 Jul 2022 11:42:54 -0700 Subject: [PATCH 3/3] Making GCS Client Singleton (#280) Summary: Pull Request resolved: https://github.com/facebookresearch/fbpcf/pull/280 Making the GCS Client a singleton Differential Revision: D37853909 fbshipit-source-id: 49293778ec8ef6da9ede5dff5f8561fcda31fbdc --- fbpcf/io/cloud_util/CloudFileUtil.cpp | 13 +++++++--- fbpcf/io/cloud_util/GCSClient.cpp | 17 ++++++++++++++ fbpcf/io/cloud_util/GCSClient.h | 34 +++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 fbpcf/io/cloud_util/GCSClient.cpp create mode 100644 fbpcf/io/cloud_util/GCSClient.h diff --git a/fbpcf/io/cloud_util/CloudFileUtil.cpp b/fbpcf/io/cloud_util/CloudFileUtil.cpp index 8097c751..15863dcd 100644 --- a/fbpcf/io/cloud_util/CloudFileUtil.cpp +++ b/fbpcf/io/cloud_util/CloudFileUtil.cpp @@ -5,12 +5,15 @@ * LICENSE file in the root directory of this source tree. 
*/ -#include "fbpcf/io/cloud_util/CloudFileUtil.h" #include +#include #include + #include "fbpcf/aws/S3Util.h" #include "fbpcf/exception/PcfException.h" #include "fbpcf/gcp/GCSUtil.h" +#include "fbpcf/io/cloud_util/CloudFileUtil.h" +#include "fbpcf/io/cloud_util/GCSClient.h" #include "fbpcf/io/cloud_util/GCSFileReader.h" #include "fbpcf/io/cloud_util/GCSFileUploader.h" #include "fbpcf/io/cloud_util/S3Client.h" @@ -65,7 +68,9 @@ std::unique_ptr getCloudFileReader(const std::string& filePath) { fbpcf::aws::S3ClientOption{.region = ref.region}) .getS3Client()); } else if (fileType == CloudFileType::GCS) { - return std::make_unique(fbpcf::gcp::createGCSClient()); + return std::make_unique( + fbpcf::cloudio::GCSClient::getInstance(fbpcf::gcp::GCSClientOption{}) + .getGCSClient()); } else { throw fbpcf::PcfException("Not supported yet."); } @@ -83,7 +88,9 @@ std::unique_ptr getCloudFileUploader( filePath); } else if (fileType == CloudFileType::GCS) { return std::make_unique( - fbpcf::gcp::createGCSClient(), filePath); + fbpcf::cloudio::GCSClient::getInstance(fbpcf::gcp::GCSClientOption{}) + .getGCSClient(), + filePath); } else { throw fbpcf::PcfException("Not supported yet."); } diff --git a/fbpcf/io/cloud_util/GCSClient.cpp b/fbpcf/io/cloud_util/GCSClient.cpp new file mode 100644 index 00000000..06829fc0 --- /dev/null +++ b/fbpcf/io/cloud_util/GCSClient.cpp @@ -0,0 +1,17 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include + +#include "fbpcf/io/cloud_util/GCSClient.h" + +namespace fbpcf::cloudio { +GCSClient& GCSClient::getInstance(const fbpcf::gcp::GCSClientOption& option) { + static GCSClient GCSClient(option); + return GCSClient; +} +} // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSClient.h b/fbpcf/io/cloud_util/GCSClient.h new file mode 100644 index 00000000..049ff171 --- /dev/null +++ b/fbpcf/io/cloud_util/GCSClient.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include + +#include +#include "fbpcf/gcp/GCSUtil.h" + +namespace fbpcf::cloudio { + +class GCSClient { + private: + explicit GCSClient(const fbpcf::gcp::GCSClientOption& option) { + GCSClient_ = fbpcf::gcp::createGCSClient(); + } + + public: + static GCSClient& getInstance(const fbpcf::gcp::GCSClientOption& option); + + std::shared_ptr getGCSClient() { + return GCSClient_; + } + + private: + std::shared_ptr GCSClient_; +}; + +} // namespace fbpcf::cloudio