From ed657830df60a395122080da8cb7af75bcf0d2a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E7=B2=9F?= Date: Fri, 27 Dec 2024 00:18:45 +0800 Subject: [PATCH 01/11] patch etcd value --- internal/core/message/a6conf.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 127eb6a..53200b6 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -6,6 +6,11 @@ import ( "strings" ) +type Labels struct { + ServiceName string `json:"service_name,omitempty"` + DiscoveryType string `json:"discovery_type,omitempty"` +} + type UpstreamArg struct { NamespaceID string `json:"namespace_id,omitempty"` GroupName string `json:"group_name,omitempty"` @@ -103,7 +108,7 @@ func embedElm(v reflect.Value, all map[string]interface{}) { } if fieldName == "DiscoveryType" || fieldName == "ServiceName" { - all["_"+tagName] = val.Interface() + // all["_"+tagName] = val.Interface() delete(all, tagName) continue } @@ -175,6 +180,7 @@ func NewUpstreams(value []byte) (A6Conf, error) { } type Routes struct { + Labels Labels `json:"labels"` Upstream Upstream `json:"upstream"` All map[string]interface{} `json:"-"` hasNodesAttr bool @@ -206,11 +212,21 @@ func NewRoutes(value []byte) (A6Conf, error) { routes := &Routes{ All: make(map[string]interface{}), } + + // println("===", string(value)) + err := unmarshal(value, routes) if err != nil { return nil, err } + routes.Upstream.ServiceName = routes.Labels.ServiceName + routes.Upstream.DiscoveryType = routes.Labels.DiscoveryType + + // if routes.Upstream.ServiceName != "" { + // println(routes.Upstream.ServiceName) + // } + if routes.Upstream.Nodes != nil { routes.hasNodesAttr = true } From 4187fb51f70c612aa8ca1a9dc07675ea52157977 Mon Sep 17 00:00:00 2001 From: Hugo Zhu Date: Fri, 27 Dec 2024 13:51:32 +0800 Subject: [PATCH 02/11] fixed labels --- internal/core/message/a6conf.go | 39 +++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 53200b6..26953cf 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -7,8 +7,8 @@ import ( ) type Labels struct { - ServiceName string `json:"service_name,omitempty"` DiscoveryType string `json:"discovery_type,omitempty"` + ServiceName string `json:"service_name,omitempty"` } type UpstreamArg struct { @@ -107,11 +107,11 @@ func embedElm(v reflect.Value, all map[string]interface{}) { continue } - if fieldName == "DiscoveryType" || fieldName == "ServiceName" { - // all["_"+tagName] = val.Interface() - delete(all, tagName) - continue - } + // if fieldName == "DiscoveryType" || fieldName == "ServiceName" { + // all["_"+tagName] = val.Interface() + // delete(all, tagName) + // continue + // } if val.Kind() == reflect.Ptr { val = val.Elem() @@ -179,6 +179,11 @@ func NewUpstreams(value []byte) (A6Conf, error) { return ups, nil } +// "labels": { +// "service_name":"aquaman-user", +// "discovery_type":"nacos" +// }, + type Routes struct { Labels Labels `json:"labels"` Upstream Upstream `json:"upstream"` @@ -193,7 +198,10 @@ func (routes *Routes) GetAll() *map[string]interface{} { func (routes *Routes) Marshal() ([]byte, error) { embedElm(reflect.ValueOf(routes), routes.All) - return json.Marshal(routes.All) + // routes.All["labels"] = routes.Labels + bytes, error := json.Marshal(routes.All) + // print("a6conf marshal 2=====", string(bytes)) + return bytes, error } func (routes *Routes) Inject(nodes interface{}) { @@ -220,16 +228,25 @@ func NewRoutes(value []byte) (A6Conf, error) { return nil, err } + if id, ok := routes.All["id"].(string); ok { + // if id == "web_25" { + // println("web_25 in NewRoutes", fmt.Sprintf("%v", string(value))) + // for key, value := range routes.All["labels"].(map[string]interface{}) { + // println(fmt.Sprintf("Key: %s, Value: %v", key, value)) + // } + // } + if routes.Labels.ServiceName != "" { + print("====", id, "====>", routes.Labels.ServiceName) + } + } + routes.Upstream.ServiceName = routes.Labels.ServiceName routes.Upstream.DiscoveryType = routes.Labels.DiscoveryType - // if routes.Upstream.ServiceName != "" { - // println(routes.Upstream.ServiceName) - // } - if routes.Upstream.Nodes != nil { routes.hasNodesAttr = true } + return routes, nil } From 9025ccb02cdf7f2b169b7dffedf05ae1852d7ede Mon Sep 17 00:00:00 2001 From: Hugo Zhu Date: Fri, 27 Dec 2024 14:51:51 +0800 Subject: [PATCH 03/11] fixed labels --- internal/core/message/a6conf.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 26953cf..917da74 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -107,11 +107,14 @@ func embedElm(v reflect.Value, all map[string]interface{}) { continue } - // if fieldName == "DiscoveryType" || fieldName == "ServiceName" { - // all["_"+tagName] = val.Interface() - // delete(all, tagName) - // continue - // } + if fieldName == "DiscoveryType" || fieldName == "ServiceName" { + typStr := typ.String() + if typStr == "message.Upstream" { + // all["_"+tagName] = val.Interface() + delete(all, tagName) + continue + } + } if val.Kind() == reflect.Ptr { val = val.Elem() @@ -196,11 +199,14 @@ func (routes *Routes) GetAll() *map[string]interface{} { } func (routes *Routes) Marshal() ([]byte, error) { + bytes1, _ := json.Marshal(routes.All) + print("a6conf marshal 1=====", string(bytes1)) + embedElm(reflect.ValueOf(routes), routes.All) // routes.All["labels"] = routes.Labels bytes, error := json.Marshal(routes.All) - // print("a6conf marshal 2=====", string(bytes)) + print("a6conf marshal 2=====", string(bytes)) return bytes, error } @@ -236,7 +242,7 @@ func NewRoutes(value []byte) (A6Conf, error) { // } // } if routes.Labels.ServiceName != "" { - print("====", id, "====>", routes.Labels.ServiceName) + println("====", id, "====> ", routes.Labels.ServiceName) } } From 1331d426611fc9481cfbe92b74c885e15fb38ce1 Mon Sep 17 00:00:00 2001 From: Hugo Zhu Date: Fri, 27 Dec 2024 15:47:53 +0800 Subject: [PATCH 04/11] testing --- internal/core/message/a6conf.go | 4 ++++ main.go | 2 ++ 2 files changed, 6 insertions(+) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 917da74..444ef90 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -2,6 +2,7 @@ package message import ( "encoding/json" + "fmt" "reflect" "strings" ) @@ -86,7 +87,10 @@ func embedElm(v reflect.Value, all map[string]interface{}) { v = v.Elem() } + println(fmt.Printf("===========Identity of v: %v %v\n", v.Type(), v.Type().Name())) + typ := v.Type() + fieldNum := typ.NumField() for i := 0; i < fieldNum; i++ { field := typ.Field(i) diff --git a/main.go b/main.go index 83d37a9..f4a9bbd 100644 --- a/main.go +++ b/main.go @@ -47,6 +47,8 @@ func initLogger(logConf *conf.Log) error { } func main() { + println("===========starting 123============") + conf.InitConf() if err := initLogger(conf.LogConfig); err != nil { From 0f09cbcd6e5858f78675851af72c4001375a02f2 Mon Sep 17 00:00:00 2001 From: Hugo Zhu Date: Fri, 27 Dec 2024 17:57:36 +0800 Subject: [PATCH 05/11] added discovery_args support --- internal/core/message/a6conf.go | 38 +++++++++++++++------------------ 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 444ef90..23d26a4 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -2,14 +2,15 @@ package message import ( "encoding/json" - "fmt" "reflect" "strings" ) type Labels struct { - DiscoveryType string `json:"discovery_type,omitempty"` - ServiceName string `json:"service_name,omitempty"` + DiscoveryType string `json:"discovery_type,omitempty"` + ServiceName string `json:"service_name,omitempty"` + DiscoveryArgsNamespaceID string `json:"discovery_args.namespace_id,omitempty"` + DiscoveryArgsGroupName string `json:"discovery_args.group_name,omitempty"` } type UpstreamArg struct { @@ -87,8 +88,6 @@ func embedElm(v reflect.Value, all map[string]interface{}) { v = v.Elem() } - println(fmt.Printf("===========Identity of v: %v %v\n", v.Type(), v.Type().Name())) - typ := v.Type() fieldNum := typ.NumField() @@ -111,9 +110,9 @@ func embedElm(v reflect.Value, all map[string]interface{}) { continue } - if fieldName == "DiscoveryType" || fieldName == "ServiceName" { - typStr := typ.String() - if typStr == "message.Upstream" { + if fieldName == "DiscoveryType" || fieldName == "ServiceName" || fieldName == "DiscoveryArgs" { + name := typ.Name() + if name == "Upstream" { // all["_"+tagName] = val.Interface() delete(all, tagName) continue @@ -203,14 +202,14 @@ func (routes *Routes) GetAll() *map[string]interface{} { } func (routes *Routes) Marshal() ([]byte, error) { - bytes1, _ := json.Marshal(routes.All) - print("a6conf marshal 1=====", string(bytes1)) + // bytes1, _ := json.Marshal(routes.All) + // print("a6conf marshal 1=====", string(bytes1)) embedElm(reflect.ValueOf(routes), routes.All) // routes.All["labels"] = routes.Labels bytes, error := json.Marshal(routes.All) - print("a6conf marshal 2=====", string(bytes)) + // print("a6conf marshal 2=====", string(bytes)) return bytes, error } @@ -239,20 +238,17 @@ func NewRoutes(value []byte) (A6Conf, error) { } if id, ok := routes.All["id"].(string); ok { - // if id == "web_25" { - // println("web_25 in NewRoutes", fmt.Sprintf("%v", string(value))) - // for key, value := range routes.All["labels"].(map[string]interface{}) { - // println(fmt.Sprintf("Key: %s, Value: %v", key, value)) - // } - // } if routes.Labels.ServiceName != "" { - println("====", id, "====> ", routes.Labels.ServiceName) + println("upstream nodes in route id: ", id, " will be synced with service: ", routes.Labels.ServiceName) + routes.Upstream.ServiceName = routes.Labels.ServiceName + routes.Upstream.DiscoveryType = routes.Labels.DiscoveryType + routes.Upstream.DiscoveryArgs = &UpstreamArg{ + NamespaceID: routes.Labels.DiscoveryArgsNamespaceID, + GroupName: routes.Labels.DiscoveryArgsGroupName, + } } } - routes.Upstream.ServiceName = routes.Labels.ServiceName - routes.Upstream.DiscoveryType = routes.Labels.DiscoveryType - if routes.Upstream.Nodes != nil { routes.hasNodesAttr = true } From 7f44a5fb72960f2f3444f7436a564bf53c2b863d Mon Sep 17 00:00:00 2001 From: Hugo Zhu Date: Fri, 27 Dec 2024 18:44:55 +0800 Subject: [PATCH 06/11] fixed issue when nacos restart, empty hosts --- internal/core/message/a6conf.go | 9 +++++++++ internal/discoverer/nacos.go | 16 ++++++++++++++-- main.go | 2 +- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 23d26a4..0f4090b 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -6,6 +6,15 @@ import ( "strings" ) +// Sample route config +// "labels": +// { +// "discovery_args.group_name": "group_name", +// "discovery_args.namespace_id": "test_name", +// "discovery_type": "nacos", +// "service_name": "test-service" +// }, + type Labels struct { DiscoveryType string `json:"discovery_type,omitempty"` ServiceName string `json:"service_name,omitempty"` diff --git a/internal/discoverer/nacos.go b/internal/discoverer/nacos.go index be514b9..495a594 100644 --- a/internal/discoverer/nacos.go +++ b/internal/discoverer/nacos.go @@ -124,7 +124,7 @@ func (d *NacosDiscoverer) Stop() { func (d *NacosDiscoverer) Query(msg *message.Message) error { serviceId := serviceID(msg.ServiceName(), msg.DiscoveryArgs()) - + println("Nacos: ", msg.ServiceName(), msg.DiscoveryArgs()) d.cacheMutex.Lock() defer d.cacheMutex.Unlock() @@ -143,6 +143,10 @@ func (d *NacosDiscoverer) Query(msg *message.Message) error { if err != nil { return err } + if len(nodes) == 0 { + log.Warnf("No nodes found for service[%s]", serviceId) + return fmt.Errorf("no nodes found for service[%s]", serviceId) + } msg.InjectNodes(nodes) @@ -201,7 +205,10 @@ func (d *NacosDiscoverer) Update(oldMsg, msg *message.Message) error { if err != nil { return err } - + if len(nodes) == 0 { + log.Warnf("No nodes found for service[%s]", serviceId) + return fmt.Errorf("no nodes found for service[%s]", serviceId) + } msg.InjectNodes(nodes) newDiscover.nodes = nodes newDiscover.a6Conf = map[string]*message.Message{ @@ -310,6 +317,11 @@ func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interf }) } + if len(nodes) == 0 { + log.Warnf("No valid nodes found for service[%s]", serviceId) + return + } + d.cacheMutex.Lock() defer d.cacheMutex.Unlock() diff --git a/main.go b/main.go index f4a9bbd..54d03c1 100644 --- a/main.go +++ b/main.go @@ -47,7 +47,7 @@ func initLogger(logConf *conf.Log) error { } func main() { - println("===========starting 123============") + println("=========== starting ============") conf.InitConf() From 912bd55473a50939124ee5ab3022d16b8cf20ff5 Mon Sep 17 00:00:00 2001 From: Hugo Zhu Date: Sun, 29 Dec 2024 15:36:49 +0800 Subject: [PATCH 07/11] added Dockerfile --- Dockerfile.minimal | 31 +++++++++++++++++++++++++++++++ conf/conf.yaml | 8 +------- docker-compose-build.yml | 25 +++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) create mode 100644 Dockerfile.minimal create mode 100644 docker-compose-build.yml diff --git a/Dockerfile.minimal b/Dockerfile.minimal new file mode 100644 index 0000000..c143b79 --- /dev/null +++ b/Dockerfile.minimal @@ -0,0 +1,31 @@ +FROM golang:alpine AS build + +WORKDIR /build + +ENV GO111MODULE=on \ + CGO_ENABLED=0 \ + GOOS=linux + +COPY . . + +RUN go version + +RUN mkdir -p /usr/local/apisix-seed/logs + +RUN go build -o apisix-seed main.go + +FROM scratch + +WORKDIR /usr/local/apisix-seed + +COPY --from=build /usr/local/apisix-seed/logs /usr/local/apisix-seed/logs + +COPY --from=build /build/apisix-seed . +COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=build /build/conf/conf.yaml conf/conf.yaml + +ENV PATH=$PATH:/usr/local/apisix-seed + +ENV APISIX_SEED_WORKDIR /usr/local/apisix-seed + +CMD [ "/usr/local/apisix-seed/apisix-seed" ] diff --git a/conf/conf.yaml b/conf/conf.yaml index 8dd9ee9..25edc73 100644 --- a/conf/conf.yaml +++ b/conf/conf.yaml @@ -13,7 +13,7 @@ etcd: # the default value is true, e.g. the certificate will be verified strictly. log: level: warn - path: apisix-seed.log # path is the file to write logs to. Backup log files will be retained in the same directory + path: logs/apisix-seed.log # path is the file to write logs to. Backup log files will be retained in the same directory maxage: 168h # maxage is the maximum number of days to retain old log files based on the timestamp encoded in their filename maxsize: 104857600 # maxsize is the maximum size in megabytes of the log file before it gets rotated. It defaults to 100mb rotation_time: 1h # rotation_time is the log rotation time @@ -30,9 +30,3 @@ discovery: # service discovery center connect: 2000 # default 2000ms send: 2000 # default 2000ms read: 5000 # default 5000ms - zookeeper: - hosts: - - "127.0.0.1:2181" - prefix: /zookeeper - weight: 100 # default weight for node - timeout: 10 # default 10s diff --git a/docker-compose-build.yml b/docker-compose-build.yml new file mode 100644 index 0000000..c113590 --- /dev/null +++ b/docker-compose-build.yml @@ -0,0 +1,25 @@ +version: "3" + +services: + apisix_seed_dev: + image: hugozhu/apisix-seed + build: + context: . + dockerfile: Dockerfile.minimal + restart: always + volumes: + - ./conf/conf.yaml:/usr/local/apisix-seed/conf/conf.yaml:ro + - ./logs:/usr/local/apisix-seed/logs + networks: + apisix-seed: + ipv4_address: 172.50.238.50 + +networks: + apisix-seed: + driver: bridge + ipam: + driver: default + config: + - subnet: 172.50.238.0/24 + gateway: 172.50.238.1 + From b93a428e882c282e89080cab70ca8992909a22c9 Mon Sep 17 00:00:00 2001 From: qingshui Date: Mon, 6 Jan 2025 19:33:59 +0800 Subject: [PATCH 08/11] feat: support service use grpc port --- internal/core/message/a6conf.go | 1 + internal/core/message/message.go | 18 ++++++++++++++++++ internal/discoverer/nacos.go | 25 +++++++++++++++++++------ 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 0f4090b..7141cf7 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -20,6 +20,7 @@ type Labels struct { ServiceName string `json:"service_name,omitempty"` DiscoveryArgsNamespaceID string `json:"discovery_args.namespace_id,omitempty"` DiscoveryArgsGroupName string `json:"discovery_args.group_name,omitempty"` + ServiceGrpcPort string `json:"service_grpc_port,omitempty"` } type UpstreamArg struct { diff --git a/internal/core/message/message.go b/internal/core/message/message.go index e572e92..7795e16 100644 --- a/internal/core/message/message.go +++ b/internal/core/message/message.go @@ -2,6 +2,7 @@ package message import ( "reflect" + "strconv" ) type StoreEvent = int @@ -61,6 +62,23 @@ func (msg *Message) DiscoveryType() string { return up.DupDiscoveryType } +func (msg *Message) ServiceGrpcPort() int { + all := msg.a6Conf.GetAll() + if all == nil { + return 0 + } + grpcPort, ok := (*all)["labels"].(map[string]interface{})["service_grpc_port"] + if !ok { + return 0 + } + grpcPortInt, err := strconv.Atoi(grpcPort.(string)) + if err != nil { + println("failed to convert grpc port %s to integer: %s", grpcPort, err) + return 0 + } + return grpcPortInt +} + func (msg *Message) DiscoveryArgs() map[string]interface{} { up := msg.a6Conf.GetUpstream() if up.DiscoveryArgs == nil { diff --git a/internal/discoverer/nacos.go b/internal/discoverer/nacos.go index 495a594..8779731 100644 --- a/internal/discoverer/nacos.go +++ b/internal/discoverer/nacos.go @@ -124,7 +124,7 @@ func (d *NacosDiscoverer) Stop() { func (d *NacosDiscoverer) Query(msg *message.Message) error { serviceId := serviceID(msg.ServiceName(), msg.DiscoveryArgs()) - println("Nacos: ", msg.ServiceName(), msg.DiscoveryArgs()) + println("Nacos: ", msg.ServiceName(), msg.DiscoveryArgs(), msg.ServiceGrpcPort()) d.cacheMutex.Lock() defer d.cacheMutex.Unlock() @@ -138,6 +138,9 @@ func (d *NacosDiscoverer) Query(msg *message.Message) error { id: serviceId, name: msg.ServiceName(), args: msg.DiscoveryArgs(), + a6Conf: map[string]*message.Message{ + serviceId: msg, + }, } nodes, err := d.fetch(dis) if err != nil { @@ -258,6 +261,7 @@ func (d *NacosDiscoverer) fetch(service *NacosService) ([]*message.Node, error) // metadata metadata := service.args["metadata"] + grpcPort := service.a6Conf[service.id].ServiceGrpcPort() nodes := make([]*message.Node, 0) for _, host := range serviceInfo.Hosts { if metadata != nil { @@ -276,10 +280,13 @@ func (d *NacosDiscoverer) fetch(service *NacosService) ([]*message.Node, error) if weight == 0 { weight = d.weight } - + port := int(host.Port) + if grpcPort != 0 { + port = grpcPort + } nodes = append(nodes, &message.Node{ Host: host.Ip, - Port: int(host.Port), + Port: port, Weight: weight, }) } @@ -287,7 +294,7 @@ func (d *NacosDiscoverer) fetch(service *NacosService) ([]*message.Node, error) return nodes, nil } -func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interface{}) func([]model.SubscribeService, error) { +func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interface{}, grpcPort int) func([]model.SubscribeService, error) { return func(services []model.SubscribeService, err error) { nodes := make([]*message.Node, 0) meta, ok := metadata.(map[string]interface{}) @@ -310,9 +317,14 @@ func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interf weight = d.weight } + port := int(inst.Port) + if grpcPort != 0 { + port = grpcPort + } + nodes = append(nodes, &message.Node{ Host: inst.Ip, - Port: int(inst.Port), + Port: port, Weight: weight, }) } @@ -339,10 +351,11 @@ func (d *NacosDiscoverer) subscribe(service *NacosService, client naming_client. groupName, _ := service.args["group_name"].(string) log.Infof("Nacos subscribe service: %s, groupName: %s", service.name, groupName) + grpcPort := service.a6Conf[service.id].ServiceGrpcPort() param := &vo.SubscribeParam{ ServiceName: service.name, GroupName: groupName, - SubscribeCallback: d.newSubscribeCallback(service.id, service.args["metadata"]), + SubscribeCallback: d.newSubscribeCallback(service.id, service.args["metadata"], grpcPort), } // TODO: retry if failed to Subscribe From 86474d8d101b91c4761fc132871ae28a8118a39b Mon Sep 17 00:00:00 2001 From: qingshui Date: Mon, 6 Jan 2025 21:57:50 +0800 Subject: [PATCH 09/11] feat: support service use grpc port v2 --- internal/core/message/a6conf.go | 26 ++++++++++++++++++++++---- internal/core/message/message.go | 18 ------------------ internal/discoverer/nacos.go | 25 ++++++------------------- 3 files changed, 28 insertions(+), 41 deletions(-) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 7141cf7..815e86d 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -2,8 +2,12 @@ package message import ( "encoding/json" + "fmt" "reflect" + "strconv" "strings" + + "github.com/api7/gopkg/pkg/log" ) // Sample route config @@ -12,7 +16,8 @@ import ( // "discovery_args.group_name": "group_name", // "discovery_args.namespace_id": "test_name", // "discovery_type": "nacos", -// "service_name": "test-service" +// "service_name": "test-service", +// "service_grpc_port": "10001" // }, type Labels struct { @@ -197,7 +202,8 @@ func NewUpstreams(value []byte) (A6Conf, error) { // "labels": { // "service_name":"aquaman-user", -// "discovery_type":"nacos" +// "discovery_type":"nacos", +// "service_grpc_port":"10001" // }, type Routes struct { @@ -212,8 +218,20 @@ func (routes *Routes) GetAll() *map[string]interface{} { } func (routes *Routes) Marshal() ([]byte, error) { - // bytes1, _ := json.Marshal(routes.All) - // print("a6conf marshal 1=====", string(bytes1)) + // If grpc port is configured, modify all nodes' port to grpc port + if routes.Labels.ServiceGrpcPort != "" && routes.Upstream.Nodes != nil { + grpcPort, err := strconv.ParseInt(routes.Labels.ServiceGrpcPort, 10, 64) + if err != nil { + log.Errorf("invalid grpc port configuration: failed to parse port %s to integer for route %s: %s", routes.Labels.ServiceGrpcPort, routes.All["id"], err) + return nil, fmt.Errorf("invalid grpc port configuration: failed to parse port %s to integer for route %s: %s", routes.Labels.ServiceGrpcPort, routes.All["id"], err) + } + if nodes, ok := routes.Upstream.Nodes.([]*Node); ok { + for _, node := range nodes { + node.Port = int(grpcPort) + log.Infof("updated gRPC port to %d for node %s in route %s", grpcPort, node.Host, routes.All["id"]) + } + } + } embedElm(reflect.ValueOf(routes), routes.All) diff --git a/internal/core/message/message.go b/internal/core/message/message.go index 7795e16..e572e92 100644 --- a/internal/core/message/message.go +++ b/internal/core/message/message.go @@ -2,7 +2,6 @@ package message import ( "reflect" - "strconv" ) type StoreEvent = int @@ -62,23 +61,6 @@ func (msg *Message) DiscoveryType() string { return up.DupDiscoveryType } -func (msg *Message) ServiceGrpcPort() int { - all := msg.a6Conf.GetAll() - if all == nil { - return 0 - } - grpcPort, ok := (*all)["labels"].(map[string]interface{})["service_grpc_port"] - if !ok { - return 0 - } - grpcPortInt, err := strconv.Atoi(grpcPort.(string)) - if err != nil { - println("failed to convert grpc port %s to integer: %s", grpcPort, err) - return 0 - } - return grpcPortInt -} - func (msg *Message) DiscoveryArgs() map[string]interface{} { up := msg.a6Conf.GetUpstream() if up.DiscoveryArgs == nil { diff --git a/internal/discoverer/nacos.go b/internal/discoverer/nacos.go index 8779731..495a594 100644 --- a/internal/discoverer/nacos.go +++ b/internal/discoverer/nacos.go @@ -124,7 +124,7 @@ func (d *NacosDiscoverer) Stop() { func (d *NacosDiscoverer) Query(msg *message.Message) error { serviceId := serviceID(msg.ServiceName(), msg.DiscoveryArgs()) - println("Nacos: ", msg.ServiceName(), msg.DiscoveryArgs(), msg.ServiceGrpcPort()) + println("Nacos: ", msg.ServiceName(), msg.DiscoveryArgs()) d.cacheMutex.Lock() defer d.cacheMutex.Unlock() @@ -138,9 +138,6 @@ func (d *NacosDiscoverer) Query(msg *message.Message) error { id: serviceId, name: msg.ServiceName(), args: msg.DiscoveryArgs(), - a6Conf: map[string]*message.Message{ - serviceId: msg, - }, } nodes, err := d.fetch(dis) if err != nil { @@ -261,7 +258,6 @@ func (d *NacosDiscoverer) fetch(service *NacosService) ([]*message.Node, error) // metadata metadata := service.args["metadata"] - grpcPort := service.a6Conf[service.id].ServiceGrpcPort() nodes := make([]*message.Node, 0) for _, host := range serviceInfo.Hosts { if metadata != nil { @@ -280,13 +276,10 @@ func (d *NacosDiscoverer) fetch(service *NacosService) ([]*message.Node, error) if weight == 0 { weight = d.weight } - port := int(host.Port) - if grpcPort != 0 { - port = grpcPort - } + nodes = append(nodes, &message.Node{ Host: host.Ip, - Port: port, + Port: int(host.Port), Weight: weight, }) } @@ -294,7 +287,7 @@ func (d *NacosDiscoverer) fetch(service *NacosService) ([]*message.Node, error) return nodes, nil } -func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interface{}, grpcPort int) func([]model.SubscribeService, error) { +func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interface{}) func([]model.SubscribeService, error) { return func(services []model.SubscribeService, err error) { nodes := make([]*message.Node, 0) meta, ok := metadata.(map[string]interface{}) @@ -317,14 +310,9 @@ func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interf weight = d.weight } - port := int(inst.Port) - if grpcPort != 0 { - port = grpcPort - } - nodes = append(nodes, &message.Node{ Host: inst.Ip, - Port: port, + Port: int(inst.Port), Weight: weight, }) } @@ -351,11 +339,10 @@ func (d *NacosDiscoverer) subscribe(service *NacosService, client naming_client. groupName, _ := service.args["group_name"].(string) log.Infof("Nacos subscribe service: %s, groupName: %s", service.name, groupName) - grpcPort := service.a6Conf[service.id].ServiceGrpcPort() param := &vo.SubscribeParam{ ServiceName: service.name, GroupName: groupName, - SubscribeCallback: d.newSubscribeCallback(service.id, service.args["metadata"], grpcPort), + SubscribeCallback: d.newSubscribeCallback(service.id, service.args["metadata"]), } // TODO: retry if failed to Subscribe From 4a11b3b50b332105143a123025dd985bedb11520 Mon Sep 17 00:00:00 2001 From: qingshui Date: Tue, 7 Jan 2025 14:03:07 +0800 Subject: [PATCH 10/11] feat: enhance gRPC port handling in route configuration - Removed deprecated discovery attributes from test cases. - Added a new test for marshaling routes with gRPC port support. - Updated the Marshal function to correctly set the gRPC port for nodes in the route configuration. --- internal/core/message/a6conf.go | 12 +++-- internal/core/message/a6conf_test.go | 77 ++++++++++++++++++++++++++-- 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 815e86d..f7ac4ff 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -226,10 +226,16 @@ func (routes *Routes) Marshal() ([]byte, error) { return nil, fmt.Errorf("invalid grpc port configuration: failed to parse port %s to integer for route %s: %s", routes.Labels.ServiceGrpcPort, routes.All["id"], err) } if nodes, ok := routes.Upstream.Nodes.([]*Node); ok { - for _, node := range nodes { - node.Port = int(grpcPort) - log.Infof("updated gRPC port to %d for node %s in route %s", grpcPort, node.Host, routes.All["id"]) + nodesCopy := make([]*Node, len(nodes)) + for i, n := range nodes { + nodesCopy[i] = &Node{ + Host: n.Host, + Weight: n.Weight, + Port: int(grpcPort), + } + log.Infof("updated gRPC port to %d for node %s in route %s", grpcPort, n.Host, routes.All["id"]) } + routes.Upstream.Nodes = nodesCopy } } diff --git a/internal/core/message/a6conf_test.go b/internal/core/message/a6conf_test.go index 935657b..6829c69 100644 --- a/internal/core/message/a6conf_test.go +++ b/internal/core/message/a6conf_test.go @@ -124,11 +124,6 @@ func TestMarshal_Routes(t *testing.T) { "pass_host": "pass", "type": "roundrobin", "hash_on": "vars", - "_discovery_type": "nacos", - "_service_name": "APISIX-NACOS", - "discovery_args": { - "group_name": "DEFAULT_GROUP" - }, "nodes": [ { "host": "192.168.1.1", @@ -157,6 +152,78 @@ func TestMarshal_Routes(t *testing.T) { assert.JSONEq(t, wantA6Str, string(ss)) } +func TestMarshal_Routes_With_Grpc_Port(t *testing.T) { + givenA6Str := `{ + "status": 1, + "id": "3", + "uri": "/hh", + "labels": { + "discovery_type": "nacos", + "discovery_args.group_name": "DEFAULT_GROUP", + "service_name": "test-service", + "service_grpc_port":"10001" + }, + "upstream": { + "scheme": "http", + "pass_host": "pass", + "type": "roundrobin", + "hash_on": "vars", + "nodes": { + "192.168.1.1:10001": 1 + } + }, + "create_time": 1648871506, + "priority": 0, + "update_time": 1648871506 +}` + nodes := []*Node{ + {Host: "192.168.1.1", Port: 80, Weight: 1}, + {Host: "192.168.1.2", Port: 80, Weight: 1}, + } + + wantA6Str := `{ + "status": 1, + "id": "3", + "uri": "/hh", + "labels": { + "discovery_type": "nacos", + "discovery_args.group_name": "DEFAULT_GROUP", + "service_name": "test-service", + "service_grpc_port":"10001" + }, + "upstream": { + "scheme": "http", + "pass_host": "pass", + "type": "roundrobin", + "hash_on": "vars", + "nodes": [ + { + "host": "192.168.1.1", + "port": 10001, + "weight": 1 + }, + { + "host": "192.168.1.2", + "port": 10001, + "weight": 1 + } + ] + }, + "create_time": 1648871506, + "priority": 0, + "update_time": 1648871506 +}` + caseDesc := "sanity" + a6, err := NewA6Conf([]byte(givenA6Str), A6RoutesConf) + assert.Nil(t, err, caseDesc) + + a6.Inject(nodes) + ss, err := a6.Marshal() + assert.Nil(t, err, caseDesc) + + assert.JSONEq(t, wantA6Str, string(ss)) +} + func TestHasNodesAttr_Routes(t *testing.T) { tests := []struct { name string From 94308385e2c3fba8f3831e6d61173f3e696d183f Mon Sep 17 00:00:00 2001 From: Hugo Zhu Date: Tue, 7 Jan 2025 14:51:07 +0800 Subject: [PATCH 11/11] bump version to 1.1 --- main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 54d03c1..4d3bebc 100644 --- a/main.go +++ b/main.go @@ -46,8 +46,10 @@ func initLogger(logConf *conf.Log) error { return nil } +var VERSION = 1.1 + func main() { - println("=========== starting ============") + println("=========== version ", VERSION, " ============") conf.InitConf()