Skip to content

Parallel node setup v2 #1230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/xdist/scheduler/loadscope.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ def add_node(self, node: WorkerController) -> None:
"""
assert node not in self.assigned_work
self.assigned_work[node] = {}
self.assigned_work = dict(
sorted(self.assigned_work.items(), key=lambda item: item[0].gateway.id)
)
Comment on lines +165 to +167
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of the other schedulers need this to pass tests, is this intentional?


def remove_node(self, node: WorkerController) -> str | None:
"""Remove a node from the scheduler.
Expand Down
11 changes: 10 additions & 1 deletion src/xdist/workermanage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
import enum
import fnmatch
import os
Expand Down Expand Up @@ -92,9 +93,17 @@ def setup_nodes(
self,
putevent: Callable[[tuple[str, dict[str, Any]]], None],
) -> list[WorkerController]:
# create basetemp directory only once
if hasattr(self.config, "_tmp_path_factory"):
self.config._tmp_path_factory.getbasetemp()

self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
self.trace("setting up nodes")
return [self.setup_node(spec, putevent) for spec in self.specs]
with ThreadPoolExecutor(max_workers=len(self.specs)) as executor:
futs = [
executor.submit(self.setup_node, spec, putevent) for spec in self.specs
]
return [f.result() for f in futs]

def setup_node(
self,
Expand Down
14 changes: 9 additions & 5 deletions testing/test_workermanage.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,15 @@ def test_popen_makegateway_events(
call = hookrecorder.popcall("pytest_xdist_setupnodes")
assert len(call.specs) == 2

call = hookrecorder.popcall("pytest_xdist_newgateway")
assert call.gateway.spec == execnet.XSpec("execmodel=main_thread_only//popen")
assert call.gateway.id == "gw0"
call = hookrecorder.popcall("pytest_xdist_newgateway")
assert call.gateway.id == "gw1"
# check expected gateways
gw_calls = [
hookrecorder.popcall("pytest_xdist_newgateway"),
hookrecorder.popcall("pytest_xdist_newgateway"),
Comment on lines +87 to +88
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also sort the pytest_xdist_newgateway calls inside setup_nodes to maintain behavior

]
assert {c.gateway.id for c in gw_calls} == {"gw0", "gw1"}
assert {c.gateway.spec for c in gw_calls} == {
execnet.XSpec("execmodel=main_thread_only//popen")
}
assert len(hm.group) == 2
hm.teardown_nodes()
assert not len(hm.group)
Expand Down