Skip to content

Commit

Permalink
fixtures: minor cleanups to reorder_items
Browse files Browse the repository at this point in the history
This makes some minor clarity and performance improvements to the code.
  • Loading branch information
bluetech committed Jun 2, 2024
1 parent 9802183 commit 1eee63a
Showing 1 changed file with 65 additions and 60 deletions.
125 changes: 65 additions & 60 deletions src/_pytest/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import MutableMapping
from typing import NoReturn
from typing import Optional
Expand Down Expand Up @@ -161,6 +162,12 @@ def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]:
)


# Algorithm for sorting on a per-parametrized resource setup basis.
# It is called for Session scope first and performs sorting
# down to the lower scopes such as to minimize number of "high scope"
# setups and teardowns.


@dataclasses.dataclass(frozen=True)
class FixtureArgKey:
argname: str
Expand All @@ -169,97 +176,91 @@ class FixtureArgKey:
item_cls: Optional[type]


def get_parametrized_fixture_keys(
_V = TypeVar("_V")
OrderedSet = Dict[_V, None]


def get_parametrized_fixture_argkeys(
item: nodes.Item, scope: Scope
) -> Iterator[FixtureArgKey]:
"""Return list of keys for all parametrized arguments which match
the specified scope."""
assert scope is not Scope.Function

try:
callspec: CallSpec2 = item.callspec # type: ignore[attr-defined]
except AttributeError:
return

item_cls = None
if scope is Scope.Session:
scoped_item_path = None
elif scope is Scope.Package:
# Package key = module's directory.
scoped_item_path = item.path.parent
elif scope is Scope.Module:
scoped_item_path = item.path
elif scope is Scope.Class:
scoped_item_path = item.path
item_cls = item.cls # type: ignore[attr-defined]
else:
assert_never(scope)

for argname in callspec.indices:
if callspec._arg2scope[argname] != scope:
continue

item_cls = None
if scope is Scope.Session:
scoped_item_path = None
elif scope is Scope.Package:
# Package key = module's directory.
scoped_item_path = item.path.parent
elif scope is Scope.Module:
scoped_item_path = item.path
elif scope is Scope.Class:
scoped_item_path = item.path
item_cls = item.cls # type: ignore[attr-defined]
else:
assert_never(scope)

param_index = callspec.indices[argname]
yield FixtureArgKey(argname, param_index, scoped_item_path, item_cls)


# Algorithm for sorting on a per-parametrized resource setup basis.
# It is called for Session scope first and performs sorting
# down to the lower scopes such as to minimize number of "high scope"
# setups and teardowns.


def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]:
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]] = {}
argkeys_by_item: Dict[Scope, Dict[nodes.Item, OrderedSet[FixtureArgKey]]] = {}
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, Deque[nodes.Item]]] = {}
for scope in HIGH_SCOPES:
scoped_argkeys_cache = argkeys_cache[scope] = {}
scoped_argkeys_by_item = argkeys_by_item[scope] = {}
scoped_items_by_argkey = items_by_argkey[scope] = defaultdict(deque)
for item in items:
keys = dict.fromkeys(get_parametrized_fixture_keys(item, scope), None)
if keys:
scoped_argkeys_cache[item] = keys
for key in keys:
scoped_items_by_argkey[key].append(item)
items_dict = dict.fromkeys(items, None)
argkeys = dict.fromkeys(get_parametrized_fixture_argkeys(item, scope))
if argkeys:
scoped_argkeys_by_item[item] = argkeys
for argkey in argkeys:
scoped_items_by_argkey[argkey].append(item)

items_set = dict.fromkeys(items)
return list(
reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, Scope.Session)
reorder_items_atscope(
items_set, argkeys_by_item, items_by_argkey, Scope.Session
)
)


def fix_cache_order(
item: nodes.Item,
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]],
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]],
) -> None:
for scope in HIGH_SCOPES:
for key in argkeys_cache[scope].get(item, []):
items_by_argkey[scope][key].appendleft(item)


def reorder_items_atscope(
items: Dict[nodes.Item, None],
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]],
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]],
items: OrderedSet[nodes.Item],
argkeys_by_item: Mapping[Scope, Mapping[nodes.Item, OrderedSet[FixtureArgKey]]],
items_by_argkey: Mapping[Scope, Mapping[FixtureArgKey, "Deque[nodes.Item]"]],
scope: Scope,
) -> Dict[nodes.Item, None]:
) -> OrderedSet[nodes.Item]:
if scope is Scope.Function or len(items) < 3:
return items
ignore: Set[Optional[FixtureArgKey]] = set()
items_deque = deque(items)
items_done: Dict[nodes.Item, None] = {}

scoped_items_by_argkey = items_by_argkey[scope]
scoped_argkeys_cache = argkeys_cache[scope]
scoped_argkeys_by_item = argkeys_by_item[scope]

ignore: Set[FixtureArgKey] = set()
items_deque = deque(items)
items_done: OrderedSet[nodes.Item] = {}
while items_deque:
no_argkey_group: Dict[nodes.Item, None] = {}
no_argkey_items: OrderedSet[nodes.Item] = {}
slicing_argkey = None
while items_deque:
item = items_deque.popleft()
if item in items_done or item in no_argkey_group:
if item in items_done or item in no_argkey_items:
continue
argkeys = dict.fromkeys(
(k for k in scoped_argkeys_cache.get(item, []) if k not in ignore), None
k for k in scoped_argkeys_by_item.get(item, ()) if k not in ignore
)
if not argkeys:
no_argkey_group[item] = None
no_argkey_items[item] = None
else:
slicing_argkey, _ = argkeys.popitem()
# We don't have to remove relevant items from later in the
Expand All @@ -268,16 +269,20 @@ def reorder_items_atscope(
i for i in scoped_items_by_argkey[slicing_argkey] if i in items
]
for i in reversed(matching_items):
fix_cache_order(i, argkeys_cache, items_by_argkey)
items_deque.appendleft(i)
# Fix items_by_argkey order.
for other_scope in HIGH_SCOPES:
other_scoped_items_by_argkey = items_by_argkey[other_scope]
for argkey in argkeys_by_item[other_scope].get(i, ()):
other_scoped_items_by_argkey[argkey].appendleft(i)
break
if no_argkey_group:
no_argkey_group = reorder_items_atscope(
no_argkey_group, argkeys_cache, items_by_argkey, scope.next_lower()
if no_argkey_items:
reordered_no_argkey_items = reorder_items_atscope(
no_argkey_items, argkeys_by_item, items_by_argkey, scope.next_lower()
)
for item in no_argkey_group:
items_done[item] = None
ignore.add(slicing_argkey)
items_done.update(reordered_no_argkey_items)
if slicing_argkey is not None:
ignore.add(slicing_argkey)
return items_done


Expand Down

0 comments on commit 1eee63a

Please sign in to comment.