Anomaly Detection refactoring. Overall code QA

Merged Fabio Farina requested to merge ad_refactor into master
37 files changed: + 2611 − 1637
import os
import time
from river import anomaly
from river import compose
from river import preprocessing
from schedule import every, repeat, run_pending
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import queries
organization = str(os.getenv('INFLUX_ORG', "geant"))
url = str(os.getenv('INFLUX_URL'))
token = str(os.getenv('INFLUX_TOKEN'))
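
# Environment variables read by this agent (summary of the os.getenv calls used here
# and in the scheduling functions below):
#   INFLUX_ORG      Influx organization (default "geant")
#   INFLUX_URL      InfluxDB endpoint URL
#   INFLUX_TOKEN    InfluxDB API token
#   WARMUP_TIME     look-back window for model pre-training (default "-1d")
#   SCHEDULE_EVERY  detection period in seconds (default 600)
#   SCHEDULE_TIME   look-back window for each detection run (default "-10m")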

# Support data structures to represent the topology and to hold the ML models for each network segment
# topology: src_dst_map[probe_system][src] -> [dst1, dst2, ...]
src_dst_map = {
    'rpm': {},
    'twping': {},
}

# ML models for each segment: models_map[probe_system][metric][src][dst] -> [model1, model2, ...]
models_map = {
    'rpm': {
        "posRttJitterMax": {},
        "roundTripTimeMax": {},
    },
    'twping': {
        "rtt_max": {},
        "two_way_jitter": {},
    },
}
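
# Illustrative lookup (sketch only, with hypothetical host names): once get_topology()
# has filled src_dst_map and build_models() has populated models_map, the two online
# detectors for a probe/metric/src/dst combination are retrieved as
#   pipelines = models_map['twping']['rtt_max']['host-a']['host-b']
# where pipelines[0] is the HalfSpaceTrees pipeline and pipelines[1] the OneClassSVM one.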

def timer_func(func):
    # This function shows the execution time of the function object passed
    def wrap_func(*args, **kwargs):
        t1 = time.time()
        result = func(*args, **kwargs)
        t2 = time.time()
        print(f'Function {func.__name__!r} executed in {(t2 - t1):.4f}s')
        return result
    return wrap_func
#
# ETL Methods
#
@timer_func
def get_topology(query_api):
    ''' Extract and update the topology adjacency map from Influx '''
    rpm_topo = {}
    tables = query_api.query(queries.rpm_srcs)
    for src in [row.values['_value'] for row in tables[0].records]:
        tables = query_api.query(queries.rpm_dsts.format(src=src))
        rpm_topo[src] = [row.values['_value'] for row in tables[0].records]

    twping_topo = {}
    tables = query_api.query(queries.twping_srcs)
    for src in [row.values['_value'] for row in tables[0].records]:
        tables = query_api.query(queries.twping_dsts.format(src=src))
        twping_topo[src] = [row.values['_value'] for row in tables[0].records]

    return {'rpm': rpm_topo, 'twping': twping_topo}

@timer_func
def build_models():
    ''' Build the data structure that keeps the ML models for each network segment '''
    global src_dst_map
    global models_map
    # for each probing system, rpm or twamp
    for probe in models_map:
        # for each observed metric
        for metric in models_map[probe]:
            # get each src from the topology and create a dict
            for src in src_dst_map[probe]:
                models_map[probe][metric][src] = {}
                # then for each dst of a given src create a list of ML models
                for dst in src_dst_map[probe][src]:
                    models_map[probe][metric][src][dst] = [
                        # Half-Space Trees ensemble over data min-max scaled to the unit interval
                        compose.Pipeline(
                            preprocessing.MinMaxScaler(),
                            anomaly.QuantileThresholder(
                                anomaly.HalfSpaceTrees(seed=42),
                                q=0.997
                            )
                        ),
                        # One-Class Support Vector Machine over standardized data
                        compose.Pipeline(
                            preprocessing.StandardScaler(),
                            anomaly.QuantileThresholder(
                                anomaly.OneClassSVM(nu=0.15),
                                q=0.96
                            )
                        ),
                    ]
    return
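
# Illustrative usage (sketch only, hypothetical values): each pipeline built above is an
# online river model, so a single observation is learned and scored incrementally, e.g.
#   pipeline = models_map['twping']['rtt_max'][src][dst][0]
#   features = {"rtt_max_Anomaly": 12.3}
#   pipeline.learn_one(features)          # updates the model state in place
#   flag = pipeline.score_one(features)   # QuantileThresholder maps the raw score to a 0/1 flag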

def get_anomalies(timespan='-5m', warmup_mode=False):
    ''' Core procedure: trains the models and flags anomalies '''
    global src_dst_map
    global models_map
    global organization
    points = 0
    anomalies = 0

    client = InfluxDBClient(url=url, token=token, org=organization)
    query_api = client.query_api()
    write_api = client.write_api(write_options=SYNCHRONOUS)

    data_points = {
        'rpm': query_api.query_stream(
            queries.rpm.format(timespan=timespan)
        ),
        'twping': query_api.query_stream(
            queries.twping.format(timespan=timespan)
        ),
    }

    for probe in data_points:
        for row in data_points[probe]:
            try:
                points += 1
                metric = row.values["_field"]
                src = row.values["src"]
                dst = row.values["dst"]
                ts = row.values["_time"]
                features = {f"{metric}_Anomaly": row.values["_value"]}

                # TRAIN & SCORE
                # models_map[probe][metric][src][dst] -> [model1, model2, ...]
                raw_s = []
                for model in models_map[probe][metric][src][dst]:
                    # Learn the feature (models are updated in place)
                    model.learn_one(features)
                    # Score the point; anomalous points are counted and, unless warming up, written to Influx
                    s = model.score_one(features)
                    raw_s.append(s)
                # print(src, dst, sum(raw_s), raw_s)  # DEBUG only

                # Majority rule to mark a point as anomalous
                # (with two models this means both must agree)
                if sum(raw_s) > len(raw_s) / 2:
                    anomalies += 1
                    # If warming up, just skip to the next point
                    if warmup_mode:
                        continue
                    p = {
                        "measurement": f"anomaly_{probe}",
                        "tags": {
                            "src": src, "dst": dst,
                            "dst_ip": row.values["dst_ip"], "src_ip": row.values["src_ip"],
                            "host": row.values["host"]
                        },
                        "fields": features,
                        "time": ts
                    }
                    write_api.write(f"{probe}_anomaly", organization, p)
                # if points > 3000:
                #     break  # DEBUG only
            except Exception as e:
                print(str(e))
                continue

    client.close()
    return (anomalies, points)
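
# Illustrative check (sketch only, assuming the buckets written above exist): the flagged
# points can be inspected with a Flux query such as
#   from(bucket: "twping_anomaly")
#     |> range(start: -1h)
#     |> filter(fn: (r) => r._measurement == "anomaly_twping")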
#
# Main and scheduled methods
#
@repeat(every().day.at("00:03"))
def cron_topology():
    ''' Update the topology map '''
    global src_dst_map
    client = InfluxDBClient(url=url, token=token, org=organization)
    src_dst_map = get_topology(client.query_api())
    client.close()

@timer_func
def warmup_models():
    ''' Pre-train the models '''
    print("Models pre-training initiated: it will take some minutes (usually 5-6)")
    anomalies, points = get_anomalies(timespan=os.getenv('WARMUP_TIME', '-1d'), warmup_mode=True)
    print(f"Models pre-training complete: {points} points learned")

@repeat(every(int(os.getenv('SCHEDULE_EVERY', '600'))).seconds)
def cron_run():
    ''' Repeat the anomaly detection every SCHEDULE_EVERY seconds '''
    try:
        print("=" * 45, f"\nStart: {time.ctime()}")
        anomalies, points = get_anomalies(timespan=os.getenv('SCHEDULE_TIME', '-10m'))
        print(f"Complete: {time.ctime()} Anomalies,Points: {anomalies},{points}")
    except Exception as e:
        print(f'Operation failed: {e}')

def main():
    ''' Main '''
    print("Starting Timemap Anomaly Detection Agent")
    print(f"Requested scheduling time: {os.getenv('SCHEDULE_EVERY', '600')}")
    print("~" * 45, "\nWarming up")
    cron_topology()
    build_models()
    warmup_models()
    # exit()  # DEBUG only
    print("~" * 45, "\nWaiting for schedule...")
    while True:
        run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()