This repository has been archived by the owner on Mar 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
Copy pathreq.py
141 lines (115 loc) · 4.76 KB
/
req.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# encoding=utf8
# 请求的方法
import httpx
from httpx import Limits
from typing import Union, List
import asyncio
from utils import default_header_user_agent
from utils.models import API
from utils.log import logger
def reqAPI(api: API, client: Union[httpx.Client, httpx.AsyncClient]) -> httpx.Response:
if isinstance(api.data, dict):
resp = client.request(method=api.method, json=api.data,
headers=api.header, url=api.url, timeout=10)
else:
resp = client.request(method=api.method, data=api.data,
headers=api.header, url=api.url, timeout=10)
return resp
def reqFuncByProxy(api: Union[API, str], phone: Union[tuple, str], proxy: dict) -> bool:
"""通过代理请求接口方法"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
with httpx.Client(headers=default_header_user_agent(), verify=False, proxies=proxy) as client:
for ph in phone_lst:
try:
if isinstance(api, API):
api = api.handle_API(ph)
resp = reqAPI(api, client)
logger.info(f"{api.desc}-{resp.text[:30]}")
else:
api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '')
resp = client.get(url=api, headers=default_header_user_agent())
logger.info(f"GETAPI接口-{resp.text[:30]}")
return True
except httpx.HTTPError as why:
logger.error(f"请求失败{why}")
return False
def reqFunc(api: Union[API, str], phone: Union[tuple, str]) -> bool:
"""请求接口方法"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
with httpx.Client(headers=default_header_user_agent(), verify=False) as client:
for ph in phone_lst:
try:
if isinstance(api, API):
api = api.handle_API(ph)
resp = reqAPI(api, client)
logger.info(f"{api.desc}-{resp.text[:30]}")
else:
api = api.replace("[phone]", ph).replace(" ", "").replace('\n', '').replace('\r', '')
resp = client.get(url=api, headers=default_header_user_agent())
logger.info(f"GETAPI接口-{resp.text[:30]}")
return True
except httpx.HTTPError as why:
logger.error(f"请求失败{why}")
return False
async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore):
"""异步请求方法
:param:
:return:
"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
async with semaphore:
async with httpx.AsyncClient(
limits=Limits(max_connections=1000,
max_keepalive_connections=2000),
headers=default_header_user_agent(),
verify=False,
timeout=99999
) as c:
for ph in phone_lst:
try:
if isinstance(src, API):
src = src.handle_API(ph)
r = await reqAPI(src, c)
else:
# 利用元组传参安全因为元组不可修改
s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace(
"&", "").replace('\n', '').replace('\r', ''),)
r = await c.get(*s)
return r
except httpx.HTTPError as why:
logger.error(f"异步请求失败{type(why)}")
# logger.error(f"异步请求失败{why}")
# import aiofiles
# async with aiofiles.open("error.txt","a",encoding="utf-8") as f:
# await f.write(f"{str(s[0]) if str(s[0]) else str(src)}\n")
except TypeError:
logger.error("类型错误")
except Exception as wy:
logger.exception(f"异步失败{wy}")
def callback(result):
"""异步回调函数"""
log = result.result()
if log is not None:
logger.info(f"请求结果:{log.text[:30]}")
async def runAsync(apis: List[Union[API,str]], phone: Union[tuple, str]):
tasks = []
for api in apis:
semaphore = asyncio.Semaphore(999999)
task = asyncio.ensure_future(asyncReqs(api, phone, semaphore))
task.add_done_callback(callback)
tasks.append(task)
await asyncio.gather(
*tasks
)