Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: observation result calculation based on formula and condition #428

Merged
merged 2 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 120 additions & 1 deletion healthcare/healthcare/doctype/observation/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from frappe import _
from frappe.model.document import Document
from frappe.model.workflow import get_workflow_name, get_workflow_state_field
from frappe.utils import now_datetime
from frappe.utils import flt, get_link_to_form, getdate, now_datetime, nowdate

from erpnext.setup.doctype.terms_and_conditions.terms_and_conditions import (
get_terms_and_conditions,
Expand All @@ -25,6 +25,12 @@ def validate(self):

def on_update(self):
set_diagnostic_report_status(self)
if (
self.parent_observation
and self.result_data
and self.permitted_data_type in ["Quantity", "Numeric"]
):
set_calculated_result(self)

def before_insert(self):
set_observation_idx(self)
Expand Down Expand Up @@ -482,3 +488,116 @@ def set_diagnostic_report_status(doc):
frappe.db.set_value(
"Diagnostic Report", diagnostic_report.get("name"), set_value_dict, update_modified=False
)


def set_calculated_result(doc):
if doc.parent_observation:
parent_template = frappe.db.get_value(
"Observation", doc.parent_observation, "observation_template"
)
parent_template_doc = frappe.get_cached_doc("Observation Template", parent_template)

data = frappe._dict()
patient_doc = frappe.get_cached_doc("Patient", doc.patient).as_dict()
settings = frappe.get_cached_doc("Healthcare Settings").as_dict()

data.update(doc.as_dict())
data.update(parent_template_doc.as_dict())
data.update(patient_doc)
data.update(settings)

for component in parent_template_doc.observation_component:
"""
Data retrieval from observations has been moved into the loop
to accommodate component observations, which may contain formulas
utilizing results from previous iterations.

"""
if component.based_on_formula and component.formula:
obs_data = get_data(doc, parent_template_doc)
else:
continue

if obs_data and len(obs_data) > 0:
data.update(obs_data)
result = eval_condition_and_formula(component, data)
if not result:
continue

result_observation_name, result_data = frappe.db.get_value(
"Observation",
{
"parent_observation": doc.parent_observation,
"observation_template": component.get("observation_template"),
},
["name", "result_data"],
)
if result_observation_name and result_data != str(result):
frappe.db.set_value(
"Observation",
result_observation_name,
"result_data",
str(result),
)


def get_data(doc, parent_template_doc):
data = frappe._dict()
observation_details = frappe.get_all(
"Observation",
{"parent_observation": doc.parent_observation},
["observation_template", "result_data"],
)

# to get all result_data to map against abbs of all table rows
for component in parent_template_doc.observation_component:
result = [
d["result_data"]
for d in observation_details
if (d["observation_template"] == component.get("observation_template") and d["result_data"])
]
data[component.get("abbr")] = flt(result[0]) if (result and len(result) > 0 and result[0]) else 0
return data


def eval_condition_and_formula(d, data):
try:
if d.get("condition"):
cond = d.get("condition")
parts = cond.strip().splitlines()
condition = " ".join(parts)
if condition:
if not frappe.safe_eval(condition, data):
return None

if d.based_on_formula:
amount = None
formula = d.formula.strip().replace("\n", " ") if d.formula else None
operands = re.split(r"\W+", formula)
abbrs = [operand for operand in operands if re.search(r"[a-zA-Z]", operand)]
if "age" in abbrs and data.get("dob"):
age = (
getdate(nowdate()).year
- data.get("dob").year
- (
(getdate(nowdate()).month, getdate(nowdate()).day)
< (data.get("dob").month, data.get("dob").day)
)
)
if age > 0:
data["age"] = age

# check the formula abbrs has result value
abbrs_present = all(abbr in data and data[abbr] != 0 for abbr in abbrs)
if formula and abbrs_present:
amount = flt(frappe.safe_eval(formula, {}, data))

return amount

except Exception as err:
description = _("This error can be due to invalid formula.")
message = _(
"""Error while evaluating the {0} {1} at row {2}. <br><br> <b>Error:</b> {3}
<br><br> <b>Hint:</b> {4}"""
).format(d.parenttype, get_link_to_form(d.parenttype, d.parent), d.idx, err, description)
frappe.throw(message, title=_("Error in formula"))
187 changes: 186 additions & 1 deletion healthcare/healthcare/doctype/observation/test_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# See license.txt

import frappe
from frappe.custom.doctype.custom_field.custom_field import create_custom_fields
from frappe.tests.utils import FrappeTestCase
from frappe.utils import getdate, nowtime
from frappe.utils import flt, getdate, nowtime

from healthcare.healthcare.doctype.healthcare_settings.healthcare_settings import (
get_income_account,
Expand All @@ -20,6 +21,9 @@


class TestObservation(FrappeTestCase):
def setUp(self):
clear_table()

def test_single_observation_from_invoice(self):
frappe.db.set_single_value("Healthcare Settings", "create_observation_on_si_submit", 1)
obs_name = "Total Cholesterol"
Expand Down Expand Up @@ -184,6 +188,13 @@ def test_observation_from_encounter(self):
)
)

def test_with_formula(self):
patient = create_patient()
with_correct_formula(self, patient=patient)
with_incorrect_operand(self, patient)
with_custom_field_in_patient(self, patient)
with_condition_patient(self, patient)


def create_sales_invoice(patient, item):
sales_invoice = frappe.new_doc("Sales Invoice")
Expand Down Expand Up @@ -224,3 +235,177 @@ def create_patient_encounter(patient, observation_template):

patient_encounter.submit()
return patient_encounter


def observation_with_formula(**kwargs):
idx = 1
obs_name = "Test Observation"
operator = kwargs.get("operator")
custom_formula = kwargs.get("custom_formula")
condition1 = kwargs.get("condition1")
condition2 = kwargs.get("condition2")
obs_template = create_grouped_observation_template(obs_name, idx)
first_abbr = obs_template.observation_component[0].abbr
first_obs_template = obs_template.observation_component[0].observation_template

obs_template_component = create_observation_template("Observation Comp ", idx + 2)
obs_template_component_1 = create_observation_template("Observation Comp ", idx + 3)

obs_template.append(
"observation_component",
{
"observation_template": obs_template_component.name,
},
)

obs_template.append(
"observation_component",
{
"observation_template": obs_template_component_1.name,
"based_on_formula": True,
"formula": f"{first_abbr}{operator}{obs_template_component.abbr} {'+ test_custom_field' if custom_formula else ''}",
"condition": condition1,
},
)

if condition2:
obs_template.append(
"observation_component",
{
"observation_template": obs_template_component_1.name,
"abbr": "TC5",
"based_on_formula": True,
"formula": f"{first_abbr}-{obs_template_component.abbr}",
"condition": condition2,
},
)

obs_template.save()

create_sales_invoice(kwargs.get("patient"), obs_template.name)
child_obs_1 = frappe.db.get_value(
"Observation", {"observation_template": first_obs_template}, "name"
)
child_obs_2 = frappe.db.get_value(
"Observation", {"observation_template": obs_template_component.name}, "name"
)
if not kwargs.get("operand_1_db_set"):
child_obs_1_doc = frappe.get_doc("Observation", child_obs_1)
child_obs_1_doc.result_data = str(kwargs.get("input_value_1"))
child_obs_1_doc.save()
else:
frappe.db.set_value("Observation", child_obs_2, "result_data", str(kwargs.get("input_value_1")))

child_obs_2_doc = frappe.get_doc("Observation", child_obs_2)
child_obs_2_doc.result_data = str(kwargs.get("input_value_2"))
child_obs_2_doc.save()

result_value = frappe.db.get_value(
"Observation",
{"observation_template": obs_template_component_1.name},
"result_data",
)
if kwargs.get("operand_1_db_set"):
return obs_template_component_1.name

return result_value


def with_correct_formula(self, **kwargs):
clear_table()
custom_formula = ""
if kwargs.get("patient_custom_formula"):
custom_formula = kwargs.get("patient_custom_formula")

input_value_1 = kwargs.get("value1") if kwargs.get("value1") else 5
input_value_2 = kwargs.get("value2") if kwargs.get("value2") else 2
operator = "+"

result = frappe.safe_eval(str(input_value_1) + operator + str(input_value_2) + custom_formula)
result_value = observation_with_formula(
patient=kwargs.get("patient"),
input_value_1=input_value_1,
input_value_2=input_value_2,
operator=operator,
operand_1_db_set=False,
custom_formula=custom_formula,
condition1=kwargs.get("condition1"),
condition2=kwargs.get("condition2"),
)

if kwargs.get("condition2"):
return result_value

self.assertEqual(flt(result_value), result)


def with_incorrect_operand(self, patient):
clear_table()
input_value_1 = "a"
input_value_2 = 8
operator = "*"
result_observ_temp = observation_with_formula(
patient=patient,
input_value_1=input_value_1,
input_value_2=input_value_2,
operator=operator,
operand_1_db_set=True,
)
self.assertTrue(
frappe.db.exists(
"Observation", {"observation_template": result_observ_temp, "result_data": None}
)
)


def with_custom_field_in_patient(self, patient):
clear_table()
custom_fields = {
"Patient": [
dict(
fieldname="test_custom_field",
label="Test Calculation",
fieldtype="Int",
),
]
}
create_custom_fields(custom_fields, update=True)
custom_field_value = 10
frappe.db.set_value("Patient", patient, "test_custom_field", custom_field_value)

with_correct_formula(self, patient=patient, patient_custom_formula=f"+{custom_field_value}")


def with_condition_patient(self, patient):
clear_table()
condition1 = "gender=='Male'"
condition2 = "gender=='Female'"
result = with_correct_formula(
self, patient=patient, condition1=condition1, condition2=condition2, value1=7, value2=5
)
# equation is 7-5 result must be 2 for Female as Patient is Female
self.assertEqual(flt(result), 2)


def clear_table():
frappe.db.sql("""delete from `tabObservation Template`""")
frappe.db.sql("""delete from `tabObservation`""")
frappe.db.sql("""delete from `tabObservation Component`""")
frappe.db.sql(
"""
delete from `tabItem`
where
name like '%Observation%'
or name like '%CBC%'
or name like '%Cholesterol%'
"""
)
frappe.db.sql(
"""
delete from `tabItem Price`
where
item_code like '%Observation%'
or item_code like '%CBC%'
or item_code like '%Cholesterol%'
"""
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
"field_order": [
"observation_template",
"abbr",
"condition",
"column_break_seht",
"based_on_formula",
"formula"
"formula",
"note"
],
"fields": [
{
Expand All @@ -30,7 +32,7 @@
"fieldname": "based_on_formula",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Based On Formula"
"label": "Based on Condition and Formula"
},
{
"depends_on": "based_on_formula",
Expand All @@ -47,12 +49,24 @@
"in_list_view": 1,
"label": "Abbr",
"read_only": 1
},
{
"depends_on": "based_on_formula",
"fieldname": "condition",
"fieldtype": "Code",
"label": "Condition"
},
{
"depends_on": "based_on_formula",
"fieldname": "note",
"fieldtype": "HTML",
"options": "<p>Notes:</p>\n\n<ol>\n<li>Keywords Available : \n<code>age</code></li>\n<li>Tables Available : <code>Patient, Healthcare Settings, Observation, Observation Template</code></li>\n</ol>"
}
],
"index_web_pages_for_search": 1,
"istable": 1,
"links": [],
"modified": "2023-07-16 19:13:13.918443",
"modified": "2024-04-09 11:52:55.556596",
"modified_by": "Administrator",
"module": "Healthcare",
"name": "Observation Component",
Expand Down
Loading