Using multiple sets of credentials in to_parquet when transferring to S3 using pandas

Question

I have a pipeline that does some data processing and then transfers parquet files to s3.

I wanted to push this data directly to s3 without saving it locally, so I thought the best way for that would just be to use an s3 URI in the to_parquet call, as follows:

import os

import pandas as pd
from prefect import task  # assumption: @task with retries/retry_delay_seconds matches Prefect's task decorator

@task(name='upload_parquet', retries=2, retry_delay_seconds=2)
def upload_to_s3(df: pd.DataFrame, bucket: str, key: str):
    # strip a trailing slash so the s3:// URI doesn't end up with a double '/'
    bucket = bucket if not bucket.endswith('/') else bucket[:-1]

    # get_s3_credentials is our own helper; it returns the access key and a
    # secret object exposing get_secret_value() for the given bucket
    access_key, secret_key = get_s3_credentials(bucket)
    os.environ['AWS_ACCESS_KEY_ID'] = access_key
    os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key.get_secret_value()

    df.to_parquet(f's3://{bucket}/{key}', engine='pyarrow', compression='snappy')

    del os.environ['AWS_ACCESS_KEY_ID']
    del os.environ['AWS_SECRET_ACCESS_KEY']

This works for the first bucket, but as soon as the second bucket (and second set of credentials) is used, I receive an AccessDenied error.
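
Inside the flow, the task runs once per destination, so the calling pattern looks roughly like this (bucket names and keys are placeholders):

upload_to_s3(df, bucket='bucket-a', key='output/data.parquet')  # succeeds with bucket-a's credentials
upload_to_s3(df, bucket='bucket-b', key='output/data.parquet')  # fails with AccessDenied, even though bucket-b's credentials were set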

My current guess is that boto3 might be caching the credentials at some level.

What's the recommended way of dealing with this situation? I wish to keep it as simple as possible.

Answer 1

Score: 0

In case anyone else stumbles upon this: it didn't seem like there was any way to affect this credentials-caching mechanism in boto. As a result, I used s3fs directly and saved the parquet file to a buffer in memory.

This solution probably doesn't scale very well to very large amounts of data, but it worked for me:

import s3fs
from io import BytesIO

df = ...  # the DataFrame produced earlier in the pipeline

# build the filesystem explicitly with this bucket's credentials,
# rather than relying on environment variables
s3 = s3fs.S3FileSystem(key=access_key, secret=secret_key)

# serialise the parquet file into an in-memory buffer
buffer = BytesIO()
s3_path = f's3://{bucket}/{key}'
df.to_parquet(buffer, engine='pyarrow', compression='snappy')

# upload the buffered bytes to the target object
with s3.open(s3_path, 'wb') as result_file:
    result_file.write(buffer.getvalue())

It would be much better to try and stream the file in chunks, but I'm unsure if the s3fs API allows for that. buffer.getvalue() was good enough for my purposes.
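
One untested option, since s3fs file objects buffer writes and upload them to S3 in multipart chunks, would be to hand the open file handle straight to to_parquet. A minimal sketch, assuming the same df, access_key, secret_key, bucket and key as in the snippet above:

import s3fs

# untested sketch: write the parquet output directly into the s3fs file
# handle, so the whole file never sits in a single in-memory buffer;
# df, access_key, secret_key, bucket and key are the same placeholders as above
s3 = s3fs.S3FileSystem(key=access_key, secret=secret_key)

with s3.open(f's3://{bucket}/{key}', 'wb') as result_file:
    df.to_parquet(result_file, engine='pyarrow', compression='snappy')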

huangapple
  • Posted on 2023-03-15 18:28:35
  • Please keep this link when republishing: https://go.coder-hub.com/75743443.html