python - Apache beam pipeline not running to completion after applying windowing and a trigger and no errors thrown - Stack Over

Hello am in process of creating an apache beam data pipiline that runs in GCP using DataFlow as runner.

Hello am in process of creating an apache beam data pipiline that runs in GCP using DataFlow as runner.Below is a my code ,it does not throw any errors but the issue is no data is written into bigquery when I check the Job execution graph there seems to be no activity on the "write to bigquery" section of the pipeline yet I can see activities in data commming from the pubsub activity.I have gone even ahead to add a trigger but still no data is coming out .Please assit .Thanks

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam import window
import typing
import numpy as np
import pandas as pd


class BmsSchema(typing.NamedTuple):
  can_data_frame_1: typing.Optional[str]


beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)


class ParsePubSubMessage(beam.DoFn):
  def process(self, message):
      import json
      all_columns = [
          "can_data_frame_1"
      ]
      main_dict = dict(zip(all_columns, [None] * len(all_columns)))
      record = json.loads(message.decode('utf-8'))
      main_dict.update(record)
      yield {
          all_columns[0]: main_dict[all_columns[0]]}


def run():
  options = PipelineOptions(
      project='dwingestion',
      runner='DataflowRunner',
      streaming=True,
      temp_location='gs://....../temp',
      staging_location='gs://.........../staging',
      region='europe-west1',
      job_name='.........streaming-pipeline-dataflow',
      save_main_session=True,
      flags=['--allow_unsafe_triggers']
  )

  options.view_as(StandardOptions).streaming = True

  input_subscription = 'projects/..._data_streaming'

  table_schema = {
      "fields": [
          {"name": "current_mA", "type": "INTEGER", "mode": "NULLABLE"}
      ]
  }

  with beam.Pipeline(options=options) as p:
      messages = (p
                  | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                  | 'Apply Fixed Window' >> beam.WindowInto(
                      window.FixedWindows(60),
                      trigger=beam.trigger.AfterWatermark(),
                      allowed_lateness=window.Duration(10),
                      accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
                  )
                  | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                  | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                  )

      # Convert the messages to a DataFrame
      df = to_dataframe(messages)
      

      # Extract and process the 'current_mA' field
      df['current_mA'] = df['can_data_frame_1'].str[4:8].apply(lambda x: int(x, 16) if pd.notna(x) else 0)
      df['current_mA'] = df['current_mA'].where(df['current_mA'] < 0x8000, df['current_mA'] - 0x10000)
      df['current_mA'] = df['current_mA'] * 10

      
      # Convert back to PCollection and map to dictionaries
      transformed_pcol = (
          to_pcollection(df)
          | 'Log Transformed PCollection' >> beam.Map(lambda x: (print(f"Transformed Row: {x}"), x)[1])  # Debugging
          | 'Convert to Dict with Native Types' >> beam.Map(lambda row: {
              "current_mA": int(row.current_mA) if row.current_mA is not None else None
          })
      )

      # Write to BigQuery
      transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(
          table='..........table_test_all_columns_04',
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          schema=table_schema,
          custom_gcs_temp_location='gs://......_template/temp'
      )


if __name__ == '__main__':
  run()


发布者:admin,转转请注明出处:http://www.yc00.com/questions/1745628617a4636964.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信