Skip to content

Commit c9f8737

Browse files
vperelmaxdeanlorenz
authored andcommitted
Active-Active Topology - Distributor open_flow back-end
Distributor open_flow back-end with ovs state and heartbeat Implements: blueprint https://review.openstack.org/#/c/234639 Change-Id: Ifd01d908edd4e245e18651db998c07b857d46190 Co-Authored-By: Dean Lorenz <dean@il.ibm.com>
1 parent a3895c5 commit c9f8737

18 files changed

Lines changed: 1716 additions & 188 deletions

File tree

octavia/amphorae/cluster_manager/drivers/active_active/active_active_driver.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,6 @@ def finalize_amphora_cluster(self):
169169
provides=constants.LISTENERS))
170170
new_amphora_net_subflow.add(amphora_driver_tasks.ListenersUpdate(
171171
requires=(constants.LISTENERS)))
172-
# we dont need to update listeners here!!!!
173-
new_amphora_net_subflow.add(CreateAmphoraClusterAlgExtraTask(
174-
name=constants.ADD_CLUSTER_ALG_EXTRA,
175-
provides=(constants.CLUSTER_ALG_TYPE, constants.CLUSTER_MIN_SIZE)
176-
))
177172

178173
for amphoraCount in range(self._cluster_size):
179174
sf_name = 'FINALIZE_AMP_' + str(amphoraCount)
@@ -205,9 +200,9 @@ def finalize_amphora_cluster(self):
205200
name=sf_name + '-' + constants.DISTRIBUTOR_REGISTER_AMP,
206201
requires=(constants.DISTRIBUTOR,
207202
constants.LOADBALANCER,
208-
constants.AMPHORA,
209-
constants.CLUSTER_ALG_TYPE,
210-
constants.CLUSTER_MIN_SIZE)))
203+
constants.AMPHORA),
204+
inject={constants.CLUSTER_ALG_TYPE:'active_active',
205+
constants.CLUSTER_SLOT: amphoraCount}))
211206
new_amphora_net_subflow.add(finalize_amp_for_lb_subflow)
212207

213208
return new_amphora_net_subflow

octavia/amphorae/drivers/health/heartbeat_udp.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ class UDPStatusGetter(object):
3434
The heartbeats are transmitted via UDP and this class will bind to a port
3535
and absorb them
3636
"""
37-
def __init__(self, health_update, stats_update):
37+
def __init__(self, health_update, stats_update, distributor_update):
3838
self.stats_update = stats_update
3939
self.health_update = health_update
40+
self.distributo_update = distributor_update
4041
self.key = cfg.CONF.health_manager.heartbeat_key
4142
self.ip = cfg.CONF.health_manager.bind_ip
4243
self.port = cfg.CONF.health_manager.bind_port
@@ -171,6 +172,11 @@ def dorecv(self, *args, **kw):
171172
def check(self):
172173
try:
173174
(obj, _) = self.dorecv()
175+
if self.distributo_update and 'distributor_id' in obj:
176+
self.executor.submit(
177+
self.distributo_update.update_distributor_health,
178+
obj)
179+
return
174180
if self.health_update:
175181
self.executor.submit(self.health_update.update_health, obj)
176182
if self.stats_update:

octavia/cmd/distributor_agent.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,28 @@
1212
# License for the specific language governing permissions and limitations
1313
# under the License.
1414

15+
import logging
16+
import multiprocessing as multiproc
17+
1518
import sys
1619

1720
import gunicorn.app.base
1821
from oslo_config import cfg
1922
from oslo_reports import guru_meditation_report as gmr
2023
import six
2124

25+
from octavia.distributor.backend.agent.api_server import server
26+
from octavia.distributor.backend.health_daemon import health_daemon
2227
from octavia.common import service
2328
from octavia.common import utils
24-
from octavia.distributor.backend.agent.api_server import server
2529
from octavia import version
2630

31+
LOG = logging.getLogger(__name__)
2732
CONF = cfg.CONF
33+
CONF.import_group('distributor', 'octavia.common.config')
34+
CONF.import_group('haproxy_amphora', 'octavia.common.config')
35+
CONF.import_group('amphora_agent', 'octavia.common.config')
36+
HM_SENDER_CMD_QUEUE = multiproc.Queue()
2837

2938

3039
class DistributorAgent(gunicorn.app.base.BaseApplication):
@@ -50,12 +59,18 @@ def main():
5059
service.prepare_service(sys.argv)
5160

5261
gmr.TextGuruMeditation.setup_autorun(version)
62+
health_sender_proc = multiproc.Process(name='HM_sender',
63+
target=health_daemon.run_sender,
64+
args=(HM_SENDER_CMD_QUEUE,))
65+
health_sender_proc.daemon = True
66+
health_sender_proc.start()
5367

5468
# Initiate server class
5569
server_instance = server.Server()
5670

5771
bind_ip_port = utils.ip_port_str(CONF.haproxy_amphora.bind_host,
5872
CONF.distributor.bind_port)
73+
5974
options = {
6075
'bind': bind_ip_port,
6176
'workers': 1,
@@ -64,8 +79,9 @@ def main():
6479
'ca_certs': CONF.amphora_agent.agent_server_ca,
6580
'cert_reqs': True,
6681
'preload_app': True,
67-
'accesslog': '-',
68-
'errorlog': '-',
82+
'accesslog': '/var/log/distributor-agent.log',
83+
'errorlog': '/var/log/distributor-agent.log',
6984
'loglevel': 'debug',
7085
}
86+
7187
DistributorAgent(server_instance.app, options).run()

octavia/common/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,12 +430,14 @@
430430
DISTRIBUTOR_PENDING_UPDATE = 'DISTRIBUTOR_PENDING_UPDATE'
431431
DISTRIBUTOR_PENDING_CREATE = 'DISTRIBUTOR_PENDING_CREATE'
432432
DISTRIBUTOR_DELETED = 'DISTRIBUTOR_DELETED'
433+
DISTRIBUTOR_FULL = 'FULL'
433434
DISTRIBUTOR_ERROR = 'DISTRIBUTOR_ERROR'
434435
SUPPORTED_DISTRIBUTOR_PROVISIONING_STATUSES = (DISTRIBUTOR_ACTIVE,
435436
DISTRIBUTOR_ALLOCATED,
436437
DISTRIBUTOR_BOOTING,
437438
DISTRIBUTOR_READY,
438439
DISTRIBUTOR_DELETED,
440+
DISTRIBUTOR_FULL,
439441
DISTRIBUTOR_ERROR,
440442
DISTRIBUTOR_PENDING_CREATE,
441443
DISTRIBUTOR_PENDING_DELETE,
@@ -446,5 +448,6 @@
446448
DISTRIBUTOR_PENDING_DELETE)
447449
CLUSTER_ALG_TYPE = 'cluster_alg_type'
448450
CLUSTER_MIN_SIZE = 'cluster_min_size'
451+
CLUSTER_SLOT = 'cluster_slot'
449452
ADD_CLUSTER_ALG_EXTRA = (
450453
'octavia-create-amp-cluster-alg-extra')

octavia/controller/healthmanager/update_db.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,113 @@ def update_health(self, health):
212212
LOG.error(_LE("Load balancer %s is not in DB"), lb_id)
213213

214214

215+
class UpdateDistributorHealthDb(object):
216+
def __init__(self):
217+
super(UpdateDistributorHealthDb, self).__init__()
218+
self.event_streamer = stevedore_driver.DriverManager(
219+
namespace='octavia.controller.queues',
220+
name=cfg.CONF.health_manager.event_streamer_driver,
221+
invoke_on_load=True).driver
222+
self.loadbalancer_repo = repo.LoadBalancerRepository()
223+
self.distributor_repo = repo.DistributorRepository()
224+
225+
def emit(self, info_type, info_id, info_obj):
226+
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
227+
self.event_streamer.emit(cnt)
228+
229+
def update_distributor_health(self, health):
230+
"""This function is to update db info based on amphora status
231+
232+
:type health: dict
233+
:param health: map object that contains distributor, amphora info
234+
235+
The input health data structure is shown as below:
236+
237+
health = {
238+
"distributor-id": self.FAKE_UUID_1,
239+
"provisioning-state": {
240+
"state": service provisioning status,
241+
"reason": str
242+
}
243+
"loadbalancers": {
244+
"lb-id-1": {
245+
"status": instance provisioning status
246+
"size": int
247+
"registered": int
248+
}
249+
}
250+
}
251+
252+
"""
253+
session = db_api.get_session()
254+
distributor_id = health['distributor_id']
255+
distributor_state = health['provisioning_state']['state']
256+
if distributor_state == constants.DISTRIBUTOR_BOOTING:
257+
LOG.debug("Distributor %s is still booting -- skipping"
258+
" health message.", distributor_id)
259+
# nothing to report
260+
return
261+
elif distributor_state == constants.DISTRIBUTOR_FULL:
262+
# @TODO handle FULL state -- eg, set quota limit
263+
LOG.warning(_LW("Distributor %s is unavailable for new"
264+
" requests."),
265+
distributor_id)
266+
elif distributor_state == constants.DISTRIBUTOR_ERROR:
267+
LOG.warning(_LW("Distributor %s is in error state"),
268+
distributor_id)
269+
# @TODO set state of distributor so it is recycled
270+
271+
expected_lbs = self.distributor_repo.get_all_lbs_on_distributor(
272+
session, distributor_id)
273+
reported_lbs = health.get('loadbalancers', {})
274+
275+
if any(lb not in expected_lbs for lb in reported_lbs):
276+
LOG.warning(_LW("Distributor %s reported unexpected"
277+
" loadbalancer ids"), distributor_id)
278+
# @TODO something is wrong here we should recycle
279+
280+
# NO_MONITOR for all missing lbs
281+
reported_lbs.update(
282+
(lb, {'status': constants.NO_MONITOR})
283+
for lb in expected_lbs if lb not in reported_lbs)
284+
285+
# do actual update per lb
286+
for lb_id, lb in six.iteritems(reported_lbs):
287+
try:
288+
lb_in_db = self.loadbalancer_repo.get(session, id=lb_id)
289+
except sqlalchemy.orm.exc.NoResultFound:
290+
LOG.error(_LE("Load balancer %s is not in DB"), lb_id)
291+
continue
292+
293+
reported_lb_op_status = lb['status']
294+
if (reported_lb_op_status == constants.ERROR or
295+
distributor_state == constants.DISTRIBUTOR_ERROR):
296+
# Distributor error currently sets all LBs to error too.
297+
# This is conservative. Could try to recycle Distributor
298+
# without recycling the LBs
299+
new_lb_op_status = constants.ERROR
300+
elif (reported_lb_op_status == constants.DEGRADED and
301+
lb_in_db.operating_status == constants.ONLINE):
302+
new_lb_op_status = constants.DEGRADED
303+
elif (reported_lb_op_status == constants.NO_MONITOR and
304+
lb_in_db.operating_status == constants.ONLINE):
305+
# @TODO Ignoring for now. Should we do anything here?
306+
new_lb_op_status = None
307+
else:
308+
new_lb_op_status = None
309+
# @TODO verify size and registered
310+
if new_lb_op_status:
311+
LOG.debug("%s %s status has changed from %s to "
312+
"%s. Updating db and sending event.",
313+
constants.LOADBALANCER, lb_id,
314+
lb_in_db.operating_status,
315+
new_lb_op_status)
316+
self.loadbalancer_repo.update(
317+
session, lb_id, operating_status=new_lb_op_status)
318+
self.emit(constants.LOADBALANCER, lb_id,
319+
{constants.OPERATING_STATUS: new_lb_op_status})
320+
321+
215322
class UpdateStatsDb(stats.StatsMixin):
216323

217324
def __init__(self):

octavia/controller/worker/tasks/distributor_driver_tasks.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,13 @@ def revert(self, result, loadbalancer, *args, **kwargs):
8282

8383
class DistributorRegisterAmphora(BaseDistributorTask):
8484
def execute(self, distributor, loadbalancer, amphora,
85-
cluster_alg_type, cluster_min_size):
85+
cluster_alg_type, cluster_slot):
8686
load_balancer = self.loadbalancer_repo.get(
8787
db_apis.get_session(), id=loadbalancer.id)
8888
self.distributor_driver.register_amphora(
8989
distributor, load_balancer,
9090
amphora, cluster_alg_type,
91-
cluster_min_size)
91+
cluster_slot)
9292

9393
def revert(self, result, loadbalancer, *args, **kwargs):
9494
if isinstance(result, failure.Failure):
@@ -98,12 +98,11 @@ def revert(self, result, loadbalancer, *args, **kwargs):
9898

9999
class DistributorUnregisterAmphora(BaseDistributorTask):
100100
def execute(self, distributor, loadbalancer, amphora,
101-
cluster_alg_type, cluster_min_size):
101+
cluster_alg_type):
102102
load_balancer = self.loadbalancer_repo.get(db_apis.get_session(),
103103
id=loadbalancer.id)
104104
self.distributor_driver.register_amphora(distributor, load_balancer,
105-
amphora, cluster_alg_type,
106-
cluster_min_size)
105+
amphora, cluster_alg_type)
107106

108107
def revert(self, result, loadbalancer, *args, **kwargs):
109108
if isinstance(result, failure.Failure):

octavia/db/repositories.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,23 @@ def get_shared_ready_distributor(self, session):
10391039
return None
10401040
return distributor.to_data_model()
10411041

1042+
def get_all_lbs_on_distributor(self, session, distributor_id):
1043+
"""Get all of the load balancers on a Distributor.
1044+
1045+
:param session: A Sql Alchemy database session.
1046+
:param distributor_id: The Distributor id to list the load
1047+
balancers from
1048+
:returns: [octavia.common.data_model]
1049+
"""
1050+
with session.begin(subtransactions=True):
1051+
lb_subquery = (
1052+
session.query(models.AmphoraCluster.load_balancer_id).
1053+
filter_by(distributor_id=distributor_id).subquery())
1054+
lb_list = (session.query(models.LoadBalancer).
1055+
filter(models.LoadBalancer.id.in_(lb_subquery)).all())
1056+
data_model_list = [model.to_data_model() for model in lb_list]
1057+
return data_model_list
1058+
10421059

10431060
class AmphoraClusterRepository(BaseRepository):
10441061
model_class = models.AmphoraCluster

octavia/distributor/backend/agent/api_server/distributor_info.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ def _get_version_of_installed_package(name):
7272
return m.group(0)[len('Version: '):]
7373

7474

75-
def _get_network_bytes(interface, type):
75+
def _get_network_bytes(interface, byte_type):
7676
file_name = "/sys/class/net/{interface}/statistics/{type}_bytes".format(
77-
interface=interface, type=type)
77+
interface=interface, type=byte_type)
7878
with open(file_name, 'r') as f:
7979
return f.readline()
8080

0 commit comments

Comments
 (0)