Skip to content

Commit fa35703

Browse files
authored
Add optional backend for ClickHouse storage (#6194)
* feat: ClickHouse storage backend Add a new optional storage backend backed by a ClickHouse database. This should improve performance over the FrostDB backend and help users send data from more nodes and query it efficiently. This commit is the first working version of the code, improvements will follow in later commits. * chore: Small fixes and style issues in ClickHouse storage code - move Symbolizer interface to the symbolizer module - remove deduplication and ordering of results on the Go side, relying on the database instead - in QueryRange, move grouping by labels to ClickHouse - in QuerySingle and QueryMerge, match FrostDB aggregation behavior - remove dead code - formatting * Remove unnecessary pings from ClickHouse client * Refactor duplicated ParseQuery function, move to profile pkg * test: Move TestParseQuery to profile module, remove unnecessary test The function was moved to that module so it makes sense for the test to be there too. Remove test that only checked constants had specific values. * chore: Update flags, comments with linter requirements * fix: Move ClickHouse storage flags to FlagHidden Hide them by default as it's an experimental feature
1 parent e7032ad commit fa35703

File tree

13 files changed

+2455
-200
lines changed

13 files changed

+2455
-200
lines changed

go.mod

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.24.1
44

55
require (
66
cloud.google.com/go/storage v1.59.1
7+
github.com/ClickHouse/clickhouse-go/v2 v2.43.0
78
github.com/alecthomas/kong v0.9.0
89
github.com/apache/arrow-go/v18 v18.5.1
910
github.com/cenkalti/backoff/v4 v4.3.0
@@ -83,6 +84,7 @@ require (
8384
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4 v4.3.0 // indirect
8485
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0 // indirect
8586
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect
87+
github.com/ClickHouse/ch-go v0.71.0 // indirect
8688
github.com/Code-Hex/go-generics-cache v1.5.1 // indirect
8789
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect
8890
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0 // indirect
@@ -124,9 +126,9 @@ require (
124126
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
125127
github.com/dgryski/go-metro v0.0.0-20250106013310-edb8663e5e33 // indirect
126128
github.com/digitalocean/godo v1.152.0 // indirect
127-
github.com/distribution/reference v0.5.0 // indirect
128-
github.com/docker/docker v28.3.3+incompatible // indirect
129-
github.com/docker/go-connections v0.4.0 // indirect
129+
github.com/distribution/reference v0.6.0 // indirect
130+
github.com/docker/docker v28.5.2+incompatible // indirect
131+
github.com/docker/go-connections v0.6.0 // indirect
130132
github.com/docker/go-units v0.5.0 // indirect
131133
github.com/dustin/go-humanize v1.0.1 // indirect
132134
github.com/efficientgo/core v1.0.0-rc.2 // indirect
@@ -137,6 +139,8 @@ require (
137139
github.com/felixge/httpsnoop v1.0.4 // indirect
138140
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
139141
github.com/gin-gonic/gin v1.9.1 // indirect
142+
github.com/go-faster/city v1.0.1 // indirect
143+
github.com/go-faster/errors v0.7.1 // indirect
140144
github.com/go-jose/go-jose/v4 v4.1.3 // indirect
141145
github.com/go-logfmt/logfmt v0.6.0 // indirect
142146
github.com/go-logr/logr v1.4.3 // indirect
@@ -201,8 +205,6 @@ require (
201205
github.com/mitchellh/go-homedir v1.1.0 // indirect
202206
github.com/mitchellh/mapstructure v1.5.0 // indirect
203207
github.com/moby/docker-image-spec v1.3.1 // indirect
204-
github.com/moby/sys/sequential v0.6.0 // indirect
205-
github.com/moby/term v0.5.0 // indirect
206208
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
207209
github.com/modern-go/reflect2 v1.0.2 // indirect
208210
github.com/mozillazg/go-httpheader v0.2.1 // indirect
@@ -213,10 +215,11 @@ require (
213215
github.com/oklog/ulid v1.3.1 // indirect
214216
github.com/oklog/ulid/v2 v2.1.1 // indirect
215217
github.com/opencontainers/go-digest v1.0.0 // indirect
216-
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect
218+
github.com/opencontainers/image-spec v1.1.1 // indirect
217219
github.com/oracle/oci-go-sdk/v65 v65.41.1 // indirect
218220
github.com/ovh/go-ovh v1.8.0 // indirect
219-
github.com/pierrec/lz4/v4 v4.1.23 // indirect
221+
github.com/paulmach/orb v0.12.0 // indirect
222+
github.com/pierrec/lz4/v4 v4.1.25 // indirect
220223
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
221224
github.com/pkg/errors v0.9.1 // indirect
222225
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@@ -227,6 +230,8 @@ require (
227230
github.com/rs/cors v1.8.0 // indirect
228231
github.com/rs/xid v1.5.0 // indirect
229232
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.33 // indirect
233+
github.com/segmentio/asm v1.2.1 // indirect
234+
github.com/shopspring/decimal v1.4.0 // indirect
230235
github.com/sony/gobreaker v0.5.0 // indirect
231236
github.com/spf13/pflag v1.0.6 // indirect
232237
github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect
@@ -242,6 +247,7 @@ require (
242247
go.opentelemetry.io/otel/metric v1.39.0 // indirect
243248
go.opentelemetry.io/otel/sdk/metric v1.39.0 // indirect
244249
go.yaml.in/yaml/v2 v2.4.3 // indirect
250+
go.yaml.in/yaml/v3 v3.0.4 // indirect
245251
golang.org/x/crypto v0.47.0 // indirect
246252
golang.org/x/mod v0.32.0 // indirect
247253
golang.org/x/sys v0.40.0 // indirect

go.sum

Lines changed: 45 additions & 12 deletions
Large diffs are not rendered by default.

pkg/clickhouse/client.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright 2024-2026 The Parca Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package clickhouse
15+
16+
import (
17+
"context"
18+
"crypto/tls"
19+
"fmt"
20+
21+
"github.com/ClickHouse/clickhouse-go/v2"
22+
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
23+
)
24+
25+
// Config holds ClickHouse connection configuration.
26+
type Config struct {
27+
Address string
28+
Database string
29+
Username string
30+
Password string
31+
Table string
32+
Secure bool
33+
}
34+
35+
// Client is a wrapper around the ClickHouse connection.
36+
type Client struct {
37+
conn driver.Conn
38+
cfg Config
39+
}
40+
41+
// NewClient creates a new ClickHouse client with the given configuration.
42+
// It first connects without a database to ensure the database can be created,
43+
// then reconnects with the database specified.
44+
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
45+
// First, connect without specifying a database to allow database creation
46+
opts := &clickhouse.Options{
47+
Addr: []string{cfg.Address},
48+
Auth: clickhouse.Auth{
49+
Username: cfg.Username,
50+
Password: cfg.Password,
51+
},
52+
}
53+
54+
if cfg.Secure {
55+
opts.TLS = &tls.Config{}
56+
}
57+
58+
conn, err := clickhouse.Open(opts)
59+
if err != nil {
60+
return nil, fmt.Errorf("failed to open ClickHouse connection: %w", err)
61+
}
62+
63+
// Create database if it doesn't exist
64+
if err := conn.Exec(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)); err != nil {
65+
conn.Close()
66+
return nil, fmt.Errorf("failed to create database: %w", err)
67+
}
68+
69+
// Close the initial connection
70+
conn.Close()
71+
72+
// Now connect with the database specified
73+
opts.Auth.Database = cfg.Database
74+
conn, err = clickhouse.Open(opts)
75+
if err != nil {
76+
return nil, fmt.Errorf("failed to open ClickHouse connection with database: %w", err)
77+
}
78+
79+
return &Client{
80+
conn: conn,
81+
cfg: cfg,
82+
}, nil
83+
}
84+
85+
// Close closes the ClickHouse connection.
86+
func (c *Client) Close() error {
87+
return c.conn.Close()
88+
}
89+
90+
// Conn returns the underlying ClickHouse connection.
91+
func (c *Client) Conn() driver.Conn {
92+
return c.conn
93+
}
94+
95+
// Config returns the client configuration.
96+
func (c *Client) Config() Config {
97+
return c.cfg
98+
}
99+
100+
// Database returns the database name.
101+
func (c *Client) Database() string {
102+
return c.cfg.Database
103+
}
104+
105+
// Table returns the table name.
106+
func (c *Client) Table() string {
107+
return c.cfg.Table
108+
}
109+
110+
// FullTableName returns the fully qualified table name (database.table).
111+
func (c *Client) FullTableName() string {
112+
return fmt.Sprintf("%s.%s", c.cfg.Database, c.cfg.Table)
113+
}
114+
115+
// EnsureSchema creates the table if it doesn't exist.
116+
// Note: The database is already created in NewClient.
117+
func (c *Client) EnsureSchema(ctx context.Context) error {
118+
// Create table using the schema definition
119+
schema := CreateTableSQL(c.cfg.Database, c.cfg.Table)
120+
if err := c.conn.Exec(ctx, schema); err != nil {
121+
return fmt.Errorf("failed to create table: %w", err)
122+
}
123+
124+
return nil
125+
}
126+
127+
// Query executes a query and returns the rows.
128+
func (c *Client) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
129+
return c.conn.Query(ctx, query, args...)
130+
}
131+
132+
// Exec executes a query without returning rows.
133+
func (c *Client) Exec(ctx context.Context, query string, args ...interface{}) error {
134+
return c.conn.Exec(ctx, query, args...)
135+
}
136+
137+
// PrepareBatch prepares a batch for insertion.
138+
func (c *Client) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) {
139+
return c.conn.PrepareBatch(ctx, query)
140+
}

pkg/clickhouse/filter.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// Copyright 2024-2026 The Parca Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package clickhouse
15+
16+
import (
17+
"fmt"
18+
"strings"
19+
20+
"github.com/prometheus/prometheus/model/labels"
21+
22+
"github.com/parca-dev/parca/pkg/profile"
23+
)
24+
25+
// ProfileTypeFilter generates SQL WHERE clause conditions for profile type filtering.
26+
func ProfileTypeFilter(qp profile.QueryParts) (string, []interface{}) {
27+
conditions := []string{
28+
"name = ?",
29+
"sample_type = ?",
30+
"sample_unit = ?",
31+
"period_type = ?",
32+
"period_unit = ?",
33+
}
34+
args := []interface{}{
35+
qp.Meta.Name,
36+
qp.Meta.SampleType.Type,
37+
qp.Meta.SampleType.Unit,
38+
qp.Meta.PeriodType.Type,
39+
qp.Meta.PeriodType.Unit,
40+
}
41+
42+
if qp.Delta {
43+
conditions = append(conditions, "duration != 0")
44+
} else {
45+
conditions = append(conditions, "duration = 0")
46+
}
47+
48+
return strings.Join(conditions, " AND "), args
49+
}
50+
51+
// LabelMatchersToSQL converts Prometheus label matchers to SQL WHERE clause conditions.
52+
func LabelMatchersToSQL(matchers []*labels.Matcher) (string, []interface{}, error) {
53+
if len(matchers) == 0 {
54+
return "", nil, nil
55+
}
56+
57+
conditions := make([]string, 0, len(matchers))
58+
args := make([]interface{}, 0, len(matchers))
59+
60+
for _, m := range matchers {
61+
condition, arg, err := matcherToSQL(m)
62+
if err != nil {
63+
return "", nil, err
64+
}
65+
conditions = append(conditions, condition)
66+
if arg != nil {
67+
args = append(args, arg)
68+
}
69+
}
70+
71+
return strings.Join(conditions, " AND "), args, nil
72+
}
73+
74+
// matcherToSQL converts a single Prometheus label matcher to a SQL condition.
75+
func matcherToSQL(m *labels.Matcher) (string, interface{}, error) {
76+
// Use ClickHouse JSON path syntax for label access
77+
labelPath := fmt.Sprintf("labels.%s", m.Name)
78+
79+
switch m.Type {
80+
case labels.MatchEqual:
81+
if m.Value == "" {
82+
// Empty value means label should not exist or be null
83+
return fmt.Sprintf("(%s IS NULL OR %s = '')", labelPath, labelPath), nil, nil
84+
}
85+
return fmt.Sprintf("%s = ?", labelPath), m.Value, nil
86+
87+
case labels.MatchNotEqual:
88+
if m.Value == "" {
89+
// Not empty means label should exist and not be null/empty
90+
return fmt.Sprintf("(%s IS NOT NULL AND %s != '')", labelPath, labelPath), nil, nil
91+
}
92+
return fmt.Sprintf("(%s != ? OR %s IS NULL)", labelPath, labelPath), m.Value, nil
93+
94+
case labels.MatchRegexp:
95+
// ClickHouse uses match() for regex
96+
return fmt.Sprintf("match(toString(%s), ?)", labelPath), m.Value, nil
97+
98+
case labels.MatchNotRegexp:
99+
return fmt.Sprintf("NOT match(toString(%s), ?)", labelPath), m.Value, nil
100+
101+
default:
102+
return "", nil, fmt.Errorf("unsupported matcher type: %v", m.Type)
103+
}
104+
}
105+
106+
// TimeRangeFilter generates SQL WHERE clause conditions for time range filtering.
107+
func TimeRangeFilter(startNanos, endNanos int64) (string, []interface{}) {
108+
return "time_nanos >= ? AND time_nanos <= ?", []interface{}{startNanos, endNanos}
109+
}
110+
111+
// BuildWhereClause combines multiple filter conditions into a single WHERE clause.
112+
func BuildWhereClause(conditions []string, allArgs []interface{}) (string, []interface{}) {
113+
nonEmpty := make([]string, 0, len(conditions))
114+
for _, c := range conditions {
115+
if c != "" {
116+
nonEmpty = append(nonEmpty, c)
117+
}
118+
}
119+
120+
if len(nonEmpty) == 0 {
121+
return "", nil
122+
}
123+
124+
return "WHERE " + strings.Join(nonEmpty, " AND "), allArgs
125+
}
126+
127+
// QueryToFilters converts a query string and time range to SQL filter components.
128+
func QueryToFilters(query string, startNanos, endNanos int64) (string, []interface{}, profile.QueryParts, error) {
129+
qp, err := profile.ParseQuery(query)
130+
if err != nil {
131+
return "", nil, qp, err
132+
}
133+
134+
// Profile type filter
135+
profileFilter, profileArgs := ProfileTypeFilter(qp)
136+
137+
// Label matchers filter
138+
labelFilter, labelArgs, err := LabelMatchersToSQL(qp.Matchers)
139+
if err != nil {
140+
return "", nil, qp, err
141+
}
142+
143+
// Time range filter
144+
timeFilter, timeArgs := TimeRangeFilter(startNanos, endNanos)
145+
146+
// Combine all conditions
147+
conditions := []string{profileFilter}
148+
args := append([]interface{}{}, profileArgs...)
149+
150+
if labelFilter != "" {
151+
conditions = append(conditions, labelFilter)
152+
args = append(args, labelArgs...)
153+
}
154+
155+
if startNanos != 0 || endNanos != 0 {
156+
conditions = append(conditions, timeFilter)
157+
args = append(args, timeArgs...)
158+
}
159+
160+
whereClause, _ := BuildWhereClause(conditions, args)
161+
162+
return whereClause, args, qp, nil
163+
}

0 commit comments

Comments
 (0)