Airflow Sensor incomplete files list

Question

I'm really curious how Airflow sensors behave when a file has not been completely copied/loaded to the target data storage. For example: we have a filesystem, and a sensor checks for files in it. We are copying a large file into the folder, which takes some time. Will the Airflow sensor pick up the incomplete file, or will it wait until the file is fully loaded?

I've been looking for an answer but haven't found anything similar.

Answer 1

Score: 0

You can find the answer in the source code of FileSensor (its poke method is quoted below).

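In short, FileSensor does not check whether a file is complete. It only checks that a matching path exists (via glob and os.path.isfile), so whether a partially copied file is picked up depends on what the operating system exposes while the copy is in progress. In a test I ran on a Mac, the sensor returned True while a large file was still downloading.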
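To reproduce that observation without Airflow, here is a minimal sketch, assuming a local filesystem: it keeps a file open for writing, writes only part of the data, and applies the same os.path.isfile test that FileSensor.poke uses. The helper partial_write_is_visible and the file name big_file.bin are hypothetical, chosen for illustration.

```python
import os
import tempfile


def partial_write_is_visible(chunk: bytes = b"x" * 1024) -> tuple[bool, int]:
    """Return (isfile, size) observed while a write is still in progress.

    Hypothetical helper: the writer keeps the file open, as a slow copy would,
    and the file is checked with the same existence test FileSensor uses.
    """
    with tempfile.TemporaryDirectory() as folder:
        path = os.path.join(folder, "big_file.bin")  # hypothetical name
        with open(path, "wb") as writer:
            writer.write(chunk)  # only the first chunk of a "large" file
            writer.flush()  # push it from Python's buffer to the OS
            # The copy is not finished (the file is still open), yet the
            # existence check already succeeds and reports a partial size.
            return os.path.isfile(path), os.path.getsize(path)


if __name__ == "__main__":
    visible, size = partial_write_is_visible()
    print(f"isfile={visible}, size so far={size} bytes")
```

Nothing in this check distinguishes a partial file from a complete one, which is why a sensor built on it can fire before the copy has finished.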

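A common workaround: while copying a large file, give it a temporary extension, and rename it to its real name once the copy has finished. As long as the sensor's filepath pattern matches only the final name, it will not fire on the partial file.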
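A minimal sketch of that convention in Python, assuming the temporary file lives in the same directory (and therefore the same filesystem) as the final one, which is what lets os.replace perform the rename atomically on POSIX systems. The copy_then_rename helper and the .part suffix are hypothetical names; any suffix works as long as the sensor's filepath pattern does not match it.

```python
import glob
import os
import shutil
import tempfile


def copy_then_rename(src: str, dest: str, suffix: str = ".part") -> str:
    """Copy src to dest through a temporary name, then rename it into place.

    Hypothetical helper: while shutil.copyfile is running only dest + suffix
    exists, so a sensor watching for dest never sees a half-written file.
    """
    tmp_path = dest + suffix  # same directory as dest, hence same filesystem
    shutil.copyfile(src, tmp_path)
    os.replace(tmp_path, dest)  # atomic rename on POSIX within one filesystem
    return dest


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as folder:
        src = os.path.join(folder, "source.dat")
        with open(src, "wb") as f:
            f.write(b"id,value\n1,42\n")

        incoming = os.path.join(folder, "incoming")
        os.mkdir(incoming)
        pattern = os.path.join(incoming, "*.csv")  # what the sensor would poll for

        # Simulate the in-flight state: only the temporary name exists yet.
        open(os.path.join(incoming, "data.csv.part"), "wb").close()
        print("during copy:", glob.glob(pattern))  # → during copy: []

        copy_then_rename(src, os.path.join(incoming, "data.csv"))
        print("after rename:", [os.path.basename(p) for p in glob.glob(pattern)])
        # → after rename: ['data.csv']
```

Keeping the temporary file next to the destination matters: os.replace across filesystems raises an error instead of silently falling back to a non-atomic copy.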

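```python
import datetime
import os
from glob import glob

from airflow.hooks.filesystem import FSHook  # Airflow 2.x location
from airflow.utils.context import Context


# FileSensor.poke from airflow/sensors/filesystem.py (Airflow 2.x), shown on its own
def poke(self, context: Context):
    hook = FSHook(self.fs_conn_id)
    basepath = hook.get_path()
    full_path = os.path.join(basepath, self.filepath)
    self.log.info("Poking for file %s", full_path)

    for path in glob(full_path, recursive=self.recursive):
        # Only existence is checked: a file that is still being written
        # already exists, so this branch succeeds for a partial copy too.
        if os.path.isfile(path):
            mod_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y%m%d%H%M%S")
            self.log.info("Found File %s last modified: %s", str(path), mod_time)
            return True

        # For a directory match, any file inside it (complete or not) is enough.
        for _, _, files in os.walk(path):
            if len(files) > 0:
                return True
    return False
```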

huangapple
  • Posted on June 15, 2023 at 05:22:36
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76477629.html