Skip to content

Commit

Permalink
Merge pull request #325 from kmyk/atcoder/list
Browse files Browse the repository at this point in the history
#318 #317 add features to list contests and problems of AtCoder
  • Loading branch information
fukatani authored Feb 20, 2019
2 parents b25ba8c + b88c649 commit 77d505a
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 2 deletions.
9 changes: 9 additions & 0 deletions onlinejudge/implementation/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Python Version: 3.x
# -*- coding: utf-8 -*-
import contextlib
import datetime
import distutils.version
import http.client
import http.cookiejar
Expand Down Expand Up @@ -247,3 +248,11 @@ def is_update_available_on_pypi() -> bool:
a = distutils.version.StrictVersion(version.__version__)
b = distutils.version.StrictVersion(get_latest_version_from_pypi())
return a < b


def remove_suffix(s: str, suffix: str) -> str:
assert s.endswith(suffix)
return s[:-len(suffix)]


tzinfo_jst = datetime.timezone(datetime.timedelta(hours=+9), 'JST')
178 changes: 176 additions & 2 deletions onlinejudge/service/atcoder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Python Version: 3.x
# -*- coding: utf-8 -*-
import datetime
import itertools
import json
import posixpath
import re
Expand Down Expand Up @@ -98,12 +100,154 @@ def _report_messages(cls, msgs: List[str], unexpected: bool = False) -> bool:
log.failure('unexpected messages found')
return bool(msgs)

def iterate_contests(self, lang: str = 'ja', session: Optional[requests.Session] = None) -> Generator['AtCoderContest', None, None]:
assert lang in ('ja', 'en') # NOTE: "lang=ja" is required to see some Japanese-local contests. However you can use "lang=en" to see the English names of contests.
session = session or utils.new_default_session()
last_page = None
for page in itertools.count(1): # 1-based
if last_page is not None and page > last_page:
break
# get
url = 'https://atcoder.jp/contests/archive?lang={}&page={}'.format(lang, page)
resp = _request('GET', url, session=session)
# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
if last_page is None:
last_page = int(soup.find('ul', class_='pagination').find_all('li')[-1].text)
log.debug('last page: %s', last_page)
tbody = soup.find('tbody')
for tr in tbody.find_all('tr'):
yield AtCoderContest._from_table_row(tr, lang=lang)

def get_user_history_url(self, user_id: str) -> str:
return 'https://atcoder.jp/users/{}/history/json'.format(user_id)


class AtCoderContest(object):
def __init__(self, contest_id: str):
self.contest_id = contest_id

# NOTE: some fields remain undefined, comparing `_from_table_row`
self._start_time_url = None # type: Optional[str]
self._contest_name_ja = None # type: Optional[str]
self._contest_name_en = None # type: Optional[str]
self._duration_text = None # type: Optional[str]
self._rated_range = None # type: Optional[str]

@classmethod
def from_url(cls, url: str) -> Optional['AtCoderContest']:
result = urllib.parse.urlparse(url)

# example: https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d
if result.scheme in ('', 'http', 'https') and result.hostname.endswith('.contest.atcoder.jp'):
contest_id = utils.remove_suffix(result.hostname, '.contest.atcoder.jp')
return cls(contest_id)

# example: https://atcoder.jp/contests/agc030
if result.scheme in ('', 'http', 'https') and result.hostname in ('atcoder.jp', 'beta.atcoder.jp'):
m = re.match(r'^/contests/([\w\-_]+)/?$', utils.normpath(result.path))
if m:
contest_id = m.group(1)
return cls(contest_id)

return None

@classmethod
def _from_table_row(cls, tr: bs4.Tag, lang: str) -> 'AtCoderContest':
tds = tr.find_all('td')
assert len(tds) == 4
anchors = [tds[0].find('a'), tds[1].find('a')]
contest_path = anchors[1]['href']
assert contest_path.startswith('/contests/')
contest_id = contest_path[len('/contests/'):]
self = AtCoderContest(contest_id)
self._start_time_url = anchors[0]['href']
if lang == 'ja':
self._contest_name_ja = anchors[1].text
elif lang == 'en':
self._contest_name_en = anchors[1].text
else:
assert False
self._duration_text = tds[2].text
self._rated_range = tds[3].text
return self

def get_start_time(self) -> datetime.datetime:
if self._start_time_url is None:
raise NotImplementedError
# TODO: we need to use an ISO-format parser
query = urllib.parse.parse_qs(urllib.parse.urlparse(self._start_time_url).query)
assert len(query['iso']) == 1
assert query['p1'] == ['248'] # means JST
return datetime.datetime.strptime(query['iso'][0], '%Y%m%dT%H%M').replace(tzinfo=utils.tzinfo_jst)

def get_contest_name(self, lang: Optional[str] = None) -> str:
if lang is None:
if self._contest_name_en is not None:
return self._contest_name_en
elif self._contest_name_ja is not None:
return self._contest_name_ja
else:
raise NotImplementedError
elif lang == 'en':
if self._contest_name_en is None:
raise NotImplementedError
return self._contest_name_en
elif lang == 'ja':
if self._contest_name_ja is None:
raise NotImplementedError
return self._contest_name_ja
else:
assert False

def get_duration(self) -> datetime.timedelta:
if self._duration_text is None:
raise NotImplementedError
hours, minutes = map(int, self._duration_text.split(':'))
return datetime.timedelta(hours=hours, minutes=minutes)

def get_rated_range(self) -> str:
if self._rated_range is None:
raise NotImplementedError
return self._rated_range

def list_problems(self, session: Optional[requests.Session] = None) -> List['AtCoderProblem']:
# get
session = session or utils.new_default_session()
url = 'https://atcoder.jp/contests/{}/tasks'.format(self.contest_id)
resp = _request('GET', url, session=session)

# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
tbody = soup.find('tbody')
return [AtCoderProblem._from_table_row(tr) for tr in tbody.find_all('tr')]


class AtCoderProblem(onlinejudge.type.Problem):
# AtCoder has problems independently from contests. Therefore the notions "contest_id", "alphabet", and "url" don't belong to problems itself.

def __init__(self, contest_id: str, problem_id: str):
self.contest_id = contest_id
self.problem_id = problem_id
self.problem_id = problem_id # TODO: fix the name, since AtCoder calls this as "task_screen_name"
self._task_id = None # type: Optional[int]
self._task_name = None # type: Optional[str]
self._time_limit_msec = None # type: Optional[int]
self._memory_limit_mb = None # type: Optional[int]
self._alphabet = None # type: Optional[str]

@classmethod
def _from_table_row(cls, tr: bs4.Tag) -> 'AtCoderProblem':
tds = tr.find_all('td')
assert len(tds) == 5
path = tds[1].find('a')['href']
self = cls.from_url('https://atcoder.jp/' + path)
assert self is not None
self._alphabet = tds[0].text
self._task_name = tds[1].text
self._time_limit_msec = int(float(utils.remove_suffix(tds[2].text, ' sec')) * 1000)
self._memory_limit_mb = int(utils.remove_suffix(tds[3].text, ' MB'))
assert tds[4].text.strip() in ('', 'Submit')
return self

def download_sample_cases(self, session: Optional[requests.Session] = None) -> List[onlinejudge.type.TestCase]:
session = session or utils.new_default_session()
Expand Down Expand Up @@ -170,6 +314,9 @@ def get_url(self) -> str:
def get_service(self) -> AtCoderService:
return AtCoderService()

def get_contest(self) -> AtCoderContest:
return AtCoderContest(self.contest_id)

@classmethod
def from_url(cls, s: str) -> Optional['AtCoderProblem']:
# example: http://agc012.contest.atcoder.jp/tasks/agc012_d
Expand Down Expand Up @@ -303,6 +450,33 @@ def _get_task_id(self, session: Optional[requests.Session] = None) -> int:
self._task_id = int(m.group(1))
return self._task_id

def _load_details(self, session: Optional[requests.Session] = None) -> int:
raise NotImplementedError

def get_task_name(self) -> str:
if self._task_name is None:
self._load_details()
assert self._task_name is not None
return self._task_name

def get_time_limit_msec(self) -> int:
if self._time_limit_msec is None:
self._load_details()
assert self._time_limit_msec is not None
return self._time_limit_msec

def get_memory_limit_mb(self) -> int:
if self._memory_limit_mb is None:
self._load_details()
assert self._memory_limit_mb is not None
return self._memory_limit_mb

def get_alphabet(self) -> str:
if self._alphabet is None:
self._load_details()
assert self._alphabet is not None
return self._alphabet


class AtCoderSubmission(onlinejudge.type.Submission):
def __init__(self, contest_id: str, submission_id: int, problem_id: Optional[str] = None):
Expand Down Expand Up @@ -334,7 +508,7 @@ def from_url(cls, s: str, problem_id: Optional[str] = None) -> Optional['AtCoder
# example: https://beta.atcoder.jp/contests/abc073/submissions/1592381
m = re.match(r'^/contests/([\w\-_]+)/submissions/(\d+)$', utils.normpath(result.path))
if result.scheme in ('', 'http', 'https') \
and result.netloc == ('atcoder.jp', 'beta.atcoder.jp') \
and result.netloc in ('atcoder.jp', 'beta.atcoder.jp') \
and m:
contest_id = m.group(1)
try:
Expand Down
68 changes: 68 additions & 0 deletions tests/service_atcoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
import unittest

from onlinejudge.service.atcoder import AtCoderContest, AtCoderProblem, AtCoderService, AtCoderSubmission


class AtCoderSerivceTest(unittest.TestCase):
def test_from_url(self):
service = AtCoderService()
self.assertEqual(AtCoderService.from_url('https://atcoder.jp/'), service)
self.assertEqual(AtCoderService.from_url('https://beta.atcoder.jp/'), service)
self.assertEqual(AtCoderService.from_url('https://abc001.contest.atcoder.jp/'), service)
self.assertEqual(AtCoderService.from_url('https://atcoder.jp/contests/agc001/submissions/806160'), service)
self.assertEqual(AtCoderService.from_url('https://codeforces.com/'), None)

def test_iterate_contests(self):
contests = list(AtCoderService().iterate_contests())
contest_ids = [contest.contest_id for contest in contests]
self.assertIn('arc001', contest_ids)
self.assertIn('abc100', contest_ids)
self.assertIn('kupc2012', contest_ids)
contest, = [contest for contest in contests if contest.contest_id == 'utpc2013']
self.assertEqual(contest.get_start_time().year, 2014)
self.assertEqual(contest.get_start_time().month, 3)
self.assertEqual(contest.get_start_time().day, 2)
self.assertEqual(contest.get_contest_name(), '東京大学プログラミングコンテスト2013')
self.assertEqual(contest.get_duration().total_seconds(), 5 * 60 * 60)
self.assertEqual(contest.get_rated_range(), 'All')


class AtCoderContestTest(unittest.TestCase):
def test_from_url(self):
self.assertEqual(AtCoderContest.from_url('https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d').contest_id, 'kupc2014')
self.assertEqual(AtCoderContest.from_url('https://atcoder.jp/contests/agc030').contest_id, 'agc030')
self.assertEqual(AtCoderContest.from_url('https://atcoder.jp/contests/'), None)

def test_list_problems(self):
contest = AtCoderContest('agc028')
problems = contest.list_problems()
self.assertEqual(len(problems), 7)
self.assertEqual(problems[0].get_alphabet(), 'A')
self.assertEqual(problems[0].get_task_name(), 'Two Abbreviations')
self.assertEqual(problems[0].get_time_limit_msec(), 2000)
self.assertEqual(problems[0].get_memory_limit_mb(), 1024)
self.assertEqual(problems[5].get_alphabet(), 'F')
self.assertEqual(problems[5].problem_id, 'agc028_f')
self.assertEqual(problems[6].get_alphabet(), 'F2')
self.assertEqual(problems[6].problem_id, 'agc028_f2')


class AtCoderProblemTest(unittest.TestCase):
def test_from_url(self):
self.assertEqual(AtCoderProblem.from_url('https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d').contest_id, 'kupc2014')
self.assertEqual(AtCoderProblem.from_url('https://kupc2014.contest.atcoder.jp/tasks/kupc2014_d').problem_id, 'kupc2014_d')
self.assertEqual(AtCoderProblem.from_url('https://atcoder.jp/contests/agc030/tasks/agc030_c').contest_id, 'agc030')
self.assertEqual(AtCoderProblem.from_url('https://atcoder.jp/contests/agc030/tasks/agc030_c').problem_id, 'agc030_c')


class AtCoderSubmissionTest(unittest.TestCase):
def test_from_url(self):
self.assertEqual(AtCoderSubmission.from_url('https://atcoder.jp/contests/kupc2012/submissions/2097011').contest_id, 'kupc2012')
self.assertEqual(AtCoderSubmission.from_url('https://atcoder.jp/contests/kupc2012/submissions/2097011').submission_id, 2097011)
self.assertEqual(AtCoderSubmission.from_url('https://qupc2014.contest.atcoder.jp/submissions/1444440').contest_id, 'qupc2014')
self.assertEqual(AtCoderSubmission.from_url('https://qupc2014.contest.atcoder.jp/submissions/1444440').submission_id, 1444440)


if __name__ == '__main__':
unittest.main()

0 comments on commit 77d505a

Please sign in to comment.