Skip to content

Commit ebbaf26

Browse files
logs: re-introduce rules filtering for Prometheus API as this is still not supported by Loki (#805)
1 parent 272ca2f commit ebbaf26

File tree

5 files changed

+715
-5
lines changed

5 files changed

+715
-5
lines changed

api/logs/v1/http.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type handlerConfiguration struct {
5858
registry *prometheus.Registry
5959
instrument handlerInstrumenter
6060
spanRoutePrefix string
61+
rulesLabelFilters map[string][]string
6162
readMiddlewares []func(http.Handler) http.Handler
6263
writeMiddlewares []func(http.Handler) http.Handler
6364
rulesReadMiddlewares []func(http.Handler) http.Handler
@@ -131,6 +132,13 @@ func WithGlobalMiddleware(m ...func(http.Handler) http.Handler) HandlerOption {
131132
}
132133
}
133134

135+
// WithRulesLabelFilters adds the slice of rule labels filters to the handler configuration.
136+
func WithRulesLabelFilters(f map[string][]string) HandlerOption {
137+
return func(h *handlerConfiguration) {
138+
h.rulesLabelFilters = f
139+
}
140+
}
141+
134142
type handlerInstrumenter interface {
135143
NewHandler(labels prometheus.Labels, handler http.Handler) http.HandlerFunc
136144
}
@@ -231,7 +239,7 @@ func NewHandler(read, tail, write, rules *url.URL, rulesReadOnly bool, tlsOption
231239
}
232240

233241
if rules != nil {
234-
var proxyRules http.Handler
242+
var proxyRules, proxyPrometheusReadRules http.Handler
235243
{
236244
middlewares := proxy.Middlewares(
237245
proxy.MiddlewareSetUpstream(rules),
@@ -240,18 +248,27 @@ func NewHandler(read, tail, write, rules *url.URL, rulesReadOnly bool, tlsOption
240248
proxy.MiddlewareMetrics(c.registry, prometheus.Labels{"proxy": "logsv1-rules"}),
241249
)
242250

251+
logger := proxy.Logger(c.logger)
243252
t := &http.Transport{
244253
DialContext: (&net.Dialer{
245254
Timeout: dialTimeout,
246255
}).DialContext,
247256
TLSClientConfig: tlsOptions.NewClientConfig(),
248257
}
258+
transport := otelhttp.NewTransport(t)
249259

260+
proxyPrometheusReadRules = &httputil.ReverseProxy{
261+
Director: middlewares,
262+
ErrorLog: logger,
263+
Transport: transport,
264+
ModifyResponse: newModifyResponseProm(c.logger, c.rulesLabelFilters),
265+
}
250266
proxyRules = &httputil.ReverseProxy{
251267
Director: middlewares,
252-
ErrorLog: proxy.Logger(c.logger),
253-
Transport: otelhttp.NewTransport(t),
268+
ErrorLog: logger,
269+
Transport: transport,
254270
}
271+
255272
}
256273
r.Group(func(r chi.Router) {
257274
r.Use(c.readMiddlewares...)
@@ -270,11 +287,11 @@ func NewHandler(read, tail, write, rules *url.URL, rulesReadOnly bool, tlsOption
270287
))
271288
r.Get(prometheusRulesRoute, c.instrument.NewHandler(
272289
prometheus.Labels{"group": "logsv1", "handler": "rules"},
273-
otelhttp.WithRouteTag(c.spanRoutePrefix+prometheusRulesRoute, proxyRules),
290+
otelhttp.WithRouteTag(c.spanRoutePrefix+prometheusRulesRoute, proxyPrometheusReadRules),
274291
))
275292
r.Get(prometheusAlertsRoute, c.instrument.NewHandler(
276293
prometheus.Labels{"group": "logsv1", "handler": "alerts"},
277-
otelhttp.WithRouteTag(c.spanRoutePrefix+prometheusAlertsRoute, proxyRules),
294+
otelhttp.WithRouteTag(c.spanRoutePrefix+prometheusAlertsRoute, proxyPrometheusReadRules),
278295
))
279296
r.Get(promRulesRoute, c.instrument.NewHandler(
280297
prometheus.Labels{"group": "logsv1", "handler": "rules"},

api/logs/v1/rules_labels_enforcer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ func WithEnforceRulesLabelFilters(labelKeys map[string][]string) func(http.Handl
9999
func WithParametersAsLabelsFilterRules(labelKeys map[string][]string) func(http.Handler) http.Handler {
100100
return func(next http.Handler) http.Handler {
101101
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
102+
// Prometheus rules & alert endpoints do not support filtering using the `labels` query parameter.
103+
if strings.Contains(r.URL.Path, prometheusRulesRoute) || strings.Contains(r.URL.Path, prometheusAlertsRoute) {
104+
next.ServeHTTP(w, r)
105+
return
106+
}
107+
102108
tenant, ok := authentication.GetTenant(r.Context())
103109
if !ok {
104110
httperr.PrometheusAPIError(w, "missing tenant id", http.StatusBadRequest)
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
package http
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"strconv"
11+
"time"
12+
13+
"github.com/go-kit/log"
14+
"github.com/go-kit/log/level"
15+
"github.com/observatorium/api/authentication"
16+
"github.com/observatorium/api/authorization"
17+
"github.com/prometheus/prometheus/model/labels"
18+
)
19+
20+
const contentTypeApplicationJSON = "application/json"
21+
22+
var (
23+
errUnknownTenantKey = errors.New("unknown tenant key")
24+
errUnknownRulesContentType = errors.New("unknown rules response content type")
25+
)
26+
27+
type alert struct {
28+
Labels labels.Labels `json:"labels"`
29+
Annotations labels.Labels `json:"annotations"`
30+
State string `json:"state"`
31+
ActiveAt *time.Time `json:"activeAt,omitempty"`
32+
Value string `json:"value"`
33+
}
34+
35+
func (a *alert) GetLabels() labels.Labels { return a.Labels }
36+
37+
type alertingRule struct {
38+
State string `json:"state"`
39+
Name string `json:"name"`
40+
Query string `json:"query"`
41+
Duration float64 `json:"duration"`
42+
Labels labels.Labels `json:"labels"`
43+
Annotations labels.Labels `json:"annotations"`
44+
Alerts []*alert `json:"alerts"`
45+
Health string `json:"health"`
46+
LastError string `json:"lastError"`
47+
LastEvaluation string `json:"lastEvaluation"`
48+
EvaluationTime float64 `json:"evaluationTime"`
49+
// Type of an alertingRule is always "alerting".
50+
Type string `json:"type"`
51+
}
52+
53+
type recordingRule struct {
54+
Name string `json:"name"`
55+
Query string `json:"query"`
56+
Labels labels.Labels `json:"labels"`
57+
Health string `json:"health"`
58+
LastError string `json:"lastError"`
59+
LastEvaluation string `json:"lastEvaluation"`
60+
EvaluationTime float64 `json:"evaluationTime"`
61+
// Type of a recordingRule is always "recording".
62+
Type string `json:"type"`
63+
}
64+
65+
type ruleGroup struct {
66+
Name string `json:"name"`
67+
File string `json:"file"`
68+
Rules []rule `json:"rules"`
69+
Interval float64 `json:"interval"`
70+
LastEvaluation string `json:"lastEvaluation"`
71+
EvaluationTime float64 `json:"evaluationTime"`
72+
}
73+
74+
type rule struct {
75+
*alertingRule
76+
*recordingRule
77+
}
78+
79+
func (r *rule) GetLabels() labels.Labels {
80+
if r.alertingRule != nil {
81+
return r.alertingRule.Labels
82+
}
83+
return r.recordingRule.Labels
84+
}
85+
86+
// MarshalJSON implements the json.Marshaler interface for rule.
87+
func (r *rule) MarshalJSON() ([]byte, error) {
88+
if r.alertingRule != nil {
89+
return json.Marshal(r.alertingRule)
90+
}
91+
return json.Marshal(r.recordingRule)
92+
}
93+
94+
// UnmarshalJSON implements the json.Unmarshaler interface for rule.
95+
func (r *rule) UnmarshalJSON(b []byte) error {
96+
var ruleType struct {
97+
Type string `json:"type"`
98+
}
99+
if err := json.Unmarshal(b, &ruleType); err != nil {
100+
return err
101+
}
102+
switch ruleType.Type {
103+
case "alerting":
104+
var alertingr alertingRule
105+
if err := json.Unmarshal(b, &alertingr); err != nil {
106+
return err
107+
}
108+
r.alertingRule = &alertingr
109+
case "recording":
110+
var recordingr recordingRule
111+
if err := json.Unmarshal(b, &recordingr); err != nil {
112+
return err
113+
}
114+
r.recordingRule = &recordingr
115+
default:
116+
return fmt.Errorf("failed to unmarshal rule: unknown type %q", ruleType.Type)
117+
}
118+
119+
return nil
120+
}
121+
122+
type rulesData struct {
123+
RuleGroups []*ruleGroup `json:"groups,omitempty"`
124+
Alerts []*alert `json:"alerts,omitempty"`
125+
}
126+
127+
type prometheusRulesResponse struct {
128+
Status string `json:"status"`
129+
Data rulesData `json:"data"`
130+
Error string `json:"error"`
131+
ErrorType string `json:"errorType"`
132+
}
133+
134+
func newModifyResponseProm(logger log.Logger, labelKeys map[string][]string) func(*http.Response) error {
135+
return func(res *http.Response) error {
136+
tenant, ok := authentication.GetTenant(res.Request.Context())
137+
if !ok {
138+
return errUnknownTenantKey
139+
}
140+
141+
keys, ok := labelKeys[tenant]
142+
if !ok {
143+
level.Debug(logger).Log("msg", "Skip applying rule label filters", "tenant", tenant)
144+
return nil
145+
}
146+
147+
var (
148+
matchers = extractMatchers(res.Request, keys)
149+
contentType = res.Header.Get("Content-Type")
150+
)
151+
152+
data, ok := authorization.GetData(res.Request.Context())
153+
154+
var matchersInfo AuthzResponseData
155+
if ok && data != "" {
156+
if err := json.Unmarshal([]byte(data), &matchersInfo); err != nil {
157+
return nil
158+
}
159+
}
160+
161+
strictMode := len(matchersInfo.Matchers) != 0
162+
163+
matcherStr := fmt.Sprintf("%s", matchers)
164+
level.Debug(logger).Log("msg", "filtering using matchers", "tenant", tenant, "matchers", matcherStr)
165+
166+
body, err := io.ReadAll(res.Body)
167+
if err != nil {
168+
level.Error(logger).Log("msg", err)
169+
return err
170+
}
171+
res.Body.Close()
172+
173+
b, err := filterRules(body, contentType, matchers, strictMode)
174+
if err != nil {
175+
level.Error(logger).Log("msg", err)
176+
return err
177+
}
178+
179+
res.Body = io.NopCloser(bytes.NewReader(b))
180+
res.ContentLength = int64(len(b))
181+
res.Header.Set("Content-Length", strconv.FormatInt(res.ContentLength, 10))
182+
183+
return nil
184+
}
185+
}
186+
187+
func extractMatchers(r *http.Request, l []string) map[string]string {
188+
queryParams := r.URL.Query()
189+
matchers := map[string]string{}
190+
for _, name := range l {
191+
value := queryParams.Get(name)
192+
if value != "" {
193+
matchers[name] = value
194+
}
195+
}
196+
197+
return matchers
198+
}
199+
200+
func filterRules(body []byte, contentType string, matchers map[string]string, strictMode bool) ([]byte, error) {
201+
switch contentType {
202+
case contentTypeApplicationJSON:
203+
var res prometheusRulesResponse
204+
err := json.Unmarshal(body, &res)
205+
if err != nil {
206+
return nil, err
207+
}
208+
return json.Marshal(filterPrometheusResponse(res, matchers, strictMode))
209+
default:
210+
return nil, errUnknownRulesContentType
211+
}
212+
}
213+
214+
func filterPrometheusResponse(res prometheusRulesResponse, matchers map[string]string, strictEnforce bool) prometheusRulesResponse {
215+
if len(matchers) == 0 {
216+
if strictEnforce {
217+
res.Data = rulesData{}
218+
}
219+
220+
return res
221+
}
222+
223+
if len(res.Data.RuleGroups) > 0 {
224+
filtered := filterPrometheusRuleGroups(res.Data.RuleGroups, matchers)
225+
res.Data = rulesData{RuleGroups: filtered}
226+
}
227+
228+
if len(res.Data.Alerts) > 0 {
229+
filtered := filterPrometheusAlerts(res.Data.Alerts, matchers)
230+
res.Data = rulesData{Alerts: filtered}
231+
}
232+
233+
return res
234+
}
235+
236+
type labeledRule interface {
237+
GetLabels() labels.Labels
238+
}
239+
240+
func hasMatchingLabels(rule labeledRule, matchers map[string]string) bool {
241+
for key, value := range matchers {
242+
labels := rule.GetLabels().Map()
243+
val, ok := labels[key]
244+
if !ok || val != value {
245+
return false
246+
}
247+
}
248+
return true
249+
}
250+
251+
func filterPrometheusRuleGroups(groups []*ruleGroup, matchers map[string]string) []*ruleGroup {
252+
var filtered []*ruleGroup
253+
254+
for _, group := range groups {
255+
var filteredRules []rule
256+
for _, r := range group.Rules {
257+
if hasMatchingLabels(&r, matchers) {
258+
filteredRules = append(filteredRules, r)
259+
}
260+
}
261+
262+
if len(filteredRules) > 0 {
263+
group.Rules = filteredRules
264+
filtered = append(filtered, group)
265+
}
266+
}
267+
268+
return filtered
269+
}
270+
271+
func filterPrometheusAlerts(alerts []*alert, matchers map[string]string) []*alert {
272+
var filtered []*alert
273+
for _, a := range alerts {
274+
if hasMatchingLabels(a, matchers) {
275+
filtered = append(filtered, a)
276+
}
277+
}
278+
279+
return filtered
280+
}

0 commit comments

Comments
 (0)