a MapType causes an AnalysisException in Spark 3.x : Encoders.bean to an object containg a map<String, someClass> fails, that works fine in Spark 2.4

huangapple go评论70阅读模式
英文:

a MapType causes an AnalysisException in Spark 3.x : Encoders.bean to an object containg a map<String, someClass> fails, that works fine in Spark 2.4

问题

The issue you are facing when migrating your Java Spark code from version 2.4 to 3.x is related to a change in how Spark handles complex types like MapType in Datasets. Specifically, it seems to be related to the extraction of values from a MapType column in Spark 3.x.

The error message you provided indicates the problem:

org.apache.spark.sql.AnalysisException: Can't extract value from lambdavariable(MapObject, StringType, true, 376): need struct type but got string;

This error occurs when Spark 3.x is trying to extract a value from a MapType column but expects a struct type instead of a string.

The code you provided includes the schema definition for your Datasets and the structure of your Entreprise and Etablissement classes. However, without the actual code where you perform the transformation or extraction, it's challenging to pinpoint the exact issue.

To resolve this issue, you may need to review the code where you are performing the transformation or extraction of data from your Datasets. In Spark 3.x, there might be changes in how you need to work with complex types like MapType. It's possible that you need to modify your code to accommodate these changes.

Here are some general steps you can take to troubleshoot and fix the issue:

  1. Check the code where you are performing operations on your Datasets, especially where you are accessing or transforming MapType columns. Ensure that you are using the correct methods and functions for handling complex types.

  2. Review the Spark 3.x documentation and release notes for any changes related to complex types and Datasets. This can provide insights into the changes you need to make in your code.

  3. Consider using Spark's built-in functions and methods for working with complex types. Spark provides a wide range of functions for working with arrays, maps, and structs. Using these functions can help ensure compatibility with Spark 3.x.

  4. If necessary, update your schema definitions or data structures to match any changes in Spark 3.x's handling of complex types.

  5. Debug your code step by step to identify the specific operation or transformation that is causing the error. This can help pinpoint the issue more accurately.

Keep in mind that Spark's behavior can change between major versions, so it's essential to review the documentation and adjust your code accordingly when migrating to a new Spark version.

英文:

Attempting to migrate my Java Spark code from 2.4 to 3.x, I have one dataset one that holds a MapType.

/**
 * Renvoyer le sch&#233;ma du Dataset.
 * @return Schema.
 */
public StructType schemaEntreprise() {
   StructType schema = new StructType()
      .add(&quot;siren&quot;, StringType, false)
      .add(&quot;statutDiffusionUniteLegale&quot;, StringType, true)
      .add(&quot;unitePurgeeUniteLegale&quot;, StringType, true )
      .add(&quot;dateCreationEntreprise&quot;, StringType, true)
      .add(&quot;sigle&quot;, StringType, true)
     
   /* ... and other fields mostly of String, Integer, Boolean type... */
   
   // Ajouter au Dataset des entreprises la liaison avec les &#233;tablissements.
   MapType mapEtablissements = new MapType(StringType,
this.datasetEtablissement.schemaEtablissement(), true);
   StructField etablissements = new StructField(&quot;etablissements&quot;,
mapEtablissements, true, Metadata.empty());
   schema.add(etablissements);
   schema.add(&quot;libelleCategorieJuridique&quot;, StringType, true);
   schema.add(&quot;partition&quot;, StringType, true);
   
   return schema;
}

The Dataset&lt;Etablissement&gt; and the business objet Etablissment have only primitives types in them :

public StructType schemaEtablissement() {
   return new StructType()
      .add(&quot;siren&quot;, StringType, false)
      .add(&quot;nic&quot;, StringType, false)
      .add(&quot;siret&quot;, StringType, false)
      .add(&quot;statutDiffusionEtablissement&quot;, StringType, true)
      .add(&quot;dateCreationEtablissement&quot;, StringType, true)
         
      .add(&quot;trancheEffectifSalarie&quot;, StringType, true)
   [...]
public class Etablissement extends AbstractSirene&lt;SIRET&gt; implements Comparable&lt;Etablissement&gt; {
   /** Serial ID. */
   private static final long serialVersionUID = 2451240618966775942L;
   
   /** Ann&#233;e et mois de cr&#233;ation de l&#39;&#233;tablissement. */
   private String dateCreation;
   
   /** Qualit&#233; de si&#232;ge ou non de l&#39;&#233;tablissement */
   private boolean siege;

   /** Enseigne 1 ou nom de l&#39;exploitation */
   private String enseigne1;
   
   /** Enseigne 2 ou nom de l&#39;exploitation */
   private String enseigne2;
   [...]

This Entreprise dataset works perfectly in Spark 2.4. but when used in Spark 3.0.1 inside an operation, its analysis phase ends with an unclear message :

org.apache.spark.sql.AnalysisException: *Can&#39;t extract value from lambdavariable(MapObject, StringType, true, 376)*: need struct type but got string;


EDIT : I add new information about my problem :
It's not a spark.sql.legacy.allowHashOnMapType=true missing problem. Adding it doesn't resolve it.

The problem happens when Spark 3 attempts to perform a :
Encoders.bean(Entreprise.class) in order to create the enterprise objects, who have this class :

public class Entreprise extends AbstractSirene&lt;SIREN&gt; implements Comparable&lt;Entreprise&gt; {
   /** Serial ID. */
   private static final long serialVersionUID = 2451240618966775942L;
   
   /** Liste des &#233;tablissements de l&#39;entreprise. */
   private Map&lt;String, Etablissement&gt; etablissements = new HashMap&lt;&gt;();
   
   /** Sigle de l&#39;entreprise */
   private String sigle;
   
   /** Nom de naissance */
   private String nomNaissance;

   [...]   
   /**
    * Renvoyer la liste des &#233;tablissements de l&#39;entreprise.
    * @return Liste des &#233;tablissements.
    */
   public Map&lt;String, Etablissement&gt; getEtablissements() {
      return this.etablissements;
   }

   /**
    * Fixer la liste des &#233;tablissements de l&#39;entreprise.
    * @param etablissementsEntreprise Liste des &#233;tablissements.
    */
   public void setEtablissements(Map&lt;String, Etablissement&gt; etablissementsEntreprise) {
      this.etablissements = etablissementsEntreprise;
   }

   /**
    * Renvoyer le sigle (forme r&#233;duite de la raison sociale ou de la d&#233;nomination d&#39;une personne morale ou d&#39;un organisme public) (SIGLE).
    * @return Sigle. 
    */
   public String getSigle() {
      return this.sigle;
   }

   /**
    * Fixer le sigle (forme r&#233;duite de la raison sociale ou de la d&#233;nomination d&#39;une personne morale ou d&#39;un organisme public) (SIGLE).
    * @param sigle Sigle. 
    */
   public void setSigle(String sigle) {
      this.sigle = sigle;
   }

   /**
    * Renvoyer le nom de naissance pour une personne physique (NOM).
    * @return Nom de naissance pour une personne physique.
    */
   public String getNomNaissance() {
      return this.nomNaissance;
   }

   /**
    * Fixer le nom de naissance pour une personne physique (NOM).
    * @param nom Nom de naissance pour une personne physique.
    */
   public void setNomNaissance(String nom) {
      this.nomNaissance = nom;
   }

   [...]
}

A debugging has shown me that Scala failed here :

org.apache.spark.sql.AnalysisException: Can&#39;t extract value from lambdavariable(MapObject, StringType, true, 32): need struct type but got string;
at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3076)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3074)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
[...]
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$2(TreeNode.scala:416)
at scala.collection.MapLike$MappedValues.$anonfun$iterator$3(MapLike.scala:257)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:331)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
at scala.collection.TraversableViewLike.force(TraversableViewLike.scala:91)
at scala.collection.TraversableViewLike.force$(TraversableViewLike.scala:89)
at scala.collection.IterableLike$$anon$1.force(IterableLike.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:424)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10.applyOrElse(Analyzer.scala:3074)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10.applyOrElse(Analyzer.scala:3070)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChild$2(TreeNode.scala:368)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$4(TreeNode.scala:427)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:427)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$2(TreeNode.scala:416)
at scala.collection.MapLike$MappedValues.$anonfun$iterator$3(MapLike.scala:257)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:331)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
at scala.collection.TraversableViewLike.force(TraversableViewLike.scala:91)
at scala.collection.TraversableViewLike.force$(TraversableViewLike.scala:89)
at scala.collection.IterableLike$$anon$1.force(IterableLike.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:424)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
[...]
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:170)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:349)
at org.apache.spark.sql.Dataset.resolvedEnc$lzycompute(Dataset.scala:252)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolvedEnc(Dataset.scala:251)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:83)
at org.apache.spark.sql.Dataset.as(Dataset.scala:475)
at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.toDatasetEntreprise(EntrepriseDataset.java:320)
at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.dsEntreprises(EntrepriseDataset.java:307)
at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.collectEntreprisesEtEtablissements(EntrepriseDataset.java:366)
at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDatasetIT.entreprisesEtEtablissementsDeDouarnenez(EntrepriseDatasetIT.java:189)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)[...]
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)

and the org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala is this one, but I have no knowledge of Scala, and I don't know what it expect :

a MapType causes an AnalysisException in Spark 3.x : Encoders.bean to an object containg a map<String, someClass> fails, that works fine in Spark 2.4


In the case everything is working fine (=> in Spark 2.4.7), the following unit test gives the results next to him :

/**
* Obtention des entreprises et &#233;tablissements de Douanenez.
* @throws TechniqueException si un incident survient.
*/
@Test
@DisplayName(&quot;Les entreprises et &#233;tablissements de Douanenez.&quot;)
public void entreprisesEtEtablissementsDeDouarnenez() throws TechniqueException {
  Column douarnenez = col(&quot;codeCommune&quot;).equalTo(&quot;29046&quot;);
  
  Entreprises entreprises = 
    this.entrepriseDataset.collectEntreprisesEtEtablissements(this.session, 
    COG, ANNEE_SIRENE, true, true, null, douarnenez);
  
  LOGGER.info(&quot;{} entreprises ont &#233;t&#233; lues.&quot;, entreprises.size());
  
  for(Entreprise entreprise : entreprises) {
	 LOGGER.info(entreprise.toString());
	 entreprise.getEtablissements().values()
	    .forEach(etablissement -&gt; LOGGER.info(&quot;\t{}&quot;, etablissement.toString()));
  }
}
2287 entreprises ont &#233;t&#233; lues.
{{314551813, Activit&#233; principale : 56.30Z (NAFRev2), effectif salari&#233; : 00 (2017, employeur : null), active : null, dernier traitement : 24 juin 2019, historisation d&#233;but&#233;e le 1 janv. 2008, nombre de p&#233;riodes sans changement : 3}, nombre d&#39;&#233;tablissements : 1, cat&#233;gorie entreprise : PME (2&#160;017), cat&#233;gorie juridique : 1000, n&#176; r&#233;pertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l&#39;&#233;tablissement si&#232;ge : 00012, sigle : null, d&#233;nomination de l&#39;entreprise : {18}, d&#233;nominations usuelles 1 : HOTEL BAR LA RADE, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : HERAUD, Nom d&#39;usage : HASCOET, pr&#233;nom usuel : MICHELINE, autres pr&#233;noms : MICHELINE, pseudonyme : null, sexe : F, purg&#233;e : null, date de cr&#233;ation : 1 janv. 1978}
{{31455181300012, Activit&#233; principale : 56.30Z (NAFRev2), effectif salari&#233; : 00 (2017, employeur : null), active : null, dernier traitement : 24 juin 2019, historisation d&#233;but&#233;e le 1 janv. 2008, nombre de p&#233;riodes sans changement : 3}, activit&#233; au registre des m&#233;tiers : null, date de cr&#233;ation de l&#39;&#233;tablissement : 1978-01-01, &#233;tablissement si&#232;ge : false, d&#233;nomination de l&#39;&#233;tablissement : null, enseigne 1 : null, 2 : null, 3 : null, adresses : {anomalies : [], annul&#233; logiquement : false, distribution sp&#233;ciale : null, num&#233;ro dans la voie : 31, r&#233;p&#233;tition : null, type de voie : QUAI, libell&#233; de voie : DU GRAND PORT, compl&#233;ment d&#39;adresse : null, code postal : 29100, cedex : null - null, commune : 29046 - Douarnenez, commune &#233;trang&#232;re : null, pays : null - null}}
{{484663224, Activit&#233; principale : 46.49Z (NAFRev2), effectif salari&#233; : 02 (2017, employeur : null), active : null, dernier traitement : 5 juil. 2020, historisation d&#233;but&#233;e le 31 d&#233;c. 2019, nombre de p&#233;riodes sans changement : 4}, nombre d&#39;&#233;tablissements : 2, cat&#233;gorie entreprise : PME (2&#160;017), cat&#233;gorie juridique : 5499, n&#176; r&#233;pertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l&#39;&#233;tablissement si&#232;ge : 00018, sigle : null, d&#233;nomination de l&#39;entreprise : {18}, d&#233;nominations usuelles 1 : null, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : null, Nom d&#39;usage : null, pr&#233;nom usuel : null, autres pr&#233;noms : null, pseudonyme : null, sexe : null, purg&#233;e : null, date de cr&#233;ation : 5 oct. 2005}
{{48466322400026, Activit&#233; principale : 33.15Z (NAFRev2), effectif salari&#233; : null (null, employeur : null), active : null, dernier traitement : 10 juil. 2014, historisation d&#233;but&#233;e le 1 janv. 2014, nombre de p&#233;riodes sans changement : 1}, activit&#233; au registre des m&#233;tiers : null, date de cr&#233;ation de l&#39;&#233;tablissement : 2014-01-01, &#233;tablissement si&#232;ge : false, d&#233;nomination de l&#39;&#233;tablissement : null, enseigne 1 : MARINE SERVICE, 2 : null, 3 : null, adresses : {anomalies : [], annul&#233; logiquement : false, distribution sp&#233;ciale : null, num&#233;ro dans la voie : 3, r&#233;p&#233;tition : null, type de voie : IMP, libell&#233; de voie : DE PENN AR CREACH, compl&#233;ment d&#39;adresse : null, code postal : 29100, cedex : null - null, commune : 29046 - Douarnenez, commune &#233;trang&#232;re : null, pays : null - null}}
{{48466322400018, Activit&#233; principale : 33.15Z (NAFRev2), effectif salari&#233; : 02 (2017, employeur : null), active : null, dernier traitement : 5 juil. 2020, historisation d&#233;but&#233;e le 1 janv. 2008, nombre de p&#233;riodes sans changement : 4}, activit&#233; au registre des m&#233;tiers : null, date de cr&#233;ation de l&#39;&#233;tablissement : 2005-10-05, &#233;tablissement si&#232;ge : false, d&#233;nomination de l&#39;&#233;tablissement : null, enseigne 1 : MARINE SERVICE, 2 : null, 3 : null, adresses : {anomalies : [], annul&#233; logiquement : false, distribution sp&#233;ciale : null, num&#233;ro dans la voie : null, r&#233;p&#233;tition : null, type de voie : PL, libell&#233; de voie : VICTOR SALEZ, compl&#233;ment d&#39;adresse : null, code postal : 29100, cedex : null - null, commune : 29046 - Douarnenez, commune &#233;trang&#232;re : null, pays : null - null}}
[...]

EDIT 2 : The collect method

public Entreprises collectEntreprisesEtEtablissements(SparkSession session, int anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, 
   Column conditionSurEntreprises, Column conditionSurEtablissements) throws TechniqueException {
   return collectEntreprisesEtEtablissements(dsEntreprises(session, anneeSIRENE, actifsSeulement, conditionSurEntreprises, Tri.CODE_COMMUNE), 
   this.datasetEtablissement.dsEtablissements(session, anneeCOG, anneeSIRENE, actifsSeulement, communesValides, conditionSurEtablissements));
}

where the dsEnterprises(...) dans dsEtablissements(...) methods converts Dataset&lt;Row&gt; to Dataset&lt;Entreprise&gt; or Dataset&lt;Etablissement&gt;.

/**
  * Obtenir les entreprises li&#233;es &#224; leur &#233;tablissements.
  * @param dsEntreprises Dataset d&#39;entreprises.
  * @param dsEtablissements Dataset d&#39;&#233;tablissements.
  * @return Entreprises aliment&#233;es avec leurs &#233;tablissements.
  */
public Entreprises collectEntreprisesEtEtablissements(Dataset&lt;Entreprise&gt; dsEntreprises, Dataset&lt;Etablissement&gt; dsEtablissements) {
   Dataset&lt;Tuple2&lt;Entreprise, Etablissement&gt;&gt; ds = dsEntreprises.joinWith(dsEtablissements, dsEntreprises.col(&quot;siren&quot;).equalTo(dsEtablissements.col(&quot;siren&quot;)), &quot;inner&quot;);
   Entreprises entreprises = new Entreprises();
      
   List&lt;Tuple2&lt;Entreprise, Etablissement&gt;&gt; tuples = ds.collectAsList();
   Iterator&lt;Tuple2&lt;Entreprise, Etablissement&gt;&gt; itTuples = tuples.iterator();
      
   while(itTuples.hasNext()) {
      Tuple2&lt;Entreprise, Etablissement&gt; tuple = itTuples.next();
      Entreprise entreprise = entreprises.get(tuple._1().getSiren());
      Etablissement etablissement = tuple._2();
         
      if (entreprise == null) {
         entreprise = tuple._1();
         entreprises.add(entreprise);
      }
         
      entreprise.ajouterEtablissement(etablissement);
   }
      
   return entreprises;
}

My question : what is expecting the new Spark version ?

答案1

得分: 1

我在通过另一个针对特定问题的帮助的情况下发现,困扰我的问题的原因是这里没有显示的一行代码,只影响了Entreprise对象的构建:

Dataset<Row> ds = ...
   .withColumn("etablissements", lit(null).cast("map<string,string>"))

并且导致在dataset.as(Encoders.bean(Entreprise.class))处失败:Spark 2.x在转换时未检查值的类型,但从3.x开始开始进行检查,结果发现我声明的该转换的值具有错误的类型。

我的map<string,string>应该是map<string,Etablissement>。但实际上无法直接这样写:

解决方案是:

StructType etablissementType = Encoders.bean(Etablissement.class).schema();

Dataset<Row> ds = ...
   .withColumn("etablissements", lit(null)
      .cast(DataTypes.createMapType(StringType, etablissementType)))
英文:

I found with the help of another question targeting a point specifically that the cause of my trouble is a line not shown here, affecting only the construction of the Entreprise object :

Dataset&lt;Row&gt; ds = ...
   .withColumn(&quot;etablissements&quot;, lit(null).cast(&quot;map&lt;string,string&gt;&quot;))

and causes a failure at dataset.as(Encoders.bean(Entreprise.class)) : Spark 2.x didn't checked the type of the value at cast time, but started to do it in 3.x, and it appeared that my declared value for that cast had a wrong type.

my map&lt;string,string&gt; should be a map&lt;string,Etablissement&gt; instead. But that it cannot exactly be written that way :

The solution is :

StructType etablissementType = Encoders.bean(Etablissement.class).schema();

Dataset&lt;Row&gt; ds = ...
   .withColumn(&quot;etablissements&quot;, lit(null)
      .cast(DataTypes.createMapType(StringType, etablissementType)))

huangapple
  • 本文由 发表于 2020年10月13日 15:52:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/64330916.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定