From c18db8858cf834cf937e9e47ef7d053862dba594 Mon Sep 17 00:00:00 2001 From: Jacob Jeevan Date: Mon, 29 Dec 2025 17:10:47 +0530 Subject: [PATCH 1/2] added load scripts for definitions --- .../commands/load_activity_definition.py | 559 ++++++++++++++++++ .../commands/load_charge_item_definition.py | 374 ++++++++++++ .../emr/management/commands/load_emr_utils.py | 281 +++++++++ .../commands/load_observation_definition.py | 409 +++++++++++++ .../commands/load_specimen_definition.py | 352 +++++++++++ 5 files changed, 1975 insertions(+) create mode 100644 care/emr/management/commands/load_activity_definition.py create mode 100644 care/emr/management/commands/load_charge_item_definition.py create mode 100644 care/emr/management/commands/load_emr_utils.py create mode 100644 care/emr/management/commands/load_observation_definition.py create mode 100644 care/emr/management/commands/load_specimen_definition.py diff --git a/care/emr/management/commands/load_activity_definition.py b/care/emr/management/commands/load_activity_definition.py new file mode 100644 index 0000000000..9ccac650f5 --- /dev/null +++ b/care/emr/management/commands/load_activity_definition.py @@ -0,0 +1,559 @@ +""" +Management command to load Activity Definitions from CSV/Google Sheets. + +This command loads dependencies first (specimens, observations, charge items) +before creating activity definitions. 
+ +Usage: + python manage.py load_activity_definition --facility + python manage.py load_activity_definition --google-sheet --sheet-name --facility +""" + +import logging +from datetime import UTC, datetime +from pathlib import Path + +from django.core.management import call_command +from django.core.management.base import BaseCommand + +from care.emr.management.commands.load_emr_utils import ( + create_slug, + ensure_category, + normalize_title, + parse_code, + read_csv_from_file, + read_csv_from_google_sheet, + read_csv_from_url, + validate_and_substitute_code, + write_output_csv, +) +from care.emr.models.activity_definition import ActivityDefinition +from care.emr.models.charge_item_definition import ChargeItemDefinition +from care.emr.models.location import FacilityLocation +from care.emr.models.observation_definition import ObservationDefinition +from care.emr.models.specimen_definition import SpecimenDefinition +from care.facility.models import Facility + +logger = logging.getLogger(__name__) + +current_dir = Path(__file__).resolve().parent +root_dir = current_dir.parent.parent.parent.parent +default_output_path = root_dir / "outputs" / "activity_definition_output.csv" + + +class Command(BaseCommand): + """ + Load Activity Definitions from CSV or Google Sheets. 
+ + Expected CSV columns: + - title + - description + - usage (optional) + - status (optional, default: active) + - classification (optional, default: laboratory) + - kind (optional, default: service_request) + - category + - code_system + - code_value + - code_display + - body_site_system (optional) + - body_site_code (optional) + - body_site_display (optional) + - observation_slugs (optional, comma-separated) + - specimen_slugs (optional, comma-separated) + - charge_item_slugs (optional, comma-separated) + - locations (optional, comma-separated location names) + - derived_from_uri (optional) + """ + + help = "Load Activity Definitions from CSV or Google Sheets" + + def add_arguments(self, parser): + parser.add_argument( + "source", + type=str, + nargs="?", + help="CSV file path or URL", + ) + parser.add_argument( + "--google-sheet", + type=str, + help="Google Sheet ID", + ) + parser.add_argument( + "--sheet-name", + type=str, + default="Sheet1", + help="Sheet name (default: Sheet1)", + ) + parser.add_argument( + "--facility", + type=str, + required=True, + help="Facility external ID", + ) + parser.add_argument( + "--output", + type=str, + help="Output CSV file path", + ) + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Batch size for processing (default: 100)", + ) + parser.add_argument( + "--validate-codes", + action="store_true", + default=True, + help="Validate codes against valuesets (default: True)", + ) + parser.add_argument( + "--no-validate-codes", + action="store_false", + dest="validate_codes", + help="Skip code validation", + ) + parser.add_argument( + "--skip-dependencies", + action="store_true", + help="Skip loading dependencies (assume they exist)", + ) + parser.add_argument( + "--specimens-csv", + type=str, + help="CSV file for specimen definitions", + ) + parser.add_argument( + "--observations-csv", + type=str, + help="CSV file for observation definitions", + ) + parser.add_argument( + "--charge-items-csv", + type=str, + 
def load_data(self, options):
    """
    Read the input rows from whichever source the caller selected.

    A Google Sheet ID takes precedence over the positional ``source``.
    ``source`` is fetched over the network only when it actually starts
    with an ``http://`` or ``https://`` scheme; anything else is treated as
    a local file path (a bare ``startswith("http")`` would misroute a local
    file such as ``httpdocs/items.csv``).

    Args:
        options: Parsed command options; reads ``google_sheet``,
            ``sheet_name`` and ``source``.

    Returns:
        The list of row dicts produced by the matching reader helper.

    Raises:
        ValueError: Neither a source nor a Google Sheet ID was supplied.
    """
    sheet_id = options["google_sheet"]
    if sheet_id:
        return read_csv_from_google_sheet(sheet_id, options["sheet_name"])

    source = options["source"]
    if not source:
        raise ValueError("Must provide either source file/URL or --google-sheet")

    if source.lower().startswith(("http://", "https://")):
        return read_csv_from_url(source)
    return read_csv_from_file(source)
def process_row(
    self, row: dict, facility: Facility, validate_codes: bool, created_by
) -> dict:
    """
    Convert one CSV row into the data dict used to build an ActivityDefinition.

    Optional columns fall back to their defaults both when the column is
    absent and when the cell is blank. csv.DictReader yields "" for an
    empty cell, so a plain ``row.get(name, default)`` never applied the
    default to blank cells.

    Args:
        row: Mapping of CSV header to cell text for a single row.
        facility: Facility the definition and its category belong to.
        validate_codes: When true and ``code_value`` is present, the code is
            checked against the "activity-definition-procedure-code"
            valueset and replaced by the default procedure code if rejected.
        created_by: Passed through to ``ensure_category``.

    Returns:
        Dict holding the normalized title, slug value, category object,
        code and body-site dicts, the dependency slug/name lists and a
        ``substitutions`` note describing any code replacement.

    Raises:
        ValueError: The title is missing, the category could not be ensured,
            or another KeyError/ValueError occurred while parsing.
        RuntimeError: Any other unexpected failure.
    """
    snomed = "http://snomed.info/sct"

    def cell(name: str, default: str = "") -> str:
        # A blank (or missing) cell counts the same as an absent column.
        return row.get(name) or default

    def split_list(name: str) -> list[str]:
        # "a, b ,,c" -> ["a", "b", "c"]
        return [item.strip() for item in cell(name).split(",") if item.strip()]

    try:
        if not cell("title").strip():
            raise ValueError("Missing required field: title")

        code_value = row.get("code_value")
        code_system = cell("code_system", snomed)
        code_display = row.get("code_display")

        # Fallback used when the row carries no usable code.
        default_code = {
            "code": "71388002",
            "system": snomed,
            "display": "Procedure",
        }

        substitution_messages = []

        if validate_codes and code_value:
            code, sub_msg = validate_and_substitute_code(
                code_value,
                code_system,
                "activity-definition-procedure-code",
                default_code,
            )
            if sub_msg:
                substitution_messages.append(f"code: {sub_msg}")
            elif code and code_display and code_display.strip():
                # The validation helper is not given the display text and
                # echoes the code in its place; keep the human-readable
                # display from the sheet when the code itself was accepted.
                code = {**code, "display": code_display.strip()}
        else:
            code = parse_code(code_value, code_system, code_display) or default_code

        body_site = parse_code(
            row.get("body_site_code"),
            cell("body_site_system", snomed),
            row.get("body_site_display"),
        )

        category_name = cell("category", "laboratory")
        try:
            category = ensure_category(
                category_name, facility, "activity_definition", created_by
            )
        except Exception as e:
            error_message = f"Failed to ensure category '{category_name}': {e}"
            raise ValueError(error_message) from e

        title = normalize_title(row["title"])

        return {
            "title": title,
            "slug_value": create_slug(title),
            "status": cell("status", "active"),
            "description": cell("description"),
            "usage": cell("usage"),
            "classification": cell("classification", "laboratory"),
            "kind": cell("kind", "service_request"),
            "category": category,
            "code": code,
            "body_site": body_site,
            "observation_slugs": split_list("observation_slugs"),
            "specimen_slugs": split_list("specimen_slugs"),
            "charge_item_slugs": split_list("charge_item_slugs"),
            "location_names": split_list("locations"),
            "derived_from_uri": cell("derived_from_uri"),
            "substitutions": "; ".join(substitution_messages),
        }

    except (KeyError, ValueError) as e:
        error_message = f"Failed to process row: {e}"
        raise ValueError(error_message) from e
    except Exception as e:
        error_message = f"Unexpected error processing row: {e}"
        raise RuntimeError(error_message) from e
def create_activity_definition(
    self, data: dict, facility: Facility, created_by
) -> ActivityDefinition:
    """
    Return the facility's ActivityDefinition for ``data``, creating it if needed.

    An existing definition is matched by case-insensitive title within the
    facility and returned untouched (nothing is updated); otherwise a new
    record is saved from the resolved ``data`` fields.

    Args:
        data: Row data after dependency resolution; must contain the
            ``*_ids`` lists alongside the fields from row processing.
        facility: Owning facility.
        created_by: Stored as both ``created_by`` and ``updated_by``.

    Raises:
        RuntimeError: Any failure while looking up or saving the record.
    """
    try:
        scoped_slug = ActivityDefinition.calculate_slug_from_facility(
            str(facility.external_id), data["slug_value"]
        )

        already_there = ActivityDefinition.objects.filter(
            title__iexact=data["title"], facility=facility
        ).first()
        if already_there:
            logger.warning("Activity definition already exists: %s", data["title"])
            return already_there

        # Collected up front so the constructor call stays readable.
        field_values = {
            "facility": facility,
            "slug": scoped_slug,
            "title": data["title"],
            "status": data["status"],
            "description": data["description"],
            "usage": data["usage"],
            "classification": data["classification"],
            "kind": data["kind"],
            "category": data["category"],
            "code": data["code"],
            "body_site": data["body_site"],
            "observation_result_requirements": data["observation_ids"],
            "specimen_requirements": data["specimen_ids"],
            "charge_item_definitions": data["charge_item_ids"],
            "locations": data["location_ids"],
            "derived_from_uri": data["derived_from_uri"],
            "created_by": created_by,
            "updated_by": created_by,
        }
        new_activity = ActivityDefinition(**field_values)
        new_activity.save()
        logger.debug("Created activity: %s", data["title"])
        return new_activity

    except Exception as e:
        error_message = (
            f"Failed to create activity '{data.get('title', 'Unknown')}': {e}"
        )
        raise RuntimeError(error_message) from e
rows in batches + batch_size = options["batch_size"] + validate_codes = options["validate_codes"] + total_rows = len(rows) + successful = [] + failed = [] + output_rows = [] + + for i in range(0, total_rows, batch_size): + batch = rows[i : i + batch_size] + batch_num = (i // batch_size) + 1 + total_batches = (total_rows + batch_size - 1) // batch_size + + logger.info( + "Processing batch %d/%d (rows %d-%d)", + batch_num, + total_batches, + i + 1, + min(i + batch_size, total_rows), + ) + + for row in batch: + row_title = row.get("title", "Unknown") + slug_value = "" + + try: + data = self.process_row(row, facility, validate_codes, None) + slug_value = data["slug_value"] + + data, missing = self.resolve_dependencies(data, facility) + + if missing: + error_msg = f"Missing dependencies: {', '.join(missing)}" + logger.warning("Skipping %s: %s", data["title"], error_msg) + failed.append(slug_value) + output_rows.append( + { + "title": data["title"], + "slug_value": slug_value, + "status": "Failed", + "error": error_msg, + "code_substitutions": data.get("substitutions", ""), + } + ) + continue + + self.create_activity_definition(data, facility, None) + + successful.append(slug_value) + output_rows.append( + { + "title": data["title"], + "slug_value": slug_value, + "status": "Success", + "error": "", + "code_substitutions": data.get("substitutions", ""), + } + ) + + except Exception as e: + logger.error("Error processing row '%s': %s", row_title, e) + failed.append(row_title) + output_rows.append( + { + "title": row_title, + "slug_value": slug_value, + "status": "Failed", + "error": str(e), + "code_substitutions": "", + } + ) + + # Write output CSV + output_path = options.get("output") or default_output_path + if output_path: + write_output_csv( + output_path, + output_rows, + ["title", "slug_value", "status", "error", "code_substitutions"], + ) + + # Print summary + self.stdout.write("\n=== Summary ===") + self.stdout.write(f"Total rows: {total_rows}") + 
self.stdout.write(self.style.SUCCESS(f"Successful: {len(successful)}")) + self.stdout.write(self.style.ERROR(f"Failed: {len(failed)}")) + self.stdout.write(f"Time taken: {datetime.now(tz=UTC) - start_time}") + self.stdout.write( + self.style.SUCCESS("Activity definitions loaded successfully") + ) + + except Exception as e: + logger.exception("Error in main process") + error_message = f"Error in main process: {e}" + self.stdout.write(self.style.ERROR(error_message)) + raise diff --git a/care/emr/management/commands/load_charge_item_definition.py b/care/emr/management/commands/load_charge_item_definition.py new file mode 100644 index 0000000000..72b0209bcd --- /dev/null +++ b/care/emr/management/commands/load_charge_item_definition.py @@ -0,0 +1,374 @@ +""" +Management command to load Charge Item Definitions from CSV/Google Sheets. + +Usage: + python manage.py load_charge_item_definition ./inputs/ChargeItemDefinition.csv --facility 24a071a3-07eb-442c-8457-bda417a375d3 + python manage.py load_charge_item_definition --google-sheet --sheet-name --facility +""" + +import logging +from datetime import UTC, datetime +from pathlib import Path + +from django.core.management.base import BaseCommand + +from care.emr.management.commands.load_emr_utils import ( + create_slug, + ensure_category, + normalize_title, + read_csv_from_file, + read_csv_from_google_sheet, + read_csv_from_url, + write_output_csv, +) +from care.emr.models.charge_item_definition import ChargeItemDefinition +from care.facility.models import Facility + +logger = logging.getLogger(__name__) + +current_dir = Path(__file__).resolve().parent +root_dir = current_dir.parent.parent.parent.parent +default_output_path = root_dir / "outputs" / "charge_item_definition_output.csv" + + +class Command(BaseCommand): + """ + Load Charge Item Definitions from CSV or Google Sheets. 
+ + Expected CSV columns: + - title + - Base Price + - Tax Rate (optional: 5, 12, or 18) + - category + - description (optional) + - status (optional, default: active) + """ + + help = "Load Charge Item Definitions from CSV or Google Sheets" + + def add_arguments(self, parser): + parser.add_argument( + "source", + type=str, + nargs="?", + help="CSV file path or URL", + ) + parser.add_argument( + "--google-sheet", + type=str, + help="Google Sheet ID", + ) + parser.add_argument( + "--sheet-name", + type=str, + default="Sheet1", + help="Sheet name (default: Sheet1)", + ) + parser.add_argument( + "--facility", + type=str, + required=True, + help="Facility external ID", + ) + parser.add_argument( + "--output", + type=str, + help="Output CSV file path", + ) + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Batch size for processing (default: 100)", + ) + + def load_data(self, options): + """Load data from source.""" + if options["google_sheet"]: + return read_csv_from_google_sheet( + options["google_sheet"], options["sheet_name"] + ) + if options["source"]: + if options["source"].startswith("http"): + return read_csv_from_url(options["source"]) + return read_csv_from_file(options["source"]) + raise ValueError("Must provide either source file/URL or --google-sheet") + + def get_tax_components(self, tax_rate: str | None) -> list[dict]: + """ + Get tax components based on tax rate. + Returns list of monetary components for CGST and SGST. 
+ """ + tax_mapping = { + "5": [ + { + "monetary_component_type": "tax", + "code": { + "system": "http://ohc.network/codes/monetary/tax", + "code": "cgst", + "display": "CGST", + }, + "factor": 2.5, + "conditions": [], + }, + { + "monetary_component_type": "tax", + "code": { + "system": "http://ohc.network/codes/monetary/tax", + "code": "sgst", + "display": "SGST", + }, + "factor": 2.5, + "conditions": [], + }, + ], + "12": [ + { + "monetary_component_type": "tax", + "code": { + "system": "http://ohc.network/codes/monetary/tax", + "code": "cgst", + "display": "CGST", + }, + "factor": 6, + "conditions": [], + }, + { + "monetary_component_type": "tax", + "code": { + "system": "http://ohc.network/codes/monetary/tax", + "code": "sgst", + "display": "SGST", + }, + "factor": 6, + "conditions": [], + }, + ], + "18": [ + { + "monetary_component_type": "tax", + "code": { + "system": "http://ohc.network/codes/monetary/tax", + "code": "cgst", + "display": "CGST", + }, + "factor": 9, + "conditions": [], + }, + { + "monetary_component_type": "tax", + "code": { + "system": "http://ohc.network/codes/monetary/tax", + "code": "sgst", + "display": "SGST", + }, + "factor": 9, + "conditions": [], + }, + ], + } + + if tax_rate and tax_rate in tax_mapping: + return tax_mapping[tax_rate] + + if tax_rate: + logger.warning("Unknown tax rate: %s", tax_rate) + + return [] + + def process_row(self, row: dict, facility: Facility, created_by) -> dict: + """ + Process a single CSV row into a ChargeItemDefinition data dict. + Raises exceptions with descriptive messages on errors. 
def create_charge_item_definition(
    self, data: dict, facility: Facility, created_by
) -> ChargeItemDefinition:
    """
    Return the facility's ChargeItemDefinition for ``data``, creating it if needed.

    A definition whose title matches case-insensitively within the facility
    is returned as-is (nothing is updated); otherwise a new record is saved.

    Args:
        data: Processed row data (title, slug value, status, description,
            category and price components).
        facility: Owning facility.
        created_by: Stored as both ``created_by`` and ``updated_by``.

    Raises:
        RuntimeError: Any failure while looking up or saving the record.
    """
    try:
        scoped_slug = ChargeItemDefinition.calculate_slug_from_facility(
            str(facility.external_id), data["slug_value"]
        )

        already_there = ChargeItemDefinition.objects.filter(
            title__iexact=data["title"], facility=facility
        ).first()
        if already_there:
            logger.warning("Charge item definition already exists: %s", data["title"])
            return already_there

        field_values = {
            "facility": facility,
            "slug": scoped_slug,
            "title": data["title"],
            "status": data["status"],
            "description": data["description"],
            "category": data["category"],
            "price_components": data["price_components"],
            "created_by": created_by,
            "updated_by": created_by,
        }
        new_item = ChargeItemDefinition(**field_values)
        new_item.save()
        logger.debug("Created charge item: %s", data["title"])
        return new_item

    except Exception as e:
        error_message = (
            f"Failed to create charge item '{data.get('title', 'Unknown')}': {e}"
        )
        raise RuntimeError(error_message) from e
def normalize_title(title: str) -> str:
    """
    Return *title* with tidied spacing, punctuation and word casing.

    Whitespace runs are collapsed and the spacing around ``/ ( ) , . - +``
    is made uniform. Each word is then capitalized (first letter upper,
    rest lower), except for a fixed set of abbreviations that are kept
    fully upper-case. For words containing ``( ) /`` or ``,`` every
    alphabetic run inside the word is cased separately.

    An empty or otherwise falsy title yields "".
    """
    if not title:
        return ""

    # Applied in order; each rule rewrites the spacing around one symbol.
    spacing_rules = (
        (r"\s+", " "),
        (r"\s*/\s*", "/"),
        (r"\s*\(\s*", " ("),
        (r"\s*\)\s*", ") "),
        (r"\s*,\s*", ", "),
        (r"\s*\.\s*", ". "),
        (r"\s*-\s*", "-"),
        (r"\s*\+\s*", "+"),
    )
    text = title
    for pattern, replacement in spacing_rules:
        text = re.sub(pattern, replacement, text)

    # Abbreviations that stay fully upper-case.
    keep_upper = frozenset(
        {
            "X",
            "RAY",
            "AP",
            "LAT",
            "CT",
            "MRI",
            "ECG",
            "EKG",
            "IV",
            "OP",
            "IP",
            "ICU",
            "OPD",
            "IPD",
        }
    )

    def recase(fragment: str) -> str:
        shouted = fragment.upper()
        return shouted if shouted in keep_upper else fragment.capitalize()

    def recase_match(match):
        return recase(match.group(0))

    pieces = []
    for token in text.strip().split():
        is_plain = not any(mark in token for mark in "()/,")
        if token.upper() in keep_upper or is_plain:
            pieces.append(recase(token))
        else:
            # Punctuated word: case each alphabetic run on its own.
            pieces.append(re.sub(r"[a-zA-Z]+", recase_match, token))

    joined = " ".join(pieces)
    return re.sub(r"\s+", " ", joined).strip()
+ """ + if not name: + return "" + + slug = name.lower() + slug = re.sub(r"[^a-z0-9\s_-]", "", slug) + slug = re.sub(r"\s+", "-", slug) + slug = re.sub(r"-+", "-", slug) + slug = slug.strip() + slug = slug[:25] + if len(slug) < 25: + hash_suffix = hashlib.sha256(slug.encode()).hexdigest() + needed_hash = 25 - len(slug) - 1 + slug = slug + "-" + hash_suffix[:needed_hash] + return slug + + +def read_csv_from_file(file_path: str) -> list[dict[str, str]]: + logger.info("Reading CSV from file: %s", file_path) + with Path(file_path).open(encoding="utf-8-sig") as f: + reader = DictReader(f) + rows = list(reader) + logger.info("Loaded %d rows from file", len(rows)) + return rows + + +def read_csv_from_url(url: str) -> list[dict[str, str]]: + """Read CSV from a URL (including Google Sheets export URLs).""" + logger.info("Reading CSV from URL: %s", url) + response = requests.get(url, timeout=30) + response.raise_for_status() + + # Parse CSV content + csv_content = response.text + # Remove BOM if present + csv_content = csv_content.removeprefix("\ufeff") + + reader = DictReader(StringIO(csv_content)) + rows = list(reader) + + logger.info("Loaded %d rows from URL", len(rows)) + return rows + + +def read_csv_from_google_sheet(sheet_id: str, sheet_name: str) -> list[dict[str, str]]: + """Read CSV from Google Sheets using the export URL.""" + url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/gviz/tq?tqx=out:csv&sheet={sheet_name}" + return read_csv_from_url(url) + + +def parse_code( + code: str | None, system: str | None, display: str | None +) -> dict | None: + """ + Parse code fields into a Code dict. + Returns None if code or system is missing. + """ + if not code or not system: + return None + + # Clean up code - remove .0 suffix if present + clean_code = str(code).strip() + if "." 
in clean_code: + if clean_code.endswith(".0"): + clean_code = clean_code[:-2] + else: + # Handle cases like 42342.0-6 => 42342-6 + clean_code = clean_code.replace(".0", "") + + return { + "system": system.strip(), + "code": clean_code, + "display": (display or clean_code).strip(), + } + + +def write_output_csv( + file_path: str, rows: list[dict], headers: list[str] | None = None +): + """Write output CSV with results.""" + import csv + + if not rows: + logger.warning("No rows to write to output CSV") + return + + if headers is None: + headers = list(rows[0].keys()) + + output_dir = Path(file_path).parent + output_dir.mkdir(parents=True, exist_ok=True) + + logger.info("Writing output CSV to: %s", file_path) + + with Path(file_path).open("w", encoding="utf-8", newline="") as f: + writer = csv.DictWriter(f, fieldnames=headers) + writer.writeheader() + writer.writerows(rows) + logger.info("Output CSV written successfully") + + +def validate_code_against_valueset(valueset_slug: str, code: str, system: str) -> bool: + """ + Validate a code against a valueset. + Returns True if valid, False otherwise. + """ + from care.emr.models.valueset import ValueSet + from care.emr.resources.common.coding import Coding + + try: + valueset = ValueSet.objects.filter(slug=valueset_slug).first() + if not valueset: + logger.warning("Valueset %s not found", valueset_slug) + return False + + coding = Coding(code=code, system=system) + return valueset.lookup(coding) + except Exception as e: + logger.error("Error validating code %s: %s", code, e) + return False + + +def batch_validate_codes( + valueset_slug: str, codes: list[tuple[str, str]] +) -> dict[str, bool]: + """ + Batch validate codes against a valueset. + Returns a dict mapping code -> is_valid. 
def ensure_category(category_name: str, facility, resource_type: str, created_by=None):
    """
    Fetch the facility's ResourceCategory for ``category_name``, creating it if absent.

    The category is looked up by its facility-scoped slug, which is derived
    from the raw ``category_name``; the stored title is the normalized name.
    A newly created category gets ``resource_sub_type="other"`` and an
    auto-generated description.

    Args:
        category_name: Category name as written in the sheet.
        facility: Facility that owns the category.
        resource_type: Resource type stored on a newly created category.
        created_by: Stored as ``created_by`` and ``updated_by`` on creation.

    Returns:
        The existing or newly saved ResourceCategory.

    Raises:
        RuntimeError: Any failure during lookup or creation.
    """
    from care.emr.models.resource_category import ResourceCategory

    try:
        display_title = normalize_title(category_name)
        slug_value = create_slug(category_name)
        scoped_slug = ResourceCategory.calculate_slug_from_facility(
            str(facility.external_id), slug_value
        )

        found = ResourceCategory.objects.filter(
            slug=scoped_slug, facility=facility
        ).first()
        if found:
            return found

        # Nothing stored under this slug yet, so create the category.
        fresh = ResourceCategory(
            facility=facility,
            resource_type=resource_type,
            resource_sub_type="other",
            title=display_title,
            slug=scoped_slug,
            description=f"Auto-generated category for {display_title}",
            created_by=created_by,
            updated_by=created_by,
        )
        fresh.save()
        logger.info("Created category: %s", display_title)
        return fresh

    except Exception as e:
        error_message = f"Failed to ensure category '{category_name}': {e}"
        raise RuntimeError(error_message) from e
a/care/emr/management/commands/load_observation_definition.py b/care/emr/management/commands/load_observation_definition.py new file mode 100644 index 0000000000..ba2f6db5f7 --- /dev/null +++ b/care/emr/management/commands/load_observation_definition.py @@ -0,0 +1,409 @@ +""" +Management command to load Observation Definitions from CSV/Google Sheets. + +Usage: + python manage.py load_observation_definition --facility + python manage.py load_observation_definition --google-sheet --sheet-name --facility +""" + +import logging +from datetime import UTC, datetime +from pathlib import Path + +from django.core.management.base import BaseCommand + +from care.emr.management.commands.load_emr_utils import ( + create_slug, + normalize_title, + parse_code, + read_csv_from_file, + read_csv_from_google_sheet, + read_csv_from_url, + validate_and_substitute_code, + write_output_csv, +) +from care.emr.models.observation_definition import ObservationDefinition +from care.facility.models import Facility + +logger = logging.getLogger(__name__) + +current_dir = Path(__file__).resolve().parent +root_dir = current_dir.parent.parent.parent.parent +default_output_path = root_dir / "outputs" / "observation_definition_output.csv" + + +class Command(BaseCommand): + """ + Load Observation Definitions from CSV or Google Sheets. 
+ + Expected CSV columns: + - title + - description + - status (optional, default: active) + - category (optional, default: laboratory) + - code_system + - code_value + - code_display + - permitted_data_type + - body_site_system (optional) + - body_site_code (optional) + - body_site_display (optional) + - method_system (optional) + - method_code (optional) + - method_display (optional) + - permitted_unit_system (optional) + - permitted_unit_code (optional) + - permitted_unit_display (optional) + - derived_from_uri (optional) + - component (optional, JSON string) + - qualified_ranges (optional, JSON string) + """ + + help = "Load Observation Definitions from CSV or Google Sheets" + + def add_arguments(self, parser): + parser.add_argument( + "source", + type=str, + nargs="?", + help="CSV file path or URL", + ) + parser.add_argument( + "--google-sheet", + type=str, + help="Google Sheet ID", + ) + parser.add_argument( + "--sheet-name", + type=str, + default="Sheet1", + help="Sheet name (default: Sheet1)", + ) + parser.add_argument( + "--facility", + type=str, + help="Facility external ID (optional for system-level definitions)", + ) + parser.add_argument( + "--output", + type=str, + help="Output CSV file path", + ) + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Batch size for processing (default: 100)", + ) + parser.add_argument( + "--validate-codes", + action="store_true", + default=True, + help="Validate codes against valuesets (default: True)", + ) + parser.add_argument( + "--no-validate-codes", + action="store_false", + dest="validate_codes", + help="Skip code validation", + ) + + def load_data(self, options): + """Load data from source.""" + if options["google_sheet"]: + return read_csv_from_google_sheet( + options["google_sheet"], options["sheet_name"] + ) + if options["source"]: + if options["source"].startswith("http"): + return read_csv_from_url(options["source"]) + return read_csv_from_file(options["source"]) + raise 
ValueError("Must provide either source file/URL or --google-sheet") + + def process_row( + self, row: dict, facility: Facility | None, validate_codes: bool + ) -> dict: + """ + Process a single CSV row into an ObservationDefinition data dict. + Raises exceptions with descriptive messages on errors. + """ + try: + import json + + if not row.get("title"): + raise ValueError("Missing required field: title") + + code_value = row.get("code_value") + code_system = row.get("code_system", "http://loinc.org") + code_display = row.get("code_display") + + # Default code for observation + default_code = { + "code": "104922-0", + "system": "http://loinc.org", + "display": "Laboratory test details panel", + } + + substitution_messages = [] + + # Validate code if requested + if validate_codes and code_value: + code, sub_msg = validate_and_substitute_code( + code_value, + code_system, + "system-observation", + default_code, + ) + if sub_msg: + substitution_messages.append(f"code: {sub_msg}") + else: + code = parse_code(code_value, code_system, code_display) or default_code + + body_site = parse_code( + row.get("body_site_code"), + row.get("body_site_system", "http://snomed.info/sct"), + row.get("body_site_display"), + ) + + # Parse optional method + method_code = row.get("method_code") + method_system = row.get( + "method_system", + "http://terminology.hl7.org/CodeSystem/observation-methods", + ) + method_display = row.get("method_display") + + default_method = { + "code": "386053000", + "system": "http://snomed.info/sct", + "display": "Technique", + } + + if validate_codes and method_code: + method, sub_msg = validate_and_substitute_code( + method_code, + method_system, + "system-collection-method", + default_method, + ) + if sub_msg: + substitution_messages.append(f"method: {sub_msg}") + else: + method = ( + parse_code(method_code, method_system, method_display) + if method_code + else None + ) + + permitted_unit = parse_code( + row.get("permitted_unit_code"), + 
row.get("permitted_unit_system", "http://unitsofmeasure.org"), + row.get("permitted_unit_display"), + ) + + component = [] + if row.get("component"): + try: + component = json.loads(row["component"]) + if not isinstance(component, list): + component = [] + except (json.JSONDecodeError, TypeError): + logger.warning("Invalid component JSON for %s", row.get("title")) + + qualified_ranges = [] + if row.get("qualified_ranges"): + try: + qualified_ranges = json.loads(row["qualified_ranges"]) + if not isinstance(qualified_ranges, list): + qualified_ranges = [] + except (json.JSONDecodeError, TypeError): + logger.warning( + "Invalid qualified_ranges JSON for %s", row.get("title") + ) + + title = normalize_title(row["title"]) + slug_value = create_slug(title) + + return { + "title": title, + "slug_value": slug_value, + "status": row.get("status", "active"), + "description": row.get("description", ""), + "category": row.get("category", "laboratory"), + "code": code, + "permitted_data_type": row.get("permitted_data_type", "string"), + "body_site": body_site, + "method": method, + "permitted_unit": permitted_unit, + "derived_from_uri": row.get("derived_from_uri", ""), + "component": component, + "qualified_ranges": qualified_ranges, + "substitutions": "; ".join(substitution_messages) + if substitution_messages + else "", + } + + except (KeyError, ValueError) as e: + error_message = f"Failed to process row: {e}" + raise ValueError(error_message) from e + except Exception as e: + error_message = f"Unexpected error processing row: {e}" + raise RuntimeError(error_message) from e + + def create_observation_definition( + self, data: dict, facility: Facility | None, created_by + ) -> ObservationDefinition: + """ + Create or update an ObservationDefinition. + Raises exceptions with descriptive messages on errors. 
+ """ + try: + if facility: + full_slug = ObservationDefinition.calculate_slug_from_facility( + str(facility.external_id), data["slug_value"] + ) + else: + full_slug = ObservationDefinition.calculate_slug_from_instance( + data["slug_value"] + ) + + existing = ObservationDefinition.objects.filter( + title__iexact=data["title"], facility=facility + ).first() + + if existing: + logger.warning( + "Observation definition already exists: %s", data["title"] + ) + return existing + + observation = ObservationDefinition( + facility=facility, + slug=full_slug, + title=data["title"], + status=data["status"], + description=data["description"], + category=data["category"], + code=data["code"], + permitted_data_type=data["permitted_data_type"], + body_site=data["body_site"], + method=data["method"], + permitted_unit=data["permitted_unit"], + derived_from_uri=data["derived_from_uri"], + component=data["component"], + qualified_ranges=data["qualified_ranges"], + created_by=created_by, + updated_by=created_by, + ) + observation.save() + logger.debug("Created observation: %s", data["title"]) + return observation + + except Exception as e: + error_message = ( + f"Failed to create observation '{data.get('title', 'Unknown')}': {e}" + ) + raise RuntimeError(error_message) from e + + def handle(self, *args, **options): + start_time = datetime.now(tz=UTC) + + # Set logging level + if options["verbosity"] == 0: + logger.setLevel(logging.ERROR) + elif options["verbosity"] == 1: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.DEBUG) + + try: + facility = None + if options.get("facility"): + facility = Facility.objects.get(external_id=options["facility"]) + logger.info("Loading observations for facility: %s", facility.name) + else: + logger.info("Loading system-level observations (no facility)") + + rows = self.load_data(options) + logger.info("Loaded %d rows from source", len(rows)) + + if not rows: + self.stdout.write(self.style.WARNING("No rows found in source")) + return 
+ + batch_size = options["batch_size"] + validate_codes = options["validate_codes"] + total_rows = len(rows) + successful = [] + failed = [] + output_rows = [] + + for i in range(0, total_rows, batch_size): + batch = rows[i : i + batch_size] + batch_num = (i // batch_size) + 1 + total_batches = (total_rows + batch_size - 1) // batch_size + + logger.info( + "Processing batch %d/%d (rows %d-%d)", + batch_num, + total_batches, + i + 1, + min(i + batch_size, total_rows), + ) + + for row in batch: + row_title = row.get("title", "Unknown") + slug_value = "" + + try: + data = self.process_row(row, facility, validate_codes) + slug_value = data["slug_value"] + + self.create_observation_definition(data, facility, None) + + successful.append(slug_value) + output_rows.append( + { + "title": data["title"], + "slug_value": slug_value, + "status": "Success", + "error": "", + "code_substitutions": data.get("substitutions", ""), + } + ) + + except Exception as e: + logger.error("Error processing row '%s': %s", row_title, e) + failed.append(row_title) + output_rows.append( + { + "title": row_title, + "slug_value": slug_value, + "status": "Failed", + "error": str(e), + "code_substitutions": "", + } + ) + + output_path = options.get("output") or default_output_path + if output_path: + write_output_csv( + output_path, + output_rows, + ["title", "slug_value", "status", "error", "code_substitutions"], + ) + + self.stdout.write("\n=== Summary ===") + self.stdout.write(f"Total rows: {total_rows}") + self.stdout.write(self.style.SUCCESS(f"Successful: {len(successful)}")) + self.stdout.write(self.style.ERROR(f"Failed: {len(failed)}")) + self.stdout.write(f"Time taken: {datetime.now(tz=UTC) - start_time}") + self.stdout.write( + self.style.SUCCESS("Observation definitions loaded successfully") + ) + + except Exception as e: + logger.exception("Error in main process") + error_message = f"Error in main process: {e}" + self.stdout.write(self.style.ERROR(error_message)) + raise diff --git 
a/care/emr/management/commands/load_specimen_definition.py b/care/emr/management/commands/load_specimen_definition.py new file mode 100644 index 0000000000..a938fb2f96 --- /dev/null +++ b/care/emr/management/commands/load_specimen_definition.py @@ -0,0 +1,352 @@ +""" +Management command to load Specimen Definitions from CSV/Google Sheets. + +Usage: + python manage.py load_specimen_definition --facility + python manage.py load_specimen_definition --google-sheet --sheet-name --facility +""" + +import logging +from datetime import UTC, datetime +from pathlib import Path + +from django.core.management.base import BaseCommand + +from care.emr.management.commands.load_emr_utils import ( + create_slug, + normalize_title, + parse_code, + read_csv_from_file, + read_csv_from_google_sheet, + read_csv_from_url, + write_output_csv, +) +from care.emr.models.specimen_definition import SpecimenDefinition +from care.facility.models import Facility + +logger = logging.getLogger(__name__) + +current_dir = Path(__file__).resolve().parent +root_dir = current_dir.parent.parent.parent.parent +default_output_path = root_dir / "outputs" / "specimen_definition_output.csv" + + +class Command(BaseCommand): + """ + Load Specimen Definitions from CSV or Google Sheets. 
+ + Expected CSV columns: + - title + - description (optional) + - status (optional, default: active) + - type_collected_code + - type_collected_system + - type_collected_display + - container_cap_code (optional) + - container_cap_system (optional) + - container_cap_display (optional) + - container_minimumvolume (optional) + - container_minimumvolume_unit_code (optional) + - container_minimumvolume_unit_system (optional) + - container_minimumvolume_unit_display (optional) + - retention_time_value (optional) + - retention_time_unit_code (optional) + - retention_time_unit_system (optional) + - retention_time_unit_display (optional) + - preference (optional, default: preferred) + - requirement (optional) + - single_use (optional, default: true) + """ + + help = "Load Specimen Definitions from CSV or Google Sheets" + + def add_arguments(self, parser): + parser.add_argument( + "source", + type=str, + nargs="?", + help="CSV file path or URL", + ) + parser.add_argument( + "--google-sheet", + type=str, + help="Google Sheet ID", + ) + parser.add_argument( + "--sheet-name", + type=str, + default="Sheet1", + help="Sheet name (default: Sheet1)", + ) + parser.add_argument( + "--facility", + type=str, + required=True, + help="Facility external ID", + ) + parser.add_argument( + "--output", + type=str, + help="Output CSV file path", + ) + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Batch size for processing (default: 100)", + ) + + def load_data(self, options): + """Load data from source.""" + if options["google_sheet"]: + return read_csv_from_google_sheet( + options["google_sheet"], options["sheet_name"] + ) + if options["source"]: + if options["source"].startswith("http"): + return read_csv_from_url(options["source"]) + return read_csv_from_file(options["source"]) + raise ValueError("Must provide either source file/URL or --google-sheet") + + def process_row(self, row: dict, facility: Facility) -> dict: + """ + Process a single CSV row into a 
SpecimenDefinition data dict. + Raises exceptions with descriptive messages on errors. + """ + try: + if not row.get("title"): + raise ValueError("Missing required field: title") + + type_collected = parse_code( + row.get("type_collected_code"), + row.get("type_collected_system"), + row.get("type_collected_display"), + ) + if not type_collected: + raise ValueError("Missing required type_collected fields") + + container_cap = parse_code( + row.get("container_cap_code"), + row.get("container_cap_system"), + row.get("container_cap_display"), + ) + + minimum_volume_unit = parse_code( + row.get("container_minimumvolume_unit_code"), + row.get("container_minimumvolume_unit_system"), + row.get("container_minimumvolume_unit_display"), + ) + + retention_time_unit = parse_code( + row.get("retention_time_unit_code"), + row.get("retention_time_unit_system"), + row.get("retention_time_unit_display"), + ) + + container = {} + if container_cap: + container["cap"] = container_cap + + minimum_volume = row.get("container_minimumvolume") + if minimum_volume and minimum_volume_unit: + try: + volume_value = float(minimum_volume) + if volume_value > 0: + container["minimum_volume"] = { + "quantity": { + "value": volume_value, + "unit": minimum_volume_unit, + } + } + except (ValueError, TypeError): + pass + + retention_time = None + if row.get("retention_time_value") and retention_time_unit: + try: + retention_time = { + "value": float(row["retention_time_value"]), + "unit": retention_time_unit, + } + except (ValueError, TypeError): + pass + + # Set default retention time if not provided + if not retention_time: + retention_time = { + "value": 24, + "unit": { + "code": "h", + "display": "hours", + "system": "http://unitsofmeasure.org", + }, + } + + type_tested = { + "is_derived": bool(row.get("is_derived", False)), + "preference": row.get("preference", "preferred"), + "retention_time": retention_time, + "single_use": bool(row.get("single_use", True)), + } + + if container: + 
type_tested["container"] = container + + if row.get("requirement"): + type_tested["requirement"] = row["requirement"] + + title = normalize_title(row["title"]) + slug_value = create_slug(title) + + return { + "title": title, + "slug_value": slug_value, + "status": row.get("status", "active"), + "description": row.get("description", ""), + "type_collected": type_collected, + "type_tested": type_tested, + } + + except (KeyError, ValueError) as e: + error_message = f"Failed to process row: {e}" + raise ValueError(error_message) from e + except Exception as e: + error_message = f"Unexpected error processing row: {e}" + raise RuntimeError(error_message) from e + + def create_specimen_definition( + self, data: dict, facility: Facility, created_by + ) -> SpecimenDefinition: + """ + Create or update a SpecimenDefinition. + Raises exceptions with descriptive messages on errors. + """ + try: + full_slug = SpecimenDefinition.calculate_slug_from_facility( + str(facility.external_id), data["slug_value"] + ) + + existing = SpecimenDefinition.objects.filter( + title__iexact=data["title"], facility=facility + ).first() + + if existing: + logger.warning("Specimen definition already exists: %s", data["title"]) + return existing + + specimen = SpecimenDefinition( + facility=facility, + slug=full_slug, + title=data["title"], + status=data["status"], + description=data["description"], + type_collected=data["type_collected"], + type_tested=data["type_tested"], + created_by=created_by, + updated_by=created_by, + ) + specimen.save() + logger.debug("Created specimen: %s", data["title"]) + return specimen + + except Exception as e: + error_message = ( + f"Failed to create specimen '{data.get('title', 'Unknown')}': {e}" + ) + raise RuntimeError(error_message) from e + + def handle(self, *args, **options): + start_time = datetime.now(tz=UTC) + + # Set logging level + if options["verbosity"] == 0: + logger.setLevel(logging.ERROR) + elif options["verbosity"] == 1: + 
logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.DEBUG) + + try: + facility = Facility.objects.get(external_id=options["facility"]) + logger.info("Loading specimens for facility: %s", facility.name) + + rows = self.load_data(options) + logger.info("Loaded %d rows from source", len(rows)) + + if not rows: + self.stdout.write(self.style.WARNING("No rows found in source")) + return + + batch_size = options["batch_size"] + total_rows = len(rows) + successful = [] + failed = [] + output_rows = [] + + for i in range(0, total_rows, batch_size): + batch = rows[i : i + batch_size] + batch_num = (i // batch_size) + 1 + total_batches = (total_rows + batch_size - 1) // batch_size + + logger.info( + "Processing batch %d/%d (rows %d-%d)", + batch_num, + total_batches, + i + 1, + min(i + batch_size, total_rows), + ) + + for row in batch: + row_title = row.get("title", "Unknown") + slug_value = "" + + try: + data = self.process_row(row, facility) + slug_value = data["slug_value"] + + self.create_specimen_definition(data, facility, None) + + successful.append(slug_value) + output_rows.append( + { + "title": data["title"], + "slug_value": slug_value, + "status": "Success", + "error": "", + } + ) + + except Exception as e: + logger.error("Error processing row '%s': %s", row_title, e) + failed.append(row_title) + output_rows.append( + { + "title": row_title, + "slug_value": slug_value, + "status": "Failed", + "error": str(e), + } + ) + + output_path = options.get("output") or default_output_path + if output_path: + write_output_csv( + output_path, + output_rows, + ["title", "slug_value", "status", "error"], + ) + + self.stdout.write("\n=== Summary ===") + self.stdout.write(f"Total rows: {total_rows}") + self.stdout.write(self.style.SUCCESS(f"Successful: {len(successful)}")) + self.stdout.write(self.style.ERROR(f"Failed: {len(failed)}")) + self.stdout.write(f"Time taken: {datetime.now(tz=UTC) - start_time}") + self.stdout.write( + self.style.SUCCESS("Specimen definitions 
loaded successfully") + ) + + except Exception as e: + logger.exception("Error in main process") + error_message = f"Error in main process: {e}" + self.stdout.write(self.style.ERROR(error_message)) + raise From 245c892f5f438dac7a0b52c9a870c988c1f6adc0 Mon Sep 17 00:00:00 2001 From: Jacob Jeevan Date: Fri, 16 Jan 2026 14:00:33 +0530 Subject: [PATCH 2/2] simplify location loading --- .../commands/load_activity_definition.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/care/emr/management/commands/load_activity_definition.py b/care/emr/management/commands/load_activity_definition.py index 9ccac650f5..099675c6a1 100644 --- a/care/emr/management/commands/load_activity_definition.py +++ b/care/emr/management/commands/load_activity_definition.py @@ -272,9 +272,9 @@ def process_row( s.strip() for s in row["charge_item_slugs"].split(",") if s.strip() ] - location_names = [] + location_ids = [] if row.get("locations"): - location_names = [ + location_ids = [ s.strip() for s in row["locations"].split(",") if s.strip() ] @@ -304,7 +304,7 @@ def process_row( "observation_slugs": observation_slugs, "specimen_slugs": specimen_slugs, "charge_item_slugs": charge_item_slugs, - "location_names": location_names, + "location_ids": location_ids, "derived_from_uri": row.get("derived_from_uri", ""), "substitutions": "; ".join(substitution_messages) if substitution_messages @@ -369,17 +369,9 @@ def resolve_dependencies( else: missing.append(f"charge_item:{slug}") - # Resolve locations - location_ids, missing_locations = self.lookup_locations( - data["location_names"], facility - ) - for loc in missing_locations: - missing.append(f"location:{loc}") - data["observation_ids"] = observation_ids data["specimen_ids"] = specimen_ids data["charge_item_ids"] = charge_item_ids - data["location_ids"] = location_ids return data, missing