我是 pyspark 的新手,我正在处理一个复杂的数据框。经过一些过滤后,我一直试图从列表中获取N行到我的 df.column 中。 我有以下 df.struct: root |-- struct1: struct (nullable = true) | |-- array1: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- struct2 : struct (nullable = true) | | | | |-- date: string (nullable = true) | | | | |-- value: string (nullable = true) | | | |-- struct3 : struct (nullable = true) | | | | |-- date: string (nullable = true) | | | | |-- value: string (nullable = true) | | | |-- property: string (nullable = true) 我想要实现的是在属性为 Good 时获取所有 struct2.values 的总和。因为我可以为 array1 设置多个 (N) 个值。 现在,我得到了第一个财产的小句子。但我无法以成功的方式将它传递给 udf 以遍历所有可能的行: df.withColumn("Sum", (col('struct1.array1')[0])['property']) 我想到的一些步骤是: 当 property=Good 时过滤列表中的每个元素 以 struct3.value 的总和返回 udf 中的 lambda 值 期望的输出应该是这样的: None +---------------------------------------------------------------------------------------------------------+ |Struct1 |Sum| +---------------------------------------------------------------------------------------------------------+ |[[[[2020-01-01, 10], [2020-02-02, 15], Good], [[2020-01-01, 20], [2020-02-02, 25], Good]]] |20| +---------------------------------------------------------------------------------------------------------+ 任何帮助将不胜感激

在这种情况下,您不一定需要 UDF。当使用 Spark >= 2.4.0 时,您只需使用内置的高阶函数即可实现相同的效果,如下所示: from pyspark.sql.functions import expr df.withColumn("good_elements", expr("""transform( \ filter(struct1.array1, e -> e.property == 'Good'), e -> cast(e.struct2.value as int) )""")) \ .withColumn("sum", expr("aggregate(good_elements, 0, (sum, e) -> sum + e)")) filter(struct1.array1, e -> e.property == 'Good'):首先我们过滤具有property == 'Good' transform(..., e -> cast(e.struct2.value as int):接下来我们将每个项目转换为整数并将它们存储到一个名为good_elements aggregate(good_elements, 0, (sum, e) -> sum + e)sum:最后我们通过计算总和来创建列good_elements

这是一个很好的解决方案。我以前从未听说过变换。一个问题,它抛出一个错误:pyspark.sql.utils.AnalysisException: "cannot resolve ' struct2.value ' given input columns: [struct1];; I've tried to add the path since the beginning but still doesn't work.Do我需要做更多的事情来尝试您的解决方案吗?

是的,@Als 错了,不e.struct2.value应该struct2.value。e 是将遍历 的项目的迭代器filter(struct1.array1, e -> e.property == 'Good')。我刚改了

我正在调试该函数,我意识到迭代器 e 具有值。谢谢你的纠正。@Alexandros Biratsis 真的很有帮助。

我很高兴能帮助@Als