Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 83 additions & 92 deletions internal/watcher/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/diff"
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/synthesizer"
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -75,6 +76,7 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string

w.lastAuthHashes = make(map[string]string)
w.lastAuthContents = make(map[string]*coreauth.Auth)
w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth)
if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir); errResolveAuthDir != nil {
log.Errorf("failed to resolve auth directory for hash cache: %v", errResolveAuthDir)
} else if resolvedAuthDir != "" {
Expand All @@ -92,6 +94,17 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string
if errParse := json.Unmarshal(data, &auth); errParse == nil {
w.lastAuthContents[normalizedPath] = &auth
}
ctx := &synthesizer.SynthesisContext{
Config: cfg,
AuthDir: resolvedAuthDir,
Now: time.Now(),
IDGenerator: synthesizer.NewStableIDGenerator(),
}
if generated := synthesizer.SynthesizeAuthFile(ctx, path, data); len(generated) > 0 {
if pathAuths := authSliceToMap(generated); len(pathAuths) > 0 {
w.fileAuthsByPath[normalizedPath] = pathAuths
}
}
}
}
return nil
Expand Down Expand Up @@ -143,13 +156,14 @@ func (w *Watcher) addOrUpdateClient(path string) {
}

w.clientsMutex.Lock()

cfg := w.config
if cfg == nil {
if w.config == nil {
log.Error("config is nil, cannot add or update client")
w.clientsMutex.Unlock()
return
}
if w.fileAuthsByPath == nil {
w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth)
}
if prev, ok := w.lastAuthHashes[normalized]; ok && prev == curHash {
log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(path))
w.clientsMutex.Unlock()
Expand Down Expand Up @@ -177,34 +191,86 @@ func (w *Watcher) addOrUpdateClient(path string) {
}
w.lastAuthContents[normalized] = &newAuth

w.clientsMutex.Unlock() // Unlock before the callback

w.refreshAuthState(false)
oldByID := make(map[string]*coreauth.Auth, len(w.fileAuthsByPath[normalized]))
for id, a := range w.fileAuthsByPath[normalized] {
oldByID[id] = a
}

if w.reloadCallback != nil {
log.Debugf("triggering server update callback after add/update")
w.triggerServerUpdate(cfg)
// Build synthesized auth entries for this single file only.
sctx := &synthesizer.SynthesisContext{
Config: w.config,
AuthDir: w.effectiveAuthDir(),
Now: time.Now(),
IDGenerator: synthesizer.NewStableIDGenerator(),
}
generated := synthesizer.SynthesizeAuthFile(sctx, path, data)
newByID := authSliceToMap(generated)
if len(newByID) > 0 {
w.fileAuthsByPath[normalized] = newByID
} else {
delete(w.fileAuthsByPath, normalized)
}
updates := w.computePerPathUpdatesLocked(oldByID, newByID)
w.clientsMutex.Unlock()

w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
w.dispatchAuthUpdates(updates)
}

func (w *Watcher) removeClient(path string) {
normalized := w.normalizeAuthPath(path)
w.clientsMutex.Lock()

cfg := w.config
oldByID := make(map[string]*coreauth.Auth, len(w.fileAuthsByPath[normalized]))
for id, a := range w.fileAuthsByPath[normalized] {
oldByID[id] = a
}
delete(w.lastAuthHashes, normalized)
delete(w.lastAuthContents, normalized)
delete(w.fileAuthsByPath, normalized)

w.clientsMutex.Unlock() // Release the lock before the callback
updates := w.computePerPathUpdatesLocked(oldByID, map[string]*coreauth.Auth{})
w.clientsMutex.Unlock()

w.refreshAuthState(false)
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
w.dispatchAuthUpdates(updates)
}

if w.reloadCallback != nil {
log.Debugf("triggering server update callback after removal")
w.triggerServerUpdate(cfg)
func (w *Watcher) computePerPathUpdatesLocked(oldByID, newByID map[string]*coreauth.Auth) []AuthUpdate {
if w.currentAuths == nil {
w.currentAuths = make(map[string]*coreauth.Auth)
}
updates := make([]AuthUpdate, 0, len(oldByID)+len(newByID))
for id, newAuth := range newByID {
existing, ok := w.currentAuths[id]
if !ok {
w.currentAuths[id] = newAuth.Clone()
updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: newAuth.Clone()})
continue
}
if !authEqual(existing, newAuth) {
w.currentAuths[id] = newAuth.Clone()
updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: newAuth.Clone()})
}
}
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
for id := range oldByID {
if _, stillExists := newByID[id]; stillExists {
continue
}
delete(w.currentAuths, id)
updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id})
}
return updates
}

func authSliceToMap(auths []*coreauth.Auth) map[string]*coreauth.Auth {
byID := make(map[string]*coreauth.Auth, len(auths))
for _, a := range auths {
if a == nil || strings.TrimSpace(a.ID) == "" {
continue
}
byID[a.ID] = a
}
return byID
}

func (w *Watcher) loadFileClients(cfg *config.Config) int {
Expand Down Expand Up @@ -304,78 +370,3 @@ func (w *Watcher) persistAuthAsync(message string, paths ...string) {
}()
}

func (w *Watcher) stopServerUpdateTimer() {
w.serverUpdateMu.Lock()
defer w.serverUpdateMu.Unlock()
if w.serverUpdateTimer != nil {
w.serverUpdateTimer.Stop()
w.serverUpdateTimer = nil
}
w.serverUpdatePend = false
}

func (w *Watcher) triggerServerUpdate(cfg *config.Config) {
if w == nil || w.reloadCallback == nil || cfg == nil {
return
}
if w.stopped.Load() {
return
}

now := time.Now()

w.serverUpdateMu.Lock()
if w.serverUpdateLast.IsZero() || now.Sub(w.serverUpdateLast) >= serverUpdateDebounce {
w.serverUpdateLast = now
if w.serverUpdateTimer != nil {
w.serverUpdateTimer.Stop()
w.serverUpdateTimer = nil
}
w.serverUpdatePend = false
w.serverUpdateMu.Unlock()
w.reloadCallback(cfg)
return
}

if w.serverUpdatePend {
w.serverUpdateMu.Unlock()
return
}

delay := serverUpdateDebounce - now.Sub(w.serverUpdateLast)
if delay < 10*time.Millisecond {
delay = 10 * time.Millisecond
}
w.serverUpdatePend = true
if w.serverUpdateTimer != nil {
w.serverUpdateTimer.Stop()
w.serverUpdateTimer = nil
}
var timer *time.Timer
timer = time.AfterFunc(delay, func() {
if w.stopped.Load() {
return
}
w.clientsMutex.RLock()
latestCfg := w.config
w.clientsMutex.RUnlock()

w.serverUpdateMu.Lock()
if w.serverUpdateTimer != timer || !w.serverUpdatePend {
w.serverUpdateMu.Unlock()
return
}
w.serverUpdateTimer = nil
w.serverUpdatePend = false
if latestCfg == nil || w.reloadCallback == nil || w.stopped.Load() {
w.serverUpdateMu.Unlock()
return
}

w.serverUpdateLast = time.Now()
w.serverUpdateMu.Unlock()
w.reloadCallback(latestCfg)
})
w.serverUpdateTimer = timer
w.serverUpdateMu.Unlock()
}
9 changes: 8 additions & 1 deletion internal/watcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
)

var snapshotCoreAuthsFunc = snapshotCoreAuths

func (w *Watcher) setAuthUpdateQueue(queue chan<- AuthUpdate) {
w.clientsMutex.Lock()
defer w.clientsMutex.Unlock()
Expand Down Expand Up @@ -76,7 +78,11 @@ func (w *Watcher) dispatchRuntimeAuthUpdate(update AuthUpdate) bool {
}

func (w *Watcher) refreshAuthState(force bool) {
auths := w.SnapshotCoreAuths()
w.clientsMutex.RLock()
cfg := w.config
authDir := w.effectiveAuthDir()
w.clientsMutex.RUnlock()
auths := snapshotCoreAuthsFunc(cfg, authDir)
w.clientsMutex.Lock()
if len(w.runtimeAuths) > 0 {
for _, a := range w.runtimeAuths {
Expand Down Expand Up @@ -271,3 +277,4 @@ func snapshotCoreAuths(cfg *config.Config, authDir string) []*coreauth.Auth {

return out
}

13 changes: 8 additions & 5 deletions internal/watcher/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ func (w *Watcher) start(ctx context.Context) error {
}
log.Debugf("watching config file: %s", w.configPath)

if errAddAuthDir := w.watcher.Add(w.authDir); errAddAuthDir != nil {
log.Errorf("failed to watch auth directory %s: %v", w.authDir, errAddAuthDir)
authDir := w.effectiveAuthDir()
if errAddAuthDir := w.watcher.Add(authDir); errAddAuthDir != nil {
log.Errorf("failed to watch auth directory %s: %v", authDir, errAddAuthDir)
return errAddAuthDir
}
log.Debugf("watching auth directory: %s", w.authDir)
log.Debugf("watching auth directory: %s", authDir)

go w.processEvents(ctx)

Expand Down Expand Up @@ -69,10 +70,9 @@ func (w *Watcher) handleEvent(event fsnotify.Event) {
configOps := fsnotify.Write | fsnotify.Create | fsnotify.Rename
normalizedName := w.normalizeAuthPath(event.Name)
normalizedConfigPath := w.normalizeAuthPath(w.configPath)
normalizedAuthDir := w.normalizeAuthPath(w.authDir)
isConfigEvent := normalizedName == normalizedConfigPath && event.Op&configOps != 0
authOps := fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename
isAuthJSON := strings.HasPrefix(normalizedName, normalizedAuthDir) && strings.HasSuffix(normalizedName, ".json") && event.Op&authOps != 0
isAuthJSON := strings.HasSuffix(normalizedName, ".json") && pathBelongsToDir(event.Name, w.effectiveAuthDir()) && event.Op&authOps != 0
if !isConfigEvent && !isAuthJSON {
// Ignore unrelated files (e.g., cookie snapshots *.cookie) and other noise.
return
Expand Down Expand Up @@ -192,3 +192,6 @@ func (w *Watcher) shouldDebounceRemove(normalizedPath string, now time.Time) boo
w.clientsMutex.Unlock()
return false
}



Loading