import pytest
import json
from .models import Timeseries
from toardb.stationmeta.models import StationmetaCore
from toardb.stationmeta.schemas import get_geom_from_coordinates, Coordinates
from toardb.variables.models import Variable
from toardb.contacts.models import Person, Organisation
from toardb.auth_user.models import AuthUser
# Required imports 'create_test_database'
from .test_base import (
    client,
    get_test_db,
    create_test_database,
    url,
    test_db_session as db,
)
from .test_base import _db_conn


# Sequences that have to be restarted before every test, in the order in
# which they are reset (id_seq will not be reset automatically between tests!).
_ID_SEQUENCES = (
    "auth_user_id_seq",
    "variables_id_seq",
    "stationmeta_core_id_seq",
    "stationmeta_global_id_seq",
    "stationmeta_global_services_id_seq",
    "stationmeta_roles_id_seq",
    "stationmeta_annotations_id_seq",
    "stationmeta_aux_docs_id_seq",
    "stationmeta_aux_images_id_seq",
    "stationmeta_aux_urls_id_seq",
    "persons_id_seq",
    "organisations_id_seq",
    "timeseries_annotations_id_seq",
    "timeseries_roles_id_seq",
    "timeseries_id_seq",
)


def _reset_id_sequences():
    """Restart every sequence listed in ``_ID_SEQUENCES`` at 1.

    Each statement is committed on its own. The cursor and the raw
    connection are closed afterwards, so no connection obtained from
    ``_db_conn`` is left open after a test.
    """
    raw_conn = _db_conn.raw_connection()
    try:
        cursor = raw_conn.cursor()
        try:
            for sequence in _ID_SEQUENCES:
                # Sequence names are the hard-coded constants above.
                cursor.execute(
                    "ALTER SEQUENCE {} RESTART WITH 1".format(sequence)
                )
                raw_conn.commit()
        finally:
            cursor.close()
    finally:
        raw_conn.close()


def _prepare_stationmeta_core(station):
    """Adapt a freshly built ``StationmetaCore`` before it is stored."""
    # there's a mismatch with coordinates
    # --> how to automatically switch back and forth?!
    station.coordinates = get_geom_from_coordinates(
        Coordinates(**station.coordinates)
    )
    # there's also a mismatch with additional_metadata
    # --> BUT: this should not be switched back!
    # in upload command, we have now: "additional_metadata": "{}"
    # but return from this method gives: "additional_metadata": {}
    # ==> there is a mismatch between model(JSONB) and schema(JSON)
    station.additional_metadata = str(station.additional_metadata)


def _load_fixture(db, infilename, model, prepare=None):
    """Insert every entry of a JSON fixture file as a ``model`` row.

    Args:
        db: session used to add, commit and refresh each new object.
        infilename: path of the JSON fixture file (a list of dicts).
        model: class instantiated as ``model(**entry)`` for each entry.
        prepare: optional callable applied to each new object before it
            is added to the session.
    """
    # JSON fixtures are read as UTF-8 instead of the locale's default.
    with open(infilename, encoding="utf-8") as f:
        metajson = json.load(f)
    for entry in metajson:
        new_object = model(**entry)
        if prepare is not None:
            prepare(new_object)
        # One commit per row, then refresh the stored object.
        db.add(new_object)
        db.commit()
        db.refresh(new_object)


# Fixture files in load order: (path, model class, optional prepare hook).
_FIXTURES = (
    ("toardb/auth_user/fixtures/auth.json", AuthUser, None),
    ("toardb/contacts/fixtures/persons.json", Person, None),
    ("toardb/contacts/fixtures/organisations.json", Organisation, None),
    ("toardb/variables/fixtures/variables.json", Variable, None),
    (
        "toardb/stationmeta/fixtures/stationmeta_core.json",
        StationmetaCore,
        _prepare_stationmeta_core,
    ),
    ("toardb/timeseries/fixtures/timeseries.json", Timeseries, None),
)


class TestApps:
    """Tests for the ``/timeseries/`` endpoints."""

    def setup_method(self):
        """Store the base URL of the application under test."""
        self.application_url = "/timeseries/"

    # Nose-style ``setup`` is no longer run by pytest >= 8; the name is
    # kept as an alias so older runners and direct callers still work.
    setup = setup_method

    @pytest.fixture(autouse=True)
    def setup_db_data(self, db):
        """Set up all the data before each test

        If you want the setup only once (per test module),
        the scope argument is not working in the expected way, as discussed here:
        https://stackoverflow.com/questions/45817153/py-test-fixture-use-function-fixture-in-scope-fixture
        """
        # id_seq will not be reset automatically between tests!
        _reset_id_sequences()
        for infilename, model, prepare in _FIXTURES:
            _load_fixture(db, infilename, model, prepare)

    def test_get_timeseries(self, client, db):
        """GET /timeseries/ returns the single timeseries from the fixture."""
        response = client.get("/timeseries/")
        expected_status_code = 200
        assert response.status_code == expected_status_code
        expected_resp = [
            {
                'id': 1,
                'label': 'CMA',
                'order': 1,
                'access_rights': 0,
                'sampling_frequency': 0,
                'aggregation': 0,
                'data_start_date': '2003-09-07T15:30:00+02:00',
                'data_end_date': '2016-12-31T14:30:00+01:00',
                'measurement_method': 'UV absorption',
                'sampling_height': 7.0,
                'date_added': '2020-05-15T15:30:00+02:00',
                'date_modified': '2020-05-16T09:30:00+02:00',
                'station_id': 2,
                'variable_id': 7,
                'additional_metadata': {},
            }
        ]
        assert response.json() == expected_resp

    # def test_get_all(self):
    #     response = client.get("/timeseries_nested/")
    #     expected_status_code = 200
    #     assert response.status_code == expected_status_code
    #     expected_resp = ...
    #     assert response.json() == expected_resp

    def test_get_special(self, client, db):
        """GET /timeseries/1 returns the timeseries with id 1."""
        response = client.get("/timeseries/1")
        expected_status_code = 200
        assert response.status_code == expected_status_code
        expected_resp = {
            'id': 1,
            'label': 'CMA',
            'order': 1,
            'access_rights': 0,
            'sampling_frequency': 0,
            'aggregation': 0,
            'data_start_date': '2003-09-07T15:30:00+02:00',
            'data_end_date': '2016-12-31T14:30:00+01:00',
            'measurement_method': 'UV absorption',
            'sampling_height': 7.0,
            'date_added': '2020-05-15T15:30:00+02:00',
            'date_modified': '2020-05-16T09:30:00+02:00',
            'station_id': 2,
            'variable_id': 7,
            'additional_metadata': {},
        }
        assert response.json() == expected_resp

    # def test_insert_new_without_credits(self):
    #?    response = client.post("/timeseries/")
    #     expected_status_code=401
    #     assert response.status_code == expected_status_code
    #?    expected_resp = ...
    #     assert response.json() == expected_resp

    # def test_insert_new_wrong_credits(self):
    #?    response = client.post("/timeseries/")
    #     expected_status_code = 401
    #     assert response.status_code == expected_status_code
    #?    expected_resp = ...
    #     assert response.json() == expected_resp

    def test_insert_new(self, client, db):
        """POST /timeseries/ stores a new timeseries and returns it with id 2."""
        response = client.post(
            "/timeseries/",
            json={
                "timeseries": {
                    "label": "CMA2",
                    "order": 1,
                    "access_rights": 0,
                    "sampling_frequency": 0,
                    "aggregation": 0,
                    "data_start_date": "2003-09-07T15:30:00+02:00",
                    "data_end_date": "2016-12-31T14:30:00+01:00",
                    "measurement_method": "UV absorption",
                    "sampling_height": 7.0,
                    "date_added": "2020-05-15T15:30:00+02:00",
                    "date_modified": "2020-05-16T09:30:00+02:00",
                    "station_id": 2,
                    "variable_id": 7,
                    "additional_metadata": "{}",
                }
            },
        )
        expected_status_code = 200
        assert response.status_code == expected_status_code
        expected_resp = {
            'id': 2,
            'label': 'CMA2',
            'order': 1,
            'access_rights': 0,
            'sampling_frequency': 0,
            'aggregation': 0,
            'data_start_date': '2003-09-07T15:30:00+02:00',
            'data_end_date': '2016-12-31T14:30:00+01:00',
            'measurement_method': 'UV absorption',
            'sampling_height': 7.0,
            'date_added': '2020-05-15T15:30:00+02:00',
            'date_modified': '2020-05-16T09:30:00+02:00',
            'station_id': 2,
            'variable_id': 7,
            'additional_metadata': {},
        }
        assert response.json() == expected_resp

##    def test_insert_duplicate(self, client, db):
##        response = client.post("/timeseries/",
##                json={"timeseries":
##                          {"id": 4,
##                           "additional_metadata":"{}"}
##                     }
##                   )
##        expected_status_code = 400
##        assert response.status_code == expected_status_code
##        expected_resp = {'detail': 'Timeseries already registered.'}
##        assert response.json() == expected_resp