Using multiple sets of credentials in to_parquet when transferring to s3 using pandas
Question
I have a pipeline that does some data processing and then transfers parquet files to s3.
I wanted to push this data directly to s3 without saving it locally, so I thought the best way for that would just be to use an s3 URI in the to_parquet call, as follows:
@task(name='upload_parquet', retries=2, retry_delay_seconds=2)
def upload_to_s3(df: pd.DataFrame, bucket: str, key: str):
    bucket = bucket if not bucket.endswith('/') else bucket[:-1]
    access_key, secret_key = get_s3_credentials(bucket)
    os.environ['AWS_ACCESS_KEY_ID'] = access_key
    os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key.get_secret_value()
    df.to_parquet(f's3://{bucket}/{key}', engine='pyarrow', compression='snappy')
    del os.environ['AWS_ACCESS_KEY_ID']
    del os.environ['AWS_SECRET_ACCESS_KEY']
This works for the first bucket, but as soon as the second bucket (and second set of credentials) is used, I receive an AccessDenied error.
My current guess is that boto3 might be caching the credentials at some level.
What's the recommended way of dealing with this situation? I wish to keep it as simple as possible.
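For context, get_s3_credentials is not shown in the question; the @task decorator with retries/retry_delay_seconds and the secret_key.get_secret_value() call suggest a Prefect task pulling a pydantic-style SecretStr from some secret store. A minimal, purely hypothetical sketch of the shape that helper is assumed to have:

from pydantic import SecretStr

# Hypothetical stand-in for the asker's helper: look up per-bucket credentials.
# The real implementation is not part of the question.
def get_s3_credentials(bucket: str) -> tuple[str, SecretStr]:
    credentials_by_bucket = {
        'bucket-a': ('ACCESS_KEY_A', SecretStr('secret-key-a')),
        'bucket-b': ('ACCESS_KEY_B', SecretStr('secret-key-b')),
    }
    return credentials_by_bucket[bucket]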
Answer 1
Score: 0
In case anyone else stumbles upon this: it didn't seem like there was any way to affect this credentials-caching mechanism in boto. As a result, I used s3fs directly and saved the parquet file to a buffer in memory.
This solution probably doesn't scale very well to very large amounts of data, but it worked for me:
import s3fs
from io import BytesIO
df = ...
s3 = s3fs.S3FileSystem(key=access_key, secret=secret_key)
buffer = BytesIO()
s3_path = f's3://{bucket}/{key}'
df.to_parquet(buffer, engine='pyarrow', compression='snappy')
with s3.open(s3_path, 'wb') as result_file:
    result_file.write(buffer.getvalue())
It would be much better to try and stream the file in chunks, but I'm unsure if the s3fs API allows for that. buffer.getvalue() was good enough for my purposes.
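A possible way to cut out the extra in-memory copy is to hand the open s3fs file handle straight to to_parquet, since pandas accepts a file-like object as the target. This is a sketch under the same assumptions as the snippet above (df, bucket, key, access_key and secret_key already exist) and has not been tested against the original pipeline:

import s3fs

# Write the parquet output directly into the s3fs file handle instead of
# materialising the whole file in a BytesIO buffer first.
s3 = s3fs.S3FileSystem(key=access_key, secret=secret_key)
with s3.open(f's3://{bucket}/{key}', 'wb') as result_file:
    df.to_parquet(result_file, engine='pyarrow', compression='snappy')

pyarrow still buffers row groups internally, so this is not true chunk-by-chunk streaming, but it avoids holding a second full copy of the file in a BytesIO buffer. On newer pandas versions the per-bucket credentials can also be passed per call via the storage_options argument of to_parquet, which avoids touching environment variables at all.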
Comments