A bug of removing keys #27

@zhangchuanben

I was trying to remove keys from a group, but I found that I couldn't. I had to debug the source code to find out what was happening. The relevant source code is below:

func (g *group) RemoveKeys(ctx context.Context, keys ...string) error {
	if len(keys) == 0 {
		return nil
	}

	g.Stats.RemoveKeysRequests.Add(1)
	g.Stats.RemovedKeys.Add(int64(len(keys)))

	keysByOwner := make(map[peer.Client][]string)
	var localKeys []string

	for _, key := range keys {
		owner, isRemote := g.instance.PickPeer(key)
		if isRemote {
			keysByOwner[owner] = append(keysByOwner[owner], key)
		} else {
			localKeys = append(localKeys, key)
		}
	}

	for _, key := range localKeys {
		g.LocalRemove(key)
	}

	multiErr := &MultiError{}
	errCh := make(chan error)

	// Send removeKeys requests to owners (parallel)
	var wg sync.WaitGroup
	for owner, ownerKeys := range keysByOwner {
		wg.Add(1)
		go func(p peer.Client, k []string) {
			errCh <- p.RemoveKeys(ctx, &pb.RemoveKeysRequest{
				Group: &g.name,
				Keys:  k,
			})
			wg.Done()
		}(owner, ownerKeys)
	}

	allPeers := g.instance.getAllPeers()
	for _, p := range allPeers {
		if p.PeerInfo().IsSelf {
			continue
		}
		if _, isOwner := keysByOwner[p]; isOwner {
			continue
		}

		wg.Add(1)
		go func(peer peer.Client) {
			errCh <- peer.RemoveKeys(ctx, &pb.RemoveKeysRequest{
				Group: &g.name,
				Keys:  keys,
			})
			wg.Done()
		}(p)
	}

	go func() {
		wg.Wait()
		close(errCh)
	}()

	for err := range errCh {
		if err != nil {
			multiErr.Add(err)
		}
	}

	return multiErr.NilOrError()
}

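From the above code, it seems that the local cache entry will not be removed if I request removal of a key that does not belong to this peer: g.LocalRemove is only called for keys in localKeys, and a key whose owner is remote never ends up in that slice. I don't know if I am right. Can anyone help?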

When I look at the source code of Remove, everything looks good, because it always removes the key from the local cache (g.LocalRemove), whoever owns it, in addition to requesting removal from the owner and the other peers.

func (g *group) Remove(ctx context.Context, key string) error {
	_, err := g.removeGroup.Do(key, func() (interface{}, error) {

		// Remove from key owner first
		owner, isRemote := g.instance.PickPeer(key)
		if isRemote {
			if err := g.removeFromPeer(ctx, owner, key); err != nil {
				return nil, err
			}
		}
		// Remove from our cache next
		g.LocalRemove(key)
		wg := sync.WaitGroup{}
		errCh := make(chan error)

		// Asynchronously clear the key from all hot and main caches of peers
		for _, p := range g.instance.getAllPeers() {
			// avoid deleting from owner a second time
			if p == owner {
				continue
			}

			wg.Add(1)
			go func(p peer.Client) {
				errCh <- g.removeFromPeer(ctx, p, key)
				wg.Done()
			}(p)
		}
		go func() {
			wg.Wait()
			close(errCh)
		}()

		m := &MultiError{}
		for err := range errCh {
			m.Add(err)
		}

		return nil, m.NilOrError()
	})
	return err
}
