From b609b8ff431178e01a33b160b0366a8d75cbd04c Mon Sep 17 00:00:00 2001 From: Charles Hooper Date: Mon, 30 Jan 2012 18:04:04 -0500 Subject: [PATCH 1/2] Add custom send_interval support for collectd --- collectd.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/collectd.py b/collectd.py index 2fee145..a7449d7 100644 --- a/collectd.py +++ b/collectd.py @@ -17,7 +17,6 @@ logger = logging.getLogger("collectd") -SEND_INTERVAL = 10 # seconds MAX_PACKET_SIZE = 1024 # bytes PLUGIN_TYPE = "gauge" @@ -68,19 +67,19 @@ def pack(id, value): else: raise AssertionError("invalid type code " + str(id)) -def message_start(when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any"): +def message_start(when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any", send_interval=10): return "".join([ pack(TYPE_HOST, host), pack(TYPE_TIME, when or time.time()), pack(TYPE_PLUGIN, plugin_name), pack(TYPE_PLUGIN_INSTANCE, plugin_inst), pack(TYPE_TYPE, PLUGIN_TYPE), - pack(TYPE_INTERVAL, SEND_INTERVAL) + pack(TYPE_INTERVAL, send_interval) ]) -def messages(counts, when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any"): +def messages(counts, when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any", send_interval=10): packets = [] - start = message_start(when, host, plugin_inst, plugin_name) + start = message_start(when, host, plugin_inst, plugin_name, send_interval) parts = [pack(name, count) for name,count in counts.items()] parts = [p for p in parts if len(start) + len(p) <= MAX_PACKET_SIZE] if parts: @@ -158,7 +157,7 @@ class Connection(object): @synchronized def __new__(cls, hostname = socket.gethostname(), collectd_host = "localhost", collectd_port = 25826, - plugin_inst = "", plugin_name = "any"): + plugin_inst = "", plugin_name = "any", send_interval=10): id = (hostname, collectd_host, collectd_port, plugin_inst, plugin_name) if id in cls.instances: return cls.instances[id] @@ -169,7 +168,7 @@ def __new__(cls, hostname = socket.gethostname(), def __init__(self, hostname = socket.gethostname(), collectd_host = "localhost", collectd_port = 25826, - plugin_inst = "", plugin_name = "any"): + plugin_inst = "", plugin_name = "any", send_interval=10): if "_counters" not in self.__dict__: self._lock = RLock() self._counters = {} @@ -177,6 +176,7 @@ def __init__(self, hostname = socket.gethostname(), self._plugin_name = plugin_name self._hostname = hostname self._collectd_addr = (collectd_host, collectd_port) + self._send_interval = send_interval @synchronized def __getattr__(self, name): @@ -208,7 +208,7 @@ def take_snapshots(): def send_stats(raise_on_empty = False): try: when, stats, conn = snaps.get(timeout = 0.1) - for message in messages(stats, when, conn._hostname, conn._plugin_inst, conn._plugin_name): + for message in messages(stats, when, conn._hostname, conn._plugin_inst, conn._plugin_name, conn._send_interval): sock.sendto(message, conn._collectd_addr) except Empty: if raise_on_empty: @@ -232,7 +232,7 @@ def wrapped(): t.start() single_start = Semaphore() -def start_threads(): +def start_threads(send_interval=10): assert single_start.acquire(blocking = False) - daemonize(take_snapshots, sleep_for = SEND_INTERVAL) + daemonize(take_snapshots, sleep_for = send_interval) daemonize(send_stats) From 686cccb14ed2fea2765051f3847c62dc7b01a2be Mon Sep 17 00:00:00 2001 From: Charles Hooper Date: Sat, 4 Feb 2012 12:20:47 -0500 Subject: [PATCH 2/2] Add support for multiple collectd types (plugin-wide) --- collectd.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/collectd.py b/collectd.py index a7449d7..5e51de6 100644 --- a/collectd.py +++ b/collectd.py @@ -36,6 +36,14 @@ VALUE_GAUGE = 1 VALUE_DERIVE = 2 VALUE_ABSOLUTE = 3 + +VALUE_LOOKUP = { + 'counter': VALUE_COUNTER, + 'gauge': VALUE_GAUGE, + 'derive': VALUE_DERIVE, + 'absolute': VALUE_ABSOLUTE, +} + VALUE_CODES = { VALUE_COUNTER: "!Q", VALUE_GAUGE: "