0 votes
288 views
in Technique by (71.8m points)

python - Convert list of strings to list of json objects in pyspark

def tranform_doc(docs):
    # Wrap each string in a {"customKey": <doc>} dict
    json_list = []
    print(docs)
    for doc in docs:
        json_doc = {}
        json_doc["customKey"] = doc
        json_list.append(json_doc)
    return json_list


df.groupBy("colA") 
            .agg(custom_udf(collect_list(col("colB"))).alias("customCol"))

First Hurdle:
Input: ["str1","str2","str3"]

Output: [{"customKey":"str1"},{"customKey":"str2"},{"customKey":"str3"}]
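
For this first hurdle on its own, the UDF's return type can be declared as an array of single-field structs. A minimal sketch of that wiring, assuming Spark 2.4+ (where to_json accepts arrays of structs); the variable names here are my own:

from pyspark.sql.functions import udf, collect_list, col, to_json
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# The UDF returns a list of {"customKey": <string>} dicts,
# which matches array<struct<customKey:string>>
doc_schema = ArrayType(StructType([StructField("customKey", StringType())]))
custom_udf = udf(tranform_doc, doc_schema)

df.groupBy("colA").agg(
    to_json(custom_udf(collect_list(col("colB")))).alias("customCol")
)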

Second Hurdle:
The columns in the agg collect_list change dynamically, so the schema has to adjust dynamically as well.

When the number of elements in the list changes, I receive the error: Input row doesn't have expected number of values required by the schema. 1 fields are required while 3 values are provided. (A StructType declares a fixed number of fields per row, so any mismatch between the declared fields and the returned values fails.)

What I did:

def tranform_doc(agg_docs):
    return json_list
## When I failed to get a list of JSON objects, I tried just returning the original list of strings


from pyspark.sql.functions import udf, collect_list, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("col1", StringType()),
                     StructField("col2", StringType()),
                     StructField("col3", StringType())])

custom_udf = udf(tranform_doc, schema)

df.groupBy("colA") 
            .agg(custom_udf(collect_list(col("colB"))).alias("customCol"))

Output I got: {"col2":"str1","col1":"str2","col3":"str3"}

I'm struggling to get the required list of JSON strings, and to make the schema adapt to the number of elements in the list. (With the flat three-field StructType above, the three strings were consumed as the fields of a single struct rather than producing a list.)
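
One way to decouple the schema from the number of elements is to declare the return type as an array of maps instead of a fixed-width struct, since a map imposes no fixed field count. A minimal sketch (this MapType approach is an assumption on my part, not something from the original post):

from pyspark.sql.functions import udf, collect_list, col, to_json
from pyspark.sql.types import ArrayType, MapType, StringType

# A map schema has no fixed number of keys, so it tolerates any list length
dynamic_schema = ArrayType(MapType(StringType(), StringType()))
custom_udf = udf(tranform_doc, dynamic_schema)

df.groupBy("colA").agg(
    to_json(custom_udf(collect_list(col("colB")))).alias("customCol")
)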



1 Answer

0 votes
by (71.8m points)

No UDF is needed. You can convert colB to a struct before collect_list, then serialize the collected array with to_json (which accepts arrays of structs in Spark 2.4+).

import pyspark.sql.functions as F

df2 = df.groupBy('colA').agg(
    F.to_json(
        # each colB value becomes a {"customKey": <colB>} struct;
        # to_json serializes the collected array to a JSON string
        F.collect_list(
            F.struct(F.col('colB').alias('customKey'))
        )
    ).alias('output')
)
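
For example, with a toy DataFrame built from the question's sample values (column names taken from the question, and an active SparkSession named spark assumed):

df = spark.createDataFrame(
    [('a', 'str1'), ('a', 'str2'), ('a', 'str3')],
    ['colA', 'colB'],
)

df2 = df.groupBy('colA').agg(
    F.to_json(
        F.collect_list(F.struct(F.col('colB').alias('customKey')))
    ).alias('output')
)

df2.show(truncate=False)
# colA | output
# a    | [{"customKey":"str1"},{"customKey":"str2"},{"customKey":"str3"}]

Because collect_list grows with the group, the output adapts to however many colB values each colA has, which also covers the second hurdle without declaring any schema.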
