338 lines
12 KiB
Python
Raw Normal View History

2017-08-20 08:35:59 +08:00
import os
import qrcode
from io import BytesIO
2017-04-18 11:57:57 +08:00
from datetime import timedelta
2017-08-20 08:35:59 +08:00
from otpauth import OtpAuth
2017-04-18 11:57:57 +08:00
from django.conf import settings
2017-04-19 01:37:10 +08:00
from django.contrib import auth
2017-04-18 11:57:57 +08:00
from django.utils.timezone import now
2017-08-20 08:35:59 +08:00
from django.http import HttpResponse
from django.views.decorators.csrf import ensure_csrf_cookie
from django.utils.decorators import method_decorator
2017-01-23 16:01:56 +08:00
from conf.models import WebsiteConfig
2017-08-20 08:35:59 +08:00
from utils.api import APIView, validate_serializer, CSRFExemptAPIView
2016-09-25 14:07:45 +08:00
from utils.captcha import Captcha
2017-04-18 11:57:57 +08:00
from utils.shortcuts import rand_str
2017-01-23 16:48:04 +08:00
2016-09-25 14:07:45 +08:00
from ..decorators import login_required
from ..models import User, UserProfile
2017-08-20 20:32:07 +08:00
from ..serializers import (ApplyResetPasswordSerializer, ResetPasswordSerializer,
2017-04-19 01:37:10 +08:00
UserChangePasswordSerializer, UserLoginSerializer,
2017-08-20 20:32:07 +08:00
UserRegisterSerializer, UsernameOrEmailCheckSerializer,
RankInfoSerializer)
2017-08-20 08:35:59 +08:00
from ..serializers import (SSOSerializer, TwoFactorAuthCodeSerializer,
UserProfileSerializer,
EditUserProfileSerializer, AvatarUploadForm)
2017-04-19 02:03:48 +08:00
from ..tasks import send_email_async
2016-09-25 14:07:45 +08:00
2017-08-20 08:35:59 +08:00
class UserProfileAPI(APIView):
    """
    Check whether the requester is logged in; if so, return user profile data.
    """

    @method_decorator(ensure_csrf_cookie)
    def get(self, request, **kwargs):
        """
        Return a serialized profile for the requester or for ``?username=``.

        Anonymous requests are answered with ``self.success(0)``. When a
        ``username`` query parameter is supplied, the profile of that
        (non-disabled) user is returned instead of the requester's own.
        """
        if not request.user.is_authenticated():
            return self.success(0)

        # Default to the requester; only hit the database for another user.
        user = request.user
        username = request.GET.get("username")
        if username:
            try:
                user = User.objects.get(username=username, is_disabled=False)
            except User.DoesNotExist:
                return self.error("User does not exist")

        profile = UserProfile.objects.get(user=user)
        return self.success(UserProfileSerializer(profile).data)

    @validate_serializer(EditUserProfileSerializer)
    @login_required
    def put(self, request):
        """
        Update the requester's profile.

        If the payload carries a truthy ``avatar`` only the avatar is
        changed; otherwise the textual profile fields are overwritten with
        the values from the payload.
        """
        data = request.data
        user_profile = request.user.userprofile
        if data.get("avatar"):
            user_profile.avatar = data["avatar"]
        else:
            user_profile.mood = data["mood"]
            user_profile.blog = data["blog"]
            user_profile.school = data["school"]
            user_profile.student_id = data["student_id"]
            user_profile.phone_number = data["phone_number"]
            user_profile.major = data["major"]
            # Timezone & language are not handled for now.
        user_profile.save()
        return self.success("Succeeded")
class AvatarUploadAPI(CSRFExemptAPIView):
    """
    Accept an avatar image upload and store it in ``settings.IMAGE_UPLOAD_DIR``.
    """
    request_parsers = ()

    def post(self, request):
        """
        Validate and persist an uploaded avatar.

        The upload is rejected when the form is invalid, when the file is
        larger than 1 MiB, or when its extension is not a known image type.
        On success the stored file's public path is returned.
        """
        # NOTE(review): this view is CSRF-exempt and has no login check, so
        # anonymous clients can upload files — confirm that this is intended.
        form = AvatarUploadForm(request.POST, request.FILES)
        if not form.is_valid():
            return self.error("Upload failed")
        avatar = form.cleaned_data["file"]

        if avatar.size > 1024 * 1024:
            return self.error("Picture too large")

        # The extension is compared case-insensitively but stored as uploaded.
        suffix = os.path.splitext(avatar.name)[-1]
        if suffix.lower() not in [".gif", ".jpg", ".jpeg", ".bmp", ".png"]:
            return self.error("Unsupported file format")

        # A random name avoids trusting the client-supplied file name.
        name = "avatar_" + rand_str(5) + suffix
        with open(os.path.join(settings.IMAGE_UPLOAD_DIR, name), "wb") as img:
            for chunk in avatar:
                img.write(chunk)
        return self.success({"path": "/static/upload/" + name})
class SSOAPI(APIView):
    """
    Single sign-on endpoints: issue a one-time token and exchange it for
    basic user information.
    """

    @login_required
    def get(self, request):
        """
        Issue a fresh auth token for the logged-in user.

        Reads the ``callback`` query parameter, stores a new random token on
        the user and returns the callback URL with the token appended.
        """
        callback = request.GET.get("callback", None)
        if not callback:
            return self.error("Parameter Error")
        token = rand_str()
        request.user.auth_token = token
        request.user.save()
        # Append with "&" when the callback already carries a query string so
        # the resulting URL stays well-formed.
        separator = "&" if "?" in callback else "?"
        return self.success({"redirect_url": callback + separator + "token=" + token,
                             "callback": callback})

    @validate_serializer(SSOSerializer)
    def post(self, request):
        """
        Exchange a previously issued token for user information.

        The ``appkey`` must match some user's ``open_api_appkey``. The token
        is cleared once it has been used, so it can be redeemed only once.
        """
        data = request.data
        # Only the existence of the appkey matters; the owner is not used.
        try:
            User.objects.get(open_api_appkey=data["appkey"])
        except User.DoesNotExist:
            return self.error("Invalid appkey")

        try:
            user = User.objects.get(auth_token=data["token"])
        except User.DoesNotExist:
            return self.error("User does not exist")

        user.auth_token = None
        user.save()
        return self.success({"username": user.username,
                             "id": user.id,
                             "admin_type": user.admin_type,
                             "avatar": user.userprofile.avatar})
class TwoFactorAuthAPI(APIView):
    """
    Manage two-factor authentication (TOTP) for the logged-in user.
    """

    @login_required
    def get(self, request):
        """
        Get QR code

        Generates a new TOTP secret, stores it on the user and returns a GIF
        QR code encoding the corresponding otpauth URI. Refused when 2FA is
        already enabled.
        """
        user = request.user
        if user.two_factor_auth:
            return self.error("Already open 2FA")
        token = rand_str()
        user.tfa_token = token
        user.save()

        config = WebsiteConfig.objects.first()
        image = qrcode.make(OtpAuth(token).to_uri("totp", config.base_url, config.name))
        buf = BytesIO()
        image.save(buf, "gif")
        return HttpResponse(buf.getvalue(), "image/gif")

    @login_required
    @validate_serializer(TwoFactorAuthCodeSerializer)
    def post(self, request):
        """
        Open 2FA

        Enables 2FA when the submitted code is valid for the stored secret.
        """
        code = request.data["code"]
        user = request.user
        if not OtpAuth(user.tfa_token).valid_totp(code):
            return self.error("Invalid captcha")
        user.two_factor_auth = True
        user.save()
        return self.success("Succeeded")

    @login_required
    @validate_serializer(TwoFactorAuthCodeSerializer)
    def put(self, request):
        """
        Close 2FA

        Disables 2FA when the submitted code is valid for the stored secret.
        """
        code = request.data["code"]
        user = request.user
        if not OtpAuth(user.tfa_token).valid_totp(code):
            return self.error("Invalid captcha")
        user.two_factor_auth = False
        user.save()
        # The view must return a response on the success path as well.
        return self.success("Succeeded")
2016-11-19 12:37:27 +08:00
class UserLoginAPI(APIView):
    """
    Log a user in with username/password and, when enabled, a TOTP code.
    """

    @validate_serializer(UserLoginSerializer)
    def post(self, request):
        """
        User login api

        Responds with ``"tfa_required"`` when the account has 2FA enabled and
        no ``tfa_code`` was supplied, so the client can ask for the code.
        """
        data = request.data
        user = auth.authenticate(username=data["username"], password=data["password"])
        # None is returned if username or password is wrong
        if not user:
            return self.error("Invalid username or password")

        if not user.two_factor_auth:
            auth.login(request, user)
            return self.success("Succeeded")

        # `tfa_code` not in post data
        if "tfa_code" not in data:
            return self.success("tfa_required")

        if OtpAuth(user.tfa_token).valid_totp(data["tfa_code"]):
            auth.login(request, user)
            return self.success("Succeeded")
        return self.error("Invalid two factor verification code")

    # TODO: remove this, only for debug use
    def get(self, request):
        """
        Debug-only login through query parameters.

        NOTE(review): credentials travel in the URL here; this handler should
        be removed before production use.
        """
        user = auth.authenticate(username=request.GET.get("username"),
                                 password=request.GET.get("password"))
        # Passing None to auth.login would fail, so reject bad credentials.
        if not user:
            return self.error("Invalid username or password")
        # Do not let the debug shortcut bypass two-factor authentication.
        if user.two_factor_auth:
            return self.success("tfa_required")
        auth.login(request, user)
        return self.success({})
2016-09-25 14:07:45 +08:00
2017-05-01 15:20:13 +08:00
class UserLogoutAPI(APIView):
    """
    Terminate the requester's session.
    """

    def get(self, request):
        """
        Log the requester out and reply with an empty payload.
        """
        auth.logout(request)
        empty_payload = {}
        return self.success(empty_payload)
2017-08-19 17:25:39 +08:00
class UsernameOrEmailCheck(APIView):
    """
    Tell the client whether a username and/or email is still available.
    """

    @validate_serializer(UsernameOrEmailCheckSerializer)
    def post(self, request):
        """
        Check whether the username or email is already taken.

        Each key in the reply is True when the value is free to use (or was
        not supplied) and False when a user with that value already exists.
        """
        payload = request.data
        # The database is only queried for fields that were actually sent.
        username_free = not (
            payload.get("username")
            and User.objects.filter(username=payload["username"]).exists()
        )
        email_free = not (
            payload.get("email")
            and User.objects.filter(email=payload["email"]).exists()
        )
        return self.success({"username": username_free, "email": email_free})
2016-11-19 12:37:27 +08:00
class UserRegisterAPI(APIView):
    """
    Create a new user account together with its profile.
    """

    @validate_serializer(UserRegisterSerializer)
    def post(self, request):
        """
        User register api

        Rejects the request on a wrong captcha or when the username or email
        is already in use; otherwise creates the user and an associated
        profile using the default time zone from settings.
        """
        payload = request.data

        captcha_ok = Captcha(request).check(payload["captcha"])
        if not captcha_ok:
            return self.error("Invalid captcha")

        username_taken = User.objects.filter(username=payload["username"]).exists()
        if username_taken:
            return self.error("Username already exists")

        email_taken = User.objects.filter(email=payload["email"]).exists()
        if email_taken:
            return self.error("Email already exists")

        account = User.objects.create(username=payload["username"], email=payload["email"])
        account.set_password(payload["password"])
        account.save()
        UserProfile.objects.create(user=account, time_zone=settings.USER_DEFAULT_TZ)
        return self.success("Succeeded")
2016-09-25 14:07:45 +08:00
2016-11-19 12:37:27 +08:00
class UserChangePasswordAPI(APIView):
    """
    Let a logged-in user replace their password.
    """

    @validate_serializer(UserChangePasswordSerializer)
    @login_required
    def post(self, request):
        """
        User change password api

        Requires a valid captcha and the current password; the new password
        is stored only after the old one has been re-authenticated.
        """
        payload = request.data
        if not Captcha(request).check(payload["captcha"]):
            return self.error("Invalid captcha")

        # Re-authenticate with the old password to confirm the requester.
        account = auth.authenticate(username=request.user.username,
                                    password=payload["old_password"])
        if not account:
            return self.error("Invalid old password")

        account.set_password(payload["new_password"])
        account.save()
        return self.success("Succeeded")
2017-04-18 11:57:57 +08:00
class ApplyResetPasswordAPI(APIView):
    """
    Start the password-reset flow by mailing a reset link to the user.
    """

    @validate_serializer(ApplyResetPasswordSerializer)
    def post(self, request):
        """
        Issue a reset token and queue the reset e-mail.

        Rejects the request on a wrong captcha, an unknown e-mail address, or
        when a token issued less than 20 minutes ago is still pending.
        """
        data = request.data
        captcha = Captcha(request)
        if not captcha.check(data["captcha"]):
            return self.error("Invalid captcha")
        try:
            user = User.objects.get(email=data["email"])
        except User.DoesNotExist:
            return self.error("User does not exist")

        # A token lives 20 minutes, so a positive remaining lifetime means a
        # reset was already requested within that window.
        expire_time = user.reset_password_token_expire_time
        if expire_time and 0 < (expire_time - now()).total_seconds() < 20 * 60:
            return self.error("You can only reset password once per 20 minutes")

        user.reset_password_token = rand_str()
        user.reset_password_token_expire_time = now() + timedelta(minutes=20)
        user.save()

        # Open read-only: mode "w" would truncate the template and make
        # read() fail. The context manager also closes the handle.
        # NOTE(review): the relative path is resolved against the process
        # working directory — confirm where the template is expected to live.
        with open("reset_password_email.html", "r", encoding="utf-8") as template_file:
            email_template = template_file.read()
        email_template = email_template.replace("{{ username }}", user.username). \
            replace("{{ website_name }}", settings.WEBSITE_INFO["website_name"]). \
            replace("{{ link }}", settings.WEBSITE_INFO["url"] + "/reset_password/t/" +
                    user.reset_password_token)

        config = WebsiteConfig.objects.first()
        send_email_async.delay(config.name,
                               user.email,
                               user.username,
                               config.name + " 登录信息找回邮件",
                               email_template)
        return self.success("Succeeded")
2017-04-18 11:57:57 +08:00
class ResetPasswordAPI(APIView):
    """
    Finish the password-reset flow using a previously mailed token.
    """

    @validate_serializer(ResetPasswordSerializer)
    def post(self, request):
        """
        Set a new password for the user that owns the given reset token.

        Rejects the request on a wrong captcha, an unknown token, or a token
        whose expiry time has passed. The token is cleared after use.
        """
        data = request.data
        captcha = Captcha(request)
        if not captcha.check(data["captcha"]):
            return self.error("Invalid captcha")
        try:
            user = User.objects.get(reset_password_token=data["token"])
        except User.DoesNotExist:
            return self.error("Token dose not exist")

        # The token is valid only while its expiry time lies in the future;
        # a missing expiry time is treated as expired rather than crashing.
        expire_time = user.reset_password_token_expire_time
        if expire_time is None or expire_time <= now():
            return self.error("Token expired")

        user.reset_password_token = None
        user.set_password(data["password"])
        user.save()
        return self.success("Succeeded")
2017-08-20 20:32:07 +08:00
class UserRankAPI(APIView):
    """
    Paginated ranking of users who have made at least one submission.
    """

    def get(self, request):
        """
        Return the rank list for the rule given by ``?rule=``.

        ``oi`` orders by total score; any other value (including a missing
        one) falls back to the ACM ordering by accepted count, then by
        fewest submissions.
        """
        if request.GET.get("rule") == "oi":
            ordering = ("-total_score",)
        else:
            ordering = ("-accepted_number", "submission_number")

        ranked = (UserProfile.objects
                  .select_related("user")
                  .filter(submission_number__gt=0)
                  .order_by(*ordering))
        return self.success(self.paginate_data(request, ranked, RankInfoSerializer))