diff --git a/causy/causal_discovery/constraint/orientation_rules/pc.py b/causy/causal_discovery/constraint/orientation_rules/pc.py
--- a/causy/causal_discovery/constraint/orientation_rules/pc.py
+++ b/causy/causal_discovery/constraint/orientation_rules/pc.py
@@ -249,12 +249,15 @@ def process(
         # It cannot be a collider because we have already oriented all unshielded triples that contain colliders.
         for z in potential_zs:
             z = graph.nodes[z]
             breakflag = False
             if graph.only_directed_edge_exists(x, z) and graph.undirected_edge_exists(
                 z, y
             ):
                 for node in graph.nodes:
-                    if graph.only_directed_edge_exists(graph.nodes[node], y):
+                    # Only flag nodes that point into y and are not adjacent to z.
+                    if graph.only_directed_edge_exists(
+                        graph.nodes[node], y
+                    ) and not graph.edge_exists(graph.nodes[node], z):
                         breakflag = True
                         break
                 if breakflag is True:
@@ -281,6 +284,9 @@ def process(
-                    if graph.only_directed_edge_exists(graph.nodes[node], x):
+                    # Only flag nodes that point into x and are not adjacent to z.
+                    if graph.only_directed_edge_exists(
+                        graph.nodes[node], x
+                    ) and not graph.edge_exists(graph.nodes[node], z):
                         breakflag = True
                         break
                 if breakflag is True:
                     return TestResult(
                         u=x,
diff --git a/tests/test_orientation_tests.py b/tests/test_orientation_tests.py
index 2cdef2a..f65a277 100644
--- a/tests/test_orientation_tests.py
+++ b/tests/test_orientation_tests.py
@@ -1,3 +1,4 @@
+from causy.causal_discovery.constraint.algorithms.pc import PC_ORIENTATION_RULES
 from causy.common_pipeline_steps.exit_conditions import ExitOnNoActions
 from causy.common_pipeline_steps.logic import Loop
 from causy.edge_types import DirectedEdge, UndirectedEdge
@@ -436,6 +437,277 @@ def test_non_collider_test(self):
self.assertTrue(model.graph.only_directed_edge_exists(x, y)) self.assertTrue(model.graph.only_directed_edge_exists(y, z)) + def test_non_collider_test_auto_mpg_graph_after_collider_rule_noncollider_test( + self, + ): + pipeline = [NonColliderTest()] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestCollider", + ) + )() + model.graph = GraphManager() + acceleration = model.graph.add_node("acceleration", []) + horsepower = model.graph.add_node("horsepower", []) + mpg = model.graph.add_node("mpg", []) + cylinders = model.graph.add_node("cylinders", []) + displacement = model.graph.add_node("displacement", []) + weight = model.graph.add_node("weight", []) + + model.graph.add_edge(mpg, weight, {}) + model.graph.add_edge(displacement, cylinders, {}) + model.graph.add_edge(horsepower, displacement, {}) + + # collider acceleration -> horsepower <- weight + model.graph.add_directed_edge(acceleration, horsepower, {}) + model.graph.add_directed_edge(weight, horsepower, {}) + # collider mpg -> horsepower <- acceleration + model.graph.add_directed_edge(mpg, horsepower, {}) + # collider acceleration -> displacement <- weight + model.graph.add_directed_edge(weight, displacement, {}) + model.graph.add_directed_edge(acceleration, displacement, {}) + + model.execute_pipeline_steps() + # test NonColliderTest + self.assertTrue( + model.graph.edge_of_type_exists(displacement, cylinders, DirectedEdge()) + ) + self.assertTrue( + model.graph.edge_of_type_exists(horsepower, displacement, DirectedEdge()) + ) + + def test_non_collider_test_auto_mpg_graph_after_collider_rule_whole_loop(self): + pipeline = [ + Loop( + pipeline_steps=[ + NonColliderTest(display_name="Non-Collider Test"), + FurtherOrientTripleTest(display_name="Further Orient Triple Test"), + OrientQuadrupleTest(display_name="Orient Quadruple Test"), + FurtherOrientQuadrupleTest( + display_name="Further Orient Quadruple Test" + ), + ], + 
display_name="Orientation Rules Loop", + exit_condition=ExitOnNoActions(), + ), + ] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoopAutoMpgGraph", + ) + )() + model.graph = GraphManager() + acceleration = model.graph.add_node("acceleration", []) + horsepower = model.graph.add_node("horsepower", []) + mpg = model.graph.add_node("mpg", []) + cylinders = model.graph.add_node("cylinders", []) + displacement = model.graph.add_node("displacement", []) + weight = model.graph.add_node("weight", []) + + model.graph.add_edge(mpg, weight, {}) + model.graph.add_edge(displacement, cylinders, {}) + model.graph.add_edge(horsepower, displacement, {}) + + # collider acceleration -> horsepower <- weight + model.graph.add_directed_edge(acceleration, horsepower, {}) + model.graph.add_directed_edge(weight, horsepower, {}) + # collider mpg -> horsepower <- acceleration + model.graph.add_directed_edge(mpg, horsepower, {}) + # collider acceleration -> displacement <- weight + model.graph.add_directed_edge(weight, displacement, {}) + model.graph.add_directed_edge(acceleration, displacement, {}) + + model.execute_pipeline_steps() + # test NonColliderTest + self.assertTrue( + model.graph.edge_of_type_exists(displacement, cylinders, DirectedEdge()) + ) + self.assertTrue( + model.graph.edge_of_type_exists(horsepower, displacement, DirectedEdge()) + ) + + def test_only_non_collider_rule_on_loop_test_model(self): + pipeline = [NonColliderTest()] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoop", + ) + )() + model.graph = GraphManager() + x = model.graph.add_node("X", []) + y = model.graph.add_node("Y", []) + z = model.graph.add_node("Z", []) + w = model.graph.add_node("W", []) + model.graph.add_edge(z, w, {}) + model.graph.add_directed_edge(x, z, {}) + model.graph.add_directed_edge(y, z, {}) + 
model.execute_pipeline_steps() + self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge())) + + def test_loop(self): + pipeline = [ + Loop( + pipeline_steps=[ + NonColliderTest(display_name="Non-Collider Test"), + FurtherOrientTripleTest(display_name="Further Orient Triple Test"), + OrientQuadrupleTest(display_name="Orient Quadruple Test"), + FurtherOrientQuadrupleTest( + display_name="Further Orient Quadruple Test" + ), + ], + display_name="Orientation Rules Loop", + exit_condition=ExitOnNoActions(), + ) + ] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoop", + ) + )() + model.graph = GraphManager() + x = model.graph.add_node("X", []) + y = model.graph.add_node("Y", []) + z = model.graph.add_node("Z", []) + w = model.graph.add_node("W", []) + model.graph.add_edge(z, w, {}) + model.graph.add_directed_edge(x, z, {}) + model.graph.add_directed_edge(y, z, {}) + model.execute_pipeline_steps() + self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge())) + + def test_loop_two_iterations(self): + pipeline = [ + Loop( + pipeline_steps=[ + NonColliderTest(display_name="Non-Collider Test"), + FurtherOrientTripleTest(display_name="Further Orient Triple Test"), + OrientQuadrupleTest(display_name="Orient Quadruple Test"), + FurtherOrientQuadrupleTest( + display_name="Further Orient Quadruple Test" + ), + ], + display_name="Orientation Rules Loop", + exit_condition=ExitOnNoActions(), + ) + ] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoop", + ) + )() + model.graph = GraphManager() + x = 
model.graph.add_node("X", []) + y = model.graph.add_node("Y", []) + z = model.graph.add_node("Z", []) + w = model.graph.add_node("W", []) + v = model.graph.add_node("V", []) + model.graph.add_edge(z, w, {}) + model.graph.add_edge(w, v, {}) + model.graph.add_directed_edge(x, z, {}) + model.graph.add_directed_edge(y, z, {}) + model.execute_pipeline_steps() + self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(w, v, DirectedEdge())) + + def test_only_noncollider_rule_on_loop_model_after_one_iteration(self): + pipeline = [ + Loop( + pipeline_steps=[ + NonColliderTest(display_name="Non-Collider Test"), + ], + display_name="NonColliderTest", + exit_condition=ExitOnNoActions(), + ) + ] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoop", + ) + )() + model.graph = GraphManager() + x = model.graph.add_node("X", []) + y = model.graph.add_node("Y", []) + z = model.graph.add_node("Z", []) + w = model.graph.add_node("W", []) + v = model.graph.add_node("V", []) + model.graph.add_edge(w, v, {}) + model.graph.add_directed_edge(z, w, {}) + model.graph.add_directed_edge(x, z, {}) + model.graph.add_directed_edge(y, z, {}) + model.execute_pipeline_steps() + self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(w, v, DirectedEdge())) + + def test_non_collider_loop_auto_mpg_graph(self): + pipeline = [ + Loop( + pipeline_steps=[ + NonColliderTest(display_name="Non-Collider Test"), + FurtherOrientTripleTest(display_name="Further Orient Triple Test"), + 
OrientQuadrupleTest(display_name="Orient Quadruple Test"), + FurtherOrientQuadrupleTest( + display_name="Further Orient Quadruple Test" + ), + ], + display_name="Orientation Rules Loop", + exit_condition=ExitOnNoActions(), + ) + ] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestCollider", + ) + )() + model.graph = GraphManager() + + acceleration = model.graph.add_node("acceleration", []) + horsepower = model.graph.add_node("horsepower", []) + mpg = model.graph.add_node("mpg", []) + cylinders = model.graph.add_node("cylinders", []) + displacement = model.graph.add_node("displacement", []) + weight = model.graph.add_node("weight", []) + + model.graph.add_edge(mpg, weight, {}) + model.graph.add_edge(displacement, cylinders, {}) + model.graph.add_edge(horsepower, displacement, {}) + + model.graph.add_directed_edge(acceleration, horsepower, {}) + model.graph.add_directed_edge(mpg, horsepower, {}) + model.graph.add_directed_edge(weight, horsepower, {}) + model.graph.add_directed_edge(weight, displacement, {}) + model.graph.add_directed_edge(acceleration, displacement, {}) + + model.execute_pipeline_steps() + + self.assertTrue( + model.graph.edge_of_type_exists(displacement, cylinders, DirectedEdge()) + ) + self.assertTrue( + model.graph.edge_of_type_exists(horsepower, displacement, DirectedEdge()) + ) + def test_further_orient_triple_test(self): pipeline = [FurtherOrientTripleTest()] model = graph_model_factory( diff --git a/tests/test_pc_e2e.py b/tests/test_pc_e2e.py index e0d84d3..591d746 100644 --- a/tests/test_pc_e2e.py +++ b/tests/test_pc_e2e.py @@ -1,3 +1,6 @@ +import json +import os + from causy.causal_discovery.constraint.algorithms.pc import ( PC_EDGE_TYPES, PC, @@ -10,7 +13,7 @@ ComputeDirectEffectsInDAGsMultivariateRegression, ) from causy.common_pipeline_steps.calculation import CalculatePearsonCorrelations -from causy.edge_types import DirectedEdge +from causy.edge_types 
import DirectedEdge, UndirectedEdge from causy.generators import PairsWithNeighboursGenerator from causy.graph_model import graph_model_factory from causy.causal_discovery.constraint.independence_tests.common import ( @@ -29,7 +32,6 @@ class PCTestTestCase(CausyTestCase): SEED = 1 - def _sample_generator(self): rdnv = self.seeded_random.normalvariate return IIDSampleGenerator( @@ -43,6 +45,144 @@ def _sample_generator(self): random=lambda: rdnv(0, 1), ) + def test_pc_e2e_auto_mpg(self): + script_dir = os.path.dirname(os.path.abspath(__file__)) + folder_auto_mpg = os.path.join(script_dir, "fixtures/auto_mpg/") + with open(f"{folder_auto_mpg}auto_mpg.json", "r") as f: + auto_mpg_data_set = json.load(f) + PC_LOCAL = graph_model_factory( + Algorithm( + pipeline_steps=[ + CalculatePearsonCorrelations(display_name="Calculate Pearson Correlations"), + CorrelationCoefficientTest( + threshold=VariableReference(name="threshold"), + display_name="Correlation Coefficient Test", + ), + PartialCorrelationTest( + threshold=VariableReference(name="threshold"), + display_name="Partial Correlation Test", + ), + ExtendedPartialCorrelationTestMatrix( + threshold=VariableReference(name="threshold"), + display_name="Extended Partial Correlation Test Matrix", + ), + *PC_ORIENTATION_RULES, + ComputeDirectEffectsInDAGsMultivariateRegression( + display_name="Compute Direct Effects in DAGs Multivariate Regression" + ), + ], + edge_types=PC_EDGE_TYPES, + extensions=[PC_GRAPH_UI_EXTENSION], + name="PC", + variables=[FloatVariable(name="threshold", value=0.05)], + ) + ) + pc = PC_LOCAL() + pc.create_graph_from_data(auto_mpg_data_set) + pc.create_all_possible_edges() + pc.execute_pipeline_steps() + + for s in pc.graph.action_history: + print(s.name) + for a in s.actions: + print(a.u.name, a.v.name, a.action, a.data.keys()) + + # skeleton + self.assertEqual(pc.graph.edge_exists("mpg", "weight"), True) + self.assertEqual(pc.graph.edge_exists("mpg", "horsepower"), True) + 
self.assertEqual(pc.graph.edge_exists("weight", "displacement"), True) + self.assertEqual(pc.graph.edge_exists("weight", "horsepower"), True) + self.assertEqual(pc.graph.edge_exists("displacement", "cylinders"), True) + self.assertEqual(pc.graph.edge_exists("displacement", "acceleration"), True) + self.assertEqual(pc.graph.edge_exists("displacement", "horsepower"), True) + self.assertEqual(pc.graph.edge_exists("horsepower", "acceleration"), True) + + # assert all other edges are not present + self.assertEqual(pc.graph.edge_exists("mpg", "displacement"), False) + self.assertEqual(pc.graph.edge_exists("mpg", "cylinders"), False) + self.assertEqual(pc.graph.edge_exists("mpg", "acceleration"), False) + self.assertEqual(pc.graph.edge_exists("weight", "cylinders"), False) + self.assertEqual(pc.graph.edge_exists("weight", "acceleration"), False) + self.assertEqual(pc.graph.edge_exists("acceleration", "cylinders"), False) + self.assertEqual(pc.graph.edge_exists("horsepower", "cylinders"), False) + + # directions + self.assertEqual(pc.graph.edge_of_type_exists("mpg", "weight", UndirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("weight", "horsepower", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("weight", "displacement", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("mpg", "horsepower", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("acceleration", "horsepower", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("acceleration", "displacement", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("displacement", "cylinders", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("horsepower", "displacement", DirectedEdge()), True) + + + def test_pc_collider_rule_on_auto_mpg(self): + script_dir = os.path.dirname(os.path.abspath(__file__)) + folder_auto_mpg = os.path.join(script_dir, "fixtures/auto_mpg/") + with 
open(f"{folder_auto_mpg}auto_mpg.json", "r") as f: + auto_mpg_data_set = json.load(f) + PC_LOCAL = graph_model_factory( + Algorithm( + pipeline_steps=[ + CalculatePearsonCorrelations(display_name="Calculate Pearson Correlations"), + CorrelationCoefficientTest( + threshold=VariableReference(name="threshold"), + display_name="Correlation Coefficient Test", + ), + PartialCorrelationTest( + threshold=VariableReference(name="threshold"), + display_name="Partial Correlation Test", + ), + ExtendedPartialCorrelationTestMatrix( + threshold=VariableReference(name="threshold"), + display_name="Extended Partial Correlation Test Matrix", + ), + ColliderTest(display_name="Collider Test"), + ], + edge_types=PC_EDGE_TYPES, + extensions=[PC_GRAPH_UI_EXTENSION], + name="PC", + variables=[FloatVariable(name="threshold", value=0.05)], + ) + ) + pc = PC_LOCAL() + pc.create_graph_from_data(auto_mpg_data_set) + pc.create_all_possible_edges() + pc.execute_pipeline_steps() + + # skeleton + self.assertEqual(pc.graph.edge_exists("mpg", "weight"), True) + self.assertEqual(pc.graph.edge_exists("mpg", "horsepower"), True) + self.assertEqual(pc.graph.edge_exists("weight", "displacement"), True) + self.assertEqual(pc.graph.edge_exists("weight", "horsepower"), True) + self.assertEqual(pc.graph.edge_exists("displacement", "cylinders"), True) + self.assertEqual(pc.graph.edge_exists("displacement", "acceleration"), True) + self.assertEqual(pc.graph.edge_exists("displacement", "horsepower"), True) + self.assertEqual(pc.graph.edge_exists("horsepower", "acceleration"), True) + + # assert all other edges are not present + self.assertEqual(pc.graph.edge_exists("mpg", "displacement"), False) + self.assertEqual(pc.graph.edge_exists("mpg", "cylinders"), False) + self.assertEqual(pc.graph.edge_exists("mpg", "acceleration"), False) + self.assertEqual(pc.graph.edge_exists("weight", "cylinders"), False) + self.assertEqual(pc.graph.edge_exists("weight", "acceleration"), False) + 
self.assertEqual(pc.graph.edge_exists("acceleration", "cylinders"), False) + self.assertEqual(pc.graph.edge_exists("horsepower", "cylinders"), False) + + # after collider rule + self.assertEqual(pc.graph.edge_of_type_exists("mpg", "weight", UndirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("weight", "horsepower", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("weight", "displacement", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("mpg", "horsepower", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("acceleration", "horsepower", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("acceleration", "displacement", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("displacement", "cylinders", UndirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("horsepower", "displacement", UndirectedEdge()), True) + + # wrongly discovered collider? + self.assertEqual(pc.graph.edge_of_type_exists("displacement", "horsepower", DirectedEdge()), False) + def test_pc_number_of_all_proposed_actions_two_nodes(self): """ test if the number of all proposed actions is correct @@ -522,3 +662,26 @@ def test_noncollider_triple_rule_e2e(self): self.assertEqual(tst.graph.edge_of_type_exists("X", "Y", DirectedEdge()), True) self.assertEqual(tst.graph.edge_of_type_exists("Z", "Y", DirectedEdge()), True) self.assertEqual(tst.graph.edge_of_type_exists("Y", "W", DirectedEdge()), True) + + + def test_five_node_example_e2e(self): + rdnv = self.seeded_random.normalvariate + sample_generator = IIDSampleGenerator( + edges=[ + SampleEdge(NodeReference("X"), NodeReference("Z"), 1), + SampleEdge(NodeReference("Y"), NodeReference("Z"), 1), + SampleEdge(NodeReference("Z"), NodeReference("V"), 1), + SampleEdge(NodeReference("Z"), NodeReference("W"), 1), + ], + random=lambda: rdnv(0, 1), + ) + test_data, graph = sample_generator.generate(10000) + tst = PCClassic() + 
tst.create_graph_from_data(test_data) + tst.create_all_possible_edges() + tst.execute_pipeline_steps() + + self.assertEqual(tst.graph.edge_of_type_exists("X", "Z", DirectedEdge()), True) + self.assertEqual(tst.graph.edge_of_type_exists("Y", "Z", DirectedEdge()), True) + self.assertEqual(tst.graph.edge_of_type_exists("Z", "W", DirectedEdge()), True) + self.assertEqual(tst.graph.edge_of_type_exists("Z", "V", DirectedEdge()), True)