For Loop替代方案,动态筛选和总结数百万个点

huangapple go评论77阅读模式
英文:

For Loop alternative to dynamically filter and summarise millions of points

问题

我有一个包含500万条记录的数据集,格式如下:

library(dplyr)

Data <- tibble(X = runif(5000000, min = 200000, max = 400000),
                Y = runif(5000000, min = 400000, max = 500000),
                UID = 1:5000000,
                Count = runif(5000000, min = 1, max = 2000))

对于每个UID,我需要总结以下信息:1)其他点的数量,2)其他点的计数总和,3)在200米、400米、800米和1600米范围内的UID的密度计算。所需的最终输出是一个表格,包含每个UID及其摘要输出。

以下的for循环演示了所需的输出,但需要花费几周的时间来完成:

Summary <- NULL

for (n in 1:nrow(Data)) {
    Data_Row <- Data[n,]

    Row_200 <- Data %>%
        filter(between(X, Data_Row$X - 200, Data_Row$X + 200) &
               between(Y, Data_Row$Y - 200, Data_Row$Y + 200)) %>%
        summarise(Instances_200 = n(), Count_200 = sum(Count), Calculation_200 = Count_200 / (Instances_200 * 0.000625))

    Row_400 <- Data %>%
        filter(between(X, Data_Row$X - 400, Data_Row$X + 400) &
               between(Y, Data_Row$Y - 400, Data_Row$Y + 400)) %>%
        summarise(Instances_400 = n(), Count_400 = sum(Count), Calculation_400 = Count_400 / (Instances_400 * 0.000625))

    Row_800 <- Data %>%
        filter(between(X, Data_Row$X - 800, Data_Row$X + 800) &
               between(Y, Data_Row$Y - 800, Data_Row$Y + 800)) %>%
        summarise(Instances_800 = n(), Count_800 = sum(Count), Calculation_800 = Count_800 / (Instances_800 * 0.000625))

    Row_1600 <- Data %>%
        filter(between(X, Data_Row$X - 1600, Data_Row$X + 1600) &
               between(Y, Data_Row$Y - 1600, Data_Row$Y + 1600)) %>%
        summarise(Instances_1600 = n(), Count_1600 = sum(Count), Calculation_1600 = Count_1600 / (Instances_1600 * 0.000625))

    Summary_Row <- bind_cols(Data[n,"UID"], Row_200, Row_400, Row_800, Row_1600)

    Summary <- bind_rows(Summary, Summary_Row)
}

我知道像lapplymap这样的函数,但我不知道如何使用它们来更快地进行这种动态过滤和汇总。

英文:

I have a dataset with 5 million records in this format:

library(dplyr)
    
Data &lt;- tibble(X=runif(5000000, min=200000, max=400000),
                Y=runif(5000000, min=400000, max=500000),
    			UID = 1:5000000,
    			Count = runif(5000000, min=1, max=2000))

For each UID, I need to summarise 1) the number of other points 2) the count sum of the other points and 3) a density calculation of UIDs that are within 200m, 400m, 800m and 1600m. The final output required is a table with each UID and its summary output.

This for loop illustrates the output required, but would take weeks to complete:

Summary &lt;- NULL

for (n in 1:nrow(Data)){
Data_Row &lt;- Data[n,]

Row_200 &lt;- Data %&gt;% filter(between(X, Data_Row$X-200,Data_Row$X+200) &amp;  between(Y, Data_Row$Y-200, Data_Row$Y+200)) %&gt;%
	summarise(Instances_200 = n(), Count_200 = sum(Count), Calcualtion_200 = Count_200/(Instances_200*0.000625))

Row_400 &lt;- Data %&gt;% filter(between(X, Data_Row$X-400,Data_Row$X+400) &amp;  between(Y, Data_Row$Y-400, Data_Row$Y+400)) %&gt;%
	summarise(Instances_400 = n(), Count_400 = sum(Count), Calcualtion_400 = Count_400/(Instances_400*0.000625))

Row_800 &lt;- Data %&gt;% filter(between(X, Data_Row$X-800,Data_Row$X+800) &amp;  between(Y, Data_Row$Y-800, Data_Row$Y+800)) %&gt;%
	summarise(Instances_800 = n(), Count_800 = sum(Count), Calcualtion_800 = Count_800/(Instances_800*0.000625))

Row_1600 &lt;- Data %&gt;% filter(between(X, Data_Row$X-1600,Data_Row$X+1600) &amp;  between(Y, Data_Row$Y-1600, Data_Row$Y+1600)) %&gt;%
	summarise(Instances_1600 = n(), Count_1600 = sum(Count), Calcualtion_1600 = Count_1600/(Instances_1600*0.000625))

Summary_Row &lt;- bind_cols(Data[n,&quot;UID&quot;], Row_200, Row_400, Row_800, Row_1600)

Summary &lt;- bind_rows(Summary, Summary_Row)
}

I am aware of things like lapply and map, but I don't know how they could be used to do this dynamic filtering and summarisation more quickly.

答案1

得分: 3

基于@Jon Spring的评论,我参考了[此答案][1]来加快处理速度。以下答案现在在我的真实数据上运行大约需要12小时(而不是我的问题中的代码需要的天数/周数)。它在某些时候需要较多内存(我使用带有32GB RAM的Windows桌面),这就是为什么我尽量减少存储内容的原因(即为什么我将每个循环的输出写入CSV文件,然后在最后将它们带回来)。使用data.table确实加快了速度,但我确信仍然有一种更快/更有效的方法来做到这一点 - 但这就是我所需的(即使我的代码有点混乱!)。

# geosphere v1.5-19 required
install.packages('geosphere', repos='https://rspatial.r-universe.dev')

# Required libraries
library(data.table)
library(dplyr)
library(geosphere)
library(tidyr)
library(readr)
library(purrr)

set.seed(123)

# 输入数据
Data <- data.table(
    X=runif(5000000, min=200000, max=400000),
    Y=runif(5000000, min=400000, max=500000),
    UID = 1:5000000,
    Count = sample(1:2000,5000000,replace=TRUE))

# 为每个点添加50m网格单元ID
Data[, ID := OSGB(Data[,c("X", "Y")], "50m")]

# 汇总每个50m网格单元的记录数
Data_Summary <- Data[, .(Count = .N), by=.(ID)]

# 为覆盖'Data'范围的50米网格方格生成质心坐标
Coordinates <- CJ(X = seq((floor(min(Data$X)/1000)*1000)+25, ceiling(max(Data$X)/1000)*1000, 50),
                  Y = seq((floor(min(Data$Y)/1000)*1000)+25, ceiling(max(Data$Y)/1000)*1000, 50))

# 为每个质心添加50m网格单元ID
Coordinates[, ID := OSGB(Coordinates[,c("X", "Y")], "50m")]

# 将50m网格单元质心坐标与'Data_Summary'连接
Data_Summary_with_Centroids <- Coordinates[Data_Summary, on = .(ID), nomatch = NULL]

# 定义最大缓冲区大小
Buffer <- 1650

# 为每个网格质心生成偏移XY坐标
Data_Summary_with_Centroids[,`:=`(X_NW = X-Buffer, Y_NW = Y+Buffer, 
                        X_NE = X+Buffer, Y_NE = Y+Buffer,
                        X_SW = X-Buffer, Y_SW = Y-Buffer,
                        X_SE = X+Buffer, Y_SE = Y-Buffer)]

# 为所有坐标对分配5km网格单元ID
Data_Summary_with_Centroids[,`:=`(HashCN = OSGB(Data_Summary_with_Centroids[,c("X", "Y")], "5km"), 
                        HashNW = OSGB(Data_Summary_with_Centroids[,c("X_NW", "Y_NW")], "5km"),
                        HashNE = OSGB(Data_Summary_with_Centroids[,c("X_NE", "Y_NE")], "5km"),
                        HashSW = OSGB(Data_Summary_with_Centroids[,c("X_SW", "Y_SW")], "5km"),
                        HashSE = OSGB(Data_Summary_with_Centroids[,c("X_SE", "Y_SE")], "5km"))]

# 移除偏移XY坐标
Data_Summary_with_Centroids[, c("X_NW", "Y_NW","X_NE", "Y_NE","X_SW", "Y_SW","X_SE", "Y_SE"):=NULL] 

Unique_5km_Hash <- unique(Data_Summary_with_Centroids$HashCN)

# 所有唯一的5km网格单元ID列表
Unique_5km_Hash <- unique(Data_Summary_with_Centroids$HashCN)

XY_Columns <- c("X", "Y")

for (n in 1:length(Unique_5km_Hash)){
    
    # 只保留具有该循环的5km网格单元ID的网格单元,其中HashCN、HashNW、HashNE、HashSW或HashSE中的任何一个
    Hash <- Data_Summary_with_Centroids[Data_Summary_with_Centroids[, Reduce(`|`, lapply(.SD, `==`, Unique_5km_Hash[n])), 
    .SDcols = c("HashCN", "HashNW","HashNE","HashSW","HashSE")]][, c("HashNW", "HashNE","HashSW", "HashSE"):=NULL] 
    
    # 仅选择'crossing'所需的列
    Hash_Cross <- Hash[, ..XY_Columns]

    # 扩展以包括所有可能的值组合
    suppressMessages(Hash_5km <- Hash_Cross %>%
        crossing(., ., .name_repair = "unique"))
        
    rm(Hash_Cross)
    # 清理内存
    invisible(gc())
    
    # 计算距离,舍弃超过1600m的距离,仅保留在'HashCN'中的'origin'记录,并添加Count
    Hash_5km <- Hash_5km %>%
        mutate(Distance = sqrt((X...1 - X...3)^2 + (Y...2 - Y...4)^2)) %>%
        filter(Distance <=1600) %>%
        left_join(.,Hash %>% select(X, Y, ID, HashCN), join_by(X...1 == X, Y...2 == Y)) %>%
        filter(HashCN == Unique_5km_Hash[n]) %>%
        left_join(.,Hash %>% select(X, Y, Count), join_by(X...3 == X, Y...4 == Y)) %>%
        select(ID, Count, Distance)

    # 清理内存
    invisible(gc())

    # 创建200m内其他网格单元的汇总统计信息
    Hash_200 <- Hash_5km %>%
        filter(Distance <=200) %>%
        group_by(ID) %>%
        summarise(Instances_200 = n(), Count_200 = sum(Count), Calculation

<details>
<summary>英文:</summary>

Based on @Jon Spring&#39;s comment I have used [this answer][1] as inspiration to speed up the process. The following answer now takes around 12 hours to run on my real data (as opposed to the days/weeks the code in my question would have taken). It is a bit memory hungry at points (I am using a Windows desktop with 32GB of RAM) which is why I have minimised what gets stored (i.e. why I write out each loops output to CSV and then bring them back in at the end). Using `data.table` has definitely speed things up, but I am sure there will still be a quicker/more efficient way to do this - but this does what I need (even if my code is messy!).

    # geosphere v1.5-19 required
    install.packages(&#39;geosphere&#39;, repos=&#39;https://rspatial.r-universe.dev&#39;)
    
    # Required libraries
    library(data.table)
    library(dplyr)
    library(geosphere)
    library(tidyr)
    library(readr)
    library(purrr)
       
    set.seed(123)
       
    # Input data
    Data &lt;- data.table(
    	X=runif(5000000, min=200000, max=400000),
        Y=runif(5000000, min=400000, max=500000),
    	UID = 1:5000000,
        Count = sample(1:2000,5000000,replace=TRUE))
    
    # Add 50m grid cell ID for each point
    Data[, ID := OSGB(Data[,c(&quot;X&quot;, &quot;Y&quot;)], &quot;50m&quot;)]	
    
    # Summarise number of records for each 50m grid cell
    Data_Summary &lt;- Data[, .(Count = .N), by=.(ID)]
    
    # Generate centroid coorindates for 50 metre grid squares that cover &#39;Data&#39; extent
    Coordinates &lt;- CJ(X = seq((floor(min(Data$X)/1000)*1000)+25, ceiling(max(Data$X)/1000)*1000, 50),
    				Y = seq((floor(min(Data$Y)/1000)*1000)+25, ceiling(max(Data$Y)/1000)*1000, 50))
    
    # Add 50m grid cell ID for each centroid
    Coordinates[, ID := OSGB(Coordinates[,c(&quot;X&quot;, &quot;Y&quot;)], &quot;50m&quot;)]
    
    # Join 50m grid cell centroid coordinates to &#39;Data_Summary&#39;
    Data_Summary_with_Centroids &lt;- Coordinates[Data_Summary, on = .(ID), nomatch = NULL]	
    
    # Define maximum buffer size
    Buffer &lt;- 1650
    
    # Generate offset XY coordinates for each grid centroid
    Data_Summary_with_Centroids[,`:=`(X_NW = X-Buffer, Y_NW = Y+Buffer, 
    						X_NE = X+Buffer, Y_NE = Y+Buffer,
    						X_SW = X-Buffer, Y_SW = Y-Buffer,
    						X_SE = X+Buffer, Y_SE = Y-Buffer)]
    
    # Assign 5km grid cell ID to all pairs of coordinates						
    Data_Summary_with_Centroids[,`:=`(HashCN = OSGB(Data_Summary_with_Centroids[,c(&quot;X&quot;, &quot;Y&quot;)], &quot;5km&quot;), 
    						HashNW = OSGB(Data_Summary_with_Centroids[,c(&quot;X_NW&quot;, &quot;Y_NW&quot;)], &quot;5km&quot;),
    						HashNE = OSGB(Data_Summary_with_Centroids[,c(&quot;X_NE&quot;, &quot;Y_NE&quot;)], &quot;5km&quot;),
    						HashSW = OSGB(Data_Summary_with_Centroids[,c(&quot;X_SW&quot;, &quot;Y_SW&quot;)], &quot;5km&quot;),
    						HashSE = OSGB(Data_Summary_with_Centroids[,c(&quot;X_SE&quot;, &quot;Y_SE&quot;)], &quot;5km&quot;))]
    
    # Remove offset XY coordinates
    Data_Summary_with_Centroids[, c(&quot;X_NW&quot;, &quot;Y_NW&quot;,&quot;X_NE&quot;, &quot;Y_NE&quot;,&quot;X_SW&quot;, &quot;Y_SW&quot;,&quot;X_SE&quot;, &quot;Y_SE&quot;):=NULL] 
    
    Unique_5km_Hash &lt;- unique(Data_Summary_with_Centroids$HashCN)
    
    # List of all unique 5km grid cell IDs
    Unique_5km_Hash &lt;- unique(Data_Summary_with_Centroids$HashCN)
    
    XY_Columns &lt;- c(&quot;X&quot;, &quot;Y&quot;)
    
    for (n in 1:length(Unique_5km_Hash)){
    	
    	# Subset so only grid cells that have that loops 5km grid cell ID in any of HashCN, HashNW, HashNE, HashSW or HashSE remain
    	Hash &lt;- Data_Summary_with_Centroids[Data_Summary_with_Centroids[, Reduce(`|`, lapply(.SD, `==`, Unique_5km_Hash[n])), 
    	.SDcols = c(&quot;HashCN&quot;, &quot;HashNW&quot;,&quot;HashNE&quot;,&quot;HashSW&quot;,&quot;HashSE&quot;)]][, c(&quot;HashNW&quot;, &quot;HashNE&quot;,&quot;HashSW&quot;, &quot;HashSE&quot;):=NULL] 
    	
    	# Select only the columns needed for &#39;crossing&#39;
    	Hash_Cross &lt;- Hash[, ..XY_Columns]
    
    	# Expand to include all possible combinations of values
    	suppressMessages(Hash_5km &lt;- Hash_Cross %&gt;% 
    		crossing(., ., .name_repair = &quot;unique&quot;))
    		
    	rm(Hash_Cross)
    	# Clear memory
    	invisible(gc())
    	
    	# Calculate distances, disgard those over 1600m, keep only &#39;origin&#39; records that are in &#39;HashCN&#39; and add Count
    	Hash_5km &lt;- Hash_5km %&gt;%
    		mutate(Distance = sqrt((X...1 - X...3)^2 + (Y...2 - Y...4)^2)) %&gt;%
    		filter(Distance &lt;=1600) %&gt;%
    		left_join(.,Hash %&gt;% select(X, Y, ID, HashCN), join_by(X...1 == X, Y...2 == Y)) %&gt;%
    		filter(HashCN == Unique_5km_Hash[n]) %&gt;%
    		left_join(.,Hash %&gt;% select(X, Y, Count), join_by(X...3 == X, Y...4 == Y)) %&gt;%
    		select(ID, Count, Distance)
    
    	# Clear memory
    	invisible(gc())
    
    	# Create summary statistics of other grid cells within 200m
    	Hash_200 &lt;- Hash_5km %&gt;%
    		filter(Distance &lt;=200) %&gt;%
    		group_by(ID) %&gt;%
    		summarise(Instances_200 = n(), Count_200 = sum(Count), Calculation_200 = Count_200/(Instances_200*0.25))
    
    	# Create summary statistics of other grid cells within 400m
    	Hash_400 &lt;- Hash_5km %&gt;%
    		filter(Distance &lt;=400) %&gt;%
    		group_by(ID) %&gt;%
    		summarise(Instances_400 = n(), Count_400 = sum(Count), Calculation_400 = Count_400/(Instances_400*0.25))
    
    	# Create summary statistics of other grid cells within 800m	
    	Hash_800 &lt;- Hash_5km %&gt;%
    		filter(Distance &lt;=800) %&gt;%
    		group_by(ID) %&gt;%
    		summarise(Instances_800 = n(), Count_800 = sum(Count), Calculation_800 = Count_800/(Instances_800*0.25))
    
    	# Create summary statistics of other grid cells within 1600m	
    	Hash_1600 &lt;- Hash_5km %&gt;%
    		group_by(ID) %&gt;%
    		summarise(Instances_1600 = n(), Count_1600 = sum(Count), Calculation_1600 = Count_1600/(Instances_1600*0.25))
    
    	# Join summary statistics together
    	Output &lt;- Hash_200 %&gt;%
    		left_join(Hash_400, by=&#39;ID&#39;) %&gt;%
    		left_join(Hash_800, by=&#39;ID&#39;) %&gt;%
    		left_join(Hash_1600, by=&#39;ID&#39;) 
    
    	# Export output
    	write_csv(Output, paste0(&quot;C:/Temp/Hash/&quot;,Unique_5km_Hash[n],&quot;.csv&quot;))
    
    	rm(Hash_5km)
    	rm(Hash)
    	rm(Hash_200)
    	rm(Hash_400)
    	rm(Hash_800)
    	rm(Hash_1600)
    	rm(Output)
    	invisible(gc())
    }
    
    # Read in all CSV files into single 
    Grid_Calculations &lt;- list.files( path=&quot;C:/Temp/Hash/&quot;, , pattern = &quot;*.csv&quot;, full.names=TRUE ) %&gt;% 
      map_dfr(read_csv, show_col_types = FALSE)

**EDIT:** Have added set.seed() and slightly changed the start of the loop so only the relevant columns are used with &#39;crossing&#39; (this makes it memory efficient for me).

**Output**

First five rows of first loop:

|ID|Instances_200|Count_200|Calculation_200|Instances_400|Count_400|Calculation_400|Instances_800|Count_800|Calculation_800|Instances_1600|Count_1600|Calculation_1600|
|--|-------------|---------|---------------|-------------|---------|---------------|-------------|---------|---------------|--------------|----------|----------------|
|SC550650NE|20|24|4.8|88|119|5.4090909090909|360|475|5.27777777777777|1503|2041|5.43180306054557|
|SC550650SW|22|28|5.09090909090909|87|119|5.47126436781609|360|475|5.27777777777777|1500|2049|5.464|
|SC550651NE|20|30|6|86|117|5.44186046511627|364|479|5.26373626373626|1494|2032|5.44042838018741|
|SC550651SE|21|30|5.71428571428571|85|113|5.31764705882352|364|479|5.26373626373626|1491|2037|5.46478873239436|
|SC550652NE|28|42|6|85|113|5.31764705882352|369|485|5.25745257452574|1487|2020|5.43375924680564|



  [1]: https://stackoverflow.com/questions/72750975/fastest-way-to-get-geo-distance-for-large-dataset-in-r/72755385#72755385



</details>



# 答案2
**得分**: 1

作为一个认为一定有人比我做得更好的人,我会寻找别人制作的解决方案。

因此,我认为使用`sf`和`spdep`包有一些优势。它们都是专门为空间数据设计的。

有两个函数可以开始使用。

创建一个sf对象

```R
x <- sf::st_as_sf(Data, coords = c("X", "Y"))

距离可以使用以下方式计算

st_distance()

当然,一定会找到一种不需要计算所有距离的解决方案。一个简单的算法是将点按照所需半径的一半大小分成不同的箱子。然后,您只需要检查相邻箱子中的点。

英文:

Being the man who says there must be people who can do it better than me, I would look for solutions made by others.

So I see some advantages in using the sf and spdep packages. Both are explicitly designed for spatial data.

There are two functions to start with.

Create an sf-object

x &lt;- sf::st_as_sf(Data, coords = c(&quot;X&quot;, &quot;Y&quot;))

and the distances could be done with the

st_distance()

Surely a solution has to be found that does not compute all the distances. A simple algorithm is to sort the points into bins of half the size of the desired radius. Then you only need to check the points in neighbouring bins.

答案3

得分: 1

使用data.table可能会有帮助,但对于这种情况,我认为计算时间仍然会非常长。也许你还是想尝试一下:

library(data.table)
tbl.data <- as.data.table(Data)

tbl.data[, Instances_200 := mapply(function (x,y) {tbl.data[X<x+200 & X > x -200
                                                               & Y < y+200 & Y > y -200, .N ]}, X, Y)]

tbl.data[, Count_200 := mapply(function (x,y) {tbl.data[X<x+200 & X > x -200
                                                            & Y < y+200 & Y > y -200, sum(Count) ]}, X, Y)]

tbl.data[, Calculation_200 := Count_200/(Instances_200*0.000625)]

等等。

英文:

Using data.table could help .. but for this case, i think the calculation time will still be far too huge ..

May be you want to try anyway :

library(data.table)
tbl.data &lt;- as.data.table(Data)
tbl.data[, Instances_200 := mapply(function (x,y) {tbl.data[X&lt;x+200 &amp; X &gt; x -200
&amp; Y &lt; y+200 &amp; Y &gt; y -200, .N ]}, X, Y)]
tbl.data[, Count_200 := mapply(function (x,y) {tbl.data[X&lt;x+200 &amp; X &gt; x -200
&amp; Y &lt; y+200 &amp; Y &gt; y -200, sum(Count) ]}, X, Y)]
tbl.data[, Calculation_200 := Count_200/(Instances_200*0.000625)]

And so on.

答案4

得分: 1

我理解您需要翻译的部分是代码和相关注释。以下是已翻译的内容:

library(tictoc)
library(data.table)

# 选择数据集大小
num.rows <- 1e5

# 输入数据
tbl.data <- data.table(
    X = runif(num.rows, min = 200000, max = 400000),
    Y = runif(num.rows, min = 400000, max = 500000),
    UID = 1:num.rows,
    Count = sample(1:2000, num.rows, replace = TRUE)
)

setkey(tbl.data, UID)

# 创建超级数据堆栈
tbl.data.1600 <- copy(tbl.data)
tbl.data.0800 <- copy(tbl.data)
tbl.data.0400 <- copy(tbl.data)
tbl.data.0200 <- copy(tbl.data)

tbl.data.1600[, within_metric := 1600]
tbl.data.0800[, within_metric := 800]
tbl.data.0400[, within_metric := 400]
tbl.data.0200[, within_metric := 200]

tbl.data.1600[, group := 1]
tbl.data.0800[, group := 2]
tbl.data.0400[, group := 3]
tbl.data.0200[, group := 4]

tbl.data.stack <- copy(rbind(tbl.data.1600, tbl.data.0800, tbl.data.0400, tbl.data.0200))

setkey(tbl.data.stack, group, UID)

tbl.data.stack[, X_low_bound := X - within_metric]
tbl.data.stack[, X_upp_bound := X + within_metric]
tbl.data.stack[, Y_low_bound := Y - within_metric]
tbl.data.stack[, Y_upp_bound := Y + within_metric]

Xvec <- tbl.data.stack[group == 1]$X
Yvec <- tbl.data.stack[group == 1]$Y
Cvec <- tbl.data.stack[group == 1]$Count

# 定义函数 instances
instances <- function(x = Xvec, y = Yvec, count = Cvec, xlb, xub, ylb, yub) {
    truth <- data.table::between(x, xlb, xub) & data.table::between(y, ylb, yub)
    c("instances" = sum(truth), "sumCount" = sum(count * truth))
}

# 定义函数 instances_stacked
instances_stacked <- function(DT, x = Xvec, y = Yvec, count = Cvec) {
    DT[, as.list(instances(xlb = X_low_bound, xub = X_upp_bound, ylb = Y_low_bound, yub = Y_upp_bound)), by = c("within_metric", "UID")]
}

# 定义函数 split_df
split_df <- function(d, var) {
    base::split(d, get(var, as.environment(d)))
}

# 定义函数 foo2
foo2 <- function(dt) {
    dt2 <- split_df(dt, "group_half")
    require(parallel)
    cl <- parallel::makeCluster(4)
    print(cl)
    clusterExport(cl, varlist = "instances_stacked")
    clusterExport(cl, varlist = "instances")
    clusterExport(cl, varlist = c("Xvec", "Yvec", "Cvec"))
    clusterExport(cl, varlist = "dt2", envir = environment())
    clusterEvalQ(cl, library("data.table"))
    dt2 <- parallel::parLapply(cl, X = dt2, fun = instances_stacked)
    parallel::stopCluster(cl)
    return(rbindlist(dt2))
}

tic("使用 SOCKET 的并行计算:")
results_stacked_socket <- foo2(tbl.data.stack)
results_stacked_socket[, calculation := sumCount / (instances * 0.25)]
toc()

setkey(results_stacked_socket, UID, within_metric)
head(results_stacked_socket)

请注意,我已经为您翻译了代码和相关注释,以便更容易理解。如果您需要进一步的翻译或有其他问题,请随时提出。

英文:

I am trying to understand OP's answer post, specifically the output table.

Focusing on the first row of the output table -- I see that SC550650NE is reported to have 20 instances of another point within 200 of (X=255085.6, Y=465070.1) and a sum-count of 24.

&gt; Data[ID==&quot;SC550650NE&quot;]
X        Y    UID Count         ID   distance
1: 255085.6 465070.1 832751  1908 SC550650NE 0.03272809

So I calculate the distance of all the points relative to (X=255085.6, Y=465070.1). And then display those that are within 200 in ascending order. I count 29, not 20. Also, when I sum the counts, I get 25,463, not 24.

&gt; Data[,distance:=sqrt((X-255085.6)^2 + (Y-465070.1)^2)]
&gt; Data[distance &lt;= 200][order(distance)]
X        Y     UID Count         ID     distance
1: 255085.6 465070.1  832751  1908 SC550650NE   0.03272809
2: 255093.6 465124.7 1371253  1513 SC550651SE  55.16468957
3: 255034.2 465011.8 3512521   396 SC550650SW  77.73592845
4: 255049.8 464993.6 2603353   255 SC550649NW  84.47194005
5: 255091.7 465162.7  193893  1784 SC550651NE  92.82767523
6: 254968.1 465083.5 2245518   186 SC549650NE 118.25898594
7: 255180.5 464970.4 3376500   826 SC551649NE 137.66330325
8: 255148.5 464947.4 3473247  1526 SC551649SW 137.86046404
9: 255136.8 465202.7 2560816   212 SC551652SW 142.12504000
10: 255089.6 465224.3 2788575  1429 SC550652SE 154.24575868
11: 255213.6 465158.4 2051343   331 SC552651NW 155.52946932
12: 255234.1 465015.9  601896  1175 SC552650SW 158.08971475
13: 255231.4 464993.8 2562794  1209 SC552649NW 164.57602726
14: 255236.8 465003.9 2579822   624 SC552650SW 165.05736525
15: 255163.8 465219.0 2793140   491 SC551652SE 168.19129349
16: 254997.8 465224.2 3686855  1948 SC549652SE 177.38964783
17: 255055.7 464895.0 3850088  1904 SC550648NE 177.60704244
18: 254952.1 465189.1 4992168   668 SC549651NE 178.80180844
19: 255237.6 465165.5 3778801   950 SC552651NW 179.50253647
20: 255148.1 464900.4 2842521   204 SC551649SW 180.86815284
21: 255035.4 464891.2  933305   854 SC550648NW 185.79738782
22: 255245.6 465175.2 1453263   446 SC552651NW 191.44182598
23: 255227.8 465200.5 1557782   851 SC552652SW 192.90236356
24: 254894.7 465031.3  657336  1247 SC548650SE 194.79510938
25: 255102.8 464875.4 3051082   148 SC551648NW 195.41964698
26: 254914.2 464971.4 3090917    28 SC549649NW 197.75429388
27: 255030.4 464879.7  895456   109 SC550648NW 198.20424559
28: 255281.9 465101.9 3970383  1810 SC552651SE 198.85139040
29: 255108.3 465268.3 1968621   431 SC551652NW 199.51847902
X        Y     UID Count         ID     distance
&gt; Data[distance &lt;= 200, sum(Count)]
[1] 25463

Edit

5 million rows

So after ill-advisedly using the comments for discussion (sorry, SO!) we have a better sense of our situation. @Chris has a solution that gives an approximation but this was a concession that was being made to try and lower the computation time. During the discussion, we also learned that the distance metric may be a square or a circle, and 4 cores are available on the machine.

I will present two solutions, one for the circle metric and one for the square metric. The circle metric is faster.

Both follow the approach in this SO post.

Circle metric

  • e.g. distance = sqrt( (x1-x2)^2 + (y1-y2)^2) ) &lt;= 200
library(tictoc)
library(data.table)

Pick one of the num.rows. Start small and work up to get a sense of timing. I have 32GB as well (on a mac) so I'm hoping it is comparable. Roughly it seemed doubling the number of rows led to a 4x increase in computation time.

### Pick one:
###
### num.rows &lt;-     4e4     ###   12 seconds on 4 nodes
### num.rows &lt;-  0.078125e6 ###   43 seconds on 4 nodes
### num.rows &lt;-  0.15625e6  ###  154 seconds on 4 nodes
### num.rows &lt;-  0.3125e6   ###   10.5 min   on 4 nodes
### num.rows  &lt;- 0.625e6    ###   44.  min   on 4 nodes
### num.rows  &lt;- 1.25e6     ###  ~3 hours
### num.rows  &lt;- 2.5e6      ###  ~12 hours
## num.rows  &lt;- 5.0e6      ###  ~ 42.53 hours on 4 nodes VERIFIED.
set.seed(123)
# Input data
tbl.data &lt;- data.table(
X=runif(num.rows, min=200000, max=400000),
Y=runif(num.rows, min=400000, max=500000),
UID = 1:num.rows,
Count = sample(1:2000,num.rows,replace=TRUE))
setkey(tbl.data, UID)
tbl.data
setkey(tbl.data, UID)

This approach depends on passing the entire X, Y, and Count columns of the dataset. We denote them Xvec, Yvec, and Cvec, respectively. From there we make a function, instances() that will first calculate the distance of 1 point (x.in, y.in) to all the other points in (Xvec, Yvec), calculate the instances and do the sumCount (the density "calculation" is done at the end and doesn't need to be done within this step or be parallelized).

This function, instances(), is made to be deployed within a data.table, and is setup to do so in the function instances.stacked(). Since we are passing the entire entire X, Y, and Count columns of the dataset and operating row-wise in the original dataset, we can tackle the problem in parallel. If we had parallel::detectCores() of 5 million, we'd have one core per row and be done in a jiffy. However, we have 4, so we take a moment to make a variable group_quarters that will split the dataset into 4 chunks via foo2()'s deployment of split_df() (both of these adapted from the example in aforementioned SO post).

Xvec &lt;- tbl.data$X
Yvec &lt;- tbl.data$Y
Cvec &lt;- tbl.data$Count
## based on https://stackoverflow.com/a/44814035/2727349
instances &lt;- function(x=Xvec,y=Yvec, count=Cvec, x.in, y.in){
calcdist &lt;- sqrt( (x-x.in)^2+ (y-y.in)^2 )
truth_1600 &lt;- calcdist &lt;= 1600
result_1600 &lt;- c(&quot;instances_1600&quot; = sum( truth_1600 ), &quot;sumCount_1600&quot;= sum( Cvec*truth_1600))
truth_0800 &lt;- calcdist &lt;=  800
result_0800 &lt;- c(&quot;instances_0800&quot; = sum( truth_0800 ), &quot;sumCount_0800&quot;= sum( Cvec*truth_0800))
truth_0400 &lt;- calcdist &lt;=  400
result_0400 &lt;- c(&quot;instances_0400&quot; = sum( truth_0400 ), &quot;sumCount_0400&quot;= sum( Cvec*truth_0400))
truth_0200 &lt;- calcdist &lt;=  200
result_0200 &lt;- c(&quot;instances_0200&quot; = sum( truth_0200 ), &quot;sumCount_0200&quot;= sum( Cvec*truth_0200))
result=c(result_0200, result_0400, result_0800, result_1600)
return(result)
}
instances_stacked &lt;- function(DT, x=Xvec,y=Yvec, count=Cvec){
DT[,
as.list(instances( x.in=X, y.in=Y)),
by=c(&quot;UID&quot;)
]
}
split_df &lt;- function(d, var) {
base::split(d, get(var, as.environment(d)))
}
## define group_quarters roughly as 25% population
tbl.data[UID &lt;  0.25 * num.rows                         , group_quarters:=&quot;1st q&quot;]
tbl.data[UID &lt;  0.50 * num.rows &amp; UID &gt;= 0.25 * num.rows, group_quarters:=&quot;2nd q&quot;]
tbl.data[UID &lt;  0.75 * num.rows &amp; UID &gt;= 0.50 * num.rows, group_quarters:=&quot;3rd q&quot;]
tbl.data[                         UID &gt;= 0.75 * num.rows, group_quarters:=&quot;4th q&quot;]
tbl.data[, uniqueN(group_quarters)]
tbl.data[, table(group_quarters)]
foo2 &lt;- function(dt) {
dt2 &lt;- split_df(dt, &quot;group_quarters&quot;)
require(parallel)
cl &lt;- parallel::makeCluster(4) ## 4 chosen based on parallel::detectCores()
print(cl)
clusterExport(cl, varlist= &quot;instances_stacked&quot;) ## this was the `foo` in https://stackoverflow.com/a/44814035/2727349
clusterExport(cl, varlist= &quot;instances&quot;)
clusterExport(cl, varlist= c(&quot;Xvec&quot;,&quot;Yvec&quot;,&quot;Cvec&quot;)) 
clusterExport(cl, varlist= &quot;dt2&quot;, envir = environment())
clusterEvalQ(cl, library(&quot;data.table&quot;))
dt2 &lt;- parallel::parLapply(cl, X= dt2, fun=instances_stacked)
parallel::stopCluster(cl)
return(rbindlist(dt2))
}

The call to foo2() will deploy the analysis in parallel and combine everything back together into results_wide. We can then do the density calculation and view some results.

tic(&quot;foo2() runs on 4 data chunks (1/4 sized) in parallel: &quot;)
results_wide &lt;- foo2(tbl.data)
## next 4 lines done after the parallel part
results_wide[,calculation_1600:=sumCount_1600/(instances_1600*0.25)]
results_wide[,calculation_0800:=sumCount_0800/(instances_0800*0.25)]
results_wide[,calculation_0400:=sumCount_0400/(instances_0400*0.25)]
results_wide[,calculation_0200:=sumCount_0200/(instances_0200*0.25)]
toc()
## let&#39;s see the results
head(results_wide)
## for num.rows&lt;-5e6, 
## should match the intro example at top of the post:
results_wide[UID==832751] 

Which gives (after 42 1/2 hours):

&gt; head(results_wide)
UID instances_0200 sumCount_0200 instances_0400 sumCount_0400 instances_0800
1:   1             28         25348            115        110519            510
2:   2             38         37791            123        128978            498
3:   3             28         26958            123        131816            506
4:   4             33         33861            125        113013            514
5:   5             31         36492            123        130939            522
6:   6             37         40887            136        141213            480
sumCount_0800 instances_1600 sumCount_1600 calculation_1600 calculation_0800
1:        521004           2025       2015967         3982.157         4086.306
2:        499358           2001       1999339         3996.680         4010.908
3:        507117           2013       2003329         3980.783         4008.830
4:        513712           2065       2115524         4097.867         3997.759
5:        516877           2054       1990644         3876.619         3960.743
6:        491969           1952       2001461         4101.355         4099.742
calculation_0400 calculation_0200
1:         3844.139         3621.143
2:         4194.407         3978.000
3:         4286.699         3851.143
4:         3616.416         4104.364
5:         4258.179         4708.645
6:         4153.324         4420.216
## for num.rows &lt;- 5e6, should match the 
## intro example at top of the post for
## Data[ID==&quot;SC550650NE&quot;]
&gt; results_wide[UID==832751] 
UID instances_0200 sumCount_0200 instances_0400 sumCount_0400
1: 832751             29         25463            122        110993
instances_0800 sumCount_0800 instances_1600 sumCount_1600 calculation_1600
1:            480        467032           2048       2054509         4012.713
calculation_0800 calculation_0400 calculation_0200
1:         3891.933         3639.115         3512.138

Square metric

  • e.g. Question Post's (X-200, X+200) &amp; (Y-200, Y+200)

...NOTE...I overloaded some function names so might be best to put this code in separate script and separate R session from the circle metric code

Similar idea to circle metric approach above, but the execution is slightly different because it is based on the bounds as opposed to the distance. Basically, we make a super stack of the data and calculate the boundaries specific for each distance. These four "boundaries" make a natural candidate for chunking the data for parallelizing -- this code goes one step farther and breaks each of those 4 in half resulting in 8 groups. We still run on 4 nodes, though. I tested a bunch on 8 nodes bc my testing environment allowed for this and I developed this code before I knew OP had a 4 core environment.

library(data.table)
library(tictoc)
library(dplyr)
set.seed(123)
### Pick one:
### num.rows &lt;- 4e4 ###     15 seconds on 8 nodes | 23 seconds on 4 nodes
num.rows &lt;- 1e5 ###        86 seconds on 8 nodes | 140.034 seconds on 4 nodes
### num.rows &lt;- 2e5 ###    347 seconds on 8 nodes
### num.rows &lt;- 4e5 ###   1370 seconds on 8 nodes   
### num.rows &lt;- 8e5 ###   1370*4 =  90 minutes?
### num.rows &lt;- 1.6e6 ###   1370*4*4 =  360 minutes = 6 hours (confirmed: 21742.233 / 60 / 60 = 6 hours)
### num.rows &lt;- 2.5e6 ###     47523 / 60 / 60 = 13 hours and 12 minutes VERIFIED.
### num.rows &lt;- 5.0e6 ###   4*47523 / 60 / 60 = ~53 hours estimated.
# Input data
tbl.data &lt;- data.table(
X=runif(num.rows, min=200000, max=400000),
Y=runif(num.rows, min=400000, max=500000),
UID = 1:num.rows,
Count = sample(1:2000,num.rows,replace=TRUE))
setkey(tbl.data, UID)
tbl.data
setkey(tbl.data, UID)

Make a superstack:


tbl.data.1600 &lt;- copy(tbl.data)
tbl.data.0800 &lt;- copy(tbl.data)
tbl.data.0400 &lt;- copy(tbl.data)
tbl.data.0200 &lt;- copy(tbl.data)
tbl.data.1600[, within_metric:=1600]
tbl.data.0800[, within_metric:= 800]
tbl.data.0400[, within_metric:= 400]
tbl.data.0200[, within_metric:= 200]
tbl.data.1600[, group:=1]
tbl.data.0800[, group:=2]
tbl.data.0400[, group:=3]
tbl.data.0200[, group:=4]
tbl.data.stack &lt;- copy(rbind(tbl.data.1600, tbl.data.0800, tbl.data.0400, tbl.data.0200))
setkey(tbl.data.stack, group, UID)
tbl.data.stack[,X_low_bound:=X-within_metric]
tbl.data.stack[,X_upp_bound:=X+within_metric]
tbl.data.stack[,Y_low_bound:=Y-within_metric]
tbl.data.stack[,Y_upp_bound:=Y+within_metric]
Xvec &lt;- tbl.data.stack[group==1]$X
Yvec &lt;- tbl.data.stack[group==1]$Y
Cvec &lt;- tbl.data.stack[group==1]$Count

I include this commented chunk -- this is how one could run it without parallelizing it. Go ahead and skip for now to next block.


###################
## BEGIN ##########
###################
## no parallel. ###
## just stacked ###
###################
# instances_stacked &lt;- function(x=Xvec,y=Yvec, count=Cvec, xlb, xub, ylb, yub){
#   
#   truth &lt;- data.table::between(x, xlb, xub) &amp; data.table::between(y, ylb, yub)
#   
#   c(&quot;instances&quot; = sum( truth ), &quot;sumCount&quot;= sum( Cvec*truth))
#   
# }
# 
# 
# tic(&quot;external default vector approach -- STACKED -- all 4: &quot;)
# results_stacked &lt;- 
# tbl.data.stack[,
#          as.list(instances_stacked( xlb=X_low_bound, xub=X_upp_bound, ylb=Y_low_bound, yub=Y_upp_bound)),
#          by=c(&quot;group&quot;,&quot;UID&quot;)
# ]
# 
# results_stacked[,calculation:=sumCount/(instances*0.25)]
# toc()
# 
# head(results_stacked)
###################
## END.  ##########
###################
## no parallel. ###
## just stacked ###
###################

Below is the parallel code. I broke into 8 groups, but changed the makeCluster line to 4 nodes for your computer.

###################
## BEGIN ##########
###################
## SOCKET.      ###
## ROCKET.      ###
###################
## follow the leader: https://stackoverflow.com/a/44814035/2727349
## step 1: change instances_stacked (which is our `foo`) so that it returns a list.  Also change it to where
##           it takes data table as an argument.
## step 2: use split_df -- keep in mind might need to make a concatenated grouping of &quot;group&quot; and &quot;UID&quot;
instances &lt;- function(x=Xvec,y=Yvec, count=Cvec, xlb, xub, ylb, yub){
truth &lt;- data.table::between(x, xlb, xub) &amp; data.table::between(y, ylb, yub)
c(&quot;instances&quot; = sum( truth ), &quot;sumCount&quot;= sum( Cvec*truth))
}
instances_stacked &lt;- function(DT, x=Xvec,y=Yvec, count=Cvec){
DT[,
as.list(instances( xlb=X_low_bound, xub=X_upp_bound, ylb=Y_low_bound, yub=Y_upp_bound)),
by=c(&quot;within_metric&quot;,&quot;UID&quot;)
]
}
split_df &lt;- function(d, var) {
base::split(d, get(var, as.environment(d)))
}
## define grps to be distance-bound and UID combo
tbl.data.stack[,grps:=paste0(within_metric,&quot;_&quot;,UID)]
tbl.data.stack[, uniqueN(grps)]
## define group_half to be distance-bound and UID &lt; 1/2 num.rows
tbl.data.stack[,group_half:=paste0(within_metric,&quot;_&quot;, (UID &lt; 0.5 * num.rows)+0L)]
tbl.data.stack[, uniqueN(group_half)]
foo2 &lt;- function(dt) {
#dt2 &lt;- split_df(dt, &quot;grps&quot;)
#dt2 &lt;- split_df(dt, &quot;group&quot;)
dt2 &lt;- split_df(dt, &quot;group_half&quot;)
require(parallel)
##cl &lt;- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
#cl &lt;- parallel::makeCluster(tbl.data.stack[, uniqueN(group_half)])
cl &lt;- parallel::makeCluster(4)
print(cl)
clusterExport(cl, varlist= &quot;instances_stacked&quot;) ## this was the `foo` in https://stackoverflow.com/a/44814035/2727349
clusterExport(cl, varlist= &quot;instances&quot;)
clusterExport(cl, varlist= c(&quot;Xvec&quot;,&quot;Yvec&quot;,&quot;Cvec&quot;)) 
clusterExport(cl, varlist= &quot;dt2&quot;, envir = environment())
clusterEvalQ(cl, library(&quot;data.table&quot;))
dt2 &lt;- parallel::parLapply(cl, X= dt2, fun=instances_stacked)
parallel::stopCluster(cl)
return(rbindlist(dt2))
}
tic(&quot;external default vector approach -- SOCKET -- all 4 in parallel: &quot;)
results_stacked_socket &lt;- foo2(tbl.data.stack)
results_stacked_socket[,calculation:=sumCount/(instances*0.25)]
toc()
setkey(results_stacked_socket, UID, within_metric)
head(results_stacked_socket)
###################
## END.  ##########
###################
## SOCKET.      ###
## ROCKET.      ###
###################

huangapple
  • 本文由 发表于 2023年7月17日 21:35:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/76705013.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定