Skip to content

Commit

Permalink
#318: add the feature to iterate submissions of AtCoder
Browse files Browse the repository at this point in the history
  • Loading branch information
kmyk committed Mar 1, 2019
1 parent f1a0184 commit c87aee5
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
73 changes: 73 additions & 0 deletions onlinejudge/service/atcoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,55 @@ def list_problems(self, session: Optional[requests.Session] = None) -> List['AtC
tbody = soup.find('tbody')
return [AtCoderProblem._from_table_row(tr) for tr in tbody.find_all('tr')]

def iterate_submissions_with(self, me: bool = False, problem_id: Optional[str] = None, language_id: Optional[LanguageId] = None, status: Optional[str] = None, user_glob: Optional[str] = None, order: Optional[str] = None, desc: bool = False, lang: Optional[str] = None, session: Optional[requests.Session] = None) -> Generator['AtCoderSubmission', None, None]:
"""
:note: If you use certain combination of options, then the results may not correct when there are new submissions while crawling.
"""
assert status in (None, 'AC', 'WA', 'TLE', 'MLE', 'RE', 'CE', 'QLE', 'OLE', 'IE', 'WJ', 'WR', 'Judging')
assert order in (None, 'created', 'score', 'source_length', 'time_consumption', 'memory_consumption')
if desc:
assert order is not None

base_url = 'https://atcoder.jp/contests/{}/submissions'.format(self.contest_id)
if me:
base_url += '/me'
params = {}
if problem_id is not None:
params['f.Task'] = problem_id
if language_id is not None:
params['f.Language'] = language_id
if language_id is not None:
params['f.Status'] = status
if language_id is not None:
params['f.User'] = user_glob
if order is not None:
params['orderBy'] = order
if desc:
params['desc'] = 'true'

# get
session = session or utils.get_default_session()
for page in itertools.count(1):
params_page = ({'page': str(page)} if page >= 2 else {})
url = base_url + '?' + urllib.parse.urlencode({**params, **params_page})
resp = _request('GET', url, session=session)

# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
tbodies = soup.find_all('tbody')
if len(tbodies) == 0:
break # No Submissions
assert len(tbodies) == 1
tbody = tbodies[0]
for tr in tbody.find_all('tr'):
yield AtCoderSubmission._from_table_row(tr, contest_id=self.contest_id)

def iterate_submissions(self, session: Optional[requests.Session] = None) -> Generator['AtCoderSubmission', None, None]:
"""
:note: in implementation, use "ORDER BY created DESC" to list all submissions even when there are new submissions
"""
yield from self.iterate_submissions_with(order='created', desc=False, session=session)


class AtCoderProblem(onlinejudge.type.Problem):
"""
Expand Down Expand Up @@ -549,6 +598,12 @@ def get_score(self, session: Optional[requests.Session] = None) -> Optional[int]
get_memory_limit_byte = utils.getter_with_load_details('_memory_limit_byte', type=int) # type: Callable[..., int]
get_alphabet = utils.getter_with_load_details('_alphabet', type=str) # type: Callable[..., str]

def iterate_submissions(self, session: Optional[requests.Session] = None) -> Generator['AtCoderSubmission', None, None]:
"""
:note: in implementation, use "ORDER BY created DESC" to list all submissions even when there are new submissions
"""
yield from self.get_contest().iterate_submissions_with(problem_id=self.problem_id, order='created', desc=False, session=session)


class AtCoderSubmission(onlinejudge.type.Submission):
"""
Expand All @@ -573,6 +628,24 @@ def __init__(self, contest_id: str, submission_id: int, problem_id: Optional[str
self._test_sets = None # type: Optional[List[AtCoderSubmissionTestSet]]
self._test_cases = None # type: Optional[List[AtCoderSubmissionTestCaseResult]]

@classmethod
def _from_table_row(cls, tr: bs4.Tag, contest_id: str) -> Optional['AtCoderSubmission']:
tds = tr.find_all('td')
assert len(tds) in (8, 10)
self = cls.from_url('https://atcoder.jp' + tds[-1].find('a')['href'])

self._submission_time = datetime.datetime.strptime(tds[0].text, '%Y-%m-%d %H:%M:%S+0900').replace(tzinfo=utils.tzinfo_jst)
self.problem_id = AtCoderProblem.from_url('https://atcoder.jp' + tds[1].find('a')['href']).problem_id
self._user_id = tds[2].find_all('a')[0]['href'].split('/')[-1]
self._language_name = tds[3].text
self._score = int(tds[4].text)
self._code_size = int(utils.remove_suffix(tds[5].text, ' Byte'))
self._status = tds[6].text
if len(tds) == 10:
self._exec_time_msec = int(utils.remove_suffix(tds[7].text, ' ms'))
self._memory_byte = int(utils.remove_suffix(tds[8].text, ' KB')) * 1000
return self

@classmethod
def from_url(cls, s: str, problem_id: Optional[str] = None) -> Optional['AtCoderSubmission']:
submission_id = None # type: Optional[int]
Expand Down
17 changes: 17 additions & 0 deletions tests/service_atcoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ def test_list_problems(self):
self.assertEqual(problems[6].get_alphabet(), 'F2')
self.assertEqual(problems[6].problem_id, 'agc028_f2')

def test_iterate_submissions(self):
contest = AtCoderContest.from_url('https://atcoder.jp/contests/code-festival-2014-exhibition-open')
submissions = list(contest.iterate_submissions())
self.assertGreater(len(submissions), 300)
self.assertEqual(submissions[0].get_code_size(), 276)
self.assertEqual(submissions[0].get_status(), 'WA')
self.assertEqual(submissions[1].get_user_id(), 'snuke')
self.assertEqual(submissions[1].get_status(), 'WA')


class AtCoderProblemTest(unittest.TestCase):
def test_from_url(self):
Expand All @@ -93,6 +102,14 @@ def test_get_score(self):
self.assertEqual(AtCoderProblem.from_url('https://atcoder.jp/contests/future-contest-2018-final/tasks/future_contest_2018_final_a').get_score(), 50000000)
self.assertEqual(AtCoderProblem.from_url('https://atcoder.jp/contests/abc001/tasks/abc001_4').get_score(), None)

def test_iterate_submissions(self):
problem = AtCoderProblem.from_url('https://atcoder.jp/contests/abc119/tasks/abc119_c')
submissions = problem.iterate_submissions()
self.assertEqual(next(submissions).get_score(), 300)
self.assertEqual(next(submissions).get_code_size(), 1208)
self.assertEqual(next(submissions).get_exec_time_msec(), 2)
self.assertEqual(next(submissions).get_memory_byte(), 256 * 1000)


class AtCoderSubmissionTest(unittest.TestCase):
def test_from_url(self):
Expand Down

0 comments on commit c87aee5

Please sign in to comment.