
python - PySpark: py4j.protocol.Py4JJavaError: An error occurred while calling o215.save

I am trying to create and load a pickle file for a KMeans model in PySpark. I am using Python 3.7.9 and PySpark 3.0.1. I am able to create the pickle file, but I am getting the error below:

Code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
import joblib
import config  # local project module holding the input/output paths

spark = SparkSession.builder.appName('Session1').getOrCreate()

mindset_agg_df = spark.read.csv(config.mindset_level2_output, header=True, inferSchema=True)


def customer_seg(data, k_value):
    feature_cols = data.columns
    feature_cols.remove('Cst_ID')
    data = data.na.fill(0).drop('Cst_ID')
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    assembled_data = assembler.transform(data)

    scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
    transformed_data = scaler.fit(assembled_data).transform(assembled_data)

    kmeans = KMeans(featuresCol='scaled_features', k=k_value)
    model = kmeans.fit(transformed_data)
    data4 = model.transform(transformed_data)

    data5 = data4.withColumn("cluster_name", data4.prediction + 1)
    data6 = data5.select('cluster_name')
    data6 = data6.withColumn("id_col", F.monotonically_increasing_id())
    data = data.withColumn("id_col", F.monotonically_increasing_id())
    data_final = data.join(data6, on="id_col", how="inner")

    # You can drop the id_col column after this
    data_final = data_final.drop("id_col")

    # Saving the model (the joblib pickle attempt is left commented out)
    # joblib.dump(data4, config.file_path_customer + 'kmeans_model.pkl')
    model_path = config.file_path_customer + 'model_kmeans'
    model.save(model_path)

    return data_final


# Calling the function
df_cluster = customer_seg(mindset_agg_df, 4)
df_cluster.show()

Above is the code that I am running with PySpark 3.0.1.

Error:

Traceback (most recent call last):
File "C:/Users/1624004/PycharmProjects/predictcx_15122020/src/core/customer/customer_seg.py", line 87, in <module>
    df_cluster = customer_seg(mindset_agg_df, 4)
  File "C:/Users/1624004/PycharmProjects/predictcx_15122020/src/core/customer/customer_seg.py", line 74, in customer_seg
    model.save(model_path)
  File "C:Users1624004PycharmProjectspredictcxvenvlibsite-packagespysparkmlutil.py", line 224, in save
    self.write().save(path)
  File "C:Users1624004PycharmProjectspredictcxvenvlibsite-packagespysparkmlutil.py", line 175, in save
    self._jwrite.save(path)
  File "C:Users1624004PycharmProjectspredictcxvenvlibsite-packagespy4jjava_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:Users1624004PycharmProjectspredictcxvenvlibsite-packagespysparksqlutils.py", line 128, in deco
    return f(*a, **kw)
  File "C:Users1624004PycharmProjectspredictcxvenvlibsite-packagespy4jprotocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o215.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
    at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1552)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1552)
    at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1538)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1538)
    at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
    at org.apache.spark.ml.clustering.InternalKMeansModelWriter.write(KMeans.scala:197)
    at org.apache.spark.ml.util.GeneralMLWriter.saveImpl(ReadWrite.scala:260)
    at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 245, 01HW1729894, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\1624004\PycharmProjects\predictcx_15122020\src\filestore\output\segmentation\model_kmeans\metadata\_temporary\_temporary\attempt_20210113101034_0096_m_000000_0\part-00000
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
    at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:230)
    at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:120)
    at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2152)
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
    ... 51 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\1624004\PycharmProjects\predictcx_15122020\src\filestore\output\segmentation\model_kmeans\metadata\_temporary\_temporary\attempt_20210113101034_


1 Answer


You can't pickle a DataFrame. I think you're trying to save the model, not the DataFrame data4. If so, you can call the save method on the model:

# ... previous code in the function ...

kmeans = KMeans(featuresCol='scaled_features', k=k_value)
model = kmeans.fit(transformed_data)
model.save(config.file_path_customer + 'kmeans_model.pkl')
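
As a side note, model.save() writes out a directory of metadata and data files rather than a single pickle, so the '.pkl' suffix above is just part of the folder name. A minimal sketch of reading the model back later, assuming the same config path as in the question and that the save succeeded:

from pyspark.ml.clustering import KMeansModel

# Load the previously saved KMeansModel from the same path used in the save call.
loaded_model = KMeansModel.load(config.file_path_customer + 'kmeans_model.pkl')

# scaled_df is assumed to be a DataFrame with the same 'scaled_features' column
# produced by the VectorAssembler + StandardScaler steps in the question.
clustered = loaded_model.transform(scaled_df)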
