diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index df9b783d531..2394159435e 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -1000,7 +1000,8 @@ if(ARROW_JSON) json/object_parser.cc json/object_writer.cc json/parser.cc - json/reader.cc) + json/reader.cc + json/writer.cc) foreach(ARROW_JSON_TARGET ${ARROW_JSON_TARGETS}) target_link_libraries(${ARROW_JSON_TARGET} PRIVATE RapidJSON) endforeach() diff --git a/cpp/src/arrow/json/CMakeLists.txt b/cpp/src/arrow/json/CMakeLists.txt index fa7d0607848..bd731cd212f 100644 --- a/cpp/src/arrow/json/CMakeLists.txt +++ b/cpp/src/arrow/json/CMakeLists.txt @@ -23,6 +23,7 @@ add_arrow_test(test from_string_test.cc parser_test.cc reader_test.cc + writer_test.cc PREFIX "arrow-json" EXTRA_LINK_LIBS @@ -33,6 +34,12 @@ add_arrow_benchmark(parser_benchmark "arrow-json" EXTRA_LINK_LIBS RapidJSON) + +add_arrow_benchmark(writer_benchmark + PREFIX + "arrow-json" + EXTRA_LINK_LIBS + RapidJSON) arrow_install_all_headers("arrow/json") # pkg-config support diff --git a/cpp/src/arrow/json/meson.build b/cpp/src/arrow/json/meson.build index bc1567df9ec..8c83323a99f 100644 --- a/cpp/src/arrow/json/meson.build +++ b/cpp/src/arrow/json/meson.build @@ -24,6 +24,7 @@ exc = executable( 'from_string_test.cc', 'parser_test.cc', 'reader_test.cc', + 'writer_test.cc', ], dependencies: [arrow_test_dep, rapidjson_dep], ) @@ -36,6 +37,13 @@ exc = executable( ) benchmark('arrow-json-parser-benchmark', exc) +exc = executable( + 'arrow-json-writer-benchmark', + sources: ['writer_benchmark.cc'], + dependencies: [arrow_benchmark_dep, rapidjson_dep], +) +benchmark('arrow-json-writer-benchmark', exc) + install_headers( [ 'api.h', @@ -51,6 +59,7 @@ install_headers( 'reader.h', 'test_common.h', 'type_fwd.h', + 'writer.h', ], subdir: 'arrow/json', ) diff --git a/cpp/src/arrow/json/options.cc b/cpp/src/arrow/json/options.cc index dc5e628b1f3..27a9e935620 100644 --- a/cpp/src/arrow/json/options.cc +++ 
b/cpp/src/arrow/json/options.cc @@ -24,5 +24,14 @@ ParseOptions ParseOptions::Defaults() { return ParseOptions(); } ReadOptions ReadOptions::Defaults() { return ReadOptions(); } +WriteOptions WriteOptions::Defaults() { return WriteOptions(); } + +Status WriteOptions::Validate() const { + if (ARROW_PREDICT_FALSE(batch_size < 1)) { + return Status::Invalid("WriteOptions: batch_size must be at least 1: ", batch_size); + } + return Status::OK(); +} + } // namespace json } // namespace arrow diff --git a/cpp/src/arrow/json/options.h b/cpp/src/arrow/json/options.h index d7edab9cedd..9192318b0a0 100644 --- a/cpp/src/arrow/json/options.h +++ b/cpp/src/arrow/json/options.h @@ -19,8 +19,10 @@ #include #include +#include #include "arrow/json/type_fwd.h" +#include "arrow/status.h" #include "arrow/util/visibility.h" namespace arrow { @@ -70,5 +72,25 @@ struct ARROW_EXPORT ReadOptions { static ReadOptions Defaults(); }; +struct ARROW_EXPORT WriteOptions { + /// \brief Maximum number of rows processed at a time + /// + /// The JSON writer converts and writes data in batches of N rows. + /// This number can impact performance. + int32_t batch_size = 1024; + + /// \brief Whether to emit null values in the JSON output + /// + /// If true, null values are included as JSON null. + /// If false, null values are omitted from the output entirely. + bool emit_null = false; + + /// Create write options with default values + static WriteOptions Defaults(); + + /// \brief Test that all set options are valid + Status Validate() const; +}; + } // namespace json } // namespace arrow diff --git a/cpp/src/arrow/json/writer.cc b/cpp/src/arrow/json/writer.cc new file mode 100644 index 00000000000..fce64a1f8f4 --- /dev/null +++ b/cpp/src/arrow/json/writer.cc @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/json/writer.h" + +#include "arrow/array.h" +#include "arrow/io/interfaces.h" +#include "arrow/ipc/writer.h" +#include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/iterator.h" +#include "arrow/visit_array_inline.h" + +#include +#include +#include + +#include +#include +#include + +namespace arrow { +namespace json { + +namespace rj = arrow::rapidjson; + +namespace { + +// \brief Builder that holds state for a single conversion. +// +// Implements Visit() methods for each type of Arrow Array that set the values +// of the corresponding fields in each row. +class RowBatchBuilder { + public: + explicit RowBatchBuilder(int64_t num_rows, bool emit_null = true) + : field_(nullptr), emit_null_(emit_null) { + // Reserve all of the space required up-front to avoid unnecessary resizing + rows_.reserve(num_rows); + + for (int64_t i = 0; i < num_rows; ++i) { + rows_.push_back(rj::Document()); + rows_[i].SetObject(); + } + } + + /// \brief Set which field to convert. + void SetField(const arrow::Field* field) { field_ = field; } + + /// \brief Retrieve converted rows from builder. 
+ std::vector Rows() && { return std::move(rows_); } + + // Default implementation + arrow::Status Visit(const arrow::Array& array) { + return arrow::Status::NotImplemented( + "Cannot convert to json document for array of type ", array.type()->ToString()); + } + + // Handles booleans, integers, floats, temporal types + // (excludes interval types with struct C types) + template + arrow::enable_if_t::value && + !arrow::is_interval_type::value, + arrow::Status> + Visit(const ArrayType& array) { + assert(static_cast(rows_.size()) == array.length()); + for (int64_t i = 0; i < array.length(); ++i) { + if (array.IsNull(i) && !emit_null_) { + continue; + } + rj::Value str_key(field_->name(), rows_[i].GetAllocator()); + if (array.IsNull(i)) { + rows_[i].AddMember(str_key, rj::Value(), rows_[i].GetAllocator()); + } else { + rows_[i].AddMember(str_key, array.Value(i), rows_[i].GetAllocator()); + } + } + return arrow::Status::OK(); + } + + // Handle string types + template + arrow::enable_if_string_like Visit(const ArrayType& array) { + assert(static_cast(rows_.size()) == array.length()); + for (int64_t i = 0; i < array.length(); ++i) { + if (array.IsNull(i) && !emit_null_) { + continue; + } + rj::Value str_key(field_->name(), rows_[i].GetAllocator()); + if (array.IsNull(i)) { + rows_[i].AddMember(str_key, rj::Value(), rows_[i].GetAllocator()); + } else { + std::string_view value_view = array.Value(i); + rj::Value value; + value.SetString(value_view.data(), static_cast(value_view.size()), + rows_[i].GetAllocator()); + rows_[i].AddMember(str_key, value, rows_[i].GetAllocator()); + } + } + return arrow::Status::OK(); + } + + // Handle struct + arrow::Status Visit(const arrow::StructArray& array) { + const arrow::StructType* type = array.struct_type(); + + assert(static_cast(rows_.size()) == array.length()); + + RowBatchBuilder child_builder(rows_.size(), emit_null_); + for (int i = 0; i < type->num_fields(); ++i) { + const arrow::Field* child_field = type->field(i).get(); + 
child_builder.SetField(child_field); + ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*array.field(i).get(), &child_builder)); + } + std::vector rows = std::move(child_builder).Rows(); + + for (int64_t i = 0; i < array.length(); ++i) { + if (array.IsNull(i) && !emit_null_) { + continue; + } + rj::Value str_key(field_->name(), rows_[i].GetAllocator()); + if (array.IsNull(i)) { + rows_[i].AddMember(str_key, rj::Value(), rows_[i].GetAllocator()); + } else { + // Must copy value to new allocator + rj::Value row_val; + row_val.CopyFrom(rows[i], rows_[i].GetAllocator()); + rows_[i].AddMember(str_key, row_val, rows_[i].GetAllocator()); + } + } + return arrow::Status::OK(); + } + + // Handle list-like types + template + arrow::enable_if_list_like Visit(const ArrayType& array) { + assert(static_cast(rows_.size()) == array.length()); + // First create rows from values + std::shared_ptr values = array.values(); + RowBatchBuilder child_builder(values->length(), emit_null_); + const arrow::Field* value_field = array.list_type()->value_field().get(); + std::string value_field_name = value_field->name(); + child_builder.SetField(value_field); + ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*values.get(), &child_builder)); + + std::vector rows = std::move(child_builder).Rows(); + + int64_t values_i = 0; + for (int64_t i = 0; i < array.length(); ++i) { + if (array.IsNull(i) && !emit_null_) { + continue; + } + rj::Document::AllocatorType& allocator = rows_[i].GetAllocator(); + rj::Value str_key(field_->name(), allocator); + + if (array.IsNull(i)) { + rows_[i].AddMember(str_key, rj::Value(), allocator); + continue; + } + + auto array_len = array.value_length(i); + rj::Value value; + value.SetArray(); + value.Reserve(static_cast(array_len), allocator); + + for (int64_t j = 0; j < array_len; ++j) { + rj::Value row_val; + // Must copy value to new allocator + row_val.CopyFrom(rows[values_i][value_field_name], allocator); + value.PushBack(row_val, allocator); + ++values_i; + } + + 
rows_[i].AddMember(str_key, value, allocator); + } + + return arrow::Status::OK(); + } + + // Handle null + arrow::Status Visit(const arrow::NullArray& array) { + assert(static_cast(rows_.size()) == array.length()); + if (emit_null_) { + for (int64_t i = 0; i < array.length(); ++i) { + rj::Value str_key(field_->name(), rows_[i].GetAllocator()); + rows_[i].AddMember(str_key, rj::Value(), rows_[i].GetAllocator()); + } + } + return arrow::Status::OK(); + } + + private: + const arrow::Field* field_; + std::vector rows_; + bool emit_null_; +}; + +class JSONWriterImpl : public ipc::RecordBatchWriter { + public: + static Result> Make( + io::OutputStream* sink, std::shared_ptr owned_sink, + std::shared_ptr schema, const WriteOptions& options) { + RETURN_NOT_OK(options.Validate()); + auto writer = std::make_shared(sink, std::move(owned_sink), + std::move(schema), options); + return writer; + } + + Status WriteRecordBatch(const RecordBatch& batch) override { + RowBatchBuilder builder{batch.num_rows(), options_.emit_null}; + + for (int i = 0; i < batch.num_columns(); ++i) { + builder.SetField(batch.schema()->field(i).get()); + ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*batch.column(i).get(), &builder)); + } + + for (const auto& doc : std::move(builder).Rows()) { + rj::StringBuffer sb; + rj::Writer writer(sb); + doc.Accept(writer); + sb.Put('\n'); + RETURN_NOT_OK(sink_->Write(sb.GetString())); + } + stats_.num_record_batches++; + return Status::OK(); + } + + Status WriteTable(const Table& table, int64_t max_chunksize) override { + TableBatchReader reader(table); + reader.set_chunksize(max_chunksize > 0 ? 
max_chunksize : options_.batch_size); + std::shared_ptr batch; + RETURN_NOT_OK(reader.ReadNext(&batch)); + while (batch != nullptr) { + RETURN_NOT_OK(WriteRecordBatch(*batch)); + RETURN_NOT_OK(reader.ReadNext(&batch)); + } + return Status::OK(); + } + + Status Close() override { return Status::OK(); } + + ipc::WriteStats stats() const override { return stats_; } + + JSONWriterImpl(io::OutputStream* sink, std::shared_ptr owned_sink, + std::shared_ptr schema, const WriteOptions& options) + : sink_(sink), + owned_sink_(std::move(owned_sink)), + schema_(std::move(schema)), + options_(options) {} + + private: + io::OutputStream* sink_; + std::shared_ptr owned_sink_; + const std::shared_ptr schema_; + const WriteOptions options_; + ipc::WriteStats stats_; +}; + +} // namespace + +Status WriteJSON(const Table& table, const WriteOptions& options, + arrow::io::OutputStream* output) { + ARROW_ASSIGN_OR_RAISE(auto writer, MakeJSONWriter(output, table.schema(), options)); + RETURN_NOT_OK(writer->WriteTable(table)); + return writer->Close(); +} + +Status WriteJSON(const RecordBatch& batch, const WriteOptions& options, + arrow::io::OutputStream* output) { + ARROW_ASSIGN_OR_RAISE(auto writer, MakeJSONWriter(output, batch.schema(), options)); + RETURN_NOT_OK(writer->WriteRecordBatch(batch)); + return writer->Close(); +} + +Status WriteJSON(const std::shared_ptr& reader, + const WriteOptions& options, arrow::io::OutputStream* output) { + ARROW_ASSIGN_OR_RAISE(auto writer, MakeJSONWriter(output, reader->schema(), options)); + std::shared_ptr batch; + while (true) { + ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); + if (batch == nullptr) break; + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); + } + return writer->Close(); +} + +ARROW_EXPORT +Result> MakeJSONWriter( + std::shared_ptr sink, const std::shared_ptr& schema, + const WriteOptions& options) { + return JSONWriterImpl::Make(sink.get(), sink, schema, options); +} + +ARROW_EXPORT +Result> MakeJSONWriter( + io::OutputStream* 
sink, const std::shared_ptr& schema, + const WriteOptions& options) { + return JSONWriterImpl::Make(sink, nullptr, schema, options); +} + +} // namespace json +} // namespace arrow diff --git a/cpp/src/arrow/json/writer.h b/cpp/src/arrow/json/writer.h new file mode 100644 index 00000000000..79303f56331 --- /dev/null +++ b/cpp/src/arrow/json/writer.h @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/io/interfaces.h" +#include "arrow/ipc/type_fwd.h" +#include "arrow/json/options.h" +#include "arrow/record_batch.h" +#include "arrow/table.h" + +namespace arrow { +namespace json { + +// Functionality for converting Arrow data to JSON text. +// This library supports all primitive types that can be converted to JSON values. +// Each row in a RecordBatch or Table is converted to a JSON object with field names +// as keys and values as JSON values. Null values are omitted by default. + +/// \defgroup json-write-functions High-level functions for writing JSON files +/// @{ + +/// \brief Convert table to JSON and write the result to output. 
+/// Experimental +ARROW_EXPORT Status WriteJSON(const Table& table, const WriteOptions& options, + arrow::io::OutputStream* output); +/// \brief Convert batch to JSON and write the result to output. +/// Experimental +ARROW_EXPORT Status WriteJSON(const RecordBatch& batch, const WriteOptions& options, + arrow::io::OutputStream* output); +/// \brief Convert batches read through a RecordBatchReader +/// to JSON and write the results to output. +/// Experimental +ARROW_EXPORT Status WriteJSON(const std::shared_ptr& reader, + const WriteOptions& options, + arrow::io::OutputStream* output); + +/// @} + +/// \defgroup json-writer-factories Functions for creating an incremental JSON writer +/// @{ + +/// \brief Create a new JSON writer. User is responsible for closing the +/// actual OutputStream. +/// +/// \param[in] sink output stream to write to +/// \param[in] schema the schema of the record batches to be written +/// \param[in] options options for serialization +/// \return Result> +ARROW_EXPORT +Result> MakeJSONWriter( + std::shared_ptr sink, const std::shared_ptr& schema, + const WriteOptions& options = WriteOptions::Defaults()); + +/// \brief Create a new JSON writer. +/// +/// \param[in] sink output stream to write to (does not take ownership) +/// \param[in] schema the schema of the record batches to be written +/// \param[in] options options for serialization +/// \return Result> +ARROW_EXPORT +Result> MakeJSONWriter( + io::OutputStream* sink, const std::shared_ptr& schema, + const WriteOptions& options = WriteOptions::Defaults()); + +/// @} + +} // namespace json +} // namespace arrow diff --git a/cpp/src/arrow/json/writer_benchmark.cc b/cpp/src/arrow/json/writer_benchmark.cc new file mode 100644 index 00000000000..e564004a293 --- /dev/null +++ b/cpp/src/arrow/json/writer_benchmark.cc @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include +#include + +#include "arrow/array.h" +#include "arrow/io/memory.h" +#include "arrow/json/options.h" +#include "arrow/json/writer.h" +#include "arrow/record_batch.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" + +namespace arrow { +namespace json { + +using internal::checked_cast; + +namespace { + +constexpr int kSeed = 12345; +// length range of json test strings +constexpr int kStrLenMin = 5; +constexpr int kStrLenMax = 50; +// rows/cols of json test dataset +constexpr int kJsonRows = 2000; +constexpr int kJsonCols = 10; + +std::shared_ptr MakeIntTestBatch(int rows, int cols, int64_t null_percent) { + random::RandomArrayGenerator rg(kSeed); + + FieldVector fields(cols); + ArrayVector arrays(cols); + for (int i = 0; i < cols; ++i) { + fields[i] = field("i" + std::to_string(i), int64()); + arrays[i] = rg.Int64(rows, -87654321, 123456789, null_percent / 100.); + } + return RecordBatch::Make(schema(fields), rows, arrays); +} + +std::shared_ptr MakeStrTestBatch(int rows, int cols, int64_t null_percent) { + random::RandomArrayGenerator rg(kSeed + 1); + + // string length varies from kStrLenMin to kStrLenMax for different columns + auto lengths = + std::dynamic_pointer_cast(rg.Int32(cols, kStrLenMin, 
kStrLenMax)); + FieldVector fields(cols); + ArrayVector arrays(cols); + for (int i = 0; i < cols; ++i) { + fields[i] = field('s' + std::to_string(i), utf8()); + // string length varies by 20% for different rows in same column + arrays[i] = rg.String(rows, lengths->Value(i), lengths->Value(i) * 6 / 5, + null_percent / 100.); + } + + return RecordBatch::Make(schema(fields), rows, arrays); +} + +void BenchmarkWriteJSON(benchmark::State& state, const WriteOptions& options, + const RecordBatch& batch) { + int64_t total_size = 0; + + for (auto _ : state) { + auto out = io::BufferOutputStream::Create().ValueOrDie(); + ABORT_NOT_OK(WriteJSON(batch, options, out.get())); + auto buffer = out->Finish().ValueOrDie(); + total_size += buffer->size(); + } + + // byte size of the generated json dataset + state.SetBytesProcessed(total_size); + state.SetItemsProcessed(state.iterations() * batch.num_columns() * batch.num_rows()); + state.counters["null_percent"] = static_cast(state.range(0)); +} + +// Exercises integers with emit_null = true +void WriteJSONNumericEmitNull(benchmark::State& state) { // NOLINT non-const reference + auto batch = MakeIntTestBatch(kJsonRows, kJsonCols, state.range(0)); + auto options = WriteOptions::Defaults(); + options.emit_null = true; + BenchmarkWriteJSON(state, options, *batch); +} + +// Exercises integers with emit_null = false +void WriteJSONNumericSkipNull(benchmark::State& state) { // NOLINT non-const reference + auto batch = MakeIntTestBatch(kJsonRows, kJsonCols, state.range(0)); + auto options = WriteOptions::Defaults(); + options.emit_null = false; + BenchmarkWriteJSON(state, options, *batch); +} + +// Exercises strings with emit_null = true +void WriteJSONStringEmitNull(benchmark::State& state) { // NOLINT non-const reference + auto batch = MakeStrTestBatch(kJsonRows, kJsonCols, state.range(0)); + auto options = WriteOptions::Defaults(); + options.emit_null = true; + BenchmarkWriteJSON(state, options, *batch); +} + +// Exercises with 
emit_null = false +void WriteJSONStringSkipNull(benchmark::State& state) { // NOLINT non-const reference + auto batch = MakeStrTestBatch(kJsonRows, kJsonCols, state.range(0)); + auto options = WriteOptions::Defaults(); + options.emit_null = false; + BenchmarkWriteJSON(state, options, *batch); +} + +void NullPercents(benchmark::internal::Benchmark* bench) { + std::vector null_percents = {0, 1, 10, 50}; + for (int null_percent : null_percents) { + bench->Args({null_percent}); + } +} + +} // namespace + +BENCHMARK(WriteJSONNumericEmitNull)->Apply(NullPercents); +BENCHMARK(WriteJSONNumericSkipNull)->Apply(NullPercents); +BENCHMARK(WriteJSONStringEmitNull)->Apply(NullPercents); +BENCHMARK(WriteJSONStringSkipNull)->Apply(NullPercents); + +} // namespace json +} // namespace arrow diff --git a/cpp/src/arrow/json/writer_test.cc b/cpp/src/arrow/json/writer_test.cc new file mode 100644 index 00000000000..89c89de15e1 --- /dev/null +++ b/cpp/src/arrow/json/writer_test.cc @@ -0,0 +1,348 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "gtest/gtest.h" + +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/writer.h" +#include "arrow/json/writer.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" + +namespace arrow { +namespace json { + +class TestWriteJSON : public ::testing::Test { + protected: + template + Result ToJSONString(const Data& data, const WriteOptions& options) { + std::shared_ptr out; + ARROW_ASSIGN_OR_RAISE(out, io::BufferOutputStream::Create()); + + RETURN_NOT_OK(WriteJSON(data, options, out.get())); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr buffer, out->Finish()); + return std::string(reinterpret_cast(buffer->data()), buffer->size()); + } + + Result ToJSONStringUsingWriter(const Table& data, + const WriteOptions& options) { + std::shared_ptr out; + ARROW_ASSIGN_OR_RAISE(out, io::BufferOutputStream::Create()); + ARROW_ASSIGN_OR_RAISE(auto writer, MakeJSONWriter(out, data.schema(), options)); + TableBatchReader reader(data); + reader.set_chunksize(1); + std::shared_ptr batch; + RETURN_NOT_OK(reader.ReadNext(&batch)); + while (batch != nullptr) { + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); + RETURN_NOT_OK(reader.ReadNext(&batch)); + } + RETURN_NOT_OK(writer->Close()); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr buffer, out->Finish()); + return std::string(reinterpret_cast(buffer->data()), buffer->size()); + } +}; + +TEST_F(TestWriteJSON, EmptyBatch) { + auto schema = ::arrow::schema({field("a", int64()), field("b", utf8())}); + auto batch = RecordBatchFromJSON(schema, "[]"); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, ""); +} + +TEST_F(TestWriteJSON, SimpleTypes) { + auto schema = ::arrow::schema({ + field("int64", int64()), + field("double", float64()), + field("string", utf8()), + field("bool", 
boolean()), + field("date32", date32()), + field("timestamp", timestamp(TimeUnit::SECOND)), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"int64": 42, "double": 3.14, "string": "hello", "bool": true, "date32": 172800000, "timestamp": 172800000}, + {"int64": -1, "double": 2.71, "string": "world", "bool": false, "date32": 259200000, "timestamp": 259200000} + ])"); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ( + json, + R"({"int64":42,"double":3.14,"string":"hello","bool":true,"date32":172800000,"timestamp":172800000} +{"int64":-1,"double":2.71,"string":"world","bool":false,"date32":259200000,"timestamp":259200000} +)"); +} + +TEST_F(TestWriteJSON, NullValues) { + auto schema = ::arrow::schema({ + field("a", int64()), + field("b", utf8()), + field("c", boolean()), + field("d", null()), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"a": 1, "b": "x", "c": true, "d": null}, + {"a": null, "b": "y", "c": null, "d": null}, + {"a": 3, "b": "z", "c": false, "d": null} + ])"); + + WriteOptions options; + options.emit_null = true; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, + R"({"a":1,"b":"x","c":true,"d":null} +{"a":null,"b":"y","c":null,"d":null} +{"a":3,"b":"z","c":false,"d":null} +)"); +} + +TEST_F(TestWriteJSON, SkipNullValues) { + auto schema = ::arrow::schema({ + field("a", int64()), + field("b", utf8()), + field("c", boolean()), + field("d", null()), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"a": 1, "b": "x", "c": true, "d": null}, + {"a": null, "b": "y", "c": null, "d": null}, + {"a": 3, "b": null, "c": false, "d": null} + ])"); + + WriteOptions options; + options.emit_null = false; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, + R"({"a":1,"b":"x","c":true} +{"b":"y"} +{"a":3,"c":false} +)"); +} + +TEST_F(TestWriteJSON, NestedStruct) { + auto schema = ::arrow::schema({ + 
field("id", int64()), + field("data", struct_({ + field("name", utf8()), + field("value", int32()), + })), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"id": 1, "data": {"name": "foo", "value": 42}}, + {"id": 2, "data": {"name": "bar", "value": 100}} + ])"); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, + R"({"id":1,"data":{"name":"foo","value":42}} +{"id":2,"data":{"name":"bar","value":100}} +)"); +} + +TEST_F(TestWriteJSON, ListType) { + auto schema = ::arrow::schema({ + field("id", int64()), + field("values", list(int32())), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"id": 1, "values": [1, 2, 3]}, + {"id": 2, "values": [4, 5]}, + {"id": 3, "values": []} + ])"); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, + R"({"id":1,"values":[1,2,3]} +{"id":2,"values":[4,5]} +{"id":3,"values":[]} +)"); +} + +TEST_F(TestWriteJSON, NestedListAndStruct) { + auto schema = ::arrow::schema({ + field("id", int64()), + field("items", list(struct_({ + field("key", utf8()), + field("value", int64()), + }))), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"id": 1, "items": [{"key": "a", "value": 1}, {"key": "b", "value": 2}]}, + {"id": 2, "items": []} + ])"); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, + R"({"id":1,"items":[{"key":"a","value":1},{"key":"b","value":2}]} +{"id":2,"items":[]} +)"); +} + +TEST_F(TestWriteJSON, FromTable) { + auto schema = ::arrow::schema({ + field("a", int64()), + field("b", utf8()), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"a": 1, "b": "x"}, + {"a": 2, "b": "y"} + ])"); + + ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches({batch})); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*table, options)); + EXPECT_EQ(json, + R"({"a":1,"b":"x"} 
+{"a":2,"b":"y"} +)"); +} + +TEST_F(TestWriteJSON, FromRecordBatchReader) { + auto schema = ::arrow::schema({ + field("a", int64()), + field("b", utf8()), + }); + + auto batch1 = RecordBatchFromJSON(schema, R"([ + {"a": 1, "b": "x"} + ])"); + auto batch2 = RecordBatchFromJSON(schema, R"([ + {"a": 2, "b": "y"} + ])"); + + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make({batch1, batch2})); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(reader, options)); + EXPECT_EQ(json, + R"({"a":1,"b":"x"} +{"a":2,"b":"y"} +)"); +} + +TEST_F(TestWriteJSON, BatchSize) { + auto schema = ::arrow::schema({ + field("a", int64()), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}, {"a": 5} + ])"); + + WriteOptions options; + options.batch_size = 2; // Process 2 rows at a time + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, + R"({"a":1} +{"a":2} +{"a":3} +{"a":4} +{"a":5} +)"); +} + +TEST_F(TestWriteJSON, UsingWriter) { + auto schema = ::arrow::schema({ + field("a", int64()), + field("b", utf8()), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"a": 1, "b": "x"}, + {"a": 2, "b": "y"} + ])"); + + ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches({batch})); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONStringUsingWriter(*table, options)); + EXPECT_EQ(json, + R"({"a":1,"b":"x"} +{"a":2,"b":"y"} +)"); +} + +TEST_F(TestWriteJSON, LargeString) { + auto schema = ::arrow::schema({ + field("a", large_utf8()), + }); + + auto batch = RecordBatchFromJSON(schema, R"([ + {"a": "hello"}, + {"a": "world"} + ])"); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, + R"({"a":"hello"} +{"a":"world"} +)"); +} + +TEST_F(TestWriteJSON, FixedSizeList) { + auto schema = ::arrow::schema({ + field("a", fixed_size_list(int32(), 3)), + }); + + auto batch = 
RecordBatchFromJSON(schema, R"([ + {"a": [1, 2, 3]}, + {"a": [4, 5, 6]} + ])"); + + WriteOptions options; + ASSERT_OK_AND_ASSIGN(std::string json, ToJSONString(*batch, options)); + EXPECT_EQ(json, + R"({"a":[1,2,3]} +{"a":[4,5,6]} +)"); +} + +TEST_F(TestWriteJSON, OptionsValidation) { + WriteOptions options; + options.batch_size = 0; + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, ::testing::HasSubstr("batch_size must be at least 1"), options.Validate()); + + options.batch_size = 1024; + ASSERT_OK(options.Validate()); +} + +} // namespace json +} // namespace arrow diff --git a/cpp/src/arrow/meson.build b/cpp/src/arrow/meson.build index 48d01db729d..6622e374e51 100644 --- a/cpp/src/arrow/meson.build +++ b/cpp/src/arrow/meson.build @@ -515,6 +515,7 @@ if needs_json 'json/object_writer.cc', 'json/parser.cc', 'json/reader.cc', + 'json/writer.cc', ], 'dependencies': [rapidjson_dep], }, diff --git a/docs/source/cpp/api/formats.rst b/docs/source/cpp/api/formats.rst index 264b9e4e7c6..cdb1572003d 100644 --- a/docs/source/cpp/api/formats.rst +++ b/docs/source/cpp/api/formats.rst @@ -70,6 +70,18 @@ Line-separated JSON .. doxygenclass:: arrow::json::StreamingReader :members: +JSON writer +=========== + +.. doxygenstruct:: arrow::json::WriteOptions + :members: + +.. doxygengroup:: json-write-functions + :content-only: + +.. doxygengroup:: json-writer-factories + :content-only: + .. _cpp-api-parquet: Parquet reader diff --git a/docs/source/cpp/json.rst b/docs/source/cpp/json.rst index 3e49c045c8a..6e0e637e30b 100644 --- a/docs/source/cpp/json.rst +++ b/docs/source/cpp/json.rst @@ -20,7 +20,15 @@ .. cpp:namespace:: arrow::json -================== +============================== +Reading and writing JSON files +============================== + +Arrow provides both a reader and a writer for line-separated JSON files. + +.. seealso:: + :ref:`JSON reader/writer API reference `. + Reading JSON files ================== @@ -33,11 +41,8 @@ representing the input file. 
Their behavior can be customized using a combination of :class:`~ReadOptions`, :class:`~ParseOptions`, and other parameters. -.. seealso:: - :ref:`JSON reader API reference <cpp-api-json>`. - TableReader -=========== +----------- :class:`~TableReader` reads an entire file in one shot as a :class:`~arrow::Table`. Each independent JSON object in the input file is converted to a row in @@ -72,7 +77,7 @@ the output table. } StreamingReader -=============== +--------------- :class:`~StreamingReader` reads a file incrementally from blocks of a roughly equal byte size, each yielding a :class:`~arrow::RecordBatch`. Each independent JSON object in a block @@ -110,7 +115,7 @@ may be passed via :class:`~ParseOptions`. } Data types -========== +---------- Since JSON values are typed, the possible Arrow data types on output depend on the input value types. Top-level JSON values should always be @@ -169,3 +174,133 @@ two modes. +-----------------+----------------------------------------------------+ | Object (nested) | Struct | +-----------------+----------------------------------------------------+ + +Writing JSON files +================== + +Arrow data can be written to line-delimited JSON (JSONL/NDJSON) format, where +each row in a RecordBatch or Table is converted to a JSON object on a single line. +Each field in the Arrow schema becomes a key in the JSON object, and the values +are converted to their JSON representation. + +The writer supports all primitive types that can be converted to JSON values, +plus nested structs and lists. Null values are omitted by default, but can be +represented as JSON null using the :member:`WriteOptions::emit_null` option. + +.. seealso:: + :ref:`JSON reader/writer API reference <cpp-api-json>`. + +High-level write functions +-------------------------- + +The simplest way to write JSON is using the high-level functions that write +an entire Table, RecordBatch, or RecordBatchReader at once: + +.. code-block:: cpp + + #include "arrow/json/writer.h" + + { + // ...
+ std::shared_ptr<arrow::io::OutputStream> output = ...; + auto write_options = arrow::json::WriteOptions::Defaults(); + + // Write a Table + if (!WriteJSON(table, write_options, output.get()).ok()) { + // Handle write error... + } + + // Write a RecordBatch + if (!WriteJSON(batch, write_options, output.get()).ok()) { + // Handle write error... + } + + // Write from a RecordBatchReader + std::shared_ptr<arrow::RecordBatchReader> reader = ...; + if (!WriteJSON(reader, write_options, output.get()).ok()) { + // Handle write error... + } + } + +Incremental writing +------------------- + +For writing data incrementally, create a JSON writer using +:func:`~MakeJSONWriter`. This requires the output stream, the schema of the +data to be written, and optionally write options: + +.. code-block:: cpp + + #include "arrow/json/writer.h" + + { + // ... + std::shared_ptr<arrow::io::OutputStream> output = ...; + std::shared_ptr<arrow::Schema> schema = ...; + auto write_options = arrow::json::WriteOptions::Defaults(); + + auto maybe_writer = arrow::json::MakeJSONWriter(output, schema, write_options); + if (!maybe_writer.ok()) { + // Handle writer instantiation error... + } + std::shared_ptr<arrow::ipc::RecordBatchWriter> writer = *maybe_writer; + + // Write batches incrementally + std::shared_ptr<arrow::RecordBatch> batch = ...; + if (!writer->WriteRecordBatch(*batch).ok()) { + // Handle write error... + } + + // Write a Table + arrow::Table table = ...; + if (!writer->WriteTable(table).ok()) { + // Handle write error... + } + + if (!writer->Close().ok()) { + // Handle close error... + } + if (!output->Close().ok()) { + // Handle file close error... + } + } + +Write options +------------- + +The :class:`~WriteOptions` struct allows customization of the JSON output: + +* :member:`WriteOptions::batch_size` - Maximum number of rows processed at a time +* :member:`WriteOptions::emit_null` - Whether to include null values in the output + (by default, null values are omitted) + +..
code-block:: cpp + + auto write_options = arrow::json::WriteOptions::Defaults(); + write_options.batch_size = 2048; // Process 2048 rows at a time + write_options.emit_null = true; // Include null values in output + +Supported data types +-------------------- + +The following table shows how Arrow data types are converted to JSON value types: + +.. table:: Arrow to JSON type conversions + :align: center + + +---------------------------------------------------+------------------+ + | Arrow data type | JSON value type | + +===================================================+==================+ + | Integer, floating-point, and temporal types | Number | + | (not including interval types) | | + +---------------------------------------------------+------------------+ + | Boolean | Boolean | + +---------------------------------------------------+------------------+ + | String-like types | String | + +---------------------------------------------------+------------------+ + | Struct | Object | + +---------------------------------------------------+------------------+ + | List-like types | Array | + +---------------------------------------------------+------------------+ + | Null | Null | + +---------------------------------------------------+------------------+