apache spark sql - Aggregate across multiple columns of different types when pivoting in pyspark - Stack Overflow


I have a melted table of the form:


+------+---------+--------------+------------+--------------+
| time | channel | value_double | value_long | value_string |
+------+---------+--------------+------------+--------------+
|    0 | A       | 1.1          | null       | null         |
|    0 | B       | null         | 1          | null         |
|    0 | C       | null         | null       | "foo"        |
|    1 | A       | 2.1          | null       | null         |
|    1 | B       | null         | 2          | null         |
|    1 | C       | null         | null       | "bar"        |
|    2 | A       | 3.1          | null       | null         |
|    2 | B       | null         | 3          | null         |
|    2 | C       | null         | null       | "foobar"     |
+------+---------+--------------+------------+--------------+

And I'd like to pivot this table to be:

+------+-----+---+----------+
| time | A   | B | C        |
+------+-----+---+----------+
| 0    | 1.1 | 1 | "foo"    |
| 1    | 2.1 | 2 | "bar"    |
| 2    | 3.1 | 3 | "foobar" |
+------+-----+---+----------+

I've got something along the lines of:

df.groupBy("time").pivot("channel").agg(...)

But I'm struggling to fill in the agg function so that it aggregates across the different value columns. I've tried coalesce, but it runs into errors because of the distinct types of the columns.

Some key points:

  • The three value columns have distinct types (double, long, and string)
  • The typing is consistent per channel
  • There is always one and only one value column with data per row

Is this possible with PySpark/SparkSQL?
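
For reference, a minimal sketch that reproduces the sample melted table (the schema below is an assumption based on the stated column types):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical reconstruction of the sample data shown above.
df = spark.createDataFrame(
    [
        (0, "A", 1.1, None, None),
        (0, "B", None, 1, None),
        (0, "C", None, None, "foo"),
        (1, "A", 2.1, None, None),
        (1, "B", None, 2, None),
        (1, "C", None, None, "bar"),
        (2, "A", 3.1, None, None),
        (2, "B", None, 3, None),
        (2, "C", None, None, "foobar"),
    ],
    schema="time long, channel string, value_double double, value_long long, value_string string",
)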


asked Nov 16, 2024 at 4:33 by GolferDude
  • are channels A, B, C always guaranteed to have non-null values that are all the same type? – Derek O Commented Nov 17, 2024 at 0:46
  • Yes, they are. They will always have one and only one non-null value of the same type – GolferDude Commented Nov 17, 2024 at 1:07

4 Answers


Use the coalesce and first functions together.

from pyspark.sql import functions as F
...
df = df.groupBy('time').pivot('channel').agg(F.first(F.coalesce('value_double', 'value_long', 'value_string')))
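
If coalesce still errors because of the mismatched column types (as the question mentions), one possible workaround, not part of the original answer, is to cast every value column to string before coalescing and cast the pivoted columns back afterwards if needed:

from pyspark.sql import functions as F

# Sketch: force a single (string) type so coalesce accepts the mixed columns.
value_cols = ["value_double", "value_long", "value_string"]
combined = F.coalesce(*[F.col(c).cast("string") for c in value_cols])
df_pivoted = df.groupBy("time").pivot("channel").agg(F.first(combined))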

Breaking down the steps for better understanding:

from pyspark.sql.functions import col, when, first


# Identify the appropriate value column for each channel:
df = df.withColumn("value",
                   when(col("channel") == "A", col("value_double"))
                   .when(col("channel") == "B", col("value_long"))
                   .otherwise(col("value_string")))

# Then, pivot the DataFrame:
df_pivoted = df.groupBy("time").pivot("channel").agg(first("value"))

df_pivoted.show()

EDIT: leveraging dynamic type inference and data type conversion when the number of channels is high and an explicit per-channel mapping is not practical.

from datetime import datetime

from pyspark.sql.functions import first, udf


# Define a function to dynamically infer the appropriate data type.
# Spark column functions such as to_date/to_timestamp cannot be applied to plain
# Python values inside a UDF, so standard-library parsing is used here instead.
# Note also that a Python UDF returns StringType by default, so the pivoted
# columns may still need a cast afterwards.
def infer_data_type(value):
    if value is None:
        return None
    for parser in (int, float):          # try int before float so integers stay integral
        try:
            return parser(value)
        except ValueError:
            pass
    for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"):   # common date/timestamp formats
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            pass
    return str(value)

# Create a UDF for the data type inference:
infer_type_udf = udf(infer_data_type)

# Pivot the DataFrame, inferring the data type for each value
# ("value" is the combined column created in the previous step):
df_pivoted = df.groupBy("time").pivot("channel").agg(first(infer_type_udf("value")))

If it is guaranteed that there is exactly one non-null value column per row (and each channel's type is consistent), you can restructure the DataFrame before pivoting:

from pyspark.sql import functions as F

df_filtered = []

# One pivoted DataFrame per value column, keeping only the rows where that column is non-null.
for value_col in ["value_double", "value_long", "value_string"]:
    df_filtered.append(
        df.select("time", "channel", value_col)
          .dropna()
          .groupby("time")
          .pivot("channel")
          .agg(F.max(value_col))
    )

for i, df_curr in enumerate(df_filtered):
    if i == 0:
        df_all = df_curr
    else:
        df_all = df_all.join(df_curr, on=['time'], how='inner')
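
The incremental join loop can also be written more compactly with functools.reduce (a minor variation on the same idea):

from functools import reduce

# Same inner joins on "time", expressed as a single reduce over the pivoted frames.
df_all = reduce(lambda left, right: left.join(right, on=["time"], how="inner"), df_filtered)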

Result:

+----+---+---+------+
|time|  A|  B|     C|
+----+---+---+------+
|   1|2.1|  2|   bar|
|   2|3.1|  3|foobar|
|   0|1.1|  1|   foo|
+----+---+---+------+

Use colRegex to dynamically identify the value_* columns, then build a map column (m) from them:

from pyspark.sql import functions as F

cols = df.select(df.colRegex(r'`value_.*`')).columns
m = F.create_map(*[y for c in cols for y in (F.lit(c), F.col(c))])
df1 = df.select('time', 'channel', m.alias('m'))

# df1.show()
# +----+-------+--------------------------------------------------------------------+
# |time|channel|m                                                                   |
# +----+-------+--------------------------------------------------------------------+
# |0   |A      |{value_double -> 1.1, value_long -> null, value_string -> null}     |
# |0   |B      |{value_double -> null, value_long -> 1.0, value_string -> null}     |
# |0   |C      |{value_double -> null, value_long -> null, value_string -> "foo"}   |
# |1   |A      |{value_double -> 2.1, value_long -> null, value_string -> null}     |
# |1   |B      |{value_double -> null, value_long -> 2.0, value_string -> null}     |
# |1   |C      |{value_double -> null, value_long -> null, value_string -> "bar"}   |
# |2   |A      |{value_double -> 3.1, value_long -> null, value_string -> null}     |
# |2   |B      |{value_double -> null, value_long -> 3.0, value_string -> null}     |
# |2   |C      |{value_double -> null, value_long -> null, value_string -> "foobar"}|

Use map_filter to remove the null key-value pairs

df1 = df1.withColumn('m', F.map_filter('m', lambda k, v: ~F.isnull(v)))

# df1.show()
# +----+-------+--------------------------+
# |time|channel|m                         |
# +----+-------+--------------------------+
# |0   |A      |{value_double -> 1.1}     |
# |0   |B      |{value_long -> 1.0}       |
# |0   |C      |{value_string -> "foo"}   |
# |1   |A      |{value_double -> 2.1}     |
# |1   |B      |{value_long -> 2.0}       |
# |1   |C      |{value_string -> "bar"}   |
# |2   |A      |{value_double -> 3.1}     |
# |2   |B      |{value_long -> 3.0}       |
# |2   |C      |{value_string -> "foobar"}|
# +----+-------+--------------------------+

Pivot the data frame by time and channel

df1 = df1.groupBy('time').pivot('channel').agg(F.first('m'))

# df1.show()
# +----+---------------------+-------------------+--------------------------+
# |time|A                    |B                  |C                         |
# +----+---------------------+-------------------+--------------------------+
# |0   |{value_double -> 1.1}|{value_long -> 1.0}|{value_string -> "foo"}   |
# |1   |{value_double -> 2.1}|{value_long -> 2.0}|{value_string -> "bar"}   |
# |2   |{value_double -> 3.1}|{value_long -> 3.0}|{value_string -> "foobar"}|
# +----+---------------------+-------------------+--------------------------+

Use map_values to extract the value from the mapping

df1 = df1.select('time', *[F.map_values(c)[0].alias(c) for c in df1.columns[1:]])

# df1.show()
# +----+---+---+--------+
# |time|A  |B  |C       |
# +----+---+---+--------+
# |0   |1.1|1.0|"foo"   |
# |1   |2.1|2.0|"bar"   |
# |2   |3.1|3.0|"foobar"|
# +----+---+---+--------+
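
One caveat worth noting (an observation, not part of the original answer): create_map resolves all its values to a single common type, so the pivoted A and B columns may come back as strings; if so, they can be cast to their per-channel types:

# Assumption: the map values were coerced to string, so restore the per-channel types.
df1 = (df1
       .withColumn("A", F.col("A").cast("double"))
       .withColumn("B", F.col("B").cast("long")))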
