Skip to content
Snippets Groups Projects

Implement an isolated Docker Compose container for the test environment as a fixture

Files
4
+ 189
2
 
import concurrent.futures
import contextlib
import contextlib
import ipaddress
import ipaddress
import json
import json
 
import logging
import os
import os
 
import random
import socket
import socket
 
import subprocess
import tempfile
import tempfile
 
import time
from pathlib import Path
from pathlib import Path
import orchestrator
import orchestrator
import pytest
import pytest
 
import yaml
from alembic import command
from alembic import command
from alembic.config import Config
from alembic.config import Config
from faker import Faker
from faker import Faker
@@ -22,6 +28,181 @@ from starlette.testclient import TestClient
@@ -22,6 +28,181 @@ from starlette.testclient import TestClient
from gso.main import init_gso_app
from gso.main import init_gso_app
 
# Module-level logger for the docker-compose test fixtures.
logger = logging.getLogger(__name__)

# Upper bound (seconds) for a single docker CLI command (e.g. `compose up`) to finish.
CONTAINER_UP_TIMEOUT_S = 40
# Upper bound (seconds) to wait for a container's health check to report "healthy".
CONTAINER_HEALTH_TIMEOUT_S = 60
 
 
 
def _wait_for_container_to_be_healthy(container_name):
    """Poll ``docker inspect`` until *container_name* reports a healthy state.

    Args:
        container_name: Name of the running container to poll.

    Returns:
        True when the container became healthy within
        ``CONTAINER_HEALTH_TIMEOUT_S`` seconds, False on timeout.

    Raises:
        Propagates any failure from ``docker inspect`` or from parsing its
        output (JSON decode error, missing "Health" element) — by design:
        the test environment requires a health check in the container.
    """

    def _is_container_healthy():
        args = ["docker", "inspect", container_name]
        # just die badly in case of any problems
        # (e.g. json decoding error, no health in output)
        inspect_output = json.loads(_run_command_or_fail(args))
        # barf if this element isn't the output
        # ... we require a health check in the container
        return inspect_output[0]["State"]["Health"]["Status"] == "healthy"

    # time.monotonic() is immune to wall-clock adjustments (NTP, DST),
    # unlike time.time(), so the deadline cannot jump forward or backward.
    deadline = time.monotonic() + CONTAINER_HEALTH_TIMEOUT_S
    while time.monotonic() <= deadline:
        if _is_container_healthy():
            return True
        time.sleep(1)
    return False
 
 
 
def _run_command_or_fail(args):
 
dc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8") # noqa S603
 
output_info, _ = dc.communicate(timeout=CONTAINER_UP_TIMEOUT_S)
 
assert dc.returncode == 0, output_info
 
return output_info
 
 
 
def _use_isolated_docker_compose():
    """Report whether tests should run against an isolated compose environment.

    Returns:
        True when the ``USE_ISOLATED_DOCKER_COMPOSE`` environment variable is
        set to a non-empty value; False (with a warning) otherwise.

    Raises:
        AssertionError: When the variable is set but no docker compose
            flavour is installed.
    """
    # NOTE(review): ANY non-empty value enables this — including "0" or
    # "false". Truthiness of the raw string is tested directly; the
    # original's bool() wrapper was redundant (`not bool(x)` == `not x`).
    if not os.environ.get("USE_ISOLATED_DOCKER_COMPOSE"):
        logger.warning('"USE_ISOLATED_DOCKER_COMPOSE" is not defined in the environment')
        return False

    def _is_docker_compose_installed():
        # First try the "docker compose" plugin ...
        compose_result = subprocess.call(
            ["docker", "compose"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL  # noqa: S603
        )
        if compose_result == 0:
            return True

        # ... then fall back to the legacy standalone "docker-compose" binary.
        compose_result = subprocess.call(
            ["docker-compose"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL  # noqa: S603
        )
        return compose_result == 0

    assert _is_docker_compose_installed(), "docker-compose is not available in the environment"
    return True
 
 
 
@contextlib.contextmanager
def run_docker_compose(compose_filename, container_names):
    """Bring up a compose project, yield while it runs, then tear it down.

    Args:
        compose_filename: Path of the compose YAML file to run.
        container_names: Containers whose health checks must all pass
            before control is yielded to the caller.

    Raises:
        AssertionError: When a docker command exits non-zero or a container
            fails its health check.
    """
    # make a random project name, rather than some env dirname, so
    # concurrent test sessions on the same host cannot collide
    project_name = f"test-{random.randint(1000, 10000)}"  # noqa S311

    def _get_compose_command_arguments(params):
        # Prefer the "docker compose" plugin; fall back to the legacy
        # standalone "docker-compose" binary when the plugin is absent.
        compose_result = subprocess.call(
            ["docker", "compose"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL  # noqa S603
        )
        if compose_result:
            return ["docker-compose"] + params
        return ["docker", "compose"] + params

    args = _get_compose_command_arguments(["-f", compose_filename, "-p", project_name, "up", "--detach"])
    _run_command_or_fail(args)

    try:
        # One worker per container: the previous hard-coded max_workers=2
        # serialized health polling for larger container sets, effectively
        # stacking their timeout budgets. max(1, ...) keeps an empty list valid.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(container_names))) as executor:
            future_to_container_name = {
                executor.submit(_wait_for_container_to_be_healthy, name): name for name in container_names
            }

            for future in concurrent.futures.as_completed(future_to_container_name):
                name = future_to_container_name[future]
                assert future.result(), f"health check failed for container {name}"

        yield  # wait here until the context finishes
    finally:
        for name in container_names:
            try:
                args = ["docker", "logs", name]
                logger.info(_run_command_or_fail(args))
            except Exception:  # noqa: BLE001
                # Debugging aid only: keep tearing down, but record the
                # failure. (Exception, not a bare except, so Ctrl-C and
                # SystemExit still propagate during cleanup.)
                logger.exception(f"error calling `docker logs {name}`")

        args = _get_compose_command_arguments(["-f", compose_filename, "-p", project_name, "down"])
        _run_command_or_fail(args)
 
 
 
@pytest.fixture(scope="session")
def free_host_ports():
    """Return a mapping of service name -> free TCP port on the host.

    The port is discovered by binding an ephemeral socket; the compose port
    mapping later reuses that port number.
    """
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as postgres:
        # SO_REUSEADDR must be set BEFORE bind() to have any effect on it
        # (the original set it afterwards, where it did nothing); it lets
        # the later listener reuse the port even if this probe socket
        # lingers in TIME_WAIT.
        postgres.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        postgres.bind(("", 0))
        # NOTE(review): the socket is closed before the port is used, so a
        # small race with other processes remains — acceptable for tests.
        return {"postgres": postgres.getsockname()[1]}
 
 
 
@pytest.fixture(scope="session")
def postgres_db_params(free_host_ports):
    """Connection parameters for the test postgres database."""
    connection_params = dict(
        hostname="localhost",
        port=free_host_ports["postgres"],
        username="nwa",
        password="nwa",
        database="gso-test-db",
    )
    return connection_params
 
 
 
@pytest.fixture(scope="session")
def postgres_container_params(free_host_ports, postgres_db_params):
    """Compose service definition for the isolated postgres test container."""
    # Random suffix so concurrent sessions don't collide on container names.
    name_suffix = random.randint(1000, 10000)  # noqa S311
    user = postgres_db_params["username"]
    db_name = postgres_db_params["database"]

    health_check = {
        "interval": "2s",
        "timeout": "2s",
        "retries": 3,
        "test": [
            "CMD-SHELL",
            f"pg_isready -U {user} -d {db_name}",
        ],
    }

    yield {
        "image": "postgres:15.4",
        "container_name": f"gso-postgres-{name_suffix}",
        "ports": [f'{free_host_ports["postgres"]}:5432'],
        "environment": {
            "POSTGRES_USER": user,
            "POSTGRES_PASSWORD": postgres_db_params["password"],
            "POSTGRES_DB": db_name,
        },
        "healthcheck": health_check,
    }
 
 
 
@pytest.fixture(scope="session")
def testenv_docker_compose_params(postgres_container_params):
    """Write a throwaway compose file and expose its path plus container names."""
    services = {
        "gso-postgres-server": postgres_container_params,
    }
    compose_config = {"version": "3.1", "services": services}

    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as compose_file:
        yaml.dump(compose_config, compose_file)
        compose_file.flush()  # ensure docker compose reads the complete file

        yield {
            "compose_filename": compose_file.name,
            "containers": [service["container_name"] for service in services.values()],
        }
 
 
 
@pytest.fixture(scope="session")
def testenv_docker_containers(testenv_docker_compose_params):
    """Run the isolated compose environment for the test session, when enabled."""
    if not _use_isolated_docker_compose():
        # Isolated mode disabled: nothing to manage, just hand back control.
        yield
        return

    with run_docker_compose(
        compose_filename=testenv_docker_compose_params["compose_filename"],
        container_names=testenv_docker_compose_params["containers"],
    ):
        # wait here until the caller context finishes
        yield
 
class FakerProvider(BaseProvider):
class FakerProvider(BaseProvider):
def ipv4_network(self):
def ipv4_network(self):
@@ -116,8 +297,13 @@ def data_config_filename(configuration_data) -> str:
@@ -116,8 +297,13 @@ def data_config_filename(configuration_data) -> str:
@pytest.fixture(scope="session")
def db_uri(postgres_db_params):
    """Provide the database uri configuration to run the migration on."""
    if not _use_isolated_docker_compose():
        return os.environ.get("DATABASE_URI_TEST", "postgresql://nwa:nwa@localhost/gso-test-db")

    user = postgres_db_params["username"]
    password = postgres_db_params["password"]
    host = postgres_db_params["hostname"]
    port = postgres_db_params["port"]
    database = postgres_db_params["database"]
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"
@@ -149,12 +335,13 @@ def run_migrations(db_uri: str) -> None:
@@ -149,12 +335,13 @@ def run_migrations(db_uri: str) -> None:
@pytest.fixture(scope="session")
@pytest.fixture(scope="session")
def database(db_uri):
def database(db_uri, testenv_docker_containers):
"""Create database and run migrations and cleanup after wards.
"""Create database and run migrations and cleanup after wards.
Args:
Args:
----
----
db_uri: The database uri configuration to run the migration on.
db_uri: The database uri configuration to run the migration on.
 
testenv_docker_containers: The docker containers to run the migration on.
"""
"""
db.update(Database(db_uri))
db.update(Database(db_uri))
Loading