From de533f8a1e8c570b0d5cb01c73b4eedc0fcd9979 Mon Sep 17 00:00:00 2001
From: Tomas Cejka <cejkat@cesnet.cz>
Date: Mon, 22 Nov 2021 18:34:27 +0000
Subject: [PATCH] pytest: a few basic tests

The tests can be executed by:
`pytest`

To see the output, use
`pytest --verbosity=4 --capture=tee-sys`

Server does not need to be running for the tests, pytest can start own django
service that responds (e.g., to REST API).
It is necessary to have created DB with `admin` user having a REST API token!
The `test_viewsets.py` sets DB in `django_db_setup()` fixture.
---
 flowspec/tasks.py         |   2 +
 flowspec/test_views.py    |  15 ++++
 flowspec/test_viewsets.py | 164 ++++++++++++++++++++++++++++++++++++++
 pytest.ini                |   5 ++
 requirements.txt          |   3 +
 5 files changed, 189 insertions(+)
 create mode 100644 flowspec/test_views.py
 create mode 100644 flowspec/test_viewsets.py
 create mode 100644 pytest.ini

diff --git a/flowspec/tasks.py b/flowspec/tasks.py
index 295bef7a..243c17c5 100644
--- a/flowspec/tasks.py
+++ b/flowspec/tasks.py
@@ -17,6 +17,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 
+import pytest
 from utils import proxy as PR
 from celery import shared_task, subtask
 import logging
@@ -397,6 +398,7 @@ def snmp_add_initial_zero_value(rule_id, zero_or_null=True):
         logger.info("exit_process(): before exit in child process (pid="+str(pid)+", npid="+str(npid)+"), after os._exit")
 
 
+@pytest.mark.skip
 @shared_task(ignore_result=True,default_retry_delay=5,max_retries=2,autoretry_for=(TimeoutError,))
 def testcelerytask():
     lockname = "/tmp/testlock"
diff --git a/flowspec/test_views.py b/flowspec/test_views.py
new file mode 100644
index 00000000..76dfc503
--- /dev/null
+++ b/flowspec/test_views.py
@@ -0,0 +1,15 @@
+import pytest
+from views import welcome
+
+#@pytest.mark.asyncio
+#async def test_welcome(async_rf):
+#    request = await aync_rf.get('/welcome/')
+#    response = welcome(request)
+#    assert response.status_code == 200
+
+
+def test_welcome(rf):
+    request = rf.get('/welcome/')
+    response = welcome(request)
+    assert response.status_code == 200
+
diff --git a/flowspec/test_viewsets.py b/flowspec/test_viewsets.py
new file mode 100644
index 00000000..1fc3a78b
--- /dev/null
+++ b/flowspec/test_viewsets.py
@@ -0,0 +1,164 @@
+import pytest
+from viewsets import *
+from flowspec.models import *
+from rest_framework.authtoken.models import Token
+
+@pytest.fixture
+def api_client():
+    from rest_framework.test import APIClient
+    token = Token.objects.get(user__username='admin').key
+    api_client = APIClient()
+    api_client.credentials(HTTP_AUTHORIZATION='Token ' + token)
+    return api_client
+
+@pytest.fixture(scope='session')
+def django_db_setup():
+    settings.DATABASES['default'] = {
+        'ENGINE': 'django.db.backends.sqlite3',
+        'NAME': 'example-data',
+        'USER': '',
+        'PASSWORD': '',
+        'HOST': '',
+        'PORT': '',
+    }
+
+pytestmark = pytest.mark.django_db
+
+class TestFragmenttypes:
+    def test_list(self, api_client):
+        endpoint = '/api/matchprotocol/'
+        response = api_client.get(endpoint)
+        assert response.status_code == 200
+        resp_data = json.loads(response.content)
+        assert len(resp_data) == 3
+        assert resp_data == [ "icmp", "tcp", "udp" ]
+
+
+class TestRoute:
+    def test_add(self, api_client):
+        endpoint = '/api/routes/'
+        data = {
+            "comments": "test route",
+            "destination": "1.0.0.2/32",
+            "destinationport": "123",
+            "name": "test",
+            "protocol": [
+                "tcp"
+            ],
+            "source": "0.0.0.0/0",
+            "sourceport": "123",
+            "then": ["discard"],
+            "status": "ACTIVE"
+        }
+
+        response = api_client.post(endpoint, json.dumps(data), content_type='application/json')
+        assert response.status_code == 201
+
+        resp_data = json.loads(response.content)
+        route_id = resp_data["id"]
+        response = api_client.delete(f"{endpoint}{route_id}/")
+        print(response.content)
+
+    def test_fail_icmpwithport(self, api_client):
+        endpoint = '/api/routes/'
+        data = {
+            "status": "ACTIVE",
+            "comments": "test comment",
+            "destination": "1.0.0.3/32",
+            "destinationport": "80",
+            "name": "test",
+            "protocol": ["icmp"],
+            "fragmenttype": [
+                "is-fragment"
+            ],
+            "source": "0.0.0.0/0",
+            "sourceport": "123",
+            "then": ["discard"]
+        }
+
+        response = api_client.post(endpoint, json.dumps(data), content_type='application/json')
+        assert response.status_code == 400
+        assert json.loads(response.content) == {"non_field_errors": ["ICMP protocol does not allow to specify ports"]}
+
+
+    def test_fail_unknownthenaction(self, api_client):
+        endpoint = '/api/routes/'
+        data = {
+            "status": "ACTIVE",
+            "comments": "test comment",
+            "destination": "1.0.0.3/32",
+            "port": "80",
+            "name": "test-unknownthen",
+            "source": "0.0.0.0/0",
+            "then": ["drop"]
+        }
+ 
+        response = api_client.post(endpoint, json.dumps(data), content_type='application/json')
+        assert response.status_code == 400
+        assert json.loads(response.content) == {"then": [["ThenAction does not exist."]]}
+
+
+    def test_fail_unknownfragmenttype(self, api_client):
+        endpoint = '/api/routes/'
+        data = {
+            "status": "ACTIVE",
+            "comments": "test comment",
+            "destination": "1.0.0.3/32",
+            "destinationport": "80",
+            "name": "test",
+            "protocol": [],
+            "fragmenttype": [
+                "unknown-fragment"
+            ],
+            "source": "0.0.0.0/0",
+            "sourceport": "123",
+            "then": ["discard"]
+        }
+ 
+        response = api_client.post(endpoint, json.dumps(data), content_type='application/json')
+        assert response.status_code == 400
+        assert json.loads(response.content) == {'fragmenttype': [['FragmentType does not exist.']]}
+
+    def test_list(self, api_client):
+        endpoint = '/api/routes/'
+        response = api_client.get(endpoint)
+        assert response.status_code == 200
+        resp_data = json.loads(response.content)
+        print(resp_data)
+
+    def test_add(self, api_client):
+        endpoint = '/api/routes/'
+        data = {
+            "comments": "test route",
+            "destination": "1.0.0.2/32",
+            "destinationport": "123",
+            "name": "testcreate",
+            "protocol": [
+                "tcp"
+            ],
+            "source": "0.0.0.0/0",
+            "sourceport": "123",
+            "then": ["discard"],
+            "status": "ACTIVE"
+        }
+
+        response = api_client.post(endpoint, json.dumps(data), content_type='application/json')
+        assert response.status_code == 201
+
+        resp_data = json.loads(response.content)
+        route_id = resp_data["id"]
+        
+        # lookup created object
+        Route.objects.get(id=route_id)
+
+        response = api_client.delete(f"{endpoint}{route_id}/")
+
+        try:
+            Route.objects.get(id=route_id)
+        except:
+            # should be deleted by now
+            pass
+
+
+
+
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 00000000..0b59bbcb
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,5 @@
+[pytest]
+DJANGO_SETTINGS_MODULE = flowspy.settings
+django_find_project = true
+django_debug_mode = true
+
diff --git a/requirements.txt b/requirements.txt
index 4e5b414d..a4285015 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,3 +35,6 @@ appdirs
 anyjson
 cryptography
 redis
+pytest==6.2.5
+pytest-django==4.4.0
+
-- 
GitLab