Skip to content
Snippets Groups Projects
Commit 9e8bd1b6 authored by Henrik Thostrup Jensen's avatar Henrik Thostrup Jensen
Browse files

make data plane change function properly, required a bit of db/state/backend replumming though

parent 5c626932
No related branches found
No related tags found
No related merge requests found
...@@ -16,10 +16,10 @@ CREATE TABLE service_connections ( ...@@ -16,10 +16,10 @@ CREATE TABLE service_connections (
global_reservation_id text, global_reservation_id text,
description text, description text,
requester_nsa text NOT NULL, requester_nsa text NOT NULL,
requester_url text,
reserve_time timestamp NOT NULL, reserve_time timestamp NOT NULL,
reservation_state text NOT NULL, reservation_state text NOT NULL,
provision_state text NOT NULL, provision_state text NOT NULL,
activation_state text NOT NULL,
lifecycle_state text NOT NULL, lifecycle_state text NOT NULL,
source_network text NOT NULL, source_network text NOT NULL,
source_port text NOT NULL, source_port text NOT NULL,
...@@ -43,8 +43,10 @@ CREATE TABLE sub_connections ( ...@@ -43,8 +43,10 @@ CREATE TABLE sub_connections (
order_id integer NOT NULL, order_id integer NOT NULL,
reservation_state text NOT NULL, reservation_state text NOT NULL,
provision_state text NOT NULL, provision_state text NOT NULL,
activation_state text NOT NULL,
lifecycle_state text NOT NULL, lifecycle_state text NOT NULL,
data_plane_active boolean NOT NULL,
data_plane_version int,
data_plane_consistent boolean,
source_network text NOT NULL, source_network text NOT NULL,
source_port text NOT NULL, source_port text NOT NULL,
source_labels label[], source_labels label[],
...@@ -66,8 +68,8 @@ CREATE TABLE generic_backend_connections ( ...@@ -66,8 +68,8 @@ CREATE TABLE generic_backend_connections (
reserve_time timestamp NOT NULL, reserve_time timestamp NOT NULL,
reservation_state text NOT NULL, reservation_state text NOT NULL,
provision_state text NOT NULL, provision_state text NOT NULL,
activation_state text NOT NULL,
lifecycle_state text NOT NULL, lifecycle_state text NOT NULL,
data_plane_active boolean NOT NULL,
source_network text NOT NULL, source_network text NOT NULL,
source_port text NOT NULL, source_port text NOT NULL,
source_labels label[], source_labels label[],
......
...@@ -151,8 +151,8 @@ class Aggregator: ...@@ -151,8 +151,8 @@ class Aggregator:
raise error.TopologyError('Cannot connect STP %s to itself.' % source_stp) raise error.TopologyError('Cannot connect STP %s to itself.' % source_stp)
conn = database.ServiceConnection(connection_id=connection_id, revision=0, global_reservation_id=global_reservation_id, description=description, conn = database.ServiceConnection(connection_id=connection_id, revision=0, global_reservation_id=global_reservation_id, description=description,
requester_nsa=header.requester_nsa, reserve_time=datetime.datetime.utcnow(), requester_nsa=header.requester_nsa, requester_url=header.reply_to, reserve_time=datetime.datetime.utcnow(),
reservation_state=state.INITIAL, provision_state=state.SCHEDULED, activation_state=state.INACTIVE, lifecycle_state=state.INITIAL, reservation_state=state.INITIAL, provision_state=state.SCHEDULED, lifecycle_state=state.INITIAL,
source_network=source_stp.network, source_port=source_stp.port, source_labels=source_stp.labels, source_network=source_stp.network, source_port=source_stp.port, source_labels=source_stp.labels,
dest_network=dest_stp.network, dest_port=dest_stp.port, dest_labels=dest_stp.labels, dest_network=dest_stp.network, dest_port=dest_stp.port, dest_labels=dest_stp.labels,
start_time=service_params.start_time, end_time=service_params.end_time, bandwidth=service_params.bandwidth) start_time=service_params.start_time, end_time=service_params.end_time, bandwidth=service_params.bandwidth)
...@@ -244,7 +244,7 @@ class Aggregator: ...@@ -244,7 +244,7 @@ class Aggregator:
sc = database.SubConnection(provider_nsa=link_provider_nsa.urn(), sc = database.SubConnection(provider_nsa=link_provider_nsa.urn(),
connection_id=connection_id, local_link=local_link, revision=0, service_connection_id=conn.id, order_id=order_id, connection_id=connection_id, local_link=local_link, revision=0, service_connection_id=conn.id, order_id=order_id,
global_reservation_id=global_reservation_id, description=description, global_reservation_id=global_reservation_id, description=description,
reservation_state=state.INITIAL, provision_state=state.SCHEDULED, activation_state=state.INACTIVE, lifecycle_state=state.INITIAL, reservation_state=state.INITIAL, provision_state=state.SCHEDULED, lifecycle_state=state.INITIAL, data_plane_active=False,
source_network=sp.source_stp.network, source_port=sp.source_stp.port, source_labels=sp.source_stp.labels, source_network=sp.source_stp.network, source_port=sp.source_stp.port, source_labels=sp.source_stp.labels,
dest_network=sp.dest_stp.network, dest_port=sp.dest_stp.port, dest_labels=sp.dest_stp.labels, dest_network=sp.dest_stp.network, dest_port=sp.dest_stp.port, dest_labels=sp.dest_stp.labels,
start_time=sp.start_time.isoformat(), end_time=sp.end_time.isoformat(), bandwidth=sp.bandwidth) start_time=sp.start_time.isoformat(), end_time=sp.end_time.isoformat(), bandwidth=sp.bandwidth)
...@@ -282,8 +282,6 @@ class Aggregator: ...@@ -282,8 +282,6 @@ class Aggregator:
raise err raise err
@defer.inlineCallbacks @defer.inlineCallbacks
def reserveCommit(self, header, connection_id): def reserveCommit(self, header, connection_id):
...@@ -569,28 +567,6 @@ class Aggregator: ...@@ -569,28 +567,6 @@ class Aggregator:
# -- # --
@defer.inlineCallbacks
def doActivate(self, conn):
yield state.activating(conn)
yield state.active(conn)
header = nsa.NSIHeader(conn.requester_nsa, self.nsa_.urn())
now = datetime.datetime.utcnow()
data_plane_status = (True, conn.revision, True) # active, version, consistent
self.parent_requester.dataPlaneStateChange(header, conn.connection_id, 0, now, data_plane_status)
@defer.inlineCallbacks
def doTeardown(self, conn):
yield state.deactivating(conn)
yield state.inactive(conn)
header = nsa.NSIHeader(conn.requester_nsa, self.nsa_.urn())
now = datetime.datetime.utcnow()
data_plane_status = (False, conn.revision, True) # active, version, consistent
self.parent_requester.dataPlaneStateChange(header, conn.connection_id, 0, now, data_plane_status)
def doTimeout(self, conn): def doTimeout(self, conn):
header = None header = None
self.parent_requester.reserveTimeout(header, conn.connection_id, None, None, None, None, None) self.parent_requester.reserveTimeout(header, conn.connection_id, None, None, None, None, None)
...@@ -633,21 +609,38 @@ class Aggregator: ...@@ -633,21 +609,38 @@ class Aggregator:
@defer.inlineCallbacks @defer.inlineCallbacks
def dataPlaneStateChange(self, header, connection_id, notification_id, timestamp, dps): def dataPlaneStateChange(self, header, connection_id, notification_id, timestamp, dps):
active, version, version_consistent = dps active, version, consistent = dps
log.msg("Data plane change for sub connection: %s Active: %s, version %i, consistent: %s" % \
(connection_id, active, version, consistent), system=LOG_SYSTEM)
sub_conn = yield self.findSubConnection(header.provider_nsa, connection_id) sub_conn = yield self.findSubConnection(header.provider_nsa, connection_id)
sub_conn.data_plane_active = active
sub_conn.data_plane_version = version
sub_conn.data_plane_consistent = consistent
yield sub_conn.save()
conn = yield sub_conn.ServiceConnection.get() conn = yield sub_conn.ServiceConnection.get()
sub_conns = yield conn.SubConnections.get() sub_conns = yield conn.SubConnections.get()
if len(sub_conns) == 1: if not conn.requester_url:
log.msg("than one sub connection for connection %s, notifying" % conn.connection_id) log.msg("Connection %s: No url for requester to notify about data plane change" % conn.connection_id, system=LOG_SYSTEM)
if active: defer.returnValue(None)
yield self.doActivate(conn)
else: # do notification
yield self.doTeardown(conn)
else: aggr_active = all( [ sc.data_plane_active for sc in sub_conns ] )
log.msg("more than one sub connection for connection %s" % conn.connection_id) aggr_version = max( [ sc.data_plane_version for sc in sub_conns ] )
aggr_consistent = all( [ sc.data_plane_consistent for sc in sub_conns ] )
header = nsa.NSIHeader(conn.requester_nsa, self.nsa_.urn(), reply_to=conn.requester_url)
now = datetime.datetime.utcnow()
data_plane_status = (aggr_active, aggr_version, aggr_consistent)
log.msg("Connection %s: Aggregated data plane status: Active %s, version %i, consistent %s" % \
(conn.connection_id, aggr_active, aggr_version, aggr_consistent), system=LOG_SYSTEM)
self.parent_requester.dataPlaneStateChange(header, conn.connection_id, 0, now, data_plane_status)
@defer.inlineCallbacks @defer.inlineCallbacks
......
...@@ -106,7 +106,7 @@ class GenericBackend(service.Service): ...@@ -106,7 +106,7 @@ class GenericBackend(service.Service):
log.msg('Unhandled provision state %s for connection %s in scheduler building' % (conn.provision_state, conn.connection_id)) log.msg('Unhandled provision state %s for connection %s in scheduler building' % (conn.provision_state, conn.connection_id))
elif conn.start_time > now: elif conn.start_time > now:
if conn.provision_state == state.PROVISIONED and conn.activation_state != state.ACTIVE: if conn.provision_state == state.PROVISIONED and conn.data_plane_active == False:
log.msg('Connection %s: Immediate activate during buildSchedule' % conn.connection_id, system=self.log_system) log.msg('Connection %s: Immediate activate during buildSchedule' % conn.connection_id, system=self.log_system)
yield self._doActivate(conn) yield self._doActivate(conn)
elif conn.provision_state == state.SCHEDULED: elif conn.provision_state == state.SCHEDULED:
...@@ -232,7 +232,7 @@ class GenericBackend(service.Service): ...@@ -232,7 +232,7 @@ class GenericBackend(service.Service):
# should we save the requester or provider here? # should we save the requester or provider here?
conn = GenericBackendConnections(connection_id=connection_id, revision=0, global_reservation_id=global_reservation_id, description=description, conn = GenericBackendConnections(connection_id=connection_id, revision=0, global_reservation_id=global_reservation_id, description=description,
requester_nsa=header.requester_nsa, reserve_time=now, requester_nsa=header.requester_nsa, reserve_time=now,
reservation_state=state.INITIAL, provision_state=state.SCHEDULED, activation_state=state.INACTIVE, lifecycle_state=state.INITIAL, reservation_state=state.INITIAL, provision_state=state.SCHEDULED, lifecycle_state=state.INITIAL, data_plane_active=False,
source_network=source_stp.network, source_port=source_stp.port, source_labels=[src_label], source_network=source_stp.network, source_port=source_stp.port, source_labels=[src_label],
dest_network=dest_stp.network, dest_port=dest_stp.port, dest_labels=[dst_label], dest_network=dest_stp.network, dest_port=dest_stp.port, dest_labels=[dst_label],
start_time=service_params.start_time, end_time=service_params.end_time, start_time=service_params.start_time, end_time=service_params.end_time,
...@@ -319,7 +319,7 @@ class GenericBackend(service.Service): ...@@ -319,7 +319,7 @@ class GenericBackend(service.Service):
self.scheduler.cancelCall(connection_id) self.scheduler.cancelCall(connection_id)
if conn.activation_state == state.ACTIVE: if conn.data_plane_active == state.ACTIVE:
try: try:
yield self._doTeardown(conn) yield self._doTeardown(conn)
except Exception as e: except Exception as e:
...@@ -398,7 +398,7 @@ class GenericBackend(service.Service): ...@@ -398,7 +398,7 @@ class GenericBackend(service.Service):
yield self._doReserveAbort(conn) yield self._doReserveAbort(conn)
connection_states = (conn.reservation_state, conn.provision_state, conn.lifecycle_state, conn.activation_state) connection_states = (conn.reservation_state, conn.provision_state, conn.lifecycle_state, None)
header = nsa.NSIHeader(conn.requester_nsa, conn.requester_nsa) # The NSA is both requester and provider in the backend, but this might be problematic without aggregator header = nsa.NSIHeader(conn.requester_nsa, conn.requester_nsa) # The NSA is both requester and provider in the backend, but this might be problematic without aggregator
self.parent_requester.reserveTimeout(header, conn.connection_id, connection_states, self.TPC_TIMEOUT, datetime.datetime.utcnow()) self.parent_requester.reserveTimeout(header, conn.connection_id, connection_states, self.TPC_TIMEOUT, datetime.datetime.utcnow())
...@@ -442,10 +442,6 @@ class GenericBackend(service.Service): ...@@ -442,10 +442,6 @@ class GenericBackend(service.Service):
@defer.inlineCallbacks @defer.inlineCallbacks
def _doActivate(self, conn): def _doActivate(self, conn):
if conn.activation_state != state.ACTIVATING: # We died during a previous activate somehow
yield state.activating(conn)
self.logStateUpdate(conn, 'ACTIVATING')
src_target = self.connection_manager.getTarget(conn.source_port, conn.source_labels[0].type_, conn.source_labels[0].labelValue()) src_target = self.connection_manager.getTarget(conn.source_port, conn.source_labels[0].type_, conn.source_labels[0].labelValue())
dst_target = self.connection_manager.getTarget(conn.dest_port, conn.dest_labels[0].type_, conn.dest_labels[0].labelValue()) dst_target = self.connection_manager.getTarget(conn.dest_port, conn.dest_labels[0].type_, conn.dest_labels[0].labelValue())
try: try:
...@@ -454,8 +450,8 @@ class GenericBackend(service.Service): ...@@ -454,8 +450,8 @@ class GenericBackend(service.Service):
# We need to mark failure in state machine here somehow.... # We need to mark failure in state machine here somehow....
log.msg('Connection %s: Error setting up connection: %s' % (conn.connection_id, str(e)), system=self.log_system) log.msg('Connection %s: Error setting up connection: %s' % (conn.connection_id, str(e)), system=self.log_system)
# should include stack trace # should include stack trace
yield state.inactive(conn) conn.data_plane_active = False
self.logStateUpdate(conn, 'INACTIVE') yield conn.save()
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
service_ex = None service_ex = None
...@@ -464,8 +460,9 @@ class GenericBackend(service.Service): ...@@ -464,8 +460,9 @@ class GenericBackend(service.Service):
defer.returnValue(None) defer.returnValue(None)
try: try:
yield state.active(conn) conn.data_plane_active = True
self.logStateUpdate(conn, 'ACTIVE') yield conn.save()
log.msg('Connection %s: Data plane activated' % (conn.connection_id), system=self.log_system)
# we might have passed end time during activation... # we might have passed end time during activation...
end_time = conn.end_time end_time = conn.end_time
...@@ -489,8 +486,6 @@ class GenericBackend(service.Service): ...@@ -489,8 +486,6 @@ class GenericBackend(service.Service):
@defer.inlineCallbacks @defer.inlineCallbacks
def _doTeardown(self, conn): def _doTeardown(self, conn):
# this one is not used as a stand-alone, just a utility function # this one is not used as a stand-alone, just a utility function
yield state.deactivating(conn)
self.logStateUpdate(conn, 'DEACTIVATING')
src_target = self.connection_manager.getTarget(conn.source_port, conn.source_labels[0].type_, conn.source_labels[0].labelValue()) src_target = self.connection_manager.getTarget(conn.source_port, conn.source_labels[0].type_, conn.source_labels[0].labelValue())
dst_target = self.connection_manager.getTarget(conn.dest_port, conn.dest_labels[0].type_, conn.dest_labels[0].labelValue()) dst_target = self.connection_manager.getTarget(conn.dest_port, conn.dest_labels[0].type_, conn.dest_labels[0].labelValue())
...@@ -500,19 +495,20 @@ class GenericBackend(service.Service): ...@@ -500,19 +495,20 @@ class GenericBackend(service.Service):
# We need to mark failure in state machine here somehow.... # We need to mark failure in state machine here somehow....
log.msg('Connection %s: Error deactivating connection: %s' % (conn.connection_id, str(e)), system=self.log_system) log.msg('Connection %s: Error deactivating connection: %s' % (conn.connection_id, str(e)), system=self.log_system)
# should include stack trace # should include stack trace
yield state.inactive(conn) conn.data_plane_active = False # technically we don't know, but for NSI that means not active
self.logStateUpdate(conn, 'INACTIVE') yield conn.save()
error_event = self.service_registry.getHandler(registry.ERROR_EVENT, self.parent_system) error_event = self.service_registry.getHandler(registry.ERROR_EVENT, self.parent_system)
connection_states = (conn.reservation_state, conn.provision_state, conn.lifecycle_state, conn.activation_state) connection_states = (conn.reservation_state, conn.provision_state, conn.lifecycle_state, None)
service_ex = (None, type(e), str(e), None, None) service_ex = (None, type(e), str(e), None, None)
error_event(None, None, None, conn.connection_id, 'deactivateFailed', connection_states, datetime.datetime.utcnow(), str(e), service_ex) error_event(None, None, None, conn.connection_id, 'deactivateFailed', connection_states, datetime.datetime.utcnow(), str(e), service_ex)
defer.returnValue(None) defer.returnValue(None)
try: try:
yield state.inactive(conn) conn.data_plane_active = False # technically we don't know, but for NSI that means not active
self.logStateUpdate(conn, 'INACTIVE') yield conn.save()
log.msg('Connection %s: Data planed deactivated' % (conn.connection_id), system=self.log_system)
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
data_plane_status = (False, conn.revision, True) # active, version, onsistent data_plane_status = (False, conn.revision, True) # active, version, onsistent
...@@ -536,7 +532,7 @@ class GenericBackend(service.Service): ...@@ -536,7 +532,7 @@ class GenericBackend(service.Service):
self.scheduler.cancelCall(conn.connection_id) self.scheduler.cancelCall(conn.connection_id)
if conn.activation_state == state.ACTIVE: if conn.data_plane_active:
try: try:
yield self._doTeardown(conn) yield self._doTeardown(conn)
# we can only remove resource reservation entry if we succesfully shut down the link :-( # we can only remove resource reservation entry if we succesfully shut down the link :-(
......
...@@ -55,13 +55,6 @@ PROVISION_TRANSITIONS = { ...@@ -55,13 +55,6 @@ PROVISION_TRANSITIONS = {
RELEASING : [ SCHEDULED ] RELEASING : [ SCHEDULED ]
} }
ACTIVATION_TRANSITIONS = {
INACTIVE : [ ACTIVATING ],
ACTIVATING : [ ACTIVE, INACTIVE ],
ACTIVE : [ DEACTIVATING ],
DEACTIVATING : [ INACTIVE ]
}
LIFECYCLE_TRANSITIONS = { LIFECYCLE_TRANSITIONS = {
INITIAL : [ TERMINATING, TERMINATED ], INITIAL : [ TERMINATING, TERMINATED ],
TERMINATING : [ TERMINATED ], TERMINATING : [ TERMINATED ],
...@@ -130,28 +123,6 @@ def scheduled(conn): ...@@ -130,28 +123,6 @@ def scheduled(conn):
conn.provision_state = SCHEDULED conn.provision_state = SCHEDULED
return conn.save() return conn.save()
# Data plane
def activating(conn):
_switchState(ACTIVATION_TRANSITIONS, conn.activation_state, ACTIVATING)
conn.activation_state = ACTIVATING
return conn.save()
def active(conn):
_switchState(ACTIVATION_TRANSITIONS, conn.activation_state, ACTIVE)
conn.activation_state = ACTIVE
return conn.save()
def deactivating(conn):
_switchState(ACTIVATION_TRANSITIONS, conn.activation_state, DEACTIVATING)
conn.activation_state = DEACTIVATING
return conn.save()
def inactive(conn):
_switchState(ACTIVATION_TRANSITIONS, conn.activation_state, INACTIVE)
conn.activation_state = INACTIVE
return conn.save()
# Lifecyle # Lifecyle
def terminating(conn): def terminating(conn):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment