-
Notifications
You must be signed in to change notification settings - Fork 7
Open
Description
I was trying to remove keys from a group, but I found that I couldn't. So I had to debug the source code to find out what was happening. The source code is below:
// RemoveKeys removes several keys from the group in one call.
//
// It records the request in g.Stats, splits the keys by the peer that
// g.instance.PickPeer selects for each one, calls g.LocalRemove for the keys
// that PickPeer reports as not remote, and then sends RemoveKeys requests to
// the other peers concurrently. It returns nil when keys is empty; otherwise
// it returns multiErr.NilOrError() after every peer request has reported back.
//
// NOTE(review): g.LocalRemove is only called for keys in localKeys, i.e. keys
// for which PickPeer returned isRemote == false. Keys that belong to a remote
// peer are never passed to g.LocalRemove in this function, so a copy of such a
// key held by this instance is not removed here. Remove (single key) calls
// g.LocalRemove for the key regardless of which peer owns it. A likely fix is
// to call g.LocalRemove for every entry of keys — confirm against the intended
// cache semantics.
func (g *group) RemoveKeys(ctx context.Context, keys ...string) error {
// Nothing to do (and nothing counted in the stats) for an empty key list.
if len(keys) == 0 {
return nil
}
// One request, len(keys) keys, counted before any removal is attempted.
g.Stats.RemoveKeysRequests.Add(1)
g.Stats.RemovedKeys.Add(int64(len(keys)))
// Partition the keys: remote-owned keys are batched per owning peer,
// everything else is collected in localKeys.
keysByOwner := make(map[peer.Client][]string)
var localKeys []string
for _, key := range keys {
owner, isRemote := g.instance.PickPeer(key)
if isRemote {
keysByOwner[owner] = append(keysByOwner[owner], key)
} else {
localKeys = append(localKeys, key)
}
}
// Only the locally-owned keys are removed from this instance (see the
// NOTE(review) in the function comment).
for _, key := range localKeys {
g.LocalRemove(key)
}
multiErr := &MultiError{}
// Unbuffered: each goroutine below blocks on its send until the collecting
// loop at the end of the function receives the result.
errCh := make(chan error)
// Send removeKeys requests to owners (parallel)
// NOTE(review): each owner is sent only the keys it owns (k), not the full
// key list, unlike the non-owner peers below — confirm this is intended.
var wg sync.WaitGroup
for owner, ownerKeys := range keysByOwner {
wg.Add(1)
go func(p peer.Client, k []string) {
errCh <- p.RemoveKeys(ctx, &pb.RemoveKeysRequest{
Group: &g.name,
Keys: k,
})
wg.Done()
}(owner, ownerKeys)
}
// Every remaining peer (not this instance, not an owner handled above) is
// sent the complete key list.
allPeers := g.instance.getAllPeers()
for _, p := range allPeers {
if p.PeerInfo().IsSelf {
continue
}
if _, isOwner := keysByOwner[p]; isOwner {
continue
}
wg.Add(1)
go func(peer peer.Client) {
errCh <- peer.RemoveKeys(ctx, &pb.RemoveKeysRequest{
Group: &g.name,
Keys: keys,
})
wg.Done()
}(p)
}
// Close errCh once all senders have finished so the range loop below ends.
go func() {
wg.Wait()
close(errCh)
}()
// Collect only the non-nil results.
for err := range errCh {
if err != nil {
multiErr.Add(err)
}
}
return multiErr.NilOrError()
}From the above code, it's seem that local cache will not be removed if i request a key that not belong to this Peer. I don't known if i am right, is anyone can help?
When I look at the source code of Remove, everything looks good, because it removes the key from the local cache first and then tries to ask the remote peers to remove it from their caches.
// Remove removes a single key from the group.
//
// The work runs inside g.removeGroup.Do keyed by key (presumably so that
// concurrent removals of the same key share one execution — confirm against
// the type of removeGroup). The order is: the owning peer first when PickPeer
// reports it as remote, then this instance via g.LocalRemove, then every
// other peer returned by g.instance.getAllPeers concurrently. The returned
// error is whatever the Do callback returned.
func (g *group) Remove(ctx context.Context, key string) error {
_, err := g.removeGroup.Do(key, func() (interface{}, error) {
// Remove from key owner first
owner, isRemote := g.instance.PickPeer(key)
if isRemote {
// A failure on the owner aborts the whole removal: the local cache and
// the other peers are left untouched.
if err := g.removeFromPeer(ctx, owner, key); err != nil {
return nil, err
}
}
// Remove from our cache next
g.LocalRemove(key)
wg := sync.WaitGroup{}
// Unbuffered: each goroutine blocks on its send until the collecting loop
// below receives the result.
errCh := make(chan error)
// Asynchronously clear the key from all hot and main caches of peers
// The requests run concurrently, but this function still waits for every
// result before returning (the range over errCh ends only after close).
// NOTE(review): unlike RemoveKeys there is no PeerInfo().IsSelf check here;
// whether this instance's own entry is skipped depends on what PickPeer
// returns as owner for a local key — confirm.
for _, p := range g.instance.getAllPeers() {
// avoid deleting from owner a second time
if p == owner {
continue
}
wg.Add(1)
go func(p peer.Client) {
errCh <- g.removeFromPeer(ctx, p, key)
wg.Done()
}(p)
}
// Close errCh once all senders have finished so the range loop below ends.
go func() {
wg.Wait()
close(errCh)
}()
m := &MultiError{}
// NOTE(review): every result, including nil, is passed to m.Add, whereas
// RemoveKeys filters out nil first — presumably Add ignores nil; confirm.
for err := range errCh {
m.Add(err)
}
return nil, m.NilOrError()
})
return err
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels