mirror of
https://github.com/QingdaoU/OpenVJ.git
synced 2024-12-28 15:31:48 +00:00
Merge branch 'master' into server
This commit is contained in:
commit
2ba22054e1
@ -1,23 +1,27 @@
|
||||
# coding=utf-8
|
||||
import re
|
||||
import json
|
||||
from .robot import Robot
|
||||
from .exceptions import AuthFailed, RequestFailed, RegexError, SubmitProblemFailed
|
||||
from .exceptions import AuthFailed, RegexError, SubmitProblemFailed
|
||||
from .utils import Language, Result
|
||||
|
||||
|
||||
class CodeForcesRobot(Robot):
|
||||
def __init__(self, cookies=None):
|
||||
def __init__(self, cookies=None, token=""):
|
||||
super().__init__(cookies=cookies)
|
||||
self.token = ""
|
||||
self.token = token
|
||||
|
||||
def save(self):
|
||||
return {"cookies": self.cookies, "token": self.token}
|
||||
|
||||
def check_url(self, url):
|
||||
return re.compile(r"^http://codeforces.com/problemset/problem/\d+/[A-Z]$").match(url) is not None
|
||||
|
||||
def _get_token(self):
|
||||
r = self.get("http://codeforces.com/enter", headers={"Referer": "http://codeforces.com/enter"})
|
||||
r = self.get("http://codeforces.com/problemset", headers={"Referer": "http://codeforces.com/problemset"}, cookies=self.cookies)
|
||||
self.check_status_code(r)
|
||||
self.token = re.compile(r'<meta name="X-Csrf-Token" content="([\s\S]*?)"/>').findall(r.text)[0]
|
||||
self.cookies = dict(r.cookies)
|
||||
self.cookies.update(dict(r.cookies))
|
||||
|
||||
def login(self, username, password):
|
||||
self._get_token()
|
||||
@ -27,7 +31,8 @@ class CodeForcesRobot(Robot):
|
||||
cookies=self.cookies)
|
||||
if r.status_code != 302:
|
||||
raise AuthFailed("Failed to login CodeForces")
|
||||
self.cookies = dict(r.cookies)
|
||||
self.cookies.update(dict(r.cookies))
|
||||
self._get_token()
|
||||
|
||||
@property
|
||||
def is_logged_in(self):
|
||||
@ -37,23 +42,101 @@ class CodeForcesRobot(Robot):
|
||||
def get_problem(self, url):
|
||||
r = self.get(url, headers={"Referer": "https://codeforces.com"})
|
||||
regex = {"title": r'<div class="header"><div class="title">([\s\S]*?)</div>',
|
||||
"time_limit": r'<div class="time-limit"><div class="property-title">time limit per test</div>(\d+)\s*seconds</div>',
|
||||
"time_limit": r'<div class="time-limit"><div class="property-title">time limit per test</div>(\d+)\s*second',
|
||||
"memory_limit": r'<div class="memory-limit"><div class="property-title">memory limit per test</div>(\d+)\s*megabytes</div>',
|
||||
"description": r'<div class="output-file"><div class="property-title">output</div>[\s\S]*?</div></div><div>([\s\S]*?)</div>',
|
||||
"input_description": r'<div class="section-title">Input</div>([\s\S]*?)</div>',
|
||||
"output_description": r'<div class="section-title">Output</div>([\s\s]*?)</div>'}
|
||||
"hint": r'<div class="note"><div class="section-title">Note</div>([\s\S]*?)</div>',
|
||||
"output_description": r'<div class="section-title">Output</div>([\s\S]*?)</div>'}
|
||||
input_samples_regex = r'<div class="title">Input</div><pre>([\s\S]*?)</pre></div>'
|
||||
output_samples_regex = r'<div class="title">Output</div><pre>([\s\S]*?)</pre></div>'
|
||||
data = {}
|
||||
for k, v in regex.items():
|
||||
items = re.compile(v).findall(r.text)
|
||||
if not items:
|
||||
if k == "hint":
|
||||
data[k] = None
|
||||
continue
|
||||
raise RegexError("No such data")
|
||||
data[k] = self._clean_html(items[0])
|
||||
|
||||
data["samples"] = []
|
||||
data["memory_limit"] = int(data["memory_limit"])
|
||||
data["time_limit"] = int(data["time_limit"]) * 1000
|
||||
input_samples = re.compile(input_samples_regex).findall(r.text)
|
||||
output_samples = re.compile(output_samples_regex).findall(r.text)
|
||||
for i in range(len(input_samples)):
|
||||
data["samples"].append({"input": self._clean_html(input_samples[i]), "output": self._clean_html(output_samples[i])})
|
||||
return data
|
||||
return data
|
||||
|
||||
def submit(self, submit_url, language, code, origin_id):
|
||||
if language == Language.C:
|
||||
language = "43"
|
||||
elif language == Language.CPP:
|
||||
language = "42"
|
||||
else:
|
||||
language = "36"
|
||||
|
||||
r = self._request("post", url=submit_url + "?csrf_token=" + self.token,
|
||||
files={"sourcefile": (" ", ""),
|
||||
"csrf_token": ('', self.token),
|
||||
"action": ('', "submitsolutionformsubmitted"),
|
||||
"submittedProblemCode": ('', origin_id),
|
||||
"programTypeId": ('', language),
|
||||
"source": ('', code),
|
||||
"tabSize ": ('', "4")},
|
||||
cookies=self.cookies,
|
||||
headers={"Referer": submit_url})
|
||||
|
||||
if re.compile(r"You have submitted exactly the same code before").search(r.text) is not None:
|
||||
raise SubmitProblemFailed("You have submitted exactly the same code before")
|
||||
|
||||
if re.compile(r"Source should satisfy regex").search(r.text) is not None:
|
||||
raise SubmitProblemFailed("Source should satisfy regex [^{}]*public\s+(final)?\s*class\s+(\w+).*")
|
||||
|
||||
if re.compile(r"\<title\>Codeforces\</title\>").search(r.text) is not None:
|
||||
raise SubmitProblemFailed("Failed to submit problem, url: %s code %d" % (submit_url, r.status_code))
|
||||
|
||||
def get_result(self, submission_id, username):
|
||||
status_url = "http://codeforces.com/api/user.status?handle=" + username + "&from=1&count=1"
|
||||
s = json.loads(self.get(status_url).text)
|
||||
data = {}
|
||||
data["cpu_time"] = s["result"][0]["timeConsumedMillis"]
|
||||
data["memory"] = s["result"][0]["memoryConsumedBytes"] // 1024
|
||||
# 没有开始判题之前,没有这个字段
|
||||
if "verdict" not in s["result"][0]:
|
||||
data["result"] = "TESTING"
|
||||
else:
|
||||
data["result"] = s["result"][0]["verdict"]
|
||||
|
||||
if data["result"] == "OK":
|
||||
data["result"] = Result.accepted
|
||||
elif data["result"] == "WRONG_ANSWER":
|
||||
data["result"] = Result.wrong_answer
|
||||
elif data["result"] == "TIME_LIMIT_EXCEEDED":
|
||||
data["result"] = Result.time_limit_exceeded
|
||||
elif data["result"] == "COMPILATION_ERROR":
|
||||
data["result"] = Result.compile_error
|
||||
elif data["result"] == "RUNTIME_ERROR":
|
||||
data["result"] = Result.runtime_error
|
||||
elif data["result"] == "MEMORY_LIMIT_EXCEEDED":
|
||||
data["result"] = Result.memory_limit_exceeded
|
||||
elif data["result"] == "TESTING":
|
||||
data["result"] = Result.waiting
|
||||
else:
|
||||
data["result"] = Result.runtime_error
|
||||
|
||||
data["info"] = {"result_text": "", "error": None}
|
||||
if data["result"] == Result.compile_error:
|
||||
e = self.post("http://codeforces.com/data/judgeProtocol",
|
||||
headers={"Referer": "http://codeforces.com/submissions/" + username,
|
||||
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
|
||||
"X-Csrf-Token": self.token,
|
||||
"X-Requested-With": "XMLHttpRequest"},
|
||||
cookies=self.cookies,
|
||||
data={"submissionId": s["result"][0]["id"],
|
||||
"csrf_token": self.token})
|
||||
self.check_status_code(e)
|
||||
data["info"] = {"result_text": "Compilation error", "error": json.loads(e.text)}
|
||||
return data
|
||||
|
||||
|
@ -95,7 +95,7 @@ class HduojRobot(Robot):
|
||||
"Referer": submit_url})
|
||||
|
||||
if r.status_code != 302:
|
||||
raise SubmitProblemFailed("Faild to submit problem, url: %s, status code %d" % (submit_url, r.status_code))
|
||||
raise SubmitProblemFailed("Failed to submit problem, url: %s, status code %d" % (submit_url, r.status_code))
|
||||
|
||||
def get_result(self, submission_id, username):
|
||||
status_url = r"http://acm.hdu.edu.cn/status.php?&user=" + username
|
||||
@ -151,5 +151,5 @@ class HduojRobot(Robot):
|
||||
error = self._clean_html(str(re.compile("<pre>([\s\S]*)</pre>").findall(r.text)))
|
||||
|
||||
return {"result": result, "cpu_time": cpu_time, "memory": memory,
|
||||
"info": {"result_text": self._clean_html(data[0][1])}, "error": error}
|
||||
"info": {"result_text": self._clean_html(data[0][1]), "error": error}}
|
||||
|
||||
|
@ -9,6 +9,9 @@ class Robot(object):
|
||||
def __init__(self, cookies=None):
|
||||
self.cookies = cookies if cookies is not None else {}
|
||||
|
||||
def save(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def check_url(self, url):
|
||||
"""
|
||||
检查一个url是否是本oj的合法的url
|
||||
|
@ -1 +1,36 @@
|
||||
# coding=utf-8
|
||||
import unittest
|
||||
from robots.zoj import ZOJRobot
|
||||
from robots.pat import PATRobot
|
||||
from robots.utils import Language
|
||||
class ZOJRobotTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.tclass = ZOJRobot()
|
||||
self.tclass.login("ltwy", "jiangxuelei")
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def test_check_url(self):
|
||||
self.assertEqual(self.tclass.check_url(r"http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode=12234"), False)
|
||||
self.assertEqual(self.tclass.check_url(r"http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode=12"), False)
|
||||
self.assertEqual(self.tclass.check_url(r"http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode=1234"), True)
|
||||
self.assertEqual(self.tclass.check_url(r"http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode=1223"), True)
|
||||
|
||||
def test_login(self):
|
||||
self.assertEquals(self.tclass.login("ltwy", "jiangxuelei"), None)
|
||||
|
||||
def test_is_logged_in(self):
|
||||
self.assertEquals(self.tclass.is_logged_in, True)
|
||||
|
||||
|
||||
def test_get_problem(self):
|
||||
self.tclass.get_problem(r"http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode=3807")
|
||||
|
||||
def test_submit(self):
|
||||
print(self.tclass.submit(r"http://acm.zju.edu.cn/onlinejudge/submit.do?problemId=1", Language.CPP, r'#include<stdio.h> int main(){ int a,b; scanf("%d %d",&a,&b); printf("%d\n",a+b);return 0;}', int("1001") - 1000))
|
||||
|
||||
def test_get_result(self):
|
||||
self.tclass.get_result("4162525", "ltwy")
|
||||
|
||||
|
126
robots/zoj.py
Normal file
126
robots/zoj.py
Normal file
@ -0,0 +1,126 @@
|
||||
# coding=utf-8
|
||||
import re
|
||||
import html
|
||||
from .robot import Robot
|
||||
from .exceptions import AuthFailed,RequestFailed, RegexError, SubmitProblemFailed
|
||||
from .utils import Language, Result
|
||||
|
||||
|
||||
class ZOJRobot(Robot):
|
||||
def save(self):
|
||||
return {"cookies": self.cookies}
|
||||
|
||||
def check_url(self, url):
|
||||
regex = r"^http://acm.zju.edu.cn/onlinejudge/showProblem.do\?problemCode=(\d{4})$"
|
||||
return re.compile(regex).match(url) is not None
|
||||
|
||||
def login(self, username, password):
|
||||
url = r"http://acm.zju.edu.cn/onlinejudge/login.do"
|
||||
data = {
|
||||
"handle": username,
|
||||
"password": password,
|
||||
"rememberMe": "on"
|
||||
}
|
||||
headers = {
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"Referer": "http://acm.zju.edu.cn/onlinejudge/login.do"
|
||||
}
|
||||
r = self.post(url, data, headers)
|
||||
|
||||
if r.status_code != 302:
|
||||
raise AuthFailed("Failed to login ZOJ!")
|
||||
self.cookies = dict(r.cookies)
|
||||
|
||||
@property
|
||||
def is_logged_in(self):
|
||||
r = self.get("http://acm.zju.edu.cn/onlinejudge/editProfile.do", cookies = self.cookies)
|
||||
return r'<td align="right">Confirm Password</td>' in r.text
|
||||
|
||||
def get_problem(self, url):
|
||||
if not self.check_url(url):
|
||||
raise RequestFailed("Invalid ZOJ URL!")
|
||||
regex = {
|
||||
"title": r"<center><span\s*class=\"bigProblemTitle\">(.*)</span></center>",
|
||||
"time_Limit": r"<font\s*color=\"green\">Time\s*Limit:\s*</font>\s*(\d+)\s*(?:Seconds|Second)",
|
||||
"memory_limit": r"<font\s*color=\"green\">Memory\s*Limit:\s*</font>\s*(\d+)\s*KB",
|
||||
"description": r"</center>\s*<hr>[\s\S]*?<p>\s*([\s\S]*?)\s*(?:<b>|<h4>)Input",
|
||||
"input_description": r"(?:<h4>|<b>|<strong>)Input(?:</h4>|</b>|</strong>)\s*([\s\S]*?)\s*(?:<h4>|<b>|<strong>)Output(?:</h4>|</b>|</strong>)",
|
||||
"output_description": r"(?:<h4>|<b>|<strong>)Output(?:</h4>|</b>|</strong>)\s*([\s\S]*?)\s*(?:<h4>|<b>|<strong>)Sample Input",
|
||||
"samples": r"(?:<h4>|<b>|<strong>)Sample\sInput(?:</h4>|</b>|</strong>)\s*<pre>\s*([\s\S]*?)</pre>\s*(?:<h4>|<strong>|<b>)Sample Output(?:</b>|</strong>|</h4>)\s*<pre>([\s\S]*?)</pre>",
|
||||
"hint": r'(?:<h4>|<b>|<strong>)Hint(?:</h4>|</b>|</strong>)\s*<p>[\s\S]*<hr>'
|
||||
}
|
||||
data = self._regex_page(url, regex)
|
||||
data["id"] = re.compile(r"^http://acm.zju.edu.cn/onlinejudge/showProblem.do\?problemCode=(\d{4})$").findall(url)[0]
|
||||
data["memory_limit"] = int(data["memory_limit"]) // 1024
|
||||
data["time_limit"] = int(data["time_limit"])
|
||||
return data
|
||||
|
||||
def _regex_page(self, url, regex):
|
||||
r = self.get(url)
|
||||
self.check_status_code(r)
|
||||
data = {}
|
||||
for k, v in regex.items():
|
||||
items = re.compile(v).findall(r.text)
|
||||
if not items:
|
||||
raise RegexError("NO such data!")
|
||||
if k != "samples":
|
||||
data[k] = self._clean_html(items[0])
|
||||
else:
|
||||
data[k] = {items[0][0]: items[0][1]}
|
||||
return data
|
||||
|
||||
def submit(self, submit_url, language, code, origin_id):
|
||||
if language == Language.C:
|
||||
compiler_id = "1"
|
||||
elif language == Language.CPP:
|
||||
compiler_id = "2"
|
||||
else:
|
||||
compiler_id = "4"
|
||||
r = self.post(submit_url,
|
||||
data={"problemId": str(int(origin_id) - 1000), "languageId": compiler_id, "source": code},
|
||||
cookies=self.cookies,
|
||||
headers={"Referer": "http://acm.zju.edu.cn/",
|
||||
"Content-Type": "application/x-www-form-urlencoded"})
|
||||
if r.status_code != 200:
|
||||
raise SubmitProblemFailed("Failed to submit problem, url: %s, status code: %d" % (submit_url, r.status_code))
|
||||
return re.compile(r"<p>Your source has been submitted. The submission id is <font color='red'>(\d+)</font>").findall(r.text)[0]
|
||||
|
||||
def get_result(self, submission_id, username):
|
||||
url = r"http://acm.zju.edu.cn/onlinejudge/showRuns.do?contestId=1&search=true&firstId=-1&lastId=-1&problemCode=&handle=&idStart=" + \
|
||||
submission_id + r"&idEnd=" + submission_id
|
||||
r = self.get(url, headers={r"Referer": r"http://acm.zju.edu.cn/"}, cookies=self.cookies)
|
||||
res = {"info": {"error": None}, "cpu_time": None, "memory": None}
|
||||
|
||||
compile_list = re.compile(r'<a href="/onlinejudge/showJudgeComment\.do\?submissionId=(\d+)">Compilation Error</a>').findall(r.text)
|
||||
if compile_list:
|
||||
res["result"] = Result.compile_error
|
||||
compile_id = compile_list[0]
|
||||
res["info"]["error"] = self.get(r"http://acm.zju.edu.cn/onlinejudge/showJudgeComment.do?submissionId=" + compile_id,
|
||||
headers={r"Referer": r"http://acm.zju.edu.cn/"}, cookies=self.cookies).text
|
||||
elif r"No submission available." in r.text:
|
||||
res["result"] = Result.waiting
|
||||
else:
|
||||
regex = {
|
||||
"result": r'<span class="judgeReply(?:AC|Other)">\s*([\s\S]*?)\s*</span></td>',
|
||||
"cpu_time": r'<td class="runTime">(\d+)</td>',
|
||||
"memory": r'<td class="runMemory">(\d+)</td>',
|
||||
}
|
||||
result_str = re.compile(regex["result"]).findall(r.text)[0]
|
||||
|
||||
result_mapping = {
|
||||
"Accepted": Result.accepted,
|
||||
"Time Limit Exceeded": Result.time_limit_exceeded,
|
||||
"Memory Limit Exceeded": Result.memory_limit_exceeded,
|
||||
"Compilation Error": Result.compile_error,
|
||||
"Presentation Error": Result.format_error,
|
||||
"Wrong Answer": Result.wrong_answer,
|
||||
"Segmentation Fault": Result.runtime_error,
|
||||
"Non-zero Exit Code": Result.runtime_error,
|
||||
"Floating Point Error": Result.runtime_error,
|
||||
"Output Limit Exceeded": Result.runtime_error,
|
||||
}
|
||||
|
||||
res["result"] = result_mapping[result_str]
|
||||
res["cpu_time"] = int(re.compile(regex["cpu_time"]).findall(r.text)[0])
|
||||
res["memory"] = int(re.compile(regex["memory"]).findall(r.text)[0])
|
||||
return res
|
Loading…
Reference in New Issue
Block a user