完善队列功能

This commit is contained in:
virusdefender 2016-04-08 20:13:50 +08:00
parent 4dda5730af
commit 896e49d053
3 changed files with 25 additions and 13 deletions

View File

@ -121,7 +121,8 @@ STATIC_URL = '/static/'
CELERY_ROUTES = {
'server.tasks.submit_dispatcher': {'queue': 'local'},
'server.tasks.release_robot_user': {'queue': 'local'},
'server.tasks.submit_waiting_submission': {'queue': 'local'},
'server.tasks.update_submission': {'queue': 'local'},
'server.tasks.get_problem': {'queue': 'robot'}
'server.tasks.get_problem': {'queue': 'robot'},
'server.tasks.submit': {'queue': 'robot'},
}

View File

@ -42,6 +42,7 @@ admin.site.register(Problem, ProblemAdmin)
class SubmissionAdmin(admin.ModelAdmin):
list_display = ["api_key", "problem", "language", "result", "create_time", "status"]
ordering = ["create_time"]
admin.site.register(Submission, SubmissionAdmin)

View File

@ -5,7 +5,7 @@ import json
from openvj import celery_app
from robots.utils import Result
from .models import RobotUserStatus, SubmissionStatus
from .models import RobotUserStatus, SubmissionStatus, SubmissionWaitingQueue
# remote robot task
@ -17,7 +17,6 @@ def get_problem(robot, url):
# remote robot task
@celery_app.task
def submit(robot, robot_user, submit_url, origin_id, language, code):
print("submit")
try:
origin_submission_id = robot.submit(submit_url, language, code, origin_id)
except Exception as e:
@ -28,7 +27,6 @@ def submit(robot, robot_user, submit_url, origin_id, language, code):
time.sleep(2)
retries = 20
while retries:
print("retry", retries)
try:
result = robot.get_result(origin_submission_id, robot_user.username)
except Exception as e:
@ -43,11 +41,24 @@ def submit(robot, robot_user, submit_url, origin_id, language, code):
"origin_submission_id": origin_submission_id}
def release_robot_user(robot_user):
robot_user.status = RobotUserStatus.free
robot_user.save(update_fields=["status"])
# local task
@celery_app.task
def release_robot_user(submit_result, robot_user):
robot_user.status = RobotUserStatus.free
robot_user.save()
def submit_waiting_submission(submit_result, problem, robot, robot_user):
waiting_queue = SubmissionWaitingQueue.objects.all().order_by("create_time")
if waiting_queue.exists():
queue_head = waiting_queue.first()
submission = queue_head.submission
task_id = submit_dispatcher.delay(problem, submission, robot_user, robot).id
submission.task_id = task_id
submission.save(update_fields=["task_id"])
queue_head.delete()
else:
release_robot_user(robot_user)
# local task
@ -59,18 +70,17 @@ def update_submission(submit_result, submission):
submission.memory = submit_result["memory"]
submission.info = json.dumps(submit_result["info"])
submission.status = SubmissionStatus.done
print("updated")
submission.save(update_fields=["origin_submission_id", "result", "cpu_time", "memory", "info", "status"])
# local task
@celery_app.task
def submit_dispatcher(problem, submission, robot_user, robot):
task_id = submit.apply_async((robot, robot_user, problem.submit_url, problem.origin_id,
submission.language, submission.code),
task_id = submit.apply_async(args=(robot, robot_user, problem.submit_url, problem.origin_id,
submission.language, submission.code),
# link相当于执行成功后的回调函数
link=[release_robot_user.s(robot_user)])
link=[submit_waiting_submission.s(problem, robot, robot_user),
update_submission.s(submission)]).id
submission.robot_user = robot_user
submission.submit_task_id = task_id
print(task_id)
submission.save(update_fields=["submit_task_id", "robot_user"])