Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dockerplugin.db
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ blkio.single value:COUNTER:0:U
cpu.percpu.usage value:COUNTER:0:U
cpu.usage total:COUNTER:0:U, kernelmode:COUNTER:0:U, usermode:COUNTER:0:U, system:COUNTER:0:U
cpu.throttling_data periods:COUNTER:0:U, throttled_periods:COUNTER:0:U, throttled_time:COUNTER:0:U
network.usage rx_bytes:COUNTER:0:U, rx_dropped:COUNTER:0:U, rx_errors:COUNTER:0:U, rx_packets:COUNTER:0:U, tx_bytes:COUNTER:0:U, tx_dropped:COUNTER:0:U, tx_errors:COUNTER:0:U, tx_packets:COUNTER:0:U
network.usage rx_bytes:GAUGE:0:U, tx_bytes:GAUGE:0:U
memory.usage limit:GAUGE:0:U, max:GAUGE:0:U, total:GAUGE:0:U
memory.stats value:GAUGE:0:U
memory.percent value:GAUGE:0:150
Expand Down
65 changes: 50 additions & 15 deletions dockerplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def read(cls, container, stats, t):

class CpuStats(Stats):
@classmethod
def read(cls, container, stats, t):
def read(cls, container, pre_stats, stats, t):
cpu_stats = stats['cpu_stats']
cpu_usage = cpu_stats['cpu_usage']

Expand All @@ -128,21 +128,29 @@ def read(cls, container, stats, t):
# CPU Percentage based on calculateCPUPercent Docker method
# https://github.com/docker/docker/blob/master/api/client/stats.go
cpu_percent = 0.0
if 'precpu_stats' in stats:
precpu_stats = stats['precpu_stats']
precpu_usage = precpu_stats['cpu_usage']
cpu_delta = cpu_usage['total_usage'] - precpu_usage['total_usage']
system_delta = system_cpu_usage - precpu_stats['system_cpu_usage']

cur_stats_t = time.mktime(dateutil.parser.parse(stats['read']).timetuple())

if pre_stats:
precpu_total_usage = pre_stats['cpu_stats']['cpu_usage']['total_usage']
precpu_system_usage = pre_stats['cpu_stats']['system_cpu_usage']
pre_stats_t = time.mktime(dateutil.parser.parse(pre_stats['read']).timetuple())

cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - precpu_total_usage
system_delta = stats['cpu_stats']['system_cpu_usage'] - precpu_system_usage
t_delta = cur_stats_t - pre_stats_t

if system_delta > 0 and cpu_delta > 0:
cpu_percent = 100.0 * cpu_delta / system_delta * len(percpu)
cpu_percent = 100.0 * float(cpu_delta) / system_delta * len(percpu)

cls.emit(container, "cpu.percent", ["%.2f" % (cpu_percent)], t=t)


class NetworkStats(Stats):
@classmethod
def read(cls, container, stats, t):
items = sorted(stats['network'].items())
cls.emit(container, 'network.usage', [x[1] for x in items], t=t)
cls.emit(container, 'network.usage', [stats['network']['rx_bytes'], stats['network']['tx_bytes']], t=t)


class MemoryStats(Stats):
Expand All @@ -151,6 +159,7 @@ def read(cls, container, stats, t):
mem_stats = stats['memory_stats']
values = [mem_stats['limit'], mem_stats['max_usage'],
mem_stats['usage']]

cls.emit(container, 'memory.usage', values, t=t)

for key, value in mem_stats['stats'].items():
Expand Down Expand Up @@ -189,6 +198,8 @@ def __init__(self, container, client):
self._client = client
self._feed = None
self._stats = None
self._pre_stats = None
self._tmp_stats = None

# Automatically start stats reading thread
self.start()
Expand All @@ -198,12 +209,16 @@ def run(self):
.format(container=_c(self._container)))

failures = 0
count = 0

while not self.stop:
try:
if not self._feed:
self._feed = self._client.stats(self._container,
decode=True)
self._stats = self._feed.next()
self._pre_stats = self._tmp_stats

# Reset failure count on successfull read from the stats API.
failures = 0
except Exception, e:
Expand All @@ -224,6 +239,8 @@ def run(self):
# survive transient Docker daemon errors/unavailabilities.
self._feed = None

self._tmp_stats = self._stats

collectd.info('Stopped stats gathering for {container}.'
.format(container=_c(self._container)))

Expand All @@ -235,6 +252,11 @@ def stats(self):
pass
return self._stats

@property
def pre_stats(self):

return self._pre_stats


class DockerPlugin:
"""
Expand All @@ -244,6 +266,7 @@ class DockerPlugin:

DEFAULT_BASE_URL = 'unix://var/run/docker.sock'
DEFAULT_DOCKER_TIMEOUT = 5
DEFAULT_INTERVAL = 1

# The stats endpoint is only supported by API >= 1.17
MIN_DOCKER_API_VERSION = '1.17'
Expand All @@ -254,6 +277,7 @@ def __init__(self, docker_url=None):
self.docker_url = docker_url or DockerPlugin.DEFAULT_BASE_URL
self.timeout = DockerPlugin.DEFAULT_DOCKER_TIMEOUT
self.capture = False
self.interval = DockerPlugin.DEFAULT_INTERVAL
self.stats = {}

def configure_callback(self, conf):
Expand All @@ -262,6 +286,10 @@ def configure_callback(self, conf):
self.docker_url = node.values[0]
elif node.key == 'Timeout':
self.timeout = int(node.values[0])
elif node.key == 'Interval':
self.interval = int(float(node.values[0]))
else:
collectd.warning('Unknown config key: {}'.format(node.key))

def init_callback(self):
self.client = docker.Client(
Expand All @@ -281,12 +309,14 @@ def init_callback(self):
.format(url=self.docker_url))
return False

collectd.register_read(self.read_callback)
collectd.register_read(self.read_callback, self.interval)
collectd.info(('Collecting stats about Docker containers from {url} '
'(API version {version}; timeout: {timeout}s).')
'(API version {version}; timeout: {timeout}s; interval: {interval}s).')
.format(url=self.docker_url,
version=version,
timeout=self.timeout))
timeout=self.timeout,
interval = self.interval))

return True

def read_callback(self):
Expand Down Expand Up @@ -314,9 +344,14 @@ def read_callback(self):

# Get and process stats from the container.
stats = self.stats[container['Id']].stats
pre_stats = self.stats[container['Id']].pre_stats

t = stats['read']
for klass in self.CLASSES:
klass.read(container, stats, t)
if klass == CpuStats:
klass.read(container, pre_stats, stats, t)
else:
klass.read(container, stats, t)
except Exception, e:
collectd.warning(('Error getting stats for container '
'{container}: {msg}')
Expand All @@ -335,8 +370,8 @@ def dispatch(self):
identifier += '/' + self.type
if getattr(self, 'type_instance', None):
identifier += '-' + self.type_instance
print 'PUTVAL', identifier, \
':'.join(map(str, [int(self.time)] + self.values))
#print 'PUTVAL', identifier, \
# ':'.join(map(str, [int(self.time)] + self.values))

class ExecCollectd:
def Values(self):
Expand All @@ -348,7 +383,7 @@ def warning(self, msg):
def info(self, msg):
print 'INFO:', msg

def register_read(self, docker_plugin):
def register_read(self, docker_plugin, interval):
pass

collectd = ExecCollectd()
Expand Down