diff --git a/datafiles/schema.sql b/datafiles/schema.sql
index 99b67b0ecac011830f9aa4be1b548d9e8890603b..c25dd7fc08c5be49368dcb9497e44c897eac851b 100644
--- a/datafiles/schema.sql
+++ b/datafiles/schema.sql
@@ -16,10 +16,10 @@ CREATE TABLE service_connections (
     global_reservation_id text,
     description text,
     requester_nsa text NOT NULL,
+    requester_url text,
     reserve_time timestamp NOT NULL,
     reservation_state text NOT NULL,
     provision_state text NOT NULL,
-    activation_state text NOT NULL,
     lifecycle_state text NOT NULL,
     source_network text NOT NULL,
     source_port text NOT NULL,
@@ -43,8 +43,10 @@ CREATE TABLE sub_connections (
     order_id integer NOT NULL,
     reservation_state text NOT NULL,
     provision_state text NOT NULL,
-    activation_state text NOT NULL,
     lifecycle_state text NOT NULL,
+    data_plane_active boolean NOT NULL,
+    data_plane_version int,
+    data_plane_consistent boolean,
     source_network text NOT NULL,
     source_port text NOT NULL,
     source_labels label[],
@@ -66,8 +68,8 @@ CREATE TABLE generic_backend_connections (
     reserve_time timestamp NOT NULL,
     reservation_state text NOT NULL,
     provision_state text NOT NULL,
-    activation_state text NOT NULL,
     lifecycle_state text NOT NULL,
+    data_plane_active boolean NOT NULL,
     source_network text NOT NULL,
     source_port text NOT NULL,
     source_labels label[],
diff --git a/opennsa/aggregator.py b/opennsa/aggregator.py
index 7c6a227af50680d7310346b2c0fcfc7e083f1824..179b6926f19495b1ad73735868773a92d0d586ee 100644
--- a/opennsa/aggregator.py
+++ b/opennsa/aggregator.py
@@ -151,8 +151,8 @@ class Aggregator:
             raise error.TopologyError('Cannot connect STP %s to itself.' % source_stp)

         conn = database.ServiceConnection(connection_id=connection_id, revision=0, global_reservation_id=global_reservation_id, description=description,
-                                          requester_nsa=header.requester_nsa, reserve_time=datetime.datetime.utcnow(),
-                                          reservation_state=state.INITIAL, provision_state=state.SCHEDULED, activation_state=state.INACTIVE, lifecycle_state=state.INITIAL,
+                                          requester_nsa=header.requester_nsa, requester_url=header.reply_to, reserve_time=datetime.datetime.utcnow(),
+                                          reservation_state=state.INITIAL, provision_state=state.SCHEDULED, lifecycle_state=state.INITIAL,
                                           source_network=source_stp.network, source_port=source_stp.port, source_labels=source_stp.labels,
                                           dest_network=dest_stp.network, dest_port=dest_stp.port, dest_labels=dest_stp.labels,
                                           start_time=service_params.start_time, end_time=service_params.end_time, bandwidth=service_params.bandwidth)
@@ -244,7 +244,7 @@ class Aggregator:
         sc = database.SubConnection(provider_nsa=link_provider_nsa.urn(), connection_id=connection_id, local_link=local_link,
                                     revision=0, service_connection_id=conn.id, order_id=order_id,
                                     global_reservation_id=global_reservation_id, description=description,
-                                    reservation_state=state.INITIAL, provision_state=state.SCHEDULED, activation_state=state.INACTIVE, lifecycle_state=state.INITIAL,
+                                    reservation_state=state.INITIAL, provision_state=state.SCHEDULED, lifecycle_state=state.INITIAL, data_plane_active=False,
                                     source_network=sp.source_stp.network, source_port=sp.source_stp.port, source_labels=sp.source_stp.labels,
                                     dest_network=sp.dest_stp.network, dest_port=sp.dest_stp.port, dest_labels=sp.dest_stp.labels,
                                     start_time=sp.start_time.isoformat(), end_time=sp.end_time.isoformat(), bandwidth=sp.bandwidth)
@@ -282,8 +282,6 @@ class Aggregator:

         raise err

-
-
     @defer.inlineCallbacks
     def reserveCommit(self, header, connection_id):

@@ -569,28 +567,6 @@ class Aggregator:

     # --

-    @defer.inlineCallbacks
-    def doActivate(self, conn):
-        yield state.activating(conn)
-        yield state.active(conn)
-
-        header = nsa.NSIHeader(conn.requester_nsa, self.nsa_.urn())
-        now = datetime.datetime.utcnow()
-        data_plane_status = (True, conn.revision, True) # active, version, consistent
-        self.parent_requester.dataPlaneStateChange(header, conn.connection_id, 0, now, data_plane_status)
-
-
-    @defer.inlineCallbacks
-    def doTeardown(self, conn):
-        yield state.deactivating(conn)
-        yield state.inactive(conn)
-
-        header = nsa.NSIHeader(conn.requester_nsa, self.nsa_.urn())
-        now = datetime.datetime.utcnow()
-        data_plane_status = (False, conn.revision, True) # active, version, consistent
-        self.parent_requester.dataPlaneStateChange(header, conn.connection_id, 0, now, data_plane_status)
-
-
     def doTimeout(self, conn):
         header = None
         self.parent_requester.reserveTimeout(header, conn.connection_id, None, None, None, None, None)
@@ -633,21 +609,38 @@ class Aggregator:

     @defer.inlineCallbacks
     def dataPlaneStateChange(self, header, connection_id, notification_id, timestamp, dps):

-        active, version, version_consistent = dps
+        active, version, consistent = dps
+        log.msg("Data plane change for sub connection: %s Active: %s, version %i, consistent: %s" % \
+                (connection_id, active, version, consistent), system=LOG_SYSTEM)

         sub_conn = yield self.findSubConnection(header.provider_nsa, connection_id)
+        sub_conn.data_plane_active = active
+        sub_conn.data_plane_version = version
+        sub_conn.data_plane_consistent = consistent
+
+        yield sub_conn.save()
+
         conn = yield sub_conn.ServiceConnection.get()
         sub_conns = yield conn.SubConnections.get()

-        if len(sub_conns) == 1:
-            log.msg("than one sub connection for connection %s, notifying" % conn.connection_id)
-            if active:
-                yield self.doActivate(conn)
-            else:
-                yield self.doTeardown(conn)
-        else:
-            log.msg("more than one sub connection for connection %s" % conn.connection_id)
+        if not conn.requester_url:
+            log.msg("Connection %s: No url for requester to notify about data plane change" % conn.connection_id, system=LOG_SYSTEM)
+            defer.returnValue(None)
+
+        # do notification
+
+        aggr_active = all( [ sc.data_plane_active for sc in sub_conns ] )
+        aggr_version = max( [ sc.data_plane_version for sc in sub_conns ] )
+        aggr_consistent = all( [ sc.data_plane_consistent for sc in sub_conns ] )
+
+        header = nsa.NSIHeader(conn.requester_nsa, self.nsa_.urn(), reply_to=conn.requester_url)
+        now = datetime.datetime.utcnow()
+        data_plane_status = (aggr_active, aggr_version, aggr_consistent)
+        log.msg("Connection %s: Aggregated data plane status: Active %s, version %i, consistent %s" % \
+                (conn.connection_id, aggr_active, aggr_version, aggr_consistent), system=LOG_SYSTEM)
+
+        self.parent_requester.dataPlaneStateChange(header, conn.connection_id, 0, now, data_plane_status)


     @defer.inlineCallbacks
diff --git a/opennsa/backends/common/genericbackend.py b/opennsa/backends/common/genericbackend.py
index ff22d9aa3990a0d6706245fd4cb9ecf1b95b9f49..22cfd8929f9a00ed9d4ac6bf99f7b2de66b52c59 100644
--- a/opennsa/backends/common/genericbackend.py
+++ b/opennsa/backends/common/genericbackend.py
@@ -106,7 +106,7 @@ class GenericBackend(service.Service):
                     log.msg('Unhandled provision state %s for connection %s in scheduler building' % (conn.provision_state, conn.connection_id))
             elif conn.start_time > now:
-                if conn.provision_state == state.PROVISIONED and conn.activation_state != state.ACTIVE:
+                if conn.provision_state == state.PROVISIONED and conn.data_plane_active == False:
                     log.msg('Connection %s: Immediate activate during buildSchedule' % conn.connection_id,
                            system=self.log_system)
                     yield self._doActivate(conn)
                 elif conn.provision_state == state.SCHEDULED:
@@ -232,7 +232,7 @@ class GenericBackend(service.Service):

         # should we save the requester or provider here?
         conn = GenericBackendConnections(connection_id=connection_id, revision=0, global_reservation_id=global_reservation_id, description=description,
                                          requester_nsa=header.requester_nsa, reserve_time=now,
-                                         reservation_state=state.INITIAL, provision_state=state.SCHEDULED, activation_state=state.INACTIVE, lifecycle_state=state.INITIAL,
+                                         reservation_state=state.INITIAL, provision_state=state.SCHEDULED, lifecycle_state=state.INITIAL, data_plane_active=False,
                                          source_network=source_stp.network, source_port=source_stp.port, source_labels=[src_label],
                                          dest_network=dest_stp.network, dest_port=dest_stp.port, dest_labels=[dst_label],
                                          start_time=service_params.start_time, end_time=service_params.end_time,
@@ -319,7 +319,7 @@ class GenericBackend(service.Service):

         self.scheduler.cancelCall(connection_id)

-        if conn.activation_state == state.ACTIVE:
+        if conn.data_plane_active:
             try:
                 yield self._doTeardown(conn)
             except Exception as e:
@@ -398,7 +398,7 @@ class GenericBackend(service.Service):

         yield self._doReserveAbort(conn)

-        connection_states = (conn.reservation_state, conn.provision_state, conn.lifecycle_state, conn.activation_state)
+        connection_states = (conn.reservation_state, conn.provision_state, conn.lifecycle_state, None)
         header = nsa.NSIHeader(conn.requester_nsa, conn.requester_nsa) # The NSA is both requester and provider in the backend, but this might be problematic without aggregator
         self.parent_requester.reserveTimeout(header, conn.connection_id, connection_states, self.TPC_TIMEOUT, datetime.datetime.utcnow())
@@ -442,10 +442,6 @@ class GenericBackend(service.Service):

     @defer.inlineCallbacks
     def _doActivate(self, conn):
-        if conn.activation_state != state.ACTIVATING: # We died during a previous activate somehow
-            yield state.activating(conn)
-            self.logStateUpdate(conn, 'ACTIVATING')
-
         src_target = self.connection_manager.getTarget(conn.source_port, conn.source_labels[0].type_, conn.source_labels[0].labelValue())
         dst_target = self.connection_manager.getTarget(conn.dest_port, conn.dest_labels[0].type_, conn.dest_labels[0].labelValue())
         try:
@@ -453,15 +449,16 @@ class GenericBackend(service.Service):
         except Exception as e:
             # We need to mark failure in state machine here somehow....
             log.msg('Connection %s: Error setting up connection: %s' % (conn.connection_id, str(e)), system=self.log_system) # should include stack trace
-            yield state.inactive(conn)
-            self.logStateUpdate(conn, 'INACTIVE')
+            conn.data_plane_active = False
+            yield conn.save()
             now = datetime.datetime.utcnow()
             service_ex = None
             defer.returnValue(None)

         try:
-            yield state.active(conn)
-            self.logStateUpdate(conn, 'ACTIVE')
+            conn.data_plane_active = True
+            yield conn.save()
+            log.msg('Connection %s: Data plane activated' % (conn.connection_id), system=self.log_system)

         # we might have passed end time during activation...
         end_time = conn.end_time
@@ -489,8 +486,6 @@ class GenericBackend(service.Service):

     @defer.inlineCallbacks
     def _doTeardown(self, conn):
         # this one is not used as a stand-alone, just a utility function
-        yield state.deactivating(conn)
-        self.logStateUpdate(conn, 'DEACTIVATING')

         src_target = self.connection_manager.getTarget(conn.source_port, conn.source_labels[0].type_, conn.source_labels[0].labelValue())
         dst_target = self.connection_manager.getTarget(conn.dest_port, conn.dest_labels[0].type_, conn.dest_labels[0].labelValue())
@@ -500,19 +495,20 @@ class GenericBackend(service.Service):
             # We need to mark failure in state machine here somehow....
             log.msg('Connection %s: Error deactivating connection: %s' % (conn.connection_id, str(e)), system=self.log_system) # should include stack trace
-            yield state.inactive(conn)
-            self.logStateUpdate(conn, 'INACTIVE')
+            conn.data_plane_active = False # technically we don't know, but for NSI that means not active
+            yield conn.save()

             error_event = self.service_registry.getHandler(registry.ERROR_EVENT, self.parent_system)
-            connection_states = (conn.reservation_state, conn.provision_state, conn.lifecycle_state, conn.activation_state)
+            connection_states = (conn.reservation_state, conn.provision_state, conn.lifecycle_state, None)
             service_ex = (None, type(e), str(e), None, None)
             error_event(None, None, None, conn.connection_id, 'deactivateFailed', connection_states, datetime.datetime.utcnow(), str(e), service_ex)
             defer.returnValue(None)

         try:
-            yield state.inactive(conn)
-            self.logStateUpdate(conn, 'INACTIVE')
+            conn.data_plane_active = False # technically we don't know, but for NSI that means not active
+            yield conn.save()
+            log.msg('Connection %s: Data plane deactivated' % (conn.connection_id), system=self.log_system)

             now = datetime.datetime.utcnow()
             data_plane_status = (False, conn.revision, True) # active, version, consistent
@@ -536,7 +532,7 @@ class GenericBackend(service.Service):

         self.scheduler.cancelCall(conn.connection_id)

-        if conn.activation_state == state.ACTIVE:
+        if conn.data_plane_active:
             try:
                 yield self._doTeardown(conn) # we can only remove resource reservation entry if we succesfully shut down the link :-(
diff --git a/opennsa/state.py b/opennsa/state.py
index df6a374667b8f4b601a400e792d3a660c3c326d4..f2d991b99a31ceb58bf069e26a7d1c40a0119de4 100644
--- a/opennsa/state.py
+++ b/opennsa/state.py
@@ -55,13 +55,6 @@ PROVISION_TRANSITIONS = {
     RELEASING    : [ SCHEDULED ]
 }

-ACTIVATION_TRANSITIONS = {
-    INACTIVE     : [ ACTIVATING ],
-    ACTIVATING   : [ ACTIVE, INACTIVE ],
-    ACTIVE       : [ DEACTIVATING ],
-    DEACTIVATING : [ INACTIVE ]
-}
-
 LIFECYCLE_TRANSITIONS = {
     INITIAL     : [ TERMINATING, TERMINATED ],
     TERMINATING : [ TERMINATED ],
@@ -130,28 +123,6 @@ def scheduled(conn):
     conn.provision_state = SCHEDULED
     return conn.save()

-# Data plane
-
-def activating(conn):
-    _switchState(ACTIVATION_TRANSITIONS, conn.activation_state, ACTIVATING)
-    conn.activation_state = ACTIVATING
-    return conn.save()
-
-def active(conn):
-    _switchState(ACTIVATION_TRANSITIONS, conn.activation_state, ACTIVE)
-    conn.activation_state = ACTIVE
-    return conn.save()
-
-def deactivating(conn):
-    _switchState(ACTIVATION_TRANSITIONS, conn.activation_state, DEACTIVATING)
-    conn.activation_state = DEACTIVATING
-    return conn.save()
-
-def inactive(conn):
-    _switchState(ACTIVATION_TRANSITIONS, conn.activation_state, INACTIVE)
-    conn.activation_state = INACTIVE
-    return conn.save()
-
 # Lifecyle

 def terminating(conn):
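
The aggregation rule this patch introduces in Aggregator.dataPlaneStateChange can be exercised in isolation: the connection is reported active only if every sub connection's data plane is active, the reported version is the highest version seen, and the result is consistent only if every sub connection reports a consistent data plane. A minimal stand-alone sketch of that rule follows; SubConn is a hypothetical stand-in for the database.SubConnection rows and is not part of the patch.

    from collections import namedtuple

    # Hypothetical stand-in for the data plane columns added to sub_connections above.
    SubConn = namedtuple('SubConn', 'data_plane_active data_plane_version data_plane_consistent')

    def aggregate_data_plane_status(sub_conns):
        # Mirrors the all()/max()/all() aggregation in Aggregator.dataPlaneStateChange.
        aggr_active = all(sc.data_plane_active for sc in sub_conns)
        aggr_version = max(sc.data_plane_version for sc in sub_conns)
        aggr_consistent = all(sc.data_plane_consistent for sc in sub_conns)
        return aggr_active, aggr_version, aggr_consistent

    print(aggregate_data_plane_status([SubConn(True, 2, True), SubConn(True, 1, True)]))
    # -> (True, 2, True)
    print(aggregate_data_plane_status([SubConn(True, 2, True), SubConn(False, 2, False)]))
    # -> (False, 2, False)

One caveat worth noting: data_plane_version is nullable in the new sub_connections schema, so a sub connection that has never delivered a dataPlaneStateChange leaves it as NULL, and the max() aggregation implicitly assumes every sub connection has reported at least once.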