Showing 4 changed files with 138 additions and 136 deletions.
software/tracksight/backend/app/process/influx_handler.py
243 changes: 117 additions & 126 deletions
@@ -1,142 +1,133 @@
 """
-Influx database handler class.
+File for handling influxdb queries.
+This requires the influx dbrc mapping to have db name == bucket name
+TODO: Implement proper error handling for things like no data available.
 """

-from typing import List, Tuple, Dict
-import requests
-from dateutil.parser import parse
+import os
+from typing import List, Tuple, TypedDict
+
+import influxdb_client

-INFLUX_DB_URL = "https://us-east-1-1.aws.cloud2.influxdata.com"
-# INFLUX_DB_URL = "http://localhost:8086"
-BUCKET = "testing"
-TEMP_TOKEN = "pyh_P66tpmkqnfB6IL73p1GVSyiSK_o5_fmt-1KhZ8eYu_WVoyUMddNsHDlozlstS8gZ0WVyuycQtQOCKIIWJQ=="
+# Configs for InfluxDBClient
+INFLUX_DB_ORG = "org1"
+# "https://us-east-1-1.aws.cloud2.influxdata.com"
+INFLUX_DB_URL = "http://localhost:8086"
+# "pyh_P66tpmkqnfB6IL73p1GVSyiSK_o5_fmt-1KhZ8eYu_WVoyUMddNsHDlozlstS8gZ0WVyuycQtQOCKIIWJQ=="
+INFLUX_DB_TOKEN = os.environ.get("INFLUXDB_TOKEN")
+if INFLUX_DB_TOKEN is None:
+    raise ValueError("INFLUXDB_TOKEN environment variable not set")
+
+
+# "testing"
+INFLUX_DB_DEFAULT_BUCKET_ID = "example-bucket"
+
+# TODO consider if we should use the context manager as a wrapper for the client
+# with InfluxDBClient(url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG) as client:
+# INFLUX_DB_CLIENT = influxdb_client.InfluxDBClient(
+#     url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
+# )
+
+# Checks if the vehicle bucket exists, and if not, creates it
+with influxdb_client.InfluxDBClient(
+    url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
+) as _client:
+    if _client.buckets_api().find_bucket_by_name(INFLUX_DB_DEFAULT_BUCKET_ID) is None:
+        _client.buckets_api().create_bucket(bucket_name=INFLUX_DB_DEFAULT_BUCKET_ID)

-class NoDataForQueryException(Exception):
-    """
-    Raised when no data was found for a specific query
-    """
-
-    pass
-
-class InfluxHandler:
-    """
-    Class for handling influxdb queries.
-    This requires the influx dbrc mapping to have db name == bucket name
-    TODO: Implement proper error handling for things like no data available.
-    """
-
-    @staticmethod
-    def _gen_headers() -> Dict:
-        """
-        :returns Required headers for proper database querying.
-        """
-        headers = {
-            "Authorization": f"Token {TEMP_TOKEN}",
-            "Content-type": "application/json",
-        }
-        return headers
-
-    @staticmethod
-    def get_measurements(db: str = BUCKET) -> List:
-        """
-        Get all measurements from the database.
-        :param db: Name of bucket to fetch data from.
-        :returns List of all measurements.
-        """
-        headers = InfluxHandler._gen_headers()
-        params = {
-            "db": db,
-            "q": f"SHOW MEASUREMENTS ON {db}",
-        }
-        response = requests.get(
-            f"{INFLUX_DB_URL}/query", headers=headers, params=params
-        )
-
-        results = response.json()["results"][0]
-        # Very jank, not sure if this is correct
-        return [measurement["values"][0][0] for measurement in results["series"]]
+def get_measurements(bucket=INFLUX_DB_DEFAULT_BUCKET_ID) -> list[str]:
+    """
+    Get all measurements from the database.
+    :param bucket: Name of bucket to fetch data from.
+    :returns List of all measurements.
+    """
+    with influxdb_client.InfluxDBClient(
+        url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
+    ) as client:
+        return [
+            str(i[0])  # "i" is an array of form ["_result", 0, <measurement_name>]
+            for i in client.query_api()
+            .query(
+                f"""
+                import \"influxdata/influxdb/schema\"
+                schema.measurements(bucket: \"{bucket}\")
+                """
+            )
+            .to_values(columns=["_value"])
+        ]

-    @staticmethod
-    def get_fields(measurement: str, db: str = BUCKET) -> List:
-        """
-        Get all fields from a measurement.
-        :param measurement: Measurement to fetch fields from.
-        :param db: Name of bucket to fetch data from.
-        :return: List of all fields.
-        """
-        headers = InfluxHandler._gen_headers()
-        params = {
-            "db": db,
-            "q": f"SHOW FIELD KEYS ON {db} FROM {measurement}",
-        }
-        response = requests.get(
-            f"{INFLUX_DB_URL}/query", headers=headers, params=params
-        )
-
-        # lol
-        results = response.json()["results"][0]["series"][0]["values"]
-        return [field_pair[0] for field_pair in results]
+def get_fields(
+    measurement: str, bucket: str = INFLUX_DB_DEFAULT_BUCKET_ID
+) -> list[str]:
+    """
+    Get all fields from a measurement.
+    :param measurement: Measurement to fetch fields from.
+    :param bucket: Name of bucket to fetch data from.
+    :return: List of all fields.
+    """
+    with influxdb_client.InfluxDBClient(
+        url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
+    ) as client:
+        return [
+            str(i[0])  # "i" is an array of form ["_result", 0, <measurement_name>]
+            for i in client.query_api()
+            .query(
+                f"""
+                import \"influxdata/influxdb/schema\"
+                schema.measurementFieldKeys(bucket: \"{bucket}\", measurement: \"{measurement}\")
+                """
+            )
+            .to_values(columns=["_value"])
+        ]

-    @staticmethod
-    def query(
-        measurement: str,
-        fields: List[str],
-        time_range: Tuple[int, int],
-        db: str = BUCKET,
-        max_points: int = 8000,
-        ms_resolution: int = 100,
-    ) -> Dict:
-        """
-        Make a general query to the database.
-        :param measurement: Measurement to pull data from.
-        :param fields: Fields to fetch.
-        :param time_range: Tuple like (time start, time end) to specify the time interval.
-        :param db: Name of bucket to fetch data from.
-        :param max_points: Maximum number of datapoints to fetch.
-        :param ms_resolution: Minimum time delta required before grabbing a new datapoint.
-        :return:
-        """
-        headers = InfluxHandler._gen_headers()
-        fields_string = ",".join(fields)
-        query = f"SELECT {fields_string} FROM {measurement} WHERE time >= {time_range[0]} AND time <= {time_range[1]} tz('America/Los_Angeles')"
-        params = {
-            "db": db,
-            "q": query,
-        }
-        response = requests.get(
-            f"{INFLUX_DB_URL}/query", headers=headers, params=params
-        )
-
-        results = response.json()["results"][0]
-        if "series" not in results:
-            raise NoDataForQueryException("No data found for this query")
-
-        results = results["series"][0]
-        columns = results["columns"]
-        values = results["values"]
-
-        data = {column: {"time": [], "value": []} for column in columns[1:]}
-        prev_time = parse(values[0][0])
-        cur_point = 0
-
-        for value in values:
-            nextTime = parse(value[0])
-            time_delta = nextTime - prev_time
-
-            if cur_point > max_points:
-                break
-
-            if time_delta.total_seconds() <= ms_resolution / 1000:
-                continue
-
-            for i in range(1, len(value)):
-                column = columns[i]
-                data[column]["time"].append(value[0])
-                data[column]["value"].append(value[i])
-
-            prev_time = nextTime
-            cur_point += 1
-
-        return data
+class TimeValue(TypedDict):
+    """
+    TypedDict for the time and value fields in a query response.
+    """
+
+    times: list[str]
+    values: list[str]
+
+
+def query(
+    measurement: str,
+    fields: List[str],
+    time_range: Tuple[int, int],
+    bucket: str = INFLUX_DB_DEFAULT_BUCKET_ID,
+    max_points: int = 8000,
+    ms_resolution: int = 100,
+) -> dict[str, TimeValue]:
+    """
+    Make a general query to the database.
+    :param measurement: Measurement to pull data from.
+    :param fields: Fields to fetch.
+    :param time_range: Tuple like (time start, time end) to specify the time interval.
+    :param bucket: Name of bucket to fetch data from.
+    :param max_points: Maximum number of datapoints to fetch.
+    :param ms_resolution: Minimum time delta required before grabbing a new datapoint.
+    :return:
+    """
+    out: dict[str, TimeValue] = {field: {"times": [], "values": []} for field in fields}
+    with influxdb_client.InfluxDBClient(
+        url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
+    ) as client:
+        for field, value, time in (
+            client.query_api()
+            .query(
+                f"""
+                from(bucket:"{bucket}")
+                |> range(start: {time_range[0]}, stop: {time_range[1]})
+                |> filter(fn: (r) => r._measurement == "{measurement}" and contains(value: r._field, set: {str(fields).replace("'", '"')}))
+                """
+            )
+            .to_values(columns=["_field", "_value", "_time"])
+        ):
+            out[field]["times"].append(time)
+            out[field]["values"].append(value)
+    return out
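
For context, a minimal usage sketch of the new module-level API (untested; it assumes a local InfluxDB at http://localhost:8086, INFLUXDB_TOKEN exported before import, and that the module is importable as process.influx_handler — the import path is a guess from the repo layout):

    import time

    from process import influx_handler  # hypothetical import path

    measurements = influx_handler.get_measurements()     # e.g. ["vehicle_speed", ...]
    fields = influx_handler.get_fields(measurements[0])  # field keys of that measurement

    # Fetch the last hour of data for every field of the first measurement.
    # Flux's range() accepts Unix-second integers, which is what query() interpolates.
    now = int(time.time())
    data = influx_handler.query(
        measurement=measurements[0],
        fields=fields,
        time_range=(now - 3600, now),
    )
    for field, series in data.items():
        print(field, len(series["times"]), "points")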
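One behavioral note on the rewrite: the deleted requests-based query() downsampled client-side (skipping points closer together than ms_resolution and stopping after max_points), while the new Flux version fetches every raw point and accepts both parameters without using them. If that thinning is still wanted, Flux can do it server-side. A hypothetical fragment (not part of this commit) that could be appended to the Flux pipeline in query():

    # Hypothetical server-side downsampling, approximating the old
    # ms_resolution / max_points behavior; values shown are the old defaults.
    ms_resolution = 100
    max_points = 8000
    downsample = f"""
        |> aggregateWindow(every: {ms_resolution}ms, fn: last, createEmpty: false)
        |> limit(n: {max_points})
    """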