Skip to content

Commit

Permalink
Tracksight Backend Influx Refactor (#1280)
Browse files Browse the repository at this point in the history
### Changelist 
<!-- Give a list of the changes covered in this PR. This will help both
you and the reviewer keep this PR within scope. -->
Does what it says on the tin

### Testing Done
<!-- Outline the testing that was done to demonstrate the changes are
solid. This could be unit tests, integration tests, testing on the car,
etc. Include relevant code snippets, screenshots, etc as needed. -->

### Resolved Tickets
<!-- Link any tickets that this PR resolves. -->

---------

Co-authored-by: Gus Tahara-Edmonds <[email protected]>
  • Loading branch information
Lucien950 and gtaharaedmonds authored May 26, 2024
1 parent 6c33fa7 commit 2b42a87
Show file tree
Hide file tree
Showing 25 changed files with 1,232 additions and 8,023 deletions.
2 changes: 2 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ flask_socketio = "*"
pytest = "*"
asammdf = "*"
requests = "*"
python-dotenv = "*"
influxdb-client = "*"

[dev-packages]
setuptools = "*"
Expand Down
4 changes: 3 additions & 1 deletion environment/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ pandas
flask_socketio
pytest
asammdf
requests
requests
python-dotenv
influxdb-client
62 changes: 31 additions & 31 deletions software/tracksight/backend/app/process/flask_apps/http_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,75 @@
Main REST component of the backend
"""

from typing import Tuple
from flask import Blueprint, jsonify, request, Response
from flask import Blueprint, request

from ..influx_handler import InfluxHandler as influx
from ..influx_handler import NoDataForQueryException
from .. import influx_handler as influx

# HTTP processes for data that is not live
app = Blueprint("http_app", __name__)


def responsify(data):
"""
Manual CORS?????? very bad idea
:param data: Data to JSON-ify.
:returns JSON-ified data response.
"""
response = jsonify(data)
response.headers.add("Access-Control-Allow-Origin", "*")
return response


@app.route("/")
def hello_world() -> str:
def hello_world():
"""
:returns Hello world page for backend.
"""
return "Telemetry Backend is running!"


@app.route("/health")
def health() -> Tuple[Response, int]:
def health():
"""
:returns Health check page for backend.
"""
return jsonify(status="healthy"), 200
return {"status": "healthy"}, 200


@app.route("/signal/measurements", methods=["GET"])
def return_all_measurements() -> Response:
def return_all_measurements():
"""
:returns Page displaying all measurements in the database.
"""
measurements = influx.get_measurements()
return responsify(measurements)
return influx.get_measurements(), 200


@app.route("/signal/measurement/<string:measurement>/fields", methods=["GET"])
def return_all_fields_for_measurement(measurement: str) -> Response:
def return_all_fields_for_measurement(measurement: str):
"""
:param measurement: Measurement to fetch fields for.
:returns Page displaying all fields for a specific measurement.
"""
fields = influx.get_fields(measurement)
return responsify(fields)
return influx.get_fields(measurement), 200


@app.route("/signal/query", methods=["GET"])
def return_query() -> Response:
def return_query():
"""
:returns Page displaying the result of a single query.
"""
params = request.args
measurement = params.get("measurement")
fields = params.get("fields").split(",")
fields: list[str] | None = params.get("fields").split(",")
start_epoch = params.get("start_epoch")
end_epoch = params.get("end_epoch")

if (
measurement is None
or fields is None
or start_epoch is None
or end_epoch is None
):
missing_keys = [
k
for k, v in [
("measurement", measurement),
("fields", fields),
("start_epoch", start_epoch),
("end_epoch", end_epoch),
]
if v is None
]
return {"error": f"Missing parameters: {missing_keys}"}, 400
try:
data = influx.query(measurement, fields, [start_epoch, end_epoch])
except NoDataForQueryException as e:
return responsify({"error": str(e)}), 400
return responsify(data)
return influx.query(measurement, fields, (start_epoch, end_epoch))
except Exception as e:
return {"error": str(e)}, 500
227 changes: 106 additions & 121 deletions software/tracksight/backend/app/process/influx_handler.py
Original file line number Diff line number Diff line change
@@ -1,142 +1,127 @@
"""
Influx database handler class.
File for handling influxdb queries.
This requires the influx dbrc mapping to have db name == bucket name
TODO: Implement proper error handling for things like no data available.
"""

from typing import List, Tuple, Dict
import requests
from dateutil.parser import parse
import os
from typing import List, Tuple, TypedDict

INFLUX_DB_URL = "https://us-east-1-1.aws.cloud2.influxdata.com"
# INFLUX_DB_URL = "http://localhost:8086"
BUCKET = "testing"
TEMP_TOKEN = "pyh_P66tpmkqnfB6IL73p1GVSyiSK_o5_fmt-1KhZ8eYu_WVoyUMddNsHDlozlstS8gZ0WVyuycQtQOCKIIWJQ=="
import influxdb_client

# Configs for InfluxDBClient
INFLUX_DB_ORG = "org1"
# "https://us-east-1-1.aws.cloud2.influxdata.com"
if os.environ.get("USING_DOCKER") == "true":
INFLUX_DB_URL = "http://influx:8086"
else:
INFLUX_DB_URL = "http://localhost:8086"
# "pyh_P66tpmkqnfB6IL73p1GVSyiSK_o5_fmt-1KhZ8eYu_WVoyUMddNsHDlozlstS8gZ0WVyuycQtQOCKIIWJQ=="
INFLUX_DB_TOKEN = os.environ.get("INFLUXDB_TOKEN")
if INFLUX_DB_TOKEN is None:
raise ValueError("INFLUXDB_TOKEN environment variable not set")

class NoDataForQueryException(Exception):
"""
Raised when no data was found for a specific query
"""
# "testing"
INFLUX_DB_VEHICLE_BUCKET = "vehicle"
print(f"Using URL {INFLUX_DB_URL} with token {INFLUX_DB_TOKEN}")

pass
# Checks if the vehicle bucket exists, and if not, creates it
with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as _client:
if _client.buckets_api().find_bucket_by_name(INFLUX_DB_VEHICLE_BUCKET) is None:
_client.buckets_api().create_bucket(bucket_name=INFLUX_DB_VEHICLE_BUCKET)


class InfluxHandler:
def get_measurements(bucket=INFLUX_DB_VEHICLE_BUCKET) -> list[str]:
"""
Class for handling influxdb queries.
This requires the influx dbrc mapping to have db name == bucket name
TODO: Implement proper error handling for things like no data available.
Get all measurements from the database.
:param bucket: Name of bucket to fetch data from.
:returns List of all measurements.
"""

@staticmethod
def _gen_headers() -> Dict:
"""
:returns Required headers for proper database querying.
"""
headers = {
"Authorization": f"Token {TEMP_TOKEN}",
"Content-type": "application/json",
}
return headers

@staticmethod
def get_measurements(db: str = BUCKET) -> List:
"""
Get all measurements from the database.
:param db: Name of bucket to fetch data from.
:returns List of all measurements.
"""
headers = InfluxHandler._gen_headers()
params = {
"db": db,
"q": f"SHOW MEASUREMENTS ON {db}",
}
response = requests.get(
f"{INFLUX_DB_URL}/query", headers=headers, params=params
)

results = response.json()["results"][0]
# Very jank, not sure if this is correct
return [measurement["values"][0][0] for measurement in results["series"]]

@staticmethod
def get_fields(measurement: str, db: str = BUCKET) -> List:
"""
Get all fields from a measurement.
:param measurement: Measurement to fetch fields from.
:param db: Name of bucket to fetch data from.
:return: List of all fields.
with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as client:
return [
str(i[0]) # "i" is an array of form ["_result", 0, <measurement_name>]
for i in client.query_api()
.query(
f"""
import \"influxdata/influxdb/schema\"
schema.measurements(bucket: \"{bucket}\")
"""
headers = InfluxHandler._gen_headers()
params = {
"db": db,
"q": f"SHOW FIELD KEYS ON {db} FROM {measurement}",
}
response = requests.get(
f"{INFLUX_DB_URL}/query", headers=headers, params=params
)
)
.to_values(columns=["_value"])
]

# lol
results = response.json()["results"][0]["series"][0]["values"]
return [field_pair[0] for field_pair in results]

@staticmethod
def query(
measurement: str,
fields: List[str],
time_range: Tuple[int, int],
db: str = BUCKET,
max_points: int = 8000,
ms_resolution: int = 100,
) -> Dict:
"""
Make a general query to the database.
:param measurement: Measurement to pull data from.
:param fields: Fields to fetch.
:param time_range: Tuple like (time start, time end) to specify the time interval.
:param db: Name of bucket to fetch data from.
:param max_points: Maximum number of datapoints to fetch.
:param ms_resolution: Minimum time delta required before grabbing a new datapoint.
:return:
def get_fields(measurement: str, bucket: str = INFLUX_DB_VEHICLE_BUCKET) -> list[str]:
"""
Get all fields from a measurement.
:param measurement: Measurement to fetch fields from.
:param bucket: Name of bucket to fetch data from.
:return: List of all fields.
"""
with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as client:
return [
str(i[0]) # "i" is an array of form ["_result", 0, <measurement_name>]
for i in client.query_api()
.query(
f"""
import \"influxdata/influxdb/schema\"
schema.measurementFieldKeys(bucket: \"{bucket}\", measurement: \"{measurement}\")
"""
headers = InfluxHandler._gen_headers()
fields_string = ",".join(fields)
query = f"SELECT {fields_string} FROM {measurement} WHERE time >= {time_range[0]} AND time <= {time_range[1]} tz('America/Los_Angeles')"
params = {
"db": db,
"q": query,
}
response = requests.get(
f"{INFLUX_DB_URL}/query", headers=headers, params=params
)

results = response.json()["results"][0]
if "series" not in results:
raise NoDataForQueryException("No data found for this query")

results = results["series"][0]
columns = results["columns"]
values = results["values"]

data = {column: {"time": [], "value": []} for column in columns[1:]}
prev_time = parse(values[0][0])
cur_point = 0
)
.to_values(columns=["_value"])
]

for value in values:
nextTime = parse(value[0])
time_delta = nextTime - prev_time

if cur_point > max_points:
break

if time_delta.total_seconds() <= ms_resolution / 1000:
continue
class TimeValue(TypedDict):
"""
TypedDict for the time and value fields in a query response.
"""

for i in range(1, len(value)):
column = columns[i]
data[column]["time"].append(value[0])
data[column]["value"].append(value[i])
times: list[str]
values: list[str]

prev_time = nextTime
cur_point += 1

return data
def query(
measurement: str,
fields: List[str],
time_range: Tuple[str, str],
bucket: str = INFLUX_DB_VEHICLE_BUCKET,
max_points: int = 8000, # TODO implement
ms_resolution: int = 100, # TODO implement
) -> dict[str, TimeValue]:
"""
Make a general query to the database.
:param measurement: Measurement to pull data from.
:param fields: Fields to fetch.
:param time_range: Tuple like (time start, time end) to specify the time interval.
:param bucket: Name of bucket to fetch data from.
:param max_points: Maximum number of datapoints to fetch.
:param ms_resolution: Minimum time delta required before grabbing a new datapoint.
:return: A dictionary where the keys are the fields and the values are TimeValue objects.
"""
out: dict[str, TimeValue] = {field: {"times": [], "values": []} for field in fields}
with influxdb_client.InfluxDBClient(
url=INFLUX_DB_URL, token=INFLUX_DB_TOKEN, org=INFLUX_DB_ORG
) as client:
for field, value, time in (
client.query_api()
.query(
f"""
from(bucket:"{bucket}")
|> range(start: {time_range[0]}, stop: {time_range[1]})
|> filter(fn: (r) => r._measurement == "{measurement}" and contains(value: r._field, set: {str(fields).replace("'", '"')}))
"""
)
.to_values(columns=["_field", "_value", "_time"])
):
out[field]["times"].append(time)
out[field]["values"].append(value)
return out
Loading

0 comments on commit 2b42a87

Please sign in to comment.