Data stops pushing to BigQuery

Question

I'm trying to load data from a PostgreSQL database to BigQuery by fetching batches with a cursor, and I'm running into an issue where not all of the data reaches BQ. Only the first batch (1,000 rows) is inserted, and the log shows no errors.

If this code is executed on my laptop, it works well. However, when I bring it to GCP Composer, it behaves differently.

The data consists of 8 columns:

  • 4 columns with integer values ranging from 1 to 20 million (e.g., user_id).
  • 2 columns containing strings (e.g., user_name, hash).
  • 2 columns containing date values (e.g., created_date, dwh_created_date).

The total number of rows is approximately 100,000; a rough sketch of the corresponding schema and job_config is below.
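
For reference, the job_config and schema I pass in correspond roughly to the following sketch (illustrative only; the real values come from schema_dict, and convert_pandas_datatype is a helper defined elsewhere in my code):

from google.cloud import bigquery

# Illustrative schema for the columns described above (the other three
# integer columns are omitted); the real schema is built from schema_dict.
schema = [
    bigquery.SchemaField("user_id", "INTEGER"),
    bigquery.SchemaField("user_name", "STRING"),
    bigquery.SchemaField("hash", "STRING"),
    bigquery.SchemaField("created_date", "DATE"),
    bigquery.SchemaField("dwh_created_date", "DATE"),
]
# WRITE_APPEND is an assumption here: each batch should append to the table.
job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)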

Below is my code. I've already tried adding a sleep after each fetch, thinking the load might need processing time or that Google might rate-limit API requests. The data frame contains the expected data on each iteration, so I suspect the problem lies elsewhere.

with cursor:
    cursor.execute(sql_query)
    while True:
        rows = cursor.fetchmany(1000)
        if not rows:
            break
        logger.info(f"rows: {len(rows)}")
        column_names = [desc[0] for desc in cursor.description]
        logger.info(f"Column name: {column_names}")
        df = pd.DataFrame(rows, columns=column_names)
        df.reset_index(drop=True, inplace=True)
        if schema_dict is not None and selected_column is not None:
            df = df[selected_column]
            df = convert_pandas_datatype(df, schema_dict)
        client.load_table_from_dataframe(
            df,
            table_id,
            job_config=job_config
        )
conn.close()

So, how can I make sure all of the data actually gets loaded into BigQuery?


Answer 1

Score: 1

According to the Google Cloud documentation, to wait for a job to complete you need to call the result() function. job.result() blocks until the job has finished (for example: rows = query_job.result()). You can edit your code as below:

with cursor:
    cursor.execute(sql_query)
    while True:
        rows = cursor.fetchmany(1000)
        if not rows:
            break
        logger.info(f"rows :{len(rows)}")
        column_names = [desc[0] for desc in cursor.description]
        logger.info(f"Column name: {column_names}")
        df = pd.DataFrame(rows, columns=column_names)
        df.reset_index(drop=True, inplace=True)
        if schema_dict is not None and selected_column is not None:
            df = df[selected_column]
            df = convert_pandas_datatype(df, schema_dict)
        job = client.load_table_from_dataframe(
            df,
            table_id,
            job_config=job_config
        )
        # Wait for this batch's load job to finish before fetching the next one
        job.result()

conn.close()

Avoid calling the sleep() function unnecessarily: it only suspends the current thread for the given number of seconds and creates needless problems here. For more information, you can follow this link.
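
If rows still seem to go missing after adding result(), a quick sanity check (a sketch, assuming the same client, table_id and logger objects as in the code above) is to log each load job's output and then the destination table's total row count:

        # inside the loop, right after job.result(): rows loaded by this batch
        logger.info(f"Batch loaded {job.output_rows} rows, errors: {job.errors}")

# after the loop: total rows now present in the destination table
table = client.get_table(table_id)
logger.info(f"{table_id} now contains {table.num_rows} rows")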
