-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathnode.cpp
More file actions
171 lines (135 loc) · 5.19 KB
/
node.cpp
File metadata and controls
171 lines (135 loc) · 5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/* An IP node.
*
* Author: Daniel Krebs <github@daniel-krebs.net>
* SPDX-FileCopyrightText: 2017 Institute for Automation of Complex Power Systems, RWTH Aachen University
* SPDX-License-Identifier: Apache-2.0
*/
#include <stdexcept>
#include <unordered_map>
#include <jansson.h>
#include <villas/exceptions.hpp>
#include <villas/fpga/card.hpp>
#include <villas/fpga/ips/switch.hpp>
#include <villas/fpga/node.hpp>
#include <villas/utils.hpp>
using namespace villas::fpga::ip;
StreamGraph Node::streamGraph;
void NodeFactory::parse(Core &ip, json_t *cfg) {
CoreFactory::parse(ip, cfg);
auto &Node = dynamic_cast<ip::Node &>(ip);
auto logger = getLogger();
json_t *json_ports = json_object_get(cfg, "ports");
if (json_ports && json_is_array(json_ports)) {
size_t index;
json_t *json_port;
json_array_foreach (json_ports, index, json_port) {
if (not json_is_object(json_port))
throw ConfigError(json_port, "", "Port {} is not an object", index);
const char *role_raw, *target_raw, *name_raw;
json_error_t err;
int ret =
json_unpack_ex(json_port, &err, 0, "{ s: s, s: s, s: s }", "role",
&role_raw, "target", &target_raw, "name", &name_raw);
if (ret != 0)
throw ConfigError(json_port, err, "", "Cannot parse port {}", index);
const auto tokens = utils::tokenize(target_raw, ":");
if (tokens.size() != 2)
throw ConfigError(json_port, err, "",
"Cannot parse 'target' of port {}", index);
const std::string role(role_raw);
const bool isMaster = (role == "master" or role == "initiator");
auto thisVertex = Node::streamGraph.getOrCreateStreamVertex(
ip.getInstanceName(), name_raw, isMaster);
auto connectedVertex = Node::streamGraph.getOrCreateStreamVertex(
tokens[0], tokens[1], not isMaster);
if (isMaster) {
Node::streamGraph.addDefaultEdge(thisVertex->getIdentifier(),
connectedVertex->getIdentifier());
Node.portsMaster[name_raw] = thisVertex;
} else // Slave
Node.portsSlave[name_raw] = thisVertex;
}
} else if (json_ports) {
throw ConfigError(json_ports, "", "IP port list of {} must be an array",
ip.getInstanceName());
}
}
std::pair<std::string, std::string> Node::getLoopbackPorts() const {
for (auto &[masterName, masterVertex] : portsMaster) {
for (auto &[slaveName, slaveVertex] : portsSlave) {
StreamGraph::Path path;
if (streamGraph.getPath(masterVertex->getIdentifier(),
slaveVertex->getIdentifier(), path)) {
return {masterName, slaveName};
}
}
}
return {"", ""};
}
bool Node::connect(const StreamVertex &from, const StreamVertex &to) {
if (from.nodeName != getInstanceName()) {
logger->error("Cannot connect from a foreign StreamVertex: {}", from);
return false;
}
StreamGraph::Path path;
if (not streamGraph.getPath(from.getIdentifier(), to.getIdentifier(), path)) {
logger->error("No path from {} to {}", from, to);
return false;
}
if (path.size() == 0) {
return true;
}
auto currentEdge = path.begin();
auto firstEdge = streamGraph.getEdge(*currentEdge);
auto firstHopNode = streamGraph.getVertex(firstEdge->getVertexTo());
auto nextHopNode = firstHopNode;
// Check if next hop is an internal connection
if (firstHopNode->nodeName == getInstanceName()) {
if (not connectInternal(from.portName, firstHopNode->portName)) {
logger->error("Making internal connection from {} to {} failed", from,
*firstHopNode);
return false;
}
// We have to advance to next hop
if (++currentEdge == path.end())
return true; // Arrived at the end of path
auto secondEdge = streamGraph.getEdge(*currentEdge);
auto secondHopNode = streamGraph.getVertex(secondEdge->getVertexTo());
nextHopNode = secondHopNode;
}
auto nextHopNodeIp =
std::dynamic_pointer_cast<Node>(card->lookupIp(nextHopNode->nodeName));
if (nextHopNodeIp == nullptr) {
logger->error("Cannot find IP {}, this shouldn't happen!",
nextHopNode->nodeName);
return false;
}
return nextHopNodeIp->connect(*nextHopNode, to);
}
const StreamVertex &Node::getDefaultSlavePort() const {
logger->error("No default slave port available");
throw std::exception();
}
const StreamVertex &Node::getDefaultMasterPort() const {
logger->error("No default master port available");
throw std::exception();
}
bool Node::loopbackPossible() const {
auto ports = getLoopbackPorts();
return (not ports.first.empty()) and (not ports.second.empty());
}
bool Node::connectInternal(const std::string &slavePort,
const std::string &masterPort) {
(void)slavePort;
(void)masterPort;
logger->warn("This IP doesn't implement an internal connection");
return false;
}
bool Node::connectLoopback() {
auto ports = getLoopbackPorts();
const auto &portMaster = portsMaster[ports.first];
const auto &portSlave = portsSlave[ports.second];
logger->debug("master port: {}", ports.first);
logger->debug("slave port: {}", ports.second);
return connect(*portMaster, *portSlave);
}