I know how to do an orderBy("a", "b"…) or a groupBy on a dataset. I need to perform calculations and work on each subset of records, independently


Question


I'm reading an accounting file for cities. My goal is to provide informative subtotals for each account number of each establishment.

Columns named cumulSD3/cumulSC3 through cumulSD7/cumulSC7 are added to the records. They aggregate soldeDebiteur and soldeCrediteur at each parent-account level: for example, account number 13248 is summed into the 13248, 1324 and 132 levels.

+--------------------------+----------+-----------------+---------------------+---------------------+---------+----------+------------+-----------+------------+----------+---------------------+-----------+------------+------------------+-------------------+------------------------+-------------------------+---------------------------+----------------------------+-----------------------------+------------------------------+-------------+--------------+-------------+---------------+--------------------------+--------+--------+-----------------------------------------------------------------------------------------------------+-------------------------+------------+----------------+----------------+----------+----------+----------------+----------+----------+----------------+----------+---------+---------------+-----------+--------------+----------------+--------+---------+
|libelleBudget             |typeBudget|typeEtablissement|sousTypeEtablissement|nomenclatureComptable|siren    |codeRegion|codeActivite|codeSecteur|numeroFINESS|codeBudget|categorieCollectivite|typeBalance|numeroCompte|balanceEntreeDebit|balanceEntreeCredit|operationBudgetaireDebit|operationBudgetaireCredit|operationNonBudgetaireDebit|operationNonBudgetaireCredit|operationOrdreBudgetaireDebit|operationOrdreBudgetaireCredit|soldeDebiteur|soldeCrediteur|anneeExercice|budgetPrincipal|nombreChiffresNumeroCompte|cumulSD7|cumulSC7|libelleCompte                                                                                        |nomenclatureComptablePlan|sirenCommune|populationTotale|numeroCompteSur3|cumulSD3  |cumulSC3  |numeroCompteSur4|cumulSD4  |cumulSC4  |numeroCompteSur5|cumulSD5  |cumulSC5 |codeDepartement|codeCommune|siret         |numeroCompteSur6|cumulSD6|cumulSC6 |
+--------------------------+----------+-----------------+---------------------+---------------------+---------+----------+------------+-----------+------------+----------+---------------------+-----------+------------+------------------+-------------------+------------------------+-------------------------+---------------------------+----------------------------+-----------------------------+------------------------------+-------------+--------------+-------------+---------------+--------------------------+--------+--------+-----------------------------------------------------------------------------------------------------+-------------------------+------------+----------------+----------------+----------+----------+----------------+----------+----------+----------------+----------+---------+---------------+-----------+--------------+----------------+--------+---------+
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1021        |0.0               |349139.71          |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |349139.71     |2019         |true           |4                         |0.0     |0.0     |Dotation                                                                                             |M14                      |210100012   |794             |102             |0.0       |995427.19 |1021            |0.0       |349139.71 |1021            |0.0       |0.0      |01             |01001      |21010001200017|1021            |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |10222       |0.0               |554545.85          |0.0                     |30003.0                  |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |584548.85     |2019         |true           |5                         |0.0     |0.0     |F.C.T.V.A.                                                                                           |M14                      |210100012   |794             |102             |0.0       |995427.19 |1022            |0.0       |646287.48 |10222           |0.0       |584548.85|01             |01001      |21010001200017|10222           |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |10223       |0.0               |4946.0             |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |4946.0        |2019         |true           |5                         |0.0     |0.0     |T.L.E.                                                                                               |M14                      |210100012   |794             |102             |0.0       |995427.19 |1022            |0.0       |646287.48 |10223           |0.0       |4946.0   |01             |01001      |21010001200017|10223           |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |10226       |0.0               |41753.65           |0.0                     |12078.54                 |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |53832.19      |2019         |true           |5                         |0.0     |0.0     |Taxe d’aménagement                                                                                   |M14                      |210100012   |794             |102             |0.0       |995427.19 |1022            |0.0       |646287.48 |10226           |0.0       |53832.19 |01             |01001      |21010001200017|10226           |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |10227       |0.0               |2960.44            |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |2960.44       |2019         |true           |5                         |0.0     |0.0     |Versement pour sous-densité                                                                          |M14                      |210100012   |794             |102             |0.0       |995427.19 |1022            |0.0       |646287.48 |10227           |0.0       |2960.44  |01             |01001      |21010001200017|10227           |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1068        |0.0               |2281475.34         |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |2281475.34    |2019         |true           |4                         |0.0     |0.0     |Excédents de fonctionnement capitalisés                                                              |M14                      |210100012   |794             |106             |0.0       |2281475.34|1068            |0.0       |2281475.34|1068            |0.0       |0.0      |01             |01001      |21010001200017|1068            |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |110         |0.0               |97772.73           |0.0                     |0.0                      |0.0                        |112620.66                   |0.0                          |0.0                           |0.0          |210393.39     |2019         |true           |3                         |0.0     |0.0     |Report à nouveau (solde créditeur)                                                                   |M14                      |210100012   |794             |110             |0.0       |210393.39 |110             |0.0       |0.0       |110             |0.0       |0.0      |01             |01001      |21010001200017|110             |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |12          |0.0               |112620.66          |0.0                     |0.0                      |112620.66                  |0.0                         |0.0                          |0.0                           |0.0          |0.0           |2019         |true           |2                         |0.0     |0.0     |RÉSULTAT DE L'EXERCICE (excédentaire ou déficitaire)                                                 |M14                      |210100012   |794             |12              |0.0       |0.0       |12              |0.0       |0.0       |12              |0.0       |0.0      |01             |01001      |21010001200017|12              |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1321        |0.0               |29097.78           |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |29097.78      |2019         |true           |4                         |0.0     |0.0     |État et établissements nationaux                                                                     |M14                      |210100012   |794             |132             |0.0       |296722.26 |1321            |0.0       |29097.78  |1321            |0.0       |0.0      |01             |01001      |21010001200017|1321            |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1322        |0.0               |201.67             |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |201.67        |2019         |true           |4                         |0.0     |0.0     |Régions                                                                                              |M14                      |210100012   |794             |132             |0.0       |296722.26 |1322            |0.0       |201.67    |1322            |0.0       |0.0      |01             |01001      |21010001200017|1322            |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |1323        |0.0               |163194.37          |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |163194.37     |2019         |true           |4                         |0.0     |0.0     |Départements                                                                                         |M14                      |210100012   |794             |132             |0.0       |296722.26 |1323            |0.0       |163194.37 |1323            |0.0       |0.0      |01             |01001      |21010001200017|1323            |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |13248       |0.0               |1129.37            |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |1129.37       |2019         |true           |5                         |0.0     |0.0     |Autres communes                                                                                      |M14                      |210100012   |794             |132             |0.0       |296722.26 |1324            |0.0       |1129.37   |13248           |0.0       |1129.37  |01             |01001      |21010001200017|13248           |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |13251       |0.0               |47079.11           |0.0                     |2387.05                  |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |49466.16      |2019         |true           |5                         |0.0     |0.0     |GFP de rattachement                                                                                  |M14                      |210100012   |794             |132             |0.0       |296722.26 |1325            |0.0       |49532.16  |13251           |0.0       |49466.16 |01             |01001      |21010001200017|13251           |0.0     |0.0      |
|ABERGEMENT-CLEMENCIAT (L')|1         |101              |00                   |M14                  |210100012|084       |40          |null       |null        |null      |Commune              |DEF        |13258       |0.0               |66.0               |0.0                     |0.0                      |0.0                        |0.0                         |0.0                          |0.0                           |0.0          |66.0          |2019         |true           |5                         |0.0     |0.0     |Autres groupements                                                                                   |M14                      |210100012   |794             |132             |0.0       |296722.26 |1325            |0.0       |49532.16  |13258           |0.0       |66.0     |01             |01001      |21010001200017|13258           |0.0     |0.0      |

To be clearer, here is what my function focuses on, keeping only the main fields involved in the calculations:

+--------------+------------+-------------+--------------+--------+--------+--------+--------+---------+---------+----------+----------+----------+----------+
|         siret|numeroCompte|soldeDebiteur|soldeCrediteur|cumulSD7|cumulSC7|cumulSD6|cumulSC6| cumulSD5| cumulSC5|  cumulSD4|  cumulSC4|  cumulSD3|  cumulSC3|
+--------------+------------+-------------+--------------+--------+--------+--------+--------+---------+---------+----------+----------+----------+----------+
|21010001200017|        1021|          0.0|     349139.71|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0| 349139.71|       0.0| 995427.19|
|21010001200017|       10222|          0.0|     584548.85|     0.0|     0.0|     0.0|     0.0|      0.0|584548.85|       0.0| 646287.48|       0.0| 995427.19|
|21010001200017|       10223|          0.0|        4946.0|     0.0|     0.0|     0.0|     0.0|      0.0|   4946.0|       0.0| 646287.48|       0.0| 995427.19|
|21010001200017|       10226|          0.0|      53832.19|     0.0|     0.0|     0.0|     0.0|      0.0| 53832.19|       0.0| 646287.48|       0.0| 995427.19|
|21010001200017|       10227|          0.0|       2960.44|     0.0|     0.0|     0.0|     0.0|      0.0|  2960.44|       0.0| 646287.48|       0.0| 995427.19|
|21010001200017|        1068|          0.0|    2281475.34|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|2281475.34|       0.0|2281475.34|
|21010001200017|         110|          0.0|     210393.39|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|       0.0|       0.0| 210393.39|
|21010001200017|          12|          0.0|           0.0|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|       0.0|       0.0|       0.0|
|21010001200017|        1321|          0.0|      29097.78|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|  29097.78|       0.0| 296722.26|
|21010001200017|        1322|          0.0|        201.67|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|    201.67|       0.0| 296722.26|
|21010001200017|        1323|          0.0|     163194.37|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0| 163194.37|       0.0| 296722.26|
|21010001200017|       13248|          0.0|       1129.37|     0.0|     0.0|     0.0|     0.0|      0.0|  1129.37|       0.0|   1129.37|       0.0| 296722.26|
|21010001200017|       13251|          0.0|      49466.16|     0.0|     0.0|     0.0|     0.0|      0.0| 49466.16|       0.0|  49532.16|       0.0| 296722.26|
|21010001200017|       13258|          0.0|          66.0|     0.0|     0.0|     0.0|     0.0|      0.0|     66.0|       0.0|  49532.16|       0.0| 296722.26|
|21010001200017|        1328|          0.0|      53566.91|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|  53566.91|       0.0| 296722.26|
|21010001200017|        1341|          0.0|     142734.21|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0| 142734.21|       0.0| 145233.21|
|21010001200017|        1342|          0.0|        2499.0|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|    2499.0|       0.0| 145233.21|
|21010001200017|        1383|          0.0|       2550.01|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0|   2550.01|       0.0|   2550.01|
|21010001200017|        1641|          0.0|     236052.94|     0.0|     0.0|     0.0|     0.0|      0.0|      0.0|       0.0| 236052.94|       0.0| 236052.94|
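
The subtotal columns in the table above can be checked by hand. Here is a minimal plain-Java sketch (no Spark involved) that adds each credit balance to every prefix of three or more digits of its account number. The class `PrefixRollup` and its method `rollUp` are hypothetical names used for illustration only; the figures are the soldeCrediteur values of the 102x accounts listed above.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper, not part of the original code: sums balances under every parent prefix. */
final class PrefixRollup {

    /**
     * Adds the balance of each account to every prefix of 3 or more digits
     * of its account number, the full account number included.
     */
    static Map<String, BigDecimal> rollUp(Map<String, BigDecimal> balances) {
        Map<String, BigDecimal> totals = new TreeMap<>();
        for (Map.Entry<String, BigDecimal> entry : balances.entrySet()) {
            String account = entry.getKey();
            for (int digits = 3; digits <= account.length(); digits++) {
                totals.merge(account.substring(0, digits), entry.getValue(), BigDecimal::add);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // soldeCrediteur of the 102x accounts of siret 21010001200017, copied from the table.
        Map<String, BigDecimal> credits = new LinkedHashMap<>();
        credits.put("1021", new BigDecimal("349139.71"));
        credits.put("10222", new BigDecimal("584548.85"));
        credits.put("10223", new BigDecimal("4946.0"));
        credits.put("10226", new BigDecimal("53832.19"));
        credits.put("10227", new BigDecimal("2960.44"));

        Map<String, BigDecimal> totals = rollUp(credits);
        System.out.println("1022 -> " + totals.get("1022"));
        System.out.println("102  -> " + totals.get("102"));
    }
}
```

The two printed totals, 646287.48 and 995427.19, match the cumulSC4 and cumulSC3 values shown for accounts 10222 to 10227. BigDecimal is used here only to keep the arithmetic exact; the original code works with doubles and rounds afterwards.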

The input is an accounting file sorted by department, city code, account number and siret (our identifier for establishments).
However, for lack of knowledge, I'm doing something heartbreaking:

First attempt, through RDDs: costly, but it works

/**
 * Build a dataset of parent-account subtotals per siret.
 * @param session Spark session.
 * @param comptes Dataset of the accounting records of all sirets.
 * @return Dataset associating each siret with subtotals per 7-, 6-, 5-, 4- and 3-digit account, for debit balances and credit balances.
 */
private Dataset<Row> cumulsComptesParentsParSiret(SparkSession session, Dataset<Row> comptes) {
   JavaPairRDD<String, Iterable<Row>> rddComptesParSiret = comptes.javaRDD().groupBy((Function<Row, String>)compte -> compte.getAs("siret"));
   
   // Compute the subtotals per siret and account, for each parent account.
   JavaRDD<Row> rdd = rddComptesParSiret.flatMap((FlatMapFunction<Tuple2<String, Iterable<Row>>, Row>)comptesSiret -> {
      String siret = comptesSiret._1();
      AccumulateurCompte comptesParentsPourSiret = new AccumulateurCompte(siret);

      for(Row rowCompte : comptesSiret._2()) {
         String numeroCompte = rowCompte.getAs("numeroCompte");
         Double soldeSD = rowCompte.getAs("soldeDebiteur");
         Double soldeSC = rowCompte.getAs("soldeCrediteur");
         
         comptesParentsPourSiret.add(numeroCompte, soldeSD, soldeSC);
      }

      // Build one row per siret and account, carrying the subtotals of its parent accounts.
      List<Row> rowsCumulsPourSiret = new ArrayList<>();
      
      for(Row rowCompte : comptesSiret._2()) {
         String numeroCompte = rowCompte.getAs("numeroCompte");
         double sd[] = new double[6]; 
         double sc[] = new double[6]; 
         
         for(int nombreChiffres = numeroCompte.length(); nombreChiffres >= 3; nombreChiffres--) {
            String compteParent = numeroCompte.substring(0, nombreChiffres);
            Double cumulDebits = comptesParentsPourSiret.getCumulSD(compteParent);
            Double cumulCredits = comptesParentsPourSiret.getCumulSC(compteParent);
            
            sd[nombreChiffres - 3] = cumulDebits != null ? Precision.round(cumulDebits, 2, BigDecimal.ROUND_CEILING) : 0.0;
            sc[nombreChiffres - 3] = cumulCredits != null ? Precision.round(cumulCredits, 2, BigDecimal.ROUND_CEILING) : 0.0;
         }
         
         Row rowCumulsPourCompte = RowFactory.create(siret, numeroCompte, sd[4], sc[4], sd[3], sc[3], sd[2], sc[2], sd[1], sc[1], sd[0], sc[0]);
         rowsCumulsPourSiret.add(rowCumulsPourCompte);
      }
      
      return rowsCumulsPourSiret.iterator();
   });
   
   return session.createDataFrame(rdd, schemaCumulComptesParents());
}

Second attempt, through datasets: efficient, but it doesn't allow low-level handling of accounting records

/**
 * Accumulate sub-accounts.
 * @param comptes Dataset of accounts.
 * @return Dataset with the subtotals for 3-, 4-, 5-, 6- and 7-digit accounts computed, per city.
 */
private Dataset<Row> cumulsSousComptes(Dataset<Row> comptes) {
   Dataset<Row> comptesAvecCumuls = comptes; 
   
   for(int nombreChiffresNiveauCompte = 3; nombreChiffresNiveauCompte < 7; nombreChiffresNiveauCompte ++) {
      comptesAvecCumuls = cumulsCompteParent(comptesAvecCumuls, nombreChiffresNiveauCompte);
   }
   
   return comptesAvecCumuls;
}

/**
 * Subtotal for one parent-account level.
 * @param comptes List of accounts.
 * @param nombreChiffres Number of digits to which the account number is truncated before summing. Example with 4: 2041582 is summed under 2041.
 * @return subtotals per parent account: dataset in the form (sum of debit balances, sum of credit balances).
 */
private Dataset<Row> cumulsCompteParent(Dataset<Row> comptes, int nombreChiffres) {
   // Sum one parent-account level, grouping on the account numbers truncated to nombreChiffres digits.
   Column nombreChiffresCompte = comptes.col("nombreChiffresNumeroCompte");
   
   String aliasNumeroCompte = MessageFormat.format("numeroCompteSur{0}", nombreChiffres);
   RelationalGroupedDataset group = comptes.groupBy(col("codeDepartement"), col("codeCommune"), col("siret"), col("numeroCompte").substr(1,nombreChiffres).as(aliasNumeroCompte));
   
   String nomChampCumulSD = MessageFormat.format("cumulSD{0}", nombreChiffres);
   String nomChampCumulSC = MessageFormat.format("cumulSC{0}", nombreChiffres);
   Column sd = sum(when(nombreChiffresCompte.$greater$eq(lit(nombreChiffres)), col("soldeDebiteur")).otherwise(lit(0.0))).as(nomChampCumulSD);
   Column sc = sum(when(nombreChiffresCompte.$greater$eq(lit(nombreChiffres)), col("soldeCrediteur")).otherwise(lit(0.0))).as(nomChampCumulSC);

   Dataset<Row> cumuls = group.agg(sd, sc);
   
   // Attach to each account the subtotal columns of its parent account, for the level at hand.
   Column jointure =  
      comptes.col("codeDepartement").equalTo(cumuls.col("codeDepartement"))
      .and(comptes.col("codeCommune").equalTo(cumuls.col("codeCommune")))
      .and(comptes.col("siret").equalTo(cumuls.col("siret")))
      .and(comptes.col("numeroCompte").substr(1, nombreChiffres).equalTo(cumuls.col(aliasNumeroCompte)));

   Dataset<Row> comptesAvecCumuls = comptes.join(cumuls, jointure, "left_outer")
      .drop(comptes.col("siret"))
      .drop(comptes.col("codeDepartement"))
      .drop(comptes.col("codeCommune"))
      .drop(comptes.col(nomChampCumulSD))
      .drop(comptes.col(nomChampCumulSC))
      .withColumnRenamed("cumulSD", nomChampCumulSD)
      .withColumnRenamed("cumulSC", nomChampCumulSC)
      .withColumn(nomChampCumulSD, round(col(nomChampCumulSD), 2))
      .withColumn(nomChampCumulSC, round(col(nomChampCumulSC), 2));
   
   return comptesAvecCumuls;
}

By low-level handling, I mean last-minute checks that emit warnings or exclude some values at summation time:

  • if the accounting nomenclature changes for one record among those of an establishment;
  • to warn about a value that looks suspicious given other knowledge I have.
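
As an illustration of the first kind of check, here is a minimal plain-Java sketch of the logic one might run on the rows of a single establishment. It is not Spark code: `CompteLigne` and `GroupChecks` are hypothetical names, and the record type is reduced to the three fields the check needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/** Hypothetical record type, reduced to the fields needed by the check. */
final class CompteLigne {
    final String siret;
    final String numeroCompte;
    final String nomenclatureComptable;

    CompteLigne(String siret, String numeroCompte, String nomenclatureComptable) {
        this.siret = siret;
        this.numeroCompte = numeroCompte;
        this.nomenclatureComptable = nomenclatureComptable;
    }
}

/** Hypothetical per-group checks; the iterator is assumed to hold the rows of ONE establishment. */
final class GroupChecks {

    /** Returns one warning for each row whose nomenclature differs from that of the first row. */
    static List<String> nomenclatureWarnings(Iterator<CompteLigne> group) {
        List<String> warnings = new ArrayList<>();
        boolean first = true;
        String expected = null;
        while (group.hasNext()) {
            CompteLigne ligne = group.next();
            if (first) {
                expected = ligne.nomenclatureComptable;
                first = false;
            } else if (!Objects.equals(expected, ligne.nomenclatureComptable)) {
                warnings.add("siret " + ligne.siret + ", account " + ligne.numeroCompte
                        + ": nomenclature " + ligne.nomenclatureComptable + " instead of " + expected);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Made-up rows: the third one uses a different nomenclature on purpose.
        List<CompteLigne> rows = Arrays.asList(
                new CompteLigne("21010001200017", "1021", "M14"),
                new CompteLigne("21010001200017", "10222", "M14"),
                new CompteLigne("21010001200017", "1068", "M57"));
        nomenclatureWarnings(rows.iterator()).forEach(System.out::println);
    }
}
```

The check reads the iterator once and keeps no row in memory, so the same logic could sit inside whatever per-group callback ends up being used.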

What I would like to have

I need to browse the rows of each group independently, one group after the other.

I would like a Spark function that lets me implement a callback method, where:

  • the first parameter gives the current key values (department code, city code, siret),
  • and the second one gives the rows associated with these keys.

Dataset<Row> eachGroupContent(Row keys, Dataset<Row> groupContent);

Spark would call it successively with these input parameters:

Row (keys) : {Department : 01, City code : 01001, siret : 21010001200017}
Dataset<Row> (values) associated :
+---------------+-----------+--------------+------------+-------------+--------------+--------+
|codeDepartement|codeCommune|         siret|numeroCompte|soldeDebiteur|soldeCrediteur|(others)|
+---------------+-----------+--------------+------------+-------------+--------------+--------+
|             01|      01001|21010001200017|        1021|          0.0|     349139.71|     ...|
|             01|      01001|21010001200017|       10222|          0.0|     584548.85|     ...|
|             01|      01001|21010001200017|       10223|          0.0|        4946.0|     ...|
|             01|      01001|21010001200017|       10226|          0.0|      53832.19|     ...|
Row : {Department : 01, City code : 01001, siret : 21010001200033}
Dataset<Row> :
|             01|      01001|21010001200033|        1021|          0.0|      38863.22|     ...|
|             01|      01001|21010001200033|       10222|          0.0|       62067.0|     ...|
|             01|      01001|21010001200033|       10228|          0.0|        9666.0|     ...|
|             01|      01001|21010001200033|        1068|          0.0|     100121.62|     ...|
Row : {Department : 01, City code : 01001, siret : 21010001200066}
Dataset<Row> :
|             01|      01001|21010001200066|        1641|          0.0|      100000.0|     ...|
|             01|      01001|21010001200066|        3355|    587689.33|           0.0|     ...|
|             01|      01001|21010001200066|        4011|          0.0|           0.0|     ...|
|             01|      01001|21010001200066|       40171|          0.0|       10036.5|     ...|
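
To make the expected contract concrete, here is a minimal in-memory sketch in plain Java, with no Spark involved. It walks a list that is already sorted on the key columns and invokes a callback once per run of identical keys. `SortedGroupWalker` and `forEachGroup` are hypothetical names, and rows are simplified to string arrays; it only illustrates the calling pattern and says nothing about how Spark itself would distribute or order the groups.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical illustration of a "group after group" callback on sorted, in-memory rows. */
final class SortedGroupWalker {

    /**
     * Calls the callback once per group of consecutive rows sharing the same key.
     * The key is made of the first keyLength columns; rows must already be sorted on them.
     */
    static void forEachGroup(List<String[]> sortedRows, int keyLength,
                             BiConsumer<List<String>, List<String[]>> callback) {
        List<String> currentKey = null;
        List<String[]> currentGroup = new ArrayList<>();
        for (String[] row : sortedRows) {
            List<String> key = Arrays.asList(Arrays.copyOf(row, keyLength));
            if (currentKey != null && !key.equals(currentKey)) {
                // The key changed: hand over the finished group and start a new one.
                callback.accept(currentKey, currentGroup);
                currentGroup = new ArrayList<>();
            }
            currentKey = key;
            currentGroup.add(row);
        }
        if (currentKey != null) {
            callback.accept(currentKey, currentGroup); // last group
        }
    }

    public static void main(String[] args) {
        // Columns: codeDepartement, codeCommune, siret, numeroCompte (values taken from the example).
        List<String[]> rows = Arrays.asList(
                new String[] {"01", "01001", "21010001200017", "1021"},
                new String[] {"01", "01001", "21010001200017", "10222"},
                new String[] {"01", "01001", "21010001200033", "1021"},
                new String[] {"01", "01001", "21010001200066", "1641"});
        forEachGroup(rows, 3, (key, group) ->
                System.out.println(key + " has " + group.size() + " row(s)"));
    }
}
```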

This is roughly what my first attempt was able to do,

rddComptesParSiret.flatMap((FlatMapFunction<Tuple2<String, Iterable<Row>>, Row>)comptesSiret

but without providing all the right keys (department and city code were missing, which broke all the sorting done beforehand); besides, RDDs are no longer favored.

I wasn't able to achieve it in Java through the RelationalGroupedDataset methods, which don't seem to offer such a tool.

Currently, I know how to do a groupBy or a sort this way:

accounting.groupBy("department", "cityCode", "accountNumber", "siret").agg(...);

My question

How to browse
each record of
each group
[to perform sub calculations or other work]
group after group

Answer 1

Score: 1

[KeyValueGroupedDataset.mapGroups](https://spark.apache.org/docs/3.0.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#mapGroups-org.apache.spark.api.java.function.MapGroupsFunction-org.apache.spark.sql.Encoder-) will provide you with an iterator over all rows for a given group. When implementing the interface [MapGroupsFunction](https://spark.apache.org/docs/3.0.0/api/java/org/apache/spark/api/java/function/MapGroupsFunction.html) you can access this iterator over the whole group.
```lang-java
Dataset<Row> df = spark.read().option("header", true).option("inferSchema", true).csv(...);
Dataset<Result> resultDf = df
        .groupByKey((MapFunction<Row, Key>) (Row r)
                        -> new Key(r.getInt(r.fieldIndex("codeDepartement")),
                                r.getInt(r.fieldIndex("codeCommune")),
                                r.getLong(r.fieldIndex("siret"))),
                Encoders.bean(Key.class))
        .mapGroups(new MyMapGroupsFunction(), Encoders.bean(Result.class));
resultDf.show();
```

Being in the Java world we have to define bean classes for the datasets.

One for the grouping columns:

public static class Key {
private int codeDepartement;
private int codeCommune;
private long siret;
//constructors, getters and setters
...
}

and one for the result columns:

public static class Result {
private int codeDepartement;
private int codeCommune;
private long siret;
private double result1;
private double result2;
//constructors, getters and setters
...
}

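In this example I use a result structure consisting of the three key columns and two calculated columns result1 and result2. More result columns could be added here.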

The actual logic happens inside of MyMapGroupsFunction:

public static class MyMapGroupsFunction implements MapGroupsFunction<Key, Row, Result> {
    @Override
    public Result call(Key key, Iterator<Row> values) throws Exception {
        // Drain the iterator into a list. The list now
        // contains all rows that belong to one single group.
        List<Row> rows = new ArrayList<>();
        values.forEachRemaining(rows::add);
        // Now any arbitrary logic can be used to calculate the result values
        // based on the contents of the list.
        double result1 = 0;
        double result2 = 0;
        for (Row r : rows) {
            double cumulSC3 = r.getDouble(r.fieldIndex("cumulSC3"));
            double cumulSC4 = r.getDouble(r.fieldIndex("cumulSC4"));
            result1 += cumulSC3 + cumulSC4;
            result2 += cumulSC3 * cumulSC4;
        }
        // Return the result consisting of the elements of the key and the calculated values.
        return new Result(key.getCodeDepartement(), key.getCodeCommune(),
                key.getSiret(), result1, result2);
    }
}

Printing the result we get

+-----------+---------------+--------------------+--------------------+--------------+
|codeCommune|codeDepartement|             result1|             result2|         siret|
+-----------+---------------+--------------------+--------------------+--------------+
|       1001|              1|   692508.8400000001|2.939458891576320...|21010001200019|
|       1001|              1|1.4411536300000003E7|8.198151013048245E12|21010001200017|
|       1001|              1|   692508.8400000001|2.939458891576320...|21010001200018|
+-----------+---------------+--------------------+--------------------+--------------+

If possible, I would suggest switching to Scala. The Dataset API is easier to use from Scala.


<details>
<summary>English:</summary>
[KeyValueGroupedDataset.mapGroups](https://spark.apache.org/docs/3.0.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#mapGroups-org.apache.spark.api.java.function.MapGroupsFunction-org.apache.spark.sql.Encoder-) will provide you with an iterator over all rows for a given group. When implementing the interface [MapGroupsFunction](https://spark.apache.org/docs/3.0.0/api/java/org/apache/spark/api/java/function/MapGroupsFunction.html) you can access this iterator over the whole group.
```lang-java
Dataset<Row> df = spark.read().option("header", true).option("inferSchema", true).csv(...);
Dataset<Result> resultDf = df
        .groupByKey((MapFunction<Row, Key>) (Row r)
                        -> new Key(r.getInt(r.fieldIndex("codeDepartement")),
                                r.getInt(r.fieldIndex("codeCommune")),
                                r.getLong(r.fieldIndex("siret"))),
                Encoders.bean(Key.class))
        .mapGroups(new MyMapGroupsFunction(), Encoders.bean(Result.class));
resultDf.show();
```

Being in the Java world we have to define bean classes for the datasets.

One for the grouping columns:

public static class Key {
private int codeDepartement;
private int codeCommune;
private long siret;
//constructors, getters and setters
...
}

and one for the result columns:

public static class Result {
private int codeDepartement;
private int codeCommune;
private long siret;
private double result1;
private double result2;
//constructors, getters and setters
...
}

In this example I use a result structure consisting of the three key columns and two calculated columns result1 and result2. More result columns could be added here.
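
The two bean classes above leave out their constructors, getters and setters. As a rough sketch only, a complete Key could look like the class below. It assumes that Encoders.bean expects a JavaBean with a no-argument constructor and a getter/setter pair per field. The equals and hashCode methods are an addition for plain-Java use, not something the answer asks for, and in a Spark job the class would be declared public (for instance as a public static nested class, as in the answer).

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical complete version of the Key bean sketched in the answer.
class Key implements Serializable {
    private static final long serialVersionUID = 1L;

    private int codeDepartement;
    private int codeCommune;
    private long siret;

    // No-argument constructor, so the bean can be instantiated reflectively.
    public Key() {
    }

    public Key(int codeDepartement, int codeCommune, long siret) {
        this.codeDepartement = codeDepartement;
        this.codeCommune = codeCommune;
        this.siret = siret;
    }

    public int getCodeDepartement() { return codeDepartement; }
    public void setCodeDepartement(int codeDepartement) { this.codeDepartement = codeDepartement; }

    public int getCodeCommune() { return codeCommune; }
    public void setCodeCommune(int codeCommune) { this.codeCommune = codeCommune; }

    public long getSiret() { return siret; }
    public void setSiret(long siret) { this.siret = siret; }

    // Value equality on the three grouping fields (added for plain-Java use).
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof Key)) return false;
        Key k = (Key) other;
        return codeDepartement == k.codeDepartement
                && codeCommune == k.codeCommune
                && siret == k.siret;
    }

    @Override
    public int hashCode() {
        return Objects.hash(codeDepartement, codeCommune, siret);
    }
}
```

The Result class would follow the same pattern, with result1 and result2 added as double properties.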

The actual logic happens inside of MyMapGroupsFunction:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Row;

public static class MyMapGroupsFunction implements MapGroupsFunction<Key, Row, Result> {
    @Override
    public Result call(Key key, Iterator<Row> values) throws Exception {
        // drain the iterator into a list. The list now
        // contains all rows that belong to one single group
        List<Row> rows = new ArrayList<>();
        values.forEachRemaining(rows::add);
        // now any arbitrary logic can be used to calculate the result values
        // based on the contents of the list
        double result1 = 0;
        double result2 = 0;
        for (Row r : rows) {
            // the local names match the columns that are actually read
            double cumulSC3 = r.getDouble(r.fieldIndex("cumulSC3"));
            double cumulSC4 = r.getDouble(r.fieldIndex("cumulSC4"));
            result1 += cumulSC3 + cumulSC4;
            result2 += cumulSC3 * cumulSC4;
        }
        // return the result consisting of the elements of the key and the calculated values
        return new Result(key.getCodeDepartement(), key.getCodeCommune(),
                key.getSiret(), result1, result2);
    }
}

Printing the result, we get:

+-----------+---------------+--------------------+--------------------+--------------+
|codeCommune|codeDepartement|             result1|             result2|         siret|
+-----------+---------------+--------------------+--------------------+--------------+
|       1001|              1|   692508.8400000001|2.939458891576320...|21010001200019|
|       1001|              1|1.4411536300000003E7|8.198151013048245E12|21010001200017|
|       1001|              1|   692508.8400000001|2.939458891576320...|21010001200018|
+-----------+---------------+--------------------+--------------------+--------------+

If switching to Scala is an option, I would recommend it: the Dataset API is considerably easier to use from Scala.

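Answer 2

Score: 1

First, I would like to thank you, @werner, and ask you to keep your answer as it is: it is the most convenient and useful one for many cases, and it confirms that I needed to ask my question, because I would not have found this answer by myself.

So I created this key: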

/**
 * Clef de l'établissement dans la ville.
 */
static class ClefEtablissement {
   /** Code département. */
   private String codeDepartement;
   
   /** Code commune. */
   private String codeCommune;
   
   /** Numéro siret. */
   private String siret;
   
   /**
    * Construire la clef de l'établissement.
    * @param codeDepartement Code département.
    * @param codeCommune Code commune.
    * @param siret Numéro siret.
    */
   ClefEtablissement(String codeDepartement, String codeCommune, String siret) {
      this.setCodeDepartement(codeDepartement);
      this.setCodeCommune(codeCommune);
      this.setSiret(siret);
   }

   // ... (method definitions follow)
}

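The solution you offer uses `mapGroups(...)` in its most common form.
It therefore takes *n* accounts from *e* establishments and produces a `Dataset<Result>` of *e* rows, one per establishment, because your `Result call(Key key, Iterator<Row> values)` returns a single `Result` each time.

But my problem is a little trickier: I need to get back a `Dataset<Row>` that still has *n* rows, the same rows that were received at the start, but with ten columns added to each one.

Part of the code (listed in full in the English text below):

As you can see, it involves an `ArrayList` and some unions at the end (I haven't run it yet). But you can see the overall problem: it looks clumsy and not very safe.

The other part of the code (also listed in full below):

As for your recommendation to use *Scala*, I don't use it for two reasons:

1. My subject is an in-depth analysis of city and local-authority balance sheets and the like, with many business rules to follow. It cannot be handled by a language that focuses only on Big Data topics.

2. *Scala* can call *Java* functions, but the reverse isn't true. My set of Java applications includes GIS, Angular and a few other services. Today they can use *Spark* whenever they need to: it is associated with *Spring Boot*.

I hope (like *PySpark* and *SparkR* users) that Spark 3.0.0 will keep giving other languages accurate translations of the *Spark* methods defined in the *Scala* API.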

English:

I am adding a separate answer here to illustrate the effect of your solution on my code.

First, I would like to thank you, @werner, and ask you to keep your answer as it is: it is the most convenient and useful one for many cases, and it confirms that I needed to ask my question, because I wouldn't have found this by myself.

So I created the key:

/**
 * Clef de l'établissement dans la ville.
 */
static class ClefEtablissement {
   /** Code département. */
   private String codeDepartement;
   
   /** Code commune. */
   private String codeCommune;
   
   /** Numéro siret. */
   private String siret;
   
   /**
    * Construire la clef de l'établissement.
    * @param codeDepartement Code département.
    * @param codeCommune Code commune.
    * @param siret Numéro siret.
    */
   ClefEtablissement(String codeDepartement, String codeCommune, String siret) {
      this.setCodeDepartement(codeDepartement);
      this.setCodeCommune(codeCommune);
      this.setSiret(siret);
   }

   /**
    * Renvoyer le code du département.
    * @return Code du département.
    */
   public String getCodeDepartement() {
      return this.codeDepartement;
   }

   /**
    * Fixer le code du département.
    * @param codeDepartement Code du département. 
    */
   public void setCodeDepartement(String codeDepartement) {
      this.codeDepartement = codeDepartement;
   }

   /**
    * Renvoyer le code de la commune.
    * @return Code de la commune.
    */
   public String getCodeCommune() {
      return this.codeCommune;
   }

   /**
    * Fixer le code de la commune.
    * @param codeCommune Code de la commune.
    */
   public void setCodeCommune(String codeCommune) {
      this.codeCommune = codeCommune;
   }

   /**
    * Renvoyer le numéro SIRET.
    * @return Siret.
    */
   public String getSiret() {
      return this.siret;
   }

   /**
    * Fixer le numéro SIRET.
    * @param siret SIRET.
    */
   public void setSiret(String siret) {
      this.siret = siret;
   }
}

The solution you offer uses mapGroups(...) in its most common form.
It therefore takes n accounts from e establishments and produces a Dataset<Result> of e rows, one per establishment, because your Result call(Key key, Iterator<Row> values) returns a single Result each time.

But my problem is trickier: I need to get back a Dataset<Row> that still has n rows, the same ones received at the beginning, but with ten columns added to each.

// Imports this fragment relies on (Precision is presumably the Apache Commons Math 3 class):
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.math3.util.Precision;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

/**
 * Cumuler les comptes racines sur chaque ligne.
 */
@SuppressWarnings("rawtypes")
public static class CumulComptesRacinesGroupFunction implements MapGroupsFunction<ClefEtablissement, Row, ArrayList> {
   /** Serial ID. */
   private static final long serialVersionUID = -7519513974536696466L;

   /**
    * Cumuler les comptes racines sur chaque ligne d'un groupe.
    */
   @Override
   public ArrayList call(ClefEtablissement etablissement, Iterator<Row> values) throws Exception {
      List<Row> comptes = new ArrayList<>();
      values.forEachRemaining(comptes::add);
      
      ArrayList<Row> cumulsRow = new ArrayList<>();
      Map<String, Double> cumulsSoldesDebits = new HashMap<>();
      Map<String, Double> cumulsSoldesCredits = new HashMap<>();
      
      // Pour chaque compte, cumuler son solde dans comptes racines à n chiffres (qu'il a), n-1, n-2, n-3 ... 3 chiffres. 
      comptes.forEach(compte -> {
         String numeroCompte = compte.getAs("numeroCompte");

         for(int nombreChiffres = numeroCompte.length(); nombreChiffres >= 3; nombreChiffres--) {
            String compteParent = numeroCompte.substring(0, nombreChiffres);
         
            Double soldeDebit = compte.getAs("soldeDebiteur");
            Double soldeCredit = compte.getAs("soldeCrediteur");
            
            cumulsSoldesDebits.put(compteParent, cumulsSoldesDebits.get(compteParent) != null ? cumulsSoldesDebits.get(compteParent) + soldeDebit : soldeDebit);
            // The credit cumul goes into the credits map (the original wrote it into the debits map).
            cumulsSoldesCredits.put(compteParent, cumulsSoldesCredits.get(compteParent) != null ? cumulsSoldesCredits.get(compteParent) + soldeCredit : soldeCredit);
         }
      });
      
      // Créer des Row(siret, numeroCompte, cumulSoldesDebiteurs à 7 chiffres, cumulSoldeCrediteur à 7 chiffres, ..., , cumulSoldesDebiteurs à 3 chiffres, cumulSoldeCrediteur à 3 chiffres) 
      for(Row compte : comptes) {
         String numeroCompte = compte.getAs("numeroCompte");
         double sd[] = new double[6]; 
         double sc[] = new double[6]; 
         
         for(int nombreChiffres = numeroCompte.length(); nombreChiffres >= 3; nombreChiffres--) {
            String compteParent = numeroCompte.substring(0, nombreChiffres);
            Double cumulDebits = cumulsSoldesDebits.get(compteParent);
            Double cumulCredits = cumulsSoldesCredits.get(compteParent);
            
            sd[nombreChiffres - 3] = cumulDebits != null ? Precision.round(cumulDebits, 2, BigDecimal.ROUND_CEILING) : 0.0;
            sc[nombreChiffres - 3] = cumulCredits != null ? Precision.round(cumulCredits, 2, BigDecimal.ROUND_CEILING) : 0.0;
         }
         
         Row rowCumulsPourCompte = RowFactory.create(etablissement.getSiret(), numeroCompte, sd[4], sc[4], sd[3], sc[3], sd[2], sc[2], sd[1], sc[1], sd[0], sc[0]);         
         cumulsRow.add(rowCumulsPourCompte);
      }

      return cumulsRow;
   }
}

As you can see, it involves an ArrayList and some unions at the end (I haven't run it yet). But you can see the overall problem: it is clumsy and looks... unsafe.

/**
 * Calculer Rassembler les comptes.
 * @param session Session Spark.
 * @param comptes Comptes candidats.
 * @return Liste des comptes complétés sur chaque ligne de leur comptes racines cumulés.
 */
protected Dataset<Row> calculerRacinesDesComptes(SparkSession session, Dataset<Row> comptes) {
   Dataset<ArrayList> comptesParSiret = comptes
      .groupByKey((MapFunction<Row, ClefEtablissement>) (Row r) -> 
         new ClefEtablissement(r.getAs("codeDepartement"), r.getAs("codeCommune"), r.getAs("siret")), Encoders.bean(ClefEtablissement.class))
      .mapGroups(new CumulComptesRacinesGroupFunction(), Encoders.bean(ArrayList.class));
   
   // StringType and DoubleType are assumed to be statically imported from org.apache.spark.sql.types.DataTypes.
   // The cumulated balances are doubles in the rows built by the group function, hence DoubleType.
   StructType schema = new StructType()
      .add("siret", StringType, false)
      .add("numeroCompte", StringType, false)
      .add("soldeDebiteur7chiffres", DoubleType, false)
      .add("soldeCrediteur7chiffres", DoubleType, false)
      .add("soldeDebiteur6chiffres", DoubleType, false)
      .add("soldeCrediteur6chiffres", DoubleType, false)
      .add("soldeDebiteur5chiffres", DoubleType, false)
      .add("soldeCrediteur5chiffres", DoubleType, false)
      .add("soldeDebiteur4chiffres", DoubleType, false)
      .add("soldeCrediteur4chiffres", DoubleType, false)
      .add("soldeDebiteur3chiffres", DoubleType, false)
      .add("soldeCrediteur3chiffres", DoubleType, false);            
         
   List<Dataset<Row>> ensembles = new ArrayList<>();
   
   // Caution: foreach runs on the executors, so this driver-side list is not reliably filled.
   comptesParSiret.foreach((ForeachFunction<ArrayList>) comptesAvecCumulsPourUnSiret -> {
      Dataset<Row> ensembleComptesSiret = session.createDataFrame(comptesAvecCumulsPourUnSiret, schema);
      ensembles.add(ensembleComptesSiret); 
   });
   
   Dataset<Row> union = null;
   
   for(Dataset<Row> ensemble : ensembles) {
      // Start from the first dataset, then union the following ones onto it.
      union = union != null ? union.union(ensemble) : ensemble;
   }
   
   if (union == null) {
      // No group at all: return an empty dataset that still carries the schema.
      union = session.createDataFrame(new ArrayList<Row>(), schema);
   }

   return union;
}
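
Stripped of Spark, the per-establishment work in the two listings above is: add every balance to each root of its account number, then emit one output line per input line. The block below is only a plain-Java sketch of that step, under stated assumptions: Account, sumByRoot and perRowTotals are hypothetical names, roots run from 3 to 7 digits as the cumulSD3 to cumulSD7 column names suggest, and rounding is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark); all names here are hypothetical.
class RootAccountTotals {
    static final int MIN_DIGITS = 3;
    static final int MAX_DIGITS = 7;

    /** Minimal stand-in for one accounting row of a single establishment. */
    static final class Account {
        final String number;
        final double debit;
        final double credit;

        Account(String number, double debit, double credit) {
            this.number = number;
            this.debit = debit;
            this.credit = credit;
        }
    }

    /** Adds each balance to every root (prefix of at least MIN_DIGITS digits) of its account number. */
    static Map<String, double[]> sumByRoot(List<Account> accounts) {
        Map<String, double[]> totals = new HashMap<>();
        for (Account a : accounts) {
            for (int n = a.number.length(); n >= MIN_DIGITS; n--) {
                double[] t = totals.computeIfAbsent(a.number.substring(0, n), k -> new double[2]);
                t[0] += a.debit;
                t[1] += a.credit;
            }
        }
        return totals;
    }

    /**
     * Returns one array per input account, in input order:
     * {debit3, credit3, debit4, credit4, ..., debit7, credit7}.
     * Levels deeper than the account number itself stay at 0.
     */
    static List<double[]> perRowTotals(List<Account> accounts) {
        Map<String, double[]> totals = sumByRoot(accounts);
        List<double[]> rows = new ArrayList<>();
        for (Account a : accounts) {
            double[] columns = new double[2 * (MAX_DIGITS - MIN_DIGITS + 1)];
            int depth = Math.min(a.number.length(), MAX_DIGITS);
            for (int n = MIN_DIGITS; n <= depth; n++) {
                double[] t = totals.get(a.number.substring(0, n));
                columns[2 * (n - MIN_DIGITS)] = t[0];
                columns[2 * (n - MIN_DIGITS) + 1] = t[1];
            }
            rows.add(columns);
        }
        return rows;
    }
}
```

With accounts 13248, 1324 and 1331, the first two both contribute to roots 132 and 1324, and the result keeps three lines of ten values each. Inside Spark, KeyValueGroupedDataset.flatMapGroups takes a FlatMapGroupsFunction whose call returns an Iterator, so a group can yield several rows directly; that may be a closer fit for the "still n rows" requirement than collecting lists and unioning datasets on the driver.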

About your recommendation to use Scala: I don't use it, for two reasons:

  1. My subject is a deep analysis of cities, local authorities, balance accounts... and it has many business rules to follow. It cannot be handled by a language that "only" wants to focus on Big Data topics.

  2. Scala can call Java functions, but the reverse isn't true.
    My set of Java applications includes GIS, Angular and a few other services. Today they can use Spark whenever they wish to: it is associated with Spring Boot.

I hope (like PySpark or SparkR users) that Spark 3.0.0 will continue to give other languages accurate translations of the Spark methods defined in the Scala API.

huangapple
  • Published on September 12, 2020 at 13:45:58
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63857257.html