diff --git a/onlinejudge/service/atcoder.py b/onlinejudge/service/atcoder.py index ccbfeb34..dd46427d 100644 --- a/onlinejudge/service/atcoder.py +++ b/onlinejudge/service/atcoder.py @@ -295,6 +295,55 @@ def list_problems(self, session: Optional[requests.Session] = None) -> List['AtC tbody = soup.find('tbody') return [AtCoderProblem._from_table_row(tr) for tr in tbody.find_all('tr')] + def iterate_submissions_with(self, me: bool = False, problem_id: Optional[str] = None, language_id: Optional[LanguageId] = None, status: Optional[str] = None, user_glob: Optional[str] = None, order: Optional[str] = None, desc: bool = False, lang: Optional[str] = None, session: Optional[requests.Session] = None) -> Generator['AtCoderSubmission', None, None]: + """ + :note: If you use certain combination of options, then the results may not correct when there are new submissions while crawling. + """ + assert status in (None, 'AC', 'WA', 'TLE', 'MLE', 'RE', 'CE', 'QLE', 'OLE', 'IE', 'WJ', 'WR', 'Judging') + assert order in (None, 'created', 'score', 'source_length', 'time_consumption', 'memory_consumption') + if desc: + assert order is not None + + base_url = 'https://atcoder.jp/contests/{}/submissions'.format(self.contest_id) + if me: + base_url += '/me' + params = {} + if problem_id is not None: + params['f.Task'] = problem_id + if language_id is not None: + params['f.Language'] = language_id + if language_id is not None: + params['f.Status'] = status + if language_id is not None: + params['f.User'] = user_glob + if order is not None: + params['orderBy'] = order + if desc: + params['desc'] = 'true' + + # get + session = session or utils.get_default_session() + for page in itertools.count(1): + params_page = ({'page': str(page)} if page >= 2 else {}) + url = base_url + '?' + urllib.parse.urlencode({**params, **params_page}) + resp = _request('GET', url, session=session) + + # parse + soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser) + tbodies = soup.find_all('tbody') + if len(tbodies) == 0: + break # No Submissions + assert len(tbodies) == 1 + tbody = tbodies[0] + for tr in tbody.find_all('tr'): + yield AtCoderSubmission._from_table_row(tr, contest_id=self.contest_id) + + def iterate_submissions(self, session: Optional[requests.Session] = None) -> Generator['AtCoderSubmission', None, None]: + """ + :note: in implementation, use "ORDER BY created DESC" to list all submissions even when there are new submissions + """ + yield from self.iterate_submissions_with(order='created', desc=False, session=session) + class AtCoderProblem(onlinejudge.type.Problem): """ @@ -549,6 +598,12 @@ def get_score(self, session: Optional[requests.Session] = None) -> Optional[int] get_memory_limit_byte = utils.getter_with_load_details('_memory_limit_byte', type=int) # type: Callable[..., int] get_alphabet = utils.getter_with_load_details('_alphabet', type=str) # type: Callable[..., str] + def iterate_submissions(self, session: Optional[requests.Session] = None) -> Generator['AtCoderSubmission', None, None]: + """ + :note: in implementation, use "ORDER BY created DESC" to list all submissions even when there are new submissions + """ + yield from self.get_contest().iterate_submissions_with(problem_id=self.problem_id, order='created', desc=False, session=session) + class AtCoderSubmission(onlinejudge.type.Submission): """ @@ -573,6 +628,24 @@ def __init__(self, contest_id: str, submission_id: int, problem_id: Optional[str self._test_sets = None # type: Optional[List[AtCoderSubmissionTestSet]] self._test_cases = None # type: Optional[List[AtCoderSubmissionTestCaseResult]] + @classmethod + def _from_table_row(cls, tr: bs4.Tag, contest_id: str) -> Optional['AtCoderSubmission']: + tds = tr.find_all('td') + assert len(tds) in (8, 10) + self = cls.from_url('https://atcoder.jp' + tds[-1].find('a')['href']) + + self._submission_time = datetime.datetime.strptime(tds[0].text, '%Y-%m-%d %H:%M:%S+0900').replace(tzinfo=utils.tzinfo_jst) + self.problem_id = AtCoderProblem.from_url('https://atcoder.jp' + tds[1].find('a')['href']).problem_id + self._user_id = tds[2].find_all('a')[0]['href'].split('/')[-1] + self._language_name = tds[3].text + self._score = int(tds[4].text) + self._code_size = int(utils.remove_suffix(tds[5].text, ' Byte')) + self._status = tds[6].text + if len(tds) == 10: + self._exec_time_msec = int(utils.remove_suffix(tds[7].text, ' ms')) + self._memory_byte = int(utils.remove_suffix(tds[8].text, ' KB')) * 1000 + return self + @classmethod def from_url(cls, s: str, problem_id: Optional[str] = None) -> Optional['AtCoderSubmission']: submission_id = None # type: Optional[int] diff --git a/tests/service_atcoder.py b/tests/service_atcoder.py index 829d9e45..8c4dcaf5 100644 --- a/tests/service_atcoder.py +++ b/tests/service_atcoder.py @@ -69,6 +69,15 @@ def test_list_problems(self): self.assertEqual(problems[6].get_alphabet(), 'F2') self.assertEqual(problems[6].problem_id, 'agc028_f2') + def test_iterate_submissions(self): + contest = AtCoderContest.from_url('https://atcoder.jp/contests/code-festival-2014-exhibition-open') + submissions = list(contest.iterate_submissions()) + self.assertGreater(len(submissions), 300) + self.assertEqual(submissions[0].get_code_size(), 276) + self.assertEqual(submissions[0].get_status(), 'WA') + self.assertEqual(submissions[1].get_user_id(), 'snuke') + self.assertEqual(submissions[1].get_status(), 'WA') + class AtCoderProblemTest(unittest.TestCase): def test_from_url(self): @@ -93,6 +102,14 @@ def test_get_score(self): self.assertEqual(AtCoderProblem.from_url('https://atcoder.jp/contests/future-contest-2018-final/tasks/future_contest_2018_final_a').get_score(), 50000000) self.assertEqual(AtCoderProblem.from_url('https://atcoder.jp/contests/abc001/tasks/abc001_4').get_score(), None) + def test_iterate_submissions(self): + problem = AtCoderProblem.from_url('https://atcoder.jp/contests/abc119/tasks/abc119_c') + submissions = problem.iterate_submissions() + self.assertEqual(next(submissions).get_score(), 300) + self.assertEqual(next(submissions).get_code_size(), 1208) + self.assertEqual(next(submissions).get_exec_time_msec(), 2) + self.assertEqual(next(submissions).get_memory_byte(), 256 * 1000) + class AtCoderSubmissionTest(unittest.TestCase): def test_from_url(self):