-
Notifications
You must be signed in to change notification settings - Fork 2
/
core_api.py
399 lines (346 loc) · 12.5 KB
/
core_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
from fastapi import FastAPI
from fastapi import Request
from fastapi import Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi_pagination import add_pagination
from fastapi_pagination import Page
from ipfs_client.main import AsyncIPFSClientSingleton
from pydantic import Field
from web3 import Web3
from snapshotter.settings.config import settings
from snapshotter.utils.data_utils import get_project_epoch_snapshot
from snapshotter.utils.data_utils import get_project_finalized_cid
from snapshotter.utils.default_logger import logger
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.models.data_models import TaskStatusRequest
from snapshotter.utils.rpc import RpcHelper
# Module-level logger for this API, bound with module='CoreAPI'.
rest_logger = logger.bind(module='CoreAPI')
# Load the protocol state contract ABI from the path configured in settings.
# The logger is handed to the reader (presumably for error reporting -- confirm in file_utils).
protocol_state_contract_abi = read_json_file(
settings.protocol_state.abi,
rest_logger,
)
# Address of the protocol state contract; converted to checksum form at startup.
protocol_state_contract_address = settings.protocol_state.address
# CORS: requests from any origin are accepted.
origins = ['*']
app = FastAPI()
# for pagination of epoch processing status reports
# Rebinds the imported `Page` name to a variant whose `size` defaults to 10
# and is constrained to the range 1..30.
Page = Page.with_custom_options(
size=Field(10, ge=1, le=30),
)
add_pagination(app)
# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True here. The FastAPI CORS documentation says wildcards
# cannot be used together with credentials -- confirm this configuration is intended.
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
@app.on_event('startup')
async def startup_boilerplate():
    """
    Populate ``app.state`` with the shared objects the request handlers read.

    Stores the settings object, an empty local user cache, an ``RpcHelper``
    built from ``settings.anchor_chain_rpc`` and a protocol state contract
    handle created on that helper's current node. When ``settings.ipfs.url``
    is set, an IPFS client singleton is initialised and its read client is
    stored too; otherwise only a warning is logged. ``epoch_size`` is set to 0
    in both cases.
    """
    state = app.state
    state.core_settings = settings
    state.local_user_cache = {}

    rpc_helper = RpcHelper(rpc_settings=settings.anchor_chain_rpc)
    state.anchor_rpc_helper = rpc_helper

    # Bind the contract to the web3 client of whichever node the helper
    # currently reports.
    web3_client = rpc_helper.get_current_node()['web3_client']
    checksummed_address = Web3.to_checksum_address(protocol_state_contract_address)
    state.protocol_state_contract = web3_client.eth.contract(
        address=checksummed_address,
        abi=protocol_state_contract_abi,
    )

    if settings.ipfs.url:
        ipfs_singleton = AsyncIPFSClientSingleton(settings.ipfs)
        state.ipfs_singleton = ipfs_singleton
        await ipfs_singleton.init_sessions()
        state.ipfs_reader_client = ipfs_singleton._ipfs_read_client
    else:
        # Without an IPFS URL no reader client is stored on app.state.
        rest_logger.warning('IPFS url not set, /data API endpoint will be unusable!')

    state.epoch_size = 0
# Health check endpoint
@app.get('/health')
async def health_check(
    request: Request,
    response: Response,
):
    """
    Liveness probe for the Snapshotter service.

    Args:
        request (Request): The incoming request object (not used).
        response (Response): The outgoing response object (not used).

    Returns:
        dict: Always ``{'status': 'OK'}``.
    """
    health_status = {'status': 'OK'}
    return health_status
@app.get('/current_epoch')
async def get_current_epoch(
    request: Request,
    response: Response,
):
    """
    Fetch the current epoch of the configured data market from the protocol state contract.

    Args:
        request (Request): The incoming request; its ``app.state`` supplies the RPC helper and contract.
        response (Response): The outgoing response; its status code is set to 500 on failure.

    Returns:
        dict: ``{'begin', 'end', 'epochId'}`` taken from positions 0, 1 and 2 of the
        contract result, or ``{'status': 'error', 'message': ...}`` when the lookup fails.
    """
    try:
        state = request.app.state
        web3_call = state.anchor_rpc_helper.web3_call
        epoch_query = state.protocol_state_contract.functions.currentEpoch(
            Web3.to_checksum_address(settings.data_market),
        )
        # web3_call takes a list of calls; exactly one result is expected back.
        (epoch_fields,) = await web3_call([epoch_query])
        begin = epoch_fields[0]
        end = epoch_fields[1]
        epoch_id = epoch_fields[2]
    except Exception as e:
        rest_logger.exception(
            'Exception in get_current_epoch',
            e=e,
        )
        response.status_code = 500
        error_message = f'Unable to get current epoch, error: {e}'
        return {
            'status': 'error',
            'message': error_message,
        }

    return {
        'begin': begin,
        'end': end,
        'epochId': epoch_id,
    }
@app.get('/epoch/{epoch_id}')
async def get_epoch_info(
    request: Request,
    response: Response,
    epoch_id: int,
):
    """
    Get epoch information for a given epoch ID from the protocol state contract.

    Args:
        request (Request): The incoming request; its ``app.state`` supplies the RPC helper and contract.
        response (Response): The outgoing response; its status code is set to 500 on failure.
        epoch_id (int): The epoch ID for which to retrieve information.

    Returns:
        dict: ``{'timestamp', 'blocknumber', 'epochEnd'}`` taken from positions 0, 1 and 2
        of the contract's ``epochInfo`` result, or ``{'status': 'error', 'message': ...}``
        when the lookup fails.
    """
    try:
        state = request.app.state
        epoch_info_call = state.protocol_state_contract.functions.epochInfo(
            Web3.to_checksum_address(settings.data_market),
            epoch_id,
        )
        # web3_call takes a list of calls; exactly one result is expected back.
        [epoch_info_data] = await state.anchor_rpc_helper.web3_call([epoch_info_call])
        epoch_info = {
            'timestamp': epoch_info_data[0],
            'blocknumber': epoch_info_data[1],
            'epochEnd': epoch_info_data[2],
        }
    except Exception as e:
        # Name this endpoint (not get_current_epoch) so logs and clients can
        # tell which lookup actually failed.
        rest_logger.exception(
            'Exception in get_epoch_info',
            e=e,
        )
        response.status_code = 500
        return {
            'status': 'error',
            'message': f'Unable to get epoch info for epoch_id: {epoch_id}, error: {e}',
        }

    return epoch_info
@app.get('/last_finalized_epoch/{project_id}')
async def get_project_last_finalized_epoch_info(
    request: Request,
    response: Response,
    project_id: str,
):
    """
    Get the last finalized epoch information for a given project.

    Starts at the current epoch ID reported by the contract and walks backwards
    one epoch at a time (down to and including epoch 0) until ``snapshotStatus``
    reports a truthy first field for the project, then returns that epoch's info.

    Args:
        request (Request): The incoming request; its ``app.state`` supplies the RPC helper and contract.
        response (Response): The outgoing response; its status code is set to 404 when no
            finalized epoch is found and to 500 on failure.
        project_id (str): The ID of the project to get the last finalized epoch information for.

    Returns:
        dict: ``{'epochId', 'timestamp', 'blocknumber', 'epochEnd'}`` for the last finalized
        epoch, or ``{'status': 'error', 'message': ...}`` on 404/500.
    """
    try:
        state = request.app.state
        web3_call = state.anchor_rpc_helper.web3_call
        contract_functions = state.protocol_state_contract.functions
        # Convert the data market address once and reuse it for every call, so
        # snapshotStatus receives the same checksummed form as the other calls.
        data_market = Web3.to_checksum_address(settings.data_market)

        [cur_epoch] = await web3_call(
            [contract_functions.currentEpoch(data_market)],
        )

        # Each iteration issues one contract call, so the cost grows with the
        # distance between the current epoch and the last finalized one.
        project_last_finalized_epoch = None
        for epoch_id in range(int(cur_epoch[2]), -1, -1):
            [snapshot_status] = await web3_call(
                [contract_functions.snapshotStatus(data_market, project_id, epoch_id)],
            )
            if snapshot_status[0]:
                project_last_finalized_epoch = epoch_id
                break

        if project_last_finalized_epoch is None:
            response.status_code = 404
            return {
                'status': 'error',
                'message': f'Unable to find last finalized epoch for project {project_id}',
            }

        [epoch_info_data] = await web3_call(
            [contract_functions.epochInfo(data_market, project_last_finalized_epoch)],
        )
        epoch_info = {
            'epochId': project_last_finalized_epoch,
            'timestamp': epoch_info_data[0],
            'blocknumber': epoch_info_data[1],
            'epochEnd': epoch_info_data[2],
        }
    except Exception as e:
        rest_logger.exception(
            'Exception in get_project_last_finalized_epoch_info',
            e=e,
        )
        response.status_code = 500
        return {
            'status': 'error',
            'message': f'Unable to get last finalized epoch for project {project_id}, error: {e}',
        }

    return epoch_info
# get data for epoch_id, project_id
@app.get('/data/{epoch_id}/{project_id}/')
async def get_data_for_project_id_epoch_id(
    request: Request,
    response: Response,
    project_id: str,
    epoch_id: int,
):
    """
    Return the snapshot data stored for a project at a given epoch.

    Args:
        request (Request): The incoming request; its ``app.state`` supplies the contract,
            RPC helper and IPFS reader client.
        response (Response): The outgoing response; its status code is set to 500 when
            IPFS is not configured or the lookup fails, and to 404 when nothing is found.
        project_id (str): The ID of the project.
        epoch_id (int): The ID of the epoch.

    Returns:
        dict: The value produced by ``get_project_epoch_snapshot`` when it is truthy,
        otherwise ``{'status': 'error', 'message': ...}``.
    """
    ipfs_configured = bool(settings.ipfs.url)
    if not ipfs_configured:
        # This endpoint needs the IPFS reader client, which is only created
        # at startup when an IPFS URL is configured.
        response.status_code = 500
        return {
            'status': 'error',
            'message': f'IPFS url not set, /data API endpoint is unusable, please use /cid endpoint instead!',
        }

    try:
        state = request.app.state
        snapshot = await get_project_epoch_snapshot(
            state.protocol_state_contract,
            state.anchor_rpc_helper,
            state.ipfs_reader_client,
            epoch_id,
            project_id,
        )
    except Exception as e:
        rest_logger.exception(
            'Exception in get_data_for_project_id_epoch_id',
            e=e,
        )
        response.status_code = 500
        failure_message = (
            f'Unable to get data for project_id: {project_id},'
            f' epoch_id: {epoch_id}, error: {e}'
        )
        return {
            'status': 'error',
            'message': failure_message,
        }

    if snapshot:
        return snapshot

    response.status_code = 404
    not_found_message = (
        f'No data found for project_id: {project_id},'
        f' epoch_id: {epoch_id}'
    )
    return {
        'status': 'error',
        'message': not_found_message,
    }
# get finalized cid for epoch_id, project_id
@app.get('/cid/{epoch_id}/{project_id}/')
async def get_finalized_cid_for_project_id_epoch_id(
    request: Request,
    response: Response,
    project_id: str,
    epoch_id: int,
):
    """
    Return the finalized CID recorded for a project at a given epoch.

    Args:
        request (Request): The incoming request; its ``app.state`` supplies the contract
            and RPC helper.
        response (Response): The outgoing response; its status code is set to 500 when the
            lookup fails and to 404 when nothing is found.
        project_id (str): The project id.
        epoch_id (int): The epoch id.

    Returns:
        dict: The value produced by ``get_project_finalized_cid`` when it is truthy,
        otherwise ``{'status': 'error', 'message': ...}``.
    """
    try:
        state = request.app.state
        finalized_cid = await get_project_finalized_cid(
            state.protocol_state_contract,
            settings.data_market,
            state.anchor_rpc_helper,
            epoch_id,
            project_id,
        )
    except Exception as e:
        rest_logger.exception(
            'Exception in get_finalized_cid_for_project_id_epoch_id',
            e=e,
        )
        response.status_code = 500
        failure_message = (
            f'Unable to get finalized cid for project_id: {project_id},'
            f' epoch_id: {epoch_id}, error: {e}'
        )
        return {
            'status': 'error',
            'message': failure_message,
        }

    if finalized_cid:
        return finalized_cid

    response.status_code = 404
    not_found_message = (
        f'No finalized cid found for project_id: {project_id},'
        f' epoch_id: {epoch_id}'
    )
    return {
        'status': 'error',
        'message': not_found_message,
    }
@app.post('/task_status')
async def get_task_status_post(
    request: Request,
    response: Response,
    task_status_request: TaskStatusRequest,
):
    """
    Report whether a task has been completed for a given wallet address.

    The project ID is built as ``<task_type>:<lowercased wallet>:<settings.namespace>``
    and the contract's ``lastFinalizedSnapshot`` is queried for it; a value greater
    than 0 is treated as "completed".

    Args:
        request (Request): The incoming request; its ``app.state`` supplies the RPC helper and contract.
        response (Response): The outgoing response; its status code is set to 400 for an
            invalid wallet address and to 500 when the contract lookup fails.
        task_status_request (TaskStatusRequest): The request body containing the task type and wallet address.

    Returns:
        dict: ``{'completed': bool, 'message': str}`` on success, or
        ``{'status': 'error', 'message': ...}`` on 400/500.
    """
    wallet_address = task_status_request.wallet_address
    task_type = task_status_request.task_type

    # Check the wallet address is a valid EVM address. Only the conversion
    # errors are caught, so unrelated failures (and task cancellation) are
    # no longer reported as "invalid address".
    try:
        Web3.to_checksum_address(wallet_address)
    except (TypeError, ValueError):
        response.status_code = 400
        return {
            'status': 'error',
            'message': f'Invalid wallet address: {wallet_address}',
        }

    project_id = f'{task_type}:{wallet_address.lower()}:{settings.namespace}'

    try:
        state = request.app.state
        snapshot_call = state.protocol_state_contract.functions.lastFinalizedSnapshot(
            Web3.to_checksum_address(settings.data_market),
            project_id,
        )
        # web3_call takes a list of calls; exactly one result is expected back.
        [last_finalized_epoch] = await state.anchor_rpc_helper.web3_call([snapshot_call])
    except Exception as e:
        # Name this endpoint (not get_current_epoch) so logs point at the right handler.
        rest_logger.exception(
            'Exception in get_task_status_post',
            e=e,
        )
        response.status_code = 500
        return {
            'status': 'error',
            'message': f'Unable to get last_finalized_epoch, error: {e}',
        }

    if last_finalized_epoch > 0:
        return {
            'completed': True,
            'message': f'Task {task_type} for wallet {wallet_address} was completed in epoch {last_finalized_epoch}',
        }

    return {
        'completed': False,
        'message': f'Task {task_type} for wallet {wallet_address} is not completed yet',
    }