Skip to content

Commit

Permalink
Merge pull request #358 from kmyk/issue/166
Browse files Browse the repository at this point in the history
#166 use the new version of AtCoder
  • Loading branch information
fukatani authored Mar 8, 2019
2 parents f57d196 + 2d38f8f commit 6a26353
Showing 1 changed file with 80 additions and 110 deletions.
190 changes: 80 additions & 110 deletions onlinejudge/service/atcoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@
from onlinejudge.type import *


def _list_alert(resp: requests.Response, soup: Optional[bs4.BeautifulSoup] = None, print_: bool = False) -> List[str]:
if soup is None:
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
msgs = [] # type: List[str]
for alert in soup.find_all('div', attrs={'role': 'alert'}):
msg = ' '.join([s.strip() for s in alert.strings if s.strip()])
if print_:
log.warning('AtCoder says: %s', msg)
msgs += [msg]
return msgs


def _request(*args, **kwargs):
"""
This is a workaround. AtCoder's servers sometime fail to send "Content-Type" field.
Expand All @@ -38,6 +50,7 @@ def _request(*args, **kwargs):
resp = utils.request(*args, **kwargs)
log.debug('AtCoder\'s server said "Content-Type: %s"', resp.headers.get('Content-Type', '(not sent)'))
resp.encoding = 'UTF-8'
_list_alert(resp, print_=True)
return resp


Expand All @@ -48,33 +61,39 @@ def login(self, get_credentials: onlinejudge.type.CredentialsProvider, session:
"""

session = session or utils.new_default_session()
url = 'https://practice.contest.atcoder.jp/login'
if self.is_logged_in(session=session):
return

# get
url = 'https://atcoder.jp/login'
resp = _request('GET', url, session=session, allow_redirects=False)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
for msg in msgs:
log.status('message: %s', msg)
if msgs:
if 'login' not in resp.url:
return # redirect means that you are already logged in
else:
raise LoginError('something wrong: ' + str(msgs))

# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
form = soup.find('form', action='')
if not form:
raise LoginError('something wrong')

# post
username, password = get_credentials()
resp = _request('POST', url, session=session, data={'name': username, 'password': password}, allow_redirects=False)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
AtCoderService._report_messages(msgs)
form = utils.FormSender(form, url=resp.url)
form.set('username', username)
form.set('password', password)
resp = form.request(session)
_list_alert(resp, print_=True)

# result
if 'login' not in resp.url:
pass # AtCoder redirects to the top page if success
log.success('Welcome,') # AtCoder redirects to the top page if success
else:
raise LoginError('your password may be not correct: ' + str(msgs))
log.failure('Username or Password is incorrect.')
raise LoginError

def is_logged_in(self, session: Optional[requests.Session] = None) -> bool:
session = session or utils.new_default_session()
url = 'https://practice.contest.atcoder.jp/login'
url = 'https://atcoder.jp/contests/practice/submit'
resp = _request('GET', url, session=session, allow_redirects=False)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
return bool(msgs)
return resp.status_code == 200

def get_url(self) -> str:
return 'https://atcoder.jp/'
Expand All @@ -97,37 +116,6 @@ def from_url(cls, url: str) -> Optional['AtCoderService']:
return cls()
return None

@classmethod
def _get_messages_from_cookie(cls, cookies) -> List[str]:
msgtags = [] # type: List[str]
for cookie in cookies:
log.debug('cookie: %s', str(cookie))
if cookie.name.startswith('__message_'):
msg = json.loads(urllib.parse.unquote_plus(cookie.value))
msgtags += [msg['c']]
log.debug('message: %s: %s', cookie.name, str(msg))
msgs = [] # type: List[str]
for msgtag in msgtags:
soup = bs4.BeautifulSoup(msgtag, utils.html_parser)
msg = None
for tag in soup.find_all():
if tag.string and tag.string.strip():
msg = tag.string
break
if msg is None:
log.error('failed to parse message')
else:
msgs += [msg]
return msgs

@classmethod
def _report_messages(cls, msgs: List[str], unexpected: bool = False) -> bool:
for msg in msgs:
log.status('message: %s', msg)
if msgs and unexpected:
log.failure('unexpected messages found')
return bool(msgs)

def iterate_contests(self, lang: str = 'ja', session: Optional[requests.Session] = None) -> Generator['AtCoderContest', None, None]:
"""
:param lang: must be `ja` (default) or `en`.
Expand Down Expand Up @@ -292,8 +280,7 @@ class AtCoderProblem(onlinejudge.type.Problem):

def __init__(self, contest_id: str, problem_id: str):
self.contest_id = contest_id
self.problem_id = problem_id # TODO: fix the name, since AtCoder calls this as "task_screen_name"
self._task_id = None # type: Optional[int]
self.problem_id = problem_id # NOTE: AtCoder calls this as "task_screen_name"
self._task_name = None # type: Optional[str]
self._time_limit_msec = None # type: Optional[int]
self._memory_limit_byte = None # type: Optional[int]
Expand All @@ -316,14 +303,18 @@ def _from_table_row(cls, tr: bs4.Tag) -> 'AtCoderProblem':
return self

def download_sample_cases(self, session: Optional[requests.Session] = None) -> List[onlinejudge.type.TestCase]:
"""
:raises Exception: if no such problem exists
"""

session = session or utils.new_default_session()

# get
resp = _request('GET', self.get_url(), session=session)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
if AtCoderService._report_messages(msgs, unexpected=True):
# example message: "message: You cannot see this page."
resp = _request('GET', self.get_url(type='beta'), raise_for_status=False, session=session)
if _list_alert(resp):
log.warning('are you logged in?')
return []
resp.raise_for_status()

# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
samples = onlinejudge._implementation.testcase_zipper.SampleZipper()
Expand Down Expand Up @@ -418,12 +409,18 @@ def from_url(cls, s: str) -> Optional['AtCoderProblem']:
return None

def get_input_format(self, session: Optional[requests.Session] = None) -> str:
"""
:raises Exception: if no such problem exists
"""

session = session or utils.new_default_session()

# get
resp = _request('GET', self.get_url(type='old'), session=session)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
if AtCoderService._report_messages(msgs, unexpected=True):
return ''
resp = _request('GET', self.get_url(type='beta'), raise_for_status=False, session=session)
if _list_alert(resp):
log.warning('are you logged in?')
resp.raise_for_status()

# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
for h3 in soup.find_all('h3'):
Expand All @@ -445,20 +442,19 @@ def get_available_languages(self, session: Optional[requests.Session] = None) ->
:raises NotLoggedInError:
"""
session = session or utils.new_default_session()

# get
url = 'http://{}.contest.atcoder.jp/submit'.format(self.contest_id)
resp = _request('GET', url, session=session)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
if AtCoderService._report_messages(msgs, unexpected=True):
return []
# check whether logged in
path = utils.normpath(urllib.parse.urlparse(resp.url).path)
if path.startswith('/login'):
resp = _request('GET', self.get_url(type='beta'), session=session)

# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
form = soup.find('form', action='/contests/{}/submit'.format(self.contest_id))
if form is None:
log.error('not logged in')
raise NotLoggedInError

# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
select = soup.find('select', class_='submit-language-selector') # NOTE: AtCoder can vary languages depending on tasks, even in one contest. here, ignores this fact.
select = form.find('div', id='select-lang').find('select', attrs={'name': 'data.LanguageId'}) # NOTE: AtCoder can vary languages depending on tasks, even in one contest. here, ignores this fact.
languages = [] # type: List[Language]
for option in select.find_all('option'):
languages += [Language(option.attrs['value'], option.string)]
Expand All @@ -469,68 +465,42 @@ def submit_code(self, code: bytes, language_id: LanguageId, filename: Optional[s
:raises NotLoggedInError:
:raises SubmissionError:
"""

assert language_id in [language.id for language in self.get_available_languages(session=session)]
session = session or utils.new_default_session()

# get
url = 'http://{}.contest.atcoder.jp/submit'.format(self.contest_id) # TODO: use beta.atcoder.jp
url = 'https://atcoder.jp/contests/{}/submit'.format(self.contest_id)
resp = _request('GET', url, session=session)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
if AtCoderService._report_messages(msgs, unexpected=True):
raise SubmissionError

# check whether logged in
path = utils.normpath(urllib.parse.urlparse(resp.url).path)
if path.startswith('/login'):
log.error('not logged in')
if 'login' in resp.url:
raise NotLoggedInError

# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
form = soup.find('form', action=re.compile(r'^/submit\?task_id='))
form = soup.find('form', action='/contests/{}/submit'.format(self.contest_id))
if not form:
log.error('form not found')
raise SubmissionError
raise SubmissionError('something wrong')
log.debug('form: %s', str(form))

# post
task_id = self._get_task_id(session=session)
form = utils.FormSender(form, url=resp.url)
form.set('task_id', str(task_id))
form.set('source_code', code)
form.set('language_id_{}'.format(task_id), str(language_id))
form.set('data.TaskScreenName', self.problem_id)
form.set('data.LanguageId', str(language_id))
form.set('sourceCode', code)
resp = form.request(session=session)
resp.raise_for_status()
_list_alert(resp, print_=True)

# result
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
AtCoderService._report_messages(msgs)
if '/submissions/me' in resp.url:
# example: https://practice.contest.atcoder.jp/submissions/me#32174
# CAUTION: this URL is not a URL of the submission
log.success('success: result: %s', resp.url)
# NOTE: ignore the returned legacy URL and use beta.atcoder.jp's one
url = 'https://beta.atcoder.jp/contests/{}/submissions/me'.format(self.contest_id)
return utils.DummySubmission(url, problem=self)
return utils.DummySubmission(resp.url, problem=self)
else:
log.failure('failure')
log.debug('redirected to %s', resp.url)
raise SubmissionError('it may be a rate limit')

def _get_task_id(self, session: Optional[requests.Session] = None) -> int:
if self._task_id is None:
session = session or utils.new_default_session()
# get
resp = _request('GET', self.get_url(type='old'), session=session)
msgs = AtCoderService._get_messages_from_cookie(resp.cookies)
if AtCoderService._report_messages(msgs, unexpected=True):
raise SubmissionError
# parse
soup = bs4.BeautifulSoup(resp.content.decode(resp.encoding), utils.html_parser)
submit = soup.find('a', href=re.compile(r'^/submit\?task_id='))
if not submit:
log.error('link to submit not found')
raise SubmissionError
m = re.match(r'^/submit\?task_id=([0-9]+)$', submit.attrs['href'])
assert m
self._task_id = int(m.group(1))
return self._task_id

def _load_details(self, session: Optional[requests.Session] = None) -> None:
session = session or utils.new_default_session()

Expand Down

0 comments on commit 6a26353

Please sign in to comment.