Add publish_time_field to ReadFromPubSub YAML transform #38985
/gemini review
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces the ability to extract and expose the Pub/Sub message publish time within the Apache Beam YAML 'ReadFromPubSub' transform. By adding a 'publish_time_field' configuration, users can now easily propagate this metadata to downstream components, such as BigQuery, which is particularly useful for time-series analysis and data auditing.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds YAML Pub/Sub IO support for emitting Pub/Sub message publish time as an output field, with a dedicated unit test.
Changes:
- Introduces `publish_time_field` option to `ReadFromPubSub` YAML IO to append message publish time to output rows.
- Forces `ReadFromPubSub(with_attributes=True)` when publish time is requested so publish time is available.
- Adds a Python unit test validating publish time propagation into a new row field.
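The first two changes can be illustrated without Beam installed. The following is a minimal sketch, assuming a hypothetical `FakeMessage` stand-in for a Pub/Sub message and hypothetical helpers `needs_full_message` and `to_row`; none of these names come from `yaml_io.py`, and a plain dict stands in for the output row.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Pub/Sub message; not Beam's PubsubMessage."""
    data: bytes
    attributes: dict = field(default_factory=dict)
    publish_time: Optional[datetime] = None


def needs_full_message(attributes, attributes_map, publish_time_field):
    # A payload-only read is enough unless some metadata field was requested.
    return bool(attributes or attributes_map or publish_time_field)


def to_row(msg, attributes=None, attributes_map=None, publish_time_field=None):
    """Builds an output row (a plain dict in this sketch) from a message."""
    row = {'payload': msg.data}
    for name in attributes or []:
        row[name] = msg.attributes[name]
    if attributes_map:
        row[attributes_map] = dict(msg.attributes)
    if publish_time_field:
        # The publish time lands in a field named by the configuration.
        row[publish_time_field] = msg.publish_time
    return row


msg = FakeMessage(
    b'msg1', {'attr': 'value1'}, datetime(2024, 1, 1, tzinfo=timezone.utc))
row = to_row(msg, attributes_map='attrMap', publish_time_field='publish_time')
```

The point of `needs_full_message` is that requesting only the publish time must still switch the read away from the payload-only path, since the publish time is message metadata rather than part of the payload.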
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| sdks/python/apache_beam/yaml/yaml_io.py | Adds publish_time_field support, schema wiring, and mapping logic to include Pub/Sub publish time in output rows. |
| sdks/python/apache_beam/yaml/yaml_io_test.py | Adds coverage for reading Pub/Sub messages with publish_time_field enabled. |
    publish_time_field: Field to add to output messages with the Pub/Sub
      message publish time. If None, no such field is added.

     payload_schema, parser = _create_parser(format, schema)
     extra_fields: list[schema_pb2.Field] = []
    -if not attributes and not attributes_map:
    +if not attributes and not attributes_map and not publish_time_field:

    if publish_time_field:
      extra_fields.append(schemas.schema_field(publish_time_field, Timestamp))

    if publish_time_field:
      values[publish_time_field] = Timestamp.from_utc_datetime(
          msg.publish_time)
        beam.Row(payload=b'msg2', attrMap={'attr': 'value2'})
    ]))

    def test_read_with_publish_time_field(self):

    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:

    result = p | YamlTransform(
        '''
        type: ReadFromPubSub
        config:
          topic: my_topic
          format: RAW
          publish_time_field: publish_time
        ''')
Code Review
This pull request introduces the publish_time_field parameter to read_from_pubsub in Apache Beam's YAML IO, allowing users to include the Pub/Sub message publish time in the output rows. However, a critical issue was identified where msg.publish_time is already a Timestamp object in production, meaning that calling Timestamp.from_utc_datetime on it will raise a ValueError. A type check should be added to handle both Timestamp and datetime.datetime objects safely.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
    if publish_time_field:
      values[publish_time_field] = Timestamp.from_utc_datetime(
          msg.publish_time)
In production, msg.publish_time returned by ReadFromPubSub is already an instance of Timestamp (from apache_beam.utils.timestamp). Calling Timestamp.from_utc_datetime on it directly will raise a ValueError because it expects a datetime.datetime object.
To safely handle the production case (where it is a Timestamp), the test case (where it is a datetime.datetime), and cases where it might be None, we should check the type of msg.publish_time before converting.
    if publish_time_field:
      if isinstance(msg.publish_time, Timestamp):
        values[publish_time_field] = msg.publish_time
      elif msg.publish_time is not None:
        values[publish_time_field] = Timestamp.from_utc_datetime(
            msg.publish_time)
      else:
        values[publish_time_field] = None
Code Review
This pull request adds a new publish_time_field parameter to the read_from_pubsub function in Apache Beam's YAML IO, enabling users to include the Pub/Sub message publish time in the output rows. A critical issue was identified where calling Timestamp.from_utc_datetime on msg.publish_time will raise an AttributeError in production because publish_time is already a Timestamp object. It is recommended to use Timestamp.of instead to safely handle both Timestamp and datetime objects.
    values[publish_time_field] = Timestamp.from_utc_datetime(
        msg.publish_time)
In production, PubsubMessage.publish_time is already a Timestamp object (from apache_beam.utils.timestamp), not a datetime.datetime object. Calling Timestamp.from_utc_datetime on it will raise an AttributeError because Timestamp does not have the datetime attributes expected by from_utc_datetime.

Using Timestamp.of is much safer and more robust as it seamlessly handles both Timestamp and datetime.datetime objects (which also keeps your unit tests passing).

    values[publish_time_field] = Timestamp.of(msg.publish_time)
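Both review suggestions amount to accepting more than one input type at the conversion point. The sketch below shows that pattern using only the standard library. `Stamp` and `coerce_stamp` are hypothetical names introduced for illustration; they are stand-ins and do not reproduce Beam's `Timestamp` class, and the sketch makes no claim about which type `msg.publish_time` actually has at runtime.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


@dataclass(frozen=True)
class Stamp:
    """Hypothetical microsecond timestamp; a stand-in, not Beam's Timestamp."""
    micros: int


def coerce_stamp(value):
    """Returns a Stamp for several input types, or None if value is None."""
    if value is None:
        return None
    if isinstance(value, Stamp):
        # Already converted: pass through instead of converting twice.
        return value
    if isinstance(value, datetime):
        if value.tzinfo is None:
            raise ValueError('naive datetime has no time zone: %r' % value)
        # Exact integer microseconds since the Unix epoch.
        return Stamp((value - _EPOCH) // timedelta(microseconds=1))
    if isinstance(value, (int, float)):
        # Numbers are interpreted as seconds since the Unix epoch.
        return Stamp(round(value * 1_000_000))
    raise TypeError('cannot interpret %r as a timestamp' % (value,))


dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
stamp = coerce_stamp(dt)
```

Calling `coerce_stamp(stamp)` returns the same object, so a caller does not need to know whether the value was already converted upstream. That is the property both the explicit `isinstance` check and the single coercing call are after.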
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files

@@ Coverage Diff @@
## master #38985 +/- ##
============================================
+ Coverage 55.86% 55.87% +0.01%
Complexity 3912 3912
============================================
Files 1320 1320
Lines 180905 180934 +29
Branches 2671 2671
============================================
+ Hits 101065 101101 +36
+ Misses 76944 76937 -7
Partials 2896 2896
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment

assign set of reviewers

Assigning reviewers:

R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment

Available commands:

The PR bot will only process comments in the main thread (not review comments).
Adds `publish_time_field` support to the Beam YAML `ReadFromPubSub` transform.

When configured, `ReadFromPubSub` exposes the Pub/Sub message `publish_time` as a field on the output `Row`, allowing downstream transforms and sinks such as BigQuery to store or inspect the original Pub/Sub publish time.

addresses #35127