Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.accumulo.core.Constants;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
Expand All @@ -47,6 +51,13 @@
public class ZooCache {
private static final Logger log = LoggerFactory.getLogger(ZooCache.class);

public final static Pattern TABLE_CONFIG_DIR_PATTERN = Pattern.compile(
"(/accumulo/[0-9a-z-]+)(" + Constants.ZTABLES + ")(/.*)(" + Constants.ZTABLE_CONF + ")");

public final static Pattern TABLE_SETTING_CONFIG_PATTERN =
Pattern.compile("(/accumulo/[0-9a-z-]+)(" + Constants.ZTABLES + ")(/.*)("
+ Constants.ZTABLE_CONF + ")/(table.*|tserver.*)");

private final ZCacheWatcher watcher = new ZCacheWatcher();
private final Watcher externalWatcher;

Expand All @@ -57,6 +68,7 @@ public class ZooCache {
private final HashMap<String,byte[]> cache;
private final HashMap<String,ZcStat> statCache;
private final HashMap<String,List<String>> childrenCache;
private final HashSet<String> znodeExists = new HashSet<>();

private final ZooReader zReader;
private final SecureRandom secureRandom = new SecureRandom();
Expand Down Expand Up @@ -155,8 +167,23 @@ public void process(WatchedEvent event) {
}

switch (event.getType()) {
case NodeDataChanged:
case NodeChildrenChanged:
if (event.getPath().endsWith("conf")) {
Matcher confDirMatcher = TABLE_CONFIG_DIR_PATTERN.matcher(event.getPath());
if (confDirMatcher.matches()) {
try {
// If a table config parameter without a watch on it is deleted
// this is how it gets cleared from ZooCache
clear(event.getPath());
getZooKeeper().exists(event.getPath(), watcher);
log.info("NodeChildrenChanged: resetting watcher for " + event.getPath());

} catch (KeeperException | InterruptedException e) {
log.error("Could not reset watcher on parent node: " + event.getPath());
}
}
}
case NodeDataChanged:
case NodeCreated:
case NodeDeleted:
remove(event.getPath());
Expand Down Expand Up @@ -402,11 +429,23 @@ public byte[] run() throws KeeperException, InterruptedException {
* a special case that looks for Code.NONODE in the KeeperException, then non-existence can
* not be cached.
*/

cacheWriteLock.lock();
try {

final ZooKeeper zooKeeper = getZooKeeper();
Stat stat = zooKeeper.exists(zPath, watcher);

byte[] data = null;

// watched will be false if it is a table config that is not
// actually a node inside Zookeeper
boolean watched = isWatched(zooKeeper, zPath);

if (!watched)
return data;

Stat stat = zooKeeper.exists(zPath, watcher);

if (stat == null) {
if (log.isTraceEnabled()) {
log.trace("zookeeper did not contain {}", zPath);
Expand All @@ -423,8 +462,10 @@ public byte[] run() throws KeeperException, InterruptedException {
(data == null ? null : new String(data, UTF_8)));
}
}

put(zPath, data, zstat);
copyStats(status, zstat);

return data;
} finally {
cacheWriteLock.unlock();
Expand All @@ -435,6 +476,46 @@ public byte[] run() throws KeeperException, InterruptedException {
return zr.retry();
}

/*
* Some of the table configuration parameters are possibly not in ZooKeeper yet and even though
* you are able you should not put a watcher on them to observe the NodeCreated event. If you do
* that and the ZNode for that configuration is never created, you will have no way to delete the
* watcher you created for it in Zookeeper. Unremovable watchers that are created for for table
* configs that never actually have a Znode is causing memory leaks in Accumulo instances that
* create and delete tables at a high rate.
*/

private boolean isWatched(ZooKeeper zooKeeper, String zPath)
throws KeeperException, InterruptedException {

boolean watched = true;

if (znodeExists.contains(zPath))
return watched;

Matcher configMatcher = TABLE_SETTING_CONFIG_PATTERN.matcher(zPath);

if (configMatcher.matches()) {
Stat stat;
try {
stat = zooKeeper.exists(zPath, false);
} catch (KeeperException.BadVersionException | KeeperException.NoNodeException e1) {
throw e1;
}

if (stat != null) {
watched = true;
synchronized (znodeExists) {
znodeExists.add(zPath);
}
} else {
watched = false;
}
}

return watched;
}

/**
* Helper method to copy stats from the cached stat into userStat
*
Expand Down Expand Up @@ -468,6 +549,7 @@ private void remove(String zPath) {
cache.remove(zPath);
childrenCache.remove(zPath);
statCache.remove(zPath);
znodeExists.remove(zPath);

immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache);
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.tables;

Expand Down