From 7c44f3ec6845a872564f1fc8775496119b9b7648 Mon Sep 17 00:00:00 2001 From: kalaiselvan-binary Date: Wed, 30 Nov 2022 12:55:45 +0800 Subject: [PATCH 1/3] initial commit --- params.go | 2 +- server.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/params.go b/params.go index 76cd78b..f22689f 100644 --- a/params.go +++ b/params.go @@ -9,7 +9,7 @@ var buffer *int func init() { mainRedis = flag.String("main", ":6379", "connection string for the main redis") - mirrorRedis = flag.String("mirror", ":6381", "connection string for the main redis") + mirrorRedis = flag.String("mirror", ":6381", "connection string for the mirror redis") addr = flag.String("addr", ":6380", "connection string for the mirroring service") buffer = flag.Int("buff", 500, "Buffer size for mirror queue. If it is full Do commands will be ignored") flag.Parse() diff --git a/server.go b/server.go index f55cf5a..cfb95d3 100644 --- a/server.go +++ b/server.go @@ -18,6 +18,35 @@ type rdbDo struct { cmdArgs []interface{} } +func scanAndUpdate(mainRedis, mirrorRedis redis.Conn) error { + cursor := 0 + for { + keys, err := redis.Values(mainRedis.Do("SCAN", cursor)) + if err != nil { + return err + } + // Get the next cursor and keys + cursor, _ = redis.Int(keys[0], nil) + keys, _ = redis.Values(keys[1], nil) + // Iterate over the keys and update them in the destination + for _, key := range keys { + value, err := redis.String(mainRedis.Do("GET", key)) + if err != nil { + return err + } + // Update the key in the destination redis + _, err = mirrorRedis.Do("SET", key, value) + if err != nil { + return err + } + } + if cursor == 0 { + break + } + } + return nil +} + // This function will process the incoming messages from client. It will act as a multiplexer. // To get more information refer to: // https://redis.io/docs/reference/protocol-spec @@ -100,6 +129,15 @@ func redisClose(conn redcon.Conn, err error) { log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) } +// This block needs help + +func fullsync() { + err = scanAndUpdate(mainRedis, mirrorRedis) + if err != nil { + log.Println("Initial fullsync failed:", (err)) + } +} + // This function starts as a goroutine. A concurrent process. // mirrorDo loops forever and run the queued commands against the mirror redis func mirrorDo() { From e23ae5a67a1d30cf805fb65809f2285faa756e94 Mon Sep 17 00:00:00 2001 From: kalaiselvan-binary Date: Wed, 30 Nov 2022 14:29:22 +0800 Subject: [PATCH 2/3] changes to main() --- main.go | 2 ++ server.go | 9 ++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/main.go b/main.go index 8f42b5c..b3ebe35 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,8 @@ func main() { log.Printf("started server at %s", *addr) // Create a channel do mirror commands with capacity of *buffer to receive the mirrored commands mirrorDoQueue = make(chan rdbDo, *buffer) + // Full sync + go fullsync() // Starting a separate goroutine to process the mirror commands go mirrorDo() err := redcon.ListenAndServe(*addr, diff --git a/server.go b/server.go index cfb95d3..b8f375d 100644 --- a/server.go +++ b/server.go @@ -25,7 +25,7 @@ func scanAndUpdate(mainRedis, mirrorRedis redis.Conn) error { if err != nil { return err } - // Get the next cursor and keys + // Need help to get the value of the next cursor and the keys cursor, _ = redis.Int(keys[0], nil) keys, _ = redis.Values(keys[1], nil) // Iterate over the keys and update them in the destination @@ -130,11 +130,10 @@ func redisClose(conn redcon.Conn, err error) { } // This block needs help - func fullsync() { - err = scanAndUpdate(mainRedis, mirrorRedis) - if err != nil { - log.Println("Initial fullsync failed:", (err)) + sync := scanAndUpdate(mainRedis, mirrorRedis) + if sync != nil { + log.Println("Initial fullsync failed:", sync.Error()) } } From cf23bc31d4f13be59f417068d1e54cf6f50dc2c8 Mon Sep 17 00:00:00 2001 From: kalaiselvan-binary Date: Wed, 30 Nov 2022 16:03:07 +0800 Subject: [PATCH 3/3] Trying some methods --- main.go | 2 +- server.go | 22 +++++++--------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index b3ebe35..a99bbbd 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,7 @@ func main() { log.Printf("started server at %s", *addr) // Create a channel do mirror commands with capacity of *buffer to receive the mirrored commands mirrorDoQueue = make(chan rdbDo, *buffer) - // Full sync + // Starting full sync go fullsync() // Starting a separate goroutine to process the mirror commands go mirrorDo() diff --git a/server.go b/server.go index b8f375d..0de5218 100644 --- a/server.go +++ b/server.go @@ -18,33 +18,29 @@ type rdbDo struct { cmdArgs []interface{} } -func scanAndUpdate(mainRedis, mirrorRedis redis.Conn) error { +func scanAndUpdate(mainRedis, mirrorRedis redis.Conn) { cursor := 0 for { - keys, err := redis.Values(mainRedis.Do("SCAN", cursor)) + keys, err := redis.Values(mainRedis.Do("SCAN", cursor, "MATCH", "*")) if err != nil { - return err + log.Println("Scanning failed from the elasticache:", (err)) } - // Need help to get the value of the next cursor and the keys - cursor, _ = redis.Int(keys[0], nil) - keys, _ = redis.Values(keys[1], nil) // Iterate over the keys and update them in the destination for _, key := range keys { value, err := redis.String(mainRedis.Do("GET", key)) if err != nil { - return err + log.Println("Unable get the key/value from elasticache:", (err)) } - // Update the key in the destination redis + // Update keys in the destination redis _, err = mirrorRedis.Do("SET", key, value) if err != nil { - return err + log.Println("Intial replication got failed on the EC2 Redis:", (err)) } } if cursor == 0 { break } } - return nil } // This function will process the incoming messages from client. It will act as a multiplexer. @@ -129,12 +125,8 @@ func redisClose(conn redcon.Conn, err error) { log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) } -// This block needs help func fullsync() { - sync := scanAndUpdate(mainRedis, mirrorRedis) - if sync != nil { - log.Println("Initial fullsync failed:", sync.Error()) - } + scanAndUpdate(mainRedis, mirrorRedis) } // This function starts as a goroutine. A concurrent process.