Received an invalid column length from the bcp client in Spark job

Question

I am playing around with Spark and wanted to store a DataFrame in a SQL Server database. It works, but not when saving a datetime column:

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import IntegerType, TimestampType, StructType, StructField, StringType
from datetime import datetime

...

spark = SparkSession.builder \
    ...
    .getOrCreate()
    
# Create DataFrame 

rdd = spark.sparkContext.parallelize([
    Row(id=1, title='string1', created_at=datetime.now())
])

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("title", StringType(), False),
    StructField("created_at", TimestampType(), True)
])

df = spark.createDataFrame(rdd, schema)

df.show()

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("truncate", True) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Schema:

[Image: screenshot of the table schema, not preserved]

Error:

com.microsoft.sqlserver.jdbc.SQLServerException: Received an invalid column length from the bcp client for colid 3

From my understanding, the error states that datetime.now() has an invalid length. But how can that be if it is a standard datetime? Any ideas what the issue is?

Answer 1

Score: 2

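The SQL Server datetime data type has a time range of 00:00:00 through 23:59:59.997, with fractional seconds rounded to increments of .000, .003, or .007. The microsecond-precision output of datetime.now() will not fit into datetime, so you need to change the column's data type on the SQL Server table to datetime2.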
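
To make the precision gap concrete, here is a minimal pure-Python sketch (no Spark or SQL Server needed). It assumes the documented SQL Server behaviour that datetime values are rounded to increments of .000, .003, or .007 seconds, i.e. ticks of 1/300 second. The helper name round_to_sql_datetime is hypothetical and only approximates that rounding; it is not what the connector does internally.

```python
from datetime import datetime, timedelta


def round_to_sql_datetime(value: datetime) -> datetime:
    """Approximate SQL Server datetime rounding (1/300-second ticks).

    Illustration only: this is not how the connector converts values.
    """
    ticks = round(value.microsecond * 300 / 1_000_000)  # 0..300 ticks
    whole_second = value.replace(microsecond=0)
    return whole_second + timedelta(microseconds=round(ticks * 1_000_000 / 300))


# A value with microsecond precision, like the output of datetime.now()
sample = datetime(2023, 2, 14, 22, 37, 14, 123456)
rounded = round_to_sql_datetime(sample)
print(sample.isoformat())   # six fractional digits
print(rounded.isoformat())  # snapped to the nearest 1/300 s
```

The rounded value differs from the original, which is the information a datetime column would lose. datetime2 keeps up to seven fractional digits, so it can hold the Spark timestamp unchanged.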

Answer 2

Score: 2

There are problems with the code to create the dataframe. You are missing libraries. The code below creates the dataframe correctly.

#
#  1 - Make test dataframe
#

# libraries
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# create rdd
rdd = spark.sparkContext.parallelize([Row(id=1, title='string1', created_at=datetime.now())])

# define structure
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("title", StringType(), False),
    StructField("created_at", TimestampType(), True)
])

# create df
df = spark.createDataFrame(rdd, schema)

# show df
display(df)

[Image: output of display(df), not preserved]

The output is shown above. We need to create a table that matches the DataFrame's nullability and data types.

The code below creates a table called stack_overflow.

-- drop table
drop table if exists stack_overflow
go

-- create table
create table stack_overflow
(
  id int not null,
  title varchar(100) not null,
  created_at datetime2 null
)
go

-- show data
select * from stack_overflow
go
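
As a rough illustration of how the table definition follows from the DataFrame schema, the sketch below builds the same create table statement from a list of column descriptions. It is plain Python and does not touch Spark; the mapping SPARK_TO_SQL_SERVER and the helper build_create_table are hypothetical names, and the varchar(100) length is simply copied from the table above.

```python
# Hypothetical mapping from Spark type names to SQL Server column types.
SPARK_TO_SQL_SERVER = {
    "IntegerType": "int",
    "StringType": "varchar(100)",  # length copied from the table above
    "TimestampType": "datetime2",  # datetime2 rather than datetime
}


def build_create_table(table_name, columns):
    """Build a create table statement from (name, spark_type, nullable) tuples."""
    lines = []
    for name, spark_type, nullable in columns:
        null_clause = "null" if nullable else "not null"
        lines.append(f"  {name} {SPARK_TO_SQL_SERVER[spark_type]} {null_clause}")
    return f"create table {table_name}\n(\n" + ",\n".join(lines) + "\n)"


# Same columns, types, and nullability as the DataFrame schema
columns = [
    ("id", "IntegerType", False),
    ("title", "StringType", False),
    ("created_at", "TimestampType", True),
]
ddl = build_create_table("stack_overflow", columns)
print(ddl)
```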

Next, we need to define our connection properties.

#
#  2 - Set connection properties
#

server_name = "jdbc:sqlserver://svr4tips2030.database.windows.net"
database_name = "dbs4tips2020"
url = server_name + ";" + "databaseName=" + database_name + ";"
user_name = "jminer"
password = "<your password here>"
table_name = "stack_overflow"
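
If you would rather not hard-code the password, one option is to read it from the environment. The sketch below is only a suggestion: build_jdbc_url is a hypothetical helper that assembles the URL in the same shape as above, and SQL_PASSWORD is an assumed environment variable name, not something the connector requires.

```python
import os


def build_jdbc_url(host: str, database: str) -> str:
    """Assemble a SQL Server JDBC URL in the same shape as the snippet above."""
    return f"jdbc:sqlserver://{host};databaseName={database};"


url = build_jdbc_url("svr4tips2030.database.windows.net", "dbs4tips2020")

# SQL_PASSWORD is a hypothetical variable name; fall back to a placeholder if unset.
password = os.environ.get("SQL_PASSWORD", "<your password here>")
print(url)
```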

Last, we want to execute the code to write the dataframe.

#
#  3 - Write test dataframe
#

try:
    df.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("overwrite") \
        .option("truncate", True) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", user_name) \
        .option("password", password) \
        .save()
except ValueError as error:
    print("Connector write failed", error)
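
One caveat about the except clause: as far as I know, failures raised on the JVM side (such as the SQLServerException in the question) usually reach PySpark as a Py4JJavaError rather than a ValueError, so a ValueError handler may never fire. A minimal sketch of a broader handler follows. The names run_write and failing_write are hypothetical, and the stub only stands in for the df.write call so the example runs without Spark.

```python
def run_write(write_fn):
    """Call write_fn and report any failure instead of letting it propagate.

    Returns True on success and False on failure.
    """
    try:
        write_fn()
        return True
    except Exception as error:  # broad on purpose: JVM errors are not ValueError
        print("Connector write failed:", type(error).__name__, error)
        return False


def failing_write():
    # Stand-in for the df.write call; raises to simulate a connector failure.
    raise RuntimeError(
        "Received an invalid column length from the bcp client for colid 3"
    )


succeeded = run_write(failing_write)
```

In a real job you would pass a small function that performs the write, and you may prefer to re-raise after logging so the job still fails visibly.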

Executing a select query shows that the data was written correctly.

[Image: query result showing the inserted row, not preserved]

In short, look at the documentation for Spark SQL Types. I found out that datetime2 works nicely.

https://learn.microsoft.com/en-us/dotnet/api/microsoft.spark.sql.types?view=spark-dotnet

One word of caution: this code does not handle date-time offsets. Also, Spark has no data type that maps to a date-time offset.

# Sample date time offset value
import pytz
from datetime import datetime, timezone, timedelta
user_timezone_setting = 'US/Pacific'
user_timezone = pytz.timezone(user_timezone_setting)
the_event = datetime.now()
localized_event = user_timezone.localize(the_event)
print(localized_event)

The code above creates a variable with the following data.

[Image: printed value of localized_event, not preserved]

But once we load it into a DataFrame, it loses the offset because it is converted to UTC. If the UTC offset is important, you will have to pass that information as a separate integer.
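
A minimal sketch of carrying the offset separately, using only the standard library: the helper split_offset is a hypothetical name, and the fixed UTC-08:00 zone is an assumption chosen to keep the example deterministic (it does not apply the daylight saving rules that pytz uses for US/Pacific).

```python
from datetime import datetime, timedelta, timezone


def split_offset(value: datetime):
    """Return (UTC datetime, offset in minutes) for a timezone-aware datetime."""
    offset = value.utcoffset()
    if offset is None:
        raise ValueError("expected a timezone-aware datetime")
    offset_minutes = int(offset.total_seconds() // 60)
    return value.astimezone(timezone.utc), offset_minutes


# Fixed UTC-08:00 offset used for illustration only
pacific_standard = timezone(timedelta(hours=-8))
event = datetime(2023, 2, 14, 14, 37, 14, tzinfo=pacific_standard)

utc_event, offset_minutes = split_offset(event)
print(utc_event, offset_minutes)
```

The UTC value can go into the timestamp column and the minutes into an integer column, so the original local time can be rebuilt later.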

huangapple
  • Published on 2023-02-14 22:37:14
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75449370.html