Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/adaptivequemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() int {
AllowEveryX: 3*len(entry.Workers) + 1,
}
now := time.Now()

throttle.privGapCalculatedTimeInNanos.Store(int64(0))
throttle.RecentAttempt.Store(&now)
sqlBind[concatKey] = &throttle
}
Expand Down
52 changes: 39 additions & 13 deletions lib/bindevict.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package lib

import (
"fmt"
"math"
"regexp"
"sync"
"sync/atomic"
Expand All @@ -28,12 +29,13 @@ import (
)

type BindThrottle struct {
Name string
Value string
Sqlhash uint32
RecentAttempt atomic.Value // time.Time
AllowEveryX int
AllowEveryXCount int
Name string
Value string
Sqlhash uint32
RecentAttempt atomic.Value // time.Time
AllowEveryX int
AllowEveryXCount int
privGapCalculatedTimeInNanos atomic.Value
}

var gBindEvict atomic.Value
Expand Down Expand Up @@ -116,7 +118,7 @@ func (entry *BindThrottle) incrAllowEveryX() {
}
}

func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool, throttleRecoveryFactor float64) (bool, *BindThrottle) {
GetBindEvict().lock.Lock()
sqlBinds := GetBindEvict().BindThrottle[sqlhash]
GetBindEvict().lock.Unlock()
Expand All @@ -129,19 +131,43 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy
}
/* matched bind name and value
we stop searching and should return something */

now := time.Now()
// update based on usage
if heavyUsage {
entry.incrAllowEveryX()
//disable the gap-based throttle reduction when usage is heavy,
//ensuring reductions only happen during sustained low usage.
privGapCalTime := entry.privGapCalculatedTimeInNanos.Load().(int64)
if privGapCalTime > 0 {
entry.privGapCalculatedTimeInNanos.Store(int64(0))
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "increasing throttle, so clearing previous gap calculated time")
}
}
} else {
entry.decrAllowEveryX(2)
Comment thread
ishi-0987 marked this conversation as resolved.
privGapCalTime := entry.privGapCalculatedTimeInNanos.Load().(int64)
if privGapCalTime == 0 {
entry.privGapCalculatedTimeInNanos.Store(now.UnixNano())
} else {
// Check whether the entry has gone unused for a while.
// The gap is evaluated at most once per second; each time it is evaluated, the
// throttle is decreased by an amount that scales with the elapsed time.
gapInSeconds := float64(now.UnixNano()-privGapCalTime) / 1_000_000_000
if gapInSeconds >= 1.0 {
//This calculation helps if sustained low usage around 60 seconds with 40% higher than threshold then
//The recovery will 1 x 10 + 40 per second, in a minute it is going to reduce = 3000.
//This makes from peak value 10000 it takes 2.5 minutes to full recovery from bind throttle.
gap := gapInSeconds*GetConfig().BindEvictionDecrPerSec + math.Ceil(throttleRecoveryFactor)
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, fmt.Sprintf("throttle recovery with time gap: %f, bind eviction dec/sec: %f and throttle recovery factor: %f", gapInSeconds, GetConfig().BindEvictionDecrPerSec, throttleRecoveryFactor))
}
entry.decrAllowEveryX(int(gap))
entry.privGapCalculatedTimeInNanos.Store(now.UnixNano())
}
}
}

// check if not used in a while
now := time.Now()
recent := entry.RecentAttempt.Load().(*time.Time)
gap := now.Sub(*recent).Seconds() * GetConfig().BindEvictionDecrPerSec
entry.decrAllowEveryX(int(gap))
if entry.AllowEveryX == 0 {
return false, nil
}
Expand Down
5 changes: 4 additions & 1 deletion lib/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,11 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
numFree := GetStateLog().numFreeWorker(crd.shard.shardID, wType)
heavyUsage := false
thres := float64(GetConfig().BindEvictionTargetConnPct) / 100.0 * float64(cfg)
var throttleRecoveryFactor float64 = 1.0
if numFree < int(thres) {
heavyUsage = true
} else {
throttleRecoveryFactor = ((float64(numFree) - thres) / float64(cfg)) * 100
}
if logger.GetLogger().V(logger.Verbose) {
msg := fmt.Sprintf("bind throttle heavyUsage?%t free:%d cfg:%d pct:%d thres:%f", heavyUsage,
Expand All @@ -686,7 +689,7 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
logger.GetLogger().Log(logger.Debug, msg)
}
}
needBlock, throttleEntry := GetBindEvict().ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage)
needBlock, throttleEntry := GetBindEvict().ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage, throttleRecoveryFactor)
if needBlock {
msg := fmt.Sprintf("k=%s&v=%s&allowEveryX=%d&allowFrac=%.5f&raddr=%s",
throttleEntry.Name,
Expand Down
237 changes: 237 additions & 0 deletions tests/unittest/bindThrottleRecovery/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package main

import (
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add a functional test for it in the mirror repository.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is already a functional test: it spawns a MySQL DB and generates slow queries via gosqldriver.

"context"
"database/sql"
"fmt"
"os"
"testing"
"time"

//"github.com/paypal/hera/client/gosqldriver"
_ "github.com/paypal/hera/client/gosqldriver/tcp" /*to register the driver*/

"github.com/paypal/hera/tests/unittest/testutil"
"github.com/paypal/hera/utility/logger"
)

var mx testutil.Mux
var tableName string
var max_conn float64

// cfg returns the application configuration, the ops configuration and the
// worker type that are handed to testutil.UtilMain for this test package.
//
// As a side effect it sets the package-level max_conn, which partialBadLoad
// reads to size its client pools.
func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
	max_conn = 25

	// The bind port should be one no other test uses, in case Go runs test
	// packages in parallel.
	appSettings := map[string]string{
		"bind_port":                    "31002",
		"log_level":                    "5",
		"log_file":                     "hera.log",
		"sharding_cfg_reload_interval": "0",
		"rac_sql_interval":             "0",
		"child.executable":             "mysqlworker",
		"bind_eviction_names":          "p",
		"bind_eviction_threshold_pct":  "50",

		"request_backlog_timeout":   "1000",
		"soft_eviction_probability": "100",
	}

	// NOTE(review): saturation_recover_throttle_rate is set only under the
	// opscfg.hera.* prefix; the opscfg.default.* variant was left disabled.
	opsSettings := map[string]string{
		"opscfg.default.server.max_connections": fmt.Sprintf("%d", int(max_conn)),
		"opscfg.default.server.log_level":       "5",

		"opscfg.default.server.saturation_recover_threshold":  "10",
		"opscfg.hera.server.saturation_recover_throttle_rate": "100",
	}

	return appSettings, opsSettings, testutil.MySQLWorker
}

// before prepares the MySQL schema used by the test.
//
// It creates the sleep_info table, whose rows map an id to a number of
// seconds, and the sleep_option(id) stored function, which sleeps for the
// largest "seconds" value recorded for that id and returns it. Queries that
// bind a "slow" id therefore hold a worker for several seconds.
//
// It always returns nil. A failure to create the function is only printed.
// NOTE(review): results of the table/insert statements are discarded and the
// function-creation error is not propagated; presumably this is deliberate
// best-effort setup (e.g. objects may already exist) — confirm.
func before() error {
	fmt.Printf("before run mysql")
	testutil.RunMysql("create table sleep_info (id bigint, seconds float);")
	testutil.RunMysql("insert into sleep_info (id,seconds) values(10, 0.01);")
	testutil.RunMysql("insert into sleep_info (id,seconds) values(100, 0.1);")
	testutil.RunMysql("insert into sleep_info (id,seconds) values(1600, 2.6);")
	testutil.RunMysql("insert into sleep_info (id,seconds) values(21001111, 0.1);")
	testutil.RunMysql("insert into sleep_info (id,seconds) values(22001111, 0.1);")
	testutil.RunMysql("insert into sleep_info (id,seconds) values(29001111, 3.9);")
	out, err := testutil.RunMysql(`DELIMITER $$
CREATE FUNCTION sleep_option (id bigint)
RETURNS float
DETERMINISTIC
BEGIN
declare dur float;
declare rv bigint;
select max(seconds) into dur from sleep_info where sleep_info.id=id;
select sleep(dur) into rv;
RETURN dur;
END$$
DELIMITER ;`)
	if err != nil {
		// Print, not Printf: the error text is data and may contain '%',
		// which must not be interpreted as a format verb.
		fmt.Print("err after run mysql " + err.Error())
		return nil
	}
	// Same reasoning for the command output.
	fmt.Print("after run mysql " + out)
	return nil
}

// TestMain is the entry point of this test binary. It announces the start of
// the run in the log and on stdout, hands cfg and before to testutil.UtilMain
// together with m, and exits the process with the code UtilMain returns.
func TestMain(m *testing.M) {
	const startMsg = "begin 20230918kkang TestMain"
	logger.GetLogger().Log(logger.Debug, startMsg)
	fmt.Printf("TestMain 20230918kkang\n")

	exitCode := testutil.UtilMain(m, cfg, before)
	os.Exit(exitCode)
}

// sleepyQ runs one query against sleep_info on conn. delayRow is used twice:
// it is formatted into the SQL text as the id filter, and it is passed as the
// bind value of sleep_option(?), so the query takes as long as that row's
// configured sleep.
//
// The result rows are not read; they are closed on return. Any prepare or
// query error is printed and returned.
func sleepyQ(conn *sql.Conn, delayRow int) error {
	query := fmt.Sprintf("select * from sleep_info where ( seconds > sleep_option(?) or seconds > 0.0 ) and id=%d", delayRow)

	stmt, prepErr := conn.PrepareContext(context.Background(), query)
	if prepErr != nil {
		fmt.Printf("Error preparing sleepyQ %s\n", prepErr.Error())
		return prepErr
	}
	defer stmt.Close()

	rows, queryErr := stmt.Query(delayRow)
	if queryErr == nil {
		defer rows.Close()
		return nil
	}
	fmt.Printf("Error query sleepyQ %s\n", queryErr.Error())
	return queryErr
}

// normCliErr caches the error value handed out by NormCliErr.
var normCliErr error

// NormCliErr returns the shared "normal client got error" error. The value is
// created on the first call and reused afterwards, so every call returns the
// same error instance.
func NormCliErr() error {
	if normCliErr != nil {
		return normCliErr
	}
	normCliErr = fmt.Errorf("normal client got error")
	return normCliErr
}

// partialBadLoad drives one load phase against the Hera server on
// 127.0.0.1:31002 and then checks that the slow bind value is usable again.
//
// fracBad is the fraction of max_conn that is spawned as "bad" clients, which
// repeatedly run the slow query (bind value 29001111). The remaining clients,
// up to a total of int(max_conn*2.1)+1, run a fast query (bind value 100).
//
// Sequence:
//  1. start bad and normal clients and let them run for 3s;
//  2. start one extra client ("n client") with bind value 21001111, run 1s;
//  3. stop the bad and normal clients and wait 5s;
//  4. run the slow query once on a fresh connection.
//
// It returns the underlying error if the database or connection cannot be
// opened, a "throttle down didn't restore" error if step 4 fails, NormCliErr()
// if the extra client recorded any error, and nil otherwise.
//
// NOTE(review): the stop flags and error strings are plain variables shared
// with the client goroutines without synchronization; fixing that requires
// changing the mkClients signature.
func partialBadLoad(fracBad float64) error {
	db, err := sql.Open("hera", "127.0.0.1:31002")
	if err != nil {
		fmt.Printf("Error db %s\n", err.Error())
		return err
	}
	db.SetConnMaxLifetime(111 * time.Second)
	db.SetMaxIdleConns(0)
	db.SetMaxOpenConns(22111)
	defer db.Close()

	// client threads of slow queries
	var stop2 int
	var stop3 int
	var badCliErr string
	var cliErr string
	numBad := int(max_conn * fracBad)
	numNorm := int(max_conn*2.1) + 1 - numBad
	fmt.Printf("spawning clients bad%d norm%d\n", numBad, numNorm)
	mkClients(numBad, &stop2, 29001111, "badClient", &badCliErr, db)
	mkClients(numNorm, &stop3, 100, "normClient", &cliErr, db) // bind value is short, so bindevict won't trigger
	time.Sleep(3000 * time.Millisecond)

	// start normal clients after initial backlog timeouts
	var stop int
	var normCliErrStr string
	mkClients(1, &stop, 21001111, "n client", &normCliErrStr, db)
	// Stop the extra client on every return path, including the early error
	// returns below; otherwise it keeps issuing queries after this function
	// has returned. Runs before the deferred db.Close above.
	defer func() { stop = 1 }()
	time.Sleep(1000 * time.Millisecond)

	// if we throttle down or stop, it restores
	stop2 = 1 // stop bad clients
	stop3 = 1
	time.Sleep(5 * time.Second) // give the throttle time to clear
	conn, err := db.Conn(context.Background())
	if err != nil {
		fmt.Printf("Error conn %s\n", err.Error())
		return err
	}
	defer conn.Close()
	err = sleepyQ(conn, 29001111)
	if err != nil {
		const msg = "test failed, throttle down didn't restore"
		fmt.Printf("%s", msg)
		return fmt.Errorf("%s", msg)
	}

	// tolerate soft eviction on normal client when we did not use bind eviction
	if len(normCliErrStr) != 0 {
		return NormCliErr()
	}
	return nil
}

// mkClients starts num background client goroutines and returns immediately.
//
// Each client loops while *stop is 0: it obtains a dedicated connection from
// db if it does not hold one, runs sleepyQ with bindV, and then sleeps 10ms.
// On a query error it stores the message in *outErr; if the same message is
// seen again it closes the connection so that the next iteration reconnects.
// grpName and the client index are used only to label the printed output.
//
// When a client leaves its loop it closes the connection it still holds.
//
// NOTE(review): *stop and *outErr are read and written from several
// goroutines without synchronization; callers rely on this being "good
// enough" for a test. Fixing it needs a signature change.
func mkClients(num int, stop *int, bindV int, grpName string, outErr *string, db *sql.DB) {
	for i := 0; i < num; i++ {
		go func(clientId int) {
			count := 0
			var conn *sql.Conn
			// Hand the connection back once the client is told to stop;
			// without this every client leaks its connection.
			defer func() {
				if conn != nil {
					conn.Close()
				}
			}()
			var curErr string
			for *stop == 0 {
				nowStr := time.Now().Format("15:04:05.000000 ")
				if conn == nil {
					var err error
					conn, err = db.Conn(context.Background())
					if err != nil {
						fmt.Printf("%s %s Error %d conn %s\n", nowStr, grpName, clientId, err.Error())
						time.Sleep(7 * time.Millisecond)
						continue
					}
					// Only report "connected" once the connection succeeded.
					fmt.Printf("%s connected %d\n", grpName, clientId)
				}

				fmt.Printf("%s %s %d loop%d %s\n", nowStr, grpName, clientId, count, time.Now().Format("20060102j150405.000000"))
				if err := sleepyQ(conn, bindV); err != nil {
					if err.Error() == curErr {
						// Same failure again: drop the connection and
						// reconnect on the next iteration.
						fmt.Printf("%s %s %d same err twice\n", nowStr, grpName, clientId)
						conn.Close()
						conn = nil
					} else {
						curErr = err.Error()
						*outErr = curErr
						fmt.Printf("%s %s %d err %s\n", nowStr, grpName, clientId, curErr)
					}
				}
				count++
				time.Sleep(10 * time.Millisecond)
			}
			fmt.Printf("%s %s %d END loop%d\n", time.Now().Format("15:04:05.000000 "), grpName, clientId, count)
		}(i)
	}
}

// TestBindThrottleWithRecovery checks bind eviction/throttling and the
// subsequent throttle recovery end to end.
//
// Phase 1 runs partialBadLoad with 80% bad clients and then expects
// BIND_THROTTLE and BIND_EVICT events (in general, and specifically for SQL
// hash 1354401077) in cal.log.
//
// Phase 2 runs partialBadLoad with 7% bad clients and then expects hera.log
// to contain both the "throttle recovery with time gap" message and the
// "clearing previous gap calculated time" message.
//
// Errors returned by partialBadLoad are logged but do not fail the test; the
// verdict is based on the log contents.
func TestBindThrottleWithRecovery(t *testing.T) {
	// we would like to clear hera.log, but even if we try, lots of messages still go there
	logger.GetLogger().Log(logger.Debug, "TestBindThrottleWithRecovery +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
	err := partialBadLoad(0.8)
	time.Sleep(time.Second * 10)
	if err != nil {
		t.Logf("main step function returned err %s", err.Error())
	}
	// A match count is never negative, so the check must be "< 1" to be able
	// to fail at all.
	if testutil.RegexCountFile("BIND_THROTTLE", "cal.log") < 1 {
		t.Fatalf("BIND_THROTTLE should trigger")
	}
	if testutil.RegexCountFile("BIND_EVICT", "cal.log") == 0 {
		t.Fatalf("BIND_EVICT should trigger")
	}

	if testutil.RegexCountFile(".*BIND_EVICT\t1354401077\t1.*", "cal.log") < 1 {
		t.Fatalf("BIND_EVICT should trigger for SQL HASH 1354401077")
	}

	if testutil.RegexCountFile(".*BIND_THROTTLE\t1354401077\t1.*", "cal.log") < 1 {
		t.Fatalf("BIND_THROTTLE should trigger for SQL HASH 1354401077")
	}

	err = partialBadLoad(0.07)
	time.Sleep(time.Second * 5)
	// Surface the second phase's error as well instead of dropping it.
	if err != nil {
		t.Logf("recovery step function returned err %s", err.Error())
	}
	if testutil.RegexCountFile(".*throttle recovery with time gap.*", "hera.log") < 1 {
		t.Fatalf("throttle recovery should be triggerred")
	}
	if testutil.RegexCountFile(".*clearing previous gap calculated time.*", "hera.log") < 1 {
		t.Fatalf("clearing previous gap calculated time event should trigger")
	}

	logger.GetLogger().Log(logger.Debug, "TestBindThrottleWithRecovery done +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
}
Loading