diff --git a/README.md b/README.md index 01a32f0..f00c2e7 100644 --- a/README.md +++ b/README.md @@ -1 +1,4 @@ This is a client interface to the Open vSwitch DB (OVSDB) protocol. There are still many functions to be fleshed out and the code is pretty messy at this point. There is also a function to keep the connection open to get updates from the database using `select` to make it non-blocking. + +Start the api server in ovs by running: +ovs-appctl -t ovsdb-server ovsdb-server/add-remote ptcp:6632 diff --git a/ovsdb.py b/ovsdb.py index bc5e570..8dc1510 100644 --- a/ovsdb.py +++ b/ovsdb.py @@ -1,131 +1,180 @@ +import os import sys import Queue import socket import json +import uuid from select import select OVSDB_IP = '127.0.0.1' OVSDB_PORT = 6632 -DEFAULT_DB = 'Open_vSwitch' +DEFAULT_DB = "Open_vSwitch" BUFFER_SIZE = 4096 -# TODO: Could start by getting the DB name and using that for ongoing requests -# TODO: How to keep an eye out for montor, update, echo messages? -def gather_reply(socket): - print "Waiting for reply" - result = "" - # we got the whole thing if we received all the fields - while "error" not in result or "id" not in result or "result" not in result: - reply = socket.recv(BUFFER_SIZE) - result += reply - return json.loads(result) - -def listen_for_messages(sock, message_queues): - # To send something, add a message to queue and append sock to outputs - inputs = [sock, sys.stdin] - outputs = [] - while sock: - readable, writable, exceptional = select(inputs, outputs, []) - for s in readable: - if s is sock: - data = sock.recv(4096) - # should test if its echo, if so, reply - # message_type = get_msg_type(data) - # if message_type is "echo": - # send_echo(message_ - message_queues[sock].put(data) - outputs.append(sock) - print "recv:" + data - elif s is sys.stdin: - print sys.stdin.readline() - sock.close() - return - else: - print "error" - for w in writable: - if w is sock: - sock.send(message_queues[sock].get_nowait()) - outputs.remove(sock) - else: - print "error" - -def list_dbs(): - list_dbs_query = {"method":"list_dbs", "params":[], "id": 0} - return json.dumps(list_dbs_query) - -def get_schema(socket, db = DEFAULT_DB, current_id = 0): - list_schema = {"method": "get_schema", "params":[db_name], "id": current_id} - socket.send(json.dumps(list_schema)) - result = gather_reply(socket) - return result - -def get_schema_version(socket, db = DEFAULT_DB): - db_schema = get_schema(socket, db) - return db_schema['version'] - -def list_tables(server, db): - # keys that are under 'tables' - db_schema = get_schema(socket, db) - # return db_schema['tables'].keys - return json.loads() - -def list_columns(server, db): - return - -def transact(server, transactions): - # Variants of this will add stuff - return - -def monitor(columns, monitor_id = None, db = DEFAULT_DB): - msg = {"method":"monitor", "params":[db, monitor_id, columns], "id":0} - return json.dumps(msg) - -def monitor_cancel(): - return - -def locking(): - return - -def echo(): - echo_msg = {"method":"echo","id":"echo","params":[]} - return json.dumps(echo_msg) - -def dump(server, db): - return - -def list_bridges(db = DEFAULT_DB): - # What if we replaced with a more specific query - # columns = {"Bridge":{"name"}} - columns = {"Port":{"columns":["fake_bridge","interfaces","name","tag"]},"Controller":{"columns":[]},"Interface":{"columns":["name"]},"Open_vSwitch":{"columns":["bridges","cur_cfg"]},"Bridge":{"columns":["controller","fail_mode","name","ports"]}} - # TODO: cancel the monitor after we're done? - return monitor(columns, db) +class OVSDB: + def __init__(self, db_ip = OVSDB_IP, db_port = OVSDB_PORT, db_name = DEFAULT_DB): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((db_ip,db_port)) + self.db_name = db_name + + # TODO: Could start by getting the DB name and using that for ongoing requests + # TODO: How to keep an eye out for montor, update, echo messages? + def gather_reply(self): + print "Waiting for reply" + result = "" + while True: + reply = self.socket.recv(BUFFER_SIZE) + result += reply + # we got the whole thing if we received all the fields + if "error" in result and "id" in result and "result" in result: + try: + return json.loads(result) + except ValueError: + pass + + def listen_for_messages(self, message_queues): + # To send something, add a message to queue and append sock to outputs + inputs = [self.socket, sys.stdin] + outputs = [] + while self.socket: + readable, writable, exceptional = select(inputs, outputs, []) + for s in readable: + if s is self.socket: + data = self.socket.recv(4096) + # should test if its echo, if so, reply + # message_type = get_msg_type(data) + # if message_type is "echo": + # send_echo(message_ + message_queues[self.socket].put(data) + outputs.append(self.socket) + print "recv:" + data + elif s is sys.stdin: + print sys.stdin.readline() + self.socket.close() + return + else: + print "error" + for w in writable: + if w is self.socket: + self.socket.send(message_queues[self.socket].get_nowait()) + outputs.remove(self.socket) + else: + print "error" + + def build_query_list_dbs(self): + return json.dumps({"method":"list_dbs", "params":[], "id": uuid.uuid4().int}) + + def get_schema(self): + list_schema = {"method": "get_schema", "params":[self.db_name], "id": uuid.uuid4().int} + self.socket.send(json.dumps(list_schema)) + result = self.gather_reply() + return result + + def get_dbs(self): + self.socket.send(self.build_query_list_dbs()) + result = self.gather_reply() + return result + + def get_schema_version(self): + db_schema = get_schema(self.socket, self.db_name) + return db_schema['version'] + + def list_tables(self): + # keys that are under 'tables' + db_schema = get_schema(self.socket, self.db_name) + return db_schema['result']['tables'].keys() + + def list_columns(server): + return + + def transact(self, operations = ""): + # Variants of this will add stuff + request = { "method": "transact", + "params": [self.db_name] + operations, + "id": uuid.uuid4().int, + } + + self.socket.send(json.dumps(request)) + response = self.gather_reply() + + #assumtion: no overlapping calls + assert( request['id'] == response['id'] ) + results = response['result'] + if len(operations) == len(results): + for i, val in enumerate(zip(operations, results)): + if 'error' in val[1]: + raise RuntimeError('Op failed (%d, %s): %s' % + (i, val[0], val[1])) + else: + raise RuntimeError('transact failed: %s' % results[-1]) + + return results + + def monitor(self, columns, monitor_id = None): + msg = {"method":"monitor", "params":[self.db_name, monitor_id, columns], "id":uuid.uuid4().int} + return json.dumps(msg) + + def monitor_cancel(): + return + + def locking(): + return + + def echo(): + echo_msg = {"method":"echo","id":"echo","params":[]} + return json.dumps(echo_msg) + + def dump(self): + return + + def list_bridges(self): + # What if we replaced with a more specific query + # columns = {"Bridge":{"name"}} + columns = {"Port":{"columns":["fake_bridge","interfaces","name","tag"]},"Controller":{"columns":[]},"Interface":{"columns":["name"]},"Open_vSwitch":{"columns":["bridges","cur_cfg"]},"Bridge":{"columns":["controller","fail_mode","name","ports"]}} + # TODO: cancel the monitor after we're done? + self.socket.send(self.monitor(columns)) + return self.gather_reply()['result'] + +daemon_uuid = None +def get_daemon_uuid(socket, db = DEFAULT_DB): + "Get the uuid from table Open_vSwitch" + global daemon_uuid + if daemon_uuid: + return daemon_uuid + op = {"op": "select", + "table": "Open_vSwitch", + "where": [], + "columns": ["_uuid"], + } + reply = transact(socket, db, [op]) + try: + if (len(reply[0]['rows']) != 1): + e = 'There must be exactly one record in the Open_vSwitch table.' + raise RuntimeError(e) + daemon_uuid = reply[0]['rows'][0]['_uuid'][1] + except (KeyError, TypeError): + raise RuntimeError("Database schema changed") + return daemon_uuid if __name__ == '__main__': - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect((OVSDB_IP, OVSDB_PORT)) + ovs = OVSDB() - current_id = 0 - - s.send(list_dbs()) - db_list = gather_reply(s) - db_name = db_list['result'][0] + print "List dbs" + db_list = ovs.get_dbs() + print db_list['result'][0] + print "list bridges:" - s.send(list_bridges()) - bridge_list = gather_reply(s) + bridge_list = ovs.list_bridges() print bridge_list - bridges = bridge_list['result']['Bridge'] + bridges = bridge_list['Bridge'] print "\nbridges\n" print bridges.values() for bridge in bridges.values(): print "---" print bridge['new']['name'] - #db_schema = get_schema(s, db_name) - #print db_schema - - #columns = {"Bridge":{"columns":["name"]}} - #print monitor(s, columns, 1) + # TODO: Put this in a thread and use Queues to send/recv data from the thread - message_queues = {} - message_queues[s] = Queue.Queue() - listen_for_messages(s, message_queues) + # message_queues = {} + # message_queues[s] = Queue.Queue() + # listen_for_messages(s, message_queues) diff --git a/ovsdb.pyc b/ovsdb.pyc deleted file mode 100644 index c15b9a9..0000000 Binary files a/ovsdb.pyc and /dev/null differ diff --git a/ovsdb_test.py b/ovsdb_test.py index 76fab5e..509c7dd 100644 --- a/ovsdb_test.py +++ b/ovsdb_test.py @@ -1,36 +1,23 @@ -import ovsdb +from ovsdb import OVSDB import unittest import socket class TestFunctions(unittest.TestCase): def setUp(self): - OVSDB_IP = '127.0.0.1' - OVSDB_PORT = 6632 - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.connect((OVSDB_IP, OVSDB_PORT)) + self.ovs = OVSDB() def test_list_dbs(self): - self.sock.send(ovsdb.list_dbs()) - db_list = ovsdb.gather_reply(self.sock) + db_list = self.ovs.get_dbs() db_name = db_list['result'][0] self.assertEqual(db_name, 'Open_vSwitch') - def test_monitor(self): - columns = {"Bridge":{"columns":["name"]}} - self.sock.send(ovsdb.monitor(columns)) - result = ovsdb.gather_reply(self.sock) - self.assertEqual(result['error'], None) - self.assertEqual(result['id'], 0) - self.assertTrue("Bridge" in result['result']) - def test_list_br(self): - self.sock.send(ovsdb.list_bridges()) - bridge_list = ovsdb.gather_reply(self.sock) - bridges = bridge_list['result']['Bridge'] + bridge_list = self.ovs.list_bridges() + bridges = bridge_list['Bridge'] #print bridges - print bridges.values() - self.assertTrue("br0" in bridges.values()) + #print bridges.values()[0]['new']['name'] + self.assertTrue("br0" in bridges.values()[0]['new']['name']) def test_choice(self): self.assertTrue(True)