-
Notifications
You must be signed in to change notification settings - Fork 599
[Eng-32] feat :create otp based reset for password #3630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
4ec016d
667e54e
3dfdd03
8021e7a
f666b68
520e877
8a9b522
164721e
54cdcda
7454a65
e15ae5a
afeb68c
fafad78
ab6042d
0a9937b
fc1da34
7215c1e
9347046
f532197
447ce22
6fe3ccd
97d1998
1638abe
7f8e3d9
02e5abc
21dc945
acfb02a
8e03058
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| import secrets | ||
| import string | ||
| from datetime import timedelta | ||
| from enum import Enum | ||
|
|
||
| from django.conf import settings | ||
| from django.utils import timezone | ||
|
|
@@ -12,20 +13,60 @@ | |
| from rest_framework.response import Response | ||
|
|
||
| from care.emr.api.viewsets.base import EMRBaseViewSet | ||
| from care.facility.models.patient import PatientMobileOTP | ||
| from care.facility.models.patient import MobileOTP | ||
| from care.utils import sms | ||
| from care.utils.models.validators import mobile_validator | ||
| from care.utils.sms.utils import get_sms_content | ||
| from config.patient_otp_token import PatientToken | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class OTPType(str, Enum): | ||
| login = "login" | ||
| reset_password = "reset_password" | ||
|
|
||
|
|
||
| def rand_pass(size): | ||
| return "".join(secrets.choice(string.digits) for _ in range(size)) | ||
|
|
||
|
|
||
| class OTPLoginRequestSpec(BaseModel): | ||
| def send_otp(phone_number, purpose): | ||
|
nandkishorr marked this conversation as resolved.
Outdated
|
||
| sent_otps = MobileOTP.objects.filter( | ||
| created_date__gte=(timezone.now() - timedelta(settings.OTP_REPEAT_WINDOW)), | ||
| is_used=False, | ||
| phone_number=phone_number, | ||
| ) | ||
| if sent_otps.count() >= settings.OTP_MAX_REPEATS_WINDOW: | ||
| raise ValidationError({"phone_number": "Max Retries has exceeded"}) | ||
|
nandkishorr marked this conversation as resolved.
Outdated
|
||
|
|
||
| random_otp = "" | ||
| if settings.USE_SMS: | ||
| random_otp = rand_pass(settings.OTP_LENGTH) | ||
| try: | ||
| if purpose == OTPType.login: | ||
| content = settings.OTP_SMS_CONTENT.format(otp=random_otp) | ||
| elif purpose == OTPType.reset_password: | ||
| content = settings.OTP_SMS_RESET_PASSWORD_CONTENT.format(otp=random_otp) | ||
|
|
||
| sms.send_text_message( | ||
| content=content, | ||
| recipients=[phone_number], | ||
| ) | ||
| except Exception as e: | ||
| logger.error(e) | ||
| return Response( | ||
| {"error": "Error while sending OTP. Contact admin."}, status=400 | ||
| ) | ||
| elif settings.IS_PRODUCTION: | ||
| random_otp = rand_pass(settings.OTP_LENGTH) | ||
| else: | ||
| random_otp = "45612" | ||
|
|
||
| MobileOTP.objects.create(phone_number=phone_number, otp=random_otp) | ||
| return None | ||
|
|
||
|
|
||
| class OTPRequestBaseSpec(BaseModel): | ||
| phone_number: str | ||
|
|
||
| @field_validator("phone_number") | ||
|
|
@@ -39,7 +80,7 @@ def validate_phone_number(cls, value): | |
| return value | ||
|
|
||
|
|
||
| class OTPLoginSpec(OTPLoginRequestSpec): | ||
| class OTPLoginSpec(OTPRequestBaseSpec): | ||
| otp: str = Field(min_length=settings.OTP_LENGTH, max_length=settings.OTP_LENGTH) | ||
|
|
||
|
|
||
|
|
@@ -48,41 +89,14 @@ class OTPLoginView(EMRBaseViewSet): | |
| permission_classes = [] | ||
|
|
||
| @extend_schema( | ||
| request=OTPLoginRequestSpec, | ||
| request=OTPRequestBaseSpec, | ||
| ) | ||
| @action(detail=False, methods=["POST"]) | ||
| def send(self, request): | ||
| data = OTPLoginRequestSpec(**request.data) | ||
| sent_otps = PatientMobileOTP.objects.filter( | ||
| created_date__gte=(timezone.now() - timedelta(settings.OTP_REPEAT_WINDOW)), | ||
| is_used=False, | ||
| phone_number=data.phone_number, | ||
| ) | ||
| if sent_otps.count() >= settings.OTP_MAX_REPEATS_WINDOW: | ||
| raise ValidationError({"phone_number": "Max Retries has exceeded"}) | ||
| random_otp = "" | ||
| if settings.USE_SMS: | ||
| random_otp = rand_pass(settings.OTP_LENGTH) | ||
| try: | ||
| content = get_sms_content( | ||
| settings.OTP_SMS_TEMPLATE_PATH, {"random_otp": random_otp} | ||
| ) | ||
| sms.send_text_message( | ||
| content=content, | ||
| recipients=[data.phone_number], | ||
| ) | ||
| except Exception as e: | ||
| logger.error(e) | ||
| return Response( | ||
| {"error": "Error while sending OTP. Contact admin."}, status=400 | ||
| ) | ||
| elif settings.IS_PRODUCTION: | ||
| random_otp = rand_pass(settings.OTP_LENGTH) | ||
| else: | ||
| random_otp = "45612" | ||
|
|
||
| otp_obj = PatientMobileOTP(phone_number=data.phone_number, otp=random_otp) | ||
| otp_obj.save() | ||
| data = OTPRequestBaseSpec(**request.data) | ||
| error_response = send_otp(data.phone_number, purpose=OTPType.login) | ||
| if error_response: | ||
| return error_response | ||
| return Response({"otp": "generated"}) | ||
|
nandkishorr marked this conversation as resolved.
|
||
|
|
||
| @extend_schema( | ||
|
|
@@ -91,7 +105,7 @@ def send(self, request): | |
| @action(detail=False, methods=["POST"]) | ||
| def login(self, request): | ||
| data = OTPLoginSpec(**request.data) | ||
| otp_object = PatientMobileOTP.objects.filter( | ||
| otp_object = MobileOTP.objects.filter( | ||
| phone_number=data.phone_number, otp=data.otp, is_used=False | ||
| ).first() | ||
|
Comment on lines
+108
to
110
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Expire login OTPs when validating them. Lines 115-117 accept any unused row matching the phone number and OTP, with no Suggested fix- otp_object = MobileOTP.objects.filter(
- phone_number=data.phone_number, otp=data.otp, is_used=False
- ).first()
+ otp_object = (
+ MobileOTP.objects.filter(
+ phone_number=data.phone_number,
+ otp=data.otp,
+ is_used=False,
+ created_date__gte=(
+ timezone.now() - timedelta(hours=settings.OTP_REPEAT_WINDOW)
+ ),
+ )
+ .order_by("-created_date")
+ .first()
+ )🤖 Prompt for AI Agents |
||
| if not otp_object: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| # Generated by Django 6.0 on 2026-05-05 17:41 | ||
|
|
||
| from django.db import migrations | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ('facility', '0484_remove_facility_discount_codes_and_more'), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.RenameModel( | ||
| old_name='PatientMobileOTP', | ||
| new_name='MobileOTP', | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |
| from care.utils.models.validators import mobile_or_landline_number_validator | ||
|
|
||
|
|
||
| class PatientMobileOTP(BaseModel): | ||
| class MobileOTP(BaseModel): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need a periodic job to delete all OTP's that are older than the period they are active for.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its resolved in 3632 |
||
| is_used = models.BooleanField(default=False) | ||
| phone_number = models.CharField( | ||
| max_length=14, validators=[mobile_or_landline_number_validator] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| import logging | ||
| from datetime import timedelta | ||
|
|
||
| from django.conf import settings | ||
| from django.contrib.auth.password_validation import ( | ||
| get_password_validators, | ||
| validate_password, | ||
| ) | ||
| from django.utils import timezone | ||
| from drf_spectacular.utils import extend_schema | ||
| from pydantic import Field | ||
| from rest_framework.decorators import action | ||
| from rest_framework.exceptions import ValidationError | ||
| from rest_framework.response import Response | ||
|
|
||
| from care.emr.api.otp_viewsets.login import OTPRequestBaseSpec, OTPType, send_otp | ||
| from care.emr.api.viewsets.base import EMRBaseViewSet | ||
| from care.facility.models.patient import MobileOTP | ||
| from care.users.models import User | ||
| from config.ratelimit import ratelimit | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class OTPResetSendSpec(OTPRequestBaseSpec): | ||
| pass | ||
|
|
||
|
|
||
| class OTPResetConfirmSpec(OTPRequestBaseSpec): | ||
| otp: str = Field(min_length=settings.OTP_LENGTH, max_length=settings.OTP_LENGTH) | ||
| password: str = Field(min_length=8) | ||
|
|
||
|
|
||
| class OTPResetPasswordView(EMRBaseViewSet): | ||
| authentication_classes = [] | ||
| permission_classes = [] | ||
|
|
||
| @action(detail=False, methods=["POST"]) | ||
| @extend_schema(request=OTPResetSendSpec) | ||
| def send(self, request): | ||
| data = OTPResetSendSpec(**request.data) | ||
|
|
||
| if ratelimit(request, "otp-password-reset", ["ip"]): | ||
| error_message = "Too many requests. Please try again later." | ||
| return Response( | ||
| {"detail": error_message}, | ||
| status=429, | ||
| ) | ||
|
|
||
| if not User.objects.filter(phone_number=data.phone_number).exists(): | ||
| return Response({"otp": "generated"}) | ||
|
|
||
| error_response = send_otp( | ||
| data.phone_number, | ||
| purpose=OTPType.reset_password, | ||
| ) | ||
| if error_response: | ||
| return error_response | ||
| return Response({"otp": "generated"}) | ||
|
|
||
| @action(detail=False, methods=["POST"]) | ||
| @extend_schema(request=OTPResetConfirmSpec) | ||
| def confirm(self, request): | ||
| data = OTPResetConfirmSpec(**request.data) | ||
| if ratelimit(request, "otp-password-confirm", ["ip"]): | ||
| error_message = "Too many requests. Please try again later." | ||
| return Response( | ||
| {"detail": error_message}, | ||
| status=429, | ||
| ) | ||
| user = User.objects.filter(phone_number=data.phone_number).first() | ||
|
nandkishorr marked this conversation as resolved.
Outdated
|
||
| if not user: | ||
| raise ValidationError({"error": "No User linked to this phone number"}) | ||
| otp_obj = ( | ||
| MobileOTP.objects.filter( | ||
| phone_number=data.phone_number, | ||
| is_used=False, | ||
| created_date__gte=( | ||
| timezone.now() - timedelta(hours=settings.OTP_REPEAT_WINDOW) | ||
| ), | ||
| ) | ||
| .order_by("-created_date") | ||
| .first() | ||
| ) | ||
| if not otp_obj or otp_obj.otp != data.otp: | ||
| raise ValidationError({"otp": "Invalid OTP"}) | ||
|
|
||
| validate_password( | ||
| data.password, | ||
| user=user, | ||
| password_validators=get_password_validators( | ||
| settings.AUTH_PASSWORD_VALIDATORS | ||
| ), | ||
| ) | ||
| user.set_password(data.password) | ||
| user.save() | ||
|
nandkishorr marked this conversation as resolved.
|
||
| MobileOTP.objects.filter( | ||
| phone_number=data.phone_number, | ||
| ).delete() | ||
|
nandkishorr marked this conversation as resolved.
|
||
| return Response({"message": "Password reset successful"}) | ||
Uh oh!
There was an error while loading. Please reload this page.