Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Data model cache refactoring #3527

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 127 additions & 66 deletions src/ansys/fluent/core/data_model_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import copy
from enum import Enum
from threading import RLock
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from ansys.api.fluent.v0.variant_pb2 import Variant
from ansys.fluent.core.utils.fluent_version import FluentVersion
Expand Down Expand Up @@ -206,28 +206,34 @@ def _update_cache_from_variant_state(
source: Dict[str, StateType],
key: str,
state: Variant,
updaterFn,
updater_fn,
rules_str: str,
version,
):
if state.HasField("bool_state"):
updaterFn(source, key, state.bool_state)
elif state.HasField("int64_state"):
updaterFn(source, key, state.int64_state)
elif state.HasField("double_state"):
updaterFn(source, key, state.double_state)
elif state.HasField("string_state"):
updaterFn(source, key, state.string_state)
elif state.HasField("bool_vector_state"):
updaterFn(source, key, state.bool_vector_state.item)
elif state.HasField("int64_vector_state"):
updaterFn(source, key, state.int64_vector_state.item)
elif state.HasField("double_vector_state"):
updaterFn(source, key, state.double_vector_state.item)
elif state.HasField("string_vector_state"):
updaterFn(source, key, state.string_vector_state.item)
elif state.HasField("variant_vector_state"):
updaterFn(source, key, [])
# Helper function to update the source with the state value
def update_source_with_state(state_field):
if state.HasField(state_field):
updater_fn(source, key, getattr(state, state_field))
return True
return False

# Check for basic state types
for state_type in [
"bool_state",
"int64_state",
"double_state",
"string_state",
"bool_vector_state",
"int64_vector_state",
"double_vector_state",
"string_vector_state",
]:
if update_source_with_state(state_type):
return

# Handle variant vector state
if state.HasField("variant_vector_state"):
updater_fn(source, key, [])
for item in state.variant_vector_state.item:
self._update_cache_from_variant_state(
rules,
Expand All @@ -238,34 +244,28 @@ def _update_cache_from_variant_state(
rules_str + "/" + key.split(":", maxsplit=1)[0],
version,
)
elif state.HasField("variant_map_state"):
return

# Handle variant map state
if state.HasField("variant_map_state"):
internal_names_as_keys = (
self.get_config(rules, "name_key") == NameKey.INTERNAL
)

# Determine the appropriate key
if ":" in key:
type_, iname = key.split(":", maxsplit=1)
for k1, v1 in source.items():
if (internal_names_as_keys and k1 == key) or (
(not internal_names_as_keys)
and isinstance(v1, dict)
and v1.get(NameKey.INTERNAL.value) == iname
):
key = k1
break
else: # new named object
if internal_names_as_keys:
source[key] = {}
else:
name = state.variant_map_state.item[
NameKey.DISPLAY.value
].string_state
key = f"{type_}:{name}"
source[key] = {NameKey.INTERNAL.value: iname}
key = self._determine_key(
source, internal_names_as_keys, key, state, type_, iname
)
else:
if key not in source:
source[key] = {}

if version and _is_dict_parameter_type(version, rules, rules_str):
source[key] = {}

# Update the source with items from the variant map state
if state.variant_map_state.item:
source = source[key]
for k, v in state.variant_map_state.item.items():
Expand All @@ -280,8 +280,40 @@ def _update_cache_from_variant_state(
)
else:
source[key] = {}

# Default case when no fields are matched
else:
updaterFn(source, key, None)
updater_fn(source, key, None)

def _determine_key(
self,
source: Dict[str, StateType],
internal_names_as_keys: bool,
key: str,
state: Variant,
type_: str,
iname: str,
) -> str:
"""Determine the appropriate key based on internal naming conventions."""
for k1, v1 in source.items():
if (internal_names_as_keys and k1 == key) or (
(not internal_names_as_keys)
and isinstance(v1, dict)
and v1.get(NameKey.INTERNAL.value) == iname
):
return k1 # Found a matching key

# If no match found and internal naming is used
if internal_names_as_keys:
source[key] = {}
return key

# If no match found and external naming is used
name = state.variant_map_state.item[NameKey.DISPLAY.value].string_state
new_key = f"{type_}:{name}"
source[new_key] = {NameKey.INTERNAL.value: iname}

return new_key

def update_cache(
self, rules: str, state: Variant, deleted_paths: List[str], version=None
Expand All @@ -300,37 +332,16 @@ def update_cache(
Fluent version
"""
cache = self.rules_str_to_cache[rules]

with self._with_lock(rules):
internal_names_as_keys = (
self.get_config(rules, "name_key") == NameKey.INTERNAL
)
for deleted_path in deleted_paths:
comps = [x for x in deleted_path.split("/") if x]
sub_cache = cache
for i, comp in enumerate(comps):
if ":" in comp:
_, iname = comp.split(":", maxsplit=1)
key_to_del = None
for k, v in sub_cache.items():
if (internal_names_as_keys and k == comp) or (
(not internal_names_as_keys)
and isinstance(v, dict)
and v.get(NameKey.INTERNAL.value) == iname
):
if i == len(comps) - 1:
key_to_del = k
else:
sub_cache = v
break
else:
break
if key_to_del:
del sub_cache[key_to_del]
else:
if comp in sub_cache:
sub_cache = sub_cache[comp]
else:
break

# Process deleted paths
self._process_deleted_paths(cache, deleted_paths, internal_names_as_keys)

# Update cache with new state items
for k, v in state.variant_map_state.item.items():
self._update_cache_from_variant_state(
rules,
Expand All @@ -342,6 +353,56 @@ def update_cache(
version,
)

def _process_deleted_paths(
self,
cache: Dict[str, Any],
deleted_paths: List[str],
internal_names_as_keys: bool,
):
"""Process and delete paths from the cache based on the deleted paths list."""
for deleted_path in deleted_paths:
comps = [x for x in deleted_path.split("/") if x]
self._delete_from_cache(cache, comps, internal_names_as_keys)

def _delete_from_cache(
self, sub_cache: Dict[str, Any], comps: List[str], internal_names_as_keys: bool
):
"""Recursively delete components from the cache."""
for i, comp in enumerate(comps):
if ":" in comp:
_, iname = comp.split(":", maxsplit=1)
key_to_del = self._find_key_to_delete(
sub_cache, comp, iname, i == len(comps) - 1, internal_names_as_keys
)
if key_to_del:
del sub_cache[key_to_del]
return # Exit after deletion
else:
if comp in sub_cache:
sub_cache = sub_cache[comp]
else:
break

def _find_key_to_delete(
self,
sub_cache: Dict[str, Any],
comp: str,
iname: str,
is_last_component: bool,
internal_names_as_keys: bool,
) -> Optional[str]:
"""Find the key to delete from the sub-cache."""
for k, v in sub_cache.items():
if (internal_names_as_keys and k == comp) or (
(not internal_names_as_keys)
and isinstance(v, dict)
and v.get(NameKey.INTERNAL.value) == iname
):
return (
k if is_last_component else None
) # Return key if it's the last component
return None # No key found to delete

@staticmethod
def _dm_path_comp(comp):
return ":".join(comp) if comp[1] else comp[0]
Expand Down