How can I improve my Python code for classifying intermittent signals in a timeseries?
Question
Classifying intermittent signals in the timeseries - is there a better way to write this in Python?
Problem: a sensor produces a signal that can be intermittent: in one period it is 0.01, in the next it is 0, and in the following period it is 0.01 again. This is normal by design.
The aim of the analysis is to detect the signal despite its intermittent nature, essentially ignoring any gaps that may be present. The analysis is not done in real time, so looking ahead is acceptable. For example, if gaps of at most two periods are to be ignored, the results would be as follows.
signal | detection |
---|---|
0 | FALSE |
0.01 | TRUE |
0.036 | TRUE |
0 | TRUE |
0.2 | TRUE |
0 | FALSE |
0 | FALSE |
0 | FALSE |
0 | FALSE |
0.5 | TRUE |
0 | TRUE |
0 | TRUE |
0.1 | TRUE |
0.0 | FALSE |
0.0 | FALSE |
0.0 | FALSE |
Solution: since programming skills are still at a beginner level, the following function was written. The code after the function contains a demonstration. The deficiency of the function is that it always lengthens the detection by the gap it ignores; it does not look ahead to check whether a signal actually occurs anywhere within the gap.
#%%
from IPython.display import display
import pandas as pd
#%%
def find_continuous(df,threshold,max_gap):
    #df - series containing data
    #threshold - minimum value (inclusive to detect)
    #max_gap - number of time periods after the last value >= threshold to be considered as still containing the signal
    i = 0
    min_value=threshold
    currentlyPriming = False
    primeTimes = []
    PrimeTrue=[]
    Prime2=[]
    distance=0
    distance_to_check=0
    distance_checked=1
    while i < (len(df)):
        print('element equals ',df.iloc[i],', index is ',df.index[i],', current i is ',i)
        if df.iloc[i] < min_value and len(PrimeTrue)>0:
            currentlyPriming=False
            print ('currently priming set to False')
            print('last index element in primeTrue list is ',PrimeTrue[-1])
            if max_gap==0:
                print('max gap is at zero')
            elif max_gap==1 and df.index[i]-PrimeTrue[-1]==1:
                print('max gap is 1 and this element is next after positive')
                primeTimes.append(df.index[i])
            elif max_gap>=2:
                try:
                    distance=(Prime2[-1]-PrimeTrue[-1])
                    distance_to_check=max(max_gap-distance,0)
                    print('last index element in Prime2 list is ',Prime2[-1])
                except:
                    print('Prime2 has not been initiated, first clustering detection')
                    distance=88888
                    distance_to_check=max(max_gap-1,0)
                print('distance is ',distance,' distance to check is ', distance_to_check )
                if distance_to_check>0:
                    primeTimes.append(df.index[i])
                    Prime2.append(df.index[i])
                    distance_checked+=1
                    print('distance checked is ',distance_checked)
                elif distance_to_check==0:
                    distance_checked=1
        elif df.iloc[i] < min_value and len(PrimeTrue)==0:
            currentlyPriming=False
            print('element is less than minimum value and element greater than minimum value was not found yet')
        elif df.iloc[i] >= min_value:
            PrimeTrue.append(df.index[i])
            if currentlyPriming:
                primeTimes.append(df.index[i])
                print('section d, priming is ',currentlyPriming )
            elif not currentlyPriming:
                primeTimes.append(df.index[i])
                currentlyPriming = True
                print('section f, priming is ',currentlyPriming )
        i += 1
    return primeTimes
# %%
if __name__ == "__main__":
    values=[0.05,0,0,0,0,0.037037037,0,0,0,0.035714286,0,0.05,0,0,0,0,0,0,0,0.025677,0,0.05,0,0,0,0.04,0,0.031037037,0,0,0,0,0,0.04,0,0,0,0.074074074,0,0.032258065,0,0,0,0.001,0,0,0,0,0,0,0,0,0,0,0.060606061,0,0,0,0.060606061,0,0,0,0,0,0,0,0,0]
    v1=pd.DataFrame(data=values,index=None,columns=['values'])
    list2=[]
    list2=find_continuous(v1['values'],0.035,2)
    for k in range(len(list2)):
        print(k)
        v1.at[list2[k],'cluster']=list2[k]
    with pd.option_context("display.max_rows", v1.shape[0]):
        display(v1)
Question: is there a better way to write this in Python, and how would a highly skilled Python developer write it?
Thank you!
Answer 1
Score: 2
I think the simplest approach is to form groups of consecutive zero signals, count the size of each group, and then use conditions to determine the detection value. This avoids an explicit loop over the rows and is clearer:
import pandas as pd
import numpy as np
df = pd.DataFrame({'signal': [0, 0.01, 0.036, 0, 0.2, 0, 0, 0, 0, 0.5, 0, 0, 0.1, 0.0, 0.0, 0.0]})
gp = 0
gp_prev = False
def func(x):
    global gp, gp_prev
    if x == 0:
        if gp_prev == False:
            gp += 1
            gp_prev = True
        return gp
    else:
        gp_prev = False
        return 0
# use func to map and number groupings of zero and non-zero groups
df['group'] = df['signal'].map(func)
# collect on the groups, and record their sizes in column gsize
df['gsize'] = df.groupby('group')['signal'].transform('size')
# mark detection True or False according to type and size of groups
# note that group 1 (starting zero signal) is treated as a special case
df['detection'] = np.where(((df['group'] > 0) & (df['gsize'] > 2)) | ((df['group'] == 1) & (df.loc[0, 'signal'] == 0)), False, True)
# clean up by dropping the temporary columns
df_final = df.drop(['group', 'gsize'], axis=1)
print(df)
which gives (df is printed rather than df_final so as to show the temporary workings):
signal group gsize detection
0 0.000 1 1 False
1 0.010 0 5 True
2 0.036 0 5 True
3 0.000 2 1 True
4 0.200 0 5 True
5 0.000 3 4 False
6 0.000 3 4 False
7 0.000 3 4 False
8 0.000 3 4 False
9 0.500 0 5 True
10 0.000 4 2 True
11 0.000 4 2 True
12 0.100 0 5 True
13 0.000 5 3 False
14 0.000 5 3 False
15 0.000 5 3 False
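As a complement, the same run-length idea can be sketched as a reusable function without global state. This is only a minimal sketch under stated assumptions: the function name `detect_with_gaps` and its `threshold` parameter are hypothetical (`max_gap` is borrowed from the question), and it assumes that only gaps bounded by a signal on both sides should be bridged, so leading and trailing zeros are never marked as detected.

```python
import numpy as np
import pandas as pd


def detect_with_gaps(signal: pd.Series, threshold: float, max_gap: int) -> pd.Series:
    """Hypothetical helper: flag samples >= threshold and bridge short gaps."""
    active = signal >= threshold
    # Give every run of consecutive equal flags its own id
    # (the first diff is NaN, which counts as "changed").
    run_id = active.astype(int).diff().ne(0).cumsum()
    # Length of the run that each sample belongs to.
    run_len = run_id.map(run_id.value_counts()).to_numpy()

    a = active.to_numpy()
    # True once a signal has been seen at or before / at or after each position.
    seen_before = np.maximum.accumulate(a)
    seen_after = np.maximum.accumulate(a[::-1])[::-1]

    # Assumption: a gap is bridged only if it is short and has signal on both sides.
    bridged = ~a & (run_len <= max_gap) & seen_before & seen_after
    return pd.Series(a | bridged, index=signal.index, name="detection")


df = pd.DataFrame({"signal": [0, 0.01, 0.036, 0, 0.2, 0, 0, 0, 0,
                              0.5, 0, 0, 0.1, 0.0, 0.0, 0.0]})
df["detection"] = detect_with_gaps(df["signal"], threshold=0.01, max_gap=2)
print(df)
```

With `threshold=0.01` and `max_gap=2` this should reproduce the detection column of the table in the question. Because the whole series is available, the look-ahead comes for free: a gap is bridged only after its full length is known.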