How do I run a function that applies regex iteratively in pandas-on-spark API?

Question


I am using pandas-on-spark in combination with regex to remove some abbreviations from a column in a dataframe. In pandas this all works fine, but I have the task of migrating this code to a production workload on our Spark cluster, and therefore decided to use pandas-on-spark. I'm experiencing issues with the function below. I'm using it to clean up a number of abbreviations (somewhat simplified here for readability; in reality abbreviations_dict has 61 abbreviations and patterns is a list with three regex patterns, so the for loop iterates 61x3 = 183 times). df["SchoneFunctie"] is a pyspark.pandas.Series of approx 420k rows. I'm running this code on an Apache Spark pool in Azure Synapse, with Spark version 3.3 (to be a bit more specific: 3.3.1.5.2-90111858).

import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    #Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    #For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)

    return job_list

The problem & things I've tried:

As per the pandas-on-spark best practices docs, I'm trying to checkpoint my dataframe after this function, since it performs a lot of iterations and the lineage can get big quite fast. df.spark.explain() gives a query plan of 373 lines. Please find a snippet of it below:

*(186) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6074 AS SchoneFunctie#5881]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6073)#5878], [pythonUDF0#6074], 200
   +- *(185) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6073]
      +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6072)#5873], [pythonUDF0#6073], 200
         +- *(184) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6072]
            +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6071)#5868], [pythonUDF0#6072], 200
               +- *(183) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6071]
                  +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6070)#5863], [pythonUDF0#6071], 200
                     +- *(182) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6070]
                        +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6069)#5858], [pythonUDF0#6070], 200
                           +- *(181) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6069]
                              +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6068)#5853], [pythonUDF0#6069], 200
                                 +- *(180) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6068]
                                    +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6067)#5848], [pythonUDF0#6068], 200

However, whatever I try, I can't successfully run this function without running into errors.

Simply calling resolve_abbreviations and trying to checkpoint afterwards:

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
df = df.spark.checkpoint()

Results in the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [17], line 2
      1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 df = df.spark.checkpoint()

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/spark/accessors.py:1073, in SparkFrameMethods.checkpoint(self, eager)
   1070 from pyspark.pandas.frame import DataFrame
   1072 internal = self._psdf._internal.resolved_copy
-> 1073 checkpointed_sdf = internal.spark_frame.checkpoint(eager)
   1074 return DataFrame(internal.with_new_sdf(checkpointed_sdf))

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:682, in DataFrame.checkpoint(self, eager)
    665 def checkpoint(self, eager: bool = True) -> "DataFrame":
    666     """Returns a checkpointed version of this :class:`DataFrame`. Checkpointing can be used to
    667     truncate the logical plan of this :class:`DataFrame`, which is especially useful in
    668     iterative algorithms where the plan may grow exponentially. It will be saved to files
   (...)
    680     This API is experimental.
    681     """
--> 682     jdf = self._jdf.checkpoint(eager)
    683     return DataFrame(jdf, self.sparkSession)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o8801.checkpoint.
: org.apache.spark.SparkException: Job 32 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1196)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1194)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1194)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2897)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2794)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2217)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2217)
	at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2154)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2403)
	at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:166)
	at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:60)
	at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
	at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1913)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1903)
	at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:654)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

Trying to use local_checkpoint() instead of checkpoint():

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
df = df.spark.local_checkpoint()

Results in a similar error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [21], line 2
      1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 df = df.spark.local_checkpoint()

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/spark/accessors.py:1111, in SparkFrameMethods.local_checkpoint(self, eager)
   1108 from pyspark.pandas.frame import DataFrame
   1110 internal = self._psdf._internal.resolved_copy
-> 1111 checkpointed_sdf = internal.spark_frame.localCheckpoint(eager)
   1112 return DataFrame(internal.with_new_sdf(checkpointed_sdf))

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:702, in DataFrame.localCheckpoint(self, eager)
    685 def localCheckpoint(self, eager: bool = True) -> "DataFrame":
    686     """Returns a locally checkpointed version of this :class:`DataFrame`. Checkpointing can be
    687     used to truncate the logical plan of this :class:`DataFrame`, which is especially useful in
    688     iterative algorithms where the plan may grow exponentially. Local checkpoints are
   (...)
    700     This API is experimental.
    701     """
--> 702     jdf = self._jdf.localCheckpoint(eager)
    703     return DataFrame(jdf, self.sparkSession)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o12529.localCheckpoint.
: org.apache.spark.SparkException: Job 32 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1196)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1194)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1194)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2897)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2794)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2217)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2217)
	at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2154)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
	at org.apache.spark.rdd.LocalRDDCheckpointData.doCheckpoint(LocalRDDCheckpointData.scala:54)
	at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
	at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1913)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1903)
	at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
	at org.apache.spark.sql.Dataset.localCheckpoint(Dataset.scala:678)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

Even when I try to break up the lineage by calling an action:

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
print(df.head(10))

I get an error:

/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:201: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [23], line 2
1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 print(df.head(10))
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12255, in DataFrame.__repr__(self)
12252 if max_display_count is None:
12253     return self._to_internal_pandas().to_string()
> 12255 pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
12256 pdf_length = len(pdf)
12257 pdf = cast("DataFrame", pdf.iloc[:max_display_count])
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12246, in DataFrame._get_or_create_repr_pandas_cache(self, n)
12243 def _get_or_create_repr_pandas_cache(self, n: int) -> Union[pd.DataFrame, pd.Series]:
12244     if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
12245         object.__setattr__(
> 12246             self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
12247         )
12248     return self._repr_pandas_cache[n]
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12241, in DataFrame._to_internal_pandas(self)
12235 def _to_internal_pandas(self) -> pd.DataFrame:
12236     """
12237     Return a pandas DataFrame directly from _internal to avoid overhead of copy.
12238 
12239     This method is for internal use only.
12240     """
> 12241     return self._internal.to_pandas_frame
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/utils.py:588, in lazy_property.<locals>.wrapped_lazy_property(self)
584 @property
585 @functools.wraps(fn)
586 def wrapped_lazy_property(self):
587     if not hasattr(self, attr_name):
--> 588         setattr(self, attr_name, fn(self))
589     return getattr(self, attr_name)
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/internal.py:1056, in InternalFrame.to_pandas_frame(self)
1054 """Return as pandas DataFrame."""
1055 sdf = self.to_internal_spark_frame
-> 1056 pdf = sdf.toPandas()
1057 if len(pdf) == 0 and len(sdf.schema) > 0:
1058     pdf = pdf.astype(
1059         {field.name: spark_type_to_pandas_dtype(field.dataType) for field in sdf.schema}
1060     )
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:140, in PandasConversionMixin.toPandas(self)
138 tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
139 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
--> 140 batches = self.toDF(*tmp_column_names)._collect_as_arrow(
141     split_batches=self_destruct
142 )
143 if len(batches) > 0:
144     table = pyarrow.Table.from_batches(batches)
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:358, in PandasConversionMixin._collect_as_arrow(self, split_batches)
355         results = list(batch_stream)
356 finally:
357     # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 358     jsocket_auth_server.getResult()
360 # Separate RecordBatches from batch order indices in results
361 batches = results[:-1]
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316     self.command_header +\
1317     args_command +\
1318     proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322     answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325     temp_arg._detach()
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189     try:
--> 190         return f(*a, **kw)
191     except Py4JJavaError as e:
192         converted = convert_exception(e.java_exception)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
327         "An error occurred while calling {0}{1}{2}.\n".
328         format(target_id, ".", name), value)
329 else:
330     raise Py4JError(
331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332         format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o16336.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: The "collectAsArrowToPython" action failed. Please, fill a bug report in, and provide the full stack trace.
at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:552)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:564)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3792)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3791)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:139)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:141)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:136)
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:113)
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:107)
at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:68)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:222)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:384)
at org.apache.spark.sql.execution.CollectLimitExec.doExecute(limit.scala:70)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226)
at org.apache.spark.sql.Dataset.toArrowBatchRdd(Dataset.scala:3923)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3810)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3815)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3792)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
... 19 more

My Question

What is going wrong here? I understand that my lineage gets large because of the nested for-loop. It seems that any action I perform crashes my application, but I don't know how to avoid it. It could also be that this is a pandas-on-spark issue, and that I would be better off using regular PySpark for this. At this point I am quite stuck, so any advice relating to solving this issue would be greatly appreciated.

Answer 1

Score: 1


It seems like you might be running into a situation where it's no longer reasonable to use the pandas API on Spark.

The fact that your query plan is so gigantic for something that should be a single `Project` stage is probably the problem.

You can go 2 ways from here:

- Chop up your function into pieces, and checkpoint your dataframe in between function calls. This will make your query even less performant, but it will truncate your lineage at each checkpoint operation (a rough sketch of this option follows right after this list).
- Just use plain PySpark for this. A full example follows below:
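
A rough sketch of that first option, under some assumptions not in the original code: the full 61-entry dict is split up front into a list of smaller dicts (called `abbreviation_chunks` here, an illustrative name), `df` is the pandas-on-spark DataFrame from the question, and `local_checkpoint()` is used because `checkpoint()` would additionally require a checkpoint directory to be set on the SparkContext:

```python
import pyspark.pandas as pspd

def resolve_abbreviations_chunk(job_list: pspd.Series, abbreviations: dict) -> pspd.Series:
    """Same replacement logic as resolve_abbreviations, limited to one chunk of abbreviations."""
    for abb, value_to_replace in abbreviations.items():
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        for patt in patterns:
            job_list = job_list.str.replace(pat=patt, repl=f'{value_to_replace} ', regex=True)
    return job_list

# abbreviation_chunks is assumed to be the 61 abbreviations split into a few
# smaller dicts; checkpointing after each chunk keeps the plan from growing.
for chunk in abbreviation_chunks:
    df['SchoneFunctie'] = resolve_abbreviations_chunk(df['SchoneFunctie'], chunk)
    df = df.spark.local_checkpoint()
```

Each `local_checkpoint()` materializes that chunk's replacements, so no single plan should ever contain more than one chunk's worth of projections.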


```python
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

def resolve_abbreviations(df, colName):
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    #Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    #For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            df = df.withColumn(colName, F.regexp_replace(colName, patt, value_to_replace))

    return df


df = spark.createDataFrame(
    [
        "Ik ben de 2e",
        "Jij bent de 1e geworden",
        "Ik wil de 3de zijn, maar ik ben de ceo",
        "Jij bent tech gezien de sr",
        "Later wil ik zw zijn"
    ],
    StringType()
)
```

There are only small differences between your function and mine:

  • The function takes in 2 parameters: a dataframe and a string (representing a column name).
  • The line where we actually do the regex replacement, df.withColumn(colName, F.regexp_replace(colName, patt, value_to_replace)), uses native PySpark functions (withColumn, regexp_replace), so Spark understands that these operations do not each need to become a separate step in the query plan.

Now, if I call this function, I get the nicely translated text:

>>> resolve_abbreviations(df, "value").show(truncate=False)
+------------------------------------------------------------+
|value                                                       |
+------------------------------------------------------------+
|Ik ben de tweede                                            |
|Jij bent de eerste geworden                                 |
|Ik wil de derde zijn, maar ik ben de chief executive officer|
|Jij bent technisch gezien de senior                         |
|Later wil ik zelfstandig werkend zijn                       |
+------------------------------------------------------------+

And if you look at the query plan of this operation:

>>> resolve_abbreviations(df, "value").explain()
== Physical Plan ==
*(1) Project [regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(
regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(value#0,
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))1e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), eerste, 1), 1e\., eerste, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))1ste((?=( ))|(?=(\\))|(?=($))|(?=(\)))), eerste, 1), 1ste\., eerste, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))2e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), tweede, 1), 2e\., tweede, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))2de((?=( ))|(?=(\\))|(?=($))|(?=(\)))), tweede, 1), 2de\., tweede, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))3e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), derde, 1), 3e\., derde, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))3de((?=( ))|(?=(\\))|(?=($))|(?=(\)))), derde, 1), 3de\., derde, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))ceo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief executive officer, 1), ceo\., chief executive officer, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))cfo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief financial officer, 1), cfo\., chief financial officer, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))coo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief operating officer, 1), coo\., chief operating officer, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))cto((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief technology officer, 1), cto\., chief technology officer, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))sr((?=( ))|(?=(\\))|(?=($))|(?=(\)))), senior, 1), sr\., senior, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))tech((?=( ))|(?=(\\))|(?=($))|(?=(\)))), technisch, 1), tech\., technisch, 1),
((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))zw((?=( ))|(?=(\\))|(?=($))|(?=(\)))), zelfstandig werkend, 1), zw\., zelfstandig werkend, 1) AS value#275]
+- *(1) Scan ExistingRDD[value#0]

You see that even though you're doing many regex replacements, they are all part of a single Project step in your logical plan. This will make your operation much more performant as well.
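
If the surrounding pipeline needs to stay in pandas-on-spark, this plain-PySpark function can still be dropped in by converting at the boundary. A minimal sketch, assuming `df` is the original pyspark.pandas DataFrame from the question (the `index_col` name is just an illustrative choice to preserve the index across the round trip):

```python
# Convert to a plain Spark DataFrame, apply the PySpark version of
# resolve_abbreviations, then wrap the result back into pandas-on-spark.
sdf = df.to_spark(index_col="row_idx")             # pyspark.pandas -> pyspark.sql DataFrame
sdf = resolve_abbreviations(sdf, "SchoneFunctie")  # the function shown above
df = sdf.pandas_api(index_col="row_idx")           # back to pyspark.pandas for later steps
```

Both conversions only rewrap the same underlying Spark plan, so the whole chain of regexp_replace calls should still end up as a single Project stage.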
