-
Notifications
You must be signed in to change notification settings - Fork 599
[Eng-32] feat :create otp based reset for password #3630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4ec016d
667e54e
3dfdd03
8021e7a
f666b68
520e877
8a9b522
164721e
54cdcda
7454a65
e15ae5a
afeb68c
fafad78
ab6042d
0a9937b
fc1da34
7215c1e
9347046
f532197
447ce22
6fe3ccd
97d1998
1638abe
7f8e3d9
02e5abc
21dc945
acfb02a
8e03058
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,20 +12,58 @@ | |
| from rest_framework.response import Response | ||
|
|
||
| from care.emr.api.viewsets.base import EMRBaseViewSet | ||
| from care.facility.models.patient import PatientMobileOTP | ||
| from care.facility.models.patient import MobileOTP | ||
| from care.utils import sms | ||
| from care.utils.models.validators import mobile_validator | ||
| from care.utils.sms.utils import get_sms_content | ||
| from config.patient_otp_token import PatientToken | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class BaseOTPType: | ||
| def render_content(self, otp: str) -> str: | ||
| pass | ||
|
|
||
|
|
||
| class LoginOTP(BaseOTPType): | ||
| @classmethod | ||
| def render_content(cls, otp: str) -> str: | ||
| return settings.OTP_SMS_LOGIN_CONTENT.format(otp=otp) | ||
|
|
||
|
|
||
| def rand_pass(size): | ||
| return "".join(secrets.choice(string.digits) for _ in range(size)) | ||
|
|
||
|
|
||
| class OTPLoginRequestSpec(BaseModel): | ||
| def send_otp(phone_number, otp_type: BaseOTPType): | ||
| sent_otps = MobileOTP.objects.filter( | ||
| created_date__gte=(timezone.now() - timedelta(settings.OTP_REPEAT_WINDOW)), | ||
| is_used=False, | ||
| phone_number=phone_number, | ||
| ) | ||
| if sent_otps.count() >= settings.OTP_MAX_REPEATS_WINDOW: | ||
| raise ValueError("Max Retries has exceeded") | ||
|
|
||
| random_otp = "" | ||
| if settings.USE_SMS: | ||
| random_otp = rand_pass(settings.OTP_LENGTH) | ||
| try: | ||
| content = otp_type.render_content(random_otp) | ||
| sms.send_text_message( | ||
| content=content, | ||
| recipients=[phone_number], | ||
| ) | ||
| except Exception as e: | ||
| raise Exception("Error while sending OTP. Contact admin.") from e | ||
| elif settings.IS_PRODUCTION: | ||
| random_otp = rand_pass(settings.OTP_LENGTH) | ||
| else: | ||
| random_otp = "45612" | ||
|
|
||
| MobileOTP.objects.create(phone_number=phone_number, otp=random_otp) | ||
|
|
||
|
|
||
| class OTPRequestBaseSpec(BaseModel): | ||
| phone_number: str | ||
|
|
||
| @field_validator("phone_number") | ||
|
|
@@ -39,7 +77,7 @@ def validate_phone_number(cls, value): | |
| return value | ||
|
|
||
|
|
||
| class OTPLoginSpec(OTPLoginRequestSpec): | ||
| class OTPLoginSpec(OTPRequestBaseSpec): | ||
| otp: str = Field(min_length=settings.OTP_LENGTH, max_length=settings.OTP_LENGTH) | ||
|
|
||
|
|
||
|
|
@@ -48,41 +86,17 @@ class OTPLoginView(EMRBaseViewSet): | |
| permission_classes = [] | ||
|
|
||
| @extend_schema( | ||
| request=OTPLoginRequestSpec, | ||
| request=OTPRequestBaseSpec, | ||
| ) | ||
| @action(detail=False, methods=["POST"]) | ||
| def send(self, request): | ||
| data = OTPLoginRequestSpec(**request.data) | ||
| sent_otps = PatientMobileOTP.objects.filter( | ||
| created_date__gte=(timezone.now() - timedelta(settings.OTP_REPEAT_WINDOW)), | ||
| is_used=False, | ||
| phone_number=data.phone_number, | ||
| ) | ||
| if sent_otps.count() >= settings.OTP_MAX_REPEATS_WINDOW: | ||
| raise ValidationError({"phone_number": "Max Retries has exceeded"}) | ||
| random_otp = "" | ||
| if settings.USE_SMS: | ||
| random_otp = rand_pass(settings.OTP_LENGTH) | ||
| try: | ||
| content = get_sms_content( | ||
| settings.OTP_SMS_TEMPLATE_PATH, {"random_otp": random_otp} | ||
| ) | ||
| sms.send_text_message( | ||
| content=content, | ||
| recipients=[data.phone_number], | ||
| ) | ||
| except Exception as e: | ||
| logger.error(e) | ||
| return Response( | ||
| {"error": "Error while sending OTP. Contact admin."}, status=400 | ||
| ) | ||
| elif settings.IS_PRODUCTION: | ||
| random_otp = rand_pass(settings.OTP_LENGTH) | ||
| else: | ||
| random_otp = "45612" | ||
|
|
||
| otp_obj = PatientMobileOTP(phone_number=data.phone_number, otp=random_otp) | ||
| otp_obj.save() | ||
| data = OTPRequestBaseSpec(**request.data) | ||
| try: | ||
| send_otp(data.phone_number, otp_type=LoginOTP) | ||
| except ValueError as e: | ||
| raise ValidationError({"phone_number": "Unable to send OTP"}) from e | ||
| except Exception: | ||
| return Response({"error": "Unable to send OTP"}, status=400) | ||
| return Response({"otp": "generated"}) | ||
|
|
||
| @extend_schema( | ||
|
|
@@ -91,7 +105,7 @@ def send(self, request): | |
| @action(detail=False, methods=["POST"]) | ||
| def login(self, request): | ||
| data = OTPLoginSpec(**request.data) | ||
| otp_object = PatientMobileOTP.objects.filter( | ||
| otp_object = MobileOTP.objects.filter( | ||
| phone_number=data.phone_number, otp=data.otp, is_used=False | ||
| ).first() | ||
|
Comment on lines
+108
to
110
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Expire login OTPs when validating them. Lines 115-117 accept any unused row matching the phone number and OTP, with no Suggested fix- otp_object = MobileOTP.objects.filter(
- phone_number=data.phone_number, otp=data.otp, is_used=False
- ).first()
+ otp_object = (
+ MobileOTP.objects.filter(
+ phone_number=data.phone_number,
+ otp=data.otp,
+ is_used=False,
+ created_date__gte=(
+ timezone.now() - timedelta(hours=settings.OTP_REPEAT_WINDOW)
+ ),
+ )
+ .order_by("-created_date")
+ .first()
+ )🤖 Prompt for AI Agents |
||
| if not otp_object: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,171 @@ | ||
| from django.conf import settings | ||
| from django.test import override_settings | ||
| from django.urls import reverse | ||
| from rest_framework import status | ||
|
|
||
| from care.facility.models.patient import MobileOTP | ||
| from care.utils.tests.base import CareAPITestBase | ||
|
|
||
|
|
||
| @override_settings( | ||
| OTP_REPEAT_WINDOW=60, | ||
| OTP_MAX_REPEATS_WINDOW=3, | ||
| OTP_LENGTH=6, | ||
| USE_SMS=False, | ||
| IS_PRODUCTION=False, | ||
| ) | ||
| class OTPResetPasswordAPITestCase(CareAPITestBase): | ||
| def setUp(self): | ||
| self.user = self.create_user( | ||
| username="testuser", phone_number="+919876543210", password="OldPass123!" | ||
| ) | ||
| self.phone_number = "+919876543210" | ||
| self.password = "OldPass123!" | ||
| self.new_password = "NewPass123!" | ||
| self.send_otp_url = reverse("otp-password-reset-send") | ||
| self.confirm_otp_url = reverse("otp-password-reset-confirm") | ||
| self.otp = "45612" # As per settings in override, OTP will be this value | ||
|
|
||
| def _send(self, phone_number=None): | ||
| return self.client.post( | ||
| self.send_otp_url, | ||
| {"phone_number": phone_number or self.phone_number}, | ||
| format="json", | ||
| ) | ||
|
|
||
| def _confirm(self, otp, phone_number=None, username=None, password=None): | ||
| payload = { | ||
| "phone_number": phone_number or self.phone_number, | ||
| "otp": otp, | ||
| "password": password or self.new_password, | ||
| } | ||
| if username is not None: | ||
| payload["username"] = username | ||
| return self.client.post( | ||
| self.confirm_otp_url, | ||
| payload, | ||
| format="json", | ||
| ) | ||
|
|
||
| def test_send_otp_request_by_exisiting_user(self): | ||
| """ | ||
| When a valid phone number is provided, OTP should be generated and saved in the database, | ||
| regardless of whether the phone number is linked to a user or not, | ||
| to avoid leaking information about registered phone numbers.""" | ||
|
|
||
| response = self._send() | ||
| self.assertEqual(response.status_code, status.HTTP_200_OK) | ||
| self.assertEqual(response.data["otp"], "generated") | ||
| self.assertTrue( | ||
| MobileOTP.objects.filter(phone_number=self.phone_number).exists() | ||
| ) | ||
|
|
||
| def test_send_otp_request_by_non_exisiting_user(self): | ||
| """ | ||
| When a valid phone number is provided, OTP should be generated and saved in the database, | ||
| regardless of whether the phone number is linked to a user or not, | ||
| to avoid leaking information about registered phone numbers | ||
| """ | ||
| response = self._send(phone_number="+919876543211") | ||
| self.assertEqual(response.status_code, status.HTTP_200_OK) | ||
| self.assertEqual(response.data["otp"], "generated") | ||
| self.assertFalse( | ||
| MobileOTP.objects.filter(phone_number="+919876543211").exists() | ||
| ) | ||
|
|
||
| def test_send_opt_request_exceeding_limit(self): | ||
| """ | ||
| When OTP request is sent more than allowed limit within the repeat window, | ||
| it should return an error.""" | ||
| for _ in range(settings.OTP_MAX_REPEATS_WINDOW): | ||
| self._send() | ||
| response = self._send() | ||
| self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) | ||
| self.assertIn( | ||
| "Unable to send OTP", response.data["errors"][0]["msg"]["phone_number"] | ||
| ) | ||
|
|
||
| def test_confirm_otp_with_valid_otp(self): | ||
| """ | ||
| When OTP confirmation is done with valid OTP, | ||
| password should be reset successfully.""" | ||
| self._send() | ||
| response = self._confirm(otp=self.otp) | ||
| self.assertEqual(response.status_code, status.HTTP_200_OK) | ||
| self.user.refresh_from_db() | ||
| self.assertTrue(self.user.check_password(self.new_password)) | ||
|
|
||
| def test_confirm_otp_with_invalid_otp(self): | ||
| """ | ||
| When OTP confirmation is done with an invalid OTP, | ||
| it should return an error.""" | ||
| self._send() | ||
| response = self._confirm(otp="00000") | ||
| self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) | ||
| self.assertEqual(str(response.data["errors"][0]["msg"]["otp"]), "Invalid OTP") | ||
|
|
||
| def test_confirm_otp_with_invalid_phone_number(self): | ||
| """ | ||
| if the phone number is invalid, OTP confirmation should fail with invalid OTP error, | ||
| not with invalid phone number error, to avoid leaking information about registered phone numbers. | ||
| """ | ||
| self._send(phone_number="+919876543211") | ||
| response = self._confirm(otp=self.otp, phone_number="+919876543211") | ||
| self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) | ||
| self.assertEqual(str(response.data["errors"][0]["msg"]["otp"]), "Invalid OTP") | ||
|
|
||
| def test_confirm_otp_with_multiple_users_linked_to_phone_number_and_no_username( | ||
| self, | ||
| ): | ||
| """ | ||
| If multiple users are linked to the same phone number, OTP confirmation should fail with an error indicating multiple users, | ||
| unless a valid username is also provided which is linked to the phone number. | ||
| """ | ||
| self.create_user( | ||
| username="testuser2", | ||
| phone_number=self.phone_number, | ||
| password="AnotherPass123!", | ||
| ) | ||
| self._send() | ||
| response = self._confirm(otp=self.otp) | ||
| self.assertEqual(response.status_code, status.HTTP_409_CONFLICT) | ||
| self.assertIn( | ||
| "Multiple users linked to this phone number", response.data["error"] | ||
| ) | ||
|
|
||
| def test_confirm_otp_with_multiple_users_linked_to_phone_number_and_with_username( | ||
| self, | ||
| ): | ||
| """ | ||
| If multiple users are linked to the same phone number, and a valid username is provided, OTP confirmation should succeed. | ||
| """ | ||
| self.create_user( | ||
| username="testuser2", | ||
| phone_number=self.phone_number, | ||
| password="AnotherPass123!", | ||
| ) | ||
| self._send() | ||
| response = self._confirm(otp=self.otp, username="testuser") | ||
| self.assertEqual(response.status_code, status.HTTP_200_OK) | ||
| self.user.refresh_from_db() | ||
| self.assertTrue(self.user.check_password(self.new_password)) | ||
|
|
||
| def test_confirm_otp_with_multiple_users_linked_to_phone_number_and_with_invalid_username( | ||
| self, | ||
| ): | ||
| """ | ||
| If multiple users are linked to the same phone number, and an invalid username is provided, | ||
| OTP confirmation should fail with an error indicating no user with this username linked to this phone number. | ||
| """ | ||
| self.create_user( | ||
| username="testuser2", | ||
| phone_number=self.phone_number, | ||
| password="AnotherPass123!", | ||
| ) | ||
| self._send() | ||
| response = self._confirm(otp=self.otp, username="invalidusername") | ||
| self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) | ||
| self.assertIn( | ||
| "No User with this username linked to this phone number", | ||
| response.data["errors"][0]["msg"]["error"], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| # Generated by Django 6.0 on 2026-05-05 17:41 | ||
|
|
||
| from django.db import migrations | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ('facility', '0484_remove_facility_discount_codes_and_more'), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.RenameModel( | ||
| old_name='PatientMobileOTP', | ||
| new_name='MobileOTP', | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |
| from care.utils.models.validators import mobile_or_landline_number_validator | ||
|
|
||
|
|
||
| class PatientMobileOTP(BaseModel): | ||
| class MobileOTP(BaseModel): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need a periodic job to delete all OTP's that are older than the period they are active for.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its resolved in 3632 |
||
| is_used = models.BooleanField(default=False) | ||
| phone_number = models.CharField( | ||
| max_length=14, validators=[mobile_or_landline_number_validator] | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.