mirror of
https://github.com/QingdaoU/OnlineJudge.git
synced 2025-01-17 10:04:52 +00:00
213 lines
7.9 KiB
Python
213 lines
7.9 KiB
Python
import os
|
|
import re
|
|
import xlsxwriter
|
|
|
|
from django.db import transaction, IntegrityError
|
|
from django.db.models import Q
|
|
from django.http import HttpResponse
|
|
from django.contrib.auth.hashers import make_password
|
|
|
|
from submission.models import Submission
|
|
from utils.api import APIView, validate_serializer
|
|
from utils.shortcuts import rand_str
|
|
|
|
from ..decorators import super_admin_required
|
|
from ..models import AdminType, ProblemPermission, User, UserProfile
|
|
from ..serializers import EditUserSerializer, UserAdminSerializer, GenerateUserSerializer
|
|
from ..serializers import ImportUserSeralizer
|
|
|
|
|
|
class UserAdminAPI(APIView):
|
|
@validate_serializer(ImportUserSeralizer)
|
|
@super_admin_required
|
|
def post(self, request):
|
|
"""
|
|
Import User
|
|
"""
|
|
data = request.data["users"]
|
|
|
|
user_list = []
|
|
for user_data in data:
|
|
if len(user_data) != 3 or len(user_data[0]) > 32:
|
|
return self.error(f"Error occurred while processing data '{user_data}'")
|
|
user_list.append(User(username=user_data[0], password=make_password(user_data[1]), email=user_data[2]))
|
|
|
|
try:
|
|
with transaction.atomic():
|
|
ret = User.objects.bulk_create(user_list)
|
|
UserProfile.objects.bulk_create([UserProfile(user=user) for user in ret])
|
|
return self.success()
|
|
except IntegrityError as e:
|
|
# Extract detail from exception message
|
|
# duplicate key value violates unique constraint "user_username_key"
|
|
# DETAIL: Key (username)=(root11) already exists.
|
|
return self.error(str(e).split("\n")[1])
|
|
|
|
@validate_serializer(EditUserSerializer)
|
|
@super_admin_required
|
|
def put(self, request):
|
|
"""
|
|
Edit user api
|
|
"""
|
|
data = request.data
|
|
try:
|
|
user = User.objects.get(id=data["id"])
|
|
except User.DoesNotExist:
|
|
return self.error("User does not exist")
|
|
if User.objects.filter(username=data["username"]).exclude(id=user.id).exists():
|
|
return self.error("Username already exists")
|
|
if User.objects.filter(email=data["email"].lower()).exclude(id=user.id).exists():
|
|
return self.error("Email already exists")
|
|
|
|
pre_username = user.username
|
|
user.username = data["username"]
|
|
user.email = data["email"]
|
|
user.admin_type = data["admin_type"]
|
|
user.is_disabled = data["is_disabled"]
|
|
|
|
if data["admin_type"] == AdminType.ADMIN:
|
|
user.problem_permission = data["problem_permission"]
|
|
elif data["admin_type"] == AdminType.SUPER_ADMIN:
|
|
user.problem_permission = ProblemPermission.ALL
|
|
else:
|
|
user.problem_permission = ProblemPermission.NONE
|
|
|
|
if data["password"]:
|
|
user.set_password(data["password"])
|
|
|
|
if data["open_api"]:
|
|
# Avoid reset user appkey after saving changes
|
|
if not user.open_api:
|
|
user.open_api_appkey = rand_str()
|
|
else:
|
|
user.open_api_appkey = None
|
|
user.open_api = data["open_api"]
|
|
|
|
if data["two_factor_auth"]:
|
|
# Avoid reset user tfa_token after saving changes
|
|
if not user.two_factor_auth:
|
|
user.tfa_token = rand_str()
|
|
else:
|
|
user.tfa_token = None
|
|
|
|
user.two_factor_auth = data["two_factor_auth"]
|
|
|
|
user.save()
|
|
if pre_username != user.username:
|
|
Submission.objects.filter(username=pre_username).update(username=user.username)
|
|
|
|
UserProfile.objects.filter(user=user).update(real_name=data["real_name"])
|
|
return self.success(UserAdminSerializer(user).data)
|
|
|
|
@super_admin_required
|
|
def get(self, request):
|
|
"""
|
|
User list api / Get user by id
|
|
"""
|
|
user_id = request.GET.get("id")
|
|
if user_id:
|
|
try:
|
|
user = User.objects.get(id=user_id)
|
|
except User.DoesNotExist:
|
|
return self.error("User does not exist")
|
|
return self.success(UserAdminSerializer(user).data)
|
|
|
|
user = User.objects.all().order_by("-create_time")
|
|
|
|
keyword = request.GET.get("keyword", None)
|
|
if keyword:
|
|
user = user.filter(Q(username__icontains=keyword) |
|
|
Q(userprofile__real_name__icontains=keyword) |
|
|
Q(email__icontains=keyword))
|
|
return self.success(self.paginate_data(request, user, UserAdminSerializer))
|
|
|
|
def delete_one(self, user_id):
|
|
try:
|
|
user = User.objects.get(id=user_id)
|
|
except User.DoesNotExist:
|
|
return f"User {user_id} does not exist"
|
|
if Submission.objects.filter(user_id=user_id).exists():
|
|
return f"Can't delete the user {user_id} as he/she has submissions"
|
|
user.delete()
|
|
|
|
@super_admin_required
|
|
def delete(self, request):
|
|
id = request.GET.get("id")
|
|
if not id:
|
|
return self.error("Invalid Parameter, id is required")
|
|
for user_id in id.split(","):
|
|
if user_id:
|
|
error = self.delete_one(user_id)
|
|
if error:
|
|
return self.error(error)
|
|
return self.success()
|
|
|
|
|
|
class GenerateUserAPI(APIView):
|
|
@super_admin_required
|
|
def get(self, request):
|
|
"""
|
|
download users excel
|
|
"""
|
|
file_id = request.GET.get("file_id")
|
|
if not file_id:
|
|
return self.error("Invalid Parameter, file_id is required")
|
|
if not re.match(r"^[a-zA-Z0-9]+$", file_id):
|
|
return self.error("Illegal file_id")
|
|
file_path = f"/tmp/{file_id}.xlsx"
|
|
if not os.path.isfile(file_path):
|
|
return self.error("File does not exist")
|
|
with open(file_path, "rb") as f:
|
|
raw_data = f.read()
|
|
os.remove(file_path)
|
|
response = HttpResponse(raw_data)
|
|
response["Content-Disposition"] = f"attachment; filename=users.xlsx"
|
|
response["Content-Type"] = "application/xlsx"
|
|
return response
|
|
|
|
@validate_serializer(GenerateUserSerializer)
|
|
@super_admin_required
|
|
def post(self, request):
|
|
"""
|
|
Generate User
|
|
"""
|
|
data = request.data
|
|
number_max_length = max(len(str(data["number_from"])), len(str(data["number_to"])))
|
|
if number_max_length + len(data["prefix"]) + len(data["suffix"]) > 32:
|
|
return self.error("Username should not more than 32 characters")
|
|
if data["number_from"] > data["number_to"]:
|
|
return self.error("Start number must be lower than end number")
|
|
|
|
file_id = rand_str(8)
|
|
filename = f"/tmp/{file_id}.xlsx"
|
|
workbook = xlsxwriter.Workbook(filename)
|
|
worksheet = workbook.add_worksheet()
|
|
worksheet.set_column("A:B", 20)
|
|
worksheet.write("A1", "Username")
|
|
worksheet.write("B1", "Password")
|
|
i = 1
|
|
|
|
user_list = []
|
|
for number in range(data["number_from"], data["number_to"] + 1):
|
|
raw_password = rand_str(data["password_length"])
|
|
user = User(username=f"{data['prefix']}{number}{data['suffix']}", password=make_password(raw_password))
|
|
user.raw_password = raw_password
|
|
user_list.append(user)
|
|
|
|
try:
|
|
with transaction.atomic():
|
|
|
|
ret = User.objects.bulk_create(user_list)
|
|
UserProfile.objects.bulk_create([UserProfile(user=user) for user in ret])
|
|
for item in user_list:
|
|
worksheet.write_string(i, 0, item.username)
|
|
worksheet.write_string(i, 1, item.raw_password)
|
|
i += 1
|
|
workbook.close()
|
|
return self.success({"file_id": file_id})
|
|
except IntegrityError as e:
|
|
# Extract detail from exception message
|
|
# duplicate key value violates unique constraint "user_username_key"
|
|
# DETAIL: Key (username)=(root11) already exists.
|
|
return self.error(str(e).split("\n")[1])
|