Speed up loading of multiple CSV files

Question

I am trying to write a parser/API for the France's public drug database (https://base-donnees-publique.medicaments.gouv.fr/). It consists of eight CSV files (in fact TSV as they use tabs), each from a few kB to 4MB, the biggest having ~20000 lines (each line representing a drug with its name, codes, prices, etc).

As these files may change periodically, I'd like to parse them directly instead of creating a cleaner database (as I would probably have to recreate it regularly anyway).

Importing these files took a little time (about one second), so I tried to speed it up a bit and benchmarked several methods. I was surprised to see that the most basic one also seemed to be the fastest.

Here is my test code (sorry it is quite long). Each file is associated with a dedicated class to parse its lines. Basically, these classes are namedtuples with a custom classmethod to parse dates, numbers, etc.

import pathlib
import enum
import datetime
from decimal import Decimal
from collections import namedtuple
import csv


def parse_date(date: str) -> datetime.datetime:
    return datetime.datetime.strptime(date, "%d/%m/%Y").date()


def parse_date_bis(date: str) -> datetime.datetime:
    return datetime.datetime.strptime(date, "%Y%m%d").date()


def parse_text(text):
    if not text:
        return ""
    return text.replace("<br>", "\n").strip()


def parse_list(raw):
    return raw.split(";")


def parse_price(price: str) -> Decimal:
    if not price:
        return None
    # Handles cases like "4,417,08".
    price = '.'.join(price.rsplit(",", 1)).replace(',', '')
    return Decimal(price)


def parse_percentage(raw: str) -> int:
    if not raw:
        return None
    return int(raw.replace("%", "").strip())


class StatutAdministratifPresentation(enum.Enum):
    ACTIVE = "Présentation active"
    ABROGEE = "Présentation abrogée"


class EtatCommercialisation(enum.Enum):
    DC = "Déclaration de commercialisation"
    S = "Déclaration de suspension de commercialisation"
    DAC = "Déclaration d'arrêt de commercialisation"
    AC = "Arrêt de commercialisation (le médicament n'a plus d'autorisation)"


class MotifAvisSMR(enum.Enum):
    INSCRIPTION = "Inscription (CT)"
    RENOUVELLEMENT = "Renouvellement d'inscription (CT)"
    EXT = "Extension d'indication"
    EXTNS = "Extension d'indication non sollicitée"
    REEV_SMR = "Réévaluation SMR"
    REEV_ASMR = "Réévaluation ASMR"
    REEV_SMR_ASMR = "Réévaluation SMR et ASMR"
    REEV_ETUDE = "Réévaluation suite à résultats étude post-inscript"
    REEV_SAISINE = "Réévaluation suite saisine Ministères (CT)"
    NOUV_EXAM = "Nouvel examen suite au dépôt de nouvelles données"
    MODIF_COND = "Modification des conditions d'inscription (CT)"
    AUTRE = "Autre demande"


class ImportanceSMR(enum.Enum):
    IMPORTANT = "Important"
    MODERE = "Modéré"
    FAIBLE = "Faible"
    INSUFFISANT = "Insuffisant"
    COMMENTAIRES = "Commentaires"
    NP = "Non précisé"


class ImportanceASMR(enum.Enum):
    COM = "Commentaires sans chiffrage de l'ASMR"
    I = "I"
    II = "II"
    III = "III"
    IV = "IV"
    V = "V"
    NP = "Non précisée"
    SO = "Sans objet"


class Specialite(namedtuple("Specialite", ("cis", "denomation", "forme", "voies_administration", "statut_amm", "type_amm", "commercialisation", "date_amm", "statut_bdm", "numero_autorisation_europeenne", "titulaire", "surveillance_renforcee"))):
    @classmethod
    def from_line(cls, line):
        line[2] = line[2].replace("  ", " ").strip()
        line[3] = parse_list(line[3])
        line[7] = parse_date(line[7])
        line[10] = line[10].strip()  # There are often leading spaces here (like ' OPELLA HEALTHCARE FRANCE').
        return cls(*line)


class Presentation(namedtuple("Specialite", ("cis", "cip7", "libelle", "statut", "commercialisation", "date_commercialisation", "cip13", "agrement_collectivites", "taux_remboursement", "prix", "prix_hors_honoraires", "montant_honoraires", "indications_remboursement"))):
    @classmethod
    def from_line(cls, line):
        if line[3] == "Présentation active":
            line[3] = StatutAdministratifPresentation.ACTIVE
        else:
            line[3] = StatutAdministratifPresentation.ABROGEE
        line[4] = {
            "Déclaration de commercialisation": EtatCommercialisation.DC,
            "Déclaration de suspension de commercialisation": EtatCommercialisation.S,
            "Déclaration d'arrêt de commercialisation": EtatCommercialisation.DAC,
            "Arrêt de commercialisation (le médicament n'a plus d'autorisation)": EtatCommercialisation.AC
        }.get(line[4])
        line[5] = parse_date(line[5])
        line[7] = True if line[7] == "oui" else False
        line[8] = parse_percentage(line[8])
        line[9] = parse_price(line[9])
        line[10] = parse_price(line[10])
        line[11] = parse_price(line[11])
        line[12] = parse_text(line[12])
        return cls(*line)


class Composition(namedtuple("Composition", ("cis", "element", "code", "substance", "dosage", "ref_dosage", "nature_composant", "cle"))):
    @classmethod
    def from_line(cls, line):
        line.pop(-1)
        return cls(*line)


class AvisSMR(namedtuple("AvisSMR", ("cis", "dossier_has", "motif", "date", "valeur", "libelle"))):
    @classmethod
    def from_line(cls, line):
        line[2] = MotifAvisSMR(line[2])
        line[3] = parse_date_bis(line[3])
        line[4] = ImportanceSMR(line[4])
        line[5] = parse_text(line[5])
        return cls(*line)


class AvisASMR(namedtuple("AvisASMR", ("cis", "dossier_has", "motif", "date", "valeur", "libelle"))):
    @classmethod
    def from_line(cls, line):
        line[2] = MotifAvisSMR(line[2])
        line[3] = parse_date_bis(line[3])
        line[4] = ImportanceASMR(line[4])
        line[5] = parse_text(line[5])
        return cls(*line)


class AvisCT(namedtuple("AvisCT", ("dossier_has", "lien"))):
    @classmethod
    def from_line(cls, line):
        return cls(*line)


FILE_MATCHES = {
    "CIS_bdpm.txt": Specialite,
    "CIS_CIP_bdpm.txt": Presentation,
    "CIS_COMPO_bdpm.txt": Composition,
    "CIS_HAS_ASMR_bdpm.txt": AvisASMR,
    "CIS_HAS_SMR_bdpm.txt": AvisSMR,
    "HAS_LiensPageCT_bdpm.txt": AvisCT
}


def sequential_import_file_data(filename, cls):
    result = {cls: []}
    with (pathlib.Path("data") / filename).open("r", encoding="latin1") as f:
        rows = csv.reader(f, delimiter="\t")
        for line in rows:
            data = cls.from_line(line)
            result[cls].append(data)
    return result


def import_data_sequential():
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(sequential_import_file_data(filename, cls))


from multiprocessing.pool import ThreadPool

def import_data_mp_tp(n=2):
    pool = ThreadPool(n)
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(pool.apply_async(
            sequential_import_file_data,
            (filename, cls)
        ))
    results = [r.get() for r in results]


from multiprocessing.pool import Pool

def import_data_mp_p(n=2):
    pool = Pool(n)
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(pool.apply_async(
            sequential_import_file_data,
            (filename, cls)
        ))
    results = [r.get() for r in results]


import asyncio
import aiofiles
from aiocsv import AsyncReader

async def async_import_file_data(filename, cls):
    results = {cls: []}
    async with aiofiles.open(
        (pathlib.Path("data") / filename),
        mode="r",
        encoding="latin1"
    ) as afp:
        async for line in AsyncReader(afp, delimiter="\t"):
            data = cls.from_line(line)
            results[cls].append(data)
    return results


def import_data_async():
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(asyncio.run(async_import_file_data(filename, cls)))


def main():
    import timeit
    print(
        "Sequential:",
        timeit.timeit(lambda: import_data_sequential(), number=10)
    )
    print(
        "Multi ThreadPool:",
        timeit.timeit(lambda: import_data_mp_tp(), number=10)
    )
    print(
        "Multi Pool:",
        timeit.timeit(lambda: import_data_mp_p(), number=10)
    )
    print(
        "Async:",
        timeit.timeit(lambda: import_data_async(), number=10)
    )


if __name__ == "__main__":
    main()

So when I run it, I get the following result.

Sequential: 9.821639589001279
Multi ThreadPool: 10.137484730999859
Multi Pool: 12.531487682997977
Async: 30.953154197999538

The most basic solution of iterating through all files and through all their lines seems to also be the fastest.

So, did I do anything wrong which would slow the import? Or is it normal/expected to have such time differences?

Edit 2023-08-15: I realized that since all the files I need to parse are TSV (and they don't contain tabs in their values), I could still speed up the parsing by using a simple line.strip('\n').split('\t') instead of the CSV module, which saves another 40% in runtime. I'll probably post a Gist when I have a complete API for this database.
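
For illustration, a rough sketch of what that tab-splitting reader could look like, mirroring sequential_import_file_data above (the name fast_import_file_data is made up here, and this is not the exact code that will go in the Gist):

import pathlib

def fast_import_file_data(filename, cls):
    # Same shape as sequential_import_file_data, but splits each line by hand
    # instead of going through the csv module; only safe because the values
    # themselves never contain tab characters.
    result = {cls: []}
    with (pathlib.Path("data") / filename).open("r", encoding="latin1") as f:
        for raw in f:
            line = raw.strip("\n").split("\t")
            result[cls].append(cls.from_line(line))
    return result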

Answer 1

Score: 3

As usual: run a profiler on your code to see where it's spending its time. (This is PyCharm's profiler, which wraps the stdlib cProfile.)

> Sequential: 7.865187874995172

[profiler screenshot: strptime and getlocale stand out in the profile]

Huh, okay. strptime, I can tell that'd get called by datetime.datetime.strptime. Also, weird, getlocale... why do we need locales there? Clicking through to the call graph shows that strptime actually looks up the current locale, and has a bunch of locks and all that – what if we replace those parse_dates with our own implementations?

def parse_date(date: str) -> datetime.date:
    d, m, y = (int(x) for x in date.split("/", 2))
    return datetime.date(2000 + y, m, d)


def parse_date_bis(date: str) -> datetime.datetime:
    y = int(date[:4])
    m = int(date[4:6])
    d = int(date[6:8])
    return datetime.datetime(y, m, d)

> Sequential: 3.8978060420195106

Okay, we're cooking! 52% improvement right there!

[profiler screenshot: the updated profile after replacing strptime]

(It doesn't show up on the screenshot here because I was a silly goose cropping it, but the re stuff that strptime uses under the hood dropped right off too.)

Now let's work with the assumption that there'll be a lot of the same date and slap @lru_cache(maxsize=None)s (RAM flex there, unbounded caches) on those hot parse_date_* functions, run the code and also print out the cache info:

Sequential: 3.2240814580000006
CacheInfo(hits=358989, misses=6991, maxsize=None, currsize=6991)
CacheInfo(hits=221607, misses=513, maxsize=None, currsize=513)

Looks pretty good to me, we got another 15% off the last number.
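
To make the caching step concrete, here's a minimal sketch of the decoration and the cache_info printout, using parse_date_bis from above (the run that actually populates the cache is only referenced in a comment):

from functools import lru_cache
import datetime

@lru_cache(maxsize=None)  # unbounded cache; the data only has a few thousand distinct date strings
def parse_date_bis(date: str) -> datetime.datetime:
    y = int(date[:4])
    m = int(date[4:6])
    d = int(date[6:8])
    return datetime.datetime(y, m, d)

# After import_data_sequential() has run once:
print(parse_date_bis.cache_info())
# -> CacheInfo(hits=..., misses=..., maxsize=None, currsize=...)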

parse_price could apparently use a cache too, though:

[profiler screenshot: parse_price showing up in the profile]

Sequential: 2.928746833000332
CacheInfo(hits=358989, misses=6991, maxsize=None, currsize=6991)
CacheInfo(hits=221607, misses=513, maxsize=None, currsize=513)
CacheInfo(hits=622064, misses=4096, maxsize=None, currsize=4096)

Hey, who knew, there were only 4096 individual price strings in the data.

The remaining parsing functions could use a cache too if you have the memory, but with a little bit of profiling and parsing elbow grease, it's now 2.7x faster [when running everything 10 times, which means those caches will be hot – a single run's speedup isn't as dramatic], with no parallel processing required. Magic!

And just so the playing field is a bit more even, here's a hyperfine benchmark where the Python interpreter is started from scratch for each import (and each interpreter runs the import only once):

$ hyperfine 'python3 so76781391-orig.py' 'python3 so76781391-opt.py' --warmup 5 --min-benchmarking-time 10
Benchmark 1: python3 so76781391-orig.py
  Time (mean ± σ):     363.0 ms ±   2.7 ms    [User: 340.8 ms, System: 20.7 ms]
  Range (min … max):   358.9 ms … 367.9 ms    27 runs

Benchmark 2: python3 so76781391-opt.py
  Time (mean ± σ):     234.1 ms ±   2.5 ms    [User: 215.6 ms, System: 17.0 ms]
  Range (min … max):   228.2 ms … 238.5 ms    42 runs

Summary
  'python3 so76781391-opt.py' ran
    1.55 ± 0.02 times faster than 'python3 so76781391-orig.py'

So: a concrete 55% speed boost from a quick look at the profiler (plus a couple of additional optimizations, such as not creating mapping dicts inside the from_line functions).
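
As an illustration of that last point, the lookup dict in Presentation.from_line could be built once at module level, roughly like this (the name ETAT_BY_LABEL is made up for this sketch; the enum values come from the question's code):

# Built once at import time instead of on every Presentation.from_line call.
ETAT_BY_LABEL = {
    "Déclaration de commercialisation": EtatCommercialisation.DC,
    "Déclaration de suspension de commercialisation": EtatCommercialisation.S,
    "Déclaration d'arrêt de commercialisation": EtatCommercialisation.DAC,
    "Arrêt de commercialisation (le médicament n'a plus d'autorisation)": EtatCommercialisation.AC,
}

# ...and inside Presentation.from_line:
#     line[4] = ETAT_BY_LABEL.get(line[4])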
