Refactoring PySpark to Snowflake Snowpark code

Question

Most of my code is written in PySpark to be executed on Databricks.

I am evaluating Snowflake and its ability to execute Python with Snowpark.

Can someone let me know how I might go about refactoring the following PySpark function to Snowpark?

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

def checkHiveTableExists(tableName,stageName="base"):

  try:
    spark.sql(f"describe {stageName}{tableName}")
    return True
  except Exception as e:
    print(f"An error was thrown, <<{e}>>")
    return False

My attempt is as follows:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
from snowflake.snowpark import session


def checkHiveTableExists(tableName,stageName="base"):

  try:
    spark.sql(f"describe {stageName}{tableName}")
    return True
  except Exception as e:
    print(f"An error was thrown, <<{e}>>")
    return False

def main(session: snowpark.Session):
    return checkHiveTableExists(False)

But it failed.

Any thoughts?

Answer 1

Score: 1

Original code:

def checkHiveTableExists(tableName,stageName="base"):

  try:
    spark.sql(f"describe {stageName}{tableName}")
    return True
  except Exception as e:
    print(f"An error was thrown, <<{e}>>")
    return False

A copy-paste approach will not work in Snowflake:

  • There is no spark module; SQL runs through the Snowpark session instead.
  • The SQL itself is also different: DESC ... becomes DESC TABLE ...

The original code is itself somewhat tricky, because it checks for an object's existence by running a query that describes the object.

Second, building the statement by string interpolation (describe {stageName}{tableName}) makes it prone to SQL injection: somebody could try to call it as checkHiveTableExists("someName'; DROP TABLE ...")
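One possible guard against the injection risk described above is to accept only plain unquoted identifiers before any name reaches the SQL string. The following is a minimal sketch, not part of the original answer: the helper names require_identifier and describe_statement are hypothetical, and it assumes that table and schema names never need quoting (no spaces, no special characters).

```python
import re

# Unquoted Snowflake identifiers start with a letter or underscore and
# continue with letters, digits, underscores, or dollar signs.
_UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def require_identifier(name: str) -> str:
    """Return name unchanged if it is a plain unquoted identifier, else raise."""
    # Hypothetical helper: rejects quotes, semicolons, spaces, and so on.
    if not _UNQUOTED_IDENTIFIER.fullmatch(name):
        raise ValueError(f"Not a plain identifier: {name!r}")
    return name


def describe_statement(table_name: str, schema_name: str = "BASE") -> str:
    """Build the DESC TABLE statement from validated parts only."""
    schema = require_identifier(schema_name)
    table = require_identifier(table_name)
    return f"DESC TABLE {schema}.{table}"
```

With this in place, describe_statement("someName'; DROP TABLE ...") raises ValueError instead of producing a statement, while ordinary names pass through untouched.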


When translating code, the focus should be on behavior.

Personally, I would run a metadata check against INFORMATION_SCHEMA.TABLES and see whether a row exists. The user calling this code needs permission to access it.
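The metadata check could look roughly like the sketch below. It is an illustration under stated assumptions rather than the answer's own code: the function name table_exists is hypothetical, it assumes a Snowpark version whose Session.sql accepts a params argument for bind variables, it assumes names were created unquoted (and are therefore stored in upper case), and INFORMATION_SCHEMA resolves against the session's current database.

```python
def table_exists(session, table_name, schema_name="BASE"):
    """Return True if INFORMATION_SCHEMA.TABLES lists schema_name.table_name.

    session is expected to be a snowflake.snowpark.Session (assumption:
    its sql() method accepts bind values through params).
    """
    query = (
        "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES "
        "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
    )
    # Values are bound, not interpolated, so they cannot alter the statement.
    # Assumption: unquoted identifiers, which Snowflake stores in upper case.
    params = [schema_name.upper(), table_name.upper()]
    rows = session.sql(query, params=params).collect()
    # The single result row holds the count in its first column.
    return rows[0][0] > 0
```

Inside a Python worksheet this could be called as table_exists(session, 'TEST') from main. Unlike the DESC TABLE approach, a missing table yields False without relying on an exception.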

Anyway, trying to stay as close as possible to the original code:

import snowflake.snowpark as snowpark

def checkTableExists(session: snowpark.Session, tableName, schemaName="BASE"):
  try:
    # session.sql() is lazy; collect() actually runs the statement
    session.sql(f"DESC TABLE IDENTIFIER('{schemaName}.{tableName}')").collect()
    return True
  except Exception as e:
    #print(f"An error was thrown, <<{e}>>")
    return False

def main(session: snowpark.Session):
    return checkTableExists(session, 'TEST')

huangapple
  • Posted on 2023-05-13 23:29:40
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76243537.html