Skip to content

Commit 22113b6

Browse files
authored
Merge pull request #95 from intergral/deepql
fix(routtrip): fix round tripper not find correct path
2 parents bfa2730 + 884bc76 commit 22113b6

9 files changed

Lines changed: 102 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1-
<!-- main START -->
1+
<!-- 1.0.8 START -->
2+
# 1.0.8 (23/04/2024)
3+
- **[BUGFIX]**: fix deepql running in distributed mode [#95](https://github.com/intergral/deep/pull/95) [@Umaaz](https://github.com/Umaaz)
4+
<!-- 1.0.8 START -->
5+
6+
<!-- 1.0.7 START -->
27
# 1.0.7 (18/04/2024)
38
- **[FEATURE]**: add support for deepql [#93](https://github.com/intergral/deep/pull/93) [@Umaaz](https://github.com/Umaaz)
4-
<!-- main START -->
9+
<!-- 1.0.7 START -->
510

611
<!-- 1.0.6 START -->
712
# 1.0.6 (14/03/2024)

cmd/deep/app/modules.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,14 +367,14 @@ func (t *App) initQuerier() (services.Service, error) {
367367
// tracepointAPI handles requests to change the config for tracepoints
368368
func (t *App) initQueryFrontend() (services.Service, error) {
369369
// we create to 2 bridges (roundTrippers) one for each backend
370-
roundTripper, tpTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
370+
roundTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
371371
if err != nil {
372372
return nil, err
373373
}
374374
t.frontend = v1
375375

376376
// create query frontend
377-
queryFrontend, err := frontend.New(t.cfg.Frontend, roundTripper, tpTripper, t.overrides, t.store, log.Logger, prometheus.DefaultRegisterer)
377+
queryFrontend, err := frontend.New(t.cfg.Frontend, roundTripper, t.overrides, t.store, log.Logger, prometheus.DefaultRegisterer)
378378
if err != nil {
379379
return nil, err
380380
}

examples/docker-compose/debug/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ services:
3434
- GF_AUTH_ANONYMOUS_ENABLED=true
3535
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
3636
- GF_AUTH_DISABLE_LOGIN_FORM=true
37-
- GF_INSTALL_PLUGINS=https://github.com/intergral/grafana-deep-tracepoint-panel/releases/download/v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/grafana-deep-panel/releases/download/v1.0.1/intergral-deep-panel-1.0.1.zip;intergral-deep-panel,https://github.com/intergral/grafana-deep-datasource/releases/download/v1.0.2/intergral-deep-datasource-1.0.2.zip;intergral-deep-datasource
37+
- GF_INSTALL_PLUGINS=https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-tracepoint-panel.v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-panel.v1.0.2/intergral-deep-panel-1.0.2.zip;intergral-deep-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-datasource.v1.0.3/intergral-deep-datasource-1.0.3.zip;intergral-deep-datasource
3838
ports:
3939
- "3000:3000"
4040

examples/docker-compose/distributed/docker-compose.yaml

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,27 @@ services:
147147
grafana:
148148
image: grafana/grafana-oss
149149
volumes:
150-
- ../shared/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
150+
- ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
151151
- ../shared/dashboards:/etc/grafana/provisioning/dashboards/
152152
environment:
153153
- GF_AUTH_ANONYMOUS_ENABLED=true
154154
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
155155
- GF_AUTH_DISABLE_LOGIN_FORM=true
156-
- GF_INSTALL_PLUGINS=https://github.com/intergral/grafana-deep-panel/releases/download/v0.0.3/intergral-deep-panel-0.0.3.zip;intergral-deep-panel,https://github.com/intergral/grafana-deep-datasource/releases/download/v0.0.7/intergral-deep-datasource-0.0.7.zip;intergral-deep-datasource
156+
- GF_INSTALL_PLUGINS=https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-tracepoint-panel.v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-panel.v1.0.2/intergral-deep-panel-1.0.2.zip;intergral-deep-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-datasource.v1.0.3/intergral-deep-datasource-1.0.3.zip;intergral-deep-datasource
157157
ports:
158158
- "3000:3000"
159+
160+
test_load:
161+
image: intergral/deep-cli
162+
depends_on:
163+
- distributor
164+
command:
165+
- generate
166+
- snapshot
167+
- --endpoint=distributor:43315
168+
- --count=3
169+
- --sleep=5
170+
- --iterations=-1
171+
- --random-string
172+
- --random-duration
173+
- --duration-nanos=1000000000

examples/docker-compose/distributed/grafana-datasources.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ datasources:
2525
apiVersion: 1
2626
uid: deep
2727
jsonData:
28-
httpMethod: GET
28+
experimental:
29+
deepql: true

modules/frontend/config.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"net/http"
2323
"time"
2424

25+
"github.com/intergral/deep/pkg/api"
26+
2527
"github.com/go-kit/log"
2628
"github.com/prometheus/client_golang/prometheus"
2729

@@ -103,12 +105,28 @@ func (CortexNoQuerierLimits) MaxQueriersPerUser(string) int { return 0 }
103105
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
104106
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
105107
// (if there are no errors), and it uses the returned frontend (if any).
106-
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, http.RoundTripper, *v1.Frontend, error) {
108+
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
107109
statVersion.Set("v1")
108110
// No scheduler = use original frontend.
109111
fr, err := v1.New(cfg, limits, log, reg)
110112
if err != nil {
111-
return nil, nil, nil, err
113+
return nil, nil, err
112114
}
113-
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.QuerierRoundTrip), transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.TracepointRoundTrip), fr, nil
115+
// create both round trippers
116+
searchRt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.QuerierRoundTrip)
117+
tracepointRt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.TracepointRoundTrip)
118+
// filter the round trippers based on the paths
119+
tripper := transport.NewSplittingRoundTripper(&transport.SplitRule{
120+
Rules: []string{
121+
api.PathPrefixTracepoints,
122+
},
123+
RoundTripper: tracepointRt,
124+
}, &transport.SplitRule{
125+
Rules: []string{
126+
api.PathPrefixQuerier,
127+
},
128+
RoundTripper: searchRt,
129+
})
130+
131+
return tripper, fr, nil
114132
}

modules/frontend/frontend.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type QueryFrontend struct {
6060
}
6161

6262
// New returns a new QueryFrontend
63-
func New(cfg Config, next http.RoundTripper, tpNext http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
63+
func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
6464
level.Info(logger).Log("msg", "creating middleware in query frontend")
6565

6666
if cfg.SnapshotByID.QueryShards < minQueryShards || cfg.SnapshotByID.QueryShards > maxQueryShards {
@@ -100,7 +100,7 @@ func New(cfg Config, next http.RoundTripper, tpNext http.RoundTripper, o *overri
100100
search := searchMiddleware.Wrap(next)
101101

102102
tpMiddleware := newTracepointForwardMiddleware()
103-
tpHandler := tpMiddleware.Wrap(tpNext)
103+
tpHandler := tpMiddleware.Wrap(next)
104104

105105
return &QueryFrontend{
106106
SnapshotByID: newHandler(snapshots, snapshotByIDCounter, logger),

modules/frontend/frontend_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ func (s *mockNextTripperware) RoundTrip(_ *http.Request) (*http.Response, error)
4141

4242
func TestFrontendRoundTripsSearch(t *testing.T) {
4343
next := &mockNextTripperware{}
44-
tpNext := &mockNextTripperware{}
4544
f, err := New(Config{
4645
SnapshotByID: SnapshotByIDConfig{
4746
QueryShards: minQueryShards,
@@ -54,7 +53,7 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
5453
},
5554
SLO: testSLOcfg,
5655
},
57-
}, next, tpNext, nil, nil, log.NewNopLogger(), nil)
56+
}, next, nil, nil, log.NewNopLogger(), nil)
5857
require.NoError(t, err)
5958

6059
req := httptest.NewRequest("GET", "/", nil)
@@ -77,7 +76,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
7776
},
7877
SLO: testSLOcfg,
7978
},
80-
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
79+
}, nil, nil, nil, log.NewNopLogger(), nil)
8180
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
8281
assert.Nil(t, f)
8382

@@ -93,7 +92,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
9392
},
9493
SLO: testSLOcfg,
9594
},
96-
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
95+
}, nil, nil, nil, log.NewNopLogger(), nil)
9796
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
9897
assert.Nil(t, f)
9998

@@ -109,7 +108,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
109108
},
110109
SLO: testSLOcfg,
111110
},
112-
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
111+
}, nil, nil, nil, log.NewNopLogger(), nil)
113112
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
114113
assert.Nil(t, f)
115114

@@ -125,7 +124,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
125124
},
126125
SLO: testSLOcfg,
127126
},
128-
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
127+
}, nil, nil, nil, log.NewNopLogger(), nil)
129128
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
130129
assert.Nil(t, f)
131130

@@ -143,7 +142,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
143142
},
144143
SLO: testSLOcfg,
145144
},
146-
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
145+
}, nil, nil, nil, log.NewNopLogger(), nil)
147146
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
148147
assert.Nil(t, f)
149148
}

modules/frontend/transport/roundtripper.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@ package transport
2020
import (
2121
"bytes"
2222
"context"
23+
"errors"
2324
"io"
2425
"io/ioutil"
2526
"net/http"
27+
"strings"
28+
29+
"github.com/opentracing/opentracing-go"
2630

2731
frontend_v1pb "github.com/intergral/deep/modules/frontend/v1/frontendv1pb"
2832
)
@@ -95,3 +99,43 @@ func fromHeader(hs http.Header) []*frontend_v1pb.Header {
9599
}
96100
return result
97101
}
102+
103+
// SplitRule allows controlling which requests go to this roundtripper
104+
type SplitRule struct {
105+
Rules []string
106+
RoundTripper http.RoundTripper
107+
}
108+
109+
func (r *SplitRule) matches(url string) bool {
110+
for _, rule := range r.Rules {
111+
if strings.HasPrefix(url, rule) {
112+
return true
113+
}
114+
}
115+
return false
116+
}
117+
118+
type splitRoundTripper struct {
119+
http.RoundTripper
120+
121+
rules []*SplitRule
122+
}
123+
124+
// NewSplittingRoundTripper creates a new round tripper with rules
125+
func NewSplittingRoundTripper(rules ...*SplitRule) http.RoundTripper {
126+
return &splitRoundTripper{
127+
rules: rules,
128+
}
129+
}
130+
131+
func (s *splitRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
132+
span, _ := opentracing.StartSpanFromContext(r.Context(), "splitRoundTripper.RoundTrip")
133+
defer span.Finish()
134+
135+
for _, rule := range s.rules {
136+
if rule.matches(r.RequestURI) {
137+
return rule.RoundTripper.RoundTrip(r)
138+
}
139+
}
140+
return nil, errors.New("no matching round tripper")
141+
}

0 commit comments

Comments
 (0)