Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 94 additions & 22 deletions care/emr/api/viewsets/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,18 +350,42 @@ class DeviceLocationHistoryViewSet(EMRModelReadOnlyViewSet):
def get_device(self):
return get_object_or_404(Device, external_id=self.kwargs["device_external_id"])

def get_queryset(self):
def authorize_retrieve(self, model_instance):
device = self.get_device()
if not AuthorizationController.call(
"can_read_device", self.request.user, device
"can_read_device",
self.request.user,
device,
):
raise PermissionDenied("You do not have permission to access the device")
Comment thread
nandkishorr marked this conversation as resolved.
if device.id != model_instance.device_id:
raise ValidationError(
"device does not match with the device location history"
)

return (
DeviceLocationHistory.objects.filter(device=device)
.select_related("location")
.order_by("-end")
def get_queryset(self):
device = self.get_device()
queryset = (
super()
.get_queryset()
.select_related("created_by", "updated_by")
.order_by("-modified_date")
)
if self.action == "list":
if not AuthorizationController.call(
"can_read_device",
self.request.user,
device,
):
raise PermissionDenied(
"You do not have permission to access the device"
)
return (
DeviceLocationHistory.objects.filter(device=device)
.select_related("location")
.order_by("-end")
)
return queryset


class DeviceEncounterHistoryViewSet(EMRModelReadOnlyViewSet):
Expand All @@ -371,22 +395,47 @@ class DeviceEncounterHistoryViewSet(EMRModelReadOnlyViewSet):
def get_device(self):
return get_object_or_404(Device, external_id=self.kwargs["device_external_id"])

def get_queryset(self):
"""
Encounter history access is limited to everyone within the location or associated with the managing org
"""
def authorize_retrieve(self, model_instance):
device = self.get_device()
if not AuthorizationController.call(
"can_read_device",
self.request.user,
device,
):
raise PermissionDenied("You do not have permission to access the device")
Comment thread
nandkishorr marked this conversation as resolved.
return (
DeviceEncounterHistory.objects.filter(device=device)
.select_related("encounter", "encounter__patient", "encounter__facility")
.order_by("-end")
if device.id != model_instance.device_id:
raise ValidationError(
"device does not match with the device encounter history"
)

def get_queryset(self):
"""
Encounter history access is limited to everyone within the location or associated with the managing org
"""
device = self.get_device()
queryset = (
super()
.get_queryset()
.select_related("created_by", "updated_by")
.order_by("-modified_date")
)
if self.action == "list":
if not AuthorizationController.call(
"can_read_device",
self.request.user,
device,
):
raise PermissionDenied(
"You do not have permission to access the device"
)
return (
DeviceEncounterHistory.objects.filter(device=device)
.select_related(
"encounter", "encounter__patient", "encounter__facility"
)
.order_by("-end")
)
return queryset


class DeviceServiceHistoryViewSet(
Expand Down Expand Up @@ -421,6 +470,19 @@ def authorize_create(self, instance):
def authorize_update(self, request_obj, model_instance):
self.authorize_create(model_instance)

def authorize_retrieve(self, model_instance):
device = self.get_device()
if not AuthorizationController.call(
"can_read_device",
self.request.user,
device,
):
raise PermissionDenied("You do not have permission to access the device")
if device.id != model_instance.device_id:
raise ValidationError(
"device does not match with the device service history"
)
Comment on lines 470 to +484

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 security Cross-parent write operations exposed by queryset refactoring

The refactoring moved parent-scoped filtering in get_queryset to list-only, and added authorize_retrieve with a parent-mismatch check. However, authorize_update and authorize_destroy were not updated with equivalent mismatch checks. For any non-list, non-retrieve action, get_object() now resolves against the full unfiltered queryset, so a caller with can_manage_device on Device A can PATCH or delete a DeviceServiceHistory record belonging to Device B by supplying Device A's ID in the URL — authorize_update checks permission against the URL device only, never against model_instance.device.

The same gap exists in every other viewset refactored in this PR that has write operations:

  • FacilityLocationEncounterViewSet (location.py) — authorize_update/authorize_destroy delegate to authorize_create, which uses the URL location with no mismatch check.
  • NoteMessageViewSet (notes.py) — authorize_update uses the URL thread via authorize_create without checking model_instance.thread.
  • OrganizationUsersViewSet (organization.py) — authorize_update/authorize_destroy use the URL organization with no mismatch check against model_instance.organization.

Each of these needs a parent-ID mismatch guard (mirroring the authorize_retrieve pattern already added) at the top of authorize_update / authorize_destroy.


Comment thread
nandkishorr marked this conversation as resolved.
def perform_update(self, instance):
if instance.edit_history and len(instance.edit_history) >= 50: # noqa PLR2004
raise ValidationError("Cannot Edit instance anymore")
Expand All @@ -441,15 +503,25 @@ def get_queryset(self):
Encounter history access is limited to everyone within the location or associated with the managing org
"""
device = self.get_device()
if not AuthorizationController.call(
"can_read_device",
self.request.user,
device,
):
raise PermissionDenied("You do not have permission to access the device")
return DeviceServiceHistory.objects.filter(device=device).order_by(
"-serviced_on"
queryset = (
super()
.get_queryset()
.select_related("created_by", "updated_by")
.order_by("-modified_date")
)
if self.action == "list":
if not AuthorizationController.call(
"can_read_device",
self.request.user,
device,
):
raise PermissionDenied(
"You do not have permission to access the device"
)
return DeviceServiceHistory.objects.filter(device=device).order_by(
"-serviced_on"
)
return queryset


def disassociate_device_from_encounter(instance):
Expand Down
27 changes: 17 additions & 10 deletions care/emr/api/viewsets/inventory/inventory_item.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from django.db.models import Q
from django_filters import rest_framework as filters
from rest_framework.exceptions import PermissionDenied
from rest_framework.exceptions import PermissionDenied, ValidationError
from rest_framework.filters import OrderingFilter

from care.emr.api.viewsets.base import EMRBaseViewSet, EMRListMixin, EMRRetrieveMixin
Expand Down Expand Up @@ -46,19 +46,26 @@ def authorize_location_read(self, location):
raise PermissionDenied("You do not have permission to read inventory items")

def authorize_retrieve(self, model_instance):
location = self.get_location_obj()
self.authorize_location_read(model_instance.location)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Authorize the URL location, not the item's current location.

Line 50 checks permissions against model_instance.location before confirming the item belongs to the URL-scoped location. That lets a mismatched request skip the requested parent location’s inventory-read check, which is rather not the point of this nested-route guard.

Proposed fix
     def authorize_retrieve(self, model_instance):
         location = self.get_location_obj()
-        self.authorize_location_read(model_instance.location)
+        self.authorize_location_read(location)
         if location.id != model_instance.location.id:
             raise ValidationError(
                 "Inventory item does not belong to the specified location"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self.authorize_location_read(model_instance.location)
def authorize_retrieve(self, model_instance):
location = self.get_location_obj()
self.authorize_location_read(location)
if location.id != model_instance.location.id:
raise ValidationError(
"Inventory item does not belong to the specified location"
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@care/emr/api/viewsets/inventory/inventory_item.py` at line 50, The
authorization check on line 50 in the retrieve method is using the item's
current location (model_instance.location) instead of the location from the URL
route parameters. This allows requests for items in different locations to
bypass the intended location-based access control. Replace
model_instance.location with the location identifier from the URL parameters
(typically available through self.kwargs) when calling
self.authorize_location_read so that the permission check validates against the
requested parent location from the nested route, not the item's actual location.

if location.id != model_instance.location.id:
raise ValidationError(
"Inventory item does not belong to the specified location"
)
Comment thread
nandkishorr marked this conversation as resolved.

def get_queryset(self):
queryset = super().get_queryset()
location = self.get_location_obj()
self.authorize_location_read(location)
include_children = (
self.request.GET.get("include_children", "false").lower() == "true"
)
if include_children:
queryset = queryset.filter(
Q(location__parent_cache__overlap=[location.id]) | Q(location=location)
if self.action == "list":
self.authorize_location_read(location)
include_children = (
self.request.GET.get("include_children", "false").lower() == "true"
)
else:
queryset = queryset.filter(location=location)
if include_children:
queryset = queryset.filter(
Q(location__parent_cache__overlap=[location.id])
| Q(location=location)
)
else:
queryset = queryset.filter(location=location)
return queryset
29 changes: 23 additions & 6 deletions care/emr/api/viewsets/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,16 @@ def authorize_update(self, request_obj, model_instance):
def authorize_destroy(self, instance):
return self.authorize_create(instance)

def authorize_retrieve(self, model_instance):
location = self.get_location_obj()
facility = self.get_facility_obj()
if not AuthorizationController.call(
"can_list_facility_location_obj", self.request.user, facility, location
):
raise PermissionDenied("You do not have permission to given location")
if location.id != model_instance.location.id:
raise ValidationError("Bed does not belong to the specified location")

Comment thread
nandkishorr marked this conversation as resolved.
def reset_encounter_location_association(self, location):
"""
Reset encounters to the right location.
Expand Down Expand Up @@ -488,14 +498,21 @@ def _validate_data(self, instance, model_obj=None): # noqa PLR0912
def get_queryset(self):
location = self.get_location_obj()
facility = self.get_facility_obj()
if not AuthorizationController.call(
"can_list_facility_location_obj", self.request.user, facility, location
):
raise PermissionDenied("You do not have permission to given location")
return FacilityLocationEncounter.objects.filter(location=location).order_by(
"-created_date"
queryset = (
super()
.get_queryset()
.select_related("created_by", "updated_by")
.order_by("-modified_date")
)

if self.action == "list":
if not AuthorizationController.call(
"can_list_facility_location_obj", self.request.user, facility, location
):
raise PermissionDenied("You do not have permission to given location")
return queryset.filter(location=location).order_by("-created_date")
return queryset
Comment thread
nandkishorr marked this conversation as resolved.


def close_related_location_from_encounter(instance):
if instance.status in COMPLETED_CHOICES:
Expand Down
27 changes: 18 additions & 9 deletions care/emr/api/viewsets/notes.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,13 @@ def get_patient_obj(self):
Patient, external_id=self.kwargs["patient_external_id"]
)

def perform_create(self, instance):
instance.thread = get_object_or_404(
def get_thread_obj(self):
return get_object_or_404(
NoteThread, external_id=self.kwargs["thread_external_id"]
)
Comment thread
nandkishorr marked this conversation as resolved.

def perform_create(self, instance):
instance.thread = self.get_thread_obj()
if encounter_id := self.request.data.get("encounter"):
encounter = get_object_or_404(Encounter, external_id=encounter_id)
if encounter.patient != instance.thread.patient:
Expand All @@ -138,9 +141,7 @@ def authorize_update(self, request_obj, model_instance):
self.authorize_create({})

def authorize_create(self, instance):
thread = get_object_or_404(
NoteThread, external_id=self.kwargs["thread_external_id"]
)
thread = self.get_thread_obj()
if thread.encounter:
allowed = AuthorizationController.call(
"can_update_encounter_clinical_data",
Expand All @@ -154,6 +155,11 @@ def authorize_create(self, instance):
if not allowed:
raise PermissionDenied("You do not have permission for this action")

def authorize_retrieve(self, model_instance):
thread = self.get_thread_obj()
if model_instance.thread != thread:
raise ValidationError("Message does not belong to the thread")

def get_queryset(self):
if not AuthorizationController.call(
"can_view_clinical_data", self.request.user, self.get_patient_obj()
Expand All @@ -166,10 +172,13 @@ def get_queryset(self):
raise PermissionDenied("Permission denied to user")
else:
raise PermissionDenied("Permission denied to user")

return (
thread = self.get_thread_obj()
queryset = (
super()
.get_queryset()
.filter(thread__external_id=self.kwargs["thread_external_id"])
.order_by("-created_date")
.select_related("created_by", "updated_by")
.order_by("-modified_date")
)
if self.action == "list":
return queryset.filter(thread=thread).order_by("-created_date")
return queryset
31 changes: 25 additions & 6 deletions care/emr/api/viewsets/organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,15 +449,34 @@ def authorize_create(self, instance):
):
raise PermissionDenied("User does not have permission for this action")

def get_queryset(self):
"""
Only users part of the organization can access its users
"""
def authorize_retrieve(self, model_instance):
organization = self.get_organization_obj()
if not AuthorizationController.call(
Comment thread
nandkishorr marked this conversation as resolved.
"can_list_organization_users_obj", self.request.user, organization
):
raise PermissionDenied(
"User does not have the required permissions to list users"
"User does not have the required permission to read user"
)
return OrganizationUser.objects.filter(organization=organization)
if model_instance.organization != organization:
raise ValidationError("User does not belong to the organization")

def get_queryset(self):
"""
Only users part of the organization can access its users
"""
organization = self.get_organization_obj()
queryset = (
super()
.get_queryset()
.select_related("created_by", "updated_by")
.order_by("-modified_date")
)
if self.action == "list":
if not AuthorizationController.call(
"can_list_organization_users_obj", self.request.user, organization
):
raise PermissionDenied(
"User does not have the required permission to list users"
)
return queryset.filter(organization=organization)
return queryset
Comment thread
nandkishorr marked this conversation as resolved.
2 changes: 2 additions & 0 deletions care/emr/api/viewsets/scheduling/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ def authorize_retrieve(self, model_instance):
self.request.user,
):
raise PermissionDenied("You do not have permission to read token")
if queue != model_instance.queue:
raise ValidationError("Token does not belong to the specified queue")

def get_queryset(self):
_, queue = self.get_queue_obj()
Expand Down
Loading
Loading