@@ -3,6 +3,10 @@ package storage
33import (
44 "ashishkujoy/queue/internal/config"
55 "fmt"
6+ "os"
7+ "slices"
8+ "strconv"
9+ "strings"
610 "sync"
711)
812
@@ -35,6 +39,77 @@ func NewSegments(config *config.Config, index *Index) (*Segments, error) {
3539 }, nil
3640}
3741
42+ // RestoreSegments restores segments from the given configuration and index.
43+ func RestoreSegments (c * config.Config , index * Index ) (* Segments , error ) {
44+ segmentIds , err := getSegmentIds (c .SegmentsRoot ())
45+ if err != nil {
46+ return nil , err
47+ }
48+
49+ closedSegments , err2 := restoreSegmentsById (c , segmentIds )
50+ if err2 != nil {
51+ return nil , err2
52+ }
53+
54+ activeSegmentId , activeSegment , err := createActiveSegment (c , segmentIds , err )
55+ if err != nil {
56+ return nil , err
57+ }
58+ return & Segments {
59+ config : c ,
60+ active : activeSegment ,
61+ id : activeSegmentId ,
62+ index : index ,
63+ closedSegments : closedSegments ,
64+ mu : & sync.Mutex {},
65+ }, nil
66+ }
67+
68+ func createActiveSegment (c * config.Config , segmentIds []int , err error ) (int , * Segment , error ) {
69+ activeSegmentId := 0
70+ if len (segmentIds ) != 0 {
71+ activeSegmentId = segmentIds [len (segmentIds )- 1 ]
72+ }
73+ activeSegment , err := NewSegment (activeSegmentId + 1 , c )
74+ if err != nil {
75+ return 0 , nil , err
76+ }
77+ return activeSegmentId + 1 , activeSegment , nil
78+ }
79+
80+ func restoreSegmentsById (c * config.Config , segmentIds []int ) ([]* Segment , error ) {
81+ var closedSegments []* Segment
82+ for _ , segmentId := range segmentIds {
83+ segment , err := RestoreSegment (segmentId , c )
84+ if err != nil {
85+ return nil , err
86+ }
87+ closedSegments = append (closedSegments , segment )
88+ }
89+ return closedSegments , nil
90+ }
91+
92+ func getSegmentIds (segmentsRoot string ) ([]int , error ) {
93+ entries , err := os .ReadDir (segmentsRoot )
94+ if err != nil {
95+ return nil , err
96+ }
97+ var segmentIds []int
98+ for _ , entry := range entries {
99+ if ! strings .HasPrefix (entry .Name (), "segment-" ) {
100+ continue
101+ }
102+ segmentIdStr := strings .TrimPrefix (entry .Name (), "segment-" )
103+ id , err := strconv .Atoi (segmentIdStr )
104+ if err != nil {
105+ return nil , err
106+ }
107+ segmentIds = append (segmentIds , id )
108+ }
109+ slices .Sort (segmentIds )
110+ return segmentIds , nil
111+ }
112+
38113// Append appends data to the active segment.
39114// If the active segment is full, it rolls over to a new segment.
40115func (s * Segments ) Append (data []byte ) (int , error ) {
0 commit comments