Skip to content

Add publish_time_field to ReadFromPubSub YAML transform#38985

Open
lalitx17 wants to merge 3 commits into
apache:masterfrom
lalitx17:publish_time_field_to_yaml_pubsub
Open

Add publish_time_field to ReadFromPubSub YAML transform#38985
lalitx17 wants to merge 3 commits into
apache:masterfrom
lalitx17:publish_time_field_to_yaml_pubsub

Conversation

@lalitx17

Copy link
Copy Markdown
Contributor

Adds publish_time_field support to the Beam YAML ReadFromPubSub transform.

When configured, ReadFromPubSub exposes the Pub/Sub message publish_time as a field on the output Row, allowing downstream transforms and sinks such as BigQuery to store or inspect the original Pub/Sub publish time.

type: ReadFromPubSub
config:
  topic: ...........
  format: RAW
  publish_time_field: pubsub_publish_time

addresses #35127

@lalitx17 lalitx17 marked this pull request as ready for review June 16, 2026 17:10
Copilot AI review requested due to automatic review settings June 16, 2026 17:10
@lalitx17

Copy link
Copy Markdown
Contributor Author

/gemini review

@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces the ability to extract and expose the Pub/Sub message publish time within the Apache Beam YAML 'ReadFromPubSub' transform. By adding a 'publish_time_field' configuration, users can now easily propagate this metadata to downstream components, such as BigQuery, which is particularly useful for time-series analysis and data auditing.

Highlights

  • Pub/Sub Publish Time Support: Added the 'publish_time_field' configuration option to the 'ReadFromPubSub' YAML transform, allowing users to include the message publish time in the output row.
  • Schema Enhancement: Updated the transform logic to include the publish time as a timestamp field in the resulting Beam Row, enabling better downstream processing.
  • Test Coverage: Added a new test case in 'yaml_io_test.py' to verify that the publish time is correctly captured and mapped when the 'publish_time_field' is configured.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds YAML Pub/Sub IO support for emitting Pub/Sub message publish time as an output field, with a dedicated unit test.

Changes:

  • Introduces publish_time_field option to ReadFromPubSub YAML IO to append message publish time to output rows.
  • Forces ReadFromPubSub(with_attributes=True) when publish time is requested so publish time is available.
  • Adds a Python unit test validating publish time propagation into a new row field.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.

File Description
sdks/python/apache_beam/yaml/yaml_io.py Adds publish_time_field support, schema wiring, and mapping logic to include Pub/Sub publish time in output rows.
sdks/python/apache_beam/yaml/yaml_io_test.py Adds coverage for reading Pub/Sub messages with publish_time_field enabled.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +371 to +372
publish_time_field: Field to add to output messages with the Pub/Sub
message publish time. If None, no such field is added.
Comment thread sdks/python/apache_beam/yaml/yaml_io.py Outdated
payload_schema, parser = _create_parser(format, schema)
extra_fields: list[schema_pb2.Field] = []
if not attributes and not attributes_map:
if not attributes and not attributes_map and not publish_time_field:
Comment thread sdks/python/apache_beam/yaml/yaml_io.py Outdated
Comment on lines +391 to +392
if publish_time_field:
extra_fields.append(schemas.schema_field(publish_time_field, Timestamp))
Comment thread sdks/python/apache_beam/yaml/yaml_io.py Outdated
Comment on lines +402 to +404
if publish_time_field:
values[publish_time_field] = Timestamp.from_utc_datetime(
msg.publish_time)
beam.Row(payload=b'msg2', attrMap={'attr': 'value2'})
]))

def test_read_with_publish_time_field(self):
Comment on lines +191 to +192
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
Comment on lines +201 to +208
result = p | YamlTransform(
'''
type: ReadFromPubSub
config:
topic: my_topic
format: RAW
publish_time_field: publish_time
''')

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the publish_time_field parameter to read_from_pubsub in Apache Beam's YAML IO, allowing users to include the Pub/Sub message publish time in the output rows. However, a critical issue was identified where msg.publish_time is already a Timestamp object in production, meaning that calling Timestamp.from_utc_datetime on it will raise a ValueError. A type check should be added to handle both Timestamp and datetime.datetime objects safely.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread sdks/python/apache_beam/yaml/yaml_io.py Outdated
Comment on lines +402 to +404
if publish_time_field:
values[publish_time_field] = Timestamp.from_utc_datetime(
msg.publish_time)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In production, msg.publish_time returned by ReadFromPubSub is already an instance of Timestamp (from apache_beam.utils.timestamp). Calling Timestamp.from_utc_datetime on it directly will raise a ValueError because it expects a datetime.datetime object.

To safely handle both the production case (where it is a Timestamp), the test case (where it is a datetime.datetime), and cases where it might be None, we should check the type of msg.publish_time before converting.

      if publish_time_field:
        if isinstance(msg.publish_time, Timestamp):
          values[publish_time_field] = msg.publish_time
        elif msg.publish_time is not None:
          values[publish_time_field] = Timestamp.from_utc_datetime(
              msg.publish_time)
        else:
          values[publish_time_field] = None

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request adds a new publish_time_field parameter to the read_from_pubsub function in Apache Beam's YAML IO, enabling users to include the Pub/Sub message publish time in the output rows. A critical issue was identified where calling Timestamp.from_utc_datetime on msg.publish_time will raise an AttributeError in production because publish_time is already a Timestamp object. It is recommended to use Timestamp.of instead to safely handle both Timestamp and datetime objects.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread sdks/python/apache_beam/yaml/yaml_io.py Outdated
Comment on lines +403 to +404
values[publish_time_field] = Timestamp.from_utc_datetime(
msg.publish_time)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In production, PubsubMessage.publish_time is already a Timestamp object (from apache_beam.utils.timestamp), not a datetime.datetime object. Calling Timestamp.from_utc_datetime on it will raise an AttributeError because Timestamp does not have the datetime attributes expected by from_utc_datetime.\n\nUsing Timestamp.of is much safer and more robust as it seamlessly handles both Timestamp and datetime.datetime objects (which also keeps your unit tests passing).

        values[publish_time_field] = Timestamp.of(msg.publish_time)

@codecov

codecov Bot commented Jun 16, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 55.87%. Comparing base (b7bc937) to head (e9f3222).
⚠️ Report is 5 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #38985      +/-   ##
============================================
+ Coverage     55.86%   55.87%   +0.01%     
  Complexity     3912     3912              
============================================
  Files          1320     1320              
  Lines        180905   180934      +29     
  Branches       2671     2671              
============================================
+ Hits         101065   101101      +36     
+ Misses        76944    76937       -7     
  Partials       2896     2896              
Flag Coverage Δ
python 79.28% <100.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions

Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@lalitx17

Copy link
Copy Markdown
Contributor Author

assign set of reviewers

@github-actions

Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants