From 9b7a7c42ba5e90891323a497e159b92e27cb746f Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Fri, 22 Aug 2025 08:13:06 +0300 Subject: [PATCH 01/46] ADBDEV-7890 --- helper/restore_helper.go | 27 ++++++++++++++++++++++----- plugins/example_plugin.bash | 20 ++++++++++++++++++++ plugins/example_plugin_config.yaml | 3 ++- 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index a4c5120fe..d11a1ec4d 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -40,6 +40,14 @@ var ( contentRE *regexp.Regexp ) +type DiscardError struct { + what string +} + +func (e DiscardError) Error() string { + return e.what +} + /* IRestoreReader interface to wrap the underlying reader. * getReaderType() identifies how the reader can be used * SEEKABLE uses seekReader. Used when restoring from uncompressed data with filters from local filesystem @@ -108,8 +116,20 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { switch r.readerType { case SEEKABLE: bytesRead, err = io.CopyN(writer, r.seekReader, num) - case NONSEEKABLE, SUBSET: + case NONSEEKABLE: bytesRead, err = io.CopyN(writer, r.bufReader, num) + case SUBSET: + bytesRead, err = io.CopyN(writer, r.bufReader, num) + if err != nil { + if err != io.EOF { + bytesLeftToRead := num - bytesRead + bytesDiscard, errDiscard := io.CopyN(io.Discard, r.bufReader, bytesLeftToRead) + bytesRead += bytesDiscard + + err = DiscardError {errDiscard.Error()} + //Что делать в случае критичных ошибок? + } + } } return bytesRead, err } @@ -612,10 +632,6 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool { if !pluginConfig.CanRestoreSubset() { return false } - // Helper's option does not allow to use subset - if !*isFiltered || *onErrorContinue { - return false - } // Restore subset and compression does not allow together if strings.HasSuffix(fileToRead, ".gz") || strings.HasSuffix(fileToRead, ".zst") { return false @@ -663,6 +679,7 @@ func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidLis return nil, nil, false, err } cmdStr := "" + logVerbose("sokolov111", *isFiltered, *onErrorContinue, objToc, getSubsetFlag(fileToRead, pluginConfig)) if objToc != nil && getSubsetFlag(fileToRead, pluginConfig) { offsetsFile, _ := ioutil.TempFile("/tmp", "gprestore_offsets_") defer func() { diff --git a/plugins/example_plugin.bash b/plugins/example_plugin.bash index 302d5d6c8..9683f4691 100755 --- a/plugins/example_plugin.bash +++ b/plugins/example_plugin.bash @@ -89,6 +89,26 @@ restore_data() { cat /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename } +restore_data_subset() { + echo "restore_data_subset $1 $2 $3" >> /tmp/plugin_out.txt + filename=`basename "$2"` + timestamp_dir=`basename $(dirname "$2")` + timestamp_day_dir=${timestamp_dir%??????} + if [ -e "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR" ] ; then + echo 'Some plugin warning' >&2 + elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" -a "$GPBACKUP_PLUGIN_DIE_ON_OID" = "" ] ; then + exit 1 + elif [[ -e "/tmp/GPBACKUP_PLUGIN_DIE" && "$filename" == *"$GPBACKUP_PLUGIN_DIE_ON_OID"* ]] ; then + # sleep a while for test purposes - to let gprestore start COPY commands + sleep 5 + exit 1 + fi + /home/keremet/arenadata/tasks/7890_handle_table_restore_errors_during_subset_restore/restore_data_subset /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename $3 +} + + + + delete_backup() { echo "delete_backup $1 $2" >> /tmp/plugin_out.txt timestamp_day_dir=${2%??????} diff --git a/plugins/example_plugin_config.yaml b/plugins/example_plugin_config.yaml index 89deab6a7..bad10e64b 100644 --- a/plugins/example_plugin_config.yaml +++ b/plugins/example_plugin_config.yaml @@ -1,3 +1,4 @@ -executablepath: $GOPATH/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash +executablepath: /home/keremet/compile/gpbackup/plugins/example_plugin.bash options: password: unknown + restore_subset: on From f52c130b38b80c473ca95d893a1ed70ed76d409b Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Thu, 28 Aug 2025 16:26:28 +0300 Subject: [PATCH 02/46] Error handling --- helper/restore_helper.go | 49 +++++++++++++++++++----------- plugins/example_plugin.bash | 20 ------------ plugins/example_plugin_config.yaml | 3 +- 3 files changed, 33 insertions(+), 39 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index d11a1ec4d..634df55cd 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -40,13 +40,7 @@ var ( contentRE *regexp.Regexp ) -type DiscardError struct { - what string -} - -func (e DiscardError) Error() string { - return e.what -} +var discardError = errors.New("helper: discard error") /* IRestoreReader interface to wrap the underlying reader. * getReaderType() identifies how the reader can be used @@ -62,6 +56,7 @@ type IRestoreReader interface { copyAllData() (int64, error) closeFileHandle() getReaderType() ReaderType + discardData(num int64) (int64, error) } type RestoreReader struct { @@ -110,6 +105,16 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error { return nil } +func (r *RestoreReader) discardData(num int64) (int64, error) { + if (r.readerType == SUBSET) { + n, err := io.CopyN(io.Discard, r.bufReader, num) + logVerbose(fmt.Sprintf("%d bytes to discard, discarded %d bytes", num, n)) + return n, err + } + + return 0, nil +} + func (r *RestoreReader) copyData(num int64) (int64, error) { var bytesRead int64 var err error @@ -120,14 +125,15 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { bytesRead, err = io.CopyN(writer, r.bufReader, num) case SUBSET: bytesRead, err = io.CopyN(writer, r.bufReader, num) - if err != nil { - if err != io.EOF { - bytesLeftToRead := num - bytesRead - bytesDiscard, errDiscard := io.CopyN(io.Discard, r.bufReader, bytesLeftToRead) - bytesRead += bytesDiscard - - err = DiscardError {errDiscard.Error()} - //Что делать в случае критичных ошибок? + if err != nil && err != io.EOF && *onErrorContinue { + bytesDiscard, errDiscard := r.discardData(num - bytesRead) + bytesRead += bytesDiscard + if errDiscard != nil { + if errDiscard == io.EOF { + err = errDiscard + } else { + err = errors.Wrap(discardError, errDiscard.Error()) + } } } } @@ -371,6 +377,16 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) err = nil skipOid = tableOid + if *singleDataFile { + _, errDiscard := readers[contentToRestore].discardData(int64(end[contentToRestore] - start[contentToRestore])) + if errDiscard != nil { + if errDiscard == io.EOF { + err = errDiscard + } else { + err = errors.Wrap(discardError, errDiscard.Error()) + } + } + } /* Close up to *copyQueue files with this tableOid */ for idx := 0; idx < *copyQueue; idx++ { batchToDelete := batchNum + idx @@ -465,7 +481,7 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { if err != nil { logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err)) - if *onErrorContinue { + if *onErrorContinue && !errors.Is(err, io.EOF) && !errors.Is(err, discardError) { lastError = err err = nil continue @@ -679,7 +695,6 @@ func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidLis return nil, nil, false, err } cmdStr := "" - logVerbose("sokolov111", *isFiltered, *onErrorContinue, objToc, getSubsetFlag(fileToRead, pluginConfig)) if objToc != nil && getSubsetFlag(fileToRead, pluginConfig) { offsetsFile, _ := ioutil.TempFile("/tmp", "gprestore_offsets_") defer func() { diff --git a/plugins/example_plugin.bash b/plugins/example_plugin.bash index 9683f4691..302d5d6c8 100755 --- a/plugins/example_plugin.bash +++ b/plugins/example_plugin.bash @@ -89,26 +89,6 @@ restore_data() { cat /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename } -restore_data_subset() { - echo "restore_data_subset $1 $2 $3" >> /tmp/plugin_out.txt - filename=`basename "$2"` - timestamp_dir=`basename $(dirname "$2")` - timestamp_day_dir=${timestamp_dir%??????} - if [ -e "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR" ] ; then - echo 'Some plugin warning' >&2 - elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" -a "$GPBACKUP_PLUGIN_DIE_ON_OID" = "" ] ; then - exit 1 - elif [[ -e "/tmp/GPBACKUP_PLUGIN_DIE" && "$filename" == *"$GPBACKUP_PLUGIN_DIE_ON_OID"* ]] ; then - # sleep a while for test purposes - to let gprestore start COPY commands - sleep 5 - exit 1 - fi - /home/keremet/arenadata/tasks/7890_handle_table_restore_errors_during_subset_restore/restore_data_subset /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename $3 -} - - - - delete_backup() { echo "delete_backup $1 $2" >> /tmp/plugin_out.txt timestamp_day_dir=${2%??????} diff --git a/plugins/example_plugin_config.yaml b/plugins/example_plugin_config.yaml index bad10e64b..89deab6a7 100644 --- a/plugins/example_plugin_config.yaml +++ b/plugins/example_plugin_config.yaml @@ -1,4 +1,3 @@ -executablepath: /home/keremet/compile/gpbackup/plugins/example_plugin.bash +executablepath: $GOPATH/src/github.com/greenplum-db/gpbackup/plugins/example_plugin.bash options: password: unknown - restore_subset: on From 0b3fe62330dd83afe0d18ce2bb3b0f3c3e2cb687 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Fri, 29 Aug 2025 08:52:26 +0300 Subject: [PATCH 03/46] Fix test --- helper/helper_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/helper/helper_test.go b/helper/helper_test.go index f9e0d3b83..0d8be00c5 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -48,6 +48,10 @@ func (r *restoreReaderTestImpl) getReaderType() ReaderType { return "nil" } +func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) { + return num, nil +} + type helperTestStep struct { restorePipeWriterArgExpect string restorePipeWriterResult bool From 5ca11b709c1563b9c27801a8116177727f692d99 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Fri, 29 Aug 2025 16:45:51 +0300 Subject: [PATCH 04/46] Fix test --- helper/helper_test.go | 2 +- helper/restore_helper.go | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 0d8be00c5..f499a4c76 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -247,7 +247,7 @@ var _ = Describe("helper tests", func() { pluginConfig.Options["restore_subset"] = "on" *onErrorContinue = true isSubset = getSubsetFlag(fileToRead, &pluginConfig) - Expect(isSubset).To(Equal(false)) + Expect(isSubset).To(Equal(true)) }) It("when restore_subset is off, --on-error-continue is false, compression \"gz\" is used", func() { pluginConfig.Options["restore_subset"] = "off" diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 634df55cd..d4bea4e05 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -378,12 +378,14 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { err = nil skipOid = tableOid if *singleDataFile { - _, errDiscard := readers[contentToRestore].discardData(int64(end[contentToRestore] - start[contentToRestore])) - if errDiscard != nil { - if errDiscard == io.EOF { - err = errDiscard - } else { - err = errors.Wrap(discardError, errDiscard.Error()) + if readers[contentToRestore] != nil { + _, errDiscard := readers[contentToRestore].discardData(int64(end[contentToRestore] - start[contentToRestore])) + if errDiscard != nil { + if errDiscard == io.EOF { + err = errDiscard + } else { + err = errors.Wrap(discardError, errDiscard.Error()) + } } } } From e2fab0aa9f3b765491afd1fb7288d0ddf7237506 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Fri, 29 Aug 2025 16:58:20 +0300 Subject: [PATCH 05/46] Decrease indentation --- helper/restore_helper.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index d4bea4e05..c0620d7be 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -377,15 +377,14 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) err = nil skipOid = tableOid - if *singleDataFile { - if readers[contentToRestore] != nil { - _, errDiscard := readers[contentToRestore].discardData(int64(end[contentToRestore] - start[contentToRestore])) - if errDiscard != nil { - if errDiscard == io.EOF { - err = errDiscard - } else { - err = errors.Wrap(discardError, errDiscard.Error()) - } + if *singleDataFile && readers[contentToRestore] != nil { + bytesToDiscard := int64(end[contentToRestore] - start[contentToRestore]) + _, errDiscard := readers[contentToRestore].discardData(bytesToDiscard) + if errDiscard != nil { + if errDiscard == io.EOF { + err = errDiscard + } else { + err = errors.Wrap(discardError, errDiscard.Error()) } } } From 5c4db1a82cd7cd268f0567fc18bccc5d822906ba Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 1 Sep 2025 10:55:01 +0300 Subject: [PATCH 06/46] Revert condition --- helper/restore_helper.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index c0620d7be..21b9d740a 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -649,6 +649,10 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool { if !pluginConfig.CanRestoreSubset() { return false } + // Helper's option does not allow to use subset + if !*isFiltered { + return false + } // Restore subset and compression does not allow together if strings.HasSuffix(fileToRead, ".gz") || strings.HasSuffix(fileToRead, ".zst") { return false From 90d5469f1a2ad8e96595815b0f8b0fd557f99053 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 1 Sep 2025 16:38:52 +0300 Subject: [PATCH 07/46] Add tests --- helper/helper_test.go | 106 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/helper/helper_test.go b/helper/helper_test.go index f499a4c76..f98cdd1e2 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -3,6 +3,7 @@ package helper import ( "bufio" "fmt" + "io" "os" "github.com/greenplum-db/gpbackup/utils" @@ -202,6 +203,47 @@ func (pt *testPluginCmd) Wait() error { func (pt *testPluginCmd) errLog() { } +type limitReader struct { + rem int + err error +} + +func (r *limitReader) Read(p []byte) (n int, err error) { + if r.rem <= 0 { + return 0, r.err + } + + if len(p) > r.rem { + p = p[0:r.rem] + } + + n = len(p) + for i := 0; i < n; i++ { + p[i] = 1 + } + r.rem -= n + return +} + +type limitWriter struct { + rem int +} + +func (w *limitWriter) Write(p []byte) (n int, err error) { + if w.rem < len(p) { + n = w.rem + } else { + n = len(p) + } + + if w.rem == 0 { + err = io.ErrShortWrite + } + + w.rem -= n + return +} + var _ = Describe("helper tests", func() { var pluginConfig utils.PluginConfig var isSubset bool @@ -508,6 +550,70 @@ var _ = Describe("helper tests", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal(msg)) }) + It("CopyData, readerType is SUBSET. Normal completion", func() { + writer = bufio.NewWriterSize(&limitWriter{100}, 5) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{100, io.EOF}), + } + + bytesRead, err := test_reader.copyData(18) + Expect(bytesRead).To(Equal(int64(18))) + Expect(err).ToNot(HaveOccurred()) + }) + It("CopyData, readerType is SUBSET. Error on write", func() { + *onErrorContinue = true + writer = bufio.NewWriterSize(&limitWriter{7}, 5) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{100, io.EOF}), + } + + bytesRead, err := test_reader.copyData(18) + Expect(bytesRead).To(Equal(int64(18))) + Expect(err).To(HaveOccurred()) + }) + It("CopyData, readerType is SUBSET. EOF", func() { + *onErrorContinue = true + writer = bufio.NewWriterSize(&limitWriter{100}, 5) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{25, io.EOF}), + } + + bytesRead, err := test_reader.copyData(30) + Expect(bytesRead).To(Equal(int64(25))) + Expect(err).To(Equal(io.EOF)) + }) + It("CopyData, readerType is SUBSET. Error on write and EOF", func() { + *onErrorContinue = true + writer = bufio.NewWriterSize(&limitWriter{7}, 5) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{25, io.EOF}), + } + + bytesRead, err := test_reader.copyData(30) + Expect(bytesRead).To(Equal(int64(25))) + Expect(err).To(Equal(io.EOF)) + }) + It("CopyData, readerType is SUBSET. Error on write and on read", func() { + *onErrorContinue = true + writer = bufio.NewWriterSize(&limitWriter{7}, 5) + + test_reader := RestoreReader{ + readerType: SUBSET, + bufReader: bufio.NewReader(&limitReader{25, io.ErrNoProgress}), + } + + bytesRead, err := test_reader.copyData(30) + Expect(bytesRead).To(Equal(int64(25))) + Expect(errors.Is(err, discardError)).To(Equal(true)) + }) }) }) From 87e6f24bd38ca6b60eb683651ba0ea850137a976 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 2 Sep 2025 17:10:48 +0300 Subject: [PATCH 08/46] skipoid test --- helper/helper_test.go | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index f98cdd1e2..b03069ce2 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -19,6 +19,7 @@ import ( var ( testDir = "/tmp/helper_test/20180101/20180101010101" testTocFile = fmt.Sprintf("%s/test_toc.yaml", testDir) + discarded int64 ) type restoreReaderTestImpl struct { @@ -50,6 +51,7 @@ func (r *restoreReaderTestImpl) getReaderType() ReaderType { } func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) { + discarded += num return num, nil } @@ -462,6 +464,29 @@ var _ = Describe("helper tests", func() { err := doRestoreAgentInternal(helper) Expect(err).To(BeNil()) }) + It("discard data if skip file is discovered with single datafile", func() { + *singleDataFile = true + *isResizeRestore = false + *tocFile = testTocFile + + writeTestTOC(testTocFile) + defer func() { + _ = os.Remove(*tocFile) + }() + + oidBatch := []oidWithBatch{ + {1 /* The first oid from TOC */, 0}, + } + + expectedScenario := []helperTestStep{ + {"mock_1_0", false, 1, true, "Can not open pipe for table 1, check_skip_file shall called, skip file exists"}, + } + + helper := newHelperTest(oidBatch, expectedScenario) + err := doRestoreAgentInternal(helper) + Expect(err).To(BeNil()) + Expect(discarded).To(Equal(int64(18))) + }) It("calls Wait in waitForPlugin doRestoreAgent for single data file", func() { *singleDataFile = true *isResizeRestore = false @@ -621,13 +646,13 @@ func writeTestTOC(tocFile string) { // Write test TOC. We are not going to read data using it, so dataLength is a random number dataLength := 100 customTOC := fmt.Sprintf(`dataentries: -1: + 1: startbyte: 0 endbyte: 18 -2: + 2: startbyte: 18 endbyte: %[1]d -3: + 3: startbyte: %[1]d endbyte: %d `, dataLength+18, dataLength+18+18) From bbae9baf20d3aed284c8c8380638bdf391d391b6 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 10:46:08 +0300 Subject: [PATCH 09/46] Remove parenthesis --- helper/restore_helper.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 21b9d740a..795cd2c00 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -106,7 +106,7 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error { } func (r *RestoreReader) discardData(num int64) (int64, error) { - if (r.readerType == SUBSET) { + if r.readerType == SUBSET { n, err := io.CopyN(io.Discard, r.bufReader, num) logVerbose(fmt.Sprintf("%d bytes to discard, discarded %d bytes", num, n)) return n, err @@ -266,11 +266,14 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { break } tocFileForContent := replaceContentInFilename(*tocFile, contentToRestore) + logWarn("\n\n!!!!!tocFileForContent = ", tocFileForContent) segmentTOC[contentToRestore] = toc.NewSegmentTOC(tocFileForContent) + logWarn("\n\n!!!!!tocFileForContent = ", segmentTOC[contentToRestore]) tocEntries[contentToRestore] = segmentTOC[contentToRestore].DataEntries filename := replaceContentInFilename(*dataFile, contentToRestore) readers[contentToRestore], err = restoreHelper.getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList) + logWarn("\n\n!!!!!readers[contentToRestore] = ", readers[contentToRestore]) if readers[contentToRestore] != nil { // NOTE: If we reach here with batches > 1, there will be // *origSize / *destSize (N old segments / N new segments) @@ -360,11 +363,14 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { } } + logWarn("\n\n!!!!! 2 readers[contentToRestore] = ", readers[contentToRestore], tocEntries) logInfo(fmt.Sprintf("Oid %d, Batch %d: Opening pipe %s", tableOid, batchNum, currentPipe)) for { writer, writeHandle, err = restoreHelper.getRestorePipeWriter(currentPipe) + logWarn("\n\n!!!!! 3 writer, writeHandle, err = ", writer, writeHandle, err) if err != nil { if errors.Is(err, unix.ENXIO) { + logWarn("\n\n!!!!! 4 writer, writeHandle, err = ", writer, writeHandle, err) // COPY (the pipe reader) has not tried to access the pipe yet so our restore_helper // process will get ENXIO error on its nonblocking open call on the pipe. We loop in // here while looking to see if gprestore has created a skip file for this restore entry. @@ -374,7 +380,8 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { // not contain a database connection so the version should be passed through the helper // invocation from gprestore (e.g. create a --db-version flag option). if *onErrorContinue && restoreHelper.checkForSkipFile(*pipeFile, tableOid) { - logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) + logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum), *singleDataFile, readers[contentToRestore]) + logWarn("", end[contentToRestore], start[contentToRestore], contentToRestore) err = nil skipOid = tableOid if *singleDataFile && readers[contentToRestore] != nil { From 663f7f7b9145b197572b6d7b8dcbc78895b19adb Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 11:26:28 +0300 Subject: [PATCH 10/46] Deduplication --- helper/restore_helper.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 795cd2c00..75321be6a 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -108,7 +108,13 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error { func (r *RestoreReader) discardData(num int64) (int64, error) { if r.readerType == SUBSET { n, err := io.CopyN(io.Discard, r.bufReader, num) - logVerbose(fmt.Sprintf("%d bytes to discard, discarded %d bytes", num, n)) + if err != nil { + logVerbose(fmt.Sprintf("%d bytes to discard", n)) + if err != io.EOF { + err = errors.Wrap(discardError, err.Error()) + } + } + logVerbose(fmt.Sprintf("%discarded %d bytes", n)) return n, err } @@ -129,11 +135,7 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { bytesDiscard, errDiscard := r.discardData(num - bytesRead) bytesRead += bytesDiscard if errDiscard != nil { - if errDiscard == io.EOF { - err = errDiscard - } else { - err = errors.Wrap(discardError, errDiscard.Error()) - } + err = errDiscard } } } @@ -386,14 +388,7 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { skipOid = tableOid if *singleDataFile && readers[contentToRestore] != nil { bytesToDiscard := int64(end[contentToRestore] - start[contentToRestore]) - _, errDiscard := readers[contentToRestore].discardData(bytesToDiscard) - if errDiscard != nil { - if errDiscard == io.EOF { - err = errDiscard - } else { - err = errors.Wrap(discardError, errDiscard.Error()) - } - } + _, err = readers[contentToRestore].discardData(bytesToDiscard) } /* Close up to *copyQueue files with this tableOid */ for idx := 0; idx < *copyQueue; idx++ { From c9acd36bd84181fe8ebc7761f26d27ec3c7f6ba6 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 11:35:54 +0300 Subject: [PATCH 11/46] Fix --- helper/restore_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 75321be6a..5e11edd73 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -114,7 +114,7 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { err = errors.Wrap(discardError, err.Error()) } } - logVerbose(fmt.Sprintf("%discarded %d bytes", n)) + logVerbose(fmt.Sprintf("discarded %d bytes", n)) return n, err } From 4d66f3e09a35d769d218492780b8035a51aae787 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 11:38:07 +0300 Subject: [PATCH 12/46] Revert "Remove parenthesis" This reverts commit bbae9baf20d3aed284c8c8380638bdf391d391b6. --- helper/restore_helper.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 5e11edd73..5602d384d 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -106,7 +106,7 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error { } func (r *RestoreReader) discardData(num int64) (int64, error) { - if r.readerType == SUBSET { + if (r.readerType == SUBSET) { n, err := io.CopyN(io.Discard, r.bufReader, num) if err != nil { logVerbose(fmt.Sprintf("%d bytes to discard", n)) @@ -268,14 +268,11 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { break } tocFileForContent := replaceContentInFilename(*tocFile, contentToRestore) - logWarn("\n\n!!!!!tocFileForContent = ", tocFileForContent) segmentTOC[contentToRestore] = toc.NewSegmentTOC(tocFileForContent) - logWarn("\n\n!!!!!tocFileForContent = ", segmentTOC[contentToRestore]) tocEntries[contentToRestore] = segmentTOC[contentToRestore].DataEntries filename := replaceContentInFilename(*dataFile, contentToRestore) readers[contentToRestore], err = restoreHelper.getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList) - logWarn("\n\n!!!!!readers[contentToRestore] = ", readers[contentToRestore]) if readers[contentToRestore] != nil { // NOTE: If we reach here with batches > 1, there will be // *origSize / *destSize (N old segments / N new segments) @@ -365,14 +362,11 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { } } - logWarn("\n\n!!!!! 2 readers[contentToRestore] = ", readers[contentToRestore], tocEntries) logInfo(fmt.Sprintf("Oid %d, Batch %d: Opening pipe %s", tableOid, batchNum, currentPipe)) for { writer, writeHandle, err = restoreHelper.getRestorePipeWriter(currentPipe) - logWarn("\n\n!!!!! 3 writer, writeHandle, err = ", writer, writeHandle, err) if err != nil { if errors.Is(err, unix.ENXIO) { - logWarn("\n\n!!!!! 4 writer, writeHandle, err = ", writer, writeHandle, err) // COPY (the pipe reader) has not tried to access the pipe yet so our restore_helper // process will get ENXIO error on its nonblocking open call on the pipe. We loop in // here while looking to see if gprestore has created a skip file for this restore entry. @@ -382,8 +376,7 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { // not contain a database connection so the version should be passed through the helper // invocation from gprestore (e.g. create a --db-version flag option). if *onErrorContinue && restoreHelper.checkForSkipFile(*pipeFile, tableOid) { - logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum), *singleDataFile, readers[contentToRestore]) - logWarn("", end[contentToRestore], start[contentToRestore], contentToRestore) + logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) err = nil skipOid = tableOid if *singleDataFile && readers[contentToRestore] != nil { From 68aa2ee8a6f45be010cade4305034f59bb1b9761 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 11:38:56 +0300 Subject: [PATCH 13/46] Remove parenthesis --- helper/restore_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 5602d384d..c012924c7 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -106,7 +106,7 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error { } func (r *RestoreReader) discardData(num int64) (int64, error) { - if (r.readerType == SUBSET) { + if r.readerType == SUBSET { n, err := io.CopyN(io.Discard, r.bufReader, num) if err != nil { logVerbose(fmt.Sprintf("%d bytes to discard", n)) From fe9fc5a77c4289cbe9a0a2ab39639a494f96be5f Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 13:41:57 +0300 Subject: [PATCH 14/46] Assert --- helper/restore_helper.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index c012924c7..a2e3edf1d 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -106,19 +106,19 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error { } func (r *RestoreReader) discardData(num int64) (int64, error) { - if r.readerType == SUBSET { - n, err := io.CopyN(io.Discard, r.bufReader, num) - if err != nil { - logVerbose(fmt.Sprintf("%d bytes to discard", n)) - if err != io.EOF { - err = errors.Wrap(discardError, err.Error()) - } - } - logVerbose(fmt.Sprintf("discarded %d bytes", n)) - return n, err + if r.readerType != SUBSET { + panic("discardData should be called for readerType == SUBSET only") } - return 0, nil + n, err := io.CopyN(io.Discard, r.bufReader, num) + if err != nil { + logVerbose(fmt.Sprintf("%d bytes to discard", n)) + if err != io.EOF { + err = errors.Wrap(discardError, err.Error()) + } + } + logVerbose(fmt.Sprintf("discarded %d bytes", n)) + return n, err } func (r *RestoreReader) copyData(num int64) (int64, error) { @@ -379,7 +379,7 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) err = nil skipOid = tableOid - if *singleDataFile && readers[contentToRestore] != nil { + if *singleDataFile && readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SUBSET { bytesToDiscard := int64(end[contentToRestore] - start[contentToRestore]) _, err = readers[contentToRestore].discardData(bytesToDiscard) } From edeb4d99e1dd5416a424051919f742851b9fbcce Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 13:47:15 +0300 Subject: [PATCH 15/46] fmt --- helper/restore_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index a2e3edf1d..467cda806 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -379,7 +379,7 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) err = nil skipOid = tableOid - if *singleDataFile && readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SUBSET { + if *singleDataFile && readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SUBSET { bytesToDiscard := int64(end[contentToRestore] - start[contentToRestore]) _, err = readers[contentToRestore].discardData(bytesToDiscard) } From 708cf622e0e3c53116f1a95076a128f1610a81fa Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 13:49:25 +0300 Subject: [PATCH 16/46] Fix --- helper/restore_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 467cda806..a869a7fd3 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -112,7 +112,7 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { n, err := io.CopyN(io.Discard, r.bufReader, num) if err != nil { - logVerbose(fmt.Sprintf("%d bytes to discard", n)) + logVerbose(fmt.Sprintf("%d bytes to discard", num)) if err != io.EOF { err = errors.Wrap(discardError, err.Error()) } From 852a4a597e717e9f229dee6473e7b750e3715968 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Wed, 3 Sep 2025 14:42:04 +0300 Subject: [PATCH 17/46] Fix tests --- helper/helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index b03069ce2..3613547f4 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -47,7 +47,7 @@ func (r *restoreReaderTestImpl) closeFileHandle() { } func (r *restoreReaderTestImpl) getReaderType() ReaderType { - return "nil" + return SUBSET } func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) { From c95ed61c9d13f51d1577783fe88c776ac511f568 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Thu, 4 Sep 2025 08:43:36 +0300 Subject: [PATCH 18/46] BeNil -> HaveOccurred --- helper/helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 3613547f4..d12302c72 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -484,7 +484,7 @@ var _ = Describe("helper tests", func() { helper := newHelperTest(oidBatch, expectedScenario) err := doRestoreAgentInternal(helper) - Expect(err).To(BeNil()) + Expect(err).ToNot(HaveOccurred()) Expect(discarded).To(Equal(int64(18))) }) It("calls Wait in waitForPlugin doRestoreAgent for single data file", func() { From cbaba3841ef8a11f43bb094143a6bc07c9676333 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Thu, 4 Sep 2025 09:46:40 +0300 Subject: [PATCH 19/46] BeforeEach --- helper/helper_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/helper/helper_test.go b/helper/helper_test.go index d12302c72..c894b10c8 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -547,6 +547,9 @@ var _ = Describe("helper tests", func() { }) }) Describe("RestoreReader tests", func() { + BeforeEach(func() { + *onErrorContinue = false + }) It("waitForPlugin normal completion", func() { test_cmd1 := testPluginCmd{hasProcess_: true} test_reader := new(RestoreReader) From 4b52d502c3d220f11d18a6c1cbfac8fa25a6ebc9 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Thu, 4 Sep 2025 10:04:08 +0300 Subject: [PATCH 20/46] writer --- helper/helper_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/helper/helper_test.go b/helper/helper_test.go index c894b10c8..81297ddf8 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -549,6 +549,7 @@ var _ = Describe("helper tests", func() { Describe("RestoreReader tests", func() { BeforeEach(func() { *onErrorContinue = false + writer = nil }) It("waitForPlugin normal completion", func() { test_cmd1 := testPluginCmd{hasProcess_: true} From 8caf16a3d9926df70bba014035d22a5bd4cea83c Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Thu, 4 Sep 2025 10:14:28 +0300 Subject: [PATCH 21/46] AfterEach --- helper/helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 81297ddf8..0bdbba192 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -547,7 +547,7 @@ var _ = Describe("helper tests", func() { }) }) Describe("RestoreReader tests", func() { - BeforeEach(func() { + AfterEach(func() { *onErrorContinue = false writer = nil }) From d3d034b08dc6887327c78d21ead3a40e54643ba6 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Fri, 5 Sep 2025 15:04:16 +0300 Subject: [PATCH 22/46] EOF and discardError are not fatal for seekable --- helper/restore_helper.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index a869a7fd3..e47dd9c96 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -477,7 +477,8 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { if err != nil { logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err)) - if *onErrorContinue && !errors.Is(err, io.EOF) && !errors.Is(err, discardError) { + seekable := readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SEEKABLE + if *onErrorContinue && (seekable || (!errors.Is(err, io.EOF) && !errors.Is(err, discardError))) { lastError = err err = nil continue From d18a663dbb350407c06a39deb70fb6b1c85b1aec Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Fri, 5 Sep 2025 15:34:33 +0300 Subject: [PATCH 23/46] Rename --- helper/helper_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 0bdbba192..0b182a661 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -206,43 +206,43 @@ func (pt *testPluginCmd) errLog() { } type limitReader struct { - rem int + remainder int err error } func (r *limitReader) Read(p []byte) (n int, err error) { - if r.rem <= 0 { + if r.remainder <= 0 { return 0, r.err } - if len(p) > r.rem { - p = p[0:r.rem] + if len(p) > r.remainder { + p = p[0:r.remainder] } n = len(p) for i := 0; i < n; i++ { p[i] = 1 } - r.rem -= n + r.remainder -= n return } type limitWriter struct { - rem int + remainder int } func (w *limitWriter) Write(p []byte) (n int, err error) { - if w.rem < len(p) { - n = w.rem + if w.remainder < len(p) { + n = w.remainder } else { n = len(p) } - if w.rem == 0 { + if w.remainder == 0 { err = io.ErrShortWrite } - w.rem -= n + w.remainder -= n return } From 3cb04faeabf7eeaa5e6671d6282b967f00aa7bf1 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Sun, 7 Sep 2025 20:06:34 +0300 Subject: [PATCH 24/46] Rewrite test --- helper/helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 0b182a661..bdf30d67c 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -602,7 +602,7 @@ var _ = Describe("helper tests", func() { bytesRead, err := test_reader.copyData(18) Expect(bytesRead).To(Equal(int64(18))) - Expect(err).To(HaveOccurred()) + Expect(err).To(Equal(io.ErrShortWrite)) }) It("CopyData, readerType is SUBSET. EOF", func() { *onErrorContinue = true From 82c8ef18b22a5039ecdc473feb38a441b1478c11 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 8 Sep 2025 10:21:51 +0300 Subject: [PATCH 25/46] Improve logging --- helper/restore_helper.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index e47dd9c96..1c5984f08 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -112,12 +112,13 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { n, err := io.CopyN(io.Discard, r.bufReader, num) if err != nil { - logVerbose(fmt.Sprintf("%d bytes to discard", num)) + logError(fmt.Sprintf("discarded %d bytes from %d. Error: %s", n, num, err.Error())) if err != io.EOF { err = errors.Wrap(discardError, err.Error()) } + } else { + logVerbose(fmt.Sprintf("discarded %d bytes", n)) } - logVerbose(fmt.Sprintf("discarded %d bytes", n)) return n, err } From 3b0e82ebe9f7db85ad020b532944f23be5bcece2 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 8 Sep 2025 13:48:47 +0300 Subject: [PATCH 26/46] Improve logging --- helper/helper_test.go | 19 ++++++++++++++----- helper/restore_helper.go | 6 ++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index bdf30d67c..7fd1f504d 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -4,6 +4,7 @@ import ( "bufio" "fmt" "io" + "strings" "os" "github.com/greenplum-db/gpbackup/utils" @@ -628,20 +629,28 @@ var _ = Describe("helper tests", func() { bytesRead, err := test_reader.copyData(30) Expect(bytesRead).To(Equal(int64(25))) - Expect(err).To(Equal(io.EOF)) + Expect(errors.Is(err, discardError)).To(Equal(true)) + Expect(errors.Is(err, io.EOF)).To(Equal(true)) }) It("CopyData, readerType is SUBSET. Error on write and on read", func() { *onErrorContinue = true - writer = bufio.NewWriterSize(&limitWriter{7}, 5) + bufSize := 5 + toCopy := int64(30) + rLmt := int64(25) + writer = bufio.NewWriterSize(&limitWriter{7}, bufSize) test_reader := RestoreReader{ readerType: SUBSET, - bufReader: bufio.NewReader(&limitReader{25, io.ErrNoProgress}), + bufReader: bufio.NewReader(&limitReader{int(rLmt), io.ErrNoProgress}), } - bytesRead, err := test_reader.copyData(30) - Expect(bytesRead).To(Equal(int64(25))) + bytesRead, err := test_reader.copyData(toCopy) + Expect(bytesRead).To(Equal(rLmt)) Expect(errors.Is(err, discardError)).To(Equal(true)) + Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true)) + readBeforeErr := int64(bufSize * 2) + str := fmt.Sprintf("discarded %d bytes from %d: [", rLmt - readBeforeErr, toCopy - readBeforeErr) + Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true)) }) }) }) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 1c5984f08..123453f5f 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -112,10 +112,8 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { n, err := io.CopyN(io.Discard, r.bufReader, num) if err != nil { - logError(fmt.Sprintf("discarded %d bytes from %d. Error: %s", n, num, err.Error())) - if err != io.EOF { - err = errors.Wrap(discardError, err.Error()) - } + err = fmt.Errorf("discarded %d bytes from %d: [%w: [%w]]", n, num, err, discardError) + logError(err.Error()) } else { logVerbose(fmt.Sprintf("discarded %d bytes", n)) } From 28807a105a2edf90e61f91df15c528e32e9ab53a Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 8 Sep 2025 15:09:53 +0300 Subject: [PATCH 27/46] Simplify --- helper/restore_helper.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 123453f5f..575927c7c 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -476,14 +476,17 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { if err != nil { logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err)) + if (!*onErrorContinue) { + return err + } + seekable := readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SEEKABLE - if *onErrorContinue && (seekable || (!errors.Is(err, io.EOF) && !errors.Is(err, discardError))) { - lastError = err - err = nil - continue - } else { + if !seekable && (errors.Is(err, io.EOF) || errors.Is(err, discardError)) { return err } + + lastError = err + err = nil } } From 327db888dfbbab69ef4b5368c588a924db1021f6 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 8 Sep 2025 15:23:42 +0300 Subject: [PATCH 28/46] Add comment --- helper/restore_helper.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 575927c7c..4cf1c4afd 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -480,6 +480,8 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { return err } + // When we read data from NONSEEKABLE or SUBSET, we cannot read more when EOF or discardError happens. + // These errors are not a problem for SEEKABLE, because the next table may be in the middle of the file. seekable := readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SEEKABLE if !seekable && (errors.Is(err, io.EOF) || errors.Is(err, discardError)) { return err From f3b8da089beec9aaeea47ed424c964e874a688e9 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 8 Sep 2025 15:34:38 +0300 Subject: [PATCH 29/46] split conditions --- helper/restore_helper.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 4cf1c4afd..c0b448cb0 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -482,9 +482,10 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { // When we read data from NONSEEKABLE or SUBSET, we cannot read more when EOF or discardError happens. // These errors are not a problem for SEEKABLE, because the next table may be in the middle of the file. - seekable := readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SEEKABLE - if !seekable && (errors.Is(err, io.EOF) || errors.Is(err, discardError)) { - return err + if readers[contentToRestore] == nil || readers[contentToRestore].getReaderType() != SEEKABLE { + if errors.Is(err, io.EOF) || errors.Is(err, discardError) { + return err + } } lastError = err From 3cad64a68dcea288a24414fc53e2322c956c813c Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 8 Sep 2025 16:50:07 +0300 Subject: [PATCH 30/46] Remove parenthesis --- helper/restore_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index c0b448cb0..58866947d 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -476,7 +476,7 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { if err != nil { logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err)) - if (!*onErrorContinue) { + if !*onErrorContinue { return err } From b87a05ddd8304dbdba9215af33fee1f0f7229d7a Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 9 Sep 2025 15:08:02 +0300 Subject: [PATCH 31/46] Revert changes about exit from doRestoreAgentInternal --- helper/restore_helper.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 58866947d..71c2772bb 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -476,20 +476,13 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { if err != nil { logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err)) - if !*onErrorContinue { + if *onErrorContinue { + lastError = err + err = nil + continue + } else { return err } - - // When we read data from NONSEEKABLE or SUBSET, we cannot read more when EOF or discardError happens. - // These errors are not a problem for SEEKABLE, because the next table may be in the middle of the file. - if readers[contentToRestore] == nil || readers[contentToRestore].getReaderType() != SEEKABLE { - if errors.Is(err, io.EOF) || errors.Is(err, discardError) { - return err - } - } - - lastError = err - err = nil } } From cb926b2bdf816cdf542f112892c05abb8f2a046c Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Thu, 11 Sep 2025 10:20:15 +0300 Subject: [PATCH 32/46] Add exit on discardError --- helper/restore_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 71c2772bb..9b3a861ad 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -476,7 +476,7 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { if err != nil { logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err)) - if *onErrorContinue { + if *onErrorContinue && !errors.Is(err, discardError) { lastError = err err = nil continue From 12094de252e0575560488f6fc4276e76fd4f4c6f Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Thu, 11 Sep 2025 13:13:03 +0300 Subject: [PATCH 33/46] Don't miss CopyN error --- helper/helper_test.go | 35 ++++++++++++++++++++++++++--------- helper/restore_helper.go | 3 ++- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 7fd1f504d..75b4ad54e 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -594,16 +594,21 @@ var _ = Describe("helper tests", func() { }) It("CopyData, readerType is SUBSET. Error on write", func() { *onErrorContinue = true - writer = bufio.NewWriterSize(&limitWriter{7}, 5) + bufSize := 5 + toRead := int64(18) + writer = bufio.NewWriterSize(&limitWriter{7}, bufSize) test_reader := RestoreReader{ readerType: SUBSET, bufReader: bufio.NewReader(&limitReader{100, io.EOF}), } - bytesRead, err := test_reader.copyData(18) - Expect(bytesRead).To(Equal(int64(18))) - Expect(err).To(Equal(io.ErrShortWrite)) + bytesRead, err := test_reader.copyData(toRead) + Expect(bytesRead).To(Equal(toRead)) + Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) + str := fmt.Sprintf("copied %d bytes from %d: [", bufSize * 2, toRead) + Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true)) + }) It("CopyData, readerType is SUBSET. EOF", func() { *onErrorContinue = true @@ -620,17 +625,26 @@ var _ = Describe("helper tests", func() { }) It("CopyData, readerType is SUBSET. Error on write and EOF", func() { *onErrorContinue = true - writer = bufio.NewWriterSize(&limitWriter{7}, 5) + bufSize := 5 + toCopy := int64(30) + rLmt := int64(25) + writer = bufio.NewWriterSize(&limitWriter{7}, bufSize) test_reader := RestoreReader{ readerType: SUBSET, - bufReader: bufio.NewReader(&limitReader{25, io.EOF}), + bufReader: bufio.NewReader(&limitReader{int(rLmt), io.EOF}), } - bytesRead, err := test_reader.copyData(30) - Expect(bytesRead).To(Equal(int64(25))) + bytesRead, err := test_reader.copyData(toCopy) + Expect(bytesRead).To(Equal(rLmt)) Expect(errors.Is(err, discardError)).To(Equal(true)) + Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.EOF)).To(Equal(true)) + readBeforeErr := int64(bufSize * 2) + prefix := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt - readBeforeErr, toCopy - readBeforeErr) + Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) + strCopied := fmt.Sprintf("[copied %d bytes from %d: [", readBeforeErr, toCopy) + Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) }) It("CopyData, readerType is SUBSET. Error on write and on read", func() { *onErrorContinue = true @@ -647,10 +661,13 @@ var _ = Describe("helper tests", func() { bytesRead, err := test_reader.copyData(toCopy) Expect(bytesRead).To(Equal(rLmt)) Expect(errors.Is(err, discardError)).To(Equal(true)) + Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true)) readBeforeErr := int64(bufSize * 2) - str := fmt.Sprintf("discarded %d bytes from %d: [", rLmt - readBeforeErr, toCopy - readBeforeErr) + str := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt - readBeforeErr, toCopy - readBeforeErr) Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true)) + strCopied := fmt.Sprintf("[copied %d bytes from %d: [", readBeforeErr, toCopy) + Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) }) }) }) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 9b3a861ad..9e9c273c9 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -131,10 +131,11 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { case SUBSET: bytesRead, err = io.CopyN(writer, r.bufReader, num) if err != nil && err != io.EOF && *onErrorContinue { + err = fmt.Errorf("copied %d bytes from %d: [%w]", bytesRead, num, err) bytesDiscard, errDiscard := r.discardData(num - bytesRead) bytesRead += bytesDiscard if errDiscard != nil { - err = errDiscard + err = fmt.Errorf("discard error in copyData: [%w: [%w]]", errDiscard, err) } } } From 52568ff8c066e163aaf5d9e3752bb216f92d314e Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Thu, 11 Sep 2025 13:15:28 +0300 Subject: [PATCH 34/46] change variable name --- helper/helper_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 75b4ad54e..dba72625c 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -664,8 +664,8 @@ var _ = Describe("helper tests", func() { Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true)) readBeforeErr := int64(bufSize * 2) - str := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt - readBeforeErr, toCopy - readBeforeErr) - Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true)) + prefix := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt - readBeforeErr, toCopy - readBeforeErr) + Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) strCopied := fmt.Sprintf("[copied %d bytes from %d: [", readBeforeErr, toCopy) Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) }) From 8bb569ffb62e9bbf6bfabb945bcb546bde21fc62 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Fri, 12 Sep 2025 13:44:32 +0300 Subject: [PATCH 35/46] Fix hanging --- helper/helper_test.go | 2 -- helper/restore_helper.go | 30 ++++++++++++++++++++++-------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index dba72625c..ab59b35a5 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -637,7 +637,6 @@ var _ = Describe("helper tests", func() { bytesRead, err := test_reader.copyData(toCopy) Expect(bytesRead).To(Equal(rLmt)) - Expect(errors.Is(err, discardError)).To(Equal(true)) Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.EOF)).To(Equal(true)) readBeforeErr := int64(bufSize * 2) @@ -660,7 +659,6 @@ var _ = Describe("helper tests", func() { bytesRead, err := test_reader.copyData(toCopy) Expect(bytesRead).To(Equal(rLmt)) - Expect(errors.Is(err, discardError)).To(Equal(true)) Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true)) readBeforeErr := int64(bufSize * 2) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 9e9c273c9..81611e4df 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -40,8 +40,6 @@ var ( contentRE *regexp.Regexp ) -var discardError = errors.New("helper: discard error") - /* IRestoreReader interface to wrap the underlying reader. * getReaderType() identifies how the reader can be used * SEEKABLE uses seekReader. Used when restoring from uncompressed data with filters from local filesystem @@ -65,6 +63,7 @@ type RestoreReader struct { seekReader io.ReadSeeker pluginCmd IPluginCmd readerType ReaderType + discardErr error } // Wait for plugin process that should be already finished. This should be @@ -110,14 +109,19 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { panic("discardData should be called for readerType == SUBSET only") } + if r.discardErr != nil { + logVerbose(fmt.Sprintf("%d bytes to discard, but discard error has already been set. Don't read", num)) + return 0, r.discardErr + } + n, err := io.CopyN(io.Discard, r.bufReader, num) - if err != nil { - err = fmt.Errorf("discarded %d bytes from %d: [%w: [%w]]", n, num, err, discardError) - logError(err.Error()) - } else { + if err == nil { logVerbose(fmt.Sprintf("discarded %d bytes", n)) + } else { + r.discardErr = fmt.Errorf("discarded %d bytes from %d: [%w]", n, num, err) + logError(r.discardErr.Error()) } - return n, err + return n, r.discardErr } func (r *RestoreReader) copyData(num int64) (int64, error) { @@ -129,6 +133,11 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { case NONSEEKABLE: bytesRead, err = io.CopyN(writer, r.bufReader, num) case SUBSET: + if r.discardErr != nil { + logVerbose(fmt.Sprintf("%d bytes to copy, but discard error has already been set. Don't read", num)) + return 0, r.discardErr + } + bytesRead, err = io.CopyN(writer, r.bufReader, num) if err != nil && err != io.EOF && *onErrorContinue { err = fmt.Errorf("copied %d bytes from %d: [%w]", bytesRead, num, err) @@ -149,6 +158,11 @@ func (r *RestoreReader) copyAllData() (int64, error) { case SEEKABLE: bytesRead, err = io.Copy(writer, r.seekReader) case NONSEEKABLE, SUBSET: + if r.discardErr != nil { + logVerbose("copyAllData: discard error has already been set. Don't read") + return 0, r.discardErr + } + bytesRead, err = io.Copy(writer, r.bufReader) } return bytesRead, err @@ -477,7 +491,7 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error { if err != nil { logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err)) - if *onErrorContinue && !errors.Is(err, discardError) { + if *onErrorContinue { lastError = err err = nil continue From aa88bf067505352b580075cd6785c98133d331ad Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 15 Sep 2025 08:42:16 +0300 Subject: [PATCH 36/46] Fix logging --- helper/restore_helper.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 81611e4df..318404251 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -40,6 +40,8 @@ var ( contentRE *regexp.Regexp ) +var discardError = errors.New("discard error occurred when restoring a previous table") + /* IRestoreReader interface to wrap the underlying reader. * getReaderType() identifies how the reader can be used * SEEKABLE uses seekReader. Used when restoring from uncompressed data with filters from local filesystem @@ -63,7 +65,7 @@ type RestoreReader struct { seekReader io.ReadSeeker pluginCmd IPluginCmd readerType ReaderType - discardErr error + discardErr bool } // Wait for plugin process that should be already finished. This should be @@ -109,19 +111,20 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { panic("discardData should be called for readerType == SUBSET only") } - if r.discardErr != nil { - logVerbose(fmt.Sprintf("%d bytes to discard, but discard error has already been set. Don't read", num)) - return 0, r.discardErr + if r.discardErr { + logVerbose(fmt.Sprintf("%d bytes to discard, but discard error has already occurred. Don't read", num)) + return 0, discardError } n, err := io.CopyN(io.Discard, r.bufReader, num) if err == nil { logVerbose(fmt.Sprintf("discarded %d bytes", n)) } else { - r.discardErr = fmt.Errorf("discarded %d bytes from %d: [%w]", n, num, err) - logError(r.discardErr.Error()) + r.discardErr = true + err = fmt.Errorf("discarded %d bytes from %d: [%w]", n, num, err) + logError(err.Error()) } - return n, r.discardErr + return n, err } func (r *RestoreReader) copyData(num int64) (int64, error) { @@ -133,9 +136,9 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { case NONSEEKABLE: bytesRead, err = io.CopyN(writer, r.bufReader, num) case SUBSET: - if r.discardErr != nil { - logVerbose(fmt.Sprintf("%d bytes to copy, but discard error has already been set. Don't read", num)) - return 0, r.discardErr + if r.discardErr { + logVerbose(fmt.Sprintf("%d bytes to copy, but discard error has already occurred. Don't read", num)) + return 0, discardError } bytesRead, err = io.CopyN(writer, r.bufReader, num) @@ -158,9 +161,9 @@ func (r *RestoreReader) copyAllData() (int64, error) { case SEEKABLE: bytesRead, err = io.Copy(writer, r.seekReader) case NONSEEKABLE, SUBSET: - if r.discardErr != nil { - logVerbose("copyAllData: discard error has already been set. Don't read") - return 0, r.discardErr + if r.discardErr { + logVerbose("copyAllData: discard error has already occurred. Don't read") + return 0, discardError } bytesRead, err = io.Copy(writer, r.bufReader) From d6663ec239e94ea9650ec236944b4fe41fb945bf Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 15 Sep 2025 10:23:07 +0300 Subject: [PATCH 37/46] Revert changes in CopyAllData --- helper/restore_helper.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 318404251..1df69537b 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -161,11 +161,6 @@ func (r *RestoreReader) copyAllData() (int64, error) { case SEEKABLE: bytesRead, err = io.Copy(writer, r.seekReader) case NONSEEKABLE, SUBSET: - if r.discardErr { - logVerbose("copyAllData: discard error has already occurred. Don't read") - return 0, discardError - } - bytesRead, err = io.Copy(writer, r.bufReader) } return bytesRead, err From c644bea2485fc55acc92b24693a1af1d05fdfc4b Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 15 Sep 2025 11:04:51 +0300 Subject: [PATCH 38/46] Rewrite to use Join, go fmt --- helper/helper_test.go | 18 +++++++++--------- helper/restore_helper.go | 4 +++- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index ab59b35a5..189ebe6f2 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -4,8 +4,8 @@ import ( "bufio" "fmt" "io" - "strings" "os" + "strings" "github.com/greenplum-db/gpbackup/utils" "golang.org/x/sys/unix" @@ -20,7 +20,7 @@ import ( var ( testDir = "/tmp/helper_test/20180101/20180101010101" testTocFile = fmt.Sprintf("%s/test_toc.yaml", testDir) - discarded int64 + discarded int64 ) type restoreReaderTestImpl struct { @@ -208,7 +208,7 @@ func (pt *testPluginCmd) errLog() { type limitReader struct { remainder int - err error + err error } func (r *limitReader) Read(p []byte) (n int, err error) { @@ -476,7 +476,7 @@ var _ = Describe("helper tests", func() { }() oidBatch := []oidWithBatch{ - {1 /* The first oid from TOC */, 0}, + {1 /* The first oid from TOC */, 0}, } expectedScenario := []helperTestStep{ @@ -606,7 +606,7 @@ var _ = Describe("helper tests", func() { bytesRead, err := test_reader.copyData(toRead) Expect(bytesRead).To(Equal(toRead)) Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) - str := fmt.Sprintf("copied %d bytes from %d: [", bufSize * 2, toRead) + str := fmt.Sprintf("copied %d bytes from %d: [", bufSize*2, toRead) Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true)) }) @@ -640,9 +640,9 @@ var _ = Describe("helper tests", func() { Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.EOF)).To(Equal(true)) readBeforeErr := int64(bufSize * 2) - prefix := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt - readBeforeErr, toCopy - readBeforeErr) + prefix := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt-readBeforeErr, toCopy-readBeforeErr) Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) - strCopied := fmt.Sprintf("[copied %d bytes from %d: [", readBeforeErr, toCopy) + strCopied := fmt.Sprintf("copied %d bytes from %d: [", readBeforeErr, toCopy) Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) }) It("CopyData, readerType is SUBSET. Error on write and on read", func() { @@ -662,9 +662,9 @@ var _ = Describe("helper tests", func() { Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true)) readBeforeErr := int64(bufSize * 2) - prefix := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt - readBeforeErr, toCopy - readBeforeErr) + prefix := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt-readBeforeErr, toCopy-readBeforeErr) Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) - strCopied := fmt.Sprintf("[copied %d bytes from %d: [", readBeforeErr, toCopy) + strCopied := fmt.Sprintf("copied %d bytes from %d: [", readBeforeErr, toCopy) Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) }) }) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 1df69537b..d2513857e 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "compress/gzip" + errorsStd "errors" "fmt" "io" "io/ioutil" @@ -147,7 +148,8 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { bytesDiscard, errDiscard := r.discardData(num - bytesRead) bytesRead += bytesDiscard if errDiscard != nil { - err = fmt.Errorf("discard error in copyData: [%w: [%w]]", errDiscard, err) + err = errorsStd.Join(errDiscard, err) + err = fmt.Errorf("discard error in copyData: [%w]", err) } } } From 5420290b94933f0a7a54be8b50b6c802c6673fae Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Mon, 15 Sep 2025 16:25:52 +0300 Subject: [PATCH 39/46] Add RestoreReader tests --- helper/helper_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/helper/helper_test.go b/helper/helper_test.go index 189ebe6f2..4f97cfdcc 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -644,6 +644,10 @@ var _ = Describe("helper tests", func() { Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) strCopied := fmt.Sprintf("copied %d bytes from %d: [", readBeforeErr, toCopy) Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) + + bytesRead, err = test_reader.copyData(10) + Expect(bytesRead).To(Equal(int64(0))) + Expect(err).To(Equal(discardError)) }) It("CopyData, readerType is SUBSET. Error on write and on read", func() { *onErrorContinue = true @@ -666,6 +670,10 @@ var _ = Describe("helper tests", func() { Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) strCopied := fmt.Sprintf("copied %d bytes from %d: [", readBeforeErr, toCopy) Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) + + bytesRead, err = test_reader.copyData(10) + Expect(bytesRead).To(Equal(int64(0))) + Expect(err).To(Equal(discardError)) }) }) }) From c7c9d66eb6811cb6fda77f222b8d45d427b4a412 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 16 Sep 2025 09:52:24 +0300 Subject: [PATCH 40/46] Remove discardError --- helper/helper_test.go | 12 ++++++++++-- helper/restore_helper.go | 12 ++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 4f97cfdcc..f7854789b 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -647,7 +647,11 @@ var _ = Describe("helper tests", func() { bytesRead, err = test_reader.copyData(10) Expect(bytesRead).To(Equal(int64(0))) - Expect(err).To(Equal(discardError)) + Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Don't read")) + + bytesRead, err = test_reader.discardData(5) + Expect(bytesRead).To(Equal(int64(0))) + Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Don't read")) }) It("CopyData, readerType is SUBSET. Error on write and on read", func() { *onErrorContinue = true @@ -673,7 +677,11 @@ var _ = Describe("helper tests", func() { bytesRead, err = test_reader.copyData(10) Expect(bytesRead).To(Equal(int64(0))) - Expect(err).To(Equal(discardError)) + Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Don't read")) + + bytesRead, err = test_reader.discardData(5) + Expect(bytesRead).To(Equal(int64(0))) + Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Don't read")) }) }) }) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index d2513857e..cefa30efa 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -41,8 +41,6 @@ var ( contentRE *regexp.Regexp ) -var discardError = errors.New("discard error occurred when restoring a previous table") - /* IRestoreReader interface to wrap the underlying reader. * getReaderType() identifies how the reader can be used * SEEKABLE uses seekReader. Used when restoring from uncompressed data with filters from local filesystem @@ -113,8 +111,9 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { } if r.discardErr { - logVerbose(fmt.Sprintf("%d bytes to discard, but discard error has already occurred. Don't read", num)) - return 0, discardError + err := fmt.Errorf("%d bytes to discard, but discard error has already occurred. Don't read", num) + logVerbose(err.Error()) + return 0, err } n, err := io.CopyN(io.Discard, r.bufReader, num) @@ -138,8 +137,9 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { bytesRead, err = io.CopyN(writer, r.bufReader, num) case SUBSET: if r.discardErr { - logVerbose(fmt.Sprintf("%d bytes to copy, but discard error has already occurred. Don't read", num)) - return 0, discardError + err := fmt.Errorf("%d bytes to copy, but discard error has already occurred. Don't read", num) + logVerbose(err.Error()) + return 0, err } bytesRead, err = io.CopyN(writer, r.bufReader, num) From 769bddc2809cdaa7868981bac73d93790ec00c35 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 16 Sep 2025 10:37:34 +0300 Subject: [PATCH 41/46] Replace fmt.Errorf with errors.Wrap --- helper/helper_test.go | 10 +++++----- helper/restore_helper.go | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index f7854789b..bd5551c13 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -606,7 +606,7 @@ var _ = Describe("helper tests", func() { bytesRead, err := test_reader.copyData(toRead) Expect(bytesRead).To(Equal(toRead)) Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) - str := fmt.Sprintf("copied %d bytes from %d: [", bufSize*2, toRead) + str := fmt.Sprintf("copied %d bytes from %d: ", bufSize*2, toRead) Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true)) }) @@ -640,9 +640,9 @@ var _ = Describe("helper tests", func() { Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.EOF)).To(Equal(true)) readBeforeErr := int64(bufSize * 2) - prefix := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt-readBeforeErr, toCopy-readBeforeErr) + prefix := fmt.Sprintf("discard error in copyData: discarded %d bytes from %d: ", rLmt-readBeforeErr, toCopy-readBeforeErr) Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) - strCopied := fmt.Sprintf("copied %d bytes from %d: [", readBeforeErr, toCopy) + strCopied := fmt.Sprintf("copied %d bytes from %d: ", readBeforeErr, toCopy) Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) bytesRead, err = test_reader.copyData(10) @@ -670,9 +670,9 @@ var _ = Describe("helper tests", func() { Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true)) Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true)) readBeforeErr := int64(bufSize * 2) - prefix := fmt.Sprintf("discard error in copyData: [discarded %d bytes from %d: [", rLmt-readBeforeErr, toCopy-readBeforeErr) + prefix := fmt.Sprintf("discard error in copyData: discarded %d bytes from %d: ", rLmt-readBeforeErr, toCopy-readBeforeErr) Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true)) - strCopied := fmt.Sprintf("copied %d bytes from %d: [", readBeforeErr, toCopy) + strCopied := fmt.Sprintf("copied %d bytes from %d: ", readBeforeErr, toCopy) Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true)) bytesRead, err = test_reader.copyData(10) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index cefa30efa..7196e8ead 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -121,7 +121,7 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { logVerbose(fmt.Sprintf("discarded %d bytes", n)) } else { r.discardErr = true - err = fmt.Errorf("discarded %d bytes from %d: [%w]", n, num, err) + err = errors.Wrapf(err, "discarded %d bytes from %d", n, num) logError(err.Error()) } return n, err @@ -144,12 +144,12 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { bytesRead, err = io.CopyN(writer, r.bufReader, num) if err != nil && err != io.EOF && *onErrorContinue { - err = fmt.Errorf("copied %d bytes from %d: [%w]", bytesRead, num, err) + err = errors.Wrapf(err, "copied %d bytes from %d", bytesRead, num) bytesDiscard, errDiscard := r.discardData(num - bytesRead) bytesRead += bytesDiscard if errDiscard != nil { err = errorsStd.Join(errDiscard, err) - err = fmt.Errorf("discard error in copyData: [%w]", err) + err = errors.Wrap(err, "discard error in copyData") } } } From bba8d88c9385f8cbf6546d276f8538bfd50df922 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 16 Sep 2025 11:47:24 +0300 Subject: [PATCH 42/46] Fix error message --- helper/helper_test.go | 8 ++++---- helper/restore_helper.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index bd5551c13..b68e71a7d 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -647,11 +647,11 @@ var _ = Describe("helper tests", func() { bytesRead, err = test_reader.copyData(10) Expect(bytesRead).To(Equal(int64(0))) - Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Don't read")) + Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Skipping read.")) bytesRead, err = test_reader.discardData(5) Expect(bytesRead).To(Equal(int64(0))) - Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Don't read")) + Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Skipping read.")) }) It("CopyData, readerType is SUBSET. Error on write and on read", func() { *onErrorContinue = true @@ -677,11 +677,11 @@ var _ = Describe("helper tests", func() { bytesRead, err = test_reader.copyData(10) Expect(bytesRead).To(Equal(int64(0))) - Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Don't read")) + Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Skipping read.")) bytesRead, err = test_reader.discardData(5) Expect(bytesRead).To(Equal(int64(0))) - Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Don't read")) + Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Skipping read.")) }) }) }) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 7196e8ead..a0374fd03 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -111,7 +111,7 @@ func (r *RestoreReader) discardData(num int64) (int64, error) { } if r.discardErr { - err := fmt.Errorf("%d bytes to discard, but discard error has already occurred. Don't read", num) + err := fmt.Errorf("%d bytes to discard, but discard error has already occurred. Skipping read.", num) logVerbose(err.Error()) return 0, err } @@ -137,7 +137,7 @@ func (r *RestoreReader) copyData(num int64) (int64, error) { bytesRead, err = io.CopyN(writer, r.bufReader, num) case SUBSET: if r.discardErr { - err := fmt.Errorf("%d bytes to copy, but discard error has already occurred. Don't read", num) + err := fmt.Errorf("%d bytes to copy, but discard error has already occurred. Skipping read.", num) logVerbose(err.Error()) return 0, err } From 3154bf63718e209ef02c6df0578762a4afc5c348 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 16 Sep 2025 12:57:03 +0300 Subject: [PATCH 43/46] Add test for discard error. Rename discarded to discardedCount --- helper/helper_test.go | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index b68e71a7d..649a8a264 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -18,9 +18,10 @@ import ( ) var ( - testDir = "/tmp/helper_test/20180101/20180101010101" - testTocFile = fmt.Sprintf("%s/test_toc.yaml", testDir) - discarded int64 + testDir = "/tmp/helper_test/20180101/20180101010101" + testTocFile = fmt.Sprintf("%s/test_toc.yaml", testDir) + discardedCount int64 + discardErr error ) type restoreReaderTestImpl struct { @@ -52,7 +53,11 @@ func (r *restoreReaderTestImpl) getReaderType() ReaderType { } func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) { - discarded += num + if discardErr != nil { + return 0, discardErr + } + + discardedCount += num return num, nil } @@ -486,7 +491,30 @@ var _ = Describe("helper tests", func() { helper := newHelperTest(oidBatch, expectedScenario) err := doRestoreAgentInternal(helper) Expect(err).ToNot(HaveOccurred()) - Expect(discarded).To(Equal(int64(18))) + Expect(discardedCount).To(Equal(int64(18))) + }) + It("discard error data if skip file is discovered with single datafile", func() { + discardErr = io.EOF + *singleDataFile = true + *isResizeRestore = false + *tocFile = testTocFile + + writeTestTOC(testTocFile) + defer func() { + _ = os.Remove(*tocFile) + }() + + oidBatch := []oidWithBatch{ + {1 /* The first oid from TOC */, 0}, + } + + expectedScenario := []helperTestStep{ + {"mock_1_0", false, 1, true, "Can not open pipe for table 1, check_skip_file shall called, skip file exists"}, + } + + helper := newHelperTest(oidBatch, expectedScenario) + err := doRestoreAgentInternal(helper) + Expect(err).To(Equal(discardErr)) }) It("calls Wait in waitForPlugin doRestoreAgent for single data file", func() { *singleDataFile = true From 90420b9f4c30ba1ec5b12db14515949edd8f1573 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 16 Sep 2025 12:59:03 +0300 Subject: [PATCH 44/46] Rename --- helper/helper_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 649a8a264..d89f03ce4 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -20,7 +20,7 @@ import ( var ( testDir = "/tmp/helper_test/20180101/20180101010101" testTocFile = fmt.Sprintf("%s/test_toc.yaml", testDir) - discardedCount int64 + discardedBytes int64 discardErr error ) @@ -57,7 +57,7 @@ func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) { return 0, discardErr } - discardedCount += num + discardedBytes += num return num, nil } @@ -491,7 +491,7 @@ var _ = Describe("helper tests", func() { helper := newHelperTest(oidBatch, expectedScenario) err := doRestoreAgentInternal(helper) Expect(err).ToNot(HaveOccurred()) - Expect(discardedCount).To(Equal(int64(18))) + Expect(discardedBytes).To(Equal(int64(18))) }) It("discard error data if skip file is discovered with single datafile", func() { discardErr = io.EOF From 4c4c51743152e31c5089f4e7191db9b8a8e5b023 Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 16 Sep 2025 13:03:17 +0300 Subject: [PATCH 45/46] Clear discardErr --- helper/helper_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/helper/helper_test.go b/helper/helper_test.go index d89f03ce4..63ee09ebc 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -515,6 +515,7 @@ var _ = Describe("helper tests", func() { helper := newHelperTest(oidBatch, expectedScenario) err := doRestoreAgentInternal(helper) Expect(err).To(Equal(discardErr)) + discardErr = nil }) It("calls Wait in waitForPlugin doRestoreAgent for single data file", func() { *singleDataFile = true From 3a54c484b68c84935acd112c5006dbc808903dcc Mon Sep 17 00:00:00 2001 From: Andrey Sokolov Date: Tue, 16 Sep 2025 13:23:31 +0300 Subject: [PATCH 46/46] Move global variables to struct --- helper/helper_test.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/helper/helper_test.go b/helper/helper_test.go index 63ee09ebc..fa10d7686 100644 --- a/helper/helper_test.go +++ b/helper/helper_test.go @@ -18,14 +18,14 @@ import ( ) var ( - testDir = "/tmp/helper_test/20180101/20180101010101" - testTocFile = fmt.Sprintf("%s/test_toc.yaml", testDir) - discardedBytes int64 - discardErr error + testDir = "/tmp/helper_test/20180101/20180101010101" + testTocFile = fmt.Sprintf("%s/test_toc.yaml", testDir) ) type restoreReaderTestImpl struct { - waitCount int + waitCount int + discardedBytes int64 + discardErr error } func (r *restoreReaderTestImpl) waitForPlugin() error { @@ -53,11 +53,11 @@ func (r *restoreReaderTestImpl) getReaderType() ReaderType { } func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) { - if discardErr != nil { - return 0, discardErr + if r.discardErr != nil { + return 0, r.discardErr } - discardedBytes += num + r.discardedBytes += num return num, nil } @@ -491,10 +491,9 @@ var _ = Describe("helper tests", func() { helper := newHelperTest(oidBatch, expectedScenario) err := doRestoreAgentInternal(helper) Expect(err).ToNot(HaveOccurred()) - Expect(discardedBytes).To(Equal(int64(18))) + Expect(helper.restoreData.discardedBytes).To(Equal(int64(18))) }) It("discard error data if skip file is discovered with single datafile", func() { - discardErr = io.EOF *singleDataFile = true *isResizeRestore = false *tocFile = testTocFile @@ -513,9 +512,9 @@ var _ = Describe("helper tests", func() { } helper := newHelperTest(oidBatch, expectedScenario) + helper.restoreData.discardErr = io.EOF err := doRestoreAgentInternal(helper) - Expect(err).To(Equal(discardErr)) - discardErr = nil + Expect(err).To(Equal(io.EOF)) }) It("calls Wait in waitForPlugin doRestoreAgent for single data file", func() { *singleDataFile = true