-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGraph.py
More file actions
175 lines (154 loc) · 6.4 KB
/
Graph.py
File metadata and controls
175 lines (154 loc) · 6.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import subprocess
from Utils import get_total_outgoing_rate
import sys
class Node:
def __init__(self):
self.var_values = tuple()
self.initial_state = False
self.out_edges = {}
self.index = -1
self.reachability_probability = (-sys.float_info.max)
self.in_edges = {}
self.unsat_bounds = set()
def add_out_edge(self, edge):
edge_tuple = edge.get_tuple()
if edge_tuple not in self.out_edges:
self.out_edges[edge_tuple] = edge
def add_in_edge(self, edge):
edge_tuple = edge.get_tuple()
if edge_tuple not in self.in_edges:
self.in_edges[edge_tuple] = edge
def __eq__(self, node) -> bool:
return self.var_values == node.var_values
def __lt__(self, node):
return True
class Edge:
def __init__(self):
self.src = None
self.dst = None
self.rate = 0
self.reaction = -1
def __eq__(self, edge) -> bool:
src_check = (self.src.var_values == edge.src.var_values)
dst_check = (self.dst.var_values == edge.dst.var_values)
reaction_check = (self.reaction == edge.reaction)
return src_check and dst_check and reaction_check
def get_tuple(self):
return self.src.var_values + self.dst.var_values + (self.reaction,)
class Graph:
def __init__(self):
self.nodes = {}
self.edges = {}
def get_node(self, node_tuple):
if node_tuple in self.nodes:
return (True, self.nodes[node_tuple])
return (False, None)
def get_edge(self, edge_tuple):
if edge_tuple in self.edges:
return (True, self.edges[edge_tuple])
return (False, None)
def add_node(self, node):
if node.var_values not in self.nodes:
self.nodes[node.var_values] = node
return node
else:
return self.nodes[node.var_values]
def add_edge(self, edge):
edge.src = self.add_node(edge.src)
edge.dst = self.add_node(edge.dst)
edge_tuple = edge.get_tuple()
if edge_tuple not in self.edges:
self.edges[edge_tuple] = edge
self.nodes[edge.src.var_values].add_out_edge(edge)
self.nodes[edge.dst.var_values].add_in_edge(edge)
###############################################################
# Probabilistic Model Checking functions
def to_file_prism_format(graph, model, file_name_prefix):
# the states file (.sta) ###########################################
species_vector = model.get_species_tuple()
states_file_name = file_name_prefix + '.sta'
with open(states_file_name, mode='w', encoding='ascii') as f:
f.truncate(0)
#first line of the .sta file shows the order of variables
state_vector_line = '('
for e in species_vector:
state_vector_line = state_vector_line + e + ','
state_vector_line_list = list(state_vector_line)
state_vector_line_list[-1] = ')'
state_vector_line = ''.join(state_vector_line_list)
f.write(state_vector_line)
f.write('\n')
#every other line is a unique state
index = 0
for n in graph.nodes.values():
n.index = index
if n.initial_state:
initial_state_index = index
line = str(index) + ':('
for j in n.var_values:
line = line + str(j) + ','
line_list = list(line)
line_list[-1] = ')'
line = ''.join(line_list)
f.write(line)
f.write('\n')
index = index + 1
#adding the sink state
line = str(index) + ':('
for i in range (len(model.get_species_tuple())):
line = line + '-1,'
line_list = list(line)
line_list[-1] = ')'
line = ''.join(line_list)
f.write(line)
f.write('\n')
##########################################################################
# the labels file (.lab) #################################################
labels_file_name = file_name_prefix + '.lab'
with open(labels_file_name, mode='w', encoding='ascii') as f:
f.truncate(0)
lab_line = '0="init" 2="sink"'
f.write(lab_line)
f.write('\n')
f.write(str(initial_state_index) + ': 0')
f.write('\n')
f.write(str(index) + ': 2')
f.close()
##########################################################################
# the transition file (.tra) #############################################
trans_file_name = file_name_prefix + '.tra'
with open(trans_file_name, mode='w', encoding='ascii') as f:
f.truncate(0)
size_line = str(len(graph.nodes)) + ' ' + str(len(graph.edges))
f.write(size_line)
f.write('\n')
for e in graph.edges.values():
line = str(e.src.index) + ' ' + str(e.dst.index) + ' ' + str(e.rate)
f.write(line)
f.write('\n')
for n in graph.nodes.values():
total_rate = get_total_outgoing_rate(n.var_values, model)
current_rate = 0
for e in n.out_edges.values():
current_rate = current_rate + e.rate
remainin_rate = total_rate - current_rate
if remainin_rate<=0:
continue
line = str(n.index) + ' ' + str(index) + ' ' + str(remainin_rate)
f.write(line)
f.write('\n')
f.close()
def check_probability(graph, model, file_name_prefix, prism_bin, csl_prop):
to_file_prism_format(graph, model, "./results/" + file_name_prefix + "/" + file_name_prefix)
stdout_result = subprocess.run([prism_bin, '-importmodel', "./results/" + file_name_prefix + "/" + file_name_prefix + ".all", '-pf', csl_prop, '-ctmc'], stdout=subprocess.PIPE)
stdout_result = stdout_result.stdout.decode('utf-8')
stdout_result = stdout_result.splitlines()
result = ''
for r in stdout_result:
if 'Result' in r:
result = r
result = result[result.rfind(':')+2:]
if ' ' in result:
result = result[:result.find(' ')]
result = float(result)
return result