Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/config/serverconfigreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,21 @@ po::options_description ServerConfigReader::_defineConfigOptions()
po::value<std::string>( &(_vars["ExperimentalTapeRESTAPI"]) )->default_value("false"),
"Enable or disable experimental features of the TAPE REST API"
)
(
"NetlinkMinActive",
po::value<int>()->default_value(1),
"Default minimum number of active transfers over a given netlink"
)
(
"NetlinkMaxActive",
po::value<int>()->default_value(200),
"Default maximum number of active transfers over a given netlink"
)
(
"NetlinkMaxThroughput",
po::value<double>()->default_value(1000000),
"Default througput limit for transfers over a given netlink"
)
;

return config;
Expand Down
96 changes: 81 additions & 15 deletions src/db/mysql/OptimizerDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ static void updateOptimizerEvolution(soci::session &sql,
soci::use(newState.activeCount), soci::use(newState.queueSize),
soci::use(rationale), soci::use(diff);
sql.commit();


}
catch (std::exception &e) {
Expand Down Expand Up @@ -134,7 +134,7 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
// Function reads in all values from the t_se table, which specify
// - inbound and outbound throughput from every SE
// - inbound and outbound maximum number of connections from every SE.
// Additionally, the instantaneous throughput is also computed.
// Additionally, the instantaneous throughput is also computed.
// Returns: A map from SE name (string) --> StorageState (both limits and actual throughput values).
void dumpStorageStates(std::map<std::string, StorageState> *result) {
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "RYAN Opt3.0.0" << commit;
Expand Down Expand Up @@ -198,22 +198,22 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
if (nullInTput != soci::i_null) {
SEState.inbound_max_throughput = inTputGlobal;
}
}
}

// Queries database to get current instantaneous throughput value.
if(SEState.outbound_max_throughput > 0) {
SEState.asSourceThroughputInst = getThroughputAsSourceInst(se);
}
if(SEState.inbound_max_throughput > 0) {
SEState.asDestThroughputInst = getThroughputAsDestinationInst(se);
if(SEState.inbound_max_throughput > 0) {
SEState.asDestThroughputInst = getThroughputAsDestinationInst(se);
}

(*result)[se] = SEState;
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "inbound max throughput for " << se
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "inbound max throughput for " << se
<< ": " << SEState.inbound_max_throughput << commit;
}
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "RYAN Opt3.0.2: " << result->size() << commit;
}
}

OptimizerMode getOptimizerMode(const std::string &source, const std::string &dest) {
return getOptimizerModeInner(sql, source, dest);
Expand Down Expand Up @@ -257,7 +257,7 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
// double getInstThroughputPerConn(const Pair &pair) {
// double avgTput = 0;
// soci::indicator isAvgNull;
// sql <<
// sql <<
// "SELECT SUM(throughput) from t_file WHERE throughput>0 "
// "AND file_state = 'ACTIVE' "
// "AND source_se= :sourceSe AND dest_se= :destSe",
Expand Down Expand Up @@ -442,7 +442,7 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
soci::into(throughput, isThroughputNull);
if (isThroughputNull == soci::i_null) {
throughput = 0;
}
}
return throughput/(1024*1024); //Returns value in MB/s
}

Expand All @@ -455,9 +455,9 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
"SELECT SUM(throughput) FROM t_file "
"WHERE source_se= :name AND file_state='ACTIVE' AND throughput IS NOT NULL",
soci::use(se), soci::into(throughput, isNull);

return throughput; //Returns value in MB/s
}
}

double getThroughputAsDestination(const std::string &se, const boost::posix_time::time_duration &optimizerInterval) {
soci::indicator isThroughputNull;
Expand All @@ -474,7 +474,7 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
throughput = 0;
}
return throughput/(1024*1024);
}
}

double getThroughputAsDestinationInst(const std::string &se) {
double throughput = 0;
Expand All @@ -487,6 +487,72 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
return throughput;
}

// Calculate total throughput over the given netlink
double getThroughputOverNetlink(const std::string &netlink, const boost::posix_time::time_duration &optimizerInterval) {
soci::indicator isThroughputNull;
double throughput = 0;

int timeout = optimizerInterval.total_seconds() - 5;

sql <<
"SELECT SUM(o.throughput) from t_optimizer o, t_netlink_trace nt "
"WHERE o.source_se = nt.source_se AND o.dest_se = nt.dest_se AND nt.netlink = :netlink "
" AND datetime >= UTC_TIMESTAMP() - INTERVAL :optimizerInterval SECOND",
soci::use(netlink, "netlink"), soci::use(timeout, "optimizerInterval"),
soci::into(throughput, isThroughputNull);

if (isThroughputNull == soci::i_null) {
throughput = 0;
}
return throughput/(1024*1024);
}

double getThroughputOverNetlinkInst(const std::string &netlink) {
double throughput = 0;
soci::indicator isNull;

sql << "SELECT SUM(f.throughput) FROM t_file f, t_netlink_trace nt "
"WHERE f.source_se = nt.source_se AND f.dest_se = nt.dest_se AND nt.netlink = :netlink "
" AND f.file_state = 'ACTIVE' AND f.throughput IS NOT NULL ",
soci::use(netlink), soci::into(throughput, isNull);

return throughput;
}

// Writes to range object for a specific netlink.
void getNetlinkLimits(const std::string netlinkName, NetlinkLimits *netlinkLimits) {
// Netlink limits
soci::indicator isNullMin, isNullMax;
soci::indicator isNullMaxThroughput, isNullCapacity;
double maxThroughput, capacity;
sql <<
"SELECT c.min_active, c.max_active, c.max_throughput, s.capacity "
"FROM t_netlink_config c, t_netlink_stat s "
"WHERE c.netlink_name = :netlink_name "
"AND c.head_ip = s.head_ip AND c.tail_ip = s.tail_ip",
soci::use(netlinkName),
soci::into(netlinkLimits->minActive, isNullMin), soci::into(netlinkLimits->maxActive, isNullMax),
soci::into(maxThroughput, isNullMaxThroughput), soci::into(capacity, isNullCapacity);

if (isNullMin == soci::i_null || isNullMax == soci::i_null) {
netlinkLimits->minActive = netlinkLimits->maxActive = 0;
}

if (isNullMaxThroughput == soci::i_null) {
if (isNullCapacity == soci::i_null) {
// If maxThroughput < 0, no throughput limit is set for this specific netlink.
// The optimizer should use the default value.
netlinkLimits->maxThroughput = -1;
}
else {
netlinkLimits->maxThroughput = capacity;
}
}
else {
netlinkLimits->maxThroughput = maxThroughput;
}
}

// Stores value in both the snapshot database t_optimizer as well as t_optimizer_evolution
void storeOptimizerDecision(const Pair &pair, int activeDecision,
const PairState &newState, int diff, const std::string &rationale) {
Expand All @@ -495,7 +561,7 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
}

void storeOptimizerStreams(const Pair &pair, int streams) {

sql.begin();
sql << "UPDATE t_optimizer "
"SET nostreams = :nostreams, datetime = UTC_TIMESTAMP() "
Expand All @@ -517,8 +583,8 @@ class MySqlOptimizerDataSource: public OptimizerDataSource {
// " actual_active = :actualActive, throughput= :throughput, queue_size = :queueSize",
// soci::use(pair.source, "source"), soci::use(pair.destination, "dest"),
// soci::use(newState.activeCount, "actualActive"),
// soci::use(newState.throughput, "throughput"),
// soci::use(newState.queueSize, "queueSize");
// soci::use(newState.throughput, "throughput"),
// soci::use(newState.queueSize, "queueSize");
// sql.commit();
// }
};
Expand Down
51 changes: 50 additions & 1 deletion src/db/schema/mysql/fts-diff-8.2.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,54 @@ CREATE TABLE `t_optimizer` (
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;


DROP TABLE IF EXISTS `t_netlink_stat`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `t_netlink_stat` (
`netlink_id` char(36) NOT NULL,
`head_ip` varchar(150) DEFAULT '*',
`tail_ip` varchar(150) DEFAULT '*',
`head_asn` int(32) DEFAULT '0',
`tail_asn` int(32) DEFAULT '0',
`head_rdns` varchar(253) DEFAULT NULL,
`tail_rdns` varchar(253) DEFAULT NULL,
PRIMARY KEY (`netlink_id`),
CONSTRAINT `idx_ports` UNIQUE (`head_ip`, `tail_ip`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;


DROP TABLE IF EXISTS `t_netlink_trace`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `t_netlink_trace` (
`trace_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`source_se` varchar(255) NOT NULL,
`dest_se` varchar(255) NOT NULL,
`hop_idx` int(8) NOT NULL,
`netlink` char(36) NOT NULL,
PRIMARY KEY (`trace_id`),
CONSTRAINT `idx_pair_hop` UNIQUE (`source_se`, `dest_se`, `hop_idx`),
CONSTRAINT `fk_netlink` FOREIGN KEY (`netlink`) REFERENCES `t_netlink_stat` (`netlink_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;


DROP TABLE IF EXISTS `t_netlink_config`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `t_netlink_config` (
`head_ip` varchar(150) NOT NULL,
`tail_ip` varchar(150) NOT NULL,
`netlink_name` varchar(150) NOT NULL,
`min_active` int(11) DEFAULT NULL,
`max_active` int(11) DEFAULT NULL,
`max_throughput` float DEFAULT NULL
PRIMARY KEY (`head_ip`,`tail_ip`),
UNIQUE KEY `netlink_name` (`netlink_name`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;

INSERT INTO t_schema_vers (major, minor, patch, message)
VALUES (8, 2, 0, 'FTS-1782: SE Throughput Limitation');
VALUES (8, 2, 0, 'FTS-1782: SE Throughput Limitation');
59 changes: 58 additions & 1 deletion src/db/schema/mysql/fts-schema-8.2.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -739,4 +739,61 @@ CREATE TABLE `t_stage_req` (
`concurrent_ops` int(11) DEFAULT '0',
PRIMARY KEY (`vo_name`,`host`,`operation`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Table structure for table `t_netlink_stat`
--

DROP TABLE IF EXISTS `t_netlink_stat`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `t_netlink_stat` (
`netlink_id` char(36) NOT NULL,
`head_ip` varchar(150) DEFAULT '*',
`tail_ip` varchar(150) DEFAULT '*',
`head_asn` int(32) DEFAULT '0',
`tail_asn` int(32) DEFAULT '0',
`head_rdns` varchar(253) DEFAULT NULL,
`tail_rdns` varchar(253) DEFAULT NULL,
PRIMARY KEY (`netlink_id`),
CONSTRAINT `idx_ports` UNIQUE (`head_ip`, `tail_ip`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Table structure for table `t_netlink_trace`
--

DROP TABLE IF EXISTS `t_netlink_trace`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `t_netlink_trace` (
`trace_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`source_se` varchar(255) NOT NULL,
`dest_se` varchar(255) NOT NULL,
`hop_idx` int(8) NOT NULL,
`netlink` char(36) NOT NULL,
PRIMARY KEY (`trace_id`),
CONSTRAINT `idx_pair_hop` UNIQUE (`source_se`, `dest_se`, `hop_idx`),
CONSTRAINT `fk_netlink` FOREIGN KEY (`netlink`) REFERENCES `t_netlink_stat` (`netlink_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Table structure for table `t_netlink_config`
--
DROP TABLE IF EXISTS `t_netlink_config`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `t_netlink_config` (
`head_ip` varchar(150) NOT NULL,
`tail_ip` varchar(150) NOT NULL,
`netlink_name` varchar(150) NOT NULL,
`min_active` int(11) DEFAULT NULL,
`max_active` int(11) DEFAULT NULL,
`max_throughput` float DEFAULT NULL
PRIMARY KEY (`head_ip`,`tail_ip`),
UNIQUE KEY `netlink_name` (`netlink_name`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
/*!40101 SET character_set_client = @saved_cs_client */;
31 changes: 30 additions & 1 deletion src/server/services/optimizer/Optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,35 @@ void Optimizer::setEmaAlpha(double alpha)
}


void Optimizer::setDefaultNetlinkLimits(int netlinkMinActive, int netlinkMaxActive, double netlinkMaxThroughput)
{
defaultNetlinkMinActive = netlinkMinActive;
defaultNetlinkMaxActive = netlinkMaxActive;
defaultNetlinkMaxThroughput = netlinkMaxThroughput;
}


void Optimizer::getOptimizerNetlinkLimits(std::string netlinkName, NetlinkLimits *limits)
{
dataSource->getNetlinkLimits(netlinkName, limits);

if (limits->minActive)
{
limits->minActive = defaultNetlinkMinActive;
}

if (limits->maxActive)
{
limits->maxActive = defaultNetlinkMaxActive;
}

if (limits->maxThroughput < 0)
{
limits->maxThroughput = defaultNetlinkMaxThroughput;
}
}


void Optimizer::run(void)
{
FTS3_COMMON_LOGGER_NEWLOG(DEBUG) << "Optimizer run" << commit;
Expand Down Expand Up @@ -124,7 +153,7 @@ void Optimizer::run(void)
}
}

// Runs runOptimizerForPair for all active pairs, and
// Runs runOptimizerForPair for all active pairs, and
// is responsible for changing decision values.
void Optimizer::updateDecisions(const std::list<Pair> &pairs) {
// Update every pair's decision.
Expand Down
Loading