From 202b59ae84d930c3151cf7de2fc03dc0ae7e5f9e Mon Sep 17 00:00:00 2001 From: Kimiyuki Onaka Date: Tue, 19 Feb 2019 03:42:04 +0900 Subject: [PATCH 1/3] #318: add code to list contests --- onlinejudge/implementation/utils.py | 3 + onlinejudge/service/atcoder.py | 95 +++++++++++++++++++++++++++++ tests/service_atcoder.py | 32 ++++++++++ 3 files changed, 130 insertions(+) create mode 100644 tests/service_atcoder.py diff --git a/onlinejudge/implementation/utils.py b/onlinejudge/implementation/utils.py index f02c6c01..7aaf9542 100644 --- a/onlinejudge/implementation/utils.py +++ b/onlinejudge/implementation/utils.py @@ -1,6 +1,7 @@ # Python Version: 3.x # -*- coding: utf-8 -*- import contextlib +import datetime import distutils.version import http.client import http.cookiejar @@ -247,3 +248,5 @@ def is_update_available_on_pypi() -> bool: a = distutils.version.StrictVersion(version.__version__) b = distutils.version.StrictVersion(get_latest_version_from_pypi()) return a < b + +tzinfo_jst = datetime.timezone(datetime.timedelta(hours=+9), 'JST') diff --git a/onlinejudge/service/atcoder.py b/onlinejudge/service/atcoder.py index 81090a4d..1bd24c38 100644 --- a/onlinejudge/service/atcoder.py +++ b/onlinejudge/service/atcoder.py @@ -1,5 +1,7 @@ # Python Version: 3.x # -*- coding: utf-8 -*- +import datetime +import itertools import json import posixpath import re @@ -98,6 +100,99 @@ def _report_messages(cls, msgs: List[str], unexpected: bool = False) -> bool: log.failure('unexpected messages found') return bool(msgs) + def iterate_contests(self, lang: str = 'ja', session: Optional[requests.Session] = None) -> Generator['AtCoderContest', None, None]: + assert lang in ('ja', 'en') # NOTE: "lang=ja" is required to see some Japanese-local contests. However you can use "lang=en" to see the English names of contests. 
+ session = session or utils.new_default_session() + last_page = None + for page in itertools.count(1): # 1-based + if last_page is not None and page > last_page: + break + # get + url = 'https://atcoder.jp/contests/archive?lang={}&page={}'.format(lang, page) + resp = _request('GET', url, session=session) + # parse + soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser) + if last_page is None: + last_page = int(soup.find('ul', class_='pagination').find_all('li')[-1].text) + log.debug('last page: %s', last_page) + tbody = soup.find('tbody') + for tr in tbody.find_all('tr'): + yield AtCoderContest._from_table_row(tr, lang=lang) + + def get_user_history_url(self, user_id: str) -> str: + return 'https://atcoder.jp/users/{}/history/json'.format(user_id) + + +class AtCoderContest(object): + def __init__(self, contest_id: str): + self.contest_id = contest_id + + # NOTE: some fields remain undefined, unlike instances built by `_from_table_row` + self._start_time_url = None # type: Optional[str] + self._contest_name_ja = None # type: Optional[str] + self._contest_name_en = None # type: Optional[str] + self._duration_text = None # type: Optional[str] + self._rated_range = None # type: Optional[str] + + @classmethod + def _from_table_row(cls, tr: bs4.Tag, lang: str) -> 'AtCoderContest': + tds = tr.find_all('td') + assert len(tds) == 4 + anchors = [tds[0].find('a'), tds[1].find('a')] + contest_path = anchors[1]['href'] + assert contest_path.startswith('/contests/') + contest_id = contest_path[len('/contests/'):] + self = AtCoderContest(contest_id) + self._start_time_url = anchors[0]['href'] + if lang == 'ja': + self._contest_name_ja = anchors[1].text + elif lang == 'en': + self._contest_name_en = anchors[1].text + else: + assert False + self._duration_text = tds[2].text + self._rated_range = tds[3].text + return self + + def get_start_time(self) -> datetime.datetime: + if self._start_time_url is None: + raise NotImplementedError + # TODO: we need to use an ISO-format 
parser + query = urllib.parse.parse_qs(urllib.parse.urlparse(self._start_time_url).query) + assert len(query['iso']) == 1 + assert query['p1'] == ['248'] # means JST + return datetime.datetime.strptime(query['iso'][0], '%Y%m%dT%H%M').replace(tzinfo=utils.tzinfo_jst) + + def get_contest_name(self, lang: Optional[str] = None) -> str: + if lang is None: + if self._contest_name_en is not None: + return self._contest_name_en + elif self._contest_name_ja is not None: + return self._contest_name_ja + else: + raise NotImplementedError + elif lang == 'en': + if self._contest_name_en is None: + raise NotImplementedError + return self._contest_name_en + elif lang == 'ja': + if self._contest_name_ja is None: + raise NotImplementedError + return self._contest_name_ja + else: + assert False + + def get_duration(self) -> datetime.timedelta: + if self._duration_text is None: + raise NotImplementedError + hours, minutes = map(int, self._duration_text.split(':')) + return datetime.timedelta(hours=hours, minutes=minutes) + + def get_rated_range(self) -> str: + if self._rated_range is None: + raise NotImplementedError + return self._rated_range + class AtCoderProblem(onlinejudge.type.Problem): def __init__(self, contest_id: str, problem_id: str): diff --git a/tests/service_atcoder.py b/tests/service_atcoder.py new file mode 100644 index 00000000..1dfdb377 --- /dev/null +++ b/tests/service_atcoder.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +import unittest + +from onlinejudge.service.atcoder import AtCoderService + + +class AtCoderSerivceTest(unittest.TestCase): + def test_from_url(self): + service = AtCoderService() + self.assertEqual(AtCoderService.from_url('https://atcoder.jp/'), service) + self.assertEqual(AtCoderService.from_url('https://beta.atcoder.jp/'), service) + self.assertEqual(AtCoderService.from_url('https://abc001.contest.atcoder.jp/'), service) + self.assertEqual(AtCoderService.from_url('https://atcoder.jp/contests/agc001/submissions/806160'), service) + 
self.assertEqual(AtCoderService.from_url('https://codeforces.com/'), None) + + def test_iterate_contests(self): + contests = list(AtCoderService().iterate_contests()) + contest_ids = [contest.contest_id for contest in contests] + self.assertIn('arc001', contest_ids) + self.assertIn('abc100', contest_ids) + self.assertIn('kupc2012', contest_ids) + contest, = [contest for contest in contests if contest.contest_id == 'utpc2013'] + self.assertEqual(contest.get_start_time().year, 2014) + self.assertEqual(contest.get_start_time().month, 3) + self.assertEqual(contest.get_start_time().day, 2) + self.assertEqual(contest.get_contest_name(), '東京大学プログラミングコンテスト2013') + self.assertEqual(contest.get_duration().total_seconds(), 5 * 60 * 60) + self.assertEqual(contest.get_rated_range(), 'All') + + +if __name__ == '__main__': + unittest.main() From 32e56889978314980ba553ebbb7f4020751a616a Mon Sep 17 00:00:00 2001 From: Kimiyuki Onaka Date: Tue, 19 Feb 2019 04:22:03 +0900 Subject: [PATCH 2/3] #317: add an internal feature to list the problems in contests --- onlinejudge/implementation/utils.py | 6 +++ onlinejudge/service/atcoder.py | 81 ++++++++++++++++++++++++++++- tests/service_atcoder.py | 22 +++++++- 3 files changed, 107 insertions(+), 2 deletions(-) diff --git a/onlinejudge/implementation/utils.py b/onlinejudge/implementation/utils.py index 7aaf9542..282ecc96 100644 --- a/onlinejudge/implementation/utils.py +++ b/onlinejudge/implementation/utils.py @@ -249,4 +249,10 @@ def is_update_available_on_pypi() -> bool: b = distutils.version.StrictVersion(get_latest_version_from_pypi()) return a < b + +def remove_suffix(s: str, suffix: str) -> str: + assert s.endswith(suffix) + return s[:-len(suffix)] + + tzinfo_jst = datetime.timezone(datetime.timedelta(hours=+9), 'JST') diff --git a/onlinejudge/service/atcoder.py b/onlinejudge/service/atcoder.py index 1bd24c38..f734c755 100644 --- a/onlinejudge/service/atcoder.py +++ b/onlinejudge/service/atcoder.py @@ -134,6 +134,24 @@ def 
__init__(self, contest_id: str): self._duration_text = None # type: Optional[str] self._rated_range = None # type: Optional[str] + @classmethod + def from_url(cls, url: str) -> Optional['AtCoderContest']: + result = urllib.parse.urlparse(url) + + # example: https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d + if result.scheme in ('', 'http', 'https') and result.hostname.endswith('.contest.atcoder.jp'): + contest_id = utils.remove_suffix(result.hostname, '.contest.atcoder.jp') + return cls(contest_id) + + # example: https://atcoder.jp/contests/agc030 + if result.scheme in ('', 'http', 'https') and result.hostname in ('atcoder.jp', 'beta.atcoder.jp'): + m = re.match(r'^/contests/([\w\-_]+)/?$', utils.normpath(result.path)) + if m: + contest_id = m.group(1) + return cls(contest_id) + + return None + @classmethod def _from_table_row(cls, tr: bs4.Tag, lang: str) -> 'AtCoderContest': tds = tr.find_all('td') @@ -193,12 +211,43 @@ def get_rated_range(self) -> str: raise NotImplementedError return self._rated_range + def list_problems(self, session: Optional[requests.Session] = None) -> List['AtCoderProblem']: + # get + session = session or utils.new_default_session() + url = 'https://atcoder.jp/contests/{}/tasks'.format(self.contest_id) + resp = _request('GET', url, session=session) + + # parse + soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser) + tbody = soup.find('tbody') + return [AtCoderProblem._from_table_row(tr) for tr in tbody.find_all('tr')] + class AtCoderProblem(onlinejudge.type.Problem): + # In AtCoder, problems exist independently of contests. Therefore the notions "contest_id", "alphabet", and "url" don't belong to the problems themselves. 
+ def __init__(self, contest_id: str, problem_id: str): self.contest_id = contest_id - self.problem_id = problem_id + self.problem_id = problem_id # TODO: fix the name, since AtCoder calls this "task_screen_name" self._task_id = None # type: Optional[int] + self._task_name = None # type: Optional[str] + self._time_limit_msec = None # type: Optional[int] + self._memory_limit_mb = None # type: Optional[int] + self._alphabet = None # type: Optional[str] + + @classmethod + def _from_table_row(cls, tr: bs4.Tag) -> 'AtCoderProblem': + tds = tr.find_all('td') + assert len(tds) == 5 + path = tds[1].find('a')['href'] + self = cls.from_url('https://atcoder.jp/' + path) + assert self is not None + self._alphabet = tds[0].text + self._task_name = tds[1].text + self._time_limit_msec = int(float(utils.remove_suffix(tds[2].text, ' sec')) * 1000) + self._memory_limit_mb = int(utils.remove_suffix(tds[3].text, ' MB')) + assert tds[4].text.strip() in ('', 'Submit') + return self def download_sample_cases(self, session: Optional[requests.Session] = None) -> List[onlinejudge.type.TestCase]: session = session or utils.new_default_session() @@ -265,6 +314,9 @@ def get_url(self) -> str: def get_service(self) -> AtCoderService: return AtCoderService() + def get_contest(self) -> AtCoderContest: + return AtCoderContest(self.contest_id) + @classmethod def from_url(cls, s: str) -> Optional['AtCoderProblem']: # example: http://agc012.contest.atcoder.jp/tasks/agc012_d @@ -398,6 +450,33 @@ def _get_task_id(self, session: Optional[requests.Session] = None) -> int: self._task_id = int(m.group(1)) return self._task_id + def _load_details(self, session: Optional[requests.Session] = None) -> int: + raise NotImplementedError + + def get_task_name(self) -> str: + if self._task_name is None: + self._load_details() + assert self._task_name is not None + return self._task_name + + def get_time_limit_msec(self) -> int: + if self._time_limit_msec is None: + self._load_details() + assert 
self._time_limit_msec is not None + return self._time_limit_msec + + def get_memory_limit_mb(self) -> int: + if self._memory_limit_mb is None: + self._load_details() + assert self._memory_limit_mb is not None + return self._memory_limit_mb + + def get_alphabet(self) -> str: + if self._alphabet is None: + self._load_details() + assert self._alphabet is not None + return self._alphabet + class AtCoderSubmission(onlinejudge.type.Submission): def __init__(self, contest_id: str, submission_id: int, problem_id: Optional[str] = None): diff --git a/tests/service_atcoder.py b/tests/service_atcoder.py index 1dfdb377..cb660616 100644 --- a/tests/service_atcoder.py +++ b/tests/service_atcoder.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import unittest -from onlinejudge.service.atcoder import AtCoderService +from onlinejudge.service.atcoder import AtCoderContest, AtCoderProblem, AtCoderService, AtCoderSubmission class AtCoderSerivceTest(unittest.TestCase): @@ -28,5 +28,25 @@ def test_iterate_contests(self): self.assertEqual(contest.get_rated_range(), 'All') +class AtCoderContestTest(unittest.TestCase): + def test_from_url(self): + self.assertEqual(AtCoderContest.from_url('https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d').contest_id, 'kupc2014') + self.assertEqual(AtCoderContest.from_url('https://atcoder.jp/contests/agc030').contest_id, 'agc030') + self.assertEqual(AtCoderContest.from_url('https://atcoder.jp/contests/'), None) + + def test_list_problems(self): + contest = AtCoderContest('agc028') + problems = contest.list_problems() + self.assertEqual(len(problems), 7) + self.assertEqual(problems[0].get_alphabet(), 'A') + self.assertEqual(problems[0].get_task_name(), 'Two Abbreviations') + self.assertEqual(problems[0].get_time_limit_msec(), 2000) + self.assertEqual(problems[0].get_memory_limit_mb(), 1024) + self.assertEqual(problems[5].get_alphabet(), 'F') + self.assertEqual(problems[5].problem_id, 'agc028_f') + self.assertEqual(problems[6].get_alphabet(), 'F2') + 
self.assertEqual(problems[6].problem_id, 'agc028_f2') + + if __name__ == '__main__': unittest.main() From b88c649471aeaca4387269faa9333841a09382fd Mon Sep 17 00:00:00 2001 From: Kimiyuki Onaka Date: Tue, 19 Feb 2019 04:31:22 +0900 Subject: [PATCH 3/3] #318: add tests and fix a bug --- onlinejudge/service/atcoder.py | 2 +- tests/service_atcoder.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/onlinejudge/service/atcoder.py b/onlinejudge/service/atcoder.py index f734c755..ac42555a 100644 --- a/onlinejudge/service/atcoder.py +++ b/onlinejudge/service/atcoder.py @@ -508,7 +508,7 @@ def from_url(cls, s: str, problem_id: Optional[str] = None) -> Optional['AtCoder # example: https://beta.atcoder.jp/contests/abc073/submissions/1592381 m = re.match(r'^/contests/([\w\-_]+)/submissions/(\d+)$', utils.normpath(result.path)) if result.scheme in ('', 'http', 'https') \ - and result.netloc == ('atcoder.jp', 'beta.atcoder.jp') \ + and result.netloc in ('atcoder.jp', 'beta.atcoder.jp') \ and m: contest_id = m.group(1) try: diff --git a/tests/service_atcoder.py b/tests/service_atcoder.py index cb660616..6ca5d530 100644 --- a/tests/service_atcoder.py +++ b/tests/service_atcoder.py @@ -48,5 +48,21 @@ def test_list_problems(self): self.assertEqual(problems[6].problem_id, 'agc028_f2') +class AtCoderProblemTest(unittest.TestCase): + def test_from_url(self): + self.assertEqual(AtCoderProblem.from_url('https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d').contest_id, 'kupc2014') + self.assertEqual(AtCoderProblem.from_url('https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d').problem_id, 'kupc2014_d') + self.assertEqual(AtCoderProblem.from_url('https://atcoder.jp/contests/agc030/tasks/agc030_c').contest_id, 'agc030') + self.assertEqual(AtCoderProblem.from_url('https://atcoder.jp/contests/agc030/tasks/agc030_c').problem_id, 'agc030_c') + + +class AtCoderSubmissionTest(unittest.TestCase): + def test_from_url(self): + 
self.assertEqual(AtCoderSubmission.from_url('https://atcoder.jp/contests/kupc2012/submissions/2097011').contest_id, 'kupc2012') + self.assertEqual(AtCoderSubmission.from_url('https://atcoder.jp/contests/kupc2012/submissions/2097011').submission_id, 2097011) + self.assertEqual(AtCoderSubmission.from_url('https://qupc2014.contest.atcoder.jp/submissions/1444440').contest_id, 'qupc2014') + self.assertEqual(AtCoderSubmission.from_url('https://qupc2014.contest.atcoder.jp/submissions/1444440').submission_id, 1444440) + + if __name__ == '__main__': unittest.main()