Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e3d65be
Added functionality of parallel mis
pnijhara Dec 13, 2025
562c725
Fix get chunk tests
pnijhara Dec 13, 2025
393ff44
Implemented custom sequential mis instead of nx.mis for node count le…
pnijhara Dec 13, 2025
6c94be7
Remove unnecessary docstrings
pnijhara Dec 13, 2025
375f0b1
Remove unnecessary docstrings
pnijhara Dec 13, 2025
dacab14
Remove assert messages and docstrings from mis tests
pnijhara Dec 14, 2025
c592668
Add NetworkX fallback with custom input node count
pnijhara Dec 14, 2025
0a73d7b
Replace nx mis unwrap method with direct call
pnijhara Dec 17, 2025
e76f9ef
Fix suggest review changes in mis.py
pnijhara Dec 26, 2025
31770ff
Fix suggested review changes in utils and policies
pnijhara Dec 26, 2025
9179f1b
Remove unused typing.Any import from mis.py
pnijhara Mar 9, 2026
6810dee
Refactor MIS tests to focus on nx-parallel specific aspects
pnijhara Mar 9, 2026
960b0b3
Simplify NetworkX MIS import using nx.maximal_independent_set.orig_func
pnijhara Mar 9, 2026
f0a9aec
Remove redundant should_run check and unused _nx_mis import
pnijhara Mar 9, 2026
8ba51ac
Fix style issues for line length and quote consistency
pnijhara Mar 9, 2026
2abc98b
Add final pass to ensure MIS maximality after parallel merge
pnijhara Mar 9, 2026
94e5537
Add maximal_independent_set to get_info
pnijhara Mar 9, 2026
b0641dd
Add on_start_tests to mark MIS order test as xfail
pnijhara Mar 12, 2026
c735307
Merge remote-tracking branch 'upstream/main' into parallel-mis
pnijhara Mar 13, 2026
81238b8
Simplify MIS adjacency handling and update docstring
pnijhara May 4, 2026
d8be1c3
Change MIS return type to set to match NetworkX main
pnijhara May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions _nx_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,17 @@ def get_info():
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
},
},
"maximal_independent_set": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/mis.py#L9",
"additional_docs": "Returns a random maximal independent set guaranteed to contain a given set of nodes.",
"additional_parameters": {
"G : NetworkX graph": "An undirected graph.",
"nodes : list or iterable, optional": "Nodes that must be part of the independent set. This set of nodes must be independent. If not provided, a random starting node is chosen.",
"seed : integer, random_state, or None (default)": "Indicator of random number generation state. See :ref:`Randomness<randomness>`.",
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of nodes and returns chunks. The default chunking divides nodes into n_jobs chunks.",
"indep_nodes : list": "List of nodes that are part of a maximal independent set.",
},
},
"node_redundancy": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/bipartite/redundancy.py#L12",
"additional_docs": "In the parallel implementation we divide the nodes into chunks and compute the node redundancy coefficients for all `node_chunk` in parallel.",
Expand Down
1 change: 1 addition & 0 deletions nx_parallel/algorithms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
from .cluster import *
from .link_prediction import *
from .dag import *
from .mis import *
193 changes: 193 additions & 0 deletions nx_parallel/algorithms/mis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
from joblib import Parallel, delayed
import nx_parallel as nxp
import networkx as nx

__all__ = ["maximal_independent_set"]


@nxp._configure_if_nx_active(should_run=nxp.should_run_if_large(50000))
def maximal_independent_set(G, nodes=None, seed=None, get_chunks="chunks"):
"""Returns a random maximal independent set guaranteed to contain
a given set of nodes.

This parallel implementation processes nodes in chunks across multiple
cores.

An independent set is a set of nodes such that the subgraph
of G induced by these nodes contains no edges. A maximal
independent set is an independent set such that it is not possible
to add a new node and still get an independent set.

The parallel computation divides nodes into chunks and processes them
in parallel, iteratively building the independent set faster than
sequential processing on large graphs.

networkx.maximal_independent_set: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.mis.maximal_independent_set.html

Parameters
----------
G : NetworkX graph
An undirected graph.

nodes : list or iterable, optional
Nodes that must be part of the independent set. This set of nodes
must be independent. If not provided, a random starting node is chosen.

seed : integer, random_state, or None (default)
Indicator of random number generation state.
See :ref:`Randomness<randomness>`.

get_chunks : str, function (default = "chunks")
A function that takes in a list of nodes and returns chunks.
The default chunking divides nodes into n_jobs chunks.

Returns
-------
indep_nodes : list
List of nodes that are part of a maximal independent set.

Raises
------
NetworkXUnfeasible
If the nodes in the provided list are not part of the graph or
do not form an independent set, an exception is raised.

NetworkXNotImplemented
If `G` is directed.

Examples
--------
>>> import networkx as nx
>>> import nx_parallel as nxp
>>> G = nx.path_graph(5)
>>> nxp.maximal_independent_set(G) # doctest: +SKIP
[4, 0, 2]
>>> nxp.maximal_independent_set(G, [1]) # doctest: +SKIP
[1, 3]

Notes
-----
This algorithm does not solve the maximum independent set problem.
Comment thread
dschult marked this conversation as resolved.
It computes a maximal independent set, which is not guaranteed to be
the largest possible independent set in the graph.

The parallel version uses a chunk-based parallel algorithm that
provides speedup on large graphs (>= 50000 nodes). For smaller graphs,
the NetworkX sequential version is used automatically.

The parallel implementation may produce a maximal independent set with
different node ordering than the sequential version, but both are valid
maximal independent sets.

"""
if hasattr(G, "graph_object"):
G = G.graph_object

# Validate directed graph
if G.is_directed():
raise nx.NetworkXNotImplemented(
"NX-PARALLEL: Not implemented for directed graphs."
)

# Note: When called through nx.maximal_independent_set with backend="parallel",
# the @py_random_state(2) decorator in NetworkX runs BEFORE @_dispatchable,
# so seed is already a Random object by the time it reaches this backend function.
# However, keeping this conversion for defensive purposes in case this function
# is called directly via nxp.maximal_independent_set().
import random

if seed is not None and hasattr(seed, "random"):
rng = seed
elif seed is not None:
rng = random.Random(seed)
else:
rng = random._inst

# Validate nodes parameter
if nodes is not None:
nodes_set = set(nodes)
if not nodes_set.issubset(G):
raise nx.NetworkXUnfeasible(f"{nodes} is not a subset of the nodes of G")
neighbors = (
set.union(*[set(G.adj[v]) for v in nodes_set]) if nodes_set else set()
)
if set.intersection(neighbors, nodes_set):
raise nx.NetworkXUnfeasible(f"{nodes} is not an independent set of G")
else:
nodes_set = set()

n_jobs = nxp.get_n_jobs()

# Parallel strategy: Run complete MIS algorithm on node chunks independently
all_nodes = list(G)

# Remove required nodes and their neighbors from consideration
if nodes_set:
available = set(all_nodes) - nodes_set
for node in nodes_set:
available.difference_update(G.neighbors(node))
available = list(available)
else:
available = all_nodes

# Shuffle for randomness
rng.shuffle(available)

# Split into chunks
if get_chunks == "chunks":
chunks = list(nxp.chunks(available, n_jobs))
else:
chunks = list(get_chunks(available))

def _process_chunk_independent(chunk, chunk_seed):
"""Process chunk completely independently - build local MIS."""
local_rng = random.Random(chunk_seed)
local_mis = []
local_excluded = set()
chunk_set = set(chunk)

# Shuffle chunk for randomness
chunk_list = list(chunk)
local_rng.shuffle(chunk_list)

for node in chunk_list:
if node not in local_excluded:
# Add to MIS
local_mis.append(node)
local_excluded.add(node)
# Mark neighbors as excluded (only within this chunk)
for neighbor in G.neighbors(node):
if neighbor in chunk_set:
local_excluded.add(neighbor)
Comment thread
pnijhara marked this conversation as resolved.

return local_mis

# Generate seeds for each chunk
chunk_seeds = [rng.randint(0, 2**31 - 1) for _ in range(len(chunks))]

# Process chunks in parallel
results = Parallel()(
delayed(_process_chunk_independent)(chunk, chunk_seeds[i])
for i, chunk in enumerate(chunks)
)

# Merge results: resolve conflicts between chunks
indep_set = set(nodes_set) if nodes_set else set()
excluded = set(all_nodes) - set(available) if nodes_set else set()
Comment thread
pnijhara marked this conversation as resolved.

# Process results in order, greedily adding non-conflicting nodes
for local_mis in results:
for node in local_mis:
if node not in excluded:
indep_set.add(node)
excluded.add(node)
excluded.update(G.neighbors(node))

# Final pass: ensure maximality by adding any remaining available nodes
for node in available:
if node not in excluded:
indep_set.add(node)
excluded.add(node)
excluded.update(G.neighbors(node))

return indep_set
52 changes: 52 additions & 0 deletions nx_parallel/algorithms/tests/test_mis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import networkx as nx
import nx_parallel as nxp


def test_should_run_small_graph():
"""Small graphs should fall back to NetworkX sequential implementation."""
G = nx.fast_gnp_random_graph(100, 0.1, seed=42)
H = nxp.ParallelGraph(G)

result = nxp.maximal_independent_set.should_run(H)
assert result == "Graph too small for parallel execution"


def test_should_run_large_graph():
"""Large graphs should use the parallel implementation."""
G = nx.fast_gnp_random_graph(60000, 0.0001, seed=42)
H = nxp.ParallelGraph(G)
Comment thread
pnijhara marked this conversation as resolved.

result = nxp.maximal_independent_set.should_run(H)
assert result is True


def test_get_chunks():
"""Test custom chunking function."""
G = nx.fast_gnp_random_graph(60000, 0.0001, seed=42)
H = nxp.ParallelGraph(G)

def custom_chunks(nodes):
nodes_list = list(nodes)
mid = len(nodes_list) // 2
return [nodes_list[:mid], nodes_list[mid:]]

result1 = nxp.maximal_independent_set(H, seed=42)
result2 = nxp.maximal_independent_set(H, seed=42, get_chunks=custom_chunks)

# Both should be valid independent sets (correctness is tested by NetworkX)
for result in [result1, result2]:
result_set = set(result)
for node in result:
neighbors = set(G.neighbors(node))
assert not result_set.intersection(neighbors)


def test_parallel_deterministic_with_seed():
"""Parallel execution with same seed should produce same result."""
G = nx.fast_gnp_random_graph(60000, 0.0001, seed=42)
H = nxp.ParallelGraph(G)

result1 = nxp.maximal_independent_set(H, seed=42)
result2 = nxp.maximal_independent_set(H, seed=42)

assert result1 == result2
34 changes: 34 additions & 0 deletions nx_parallel/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
"average_neighbor_degree",
# Connectivity
"all_pairs_node_connectivity",
# Maximal Independent Set
"maximal_independent_set",
]


Expand Down Expand Up @@ -136,6 +138,38 @@ def should_run(cls, name, args, kwargs):
"""
return getattr(cls, name).should_run(*args, **kwargs)

@staticmethod
def on_start_tests(items):
"""Modify pytest items after tests have been collected.

This is called during pytest_collection_modifyitems phase.
Mark tests that have different valid behavior in parallel backend.
"""
try:
import pytest
except ModuleNotFoundError:
return

xfail_tests = {
"test_random_seed": (
"test_mis.py",
"Parallel MIS produces different valid ordering than sequential",
),
"test_K5[graph0]": (
"test_mis.py",
"Return type changed from list to set in NetworkX main",
),
"test_K5[graph1]": (
"test_mis.py",
"Return type changed from list to set in NetworkX main",
),
}

for item in items:
for test_name, (filename, reason) in xfail_tests.items():
if item.name == test_name and filename in str(item.fspath):
item.add_marker(pytest.mark.xfail(reason=reason))


for attr in ALGORITHMS:
setattr(BackendInterface, attr, getattr(algorithms, attr))
1 change: 1 addition & 0 deletions nx_parallel/tests/test_get_chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def test_get_functions_with_get_chunks():
ignore_funcs = [
"number_of_isolates",
"is_reachable",
"maximal_independent_set",
]


Expand Down
39 changes: 30 additions & 9 deletions nx_parallel/utils/should_run_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,41 @@
]


def should_skip_parallel(*_):
def should_skip_parallel(*_, **__):
return "Fast algorithm; skip parallel execution"


def should_run_if_large(G, *_):
if hasattr(G, "graph_object"):
G = G.graph_object
def should_run_if_large(G=None, nodes_threshold=200, *_, **__):
# Detect if first arg is a graph (has both __len__ and nodes attributes)
is_graph = G is not None and hasattr(G, "__len__") and hasattr(G, "nodes")

if len(G) <= 200:
return "Graph too small for parallel execution"
return True
if is_graph:
# Direct usage: called with a graph as first argument
# Example: should_run_if_large(G) or func.should_run(G)
if hasattr(G, "graph_object"):
G = G.graph_object
Comment thread
pnijhara marked this conversation as resolved.

if len(G) < nodes_threshold:
return "Graph too small for parallel execution"
return True

# Factory usage: called with threshold (positional or keyword) but no graph
# Examples: should_run_if_large(50000) or should_run_if_large(nodes_threshold=50000)
# Use G if it's a number (threshold passed positionally), otherwise use nodes_threshold
threshold = G if G is not None and isinstance(G, (int, float)) else nodes_threshold

def wrapper(G, *_, **__):
if hasattr(G, "graph_object"):
G = G.graph_object
Comment thread
dschult marked this conversation as resolved.

if len(G) < threshold:
return "Graph too small for parallel execution"
return True

return wrapper


def default_should_run(*_):
def default_should_run(*_, **__):
n_jobs = nxp.get_n_jobs()
print(f"{n_jobs=}")
if n_jobs in (None, 0, 1):
Expand All @@ -38,7 +59,7 @@ def should_run_if_nodes_none(G, nodes=None, *_):


def should_run_if_sparse(threshold=0.3):
def wrapper(G, *_):
def wrapper(G, *_, **__):
if hasattr(G, "graph_object"):
G = G.graph_object

Expand Down
Loading