diff --git a/account/tasks.py b/account/tasks.py index 12c1587d..51359991 100644 --- a/account/tasks.py +++ b/account/tasks.py @@ -1,14 +1,13 @@ import logging - -from celery import shared_task +import dramatiq from options.options import SysOptions -from utils.shortcuts import send_email +from utils.shortcuts import send_email, DRAMATIQ_WORKER_ARGS logger = logging.getLogger(__name__) -@shared_task +@dramatiq.actor(**DRAMATIQ_WORKER_ARGS(max_retries=3)) def send_email_async(from_name, to_email, to_name, subject, content): if not SysOptions.smtp_config: return diff --git a/account/views/oj.py b/account/views/oj.py index 095f1e40..2d26e484 100644 --- a/account/views/oj.py +++ b/account/views/oj.py @@ -302,11 +302,11 @@ class ApplyResetPasswordAPI(APIView): "link": f"{SysOptions.website_base_url}/reset-password/{user.reset_password_token}" } email_html = render_to_string("reset_password_email.html", render_data) - send_email_async.delay(from_name=SysOptions.website_name_shortcut, - to_email=user.email, - to_name=user.username, - subject=f"Reset your password", - content=email_html) + send_email_async.send(from_name=SysOptions.website_name_shortcut, + to_email=user.email, + to_name=user.username, + subject=f"Reset your password", + content=email_html) return self.success("Succeeded") diff --git a/contest/views/admin.py b/contest/views/admin.py index 9adb45ad..66addb15 100644 --- a/contest/views/admin.py +++ b/contest/views/admin.py @@ -234,7 +234,7 @@ class DownloadContestSubmissions(APIView): exclude_admin = request.GET.get("exclude_admin") == "1" zip_path = self._dump_submissions(contest, exclude_admin) - delete_files.apply_async((zip_path,), countdown=300) + delete_files.send_with_options(args=(zip_path,), delay=300_000) resp = FileResponse(open(zip_path, "rb")) resp["Content-Type"] = "application/zip" resp["Content-Disposition"] = f"attachment;filename={os.path.basename(zip_path)}" diff --git a/deploy/requirements.txt b/deploy/requirements.txt index 4feaa4d5..8d12c69d 100644 --- a/deploy/requirements.txt +++ b/deploy/requirements.txt @@ -1,6 +1,3 @@ -amqp==2.4.2 -billiard==3.5.0.5 -celery==4.2.1 certifi==2019.3.9 chardet==3.0.4 coverage==4.5.3 @@ -15,7 +12,6 @@ flake8-quotes==1.0.0 gunicorn==19.9.0 idna==2.8 jsonfield==2.0.2 -kombu==4.4.0 mccabe==0.6.1 otpauth==1.0.1 Pillow==5.4.1 @@ -30,5 +26,6 @@ redis==3.2.0 requests==2.21.0 six==1.12.0 urllib3==1.24.1 -vine==1.2.0 XlsxWriter==1.1.5 +django-dramatiq==0.5.0 +dramatiq==1.3.0 \ No newline at end of file diff --git a/deploy/supervisord.conf b/deploy/supervisord.conf index 0eca7211..b1740466 100644 --- a/deploy/supervisord.conf +++ b/deploy/supervisord.conf @@ -38,12 +38,12 @@ startsecs=5 stopwaitsecs = 5 killasgroup=true -[program:celery] -command=celery -A oj worker -l warning --autoscale 2,%(ENV_MAX_WORKER_NUM)s +[program:dramatiq] +command=python3 manage.py rundramatiq --no-reload --processes %(ENV_MAX_WORKER_NUM)s --threads 4 directory=/app/ user=nobody -stdout_logfile=/data/log/celery.log -stderr_logfile=/data/log/celery.log +stdout_logfile=/data/log/dramatiq.log +stderr_logfile=/data/log/dramatiq.log autostart=true autorestart=true startsecs=5 diff --git a/judge/dispatcher.py b/judge/dispatcher.py index 6f3a9434..5466df85 100644 --- a/judge/dispatcher.py +++ b/judge/dispatcher.py @@ -26,7 +26,7 @@ def process_pending_task(): # 防止循环引入 from judge.tasks import judge_task data = json.loads(cache.rpop(CacheKey.waiting_queue).decode("utf-8")) - judge_task.delay(**data) + judge_task.send(**data) class DispatcherBase(object): diff --git a/judge/tasks.py b/judge/tasks.py index a5c18182..8a1794a1 100644 --- a/judge/tasks.py +++ b/judge/tasks.py @@ -1,12 +1,12 @@ -from __future__ import absolute_import, unicode_literals -from celery import shared_task +import dramatiq from account.models import User from submission.models import Submission from judge.dispatcher import JudgeDispatcher +from utils.shortcuts import DRAMATIQ_WORKER_ARGS -@shared_task +@dramatiq.actor(**DRAMATIQ_WORKER_ARGS()) def judge_task(submission_id, problem_id): uid = Submission.objects.get(id=submission_id).user_id if User.objects.get(id=uid).is_disabled: diff --git a/oj/__init__.py b/oj/__init__.py index 23fc183a..e69de29b 100644 --- a/oj/__init__.py +++ b/oj/__init__.py @@ -1,6 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -# Django starts so that shared_task will use this app. -from .celery import app as celery_app - -__all__ = ["celery_app"] diff --git a/oj/celery.py b/oj/celery.py deleted file mode 100644 index 4f24c7e7..00000000 --- a/oj/celery.py +++ /dev/null @@ -1,18 +0,0 @@ -from __future__ import absolute_import, unicode_literals -import os -from celery import Celery -from django.conf import settings - -# set the default Django settings module for the "celery" program. -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "oj.settings") - - -app = Celery("oj") - -# Using a string here means the worker will not have to -# pickle the object when using Windows. -app.config_from_object("django.conf:settings") - -# load task modules from all registered Django app configs. -app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) -# app.autodiscover_tasks() diff --git a/oj/settings.py b/oj/settings.py index 2cfa9c8d..37a85855 100644 --- a/oj/settings.py +++ b/oj/settings.py @@ -33,7 +33,8 @@ VENDOR_APPS = ( 'django.contrib.messages', 'django.contrib.staticfiles', 'rest_framework', - 'raven.contrib.django.raven_compat' + 'raven.contrib.django.raven_compat', + 'django_dramatiq', ) LOCAL_APPS = ( 'account', @@ -164,6 +165,11 @@ LOGGING = { 'level': 'ERROR', 'propagate': True, }, + 'dramatiq': { + 'handlers': LOGGING_HANDLERS, + 'level': 'DEBUG', + 'propagate': False, + }, '': { 'handlers': LOGGING_HANDLERS, 'level': 'WARNING', @@ -202,11 +208,32 @@ CACHES = { SESSION_ENGINE = "django.contrib.sessions.backends.cache" SESSION_CACHE_ALIAS = "default" -CELERY_RESULT_BACKEND = f"{REDIS_URL}/2" -BROKER_URL = f"{REDIS_URL}/3" -CELERY_TASK_SOFT_TIME_LIMIT = CELERY_TASK_TIME_LIMIT = 180 -CELERY_ACCEPT_CONTENT = ["json"] -CELERY_TASK_SERIALIZER = "json" +DRAMATIQ_BROKER = { + "BROKER": "dramatiq.brokers.redis.RedisBroker", + "OPTIONS": { + "url": f"{REDIS_URL}/4", + }, + "MIDDLEWARE": [ + # "dramatiq.middleware.Prometheus", + "dramatiq.middleware.AgeLimit", + "dramatiq.middleware.TimeLimit", + "dramatiq.middleware.Callbacks", + "dramatiq.middleware.Retries", + # "django_dramatiq.middleware.AdminMiddleware", + "django_dramatiq.middleware.DbConnectionsMiddleware" + ] +} + +DRAMATIQ_RESULT_BACKEND = { + "BACKEND": "dramatiq.results.backends.redis.RedisBackend", + "BACKEND_OPTIONS": { + "url": f"{REDIS_URL}/4", + }, + "MIDDLEWARE_OPTIONS": { + "result_ttl": None + } +} + RAVEN_CONFIG = { 'dsn': 'https://b200023b8aed4d708fb593c5e0a6ad3d:1fddaba168f84fcf97e0d549faaeaff0@sentry.io/263057' } diff --git a/problem/views/admin.py b/problem/views/admin.py index e06ff619..5a5268aa 100644 --- a/problem/views/admin.py +++ b/problem/views/admin.py @@ -300,8 +300,6 @@ class ProblemAPI(ProblemBase): except Problem.DoesNotExist: return self.error("Problem does not exists") ensure_created_by(problem, request.user) - if Submission.objects.filter(problem=problem).exists(): - return self.error("Can't delete the problem as it has submissions") d = os.path.join(settings.TEST_CASE_DIR, problem.test_case_id) if os.path.isdir(d): shutil.rmtree(d, ignore_errors=True) @@ -541,7 +539,7 @@ class ExportProblemAPI(APIView): with zipfile.ZipFile(path, "w") as zip_file: for index, problem in enumerate(problems): self.process_one_problem(zip_file=zip_file, user=request.user, problem=problem, index=index + 1) - delete_files.apply_async((path,), countdown=300) + delete_files.send_with_options(args=(path,), delay=300_000) resp = FileResponse(open(path, "rb")) resp["Content-Type"] = "application/zip" resp["Content-Disposition"] = f"attachment;filename=problem-export.zip" diff --git a/submission/views/admin.py b/submission/views/admin.py index d2312c04..87972561 100644 --- a/submission/views/admin.py +++ b/submission/views/admin.py @@ -18,5 +18,5 @@ class SubmissionRejudgeAPI(APIView): submission.statistic_info = {} submission.save() - judge_task.delay(submission.id, submission.problem.id) + judge_task.send(submission.id, submission.problem.id) return self.success() diff --git a/submission/views/oj.py b/submission/views/oj.py index 8501efb4..0f7c4a58 100644 --- a/submission/views/oj.py +++ b/submission/views/oj.py @@ -80,7 +80,7 @@ class SubmissionAPI(APIView): contest_id=data.get("contest_id")) # use this for debug # JudgeDispatcher(submission.id, problem.id).judge() - judge_task.delay(submission.id, problem.id) + judge_task.send(submission.id, problem.id) if hide_id: return self.success() else: diff --git a/utils/api/api.py b/utils/api/api.py index 3f3f24ca..a603bfb9 100644 --- a/utils/api/api.py +++ b/utils/api/api.py @@ -1,7 +1,6 @@ import functools import json import logging -from collections import OrderedDict from django.http import HttpResponse, QueryDict from django.utils.decorators import method_decorator @@ -98,6 +97,8 @@ class APIView(View): elif isinstance(errors, list): return self.extract_errors(errors[0], key) + return key, errors + def invalid_serializer(self, serializer): key, error = self.extract_errors(serializer.errors) if key == "non_field_errors": diff --git a/utils/shortcuts.py b/utils/shortcuts.py index ea0d094f..1569ce31 100644 --- a/utils/shortcuts.py +++ b/utils/shortcuts.py @@ -81,3 +81,7 @@ def send_email(smtp_config, from_name, to_email, to_name, subject, content): def get_env(name, default=""): return os.environ.get(name, default) + + +def DRAMATIQ_WORKER_ARGS(time_limit=3600_000, max_retries=0, max_age=7200_000): + return {"max_retries": max_retries, "time_limit": time_limit, "max_age": max_age} diff --git a/utils/tasks.py b/utils/tasks.py index 442b0bc4..26a21805 100644 --- a/utils/tasks.py +++ b/utils/tasks.py @@ -1,8 +1,10 @@ import os -from celery import shared_task +import dramatiq + +from utils.shortcuts import DRAMATIQ_WORKER_ARGS -@shared_task +@dramatiq.actor(**DRAMATIQ_WORKER_ARGS()) def delete_files(*args): for item in args: try: