diff --git a/Dockerfile.minimal b/Dockerfile.minimal new file mode 100644 index 0000000..c143b79 --- /dev/null +++ b/Dockerfile.minimal @@ -0,0 +1,31 @@ +FROM golang:alpine AS build + +WORKDIR /build + +ENV GO111MODULE=on \ + CGO_ENABLED=0 \ + GOOS=linux + +COPY . . + +RUN go version + +RUN mkdir -p /usr/local/apisix-seed/logs + +RUN go build -o apisix-seed main.go + +FROM scratch + +WORKDIR /usr/local/apisix-seed + +COPY --from=build /usr/local/apisix-seed/logs /usr/local/apisix-seed/logs + +COPY --from=build /build/apisix-seed . +COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=build /build/conf/conf.yaml conf/conf.yaml + +ENV PATH=$PATH:/usr/local/apisix-seed + +ENV APISIX_SEED_WORKDIR /usr/local/apisix-seed + +CMD [ "/usr/local/apisix-seed/apisix-seed" ] diff --git a/conf/conf.yaml b/conf/conf.yaml index 8dd9ee9..25edc73 100644 --- a/conf/conf.yaml +++ b/conf/conf.yaml @@ -13,7 +13,7 @@ etcd: # the default value is true, e.g. the certificate will be verified strictly. log: level: warn - path: apisix-seed.log # path is the file to write logs to. Backup log files will be retained in the same directory + path: logs/apisix-seed.log # path is the file to write logs to. Backup log files will be retained in the same directory maxage: 168h # maxage is the maximum number of days to retain old log files based on the timestamp encoded in their filename maxsize: 104857600 # maxsize is the maximum size in megabytes of the log file before it gets rotated. It defaults to 100mb rotation_time: 1h # rotation_time is the log rotation time @@ -30,9 +30,3 @@ discovery: # service discovery center connect: 2000 # default 2000ms send: 2000 # default 2000ms read: 5000 # default 5000ms - zookeeper: - hosts: - - "127.0.0.1:2181" - prefix: /zookeeper - weight: 100 # default weight for node - timeout: 10 # default 10s diff --git a/docker-compose-build.yml b/docker-compose-build.yml new file mode 100644 index 0000000..c113590 --- /dev/null +++ b/docker-compose-build.yml @@ -0,0 +1,25 @@ +version: "3" + +services: + apisix_seed_dev: + image: hugozhu/apisix-seed + build: + context: . + dockerfile: Dockerfile.minimal + restart: always + volumes: + - ./conf/conf.yaml:/usr/local/apisix-seed/conf/conf.yaml:ro + - ./logs:/usr/local/apisix-seed/logs + networks: + apisix-seed: + ipv4_address: 172.50.238.50 + +networks: + apisix-seed: + driver: bridge + ipam: + driver: default + config: + - subnet: 172.50.238.0/24 + gateway: 172.50.238.1 + diff --git a/internal/core/message/a6conf.go b/internal/core/message/a6conf.go index 127eb6a..f7ac4ff 100644 --- a/internal/core/message/a6conf.go +++ b/internal/core/message/a6conf.go @@ -2,10 +2,32 @@ package message import ( "encoding/json" + "fmt" "reflect" + "strconv" "strings" + + "github.com/api7/gopkg/pkg/log" ) +// Sample route config +// "labels": +// { +// "discovery_args.group_name": "group_name", +// "discovery_args.namespace_id": "test_name", +// "discovery_type": "nacos", +// "service_name": "test-service", +// "service_grpc_port": "10001" +// }, + +type Labels struct { + DiscoveryType string `json:"discovery_type,omitempty"` + ServiceName string `json:"service_name,omitempty"` + DiscoveryArgsNamespaceID string `json:"discovery_args.namespace_id,omitempty"` + DiscoveryArgsGroupName string `json:"discovery_args.group_name,omitempty"` + ServiceGrpcPort string `json:"service_grpc_port,omitempty"` +} + type UpstreamArg struct { NamespaceID string `json:"namespace_id,omitempty"` GroupName string `json:"group_name,omitempty"` @@ -82,6 +104,7 @@ func embedElm(v reflect.Value, all map[string]interface{}) { } typ := v.Type() + fieldNum := typ.NumField() for i := 0; i < fieldNum; i++ { field := typ.Field(i) @@ -102,10 +125,13 @@ func embedElm(v reflect.Value, all map[string]interface{}) { continue } - if fieldName == "DiscoveryType" || fieldName == "ServiceName" { - all["_"+tagName] = val.Interface() - delete(all, tagName) - continue + if fieldName == "DiscoveryType" || fieldName == "ServiceName" || fieldName == "DiscoveryArgs" { + name := typ.Name() + if name == "Upstream" { + // all["_"+tagName] = val.Interface() + delete(all, tagName) + continue + } } if val.Kind() == reflect.Ptr { @@ -174,7 +200,14 @@ func NewUpstreams(value []byte) (A6Conf, error) { return ups, nil } +// "labels": { +// "service_name":"aquaman-user", +// "discovery_type":"nacos", +// "service_grpc_port":"10001" +// }, + type Routes struct { + Labels Labels `json:"labels"` Upstream Upstream `json:"upstream"` All map[string]interface{} `json:"-"` hasNodesAttr bool @@ -185,9 +218,33 @@ func (routes *Routes) GetAll() *map[string]interface{} { } func (routes *Routes) Marshal() ([]byte, error) { + // If grpc port is configured, modify all nodes' port to grpc port + if routes.Labels.ServiceGrpcPort != "" && routes.Upstream.Nodes != nil { + grpcPort, err := strconv.ParseInt(routes.Labels.ServiceGrpcPort, 10, 64) + if err != nil { + log.Errorf("invalid grpc port configuration: failed to parse port %s to integer for route %s: %s", routes.Labels.ServiceGrpcPort, routes.All["id"], err) + return nil, fmt.Errorf("invalid grpc port configuration: failed to parse port %s to integer for route %s: %s", routes.Labels.ServiceGrpcPort, routes.All["id"], err) + } + if nodes, ok := routes.Upstream.Nodes.([]*Node); ok { + nodesCopy := make([]*Node, len(nodes)) + for i, n := range nodes { + nodesCopy[i] = &Node{ + Host: n.Host, + Weight: n.Weight, + Port: int(grpcPort), + } + log.Infof("updated gRPC port to %d for node %s in route %s", grpcPort, n.Host, routes.All["id"]) + } + routes.Upstream.Nodes = nodesCopy + } + } + embedElm(reflect.ValueOf(routes), routes.All) - return json.Marshal(routes.All) + // routes.All["labels"] = routes.Labels + bytes, error := json.Marshal(routes.All) + // print("a6conf marshal 2=====", string(bytes)) + return bytes, error } func (routes *Routes) Inject(nodes interface{}) { @@ -206,14 +263,30 @@ func NewRoutes(value []byte) (A6Conf, error) { routes := &Routes{ All: make(map[string]interface{}), } + + // println("===", string(value)) + err := unmarshal(value, routes) if err != nil { return nil, err } + if id, ok := routes.All["id"].(string); ok { + if routes.Labels.ServiceName != "" { + println("upstream nodes in route id: ", id, " will be synced with service: ", routes.Labels.ServiceName) + routes.Upstream.ServiceName = routes.Labels.ServiceName + routes.Upstream.DiscoveryType = routes.Labels.DiscoveryType + routes.Upstream.DiscoveryArgs = &UpstreamArg{ + NamespaceID: routes.Labels.DiscoveryArgsNamespaceID, + GroupName: routes.Labels.DiscoveryArgsGroupName, + } + } + } + if routes.Upstream.Nodes != nil { routes.hasNodesAttr = true } + return routes, nil } diff --git a/internal/core/message/a6conf_test.go b/internal/core/message/a6conf_test.go index 935657b..6829c69 100644 --- a/internal/core/message/a6conf_test.go +++ b/internal/core/message/a6conf_test.go @@ -124,11 +124,6 @@ func TestMarshal_Routes(t *testing.T) { "pass_host": "pass", "type": "roundrobin", "hash_on": "vars", - "_discovery_type": "nacos", - "_service_name": "APISIX-NACOS", - "discovery_args": { - "group_name": "DEFAULT_GROUP" - }, "nodes": [ { "host": "192.168.1.1", @@ -157,6 +152,78 @@ func TestMarshal_Routes(t *testing.T) { assert.JSONEq(t, wantA6Str, string(ss)) } +func TestMarshal_Routes_With_Grpc_Port(t *testing.T) { + givenA6Str := `{ + "status": 1, + "id": "3", + "uri": "/hh", + "labels": { + "discovery_type": "nacos", + "discovery_args.group_name": "DEFAULT_GROUP", + "service_name": "test-service", + "service_grpc_port":"10001" + }, + "upstream": { + "scheme": "http", + "pass_host": "pass", + "type": "roundrobin", + "hash_on": "vars", + "nodes": { + "192.168.1.1:10001": 1 + } + }, + "create_time": 1648871506, + "priority": 0, + "update_time": 1648871506 +}` + nodes := []*Node{ + {Host: "192.168.1.1", Port: 80, Weight: 1}, + {Host: "192.168.1.2", Port: 80, Weight: 1}, + } + + wantA6Str := `{ + "status": 1, + "id": "3", + "uri": "/hh", + "labels": { + "discovery_type": "nacos", + "discovery_args.group_name": "DEFAULT_GROUP", + "service_name": "test-service", + "service_grpc_port":"10001" + }, + "upstream": { + "scheme": "http", + "pass_host": "pass", + "type": "roundrobin", + "hash_on": "vars", + "nodes": [ + { + "host": "192.168.1.1", + "port": 10001, + "weight": 1 + }, + { + "host": "192.168.1.2", + "port": 10001, + "weight": 1 + } + ] + }, + "create_time": 1648871506, + "priority": 0, + "update_time": 1648871506 +}` + caseDesc := "sanity" + a6, err := NewA6Conf([]byte(givenA6Str), A6RoutesConf) + assert.Nil(t, err, caseDesc) + + a6.Inject(nodes) + ss, err := a6.Marshal() + assert.Nil(t, err, caseDesc) + + assert.JSONEq(t, wantA6Str, string(ss)) +} + func TestHasNodesAttr_Routes(t *testing.T) { tests := []struct { name string diff --git a/internal/discoverer/nacos.go b/internal/discoverer/nacos.go index be514b9..495a594 100644 --- a/internal/discoverer/nacos.go +++ b/internal/discoverer/nacos.go @@ -124,7 +124,7 @@ func (d *NacosDiscoverer) Stop() { func (d *NacosDiscoverer) Query(msg *message.Message) error { serviceId := serviceID(msg.ServiceName(), msg.DiscoveryArgs()) - + println("Nacos: ", msg.ServiceName(), msg.DiscoveryArgs()) d.cacheMutex.Lock() defer d.cacheMutex.Unlock() @@ -143,6 +143,10 @@ func (d *NacosDiscoverer) Query(msg *message.Message) error { if err != nil { return err } + if len(nodes) == 0 { + log.Warnf("No nodes found for service[%s]", serviceId) + return fmt.Errorf("no nodes found for service[%s]", serviceId) + } msg.InjectNodes(nodes) @@ -201,7 +205,10 @@ func (d *NacosDiscoverer) Update(oldMsg, msg *message.Message) error { if err != nil { return err } - + if len(nodes) == 0 { + log.Warnf("No nodes found for service[%s]", serviceId) + return fmt.Errorf("no nodes found for service[%s]", serviceId) + } msg.InjectNodes(nodes) newDiscover.nodes = nodes newDiscover.a6Conf = map[string]*message.Message{ @@ -310,6 +317,11 @@ func (d *NacosDiscoverer) newSubscribeCallback(serviceId string, metadata interf }) } + if len(nodes) == 0 { + log.Warnf("No valid nodes found for service[%s]", serviceId) + return + } + d.cacheMutex.Lock() defer d.cacheMutex.Unlock() diff --git a/main.go b/main.go index 83d37a9..4d3bebc 100644 --- a/main.go +++ b/main.go @@ -46,7 +46,11 @@ func initLogger(logConf *conf.Log) error { return nil } +var VERSION = 1.1 + func main() { + println("=========== version ", VERSION, " ============") + conf.InitConf() if err := initLogger(conf.LogConfig); err != nil {