Skip to content

Commit

Permalink
#318: add a feature to get detailed information of submissions
Browse files Browse the repository at this point in the history
  • Loading branch information
kmyk committed Feb 18, 2019
1 parent 79a26ff commit 7c3c547
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 49 deletions.
11 changes: 11 additions & 0 deletions onlinejudge/implementation/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,14 @@ def remove_suffix(s: str, suffix: str) -> str:


tzinfo_jst = datetime.timezone(datetime.timedelta(hours=+9), 'JST')


def getter_with_load_details(name: str) -> Callable:
def wrapper(self, session: Optional[requests.Session] = None):
if getattr(self, name) is None:
getattr(self, '_load_details')(session)
attr = getattr(self, name)
assert attr is not None
return attr

return wrapper
158 changes: 109 additions & 49 deletions onlinejudge/service/atcoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def _from_table_row(cls, tr: bs4.Tag) -> 'AtCoderProblem':
tds = tr.find_all('td')
assert len(tds) == 5
path = tds[1].find('a')['href']
self = cls.from_url('https://atcoder.jp/' + path)
self = cls.from_url('https://atcoder.jp' + path)
assert self is not None
self._alphabet = tds[0].text
self._task_name = tds[1].text
Expand Down Expand Up @@ -453,36 +453,28 @@ def _get_task_id(self, session: Optional[requests.Session] = None) -> int:
def _load_details(self, session: Optional[requests.Session] = None) -> int:
raise NotImplementedError

def get_task_name(self) -> str:
if self._task_name is None:
self._load_details()
assert self._task_name is not None
return self._task_name

def get_time_limit_msec(self) -> int:
if self._time_limit_msec is None:
self._load_details()
assert self._time_limit_msec is not None
return self._time_limit_msec

def get_memory_limit_mb(self) -> int:
if self._memory_limit_mb is None:
self._load_details()
assert self._memory_limit_mb is not None
return self._memory_limit_mb

def get_alphabet(self) -> str:
if self._alphabet is None:
self._load_details()
assert self._alphabet is not None
return self._alphabet
get_task_name = utils.getter_with_load_details('_task_name') # type: Callable[..., str]
get_time_limit_msec = utils.getter_with_load_details('_time_limit_msec') # type: Callable[..., int]
get_memory_limit_mb = utils.getter_with_load_details('_memory_limit_mb') # type: Callable[..., int]
get_alphabet = utils.getter_with_load_details('_alphabet') # type: Callable[..., str]


class AtCoderSubmission(onlinejudge.type.Submission):
def __init__(self, contest_id: str, submission_id: int, problem_id: Optional[str] = None):
self.contest_id = contest_id
self.submission_id = submission_id
self.problem_id = problem_id
self._problem_id = problem_id
self._source_code = None # type: Optional[bytes]
self._submission_time = None # type: Optional[datetime.datetime]
self._user_id = None # type: Optional[str]
self._language_name = None # type: Optional[str]
self._score = None # type: Optional[int]
self._code_size = None # type: Optional[int]
self._status = None # type: Optional[str]
self._exec_time_ms = None # type: Optional[int]
self._memory_kb = None # type: Optional[int]
self._compile_error = None # type: Optional[str]
self._test_cases = None # type: Optional[List[AtCoderSubmissionTestCaseResult]]

@classmethod
def from_url(cls, s: str, problem_id: Optional[str] = None) -> Optional['AtCoderSubmission']:
Expand Down Expand Up @@ -520,37 +512,105 @@ def from_url(cls, s: str, problem_id: Optional[str] = None) -> Optional['AtCoder

return None

def get_url(self) -> str:
return 'http://{}.contest.atcoder.jp/submissions/{}'.format(self.contest_id, self.submission_id)

def get_problem(self) -> AtCoderProblem:
if self.problem_id is None:
raise ValueError
return AtCoderProblem(self.contest_id, self.problem_id)
def get_url(self, type: str = 'latest', lang: Optional[str] = None) -> str:
if type == 'latest' or type == 'beta':
url = 'https://atcoder.jp/contests/{}/submissions/{}'.format(self.contest_id, self.submission_id)
elif type == 'old':
url = 'https://{}.contest.atcoder.jp/submissions/{}'.format(self.contest_id, self.submission_id)
else:
assert False
if lang is not None:
url += '?lang={}'.format(lang)
return url

def get_service(self) -> AtCoderService:
return AtCoderService()

def download(self, session: Optional[requests.Session] = None) -> str:
return self.get_source_code(session=session).decode()

def _load_details(self, session: Optional[requests.Session] = None) -> None:
session = session or utils.new_default_session()
# get
resp = _request('GET', self.get_url(), session=session)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
if AtCoderService._report_messages(msgs, unexpected=True):
raise RuntimeError
# parse
resp = _request('GET', self.get_url(type='beta', lang='en'), session=session)
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
code = None
for pre in soup.find_all('pre'):
log.debug('pre tag: %s', str(pre))
prv = utils.previous_sibling_tag(pre)
if not (prv and prv.name == 'h3' and 'Source code' in prv.text):
continue
code = pre.string
if code is None:
log.error('source code not found')
raise RuntimeError
return code

# Submission #N
id_, = soup.find_all('span', class_='h2')
assert id_.text == 'Submission #{}'.format(self.submission_id)

# Source Code
source_code = soup.find(id='submission-code')
self._source_code = source_code.text.encode()

submission_info, test_cases_summary, test_cases_data = soup.find_all('table')

# Submission Info
data = {} # type: Dict[str, str]
for tr in submission_info.find_all('tr'):
key = tr.find('th').text.strip()
value = tr.find('td').text.strip()
data[key] = value

if key == 'Task':
problem = AtCoderProblem.from_url('https://atcoder.jp' + tr.find('a')['href'])
assert problem is not None
self._problem_id = problem.problem_id

self._submission_time = datetime.datetime.strptime(data['Submission Time'], '%Y-%m-%d %H:%M:%S+0900').replace(tzinfo=utils.tzinfo_jst)
self._user_id = data['User']
self._language_name = data['Language']
self._score = int(data['Score'])
self._code_size = int(utils.remove_suffix(data['Code Size'], ' Byte'))
self._status = data['Status']
self._exec_time_ms = int(utils.remove_suffix(data['Exec Time'], ' ms'))
self._memory_kb = int(utils.remove_suffix(data['Memory'], ' KB'))

# Compile Error
compile_error = soup.find('h4', text='Compile Error')
if compile_error is None:
self.compile_error = ''
else:
compile_error = compile_error.find_next_sibling('pre')
self.compile_error = compile_error.text

# Test Cases
trs = test_cases_data.find('tbody').find_all('tr')
self._test_cases = [AtCoderSubmissionTestCaseResult._from_table_row(tr) for tr in trs]

def get_problem(self, session: Optional[requests.Session] = None) -> AtCoderProblem:
if self._problem_id is None:
self._load_details(session=session)
assert self._problem_id is not None
return AtCoderProblem(self.contest_id, self._problem_id)

get_source_code = utils.getter_with_load_details('_source_code') # type: Callable[..., bytes]
get_compile_error = utils.getter_with_load_details('_compiler_error') # type: Callable[..., str]
get_user_id = utils.getter_with_load_details('_user_id') # type: Callable[..., str]
get_submission_time = utils.getter_with_load_details('_submission_time') # type: Callable[..., datetime.datetime]
get_language_name = utils.getter_with_load_details('_language_name') # type: Callable[..., str]
get_score = utils.getter_with_load_details('_score') # type: Callable[..., int]
get_code_size = utils.getter_with_load_details('_code_size') # type: Callable[..., int]
get_status = utils.getter_with_load_details('_status') # type: Callable[..., str]
get_exec_time_ms = utils.getter_with_load_details('_exec_time_ms') # type: Callable[..., int]
get_memory_kb = utils.getter_with_load_details('_memory_kb') # type: Callable[..., int]
get_test_cases = utils.getter_with_load_details('_test_cases') # type: Callable[..., List[AtCoderSubmissionTestCaseResult]]


class AtCoderSubmissionTestCaseResult(object):
def __init__(self, case_name: str, status: str, exec_time_ms: int, memory_kb: int):
self.case_name = case_name
self.status = status
self.exec_time_ms = exec_time_ms
self.memory_kb = memory_kb

@classmethod
def _from_table_row(cls, tr: bs4.Tag) -> 'AtCoderSubmissionTestCaseResult':
tds = tr.find_all('td')
case_name = tds[0].text
status = tds[1].text
exec_time_ms = int(utils.remove_suffix(tds[2].text, ' ms'))
memory_kb = int(utils.remove_suffix(tds[3].text, ' KB'))
return AtCoderSubmissionTestCaseResult(case_name, status, exec_time_ms, memory_kb)


onlinejudge.dispatch.services += [AtCoderService]
Expand Down
26 changes: 26 additions & 0 deletions tests/service_atcoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,32 @@ def test_from_url(self):
self.assertEqual(AtCoderSubmission.from_url('https://qupc2014.contest.atcoder.jp/submissions/1444440').contest_id, 'qupc2014')
self.assertEqual(AtCoderSubmission.from_url('https://qupc2014.contest.atcoder.jp/submissions/1444440').submission_id, 1444440)

def test_submission_info(self):
submission = AtCoderSubmission.from_url('https://atcoder.jp/contests/agc030/submissions/3904911')
self.assertEqual(submission.get_submission_time().year, 2018)
self.assertEqual(submission.get_submission_time().month, 12)
self.assertEqual(submission.get_submission_time().day, 31)
self.assertEqual(submission.get_user_id(), 'kimiyuki')
self.assertEqual(submission.get_problem().problem_id, 'agc030_b')
self.assertEqual(submission.get_language_name(), 'C++14 (GCC 5.4.1)')
self.assertEqual(submission.get_score(), 800)
self.assertEqual(submission.get_code_size(), 1457)
self.assertEqual(submission.get_exec_time_ms(), 85)
self.assertEqual(submission.get_memory_kb(), 3328)

def test_get_source_code(self):
submission = AtCoderSubmission.from_url('https://atcoder.jp/contests/abc100/submissions/3082514')
self.assertEqual(submission.get_source_code(), b'/9\\|\\B/c:(\ncYay!')
self.assertEqual(submission.get_code_size(), 16)

submission = AtCoderSubmission.from_url('https://atcoder.jp/contests/abc100/submissions/4069980')
self.assertEqual(submission.get_source_code(), b'/9\\|\\B/c:(\r\ncYay!')
self.assertEqual(submission.get_code_size(), 17)

submission = AtCoderSubmission.from_url('https://atcoder.jp/contests/abc100/submissions/4317534')
self.assertEqual(submission.get_source_code(), b'/9\\|\\B/c:(\r\ncYay!\r\n')
self.assertEqual(submission.get_code_size(), 19)


if __name__ == '__main__':
unittest.main()

0 comments on commit 7c3c547

Please sign in to comment.