-
Notifications
You must be signed in to change notification settings - Fork 19
/
netflix.py
1556 lines (1225 loc) · 60.5 KB
/
netflix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Netflix
监听奈飞(netflix)密码变更邮件,自动重置密码。
流程:实时监听邮件,发现有人修改了密码 -> 访问奈飞,点击忘记密码 -> 等待接收奈飞的重置密码邮件 -> 收到重置密码邮件,访问邮件内的链接,
进行密码重置操作,恢复初始密码
@author mybsdc <[email protected]>
@date 2021/6/29
@time 11:20
"""
import os
import sys
import time
import argparse
import random
import string
import json
import re
import datetime
import traceback
from functools import reduce, wraps
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.remote.webelement import WebElement
from dotenv import load_dotenv
from loguru import logger
import imaplib
import email
from email.header import decode_header
import redis
import ssl
import smtplib
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.utils import formataddr
from email import encoders
from utils.version import __version__
def catch_exception(origin_func):
"""
用于异常捕获的装饰器
:param origin_func:
:return:
"""
def wrapper(*args, **kwargs):
try:
return origin_func(*args, **kwargs)
except AssertionError as e:
logger.error(f'参数错误:{str(e)}')
except NoSuchElementException as e:
logger.error('匹配元素超时,超过 {} 秒依然没有发现元素:{}', Netflix.TIMEOUT, str(e))
except TimeoutException as e:
logger.error(f'查找元素超时或请求超时:{str(e)} [{Netflix.driver.current_url}]')
except WebDriverException as e:
logger.error(f'未知错误:{str(e)}')
except Exception as e:
logger.error('出错:{} 位置:{}', str(e), traceback.format_exc())
finally:
Netflix.driver.quit()
logger.info('已关闭浏览器,释放资源占用')
return wrapper
class Netflix(object):
# 超时秒数
# 如果同时设置了显式等待和隐式等待,则 webdriver 会取二者中更大的时间
TIMEOUT = 24
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
LOGIN_URL = 'https://www.netflix.com/login'
LOGOUT_URL = 'https://www.netflix.com/SignOut?lnkctr=mL'
RESET_PASSWORD_URL = 'https://www.netflix.com/password'
FORGOT_PASSWORD_URL = 'https://www.netflix.com/LoginHelp'
MANAGE_PROFILES_URL = 'https://www.netflix.com/ManageProfiles'
BROWSE_URL = 'https://www.netflix.com/browse'
ACCOUNT_URL = 'https://www.netflix.com/YourAccount' # 账户管理地址
# 请求重置密码的邮件正则
RESET_MAIL_REGEX = re.compile(r'accountaccess.*?URL_ACCOUNT_ACCESS', re.I)
# 提取完成密码重置的链接正则
RESET_URL_REGEX = re.compile(r'https://www\.netflix\.com/password[^]]+', re.I)
# 密码被重置邮件正则
PWD_HAS_BEEN_CHANGED_REGEX = re.compile(
r'https?://.*?netflix\.com/YourAccount\?(?:lnktrk=EMP&g=[^&]+&lkid=URL_YOUR_ACCOUNT_2|g=[^&]+&lkid=URL_YOUR_ACCOUNT&lnktrk=EVO)', re.I)
# 奈飞强迫用户修改密码
FORCE_CHANGE_PASSWORD_REGEX = re.compile(r'https?://www\.netflix\.com/LoginHelp.*?lkid=URL_LOGIN_HELP', re.I)
MAIL_SYMBOL_REGEX = re.compile('{(?!})|(?<!{)}')
def __init__(self):
Netflix.check_py_version()
# 命令行参数
self.args = self.get_all_args()
# 加载环境变量
if not os.path.exists('.env'):
raise Exception('.env 文件不存在,请复制 .env.example 为 .env 文件')
load_dotenv(verbose=True, override=True, encoding='utf-8')
# 日志
self.__logger_setting()
self.options = webdriver.ChromeOptions()
self.options.add_argument(f'user-agent={Netflix.USER_AGENT}')
self.options.add_experimental_option('excludeSwitches', ['enable-automation'])
self.options.add_experimental_option('useAutomationExtension', False)
self.options.add_argument('--disable-extensions') # 禁用扩展
self.options.add_argument('--profile-directory=Default')
self.options.add_argument('--incognito') # 隐身模式
self.options.add_argument('--disable-plugins-discovery')
# self.options.add_argument('--start-maximized')
self.options.add_argument('--window-size=1366,768')
if self.args.headless or self.args.test:
self.options.add_argument('--headless') # 启用无头模式
self.options.add_argument('--disable-gpu') # 谷歌官方文档说加上此参数可减少 bug,仅适用于 Windows 系统
# 解决 unknown error: DevToolsActivePort file doesn't exist
self.options.add_argument('--no-sandbox') # 绕过操作系统沙箱环境
self.options.add_argument('--disable-dev-shm-usage') # 解决资源限制,仅适用于 Linux 系统
self.options.add_argument('--disable-blink-features=AutomationControlled') # Chrome v88 以上版本正确隐藏浏览器特征
self.driver = webdriver.Chrome(executable_path=os.getenv('DRIVER_EXECUTABLE_FILE'), options=self.options)
# self.driver.implicitly_wait(Netflix.TIMEOUT) # 不再指定隐式等待时间,防止显示等待与隐式等待混用导致等待时间混乱问题
# 防止通过 window.navigator.webdriver === true 检测模拟浏览器
# 注意,低于 Chrome v88 (不含) 的浏览器可用此处代码隐藏 Web Driver 特征
# 参考:
# https://www.selenium.dev/selenium/docs/api/py/webdriver_chrome/selenium.webdriver.chrome.webdriver.html#selenium.webdriver.chrome.webdriver.WebDriver.execute_cdp_cmd
# https://chromedevtools.github.io/devtools-protocol/tot/Page/#method-addScriptToEvaluateOnNewDocument
# self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
# "source": """
# Object.defineProperty(navigator, 'webdriver', {
# get: () => undefined
# })
# """
# })
# 隐藏无头浏览器特征,增加检测难度
with open('resources/stealth.min.js') as f:
stealth_js = f.read()
self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
'source': stealth_js
})
# 通用显式等待实例
self.wait = WebDriverWait(self.driver, timeout=Netflix.TIMEOUT, poll_frequency=0.5)
# 测试无头浏览器特征是否正确隐藏
if self.args.test:
logger.info('测试过程将只以无头模式进行')
logger.info('开始测试无头浏览器特征是否正确隐藏')
self.driver.get('https://bot.sannysoft.com/')
time.sleep(3.5)
filename = 'bot_test.png'
self.__screenshot(filename, True)
self.driver.quit()
logger.info(f'已测试完成,测试结果保存在 {filename}')
exit(0)
self.BOT_MAIL_USERNAME = os.getenv('BOT_MAIL_USERNAME')
assert self.BOT_MAIL_USERNAME, '请在 .env 文件配置 BOT_MAIL_USERNAME 的值,程式将监听此邮箱中的邮件内容'
self.BOT_MAIL_PASSWORD = os.getenv('BOT_MAIL_PASSWORD')
assert self.BOT_MAIL_PASSWORD, '请在 .env 文件配置 BOT_MAIL_PASSWORD 的值,程式用于登录被监听的邮箱'
self.MULTIPLE_NETFLIX_ACCOUNTS = Netflix._parse_multiple_accounts()
# 获取最近几天的邮件
self.day = 3
# 最多等待几分钟重置邮件的到来
self.max_wait_reset_mail_time = 10
# 恢复密码失败后最多重试几次
self.max_num_of_attempts = 12
self.first_time = []
self.today = Netflix.today_()
# 线程池
self.max_workers = self.args.max_workers
# Redis 配置
self.REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
self.REDIS_PORT = os.getenv('REDIS_PORT', 6379)
self.redis = None
@staticmethod
def _parse_multiple_accounts():
accounts = os.getenv('MULTIPLE_NETFLIX_ACCOUNTS')
match = re.findall(r'\[(?P<u>[^|\]]+?)\|(?P<p>[^|\]]+?)\|(?P<n>[^]]+?)\]', accounts, re.I)
if match:
return [{'u': item[0], 'p': item[1], 'n': item[2]} for item in match]
raise Exception('未配置 Netflix 账户')
@staticmethod
def format_time(time: str or int, format: str = '%m/%d %H:%M:%S') -> str:
return datetime.datetime.fromtimestamp(time).strftime(format)
@staticmethod
def today_():
return str(datetime.date.today())
def __logger_setting(self) -> None:
logger.remove()
level = 'DEBUG' if self.args.debug or int(os.getenv('DEBUG', 0)) else 'INFO'
format = '<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green> <b><level>{level: <8}</level></b> | <cyan>{process.id}</cyan>:<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>'
logger.add('logs/{time:YYYY-MM-DD}.log', level=level, format=format, encoding='utf-8')
logger.add(sys.stderr, colorize=True, level=level, format=format)
@staticmethod
def check_py_version(major=3, minor=7):
if sys.version_info < (major, minor):
raise UserWarning(f'请使用 python {major}.{minor} 及以上版本,推荐使用 python 3.9.8')
@staticmethod
def get_all_args():
"""
获取所有命令行参数
:return:
"""
parser = argparse.ArgumentParser(description='Netflix 的各种参数及其含义', epilog='')
parser.add_argument('-mw', '--max_workers', help='最大线程数', default=1, type=int)
parser.add_argument('-d', '--debug', help='是否开启 Debug 模式', action='store_true')
parser.add_argument('-f', '--force', help='是否强制执行,当然也要满足有“新的密码被重置的邮件”的条件', action='store_true')
parser.add_argument('-t', '--test', help='测试无头浏览器特征是否正确隐藏', action='store_true')
parser.add_argument('-hl', '--headless', help='是否启用无头模式', action='store_true')
return parser.parse_args()
def find_element_by_id(self, id: str, timeout: int or float = 24.0, ignored_exceptions=None,
poll_frequency: int or float or None = None, message: str or None = None,
scroll_into_view: bool = False, block: str = 'start') -> WebElement:
"""
根据 id 查找元素
元素必须是已加载且可见才能寻到,如若未指定超时时间和异常处理等相关参数,则优先使用前期准备好的 WebDriverWait 实例,不再重复实例化
:param id:
:param timeout:
:param ignored_exceptions:
:param poll_frequency:
:param message:
:param scroll_into_view: 是否将元素滚动到可视范围内
:param block: 定义垂直对齐方式,仅当 scroll_into_view 为 True 时才有意义,支持 start center end nearest
:return:
"""
message = f'查找 id 为 {id} 的元素未果' if not message else message
if not ignored_exceptions and timeout == Netflix.TIMEOUT and not poll_frequency:
el = self.wait.until(EC.visibility_of_element_located((By.ID, id)), message)
else:
el = WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency if poll_frequency else 0.5,
ignored_exceptions=ignored_exceptions).until(
EC.visibility_of_element_located((By.ID, id)), message)
if scroll_into_view:
self.scroll_page_until_el_is_visible(el, block)
return el
def find_element_by_class_name(self, class_name: str, timeout: int or float = 24.0, ignored_exceptions=None,
poll_frequency: int or float or None = None, message: str or None = None,
scroll_into_view: bool = False, block: str = 'start') -> WebElement:
"""
根据 class name 查找元素
元素必须是已加载且可见才能寻到,如若未指定超时时间和异常处理等相关参数,则优先使用前期准备好的 WebDriverWait 实例,不再重复实例化
:param class_name:
:param timeout:
:param ignored_exceptions:
:param poll_frequency:
:param message:
:param scroll_into_view: 是否将元素滚动到可视范围内
:param block: 定义垂直对齐方式,仅当 scroll_into_view 为 True 时才有意义,支持 start center end nearest
:return:
"""
message = f'查找 class name 为 {class_name} 的元素未果' if not message else message
if not ignored_exceptions and timeout == Netflix.TIMEOUT and not poll_frequency:
el = self.wait.until(EC.visibility_of_element_located((By.CLASS_NAME, class_name)), message)
else:
el = WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency if poll_frequency else 0.5,
ignored_exceptions=ignored_exceptions).until(
EC.visibility_of_element_located((By.CLASS_NAME, class_name)), message)
if scroll_into_view:
self.scroll_page_until_el_is_visible(el, block)
return el
def find_element_by_xpath(self, xpath: str, timeout: int or float = 24.0, ignored_exceptions=None,
poll_frequency: int or float or None = None, message: str or None = None,
scroll_into_view: bool = False, block: str = 'start') -> WebElement:
"""
根据 xpath 查找元素
元素必须是已加载且可见才能寻到,如若未指定超时时间和异常处理等相关参数,则优先使用前期准备好的 WebDriverWait 实例,不再重复实例化
:param xpath:
:param timeout:
:param ignored_exceptions:
:param poll_frequency:
:param message:
:param scroll_into_view: 是否将元素滚动到可视范围内
:param block: 定义垂直对齐方式,仅当 scroll_into_view 为 True 时才有意义,支持 start center end nearest
:return:
"""
message = f'查找 xpath 为 {xpath} 的元素未果' if not message else message
if not ignored_exceptions and timeout == Netflix.TIMEOUT and not poll_frequency:
el = self.wait.until(EC.visibility_of_element_located((By.XPATH, xpath)), message)
else:
el = WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency if poll_frequency else 0.5,
ignored_exceptions=ignored_exceptions).until(
EC.visibility_of_element_located((By.XPATH, xpath)), message)
if scroll_into_view:
self.scroll_page_until_el_is_visible(el, block)
return el
def find_element_by_tag_name(self, tag_name: str, timeout: int or float = 24.0, ignored_exceptions=None,
poll_frequency: int or float or None = None, message: str or None = None,
scroll_into_view: bool = False, block: str = 'start') -> WebElement:
"""
根据 tag name 查找元素
元素必须是已加载且可见才能寻到,如若未指定超时时间和异常处理等相关参数,则优先使用前期准备好的 WebDriverWait 实例,不再重复实例化
:param tag_name:
:param timeout:
:param ignored_exceptions:
:param poll_frequency:
:param message:
:param scroll_into_view: 是否将元素滚动到可视范围内
:param block: 定义垂直对齐方式,仅当 scroll_into_view 为 True 时才有意义,支持 start center end nearest
:return:
"""
message = f'查找 tag name 为 {tag_name} 的元素未果' if not message else message
if not ignored_exceptions and timeout == Netflix.TIMEOUT and not poll_frequency:
el = self.wait.until(EC.visibility_of_element_located((By.TAG_NAME, tag_name)), message)
else:
el = WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency if poll_frequency else 0.5,
ignored_exceptions=ignored_exceptions).until(
EC.visibility_of_element_located((By.TAG_NAME, tag_name)), message)
if scroll_into_view:
self.scroll_page_until_el_is_visible(el, block)
return el
def scroll_page_until_el_is_visible(self, el: WebElement, block: str = 'start') -> None:
"""
滚动直到元素可见
按钮需要滚动直到可见,否则无法点击
参考:https://developer.mozilla.org/zh-CN/docs/Web/API/Element/scrollIntoView
:param el:
:param block: 定义垂直方向的对齐,“start”、“center”、“end”, 或“nearest”之一。默认为 start
:return:
"""
self.driver.execute_script('arguments[0].scrollIntoView({{block: "{}"}});'.format(block), el)
def _login(self, u: str, p: str, n: str = '') -> tuple:
"""
登录
:param u:
:param p:
:param n:
:return:
"""
logger.debug('尝试登录账户:{}', u)
# 多账户,每次登录前需要清除 cookies
self.driver.delete_all_cookies()
self.driver.get(Netflix.LOGIN_URL)
username_input_el = self.find_element_by_id('id_userLoginId')
username_input_el.clear()
username_input_el.send_keys(u)
time.sleep(1.1)
pwd_input_el = self.find_element_by_id('id_password')
pwd_input_el.clear()
pwd_input_el.send_keys(p)
self.find_element_by_class_name('login-button').click()
if self.has_unknown_error_alert():
raise UserWarning(f'账户 {u} 可能正处于风控期间,无法登录,本次操作将被忽略')
try:
WebDriverWait(self.driver, timeout=3, poll_frequency=0.94).until(lambda d: 'browse' in d.current_url)
except Exception:
self.find_element_by_xpath('//a[@data-uia="header-signout-link"]', message='查找登出元素未果')
logger.warning(f'当前账户可能非 Netflix 会员,本次登录没有意义')
logger.debug(f'已成功登录。当前地址为:{self.driver.current_url}')
return u, p, n
def __forgot_password(self, netflix_username: str):
"""
忘记密码
:param netflix_username:
:return:
"""
logger.info('尝试忘记密码')
self.driver.delete_all_cookies()
self.driver.get(Netflix.FORGOT_PASSWORD_URL)
forgot_pwd = self.find_element_by_id('forgot_password_input')
forgot_pwd.clear()
forgot_pwd.send_keys(netflix_username)
time.sleep(1)
self.handle_event(self.click_forgot_pwd_btn, max_num_of_attempts=12)
# 直到页面显示已发送邮件
logger.debug('检测是否已到送信完成画面')
self.find_element_by_xpath('//*[@class="login-content"]//h2[@data-uia="email_sent_label"]',
message='查找送信完成元素未果')
logger.info('已发送重置密码邮件到 {},注意查收', netflix_username)
return True
def click_forgot_pwd_btn(self):
"""
点击忘记密码按钮
:return:
"""
self.find_element_by_class_name('forgot-password-action-button').click()
def __reset_password(self, curr_netflix_password: str, new_netflix_password: str):
"""
账户内修改密码
:param curr_netflix_password:
:param new_netflix_password:
:return:
"""
try:
self.driver.get(Netflix.RESET_PASSWORD_URL)
curr_pwd = self.find_element_by_id('id_currentPassword')
curr_pwd.clear()
curr_pwd.send_keys(curr_netflix_password)
time.sleep(1)
new_pwd = self.find_element_by_id('id_newPassword')
new_pwd.clear()
new_pwd.send_keys(new_netflix_password)
time.sleep(1)
confirm_new_pwd = self.find_element_by_id('id_confirmNewPassword')
confirm_new_pwd.clear()
confirm_new_pwd.send_keys(new_netflix_password)
time.sleep(1.1)
# 其它设备无需重新登录
self.find_element_by_xpath('//li[@data-uia="field-requireAllDevicesSignIn+wrapper"]').click()
time.sleep(1)
self.handle_event(self.click_submit_btn)
return self.__pwd_change_result()
except Exception as e:
raise Exception(f'直接在账户内修改密码出错:' + str(e))
def input_pwd(self, new_netflix_password: str) -> None:
"""
输入密码
:param new_netflix_password:
:return:
"""
new_pwd = self.find_element_by_id('id_newPassword')
new_pwd.clear()
new_pwd.send_keys(new_netflix_password)
time.sleep(2)
confirm_new_pwd = self.find_element_by_id('id_confirmNewPassword')
confirm_new_pwd.clear()
confirm_new_pwd.send_keys(new_netflix_password)
time.sleep(1)
def click_submit_btn(self):
"""
点击提交输入的密码
:return:
"""
self.find_element_by_id('btn-save').click()
def element_visibility_of(self, xpath: str, verify_val: bool = False,
max_num_of_attempts: int = 3, el_wait_time: int = 2) -> WebElement or None:
"""
元素是否存在且可见
适用于在已经加载完的网页做检测,可见且存在则返回元素,否则返回 None
:param xpath:
:param verify_val: 如果传入 True,则验证元素是否有值,或者 inner HTML 不为空,并作为关联条件
:param max_num_of_attempts: 最大尝试次数,由于有的元素的值可能是异步加载的,需要多次尝试是否能获取到值,每次获取间隔休眠次数秒
:param el_wait_time: 等待时间,查找元素最多等待多少秒,默认 2 秒
:return:
"""
try:
# 此处只为找到元素,如果下面不需要验证元素是否有值的话,则使用此处找到的元素
# 否则下面验值逻辑会重新找到该元素以使用,此处的 el 会被覆盖
el = self.find_element_by_xpath(xpath, timeout=el_wait_time)
num = 0
while True:
if not verify_val:
break
# 需要每次循环找到此元素,以确定元素的值是否发生变化
el = self.find_element_by_xpath(xpath, timeout=1)
if el.tag_name == 'input':
val = el.get_attribute('value')
if val and len(val) > 0:
break
elif el.text != '':
break
# 多次尝试无果则放弃
if num > max_num_of_attempts:
break
num += 1
time.sleep(num)
return el
except Exception:
return None
def has_unknown_error_alert(self, error_el_xpath: str = '//div[@class="ui-message-contents"]') -> bool:
"""
页面提示未知错误
:return:
"""
error_tips_el = self.element_visibility_of(error_el_xpath, True)
if error_tips_el:
# 密码修改成功画面的提示语与错误提示语共用的同一个元素,防止误报
if 'YourAccount?confirm=password' in self.driver.current_url or 'Your password has been changed' in error_tips_el.text:
return False
logger.warning(f'页面出现未知错误:{error_tips_el.text}')
return True
return False
def handle_event(self, func, error_el_xpath='//div[@class="ui-message-contents"]', max_num_of_attempts: int = 10):
"""
处理事件,一般是单个点击事件
在某些画面点击提交的时候,有可能报未知错误,需要稍等片刻再点击或者重新触发一系列事件后才正常
:param func:
:param max_num_of_attempts:
:return:
"""
func()
num = 0
while True:
if self.has_unknown_error_alert(error_el_xpath):
func()
if num >= max_num_of_attempts:
raise Exception('处理未知错误失败')
num += 1
logger.debug(f'程式将休眠 {num} 秒后重试,最多不超过 {max_num_of_attempts} 次 [{num}/{max_num_of_attempts}]')
time.sleep(num)
else:
break
def __reset_password_via_mail(self, reset_url: str, new_netflix_password: str) -> bool:
"""
通过邮件重置密码
:param reset_url:
:param new_netflix_password:
:return:
"""
logger.info('尝试通过邮件内的重置密码链接进行密码重置操作')
self.driver.delete_all_cookies()
self.driver.get(reset_url)
self.input_pwd(new_netflix_password)
self.handle_event(self.click_submit_btn)
# 如果奈飞提示密码曾经用过,则应该先改为随机密码,然后再改回来
pwd_error_tips = self.element_visibility_of('//div[@data-uia="field-newPassword+error"]')
if pwd_error_tips:
logger.warning('疑似 Netflix 提示你不能使用以前的密码(由于各种错误提示所在的 页面元素 相同,故无法准确判断,但是程式会妥善处理,不用担心)')
logger.warning(f'原始的提示语为 {pwd_error_tips.text},故程式将尝试先改为随机密码,然后再改回正常密码。')
random_pwd = self.gen_random_pwd()
self.input_pwd(random_pwd)
self.handle_event(self.click_submit_btn)
self.__pwd_change_result()
# 账户内直接将密码改回原始值
logger.info('尝试在账户内直接将密码改回原始密码')
return self.__reset_password(random_pwd, new_netflix_password)
return self.__pwd_change_result()
def __pwd_change_result(self):
"""
断言密码修改结果
:return:
"""
try:
self.wait.until(lambda d: d.current_url == 'https://www.netflix.com/YourAccount?confirm=password')
logger.info('已成功修改密码')
return True
except Exception as e:
raise Exception(f'未能正确跳到密码修改成功画面,疑似未成功,抛出异常:' + str(e))
@staticmethod
def parse_mail(data: bytes, onlySubject: bool = False) -> dict or str:
"""
解析邮件内容
:param data:
:param onlySubject:
:return:
"""
resp = {
'subject': '',
'from': '',
'date': '',
'text': '',
'html': ''
}
# 将字节邮件转换为一个 message 对象
msg = email.message_from_bytes(data)
# 解码邮件主题
subject, encoding = decode_header(msg['Subject'])[0]
if isinstance(subject, bytes):
# 如果是字节类型,则解码为字符串
subject = subject.decode(encoding)
if onlySubject:
return subject
# 解码邮件发送者
from_, encoding = decode_header(msg.get('From'))[0]
if isinstance(from_, bytes):
from_ = from_.decode(encoding)
# 解码送信日期
date, encoding = decode_header(msg.get('Date'))[0]
if isinstance(date, bytes):
date = date.decode(encoding)
logger.debug(f'\nSubject: {subject}\nFrom: {from_}\nDate: {date}')
resp['subject'] = subject
resp['from'] = from_
resp['date'] = date
# 邮件可能有多个部分,比如可能有 html、纯文本、附件 三个部分
if msg.is_multipart():
# 遍历邮件的各部分
for part in msg.walk():
# 获取邮件内容类型
content_type = part.get_content_type()
content_disposition = str(part.get('Content-Disposition'))
if 'attachment' in content_disposition:
# 附件,暂不处理
# filename = part.get_filename()
# if filename:
# open(filename, 'wb').write(part.get_payload(decode=True))
continue
try:
# 获取邮件正文
body = part.get_payload(decode=True).decode()
except Exception as e:
continue
if content_type == 'text/plain':
resp['text'] = body
elif content_type == 'text/html':
resp['html'] = body
else:
content_type = msg.get_content_type()
body = msg.get_payload(decode=True).decode()
if content_type == 'text/plain':
resp['text'] = body
elif content_type == 'text/html':
# 可以选择将 html 写入文件以便预览,此处暂且不处理,直接给内容
resp['html'] = body
return resp
@staticmethod
def is_password_reset_result(text: str) -> bool:
"""
是否密码重置结果邮件
:param text:
:return:
"""
return Netflix.PWD_HAS_BEEN_CHANGED_REGEX.search(text) is not None
@staticmethod
def is_password_reset_request(text: str):
"""
是否请求重置密码的邮件
:param text:
:return:
"""
return Netflix.RESET_MAIL_REGEX.search(text) is not None
@staticmethod
def is_force_change_password_request(text: str):
"""
是否奈飞强迫修改密码的邮件
:param text:
:return:
"""
return Netflix.FORCE_CHANGE_PASSWORD_REGEX.search(text) is not None
def get_mail_last_id(self, netflix_account_email: str):
"""
获取最新的邮件 ID
:param netflix_account_email:
:return:
"""
key_last_id = f'{netflix_account_email}.last_id'
last_id = self.redis.get(key_last_id) if self.redis.exists(key_last_id) else 0
return last_id
def set_mail_last_id(self, netflix_account_email: str, id: int) -> bool:
"""
设置最新的邮件 ID
:param netflix_account_email:
:param id:
:return:
"""
key_last_id = f'{netflix_account_email}.last_id'
self.redis.set(key_last_id, id)
return True
def is_need_to_do(self, netflix_account_email: str) -> int:
"""
是否需要做处理
:param netflix_account_email:
:return:
"""
key_need_to_do = f'{netflix_account_email}.need_to_do'
need_to_do = self.redis.get(key_need_to_do) if self.redis.exists(key_need_to_do) else 1
return need_to_do
def set_need_to_do(self, netflix_account_email: str, status: int = 1) -> bool:
"""
设置是否需要做处理
:param netflix_account_email:
:param status: 1:需要 0:不需要
:return:
"""
key_need_to_do = f'{netflix_account_email}.need_to_do'
self.redis.set(key_need_to_do, status)
return True
def __fetch_mail(self, netflix_account_email: str, onlySubject: bool = False) -> str or None:
"""
拉取邮件
:param netflix_account_email:
:param onlySubject:
:return:
"""
logger.debug('尝试拉取最新邮件,以监听是否有重置密码相关的邮件')
with imaplib.IMAP4_SSL('imap.gmail.com', 993) as M:
M.login(self.BOT_MAIL_USERNAME, self.BOT_MAIL_PASSWORD)
status, total = M.select('INBOX', readonly=True) # readonly=True 则邮件将不会被标记为已读
# https://gist.github.com/martinrusev/6121028
# https://stackoverflow.com/questions/5621341/search-before-after-with-pythons-imaplib
after_date = (datetime.date.today() - datetime.timedelta(self.day)).strftime(
'%d-%b-%Y') # 仅需要最近 N 天的邮件,%b 表示字符月份
criteria = f'(TO "<{netflix_account_email}>" SENTSINCE "{after_date}")'
status, data = M.search(None, criteria)
if status != 'OK':
raise Exception('通过发信人以及送信时间过滤邮件时出错')
last_id = self.get_mail_last_id(netflix_account_email)
data = data[0].split()[::-1]
for num in data:
id = int(num)
if id <= last_id: # 只要最新未读的
continue
status, mail_data = M.fetch(num, '(RFC822)')
if status != 'OK':
logger.error(f'邮箱 {self.BOT_MAIL_USERNAME} 在为 {netflix_account_email} 拉取 ID 为 {id} 的邮件时出错')
continue
# 解析邮件
resp = Netflix.parse_mail(mail_data[0][1], onlySubject)
# 记录邮件 ID,之后此邮箱的此类型邮件必须大于此 ID 才有效,且此 ID 跟随 Netflix 账户
self.set_mail_last_id(netflix_account_email, id)
return resp
return None
def pwd_result_mail_listener(self, netflix_account_email: str):
"""
监听密码重置结果邮件
既可能是恶意用户,也可能是 Netflix 强迫用户重置密码而发来的邮件,借此触发我们后续流程
:param netflix_account_email:
:return:
"""
# 拉取最新邮件
resp = self.__fetch_mail(netflix_account_email)
if not resp:
return None
# 定义事件类型 0:未知 1:用户恶意修改密码 2:Netflix 强迫用户修改密码
event_type = 0
if Netflix.is_password_reset_result(resp['text']): # 检测到有用户恶意修改密码
logger.info('检测到有人修改了 Netflix 账户 {} 的密码', netflix_account_email)
event_type = 1
need_to_do = self.is_need_to_do(netflix_account_email)
if not need_to_do:
logger.info('今次检测到的密码重置结果邮件应是脚本的动作回执,故不做处理')
self.set_need_to_do(netflix_account_email, 1)
return None
# 处理首次运行程式的情形
if netflix_account_email not in self.first_time:
self.first_time.append(netflix_account_email)
if self.args.force:
logger.info(f'强制运行,检测到账户 {netflix_account_email} 存在密码被重置的邮件,已触发密码重置流程')
return True, event_type
logger.info(f'首次运行,故今次检测账户 {netflix_account_email},发现的都是一些旧的密码被重置的邮件,不做处理')
return None
return True, event_type
elif Netflix.is_force_change_password_request(resp['text']): # 检测到奈飞强迫用户修改密码
logger.info('检测到 Netflix 以安全起见,强迫用户修改账户 {} 的密码', netflix_account_email)
event_type = 2
return True, event_type
def pwd_reset_request_mail_listener(self, netflix_account_email) -> str or None:
"""
监听请求重置密码的邮件
在发起重置密码动作后,我们会收到 Netflix 的邮件
:param netflix_account_email:
:return:
"""
# 拉取最新邮件
resp = self.__fetch_mail(netflix_account_email)
if resp and self.is_password_reset_request(resp.get('text', '')):
logger.info('Netflix 账户 {} 已收到请求重置密码的邮件,开始提取重置链接', netflix_account_email)
match = Netflix.RESET_URL_REGEX.search(resp['text'])
if not match:
raise Exception('已命中重置密码邮件,但是未能正确提取重置密码链接,请调查一下')
logger.info('已成功提取重置密码链接')
logger.info(f'本次重置链接为:{match.group(0)}')
return match.group(0)
return None
@staticmethod
def time_diff(start_time, end_time):
"""
计算时间间隔
:param start_time: 开始时间戳
:param end_time: 结束时间戳
:return:
"""
diff_time = end_time - start_time
if diff_time < 0:
raise ValueError('结束时间必须大于等于开始时间')
if diff_time < 1:
return '{:.2f}秒'.format(diff_time)
else:
diff_time = int(diff_time)
if diff_time < 60:
return '{:02d}秒'.format(diff_time)
elif 60 <= diff_time < 3600:
m, s = divmod(diff_time, 60)
return '{:02d}分钟{:02d}秒'.format(m, s)
elif 3600 <= diff_time < 24 * 3600:
m, s = divmod(diff_time, 60)
h, m = divmod(m, 60)
return '{:02d}小时{:02d}分钟{:02d}秒'.format(h, m, s)
elif 24 * 3600 <= diff_time:
m, s = divmod(diff_time, 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
return '{:02d}天{:02d}小时{:02d}分钟{:02d}秒'.format(d, h, m, s)
def __do_reset(self, netflix_account_email: str, p: str) -> bool:
"""
执行重置密码流程
:param netflix_account_email:
:param p:
:return:
"""
self.__forgot_password(netflix_account_email)
logger.info('等待接收重置密码链接')
# 坐等奈飞发送的重置密码链接
wait_start_time = time.time()
while True:
reset_link = self.pwd_reset_request_mail_listener(netflix_account_email)
if reset_link:
self.set_need_to_do(netflix_account_email, 0) # 忽略下一封密码重置邮件
break