BigQuery script run from Airflow executes in the wrong region even though a different region is explicitly specified

Question

I'm trying to set DEFAULT_TABLE_EXPIRATION_DAYS = 7 on several BigQuery schemas (datasets) using a FOR loop. All my datasets are located in the EU multi-region, and I'm getting the list of datasets from INFORMATION_SCHEMA.

Here's the query I planned to use for this:

FOR record IN (
                  SELECT schema_name
                  FROM `region-eu`.INFORMATION_SCHEMA.SCHEMATA
                  WHERE REGEXP_CONTAINS(schema_name, r'^analytics_\d+$')
              )
    DO
        EXECUTE IMMEDIATE FORMAT('''
            ALTER SCHEMA %s
                SET OPTIONS (
                    DEFAULT_TABLE_EXPIRATION_DAYS = 7
                    );
            ''', record.schema_name);
    END FOR;
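For comparison, the same loop can be sketched on the client side. The idea is to filter the dataset names with the same pattern and build one ALTER SCHEMA statement per match, which could then be submitted as separate jobs. This is a minimal sketch and not the poster's approach. The function name build_alter_statements is hypothetical, and the sketch assumes the dataset names have already been fetched, for example from the SCHEMATA view. It also wraps each name in backticks, which the original FORMAT call does not do. For names like analytics_123, both forms are valid identifiers.

```python
import re
from typing import Iterable, List

# Same filter as REGEXP_CONTAINS(schema_name, r'^analytics_\d+$').
# fullmatch() anchors at both ends; re.ASCII limits \d to [0-9], as in RE2.
DATASET_PATTERN = re.compile(r"analytics_\d+", re.ASCII)


def build_alter_statements(schema_names: Iterable[str], days: int = 7) -> List[str]:
    """Hypothetical helper: one ALTER SCHEMA statement per matching dataset."""
    if days <= 0:
        raise ValueError("days must be a positive integer")
    statements = []
    for name in schema_names:
        if DATASET_PATTERN.fullmatch(name):
            # Backticks quote the dataset name as an identifier.
            statements.append(
                f"ALTER SCHEMA `{name}` "
                f"SET OPTIONS (DEFAULT_TABLE_EXPIRATION_DAYS = {days})"
            )
    return statements
```

For example, build_alter_statements(["analytics_123", "staging"]) yields a single statement for analytics_123.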

The query should run in Airflow (currently 1.10.15, running on GCP Cloud Composer 1) once a week. However, when I run it in Airflow, I receive the following error:

Not found: Table <project_id>:region-eu.INFORMATION_SCHEMA.SCHEMATA was not found in location US at [3:19]

This is strange, since I'm explicitly querying the region-eu INFORMATION_SCHEMA views, but Airflow (or BigQuery) seems to run the query in the US instead. I also tried specifying location='eu' in the Airflow task, but the same error occurs. When I run the query below on its own in Airflow, everything works and the SCHEMATA view is queried successfully:

SELECT schema_name
FROM `region-eu`.INFORMATION_SCHEMA.SCHEMATA
WHERE REGEXP_CONTAINS(schema_name, r'^analytics_\d+$')
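The error reports location US, so one thing to rule out is a mismatch between the region qualifier in the SQL and the location the job is submitted with. BigQuery's documentation writes the multi-regions as EU and US, and the Python client's Client.query() accepts a location= argument. The sketch below is a hypothetical helper, not part of any library. It derives that value from a region-… qualifier so the two stay in sync. Whether this resolves the scripting behaviour described here is untested.

```python
# Multi-regions are written in upper case in the BigQuery docs; single
# regions (e.g. europe-west2) are lower case. The mapping is an assumption.
MULTI_REGIONS = {"eu": "EU", "us": "US"}


def location_from_qualifier(qualifier: str) -> str:
    """Hypothetical helper: 'region-eu' -> 'EU', 'region-europe-west2' -> 'europe-west2'."""
    prefix = "region-"
    q = qualifier.strip("`").lower()  # accept `region-eu` with or without backticks
    if not q.startswith(prefix) or len(q) == len(prefix):
        raise ValueError(f"not a region qualifier: {qualifier!r}")
    region = q[len(prefix):]
    return MULTI_REGIONS.get(region, region)
```

The result could then be passed along with the SQL, e.g. client.query(sql, location=location_from_qualifier("region-eu")) when using the google-cloud-bigquery client directly.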

Stranger still, I have a similar script that also queries region-eu.INFORMATION_SCHEMA.SCHEMATA in a FOR loop, and that script runs without issues:

DECLARE start_date_ft, end_date_ft STRING;
DECLARE start_date, end_date DATE;

SET start_date = CURRENT_DATE() - 1;
SET end_date = CURRENT_DATE();

SET start_date_ft = FORMAT_DATE('%Y%m%d', start_date);
SET end_date_ft = FORMAT_DATE('%Y%m%d', end_date);

FOR record IN (
                  SELECT schema_name, REGEXP_EXTRACT(schema_name, r'\d+') AS ga_property_id
                  FROM `region-eu`.INFORMATION_SCHEMA.SCHEMATA
                  WHERE REGEXP_CONTAINS(schema_name, r'^analytics_\d+$')
              )
    DO
        EXECUTE IMMEDIATE FORMAT('''
            DELETE
            FROM dataset.table
            WHERE TRUE
              AND event_date >= '%s'
              AND ga_property_id = '%s'
            ;
            ''', CAST(start_date AS STRING), record.ga_property_id);
        EXECUTE IMMEDIATE FORMAT('''
            INSERT INTO dataset.table
            SELECT *
            ...
                ''',record.ga_property_id, record.schema_name, record.ga_property_id, start_date_ft, end_date_ft
            );

    END FOR;
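The DECLARE/SET block and the REGEXP_EXTRACT call in this script compute a one-day date window and a property ID per dataset. The minimal Python sketch below reproduces those values, which helps when checking what the script will use on a given day. The function name script_variables is hypothetical. The today argument is passed explicitly because BigQuery's CURRENT_DATE() defaults to UTC, while Python's date.today() uses the machine's local time zone.

```python
import re
from datetime import date, timedelta
from typing import Dict


def script_variables(schema_name: str, today: date) -> Dict[str, str]:
    """Hypothetical helper mirroring the DECLARE/SET block of the script above."""
    start_date = today - timedelta(days=1)  # SET start_date = CURRENT_DATE() - 1
    end_date = today                        # SET end_date = CURRENT_DATE()
    # REGEXP_EXTRACT(schema_name, r'\d+'): first run of digits.
    # BigQuery would return NULL on no match; this sketch raises instead.
    match = re.search(r"\d+", schema_name, re.ASCII)
    if match is None:
        raise ValueError(f"no digits in {schema_name!r}")
    return {
        "start_date": start_date.isoformat(),            # CAST(start_date AS STRING)
        "start_date_ft": start_date.strftime("%Y%m%d"),  # FORMAT_DATE('%Y%m%d', start_date)
        "end_date_ft": end_date.strftime("%Y%m%d"),      # FORMAT_DATE('%Y%m%d', end_date)
        "ga_property_id": match.group(0),
    }
```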

Any ideas why this is happening and how to fix this?

Answer 1

Score: 1

In the end, I worked around the issue by first creating a TEMP TABLE that stores the rows from region-eu.INFORMATION_SCHEMA.SCHEMATA, and then running the FOR loop over this temporary table.

With this change, there was no need to specify a location in the Airflow task. The final script looks like this:

CREATE TEMP TABLE schemas_to_set_expiry AS
SELECT schema_name
FROM `region-eu`.INFORMATION_SCHEMA.SCHEMATA
WHERE REGEXP_CONTAINS(schema_name, r'^analytics_\d+$');

FOR record IN (SELECT schema_name FROM schemas_to_set_expiry)
    DO
        EXECUTE IMMEDIATE FORMAT('''
            ALTER SCHEMA %s
                SET OPTIONS (
                    DEFAULT_TABLE_EXPIRATION_DAYS = 7
                    );
            ''', record.schema_name);
    END FOR;
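Since the DAG runs weekly, the working script can also be rendered from a few parameters in the DAG file and passed as the SQL of the existing BigQuery task. This is a minimal sketch. build_expiry_script and its parameters are hypothetical. The rendered text follows the final script above, except that the dataset name is wrapped in backticks. Only the region, the name pattern, and the expiration days are templated, and they are validated so they cannot break out of the SQL literals.

```python
import re

_REGION = re.compile(r"[a-z0-9-]+")  # e.g. "eu", "us", "europe-west2"


def build_expiry_script(region: str = "eu",
                        pattern: str = r"^analytics_\d+$",
                        days: int = 7) -> str:
    """Hypothetical helper: renders the temp-table workaround as one script."""
    if not _REGION.fullmatch(region):
        raise ValueError(f"unexpected region: {region!r}")
    # The pattern is embedded in a raw string literal r'...'.
    if "'" in pattern or pattern.endswith("\\"):
        raise ValueError(f"pattern cannot be embedded safely: {pattern!r}")
    if not isinstance(days, int) or days <= 0:
        raise ValueError("days must be a positive integer")
    return f"""CREATE TEMP TABLE schemas_to_set_expiry AS
SELECT schema_name
FROM `region-{region}`.INFORMATION_SCHEMA.SCHEMATA
WHERE REGEXP_CONTAINS(schema_name, r'{pattern}');

FOR record IN (SELECT schema_name FROM schemas_to_set_expiry)
DO
  EXECUTE IMMEDIATE FORMAT('''
    ALTER SCHEMA `%s`
      SET OPTIONS (DEFAULT_TABLE_EXPIRATION_DAYS = {days});
  ''', record.schema_name);
END FOR;
"""
```

Keeping the temp table step in the rendered text preserves the part of the workaround that made the script run in the EU.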

huangapple
  • Published on May 22, 2023 at 19:20:17
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76305630.html