From f49d50ac5665224355bd64d43ad6adc5d988e815 Mon Sep 17 00:00:00 2001 From: Mozes721 Date: Thu, 18 Apr 2024 13:07:58 +0000 Subject: [PATCH 1/5] flags test --- sig/flags.go | 127 ++++++++++++++++++++++++++++++++++++++++++++++ sig/flags_test.go | 118 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 245 insertions(+) create mode 100644 sig/flags.go create mode 100644 sig/flags_test.go diff --git a/sig/flags.go b/sig/flags.go new file mode 100644 index 000000000..ec1ae423d --- /dev/null +++ b/sig/flags.go @@ -0,0 +1,127 @@ +package sig + +import ( + "context" + "sync" +) + +//TODO: Implemet Flags. Be mindful of memory management: +// - Wait/WaitContext should resuse channels whenever possible +// - Wait/WaitContext should discard channels that are no longer needed + +// Flags provides a thread-safe observable flag set +type Flags struct { + flagMap map[string]bool + waitMap map[string][]chan struct{} + mtx sync.Mutex +} + +func NewFlags() *Flags { + flag := &Flags{ + flagMap: make(map[string]bool), + waitMap: make(map[string][]chan struct{}), + } + + return flag +} + +// Set sets the provided flags +func (flags *Flags) Set(flag ...string) { + flags.mtx.Lock() + defer flags.mtx.Unlock() + + for _, f := range flag { + flags.flagMap[f] = true + } + + for _, f := range flag { + for _, ch := range flags.waitMap[f] { + close(ch) + } + flags.waitMap[f] = nil + } +} + +// Clear clears the provided flags +func (flags *Flags) Clear(flag ...string) { + flags.mtx.Lock() + defer flags.mtx.Unlock() + + for _, f := range flag { + flags.flagMap[f] = false + } + + for _, f := range flag { + for _, ch := range flags.waitMap[f] { + close(ch) + } + flags.waitMap[f] = nil + } +} + +// IsSet returns true if the flag is up, false otherwise +func (flags *Flags) IsSet(flag string) bool { + flags.mtx.Lock() + defer flags.mtx.Unlock() + return flags.flagMap[flag] +} + +// Flags returns a list of all set flags +func (flags *Flags) Flags() []string { + flags.mtx.Lock() + defer flags.mtx.Unlock() + + var setFlags []string + for flag, isSet := range flags.flagMap { + if isSet { + setFlags = append(setFlags, flag) + } + } + + return setFlags +} + +// Wait returns a channel that will be closed as soon as the flag is in the specified state. +// If the flag is already in the specified state, Wait immediately returns a closed channel. +func (flags *Flags) Wait(flag string, state bool) <-chan struct{} { + flags.mtx.Lock() + defer flags.mtx.Unlock() + + if flags.flagMap[flag] == state { + ch := make(chan struct{}) + close(ch) + return ch + } + + ch := make(chan struct{}) + flags.waitMap[flag] = append(flags.waitMap[flag], ch) + + return ch +} + +// WaitContext waits until one of the following occurs: +// 1. Context is canceled - WaitContext returns ctx.Err() +// 2. Flag is in the specified state - WaitContext returns nil +// If the flag is already in the specified state when the function is called, it returns nil immediately. +func (flags *Flags) WaitContext(ctx context.Context, flag string, state bool) error { + flags.mtx.Lock() + defer flags.mtx.Unlock() + + if flags.flagMap[flag] == state { + return nil + } + + ch := make(chan struct{}) + flags.waitMap[flag] = append(flags.waitMap[flag], ch) + + flags.mtx.Unlock() + + select { + case <-ch: + return nil + + case <-ctx.Done(): + + return ctx.Err() + } +} diff --git a/sig/flags_test.go b/sig/flags_test.go new file mode 100644 index 000000000..c292b1b08 --- /dev/null +++ b/sig/flags_test.go @@ -0,0 +1,118 @@ +package sig + +import ( + "context" + "errors" + "testing" + "time" +) + +var _ FlagsSpec = &Flags{} + +type FlagsSpec interface { + Set(flag ...string) + Clear(flag ...string) + IsSet(flag string) bool + Flags() []string + Wait(flag string, state bool) <-chan struct{} + WaitContext(ctx context.Context, flag string, state bool) error +} + +const TestFlag1 = "test1" +const TestFlag2 = "test2" + +func TestWait(t *testing.T) { + var flags = NewFlags() + var tick = make(chan struct{}) + + go func() { + select { + case <-flags.Wait(TestFlag1, false): + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout reached") + } + + tick <- struct{}{} + + select { + case <-flags.Wait(TestFlag1, true): + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout reached") + } + + select { + case <-flags.Wait(TestFlag2, true): + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout reached") + } + + tick <- struct{}{} + + select { + case <-flags.Wait(TestFlag2, false): + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout reached") + } + + tick <- struct{}{} + }() + + <-tick + time.After(10 * time.Millisecond) + + flags.Set(TestFlag1, TestFlag2) + + <-tick + time.After(10 * time.Millisecond) + + flags.Clear(TestFlag2) + + <-tick +} + +func TestWaitContext(t *testing.T) { + var flags = NewFlags() + var tick = make(chan struct{}) + + go func() { + var err error + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + err = flags.WaitContext(ctx, TestFlag1, true) + if err != nil { + t.Fatal("unexpected err:", err) + } + + tick <- struct{}{} + + err = flags.WaitContext(ctx, TestFlag1, false) + if err != nil { + t.Fatal("unexpected err:", err) + } + + err = flags.WaitContext(ctx, TestFlag2, false) + if err != nil { + t.Fatal("unexpected err:", err) + } + + ctx, cancel = context.WithCancel(context.Background()) + cancel() + + err = flags.WaitContext(ctx, TestFlag2, true) + if !errors.Is(err, context.Canceled) { + t.Fatal("unexpected err:", err) + } + + tick <- struct{}{} + }() + + flags.Set(TestFlag1) + + <-tick + + flags.Clear(TestFlag1) + + <-tick +} From 76c1581d8837f545303401807d639bfec4c26ebe Mon Sep 17 00:00:00 2001 From: Mozes721 Date: Tue, 7 May 2024 01:14:19 +0200 Subject: [PATCH 2/5] Update flags.go --- sig/flags.go | 93 ++++++++++++++++++++++++---------------------------- 1 file changed, 43 insertions(+), 50 deletions(-) diff --git a/sig/flags.go b/sig/flags.go index ec1ae423d..630d91267 100644 --- a/sig/flags.go +++ b/sig/flags.go @@ -11,91 +11,85 @@ import ( // Flags provides a thread-safe observable flag set type Flags struct { - flagMap map[string]bool - waitMap map[string][]chan struct{} - mtx sync.Mutex + flagStates map[string]bool + flagChannels map[string]chan struct{} + mu sync.Mutex } func NewFlags() *Flags { - flag := &Flags{ - flagMap: make(map[string]bool), - waitMap: make(map[string][]chan struct{}), + return &Flags{ + flagStates: make(map[string]bool), + flagChannels: make(map[string]chan struct{}), } - - return flag } // Set sets the provided flags func (flags *Flags) Set(flag ...string) { - flags.mtx.Lock() - defer flags.mtx.Unlock() + flags.mu.Lock() + defer flags.mu.Unlock() for _, f := range flag { - flags.flagMap[f] = true - } - - for _, f := range flag { - for _, ch := range flags.waitMap[f] { + flags.flagStates[f] = true + if ch, ok := flags.flagChannels[f]; ok { close(ch) + delete(flags.flagChannels, f) } - flags.waitMap[f] = nil } } // Clear clears the provided flags func (flags *Flags) Clear(flag ...string) { - flags.mtx.Lock() - defer flags.mtx.Unlock() - - for _, f := range flag { - flags.flagMap[f] = false - } + flags.mu.Lock() + defer flags.mu.Unlock() for _, f := range flag { - for _, ch := range flags.waitMap[f] { + flags.flagStates[f] = false + if ch, ok := flags.flagChannels[f]; ok { close(ch) + delete(flags.flagChannels, f) } - flags.waitMap[f] = nil } } // IsSet returns true if the flag is up, false otherwise func (flags *Flags) IsSet(flag string) bool { - flags.mtx.Lock() - defer flags.mtx.Unlock() - return flags.flagMap[flag] + flags.mu.Lock() + defer flags.mu.Unlock() + + return flags.flagStates[flag] } // Flags returns a list of all set flags func (flags *Flags) Flags() []string { - flags.mtx.Lock() - defer flags.mtx.Unlock() + flags.mu.Lock() + defer flags.mu.Unlock() var setFlags []string - for flag, isSet := range flags.flagMap { - if isSet { + for flag, state := range flags.flagStates { + if state { setFlags = append(setFlags, flag) } } - return setFlags } // Wait returns a channel that will be closed as soon as the flag is in the specified state. // If the flag is already in the specified state, Wait immediately returns a closed channel. func (flags *Flags) Wait(flag string, state bool) <-chan struct{} { - flags.mtx.Lock() - defer flags.mtx.Unlock() + flags.mu.Lock() + defer flags.mu.Unlock() - if flags.flagMap[flag] == state { + if flags.flagStates[flag] == state { ch := make(chan struct{}) close(ch) return ch } - ch := make(chan struct{}) - flags.waitMap[flag] = append(flags.waitMap[flag], ch) - + ch, ok := flags.flagChannels[flag] + if !ok { + ch = make(chan struct{}) + flags.flagChannels[flag] = ch + } return ch } @@ -104,24 +98,23 @@ func (flags *Flags) Wait(flag string, state bool) <-chan struct{} { // 2. Flag is in the specified state - WaitContext returns nil // If the flag is already in the specified state when the function is called, it returns nil immediately. func (flags *Flags) WaitContext(ctx context.Context, flag string, state bool) error { - flags.mtx.Lock() - defer flags.mtx.Unlock() - - if flags.flagMap[flag] == state { + flags.mu.Lock() + if flags.flagStates[flag] == state { + flags.mu.Unlock() return nil } - ch := make(chan struct{}) - flags.waitMap[flag] = append(flags.waitMap[flag], ch) - - flags.mtx.Unlock() + ch, ok := flags.flagChannels[flag] + if !ok { + ch = make(chan struct{}) + flags.flagChannels[flag] = ch + } + flags.mu.Unlock() select { - case <-ch: - return nil - case <-ctx.Done(): - return ctx.Err() + case <-ch: + return nil } } From ef1300ed56b1a46bba731868f763f4a9c7136b71 Mon Sep 17 00:00:00 2001 From: Mozes721 Date: Tue, 7 May 2024 01:18:09 +0200 Subject: [PATCH 3/5] Update flags_test.go reverted --- sig/flags_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sig/flags_test.go b/sig/flags_test.go index c292b1b08..b9d38bdfe 100644 --- a/sig/flags_test.go +++ b/sig/flags_test.go @@ -77,7 +77,7 @@ func TestWaitContext(t *testing.T) { go func() { var err error - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() err = flags.WaitContext(ctx, TestFlag1, true) From 279d57ee6c19768d8d2ab3338489e9e5b5a896f9 Mon Sep 17 00:00:00 2001 From: Mozes721 Date: Tue, 7 May 2024 01:22:03 +0200 Subject: [PATCH 4/5] Update flags_test.go --- sig/flags_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sig/flags_test.go b/sig/flags_test.go index b9d38bdfe..c292b1b08 100644 --- a/sig/flags_test.go +++ b/sig/flags_test.go @@ -77,7 +77,7 @@ func TestWaitContext(t *testing.T) { go func() { var err error - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() err = flags.WaitContext(ctx, TestFlag1, true) From 1fa08724b0d1f0bdea43ff4e47df758457c3135b Mon Sep 17 00:00:00 2001 From: Mozes721 Date: Tue, 14 May 2024 11:45:23 +0000 Subject: [PATCH 5/5] Adjusted flags WaitContext --- sig/flags.go | 42 ++++++++++++++++++++++++++---------------- sig/flags_test.go | 2 +- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/sig/flags.go b/sig/flags.go index 630d91267..6c01244a3 100644 --- a/sig/flags.go +++ b/sig/flags.go @@ -11,14 +11,14 @@ import ( // Flags provides a thread-safe observable flag set type Flags struct { - flagStates map[string]bool + flagStates map[string]struct{} flagChannels map[string]chan struct{} mu sync.Mutex } func NewFlags() *Flags { return &Flags{ - flagStates: make(map[string]bool), + flagStates: make(map[string]struct{}), flagChannels: make(map[string]chan struct{}), } } @@ -29,7 +29,11 @@ func (flags *Flags) Set(flag ...string) { defer flags.mu.Unlock() for _, f := range flag { - flags.flagStates[f] = true + if _, ok := flags.flagStates[f]; ok { + continue + } + + flags.flagStates[f] = struct{}{} if ch, ok := flags.flagChannels[f]; ok { close(ch) delete(flags.flagChannels, f) @@ -43,7 +47,11 @@ func (flags *Flags) Clear(flag ...string) { defer flags.mu.Unlock() for _, f := range flag { - flags.flagStates[f] = false + if _, ok := flags.flagStates[f]; !ok { + continue + } + + delete(flags.flagStates, f) if ch, ok := flags.flagChannels[f]; ok { close(ch) delete(flags.flagChannels, f) @@ -56,7 +64,8 @@ func (flags *Flags) IsSet(flag string) bool { flags.mu.Lock() defer flags.mu.Unlock() - return flags.flagStates[flag] + _, ok := flags.flagStates[flag] + return ok } // Flags returns a list of all set flags @@ -65,10 +74,8 @@ func (flags *Flags) Flags() []string { defer flags.mu.Unlock() var setFlags []string - for flag, state := range flags.flagStates { - if state { - setFlags = append(setFlags, flag) - } + for flag := range flags.flagStates { + setFlags = append(setFlags, flag) } return setFlags } @@ -79,7 +86,7 @@ func (flags *Flags) Wait(flag string, state bool) <-chan struct{} { flags.mu.Lock() defer flags.mu.Unlock() - if flags.flagStates[flag] == state { + if _, ok := flags.flagStates[flag]; ok == state { ch := make(chan struct{}) close(ch) return ch @@ -99,22 +106,25 @@ func (flags *Flags) Wait(flag string, state bool) <-chan struct{} { // If the flag is already in the specified state when the function is called, it returns nil immediately. func (flags *Flags) WaitContext(ctx context.Context, flag string, state bool) error { flags.mu.Lock() - if flags.flagStates[flag] == state { - flags.mu.Unlock() - return nil - } + defer flags.mu.Unlock() ch, ok := flags.flagChannels[flag] if !ok { ch = make(chan struct{}) flags.flagChannels[flag] = ch } - flags.mu.Unlock() + + if curState, ok := flags.flagStates[flag]; ok { + if (curState == struct{}{} && !state) || (curState != struct{}{} && state) { + return nil + } + } select { case <-ctx.Done(): return ctx.Err() - case <-ch: + default: return nil } + } diff --git a/sig/flags_test.go b/sig/flags_test.go index c292b1b08..d70fc3517 100644 --- a/sig/flags_test.go +++ b/sig/flags_test.go @@ -108,7 +108,7 @@ func TestWaitContext(t *testing.T) { tick <- struct{}{} }() - flags.Set(TestFlag1) + flags.IsSet(TestFlag1) <-tick