英文:
Airflow Sensor incomplete files list
问题
我真的很好奇要知道,当文件没有完全复制/加载到目标数据存储时,空气流传感器是如何工作的。例如:我们有一个文件系统,传感器检查其中的文件。我们正在将一个大文件复制到文件夹中,这确实需要一些时间。空气流传感器会消耗不完整的文件还是会等待文件完全加载?
我真的很期待一个答案,但没有找到类似的信息。
英文:
I'm really curious to know, how airflow sensors work when a file is not completely copied/loaded to a target data storage. As an example: we have a filesystem, and sensor checks files within it. We are copying a large file to folder and it really takes some time. Will airflow sensor consume incomplete file or it will wait for the file to be fully loaded?
I'm really looking for an answer and haven't found anything similar
答案1
得分: 0
你可以在FileSensor的代码中找到答案。
答案是操作系统负责答案。在我下载一个大文件的测试中,例如,"Mac"在下载文件时返回True。
这个想法是,在复制一个大文件时,给它一个临时扩展名,然后在复制完成后将其替换为真实的扩展名。
def poke(self, context: Context):
hook = FSHook(self.fs_conn_id)
basepath = hook.get_path()
full_path = os.path.join(basepath, self.filepath)
self.log.info("Poking for file %s", full_path)
for path in glob(full_path, recursive=self.recursive):
if os.path.isfile(path):
mod_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y%m%d%H%M%S")
self.log.info("Found File %s last modified: %s", str(path), mod_time)
return True
for _, _, files in os.walk(path):
if len(files) > 0:
return True
return False
英文:
you can see inside the code of FileSensor the answer.
The answer is that the operation system is responsible for the answer. in a test I made while downloading a big file "Mac" for example returned True while downloading the file
the idea is while you copy a big file give it a temporary extension and replace it to the real one after copy finished
def poke(self, context: Context):
hook = FSHook(self.fs_conn_id)
basepath = hook.get_path()
full_path = os.path.join(basepath, self.filepath)
self.log.info("Poking for file %s", full_path)
for path in glob(full_path, recursive=self.recursive):
if os.path.isfile(path):
mod_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y%m%d%H%M%S")
self.log.info("Found File %s last modified: %s", str(path), mod_time)
return True
for _, _, files in os.walk(path):
if len(files) > 0:
return True
return False
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论