Speed up loading of multiple CSV files
Question
I am trying to write a parser/API for France's public drug database (https://base-donnees-publique.medicaments.gouv.fr/). It consists of eight CSV files (in fact TSV, as they use tabs), each from a few kB to 4MB, the biggest having ~20000 lines (each line representing a drug with its name, codes, prices, etc).
As these files may change periodically, I'd like to parse them directly instead of creating a cleaner database (as I would probably have to recreate it regularly anyway).
Importing these files took a little time (about one second), so I tried to speed it up a bit and did some benchmarking of several methods, and I was surprised to see that the most basic one seemed to also be the fastest.
Here is my test code (sorry it is quite long). Each file is associated with a dedicated class to parse its lines. Basically, these classes are namedtuples with a custom classmethod to parse dates, numbers, etc.
import pathlib
import enum
import datetime
from decimal import Decimal
from collections import namedtuple
import csv
def parse_date(date: str) -> datetime.date:
return datetime.datetime.strptime(date, "%d/%m/%Y").date()
def parse_date_bis(date: str) -> datetime.date:
return datetime.datetime.strptime(date, "%Y%m%d").date()
def parse_text(text):
if not text:
return ""
return text.replace("<br>", "\n").strip()
def parse_list(raw):
return raw.split(";")
def parse_price(price: str) -> Decimal:
if not price:
return None
# Handles cases like "4,417,08".
price = '.'.join(price.rsplit(",", 1)).replace(',', '')
return Decimal(price)
def parse_percentage(raw: str) -> int:
if not raw:
return None
return int(raw.replace("%", "").strip())
class StatutAdministratifPresentation(enum.Enum):
ACTIVE = "Présentation active"
ABROGEE = "Présentation abrogée"
class EtatCommercialisation(enum.Enum):
DC = "Déclaration de commercialisation"
S = "Déclaration de suspension de commercialisation"
DAC = "Déclaration d'arrêt de commercialisation"
AC = "Arrêt de commercialisation (le médicament n'a plus d'autorisation)"
class MotifAvisSMR(enum.Enum):
INSCRIPTION = "Inscription (CT)"
RENOUVELLEMENT = "Renouvellement d'inscription (CT)"
EXT = "Extension d'indication"
EXTNS = "Extension d'indication non sollicitée"
REEV_SMR = "Réévaluation SMR"
REEV_ASMR = "Réévaluation ASMR"
REEV_SMR_ASMR = "Réévaluation SMR et ASMR"
REEV_ETUDE = "Réévaluation suite à résultats étude post-inscript"
REEV_SAISINE = "Réévaluation suite saisine Ministères (CT)"
NOUV_EXAM = "Nouvel examen suite au dépôt de nouvelles données"
MODIF_COND = "Modification des conditions d'inscription (CT)"
AUTRE = "Autre demande"
class ImportanceSMR(enum.Enum):
IMPORTANT = "Important"
MODERE = "Modéré"
FAIBLE = "Faible"
INSUFFISANT = "Insuffisant"
COMMENTAIRES = "Commentaires"
NP = "Non précisé"
class ImportanceASMR(enum.Enum):
COM = "Commentaires sans chiffrage de l'ASMR"
I = "I"
II = "II"
III = "III"
IV = "IV"
V = "V"
NP = "Non précisée"
SO = "Sans objet"
class Specialite(namedtuple("Specialite", ("cis", "denomation", "forme", "voies_administration", "statut_amm", "type_amm", "commercialisation", "date_amm", "statut_bdm", "numero_autorisation_europeenne", "titulaire", "surveillance_renforcee"))):
@classmethod
def from_line(cls, line):
line[2] = line[2].replace(" ", " ").strip()
line[3] = parse_list(line[3])
line[7] = parse_date(line[7])
line[10] = line[10].strip() # There are often leading spaces here (like ' OPELLA HEALTHCARE FRANCE').
return cls(*line)
class Presentation(namedtuple("Presentation", ("cis", "cip7", "libelle", "statut", "commercialisation", "date_commercialisation", "cip13", "agrement_collectivites", "taux_remboursement", "prix", "prix_hors_honoraires", "montant_honoraires", "indications_remboursement"))):
@classmethod
def from_line(cls, line):
if line[3] == "Présentation active":
line[3] = StatutAdministratifPresentation.ACTIVE
else:
line[3] = StatutAdministratifPresentation.ABROGEE
line[4] = {
"Déclaration de commercialisation": EtatCommercialisation.DC,
"Déclaration de suspension de commercialisation": EtatCommercialisation.S,
"Déclaration d'arrêt de commercialisation": EtatCommercialisation.DAC,
"Arrêt de commercialisation (le médicament n'a plus d'autorisation)": EtatCommercialisation.AC
}.get(line[4])
line[5] = parse_date(line[5])
line[7] = True if line[7] == "oui" else False
line[8] = parse_percentage(line[8])
line[9] = parse_price(line[9])
line[10] = parse_price(line[10])
line[11] = parse_price(line[11])
line[12] = parse_text(line[12])
return cls(*line)
class Composition(namedtuple("Composition", ("cis", "element", "code", "substance", "dosage", "ref_dosage", "nature_composant", "cle"))):
@classmethod
def from_line(cls, line):
line.pop(-1)
return cls(*line)
class AvisSMR(namedtuple("AvisSMR", ("cis", "dossier_has", "motif", "date", "valeur", "libelle"))):
@classmethod
def from_line(cls, line):
line[2] = MotifAvisSMR(line[2])
line[3] = parse_date_bis(line[3])
line[4] = ImportanceSMR(line[4])
line[5] = parse_text(line[5])
return cls(*line)
class AvisASMR(namedtuple("AvisASMR", ("cis", "dossier_has", "motif", "date", "valeur", "libelle"))):
@classmethod
def from_line(cls, line):
line[2] = MotifAvisSMR(line[2])
line[3] = parse_date_bis(line[3])
line[4] = ImportanceASMR(line[4])
line[5] = parse_text(line[5])
return cls(*line)
class AvisCT(namedtuple("AvisCT", ("dossier_has", "lien"))):
@classmethod
def from_line(cls, line):
return cls(*line)
FILE_MATCHES = {
"CIS_bdpm.txt": Specialite,
"CIS_CIP_bdpm.txt": Presentation,
"CIS_COMPO_bdpm.txt": Composition,
"CIS_HAS_ASMR_bdpm.txt": AvisASMR,
"CIS_HAS_SMR_bdpm.txt": AvisSMR,
"HAS_LiensPageCT_bdpm.txt": AvisCT
}
def sequential_import_file_data(filename, cls):
result = {cls: []}
with (pathlib.Path("data") / filename).open("r", encoding="latin1") as f:
rows = csv.reader(f, delimiter="\t")
for line in rows:
data = cls.from_line(line)
result[cls].append(data)
return result
def import_data_sequential():
results = []
for filename, cls in FILE_MATCHES.items():
results.append(sequential_import_file_data(filename, cls))
from multiprocessing.pool import ThreadPool
def import_data_mp_tp(n=2):
pool = ThreadPool(n)
results = []
for filename, cls in FILE_MATCHES.items():
results.append(pool.apply_async(
sequential_import_file_data,
(filename, cls)
))
results = [r.get() for r in results]
from multiprocessing.pool import Pool
def import_data_mp_p(n=2):
pool = Pool(n)
results = []
for filename, cls in FILE_MATCHES.items():
results.append(pool.apply_async(
sequential_import_file_data,
(filename, cls)
))
results = [r.get() for r in results]
import asyncio
import aiofiles
from aiocsv import AsyncReader
async def async_import_file_data(filename, cls):
results = {cls: []}
async with aiofiles.open(
(pathlib.Path("data") / filename),
mode="r",
encoding="latin1"
) as afp:
async for line in AsyncReader(afp, delimiter="\t"):
data = cls.from_line(line)
results[cls].append(data)
return results
def import_data_async():
results = []
for filename, cls in FILE_MATCHES.items():
results.append(asyncio.run(async_import_file_data(filename, cls)))
def main():
import timeit
print(
"Sequential:",
timeit.timeit(lambda: import_data_sequential(), number=10)
)
print(
"Multi ThreadPool:",
timeit.timeit(lambda: import_data_mp_tp(), number=10)
)
print(
"Multi Pool:",
timeit.timeit(lambda: import_data_mp_p(), number=10)
)
print(
"Async:",
timeit.timeit(lambda: import_data_async(), number=10)
)
if __name__ == "__main__":
main()
So when I run it, I get the following result.
Sequential: 9.821639589001279
Multi ThreadPool: 10.137484730999859
Multi Pool: 12.531487682997977
Async: 30.953154197999538
The most basic solution of iterating through all files and through all their lines seems to also be the fastest.
So, did I do anything wrong which would slow the import? Or is it normal/expected to have such time differences?
Edit 2023-08-15: I realized that since all the files I need to parse are TSV (and they don't contain tabs in their values), I could still speed up the parsing by using a simple line.strip('\n').split('\t') instead of the csv module, which saves another 40% in runtime. I'll probably post a Gist when I have a complete API for this database.
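For illustration, here is roughly what that manual-splitting variant of the sequential loader could look like; a sketch, assuming (as stated above) that values never contain tab characters:

```python
def sequential_import_file_data(filename, cls):
    # Same loader as above, but splitting each line by hand instead of
    # going through the csv module. This is only safe because these TSV
    # files never contain tabs inside values.
    result = {cls: []}
    with (pathlib.Path("data") / filename).open("r", encoding="latin1") as f:
        for raw in f:
            line = raw.strip("\n").split("\t")
            result[cls].append(cls.from_line(line))
    return result
```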
Answer 1
Score: 3
As usual: run a profiler on your code to see where it's spending its time. (This is PyCharm's profiler, which wraps the stdlib cProfile.)
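If you don't use PyCharm, a minimal stdlib-only sketch gives much the same information; this assumes it is pasted at the bottom of the question's script so that import_data_sequential is in scope:

```python
import cProfile
import pstats

# Profile one sequential import, save the raw stats, then print the
# 20 entries with the highest cumulative time.
cProfile.run("import_data_sequential()", "import.prof")
pstats.Stats("import.prof").sort_stats("cumulative").print_stats(20)
```

(Running python -m cProfile -s cumulative on the script from the shell works too, though it profiles the whole benchmark rather than a single import.)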
> Sequential: 7.865187874995172
Huh, okay. strptime, I can tell that'd get called by datetime.datetime.strptime. Also, weird, getlocale... why do we need locales there? Clicking through to the call graph shows that strptime actually looks up the current locale, and has a bunch of locks and all that – what if we replace those parse_date functions with our own implementations?
def parse_date(date: str) -> datetime.date:
d, m, y = (int(x) for x in date.split("/", 2))
return datetime.date(2000 + y, m, d)
def parse_date_bis(date: str) -> datetime.datetime:
y = int(date[:4])
m = int(date[4:6])
d = int(date[6:8])
return datetime.datetime(y, m, d)
> Sequential: 3.8978060420195106
Okay, we're cooking! 52% improvement right there!
(It doesn't show up on the screenshot here because I was a silly goose cropping it, but the re stuff that strptime uses under the hood dropped right off too.)
Now let's work with the assumption that there'll be a lot of the same dates and slap @lru_cache(maxsize=None) (RAM flex there, unbounded caches) on those hot parse_date_* functions, run the code and also print out the cache info:
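Applied to the hand-rolled date parsers above, that amounts to something like the following sketch (the cache_info() prints at the end are only there to reproduce the numbers below):

```python
from functools import lru_cache
import datetime

@lru_cache(maxsize=None)  # unbounded: fine, the set of distinct date strings is small
def parse_date(date: str) -> datetime.date:
    d, m, y = (int(x) for x in date.split("/", 2))
    return datetime.date(2000 + y, m, d)

@lru_cache(maxsize=None)
def parse_date_bis(date: str) -> datetime.datetime:
    return datetime.datetime(int(date[:4]), int(date[4:6]), int(date[6:8]))

# After the timing run, inspect how well the caches did:
print(parse_date.cache_info())
print(parse_date_bis.cache_info())
```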
Sequential: 3.2240814580000006
CacheInfo(hits=358989, misses=6991, maxsize=None, currsize=6991)
CacheInfo(hits=221607, misses=513, maxsize=None, currsize=513)
Looks pretty good to me, we got another 15% off the last number.
parse_price could apparently use a cache too, though:
Sequential: 2.928746833000332
CacheInfo(hits=358989, misses=6991, maxsize=None, currsize=6991)
CacheInfo(hits=221607, misses=513, maxsize=None, currsize=513)
CacheInfo(hits=622064, misses=4096, maxsize=None, currsize=4096)
Hey, who knew, there were only 4096 individual price strings in the data.
The remaining parsing functions could use a cache too if you have the memory, but with a little bit of profiling and parsing elbow grease, it's now 2.7x faster [when running everything 10 times, which means those caches will be hot – a single run's speedup isn't as dramatic], with no parallel processing required. Magic!
And just so the playing field is a bit more even, here's a hyperfine benchmark where the Python interpreter is started from scratch for each import (and each interpreter runs the import only once):
$ hyperfine 'python3 so76781391-orig.py' 'python3 so76781391-opt.py' --warmup 5 --min-benchmarking-time 10
Benchmark 1: python3 so76781391-orig.py
Time (mean ± σ): 363.0 ms ± 2.7 ms [User: 340.8 ms, System: 20.7 ms]
Range (min … max): 358.9 ms … 367.9 ms 27 runs
Benchmark 2: python3 so76781391-opt.py
Time (mean ± σ): 234.1 ms ± 2.5 ms [User: 215.6 ms, System: 17.0 ms]
Range (min … max): 228.2 ms … 238.5 ms 42 runs
Summary
'python3 so76781391-opt.py' ran
1.55 ± 0.02 times faster than 'python3 so76781391-orig.py'
So, a 55% concrete speed boost from a quick look at the profiler (and a couple of additional optimizations, such as not creating mapping dicts within the from_line functions, etc.).
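The mapping-dict remark presumably refers to Presentation.from_line, which rebuilds a dict literal on every parsed line; a sketch of hoisting those lookup tables to module level (identifiers reused from the question's code) could look like this:

```python
# Built once at import time instead of once per parsed line.
STATUT_MAP = {
    "Présentation active": StatutAdministratifPresentation.ACTIVE,
    "Présentation abrogée": StatutAdministratifPresentation.ABROGEE,
}
COMMERCIALISATION_MAP = {
    "Déclaration de commercialisation": EtatCommercialisation.DC,
    "Déclaration de suspension de commercialisation": EtatCommercialisation.S,
    "Déclaration d'arrêt de commercialisation": EtatCommercialisation.DAC,
    "Arrêt de commercialisation (le médicament n'a plus d'autorisation)": EtatCommercialisation.AC,
}

# Inside Presentation.from_line, the per-call dict literal then becomes:
#     line[3] = STATUT_MAP[line[3]]
#     line[4] = COMMERCIALISATION_MAP.get(line[4])
```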