Data stops pushing to bigquery
Question
I'm trying to load data from a PostgreSQL database into BigQuery by fetching it with a cursor, and I'm running into an issue where not all of the data is pushed to BQ: only the first batch (1,000 rows) is inserted, and there are no errors in the log.
When this code runs on my laptop it works fine, but it behaves differently once I move it to GCP Composer.
The data consists of 8 columns:
- 4 columns with integer values ranging from 1 to 20 million (e.g., user_id).
- 2 columns containing strings (e.g., user_name, hash).
- 2 columns containing date values (e.g., created_date, dwh_created_date).
The total number of rows is approximately 100,000.
Below is my code. I've already tried adding a sleep after each fetch, thinking the loads might need processing time or that Google might enforce a gap between API requests. The data frame contains all the expected rows, so I suspect the problem lies elsewhere.
with cursor:
    cursor.execute(sql_query)
    while True:
        rows = cursor.fetchmany(1000)
        if not rows:
            break
        logger.info(f"rows: {len(rows)}")
        column_names = [desc[0] for desc in cursor.description]
        logger.info(f"Column name: {column_names}")
        df = pd.DataFrame(rows, columns=column_names)
        df.reset_index(drop=True, inplace=True)
        if schema_dict is not None and selected_column is not None:
            df = df[selected_column]
            df = convert_pandas_datatype(df, schema_dict)
        client.load_table_from_dataframe(
            df,
            table_id,
            job_config=job_config
        )
        # from time import sleep
        # sleep(5)
        # print("sleeping............")
conn.close()
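For reference, here is a rough sketch of the kind of setup the snippet above assumes (connection, BigQuery client, and job_config). The names, credentials, and table below are placeholders rather than the real configuration, and the job_config is just one plausible way to append each batch:

import logging

import pandas as pd
import psycopg2
from google.cloud import bigquery

logger = logging.getLogger(__name__)

# Source connection and cursor (placeholder credentials and query).
conn = psycopg2.connect(host="...", dbname="...", user="...", password="...")
cursor = conn.cursor()
sql_query = "SELECT * FROM my_schema.my_table"

# Destination table and load configuration (placeholder table id).
client = bigquery.Client()
table_id = "my-project.my_dataset.my_table"
job_config = bigquery.LoadJobConfig(
    # Append each 1000-row batch instead of replacing the table on every load.
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# schema_dict, selected_column and convert_pandas_datatype are helpers defined
# elsewhere in the job and are not reproduced here.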
So, how can I make sure that all of the data gets loaded into BigQuery?
Answer 1
Score: 1
According to the Google Cloud documentation, to wait for a job to complete you need to use the result() function. job.result() waits for the load job to finish, for example: rows = query_job.result(). You can edit your code as follows:
with cursor:
    cursor.execute(sql_query)
    while True:
        rows = cursor.fetchmany(1000)
        if not rows:
            break
        logger.info(f"rows: {len(rows)}")
        column_names = [desc[0] for desc in cursor.description]
        logger.info(f"Column name: {column_names}")
        df = pd.DataFrame(rows, columns=column_names)
        df.reset_index(drop=True, inplace=True)
        if schema_dict is not None and selected_column is not None:
            df = df[selected_column]
            df = convert_pandas_datatype(df, schema_dict)
        job = client.load_table_from_dataframe(
            df,
            table_id,
            job_config=job_config
        )
        job.result()
conn.close()
Don't use the sleep() function unnecessarily, because it simply suspends the execution of the current thread for a given number of seconds and will only create unnecessary problems. For more information, you can check this link.
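As an extra sanity check (a sketch only, assuming the same client, logger, and table_id as above), you can log how many rows each load job wrote and how many rows the destination table holds after the loop:

# Inside the loop: wait for each batch and log how many rows it loaded.
job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result()  # blocks until this load job completes, raises on failure
logger.info(f"Batch loaded {job.output_rows} rows")

# After the loop: confirm the total row count in the destination table.
table = client.get_table(table_id)
logger.info(f"{table.num_rows} rows now in {table_id}")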