For Loop替代方案,动态筛选和总结数百万个点

英文:

For Loop alternative to dynamically filter and summarise millions of points

问题

我有一个包含500万条记录的数据集,格式如下:

```R
library(dplyr)

Data <- tibble(X = runif(5000000, min = 200000, max = 400000),
               Y = runif(5000000, min = 400000, max = 500000),
               UID = 1:5000000,
               Count = runif(5000000, min = 1, max = 2000))
```

对于每个UID,我需要总结以下信息:1)其他点的数量,2)其他点的计数总和,3)在200米、400米、800米和1600米范围内的UID的密度计算。所需的最终输出是一个表格,包含每个UID及其摘要输出。

以下的for循环演示了所需的输出,但需要花费几周的时间来完成:

```R
Summary <- NULL
for (n in 1:nrow(Data)) {
  Data_Row <- Data[n, ]

  Row_200 <- Data %>%
    filter(between(X, Data_Row$X - 200, Data_Row$X + 200) &
           between(Y, Data_Row$Y - 200, Data_Row$Y + 200)) %>%
    summarise(Instances_200 = n(), Count_200 = sum(Count),
              Calculation_200 = Count_200 / (Instances_200 * 0.000625))

  Row_400 <- Data %>%
    filter(between(X, Data_Row$X - 400, Data_Row$X + 400) &
           between(Y, Data_Row$Y - 400, Data_Row$Y + 400)) %>%
    summarise(Instances_400 = n(), Count_400 = sum(Count),
              Calculation_400 = Count_400 / (Instances_400 * 0.000625))

  Row_800 <- Data %>%
    filter(between(X, Data_Row$X - 800, Data_Row$X + 800) &
           between(Y, Data_Row$Y - 800, Data_Row$Y + 800)) %>%
    summarise(Instances_800 = n(), Count_800 = sum(Count),
              Calculation_800 = Count_800 / (Instances_800 * 0.000625))

  Row_1600 <- Data %>%
    filter(between(X, Data_Row$X - 1600, Data_Row$X + 1600) &
           between(Y, Data_Row$Y - 1600, Data_Row$Y + 1600)) %>%
    summarise(Instances_1600 = n(), Count_1600 = sum(Count),
              Calculation_1600 = Count_1600 / (Instances_1600 * 0.000625))

  Summary_Row <- bind_cols(Data[n, "UID"], Row_200, Row_400, Row_800, Row_1600)
  Summary <- bind_rows(Summary, Summary_Row)
}
```

我知道像 lapply 和 map 这样的函数,但我不知道如何使用它们来更快地进行这种动态过滤和汇总。

英文:

I have a dataset with 5 million records in this format:

```R
library(dplyr)

Data <- tibble(X = runif(5000000, min = 200000, max = 400000),
               Y = runif(5000000, min = 400000, max = 500000),
               UID = 1:5000000,
               Count = runif(5000000, min = 1, max = 2000))
```

For each UID, I need to summarise: 1) the number of other points, 2) the count sum of the other points, and 3) a density calculation of UIDs that are within 200m, 400m, 800m and 1600m. The final output required is a table with each UID and its summary output.

This for loop illustrates the output required, but would take weeks to complete:

```R
Summary <- NULL
for (n in 1:nrow(Data)) {
  Data_Row <- Data[n, ]

  Row_200 <- Data %>%
    filter(between(X, Data_Row$X - 200, Data_Row$X + 200) &
           between(Y, Data_Row$Y - 200, Data_Row$Y + 200)) %>%
    summarise(Instances_200 = n(), Count_200 = sum(Count),
              Calculation_200 = Count_200 / (Instances_200 * 0.000625))

  Row_400 <- Data %>%
    filter(between(X, Data_Row$X - 400, Data_Row$X + 400) &
           between(Y, Data_Row$Y - 400, Data_Row$Y + 400)) %>%
    summarise(Instances_400 = n(), Count_400 = sum(Count),
              Calculation_400 = Count_400 / (Instances_400 * 0.000625))

  Row_800 <- Data %>%
    filter(between(X, Data_Row$X - 800, Data_Row$X + 800) &
           between(Y, Data_Row$Y - 800, Data_Row$Y + 800)) %>%
    summarise(Instances_800 = n(), Count_800 = sum(Count),
              Calculation_800 = Count_800 / (Instances_800 * 0.000625))

  Row_1600 <- Data %>%
    filter(between(X, Data_Row$X - 1600, Data_Row$X + 1600) &
           between(Y, Data_Row$Y - 1600, Data_Row$Y + 1600)) %>%
    summarise(Instances_1600 = n(), Count_1600 = sum(Count),
              Calculation_1600 = Count_1600 / (Instances_1600 * 0.000625))

  Summary_Row <- bind_cols(Data[n, "UID"], Row_200, Row_400, Row_800, Row_1600)
  Summary <- bind_rows(Summary, Summary_Row)
}
```

I am aware of things like lapply and map, but I don't know how they could be used to do this dynamic filtering and summarisation more quickly.
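
For reference, the loop body can at least be rewritten functionally: the sketch below wraps one radius query in a helper and drives it with `purrr::map_dfr()` instead of growing `Summary` with `bind_rows()` inside the loop. The helper name `summarise_radius` is illustrative (not from the original post), and this restructuring alone does not remove the cost of scanning all 5 million rows for every UID.

```R
library(dplyr)
library(purrr)

# Illustrative helper: summary of all points within +/- r of (x, y).
summarise_radius <- function(x, y, r, data) {
  data %>%
    filter(between(X, x - r, x + r) & between(Y, y - r, y + r)) %>%
    summarise(Instances = n(),
              Count = sum(Count),
              Calculation = Count / (Instances * 0.000625)) %>%
    rename_with(~ paste0(.x, "_", r))
}

# One output row per UID, all four radii bound together.
Summary <- map_dfr(seq_len(nrow(Data)), function(n) {
  row <- Data[n, ]
  bind_cols(row["UID"],
            summarise_radius(row$X, row$Y, 200,  Data),
            summarise_radius(row$X, row$Y, 400,  Data),
            summarise_radius(row$X, row$Y, 800,  Data),
            summarise_radius(row$X, row$Y, 1600, Data))
})
```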

答案1

得分: 3

基于@Jon Spring的评论,我参考了[此答案][1]来加快处理速度。以下答案现在在我的真实数据上运行大约需要12小时(而不是我的问题中的代码需要的天数/周数)。它在某些时候需要较多内存(我使用带有32GB RAM的Windows桌面),这就是为什么我尽量减少存储内容的原因(即为什么我将每个循环的输出写入CSV文件,然后在最后将它们带回来)。使用data.table确实加快了速度,但我确信仍然有一种更快/更有效的方法来做到这一点 - 但这就是我所需的(即使我的代码有点混乱!)。

```R
# 需要 geosphere v1.5-19
install.packages('geosphere', repos='https://rspatial.r-universe.dev')

# 所需的库
library(data.table)
library(dplyr)
library(geosphere)
library(tidyr)
library(readr)
library(purrr)

set.seed(123)

# 输入数据
Data <- data.table(
  X = runif(5000000, min = 200000, max = 400000),
  Y = runif(5000000, min = 400000, max = 500000),
  UID = 1:5000000,
  Count = sample(1:2000, 5000000, replace = TRUE))

# 为每个点添加50m网格单元ID
Data[, ID := OSGB(Data[, c("X", "Y")], "50m")]

# 汇总每个50m网格单元的记录数
Data_Summary <- Data[, .(Count = .N), by = .(ID)]

# 为覆盖'Data'范围的50米网格方格生成质心坐标
Coordinates <- CJ(X = seq((floor(min(Data$X)/1000)*1000)+25, ceiling(max(Data$X)/1000)*1000, 50),
                  Y = seq((floor(min(Data$Y)/1000)*1000)+25, ceiling(max(Data$Y)/1000)*1000, 50))

# 为每个质心添加50m网格单元ID
Coordinates[, ID := OSGB(Coordinates[, c("X", "Y")], "50m")]

# 将50m网格单元质心坐标与'Data_Summary'连接
Data_Summary_with_Centroids <- Coordinates[Data_Summary, on = .(ID), nomatch = NULL]

# 定义最大缓冲区大小
Buffer <- 1650

# 为每个网格质心生成偏移XY坐标
Data_Summary_with_Centroids[, `:=`(X_NW = X-Buffer, Y_NW = Y+Buffer,
                                   X_NE = X+Buffer, Y_NE = Y+Buffer,
                                   X_SW = X-Buffer, Y_SW = Y-Buffer,
                                   X_SE = X+Buffer, Y_SE = Y-Buffer)]

# 为所有坐标对分配5km网格单元ID
Data_Summary_with_Centroids[, `:=`(HashCN = OSGB(Data_Summary_with_Centroids[, c("X", "Y")], "5km"),
                                   HashNW = OSGB(Data_Summary_with_Centroids[, c("X_NW", "Y_NW")], "5km"),
                                   HashNE = OSGB(Data_Summary_with_Centroids[, c("X_NE", "Y_NE")], "5km"),
                                   HashSW = OSGB(Data_Summary_with_Centroids[, c("X_SW", "Y_SW")], "5km"),
                                   HashSE = OSGB(Data_Summary_with_Centroids[, c("X_SE", "Y_SE")], "5km"))]

# 移除偏移XY坐标
Data_Summary_with_Centroids[, c("X_NW", "Y_NW", "X_NE", "Y_NE", "X_SW", "Y_SW", "X_SE", "Y_SE") := NULL]

# 所有唯一的5km网格单元ID列表
Unique_5km_Hash <- unique(Data_Summary_with_Centroids$HashCN)

XY_Columns <- c("X", "Y")

for (n in 1:length(Unique_5km_Hash)) {

  # 只保留HashCN、HashNW、HashNE、HashSW或HashSE中任一列等于本次循环5km网格单元ID的网格单元
  Hash <- Data_Summary_with_Centroids[Data_Summary_with_Centroids[, Reduce(`|`, lapply(.SD, `==`, Unique_5km_Hash[n])),
      .SDcols = c("HashCN", "HashNW", "HashNE", "HashSW", "HashSE")]][, c("HashNW", "HashNE", "HashSW", "HashSE") := NULL]

  # 仅选择'crossing'所需的列
  Hash_Cross <- Hash[, ..XY_Columns]

  # 扩展以包括所有可能的值组合
  suppressMessages(Hash_5km <- Hash_Cross %>%
    crossing(., ., .name_repair = "unique"))
  rm(Hash_Cross)

  # 清理内存
  invisible(gc())

  # 计算距离,舍弃超过1600m的距离,仅保留'HashCN'中的'origin'记录,并添加Count
  Hash_5km <- Hash_5km %>%
    mutate(Distance = sqrt((X...1 - X...3)^2 + (Y...2 - Y...4)^2)) %>%
    filter(Distance <= 1600) %>%
    left_join(., Hash %>% select(X, Y, ID, HashCN), join_by(X...1 == X, Y...2 == Y)) %>%
    filter(HashCN == Unique_5km_Hash[n]) %>%
    left_join(., Hash %>% select(X, Y, Count), join_by(X...3 == X, Y...4 == Y)) %>%
    select(ID, Count, Distance)

  # 清理内存
  invisible(gc())

  # 创建200m内其他网格单元的汇总统计信息
  Hash_200 <- Hash_5km %>%
    filter(Distance <= 200) %>%
    group_by(ID) %>%
    summarise(Instances_200 = n(), Count_200 = sum(Count), Calculation_200 = Count_200/(Instances_200*0.25))

  # 创建400m内其他网格单元的汇总统计信息
  Hash_400 <- Hash_5km %>%
    filter(Distance <= 400) %>%
    group_by(ID) %>%
    summarise(Instances_400 = n(), Count_400 = sum(Count), Calculation_400 = Count_400/(Instances_400*0.25))

  # 创建800m内其他网格单元的汇总统计信息
  Hash_800 <- Hash_5km %>%
    filter(Distance <= 800) %>%
    group_by(ID) %>%
    summarise(Instances_800 = n(), Count_800 = sum(Count), Calculation_800 = Count_800/(Instances_800*0.25))

  # 创建1600m内其他网格单元的汇总统计信息
  Hash_1600 <- Hash_5km %>%
    group_by(ID) %>%
    summarise(Instances_1600 = n(), Count_1600 = sum(Count), Calculation_1600 = Count_1600/(Instances_1600*0.25))

  # 将汇总统计信息连接在一起
  Output <- Hash_200 %>%
    left_join(Hash_400, by = 'ID') %>%
    left_join(Hash_800, by = 'ID') %>%
    left_join(Hash_1600, by = 'ID')

  # 导出输出
  write_csv(Output, paste0("C:/Temp/Hash/", Unique_5km_Hash[n], ".csv"))

  rm(Hash_5km)
  rm(Hash)
  rm(Hash_200)
  rm(Hash_400)
  rm(Hash_800)
  rm(Hash_1600)
  rm(Output)
  invisible(gc())
}

# 将所有CSV文件读入单个数据框
Grid_Calculations <- list.files(path = "C:/Temp/Hash/", pattern = "*.csv", full.names = TRUE) %>%
  map_dfr(read_csv, show_col_types = FALSE)
```

英文:

Based on @Jon Spring's comment I have used [this answer][1] as inspiration to speed up the process. The following answer now takes around 12 hours to run on my real data (as opposed to the days/weeks the code in my question would have taken). It is a bit memory hungry at points (I am using a Windows desktop with 32GB of RAM), which is why I have minimised what gets stored (i.e. why I write out each loop's output to CSV and then bring them back in at the end). Using `data.table` has definitely sped things up, but I am sure there is still a quicker/more efficient way to do this - but this does what I need (even if my code is messy!).

```R
# geosphere v1.5-19 required
install.packages('geosphere', repos='https://rspatial.r-universe.dev')

# Required libraries
library(data.table)
library(dplyr)
library(geosphere)
library(tidyr)
library(readr)
library(purrr)

set.seed(123)

# Input data
Data <- data.table(
  X = runif(5000000, min = 200000, max = 400000),
  Y = runif(5000000, min = 400000, max = 500000),
  UID = 1:5000000,
  Count = sample(1:2000, 5000000, replace = TRUE))

# Add 50m grid cell ID for each point
Data[, ID := OSGB(Data[, c("X", "Y")], "50m")]

# Summarise number of records for each 50m grid cell
Data_Summary <- Data[, .(Count = .N), by = .(ID)]

# Generate centroid coordinates for 50 metre grid squares that cover 'Data' extent
Coordinates <- CJ(X = seq((floor(min(Data$X)/1000)*1000)+25, ceiling(max(Data$X)/1000)*1000, 50),
                  Y = seq((floor(min(Data$Y)/1000)*1000)+25, ceiling(max(Data$Y)/1000)*1000, 50))

# Add 50m grid cell ID for each centroid
Coordinates[, ID := OSGB(Coordinates[, c("X", "Y")], "50m")]

# Join 50m grid cell centroid coordinates to 'Data_Summary'
Data_Summary_with_Centroids <- Coordinates[Data_Summary, on = .(ID), nomatch = NULL]

# Define maximum buffer size
Buffer <- 1650

# Generate offset XY coordinates for each grid centroid
Data_Summary_with_Centroids[, `:=`(X_NW = X-Buffer, Y_NW = Y+Buffer,
                                   X_NE = X+Buffer, Y_NE = Y+Buffer,
                                   X_SW = X-Buffer, Y_SW = Y-Buffer,
                                   X_SE = X+Buffer, Y_SE = Y-Buffer)]

# Assign 5km grid cell ID to all pairs of coordinates
Data_Summary_with_Centroids[, `:=`(HashCN = OSGB(Data_Summary_with_Centroids[, c("X", "Y")], "5km"),
                                   HashNW = OSGB(Data_Summary_with_Centroids[, c("X_NW", "Y_NW")], "5km"),
                                   HashNE = OSGB(Data_Summary_with_Centroids[, c("X_NE", "Y_NE")], "5km"),
                                   HashSW = OSGB(Data_Summary_with_Centroids[, c("X_SW", "Y_SW")], "5km"),
                                   HashSE = OSGB(Data_Summary_with_Centroids[, c("X_SE", "Y_SE")], "5km"))]

# Remove offset XY coordinates
Data_Summary_with_Centroids[, c("X_NW", "Y_NW", "X_NE", "Y_NE", "X_SW", "Y_SW", "X_SE", "Y_SE") := NULL]

# List of all unique 5km grid cell IDs
Unique_5km_Hash <- unique(Data_Summary_with_Centroids$HashCN)

XY_Columns <- c("X", "Y")

for (n in 1:length(Unique_5km_Hash)) {

  # Subset so only grid cells that have that loop's 5km grid cell ID in any of
  # HashCN, HashNW, HashNE, HashSW or HashSE remain
  Hash <- Data_Summary_with_Centroids[Data_Summary_with_Centroids[, Reduce(`|`, lapply(.SD, `==`, Unique_5km_Hash[n])),
      .SDcols = c("HashCN", "HashNW", "HashNE", "HashSW", "HashSE")]][, c("HashNW", "HashNE", "HashSW", "HashSE") := NULL]

  # Select only the columns needed for 'crossing'
  Hash_Cross <- Hash[, ..XY_Columns]

  # Expand to include all possible combinations of values
  suppressMessages(Hash_5km <- Hash_Cross %>%
    crossing(., ., .name_repair = "unique"))
  rm(Hash_Cross)

  # Clear memory
  invisible(gc())

  # Calculate distances, discard those over 1600m, keep only 'origin' records
  # that are in 'HashCN' and add Count
  Hash_5km <- Hash_5km %>%
    mutate(Distance = sqrt((X...1 - X...3)^2 + (Y...2 - Y...4)^2)) %>%
    filter(Distance <= 1600) %>%
    left_join(., Hash %>% select(X, Y, ID, HashCN), join_by(X...1 == X, Y...2 == Y)) %>%
    filter(HashCN == Unique_5km_Hash[n]) %>%
    left_join(., Hash %>% select(X, Y, Count), join_by(X...3 == X, Y...4 == Y)) %>%
    select(ID, Count, Distance)

  # Clear memory
  invisible(gc())

  # Create summary statistics of other grid cells within 200m
  Hash_200 <- Hash_5km %>%
    filter(Distance <= 200) %>%
    group_by(ID) %>%
    summarise(Instances_200 = n(), Count_200 = sum(Count), Calculation_200 = Count_200/(Instances_200*0.25))

  # Create summary statistics of other grid cells within 400m
  Hash_400 <- Hash_5km %>%
    filter(Distance <= 400) %>%
    group_by(ID) %>%
    summarise(Instances_400 = n(), Count_400 = sum(Count), Calculation_400 = Count_400/(Instances_400*0.25))

  # Create summary statistics of other grid cells within 800m
  Hash_800 <- Hash_5km %>%
    filter(Distance <= 800) %>%
    group_by(ID) %>%
    summarise(Instances_800 = n(), Count_800 = sum(Count), Calculation_800 = Count_800/(Instances_800*0.25))

  # Create summary statistics of other grid cells within 1600m
  Hash_1600 <- Hash_5km %>%
    group_by(ID) %>%
    summarise(Instances_1600 = n(), Count_1600 = sum(Count), Calculation_1600 = Count_1600/(Instances_1600*0.25))

  # Join summary statistics together
  Output <- Hash_200 %>%
    left_join(Hash_400, by = 'ID') %>%
    left_join(Hash_800, by = 'ID') %>%
    left_join(Hash_1600, by = 'ID')

  # Export output
  write_csv(Output, paste0("C:/Temp/Hash/", Unique_5km_Hash[n], ".csv"))

  rm(Hash_5km)
  rm(Hash)
  rm(Hash_200)
  rm(Hash_400)
  rm(Hash_800)
  rm(Hash_1600)
  rm(Output)
  invisible(gc())
}

# Read all CSV files back in to a single data frame
Grid_Calculations <- list.files(path = "C:/Temp/Hash/", pattern = "*.csv", full.names = TRUE) %>%
  map_dfr(read_csv, show_col_types = FALSE)
```

**EDIT:** Have added set.seed() and slightly changed the start of the loop so only the relevant columns are used with 'crossing' (this makes it memory efficient for me).

**Output**

First five rows of the first loop:

|ID|Instances_200|Count_200|Calculation_200|Instances_400|Count_400|Calculation_400|Instances_800|Count_800|Calculation_800|Instances_1600|Count_1600|Calculation_1600|
|--|-------------|---------|---------------|-------------|---------|---------------|-------------|---------|---------------|--------------|----------|----------------|
|SC550650NE|20|24|4.8|88|119|5.4090909090909|360|475|5.27777777777777|1503|2041|5.43180306054557|
|SC550650SW|22|28|5.09090909090909|87|119|5.47126436781609|360|475|5.27777777777777|1500|2049|5.464|
|SC550651NE|20|30|6|86|117|5.44186046511627|364|479|5.26373626373626|1494|2032|5.44042838018741|
|SC550651SE|21|30|5.71428571428571|85|113|5.31764705882352|364|479|5.26373626373626|1491|2037|5.46478873239436|
|SC550652NE|28|42|6|85|113|5.31764705882352|369|485|5.25745257452574|1487|2020|5.43375924680564|

[1]: https://stackoverflow.com/questions/72750975/fastest-way-to-get-geo-distance-for-large-dataset-in-r/72755385#72755385

答案2

得分: 1

作为一个认为一定有人比我做得更好的人,我会寻找别人制作的解决方案。

因此,我认为使用 `sf` 和 `spdep` 包有一些优势。它们都是专门为空间数据设计的。

有两个函数可以作为起点。

创建一个 sf 对象:

```R
x <- sf::st_as_sf(Data, coords = c("X", "Y"))
```

距离可以使用以下函数计算:

```R
st_distance()
```

当然,应该找到一种不需要计算所有距离的解决方案。一个简单的算法是将点按所需半径一半的大小分入网格箱中,然后只需检查相邻箱中的点。

英文:

Being the man who says there must be people who can do it better than me, I would look for solutions made by others.

So I see some advantages in using the sf and spdep packages. Both are explicitly designed for spatial data.

There are two functions to start with.

Create an sf-object

```R
x <- sf::st_as_sf(Data, coords = c("X", "Y"))
```

and the distances could be computed with

```R
st_distance()
```
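
As a hedged sketch of how that might be wired up (object names here are illustrative, and `st_is_within_distance()` uses a circular neighbourhood rather than the question's square box), the neighbour lists returned by `sf` can be summarised like this; it is still a heavy computation at 5 million points:

```R
library(sf)
library(dplyr)

pts <- sf::st_as_sf(Data, coords = c("X", "Y"))

# For each point, the indices of points within 200 units (sparse list).
idx <- sf::st_is_within_distance(pts, pts, dist = 200)

Summary_200 <- tibble(
  UID           = Data$UID,
  Instances_200 = lengths(idx),
  Count_200     = vapply(idx, function(i) sum(Data$Count[i]), numeric(1))
) %>%
  mutate(Calculation_200 = Count_200 / (Instances_200 * 0.000625))
```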

Surely a solution has to be found that does not compute all the distances. A simple algorithm is to sort the points into bins of half the size of the desired radius. Then you only need to check the points in neighbouring bins.
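
A rough sketch of that binning idea with `data.table`, for the 200m case only; the bin size, the `bx`/`by_` columns and the `neighbours_200` helper are illustrative rather than taken from the answer:

```R
library(data.table)

DT <- as.data.table(Data)
bin <- 100                                   # half of the 200m radius
DT[, `:=`(bx = floor(X / bin), by_ = floor(Y / bin))]

# For a single point, candidates only need to come from bins within +/- 2
# of its own bin before the exact 200m bounding-box check is applied.
neighbours_200 <- function(row) {
  cand <- DT[bx %between% c(row$bx - 2, row$bx + 2) &
             by_ %between% c(row$by_ - 2, row$by_ + 2)]
  cand[between(X, row$X - 200, row$X + 200) &
       between(Y, row$Y - 200, row$Y + 200),
       .(Instances_200 = .N, Count_200 = sum(Count))]
}

neighbours_200(DT[1])   # summary for the first point
```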

答案3

得分: 1

使用data.table可能会有帮助,但对于这种情况,我认为计算时间仍然会非常长。也许你还是想尝试一下:

```R
library(data.table)

tbl.data <- as.data.table(Data)

tbl.data[, Instances_200 := mapply(function(x, y) {
  tbl.data[X < x + 200 & X > x - 200 & Y < y + 200 & Y > y - 200, .N]
}, X, Y)]

tbl.data[, Count_200 := mapply(function(x, y) {
  tbl.data[X < x + 200 & X > x - 200 & Y < y + 200 & Y > y - 200, sum(Count)]
}, X, Y)]

tbl.data[, Calculation_200 := Count_200 / (Instances_200 * 0.000625)]
```

等等。

英文:

Using data.table could help, but for this case I think the calculation time will still be far too long.

Maybe you want to try anyway:

```R
library(data.table)

tbl.data <- as.data.table(Data)

tbl.data[, Instances_200 := mapply(function(x, y) {
  tbl.data[X < x + 200 & X > x - 200 & Y < y + 200 & Y > y - 200, .N]
}, X, Y)]

tbl.data[, Count_200 := mapply(function(x, y) {
  tbl.data[X < x + 200 & X > x - 200 & Y < y + 200 & Y > y - 200, sum(Count)]
}, X, Y)]

tbl.data[, Calculation_200 := Count_200 / (Instances_200 * 0.000625)]
```

And so on.
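
For illustration, the "and so on" could be written as a single loop over the four radii, building the column names dynamically. This is only a sketch of the same pattern (the `inst_col`/`count_col`/`calc_col` names are illustrative) and it does not change the run-time concern raised above:

```R
for (r in c(200, 400, 800, 1600)) {
  inst_col  <- paste0("Instances_", r)
  count_col <- paste0("Count_", r)
  calc_col  <- paste0("Calculation_", r)

  # Number of points in the +/- r bounding box around each point
  tbl.data[, (inst_col) := mapply(function(x, y)
    tbl.data[X < x + r & X > x - r & Y < y + r & Y > y - r, .N], X, Y)]

  # Sum of Count in the same bounding box
  tbl.data[, (count_col) := mapply(function(x, y)
    tbl.data[X < x + r & X > x - r & Y < y + r & Y > y - r, sum(Count)], X, Y)]

  tbl.data[, (calc_col) := get(count_col) / (get(inst_col) * 0.000625)]
}
```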

答案4

得分: 1


```R
library(tictoc)
library(data.table)

# 选择数据集大小
num.rows <- 1e5

# 输入数据
tbl.data <- data.table(
  X = runif(num.rows, min = 200000, max = 400000),
  Y = runif(num.rows, min = 400000, max = 500000),
  UID = 1:num.rows,
  Count = sample(1:2000, num.rows, replace = TRUE)
)
setkey(tbl.data, UID)

# 创建超级数据堆栈
tbl.data.1600 <- copy(tbl.data)
tbl.data.0800 <- copy(tbl.data)
tbl.data.0400 <- copy(tbl.data)
tbl.data.0200 <- copy(tbl.data)

tbl.data.1600[, within_metric := 1600]
tbl.data.0800[, within_metric := 800]
tbl.data.0400[, within_metric := 400]
tbl.data.0200[, within_metric := 200]

tbl.data.1600[, group := 1]
tbl.data.0800[, group := 2]
tbl.data.0400[, group := 3]
tbl.data.0200[, group := 4]

tbl.data.stack <- copy(rbind(tbl.data.1600, tbl.data.0800, tbl.data.0400, tbl.data.0200))
setkey(tbl.data.stack, group, UID)

tbl.data.stack[, X_low_bound := X - within_metric]
tbl.data.stack[, X_upp_bound := X + within_metric]
tbl.data.stack[, Y_low_bound := Y - within_metric]
tbl.data.stack[, Y_upp_bound := Y + within_metric]

Xvec <- tbl.data.stack[group == 1]$X
Yvec <- tbl.data.stack[group == 1]$Y
Cvec <- tbl.data.stack[group == 1]$Count

# 定义 group_half(距离界限 + UID 前/后半部分),供 foo2 分块使用;取自下方英文原文
tbl.data.stack[, group_half := paste0(within_metric, "_", (UID < 0.5 * num.rows) + 0L)]

# 定义函数 instances
instances <- function(x = Xvec, y = Yvec, count = Cvec, xlb, xub, ylb, yub) {
  truth <- data.table::between(x, xlb, xub) & data.table::between(y, ylb, yub)
  c("instances" = sum(truth), "sumCount" = sum(count * truth))
}

# 定义函数 instances_stacked
instances_stacked <- function(DT, x = Xvec, y = Yvec, count = Cvec) {
  DT[,
     as.list(instances(xlb = X_low_bound, xub = X_upp_bound, ylb = Y_low_bound, yub = Y_upp_bound)),
     by = c("within_metric", "UID")]
}

# 定义函数 split_df
split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}

# 定义函数 foo2
foo2 <- function(dt) {
  dt2 <- split_df(dt, "group_half")
  require(parallel)
  cl <- parallel::makeCluster(4)
  print(cl)
  clusterExport(cl, varlist = "instances_stacked")
  clusterExport(cl, varlist = "instances")
  clusterExport(cl, varlist = c("Xvec", "Yvec", "Cvec"))
  clusterExport(cl, varlist = "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))
  dt2 <- parallel::parLapply(cl, X = dt2, fun = instances_stacked)
  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}

tic("使用 SOCKET 的并行计算:")
results_stacked_socket <- foo2(tbl.data.stack)
results_stacked_socket[, calculation := sumCount / (instances * 0.25)]
toc()

setkey(results_stacked_socket, UID, within_metric)
head(results_stacked_socket)
```


英文:

I am trying to understand OP's answer post, specifically the output table.

Focusing on the first row of the output table -- I see that SC550650NE is reported to have 20 instances of another point within 200m of (X=255085.6, Y=465070.1) and a sum-count of 24.

```R
> Data[ID=="SC550650NE"]
          X        Y    UID Count         ID   distance
1: 255085.6 465070.1 832751  1908 SC550650NE 0.03272809
```

So I calculate the distance of all the points relative to (X=255085.6, Y=465070.1). And then display those that are within 200 in ascending order. I count 29, not 20. Also, when I sum the counts, I get 25,463, not 24.

```R
> Data[, distance := sqrt((X-255085.6)^2 + (Y-465070.1)^2)]
> Data[distance <= 200][order(distance)]
X Y UID Count ID distance
1: 255085.6 465070.1 832751 1908 SC550650NE 0.03272809
2: 255093.6 465124.7 1371253 1513 SC550651SE 55.16468957
3: 255034.2 465011.8 3512521 396 SC550650SW 77.73592845
4: 255049.8 464993.6 2603353 255 SC550649NW 84.47194005
5: 255091.7 465162.7 193893 1784 SC550651NE 92.82767523
6: 254968.1 465083.5 2245518 186 SC549650NE 118.25898594
7: 255180.5 464970.4 3376500 826 SC551649NE 137.66330325
8: 255148.5 464947.4 3473247 1526 SC551649SW 137.86046404
9: 255136.8 465202.7 2560816 212 SC551652SW 142.12504000
10: 255089.6 465224.3 2788575 1429 SC550652SE 154.24575868
11: 255213.6 465158.4 2051343 331 SC552651NW 155.52946932
12: 255234.1 465015.9 601896 1175 SC552650SW 158.08971475
13: 255231.4 464993.8 2562794 1209 SC552649NW 164.57602726
14: 255236.8 465003.9 2579822 624 SC552650SW 165.05736525
15: 255163.8 465219.0 2793140 491 SC551652SE 168.19129349
16: 254997.8 465224.2 3686855 1948 SC549652SE 177.38964783
17: 255055.7 464895.0 3850088 1904 SC550648NE 177.60704244
18: 254952.1 465189.1 4992168 668 SC549651NE 178.80180844
19: 255237.6 465165.5 3778801 950 SC552651NW 179.50253647
20: 255148.1 464900.4 2842521 204 SC551649SW 180.86815284
21: 255035.4 464891.2 933305 854 SC550648NW 185.79738782
22: 255245.6 465175.2 1453263 446 SC552651NW 191.44182598
23: 255227.8 465200.5 1557782 851 SC552652SW 192.90236356
24: 254894.7 465031.3 657336 1247 SC548650SE 194.79510938
25: 255102.8 464875.4 3051082 148 SC551648NW 195.41964698
26: 254914.2 464971.4 3090917 28 SC549649NW 197.75429388
27: 255030.4 464879.7 895456 109 SC550648NW 198.20424559
28: 255281.9 465101.9 3970383 1810 SC552651SE 198.85139040
29: 255108.3 465268.3 1968621 431 SC551652NW 199.51847902
X Y UID Count ID distance
> Data[distance <= 200, sum(Count)]
[1] 25463
```

Edit

5 million rows

So after ill-advisedly using the comments for discussion (sorry, SO!) we have a better sense of our situation. @Chris has a solution that gives an approximation, but that was a concession made to try to lower the computation time. During the discussion we also learned that the distance metric may be a square or a circle, and that 4 cores are available on the machine.

I will present two solutions, one for the circle metric and one for the square metric. The circle metric is faster.

Both follow the approach in this SO post: https://stackoverflow.com/a/44814035/2727349.

Circle metric

  • e.g. distance = sqrt((x1-x2)^2 + (y1-y2)^2) <= 200

```R
library(tictoc)
library(data.table)
```

Pick one of the num.rows. Start small and work up to get a sense of timing. I have 32GB as well (on a mac) so I'm hoping it is comparable. Roughly it seemed doubling the number of rows led to a 4x increase in computation time.

```R
### Pick one:
###
### num.rows <- 4e4        ### 12 seconds on 4 nodes
### num.rows <- 0.078125e6 ### 43 seconds on 4 nodes
### num.rows <- 0.15625e6  ### 154 seconds on 4 nodes
### num.rows <- 0.3125e6   ### 10.5 min on 4 nodes
### num.rows <- 0.625e6    ### 44. min on 4 nodes
### num.rows <- 1.25e6     ### ~3 hours
### num.rows <- 2.5e6      ### ~12 hours
## num.rows <- 5.0e6       ### ~ 42.53 hours on 4 nodes VERIFIED.

set.seed(123)

# Input data
tbl.data <- data.table(
  X = runif(num.rows, min = 200000, max = 400000),
  Y = runif(num.rows, min = 400000, max = 500000),
  UID = 1:num.rows,
  Count = sample(1:2000, num.rows, replace = TRUE))

setkey(tbl.data, UID)
tbl.data
setkey(tbl.data, UID)
```

This approach depends on passing the entire X, Y, and Count columns of the dataset. We denote them Xvec, Yvec, and Cvec, respectively. From there we make a function, instances() that will first calculate the distance of 1 point (x.in, y.in) to all the other points in (Xvec, Yvec), calculate the instances and do the sumCount (the density "calculation" is done at the end and doesn't need to be done within this step or be parallelized).

This function, instances(), is made to be deployed within a data.table, and is set up to do so in the function instances_stacked(). Since we are passing the entire X, Y, and Count columns of the dataset and operating row-wise in the original dataset, we can tackle the problem in parallel. If we had parallel::detectCores() of 5 million, we'd have one core per row and be done in a jiffy. However, we have 4, so we take a moment to make a variable group_quarters that will split the dataset into 4 chunks via foo2()'s deployment of split_df() (both of these adapted from the example in the aforementioned SO post).

```R
Xvec <- tbl.data$X
Yvec <- tbl.data$Y
Cvec <- tbl.data$Count

## based on https://stackoverflow.com/a/44814035/2727349
instances <- function(x=Xvec, y=Yvec, count=Cvec, x.in, y.in){
  calcdist <- sqrt( (x-x.in)^2 + (y-y.in)^2 )

  truth_1600 <- calcdist <= 1600
  result_1600 <- c("instances_1600" = sum( truth_1600 ), "sumCount_1600" = sum( Cvec*truth_1600 ))

  truth_0800 <- calcdist <= 800
  result_0800 <- c("instances_0800" = sum( truth_0800 ), "sumCount_0800" = sum( Cvec*truth_0800 ))

  truth_0400 <- calcdist <= 400
  result_0400 <- c("instances_0400" = sum( truth_0400 ), "sumCount_0400" = sum( Cvec*truth_0400 ))

  truth_0200 <- calcdist <= 200
  result_0200 <- c("instances_0200" = sum( truth_0200 ), "sumCount_0200" = sum( Cvec*truth_0200 ))

  result = c(result_0200, result_0400, result_0800, result_1600)
  return(result)
}
```
```R
instances_stacked <- function(DT, x=Xvec, y=Yvec, count=Cvec){
  DT[,
     as.list(instances( x.in=X, y.in=Y )),
     by=c("UID")
  ]
}

split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}
```
```R
## define group_quarters roughly as 25% population
tbl.data[UID <  0.25 * num.rows,                          group_quarters := "1st q"]
tbl.data[UID <  0.50 * num.rows & UID >= 0.25 * num.rows, group_quarters := "2nd q"]
tbl.data[UID <  0.75 * num.rows & UID >= 0.50 * num.rows, group_quarters := "3rd q"]
tbl.data[UID >= 0.75 * num.rows,                          group_quarters := "4th q"]

tbl.data[, uniqueN(group_quarters)]
tbl.data[, table(group_quarters)]
```
```R
foo2 <- function(dt) {
  dt2 <- split_df(dt, "group_quarters")

  require(parallel)
  cl <- parallel::makeCluster(4)   ## 4 chosen based on parallel::detectCores()
  print(cl)

  clusterExport(cl, varlist = "instances_stacked") ## this was the `foo` in https://stackoverflow.com/a/44814035/2727349
  clusterExport(cl, varlist = "instances")
  clusterExport(cl, varlist = c("Xvec", "Yvec", "Cvec"))
  clusterExport(cl, varlist = "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))

  dt2 <- parallel::parLapply(cl, X = dt2, fun = instances_stacked)

  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}
```

The call to foo2() will deploy the analysis in parallel and combine everything back together into results_wide. We can then do the density calculation and view some results.

```R
tic("foo2() runs on 4 data chunks (1/4 sized) in parallel: ")
results_wide <- foo2(tbl.data)

## next 4 lines done after the parallel part
results_wide[, calculation_1600 := sumCount_1600/(instances_1600*0.25)]
results_wide[, calculation_0800 := sumCount_0800/(instances_0800*0.25)]
results_wide[, calculation_0400 := sumCount_0400/(instances_0400*0.25)]
results_wide[, calculation_0200 := sumCount_0200/(instances_0200*0.25)]
toc()

## let's see the results
head(results_wide)

## for num.rows <- 5e6,
## should match the intro example at top of the post:
results_wide[UID==832751]
```

Which gives (after 42 1/2 hours):

```R
> head(results_wide)
   UID instances_0200 sumCount_0200 instances_0400 sumCount_0400 instances_0800
1:   1             28         25348            115        110519            510
2:   2             38         37791            123        128978            498
3:   3             28         26958            123        131816            506
4:   4             33         33861            125        113013            514
5:   5             31         36492            123        130939            522
6:   6             37         40887            136        141213            480
   sumCount_0800 instances_1600 sumCount_1600 calculation_1600 calculation_0800
1:        521004           2025       2015967         3982.157         4086.306
2:        499358           2001       1999339         3996.680         4010.908
3:        507117           2013       2003329         3980.783         4008.830
4:        513712           2065       2115524         4097.867         3997.759
5:        516877           2054       1990644         3876.619         3960.743
6:        491969           1952       2001461         4101.355         4099.742
   calculation_0400 calculation_0200
1:         3844.139         3621.143
2:         4194.407         3978.000
3:         4286.699         3851.143
4:         3616.416         4104.364
5:         4258.179         4708.645
6:         4153.324         4420.216
```
```R
## for num.rows <- 5e6, should match the
## intro example at top of the post for
## Data[ID=="SC550650NE"]
> results_wide[UID==832751]
      UID instances_0200 sumCount_0200 instances_0400 sumCount_0400
1: 832751             29         25463            122        110993
   instances_0800 sumCount_0800 instances_1600 sumCount_1600 calculation_1600
1:            480        467032           2048       2054509         4012.713
   calculation_0800 calculation_0400 calculation_0200
1:         3891.933         3639.115         3512.138
```

Square metric

  • e.g. the question post's (X-200, X+200) & (Y-200, Y+200)

...NOTE... I overloaded some function names, so it might be best to put this code in a separate script and a separate R session from the circle metric code.

Similar idea to the circle metric approach above, but the execution is slightly different because it is based on the bounds as opposed to the distance. Basically, we make a super stack of the data and calculate the boundaries specific to each distance. These four "boundaries" make a natural candidate for chunking the data for parallelizing -- this code goes one step farther and breaks each of those 4 in half, resulting in 8 groups. We still run on 4 nodes, though. I tested a bunch on 8 nodes because my testing environment allowed for this, and I developed this code before I knew the OP had a 4 core environment.

```R
library(data.table)
library(tictoc)
library(dplyr)

set.seed(123)

### Pick one:
### num.rows <- 4e4   ### 15 seconds on 8 nodes |  23 seconds on 4 nodes
num.rows <- 1e5       ### 86 seconds on 8 nodes | 140.034 seconds on 4 nodes
### num.rows <- 2e5   ### 347 seconds on 8 nodes
### num.rows <- 4e5   ### 1370 seconds on 8 nodes
### num.rows <- 8e5   ### 1370*4 = 90 minutes?
### num.rows <- 1.6e6 ### 1370*4*4 = 360 minutes = 6 hours (confirmed: 21742.233 / 60 / 60 = 6 hours)
### num.rows <- 2.5e6 ### 47523 / 60 / 60 = 13 hours and 12 minutes VERIFIED.
### num.rows <- 5.0e6 ### 4*47523 / 60 / 60 = ~53 hours estimated.

# Input data
tbl.data <- data.table(
  X = runif(num.rows, min = 200000, max = 400000),
  Y = runif(num.rows, min = 400000, max = 500000),
  UID = 1:num.rows,
  Count = sample(1:2000, num.rows, replace = TRUE))

setkey(tbl.data, UID)
tbl.data
setkey(tbl.data, UID)
```

Make a superstack:

```R
tbl.data.1600 <- copy(tbl.data)
tbl.data.0800 <- copy(tbl.data)
tbl.data.0400 <- copy(tbl.data)
tbl.data.0200 <- copy(tbl.data)

tbl.data.1600[, within_metric := 1600]
tbl.data.0800[, within_metric :=  800]
tbl.data.0400[, within_metric :=  400]
tbl.data.0200[, within_metric :=  200]

tbl.data.1600[, group := 1]
tbl.data.0800[, group := 2]
tbl.data.0400[, group := 3]
tbl.data.0200[, group := 4]

tbl.data.stack <- copy(rbind(tbl.data.1600, tbl.data.0800, tbl.data.0400, tbl.data.0200))
setkey(tbl.data.stack, group, UID)

tbl.data.stack[, X_low_bound := X - within_metric]
tbl.data.stack[, X_upp_bound := X + within_metric]
tbl.data.stack[, Y_low_bound := Y - within_metric]
tbl.data.stack[, Y_upp_bound := Y + within_metric]

Xvec <- tbl.data.stack[group==1]$X
Yvec <- tbl.data.stack[group==1]$Y
Cvec <- tbl.data.stack[group==1]$Count
```

I include this commented chunk -- this is how one could run it without parallelizing it. Go ahead and skip for now to the next block.

```R
###################
## BEGIN ##########
###################
## no parallel. ###
## just stacked ###
###################

# instances_stacked <- function(x=Xvec, y=Yvec, count=Cvec, xlb, xub, ylb, yub){
#
#   truth <- data.table::between(x, xlb, xub) & data.table::between(y, ylb, yub)
#
#   c("instances" = sum( truth ), "sumCount" = sum( Cvec*truth ))
#
# }
#
#
# tic("external default vector approach -- STACKED -- all 4: ")
# results_stacked <-
#   tbl.data.stack[,
#                  as.list(instances_stacked( xlb=X_low_bound, xub=X_upp_bound, ylb=Y_low_bound, yub=Y_upp_bound)),
#                  by=c("group","UID")
#   ]
#
# results_stacked[, calculation := sumCount/(instances*0.25)]
# toc()
#
# head(results_stacked)

###################
## END.  ##########
###################
## no parallel. ###
## just stacked ###
###################
```

Below is the parallel code. I broke it into 8 groups, but changed the makeCluster line to 4 nodes for your computer.

```R
###################
## BEGIN ##########
###################
## SOCKET.      ###
## ROCKET.      ###
###################

## follow the leader: https://stackoverflow.com/a/44814035/2727349
## step 1: change instances_stacked (which is our `foo`) so that it returns a list. Also change it to where
##         it takes a data table as an argument.
## step 2: use split_df -- keep in mind might need to make a concatenated grouping of "group" and "UID"

instances <- function(x=Xvec, y=Yvec, count=Cvec, xlb, xub, ylb, yub){
  truth <- data.table::between(x, xlb, xub) & data.table::between(y, ylb, yub)
  c("instances" = sum( truth ), "sumCount" = sum( Cvec*truth ))
}

instances_stacked <- function(DT, x=Xvec, y=Yvec, count=Cvec){
  DT[,
     as.list(instances( xlb=X_low_bound, xub=X_upp_bound, ylb=Y_low_bound, yub=Y_upp_bound)),
     by=c("within_metric","UID")
  ]
}

split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}

## define grps to be distance-bound and UID combo
tbl.data.stack[, grps := paste0(within_metric, "_", UID)]
tbl.data.stack[, uniqueN(grps)]

## define group_half to be distance-bound and UID < 1/2 num.rows
tbl.data.stack[, group_half := paste0(within_metric, "_", (UID < 0.5 * num.rows) + 0L)]
tbl.data.stack[, uniqueN(group_half)]

foo2 <- function(dt) {
  #dt2 <- split_df(dt, "grps")
  #dt2 <- split_df(dt, "group")
  dt2 <- split_df(dt, "group_half")

  require(parallel)
  ##cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
  #cl <- parallel::makeCluster(tbl.data.stack[, uniqueN(group_half)])
  cl <- parallel::makeCluster(4)
  print(cl)

  clusterExport(cl, varlist = "instances_stacked") ## this was the `foo` in https://stackoverflow.com/a/44814035/2727349
  clusterExport(cl, varlist = "instances")
  clusterExport(cl, varlist = c("Xvec", "Yvec", "Cvec"))
  clusterExport(cl, varlist = "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))

  dt2 <- parallel::parLapply(cl, X = dt2, fun = instances_stacked)

  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}

tic("external default vector approach -- SOCKET -- all 4 in parallel: ")
results_stacked_socket <- foo2(tbl.data.stack)
results_stacked_socket[, calculation := sumCount/(instances*0.25)]
toc()

setkey(results_stacked_socket, UID, within_metric)
head(results_stacked_socket)

###################
## END.  ##########
###################
## SOCKET.      ###
## ROCKET.      ###
###################
```
