gn4-3-wp6-t1-lola / timemap_public · Merge requests · !1

Anomaly Detection refactoring. Overall code QA

Merged · Fabio Farina requested to merge ad_refactor into master 2 years ago
1 commit (latest version 5bb72d64, 2 years ago) · 37 files changed · +2611 −1637
docker_images/timemap_analytics/analytics.py · deleted (100644 → 0) · +0 −210
import os
import time

from river import anomaly
from river import compose
from river import preprocessing
from schedule import every, repeat, run_pending
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

import queries
organization = str(os.getenv('INFLUX_ORG', "geant"))
url = str(os.getenv('INFLUX_URL'))
token = str(os.getenv('INFLUX_TOKEN'))
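
# A minimal configuration sketch (hypothetical values) covering every environment
# variable this agent reads; INFLUX_URL and INFLUX_TOKEN have no defaults and must
# be provided, e.g.:
#
#   os.environ.setdefault('INFLUX_ORG', 'geant')                  # organisation (default "geant")
#   os.environ.setdefault('INFLUX_URL', 'http://influxdb:8086')   # hypothetical endpoint
#   os.environ.setdefault('INFLUX_TOKEN', '<api-token>')          # hypothetical placeholder
#   os.environ.setdefault('WARMUP_TIME', '-1d')                   # pre-training lookback (default -1d)
#   os.environ.setdefault('SCHEDULE_EVERY', '600')                # run period in seconds (default 600)
#   os.environ.setdefault('SCHEDULE_TIME', '-10m')                # per-run lookback (default -10m)
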
# Support data structures to represent topology and to handle ML models for each network segment
# topology, src_dst_map[probe_system][src][dst1, dst2, ...]
src_dst_map = {
    'rpm': {},
    'twping': {},
}
# ML models for each segment, models_map[probe_system][metrics][src][dst][model1, model2, ...]
models_map = {
    'rpm': {
        "posRttJitterMax": {},
        "roundTripTimeMax": {},
    },
    'twping': {
        "rtt_max": {},
        "two_way_jitter": {},
    },
}

def timer_func(func):
    # This function shows the execution time of
    # the function object passed
    def wrap_func(*args, **kwargs):
        t1 = time.time()
        result = func(*args, **kwargs)
        t2 = time.time()
        print(f'Function {func.__name__!r} executed in {(t2 - t1):.4f}s')
        return result
    return wrap_func

#
# ETL Methods
#
@timer_func
def get_topology(query_api):
    '''
    Extract and update the topology adjacency map from Influx
    '''
    rpm_topo = {}
    tables = query_api.query(queries.rpm_srcs)
    for src in [row.values['_value'] for row in tables[0].records]:
        tables = query_api.query(queries.rpm_dsts.format(src=src))
        rpm_topo[src] = [row.values['_value'] for row in tables[0].records]

    twping_topo = {}
    tables = query_api.query(queries.twping_srcs)
    for src in [row.values['_value'] for row in tables[0].records]:
        tables = query_api.query(queries.twping_dsts.format(src=src))
        twping_topo[src] = [row.values['_value'] for row in tables[0].records]

    return {'rpm': rpm_topo, 'twping': twping_topo}
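
# The queries module is outside this diff; a hypothetical sketch of the Flux
# templates get_topology() expects (rpm_srcs/twping_srcs list probe sources,
# rpm_dsts/twping_dsts take a {src} placeholder filled via str.format). Bucket
# and tag names here are illustrative assumptions, not the project's real queries:
#
#   rpm_srcs = '''
#   from(bucket: "rpm") |> range(start: -1d)
#     |> group() |> distinct(column: "src") |> keep(columns: ["_value"])
#   '''
#   rpm_dsts = '''
#   from(bucket: "rpm") |> range(start: -1d)
#     |> filter(fn: (r) => r.src == "{src}")
#     |> group() |> distinct(column: "dst") |> keep(columns: ["_value"])
#   '''
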
@timer_func
def build_models():
    '''
    Build the data structure that keeps the ML models for each network segment
    '''
    global src_dst_map
    global models_map

    # for each probing system, rpm or twamp
    for probe in models_map:
        # for each observed metric
        for metric in models_map[probe]:
            # get each src from topology and create a dict
            for src in src_dst_map[probe]:
                models_map[probe][metric][src] = {}
                # then for each dst of a given src create an array of ML models
                for dst in src_dst_map[probe][src]:
                    models_map[probe][metric][src][dst] = [
                        # Half-Space Trees random forest over unit-linearized data
                        compose.Pipeline(
                            preprocessing.MinMaxScaler(),
                            anomaly.QuantileThresholder(
                                anomaly.HalfSpaceTrees(seed=42),
                                q=0.997)
                        ),
                        # OneClass Support Vector Machine over standardized data
                        compose.Pipeline(
                            preprocessing.StandardScaler(),
                            anomaly.QuantileThresholder(
                                anomaly.OneClassSVM(nu=0.15),
                                q=0.96)
                        ),
                    ]
    return
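
# A minimal usage sketch for one populated segment, assuming the river API as used
# in get_anomalies() below: every leaf of models_map is a list of two pipelines and
# every sample is a one-entry feature dict. 'node-a', 'node-b' and the 12.3 value
# are hypothetical placeholders.
#
#   segment_models = models_map['rpm']['roundTripTimeMax']['node-a']['node-b']
#   sample = {'roundTripTimeMax_Anomaly': 12.3}
#   for m in segment_models:
#       m = m.learn_one(sample)     # update the online model
#       vote = m.score_one(sample)  # 0/1 after the quantile threshold
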
def get_anomalies(timespan='-5m', warmup_mode=False):
    '''
    Core procedure: trains models and predicts anomalies
    '''
    global src_dst_map
    global models_map
    global organization

    points = 0
    anomalies = 0

    client = InfluxDBClient(url=url, token=token, org=organization)
    query_api = client.query_api()
    write_api = client.write_api(write_options=SYNCHRONOUS)

    data_points = {
        'rpm': query_api.query_stream(queries.rpm.format(timespan=timespan)),
        'twping': query_api.query_stream(queries.twping.format(timespan=timespan)),
    }

    for probe in data_points:
        for row in data_points[probe]:
            try:
                points += 1
                metric = row.values["_field"]
                src = row.values["src"]
                dst = row.values["dst"]
                ts = row.values["_time"]
                features = {f"{metric}_Anomaly": row.values["_value"]}

                # TRAIN & SCORE
                # models_map[probe][metrics][src][dst][model1, model2, ...]
                raw_s = []
                for model in models_map[probe][metric][src][dst]:
                    # Learn feature
                    model = model.learn_one(features)
                    # Score a point, if anomalous count and write to Influx if not warming up
                    s = model.score_one(features)
                    raw_s.append(s)

                # print(src, dst, sum(raw_s), raw_s)  # DEBUG
                # Majority rule to mark a point as anomalous
                if sum(raw_s) > len(raw_s) / 2:
                    anomalies += 1

                    # If warming up, just skip to the next point
                    if warmup_mode == True:
                        continue

                    p = {
                        "measurement": f"anomaly_{probe}",
                        "tags": {
                            "src": src,
                            "dst": dst,
                            "dst_ip": row.values["dst_ip"],
                            "src_ip": row.values["src_ip"],
                            "host": row.values["host"]
                        },
                        "fields": features,
                        "time": ts
                    }
                    write_api.write(f"{probe}_anomaly", organization, p)

                # if points > 3000:
                #     break  # DEBUG only
            except Exception as e:
                print(str(e))
                continue

    client.close()
    return (anomalies, points)
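
# With the two thresholded pipelines built in build_models(), raw_s holds two 0/1
# votes, so sum(raw_s) > len(raw_s) / 2 (i.e. > 1) flags a point only when both the
# HalfSpaceTrees and the OneClassSVM detectors agree. A worked example:
#
#   raw_s = [1, 0]   # sum 1, not > 1: point is not counted as anomalous
#   raw_s = [1, 1]   # sum 2, > 1: anomaly counted and, outside warm-up, written to Influx
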
#
# Main and scheduled methods
#
@repeat(every().day.at("00:03"))
def cron_topology():
    '''
    Update topology map
    '''
    global src_dst_map
    client = InfluxDBClient(url=url, token=token, org=organization)
    src_dst_map = get_topology(client.query_api())
    client.close()

@timer_func
def warmup_models():
    '''
    Pre-train models
    '''
    print("Models pre-training initiated: it will take a few minutes (usually 5-6)")
    points = get_anomalies(timespan=os.getenv('WARMUP_TIME', '-1d'), warmup_mode=True)
    print(f"Models pre-training complete: {points} points learned")

@repeat(every(int(os.getenv('SCHEDULE_EVERY', '600'))).seconds)
def cron_run():
    '''
    Repeat the anomaly detection every SCHEDULE_EVERY seconds
    '''
    try:
        print("=" * 45, f"\nStart: {time.ctime()}")
        points = get_anomalies(timespan=os.getenv('SCHEDULE_TIME', '-10m'))
        print(f"Complete: {time.ctime()} Anomalies,Points: {points}")
    except Exception as e:
        print(f'Operation failed: {e}')

def main():
    '''
    Main
    '''
    print("Starting Timemap Anomaly Detection Agent")
    print(f"Requested scheduling time: {os.getenv('SCHEDULE_EVERY', '600')}")
    print("~" * 45, "\nWarming up")
    cron_topology()
    build_models()
    warmup_models()
    # exit()  # DEBUG only
    print("~" * 45, "\nWaiting for schedule...")
    while True:
        run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()
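
A hypothetical way to exercise the agent locally against a reachable InfluxDB instance; the URL and token values are placeholders, and analytics refers to this file imported as a module:

import os

os.environ['INFLUX_URL'] = 'http://localhost:8086'   # placeholder endpoint
os.environ['INFLUX_TOKEN'] = '<api-token>'           # placeholder token

import analytics
analytics.main()   # builds the topology map, pre-trains the models, then loops on the schedule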