Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package api

import (
cmodel "github.com/open-falcon/common/model"

"github.com/open-falcon/graph/index"
)

// 删除一条索引,及其对应的graph索引缓存
func (this *Graph) DeleteIndex(counter *cmodel.GraphCounter, resp *cmodel.SimpleRpcResponse) error {
err := index.DeleteCounter(counter.Endpoint, counter.Counter)
if err != nil {
resp.Code = 1
resp.Msg = err.Error()
}

return nil
}
4 changes: 3 additions & 1 deletion g/g.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
// 0.5.3 fix bug of last&last_raw
// 0.5.4 fix bug of Query.merge
// 0.5.5 use commom(rm model), fix sync disk
// 0.5.6 add auto expansion
// 0.5.7 add api of deleting one index

const (
VERSION = "0.5.6"
VERSION = "0.5.7"
GAUGE = "GAUGE"
DERIVE = "DERIVE"
COUNTER = "COUNTER"
Expand Down
3 changes: 1 addition & 2 deletions g/git.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
package g

const (
COMMIT = "gitversion"
COMMIT = "42efe2d"
)
91 changes: 91 additions & 0 deletions http/index_http.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package http

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"

cmodel "github.com/open-falcon/common/model"

"github.com/open-falcon/graph/index"
)

Expand Down Expand Up @@ -121,4 +124,92 @@ func configIndexRoutes() {
RenderDataJson(w, item)
})

http.HandleFunc("/v2/index/delete", httpHandler_deleteIndex)
http.HandleFunc("/v2/index/update", httpHandler_updateIndex)
http.HandleFunc("/v2/index/cached", httpHandler_cachedIndex)
}

// 删除 指定counter对应的mysql索引 及其 本地缓存
func httpHandler_deleteIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
RenderMsgJson(w, "bad http method, use post")
return
}

var body []*cmodel.GraphCounter
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&body)
if err != nil {
RenderMsgJson(w, err.Error())
return
}
if len(body) < 1 {
RenderMsgJson(w, "empty")
return
}

counter := body[0]
err = index.DeleteCounter(counter.Endpoint, counter.Counter)
if err != nil {
RenderMsgJson(w, err.Error())
return
}

RenderDataJson(w, "")
}

func httpHandler_updateIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
RenderMsgJson(w, "bad http method, use post")
return
}

var body []*cmodel.GraphCounter
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&body)
if err != nil {
RenderMsgJson(w, err.Error())
return
}
if len(body) < 1 {
RenderMsgJson(w, "empty")
return
}

counter := body[0]
err = index.UpdateIndexOneV2(counter.Endpoint, counter.Counter)
if err != nil {
RenderMsgJson(w, err.Error())
return
}

RenderDataJson(w, "")
}

func httpHandler_cachedIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
RenderMsgJson(w, "bad http method, use get")
return
}

var body []*cmodel.GraphCounter
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&body)
if err != nil {
RenderMsgJson(w, err.Error())
return
}
if len(body) < 1 {
RenderMsgJson(w, "empty")
return
}

counter := body[0]
item, err := index.GetIndexedItemCacheV2(counter.Endpoint, counter.Counter)
if err != nil {
RenderDataJson(w, fmt.Sprintf("%v", err))
return
}

RenderDataJson(w, item)
}
15 changes: 15 additions & 0 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"log"

cmodel "github.com/open-falcon/common/model"
cutils "github.com/open-falcon/common/utils"

"github.com/open-falcon/graph/g"
)

Expand Down Expand Up @@ -74,3 +76,16 @@ func GetIndexedItemCache(endpoint string, metric string, tags map[string]string,
r = icitem.Item
return
}

func GetIndexedItemCacheV2(endpoint, counter string) (r *cmodel.GraphItem, rerr error) {
md5 := cutils.ChecksumOfPK2(endpoint, counter)
cached := indexedItemCache.Get(md5)
if cached == nil {
rerr = fmt.Errorf("not found")
return
}

icitem := cached.(*IndexCacheItem)
r = icitem.Item
return
}
71 changes: 71 additions & 0 deletions index/index_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package index

import (
"fmt"
"log"

"github.com/open-falcon/common/utils"

"github.com/open-falcon/graph/g"
)

func DeleteCounter(endpoint, counter string) error {
if len(endpoint) < 1 || len(counter) < 1 {
return fmt.Errorf("bad args, endpoint counter: %s, %s", endpoint, counter)
}

config := g.Config()
// get mysqlConn
conn, err := g.GetDbConn("DeleteIndex")
if err != nil {
if config.Debug {
log.Println("[ERROR] get mysql conn fail", err)
}
return err
}

// get endpoint_id
var endpointId int64 = -1
var targetEndpoint string = ""
err = conn.QueryRow("SELECT id,endpoint FROM endpoint WHERE endpoint = ?", endpoint).Scan(&endpointId, &targetEndpoint)
if err != nil {
if config.Debug {
log.Println("[ERROR] query endpoint fail", err)
}
return err
}
if targetEndpoint != endpoint { // 防止注入
return fmt.Errorf("bad arg: endpoint %s", endpoint)
}

// delete counter
var endpointCounterId int64 = -1
var targetCounter string = ""
err = conn.QueryRow("SELECT id,counter FROM endpoint_counter WHERE endpoint_id = ? and counter = ?",
endpointId, counter).Scan(&endpointCounterId, &targetCounter)
if err != nil {
if config.Debug {
log.Println("[ERROR] query counter fail", err)
}
return err
}
if targetCounter != counter { // 防止注入
return fmt.Errorf("bad arg: counter %s", counter)
}

_, err = conn.Exec("DELETE FROM endpoint_counter WHERE id = ?", endpointCounterId)
if err != nil {
if config.Debug {
log.Println("[ERROR] delete counter fail", err)
}
return err
}
log.Printf("delete counter: %s/%s", endpoint, counter)

// delete indexed cache
key := utils.ChecksumOfPK2(endpoint, counter)
indexedItemCache.Remove(key)

// return
return nil
}
20 changes: 20 additions & 0 deletions index/index_update_all_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ func UpdateIndexOne(endpoint string, metric string, tags map[string]string, dsty
return updateIndexFromOneItem(gitem, dbConn)
}

// 使用本地缓存,更新mysql索引; 只要提供endpoint+counter即可
func UpdateIndexOneV2(endpoint, counter string) error {
md5 := cutils.ChecksumOfPK2(endpoint, counter)
cached := indexedItemCache.Get(md5)
if cached == nil {
return fmt.Errorf("not found")
}

icitem := cached.(*IndexCacheItem)
gitem := icitem.Item

dbConn, err := g.GetDbConn("UpdateIndexIncrTask")
if err != nil {
log.Println("[ERROR] make dbConn fail", err)
return err
}

return updateIndexFromOneItem(gitem, dbConn)
}

// 索引全量更新的当前并行数
func GetConcurrentOfUpdateIndexAll() int {
return ConcurrentOfUpdateIndexAll - semaIndexUpdateAllTask.AvailablePermits()
Expand Down
4 changes: 1 addition & 3 deletions test/graph.list
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
test.hostname01:6071
test.hostname02:6071
test.hostname03:6071
11.11.0.21:6071
7 changes: 2 additions & 5 deletions test/http.recv.history
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
#!/bin/bash
e=$1
m=$2
tags=$3

host_file=./graph.list
counter=$1
for i in `cat $host_file`;
do
printf "%s\n" $i
curl -s "$i/history/$e/$m/$tags"
curl -s "http://$i/history/$counter"
printf "\n"
sleep 0.1
done
30 changes: 30 additions & 0 deletions test/index.cached
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
## test home
testdir=$(cd $(dirname $0)/; pwd)
## word home
workdir=$(dirname $testdir)
cd $workdir

cfg=./cfg.json
httpport=80
if [ -f $cfg ]; then
httpport=`cat $cfg | grep -A3 "\"http\":" | grep "\"listen\"" | cut -d\" -f4 | cut -d: -f2`
fi
httpprex="127.0.0.1:$httpport"

if [ $# != 2 ];then
printf "format:./query \"endpoint\" \"counter\"\n"
exit 1
fi

# args
endpoint=$1
counter=$2

# form request body
req="[{\"endpoint\":\"$endpoint\", \"counter\":\"$counter\"}]"

# request
url="$httpprex/v2/index/cached"

curl -s -X GET -d "$req" "$url" | python -m json.tool
30 changes: 30 additions & 0 deletions test/index.delete
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
## test home
testdir=$(cd $(dirname $0)/; pwd)
## word home
workdir=$(dirname $testdir)
cd $workdir

cfg=./cfg.json
httpport=80
if [ -f $cfg ]; then
httpport=`cat $cfg | grep -A3 "\"http\":" | grep "\"listen\"" | cut -d\" -f4 | cut -d: -f2`
fi
httpprex="127.0.0.1:$httpport"

if [ $# != 2 ];then
printf "format:./query \"endpoint\" \"counter\"\n"
exit 1
fi

# args
endpoint=$1
counter=$2

# form request body
req="[{\"endpoint\":\"$endpoint\", \"counter\":\"$counter\"}]"

# request
url="$httpprex/v2/index/delete"

curl -s -X POST -d "$req" "$url" | python -m json.tool
30 changes: 30 additions & 0 deletions test/index.update
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
## test home
testdir=$(cd $(dirname $0)/; pwd)
## word home
workdir=$(dirname $testdir)
cd $workdir

cfg=./cfg.json
httpport=80
if [ -f $cfg ]; then
httpport=`cat $cfg | grep -A3 "\"http\":" | grep "\"listen\"" | cut -d\" -f4 | cut -d: -f2`
fi
httpprex="127.0.0.1:$httpport"

if [ $# != 2 ];then
printf "format:./query \"endpoint\" \"counter\"\n"
exit 1
fi

# args
endpoint=$1
counter=$2

# form request body
req="[{\"endpoint\":\"$endpoint\", \"counter\":\"$counter\"}]"

# request
url="$httpprex/v2/index/update"

curl -s -X POST -d "$req" "$url" | python -m json.tool