diff --git a/care/emr/api/viewsets/device.py b/care/emr/api/viewsets/device.py index 0caecad727..356f1391b6 100644 --- a/care/emr/api/viewsets/device.py +++ b/care/emr/api/viewsets/device.py @@ -350,18 +350,42 @@ class DeviceLocationHistoryViewSet(EMRModelReadOnlyViewSet): def get_device(self): return get_object_or_404(Device, external_id=self.kwargs["device_external_id"]) - def get_queryset(self): + def authorize_retrieve(self, model_instance): device = self.get_device() if not AuthorizationController.call( - "can_read_device", self.request.user, device + "can_read_device", + self.request.user, + device, ): raise PermissionDenied("You do not have permission to access the device") + if device.id != model_instance.device_id: + raise ValidationError( + "device does not match with the device location history" + ) - return ( - DeviceLocationHistory.objects.filter(device=device) - .select_related("location") - .order_by("-end") + def get_queryset(self): + device = self.get_device() + queryset = ( + super() + .get_queryset() + .select_related("created_by", "updated_by") + .order_by("-modified_date") ) + if self.action == "list": + if not AuthorizationController.call( + "can_read_device", + self.request.user, + device, + ): + raise PermissionDenied( + "You do not have permission to access the device" + ) + return ( + DeviceLocationHistory.objects.filter(device=device) + .select_related("location") + .order_by("-end") + ) + return queryset class DeviceEncounterHistoryViewSet(EMRModelReadOnlyViewSet): @@ -371,10 +395,7 @@ class DeviceEncounterHistoryViewSet(EMRModelReadOnlyViewSet): def get_device(self): return get_object_or_404(Device, external_id=self.kwargs["device_external_id"]) - def get_queryset(self): - """ - Encounter history access is limited to everyone within the location or associated with the managing org - """ + def authorize_retrieve(self, model_instance): device = self.get_device() if not AuthorizationController.call( "can_read_device", @@ -382,11 +403,39 @@ def get_queryset(self): device, ): raise PermissionDenied("You do not have permission to access the device") - return ( - DeviceEncounterHistory.objects.filter(device=device) - .select_related("encounter", "encounter__patient", "encounter__facility") - .order_by("-end") + if device.id != model_instance.device_id: + raise ValidationError( + "device does not match with the device encounter history" + ) + + def get_queryset(self): + """ + Encounter history access is limited to everyone within the location or associated with the managing org + """ + device = self.get_device() + queryset = ( + super() + .get_queryset() + .select_related("created_by", "updated_by") + .order_by("-modified_date") ) + if self.action == "list": + if not AuthorizationController.call( + "can_read_device", + self.request.user, + device, + ): + raise PermissionDenied( + "You do not have permission to access the device" + ) + return ( + DeviceEncounterHistory.objects.filter(device=device) + .select_related( + "encounter", "encounter__patient", "encounter__facility" + ) + .order_by("-end") + ) + return queryset class DeviceServiceHistoryViewSet( @@ -421,6 +470,19 @@ def authorize_create(self, instance): def authorize_update(self, request_obj, model_instance): self.authorize_create(model_instance) + def authorize_retrieve(self, model_instance): + device = self.get_device() + if not AuthorizationController.call( + "can_read_device", + self.request.user, + device, + ): + raise PermissionDenied("You do not have permission to access the device") + if device.id != model_instance.device_id: + raise ValidationError( + "device does not match with the device service history" + ) + def perform_update(self, instance): if instance.edit_history and len(instance.edit_history) >= 50: # noqa PLR2004 raise ValidationError("Cannot Edit instance anymore") @@ -441,15 +503,25 @@ def get_queryset(self): Encounter history access is limited to everyone within the location or associated with the managing org """ device = self.get_device() - if not AuthorizationController.call( - "can_read_device", - self.request.user, - device, - ): - raise PermissionDenied("You do not have permission to access the device") - return DeviceServiceHistory.objects.filter(device=device).order_by( - "-serviced_on" + queryset = ( + super() + .get_queryset() + .select_related("created_by", "updated_by") + .order_by("-modified_date") ) + if self.action == "list": + if not AuthorizationController.call( + "can_read_device", + self.request.user, + device, + ): + raise PermissionDenied( + "You do not have permission to access the device" + ) + return DeviceServiceHistory.objects.filter(device=device).order_by( + "-serviced_on" + ) + return queryset def disassociate_device_from_encounter(instance): diff --git a/care/emr/api/viewsets/inventory/inventory_item.py b/care/emr/api/viewsets/inventory/inventory_item.py index c210ca14cf..5c2198c706 100644 --- a/care/emr/api/viewsets/inventory/inventory_item.py +++ b/care/emr/api/viewsets/inventory/inventory_item.py @@ -1,6 +1,6 @@ from django.db.models import Q from django_filters import rest_framework as filters -from rest_framework.exceptions import PermissionDenied +from rest_framework.exceptions import PermissionDenied, ValidationError from rest_framework.filters import OrderingFilter from care.emr.api.viewsets.base import EMRBaseViewSet, EMRListMixin, EMRRetrieveMixin @@ -46,19 +46,26 @@ def authorize_location_read(self, location): raise PermissionDenied("You do not have permission to read inventory items") def authorize_retrieve(self, model_instance): + location = self.get_location_obj() self.authorize_location_read(model_instance.location) + if location.id != model_instance.location.id: + raise ValidationError( + "Inventory item does not belong to the specified location" + ) def get_queryset(self): queryset = super().get_queryset() location = self.get_location_obj() - self.authorize_location_read(location) - include_children = ( - self.request.GET.get("include_children", "false").lower() == "true" - ) - if include_children: - queryset = queryset.filter( - Q(location__parent_cache__overlap=[location.id]) | Q(location=location) + if self.action == "list": + self.authorize_location_read(location) + include_children = ( + self.request.GET.get("include_children", "false").lower() == "true" ) - else: - queryset = queryset.filter(location=location) + if include_children: + queryset = queryset.filter( + Q(location__parent_cache__overlap=[location.id]) + | Q(location=location) + ) + else: + queryset = queryset.filter(location=location) return queryset diff --git a/care/emr/api/viewsets/location.py b/care/emr/api/viewsets/location.py index a0e6993785..8af12b3412 100644 --- a/care/emr/api/viewsets/location.py +++ b/care/emr/api/viewsets/location.py @@ -303,6 +303,16 @@ def authorize_update(self, request_obj, model_instance): def authorize_destroy(self, instance): return self.authorize_create(instance) + def authorize_retrieve(self, model_instance): + location = self.get_location_obj() + facility = self.get_facility_obj() + if not AuthorizationController.call( + "can_list_facility_location_obj", self.request.user, facility, location + ): + raise PermissionDenied("You do not have permission to given location") + if location.id != model_instance.location.id: + raise ValidationError("Bed does not belong to the specified location") + def reset_encounter_location_association(self, location): """ Reset encounters to the right location. @@ -488,14 +498,21 @@ def _validate_data(self, instance, model_obj=None): # noqa PLR0912 def get_queryset(self): location = self.get_location_obj() facility = self.get_facility_obj() - if not AuthorizationController.call( - "can_list_facility_location_obj", self.request.user, facility, location - ): - raise PermissionDenied("You do not have permission to given location") - return FacilityLocationEncounter.objects.filter(location=location).order_by( - "-created_date" + queryset = ( + super() + .get_queryset() + .select_related("created_by", "updated_by") + .order_by("-modified_date") ) + if self.action == "list": + if not AuthorizationController.call( + "can_list_facility_location_obj", self.request.user, facility, location + ): + raise PermissionDenied("You do not have permission to given location") + return queryset.filter(location=location).order_by("-created_date") + return queryset + def close_related_location_from_encounter(instance): if instance.status in COMPLETED_CHOICES: diff --git a/care/emr/api/viewsets/notes.py b/care/emr/api/viewsets/notes.py index 676bbebc30..35958d877e 100644 --- a/care/emr/api/viewsets/notes.py +++ b/care/emr/api/viewsets/notes.py @@ -120,10 +120,13 @@ def get_patient_obj(self): Patient, external_id=self.kwargs["patient_external_id"] ) - def perform_create(self, instance): - instance.thread = get_object_or_404( + def get_thread_obj(self): + return get_object_or_404( NoteThread, external_id=self.kwargs["thread_external_id"] ) + + def perform_create(self, instance): + instance.thread = self.get_thread_obj() if encounter_id := self.request.data.get("encounter"): encounter = get_object_or_404(Encounter, external_id=encounter_id) if encounter.patient != instance.thread.patient: @@ -138,9 +141,7 @@ def authorize_update(self, request_obj, model_instance): self.authorize_create({}) def authorize_create(self, instance): - thread = get_object_or_404( - NoteThread, external_id=self.kwargs["thread_external_id"] - ) + thread = self.get_thread_obj() if thread.encounter: allowed = AuthorizationController.call( "can_update_encounter_clinical_data", @@ -154,6 +155,11 @@ def authorize_create(self, instance): if not allowed: raise PermissionDenied("You do not have permission for this action") + def authorize_retrieve(self, model_instance): + thread = self.get_thread_obj() + if model_instance.thread != thread: + raise ValidationError("Message does not belong to the thread") + def get_queryset(self): if not AuthorizationController.call( "can_view_clinical_data", self.request.user, self.get_patient_obj() @@ -166,10 +172,13 @@ def get_queryset(self): raise PermissionDenied("Permission denied to user") else: raise PermissionDenied("Permission denied to user") - - return ( + thread = self.get_thread_obj() + queryset = ( super() .get_queryset() - .filter(thread__external_id=self.kwargs["thread_external_id"]) - .order_by("-created_date") + .select_related("created_by", "updated_by") + .order_by("-modified_date") ) + if self.action == "list": + return queryset.filter(thread=thread).order_by("-created_date") + return queryset diff --git a/care/emr/api/viewsets/organization.py b/care/emr/api/viewsets/organization.py index a8edfb227d..f54ef52b8c 100644 --- a/care/emr/api/viewsets/organization.py +++ b/care/emr/api/viewsets/organization.py @@ -449,15 +449,34 @@ def authorize_create(self, instance): ): raise PermissionDenied("User does not have permission for this action") - def get_queryset(self): - """ - Only users part of the organization can access its users - """ + def authorize_retrieve(self, model_instance): organization = self.get_organization_obj() if not AuthorizationController.call( "can_list_organization_users_obj", self.request.user, organization ): raise PermissionDenied( - "User does not have the required permissions to list users" + "User does not have the required permission to read user" ) - return OrganizationUser.objects.filter(organization=organization) + if model_instance.organization != organization: + raise ValidationError("User does not belong to the organization") + + def get_queryset(self): + """ + Only users part of the organization can access its users + """ + organization = self.get_organization_obj() + queryset = ( + super() + .get_queryset() + .select_related("created_by", "updated_by") + .order_by("-modified_date") + ) + if self.action == "list": + if not AuthorizationController.call( + "can_list_organization_users_obj", self.request.user, organization + ): + raise PermissionDenied( + "User does not have the required permission to list users" + ) + return queryset.filter(organization=organization) + return queryset diff --git a/care/emr/api/viewsets/scheduling/token.py b/care/emr/api/viewsets/scheduling/token.py index d2c5e5f8e2..2d6cf9508e 100644 --- a/care/emr/api/viewsets/scheduling/token.py +++ b/care/emr/api/viewsets/scheduling/token.py @@ -153,6 +153,8 @@ def authorize_retrieve(self, model_instance): self.request.user, ): raise PermissionDenied("You do not have permission to read token") + if queue != model_instance.queue: + raise ValidationError("Token does not belong to the specified queue") def get_queryset(self): _, queue = self.get_queue_obj() diff --git a/care/emr/tests/test_device_api.py b/care/emr/tests/test_device_api.py index 8bd3a0c148..376d4ac0e9 100644 --- a/care/emr/tests/test_device_api.py +++ b/care/emr/tests/test_device_api.py @@ -709,6 +709,25 @@ def test_list_device_with_location(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.json()["count"], 1) + def test_retrieve_device_location_history_with_mismatched_device(self): + history = self.associate_location_with_device(self.device, self.location) + other_device = self.create_device() + url = reverse( + "device_location_history-detail", + kwargs={ + "facility_external_id": self.facility.external_id, + "device_external_id": other_device["id"], + "external_id": history["id"], + }, + ) + self.add_permissions([DevicePermissions.can_list_devices.name]) + response = self.client.get(url) + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.json()["errors"][0]["msg"], + "device does not match with the device location history", + ) + class TestDeviceEncounterHistoryViewSet(DeviceBaseTest): def setUp(self): @@ -769,6 +788,30 @@ def test_retrieve_device_encounter_history(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.json()["id"], history["id"]) + def test_retrieve_device_encounter_history_with_mismatched_device(self): + history = self.associate_encounter_with_device(self.device, self.encounter) + other_device = self.create_device() + url = reverse( + "device_encounter_history-detail", + kwargs={ + "facility_external_id": self.facility.external_id, + "device_external_id": other_device["id"], + "external_id": history["id"], + }, + ) + self.add_permissions( + [ + DevicePermissions.can_list_devices.name, + EncounterPermissions.can_list_encounter.name, + ] + ) + response = self.client.get(url) + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.json()["errors"][0]["msg"], + "device does not match with the device encounter history", + ) + class TestDeviceServiceHistoryViewSet(DeviceBaseTest): def setUp(self): @@ -887,3 +930,22 @@ def test_delete_device_service_history(self): ) response = self.client.delete(url) self.assertEqual(response.status_code, 405) # delete doesn't exist + + def test_retrieve_device_service_history_with_mismatched_device(self): + history = self.create_device_service_history(self.device) + other_device = self.create_device() + url = reverse( + "device_service_history-detail", + kwargs={ + "facility_external_id": self.facility.external_id, + "device_external_id": other_device["id"], + "external_id": history["id"], + }, + ) + self.add_permissions([DevicePermissions.can_list_devices.name]) + response = self.client.get(url) + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.json()["errors"][0]["msg"], + "device does not match with the device service history", + ) diff --git a/care/emr/tests/test_location_api.py b/care/emr/tests/test_location_api.py index a3f8cb218f..e7720a677c 100644 --- a/care/emr/tests/test_location_api.py +++ b/care/emr/tests/test_location_api.py @@ -984,6 +984,27 @@ def test_retrieve_with_permissions(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.json()["id"], facility_location_encounter["id"]) + def test_retrieve_facility_location_encounter_with_wrong_location(self): + another_location = self.create_facility_location() + facility_location_encounter = self.create_facility_location_encounter( + self.encounter + ) + self.client.force_authenticate(self.super_user) + url = reverse( + "association-detail", + kwargs={ + "facility_external_id": self.facility.external_id, + "location_external_id": another_location["id"], + "external_id": facility_location_encounter["id"], + }, + ) + response = self.client.get(url) + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.json()["errors"][0]["msg"], + "Bed does not belong to the specified location", + ) + # DELETE TESTS def test_delete_without_permission(self): facility_location_encounter = self.create_facility_location_encounter( diff --git a/care/emr/tests/test_notes_api.py b/care/emr/tests/test_notes_api.py index c547985acb..dcfe67d1d6 100644 --- a/care/emr/tests/test_notes_api.py +++ b/care/emr/tests/test_notes_api.py @@ -2,6 +2,11 @@ from model_bakery import baker from care.emr.models.notes import NoteMessage, NoteThread +from care.emr.signals.patient.facility_name_identifier import ( + FacilityPatientNameIdentifierConfig, +) +from care.emr.signals.patient.name_identifier import NameIdentifierConfig +from care.emr.signals.patient.phone_number_identifier import PhoneNumberIdentifierConfig from care.security.permissions.encounter import EncounterPermissions from care.security.permissions.patient import PatientPermissions from care.utils.tests.base import CareAPITestBase @@ -191,7 +196,11 @@ def test_update_thread_without_permission(self): class NoteMessageApiTestCase(CareAPITestBase): def setUp(self): super().setUp() + NameIdentifierConfig.CACHED_CONFIG = {} + PhoneNumberIdentifierConfig.CACHED_CONFIG = {} + FacilityPatientNameIdentifierConfig.CACHED_CONFIG = {} self.user = self.create_user() + self.superuser = self.create_super_user() self.facility = self.create_facility(user=self.user) self.facility_organization = self.create_facility_organization( facility=self.facility @@ -278,6 +287,24 @@ def test_list_notes_on_encounter(self): self.assertEqual(response.status_code, 200, response.data) self.assertContains(response, note.message, status_code=200) + def test_get_note_details_with_invalid_thread(self): + self.client.force_authenticate(user=self.superuser) + thread = self._create_thread() + note = self._create_note(thread) + url = reverse( + "note-detail", + kwargs={ + "patient_external_id": self.patient.external_id, + "thread_external_id": self._create_thread().external_id, + "external_id": note.external_id, + }, + ) + response = self.client.get(url, format="json") + self.assertEqual(response.status_code, 400, response.data) + self.assertContains( + response, "Message does not belong to the thread", status_code=400 + ) + def test_create_note_on_encounter_with_permission(self): role = self.create_role_with_permissions( permissions=[EncounterPermissions.can_write_encounter_clinical_data.name] diff --git a/care/emr/tests/test_organization_api.py b/care/emr/tests/test_organization_api.py index 95841edfa8..e4a1381188 100644 --- a/care/emr/tests/test_organization_api.py +++ b/care/emr/tests/test_organization_api.py @@ -812,10 +812,28 @@ def test_get_users_in_organization_as_user_without_permission(self): self.assertEqual(response.status_code, 403) self.assertContains( response, - "User does not have the required permissions to list users", + "User does not have the required permission to read user", status_code=403, ) + def test_get_users_in_organization_with_invalid_organization(self): + """Test that getting users in an invalid organization returns a 400.""" + self.client.force_authenticate(user=self.super_user) + another_org = self.create_organization( + user=self.super_user, name="Another Organization", org_type="govt" + ) + org_user = self.attach_role_organization_user( + self.root_organization, self.user, self.administrator_role + ) + response = self.client.get( + self.get_detail_url(another_org.external_id, org_user.external_id) + ) + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.data["errors"][0]["msg"], + "User does not belong to the organization", + ) + # getting User List in Organization def test_list_users_in_organization_as_super_user(self): @@ -863,7 +881,7 @@ def test_list_users_in_organization_as_user_without_permission(self): self.assertEqual(response.status_code, 403) self.assertContains( response, - "User does not have the required permissions to list users", + "User does not have the required permission to list users", status_code=403, ) @@ -968,7 +986,7 @@ def test_update_user_in_organization_as_user_without_permission(self): self.assertEqual(response.status_code, 403) self.assertContains( response, - "User does not have the required permissions to list users", + "User does not have permission for this action", status_code=403, ) diff --git a/care/emr/tests/test_token_api.py b/care/emr/tests/test_token_api.py index 66d4661fb4..c3811c2b12 100644 --- a/care/emr/tests/test_token_api.py +++ b/care/emr/tests/test_token_api.py @@ -629,6 +629,34 @@ def test_retrieve_token_as_user_without_permission(self): "You do not have permission to read token", response.data["detail"] ) + def test_retrieve_token_with_invalid_queue_external_id(self): + """Test retrieving a token with invalid queue external id.""" + self.client.force_authenticate(user=self.superuser) + token = self.create_token( + patient=self.patient, + category=self.token_category, + queue=self.token_queue, + facility=self.facility, + status=TokenStatusOptions.CREATED, + ) + another_token_queue = self.create_queue( + facility=self.facility, + resource=self.schedule_resource, + date=timezone.now().date(), + ) + response = self.client.get( + self.generate_detail_url( + str(self.facility.external_id), + str(another_token_queue.external_id), + str(token.external_id), + ) + ) + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.data["errors"][0]["msg"], + "Token does not belong to the specified queue", + ) + # Tests for Token Listing def test_list_tokens_as_superuser(self):