
Commit 4e02bee

bigquery: add gcp_bigquery_write_api enterprise output
New BatchOutput backed by the BigQuery Storage Write API for higher throughput and lower latency than the legacy streaming API.

- JSON or protobuf input with auto schema fetch + proto conversion
- Per-table managed stream cache with idle eviction sweeper
- gRPC error classification: permanent failures returned as BatchError
- Service-account impersonation via target_principal + delegates
- Configurable stream idle timeout and sweep interval
- Endpoint overrides for local emulators
- Prometheus-style _total counter metrics + batch latency timer
1 parent 813e390 commit 4e02bee

8 files changed

Lines changed: 1548 additions & 3 deletions


Lines changed: 332 additions & 0 deletions
@@ -0,0 +1,332 @@
= gcp_bigquery_write_api
:type: output
:status: stable
:categories: ["GCP","Services"]


////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2026 Redpanda Data Inc.


component_type_dropdown::[]


Streams data into BigQuery using the Storage Write API.

Introduced in version 4.90.0.


[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
output:
  label: ""
  gcp_bigquery_write_api:
    project: ""
    dataset: "" # No default (required)
    table: "" # No default (required)
    message_format: json
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    credentials_json: ""
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
output:
  label: ""
  gcp_bigquery_write_api:
    project: ""
    dataset: "" # No default (required)
    table: "" # No default (required)
    message_format: json
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    credentials_json: ""
    target_principal: ""
    delegates: []
    stream_idle_timeout: 5m
    stream_sweep_interval: 1m
    endpoint:
      http: ""
      grpc: ""
```

--
======

Writes messages to a BigQuery table using the Storage Write API.
This provides higher throughput and lower latency than the legacy streaming API or load jobs.

Messages can be formatted as JSON (default) or raw protobuf bytes.
When using the JSON format, the component automatically fetches the table schema and converts each message to the corresponding proto representation.

WARNING: The proto3 JSON mapping encodes int64 and uint64 values as strings.
JSON messages with integer fields must therefore use string values (e.g. `"age": "30"`, not `"age": 30`), otherwise the write fails with an unmarshalling error.
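
If an upstream system emits plain JSON integers, one option is to cast the affected fields to strings before this output. A minimal sketch using a `mapping` processor (the `age` field is only an illustration):

```yml
pipeline:
  processors:
    - mapping: |
        root = this
        # int64/uint64 columns must be sent as strings under the proto3 JSON mapping
        root.age = this.age.string()
```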

When batching is enabled, the table name is resolved from the first message in each batch.
All messages in the same batch are written to that table.

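For example, a config that routes each message to a table named in its metadata might look like the following sketch (the `table` metadata key and the project/dataset values are placeholders); when an input mixes destinations, group or batch messages per table upstream so a batch never spans tables:

```yml
output:
  gcp_bigquery_write_api:
    project: my-project   # placeholder
    dataset: analytics    # placeholder
    table: ${! metadata("table") }
    batching:
      count: 100
      period: 1s
```
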
== Fields

=== `project`

The GCP project ID. If empty, the project is auto-detected from the environment.

*Type*: `string`

*Default*: `""`

=== `dataset`

The BigQuery dataset ID.

*Type*: `string`

=== `table`

The BigQuery table ID. When batching, it is resolved from the first message in each batch.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].

*Type*: `string`

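For instance, a sketch of a table ID interpolated from message content (the field name is illustrative):

```yml
table: events_${! json("tenant_id") }
```
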
=== `message_format`

The format of input messages. Use `json` to have the component convert JSON to proto automatically, or `protobuf` to supply raw proto-encoded bytes.

*Type*: `string`

*Default*: `"json"`

Options: `json`, `protobuf`.

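If rows are already produced as proto-encoded bytes (presumably matching the destination table's schema), the format can be switched as in this sketch (project, dataset and table names are placeholders):

```yml
output:
  gcp_bigquery_write_api:
    project: my-project   # placeholder
    dataset: analytics    # placeholder
    table: events         # placeholder
    message_format: protobuf
```
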
=== `max_in_flight`

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

*Type*: `int`

*Default*: `64`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].

*Type*: `object`

```yml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. Set to `0` to disable count-based batching.

*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

A number of bytes at which the batch should be flushed. Set to `0` to disable size-based batching.

*Type*: `int`

*Default*: `0`

=== `batching.period`

A period after which an incomplete batch is flushed regardless of its size.

*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.

*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

*Type*: `array`

```yml
# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```

=== `credentials_json`

An optional JSON string containing GCP credentials. If empty, credentials are loaded from the environment.

[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly; read our xref:configuration:secrets.adoc[secrets page for more info].
====

*Type*: `string`

*Default*: `""`

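In practice the credentials are usually injected from an environment variable rather than written into the config; a minimal sketch (the variable name is a placeholder):

```yml
credentials_json: "${GCP_CREDENTIALS_JSON}"
```
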
=== `target_principal`

Service account email to impersonate. When set, the output obtains tokens acting as this service account. Requires the caller to have `roles/iam.serviceAccountTokenCreator` on the target.

*Type*: `string`

*Default*: `""`

=== `delegates`

Optional delegation chain for chained service account impersonation. Each service account must be granted `roles/iam.serviceAccountTokenCreator` on the next in the chain.

*Type*: `array`

*Default*: `[]`

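A sketch of chained impersonation, where the runtime credentials impersonate the delegate, which in turn impersonates the writer account (all account and project names are placeholders):

```yml
output:
  gcp_bigquery_write_api:
    project: my-project   # placeholder
    dataset: analytics    # placeholder
    table: events         # placeholder
    target_principal: bq-writer@my-project.iam.gserviceaccount.com   # placeholder
    delegates:
      - deploy-sa@my-project.iam.gserviceaccount.com                 # placeholder
```
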
=== `stream_idle_timeout`

How long a cached stream can remain unused before being closed. Relevant when the `table` field uses interpolation to route to many tables.

*Type*: `string`

*Default*: `"5m"`

=== `stream_sweep_interval`

How often to check for idle streams to close.

*Type*: `string`

*Default*: `"1m"`

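When routing to many short-lived tables, the cache can be tightened so idle streams are released sooner; a sketch with purely illustrative values:

```yml
stream_idle_timeout: 2m
stream_sweep_interval: 30s
```
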
=== `endpoint`

Optional endpoint overrides for the BigQuery and Storage Write API clients.

*Type*: `object`

=== `endpoint.http`

Override the BigQuery HTTP endpoint. Useful for local emulators.

*Type*: `string`

*Default*: `""`

=== `endpoint.grpc`

Override the BigQuery Storage gRPC endpoint. Useful for local emulators.

*Type*: `string`

*Default*: `""`
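
For example, pointing both clients at a local emulator might look like this sketch (the host and ports are assumptions about a typical local setup, not defaults of this component):

```yml
endpoint:
  http: http://localhost:9050
  grpc: localhost:9060
```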

go.mod

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ require (
 	buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20260323171043-3635d3966b23.1
 	buf.build/go/hyperpb v0.1.3
 	cloud.google.com/go/aiplatform v1.121.0
-	cloud.google.com/go/bigquery v1.74.0
+	cloud.google.com/go/bigquery v1.75.0
 	cloud.google.com/go/pubsub v1.50.1
 	cloud.google.com/go/spanner v1.88.0
 	cloud.google.com/go/storage v1.62.1

go.sum

Lines changed: 2 additions & 2 deletions
@@ -72,8 +72,8 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf
 cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
 cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
 cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
-cloud.google.com/go/bigquery v1.74.0 h1:Q6bAMv+eyvufOpIrfrYxhM46qq1D3ZQTdgUDQqKS+n8=
-cloud.google.com/go/bigquery v1.74.0/go.mod h1:iViO7Cx3A/cRKcHNRsHB3yqGAMInFBswrE9Pxazsc90=
+cloud.google.com/go/bigquery v1.75.0 h1:gI4AgIhXNZ8hxvPDOp4hLGUnpNBjoBor6POSLcrdWkY=
+cloud.google.com/go/bigquery v1.75.0/go.mod h1:zNCHWok+hfTgKCwNqT+V7GH/YmFFgZqjzljKCZBJTWc=
 cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
 cloud.google.com/go/compute v1.2.0/go.mod h1:xlogom/6gr8RJGBe7nT2eGsQYAFUbbv8dbC29qE3Xmw=
 cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM=
