Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
This is a client interface to the Open vSwitch DB (OVSDB) protocol. There are still many functions to be fleshed out and the code is pretty messy at this point. There is also a function to keep the connection open to get updates from the database using `select` to make it non-blocking.

Start the api server in ovs by running:
ovs-appctl -t ovsdb-server ovsdb-server/add-remote ptcp:6632
267 changes: 158 additions & 109 deletions ovsdb.py
Original file line number Diff line number Diff line change
@@ -1,131 +1,180 @@
import os
import sys
import Queue
import socket
import json
import uuid
from select import select

OVSDB_IP = '127.0.0.1'
OVSDB_PORT = 6632
DEFAULT_DB = 'Open_vSwitch'
DEFAULT_DB = "Open_vSwitch"
BUFFER_SIZE = 4096

# TODO: Could start by getting the DB name and using that for ongoing requests
# TODO: How to keep an eye out for montor, update, echo messages?
def gather_reply(socket):
print "Waiting for reply"
result = ""
# we got the whole thing if we received all the fields
while "error" not in result or "id" not in result or "result" not in result:
reply = socket.recv(BUFFER_SIZE)
result += reply
return json.loads(result)

def listen_for_messages(sock, message_queues):
# To send something, add a message to queue and append sock to outputs
inputs = [sock, sys.stdin]
outputs = []
while sock:
readable, writable, exceptional = select(inputs, outputs, [])
for s in readable:
if s is sock:
data = sock.recv(4096)
# should test if its echo, if so, reply
# message_type = get_msg_type(data)
# if message_type is "echo":
# send_echo(message_
message_queues[sock].put(data)
outputs.append(sock)
print "recv:" + data
elif s is sys.stdin:
print sys.stdin.readline()
sock.close()
return
else:
print "error"
for w in writable:
if w is sock:
sock.send(message_queues[sock].get_nowait())
outputs.remove(sock)
else:
print "error"

def list_dbs():
list_dbs_query = {"method":"list_dbs", "params":[], "id": 0}
return json.dumps(list_dbs_query)

def get_schema(socket, db = DEFAULT_DB, current_id = 0):
list_schema = {"method": "get_schema", "params":[db_name], "id": current_id}
socket.send(json.dumps(list_schema))
result = gather_reply(socket)
return result

def get_schema_version(socket, db = DEFAULT_DB):
db_schema = get_schema(socket, db)
return db_schema['version']

def list_tables(server, db):
# keys that are under 'tables'
db_schema = get_schema(socket, db)
# return db_schema['tables'].keys
return json.loads()

def list_columns(server, db):
return

def transact(server, transactions):
# Variants of this will add stuff
return

def monitor(columns, monitor_id = None, db = DEFAULT_DB):
msg = {"method":"monitor", "params":[db, monitor_id, columns], "id":0}
return json.dumps(msg)

def monitor_cancel():
return

def locking():
return

def echo():
echo_msg = {"method":"echo","id":"echo","params":[]}
return json.dumps(echo_msg)

def dump(server, db):
return

def list_bridges(db = DEFAULT_DB):
# What if we replaced with a more specific query
# columns = {"Bridge":{"name"}}
columns = {"Port":{"columns":["fake_bridge","interfaces","name","tag"]},"Controller":{"columns":[]},"Interface":{"columns":["name"]},"Open_vSwitch":{"columns":["bridges","cur_cfg"]},"Bridge":{"columns":["controller","fail_mode","name","ports"]}}
# TODO: cancel the monitor after we're done?
return monitor(columns, db)
class OVSDB:
def __init__(self, db_ip = OVSDB_IP, db_port = OVSDB_PORT, db_name = DEFAULT_DB):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((db_ip,db_port))
self.db_name = db_name

# TODO: Could start by getting the DB name and using that for ongoing requests
# TODO: How to keep an eye out for montor, update, echo messages?
def gather_reply(self):
print "Waiting for reply"
result = ""
while True:
reply = self.socket.recv(BUFFER_SIZE)
result += reply
# we got the whole thing if we received all the fields
if "error" in result and "id" in result and "result" in result:
try:
return json.loads(result)
except ValueError:
pass

def listen_for_messages(self, message_queues):
# To send something, add a message to queue and append sock to outputs
inputs = [self.socket, sys.stdin]
outputs = []
while self.socket:
readable, writable, exceptional = select(inputs, outputs, [])
for s in readable:
if s is self.socket:
data = self.socket.recv(4096)
# should test if its echo, if so, reply
# message_type = get_msg_type(data)
# if message_type is "echo":
# send_echo(message_
message_queues[self.socket].put(data)
outputs.append(self.socket)
print "recv:" + data
elif s is sys.stdin:
print sys.stdin.readline()
self.socket.close()
return
else:
print "error"
for w in writable:
if w is self.socket:
self.socket.send(message_queues[self.socket].get_nowait())
outputs.remove(self.socket)
else:
print "error"

def build_query_list_dbs(self):
return json.dumps({"method":"list_dbs", "params":[], "id": uuid.uuid4().int})

def get_schema(self):
list_schema = {"method": "get_schema", "params":[self.db_name], "id": uuid.uuid4().int}
self.socket.send(json.dumps(list_schema))
result = self.gather_reply()
return result

def get_dbs(self):
self.socket.send(self.build_query_list_dbs())
result = self.gather_reply()
return result

def get_schema_version(self):
db_schema = get_schema(self.socket, self.db_name)
return db_schema['version']

def list_tables(self):
# keys that are under 'tables'
db_schema = get_schema(self.socket, self.db_name)
return db_schema['result']['tables'].keys()

def list_columns(server):
return

def transact(self, operations = ""):
# Variants of this will add stuff
request = { "method": "transact",
"params": [self.db_name] + operations,
"id": uuid.uuid4().int,
}

self.socket.send(json.dumps(request))
response = self.gather_reply()

#assumtion: no overlapping calls
assert( request['id'] == response['id'] )
results = response['result']
if len(operations) == len(results):
for i, val in enumerate(zip(operations, results)):
if 'error' in val[1]:
raise RuntimeError('Op failed (%d, %s): %s' %
(i, val[0], val[1]))
else:
raise RuntimeError('transact failed: %s' % results[-1])

return results

def monitor(self, columns, monitor_id = None):
msg = {"method":"monitor", "params":[self.db_name, monitor_id, columns], "id":uuid.uuid4().int}
return json.dumps(msg)

def monitor_cancel():
return

def locking():
return

def echo():
echo_msg = {"method":"echo","id":"echo","params":[]}
return json.dumps(echo_msg)

def dump(self):
return

def list_bridges(self):
# What if we replaced with a more specific query
# columns = {"Bridge":{"name"}}
columns = {"Port":{"columns":["fake_bridge","interfaces","name","tag"]},"Controller":{"columns":[]},"Interface":{"columns":["name"]},"Open_vSwitch":{"columns":["bridges","cur_cfg"]},"Bridge":{"columns":["controller","fail_mode","name","ports"]}}
# TODO: cancel the monitor after we're done?
self.socket.send(self.monitor(columns))
return self.gather_reply()['result']

daemon_uuid = None
def get_daemon_uuid(socket, db = DEFAULT_DB):
"Get the uuid from table Open_vSwitch"
global daemon_uuid
if daemon_uuid:
return daemon_uuid
op = {"op": "select",
"table": "Open_vSwitch",
"where": [],
"columns": ["_uuid"],
}
reply = transact(socket, db, [op])
try:
if (len(reply[0]['rows']) != 1):
e = 'There must be exactly one record in the Open_vSwitch table.'
raise RuntimeError(e)
daemon_uuid = reply[0]['rows'][0]['_uuid'][1]
except (KeyError, TypeError):
raise RuntimeError("Database schema changed")
return daemon_uuid

if __name__ == '__main__':
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((OVSDB_IP, OVSDB_PORT))
ovs = OVSDB()

current_id = 0

s.send(list_dbs())
db_list = gather_reply(s)
db_name = db_list['result'][0]
print "List dbs"
db_list = ovs.get_dbs()
print db_list['result'][0]

print "list bridges:"
s.send(list_bridges())
bridge_list = gather_reply(s)
bridge_list = ovs.list_bridges()
print bridge_list
bridges = bridge_list['result']['Bridge']
bridges = bridge_list['Bridge']
print "\nbridges\n"
print bridges.values()
for bridge in bridges.values():
print "---"
print bridge['new']['name']
#db_schema = get_schema(s, db_name)
#print db_schema

#columns = {"Bridge":{"columns":["name"]}}
#print monitor(s, columns, 1)


# TODO: Put this in a thread and use Queues to send/recv data from the thread
message_queues = {}
message_queues[s] = Queue.Queue()
listen_for_messages(s, message_queues)
# message_queues = {}
# message_queues[s] = Queue.Queue()
# listen_for_messages(s, message_queues)
Binary file removed ovsdb.pyc
Binary file not shown.
27 changes: 7 additions & 20 deletions ovsdb_test.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,23 @@
import ovsdb
from ovsdb import OVSDB
import unittest
import socket

class TestFunctions(unittest.TestCase):

def setUp(self):
OVSDB_IP = '127.0.0.1'
OVSDB_PORT = 6632
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((OVSDB_IP, OVSDB_PORT))
self.ovs = OVSDB()

def test_list_dbs(self):
self.sock.send(ovsdb.list_dbs())
db_list = ovsdb.gather_reply(self.sock)
db_list = self.ovs.get_dbs()
db_name = db_list['result'][0]
self.assertEqual(db_name, 'Open_vSwitch')

def test_monitor(self):
columns = {"Bridge":{"columns":["name"]}}
self.sock.send(ovsdb.monitor(columns))
result = ovsdb.gather_reply(self.sock)
self.assertEqual(result['error'], None)
self.assertEqual(result['id'], 0)
self.assertTrue("Bridge" in result['result'])

def test_list_br(self):
self.sock.send(ovsdb.list_bridges())
bridge_list = ovsdb.gather_reply(self.sock)
bridges = bridge_list['result']['Bridge']
bridge_list = self.ovs.list_bridges()
bridges = bridge_list['Bridge']
#print bridges
print bridges.values()
self.assertTrue("br0" in bridges.values())
#print bridges.values()[0]['new']['name']
self.assertTrue("br0" in bridges.values()[0]['new']['name'])

def test_choice(self):
self.assertTrue(True)
Expand Down