diff --git a/main.go b/main.go index 8f42b5c..a99bbbd 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,8 @@ func main() { log.Printf("started server at %s", *addr) // Create a channel do mirror commands with capacity of *buffer to receive the mirrored commands mirrorDoQueue = make(chan rdbDo, *buffer) + // Starting full sync + go fullsync() // Starting a separate goroutine to process the mirror commands go mirrorDo() err := redcon.ListenAndServe(*addr, diff --git a/params.go b/params.go index 76cd78b..f22689f 100644 --- a/params.go +++ b/params.go @@ -9,7 +9,7 @@ var buffer *int func init() { mainRedis = flag.String("main", ":6379", "connection string for the main redis") - mirrorRedis = flag.String("mirror", ":6381", "connection string for the main redis") + mirrorRedis = flag.String("mirror", ":6381", "connection string for the mirror redis") addr = flag.String("addr", ":6380", "connection string for the mirroring service") buffer = flag.Int("buff", 500, "Buffer size for mirror queue. If it is full Do commands will be ignored") flag.Parse() diff --git a/server.go b/server.go index f55cf5a..0de5218 100644 --- a/server.go +++ b/server.go @@ -18,6 +18,31 @@ type rdbDo struct { cmdArgs []interface{} } +func scanAndUpdate(mainRedis, mirrorRedis redis.Conn) { + cursor := 0 + for { + keys, err := redis.Values(mainRedis.Do("SCAN", cursor, "MATCH", "*")) + if err != nil { + log.Println("Scanning failed from the elasticache:", (err)) + } + // Iterate over the keys and update them in the destination + for _, key := range keys { + value, err := redis.String(mainRedis.Do("GET", key)) + if err != nil { + log.Println("Unable get the key/value from elasticache:", (err)) + } + // Update keys in the destination redis + _, err = mirrorRedis.Do("SET", key, value) + if err != nil { + log.Println("Intial replication got failed on the EC2 Redis:", (err)) + } + } + if cursor == 0 { + break + } + } +} + // This function will process the incoming messages from client. It will act as a multiplexer. // To get more information refer to: // https://redis.io/docs/reference/protocol-spec @@ -100,6 +125,10 @@ func redisClose(conn redcon.Conn, err error) { log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) } +func fullsync() { + scanAndUpdate(mainRedis, mirrorRedis) +} + // This function starts as a goroutine. A concurrent process. // mirrorDo loops forever and run the queued commands against the mirror redis func mirrorDo() {